from __future__ import annotations import csv import re import shutil import sys from pathlib import Path from typing import Optional, List, Dict, Any from models import AlbumProposal, TrackProposal try: from mutagen import File as MutagenFile from mutagen.easyid3 import EasyID3 from mutagen.flac import FLAC from mutagen.mp4 import MP4, MP4Tags HAS_MUTAGEN = True except ImportError: HAS_MUTAGEN = False from cover_handler import embed_cover _SAFE_RE = re.compile(r'[<>:"/\\|?*\x00-\x1f]') REPORT_FIELDS = [ "status", "album_dir", "track_path", "old_title", "new_title", "old_artist", "new_artist", "album", "albumartist", "date", "genre", "label", "track_number", "disc_number", "cover_embedded", "renamed_to", "confidence", "sources", ] def _safe_name(s: str) -> str: return _SAFE_RE.sub("_", s).strip(". ") def _proposed_filename(proposal: TrackProposal, ext: str) -> str: tn = f"{proposal.track_number:02d}" if proposal.track_number else "00" prefix = f"{proposal.disc_number}-{tn}" if proposal.disc_number and proposal.disc_number > 1 else tn artist = _safe_name(proposal.artist or "Unknown") title = _safe_name(proposal.title or "Unknown") return f"{prefix} - {artist} - {title}{ext}" def backup_file(path: Path, backup_dir: Path) -> bool: try: backup_dir.mkdir(parents=True, exist_ok=True) rel = path.parent.name + "__" + path.name dest = backup_dir / rel if not dest.exists(): shutil.copy2(path, dest) return True except Exception as e: print(f" ⚠️ Backup-Fehler {path.name}: {e}", file=sys.stderr) return False def write_tags(path: Path, proposal: TrackProposal, album_proposal: AlbumProposal) -> bool: if not HAS_MUTAGEN: return False ext = path.suffix.lower() tags_to_write = { "title": proposal.title or "", "artist": proposal.artist or "", "album": album_proposal.album or "", "albumartist": album_proposal.albumartist or "", } if proposal.track_number: total = len(album_proposal.tracks) tags_to_write["tracknumber"] = f"{proposal.track_number}/{total}" if proposal.disc_number: tags_to_write["discnumber"] = str(proposal.disc_number) if album_proposal.date: tags_to_write["date"] = album_proposal.date if album_proposal.genre: tags_to_write["genre"] = album_proposal.genre if album_proposal.label: tags_to_write["organization"] = album_proposal.label try: if ext == ".mp3": try: audio = EasyID3(str(path)) except Exception: audio = EasyID3() audio.save(str(path)) audio = EasyID3(str(path)) for k, v in tags_to_write.items(): audio[k] = [v] audio.save(v2_version=4) return True elif ext == ".flac": audio = FLAC(str(path)) for k, v in tags_to_write.items(): audio[k] = [v] audio.save() return True elif ext == ".m4a": audio = MP4(str(path)) mapping = { "title": "\xa9nam", "artist": "\xa9ART", "album": "\xa9alb", "albumartist": "aART", "tracknumber": "trkn", "date": "\xa9day", "genre": "\xa9gen", } for k, v in tags_to_write.items(): tag_key = mapping.get(k) if tag_key: if tag_key == "trkn": try: num, total = v.split("/") if "/" in v else (v, "0") audio[tag_key] = [(int(num), int(total))] except Exception: pass else: audio[tag_key] = [v] audio.save() return True else: audio = MutagenFile(str(path), easy=True) if audio is not None: if audio.tags is None: audio.add_tags() for k, v in tags_to_write.items(): try: audio[k] = [v] except Exception: pass audio.save() return True except Exception as e: print(f" ⚠️ Tag-Schreibfehler {path.name}: {e}", file=sys.stderr) return False def execute_album( proposal: AlbumProposal, backup_dir: Optional[Path], do_rename: bool, embed_cover_art: bool, dry_run: bool, report_data: List[Dict[str, Any]], ) -> Dict[str, int]: stats = {"tags_written": 0, "covers_embedded": 0, "files_renamed": 0, "errors": 0} for tp in proposal.tracks: old_title = tp.path.stem old_artist = "" if HAS_MUTAGEN: try: audio = MutagenFile(str(tp.path), easy=True) if audio and audio.tags: old_artist = str(audio.tags.get("artist", [""])[0]) old_title = str(audio.tags.get("title", [tp.path.stem])[0]) except Exception: pass new_path = tp.path renamed_to = "" cover_embedded = False if not dry_run: if backup_dir: backup_file(tp.path, backup_dir) if write_tags(tp.path, tp, proposal): stats["tags_written"] += 1 else: stats["errors"] += 1 if embed_cover_art and proposal.cover_path: if embed_cover(tp.path, proposal.cover_path): stats["covers_embedded"] += 1 cover_embedded = True if do_rename: new_name = _proposed_filename(tp, tp.path.suffix) candidate = tp.path.parent / new_name if candidate != tp.path: try: tp.path.rename(candidate) new_path = candidate renamed_to = new_name stats["files_renamed"] += 1 except Exception as e: print(f" ⚠️ Umbenennungsfehler {tp.path.name}: {e}", file=sys.stderr) stats["errors"] += 1 report_data.append({ "status": "dry-run" if dry_run else "ok", "album_dir": str(proposal.album_dir.name), "track_path": str(new_path), "old_title": old_title, "new_title": tp.title, "old_artist": old_artist, "new_artist": tp.artist, "album": proposal.album, "albumartist": proposal.albumartist, "date": proposal.date or "", "genre": proposal.genre or "", "label": proposal.label or "", "track_number": tp.track_number or "", "disc_number": tp.disc_number or "", "cover_embedded": cover_embedded, "renamed_to": renamed_to, "confidence": f"{proposal.confidence:.2f}", "sources": ", ".join(proposal.sources), }) return stats def write_report(report_data: List[Dict[str, Any]], report_path: Path) -> None: try: report_path.parent.mkdir(parents=True, exist_ok=True) with report_path.open("w", encoding="utf-8", newline="") as f: w = csv.DictWriter(f, fieldnames=REPORT_FIELDS) w.writeheader() w.writerows(report_data) print(f"📊 Report gespeichert: {report_path}") except Exception as e: print(f"⚠️ Report-Fehler: {e}", file=sys.stderr)