# Album hint extraction.
#
# Collects metadata hints for one album directory from every source that is
# available: the directory name, audio file names and tags, tracklist text
# files, M3U playlists, an OCR pass over a back-cover image (Ollama vision
# model) and YouTube metadata (yt-dlp).  The public entry point is
# ``extract_hints``; everything else is a private helper.
from __future__ import annotations

import base64
import json
import os
import re
import shutil
import subprocess
import sys
import urllib.request
from pathlib import Path
from typing import Optional, List, Dict, Tuple

from models import AlbumScan, AlbumHints, TrackHints

try:
    from mutagen import File as MutagenFile
    HAS_MUTAGEN = True
except ImportError:
    HAS_MUTAGEN = False

try:
    from bs4 import BeautifulSoup
    HAS_BS4 = True
except ImportError:
    HAS_BS4 = False

_NATSORT_RE = re.compile(r"(\d+)")

# Placeholder values that carry no information and must not be trusted.
_BAD_VALUES = {"unknown", "unknown artist", "unknown album", "untitled", "track", "va", "various"}

# Filename patterns: most specific first
_FILENAME_PATTERNS = [
    re.compile(r"^(?P<disc>\d{1,2})[- _]+(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<artist>.+?)\s*[-–]\s*(?P<title>.+)$"),
    re.compile(r"^(?P<disc>\d{1,2})[- _]+(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<title>.+)$"),
    re.compile(r"^(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<artist>.+?)\s*[-–]\s*(?P<title>.+)$"),
    re.compile(r"^(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<title>.+)$"),
    re.compile(r"^(?P<artist>.+?)\s*[-–]\s*(?P<title>.+)$"),
]

# Directory name patterns
_DIR_PATTERNS = [
    re.compile(r"^(?P<artist>.+?)[_ -]+[-–][_ -]+(?P<album>.+?)(?:[_ -]+(?P<year>\d{4}))?$"),
    re.compile(r"^(?P<artist>.+?)[_ ]+(?P<year>\d{4})[._ -]+(?P<album>.+)$"),
    re.compile(r"^(?P<album>.+?)[_ -]+(?P<year>\d{4})$"),
]

# Tracklist line patterns
_TRACKLIST_PATTERNS = [
    re.compile(r"^(?P<disc>\d{1,2})[- _](?P<track>\d{1,3})\s+(?P<title>.+?)(?:\s+\d+:\d{2})?$"),
    # The separator must be '.', ')' or ':' — a plain space is not enough
    # (prevents false positives such as "2 x CD, Compilation, Remastered").
    re.compile(r"^(?P<track>\d{1,3})[.):]\s*(?P<title>.+?)(?:\s+\d+:\d{2})?$"),
    re.compile(r"^(?P<track>[A-Z]\d{1,2})[.):]\s*(?P<title>.+?)(?:\s+\d+:\d{2})?$"),
]

_DISC_SECTION_RE = re.compile(r"(?i)(?:cd|disc|disk|side)[_ \-]*(\d{1,2})")

# BOM (U+FEFF), zero-width space (U+200B) and soft hyphen (U+00AD).  Written as
# escapes so the characters stay visible and survive editors/copy-paste.
_INVISIBLE_CHARS_RE = re.compile(r"[\ufeff\u200b\u00ad]")

# Encodings tried for text files, in order.  "utf-8-sig" also accepts plain
# UTF-8 and drops a leading BOM; "latin-1" accepts any byte sequence and
# therefore has to come last.
_TEXT_ENCODINGS = ("utf-8-sig", "cp1252", "latin-1")


def _clean(s: Optional[str]) -> str:
    """Return *s* as a tidy display string.

    Removes invisible characters, turns underscores into spaces, collapses
    whitespace runs and strips leading/trailing separators (space, '-', '.',
    '_').  ``None`` and the empty string yield ``""``.
    """
    if not s:
        return ""
    s = _INVISIBLE_CHARS_RE.sub("", s)
    return re.sub(r"\s+", " ", s.replace("_", " ")).strip(" -._")


def _norm_for_match(s: str) -> str:
    """Return a punctuation-agnostic key for fuzzy title comparison.

    Only ASCII letters and digits of the case-folded string are kept; every
    other character (punctuation, whitespace, non-ASCII letters) is dropped.
    """
    return re.sub(r"[^a-z0-9]", "", s.casefold())


# Classical work-catalogue numbers: BWV 565, Op. 27, K. 331, HWV 56, ...
_CATALOG_RE = re.compile(
    r"\b(bwv|hwv|op|k|kv|d|sz|wq|bbwv|rv|twv|hob)\W*(\d+[a-z]?(?:[\/\.]\d+)?)",
    re.IGNORECASE,
)


def _catalog_key(s: str) -> Optional[str]:
    """Extract a normalised catalogue number such as 'bwv565' or 'op27'.

    Returns ``None`` when *s* contains no catalogue number.  The key is fully
    lower-cased so that '565A' and '565a' compare equal.
    """
    m = _CATALOG_RE.search(s)
    if m:
        return (m.group(1) + re.sub(r"\W", "", m.group(2))).lower()
    return None


def _is_good(v: Optional[str]) -> bool:
    """Return True when *v* is non-empty and not a known placeholder value."""
    if not v:
        return False
    return _clean(v).casefold() not in _BAD_VALUES


def _leading_int(value: Optional[str]) -> Optional[int]:
    """Parse the integer in front of an optional '/total' suffix ('3/12' -> 3).

    Returns ``None`` for empty or non-numeric input (e.g. vinyl sides 'A1').
    """
    if not value:
        return None
    try:
        return int(str(value).split("/")[0])
    except ValueError:
        return None


def _first_positive_int(*candidates: Optional[str]) -> Optional[int]:
    """Return the first candidate that parses to an integer > 0, else None."""
    for candidate in candidates:
        number = _leading_int(candidate)
        if number is not None and number > 0:
            return number
    return None


