# Music_Metadata_Enricher/executor.py
#
# Repository-viewer header captured with this file: 400 lines, 14 KiB, Python
# (Raw / Normal View / History).

from __future__ import annotations
import csv
import re
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Optional, List, Dict, Any
from models import AlbumProposal, TrackProposal
try:
from mutagen import File as MutagenFile
from mutagen.easyid3 import EasyID3
from mutagen.flac import FLAC
from mutagen.mp4 import MP4, MP4Tags
HAS_MUTAGEN = True
except ImportError:
HAS_MUTAGEN = False
from cover_handler import embed_cover
# Matches the characters  < > : " / \ | ? *  and the ASCII control characters
# 0x00-0x1f; _safe_name() replaces every match with "_".
_SAFE_RE = re.compile(r'[<>:"/\\|?*\x00-\x1f]')

# Case-insensitive pattern of classical-genre markers.
# NOTE(review): not referenced anywhere in the visible part of this module.
_CLASSICAL_GENRES = re.compile(
    r"(?i)class|baroque|romantic|renaissance|opera|symphony|chamber|concerto|sonata|oratorio"
)

# Column names (and column order) of the CSV report written by write_report().
REPORT_FIELDS = [
    # processing result and location
    "status",
    "album_dir",
    "track_path",
    # per-track values before and after
    "old_title",
    "new_title",
    "old_artist",
    "new_artist",
    # album-level values
    "album",
    "albumartist",
    "date",
    "genre",
    "label",
    # numbering
    "track_number",
    "disc_number",
    # side effects performed on the file
    "cover_embedded",
    "renamed_to",
    # provenance of the proposal
    "confidence",
    "sources",
]
def _safe_name(s: str) -> str:
    """Return *s* as a file-system-safe name component.

    Every character matched by ``_SAFE_RE`` becomes ``_``, each run of
    whitespace becomes a single ``_``, and leading/trailing ``.``, ``_`` and
    ``-`` characters are trimmed. The result can be an empty string.
    """
    without_illegal = _SAFE_RE.sub("_", s)
    underscored = re.sub(r"\s+", "_", without_illegal)
    return underscored.strip("._-")
# Genre fragments (English and German) that mark an album as classical.
# _is_classical() tests them as substrings of the case-folded genre, which is
# why stems such as "symphon" and "oper" are listed.
_CLASSICAL_GENRE_KEYWORDS = {
    # general
    "classical", "klassik", "contemporary classical",
    # eras
    "medieval", "renaissance",
    "baroque", "barock",
    "romantic", "romantik",
    # forms and ensembles
    "opera", "oper", "operetta", "operette",
    "chamber", "kammermusik",
    "symphon", "concerto", "oratorio",
    # sacred music
    "sacred", "kirchenmusik",
}

# Well-known composers used as a signal (the surname is enough).
_CLASSICAL_COMPOSER_KEYWORDS = {
    "bach", "handel", "vivaldi", "telemann", "buxtehude",
    "monteverdi", "palestrina", "purcell",
    "haydn", "mozart", "beethoven", "schubert",
    "brahms", "chopin", "liszt", "schumann",
    "wagner", "verdi", "puccini",
    "mahler", "bruckner", "dvorak",
    "tchaikovsky", "tschaikowski",
    "debussy", "ravel", "satie",
    "strauss", "sibelius", "grieg",
}
def _is_classical(albumartist: str, track_artist: str, genre: str) -> bool:
    """Return True when the classical file-name scheme should be used.

    The classical scheme (performer, composer, work) applies only when the
    album artist and the track artist are both known, differ from each other,
    and at least one of these signals is present:

    1. the genre contains one of ``_CLASSICAL_GENRE_KEYWORDS`` (substring
       match, so stems such as "symphon" also hit "Symphonic"), or
    2. the track artist contains one of ``_CLASSICAL_COMPOSER_KEYWORDS`` as a
       whole word.

    Composer names are matched per word, not per substring, so that
    "Burt Bacharach" ("bach") or "Traveling Wilburys" ("ravel") are not
    mistaken for composers.

    Args:
        albumartist: Album artist (the performer in the classical scheme).
        track_artist: Track artist (the composer in the classical scheme).
        genre: Album genre; may be empty.

    Returns:
        True if the classical scheme applies, otherwise False. Empty or
        placeholder artists ("Various Artists", "Unknown Artist", "Unknown")
        always yield False.
    """
    # Names may arrive already sanitised by _safe_name(), i.e. with "_" in
    # place of spaces. Normalise both spellings so that the placeholder
    # comparisons below work for "Various Artists" and "Various_Artists".
    aa, ta, g = (
        re.sub(r"[\s_]+", " ", (value or "").casefold()).strip()
        for value in (albumartist, track_artist, genre)
    )

    if not aa or aa in ("various artists", "unknown artist", "unknown"):
        return False
    if not ta or ta in ("unknown artist", "unknown"):
        return False
    if aa == ta:
        # Performer and "composer" are the same name: nothing to separate.
        return False

    # Primary signal: genre keyword.
    if any(kw in g for kw in _CLASSICAL_GENRE_KEYWORDS):
        return True

    # Secondary signal: a known composer surname appears as a whole word in
    # the track artist.
    words = set(re.split(r"\W+", ta))
    return any(kw in words for kw in _CLASSICAL_COMPOSER_KEYWORDS)
def _proposed_filename(
    proposal: TrackProposal,
    ext: str,
    albumartist: str = "",
    genre: str = "",
) -> str:
    """Build the target file name for a track.

    Schemes (parts are joined with ``_-_``; spaces inside a part become ``_``):

    * Pop/default: ``TT_-_Artist_-_Title.ext``
    * Classical:   ``TT_-_Performer_-_Composer_-_Title[_-_Orchestra_Conductor].ext``

    ``TT`` is the two-digit track number ("00" when unknown). For discs after
    the first it is prefixed with ``<disc>-``. A missing artist falls back to
    the album artist and then to "Unknown"; a missing title becomes
    "Unknown". The same fallback is used when a value consists only of
    characters that sanitising removes, so no part is ever empty.

    Args:
        proposal: Track whose ``track_number``, ``disc_number``, ``artist``,
            ``title``, ``orchestra`` and ``conductor`` attributes are read.
        ext: File extension including the leading dot.
        albumartist: Album artist; the performer in the classical scheme.
        genre: Album genre, used only to choose the scheme.

    Returns:
        The proposed file name (no directory part).
    """
    tn = f"{proposal.track_number:02d}" if proposal.track_number else "00"
    # Disc prefix only for real multi-disc sets (disc > 1); disc 1 or None
    # is treated as a single disc and gets no prefix.
    multi_disc = bool(proposal.disc_number and proposal.disc_number > 1)
    prefix = f"{proposal.disc_number}-{tn}" if multi_disc else tn

    raw_artist = proposal.artist or albumartist or "Unknown"
    track_artist = _safe_name(raw_artist) or "Unknown"
    aa = _safe_name(albumartist)
    title = _safe_name(proposal.title or "Unknown") or "Unknown"

    # Classify on the unsanitised names: after _safe_name() a placeholder such
    # as "Various Artists" reads "Various_Artists" and would no longer be
    # recognised. `aa` must be non-empty so the performer part is never blank.
    if aa and _is_classical(albumartist, raw_artist, genre):
        parts = [prefix, aa, track_artist, title]
        # Append orchestra and conductor when present.
        extra = "_".join(filter(None, [
            _safe_name(proposal.orchestra or ""),
            _safe_name(proposal.conductor or ""),
        ]))
        if extra:
            parts.append(extra)
        return "_-_".join(parts) + ext

    return f"{prefix}_-_{track_artist}_-_{title}{ext}"
def backup_file(path: Path, backup_dir: Path) -> bool:
    """Copy *path* into *backup_dir* unless a backup of it already exists.

    The copy is stored as ``<parent directory name>__<file name>``. A file of
    that name already present in *backup_dir* is left untouched, so the first
    backup is never overwritten by a later run.

    Args:
        path: File to back up.
        backup_dir: Destination directory; created (with parents) if missing.

    Returns:
        True if the backup exists after the call, False if any exception
        occurred (the message is printed to stderr).
    """
    try:
        backup_dir.mkdir(parents=True, exist_ok=True)
        target = backup_dir / f"{path.parent.name}__{path.name}"
        if target.exists():
            return True
        shutil.copy2(path, target)
    except Exception as exc:
        print(f" ⚠️ Backup-Fehler {path.name}: {exc}", file=sys.stderr)
        return False
    return True
def write_tags(path: Path, proposal: TrackProposal, album_proposal: AlbumProposal) -> bool:
    """Write the proposed metadata into the audio file at *path*.

    Fields written when available: title, artist, album, albumartist,
    tracknumber ("<n>/<number of tracks in the album proposal>"),
    discnumber (only for discs after the first), date, genre and
    organization (label). Empty values are skipped so that a missing proposal
    field never blanks a tag that is already stored in the file.

    Format handling, chosen by file extension:

    * ``.mp3``  - EasyID3, saved as ID3 v2.4; a missing ID3 header is added.
    * ``.flac`` - Vorbis comments via ``FLAC``.
    * ``.m4a``  - MP4 atoms; track and disc numbers are stored as
      ``(number, total)`` tuples. The label has no atom in the mapping used
      here and is not written.
    * anything else - mutagen's generic "easy" interface.

    Args:
        path: Audio file to modify in place.
        proposal: Track-level values (``title``, ``artist``,
            ``track_number``, ``disc_number``).
        album_proposal: Album-level values (``album``, ``albumartist``,
            ``tracks``, ``date``, ``genre``, ``label``).

    Returns:
        True if the file was saved. False if mutagen is unavailable, the file
        type is not recognised, or saving failed (details go to stderr).
    """
    if not HAS_MUTAGEN:
        return False

    ext = path.suffix.lower()

    candidates = {
        "title": proposal.title,
        "artist": proposal.artist,
        "album": album_proposal.album,
        "albumartist": album_proposal.albumartist,
    }
    # Skip empty values: writing "" would erase an existing tag.
    tags_to_write = {key: value for key, value in candidates.items() if value}

    if proposal.track_number:
        total = len(album_proposal.tracks)
        tags_to_write["tracknumber"] = f"{proposal.track_number}/{total}"
    if proposal.disc_number and proposal.disc_number > 1:
        tags_to_write["discnumber"] = str(proposal.disc_number)
    if album_proposal.date:
        # Strip everything except valid ID3 timestamp characters to prevent
        # ID3TimeStamp errors.
        date_clean = re.sub(r"[^\d\-T:+Z]", "", str(album_proposal.date)).strip()
        if date_clean:
            tags_to_write["date"] = date_clean
    if album_proposal.genre:
        tags_to_write["genre"] = album_proposal.genre
    if album_proposal.label:
        tags_to_write["organization"] = album_proposal.label

    def _warn_skipped(field: str, err: Exception) -> None:
        # One unsupported field must not prevent the remaining ones from
        # being written, but it should not vanish silently either.
        print(f" ⚠️ Tag-Feld '{field}' übersprungen ({path.name}): {err}", file=sys.stderr)

    try:
        if ext == ".mp3":
            from mutagen.id3 import ID3NoHeaderError
            try:
                audio = EasyID3(str(path))
            except ID3NoHeaderError:
                # File has no ID3 header - add one without touching the
                # audio data, then reopen it through EasyID3.
                from mutagen.mp3 import MP3
                try:
                    full = MP3(str(path))
                    full.tags = None
                    full.add_tags()
                    full.save(str(path), v2_version=4)
                except Exception as header_err:
                    print(f" ⚠️ Tag-Schreibfehler {path.name}: {header_err}", file=sys.stderr)
                # Raises again if no header could be created; the outer
                # handler then reports the failure.
                audio = EasyID3(str(path))
            for k, v in tags_to_write.items():
                try:
                    audio[k] = [v]
                except Exception as tag_err:
                    _warn_skipped(k, tag_err)
            audio.save(v2_version=4)
            return True

        if ext == ".flac":
            audio = FLAC(str(path))
            for k, v in tags_to_write.items():
                audio[k] = [v]
            audio.save()
            return True

        if ext == ".m4a":
            audio = MP4(str(path))
            mapping = {
                "title": "\xa9nam", "artist": "\xa9ART",
                "album": "\xa9alb", "albumartist": "aART",
                "tracknumber": "trkn", "discnumber": "disk",
                "date": "\xa9day", "genre": "\xa9gen",
            }
            for k, v in tags_to_write.items():
                tag_key = mapping.get(k)
                if tag_key is None:
                    # No atom mapped for this field (e.g. "organization").
                    continue
                if tag_key in ("trkn", "disk"):
                    # These atoms hold (number, total) integer pairs; a
                    # missing total is stored as 0.
                    number, _, count = v.partition("/")
                    try:
                        audio[tag_key] = [(int(number), int(count or 0))]
                    except ValueError as tag_err:
                        _warn_skipped(k, tag_err)
                else:
                    audio[tag_key] = [v]
            audio.save()
            return True

        audio = MutagenFile(str(path), easy=True)
        if audio is None:
            # mutagen could not identify the file type.
            print(f" ⚠️ Tag-Schreibfehler {path.name}: Dateiformat nicht erkannt", file=sys.stderr)
            return False
        if audio.tags is None:
            audio.add_tags()
        for k, v in tags_to_write.items():
            try:
                audio[k] = [v]
            except Exception as tag_err:
                _warn_skipped(k, tag_err)
        audio.save()
        return True
    except Exception as e:
        print(f" ⚠️ Tag-Schreibfehler {path.name}: {e}", file=sys.stderr)
        return False
def _update_m3u(m3u_path: Path, tracks: List[tuple]) -> bool:
    """Rewrite the playlist at *m3u_path* for the given (renamed) tracks.

    The file is replaced completely: an ``#EXTM3U`` header followed by one
    ``#EXTINF:<seconds>,<label>`` line and one file-name line per track, in
    the order of *tracks*, encoded as UTF-8. Only the bare file name is
    written. The label is "Artist - Title", or just the title when no artist
    is set; a missing title falls back to the file stem. The duration is -1
    when it cannot be read.

    Args:
        m3u_path: Playlist file to overwrite.
        tracks: ``[(TrackProposal, actual_path_after_rename), ...]``.

    Returns:
        True if the playlist was written, False on any error (reported on
        stderr).
    """
    try:
        lines = ["#EXTM3U"]
        for tp, track_path in tracks:
            duration = -1
            if HAS_MUTAGEN:
                try:
                    audio = MutagenFile(str(track_path))
                    # Compare with None explicitly: a mutagen file object
                    # without tags may evaluate as falsy - TODO confirm.
                    if audio is not None and getattr(audio, "info", None) is not None:
                        duration = int(audio.info.length)
                except Exception:
                    # The duration is optional; keep -1 if it cannot be read.
                    pass
            # Resolve the title first so a missing title never renders as
            # "Artist - None".
            title = tp.title or track_path.stem
            label = f"{tp.artist} - {title}" if tp.artist else title
            lines.append(f"#EXTINF:{duration},{label}")
            lines.append(track_path.name)
        m3u_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
        print(f" 📋 Playlist aktualisiert: {m3u_path.name}")
        return True
    except Exception as e:
        print(f" ⚠️ M3U-Fehler {m3u_path.name}: {e}", file=sys.stderr)
        return False
def execute_album(
    proposal: AlbumProposal,
    backup_dir: Optional[Path],
    do_rename: bool,
    embed_cover_art: bool,
    dry_run: bool,
    report_data: List[Dict[str, Any]],
) -> Dict[str, int]:
    """Apply an album proposal to its files and record one report row per track.

    For every track (unless *dry_run*): back up the file (if *backup_dir* is
    given), write tags, optionally embed the cover, optionally rename. After
    the loop the first ``*.m3u``/``*.m3u8`` playlist found in the album
    directory is rewritten if any file was renamed, and
    ``sanitize_dir_names`` is run on the album directory when renaming is on.

    Safety rules:

    * A track whose requested backup failed is not modified at all.
    * A rename never replaces a different existing file; such a collision is
      reported as an error and the track keeps its old name.

    Args:
        proposal: Album-level proposal including its ``tracks``.
        backup_dir: Directory for backups, or None to skip backups.
        do_rename: Rename files according to ``_proposed_filename``.
        embed_cover_art: Embed ``proposal.cover_path`` when it is set.
        dry_run: Only collect report rows; do not touch any file.
        report_data: List that receives one dict per track (keys as in
            ``REPORT_FIELDS``). ``status`` is "dry-run", "ok", or "error"
            when the backup, tagging or renaming of that track failed.

    Returns:
        Counters: ``tags_written``, ``covers_embedded``, ``files_renamed``,
        ``errors``.
    """
    stats = {"tags_written": 0, "covers_embedded": 0, "files_renamed": 0, "errors": 0}
    final_tracks: List[tuple] = []  # (TrackProposal, final_path) for the M3U

    for tp in proposal.tracks:
        # Values currently stored in the file; informational only.
        old_title = tp.path.stem
        old_artist = ""
        if HAS_MUTAGEN:
            try:
                audio = MutagenFile(str(tp.path), easy=True)
                if audio and audio.tags:
                    old_artist = str(audio.tags.get("artist", [""])[0])
                    old_title = str(audio.tags.get("title", [tp.path.stem])[0])
            except Exception:
                pass

        new_path = tp.path
        renamed_to = ""
        cover_embedded = False
        status = "dry-run" if dry_run else "ok"

        if not dry_run:
            may_modify = True
            if backup_dir and not backup_file(tp.path, backup_dir):
                # Never modify a file whose requested backup could not be made.
                may_modify = False
                stats["errors"] += 1
                status = "error"

            if may_modify:
                if write_tags(tp.path, tp, proposal):
                    stats["tags_written"] += 1
                else:
                    stats["errors"] += 1
                    status = "error"

                if embed_cover_art and proposal.cover_path:
                    if embed_cover(tp.path, proposal.cover_path):
                        stats["covers_embedded"] += 1
                        cover_embedded = True

                if do_rename:
                    new_name = _proposed_filename(
                        tp, tp.path.suffix,
                        albumartist=proposal.albumartist or "",
                        genre=proposal.genre or "",
                    )
                    candidate = tp.path.parent / new_name
                    if candidate != tp.path:
                        try:
                            # Path.rename() can silently replace an existing
                            # file. Refuse unless the target is this very
                            # file (e.g. a case-only change on a
                            # case-insensitive file system).
                            if candidate.exists() and not candidate.samefile(tp.path):
                                raise FileExistsError(f"Ziel existiert bereits: {new_name}")
                            tp.path.rename(candidate)
                            new_path = candidate
                            renamed_to = new_name
                            stats["files_renamed"] += 1
                        except Exception as e:
                            print(f" ⚠️ Umbenennungsfehler {tp.path.name}: {e}", file=sys.stderr)
                            stats["errors"] += 1
                            status = "error"

            final_tracks.append((tp, new_path))

        report_data.append({
            "status": status,
            "album_dir": str(proposal.album_dir.name),
            "track_path": str(new_path),
            "old_title": old_title,
            "new_title": tp.title,
            "old_artist": old_artist,
            "new_artist": tp.artist,
            "album": proposal.album,
            "albumartist": proposal.albumartist,
            "date": proposal.date or "",
            "genre": proposal.genre or "",
            "label": proposal.label or "",
            "track_number": tp.track_number or "",
            "disc_number": tp.disc_number or "",
            "cover_embedded": cover_embedded,
            "renamed_to": renamed_to,
            "confidence": f"{proposal.confidence:.2f}",
            "sources": ", ".join(proposal.sources),
        })

    # Refresh the M3U playlist if any file was renamed.
    if do_rename and not dry_run and stats["files_renamed"] > 0 and final_tracks:
        m3u_files = (
            list(proposal.album_dir.glob("*.m3u")) +
            list(proposal.album_dir.glob("*.m3u8"))
        )
        if m3u_files:
            _update_m3u(m3u_files[0], final_tracks)

    # After all renames: make the directory's file names Linux-compatible.
    # NOTE(review): this runs after the playlist was written; if the external
    # tool renames a file again, the playlist entry would be stale - confirm.
    if do_rename and not dry_run:
        sanitize_dir_names(proposal.album_dir)

    return stats
def sanitize_dir_names(directory: Path) -> None:
    """Make the file names below *directory* Linux-compatible with an external tool.

    ``NameToUnix <directory>`` is preferred. If it is not on the PATH, or it
    exits with an error, ``detox <file>`` is run for every regular file found
    recursively (in sorted order). If detox is not available either, a note
    is printed to stderr and nothing is changed. Tool failures are reported
    on stderr and never raised.
    """
    primary_tool = shutil.which("NameToUnix")
    if primary_tool:
        try:
            subprocess.run([primary_tool, str(directory)], check=True, capture_output=True)
        except subprocess.CalledProcessError as e:
            print(f" ⚠️ NameToUnix-Fehler: {e.stderr.decode(errors='replace').strip()}", file=sys.stderr)
        else:
            # The preferred tool handled the whole directory.
            return

    fallback_tool = shutil.which("detox")
    if not fallback_tool:
        print(" Weder NameToUnix noch detox gefunden — Dateinamen nicht nachbereinigt.", file=sys.stderr)
        return

    for entry in sorted(directory.rglob("*")):
        if not entry.is_file():
            continue
        try:
            subprocess.run([fallback_tool, str(entry)], check=True, capture_output=True)
        except subprocess.CalledProcessError as e:
            print(f" ⚠️ detox-Fehler {entry.name}: {e.stderr.decode(errors='replace').strip()}", file=sys.stderr)
def write_report(report_data: List[Dict[str, Any]], report_path: Path) -> None:
    """Write *report_data* as a CSV file with the columns of ``REPORT_FIELDS``.

    The parent directory of *report_path* is created if needed and an
    existing file is overwritten (UTF-8, header row first). Any failure is
    printed to stderr and not raised.
    """
    try:
        target_dir = report_path.parent
        target_dir.mkdir(parents=True, exist_ok=True)
        with report_path.open("w", encoding="utf-8", newline="") as handle:
            writer = csv.DictWriter(handle, fieldnames=REPORT_FIELDS)
            writer.writeheader()
            for row in report_data:
                writer.writerow(row)
        print(f"📊 Report gespeichert: {report_path}")
    except Exception as exc:
        print(f"⚠️ Report-Fehler: {exc}", file=sys.stderr)