Music_Metadata_Enricher/hint_extractor.py
dschlueter cd6c0ae185 Fix crash on vinyl track positions like 'A1', 'B2' from MusicBrainz
MusicBrainz returns vinyl track numbers as 'A1', 'B3' etc. instead of
plain integers. int('A1') raised ValueError crashing the entire album.

metadata_resolver.py: parse vinyl positions with regex before int():
- 'A1' → track 1, disc 1 (side A)
- 'B3' → track 3, disc 1 (side B)
- 'C1' → track 1, disc 2 (side C)
- Non-vinyl: extract first digit group via re.search

hint_extractor.py: guard int(tl_track) in tracklist matching with
try/except + re.search so any non-numeric track position is skipped
gracefully instead of crashing.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 09:12:31 +02:00

649 lines
24 KiB
Python
Executable file
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import base64
import json
import os
import re
import shutil
import subprocess
import sys
import urllib.request
from pathlib import Path
from typing import Optional, List, Dict, Tuple
from models import AlbumScan, AlbumHints, TrackHints
try:
from mutagen import File as MutagenFile
HAS_MUTAGEN = True
except ImportError:
HAS_MUTAGEN = False
try:
from bs4 import BeautifulSoup
HAS_BS4 = True
except ImportError:
HAS_BS4 = False
# Matches (and captures) a run of digits.
# NOTE(review): not referenced in the visible part of this file — presumably used for natural sorting; confirm.
_NATSORT_RE = re.compile(r"(\d+)")
# Placeholder values (compared after cleaning + casefold) that count as "no real metadata".
_BAD_VALUES = {"unknown", "unknown artist", "unknown album", "untitled", "track", "va", "various"}
# Filename patterns: most specific first
#   1. disc + track + artist + title
#   2. disc + track + title
#   3. track + artist + title
#   4. track + title
#   5. artist + title
_FILENAME_PATTERNS = [
    re.compile(r"^(?P<disc>\d{1,2})[- _]+(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<artist>.+?)\s*[-]\s*(?P<title>.+)$"),
    re.compile(r"^(?P<disc>\d{1,2})[- _]+(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<title>.+)$"),
    re.compile(r"^(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<artist>.+?)\s*[-]\s*(?P<title>.+)$"),
    re.compile(r"^(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<title>.+)$"),
    re.compile(r"^(?P<artist>.+?)\s*[-]\s*(?P<title>.+)$"),
]
# Directory name patterns
#   1. artist - album [year]
#   2. artist year album
#   3. album year
_DIR_PATTERNS = [
    re.compile(r"^(?P<artist>.+?)[_ -]+[-][_ -]+(?P<album>.+?)(?:[_ -]+(?P<year>\d{4}))?$"),
    re.compile(r"^(?P<artist>.+?)[_ ]+(?P<year>\d{4})[._ -]+(?P<album>.+)$"),
    re.compile(r"^(?P<album>.+?)[_ -]+(?P<year>\d{4})$"),
]
# Tracklist line patterns (an optional trailing "m:ss" duration is excluded from the title)
_TRACKLIST_PATTERNS = [
    re.compile(r"^(?P<disc>\d{1,2})[- _](?P<track>\d{1,3})\s+(?P<title>.+?)(?:\s+\d+:\d{2})?$"),
    # The separator must be . ) or : — a bare space is not enough
    # (prevents false positives such as "2 x CD, Compilation, Remastered")
    re.compile(r"^(?P<track>\d{1,3})[.):]\s*(?P<title>.+?)(?:\s+\d+:\d{2})?$"),
    # Letter + number positions such as "A1" or "B12" (e.g. vinyl sides)
    re.compile(r"^(?P<track>[A-Z]\d{1,2})[.):]\s*(?P<title>.+?)(?:\s+\d+:\d{2})?$"),
]
# Disc/side marker ("CD 1", "Disc2", "side-3", ...), case-insensitive; group 1 is the number.
_DISC_SECTION_RE = re.compile(r"(?i)(?:cd|disc|disk|side)[_ \-]*(\d{1,2})")
def _clean(s: Optional[str]) -> str:
if not s:
return ""
# BOM (U+FEFF), Zero-Width-Space (U+200B), Soft-Hyphen (U+00AD) entfernen
s = re.sub(r"[­]", "", s)
return re.sub(r"\s+", " ", s.replace("_", " ")).strip(" -._")
def _norm_for_match(s: str) -> str:
"""Nur Buchstaben und Ziffern — für fuzzy Titelvergleich (Interpunktion-agnostisch)."""
return re.sub(r"[^a-z0-9]", "", s.casefold())
# Klassische Werkverzeichnis-Nummern: BWV 565, Op. 27, K. 331, HWV 56, …
_CATALOG_RE = re.compile(
r"\b(bwv|hwv|op|k|kv|d|sz|wq|bbwv|rv|twv|hob)\W*(\d+[a-z]?(?:[\/\.]\d+)?)",
re.IGNORECASE,
)
def _catalog_key(s: str) -> Optional[str]:
"""Extrahiert normalisierte Katalognummer, z.B. 'bwv565' oder 'op27'."""
m = _CATALOG_RE.search(s)
if m:
return m.group(1).lower() + re.sub(r"\W", "", m.group(2))
return None
def _is_good(v: Optional[str]) -> bool:
    """Tell whether *v* carries real metadata.

    Returns False for None/empty values and for values whose cleaned,
    case-folded form is one of the placeholders in ``_BAD_VALUES``.
    """
    return bool(v) and _clean(v).casefold() not in _BAD_VALUES
def _parse_dirname(name: str) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Split an album directory name into (artist, album, year).

    The cleaned name is tried against ``_DIR_PATTERNS`` in order. A match is
    accepted only if its artist or album is a usable value (see
    ``_is_good``); otherwise the next pattern is tried. When nothing is
    accepted, the whole cleaned name is returned as the album.
    """
    cleaned = _clean(name)
    for pattern in _DIR_PATTERNS:
        match = pattern.match(cleaned)
        if match is None:
            continue
        groups = match.groupdict()
        artist = _clean(groups.get("artist")) or None
        album = _clean(groups.get("album")) or None
        if not (_is_good(artist) or _is_good(album)):
            continue
        return artist, album, groups.get("year")
    # No pattern accepted: treat the whole name as the album title.
    return None, _clean(cleaned), None
def _parse_filename(stem: str) -> Dict[str, str]:
    """Extract disc/track/artist/title fields from a file stem.

    The first pattern in ``_FILENAME_PATTERNS`` that matches the cleaned stem
    wins; its non-empty groups are returned cleaned. If no pattern matches,
    the cleaned stem is returned as the title.
    """
    cleaned = _clean(stem)
    attempts = (pattern.match(cleaned) for pattern in _FILENAME_PATTERNS)
    first_hit = next((m for m in attempts if m), None)
    if first_hit is None:
        return {"title": cleaned}
    return {
        field: _clean(value)
        for field, value in first_hit.groupdict().items()
        if value
    }
def _read_tags(path: Path) -> Tuple[Dict[str, str], Optional[float]]:
    """Read a fixed set of tags and the duration from an audio file.

    Returns:
        ``(tags, duration)``. ``tags`` maps tag name to the stripped first
        value; "date" is filled from "year" when only "year" is present.
        ``({}, None)`` is returned when mutagen is unavailable, the file
        yields no usable object, or reading fails (the error is reported on
        stderr).
    """
    if not HAS_MUTAGEN:
        return {}, None
    wanted = (
        "title", "artist", "album", "albumartist", "tracknumber",
        "discnumber", "date", "year", "genre", "label", "organization",
    )
    try:
        audio = MutagenFile(str(path), easy=True)
        if not audio:
            return {}, None
        tags: Dict[str, str] = {}
        for key in wanted:
            values = audio.get(key)
            if values:
                tags[key] = str(values[0]).strip()
        if "year" in tags:
            tags.setdefault("date", tags["year"])
        length: Optional[float] = None
        info = getattr(audio, "info", None)
        if info and hasattr(info, "length"):
            length = info.length
        return tags, length
    except Exception as e:
        print(f" ⚠️ Tag-Lesefehler {path.name}: {e}", file=sys.stderr)
        return {}, None
_STANDALONE_NUM_RE = re.compile(r"^\d{1,3}$")
_DURATION_ONLY_RE = re.compile(r"^\d{1,2}:\d{2}$")
def _normalize_vertical_tracklist(text: str) -> Optional[str]:
"""
Erkennt 'vertikales' Format:
1
Katka dovádí
3:22
2
Záludná
→ konvertiert zu '1. Katka dovádí 3:22\\n2. Záludná ...'
"""
non_empty = [l.strip() for l in text.splitlines() if l.strip()]
# Mindestens 3 Standalone-Zahlen als Heuristik
num_lines = sum(1 for l in non_empty if _STANDALONE_NUM_RE.match(l))
if num_lines < 3:
return None
result = []
i = 0
while i < len(non_empty):
line = non_empty[i]
if _STANDALONE_NUM_RE.match(line) and i + 1 < len(non_empty):
title_candidate = non_empty[i + 1]
# Nächste Zeile darf selbst keine Zahl und keine Dauer sein
if not _STANDALONE_NUM_RE.match(title_candidate) and not _DURATION_ONLY_RE.match(title_candidate):
duration = ""
skip = 2
if i + 2 < len(non_empty) and _DURATION_ONLY_RE.match(non_empty[i + 2]):
duration = non_empty[i + 2]
skip = 3
entry = f"{line}. {title_candidate}"
if duration:
entry += f" {duration}"
result.append(entry)
i += skip
continue
i += 1
return "\n".join(result) if len(result) >= 3 else None
def _parse_tracklist(text: str) -> List[Dict[str, str]]:
    """Parse tracklist text into a list of ``{title, track?, disc}`` dicts.

    Vertical tracklists are normalised first. A short line (< 30 characters)
    starting with a disc/side marker switches the current disc. Every other
    line is tried against ``_TRACKLIST_PATTERNS``; the first matching pattern
    decides. Purely numeric track numbers lose their leading zeros, other
    positions (e.g. "A1") are kept verbatim. Entries with an empty title
    are dropped.
    """
    # Normalise the vertical format before the regular line parsing runs.
    text = _normalize_vertical_tracklist(text) or text

    parsed: List[Dict[str, str]] = []
    active_disc = 1
    for raw_line in text.splitlines():
        stripped = raw_line.strip()
        if not stripped:
            continue

        header = _DISC_SECTION_RE.match(stripped)
        if header is not None and len(stripped) < 30:
            active_disc = int(header.group(1))
            continue

        attempts = (pattern.match(stripped) for pattern in _TRACKLIST_PATTERNS)
        hit = next((m for m in attempts if m), None)
        if hit is None:
            continue

        fields = hit.groupdict()
        item: Dict[str, str] = {"title": _clean(fields.get("title", ""))}
        number = fields.get("track", "")
        if number:
            item["track"] = (number.lstrip("0") or "0") if number.isdigit() else number
        # An explicit disc in the line beats the running disc section.
        item["disc"] = fields.get("disc") or str(active_disc)
        if item.get("title"):
            parsed.append(item)
    return parsed
def _parse_m3u(text: str) -> List[Dict[str, str]]:
"""M3U/M3U8 → geordnete Liste: [{filename, title, position}].
Reihenfolge der Einträge = gewünschte Trackreihenfolge.
"""
tracks: List[Dict[str, str]] = []
pending_title: Optional[str] = None
position = 0
for line in text.splitlines():
line = line.strip()
if not line:
continue
if line.upper().startswith("#EXTINF:"):
parts = line.split(",", 1)
pending_title = parts[1].strip() if len(parts) > 1 else None
elif not line.startswith("#"):
filename = Path(line.replace("\\", "/")).name
if not filename:
continue
position += 1
tracks.append({
"position": str(position),
"filename": filename,
"title": pending_title or "",
})
pending_title = None
return tracks
def _read_tracklist_file(path: Path) -> Optional[str]:
try:
if path.suffix.lower() in (".htm", ".html"):
raw = path.read_bytes()
encoding = "utf-8"
for enc in ("utf-8", "latin-1", "cp1252"):
try:
raw.decode(enc)
encoding = enc
break
except UnicodeDecodeError:
continue
text = raw.decode(encoding, errors="replace")
if HAS_BS4:
soup = BeautifulSoup(text, "html.parser")
return soup.get_text(separator="\n")
# Fallback: strip HTML tags
return re.sub(r"<[^>]+>", " ", text)
else:
for enc in ("utf-8", "latin-1", "cp1252"):
try:
return path.read_text(encoding=enc)
except UnicodeDecodeError:
continue
except Exception as e:
print(f" ⚠️ Tracklist-Lesefehler {path.name}: {e}", file=sys.stderr)
return None
# Base URL of the Ollama server; taken from the OLLAMA_HOST environment variable when set.
_OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")
# Models in priority order; can be overridden via OLLAMA_OCR_MODEL
# (comma-separated list; blank items are ignored).
_OCR_MODELS = [m.strip() for m in os.getenv(
    "OLLAMA_OCR_MODEL",
    "qwen3-vl:latest,minicpm-v:latest,deepseek-ocr:latest"
).split(",") if m.strip()]
# Prompt sent along with the image. The model is told to answer with the
# sentinel NO_TRACKLIST when the image shows no tracklist.
_OCR_PROMPT = (
    "This image shows a CD album back cover or booklet page. "
    "Your task: extract the complete tracklist as plain text.\n"
    "Rules:\n"
    "- Output track number and title per line, e.g. '1. Title' or '1-1 Title'\n"
    "- If multiple discs/CDs: include a header like 'CD 1' or 'Disc 1' before each group\n"
    "- Include durations if visible (e.g. '1. Title 4:32')\n"
    "- Do NOT include label info, barcodes, or other non-tracklist text\n"
    "- If no tracklist is visible, reply with: NO_TRACKLIST"
)
def _ocr_back_cover(image_files: List[Path]) -> Optional[str]:
    """OCR a back-cover or booklet image through the Ollama chat API.

    Exactly one image is processed: the first whose name contains a
    back-cover keyword, or — if there is none — the first whose name does
    not look like a front cover. The models in ``_OCR_MODELS`` are tried in
    order; a failing model or an empty answer moves on to the next one.

    Returns:
        The recognised text, or None when there is no candidate image, the
        image cannot be read, a model answers NO_TRACKLIST, or no model
        produced text.
    """
    back_keywords = ("back", "inlay", "booklet", "inside", "rear")
    front_keywords = ("front", "folder", "cover")

    def _name_has(image: Path, keywords: Tuple[str, ...]) -> bool:
        lowered = image.name.lower()
        return any(kw in lowered for kw in keywords)

    # Prefer images that look like a back cover ...
    candidates = [p for p in image_files if _name_has(p, back_keywords)]
    # ... otherwise fall back to everything that is not a front cover.
    if not candidates:
        candidates = [p for p in image_files if not _name_has(p, front_keywords)]
    if not candidates:
        return None

    image_path = candidates[0]
    try:
        img_b64 = base64.b64encode(image_path.read_bytes()).decode()
    except Exception as e:
        print(f" ⚠️ OCR-Bild lesen {image_path.name}: {e}", file=sys.stderr)
        return None

    endpoint = f"{_OLLAMA_HOST}/api/chat"
    for model in _OCR_MODELS:
        request_body = {
            "model": model,
            "messages": [{
                "role": "user",
                "content": _OCR_PROMPT,
                "images": [img_b64],
            }],
            "stream": False,
            "options": {"temperature": 0.0},
        }
        payload = json.dumps(request_body).encode()
        try:
            req = urllib.request.Request(
                endpoint,
                data=payload,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            with urllib.request.urlopen(req, timeout=180) as resp:
                data = json.loads(resp.read())
            text = data.get("message", {}).get("content", "").strip()
            if "NO_TRACKLIST" in text:
                # The model saw the image but found no tracklist: stop here.
                print(f" 📷 OCR {image_path.name}: kein Tracklist-Text erkannt", file=sys.stderr)
                return None
            if text:
                print(f" 📷 OCR {image_path.name} via {model}: {len(text)} Zeichen extrahiert",
                      file=sys.stderr)
                return text
        except Exception as e:
            print(f" ⚠️ OCR-Fehler ({model}) {image_path.name}: {e}", file=sys.stderr)
            continue
    return None
def _check_cover_images(paths: List[Path]) -> List[Path]:
good: List[Path] = []
for p in paths:
name_lower = p.name.lower()
# Prefer front covers
if any(kw in name_lower for kw in ("front", "folder", "cover", "album")):
good.insert(0, p)
else:
good.append(p)
return good
# YouTube-Video-ID: exakt 11 Zeichen aus [A-Za-z0-9_-], typischerweise letztes _-Token
_YT_ID_CHARS = re.compile(r"^[A-Za-z0-9_-]{11}$")
def _extract_youtube_id(path: Path) -> Optional[str]:
"""
Erkennt YouTube-Video-ID als letztes '_'-getrenntes Token im Dateinamen.
Plausibilitätsprüfung: mind. ein Großbuchstabe UND mind. ein Kleinbuchstabe/Ziffer.
"""
candidate = path.stem.split("_")[-1] # letztes Token nach Unterstrich
if (len(candidate) == 11
and _YT_ID_CHARS.match(candidate)
and re.search(r"[A-Z]", candidate)
and re.search(r"[0-9a-z]", candidate)):
return candidate
return None
def _fetch_youtube_metadata(video_id: str) -> Optional[Dict]:
    """Fetch metadata for a YouTube video with the ``yt-dlp`` executable.

    Runs ``yt-dlp --dump-json --no-download --no-playlist <url>`` with a
    30-second timeout and parses its standard output as JSON.

    Returns:
        The parsed JSON document, or None when yt-dlp is not on PATH, exits
        with a non-zero status, prints nothing, or the call fails (the error
        is reported on stderr).
    """
    executable = shutil.which("yt-dlp")
    if not executable:
        return None
    command = [
        executable,
        "--dump-json",
        "--no-download",
        "--no-playlist",
        f"https://www.youtube.com/watch?v={video_id}",
    ]
    try:
        proc = subprocess.run(command, capture_output=True, text=True, timeout=30)
        succeeded = proc.returncode == 0 and proc.stdout.strip()
        return json.loads(proc.stdout) if succeeded else None
    except Exception as e:
        print(f" ⚠️ YouTube-Fehler ({video_id}): {e}", file=sys.stderr)
        return None
def _chapters_to_tracklist_text(chapters: List[Dict]) -> str:
"""
Konvertiert yt-dlp-Chapters in Tracklist-Text der vom _parse_tracklist
verarbeitetet werden kann: '1. Titel MM:SS'
"""
lines = []
for i, ch in enumerate(chapters, 1):
title = ch.get("title", "").strip()
if not title or title.startswith("<Untitled"):
continue
secs = int(ch.get("start_time", 0))
mm, ss = divmod(secs, 60)
lines.append(f"{i}. {title} {mm}:{ss:02d}")
return "\n".join(lines)
def extract_hints(scan: AlbumScan, use_ocr: bool = True) -> AlbumHints:
    """Collect album- and track-level metadata hints for one scanned album.

    Sources, in the order they are consulted:

    * the album directory name (artist / album / year),
    * cover images (ordered by ``_check_cover_images``),
    * tracklist text: tracklist files, else OCR of a back-cover image (only
      when ``use_ocr`` is true and images exist), else YouTube chapters,
    * YouTube metadata for IDs found in audio file names (at most the first
      five IDs are fetched),
    * playlist files (entry order and titles),
    * per audio file: existing tags and the file name.

    Args:
        scan: Scan result; this function reads ``album_dir``,
            ``image_files``, ``tracklist_files``, ``audio_files`` and
            ``playlist_files``.
        use_ocr: Whether the OCR fallback may be used when no tracklist file
            yielded text.

    Returns:
        An ``AlbumHints`` with the album-level fields filled in and one
        ``TrackHints`` per audio file (in sorted path order) in ``tracks``.
    """
    hints = AlbumHints(album_dir=scan.album_dir)
    # Directory name
    hints.dir_artist, hints.dir_album, hints.dir_year = _parse_dirname(scan.album_dir.name)
    # Cover images
    hints.cover_images = _check_cover_images(scan.image_files)
    # Tracklist files
    texts: List[str] = []
    for tf in scan.tracklist_files:
        txt = _read_tracklist_file(tf)
        if txt:
            texts.append(txt)
    hints.tracklist_text = "\n\n".join(texts) if texts else None
    # OCR fallback: scan the back cover when no tracklist text file is available
    if use_ocr and not hints.tracklist_text and scan.image_files:
        ocr_text = _ocr_back_cover(scan.image_files)
        if ocr_text:
            hints.tracklist_text = ocr_text
    # YouTube lookup: extract IDs from file names, fetch metadata via yt-dlp
    yt_meta_by_id: Dict[str, Optional[Dict]] = {}
    yt_ids_by_stem: Dict[str, str] = {}  # stem (normalised) → youtube_id
    for audio_path in scan.audio_files:
        yt_id = _extract_youtube_id(audio_path)
        if yt_id:
            stem_key = _clean(audio_path.stem).casefold()
            yt_ids_by_stem[stem_key] = yt_id
            yt_meta_by_id.setdefault(yt_id, None)
    if yt_meta_by_id:
        print(f" 📺 YouTube-IDs gefunden: {', '.join(list(yt_meta_by_id.keys())[:5])}", file=sys.stderr)
        # Only the first five IDs are fetched; all others keep None as metadata.
        for yt_id in list(yt_meta_by_id.keys())[:5]:
            meta = _fetch_youtube_metadata(yt_id)
            yt_meta_by_id[yt_id] = meta
        # Use chapters as the tracklist if there is none yet
        if not hints.tracklist_text:
            for yt_id, meta in yt_meta_by_id.items():
                if meta and meta.get("chapters"):
                    chapter_text = _chapters_to_tracklist_text(meta["chapters"])
                    if chapter_text:
                        hints.tracklist_text = chapter_text
                        print(f" 📺 YouTube-Chapters als Tracklist: {len(meta['chapters'])} Tracks",
                              file=sys.stderr)
                        break
        # Album-level hints (first successful lookup)
        for yt_id, meta in yt_meta_by_id.items():
            if meta:
                hints.yt_title = (meta.get("title") or "").strip() or None
                hints.yt_uploader = (
                    meta.get("uploader") or meta.get("channel") or ""
                ).strip() or None
                break
    parsed_tracklist = _parse_tracklist(hints.tracklist_text) if hints.tracklist_text else []
    # M3U/playlist order: filename (stem, normalised) → track number
    m3u_order: Dict[str, int] = {}
    m3u_titles: Dict[str, str] = {}
    for pf in scan.playlist_files:
        try:
            text = pf.read_text(encoding="utf-8", errors="replace")
            for entry in _parse_m3u(text):
                stem = _clean(Path(entry["filename"]).stem).casefold()
                pos = int(entry["position"])
                # The first occurrence of a stem wins (across all playlist files).
                if stem and stem not in m3u_order:
                    m3u_order[stem] = pos
                    # NOTE(review): assumed to be nested under the first-occurrence
                    # check above — confirm against the original indentation.
                    if entry.get("title"):
                        m3u_titles[stem] = entry["title"]
        except Exception as e:
            print(f" ⚠️ Playlist-Lesefehler {pf.name}: {e}", file=sys.stderr)
    # Tracklist lookups: exact title, fuzzy title, catalogue number (BWV, Op., K., …)
    # Later entries with the same key overwrite earlier ones.
    tl_by_title: Dict[str, Dict[str, str]] = {}
    tl_by_title_norm: Dict[str, Dict[str, str]] = {}
    tl_by_catalog: Dict[str, Dict[str, str]] = {}
    for entry in parsed_tracklist:
        raw_title = entry.get("title", "")
        exact_key = _clean(raw_title).casefold()
        if exact_key:
            tl_by_title[exact_key] = entry
        norm_key = _norm_for_match(raw_title)
        if norm_key:
            tl_by_title_norm[norm_key] = entry
        cat_key = _catalog_key(raw_title)
        if cat_key:
            tl_by_catalog[cat_key] = entry
    # Build TrackHints per audio file
    for audio_path in sorted(scan.audio_files):
        stem_key = _clean(audio_path.stem).casefold()
        yt_id_for_file = yt_ids_by_stem.get(stem_key)
        # Stem without the YouTube ID, used for file-name parsing
        parse_stem = audio_path.stem
        if yt_id_for_file:
            tokens = parse_stem.rsplit("_", 1)
            if len(tokens) == 2 and tokens[1] == yt_id_for_file:
                parse_stem = tokens[0]
        tags, duration = _read_tags(audio_path)
        fn_hints = _parse_filename(parse_stem)
        track_num: Optional[int] = None
        disc_num: Optional[int] = None
        # Track number: tag > filename ("3/12" style values use the part before the slash)
        raw_tn = tags.get("tracknumber") or fn_hints.get("track")
        if raw_tn:
            try:
                tn_int = int(str(raw_tn).split("/")[0])
                if tn_int > 0:  # 0 counts as "no number"
                    track_num = tn_int
            except ValueError:
                pass
        # Disc number: tag > filename > path segment
        raw_dn = tags.get("discnumber") or fn_hints.get("disc")
        if raw_dn:
            try:
                disc_num = int(str(raw_dn).split("/")[0])
            except ValueError:
                pass
        if not disc_num:
            # Look for a disc marker ("CD 2", "Disc1", ...) in the sub-directories below the album dir.
            for part in audio_path.relative_to(scan.album_dir).parts[:-1]:
                dm = _DISC_SECTION_RE.search(part)
                if dm:
                    disc_num = int(dm.group(1))
                    break
        # Title / artist: tag > filename; placeholder values fall back to the filename value.
        title_raw = tags.get("title") or fn_hints.get("title")
        title = title_raw if _is_good(title_raw) else fn_hints.get("title")
        artist_raw = tags.get("artist") or fn_hints.get("artist")
        artist = artist_raw if _is_good(artist_raw) else fn_hints.get("artist")
        # Tracklist matching: number → exact title → fuzzy title → catalogue number
        # When a match is found, disc+track are taken from the tracklist (the tracklist is
        # more authoritative than the M3U order for albums with explicit disc numbering).
        if parsed_tracklist:
            matched_tl: Optional[Dict[str, str]] = None
            # 1. Exact match on track number + disc (disc_num=None → single CD, assume 1)
            if track_num:
                assumed_disc = disc_num if disc_num else 1
                for tl_entry in parsed_tracklist:
                    tl_track = tl_entry.get("track")
                    try:
                        tl_disc = int(tl_entry.get("disc", "1"))
                        # First digit run of the position, so "A1" compares as 1; a position
                        # without digits raises AttributeError and the entry is skipped.
                        tl_track_int = int(re.search(r"\d+", str(tl_track)).group()) if tl_track else None
                    except (ValueError, AttributeError):
                        continue
                    if (tl_track_int is not None and tl_track_int == track_num
                            and tl_disc == assumed_disc):
                        matched_tl = tl_entry
                        break
            # 2. Exact title comparison
            if matched_tl is None and title:
                matched_tl = tl_by_title.get(_clean(title).casefold())
            # 3. Fuzzy title comparison (ignores commas, apostrophes, upper/lower case)
            if matched_tl is None and title:
                matched_tl = tl_by_title_norm.get(_norm_for_match(title))
            # 4. Catalogue number (BWV, Op., K. …) — helps with abbreviated file names
            if matched_tl is None and title:
                cat = _catalog_key(title)
                if cat:
                    matched_tl = tl_by_catalog.get(cat)
            if matched_tl:
                # Take the title from the tracklist if it is usable
                if _is_good(matched_tl.get("title")):
                    title = matched_tl["title"]
                # disc+track from the tracklist are more authoritative than the M3U order.
                # A non-numeric position such as "A1" raises ValueError here, in which case
                # the track/disc numbers determined above are kept.
                try:
                    tl_track_n = int(matched_tl["track"]) if matched_tl.get("track") else None
                    tl_disc_n = int(matched_tl.get("disc", "1"))
                    if tl_track_n:
                        track_num = tl_track_n
                        # NOTE(review): assumed to be inside the "if tl_track_n" block —
                        # confirm against the original indentation.
                        disc_num = tl_disc_n
                except (ValueError, KeyError):
                    pass
        # M3U order only as the last fallback (when the tracklist yields no match)
        if track_num is None:
            if stem_key in m3u_order:
                track_num = m3u_order[stem_key]
        # M3U title as fallback (contains "Composer - Title" — only used when there is no better title)
        if not _is_good(title):
            if stem_key in m3u_titles:
                title = m3u_titles[stem_key]
        # YouTube title as the last fallback (for a single file = the whole video)
        if not _is_good(title):
            yt_id = yt_id_for_file
            if yt_id:
                meta = yt_meta_by_id.get(yt_id)
                if meta:
                    yt_video_title = (meta.get("title") or "").strip()
                    if yt_video_title:
                        title = yt_video_title
        hints.tracks.append(TrackHints(
            path=audio_path,
            track_number=track_num,
            disc_number=disc_num,
            title=_clean(title) if title else None,
            artist=_clean(artist) if artist else None,
            duration=duration,
            existing_tags=tags,
        ))
    return hints