diff --git a/hint_extractor.py b/hint_extractor.py
index fbac646..a11bace 100644
--- a/hint_extractor.py
+++ b/hint_extractor.py
@@ -51,6 +51,8 @@ _DISC_SECTION_RE = re.compile(r"(?i)(?:cd|disc|disk|side)[_ \-]*(\d{1,2})")
 def _clean(s: Optional[str]) -> str:
     if not s:
         return ""
+    # Strip BOM (U+FEFF), zero-width space (U+200B) and soft hyphen (U+00AD).
+    s = re.sub(r"[\ufeff\u200b\u00ad]", "", s)
     return re.sub(r"\s+", " ", s.replace("_", " ")).strip(" -._")
 
 
@@ -143,6 +145,34 @@ def _parse_tracklist(text: str) -> List[Dict[str, str]]:
     return tracks
 
 
+def _parse_m3u(text: str) -> List[Dict[str, str]]:
+    """Parse M3U/M3U8 text into an ordered list of {filename, title, position}.
+    Entry order is the desired track order; a leading BOM is ignored.
+    """
+    tracks: List[Dict[str, str]] = []
+    pending_title: Optional[str] = None
+    position = 0
+    for line in text.lstrip("\ufeff").splitlines():
+        line = line.strip()
+        if not line:
+            continue
+        if line.upper().startswith("#EXTINF:"):
+            parts = line.split(",", 1)
+            pending_title = parts[1].strip() if len(parts) > 1 else None
+        elif not line.startswith("#"):
+            filename = Path(line.replace("\\", "/")).name
+            if not filename:
+                continue
+            position += 1
+            tracks.append({
+                "position": str(position),
+                "filename": filename,
+                "title": pending_title or "",
+            })
+            pending_title = None
+    return tracks
+
+
 def _read_tracklist_file(path: Path) -> Optional[str]:
     try:
         if path.suffix.lower() in (".htm", ".html"):
@@ -203,6 +233,29 @@ def extract_hints(scan: AlbumScan) -> AlbumHints:
 
     parsed_tracklist = _parse_tracklist(hints.tracklist_text) if hints.tracklist_text else []
 
+    # M3U/playlist order: filename stem (normalised) -> track number
+    m3u_order: Dict[str, int] = {}
+    m3u_titles: Dict[str, str] = {}
+    for pf in scan.playlist_files:
+        try:
+            text = pf.read_text(encoding="utf-8-sig", errors="replace")
+            for entry in _parse_m3u(text):
+                stem = _clean(Path(entry["filename"]).stem).casefold()
+                pos = int(entry["position"])
+                if stem and stem not in m3u_order:
+                    m3u_order[stem] = pos
+                    if entry.get("title"):
+                        m3u_titles[stem] = entry["title"]
+        except Exception as e:
+            print(f" ⚠️ Playlist-Lesefehler {pf.name}: {e}", file=sys.stderr)
+
+    # Tracklist lookup: normalised title -> entry (for title-based matching)
+    tl_by_title: Dict[str, Dict[str, str]] = {}
+    for entry in parsed_tracklist:
+        key = _clean(entry.get("title", "")).casefold()
+        if key:
+            tl_by_title[key] = entry
+
     # Build TrackHints per audio file
     for audio_path in sorted(scan.audio_files):
         tags, duration = _read_tags(audio_path)
@@ -215,10 +268,18 @@ def extract_hints(scan: AlbumScan) -> AlbumHints:
         raw_tn = tags.get("tracknumber") or fn_hints.get("track")
         if raw_tn:
             try:
-                track_num = int(str(raw_tn).split("/")[0])
+                tn_int = int(str(raw_tn).split("/")[0])
+                if tn_int > 0:  # 0 means "no number"
+                    track_num = tn_int
             except ValueError:
                 pass
 
+        # Fallback: M3U position, used only when the tag/filename lookup above gave no positive number
+        if track_num is None:
+            stem_key = _clean(audio_path.stem).casefold()
+            if stem_key in m3u_order:
+                track_num = m3u_order[stem_key]
+
         # Disc number: tag > filename > path segment
         raw_dn = tags.get("discnumber") or fn_hints.get("disc")
         if raw_dn:
@@ -236,16 +297,27 @@ def extract_hints(scan: AlbumScan) -> AlbumHints:
         title = tags.get("title") or fn_hints.get("title")
         artist = tags.get("artist") or fn_hints.get("artist")
 
-        # Enrich from parsed tracklist if track_num matches
-        if parsed_tracklist and track_num:
-            for tl_entry in parsed_tracklist:
-                tl_track = tl_entry.get("track")
-                tl_disc = tl_entry.get("disc", "1")
-                if (tl_track and int(tl_track) == track_num
-                        and int(tl_disc) == (disc_num or 1)):
-                    if not _is_good(title) and _is_good(tl_entry.get("title")):
-                        title = tl_entry["title"]
-                    break
+        # Tracklist: match by number first, then by title
+        if parsed_tracklist:
+            matched_tl: Optional[Dict[str, str]] = None
+            if track_num:
+                for tl_entry in parsed_tracklist:
+                    tl_track = tl_entry.get("track")
+                    tl_disc = tl_entry.get("disc", "1")
+                    if (tl_track and int(tl_track) == track_num
+                            and int(tl_disc) == (disc_num or 1)):
+                        matched_tl = tl_entry
+                        break
+            if matched_tl is None and title:
+                matched_tl = tl_by_title.get(_clean(title).casefold())
+            if matched_tl and not _is_good(title) and _is_good(matched_tl.get("title")):
+                title = matched_tl["title"]
+
+        # M3U title as fallback (contains "Composer - Title" — only used when no better title exists)
+        if not _is_good(title):
+            stem_key = _clean(audio_path.stem).casefold()
+            if stem_key in m3u_titles:
+                title = m3u_titles[stem_key]
 
         hints.tracks.append(TrackHints(
             path=audio_path,
diff --git a/metadata_resolver.py b/metadata_resolver.py
index af06b91..2fd8eb5 100644
--- a/metadata_resolver.py
+++ b/metadata_resolver.py
@@ -40,8 +40,8 @@ OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
 DISCOGS_TOKEN = os.getenv("DISCOGS_TOKEN", "")
 OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")
 
-# Lokales Reasoning-Modell für Metadaten-Ergänzung (passt auf RTX 3090)
-OLLAMA_RESOLVE_MODEL = os.getenv("OLLAMA_RESOLVE_MODEL", "qwen3.5:27b")
+# qwen3:8b (5.2 GB) is enough for simple JSON metadata completion and loads quickly (~10 s)
+OLLAMA_RESOLVE_MODEL = os.getenv("OLLAMA_RESOLVE_MODEL", "qwen3:8b")
 
 
 def _mb_wait():
@@ -184,10 +184,12 @@ def _build_resolve_prompt(hints: AlbumHints, partial: Dict) -> str:
         for t in hints.tracks[:20]
     )
     return (
-        "Du bist ein Musikexperte. Analysiere diese Album-Daten und vervollständige die fehlenden Felder.\n\n"
+        "Du bist ein Musikexperte. Analysiere diese Album-Daten.\n"
+        "Vervollständige fehlende Felder UND korrigiere erkennbare Tippfehler "
+        "(z.B. im Albumtitel oder Künstlernamen — Verzeichnisnamen enthalten oft Schreibfehler).\n\n"
         f"Verzeichnisname: {hints.album_dir.name}\n"
-        f"Bekannte Artist: {hints.dir_artist or partial.get('artist', 'unbekannt')}\n"
-        f"Bekannter Albumtitel: {hints.dir_album or partial.get('album', 'unbekannt')}\n"
+        f"Künstler (aus Verzeichnis): {hints.dir_artist or partial.get('artist', 'unbekannt')}\n"
+        f"Albumtitel (aus Verzeichnis, evtl. mit Tippfehlern): {hints.dir_album or partial.get('album', 'unbekannt')}\n"
         f"Jahr: {hints.dir_year or partial.get('year', 'unbekannt')}\n"
         f"Tracklist-Hinweise:\n{tracks_summary}\n\n"
         'Antworte NUR mit einem JSON-Objekt mit diesen Feldern (null wenn unbekannt):\n'
@@ -224,7 +226,7 @@ def _resolve_via_ollama(hints: AlbumHints, partial: Dict) -> Optional[Dict]:
             headers={"Content-Type": "application/json"},
             method="POST",
         )
-        with urllib.request.urlopen(req, timeout=120) as resp:
+        with urllib.request.urlopen(req, timeout=240) as resp:
             data = json.loads(resp.read())
         text = data.get("message", {}).get("content", "").strip()
         return _parse_json_response(text)
@@ -422,19 +424,34 @@ def resolve(
     if not artist or not album or confidence < 0.5:
         cl = _claude_resolve(hints, partial)
         if cl:
-            artist = artist or cl.get("artist")
-            album = album or cl.get("album")
-            year = year or cl.get("year")
-            genre = genre or cl.get("genre")
-            label = label or cl.get("label")
+            if confidence < 0.3:
+                # Very uncertain: the LLM may also overwrite existing values
+                # (e.g. typos in the album title taken from the directory name)
+                artist = cl.get("artist") or artist
+                album = cl.get("album") or album
+                year = cl.get("year") or year
+                genre = cl.get("genre") or genre
+                label = cl.get("label") or label
+            else:
+                artist = artist or cl.get("artist")
+                album = album or cl.get("album")
+                year = year or cl.get("year")
+                genre = genre or cl.get("genre")
+                label = label or cl.get("label")
             confidence += 0.10
             sources.append("llm-resolve")
 
     # Finalize albumartist
+    # dir_artist takes precedence: when the directory name names an artist
+    # (e.g. "Eugen_Cicero_-_Jazz_meets_Classic"), that is the album artist —
+    # even if the track filenames carry the composers' names.
     track_artists = [t.artist for t in hints.tracks if t.artist]
     from collections import Counter
     distinct_artists = set(a for a in track_artists if a)
-    if len(distinct_artists) >= 3:
+    if hints.dir_artist:
+        # The directory name explicitly names an artist -> always use it
+        albumartist = hints.dir_artist
+    elif len(distinct_artists) >= 3:
         albumartist = "Various Artists"
     elif track_artists:
         albumartist = artist or Counter(track_artists).most_common(1)[0][0]
@@ -500,4 +517,23 @@ def _build_track_proposals(
             mbid=None,
         ))
 
+    # Sequential numbering as the last fallback: tracks still without a number
+    # (None) get the next free number on their disc. This keeps "00" and "??"
+    # out of filenames on --rename and never reuses a number already taken.
+    if any(p.track_number is None for p in proposals):
+        taken: Dict[int, set] = {}
+        for p in proposals:
+            if p.track_number is not None:
+                taken.setdefault(p.disc_number or 1, set()).add(p.track_number)
+        disc_counters: Dict[int, int] = {}
+        for p in proposals:
+            if p.track_number is None:
+                disc = p.disc_number or 1
+                used = taken.setdefault(disc, set())
+                candidate = disc_counters.get(disc, 0) + 1
+                while candidate in used:
+                    candidate += 1
+                disc_counters[disc] = candidate
+                p.track_number = candidate
+
     return proposals
diff --git a/models.py b/models.py
index a5b3301..004b5da 100644
--- a/models.py
+++ b/models.py
@@ -26,6 +26,7 @@ class AlbumScan:
     audio_files: List[Path] = field(default_factory=list)
     image_files: List[Path] = field(default_factory=list)
     tracklist_files: List[Path] = field(default_factory=list)
+    playlist_files: List[Path] = field(default_factory=list)  # .m3u / .m3u8 / .pls
     other_files: List[Path] = field(default_factory=list)
 
 
diff --git a/scanner.py b/scanner.py
index 04fa997..de06281 100644
--- a/scanner.py
+++ b/scanner.py
@@ -1,51 +1,91 @@
 from __future__ import annotations
 
+import re
 import sys
 from pathlib import Path
 from typing import List
 
-from models import AlbumScan, AUDIO_EXTENSIONS, IMAGE_EXTENSIONS, TRACKLIST_EXTENSIONS
+from models import AlbumScan, AUDIO_EXTENSIONS, IMAGE_EXTENSIONS, TRACKLIST_EXTENSIONS, PLAYLIST_EXTENSIONS
+
+_DISC_DIR_RE = re.compile(r"(?i)^(?:cd|disc|disk|side)[_ \-]*\d{1,2}$")
 
 
 def _is_hidden(name: str) -> bool:
     return name.startswith(".") or name.startswith("_")
 
 
+def _is_disc_dir(name: str) -> bool:
+    """True for folders such as 'CD1', 'Disc 2', 'Side 1', 'Disk_1' (a 1-2 digit number is required)."""
+    return bool(_DISC_DIR_RE.match(name))
+
+
 def scan_album(album_dir: Path) -> AlbumScan:
+    """
+    Scan an album directory.
+
+    Recursion rule:
+    - Root contains audio files -> do not descend into subfolders
+      (single disc; subfolders such as artwork, scans or stray copies are ignored).
+    - Root contains NO audio files -> descend only into disc subfolders (CD1, Disc 2 ...).
+    """
     result = AlbumScan(album_dir=album_dir)
 
-    for dirpath, dirnames, filenames in album_dir.walk() if hasattr(album_dir, "walk") else _os_walk(album_dir):
-        dirnames[:] = [d for d in dirnames if not _is_hidden(d)]
-        current = Path(dirpath) if isinstance(dirpath, str) else dirpath
+    # Look at the root level only first, to decide whether to recurse
+    root_has_audio = any(
+        (album_dir / name).suffix.lower() in AUDIO_EXTENSIONS
+        for name in _listdir(album_dir)
+        if not _is_hidden(name)
+    )
 
-        for name in filenames:
-            if _is_hidden(name):
-                continue
-            p = current / name
-            ext = p.suffix.lower()
-
-            if ext in AUDIO_EXTENSIONS:
-                result.audio_files.append(p)
-            elif ext in IMAGE_EXTENSIONS:
-                result.image_files.append(p)
-            elif ext in TRACKLIST_EXTENSIONS:
-                result.tracklist_files.append(p)
-            else:
-                result.other_files.append(p)
+    if root_has_audio:
+        # Root level only — no subfolders
+        _scan_dir(album_dir, album_dir, result, recurse=False)
+    else:
+        # No audio at the root -> multi-disc: disc subfolders only
+        _scan_dir(album_dir, album_dir, result, recurse=True)
 
     result.audio_files.sort()
     result.image_files.sort()
     result.tracklist_files.sort()
+    result.playlist_files.sort()
     return result
 
 
-def _os_walk(album_dir: Path):
-    import os
-    return os.walk(
-        album_dir,
-        followlinks=False,
-        onerror=lambda e: print(f"⚠️ Scan-Fehler: {e}", file=sys.stderr),
-    )
+def _listdir(path: Path) -> List[str]:
+    try:
+        return [e.name for e in path.iterdir()]
+    except (PermissionError, OSError) as e:
+        print(f"⚠️ Scan-Fehler: {e}", file=sys.stderr)
+        return []
+
+
+def _scan_dir(current: Path, album_dir: Path, result: AlbumScan, recurse: bool) -> None:
+    try:
+        entries = sorted(current.iterdir())
+    except (PermissionError, OSError) as e:
+        print(f"⚠️ Scan-Fehler {current}: {e}", file=sys.stderr)
+        return
+
+    for entry in entries:
+        name = entry.name
+        if _is_hidden(name):
+            continue
+        if entry.is_dir():
+            if recurse and _is_disc_dir(name):
+                _scan_dir(entry, album_dir, result, recurse=True)
+            # Other subfolders (artwork, stray copies ...) are skipped
+        elif entry.is_file():
+            ext = entry.suffix.lower()
+            if ext in AUDIO_EXTENSIONS:
+                result.audio_files.append(entry)
+            elif ext in IMAGE_EXTENSIONS:
+                result.image_files.append(entry)
+            elif ext in TRACKLIST_EXTENSIONS:
+                result.tracklist_files.append(entry)
+            elif ext in PLAYLIST_EXTENSIONS:
+                result.playlist_files.append(entry)
+            else:
+                result.other_files.append(entry)
 
 
 def collect_album_dirs(root: Path) -> List[Path]: