fix: korrekte Track-Nummerierung, Scanner-Rekursion, M3U-Reihenfolge

scanner: nicht in Unterordner wenn Root Audio-Dateien enthält (verhindert
  Doppel-Scan bei versehentlichen Unterordner-Kopien); nur Disc-Ordner
  (CD1, Disc 2…) werden bei Multi-CD-Alben rekursiert.

hint_extractor: M3U/Playlist-Dateien als Track-Reihenfolge-Quelle; BOM-
  Bereinigung; Tracklist-Matching auch per Titel (nicht nur per Nummer);
  tracknumber=0 wird als 'keine Nummer' gewertet.

metadata_resolver: sequenzielle Fallback-Nummerierung (1,2,3…) für Tracks
  ohne Tracknummer — verhindert '00'-Präfix beim --rename; dir_artist hat
  Vorrang vor 'Various Artists'-Heuristik; LLM darf bei Konfidenz <0.3
  auch bestehende Werte korrigieren (Tippfehler im Verzeichnisnamen).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Dieter Schlüter 2026-04-28 21:49:00 +02:00
commit d91eb36007
4 changed files with 189 additions and 48 deletions

View file

@ -51,6 +51,8 @@ _DISC_SECTION_RE = re.compile(r"(?i)(?:cd|disc|disk|side)[_ \-]*(\d{1,2})")
def _clean(s: Optional[str]) -> str: def _clean(s: Optional[str]) -> str:
if not s: if not s:
return "" return ""
# BOM (U+FEFF), Zero-Width-Space (U+200B), Soft-Hyphen (U+00AD) entfernen
s = re.sub(r"[­]", "", s)
return re.sub(r"\s+", " ", s.replace("_", " ")).strip(" -._") return re.sub(r"\s+", " ", s.replace("_", " ")).strip(" -._")
@ -143,6 +145,34 @@ def _parse_tracklist(text: str) -> List[Dict[str, str]]:
return tracks return tracks
def _parse_m3u(text: str) -> List[Dict[str, str]]:
"""M3U/M3U8 → geordnete Liste: [{filename, title, position}].
Reihenfolge der Einträge = gewünschte Trackreihenfolge.
"""
tracks: List[Dict[str, str]] = []
pending_title: Optional[str] = None
position = 0
for line in text.splitlines():
line = line.strip()
if not line:
continue
if line.upper().startswith("#EXTINF:"):
parts = line.split(",", 1)
pending_title = parts[1].strip() if len(parts) > 1 else None
elif not line.startswith("#"):
filename = Path(line.replace("\\", "/")).name
if not filename:
continue
position += 1
tracks.append({
"position": str(position),
"filename": filename,
"title": pending_title or "",
})
pending_title = None
return tracks
def _read_tracklist_file(path: Path) -> Optional[str]: def _read_tracklist_file(path: Path) -> Optional[str]:
try: try:
if path.suffix.lower() in (".htm", ".html"): if path.suffix.lower() in (".htm", ".html"):
@ -203,6 +233,29 @@ def extract_hints(scan: AlbumScan) -> AlbumHints:
parsed_tracklist = _parse_tracklist(hints.tracklist_text) if hints.tracklist_text else [] parsed_tracklist = _parse_tracklist(hints.tracklist_text) if hints.tracklist_text else []
# M3U/Playlist-Reihenfolge: filename (stem, normalisiert) → Tracknummer
m3u_order: Dict[str, int] = {}
m3u_titles: Dict[str, str] = {}
for pf in scan.playlist_files:
try:
text = pf.read_text(encoding="utf-8", errors="replace")
for entry in _parse_m3u(text):
stem = _clean(Path(entry["filename"]).stem).casefold()
pos = int(entry["position"])
if stem and stem not in m3u_order:
m3u_order[stem] = pos
if entry.get("title"):
m3u_titles[stem] = entry["title"]
except Exception as e:
print(f" ⚠️ Playlist-Lesefehler {pf.name}: {e}", file=sys.stderr)
# Tracklist-Lookup: normalisierter Titel → Eintrag (für titelbasiertes Matching)
tl_by_title: Dict[str, Dict[str, str]] = {}
for entry in parsed_tracklist:
key = _clean(entry.get("title", "")).casefold()
if key:
tl_by_title[key] = entry
# Build TrackHints per audio file # Build TrackHints per audio file
for audio_path in sorted(scan.audio_files): for audio_path in sorted(scan.audio_files):
tags, duration = _read_tags(audio_path) tags, duration = _read_tags(audio_path)
@ -215,10 +268,18 @@ def extract_hints(scan: AlbumScan) -> AlbumHints:
raw_tn = tags.get("tracknumber") or fn_hints.get("track") raw_tn = tags.get("tracknumber") or fn_hints.get("track")
if raw_tn: if raw_tn:
try: try:
track_num = int(str(raw_tn).split("/")[0]) tn_int = int(str(raw_tn).split("/")[0])
if tn_int > 0: # 0 gilt als "keine Nummer"
track_num = tn_int
except ValueError: except ValueError:
pass pass
# Tracknummer aus M3U-Reihenfolge (nur Fallback: greift erst, wenn weder Tag noch Dateiname eine Nummer > 0 geliefert haben)
if track_num is None:
stem_key = _clean(audio_path.stem).casefold()
if stem_key in m3u_order:
track_num = m3u_order[stem_key]
# Disc number: tag > filename > path segment # Disc number: tag > filename > path segment
raw_dn = tags.get("discnumber") or fn_hints.get("disc") raw_dn = tags.get("discnumber") or fn_hints.get("disc")
if raw_dn: if raw_dn:
@ -236,16 +297,27 @@ def extract_hints(scan: AlbumScan) -> AlbumHints:
title = tags.get("title") or fn_hints.get("title") title = tags.get("title") or fn_hints.get("title")
artist = tags.get("artist") or fn_hints.get("artist") artist = tags.get("artist") or fn_hints.get("artist")
# Enrich from parsed tracklist if track_num matches # Tracklist: erst nach Nummer, dann nach Titel
if parsed_tracklist and track_num: if parsed_tracklist:
for tl_entry in parsed_tracklist: matched_tl: Optional[Dict[str, str]] = None
tl_track = tl_entry.get("track") if track_num:
tl_disc = tl_entry.get("disc", "1") for tl_entry in parsed_tracklist:
if (tl_track and int(tl_track) == track_num tl_track = tl_entry.get("track")
and int(tl_disc) == (disc_num or 1)): tl_disc = tl_entry.get("disc", "1")
if not _is_good(title) and _is_good(tl_entry.get("title")): if (tl_track and int(tl_track) == track_num
title = tl_entry["title"] and int(tl_disc) == (disc_num or 1)):
break matched_tl = tl_entry
break
if matched_tl is None and title:
matched_tl = tl_by_title.get(_clean(title).casefold())
if matched_tl and not _is_good(title) and _is_good(matched_tl.get("title")):
title = matched_tl["title"]
# M3U-Titel als Fallback (enthält "Composer - Title" — nur nutzen wenn kein besserer Titel)
if not _is_good(title):
stem_key = _clean(audio_path.stem).casefold()
if stem_key in m3u_titles:
title = m3u_titles[stem_key]
hints.tracks.append(TrackHints( hints.tracks.append(TrackHints(
path=audio_path, path=audio_path,

View file

@ -40,8 +40,8 @@ OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
DISCOGS_TOKEN = os.getenv("DISCOGS_TOKEN", "") DISCOGS_TOKEN = os.getenv("DISCOGS_TOKEN", "")
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434") OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")
# Lokales Reasoning-Modell für Metadaten-Ergänzung (passt auf RTX 3090) # qwen3:8b (5.2GB) reicht für einfache JSON-Metadaten-Ergänzung und lädt schnell (~10s)
OLLAMA_RESOLVE_MODEL = os.getenv("OLLAMA_RESOLVE_MODEL", "qwen3.5:27b") OLLAMA_RESOLVE_MODEL = os.getenv("OLLAMA_RESOLVE_MODEL", "qwen3:8b")
def _mb_wait(): def _mb_wait():
@ -184,10 +184,12 @@ def _build_resolve_prompt(hints: AlbumHints, partial: Dict) -> str:
for t in hints.tracks[:20] for t in hints.tracks[:20]
) )
return ( return (
"Du bist ein Musikexperte. Analysiere diese Album-Daten und vervollständige die fehlenden Felder.\n\n" "Du bist ein Musikexperte. Analysiere diese Album-Daten.\n"
"Vervollständige fehlende Felder UND korrigiere erkennbare Tippfehler "
"(z.B. im Albumtitel oder Künstlernamen — Verzeichnisnamen enthalten oft Schreibfehler).\n\n"
f"Verzeichnisname: {hints.album_dir.name}\n" f"Verzeichnisname: {hints.album_dir.name}\n"
f"Bekannte Artist: {hints.dir_artist or partial.get('artist', 'unbekannt')}\n" f"Künstler (aus Verzeichnis): {hints.dir_artist or partial.get('artist', 'unbekannt')}\n"
f"Bekannter Albumtitel: {hints.dir_album or partial.get('album', 'unbekannt')}\n" f"Albumtitel (aus Verzeichnis, evtl. mit Tippfehlern): {hints.dir_album or partial.get('album', 'unbekannt')}\n"
f"Jahr: {hints.dir_year or partial.get('year', 'unbekannt')}\n" f"Jahr: {hints.dir_year or partial.get('year', 'unbekannt')}\n"
f"Tracklist-Hinweise:\n{tracks_summary}\n\n" f"Tracklist-Hinweise:\n{tracks_summary}\n\n"
'Antworte NUR mit einem JSON-Objekt mit diesen Feldern (null wenn unbekannt):\n' 'Antworte NUR mit einem JSON-Objekt mit diesen Feldern (null wenn unbekannt):\n'
@ -224,7 +226,7 @@ def _resolve_via_ollama(hints: AlbumHints, partial: Dict) -> Optional[Dict]:
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
method="POST", method="POST",
) )
with urllib.request.urlopen(req, timeout=120) as resp: with urllib.request.urlopen(req, timeout=240) as resp:
data = json.loads(resp.read()) data = json.loads(resp.read())
text = data.get("message", {}).get("content", "").strip() text = data.get("message", {}).get("content", "").strip()
return _parse_json_response(text) return _parse_json_response(text)
@ -422,19 +424,34 @@ def resolve(
if not artist or not album or confidence < 0.5: if not artist or not album or confidence < 0.5:
cl = _claude_resolve(hints, partial) cl = _claude_resolve(hints, partial)
if cl: if cl:
artist = artist or cl.get("artist") if confidence < 0.3:
album = album or cl.get("album") # Sehr unsicher: LLM darf auch bestehende Werte korrigieren
year = year or cl.get("year") # (z.B. Tippfehler im Albumtitel aus dem Verzeichnisnamen)
genre = genre or cl.get("genre") artist = cl.get("artist") or artist
label = label or cl.get("label") album = cl.get("album") or album
year = cl.get("year") or year
genre = cl.get("genre") or genre
label = cl.get("label") or label
else:
artist = artist or cl.get("artist")
album = album or cl.get("album")
year = year or cl.get("year")
genre = genre or cl.get("genre")
label = label or cl.get("label")
confidence += 0.10 confidence += 0.10
sources.append("llm-resolve") sources.append("llm-resolve")
# Finalize albumartist # Finalize albumartist
# dir_artist hat Vorrang: wenn der Verzeichnisname einen Künstler nennt
# (z.B. "Eugen_Cicero_-_Jazz_meets_Classic"), ist das der Albumkünstler —
# auch wenn die Track-Dateinamen die Komponisten-Namen enthalten.
track_artists = [t.artist for t in hints.tracks if t.artist] track_artists = [t.artist for t in hints.tracks if t.artist]
from collections import Counter from collections import Counter
distinct_artists = set(a for a in track_artists if a) distinct_artists = set(a for a in track_artists if a)
if len(distinct_artists) >= 3: if hints.dir_artist:
# Verzeichnisname nennt explizit einen Künstler → immer verwenden
albumartist = hints.dir_artist
elif len(distinct_artists) >= 3:
albumartist = "Various Artists" albumartist = "Various Artists"
elif track_artists: elif track_artists:
albumartist = artist or Counter(track_artists).most_common(1)[0][0] albumartist = artist or Counter(track_artists).most_common(1)[0][0]
@ -500,4 +517,15 @@ def _build_track_proposals(
mbid=None, mbid=None,
)) ))
# Sequenzielle Nummerierung als letzter Fallback:
# Tracks ohne Nummer (None) erhalten eine laufende Nummer pro Disc.
# Damit werden "00" und "??" im Dateinamen beim --rename verhindert.
if any(p.track_number is None for p in proposals):
disc_counters: Dict[int, int] = {}
for p in proposals:
if p.track_number is None:
disc = p.disc_number or 1
disc_counters[disc] = disc_counters.get(disc, 0) + 1
p.track_number = disc_counters[disc]
return proposals return proposals

View file

@ -26,6 +26,7 @@ class AlbumScan:
audio_files: List[Path] = field(default_factory=list) audio_files: List[Path] = field(default_factory=list)
image_files: List[Path] = field(default_factory=list) image_files: List[Path] = field(default_factory=list)
tracklist_files: List[Path] = field(default_factory=list) tracklist_files: List[Path] = field(default_factory=list)
playlist_files: List[Path] = field(default_factory=list) # .m3u / .m3u8 / .pls
other_files: List[Path] = field(default_factory=list) other_files: List[Path] = field(default_factory=list)

View file

@ -1,51 +1,91 @@
from __future__ import annotations from __future__ import annotations
import re
import sys import sys
from pathlib import Path from pathlib import Path
from typing import List from typing import List
from models import AlbumScan, AUDIO_EXTENSIONS, IMAGE_EXTENSIONS, TRACKLIST_EXTENSIONS from models import AlbumScan, AUDIO_EXTENSIONS, IMAGE_EXTENSIONS, TRACKLIST_EXTENSIONS, PLAYLIST_EXTENSIONS
_DISC_DIR_RE = re.compile(r"(?i)^(?:cd|disc|disk|side)[_ \-]*\d{1,2}$")
def _is_hidden(name: str) -> bool: def _is_hidden(name: str) -> bool:
return name.startswith(".") or name.startswith("_") return name.startswith(".") or name.startswith("_")
def _is_disc_dir(name: str) -> bool:
"""True für Ordner wie 'CD1', 'Disc 2', 'Side A', 'Disk_1'."""
return bool(_DISC_DIR_RE.match(name))
def scan_album(album_dir: Path) -> AlbumScan:
    """Scan one album directory and sort its files into an ``AlbumScan``.

    Recursion rule:
      - If the album directory itself contains audio files, sub-directories
        are not descended into (single disc; sub-folders such as artwork,
        scans or accidental copies are ignored).
      - If the root contains NO audio files, only disc sub-directories
        (names accepted by ``_is_disc_dir``, e.g. 'CD1', 'Disc 2') are
        descended into.

    Entries whose name is hidden according to ``_is_hidden`` are ignored.
    The audio, image, tracklist and playlist lists are returned sorted.
    """
    result = AlbumScan(album_dir=album_dir)

    # Inspect only the root level first to decide whether to recurse.
    root_has_audio = any(
        (album_dir / name).suffix.lower() in AUDIO_EXTENSIONS
        for name in _listdir(album_dir)
        if not _is_hidden(name)
    )

    # Audio at the root -> root level only; otherwise multi-disc layout,
    # where _scan_dir descends into disc folders only.
    _scan_dir(album_dir, album_dir, result, recurse=not root_has_audio)

    result.audio_files.sort()
    result.image_files.sort()
    result.tracklist_files.sort()
    result.playlist_files.sort()
    return result
