Compare commits
No commits in common. "b6abfae16cca37354c90e4e29b4a6c0b5ec0a6c0" and "b273052f68a2c9f9c9a2bd93bee29133bd9577e3" have entirely different histories.
b6abfae16c
...
b273052f68
8 changed files with 0 additions and 2425 deletions
171
cover_handler.py
171
cover_handler.py
|
|
@ -1,171 +0,0 @@
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import sys
|
|
||||||
import tempfile
|
|
||||||
import time
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import Optional, List
|
|
||||||
|
|
||||||
try:
|
|
||||||
from PIL import Image
|
|
||||||
HAS_PIL = True
|
|
||||||
except ImportError:
|
|
||||||
HAS_PIL = False
|
|
||||||
|
|
||||||
try:
|
|
||||||
import requests
|
|
||||||
HAS_REQUESTS = True
|
|
||||||
except ImportError:
|
|
||||||
HAS_REQUESTS = False
|
|
||||||
|
|
||||||
try:
|
|
||||||
import musicbrainzngs as mb
|
|
||||||
HAS_MB = True
|
|
||||||
except ImportError:
|
|
||||||
HAS_MB = False
|
|
||||||
|
|
||||||
try:
|
|
||||||
from mutagen.id3 import ID3, APIC, error as ID3Error
|
|
||||||
from mutagen.flac import FLAC, Picture
|
|
||||||
from mutagen.mp4 import MP4, MP4Cover
|
|
||||||
HAS_MUTAGEN = True
|
|
||||||
except ImportError:
|
|
||||||
HAS_MUTAGEN = False
|
|
||||||
|
|
||||||
_MIN_COVER_SIZE = 200 # pixels
|
|
||||||
|
|
||||||
|
|
||||||
def _image_ok(path: Path) -> bool:
    """Return True when *path* looks like a usable cover image.

    With Pillow available, the image must be at least ``_MIN_COVER_SIZE``
    pixels in both dimensions. Without Pillow the check degrades to a
    file-size heuristic (more than 5000 bytes). A missing or unreadable file
    yields False in both modes.
    """
    if not HAS_PIL:
        try:
            return path.stat().st_size > 5000
        except OSError:
            # Missing/unreadable file: answer like the Pillow path does
            # instead of propagating the error to the caller.
            return False
    try:
        with Image.open(path) as img:
            width, height = img.size
    except Exception:
        # Image.open can fail in many ways for broken files; any failure
        # simply means "not a usable cover".
        return False
    return width >= _MIN_COVER_SIZE and height >= _MIN_COVER_SIZE
|
|
||||||
|
|
||||||
|
|
||||||
def find_local_cover(image_files: List[Path]) -> Optional[Path]:
    """Pick the best local cover image, or None when no candidate qualifies.

    Candidates are ranked by the first priority keyword found in the
    lowercased file name ("front", "folder", "cover", "album", in that
    order); ties are broken by larger file size. The first ranked file that
    passes ``_image_ok`` is returned.
    """
    keywords = ("front", "folder", "cover", "album")

    def rank(candidate: Path):
        lowered = candidate.name.lower()
        position = len(keywords)  # files without any keyword sort last
        for idx, keyword in enumerate(keywords):
            if keyword in lowered:
                position = idx
                break
        byte_count = candidate.stat().st_size if candidate.exists() else 0
        # Negative size so that bigger files come first within one rank.
        return (position, -byte_count)

    ranked = sorted(image_files, key=rank)
    return next((candidate for candidate in ranked if _image_ok(candidate)), None)
|
|
||||||
|
|
||||||
|
|
||||||
def _mb_cover_url(release_mbid: str) -> Optional[str]:
    """Return the Cover Art Archive front-cover URL for a release, if reachable.

    A HEAD request (following redirects, 5 s timeout) must answer with
    status 200. Returns None when ``requests`` is unavailable, the request
    fails, or any other status comes back.
    """
    if not HAS_REQUESTS:
        return None
    front_url = f"https://coverartarchive.org/release/{release_mbid}/front"
    try:
        response = requests.head(front_url, timeout=5, allow_redirects=True)
        found = response.status_code == 200
    except Exception:
        # Best-effort probe: network problems just mean "no cover".
        found = False
    return front_url if found else None
|
|
||||||
|
|
||||||
|
|
||||||
def download_cover(release_mbid: Optional[str], dest_dir: Path) -> Optional[Path]:
    """Download the front cover of a release into *dest_dir*.

    The file is stored as ``_cover_download.jpg`` (or ``.png`` when the
    response content type mentions "png"). It is kept only if it passes
    ``_image_ok``; otherwise it is deleted again.

    Returns the path of the stored image, or None when no MBID was given,
    ``requests`` is missing, no cover exists, or the download fails.
    """
    if not release_mbid or not HAS_REQUESTS:
        return None

    cover_url = _mb_cover_url(release_mbid)
    if not cover_url:
        return None

    try:
        response = requests.get(cover_url, timeout=15)
        if response.status_code == 200:
            content_type = response.headers.get("content-type", "")
            suffix = ".png" if "png" in content_type else ".jpg"
            target = dest_dir / f"_cover_download{suffix}"
            target.write_bytes(response.content)
            if _image_ok(target):
                return target
            # Too small or unreadable: do not leave the file behind.
            target.unlink(missing_ok=True)
    except Exception as e:
        print(f"  ⚠️  Cover-Download-Fehler: {e}", file=sys.stderr)
    return None
|
|
||||||
|
|
||||||
|
|
||||||
def embed_cover(audio_path: Path, cover_path: Path) -> bool:
    """Embed *cover_path* as cover art into *audio_path*.

    Dispatches on the audio file suffix: ``.mp3`` (ID3 APIC frame, existing
    APIC frames removed first), ``.flac`` (picture block, existing pictures
    cleared), ``.m4a`` (``covr`` atom); any other suffix goes through a
    generic mutagen fallback that only works for tag containers offering an
    ``add`` method.

    Returns True only when the picture was written; False when mutagen is
    missing, the container is unsupported, or any error occurs (the error
    is reported on stderr).
    """
    if not HAS_MUTAGEN:
        return False
    try:
        img_data = cover_path.read_bytes()
        # Every suffix other than .jpg/.jpeg is labelled as PNG.
        is_jpeg = cover_path.suffix.lower() in (".jpg", ".jpeg")
        mime = "image/jpeg" if is_jpeg else "image/png"
        ext = audio_path.suffix.lower()

        if ext == ".mp3":
            try:
                tags = ID3(str(audio_path))
            except ID3Error:
                # No usable ID3 tag yet: start from an empty one.
                tags = ID3()
            tags.delall("APIC")
            tags.add(APIC(encoding=3, mime=mime, type=3, desc="Cover", data=img_data))
            tags.save(str(audio_path), v2_version=4)
            return True

        if ext == ".flac":
            audio = FLAC(str(audio_path))
            audio.clear_pictures()
            pic = Picture()
            pic.type = 3
            pic.mime = mime
            pic.desc = "Cover"
            pic.data = img_data
            audio.add_picture(pic)
            audio.save()
            return True

        if ext == ".m4a":
            audio = MP4(str(audio_path))
            if audio.tags is None:
                # Untagged MP4: create the tag container before indexing it,
                # otherwise the assignment below fails on None.
                audio.add_tags()
            fmt = MP4Cover.FORMAT_JPEG if is_jpeg else MP4Cover.FORMAT_PNG
            audio.tags["covr"] = [MP4Cover(img_data, imageformat=fmt)]
            audio.save()
            return True

        # Generic mutagen fallback for all other containers.
        from mutagen import File as MutagenFile

        audio = MutagenFile(str(audio_path), easy=False)
        if audio is not None:
            if audio.tags is None:
                audio.add_tags()
            if hasattr(audio.tags, "add"):
                audio.tags.add(
                    APIC(encoding=3, mime=mime, type=3, desc="Cover", data=img_data)
                )
                audio.save()
                return True
    except Exception as e:
        print(f"  ⚠️  Cover-Einbettungsfehler {audio_path.name}: {e}", file=sys.stderr)
    # Unsupported container or failure: nothing was embedded.
    return False
|
|
||||||
|
|
||||||
|
|
||||||
def resolve_cover(
    image_files: List[Path],
    release_mbid: Optional[str],
    album_dir: Path,
) -> tuple[Optional[Path], Optional[str]]:
    """Return ``(cover_path, source_label)`` for an album.

    A suitable local image wins (label ``"local"``); otherwise, if a release
    MBID is known, a download into *album_dir* is attempted (label
    ``"musicbrainz"``). ``(None, None)`` means no cover could be found.
    """
    local_cover = find_local_cover(image_files)
    if local_cover:
        return local_cover, "local"

    fetched = download_cover(release_mbid, album_dir) if release_mbid else None
    if fetched:
        return fetched, "musicbrainz"

    return None, None
