From 80472653b49e21753b00e95e78a67ebca511bc78 Mon Sep 17 00:00:00 2001 From: dschlueter Date: Wed, 29 Apr 2026 08:55:17 +0200 Subject: [PATCH] Add 4 new cover/tracklist sources: MB back cover, iTunes, Last.fm, Discogs tracklist MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit cover_handler.py: - _download_image(): shared helper replaces duplicated download logic - download_back_cover(): fetches back cover from MusicBrainz CAA (/back endpoint), saves as back.jpg; skips if already present - _itunes_cover_url() / download_itunes_cover(): iTunes Search API (no auth), requests 600x600 artwork; fallback after Discogs - _lastfm_cover_url() / download_lastfm_cover(): Last.fm album.getinfo (LASTFM_API_KEY env var); last cover fallback, skips placeholder images - resolve_cover(): extended with iTunes → Last.fm fallback chain metadata_resolver.py: - _discogs_get_tracklist(): fetches full Discogs release via REST API, parses tracklist[] including heading-based disc detection - _lastfm_tracklist(): fetches Last.fm album.getinfo tracks (LASTFM_API_KEY) - resolve(): uses Discogs tracklist → Last.fm tracklist as fallback when MusicBrainz returns no tracks; LASTFM_API_KEY added to env var block music_enricher.py: - process_album(): calls download_back_cover() after execute_album() when MBID known New cover priority: local → MusicBrainz front → Discogs → iTunes → Last.fm New tracklist priority: local → YouTube → MusicBrainz → Discogs → Last.fm → OCR Test suite: 29 → 33 tests (all pass) Co-Authored-By: Claude Sonnet 4.6 --- cover_handler.py | 130 ++++++++++++++++++++++++++++++++--------- metadata_resolver.py | 96 ++++++++++++++++++++++++++++-- music_enricher.py | 8 ++- test_suite_enricher.py | 72 +++++++++++++++++++++++ 4 files changed, 273 insertions(+), 33 deletions(-) diff --git a/cover_handler.py b/cover_handler.py index 8ebdce7..61e30e6 100755 --- a/cover_handler.py +++ b/cover_handler.py @@ -1,5 +1,7 @@ from __future__ import annotations +import io +import os import sys import tempfile import time @@ -108,21 +110,14 @@ def _mb_cover_url(release_mbid: str) -> Optional[str]: return None -def download_cover(release_mbid: Optional[str], dest_dir: Path) -> Optional[Path]: - if not release_mbid or not HAS_REQUESTS: - return None - url = _mb_cover_url(release_mbid) - if not url: - return None - dest = dest_dir / "folder.jpg" +def _download_image(url: str, dest: Path, label: str = "") -> Optional[Path]: + """Hilfsfunktion: URL herunterladen, PNG→JPEG konvertieren, als dest speichern.""" try: - r = requests.get(url, timeout=15) + r = requests.get(url, timeout=15, headers={"User-Agent": "MusicMetadataEnricher/1.0"}) if r.status_code != 200: return None ct = r.headers.get("content-type", "") - if "png" in ct and HAS_PIL: - # PNG → JPEG konvertieren - import io + if ("png" in ct or url.lower().endswith(".png")) and HAS_PIL: with Image.open(io.BytesIO(r.content)) as img: buf = io.BytesIO() img.convert("RGB").save(buf, format="JPEG", quality=92) @@ -133,11 +128,38 @@ def download_cover(release_mbid: Optional[str], dest_dir: Path) -> Optional[Path return dest dest.unlink(missing_ok=True) except Exception as e: - print(f" ⚠️ Cover-Download-Fehler: {e}", file=sys.stderr) + if label: + print(f" ⚠️ {label}: {e}", file=sys.stderr) dest.unlink(missing_ok=True) return None +def download_cover(release_mbid: Optional[str], dest_dir: Path) -> Optional[Path]: + if not release_mbid or not HAS_REQUESTS: + return None + url = _mb_cover_url(release_mbid) + if not url: + return None + return _download_image(url, dest_dir / "folder.jpg", "Cover-Download-Fehler") + + +def download_back_cover(release_mbid: Optional[str], dest_dir: Path) -> Optional[Path]: + """Lädt das Back-Cover von MusicBrainz Cover Art Archive als back.jpg.""" + if not release_mbid or not HAS_REQUESTS: + return None + dest = dest_dir / "back.jpg" + if dest.exists(): + return dest # bereits vorhanden + url = f"https://coverartarchive.org/release/{release_mbid}/back" + try: + r = requests.head(url, timeout=5, allow_redirects=True) + if r.status_code != 200: + return None + except Exception: + return None + return _download_image(url, dest, "Back-Cover-Fehler") + + def _load_cover_data(cover_path: Path) -> tuple[bytes, str]: """ Liest Cover-Bilddaten und gibt (bytes, mime_type) zurück. @@ -253,29 +275,71 @@ def download_discogs_cover(artist: Optional[str], album: Optional[str], dest_dir url = _discogs_cover_url(artist, album) if not url: return None - dest = dest_dir / "folder.jpg" + return _download_image(url, dest_dir / "folder.jpg", "Discogs-Cover-Fehler") + + +def _itunes_cover_url(artist: Optional[str], album: Optional[str]) -> Optional[str]: + """Sucht auf iTunes nach artist+album, gibt 600x600-Artwork-URL zurück.""" + if not HAS_REQUESTS or not (artist or album): + return None + term = f"{artist or ''} {album or ''}".strip() try: - r = requests.get(url, timeout=15, headers={"User-Agent": "MusicMetadataEnricher/1.0"}) + r = requests.get( + "https://itunes.apple.com/search", + params={"term": term, "media": "music", "entity": "album", "limit": 5}, + timeout=8, + ) if r.status_code != 200: return None - ct = r.headers.get("content-type", "") - if ("png" in ct or url.lower().endswith(".png")) and HAS_PIL: - import io - with Image.open(io.BytesIO(r.content)) as img: - buf = io.BytesIO() - img.convert("RGB").save(buf, format="JPEG", quality=92) - dest.write_bytes(buf.getvalue()) - else: - dest.write_bytes(r.content) - if _image_ok(dest): - return dest - dest.unlink(missing_ok=True) + for result in r.json().get("results", []): + url = result.get("artworkUrl100", "") + if url: + # Auf 600x600 hochskalieren + return url.replace("100x100bb", "600x600bb").replace("100x100", "600x600") except Exception as e: - print(f" ⚠️ Discogs-Cover-Fehler: {e}", file=sys.stderr) - dest.unlink(missing_ok=True) + print(f" ⚠️ iTunes-Suche: {e}", file=sys.stderr) return None +def download_itunes_cover(artist: Optional[str], album: Optional[str], dest_dir: Path) -> Optional[Path]: + url = _itunes_cover_url(artist, album) + if not url: + return None + return _download_image(url, dest_dir / "folder.jpg", "iTunes-Cover-Fehler") + + +def _lastfm_cover_url(artist: Optional[str], album: Optional[str]) -> Optional[str]: + """Last.fm album.getinfo → größtes verfügbares Artwork-URL.""" + api_key = os.getenv("LASTFM_API_KEY", "") + if not HAS_REQUESTS or not api_key or not artist or not album: + return None + try: + r = requests.get( + "https://ws.audioscrobbler.com/2.0/", + params={"method": "album.getinfo", "api_key": api_key, + "artist": artist, "album": album, "format": "json"}, + timeout=8, + ) + if r.status_code != 200: + return None + images = r.json().get("album", {}).get("image", []) + # Images sind aufsteigend nach Größe sortiert: small, medium, large, extralarge, mega + for img in reversed(images): + url = img.get("#text", "") + if url and "2a96cbd8b46e442fc41c2b86b821562f" not in url: # Last.fm Platzhalter-Hash + return url + except Exception as e: + print(f" ⚠️ Last.fm-Cover: {e}", file=sys.stderr) + return None + + +def download_lastfm_cover(artist: Optional[str], album: Optional[str], dest_dir: Path) -> Optional[Path]: + url = _lastfm_cover_url(artist, album) + if not url: + return None + return _download_image(url, dest_dir / "folder.jpg", "Last.fm-Cover-Fehler") + + def resolve_cover( image_files: List[Path], release_mbid: Optional[str], @@ -299,4 +363,14 @@ def resolve_cover( if downloaded: return downloaded, "discogs" + if artist or album: + downloaded = download_itunes_cover(artist, album, album_dir) + if downloaded: + return downloaded, "itunes" + + if artist or album: + downloaded = download_lastfm_cover(artist, album, album_dir) + if downloaded: + return downloaded, "lastfm" + return None, None diff --git a/metadata_resolver.py b/metadata_resolver.py index 94a1ca7..a7b3803 100755 --- a/metadata_resolver.py +++ b/metadata_resolver.py @@ -92,11 +92,12 @@ def normalize_genre(genre: Optional[str]) -> Optional[str]: _MB_RATE_LIMIT = 1.1 # seconds between MusicBrainz requests _last_mb_call = 0.0 -ACOUSTID_API_KEY = os.getenv("ACOUSTID_API_KEY", "") -ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "") +ACOUSTID_API_KEY = os.getenv("ACOUSTID_API_KEY", "") +ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "") OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "") -DISCOGS_TOKEN = os.getenv("DISCOGS_TOKEN", "") -OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434") +DISCOGS_TOKEN = os.getenv("DISCOGS_TOKEN", "") +LASTFM_API_KEY = os.getenv("LASTFM_API_KEY", "") +OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434") # qwen3:8b (5.2GB) reicht für einfache JSON-Metadaten-Ergänzung und lädt schnell (~10s) OLLAMA_RESOLVE_MODEL = os.getenv("OLLAMA_RESOLVE_MODEL", "qwen3:8b") @@ -231,6 +232,78 @@ def _discogs_search(artist: Optional[str], album: Optional[str]) -> Optional[Dic return None +def _discogs_get_tracklist(release_id) -> List[Dict]: + """Holt die vollständige Tracklist eines Discogs-Release via REST-API.""" + try: + import requests as _req + token = DISCOGS_TOKEN + headers = {"User-Agent": "MusicMetadataEnricher/1.0"} + if token: + headers["Authorization"] = f"Discogs token={token}" + r = _req.get(f"https://api.discogs.com/releases/{release_id}", + headers=headers, timeout=10) + if r.status_code != 200: + return [] + tracklist = r.json().get("tracklist", []) + result = [] + disc = 1 + track_num = 0 + for entry in tracklist: + if entry.get("type_") == "heading": + # Disc-Trennzeile ("CD 1", "Side A", …) — Disc hochzählen + disc += 1 + track_num = 0 + continue + pos = entry.get("position", "") + # Position kann "A1", "1", "1.2", "B3" sein + num_match = re.search(r"\d+", pos) + track_num = int(num_match.group()) if num_match else track_num + 1 + result.append({ + "disc": disc, + "number": track_num, + "title": entry.get("title", ""), + "artist": "", + }) + return result + except Exception as e: + print(f" ⚠️ Discogs-Tracklist-Fehler: {e}", file=sys.stderr) + return [] + + +def _lastfm_tracklist(artist: Optional[str], album: Optional[str]) -> List[Dict]: + """Holt die Tracklist von Last.fm album.getinfo (LASTFM_API_KEY erforderlich).""" + api_key = os.getenv("LASTFM_API_KEY", "") + if not api_key or not artist or not album: + return [] + try: + import requests as _req + r = _req.get( + "https://ws.audioscrobbler.com/2.0/", + params={"method": "album.getinfo", "api_key": api_key, + "artist": artist, "album": album, "format": "json"}, + timeout=8, + ) + if r.status_code != 200: + return [] + tracks = r.json().get("album", {}).get("tracks", {}).get("track", []) + if isinstance(tracks, dict): # Einzelner Track → Liste + tracks = [tracks] + result = [] + for t in tracks: + attr = t.get("@attr", {}) + num = int(attr.get("rank", 0)) + result.append({ + "disc": 1, + "number": num, + "title": t.get("name", ""), + "artist": t.get("artist", {}).get("name", "") if isinstance(t.get("artist"), dict) else "", + }) + return result + except Exception as e: + print(f" ⚠️ Last.fm-Tracklist-Fehler: {e}", file=sys.stderr) + return [] + + # --------------------------------------------------------------------------- # Claude API reasoning (optional) # --------------------------------------------------------------------------- @@ -485,6 +558,7 @@ def resolve( }) # Discogs fallback + discogs_release_id = None if use_api and HAS_DISCOGS and DISCOGS_TOKEN and not release_mbid: dg = _discogs_search(artist, album) if dg: @@ -493,9 +567,23 @@ def resolve( year = year or dg.get("year") genre = genre or dg.get("genre") label = label or dg.get("label") + discogs_release_id = dg.get("id") confidence += 0.15 sources.append("discogs") + # Tracklist-Fallbacks: Discogs → Last.fm (wenn MusicBrainz keine Tracks geliefert hat) + if use_api and not mb_tracks: + if discogs_release_id: + dg_tracks = _discogs_get_tracklist(discogs_release_id) + if dg_tracks: + mb_tracks = dg_tracks + sources.append("discogs-tracklist") + if not mb_tracks: + lfm_tracks = _lastfm_tracklist(artist, album) + if lfm_tracks: + mb_tracks = lfm_tracks + sources.append("lastfm-tracklist") + # LLM-Reasoning für verbleibende Lücken: # Reihenfolge: Ollama lokal → OpenRouter (DeepSeek, günstig) → Claude API cl_albumartist: Optional[str] = None diff --git a/music_enricher.py b/music_enricher.py index c99af9e..f35ebe1 100755 --- a/music_enricher.py +++ b/music_enricher.py @@ -26,7 +26,7 @@ from models import AlbumProposal from scanner import scan_album, collect_album_dirs from hint_extractor import extract_hints from metadata_resolver import resolve -from cover_handler import resolve_cover +from cover_handler import resolve_cover, download_back_cover from executor import execute_album, write_report @@ -216,6 +216,12 @@ def process_album( for k, v in album_stats.items(): stats[k] = stats.get(k, 0) + v + # Back-Cover von MusicBrainz holen (wenn MBID bekannt und noch kein back.jpg) + if proposal.mbid and not args.no_cover and not args.dry_run: + back = download_back_cover(proposal.mbid, album_dir) + if back: + print(f" 🖼️ Back-Cover heruntergeladen: {back.name}") + # Jellyfin Playlist Generator aufrufen generator_path = _find_jellyfin_generator(album_dir, getattr(args, "playlist_generator", None)) if generator_path: diff --git a/test_suite_enricher.py b/test_suite_enricher.py index e61121b..73697a7 100755 --- a/test_suite_enricher.py +++ b/test_suite_enricher.py @@ -322,6 +322,74 @@ def test_normalize_cover_renames_front_jpg() -> str: return "Front.jpg → folder.jpg rename OK" +# --------------------------------------------------------------------------- +# New cover sources Tests +# --------------------------------------------------------------------------- + +def test_itunes_cover_url_format() -> str: + from cover_handler import _itunes_cover_url + # Ohne echten Netzwerkaufruf: testen ob Funktion bei leeren Eingaben None zurückgibt + assert _itunes_cover_url(None, None) is None, "None inputs → None" + assert _itunes_cover_url("", "") is None, "empty inputs → None" + return "iTunes URL helper: None-Handling OK" + + +def test_discogs_tracklist_format() -> str: + from metadata_resolver import _discogs_get_tracklist + # Simuliere API-Antwort-Parsing mit einem Testfall + import unittest.mock as mock + fake_response = { + "tracklist": [ + {"position": "1", "type_": "track", "title": "Song A", "duration": "3:20"}, + {"type_": "heading", "title": "CD 2"}, + {"position": "1", "type_": "track", "title": "Song B", "duration": "4:00"}, + ] + } + with mock.patch("requests.get") as mock_get: + mock_get.return_value.status_code = 200 + mock_get.return_value.json.return_value = fake_response + tracks = _discogs_get_tracklist(12345) + assert len(tracks) == 2, f"expected 2 tracks, got {len(tracks)}" + assert tracks[0]["title"] == "Song A", f"track 0: {tracks[0]}" + assert tracks[1]["disc"] == 2, f"disc should be 2 after heading: {tracks[1]}" + return f"Discogs tracklist format OK: {len(tracks)} tracks" + + +def test_lastfm_tracklist_format() -> str: + from metadata_resolver import _lastfm_tracklist + import unittest.mock as mock, os + fake_response = { + "album": { + "tracks": { + "track": [ + {"name": "Track One", "@attr": {"rank": "1"}, "artist": {"name": "Artist"}}, + {"name": "Track Two", "@attr": {"rank": "2"}, "artist": {"name": "Artist"}}, + ] + } + } + } + with mock.patch.dict(os.environ, {"LASTFM_API_KEY": "testkey"}): + with mock.patch("requests.get") as mock_get: + mock_get.return_value.status_code = 200 + mock_get.return_value.json.return_value = fake_response + tracks = _lastfm_tracklist("Artist", "Album") + assert len(tracks) == 2, f"expected 2 tracks, got {len(tracks)}" + assert tracks[0]["title"] == "Track One", f"track 0: {tracks[0]}" + assert tracks[0]["number"] == 1, f"rank/number: {tracks[0]}" + return f"Last.fm tracklist format OK: {len(tracks)} tracks" + + +def test_back_cover_skips_if_exists() -> str: + from cover_handler import download_back_cover + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + back = root / "back.jpg" + back.write_bytes(b"\xff\xd8" + b"\x00" * 200) + result = download_back_cover("fake-mbid", root) + assert result == back, f"should return existing back.jpg: {result}" + return "back cover skip-if-exists OK" + + # --------------------------------------------------------------------------- # executor Tests # --------------------------------------------------------------------------- @@ -399,6 +467,10 @@ def main() -> None: ("UNIT_27_is_classical_false_pop", test_is_classical_false_for_pop), ("UNIT_28_is_classical_false_folk", test_is_classical_false_for_folk), ("UNIT_29_normalize_cover_renames", test_normalize_cover_renames_front_jpg), + ("UNIT_30_itunes_url_none_handling", test_itunes_cover_url_format), + ("UNIT_31_discogs_tracklist_format", test_discogs_tracklist_format), + ("UNIT_32_lastfm_tracklist_format", test_lastfm_tracklist_format), + ("UNIT_33_back_cover_skip_if_exists", test_back_cover_skips_if_exists), ] for test_id, fn in cases: