MusicBrainz returns vinyl track numbers as 'A1', 'B3' etc. instead of
plain integers. int('A1') raised ValueError crashing the entire album.
metadata_resolver.py: parse vinyl positions with regex before int():
- 'A1' → track 1, disc 1 (side A)
- 'B3' → track 3, disc 1 (side B)
- 'C1' → track 1, disc 2 (side C)
- Non-vinyl: extract first digit group via re.search
hint_extractor.py: guard int(tl_track) in tracklist matching with
try/except + re.search so any non-numeric track position is skipped
gracefully instead of crashing.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
733 lines
28 KiB
Python
Executable file
733 lines
28 KiB
Python
Executable file
from __future__ import annotations
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
from typing import Optional, List, Dict, Tuple
|
|
|
|
from models import AlbumHints, AlbumProposal, TrackProposal
|
|
|
|
try:
|
|
import musicbrainzngs as mb
|
|
mb.set_useragent("MusicMetadataEnricher", "1.0", "https://github.com/dschlueter")
|
|
HAS_MB = True
|
|
except ImportError:
|
|
HAS_MB = False
|
|
|
|
try:
|
|
import acoustid
|
|
HAS_ACOUSTID = True
|
|
except ImportError:
|
|
HAS_ACOUSTID = False
|
|
|
|
try:
|
|
import discogs_client as dc
|
|
HAS_DISCOGS = True
|
|
except ImportError:
|
|
HAS_DISCOGS = False
|
|
|
|
try:
|
|
import anthropic
|
|
HAS_ANTHROPIC = True
|
|
except ImportError:
|
|
HAS_ANTHROPIC = False
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Genre normalization
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_GENRE_MAP: Dict[str, str] = {
    # German → English (standard Jellyfin genre terms)
    "volksmusik": "Folk",
    "volkslieder": "Folk",
    "volkslied": "Folk",
    "heimatlieder": "Folk",
    "schlager": "Schlager",
    "deutsche schlager": "Schlager",
    "marsch": "March",
    "marschmusik": "March",
    "militaermusik": "March",
    "militärmusik": "March",
    "kirchenmusik": "Sacred",
    "chormusik": "Choral",
    "kinderlieder": "Children",
    "weihnachtslieder": "Christmas",
    "weihnachtsmusik": "Christmas",
    "blasmusik": "Brass Band",
    "operette": "Operetta",
    "oper": "Opera",
    "kammermusik": "Chamber Music",
    "klassik": "Classical",
    "classic": "Classical",
    "klassische musik": "Classical",
    "barock": "Baroque",
    "romantik": "Romantic",
    # Unify English variants
    "rhythm and blues": "R&B",
    "rhythmic soul": "R&B",
    "rock and roll": "Rock 'n' Roll",
    "rock & roll": "Rock 'n' Roll",
    "easy listening": "Easy Listening",
    "vocal pop": "Pop",
    "adult contemporary": "Pop",
    "big band": "Big Band",
    "swing music": "Swing",
    "latin jazz": "Latin Jazz",
    "bossa nova": "Bossa Nova",
    "nueva cancion": "Nueva Canción",
}


def normalize_genre(genre: Optional[str]) -> Optional[str]:
    """Map a raw genre string onto the spelling used for tagging.

    Falsy input (``None`` or ``""``) is returned unchanged.  Otherwise the
    stripped, lower-cased value is looked up in ``_GENRE_MAP``.  On a miss,
    input written entirely in upper case or entirely in lower case is
    title-cased; mixed-case input is only stripped.
    """
    if not genre:
        return genre

    trimmed = genre.strip()
    mapped = _GENRE_MAP.get(trimmed.lower())
    if mapped:
        return mapped

    # Title-case only ALL CAPS / all lowercase values; mixed case is assumed
    # to be deliberate and is kept.
    if genre in (genre.upper(), genre.lower()):
        return trimmed.title()
    return trimmed
|
|
|
|
|
|
_MB_RATE_LIMIT = 1.1  # seconds between MusicBrainz requests
# time.monotonic() timestamp of the most recent MusicBrainz request; updated by _mb_wait().
_last_mb_call = 0.0

# Credentials and endpoints are read from the environment once, at import time.
# The API keys default to an empty string when the variable is unset.
ACOUSTID_API_KEY = os.getenv("ACOUSTID_API_KEY", "")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
DISCOGS_TOKEN = os.getenv("DISCOGS_TOKEN", "")
LASTFM_API_KEY = os.getenv("LASTFM_API_KEY", "")
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")

# qwen3:8b (5.2GB) is sufficient for simple JSON metadata completion and loads quickly (~10s)
OLLAMA_RESOLVE_MODEL = os.getenv("OLLAMA_RESOLVE_MODEL", "qwen3:8b")
|
|
|
|
|
|
def _mb_wait():
    """Sleep as long as needed to keep MusicBrainz requests spaced apart.

    Ensures at least ``_MB_RATE_LIMIT`` seconds lie between the previous call
    and this one, then records the current time in ``_last_mb_call``.
    """
    global _last_mb_call
    remaining = _MB_RATE_LIMIT - (time.monotonic() - _last_mb_call)
    if remaining > 0:
        time.sleep(remaining)
    _last_mb_call = time.monotonic()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AcoustID fingerprinting
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _fingerprint_tracks(hints: AlbumHints) -> Dict[str, List[str]]:
    """Fingerprint every track and look up matching recording ids.

    Returns:
        ``{audio_path_str: [mbid, ...]}``.  Only AcoustID results with a
        score of at least 0.90 contribute ids.  The dict is empty when the
        ``acoustid`` module or ``ACOUSTID_API_KEY`` is missing.  A track whose
        fingerprinting or lookup fails is reported on stderr and left out.
    """
    if not (HAS_ACOUSTID and ACOUSTID_API_KEY):
        return {}

    matches_by_path: Dict[str, List[str]] = {}
    for track in hints.tracks:
        try:
            duration, fingerprint = acoustid.fingerprint_file(str(track.path))
            response = acoustid.lookup(
                ACOUSTID_API_KEY,
                fingerprint,
                duration,
                meta="recordings releasegroups",
            )
            recording_ids: List[str] = []
            for hit in response.get("results", []):
                if hit.get("score", 0) < 0.90:
                    continue
                recording_ids.extend(rec["id"] for rec in hit.get("recordings", []))
            matches_by_path[str(track.path)] = recording_ids
        except Exception as e:
            print(f" ⚠️ AcoustID-Fehler {track.path.name}: {e}", file=sys.stderr)
    return matches_by_path
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MusicBrainz lookup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _mb_search_release(artist: Optional[str], album: Optional[str],
                       year: Optional[str]) -> Optional[Dict]:
    """Search MusicBrainz for the best-matching release.

    Builds a Lucene query from whichever of *album*, *artist* and *year* are
    set, requests up to three candidates and picks the one with the highest
    ``ext:score``.

    Returns:
        The release dict as delivered by ``musicbrainzngs``, or ``None`` when
        MusicBrainz support is unavailable, neither artist nor album is
        known, nothing was found, the best score is below 70, or the request
        failed (failures are reported on stderr).
    """
    if not HAS_MB or (not artist and not album):
        return None

    def _phrase(value: str) -> str:
        # Inside a quoted Lucene phrase a raw backslash or double quote would
        # corrupt or terminate the phrase (e.g. an album called: 7" Singles).
        return str(value).replace("\\", "\\\\").replace('"', '\\"')

    query_parts = []
    if album:
        query_parts.append(f'release:"{_phrase(album)}"')
    if artist:
        query_parts.append(f'artist:"{_phrase(artist)}"')
    if year:
        query_parts.append(f'date:{year}')
    query = " AND ".join(query_parts)

    try:
        _mb_wait()
        result = mb.search_releases(query=query, limit=3)
        releases = result.get("release-list", [])
        if not releases:
            return None
        # Take highest-score release
        best = max(releases, key=lambda r: int(r.get("ext:score", 0)))
        score = int(best.get("ext:score", 0))
        if score < 70:
            return None
        return best
    except Exception as e:
        print(f" ⚠️ MusicBrainz-Suchfehler: {e}", file=sys.stderr)
        return None
|
|
|
|
|
|
def _mb_get_release_tracks(release_id: str) -> Optional[Dict]:
    """Fetch one MusicBrainz release including recordings, credits and labels.

    Despite the name, the return value is the complete ``"release"`` dict of
    the response (the caller reads title, date, labels and ``medium-list``
    from it), not a bare list of tracks.

    Returns:
        The release dict, or ``None`` when MusicBrainz support is unavailable,
        the response has no ``"release"`` entry, or the request failed
        (failures are reported on stderr).
    """
    if not HAS_MB:
        return None
    try:
        _mb_wait()
        result = mb.get_release_by_id(
            release_id,
            includes=["recordings", "artist-credits", "labels", "release-groups"],
        )
        return result.get("release")
    except Exception as e:
        print(f" ⚠️ MusicBrainz-Release-Fehler: {e}", file=sys.stderr)
        return None
|
|
|
|
|
|
def _mb_recording_to_release(recording_mbid: str) -> Optional[Dict]:
    """Return the first release MusicBrainz lists for a recording.

    Returns ``None`` when MusicBrainz support is unavailable, the recording
    has no releases, or the request failed (failures are reported on stderr).
    """
    if not HAS_MB:
        return None
    try:
        _mb_wait()
        response = mb.get_recording_by_id(
            recording_mbid,
            includes=["releases", "artist-credits", "release-groups"],
        )
        release_list = response.get("recording", {}).get("release-list", [])
        return release_list[0] if release_list else None
    except Exception as e:
        print(f" ⚠️ MusicBrainz-Recording-Fehler: {e}", file=sys.stderr)
        return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Discogs fallback
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _discogs_search(artist: Optional[str], album: Optional[str]) -> Optional[Dict]:
    """Look up a release on Discogs and summarise the first hit.

    Returns:
        A dict with the keys ``album``, ``artist``, ``year``, ``genre``,
        ``label`` and ``id`` taken from the first search result, or ``None``
        when Discogs support or the token is missing, nothing was found, or
        the request failed (failures are reported on stderr).
    """
    if not (HAS_DISCOGS and DISCOGS_TOKEN):
        return None

    try:
        client = dc.Client("MusicMetadataEnricher/1.0", user_token=DISCOGS_TOKEN)
        hits = client.search(
            album or artist or "",
            artist=artist or "",
            type="release",
        )
        if not hits.count:
            return None
        top = hits[0]
        return {
            "album": top.title,
            "artist": top.artists[0].name if top.artists else None,
            "year": str(top.year) if top.year else None,
            "genre": top.genres[0] if top.genres else None,
            "label": top.labels[0].name if top.labels else None,
            "id": top.id,
        }
    except Exception as e:
        print(f" ⚠️ Discogs-Fehler: {e}", file=sys.stderr)
    return None
|
|
|
|
|
|
def _discogs_parse_tracklist(tracklist: List[Dict]) -> List[Dict]:
    """Convert raw Discogs tracklist entries into disc/number/title dicts."""
    result: List[Dict] = []
    disc = 1
    track_num = 0
    seen_track_since_heading = False
    for entry in tracklist:
        if entry.get("type_") == "heading":
            # Separator row ("CD 1", "Side A", ...).  Only advance the disc
            # when the current one already holds tracks; otherwise a heading
            # in front of the very first track would number that disc 2.
            if seen_track_since_heading:
                disc += 1
                seen_track_since_heading = False
            track_num = 0
            continue

        # Position may be "A1", "1", "1.2", "B3", "2-5", "CD1-3" or missing.
        pos = str(entry.get("position") or "")
        disc_track = re.match(r"^\s*(?:[A-Za-z]+\s*)?(\d+)\s*-\s*(\d+)", pos)
        if disc_track:
            # Explicit "<disc>-<track>": the first number is the disc, not
            # the track, and it overrides whatever the headings implied.
            disc = int(disc_track.group(1))
            track_num = int(disc_track.group(2))
        else:
            num_match = re.search(r"\d+", pos)
            track_num = int(num_match.group()) if num_match else track_num + 1
        seen_track_since_heading = True

        result.append({
            "disc": disc,
            "number": track_num,
            "title": entry.get("title", ""),
            "artist": "",
        })
    return result


def _discogs_get_tracklist(release_id) -> List[Dict]:
    """Fetch the full tracklist of a Discogs release via the REST API.

    The token in ``DISCOGS_TOKEN`` is sent when set; the request is made
    without authorization otherwise.

    Returns:
        A list of ``{"disc", "number", "title", "artist"}`` dicts (``artist``
        is always ``""``).  An empty list is returned on any non-200 response
        or error; errors are reported on stderr.
    """
    try:
        import requests as _req

        headers = {"User-Agent": "MusicMetadataEnricher/1.0"}
        if DISCOGS_TOKEN:
            headers["Authorization"] = f"Discogs token={DISCOGS_TOKEN}"
        r = _req.get(f"https://api.discogs.com/releases/{release_id}",
                     headers=headers, timeout=10)
        if r.status_code != 200:
            return []
        return _discogs_parse_tracklist(r.json().get("tracklist", []))
    except Exception as e:
        print(f" ⚠️ Discogs-Tracklist-Fehler: {e}", file=sys.stderr)
        return []
|
|
|
|
|
|
def _lastfm_tracklist(artist: Optional[str], album: Optional[str]) -> List[Dict]:
    """Fetch the tracklist from Last.fm ``album.getinfo``.

    Requires the ``LASTFM_API_KEY`` environment variable (read on every call)
    and both *artist* and *album*.

    Returns:
        A list of ``{"disc", "number", "title", "artist"}`` dicts with
        ``disc`` fixed to 1.  ``number`` is the track's ``rank``; when the
        rank is missing, non-numeric or not positive, the 1-based position in
        the response is used instead.  An empty list is returned when a
        prerequisite is missing, on a non-200 response, or on error (errors
        are reported on stderr).
    """
    api_key = os.getenv("LASTFM_API_KEY", "")
    if not api_key or not artist or not album:
        return []
    try:
        import requests as _req

        r = _req.get(
            "https://ws.audioscrobbler.com/2.0/",
            params={"method": "album.getinfo", "api_key": api_key,
                    "artist": artist, "album": album, "format": "json"},
            timeout=8,
        )
        if r.status_code != 200:
            return []
        tracks = r.json().get("album", {}).get("tracks", {}).get("track", [])
        if isinstance(tracks, dict):  # single track → list
            tracks = [tracks]
        result = []
        for index, t in enumerate(tracks, start=1):
            attr = t.get("@attr") or {}
            # A single malformed rank must not discard the whole tracklist,
            # and a number of 0 could never be matched to a local track.
            try:
                num = int(attr.get("rank") or 0)
            except (TypeError, ValueError):
                num = 0
            if num <= 0:
                num = index
            result.append({
                "disc": 1,
                "number": num,
                "title": t.get("name", ""),
                "artist": t.get("artist", {}).get("name", "") if isinstance(t.get("artist"), dict) else "",
            })
        return result
    except Exception as e:
        print(f" ⚠️ Last.fm-Tracklist-Fehler: {e}", file=sys.stderr)
        return []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# LLM reasoning via Ollama / OpenRouter (optional; the Claude API itself is not called)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _build_resolve_prompt(hints: AlbumHints, partial: Dict) -> str:
    """Build the (German) LLM prompt asking for corrected album metadata.

    Args:
        hints: Local album hints; the directory name, directory-derived
            artist/album/year, optional YouTube title/uploader, optional
            tracklist text and the first 20 tracks are included.
        partial: Values resolved so far; ``"album"`` and ``"year"`` are used
            as fallbacks when the directory name supplied none.

    Returns:
        The prompt text, ending with the JSON schema the model must answer in.
    """
    tracks_summary = "\n".join(
        f" - {('D'+str(t.disc_number)+'-') if t.disc_number else ''}T{t.track_number or '?'}: "
        f"{t.title or t.path.stem}"
        + (f" [{t.artist}]" if t.artist else "")
        for t in hints.tracks[:20]
    )

    # Header lines of the tracklist file (roughly the first 400 characters,
    # before the actual track list) tend to carry album/label information.
    tracklist_header = ""
    if hints.tracklist_text:
        header_lines = []
        for line in hints.tracklist_text.splitlines():
            line = line.strip()
            if not line:
                continue
            # Stop at the first line that looks like a track (1-1, 1. etc.)
            if re.match(r"^\d[\d\-]\s+\S", line) or re.match(r"^\d{1,3}[.)]\s+", line):
                break
            header_lines.append(line)
            if sum(len(l) for l in header_lines) > 400:
                break
        tracklist_header = "\n".join(header_lines[:15])

    # `partial` may contain the keys with a None value, so a .get() default
    # alone would put the literal text "None" into the prompt.
    album_hint = hints.dir_album or partial.get("album") or "?"
    year_hint = hints.dir_year or partial.get("year") or "unbekannt"

    return (
        "Du bist ein Musikexperte. Analysiere diese Album-Daten und gib korrekte Metadaten zurück.\n"
        "Korrigiere auch erkennbare Tippfehler (Verzeichnisnamen enthalten oft Schreibfehler).\n\n"
        "WICHTIGE FELDDEFINITIONEN:\n"
        '- "artist" = Komponist (Klassik) ODER Band/Sänger (Pop/Rock/Jazz)\n'
        '- "albumartist" = Interpret/Performer/Dirigent (Klassik) ODER gleich wie artist (Pop)\n'
        " Beispiel Klassik: artist='Johann Sebastian Bach', albumartist='Peter Hurford'\n"
        " Beispiel Pop: artist='ABBA', albumartist='ABBA'\n\n"
        f"Verzeichnisname: {hints.album_dir.name}\n"
        f"Hinweis Künstler/Titel (aus Verzeichnis, kann vertauscht oder falsch sein): "
        f"{hints.dir_artist or '?'} / {album_hint}\n"
        f"Jahr: {year_hint}\n"
        + (f"YouTube-Videotitel: {hints.yt_title}\n" if hints.yt_title else "")
        + (f"YouTube-Uploader/Kanal: {hints.yt_uploader}\n" if hints.yt_uploader else "")
        + (f"Tracklist-Kopf (Label/Jahr/Albumtitel):\n{tracklist_header}\n\n" if tracklist_header else "")
        + f"Tracks:\n{tracks_summary}\n\n"
        'Antworte NUR mit einem JSON-Objekt (null wenn unbekannt):\n'
        '{"artist": ..., "album": ..., "albumartist": ..., "year": ..., "genre": ..., "label": ...}'
    )
