diff --git a/BEDIENUNGSANLEITUNG.md b/BEDIENUNGSANLEITUNG.md index cc95448..f4e3fb4 100644 --- a/BEDIENUNGSANLEITUNG.md +++ b/BEDIENUNGSANLEITUNG.md @@ -26,6 +26,7 @@ | `flac` / `lame` / `opusenc` / `ffmpeg` | Encoder je nach Format | je nach Format | | `tesseract` | OCR für Coverbilder | nein (optional) | | Ollama / OpenAI-API | LLM für Tracklisten-Extraktion | nein (optional) | +| Ollama / OpenAI-API | Vision-LLM für Backcover-Analyse (parallel zum Ripping) | nein (optional) | Installation der externen Tools (Debian/Ubuntu): @@ -33,6 +34,12 @@ Installation der externen Tools (Debian/Ubuntu): sudo apt install abcde cdparanoia flac lame opus-tools ffmpeg tesseract-ocr ``` +Firewall (falls aktiv): Port 8765 für Smartphone-Upload freigeben: + +```bash +sudo ufw allow 8765/tcp +``` + --- ## 2. Installation @@ -57,27 +64,41 @@ musiksammlung --help musiksammlung rip │ ▼ -EAN/Barcode eingeben (Enter = überspringen) +Scanner-Server starten (Port 8765) + QR-Code im Terminal + │ + ▼ +EAN/Barcode eingeben ODER CD-Hülle fotografieren + │ + ├─ Foto → KI liest Barcode → Bestätigung/Korrektur │ ├─ MusicBrainz-Treffer → Auto-Rip │ Zeige: Artist – Album (Year), N Discs, M Tracks - │ Für jede Disc: "CD 1/3 einlegen, Enter" → Rip → Rename - │ album.json automatisch aus MusicBrainz-Daten + │ CAA-Cover herunterladen (frontcover.jpg + backcover.jpg) + │ Vision-LLM im Hintergrund starten (analysiert backcover.jpg) ←─┐ + │ Für jede Disc: "CD 1/3 einlegen, Enter" → Rip → Rename │ parallel + │ Vision-LLM-Ergebnis abwarten (max. 120 s, oft schon fertig) ───┘ + │ Priorität: Vision-LLM > MusicBrainz + │ album.json gespeichert │ → direkt weiter mit musiksammlung apply │ └─ Kein Treffer / EAN leer → Fallback - Albumname eingeben → CD-Nummer → Rip → CDDB-Confirm - album.json aus CDDB-Daten (falls bestätigt) + QR-Code im Terminal + Scanner-Server starten (Port 8765) + Albumname eingeben ODER Backcover-Foto hochladen (Auto-Continue) + CD-Nummer → Rip → CDDB-Confirm + Backcover-Foto → Vision-LLM im Hintergrund ←─┐ + abcde sucht MBID → CAA-Cover herunterladen │ parallel + Weitere Discs rippen ───┘ + Vision-LLM-Ergebnis abwarten + Priorität: Vision-LLM > CDDB + album.json gespeichert (ggf. Verzeichnis umbenannt) │ - ├─ album.json vorhanden? - │ ja → direkt weiter mit musiksammlung apply - │ - └─ nein → album.json manuell erzeugen: - A: musiksammlung scan --from-text trackliste.txt - B: musiksammlung scan back.jpg - C: musiksammlung scan back.jpg --vision - D: musiksammlung scan --barcode 0602557360561 - → album.json prüfen/korrigieren + └─ ohne Vision-LLM und ohne CDDB-Treffer: + album.json manuell erzeugen: + A: musiksammlung scan --from-text trackliste.txt + B: musiksammlung scan back.jpg + C: musiksammlung scan back.jpg --vision + D: musiksammlung scan --barcode 0602557360561 + → album.json prüfen/korrigieren │ ▼ musiksammlung apply ← Umbenennen, Tags, Cover, Playlist @@ -112,6 +133,9 @@ Das Programm fragt zuerst nach dem EAN/Barcode. Bei einem MusicBrainz-Treffer st | `-o /pfad` | Ausgabe-Verzeichnis | Standard: `./temp` | | `-d /dev/sr0` | CD-Laufwerk | falls nicht `/dev/cdrom` | | `--no-cddb` | CDDB-Lookup deaktivieren | bei Offline-Betrieb | +| `--vision-model` | Vision-LLM-Modell für Backcover-Analyse | Standard: `qwen3-vl:235b-cloud` | +| `--vision-url` | Ollama/OpenAI-Basis-URL für Vision-LLM | Standard: `http://localhost:11434` | +| `--scanner-port` | Port des Smartphone-Upload-Servers | Standard: `8765` | ### Beispiel: Schnelles FLAC-Ripping mit allen Kernen @@ -129,6 +153,9 @@ EAN/Barcode (Enter = überspringen): 028943753227 MusicBrainz-Suche nach Barcode 028943753227 ... ✓ Herbert von Karajan – Beethoven: 9 Symphonies (1963, 5 Disc(s), 50 Tracks) + Cover-Download: frontcover.jpg, backcover.jpg + [Vision-LLM analysiert backcover.jpg im Hintergrund...] + CD 1/5 einlegen und Enter drücken (9 Tracks) ... Ripping to: temp/Beethoven__9_Symphonies/CD1 @@ -143,21 +170,34 @@ EAN/Barcode (Enter = überspringen): 028943753227 album.json gespeichert: temp/Beethoven__9_Symphonies/album.json +Next steps: + Album 1: Beethoven__9_Symphonies (1963) + 1. Prüfen/bearbeiten: temp/Beethoven__9_Symphonies/album.json + 2. musiksammlung apply temp/Beethoven__9_Symphonies/CD1 \ + temp/Beethoven__9_Symphonies/album.json + Next album? (y/n): n ``` -**Ohne EAN (Fallback):** +**Ohne EAN (Fallback mit Backcover-Foto):** ``` --- Album 1 --- EAN/Barcode (Enter = überspringen): -Album name (Enter = CDDB name / default 'Album1'): Beethoven Sinfonien - Album: Beethoven Sinfonien + Scanner-Server gestartet: http://192.168.1.42:8765 + [QR-Code im Terminal] + Smartphone öffnen → Backcover fotografieren → automatisch weiter + + Album name (Enter = CDDB name / default 'Album1'): [Warten auf Foto oder Texteingabe] + + → Foto empfangen! Vision-LLM analysiert im Hintergrund... + [Programm fährt automatisch fort] + CD Drive: /dev/cdrom CD number [1]: 1 - Ripping to: temp/Beethoven_Sinfonien/CD1 + Ripping to: temp/Album1/CD1 -------------------------------------------------- Track 1/4 Allegro con brio [████████████████░░░░░░░░░░░░░░] 54.3% 18.2 MB @@ -169,14 +209,64 @@ Album name (Enter = CDDB name / default 'Album1'): Beethoven Sinfonien Treffer korrekt? (j/n) [j]: j Umbenennen ... + Vision-LLM: Karajan / Beethoven Sinfonien (1963, 4 Tracks) + Verzeichnis umbenannt: temp/Album1 → temp/Beethoven_Sinfonien + Next CD for this album? (y/n): n + album.json gespeichert: temp/Beethoven_Sinfonien/album.json + +Next steps: + Album 1: Beethoven_Sinfonien + 1. Prüfen/bearbeiten: temp/Beethoven_Sinfonien/album.json + 2. musiksammlung apply temp/Beethoven_Sinfonien/CD1 \ + temp/Beethoven_Sinfonien/album.json + Next album? (y/n): n ``` +### Barcode / EAN per Smartphone scannen + +Das Programm startet zu Beginn jedes Albums automatisch einen lokalen HTTP-Server und zeigt einen QR-Code im Terminal an. Das Smartphone kann damit: + +1. **EAN scannen** — CD-Hülle fotografieren → KI liest den Barcode automatisch aus +2. **Backcover hochladen** — Rückseite fotografieren → Vision-LLM analysiert parallel + +Der Server läuft für das gesamte Album (EAN + Backcover). Beim nächsten Album startet er neu. + +Ablauf EAN-Scan: +``` + Scanner bereit: http://192.168.1.42:8765 + [QR-Code] + EAN/Barcode (Enter = überspringen): [Foto hochladen oder tippen] + Barcode wird per KI erkannt... + Erkannter Barcode: 0028943753227 + Korrekt? (Enter = ja, neuer Wert = tippen): +``` + +### Backcover-Foto per Smartphone hochladen + +Im Fallback-Modus (kein MusicBrainz-Treffer) kann derselbe laufende Scanner-Server auch für das Backcover genutzt werden: + +``` + Scanner-Server gestartet: http://192.168.1.42:8765 + [█████████████] + [█ ██ █ ] ← QR-Code im Terminal + [█████████████] + → Smartphone: QR-Code scannen → Backcover fotografieren +``` + +**Voraussetzungen:** +- Smartphone im gleichen WLAN wie der Rip-Rechner +- Firewall-Port 8765 offen: `sudo ufw allow 8765/tcp` + +Nach dem Hochladen fährt das Programm automatisch fort und startet die Vision-LLM-Analyse im Hintergrund. Sobald der Ripping-Prozess abgeschlossen ist, wird das Ergebnis übernommen. + +Bei MusicBrainz-Treffern wird `backcover.jpg` aus dem Cover Art Archive heruntergeladen — kein Foto nötig. + ### Ergebnis-Verzeichnis -**Wenn CDDB Daten liefert:** +**Wenn CDDB/Vision-LLM Daten liefert:** ``` ~/rip/ Beethoven_Sinfonien/ @@ -184,11 +274,13 @@ Next album? (y/n): n 01_-_Allegro_con_brio_-_Karajan.flac 02_-_Andante_con_moto_-_Karajan.flac ... + frontcover.jpg ← aus CAA oder automatisch hinzugefügt + backcover.jpg ← aus CAA oder Smartphone-Foto album.json ← automatisch gespeichert ``` -→ Direkt weiter mit `musiksammlung apply ~/rip/Beethoven_Sinfonien album.json` +→ Am Ende zeigt das Programm den fertigen `apply`-Befehl an (copy-paste-fähig). -**Wenn kein CDDB-Treffer:** +**Wenn kein CDDB-Treffer und kein Foto:** ``` ~/rip/ Beethoven_Sinfonien/ @@ -377,7 +469,9 @@ Ist bereits ein `frontcover.*` vorhanden (z.B. bei erneutem `apply`), wird es oh Beispiel: `01_-_Allegro_con_brio_-_Karajan.flac` -- Leerzeichen und Satzzeichen → `_` +- Leerzeichen → `_` +- Klammern `()[]{}` und Satzzeichen `,;:!?.'"` → werden entfernt +- Nur Buchstaben, Ziffern, Unterstriche und Bindestriche bleiben erhalten - Mehrere `_` hintereinander → ein `_` - Umlaute (ä, ö, ü, ß) bleiben erhalten - Künstler pro Track: falls im `album.json` ein Track-`artist` gesetzt ist, wird dieser verwendet; sonst der Album-Künstler @@ -424,11 +518,21 @@ musiksammlung process temp/Album/CD1 ~/Musik --back back.jpg ## 9. Tipps und Hinweise -**EAN/Barcode verfügbar? → Schnellster Weg** -- EAN-13 oder UPC-12 von der CD-Hülle ablesen (ggf. Barcode-Scanner-App nutzen) -- Beim `rip`-Befehl wird die EAN als erstes abgefragt — bei MusicBrainz-Treffer startet der Auto-Rip sofort (keine weiteren Fragen) -- Alternativ: `musiksammlung scan --barcode 0602557360561 -o album.json` -- Kein Bild, kein OCR, kein lokales LLM notwendig +**EAN/Barcode eingeben → Schnellster Weg** +- Beim `rip`-Befehl erscheint direkt ein QR-Code im Terminal +- **Option A (tippen):** EAN-13 oder UPC-12 von der CD-Hülle ablesen und eingeben +- **Option B (fotografieren):** CD-Hülle mit dem Smartphone fotografieren → KI liest Barcode automatisch → Bestätigung/Korrektur möglich +- Bei MusicBrainz-Treffer startet der Auto-Rip sofort (keine weiteren Fragen) +- Cover (Front + Rückseite) werden automatisch aus dem Cover Art Archive heruntergeladen +- Alternativ ohne `rip`: `musiksammlung scan --barcode 0602557360561 -o album.json` + +**Backcover-Foto per Smartphone hochladen** +- Derselbe QR-Code / Scanner-Server wird für EAN und Backcover genutzt +- Smartphone muss im gleichen WLAN sein +- Firewall-Port 8765 (TCP) muss offen sein: `sudo ufw allow 8765/tcp` +- Nach dem Hochladen fährt das Programm automatisch fort +- Vision-LLM analysiert das Foto parallel zum Ripping — kein Warten nötig +- Anderes Modell oder URL: `--vision-model qwen3-vl:7b --vision-url http://localhost:11434` **CDDB-Lookup schlägt fehl?** - Internetverbindung prüfen @@ -443,6 +547,7 @@ musiksammlung process temp/Album/CD1 ~/Musik --back back.jpg **Mehrspaltige Trackliste auf dem Backcover?** - OCR erkennt mehrspaltige Layouts oft unvollständig - Vision-LLM verwenden: `--vision --vision-model qwen3-vl:235b-cloud` +- Beim `rip`-Befehl: Backcover per Smartphone hochladen → Vision-LLM läuft automatisch **Mehrere CDs eines Albums (Multi-Disc)?** - Bei EAN-Treffer: MusicBrainz kennt die Disc-Anzahl, der Auto-Rip fordert automatisch jede CD an @@ -450,6 +555,16 @@ musiksammlung process temp/Album/CD1 ~/Musik --back back.jpg - Jede CD erhält ein eigenes Unterverzeichnis `CD1`, `CD2`, ... - `apply` einmal mit dem Album-Verzeichnis aufrufen (nicht pro CD) +**Copy-paste-fähige `apply`-Befehle** +- Am Ende des Rippens zeigt das Programm für jedes Album konkrete Kommandos an: + ``` + Next steps: + Album 1: Beethoven_Sinfonien (1963) + 1. Prüfen/bearbeiten: ~/rip/Beethoven_Sinfonien/album.json + 2. musiksammlung apply ~/rip/Beethoven_Sinfonien/CD1 \ + ~/rip/Beethoven_Sinfonien/album.json + ``` + **Unterstützte Audio-Formate:** | Format | Qualität high | Verwendung | diff --git a/README.md b/README.md index bc87fa3..da43d75 100644 --- a/README.md +++ b/README.md @@ -9,10 +9,12 @@ CLI-Tool zum Digitalisieren von CD-Sammlungen für Jellyfin. **Musiksammlung** automatisiert die Digitalisierung physischer CDs: 1. CDs rippen (via `abcde`, CDDB-Lookup) -2. Coverbilder per OCR oder Vision-LLM analysieren -3. Tracklisten per LLM extrahieren und als JSON speichern -4. Audiodateien umbenennen, taggen und in Jellyfin-Struktur ablegen -5. M3U-Playlisten erzeugen +2. Backcover automatisch via Smartphone-Upload erfassen +3. Backcover per Vision-LLM parallel zum Ripping analysieren (Cloud-Modell, kein lokales VRAM nötig) +4. Coverbilder per OCR oder Vision-LLM analysieren +5. Tracklisten per LLM extrahieren und als JSON speichern +6. Audiodateien umbenennen, taggen und in Jellyfin-Struktur ablegen +7. M3U-Playlisten erzeugen ## Voraussetzungen @@ -20,6 +22,7 @@ CLI-Tool zum Digitalisieren von CD-Sammlungen für Jellyfin. - `abcde` (CD-Ripping) - `tesseract` (OCR, optional) - Ollama oder OpenAI-kompatibles LLM (optional) +- Firewall: Port 8765 (TCP) für Smartphone-Upload offen (`sudo ufw allow 8765/tcp`) ## Installation @@ -31,6 +34,7 @@ pip install -e ".[dev]" ```bash # CDs rippen (interaktiv, EAN-First: MusicBrainz → Auto-Rip bei Treffer) +# QR-Code im Terminal: Backcover mit Smartphone fotografieren → Vision-LLM analysiert parallel musiksammlung rip -j 0 -P # Variante A: EAN/Barcode → MusicBrainz → album.json (schnellste Methode) diff --git a/pyproject.toml b/pyproject.toml index ffcae8f..545e2d2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,7 @@ dependencies = [ "mutagen>=1.47", "Pillow>=10.0", "httpx>=0.27", + "qrcode>=7.0", ] [project.optional-dependencies] diff --git a/src/musiksammlung/cddb.py b/src/musiksammlung/cddb.py index f9bffb5..00d105f 100644 --- a/src/musiksammlung/cddb.py +++ b/src/musiksammlung/cddb.py @@ -191,7 +191,10 @@ def _read_gnudb(category: str, discid: str) -> CddbResult | None: title=title, )) - logger.info("GnuDB: %d Tracks für '%s' geladen (year=%s, genre=%s)", len(tracks), dtitle, year, dgenre) + logger.info( + "GnuDB: %d Tracks für '%s' geladen (year=%s, genre=%s)", + len(tracks), dtitle, year, dgenre, + ) return CddbResult( tracks=tracks, artist=album_artist, diff --git a/src/musiksammlung/cli.py b/src/musiksammlung/cli.py index a53abe4..b7fb97c 100644 --- a/src/musiksammlung/cli.py +++ b/src/musiksammlung/cli.py @@ -54,7 +54,8 @@ def _scan_to_album( if barcode: typer.echo(f"MusicBrainz-Suche nach Barcode {barcode}...") try: - return lookup_by_barcode(barcode) + album, _mbid = lookup_by_barcode(barcode) + return album except ValueError as e: typer.echo(f"Fehler: {e}", err=True) typer.echo( @@ -338,6 +339,18 @@ def rip( no_cddb: bool = typer.Option( False, "--no-cddb", help="Disable CDDB lookup" ), + vision_model: str = typer.Option( + "qwen3-vl:235b-cloud", "--vision-model", + help="Vision-LLM für Backcover-Analyse (Ollama-Modell)" + ), + vision_url: str = typer.Option( + "http://localhost:11434", "--vision-url", + help="Ollama API URL für Vision-LLM" + ), + scanner_port: int = typer.Option( + 8765, "--scanner-port", + help="Port für den Foto-Upload-Server (Fallback-Pfad)" + ), ) -> None: """Interactive CD ripping with abcde. @@ -377,6 +390,9 @@ def rip( parallel_jobs=parallel, use_pipes=pipes, use_cddb=not no_cddb, + vision_model=vision_model, + vision_url=vision_url, + scanner_port=scanner_port, ) interactive_rip(config) diff --git a/src/musiksammlung/cover.py b/src/musiksammlung/cover.py index dd10688..f8270cd 100644 --- a/src/musiksammlung/cover.py +++ b/src/musiksammlung/cover.py @@ -3,8 +3,10 @@ from __future__ import annotations import logging +import tempfile from pathlib import Path +import httpx from PIL import Image logger = logging.getLogger(__name__) @@ -79,3 +81,48 @@ def copy_covers( prepare_cover(back_image, album_dir / "backcover.jpg") else: logger.debug("Kein Back-Cover angegeben") + + +_CAA_BASE = "https://coverartarchive.org/release" + + +def download_caa_covers(mbid: str, album_dir: Path) -> None: + """Lädt Front- und Back-Cover vom Cover Art Archive herunter. + + Nutzt die CAA-API: GET /release/{mbid}/front bzw. /back. + Bereits vorhandene Cover werden nicht überschrieben. + Fehler (404, Netzwerk) werden geloggt, brechen aber nicht ab. + + Args: + mbid: MusicBrainz Release-MBID + album_dir: Zielverzeichnis für frontcover.jpg / backcover.jpg + """ + for kind, filename in [("front", "frontcover.jpg"), ("back", "backcover.jpg")]: + target = album_dir / filename + if target.exists(): + logger.info("CAA: %s existiert bereits, überspringe.", filename) + continue + + url = f"{_CAA_BASE}/{mbid}/{kind}" + try: + response = httpx.get(url, follow_redirects=True, timeout=30.0) + if response.status_code == 404: + logger.info("CAA: Kein %s-Cover für MBID %s verfügbar.", kind, mbid) + continue + response.raise_for_status() + except httpx.HTTPError as e: + logger.warning("CAA: Fehler beim Download von %s-Cover: %s", kind, e) + continue + + album_dir.mkdir(parents=True, exist_ok=True) + with tempfile.NamedTemporaryFile(suffix=".img", delete=False) as tmp: + tmp.write(response.content) + tmp_path = Path(tmp.name) + + try: + prepare_cover(tmp_path, target, max_size=1200) + logger.info("CAA: %s heruntergeladen → %s", kind, target) + except Exception as e: + logger.warning("CAA: Fehler beim Verarbeiten von %s-Cover: %s", kind, e) + finally: + tmp_path.unlink(missing_ok=True) diff --git a/src/musiksammlung/musicbrainz.py b/src/musiksammlung/musicbrainz.py index 0902b92..77663fa 100644 --- a/src/musiksammlung/musicbrainz.py +++ b/src/musiksammlung/musicbrainz.py @@ -28,7 +28,7 @@ def _get(path: str, params: dict) -> dict: return response.json() -def lookup_by_barcode(ean: str) -> Album: +def lookup_by_barcode(ean: str) -> tuple[Album, str]: """Schlägt ein Album anhand des EAN-Barcodes in MusicBrainz nach. Führt zwei API-Requests durch: @@ -42,7 +42,7 @@ def lookup_by_barcode(ean: str) -> Album: ean: EAN-13- oder UPC-12-Barcode Returns: - Album mit vollständiger Trackliste + Tuple aus (Album mit vollständiger Trackliste, MusicBrainz Release-MBID) Raises: ValueError: Kein Eintrag für diesen Barcode gefunden @@ -60,7 +60,7 @@ def lookup_by_barcode(ean: str) -> Album: time.sleep(_RATE_SLEEP) detail = _get(f"/release/{mbid}", {"inc": "recordings", "fmt": "json"}) - return _parse_release(detail) + return _parse_release(detail), mbid def _parse_release(data: dict) -> Album: diff --git a/src/musiksammlung/organizer.py b/src/musiksammlung/organizer.py index 4d8a572..cb0ed1f 100644 --- a/src/musiksammlung/organizer.py +++ b/src/musiksammlung/organizer.py @@ -134,7 +134,10 @@ def build_mapping( artist_raw = track.artist or album.artist if artist_raw: safe_artist = sanitize_filename(artist_raw) - new_name = f"{track.track_number:02d}_-_{safe_title}_-_{safe_artist}{audio_file.suffix}" + new_name = ( + f"{track.track_number:02d}_-_{safe_title}" + f"_-_{safe_artist}{audio_file.suffix}" + ) else: new_name = f"{track.track_number:02d}_-_{safe_title}{audio_file.suffix}" mapping[audio_file] = target_dir / new_name diff --git a/src/musiksammlung/ripper.py b/src/musiksammlung/ripper.py index 750f3dc..651846e 100644 --- a/src/musiksammlung/ripper.py +++ b/src/musiksammlung/ripper.py @@ -3,19 +3,25 @@ from __future__ import annotations import logging +import queue as _queue_module import re import subprocess +import sys +import threading from pathlib import Path from pydantic import BaseModel -from musiksammlung.cddb import CddbResult, get_discid, lookup_by_discid +from musiksammlung.cddb import get_discid, lookup_by_discid from musiksammlung.config import AudioFormat +from musiksammlung.cover import download_caa_covers, prepare_cover from musiksammlung.models import Album as AlbumModel from musiksammlung.models import Disc as DiscModel from musiksammlung.models import Track as TrackModel from musiksammlung.models import TrackInfo from musiksammlung.musicbrainz import lookup_by_barcode +from musiksammlung.scanner_server import ScannerServer, print_qr +from musiksammlung.vision_llm import extract_barcode_from_image, parse_image logger = logging.getLogger(__name__) @@ -33,6 +39,9 @@ class RipperConfig(BaseModel): parallel_jobs: int = 1 # Number of parallel encoder processes use_pipes: bool = False # True = faster, no WAV files use_cddb: bool = True # Use CDDB lookup + vision_model: str = "qwen3-vl:235b-cloud" + vision_url: str = "http://localhost:11434" + scanner_port: int = 8765 def _clean_input(raw: str) -> str: @@ -53,18 +62,144 @@ def _clean_input(raw: str) -> str: return cleaned -def _sanitize_name(name: str) -> str: - """Remove problematic characters and replace spaces. +def _print_apply_hint(album_root: Path, json_path: Path, num_discs: int) -> None: + """Gibt einen kopierbaren apply-Befehl aus.""" + input_dir = album_root / "CD1" if num_discs == 1 else album_root + parts = ["musiksammlung apply", str(input_dir), str(json_path)] + if not (album_root / "frontcover.jpg").exists(): + parts.append("--front ") + print(f" → {' '.join(parts)}") + + +def _find_abcde_mbid(disc_dir: Path) -> str | None: + """Liest die MusicBrainz-ID aus dem abcde-Temp-Verzeichnis. + + abcde legt die ausgewählte Treffer-Nummer in der status-Datei als + 'cddb-choice=N' ab; die zugehörige MBID steht in 'mbid.N'. Args: - name: Original name + disc_dir: Verzeichnis, in das abcde geripped hat (enthält abcde.XXXX/) Returns: - Cleaned name (spaces -> underscores) + MusicBrainz Release-MBID oder None + """ + for abcde_dir in sorted(disc_dir.glob("abcde.*")): + status_file = abcde_dir / "status" + if not status_file.exists(): + continue + + choice = 1 + for line in status_file.read_text(encoding="utf-8", errors="replace").splitlines(): + if line.startswith("cddb-choice="): + try: + choice = int(line.split("=", 1)[1]) + except ValueError: + pass + break + + mbid_file = abcde_dir / f"mbid.{choice}" + if mbid_file.exists(): + mbid = mbid_file.read_text(encoding="utf-8").strip() + if mbid: + logger.info("abcde MBID gefunden: %s (Wahl %d)", mbid, choice) + return mbid + + return None + + +def _start_vision_thread( + image_path: Path, + model: str, + base_url: str, +) -> _queue_module.Queue: + """Startet Vision-LLM-Analyse im Hintergrund-Thread. + + Returns: + Queue, in die AlbumModel (bei Erfolg) oder None (bei Fehler) gelegt wird. + """ + result_q: _queue_module.Queue = _queue_module.Queue() + + def _run() -> None: + try: + album = parse_image([image_path], model=model, base_url=base_url) + result_q.put(album) + except Exception as exc: + logger.warning("Vision-LLM-Analyse fehlgeschlagen: %s", exc) + result_q.put(None) + + threading.Thread(target=_run, daemon=True).start() + return result_q + + +def _get_vision_result( + result_q: _queue_module.Queue, + timeout: float = 120.0, +) -> AlbumModel | None: + """Holt Vision-LLM-Ergebnis aus Queue mit Timeout.""" + try: + return result_q.get(timeout=timeout) + except _queue_module.Empty: + logger.warning("Vision-LLM: Timeout nach %.0f s", timeout) + return None + + +def _input_or_scan( + prompt: str, + scanner: ScannerServer | None, +) -> tuple[str, Path | None]: + """Kombiniertes input() + Scanner-Queue: wartet gleichzeitig auf Tastatur und Foto. + + Returns: + (eingegebener Text, None) — wenn der User Enter drückt + ("", photo_path) — wenn ein Foto hochgeladen wird + """ + if scanner is None: + return _clean_input(input(prompt)), None + + print(prompt, end="", flush=True) + + stdin_q: _queue_module.Queue = _queue_module.Queue() + + def _read_stdin() -> None: + try: + val = sys.stdin.readline().rstrip("\n") + except EOFError: + val = "" + stdin_q.put(val) + + threading.Thread(target=_read_stdin, daemon=True).start() + + while True: + photo = scanner.get_photo(timeout=0) + if photo is not None: + print("\n [Foto empfangen — weiter automatisch]", flush=True) + return "", photo + + try: + val = stdin_q.get(timeout=0.1) + return _clean_input(val), None + except _queue_module.Empty: + continue + + +def _sanitize_name(name: str) -> str: + """Bereinigt einen Namen für die Verwendung als Verzeichnis- oder Dateiname. + + - Leerzeichen → Unterstrich + - Alle Sonder- und Satzzeichen (Klammern, Komma, Punkt, …) werden entfernt + - Nur Buchstaben (inkl. Umlaute), Ziffern, Unterstrich und Bindestrich bleiben + - Mehrfache Unterstriche werden zusammengefasst + + Args: + name: Original-Name + + Returns: + Bereinigter Name """ name = name.replace(" ", "_") - name = re.sub(r'[<>:"\'/\\|?*]', "", name) - name = name.strip("_") + name = re.sub(r"[^\w\-]", "", name) # nur \w (Buchstaben/Ziffern/_) und - behalten + name = re.sub(r"_+", "_", name) # mehrfache Unterstriche zusammenfassen + name = name.strip("_-") return name @@ -517,18 +652,45 @@ def interactive_rip(config: RipperConfig) -> None: print("\nNote: Do not use arrow keys while typing — press Enter to confirm.\n") album_counter = 1 + processed_albums: list[tuple[Path, Path, int]] = [] # (album_root, json_path, num_discs) while True: print(f"\n--- Album {album_counter} ---") - # ── EAN zuerst abfragen ── - raw_ean = input("EAN/Barcode (Enter = überspringen): ") - ean = _clean_input(raw_ean) + # ── Scanner-Server starten (EAN-Foto + späteres Backcover) ── + scanner = ScannerServer(port=config.scanner_port) + scanner.start() + scanner_url = scanner.url() + print(f"\n Scanner bereit: {scanner_url}") + print_qr(scanner_url) + + # ── EAN: Texteingabe oder Barcode-Foto ── + raw_ean, ean_photo = _input_or_scan( + "EAN/Barcode (Enter = überspringen): ", scanner + ) + ean = _clean_input(raw_ean) if raw_ean else "" + + if not ean and ean_photo: + print(" Barcode wird per KI erkannt...", flush=True) + detected_ean = extract_barcode_from_image( + ean_photo, config.vision_model, config.vision_url + ) + if detected_ean: + print(f" Erkannter Barcode: {detected_ean}") + confirm = _clean_input( + input(" Korrekt? (Enter = ja, neuer Wert = tippen): ") + ) + ean = confirm if confirm else detected_ean + else: + print(" Kein Barcode erkannt.") + ean = _clean_input(input(" EAN manuell eingeben (Enter = überspringen): ")) + mb_album: AlbumModel | None = None + mb_mbid: str | None = None if ean: try: print(f" MusicBrainz-Suche nach Barcode {ean} ...", flush=True) - mb_album = lookup_by_barcode(ean) + mb_album, mb_mbid = lookup_by_barcode(ean) total_tracks = sum(len(d.tracks) for d in mb_album.discs) print( f" ✓ {mb_album.artist} – {mb_album.album}" @@ -545,13 +707,28 @@ def interactive_rip(config: RipperConfig) -> None: album_name = mb_album.album or f"Album{album_counter}" total_discs = len(mb_album.discs) + # Album-Root und Cover VOR dem Ripping anlegen, + # damit backcover.jpg für Vision-LLM verfügbar ist. + album_root = config.output_dir / _sanitize_name(album_name) + album_root.mkdir(parents=True, exist_ok=True) + if mb_mbid: + download_caa_covers(mb_mbid, album_root) + + # Vision-LLM im Hintergrund starten, falls Back-Cover vorhanden + vision_queue = None + backcover = album_root / "backcover.jpg" + if backcover.exists(): + print( + " Backcover verfügbar → Vision-LLM-Analyse im Hintergrund...", + flush=True, + ) + vision_queue = _start_vision_thread( + backcover, config.vision_model, config.vision_url + ) + for disc in mb_album.discs: disc_num = disc.disc_number - disc_dir = ( - config.output_dir - / _sanitize_name(album_name) - / f"CD{disc_num}" - ) + disc_dir = album_root / f"CD{disc_num}" input( f"\n CD {disc_num}/{total_discs} einlegen und Enter drücken " @@ -589,30 +766,76 @@ def interactive_rip(config: RipperConfig) -> None: print(" Bitte Album neu starten.") break - # album.json aus MusicBrainz-Daten schreiben - album_root = ( - config.output_dir / _sanitize_name(album_name) - ) - album_root.mkdir(parents=True, exist_ok=True) + # Vision-LLM-Ergebnis abholen (läuft parallel zum Ripping) + final_album = mb_album + if vision_queue is not None: + print(" Warte auf Vision-LLM-Ergebnis ...", flush=True) + vision_result = _get_vision_result(vision_queue, timeout=120.0) + if vision_result: + n_tracks = sum(len(d.tracks) for d in vision_result.discs) + print( + f" ✓ Vision-LLM: {n_tracks} Tracks aus Backcover extrahiert" + f" (überschreibt MusicBrainz-Trackliste)", + flush=True, + ) + final_album = vision_result + else: + print( + " Vision-LLM: kein Ergebnis — MusicBrainz-Daten werden verwendet.", + flush=True, + ) + json_path = album_root / "album.json" json_path.write_text( - mb_album.model_dump_json(indent=2), encoding="utf-8" + final_album.model_dump_json(indent=2), encoding="utf-8" ) print(f"\n album.json gespeichert: {json_path}") - print(" → Weiter mit: musiksammlung apply album.json") + _print_apply_hint(album_root, json_path, len(final_album.discs)) + processed_albums.append((album_root, json_path, len(final_album.discs))) else: # ── Fallback: kein MusicBrainz-Treffer ── - raw = input("Album name (Enter = CDDB name / default 'Album{N}'): ") - album_name = _clean_input(raw) - if not album_name: - album_name = f"Album{album_counter}" + # Scanner läuft bereits (seit EAN-Prompt), kann auch für Backcover genutzt werden + print("\n Kein MusicBrainz-Treffer.") + print(" Optional: Backcover-Foto hochladen für automatische Metadaten-Extraktion.") + print(f" URL: {scanner_url}") + print_qr(scanner_url) + print(" → Foto hochladen und Enter drücken — oder Album-Namen eingeben:") + + raw, photo_from_prompt = _input_or_scan( + "Album name (Enter = CDDB/Vision-LLM): ", + scanner, + ) + album_name = raw if raw else f"Album{album_counter}" disc_counter = 1 all_discs: list[DiscModel] = [] cddb_album: str | None = None + # Vision-LLM sofort starten wenn Foto bereits beim Prompt hochgeladen wurde + vision_queue = None + uploaded_photo: Path | None = None + if photo_from_prompt: + uploaded_photo = photo_from_prompt + print(" Backcover-Foto empfangen → Vision-LLM-Analyse gestartet...", flush=True) + vision_queue = _start_vision_thread( + photo_from_prompt, config.vision_model, config.vision_url + ) + while True: + # Foto prüfen (non-blocking) — Vision-LLM starten falls noch nicht geschehen + if vision_queue is None: + photo = scanner.get_photo(timeout=0) + if photo: + uploaded_photo = photo + print( + " Backcover-Foto empfangen → Vision-LLM-Analyse gestartet...", + flush=True, + ) + vision_queue = _start_vision_thread( + photo, config.vision_model, config.vision_url + ) + print(f"\n Album: {album_name}") print(f" CD Drive: {config.device}") @@ -697,17 +920,76 @@ def interactive_rip(config: RipperConfig) -> None: # album_root = tatsächliches Elternverzeichnis der CD-Ordner album_root = disc_dir.parent - if all_discs: + # Vision-LLM-Ergebnis: Priorität über CDDB-Daten + final_album: AlbumModel | None = None + if vision_queue is not None: + print(" Warte auf Vision-LLM-Ergebnis ...", flush=True) + vision_result = _get_vision_result(vision_queue, timeout=120.0) + if vision_result: + n_tracks = sum(len(d.tracks) for d in vision_result.discs) + print( + f" ✓ Vision-LLM: {n_tracks} Tracks aus Backcover extrahiert" + f" (Priorität über CDDB)", + flush=True, + ) + final_album = vision_result + else: + print(" Vision-LLM: kein Ergebnis — verwende CDDB-Daten.", flush=True) + + if final_album is None and all_discs: artist = all_discs[0].tracks[0].artist or "" - album_model = AlbumModel(artist=artist, album=album_name, discs=all_discs) + final_album = AlbumModel(artist=artist, album=album_name, discs=all_discs) + + # MBID aus abcde-Temp-Verzeichnis ermitteln → CAA-Cover laden + # (funktioniert auch wenn der Barcode nicht in MusicBrainz verknüpft war) + abcde_mbid: str | None = None + for cd_dir in sorted(album_root.glob("CD*")): + abcde_mbid = _find_abcde_mbid(cd_dir) + if abcde_mbid: + break + if abcde_mbid: + print(f" MusicBrainz-ID aus CD-Daten: {abcde_mbid}", flush=True) + print(" Lade Cover vom Cover Art Archive...", flush=True) + download_caa_covers(abcde_mbid, album_root) + + if final_album is not None: + # Verzeichnis umbenennen, wenn Vision-LLM/CDDB einen anderen + # Namen lieferte als der Platzhalter (z. B. "Album1") + proper_name = _sanitize_name(final_album.album or album_name) + if proper_name and proper_name != album_root.name: + new_root = album_root.parent / proper_name + if new_root.exists(): + print( + f" Hinweis: '{proper_name}' existiert bereits" + " — Verzeichnis nicht umbenannt.", + flush=True, + ) + else: + album_root.rename(new_root) + print( + f" Verzeichnis umbenannt: {album_root.name} → {proper_name}", + flush=True, + ) + album_root = new_root + album_root.mkdir(parents=True, exist_ok=True) + + # Hochgeladenes Backcover ins Album-Verzeichnis kopieren + # (überschreibt ggf. das CAA-Backcover — das Handy-Foto hat Vorrang) + if uploaded_photo and uploaded_photo.exists(): + dest = album_root / "backcover.jpg" + prepare_cover(uploaded_photo, dest) + print(f" backcover.jpg gespeichert: {dest}", flush=True) + json_path = album_root / "album.json" json_path.write_text( - album_model.model_dump_json(indent=2), encoding="utf-8" + final_album.model_dump_json(indent=2), encoding="utf-8" ) print(f"\n album.json gespeichert: {json_path}") - print(" → Weiter mit: musiksammlung apply album.json") + _print_apply_hint(album_root, json_path, len(final_album.discs)) + processed_albums.append((album_root, json_path, len(final_album.discs))) + scanner.stop() raw_album = input("\nNext album? (y/n): ") if _clean_input(raw_album).lower() != "y": break @@ -717,11 +999,22 @@ def interactive_rip(config: RipperConfig) -> None: print("\n" + "=" * 60) print("Ripping completed!") print(f"Files are in: {config.output_dir.absolute()}") - print("\nNext steps:") - print(" 1. Check filenames and tags") - if config.use_cddb: - print(" 2. Adjust tags/covers with 'musiksammlung apply'") - else: - print(" 2. Run 'musiksammlung scan' to extract metadata") - print(" 3. Run 'musiksammlung apply' to organize & tag") + + if processed_albums: + print("\nNext steps:") + for i, (album_root, json_path, num_discs) in enumerate(processed_albums, 1): + input_dir = album_root / "CD1" if num_discs == 1 else album_root + front_flag = ( + "" if (album_root / "frontcover.jpg").exists() + else " --front " + ) + print(f"\n Album {i}: {album_root.name}") + print(f" 1. Prüfen/bearbeiten: {json_path}") + print( + f" 2. musiksammlung apply" + f" {input_dir}" + f" {json_path}" + f"{front_flag}" + f" " + ) print("=" * 60 + "\n") diff --git a/src/musiksammlung/scanner_server.py b/src/musiksammlung/scanner_server.py new file mode 100644 index 0000000..e03f3f6 --- /dev/null +++ b/src/musiksammlung/scanner_server.py @@ -0,0 +1,274 @@ +"""Mini-HTTP-Server für Handy-basiertes Backcover-Upload. + +Startet einen lokalen HTTP-Server (kein HTTPS nötig, da nur file-input, +kein getUserMedia), der eine mobile Upload-Seite ausliefert und +hochgeladene Fotos in einer Queue bereitstellt. +""" + +from __future__ import annotations + +import base64 +import json +import logging +import queue +import socket +import tempfile +import threading +from http.server import BaseHTTPRequestHandler, HTTPServer +from pathlib import Path + +logger = logging.getLogger(__name__) + +_UPLOAD_HTML = """\ + + + + + + CD-Backcover + + + +

