# Music_Metadata_Enricher/metadata_resolver.py
# (web-view export: 520 lines, 19 KiB, Python)

from __future__ import annotations
import os
import sys
import time
from typing import Optional, List, Dict, Tuple
from models import AlbumHints, AlbumProposal, TrackProposal
try:
import musicbrainzngs as mb
mb.set_useragent("MusicMetadataEnricher", "1.0", "https://github.com/dschlueter")
HAS_MB = True
except ImportError:
HAS_MB = False
try:
import acoustid
HAS_ACOUSTID = True
except ImportError:
HAS_ACOUSTID = False
try:
import discogs_client as dc
HAS_DISCOGS = True
except ImportError:
HAS_DISCOGS = False
try:
import anthropic
HAS_ANTHROPIC = True
except ImportError:
HAS_ANTHROPIC = False
# Client-side throttle for MusicBrainz requests; applied by _mb_wait().
_MB_RATE_LIMIT = 1.1 # seconds between MusicBrainz requests
# time.monotonic() timestamp of the most recent throttled call (0.0 until the first one).
_last_mb_call = 0.0
# Credentials and endpoints are read from the environment once, at import time.
# An empty string is treated as "not configured" by the lookups that check these values.
ACOUSTID_API_KEY = os.getenv("ACOUSTID_API_KEY", "")
# NOTE(review): ANTHROPIC_API_KEY is read here but not referenced by the code visible in this file.
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
DISCOGS_TOKEN = os.getenv("DISCOGS_TOKEN", "")
# Base URL of the Ollama server; "/api/chat" is appended when resolving.
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")
# qwen3:8b (5.2 GB) is sufficient for simple JSON metadata completion and loads quickly (~10 s)
OLLAMA_RESOLVE_MODEL = os.getenv("OLLAMA_RESOLVE_MODEL", "qwen3:8b")
def _mb_wait():
    """Sleep as long as needed to keep MusicBrainz calls ``_MB_RATE_LIMIT`` seconds apart.

    Compares the monotonic clock against the module-level ``_last_mb_call``
    timestamp, sleeps for the remainder of the interval if it has not elapsed
    yet, and then records the current time as the new ``_last_mb_call``.
    """
    global _last_mb_call
    since_last = time.monotonic() - _last_mb_call
    still_to_wait = _MB_RATE_LIMIT - since_last
    if since_last < _MB_RATE_LIMIT:
        time.sleep(still_to_wait)
    _last_mb_call = time.monotonic()
# ---------------------------------------------------------------------------
# AcoustID fingerprinting
# ---------------------------------------------------------------------------
def _fingerprint_tracks(hints: AlbumHints) -> Dict[str, List[str]]:
    """Fingerprint every track and look up matching recording IDs via AcoustID.

    Returns a mapping ``{str(track.path): [recording_id, ...]}``. Only lookup
    results with a score of at least 0.90 contribute IDs; a track that was
    looked up without any such result maps to an empty list. Tracks whose
    fingerprinting or lookup raised are reported on stderr and left out.
    Returns ``{}`` when the acoustid module or the API key is unavailable.
    """
    if not (HAS_ACOUSTID and ACOUSTID_API_KEY):
        return {}

    ids_by_path: Dict[str, List[str]] = {}
    for track in hints.tracks:
        try:
            duration, fingerprint = acoustid.fingerprint_file(str(track.path))
            lookup = acoustid.lookup(
                ACOUSTID_API_KEY,
                fingerprint,
                duration,
                meta="recordings releasegroups",
            )
            # Flatten recordings of all sufficiently confident results, in order.
            ids_by_path[str(track.path)] = [
                recording["id"]
                for hit in lookup.get("results", [])
                if hit.get("score", 0) >= 0.90
                for recording in hit.get("recordings", [])
            ]
        except Exception as e:
            # Best effort: one failing file must not abort the whole album.
            print(f" ⚠️ AcoustID-Fehler {track.path.name}: {e}", file=sys.stderr)
    return ids_by_path
# ---------------------------------------------------------------------------
# MusicBrainz lookup
# ---------------------------------------------------------------------------
def _mb_search_release(artist: Optional[str], album: Optional[str],
                       year: Optional[str]) -> Optional[Dict]:
    """Search MusicBrainz for a release by album title, artist and year.

    Builds a Lucene query from the non-empty arguments (joined with ``AND``),
    requests up to three candidates and returns the one with the highest
    ``ext:score``. Returns ``None`` when MusicBrainz support is unavailable,
    when neither artist nor album is given, when nothing is found, when the
    best score is below 70, or when the request fails (reported on stderr).
    """
    if not HAS_MB or (not artist and not album):
        return None

    def _quoted(value: str) -> str:
        # Inside a quoted Lucene phrase, backslash and double quote are the
        # characters that must be escaped; left raw, a title such as
        # '"Heroes"' terminates the phrase early and corrupts the query.
        escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    query_parts = []
    if album:
        query_parts.append("release:" + _quoted(album))
    if artist:
        query_parts.append("artist:" + _quoted(artist))
    if year:
        query_parts.append(f'date:{year}')
    query = " AND ".join(query_parts)

    try:
        _mb_wait()
        result = mb.search_releases(query=query, limit=3)
        releases = result.get("release-list", [])
        if not releases:
            return None
        # Scores are handled as strings here, hence the int() conversion.
        best = max(releases, key=lambda r: int(r.get("ext:score", 0)))
        if int(best.get("ext:score", 0)) < 70:
            return None
        return best
    except Exception as e:
        # Best effort: a failed search must not abort resolution.
        print(f" ⚠️ MusicBrainz-Suchfehler: {e}", file=sys.stderr)
        return None
def _mb_get_release_tracks(release_id: str) -> Optional[Dict]:
    """Fetch the full MusicBrainz release record for *release_id*.

    Despite the name, the return value is the complete ``"release"`` dict of
    the response (requested with recordings, artist credits, labels and
    release groups), not a list of tracks; the caller extracts the track
    list from it. Returns ``None`` when MusicBrainz support is unavailable,
    when the response has no ``"release"`` entry, or when the request fails
    (reported on stderr).
    """
    if not HAS_MB:
        return None
    try:
        _mb_wait()
        result = mb.get_release_by_id(
            release_id,
            includes=["recordings", "artist-credits", "labels", "release-groups"],
        )
        return result.get("release")
    except Exception as e:
        # Best effort: a failed fetch must not abort resolution.
        print(f" ⚠️ MusicBrainz-Release-Fehler: {e}", file=sys.stderr)
        return None
def _mb_recording_to_release(recording_mbid: str) -> Optional[Dict]:
    """Return the first release MusicBrainz lists for a recording.

    Looks the recording up (including releases, artist credits and release
    groups) and returns the first entry of its ``release-list``. Returns
    ``None`` when MusicBrainz support is unavailable, when the recording has
    no releases, or when the request fails (reported on stderr).
    """
    if not HAS_MB:
        return None
    try:
        _mb_wait()
        payload = mb.get_recording_by_id(
            recording_mbid,
            includes=["releases", "artist-credits", "release-groups"],
        )
        candidates = payload.get("recording", {}).get("release-list", [])
        return candidates[0] if candidates else None
    except Exception as e:
        print(f" ⚠️ MusicBrainz-Recording-Fehler: {e}", file=sys.stderr)
        return None
# ---------------------------------------------------------------------------
# Discogs fallback
# ---------------------------------------------------------------------------
def _discogs_search(artist: Optional[str], album: Optional[str]) -> Optional[Dict]:
    """Look up a release on Discogs and return its core metadata.

    Searches for releases using the album (or, failing that, the artist) as
    the query text and returns a dict built from the first hit with the keys
    ``album``, ``artist``, ``year``, ``genre``, ``label`` and ``id``.
    Returns ``None`` when the Discogs client or token is unavailable, when
    the search has no results, or when any step fails (reported on stderr).
    """
    if not (HAS_DISCOGS and DISCOGS_TOKEN):
        return None
    try:
        client = dc.Client("MusicMetadataEnricher/1.0", user_token=DISCOGS_TOKEN)
        search_text = album or artist or ""
        hits = client.search(search_text, artist=artist or "", type="release")
        if not hits.count:
            return None
        top = hits[0]
        return {
            "album": top.title,
            "artist": top.artists[0].name if top.artists else None,
            "year": str(top.year) if top.year else None,
            "genre": top.genres[0] if top.genres else None,
            "label": top.labels[0].name if top.labels else None,
            "id": top.id,
        }
    except Exception as e:
        print(f" ⚠️ Discogs-Fehler: {e}", file=sys.stderr)
    return None
# ---------------------------------------------------------------------------
# LLM reasoning (optional): local Ollama first, then OpenRouter
# ---------------------------------------------------------------------------
def _build_resolve_prompt(hints: AlbumHints, partial: Dict) -> str:
tracks_summary = "\n".join(
f" - Track {t.track_number or '?'}: {t.title or t.path.stem}"
+ (f" [{t.artist}]" if t.artist else "")
for t in hints.tracks[:20]
)
return (
"Du bist ein Musikexperte. Analysiere diese Album-Daten.\n"
"Vervollständige fehlende Felder UND korrigiere erkennbare Tippfehler "
"(z.B. im Albumtitel oder Künstlernamen — Verzeichnisnamen enthalten oft Schreibfehler).\n\n"
f"Verzeichnisname: {hints.album_dir.name}\n"
f"Künstler (aus Verzeichnis): {hints.dir_artist or partial.get('artist', 'unbekannt')}\n"
f"Albumtitel (aus Verzeichnis, evtl. mit Tippfehlern): {hints.dir_album or partial.get('album', 'unbekannt')}\n"
f"Jahr: {hints.dir_year or partial.get('year', 'unbekannt')}\n"
f"Tracklist-Hinweise:\n{tracks_summary}\n\n"
'Antworte NUR mit einem JSON-Objekt mit diesen Feldern (null wenn unbekannt):\n'
'{"artist": ..., "album": ..., "albumartist": ..., "year": ..., "genre": ..., "label": ...}'
)
def _parse_json_response(text: str) -> Optional[Dict]:
import json, re
m = re.search(r"\{.*\}", text, re.DOTALL)
if m:
try:
return json.loads(m.group())
except Exception:
pass
return None
def _resolve_via_ollama(hints: AlbumHints, partial: Dict) -> Optional[Dict]:
    """Ask the Ollama server to complete the album metadata (no API key needed).

    Posts the resolve prompt as a single non-streaming chat message to
    ``{OLLAMA_HOST}/api/chat`` with JSON output requested, and parses the
    reply with ``_parse_json_response``. Returns ``None`` if the request or
    decoding fails (reported on stderr) or the reply holds no JSON object.
    """
    import json
    import urllib.request

    prompt = _build_resolve_prompt(hints, partial)
    chat_request = {
        "model": OLLAMA_RESOLVE_MODEL,
        "messages": [{"role": "user", "content": prompt}],
        "stream": False,
        "format": "json",
        "options": {"temperature": 0.1},
    }
    body = json.dumps(chat_request).encode()
    try:
        request = urllib.request.Request(
            f"{OLLAMA_HOST}/api/chat",
            data=body,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=240) as resp:
            reply = json.loads(resp.read())
        answer = reply.get("message", {}).get("content", "").strip()
        return _parse_json_response(answer)
    except Exception as e:
        print(f" ⚠️ Ollama-Resolve-Fehler: {e}", file=sys.stderr)
        return None
def _resolve_via_openrouter(hints: AlbumHints, partial: Dict) -> Optional[Dict]:
    """Ask a model on OpenRouter to complete the album metadata.

    Sends the resolve prompt to the OpenRouter chat-completions endpoint
    using the DeepSeek V3 chat model and parses the reply with
    ``_parse_json_response``. Returns ``None`` when no OpenRouter key is
    configured, when the request or decoding fails (reported on stderr), or
    when the reply holds no JSON object.
    """
    if not OPENROUTER_API_KEY:
        return None

    import json
    import urllib.request

    prompt = _build_resolve_prompt(hints, partial)
    # DeepSeek V3: chosen by the author as a very cheap yet capable model.
    model = "deepseek/deepseek-chat-v3-0324"
    body = json.dumps({
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.1,
        "max_tokens": 300,
    }).encode()
    request_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "HTTP-Referer": "https://pi.local",
        "X-Title": "MusicMetadataEnricher",
    }
    try:
        request = urllib.request.Request(
            "https://openrouter.ai/api/v1/chat/completions",
            data=body,
            headers=request_headers,
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=30) as resp:
            reply = json.loads(resp.read())
        answer = reply["choices"][0]["message"]["content"].strip()
        return _parse_json_response(answer)
    except Exception as e:
        print(f" ⚠️ OpenRouter-Resolve-Fehler: {e}", file=sys.stderr)
        return None
def _claude_resolve(hints: AlbumHints, partial: Dict) -> Optional[Dict]:
    """Run the LLM fallback chain and return the first non-empty answer.

    Order: Ollama first, then OpenRouter when an API key is configured.
    Despite the function name, the Claude API is deliberately not called
    (the author considered it too expensive). Returns ``None`` when no
    backend produced a non-empty result.
    """
    # 1. Ollama (preferred by the author: local and free of charge).
    local_answer = _resolve_via_ollama(hints, partial)
    if local_answer:
        return local_answer

    # 2. OpenRouter, only when a key is set.
    if not OPENROUTER_API_KEY:
        return None
    remote_answer = _resolve_via_openrouter(hints, partial)
    return remote_answer if remote_answer else None
# ---------------------------------------------------------------------------
# Main resolver
# ---------------------------------------------------------------------------
def _coerce_int(value, default: int = 0) -> int:
    """Convert a MusicBrainz field (usually a numeric string) to int, else *default*."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


def _join_artist_credit(credits) -> str:
    """Join a MusicBrainz artist-credit list into one display string.

    Handles both dict entries (artist name plus an optional ``joinphrase``
    key) and plain-string entries, which carry join phrases such as " & ".
    """
    parts: List[str] = []
    for credit in credits or []:
        if isinstance(credit, str):
            parts.append(credit)
        elif isinstance(credit, dict):
            parts.append(credit.get("artist", {}).get("name", ""))
            parts.append(credit.get("joinphrase", ""))
    return "".join(parts).strip()


def _llm_text(value) -> Optional[str]:
    """Normalize one field of the LLM's JSON answer to a non-empty string or None."""
    if isinstance(value, bool) or not isinstance(value, (str, int)):
        return None
    text = str(value).strip()
    return text or None


def resolve(
    hints: AlbumHints,
    use_fingerprint: bool = True,
    use_api: bool = True,
    use_claude: bool = True,
) -> AlbumProposal:
    """Resolve album-level metadata for one album directory.

    Evidence is gathered in stages; each stage only fills fields that are
    still empty (except the low-confidence LLM case noted below) and adds to
    a confidence score capped at 1.0:

    1. Local hints: directory-derived values, then existing tags
       (majority vote for artist/album)                          +0.05
    2. AcoustID fingerprints with at least one match             +0.20
       and a MusicBrainz release found for a matched recording   +0.25
    3. MusicBrainz text search (only without a release so far)   +0.30 * score/100
    4. Full MusicBrainz release: missing fields and track list
    5. Discogs (only without a MusicBrainz release)              +0.15
    6. LLM completion when artist or album is missing or the
       confidence is below 0.5                                   +0.10

    Args:
        hints: Hints collected for the album directory and its tracks.
        use_fingerprint: Allow AcoustID fingerprinting (needs ``use_api``).
        use_api: Master switch for every network lookup.
        use_claude: Allow the LLM completion step (needs ``use_api``). The
            name is historical; the step uses Ollama / OpenRouter.

    Returns:
        An ``AlbumProposal`` with the resolved fields, per-track proposals,
        the confidence, and the sources and notes that contributed.
    """
    import re
    from collections import Counter

    confidence = 0.0
    sources: List[str] = []
    notes: List[str] = []
    artist = hints.dir_artist
    album = hints.dir_album
    year = hints.dir_year
    genre: Optional[str] = None
    label: Optional[str] = None
    release_mbid: Optional[str] = None
    mb_tracks: Optional[List] = None

    # --- Existing tags: majority vote for artist/album -----------------------
    tag_artists = [t.existing_tags.get("artist") for t in hints.tracks if t.existing_tags.get("artist")]
    tag_albums = [t.existing_tags.get("album") for t in hints.tracks if t.existing_tags.get("album")]
    if tag_artists:
        artist = artist or Counter(tag_artists).most_common(1)[0][0]
    if tag_albums:
        album = album or Counter(tag_albums).most_common(1)[0][0]

    # --- Existing tags: first usable year / genre / label --------------------
    for t in hints.tracks:
        raw_year = t.existing_tags.get("date") or t.existing_tags.get("year")
        if raw_year and not year:
            # Strip invisible chars so ID3TimeStamp validation doesn't fail later
            year = re.sub(r"[^\d\-T:+Z]", "", str(raw_year)).strip()[:10] or None
        genre = genre or t.existing_tags.get("genre")
        label = label or t.existing_tags.get("label") or t.existing_tags.get("organization")

    if artist or album:
        confidence += 0.05
        sources.append("local-hints")

    # --- AcoustID fingerprinting ---------------------------------------------
    fp_mbids: Dict[str, List[str]] = {}
    if use_fingerprint and use_api and HAS_ACOUSTID and ACOUSTID_API_KEY:
        fp_mbids = _fingerprint_tracks(hints)
    # Tracks without any match still appear with an empty list, so the dict
    # being non-empty is not evidence; require at least one matched recording.
    if any(fp_mbids.values()):
        confidence += 0.20
        sources.append("acoustid")
        # Use the first recording of the first track that yields a release.
        for mbids in fp_mbids.values():
            if not mbids:
                continue
            rel = _mb_recording_to_release(mbids[0])
            if rel:
                release_mbid = rel.get("id")
                confidence += 0.25
                sources.append("musicbrainz-fingerprint")
                break

    # --- MusicBrainz text search ---------------------------------------------
    if use_api and HAS_MB and not release_mbid:
        mb_result = _mb_search_release(artist, album, year)
        if mb_result:
            release_mbid = mb_result.get("id")
            score = int(mb_result.get("ext:score", 0))
            confidence += 0.30 * (score / 100)
            sources.append("musicbrainz-text")
            notes.append(f"MusicBrainz score: {score}")

    # --- Full MusicBrainz release data ---------------------------------------
    if use_api and release_mbid:
        full_release = _mb_get_release_tracks(release_mbid)
        if full_release:
            if not artist:
                artist = _join_artist_credit(full_release.get("artist-credit", [])) or artist
            if not album:
                album = full_release.get("title", album)
            if not year:
                year = full_release.get("date", "")[:4] or None
            label_info = full_release.get("label-info-list", [])
            if label_info and not label:
                label = label_info[0].get("label", {}).get("name")
            # The release group's "primary-type" is a release type ("Album",
            # "Single", "EP", ...), not a musical genre, so it is deliberately
            # not written into `genre`.
            mb_tracks = []
            for medium in full_release.get("medium-list", []):
                # Positions may be numeric strings; normalise to int so they
                # compare equal to the int disc/track numbers of the hints.
                disc_num = _coerce_int(medium.get("position"), 1)
                for track in medium.get("track-list", []):
                    # "number" is free text (e.g. "A1" on vinyl); fall back to
                    # the numeric "position" when it is not an integer.
                    number = (_coerce_int(track.get("number"), 0)
                              or _coerce_int(track.get("position"), 0))
                    recording = track.get("recording", {})
                    mb_tracks.append({
                        "disc": disc_num,
                        "number": number,
                        "title": recording.get("title", ""),
                        "artist": track.get("artist-credit-phrase", ""),
                        "mbid": recording.get("id"),
                    })

    # --- Discogs fallback -----------------------------------------------------
    if use_api and HAS_DISCOGS and DISCOGS_TOKEN and not release_mbid:
        dg = _discogs_search(artist, album)
        if dg:
            artist = artist or dg.get("artist")
            album = album or dg.get("album")
            year = year or dg.get("year")
            genre = genre or dg.get("genre")
            label = label or dg.get("label")
            confidence += 0.15
            sources.append("discogs")

    # --- LLM reasoning for remaining gaps (Ollama, then OpenRouter) ----------
    partial = {"artist": artist, "album": album, "year": year}
    if use_claude and use_api and (not artist or not album or confidence < 0.5):
        cl = _claude_resolve(hints, partial)
        if cl:
            # Models may answer with numbers or other JSON types; keep only
            # clean, non-empty strings.
            llm = {key: _llm_text(cl.get(key))
                   for key in ("artist", "album", "year", "genre", "label")}
            if confidence < 0.3:
                # Very uncertain: the LLM may also correct existing values
                # (e.g. typos in an album title taken from the directory name).
                artist = llm["artist"] or artist
                album = llm["album"] or album
                year = llm["year"] or year
                genre = llm["genre"] or genre
                label = llm["label"] or label
            else:
                artist = artist or llm["artist"]
                album = album or llm["album"]
                year = year or llm["year"]
                genre = genre or llm["genre"]
                label = label or llm["label"]
            confidence += 0.10
            sources.append("llm-resolve")

    # --- Finalize albumartist -------------------------------------------------
    # dir_artist takes precedence: if the directory name states an artist
    # (e.g. "Eugen_Cicero_-_Jazz_meets_Classic"), that is the album artist,
    # even when the track file names carry composer names.
    track_artists = [t.artist for t in hints.tracks if t.artist]
    distinct_artists = set(track_artists)
    if hints.dir_artist:
        albumartist = hints.dir_artist
    elif len(distinct_artists) >= 3:
        albumartist = "Various Artists"
    elif track_artists:
        albumartist = artist or Counter(track_artists).most_common(1)[0][0]
    else:
        albumartist = artist or "Unknown Artist"

    album = album or hints.album_dir.name.replace("_", " ")
    artist = artist or albumartist
    confidence = min(confidence, 1.0)

    track_proposals = _build_track_proposals(hints, mb_tracks, album, artist)
    return AlbumProposal(
        album_dir=hints.album_dir,
        album=album,
        albumartist=albumartist,
        date=year,
        genre=genre,
        label=label,
        mbid=release_mbid,
        cover_path=None,
        cover_source=None,
        tracks=track_proposals,
        confidence=confidence,
        sources=sources,
        notes=notes,
    )
def _build_track_proposals(
    hints: AlbumHints,
    mb_tracks: Optional[List],
    album: str,
    album_artist: str,
) -> List[TrackProposal]:
    """Create one ``TrackProposal`` per hinted track, ordered by disc/track/path.

    Args:
        hints: Album hints whose ``tracks`` are turned into proposals.
        mb_tracks: Optional MusicBrainz track dicts with the keys ``disc``,
            ``number``, ``title``, ``artist`` and ``mbid``. A hinted track
            with a track number takes title, artist and recording MBID from
            the entry with the same disc and number.
        album: Resolved album title (currently unused here).
        album_artist: Fallback artist for tracks without their own artist.

    Returns:
        The proposals. Tracks without a number receive the lowest number not
        yet used on their disc, so fallback numbers never duplicate existing
        ones.
    """
    def _as_int(value, default):
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    # Index by (disc, number) as ints: the stored values may be numeric
    # strings, and a type-sensitive comparison would silently never match.
    # setdefault keeps the first entry per key (first match wins).
    mb_index: Dict[tuple, Dict] = {}
    for mb_t in mb_tracks or []:
        key = (_as_int(mb_t.get("disc"), 1), _as_int(mb_t.get("number"), 0))
        mb_index.setdefault(key, mb_t)

    proposals: List[TrackProposal] = []
    ordered = sorted(
        hints.tracks,
        key=lambda t: (t.disc_number or 1, t.track_number or 9999, str(t.path)),
    )
    for th in ordered:
        title = th.title
        artist = th.artist or album_artist
        track_num = th.track_number
        disc_num = th.disc_number
        recording_mbid = None

        # Try to match from the MusicBrainz track list
        if mb_index and track_num:
            match = mb_index.get((_as_int(disc_num or 1, 1), _as_int(track_num, -1)))
            if match:
                if match.get("title"):
                    title = match["title"]
                if match.get("artist"):
                    artist = match["artist"]
                recording_mbid = match.get("mbid")

        title = title or th.path.stem
        proposals.append(TrackProposal(
            path=th.path,
            title=title,
            artist=artist,
            track_number=track_num,
            disc_number=disc_num,
            mbid=recording_mbid,
        ))

    # Sequential numbering as the last fallback: tracks without a number get
    # a running number per disc, which prevents "00" and "??" in file names
    # on --rename. Numbers already present on the disc are skipped so two
    # tracks never share a number.
    if any(p.track_number is None for p in proposals):
        used: Dict[int, set] = {}
        for p in proposals:
            if p.track_number is not None:
                used.setdefault(p.disc_number or 1, set()).add(p.track_number)
        next_candidate: Dict[int, int] = {}
        for p in proposals:
            if p.track_number is None:
                disc = p.disc_number or 1
                taken = used.setdefault(disc, set())
                number = next_candidate.get(disc, 1)
                while number in taken:
                    number += 1
                p.track_number = number
                taken.add(number)
                next_candidate[disc] = number + 1
    return proposals