diff --git a/cover_handler.py b/cover_handler.py
deleted file mode 100644
index 221c772..0000000
--- a/cover_handler.py
+++ /dev/null
@@ -1,171 +0,0 @@
-from __future__ import annotations
-
-import sys
-import tempfile
-import time
-from pathlib import Path
-from typing import Optional, List
-
-try:
- from PIL import Image
- HAS_PIL = True
-except ImportError:
- HAS_PIL = False
-
-try:
- import requests
- HAS_REQUESTS = True
-except ImportError:
- HAS_REQUESTS = False
-
-try:
- import musicbrainzngs as mb
- HAS_MB = True
-except ImportError:
- HAS_MB = False
-
-try:
- from mutagen.id3 import ID3, APIC, error as ID3Error
- from mutagen.flac import FLAC, Picture
- from mutagen.mp4 import MP4, MP4Cover
- HAS_MUTAGEN = True
-except ImportError:
- HAS_MUTAGEN = False
-
-_MIN_COVER_SIZE = 200 # pixels
-
-
-def _image_ok(path: Path) -> bool:
- if not HAS_PIL:
- return path.stat().st_size > 5000
- try:
- with Image.open(path) as img:
- w, h = img.size
- return w >= _MIN_COVER_SIZE and h >= _MIN_COVER_SIZE
- except Exception:
- return False
-
-
-def find_local_cover(image_files: List[Path]) -> Optional[Path]:
- priority = ("front", "folder", "cover", "album")
- # Sort by priority keyword, then size descending
- def key(p: Path):
- name = p.name.lower()
- score = next((i for i, kw in enumerate(priority) if kw in name), len(priority))
- size = p.stat().st_size if p.exists() else 0
- return (score, -size)
-
- for p in sorted(image_files, key=key):
- if _image_ok(p):
- return p
- return None
-
-
-def _mb_cover_url(release_mbid: str) -> Optional[str]:
- url = f"https://coverartarchive.org/release/{release_mbid}/front"
- if not HAS_REQUESTS:
- return None
- try:
- r = requests.head(url, timeout=5, allow_redirects=True)
- if r.status_code == 200:
- return url
- except Exception:
- pass
- return None
-
-
-def download_cover(release_mbid: Optional[str], dest_dir: Path) -> Optional[Path]:
- if not release_mbid or not HAS_REQUESTS:
- return None
- url = _mb_cover_url(release_mbid)
- if not url:
- return None
- try:
- r = requests.get(url, timeout=15)
- if r.status_code == 200:
- ext = ".jpg"
- ct = r.headers.get("content-type", "")
- if "png" in ct:
- ext = ".png"
- dest = dest_dir / f"_cover_download{ext}"
- dest.write_bytes(r.content)
- if _image_ok(dest):
- return dest
- dest.unlink(missing_ok=True)
- except Exception as e:
- print(f" ⚠️ Cover-Download-Fehler: {e}", file=sys.stderr)
- return None
-
-
-def embed_cover(audio_path: Path, cover_path: Path) -> bool:
- if not HAS_MUTAGEN:
- return False
- try:
- img_data = cover_path.read_bytes()
- mime = "image/jpeg" if cover_path.suffix.lower() in (".jpg", ".jpeg") else "image/png"
- ext = audio_path.suffix.lower()
-
- if ext == ".mp3":
- try:
- tags = ID3(str(audio_path))
- except ID3Error:
- tags = ID3()
- tags.delall("APIC")
- tags.add(APIC(encoding=3, mime=mime, type=3, desc="Cover", data=img_data))
- tags.save(str(audio_path), v2_version=4)
- return True
-
- elif ext == ".flac":
- audio = FLAC(str(audio_path))
- audio.clear_pictures()
- pic = Picture()
- pic.type = 3
- pic.mime = mime
- pic.desc = "Cover"
- pic.data = img_data
- audio.add_picture(pic)
- audio.save()
- return True
-
- elif ext == ".m4a":
- audio = MP4(str(audio_path))
- fmt = MP4Cover.FORMAT_JPEG if mime == "image/jpeg" else MP4Cover.FORMAT_PNG
- audio.tags["covr"] = [MP4Cover(img_data, imageformat=fmt)]
- audio.save()
- return True
-
- else:
- # Generic mutagen fallback
- from mutagen import File as MutagenFile
- audio = MutagenFile(str(audio_path), easy=False)
- if audio is not None:
- if audio.tags is None:
- audio.add_tags()
- if hasattr(audio.tags, "add"):
- audio.tags.add(
- APIC(encoding=3, mime=mime, type=3, desc="Cover", data=img_data)
- )
- audio.save()
- return True
-
- except Exception as e:
- print(f" ⚠️ Cover-Einbettungsfehler {audio_path.name}: {e}", file=sys.stderr)
- return False
-
-
-def resolve_cover(
- image_files: List[Path],
- release_mbid: Optional[str],
- album_dir: Path,
-) -> tuple[Optional[Path], Optional[str]]:
- """Returns (cover_path, source_label)."""
- local = find_local_cover(image_files)
- if local:
- return local, "local"
-
- if release_mbid:
- downloaded = download_cover(release_mbid, album_dir)
- if downloaded:
- return downloaded, "musicbrainz"
-
- return None, None
diff --git a/executor.py b/executor.py
deleted file mode 100644
index 42c19cb..0000000
--- a/executor.py
+++ /dev/null
@@ -1,368 +0,0 @@
-from __future__ import annotations
-
-import csv
-import re
-import shutil
-import subprocess
-import sys
-from pathlib import Path
-from typing import Optional, List, Dict, Any
-
-from models import AlbumProposal, TrackProposal
-
-try:
- from mutagen import File as MutagenFile
- from mutagen.easyid3 import EasyID3
- from mutagen.flac import FLAC
- from mutagen.mp4 import MP4, MP4Tags
- HAS_MUTAGEN = True
-except ImportError:
- HAS_MUTAGEN = False
-
-from cover_handler import embed_cover
-
-_SAFE_RE = re.compile(r'[<>:"/\\|?*\x00-\x1f]')
-_CLASSICAL_GENRES = re.compile(
- r"(?i)class|baroque|romantic|renaissance|opera|symphony|chamber|concerto|sonata|oratorio"
-)
-REPORT_FIELDS = [
- "status", "album_dir", "track_path",
- "old_title", "new_title",
- "old_artist", "new_artist",
- "album", "albumartist", "date", "genre", "label",
- "track_number", "disc_number",
- "cover_embedded", "renamed_to",
- "confidence", "sources",
-]
-
-
-def _safe_name(s: str) -> str:
- """Filesystem-safe name: illegal chars → '_', spaces → '_'."""
- s = _SAFE_RE.sub("_", s)
- return re.sub(r"\s+", "_", s).strip("._-")
-
-
-def _is_classical(albumartist: str, track_artist: str, genre: str) -> bool:
- """
- Classical schema applies when performer (albumartist) ≠ composer (track_artist),
- which covers both 'real' classical music and jazz-on-classical-themes albums.
- Genre keyword matching is used as additional signal but not required.
- """
- aa = (albumartist or "").casefold().strip()
- ta = (track_artist or "").casefold().strip()
- if not aa or aa in ("various artists", "unknown artist", "unknown"):
- return False
- if aa == ta:
- return False
- return True # performer ≠ composer → classical naming
-
-
-def _proposed_filename(
- proposal: TrackProposal,
- ext: str,
- albumartist: str = "",
- genre: str = "",
-) -> str:
- """
- Pop/Default: TT_-_Artist_-_Titel.ext
- Klassik: TT_-_Performer_-_Komponist_-_Titel[-_Orchester_Dirigent].ext
-
- Separator zwischen Teilen: _-_
- Leerzeichen innerhalb von Namen: _
- Fehlende Teile werden weggelassen.
- """
- tn = f"{proposal.track_number:02d}" if proposal.track_number else "00"
- # Wenn disc_number gesetzt (auch disc=1): immer "D-TT" — konsistent über alle CDs.
- # disc=None (Einzel-CD ohne Tag): nur "TT".
- disc_prefix = f"{proposal.disc_number}-" if proposal.disc_number else ""
- prefix = f"{disc_prefix}{tn}"
-
- track_artist = _safe_name(proposal.artist or "Unknown")
- aa = _safe_name(albumartist)
- title = _safe_name(proposal.title or "Unknown")
-
- if _is_classical(aa, track_artist, genre):
- # Klassik-Schema: Performer _-_ Komponist _-_ Werk [_-_ Orchester,Dirigent]
- parts = [prefix, aa, track_artist, title]
- # Orchester und Dirigent anhängen wenn vorhanden
- extra = "_".join(filter(None, [
- _safe_name(proposal.orchestra or ""),
- _safe_name(proposal.conductor or ""),
- ]))
- if extra:
- parts.append(extra)
- return "_-_".join(parts) + ext
- else:
- # Pop/Default-Schema: Tracknummer _-_ Artist _-_ Titel
- return f"{prefix}_-_{track_artist}_-_{title}{ext}"
-
-
-def backup_file(path: Path, backup_dir: Path) -> bool:
- try:
- backup_dir.mkdir(parents=True, exist_ok=True)
- rel = path.parent.name + "__" + path.name
- dest = backup_dir / rel
- if not dest.exists():
- shutil.copy2(path, dest)
- return True
- except Exception as e:
- print(f" ⚠️ Backup-Fehler {path.name}: {e}", file=sys.stderr)
- return False
-
-
-def write_tags(path: Path, proposal: TrackProposal, album_proposal: AlbumProposal) -> bool:
- if not HAS_MUTAGEN:
- return False
- ext = path.suffix.lower()
- tags_to_write = {
- "title": proposal.title or "",
- "artist": proposal.artist or "",
- "album": album_proposal.album or "",
- "albumartist": album_proposal.albumartist or "",
- }
- if proposal.track_number:
- total = len(album_proposal.tracks)
- tags_to_write["tracknumber"] = f"{proposal.track_number}/{total}"
- if proposal.disc_number:
- tags_to_write["discnumber"] = str(proposal.disc_number)
- if album_proposal.date:
- # Strip everything except valid ID3 timestamp characters to prevent ID3TimeStamp errors
- date_clean = re.sub(r"[^\d\-T:+Z]", "", str(album_proposal.date)).strip()
- if date_clean:
- tags_to_write["date"] = date_clean
- if album_proposal.genre:
- tags_to_write["genre"] = album_proposal.genre
- if album_proposal.label:
- tags_to_write["organization"] = album_proposal.label
-
- try:
- if ext == ".mp3":
- try:
- audio = EasyID3(str(path))
- except Exception:
- # File has no ID3 header — add one without wiping audio data
- from mutagen.id3 import ID3NoHeaderError
- try:
- from mutagen.mp3 import MP3
- full = MP3(str(path))
- full.tags = None
- full.add_tags()
- full.save(str(path), v2_version=4)
- except Exception:
- pass
- audio = EasyID3(str(path))
- for k, v in tags_to_write.items():
- try:
- audio[k] = [v]
- except Exception as tag_err:
- print(f" ⚠️ Tag-Feld '{k}' übersprungen ({path.name}): {tag_err}", file=sys.stderr)
- audio.save(v2_version=4)
- return True
-
- elif ext == ".flac":
- audio = FLAC(str(path))
- for k, v in tags_to_write.items():
- audio[k] = [v]
- audio.save()
- return True
-
- elif ext == ".m4a":
- audio = MP4(str(path))
- mapping = {
- "title": "\xa9nam", "artist": "\xa9ART",
- "album": "\xa9alb", "albumartist": "aART",
- "tracknumber": "trkn", "date": "\xa9day",
- "genre": "\xa9gen",
- }
- for k, v in tags_to_write.items():
- tag_key = mapping.get(k)
- if tag_key:
- if tag_key == "trkn":
- try:
- num, total = v.split("/") if "/" in v else (v, "0")
- audio[tag_key] = [(int(num), int(total))]
- except Exception:
- pass
- else:
- audio[tag_key] = [v]
- audio.save()
- return True
-
- else:
- audio = MutagenFile(str(path), easy=True)
- if audio is not None:
- if audio.tags is None:
- audio.add_tags()
- for k, v in tags_to_write.items():
- try:
- audio[k] = [v]
- except Exception:
- pass
- audio.save()
- return True
-
- except Exception as e:
- print(f" ⚠️ Tag-Schreibfehler {path.name}: {e}", file=sys.stderr)
- return False
-
-
-def _update_m3u(m3u_path: Path, tracks: List[tuple]) -> bool:
- """
- Schreibt M3U neu mit den umbenannten Dateien in Track-Reihenfolge.
- tracks: [(TrackProposal, actual_path_after_rename), ...]
- """
- try:
- lines = ["#EXTM3U"]
- for tp, track_path in tracks:
- duration = -1
- if HAS_MUTAGEN:
- try:
- audio = MutagenFile(str(track_path))
- if audio and hasattr(audio, "info") and audio.info:
- duration = int(audio.info.length)
- except Exception:
- pass
- label = f"{tp.artist} - {tp.title}" if tp.artist else (tp.title or track_path.stem)
- lines.append(f"#EXTINF:{duration},{label}")
- lines.append(track_path.name)
- m3u_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
- print(f" 📋 Playlist aktualisiert: {m3u_path.name}")
- return True
- except Exception as e:
- print(f" ⚠️ M3U-Fehler {m3u_path.name}: {e}", file=sys.stderr)
- return False
-
-
-def execute_album(
- proposal: AlbumProposal,
- backup_dir: Optional[Path],
- do_rename: bool,
- embed_cover_art: bool,
- dry_run: bool,
- report_data: List[Dict[str, Any]],
-) -> Dict[str, int]:
- stats = {"tags_written": 0, "covers_embedded": 0, "files_renamed": 0, "errors": 0}
- final_tracks: List[tuple] = [] # (TrackProposal, final_path) für M3U
-
- for tp in proposal.tracks:
- old_title = tp.path.stem
- old_artist = ""
- if HAS_MUTAGEN:
- try:
- audio = MutagenFile(str(tp.path), easy=True)
- if audio and audio.tags:
- old_artist = str(audio.tags.get("artist", [""])[0])
- old_title = str(audio.tags.get("title", [tp.path.stem])[0])
- except Exception:
- pass
-
- new_path = tp.path
- renamed_to = ""
- cover_embedded = False
-
- if not dry_run:
- if backup_dir:
- backup_file(tp.path, backup_dir)
-
- if write_tags(tp.path, tp, proposal):
- stats["tags_written"] += 1
- else:
- stats["errors"] += 1
-
- if embed_cover_art and proposal.cover_path:
- if embed_cover(tp.path, proposal.cover_path):
- stats["covers_embedded"] += 1
- cover_embedded = True
-
- if do_rename:
- new_name = _proposed_filename(
- tp, tp.path.suffix,
- albumartist=proposal.albumartist or "",
- genre=proposal.genre or "",
- )
- candidate = tp.path.parent / new_name
- if candidate != tp.path:
- try:
- tp.path.rename(candidate)
- new_path = candidate
- renamed_to = new_name
- stats["files_renamed"] += 1
- except Exception as e:
- print(f" ⚠️ Umbenennungsfehler {tp.path.name}: {e}", file=sys.stderr)
- stats["errors"] += 1
-
- if not dry_run:
- final_tracks.append((tp, new_path))
-
- report_data.append({
- "status": "dry-run" if dry_run else "ok",
- "album_dir": str(proposal.album_dir.name),
- "track_path": str(new_path),
- "old_title": old_title,
- "new_title": tp.title,
- "old_artist": old_artist,
- "new_artist": tp.artist,
- "album": proposal.album,
- "albumartist": proposal.albumartist,
- "date": proposal.date or "",
- "genre": proposal.genre or "",
- "label": proposal.label or "",
- "track_number": tp.track_number or "",
- "disc_number": tp.disc_number or "",
- "cover_embedded": cover_embedded,
- "renamed_to": renamed_to,
- "confidence": f"{proposal.confidence:.2f}",
- "sources": ", ".join(proposal.sources),
- })
-
- # M3U-Playlist aktualisieren wenn Dateien umbenannt wurden
- if do_rename and not dry_run and stats["files_renamed"] > 0 and final_tracks:
- m3u_files = (
- list(proposal.album_dir.glob("*.m3u")) +
- list(proposal.album_dir.glob("*.m3u8"))
- )
- if m3u_files:
- _update_m3u(m3u_files[0], final_tracks)
-
- # Nach allen Umbenennungen: Verzeichnis Linux-kompatibel bereinigen
- if do_rename and not dry_run:
- sanitize_dir_names(proposal.album_dir)
-
- return stats
-
-
-def sanitize_dir_names(directory: Path) -> None:
- """
- Macht alle Dateinamen im Verzeichnis Linux-kompatibel.
- Bevorzugt 'NameToUnix
', fällt auf 'detox ' zurück.
- """
- name_to_unix = shutil.which("NameToUnix")
- if name_to_unix:
- try:
- subprocess.run([name_to_unix, str(directory)], check=True, capture_output=True)
- return
- except subprocess.CalledProcessError as e:
- print(f" ⚠️ NameToUnix-Fehler: {e.stderr.decode(errors='replace').strip()}", file=sys.stderr)
-
- detox = shutil.which("detox")
- if detox:
- for f in sorted(directory.rglob("*")):
- if f.is_file():
- try:
- subprocess.run([detox, str(f)], check=True, capture_output=True)
- except subprocess.CalledProcessError as e:
- print(f" ⚠️ detox-Fehler {f.name}: {e.stderr.decode(errors='replace').strip()}", file=sys.stderr)
- else:
- print(" ℹ️ Weder NameToUnix noch detox gefunden — Dateinamen nicht nachbereinigt.", file=sys.stderr)
-
-
-def write_report(report_data: List[Dict[str, Any]], report_path: Path) -> None:
- try:
- report_path.parent.mkdir(parents=True, exist_ok=True)
- with report_path.open("w", encoding="utf-8", newline="") as f:
- w = csv.DictWriter(f, fieldnames=REPORT_FIELDS)
- w.writeheader()
- w.writerows(report_data)
- print(f"📊 Report gespeichert: {report_path}")
- except Exception as e:
- print(f"⚠️ Report-Fehler: {e}", file=sys.stderr)
diff --git a/hint_extractor.py b/hint_extractor.py
deleted file mode 100755
index 433c71d..0000000
--- a/hint_extractor.py
+++ /dev/null
@@ -1,583 +0,0 @@
-from __future__ import annotations
-
-import base64
-import json
-import os
-import re
-import shutil
-import subprocess
-import sys
-import urllib.request
-from pathlib import Path
-from typing import Optional, List, Dict, Tuple
-
-from models import AlbumScan, AlbumHints, TrackHints
-
-try:
- from mutagen import File as MutagenFile
- HAS_MUTAGEN = True
-except ImportError:
- HAS_MUTAGEN = False
-
-try:
- from bs4 import BeautifulSoup
- HAS_BS4 = True
-except ImportError:
- HAS_BS4 = False
-
-_NATSORT_RE = re.compile(r"(\d+)")
-_BAD_VALUES = {"unknown", "unknown artist", "unknown album", "untitled", "track", "va", "various"}
-
-# Filename patterns: most specific first
-_FILENAME_PATTERNS = [
- re.compile(r"^(?P\d{1,2})[- _]+(?P