def _os_walk(album_dir: Path): def _listdir(path: Path) -> List[str]:
import os try:
return os.walk( return [e.name for e in path.iterdir()]
album_dir, except (PermissionError, OSError) as e:
followlinks=False, print(f"⚠️ Scan-Fehler: {e}", file=sys.stderr)
onerror=lambda e: print(f"⚠️ Scan-Fehler: {e}", file=sys.stderr), return []
)
def _scan_dir(current: Path, album_dir: Path, result: AlbumScan, recurse: bool) -> None:
    """Classify the files in *current* into *result*, optionally descending.

    Args:
        current: Directory to list.
        album_dir: Root of the album being scanned. Not read by this
            function; it is only passed through on recursion.
        result: Scan result whose file lists are appended to in place.
        recurse: If True, sub-directories accepted by ``_is_disc_dir`` are
            scanned as well. All other sub-directories are always skipped.

    Hidden entries (``_is_hidden``) are ignored. If *current* cannot be
    listed, a warning is printed to stderr and the directory is skipped.
    """
    try:
        entries = sorted(current.iterdir())
    except OSError as e:  # PermissionError is a subclass of OSError
        print(f"⚠️ Scan-Fehler {current}: {e}", file=sys.stderr)
        return

    for entry in entries:
        name = entry.name
        if _is_hidden(name):
            continue

        if entry.is_dir():
            # is_dir() follows symlinks. Symlinked directories are not
            # descended into, so a link cannot cause a double scan or an
            # endless loop (e.g. "CD1" pointing back at a parent folder).
            if recurse and _is_disc_dir(name) and not entry.is_symlink():
                _scan_dir(entry, album_dir, result, recurse=True)
            # Other sub-directories (artwork, accidental copies, ...) are skipped.
        elif entry.is_file():
            ext = entry.suffix.lower()
            if ext in AUDIO_EXTENSIONS:
                result.audio_files.append(entry)
            elif ext in IMAGE_EXTENSIONS:
                result.image_files.append(entry)
            elif ext in TRACKLIST_EXTENSIONS:
                result.tracklist_files.append(entry)
            elif ext in PLAYLIST_EXTENSIONS:
                result.playlist_files.append(entry)
            else:
                result.other_files.append(entry)
def collect_album_dirs(root: Path) -> List[Path]: def collect_album_dirs(root: Path) -> List[Path]: