# Music_Metadata_Enricher/executor.py
#
# Source-viewer header captured with the file: 367 lines, 13 KiB, Python
# (viewer controls: Raw / Normal View / History).

from __future__ import annotations
import csv
import re
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Optional, List, Dict, Any
from models import AlbumProposal, TrackProposal
try:
from mutagen import File as MutagenFile
from mutagen.easyid3 import EasyID3
from mutagen.flac import FLAC
from mutagen.mp4 import MP4, MP4Tags
HAS_MUTAGEN = True
except ImportError:
HAS_MUTAGEN = False
from cover_handler import embed_cover
# Characters that _safe_name() replaces with "_": reserved punctuation and
# ASCII control characters.
_SAFE_RE = re.compile(r'[<>:"/\\|?*\x00-\x1f]')

# Case-insensitive keywords hinting at a classical-music genre.
_CLASSICAL_GENRES = re.compile(
    r"(?i)class|baroque|romantic|renaissance|opera|symphony|chamber|concerto|sonata|oratorio"
)

# Column order of the CSV report written by write_report().
REPORT_FIELDS = [
    # Processing outcome and location
    "status",
    "album_dir",
    "track_path",
    # Track-level values before/after
    "old_title",
    "new_title",
    "old_artist",
    "new_artist",
    # Album-level values
    "album",
    "albumartist",
    "date",
    "genre",
    "label",
    # Positioning
    "track_number",
    "disc_number",
    # Side effects performed on the file
    "cover_embedded",
    "renamed_to",
    # Provenance of the proposal
    "confidence",
    "sources",
]
def _safe_name(s: str) -> str:
    """Return *s* as a filesystem-safe name.

    Characters matched by ``_SAFE_RE`` become ``_``, every run of whitespace
    becomes a single ``_``, and leading/trailing ``.``, ``_`` and ``-`` are
    removed from the result.
    """
    without_illegal = _SAFE_RE.sub("_", s)
    underscored = re.sub(r"\s+", "_", without_illegal)
    return underscored.strip("._-")
def _is_classical(albumartist: str, track_artist: str, genre: str) -> bool:
    """Decide whether the classical file-naming schema applies.

    The classical schema is used when the performer (``albumartist``) differs
    from the composer (``track_artist``). That covers both "real" classical
    music and, for example, jazz albums on classical themes.

    Names are compared case-insensitively after turning underscores into
    spaces and collapsing whitespace. This way a value that has already been
    sanitised for use in a file name (``Various_Artists``) is treated exactly
    like the raw tag value (``Various Artists``).

    Args:
        albumartist: Album artist / performer; may be empty or ``None``.
        track_artist: Track artist / composer; may be empty or ``None``.
        genre: Accepted for interface compatibility. It does not influence
            the result.

    Returns:
        ``True`` when a real (non-placeholder) album artist is present and
        differs from the track artist, ``False`` otherwise.
    """
    placeholders = ("various artists", "unknown artist", "unknown")

    def normalise(name):
        # Underscores come from filename sanitising; treat them as spaces.
        return " ".join((name or "").replace("_", " ").casefold().split())

    performer = normalise(albumartist)
    composer = normalise(track_artist)

    # Without a real performer there is nothing to contrast the composer with.
    if not performer or performer in placeholders:
        return False

    # performer != composer -> classical naming
    return performer != composer
def _proposed_filename(
    proposal: TrackProposal,
    ext: str,
    albumartist: str = "",
    genre: str = "",
) -> str:
    """Build the target file name for a track.

    Schemas (parts are separated by ``_-_``; spaces inside a name become
    ``_``):

    * Pop/default: ``TT_-_Artist_-_Title.ext``
    * Classical:   ``TT_-_Performer_-_Composer_-_Title[_-_Orchestra_Conductor].ext``

    ``TT`` is the two-digit track number (``00`` when unknown). It is
    prefixed with ``<disc>-`` only when the disc number is greater than 1.
    Orchestra and conductor are left out when missing. A missing artist or
    title, or one that sanitises to an empty string (for example a title
    consisting only of ``?``), is replaced by ``Unknown`` so the file name
    never contains an empty segment.

    Args:
        proposal: Track proposal providing number, disc, artist, title and
            optionally orchestra/conductor.
        ext: File extension including the leading dot.
        albumartist: Album artist (performer) used for the classical check.
        genre: Passed through to ``_is_classical``.

    Returns:
        The proposed file name (no directory component).
    """
    track_no = f"{proposal.track_number:02d}" if proposal.track_number else "00"

    # Disc prefix only for real multi-disc sets (disc > 1); disc 1 or None
    # means a single disc and gets no prefix.
    is_later_disc = bool(proposal.disc_number and proposal.disc_number > 1)
    prefix = f"{proposal.disc_number}-{track_no}" if is_later_disc else track_no

    # Sanitising can strip a name down to nothing; fall back again so that no
    # empty segment ends up in the file name.
    track_artist = _safe_name(proposal.artist or "Unknown") or "Unknown"
    title = _safe_name(proposal.title or "Unknown") or "Unknown"
    performer = _safe_name(albumartist)

    if not _is_classical(performer, track_artist, genre):
        # Pop/default schema: track number _-_ artist _-_ title
        return f"{prefix}_-_{track_artist}_-_{title}{ext}"

    # Classical schema: performer _-_ composer _-_ work [_-_ orchestra_conductor]
    parts = [prefix, performer, track_artist, title]
    extra = "_".join(filter(None, (
        _safe_name(proposal.orchestra or ""),
        _safe_name(proposal.conductor or ""),
    )))
    if extra:
        parts.append(extra)
    return "_-_".join(parts) + ext
def backup_file(path: Path, backup_dir: Path) -> bool:
    """Copy *path* into *backup_dir* unless a backup already exists.

    The backup is stored as ``<parent directory name>__<file name>``. An
    existing backup is left untouched.

    Returns:
        ``True`` when a backup is present afterwards, ``False`` when any step
        failed (the error is reported on stderr).
    """
    try:
        backup_dir.mkdir(parents=True, exist_ok=True)
        target = backup_dir / f"{path.parent.name}__{path.name}"
        already_saved = target.exists()
        if not already_saved:
            shutil.copy2(path, target)
    except Exception as err:
        print(f" ⚠️ Backup-Fehler {path.name}: {err}", file=sys.stderr)
        return False
    else:
        return True
def write_tags(path: Path, proposal: TrackProposal, album_proposal: AlbumProposal) -> bool:
    """Write the proposed metadata into the audio file at *path*.

    Title, artist, album and album artist are always written (a missing value
    is written as an empty string). Track number (as ``n/total`` where total
    is the number of tracks in the album proposal), disc number (only when
    greater than 1), date, genre and label are written when present.

    Format handling depends on the file extension:

    * ``.mp3``  – EasyID3, saved as ID3v2.4; a missing ID3 header is created.
    * ``.flac`` – Vorbis comments via ``FLAC``.
    * ``.m4a``  – MP4 atoms; the label has no mapped atom and is not written.
    * other     – whatever ``mutagen.File(..., easy=True)`` detects.

    Args:
        path: Audio file to modify in place.
        proposal: Track-level values.
        album_proposal: Album-level values.

    Returns:
        ``True`` when the tags were saved. ``False`` when mutagen is not
        installed, the format is not recognised, or writing failed (the
        reason is reported on stderr).
    """
    if not HAS_MUTAGEN:
        return False

    ext = path.suffix.lower()

    tags_to_write = {
        "title": proposal.title or "",
        "artist": proposal.artist or "",
        "album": album_proposal.album or "",
        "albumartist": album_proposal.albumartist or "",
    }
    if proposal.track_number:
        total = len(album_proposal.tracks)
        tags_to_write["tracknumber"] = f"{proposal.track_number}/{total}"
    if proposal.disc_number and proposal.disc_number > 1:
        tags_to_write["discnumber"] = str(proposal.disc_number)
    if album_proposal.date:
        # Strip everything except valid ID3 timestamp characters to prevent
        # ID3TimeStamp errors.
        date_clean = re.sub(r"[^\d\-T:+Z]", "", str(album_proposal.date)).strip()
        if date_clean:
            tags_to_write["date"] = date_clean
    if album_proposal.genre:
        tags_to_write["genre"] = album_proposal.genre
    if album_proposal.label:
        tags_to_write["organization"] = album_proposal.label

    def warn_skipped(field, err):
        # One bad field must not prevent the remaining fields from being saved.
        print(f" ⚠️ Tag-Feld '{field}' übersprungen ({path.name}): {err}", file=sys.stderr)

    try:
        if ext == ".mp3":
            from mutagen.id3 import ID3NoHeaderError
            from mutagen.mp3 import MP3

            try:
                audio = EasyID3(str(path))
            except ID3NoHeaderError:
                # File has no ID3 header: add an empty one (audio data is
                # left untouched), then reopen through the easy interface.
                # A failure here reaches the outer handler with its real cause.
                mp3 = MP3(str(path))
                if mp3.tags is None:
                    mp3.add_tags()
                mp3.save(str(path), v2_version=4)
                audio = EasyID3(str(path))
            for k, v in tags_to_write.items():
                try:
                    audio[k] = [v]
                except Exception as tag_err:
                    warn_skipped(k, tag_err)
            audio.save(v2_version=4)
            return True

        if ext == ".flac":
            audio = FLAC(str(path))
            for k, v in tags_to_write.items():
                audio[k] = [v]
            audio.save()
            return True

        if ext == ".m4a":
            audio = MP4(str(path))
            mapping = {
                "title": "\xa9nam", "artist": "\xa9ART",
                "album": "\xa9alb", "albumartist": "aART",
                "tracknumber": "trkn", "discnumber": "disk",
                "date": "\xa9day", "genre": "\xa9gen",
            }
            for k, v in tags_to_write.items():
                atom = mapping.get(k)
                if atom is None:
                    continue  # no atom mapped for this field (e.g. the label)
                if atom in ("trkn", "disk"):
                    # These atoms hold (number, total) integer pairs; a
                    # missing total is stored as 0.
                    number, _, total = v.partition("/")
                    try:
                        audio[atom] = [(int(number), int(total or 0))]
                    except ValueError as tag_err:
                        warn_skipped(k, tag_err)
                else:
                    audio[atom] = [v]
            audio.save()
            return True

        # Any other extension: let mutagen detect the container.
        audio = MutagenFile(str(path), easy=True)
        if audio is None:
            print(f" ⚠️ Tag-Schreibfehler {path.name}: Format nicht unterstützt", file=sys.stderr)
            return False
        if audio.tags is None:
            audio.add_tags()
        for k, v in tags_to_write.items():
            try:
                audio[k] = [v]
            except Exception as tag_err:
                warn_skipped(k, tag_err)
        audio.save()
        return True
    except Exception as e:
        print(f" ⚠️ Tag-Schreibfehler {path.name}: {e}", file=sys.stderr)
        return False
def _update_m3u(m3u_path: Path, tracks: List[tuple]) -> bool:
    """Rewrite the M3U playlist so it lists the (renamed) files.

    Entries are written in the order given by *tracks*. Each entry consists
    of an ``#EXTINF:<seconds>,<label>`` line followed by the bare file name.
    The duration is ``-1`` when it cannot be determined. The label is
    ``Artist - Title``, or just the title when no artist is known; a missing
    title falls back to the file stem.

    Args:
        m3u_path: Playlist file to overwrite (written as UTF-8).
        tracks: ``[(TrackProposal, actual_path_after_rename), ...]``.

    Returns:
        ``True`` on success, ``False`` when writing failed (reported on
        stderr).
    """
    try:
        lines = ["#EXTM3U"]
        for tp, track_path in tracks:
            duration = -1
            if HAS_MUTAGEN:
                try:
                    audio = MutagenFile(str(track_path))
                    # Compare against None explicitly rather than relying on
                    # the truthiness of the file object, so an untagged file
                    # can still report its length.
                    info = getattr(audio, "info", None) if audio is not None else None
                    if info is not None:
                        duration = int(info.length)
                except Exception:
                    # Best effort: -1 is used when the duration is unknown.
                    pass
            title = tp.title or track_path.stem
            label = f"{tp.artist} - {title}" if tp.artist else title
            lines.append(f"#EXTINF:{duration},{label}")
            lines.append(track_path.name)
        m3u_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
        print(f" 📋 Playlist aktualisiert: {m3u_path.name}")
        return True
    except Exception as e:
        print(f" ⚠️ M3U-Fehler {m3u_path.name}: {e}", file=sys.stderr)
        return False
def _read_current_tags(path: Path) -> tuple:
    """Return ``(title, artist)`` as currently stored in *path*.

    Falls back to the file stem for the title and ``""`` for the artist when
    mutagen is unavailable, the file carries no tags, or reading fails.
    """
    title, artist = path.stem, ""
    if not HAS_MUTAGEN:
        return title, artist
    try:
        audio = MutagenFile(str(path), easy=True)
        if audio and audio.tags:
            artist = str(audio.tags.get("artist", [""])[0])
            title = str(audio.tags.get("title", [path.stem])[0])
    except Exception:
        # Best effort: the old values are only used for the report.
        pass
    return title, artist


def _rename_without_overwrite(src: Path, dest: Path) -> None:
    """Rename *src* to *dest*, refusing to replace a different existing file.

    ``Path.rename`` silently replaces an existing target on POSIX systems,
    which would destroy a track when two proposals map to the same name.
    A target that is the same file as *src* (a case-only rename on a
    case-insensitive filesystem) is allowed.

    Raises:
        FileExistsError: *dest* exists and is not the same file as *src*.
        OSError: The rename itself failed.
    """
    if dest.exists() and not dest.samefile(src):
        raise FileExistsError(f"Ziel existiert bereits: {dest.name}")
    src.rename(dest)


def execute_album(
    proposal: AlbumProposal,
    backup_dir: Optional[Path],
    do_rename: bool,
    embed_cover_art: bool,
    dry_run: bool,
    report_data: List[Dict[str, Any]],
) -> Dict[str, int]:
    """Apply an album proposal to its tracks and record one report row each.

    Per track, unless *dry_run* is set: optionally back up the file, write
    the tags, optionally embed the cover and optionally rename the file. A
    rename never overwrites a different existing file; such a collision is
    counted as an error and the track keeps its name. After all tracks, if
    at least one file was renamed, the first ``*.m3u``/``*.m3u8`` playlist in
    the album directory is rewritten, and the directory is passed to
    ``sanitize_dir_names``.

    Args:
        proposal: Album proposal including its track proposals.
        backup_dir: Directory for backups; no backup is made when falsy.
        do_rename: Rename files to the proposed file names.
        embed_cover_art: Embed ``proposal.cover_path`` when it is set.
        dry_run: Only record report rows; do not touch any file.
        report_data: List that receives one dict per track (mutated in place).

    Returns:
        Counters: ``tags_written``, ``covers_embedded``, ``files_renamed``
        and ``errors``.
    """
    stats = {"tags_written": 0, "covers_embedded": 0, "files_renamed": 0, "errors": 0}
    final_tracks: List[tuple] = []  # (TrackProposal, final_path) for the M3U

    for tp in proposal.tracks:
        old_title, old_artist = _read_current_tags(tp.path)

        new_path = tp.path
        renamed_to = ""
        cover_embedded = False

        if not dry_run:
            if backup_dir:
                backup_file(tp.path, backup_dir)

            if write_tags(tp.path, tp, proposal):
                stats["tags_written"] += 1
            else:
                stats["errors"] += 1

            if embed_cover_art and proposal.cover_path:
                if embed_cover(tp.path, proposal.cover_path):
                    stats["covers_embedded"] += 1
                    cover_embedded = True

            if do_rename:
                new_name = _proposed_filename(
                    tp, tp.path.suffix,
                    albumartist=proposal.albumartist or "",
                    genre=proposal.genre or "",
                )
                candidate = tp.path.parent / new_name
                if candidate != tp.path:
                    try:
                        _rename_without_overwrite(tp.path, candidate)
                    except Exception as e:
                        print(f" ⚠️ Umbenennungsfehler {tp.path.name}: {e}", file=sys.stderr)
                        stats["errors"] += 1
                    else:
                        new_path = candidate
                        renamed_to = new_name
                        stats["files_renamed"] += 1

            final_tracks.append((tp, new_path))

        report_data.append({
            "status": "dry-run" if dry_run else "ok",
            "album_dir": str(proposal.album_dir.name),
            "track_path": str(new_path),
            "old_title": old_title,
            "new_title": tp.title,
            "old_artist": old_artist,
            "new_artist": tp.artist,
            "album": proposal.album,
            "albumartist": proposal.albumartist,
            "date": proposal.date or "",
            "genre": proposal.genre or "",
            "label": proposal.label or "",
            "track_number": tp.track_number or "",
            "disc_number": tp.disc_number or "",
            "cover_embedded": cover_embedded,
            "renamed_to": renamed_to,
            "confidence": f"{proposal.confidence:.2f}",
            "sources": ", ".join(proposal.sources),
        })

    # Update the M3U playlist when files were renamed.
    if do_rename and not dry_run and stats["files_renamed"] > 0 and final_tracks:
        m3u_files = (
            list(proposal.album_dir.glob("*.m3u")) +
            list(proposal.album_dir.glob("*.m3u8"))
        )
        if m3u_files:
            _update_m3u(m3u_files[0], final_tracks)

    # After all renames: make the directory's file names Linux-compatible.
    # NOTE(review): this runs after the playlist and report rows were built;
    # if the external tool renames a file again, those entries may be stale.
    if do_rename and not dry_run:
        sanitize_dir_names(proposal.album_dir)

    return stats
def sanitize_dir_names(directory: Path) -> None:
    """Make the file names below *directory* Linux-compatible.

    ``NameToUnix <dir>`` is preferred. When it is not installed, or exits
    with an error, ``detox <file>`` is run for every regular file found
    recursively. When neither tool can be used, a notice is printed to stderr
    and nothing is renamed.
    """
    primary_tool = shutil.which("NameToUnix")
    if primary_tool:
        try:
            subprocess.run([primary_tool, str(directory)], check=True, capture_output=True)
        except subprocess.CalledProcessError as err:
            print(f" ⚠️ NameToUnix-Fehler: {err.stderr.decode(errors='replace').strip()}", file=sys.stderr)
        else:
            return

    # Fallback: per-file clean-up with detox.
    fallback_tool = shutil.which("detox")
    if not fallback_tool:
        print(" Weder NameToUnix noch detox gefunden — Dateinamen nicht nachbereinigt.", file=sys.stderr)
        return

    for entry in sorted(directory.rglob("*")):
        if not entry.is_file():
            continue
        try:
            subprocess.run([fallback_tool, str(entry)], check=True, capture_output=True)
        except subprocess.CalledProcessError as err:
            print(f" ⚠️ detox-Fehler {entry.name}: {err.stderr.decode(errors='replace').strip()}", file=sys.stderr)
def write_report(report_data: List[Dict[str, Any]], report_path: Path) -> None:
    """Write the collected report rows to *report_path* as a CSV file.

    The parent directory is created when missing. The columns follow
    ``REPORT_FIELDS`` and the file is written as UTF-8 with a header row.
    Any failure is reported on stderr instead of being raised.
    """
    try:
        target_dir = report_path.parent
        target_dir.mkdir(parents=True, exist_ok=True)
        with report_path.open("w", encoding="utf-8", newline="") as handle:
            writer = csv.DictWriter(handle, fieldnames=REPORT_FIELDS)
            writer.writeheader()
            writer.writerows(report_data)
        print(f"📊 Report gespeichert: {report_path}")
    except Exception as err:
        print(f"⚠️ Report-Fehler: {err}", file=sys.stderr)