Add 4 new cover/tracklist sources: MB back cover, iTunes, Last.fm, Discogs tracklist

cover_handler.py:
- _download_image(): shared helper replaces duplicated download logic
- download_back_cover(): fetches back cover from MusicBrainz CAA (/back endpoint),
  saves as back.jpg; skips if already present
- _itunes_cover_url() / download_itunes_cover(): iTunes Search API (no auth),
  requests 600x600 artwork; fallback after Discogs
- _lastfm_cover_url() / download_lastfm_cover(): Last.fm album.getinfo
  (LASTFM_API_KEY env var); last cover fallback, skips placeholder images
- resolve_cover(): extended with iTunes → Last.fm fallback chain

metadata_resolver.py:
- _discogs_get_tracklist(): fetches full Discogs release via REST API,
  parses tracklist[] including heading-based disc detection
- _lastfm_tracklist(): fetches Last.fm album.getinfo tracks (LASTFM_API_KEY)
- resolve(): uses Discogs tracklist → Last.fm tracklist as fallback when
  MusicBrainz returns no tracks; LASTFM_API_KEY added to env var block

music_enricher.py:
- process_album(): calls download_back_cover() after execute_album() when MBID known

New cover priority:  local → MusicBrainz front → Discogs → iTunes → Last.fm
New tracklist priority: local → YouTube → MusicBrainz → Discogs → Last.fm → OCR
Test suite: 29 → 33 tests (all pass)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Dieter Schlüter 2026-04-29 08:55:17 +02:00
commit 80472653b4
4 changed files with 273 additions and 33 deletions

View file