|
|
||||||
368
executor.py
368
executor.py
|
|
@ -1,368 +0,0 @@
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import csv
|
|
||||||
import re
|
|
||||||
import shutil
|
|
||||||
import subprocess
|
|
||||||
import sys
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import Optional, List, Dict, Any
|
|
||||||
|
|
||||||
from models import AlbumProposal, TrackProposal
|
|
||||||
|
|
||||||
try:
|
|
||||||
from mutagen import File as MutagenFile
|
|
||||||
from mutagen.easyid3 import EasyID3
|
|
||||||
from mutagen.flac import FLAC
|
|
||||||
from mutagen.mp4 import MP4, MP4Tags
|
|
||||||
HAS_MUTAGEN = True
|
|
||||||
except ImportError:
|
|
||||||
HAS_MUTAGEN = False
|
|
||||||
|
|
||||||
from cover_handler import embed_cover
|
|
||||||
|
|
||||||
_SAFE_RE = re.compile(r'[<>:"/\\|?*\x00-\x1f]')
|
|
||||||
_CLASSICAL_GENRES = re.compile(
|
|
||||||
r"(?i)class|baroque|romantic|renaissance|opera|symphony|chamber|concerto|sonata|oratorio"
|
|
||||||
)
|
|
||||||
REPORT_FIELDS = [
|
|
||||||
"status", "album_dir", "track_path",
|
|
||||||
"old_title", "new_title",
|
|
||||||
"old_artist", "new_artist",
|
|
||||||
"album", "albumartist", "date", "genre", "label",
|
|
||||||
"track_number", "disc_number",
|
|
||||||
"cover_embedded", "renamed_to",
|
|
||||||
"confidence", "sources",
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
def _safe_name(s: str) -> str:
    """Return a filesystem-safe version of *s*.

    Characters matched by ``_SAFE_RE`` become ``_``, every whitespace run
    becomes a single ``_``, and leading/trailing ``.``, ``_`` and ``-`` are
    stripped.
    """
    without_illegal = _SAFE_RE.sub("_", s)
    underscored = re.sub(r"\s+", "_", without_illegal)
    return underscored.strip("._-")
|
|
||||||
|
|
||||||
|
|
||||||
def _is_classical(albumartist: str, track_artist: str, genre: str) -> bool:
    """Decide whether the classical file-naming scheme applies.

    The scheme applies when the performer (*albumartist*) differs from the
    composer (*track_artist*). An empty album artist and placeholder album
    artists ("various artists", "unknown artist", "unknown") never count as
    classical.

    Names are compared case-insensitively, with underscores treated as
    spaces, so values that were already passed through a filename sanitizer
    (e.g. ``"Various_Artists"``) are recognised as well.

    *genre* is accepted for interface compatibility but is not consulted.
    """
    def _fold(name: str) -> str:
        # Underscores -> spaces, collapse whitespace, case-insensitive form.
        return " ".join((name or "").replace("_", " ").split()).casefold()

    aa = _fold(albumartist)
    ta = _fold(track_artist)
    if not aa or aa in ("various artists", "unknown artist", "unknown"):
        return False
    # performer != composer -> classical naming
    return aa != ta
|
|
||||||
|
|
||||||
|
|
||||||
def _proposed_filename(
    proposal: TrackProposal,
    ext: str,
    albumartist: str = "",
    genre: str = "",
) -> str:
    """Build the target file name for a track.

    Pop/default: ``TT_-_Artist_-_Title.ext``
    Classical:   ``TT_-_Performer_-_Composer_-_Title[_-_Orchestra_Conductor].ext``

    Parts are joined with ``_-_``; whitespace inside a name becomes ``_``.
    Orchestra and conductor are appended only when present.
    """
    # A set disc number (including disc 1) always yields a "D-TT" prefix so
    # naming stays consistent across all discs; without one it is just "TT".
    track_no = f"{proposal.track_number:02d}" if proposal.track_number else "00"
    if proposal.disc_number:
        prefix = f"{proposal.disc_number}-{track_no}"
    else:
        prefix = track_no

    track_artist = _safe_name(proposal.artist or "Unknown")
    performer = _safe_name(albumartist)
    title = _safe_name(proposal.title or "Unknown")

    if not _is_classical(performer, track_artist, genre):
        # Pop/default scheme: track number, artist, title.
        return "_-_".join([prefix, track_artist, title]) + ext

    # Classical scheme: performer, composer, work [, orchestra + conductor].
    parts = [prefix, performer, track_artist, title]
    ensemble = [
        _safe_name(proposal.orchestra or ""),
        _safe_name(proposal.conductor or ""),
    ]
    extra = "_".join(name for name in ensemble if name)
    if extra:
        parts.append(extra)
    return "_-_".join(parts) + ext
|
|
||||||
|
|
||||||
|
|
||||||
def backup_file(path: Path, backup_dir: Path) -> bool:
    """Copy *path* into *backup_dir* as ``<parent-dir-name>__<file-name>``.

    The backup directory is created on demand. An existing backup with the
    same name is left untouched. Returns True on success (including the
    "already backed up" case) and False after reporting an error on stderr.
    """
    try:
        backup_dir.mkdir(parents=True, exist_ok=True)
        target = backup_dir / f"{path.parent.name}__{path.name}"
        if target.exists():
            # Never overwrite an earlier backup.
            return True
        shutil.copy2(path, target)
        return True
    except Exception as e:
        print(f"  ⚠️  Backup-Fehler {path.name}: {e}", file=sys.stderr)
        return False
|
|
||||||
|
|
||||||
|
|
||||||
def write_tags(path: Path, proposal: TrackProposal, album_proposal: AlbumProposal) -> bool:
    """Write the proposed tags of one track into the audio file at *path*.

    Title, artist, album and album artist are always written (empty string
    when unknown). Track number (as ``n/total`` with ``total`` = number of
    tracks in the album proposal), disc number, date, genre and label are
    written only when present.

    Handles ``.mp3`` (EasyID3, saved as ID3v2.4), ``.flac``, ``.m4a`` and a
    generic mutagen "easy" fallback for every other suffix.

    Returns True when the file was saved; False when mutagen is missing, the
    file type is not recognised, or an error occurred (reported on stderr).
    """
    if not HAS_MUTAGEN:
        return False
    ext = path.suffix.lower()
    tags_to_write = {
        "title": proposal.title or "",
        "artist": proposal.artist or "",
        "album": album_proposal.album or "",
        "albumartist": album_proposal.albumartist or "",
    }
    if proposal.track_number:
        total = len(album_proposal.tracks)
        tags_to_write["tracknumber"] = f"{proposal.track_number}/{total}"
    if proposal.disc_number:
        tags_to_write["discnumber"] = str(proposal.disc_number)
    if album_proposal.date:
        # Strip everything except valid ID3 timestamp characters to prevent
        # ID3TimeStamp errors.
        date_clean = re.sub(r"[^\d\-T:+Z]", "", str(album_proposal.date)).strip()
        if date_clean:
            tags_to_write["date"] = date_clean
    if album_proposal.genre:
        tags_to_write["genre"] = album_proposal.genre
    if album_proposal.label:
        tags_to_write["organization"] = album_proposal.label

    try:
        if ext == ".mp3":
            try:
                audio = EasyID3(str(path))
            except Exception:
                # File has no ID3 header: add one without touching the audio
                # data, then retry.
                try:
                    from mutagen.mp3 import MP3
                    full = MP3(str(path))
                    full.tags = None
                    full.add_tags()
                    full.save(str(path), v2_version=4)
                except Exception:
                    # Best effort; the retry below surfaces a real failure.
                    pass
                audio = EasyID3(str(path))
            for k, v in tags_to_write.items():
                try:
                    audio[k] = [v]
                except Exception as tag_err:
                    print(f"  ⚠️  Tag-Feld '{k}' übersprungen ({path.name}): {tag_err}", file=sys.stderr)
            audio.save(v2_version=4)
            return True

        if ext == ".flac":
            audio = FLAC(str(path))
            for k, v in tags_to_write.items():
                audio[k] = [v]
            audio.save()
            return True

        if ext == ".m4a":
            audio = MP4(str(path))
            mapping = {
                "title": "\xa9nam", "artist": "\xa9ART",
                "album": "\xa9alb", "albumartist": "aART",
                "tracknumber": "trkn", "discnumber": "disk",
                "date": "\xa9day", "genre": "\xa9gen",
            }
            # "trkn" and "disk" hold (number, total) integer pairs, not text.
            pair_atoms = ("trkn", "disk")
            for k, v in tags_to_write.items():
                tag_key = mapping.get(k)
                if not tag_key:
                    # No atom mapped for this field (e.g. "organization").
                    continue
                if tag_key in pair_atoms:
                    try:
                        num, total = v.split("/") if "/" in v else (v, "0")
                        audio[tag_key] = [(int(num), int(total))]
                    except Exception as tag_err:
                        print(f"  ⚠️  Tag-Feld '{k}' übersprungen ({path.name}): {tag_err}", file=sys.stderr)
                else:
                    audio[tag_key] = [v]
            audio.save()
            return True

        # Generic fallback: let mutagen detect the format.
        audio = MutagenFile(str(path), easy=True)
        if audio is not None:
            if audio.tags is None:
                audio.add_tags()
            for k, v in tags_to_write.items():
                try:
                    audio[k] = [v]
                except Exception:
                    # Field not supported by this container: skip it.
                    pass
            audio.save()
            return True
    except Exception as e:
        print(f"  ⚠️  Tag-Schreibfehler {path.name}: {e}", file=sys.stderr)
    # Unrecognised file type or failure.
    return False
|
|
||||||
|
|
||||||
|
|
||||||
def _update_m3u(m3u_path: Path, tracks: List[tuple]) -> bool:
    """Rewrite the playlist with the renamed files in track order.

    tracks: ``[(TrackProposal, actual_path_after_rename), ...]``

    Each entry becomes an ``#EXTINF`` line (duration in whole seconds, or -1
    when it cannot be determined) followed by the bare file name. Returns
    True when the playlist was written, False after reporting an error.
    """
    def _seconds(track_file: Path) -> int:
        # Duration lookup is optional; any failure falls back to -1.
        if not HAS_MUTAGEN:
            return -1
        try:
            audio = MutagenFile(str(track_file))
            if audio and hasattr(audio, "info") and audio.info:
                return int(audio.info.length)
        except Exception:
            pass
        return -1

    try:
        out = ["#EXTM3U"]
        for tp, track_path in tracks:
            duration = _seconds(track_path)
            if tp.artist:
                label = f"{tp.artist} - {tp.title}"
            else:
                label = tp.title or track_path.stem
            out.extend((f"#EXTINF:{duration},{label}", track_path.name))
        m3u_path.write_text("\n".join(out) + "\n", encoding="utf-8")
        print(f"  📋 Playlist aktualisiert: {m3u_path.name}")
        return True
    except Exception as e:
        print(f"  ⚠️  M3U-Fehler {m3u_path.name}: {e}", file=sys.stderr)
        return False
|
|
||||||
|
|
||||||
|
|
||||||
def execute_album(
    proposal: AlbumProposal,
    backup_dir: Optional[Path],
    do_rename: bool,
    embed_cover_art: bool,
    dry_run: bool,
    report_data: List[Dict[str, Any]],
) -> Dict[str, int]:
    """Apply an album proposal to its files and append one report row per track.

    Per track (skipped entirely when *dry_run* is set): optional backup, tag
    writing, optional cover embedding, optional renaming. After the loop, if
    any file was renamed, the first ``*.m3u`` / ``*.m3u8`` playlist in the
    album directory is rewritten, and the directory is passed to
    ``sanitize_dir_names``.

    A rename never overwrites a different existing file: such a collision is
    reported, counted as an error, and the track keeps its old name.

    Returns counters: ``tags_written``, ``covers_embedded``,
    ``files_renamed``, ``errors``. *report_data* is extended in place.
    """
    stats = {"tags_written": 0, "covers_embedded": 0, "files_renamed": 0, "errors": 0}
    final_tracks: List[tuple] = []  # (TrackProposal, final_path) for the M3U

    for tp in proposal.tracks:
        # Previous title/artist for the report; fall back to the file stem.
        old_title = tp.path.stem
        old_artist = ""
        if HAS_MUTAGEN:
            try:
                audio = MutagenFile(str(tp.path), easy=True)
                if audio and audio.tags:
                    old_artist = str(audio.tags.get("artist", [""])[0])
                    old_title = str(audio.tags.get("title", [tp.path.stem])[0])
            except Exception:
                # Old values are informational only.
                pass

        new_path = tp.path
        renamed_to = ""
        cover_embedded = False
        errors_before = stats["errors"]

        if not dry_run:
            if backup_dir:
                backup_file(tp.path, backup_dir)

            if write_tags(tp.path, tp, proposal):
                stats["tags_written"] += 1
            else:
                stats["errors"] += 1

            if embed_cover_art and proposal.cover_path:
                if embed_cover(tp.path, proposal.cover_path):
                    stats["covers_embedded"] += 1
                    cover_embedded = True

            if do_rename:
                new_name = _proposed_filename(
                    tp, tp.path.suffix,
                    albumartist=proposal.albumartist or "",
                    genre=proposal.genre or "",
                )
                candidate = tp.path.parent / new_name
                if candidate != tp.path:
                    try:
                        # rename() silently replaces an existing target on
                        # POSIX; refuse unless the target is this very file
                        # (e.g. a case-only change on a case-insensitive FS).
                        if candidate.exists() and not candidate.samefile(tp.path):
                            raise FileExistsError(f"Zieldatei existiert bereits: {new_name}")
                        tp.path.rename(candidate)
                        new_path = candidate
                        renamed_to = new_name
                        stats["files_renamed"] += 1
                    except Exception as e:
                        print(f"  ⚠️  Umbenennungsfehler {tp.path.name}: {e}", file=sys.stderr)
                        stats["errors"] += 1

            final_tracks.append((tp, new_path))

        if dry_run:
            status = "dry-run"
        else:
            # Reflect tag/rename failures of this track in the report.
            status = "ok" if stats["errors"] == errors_before else "error"

        report_data.append({
            "status": status,
            "album_dir": str(proposal.album_dir.name),
            "track_path": str(new_path),
            "old_title": old_title,
            "new_title": tp.title,
            "old_artist": old_artist,
            "new_artist": tp.artist,
            "album": proposal.album,
            "albumartist": proposal.albumartist,
            "date": proposal.date or "",
            "genre": proposal.genre or "",
            "label": proposal.label or "",
            "track_number": tp.track_number or "",
            "disc_number": tp.disc_number or "",
            "cover_embedded": cover_embedded,
            "renamed_to": renamed_to,
            "confidence": f"{proposal.confidence:.2f}",
            "sources": ", ".join(proposal.sources),
        })

    # Update the M3U playlist when files were renamed.
    if do_rename and not dry_run and stats["files_renamed"] > 0 and final_tracks:
        m3u_files = (
            list(proposal.album_dir.glob("*.m3u")) +
            list(proposal.album_dir.glob("*.m3u8"))
        )
        if m3u_files:
            _update_m3u(m3u_files[0], final_tracks)

    # After all renames: make the directory's file names Linux-compatible.
    # NOTE(review): this runs after the playlist was written; if the external
    # tool renames files again, playlist entries and report paths may be
    # stale. Confirm whether that can happen in practice.
    if do_rename and not dry_run:
        sanitize_dir_names(proposal.album_dir)

    return stats
|
|
||||||
|
|
||||||
|
|
||||||
def sanitize_dir_names(directory: Path) -> None:
    """Make all file names below *directory* Linux-compatible.

    Prefers ``NameToUnix <dir>``; when that tool is missing or exits with an
    error, falls back to running ``detox <file>`` on every regular file
    found recursively. If neither tool is available, a note is printed to
    stderr and nothing is renamed.
    """
    primary_tool = shutil.which("NameToUnix")
    if primary_tool:
        try:
            subprocess.run([primary_tool, str(directory)], check=True, capture_output=True)
        except subprocess.CalledProcessError as e:
            print(f"  ⚠️  NameToUnix-Fehler: {e.stderr.decode(errors='replace').strip()}", file=sys.stderr)
        else:
            return

    fallback_tool = shutil.which("detox")
    if not fallback_tool:
        print("  ℹ️  Weder NameToUnix noch detox gefunden — Dateinamen nicht nachbereinigt.", file=sys.stderr)
        return

    # The listing is materialised by sorted() before any file is renamed.
    for f in sorted(directory.rglob("*")):
        if not f.is_file():
            continue
        try:
            subprocess.run([fallback_tool, str(f)], check=True, capture_output=True)
        except subprocess.CalledProcessError as e:
            print(f"  ⚠️  detox-Fehler {f.name}: {e.stderr.decode(errors='replace').strip()}", file=sys.stderr)
|
|
||||||
|
|
||||||
|
|
||||||
def write_report(report_data: List[Dict[str, Any]], report_path: Path) -> None:
    """Write the collected report rows as UTF-8 CSV to *report_path*.

    Columns follow ``REPORT_FIELDS``; the parent directory is created when
    missing. Failures are reported on stderr instead of being raised.
    """
    try:
        report_path.parent.mkdir(parents=True, exist_ok=True)
        with report_path.open("w", encoding="utf-8", newline="") as handle:
            writer = csv.DictWriter(handle, fieldnames=REPORT_FIELDS)
            writer.writeheader()
            writer.writerows(report_data)
    except Exception as e:
        print(f"⚠️  Report-Fehler: {e}", file=sys.stderr)
    else:
        print(f"📊 Report gespeichert: {report_path}")
|
|
||||||
|
|
@ -1,583 +0,0 @@
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import base64
|
|
||||||
import json
|
|
||||||
import os
|
|
||||||
import re
|
|
||||||
import shutil
|
|
||||||
import subprocess
|
|
||||||
import sys
|
|
||||||
import urllib.request
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import Optional, List, Dict, Tuple
|
|
||||||
|
|
||||||
from models import AlbumScan, AlbumHints, TrackHints
|
|
||||||
|
|
||||||
try:
|
|
||||||
from mutagen import File as MutagenFile
|
|
||||||
HAS_MUTAGEN = True
|
|
||||||
except ImportError:
|
|
||||||
HAS_MUTAGEN = False
|
|
||||||
|
|
||||||
try:
|
|
||||||
from bs4 import BeautifulSoup
|
|
||||||
HAS_BS4 = True
|
|
||||||
except ImportError:
|
|
||||||
HAS_BS4 = False
|
|
||||||
|
|
||||||
_NATSORT_RE = re.compile(r"(\d+)")
|
|
||||||
_BAD_VALUES = {"unknown", "unknown artist", "unknown album", "untitled", "track", "va", "various"}
|
|
||||||
|
|
||||||
# Filename patterns: most specific first
|
|
||||||
_FILENAME_PATTERNS = [
|
|
||||||
re.compile(r"^(?P<disc>\d{1,2})[- _]+(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<artist>.+?)\s*[-–]\s*(?P<title>.+)$"),
|
|
||||||
re.compile(r"^(?P<disc>\d{1,2})[- _]+(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<title>.+)$"),
|
|
||||||
re.compile(r"^(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<artist>.+?)\s*[-–]\s*(?P<title>.+)$"),
|
|
||||||
re.compile(r"^(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<title>.+)$"),
|
|
||||||
re.compile(r"^(?P<artist>.+?)\s*[-–]\s*(?P<title>.+)$"),
|
|
||||||
]
|
|
||||||
|
|
||||||
# Directory name patterns
|
|
||||||
_DIR_PATTERNS = [
|
|
||||||
re.compile(r"^(?P<artist>.+?)[_ -]+[-–][_ -]+(?P<album>.+?)(?:[_ -]+(?P<year>\d{4}))?$"),
|
|
||||||
re.compile(r"^(?P<artist>.+?)[_ ]+(?P<year>\d{4})[._ -]+(?P<album>.+)$"),
|
|
||||||
re.compile(r"^(?P<album>.+?)[_ -]+(?P<year>\d{4})$"),
|
|
||||||
]
|
|
||||||
|
|
||||||
# Tracklist line patterns
|
|
||||||
_TRACKLIST_PATTERNS = [
|
|
||||||
re.compile(r"^(?P<disc>\d{1,2})[- _](?P<track>\d{1,3})\s+(?P<title>.+?)(?:\s+\d+:\d{2})?$"),
|
|
||||||
# Separator muss . ) oder : sein — reines Leerzeichen reicht nicht
|
|
||||||
# (verhindert False-Positives wie "2 x CD, Compilation, Remastered")
|
|
||||||
re.compile(r"^(?P<track>\d{1,3})[.):]\s*(?P<title>.+?)(?:\s+\d+:\d{2})?$"),
|
|
||||||
re.compile(r"^(?P<track>[A-Z]\d{1,2})[.):]\s*(?P<title>.+?)(?:\s+\d+:\d{2})?$"),
|
|
||||||
]
|
|
||||||
|
|
||||||
_DISC_SECTION_RE = re.compile(r"(?i)(?:cd|disc|disk|side)[_ \-]*(\d{1,2})")
|
|
||||||
|
|
||||||
|
|
||||||
def _clean(s: Optional[str]) -> str:
    """Normalise a raw name fragment for comparison and display.

    Removes BOM (U+FEFF), zero-width space (U+200B) and soft hyphen
    (U+00AD), turns underscores into spaces, collapses whitespace runs into
    one space and strips leading/trailing spaces, ``-``, ``.`` and ``_``.
    Returns ``""`` for None or empty input.
    """
    if not s:
        return ""
    # Explicit escapes instead of literal invisible characters: those are
    # easily lost by editors/tooling, leaving an empty (invalid) class.
    s = re.sub(r"[\ufeff\u200b\u00ad]", "", s)
    return re.sub(r"\s+", " ", s.replace("_", " ")).strip(" -._")
|
|
||||||
|
|
||||||
|
|
||||||
def _norm_for_match(s: str) -> str:
    """Reduce *s* to casefolded ASCII letters and digits for fuzzy title matching.

    Everything outside ``a-z`` / ``0-9`` is dropped after casefolding, which
    makes the comparison insensitive to punctuation and spacing.
    """
    folded = s.casefold()
    return re.sub(r"[^a-z0-9]", "", folded)
|
|
||||||
|
|
||||||
|
|
||||||
# Klassische Werkverzeichnis-Nummern: BWV 565, Op. 27, K. 331, HWV 56, …
|
|
||||||
_CATALOG_RE = re.compile(
|
|
||||||
r"\b(bwv|hwv|op|k|kv|d|sz|wq|bbwv|rv|twv|hob)\W*(\d+[a-z]?(?:[\/\.]\d+)?)",
|
|
||||||
re.IGNORECASE,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def _catalog_key(s: str) -> Optional[str]:
    """Extract a normalised catalogue number such as ``'bwv565'`` or ``'op27'``.

    Uses the first ``_CATALOG_RE`` match in *s*: the lowercased catalogue
    prefix followed by the number with all non-word characters removed.
    Returns None when *s* contains no catalogue number.
    """
    found = _CATALOG_RE.search(s)
    if not found:
        return None
    prefix, number = found.group(1), found.group(2)
    return prefix.lower() + re.sub(r"\W", "", number)
|
|
||||||
|
|
||||||
|
|
||||||
def _is_good(v: Optional[str]) -> bool:
    """Return True when *v* is non-empty and not a known placeholder value.

    The value is cleaned and casefolded before being checked against
    ``_BAD_VALUES``.
    """
    return bool(v) and _clean(v).casefold() not in _BAD_VALUES
|
|
||||||
|
|
||||||
|
|
||||||
def _parse_dirname(name: str) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Split an album directory name into ``(artist, album, year)``.

    The cleaned name is tried against ``_DIR_PATTERNS`` in order; the first
    match that yields a usable artist or album wins. If none does, the whole
    cleaned name is returned as the album, with artist and year set to None.
    """
    cleaned = _clean(name)
    for pattern in _DIR_PATTERNS:
        match = pattern.match(cleaned)
        if match is None:
            continue
        groups = match.groupdict()
        artist = _clean(groups.get("artist")) or None
        album = _clean(groups.get("album")) or None
        if _is_good(artist) or _is_good(album):
            return artist, album, groups.get("year")
    # No pattern produced anything usable: treat the whole name as the album.
    return None, _clean(cleaned), None
|
|
||||||
|
|
||||||
|
|
||||||
def _parse_filename(stem: str) -> Dict[str, str]:
    """Extract track fields from a file stem.

    The cleaned stem is matched against ``_FILENAME_PATTERNS`` in order; the
    first match supplies the non-empty named groups (cleaned). When nothing
    matches, the whole cleaned stem is returned as ``title``.
    """
    cleaned = _clean(stem)
    for pattern in _FILENAME_PATTERNS:
        match = pattern.match(cleaned)
        if match:
            fields: Dict[str, str] = {}
            for group_name, value in match.groupdict().items():
                if value:
                    fields[group_name] = _clean(value)
            return fields
    return {"title": cleaned}
|
|
||||||
|
|
||||||
|
|
||||||
def _read_tags(path: Path) -> Tuple[Dict[str, str], Optional[float]]:
    """Read the common tags and the duration of an audio file.

    Returns ``(tags, duration)``. ``tags`` holds the first value of each
    wanted key that is present, stripped; a ``year`` tag is copied to
    ``date`` when no ``date`` exists. ``duration`` is ``audio.info.length``
    or None. Without mutagen, for unrecognised files, or on errors (reported
    on stderr) the result is ``({}, None)``.
    """
    if not HAS_MUTAGEN:
        return {}, None

    wanted = (
        "title", "artist", "album", "albumartist", "tracknumber",
        "discnumber", "date", "year", "genre", "label", "organization",
    )
    try:
        audio = MutagenFile(str(path), easy=True)
        if not audio:
            return {}, None

        tags: Dict[str, str] = {}
        for key in wanted:
            values = audio.get(key)
            if values:
                tags[key] = str(values[0]).strip()
        if "year" in tags:
            # Only fills "date" when it is not already present.
            tags.setdefault("date", tags["year"])

        info = getattr(audio, "info", None)
        duration = info.length if info and hasattr(info, "length") else None
        return tags, duration
    except Exception as e:
        print(f"  ⚠️  Tag-Lesefehler {path.name}: {e}", file=sys.stderr)
        return {}, None
|
|
||||||
|
|
||||||
|
|
||||||
def _parse_tracklist(text: str) -> List[Dict[str, str]]:
    """Parse free-form tracklist text into ``{title, track, disc}`` entries.

    Short lines (under 30 characters) that start with a disc header matched
    by ``_DISC_SECTION_RE`` switch the current disc. Every other non-empty
    line is tried against ``_TRACKLIST_PATTERNS``; only the first matching
    pattern is used. Numeric track numbers lose leading zeros; non-numeric
    ones are kept as-is. Entries with an empty title are dropped.
    """
    parsed: List[Dict[str, str]] = []
    disc_now = 1

    for raw_line in text.splitlines():
        row = raw_line.strip()
        if not row:
            continue

        header = _DISC_SECTION_RE.match(row)
        if header and len(row) < 30:
            disc_now = int(header.group(1))
            continue

        # First pattern that matches decides; later patterns are not tried.
        hit = next(
            (m for m in (pattern.match(row) for pattern in _TRACKLIST_PATTERNS) if m),
            None,
        )
        if hit is None:
            continue

        fields = hit.groupdict()
        item: Dict[str, str] = {"title": _clean(fields.get("title", ""))}
        number = fields.get("track", "")
        if number:
            item["track"] = (number.lstrip("0") or "0") if number.isdigit() else number
        # An explicit disc in the line wins over the current section header.
        item["disc"] = fields.get("disc") or str(disc_now)
        if item.get("title"):
            parsed.append(item)

    return parsed
|
|
||||||
|
|
||||||
|
|
||||||
def _parse_m3u(text: str) -> List[Dict[str, str]]:
    """Parse M3U/M3U8 text into an ordered list of ``{position, filename, title}``.

    The order of the entries is the desired track order; ``position`` is the
    1-based index as a string. ``title`` is the text after the first comma
    of the preceding ``#EXTINF:`` line, or ``""`` when there is none.
    ``filename`` is the last path component (backslashes are treated as
    separators). Other ``#`` lines are ignored, and a leading UTF-8 BOM is
    discarded so the ``#EXTM3U`` header is not mistaken for a track.
    """
    tracks: List[Dict[str, str]] = []
    pending_title: Optional[str] = None
    position = 0
    # M3U8 files are often saved with a BOM; without this the first line
    # would not start with "#".
    for line in text.lstrip("\ufeff").splitlines():
        line = line.strip()
        if not line:
            continue
        if line.upper().startswith("#EXTINF:"):
            parts = line.split(",", 1)
            pending_title = parts[1].strip() if len(parts) > 1 else None
        elif not line.startswith("#"):
            filename = Path(line.replace("\\", "/")).name
            if not filename:
                continue
            position += 1
            tracks.append({
                "position": str(position),
                "filename": filename,
                "title": pending_title or "",
            })
            # An EXTINF title applies to the next file entry only.
            pending_title = None
    return tracks
|
|
||||||
|
|
||||||
|
|
||||||
def _read_tracklist_file(path: Path) -> Optional[str]:
    """Read a tracklist file and return its text, or None on failure.

    Encodings are tried in the order ``utf-8-sig`` (UTF-8, dropping a BOM),
    ``cp1252``, ``latin-1``. ``latin-1`` accepts every byte sequence, so it
    must come last; placed before ``cp1252`` it would make that fallback
    unreachable and turn Windows punctuation into control characters.

    ``.htm`` / ``.html`` files are reduced to their text content, via
    BeautifulSoup when available and by stripping tags otherwise. Read
    errors are reported on stderr.
    """
    encodings = ("utf-8-sig", "cp1252", "latin-1")
    try:
        if path.suffix.lower() in (".htm", ".html"):
            raw = path.read_bytes()
            encoding = encodings[-1]
            for enc in encodings:
                try:
                    raw.decode(enc)
                    encoding = enc
                    break
                except UnicodeDecodeError:
                    continue
            text = raw.decode(encoding, errors="replace")
            if HAS_BS4:
                soup = BeautifulSoup(text, "html.parser")
                return soup.get_text(separator="\n")
            # Fallback: strip HTML tags
            return re.sub(r"<[^>]+>", " ", text)

        for enc in encodings:
            try:
                return path.read_text(encoding=enc)
            except UnicodeDecodeError:
                continue
    except Exception as e:
        print(f"  ⚠️  Tracklist-Lesefehler {path.name}: {e}", file=sys.stderr)
    return None
|
|
||||||
|
|
||||||
|
|
||||||
_OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")
|
|
||||||
# Modelle in Prioritätsreihenfolge; überschreibbar via OLLAMA_OCR_MODEL
|
|
||||||
_OCR_MODELS = [m.strip() for m in os.getenv(
|
|
||||||
"OLLAMA_OCR_MODEL",
|
|
||||||
"qwen3-vl:latest,minicpm-v:latest,deepseek-ocr:latest"
|
|
||||||
).split(",") if m.strip()]
|
|
||||||
|
|
||||||
_OCR_PROMPT = (
|
|
||||||
"This image shows a CD album back cover or booklet page. "
|
|
||||||
"Your task: extract the complete tracklist as plain text.\n"
|
|
||||||
"Rules:\n"
|
|
||||||
"- Output track number and title per line, e.g. '1. Title' or '1-1 Title'\n"
|
|
||||||
"- If multiple discs/CDs: include a header like 'CD 1' or 'Disc 1' before each group\n"
|
|
||||||
"- Include durations if visible (e.g. '1. Title 4:32')\n"
|
|
||||||
"- Do NOT include label info, barcodes, or other non-tracklist text\n"
|
|
||||||
"- If no tracklist is visible, reply with: NO_TRACKLIST"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def _ocr_back_cover(image_files: List[Path]) -> Optional[str]:
    """OCR a back-cover or booklet image through an Ollama vision model.

    Only one image is processed: the first whose name hints at a back cover
    ("back", "inlay", "booklet", "inside", "rear"), or, failing that, the
    first image whose name does not look like a front cover. The models in
    ``_OCR_MODELS`` are tried in order until one answers.

    Returns the recognised text, or None when there is no candidate image,
    the image cannot be read, a model answers ``NO_TRACKLIST``, or no model
    produced any text.
    """
    back_hints = ("back", "inlay", "booklet", "inside", "rear")
    front_hints = ("front", "folder", "cover")

    def _mentions(picture: Path, hints) -> bool:
        lowered = picture.name.lower()
        return any(kw in lowered for kw in hints)

    # Prefer images that look like a back cover ...
    candidates = [p for p in image_files if _mentions(p, back_hints)]
    if not candidates:
        # ... otherwise take every image except the front cover.
        candidates = [p for p in image_files if not _mentions(p, front_hints)]
    if not candidates:
        return None

    image_path = candidates[0]
    try:
        img_b64 = base64.b64encode(image_path.read_bytes()).decode()
    except Exception as e:
        print(f"  ⚠️  OCR-Bild lesen {image_path.name}: {e}", file=sys.stderr)
        return None

    for model in _OCR_MODELS:
        request_body = {
            "model": model,
            "messages": [{
                "role": "user",
                "content": _OCR_PROMPT,
                "images": [img_b64],
            }],
            "stream": False,
            "options": {"temperature": 0.0},
        }
        try:
            req = urllib.request.Request(
                f"{_OLLAMA_HOST}/api/chat",
                data=json.dumps(request_body).encode(),
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            with urllib.request.urlopen(req, timeout=180) as resp:
                data = json.loads(resp.read())
            text = data.get("message", {}).get("content", "").strip()
            if "NO_TRACKLIST" in text:
                print(f"  📷 OCR {image_path.name}: kein Tracklist-Text erkannt", file=sys.stderr)
                return None
            if text:
                print(f"  📷 OCR {image_path.name} via {model}: {len(text)} Zeichen extrahiert",
                      file=sys.stderr)
                return text
            # Empty answer: fall through to the next model.
        except Exception as e:
            print(f"  ⚠️  OCR-Fehler ({model}) {image_path.name}: {e}", file=sys.stderr)
            continue
    return None
|
|
||||||
|
|
||||||
|
|
||||||
def _check_cover_images(paths: List[Path]) -> List[Path]:
    """Order image paths so that likely front covers come first.

    A file counts as a front-cover candidate when its lower-cased name
    contains "front", "folder", "cover" or "album". Candidates are placed
    ahead of all other images, the most recently seen candidate first; the
    remaining images keep their input order.
    """
    front_keywords = ("front", "folder", "cover", "album")
    preferred: List[Path] = []
    remaining: List[Path] = []
    for image in paths:
        lowered = image.name.lower()
        is_front = any(keyword in lowered for keyword in front_keywords)
        (preferred if is_front else remaining).append(image)
    # Candidates are emitted in reverse input order (last seen first).
    preferred.reverse()
    return preferred + remaining
|
|
||||||
|
|
||||||
|
|
||||||
# YouTube video ID: exactly 11 characters from [A-Za-z0-9_-] that are not
# directly adjacent to further characters of that set, embedded in the file name.
_YT_ID_RE = re.compile(r"(?<![A-Za-z0-9_-])([A-Za-z0-9_-]{11})(?![A-Za-z0-9_-])")


def _extract_youtube_id(path: Path) -> Optional[str]:
    """Return the first plausible YouTube video ID in the file name, or None.

    The stem and the suffix of ``path`` are both searched. A candidate is
    only accepted when it contains at least one upper-case ASCII letter and
    at least one digit or lower-case ASCII letter.
    """
    filename = path.stem + path.suffix
    plausible_ids = (
        match.group(1)
        for match in _YT_ID_RE.finditer(filename)
        # Simple plausibility check: the ID must mix character classes.
        if re.search(r"[A-Z]", match.group(1)) and re.search(r"[0-9a-z]", match.group(1))
    )
    return next(plausible_ids, None)
|
|
||||||
|
|
||||||
|
|
||||||
def _fetch_youtube_metadata(video_id: str) -> Optional[Dict]:
    """
    Fetch metadata for a YouTube video by running yt-dlp (no API key needed).

    Returns the JSON document printed by ``yt-dlp --dump-json`` as a parsed
    Python object, or None when yt-dlp is not found on PATH, exits with a
    non-zero status, prints nothing, or any error occurs (for example the
    30-second timeout or invalid JSON). Errors are reported on stderr.
    """
    executable = shutil.which("yt-dlp")
    if not executable:
        return None
    command = [
        executable,
        "--dump-json",
        "--no-download",
        "--no-playlist",
        f"https://www.youtube.com/watch?v={video_id}",
    ]
    try:
        completed = subprocess.run(command, capture_output=True, text=True, timeout=30)
        if completed.returncode == 0 and completed.stdout.strip():
            return json.loads(completed.stdout)
    except Exception as e:
        print(f" ⚠️ YouTube-Fehler ({video_id}): {e}", file=sys.stderr)
    return None
|
|
||||||
|
|
||||||
|
|
||||||
def _chapters_to_tracklist_text(chapters: List[Dict]) -> str:
    """
    Convert yt-dlp chapter dicts into tracklist text, one line per chapter.

    Each emitted line has the form ``"<n>. <title> <M>:<SS>"`` where ``n`` is
    the 1-based position of the chapter in ``chapters`` and the time is the
    chapter's ``start_time`` truncated to whole seconds.

    Chapters without a title, or whose title starts with ``"<Untitled"``, are
    skipped; they still consume a position, so the numbering may have gaps.
    A ``title`` or ``start_time`` that is missing or ``None`` is treated as
    empty / 0 instead of raising.

    Returns:
        The lines joined with newlines; an empty string when nothing is usable.
    """
    lines = []
    for index, chapter in enumerate(chapters, 1):
        # `or ""` also covers a key that is present with value None.
        title = (chapter.get("title") or "").strip()
        if not title or title.startswith("<Untitled"):
            continue
        start_seconds = int(chapter.get("start_time") or 0)
        minutes, seconds = divmod(start_seconds, 60)
        lines.append(f"{index}. {title} {minutes}:{seconds:02d}")
    return "\n".join(lines)
|
|
||||||
|
|
||||||
|
|
||||||
def _coerce_int(value) -> Optional[int]:
    """Return ``int(value)``, or None when value is None or not convertible."""
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def extract_hints(scan: AlbumScan, use_ocr: bool = True) -> AlbumHints:
    """
    Collect every locally available hint about one album directory.

    Sources, in the order they are consulted: the directory name, cover
    images, tracklist text files, OCR of a back cover (only when no tracklist
    text was found and ``use_ocr`` is true), yt-dlp metadata for audio files
    whose name embeds a YouTube video ID (at most five IDs are looked up),
    playlist files, and finally the tags and file name of every audio file.

    Args:
        scan: Scan result of the album directory. ``album_dir``,
            ``image_files``, ``tracklist_files``, ``audio_files`` and
            ``playlist_files`` are read.
        use_ocr: Allow the OCR fallback for the tracklist.

    Returns:
        An ``AlbumHints`` whose ``dir_artist``, ``dir_album``, ``dir_year``,
        ``cover_images``, ``tracklist_text``, ``yt_title``, ``yt_uploader``
        and ``tracks`` have been filled as far as information was available.
    """
    hints = AlbumHints(album_dir=scan.album_dir)

    # Directory name
    hints.dir_artist, hints.dir_album, hints.dir_year = _parse_dirname(scan.album_dir.name)

    # Cover images
    hints.cover_images = _check_cover_images(scan.image_files)

    # Tracklist files
    texts: List[str] = []
    for tracklist_file in scan.tracklist_files:
        txt = _read_tracklist_file(tracklist_file)
        if txt:
            texts.append(txt)
    hints.tracklist_text = "\n\n".join(texts) if texts else None

    # OCR fallback: scan the back cover when no tracklist text file exists.
    if use_ocr and not hints.tracklist_text and scan.image_files:
        ocr_text = _ocr_back_cover(scan.image_files)
        if ocr_text:
            hints.tracklist_text = ocr_text

    # YouTube lookup: extract IDs from file names, fetch metadata via yt-dlp.
    yt_meta_by_id: Dict[str, Optional[Dict]] = {}
    yt_ids_by_stem: Dict[str, str] = {}  # stem (normalised) -> youtube_id

    for audio_path in scan.audio_files:
        yt_id = _extract_youtube_id(audio_path)
        if yt_id:
            yt_ids_by_stem[_clean(audio_path.stem).casefold()] = yt_id
            yt_meta_by_id.setdefault(yt_id, None)

    if yt_meta_by_id:
        # Only the first five distinct IDs are looked up; the rest stay None.
        lookup_ids = list(yt_meta_by_id)[:5]
        print(f" 📺 YouTube-IDs gefunden: {', '.join(lookup_ids)}", file=sys.stderr)
        for yt_id in lookup_ids:
            yt_meta_by_id[yt_id] = _fetch_youtube_metadata(yt_id)

        # Use chapters as the tracklist when none has been found so far.
        if not hints.tracklist_text:
            for meta in yt_meta_by_id.values():
                if meta and meta.get("chapters"):
                    chapter_text = _chapters_to_tracklist_text(meta["chapters"])
                    if chapter_text:
                        hints.tracklist_text = chapter_text
                        print(f" 📺 YouTube-Chapters als Tracklist: {len(meta['chapters'])} Tracks",
                              file=sys.stderr)
                        break

        # Album-level hints (first successful lookup wins).
        for meta in yt_meta_by_id.values():
            if meta:
                hints.yt_title = (meta.get("title") or "").strip() or None
                hints.yt_uploader = (
                    meta.get("uploader") or meta.get("channel") or ""
                ).strip() or None
                break

    parsed_tracklist = _parse_tracklist(hints.tracklist_text) if hints.tracklist_text else []

    # M3U/playlist order: file stem (normalised) -> track number / title.
    m3u_order: Dict[str, int] = {}
    m3u_titles: Dict[str, str] = {}
    for pf in scan.playlist_files:
        try:
            text = pf.read_text(encoding="utf-8", errors="replace")
            for entry in _parse_m3u(text):
                stem = _clean(Path(entry["filename"]).stem).casefold()
                pos = int(entry["position"])
                # The first playlist entry seen for a stem wins.
                if stem and stem not in m3u_order:
                    m3u_order[stem] = pos
                    if entry.get("title"):
                        m3u_titles[stem] = entry["title"]
        except Exception as e:
            print(f" ⚠️ Playlist-Lesefehler {pf.name}: {e}", file=sys.stderr)

    # Tracklist lookups: exact title, fuzzy title, catalogue number (BWV, Op., K., ...).
    tl_by_title: Dict[str, Dict[str, str]] = {}
    tl_by_title_norm: Dict[str, Dict[str, str]] = {}
    tl_by_catalog: Dict[str, Dict[str, str]] = {}
    for entry in parsed_tracklist:
        raw_title = entry.get("title", "")
        exact_key = _clean(raw_title).casefold()
        if exact_key:
            tl_by_title[exact_key] = entry
        norm_key = _norm_for_match(raw_title)
        if norm_key:
            tl_by_title_norm[norm_key] = entry
        cat_key = _catalog_key(raw_title)
        if cat_key:
            tl_by_catalog[cat_key] = entry

    # Build TrackHints per audio file
    for audio_path in sorted(scan.audio_files):
        tags, duration = _read_tags(audio_path)
        fn_hints = _parse_filename(audio_path.stem)
        # Shared key for the M3U and YouTube fallbacks below.
        stem_key = _clean(audio_path.stem).casefold()

        track_num: Optional[int] = None
        disc_num: Optional[int] = None

        # Track number: tag > filename ("3/12" style values use the first part).
        raw_tn = tags.get("tracknumber") or fn_hints.get("track")
        if raw_tn:
            tn_int = _coerce_int(str(raw_tn).split("/")[0])
            if tn_int is not None and tn_int > 0:  # 0 counts as "no number"
                track_num = tn_int

        # Disc number: tag > filename > path segment
        raw_dn = tags.get("discnumber") or fn_hints.get("disc")
        if raw_dn:
            disc_num = _coerce_int(str(raw_dn).split("/")[0])
        if not disc_num:
            for part in audio_path.relative_to(scan.album_dir).parts[:-1]:
                dm = _DISC_SECTION_RE.search(part)
                if dm:
                    disc_num = int(dm.group(1))
                    break

        title = tags.get("title") or fn_hints.get("title")
        artist = tags.get("artist") or fn_hints.get("artist")

        # Tracklist matching: number -> exact title -> fuzzy title -> catalogue number.
        # When a match is found, disc+track are taken from the tracklist (it is
        # more authoritative than M3U order for albums with explicit disc numbering).
        if parsed_tracklist:
            matched_tl: Optional[Dict[str, str]] = None

            # 1. Exact match on track number + disc (only when both are known
            #    from tag/filename). Non-numeric tracklist values simply do
            #    not match instead of aborting the whole extraction.
            if track_num and disc_num:
                for tl_entry in parsed_tracklist:
                    if (_coerce_int(tl_entry.get("track")) == track_num
                            and _coerce_int(tl_entry.get("disc", "1")) == disc_num):
                        matched_tl = tl_entry
                        break

            # 2. Exact title comparison
            if matched_tl is None and title:
                matched_tl = tl_by_title.get(_clean(title).casefold())

            # 3. Fuzzy title comparison (ignores commas, apostrophes, case)
            if matched_tl is None and title:
                matched_tl = tl_by_title_norm.get(_norm_for_match(title))

            # 4. Catalogue number (BWV, Op., K. ...) - helps with abbreviated file names
            if matched_tl is None and title:
                cat = _catalog_key(title)
                if cat:
                    matched_tl = tl_by_catalog.get(cat)

            if matched_tl:
                # Take the title from the tracklist when it is usable.
                if _is_good(matched_tl.get("title")):
                    title = matched_tl["title"]
                # disc+track from the tracklist override tag/filename values,
                # but only when both convert cleanly and the track is non-zero.
                tl_track_n = _coerce_int(matched_tl.get("track"))
                tl_disc_n = _coerce_int(matched_tl.get("disc", "1"))
                if tl_track_n and tl_disc_n is not None:
                    track_num = tl_track_n
                    disc_num = tl_disc_n

        # M3U order only as a last fallback (when nothing else gave a number).
        if track_num is None and stem_key in m3u_order:
            track_num = m3u_order[stem_key]

        # M3U title as fallback (contains "Composer - Title"; only used when
        # no better title is known).
        if not _is_good(title) and stem_key in m3u_titles:
            title = m3u_titles[stem_key]

        # YouTube title as the very last fallback (for a single file this is
        # the title of the whole video).
        if not _is_good(title):
            yt_id = yt_ids_by_stem.get(stem_key)
            if yt_id:
                meta = yt_meta_by_id.get(yt_id)
                if meta:
                    yt_video_title = (meta.get("title") or "").strip()
                    if yt_video_title:
                        title = yt_video_title

        hints.tracks.append(TrackHints(
            path=audio_path,
            track_number=track_num,
            disc_number=disc_num,
            title=_clean(title) if title else None,
            artist=_clean(artist) if artist else None,
            duration=duration,
            existing_tags=tags,
        ))

    return hints
|
|
||||||
|
|
@ -1,577 +0,0 @@
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import os
|
|
||||||
import re
|
|
||||||
import sys
|
|
||||||
import time
|
|
||||||
from typing import Optional, List, Dict, Tuple
|
|
||||||
|
|
||||||
from models import AlbumHints, AlbumProposal, TrackProposal
|
|
||||||
|
|
||||||
try:
|
|
||||||
import musicbrainzngs as mb
|
|
||||||
mb.set_useragent("MusicMetadataEnricher", "1.0", "https://github.com/dschlueter")
|
|
||||||
HAS_MB = True
|
|
||||||
except ImportError:
|
|
||||||
HAS_MB = False
|
|
||||||
|
|
||||||
try:
|
|
||||||
import acoustid
|
|
||||||
HAS_ACOUSTID = True
|
|
||||||
except ImportError:
|
|
||||||
HAS_ACOUSTID = False
|
|
||||||
|
|
||||||
try:
|
|
||||||
import discogs_client as dc
|
|
||||||
HAS_DISCOGS = True
|
|
||||||
except ImportError:
|
|
||||||
HAS_DISCOGS = False
|
|
||||||
|
|
||||||
try:
|
|
||||||
import anthropic
|
|
||||||
HAS_ANTHROPIC = True
|
|
||||||
except ImportError:
|
|
||||||
HAS_ANTHROPIC = False
|
|
||||||
|
|
||||||
_MB_RATE_LIMIT = 1.1  # seconds between MusicBrainz requests
_last_mb_call = 0.0  # time.monotonic() value recorded by the latest _mb_wait()
ACOUSTID_API_KEY = os.getenv("ACOUSTID_API_KEY", "")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
DISCOGS_TOKEN = os.getenv("DISCOGS_TOKEN", "")
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")

# qwen3:8b (5.2 GB) is sufficient for simple JSON metadata completion and
# loads quickly (~10 s).
OLLAMA_RESOLVE_MODEL = os.getenv("OLLAMA_RESOLVE_MODEL", "qwen3:8b")


def _mb_wait():
    """Sleep until ``_MB_RATE_LIMIT`` seconds have passed since the previous call.

    Updates the module-level ``_last_mb_call`` timestamp before returning.
    """
    global _last_mb_call
    remaining = _MB_RATE_LIMIT - (time.monotonic() - _last_mb_call)
    if remaining > 0:
        time.sleep(remaining)
    _last_mb_call = time.monotonic()
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# AcoustID fingerprinting
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
def _fingerprint_tracks(hints: AlbumHints) -> Dict[str, List[str]]:
    """Fingerprint each track and look it up on AcoustID.

    Returns ``{audio_path_str: [recording_id, ...]}``. Only results with a
    score of at least 0.90 contribute IDs. A track whose lookup succeeded
    without any such result maps to an empty list; a track whose
    fingerprinting or lookup raised is reported on stderr and omitted.
    Returns an empty dict when the acoustid module or the API key is missing.
    """
    if not (HAS_ACOUSTID and ACOUSTID_API_KEY):
        return {}
    matches_by_path: Dict[str, List[str]] = {}
    for track in hints.tracks:
        path_key = str(track.path)
        try:
            duration, fingerprint = acoustid.fingerprint_file(path_key)
            response = acoustid.lookup(ACOUSTID_API_KEY, fingerprint, duration,
                                       meta="recordings releasegroups")
            matches_by_path[path_key] = [
                recording["id"]
                for result in response.get("results", [])
                if result.get("score", 0) >= 0.90
                for recording in result.get("recordings", [])
            ]
        except Exception as e:
            print(f" ⚠️ AcoustID-Fehler {track.path.name}: {e}", file=sys.stderr)
    return matches_by_path
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# MusicBrainz lookup
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
def _mb_search_release(artist: Optional[str], album: Optional[str],
                       year: Optional[str]) -> Optional[Dict]:
    """Search MusicBrainz for a release by album title, artist and year.

    Args:
        artist: Artist name, or None/empty to leave it out of the query.
        album: Release title, or None/empty to leave it out of the query.
        year: Release date filter, or None/empty to leave it out.

    Returns:
        The highest-scoring of up to three search results when its
        ``ext:score`` is at least 70, otherwise None. None is also returned
        when neither artist nor album is given, when musicbrainzngs is not
        available, or when the request fails (the error goes to stderr).
    """
    if not artist and not album:
        return None
    if not HAS_MB:
        return None

    def _phrase(value: str) -> str:
        # Inside a quoted phrase a raw backslash or double quote would end or
        # corrupt the phrase, so both are backslash-escaped.
        return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'

    query_parts = []
    if album:
        query_parts.append(f"release:{_phrase(album)}")
    if artist:
        query_parts.append(f"artist:{_phrase(artist)}")
    if year:
        query_parts.append(f'date:{year}')
    query = " AND ".join(query_parts)
    try:
        _mb_wait()
        result = mb.search_releases(query=query, limit=3)
        releases = result.get("release-list", [])
        if not releases:
            return None
        # Take the highest-score release
        best = max(releases, key=lambda r: int(r.get("ext:score", 0)))
        if int(best.get("ext:score", 0)) < 70:
            return None
        return best
    except Exception as e:
        print(f" ⚠️ MusicBrainz-Suchfehler: {e}", file=sys.stderr)
        return None
|
|
||||||
|
|
||||||
|
|
||||||
def _mb_get_release_tracks(release_id: str) -> Optional[Dict]:
    """Fetch the full MusicBrainz release record for ``release_id``.

    The lookup includes recordings, artist credits, labels and release
    groups. Despite the function name, the return value is the whole release
    dict (the value under the ``"release"`` key of the response), not a list
    of tracks.

    Returns:
        The release dict, or None when musicbrainzngs is not available, the
        response has no ``"release"`` key, or the request fails (the error is
        printed to stderr).
    """
    if not HAS_MB:
        return None
    try:
        _mb_wait()
        result = mb.get_release_by_id(
            release_id,
            includes=["recordings", "artist-credits", "labels", "release-groups"],
        )
        return result.get("release")
    except Exception as e:
        print(f" ⚠️ MusicBrainz-Release-Fehler: {e}", file=sys.stderr)
        return None
|
|
||||||
|
|
||||||
|
|
||||||
def _mb_recording_to_release(recording_mbid: str) -> Optional[Dict]:
    """Return the first release listed for a MusicBrainz recording.

    Returns None when musicbrainzngs is not available, the recording lists
    no releases, or the request fails (the error is printed to stderr).
    """
    if not HAS_MB:
        return None
    try:
        _mb_wait()
        response = mb.get_recording_by_id(
            recording_mbid,
            includes=["releases", "artist-credits", "release-groups"],
        )
        release_list = response.get("recording", {}).get("release-list", [])
        return release_list[0] if release_list else None
    except Exception as e:
        print(f" ⚠️ MusicBrainz-Recording-Fehler: {e}", file=sys.stderr)
        return None
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Discogs fallback
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
def _discogs_search(artist: Optional[str], album: Optional[str]) -> Optional[Dict]:
    """Look up a release on Discogs and return its core metadata.

    Args:
        artist: Artist name, or None.
        album: Release title, or None. When missing, the artist name is used
            as the free-text query instead.

    Returns:
        A dict with the keys ``album``, ``artist``, ``year``, ``genre``,
        ``label`` and ``id`` taken from the first search result, or None when
        neither artist nor album is given, the Discogs client or token is
        missing, nothing was found, or the request fails (the error is
        printed to stderr).
    """
    # Without any search term the first hit would be an arbitrary release
    # that callers would then treat as a match.
    if not artist and not album:
        return None
    if not HAS_DISCOGS or not DISCOGS_TOKEN:
        return None
    try:
        client = dc.Client("MusicMetadataEnricher/1.0", user_token=DISCOGS_TOKEN)
        results = client.search(
            album or artist or "",
            artist=artist or "",
            type="release",
        )
        if results.count:
            r = results[0]
            return {
                "album": r.title,
                "artist": r.artists[0].name if r.artists else None,
                "year": str(r.year) if r.year else None,
                "genre": r.genres[0] if r.genres else None,
                "label": r.labels[0].name if r.labels else None,
                "id": r.id,
            }
    except Exception as e:
        print(f" ⚠️ Discogs-Fehler: {e}", file=sys.stderr)
    return None
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# LLM reasoning (optional): local Ollama first, OpenRouter as fallback
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
def _build_resolve_prompt(hints: AlbumHints, partial: Dict) -> str:
    """Build the (German) LLM prompt that asks for corrected album metadata.

    Args:
        hints: Album hints. ``tracks`` (first 20 only), ``tracklist_text``,
            ``album_dir``, ``dir_artist``, ``dir_album``, ``dir_year``,
            ``yt_title`` and ``yt_uploader`` are read.
        partial: Metadata resolved so far; ``"album"`` and ``"year"`` are used
            as fallbacks when the directory name gave no value. Missing keys
            and ``None`` values are both rendered as "?" / "unbekannt".

    Returns:
        The prompt text, ending with the JSON skeleton the model must fill.
    """
    def _track_line(t) -> str:
        disc_prefix = f"D{t.disc_number}-" if t.disc_number else ""
        artist_suffix = f" [{t.artist}]" if t.artist else ""
        return f" - {disc_prefix}T{t.track_number or '?'}: {t.title or t.path.stem}{artist_suffix}"

    tracks_summary = "\n".join(_track_line(t) for t in hints.tracks[:20])

    # Header of the tracklist (lines before the first track line, roughly the
    # first 400 characters / 15 lines) - usually carries album/label info.
    # A track line starts with "01 ", "1- ", "1-1 " (disc-track) or "1." / "1)".
    track_line_re = re.compile(
        r"^(?:\d[\d\-]\s+\S|\d{1,2}-\d{1,3}\s+\S|\d{1,3}[.)]\s+)"
    )
    header_lines = []
    if hints.tracklist_text:
        for line in hints.tracklist_text.splitlines():
            line = line.strip()
            if not line:
                continue
            if track_line_re.match(line):
                break
            header_lines.append(line)
            if sum(len(kept) for kept in header_lines) > 400:
                break
    tracklist_header = "\n".join(header_lines[:15])

    # `or` instead of a .get() default: the keys usually exist with value None.
    album_hint = hints.dir_album or partial.get("album") or "?"
    year_hint = hints.dir_year or partial.get("year") or "unbekannt"

    parts = [
        "Du bist ein Musikexperte. Analysiere diese Album-Daten und gib korrekte Metadaten zurück.\n",
        "Korrigiere auch erkennbare Tippfehler (Verzeichnisnamen enthalten oft Schreibfehler).\n\n",
        "WICHTIGE FELDDEFINITIONEN:\n",
        '- "artist" = Komponist (Klassik) ODER Band/Sänger (Pop/Rock/Jazz)\n',
        '- "albumartist" = Interpret/Performer/Dirigent (Klassik) ODER gleich wie artist (Pop)\n',
        " Beispiel Klassik: artist='Johann Sebastian Bach', albumartist='Peter Hurford'\n",
        " Beispiel Pop: artist='ABBA', albumartist='ABBA'\n\n",
        f"Verzeichnisname: {hints.album_dir.name}\n",
        "Hinweis Künstler/Titel (aus Verzeichnis, kann vertauscht oder falsch sein): ",
        f"{hints.dir_artist or '?'} / {album_hint}\n",
        f"Jahr: {year_hint}\n",
    ]
    if hints.yt_title:
        parts.append(f"YouTube-Videotitel: {hints.yt_title}\n")
    if hints.yt_uploader:
        parts.append(f"YouTube-Uploader/Kanal: {hints.yt_uploader}\n")
    if tracklist_header:
        parts.append(f"Tracklist-Kopf (Label/Jahr/Albumtitel):\n{tracklist_header}\n\n")
    parts.append(f"Tracks:\n{tracks_summary}\n\n")
    parts.append('Antworte NUR mit einem JSON-Objekt (null wenn unbekannt):\n')
    parts.append('{"artist": ..., "album": ..., "albumartist": ..., "year": ..., "genre": ..., "label": ...}')
    return "".join(parts)
|
|
||||||
|
|
||||||
|
|
||||||
def _parse_json_response(text: str) -> Optional[Dict]:
    """Extract the first JSON object embedded in an LLM reply.

    The reply may wrap the object in arbitrary prose. Every ``{`` is tried
    in order as the start of a JSON value; the first one that decodes to a
    dict is returned, so stray braces before or after the object do not
    prevent parsing.

    Returns:
        The decoded dict, or None when ``text`` is empty or contains no
        decodable JSON object.
    """
    import json

    if not text:
        return None
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            parsed, _end = decoder.raw_decode(text, start)
        except ValueError:  # json.JSONDecodeError is a ValueError
            parsed = None
        if isinstance(parsed, dict):
            return parsed
        start = text.find("{", start + 1)
    return None
|
|
||||||
|
|
||||||
|
|
||||||
def _resolve_via_ollama(hints: AlbumHints, partial: Dict) -> Optional[Dict]:
    """Resolve album metadata with a local Ollama model (no API key needed).

    Sends the resolve prompt to ``{OLLAMA_HOST}/api/chat`` using
    ``OLLAMA_RESOLVE_MODEL`` with JSON output requested, waiting up to 240
    seconds. Returns the parsed JSON object from the reply, or None when the
    request fails (the error is printed to stderr) or the reply holds no
    parsable JSON object.
    """
    import json
    import urllib.request

    request_body = {
        "model": OLLAMA_RESOLVE_MODEL,
        "messages": [{"role": "user", "content": _build_resolve_prompt(hints, partial)}],
        "stream": False,
        "format": "json",
        "options": {"temperature": 0.1},
    }
    encoded_body = json.dumps(request_body).encode()
    try:
        request = urllib.request.Request(
            f"{OLLAMA_HOST}/api/chat",
            data=encoded_body,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=240) as resp:
            reply = json.loads(resp.read())
        content = reply.get("message", {}).get("content", "").strip()
        return _parse_json_response(content)
    except Exception as e:
        print(f" ⚠️ Ollama-Resolve-Fehler: {e}", file=sys.stderr)
        return None
|
|
||||||
|
|
||||||
|
|
||||||
def _resolve_via_openrouter(hints: AlbumHints, partial: Dict) -> Optional[Dict]:
    """Resolve album metadata via OpenRouter (DeepSeek V3, chosen for low cost).

    Returns the parsed JSON object from the model's reply, or None when
    ``OPENROUTER_API_KEY`` is not set, the request fails within its 30-second
    timeout (the error is printed to stderr), or the reply holds no parsable
    JSON object.
    """
    if not OPENROUTER_API_KEY:
        return None
    import json
    import urllib.request

    request_body = {
        # DeepSeek V3: extremely cheap, very capable
        "model": "deepseek/deepseek-chat-v3-0324",
        "messages": [{"role": "user", "content": _build_resolve_prompt(hints, partial)}],
        "temperature": 0.1,
        "max_tokens": 300,
    }
    encoded_body = json.dumps(request_body).encode()
    request_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "HTTP-Referer": "https://pi.local",
        "X-Title": "MusicMetadataEnricher",
    }
    try:
        request = urllib.request.Request(
            "https://openrouter.ai/api/v1/chat/completions",
            data=encoded_body,
            headers=request_headers,
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=30) as resp:
            reply = json.loads(resp.read())
        content = reply["choices"][0]["message"]["content"].strip()
        return _parse_json_response(content)
    except Exception as e:
        print(f" ⚠️ OpenRouter-Resolve-Fehler: {e}", file=sys.stderr)
        return None
|
|
||||||
|
|
||||||
|
|
||||||
def _claude_resolve(hints: AlbumHints, partial: Dict) -> Optional[Dict]:
    """
    Resolve album metadata with an LLM: Ollama first, then OpenRouter.

    Order: Ollama (local, free) -> OpenRouter (cheap, only when
    ``OPENROUTER_API_KEY`` is set). Despite the function name, the Claude API
    is deliberately not used (too expensive).

    Returns the first non-empty result, or None when no backend produced one.
    """
    # 1. Local Ollama (preferred - free of charge)
    local_result = _resolve_via_ollama(hints, partial)
    if local_result:
        return local_result

    # 2. OpenRouter (DeepSeek V3, cheap) when a key is configured
    if not OPENROUTER_API_KEY:
        return None
    return _resolve_via_openrouter(hints, partial) or None
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Main resolver
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
def _mb_track_number(track: Dict) -> int:
    """Return the numeric track number of a MusicBrainz track dict, or 0."""
    # "number" can be free text such as "A1" on vinyl releases.
    # NOTE(review): falls back to a "position" key on the track dict -
    # confirm against the musicbrainzngs result structure.
    for key in ("number", "position"):
        raw = track.get(key)
        if not raw:
            continue
        try:
            return int(raw)
        except (TypeError, ValueError):
            continue
    return 0


def resolve(
    hints: AlbumHints,
    use_fingerprint: bool = True,
    use_api: bool = True,
    use_claude: bool = True,
) -> AlbumProposal:
    """Resolve album-level metadata from local hints and online sources.

    Sources are consulted in this order, each one only filling values that
    are still unknown: directory name, existing tags (majority vote for
    artist/album), YouTube hints, AcoustID fingerprints, MusicBrainz text
    search, the full MusicBrainz release, Discogs, and finally an LLM. The
    LLM may overwrite existing values only while confidence is below 0.3.

    Args:
        hints: Hints collected for the album directory.
        use_fingerprint: Allow AcoustID fingerprinting (also needs ``use_api``).
        use_api: Allow any network lookup (MusicBrainz, Discogs, LLM).
        use_claude: Allow the LLM step (also needs ``use_api``).

    Returns:
        An ``AlbumProposal`` with the resolved fields, per-track proposals,
        the accumulated confidence (capped at 1.0), the list of sources that
        contributed, and notes. ``cover_path``/``cover_source`` are left None.
    """
    from collections import Counter

    confidence = 0.0
    sources: List[str] = []
    notes: List[str] = []

    artist = hints.dir_artist
    album = hints.dir_album
    year = hints.dir_year
    genre: Optional[str] = None
    label: Optional[str] = None
    release_mbid: Optional[str] = None
    mb_tracks: Optional[List] = None

    # Collect artist/album from existing tags (majority vote)
    tag_artists = [t.existing_tags.get("artist") for t in hints.tracks if t.existing_tags.get("artist")]
    tag_albums = [t.existing_tags.get("album") for t in hints.tracks if t.existing_tags.get("album")]
    if tag_artists:
        artist = artist or Counter(tag_artists).most_common(1)[0][0]
    if tag_albums:
        album = album or Counter(tag_albums).most_common(1)[0][0]

    # Tag year/genre/label (first track that provides a value wins)
    for t in hints.tracks:
        raw_year = t.existing_tags.get("date") or t.existing_tags.get("year")
        if raw_year and not year:
            # Strip invisible chars so ID3TimeStamp validation doesn't fail later
            year = re.sub(r"[^\d\-T:+Z]", "", str(raw_year)).strip()[:10] or None
        genre = genre or t.existing_tags.get("genre")
        label = label or t.existing_tags.get("label") or t.existing_tags.get("organization")

    # YouTube metadata as additional hints (uploader -> artist, title -> album)
    if hints.yt_uploader and not artist:
        artist = hints.yt_uploader
    if hints.yt_title and not album:
        album = hints.yt_title

    if artist or album:
        confidence += 0.05
        sources.append("local-hints")
    if hints.yt_title or hints.yt_uploader:
        sources.append("youtube")

    # AcoustID fingerprinting
    fp_mbids: Dict[str, List[str]] = {}
    if use_fingerprint and use_api and HAS_ACOUSTID and ACOUSTID_API_KEY:
        fp_mbids = _fingerprint_tracks(hints)
        # Tracks that were looked up without any match map to an empty list;
        # only an actual match counts as evidence.
        if any(fp_mbids.values()):
            confidence += 0.20
            sources.append("acoustid")
            # Try to get a release from the first matched recording of a track
            for mbids in fp_mbids.values():
                for mbid in mbids[:1]:
                    rel = _mb_recording_to_release(mbid)
                    if rel and rel.get("id"):
                        release_mbid = rel.get("id")
                        confidence += 0.25
                        sources.append("musicbrainz-fingerprint")
                if release_mbid:
                    break

    # MusicBrainz text search
    if use_api and HAS_MB and not release_mbid:
        mb_result = _mb_search_release(artist, album, year)
        if mb_result:
            release_mbid = mb_result.get("id")
            score = int(mb_result.get("ext:score", 0))
            confidence += 0.30 * (score / 100)
            sources.append("musicbrainz-text")
            notes.append(f"MusicBrainz score: {score}")

    # Fetch full release data
    if use_api and release_mbid:
        full_release = _mb_get_release_tracks(release_mbid)
        if full_release:
            if not artist:
                # Credits may arrive as dicts, with join phrases such as
                # " & " as plain strings in between; keep both.
                credit_parts: List[str] = []
                for credit in full_release.get("artist-credit", []):
                    if isinstance(credit, dict):
                        credit_parts.append(
                            credit.get("artist", {}).get("name", "") + credit.get("joinphrase", "")
                        )
                    elif isinstance(credit, str):
                        credit_parts.append(credit)
                artist = "".join(credit_parts).strip() or artist
            if not album:
                album = full_release.get("title", album)
            if not year:
                year = full_release.get("date", "")[:4] or None
            label_info = full_release.get("label-info-list", [])
            if label_info and not label:
                label = label_info[0].get("label", {}).get("name")
            rg = full_release.get("release-group", {})
            if not genre:
                # NOTE(review): "primary-type" looks like a release type
                # (album/single/...) rather than a musical genre - confirm.
                genre = (rg.get("primary-type") or "").strip() or None
            mb_tracks = []
            for medium in full_release.get("medium-list", []):
                disc_num = medium.get("position", 1)
                for track in medium.get("track-list", []):
                    mb_tracks.append({
                        "disc": disc_num,
                        "number": _mb_track_number(track),
                        "title": track.get("recording", {}).get("title", ""),
                        "artist": track.get("artist-credit-phrase", ""),
                        "mbid": track.get("recording", {}).get("id"),
                    })

    # Discogs fallback
    if use_api and HAS_DISCOGS and DISCOGS_TOKEN and not release_mbid:
        dg = _discogs_search(artist, album)
        if dg:
            artist = artist or dg.get("artist")
            album = album or dg.get("album")
            year = year or dg.get("year")
            genre = genre or dg.get("genre")
            label = label or dg.get("label")
            confidence += 0.15
            sources.append("discogs")

    # LLM reasoning for remaining gaps.
    # Order: local Ollama -> OpenRouter (DeepSeek, cheap); see _claude_resolve.
    cl_albumartist: Optional[str] = None
    partial = {"artist": artist, "album": album, "year": year}
    if use_claude and use_api and (not artist or not album or confidence < 0.5):
        cl = _claude_resolve(hints, partial)
        if cl:
            # Models often return the year as a JSON number; keep it a string
            # like every other year source.
            cl_year = cl.get("year")
            cl_year = (str(cl_year).strip() or None) if cl_year else None
            if confidence < 0.3:
                # Very uncertain: the LLM may also correct existing values
                # (e.g. typos in an album title taken from the directory name).
                artist = cl.get("artist") or artist
                album = cl.get("album") or album
                year = cl_year or year
                genre = cl.get("genre") or genre
                label = cl.get("label") or label
            else:
                artist = artist or cl.get("artist")
                album = album or cl.get("album")
                year = year or cl_year
                genre = genre or cl.get("genre")
                label = label or cl.get("label")
            cl_albumartist = cl.get("albumartist") or None
            confidence += 0.10
            sources.append("llm-resolve")

    # Finalize albumartist
    # Priority: (1) LLM albumartist at low confidence
    #           (2) dir_artist when the directory name names an artist
    #           (3) heuristics (Various Artists, majority vote)
    # Rationale: "Bach_Organ_-_Peter_Hurford" -> dir_artist="Bach Organ" is not
    # an artist, although the directory name looks like one; the LLM can
    # resolve that correctly.
    track_artists = [t.artist for t in hints.tracks if t.artist]
    distinct_artists = set(track_artists)

    _bad_aa = {"various artists", "unknown artist", "unknown", "va"}

    def _good_aa(s: Optional[str]) -> bool:
        return isinstance(s, str) and bool(s) and s.casefold().strip() not in _bad_aa

    if _good_aa(cl_albumartist) and confidence < 0.4:
        # The LLM knows the real album artist better than the directory name
        albumartist = cl_albumartist  # type: ignore[assignment]
    elif hints.dir_artist:
        albumartist = hints.dir_artist
    elif len(distinct_artists) >= 3:
        albumartist = "Various Artists"
    elif track_artists:
        albumartist = artist or Counter(track_artists).most_common(1)[0][0]
    else:
        albumartist = artist or "Unknown Artist"

    album = album or hints.album_dir.name.replace("_", " ")
    artist = artist or albumartist
    confidence = min(confidence, 1.0)

    # Build track proposals
    # `artist` = composer/main artist (LLM-resolved), `albumartist` = performer.
    # Both are passed on so that _build_track_proposals can assign them.
    track_proposals = _build_track_proposals(hints, mb_tracks, album, albumartist, composer=artist)

    return AlbumProposal(
        album_dir=hints.album_dir,
        album=album,
        albumartist=albumartist,
        date=year,
        genre=genre,
        label=label,
        mbid=release_mbid,
        cover_path=None,
        cover_source=None,
        tracks=track_proposals,
        confidence=confidence,
        sources=sources,
        notes=notes,
    )
|
|
||||||
|
|
||||||
|
|
||||||
def _build_track_proposals(
    hints: AlbumHints,
    mb_tracks: Optional[List],
    album: str,
    album_artist: str,
    composer: Optional[str] = None,
) -> List[TrackProposal]:
    """Build one TrackProposal per track hint, ordered by disc, track number and path.

    Args:
        hints: Album hints; only ``hints.tracks`` is read here.
        mb_tracks: Optional list of MusicBrainz track dicts. The keys read are
            "number", "disc", "title" and "artist".
        album: Resolved album title (accepted for interface compatibility,
            not used inside this function).
        album_artist: Resolved album artist; fallback artist for tracks
            without their own artist hint.
        composer: Optional composer / main artist. When a track's artist hint
            equals ``album_artist`` (case-insensitively), the composer is used
            as the track artist instead.

    Returns:
        The proposals in sorted order. Tracks without a number receive a
        sequential number per disc that continues after the highest number
        already used on that disc.
    """
    proposals: List[TrackProposal] = []

    # Loop-invariant: normalised album artist for the performer comparison.
    aa_cf = album_artist.casefold().strip()

    ordered = sorted(
        hints.tracks,
        key=lambda t: (t.disc_number or 1, t.track_number or 9999, str(t.path)),
    )
    for th in ordered:
        title = th.title
        track_num = th.track_number
        disc_num = th.disc_number

        # Classical case: the performer comes from the filename, the composer
        # from the resolver. If the track artist is the album artist
        # (performer) and a composer is known, use the composer as track artist.
        th_artist_cf = (th.artist or "").casefold().strip()
        if composer and th_artist_cf and th_artist_cf == aa_cf:
            artist = composer
        else:
            artist = th.artist or album_artist

        # Try to match against the MusicBrainz track list (first match wins).
        # .get() keeps an incomplete MB entry from aborting the whole album.
        if mb_tracks and track_num:
            for mb_t in mb_tracks:
                if mb_t.get("number") == track_num and mb_t.get("disc") == (disc_num or 1):
                    if mb_t.get("title"):
                        title = mb_t["title"]
                    if mb_t.get("artist"):
                        artist = mb_t["artist"]
                    break

        title = title or th.path.stem

        proposals.append(TrackProposal(
            path=th.path,
            title=title,
            artist=artist,
            track_number=track_num,
            disc_number=disc_num,
            mbid=None,
        ))

    # Last-resort sequential numbering: tracks without a number get a running
    # number per disc, which avoids "00" / "??" in filenames on --rename.
    # Numbering continues after the highest number already present on the
    # disc so that a generated number never duplicates an existing one.
    if any(p.track_number is None for p in proposals):
        highest: Dict[int, int] = {}
        for p in proposals:
            if p.track_number is not None:
                disc = p.disc_number or 1
                highest[disc] = max(highest.get(disc, 0), p.track_number)
        for p in proposals:
            if p.track_number is None:
                disc = p.disc_number or 1
                highest[disc] = highest.get(disc, 0) + 1
                p.track_number = highest[disc]

    return proposals
|
|
||||||
84
models.py
84
models.py
|
|
@ -1,84 +0,0 @@
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from dataclasses import dataclass, field
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import Optional, List, Dict
|
|
||||||
|
|
||||||
|
|
||||||
# Lower-case file suffixes (including the leading dot), grouped by file category.
AUDIO_EXTENSIONS = {
    ".mp3",
    ".flac",
    ".m4a",
    ".aac",
    ".ogg",
    ".opus",
    ".wav",
    ".wma",
    ".aiff",
    ".ape",
}
IMAGE_EXTENSIONS = {
    ".jpg",
    ".jpeg",
    ".png",
    ".webp",
    ".bmp",
    ".gif",
}
TRACKLIST_EXTENSIONS = {
    ".txt",
    ".htm",
    ".html",
    ".nfo",
}
PLAYLIST_EXTENSIONS = {
    ".m3u",
    ".m3u8",
    ".pls",
}
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
class ScannedFile:
    """A single file together with the category it was classified as."""

    # Location of the file.
    path: Path
    # One of: "audio" | "image" | "tracklist" | "playlist" | "other"
    kind: str
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
class AlbumScan:
    """Files found in one album directory, grouped by category."""

    # Directory that was scanned.
    album_dir: Path
    # One list per file category; each instance gets its own empty lists.
    audio_files: List[Path] = field(default_factory=list)
    image_files: List[Path] = field(default_factory=list)
    tracklist_files: List[Path] = field(default_factory=list)
    # Playlists: .m3u / .m3u8 / .pls
    playlist_files: List[Path] = field(default_factory=list)
    other_files: List[Path] = field(default_factory=list)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
class TrackHints:
    """Hints collected for a single audio file; every field except ``path`` is optional."""

    path: Path
    track_number: Optional[int] = None
    disc_number: Optional[int] = None
    title: Optional[str] = None
    artist: Optional[str] = None
    duration: Optional[float] = None
    # Tags already present in the file; a fresh dict per instance.
    existing_tags: Dict[str, str] = field(default_factory=dict)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
class AlbumHints:
    """Album-level hints plus the per-track hints of one album directory."""

    album_dir: Path
    # Values derived from the directory name.
    dir_artist: Optional[str] = None
    dir_album: Optional[str] = None
    dir_year: Optional[str] = None
    # Merged text from all tracklist files.
    tracklist_text: Optional[str] = None
    cover_images: List[Path] = field(default_factory=list)
    tracks: List[TrackHints] = field(default_factory=list)
    # YouTube video title (if found).
    yt_title: Optional[str] = None
    # YouTube channel/uploader name.
    yt_uploader: Optional[str] = None
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
class TrackProposal:
    """Proposed metadata for one audio file."""

    path: Path
    title: str
    artist: str
    track_number: Optional[int]
    disc_number: Optional[int]
    # Only set when --rename is active.
    new_filename: Optional[str] = None
    mbid: Optional[str] = None
    # Classical music: conductor.
    conductor: Optional[str] = None
    # Classical music: orchestra / ensemble.
    orchestra: Optional[str] = None
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
class AlbumProposal:
    """Proposed album-level metadata together with its track proposals."""

    album_dir: Path
    album: str
    albumartist: str
    date: Optional[str]
    genre: Optional[str]
    label: Optional[str]
    # MusicBrainz release ID.
    mbid: Optional[str]
    # Resolved local or downloaded cover.
    cover_path: Optional[Path]
    # One of: "local" | "musicbrainz" | "discogs"
    cover_source: Optional[str]
    tracks: List[TrackProposal]
    confidence: float
    sources: List[str] = field(default_factory=list)
    notes: List[str] = field(default_factory=list)
|
|
||||||
|
|
@ -1,269 +0,0 @@
|
||||||
#!/usr/bin/env python3
|
|
||||||
"""
|
|
||||||
music_enricher.py
|
|
||||||
KI-gestützter Musik-Metadaten-Enricher für Jellyfin-Bibliotheken.
|
|
||||||
|
|
||||||
Pipeline pro Album:
|
|
||||||
Scan → HintExtractor → MetadataResolver → CoverHandler → Review → Executor
|
|
||||||
"""
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import argparse
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import Any, Dict, List, Optional
|
|
||||||
|
|
||||||
try:
|
|
||||||
from tqdm import tqdm
|
|
||||||
HAS_TQDM = True
|
|
||||||
except ImportError:
|
|
||||||
HAS_TQDM = False
|
|
||||||
|
|
||||||
from models import AlbumProposal
|
|
||||||
from scanner import scan_album, collect_album_dirs
|
|
||||||
from hint_extractor import extract_hints
|
|
||||||
from metadata_resolver import resolve
|
|
||||||
from cover_handler import resolve_cover
|
|
||||||
from executor import execute_album, write_report
|
|
||||||
|
|
||||||
|
|
||||||
def maybe_tqdm(iterable, show: bool, **kwargs):
    """Wrap *iterable* in a tqdm progress bar when requested and available.

    Args:
        iterable: Any iterable to pass through.
        show: Whether a progress bar is wanted.
        **kwargs: Forwarded unchanged to ``tqdm``.

    Returns:
        ``tqdm(iterable, **kwargs)`` if *show* is true and tqdm could be
        imported, otherwise *iterable* itself.
    """
    # Guard on HAS_TQDM so that show=True cannot raise NameError when the
    # optional tqdm import failed.
    if show and HAS_TQDM:
        return tqdm(iterable, **kwargs)
    return iterable
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Review / Display
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
def _print_proposal(proposal: AlbumProposal) -> None:
    """Print a human-readable summary of *proposal* to stdout.

    Shows the album fields, a ten-segment confidence bar, any notes and the
    first eight tracks; further tracks are summarised in one line.
    """
    # Clamp so an out-of-range confidence cannot produce a malformed bar.
    filled = max(0, min(10, int(proposal.confidence * 10)))
    conf_bar = "█" * filled + "░" * (10 - filled)
    print(f"\n{'─' * 60}")
    print(f"💿 {proposal.album_dir.name}")
    print(f" Album: {proposal.album}")
    print(f" Artist: {proposal.albumartist}")
    print(f" Jahr: {proposal.date or '–'}")
    print(f" Genre: {proposal.genre or '–'}")
    print(f" Label: {proposal.label or '–'}")
    print(f" Cover: {proposal.cover_source or '–'} ({proposal.cover_path.name if proposal.cover_path else 'keins'})")
    print(f" Konfidenz: [{conf_bar}] {proposal.confidence:.0%} Quellen: {', '.join(proposal.sources) or '–'}")
    if proposal.notes:
        for n in proposal.notes:
            print(f" ℹ️ {n}")
    print(f" Tracks ({len(proposal.tracks)}):")
    for tp in proposal.tracks[:8]:
        if tp.disc_number and tp.disc_number > 1:
            # track_number is Optional; formatting None with ":02d" would raise.
            num = f"{tp.track_number:02d}" if tp.track_number is not None else "??"
            tn = f"{tp.disc_number}-{num}"
        else:
            tn = f"{tp.track_number:02d}" if tp.track_number else "??"
        print(f" {tn} {tp.artist} – {tp.title}")
    if len(proposal.tracks) > 8:
        print(f" … und {len(proposal.tracks) - 8} weitere")
|
|
||||||
|
|
||||||
|
|
||||||
def _interactive_review(proposal: AlbumProposal) -> bool:
    """Print the proposal and prompt until a valid choice is entered.

    Returns True for accept (empty input, "j" or "y") and False for skip
    ("s"). Entering "q" terminates the process with exit status 0.
    """
    _print_proposal(proposal)
    prompt = "\n [Enter] Akzeptieren [s] Überspringen [q] Abbrechen: "
    while True:
        choice = input(prompt).strip().lower()
        if choice == "q":
            sys.exit(0)
        if choice == "s":
            return False
        if choice in ("", "j", "y"):
            return True
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Main pipeline
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
def process_album(
    album_dir: Path,
    args: argparse.Namespace,
    report_data: List[Dict[str, Any]],
) -> Dict[str, int]:
    """Run the pipeline for a single album directory.

    Steps: scan, extract hints, resolve metadata, resolve cover art, optional
    filename proposals, then review (dry-run / interactive / auto) and
    execution.

    Args:
        album_dir: Album directory to process.
        args: Parsed command-line options.
        report_data: Shared list that receives one report row per track; in
            dry-run mode it is appended to here, otherwise it is handed to
            ``execute_album``.

    Returns:
        Counters with the keys "tags_written", "covers_embedded",
        "files_renamed", "errors" and "skipped". Any exception raised while
        processing is printed to stderr and counted under "errors".
    """
    stats = {"tags_written": 0, "covers_embedded": 0, "files_renamed": 0,
             "errors": 0, "skipped": 0}

    try:
        scan = scan_album(album_dir)
        if not scan.audio_files:
            stats["skipped"] += 1
            return stats

        hints = extract_hints(scan, use_ocr=not args.no_api)

        proposal = resolve(
            hints,
            use_fingerprint=not args.no_fingerprint,
            use_api=not args.no_api,
            use_claude=not args.no_api,
        )

        # Cover art. With --no-cover the result was never used, so the lookup
        # itself is skipped as well instead of running and being discarded.
        if not args.no_cover:
            cover_path, cover_source = resolve_cover(
                hints.cover_images,
                proposal.mbid,
                album_dir,
            )
            if cover_path:
                proposal.cover_path = cover_path
                proposal.cover_source = cover_source

        # Set proposed filenames if --rename
        if args.rename:
            from executor import _proposed_filename
            for tp in proposal.tracks:
                tp.new_filename = _proposed_filename(tp, tp.path.suffix)

        # Review step: a dry run only prints and records what would happen.
        if args.dry_run:
            _print_proposal(proposal)
            for tp in proposal.tracks:
                report_data.append({
                    "status": "dry-run",
                    "album_dir": str(album_dir.name),
                    "track_path": str(tp.path),
                    "old_title": tp.path.stem,
                    "new_title": tp.title,
                    "old_artist": "",
                    "new_artist": tp.artist,
                    "album": proposal.album,
                    "albumartist": proposal.albumartist,
                    "date": proposal.date or "",
                    "genre": proposal.genre or "",
                    "label": proposal.label or "",
                    "track_number": tp.track_number or "",
                    "disc_number": tp.disc_number or "",
                    "cover_embedded": False,
                    "renamed_to": tp.new_filename or "",
                    "confidence": f"{proposal.confidence:.2f}",
                    "sources": ", ".join(proposal.sources),
                })
            return stats

        if not args.auto:
            # Interactive mode: the user decides.
            if not _interactive_review(proposal):
                stats["skipped"] += 1
                return stats
        elif proposal.confidence < args.confidence:
            # Auto mode: proposals below the threshold are not applied.
            print(f" ⏭️ Konfidenz {proposal.confidence:.0%} < {args.confidence:.0%} → übersprungen: {album_dir.name}")
            stats["skipped"] += 1
            return stats
        else:
            _print_proposal(proposal)

        album_stats = execute_album(
            proposal=proposal,
            backup_dir=args.backup,
            do_rename=args.rename,
            embed_cover_art=args.embed_cover,
            dry_run=False,
            report_data=report_data,
        )
        for k, v in album_stats.items():
            stats[k] = stats.get(k, 0) + v

    except Exception as e:
        # Per-album boundary: report the failure and let the caller continue
        # with the next album.
        stats["errors"] += 1
        print(f" ❌ Fehler in {album_dir.name}: {e}", file=sys.stderr)
        import traceback
        traceback.print_exc(file=sys.stderr)

    return stats
|
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
    """Command-line entry point: parse options, collect album directories, process them.

    Exits with status 1 when no album directory is found. Writes the CSV
    report when --report is given and rows were collected, then prints a
    summary of all counters.
    """
    parser = argparse.ArgumentParser(
        description="KI-gestützter Musik-Metadaten-Enricher für Jellyfin",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument("paths", nargs="*",
                        help="Root-Verzeichnisse (direkte Unterordner = Alben)")
    parser.add_argument("--album", type=Path,
                        help="Einzelnes Album-Verzeichnis verarbeiten")
    parser.add_argument("--dry-run", action="store_true",
                        help="Vorschläge anzeigen, nichts schreiben")
    parser.add_argument("--auto", action="store_true",
                        help="Kein interaktiver Review-Schritt")
    parser.add_argument("--confidence", type=float, default=0.85,
                        help="Min-Konfidenz für --auto (default: 0.85)")
    parser.add_argument("--rename", action="store_true",
                        help="Dateien nach Schema umbenennen: TT - Artist - Titel.ext")
    parser.add_argument("--embed-cover", action="store_true",
                        help="Cover-Art in Audiodatei einbetten")
    parser.add_argument("--backup", type=Path,
                        help="Backup-Verzeichnis vor Änderungen")
    parser.add_argument("--report", type=Path,
                        help="CSV-Report der Änderungen")
    parser.add_argument("--no-fingerprint", action="store_true",
                        help="AcoustID-Fingerprinting überspringen")
    parser.add_argument("--no-api", action="store_true",
                        help="Keine externen API-Calls")
    parser.add_argument("--no-cover", action="store_true",
                        help="Kein Cover-Art-Download")
    parser.add_argument("--no-tqdm", action="store_true",
                        help="Fortschrittsanzeige deaktivieren")

    args = parser.parse_args()

    if not args.album and not args.paths:
        parser.error("Mindestens ein Pfad oder --album erforderlich.")

    # The progress bar is only used in --auto mode.
    show_progress = HAS_TQDM and not args.no_tqdm and args.auto
    report_data: List[Dict[str, Any]] = []
    totals: Dict[str, int] = {
        "albums": 0, "skipped": 0, "tags_written": 0,
        "covers_embedded": 0, "files_renamed": 0, "errors": 0,
    }

    # Collect album directories
    album_dirs: List[Path] = []
    if args.album:
        album_dirs.append(args.album.expanduser().resolve())
    for raw in args.paths:
        root = Path(raw).expanduser().resolve()
        if not root.is_dir():
            print(f"⚠️ Kein Verzeichnis: {root}")
            continue
        album_dirs.extend(collect_album_dirs(root))

    if not album_dirs:
        print("⚠️ Keine Album-Verzeichnisse gefunden.")
        sys.exit(1)

    print(f"🎵 {len(album_dirs)} Album-Verzeichnisse gefunden.")
    if not args.no_api:
        # process_album passes use_claude=not args.no_api, so the LLM banner
        # is only accurate when API calls are enabled.
        print("🤖 LLM-Resolve: Ollama → OpenRouter (kein Claude)")
        print("🔍 MusicBrainz-Lookup aktiv.")
    if args.dry_run:
        print("🧪 DRY-RUN — nichts wird geschrieben.")

    for album_dir in maybe_tqdm(album_dirs, show_progress,
                                desc="Alben", unit="album", dynamic_ncols=True):
        stats = process_album(album_dir, args, report_data)
        totals["albums"] += 1
        for k in ("skipped", "tags_written", "covers_embedded", "files_renamed", "errors"):
            totals[k] += stats.get(k, 0)

    if args.report and report_data:
        write_report(report_data, args.report)

    print(f"\n{'=' * 50}")
    print("✅ Zusammenfassung:")
    print(f" 💿 Alben verarbeitet: {totals['albums']}")
    print(f" ⏭️ Übersprungen: {totals['skipped']}")
    print(f" 🏷️ Tags geschrieben: {totals['tags_written']}")
    print(f" 🖼️ Cover eingebettet: {totals['covers_embedded']}")
    print(f" 📝 Dateien umbenannt: {totals['files_renamed']}")
    print(f" ❌ Fehler: {totals['errors']}")
    if args.dry_run:
        print(" 🧪 Modus: DRY-RUN")
    print("=" * 50)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
99
scanner.py
99
scanner.py
|
|
@ -1,99 +0,0 @@
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import re
|
|
||||||
import sys
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import List
|
|
||||||
|
|
||||||
from models import AlbumScan, AUDIO_EXTENSIONS, IMAGE_EXTENSIONS, TRACKLIST_EXTENSIONS, PLAYLIST_EXTENSIONS
|
|
||||||
|
|
||||||
_DISC_DIR_RE = re.compile(r"(?i)^(?:cd|disc|disk|side)[_ \-]*\d{1,2}$")
|
|
||||||
|
|
||||||
|
|
||||||
def _is_hidden(name: str) -> bool:
    """Return True when *name* begins with "." or "_"."""
    return name[:1] in (".", "_")
|
|
||||||
|
|
||||||
|
|
||||||
def _is_disc_dir(name: str) -> bool:
    """Return True for disc sub-folder names such as 'CD1', 'Disc 2' or 'Disk_1'.

    The name must match ``_DISC_DIR_RE`` from its first character: one of the
    prefixes cd/disc/disk/side (case-insensitive), optional separators, then a
    one- or two-digit number. Lettered names such as 'Side A' do not match.
    """
    return _DISC_DIR_RE.match(name) is not None
|
|
||||||
|
|
||||||
|
|
||||||
def scan_album(album_dir: Path) -> AlbumScan:
    """
    Scan one album directory and return its files grouped by category.

    Recursion rule:
    - If the album directory itself contains audio files, sub-folders are not
      entered (single disc; folders such as artwork, scans or accidental
      copies are ignored).
    - If the root has NO audio files, only disc sub-folders (CD1, Disc 2 ...)
      are entered.
    """
    scan = AlbumScan(album_dir=album_dir)

    # Look at the root level first to decide whether to recurse at all.
    visible_names = (n for n in _listdir(album_dir) if not _is_hidden(n))
    root_has_audio = any(
        (album_dir / n).suffix.lower() in AUDIO_EXTENSIONS for n in visible_names
    )

    # Audio at the root -> root level only; otherwise multi-disc layout.
    _scan_dir(album_dir, album_dir, scan, recurse=not root_has_audio)

    for bucket in (scan.audio_files, scan.image_files,
                   scan.tracklist_files, scan.playlist_files):
        bucket.sort()
    return scan
|
|
||||||
|
|
||||||
|
|
||||||
def _listdir(path: Path) -> List[str]:
    """Return the entry names of *path*; on an OS error print a warning to stderr and return []."""
    try:
        names = [entry.name for entry in path.iterdir()]
    except (PermissionError, OSError) as err:
        print(f"⚠️ Scan-Fehler: {err}", file=sys.stderr)
        return []
    return names
|
|
||||||
|
|
||||||
|
|
||||||
def _scan_dir(current: Path, album_dir: Path, result: AlbumScan, recurse: bool) -> None:
    """Classify the entries of *current* into *result*.

    Hidden entries are skipped. Sub-directories are only entered when
    *recurse* is true and their name is a disc folder name; files are sorted
    into the list matching their lower-cased suffix. A directory that cannot
    be listed is reported on stderr and otherwise ignored.
    """
    try:
        children = sorted(current.iterdir())
    except (PermissionError, OSError) as err:
        print(f"⚠️ Scan-Fehler {current}: {err}", file=sys.stderr)
        return

    # Checked in this order; the first matching suffix set wins.
    buckets = (
        (AUDIO_EXTENSIONS, result.audio_files),
        (IMAGE_EXTENSIONS, result.image_files),
        (TRACKLIST_EXTENSIONS, result.tracklist_files),
        (PLAYLIST_EXTENSIONS, result.playlist_files),
    )

    for child in children:
        if _is_hidden(child.name):
            continue
        if child.is_dir():
            # Other sub-folders (artwork, accidental copies ...) are skipped.
            if recurse and _is_disc_dir(child.name):
                _scan_dir(child, album_dir, result, recurse=True)
            continue
        if not child.is_file():
            continue
        suffix = child.suffix.lower()
        for extensions, bucket in buckets:
            if suffix in extensions:
                bucket.append(child)
                break
        else:
            result.other_files.append(child)
|
|
||||||
|
|
||||||
|
|
||||||
def collect_album_dirs(root: Path) -> List[Path]:
    """Return the non-hidden direct sub-directories of *root* in sorted order.

    An OS error while reading is reported on stderr; whatever was collected up
    to that point is returned.
    """
    found: List[Path] = []
    try:
        for candidate in sorted(root.iterdir()):
            if not candidate.is_dir():
                continue
            if _is_hidden(candidate.name):
                continue
            found.append(candidate)
    except (PermissionError, OSError) as err:
        print(f"⚠️ Lesefehler {root}: {err}", file=sys.stderr)
    return found
|
|
||||||
|
|
@ -1,274 +0,0 @@
|
||||||
#!/usr/bin/env python3
|
|
||||||
"""test_suite_enricher.py — Unit- und Integrationstests für music_enricher."""
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import sys
|
|
||||||
import tempfile
|
|
||||||
import traceback
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import Callable
|
|
||||||
|
|
||||||
sys.path.insert(0, str(Path(__file__).parent))
|
|
||||||
|
|
||||||
from models import AlbumScan, TrackHints, AlbumHints
|
|
||||||
|
|
||||||
RESULTS: list[dict] = []
|
|
||||||
|
|
||||||
|
|
||||||
def record(test_id: str, passed: bool, detail: str = "") -> None:
    """Append one result row (id, PASS/FAIL status, detail text) to RESULTS."""
    status = "PASS" if passed else "FAIL"
    RESULTS.append({"id": test_id, "status": status, "detail": detail})
|
|
||||||
|
|
||||||
|
|
||||||
def run_case(test_id: str, fn: Callable[[], str]) -> None:
    """Run one test callable and record the outcome.

    The callable's return value is stored as detail on success; on any
    exception the first 300 characters of the traceback are stored instead.
    """
    try:
        outcome = fn()
        record(test_id, True, outcome)
    except Exception:
        trace = traceback.format_exc()
        record(test_id, False, trace[:300])
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# hint_extractor Tests
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
def test_parse_dirname_artist_album() -> str:
    """An 'Artist_-_Album' directory name yields both an artist and an album."""
    from hint_extractor import _parse_dirname

    artist, album, _year = _parse_dirname("Pink_Floyd_-_The_Wall")
    assert artist and "Pink" in artist, f"artist: {artist}"
    assert album and "Wall" in album, f"album: {album}"
    summary = f"artist={artist!r}, album={album!r}"
    return summary
|
|
||||||
|
|
||||||
|
|
||||||
def test_parse_dirname_with_year() -> str:
    """A trailing four-digit year in the directory name is returned as the year."""
    from hint_extractor import _parse_dirname

    _artist, _album, year = _parse_dirname("Abba_-_Greatest_Hits_1992")
    assert year == "1992", f"year: {year}"
    summary = f"year={year}"
    return summary
|
|
||||||
|
|
||||||
|
|
||||||
def test_parse_dirname_album_only() -> str:
    """A directory name without an artist separator still yields an album."""
    from hint_extractor import _parse_dirname

    _artist, album, _year = _parse_dirname("Beethoven_Complete_Edition")
    assert album is not None, "album should not be None"
    summary = f"album={album!r}"
    return summary
|
|
||||||
|
|
||||||
|
|
||||||
def test_parse_filename_track_artist_title() -> str:
    """'NN - Artist - Title' is split into track, artist and title."""
    from hint_extractor import _parse_filename

    parsed = _parse_filename("07 - ABBA - Dancing Queen")
    assert parsed.get("track") == "07", f"track: {parsed}"
    assert "ABBA" in parsed.get("artist", ""), f"artist: {parsed}"
    assert "Dancing" in parsed.get("title", ""), f"title: {parsed}"
    return str(parsed)
|
|
||||||
|
|
||||||
|
|
||||||
def test_parse_filename_disc_track_title() -> str:
    """A 'D-NN' prefix is split into disc and track."""
    from hint_extractor import _parse_filename

    parsed = _parse_filename("2-07 - Bach - Toccata")
    assert parsed.get("disc") == "2", f"disc: {parsed}"
    assert parsed.get("track") == "07", f"track: {parsed}"
    return str(parsed)
|
|
||||||
|
|
||||||
|
|
||||||
def test_parse_filename_track_title() -> str:
    """'NN - Title' is split into track and title."""
    from hint_extractor import _parse_filename

    parsed = _parse_filename("01 - Dancing Queen")
    assert parsed.get("track") == "01", f"track: {parsed}"
    assert "Dancing" in parsed.get("title", ""), f"title: {parsed}"
    return str(parsed)
|
|
||||||
|
|
||||||
|
|
||||||
def test_parse_filename_artist_title() -> str:
    """'Artist - Title' without a number is split into artist and title."""
    from hint_extractor import _parse_filename

    parsed = _parse_filename("Miles Davis - So What")
    assert "Miles" in parsed.get("artist", ""), f"artist: {parsed}"
    assert "What" in parsed.get("title", ""), f"title: {parsed}"
    return str(parsed)
|
|
||||||
|
|
||||||
|
|
||||||
def test_parse_tracklist_numbered() -> str:
    """A plain numbered list produces one entry per line with the right title."""
    from hint_extractor import _parse_tracklist

    listing = "1. Dancing Queen\n2. Waterloo\n3. Fernando"
    tracks = _parse_tracklist(listing)
    assert len(tracks) == 3, f"count: {len(tracks)}"
    first = tracks[0]
    assert first["title"] == "Dancing Queen", f"title: {first}"
    return f"{len(tracks)} tracks parsed"
|
|
||||||
|
|
||||||
|
|
||||||
def test_parse_tracklist_with_duration() -> str:
    """'D-N Title M:SS' lines are parsed and carry the disc number."""
    from hint_extractor import _parse_tracklist

    listing = "1-1 Toccata And Fugue 9:17\n1-2 Heartbeat 2:19\n2-1 Finale 5:00"
    tracks = _parse_tracklist(listing)
    assert len(tracks) >= 2, f"count: {len(tracks)}"
    first = tracks[0]
    assert first["disc"] == "1", f"disc: {first}"
    return f"{len(tracks)} tracks parsed"
|
|
||||||
|
|
||||||
|
|
||||||
def test_parse_tracklist_with_disc_sections() -> str:
    """'CD n' heading lines assign the following tracks to that disc."""
    from hint_extractor import _parse_tracklist

    listing = "CD 1\n1. Track A\n2. Track B\nCD 2\n1. Track C"
    tracks = _parse_tracklist(listing)
    disc2 = [entry for entry in tracks if entry.get("disc") == "2"]
    assert len(disc2) >= 1, f"disc2: {disc2}"
    return f"{len(tracks)} total, {len(disc2)} on disc 2"
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Scanner Tests
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
def test_scanner_classifies_files() -> str:
    """scan_album sorts audio, image and tracklist files into their own lists."""
    from scanner import scan_album

    binary_fixtures = {
        "01 - Song.mp3": b"\x00" * 100,
        "02 - Song.flac": b"\x00" * 100,
        "front.jpg": b"\xff\xd8" + b"\x00" * 100,
        "notes.pdf": b"\x00" * 50,
    }
    with tempfile.TemporaryDirectory() as tmpdir:
        album_root = Path(tmpdir) / "TestAlbum"
        album_root.mkdir()
        for filename, payload in binary_fixtures.items():
            (album_root / filename).write_bytes(payload)
        (album_root / "tracklist.txt").write_text("1. Track One\n2. Track Two")

        scan = scan_album(album_root)
        assert len(scan.audio_files) == 2, f"audio: {scan.audio_files}"
        assert len(scan.image_files) == 1, f"images: {scan.image_files}"
        assert len(scan.tracklist_files) == 1, f"tracklists: {scan.tracklist_files}"
        return "scan OK: 2 audio, 1 image, 1 tracklist"
|
|
||||||
|
|
||||||
|
|
||||||
def test_scanner_ignores_hidden() -> str:
    """Files whose names start with '.' or '_' are left out of the scan."""
    from scanner import scan_album

    with tempfile.TemporaryDirectory() as tmpdir:
        album_root = Path(tmpdir) / "Album"
        album_root.mkdir()
        for filename in ("song.mp3", ".hidden.mp3", "_trash.mp3"):
            (album_root / filename).write_bytes(b"\x00" * 100)

        scan = scan_album(album_root)
        assert len(scan.audio_files) == 1, f"should ignore hidden: {scan.audio_files}"
        return "hidden files correctly ignored"
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# extract_hints integration
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
def test_extract_hints_from_scan() -> str:
    """Scanning then extracting hints yields an album hint, both tracks and the tracklist text."""
    from scanner import scan_album
    from hint_extractor import extract_hints

    with tempfile.TemporaryDirectory() as tmpdir:
        album_root = Path(tmpdir) / "ABBA_-_Greatest_Hits"
        album_root.mkdir()
        for filename in ("01 - ABBA - Dancing Queen.mp3", "02 - ABBA - Waterloo.mp3"):
            (album_root / filename).write_bytes(b"\x00" * 1024)
        (album_root / "tracklist.txt").write_text("1. Dancing Queen\n2. Waterloo\n")

        hints = extract_hints(scan_album(album_root))
        assert hints.dir_album is not None, "album hint missing"
        assert len(hints.tracks) == 2, f"tracks: {len(hints.tracks)}"
        assert hints.tracklist_text is not None, "tracklist not read"
        return f"hints OK: album={hints.dir_album!r}, {len(hints.tracks)} tracks"
|
|
||||||
|
|
||||||
|
|
||||||
def test_extract_hints_multi_disc() -> str:
    """Tracks inside CD1/CD2 sub-folders receive disc numbers 1 and 2."""
    from scanner import scan_album
    from hint_extractor import extract_hints

    with tempfile.TemporaryDirectory() as tmpdir:
        album_root = Path(tmpdir) / "Bach_Complete"
        disc_one = album_root / "CD1"
        disc_two = album_root / "CD2"
        disc_one.mkdir(parents=True)
        disc_two.mkdir()
        (disc_one / "01 - Toccata.mp3").write_bytes(b"\x00" * 1024)
        (disc_two / "01 - Fugue.mp3").write_bytes(b"\x00" * 1024)

        hints = extract_hints(scan_album(album_root))
        disc_nums = {t.disc_number for t in hints.tracks if t.disc_number}
        assert 1 in disc_nums, f"disc 1 missing: {disc_nums}"
        assert 2 in disc_nums, f"disc 2 missing: {disc_nums}"
        return f"disc numbers detected: {disc_nums}"
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# executor Tests
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
def test_proposed_filename_single_disc() -> str:
    """Pop schema: album artist equals track artist -> 'TT_-_Artist_-_Title'."""
    from executor import _proposed_filename
    from models import TrackProposal
    from pathlib import Path

    proposal = TrackProposal(
        path=Path("dummy.mp3"),
        title="Dancing Queen",
        artist="ABBA",
        track_number=1,
        disc_number=None,
    )
    name = _proposed_filename(proposal, ".mp3", albumartist="ABBA")
    assert name == "01_-_ABBA_-_Dancing_Queen.mp3", f"got: {name!r}"
    return name
|
|
||||||
|
|
||||||
|
|
||||||
def test_proposed_filename_multi_disc() -> str:
    """Classical schema: album artist (performer) differs from track artist (composer)."""
    from executor import _proposed_filename
    from models import TrackProposal
    from pathlib import Path

    proposal = TrackProposal(
        path=Path("dummy.flac"),
        title="Toccata",
        artist="Bach",
        track_number=7,
        disc_number=2,
    )
    name = _proposed_filename(proposal, ".flac", albumartist="Gardiner")
    assert name == "2-07_-_Gardiner_-_Bach_-_Toccata.flac", f"got: {name!r}"
    return name
|
|
||||||
|
|
||||||
|
|
||||||
def test_proposed_filename_sanitizes_chars() -> str:
    """Check that a proposed filename contains neither ``/`` nor ``:``.

    The title and artist fed in contain several punctuation characters;
    only the absence of the slash and the colon is asserted here.

    Returns:
        The filename produced by ``_proposed_filename``.

    Raises:
        AssertionError: if the filename still contains ``/`` or ``:``.
    """
    from executor import _proposed_filename
    from models import TrackProposal
    from pathlib import Path

    proposal = TrackProposal(
        path=Path("x.mp3"),
        title='Track: "Live" / Today',
        artist="Artist?",
        track_number=3,
        disc_number=None,
    )

    name = _proposed_filename(proposal, ".mp3")

    assert not ("/" in name or ":" in name), f"unsafe chars in: {name!r}"
    return name
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Runner
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
def main() -> None:
    """Run all registered unit tests, print a report, and exit.

    Each ``(id, function)`` pair is handed to ``run_case``; afterwards the
    entries in ``RESULTS`` are printed one per line, followed by a pass
    count. The process exits with status 0 when every result is "PASS"
    and with status 1 otherwise.

    NOTE(review): ``run_case`` is presumably what fills ``RESULTS``; both
    are defined elsewhere in this file.
    """
    print("🧪 Starte Music Metadata Enricher Tests...")

    cases = (
        ("UNIT_01_parse_dirname_artist_album", test_parse_dirname_artist_album),
        ("UNIT_02_parse_dirname_with_year", test_parse_dirname_with_year),
        ("UNIT_03_parse_dirname_album_only", test_parse_dirname_album_only),
        ("UNIT_04_parse_filename_track_artist_title", test_parse_filename_track_artist_title),
        ("UNIT_05_parse_filename_disc_track_title", test_parse_filename_disc_track_title),
        ("UNIT_06_parse_filename_track_title", test_parse_filename_track_title),
        ("UNIT_07_parse_filename_artist_title", test_parse_filename_artist_title),
        ("UNIT_08_parse_tracklist_numbered", test_parse_tracklist_numbered),
        ("UNIT_09_parse_tracklist_with_duration", test_parse_tracklist_with_duration),
        ("UNIT_10_parse_tracklist_disc_sections", test_parse_tracklist_with_disc_sections),
        ("UNIT_11_scanner_classifies_files", test_scanner_classifies_files),
        ("UNIT_12_scanner_ignores_hidden", test_scanner_ignores_hidden),
        ("UNIT_13_extract_hints_from_scan", test_extract_hints_from_scan),
        ("UNIT_14_extract_hints_multi_disc", test_extract_hints_multi_disc),
        ("UNIT_15_proposed_filename_single_disc", test_proposed_filename_single_disc),
        ("UNIT_16_proposed_filename_multi_disc", test_proposed_filename_multi_disc),
        ("UNIT_17_proposed_filename_sanitizes_chars", test_proposed_filename_sanitizes_chars),
    )
    for case_id, case_fn in cases:
        run_case(case_id, case_fn)

    separator = "=" * 70
    print(separator)
    for result in RESULTS:
        if result["status"] == "PASS":
            icon = "✅"
        else:
            icon = "❌"

        # Keep the report readable: cap long details at 100 characters.
        detail = result["detail"]
        if len(detail) > 100:
            detail = detail[:100] + "..."

        print(f"{icon} [{result['status']}] {result['id']} {detail}")
    print(separator)

    total = len(RESULTS)
    passed = len([result for result in RESULTS if result["status"] == "PASS"])
    print(f"📊 {passed}/{total} Tests erfolgreich")

    exit_code = 0 if passed == total else 1
    sys.exit(exit_code)
|
|
||||||
|
|
||||||
|
|
||||||
# Run the test suite only when this file is executed as a script.
if __name__ == "__main__":
    main()
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue