Music_Metadata_Enricher/hint_extractor.py
dschlueter 40a2ef3fb6 Add OCR fallback via Ollama Vision for albums without tracklist text
hint_extractor: _ocr_back_cover() sends back/inlay images to Ollama Vision
  when no tracklist .txt/.htm/.nfo is present. Model priority:
  qwen3-vl:latest → minicpm-v:latest → deepseek-ocr:latest (configurable
  via OLLAMA_OCR_MODEL env var). Timeout 180s. OCR text is fed into the
  same _parse_tracklist() pipeline as regular text files.

music_enricher: extract_hints(use_ocr=not args.no_api) — OCR is skipped
  with --no-api to allow fully offline/fast runs.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 05:42:03 +02:00

480 lines
17 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import base64
import json
import os
import re
import sys
import urllib.request
from pathlib import Path
from typing import Optional, List, Dict, Tuple
from models import AlbumScan, AlbumHints, TrackHints
# Optional third-party dependencies. Each HAS_* flag records whether the
# matching import succeeded, so the rest of the module can degrade gracefully
# instead of failing at import time.
try:
    from mutagen import File as MutagenFile
except ImportError:
    HAS_MUTAGEN = False
else:
    HAS_MUTAGEN = True

try:
    from bs4 import BeautifulSoup
except ImportError:
    HAS_BS4 = False
else:
    HAS_BS4 = True
# Captures runs of digits (the name suggests use in natural sorting).
_NATSORT_RE = re.compile(r"(\d+)")

# Placeholder values (compared case-folded) that carry no real information.
_BAD_VALUES = {
    "unknown",
    "unknown artist",
    "unknown album",
    "untitled",
    "track",
    "va",
    "various",
}

# Filename (stem) patterns, ordered from most to least specific; the first
# pattern that matches wins.
_FILENAME_PATTERNS = [
    re.compile(expr)
    for expr in (
        # disc, track, artist, title
        r"^(?P<disc>\d{1,2})[- _]+(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<artist>.+?)\s*[-]\s*(?P<title>.+)$",
        # disc, track, title
        r"^(?P<disc>\d{1,2})[- _]+(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<title>.+)$",
        # track, artist, title
        r"^(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<artist>.+?)\s*[-]\s*(?P<title>.+)$",
        # track, title
        r"^(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<title>.+)$",
        # artist, title
        r"^(?P<artist>.+?)\s*[-]\s*(?P<title>.+)$",
    )
]

# Album directory name patterns, tried in order.
_DIR_PATTERNS = [
    re.compile(expr)
    for expr in (
        # artist - album, optionally followed by a four-digit year
        r"^(?P<artist>.+?)[_ -]+[-][_ -]+(?P<album>.+?)(?:[_ -]+(?P<year>\d{4}))?$",
        # artist, year, album
        r"^(?P<artist>.+?)[_ ]+(?P<year>\d{4})[._ -]+(?P<album>.+)$",
        # album, year
        r"^(?P<album>.+?)[_ -]+(?P<year>\d{4})$",
    )
]

# Tracklist line patterns, tried in order. A trailing "m:ss" duration is
# excluded from the title.
_TRACKLIST_PATTERNS = [
    re.compile(expr)
    for expr in (
        # "<disc>-<track> <title>"
        r"^(?P<disc>\d{1,2})[- _](?P<track>\d{1,3})\s+(?P<title>.+?)(?:\s+\d+:\d{2})?$",
        # "<track>. <title>": the separator must be '.', ')' or ':'; a bare
        # space is not enough (prevents false positives such as
        # "2 x CD, Compilation, Remastered").
        r"^(?P<track>\d{1,3})[.):]\s*(?P<title>.+?)(?:\s+\d+:\d{2})?$",
        # "<letter><number>. <title>", e.g. "A1. Title"
        r"^(?P<track>[A-Z]\d{1,2})[.):]\s*(?P<title>.+?)(?:\s+\d+:\d{2})?$",
    )
]

# Disc/side marker such as "CD 1", "disc_2" or "Side 3" (case-insensitive).
_DISC_SECTION_RE = re.compile(r"(?i)(?:cd|disc|disk|side)[_ \-]*(\d{1,2})")
# Written with escapes rather than literal characters: BOM (U+FEFF),
# zero-width space (U+200B) and soft hyphen (U+00AD) are invisible, and
# literal copies are easily lost or altered by editors and copy/paste.
_INVISIBLE_CHARS_RE = re.compile(r"[\ufeff\u200b\u00ad]")


def _clean(s: Optional[str]) -> str:
    """Normalise a raw name fragment (tag value, filename part, title).

    Removes BOM, zero-width-space and soft-hyphen characters, turns
    underscores into spaces, collapses whitespace runs into a single space
    and strips leading/trailing spaces, hyphens, dots and underscores.

    Returns an empty string for ``None`` or empty input.
    """
    if not s:
        return ""
    s = _INVISIBLE_CHARS_RE.sub("", s)
    return re.sub(r"\s+", " ", s.replace("_", " ")).strip(" -._")
def _norm_for_match(s: str) -> str:
    """Reduce *s* to case-folded letters and digits for fuzzy title matching.

    Punctuation, whitespace and symbols are dropped so that titles differing
    only in commas, apostrophes or case compare equal.  The filter is
    Unicode-aware: an ASCII-only filter would erase every non-ASCII letter,
    making unrelated titles in other scripts collapse to the same key.
    """
    return "".join(ch for ch in s.casefold() if ch.isalnum())
# Classical work-catalogue numbers: BWV 565, Op. 27, K. 331, HWV 56, ...
_CATALOG_RE = re.compile(
    r"\b(bwv|hwv|op|k|kv|d|sz|wq|bbwv|rv|twv|hob)\W*(\d+[a-z]?(?:[\/\.]\d+)?)",
    re.IGNORECASE,
)


def _catalog_key(s: str) -> Optional[str]:
    """Extract a normalised catalogue number such as ``'bwv565'`` or ``'op27'``.

    The key is the lower-cased catalogue prefix followed by the number with
    all non-word characters removed.  The whole key is lower-cased: the
    pattern matches case-insensitively, so a letter suffix ("331A" vs
    "331a") must not produce two different keys.

    Returns ``None`` when *s* contains no catalogue number.
    """
    m = _CATALOG_RE.search(s)
    if m is None:
        return None
    return (m.group(1) + re.sub(r"\W", "", m.group(2))).lower()
def _is_good(v: Optional[str]) -> bool:
    """Return True when *v* holds a usable, non-placeholder value.

    A value is rejected when it is missing, when nothing remains after
    ``_clean`` (for example "-" or "___"), or when its cleaned, case-folded
    form is one of the placeholders in ``_BAD_VALUES``.
    """
    if not v:
        return False
    cleaned = _clean(v)
    # Separator-only input cleans to "", which is not in _BAD_VALUES and
    # would otherwise be reported as a good value.
    if not cleaned:
        return False
    return cleaned.casefold() not in _BAD_VALUES
def _parse_dirname(name: str) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Split an album directory name into ``(artist, album, year)``.

    The cleaned name is tried against ``_DIR_PATTERNS`` in order.  A match is
    accepted only if its artist or its album passes ``_is_good``; groups a
    pattern does not define come back as ``None``.  If no pattern is
    accepted, the whole cleaned name is returned as the album.
    """
    cleaned_name = _clean(name)
    for pattern in _DIR_PATTERNS:
        match = pattern.match(cleaned_name)
        if match is None:
            continue
        groups = match.groupdict()
        artist = _clean(groups.get("artist")) or None
        album = _clean(groups.get("album")) or None
        if not (_is_good(artist) or _is_good(album)):
            # Nothing usable in this match; try the next pattern.
            continue
        return artist, album, groups.get("year")
    # No pattern matched: treat the whole name as the album.
    return None, _clean(cleaned_name), None
def _parse_filename(stem: str) -> Dict[str, str]:
    """Extract disc/track/artist/title hints from a filename stem.

    The cleaned stem is matched against ``_FILENAME_PATTERNS`` in order and
    the first match wins; every non-empty captured group is returned in
    cleaned form.  Without any match the cleaned stem is returned as the
    title.
    """
    cleaned_stem = _clean(stem)
    attempts = (pattern.match(cleaned_stem) for pattern in _FILENAME_PATTERNS)
    first_hit = next((m for m in attempts if m), None)
    if first_hit is None:
        return {"title": cleaned_stem}
    fields: Dict[str, str] = {}
    for group_name, captured in first_hit.groupdict().items():
        if captured:
            fields[group_name] = _clean(captured)
    return fields
def _read_tags(path: Path) -> Tuple[Dict[str, str], Optional[float]]:
    """Read selected tags and the duration of an audio file via mutagen.

    Returns ``(tags, duration)``.  ``tags`` maps tag names to the stripped
    first value of each tag that is present; a "year" tag is copied to
    "date" when no "date" tag exists.  ``duration`` is ``audio.info.length``
    when available, otherwise ``None``.

    Returns ``({}, None)`` when mutagen is not installed, when mutagen does
    not recognise the file, or when reading fails (a warning is printed to
    stderr in that case).
    """
    if not HAS_MUTAGEN:
        return {}, None
    try:
        audio = MutagenFile(str(path), easy=True)
        # Compare against None explicitly: mutagen file objects behave like
        # a dict of tags and are falsy when they hold no tags, so a plain
        # truthiness test would discard the duration of untagged files.
        if audio is None:
            return {}, None
        tags: Dict[str, str] = {}
        for k in ("title", "artist", "album", "albumartist", "tracknumber",
                  "discnumber", "date", "year", "genre", "label", "organization"):
            v = audio.get(k)
            if v:
                tags[k] = str(v[0]).strip()
        if "year" in tags and "date" not in tags:
            tags["date"] = tags["year"]
        duration = None
        info = getattr(audio, "info", None)
        if info and hasattr(info, "length"):
            duration = info.length
        return tags, duration
    except Exception as e:
        # Best effort: one unreadable file must not abort the album scan.
        print(f" ⚠️ Tag-Lesefehler {path.name}: {e}", file=sys.stderr)
        return {}, None
def _parse_tracklist(text: str) -> List[Dict[str, str]]:
    """Parse free-form tracklist text into a list of track entries.

    Lines are processed in order.  A short line (fewer than 30 characters)
    starting with a disc marker switches the current disc.  Every other line
    is matched against ``_TRACKLIST_PATTERNS``; the first matching pattern
    produces an entry with the keys "title", "track" (leading zeros removed
    for numeric values) and "disc" (from the line itself, else the current
    disc).  Entries whose cleaned title is empty are dropped.
    """
    entries: List[Dict[str, str]] = []
    active_disc = 1
    for raw_line in text.splitlines():
        candidate = raw_line.strip()
        if not candidate:
            continue
        header = _DISC_SECTION_RE.match(candidate)
        if header and len(candidate) < 30:
            active_disc = int(header.group(1))
            continue
        # Lazily evaluate the patterns and stop at the first one that matches.
        hit = next(
            filter(None, (pattern.match(candidate) for pattern in _TRACKLIST_PATTERNS)),
            None,
        )
        if hit is None:
            continue
        groups = hit.groupdict()
        item: Dict[str, str] = {"title": _clean(groups.get("title", ""))}
        number = groups.get("track", "")
        if number:
            item["track"] = (number.lstrip("0") or "0") if number.isdigit() else number
        item["disc"] = groups.get("disc") or str(active_disc)
        if item.get("title"):
            entries.append(item)
    return entries
def _parse_m3u(text: str) -> List[Dict[str, str]]:
    """Parse M3U/M3U8 text into an ordered list of playlist entries.

    Each entry is a dict with the keys "position" (1-based, as a string),
    "filename" (last path component; backslashes are treated as path
    separators) and "title" (text after the first comma of the preceding
    ``#EXTINF:`` line, or "" if there was none).  The order of the entries
    is the playlist order.  Blank lines and other ``#`` lines are skipped.
    """
    tracks: List[Dict[str, str]] = []
    pending_title: Optional[str] = None
    # A UTF-8 byte order mark is not whitespace, so str.strip() keeps it.
    # Without removing it, "\ufeff#EXTM3U" would be taken for a filename and
    # shift every position by one.
    for raw_line in text.lstrip("\ufeff").splitlines():
        line = raw_line.strip()
        if not line:
            continue
        if line.upper().startswith("#EXTINF:"):
            _, comma, label = line.partition(",")
            pending_title = label.strip() if comma else None
            continue
        if line.startswith("#"):
            continue
        filename = Path(line.replace("\\", "/")).name
        if not filename:
            continue
        tracks.append({
            "position": str(len(tracks) + 1),
            "filename": filename,
            "title": pending_title or "",
        })
        pending_title = None
    return tracks
def _read_tracklist_file(path: Path) -> Optional[str]:
    """Read a tracklist file and return its text content.

    ``.htm``/``.html`` files are reduced to their text: via BeautifulSoup
    when it is installed, otherwise by replacing tags with spaces.  All
    other files are returned as decoded text.

    Encodings are tried in the order utf-8-sig, cp1252, latin-1:
    utf-8-sig also accepts plain UTF-8 and drops a leading byte order mark;
    latin-1 accepts every byte sequence and therefore has to come last,
    otherwise cp1252 would never be reached.

    Returns ``None`` if the file cannot be read (a warning is printed to
    stderr).
    """
    encodings = ("utf-8-sig", "cp1252", "latin-1")
    try:
        if path.suffix.lower() in (".htm", ".html"):
            raw = path.read_bytes()
            for enc in encodings:
                try:
                    text = raw.decode(enc)
                    break
                except UnicodeDecodeError:
                    continue
            else:
                # Safety net only; latin-1 cannot fail to decode.
                text = raw.decode("utf-8", errors="replace")
            if HAS_BS4:
                soup = BeautifulSoup(text, "html.parser")
                return soup.get_text(separator="\n")
            # Fallback: strip HTML tags
            return re.sub(r"<[^>]+>", " ", text)
        for enc in encodings:
            try:
                return path.read_text(encoding=enc)
            except UnicodeDecodeError:
                continue
    except Exception as e:
        # Best effort: an unreadable tracklist must not abort the scan.
        print(f" ⚠️ Tracklist-Lesefehler {path.name}: {e}", file=sys.stderr)
    return None
def _normalize_ollama_host(raw: str) -> str:
    """Turn an ``OLLAMA_HOST`` value into a base URL usable with urllib.

    Adds ``http://`` when the value has no scheme (e.g. "127.0.0.1:11434"),
    removes trailing slashes so that appending "/api/chat" yields a clean
    URL, and falls back to the local default for an empty value.
    """
    host = raw.strip().rstrip("/")
    if not host:
        return "http://localhost:11434"
    if "://" not in host:
        host = f"http://{host}"
    return host


# Base URL of the Ollama server; configurable via the OLLAMA_HOST env var.
_OLLAMA_HOST = _normalize_ollama_host(os.getenv("OLLAMA_HOST", "http://localhost:11434"))

# Vision models in priority order; can be overridden with a comma-separated
# list in the OLLAMA_OCR_MODEL env var.
_OCR_MODELS = [m.strip() for m in os.getenv(
    "OLLAMA_OCR_MODEL",
    "qwen3-vl:latest,minicpm-v:latest,deepseek-ocr:latest"
).split(",") if m.strip()]

# Instruction sent with the image. The model answers NO_TRACKLIST when it
# sees no tracklist, which the OCR caller checks for.
_OCR_PROMPT = (
    "This image shows a CD album back cover or booklet page. "
    "Your task: extract the complete tracklist as plain text.\n"
    "Rules:\n"
    "- Output track number and title per line, e.g. '1. Title' or '1-1 Title'\n"
    "- If multiple discs/CDs: include a header like 'CD 1' or 'Disc 1' before each group\n"
    "- Include durations if visible (e.g. '1. Title 4:32')\n"
    "- Do NOT include label info, barcodes, or other non-tracklist text\n"
    "- If no tracklist is visible, reply with: NO_TRACKLIST"
)
def _ocr_back_cover(image_files: List[Path]) -> Optional[str]:
    """Run OCR on a back-cover or booklet image through Ollama Vision.

    Candidate images are those whose name contains a back-side keyword; if
    there are none, every image whose name does not look like a front cover
    is a candidate.  Only the first candidate is sent.  The models in
    ``_OCR_MODELS`` are tried in order until one returns text.

    Returns the recognised text, or ``None`` when there is no candidate, the
    image cannot be read, a model answers NO_TRACKLIST, or no model returns
    any text.
    """
    back_keywords = ("back", "inlay", "booklet", "inside", "rear")
    front_keywords = ("front", "folder", "cover")

    def name_mentions(path: Path, keywords: Tuple[str, ...]) -> bool:
        lowered = path.name.lower()
        return any(kw in lowered for kw in keywords)

    # Prefer images that look like a back cover ...
    candidates = [p for p in image_files if name_mentions(p, back_keywords)]
    # ... otherwise fall back to everything except the front cover.
    if not candidates:
        candidates = [p for p in image_files if not name_mentions(p, front_keywords)]
    if not candidates:
        return None

    image_path = candidates[0]
    try:
        img_b64 = base64.b64encode(image_path.read_bytes()).decode()
    except Exception as e:
        print(f" ⚠️ OCR-Bild lesen {image_path.name}: {e}", file=sys.stderr)
        return None

    endpoint = f"{_OLLAMA_HOST}/api/chat"
    for model in _OCR_MODELS:
        request_body = {
            "model": model,
            "messages": [{
                "role": "user",
                "content": _OCR_PROMPT,
                "images": [img_b64],
            }],
            "stream": False,
            "options": {"temperature": 0.0},
        }
        payload = json.dumps(request_body).encode()
        try:
            req = urllib.request.Request(
                endpoint,
                data=payload,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            with urllib.request.urlopen(req, timeout=180) as resp:
                data = json.loads(resp.read())
            text = data.get("message", {}).get("content", "").strip()
            if "NO_TRACKLIST" in text:
                print(f" 📷 OCR {image_path.name}: kein Tracklist-Text erkannt", file=sys.stderr)
                return None
            if text:
                print(f" 📷 OCR {image_path.name} via {model}: {len(text)} Zeichen extrahiert",
                      file=sys.stderr)
                return text
            # Empty answer: let the next model try.
        except Exception as e:
            print(f" ⚠️ OCR-Fehler ({model}) {image_path.name}: {e}", file=sys.stderr)
            continue
    return None
def _check_cover_images(paths: List[Path]) -> List[Path]:
    """Order image paths so that likely front covers come first.

    An image counts as a front cover when its name contains "front", or when
    it contains "folder", "cover" or "album" without also containing a
    back-side keyword.  The second condition keeps files such as
    "back_cover.jpg" or "rear_cover.jpg" from being promoted ahead of the
    real front cover.  All other images keep their input order behind the
    front covers.
    """
    generic_front_keywords = ("folder", "cover", "album")
    back_keywords = ("back", "inlay", "booklet", "inside", "rear")
    fronts: List[Path] = []
    others: List[Path] = []
    for p in paths:
        name_lower = p.name.lower()
        looks_back = any(kw in name_lower for kw in back_keywords)
        looks_front = "front" in name_lower or (
            not looks_back and any(kw in name_lower for kw in generic_front_keywords)
        )
        (fronts if looks_front else others).append(p)
    # The most recently seen front cover goes first, matching the ordering
    # this function has always produced for front covers.
    fronts.reverse()
    return fronts + others
def _int_or_none(value: Optional[str]) -> Optional[int]:
    """Return ``int(value)``, or None when *value* is missing or not an integer."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def extract_hints(scan: AlbumScan, use_ocr: bool = True) -> AlbumHints:
    """Collect all locally available metadata hints for one scanned album.

    Sources, in the order they are consulted:

    * the album directory name (artist / album / year),
    * cover images (ordered by ``_check_cover_images``),
    * tracklist text files; if none yields text and *use_ocr* is true, the
      OCR result of a back-cover image is used instead,
    * playlist files (track order and titles),
    * per audio file: its tags, its filename and its parent directories.

    Args:
        scan: Scan result for the album directory.
        use_ocr: Allow the OCR fallback when no tracklist text was found.

    Returns:
        The populated ``AlbumHints`` with one ``TrackHints`` per audio file.
    """
    hints = AlbumHints(album_dir=scan.album_dir)

    # Directory name
    hints.dir_artist, hints.dir_album, hints.dir_year = _parse_dirname(scan.album_dir.name)

    # Cover images
    hints.cover_images = _check_cover_images(scan.image_files)

    # Tracklist files
    texts: List[str] = []
    for tf in scan.tracklist_files:
        txt = _read_tracklist_file(tf)
        if txt:
            texts.append(txt)
    hints.tracklist_text = "\n\n".join(texts) if texts else None

    # OCR fallback: scan the back cover when no tracklist text file exists
    if use_ocr and not hints.tracklist_text and scan.image_files:
        ocr_text = _ocr_back_cover(scan.image_files)
        if ocr_text:
            hints.tracklist_text = ocr_text

    parsed_tracklist = _parse_tracklist(hints.tracklist_text) if hints.tracklist_text else []

    # M3U/playlist order: filename stem (cleaned, case-folded) -> track number.
    # The first occurrence of a stem wins.
    m3u_order: Dict[str, int] = {}
    m3u_titles: Dict[str, str] = {}
    for pf in scan.playlist_files:
        try:
            # utf-8-sig drops a leading byte order mark, which would
            # otherwise turn the "#EXTM3U" header into a bogus first entry.
            text = pf.read_text(encoding="utf-8-sig", errors="replace")
            for entry in _parse_m3u(text):
                stem = _clean(Path(entry["filename"]).stem).casefold()
                pos = int(entry["position"])
                if stem and stem not in m3u_order:
                    m3u_order[stem] = pos
                    if entry.get("title"):
                        m3u_titles[stem] = entry["title"]
        except Exception as e:
            print(f" ⚠️ Playlist-Lesefehler {pf.name}: {e}", file=sys.stderr)

    # Tracklist lookups: exact title, fuzzy title, catalogue number (BWV, Op., K., ...)
    tl_by_title: Dict[str, Dict[str, str]] = {}
    tl_by_title_norm: Dict[str, Dict[str, str]] = {}
    tl_by_catalog: Dict[str, Dict[str, str]] = {}
    for entry in parsed_tracklist:
        raw_title = entry.get("title", "")
        exact_key = _clean(raw_title).casefold()
        if exact_key:
            tl_by_title[exact_key] = entry
        norm_key = _norm_for_match(raw_title)
        if norm_key:
            tl_by_title_norm[norm_key] = entry
        cat_key = _catalog_key(raw_title)
        if cat_key:
            tl_by_catalog[cat_key] = entry

    # Build TrackHints per audio file
    for audio_path in sorted(scan.audio_files):
        tags, duration = _read_tags(audio_path)
        fn_hints = _parse_filename(audio_path.stem)
        track_num: Optional[int] = None
        disc_num: Optional[int] = None

        # Track number: tag > filename
        raw_tn = tags.get("tracknumber") or fn_hints.get("track")
        if raw_tn:
            try:
                tn_int = int(str(raw_tn).split("/")[0])
                if tn_int > 0:  # 0 counts as "no number"
                    track_num = tn_int
            except ValueError:
                pass

        # Disc number: tag > filename > path segment
        raw_dn = tags.get("discnumber") or fn_hints.get("disc")
        if raw_dn:
            try:
                disc_num = int(str(raw_dn).split("/")[0])
            except ValueError:
                pass
        if not disc_num:
            for part in audio_path.relative_to(scan.album_dir).parts[:-1]:
                dm = _DISC_SECTION_RE.search(part)
                if dm:
                    disc_num = int(dm.group(1))
                    break

        title = tags.get("title") or fn_hints.get("title")
        artist = tags.get("artist") or fn_hints.get("artist")

        # Tracklist matching: number -> exact title -> fuzzy title -> catalogue
        # number.  On a match, disc and track are taken from the tracklist
        # (it is more authoritative than the M3U order for albums with
        # explicit disc numbering).
        if parsed_tracklist:
            matched_tl: Optional[Dict[str, str]] = None

            # 1. Track number + disc (only when both are known from tag/filename).
            #    Tracklist entries may carry non-numeric ids such as "A1";
            #    those can never equal a numeric track number and must be
            #    skipped rather than crash the int() conversion.
            if track_num and disc_num:
                for tl_entry in parsed_tracklist:
                    if (_int_or_none(tl_entry.get("track")) == track_num
                            and _int_or_none(tl_entry.get("disc", "1")) == disc_num):
                        matched_tl = tl_entry
                        break

            # 2. Exact title comparison
            if matched_tl is None and title:
                matched_tl = tl_by_title.get(_clean(title).casefold())

            # 3. Fuzzy title comparison (ignores commas, apostrophes, case)
            if matched_tl is None and title:
                matched_tl = tl_by_title_norm.get(_norm_for_match(title))

            # 4. Catalogue number (BWV, Op., K., ...): helps with abbreviated filenames
            if matched_tl is None and title:
                cat = _catalog_key(title)
                if cat:
                    matched_tl = tl_by_catalog.get(cat)

            if matched_tl:
                # Take the title from the tracklist when it is usable
                if _is_good(matched_tl.get("title")):
                    title = matched_tl["title"]
                # disc+track from the tracklist outrank the M3U order; they
                # are applied only when both parse as integers and the track
                # number is non-zero.
                tl_track_n = _int_or_none(matched_tl.get("track"))
                tl_disc_n = _int_or_none(matched_tl.get("disc", "1"))
                if tl_track_n and tl_disc_n is not None:
                    track_num = tl_track_n
                    disc_num = tl_disc_n

        # M3U order only as a last resort (when nothing else gave a number)
        if track_num is None:
            stem_key = _clean(audio_path.stem).casefold()
            if stem_key in m3u_order:
                track_num = m3u_order[stem_key]

        # M3U title as a fallback (it contains "Composer - Title", so it is
        # used only when there is no better title)
        if not _is_good(title):
            stem_key = _clean(audio_path.stem).casefold()
            if stem_key in m3u_titles:
                title = m3u_titles[stem_key]

        hints.tracks.append(TrackHints(
            path=audio_path,
            track_number=track_num,
            disc_number=disc_num,
            title=_clean(title) if title else None,
            artist=_clean(artist) if artist else None,
            duration=duration,
            existing_tags=tags,
        ))
    return hints