@ -1,5 +1,7 @@
from __future__ import annotations
import io
import os
import sys
import tempfile
import time
@ -108,21 +110,14 @@ def _mb_cover_url(release_mbid: str) -> Optional[str]:
return None
def download_cover(release_mbid: Optional[str], dest_dir: Path) -> Optional[Path]:
if not release_mbid or not HAS_REQUESTS:
return None
url = _mb_cover_url(release_mbid)
if not url:
return None
dest = dest_dir / "folder.jpg"
def _download_image(url: str, dest: Path, label: str = "") -> Optional[Path]:
"""Hilfsfunktion: URL herunterladen, PNG→JPEG konvertieren, als dest speichern."""
try:
r = requests.get(url, timeout=15)
r = requests.get(url, timeout=15, headers={"User-Agent": "MusicMetadataEnricher/1.0"})
if r.status_code != 200:
return None
ct = r.headers.get("content-type", "")
if "png" in ct and HAS_PIL:
# PNG → JPEG konvertieren
import io
if ("png" in ct or url.lower().endswith(".png")) and HAS_PIL:
with Image.open(io.BytesIO(r.content)) as img:
buf = io.BytesIO()
img.convert("RGB").save(buf, format="JPEG", quality=92)
@ -133,11 +128,38 @@ def download_cover(release_mbid: Optional[str], dest_dir: Path) -> Optional[Path
return dest
dest.unlink(missing_ok=True)
except Exception as e:
print(f" ⚠️ Cover-Download-Fehler: {e}", file=sys.stderr)
if label:
print(f" ⚠️ {label}: {e}", file=sys.stderr)
dest.unlink(missing_ok=True)
return None
def download_cover(release_mbid: Optional[str], dest_dir: Path) -> Optional[Path]:
    """Download the cover resolved by ``_mb_cover_url`` into ``folder.jpg``.

    Args:
        release_mbid: Release MBID; a falsy value short-circuits to ``None``.
        dest_dir: Directory that receives ``folder.jpg``.

    Returns:
        Whatever ``_download_image`` returns for the resolved URL, or ``None``
        when no MBID is given, ``requests`` is unavailable, or no URL is found.
    """
    # The MBID is checked first so HAS_REQUESTS is only consulted when needed.
    if not (release_mbid and HAS_REQUESTS):
        return None
    cover_url = _mb_cover_url(release_mbid)
    if cover_url:
        return _download_image(cover_url, dest_dir / "folder.jpg", "Cover-Download-Fehler")
    return None
def download_back_cover(release_mbid: Optional[str], dest_dir: Path) -> Optional[Path]:
    """Fetch the back cover from the Cover Art Archive and store it as ``back.jpg``.

    Args:
        release_mbid: Release MBID used to build the ``/back`` endpoint URL.
        dest_dir: Directory that receives (or already contains) ``back.jpg``.

    Returns:
        The path of an already existing ``back.jpg``, the result of
        ``_download_image`` after a successful HEAD probe, or ``None`` when the
        MBID is missing, ``requests`` is unavailable, or the probe fails.
    """
    if not (release_mbid and HAS_REQUESTS):
        return None

    target = dest_dir / "back.jpg"
    if target.exists():
        # Already present: nothing to download.
        return target

    back_url = f"https://coverartarchive.org/release/{release_mbid}/back"
    # Probe with HEAD first; any failure or non-200 answer means "no back cover".
    try:
        probe = requests.head(back_url, timeout=5, allow_redirects=True)
        available = probe.status_code == 200
    except Exception:
        return None
    if not available:
        return None
    return _download_image(back_url, target, "Back-Cover-Fehler")
def _load_cover_data(cover_path: Path) -> tuple[bytes, str]:
"""
Liest Cover-Bilddaten und gibt (bytes, mime_type) zurück.
@ -253,29 +275,71 @@ def download_discogs_cover(artist: Optional[str], album: Optional[str], dest_dir
url = _discogs_cover_url(artist, album)
if not url:
return None
dest = dest_dir / "folder.jpg"
return _download_image(url, dest_dir / "folder.jpg", "Discogs-Cover-Fehler")
def _itunes_cover_url(artist: Optional[str], album: Optional[str]) -> Optional[str]:
    """Search iTunes for artist + album and return a 600x600 artwork URL.

    Args:
        artist: Artist name; may be ``None`` or empty.
        album: Album title; may be ``None`` or empty.

    Returns:
        The ``artworkUrl100`` of the first search result that has one, with the
        ``100x100`` size token rewritten to ``600x600``. ``None`` when
        ``requests`` is unavailable, both inputs are empty, the response is not
        200, no result carries artwork, or an error occurs (reported on stderr).
    """
    if not HAS_REQUESTS or not (artist or album):
        return None

    search_term = " ".join((artist or "", album or "")).strip()
    query = {"term": search_term, "media": "music", "entity": "album", "limit": 5}
    try:
        response = requests.get(
            "https://itunes.apple.com/search",
            params=query,
            timeout=8,
        )
        if response.status_code != 200:
            return None
        for hit in response.json().get("results", []):
            artwork = hit.get("artworkUrl100", "")
            if not artwork:
                continue
            # Rewrite the size token to request the 600x600 rendition.
            return artwork.replace("100x100bb", "600x600bb").replace("100x100", "600x600")
    except Exception as e:
        print(f" ⚠️ iTunes-Suche: {e}", file=sys.stderr)
    return None
def download_itunes_cover(artist: Optional[str], album: Optional[str], dest_dir: Path) -> Optional[Path]:
    """Download the iTunes artwork for artist/album into ``folder.jpg``.

    Returns ``None`` when ``_itunes_cover_url`` yields no URL; otherwise the
    result of ``_download_image``.
    """
    artwork_url = _itunes_cover_url(artist, album)
    if artwork_url:
        return _download_image(artwork_url, dest_dir / "folder.jpg", "iTunes-Cover-Fehler")
    return None
def _lastfm_cover_url(artist: Optional[str], album: Optional[str]) -> Optional[str]:
    """Query Last.fm ``album.getinfo`` and return an artwork URL.

    The API key is read from the ``LASTFM_API_KEY`` environment variable at
    call time.

    Args:
        artist: Artist name (required).
        album: Album title (required).

    Returns:
        The last non-empty image URL in the response that is not the known
        placeholder image, or ``None`` when a prerequisite is missing, the
        response is not 200, no usable image exists, or an error occurs
        (reported on stderr).
    """
    api_key = os.getenv("LASTFM_API_KEY", "")
    if not HAS_REQUESTS or not api_key or not artist or not album:
        return None

    # Hash that identifies Last.fm's generic placeholder artwork.
    placeholder_hash = "2a96cbd8b46e442fc41c2b86b821562f"
    query = {
        "method": "album.getinfo",
        "api_key": api_key,
        "artist": artist,
        "album": album,
        "format": "json",
    }
    try:
        response = requests.get(
            "https://ws.audioscrobbler.com/2.0/",
            params=query,
            timeout=8,
        )
        if response.status_code != 200:
            return None
        sizes = response.json().get("album", {}).get("image", [])
        # The list is expected to run small → mega, so walk it backwards to
        # prefer the largest rendition.
        for entry in reversed(sizes):
            candidate = entry.get("#text", "")
            if candidate and placeholder_hash not in candidate:
                return candidate
    except Exception as e:
        print(f" ⚠️ Last.fm-Cover: {e}", file=sys.stderr)
    return None
def download_lastfm_cover(artist: Optional[str], album: Optional[str], dest_dir: Path) -> Optional[Path]:
    """Download the Last.fm artwork for artist/album into ``folder.jpg``.

    Returns ``None`` when ``_lastfm_cover_url`` yields no URL; otherwise the
    result of ``_download_image``.
    """
    artwork_url = _lastfm_cover_url(artist, album)
    if artwork_url:
        return _download_image(artwork_url, dest_dir / "folder.jpg", "Last.fm-Cover-Fehler")
    return None
def resolve_cover(
image_files: List[Path],
release_mbid: Optional[str],
@ -299,4 +363,14 @@ def resolve_cover(
if downloaded:
return downloaded, "discogs"
if artist or album:
downloaded = download_itunes_cover(artist, album, album_dir)
if downloaded:
return downloaded, "itunes"
if artist or album:
downloaded = download_lastfm_cover(artist, album, album_dir)
if downloaded:
return downloaded, "lastfm"
return None, None

View file

@ -92,11 +92,12 @@ def normalize_genre(genre: Optional[str]) -> Optional[str]:
_MB_RATE_LIMIT = 1.1 # seconds between MusicBrainz requests
_last_mb_call = 0.0
ACOUSTID_API_KEY = os.getenv("ACOUSTID_API_KEY", "")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
ACOUSTID_API_KEY = os.getenv("ACOUSTID_API_KEY", "")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
DISCOGS_TOKEN = os.getenv("DISCOGS_TOKEN", "")
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")
DISCOGS_TOKEN = os.getenv("DISCOGS_TOKEN", "")
LASTFM_API_KEY = os.getenv("LASTFM_API_KEY", "")
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")
# qwen3:8b (5.2GB) reicht für einfache JSON-Metadaten-Ergänzung und lädt schnell (~10s)
OLLAMA_RESOLVE_MODEL = os.getenv("OLLAMA_RESOLVE_MODEL", "qwen3:8b")
@ -231,6 +232,78 @@ def _discogs_search(artist: Optional[str], album: Optional[str]) -> Optional[Dic
return None
def _discogs_get_tracklist(release_id) -> List[Dict]:
    """Fetch the full tracklist of a Discogs release via the REST API.

    Disc numbers are derived from two signals:

    * ``heading`` entries ("CD 1", "Side A", ...) advance the disc counter,
      but only once the current disc already holds tracks, so a heading that
      precedes the very first track does not push the first disc to 2.
    * Hyphenated multi-disc positions ("1-3", "CD2-4") carry an explicit disc
      and track number and take precedence over the heading counter.

    Args:
        release_id: Discogs release id, interpolated into the
            ``/releases/{id}`` endpoint URL.

    Returns:
        A list of dicts with the keys ``disc``, ``number``, ``title`` and
        ``artist`` (always ``""``). An empty list on a non-200 response or on
        any error; errors are reported on stderr.
    """
    try:
        import requests as _req

        headers = {"User-Agent": "MusicMetadataEnricher/1.0"}
        if DISCOGS_TOKEN:
            headers["Authorization"] = f"Discogs token={DISCOGS_TOKEN}"
        r = _req.get(f"https://api.discogs.com/releases/{release_id}",
                     headers=headers, timeout=10)
        if r.status_code != 200:
            return []

        result = []
        disc = 1
        track_num = 0
        tracks_on_disc = 0  # tracks collected since the last disc change
        for entry in r.json().get("tracklist", []):
            if entry.get("type_") == "heading":
                # Separator row: start a new disc only if the current one is
                # non-empty, otherwise a leading "CD 1" would yield disc 2.
                if tracks_on_disc:
                    disc += 1
                    tracks_on_disc = 0
                track_num = 0
                continue

            pos = str(entry.get("position") or "")
            # "1-3" / "CD2-4": explicit disc-track pair.
            multi = re.match(r"\s*(?:CD|DVD|BD|LP)?\s*(\d+)\s*-\s*(\d+)", pos, re.IGNORECASE)
            if multi:
                disc = int(multi.group(1))
                track_num = int(multi.group(2))
            else:
                # "A1", "1", "B3": first digit run is the track number; with
                # no digits at all, continue counting from the previous track.
                num_match = re.search(r"\d+", pos)
                track_num = int(num_match.group()) if num_match else track_num + 1

            result.append({
                "disc": disc,
                "number": track_num,
                "title": entry.get("title", ""),
                "artist": "",
            })
            tracks_on_disc += 1
        return result
    except Exception as e:
        print(f" ⚠️ Discogs-Tracklist-Fehler: {e}", file=sys.stderr)
        return []
def _lastfm_tracklist(artist: Optional[str], album: Optional[str]) -> List[Dict]:
"""Holt die Tracklist von Last.fm album.getinfo (LASTFM_API_KEY erforderlich)."""
api_key = os.getenv("LASTFM_API_KEY", "")
if not api_key or not artist or not album:
return []
try:
import requests as _req
r = _req.get(
"https://ws.audioscrobbler.com/2.0/",
params={"method": "album.getinfo", "api_key": api_key,
"artist": artist, "album": album, "format": "json"},
timeout=8,
)
if r.status_code != 200:
return []
tracks = r.json().get("album", {}).get("tracks", {}).get("track", [])
if isinstance(tracks, dict): # Einzelner Track → Liste
tracks = [tracks]
result = []
for t in tracks:
attr = t.get("@attr", {})
num = int(attr.get("rank", 0))
result.append({
"disc": 1,
"number": num,
"title": t.get("name", ""),
"artist": t.get("artist", {}).get("name", "") if isinstance(t.get("artist"), dict) else "",
})
return result
except Exception as e:
print(f" ⚠️ Last.fm-Tracklist-Fehler: {e}", file=sys.stderr)
return []
# ---------------------------------------------------------------------------
# Claude API reasoning (optional)
# ---------------------------------------------------------------------------
@ -485,6 +558,7 @@ def resolve(
})
# Discogs fallback
discogs_release_id = None
if use_api and HAS_DISCOGS and DISCOGS_TOKEN and not release_mbid:
dg = _discogs_search(artist, album)
if dg:
@ -493,9 +567,23 @@ def resolve(
year = year or dg.get("year")
genre = genre or dg.get("genre")
label = label or dg.get("label")
discogs_release_id = dg.get("id")
confidence += 0.15
sources.append("discogs")
# Tracklist-Fallbacks: Discogs → Last.fm (wenn MusicBrainz keine Tracks geliefert hat)
if use_api and not mb_tracks:
if discogs_release_id:
dg_tracks = _discogs_get_tracklist(discogs_release_id)
if dg_tracks:
mb_tracks = dg_tracks
sources.append("discogs-tracklist")
if not mb_tracks:
lfm_tracks = _lastfm_tracklist(artist, album)
if lfm_tracks:
mb_tracks = lfm_tracks
sources.append("lastfm-tracklist")
# LLM-Reasoning für verbleibende Lücken:
# Reihenfolge: Ollama lokal → OpenRouter (DeepSeek, günstig) → Claude API
cl_albumartist: Optional[str] = None

View file

@ -26,7 +26,7 @@ from models import AlbumProposal
from scanner import scan_album, collect_album_dirs
from hint_extractor import extract_hints
from metadata_resolver import resolve
from cover_handler import resolve_cover
from cover_handler import resolve_cover, download_back_cover
from executor import execute_album, write_report
@ -216,6 +216,12 @@ def process_album(
for k, v in album_stats.items():
stats[k] = stats.get(k, 0) + v
# Back-Cover von MusicBrainz holen (wenn MBID bekannt und noch kein back.jpg)
if proposal.mbid and not args.no_cover and not args.dry_run:
back = download_back_cover(proposal.mbid, album_dir)
if back:
print(f" 🖼️ Back-Cover heruntergeladen: {back.name}")
# Jellyfin Playlist Generator aufrufen
generator_path = _find_jellyfin_generator(album_dir, getattr(args, "playlist_generator", None))
if generator_path:

View file

@ -322,6 +322,74 @@ def test_normalize_cover_renames_front_jpg() -> str:
return "Front.jpg → folder.jpg rename OK"
# ---------------------------------------------------------------------------
# New cover sources Tests
# ---------------------------------------------------------------------------
def test_itunes_cover_url_format() -> str:
    """Check that ``_itunes_cover_url`` returns ``None`` for empty inputs."""
    from cover_handler import _itunes_cover_url

    # No network call is involved: only the empty-input guard is exercised.
    empty_cases = (
        ((None, None), "None inputs → None"),
        (("", ""), "empty inputs → None"),
    )
    for (artist, album), message in empty_cases:
        assert _itunes_cover_url(artist, album) is None, message
    return "iTunes URL helper: None-Handling OK"
def test_discogs_tracklist_format() -> str:
    """Check Discogs tracklist parsing, including the disc bump after a heading."""
    import unittest.mock as mock

    from metadata_resolver import _discogs_get_tracklist

    # Canned API payload: one track, a disc heading, then another track.
    payload = {
        "tracklist": [
            {"position": "1", "type_": "track", "title": "Song A", "duration": "3:20"},
            {"type_": "heading", "title": "CD 2"},
            {"position": "1", "type_": "track", "title": "Song B", "duration": "4:00"},
        ]
    }
    fake_reply = mock.MagicMock(status_code=200)
    fake_reply.json.return_value = payload
    with mock.patch("requests.get", return_value=fake_reply):
        tracks = _discogs_get_tracklist(12345)

    assert len(tracks) == 2, f"expected 2 tracks, got {len(tracks)}"
    first, second = tracks
    assert first["title"] == "Song A", f"track 0: {first}"
    assert second["disc"] == 2, f"disc should be 2 after heading: {second}"
    return f"Discogs tracklist format OK: {len(tracks)} tracks"
def test_lastfm_tracklist_format() -> str:
    """Check Last.fm tracklist parsing against a canned ``album.getinfo`` payload."""
    import os
    import unittest.mock as mock

    from metadata_resolver import _lastfm_tracklist

    payload = {
        "album": {
            "tracks": {
                "track": [
                    {"name": "Track One", "@attr": {"rank": "1"}, "artist": {"name": "Artist"}},
                    {"name": "Track Two", "@attr": {"rank": "2"}, "artist": {"name": "Artist"}},
                ]
            }
        }
    }
    fake_reply = mock.MagicMock(status_code=200)
    fake_reply.json.return_value = payload
    # The function reads the key from the environment at call time, so it is
    # injected here together with the patched HTTP call.
    with mock.patch.dict(os.environ, {"LASTFM_API_KEY": "testkey"}), \
            mock.patch("requests.get", return_value=fake_reply):
        tracks = _lastfm_tracklist("Artist", "Album")

    assert len(tracks) == 2, f"expected 2 tracks, got {len(tracks)}"
    head = tracks[0]
    assert head["title"] == "Track One", f"track 0: {head}"
    assert head["number"] == 1, f"rank/number: {head}"
    return f"Last.fm tracklist format OK: {len(tracks)} tracks"
def test_back_cover_skips_if_exists() -> str:
    """Check that an existing ``back.jpg`` is returned without downloading."""
    from cover_handler import download_back_cover

    with tempfile.TemporaryDirectory() as scratch:
        album_dir = Path(scratch)
        existing = album_dir / "back.jpg"
        # Minimal JPEG-looking stub: SOI marker followed by padding.
        existing.write_bytes(b"\xff\xd8" + b"\x00" * 200)
        outcome = download_back_cover("fake-mbid", album_dir)
        assert outcome == existing, f"should return existing back.jpg: {outcome}"
    return "back cover skip-if-exists OK"
# ---------------------------------------------------------------------------
# executor Tests
# ---------------------------------------------------------------------------
@ -399,6 +467,10 @@ def main() -> None:
("UNIT_27_is_classical_false_pop", test_is_classical_false_for_pop),
("UNIT_28_is_classical_false_folk", test_is_classical_false_for_folk),
("UNIT_29_normalize_cover_renames", test_normalize_cover_renames_front_jpg),
("UNIT_30_itunes_url_none_handling", test_itunes_cover_url_format),
("UNIT_31_discogs_tracklist_format", test_discogs_tracklist_format),
("UNIT_32_lastfm_tracklist_format", test_lastfm_tracklist_format),
("UNIT_33_back_cover_skip_if_exists", test_back_cover_skips_if_exists),
]
for test_id, fn in cases: