From f7cf520dbe287f30fe754f957956f1d99c710690 Mon Sep 17 00:00:00 2001 From: dschlueter Date: Tue, 28 Apr 2026 16:55:18 +0200 Subject: [PATCH 01/11] Initial implementation of Music Metadata Enricher MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit AI-powered per-album pipeline: scan → local hints → MusicBrainz/Discogs/Claude resolve → cover art → interactive or auto review → tag write + rename + report. All external dependencies optional; 17/17 unit tests passing. Co-Authored-By: Claude Sonnet 4.6 --- cover_handler.py | 171 +++++++++++++++++ executor.py | 228 +++++++++++++++++++++++ hint_extractor.py | 260 ++++++++++++++++++++++++++ metadata_resolver.py | 410 +++++++++++++++++++++++++++++++++++++++++ models.py | 79 ++++++++ music_enricher.py | 269 +++++++++++++++++++++++++++ scanner.py | 59 ++++++ test_suite_enricher.py | 272 +++++++++++++++++++++++++++ 8 files changed, 1748 insertions(+) create mode 100644 cover_handler.py create mode 100644 executor.py create mode 100644 hint_extractor.py create mode 100644 metadata_resolver.py create mode 100644 models.py create mode 100644 music_enricher.py create mode 100644 scanner.py create mode 100644 test_suite_enricher.py diff --git a/cover_handler.py b/cover_handler.py new file mode 100644 index 0000000..221c772 --- /dev/null +++ b/cover_handler.py @@ -0,0 +1,171 @@ +from __future__ import annotations + +import sys +import tempfile +import time +from pathlib import Path +from typing import Optional, List + +try: + from PIL import Image + HAS_PIL = True +except ImportError: + HAS_PIL = False + +try: + import requests + HAS_REQUESTS = True +except ImportError: + HAS_REQUESTS = False + +try: + import musicbrainzngs as mb + HAS_MB = True +except ImportError: + HAS_MB = False + +try: + from mutagen.id3 import ID3, APIC, error as ID3Error + from mutagen.flac import FLAC, Picture + from mutagen.mp4 import MP4, MP4Cover + HAS_MUTAGEN = True +except ImportError: + HAS_MUTAGEN = False + 
+_MIN_COVER_SIZE = 200 # pixels + + +def _image_ok(path: Path) -> bool: + if not HAS_PIL: + return path.stat().st_size > 5000 + try: + with Image.open(path) as img: + w, h = img.size + return w >= _MIN_COVER_SIZE and h >= _MIN_COVER_SIZE + except Exception: + return False + + +def find_local_cover(image_files: List[Path]) -> Optional[Path]: + priority = ("front", "folder", "cover", "album") + # Sort by priority keyword, then size descending + def key(p: Path): + name = p.name.lower() + score = next((i for i, kw in enumerate(priority) if kw in name), len(priority)) + size = p.stat().st_size if p.exists() else 0 + return (score, -size) + + for p in sorted(image_files, key=key): + if _image_ok(p): + return p + return None + + +def _mb_cover_url(release_mbid: str) -> Optional[str]: + url = f"https://coverartarchive.org/release/{release_mbid}/front" + if not HAS_REQUESTS: + return None + try: + r = requests.head(url, timeout=5, allow_redirects=True) + if r.status_code == 200: + return url + except Exception: + pass + return None + + +def download_cover(release_mbid: Optional[str], dest_dir: Path) -> Optional[Path]: + if not release_mbid or not HAS_REQUESTS: + return None + url = _mb_cover_url(release_mbid) + if not url: + return None + try: + r = requests.get(url, timeout=15) + if r.status_code == 200: + ext = ".jpg" + ct = r.headers.get("content-type", "") + if "png" in ct: + ext = ".png" + dest = dest_dir / f"_cover_download{ext}" + dest.write_bytes(r.content) + if _image_ok(dest): + return dest + dest.unlink(missing_ok=True) + except Exception as e: + print(f" ⚠️ Cover-Download-Fehler: {e}", file=sys.stderr) + return None + + +def embed_cover(audio_path: Path, cover_path: Path) -> bool: + if not HAS_MUTAGEN: + return False + try: + img_data = cover_path.read_bytes() + mime = "image/jpeg" if cover_path.suffix.lower() in (".jpg", ".jpeg") else "image/png" + ext = audio_path.suffix.lower() + + if ext == ".mp3": + try: + tags = ID3(str(audio_path)) + except ID3Error: + 
tags = ID3() + tags.delall("APIC") + tags.add(APIC(encoding=3, mime=mime, type=3, desc="Cover", data=img_data)) + tags.save(str(audio_path), v2_version=4) + return True + + elif ext == ".flac": + audio = FLAC(str(audio_path)) + audio.clear_pictures() + pic = Picture() + pic.type = 3 + pic.mime = mime + pic.desc = "Cover" + pic.data = img_data + audio.add_picture(pic) + audio.save() + return True + + elif ext == ".m4a": + audio = MP4(str(audio_path)) + fmt = MP4Cover.FORMAT_JPEG if mime == "image/jpeg" else MP4Cover.FORMAT_PNG + audio.tags["covr"] = [MP4Cover(img_data, imageformat=fmt)] + audio.save() + return True + + else: + # Generic mutagen fallback + from mutagen import File as MutagenFile + audio = MutagenFile(str(audio_path), easy=False) + if audio is not None: + if audio.tags is None: + audio.add_tags() + if hasattr(audio.tags, "add"): + audio.tags.add( + APIC(encoding=3, mime=mime, type=3, desc="Cover", data=img_data) + ) + audio.save() + return True + + except Exception as e: + print(f" ⚠️ Cover-Einbettungsfehler {audio_path.name}: {e}", file=sys.stderr) + return False + + +def resolve_cover( + image_files: List[Path], + release_mbid: Optional[str], + album_dir: Path, +) -> tuple[Optional[Path], Optional[str]]: + """Returns (cover_path, source_label).""" + local = find_local_cover(image_files) + if local: + return local, "local" + + if release_mbid: + downloaded = download_cover(release_mbid, album_dir) + if downloaded: + return downloaded, "musicbrainz" + + return None, None diff --git a/executor.py b/executor.py new file mode 100644 index 0000000..077c3e6 --- /dev/null +++ b/executor.py @@ -0,0 +1,228 @@ +from __future__ import annotations + +import csv +import re +import shutil +import sys +from pathlib import Path +from typing import Optional, List, Dict, Any + +from models import AlbumProposal, TrackProposal + +try: + from mutagen import File as MutagenFile + from mutagen.easyid3 import EasyID3 + from mutagen.flac import FLAC + from mutagen.mp4 import 
MP4, MP4Tags + HAS_MUTAGEN = True +except ImportError: + HAS_MUTAGEN = False + +from cover_handler import embed_cover + +_SAFE_RE = re.compile(r'[<>:"/\\|?*\x00-\x1f]') +REPORT_FIELDS = [ + "status", "album_dir", "track_path", + "old_title", "new_title", + "old_artist", "new_artist", + "album", "albumartist", "date", "genre", "label", + "track_number", "disc_number", + "cover_embedded", "renamed_to", + "confidence", "sources", +] + + +def _safe_name(s: str) -> str: + return _SAFE_RE.sub("_", s).strip(". ") + + +def _proposed_filename(proposal: TrackProposal, ext: str) -> str: + tn = f"{proposal.track_number:02d}" if proposal.track_number else "00" + prefix = f"{proposal.disc_number}-{tn}" if proposal.disc_number and proposal.disc_number > 1 else tn + artist = _safe_name(proposal.artist or "Unknown") + title = _safe_name(proposal.title or "Unknown") + return f"{prefix} - {artist} - {title}{ext}" + + +def backup_file(path: Path, backup_dir: Path) -> bool: + try: + backup_dir.mkdir(parents=True, exist_ok=True) + rel = path.parent.name + "__" + path.name + dest = backup_dir / rel + if not dest.exists(): + shutil.copy2(path, dest) + return True + except Exception as e: + print(f" ⚠️ Backup-Fehler {path.name}: {e}", file=sys.stderr) + return False + + +def write_tags(path: Path, proposal: TrackProposal, album_proposal: AlbumProposal) -> bool: + if not HAS_MUTAGEN: + return False + ext = path.suffix.lower() + tags_to_write = { + "title": proposal.title or "", + "artist": proposal.artist or "", + "album": album_proposal.album or "", + "albumartist": album_proposal.albumartist or "", + } + if proposal.track_number: + total = len(album_proposal.tracks) + tags_to_write["tracknumber"] = f"{proposal.track_number}/{total}" + if proposal.disc_number: + tags_to_write["discnumber"] = str(proposal.disc_number) + if album_proposal.date: + tags_to_write["date"] = album_proposal.date + if album_proposal.genre: + tags_to_write["genre"] = album_proposal.genre + if album_proposal.label: + 
tags_to_write["organization"] = album_proposal.label + + try: + if ext == ".mp3": + try: + audio = EasyID3(str(path)) + except Exception: + audio = EasyID3() + audio.save(str(path)) + audio = EasyID3(str(path)) + for k, v in tags_to_write.items(): + audio[k] = [v] + audio.save(v2_version=4) + return True + + elif ext == ".flac": + audio = FLAC(str(path)) + for k, v in tags_to_write.items(): + audio[k] = [v] + audio.save() + return True + + elif ext == ".m4a": + audio = MP4(str(path)) + mapping = { + "title": "\xa9nam", "artist": "\xa9ART", + "album": "\xa9alb", "albumartist": "aART", + "tracknumber": "trkn", "date": "\xa9day", + "genre": "\xa9gen", + } + for k, v in tags_to_write.items(): + tag_key = mapping.get(k) + if tag_key: + if tag_key == "trkn": + try: + num, total = v.split("/") if "/" in v else (v, "0") + audio[tag_key] = [(int(num), int(total))] + except Exception: + pass + else: + audio[tag_key] = [v] + audio.save() + return True + + else: + audio = MutagenFile(str(path), easy=True) + if audio is not None: + if audio.tags is None: + audio.add_tags() + for k, v in tags_to_write.items(): + try: + audio[k] = [v] + except Exception: + pass + audio.save() + return True + + except Exception as e: + print(f" ⚠️ Tag-Schreibfehler {path.name}: {e}", file=sys.stderr) + return False + + +def execute_album( + proposal: AlbumProposal, + backup_dir: Optional[Path], + do_rename: bool, + embed_cover_art: bool, + dry_run: bool, + report_data: List[Dict[str, Any]], +) -> Dict[str, int]: + stats = {"tags_written": 0, "covers_embedded": 0, "files_renamed": 0, "errors": 0} + + for tp in proposal.tracks: + old_title = tp.path.stem + old_artist = "" + if HAS_MUTAGEN: + try: + audio = MutagenFile(str(tp.path), easy=True) + if audio and audio.tags: + old_artist = str(audio.tags.get("artist", [""])[0]) + old_title = str(audio.tags.get("title", [tp.path.stem])[0]) + except Exception: + pass + + new_path = tp.path + renamed_to = "" + cover_embedded = False + + if not dry_run: + if 
backup_dir: + backup_file(tp.path, backup_dir) + + if write_tags(tp.path, tp, proposal): + stats["tags_written"] += 1 + else: + stats["errors"] += 1 + + if embed_cover_art and proposal.cover_path: + if embed_cover(tp.path, proposal.cover_path): + stats["covers_embedded"] += 1 + cover_embedded = True + + if do_rename: + new_name = _proposed_filename(tp, tp.path.suffix) + candidate = tp.path.parent / new_name + if candidate != tp.path: + try: + tp.path.rename(candidate) + new_path = candidate + renamed_to = new_name + stats["files_renamed"] += 1 + except Exception as e: + print(f" ⚠️ Umbenennungsfehler {tp.path.name}: {e}", file=sys.stderr) + stats["errors"] += 1 + + report_data.append({ + "status": "dry-run" if dry_run else "ok", + "album_dir": str(proposal.album_dir.name), + "track_path": str(new_path), + "old_title": old_title, + "new_title": tp.title, + "old_artist": old_artist, + "new_artist": tp.artist, + "album": proposal.album, + "albumartist": proposal.albumartist, + "date": proposal.date or "", + "genre": proposal.genre or "", + "label": proposal.label or "", + "track_number": tp.track_number or "", + "disc_number": tp.disc_number or "", + "cover_embedded": cover_embedded, + "renamed_to": renamed_to, + "confidence": f"{proposal.confidence:.2f}", + "sources": ", ".join(proposal.sources), + }) + + return stats + + +def write_report(report_data: List[Dict[str, Any]], report_path: Path) -> None: + try: + report_path.parent.mkdir(parents=True, exist_ok=True) + with report_path.open("w", encoding="utf-8", newline="") as f: + w = csv.DictWriter(f, fieldnames=REPORT_FIELDS) + w.writeheader() + w.writerows(report_data) + print(f"📊 Report gespeichert: {report_path}") + except Exception as e: + print(f"⚠️ Report-Fehler: {e}", file=sys.stderr) diff --git a/hint_extractor.py b/hint_extractor.py new file mode 100644 index 0000000..fbac646 --- /dev/null +++ b/hint_extractor.py @@ -0,0 +1,260 @@ +from __future__ import annotations + +import re +import sys +from pathlib 
import Path +from typing import Optional, List, Dict, Tuple + +from models import AlbumScan, AlbumHints, TrackHints + +try: + from mutagen import File as MutagenFile + HAS_MUTAGEN = True +except ImportError: + HAS_MUTAGEN = False + +try: + from bs4 import BeautifulSoup + HAS_BS4 = True +except ImportError: + HAS_BS4 = False + +_NATSORT_RE = re.compile(r"(\d+)") +_BAD_VALUES = {"unknown", "unknown artist", "unknown album", "untitled", "track", "va", "various"} + +# Filename patterns: most specific first +_FILENAME_PATTERNS = [ + re.compile(r"^(?P\d{1,2})[- _]+(?P\d{1,3})\s*[-._ ]+\s*(?P.+?)\s*[-–]\s*(?P.+)$"), + re.compile(r"^(?P<disc>\d{1,2})[- _]+(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<title>.+)$"), + re.compile(r"^(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<artist>.+?)\s*[-–]\s*(?P<title>.+)$"), + re.compile(r"^(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<title>.+)$"), + re.compile(r"^(?P<artist>.+?)\s*[-–]\s*(?P<title>.+)$"), +] + +# Directory name patterns +_DIR_PATTERNS = [ + re.compile(r"^(?P<artist>.+?)[_ -]+[-–][_ -]+(?P<album>.+?)(?:[_ -]+(?P<year>\d{4}))?$"), + re.compile(r"^(?P<artist>.+?)[_ ]+(?P<year>\d{4})[._ -]+(?P<album>.+)$"), + re.compile(r"^(?P<album>.+?)[_ -]+(?P<year>\d{4})$"), +] + +# Tracklist line patterns +_TRACKLIST_PATTERNS = [ + re.compile(r"^(?P<disc>\d{1,2})[- _](?P<track>\d{1,3})\s+(?P<title>.+?)(?:\s+\d+:\d{2})?$"), + re.compile(r"^(?P<track>\d{1,3})[.):\s]+(?P<title>.+?)(?:\s+\d+:\d{2})?$"), + re.compile(r"^(?P<track>[A-Z]\d{1,2})[.):\s]+(?P<title>.+?)(?:\s+\d+:\d{2})?$"), +] + +_DISC_SECTION_RE = re.compile(r"(?i)(?:cd|disc|disk|side)[_ \-]*(\d{1,2})") + + +def _clean(s: Optional[str]) -> str: + if not s: + return "" + return re.sub(r"\s+", " ", s.replace("_", " ")).strip(" -._") + + +def _is_good(v: Optional[str]) -> bool: + if not v: + return False + return _clean(v).casefold() not in _BAD_VALUES + + +def _parse_dirname(name: str) -> Tuple[Optional[str], Optional[str], Optional[str]]: + name_clean = _clean(name) + for pat in _DIR_PATTERNS: + m = 
pat.match(name_clean) + if m: + d = m.groupdict() + artist = _clean(d.get("artist")) or None + album = _clean(d.get("album")) or None + year = d.get("year") + if _is_good(artist) or _is_good(album): + return artist, album, year + # No pattern matched — treat whole name as album + return None, _clean(name_clean), None + + +def _parse_filename(stem: str) -> Dict[str, str]: + stem_clean = _clean(stem) + for pat in _FILENAME_PATTERNS: + m = pat.match(stem_clean) + if m: + return {k: _clean(v) for k, v in m.groupdict().items() if v} + return {"title": stem_clean} + + +def _read_tags(path: Path) -> Tuple[Dict[str, str], Optional[float]]: + if not HAS_MUTAGEN: + return {}, None + try: + audio = MutagenFile(str(path), easy=True) + if not audio: + return {}, None + tags: Dict[str, str] = {} + for k in ("title", "artist", "album", "albumartist", "tracknumber", + "discnumber", "date", "year", "genre", "label", "organization"): + v = audio.get(k) + if v: + tags[k] = str(v[0]).strip() + if "year" in tags and "date" not in tags: + tags["date"] = tags["year"] + duration = None + if hasattr(audio, "info") and audio.info and hasattr(audio.info, "length"): + duration = audio.info.length + return tags, duration + except Exception as e: + print(f" ⚠️ Tag-Lesefehler {path.name}: {e}", file=sys.stderr) + return {}, None + + +def _parse_tracklist(text: str) -> List[Dict[str, str]]: + tracks: List[Dict[str, str]] = [] + current_disc = 1 + + for line in text.splitlines(): + line = line.strip() + if not line: + continue + + disc_m = _DISC_SECTION_RE.match(line) + if disc_m and len(line) < 30: + current_disc = int(disc_m.group(1)) + continue + + for pat in _TRACKLIST_PATTERNS: + m = pat.match(line) + if m: + d = m.groupdict() + entry: Dict[str, str] = {"title": _clean(d.get("title", ""))} + raw_track = d.get("track", "") + if raw_track and raw_track.isdigit(): + entry["track"] = raw_track.lstrip("0") or "0" + elif raw_track: + entry["track"] = raw_track + if "disc" in d and d["disc"]: + 
entry["disc"] = d["disc"] + else: + entry["disc"] = str(current_disc) + if entry.get("title"): + tracks.append(entry) + break + + return tracks + + +def _read_tracklist_file(path: Path) -> Optional[str]: + try: + if path.suffix.lower() in (".htm", ".html"): + raw = path.read_bytes() + encoding = "utf-8" + for enc in ("utf-8", "latin-1", "cp1252"): + try: + raw.decode(enc) + encoding = enc + break + except UnicodeDecodeError: + continue + text = raw.decode(encoding, errors="replace") + if HAS_BS4: + soup = BeautifulSoup(text, "html.parser") + return soup.get_text(separator="\n") + # Fallback: strip HTML tags + return re.sub(r"<[^>]+>", " ", text) + else: + for enc in ("utf-8", "latin-1", "cp1252"): + try: + return path.read_text(encoding=enc) + except UnicodeDecodeError: + continue + except Exception as e: + print(f" ⚠️ Tracklist-Lesefehler {path.name}: {e}", file=sys.stderr) + return None + + +def _check_cover_images(paths: List[Path]) -> List[Path]: + good: List[Path] = [] + for p in paths: + name_lower = p.name.lower() + # Prefer front covers + if any(kw in name_lower for kw in ("front", "folder", "cover", "album")): + good.insert(0, p) + else: + good.append(p) + return good + + +def extract_hints(scan: AlbumScan) -> AlbumHints: + hints = AlbumHints(album_dir=scan.album_dir) + + # Directory name + hints.dir_artist, hints.dir_album, hints.dir_year = _parse_dirname(scan.album_dir.name) + + # Cover images + hints.cover_images = _check_cover_images(scan.image_files) + + # Tracklist files + texts: List[str] = [] + for tf in scan.tracklist_files: + txt = _read_tracklist_file(tf) + if txt: + texts.append(txt) + hints.tracklist_text = "\n\n".join(texts) if texts else None + + parsed_tracklist = _parse_tracklist(hints.tracklist_text) if hints.tracklist_text else [] + + # Build TrackHints per audio file + for audio_path in sorted(scan.audio_files): + tags, duration = _read_tags(audio_path) + fn_hints = _parse_filename(audio_path.stem) + + track_num: Optional[int] = None + 
disc_num: Optional[int] = None + + # Track number: tag > filename + raw_tn = tags.get("tracknumber") or fn_hints.get("track") + if raw_tn: + try: + track_num = int(str(raw_tn).split("/")[0]) + except ValueError: + pass + + # Disc number: tag > filename > path segment + raw_dn = tags.get("discnumber") or fn_hints.get("disc") + if raw_dn: + try: + disc_num = int(str(raw_dn).split("/")[0]) + except ValueError: + pass + if not disc_num: + for part in audio_path.relative_to(scan.album_dir).parts[:-1]: + dm = _DISC_SECTION_RE.search(part) + if dm: + disc_num = int(dm.group(1)) + break + + title = tags.get("title") or fn_hints.get("title") + artist = tags.get("artist") or fn_hints.get("artist") + + # Enrich from parsed tracklist if track_num matches + if parsed_tracklist and track_num: + for tl_entry in parsed_tracklist: + tl_track = tl_entry.get("track") + tl_disc = tl_entry.get("disc", "1") + if (tl_track and int(tl_track) == track_num + and int(tl_disc) == (disc_num or 1)): + if not _is_good(title) and _is_good(tl_entry.get("title")): + title = tl_entry["title"] + break + + hints.tracks.append(TrackHints( + path=audio_path, + track_number=track_num, + disc_number=disc_num, + title=_clean(title) if title else None, + artist=_clean(artist) if artist else None, + duration=duration, + existing_tags=tags, + )) + + return hints diff --git a/metadata_resolver.py b/metadata_resolver.py new file mode 100644 index 0000000..32a0642 --- /dev/null +++ b/metadata_resolver.py @@ -0,0 +1,410 @@ +from __future__ import annotations + +import os +import sys +import time +from typing import Optional, List, Dict, Tuple + +from models import AlbumHints, AlbumProposal, TrackProposal + +try: + import musicbrainzngs as mb + mb.set_useragent("MusicMetadataEnricher", "1.0", "https://github.com/dschlueter") + HAS_MB = True +except ImportError: + HAS_MB = False + +try: + import acoustid + HAS_ACOUSTID = True +except ImportError: + HAS_ACOUSTID = False + +try: + import discogs_client as dc + 
HAS_DISCOGS = True +except ImportError: + HAS_DISCOGS = False + +try: + import anthropic + HAS_ANTHROPIC = True +except ImportError: + HAS_ANTHROPIC = False + +_MB_RATE_LIMIT = 1.1 # seconds between MusicBrainz requests +_last_mb_call = 0.0 +ACOUSTID_API_KEY = os.getenv("ACOUSTID_API_KEY", "") +ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "") +DISCOGS_TOKEN = os.getenv("DISCOGS_TOKEN", "") + + +def _mb_wait(): + global _last_mb_call + elapsed = time.monotonic() - _last_mb_call + if elapsed < _MB_RATE_LIMIT: + time.sleep(_MB_RATE_LIMIT - elapsed) + _last_mb_call = time.monotonic() + + +# --------------------------------------------------------------------------- +# AcoustID fingerprinting +# --------------------------------------------------------------------------- + +def _fingerprint_tracks(hints: AlbumHints) -> Dict[str, List[str]]: + """Returns {audio_path_str: [mbid, ...]}""" + if not HAS_ACOUSTID or not ACOUSTID_API_KEY: + return {} + results: Dict[str, List[str]] = {} + for t in hints.tracks: + try: + duration, fp = acoustid.fingerprint_file(str(t.path)) + response = acoustid.lookup(ACOUSTID_API_KEY, fp, duration, + meta="recordings releasegroups") + mbids: List[str] = [] + for result in response.get("results", []): + if result.get("score", 0) >= 0.90: + for rec in result.get("recordings", []): + mbids.append(rec["id"]) + results[str(t.path)] = mbids + except Exception as e: + print(f" ⚠️ AcoustID-Fehler {t.path.name}: {e}", file=sys.stderr) + return results + + +# --------------------------------------------------------------------------- +# MusicBrainz lookup +# --------------------------------------------------------------------------- + +def _mb_search_release(artist: Optional[str], album: Optional[str], + year: Optional[str]) -> Optional[Dict]: + if not HAS_MB or (not artist and not album): + return None + query_parts = [] + if album: + query_parts.append(f'release:"{album}"') + if artist: + query_parts.append(f'artist:"{artist}"') + if year: + 
query_parts.append(f'date:{year}') + query = " AND ".join(query_parts) + try: + _mb_wait() + result = mb.search_releases(query=query, limit=3) + releases = result.get("release-list", []) + if not releases: + return None + # Take highest-score release + best = max(releases, key=lambda r: int(r.get("ext:score", 0))) + score = int(best.get("ext:score", 0)) + if score < 70: + return None + return best + except Exception as e: + print(f" ⚠️ MusicBrainz-Suchfehler: {e}", file=sys.stderr) + return None + + +def _mb_get_release_tracks(release_id: str) -> Optional[List[Dict]]: + if not HAS_MB: + return None + try: + _mb_wait() + result = mb.get_release_by_id( + release_id, + includes=["recordings", "artist-credits", "labels", "release-groups"], + ) + return result.get("release") + except Exception as e: + print(f" ⚠️ MusicBrainz-Release-Fehler: {e}", file=sys.stderr) + return None + + +def _mb_recording_to_release(recording_mbid: str) -> Optional[Dict]: + if not HAS_MB: + return None + try: + _mb_wait() + result = mb.get_recording_by_id( + recording_mbid, + includes=["releases", "artist-credits", "release-groups"], + ) + rec = result.get("recording", {}) + releases = rec.get("release-list", []) + if releases: + return releases[0] + return None + except Exception as e: + print(f" ⚠️ MusicBrainz-Recording-Fehler: {e}", file=sys.stderr) + return None + + +# --------------------------------------------------------------------------- +# Discogs fallback +# --------------------------------------------------------------------------- + +def _discogs_search(artist: Optional[str], album: Optional[str]) -> Optional[Dict]: + if not HAS_DISCOGS or not DISCOGS_TOKEN: + return None + try: + client = dc.Client("MusicMetadataEnricher/1.0", user_token=DISCOGS_TOKEN) + results = client.search( + album or artist or "", + artist=artist or "", + type="release", + ) + if results.count: + r = results[0] + return { + "album": r.title, + "artist": r.artists[0].name if r.artists else None, + "year": 
str(r.year) if r.year else None, + "genre": r.genres[0] if r.genres else None, + "label": r.labels[0].name if r.labels else None, + "id": r.id, + } + except Exception as e: + print(f" ⚠️ Discogs-Fehler: {e}", file=sys.stderr) + return None + + +# --------------------------------------------------------------------------- +# Claude API reasoning (optional) +# --------------------------------------------------------------------------- + +def _claude_resolve(hints: AlbumHints, partial: Dict) -> Optional[Dict]: + if not HAS_ANTHROPIC or not ANTHROPIC_API_KEY: + return None + try: + client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY) + tracks_summary = "\n".join( + f" - Track {t.track_number or '?'}: {t.title or t.path.stem}" + + (f" [{t.artist}]" if t.artist else "") + for t in hints.tracks[:20] + ) + prompt = f"""Du bist ein Musikexperte. Analysiere diese Album-Daten und vervollständige die fehlenden Felder. + +Verzeichnisname: {hints.album_dir.name} +Bekannte Artist: {hints.dir_artist or partial.get('artist', 'unbekannt')} +Bekannter Albumtitel: {hints.dir_album or partial.get('album', 'unbekannt')} +Jahr: {hints.dir_year or partial.get('year', 'unbekannt')} +Tracklist-Hinweise: +{tracks_summary} + +Antworte NUR mit einem JSON-Objekt mit diesen Feldern (null wenn unbekannt): +{{"artist": ..., "album": ..., "albumartist": ..., "year": ..., "genre": ..., "label": ...}}""" + + message = client.messages.create( + model="claude-haiku-4-5-20251001", + max_tokens=300, + messages=[{"role": "user", "content": prompt}], + ) + import json + text = message.content[0].text.strip() + # Extract JSON from response + json_match = __import__("re").search(r"\{.*\}", text, __import__("re").DOTALL) + if json_match: + return json.loads(json_match.group()) + except Exception as e: + print(f" ⚠️ Claude-API-Fehler: {e}", file=sys.stderr) + return None + + +# --------------------------------------------------------------------------- +# Main resolver +# 
--------------------------------------------------------------------------- + +def resolve( + hints: AlbumHints, + use_fingerprint: bool = True, + use_api: bool = True, + use_claude: bool = True, +) -> AlbumProposal: + confidence = 0.0 + sources: List[str] = [] + notes: List[str] = [] + + artist = hints.dir_artist + album = hints.dir_album + year = hints.dir_year + genre: Optional[str] = None + label: Optional[str] = None + release_mbid: Optional[str] = None + mb_tracks: Optional[List] = None + + # Collect artist/album from existing tags (majority vote) + tag_artists = [t.existing_tags.get("artist") for t in hints.tracks if t.existing_tags.get("artist")] + tag_albums = [t.existing_tags.get("album") for t in hints.tracks if t.existing_tags.get("album")] + if tag_artists: + from collections import Counter + artist = artist or Counter(tag_artists).most_common(1)[0][0] + if tag_albums: + from collections import Counter + album = album or Counter(tag_albums).most_common(1)[0][0] + + # Tag year/genre/label + for t in hints.tracks: + year = year or t.existing_tags.get("date") or t.existing_tags.get("year") + genre = genre or t.existing_tags.get("genre") + label = label or t.existing_tags.get("label") or t.existing_tags.get("organization") + + if artist or album: + confidence += 0.05 + sources.append("local-hints") + + # AcoustID fingerprinting + fp_mbids: Dict[str, List[str]] = {} + if use_fingerprint and use_api and HAS_ACOUSTID and ACOUSTID_API_KEY: + fp_mbids = _fingerprint_tracks(hints) + if fp_mbids: + confidence += 0.20 + sources.append("acoustid") + # Try to get release from first matched recording + for mbids in fp_mbids.values(): + for mbid in mbids[:1]: + rel = _mb_recording_to_release(mbid) + if rel: + release_mbid = rel.get("id") + confidence += 0.25 + sources.append("musicbrainz-fingerprint") + break + if release_mbid: + break + + # MusicBrainz text search + if use_api and HAS_MB and not release_mbid: + mb_result = _mb_search_release(artist, album, year) + if 
mb_result: + release_mbid = mb_result.get("id") + score = int(mb_result.get("ext:score", 0)) + confidence += 0.30 * (score / 100) + sources.append("musicbrainz-text") + notes.append(f"MusicBrainz score: {score}") + + # Fetch full release data + if use_api and release_mbid: + full_release = _mb_get_release_tracks(release_mbid) + if full_release: + if not artist: + creds = full_release.get("artist-credit", []) + artist = "".join(c.get("artist", {}).get("name", "") + c.get("joinphrase", "") + for c in creds if isinstance(c, dict)).strip() or artist + if not album: + album = full_release.get("title", album) + if not year: + year = full_release.get("date", "")[:4] or None + label_info = full_release.get("label-info-list", []) + if label_info and not label: + label = label_info[0].get("label", {}).get("name") if label_info else None + rg = full_release.get("release-group", {}) + if not genre: + genre = (rg.get("primary-type") or "").strip() or None + mb_tracks = [] + for medium in full_release.get("medium-list", []): + disc_num = medium.get("position", 1) + for track in medium.get("track-list", []): + mb_tracks.append({ + "disc": disc_num, + "number": int(track.get("number", 0) or 0), + "title": track.get("recording", {}).get("title", ""), + "artist": track.get("artist-credit-phrase", ""), + "mbid": track.get("recording", {}).get("id"), + }) + + # Discogs fallback + if use_api and HAS_DISCOGS and DISCOGS_TOKEN and not release_mbid: + dg = _discogs_search(artist, album) + if dg: + artist = artist or dg.get("artist") + album = album or dg.get("album") + year = year or dg.get("year") + genre = genre or dg.get("genre") + label = label or dg.get("label") + confidence += 0.15 + sources.append("discogs") + + # Claude API for remaining gaps + partial = {"artist": artist, "album": album, "year": year} + if use_claude and use_api and ANTHROPIC_API_KEY and HAS_ANTHROPIC: + if not artist or not album or confidence < 0.5: + cl = _claude_resolve(hints, partial) + if cl: + artist = 
artist or cl.get("artist") + album = album or cl.get("album") + year = year or cl.get("year") + genre = genre or cl.get("genre") + label = label or cl.get("label") + confidence += 0.10 + sources.append("claude") + + # Finalize albumartist + track_artists = [t.artist for t in hints.tracks if t.artist] + from collections import Counter + distinct_artists = set(a for a in track_artists if a) + if len(distinct_artists) >= 3: + albumartist = "Various Artists" + elif track_artists: + albumartist = artist or Counter(track_artists).most_common(1)[0][0] + else: + albumartist = artist or "Unknown Artist" + + album = album or hints.album_dir.name.replace("_", " ") + artist = artist or albumartist + confidence = min(confidence, 1.0) + + # Build track proposals + track_proposals = _build_track_proposals(hints, mb_tracks, album, artist) + + return AlbumProposal( + album_dir=hints.album_dir, + album=album, + albumartist=albumartist, + date=year, + genre=genre, + label=label, + mbid=release_mbid, + cover_path=None, + cover_source=None, + tracks=track_proposals, + confidence=confidence, + sources=sources, + notes=notes, + ) + + +def _build_track_proposals( + hints: AlbumHints, + mb_tracks: Optional[List], + album: str, + album_artist: str, +) -> List[TrackProposal]: + proposals: List[TrackProposal] = [] + + for th in sorted(hints.tracks, key=lambda t: (t.disc_number or 1, t.track_number or 9999, str(t.path))): + title = th.title + artist = th.artist or album_artist + track_num = th.track_number + disc_num = th.disc_number + + # Try to match from MusicBrainz track list + if mb_tracks and track_num: + for mb_t in mb_tracks: + if mb_t["number"] == track_num and mb_t["disc"] == (disc_num or 1): + if mb_t.get("title"): + title = mb_t["title"] + if mb_t.get("artist"): + artist = mb_t["artist"] + break + + title = title or th.path.stem + + proposals.append(TrackProposal( + path=th.path, + title=title, + artist=artist, + track_number=track_num, + disc_number=disc_num, + mbid=None, + )) + + 
return proposals diff --git a/models.py b/models.py new file mode 100644 index 0000000..a5b3301 --- /dev/null +++ b/models.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Optional, List, Dict + + +AUDIO_EXTENSIONS = { + ".mp3", ".flac", ".m4a", ".aac", ".ogg", ".opus", + ".wav", ".wma", ".aiff", ".ape", +} +IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".bmp", ".gif"} +TRACKLIST_EXTENSIONS = {".txt", ".htm", ".html", ".nfo"} +PLAYLIST_EXTENSIONS = {".m3u", ".m3u8", ".pls"} + + +@dataclass +class ScannedFile: + path: Path + kind: str # "audio" | "image" | "tracklist" | "playlist" | "other" + + +@dataclass +class AlbumScan: + album_dir: Path + audio_files: List[Path] = field(default_factory=list) + image_files: List[Path] = field(default_factory=list) + tracklist_files: List[Path] = field(default_factory=list) + other_files: List[Path] = field(default_factory=list) + + +@dataclass +class TrackHints: + path: Path + track_number: Optional[int] = None + disc_number: Optional[int] = None + title: Optional[str] = None + artist: Optional[str] = None + duration: Optional[float] = None + existing_tags: Dict[str, str] = field(default_factory=dict) + + +@dataclass +class AlbumHints: + album_dir: Path + dir_artist: Optional[str] = None + dir_album: Optional[str] = None + dir_year: Optional[str] = None + tracklist_text: Optional[str] = None # merged text from all tracklist files + cover_images: List[Path] = field(default_factory=list) + tracks: List[TrackHints] = field(default_factory=list) + + +@dataclass +class TrackProposal: + path: Path + title: str + artist: str + track_number: Optional[int] + disc_number: Optional[int] + new_filename: Optional[str] = None # only set when --rename is active + mbid: Optional[str] = None + + +@dataclass +class AlbumProposal: + album_dir: Path + album: str + albumartist: str + date: Optional[str] + genre: Optional[str] + label: 
Optional[str] + mbid: Optional[str] # MusicBrainz release ID + cover_path: Optional[Path] # resolved local or downloaded cover + cover_source: Optional[str] # "local" | "musicbrainz" | "discogs" + tracks: List[TrackProposal] + confidence: float + sources: List[str] = field(default_factory=list) + notes: List[str] = field(default_factory=list) diff --git a/music_enricher.py b/music_enricher.py new file mode 100644 index 0000000..aabd5ae --- /dev/null +++ b/music_enricher.py @@ -0,0 +1,269 @@ +#!/usr/bin/env python3 +""" +music_enricher.py +KI-gestützter Musik-Metadaten-Enricher für Jellyfin-Bibliotheken. + +Pipeline pro Album: + Scan → HintExtractor → MetadataResolver → CoverHandler → Review → Executor +""" +from __future__ import annotations + +import argparse +import os +import sys +from pathlib import Path +from typing import Any, Dict, List, Optional + +try: + from tqdm import tqdm + HAS_TQDM = True +except ImportError: + HAS_TQDM = False + +from models import AlbumProposal +from scanner import scan_album, collect_album_dirs +from hint_extractor import extract_hints +from metadata_resolver import resolve +from cover_handler import resolve_cover +from executor import execute_album, write_report + + +def maybe_tqdm(iterable, show: bool, **kwargs): + return tqdm(iterable, **kwargs) if show else iterable + + +# --------------------------------------------------------------------------- +# Review / Display +# --------------------------------------------------------------------------- + +def _print_proposal(proposal: AlbumProposal) -> None: + conf_bar = "█" * int(proposal.confidence * 10) + "░" * (10 - int(proposal.confidence * 10)) + print(f"\n{'─' * 60}") + print(f"💿 {proposal.album_dir.name}") + print(f" Album: {proposal.album}") + print(f" Artist: {proposal.albumartist}") + print(f" Jahr: {proposal.date or '–'}") + print(f" Genre: {proposal.genre or '–'}") + print(f" Label: {proposal.label or '–'}") + print(f" Cover: {proposal.cover_source or '–'} 
({proposal.cover_path.name if proposal.cover_path else 'keins'})") + print(f" Konfidenz: [{conf_bar}] {proposal.confidence:.0%} Quellen: {', '.join(proposal.sources) or '–'}") + if proposal.notes: + for n in proposal.notes: + print(f" ℹ️ {n}") + print(f" Tracks ({len(proposal.tracks)}):") + for tp in proposal.tracks[:8]: + tn = f"{tp.disc_number}-{tp.track_number:02d}" if tp.disc_number and tp.disc_number > 1 else ( + f"{tp.track_number:02d}" if tp.track_number else "??") + print(f" {tn} {tp.artist} – {tp.title}") + if len(proposal.tracks) > 8: + print(f" … und {len(proposal.tracks) - 8} weitere") + + +def _interactive_review(proposal: AlbumProposal) -> bool: + """Returns True if user accepts the proposal.""" + _print_proposal(proposal) + while True: + answer = input("\n [Enter] Akzeptieren [s] Überspringen [q] Abbrechen: ").strip().lower() + if answer in ("", "j", "y"): + return True + if answer == "s": + return False + if answer == "q": + sys.exit(0) + + +# --------------------------------------------------------------------------- +# Main pipeline +# --------------------------------------------------------------------------- + +def process_album( + album_dir: Path, + args: argparse.Namespace, + report_data: List[Dict[str, Any]], +) -> Dict[str, int]: + stats = {"tags_written": 0, "covers_embedded": 0, "files_renamed": 0, + "errors": 0, "skipped": 0} + + try: + scan = scan_album(album_dir) + if not scan.audio_files: + stats["skipped"] += 1 + return stats + + hints = extract_hints(scan) + + proposal = resolve( + hints, + use_fingerprint=not args.no_fingerprint, + use_api=not args.no_api, + use_claude=bool(os.getenv("ANTHROPIC_API_KEY")), + ) + + # Cover art + cover_path, cover_source = resolve_cover( + hints.cover_images, + proposal.mbid, + album_dir, + ) + if cover_path and not args.no_cover: + proposal.cover_path = cover_path + proposal.cover_source = cover_source + + # Set proposed filenames if --rename + if args.rename: + from executor import 
_proposed_filename + for tp in proposal.tracks: + tp.new_filename = _proposed_filename(tp, tp.path.suffix) + + # Review step + if args.dry_run: + _print_proposal(proposal) + for tp in proposal.tracks: + report_data.append({ + "status": "dry-run", + "album_dir": str(album_dir.name), + "track_path": str(tp.path), + "old_title": tp.path.stem, + "new_title": tp.title, + "old_artist": "", + "new_artist": tp.artist, + "album": proposal.album, + "albumartist": proposal.albumartist, + "date": proposal.date or "", + "genre": proposal.genre or "", + "label": proposal.label or "", + "track_number": tp.track_number or "", + "disc_number": tp.disc_number or "", + "cover_embedded": False, + "renamed_to": tp.new_filename or "", + "confidence": f"{proposal.confidence:.2f}", + "sources": ", ".join(proposal.sources), + }) + return stats + + accepted = True + if not args.auto: + accepted = _interactive_review(proposal) + elif args.auto and proposal.confidence < args.confidence: + print(f" ⏭️ Konfidenz {proposal.confidence:.0%} < {args.confidence:.0%} → übersprungen: {album_dir.name}") + stats["skipped"] += 1 + return stats + else: + _print_proposal(proposal) + + if not accepted: + stats["skipped"] += 1 + return stats + + album_stats = execute_album( + proposal=proposal, + backup_dir=args.backup, + do_rename=args.rename, + embed_cover_art=args.embed_cover, + dry_run=False, + report_data=report_data, + ) + for k, v in album_stats.items(): + stats[k] = stats.get(k, 0) + v + + except Exception as e: + stats["errors"] += 1 + print(f" ❌ Fehler in {album_dir.name}: {e}", file=sys.stderr) + import traceback + traceback.print_exc(file=sys.stderr) + + return stats + + +def main() -> None: + parser = argparse.ArgumentParser( + description="KI-gestützter Musik-Metadaten-Enricher für Jellyfin", + formatter_class=argparse.RawTextHelpFormatter, + ) + parser.add_argument("paths", nargs="*", + help="Root-Verzeichnisse (direkte Unterordner = Alben)") + parser.add_argument("--album", type=Path, + 
help="Einzelnes Album-Verzeichnis verarbeiten") + parser.add_argument("--dry-run", action="store_true", + help="Vorschläge anzeigen, nichts schreiben") + parser.add_argument("--auto", action="store_true", + help="Kein interaktiver Review-Schritt") + parser.add_argument("--confidence", type=float, default=0.85, + help="Min-Konfidenz für --auto (default: 0.85)") + parser.add_argument("--rename", action="store_true", + help="Dateien nach Schema umbenennen: TT - Artist - Titel.ext") + parser.add_argument("--embed-cover", action="store_true", + help="Cover-Art in Audiodatei einbetten") + parser.add_argument("--backup", type=Path, + help="Backup-Verzeichnis vor Änderungen") + parser.add_argument("--report", type=Path, + help="CSV-Report der Änderungen") + parser.add_argument("--no-fingerprint", action="store_true", + help="AcoustID-Fingerprinting überspringen") + parser.add_argument("--no-api", action="store_true", + help="Keine externen API-Calls") + parser.add_argument("--no-cover", action="store_true", + help="Kein Cover-Art-Download") + parser.add_argument("--no-tqdm", action="store_true", + help="Fortschrittsanzeige deaktivieren") + + args = parser.parse_args() + + if not args.album and not args.paths: + parser.error("Mindestens ein Pfad oder --album erforderlich.") + + show_progress = HAS_TQDM and not args.no_tqdm and args.auto + report_data: List[Dict[str, Any]] = [] + totals: Dict[str, int] = { + "albums": 0, "skipped": 0, "tags_written": 0, + "covers_embedded": 0, "files_renamed": 0, "errors": 0, + } + + # Collect album directories + album_dirs: List[Path] = [] + if args.album: + album_dirs.append(args.album.expanduser().resolve()) + for raw in args.paths: + root = Path(raw).expanduser().resolve() + if not root.is_dir(): + print(f"⚠️ Kein Verzeichnis: {root}") + continue + album_dirs.extend(collect_album_dirs(root)) + + if not album_dirs: + print("⚠️ Keine Album-Verzeichnisse gefunden.") + sys.exit(1) + + print(f"🎵 {len(album_dirs)} Album-Verzeichnisse 
gefunden.") + if os.getenv("ANTHROPIC_API_KEY"): + print("🤖 Claude API aktiv.") + if not args.no_api: + print("🔍 MusicBrainz-Lookup aktiv.") + if args.dry_run: + print("🧪 DRY-RUN — nichts wird geschrieben.") + + for album_dir in maybe_tqdm(album_dirs, show_progress, + desc="Alben", unit="album", dynamic_ncols=True): + stats = process_album(album_dir, args, report_data) + totals["albums"] += 1 + for k in ("skipped", "tags_written", "covers_embedded", "files_renamed", "errors"): + totals[k] += stats.get(k, 0) + + if args.report and report_data: + write_report(report_data, args.report) + + print(f"\n{'=' * 50}") + print("✅ Zusammenfassung:") + print(f" 💿 Alben verarbeitet: {totals['albums']}") + print(f" ⏭️ Übersprungen: {totals['skipped']}") + print(f" 🏷️ Tags geschrieben: {totals['tags_written']}") + print(f" 🖼️ Cover eingebettet: {totals['covers_embedded']}") + print(f" 📝 Dateien umbenannt: {totals['files_renamed']}") + print(f" ❌ Fehler: {totals['errors']}") + if args.dry_run: + print(" 🧪 Modus: DRY-RUN") + print("=" * 50) + + +if __name__ == "__main__": + main() diff --git a/scanner.py b/scanner.py new file mode 100644 index 0000000..04fa997 --- /dev/null +++ b/scanner.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +import sys +from pathlib import Path +from typing import List + +from models import AlbumScan, AUDIO_EXTENSIONS, IMAGE_EXTENSIONS, TRACKLIST_EXTENSIONS + + +def _is_hidden(name: str) -> bool: + return name.startswith(".") or name.startswith("_") + + +def scan_album(album_dir: Path) -> AlbumScan: + result = AlbumScan(album_dir=album_dir) + + for dirpath, dirnames, filenames in album_dir.walk() if hasattr(album_dir, "walk") else _os_walk(album_dir): + dirnames[:] = [d for d in dirnames if not _is_hidden(d)] + current = Path(dirpath) if isinstance(dirpath, str) else dirpath + + for name in filenames: + if _is_hidden(name): + continue + p = current / name + ext = p.suffix.lower() + + if ext in AUDIO_EXTENSIONS: + result.audio_files.append(p) + 
elif ext in IMAGE_EXTENSIONS: + result.image_files.append(p) + elif ext in TRACKLIST_EXTENSIONS: + result.tracklist_files.append(p) + else: + result.other_files.append(p) + + result.audio_files.sort() + result.image_files.sort() + result.tracklist_files.sort() + return result + + +def _os_walk(album_dir: Path): + import os + return os.walk( + album_dir, + followlinks=False, + onerror=lambda e: print(f"⚠️ Scan-Fehler: {e}", file=sys.stderr), + ) + + +def collect_album_dirs(root: Path) -> List[Path]: + dirs: List[Path] = [] + try: + for item in sorted(root.iterdir()): + if item.is_dir() and not _is_hidden(item.name): + dirs.append(item) + except (PermissionError, OSError) as e: + print(f"⚠️ Lesefehler {root}: {e}", file=sys.stderr) + return dirs diff --git a/test_suite_enricher.py b/test_suite_enricher.py new file mode 100644 index 0000000..f587793 --- /dev/null +++ b/test_suite_enricher.py @@ -0,0 +1,272 @@ +#!/usr/bin/env python3 +"""test_suite_enricher.py — Unit- und Integrationstests für music_enricher.""" +from __future__ import annotations + +import sys +import tempfile +import traceback +from pathlib import Path +from typing import Callable + +sys.path.insert(0, str(Path(__file__).parent)) + +from models import AlbumScan, TrackHints, AlbumHints + +RESULTS: list[dict] = [] + + +def record(test_id: str, passed: bool, detail: str = "") -> None: + RESULTS.append({"id": test_id, "status": "PASS" if passed else "FAIL", "detail": detail}) + + +def run_case(test_id: str, fn: Callable[[], str]) -> None: + try: + detail = fn() + record(test_id, True, detail) + except Exception: + record(test_id, False, traceback.format_exc()[:300]) + + +# --------------------------------------------------------------------------- +# hint_extractor Tests +# --------------------------------------------------------------------------- + +def test_parse_dirname_artist_album() -> str: + from hint_extractor import _parse_dirname + artist, album, year = _parse_dirname("Pink_Floyd_-_The_Wall") + 
assert artist and "Pink" in artist, f"artist: {artist}" + assert album and "Wall" in album, f"album: {album}" + return f"artist={artist!r}, album={album!r}" + + +def test_parse_dirname_with_year() -> str: + from hint_extractor import _parse_dirname + artist, album, year = _parse_dirname("Abba_-_Greatest_Hits_1992") + assert year == "1992", f"year: {year}" + return f"year={year}" + + +def test_parse_dirname_album_only() -> str: + from hint_extractor import _parse_dirname + artist, album, year = _parse_dirname("Beethoven_Complete_Edition") + assert album is not None, "album should not be None" + return f"album={album!r}" + + +def test_parse_filename_track_artist_title() -> str: + from hint_extractor import _parse_filename + r = _parse_filename("07 - ABBA - Dancing Queen") + assert r.get("track") == "07", f"track: {r}" + assert "ABBA" in r.get("artist", ""), f"artist: {r}" + assert "Dancing" in r.get("title", ""), f"title: {r}" + return str(r) + + +def test_parse_filename_disc_track_title() -> str: + from hint_extractor import _parse_filename + r = _parse_filename("2-07 - Bach - Toccata") + assert r.get("disc") == "2", f"disc: {r}" + assert r.get("track") == "07", f"track: {r}" + return str(r) + + +def test_parse_filename_track_title() -> str: + from hint_extractor import _parse_filename + r = _parse_filename("01 - Dancing Queen") + assert r.get("track") == "01", f"track: {r}" + assert "Dancing" in r.get("title", ""), f"title: {r}" + return str(r) + + +def test_parse_filename_artist_title() -> str: + from hint_extractor import _parse_filename + r = _parse_filename("Miles Davis - So What") + assert "Miles" in r.get("artist", ""), f"artist: {r}" + assert "What" in r.get("title", ""), f"title: {r}" + return str(r) + + +def test_parse_tracklist_numbered() -> str: + from hint_extractor import _parse_tracklist + text = "1. Dancing Queen\n2. Waterloo\n3. 
Fernando" + tracks = _parse_tracklist(text) + assert len(tracks) == 3, f"count: {len(tracks)}" + assert tracks[0]["title"] == "Dancing Queen", f"title: {tracks[0]}" + return f"{len(tracks)} tracks parsed" + + +def test_parse_tracklist_with_duration() -> str: + from hint_extractor import _parse_tracklist + text = "1-1 Toccata And Fugue 9:17\n1-2 Heartbeat 2:19\n2-1 Finale 5:00" + tracks = _parse_tracklist(text) + assert len(tracks) >= 2, f"count: {len(tracks)}" + assert tracks[0]["disc"] == "1", f"disc: {tracks[0]}" + return f"{len(tracks)} tracks parsed" + + +def test_parse_tracklist_with_disc_sections() -> str: + from hint_extractor import _parse_tracklist + text = "CD 1\n1. Track A\n2. Track B\nCD 2\n1. Track C" + tracks = _parse_tracklist(text) + disc2 = [t for t in tracks if t.get("disc") == "2"] + assert len(disc2) >= 1, f"disc2: {disc2}" + return f"{len(tracks)} total, {len(disc2)} on disc 2" + + +# --------------------------------------------------------------------------- +# Scanner Tests +# --------------------------------------------------------------------------- + +def test_scanner_classifies_files() -> str: + from scanner import scan_album + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) / "TestAlbum" + root.mkdir() + (root / "01 - Song.mp3").write_bytes(b"\x00" * 100) + (root / "02 - Song.flac").write_bytes(b"\x00" * 100) + (root / "front.jpg").write_bytes(b"\xff\xd8" + b"\x00" * 100) + (root / "tracklist.txt").write_text("1. Track One\n2. 
Track Two") + (root / "notes.pdf").write_bytes(b"\x00" * 50) + + scan = scan_album(root) + assert len(scan.audio_files) == 2, f"audio: {scan.audio_files}" + assert len(scan.image_files) == 1, f"images: {scan.image_files}" + assert len(scan.tracklist_files) == 1, f"tracklists: {scan.tracklist_files}" + return "scan OK: 2 audio, 1 image, 1 tracklist" + + +def test_scanner_ignores_hidden() -> str: + from scanner import scan_album + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) / "Album" + root.mkdir() + (root / "song.mp3").write_bytes(b"\x00" * 100) + (root / ".hidden.mp3").write_bytes(b"\x00" * 100) + (root / "_trash.mp3").write_bytes(b"\x00" * 100) + scan = scan_album(root) + assert len(scan.audio_files) == 1, f"should ignore hidden: {scan.audio_files}" + return "hidden files correctly ignored" + + +# --------------------------------------------------------------------------- +# extract_hints integration +# --------------------------------------------------------------------------- + +def test_extract_hints_from_scan() -> str: + from scanner import scan_album + from hint_extractor import extract_hints + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) / "ABBA_-_Greatest_Hits" + root.mkdir() + (root / "01 - ABBA - Dancing Queen.mp3").write_bytes(b"\x00" * 1024) + (root / "02 - ABBA - Waterloo.mp3").write_bytes(b"\x00" * 1024) + (root / "tracklist.txt").write_text("1. Dancing Queen\n2. 
Waterloo\n") + + scan = scan_album(root) + hints = extract_hints(scan) + assert hints.dir_album is not None, "album hint missing" + assert len(hints.tracks) == 2, f"tracks: {len(hints.tracks)}" + assert hints.tracklist_text is not None, "tracklist not read" + return f"hints OK: album={hints.dir_album!r}, {len(hints.tracks)} tracks" + + +def test_extract_hints_multi_disc() -> str: + from scanner import scan_album + from hint_extractor import extract_hints + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) / "Bach_Complete" + (root / "CD1").mkdir(parents=True) + (root / "CD2").mkdir() + (root / "CD1" / "01 - Toccata.mp3").write_bytes(b"\x00" * 1024) + (root / "CD2" / "01 - Fugue.mp3").write_bytes(b"\x00" * 1024) + + scan = scan_album(root) + hints = extract_hints(scan) + disc_nums = {t.disc_number for t in hints.tracks if t.disc_number} + assert 1 in disc_nums, f"disc 1 missing: {disc_nums}" + assert 2 in disc_nums, f"disc 2 missing: {disc_nums}" + return f"disc numbers detected: {disc_nums}" + + +# --------------------------------------------------------------------------- +# executor Tests +# --------------------------------------------------------------------------- + +def test_proposed_filename_single_disc() -> str: + from executor import _proposed_filename + from models import TrackProposal + from pathlib import Path + tp = TrackProposal(path=Path("dummy.mp3"), title="Dancing Queen", + artist="ABBA", track_number=1, disc_number=None) + name = _proposed_filename(tp, ".mp3") + assert name == "01 - ABBA - Dancing Queen.mp3", f"got: {name!r}" + return name + + +def test_proposed_filename_multi_disc() -> str: + from executor import _proposed_filename + from models import TrackProposal + from pathlib import Path + tp = TrackProposal(path=Path("dummy.flac"), title="Toccata", + artist="Bach", track_number=7, disc_number=2) + name = _proposed_filename(tp, ".flac") + assert name == "2-07 - Bach - Toccata.flac", f"got: {name!r}" + return name + + +def 
test_proposed_filename_sanitizes_chars() -> str: + from executor import _proposed_filename + from models import TrackProposal + from pathlib import Path + tp = TrackProposal(path=Path("x.mp3"), title='Track: "Live" / Today', + artist="Artist?", track_number=3, disc_number=None) + name = _proposed_filename(tp, ".mp3") + assert "/" not in name and ":" not in name, f"unsafe chars in: {name!r}" + return name + + +# --------------------------------------------------------------------------- +# Runner +# --------------------------------------------------------------------------- + +def main() -> None: + print("🧪 Starte Music Metadata Enricher Tests...") + + cases = [ + ("UNIT_01_parse_dirname_artist_album", test_parse_dirname_artist_album), + ("UNIT_02_parse_dirname_with_year", test_parse_dirname_with_year), + ("UNIT_03_parse_dirname_album_only", test_parse_dirname_album_only), + ("UNIT_04_parse_filename_track_artist_title", test_parse_filename_track_artist_title), + ("UNIT_05_parse_filename_disc_track_title", test_parse_filename_disc_track_title), + ("UNIT_06_parse_filename_track_title", test_parse_filename_track_title), + ("UNIT_07_parse_filename_artist_title", test_parse_filename_artist_title), + ("UNIT_08_parse_tracklist_numbered", test_parse_tracklist_numbered), + ("UNIT_09_parse_tracklist_with_duration", test_parse_tracklist_with_duration), + ("UNIT_10_parse_tracklist_disc_sections", test_parse_tracklist_with_disc_sections), + ("UNIT_11_scanner_classifies_files", test_scanner_classifies_files), + ("UNIT_12_scanner_ignores_hidden", test_scanner_ignores_hidden), + ("UNIT_13_extract_hints_from_scan", test_extract_hints_from_scan), + ("UNIT_14_extract_hints_multi_disc", test_extract_hints_multi_disc), + ("UNIT_15_proposed_filename_single_disc", test_proposed_filename_single_disc), + ("UNIT_16_proposed_filename_multi_disc", test_proposed_filename_multi_disc), + ("UNIT_17_proposed_filename_sanitizes_chars", test_proposed_filename_sanitizes_chars), + ] + + for test_id, fn 
in cases: + run_case(test_id, fn) + + print("=" * 70) + for r in RESULTS: + icon = "✅" if r["status"] == "PASS" else "❌" + detail = r["detail"][:100] + "..." if len(r["detail"]) > 100 else r["detail"] + print(f"{icon} [{r['status']}] {r['id']} {detail}") + print("=" * 70) + + passed = sum(1 for r in RESULTS if r["status"] == "PASS") + total = len(RESULTS) + print(f"📊 {passed}/{total} Tests erfolgreich") + sys.exit(0 if passed == total else 1) + + +if __name__ == "__main__": + main() From c205fa8943a8f6adebe8ad0d6a55057031b8456a Mon Sep 17 00:00:00 2001 From: dschlueter <dschlueter@kitux.de> Date: Tue, 28 Apr 2026 21:03:29 +0200 Subject: [PATCH 02/11] feat: Ollama + OpenRouter als LLM-Reasoning-Backends MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit _claude_resolve() nutzt jetzt Ollama lokal (kostenlos, RTX 3090) als erste Wahl, dann OpenRouter/DeepSeek V3 (sehr günstig) und zuletzt Claude API. Neue ENV-Variablen: OPENROUTER_API_KEY, OLLAMA_RESOLVE_MODEL. 
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --- metadata_resolver.py | 147 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 120 insertions(+), 27 deletions(-) diff --git a/metadata_resolver.py b/metadata_resolver.py index 32a0642..af06b91 100644 --- a/metadata_resolver.py +++ b/metadata_resolver.py @@ -34,9 +34,14 @@ except ImportError: _MB_RATE_LIMIT = 1.1 # seconds between MusicBrainz requests _last_mb_call = 0.0 -ACOUSTID_API_KEY = os.getenv("ACOUSTID_API_KEY", "") +ACOUSTID_API_KEY = os.getenv("ACOUSTID_API_KEY", "") ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "") -DISCOGS_TOKEN = os.getenv("DISCOGS_TOKEN", "") +OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "") +DISCOGS_TOKEN = os.getenv("DISCOGS_TOKEN", "") +OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434") + +# Lokales Reasoning-Modell für Metadaten-Ergänzung (passt auf RTX 3090) +OLLAMA_RESOLVE_MODEL = os.getenv("OLLAMA_RESOLVE_MODEL", "qwen3.5:27b") def _mb_wait(): @@ -172,39 +177,126 @@ def _discogs_search(artist: Optional[str], album: Optional[str]) -> Optional[Dic # Claude API reasoning (optional) # --------------------------------------------------------------------------- +def _build_resolve_prompt(hints: AlbumHints, partial: Dict) -> str: + tracks_summary = "\n".join( + f" - Track {t.track_number or '?'}: {t.title or t.path.stem}" + + (f" [{t.artist}]" if t.artist else "") + for t in hints.tracks[:20] + ) + return ( + "Du bist ein Musikexperte. 
Analysiere diese Album-Daten und vervollständige die fehlenden Felder.\n\n" + f"Verzeichnisname: {hints.album_dir.name}\n" + f"Bekannte Artist: {hints.dir_artist or partial.get('artist', 'unbekannt')}\n" + f"Bekannter Albumtitel: {hints.dir_album or partial.get('album', 'unbekannt')}\n" + f"Jahr: {hints.dir_year or partial.get('year', 'unbekannt')}\n" + f"Tracklist-Hinweise:\n{tracks_summary}\n\n" + 'Antworte NUR mit einem JSON-Objekt mit diesen Feldern (null wenn unbekannt):\n' + '{"artist": ..., "album": ..., "albumartist": ..., "year": ..., "genre": ..., "label": ...}' + ) + + +def _parse_json_response(text: str) -> Optional[Dict]: + import json, re + m = re.search(r"\{.*\}", text, re.DOTALL) + if m: + try: + return json.loads(m.group()) + except Exception: + pass + return None + + +def _resolve_via_ollama(hints: AlbumHints, partial: Dict) -> Optional[Dict]: + """Lokales Reasoning via Ollama (kein API-Key nötig).""" + import urllib.request, json + prompt = _build_resolve_prompt(hints, partial) + payload = json.dumps({ + "model": OLLAMA_RESOLVE_MODEL, + "messages": [{"role": "user", "content": prompt}], + "stream": False, + "format": "json", + "options": {"temperature": 0.1}, + }).encode() + try: + req = urllib.request.Request( + f"{OLLAMA_HOST}/api/chat", + data=payload, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=120) as resp: + data = json.loads(resp.read()) + text = data.get("message", {}).get("content", "").strip() + return _parse_json_response(text) + except Exception as e: + print(f" ⚠️ Ollama-Resolve-Fehler: {e}", file=sys.stderr) + return None + + +def _resolve_via_openrouter(hints: AlbumHints, partial: Dict) -> Optional[Dict]: + """Reasoning via OpenRouter (günstige chinesische Modelle bevorzugt).""" + if not OPENROUTER_API_KEY: + return None + import urllib.request, json + prompt = _build_resolve_prompt(hints, partial) + # DeepSeek V3: extrem günstig, sehr kompetent + model = 
"deepseek/deepseek-chat-v3-0324" + payload = json.dumps({ + "model": model, + "messages": [{"role": "user", "content": prompt}], + "temperature": 0.1, + "max_tokens": 300, + }).encode() + try: + req = urllib.request.Request( + "https://openrouter.ai/api/v1/chat/completions", + data=payload, + headers={ + "Content-Type": "application/json", + "Authorization": f"Bearer {OPENROUTER_API_KEY}", + "HTTP-Referer": "https://pi.local", + "X-Title": "MusicMetadataEnricher", + }, + method="POST", + ) + with urllib.request.urlopen(req, timeout=30) as resp: + data = json.loads(resp.read()) + text = data["choices"][0]["message"]["content"].strip() + return _parse_json_response(text) + except Exception as e: + print(f" ⚠️ OpenRouter-Resolve-Fehler: {e}", file=sys.stderr) + return None + + def _claude_resolve(hints: AlbumHints, partial: Dict) -> Optional[Dict]: + """ + Reihenfolge: Ollama (lokal, kostenlos) → OpenRouter (günstig) → Claude API. + Ollama wird versucht wenn OLLAMA_HOST erreichbar; kein Key nötig. + """ + # 1. Ollama lokal (bevorzugt — kostenlos, RTX 3090) + result = _resolve_via_ollama(hints, partial) + if result: + return result + + # 2. OpenRouter (DeepSeek V3, günstig) wenn Key gesetzt + if OPENROUTER_API_KEY: + result = _resolve_via_openrouter(hints, partial) + if result: + return result + + # 3. Claude API als letzter Fallback if not HAS_ANTHROPIC or not ANTHROPIC_API_KEY: return None try: client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY) - tracks_summary = "\n".join( - f" - Track {t.track_number or '?'}: {t.title or t.path.stem}" - + (f" [{t.artist}]" if t.artist else "") - for t in hints.tracks[:20] - ) - prompt = f"""Du bist ein Musikexperte. Analysiere diese Album-Daten und vervollständige die fehlenden Felder. 
- -Verzeichnisname: {hints.album_dir.name} -Bekannte Artist: {hints.dir_artist or partial.get('artist', 'unbekannt')} -Bekannter Albumtitel: {hints.dir_album or partial.get('album', 'unbekannt')} -Jahr: {hints.dir_year or partial.get('year', 'unbekannt')} -Tracklist-Hinweise: -{tracks_summary} - -Antworte NUR mit einem JSON-Objekt mit diesen Feldern (null wenn unbekannt): -{{"artist": ..., "album": ..., "albumartist": ..., "year": ..., "genre": ..., "label": ...}}""" - + prompt = _build_resolve_prompt(hints, partial) message = client.messages.create( model="claude-haiku-4-5-20251001", max_tokens=300, messages=[{"role": "user", "content": prompt}], ) - import json text = message.content[0].text.strip() - # Extract JSON from response - json_match = __import__("re").search(r"\{.*\}", text, __import__("re").DOTALL) - if json_match: - return json.loads(json_match.group()) + return _parse_json_response(text) except Exception as e: print(f" ⚠️ Claude-API-Fehler: {e}", file=sys.stderr) return None @@ -323,9 +415,10 @@ def resolve( confidence += 0.15 sources.append("discogs") - # Claude API for remaining gaps + # LLM-Reasoning für verbleibende Lücken: + # Reihenfolge: Ollama lokal → OpenRouter (DeepSeek, günstig) → Claude API partial = {"artist": artist, "album": album, "year": year} - if use_claude and use_api and ANTHROPIC_API_KEY and HAS_ANTHROPIC: + if use_claude and use_api: if not artist or not album or confidence < 0.5: cl = _claude_resolve(hints, partial) if cl: @@ -335,7 +428,7 @@ def resolve( genre = genre or cl.get("genre") label = label or cl.get("label") confidence += 0.10 - sources.append("claude") + sources.append("llm-resolve") # Finalize albumartist track_artists = [t.artist for t in hints.tracks if t.artist] From d91eb360075c244b33c498703b6e4537370c73e8 Mon Sep 17 00:00:00 2001 From: dschlueter <dschlueter@kitux.de> Date: Tue, 28 Apr 2026 21:49:00 +0200 Subject: [PATCH 03/11] fix: korrekte Track-Nummerierung, Scanner-Rekursion, M3U-Reihenfolge 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit scanner: nicht in Unterordner wenn Root Audio-Dateien enthält (verhindert Doppel-Scan bei versehentlichen Unterordner-Kopien); nur Disc-Ordner (CD1, Disc 2…) werden bei Multi-CD-Alben rekursiert. hint_extractor: M3U/Playlist-Dateien als Track-Reihenfolge-Quelle; BOM- Bereinigung; Tracklist-Matching auch per Titel (nicht nur per Nummer); tracknumber=0 wird als 'keine Nummer' gewertet. metadata_resolver: sequenzielle Fallback-Nummerierung (1,2,3…) für Tracks ohne Tracknummer — verhindert '00'-Präfix beim --rename; dir_artist hat Vorrang vor 'Various Artists'-Heuristik; LLM darf bei Konfidenz <0.3 auch bestehende Werte korrigieren (Tippfehler im Verzeichnisnamen). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --- hint_extractor.py | 94 ++++++++++++++++++++++++++++++++++++++------ metadata_resolver.py | 52 ++++++++++++++++++------ models.py | 1 + scanner.py | 90 ++++++++++++++++++++++++++++++------------ 4 files changed, 189 insertions(+), 48 deletions(-) diff --git a/hint_extractor.py b/hint_extractor.py index fbac646..a11bace 100644 --- a/hint_extractor.py +++ b/hint_extractor.py @@ -51,6 +51,8 @@ _DISC_SECTION_RE = re.compile(r"(?i)(?:cd|disc|disk|side)[_ \-]*(\d{1,2})") def _clean(s: Optional[str]) -> str: if not s: return "" + # BOM (U+FEFF), Zero-Width-Space (U+200B), Soft-Hyphen (U+00AD) entfernen + s = re.sub(r"[​­]", "", s) return re.sub(r"\s+", " ", s.replace("_", " ")).strip(" -._") @@ -143,6 +145,34 @@ def _parse_tracklist(text: str) -> List[Dict[str, str]]: return tracks +def _parse_m3u(text: str) -> List[Dict[str, str]]: + """M3U/M3U8 → geordnete Liste: [{filename, title, position}]. + Reihenfolge der Einträge = gewünschte Trackreihenfolge. 
+ """ + tracks: List[Dict[str, str]] = [] + pending_title: Optional[str] = None + position = 0 + for line in text.splitlines(): + line = line.strip() + if not line: + continue + if line.upper().startswith("#EXTINF:"): + parts = line.split(",", 1) + pending_title = parts[1].strip() if len(parts) > 1 else None + elif not line.startswith("#"): + filename = Path(line.replace("\\", "/")).name + if not filename: + continue + position += 1 + tracks.append({ + "position": str(position), + "filename": filename, + "title": pending_title or "", + }) + pending_title = None + return tracks + + def _read_tracklist_file(path: Path) -> Optional[str]: try: if path.suffix.lower() in (".htm", ".html"): @@ -203,6 +233,29 @@ def extract_hints(scan: AlbumScan) -> AlbumHints: parsed_tracklist = _parse_tracklist(hints.tracklist_text) if hints.tracklist_text else [] + # M3U/Playlist-Reihenfolge: filename (stem, normalisiert) → Tracknummer + m3u_order: Dict[str, int] = {} + m3u_titles: Dict[str, str] = {} + for pf in scan.playlist_files: + try: + text = pf.read_text(encoding="utf-8", errors="replace") + for entry in _parse_m3u(text): + stem = _clean(Path(entry["filename"]).stem).casefold() + pos = int(entry["position"]) + if stem and stem not in m3u_order: + m3u_order[stem] = pos + if entry.get("title"): + m3u_titles[stem] = entry["title"] + except Exception as e: + print(f" ⚠️ Playlist-Lesefehler {pf.name}: {e}", file=sys.stderr) + + # Tracklist-Lookup: normalisierter Titel → Eintrag (für titelbasiertes Matching) + tl_by_title: Dict[str, Dict[str, str]] = {} + for entry in parsed_tracklist: + key = _clean(entry.get("title", "")).casefold() + if key: + tl_by_title[key] = entry + # Build TrackHints per audio file for audio_path in sorted(scan.audio_files): tags, duration = _read_tags(audio_path) @@ -215,10 +268,18 @@ def extract_hints(scan: AlbumScan) -> AlbumHints: raw_tn = tags.get("tracknumber") or fn_hints.get("track") if raw_tn: try: - track_num = int(str(raw_tn).split("/")[0]) + tn_int 
= int(str(raw_tn).split("/")[0]) + if tn_int > 0: # 0 gilt als "keine Nummer" + track_num = tn_int except ValueError: pass + # Track number aus M3U-Reihenfolge (Vorrang vor Dateiname, aber nicht vor Tag) + if track_num is None: + stem_key = _clean(audio_path.stem).casefold() + if stem_key in m3u_order: + track_num = m3u_order[stem_key] + # Disc number: tag > filename > path segment raw_dn = tags.get("discnumber") or fn_hints.get("disc") if raw_dn: @@ -236,16 +297,27 @@ def extract_hints(scan: AlbumScan) -> AlbumHints: title = tags.get("title") or fn_hints.get("title") artist = tags.get("artist") or fn_hints.get("artist") - # Enrich from parsed tracklist if track_num matches - if parsed_tracklist and track_num: - for tl_entry in parsed_tracklist: - tl_track = tl_entry.get("track") - tl_disc = tl_entry.get("disc", "1") - if (tl_track and int(tl_track) == track_num - and int(tl_disc) == (disc_num or 1)): - if not _is_good(title) and _is_good(tl_entry.get("title")): - title = tl_entry["title"] - break + # Tracklist: erst nach Nummer, dann nach Titel + if parsed_tracklist: + matched_tl: Optional[Dict[str, str]] = None + if track_num: + for tl_entry in parsed_tracklist: + tl_track = tl_entry.get("track") + tl_disc = tl_entry.get("disc", "1") + if (tl_track and int(tl_track) == track_num + and int(tl_disc) == (disc_num or 1)): + matched_tl = tl_entry + break + if matched_tl is None and title: + matched_tl = tl_by_title.get(_clean(title).casefold()) + if matched_tl and not _is_good(title) and _is_good(matched_tl.get("title")): + title = matched_tl["title"] + + # M3U-Titel als Fallback (enthält "Composer - Title" — nur nutzen wenn kein besserer Titel) + if not _is_good(title): + stem_key = _clean(audio_path.stem).casefold() + if stem_key in m3u_titles: + title = m3u_titles[stem_key] hints.tracks.append(TrackHints( path=audio_path, diff --git a/metadata_resolver.py b/metadata_resolver.py index af06b91..2fd8eb5 100644 --- a/metadata_resolver.py +++ b/metadata_resolver.py @@ 
-40,8 +40,8 @@ OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "") DISCOGS_TOKEN = os.getenv("DISCOGS_TOKEN", "") OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434") -# Lokales Reasoning-Modell für Metadaten-Ergänzung (passt auf RTX 3090) -OLLAMA_RESOLVE_MODEL = os.getenv("OLLAMA_RESOLVE_MODEL", "qwen3.5:27b") +# qwen3:8b (5.2GB) reicht für einfache JSON-Metadaten-Ergänzung und lädt schnell (~10s) +OLLAMA_RESOLVE_MODEL = os.getenv("OLLAMA_RESOLVE_MODEL", "qwen3:8b") def _mb_wait(): @@ -184,10 +184,12 @@ def _build_resolve_prompt(hints: AlbumHints, partial: Dict) -> str: for t in hints.tracks[:20] ) return ( - "Du bist ein Musikexperte. Analysiere diese Album-Daten und vervollständige die fehlenden Felder.\n\n" + "Du bist ein Musikexperte. Analysiere diese Album-Daten.\n" + "Vervollständige fehlende Felder UND korrigiere erkennbare Tippfehler " + "(z.B. im Albumtitel oder Künstlernamen — Verzeichnisnamen enthalten oft Schreibfehler).\n\n" f"Verzeichnisname: {hints.album_dir.name}\n" - f"Bekannte Artist: {hints.dir_artist or partial.get('artist', 'unbekannt')}\n" - f"Bekannter Albumtitel: {hints.dir_album or partial.get('album', 'unbekannt')}\n" + f"Künstler (aus Verzeichnis): {hints.dir_artist or partial.get('artist', 'unbekannt')}\n" + f"Albumtitel (aus Verzeichnis, evtl. 
mit Tippfehlern): {hints.dir_album or partial.get('album', 'unbekannt')}\n" f"Jahr: {hints.dir_year or partial.get('year', 'unbekannt')}\n" f"Tracklist-Hinweise:\n{tracks_summary}\n\n" 'Antworte NUR mit einem JSON-Objekt mit diesen Feldern (null wenn unbekannt):\n' @@ -224,7 +226,7 @@ def _resolve_via_ollama(hints: AlbumHints, partial: Dict) -> Optional[Dict]: headers={"Content-Type": "application/json"}, method="POST", ) - with urllib.request.urlopen(req, timeout=120) as resp: + with urllib.request.urlopen(req, timeout=240) as resp: data = json.loads(resp.read()) text = data.get("message", {}).get("content", "").strip() return _parse_json_response(text) @@ -422,19 +424,34 @@ def resolve( if not artist or not album or confidence < 0.5: cl = _claude_resolve(hints, partial) if cl: - artist = artist or cl.get("artist") - album = album or cl.get("album") - year = year or cl.get("year") - genre = genre or cl.get("genre") - label = label or cl.get("label") + if confidence < 0.3: + # Sehr unsicher: LLM darf auch bestehende Werte korrigieren + # (z.B. Tippfehler im Albumtitel aus dem Verzeichnisnamen) + artist = cl.get("artist") or artist + album = cl.get("album") or album + year = cl.get("year") or year + genre = cl.get("genre") or genre + label = cl.get("label") or label + else: + artist = artist or cl.get("artist") + album = album or cl.get("album") + year = year or cl.get("year") + genre = genre or cl.get("genre") + label = label or cl.get("label") confidence += 0.10 sources.append("llm-resolve") # Finalize albumartist + # dir_artist hat Vorrang: wenn der Verzeichnisname einen Künstler nennt + # (z.B. "Eugen_Cicero_-_Jazz_meets_Classic"), ist das der Albumkünstler — + # auch wenn die Track-Dateinamen die Komponisten-Namen enthalten. 
track_artists = [t.artist for t in hints.tracks if t.artist] from collections import Counter distinct_artists = set(a for a in track_artists if a) - if len(distinct_artists) >= 3: + if hints.dir_artist: + # Verzeichnisname nennt explizit einen Künstler → immer verwenden + albumartist = hints.dir_artist + elif len(distinct_artists) >= 3: albumartist = "Various Artists" elif track_artists: albumartist = artist or Counter(track_artists).most_common(1)[0][0] @@ -500,4 +517,15 @@ def _build_track_proposals( mbid=None, )) + # Sequenzielle Nummerierung als letzter Fallback: + # Tracks ohne Nummer (None) erhalten eine laufende Nummer pro Disc. + # Damit werden "00" und "??" im Dateinamen beim --rename verhindert. + if any(p.track_number is None for p in proposals): + disc_counters: Dict[int, int] = {} + for p in proposals: + if p.track_number is None: + disc = p.disc_number or 1 + disc_counters[disc] = disc_counters.get(disc, 0) + 1 + p.track_number = disc_counters[disc] + return proposals diff --git a/models.py b/models.py index a5b3301..004b5da 100644 --- a/models.py +++ b/models.py @@ -26,6 +26,7 @@ class AlbumScan: audio_files: List[Path] = field(default_factory=list) image_files: List[Path] = field(default_factory=list) tracklist_files: List[Path] = field(default_factory=list) + playlist_files: List[Path] = field(default_factory=list) # .m3u / .m3u8 / .pls other_files: List[Path] = field(default_factory=list) diff --git a/scanner.py b/scanner.py index 04fa997..de06281 100644 --- a/scanner.py +++ b/scanner.py @@ -1,51 +1,91 @@ from __future__ import annotations +import re import sys from pathlib import Path from typing import List -from models import AlbumScan, AUDIO_EXTENSIONS, IMAGE_EXTENSIONS, TRACKLIST_EXTENSIONS +from models import AlbumScan, AUDIO_EXTENSIONS, IMAGE_EXTENSIONS, TRACKLIST_EXTENSIONS, PLAYLIST_EXTENSIONS + +_DISC_DIR_RE = re.compile(r"(?i)^(?:cd|disc|disk|side)[_ \-]*\d{1,2}$") def _is_hidden(name: str) -> bool: return name.startswith(".") or 
name.startswith("_") +def _is_disc_dir(name: str) -> bool: + """True für Ordner wie 'CD1', 'Disc 2', 'Side A', 'Disk_1'.""" + return bool(_DISC_DIR_RE.match(name)) + + def scan_album(album_dir: Path) -> AlbumScan: + """ + Scannt ein Album-Verzeichnis. + + Rekursions-Regel: + - Hat das Album-Verzeichnis selbst Audio-Dateien → kein Abstieg in Unterordner + (Einzelscheibe; Sub-Ordner wie Artworks, Scans, irrtümliche Kopien werden ignoriert). + - Hat der Root KEINE Audio-Dateien → Abstieg nur in Disc-Unterordner (CD1, Disc 2 …). + """ result = AlbumScan(album_dir=album_dir) - for dirpath, dirnames, filenames in album_dir.walk() if hasattr(album_dir, "walk") else _os_walk(album_dir): - dirnames[:] = [d for d in dirnames if not _is_hidden(d)] - current = Path(dirpath) if isinstance(dirpath, str) else dirpath + # Erst nur die Wurzel-Ebene scannen, um zu entscheiden ob rekursiert wird + root_has_audio = any( + (album_dir / name).suffix.lower() in AUDIO_EXTENSIONS + for name in _listdir(album_dir) + if not _is_hidden(name) + ) - for name in filenames: - if _is_hidden(name): - continue - p = current / name - ext = p.suffix.lower() - - if ext in AUDIO_EXTENSIONS: - result.audio_files.append(p) - elif ext in IMAGE_EXTENSIONS: - result.image_files.append(p) - elif ext in TRACKLIST_EXTENSIONS: - result.tracklist_files.append(p) - else: - result.other_files.append(p) + if root_has_audio: + # Nur Root-Ebene — keine Unterordner + _scan_dir(album_dir, album_dir, result, recurse=False) + else: + # Kein Audio an der Wurzel → Multi-CD: nur Disc-Unterordner + _scan_dir(album_dir, album_dir, result, recurse=True) result.audio_files.sort() result.image_files.sort() result.tracklist_files.sort() + result.playlist_files.sort() return result -def _os_walk(album_dir: Path): - import os - return os.walk( - album_dir, - followlinks=False, - onerror=lambda e: print(f"⚠️ Scan-Fehler: {e}", file=sys.stderr), - ) +def _listdir(path: Path) -> List[str]: + try: + return [e.name for e in 
path.iterdir()] + except (PermissionError, OSError) as e: + print(f"⚠️ Scan-Fehler: {e}", file=sys.stderr) + return [] + + +def _scan_dir(current: Path, album_dir: Path, result: AlbumScan, recurse: bool) -> None: + try: + entries = sorted(current.iterdir()) + except (PermissionError, OSError) as e: + print(f"⚠️ Scan-Fehler {current}: {e}", file=sys.stderr) + return + + for entry in entries: + name = entry.name + if _is_hidden(name): + continue + if entry.is_dir(): + if recurse and _is_disc_dir(name): + _scan_dir(entry, album_dir, result, recurse=True) + # Andere Unterordner (Artworks, irrtümliche Kopien…) werden übersprungen + elif entry.is_file(): + ext = entry.suffix.lower() + if ext in AUDIO_EXTENSIONS: + result.audio_files.append(entry) + elif ext in IMAGE_EXTENSIONS: + result.image_files.append(entry) + elif ext in TRACKLIST_EXTENSIONS: + result.tracklist_files.append(entry) + elif ext in PLAYLIST_EXTENSIONS: + result.playlist_files.append(entry) + else: + result.other_files.append(entry) def collect_album_dirs(root: Path) -> List[Path]: From 460b92aab309473b8b8027cc913ca0a785144a02 Mon Sep 17 00:00:00 2001 From: dschlueter <dschlueter@kitux.de> Date: Tue, 28 Apr 2026 22:06:50 +0200 Subject: [PATCH 04/11] Fix Invalid ID3TimeStamp error when writing date tags Strip non-timestamp characters (BOM, invisible chars) from date/year values both when reading existing tags in metadata_resolver and when writing in executor. Also harden the EasyID3 except block to not wipe existing tags when adding a missing ID3 header, and add per-field try/except in MP3 tag writing so one bad field doesn't abort the entire track. 
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --- executor.py | 22 ++++++++++++++++++---- metadata_resolver.py | 6 +++++- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/executor.py b/executor.py index 077c3e6..f13c808 100644 --- a/executor.py +++ b/executor.py @@ -73,7 +73,10 @@ def write_tags(path: Path, proposal: TrackProposal, album_proposal: AlbumProposa if proposal.disc_number: tags_to_write["discnumber"] = str(proposal.disc_number) if album_proposal.date: - tags_to_write["date"] = album_proposal.date + # Strip everything except valid ID3 timestamp characters to prevent ID3TimeStamp errors + date_clean = re.sub(r"[^\d\-T:+Z]", "", str(album_proposal.date)).strip() + if date_clean: + tags_to_write["date"] = date_clean if album_proposal.genre: tags_to_write["genre"] = album_proposal.genre if album_proposal.label: @@ -84,11 +87,22 @@ def write_tags(path: Path, proposal: TrackProposal, album_proposal: AlbumProposa try: audio = EasyID3(str(path)) except Exception: - audio = EasyID3() - audio.save(str(path)) + # File has no ID3 header — add one without wiping audio data + from mutagen.id3 import ID3NoHeaderError + try: + from mutagen.mp3 import MP3 + full = MP3(str(path)) + full.tags = None + full.add_tags() + full.save(str(path), v2_version=4) + except Exception: + pass audio = EasyID3(str(path)) for k, v in tags_to_write.items(): - audio[k] = [v] + try: + audio[k] = [v] + except Exception as tag_err: + print(f" ⚠️ Tag-Feld '{k}' übersprungen ({path.name}): {tag_err}", file=sys.stderr) audio.save(v2_version=4) return True diff --git a/metadata_resolver.py b/metadata_resolver.py index 2fd8eb5..56a8ccc 100644 --- a/metadata_resolver.py +++ b/metadata_resolver.py @@ -337,8 +337,12 @@ def resolve( album = album or Counter(tag_albums).most_common(1)[0][0] # Tag year/genre/label + import re as _re for t in hints.tracks: - year = year or t.existing_tags.get("date") or t.existing_tags.get("year") + raw_year = t.existing_tags.get("date") or 
t.existing_tags.get("year") + if raw_year and not year: + # Strip invisible chars so ID3TimeStamp validation doesn't fail later + year = _re.sub(r"[^\d\-T:+Z]", "", str(raw_year)).strip()[:10] or None genre = genre or t.existing_tags.get("genre") label = label or t.existing_tags.get("label") or t.existing_tags.get("organization") From 8bd48cf1669cb9aa26cf602df84af0f3a2ebbd97 Mon Sep 17 00:00:00 2001 From: dschlueter <dschlueter@kitux.de> Date: Tue, 28 Apr 2026 22:22:10 +0200 Subject: [PATCH 05/11] Include albumartist in filename; remove Claude API from LLM chain MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Filename schema now: TT - AlbumArtist - TrackArtist - Title when albumartist differs from track artist (e.g. pianist vs. composer). Identical artist → old two-part format unchanged. metadata_resolver: removed Claude API fallback entirely from _claude_resolve. Chain is now Ollama (local, free) → OpenRouter (DeepSeek V3, cheap) only. music_enricher: updated status line and use_claude flag accordingly. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --- executor.py | 12 ++++++++---- metadata_resolver.py | 19 ++----------------- music_enricher.py | 6 +++--- 3 files changed, 13 insertions(+), 24 deletions(-) diff --git a/executor.py b/executor.py index f13c808..3bff22e 100644 --- a/executor.py +++ b/executor.py @@ -36,12 +36,16 @@ def _safe_name(s: str) -> str: return _SAFE_RE.sub("_", s).strip(". 
") -def _proposed_filename(proposal: TrackProposal, ext: str) -> str: +def _proposed_filename(proposal: TrackProposal, ext: str, albumartist: str = "") -> str: tn = f"{proposal.track_number:02d}" if proposal.track_number else "00" prefix = f"{proposal.disc_number}-{tn}" if proposal.disc_number and proposal.disc_number > 1 else tn - artist = _safe_name(proposal.artist or "Unknown") + track_artist = _safe_name(proposal.artist or "Unknown") + aa = _safe_name(albumartist) title = _safe_name(proposal.title or "Unknown") - return f"{prefix} - {artist} - {title}{ext}" + # Include albumartist when it differs from track artist (e.g. pianist vs. composer) + if aa and aa.casefold() != track_artist.casefold() and aa.casefold() not in ("various artists", "unknown"): + return f"{prefix} - {aa} - {track_artist} - {title}{ext}" + return f"{prefix} - {track_artist} - {title}{ext}" def backup_file(path: Path, backup_dir: Path) -> bool: @@ -194,7 +198,7 @@ def execute_album( cover_embedded = True if do_rename: - new_name = _proposed_filename(tp, tp.path.suffix) + new_name = _proposed_filename(tp, tp.path.suffix, proposal.albumartist or "") candidate = tp.path.parent / new_name if candidate != tp.path: try: diff --git a/metadata_resolver.py b/metadata_resolver.py index 56a8ccc..bd6b17a 100644 --- a/metadata_resolver.py +++ b/metadata_resolver.py @@ -272,8 +272,8 @@ def _resolve_via_openrouter(hints: AlbumHints, partial: Dict) -> Optional[Dict]: def _claude_resolve(hints: AlbumHints, partial: Dict) -> Optional[Dict]: """ - Reihenfolge: Ollama (lokal, kostenlos) → OpenRouter (günstig) → Claude API. - Ollama wird versucht wenn OLLAMA_HOST erreichbar; kein Key nötig. + Reihenfolge: Ollama (lokal, kostenlos) → OpenRouter (günstig). + Claude API wird bewusst nicht genutzt (zu teuer). """ # 1. 
Ollama lokal (bevorzugt — kostenlos, RTX 3090) result = _resolve_via_ollama(hints, partial) @@ -286,21 +286,6 @@ def _claude_resolve(hints: AlbumHints, partial: Dict) -> Optional[Dict]: if result: return result - # 3. Claude API als letzter Fallback - if not HAS_ANTHROPIC or not ANTHROPIC_API_KEY: - return None - try: - client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY) - prompt = _build_resolve_prompt(hints, partial) - message = client.messages.create( - model="claude-haiku-4-5-20251001", - max_tokens=300, - messages=[{"role": "user", "content": prompt}], - ) - text = message.content[0].text.strip() - return _parse_json_response(text) - except Exception as e: - print(f" ⚠️ Claude-API-Fehler: {e}", file=sys.stderr) return None diff --git a/music_enricher.py b/music_enricher.py index aabd5ae..30387cc 100644 --- a/music_enricher.py +++ b/music_enricher.py @@ -96,7 +96,7 @@ def process_album( hints, use_fingerprint=not args.no_fingerprint, use_api=not args.no_api, - use_claude=bool(os.getenv("ANTHROPIC_API_KEY")), + use_claude=not args.no_api, ) # Cover art @@ -235,8 +235,8 @@ def main() -> None: sys.exit(1) print(f"🎵 {len(album_dirs)} Album-Verzeichnisse gefunden.") - if os.getenv("ANTHROPIC_API_KEY"): - print("🤖 Claude API aktiv.") + if os.getenv("OLLAMA_HOST") or True: # Ollama always attempted + print("🤖 LLM-Resolve: Ollama → OpenRouter (kein Claude)") if not args.no_api: print("🔍 MusicBrainz-Lookup aktiv.") if args.dry_run: From 5011cef4dbc7e81fda30831ba67a434b904503be Mon Sep 17 00:00:00 2001 From: dschlueter <dschlueter@kitux.de> Date: Tue, 28 Apr 2026 22:46:43 +0200 Subject: [PATCH 06/11] Underscore filename schema, classical detection, NameToUnix post-processing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pop schema: TT_-_Artist_-_Title.ext Classical schema: TT_-_Performer_-_Komponist_-_Werk[-_Orchester_Dirigent].ext triggered when albumartist ≠ track artist (pianist vs composer) All spaces in names → 
underscores; separator _-_ between parts. Missing parts (orchestra, conductor) are omitted. models.py: added conductor/orchestra optional fields to TrackProposal. executor.py: sanitize_dir_names() tries NameToUnix first, falls back to detox. Called after all renames in a directory are complete. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --- executor.py | 96 ++++++++++++++++++++++++++++++++++++++---- models.py | 2 + test_suite_enricher.py | 10 +++-- 3 files changed, 96 insertions(+), 12 deletions(-) diff --git a/executor.py b/executor.py index 3bff22e..03631be 100644 --- a/executor.py +++ b/executor.py @@ -3,6 +3,7 @@ from __future__ import annotations import csv import re import shutil +import subprocess import sys from pathlib import Path from typing import Optional, List, Dict, Any @@ -21,6 +22,9 @@ except ImportError: from cover_handler import embed_cover _SAFE_RE = re.compile(r'[<>:"/\\|?*\x00-\x1f]') +_CLASSICAL_GENRES = re.compile( + r"(?i)class|baroque|romantic|renaissance|opera|symphony|chamber|concerto|sonata|oratorio" +) REPORT_FIELDS = [ "status", "album_dir", "track_path", "old_title", "new_title", @@ -33,19 +37,62 @@ REPORT_FIELDS = [ def _safe_name(s: str) -> str: - return _SAFE_RE.sub("_", s).strip(". ") + """Filesystem-safe name: illegal chars → '_', spaces → '_'.""" + s = _SAFE_RE.sub("_", s) + return re.sub(r"\s+", "_", s).strip("._-") -def _proposed_filename(proposal: TrackProposal, ext: str, albumartist: str = "") -> str: +def _is_classical(albumartist: str, track_artist: str, genre: str) -> bool: + """ + Classical schema applies when performer (albumartist) ≠ composer (track_artist), + which covers both 'real' classical music and jazz-on-classical-themes albums. + Genre keyword matching is used as additional signal but not required. 
+ """ + aa = (albumartist or "").casefold().strip() + ta = (track_artist or "").casefold().strip() + if not aa or aa in ("various artists", "unknown artist", "unknown"): + return False + if aa == ta: + return False + return True # performer ≠ composer → classical naming + + +def _proposed_filename( + proposal: TrackProposal, + ext: str, + albumartist: str = "", + genre: str = "", +) -> str: + """ + Pop/Default: TT_-_Artist_-_Titel.ext + Klassik: TT_-_Performer_-_Komponist_-_Titel[-_Orchester_Dirigent].ext + + Separator zwischen Teilen: _-_ + Leerzeichen innerhalb von Namen: _ + Fehlende Teile werden weggelassen. + """ tn = f"{proposal.track_number:02d}" if proposal.track_number else "00" - prefix = f"{proposal.disc_number}-{tn}" if proposal.disc_number and proposal.disc_number > 1 else tn + disc_prefix = f"{proposal.disc_number}-" if proposal.disc_number and proposal.disc_number > 1 else "" + prefix = f"{disc_prefix}{tn}" + track_artist = _safe_name(proposal.artist or "Unknown") aa = _safe_name(albumartist) title = _safe_name(proposal.title or "Unknown") - # Include albumartist when it differs from track artist (e.g. pianist vs. 
composer) - if aa and aa.casefold() != track_artist.casefold() and aa.casefold() not in ("various artists", "unknown"): - return f"{prefix} - {aa} - {track_artist} - {title}{ext}" - return f"{prefix} - {track_artist} - {title}{ext}" + + if _is_classical(aa, track_artist, genre): + # Klassik-Schema: Performer _-_ Komponist _-_ Werk [_-_ Orchester,Dirigent] + parts = [prefix, aa, track_artist, title] + # Orchester und Dirigent anhängen wenn vorhanden + extra = "_".join(filter(None, [ + _safe_name(proposal.orchestra or ""), + _safe_name(proposal.conductor or ""), + ])) + if extra: + parts.append(extra) + return "_-_".join(parts) + ext + else: + # Pop/Default-Schema: Tracknummer _-_ Artist _-_ Titel + return f"{prefix}_-_{track_artist}_-_{title}{ext}" def backup_file(path: Path, backup_dir: Path) -> bool: @@ -198,7 +245,11 @@ def execute_album( cover_embedded = True if do_rename: - new_name = _proposed_filename(tp, tp.path.suffix, proposal.albumartist or "") + new_name = _proposed_filename( + tp, tp.path.suffix, + albumartist=proposal.albumartist or "", + genre=proposal.genre or "", + ) candidate = tp.path.parent / new_name if candidate != tp.path: try: @@ -231,9 +282,38 @@ def execute_album( "sources": ", ".join(proposal.sources), }) + # Nach allen Umbenennungen: Verzeichnis Linux-kompatibel bereinigen + if do_rename and not dry_run: + sanitize_dir_names(proposal.album_dir) + return stats +def sanitize_dir_names(directory: Path) -> None: + """ + Macht alle Dateinamen im Verzeichnis Linux-kompatibel. + Bevorzugt 'NameToUnix <dir>', fällt auf 'detox <file>' zurück. 
+ """ + name_to_unix = shutil.which("NameToUnix") + if name_to_unix: + try: + subprocess.run([name_to_unix, str(directory)], check=True, capture_output=True) + return + except subprocess.CalledProcessError as e: + print(f" ⚠️ NameToUnix-Fehler: {e.stderr.decode(errors='replace').strip()}", file=sys.stderr) + + detox = shutil.which("detox") + if detox: + for f in sorted(directory.rglob("*")): + if f.is_file(): + try: + subprocess.run([detox, str(f)], check=True, capture_output=True) + except subprocess.CalledProcessError as e: + print(f" ⚠️ detox-Fehler {f.name}: {e.stderr.decode(errors='replace').strip()}", file=sys.stderr) + else: + print(" ℹ️ Weder NameToUnix noch detox gefunden — Dateinamen nicht nachbereinigt.", file=sys.stderr) + + def write_report(report_data: List[Dict[str, Any]], report_path: Path) -> None: try: report_path.parent.mkdir(parents=True, exist_ok=True) diff --git a/models.py b/models.py index 004b5da..f0b7f3a 100644 --- a/models.py +++ b/models.py @@ -61,6 +61,8 @@ class TrackProposal: disc_number: Optional[int] new_filename: Optional[str] = None # only set when --rename is active mbid: Optional[str] = None + conductor: Optional[str] = None # classical: Dirigent + orchestra: Optional[str] = None # classical: Orchester / Ensemble @dataclass diff --git a/test_suite_enricher.py b/test_suite_enricher.py index f587793..71bc588 100644 --- a/test_suite_enricher.py +++ b/test_suite_enricher.py @@ -196,10 +196,11 @@ def test_proposed_filename_single_disc() -> str: from executor import _proposed_filename from models import TrackProposal from pathlib import Path + # Pop schema: albumartist == track artist → TT_-_Artist_-_Title tp = TrackProposal(path=Path("dummy.mp3"), title="Dancing Queen", artist="ABBA", track_number=1, disc_number=None) - name = _proposed_filename(tp, ".mp3") - assert name == "01 - ABBA - Dancing Queen.mp3", f"got: {name!r}" + name = _proposed_filename(tp, ".mp3", albumartist="ABBA") + assert name == "01_-_ABBA_-_Dancing_Queen.mp3", 
f"got: {name!r}" return name @@ -207,10 +208,11 @@ def test_proposed_filename_multi_disc() -> str: from executor import _proposed_filename from models import TrackProposal from pathlib import Path + # Classical schema: albumartist (performer) ≠ track artist (composer) tp = TrackProposal(path=Path("dummy.flac"), title="Toccata", artist="Bach", track_number=7, disc_number=2) - name = _proposed_filename(tp, ".flac") - assert name == "2-07 - Bach - Toccata.flac", f"got: {name!r}" + name = _proposed_filename(tp, ".flac", albumartist="Gardiner") + assert name == "2-07_-_Gardiner_-_Bach_-_Toccata.flac", f"got: {name!r}" return name From d1391fc36a4e007beb35f160c33e262177eacdfb Mon Sep 17 00:00:00 2001 From: dschlueter <dschlueter@kitux.de> Date: Wed, 29 Apr 2026 02:32:11 +0200 Subject: [PATCH 07/11] Robust tracklist matching: fuzzy titles, catalog numbers, correct disc/track MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit hint_extractor: - _norm_for_match(): strips all non-alnum for punctuation-agnostic comparison - _catalog_key(): extracts BWV/Op./K./HWV/... 
catalog number for matching (fixes abbreviated filenames like "Fantasia_Cm_BWV_562" vs "Fantasia In C Minor, BWV 562") - Matching priority: exact number+disc → exact title → fuzzy title → catalog number - Tracklist disc+track OVERRIDE M3U position when a match is found (M3U is only used as last fallback; fixes wrong alphabetical ordering) metadata_resolver: - LLM prompt now defines artist/albumartist roles explicitly (artist = composer for classical; albumartist = performer/interpreter) - LLM albumartist can override dir_artist when confidence < 0.4 - _build_track_proposals: when track artist == albumartist (performer from filename), composer (album-level artist) is used as track artist instead - Tracklist header (first lines before tracks) included in LLM prompt for label/year/album-title discovery - import re added (was missing) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --- hint_extractor.py | 87 ++++++++++++++++++++++++++++++++++++-------- metadata_resolver.py | 77 +++++++++++++++++++++++++++++++-------- 2 files changed, 134 insertions(+), 30 deletions(-) diff --git a/hint_extractor.py b/hint_extractor.py index a11bace..4e34775 100644 --- a/hint_extractor.py +++ b/hint_extractor.py @@ -56,6 +56,26 @@ def _clean(s: Optional[str]) -> str: return re.sub(r"\s+", " ", s.replace("_", " ")).strip(" -._") +def _norm_for_match(s: str) -> str: + """Nur Buchstaben und Ziffern — für fuzzy Titelvergleich (Interpunktion-agnostisch).""" + return re.sub(r"[^a-z0-9]", "", s.casefold()) + + +# Klassische Werkverzeichnis-Nummern: BWV 565, Op. 27, K. 331, HWV 56, … +_CATALOG_RE = re.compile( + r"\b(bwv|hwv|op|k|kv|d|sz|wq|bbwv|rv|twv|hob)\W*(\d+[a-z]?(?:[\/\.]\d+)?)", + re.IGNORECASE, +) + + +def _catalog_key(s: str) -> Optional[str]: + """Extrahiert normalisierte Katalognummer, z.B. 
'bwv565' oder 'op27'.""" + m = _CATALOG_RE.search(s) + if m: + return m.group(1).lower() + re.sub(r"\W", "", m.group(2)) + return None + + def _is_good(v: Optional[str]) -> bool: if not v: return False @@ -249,12 +269,21 @@ def extract_hints(scan: AlbumScan) -> AlbumHints: except Exception as e: print(f" ⚠️ Playlist-Lesefehler {pf.name}: {e}", file=sys.stderr) - # Tracklist-Lookup: normalisierter Titel → Eintrag (für titelbasiertes Matching) + # Tracklist-Lookup: exakter Titel, fuzzy Titel, Katalognummer (BWV, Op., K., …) tl_by_title: Dict[str, Dict[str, str]] = {} + tl_by_title_norm: Dict[str, Dict[str, str]] = {} + tl_by_catalog: Dict[str, Dict[str, str]] = {} for entry in parsed_tracklist: - key = _clean(entry.get("title", "")).casefold() - if key: - tl_by_title[key] = entry + raw_title = entry.get("title", "") + exact_key = _clean(raw_title).casefold() + if exact_key: + tl_by_title[exact_key] = entry + norm_key = _norm_for_match(raw_title) + if norm_key: + tl_by_title_norm[norm_key] = entry + cat_key = _catalog_key(raw_title) + if cat_key: + tl_by_catalog[cat_key] = entry # Build TrackHints per audio file for audio_path in sorted(scan.audio_files): @@ -274,12 +303,6 @@ def extract_hints(scan: AlbumScan) -> AlbumHints: except ValueError: pass - # Track number aus M3U-Reihenfolge (Vorrang vor Dateiname, aber nicht vor Tag) - if track_num is None: - stem_key = _clean(audio_path.stem).casefold() - if stem_key in m3u_order: - track_num = m3u_order[stem_key] - # Disc number: tag > filename > path segment raw_dn = tags.get("discnumber") or fn_hints.get("disc") if raw_dn: @@ -297,21 +320,55 @@ def extract_hints(scan: AlbumScan) -> AlbumHints: title = tags.get("title") or fn_hints.get("title") artist = tags.get("artist") or fn_hints.get("artist") - # Tracklist: erst nach Nummer, dann nach Titel + # Tracklist-Matching: Nummer → exakter Titel → fuzzy Titel + # Wenn ein Match gefunden: disc+track aus Tracklist übernehmen (Tracklist ist + # autoritativer als M3U-Reihenfolge 
bei Alben mit expliziter Disc-Nummerierung). if parsed_tracklist: matched_tl: Optional[Dict[str, str]] = None - if track_num: + + # 1. Exakt per Tracknummer + Disc (nur wenn beides aus Tag/Dateiname bekannt) + if track_num and disc_num: for tl_entry in parsed_tracklist: tl_track = tl_entry.get("track") tl_disc = tl_entry.get("disc", "1") if (tl_track and int(tl_track) == track_num - and int(tl_disc) == (disc_num or 1)): + and int(tl_disc) == disc_num): matched_tl = tl_entry break + + # 2. Exakter Titelvergleich if matched_tl is None and title: matched_tl = tl_by_title.get(_clean(title).casefold()) - if matched_tl and not _is_good(title) and _is_good(matched_tl.get("title")): - title = matched_tl["title"] + + # 3. Fuzzy Titelvergleich (ignoriert Kommas, Apostrophe, Groß-/Kleinschreibung) + if matched_tl is None and title: + matched_tl = tl_by_title_norm.get(_norm_for_match(title)) + + # 4. Katalognummer (BWV, Op., K. …) — greift bei abgekürzten Dateinamen + if matched_tl is None and title: + cat = _catalog_key(title) + if cat: + matched_tl = tl_by_catalog.get(cat) + + if matched_tl: + # Titel aus Tracklist übernehmen wenn besser + if _is_good(matched_tl.get("title")): + title = matched_tl["title"] + # disc+track aus Tracklist sind autoritativer als M3U-Reihenfolge + try: + tl_track_n = int(matched_tl["track"]) if matched_tl.get("track") else None + tl_disc_n = int(matched_tl.get("disc", "1")) + if tl_track_n: + track_num = tl_track_n + disc_num = tl_disc_n + except (ValueError, KeyError): + pass + + # M3U-Reihenfolge nur als letzter Fallback (wenn Tracklist kein Match liefert) + if track_num is None: + stem_key = _clean(audio_path.stem).casefold() + if stem_key in m3u_order: + track_num = m3u_order[stem_key] # M3U-Titel als Fallback (enthält "Composer - Title" — nur nutzen wenn kein besserer Titel) if not _is_good(title): diff --git a/metadata_resolver.py b/metadata_resolver.py index bd6b17a..804c38c 100644 --- a/metadata_resolver.py +++ b/metadata_resolver.py @@ 
-1,6 +1,7 @@ from __future__ import annotations import os +import re import sys import time from typing import Optional, List, Dict, Tuple @@ -179,20 +180,42 @@ def _discogs_search(artist: Optional[str], album: Optional[str]) -> Optional[Dic def _build_resolve_prompt(hints: AlbumHints, partial: Dict) -> str: tracks_summary = "\n".join( - f" - Track {t.track_number or '?'}: {t.title or t.path.stem}" + f" - {('D'+str(t.disc_number)+'-') if t.disc_number else ''}T{t.track_number or '?'}: " + f"{t.title or t.path.stem}" + (f" [{t.artist}]" if t.artist else "") for t in hints.tracks[:20] ) + # Tracklist-Kopfzeilen (erste 400 Zeichen, vor der Track-Liste) für Album/Label-Info + tracklist_header = "" + if hints.tracklist_text: + header_lines = [] + for line in hints.tracklist_text.splitlines(): + line = line.strip() + if not line: + continue + # Stopp bei erster Zeile die wie ein Track aussieht (1-1, 1. etc.) + if re.match(r"^\d[\d\-]\s+\S", line) or re.match(r"^\d{1,3}[.)]\s+", line): + break + header_lines.append(line) + if sum(len(l) for l in header_lines) > 400: + break + tracklist_header = "\n".join(header_lines[:15]) + return ( - "Du bist ein Musikexperte. Analysiere diese Album-Daten.\n" - "Vervollständige fehlende Felder UND korrigiere erkennbare Tippfehler " - "(z.B. im Albumtitel oder Künstlernamen — Verzeichnisnamen enthalten oft Schreibfehler).\n\n" + "Du bist ein Musikexperte. 
Analysiere diese Album-Daten und gib korrekte Metadaten zurück.\n" + "Korrigiere auch erkennbare Tippfehler (Verzeichnisnamen enthalten oft Schreibfehler).\n\n" + "WICHTIGE FELDDEFINITIONEN:\n" + '- "artist" = Komponist (Klassik) ODER Band/Sänger (Pop/Rock/Jazz)\n' + '- "albumartist" = Interpret/Performer/Dirigent (Klassik) ODER gleich wie artist (Pop)\n' + " Beispiel Klassik: artist='Johann Sebastian Bach', albumartist='Peter Hurford'\n" + " Beispiel Pop: artist='ABBA', albumartist='ABBA'\n\n" f"Verzeichnisname: {hints.album_dir.name}\n" - f"Künstler (aus Verzeichnis): {hints.dir_artist or partial.get('artist', 'unbekannt')}\n" - f"Albumtitel (aus Verzeichnis, evtl. mit Tippfehlern): {hints.dir_album or partial.get('album', 'unbekannt')}\n" + f"Hinweis Künstler/Titel (aus Verzeichnis, kann vertauscht oder falsch sein): " + f"{hints.dir_artist or '?'} / {hints.dir_album or partial.get('album', '?')}\n" f"Jahr: {hints.dir_year or partial.get('year', 'unbekannt')}\n" - f"Tracklist-Hinweise:\n{tracks_summary}\n\n" - 'Antworte NUR mit einem JSON-Objekt mit diesen Feldern (null wenn unbekannt):\n' + + (f"Tracklist-Kopf (Label/Jahr/Albumtitel):\n{tracklist_header}\n\n" if tracklist_header else "") + + f"Tracks:\n{tracks_summary}\n\n" + 'Antworte NUR mit einem JSON-Objekt (null wenn unbekannt):\n' '{"artist": ..., "album": ..., "albumartist": ..., "year": ..., "genre": ..., "label": ...}' ) @@ -408,6 +431,7 @@ def resolve( # LLM-Reasoning für verbleibende Lücken: # Reihenfolge: Ollama lokal → OpenRouter (DeepSeek, günstig) → Claude API + cl_albumartist: Optional[str] = None partial = {"artist": artist, "album": album, "year": year} if use_claude and use_api: if not artist or not album or confidence < 0.5: @@ -427,18 +451,28 @@ def resolve( year = year or cl.get("year") genre = genre or cl.get("genre") label = label or cl.get("label") + cl_albumartist = cl.get("albumartist") or None confidence += 0.10 sources.append("llm-resolve") # Finalize albumartist - # dir_artist hat 
Vorrang: wenn der Verzeichnisname einen Künstler nennt - # (z.B. "Eugen_Cicero_-_Jazz_meets_Classic"), ist das der Albumkünstler — - # auch wenn die Track-Dateinamen die Komponisten-Namen enthalten. + # Priorität: (1) LLM-albumartist bei niedriger Konfidenz + # (2) dir_artist wenn Verzeichnisname einen Künstler nennt + # (3) Heuristiken (Various Artists, Mehrheitsabstimmung) + # Rationale: "Bach_Organ_-_Peter_Hurford" → dir_artist="Bach Organ" ist kein Künstler, + # aber der Verzeichnisname sieht aus wie Künstler; LLM kann das korrekt auflösen. track_artists = [t.artist for t in hints.tracks if t.artist] from collections import Counter distinct_artists = set(a for a in track_artists if a) - if hints.dir_artist: - # Verzeichnisname nennt explizit einen Künstler → immer verwenden + + _bad_aa = {"various artists", "unknown artist", "unknown", "va"} + def _good_aa(s: Optional[str]) -> bool: + return bool(s) and s.casefold().strip() not in _bad_aa + + if _good_aa(cl_albumartist) and confidence < 0.4: + # LLM kennt den echten Albumkünstler besser als der Verzeichnisname + albumartist = cl_albumartist # type: ignore[assignment] + elif hints.dir_artist: albumartist = hints.dir_artist elif len(distinct_artists) >= 3: albumartist = "Various Artists" @@ -452,7 +486,9 @@ def resolve( confidence = min(confidence, 1.0) # Build track proposals - track_proposals = _build_track_proposals(hints, mb_tracks, album, artist) + # `artist` = Komponist/Hauptkünstler (LLM-aufgelöst), `albumartist` = Performer + # Werden beide weitergegeben damit _build_track_proposals richtig zuordnen kann. 
+ track_proposals = _build_track_proposals(hints, mb_tracks, album, albumartist, composer=artist) return AlbumProposal( album_dir=hints.album_dir, @@ -476,15 +512,26 @@ def _build_track_proposals( mb_tracks: Optional[List], album: str, album_artist: str, + composer: Optional[str] = None, ) -> List[TrackProposal]: proposals: List[TrackProposal] = [] for th in sorted(hints.tracks, key=lambda t: (t.disc_number or 1, t.track_number or 9999, str(t.path))): title = th.title - artist = th.artist or album_artist track_num = th.track_number disc_num = th.disc_number + # Klassik-Fall: Performer aus Dateiname, Komponist aus LLM + # Wenn th.artist == albumartist (Performer), und wir den Komponisten kennen, + # wird der Komponist als Track-Artist gesetzt → Filename: TT_-_Performer_-_Komponist_-_Werk + th_artist_cf = (th.artist or "").casefold().strip() + aa_cf = album_artist.casefold().strip() + if composer and th_artist_cf == aa_cf and th_artist_cf: + # Performer == albumartist → Komponist als Track-Artist + artist = composer + else: + artist = th.artist or album_artist + # Try to match from MusicBrainz track list if mb_tracks and track_num: for mb_t in mb_tracks: From 28f716f8f29ebdd0cdd26bc4f69936c936c11240 Mon Sep 17 00:00:00 2001 From: dschlueter <dschlueter@kitux.de> Date: Wed, 29 Apr 2026 02:50:20 +0200 Subject: [PATCH 08/11] Fix disc numbering consistency and false tracklist matches MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit executor: disc=1 now generates '1-01' prefix (same as disc=2 → '2-01'), so multi-disc albums have consistent D-TT scheme throughout. Single-disc tracks without disc tag stay as plain 'TT'. hint_extractor: tracklist patterns 2 and 3 now require '.' ')' or ':' as separator (not bare whitespace) — prevents false-positive matches like '2 x CD, Compilation, Remastered' being parsed as track 2.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --- executor.py | 4 +++- hint_extractor.py | 6 ++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/executor.py b/executor.py index 03631be..708049d 100644 --- a/executor.py +++ b/executor.py @@ -72,7 +72,9 @@ def _proposed_filename( Fehlende Teile werden weggelassen. """ tn = f"{proposal.track_number:02d}" if proposal.track_number else "00" - disc_prefix = f"{proposal.disc_number}-" if proposal.disc_number and proposal.disc_number > 1 else "" + # Wenn disc_number gesetzt (auch disc=1): immer "D-TT" — konsistent über alle CDs. + # disc=None (Einzel-CD ohne Tag): nur "TT". + disc_prefix = f"{proposal.disc_number}-" if proposal.disc_number else "" prefix = f"{disc_prefix}{tn}" track_artist = _safe_name(proposal.artist or "Unknown") diff --git a/hint_extractor.py b/hint_extractor.py index 4e34775..92654ce 100644 --- a/hint_extractor.py +++ b/hint_extractor.py @@ -41,8 +41,10 @@ _DIR_PATTERNS = [ # Tracklist line patterns _TRACKLIST_PATTERNS = [ re.compile(r"^(?P<disc>\d{1,2})[- _](?P<track>\d{1,3})\s+(?P<title>.+?)(?:\s+\d+:\d{2})?$"), - re.compile(r"^(?P<track>\d{1,3})[.):\s]+(?P<title>.+?)(?:\s+\d+:\d{2})?$"), - re.compile(r"^(?P<track>[A-Z]\d{1,2})[.):\s]+(?P<title>.+?)(?:\s+\d+:\d{2})?$"), + # Separator muss . 
) oder : sein — reines Leerzeichen reicht nicht + # (verhindert False-Positives wie "2 x CD, Compilation, Remastered") + re.compile(r"^(?P<track>\d{1,3})[.):]\s*(?P<title>.+?)(?:\s+\d+:\d{2})?$"), + re.compile(r"^(?P<track>[A-Z]\d{1,2})[.):]\s*(?P<title>.+?)(?:\s+\d+:\d{2})?$"), ] _DISC_SECTION_RE = re.compile(r"(?i)(?:cd|disc|disk|side)[_ \-]*(\d{1,2})") From 40a2ef3fb6b99f1d5ddbfe86e6765bea03ec4ba8 Mon Sep 17 00:00:00 2001 From: dschlueter <dschlueter@kitux.de> Date: Wed, 29 Apr 2026 03:08:21 +0200 Subject: [PATCH 09/11] Add OCR fallback via Ollama Vision for albums without tracklist text MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit hint_extractor: _ocr_back_cover() sends back/inlay images to Ollama Vision when no tracklist .txt/.htm/.nfo is present. Model priority: qwen3-vl:latest → minicpm-v:latest → deepseek-ocr:latest (configurable via OLLAMA_OCR_MODEL env var). Timeout 180s. OCR text is fed into the same _parse_tracklist() pipeline as regular text files. music_enricher: extract_hints(use_ocr=not args.no_api) — OCR is skipped with --no-api to allow fully offline/fast runs. 
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --- hint_extractor.py | 91 ++++++++++++++++++++++++++++++++++++++++++++++- music_enricher.py | 2 +- 2 files changed, 91 insertions(+), 2 deletions(-) diff --git a/hint_extractor.py b/hint_extractor.py index 92654ce..9a3117a 100644 --- a/hint_extractor.py +++ b/hint_extractor.py @@ -1,7 +1,11 @@ from __future__ import annotations +import base64 +import json +import os import re import sys +import urllib.request from pathlib import Path from typing import Optional, List, Dict, Tuple @@ -224,6 +228,85 @@ def _read_tracklist_file(path: Path) -> Optional[str]: return None +_OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434") +# Modelle in Prioritätsreihenfolge; überschreibbar via OLLAMA_OCR_MODEL +_OCR_MODELS = [m.strip() for m in os.getenv( + "OLLAMA_OCR_MODEL", + "qwen3-vl:latest,minicpm-v:latest,deepseek-ocr:latest" +).split(",") if m.strip()] + +_OCR_PROMPT = ( + "This image shows a CD album back cover or booklet page. " + "Your task: extract the complete tracklist as plain text.\n" + "Rules:\n" + "- Output track number and title per line, e.g. '1. Title' or '1-1 Title'\n" + "- If multiple discs/CDs: include a header like 'CD 1' or 'Disc 1' before each group\n" + "- Include durations if visible (e.g. '1. Title 4:32')\n" + "- Do NOT include label info, barcodes, or other non-tracklist text\n" + "- If no tracklist is visible, reply with: NO_TRACKLIST" +) + + +def _ocr_back_cover(image_files: List[Path]) -> Optional[str]: + """ + OCR eines Back-Cover- oder Booklet-Bildes via Ollama Vision. + Gibt den erkannten Text zurück, oder None wenn nichts gefunden. 
+ """ + # Nur Bilder die nach Back-Cover aussehen + candidates = [ + p for p in image_files + if any(kw in p.name.lower() for kw in ("back", "inlay", "booklet", "inside", "rear")) + ] + # Fallback: alle Bilder außer dem Front-Cover + if not candidates: + candidates = [ + p for p in image_files + if not any(kw in p.name.lower() for kw in ("front", "folder", "cover")) + ] + if not candidates: + return None + + image_path = candidates[0] + try: + img_b64 = base64.b64encode(image_path.read_bytes()).decode() + except Exception as e: + print(f" ⚠️ OCR-Bild lesen {image_path.name}: {e}", file=sys.stderr) + return None + + for model in _OCR_MODELS: + payload = json.dumps({ + "model": model, + "messages": [{ + "role": "user", + "content": _OCR_PROMPT, + "images": [img_b64], + }], + "stream": False, + "options": {"temperature": 0.0}, + }).encode() + try: + req = urllib.request.Request( + f"{_OLLAMA_HOST}/api/chat", + data=payload, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=180) as resp: + data = json.loads(resp.read()) + text = data.get("message", {}).get("content", "").strip() + if text and "NO_TRACKLIST" not in text: + print(f" 📷 OCR {image_path.name} via {model}: {len(text)} Zeichen extrahiert", + file=sys.stderr) + return text + elif "NO_TRACKLIST" in text: + print(f" 📷 OCR {image_path.name}: kein Tracklist-Text erkannt", file=sys.stderr) + return None + except Exception as e: + print(f" ⚠️ OCR-Fehler ({model}) {image_path.name}: {e}", file=sys.stderr) + continue + return None + + def _check_cover_images(paths: List[Path]) -> List[Path]: good: List[Path] = [] for p in paths: @@ -236,7 +319,7 @@ def _check_cover_images(paths: List[Path]) -> List[Path]: return good -def extract_hints(scan: AlbumScan) -> AlbumHints: +def extract_hints(scan: AlbumScan, use_ocr: bool = True) -> AlbumHints: hints = AlbumHints(album_dir=scan.album_dir) # Directory name @@ -253,6 +336,12 @@ def extract_hints(scan: AlbumScan) -> 
AlbumHints: texts.append(txt) hints.tracklist_text = "\n\n".join(texts) if texts else None + # OCR-Fallback: Back-Cover scannen wenn keine Tracklist-Textdatei vorhanden + if use_ocr and not hints.tracklist_text and scan.image_files: + ocr_text = _ocr_back_cover(scan.image_files) + if ocr_text: + hints.tracklist_text = ocr_text + parsed_tracklist = _parse_tracklist(hints.tracklist_text) if hints.tracklist_text else [] # M3U/Playlist-Reihenfolge: filename (stem, normalisiert) → Tracknummer diff --git a/music_enricher.py b/music_enricher.py index 30387cc..3e86d4c 100644 --- a/music_enricher.py +++ b/music_enricher.py @@ -90,7 +90,7 @@ def process_album( stats["skipped"] += 1 return stats - hints = extract_hints(scan) + hints = extract_hints(scan, use_ocr=not args.no_api) proposal = resolve( hints, From 888464b4d0411729b7e4bb4a3fa1604b34169c93 Mon Sep 17 00:00:00 2001 From: dschlueter <dschlueter@kitux.de> Date: Wed, 29 Apr 2026 03:13:39 +0200 Subject: [PATCH 10/11] Regenerate M3U playlist after rename with correct order and durations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit _update_m3u(): writes #EXTM3U + #EXTINF:seconds,Artist - Title + filename per track, in disc/track order (same order as the renamed files). Duration is read from mutagen; -1 if unavailable. execute_album(): after renaming, finds existing *.m3u / *.m3u8 in the album directory and overwrites it. Only triggered when files_renamed > 0 and a playlist file exists — never creates a new one from scratch. 
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --- executor.py | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/executor.py b/executor.py index 708049d..42c19cb 100644 --- a/executor.py +++ b/executor.py @@ -206,6 +206,33 @@ def write_tags(path: Path, proposal: TrackProposal, album_proposal: AlbumProposa return False +def _update_m3u(m3u_path: Path, tracks: List[tuple]) -> bool: + """ + Schreibt M3U neu mit den umbenannten Dateien in Track-Reihenfolge. + tracks: [(TrackProposal, actual_path_after_rename), ...] + """ + try: + lines = ["#EXTM3U"] + for tp, track_path in tracks: + duration = -1 + if HAS_MUTAGEN: + try: + audio = MutagenFile(str(track_path)) + if audio and hasattr(audio, "info") and audio.info: + duration = int(audio.info.length) + except Exception: + pass + label = f"{tp.artist} - {tp.title}" if tp.artist else (tp.title or track_path.stem) + lines.append(f"#EXTINF:{duration},{label}") + lines.append(track_path.name) + m3u_path.write_text("\n".join(lines) + "\n", encoding="utf-8") + print(f" 📋 Playlist aktualisiert: {m3u_path.name}") + return True + except Exception as e: + print(f" ⚠️ M3U-Fehler {m3u_path.name}: {e}", file=sys.stderr) + return False + + def execute_album( proposal: AlbumProposal, backup_dir: Optional[Path], @@ -215,6 +242,7 @@ def execute_album( report_data: List[Dict[str, Any]], ) -> Dict[str, int]: stats = {"tags_written": 0, "covers_embedded": 0, "files_renamed": 0, "errors": 0} + final_tracks: List[tuple] = [] # (TrackProposal, final_path) für M3U for tp in proposal.tracks: old_title = tp.path.stem @@ -263,6 +291,9 @@ def execute_album( print(f" ⚠️ Umbenennungsfehler {tp.path.name}: {e}", file=sys.stderr) stats["errors"] += 1 + if not dry_run: + final_tracks.append((tp, new_path)) + report_data.append({ "status": "dry-run" if dry_run else "ok", "album_dir": str(proposal.album_dir.name), @@ -284,6 +315,15 @@ def execute_album( "sources": ", ".join(proposal.sources), }) + # 
M3U-Playlist aktualisieren wenn Dateien umbenannt wurden + if do_rename and not dry_run and stats["files_renamed"] > 0 and final_tracks: + m3u_files = ( + list(proposal.album_dir.glob("*.m3u")) + + list(proposal.album_dir.glob("*.m3u8")) + ) + if m3u_files: + _update_m3u(m3u_files[0], final_tracks) + # Nach allen Umbenennungen: Verzeichnis Linux-kompatibel bereinigen if do_rename and not dry_run: sanitize_dir_names(proposal.album_dir) From b6abfae16cca37354c90e4e29b4a6c0b5ec0a6c0 Mon Sep 17 00:00:00 2001 From: dschlueter <dschlueter@kitux.de> Date: Wed, 29 Apr 2026 05:19:26 +0200 Subject: [PATCH 11/11] Add YouTube ID detection and metadata lookup via yt-dlp - Extract 11-char YouTube video IDs from audio filenames - Fetch title, uploader, chapters via yt-dlp (--dump-json) - Use chapters as tracklist when no .txt tracklist is available - Store yt_title / yt_uploader in AlbumHints for LLM prompt context - Fall back to YouTube video title as track title for single-file albums Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --- hint_extractor.py | 103 +++++++++++++++++++++++++++++++++++++++++++ metadata_resolver.py | 10 +++++ models.py | 2 + 3 files changed, 115 insertions(+) mode change 100644 => 100755 hint_extractor.py mode change 100644 => 100755 metadata_resolver.py mode change 100644 => 100755 models.py diff --git a/hint_extractor.py b/hint_extractor.py old mode 100644 new mode 100755 index 9a3117a..433c71d --- a/hint_extractor.py +++ b/hint_extractor.py @@ -4,6 +4,8 @@ import base64 import json import os import re +import shutil +import subprocess import sys import urllib.request from pathlib import Path @@ -319,6 +321,59 @@ def _check_cover_images(paths: List[Path]) -> List[Path]: return good +# YouTube-Video-ID: 11 Zeichen aus [A-Za-z0-9_-], eingebettet im Dateinamen +_YT_ID_RE = re.compile(r"(?<![A-Za-z0-9_-])([A-Za-z0-9_-]{11})(?![A-Za-z0-9_-])") + + +def _extract_youtube_id(path: Path) -> Optional[str]: + """Sucht eine YouTube-Video-ID im 
Dateinamen (Stem oder Suffix).""" + name = path.stem + path.suffix + for m in _YT_ID_RE.finditer(name): + candidate = m.group(1) + # Einfache Plausibilitätsprüfung: muss gemischte Zeichen haben + if re.search(r"[A-Z]", candidate) and re.search(r"[0-9a-z]", candidate): + return candidate + return None + + +def _fetch_youtube_metadata(video_id: str) -> Optional[Dict]: + """ + Ruft YouTube-Metadaten via yt-dlp ab (kein API-Key nötig). + Gibt Dict mit title, uploader, chapters, description zurück oder None. + """ + ytdlp = shutil.which("yt-dlp") + if not ytdlp: + return None + url = f"https://www.youtube.com/watch?v={video_id}" + try: + result = subprocess.run( + [ytdlp, "--dump-json", "--no-download", "--no-playlist", url], + capture_output=True, text=True, timeout=30, + ) + if result.returncode != 0 or not result.stdout.strip(): + return None + return json.loads(result.stdout) + except Exception as e: + print(f" ⚠️ YouTube-Fehler ({video_id}): {e}", file=sys.stderr) + return None + + +def _chapters_to_tracklist_text(chapters: List[Dict]) -> str: + """ + Konvertiert yt-dlp-Chapters in Tracklist-Text der vom _parse_tracklist + verarbeitet werden kann: '1. Titel MM:SS' + """ + lines = [] + for i, ch in enumerate(chapters, 1): + title = ch.get("title", "").strip() + if not title or title.startswith("<Untitled"): + continue + secs = int(ch.get("start_time", 0)) + mm, ss = divmod(secs, 60) + lines.append(f"{i}.
{title} {mm}:{ss:02d}") + return "\n".join(lines) + + def extract_hints(scan: AlbumScan, use_ocr: bool = True) -> AlbumHints: hints = AlbumHints(album_dir=scan.album_dir) @@ -342,6 +397,43 @@ def extract_hints(scan: AlbumScan, use_ocr: bool = True) -> AlbumHints: if ocr_text: hints.tracklist_text = ocr_text + # YouTube-Lookup: IDs aus Dateinamen extrahieren, Metadaten per yt-dlp holen + yt_meta_by_id: Dict[str, Optional[Dict]] = {} + yt_ids_by_stem: Dict[str, str] = {} # stem (normalisiert) → youtube_id + + for audio_path in scan.audio_files: + yt_id = _extract_youtube_id(audio_path) + if yt_id: + stem_key = _clean(audio_path.stem).casefold() + yt_ids_by_stem[stem_key] = yt_id + yt_meta_by_id.setdefault(yt_id, None) + + if yt_meta_by_id: + print(f" 📺 YouTube-IDs gefunden: {', '.join(list(yt_meta_by_id.keys())[:5])}", file=sys.stderr) + for yt_id in list(yt_meta_by_id.keys())[:5]: + meta = _fetch_youtube_metadata(yt_id) + yt_meta_by_id[yt_id] = meta + + # Chapters als Tracklist nutzen wenn noch keine vorhanden + if not hints.tracklist_text: + for yt_id, meta in yt_meta_by_id.items(): + if meta and meta.get("chapters"): + chapter_text = _chapters_to_tracklist_text(meta["chapters"]) + if chapter_text: + hints.tracklist_text = chapter_text + print(f" 📺 YouTube-Chapters als Tracklist: {len(meta['chapters'])} Tracks", + file=sys.stderr) + break + + # Album-Level-Hints (erster erfolgreicher Treffer) + for yt_id, meta in yt_meta_by_id.items(): + if meta: + hints.yt_title = (meta.get("title") or "").strip() or None + hints.yt_uploader = ( + meta.get("uploader") or meta.get("channel") or "" + ).strip() or None + break + parsed_tracklist = _parse_tracklist(hints.tracklist_text) if hints.tracklist_text else [] # M3U/Playlist-Reihenfolge: filename (stem, normalisiert) → Tracknummer @@ -467,6 +559,17 @@ def extract_hints(scan: AlbumScan, use_ocr: bool = True) -> AlbumHints: if stem_key in m3u_titles: title = m3u_titles[stem_key] + # YouTube-Titel als letzter Fallback (bei 
einzelner Datei = das ganze Video) + if not _is_good(title): + stem_key = _clean(audio_path.stem).casefold() + yt_id = yt_ids_by_stem.get(stem_key) + if yt_id: + meta = yt_meta_by_id.get(yt_id) + if meta: + yt_video_title = (meta.get("title") or "").strip() + if yt_video_title: + title = yt_video_title + hints.tracks.append(TrackHints( path=audio_path, track_number=track_num, diff --git a/metadata_resolver.py b/metadata_resolver.py old mode 100644 new mode 100755 index 804c38c..f109d04 --- a/metadata_resolver.py +++ b/metadata_resolver.py @@ -213,6 +213,8 @@ def _build_resolve_prompt(hints: AlbumHints, partial: Dict) -> str: f"Hinweis Künstler/Titel (aus Verzeichnis, kann vertauscht oder falsch sein): " f"{hints.dir_artist or '?'} / {hints.dir_album or partial.get('album', '?')}\n" f"Jahr: {hints.dir_year or partial.get('year', 'unbekannt')}\n" + + (f"YouTube-Videotitel: {hints.yt_title}\n" if hints.yt_title else "") + + (f"YouTube-Uploader/Kanal: {hints.yt_uploader}\n" if hints.yt_uploader else "") + (f"Tracklist-Kopf (Label/Jahr/Albumtitel):\n{tracklist_header}\n\n" if tracklist_header else "") + f"Tracks:\n{tracks_summary}\n\n" 'Antworte NUR mit einem JSON-Objekt (null wenn unbekannt):\n' @@ -354,9 +356,17 @@ def resolve( genre = genre or t.existing_tags.get("genre") label = label or t.existing_tags.get("label") or t.existing_tags.get("organization") + # YouTube-Metadaten als zusätzliche Hinweise (Uploader → Künstler, Titel → Album/Track) + if hints.yt_uploader and not artist: + artist = hints.yt_uploader + if hints.yt_title and not album: + album = hints.yt_title + if artist or album: confidence += 0.05 sources.append("local-hints") + if hints.yt_title or hints.yt_uploader: + sources.append("youtube") # AcoustID fingerprinting fp_mbids: Dict[str, List[str]] = {} diff --git a/models.py b/models.py old mode 100644 new mode 100755 index f0b7f3a..a95662f --- a/models.py +++ b/models.py @@ -50,6 +50,8 @@ class AlbumHints: tracklist_text: Optional[str] = None # 
merged text from all tracklist files cover_images: List[Path] = field(default_factory=list) tracks: List[TrackHints] = field(default_factory=list) + yt_title: Optional[str] = None # YouTube video title (if found) + yt_uploader: Optional[str] = None # YouTube channel/uploader name @dataclass