Pop schema: TT_-_Artist_-_Title.ext. Classical schema: TT_-_Performer_-_Komponist_-_Werk[_-_Orchester_Dirigent].ext, triggered when albumartist ≠ track artist (pianist vs. composer). All spaces in names → underscores; separator _-_ between parts. Missing parts (orchestra, conductor) are omitted. models.py: added optional conductor/orchestra fields to TrackProposal. executor.py: sanitize_dir_names() tries NameToUnix first and falls back to detox; it is called after all renames in a directory are complete. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
326 lines
11 KiB
Python
326 lines
11 KiB
Python
from __future__ import annotations
|
||
|
||
import csv
|
||
import re
|
||
import shutil
|
||
import subprocess
|
||
import sys
|
||
from pathlib import Path
|
||
from typing import Optional, List, Dict, Any
|
||
|
||
from models import AlbumProposal, TrackProposal
|
||
|
||
try:
|
||
from mutagen import File as MutagenFile
|
||
from mutagen.easyid3 import EasyID3
|
||
from mutagen.flac import FLAC
|
||
from mutagen.mp4 import MP4, MP4Tags
|
||
HAS_MUTAGEN = True
|
||
except ImportError:
|
||
HAS_MUTAGEN = False
|
||
|
||
from cover_handler import embed_cover
|
||
|
||
# Characters that _safe_name() replaces with "_": the punctuation
# < > : " / \ | ? * and the ASCII control characters 0x00-0x1f.
_SAFE_RE = re.compile(r'[<>:"/\\|?*\x00-\x1f]')
# Case-insensitive keyword pattern for classical-sounding genre strings.
# NOTE(review): not referenced by any function visible in this module;
# _is_classical() currently ignores the genre entirely.
_CLASSICAL_GENRES = re.compile(
    r"(?i)class|baroque|romantic|renaissance|opera|symphony|chamber|concerto|sonata|oratorio"
)
# Column names (and column order) of the CSV report written by write_report().
# execute_album() appends one dict with exactly these keys per track.
REPORT_FIELDS = [
    "status", "album_dir", "track_path",
    "old_title", "new_title",
    "old_artist", "new_artist",
    "album", "albumartist", "date", "genre", "label",
    "track_number", "disc_number",
    "cover_embedded", "renamed_to",
    "confidence", "sources",
]
|
||
|
||
|
||
def _safe_name(s: str) -> str:
    """Return ``s`` reduced to a filesystem-safe name component.

    Every character matched by ``_SAFE_RE`` becomes ``_``, each run of
    whitespace becomes a single ``_``, and leading/trailing ``.``, ``_`` and
    ``-`` characters are removed from the result.
    """
    without_illegal = _SAFE_RE.sub("_", s)
    underscored = re.sub(r"\s+", "_", without_illegal)
    return underscored.strip("._-")
|
||
|
||
|
||
def _is_classical(albumartist: str, track_artist: str, genre: str) -> bool:
    """Decide whether the classical file-name schema should be used.

    The classical schema applies when the performer (``albumartist``) differs
    from the composer (``track_artist``); this covers 'real' classical music
    as well as jazz-on-classical-themes albums.

    Names are compared case-insensitively with underscores treated as spaces
    and whitespace runs collapsed, so raw tag values and names that were
    already made filesystem-safe ("Various_Artists") are judged the same way.

    Args:
        albumartist: Album-level artist (the performer); may be empty or None.
        track_artist: Track-level artist (the composer); may be empty or None.
        genre: Accepted for interface compatibility; it does not influence
            the decision.

    Returns:
        False when ``albumartist`` is empty, is a placeholder such as
        "Various Artists", or equals ``track_artist``; True otherwise.
    """
    placeholders = ("various artists", "unknown artist", "unknown")

    def _normalise(name: str) -> str:
        # Underscores count as spaces so sanitised names still match the
        # placeholder list above.
        return " ".join((name or "").replace("_", " ").split()).casefold()

    performer = _normalise(albumartist)
    composer = _normalise(track_artist)
    if not performer or performer in placeholders:
        return False
    # performer != composer -> classical naming
    return performer != composer
|
||
|
||
|
||
def _proposed_filename(
    proposal: TrackProposal,
    ext: str,
    albumartist: str = "",
    genre: str = "",
) -> str:
    """Build the target file name for a track.

    Pop/default schema:  TT_-_Artist_-_Title.ext
    Classical schema:    TT_-_Performer_-_Composer_-_Title[_-_Orchestra_Conductor].ext

    Parts are separated by ``_-_``; whitespace inside a name becomes ``_``.
    Missing optional parts (orchestra, conductor) are left out. For disc
    numbers above 1 the track number is prefixed with ``<disc>-``.

    Args:
        proposal: Track data (track_number, disc_number, artist, title,
            orchestra, conductor).
        ext: File extension including the leading dot.
        albumartist: Album-level artist, used as the performer.
        genre: Passed through to ``_is_classical``.

    Returns:
        The proposed file name (no directory part).
    """
    tn = f"{proposal.track_number:02d}" if proposal.track_number else "00"
    disc_prefix = f"{proposal.disc_number}-" if proposal.disc_number and proposal.disc_number > 1 else ""
    prefix = f"{disc_prefix}{tn}"

    # Apply the "Unknown" fallback *after* sanitising: a value made only of
    # illegal characters or dots (e.g. "???") sanitises to an empty string and
    # would otherwise leave an empty part in the file name.
    track_artist = _safe_name(proposal.artist or "") or "Unknown"
    title = _safe_name(proposal.title or "") or "Unknown"
    aa = _safe_name(albumartist or "")

    # _is_classical() recognises placeholders such as "various artists" with
    # spaces; turn the sanitised underscores back into spaces for the check so
    # compilations are not mistaken for classical releases.
    classical = _is_classical(
        aa.replace("_", " "),
        track_artist.replace("_", " "),
        genre,
    )

    if not classical:
        # Pop/default schema: track number _-_ artist _-_ title
        return f"{prefix}_-_{track_artist}_-_{title}{ext}"

    # Classical schema: performer _-_ composer _-_ work [_-_ orchestra_conductor]
    parts = [prefix, aa, track_artist, title]
    optional = [
        _safe_name(proposal.orchestra or ""),
        _safe_name(proposal.conductor or ""),
    ]
    extra = "_".join(name for name in optional if name)
    if extra:
        parts.append(extra)
    return "_-_".join(parts) + ext
|
||
|
||
|
||
def backup_file(path: Path, backup_dir: Path) -> bool:
    """Copy ``path`` into ``backup_dir`` unless a backup already exists.

    The backup is named ``<parent directory name>__<file name>``. The backup
    directory is created on demand. An existing backup is never overwritten.

    Returns:
        True when the backup exists afterwards (freshly copied or already
        present), False when any step failed; the failure is reported on
        stderr.
    """
    try:
        backup_dir.mkdir(parents=True, exist_ok=True)
        target = backup_dir / f"{path.parent.name}__{path.name}"
        if target.exists():
            # Keep the first backup that was taken.
            return True
        shutil.copy2(path, target)
        return True
    except Exception as err:
        print(f" ⚠️ Backup-Fehler {path.name}: {err}", file=sys.stderr)
        return False
