from __future__ import annotations import csv import re import shutil import subprocess import sys from pathlib import Path from typing import Optional, List, Dict, Any from models import AlbumProposal, TrackProposal try: from mutagen import File as MutagenFile from mutagen.easyid3 import EasyID3 from mutagen.flac import FLAC from mutagen.mp4 import MP4, MP4Tags HAS_MUTAGEN = True except ImportError: HAS_MUTAGEN = False from cover_handler import embed_cover _SAFE_RE = re.compile(r'[<>:"/\\|?*\x00-\x1f]') _CLASSICAL_GENRES = re.compile( r"(?i)class|baroque|romantic|renaissance|opera|symphony|chamber|concerto|sonata|oratorio" ) REPORT_FIELDS = [ "status", "album_dir", "track_path", "old_title", "new_title", "old_artist", "new_artist", "album", "albumartist", "date", "genre", "label", "track_number", "disc_number", "cover_embedded", "renamed_to", "confidence", "sources", ] def _safe_name(s: str) -> str: """Filesystem-safe name: illegal chars → '_', spaces → '_'.""" s = _SAFE_RE.sub("_", s) return re.sub(r"\s+", "_", s).strip("._-") _CLASSICAL_GENRE_KEYWORDS = { "classical", "klassik", "baroque", "barock", "romantic", "romantik", "opera", "oper", "operetta", "operette", "chamber", "kammermusik", "symphon", "concerto", "oratorio", "sacred", "kirchenmusik", "renaissance", "medieval", "contemporary classical", } _CLASSICAL_COMPOSER_KEYWORDS = { # Bekannte Komponisten als Signal (Nachname reicht) "bach", "beethoven", "mozart", "handel", "haydn", "schubert", "brahms", "chopin", "liszt", "schumann", "wagner", "verdi", "puccini", "vivaldi", "telemann", "buxtehude", "monteverdi", "palestrina", "purcell", "mahler", "bruckner", "dvorak", "tchaikovsky", "tschaikowski", "debussy", "ravel", "satie", "strauss", "sibelius", "grieg", } def _is_classical(albumartist: str, track_artist: str, genre: str) -> bool: """ Klassik-Schema (Performer_-_Komponist_-_Werk) wird angewendet wenn: 1. Genre explizit klassisch ist, ODER 2. track_artist ist ein bekannter Komponist (und ≠ albumartist), ODER 3. albumartist ≠ track_artist UND beide sind bekannte Komponistennamen. Reine Performer≠Komponist-Heuristik ohne Genre-Bestätigung ist abgeschaltet (zu viele Falschpositive bei Samplern, Jazz, Volksmusik). """ aa = (albumartist or "").casefold().strip() ta = (track_artist or "").casefold().strip() g = (genre or "").casefold().strip() if not aa or aa in ("various artists", "unknown artist", "unknown"): return False if not ta or ta in ("unknown artist", "unknown"): return False if aa == ta: return False # Primäres Signal: Genre-Keyword if any(kw in g for kw in _CLASSICAL_GENRE_KEYWORDS): return True # Sekundäres Signal: track_artist enthält bekannten Komponistennamen if any(kw in ta for kw in _CLASSICAL_COMPOSER_KEYWORDS): return True return False def _proposed_filename( proposal: TrackProposal, ext: str, albumartist: str = "", genre: str = "", ) -> str: """ Pop/Default: TT_-_Artist_-_Titel.ext Klassik: TT_-_Performer_-_Komponist_-_Titel[-_Orchester_Dirigent].ext Separator zwischen Teilen: _-_ Leerzeichen innerhalb von Namen: _ Fehlende Teile werden weggelassen. """ tn = f"{proposal.track_number:02d}" if proposal.track_number else "00" # disc_prefix nur bei echtem Multi-CD (disc > 1). disc=1 oder None → Einzel-CD → kein Präfix. disc_prefix = f"{proposal.disc_number}-" if (proposal.disc_number and proposal.disc_number > 1) else "" prefix = f"{disc_prefix}{tn}" track_artist = _safe_name(proposal.artist or albumartist or "Unknown") aa = _safe_name(albumartist) title = _safe_name(proposal.title or "Unknown") if _is_classical(aa, track_artist, genre): # Klassik-Schema: Performer _-_ Komponist _-_ Werk [_-_ Orchester,Dirigent] parts = [prefix, aa, track_artist, title] # Orchester und Dirigent anhängen wenn vorhanden extra = "_".join(filter(None, [ _safe_name(proposal.orchestra or ""), _safe_name(proposal.conductor or ""), ])) if extra: parts.append(extra) return "_-_".join(parts) + ext else: # Pop/Default-Schema: Tracknummer _-_ Artist _-_ Titel return f"{prefix}_-_{track_artist}_-_{title}{ext}" def backup_file(path: Path, backup_dir: Path) -> bool: try: backup_dir.mkdir(parents=True, exist_ok=True) rel = path.parent.name + "__" + path.name dest = backup_dir / rel if not dest.exists(): shutil.copy2(path, dest) return True except Exception as e: print(f" ⚠️ Backup-Fehler {path.name}: {e}", file=sys.stderr) return False def write_tags(path: Path, proposal: TrackProposal, album_proposal: AlbumProposal) -> bool: if not HAS_MUTAGEN: return False ext = path.suffix.lower() tags_to_write = { "title": proposal.title or "", "artist": proposal.artist or "", "album": album_proposal.album or "", "albumartist": album_proposal.albumartist or "", } if proposal.track_number: total = len(album_proposal.tracks) tags_to_write["tracknumber"] = f"{proposal.track_number}/{total}" if proposal.disc_number and proposal.disc_number > 1: tags_to_write["discnumber"] = str(proposal.disc_number) if album_proposal.date: # Strip everything except valid ID3 timestamp characters to prevent ID3TimeStamp errors date_clean = re.sub(r"[^\d\-T:+Z]", "", str(album_proposal.date)).strip() if date_clean: tags_to_write["date"] = date_clean if album_proposal.genre: tags_to_write["genre"] = album_proposal.genre if album_proposal.label: tags_to_write["organization"] = album_proposal.label try: if ext == ".mp3": try: audio = EasyID3(str(path)) except Exception: # File has no ID3 header — add one without wiping audio data from mutagen.id3 import ID3NoHeaderError try: from mutagen.mp3 import MP3 full = MP3(str(path)) full.tags = None full.add_tags() full.save(str(path), v2_version=4) except Exception: pass audio = EasyID3(str(path)) for k, v in tags_to_write.items(): try: audio[k] = [v] except Exception as tag_err: print(f" ⚠️ Tag-Feld '{k}' übersprungen ({path.name}): {tag_err}", file=sys.stderr) audio.save(v2_version=4) return True elif ext == ".flac": audio = FLAC(str(path)) for k, v in tags_to_write.items(): audio[k] = [v] audio.save() return True elif ext == ".m4a": audio = MP4(str(path)) mapping = { "title": "\xa9nam", "artist": "\xa9ART", "album": "\xa9alb", "albumartist": "aART", "tracknumber": "trkn", "date": "\xa9day", "genre": "\xa9gen", } for k, v in tags_to_write.items(): tag_key = mapping.get(k) if tag_key: if tag_key == "trkn": try: num, total = v.split("/") if "/" in v else (v, "0") audio[tag_key] = [(int(num), int(total))] except Exception: pass else: audio[tag_key] = [v] audio.save() return True else: audio = MutagenFile(str(path), easy=True) if audio is not None: if audio.tags is None: audio.add_tags() for k, v in tags_to_write.items(): try: audio[k] = [v] except Exception: pass audio.save() return True except Exception as e: print(f" ⚠️ Tag-Schreibfehler {path.name}: {e}", file=sys.stderr) return False def _update_m3u(m3u_path: Path, tracks: List[tuple]) -> bool: """ Schreibt M3U neu mit den umbenannten Dateien in Track-Reihenfolge. tracks: [(TrackProposal, actual_path_after_rename), ...] """ try: lines = ["#EXTM3U"] for tp, track_path in tracks: duration = -1 if HAS_MUTAGEN: try: audio = MutagenFile(str(track_path)) if audio and hasattr(audio, "info") and audio.info: duration = int(audio.info.length) except Exception: pass label = f"{tp.artist} - {tp.title}" if tp.artist else (tp.title or track_path.stem) lines.append(f"#EXTINF:{duration},{label}") lines.append(track_path.name) m3u_path.write_text("\n".join(lines) + "\n", encoding="utf-8") print(f" 📋 Playlist aktualisiert: {m3u_path.name}") return True except Exception as e: print(f" ⚠️ M3U-Fehler {m3u_path.name}: {e}", file=sys.stderr) return False def execute_album( proposal: AlbumProposal, backup_dir: Optional[Path], do_rename: bool, embed_cover_art: bool, dry_run: bool, report_data: List[Dict[str, Any]], ) -> Dict[str, int]: stats = {"tags_written": 0, "covers_embedded": 0, "files_renamed": 0, "errors": 0} final_tracks: List[tuple] = [] # (TrackProposal, final_path) für M3U for tp in proposal.tracks: old_title = tp.path.stem old_artist = "" if HAS_MUTAGEN: try: audio = MutagenFile(str(tp.path), easy=True) if audio and audio.tags: old_artist = str(audio.tags.get("artist", [""])[0]) old_title = str(audio.tags.get("title", [tp.path.stem])[0]) except Exception: pass new_path = tp.path renamed_to = "" cover_embedded = False if not dry_run: if backup_dir: backup_file(tp.path, backup_dir) if write_tags(tp.path, tp, proposal): stats["tags_written"] += 1 else: stats["errors"] += 1 if embed_cover_art and proposal.cover_path: if embed_cover(tp.path, proposal.cover_path): stats["covers_embedded"] += 1 cover_embedded = True if do_rename: new_name = _proposed_filename( tp, tp.path.suffix, albumartist=proposal.albumartist or "", genre=proposal.genre or "", ) candidate = tp.path.parent / new_name if candidate != tp.path: try: tp.path.rename(candidate) new_path = candidate renamed_to = new_name stats["files_renamed"] += 1 except Exception as e: print(f" ⚠️ Umbenennungsfehler {tp.path.name}: {e}", file=sys.stderr) stats["errors"] += 1 if not dry_run: final_tracks.append((tp, new_path)) report_data.append({ "status": "dry-run" if dry_run else "ok", "album_dir": str(proposal.album_dir.name), "track_path": str(new_path), "old_title": old_title, "new_title": tp.title, "old_artist": old_artist, "new_artist": tp.artist, "album": proposal.album, "albumartist": proposal.albumartist, "date": proposal.date or "", "genre": proposal.genre or "", "label": proposal.label or "", "track_number": tp.track_number or "", "disc_number": tp.disc_number or "", "cover_embedded": cover_embedded, "renamed_to": renamed_to, "confidence": f"{proposal.confidence:.2f}", "sources": ", ".join(proposal.sources), }) # M3U-Playlist aktualisieren wenn Dateien umbenannt wurden if do_rename and not dry_run and stats["files_renamed"] > 0 and final_tracks: m3u_files = ( list(proposal.album_dir.glob("*.m3u")) + list(proposal.album_dir.glob("*.m3u8")) ) if m3u_files: _update_m3u(m3u_files[0], final_tracks) # Nach allen Umbenennungen: Verzeichnis Linux-kompatibel bereinigen if do_rename and not dry_run: sanitize_dir_names(proposal.album_dir) return stats def sanitize_dir_names(directory: Path) -> None: """ Macht alle Dateinamen im Verzeichnis Linux-kompatibel. Bevorzugt 'NameToUnix ', fällt auf 'detox ' zurück. """ name_to_unix = shutil.which("NameToUnix") if name_to_unix: try: subprocess.run([name_to_unix, str(directory)], check=True, capture_output=True) return except subprocess.CalledProcessError as e: print(f" ⚠️ NameToUnix-Fehler: {e.stderr.decode(errors='replace').strip()}", file=sys.stderr) detox = shutil.which("detox") if detox: for f in sorted(directory.rglob("*")): if f.is_file(): try: subprocess.run([detox, str(f)], check=True, capture_output=True) except subprocess.CalledProcessError as e: print(f" ⚠️ detox-Fehler {f.name}: {e.stderr.decode(errors='replace').strip()}", file=sys.stderr) else: print(" ℹ️ Weder NameToUnix noch detox gefunden — Dateinamen nicht nachbereinigt.", file=sys.stderr) def write_report(report_data: List[Dict[str, Any]], report_path: Path) -> None: try: report_path.parent.mkdir(parents=True, exist_ok=True) with report_path.open("w", encoding="utf-8", newline="") as f: w = csv.DictWriter(f, fieldnames=REPORT_FIELDS) w.writeheader() w.writerows(report_data) print(f"📊 Report gespeichert: {report_path}") except Exception as e: print(f"⚠️ Report-Fehler: {e}", file=sys.stderr)