cover_handler.py: - _download_image(): shared helper replaces duplicated download logic - download_back_cover(): fetches back cover from MusicBrainz CAA (/back endpoint), saves as back.jpg; skips if already present - _itunes_cover_url() / download_itunes_cover(): iTunes Search API (no auth), requests 600x600 artwork; fallback after Discogs - _lastfm_cover_url() / download_lastfm_cover(): Last.fm album.getinfo (LASTFM_API_KEY env var); last cover fallback, skips placeholder images - resolve_cover(): extended with iTunes → Last.fm fallback chain metadata_resolver.py: - _discogs_get_tracklist(): fetches full Discogs release via REST API, parses tracklist[] including heading-based disc detection - _lastfm_tracklist(): fetches Last.fm album.getinfo tracks (LASTFM_API_KEY) - resolve(): uses Discogs tracklist → Last.fm tracklist as fallback when MusicBrainz returns no tracks; LASTFM_API_KEY added to env var block music_enricher.py: - process_album(): calls download_back_cover() after execute_album() when MBID known New cover priority: local → MusicBrainz front → Discogs → iTunes → Last.fm New tracklist priority: local → YouTube → MusicBrainz → Discogs → Last.fm → OCR Test suite: 29 → 33 tests (all pass) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
376 lines
13 KiB
Python
Executable file
376 lines
13 KiB
Python
Executable file
from __future__ import annotations
|
|
|
|
import io
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Optional, List
|
|
|
|
try:
|
|
from PIL import Image
|
|
HAS_PIL = True
|
|
except ImportError:
|
|
HAS_PIL = False
|
|
|
|
try:
|
|
import requests
|
|
HAS_REQUESTS = True
|
|
except ImportError:
|
|
HAS_REQUESTS = False
|
|
|
|
try:
|
|
import musicbrainzngs as mb
|
|
HAS_MB = True
|
|
except ImportError:
|
|
HAS_MB = False
|
|
|
|
try:
|
|
from mutagen.id3 import ID3, APIC, error as ID3Error
|
|
from mutagen.flac import FLAC, Picture
|
|
from mutagen.mp4 import MP4, MP4Cover
|
|
HAS_MUTAGEN = True
|
|
except ImportError:
|
|
HAS_MUTAGEN = False
|
|
|
|
_MIN_COVER_SIZE = 200 # pixels
|
|
|
|
|
|
def _image_ok(path: Path) -> bool:
|
|
if not HAS_PIL:
|
|
return path.stat().st_size > 5000
|
|
try:
|
|
with Image.open(path) as img:
|
|
w, h = img.size
|
|
return w >= _MIN_COVER_SIZE and h >= _MIN_COVER_SIZE
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def find_local_cover(image_files: List[Path]) -> Optional[Path]:
|
|
priority = ("folder", "front", "cover", "album")
|
|
def key(p: Path):
|
|
name = p.name.lower()
|
|
score = next((i for i, kw in enumerate(priority) if kw in name), len(priority))
|
|
size = p.stat().st_size if p.exists() else 0
|
|
return (score, -size)
|
|
|
|
for p in sorted(image_files, key=key):
|
|
if _image_ok(p):
|
|
return p
|
|
return None
|
|
|
|
|
|
def normalize_cover_to_folder_jpg(cover_path: Path) -> Path:
|
|
"""
|
|
Stellt sicher dass das Cover als folder.jpg (JPEG) im Album-Verzeichnis liegt.
|
|
- Ist es bereits folder.jpg → unverändert zurückgeben.
|
|
- Ist es eine andere JPEG → umbenennen.
|
|
- Ist es WebP oder PNG → zu JPEG konvertieren, Original löschen.
|
|
Gibt den Pfad zur folder.jpg zurück.
|
|
"""
|
|
dest = cover_path.parent / "folder.jpg"
|
|
if cover_path.resolve() == dest.resolve():
|
|
return dest
|
|
|
|
suffix = cover_path.suffix.lower()
|
|
try:
|
|
if suffix in (".jpg", ".jpeg"):
|
|
cover_path.rename(dest)
|
|
elif HAS_PIL:
|
|
import io
|
|
with cover_path.open("rb") as f:
|
|
raw = f.read()
|
|
with Image.open(io.BytesIO(raw)) as img:
|
|
buf = io.BytesIO()
|
|
img.convert("RGB").save(buf, format="JPEG", quality=92)
|
|
dest.write_bytes(buf.getvalue())
|
|
cover_path.unlink()
|
|
else:
|
|
# PIL nicht verfügbar: einfach umbenennen, auch wenn es kein JPEG ist
|
|
cover_path.rename(dest)
|
|
print(f" 🖼️ Cover normalisiert → folder.jpg ({cover_path.name})")
|
|
except Exception as e:
|
|
print(f" ⚠️ Cover-Normalisierung fehlgeschlagen: {e}", file=sys.stderr)
|
|
return cover_path
|
|
return dest
|
|
|
|
|
|
def _mb_cover_url(release_mbid: str) -> Optional[str]:
|
|
url = f"https://coverartarchive.org/release/{release_mbid}/front"
|
|
if not HAS_REQUESTS:
|
|
return None
|
|
try:
|
|
r = requests.head(url, timeout=5, allow_redirects=True)
|
|
if r.status_code == 200:
|
|
return url
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
|
|
def _download_image(url: str, dest: Path, label: str = "") -> Optional[Path]:
|
|
"""Hilfsfunktion: URL herunterladen, PNG→JPEG konvertieren, als dest speichern."""
|
|
try:
|
|
r = requests.get(url, timeout=15, headers={"User-Agent": "MusicMetadataEnricher/1.0"})
|
|
if r.status_code != 200:
|
|
return None
|
|
ct = r.headers.get("content-type", "")
|
|
if ("png" in ct or url.lower().endswith(".png")) and HAS_PIL:
|
|
with Image.open(io.BytesIO(r.content)) as img:
|
|
buf = io.BytesIO()
|
|
img.convert("RGB").save(buf, format="JPEG", quality=92)
|
|
dest.write_bytes(buf.getvalue())
|
|
else:
|
|
dest.write_bytes(r.content)
|
|
if _image_ok(dest):
|
|
return dest
|
|
dest.unlink(missing_ok=True)
|
|
except Exception as e:
|
|
if label:
|
|
print(f" ⚠️ {label}: {e}", file=sys.stderr)
|
|
dest.unlink(missing_ok=True)
|
|
return None
|
|
|
|
|
|
def download_cover(release_mbid: Optional[str], dest_dir: Path) -> Optional[Path]:
|
|
if not release_mbid or not HAS_REQUESTS:
|
|
return None
|
|
url = _mb_cover_url(release_mbid)
|
|
if not url:
|
|
return None
|
|
return _download_image(url, dest_dir / "folder.jpg", "Cover-Download-Fehler")
|
|
|
|
|
|
def download_back_cover(release_mbid: Optional[str], dest_dir: Path) -> Optional[Path]:
|
|
"""Lädt das Back-Cover von MusicBrainz Cover Art Archive als back.jpg."""
|
|
if not release_mbid or not HAS_REQUESTS:
|
|
return None
|
|
dest = dest_dir / "back.jpg"
|
|
if dest.exists():
|
|
return dest # bereits vorhanden
|
|
url = f"https://coverartarchive.org/release/{release_mbid}/back"
|
|
try:
|
|
r = requests.head(url, timeout=5, allow_redirects=True)
|
|
if r.status_code != 200:
|
|
return None
|
|
except Exception:
|
|
return None
|
|
return _download_image(url, dest, "Back-Cover-Fehler")
|
|
|
|
|
|
def _load_cover_data(cover_path: Path) -> tuple[bytes, str]:
|
|
"""
|
|
Liest Cover-Bilddaten und gibt (bytes, mime_type) zurück.
|
|
WebP wird zu JPEG konvertiert wenn PIL verfügbar (bessere Player-Kompatibilität).
|
|
"""
|
|
suffix = cover_path.suffix.lower()
|
|
if suffix in (".jpg", ".jpeg"):
|
|
return cover_path.read_bytes(), "image/jpeg"
|
|
if suffix == ".webp" and HAS_PIL:
|
|
try:
|
|
with Image.open(cover_path) as img:
|
|
img = img.convert("RGB")
|
|
buf = tempfile.SpooledTemporaryFile(max_size=10 * 1024 * 1024)
|
|
img.save(buf, format="JPEG", quality=90)
|
|
buf.seek(0)
|
|
return buf.read(), "image/jpeg"
|
|
except Exception as e:
|
|
print(f" ⚠️ WebP→JPEG-Konvertierung fehlgeschlagen ({cover_path.name}): {e}",
|
|
file=sys.stderr)
|
|
if suffix == ".webp":
|
|
return cover_path.read_bytes(), "image/webp"
|
|
if suffix == ".png":
|
|
return cover_path.read_bytes(), "image/png"
|
|
# Fallback: raw bytes, JPEG assumed
|
|
return cover_path.read_bytes(), "image/jpeg"
|
|
|
|
|
|
def embed_cover(audio_path: Path, cover_path: Path) -> bool:
|
|
if not HAS_MUTAGEN:
|
|
return False
|
|
try:
|
|
img_data, mime = _load_cover_data(cover_path)
|
|
ext = audio_path.suffix.lower()
|
|
|
|
if ext == ".mp3":
|
|
try:
|
|
tags = ID3(str(audio_path))
|
|
except ID3Error:
|
|
tags = ID3()
|
|
tags.delall("APIC")
|
|
tags.add(APIC(encoding=3, mime=mime, type=3, desc="Cover", data=img_data))
|
|
tags.save(str(audio_path), v2_version=4)
|
|
return True
|
|
|
|
elif ext == ".flac":
|
|
audio = FLAC(str(audio_path))
|
|
audio.clear_pictures()
|
|
pic = Picture()
|
|
pic.type = 3
|
|
pic.mime = mime
|
|
pic.desc = "Cover"
|
|
pic.data = img_data
|
|
audio.add_picture(pic)
|
|
audio.save()
|
|
return True
|
|
|
|
elif ext == ".m4a":
|
|
audio = MP4(str(audio_path))
|
|
fmt = MP4Cover.FORMAT_JPEG if mime == "image/jpeg" else MP4Cover.FORMAT_PNG
|
|
# WebP wurde bereits zu JPEG konvertiert, mime ist dann "image/jpeg"
|
|
audio.tags["covr"] = [MP4Cover(img_data, imageformat=fmt)]
|
|
audio.save()
|
|
return True
|
|
|
|
else:
|
|
# Generic mutagen fallback
|
|
from mutagen import File as MutagenFile
|
|
audio = MutagenFile(str(audio_path), easy=False)
|
|
if audio is not None:
|
|
if audio.tags is None:
|
|
audio.add_tags()
|
|
if hasattr(audio.tags, "add"):
|
|
audio.tags.add(
|
|
APIC(encoding=3, mime=mime, type=3, desc="Cover", data=img_data)
|
|
)
|
|
audio.save()
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f" ⚠️ Cover-Einbettungsfehler {audio_path.name}: {e}", file=sys.stderr)
|
|
return False
|
|
|
|
|
|
def _discogs_cover_url(artist: Optional[str], album: Optional[str]) -> Optional[str]:
|
|
"""Sucht auf Discogs nach artist+album und gibt die primäre Image-URL zurück."""
|
|
if not HAS_REQUESTS or not artist or not album:
|
|
return None
|
|
import os
|
|
token = os.getenv("DISCOGS_TOKEN", "")
|
|
headers = {"User-Agent": "MusicMetadataEnricher/1.0"}
|
|
if token:
|
|
headers["Authorization"] = f"Discogs token={token}"
|
|
try:
|
|
r = requests.get(
|
|
"https://api.discogs.com/database/search",
|
|
params={"artist": artist, "release_title": album, "type": "release", "per_page": 3},
|
|
headers=headers,
|
|
timeout=10,
|
|
)
|
|
if r.status_code != 200:
|
|
return None
|
|
results = r.json().get("results", [])
|
|
for result in results:
|
|
cover = result.get("cover_image") or result.get("thumb")
|
|
if cover and "spacer" not in cover:
|
|
return cover
|
|
except Exception as e:
|
|
print(f" ⚠️ Discogs-Suchfehler: {e}", file=sys.stderr)
|
|
return None
|
|
|
|
|
|
def download_discogs_cover(artist: Optional[str], album: Optional[str], dest_dir: Path) -> Optional[Path]:
|
|
url = _discogs_cover_url(artist, album)
|
|
if not url:
|
|
return None
|
|
return _download_image(url, dest_dir / "folder.jpg", "Discogs-Cover-Fehler")
|
|
|
|
|
|
def _itunes_cover_url(artist: Optional[str], album: Optional[str]) -> Optional[str]:
|
|
"""Sucht auf iTunes nach artist+album, gibt 600x600-Artwork-URL zurück."""
|
|
if not HAS_REQUESTS or not (artist or album):
|
|
return None
|
|
term = f"{artist or ''} {album or ''}".strip()
|
|
try:
|
|
r = requests.get(
|
|
"https://itunes.apple.com/search",
|
|
params={"term": term, "media": "music", "entity": "album", "limit": 5},
|
|
timeout=8,
|
|
)
|
|
if r.status_code != 200:
|
|
return None
|
|
for result in r.json().get("results", []):
|
|
url = result.get("artworkUrl100", "")
|
|
if url:
|
|
# Auf 600x600 hochskalieren
|
|
return url.replace("100x100bb", "600x600bb").replace("100x100", "600x600")
|
|
except Exception as e:
|
|
print(f" ⚠️ iTunes-Suche: {e}", file=sys.stderr)
|
|
return None
|
|
|
|
|
|
def download_itunes_cover(artist: Optional[str], album: Optional[str], dest_dir: Path) -> Optional[Path]:
|
|
url = _itunes_cover_url(artist, album)
|
|
if not url:
|
|
return None
|
|
return _download_image(url, dest_dir / "folder.jpg", "iTunes-Cover-Fehler")
|
|
|
|
|
|
def _lastfm_cover_url(artist: Optional[str], album: Optional[str]) -> Optional[str]:
|
|
"""Last.fm album.getinfo → größtes verfügbares Artwork-URL."""
|
|
api_key = os.getenv("LASTFM_API_KEY", "")
|
|
if not HAS_REQUESTS or not api_key or not artist or not album:
|
|
return None
|
|
try:
|
|
r = requests.get(
|
|
"https://ws.audioscrobbler.com/2.0/",
|
|
params={"method": "album.getinfo", "api_key": api_key,
|
|
"artist": artist, "album": album, "format": "json"},
|
|
timeout=8,
|
|
)
|
|
if r.status_code != 200:
|
|
return None
|
|
images = r.json().get("album", {}).get("image", [])
|
|
# Images sind aufsteigend nach Größe sortiert: small, medium, large, extralarge, mega
|
|
for img in reversed(images):
|
|
url = img.get("#text", "")
|
|
if url and "2a96cbd8b46e442fc41c2b86b821562f" not in url: # Last.fm Platzhalter-Hash
|
|
return url
|
|
except Exception as e:
|
|
print(f" ⚠️ Last.fm-Cover: {e}", file=sys.stderr)
|
|
return None
|
|
|
|
|
|
def download_lastfm_cover(artist: Optional[str], album: Optional[str], dest_dir: Path) -> Optional[Path]:
|
|
url = _lastfm_cover_url(artist, album)
|
|
if not url:
|
|
return None
|
|
return _download_image(url, dest_dir / "folder.jpg", "Last.fm-Cover-Fehler")
|
|
|
|
|
|
def resolve_cover(
|
|
image_files: List[Path],
|
|
release_mbid: Optional[str],
|
|
album_dir: Path,
|
|
artist: Optional[str] = None,
|
|
album: Optional[str] = None,
|
|
) -> tuple[Optional[Path], Optional[str]]:
|
|
"""Returns (cover_path, source_label)."""
|
|
local = find_local_cover(image_files)
|
|
if local:
|
|
local = normalize_cover_to_folder_jpg(local)
|
|
return local, "local"
|
|
|
|
if release_mbid:
|
|
downloaded = download_cover(release_mbid, album_dir)
|
|
if downloaded:
|
|
return downloaded, "musicbrainz"
|
|
|
|
if artist or album:
|
|
downloaded = download_discogs_cover(artist, album, album_dir)
|
|
if downloaded:
|
|
return downloaded, "discogs"
|
|
|
|
if artist or album:
|
|
downloaded = download_itunes_cover(artist, album, album_dir)
|
|
if downloaded:
|
|
return downloaded, "itunes"
|
|
|
|
if artist or album:
|
|
downloaded = download_lastfm_cover(artist, album, album_dir)
|
|
if downloaded:
|
|
return downloaded, "lastfm"
|
|
|
|
return None, None
|