Compare commits

...

11 commits

Author SHA1 Message Date
b6abfae16c Add YouTube ID detection and metadata lookup via yt-dlp
- Extract 11-char YouTube video IDs from audio filenames
- Fetch title, uploader, chapters via yt-dlp (--dump-json)
- Use chapters as tracklist when no .txt tracklist is available
- Store yt_title / yt_uploader in AlbumHints for LLM prompt context
- Fall back to YouTube video title as track title for single-file albums

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 05:42:03 +02:00
888464b4d0 Regenerate M3U playlist after rename with correct order and durations
_update_m3u(): writes #EXTM3U + #EXTINF:seconds,Artist - Title + filename
per track, in disc/track order (same order as the renamed files).
Duration is read from mutagen; -1 if unavailable.

execute_album(): after renaming, finds existing *.m3u / *.m3u8 in the
album directory and overwrites it. Only triggered when files_renamed > 0
and a playlist file exists — never creates a new one from scratch.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 05:42:03 +02:00
40a2ef3fb6 Add OCR fallback via Ollama Vision for albums without tracklist text
hint_extractor: _ocr_back_cover() sends back/inlay images to Ollama Vision
  when no tracklist .txt/.htm/.nfo is present. Model priority:
  qwen3-vl:latest → minicpm-v:latest → deepseek-ocr:latest (configurable
  via OLLAMA_OCR_MODEL env var). Timeout 180s. OCR text is fed into the
  same _parse_tracklist() pipeline as regular text files.

music_enricher: extract_hints(use_ocr=not args.no_api) — OCR is skipped
  with --no-api to allow fully offline/fast runs.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 05:42:03 +02:00
28f716f8f2 Fix disc numbering consistency and false tracklist matches
executor: disc=1 now generates '1-01' prefix (same as disc=2 → '2-01'),
  so multi-disc albums have consistent D-TT scheme throughout.
  Single-disc tracks without disc tag stay as plain 'TT'.

hint_extractor: tracklist pattern 2 now requires '.' ')' or ':' as separator
  (not bare whitespace) — prevents false-positive matches like
  '2 x CD, Compilation, Remastered' being parsed as track 2.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 05:42:03 +02:00
d1391fc36a Robust tracklist matching: fuzzy titles, catalog numbers, correct disc/track
hint_extractor:
- _norm_for_match(): strips all non-alnum for punctuation-agnostic comparison
- _catalog_key(): extracts BWV/Op./K./HWV/... catalog number for matching
  (fixes abbreviated filenames like "Fantasia_Cm_BWV_562" vs "Fantasia In C Minor, BWV 562")
- Matching priority: exact number+disc → exact title → fuzzy title → catalog number
- Tracklist disc+track OVERRIDE M3U position when a match is found
  (M3U is only used as last fallback; fixes wrong alphabetical ordering)

metadata_resolver:
- LLM prompt now defines artist/albumartist roles explicitly
  (artist = composer for classical; albumartist = performer/interpreter)
- LLM albumartist can override dir_artist when confidence < 0.4
- _build_track_proposals: when track artist == albumartist (performer from filename),
  composer (album-level artist) is used as track artist instead
- Tracklist header (first lines before tracks) included in LLM prompt
  for label/year/album-title discovery
- import re added (was missing)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 05:42:03 +02:00
5011cef4db Underscore filename schema, classical detection, NameToUnix post-processing
Pop schema:      TT_-_Artist_-_Title.ext
Classical schema: TT_-_Performer_-_Komponist_-_Werk[-_Orchester_Dirigent].ext
  triggered when albumartist ≠ track artist (pianist vs composer)

All spaces in names → underscores; separator _-_ between parts.
Missing parts (orchestra, conductor) are omitted.

models.py: added conductor/orchestra optional fields to TrackProposal.
executor.py: sanitize_dir_names() tries NameToUnix first, falls back to detox.
  Called after all renames in a directory are complete.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 05:42:03 +02:00
8bd48cf166 Include albumartist in filename; remove Claude API from LLM chain
Filename schema now: TT - AlbumArtist - TrackArtist - Title when albumartist
differs from track artist (e.g. pianist vs. composer). Identical artist → old
two-part format unchanged.

metadata_resolver: removed Claude API fallback entirely from _claude_resolve.
Chain is now Ollama (local, free) → OpenRouter (DeepSeek V3, cheap) only.

music_enricher: updated status line and use_claude flag accordingly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 05:42:03 +02:00
460b92aab3 Fix Invalid ID3TimeStamp error when writing date tags
Strip non-timestamp characters (BOM, invisible chars) from date/year values
both when reading existing tags in metadata_resolver and when writing in
executor. Also harden the EasyID3 except block to not wipe existing tags
when adding a missing ID3 header, and add per-field try/except in MP3 tag
writing so one bad field doesn't abort the entire track.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 05:42:03 +02:00
d91eb36007 fix: korrekte Track-Nummerierung, Scanner-Rekursion, M3U-Reihenfolge
scanner: nicht in Unterordner wenn Root Audio-Dateien enthält (verhindert
  Doppel-Scan bei versehentlichen Unterordner-Kopien); nur Disc-Ordner
  (CD1, Disc 2…) werden bei Multi-CD-Alben rekursiert.

hint_extractor: M3U/Playlist-Dateien als Track-Reihenfolge-Quelle; BOM-
  Bereinigung; Tracklist-Matching auch per Titel (nicht nur per Nummer);
  tracknumber=0 wird als 'keine Nummer' gewertet.

metadata_resolver: sequenzielle Fallback-Nummerierung (1,2,3…) für Tracks
  ohne Tracknummer — verhindert '00'-Präfix beim --rename; dir_artist hat
  Vorrang vor 'Various Artists'-Heuristik; LLM darf bei Konfidenz <0.3
  auch bestehende Werte korrigieren (Tippfehler im Verzeichnisnamen).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 05:42:03 +02:00
c205fa8943 feat: Ollama + OpenRouter als LLM-Reasoning-Backends
_claude_resolve() nutzt jetzt Ollama lokal (kostenlos, RTX 3090) als
erste Wahl, dann OpenRouter/DeepSeek V3 (sehr günstig) und zuletzt
Claude API. Neue ENV-Variablen: OPENROUTER_API_KEY, OLLAMA_RESOLVE_MODEL.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 05:42:03 +02:00
f7cf520dbe Initial implementation of Music Metadata Enricher
AI-powered per-album pipeline: scan → local hints → MusicBrainz/Discogs/Claude
resolve → cover art → interactive or auto review → tag write + rename + report.
All external dependencies optional; 17/17 unit tests passing.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 05:42:03 +02:00
8 changed files with 2425 additions and 0 deletions

171
cover_handler.py Normal file
View file

@ -0,0 +1,171 @@
from __future__ import annotations
import sys
import tempfile
import time
from pathlib import Path
from typing import Optional, List
try:
from PIL import Image
HAS_PIL = True
except ImportError:
HAS_PIL = False
try:
import requests
HAS_REQUESTS = True
except ImportError:
HAS_REQUESTS = False
try:
import musicbrainzngs as mb
HAS_MB = True
except ImportError:
HAS_MB = False
try:
from mutagen.id3 import ID3, APIC, error as ID3Error
from mutagen.flac import FLAC, Picture
from mutagen.mp4 import MP4, MP4Cover
HAS_MUTAGEN = True
except ImportError:
HAS_MUTAGEN = False
_MIN_COVER_SIZE = 200 # pixels
def _image_ok(path: Path) -> bool:
if not HAS_PIL:
return path.stat().st_size > 5000
try:
with Image.open(path) as img:
w, h = img.size
return w >= _MIN_COVER_SIZE and h >= _MIN_COVER_SIZE
except Exception:
return False
def find_local_cover(image_files: List[Path]) -> Optional[Path]:
priority = ("front", "folder", "cover", "album")
# Sort by priority keyword, then size descending
def key(p: Path):
name = p.name.lower()
score = next((i for i, kw in enumerate(priority) if kw in name), len(priority))
size = p.stat().st_size if p.exists() else 0
return (score, -size)
for p in sorted(image_files, key=key):
if _image_ok(p):
return p
return None
def _mb_cover_url(release_mbid: str) -> Optional[str]:
url = f"https://coverartarchive.org/release/{release_mbid}/front"
if not HAS_REQUESTS:
return None
try:
r = requests.head(url, timeout=5, allow_redirects=True)
if r.status_code == 200:
return url
except Exception:
pass
return None
def download_cover(release_mbid: Optional[str], dest_dir: Path) -> Optional[Path]:
if not release_mbid or not HAS_REQUESTS:
return None
url = _mb_cover_url(release_mbid)
if not url:
return None
try:
r = requests.get(url, timeout=15)
if r.status_code == 200:
ext = ".jpg"
ct = r.headers.get("content-type", "")
if "png" in ct:
ext = ".png"
dest = dest_dir / f"_cover_download{ext}"
dest.write_bytes(r.content)
if _image_ok(dest):
return dest
dest.unlink(missing_ok=True)
except Exception as e:
print(f" ⚠️ Cover-Download-Fehler: {e}", file=sys.stderr)
return None
def embed_cover(audio_path: Path, cover_path: Path) -> bool:
if not HAS_MUTAGEN:
return False
try:
img_data = cover_path.read_bytes()
mime = "image/jpeg" if cover_path.suffix.lower() in (".jpg", ".jpeg") else "image/png"
ext = audio_path.suffix.lower()
if ext == ".mp3":
try:
tags = ID3(str(audio_path))
except ID3Error:
tags = ID3()
tags.delall("APIC")
tags.add(APIC(encoding=3, mime=mime, type=3, desc="Cover", data=img_data))
tags.save(str(audio_path), v2_version=4)
return True
elif ext == ".flac":
audio = FLAC(str(audio_path))
audio.clear_pictures()
pic = Picture()
pic.type = 3
pic.mime = mime
pic.desc = "Cover"
pic.data = img_data
audio.add_picture(pic)
audio.save()
return True
elif ext == ".m4a":
audio = MP4(str(audio_path))
fmt = MP4Cover.FORMAT_JPEG if mime == "image/jpeg" else MP4Cover.FORMAT_PNG
audio.tags["covr"] = [MP4Cover(img_data, imageformat=fmt)]
audio.save()
return True
else:
# Generic mutagen fallback
from mutagen import File as MutagenFile
audio = MutagenFile(str(audio_path), easy=False)
if audio is not None:
if audio.tags is None:
audio.add_tags()
if hasattr(audio.tags, "add"):
audio.tags.add(
APIC(encoding=3, mime=mime, type=3, desc="Cover", data=img_data)
)
audio.save()
return True
except Exception as e:
print(f" ⚠️ Cover-Einbettungsfehler {audio_path.name}: {e}", file=sys.stderr)
return False
def resolve_cover(
image_files: List[Path],
release_mbid: Optional[str],
album_dir: Path,
) -> tuple[Optional[Path], Optional[str]]:
"""Returns (cover_path, source_label)."""
local = find_local_cover(image_files)
if local:
return local, "local"
if release_mbid:
downloaded = download_cover(release_mbid, album_dir)
if downloaded:
return downloaded, "musicbrainz"
return None, None

368
executor.py Normal file
View file

@ -0,0 +1,368 @@
from __future__ import annotations
import csv
import re
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Optional, List, Dict, Any
from models import AlbumProposal, TrackProposal
try:
from mutagen import File as MutagenFile
from mutagen.easyid3 import EasyID3
from mutagen.flac import FLAC
from mutagen.mp4 import MP4, MP4Tags
HAS_MUTAGEN = True
except ImportError:
HAS_MUTAGEN = False
from cover_handler import embed_cover
_SAFE_RE = re.compile(r'[<>:"/\\|?*\x00-\x1f]')
_CLASSICAL_GENRES = re.compile(
r"(?i)class|baroque|romantic|renaissance|opera|symphony|chamber|concerto|sonata|oratorio"
)
REPORT_FIELDS = [
"status", "album_dir", "track_path",
"old_title", "new_title",
"old_artist", "new_artist",
"album", "albumartist", "date", "genre", "label",
"track_number", "disc_number",
"cover_embedded", "renamed_to",
"confidence", "sources",
]
def _safe_name(s: str) -> str:
"""Filesystem-safe name: illegal chars → '_', spaces → '_'."""
s = _SAFE_RE.sub("_", s)
return re.sub(r"\s+", "_", s).strip("._-")
def _is_classical(albumartist: str, track_artist: str, genre: str) -> bool:
"""
Classical schema applies when performer (albumartist) composer (track_artist),
which covers both 'real' classical music and jazz-on-classical-themes albums.
Genre keyword matching is used as additional signal but not required.
"""
aa = (albumartist or "").casefold().strip()
ta = (track_artist or "").casefold().strip()
if not aa or aa in ("various artists", "unknown artist", "unknown"):
return False
if aa == ta:
return False
return True # performer ≠ composer → classical naming
def _proposed_filename(
proposal: TrackProposal,
ext: str,
albumartist: str = "",
genre: str = "",
) -> str:
"""
Pop/Default: TT_-_Artist_-_Titel.ext
Klassik: TT_-_Performer_-_Komponist_-_Titel[-_Orchester_Dirigent].ext
Separator zwischen Teilen: _-_
Leerzeichen innerhalb von Namen: _
Fehlende Teile werden weggelassen.
"""
tn = f"{proposal.track_number:02d}" if proposal.track_number else "00"
# Wenn disc_number gesetzt (auch disc=1): immer "D-TT" — konsistent über alle CDs.
# disc=None (Einzel-CD ohne Tag): nur "TT".
disc_prefix = f"{proposal.disc_number}-" if proposal.disc_number else ""
prefix = f"{disc_prefix}{tn}"
track_artist = _safe_name(proposal.artist or "Unknown")
aa = _safe_name(albumartist)
title = _safe_name(proposal.title or "Unknown")
if _is_classical(aa, track_artist, genre):
# Klassik-Schema: Performer _-_ Komponist _-_ Werk [_-_ Orchester,Dirigent]
parts = [prefix, aa, track_artist, title]
# Orchester und Dirigent anhängen wenn vorhanden
extra = "_".join(filter(None, [
_safe_name(proposal.orchestra or ""),
_safe_name(proposal.conductor or ""),
]))
if extra:
parts.append(extra)
return "_-_".join(parts) + ext
else:
# Pop/Default-Schema: Tracknummer _-_ Artist _-_ Titel
return f"{prefix}_-_{track_artist}_-_{title}{ext}"
def backup_file(path: Path, backup_dir: Path) -> bool:
try:
backup_dir.mkdir(parents=True, exist_ok=True)
rel = path.parent.name + "__" + path.name
dest = backup_dir / rel
if not dest.exists():
shutil.copy2(path, dest)
return True
except Exception as e:
print(f" ⚠️ Backup-Fehler {path.name}: {e}", file=sys.stderr)
return False
def write_tags(path: Path, proposal: TrackProposal, album_proposal: AlbumProposal) -> bool:
if not HAS_MUTAGEN:
return False
ext = path.suffix.lower()
tags_to_write = {
"title": proposal.title or "",
"artist": proposal.artist or "",
"album": album_proposal.album or "",
"albumartist": album_proposal.albumartist or "",
}
if proposal.track_number:
total = len(album_proposal.tracks)
tags_to_write["tracknumber"] = f"{proposal.track_number}/{total}"
if proposal.disc_number:
tags_to_write["discnumber"] = str(proposal.disc_number)
if album_proposal.date:
# Strip everything except valid ID3 timestamp characters to prevent ID3TimeStamp errors
date_clean = re.sub(r"[^\d\-T:+Z]", "", str(album_proposal.date)).strip()
if date_clean:
tags_to_write["date"] = date_clean
if album_proposal.genre:
tags_to_write["genre"] = album_proposal.genre
if album_proposal.label:
tags_to_write["organization"] = album_proposal.label
try:
if ext == ".mp3":
try:
audio = EasyID3(str(path))
except Exception:
# File has no ID3 header — add one without wiping audio data
from mutagen.id3 import ID3NoHeaderError
try:
from mutagen.mp3 import MP3
full = MP3(str(path))
full.tags = None
full.add_tags()
full.save(str(path), v2_version=4)
except Exception:
pass
audio = EasyID3(str(path))
for k, v in tags_to_write.items():
try:
audio[k] = [v]
except Exception as tag_err:
print(f" ⚠️ Tag-Feld '{k}' übersprungen ({path.name}): {tag_err}", file=sys.stderr)
audio.save(v2_version=4)
return True
elif ext == ".flac":
audio = FLAC(str(path))
for k, v in tags_to_write.items():
audio[k] = [v]
audio.save()
return True
elif ext == ".m4a":
audio = MP4(str(path))
mapping = {
"title": "\xa9nam", "artist": "\xa9ART",
"album": "\xa9alb", "albumartist": "aART",
"tracknumber": "trkn", "date": "\xa9day",
"genre": "\xa9gen",
}
for k, v in tags_to_write.items():
tag_key = mapping.get(k)
if tag_key:
if tag_key == "trkn":
try:
num, total = v.split("/") if "/" in v else (v, "0")
audio[tag_key] = [(int(num), int(total))]
except Exception:
pass
else:
audio[tag_key] = [v]
audio.save()
return True
else:
audio = MutagenFile(str(path), easy=True)
if audio is not None:
if audio.tags is None:
audio.add_tags()
for k, v in tags_to_write.items():
try:
audio[k] = [v]
except Exception:
pass
audio.save()
return True
except Exception as e:
print(f" ⚠️ Tag-Schreibfehler {path.name}: {e}", file=sys.stderr)
return False
def _update_m3u(m3u_path: Path, tracks: List[tuple]) -> bool:
"""
Schreibt M3U neu mit den umbenannten Dateien in Track-Reihenfolge.
tracks: [(TrackProposal, actual_path_after_rename), ...]
"""
try:
lines = ["#EXTM3U"]
for tp, track_path in tracks:
duration = -1
if HAS_MUTAGEN:
try:
audio = MutagenFile(str(track_path))
if audio and hasattr(audio, "info") and audio.info:
duration = int(audio.info.length)
except Exception:
pass
label = f"{tp.artist} - {tp.title}" if tp.artist else (tp.title or track_path.stem)
lines.append(f"#EXTINF:{duration},{label}")
lines.append(track_path.name)
m3u_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
print(f" 📋 Playlist aktualisiert: {m3u_path.name}")
return True
except Exception as e:
print(f" ⚠️ M3U-Fehler {m3u_path.name}: {e}", file=sys.stderr)
return False
def execute_album(
proposal: AlbumProposal,
backup_dir: Optional[Path],
do_rename: bool,
embed_cover_art: bool,
dry_run: bool,
report_data: List[Dict[str, Any]],
) -> Dict[str, int]:
stats = {"tags_written": 0, "covers_embedded": 0, "files_renamed": 0, "errors": 0}
final_tracks: List[tuple] = [] # (TrackProposal, final_path) für M3U
for tp in proposal.tracks:
old_title = tp.path.stem
old_artist = ""
if HAS_MUTAGEN:
try:
audio = MutagenFile(str(tp.path), easy=True)
if audio and audio.tags:
old_artist = str(audio.tags.get("artist", [""])[0])
old_title = str(audio.tags.get("title", [tp.path.stem])[0])
except Exception:
pass
new_path = tp.path
renamed_to = ""
cover_embedded = False
if not dry_run:
if backup_dir:
backup_file(tp.path, backup_dir)
if write_tags(tp.path, tp, proposal):
stats["tags_written"] += 1
else:
stats["errors"] += 1
if embed_cover_art and proposal.cover_path:
if embed_cover(tp.path, proposal.cover_path):
stats["covers_embedded"] += 1
cover_embedded = True
if do_rename:
new_name = _proposed_filename(
tp, tp.path.suffix,
albumartist=proposal.albumartist or "",
genre=proposal.genre or "",
)
candidate = tp.path.parent / new_name
if candidate != tp.path:
try:
tp.path.rename(candidate)
new_path = candidate
renamed_to = new_name
stats["files_renamed"] += 1
except Exception as e:
print(f" ⚠️ Umbenennungsfehler {tp.path.name}: {e}", file=sys.stderr)
stats["errors"] += 1
if not dry_run:
final_tracks.append((tp, new_path))
report_data.append({
"status": "dry-run" if dry_run else "ok",
"album_dir": str(proposal.album_dir.name),
"track_path": str(new_path),
"old_title": old_title,
"new_title": tp.title,
"old_artist": old_artist,
"new_artist": tp.artist,
"album": proposal.album,
"albumartist": proposal.albumartist,
"date": proposal.date or "",
"genre": proposal.genre or "",
"label": proposal.label or "",
"track_number": tp.track_number or "",
"disc_number": tp.disc_number or "",
"cover_embedded": cover_embedded,
"renamed_to": renamed_to,
"confidence": f"{proposal.confidence:.2f}",
"sources": ", ".join(proposal.sources),
})
# M3U-Playlist aktualisieren wenn Dateien umbenannt wurden
if do_rename and not dry_run and stats["files_renamed"] > 0 and final_tracks:
m3u_files = (
list(proposal.album_dir.glob("*.m3u")) +
list(proposal.album_dir.glob("*.m3u8"))
)
if m3u_files:
_update_m3u(m3u_files[0], final_tracks)
# Nach allen Umbenennungen: Verzeichnis Linux-kompatibel bereinigen
if do_rename and not dry_run:
sanitize_dir_names(proposal.album_dir)
return stats
def sanitize_dir_names(directory: Path) -> None:
"""
Macht alle Dateinamen im Verzeichnis Linux-kompatibel.
Bevorzugt 'NameToUnix <dir>', fällt auf 'detox <file>' zurück.
"""
name_to_unix = shutil.which("NameToUnix")
if name_to_unix:
try:
subprocess.run([name_to_unix, str(directory)], check=True, capture_output=True)
return
except subprocess.CalledProcessError as e:
print(f" ⚠️ NameToUnix-Fehler: {e.stderr.decode(errors='replace').strip()}", file=sys.stderr)
detox = shutil.which("detox")
if detox:
for f in sorted(directory.rglob("*")):
if f.is_file():
try:
subprocess.run([detox, str(f)], check=True, capture_output=True)
except subprocess.CalledProcessError as e:
print(f" ⚠️ detox-Fehler {f.name}: {e.stderr.decode(errors='replace').strip()}", file=sys.stderr)
else:
print(" Weder NameToUnix noch detox gefunden — Dateinamen nicht nachbereinigt.", file=sys.stderr)
def write_report(report_data: List[Dict[str, Any]], report_path: Path) -> None:
try:
report_path.parent.mkdir(parents=True, exist_ok=True)
with report_path.open("w", encoding="utf-8", newline="") as f:
w = csv.DictWriter(f, fieldnames=REPORT_FIELDS)
w.writeheader()
w.writerows(report_data)
print(f"📊 Report gespeichert: {report_path}")
except Exception as e:
print(f"⚠️ Report-Fehler: {e}", file=sys.stderr)

583
hint_extractor.py Executable file
View file

@ -0,0 +1,583 @@
from __future__ import annotations
import base64
import json
import os
import re
import shutil
import subprocess
import sys
import urllib.request
from pathlib import Path
from typing import Optional, List, Dict, Tuple
from models import AlbumScan, AlbumHints, TrackHints
try:
from mutagen import File as MutagenFile
HAS_MUTAGEN = True
except ImportError:
HAS_MUTAGEN = False
try:
from bs4 import BeautifulSoup
HAS_BS4 = True
except ImportError:
HAS_BS4 = False
_NATSORT_RE = re.compile(r"(\d+)")
_BAD_VALUES = {"unknown", "unknown artist", "unknown album", "untitled", "track", "va", "various"}
# Filename patterns: most specific first
_FILENAME_PATTERNS = [
re.compile(r"^(?P<disc>\d{1,2})[- _]+(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<artist>.+?)\s*[-]\s*(?P<title>.+)$"),
re.compile(r"^(?P<disc>\d{1,2})[- _]+(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<title>.+)$"),
re.compile(r"^(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<artist>.+?)\s*[-]\s*(?P<title>.+)$"),
re.compile(r"^(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<title>.+)$"),
re.compile(r"^(?P<artist>.+?)\s*[-]\s*(?P<title>.+)$"),
]
# Directory name patterns
_DIR_PATTERNS = [
re.compile(r"^(?P<artist>.+?)[_ -]+[-][_ -]+(?P<album>.+?)(?:[_ -]+(?P<year>\d{4}))?$"),
re.compile(r"^(?P<artist>.+?)[_ ]+(?P<year>\d{4})[._ -]+(?P<album>.+)$"),
re.compile(r"^(?P<album>.+?)[_ -]+(?P<year>\d{4})$"),
]
# Tracklist line patterns
_TRACKLIST_PATTERNS = [
re.compile(r"^(?P<disc>\d{1,2})[- _](?P<track>\d{1,3})\s+(?P<title>.+?)(?:\s+\d+:\d{2})?$"),
# Separator muss . ) oder : sein — reines Leerzeichen reicht nicht
# (verhindert False-Positives wie "2 x CD, Compilation, Remastered")
re.compile(r"^(?P<track>\d{1,3})[.):]\s*(?P<title>.+?)(?:\s+\d+:\d{2})?$"),
re.compile(r"^(?P<track>[A-Z]\d{1,2})[.):]\s*(?P<title>.+?)(?:\s+\d+:\d{2})?$"),
]
_DISC_SECTION_RE = re.compile(r"(?i)(?:cd|disc|disk|side)[_ \-]*(\d{1,2})")
def _clean(s: Optional[str]) -> str:
if not s:
return ""
# BOM (U+FEFF), Zero-Width-Space (U+200B), Soft-Hyphen (U+00AD) entfernen
s = re.sub(r"[­]", "", s)
return re.sub(r"\s+", " ", s.replace("_", " ")).strip(" -._")
def _norm_for_match(s: str) -> str:
"""Nur Buchstaben und Ziffern — für fuzzy Titelvergleich (Interpunktion-agnostisch)."""
return re.sub(r"[^a-z0-9]", "", s.casefold())
# Klassische Werkverzeichnis-Nummern: BWV 565, Op. 27, K. 331, HWV 56, …
_CATALOG_RE = re.compile(
r"\b(bwv|hwv|op|k|kv|d|sz|wq|bbwv|rv|twv|hob)\W*(\d+[a-z]?(?:[\/\.]\d+)?)",
re.IGNORECASE,
)
def _catalog_key(s: str) -> Optional[str]:
"""Extrahiert normalisierte Katalognummer, z.B. 'bwv565' oder 'op27'."""
m = _CATALOG_RE.search(s)
if m:
return m.group(1).lower() + re.sub(r"\W", "", m.group(2))
return None
def _is_good(v: Optional[str]) -> bool:
if not v:
return False
return _clean(v).casefold() not in _BAD_VALUES
def _parse_dirname(name: str) -> Tuple[Optional[str], Optional[str], Optional[str]]:
name_clean = _clean(name)
for pat in _DIR_PATTERNS:
m = pat.match(name_clean)
if m:
d = m.groupdict()
artist = _clean(d.get("artist")) or None
album = _clean(d.get("album")) or None
year = d.get("year")
if _is_good(artist) or _is_good(album):
return artist, album, year
# No pattern matched — treat whole name as album
return None, _clean(name_clean), None
def _parse_filename(stem: str) -> Dict[str, str]:
stem_clean = _clean(stem)
for pat in _FILENAME_PATTERNS:
m = pat.match(stem_clean)
if m:
return {k: _clean(v) for k, v in m.groupdict().items() if v}
return {"title": stem_clean}
def _read_tags(path: Path) -> Tuple[Dict[str, str], Optional[float]]:
if not HAS_MUTAGEN:
return {}, None
try:
audio = MutagenFile(str(path), easy=True)
if not audio:
return {}, None
tags: Dict[str, str] = {}
for k in ("title", "artist", "album", "albumartist", "tracknumber",
"discnumber", "date", "year", "genre", "label", "organization"):
v = audio.get(k)
if v:
tags[k] = str(v[0]).strip()
if "year" in tags and "date" not in tags:
tags["date"] = tags["year"]
duration = None
if hasattr(audio, "info") and audio.info and hasattr(audio.info, "length"):
duration = audio.info.length
return tags, duration
except Exception as e:
print(f" ⚠️ Tag-Lesefehler {path.name}: {e}", file=sys.stderr)
return {}, None
def _parse_tracklist(text: str) -> List[Dict[str, str]]:
tracks: List[Dict[str, str]] = []
current_disc = 1
for line in text.splitlines():
line = line.strip()
if not line:
continue
disc_m = _DISC_SECTION_RE.match(line)
if disc_m and len(line) < 30:
current_disc = int(disc_m.group(1))
continue
for pat in _TRACKLIST_PATTERNS:
m = pat.match(line)
if m:
d = m.groupdict()
entry: Dict[str, str] = {"title": _clean(d.get("title", ""))}
raw_track = d.get("track", "")
if raw_track and raw_track.isdigit():
entry["track"] = raw_track.lstrip("0") or "0"
elif raw_track:
entry["track"] = raw_track
if "disc" in d and d["disc"]:
entry["disc"] = d["disc"]
else:
entry["disc"] = str(current_disc)
if entry.get("title"):
tracks.append(entry)
break
return tracks
def _parse_m3u(text: str) -> List[Dict[str, str]]:
"""M3U/M3U8 → geordnete Liste: [{filename, title, position}].
Reihenfolge der Einträge = gewünschte Trackreihenfolge.
"""
tracks: List[Dict[str, str]] = []
pending_title: Optional[str] = None
position = 0
for line in text.splitlines():
line = line.strip()
if not line:
continue
if line.upper().startswith("#EXTINF:"):
parts = line.split(",", 1)
pending_title = parts[1].strip() if len(parts) > 1 else None
elif not line.startswith("#"):
filename = Path(line.replace("\\", "/")).name
if not filename:
continue
position += 1
tracks.append({
"position": str(position),
"filename": filename,
"title": pending_title or "",
})
pending_title = None
return tracks
def _read_tracklist_file(path: Path) -> Optional[str]:
try:
if path.suffix.lower() in (".htm", ".html"):
raw = path.read_bytes()
encoding = "utf-8"
for enc in ("utf-8", "latin-1", "cp1252"):
try:
raw.decode(enc)
encoding = enc
break
except UnicodeDecodeError:
continue
text = raw.decode(encoding, errors="replace")
if HAS_BS4:
soup = BeautifulSoup(text, "html.parser")
return soup.get_text(separator="\n")
# Fallback: strip HTML tags
return re.sub(r"<[^>]+>", " ", text)
else:
for enc in ("utf-8", "latin-1", "cp1252"):
try:
return path.read_text(encoding=enc)
except UnicodeDecodeError:
continue
except Exception as e:
print(f" ⚠️ Tracklist-Lesefehler {path.name}: {e}", file=sys.stderr)
return None
_OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")
# Modelle in Prioritätsreihenfolge; überschreibbar via OLLAMA_OCR_MODEL
_OCR_MODELS = [m.strip() for m in os.getenv(
"OLLAMA_OCR_MODEL",
"qwen3-vl:latest,minicpm-v:latest,deepseek-ocr:latest"
).split(",") if m.strip()]
_OCR_PROMPT = (
"This image shows a CD album back cover or booklet page. "
"Your task: extract the complete tracklist as plain text.\n"
"Rules:\n"
"- Output track number and title per line, e.g. '1. Title' or '1-1 Title'\n"
"- If multiple discs/CDs: include a header like 'CD 1' or 'Disc 1' before each group\n"
"- Include durations if visible (e.g. '1. Title 4:32')\n"
"- Do NOT include label info, barcodes, or other non-tracklist text\n"
"- If no tracklist is visible, reply with: NO_TRACKLIST"
)
def _ocr_back_cover(image_files: List[Path]) -> Optional[str]:
"""
OCR eines Back-Cover- oder Booklet-Bildes via Ollama Vision.
Gibt den erkannten Text zurück, oder None wenn nichts gefunden.
"""
# Nur Bilder die nach Back-Cover aussehen
candidates = [
p for p in image_files
if any(kw in p.name.lower() for kw in ("back", "inlay", "booklet", "inside", "rear"))
]
# Fallback: alle Bilder außer dem Front-Cover
if not candidates:
candidates = [
p for p in image_files
if not any(kw in p.name.lower() for kw in ("front", "folder", "cover"))
]
if not candidates:
return None
image_path = candidates[0]
try:
img_b64 = base64.b64encode(image_path.read_bytes()).decode()
except Exception as e:
print(f" ⚠️ OCR-Bild lesen {image_path.name}: {e}", file=sys.stderr)
return None
for model in _OCR_MODELS:
payload = json.dumps({
"model": model,
"messages": [{
"role": "user",
"content": _OCR_PROMPT,
"images": [img_b64],
}],
"stream": False,
"options": {"temperature": 0.0},
}).encode()
try:
req = urllib.request.Request(
f"{_OLLAMA_HOST}/api/chat",
data=payload,
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(req, timeout=180) as resp:
data = json.loads(resp.read())
text = data.get("message", {}).get("content", "").strip()
if text and "NO_TRACKLIST" not in text:
print(f" 📷 OCR {image_path.name} via {model}: {len(text)} Zeichen extrahiert",
file=sys.stderr)
return text
elif "NO_TRACKLIST" in text:
print(f" 📷 OCR {image_path.name}: kein Tracklist-Text erkannt", file=sys.stderr)
return None
except Exception as e:
print(f" ⚠️ OCR-Fehler ({model}) {image_path.name}: {e}", file=sys.stderr)
continue
return None
def _check_cover_images(paths: List[Path]) -> List[Path]:
good: List[Path] = []
for p in paths:
name_lower = p.name.lower()
# Prefer front covers
if any(kw in name_lower for kw in ("front", "folder", "cover", "album")):
good.insert(0, p)
else:
good.append(p)
return good
# YouTube-Video-ID: 11 Zeichen aus [A-Za-z0-9_-], eingebettet im Dateinamen
_YT_ID_RE = re.compile(r"(?<![A-Za-z0-9_-])([A-Za-z0-9_-]{11})(?![A-Za-z0-9_-])")
def _extract_youtube_id(path: Path) -> Optional[str]:
"""Sucht eine YouTube-Video-ID im Dateinamen (Stem oder Suffix)."""
name = path.stem + path.suffix
for m in _YT_ID_RE.finditer(name):
candidate = m.group(1)
# Einfache Plausibilitätsprüfung: muss gemischte Zeichen haben
if re.search(r"[A-Z]", candidate) and re.search(r"[0-9a-z]", candidate):
return candidate
return None
def _fetch_youtube_metadata(video_id: str) -> Optional[Dict]:
"""
Ruft YouTube-Metadaten via yt-dlp ab (kein API-Key nötig).
Gibt Dict mit title, uploader, chapters, description zurück oder None.
"""
ytdlp = shutil.which("yt-dlp")
if not ytdlp:
return None
url = f"https://www.youtube.com/watch?v={video_id}"
try:
result = subprocess.run(
[ytdlp, "--dump-json", "--no-download", "--no-playlist", url],
capture_output=True, text=True, timeout=30,
)
if result.returncode != 0 or not result.stdout.strip():
return None
return json.loads(result.stdout)
except Exception as e:
print(f" ⚠️ YouTube-Fehler ({video_id}): {e}", file=sys.stderr)
return None
def _chapters_to_tracklist_text(chapters: List[Dict]) -> str:
"""
Konvertiert yt-dlp-Chapters in Tracklist-Text der vom _parse_tracklist
verarbeitetet werden kann: '1. Titel MM:SS'
"""
lines = []
for i, ch in enumerate(chapters, 1):
title = ch.get("title", "").strip()
if not title or title.startswith("<Untitled"):
continue
secs = int(ch.get("start_time", 0))
mm, ss = divmod(secs, 60)
lines.append(f"{i}. {title} {mm}:{ss:02d}")
return "\n".join(lines)
def extract_hints(scan: AlbumScan, use_ocr: bool = True) -> AlbumHints:
hints = AlbumHints(album_dir=scan.album_dir)
# Directory name
hints.dir_artist, hints.dir_album, hints.dir_year = _parse_dirname(scan.album_dir.name)
# Cover images
hints.cover_images = _check_cover_images(scan.image_files)
# Tracklist files
texts: List[str] = []
for tf in scan.tracklist_files:
txt = _read_tracklist_file(tf)
if txt:
texts.append(txt)
hints.tracklist_text = "\n\n".join(texts) if texts else None
# OCR-Fallback: Back-Cover scannen wenn keine Tracklist-Textdatei vorhanden
if use_ocr and not hints.tracklist_text and scan.image_files:
ocr_text = _ocr_back_cover(scan.image_files)
if ocr_text:
hints.tracklist_text = ocr_text
# YouTube-Lookup: IDs aus Dateinamen extrahieren, Metadaten per yt-dlp holen
yt_meta_by_id: Dict[str, Optional[Dict]] = {}
yt_ids_by_stem: Dict[str, str] = {} # stem (normalisiert) → youtube_id
for audio_path in scan.audio_files:
yt_id = _extract_youtube_id(audio_path)
if yt_id:
stem_key = _clean(audio_path.stem).casefold()
yt_ids_by_stem[stem_key] = yt_id
yt_meta_by_id.setdefault(yt_id, None)
if yt_meta_by_id:
print(f" 📺 YouTube-IDs gefunden: {', '.join(list(yt_meta_by_id.keys())[:5])}", file=sys.stderr)
for yt_id in list(yt_meta_by_id.keys())[:5]:
meta = _fetch_youtube_metadata(yt_id)
yt_meta_by_id[yt_id] = meta
# Chapters als Tracklist nutzen wenn noch keine vorhanden
if not hints.tracklist_text:
for yt_id, meta in yt_meta_by_id.items():
if meta and meta.get("chapters"):
chapter_text = _chapters_to_tracklist_text(meta["chapters"])
if chapter_text:
hints.tracklist_text = chapter_text
print(f" 📺 YouTube-Chapters als Tracklist: {len(meta['chapters'])} Tracks",
file=sys.stderr)
break
# Album-Level-Hints (erster erfolgreicher Treffer)
for yt_id, meta in yt_meta_by_id.items():
if meta:
hints.yt_title = (meta.get("title") or "").strip() or None
hints.yt_uploader = (
meta.get("uploader") or meta.get("channel") or ""
).strip() or None
break
parsed_tracklist = _parse_tracklist(hints.tracklist_text) if hints.tracklist_text else []
# M3U/Playlist-Reihenfolge: filename (stem, normalisiert) → Tracknummer
m3u_order: Dict[str, int] = {}
m3u_titles: Dict[str, str] = {}
for pf in scan.playlist_files:
try:
text = pf.read_text(encoding="utf-8", errors="replace")
for entry in _parse_m3u(text):
stem = _clean(Path(entry["filename"]).stem).casefold()
pos = int(entry["position"])
if stem and stem not in m3u_order:
m3u_order[stem] = pos
if entry.get("title"):
m3u_titles[stem] = entry["title"]
except Exception as e:
print(f" ⚠️ Playlist-Lesefehler {pf.name}: {e}", file=sys.stderr)
# Tracklist-Lookup: exakter Titel, fuzzy Titel, Katalognummer (BWV, Op., K., …)
tl_by_title: Dict[str, Dict[str, str]] = {}
tl_by_title_norm: Dict[str, Dict[str, str]] = {}
tl_by_catalog: Dict[str, Dict[str, str]] = {}
for entry in parsed_tracklist:
raw_title = entry.get("title", "")
exact_key = _clean(raw_title).casefold()
if exact_key:
tl_by_title[exact_key] = entry
norm_key = _norm_for_match(raw_title)
if norm_key:
tl_by_title_norm[norm_key] = entry
cat_key = _catalog_key(raw_title)
if cat_key:
tl_by_catalog[cat_key] = entry
# Build TrackHints per audio file
for audio_path in sorted(scan.audio_files):
tags, duration = _read_tags(audio_path)
fn_hints = _parse_filename(audio_path.stem)
track_num: Optional[int] = None
disc_num: Optional[int] = None
# Track number: tag > filename
raw_tn = tags.get("tracknumber") or fn_hints.get("track")
if raw_tn:
try:
tn_int = int(str(raw_tn).split("/")[0])
if tn_int > 0: # 0 gilt als "keine Nummer"
track_num = tn_int
except ValueError:
pass
# Disc number: tag > filename > path segment
raw_dn = tags.get("discnumber") or fn_hints.get("disc")
if raw_dn:
try:
disc_num = int(str(raw_dn).split("/")[0])
except ValueError:
pass
if not disc_num:
for part in audio_path.relative_to(scan.album_dir).parts[:-1]:
dm = _DISC_SECTION_RE.search(part)
if dm:
disc_num = int(dm.group(1))
break
title = tags.get("title") or fn_hints.get("title")
artist = tags.get("artist") or fn_hints.get("artist")
# Tracklist-Matching: Nummer → exakter Titel → fuzzy Titel
# Wenn ein Match gefunden: disc+track aus Tracklist übernehmen (Tracklist ist
# autoritativer als M3U-Reihenfolge bei Alben mit expliziter Disc-Nummerierung).
if parsed_tracklist:
matched_tl: Optional[Dict[str, str]] = None
# 1. Exakt per Tracknummer + Disc (nur wenn beides aus Tag/Dateiname bekannt)
if track_num and disc_num:
for tl_entry in parsed_tracklist:
tl_track = tl_entry.get("track")
tl_disc = tl_entry.get("disc", "1")
if (tl_track and int(tl_track) == track_num
and int(tl_disc) == disc_num):
matched_tl = tl_entry
break
# 2. Exakter Titelvergleich
if matched_tl is None and title:
matched_tl = tl_by_title.get(_clean(title).casefold())
# 3. Fuzzy Titelvergleich (ignoriert Kommas, Apostrophe, Groß-/Kleinschreibung)
if matched_tl is None and title:
matched_tl = tl_by_title_norm.get(_norm_for_match(title))
# 4. Katalognummer (BWV, Op., K. …) — greift bei abgekürzten Dateinamen
if matched_tl is None and title:
cat = _catalog_key(title)
if cat:
matched_tl = tl_by_catalog.get(cat)
if matched_tl:
# Titel aus Tracklist übernehmen wenn besser
if _is_good(matched_tl.get("title")):
title = matched_tl["title"]
# disc+track aus Tracklist sind autoritativer als M3U-Reihenfolge
try:
tl_track_n = int(matched_tl["track"]) if matched_tl.get("track") else None
tl_disc_n = int(matched_tl.get("disc", "1"))
if tl_track_n:
track_num = tl_track_n
disc_num = tl_disc_n
except (ValueError, KeyError):
pass
# M3U-Reihenfolge nur als letzter Fallback (wenn Tracklist kein Match liefert)
if track_num is None:
stem_key = _clean(audio_path.stem).casefold()
if stem_key in m3u_order:
track_num = m3u_order[stem_key]
# M3U-Titel als Fallback (enthält "Composer - Title" — nur nutzen wenn kein besserer Titel)
if not _is_good(title):
stem_key = _clean(audio_path.stem).casefold()
if stem_key in m3u_titles:
title = m3u_titles[stem_key]
# YouTube-Titel als letzter Fallback (bei einzelner Datei = das ganze Video)
if not _is_good(title):
stem_key = _clean(audio_path.stem).casefold()
yt_id = yt_ids_by_stem.get(stem_key)
if yt_id:
meta = yt_meta_by_id.get(yt_id)
if meta:
yt_video_title = (meta.get("title") or "").strip()
if yt_video_title:
title = yt_video_title
hints.tracks.append(TrackHints(
path=audio_path,
track_number=track_num,
disc_number=disc_num,
title=_clean(title) if title else None,
artist=_clean(artist) if artist else None,
duration=duration,
existing_tags=tags,
))
return hints

577
metadata_resolver.py Executable file
View file

@ -0,0 +1,577 @@
from __future__ import annotations
import os
import re
import sys
import time
from typing import Optional, List, Dict, Tuple
from models import AlbumHints, AlbumProposal, TrackProposal
try:
import musicbrainzngs as mb
mb.set_useragent("MusicMetadataEnricher", "1.0", "https://github.com/dschlueter")
HAS_MB = True
except ImportError:
HAS_MB = False
try:
import acoustid
HAS_ACOUSTID = True
except ImportError:
HAS_ACOUSTID = False
try:
import discogs_client as dc
HAS_DISCOGS = True
except ImportError:
HAS_DISCOGS = False
try:
import anthropic
HAS_ANTHROPIC = True
except ImportError:
HAS_ANTHROPIC = False
_MB_RATE_LIMIT = 1.1 # seconds between MusicBrainz requests
_last_mb_call = 0.0
ACOUSTID_API_KEY = os.getenv("ACOUSTID_API_KEY", "")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
DISCOGS_TOKEN = os.getenv("DISCOGS_TOKEN", "")
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")
# qwen3:8b (5.2GB) reicht für einfache JSON-Metadaten-Ergänzung und lädt schnell (~10s)
OLLAMA_RESOLVE_MODEL = os.getenv("OLLAMA_RESOLVE_MODEL", "qwen3:8b")
def _mb_wait():
global _last_mb_call
elapsed = time.monotonic() - _last_mb_call
if elapsed < _MB_RATE_LIMIT:
time.sleep(_MB_RATE_LIMIT - elapsed)
_last_mb_call = time.monotonic()
# ---------------------------------------------------------------------------
# AcoustID fingerprinting
# ---------------------------------------------------------------------------
def _fingerprint_tracks(hints: AlbumHints) -> Dict[str, List[str]]:
"""Returns {audio_path_str: [mbid, ...]}"""
if not HAS_ACOUSTID or not ACOUSTID_API_KEY:
return {}
results: Dict[str, List[str]] = {}
for t in hints.tracks:
try:
duration, fp = acoustid.fingerprint_file(str(t.path))
response = acoustid.lookup(ACOUSTID_API_KEY, fp, duration,
meta="recordings releasegroups")
mbids: List[str] = []
for result in response.get("results", []):
if result.get("score", 0) >= 0.90:
for rec in result.get("recordings", []):
mbids.append(rec["id"])
results[str(t.path)] = mbids
except Exception as e:
print(f" ⚠️ AcoustID-Fehler {t.path.name}: {e}", file=sys.stderr)
return results
# ---------------------------------------------------------------------------
# MusicBrainz lookup
# ---------------------------------------------------------------------------
def _mb_search_release(artist: Optional[str], album: Optional[str],
year: Optional[str]) -> Optional[Dict]:
if not HAS_MB or (not artist and not album):
return None
query_parts = []
if album:
query_parts.append(f'release:"{album}"')
if artist:
query_parts.append(f'artist:"{artist}"')
if year:
query_parts.append(f'date:{year}')
query = " AND ".join(query_parts)
try:
_mb_wait()
result = mb.search_releases(query=query, limit=3)
releases = result.get("release-list", [])
if not releases:
return None
# Take highest-score release
best = max(releases, key=lambda r: int(r.get("ext:score", 0)))
score = int(best.get("ext:score", 0))
if score < 70:
return None
return best
except Exception as e:
print(f" ⚠️ MusicBrainz-Suchfehler: {e}", file=sys.stderr)
return None
def _mb_get_release_tracks(release_id: str) -> Optional[List[Dict]]:
if not HAS_MB:
return None
try:
_mb_wait()
result = mb.get_release_by_id(
release_id,
includes=["recordings", "artist-credits", "labels", "release-groups"],
)
return result.get("release")
except Exception as e:
print(f" ⚠️ MusicBrainz-Release-Fehler: {e}", file=sys.stderr)
return None
def _mb_recording_to_release(recording_mbid: str) -> Optional[Dict]:
if not HAS_MB:
return None
try:
_mb_wait()
result = mb.get_recording_by_id(
recording_mbid,
includes=["releases", "artist-credits", "release-groups"],
)
rec = result.get("recording", {})
releases = rec.get("release-list", [])
if releases:
return releases[0]
return None
except Exception as e:
print(f" ⚠️ MusicBrainz-Recording-Fehler: {e}", file=sys.stderr)
return None
# ---------------------------------------------------------------------------
# Discogs fallback
# ---------------------------------------------------------------------------
def _discogs_search(artist: Optional[str], album: Optional[str]) -> Optional[Dict]:
if not HAS_DISCOGS or not DISCOGS_TOKEN:
return None
try:
client = dc.Client("MusicMetadataEnricher/1.0", user_token=DISCOGS_TOKEN)
results = client.search(
album or artist or "",
artist=artist or "",
type="release",
)
if results.count:
r = results[0]
return {
"album": r.title,
"artist": r.artists[0].name if r.artists else None,
"year": str(r.year) if r.year else None,
"genre": r.genres[0] if r.genres else None,
"label": r.labels[0].name if r.labels else None,
"id": r.id,
}
except Exception as e:
print(f" ⚠️ Discogs-Fehler: {e}", file=sys.stderr)
return None
# ---------------------------------------------------------------------------
# Claude API reasoning (optional)
# ---------------------------------------------------------------------------
def _build_resolve_prompt(hints: AlbumHints, partial: Dict) -> str:
tracks_summary = "\n".join(
f" - {('D'+str(t.disc_number)+'-') if t.disc_number else ''}T{t.track_number or '?'}: "
f"{t.title or t.path.stem}"
+ (f" [{t.artist}]" if t.artist else "")
for t in hints.tracks[:20]
)
# Tracklist-Kopfzeilen (erste 400 Zeichen, vor der Track-Liste) für Album/Label-Info
tracklist_header = ""
if hints.tracklist_text:
header_lines = []
for line in hints.tracklist_text.splitlines():
line = line.strip()
if not line:
continue
# Stopp bei erster Zeile die wie ein Track aussieht (1-1, 1. etc.)
if re.match(r"^\d[\d\-]\s+\S", line) or re.match(r"^\d{1,3}[.)]\s+", line):
break
header_lines.append(line)
if sum(len(l) for l in header_lines) > 400:
break
tracklist_header = "\n".join(header_lines[:15])
return (
"Du bist ein Musikexperte. Analysiere diese Album-Daten und gib korrekte Metadaten zurück.\n"
"Korrigiere auch erkennbare Tippfehler (Verzeichnisnamen enthalten oft Schreibfehler).\n\n"
"WICHTIGE FELDDEFINITIONEN:\n"
'- "artist" = Komponist (Klassik) ODER Band/Sänger (Pop/Rock/Jazz)\n'
'- "albumartist" = Interpret/Performer/Dirigent (Klassik) ODER gleich wie artist (Pop)\n'
" Beispiel Klassik: artist='Johann Sebastian Bach', albumartist='Peter Hurford'\n"
" Beispiel Pop: artist='ABBA', albumartist='ABBA'\n\n"
f"Verzeichnisname: {hints.album_dir.name}\n"
f"Hinweis Künstler/Titel (aus Verzeichnis, kann vertauscht oder falsch sein): "
f"{hints.dir_artist or '?'} / {hints.dir_album or partial.get('album', '?')}\n"
f"Jahr: {hints.dir_year or partial.get('year', 'unbekannt')}\n"
+ (f"YouTube-Videotitel: {hints.yt_title}\n" if hints.yt_title else "")
+ (f"YouTube-Uploader/Kanal: {hints.yt_uploader}\n" if hints.yt_uploader else "")
+ (f"Tracklist-Kopf (Label/Jahr/Albumtitel):\n{tracklist_header}\n\n" if tracklist_header else "")
+ f"Tracks:\n{tracks_summary}\n\n"
'Antworte NUR mit einem JSON-Objekt (null wenn unbekannt):\n'
'{"artist": ..., "album": ..., "albumartist": ..., "year": ..., "genre": ..., "label": ...}'
)
def _parse_json_response(text: str) -> Optional[Dict]:
import json, re
m = re.search(r"\{.*\}", text, re.DOTALL)
if m:
try:
return json.loads(m.group())
except Exception:
pass
return None
def _resolve_via_ollama(hints: AlbumHints, partial: Dict) -> Optional[Dict]:
"""Lokales Reasoning via Ollama (kein API-Key nötig)."""
import urllib.request, json
prompt = _build_resolve_prompt(hints, partial)
payload = json.dumps({
"model": OLLAMA_RESOLVE_MODEL,
"messages": [{"role": "user", "content": prompt}],
"stream": False,
"format": "json",
"options": {"temperature": 0.1},
}).encode()
try:
req = urllib.request.Request(
f"{OLLAMA_HOST}/api/chat",
data=payload,
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(req, timeout=240) as resp:
data = json.loads(resp.read())
text = data.get("message", {}).get("content", "").strip()
return _parse_json_response(text)
except Exception as e:
print(f" ⚠️ Ollama-Resolve-Fehler: {e}", file=sys.stderr)
return None
def _resolve_via_openrouter(hints: AlbumHints, partial: Dict) -> Optional[Dict]:
"""Reasoning via OpenRouter (günstige chinesische Modelle bevorzugt)."""
if not OPENROUTER_API_KEY:
return None
import urllib.request, json
prompt = _build_resolve_prompt(hints, partial)
# DeepSeek V3: extrem günstig, sehr kompetent
model = "deepseek/deepseek-chat-v3-0324"
payload = json.dumps({
"model": model,
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.1,
"max_tokens": 300,
}).encode()
try:
req = urllib.request.Request(
"https://openrouter.ai/api/v1/chat/completions",
data=payload,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {OPENROUTER_API_KEY}",
"HTTP-Referer": "https://pi.local",
"X-Title": "MusicMetadataEnricher",
},
method="POST",
)
with urllib.request.urlopen(req, timeout=30) as resp:
data = json.loads(resp.read())
text = data["choices"][0]["message"]["content"].strip()
return _parse_json_response(text)
except Exception as e:
print(f" ⚠️ OpenRouter-Resolve-Fehler: {e}", file=sys.stderr)
return None
def _claude_resolve(hints: AlbumHints, partial: Dict) -> Optional[Dict]:
"""
Reihenfolge: Ollama (lokal, kostenlos) OpenRouter (günstig).
Claude API wird bewusst nicht genutzt (zu teuer).
"""
# 1. Ollama lokal (bevorzugt — kostenlos, RTX 3090)
result = _resolve_via_ollama(hints, partial)
if result:
return result
# 2. OpenRouter (DeepSeek V3, günstig) wenn Key gesetzt
if OPENROUTER_API_KEY:
result = _resolve_via_openrouter(hints, partial)
if result:
return result
return None
# ---------------------------------------------------------------------------
# Main resolver
# ---------------------------------------------------------------------------
def resolve(
hints: AlbumHints,
use_fingerprint: bool = True,
use_api: bool = True,
use_claude: bool = True,
) -> AlbumProposal:
confidence = 0.0
sources: List[str] = []
notes: List[str] = []
artist = hints.dir_artist
album = hints.dir_album
year = hints.dir_year
genre: Optional[str] = None
label: Optional[str] = None
release_mbid: Optional[str] = None
mb_tracks: Optional[List] = None
# Collect artist/album from existing tags (majority vote)
tag_artists = [t.existing_tags.get("artist") for t in hints.tracks if t.existing_tags.get("artist")]
tag_albums = [t.existing_tags.get("album") for t in hints.tracks if t.existing_tags.get("album")]
if tag_artists:
from collections import Counter
artist = artist or Counter(tag_artists).most_common(1)[0][0]
if tag_albums:
from collections import Counter
album = album or Counter(tag_albums).most_common(1)[0][0]
# Tag year/genre/label
import re as _re
for t in hints.tracks:
raw_year = t.existing_tags.get("date") or t.existing_tags.get("year")
if raw_year and not year:
# Strip invisible chars so ID3TimeStamp validation doesn't fail later
year = _re.sub(r"[^\d\-T:+Z]", "", str(raw_year)).strip()[:10] or None
genre = genre or t.existing_tags.get("genre")
label = label or t.existing_tags.get("label") or t.existing_tags.get("organization")
# YouTube-Metadaten als zusätzliche Hinweise (Uploader → Künstler, Titel → Album/Track)
if hints.yt_uploader and not artist:
artist = hints.yt_uploader
if hints.yt_title and not album:
album = hints.yt_title
if artist or album:
confidence += 0.05
sources.append("local-hints")
if hints.yt_title or hints.yt_uploader:
sources.append("youtube")
# AcoustID fingerprinting
fp_mbids: Dict[str, List[str]] = {}
if use_fingerprint and use_api and HAS_ACOUSTID and ACOUSTID_API_KEY:
fp_mbids = _fingerprint_tracks(hints)
if fp_mbids:
confidence += 0.20
sources.append("acoustid")
# Try to get release from first matched recording
for mbids in fp_mbids.values():
for mbid in mbids[:1]:
rel = _mb_recording_to_release(mbid)
if rel:
release_mbid = rel.get("id")
confidence += 0.25
sources.append("musicbrainz-fingerprint")
break
if release_mbid:
break
# MusicBrainz text search
if use_api and HAS_MB and not release_mbid:
mb_result = _mb_search_release(artist, album, year)
if mb_result:
release_mbid = mb_result.get("id")
score = int(mb_result.get("ext:score", 0))
confidence += 0.30 * (score / 100)
sources.append("musicbrainz-text")
notes.append(f"MusicBrainz score: {score}")
# Fetch full release data
if use_api and release_mbid:
full_release = _mb_get_release_tracks(release_mbid)
if full_release:
if not artist:
creds = full_release.get("artist-credit", [])
artist = "".join(c.get("artist", {}).get("name", "") + c.get("joinphrase", "")
for c in creds if isinstance(c, dict)).strip() or artist
if not album:
album = full_release.get("title", album)
if not year:
year = full_release.get("date", "")[:4] or None
label_info = full_release.get("label-info-list", [])
if label_info and not label:
label = label_info[0].get("label", {}).get("name") if label_info else None
rg = full_release.get("release-group", {})
if not genre:
genre = (rg.get("primary-type") or "").strip() or None
mb_tracks = []
for medium in full_release.get("medium-list", []):
disc_num = medium.get("position", 1)
for track in medium.get("track-list", []):
mb_tracks.append({
"disc": disc_num,
"number": int(track.get("number", 0) or 0),
"title": track.get("recording", {}).get("title", ""),
"artist": track.get("artist-credit-phrase", ""),
"mbid": track.get("recording", {}).get("id"),
})
# Discogs fallback
if use_api and HAS_DISCOGS and DISCOGS_TOKEN and not release_mbid:
dg = _discogs_search(artist, album)
if dg:
artist = artist or dg.get("artist")
album = album or dg.get("album")
year = year or dg.get("year")
genre = genre or dg.get("genre")
label = label or dg.get("label")
confidence += 0.15
sources.append("discogs")
# LLM-Reasoning für verbleibende Lücken:
# Reihenfolge: Ollama lokal → OpenRouter (DeepSeek, günstig) → Claude API
cl_albumartist: Optional[str] = None
partial = {"artist": artist, "album": album, "year": year}
if use_claude and use_api:
if not artist or not album or confidence < 0.5:
cl = _claude_resolve(hints, partial)
if cl:
if confidence < 0.3:
# Sehr unsicher: LLM darf auch bestehende Werte korrigieren
# (z.B. Tippfehler im Albumtitel aus dem Verzeichnisnamen)
artist = cl.get("artist") or artist
album = cl.get("album") or album
year = cl.get("year") or year
genre = cl.get("genre") or genre
label = cl.get("label") or label
else:
artist = artist or cl.get("artist")
album = album or cl.get("album")
year = year or cl.get("year")
genre = genre or cl.get("genre")
label = label or cl.get("label")
cl_albumartist = cl.get("albumartist") or None
confidence += 0.10
sources.append("llm-resolve")
# Finalize albumartist
# Priorität: (1) LLM-albumartist bei niedriger Konfidenz
# (2) dir_artist wenn Verzeichnisname einen Künstler nennt
# (3) Heuristiken (Various Artists, Mehrheitsabstimmung)
# Rationale: "Bach_Organ_-_Peter_Hurford" → dir_artist="Bach Organ" ist kein Künstler,
# aber der Verzeichnisname sieht aus wie Künstler; LLM kann das korrekt auflösen.
track_artists = [t.artist for t in hints.tracks if t.artist]
from collections import Counter
distinct_artists = set(a for a in track_artists if a)
_bad_aa = {"various artists", "unknown artist", "unknown", "va"}
def _good_aa(s: Optional[str]) -> bool:
return bool(s) and s.casefold().strip() not in _bad_aa
if _good_aa(cl_albumartist) and confidence < 0.4:
# LLM kennt den echten Albumkünstler besser als der Verzeichnisname
albumartist = cl_albumartist # type: ignore[assignment]
elif hints.dir_artist:
albumartist = hints.dir_artist
elif len(distinct_artists) >= 3:
albumartist = "Various Artists"
elif track_artists:
albumartist = artist or Counter(track_artists).most_common(1)[0][0]
else:
albumartist = artist or "Unknown Artist"
album = album or hints.album_dir.name.replace("_", " ")
artist = artist or albumartist
confidence = min(confidence, 1.0)
# Build track proposals
# `artist` = Komponist/Hauptkünstler (LLM-aufgelöst), `albumartist` = Performer
# Werden beide weitergegeben damit _build_track_proposals richtig zuordnen kann.
track_proposals = _build_track_proposals(hints, mb_tracks, album, albumartist, composer=artist)
return AlbumProposal(
album_dir=hints.album_dir,
album=album,
albumartist=albumartist,
date=year,
genre=genre,
label=label,
mbid=release_mbid,
cover_path=None,
cover_source=None,
tracks=track_proposals,
confidence=confidence,
sources=sources,
notes=notes,
)
def _build_track_proposals(
hints: AlbumHints,
mb_tracks: Optional[List],
album: str,
album_artist: str,
composer: Optional[str] = None,
) -> List[TrackProposal]:
proposals: List[TrackProposal] = []
for th in sorted(hints.tracks, key=lambda t: (t.disc_number or 1, t.track_number or 9999, str(t.path))):
title = th.title
track_num = th.track_number
disc_num = th.disc_number
# Klassik-Fall: Performer aus Dateiname, Komponist aus LLM
# Wenn th.artist == albumartist (Performer), und wir den Komponisten kennen,
# wird der Komponist als Track-Artist gesetzt → Filename: TT_-_Performer_-_Komponist_-_Werk
th_artist_cf = (th.artist or "").casefold().strip()
aa_cf = album_artist.casefold().strip()
if composer and th_artist_cf == aa_cf and th_artist_cf:
# Performer == albumartist → Komponist als Track-Artist
artist = composer
else:
artist = th.artist or album_artist
# Try to match from MusicBrainz track list
if mb_tracks and track_num:
for mb_t in mb_tracks:
if mb_t["number"] == track_num and mb_t["disc"] == (disc_num or 1):
if mb_t.get("title"):
title = mb_t["title"]
if mb_t.get("artist"):
artist = mb_t["artist"]
break
title = title or th.path.stem
proposals.append(TrackProposal(
path=th.path,
title=title,
artist=artist,
track_number=track_num,
disc_number=disc_num,
mbid=None,
))
# Sequenzielle Nummerierung als letzter Fallback:
# Tracks ohne Nummer (None) erhalten eine laufende Nummer pro Disc.
# Damit werden "00" und "??" im Dateinamen beim --rename verhindert.
if any(p.track_number is None for p in proposals):
disc_counters: Dict[int, int] = {}
for p in proposals:
if p.track_number is None:
disc = p.disc_number or 1
disc_counters[disc] = disc_counters.get(disc, 0) + 1
p.track_number = disc_counters[disc]
return proposals

84
models.py Executable file
View file

@ -0,0 +1,84 @@
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional, List, Dict
AUDIO_EXTENSIONS = {
".mp3", ".flac", ".m4a", ".aac", ".ogg", ".opus",
".wav", ".wma", ".aiff", ".ape",
}
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".bmp", ".gif"}
TRACKLIST_EXTENSIONS = {".txt", ".htm", ".html", ".nfo"}
PLAYLIST_EXTENSIONS = {".m3u", ".m3u8", ".pls"}
@dataclass
class ScannedFile:
path: Path
kind: str # "audio" | "image" | "tracklist" | "playlist" | "other"
@dataclass
class AlbumScan:
album_dir: Path
audio_files: List[Path] = field(default_factory=list)
image_files: List[Path] = field(default_factory=list)
tracklist_files: List[Path] = field(default_factory=list)
playlist_files: List[Path] = field(default_factory=list) # .m3u / .m3u8 / .pls
other_files: List[Path] = field(default_factory=list)
@dataclass
class TrackHints:
path: Path
track_number: Optional[int] = None
disc_number: Optional[int] = None
title: Optional[str] = None
artist: Optional[str] = None
duration: Optional[float] = None
existing_tags: Dict[str, str] = field(default_factory=dict)
@dataclass
class AlbumHints:
album_dir: Path
dir_artist: Optional[str] = None
dir_album: Optional[str] = None
dir_year: Optional[str] = None
tracklist_text: Optional[str] = None # merged text from all tracklist files
cover_images: List[Path] = field(default_factory=list)
tracks: List[TrackHints] = field(default_factory=list)
yt_title: Optional[str] = None # YouTube video title (if found)
yt_uploader: Optional[str] = None # YouTube channel/uploader name
@dataclass
class TrackProposal:
path: Path
title: str
artist: str
track_number: Optional[int]
disc_number: Optional[int]
new_filename: Optional[str] = None # only set when --rename is active
mbid: Optional[str] = None
conductor: Optional[str] = None # classical: Dirigent
orchestra: Optional[str] = None # classical: Orchester / Ensemble
@dataclass
class AlbumProposal:
album_dir: Path
album: str
albumartist: str
date: Optional[str]
genre: Optional[str]
label: Optional[str]
mbid: Optional[str] # MusicBrainz release ID
cover_path: Optional[Path] # resolved local or downloaded cover
cover_source: Optional[str] # "local" | "musicbrainz" | "discogs"
tracks: List[TrackProposal]
confidence: float
sources: List[str] = field(default_factory=list)
notes: List[str] = field(default_factory=list)

269
music_enricher.py Normal file
View file

@ -0,0 +1,269 @@
#!/usr/bin/env python3
"""
music_enricher.py
KI-gestützter Musik-Metadaten-Enricher für Jellyfin-Bibliotheken.
Pipeline pro Album:
Scan HintExtractor MetadataResolver CoverHandler Review Executor
"""
from __future__ import annotations
import argparse
import os
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
try:
from tqdm import tqdm
HAS_TQDM = True
except ImportError:
HAS_TQDM = False
from models import AlbumProposal
from scanner import scan_album, collect_album_dirs
from hint_extractor import extract_hints
from metadata_resolver import resolve
from cover_handler import resolve_cover
from executor import execute_album, write_report
def maybe_tqdm(iterable, show: bool, **kwargs):
return tqdm(iterable, **kwargs) if show else iterable
# ---------------------------------------------------------------------------
# Review / Display
# ---------------------------------------------------------------------------
def _print_proposal(proposal: AlbumProposal) -> None:
conf_bar = "" * int(proposal.confidence * 10) + "" * (10 - int(proposal.confidence * 10))
print(f"\n{'' * 60}")
print(f"💿 {proposal.album_dir.name}")
print(f" Album: {proposal.album}")
print(f" Artist: {proposal.albumartist}")
print(f" Jahr: {proposal.date or ''}")
print(f" Genre: {proposal.genre or ''}")
print(f" Label: {proposal.label or ''}")
print(f" Cover: {proposal.cover_source or ''} ({proposal.cover_path.name if proposal.cover_path else 'keins'})")
print(f" Konfidenz: [{conf_bar}] {proposal.confidence:.0%} Quellen: {', '.join(proposal.sources) or ''}")
if proposal.notes:
for n in proposal.notes:
print(f" {n}")
print(f" Tracks ({len(proposal.tracks)}):")
for tp in proposal.tracks[:8]:
tn = f"{tp.disc_number}-{tp.track_number:02d}" if tp.disc_number and tp.disc_number > 1 else (
f"{tp.track_number:02d}" if tp.track_number else "??")
print(f" {tn} {tp.artist} {tp.title}")
if len(proposal.tracks) > 8:
print(f" … und {len(proposal.tracks) - 8} weitere")
def _interactive_review(proposal: AlbumProposal) -> bool:
"""Returns True if user accepts the proposal."""
_print_proposal(proposal)
while True:
answer = input("\n [Enter] Akzeptieren [s] Überspringen [q] Abbrechen: ").strip().lower()
if answer in ("", "j", "y"):
return True
if answer == "s":
return False
if answer == "q":
sys.exit(0)
# ---------------------------------------------------------------------------
# Main pipeline
# ---------------------------------------------------------------------------
def process_album(
album_dir: Path,
args: argparse.Namespace,
report_data: List[Dict[str, Any]],
) -> Dict[str, int]:
stats = {"tags_written": 0, "covers_embedded": 0, "files_renamed": 0,
"errors": 0, "skipped": 0}
try:
scan = scan_album(album_dir)
if not scan.audio_files:
stats["skipped"] += 1
return stats
hints = extract_hints(scan, use_ocr=not args.no_api)
proposal = resolve(
hints,
use_fingerprint=not args.no_fingerprint,
use_api=not args.no_api,
use_claude=not args.no_api,
)
# Cover art
cover_path, cover_source = resolve_cover(
hints.cover_images,
proposal.mbid,
album_dir,
)
if cover_path and not args.no_cover:
proposal.cover_path = cover_path
proposal.cover_source = cover_source
# Set proposed filenames if --rename
if args.rename:
from executor import _proposed_filename
for tp in proposal.tracks:
tp.new_filename = _proposed_filename(tp, tp.path.suffix)
# Review step
if args.dry_run:
_print_proposal(proposal)
for tp in proposal.tracks:
report_data.append({
"status": "dry-run",
"album_dir": str(album_dir.name),
"track_path": str(tp.path),
"old_title": tp.path.stem,
"new_title": tp.title,
"old_artist": "",
"new_artist": tp.artist,
"album": proposal.album,
"albumartist": proposal.albumartist,
"date": proposal.date or "",
"genre": proposal.genre or "",
"label": proposal.label or "",
"track_number": tp.track_number or "",
"disc_number": tp.disc_number or "",
"cover_embedded": False,
"renamed_to": tp.new_filename or "",
"confidence": f"{proposal.confidence:.2f}",
"sources": ", ".join(proposal.sources),
})
return stats
accepted = True
if not args.auto:
accepted = _interactive_review(proposal)
elif args.auto and proposal.confidence < args.confidence:
print(f" ⏭️ Konfidenz {proposal.confidence:.0%} < {args.confidence:.0%} → übersprungen: {album_dir.name}")
stats["skipped"] += 1
return stats
else:
_print_proposal(proposal)
if not accepted:
stats["skipped"] += 1
return stats
album_stats = execute_album(
proposal=proposal,
backup_dir=args.backup,
do_rename=args.rename,
embed_cover_art=args.embed_cover,
dry_run=False,
report_data=report_data,
)
for k, v in album_stats.items():
stats[k] = stats.get(k, 0) + v
except Exception as e:
stats["errors"] += 1
print(f" ❌ Fehler in {album_dir.name}: {e}", file=sys.stderr)
import traceback
traceback.print_exc(file=sys.stderr)
return stats
def main() -> None:
parser = argparse.ArgumentParser(
description="KI-gestützter Musik-Metadaten-Enricher für Jellyfin",
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument("paths", nargs="*",
help="Root-Verzeichnisse (direkte Unterordner = Alben)")
parser.add_argument("--album", type=Path,
help="Einzelnes Album-Verzeichnis verarbeiten")
parser.add_argument("--dry-run", action="store_true",
help="Vorschläge anzeigen, nichts schreiben")
parser.add_argument("--auto", action="store_true",
help="Kein interaktiver Review-Schritt")
parser.add_argument("--confidence", type=float, default=0.85,
help="Min-Konfidenz für --auto (default: 0.85)")
parser.add_argument("--rename", action="store_true",
help="Dateien nach Schema umbenennen: TT - Artist - Titel.ext")
parser.add_argument("--embed-cover", action="store_true",
help="Cover-Art in Audiodatei einbetten")
parser.add_argument("--backup", type=Path,
help="Backup-Verzeichnis vor Änderungen")
parser.add_argument("--report", type=Path,
help="CSV-Report der Änderungen")
parser.add_argument("--no-fingerprint", action="store_true",
help="AcoustID-Fingerprinting überspringen")
parser.add_argument("--no-api", action="store_true",
help="Keine externen API-Calls")
parser.add_argument("--no-cover", action="store_true",
help="Kein Cover-Art-Download")
parser.add_argument("--no-tqdm", action="store_true",
help="Fortschrittsanzeige deaktivieren")
args = parser.parse_args()
if not args.album and not args.paths:
parser.error("Mindestens ein Pfad oder --album erforderlich.")
show_progress = HAS_TQDM and not args.no_tqdm and args.auto
report_data: List[Dict[str, Any]] = []
totals: Dict[str, int] = {
"albums": 0, "skipped": 0, "tags_written": 0,
"covers_embedded": 0, "files_renamed": 0, "errors": 0,
}
# Collect album directories
album_dirs: List[Path] = []
if args.album:
album_dirs.append(args.album.expanduser().resolve())
for raw in args.paths:
root = Path(raw).expanduser().resolve()
if not root.is_dir():
print(f"⚠️ Kein Verzeichnis: {root}")
continue
album_dirs.extend(collect_album_dirs(root))
if not album_dirs:
print("⚠️ Keine Album-Verzeichnisse gefunden.")
sys.exit(1)
print(f"🎵 {len(album_dirs)} Album-Verzeichnisse gefunden.")
if os.getenv("OLLAMA_HOST") or True: # Ollama always attempted
print("🤖 LLM-Resolve: Ollama → OpenRouter (kein Claude)")
if not args.no_api:
print("🔍 MusicBrainz-Lookup aktiv.")
if args.dry_run:
print("🧪 DRY-RUN — nichts wird geschrieben.")
for album_dir in maybe_tqdm(album_dirs, show_progress,
desc="Alben", unit="album", dynamic_ncols=True):
stats = process_album(album_dir, args, report_data)
totals["albums"] += 1
for k in ("skipped", "tags_written", "covers_embedded", "files_renamed", "errors"):
totals[k] += stats.get(k, 0)
if args.report and report_data:
write_report(report_data, args.report)
print(f"\n{'=' * 50}")
print("✅ Zusammenfassung:")
print(f" 💿 Alben verarbeitet: {totals['albums']}")
print(f" ⏭️ Übersprungen: {totals['skipped']}")
print(f" 🏷️ Tags geschrieben: {totals['tags_written']}")
print(f" 🖼️ Cover eingebettet: {totals['covers_embedded']}")
print(f" 📝 Dateien umbenannt: {totals['files_renamed']}")
print(f" ❌ Fehler: {totals['errors']}")
if args.dry_run:
print(" 🧪 Modus: DRY-RUN")
print("=" * 50)
if __name__ == "__main__":
main()

99
scanner.py Normal file
View file

@ -0,0 +1,99 @@
from __future__ import annotations
import re
import sys
from pathlib import Path
from typing import List
from models import AlbumScan, AUDIO_EXTENSIONS, IMAGE_EXTENSIONS, TRACKLIST_EXTENSIONS, PLAYLIST_EXTENSIONS
_DISC_DIR_RE = re.compile(r"(?i)^(?:cd|disc|disk|side)[_ \-]*\d{1,2}$")
def _is_hidden(name: str) -> bool:
return name.startswith(".") or name.startswith("_")
def _is_disc_dir(name: str) -> bool:
"""True für Ordner wie 'CD1', 'Disc 2', 'Side A', 'Disk_1'."""
return bool(_DISC_DIR_RE.match(name))
def scan_album(album_dir: Path) -> AlbumScan:
"""
Scannt ein Album-Verzeichnis.
Rekursions-Regel:
- Hat das Album-Verzeichnis selbst Audio-Dateien kein Abstieg in Unterordner
(Einzelscheibe; Sub-Ordner wie Artworks, Scans, irrtümliche Kopien werden ignoriert).
- Hat der Root KEINE Audio-Dateien Abstieg nur in Disc-Unterordner (CD1, Disc 2 ).
"""
result = AlbumScan(album_dir=album_dir)
# Erst nur die Wurzel-Ebene scannen, um zu entscheiden ob rekursiert wird
root_has_audio = any(
(album_dir / name).suffix.lower() in AUDIO_EXTENSIONS
for name in _listdir(album_dir)
if not _is_hidden(name)
)
if root_has_audio:
# Nur Root-Ebene — keine Unterordner
_scan_dir(album_dir, album_dir, result, recurse=False)
else:
# Kein Audio an der Wurzel → Multi-CD: nur Disc-Unterordner
_scan_dir(album_dir, album_dir, result, recurse=True)
result.audio_files.sort()
result.image_files.sort()
result.tracklist_files.sort()
result.playlist_files.sort()
return result
def _listdir(path: Path) -> List[str]:
try:
return [e.name for e in path.iterdir()]
except (PermissionError, OSError) as e:
print(f"⚠️ Scan-Fehler: {e}", file=sys.stderr)
return []
def _scan_dir(current: Path, album_dir: Path, result: AlbumScan, recurse: bool) -> None:
try:
entries = sorted(current.iterdir())
except (PermissionError, OSError) as e:
print(f"⚠️ Scan-Fehler {current}: {e}", file=sys.stderr)
return
for entry in entries:
name = entry.name
if _is_hidden(name):
continue
if entry.is_dir():
if recurse and _is_disc_dir(name):
_scan_dir(entry, album_dir, result, recurse=True)
# Andere Unterordner (Artworks, irrtümliche Kopien…) werden übersprungen
elif entry.is_file():
ext = entry.suffix.lower()
if ext in AUDIO_EXTENSIONS:
result.audio_files.append(entry)
elif ext in IMAGE_EXTENSIONS:
result.image_files.append(entry)
elif ext in TRACKLIST_EXTENSIONS:
result.tracklist_files.append(entry)
elif ext in PLAYLIST_EXTENSIONS:
result.playlist_files.append(entry)
else:
result.other_files.append(entry)
def collect_album_dirs(root: Path) -> List[Path]:
dirs: List[Path] = []
try:
for item in sorted(root.iterdir()):
if item.is_dir() and not _is_hidden(item.name):
dirs.append(item)
except (PermissionError, OSError) as e:
print(f"⚠️ Lesefehler {root}: {e}", file=sys.stderr)
return dirs

274
test_suite_enricher.py Normal file
View file

@ -0,0 +1,274 @@
#!/usr/bin/env python3
"""test_suite_enricher.py — Unit- und Integrationstests für music_enricher."""
from __future__ import annotations
import sys
import tempfile
import traceback
from pathlib import Path
from typing import Callable
sys.path.insert(0, str(Path(__file__).parent))
from models import AlbumScan, TrackHints, AlbumHints
RESULTS: list[dict] = []
def record(test_id: str, passed: bool, detail: str = "") -> None:
RESULTS.append({"id": test_id, "status": "PASS" if passed else "FAIL", "detail": detail})
def run_case(test_id: str, fn: Callable[[], str]) -> None:
try:
detail = fn()
record(test_id, True, detail)
except Exception:
record(test_id, False, traceback.format_exc()[:300])
# ---------------------------------------------------------------------------
# hint_extractor Tests
# ---------------------------------------------------------------------------
def test_parse_dirname_artist_album() -> str:
from hint_extractor import _parse_dirname
artist, album, year = _parse_dirname("Pink_Floyd_-_The_Wall")
assert artist and "Pink" in artist, f"artist: {artist}"
assert album and "Wall" in album, f"album: {album}"
return f"artist={artist!r}, album={album!r}"
def test_parse_dirname_with_year() -> str:
from hint_extractor import _parse_dirname
artist, album, year = _parse_dirname("Abba_-_Greatest_Hits_1992")
assert year == "1992", f"year: {year}"
return f"year={year}"
def test_parse_dirname_album_only() -> str:
from hint_extractor import _parse_dirname
artist, album, year = _parse_dirname("Beethoven_Complete_Edition")
assert album is not None, "album should not be None"
return f"album={album!r}"
def test_parse_filename_track_artist_title() -> str:
from hint_extractor import _parse_filename
r = _parse_filename("07 - ABBA - Dancing Queen")
assert r.get("track") == "07", f"track: {r}"
assert "ABBA" in r.get("artist", ""), f"artist: {r}"
assert "Dancing" in r.get("title", ""), f"title: {r}"
return str(r)
def test_parse_filename_disc_track_title() -> str:
from hint_extractor import _parse_filename
r = _parse_filename("2-07 - Bach - Toccata")
assert r.get("disc") == "2", f"disc: {r}"
assert r.get("track") == "07", f"track: {r}"
return str(r)
def test_parse_filename_track_title() -> str:
from hint_extractor import _parse_filename
r = _parse_filename("01 - Dancing Queen")
assert r.get("track") == "01", f"track: {r}"
assert "Dancing" in r.get("title", ""), f"title: {r}"
return str(r)
def test_parse_filename_artist_title() -> str:
from hint_extractor import _parse_filename
r = _parse_filename("Miles Davis - So What")
assert "Miles" in r.get("artist", ""), f"artist: {r}"
assert "What" in r.get("title", ""), f"title: {r}"
return str(r)
def test_parse_tracklist_numbered() -> str:
from hint_extractor import _parse_tracklist
text = "1. Dancing Queen\n2. Waterloo\n3. Fernando"
tracks = _parse_tracklist(text)
assert len(tracks) == 3, f"count: {len(tracks)}"
assert tracks[0]["title"] == "Dancing Queen", f"title: {tracks[0]}"
return f"{len(tracks)} tracks parsed"
def test_parse_tracklist_with_duration() -> str:
from hint_extractor import _parse_tracklist
text = "1-1 Toccata And Fugue 9:17\n1-2 Heartbeat 2:19\n2-1 Finale 5:00"
tracks = _parse_tracklist(text)
assert len(tracks) >= 2, f"count: {len(tracks)}"
assert tracks[0]["disc"] == "1", f"disc: {tracks[0]}"
return f"{len(tracks)} tracks parsed"
def test_parse_tracklist_with_disc_sections() -> str:
from hint_extractor import _parse_tracklist
text = "CD 1\n1. Track A\n2. Track B\nCD 2\n1. Track C"
tracks = _parse_tracklist(text)
disc2 = [t for t in tracks if t.get("disc") == "2"]
assert len(disc2) >= 1, f"disc2: {disc2}"
return f"{len(tracks)} total, {len(disc2)} on disc 2"
# ---------------------------------------------------------------------------
# Scanner Tests
# ---------------------------------------------------------------------------
def test_scanner_classifies_files() -> str:
from scanner import scan_album
with tempfile.TemporaryDirectory() as tmpdir:
root = Path(tmpdir) / "TestAlbum"
root.mkdir()
(root / "01 - Song.mp3").write_bytes(b"\x00" * 100)
(root / "02 - Song.flac").write_bytes(b"\x00" * 100)
(root / "front.jpg").write_bytes(b"\xff\xd8" + b"\x00" * 100)
(root / "tracklist.txt").write_text("1. Track One\n2. Track Two")
(root / "notes.pdf").write_bytes(b"\x00" * 50)
scan = scan_album(root)
assert len(scan.audio_files) == 2, f"audio: {scan.audio_files}"
assert len(scan.image_files) == 1, f"images: {scan.image_files}"
assert len(scan.tracklist_files) == 1, f"tracklists: {scan.tracklist_files}"
return "scan OK: 2 audio, 1 image, 1 tracklist"
def test_scanner_ignores_hidden() -> str:
from scanner import scan_album
with tempfile.TemporaryDirectory() as tmpdir:
root = Path(tmpdir) / "Album"
root.mkdir()
(root / "song.mp3").write_bytes(b"\x00" * 100)
(root / ".hidden.mp3").write_bytes(b"\x00" * 100)
(root / "_trash.mp3").write_bytes(b"\x00" * 100)
scan = scan_album(root)
assert len(scan.audio_files) == 1, f"should ignore hidden: {scan.audio_files}"
return "hidden files correctly ignored"
# ---------------------------------------------------------------------------
# extract_hints integration
# ---------------------------------------------------------------------------
def test_extract_hints_from_scan() -> str:
from scanner import scan_album
from hint_extractor import extract_hints
with tempfile.TemporaryDirectory() as tmpdir:
root = Path(tmpdir) / "ABBA_-_Greatest_Hits"
root.mkdir()
(root / "01 - ABBA - Dancing Queen.mp3").write_bytes(b"\x00" * 1024)
(root / "02 - ABBA - Waterloo.mp3").write_bytes(b"\x00" * 1024)
(root / "tracklist.txt").write_text("1. Dancing Queen\n2. Waterloo\n")
scan = scan_album(root)
hints = extract_hints(scan)
assert hints.dir_album is not None, "album hint missing"
assert len(hints.tracks) == 2, f"tracks: {len(hints.tracks)}"
assert hints.tracklist_text is not None, "tracklist not read"
return f"hints OK: album={hints.dir_album!r}, {len(hints.tracks)} tracks"
def test_extract_hints_multi_disc() -> str:
from scanner import scan_album
from hint_extractor import extract_hints
with tempfile.TemporaryDirectory() as tmpdir:
root = Path(tmpdir) / "Bach_Complete"
(root / "CD1").mkdir(parents=True)
(root / "CD2").mkdir()
(root / "CD1" / "01 - Toccata.mp3").write_bytes(b"\x00" * 1024)
(root / "CD2" / "01 - Fugue.mp3").write_bytes(b"\x00" * 1024)
scan = scan_album(root)
hints = extract_hints(scan)
disc_nums = {t.disc_number for t in hints.tracks if t.disc_number}
assert 1 in disc_nums, f"disc 1 missing: {disc_nums}"
assert 2 in disc_nums, f"disc 2 missing: {disc_nums}"
return f"disc numbers detected: {disc_nums}"
# ---------------------------------------------------------------------------
# executor Tests
# ---------------------------------------------------------------------------
def test_proposed_filename_single_disc() -> str:
from executor import _proposed_filename
from models import TrackProposal
from pathlib import Path
# Pop schema: albumartist == track artist → TT_-_Artist_-_Title
tp = TrackProposal(path=Path("dummy.mp3"), title="Dancing Queen",
artist="ABBA", track_number=1, disc_number=None)
name = _proposed_filename(tp, ".mp3", albumartist="ABBA")
assert name == "01_-_ABBA_-_Dancing_Queen.mp3", f"got: {name!r}"
return name
def test_proposed_filename_multi_disc() -> str:
from executor import _proposed_filename
from models import TrackProposal
from pathlib import Path
# Classical schema: albumartist (performer) ≠ track artist (composer)
tp = TrackProposal(path=Path("dummy.flac"), title="Toccata",
artist="Bach", track_number=7, disc_number=2)
name = _proposed_filename(tp, ".flac", albumartist="Gardiner")
assert name == "2-07_-_Gardiner_-_Bach_-_Toccata.flac", f"got: {name!r}"
return name
def test_proposed_filename_sanitizes_chars() -> str:
from executor import _proposed_filename
from models import TrackProposal
from pathlib import Path
tp = TrackProposal(path=Path("x.mp3"), title='Track: "Live" / Today',
artist="Artist?", track_number=3, disc_number=None)
name = _proposed_filename(tp, ".mp3")
assert "/" not in name and ":" not in name, f"unsafe chars in: {name!r}"
return name
# ---------------------------------------------------------------------------
# Runner
# ---------------------------------------------------------------------------
def main() -> None:
print("🧪 Starte Music Metadata Enricher Tests...")
cases = [
("UNIT_01_parse_dirname_artist_album", test_parse_dirname_artist_album),
("UNIT_02_parse_dirname_with_year", test_parse_dirname_with_year),
("UNIT_03_parse_dirname_album_only", test_parse_dirname_album_only),
("UNIT_04_parse_filename_track_artist_title", test_parse_filename_track_artist_title),
("UNIT_05_parse_filename_disc_track_title", test_parse_filename_disc_track_title),
("UNIT_06_parse_filename_track_title", test_parse_filename_track_title),
("UNIT_07_parse_filename_artist_title", test_parse_filename_artist_title),
("UNIT_08_parse_tracklist_numbered", test_parse_tracklist_numbered),
("UNIT_09_parse_tracklist_with_duration", test_parse_tracklist_with_duration),
("UNIT_10_parse_tracklist_disc_sections", test_parse_tracklist_with_disc_sections),
("UNIT_11_scanner_classifies_files", test_scanner_classifies_files),
("UNIT_12_scanner_ignores_hidden", test_scanner_ignores_hidden),
("UNIT_13_extract_hints_from_scan", test_extract_hints_from_scan),
("UNIT_14_extract_hints_multi_disc", test_extract_hints_multi_disc),
("UNIT_15_proposed_filename_single_disc", test_proposed_filename_single_disc),
("UNIT_16_proposed_filename_multi_disc", test_proposed_filename_multi_disc),
("UNIT_17_proposed_filename_sanitizes_chars", test_proposed_filename_sanitizes_chars),
]
for test_id, fn in cases:
run_case(test_id, fn)
print("=" * 70)
for r in RESULTS:
icon = "" if r["status"] == "PASS" else ""
detail = r["detail"][:100] + "..." if len(r["detail"]) > 100 else r["detail"]
print(f"{icon} [{r['status']}] {r['id']} {detail}")
print("=" * 70)
passed = sum(1 for r in RESULTS if r["status"] == "PASS")
total = len(RESULTS)
print(f"📊 {passed}/{total} Tests erfolgreich")
sys.exit(0 if passed == total else 1)
if __name__ == "__main__":
main()