Initial implementation of Music Metadata Enricher

AI-powered per-album pipeline: scan → local hints → MusicBrainz/Discogs/Claude
resolve → cover art → interactive or auto review → tag write + rename + report.
All external dependencies optional; 17/17 unit tests passing.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Dieter Schlüter 2026-04-28 16:55:18 +02:00
commit f7cf520dbe
8 changed files with 1748 additions and 0 deletions

171
cover_handler.py Normal file
View file

@ -0,0 +1,171 @@
from __future__ import annotations
import sys
import tempfile
import time
from pathlib import Path
from typing import Optional, List
try:
from PIL import Image
HAS_PIL = True
except ImportError:
HAS_PIL = False
try:
import requests
HAS_REQUESTS = True
except ImportError:
HAS_REQUESTS = False
try:
import musicbrainzngs as mb
HAS_MB = True
except ImportError:
HAS_MB = False
try:
from mutagen.id3 import ID3, APIC, error as ID3Error
from mutagen.flac import FLAC, Picture
from mutagen.mp4 import MP4, MP4Cover
HAS_MUTAGEN = True
except ImportError:
HAS_MUTAGEN = False
_MIN_COVER_SIZE = 200  # minimum width and height in pixels


def _image_ok(path: Path) -> bool:
    """Return True if *path* looks like a usable cover image.

    With Pillow available the image must be at least ``_MIN_COVER_SIZE``
    pixels in both dimensions. Without Pillow the check falls back to a
    plain file-size threshold (more than 5000 bytes).

    Missing or unreadable files yield False in both modes; the size
    fallback used to raise on a missing file because ``stat()`` ran
    outside the ``try``.
    """
    try:
        if not HAS_PIL:
            return path.stat().st_size > 5000
        with Image.open(path) as img:
            width, height = img.size
        return width >= _MIN_COVER_SIZE and height >= _MIN_COVER_SIZE
    except Exception:
        # Any failure (missing file, unknown format, truncated data) simply
        # means "not usable as a cover".
        return False
def find_local_cover(image_files: List[Path]) -> Optional[Path]:
    """Return the best usable cover among *image_files*, or None.

    Candidates are ranked by the first matching keyword in their file name
    ("front" before "folder" before "cover" before "album", then everything
    else) and, within the same rank, by file size descending. The first
    candidate accepted by ``_image_ok`` wins.
    """
    keywords = ("front", "folder", "cover", "album")

    def rank(candidate: Path):
        lowered = candidate.name.lower()
        position = len(keywords)  # files without any keyword sort last
        for idx, keyword in enumerate(keywords):
            if keyword in lowered:
                position = idx
                break
        byte_count = candidate.stat().st_size if candidate.exists() else 0
        return (position, -byte_count)

    ordered = sorted(image_files, key=rank)
    return next((candidate for candidate in ordered if _image_ok(candidate)), None)
def _mb_cover_url(release_mbid: str) -> Optional[str]:
    """Return the Cover Art Archive front-cover URL for a release, if it exists.

    Issues a HEAD request (following redirects) and returns the URL only on
    HTTP 200. Returns None when ``requests`` is unavailable, on any other
    status, or when the request fails.
    """
    if not HAS_REQUESTS:
        return None
    front_url = f"https://coverartarchive.org/release/{release_mbid}/front"
    try:
        response = requests.head(front_url, timeout=5, allow_redirects=True)
    except Exception:
        # Network problems are treated the same as "no cover available".
        return None
    return front_url if response.status_code == 200 else None
def download_cover(release_mbid: Optional[str], dest_dir: Path) -> Optional[Path]:
    """Download the front cover of a MusicBrainz release into *dest_dir*.

    The image is stored as ``_cover_download.jpg`` (or ``.png`` when the
    response content type mentions "png"). The file is kept and returned
    only if ``_image_ok`` accepts it; otherwise it is removed again.
    Returns None when no MBID is given, ``requests`` is missing, no cover
    exists, or the download fails (failures are reported on stderr).
    """
    if not (release_mbid and HAS_REQUESTS):
        return None
    url = _mb_cover_url(release_mbid)
    if not url:
        return None
    try:
        response = requests.get(url, timeout=15)
        if response.status_code == 200:
            content_type = response.headers.get("content-type", "")
            suffix = ".png" if "png" in content_type else ".jpg"
            target = dest_dir / f"_cover_download{suffix}"
            target.write_bytes(response.content)
            if _image_ok(target):
                return target
            # Too small or unreadable: do not leave the file behind.
            target.unlink(missing_ok=True)
    except Exception as e:
        print(f" ⚠️ Cover-Download-Fehler: {e}", file=sys.stderr)
    return None
def embed_cover(audio_path: Path, cover_path: Path) -> bool:
    """Embed *cover_path* as front cover into *audio_path*.

    Supports MP3 (ID3 APIC), FLAC (picture block) and M4A (``covr`` atom);
    other containers are attempted through mutagen's generic loader and only
    succeed when their tag object accepts ID3 frames.

    Returns True only if the picture was actually written. Returns False
    when mutagen is unavailable, the format cannot carry the cover, or any
    error occurs (errors are reported on stderr).
    """
    if not HAS_MUTAGEN:
        return False
    # The MIME type must match the real image format; previously every
    # non-JPEG cover (gif, webp, bmp) was labelled image/png.
    mime_by_suffix = {
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".webp": "image/webp",
        ".bmp": "image/bmp",
    }
    try:
        img_data = cover_path.read_bytes()
        # Unknown suffixes keep the historical default.
        mime = mime_by_suffix.get(cover_path.suffix.lower(), "image/png")
        ext = audio_path.suffix.lower()
        if ext == ".mp3":
            try:
                tags = ID3(str(audio_path))
            except ID3Error:
                # File has no ID3 header yet: start with an empty tag.
                tags = ID3()
            tags.delall("APIC")
            tags.add(APIC(encoding=3, mime=mime, type=3, desc="Cover", data=img_data))
            tags.save(str(audio_path), v2_version=4)
            return True
        elif ext == ".flac":
            audio = FLAC(str(audio_path))
            audio.clear_pictures()
            pic = Picture()
            pic.type = 3  # front cover
            pic.mime = mime
            pic.desc = "Cover"
            pic.data = img_data
            audio.add_picture(pic)
            audio.save()
            return True
        elif ext == ".m4a":
            # MP4 cover atoms only know JPEG and PNG payloads.
            mp4_formats = {
                "image/jpeg": MP4Cover.FORMAT_JPEG,
                "image/png": MP4Cover.FORMAT_PNG,
            }
            fmt = mp4_formats.get(mime)
            if fmt is None:
                print(
                    f" ⚠️ Cover-Format für M4A nicht unterstützt: {cover_path.name}",
                    file=sys.stderr,
                )
                return False
            audio = MP4(str(audio_path))
            # Item assignment on the file object creates the tag container
            # when the file has none; audio.tags[...] would fail on None.
            audio["covr"] = [MP4Cover(img_data, imageformat=fmt)]
            audio.save()
            return True
        else:
            # Generic mutagen fallback
            from mutagen import File as MutagenFile
            audio = MutagenFile(str(audio_path), easy=False)
            if audio is not None:
                if audio.tags is None:
                    audio.add_tags()
                if hasattr(audio.tags, "add"):
                    audio.tags.add(
                        APIC(encoding=3, mime=mime, type=3, desc="Cover", data=img_data)
                    )
                    audio.save()
                    # Report success only when a frame was really added.
                    return True
    except Exception as e:
        print(f" ⚠️ Cover-Einbettungsfehler {audio_path.name}: {e}", file=sys.stderr)
    return False
def resolve_cover(
    image_files: List[Path],
    release_mbid: Optional[str],
    album_dir: Path,
) -> tuple[Optional[Path], Optional[str]]:
    """Find a cover for an album and say where it came from.

    A usable local image takes precedence ("local"); otherwise, if a
    release MBID is known, a download into *album_dir* is attempted
    ("musicbrainz"). Returns ``(None, None)`` when neither works.
    """
    local_cover = find_local_cover(image_files)
    if local_cover:
        return local_cover, "local"
    fetched = download_cover(release_mbid, album_dir) if release_mbid else None
    if fetched:
        return fetched, "musicbrainz"
    return None, None

228
executor.py Normal file
View file

@ -0,0 +1,228 @@
from __future__ import annotations
import csv
import re
import shutil
import sys
from pathlib import Path
from typing import Optional, List, Dict, Any
from models import AlbumProposal, TrackProposal
try:
from mutagen import File as MutagenFile
from mutagen.easyid3 import EasyID3
from mutagen.flac import FLAC
from mutagen.mp4 import MP4, MP4Tags
HAS_MUTAGEN = True
except ImportError:
HAS_MUTAGEN = False
from cover_handler import embed_cover
# Characters replaced by "_" when building file names: the set reserved on
# Windows file systems plus ASCII control characters.
_SAFE_RE = re.compile(r'[<>:"/\\|?*\x00-\x1f]')
# Column names (and column order) of the CSV report; each row appended by
# execute_album uses exactly these keys.
REPORT_FIELDS = [
    "status", "album_dir", "track_path",
    "old_title", "new_title",
    "old_artist", "new_artist",
    "album", "albumartist", "date", "genre", "label",
    "track_number", "disc_number",
    "cover_embedded", "renamed_to",
    "confidence", "sources",
]
def _safe_name(s: str) -> str:
    """Make *s* usable as a file-name component.

    Characters matched by ``_SAFE_RE`` become underscores; leading and
    trailing dots and spaces are removed afterwards.
    """
    replaced = _SAFE_RE.sub("_", s)
    return replaced.strip(". ")
def _proposed_filename(proposal: TrackProposal, ext: str) -> str:
    """Build the target file name ``NN - Artist - Title<ext>`` for a track.

    The track number is zero-padded to two digits ("00" when unknown) and
    prefixed with ``<disc>-`` only for disc numbers greater than 1. Missing
    artist or title fall back to "Unknown".
    """
    track_part = f"{proposal.track_number:02d}" if proposal.track_number else "00"
    on_later_disc = proposal.disc_number and proposal.disc_number > 1
    if on_later_disc:
        prefix = f"{proposal.disc_number}-{track_part}"
    else:
        prefix = track_part
    safe_artist = _safe_name(proposal.artist or "Unknown")
    safe_title = _safe_name(proposal.title or "Unknown")
    return f"{prefix} - {safe_artist} - {safe_title}{ext}"
def backup_file(path: Path, backup_dir: Path) -> bool:
    """Copy *path* into *backup_dir* unless a backup already exists.

    The backup is named ``<parent directory name>__<file name>``. An
    existing backup is never overwritten, so the first (original) state of
    a file is preserved across runs. Returns False and reports on stderr
    if the directory cannot be created or the copy fails.
    """
    try:
        backup_dir.mkdir(parents=True, exist_ok=True)
        target = backup_dir / f"{path.parent.name}__{path.name}"
        if not target.exists():
            shutil.copy2(path, target)
    except Exception as e:
        print(f" ⚠️ Backup-Fehler {path.name}: {e}", file=sys.stderr)
        return False
    return True
def write_tags(path: Path, proposal: TrackProposal, album_proposal: AlbumProposal) -> bool:
    """Write the proposed metadata of one track into the audio file.

    Title, artist, album and album artist are always written (empty string
    when unknown); track number (as ``n/total``), disc number, date, genre
    and label only when present. MP3, FLAC and M4A get dedicated handling,
    everything else goes through mutagen's "easy" interface on a
    best-effort basis.

    Returns True if the file was saved, False if mutagen is missing, the
    file type is not recognised, or writing failed (reported on stderr).
    """
    if not HAS_MUTAGEN:
        return False
    ext = path.suffix.lower()
    tags_to_write = {
        "title": proposal.title or "",
        "artist": proposal.artist or "",
        "album": album_proposal.album or "",
        "albumartist": album_proposal.albumartist or "",
    }
    if proposal.track_number:
        total = len(album_proposal.tracks)
        tags_to_write["tracknumber"] = f"{proposal.track_number}/{total}"
    if proposal.disc_number:
        tags_to_write["discnumber"] = str(proposal.disc_number)
    if album_proposal.date:
        tags_to_write["date"] = album_proposal.date
    if album_proposal.genre:
        tags_to_write["genre"] = album_proposal.genre
    if album_proposal.label:
        tags_to_write["organization"] = album_proposal.label
    try:
        if ext == ".mp3":
            try:
                audio = EasyID3(str(path))
            except Exception:
                # No ID3 tag yet: create an empty one on disk, then reload.
                audio = EasyID3()
                audio.save(str(path))
                audio = EasyID3(str(path))
            for k, v in tags_to_write.items():
                audio[k] = [v]
            audio.save(v2_version=4)
            return True
        elif ext == ".flac":
            audio = FLAC(str(path))
            for k, v in tags_to_write.items():
                audio[k] = [v]
            audio.save()
            return True
        elif ext == ".m4a":
            audio = MP4(str(path))
            text_atoms = {
                "title": "\xa9nam", "artist": "\xa9ART",
                "album": "\xa9alb", "albumartist": "aART",
                "date": "\xa9day", "genre": "\xa9gen",
            }
            # Atoms stored as (number, total) pairs. "disk" was missing
            # before, so disc numbers were silently dropped for M4A files.
            pair_atoms = {"tracknumber": "trkn", "discnumber": "disk"}
            for k, v in tags_to_write.items():
                if k in pair_atoms:
                    num, _, total = v.partition("/")
                    try:
                        audio[pair_atoms[k]] = [(int(num), int(total or 0))]
                    except ValueError:
                        # Non-numeric value: skip this atom, keep the rest.
                        pass
                elif k in text_atoms:
                    audio[text_atoms[k]] = [v]
                # Keys without an atom (e.g. "organization") are skipped.
            audio.save()
            return True
        else:
            audio = MutagenFile(str(path), easy=True)
            if audio is not None:
                if audio.tags is None:
                    audio.add_tags()
                for k, v in tags_to_write.items():
                    try:
                        audio[k] = [v]
                    except Exception:
                        # Best effort: not every container supports every key.
                        pass
                audio.save()
                return True
    except Exception as e:
        print(f" ⚠️ Tag-Schreibfehler {path.name}: {e}", file=sys.stderr)
    return False
def execute_album(
    proposal: AlbumProposal,
    backup_dir: Optional[Path],
    do_rename: bool,
    embed_cover_art: bool,
    dry_run: bool,
    report_data: List[Dict[str, Any]],
) -> Dict[str, int]:
    """Apply an accepted album proposal to its files and log one report row per track.

    Per track (unless *dry_run*): optional backup, tag write, optional cover
    embedding and optional rename. A row keyed by ``REPORT_FIELDS`` is
    appended to *report_data* in every case.

    A rename never replaces an existing different file, and a track whose
    tag write or rename failed is reported with status "error" rather
    than "ok".

    Returns counters: tags_written, covers_embedded, files_renamed, errors.
    """
    stats = {"tags_written": 0, "covers_embedded": 0, "files_renamed": 0, "errors": 0}
    for tp in proposal.tracks:
        # Capture the previous title/artist for the report before changing anything.
        old_title = tp.path.stem
        old_artist = ""
        if HAS_MUTAGEN:
            try:
                audio = MutagenFile(str(tp.path), easy=True)
                if audio and audio.tags:
                    old_artist = str(audio.tags.get("artist", [""])[0])
                    old_title = str(audio.tags.get("title", [tp.path.stem])[0])
            except Exception:
                # Old values are informational only; keep the defaults.
                pass
        new_path = tp.path
        renamed_to = ""
        cover_embedded = False
        track_failed = False
        if not dry_run:
            if backup_dir:
                backup_file(tp.path, backup_dir)
            if write_tags(tp.path, tp, proposal):
                stats["tags_written"] += 1
            else:
                stats["errors"] += 1
                track_failed = True
            if embed_cover_art and proposal.cover_path:
                if embed_cover(tp.path, proposal.cover_path):
                    stats["covers_embedded"] += 1
                    cover_embedded = True
            if do_rename:
                new_name = _proposed_filename(tp, tp.path.suffix)
                candidate = tp.path.parent / new_name
                if candidate != tp.path:
                    try:
                        # Path.rename silently replaces an existing file on
                        # POSIX; refuse unless the target is this very file
                        # (e.g. a case-only change on a case-insensitive FS).
                        if candidate.exists() and not candidate.samefile(tp.path):
                            raise FileExistsError(f"Ziel existiert bereits: {new_name}")
                        tp.path.rename(candidate)
                        new_path = candidate
                        renamed_to = new_name
                        stats["files_renamed"] += 1
                    except Exception as e:
                        print(f" ⚠️ Umbenennungsfehler {tp.path.name}: {e}", file=sys.stderr)
                        stats["errors"] += 1
                        track_failed = True
        if dry_run:
            status = "dry-run"
        else:
            status = "error" if track_failed else "ok"
        report_data.append({
            "status": status,
            "album_dir": str(proposal.album_dir.name),
            "track_path": str(new_path),
            "old_title": old_title,
            "new_title": tp.title,
            "old_artist": old_artist,
            "new_artist": tp.artist,
            "album": proposal.album,
            "albumartist": proposal.albumartist,
            "date": proposal.date or "",
            "genre": proposal.genre or "",
            "label": proposal.label or "",
            "track_number": tp.track_number or "",
            "disc_number": tp.disc_number or "",
            "cover_embedded": cover_embedded,
            "renamed_to": renamed_to,
            "confidence": f"{proposal.confidence:.2f}",
            "sources": ", ".join(proposal.sources),
        })
    return stats
def write_report(report_data: List[Dict[str, Any]], report_path: Path) -> None:
    """Write all collected report rows to *report_path* as UTF-8 CSV.

    Creates the parent directory if necessary and uses ``REPORT_FIELDS`` as
    header and column order. Failures are reported on stderr and never
    raised.
    """
    try:
        report_path.parent.mkdir(parents=True, exist_ok=True)
        with report_path.open("w", encoding="utf-8", newline="") as handle:
            writer = csv.DictWriter(handle, fieldnames=REPORT_FIELDS)
            writer.writeheader()
            for row in report_data:
                writer.writerow(row)
        print(f"📊 Report gespeichert: {report_path}")
    except Exception as e:
        print(f"⚠️ Report-Fehler: {e}", file=sys.stderr)

260
hint_extractor.py Normal file
View file

@ -0,0 +1,260 @@
from __future__ import annotations
import re
import sys
from pathlib import Path
from typing import Optional, List, Dict, Tuple
from models import AlbumScan, AlbumHints, TrackHints
try:
from mutagen import File as MutagenFile
HAS_MUTAGEN = True
except ImportError:
HAS_MUTAGEN = False
try:
from bs4 import BeautifulSoup
HAS_BS4 = True
except ImportError:
HAS_BS4 = False
# Splits out digit runs. NOTE(review): not referenced in the visible part of
# this file — confirm whether it is still needed.
_NATSORT_RE = re.compile(r"(\d+)")
# Placeholder values (compared case-insensitively after _clean) that carry no
# real information and must not be treated as a usable name.
_BAD_VALUES = {"unknown", "unknown artist", "unknown album", "untitled", "track", "va", "various"}
# Filename patterns: most specific first. In order:
#   disc + track + artist - title, disc + track + title,
#   track + artist - title, track + title, artist - title.
_FILENAME_PATTERNS = [
    re.compile(r"^(?P<disc>\d{1,2})[- _]+(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<artist>.+?)\s*[-]\s*(?P<title>.+)$"),
    re.compile(r"^(?P<disc>\d{1,2})[- _]+(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<title>.+)$"),
    re.compile(r"^(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<artist>.+?)\s*[-]\s*(?P<title>.+)$"),
    re.compile(r"^(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<title>.+)$"),
    re.compile(r"^(?P<artist>.+?)\s*[-]\s*(?P<title>.+)$"),
]
# Directory name patterns, tried in order:
#   "artist - album [year]", "artist year album", "album year".
_DIR_PATTERNS = [
    re.compile(r"^(?P<artist>.+?)[_ -]+[-][_ -]+(?P<album>.+?)(?:[_ -]+(?P<year>\d{4}))?$"),
    re.compile(r"^(?P<artist>.+?)[_ ]+(?P<year>\d{4})[._ -]+(?P<album>.+)$"),
    re.compile(r"^(?P<album>.+?)[_ -]+(?P<year>\d{4})$"),
]
# Tracklist line patterns; each tolerates a trailing "m:ss" duration:
#   "disc-track title", "track. title", and vinyl-style "A1 title".
_TRACKLIST_PATTERNS = [
    re.compile(r"^(?P<disc>\d{1,2})[- _](?P<track>\d{1,3})\s+(?P<title>.+?)(?:\s+\d+:\d{2})?$"),
    re.compile(r"^(?P<track>\d{1,3})[.):\s]+(?P<title>.+?)(?:\s+\d+:\d{2})?$"),
    re.compile(r"^(?P<track>[A-Z]\d{1,2})[.):\s]+(?P<title>.+?)(?:\s+\d+:\d{2})?$"),
]
# Matches "CD 1", "disc2", "Side 1" etc. (case-insensitive); group 1 is the number.
_DISC_SECTION_RE = re.compile(r"(?i)(?:cd|disc|disk|side)[_ \-]*(\d{1,2})")
def _clean(s: Optional[str]) -> str:
    """Normalise a raw name fragment.

    Underscores become spaces, whitespace runs collapse to a single space,
    and leading/trailing spaces, dashes, dots and underscores are removed.
    None and empty input yield "".
    """
    if not s:
        return ""
    spaced = s.replace("_", " ")
    collapsed = re.sub(r"\s+", " ", spaced)
    return collapsed.strip(" -._")
def _is_good(v: Optional[str]) -> bool:
    """Tell whether *v* is a meaningful value.

    Empty/None values and, after cleaning and case-folding, any member of
    ``_BAD_VALUES`` are rejected.
    """
    return bool(v) and _clean(v).casefold() not in _BAD_VALUES
def _parse_dirname(name: str) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Split an album directory name into ``(artist, album, year)``.

    ``_DIR_PATTERNS`` are tried in order; a match is accepted only if its
    artist or album passes ``_is_good``. If nothing is accepted the whole
    cleaned name is returned as the album.
    """
    cleaned = _clean(name)
    for pattern in _DIR_PATTERNS:
        match = pattern.match(cleaned)
        if match is None:
            continue
        groups = match.groupdict()
        artist = _clean(groups.get("artist")) or None
        album = _clean(groups.get("album")) or None
        if _is_good(artist) or _is_good(album):
            return artist, album, groups.get("year")
    # No pattern matched — treat whole name as album
    return None, _clean(cleaned), None
def _parse_filename(stem: str) -> Dict[str, str]:
    """Extract disc/track/artist/title hints from a file stem.

    The first matching entry of ``_FILENAME_PATTERNS`` decides; only
    non-empty groups are returned (cleaned). Without a match the cleaned
    stem is returned as the title.
    """
    cleaned = _clean(stem)
    attempts = (pattern.match(cleaned) for pattern in _FILENAME_PATTERNS)
    first_hit = next((m for m in attempts if m), None)
    if first_hit is None:
        return {"title": cleaned}
    return {key: _clean(value) for key, value in first_hit.groupdict().items() if value}
def _read_tags(path: Path) -> Tuple[Dict[str, str], Optional[float]]:
    """Read existing tags and the duration of an audio file.

    Returns ``(tags, duration_seconds)``. ``tags`` holds the first value of
    each known key that is present; a "year" value is mirrored into "date"
    when no "date" exists. Returns ``({}, None)`` if mutagen is missing,
    the file type is unknown, or reading fails (reported on stderr).
    """
    if not HAS_MUTAGEN:
        return {}, None
    try:
        audio = MutagenFile(str(path), easy=True)
        # Compare against None explicitly: a mutagen file object is
        # dict-like and therefore falsy when it has no tags, which would
        # throw away the duration of exactly the untagged files.
        if audio is None:
            return {}, None
        tags: Dict[str, str] = {}
        for k in ("title", "artist", "album", "albumartist", "tracknumber",
                  "discnumber", "date", "year", "genre", "label", "organization"):
            v = audio.get(k)
            if v:
                tags[k] = str(v[0]).strip()
        if "year" in tags and "date" not in tags:
            tags["date"] = tags["year"]
        duration = None
        if hasattr(audio, "info") and audio.info and hasattr(audio.info, "length"):
            duration = audio.info.length
        return tags, duration
    except Exception as e:
        print(f" ⚠️ Tag-Lesefehler {path.name}: {e}", file=sys.stderr)
        return {}, None
def _parse_tracklist(text: str) -> List[Dict[str, str]]:
    """Parse free-form tracklist text into entries with title/track/disc.

    Short lines (under 30 characters) that start with a disc/side heading
    switch the current disc. Every other non-empty line is matched against
    ``_TRACKLIST_PATTERNS`` (first match wins). Numeric track numbers lose
    their leading zeros; non-numeric ones (e.g. "A1") are kept verbatim.
    Entries without a title are dropped.
    """
    parsed: List[Dict[str, str]] = []
    disc_in_effect = 1
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        heading = _DISC_SECTION_RE.match(line)
        if heading and len(line) < 30:
            disc_in_effect = int(heading.group(1))
            continue
        attempts = (pattern.match(line) for pattern in _TRACKLIST_PATTERNS)
        match = next((m for m in attempts if m), None)
        if match is None:
            continue
        fields = match.groupdict()
        item: Dict[str, str] = {"title": _clean(fields.get("title", ""))}
        track_token = fields.get("track", "")
        if track_token:
            if track_token.isdigit():
                item["track"] = track_token.lstrip("0") or "0"
            else:
                item["track"] = track_token
        # An explicit disc in the line beats the current section heading.
        item["disc"] = fields.get("disc") or str(disc_in_effect)
        if item.get("title"):
            parsed.append(item)
    return parsed
def _read_tracklist_file(path: Path) -> Optional[str]:
    """Read a tracklist file as text, tolerating legacy encodings.

    Decoding is attempted as UTF-8 (a BOM is stripped), then cp1252, and
    finally latin-1, which accepts any byte sequence. The previous order
    tried latin-1 before cp1252, so cp1252 was never reached and Windows
    punctuation decoded to control characters.

    For ``.htm``/``.html`` files the markup is removed, via BeautifulSoup
    when available and otherwise by stripping tags with a regex.
    Returns None if the file cannot be read (reported on stderr).
    """
    try:
        raw = path.read_bytes()
        text: Optional[str] = None
        for enc in ("utf-8-sig", "cp1252"):
            try:
                text = raw.decode(enc)
                break
            except UnicodeDecodeError:
                continue
        if text is None:
            # latin-1 maps every byte, so this cannot fail.
            text = raw.decode("latin-1")
        if path.suffix.lower() in (".htm", ".html"):
            if HAS_BS4:
                soup = BeautifulSoup(text, "html.parser")
                return soup.get_text(separator="\n")
            # Fallback: strip HTML tags
            return re.sub(r"<[^>]+>", " ", text)
        return text
    except Exception as e:
        print(f" ⚠️ Tracklist-Lesefehler {path.name}: {e}", file=sys.stderr)
    return None
def _check_cover_images(paths: List[Path]) -> List[Path]:
    """Reorder image paths so that likely front covers come first.

    Files whose name contains "front", "folder", "cover" or "album" are
    moved to the front (the most recently seen of them first); all other
    files follow in their original order.
    """
    keywords = ("front", "folder", "cover", "album")
    preferred: List[Path] = []
    remaining: List[Path] = []
    for p in paths:
        lowered = p.name.lower()
        bucket = preferred if any(kw in lowered for kw in keywords) else remaining
        bucket.append(p)
    # Each preferred file used to be inserted at index 0, hence the reversal.
    return preferred[::-1] + remaining
def extract_hints(scan: AlbumScan) -> AlbumHints:
    """Collect all locally available hints for one scanned album.

    Combines the parsed directory name, the prioritised cover images, the
    merged text of all tracklist files and, per audio file, its existing
    tags plus hints parsed from the file name. Tags win over file-name
    hints; the disc number may also come from a "CD1"/"Disc 2" style path
    segment. A title is taken from the parsed tracklist only if neither
    tags nor file name supplied a meaningful one.
    """
    hints = AlbumHints(album_dir=scan.album_dir)
    # Directory name
    hints.dir_artist, hints.dir_album, hints.dir_year = _parse_dirname(scan.album_dir.name)
    # Cover images
    hints.cover_images = _check_cover_images(scan.image_files)
    # Tracklist files
    texts: List[str] = []
    for tf in scan.tracklist_files:
        txt = _read_tracklist_file(tf)
        if txt:
            texts.append(txt)
    hints.tracklist_text = "\n\n".join(texts) if texts else None
    parsed_tracklist = _parse_tracklist(hints.tracklist_text) if hints.tracklist_text else []
    # Build TrackHints per audio file
    for audio_path in sorted(scan.audio_files):
        tags, duration = _read_tags(audio_path)
        fn_hints = _parse_filename(audio_path.stem)
        track_num: Optional[int] = None
        disc_num: Optional[int] = None
        # Track number: tag > filename
        raw_tn = tags.get("tracknumber") or fn_hints.get("track")
        if raw_tn:
            try:
                track_num = int(str(raw_tn).split("/")[0])
            except ValueError:
                pass
        # Disc number: tag > filename > path segment
        raw_dn = tags.get("discnumber") or fn_hints.get("disc")
        if raw_dn:
            try:
                disc_num = int(str(raw_dn).split("/")[0])
            except ValueError:
                pass
        if not disc_num:
            for part in audio_path.relative_to(scan.album_dir).parts[:-1]:
                dm = _DISC_SECTION_RE.search(part)
                if dm:
                    disc_num = int(dm.group(1))
                    break
        title = tags.get("title") or fn_hints.get("title")
        artist = tags.get("artist") or fn_hints.get("artist")
        # Enrich from parsed tracklist if track_num matches
        if parsed_tracklist and track_num:
            wanted = (disc_num or 1, track_num)
            for tl_entry in parsed_tracklist:
                try:
                    tl_key = (int(tl_entry.get("disc", "1")), int(tl_entry.get("track", "")))
                except ValueError:
                    # Vinyl-style positions such as "A1" (or a missing track
                    # number) cannot be compared numerically; this used to
                    # abort the whole album with a ValueError.
                    continue
                if tl_key == wanted:
                    if not _is_good(title) and _is_good(tl_entry.get("title")):
                        title = tl_entry["title"]
                    break
        hints.tracks.append(TrackHints(
            path=audio_path,
            track_number=track_num,
            disc_number=disc_num,
            title=_clean(title) if title else None,
            artist=_clean(artist) if artist else None,
            duration=duration,
            existing_tags=tags,
        ))
    return hints

410
metadata_resolver.py Normal file
View file

@ -0,0 +1,410 @@
from __future__ import annotations
import os
import sys
import time
from typing import Optional, List, Dict, Tuple
from models import AlbumHints, AlbumProposal, TrackProposal
try:
import musicbrainzngs as mb
mb.set_useragent("MusicMetadataEnricher", "1.0", "https://github.com/dschlueter")
HAS_MB = True
except ImportError:
HAS_MB = False
try:
import acoustid
HAS_ACOUSTID = True
except ImportError:
HAS_ACOUSTID = False
try:
import discogs_client as dc
HAS_DISCOGS = True
except ImportError:
HAS_DISCOGS = False
try:
import anthropic
HAS_ANTHROPIC = True
except ImportError:
HAS_ANTHROPIC = False
_MB_RATE_LIMIT = 1.1  # seconds between MusicBrainz requests
# time.monotonic() timestamp of the most recent MusicBrainz call; updated by _mb_wait().
_last_mb_call = 0.0
# Credentials are read once at import time; an empty string disables the
# corresponding service in the functions below.
ACOUSTID_API_KEY = os.getenv("ACOUSTID_API_KEY", "")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
DISCOGS_TOKEN = os.getenv("DISCOGS_TOKEN", "")
def _mb_wait():
    """Block until at least ``_MB_RATE_LIMIT`` seconds have passed since the
    previous MusicBrainz call, then record the current time as the new
    reference point."""
    global _last_mb_call
    remaining = _MB_RATE_LIMIT - (time.monotonic() - _last_mb_call)
    if remaining > 0:
        time.sleep(remaining)
    _last_mb_call = time.monotonic()
# ---------------------------------------------------------------------------
# AcoustID fingerprinting
# ---------------------------------------------------------------------------
def _fingerprint_tracks(hints: AlbumHints) -> Dict[str, List[str]]:
    """Fingerprint every track and look up matching recordings on AcoustID.

    Returns ``{audio_path_str: [recording_mbid, ...]}`` containing only
    tracks with at least one recording from a result scoring 0.90 or
    higher. Tracks without a match are omitted so that an empty dict
    reliably means "AcoustID found nothing"; previously empty lists made
    the result truthy and the caller credited AcoustID regardless.

    Returns ``{}`` when pyacoustid or the API key is missing. Per-track
    failures are reported on stderr and skipped.
    """
    if not HAS_ACOUSTID or not ACOUSTID_API_KEY:
        return {}
    results: Dict[str, List[str]] = {}
    for t in hints.tracks:
        try:
            duration, fp = acoustid.fingerprint_file(str(t.path))
            response = acoustid.lookup(ACOUSTID_API_KEY, fp, duration,
                                       meta="recordings releasegroups")
            mbids: List[str] = [
                rec["id"]
                for result in response.get("results", [])
                if result.get("score", 0) >= 0.90
                for rec in result.get("recordings", [])
                if rec.get("id")
            ]
            if mbids:
                results[str(t.path)] = mbids
        except Exception as e:
            print(f" ⚠️ AcoustID-Fehler {t.path.name}: {e}", file=sys.stderr)
    return results
# ---------------------------------------------------------------------------
# MusicBrainz lookup
# ---------------------------------------------------------------------------
def _mb_search_release(artist: Optional[str], album: Optional[str],
                       year: Optional[str]) -> Optional[Dict]:
    """Search MusicBrainz for a release by album title, artist and year.

    Returns the best-scoring of up to three hits as returned by
    musicbrainzngs, or None if musicbrainzngs is missing, neither artist
    nor album is known, nothing was found, the best score is below 70, or
    the request failed (reported on stderr).
    """
    if not HAS_MB or (not artist and not album):
        return None

    def quoted(value: str) -> str:
        # Escape backslashes and double quotes so names containing them
        # cannot break out of the quoted Lucene phrase.
        return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'

    query_parts = []
    if album:
        query_parts.append(f"release:{quoted(album)}")
    if artist:
        query_parts.append(f"artist:{quoted(artist)}")
    if year:
        query_parts.append(f'date:{year}')
    query = " AND ".join(query_parts)
    try:
        _mb_wait()
        result = mb.search_releases(query=query, limit=3)
        releases = result.get("release-list", [])
        if not releases:
            return None
        # Take highest-score release
        best = max(releases, key=lambda r: int(r.get("ext:score", 0)))
        score = int(best.get("ext:score", 0))
        if score < 70:
            return None
        return best
    except Exception as e:
        print(f" ⚠️ MusicBrainz-Suchfehler: {e}", file=sys.stderr)
        return None
def _mb_get_release_tracks(release_id: str) -> Optional[Dict]:
    """Fetch one release with recordings, artist credits, labels and release group.

    Returns the ``"release"`` dict of the musicbrainzngs response. The
    annotation previously claimed a list of dicts, although the caller has
    always used the value as a mapping. Returns None when musicbrainzngs is
    missing or the request fails (reported on stderr).
    """
    if not HAS_MB:
        return None
    try:
        _mb_wait()
        result = mb.get_release_by_id(
            release_id,
            includes=["recordings", "artist-credits", "labels", "release-groups"],
        )
        return result.get("release")
    except Exception as e:
        print(f" ⚠️ MusicBrainz-Release-Fehler: {e}", file=sys.stderr)
        return None
def _mb_recording_to_release(recording_mbid: str) -> Optional[Dict]:
    """Return the first release listed for a MusicBrainz recording.

    Returns None when musicbrainzngs is missing, the recording has no
    releases, or the request fails (reported on stderr).
    """
    if not HAS_MB:
        return None
    try:
        _mb_wait()
        payload = mb.get_recording_by_id(
            recording_mbid,
            includes=["releases", "artist-credits", "release-groups"],
        )
        candidates = payload.get("recording", {}).get("release-list", [])
        return candidates[0] if candidates else None
    except Exception as e:
        print(f" ⚠️ MusicBrainz-Recording-Fehler: {e}", file=sys.stderr)
        return None
# ---------------------------------------------------------------------------
# Discogs fallback
# ---------------------------------------------------------------------------
def _discogs_search(artist: Optional[str], album: Optional[str]) -> Optional[Dict]:
    """Look up a release on Discogs and return its basic metadata.

    Only the first search hit is used. The result has the keys album,
    artist, year, genre, label and id. Returns None when the client
    library or token is missing, nothing is found, or the request fails
    (reported on stderr).
    """
    if not HAS_DISCOGS or not DISCOGS_TOKEN:
        return None
    try:
        client = dc.Client("MusicMetadataEnricher/1.0", user_token=DISCOGS_TOKEN)
        search_term = album or artist or ""
        hits = client.search(search_term, artist=artist or "", type="release")
        if not hits.count:
            return None
        top = hits[0]
        return {
            "album": top.title,
            "artist": top.artists[0].name if top.artists else None,
            "year": str(top.year) if top.year else None,
            "genre": top.genres[0] if top.genres else None,
            "label": top.labels[0].name if top.labels else None,
            "id": top.id,
        }
    except Exception as e:
        print(f" ⚠️ Discogs-Fehler: {e}", file=sys.stderr)
    return None
# ---------------------------------------------------------------------------
# Claude API reasoning (optional)
# ---------------------------------------------------------------------------
def _claude_resolve(hints: AlbumHints, partial: Dict) -> Optional[Dict]:
    """Ask the Claude API to fill in missing album-level fields.

    The prompt contains the directory name, what is known so far (directory
    hints first, then *partial*) and up to 20 track hints. The first JSON
    object in the reply is parsed and returned with every value normalised
    to a stripped string or None, so an integer year cannot leak into the
    tag writer.

    Returns None when the SDK or API key is missing, the reply contains no
    JSON object, or the call fails (reported on stderr).
    """
    if not HAS_ANTHROPIC or not ANTHROPIC_API_KEY:
        return None
    import json
    import re

    def known(hint_value: Optional[str], key: str) -> str:
        # partial always contains the key, possibly with a None value, so a
        # dict.get default would never apply and "None" ended up in the prompt.
        return hint_value or partial.get(key) or "unbekannt"

    def as_text(value) -> Optional[str]:
        # Only plain scalars are meaningful tag values; bool is excluded
        # although it is an int subclass.
        if isinstance(value, bool) or not isinstance(value, (str, int, float)):
            return None
        return str(value).strip() or None

    try:
        client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
        tracks_summary = "\n".join(
            f" - Track {t.track_number or '?'}: {t.title or t.path.stem}"
            + (f" [{t.artist}]" if t.artist else "")
            for t in hints.tracks[:20]
        )
        prompt = f"""Du bist ein Musikexperte. Analysiere diese Album-Daten und vervollständige die fehlenden Felder.
Verzeichnisname: {hints.album_dir.name}
Bekannte Artist: {known(hints.dir_artist, 'artist')}
Bekannter Albumtitel: {known(hints.dir_album, 'album')}
Jahr: {known(hints.dir_year, 'year')}
Tracklist-Hinweise:
{tracks_summary}
Antworte NUR mit einem JSON-Objekt mit diesen Feldern (null wenn unbekannt):
{{"artist": ..., "album": ..., "albumartist": ..., "year": ..., "genre": ..., "label": ...}}"""
        message = client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=300,
            messages=[{"role": "user", "content": prompt}],
        )
        text = message.content[0].text.strip()
        # Extract JSON from response
        json_match = re.search(r"\{.*\}", text, re.DOTALL)
        if json_match:
            data = json.loads(json_match.group())
            if isinstance(data, dict):
                return {key: as_text(value) for key, value in data.items()}
    except Exception as e:
        print(f" ⚠️ Claude-API-Fehler: {e}", file=sys.stderr)
    return None
# ---------------------------------------------------------------------------
# Main resolver
# ---------------------------------------------------------------------------
def resolve(
    hints: AlbumHints,
    use_fingerprint: bool = True,
    use_api: bool = True,
    use_claude: bool = True,
) -> AlbumProposal:
    """Turn local hints into an album proposal, consulting online sources.

    Order of evidence: directory name and existing tags, AcoustID
    fingerprints, MusicBrainz text search, the full MusicBrainz release,
    Discogs (only without a MusicBrainz release) and finally Claude for
    remaining gaps. Values already known are never overwritten by a later
    source. Each contributing source raises ``confidence`` (capped at 1.0)
    and is listed in ``sources``.

    ``cover_path`` and ``cover_source`` of the returned proposal are left
    unset (None).
    """
    from collections import Counter

    def to_int(value, default=None):
        # musicbrainzngs delivers numbers as strings, sometimes non-numeric.
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    confidence = 0.0
    sources: List[str] = []
    notes: List[str] = []
    artist = hints.dir_artist
    album = hints.dir_album
    year = hints.dir_year
    genre: Optional[str] = None
    label: Optional[str] = None
    release_mbid: Optional[str] = None
    mb_tracks: Optional[List] = None

    # Collect artist/album from existing tags (majority vote)
    tag_artists = [t.existing_tags.get("artist") for t in hints.tracks if t.existing_tags.get("artist")]
    tag_albums = [t.existing_tags.get("album") for t in hints.tracks if t.existing_tags.get("album")]
    if tag_artists:
        artist = artist or Counter(tag_artists).most_common(1)[0][0]
    if tag_albums:
        album = album or Counter(tag_albums).most_common(1)[0][0]
    # Tag year/genre/label: first track that has a value wins
    for t in hints.tracks:
        year = year or t.existing_tags.get("date") or t.existing_tags.get("year")
        genre = genre or t.existing_tags.get("genre")
        label = label or t.existing_tags.get("label") or t.existing_tags.get("organization")
    if artist or album:
        confidence += 0.05
        sources.append("local-hints")

    # AcoustID fingerprinting
    if use_fingerprint and use_api and HAS_ACOUSTID and ACOUSTID_API_KEY:
        fp_mbids = _fingerprint_tracks(hints)
        # Credit AcoustID only if at least one track really matched; a dict
        # of empty lists must not count as evidence.
        if any(fp_mbids.values()):
            confidence += 0.20
            sources.append("acoustid")
            # Try to get release from first matched recording
            for mbids in fp_mbids.values():
                for mbid in mbids[:1]:
                    rel = _mb_recording_to_release(mbid)
                    if rel:
                        release_mbid = rel.get("id")
                        confidence += 0.25
                        sources.append("musicbrainz-fingerprint")
                        break
                if release_mbid:
                    break

    # MusicBrainz text search
    if use_api and HAS_MB and not release_mbid:
        mb_result = _mb_search_release(artist, album, year)
        if mb_result:
            release_mbid = mb_result.get("id")
            score = int(mb_result.get("ext:score", 0))
            confidence += 0.30 * (score / 100)
            sources.append("musicbrainz-text")
            notes.append(f"MusicBrainz score: {score}")

    # Fetch full release data
    if use_api and release_mbid:
        full_release = _mb_get_release_tracks(release_mbid)
        if full_release:
            if not artist:
                # Prefer the ready-made credit phrase. When rebuilding it,
                # keep join phrases, which appear as plain strings between
                # the credit dicts.
                phrase = full_release.get("artist-credit-phrase")
                if not phrase:
                    parts = []
                    for c in full_release.get("artist-credit", []):
                        if isinstance(c, str):
                            parts.append(c)
                        elif isinstance(c, dict):
                            parts.append(c.get("artist", {}).get("name", "")
                                         + c.get("joinphrase", ""))
                    phrase = "".join(parts)
                artist = phrase.strip() or artist
            if not album:
                album = full_release.get("title", album)
            if not year:
                year = full_release.get("date", "")[:4] or None
            label_info = full_release.get("label-info-list", [])
            if label_info and not label:
                label = label_info[0].get("label", {}).get("name")
            # The release group's primary type ("Album", "Single", ...) is a
            # release type, not a genre, and is no longer used as one.
            mb_tracks = []
            for medium in full_release.get("medium-list", []):
                # Positions arrive as strings; store ints so they compare
                # equal to the locally parsed disc numbers.
                disc_num = to_int(medium.get("position"), 1)
                for track in medium.get("track-list", []):
                    # "number" is the printed number and may be "A1" on
                    # vinyl; fall back to the numeric "position".
                    number = to_int(track.get("number"))
                    if number is None:
                        number = to_int(track.get("position"), 0)
                    recording = track.get("recording", {})
                    mb_tracks.append({
                        "disc": disc_num,
                        "number": number,
                        "title": recording.get("title", ""),
                        "artist": track.get("artist-credit-phrase", ""),
                        "mbid": recording.get("id"),
                    })

    # Discogs fallback
    if use_api and HAS_DISCOGS and DISCOGS_TOKEN and not release_mbid:
        dg = _discogs_search(artist, album)
        if dg:
            artist = artist or dg.get("artist")
            album = album or dg.get("album")
            year = year or dg.get("year")
            genre = genre or dg.get("genre")
            label = label or dg.get("label")
            confidence += 0.15
            sources.append("discogs")

    # Claude API for remaining gaps
    partial = {"artist": artist, "album": album, "year": year}
    if use_claude and use_api and ANTHROPIC_API_KEY and HAS_ANTHROPIC:
        if not artist or not album or confidence < 0.5:
            cl = _claude_resolve(hints, partial)
            if cl:
                artist = artist or cl.get("artist")
                album = album or cl.get("album")
                year = year or cl.get("year")
                genre = genre or cl.get("genre")
                label = label or cl.get("label")
                confidence += 0.10
                sources.append("claude")

    # Finalize albumartist: three or more distinct track artists mean a compilation
    track_artists = [t.artist for t in hints.tracks if t.artist]
    distinct_artists = set(track_artists)
    if len(distinct_artists) >= 3:
        albumartist = "Various Artists"
    elif track_artists:
        albumartist = artist or Counter(track_artists).most_common(1)[0][0]
    else:
        albumartist = artist or "Unknown Artist"
    album = album or hints.album_dir.name.replace("_", " ")
    artist = artist or albumartist
    confidence = min(confidence, 1.0)

    # Build track proposals
    track_proposals = _build_track_proposals(hints, mb_tracks, album, artist)
    return AlbumProposal(
        album_dir=hints.album_dir,
        album=album,
        albumartist=albumartist,
        date=year,
        genre=genre,
        label=label,
        mbid=release_mbid,
        cover_path=None,
        cover_source=None,
        tracks=track_proposals,
        confidence=confidence,
        sources=sources,
        notes=notes,
    )
def _build_track_proposals(
    hints: AlbumHints,
    mb_tracks: Optional[List],
    album: str,
    album_artist: str,
) -> List[TrackProposal]:
    """Create one proposal per local track, ordered by disc, track and path.

    A local track with a track number is matched against *mb_tracks* by
    (disc, number); a match overrides title and artist where MusicBrainz
    has a value and contributes the recording MBID. Disc and track numbers
    from MusicBrainz are compared as integers, so string values such as
    "1" still match. Without a title the file stem is used; without a
    track artist, *album_artist*.

    *album* is accepted for interface compatibility and not used here.
    """
    def as_int(value) -> Optional[int]:
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    # (disc, number) -> MusicBrainz track; the first entry wins, which
    # matches the previous linear search with break.
    mb_index: Dict = {}
    for mb_t in mb_tracks or []:
        disc = as_int(mb_t.get("disc"))
        key = (1 if disc is None else disc, as_int(mb_t.get("number")))
        mb_index.setdefault(key, mb_t)

    proposals: List[TrackProposal] = []
    ordered = sorted(
        hints.tracks,
        key=lambda t: (t.disc_number or 1, t.track_number or 9999, str(t.path)),
    )
    for th in ordered:
        title = th.title
        artist = th.artist or album_artist
        mbid = None
        # Try to match from MusicBrainz track list
        match = mb_index.get((th.disc_number or 1, th.track_number)) if th.track_number else None
        if match:
            title = match.get("title") or title
            artist = match.get("artist") or artist
            # The recording MBID was collected before but always discarded.
            mbid = match.get("mbid")
        proposals.append(TrackProposal(
            path=th.path,
            title=title or th.path.stem,
            artist=artist,
            track_number=th.track_number,
            disc_number=th.disc_number,
            mbid=mbid,
        ))
    return proposals

79
models.py Normal file
View file

@ -0,0 +1,79 @@
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional, List, Dict
# Lower-case file suffixes (including the dot) for each file category.
# NOTE(review): presumably consumed by the scanner module, which is not
# visible here — confirm.
AUDIO_EXTENSIONS = {
    ".mp3", ".flac", ".m4a", ".aac", ".ogg", ".opus",
    ".wav", ".wma", ".aiff", ".ape",
}
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".bmp", ".gif"}
TRACKLIST_EXTENSIONS = {".txt", ".htm", ".html", ".nfo"}
PLAYLIST_EXTENSIONS = {".m3u", ".m3u8", ".pls"}
@dataclass
class ScannedFile:
    """A single file together with its category."""
    path: Path
    kind: str  # "audio" | "image" | "tracklist" | "playlist" | "other"
@dataclass
class AlbumScan:
    """Files of one album directory, grouped by category."""
    album_dir: Path
    audio_files: List[Path] = field(default_factory=list)
    image_files: List[Path] = field(default_factory=list)
    tracklist_files: List[Path] = field(default_factory=list)
    other_files: List[Path] = field(default_factory=list)
@dataclass
class TrackHints:
    """Locally derived hints for one audio file (tags, file name, tracklist)."""
    path: Path
    track_number: Optional[int] = None
    disc_number: Optional[int] = None
    title: Optional[str] = None
    artist: Optional[str] = None
    duration: Optional[float] = None  # taken from mutagen's info.length when available
    existing_tags: Dict[str, str] = field(default_factory=dict)  # first value per tag key
@dataclass
class AlbumHints:
    """All locally derived hints for one album directory."""
    album_dir: Path
    # Artist, album and year parsed from the directory name
    dir_artist: Optional[str] = None
    dir_album: Optional[str] = None
    dir_year: Optional[str] = None
    tracklist_text: Optional[str] = None  # merged text from all tracklist files
    cover_images: List[Path] = field(default_factory=list)  # likely front covers first
    tracks: List[TrackHints] = field(default_factory=list)
@dataclass
class TrackProposal:
    """Proposed metadata for one audio file."""
    path: Path  # location of the audio file the proposal applies to
    title: str
    artist: str
    track_number: Optional[int]
    disc_number: Optional[int]
    new_filename: Optional[str] = None  # only set when --rename is active
    mbid: Optional[str] = None  # NOTE(review): presumably the MusicBrainz recording ID — confirm
@dataclass
class AlbumProposal:
    """Proposed album-level metadata plus the proposals for all its tracks."""
    album_dir: Path
    album: str
    albumartist: str
    date: Optional[str]
    genre: Optional[str]
    label: Optional[str]
    mbid: Optional[str]  # MusicBrainz release ID
    cover_path: Optional[Path]  # resolved local or downloaded cover
    cover_source: Optional[str]  # "local" | "musicbrainz" | "discogs"
    tracks: List[TrackProposal]
    confidence: float  # accumulated per contributing source, capped at 1.0 by the resolver
    sources: List[str] = field(default_factory=list)  # labels of the sources that contributed
    notes: List[str] = field(default_factory=list)  # free-text remarks shown during review

269
music_enricher.py Normal file
View file

@ -0,0 +1,269 @@
#!/usr/bin/env python3
"""
music_enricher.py
KI-gestützter Musik-Metadaten-Enricher für Jellyfin-Bibliotheken.
Pipeline pro Album:
Scan → HintExtractor → MetadataResolver → CoverHandler → Review → Executor
"""
from __future__ import annotations
import argparse
import os
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
try:
from tqdm import tqdm
HAS_TQDM = True
except ImportError:
HAS_TQDM = False
from models import AlbumProposal
from scanner import scan_album, collect_album_dirs
from hint_extractor import extract_hints
from metadata_resolver import resolve
from cover_handler import resolve_cover
from executor import execute_album, write_report
def maybe_tqdm(iterable, show: bool, **kwargs):
    """Wrap *iterable* in a tqdm progress bar if requested and possible.

    Returns the iterable unchanged when *show* is false or tqdm is not
    installed; asking for a bar without tqdm used to raise NameError.
    Extra keyword arguments are passed on to ``tqdm``.
    """
    if show and HAS_TQDM:
        return tqdm(iterable, **kwargs)
    return iterable
# ---------------------------------------------------------------------------
# Review / Display
# ---------------------------------------------------------------------------
def _print_proposal(proposal: AlbumProposal) -> None:
    """Print a human-readable summary of *proposal* to stdout.

    Shows the album-level fields, the confidence with its sources, any notes
    and the first eight tracks; further tracks are summarised in one line.
    Tracks without a track number are rendered as ``??`` instead of raising,
    including on multi-disc albums.
    """
    max_listed = 8
    filled = int(proposal.confidence * 10)
    # NOTE(review): the bar and separator glyphs are empty strings in this
    # copy of the file — confirm against the original source.
    conf_bar = "" * filled + "" * (10 - filled)
    cover_name = proposal.cover_path.name if proposal.cover_path else 'keins'

    print(f"\n{'' * 60}")
    print(f"💿 {proposal.album_dir.name}")
    print(f" Album: {proposal.album}")
    print(f" Artist: {proposal.albumartist}")
    print(f" Jahr: {proposal.date or ''}")
    print(f" Genre: {proposal.genre or ''}")
    print(f" Label: {proposal.label or ''}")
    print(f" Cover: {proposal.cover_source or ''} ({cover_name})")
    print(f" Konfidenz: [{conf_bar}] {proposal.confidence:.0%} Quellen: {', '.join(proposal.sources) or ''}")
    for n in proposal.notes:
        print(f" {n}")
    print(f" Tracks ({len(proposal.tracks)}):")
    for tp in proposal.tracks[:max_listed]:
        if tp.disc_number and tp.disc_number > 1:
            # A missing track number must not reach the ``:02d`` format spec:
            # formatting None that way raises TypeError.
            number = f"{tp.track_number:02d}" if tp.track_number is not None else "??"
            tn = f"{tp.disc_number}-{number}"
        else:
            tn = f"{tp.track_number:02d}" if tp.track_number else "??"
        print(f" {tn} {tp.artist} {tp.title}")
    if len(proposal.tracks) > max_listed:
        print(f" … und {len(proposal.tracks) - max_listed} weitere")
def _interactive_review(proposal: AlbumProposal) -> bool:
    """Show *proposal* and ask the user what to do with it.

    Returns True when the user accepts (empty input, ``j`` or ``y``) and
    False when the album is skipped (``s``). ``q`` terminates the program
    with exit status 0. Any other input repeats the prompt.
    """
    _print_proposal(proposal)
    prompt = "\n [Enter] Akzeptieren [s] Überspringen [q] Abbrechen: "
    accept_keys = {"", "j", "y"}
    while True:
        choice = input(prompt).strip().lower()
        if choice == "q":
            sys.exit(0)
        if choice == "s":
            return False
        if choice in accept_keys:
            return True
# ---------------------------------------------------------------------------
# Main pipeline
# ---------------------------------------------------------------------------
def _dry_run_rows(proposal: AlbumProposal, album_dir: Path) -> List[Dict[str, Any]]:
    """Build one report row per proposed track for a dry run."""
    return [
        {
            "status": "dry-run",
            "album_dir": str(album_dir.name),
            "track_path": str(tp.path),
            "old_title": tp.path.stem,
            "new_title": tp.title,
            "old_artist": "",
            "new_artist": tp.artist,
            "album": proposal.album,
            "albumartist": proposal.albumartist,
            "date": proposal.date or "",
            "genre": proposal.genre or "",
            "label": proposal.label or "",
            "track_number": tp.track_number or "",
            "disc_number": tp.disc_number or "",
            "cover_embedded": False,
            "renamed_to": tp.new_filename or "",
            "confidence": f"{proposal.confidence:.2f}",
            "sources": ", ".join(proposal.sources),
        }
        for tp in proposal.tracks
    ]


def process_album(
    album_dir: Path,
    args: argparse.Namespace,
    report_data: List[Dict[str, Any]],
) -> Dict[str, int]:
    """Run the full pipeline for one album directory.

    Steps: scan, extract hints, resolve metadata, resolve cover art, optional
    filename proposals, review (dry-run / interactive / auto) and execution.

    Args:
        album_dir: Directory of the album to process.
        args: Parsed command-line options (see ``main``).
        report_data: List that receives one row per track; appended in place.

    Returns:
        Counters for ``tags_written``, ``covers_embedded``, ``files_renamed``,
        ``errors`` and ``skipped``. Any exception raised along the way is
        reported on stderr and counted as one error instead of propagating.
    """
    stats = {"tags_written": 0, "covers_embedded": 0, "files_renamed": 0,
             "errors": 0, "skipped": 0}
    try:
        scan = scan_album(album_dir)
        if not scan.audio_files:
            stats["skipped"] += 1
            return stats

        hints = extract_hints(scan)
        proposal = resolve(
            hints,
            use_fingerprint=not args.no_fingerprint,
            use_api=not args.no_api,
            use_claude=bool(os.getenv("ANTHROPIC_API_KEY")),
        )

        # Cover art. With --no-cover the result was always discarded, so the
        # lookup is skipped entirely instead of running for nothing.
        if not args.no_cover:
            cover_path, cover_source = resolve_cover(
                hints.cover_images,
                proposal.mbid,
                album_dir,
            )
            if cover_path:
                proposal.cover_path = cover_path
                proposal.cover_source = cover_source

        # Set proposed filenames if --rename
        if args.rename:
            from executor import _proposed_filename
            for tp in proposal.tracks:
                tp.new_filename = _proposed_filename(tp, tp.path.suffix)

        # Review step: a dry run only displays and records the proposal.
        if args.dry_run:
            _print_proposal(proposal)
            report_data.extend(_dry_run_rows(proposal, album_dir))
            return stats

        accepted = True
        if not args.auto:
            accepted = _interactive_review(proposal)
        elif proposal.confidence < args.confidence:
            print(f" ⏭️ Konfidenz {proposal.confidence:.0%} < {args.confidence:.0%} → übersprungen: {album_dir.name}")
            stats["skipped"] += 1
            return stats
        else:
            _print_proposal(proposal)

        if not accepted:
            stats["skipped"] += 1
            return stats

        album_stats = execute_album(
            proposal=proposal,
            backup_dir=args.backup,
            do_rename=args.rename,
            embed_cover_art=args.embed_cover,
            dry_run=False,
            report_data=report_data,
        )
        for k, v in album_stats.items():
            stats[k] = stats.get(k, 0) + v
    except Exception as e:
        # Top-level boundary per album: one broken album must not stop the run.
        stats["errors"] += 1
        print(f" ❌ Fehler in {album_dir.name}: {e}", file=sys.stderr)
        import traceback
        traceback.print_exc(file=sys.stderr)
    return stats
def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the enricher."""
    parser = argparse.ArgumentParser(
        description="KI-gestützter Musik-Metadaten-Enricher für Jellyfin",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument("paths", nargs="*",
                        help="Root-Verzeichnisse (direkte Unterordner = Alben)")
    parser.add_argument("--album", type=Path,
                        help="Einzelnes Album-Verzeichnis verarbeiten")
    parser.add_argument("--dry-run", action="store_true",
                        help="Vorschläge anzeigen, nichts schreiben")
    parser.add_argument("--auto", action="store_true",
                        help="Kein interaktiver Review-Schritt")
    parser.add_argument("--confidence", type=float, default=0.85,
                        help="Min-Konfidenz für --auto (default: 0.85)")
    parser.add_argument("--rename", action="store_true",
                        help="Dateien nach Schema umbenennen: TT - Artist - Titel.ext")
    parser.add_argument("--embed-cover", action="store_true",
                        help="Cover-Art in Audiodatei einbetten")
    parser.add_argument("--backup", type=Path,
                        help="Backup-Verzeichnis vor Änderungen")
    parser.add_argument("--report", type=Path,
                        help="CSV-Report der Änderungen")
    parser.add_argument("--no-fingerprint", action="store_true",
                        help="AcoustID-Fingerprinting überspringen")
    parser.add_argument("--no-api", action="store_true",
                        help="Keine externen API-Calls")
    parser.add_argument("--no-cover", action="store_true",
                        help="Kein Cover-Art-Download")
    parser.add_argument("--no-tqdm", action="store_true",
                        help="Fortschrittsanzeige deaktivieren")
    return parser


def main() -> None:
    """Command-line entry point: collect album directories and process them.

    Exits with status 1 when no album directory could be collected. A
    ``--album`` path that is not a directory is reported and ignored, the
    same way non-directory root paths are.
    """
    parser = _build_parser()
    args = parser.parse_args()
    if not args.album and not args.paths:
        parser.error("Mindestens ein Pfad oder --album erforderlich.")

    # Collect album directories
    album_dirs: List[Path] = []
    if args.album:
        single = args.album.expanduser().resolve()
        if single.is_dir():
            album_dirs.append(single)
        else:
            # Previously an invalid --album was queued and silently counted
            # as a processed (skipped) album.
            print(f"⚠️ Kein Verzeichnis: {single}")
    for raw in args.paths:
        root = Path(raw).expanduser().resolve()
        if not root.is_dir():
            print(f"⚠️ Kein Verzeichnis: {root}")
            continue
        album_dirs.extend(collect_album_dirs(root))
    if not album_dirs:
        print("⚠️ Keine Album-Verzeichnisse gefunden.")
        sys.exit(1)

    # The progress bar is only shown in --auto mode.
    show_progress = HAS_TQDM and not args.no_tqdm and args.auto
    report_data: List[Dict[str, Any]] = []
    totals: Dict[str, int] = {
        "albums": 0, "skipped": 0, "tags_written": 0,
        "covers_embedded": 0, "files_renamed": 0, "errors": 0,
    }

    print(f"🎵 {len(album_dirs)} Album-Verzeichnisse gefunden.")
    if os.getenv("ANTHROPIC_API_KEY"):
        print("🤖 Claude API aktiv.")
    if not args.no_api:
        print("🔍 MusicBrainz-Lookup aktiv.")
    if args.dry_run:
        print("🧪 DRY-RUN — nichts wird geschrieben.")

    for album_dir in maybe_tqdm(album_dirs, show_progress,
                                desc="Alben", unit="album", dynamic_ncols=True):
        stats = process_album(album_dir, args, report_data)
        totals["albums"] += 1
        for k in ("skipped", "tags_written", "covers_embedded", "files_renamed", "errors"):
            totals[k] += stats.get(k, 0)

    if args.report and report_data:
        write_report(report_data, args.report)

    print(f"\n{'=' * 50}")
    print("✅ Zusammenfassung:")
    print(f" 💿 Alben verarbeitet: {totals['albums']}")
    print(f" ⏭️ Übersprungen: {totals['skipped']}")
    print(f" 🏷️ Tags geschrieben: {totals['tags_written']}")
    print(f" 🖼️ Cover eingebettet: {totals['covers_embedded']}")
    print(f" 📝 Dateien umbenannt: {totals['files_renamed']}")
    print(f" ❌ Fehler: {totals['errors']}")
    if args.dry_run:
        print(" 🧪 Modus: DRY-RUN")
    print("=" * 50)
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()

59
scanner.py Normal file
View file

@ -0,0 +1,59 @@
from __future__ import annotations
import sys
from pathlib import Path
from typing import List
from models import AlbumScan, AUDIO_EXTENSIONS, IMAGE_EXTENSIONS, TRACKLIST_EXTENSIONS
def _is_hidden(name: str) -> bool:
    """Return True for names starting with a dot or an underscore."""
    return name.startswith((".", "_"))
def scan_album(album_dir: Path) -> AlbumScan:
    """Walk *album_dir* recursively and classify every visible file.

    Hidden entries (see ``_is_hidden``) are ignored: hidden directories are
    pruned from the walk and hidden files are dropped. Each remaining file is
    sorted into audio, image, tracklist or other files by its lower-cased
    suffix. All result lists are returned sorted.

    The walk always goes through ``_os_walk``. The earlier
    ``Path.walk()`` branch (Python 3.12+) ignored scan errors silently and
    reported directory symlinks as files, so results differed between
    Python versions.
    """
    result = AlbumScan(album_dir=album_dir)
    for dirpath, dirnames, filenames in _os_walk(album_dir):
        # Prune in place so the walk does not descend into hidden directories.
        dirnames[:] = [d for d in dirnames if not _is_hidden(d)]
        current = Path(dirpath)
        for name in filenames:
            if _is_hidden(name):
                continue
            p = current / name
            ext = p.suffix.lower()
            if ext in AUDIO_EXTENSIONS:
                result.audio_files.append(p)
            elif ext in IMAGE_EXTENSIONS:
                result.image_files.append(p)
            elif ext in TRACKLIST_EXTENSIONS:
                result.tracklist_files.append(p)
            else:
                result.other_files.append(p)
    result.audio_files.sort()
    result.image_files.sort()
    result.tracklist_files.sort()
    # Sorted like the other lists so the scan result is deterministic.
    result.other_files.sort()
    return result
def _os_walk(album_dir: Path):
    """Return an ``os.walk`` iterator over *album_dir*.

    Symbolic links to directories are not followed, and errors raised while
    listing a directory are printed to stderr instead of being dropped.
    """
    import os

    def _report(err):
        # Same output as before; the walk continues after the message.
        print(f"⚠️ Scan-Fehler: {err}", file=sys.stderr)

    return os.walk(album_dir, followlinks=False, onerror=_report)
def collect_album_dirs(root: Path) -> List[Path]:
    """List the visible direct sub-directories of *root* in sorted order.

    Entries whose name is hidden (see ``_is_hidden``) and non-directories are
    left out. A read error is reported on stderr; whatever was collected up
    to that point is still returned.
    """
    found: List[Path] = []
    try:
        entries = sorted(root.iterdir())
        for entry in entries:
            if not entry.is_dir() or _is_hidden(entry.name):
                continue
            found.append(entry)
    except (PermissionError, OSError) as e:
        print(f"⚠️ Lesefehler {root}: {e}", file=sys.stderr)
    return found

272
test_suite_enricher.py Normal file
View file

@ -0,0 +1,272 @@
#!/usr/bin/env python3
"""test_suite_enricher.py — Unit- und Integrationstests für music_enricher."""
from __future__ import annotations
import sys
import tempfile
import traceback
from pathlib import Path
from typing import Callable
sys.path.insert(0, str(Path(__file__).parent))
from models import AlbumScan, TrackHints, AlbumHints
# Collected outcome of every executed test case; each entry is a dict with
# the keys "id", "status" and "detail".
RESULTS: list[dict] = []
def record(test_id: str, passed: bool, detail: str = "") -> None:
    """Append the outcome of one test case to ``RESULTS``."""
    status = "PASS" if passed else "FAIL"
    entry = {"id": test_id, "status": status, "detail": detail}
    RESULTS.append(entry)
def run_case(test_id: str, fn: Callable[[], str]) -> None:
    """Run one test function and record whether it passed.

    A test passes when *fn* returns without raising; its return value becomes
    the recorded detail. On failure the detail is a compact
    ``ExceptionType (function:line): message`` string, capped at 300
    characters.

    The previous version stored the first 300 characters of the formatted
    traceback, which is the header and outer frames, so the exception type
    and message at the end were usually cut off.
    """
    try:
        detail = fn()
    except Exception as exc:
        frames = traceback.extract_tb(exc.__traceback__)
        # The innermost frame is where the failure happened.
        where = f" ({frames[-1].name}:{frames[-1].lineno})" if frames else ""
        summary = f"{type(exc).__name__}{where}: {exc}"
        record(test_id, False, summary[:300])
    else:
        record(test_id, True, detail)
# ---------------------------------------------------------------------------
# hint_extractor Tests
# ---------------------------------------------------------------------------
def test_parse_dirname_artist_album() -> str:
    """An ``Artist_-_Album`` directory name yields both artist and album."""
    from hint_extractor import _parse_dirname

    artist, album, _year = _parse_dirname("Pink_Floyd_-_The_Wall")
    assert artist, f"artist: {artist}"
    assert "Pink" in artist, f"artist: {artist}"
    assert album, f"album: {album}"
    assert "Wall" in album, f"album: {album}"
    return f"artist={artist!r}, album={album!r}"
def test_parse_dirname_with_year() -> str:
    """A trailing four-digit year in the directory name is extracted."""
    from hint_extractor import _parse_dirname

    _artist, _album, year = _parse_dirname("Abba_-_Greatest_Hits_1992")
    assert year == "1992", f"year: {year}"
    return f"year={year}"
def test_parse_dirname_album_only() -> str:
    """A directory name without an artist separator still yields an album."""
    from hint_extractor import _parse_dirname

    _artist, album, _year = _parse_dirname("Beethoven_Complete_Edition")
    assert album is not None, "album should not be None"
    return f"album={album!r}"
def test_parse_filename_track_artist_title() -> str:
    """``NN - Artist - Title`` file names are split into all three parts."""
    from hint_extractor import _parse_filename

    parsed = _parse_filename("07 - ABBA - Dancing Queen")
    track_no = parsed.get("track")
    assert track_no == "07", f"track: {parsed}"
    assert "ABBA" in parsed.get("artist", ""), f"artist: {parsed}"
    assert "Dancing" in parsed.get("title", ""), f"title: {parsed}"
    return str(parsed)
def test_parse_filename_disc_track_title() -> str:
    """A ``D-NN`` prefix is split into disc and track number."""
    from hint_extractor import _parse_filename

    parsed = _parse_filename("2-07 - Bach - Toccata")
    disc_no = parsed.get("disc")
    track_no = parsed.get("track")
    assert disc_no == "2", f"disc: {parsed}"
    assert track_no == "07", f"track: {parsed}"
    return str(parsed)
def test_parse_filename_track_title() -> str:
    """``NN - Title`` file names yield track number and title."""
    from hint_extractor import _parse_filename

    parsed = _parse_filename("01 - Dancing Queen")
    track_no = parsed.get("track")
    assert track_no == "01", f"track: {parsed}"
    assert "Dancing" in parsed.get("title", ""), f"title: {parsed}"
    return str(parsed)
def test_parse_filename_artist_title() -> str:
    """``Artist - Title`` file names without a number yield both parts."""
    from hint_extractor import _parse_filename

    parsed = _parse_filename("Miles Davis - So What")
    assert "Miles" in parsed.get("artist", ""), f"artist: {parsed}"
    assert "What" in parsed.get("title", ""), f"title: {parsed}"
    return str(parsed)
def test_parse_tracklist_numbered() -> str:
    """A ``N. Title`` list is parsed into one entry per line."""
    from hint_extractor import _parse_tracklist

    listing = "\n".join(["1. Dancing Queen", "2. Waterloo", "3. Fernando"])
    tracks = _parse_tracklist(listing)
    assert len(tracks) == 3, f"count: {len(tracks)}"
    first = tracks[0]
    assert first["title"] == "Dancing Queen", f"title: {tracks[0]}"
    return f"{len(tracks)} tracks parsed"
def test_parse_tracklist_with_duration() -> str:
    """``D-N Title M:SS`` lines are parsed and carry the disc number."""
    from hint_extractor import _parse_tracklist

    listing = "\n".join([
        "1-1 Toccata And Fugue 9:17",
        "1-2 Heartbeat 2:19",
        "2-1 Finale 5:00",
    ])
    tracks = _parse_tracklist(listing)
    assert len(tracks) >= 2, f"count: {len(tracks)}"
    assert tracks[0]["disc"] == "1", f"disc: {tracks[0]}"
    return f"{len(tracks)} tracks parsed"
def test_parse_tracklist_with_disc_sections() -> str:
    """``CD n`` headings assign the following tracks to that disc."""
    from hint_extractor import _parse_tracklist

    listing = "\n".join(["CD 1", "1. Track A", "2. Track B", "CD 2", "1. Track C"])
    tracks = _parse_tracklist(listing)
    disc2 = []
    for entry in tracks:
        if entry.get("disc") == "2":
            disc2.append(entry)
    assert len(disc2) >= 1, f"disc2: {disc2}"
    return f"{len(tracks)} total, {len(disc2)} on disc 2"
# ---------------------------------------------------------------------------
# Scanner Tests
# ---------------------------------------------------------------------------
def test_scanner_classifies_files() -> str:
    """The scanner sorts files into audio, image and tracklist buckets."""
    from scanner import scan_album

    with tempfile.TemporaryDirectory() as tmpdir:
        album = Path(tmpdir) / "TestAlbum"
        album.mkdir()
        binary_fixtures = (
            ("01 - Song.mp3", b"\x00" * 100),
            ("02 - Song.flac", b"\x00" * 100),
            ("front.jpg", b"\xff\xd8" + b"\x00" * 100),
        )
        for filename, payload in binary_fixtures:
            (album / filename).write_bytes(payload)
        (album / "tracklist.txt").write_text("1. Track One\n2. Track Two")
        (album / "notes.pdf").write_bytes(b"\x00" * 50)

        scan = scan_album(album)
        n_audio = len(scan.audio_files)
        n_images = len(scan.image_files)
        n_lists = len(scan.tracklist_files)
        assert n_audio == 2, f"audio: {scan.audio_files}"
        assert n_images == 1, f"images: {scan.image_files}"
        assert n_lists == 1, f"tracklists: {scan.tracklist_files}"
        return "scan OK: 2 audio, 1 image, 1 tracklist"
def test_scanner_ignores_hidden() -> str:
    """Files starting with ``.`` or ``_`` are left out of the scan."""
    from scanner import scan_album

    with tempfile.TemporaryDirectory() as tmpdir:
        album = Path(tmpdir) / "Album"
        album.mkdir()
        for filename in ("song.mp3", ".hidden.mp3", "_trash.mp3"):
            (album / filename).write_bytes(b"\x00" * 100)

        scan = scan_album(album)
        visible = len(scan.audio_files)
        assert visible == 1, f"should ignore hidden: {scan.audio_files}"
        return "hidden files correctly ignored"
# ---------------------------------------------------------------------------
# extract_hints integration
# ---------------------------------------------------------------------------
def test_extract_hints_from_scan() -> str:
    """Hints from a scanned album include album name, tracks and tracklist."""
    from scanner import scan_album
    from hint_extractor import extract_hints

    with tempfile.TemporaryDirectory() as tmpdir:
        album = Path(tmpdir) / "ABBA_-_Greatest_Hits"
        album.mkdir()
        for filename in ("01 - ABBA - Dancing Queen.mp3", "02 - ABBA - Waterloo.mp3"):
            (album / filename).write_bytes(b"\x00" * 1024)
        (album / "tracklist.txt").write_text("1. Dancing Queen\n2. Waterloo\n")

        hints = extract_hints(scan_album(album))
        assert hints.dir_album is not None, "album hint missing"
        track_count = len(hints.tracks)
        assert track_count == 2, f"tracks: {len(hints.tracks)}"
        assert hints.tracklist_text is not None, "tracklist not read"
        return f"hints OK: album={hints.dir_album!r}, {len(hints.tracks)} tracks"
def test_extract_hints_multi_disc() -> str:
    """``CD1``/``CD2`` sub-directories produce disc numbers 1 and 2."""
    from scanner import scan_album
    from hint_extractor import extract_hints

    with tempfile.TemporaryDirectory() as tmpdir:
        album = Path(tmpdir) / "Bach_Complete"
        disc_one = album / "CD1"
        disc_two = album / "CD2"
        disc_one.mkdir(parents=True)
        disc_two.mkdir()
        (disc_one / "01 - Toccata.mp3").write_bytes(b"\x00" * 1024)
        (disc_two / "01 - Fugue.mp3").write_bytes(b"\x00" * 1024)

        hints = extract_hints(scan_album(album))
        disc_nums = set()
        for track in hints.tracks:
            if track.disc_number:
                disc_nums.add(track.disc_number)
        assert 1 in disc_nums, f"disc 1 missing: {disc_nums}"
        assert 2 in disc_nums, f"disc 2 missing: {disc_nums}"
        return f"disc numbers detected: {disc_nums}"
# ---------------------------------------------------------------------------
# executor Tests
# ---------------------------------------------------------------------------
def test_proposed_filename_single_disc() -> str:
    """Without a disc number the name is ``NN - Artist - Title.ext``."""
    from executor import _proposed_filename
    from models import TrackProposal

    proposal = TrackProposal(
        path=Path("dummy.mp3"),
        title="Dancing Queen",
        artist="ABBA",
        track_number=1,
        disc_number=None,
    )
    name = _proposed_filename(proposal, ".mp3")
    expected = "01 - ABBA - Dancing Queen.mp3"
    assert name == expected, f"got: {name!r}"
    return name
def test_proposed_filename_multi_disc() -> str:
    """With disc number 2 the name is prefixed ``D-NN``."""
    from executor import _proposed_filename
    from models import TrackProposal

    proposal = TrackProposal(
        path=Path("dummy.flac"),
        title="Toccata",
        artist="Bach",
        track_number=7,
        disc_number=2,
    )
    name = _proposed_filename(proposal, ".flac")
    expected = "2-07 - Bach - Toccata.flac"
    assert name == expected, f"got: {name!r}"
    return name
def test_proposed_filename_sanitizes_chars() -> str:
    """Slashes and colons never appear in a proposed file name."""
    from executor import _proposed_filename
    from models import TrackProposal

    proposal = TrackProposal(
        path=Path("x.mp3"),
        title='Track: "Live" / Today',
        artist="Artist?",
        track_number=3,
        disc_number=None,
    )
    name = _proposed_filename(proposal, ".mp3")
    has_unsafe = "/" in name or ":" in name
    assert not has_unsafe, f"unsafe chars in: {name!r}"
    return name
# ---------------------------------------------------------------------------
# Runner
# ---------------------------------------------------------------------------
def main() -> None:
    """Run every registered test case, print a report and exit.

    The process exits with status 0 when all cases passed and 1 otherwise.
    """
    print("🧪 Starte Music Metadata Enricher Tests...")
    cases = [
        ("UNIT_01_parse_dirname_artist_album", test_parse_dirname_artist_album),
        ("UNIT_02_parse_dirname_with_year", test_parse_dirname_with_year),
        ("UNIT_03_parse_dirname_album_only", test_parse_dirname_album_only),
        ("UNIT_04_parse_filename_track_artist_title", test_parse_filename_track_artist_title),
        ("UNIT_05_parse_filename_disc_track_title", test_parse_filename_disc_track_title),
        ("UNIT_06_parse_filename_track_title", test_parse_filename_track_title),
        ("UNIT_07_parse_filename_artist_title", test_parse_filename_artist_title),
        ("UNIT_08_parse_tracklist_numbered", test_parse_tracklist_numbered),
        ("UNIT_09_parse_tracklist_with_duration", test_parse_tracklist_with_duration),
        ("UNIT_10_parse_tracklist_disc_sections", test_parse_tracklist_with_disc_sections),
        ("UNIT_11_scanner_classifies_files", test_scanner_classifies_files),
        ("UNIT_12_scanner_ignores_hidden", test_scanner_ignores_hidden),
        ("UNIT_13_extract_hints_from_scan", test_extract_hints_from_scan),
        ("UNIT_14_extract_hints_multi_disc", test_extract_hints_multi_disc),
        ("UNIT_15_proposed_filename_single_disc", test_proposed_filename_single_disc),
        ("UNIT_16_proposed_filename_multi_disc", test_proposed_filename_multi_disc),
        ("UNIT_17_proposed_filename_sanitizes_chars", test_proposed_filename_sanitizes_chars),
    ]
    for case_id, case_fn in cases:
        run_case(case_id, case_fn)

    divider = "=" * 70
    print(divider)
    for r in RESULTS:
        # NOTE(review): both icon literals are empty in this copy of the
        # file — confirm against the original source.
        icon = "" if r["status"] == "PASS" else ""
        full_detail = r["detail"]
        # Long details are cut to 100 characters and marked with an ellipsis.
        if len(full_detail) > 100:
            detail = full_detail[:100] + "..."
        else:
            detail = full_detail
        print(f"{icon} [{r['status']}] {r['id']} {detail}")
    print(divider)

    total = len(RESULTS)
    passed = len([r for r in RESULTS if r["status"] == "PASS"])
    print(f"📊 {passed}/{total} Tests erfolgreich")
    sys.exit(0 if passed == total else 1)
# Run the test suite only when this file is executed as a script.
if __name__ == "__main__":
    main()