|
|
|
|
|
|
def _parse_json_response(text: str) -> Optional[Dict]:
    """Extract the JSON object from an LLM reply.

    The span from the first ``{`` to the last ``}`` is tried first.  If that
    is not valid JSON (for example because the reply has stray braces before
    or after the object), every ``{`` is tried in turn as the start of an
    object and the first one that decodes to a dict wins.

    Returns:
        The decoded dict, or ``None`` when *text* is empty or holds no
        decodable JSON object.
    """
    import json
    import re

    if not text:
        return None

    m = re.search(r"\{.*\}", text, re.DOTALL)
    if m:
        try:
            parsed = json.loads(m.group())
        except (ValueError, RecursionError):
            parsed = None
        if isinstance(parsed, dict):
            return parsed

    # Fallback: decode an object starting at each "{" and ignore whatever
    # follows it in the text.
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            parsed, _end = decoder.raw_decode(text, start)
        except (ValueError, RecursionError):
            parsed = None
        if isinstance(parsed, dict):
            return parsed
        start = text.find("{", start + 1)
    return None
|
|
|
|
|
|
def _resolve_via_ollama(hints: AlbumHints, partial: Dict) -> Optional[Dict]:
    """Local reasoning via Ollama (no API key required).

    Posts the resolve prompt to ``{OLLAMA_HOST}/api/chat`` using
    ``OLLAMA_RESOLVE_MODEL`` and parses the reply as JSON.

    Returns:
        The parsed dict, or ``None`` when the reply holds no JSON object or
        the request failed (failures are reported on stderr).
    """
    import json
    import urllib.request

    prompt = _build_resolve_prompt(hints, partial)
    request_body = {
        "model": OLLAMA_RESOLVE_MODEL,
        "messages": [{"role": "user", "content": prompt}],
        "stream": False,
        "format": "json",
        "options": {"temperature": 0.1},
    }
    encoded = json.dumps(request_body).encode()

    try:
        request = urllib.request.Request(
            f"{OLLAMA_HOST}/api/chat",
            data=encoded,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=240) as resp:
            reply = json.loads(resp.read())
        content = reply.get("message", {}).get("content", "").strip()
        return _parse_json_response(content)
    except Exception as e:
        print(f" ⚠️ Ollama-Resolve-Fehler: {e}", file=sys.stderr)
        return None
|
|
|
|
|
|
def _resolve_via_openrouter(hints: AlbumHints, partial: Dict) -> Optional[Dict]:
    """Reasoning via OpenRouter using an inexpensive hosted model.

    Returns:
        The parsed JSON dict from the model's reply, or ``None`` when
        ``OPENROUTER_API_KEY`` is unset, the reply holds no JSON object, or
        the request failed (failures are reported on stderr).
    """
    if not OPENROUTER_API_KEY:
        return None

    import json
    import urllib.request

    prompt = _build_resolve_prompt(hints, partial)
    request_body = {
        # DeepSeek V3: extremely cheap, very capable
        "model": "deepseek/deepseek-chat-v3-0324",
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.1,
        "max_tokens": 300,
    }
    encoded = json.dumps(request_body).encode()
    request_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "HTTP-Referer": "https://pi.local",
        "X-Title": "MusicMetadataEnricher",
    }

    try:
        request = urllib.request.Request(
            "https://openrouter.ai/api/v1/chat/completions",
            data=encoded,
            headers=request_headers,
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=30) as resp:
            reply = json.loads(resp.read())
        content = reply["choices"][0]["message"]["content"].strip()
        return _parse_json_response(content)
    except Exception as e:
        print(f" ⚠️ OpenRouter-Resolve-Fehler: {e}", file=sys.stderr)
        return None
|
|
|
|
|
|
def _claude_resolve(hints: AlbumHints, partial: Dict) -> Optional[Dict]:
    """Ask an LLM to fill in or correct album metadata.

    Order: Ollama (local, free) → OpenRouter (cheap).  Despite the function
    name, the Claude API is deliberately not used (too expensive).

    Returns:
        The first non-empty answer dict, or ``None`` when no backend
        produced one.
    """
    # 1. Local Ollama (preferred — free of charge)
    local_answer = _resolve_via_ollama(hints, partial)
    if local_answer:
        return local_answer

    # 2. OpenRouter (DeepSeek V3, cheap), only when a key is configured
    if not OPENROUTER_API_KEY:
        return None
    remote_answer = _resolve_via_openrouter(hints, partial)
    return remote_answer if remote_answer else None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main resolver
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def resolve(
    hints: AlbumHints,
    use_fingerprint: bool = True,
    use_api: bool = True,
    use_claude: bool = True,
) -> AlbumProposal:
    """Resolve album-level metadata and per-track proposals for one album.

    Sources are consulted in this order, each one only filling values that
    are still missing unless noted otherwise:

    1. Local hints: directory name, existing tags (majority vote for artist
       and album), YouTube uploader/title.                  (+0.05)
    2. AcoustID fingerprints (+0.20) and the MusicBrainz release of the
       first matched recording (+0.25).
    3. MusicBrainz text search when no release id is known yet
       (+0.30 * score/100).
    4. The full MusicBrainz release, which also supplies the track list.
    5. Discogs, when no MusicBrainz release was found       (+0.15).
    6. Track list fallbacks: Discogs, then Last.fm.
    7. LLM reasoning when artist or album is missing or confidence is below
       0.5 (+0.10).  Below 0.3 the LLM may override existing values.

    Args:
        hints: Everything known locally about the album.
        use_fingerprint: Allow AcoustID fingerprinting.
        use_api: Allow any network lookup; ``False`` disables steps 2-7.
        use_claude: Allow the LLM step.

    Returns:
        An ``AlbumProposal`` whose confidence is capped at 1.0.
    """
    from collections import Counter

    confidence = 0.0
    sources: List[str] = []
    notes: List[str] = []

    artist = hints.dir_artist
    album = hints.dir_album
    year = hints.dir_year
    genre: Optional[str] = None
    label: Optional[str] = None
    release_mbid: Optional[str] = None
    mb_tracks: Optional[List] = None

    # Collect artist/album from existing tags (majority vote)
    tag_artists = [t.existing_tags.get("artist") for t in hints.tracks if t.existing_tags.get("artist")]
    tag_albums = [t.existing_tags.get("album") for t in hints.tracks if t.existing_tags.get("album")]
    if tag_artists:
        artist = artist or Counter(tag_artists).most_common(1)[0][0]
    if tag_albums:
        album = album or Counter(tag_albums).most_common(1)[0][0]

    # Tag year/genre/label: the first track that carries a value wins
    for t in hints.tracks:
        raw_year = t.existing_tags.get("date") or t.existing_tags.get("year")
        if raw_year and not year:
            # Strip invisible chars so ID3TimeStamp validation doesn't fail later
            year = re.sub(r"[^\d\-T:+Z]", "", str(raw_year)).strip()[:10] or None
        genre = genre or t.existing_tags.get("genre")
        label = label or t.existing_tags.get("label") or t.existing_tags.get("organization")

    # YouTube metadata as additional hints (uploader → artist, title → album)
    if hints.yt_uploader and not artist:
        artist = hints.yt_uploader
    if hints.yt_title and not album:
        album = hints.yt_title

    if artist or album:
        confidence += 0.05
        sources.append("local-hints")
    if hints.yt_title or hints.yt_uploader:
        sources.append("youtube")

    # AcoustID fingerprinting
    fp_mbids: Dict[str, List[str]] = {}
    if use_fingerprint and use_api and HAS_ACOUSTID and ACOUSTID_API_KEY:
        fp_mbids = _fingerprint_tracks(hints)
        if fp_mbids:
            confidence += 0.20
            sources.append("acoustid")
            # Try to get release from first matched recording
            for mbids in fp_mbids.values():
                for mbid in mbids[:1]:
                    rel = _mb_recording_to_release(mbid)
                    if rel:
                        release_mbid = rel.get("id")
                        confidence += 0.25
                        sources.append("musicbrainz-fingerprint")
                        break
                if release_mbid:
                    break

    # MusicBrainz text search
    if use_api and HAS_MB and not release_mbid:
        mb_result = _mb_search_release(artist, album, year)
        if mb_result:
            release_mbid = mb_result.get("id")
            score = int(mb_result.get("ext:score", 0))
            confidence += 0.30 * (score / 100)
            sources.append("musicbrainz-text")
            notes.append(f"MusicBrainz score: {score}")

    # Fetch full release data
    if use_api and release_mbid:
        full_release = _mb_get_release_tracks(release_mbid)
        if full_release:
            if not artist:
                # Credit entries are dicts; join phrases (" & ", " feat. ")
                # may also appear as plain strings between them and must be
                # kept, otherwise the names run together.
                credit_parts = []
                for c in full_release.get("artist-credit", []):
                    if isinstance(c, dict):
                        credit_parts.append(
                            c.get("artist", {}).get("name", "") + c.get("joinphrase", "")
                        )
                    elif isinstance(c, str):
                        credit_parts.append(c)
                artist = "".join(credit_parts).strip() or artist
            if not album:
                album = full_release.get("title", album)
            if not year:
                year = full_release.get("date", "")[:4] or None
            label_info = full_release.get("label-info-list", [])
            if label_info and not label:
                label = label_info[0].get("label", {}).get("name")
            rg = full_release.get("release-group", {})
            if not genre:
                # NOTE(review): "primary-type" looks like the release-group
                # type rather than a musical genre — confirm this is intended.
                genre = (rg.get("primary-type") or "").strip() or None

            mb_tracks = []
            for medium in full_release.get("medium-list", []):
                # The medium position may arrive as a numeric string; the
                # track matcher compares discs against ints, so normalise it.
                try:
                    medium_disc = int(medium.get("position", 1) or 1)
                except (TypeError, ValueError):
                    medium_disc = 1
                for track in medium.get("track-list", []):
                    # Reset per track so a vinyl side computed for one track
                    # cannot leak into the following tracks of the medium.
                    disc_num = medium_disc
                    raw_num = str(track.get("number", "0") or "0").strip()
                    # Vinyl: "A1"→track 1 disc 1, "B3"→track 3 disc 1, "C1"→disc 2, "D1"→disc 2
                    vinyl_match = re.match(r"^([A-Za-z])(\d+)$", raw_num)
                    if vinyl_match:
                        side_letter = vinyl_match.group(1).upper()
                        track_number = int(vinyl_match.group(2))
                        # A,B → disc 1; C,D → disc 2; E,F → disc 3 …
                        disc_num = (ord(side_letter) - ord("A")) // 2 + 1
                    else:
                        num_match = re.search(r"\d+", raw_num)
                        track_number = int(num_match.group()) if num_match else 0
                    mb_tracks.append({
                        "disc": disc_num,
                        "number": track_number,
                        "title": track.get("recording", {}).get("title", ""),
                        "artist": track.get("artist-credit-phrase", ""),
                        "mbid": track.get("recording", {}).get("id"),
                    })

    # Discogs fallback
    discogs_release_id = None
    if use_api and HAS_DISCOGS and DISCOGS_TOKEN and not release_mbid:
        dg = _discogs_search(artist, album)
        if dg:
            artist = artist or dg.get("artist")
            album = album or dg.get("album")
            year = year or dg.get("year")
            genre = genre or dg.get("genre")
            label = label or dg.get("label")
            discogs_release_id = dg.get("id")
            confidence += 0.15
            sources.append("discogs")

    # Track list fallbacks: Discogs → Last.fm (when MusicBrainz delivered no tracks)
    if use_api and not mb_tracks:
        if discogs_release_id:
            dg_tracks = _discogs_get_tracklist(discogs_release_id)
            if dg_tracks:
                mb_tracks = dg_tracks
                sources.append("discogs-tracklist")
        if not mb_tracks:
            lfm_tracks = _lastfm_tracklist(artist, album)
            if lfm_tracks:
                mb_tracks = lfm_tracks
                sources.append("lastfm-tracklist")

    # LLM reasoning for the remaining gaps.
    # Order: local Ollama → OpenRouter (DeepSeek, cheap).
    cl_albumartist: Optional[str] = None
    partial = {"artist": artist, "album": album, "year": year}
    if use_claude and use_api:
        if not artist or not album or confidence < 0.5:
            cl = _claude_resolve(hints, partial)
            if cl:
                # Models may answer with a bare number for the year; the
                # other sources all deliver the date as a string.
                raw_llm_year = cl.get("year")
                llm_year = (str(raw_llm_year).strip() or None) if raw_llm_year else None
                if confidence < 0.3:
                    # Very uncertain: the LLM may also correct existing values
                    # (e.g. typos in an album title taken from the directory name)
                    artist = cl.get("artist") or artist
                    album = cl.get("album") or album
                    year = llm_year or year
                    genre = cl.get("genre") or genre
                    label = cl.get("label") or label
                else:
                    artist = artist or cl.get("artist")
                    album = album or cl.get("album")
                    year = year or llm_year
                    genre = genre or cl.get("genre")
                    label = label or cl.get("label")
                cl_albumartist = cl.get("albumartist") or None
                confidence += 0.10
                sources.append("llm-resolve")

    # Finalize albumartist
    # Priority: (1) LLM albumartist at low confidence
    #           (2) dir_artist when the directory name names an artist
    #           (3) heuristics (Various Artists, majority vote)
    # Rationale: "Bach_Organ_-_Peter_Hurford" → dir_artist="Bach Organ" is not an
    # artist although the directory name looks like one; the LLM can resolve that.
    track_artists = [t.artist for t in hints.tracks if t.artist]
    distinct_artists = set(track_artists)

    _bad_aa = {"various artists", "unknown artist", "unknown", "va"}

    def _good_aa(s: Optional[str]) -> bool:
        # Non-string LLM output (e.g. a list) is never a usable album artist.
        return isinstance(s, str) and bool(s) and s.casefold().strip() not in _bad_aa

    if _good_aa(cl_albumartist) and confidence < 0.4:
        # The LLM knows the real album artist better than the directory name
        albumartist = cl_albumartist  # type: ignore[assignment]
    elif hints.dir_artist:
        albumartist = hints.dir_artist
    elif len(distinct_artists) >= 3:
        albumartist = "Various Artists"
    elif track_artists:
        albumartist = artist or Counter(track_artists).most_common(1)[0][0]
    else:
        albumartist = artist or "Unknown Artist"

    album = album or hints.album_dir.name.replace("_", " ")
    artist = artist or albumartist
    confidence = min(confidence, 1.0)

    # Build track proposals
    # `artist` = composer/main artist (LLM-resolved), `albumartist` = performer.
    # Both are passed on so that _build_track_proposals can assign them correctly.
    track_proposals = _build_track_proposals(hints, mb_tracks, album, albumartist, composer=artist)

    return AlbumProposal(
        album_dir=hints.album_dir,
        album=album,
        albumartist=albumartist,
        date=year,
        genre=normalize_genre(genre),
        label=label,
        mbid=release_mbid,
        cover_path=None,
        cover_source=None,
        tracks=track_proposals,
        confidence=confidence,
        sources=sources,
        notes=notes,
    )
|
|
|
|
|
|
def _build_track_proposals(
    hints: AlbumHints,
    mb_tracks: Optional[List],
    album: str,
    album_artist: str,
    composer: Optional[str] = None,
) -> List[TrackProposal]:
    """Create one ``TrackProposal`` per local track.

    Tracks are processed in (disc, track number, path) order.  When a track
    list is available, an entry with the same track number and disc supplies
    the title and artist.

    Args:
        hints: Local album hints whose ``tracks`` are turned into proposals.
        mb_tracks: Track dicts with ``disc``, ``number``, ``title`` and
            ``artist`` keys, or ``None``/empty when no track list was found.
        album: Album title (currently not used by this function).
        album_artist: Fallback artist for tracks without one.
        composer: When set and a track's own artist equals *album_artist*,
            the composer becomes the track artist (classical-music case).

    Returns:
        The proposals; every one has a track number.
    """
    def _disc_key(value):
        # Track lists may carry the disc as an int or as a numeric string.
        try:
            return int(value)
        except (TypeError, ValueError):
            return value

    proposals: List[TrackProposal] = []
    aa_cf = (album_artist or "").casefold().strip()

    for th in sorted(hints.tracks, key=lambda t: (t.disc_number or 1, t.track_number or 9999, str(t.path))):
        title = th.title
        track_num = th.track_number
        disc_num = th.disc_number

        # Classical case: performer from the file name, composer from the LLM.
        # If th.artist equals the album artist (performer) and the composer is
        # known, the composer becomes the track artist.
        th_artist_cf = (th.artist or "").casefold().strip()
        if composer and th_artist_cf and th_artist_cf == aa_cf:
            artist = composer
        else:
            artist = th.artist or album_artist

        # Try to match from the external track list
        if mb_tracks and track_num:
            wanted_disc = _disc_key(disc_num or 1)
            for mb_t in mb_tracks:
                if mb_t["number"] == track_num and _disc_key(mb_t["disc"]) == wanted_disc:
                    if mb_t.get("title"):
                        title = mb_t["title"]
                    if mb_t.get("artist"):
                        artist = mb_t["artist"]
                    break

        title = title or th.path.stem

        # NOTE(review): the matched entry's recording "mbid" is not passed
        # on here — confirm whether that is intended.
        proposals.append(TrackProposal(
            path=th.path,
            title=title,
            artist=artist,
            track_number=track_num,
            disc_number=disc_num,
            mbid=None,
        ))

    # Sequential numbering as the last fallback: tracks without a number get
    # a running number per disc, which avoids "00" and "??" in file names on
    # --rename.  Counting continues after the highest number already used on
    # that disc so that a generated number cannot collide with an existing one.
    if any(p.track_number is None for p in proposals):
        disc_counters: Dict[int, int] = {}
        for p in proposals:
            if p.track_number is not None:
                disc = p.disc_number or 1
                disc_counters[disc] = max(disc_counters.get(disc, 0), p.track_number)
        for p in proposals:
            if p.track_number is None:
                disc = p.disc_number or 1
                disc_counters[disc] = disc_counters.get(disc, 0) + 1
                p.track_number = disc_counters[disc]

    return proposals
|