diff --git a/cover_handler.py b/cover_handler.py deleted file mode 100644 index 221c772..0000000 --- a/cover_handler.py +++ /dev/null @@ -1,171 +0,0 @@ -from __future__ import annotations - -import sys -import tempfile -import time -from pathlib import Path -from typing import Optional, List - -try: - from PIL import Image - HAS_PIL = True -except ImportError: - HAS_PIL = False - -try: - import requests - HAS_REQUESTS = True -except ImportError: - HAS_REQUESTS = False - -try: - import musicbrainzngs as mb - HAS_MB = True -except ImportError: - HAS_MB = False - -try: - from mutagen.id3 import ID3, APIC, error as ID3Error - from mutagen.flac import FLAC, Picture - from mutagen.mp4 import MP4, MP4Cover - HAS_MUTAGEN = True -except ImportError: - HAS_MUTAGEN = False - -_MIN_COVER_SIZE = 200 # pixels - - -def _image_ok(path: Path) -> bool: - if not HAS_PIL: - return path.stat().st_size > 5000 - try: - with Image.open(path) as img: - w, h = img.size - return w >= _MIN_COVER_SIZE and h >= _MIN_COVER_SIZE - except Exception: - return False - - -def find_local_cover(image_files: List[Path]) -> Optional[Path]: - priority = ("front", "folder", "cover", "album") - # Sort by priority keyword, then size descending - def key(p: Path): - name = p.name.lower() - score = next((i for i, kw in enumerate(priority) if kw in name), len(priority)) - size = p.stat().st_size if p.exists() else 0 - return (score, -size) - - for p in sorted(image_files, key=key): - if _image_ok(p): - return p - return None - - -def _mb_cover_url(release_mbid: str) -> Optional[str]: - url = f"https://coverartarchive.org/release/{release_mbid}/front" - if not HAS_REQUESTS: - return None - try: - r = requests.head(url, timeout=5, allow_redirects=True) - if r.status_code == 200: - return url - except Exception: - pass - return None - - -def download_cover(release_mbid: Optional[str], dest_dir: Path) -> Optional[Path]: - if not release_mbid or not HAS_REQUESTS: - return None - url = _mb_cover_url(release_mbid) - 
if not url: - return None - try: - r = requests.get(url, timeout=15) - if r.status_code == 200: - ext = ".jpg" - ct = r.headers.get("content-type", "") - if "png" in ct: - ext = ".png" - dest = dest_dir / f"_cover_download{ext}" - dest.write_bytes(r.content) - if _image_ok(dest): - return dest - dest.unlink(missing_ok=True) - except Exception as e: - print(f" ⚠️ Cover-Download-Fehler: {e}", file=sys.stderr) - return None - - -def embed_cover(audio_path: Path, cover_path: Path) -> bool: - if not HAS_MUTAGEN: - return False - try: - img_data = cover_path.read_bytes() - mime = "image/jpeg" if cover_path.suffix.lower() in (".jpg", ".jpeg") else "image/png" - ext = audio_path.suffix.lower() - - if ext == ".mp3": - try: - tags = ID3(str(audio_path)) - except ID3Error: - tags = ID3() - tags.delall("APIC") - tags.add(APIC(encoding=3, mime=mime, type=3, desc="Cover", data=img_data)) - tags.save(str(audio_path), v2_version=4) - return True - - elif ext == ".flac": - audio = FLAC(str(audio_path)) - audio.clear_pictures() - pic = Picture() - pic.type = 3 - pic.mime = mime - pic.desc = "Cover" - pic.data = img_data - audio.add_picture(pic) - audio.save() - return True - - elif ext == ".m4a": - audio = MP4(str(audio_path)) - fmt = MP4Cover.FORMAT_JPEG if mime == "image/jpeg" else MP4Cover.FORMAT_PNG - audio.tags["covr"] = [MP4Cover(img_data, imageformat=fmt)] - audio.save() - return True - - else: - # Generic mutagen fallback - from mutagen import File as MutagenFile - audio = MutagenFile(str(audio_path), easy=False) - if audio is not None: - if audio.tags is None: - audio.add_tags() - if hasattr(audio.tags, "add"): - audio.tags.add( - APIC(encoding=3, mime=mime, type=3, desc="Cover", data=img_data) - ) - audio.save() - return True - - except Exception as e: - print(f" ⚠️ Cover-Einbettungsfehler {audio_path.name}: {e}", file=sys.stderr) - return False - - -def resolve_cover( - image_files: List[Path], - release_mbid: Optional[str], - album_dir: Path, -) -> tuple[Optional[Path], 
Optional[str]]: - """Returns (cover_path, source_label).""" - local = find_local_cover(image_files) - if local: - return local, "local" - - if release_mbid: - downloaded = download_cover(release_mbid, album_dir) - if downloaded: - return downloaded, "musicbrainz" - - return None, None diff --git a/executor.py b/executor.py deleted file mode 100644 index 42c19cb..0000000 --- a/executor.py +++ /dev/null @@ -1,368 +0,0 @@ -from __future__ import annotations - -import csv -import re -import shutil -import subprocess -import sys -from pathlib import Path -from typing import Optional, List, Dict, Any - -from models import AlbumProposal, TrackProposal - -try: - from mutagen import File as MutagenFile - from mutagen.easyid3 import EasyID3 - from mutagen.flac import FLAC - from mutagen.mp4 import MP4, MP4Tags - HAS_MUTAGEN = True -except ImportError: - HAS_MUTAGEN = False - -from cover_handler import embed_cover - -_SAFE_RE = re.compile(r'[<>:"/\\|?*\x00-\x1f]') -_CLASSICAL_GENRES = re.compile( - r"(?i)class|baroque|romantic|renaissance|opera|symphony|chamber|concerto|sonata|oratorio" -) -REPORT_FIELDS = [ - "status", "album_dir", "track_path", - "old_title", "new_title", - "old_artist", "new_artist", - "album", "albumartist", "date", "genre", "label", - "track_number", "disc_number", - "cover_embedded", "renamed_to", - "confidence", "sources", -] - - -def _safe_name(s: str) -> str: - """Filesystem-safe name: illegal chars → '_', spaces → '_'.""" - s = _SAFE_RE.sub("_", s) - return re.sub(r"\s+", "_", s).strip("._-") - - -def _is_classical(albumartist: str, track_artist: str, genre: str) -> bool: - """ - Classical schema applies when performer (albumartist) ≠ composer (track_artist), - which covers both 'real' classical music and jazz-on-classical-themes albums. - Genre keyword matching is used as additional signal but not required. 
- """ - aa = (albumartist or "").casefold().strip() - ta = (track_artist or "").casefold().strip() - if not aa or aa in ("various artists", "unknown artist", "unknown"): - return False - if aa == ta: - return False - return True # performer ≠ composer → classical naming - - -def _proposed_filename( - proposal: TrackProposal, - ext: str, - albumartist: str = "", - genre: str = "", -) -> str: - """ - Pop/Default: TT_-_Artist_-_Titel.ext - Klassik: TT_-_Performer_-_Komponist_-_Titel[-_Orchester_Dirigent].ext - - Separator zwischen Teilen: _-_ - Leerzeichen innerhalb von Namen: _ - Fehlende Teile werden weggelassen. - """ - tn = f"{proposal.track_number:02d}" if proposal.track_number else "00" - # Wenn disc_number gesetzt (auch disc=1): immer "D-TT" — konsistent über alle CDs. - # disc=None (Einzel-CD ohne Tag): nur "TT". - disc_prefix = f"{proposal.disc_number}-" if proposal.disc_number else "" - prefix = f"{disc_prefix}{tn}" - - track_artist = _safe_name(proposal.artist or "Unknown") - aa = _safe_name(albumartist) - title = _safe_name(proposal.title or "Unknown") - - if _is_classical(aa, track_artist, genre): - # Klassik-Schema: Performer _-_ Komponist _-_ Werk [_-_ Orchester,Dirigent] - parts = [prefix, aa, track_artist, title] - # Orchester und Dirigent anhängen wenn vorhanden - extra = "_".join(filter(None, [ - _safe_name(proposal.orchestra or ""), - _safe_name(proposal.conductor or ""), - ])) - if extra: - parts.append(extra) - return "_-_".join(parts) + ext - else: - # Pop/Default-Schema: Tracknummer _-_ Artist _-_ Titel - return f"{prefix}_-_{track_artist}_-_{title}{ext}" - - -def backup_file(path: Path, backup_dir: Path) -> bool: - try: - backup_dir.mkdir(parents=True, exist_ok=True) - rel = path.parent.name + "__" + path.name - dest = backup_dir / rel - if not dest.exists(): - shutil.copy2(path, dest) - return True - except Exception as e: - print(f" ⚠️ Backup-Fehler {path.name}: {e}", file=sys.stderr) - return False - - -def write_tags(path: Path, proposal: 
TrackProposal, album_proposal: AlbumProposal) -> bool: - if not HAS_MUTAGEN: - return False - ext = path.suffix.lower() - tags_to_write = { - "title": proposal.title or "", - "artist": proposal.artist or "", - "album": album_proposal.album or "", - "albumartist": album_proposal.albumartist or "", - } - if proposal.track_number: - total = len(album_proposal.tracks) - tags_to_write["tracknumber"] = f"{proposal.track_number}/{total}" - if proposal.disc_number: - tags_to_write["discnumber"] = str(proposal.disc_number) - if album_proposal.date: - # Strip everything except valid ID3 timestamp characters to prevent ID3TimeStamp errors - date_clean = re.sub(r"[^\d\-T:+Z]", "", str(album_proposal.date)).strip() - if date_clean: - tags_to_write["date"] = date_clean - if album_proposal.genre: - tags_to_write["genre"] = album_proposal.genre - if album_proposal.label: - tags_to_write["organization"] = album_proposal.label - - try: - if ext == ".mp3": - try: - audio = EasyID3(str(path)) - except Exception: - # File has no ID3 header — add one without wiping audio data - from mutagen.id3 import ID3NoHeaderError - try: - from mutagen.mp3 import MP3 - full = MP3(str(path)) - full.tags = None - full.add_tags() - full.save(str(path), v2_version=4) - except Exception: - pass - audio = EasyID3(str(path)) - for k, v in tags_to_write.items(): - try: - audio[k] = [v] - except Exception as tag_err: - print(f" ⚠️ Tag-Feld '{k}' übersprungen ({path.name}): {tag_err}", file=sys.stderr) - audio.save(v2_version=4) - return True - - elif ext == ".flac": - audio = FLAC(str(path)) - for k, v in tags_to_write.items(): - audio[k] = [v] - audio.save() - return True - - elif ext == ".m4a": - audio = MP4(str(path)) - mapping = { - "title": "\xa9nam", "artist": "\xa9ART", - "album": "\xa9alb", "albumartist": "aART", - "tracknumber": "trkn", "date": "\xa9day", - "genre": "\xa9gen", - } - for k, v in tags_to_write.items(): - tag_key = mapping.get(k) - if tag_key: - if tag_key == "trkn": - try: - num, 
total = v.split("/") if "/" in v else (v, "0") - audio[tag_key] = [(int(num), int(total))] - except Exception: - pass - else: - audio[tag_key] = [v] - audio.save() - return True - - else: - audio = MutagenFile(str(path), easy=True) - if audio is not None: - if audio.tags is None: - audio.add_tags() - for k, v in tags_to_write.items(): - try: - audio[k] = [v] - except Exception: - pass - audio.save() - return True - - except Exception as e: - print(f" ⚠️ Tag-Schreibfehler {path.name}: {e}", file=sys.stderr) - return False - - -def _update_m3u(m3u_path: Path, tracks: List[tuple]) -> bool: - """ - Schreibt M3U neu mit den umbenannten Dateien in Track-Reihenfolge. - tracks: [(TrackProposal, actual_path_after_rename), ...] - """ - try: - lines = ["#EXTM3U"] - for tp, track_path in tracks: - duration = -1 - if HAS_MUTAGEN: - try: - audio = MutagenFile(str(track_path)) - if audio and hasattr(audio, "info") and audio.info: - duration = int(audio.info.length) - except Exception: - pass - label = f"{tp.artist} - {tp.title}" if tp.artist else (tp.title or track_path.stem) - lines.append(f"#EXTINF:{duration},{label}") - lines.append(track_path.name) - m3u_path.write_text("\n".join(lines) + "\n", encoding="utf-8") - print(f" 📋 Playlist aktualisiert: {m3u_path.name}") - return True - except Exception as e: - print(f" ⚠️ M3U-Fehler {m3u_path.name}: {e}", file=sys.stderr) - return False - - -def execute_album( - proposal: AlbumProposal, - backup_dir: Optional[Path], - do_rename: bool, - embed_cover_art: bool, - dry_run: bool, - report_data: List[Dict[str, Any]], -) -> Dict[str, int]: - stats = {"tags_written": 0, "covers_embedded": 0, "files_renamed": 0, "errors": 0} - final_tracks: List[tuple] = [] # (TrackProposal, final_path) für M3U - - for tp in proposal.tracks: - old_title = tp.path.stem - old_artist = "" - if HAS_MUTAGEN: - try: - audio = MutagenFile(str(tp.path), easy=True) - if audio and audio.tags: - old_artist = str(audio.tags.get("artist", [""])[0]) - old_title = 
str(audio.tags.get("title", [tp.path.stem])[0]) - except Exception: - pass - - new_path = tp.path - renamed_to = "" - cover_embedded = False - - if not dry_run: - if backup_dir: - backup_file(tp.path, backup_dir) - - if write_tags(tp.path, tp, proposal): - stats["tags_written"] += 1 - else: - stats["errors"] += 1 - - if embed_cover_art and proposal.cover_path: - if embed_cover(tp.path, proposal.cover_path): - stats["covers_embedded"] += 1 - cover_embedded = True - - if do_rename: - new_name = _proposed_filename( - tp, tp.path.suffix, - albumartist=proposal.albumartist or "", - genre=proposal.genre or "", - ) - candidate = tp.path.parent / new_name - if candidate != tp.path: - try: - tp.path.rename(candidate) - new_path = candidate - renamed_to = new_name - stats["files_renamed"] += 1 - except Exception as e: - print(f" ⚠️ Umbenennungsfehler {tp.path.name}: {e}", file=sys.stderr) - stats["errors"] += 1 - - if not dry_run: - final_tracks.append((tp, new_path)) - - report_data.append({ - "status": "dry-run" if dry_run else "ok", - "album_dir": str(proposal.album_dir.name), - "track_path": str(new_path), - "old_title": old_title, - "new_title": tp.title, - "old_artist": old_artist, - "new_artist": tp.artist, - "album": proposal.album, - "albumartist": proposal.albumartist, - "date": proposal.date or "", - "genre": proposal.genre or "", - "label": proposal.label or "", - "track_number": tp.track_number or "", - "disc_number": tp.disc_number or "", - "cover_embedded": cover_embedded, - "renamed_to": renamed_to, - "confidence": f"{proposal.confidence:.2f}", - "sources": ", ".join(proposal.sources), - }) - - # M3U-Playlist aktualisieren wenn Dateien umbenannt wurden - if do_rename and not dry_run and stats["files_renamed"] > 0 and final_tracks: - m3u_files = ( - list(proposal.album_dir.glob("*.m3u")) + - list(proposal.album_dir.glob("*.m3u8")) - ) - if m3u_files: - _update_m3u(m3u_files[0], final_tracks) - - # Nach allen Umbenennungen: Verzeichnis Linux-kompatibel 
bereinigen - if do_rename and not dry_run: - sanitize_dir_names(proposal.album_dir) - - return stats - - -def sanitize_dir_names(directory: Path) -> None: - """ - Macht alle Dateinamen im Verzeichnis Linux-kompatibel. - Bevorzugt 'NameToUnix ', fällt auf 'detox ' zurück. - """ - name_to_unix = shutil.which("NameToUnix") - if name_to_unix: - try: - subprocess.run([name_to_unix, str(directory)], check=True, capture_output=True) - return - except subprocess.CalledProcessError as e: - print(f" ⚠️ NameToUnix-Fehler: {e.stderr.decode(errors='replace').strip()}", file=sys.stderr) - - detox = shutil.which("detox") - if detox: - for f in sorted(directory.rglob("*")): - if f.is_file(): - try: - subprocess.run([detox, str(f)], check=True, capture_output=True) - except subprocess.CalledProcessError as e: - print(f" ⚠️ detox-Fehler {f.name}: {e.stderr.decode(errors='replace').strip()}", file=sys.stderr) - else: - print(" ℹ️ Weder NameToUnix noch detox gefunden — Dateinamen nicht nachbereinigt.", file=sys.stderr) - - -def write_report(report_data: List[Dict[str, Any]], report_path: Path) -> None: - try: - report_path.parent.mkdir(parents=True, exist_ok=True) - with report_path.open("w", encoding="utf-8", newline="") as f: - w = csv.DictWriter(f, fieldnames=REPORT_FIELDS) - w.writeheader() - w.writerows(report_data) - print(f"📊 Report gespeichert: {report_path}") - except Exception as e: - print(f"⚠️ Report-Fehler: {e}", file=sys.stderr) diff --git a/hint_extractor.py b/hint_extractor.py deleted file mode 100755 index 433c71d..0000000 --- a/hint_extractor.py +++ /dev/null @@ -1,583 +0,0 @@ -from __future__ import annotations - -import base64 -import json -import os -import re -import shutil -import subprocess -import sys -import urllib.request -from pathlib import Path -from typing import Optional, List, Dict, Tuple - -from models import AlbumScan, AlbumHints, TrackHints - -try: - from mutagen import File as MutagenFile - HAS_MUTAGEN = True -except ImportError: - HAS_MUTAGEN = 
False - -try: - from bs4 import BeautifulSoup - HAS_BS4 = True -except ImportError: - HAS_BS4 = False - -_NATSORT_RE = re.compile(r"(\d+)") -_BAD_VALUES = {"unknown", "unknown artist", "unknown album", "untitled", "track", "va", "various"} - -# Filename patterns: most specific first -_FILENAME_PATTERNS = [ - re.compile(r"^(?P<disc>\d{1,2})[- _]+(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<artist>.+?)\s*[-–]\s*(?P<title>.+)$"), - re.compile(r"^(?P<disc>\d{1,2})[- _]+(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<title>.+)$"), - re.compile(r"^(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<artist>.+?)\s*[-–]\s*(?P<title>.+)$"), - re.compile(r"^(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<title>.+)$"), - re.compile(r"^(?P<artist>.+?)\s*[-–]\s*(?P<title>.+)$"), -] - -# Directory name patterns -_DIR_PATTERNS = [ - re.compile(r"^(?P<artist>.+?)[_ -]+[-–][_ -]+(?P<album>.+?)(?:[_ -]+(?P<year>\d{4}))?$"), - re.compile(r"^(?P<artist>.+?)[_ ]+(?P<year>\d{4})[._ -]+(?P<album>.+)$"), - re.compile(r"^(?P<album>.+?)[_ -]+(?P<year>\d{4})$"), -] - -# Tracklist line patterns -_TRACKLIST_PATTERNS = [ - re.compile(r"^(?P<disc>\d{1,2})[- _](?P<track>\d{1,3})\s+(?P<title>.+?)(?:\s+\d+:\d{2})?$"), - # Separator muss . ) oder : sein — reines Leerzeichen reicht nicht - # (verhindert False-Positives wie "2 x CD, Compilation, Remastered") - re.compile(r"^(?P<track>\d{1,3})[.):]\s*(?P<title>.+?)(?:\s+\d+:\d{2})?$"), - re.compile(r"^(?P<track>[A-Z]\d{1,2})[.):]\s*(?P<title>.+?)(?:\s+\d+:\d{2})?$"), -] - -_DISC_SECTION_RE = re.compile(r"(?i)(?:cd|disc|disk|side)[_ \-]*(\d{1,2})") - - -def _clean(s: Optional[str]) -> str: - if not s: - return "" - # BOM (U+FEFF), Zero-Width-Space (U+200B), Soft-Hyphen (U+00AD) entfernen - s = re.sub(r"[\ufeff\u200b\u00ad]", "", s) - return re.sub(r"\s+", " ", s.replace("_", " ")).strip(" -._") - - -def _norm_for_match(s: str) -> str: - """Nur Buchstaben und Ziffern — für fuzzy Titelvergleich (Interpunktion-agnostisch).""" - return re.sub(r"[^a-z0-9]", "", s.casefold()) - - -# Klassische Werkverzeichnis-Nummern: BWV 565, Op. 27, K. 
331, HWV 56, … -_CATALOG_RE = re.compile( - r"\b(bwv|hwv|op|k|kv|d|sz|wq|bbwv|rv|twv|hob)\W*(\d+[a-z]?(?:[\/\.]\d+)?)", - re.IGNORECASE, -) - - -def _catalog_key(s: str) -> Optional[str]: - """Extrahiert normalisierte Katalognummer, z.B. 'bwv565' oder 'op27'.""" - m = _CATALOG_RE.search(s) - if m: - return m.group(1).lower() + re.sub(r"\W", "", m.group(2)) - return None - - -def _is_good(v: Optional[str]) -> bool: - if not v: - return False - return _clean(v).casefold() not in _BAD_VALUES - - -def _parse_dirname(name: str) -> Tuple[Optional[str], Optional[str], Optional[str]]: - name_clean = _clean(name) - for pat in _DIR_PATTERNS: - m = pat.match(name_clean) - if m: - d = m.groupdict() - artist = _clean(d.get("artist")) or None - album = _clean(d.get("album")) or None - year = d.get("year") - if _is_good(artist) or _is_good(album): - return artist, album, year - # No pattern matched — treat whole name as album - return None, _clean(name_clean), None - - -def _parse_filename(stem: str) -> Dict[str, str]: - stem_clean = _clean(stem) - for pat in _FILENAME_PATTERNS: - m = pat.match(stem_clean) - if m: - return {k: _clean(v) for k, v in m.groupdict().items() if v} - return {"title": stem_clean} - - -def _read_tags(path: Path) -> Tuple[Dict[str, str], Optional[float]]: - if not HAS_MUTAGEN: - return {}, None - try: - audio = MutagenFile(str(path), easy=True) - if not audio: - return {}, None - tags: Dict[str, str] = {} - for k in ("title", "artist", "album", "albumartist", "tracknumber", - "discnumber", "date", "year", "genre", "label", "organization"): - v = audio.get(k) - if v: - tags[k] = str(v[0]).strip() - if "year" in tags and "date" not in tags: - tags["date"] = tags["year"] - duration = None - if hasattr(audio, "info") and audio.info and hasattr(audio.info, "length"): - duration = audio.info.length - return tags, duration - except Exception as e: - print(f" ⚠️ Tag-Lesefehler {path.name}: {e}", file=sys.stderr) - return {}, None - - -def _parse_tracklist(text: 
str) -> List[Dict[str, str]]: - tracks: List[Dict[str, str]] = [] - current_disc = 1 - - for line in text.splitlines(): - line = line.strip() - if not line: - continue - - disc_m = _DISC_SECTION_RE.match(line) - if disc_m and len(line) < 30: - current_disc = int(disc_m.group(1)) - continue - - for pat in _TRACKLIST_PATTERNS: - m = pat.match(line) - if m: - d = m.groupdict() - entry: Dict[str, str] = {"title": _clean(d.get("title", ""))} - raw_track = d.get("track", "") - if raw_track and raw_track.isdigit(): - entry["track"] = raw_track.lstrip("0") or "0" - elif raw_track: - entry["track"] = raw_track - if "disc" in d and d["disc"]: - entry["disc"] = d["disc"] - else: - entry["disc"] = str(current_disc) - if entry.get("title"): - tracks.append(entry) - break - - return tracks - - -def _parse_m3u(text: str) -> List[Dict[str, str]]: - """M3U/M3U8 → geordnete Liste: [{filename, title, position}]. - Reihenfolge der Einträge = gewünschte Trackreihenfolge. - """ - tracks: List[Dict[str, str]] = [] - pending_title: Optional[str] = None - position = 0 - for line in text.splitlines(): - line = line.strip() - if not line: - continue - if line.upper().startswith("#EXTINF:"): - parts = line.split(",", 1) - pending_title = parts[1].strip() if len(parts) > 1 else None - elif not line.startswith("#"): - filename = Path(line.replace("\\", "/")).name - if not filename: - continue - position += 1 - tracks.append({ - "position": str(position), - "filename": filename, - "title": pending_title or "", - }) - pending_title = None - return tracks - - -def _read_tracklist_file(path: Path) -> Optional[str]: - try: - if path.suffix.lower() in (".htm", ".html"): - raw = path.read_bytes() - encoding = "utf-8" - for enc in ("utf-8", "latin-1", "cp1252"): - try: - raw.decode(enc) - encoding = enc - break - except UnicodeDecodeError: - continue - text = raw.decode(encoding, errors="replace") - if HAS_BS4: - soup = BeautifulSoup(text, "html.parser") - return soup.get_text(separator="\n") - # 
Fallback: strip HTML tags - return re.sub(r"<[^>]+>", " ", text) - else: - for enc in ("utf-8", "latin-1", "cp1252"): - try: - return path.read_text(encoding=enc) - except UnicodeDecodeError: - continue - except Exception as e: - print(f" ⚠️ Tracklist-Lesefehler {path.name}: {e}", file=sys.stderr) - return None - - -_OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434") -# Modelle in Prioritätsreihenfolge; überschreibbar via OLLAMA_OCR_MODEL -_OCR_MODELS = [m.strip() for m in os.getenv( - "OLLAMA_OCR_MODEL", - "qwen3-vl:latest,minicpm-v:latest,deepseek-ocr:latest" -).split(",") if m.strip()] - -_OCR_PROMPT = ( - "This image shows a CD album back cover or booklet page. " - "Your task: extract the complete tracklist as plain text.\n" - "Rules:\n" - "- Output track number and title per line, e.g. '1. Title' or '1-1 Title'\n" - "- If multiple discs/CDs: include a header like 'CD 1' or 'Disc 1' before each group\n" - "- Include durations if visible (e.g. '1. Title 4:32')\n" - "- Do NOT include label info, barcodes, or other non-tracklist text\n" - "- If no tracklist is visible, reply with: NO_TRACKLIST" -) - - -def _ocr_back_cover(image_files: List[Path]) -> Optional[str]: - """ - OCR eines Back-Cover- oder Booklet-Bildes via Ollama Vision. - Gibt den erkannten Text zurück, oder None wenn nichts gefunden. 
- """ - # Nur Bilder die nach Back-Cover aussehen - candidates = [ - p for p in image_files - if any(kw in p.name.lower() for kw in ("back", "inlay", "booklet", "inside", "rear")) - ] - # Fallback: alle Bilder außer dem Front-Cover - if not candidates: - candidates = [ - p for p in image_files - if not any(kw in p.name.lower() for kw in ("front", "folder", "cover")) - ] - if not candidates: - return None - - image_path = candidates[0] - try: - img_b64 = base64.b64encode(image_path.read_bytes()).decode() - except Exception as e: - print(f" ⚠️ OCR-Bild lesen {image_path.name}: {e}", file=sys.stderr) - return None - - for model in _OCR_MODELS: - payload = json.dumps({ - "model": model, - "messages": [{ - "role": "user", - "content": _OCR_PROMPT, - "images": [img_b64], - }], - "stream": False, - "options": {"temperature": 0.0}, - }).encode() - try: - req = urllib.request.Request( - f"{_OLLAMA_HOST}/api/chat", - data=payload, - headers={"Content-Type": "application/json"}, - method="POST", - ) - with urllib.request.urlopen(req, timeout=180) as resp: - data = json.loads(resp.read()) - text = data.get("message", {}).get("content", "").strip() - if text and "NO_TRACKLIST" not in text: - print(f" 📷 OCR {image_path.name} via {model}: {len(text)} Zeichen extrahiert", - file=sys.stderr) - return text - elif "NO_TRACKLIST" in text: - print(f" 📷 OCR {image_path.name}: kein Tracklist-Text erkannt", file=sys.stderr) - return None - except Exception as e: - print(f" ⚠️ OCR-Fehler ({model}) {image_path.name}: {e}", file=sys.stderr) - continue - return None - - -def _check_cover_images(paths: List[Path]) -> List[Path]: - good: List[Path] = [] - for p in paths: - name_lower = p.name.lower() - # Prefer front covers - if any(kw in name_lower for kw in ("front", "folder", "cover", "album")): - good.insert(0, p) - else: - good.append(p) - return good - - -# YouTube-Video-ID: 11 Zeichen aus [A-Za-z0-9_-], eingebettet im Dateinamen -_YT_ID_RE = 
re.compile(r"(?<![A-Za-z0-9_-])([A-Za-z0-9_-]{11})(?![A-Za-z0-9_-])") - - -def _extract_youtube_id(path: Path) -> Optional[str]: - """Sucht eine YouTube-Video-ID im Dateinamen (Stem oder Suffix).""" - name = path.stem + path.suffix - for m in _YT_ID_RE.finditer(name): - candidate = m.group(1) - # Einfache Plausibilitätsprüfung: muss gemischte Zeichen haben - if re.search(r"[A-Z]", candidate) and re.search(r"[0-9a-z]", candidate): - return candidate - return None - - -def _fetch_youtube_metadata(video_id: str) -> Optional[Dict]: - """ - Ruft YouTube-Metadaten via yt-dlp ab (kein API-Key nötig). - Gibt Dict mit title, uploader, chapters, description zurück oder None. - """ - ytdlp = shutil.which("yt-dlp") - if not ytdlp: - return None - url = f"https://www.youtube.com/watch?v={video_id}" - try: - result = subprocess.run( - [ytdlp, "--dump-json", "--no-download", "--no-playlist", url], - capture_output=True, text=True, timeout=30, - ) - if result.returncode != 0 or not result.stdout.strip(): - return None - return json.loads(result.stdout) - except Exception as e: - print(f" ⚠️ YouTube-Fehler ({video_id}): {e}", file=sys.stderr) - return None - - -def _chapters_to_tracklist_text(chapters: List[Dict]) -> str: - """ - Konvertiert yt-dlp-Chapters in Tracklist-Text der vom _parse_tracklist - verarbeitetet werden kann: '1. Titel MM:SS' - """ - lines = [] - for i, ch in enumerate(chapters, 1): - title = ch.get("title", "").strip() - if not title or title.startswith("<Untitled"): - continue - secs = int(ch.get("start_time", 0)) - mm, ss = divmod(secs, 60) - lines.append(f"{i}. 
{title} {mm}:{ss:02d}") - return "\n".join(lines) - - -def extract_hints(scan: AlbumScan, use_ocr: bool = True) -> AlbumHints: - hints = AlbumHints(album_dir=scan.album_dir) - - # Directory name - hints.dir_artist, hints.dir_album, hints.dir_year = _parse_dirname(scan.album_dir.name) - - # Cover images - hints.cover_images = _check_cover_images(scan.image_files) - - # Tracklist files - texts: List[str] = [] - for tf in scan.tracklist_files: - txt = _read_tracklist_file(tf) - if txt: - texts.append(txt) - hints.tracklist_text = "\n\n".join(texts) if texts else None - - # OCR-Fallback: Back-Cover scannen wenn keine Tracklist-Textdatei vorhanden - if use_ocr and not hints.tracklist_text and scan.image_files: - ocr_text = _ocr_back_cover(scan.image_files) - if ocr_text: - hints.tracklist_text = ocr_text - - # YouTube-Lookup: IDs aus Dateinamen extrahieren, Metadaten per yt-dlp holen - yt_meta_by_id: Dict[str, Optional[Dict]] = {} - yt_ids_by_stem: Dict[str, str] = {} # stem (normalisiert) → youtube_id - - for audio_path in scan.audio_files: - yt_id = _extract_youtube_id(audio_path) - if yt_id: - stem_key = _clean(audio_path.stem).casefold() - yt_ids_by_stem[stem_key] = yt_id - yt_meta_by_id.setdefault(yt_id, None) - - if yt_meta_by_id: - print(f" 📺 YouTube-IDs gefunden: {', '.join(list(yt_meta_by_id.keys())[:5])}", file=sys.stderr) - for yt_id in list(yt_meta_by_id.keys())[:5]: - meta = _fetch_youtube_metadata(yt_id) - yt_meta_by_id[yt_id] = meta - - # Chapters als Tracklist nutzen wenn noch keine vorhanden - if not hints.tracklist_text: - for yt_id, meta in yt_meta_by_id.items(): - if meta and meta.get("chapters"): - chapter_text = _chapters_to_tracklist_text(meta["chapters"]) - if chapter_text: - hints.tracklist_text = chapter_text - print(f" 📺 YouTube-Chapters als Tracklist: {len(meta['chapters'])} Tracks", - file=sys.stderr) - break - - # Album-Level-Hints (erster erfolgreicher Treffer) - for yt_id, meta in yt_meta_by_id.items(): - if meta: - hints.yt_title = 
(meta.get("title") or "").strip() or None - hints.yt_uploader = ( - meta.get("uploader") or meta.get("channel") or "" - ).strip() or None - break - - parsed_tracklist = _parse_tracklist(hints.tracklist_text) if hints.tracklist_text else [] - - # M3U/Playlist-Reihenfolge: filename (stem, normalisiert) → Tracknummer - m3u_order: Dict[str, int] = {} - m3u_titles: Dict[str, str] = {} - for pf in scan.playlist_files: - try: - text = pf.read_text(encoding="utf-8", errors="replace") - for entry in _parse_m3u(text): - stem = _clean(Path(entry["filename"]).stem).casefold() - pos = int(entry["position"]) - if stem and stem not in m3u_order: - m3u_order[stem] = pos - if entry.get("title"): - m3u_titles[stem] = entry["title"] - except Exception as e: - print(f" ⚠️ Playlist-Lesefehler {pf.name}: {e}", file=sys.stderr) - - # Tracklist-Lookup: exakter Titel, fuzzy Titel, Katalognummer (BWV, Op., K., …) - tl_by_title: Dict[str, Dict[str, str]] = {} - tl_by_title_norm: Dict[str, Dict[str, str]] = {} - tl_by_catalog: Dict[str, Dict[str, str]] = {} - for entry in parsed_tracklist: - raw_title = entry.get("title", "") - exact_key = _clean(raw_title).casefold() - if exact_key: - tl_by_title[exact_key] = entry - norm_key = _norm_for_match(raw_title) - if norm_key: - tl_by_title_norm[norm_key] = entry - cat_key = _catalog_key(raw_title) - if cat_key: - tl_by_catalog[cat_key] = entry - - # Build TrackHints per audio file - for audio_path in sorted(scan.audio_files): - tags, duration = _read_tags(audio_path) - fn_hints = _parse_filename(audio_path.stem) - - track_num: Optional[int] = None - disc_num: Optional[int] = None - - # Track number: tag > filename - raw_tn = tags.get("tracknumber") or fn_hints.get("track") - if raw_tn: - try: - tn_int = int(str(raw_tn).split("/")[0]) - if tn_int > 0: # 0 gilt als "keine Nummer" - track_num = tn_int - except ValueError: - pass - - # Disc number: tag > filename > path segment - raw_dn = tags.get("discnumber") or fn_hints.get("disc") - if raw_dn: - 
try: - disc_num = int(str(raw_dn).split("/")[0]) - except ValueError: - pass - if not disc_num: - for part in audio_path.relative_to(scan.album_dir).parts[:-1]: - dm = _DISC_SECTION_RE.search(part) - if dm: - disc_num = int(dm.group(1)) - break - - title = tags.get("title") or fn_hints.get("title") - artist = tags.get("artist") or fn_hints.get("artist") - - # Tracklist-Matching: Nummer → exakter Titel → fuzzy Titel - # Wenn ein Match gefunden: disc+track aus Tracklist übernehmen (Tracklist ist - # autoritativer als M3U-Reihenfolge bei Alben mit expliziter Disc-Nummerierung). - if parsed_tracklist: - matched_tl: Optional[Dict[str, str]] = None - - # 1. Exakt per Tracknummer + Disc (nur wenn beides aus Tag/Dateiname bekannt) - if track_num and disc_num: - for tl_entry in parsed_tracklist: - tl_track = tl_entry.get("track") - tl_disc = tl_entry.get("disc", "1") - if (tl_track and int(tl_track) == track_num - and int(tl_disc) == disc_num): - matched_tl = tl_entry - break - - # 2. Exakter Titelvergleich - if matched_tl is None and title: - matched_tl = tl_by_title.get(_clean(title).casefold()) - - # 3. Fuzzy Titelvergleich (ignoriert Kommas, Apostrophe, Groß-/Kleinschreibung) - if matched_tl is None and title: - matched_tl = tl_by_title_norm.get(_norm_for_match(title)) - - # 4. Katalognummer (BWV, Op., K. 
…) — greift bei abgekürzten Dateinamen - if matched_tl is None and title: - cat = _catalog_key(title) - if cat: - matched_tl = tl_by_catalog.get(cat) - - if matched_tl: - # Titel aus Tracklist übernehmen wenn besser - if _is_good(matched_tl.get("title")): - title = matched_tl["title"] - # disc+track aus Tracklist sind autoritativer als M3U-Reihenfolge - try: - tl_track_n = int(matched_tl["track"]) if matched_tl.get("track") else None - tl_disc_n = int(matched_tl.get("disc", "1")) - if tl_track_n: - track_num = tl_track_n - disc_num = tl_disc_n - except (ValueError, KeyError): - pass - - # M3U-Reihenfolge nur als letzter Fallback (wenn Tracklist kein Match liefert) - if track_num is None: - stem_key = _clean(audio_path.stem).casefold() - if stem_key in m3u_order: - track_num = m3u_order[stem_key] - - # M3U-Titel als Fallback (enthält "Composer - Title" — nur nutzen wenn kein besserer Titel) - if not _is_good(title): - stem_key = _clean(audio_path.stem).casefold() - if stem_key in m3u_titles: - title = m3u_titles[stem_key] - - # YouTube-Titel als letzter Fallback (bei einzelner Datei = das ganze Video) - if not _is_good(title): - stem_key = _clean(audio_path.stem).casefold() - yt_id = yt_ids_by_stem.get(stem_key) - if yt_id: - meta = yt_meta_by_id.get(yt_id) - if meta: - yt_video_title = (meta.get("title") or "").strip() - if yt_video_title: - title = yt_video_title - - hints.tracks.append(TrackHints( - path=audio_path, - track_number=track_num, - disc_number=disc_num, - title=_clean(title) if title else None, - artist=_clean(artist) if artist else None, - duration=duration, - existing_tags=tags, - )) - - return hints diff --git a/metadata_resolver.py b/metadata_resolver.py deleted file mode 100755 index f109d04..0000000 --- a/metadata_resolver.py +++ /dev/null @@ -1,577 +0,0 @@ -from __future__ import annotations - -import os -import re -import sys -import time -from typing import Optional, List, Dict, Tuple - -from models import AlbumHints, AlbumProposal, 
TrackProposal - -try: - import musicbrainzngs as mb - mb.set_useragent("MusicMetadataEnricher", "1.0", "https://github.com/dschlueter") - HAS_MB = True -except ImportError: - HAS_MB = False - -try: - import acoustid - HAS_ACOUSTID = True -except ImportError: - HAS_ACOUSTID = False - -try: - import discogs_client as dc - HAS_DISCOGS = True -except ImportError: - HAS_DISCOGS = False - -try: - import anthropic - HAS_ANTHROPIC = True -except ImportError: - HAS_ANTHROPIC = False - -_MB_RATE_LIMIT = 1.1 # seconds between MusicBrainz requests -_last_mb_call = 0.0 -ACOUSTID_API_KEY = os.getenv("ACOUSTID_API_KEY", "") -ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "") -OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "") -DISCOGS_TOKEN = os.getenv("DISCOGS_TOKEN", "") -OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434") - -# qwen3:8b (5.2GB) reicht für einfache JSON-Metadaten-Ergänzung und lädt schnell (~10s) -OLLAMA_RESOLVE_MODEL = os.getenv("OLLAMA_RESOLVE_MODEL", "qwen3:8b") - - -def _mb_wait(): - global _last_mb_call - elapsed = time.monotonic() - _last_mb_call - if elapsed < _MB_RATE_LIMIT: - time.sleep(_MB_RATE_LIMIT - elapsed) - _last_mb_call = time.monotonic() - - -# --------------------------------------------------------------------------- -# AcoustID fingerprinting -# --------------------------------------------------------------------------- - -def _fingerprint_tracks(hints: AlbumHints) -> Dict[str, List[str]]: - """Returns {audio_path_str: [mbid, ...]}""" - if not HAS_ACOUSTID or not ACOUSTID_API_KEY: - return {} - results: Dict[str, List[str]] = {} - for t in hints.tracks: - try: - duration, fp = acoustid.fingerprint_file(str(t.path)) - response = acoustid.lookup(ACOUSTID_API_KEY, fp, duration, - meta="recordings releasegroups") - mbids: List[str] = [] - for result in response.get("results", []): - if result.get("score", 0) >= 0.90: - for rec in result.get("recordings", []): - mbids.append(rec["id"]) - results[str(t.path)] = mbids - 
except Exception as e: - print(f" ⚠️ AcoustID-Fehler {t.path.name}: {e}", file=sys.stderr) - return results - - -# --------------------------------------------------------------------------- -# MusicBrainz lookup -# --------------------------------------------------------------------------- - -def _mb_search_release(artist: Optional[str], album: Optional[str], - year: Optional[str]) -> Optional[Dict]: - if not HAS_MB or (not artist and not album): - return None - query_parts = [] - if album: - query_parts.append(f'release:"{album}"') - if artist: - query_parts.append(f'artist:"{artist}"') - if year: - query_parts.append(f'date:{year}') - query = " AND ".join(query_parts) - try: - _mb_wait() - result = mb.search_releases(query=query, limit=3) - releases = result.get("release-list", []) - if not releases: - return None - # Take highest-score release - best = max(releases, key=lambda r: int(r.get("ext:score", 0))) - score = int(best.get("ext:score", 0)) - if score < 70: - return None - return best - except Exception as e: - print(f" ⚠️ MusicBrainz-Suchfehler: {e}", file=sys.stderr) - return None - - -def _mb_get_release_tracks(release_id: str) -> Optional[List[Dict]]: - if not HAS_MB: - return None - try: - _mb_wait() - result = mb.get_release_by_id( - release_id, - includes=["recordings", "artist-credits", "labels", "release-groups"], - ) - return result.get("release") - except Exception as e: - print(f" ⚠️ MusicBrainz-Release-Fehler: {e}", file=sys.stderr) - return None - - -def _mb_recording_to_release(recording_mbid: str) -> Optional[Dict]: - if not HAS_MB: - return None - try: - _mb_wait() - result = mb.get_recording_by_id( - recording_mbid, - includes=["releases", "artist-credits", "release-groups"], - ) - rec = result.get("recording", {}) - releases = rec.get("release-list", []) - if releases: - return releases[0] - return None - except Exception as e: - print(f" ⚠️ MusicBrainz-Recording-Fehler: {e}", file=sys.stderr) - return None - - -# 
--------------------------------------------------------------------------- -# Discogs fallback -# --------------------------------------------------------------------------- - -def _discogs_search(artist: Optional[str], album: Optional[str]) -> Optional[Dict]: - if not HAS_DISCOGS or not DISCOGS_TOKEN: - return None - try: - client = dc.Client("MusicMetadataEnricher/1.0", user_token=DISCOGS_TOKEN) - results = client.search( - album or artist or "", - artist=artist or "", - type="release", - ) - if results.count: - r = results[0] - return { - "album": r.title, - "artist": r.artists[0].name if r.artists else None, - "year": str(r.year) if r.year else None, - "genre": r.genres[0] if r.genres else None, - "label": r.labels[0].name if r.labels else None, - "id": r.id, - } - except Exception as e: - print(f" ⚠️ Discogs-Fehler: {e}", file=sys.stderr) - return None - - -# --------------------------------------------------------------------------- -# Claude API reasoning (optional) -# --------------------------------------------------------------------------- - -def _build_resolve_prompt(hints: AlbumHints, partial: Dict) -> str: - tracks_summary = "\n".join( - f" - {('D'+str(t.disc_number)+'-') if t.disc_number else ''}T{t.track_number or '?'}: " - f"{t.title or t.path.stem}" - + (f" [{t.artist}]" if t.artist else "") - for t in hints.tracks[:20] - ) - # Tracklist-Kopfzeilen (erste 400 Zeichen, vor der Track-Liste) für Album/Label-Info - tracklist_header = "" - if hints.tracklist_text: - header_lines = [] - for line in hints.tracklist_text.splitlines(): - line = line.strip() - if not line: - continue - # Stopp bei erster Zeile die wie ein Track aussieht (1-1, 1. etc.) - if re.match(r"^\d[\d\-]\s+\S", line) or re.match(r"^\d{1,3}[.)]\s+", line): - break - header_lines.append(line) - if sum(len(l) for l in header_lines) > 400: - break - tracklist_header = "\n".join(header_lines[:15]) - - return ( - "Du bist ein Musikexperte. 
Analysiere diese Album-Daten und gib korrekte Metadaten zurück.\n" - "Korrigiere auch erkennbare Tippfehler (Verzeichnisnamen enthalten oft Schreibfehler).\n\n" - "WICHTIGE FELDDEFINITIONEN:\n" - '- "artist" = Komponist (Klassik) ODER Band/Sänger (Pop/Rock/Jazz)\n' - '- "albumartist" = Interpret/Performer/Dirigent (Klassik) ODER gleich wie artist (Pop)\n' - " Beispiel Klassik: artist='Johann Sebastian Bach', albumartist='Peter Hurford'\n" - " Beispiel Pop: artist='ABBA', albumartist='ABBA'\n\n" - f"Verzeichnisname: {hints.album_dir.name}\n" - f"Hinweis Künstler/Titel (aus Verzeichnis, kann vertauscht oder falsch sein): " - f"{hints.dir_artist or '?'} / {hints.dir_album or partial.get('album', '?')}\n" - f"Jahr: {hints.dir_year or partial.get('year', 'unbekannt')}\n" - + (f"YouTube-Videotitel: {hints.yt_title}\n" if hints.yt_title else "") - + (f"YouTube-Uploader/Kanal: {hints.yt_uploader}\n" if hints.yt_uploader else "") - + (f"Tracklist-Kopf (Label/Jahr/Albumtitel):\n{tracklist_header}\n\n" if tracklist_header else "") - + f"Tracks:\n{tracks_summary}\n\n" - 'Antworte NUR mit einem JSON-Objekt (null wenn unbekannt):\n' - '{"artist": ..., "album": ..., "albumartist": ..., "year": ..., "genre": ..., "label": ...}' - ) - - -def _parse_json_response(text: str) -> Optional[Dict]: - import json, re - m = re.search(r"\{.*\}", text, re.DOTALL) - if m: - try: - return json.loads(m.group()) - except Exception: - pass - return None - - -def _resolve_via_ollama(hints: AlbumHints, partial: Dict) -> Optional[Dict]: - """Lokales Reasoning via Ollama (kein API-Key nötig).""" - import urllib.request, json - prompt = _build_resolve_prompt(hints, partial) - payload = json.dumps({ - "model": OLLAMA_RESOLVE_MODEL, - "messages": [{"role": "user", "content": prompt}], - "stream": False, - "format": "json", - "options": {"temperature": 0.1}, - }).encode() - try: - req = urllib.request.Request( - f"{OLLAMA_HOST}/api/chat", - data=payload, - headers={"Content-Type": "application/json"}, - 
method="POST", - ) - with urllib.request.urlopen(req, timeout=240) as resp: - data = json.loads(resp.read()) - text = data.get("message", {}).get("content", "").strip() - return _parse_json_response(text) - except Exception as e: - print(f" ⚠️ Ollama-Resolve-Fehler: {e}", file=sys.stderr) - return None - - -def _resolve_via_openrouter(hints: AlbumHints, partial: Dict) -> Optional[Dict]: - """Reasoning via OpenRouter (günstige chinesische Modelle bevorzugt).""" - if not OPENROUTER_API_KEY: - return None - import urllib.request, json - prompt = _build_resolve_prompt(hints, partial) - # DeepSeek V3: extrem günstig, sehr kompetent - model = "deepseek/deepseek-chat-v3-0324" - payload = json.dumps({ - "model": model, - "messages": [{"role": "user", "content": prompt}], - "temperature": 0.1, - "max_tokens": 300, - }).encode() - try: - req = urllib.request.Request( - "https://openrouter.ai/api/v1/chat/completions", - data=payload, - headers={ - "Content-Type": "application/json", - "Authorization": f"Bearer {OPENROUTER_API_KEY}", - "HTTP-Referer": "https://pi.local", - "X-Title": "MusicMetadataEnricher", - }, - method="POST", - ) - with urllib.request.urlopen(req, timeout=30) as resp: - data = json.loads(resp.read()) - text = data["choices"][0]["message"]["content"].strip() - return _parse_json_response(text) - except Exception as e: - print(f" ⚠️ OpenRouter-Resolve-Fehler: {e}", file=sys.stderr) - return None - - -def _claude_resolve(hints: AlbumHints, partial: Dict) -> Optional[Dict]: - """ - Reihenfolge: Ollama (lokal, kostenlos) → OpenRouter (günstig). - Claude API wird bewusst nicht genutzt (zu teuer). - """ - # 1. Ollama lokal (bevorzugt — kostenlos, RTX 3090) - result = _resolve_via_ollama(hints, partial) - if result: - return result - - # 2. 
OpenRouter (DeepSeek V3, günstig) wenn Key gesetzt - if OPENROUTER_API_KEY: - result = _resolve_via_openrouter(hints, partial) - if result: - return result - - return None - - -# --------------------------------------------------------------------------- -# Main resolver -# --------------------------------------------------------------------------- - -def resolve( - hints: AlbumHints, - use_fingerprint: bool = True, - use_api: bool = True, - use_claude: bool = True, -) -> AlbumProposal: - confidence = 0.0 - sources: List[str] = [] - notes: List[str] = [] - - artist = hints.dir_artist - album = hints.dir_album - year = hints.dir_year - genre: Optional[str] = None - label: Optional[str] = None - release_mbid: Optional[str] = None - mb_tracks: Optional[List] = None - - # Collect artist/album from existing tags (majority vote) - tag_artists = [t.existing_tags.get("artist") for t in hints.tracks if t.existing_tags.get("artist")] - tag_albums = [t.existing_tags.get("album") for t in hints.tracks if t.existing_tags.get("album")] - if tag_artists: - from collections import Counter - artist = artist or Counter(tag_artists).most_common(1)[0][0] - if tag_albums: - from collections import Counter - album = album or Counter(tag_albums).most_common(1)[0][0] - - # Tag year/genre/label - import re as _re - for t in hints.tracks: - raw_year = t.existing_tags.get("date") or t.existing_tags.get("year") - if raw_year and not year: - # Strip invisible chars so ID3TimeStamp validation doesn't fail later - year = _re.sub(r"[^\d\-T:+Z]", "", str(raw_year)).strip()[:10] or None - genre = genre or t.existing_tags.get("genre") - label = label or t.existing_tags.get("label") or t.existing_tags.get("organization") - - # YouTube-Metadaten als zusätzliche Hinweise (Uploader → Künstler, Titel → Album/Track) - if hints.yt_uploader and not artist: - artist = hints.yt_uploader - if hints.yt_title and not album: - album = hints.yt_title - - if artist or album: - confidence += 0.05 - 
sources.append("local-hints") - if hints.yt_title or hints.yt_uploader: - sources.append("youtube") - - # AcoustID fingerprinting - fp_mbids: Dict[str, List[str]] = {} - if use_fingerprint and use_api and HAS_ACOUSTID and ACOUSTID_API_KEY: - fp_mbids = _fingerprint_tracks(hints) - if fp_mbids: - confidence += 0.20 - sources.append("acoustid") - # Try to get release from first matched recording - for mbids in fp_mbids.values(): - for mbid in mbids[:1]: - rel = _mb_recording_to_release(mbid) - if rel: - release_mbid = rel.get("id") - confidence += 0.25 - sources.append("musicbrainz-fingerprint") - break - if release_mbid: - break - - # MusicBrainz text search - if use_api and HAS_MB and not release_mbid: - mb_result = _mb_search_release(artist, album, year) - if mb_result: - release_mbid = mb_result.get("id") - score = int(mb_result.get("ext:score", 0)) - confidence += 0.30 * (score / 100) - sources.append("musicbrainz-text") - notes.append(f"MusicBrainz score: {score}") - - # Fetch full release data - if use_api and release_mbid: - full_release = _mb_get_release_tracks(release_mbid) - if full_release: - if not artist: - creds = full_release.get("artist-credit", []) - artist = "".join(c.get("artist", {}).get("name", "") + c.get("joinphrase", "") - for c in creds if isinstance(c, dict)).strip() or artist - if not album: - album = full_release.get("title", album) - if not year: - year = full_release.get("date", "")[:4] or None - label_info = full_release.get("label-info-list", []) - if label_info and not label: - label = label_info[0].get("label", {}).get("name") if label_info else None - rg = full_release.get("release-group", {}) - if not genre: - genre = (rg.get("primary-type") or "").strip() or None - mb_tracks = [] - for medium in full_release.get("medium-list", []): - disc_num = medium.get("position", 1) - for track in medium.get("track-list", []): - mb_tracks.append({ - "disc": disc_num, - "number": int(track.get("number", 0) or 0), - "title": 
track.get("recording", {}).get("title", ""), - "artist": track.get("artist-credit-phrase", ""), - "mbid": track.get("recording", {}).get("id"), - }) - - # Discogs fallback - if use_api and HAS_DISCOGS and DISCOGS_TOKEN and not release_mbid: - dg = _discogs_search(artist, album) - if dg: - artist = artist or dg.get("artist") - album = album or dg.get("album") - year = year or dg.get("year") - genre = genre or dg.get("genre") - label = label or dg.get("label") - confidence += 0.15 - sources.append("discogs") - - # LLM-Reasoning für verbleibende Lücken: - # Reihenfolge: Ollama lokal → OpenRouter (DeepSeek, günstig) → Claude API - cl_albumartist: Optional[str] = None - partial = {"artist": artist, "album": album, "year": year} - if use_claude and use_api: - if not artist or not album or confidence < 0.5: - cl = _claude_resolve(hints, partial) - if cl: - if confidence < 0.3: - # Sehr unsicher: LLM darf auch bestehende Werte korrigieren - # (z.B. Tippfehler im Albumtitel aus dem Verzeichnisnamen) - artist = cl.get("artist") or artist - album = cl.get("album") or album - year = cl.get("year") or year - genre = cl.get("genre") or genre - label = cl.get("label") or label - else: - artist = artist or cl.get("artist") - album = album or cl.get("album") - year = year or cl.get("year") - genre = genre or cl.get("genre") - label = label or cl.get("label") - cl_albumartist = cl.get("albumartist") or None - confidence += 0.10 - sources.append("llm-resolve") - - # Finalize albumartist - # Priorität: (1) LLM-albumartist bei niedriger Konfidenz - # (2) dir_artist wenn Verzeichnisname einen Künstler nennt - # (3) Heuristiken (Various Artists, Mehrheitsabstimmung) - # Rationale: "Bach_Organ_-_Peter_Hurford" → dir_artist="Bach Organ" ist kein Künstler, - # aber der Verzeichnisname sieht aus wie Künstler; LLM kann das korrekt auflösen. 
- track_artists = [t.artist for t in hints.tracks if t.artist] - from collections import Counter - distinct_artists = set(a for a in track_artists if a) - - _bad_aa = {"various artists", "unknown artist", "unknown", "va"} - def _good_aa(s: Optional[str]) -> bool: - return bool(s) and s.casefold().strip() not in _bad_aa - - if _good_aa(cl_albumartist) and confidence < 0.4: - # LLM kennt den echten Albumkünstler besser als der Verzeichnisname - albumartist = cl_albumartist # type: ignore[assignment] - elif hints.dir_artist: - albumartist = hints.dir_artist - elif len(distinct_artists) >= 3: - albumartist = "Various Artists" - elif track_artists: - albumartist = artist or Counter(track_artists).most_common(1)[0][0] - else: - albumartist = artist or "Unknown Artist" - - album = album or hints.album_dir.name.replace("_", " ") - artist = artist or albumartist - confidence = min(confidence, 1.0) - - # Build track proposals - # `artist` = Komponist/Hauptkünstler (LLM-aufgelöst), `albumartist` = Performer - # Werden beide weitergegeben damit _build_track_proposals richtig zuordnen kann. 
- track_proposals = _build_track_proposals(hints, mb_tracks, album, albumartist, composer=artist) - - return AlbumProposal( - album_dir=hints.album_dir, - album=album, - albumartist=albumartist, - date=year, - genre=genre, - label=label, - mbid=release_mbid, - cover_path=None, - cover_source=None, - tracks=track_proposals, - confidence=confidence, - sources=sources, - notes=notes, - ) - - -def _build_track_proposals( - hints: AlbumHints, - mb_tracks: Optional[List], - album: str, - album_artist: str, - composer: Optional[str] = None, -) -> List[TrackProposal]: - proposals: List[TrackProposal] = [] - - for th in sorted(hints.tracks, key=lambda t: (t.disc_number or 1, t.track_number or 9999, str(t.path))): - title = th.title - track_num = th.track_number - disc_num = th.disc_number - - # Klassik-Fall: Performer aus Dateiname, Komponist aus LLM - # Wenn th.artist == albumartist (Performer), und wir den Komponisten kennen, - # wird der Komponist als Track-Artist gesetzt → Filename: TT_-_Performer_-_Komponist_-_Werk - th_artist_cf = (th.artist or "").casefold().strip() - aa_cf = album_artist.casefold().strip() - if composer and th_artist_cf == aa_cf and th_artist_cf: - # Performer == albumartist → Komponist als Track-Artist - artist = composer - else: - artist = th.artist or album_artist - - # Try to match from MusicBrainz track list - if mb_tracks and track_num: - for mb_t in mb_tracks: - if mb_t["number"] == track_num and mb_t["disc"] == (disc_num or 1): - if mb_t.get("title"): - title = mb_t["title"] - if mb_t.get("artist"): - artist = mb_t["artist"] - break - - title = title or th.path.stem - - proposals.append(TrackProposal( - path=th.path, - title=title, - artist=artist, - track_number=track_num, - disc_number=disc_num, - mbid=None, - )) - - # Sequenzielle Nummerierung als letzter Fallback: - # Tracks ohne Nummer (None) erhalten eine laufende Nummer pro Disc. - # Damit werden "00" und "??" im Dateinamen beim --rename verhindert. 
- if any(p.track_number is None for p in proposals): - disc_counters: Dict[int, int] = {} - for p in proposals: - if p.track_number is None: - disc = p.disc_number or 1 - disc_counters[disc] = disc_counters.get(disc, 0) + 1 - p.track_number = disc_counters[disc] - - return proposals diff --git a/models.py b/models.py deleted file mode 100755 index a95662f..0000000 --- a/models.py +++ /dev/null @@ -1,84 +0,0 @@ -from __future__ import annotations - -from dataclasses import dataclass, field -from pathlib import Path -from typing import Optional, List, Dict - - -AUDIO_EXTENSIONS = { - ".mp3", ".flac", ".m4a", ".aac", ".ogg", ".opus", - ".wav", ".wma", ".aiff", ".ape", -} -IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".bmp", ".gif"} -TRACKLIST_EXTENSIONS = {".txt", ".htm", ".html", ".nfo"} -PLAYLIST_EXTENSIONS = {".m3u", ".m3u8", ".pls"} - - -@dataclass -class ScannedFile: - path: Path - kind: str # "audio" | "image" | "tracklist" | "playlist" | "other" - - -@dataclass -class AlbumScan: - album_dir: Path - audio_files: List[Path] = field(default_factory=list) - image_files: List[Path] = field(default_factory=list) - tracklist_files: List[Path] = field(default_factory=list) - playlist_files: List[Path] = field(default_factory=list) # .m3u / .m3u8 / .pls - other_files: List[Path] = field(default_factory=list) - - -@dataclass -class TrackHints: - path: Path - track_number: Optional[int] = None - disc_number: Optional[int] = None - title: Optional[str] = None - artist: Optional[str] = None - duration: Optional[float] = None - existing_tags: Dict[str, str] = field(default_factory=dict) - - -@dataclass -class AlbumHints: - album_dir: Path - dir_artist: Optional[str] = None - dir_album: Optional[str] = None - dir_year: Optional[str] = None - tracklist_text: Optional[str] = None # merged text from all tracklist files - cover_images: List[Path] = field(default_factory=list) - tracks: List[TrackHints] = field(default_factory=list) - yt_title: Optional[str] = None # 
YouTube video title (if found) - yt_uploader: Optional[str] = None # YouTube channel/uploader name - - -@dataclass -class TrackProposal: - path: Path - title: str - artist: str - track_number: Optional[int] - disc_number: Optional[int] - new_filename: Optional[str] = None # only set when --rename is active - mbid: Optional[str] = None - conductor: Optional[str] = None # classical: Dirigent - orchestra: Optional[str] = None # classical: Orchester / Ensemble - - -@dataclass -class AlbumProposal: - album_dir: Path - album: str - albumartist: str - date: Optional[str] - genre: Optional[str] - label: Optional[str] - mbid: Optional[str] # MusicBrainz release ID - cover_path: Optional[Path] # resolved local or downloaded cover - cover_source: Optional[str] # "local" | "musicbrainz" | "discogs" - tracks: List[TrackProposal] - confidence: float - sources: List[str] = field(default_factory=list) - notes: List[str] = field(default_factory=list) diff --git a/music_enricher.py b/music_enricher.py deleted file mode 100644 index 3e86d4c..0000000 --- a/music_enricher.py +++ /dev/null @@ -1,269 +0,0 @@ -#!/usr/bin/env python3 -""" -music_enricher.py -KI-gestützter Musik-Metadaten-Enricher für Jellyfin-Bibliotheken. 
- -Pipeline pro Album: - Scan → HintExtractor → MetadataResolver → CoverHandler → Review → Executor -""" -from __future__ import annotations - -import argparse -import os -import sys -from pathlib import Path -from typing import Any, Dict, List, Optional - -try: - from tqdm import tqdm - HAS_TQDM = True -except ImportError: - HAS_TQDM = False - -from models import AlbumProposal -from scanner import scan_album, collect_album_dirs -from hint_extractor import extract_hints -from metadata_resolver import resolve -from cover_handler import resolve_cover -from executor import execute_album, write_report - - -def maybe_tqdm(iterable, show: bool, **kwargs): - return tqdm(iterable, **kwargs) if show else iterable - - -# --------------------------------------------------------------------------- -# Review / Display -# --------------------------------------------------------------------------- - -def _print_proposal(proposal: AlbumProposal) -> None: - conf_bar = "█" * int(proposal.confidence * 10) + "░" * (10 - int(proposal.confidence * 10)) - print(f"\n{'─' * 60}") - print(f"💿 {proposal.album_dir.name}") - print(f" Album: {proposal.album}") - print(f" Artist: {proposal.albumartist}") - print(f" Jahr: {proposal.date or '–'}") - print(f" Genre: {proposal.genre or '–'}") - print(f" Label: {proposal.label or '–'}") - print(f" Cover: {proposal.cover_source or '–'} ({proposal.cover_path.name if proposal.cover_path else 'keins'})") - print(f" Konfidenz: [{conf_bar}] {proposal.confidence:.0%} Quellen: {', '.join(proposal.sources) or '–'}") - if proposal.notes: - for n in proposal.notes: - print(f" ℹ️ {n}") - print(f" Tracks ({len(proposal.tracks)}):") - for tp in proposal.tracks[:8]: - tn = f"{tp.disc_number}-{tp.track_number:02d}" if tp.disc_number and tp.disc_number > 1 else ( - f"{tp.track_number:02d}" if tp.track_number else "??") - print(f" {tn} {tp.artist} – {tp.title}") - if len(proposal.tracks) > 8: - print(f" … und {len(proposal.tracks) - 8} weitere") - - -def 
_interactive_review(proposal: AlbumProposal) -> bool: - """Returns True if user accepts the proposal.""" - _print_proposal(proposal) - while True: - answer = input("\n [Enter] Akzeptieren [s] Überspringen [q] Abbrechen: ").strip().lower() - if answer in ("", "j", "y"): - return True - if answer == "s": - return False - if answer == "q": - sys.exit(0) - - -# --------------------------------------------------------------------------- -# Main pipeline -# --------------------------------------------------------------------------- - -def process_album( - album_dir: Path, - args: argparse.Namespace, - report_data: List[Dict[str, Any]], -) -> Dict[str, int]: - stats = {"tags_written": 0, "covers_embedded": 0, "files_renamed": 0, - "errors": 0, "skipped": 0} - - try: - scan = scan_album(album_dir) - if not scan.audio_files: - stats["skipped"] += 1 - return stats - - hints = extract_hints(scan, use_ocr=not args.no_api) - - proposal = resolve( - hints, - use_fingerprint=not args.no_fingerprint, - use_api=not args.no_api, - use_claude=not args.no_api, - ) - - # Cover art - cover_path, cover_source = resolve_cover( - hints.cover_images, - proposal.mbid, - album_dir, - ) - if cover_path and not args.no_cover: - proposal.cover_path = cover_path - proposal.cover_source = cover_source - - # Set proposed filenames if --rename - if args.rename: - from executor import _proposed_filename - for tp in proposal.tracks: - tp.new_filename = _proposed_filename(tp, tp.path.suffix) - - # Review step - if args.dry_run: - _print_proposal(proposal) - for tp in proposal.tracks: - report_data.append({ - "status": "dry-run", - "album_dir": str(album_dir.name), - "track_path": str(tp.path), - "old_title": tp.path.stem, - "new_title": tp.title, - "old_artist": "", - "new_artist": tp.artist, - "album": proposal.album, - "albumartist": proposal.albumartist, - "date": proposal.date or "", - "genre": proposal.genre or "", - "label": proposal.label or "", - "track_number": tp.track_number or "", - 
"disc_number": tp.disc_number or "", - "cover_embedded": False, - "renamed_to": tp.new_filename or "", - "confidence": f"{proposal.confidence:.2f}", - "sources": ", ".join(proposal.sources), - }) - return stats - - accepted = True - if not args.auto: - accepted = _interactive_review(proposal) - elif args.auto and proposal.confidence < args.confidence: - print(f" ⏭️ Konfidenz {proposal.confidence:.0%} < {args.confidence:.0%} → übersprungen: {album_dir.name}") - stats["skipped"] += 1 - return stats - else: - _print_proposal(proposal) - - if not accepted: - stats["skipped"] += 1 - return stats - - album_stats = execute_album( - proposal=proposal, - backup_dir=args.backup, - do_rename=args.rename, - embed_cover_art=args.embed_cover, - dry_run=False, - report_data=report_data, - ) - for k, v in album_stats.items(): - stats[k] = stats.get(k, 0) + v - - except Exception as e: - stats["errors"] += 1 - print(f" ❌ Fehler in {album_dir.name}: {e}", file=sys.stderr) - import traceback - traceback.print_exc(file=sys.stderr) - - return stats - - -def main() -> None: - parser = argparse.ArgumentParser( - description="KI-gestützter Musik-Metadaten-Enricher für Jellyfin", - formatter_class=argparse.RawTextHelpFormatter, - ) - parser.add_argument("paths", nargs="*", - help="Root-Verzeichnisse (direkte Unterordner = Alben)") - parser.add_argument("--album", type=Path, - help="Einzelnes Album-Verzeichnis verarbeiten") - parser.add_argument("--dry-run", action="store_true", - help="Vorschläge anzeigen, nichts schreiben") - parser.add_argument("--auto", action="store_true", - help="Kein interaktiver Review-Schritt") - parser.add_argument("--confidence", type=float, default=0.85, - help="Min-Konfidenz für --auto (default: 0.85)") - parser.add_argument("--rename", action="store_true", - help="Dateien nach Schema umbenennen: TT - Artist - Titel.ext") - parser.add_argument("--embed-cover", action="store_true", - help="Cover-Art in Audiodatei einbetten") - parser.add_argument("--backup", 
type=Path, - help="Backup-Verzeichnis vor Änderungen") - parser.add_argument("--report", type=Path, - help="CSV-Report der Änderungen") - parser.add_argument("--no-fingerprint", action="store_true", - help="AcoustID-Fingerprinting überspringen") - parser.add_argument("--no-api", action="store_true", - help="Keine externen API-Calls") - parser.add_argument("--no-cover", action="store_true", - help="Kein Cover-Art-Download") - parser.add_argument("--no-tqdm", action="store_true", - help="Fortschrittsanzeige deaktivieren") - - args = parser.parse_args() - - if not args.album and not args.paths: - parser.error("Mindestens ein Pfad oder --album erforderlich.") - - show_progress = HAS_TQDM and not args.no_tqdm and args.auto - report_data: List[Dict[str, Any]] = [] - totals: Dict[str, int] = { - "albums": 0, "skipped": 0, "tags_written": 0, - "covers_embedded": 0, "files_renamed": 0, "errors": 0, - } - - # Collect album directories - album_dirs: List[Path] = [] - if args.album: - album_dirs.append(args.album.expanduser().resolve()) - for raw in args.paths: - root = Path(raw).expanduser().resolve() - if not root.is_dir(): - print(f"⚠️ Kein Verzeichnis: {root}") - continue - album_dirs.extend(collect_album_dirs(root)) - - if not album_dirs: - print("⚠️ Keine Album-Verzeichnisse gefunden.") - sys.exit(1) - - print(f"🎵 {len(album_dirs)} Album-Verzeichnisse gefunden.") - if os.getenv("OLLAMA_HOST") or True: # Ollama always attempted - print("🤖 LLM-Resolve: Ollama → OpenRouter (kein Claude)") - if not args.no_api: - print("🔍 MusicBrainz-Lookup aktiv.") - if args.dry_run: - print("🧪 DRY-RUN — nichts wird geschrieben.") - - for album_dir in maybe_tqdm(album_dirs, show_progress, - desc="Alben", unit="album", dynamic_ncols=True): - stats = process_album(album_dir, args, report_data) - totals["albums"] += 1 - for k in ("skipped", "tags_written", "covers_embedded", "files_renamed", "errors"): - totals[k] += stats.get(k, 0) - - if args.report and report_data: - 
write_report(report_data, args.report) - - print(f"\n{'=' * 50}") - print("✅ Zusammenfassung:") - print(f" 💿 Alben verarbeitet: {totals['albums']}") - print(f" ⏭️ Übersprungen: {totals['skipped']}") - print(f" 🏷️ Tags geschrieben: {totals['tags_written']}") - print(f" 🖼️ Cover eingebettet: {totals['covers_embedded']}") - print(f" 📝 Dateien umbenannt: {totals['files_renamed']}") - print(f" ❌ Fehler: {totals['errors']}") - if args.dry_run: - print(" 🧪 Modus: DRY-RUN") - print("=" * 50) - - -if __name__ == "__main__": - main() diff --git a/scanner.py b/scanner.py deleted file mode 100644 index de06281..0000000 --- a/scanner.py +++ /dev/null @@ -1,99 +0,0 @@ -from __future__ import annotations - -import re -import sys -from pathlib import Path -from typing import List - -from models import AlbumScan, AUDIO_EXTENSIONS, IMAGE_EXTENSIONS, TRACKLIST_EXTENSIONS, PLAYLIST_EXTENSIONS - -_DISC_DIR_RE = re.compile(r"(?i)^(?:cd|disc|disk|side)[_ \-]*\d{1,2}$") - - -def _is_hidden(name: str) -> bool: - return name.startswith(".") or name.startswith("_") - - -def _is_disc_dir(name: str) -> bool: - """True für Ordner wie 'CD1', 'Disc 2', 'Side A', 'Disk_1'.""" - return bool(_DISC_DIR_RE.match(name)) - - -def scan_album(album_dir: Path) -> AlbumScan: - """ - Scannt ein Album-Verzeichnis. - - Rekursions-Regel: - - Hat das Album-Verzeichnis selbst Audio-Dateien → kein Abstieg in Unterordner - (Einzelscheibe; Sub-Ordner wie Artworks, Scans, irrtümliche Kopien werden ignoriert). - - Hat der Root KEINE Audio-Dateien → Abstieg nur in Disc-Unterordner (CD1, Disc 2 …). 
- """ - result = AlbumScan(album_dir=album_dir) - - # Erst nur die Wurzel-Ebene scannen, um zu entscheiden ob rekursiert wird - root_has_audio = any( - (album_dir / name).suffix.lower() in AUDIO_EXTENSIONS - for name in _listdir(album_dir) - if not _is_hidden(name) - ) - - if root_has_audio: - # Nur Root-Ebene — keine Unterordner - _scan_dir(album_dir, album_dir, result, recurse=False) - else: - # Kein Audio an der Wurzel → Multi-CD: nur Disc-Unterordner - _scan_dir(album_dir, album_dir, result, recurse=True) - - result.audio_files.sort() - result.image_files.sort() - result.tracklist_files.sort() - result.playlist_files.sort() - return result - - -def _listdir(path: Path) -> List[str]: - try: - return [e.name for e in path.iterdir()] - except (PermissionError, OSError) as e: - print(f"⚠️ Scan-Fehler: {e}", file=sys.stderr) - return [] - - -def _scan_dir(current: Path, album_dir: Path, result: AlbumScan, recurse: bool) -> None: - try: - entries = sorted(current.iterdir()) - except (PermissionError, OSError) as e: - print(f"⚠️ Scan-Fehler {current}: {e}", file=sys.stderr) - return - - for entry in entries: - name = entry.name - if _is_hidden(name): - continue - if entry.is_dir(): - if recurse and _is_disc_dir(name): - _scan_dir(entry, album_dir, result, recurse=True) - # Andere Unterordner (Artworks, irrtümliche Kopien…) werden übersprungen - elif entry.is_file(): - ext = entry.suffix.lower() - if ext in AUDIO_EXTENSIONS: - result.audio_files.append(entry) - elif ext in IMAGE_EXTENSIONS: - result.image_files.append(entry) - elif ext in TRACKLIST_EXTENSIONS: - result.tracklist_files.append(entry) - elif ext in PLAYLIST_EXTENSIONS: - result.playlist_files.append(entry) - else: - result.other_files.append(entry) - - -def collect_album_dirs(root: Path) -> List[Path]: - dirs: List[Path] = [] - try: - for item in sorted(root.iterdir()): - if item.is_dir() and not _is_hidden(item.name): - dirs.append(item) - except (PermissionError, OSError) as e: - print(f"⚠️ Lesefehler 
{root}: {e}", file=sys.stderr) - return dirs diff --git a/test_suite_enricher.py b/test_suite_enricher.py deleted file mode 100644 index 71bc588..0000000 --- a/test_suite_enricher.py +++ /dev/null @@ -1,274 +0,0 @@ -#!/usr/bin/env python3 -"""test_suite_enricher.py — Unit- und Integrationstests für music_enricher.""" -from __future__ import annotations - -import sys -import tempfile -import traceback -from pathlib import Path -from typing import Callable - -sys.path.insert(0, str(Path(__file__).parent)) - -from models import AlbumScan, TrackHints, AlbumHints - -RESULTS: list[dict] = [] - - -def record(test_id: str, passed: bool, detail: str = "") -> None: - RESULTS.append({"id": test_id, "status": "PASS" if passed else "FAIL", "detail": detail}) - - -def run_case(test_id: str, fn: Callable[[], str]) -> None: - try: - detail = fn() - record(test_id, True, detail) - except Exception: - record(test_id, False, traceback.format_exc()[:300]) - - -# --------------------------------------------------------------------------- -# hint_extractor Tests -# --------------------------------------------------------------------------- - -def test_parse_dirname_artist_album() -> str: - from hint_extractor import _parse_dirname - artist, album, year = _parse_dirname("Pink_Floyd_-_The_Wall") - assert artist and "Pink" in artist, f"artist: {artist}" - assert album and "Wall" in album, f"album: {album}" - return f"artist={artist!r}, album={album!r}" - - -def test_parse_dirname_with_year() -> str: - from hint_extractor import _parse_dirname - artist, album, year = _parse_dirname("Abba_-_Greatest_Hits_1992") - assert year == "1992", f"year: {year}" - return f"year={year}" - - -def test_parse_dirname_album_only() -> str: - from hint_extractor import _parse_dirname - artist, album, year = _parse_dirname("Beethoven_Complete_Edition") - assert album is not None, "album should not be None" - return f"album={album!r}" - - -def test_parse_filename_track_artist_title() -> str: - from 
hint_extractor import _parse_filename - r = _parse_filename("07 - ABBA - Dancing Queen") - assert r.get("track") == "07", f"track: {r}" - assert "ABBA" in r.get("artist", ""), f"artist: {r}" - assert "Dancing" in r.get("title", ""), f"title: {r}" - return str(r) - - -def test_parse_filename_disc_track_title() -> str: - from hint_extractor import _parse_filename - r = _parse_filename("2-07 - Bach - Toccata") - assert r.get("disc") == "2", f"disc: {r}" - assert r.get("track") == "07", f"track: {r}" - return str(r) - - -def test_parse_filename_track_title() -> str: - from hint_extractor import _parse_filename - r = _parse_filename("01 - Dancing Queen") - assert r.get("track") == "01", f"track: {r}" - assert "Dancing" in r.get("title", ""), f"title: {r}" - return str(r) - - -def test_parse_filename_artist_title() -> str: - from hint_extractor import _parse_filename - r = _parse_filename("Miles Davis - So What") - assert "Miles" in r.get("artist", ""), f"artist: {r}" - assert "What" in r.get("title", ""), f"title: {r}" - return str(r) - - -def test_parse_tracklist_numbered() -> str: - from hint_extractor import _parse_tracklist - text = "1. Dancing Queen\n2. Waterloo\n3. Fernando" - tracks = _parse_tracklist(text) - assert len(tracks) == 3, f"count: {len(tracks)}" - assert tracks[0]["title"] == "Dancing Queen", f"title: {tracks[0]}" - return f"{len(tracks)} tracks parsed" - - -def test_parse_tracklist_with_duration() -> str: - from hint_extractor import _parse_tracklist - text = "1-1 Toccata And Fugue 9:17\n1-2 Heartbeat 2:19\n2-1 Finale 5:00" - tracks = _parse_tracklist(text) - assert len(tracks) >= 2, f"count: {len(tracks)}" - assert tracks[0]["disc"] == "1", f"disc: {tracks[0]}" - return f"{len(tracks)} tracks parsed" - - -def test_parse_tracklist_with_disc_sections() -> str: - from hint_extractor import _parse_tracklist - text = "CD 1\n1. Track A\n2. Track B\nCD 2\n1. 
Track C" - tracks = _parse_tracklist(text) - disc2 = [t for t in tracks if t.get("disc") == "2"] - assert len(disc2) >= 1, f"disc2: {disc2}" - return f"{len(tracks)} total, {len(disc2)} on disc 2" - - -# --------------------------------------------------------------------------- -# Scanner Tests -# --------------------------------------------------------------------------- - -def test_scanner_classifies_files() -> str: - from scanner import scan_album - with tempfile.TemporaryDirectory() as tmpdir: - root = Path(tmpdir) / "TestAlbum" - root.mkdir() - (root / "01 - Song.mp3").write_bytes(b"\x00" * 100) - (root / "02 - Song.flac").write_bytes(b"\x00" * 100) - (root / "front.jpg").write_bytes(b"\xff\xd8" + b"\x00" * 100) - (root / "tracklist.txt").write_text("1. Track One\n2. Track Two") - (root / "notes.pdf").write_bytes(b"\x00" * 50) - - scan = scan_album(root) - assert len(scan.audio_files) == 2, f"audio: {scan.audio_files}" - assert len(scan.image_files) == 1, f"images: {scan.image_files}" - assert len(scan.tracklist_files) == 1, f"tracklists: {scan.tracklist_files}" - return "scan OK: 2 audio, 1 image, 1 tracklist" - - -def test_scanner_ignores_hidden() -> str: - from scanner import scan_album - with tempfile.TemporaryDirectory() as tmpdir: - root = Path(tmpdir) / "Album" - root.mkdir() - (root / "song.mp3").write_bytes(b"\x00" * 100) - (root / ".hidden.mp3").write_bytes(b"\x00" * 100) - (root / "_trash.mp3").write_bytes(b"\x00" * 100) - scan = scan_album(root) - assert len(scan.audio_files) == 1, f"should ignore hidden: {scan.audio_files}" - return "hidden files correctly ignored" - - -# --------------------------------------------------------------------------- -# extract_hints integration -# --------------------------------------------------------------------------- - -def test_extract_hints_from_scan() -> str: - from scanner import scan_album - from hint_extractor import extract_hints - with tempfile.TemporaryDirectory() as tmpdir: - root = Path(tmpdir) / 
"ABBA_-_Greatest_Hits" - root.mkdir() - (root / "01 - ABBA - Dancing Queen.mp3").write_bytes(b"\x00" * 1024) - (root / "02 - ABBA - Waterloo.mp3").write_bytes(b"\x00" * 1024) - (root / "tracklist.txt").write_text("1. Dancing Queen\n2. Waterloo\n") - - scan = scan_album(root) - hints = extract_hints(scan) - assert hints.dir_album is not None, "album hint missing" - assert len(hints.tracks) == 2, f"tracks: {len(hints.tracks)}" - assert hints.tracklist_text is not None, "tracklist not read" - return f"hints OK: album={hints.dir_album!r}, {len(hints.tracks)} tracks" - - -def test_extract_hints_multi_disc() -> str: - from scanner import scan_album - from hint_extractor import extract_hints - with tempfile.TemporaryDirectory() as tmpdir: - root = Path(tmpdir) / "Bach_Complete" - (root / "CD1").mkdir(parents=True) - (root / "CD2").mkdir() - (root / "CD1" / "01 - Toccata.mp3").write_bytes(b"\x00" * 1024) - (root / "CD2" / "01 - Fugue.mp3").write_bytes(b"\x00" * 1024) - - scan = scan_album(root) - hints = extract_hints(scan) - disc_nums = {t.disc_number for t in hints.tracks if t.disc_number} - assert 1 in disc_nums, f"disc 1 missing: {disc_nums}" - assert 2 in disc_nums, f"disc 2 missing: {disc_nums}" - return f"disc numbers detected: {disc_nums}" - - -# --------------------------------------------------------------------------- -# executor Tests -# --------------------------------------------------------------------------- - -def test_proposed_filename_single_disc() -> str: - from executor import _proposed_filename - from models import TrackProposal - from pathlib import Path - # Pop schema: albumartist == track artist → TT_-_Artist_-_Title - tp = TrackProposal(path=Path("dummy.mp3"), title="Dancing Queen", - artist="ABBA", track_number=1, disc_number=None) - name = _proposed_filename(tp, ".mp3", albumartist="ABBA") - assert name == "01_-_ABBA_-_Dancing_Queen.mp3", f"got: {name!r}" - return name - - -def test_proposed_filename_multi_disc() -> str: - from executor import 
_proposed_filename - from models import TrackProposal - from pathlib import Path - # Classical schema: albumartist (performer) ≠ track artist (composer) - tp = TrackProposal(path=Path("dummy.flac"), title="Toccata", - artist="Bach", track_number=7, disc_number=2) - name = _proposed_filename(tp, ".flac", albumartist="Gardiner") - assert name == "2-07_-_Gardiner_-_Bach_-_Toccata.flac", f"got: {name!r}" - return name - - -def test_proposed_filename_sanitizes_chars() -> str: - from executor import _proposed_filename - from models import TrackProposal - from pathlib import Path - tp = TrackProposal(path=Path("x.mp3"), title='Track: "Live" / Today', - artist="Artist?", track_number=3, disc_number=None) - name = _proposed_filename(tp, ".mp3") - assert "/" not in name and ":" not in name, f"unsafe chars in: {name!r}" - return name - - -# --------------------------------------------------------------------------- -# Runner -# --------------------------------------------------------------------------- - -def main() -> None: - print("🧪 Starte Music Metadata Enricher Tests...") - - cases = [ - ("UNIT_01_parse_dirname_artist_album", test_parse_dirname_artist_album), - ("UNIT_02_parse_dirname_with_year", test_parse_dirname_with_year), - ("UNIT_03_parse_dirname_album_only", test_parse_dirname_album_only), - ("UNIT_04_parse_filename_track_artist_title", test_parse_filename_track_artist_title), - ("UNIT_05_parse_filename_disc_track_title", test_parse_filename_disc_track_title), - ("UNIT_06_parse_filename_track_title", test_parse_filename_track_title), - ("UNIT_07_parse_filename_artist_title", test_parse_filename_artist_title), - ("UNIT_08_parse_tracklist_numbered", test_parse_tracklist_numbered), - ("UNIT_09_parse_tracklist_with_duration", test_parse_tracklist_with_duration), - ("UNIT_10_parse_tracklist_disc_sections", test_parse_tracklist_with_disc_sections), - ("UNIT_11_scanner_classifies_files", test_scanner_classifies_files), - ("UNIT_12_scanner_ignores_hidden", 
test_scanner_ignores_hidden), - ("UNIT_13_extract_hints_from_scan", test_extract_hints_from_scan), - ("UNIT_14_extract_hints_multi_disc", test_extract_hints_multi_disc), - ("UNIT_15_proposed_filename_single_disc", test_proposed_filename_single_disc), - ("UNIT_16_proposed_filename_multi_disc", test_proposed_filename_multi_disc), - ("UNIT_17_proposed_filename_sanitizes_chars", test_proposed_filename_sanitizes_chars), - ] - - for test_id, fn in cases: - run_case(test_id, fn) - - print("=" * 70) - for r in RESULTS: - icon = "✅" if r["status"] == "PASS" else "❌" - detail = r["detail"][:100] + "..." if len(r["detail"]) > 100 else r["detail"] - print(f"{icon} [{r['status']}] {r['id']} {detail}") - print("=" * 70) - - passed = sum(1 for r in RESULTS if r["status"] == "PASS") - total = len(RESULTS) - print(f"📊 {passed}/{total} Tests erfolgreich") - sys.exit(0 if passed == total else 1) - - -if __name__ == "__main__": - main()