Strip non-timestamp characters (BOM, invisible chars) from date/year values both when reading existing tags in metadata_resolver and when writing in executor. Also harden the EasyID3 except block to not wipe existing tags when adding a missing ID3 header, and add per-field try/except in MP3 tag writing so one bad field doesn't abort the entire track. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
242 lines
8.3 KiB
Python
242 lines
8.3 KiB
Python
from __future__ import annotations
|
|
|
|
import csv
|
|
import re
|
|
import shutil
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Optional, List, Dict, Any
|
|
|
|
from models import AlbumProposal, TrackProposal
|
|
|
|
try:
|
|
from mutagen import File as MutagenFile
|
|
from mutagen.easyid3 import EasyID3
|
|
from mutagen.flac import FLAC
|
|
from mutagen.mp4 import MP4, MP4Tags
|
|
HAS_MUTAGEN = True
|
|
except ImportError:
|
|
HAS_MUTAGEN = False
|
|
|
|
from cover_handler import embed_cover
|
|
|
|
_SAFE_RE = re.compile(r'[<>:"/\\|?*\x00-\x1f]')
|
|
REPORT_FIELDS = [
|
|
"status", "album_dir", "track_path",
|
|
"old_title", "new_title",
|
|
"old_artist", "new_artist",
|
|
"album", "albumartist", "date", "genre", "label",
|
|
"track_number", "disc_number",
|
|
"cover_embedded", "renamed_to",
|
|
"confidence", "sources",
|
|
]
|
|
|
|
|
|
def _safe_name(s: str) -> str:
|
|
return _SAFE_RE.sub("_", s).strip(". ")
|
|
|
|
|
|
def _proposed_filename(proposal: TrackProposal, ext: str) -> str:
|
|
tn = f"{proposal.track_number:02d}" if proposal.track_number else "00"
|
|
prefix = f"{proposal.disc_number}-{tn}" if proposal.disc_number and proposal.disc_number > 1 else tn
|
|
artist = _safe_name(proposal.artist or "Unknown")
|
|
title = _safe_name(proposal.title or "Unknown")
|
|
return f"{prefix} - {artist} - {title}{ext}"
|
|
|
|
|
|
def backup_file(path: Path, backup_dir: Path) -> bool:
|
|
try:
|
|
backup_dir.mkdir(parents=True, exist_ok=True)
|
|
rel = path.parent.name + "__" + path.name
|
|
dest = backup_dir / rel
|
|
if not dest.exists():
|
|
shutil.copy2(path, dest)
|
|
return True
|
|
except Exception as e:
|
|
print(f" ⚠️ Backup-Fehler {path.name}: {e}", file=sys.stderr)
|
|
return False
|
|
|
|
|
|
def write_tags(path: Path, proposal: TrackProposal, album_proposal: AlbumProposal) -> bool:
|
|
if not HAS_MUTAGEN:
|
|
return False
|
|
ext = path.suffix.lower()
|
|
tags_to_write = {
|
|
"title": proposal.title or "",
|
|
"artist": proposal.artist or "",
|
|
"album": album_proposal.album or "",
|
|
"albumartist": album_proposal.albumartist or "",
|
|
}
|
|
if proposal.track_number:
|
|
total = len(album_proposal.tracks)
|
|
tags_to_write["tracknumber"] = f"{proposal.track_number}/{total}"
|
|
if proposal.disc_number:
|
|
tags_to_write["discnumber"] = str(proposal.disc_number)
|
|
if album_proposal.date:
|
|
# Strip everything except valid ID3 timestamp characters to prevent ID3TimeStamp errors
|
|
date_clean = re.sub(r"[^\d\-T:+Z]", "", str(album_proposal.date)).strip()
|
|
if date_clean:
|
|
tags_to_write["date"] = date_clean
|
|
if album_proposal.genre:
|
|
tags_to_write["genre"] = album_proposal.genre
|
|
if album_proposal.label:
|
|
tags_to_write["organization"] = album_proposal.label
|
|
|
|
try:
|
|
if ext == ".mp3":
|
|
try:
|
|
audio = EasyID3(str(path))
|
|
except Exception:
|
|
# File has no ID3 header — add one without wiping audio data
|
|
from mutagen.id3 import ID3NoHeaderError
|
|
try:
|
|
from mutagen.mp3 import MP3
|
|
full = MP3(str(path))
|
|
full.tags = None
|
|
full.add_tags()
|
|
full.save(str(path), v2_version=4)
|
|
except Exception:
|
|
pass
|
|
audio = EasyID3(str(path))
|
|
for k, v in tags_to_write.items():
|
|
try:
|
|
audio[k] = [v]
|
|
except Exception as tag_err:
|
|
print(f" ⚠️ Tag-Feld '{k}' übersprungen ({path.name}): {tag_err}", file=sys.stderr)
|
|
audio.save(v2_version=4)
|
|
return True
|
|
|
|
elif ext == ".flac":
|
|
audio = FLAC(str(path))
|
|
for k, v in tags_to_write.items():
|
|
audio[k] = [v]
|
|
audio.save()
|
|
return True
|
|
|
|
elif ext == ".m4a":
|
|
audio = MP4(str(path))
|
|
mapping = {
|
|
"title": "\xa9nam", "artist": "\xa9ART",
|
|
"album": "\xa9alb", "albumartist": "aART",
|
|
"tracknumber": "trkn", "date": "\xa9day",
|
|
"genre": "\xa9gen",
|
|
}
|
|
for k, v in tags_to_write.items():
|
|
tag_key = mapping.get(k)
|
|
if tag_key:
|
|
if tag_key == "trkn":
|
|
try:
|
|
num, total = v.split("/") if "/" in v else (v, "0")
|
|
audio[tag_key] = [(int(num), int(total))]
|
|
except Exception:
|
|
pass
|
|
else:
|
|
audio[tag_key] = [v]
|
|
audio.save()
|
|
return True
|
|
|
|
else:
|
|
audio = MutagenFile(str(path), easy=True)
|
|
if audio is not None:
|
|
if audio.tags is None:
|
|
audio.add_tags()
|
|
for k, v in tags_to_write.items():
|
|
try:
|
|
audio[k] = [v]
|
|
except Exception:
|
|
pass
|
|
audio.save()
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f" ⚠️ Tag-Schreibfehler {path.name}: {e}", file=sys.stderr)
|
|
return False
|
|
|
|
|
|
def execute_album(
|
|
proposal: AlbumProposal,
|
|
backup_dir: Optional[Path],
|
|
do_rename: bool,
|
|
embed_cover_art: bool,
|
|
dry_run: bool,
|
|
report_data: List[Dict[str, Any]],
|
|
) -> Dict[str, int]:
|
|
stats = {"tags_written": 0, "covers_embedded": 0, "files_renamed": 0, "errors": 0}
|
|
|
|
for tp in proposal.tracks:
|
|
old_title = tp.path.stem
|
|
old_artist = ""
|
|
if HAS_MUTAGEN:
|
|
try:
|
|
audio = MutagenFile(str(tp.path), easy=True)
|
|
if audio and audio.tags:
|
|
old_artist = str(audio.tags.get("artist", [""])[0])
|
|
old_title = str(audio.tags.get("title", [tp.path.stem])[0])
|
|
except Exception:
|
|
pass
|
|
|
|
new_path = tp.path
|
|
renamed_to = ""
|
|
cover_embedded = False
|
|
|
|
if not dry_run:
|
|
if backup_dir:
|
|
backup_file(tp.path, backup_dir)
|
|
|
|
if write_tags(tp.path, tp, proposal):
|
|
stats["tags_written"] += 1
|
|
else:
|
|
stats["errors"] += 1
|
|
|
|
if embed_cover_art and proposal.cover_path:
|
|
if embed_cover(tp.path, proposal.cover_path):
|
|
stats["covers_embedded"] += 1
|
|
cover_embedded = True
|
|
|
|
if do_rename:
|
|
new_name = _proposed_filename(tp, tp.path.suffix)
|
|
candidate = tp.path.parent / new_name
|
|
if candidate != tp.path:
|
|
try:
|
|
tp.path.rename(candidate)
|
|
new_path = candidate
|
|
renamed_to = new_name
|
|
stats["files_renamed"] += 1
|
|
except Exception as e:
|
|
print(f" ⚠️ Umbenennungsfehler {tp.path.name}: {e}", file=sys.stderr)
|
|
stats["errors"] += 1
|
|
|
|
report_data.append({
|
|
"status": "dry-run" if dry_run else "ok",
|
|
"album_dir": str(proposal.album_dir.name),
|
|
"track_path": str(new_path),
|
|
"old_title": old_title,
|
|
"new_title": tp.title,
|
|
"old_artist": old_artist,
|
|
"new_artist": tp.artist,
|
|
"album": proposal.album,
|
|
"albumartist": proposal.albumartist,
|
|
"date": proposal.date or "",
|
|
"genre": proposal.genre or "",
|
|
"label": proposal.label or "",
|
|
"track_number": tp.track_number or "",
|
|
"disc_number": tp.disc_number or "",
|
|
"cover_embedded": cover_embedded,
|
|
"renamed_to": renamed_to,
|
|
"confidence": f"{proposal.confidence:.2f}",
|
|
"sources": ", ".join(proposal.sources),
|
|
})
|
|
|
|
return stats
|
|
|
|
|
|
def write_report(report_data: List[Dict[str, Any]], report_path: Path) -> None:
|
|
try:
|
|
report_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with report_path.open("w", encoding="utf-8", newline="") as f:
|
|
w = csv.DictWriter(f, fieldnames=REPORT_FIELDS)
|
|
w.writeheader()
|
|
w.writerows(report_data)
|
|
print(f"📊 Report gespeichert: {report_path}")
|
|
except Exception as e:
|
|
print(f"⚠️ Report-Fehler: {e}", file=sys.stderr)
|