|
||
|
||
|
||
def write_tags(path: Path, proposal: TrackProposal, album_proposal: AlbumProposal) -> bool:
    """Write the proposed metadata into the audio file at ``path``.

    The tag values are collected once and then stored with the mutagen class
    matching the file extension (``.mp3``, ``.flac``, ``.m4a``); every other
    extension is handed to ``mutagen.File(..., easy=True)``.

    Args:
        path: Audio file that is modified in place.
        proposal: Track-level values (title, artist, track and disc number).
        album_proposal: Album-level values (album, albumartist, date, genre,
            label); ``len(album_proposal.tracks)`` is used as the track total.

    Returns:
        True when the tags were saved. False when mutagen is not available,
        mutagen does not recognise the file, or loading/saving failed (the
        error is reported on stderr).
    """
    if not HAS_MUTAGEN:
        return False

    tags_to_write = {
        "title": proposal.title or "",
        "artist": proposal.artist or "",
        "album": album_proposal.album or "",
        "albumartist": album_proposal.albumartist or "",
    }
    if proposal.track_number:
        total = len(album_proposal.tracks)
        tags_to_write["tracknumber"] = f"{proposal.track_number}/{total}"
    if proposal.disc_number:
        tags_to_write["discnumber"] = str(proposal.disc_number)
    if album_proposal.date:
        # Strip everything except valid ID3 timestamp characters to prevent
        # ID3TimeStamp errors.
        date_clean = re.sub(r"[^\d\-T:+Z]", "", str(album_proposal.date)).strip()
        if date_clean:
            tags_to_write["date"] = date_clean
    if album_proposal.genre:
        tags_to_write["genre"] = album_proposal.genre
    if album_proposal.label:
        tags_to_write["organization"] = album_proposal.label

    def _warn_skipped(field: str, err: Exception) -> None:
        # A single rejected field must not prevent the remaining tags from
        # being written, but it should not disappear silently either.
        print(f" ⚠️ Tag-Feld '{field}' übersprungen ({path.name}): {err}", file=sys.stderr)

    ext = path.suffix.lower()
    try:
        if ext == ".mp3":
            from mutagen.id3 import ID3NoHeaderError
            from mutagen.mp3 import MP3

            try:
                audio = EasyID3(str(path))
            except ID3NoHeaderError:
                # File has no ID3 header: add an empty one without touching
                # the audio data, then reopen through the easy interface.
                # Any failure here reaches the outer handler with its real
                # cause instead of being swallowed.
                bare = MP3(str(path))
                bare.tags = None
                bare.add_tags()
                bare.save(str(path), v2_version=4)
                audio = EasyID3(str(path))
            for k, v in tags_to_write.items():
                try:
                    audio[k] = [v]
                except Exception as tag_err:
                    _warn_skipped(k, tag_err)
            audio.save(v2_version=4)
            return True

        elif ext == ".flac":
            audio = FLAC(str(path))
            for k, v in tags_to_write.items():
                audio[k] = [v]
            audio.save()
            return True

        elif ext == ".m4a":
            audio = MP4(str(path))
            mapping = {
                "title": "\xa9nam", "artist": "\xa9ART",
                "album": "\xa9alb", "albumartist": "aART",
                "tracknumber": "trkn", "discnumber": "disk",
                "date": "\xa9day",
                "genre": "\xa9gen",
            }
            for k, v in tags_to_write.items():
                tag_key = mapping.get(k)
                if not tag_key:
                    # No atom is mapped for this field (e.g. "organization").
                    continue
                if tag_key in ("trkn", "disk"):
                    # These atoms hold (number, total) integer pairs; a
                    # missing total is stored as 0.
                    number, _, total = v.partition("/")
                    try:
                        audio[tag_key] = [(int(number), int(total or 0))]
                    except ValueError as tag_err:
                        _warn_skipped(k, tag_err)
                else:
                    audio[tag_key] = [v]
            audio.save()
            return True

        else:
            audio = MutagenFile(str(path), easy=True)
            if audio is None:
                # mutagen could not identify the file type.
                print(f" ⚠️ Tag-Schreibfehler {path.name}: Format nicht erkannt", file=sys.stderr)
                return False
            if audio.tags is None:
                audio.add_tags()
            for k, v in tags_to_write.items():
                try:
                    audio[k] = [v]
                except Exception as tag_err:
                    _warn_skipped(k, tag_err)
            audio.save()
            return True

    except Exception as e:
        print(f" ⚠️ Tag-Schreibfehler {path.name}: {e}", file=sys.stderr)
        return False
|
||
|
||
|
||
def execute_album(
    proposal: AlbumProposal,
    backup_dir: Optional[Path],
    do_rename: bool,
    embed_cover_art: bool,
    dry_run: bool,
    report_data: List[Dict[str, Any]],
) -> Dict[str, int]:
    """Apply an album proposal to every track and record the outcome.

    For each track (unless ``dry_run``): optionally back up the file, write
    the tags, optionally embed the cover, and optionally rename the file to
    the name produced by ``_proposed_filename``. One report row per track is
    appended to ``report_data`` in every mode. After all renames,
    ``sanitize_dir_names`` is run on the album directory.

    Args:
        proposal: Album data including the list of track proposals.
        backup_dir: Directory for backups, or None to skip backups.
        do_rename: Rename files according to the naming schema.
        embed_cover_art: Embed ``proposal.cover_path`` when it is set.
        dry_run: Only collect report rows; do not modify any file.
        report_data: List that receives one dict per track (mutated in place).

    Returns:
        Counters with the keys ``tags_written``, ``covers_embedded``,
        ``files_renamed`` and ``errors``.
    """
    stats = {"tags_written": 0, "covers_embedded": 0, "files_renamed": 0, "errors": 0}

    for tp in proposal.tracks:
        # Previous title/artist are only needed for the report, so reading
        # them is best effort: on any failure the file stem and an empty
        # artist are reported instead.
        old_title = tp.path.stem
        old_artist = ""
        if HAS_MUTAGEN:
            try:
                audio = MutagenFile(str(tp.path), easy=True)
                if audio and audio.tags:
                    old_artist = str(audio.tags.get("artist", [""])[0])
                    old_title = str(audio.tags.get("title", [tp.path.stem])[0])
            except Exception:
                pass

        new_path = tp.path
        renamed_to = ""
        cover_embedded = False

        if not dry_run:
            if backup_dir:
                backup_file(tp.path, backup_dir)

            if write_tags(tp.path, tp, proposal):
                stats["tags_written"] += 1
            else:
                stats["errors"] += 1

            if embed_cover_art and proposal.cover_path:
                if embed_cover(tp.path, proposal.cover_path):
                    stats["covers_embedded"] += 1
                    cover_embedded = True

            if do_rename:
                new_name = _proposed_filename(
                    tp, tp.path.suffix,
                    albumartist=proposal.albumartist or "",
                    genre=proposal.genre or "",
                )
                candidate = tp.path.parent / new_name
                if candidate != tp.path:
                    try:
                        # Path.rename() silently replaces an existing file on
                        # POSIX. Refuse to clobber a different file (e.g. two
                        # tracks that map to the same name); a case-only
                        # rename of the same file is still allowed.
                        if candidate.exists() and not candidate.samefile(tp.path):
                            raise FileExistsError(f"Ziel existiert bereits: {new_name}")
                        tp.path.rename(candidate)
                        new_path = candidate
                        renamed_to = new_name
                        stats["files_renamed"] += 1
                    except Exception as e:
                        print(f" ⚠️ Umbenennungsfehler {tp.path.name}: {e}", file=sys.stderr)
                        stats["errors"] += 1

        report_data.append({
            "status": "dry-run" if dry_run else "ok",
            "album_dir": str(proposal.album_dir.name),
            "track_path": str(new_path),
            "old_title": old_title,
            "new_title": tp.title,
            "old_artist": old_artist,
            "new_artist": tp.artist,
            "album": proposal.album,
            "albumartist": proposal.albumartist,
            "date": proposal.date or "",
            "genre": proposal.genre or "",
            "label": proposal.label or "",
            "track_number": tp.track_number or "",
            "disc_number": tp.disc_number or "",
            "cover_embedded": cover_embedded,
            "renamed_to": renamed_to,
            "confidence": f"{proposal.confidence:.2f}",
            "sources": ", ".join(proposal.sources),
        })

    # After all renames: make the names in the directory Linux-compatible.
    if do_rename and not dry_run:
        sanitize_dir_names(proposal.album_dir)

    return stats
|
||
|
||
|
||
def sanitize_dir_names(directory: Path) -> None:
    """Make the file names below ``directory`` Linux-compatible.

    Prefers ``NameToUnix <dir>``; when that tool is missing or fails, falls
    back to running ``detox <file>`` on every file found recursively. Tool
    failures are reported on stderr and never raised, so a broken helper
    tool cannot abort album processing.

    Args:
        directory: Directory whose files are renamed by the external tool.
    """
    def _describe(err: Exception) -> str:
        # CalledProcessError carries the tool's captured stderr; an OSError
        # (tool could not be started) only has its own message.
        captured = getattr(err, "stderr", None)
        if captured:
            return captured.decode(errors="replace").strip()
        return str(err)

    name_to_unix = shutil.which("NameToUnix")
    if name_to_unix:
        try:
            subprocess.run([name_to_unix, str(directory)], check=True, capture_output=True)
            return
        except (subprocess.CalledProcessError, OSError) as e:
            print(f" ⚠️ NameToUnix-Fehler: {_describe(e)}", file=sys.stderr)

    detox = shutil.which("detox")
    if not detox:
        if name_to_unix:
            # NameToUnix exists but failed; do not claim it was not found.
            print(" ℹ️ NameToUnix fehlgeschlagen und detox nicht gefunden — Dateinamen nicht nachbereinigt.", file=sys.stderr)
        else:
            print(" ℹ️ Weder NameToUnix noch detox gefunden — Dateinamen nicht nachbereinigt.", file=sys.stderr)
        return

    # sorted() materialises the listing first, so renames performed by detox
    # do not disturb the iteration.
    for f in sorted(directory.rglob("*")):
        if not f.is_file():
            continue
        try:
            subprocess.run([detox, str(f)], check=True, capture_output=True)
        except (subprocess.CalledProcessError, OSError) as e:
            print(f" ⚠️ detox-Fehler {f.name}: {_describe(e)}", file=sys.stderr)
|
||
|
||
|
||
def write_report(report_data: List[Dict[str, Any]], report_path: Path) -> None:
    """Write the collected report rows to ``report_path`` as UTF-8 CSV.

    The parent directory is created when needed. The header row and the
    column order come from ``REPORT_FIELDS``. Success is announced on stdout;
    any failure is reported on stderr instead of being raised.
    """
    try:
        report_path.parent.mkdir(parents=True, exist_ok=True)
        with report_path.open("w", encoding="utf-8", newline="") as handle:
            writer = csv.DictWriter(handle, fieldnames=REPORT_FIELDS)
            writer.writeheader()
            for row in report_data:
                writer.writerow(row)
        print(f"📊 Report gespeichert: {report_path}")
    except Exception as err:
        print(f"⚠️ Report-Fehler: {err}", file=sys.stderr)
|