CD-Backcover hochladen

+

Fotografiere die Rückseite der CD-Hülle und lade das Bild hoch.

+ + + + Vorschau + +
+ + + + +""" + + +def _get_local_ip() -> str: + """Ermittelt die lokale LAN-IP-Adresse.""" + try: + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + s.connect(("8.8.8.8", 80)) + return s.getsockname()[0] + except OSError: + return "127.0.0.1" + + +def print_qr(url: str) -> None: + """Gibt einen QR-Code für die URL im Terminal aus. + + Zeigt alternativ nur die URL, wenn qrcode nicht installiert ist. + """ + try: + import qrcode # type: ignore[import-untyped] + qr = qrcode.QRCode(border=1) + qr.add_data(url) + qr.make(fit=True) + qr.print_ascii(invert=True) + except ImportError: + pass # qrcode optional — URL wird sowieso separat ausgegeben + + +class ScannerServer: + """Lokaler HTTP-Server für Backcover-Foto-Upload vom Handy. + + Beispiel: + server = ScannerServer(port=8765) + server.start() + print(server.url()) + photo_path = server.get_photo(timeout=300) + server.stop() + """ + + def __init__(self, port: int = 8765, upload_dir: Path | None = None) -> None: + self._port = port + self._queue: queue.Queue[Path] = queue.Queue() + self._upload_dir = upload_dir or Path(tempfile.mkdtemp(prefix="ms_scan_")) + self._server: HTTPServer | None = None + self._thread: threading.Thread | None = None + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def start(self) -> None: + """Startet den HTTP-Server im Hintergrund-Thread.""" + server_instance = self + + class _Handler(BaseHTTPRequestHandler): + def log_message(self, fmt: str, *args: object) -> None: # suppress output + logger.debug("Scanner HTTP: " + fmt, *args) + + def do_GET(self) -> None: # noqa: N802 + if self.path in ("/", "/index.html"): + body = _UPLOAD_HTML.encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + else: + self.send_response(404) + self.end_headers() + + def do_POST(self) -> None: # noqa: N802 + if self.path != "/upload": + self.send_response(404) + self.end_headers() + return + + length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(length) + + try: + data = json.loads(body) + img_data: str = data["image"] # "data:;base64," + + # Mime-Typ und Erweiterung bestimmen + mime = "image/jpeg" + if img_data.startswith("data:"): + mime = img_data.split(";")[0][5:] + ext = { + "image/jpeg": ".jpg", + "image/png": ".png", + "image/webp": ".webp", + }.get(mime, ".jpg") + + _, b64 = img_data.split(",", 1) + img_bytes = base64.b64decode(b64) + + path = server_instance._upload_dir / f"backcover{ext}" + path.write_bytes(img_bytes) + logger.info("Backcover hochgeladen: %s (%d bytes)", path, len(img_bytes)) + server_instance._queue.put(path) + + resp = json.dumps({"status": "ok"}).encode() + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(resp))) + self.end_headers() + self.wfile.write(resp) + + except Exception as exc: + logger.warning("Upload-Fehler: %s", exc) + resp = json.dumps({"status": "error", "message": str(exc)}).encode() + self.send_response(400) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(resp))) + self.end_headers() + self.wfile.write(resp) + + self._server = HTTPServer(("0.0.0.0", self._port), _Handler) + self._thread = threading.Thread(target=self._server.serve_forever, daemon=True) + self._thread.start() + logger.info("Scanner-Server gestartet: %s", self.url()) + + def stop(self) -> None: + """Beendet den HTTP-Server.""" + if self._server: + self._server.shutdown() + self._server = None + logger.info("Scanner-Server gestoppt.") + + def get_photo(self, timeout: float | None = None) -> Path | None: + """Gibt den Pfad zum hochgeladenen Foto zurück (blockierend mit Timeout). + + Args: + timeout: Wartezeit in Sekunden. None = unbegrenzt. 0 = sofort. + + Returns: + Pfad zur Bilddatei oder None wenn Timeout abgelaufen. + """ + try: + if timeout == 0: + return self._queue.get_nowait() + return self._queue.get(timeout=timeout) + except queue.Empty: + return None + + def url(self) -> str: + """Gibt die URL des Servers im LAN zurück.""" + return f"http://{_get_local_ip()}:{self._port}" diff --git a/src/musiksammlung/vision_llm.py b/src/musiksammlung/vision_llm.py index fe9336f..ad4ae76 100644 --- a/src/musiksammlung/vision_llm.py +++ b/src/musiksammlung/vision_llm.py @@ -46,6 +46,45 @@ VISION_PROMPT += """ Jetzt lies das Bild ab und gib das vollständige JSON aus. /no_think""" +EAN_PROMPT = """\ +Schau dir das Bild an. Es zeigt eine CD-Hülle oder Produktverpackung. +Suche den EAN-13 oder UPC-A Barcode — die Ziffernreihe unter dem Strichcode-Symbol. +Gib NUR die Ziffern aus, ohne Leerzeichen, ohne Erklärung, kein weiterer Text. +Wenn kein Barcode erkennbar ist, gib einen leeren String zurück. /no_think""" + + +def extract_barcode_from_image( + image_path: Path, + model: str = "qwen3-vl:235b-cloud", + base_url: str = "http://localhost:11434", +) -> str | None: + """Extrahiert EAN/Barcode-Nummer aus einem Foto via Vision-LLM. + + Returns: + Nur Ziffern (z.B. '4006408262121') oder None wenn nicht erkannt. + """ + images_b64 = [_encode_image(image_path)] + messages = [{"role": "user", "content": EAN_PROMPT, "images": images_b64}] + try: + response = httpx.post( + f"{base_url}/api/chat", + json={"model": model, "messages": messages, "stream": False}, + timeout=60.0, + ) + response.raise_for_status() + raw = response.json()["message"]["content"] + raw = re.sub(r".*?", "", raw, flags=re.DOTALL).strip() + digits = re.sub(r"\D", "", raw) + if digits: + logger.info("Barcode aus Bild extrahiert: %s", digits) + return digits + logger.warning("Kein Barcode im Bild erkannt") + return None + except Exception as exc: + logger.warning("Barcode-Extraktion fehlgeschlagen: %s", exc) + return None + + def _encode_image(image_path: Path) -> str: """Liest ein Bild und gibt es als Base64-String zurück.""" return base64.b64encode(image_path.read_bytes()).decode("utf-8") diff --git a/tests/test_cddb.py b/tests/test_cddb.py index eed5494..8ee74ac 100644 --- a/tests/test_cddb.py +++ b/tests/test_cddb.py @@ -2,8 +2,6 @@ from unittest.mock import MagicMock, patch -import httpx - from musiksammlung.cddb import CddbResult, _read_gnudb, lookup_by_discid diff --git a/tests/test_cover.py b/tests/test_cover.py index 264743d..2a2ece7 100644 --- a/tests/test_cover.py +++ b/tests/test_cover.py @@ -1,10 +1,13 @@ """Tests für Cover-Funktionen.""" +from io import BytesIO from pathlib import Path +from unittest.mock import MagicMock, patch +import httpx from PIL import Image -from musiksammlung.cover import copy_covers, find_cover, prepare_cover +from musiksammlung.cover import copy_covers, download_caa_covers, find_cover, prepare_cover def _make_image(path: Path, size: tuple[int, int] = (100, 100), mode: str = "RGB") -> Path: @@ -119,3 +122,90 @@ class TestCopyCovers: original_mtime = existing.stat().st_mtime copy_covers(None, None, tmp_path) assert (tmp_path / "frontcover.jpg").stat().st_mtime == original_mtime + + +def _fake_image_bytes() -> bytes: + """Erzeugt ein gültiges JPEG-Bild als bytes.""" + buf = BytesIO() + Image.new("RGB", (200, 200), (100, 150, 200)).save(buf, "JPEG") + return buf.getvalue() + + +def _mock_caa_response(status_code: int = 200, content: bytes = b"") -> MagicMock: + """Erstellt ein Mock-httpx.Response für CAA-Requests.""" + resp = MagicMock(spec=httpx.Response) + resp.status_code = status_code + resp.content = content + resp.raise_for_status.return_value = None + return resp + + +class TestDownloadCaaCovers: + """Tests für download_caa_covers.""" + + def test_downloads_front_and_back(self, tmp_path: Path) -> None: + """Beide Cover werden heruntergeladen und als JPEG gespeichert.""" + img_bytes = _fake_image_bytes() + resp = _mock_caa_response(200, img_bytes) + + with patch("musiksammlung.cover.httpx.get", return_value=resp): + download_caa_covers("test-mbid", tmp_path) + + assert (tmp_path / "frontcover.jpg").exists() + assert (tmp_path / "backcover.jpg").exists() + # Ergebnis ist ein gültiges JPEG + assert Image.open(tmp_path / "frontcover.jpg").format == "JPEG" + + def test_404_skips_cover(self, tmp_path: Path) -> None: + """404 → kein Cover, kein Fehler.""" + resp_404 = _mock_caa_response(404) + + with patch("musiksammlung.cover.httpx.get", return_value=resp_404): + download_caa_covers("no-cover-mbid", tmp_path) + + assert not (tmp_path / "frontcover.jpg").exists() + assert not (tmp_path / "backcover.jpg").exists() + + def test_http_error_continues(self, tmp_path: Path) -> None: + """Netzwerkfehler → Warnung, kein Abbruch.""" + with patch( + "musiksammlung.cover.httpx.get", + side_effect=httpx.HTTPError("timeout"), + ): + download_caa_covers("error-mbid", tmp_path) + + assert not (tmp_path / "frontcover.jpg").exists() + assert not (tmp_path / "backcover.jpg").exists() + + def test_skips_existing_cover(self, tmp_path: Path) -> None: + """Bereits vorhandene Cover werden nicht überschrieben.""" + existing = _make_image(tmp_path / "frontcover.jpg") + original_size = existing.stat().st_size + + img_bytes = _fake_image_bytes() + resp = _mock_caa_response(200, img_bytes) + + with patch("musiksammlung.cover.httpx.get", return_value=resp) as mock_get: + download_caa_covers("test-mbid", tmp_path) + + # frontcover.jpg bleibt unverändert + assert (tmp_path / "frontcover.jpg").stat().st_size == original_size + # backcover.jpg wird heruntergeladen (war nicht vorhanden) + assert (tmp_path / "backcover.jpg").exists() + # Nur ein HTTP-Request (für back), nicht zwei + assert mock_get.call_count == 1 + + def test_front_only_on_back_404(self, tmp_path: Path) -> None: + """Front 200, Back 404 → nur frontcover.jpg erstellt.""" + img_bytes = _fake_image_bytes() + resp_ok = _mock_caa_response(200, img_bytes) + resp_404 = _mock_caa_response(404) + + with patch( + "musiksammlung.cover.httpx.get", + side_effect=[resp_ok, resp_404], + ): + download_caa_covers("mixed-mbid", tmp_path) + + assert (tmp_path / "frontcover.jpg").exists() + assert not (tmp_path / "backcover.jpg").exists() diff --git a/tests/test_musicbrainz.py b/tests/test_musicbrainz.py index 41308dd..8880277 100644 --- a/tests/test_musicbrainz.py +++ b/tests/test_musicbrainz.py @@ -160,11 +160,12 @@ class TestLookupByBarcode: patch("musiksammlung.musicbrainz.httpx.get", side_effect=responses), patch("musiksammlung.musicbrainz.time.sleep"), ): - album = lookup_by_barcode("0602557360561") + album, mbid = lookup_by_barcode("0602557360561") assert album.artist == "The Beatles" assert album.album == "Abbey Road" assert album.year == 1969 + assert mbid == "abc-123" def test_raises_when_no_releases(self) -> None: empty = _mock_response({"releases": []}) @@ -173,7 +174,7 @@ class TestLookupByBarcode: patch("musiksammlung.musicbrainz.time.sleep"), pytest.raises(ValueError, match="Kein MusicBrainz-Eintrag"), ): - lookup_by_barcode("0000000000000") + lookup_by_barcode("0000000000000") # raises before returning tuple def test_uses_first_release(self) -> None: barcode_data = {"releases": [{"id": "first-id"}, {"id": "second-id"}]} @@ -182,11 +183,12 @@ class TestLookupByBarcode: patch("musiksammlung.musicbrainz.httpx.get", side_effect=responses) as mock_get, patch("musiksammlung.musicbrainz.time.sleep"), ): - lookup_by_barcode("1234567890123") + _album, mbid = lookup_by_barcode("1234567890123") # Zweiter Request muss die MBID des ersten Treffers verwenden second_call_url = mock_get.call_args_list[1][0][0] assert "first-id" in second_call_url + assert mbid == "first-id" def test_rate_limit_sleep_is_called(self) -> None: responses = [_mock_response(_BARCODE_RESPONSE), _mock_response(_RELEASE_RESPONSE)] @@ -194,7 +196,7 @@ class TestLookupByBarcode: patch("musiksammlung.musicbrainz.httpx.get", side_effect=responses), patch("musiksammlung.musicbrainz.time.sleep") as mock_sleep, ): - lookup_by_barcode("0602557360561") + _album, _mbid = lookup_by_barcode("0602557360561") mock_sleep.assert_called_once() assert mock_sleep.call_args[0][0] >= 1.0 @@ -206,4 +208,4 @@ class TestLookupByBarcode: patch("musiksammlung.musicbrainz.httpx.get", side_effect=httpx.HTTPError("timeout")), pytest.raises(httpx.HTTPError), ): - lookup_by_barcode("0000000000000") + lookup_by_barcode("0000000000000") # raises before returning tuple diff --git a/tests/test_ripper.py b/tests/test_ripper.py index 4a8f8b2..159ec03 100644 --- a/tests/test_ripper.py +++ b/tests/test_ripper.py @@ -30,6 +30,13 @@ class TestSanitizeName: assert _sanitize_name("Test|Track?Name*") == "TestTrackName" assert _sanitize_name("It's_a_Test") == "Its_a_Test" + def test_remove_brackets_and_punctuation(self) -> None: + assert _sanitize_name("Best of (1990)") == "Best_of_1990" + assert _sanitize_name("Hello, World!") == "Hello_World" + assert _sanitize_name("Vol. 2") == "Vol_2" + assert _sanitize_name("Salt & Pepper [Remix]") == "Salt_Pepper_Remix" + assert _sanitize_name("The Best of... (Deluxe Edition)") == "The_Best_of_Deluxe_Edition" + def test_keep_umlauts(self) -> None: assert _sanitize_name("Grüße aus Österreich") == "Grüße_aus_Österreich" assert _sanitize_name("Schöne Sänger") == "Schöne_Sänger" @@ -349,10 +356,13 @@ class TestInteractiveRipEanFirst: "n", # next album? ] config = RipperConfig(output_dir=tmp_path) + sp = self._scanner_patches() with ( + sp[0], sp[1], patch("musiksammlung.ripper.rip_disc", return_value=(tmp_path, None, _CDDB_TRACKS)), patch("builtins.input", side_effect=iter(inputs)), - patch("musiksammlung.ripper.lookup_by_barcode", return_value=_MB_ALBUM), + patch("musiksammlung.ripper.lookup_by_barcode", return_value=(_MB_ALBUM, "fake-mbid")), + patch("musiksammlung.ripper.download_caa_covers") as mock_caa, ): interactive_rip(config) @@ -363,6 +373,7 @@ class TestInteractiveRipEanFirst: assert data["artist"] == "The Beatles" assert data["album"] == "Abbey Road" assert data["year"] == 1969 + mock_caa.assert_called_once_with("fake-mbid", tmp_path / "Abbey_Road") def test_mb_hit_auto_rip_multi_disc(self, tmp_path: Path) -> None: """MB-Treffer mit 2 Discs → 2x Disc-Insert-Prompt, album.json aus MB.""" @@ -373,10 +384,14 @@ class TestInteractiveRipEanFirst: "n", # next album? ] config = RipperConfig(output_dir=tmp_path) + sp = self._scanner_patches() with ( + sp[0], sp[1], patch("musiksammlung.ripper.rip_disc", return_value=(tmp_path, None, _CDDB_TRACKS)), patch("builtins.input", side_effect=iter(inputs)), - patch("musiksammlung.ripper.lookup_by_barcode", return_value=_MB_ALBUM_2DISC), + patch("musiksammlung.ripper.lookup_by_barcode", + return_value=(_MB_ALBUM_2DISC, "fake-mbid-2")), + patch("musiksammlung.ripper.download_caa_covers"), ): interactive_rip(config) @@ -397,10 +412,14 @@ class TestInteractiveRipEanFirst: "n", # next album? ] config = RipperConfig(output_dir=tmp_path) + sp = self._scanner_patches() with ( + sp[0], sp[1], patch("musiksammlung.ripper.rip_disc", return_value=(tmp_path, None, _CDDB_TRACKS)), patch("builtins.input", side_effect=iter(inputs)), - patch("musiksammlung.ripper.lookup_by_barcode", return_value=_MB_ALBUM) as mock_lookup, + patch("musiksammlung.ripper.lookup_by_barcode", + return_value=(_MB_ALBUM, "fake-mbid")) as mock_lookup, + patch("musiksammlung.ripper.download_caa_covers"), ): interactive_rip(config) @@ -419,10 +438,13 @@ class TestInteractiveRipEanFirst: (disc_dir / "track01.flac").touch() (disc_dir / "track02.flac").touch() + sp = self._scanner_patches() with ( + sp[0], sp[1], patch("musiksammlung.ripper.rip_disc", return_value=(disc_dir, None, _CDDB_TRACKS)), patch("builtins.input", side_effect=iter(inputs)), - patch("musiksammlung.ripper.lookup_by_barcode", return_value=_MB_ALBUM), + patch("musiksammlung.ripper.lookup_by_barcode", return_value=(_MB_ALBUM, "fake-mbid")), + patch("musiksammlung.ripper.download_caa_covers"), ): interactive_rip(config) @@ -430,7 +452,31 @@ class TestInteractiveRipEanFirst: # Dateien existieren schon vorher, rename findet in _rename_files statt assert (tmp_path / "Abbey_Road" / "album.json").exists() - # ── Fallback (kein MB-Treffer / EAN leer) ── + # ── Gemeinsame Scanner-Patches (alle interactive_rip-Tests) ── + # + # Ab EAN-Prompt startet interactive_rip immer einen ScannerServer und + # nutzt _input_or_scan. Beide werden in allen Tests gemockt. + + @staticmethod + def _scanner_patches(): + """Patches für ScannerServer und _input_or_scan (alle interactive_rip-Tests).""" + mock_scanner = MagicMock() + mock_scanner.url.return_value = "http://127.0.0.1:8765" + mock_scanner.get_photo.return_value = None + return [ + patch("musiksammlung.ripper.ScannerServer", return_value=mock_scanner), + patch( + "musiksammlung.ripper._input_or_scan", + side_effect=lambda prompt, scanner: (input(prompt), None), + ), + ] + + @staticmethod + def _fallback_patches(inputs: list[str]): + """Gemeinsame Patches für Fallback-Tests.""" + patches = TestInteractiveRipEanFirst._scanner_patches() + patches.append(patch("builtins.input", side_effect=iter(inputs))) + return patches def test_ean_empty_falls_back_to_album_prompt(self, tmp_path: Path) -> None: """Leere EAN → Fallback: Albumname wird abgefragt.""" @@ -443,9 +489,10 @@ class TestInteractiveRipEanFirst: "n", # next album? ] config = RipperConfig(output_dir=tmp_path) + patches = self._fallback_patches(inputs) with ( + patches[0], patches[1], patches[2], patch("musiksammlung.ripper.rip_disc", return_value=(tmp_path, None, _CDDB_TRACKS)), - patch("builtins.input", side_effect=iter(inputs)), patch("musiksammlung.ripper.lookup_by_barcode") as mock_lookup, ): interactive_rip(config) @@ -465,15 +512,16 @@ class TestInteractiveRipEanFirst: "n", # next album? ] config = RipperConfig(output_dir=tmp_path) + patches = self._fallback_patches(inputs) with ( + patches[0], patches[1], patches[2], patch("musiksammlung.ripper.rip_disc", return_value=(tmp_path, None, _CDDB_TRACKS)), - patch("builtins.input", side_effect=iter(inputs)), patch( "musiksammlung.ripper.lookup_by_barcode", side_effect=ValueError("Kein MusicBrainz-Eintrag"), ), ): - interactive_rip(config) # darf nicht werfen + interactive_rip(config) json_path = tmp_path / "Abbey_Road" / "album.json" assert json_path.exists() @@ -489,9 +537,10 @@ class TestInteractiveRipEanFirst: "n", # next album? ] config = RipperConfig(output_dir=tmp_path) + patches = self._fallback_patches(inputs) with ( + patches[0], patches[1], patches[2], patch("musiksammlung.ripper.rip_disc", return_value=(tmp_path, None, _CDDB_TRACKS)), - patch("builtins.input", side_effect=iter(inputs)), patch("musiksammlung.ripper.lookup_by_barcode"), ): interactive_rip(config) @@ -514,9 +563,10 @@ class TestInteractiveRipEanFirst: "n", # next album? ] config = RipperConfig(output_dir=tmp_path) + patches = self._fallback_patches(inputs) with ( + patches[0], patches[1], patches[2], patch("musiksammlung.ripper.rip_disc", return_value=(tmp_path, None, _CDDB_TRACKS)), - patch("builtins.input", side_effect=iter(inputs)), patch("musiksammlung.ripper.lookup_by_barcode"), ): interactive_rip(config)