def _parse_dirname(name: str) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Split an album directory name into ``(artist, album, year)``.

    The first pattern in ``_DIR_PATTERNS`` that yields a usable artist or
    album wins.  If none does, the whole cleaned name is returned as album.
    """
    name_clean = _clean(name)
    for pat in _DIR_PATTERNS:
        m = pat.match(name_clean)
        if m:
            d = m.groupdict()
            artist = _clean(d.get("artist")) or None
            album = _clean(d.get("album")) or None
            year = d.get("year")
            if _is_good(artist) or _is_good(album):
                return artist, album, year
    # No pattern matched — treat whole name as album
    return None, _clean(name_clean), None


def _parse_filename(stem: str) -> Dict[str, str]:
    """Parse a file stem into its named parts (disc, track, artist, title).

    Only groups that matched with a non-empty value are returned.  When no
    pattern matches, the cleaned stem is returned as ``{"title": ...}``.
    """
    stem_clean = _clean(stem)
    for pat in _FILENAME_PATTERNS:
        m = pat.match(stem_clean)
        if m:
            return {k: _clean(v) for k, v in m.groupdict().items() if v}
    return {"title": stem_clean}


def _read_tags(path: Path) -> Tuple[Dict[str, str], Optional[float]]:
    """Read the common tags and the duration of an audio file via mutagen.

    Returns ``({}, None)`` when mutagen is not installed, the file type is
    not recognised, or reading fails (the error is reported on stderr).
    """
    if not HAS_MUTAGEN:
        return {}, None
    try:
        audio = MutagenFile(str(path), easy=True)
        # Compare against None explicitly: a recognised file without any tags
        # is falsy, but its duration is still worth returning.
        if audio is None:
            return {}, None
        tags: Dict[str, str] = {}
        for k in ("title", "artist", "album", "albumartist", "tracknumber",
                  "discnumber", "date", "year", "genre", "label", "organization"):
            v = audio.get(k)
            if v:
                tags[k] = str(v[0]).strip()
        if "year" in tags and "date" not in tags:
            tags["date"] = tags["year"]
        duration = None
        if hasattr(audio, "info") and audio.info and hasattr(audio.info, "length"):
            duration = audio.info.length
        return tags, duration
    except Exception as e:
        print(f" ⚠️ Tag-Lesefehler {path.name}: {e}", file=sys.stderr)
        return {}, None


_STANDALONE_NUM_RE = re.compile(r"^\d{1,3}$")
_DURATION_ONLY_RE = re.compile(r"^\d{1,2}:\d{2}$")


def _normalize_vertical_tracklist(text: str) -> Optional[str]:
    """Convert a 'vertical' tracklist into one entry per line.

    A vertical tracklist puts number, title and (optionally) duration on
    separate lines:

        1
        Katka dovádí
        3:22
        2
        Záludná

    which becomes '1. Katka dovádí 3:22\\n2. Záludná ...'.  Returns ``None``
    when the text does not look like this format (fewer than three
    standalone numbers or fewer than three resulting entries).
    """
    non_empty = [ln.strip() for ln in text.splitlines() if ln.strip()]

    # Heuristic: at least three standalone number lines.
    num_lines = sum(1 for ln in non_empty if _STANDALONE_NUM_RE.match(ln))
    if num_lines < 3:
        return None

    result = []
    i = 0
    while i < len(non_empty):
        line = non_empty[i]
        if _STANDALONE_NUM_RE.match(line) and i + 1 < len(non_empty):
            title_candidate = non_empty[i + 1]
            # The following line must itself be neither a number nor a duration.
            if (not _STANDALONE_NUM_RE.match(title_candidate)
                    and not _DURATION_ONLY_RE.match(title_candidate)):
                duration = ""
                skip = 2
                if i + 2 < len(non_empty) and _DURATION_ONLY_RE.match(non_empty[i + 2]):
                    duration = non_empty[i + 2]
                    skip = 3
                entry = f"{line}. {title_candidate}"
                if duration:
                    entry += f" {duration}"
                result.append(entry)
                i += skip
                continue
        i += 1

    return "\n".join(result) if len(result) >= 3 else None


def _parse_tracklist(text: str) -> List[Dict[str, str]]:
    """Parse tracklist text into ``[{"title", "track", "disc"}, ...]``.

    Short lines starting with a disc header ('CD 2', 'Disc 1', 'Side 2')
    switch the current disc for the entries that follow.  Numeric track
    numbers lose their leading zeros; vinyl-style numbers ('A1') are kept
    verbatim.  Lines that match no pattern are ignored.
    """
    # Normalise the vertical format before the regular parsing runs.
    normalized = _normalize_vertical_tracklist(text)
    if normalized:
        text = normalized

    tracks: List[Dict[str, str]] = []
    current_disc = 1
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        disc_m = _DISC_SECTION_RE.match(line)
        if disc_m and len(line) < 30:
            current_disc = int(disc_m.group(1))
            continue
        for pat in _TRACKLIST_PATTERNS:
            m = pat.match(line)
            if m:
                d = m.groupdict()
                entry: Dict[str, str] = {"title": _clean(d.get("title", ""))}
                raw_track = d.get("track", "")
                if raw_track and raw_track.isdigit():
                    entry["track"] = raw_track.lstrip("0") or "0"
                elif raw_track:
                    entry["track"] = raw_track
                entry["disc"] = d.get("disc") or str(current_disc)
                if entry.get("title"):
                    tracks.append(entry)
                break
    return tracks


def _parse_m3u(text: str) -> List[Dict[str, str]]:
    """Parse M3U/M3U8 text into an ordered list ``[{filename, title, position}]``.

    The order of the entries is the desired track order; ``position`` is the
    1-based index as a string.  ``title`` comes from a preceding ``#EXTINF``
    line and is ``""`` when there is none.
    """
    tracks: List[Dict[str, str]] = []
    pending_title: Optional[str] = None
    position = 0
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        if line.upper().startswith("#EXTINF:"):
            parts = line.split(",", 1)
            pending_title = parts[1].strip() if len(parts) > 1 else None
        elif not line.startswith("#"):
            # Playlists written on Windows use backslashes as path separators.
            filename = Path(line.replace("\\", "/")).name
            if not filename:
                continue
            position += 1
            tracks.append({
                "position": str(position),
                "filename": filename,
                "title": pending_title or "",
            })
            pending_title = None
    return tracks


def _read_tracklist_file(path: Path) -> Optional[str]:
    """Return the text content of a tracklist file, or None on failure.

    HTML files are reduced to their text (BeautifulSoup when available,
    otherwise a crude tag-stripping regex).  Encodings are tried in the order
    given by ``_TEXT_ENCODINGS``.
    """
    try:
        if path.suffix.lower() in (".htm", ".html"):
            raw = path.read_bytes()
            encoding = "utf-8"
            for enc in _TEXT_ENCODINGS:
                try:
                    raw.decode(enc)
                    encoding = enc
                    break
                except UnicodeDecodeError:
                    continue
            text = raw.decode(encoding, errors="replace")
            if HAS_BS4:
                soup = BeautifulSoup(text, "html.parser")
                return soup.get_text(separator="\n")
            # Fallback: strip HTML tags
            return re.sub(r"<[^>]+>", " ", text)
        else:
            for enc in _TEXT_ENCODINGS:
                try:
                    return path.read_text(encoding=enc)
                except UnicodeDecodeError:
                    continue
    except Exception as e:
        print(f" ⚠️ Tracklist-Lesefehler {path.name}: {e}", file=sys.stderr)
    return None


_OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")

# Models in priority order; can be overridden via OLLAMA_OCR_MODEL
_OCR_MODELS = [m.strip() for m in os.getenv(
    "OLLAMA_OCR_MODEL",
    "qwen3-vl:latest,minicpm-v:latest,deepseek-ocr:latest"
).split(",") if m.strip()]

_OCR_PROMPT = (
    "This image shows a CD album back cover or booklet page. "
    "Your task: extract the complete tracklist as plain text.\n"
    "Rules:\n"
    "- Output track number and title per line, e.g. '1. Title' or '1-1 Title'\n"
    "- If multiple discs/CDs: include a header like 'CD 1' or 'Disc 1' before each group\n"
    "- Include durations if visible (e.g. '1. Title 4:32')\n"
    "- Do NOT include label info, barcodes, or other non-tracklist text\n"
    "- If no tracklist is visible, reply with: NO_TRACKLIST"
)


def _ocr_back_cover(image_files: List[Path]) -> Optional[str]:
    """OCR a back-cover or booklet image via an Ollama vision model.

    Only the first candidate image is processed.  The models in
    ``_OCR_MODELS`` are tried in order until one returns text.  Returns the
    recognised text, or ``None`` when there is no candidate image, the image
    cannot be read, a model answers NO_TRACKLIST, or every model fails.
    """
    # Only images whose name looks like a back cover.
    candidates = [
        p for p in image_files
        if any(kw in p.name.lower() for kw in ("back", "inlay", "booklet", "inside", "rear"))
    ]
    # Fallback: every image except the front cover.
    if not candidates:
        candidates = [
            p for p in image_files
            if not any(kw in p.name.lower() for kw in ("front", "folder", "cover"))
        ]
    if not candidates:
        return None

    image_path = candidates[0]
    try:
        img_b64 = base64.b64encode(image_path.read_bytes()).decode()
    except Exception as e:
        print(f" ⚠️ OCR-Bild lesen {image_path.name}: {e}", file=sys.stderr)
        return None

    for model in _OCR_MODELS:
        payload = json.dumps({
            "model": model,
            "messages": [{
                "role": "user",
                "content": _OCR_PROMPT,
                "images": [img_b64],
            }],
            "stream": False,
            "options": {"temperature": 0.0},
        }).encode()
        try:
            req = urllib.request.Request(
                f"{_OLLAMA_HOST}/api/chat",
                data=payload,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            with urllib.request.urlopen(req, timeout=180) as resp:
                data = json.loads(resp.read())
            text = data.get("message", {}).get("content", "").strip()
            if text and "NO_TRACKLIST" not in text:
                print(f" 📷 OCR {image_path.name} via {model}: {len(text)} Zeichen extrahiert", file=sys.stderr)
                return text
            elif "NO_TRACKLIST" in text:
                print(f" 📷 OCR {image_path.name}: kein Tracklist-Text erkannt", file=sys.stderr)
                return None
            # Empty answer: fall through and try the next model.
        except Exception as e:
            # Best effort: a failing model must not abort the whole scan.
            print(f" ⚠️ OCR-Fehler ({model}) {image_path.name}: {e}", file=sys.stderr)
            continue
    return None


def _check_cover_images(paths: List[Path]) -> List[Path]:
    """Return *paths* reordered so that likely front covers come first.

    Each image whose name contains 'front', 'folder', 'cover' or 'album' is
    inserted at the head of the list (so the last such image ends up first);
    all other images keep their relative order behind them.
    """
    good: List[Path] = []
    for p in paths:
        name_lower = p.name.lower()
        # Prefer front covers
        if any(kw in name_lower for kw in ("front", "folder", "cover", "album")):
            good.insert(0, p)
        else:
            good.append(p)
    return good


# YouTube video ID: exactly 11 characters from [A-Za-z0-9_-], typically the last '_' token
_YT_ID_CHARS = re.compile(r"^[A-Za-z0-9_-]{11}$")


def _extract_youtube_id(path: Path) -> Optional[str]:
    """Detect a YouTube video ID as the last '_'-separated token of the file stem.

    Plausibility check: the token must be 11 characters long and contain at
    least one upper-case letter AND at least one lower-case letter or digit.
    Returns ``None`` when the last token does not qualify.
    """
    candidate = path.stem.split("_")[-1]  # last token after an underscore
    if (len(candidate) == 11
            and _YT_ID_CHARS.match(candidate)
            and re.search(r"[A-Z]", candidate)
            and re.search(r"[0-9a-z]", candidate)):
        return candidate
    return None


def _fetch_youtube_metadata(video_id: str) -> Optional[Dict]:
    """Fetch YouTube metadata via yt-dlp (no API key required).

    Returns the parsed JSON document printed by ``yt-dlp --dump-json``, or
    ``None`` when yt-dlp is not installed, exits with an error, prints
    nothing, or the call fails/times out (reported on stderr).
    """
    ytdlp = shutil.which("yt-dlp")
    if not ytdlp:
        return None
    url = f"https://www.youtube.com/watch?v={video_id}"
    try:
        result = subprocess.run(
            [ytdlp, "--dump-json", "--no-download", "--no-playlist", url],
            capture_output=True,
            text=True,
            timeout=30,
        )
        if result.returncode != 0 or not result.stdout.strip():
            return None
        return json.loads(result.stdout)
    except Exception as e:
        print(f" ⚠️ YouTube-Fehler ({video_id}): {e}", file=sys.stderr)
        return None


def _chapters_to_tracklist_text(chapters: List[Dict]) -> str:
    """Convert yt-dlp chapters into text that ``_parse_tracklist`` understands.

    Each titled chapter becomes a line '<index>. <title> <MM>:<SS>', where the
    time is the chapter's start time.  Chapters without a title, or whose
    title starts with '<Untitled', are skipped (their index is not reused).
    """
    lines = []
    for i, ch in enumerate(chapters, 1):
        # Use `or` instead of a .get() default: the key may be present with a None value.
        title = (ch.get("title") or "").strip()
        if not title or title.startswith("<Untitled"):
            continue
        secs = int(ch.get("start_time") or 0)
        mm, ss = divmod(secs, 60)
        lines.append(f"{i}. {title} {mm}:{ss:02d}")
    return "\n".join(lines)


def extract_hints(scan: AlbumScan, use_ocr: bool = True) -> AlbumHints:
    """Collect all available metadata hints for one scanned album.

    Sources, in the order they are consulted: directory name, cover images,
    tracklist text files, OCR of a back-cover image (only when *use_ocr* is
    true and no tracklist file yielded text), YouTube metadata for files
    whose name ends in a video ID, M3U playlists, audio tags and file names.

    Args:
        scan: The scanned album (directory plus the files found in it).
        use_ocr: Whether the OCR fallback may be used.

    Returns:
        An ``AlbumHints`` with album-level fields filled in and one
        ``TrackHints`` per audio file (in sorted path order).
    """
    hints = AlbumHints(album_dir=scan.album_dir)

    # Directory name
    hints.dir_artist, hints.dir_album, hints.dir_year = _parse_dirname(scan.album_dir.name)

    # Cover images
    hints.cover_images = _check_cover_images(scan.image_files)

    # Tracklist files
    texts: List[str] = []
    for tf in scan.tracklist_files:
        txt = _read_tracklist_file(tf)
        if txt:
            texts.append(txt)
    hints.tracklist_text = "\n\n".join(texts) if texts else None

    # OCR fallback: scan the back cover when no tracklist text file is present.
    if use_ocr and not hints.tracklist_text and scan.image_files:
        ocr_text = _ocr_back_cover(scan.image_files)
        if ocr_text:
            hints.tracklist_text = ocr_text

    # YouTube lookup: extract IDs from file names, fetch metadata via yt-dlp.
    yt_meta_by_id: Dict[str, Optional[Dict]] = {}
    yt_ids_by_stem: Dict[str, str] = {}  # stem (normalised) -> youtube_id
    for audio_path in scan.audio_files:
        yt_id = _extract_youtube_id(audio_path)
        if yt_id:
            stem_key = _clean(audio_path.stem).casefold()
            yt_ids_by_stem[stem_key] = yt_id
            yt_meta_by_id.setdefault(yt_id, None)

    if yt_meta_by_id:
        # At most five IDs are looked up to keep the number of yt-dlp calls bounded.
        lookup_ids = list(yt_meta_by_id)[:5]
        print(f" 📺 YouTube-IDs gefunden: {', '.join(lookup_ids)}", file=sys.stderr)
        for yt_id in lookup_ids:
            yt_meta_by_id[yt_id] = _fetch_youtube_metadata(yt_id)

        # Use chapters as tracklist when there is none yet.
        if not hints.tracklist_text:
            for yt_id, meta in yt_meta_by_id.items():
                if meta and meta.get("chapters"):
                    chapter_text = _chapters_to_tracklist_text(meta["chapters"])
                    if chapter_text:
                        hints.tracklist_text = chapter_text
                        print(f" 📺 YouTube-Chapters als Tracklist: {len(meta['chapters'])} Tracks", file=sys.stderr)
                        break

        # Album-level hints (first successful lookup).
        for yt_id, meta in yt_meta_by_id.items():
            if meta:
                hints.yt_title = (meta.get("title") or "").strip() or None
                hints.yt_uploader = (
                    meta.get("uploader") or meta.get("channel") or ""
                ).strip() or None
                break

    parsed_tracklist = _parse_tracklist(hints.tracklist_text) if hints.tracklist_text else []

    # M3U/playlist order: filename (stem, normalised) -> track number.
    # The first occurrence of a stem wins.
    m3u_order: Dict[str, int] = {}
    m3u_titles: Dict[str, str] = {}
    for pf in scan.playlist_files:
        try:
            # "utf-8-sig" drops a leading BOM; otherwise the '#EXTM3U' header
            # would be mistaken for a file entry and shift every position.
            text = pf.read_text(encoding="utf-8-sig", errors="replace")
            for entry in _parse_m3u(text):
                stem = _clean(Path(entry["filename"]).stem).casefold()
                pos = int(entry["position"])
                if stem and stem not in m3u_order:
                    m3u_order[stem] = pos
                    if entry.get("title"):
                        m3u_titles[stem] = entry["title"]
        except Exception as e:
            print(f" ⚠️ Playlist-Lesefehler {pf.name}: {e}", file=sys.stderr)

    # Tracklist lookups: exact title, fuzzy title, catalogue number (BWV, Op., K., ...)
    tl_by_title: Dict[str, Dict[str, str]] = {}
    tl_by_title_norm: Dict[str, Dict[str, str]] = {}
    tl_by_catalog: Dict[str, Dict[str, str]] = {}
    for entry in parsed_tracklist:
        raw_title = entry.get("title", "")
        exact_key = _clean(raw_title).casefold()
        if exact_key:
            tl_by_title[exact_key] = entry
        norm_key = _norm_for_match(raw_title)
        if norm_key:
            tl_by_title_norm[norm_key] = entry
        cat_key = _catalog_key(raw_title)
        if cat_key:
            tl_by_catalog[cat_key] = entry

    # Build TrackHints per audio file
    for audio_path in sorted(scan.audio_files):
        stem_key = _clean(audio_path.stem).casefold()
        yt_id_for_file = yt_ids_by_stem.get(stem_key)

        # Stem without the YouTube ID for file name parsing.
        parse_stem = audio_path.stem
        if yt_id_for_file:
            tokens = parse_stem.rsplit("_", 1)
            if len(tokens) == 2 and tokens[1] == yt_id_for_file:
                parse_stem = tokens[0]

        tags, duration = _read_tags(audio_path)
        fn_hints = _parse_filename(parse_stem)

        # Track number: tag > filename.  0 counts as "no number"; an unusable
        # tag value does not block the filename fallback.
        track_num: Optional[int] = _first_positive_int(
            tags.get("tracknumber"), fn_hints.get("track")
        )

        # Disc number: tag > filename > path segment
        disc_num: Optional[int] = _first_positive_int(
            tags.get("discnumber"), fn_hints.get("disc")
        )
        if not disc_num:
            try:
                parent_parts = audio_path.relative_to(scan.album_dir).parts[:-1]
            except ValueError:
                # File is not located below the album directory.
                parent_parts = ()
            for part in parent_parts:
                dm = _DISC_SECTION_RE.search(part)
                if dm:
                    disc_num = int(dm.group(1))
                    break

        title_raw = tags.get("title") or fn_hints.get("title")
        title = title_raw if _is_good(title_raw) else fn_hints.get("title")
        artist_raw = tags.get("artist") or fn_hints.get("artist")
        artist = artist_raw if _is_good(artist_raw) else fn_hints.get("artist")

        # Tracklist matching: number -> exact title -> fuzzy title -> catalogue number.
        # On a match, disc+track are taken from the tracklist (it is more
        # authoritative than the M3U order for albums with explicit disc numbering).
        if parsed_tracklist:
            matched_tl: Optional[Dict[str, str]] = None

            # 1. Exact by track number + disc (disc_num=None -> single CD, assume 1).
            if track_num:
                assumed_disc = disc_num if disc_num else 1
                for tl_entry in parsed_tracklist:
                    # _leading_int yields None for vinyl-style tracks ("A1"),
                    # which then simply never match a numeric track.
                    if (_leading_int(tl_entry.get("track")) == track_num
                            and _leading_int(tl_entry.get("disc", "1")) == assumed_disc):
                        matched_tl = tl_entry
                        break

            # 2. Exact title comparison
            if matched_tl is None and title:
                matched_tl = tl_by_title.get(_clean(title).casefold())

            # 3. Fuzzy title comparison (ignores commas, apostrophes, case)
            if matched_tl is None and title:
                matched_tl = tl_by_title_norm.get(_norm_for_match(title))

            # 4. Catalogue number (BWV, Op., K. ...) — helps with abbreviated file names
            if matched_tl is None and title:
                cat = _catalog_key(title)
                if cat:
                    matched_tl = tl_by_catalog.get(cat)

            if matched_tl:
                # Take the title from the tracklist when it is usable.
                if _is_good(matched_tl.get("title")):
                    title = matched_tl["title"]
                # disc+track from the tracklist are more authoritative than the M3U order.
                tl_track_n = _leading_int(matched_tl.get("track"))
                tl_disc_n = _leading_int(matched_tl.get("disc", "1"))
                if tl_track_n and tl_disc_n is not None:
                    track_num = tl_track_n
                    disc_num = tl_disc_n

        # M3U order only as last fallback (when the tracklist yields no match).
        if track_num is None and stem_key in m3u_order:
            track_num = m3u_order[stem_key]

        # M3U title as fallback (contains "Composer - Title" — only used when
        # there is no better title).
        if not _is_good(title) and stem_key in m3u_titles:
            title = m3u_titles[stem_key]

        # YouTube title as the very last fallback (single file = the whole video).
        if not _is_good(title) and yt_id_for_file:
            meta = yt_meta_by_id.get(yt_id_for_file)
            if meta:
                yt_video_title = (meta.get("title") or "").strip()
                if yt_video_title:
                    title = yt_video_title

        hints.tracks.append(TrackHints(
            path=audio_path,
            track_number=track_num,
            disc_number=disc_num,
            title=_clean(title) if title else None,
            artist=_clean(artist) if artist else None,
            duration=duration,
            existing_tags=tags,
        ))

    return hints