EAN-first workflow in interactive_rip + GnuDB DYEAR/DGENRE parsing

EAN is now asked before the album name. On MusicBrainz hit, the ripper
enters an auto-rip flow (no album name prompt, no CDDB confirm, disc
count from MB data). On miss/empty EAN, the previous fallback flow
(album name → CDDB confirm) is preserved.

GnuDB responses now parse DYEAR and DGENRE fields into a new CddbResult
NamedTuple. Album model gains an optional genre field.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Dieter Schlüter 2026-02-19 00:21:42 +01:00
commit 8b449493cd
5 changed files with 510 additions and 196 deletions

View file

@ -10,6 +10,7 @@ import logging
import random
import subprocess
import time
from typing import NamedTuple
import httpx
@ -17,6 +18,16 @@ from musiksammlung.models import TrackInfo
logger = logging.getLogger(__name__)
class CddbResult(NamedTuple):
"""Ergebnis eines GnuDB/CDDB-Lookups mit Album-Metadaten."""
tracks: list[TrackInfo]
artist: str # Album-Artist aus DTITLE
album: str # Album-Titel aus DTITLE
year: int | None # DYEAR (kann fehlen)
genre: str # DGENRE (leer wenn nicht vorhanden)
_GNUDB_URL = "https://gnudb.gnudb.org/~cddb/cddb.cgi"
_HELLO = "musiksammlung+localhost+musiksammlung+0.1"
_RATE_SLEEP = 1.1
@ -101,11 +112,11 @@ def _query_gnudb(discid_line: str) -> tuple[str, str] | None:
return None
def _read_gnudb(category: str, discid: str) -> list[TrackInfo] | None:
def _read_gnudb(category: str, discid: str) -> CddbResult | None:
"""Liest Trackliste einer CDDB-Kategorie/Disc-ID von GnuDB.
Returns:
Liste von TrackInfo oder None bei Fehler.
CddbResult mit Tracks und Album-Metadaten, oder None bei Fehler.
"""
time.sleep(_RATE_SLEEP)
response = httpx.get(
@ -128,6 +139,8 @@ def _read_gnudb(category: str, discid: str) -> list[TrackInfo] | None:
# xmcd-Format parsen
dtitle = ""
dyear = ""
dgenre = ""
ttitles: dict[int, str] = {}
for line in lines[1:]:
@ -135,15 +148,30 @@ def _read_gnudb(category: str, discid: str) -> list[TrackInfo] | None:
continue
if line.startswith("DTITLE="):
dtitle = line[7:].strip()
elif line.startswith("DYEAR="):
dyear = line[6:].strip()
elif line.startswith("DGENRE="):
dgenre = line[7:].strip()
elif line.startswith("TTITLE"):
eq = line.index("=")
idx = int(line[6:eq])
ttitles[idx] = line[eq + 1:].strip()
# Künstler aus "Artist / Title" extrahieren
# Künstler und Album aus "Artist / Title" extrahieren
album_artist = ""
album_title = dtitle
if " / " in dtitle:
album_artist = dtitle.split(" / ", 1)[0].strip()
album_artist, album_title = dtitle.split(" / ", 1)
album_artist = album_artist.strip()
album_title = album_title.strip()
# DYEAR in int konvertieren
year: int | None = None
if dyear:
try:
year = int(dyear)
except ValueError:
pass
if not ttitles:
return None
@ -163,14 +191,20 @@ def _read_gnudb(category: str, discid: str) -> list[TrackInfo] | None:
title=title,
))
logger.info("GnuDB: %d Tracks für '%s' geladen", len(tracks), dtitle)
return tracks
logger.info("GnuDB: %d Tracks für '%s' geladen (year=%s, genre=%s)", len(tracks), dtitle, year, dgenre)
return CddbResult(
tracks=tracks,
artist=album_artist,
album=album_title,
year=year,
genre=dgenre,
)
def lookup_by_discid(
discid_line: str,
retries: int = 3,
) -> list[TrackInfo] | None:
) -> CddbResult | None:
"""Sucht Trackliste auf GnuDB anhand des Disc-Fingerprints.
Bei leerem Ergebnis wird bis zu `retries`-mal mit zufälliger Pause
@ -181,7 +215,7 @@ def lookup_by_discid(
retries: Anzahl Wiederholungsversuche
Returns:
Liste von TrackInfo oder None wenn kein Treffer.
CddbResult mit Tracks und Album-Metadaten, oder None wenn kein Treffer.
"""
for attempt in range(retries + 1):
logger.info(

View file

@ -32,6 +32,7 @@ class Album(BaseModel):
artist: str
album: str = ""
year: int | None = None
genre: str | None = None
discs: list[Disc]
@field_validator("album", "artist", mode="before")

View file

@ -9,7 +9,7 @@ from pathlib import Path
from pydantic import BaseModel
from musiksammlung.cddb import get_discid, lookup_by_discid
from musiksammlung.cddb import CddbResult, get_discid, lookup_by_discid
from musiksammlung.config import AudioFormat
from musiksammlung.models import Album as AlbumModel
from musiksammlung.models import Disc as DiscModel
@ -427,8 +427,9 @@ def _rip_with_abcde(
# Fallback: GnuDB direkt anfragen (mit Retries + Zufallspause)
if discid_line:
print(" GnuDB-Fallback: direkter Lookup mit Retries...", flush=True)
tracks = lookup_by_discid(discid_line) or None
if tracks:
cddb_result = lookup_by_discid(discid_line)
if cddb_result:
tracks = cddb_result.tracks
print(f" GnuDB: {len(tracks)} Tracks gefunden", flush=True)
else:
print(" GnuDB: kein Treffer.", flush=True)
@ -492,6 +493,11 @@ def rip_disc(
def interactive_rip(config: RipperConfig) -> None:
"""Interactive rip workflow for multiple CDs.
EAN-First Flow:
1. EAN abfragen MusicBrainz-Lookup
2. Bei Treffer: Auto-Rip (Disc-Loop aus MB-Daten, kein Albumname nötig)
3. Kein Treffer: Fallback (Albumname fragen, CDDB-Confirm wie bisher)
Files are placed under config.output_dir:
Album_Name/CD1/01_-_title_-_artist.flac, ...
@ -515,137 +521,193 @@ def interactive_rip(config: RipperConfig) -> None:
while True:
print(f"\n--- Album {album_counter} ---")
raw = input("Album name (Enter = CDDB name / default 'Album{N}'): ")
album_name = _clean_input(raw)
if not album_name:
album_name = f"Album{album_counter}"
# Optional: EAN/Barcode für MusicBrainz-Lookup
raw_ean = input("EAN/Barcode für MusicBrainz (Enter = überspringen): ")
# ── EAN zuerst abfragen ──
raw_ean = input("EAN/Barcode (Enter = überspringen): ")
ean = _clean_input(raw_ean)
mb_album: AlbumModel | None = None
if ean:
try:
print(f" MusicBrainz-Suche nach Barcode {ean} ...", flush=True)
mb_album = lookup_by_barcode(ean)
total_tracks = sum(len(d.tracks) for d in mb_album.discs)
print(
f"{mb_album.artist} {mb_album.album}"
f" ({mb_album.year or '?'},"
f" {sum(len(d.tracks) for d in mb_album.discs)} Tracks)",
f" {len(mb_album.discs)} Disc(s),"
f" {total_tracks} Tracks)",
flush=True,
)
# Albumnamen aus MusicBrainz übernehmen, wenn nicht manuell gesetzt
if album_name == f"Album{album_counter}":
album_name = mb_album.album or album_name
except Exception as e:
print(f" MusicBrainz: kein Treffer — {e}", flush=True)
disc_counter = 1
all_discs: list[DiscModel] = []
if mb_album:
# ── Auto-Rip: MusicBrainz-Treffer ──
album_name = mb_album.album or f"Album{album_counter}"
total_discs = len(mb_album.discs)
while True:
print(f"\n Album: {album_name}")
print(f" CD Drive: {config.device}")
raw_disc = input(f" CD number [{disc_counter}]: ")
disc_num = int(_clean_input(raw_disc)) if _clean_input(raw_disc) else disc_counter
disc_dir = (
config.output_dir
/ _sanitize_name(album_name)
/ f"CD{disc_num}"
)
print(f"\n Ripping to: {disc_dir}")
print(" " + "-" * 50)
try:
_, cddb_album, tracks = rip_disc(
device=config.device,
output_dir=disc_dir,
audio_format=config.audio_format,
quality=config.quality,
parallel_jobs=config.parallel_jobs,
use_pipes=config.use_pipes,
use_cddb=config.use_cddb,
rename=False,
for disc in mb_album.discs:
disc_num = disc.disc_number
disc_dir = (
config.output_dir
/ _sanitize_name(album_name)
/ f"CD{disc_num}"
)
print("\n " + "-" * 50)
if tracks:
label = f"'{cddb_album}'" if cddb_album else "?"
print(f" CDDB-Treffer: {label}{len(tracks)} Tracks:")
for t in tracks:
print(f" {t.track_number:2d}. {t.title} [{t.artist}]")
input(
f"\n CD {disc_num}/{total_discs} einlegen und Enter drücken "
f"({len(disc.tracks)} Tracks) ..."
)
raw_ok = input("\n Treffer korrekt? (j/n) [j]: ")
if _clean_input(raw_ok).lower() not in ("n", "no", "nein"):
print(" Umbenennen ...", flush=True)
print(f" Ripping to: {disc_dir}")
print(" " + "-" * 50)
try:
_, _cddb_album, tracks = rip_disc(
device=config.device,
output_dir=disc_dir,
audio_format=config.audio_format,
quality=config.quality,
parallel_jobs=config.parallel_jobs,
use_pipes=config.use_pipes,
use_cddb=config.use_cddb,
rename=False,
)
# Rename mit CDDB-Tracks (lokale Dateinamen), falls vorhanden
if tracks:
print(" Umbenennen (CDDB-Daten) ...", flush=True)
_rename_files(disc_dir, tracks, config.audio_format)
all_discs.append(DiscModel(
disc_number=disc_num,
tracks=[
TrackModel(
track_number=t.track_number,
title=t.title,
artist=t.artist,
)
for t in tracks
],
))
else:
print(
" CDDB-Daten verworfen — Dateien bleiben als track01.flac",
flush=True,
)
tracks = None
else:
print(" ✓ Fertig (keine CDDB-Daten)")
print(" ✓ Fertig (keine CDDB-Daten für Rename)")
except RuntimeError as e:
print(f"\n ✗ Error: {e}")
raw_retry = input(" Try again? (y/n): ")
if _clean_input(raw_retry).lower() != "y":
print(" Aborting disc.")
except RuntimeError as e:
print(f"\n ✗ Error: {e}")
raw_retry = input(" Nochmal versuchen? (j/n): ")
if _clean_input(raw_retry).lower() in ("j", "ja", "y", "yes"):
# Gleiche Disc nochmal — aber wir können im for-loop
# nicht einfach zurückspringen, daher Hinweis
print(" Bitte Album neu starten.")
break
continue
raw_next = input("\n Next CD for this album? (y/n): ")
if _clean_input(raw_next).lower() != "y":
break
disc_counter += 1
# CDDB-Albumname übernehmen, wenn der User nur den Default verwendet hat
if cddb_album and album_name == f"Album{album_counter}":
# CDDB DTITLE: "Artist / Album" → Album extrahieren
if " / " in cddb_album:
_, cddb_title = cddb_album.split(" / ", 1)
else:
cddb_title = cddb_album
album_name = cddb_title.strip()
# album_root = tatsächliches Elternverzeichnis der CD-Ordner
album_root = disc_dir.parent
if mb_album:
# MusicBrainz-Daten haben Priorität (inkl. Jahr, kuratierte Titel)
album_model = mb_album
elif all_discs:
artist = all_discs[0].tracks[0].artist or ""
album_model = AlbumModel(artist=artist, album=album_name, discs=all_discs)
else:
album_model = None
if album_model is not None:
# album.json aus MusicBrainz-Daten schreiben
album_root = (
config.output_dir / _sanitize_name(album_name)
)
album_root.mkdir(parents=True, exist_ok=True)
json_path = album_root / "album.json"
json_path.write_text(
album_model.model_dump_json(indent=2), encoding="utf-8"
mb_album.model_dump_json(indent=2), encoding="utf-8"
)
print(f"\n album.json gespeichert: {json_path}")
print(" → Weiter mit: musiksammlung apply <album-verzeichnis> album.json")
else:
# ── Fallback: kein MusicBrainz-Treffer ──
raw = input("Album name (Enter = CDDB name / default 'Album{N}'): ")
album_name = _clean_input(raw)
if not album_name:
album_name = f"Album{album_counter}"
disc_counter = 1
all_discs: list[DiscModel] = []
cddb_album: str | None = None
while True:
print(f"\n Album: {album_name}")
print(f" CD Drive: {config.device}")
raw_disc = input(f" CD number [{disc_counter}]: ")
disc_num = int(_clean_input(raw_disc)) if _clean_input(raw_disc) else disc_counter
disc_dir = (
config.output_dir
/ _sanitize_name(album_name)
/ f"CD{disc_num}"
)
print(f"\n Ripping to: {disc_dir}")
print(" " + "-" * 50)
try:
_, cddb_album, tracks = rip_disc(
device=config.device,
output_dir=disc_dir,
audio_format=config.audio_format,
quality=config.quality,
parallel_jobs=config.parallel_jobs,
use_pipes=config.use_pipes,
use_cddb=config.use_cddb,
rename=False,
)
print("\n " + "-" * 50)
if tracks:
label = f"'{cddb_album}'" if cddb_album else "?"
print(f" CDDB-Treffer: {label}{len(tracks)} Tracks:")
for t in tracks:
print(f" {t.track_number:2d}. {t.title} [{t.artist}]")
raw_ok = input("\n Treffer korrekt? (j/n) [j]: ")
if _clean_input(raw_ok).lower() not in ("n", "no", "nein"):
print(" Umbenennen ...", flush=True)
_rename_files(disc_dir, tracks, config.audio_format)
all_discs.append(DiscModel(
disc_number=disc_num,
tracks=[
TrackModel(
track_number=t.track_number,
title=t.title,
artist=t.artist,
)
for t in tracks
],
))
else:
print(
" CDDB-Daten verworfen — Dateien bleiben als track01.flac",
flush=True,
)
tracks = None
else:
print(" ✓ Fertig (keine CDDB-Daten)")
except RuntimeError as e:
print(f"\n ✗ Error: {e}")
raw_retry = input(" Try again? (y/n): ")
if _clean_input(raw_retry).lower() != "y":
print(" Aborting disc.")
break
continue
raw_next = input("\n Next CD for this album? (y/n): ")
if _clean_input(raw_next).lower() != "y":
break
disc_counter += 1
# CDDB-Albumname übernehmen, wenn der User nur den Default verwendet hat
if cddb_album and album_name == f"Album{album_counter}":
# CDDB DTITLE: "Artist / Album" → Album extrahieren
if " / " in cddb_album:
_, cddb_title = cddb_album.split(" / ", 1)
else:
cddb_title = cddb_album
album_name = cddb_title.strip()
# album_root = tatsächliches Elternverzeichnis der CD-Ordner
album_root = disc_dir.parent
if all_discs:
artist = all_discs[0].tracks[0].artist or ""
album_model = AlbumModel(artist=artist, album=album_name, discs=all_discs)
album_root.mkdir(parents=True, exist_ok=True)
json_path = album_root / "album.json"
json_path.write_text(
album_model.model_dump_json(indent=2), encoding="utf-8"
)
print(f"\n album.json gespeichert: {json_path}")
print(" → Weiter mit: musiksammlung apply <album-verzeichnis> album.json")
raw_album = input("\nNext album? (y/n): ")
if _clean_input(raw_album).lower() != "y":
break

170
tests/test_cddb.py Normal file
View file

@ -0,0 +1,170 @@
"""Tests für cddb.py — CddbResult, DYEAR/DGENRE-Parsing."""
from unittest.mock import MagicMock, patch
import httpx
from musiksammlung.cddb import CddbResult, _read_gnudb, lookup_by_discid
def _make_xmcd_response(
dtitle: str = "Artist / Album Title",
dyear: str = "",
dgenre: str = "",
ttitles: dict[int, str] | None = None,
) -> str:
"""Baut eine GnuDB-xmcd-Antwort zusammen."""
lines = ["210 OK"]
lines.append(f"DTITLE={dtitle}")
if dyear:
lines.append(f"DYEAR={dyear}")
if dgenre:
lines.append(f"DGENRE={dgenre}")
if ttitles is None:
ttitles = {0: "Track One", 1: "Track Two"}
for idx, title in sorted(ttitles.items()):
lines.append(f"TTITLE{idx}={title}")
lines.append(".")
return "\n".join(lines)
class TestReadGnudbCddbResult:
"""Tests für _read_gnudb mit CddbResult-Rückgabe."""
def _call(self, xmcd_text: str) -> CddbResult | None:
"""Ruft _read_gnudb mit gemockter HTTP-Antwort auf."""
mock_response = MagicMock()
mock_response.text = xmcd_text
mock_response.raise_for_status = MagicMock()
with patch("musiksammlung.cddb.httpx.get", return_value=mock_response):
with patch("musiksammlung.cddb.time.sleep"):
return _read_gnudb("rock", "ab0c1d0e")
def test_basic_result_fields(self) -> None:
"""CddbResult enthält Artist, Album, Tracks."""
text = _make_xmcd_response(dtitle="Beatles / Abbey Road")
result = self._call(text)
assert result is not None
assert isinstance(result, CddbResult)
assert result.artist == "Beatles"
assert result.album == "Abbey Road"
assert len(result.tracks) == 2
assert result.tracks[0].track_number == 1
assert result.tracks[0].title == "Track One"
def test_dyear_parsed(self) -> None:
"""DYEAR wird als int geparst."""
text = _make_xmcd_response(dyear="1969")
result = self._call(text)
assert result is not None
assert result.year == 1969
def test_dyear_empty(self) -> None:
"""Kein DYEAR → year=None."""
text = _make_xmcd_response(dyear="")
result = self._call(text)
assert result is not None
assert result.year is None
def test_dyear_invalid(self) -> None:
"""Ungültiges DYEAR → year=None."""
text = _make_xmcd_response(dyear="unknown")
result = self._call(text)
assert result is not None
assert result.year is None
def test_dgenre_parsed(self) -> None:
"""DGENRE wird übernommen."""
text = _make_xmcd_response(dgenre="Classical")
result = self._call(text)
assert result is not None
assert result.genre == "Classical"
def test_dgenre_empty(self) -> None:
"""Kein DGENRE → genre=''."""
text = _make_xmcd_response()
result = self._call(text)
assert result is not None
assert result.genre == ""
def test_dyear_and_dgenre_together(self) -> None:
"""Beide Felder gleichzeitig."""
text = _make_xmcd_response(
dtitle="Karajan / Beethoven Sinfonien",
dyear="1985",
dgenre="Classical",
)
result = self._call(text)
assert result is not None
assert result.artist == "Karajan"
assert result.album == "Beethoven Sinfonien"
assert result.year == 1985
assert result.genre == "Classical"
def test_no_ttitles_returns_none(self) -> None:
"""Keine TTITLEs → None."""
text = _make_xmcd_response(ttitles={})
result = self._call(text)
assert result is None
def test_dtitle_without_slash(self) -> None:
"""DTITLE ohne ' / ' → artist leer, album = gesamter DTITLE."""
text = _make_xmcd_response(dtitle="Just An Album Name")
result = self._call(text)
assert result is not None
assert result.artist == ""
assert result.album == "Just An Album Name"
def test_bad_status_code_returns_none(self) -> None:
"""Unerwarteter Statuscode → None."""
mock_response = MagicMock()
mock_response.text = "401 permission denied\n"
mock_response.raise_for_status = MagicMock()
with patch("musiksammlung.cddb.httpx.get", return_value=mock_response):
with patch("musiksammlung.cddb.time.sleep"):
result = _read_gnudb("rock", "ab0c1d0e")
assert result is None
class TestLookupByDiscidCddbResult:
"""Tests dass lookup_by_discid CddbResult zurückgibt."""
def test_returns_cddb_result(self) -> None:
"""Erfolgreicher Lookup liefert CddbResult."""
expected = CddbResult(
tracks=[],
artist="Test",
album="Album",
year=2000,
genre="Pop",
)
with (
patch("musiksammlung.cddb._query_gnudb", return_value=("rock", "ab0c1d0e")),
patch("musiksammlung.cddb._read_gnudb", return_value=expected),
):
result = lookup_by_discid("ab0c1d0e 2 150 1000 200")
assert result is expected
def test_returns_none_on_no_match(self) -> None:
"""Kein Treffer → None."""
with (
patch("musiksammlung.cddb._query_gnudb", return_value=None),
patch("musiksammlung.cddb.time.sleep"),
patch("musiksammlung.cddb.random.uniform", return_value=0.01),
):
result = lookup_by_discid("ab0c1d0e 2 150 1000 200", retries=0)
assert result is None

View file

@ -292,7 +292,7 @@ class TestRenameFiles:
# ---------------------------------------------------------------------------
# interactive_rip EAN/Barcode-Integration
# interactive_rip EAN-First Workflow
# ---------------------------------------------------------------------------
_MB_ALBUM = Album(
@ -310,87 +310,43 @@ _MB_ALBUM = Album(
],
)
_MB_ALBUM_2DISC = Album(
artist="The Beatles",
album="White Album",
year=1968,
discs=[
Disc(disc_number=1, tracks=[
Track(track_number=1, title="Back in the U.S.S.R."),
]),
Disc(disc_number=2, tracks=[
Track(track_number=1, title="Birthday"),
]),
],
)
_CDDB_TRACKS = [
TrackInfo(1, "The Beatles", "Come Together"),
TrackInfo(2, "The Beatles", "Something"),
]
def _make_rip_disc_mock(tracks: list[TrackInfo] | None = None):
"""Erstellt ein Mock für rip_disc, das sofort zurückgibt."""
mock = MagicMock(return_value=(Path("/tmp/disc"), "Album1", tracks or []))
return mock
class TestInteractiveRipEanFirst:
"""Tests für EAN-First Workflow in interactive_rip.
Neuer Flow:
1. EAN abfragen
2. MB-Treffer Auto-Rip (kein Albumname, kein CDDB-Confirm)
3. Kein Treffer Fallback (Albumname, CDDB-Confirm wie bisher)
"""
class TestInteractiveRipBarcode:
"""Tests für EAN/Barcode-Abfrage in interactive_rip."""
# ── Auto-Rip (MusicBrainz-Treffer) ──
def _run(self, tmp_path: Path, inputs: list[str], mb_album=None, mb_error=None):
"""Führt interactive_rip mit gemocktem I/O aus."""
config = RipperConfig(output_dir=tmp_path)
input_iter = iter(inputs)
with (
patch("musiksammlung.ripper.rip_disc", return_value=(tmp_path, None, _CDDB_TRACKS)),
patch("builtins.input", side_effect=input_iter),
patch(
"musiksammlung.ripper.lookup_by_barcode",
side_effect=mb_error if mb_error else (lambda _: mb_album) if mb_album else None,
) as mock_lookup,
):
interactive_rip(config)
return mock_lookup
def test_ean_skipped_does_not_call_musicbrainz(self, tmp_path: Path) -> None:
"""Kein EAN → lookup_by_barcode wird nicht aufgerufen."""
def test_mb_hit_auto_rip_single_disc(self, tmp_path: Path) -> None:
"""MB-Treffer → Auto-Rip: eine Disc, kein Albumname-Prompt, album.json aus MB."""
inputs = [
"Abbey Road", # album name
"", # EAN: leer → überspringen
"1", # disc number
"j", # CDDB korrekt?
"n", # next CD?
"n", # next album?
]
config = RipperConfig(output_dir=tmp_path)
with (
patch("musiksammlung.ripper.rip_disc", return_value=(tmp_path, None, _CDDB_TRACKS)),
patch("builtins.input", side_effect=iter(inputs)),
patch("musiksammlung.ripper.lookup_by_barcode") as mock_lookup,
):
interactive_rip(config)
mock_lookup.assert_not_called()
def test_ean_triggers_musicbrainz_lookup(self, tmp_path: Path) -> None:
"""EAN eingegeben → lookup_by_barcode wird mit der EAN aufgerufen."""
inputs = [
"Abbey Road", # album name
"0602557360561", # EAN
"1", # disc number
"j", # CDDB korrekt?
"n", # next CD?
"n", # next album?
]
config = RipperConfig(output_dir=tmp_path)
with (
patch("musiksammlung.ripper.rip_disc", return_value=(tmp_path, None, _CDDB_TRACKS)),
patch("builtins.input", side_effect=iter(inputs)),
patch("musiksammlung.ripper.lookup_by_barcode", return_value=_MB_ALBUM) as mock_lookup,
):
interactive_rip(config)
mock_lookup.assert_called_once_with("0602557360561")
def test_musicbrainz_data_saved_to_json(self, tmp_path: Path) -> None:
"""MusicBrainz-Daten werden in album.json gespeichert."""
inputs = [
"", # album name: leer → Default
"0602557360561", # EAN
"1", # disc number
"j", # CDDB korrekt?
"n", # next CD?
"n", # next album?
"0602557360561", # EAN → MB-Treffer
"", # "CD 1/1 einlegen und Enter drücken"
"n", # next album?
]
config = RipperConfig(output_dir=tmp_path)
with (
@ -408,15 +364,105 @@ class TestInteractiveRipBarcode:
assert data["album"] == "Abbey Road"
assert data["year"] == 1969
def test_musicbrainz_failure_falls_back_to_cddb(self, tmp_path: Path) -> None:
"""MusicBrainz-Fehler → CDDB-Daten werden verwendet, kein Absturz."""
def test_mb_hit_auto_rip_multi_disc(self, tmp_path: Path) -> None:
"""MB-Treffer mit 2 Discs → 2x Disc-Insert-Prompt, album.json aus MB."""
inputs = [
"Abbey Road", # album name
"0000000000000", # EAN (kein Treffer)
"1", # disc number
"j", # CDDB korrekt?
"n", # next CD?
"n", # next album?
"1234567890123", # EAN → MB-Treffer (2 Discs)
"", # "CD 1/2 einlegen und Enter drücken"
"", # "CD 2/2 einlegen und Enter drücken"
"n", # next album?
]
config = RipperConfig(output_dir=tmp_path)
with (
patch("musiksammlung.ripper.rip_disc", return_value=(tmp_path, None, _CDDB_TRACKS)),
patch("builtins.input", side_effect=iter(inputs)),
patch("musiksammlung.ripper.lookup_by_barcode", return_value=_MB_ALBUM_2DISC),
):
interactive_rip(config)
json_path = tmp_path / "White_Album" / "album.json"
assert json_path.exists()
import json
data = json.loads(json_path.read_text())
assert data["artist"] == "The Beatles"
assert data["album"] == "White Album"
assert data["year"] == 1968
assert len(data["discs"]) == 2
def test_mb_hit_triggers_lookup(self, tmp_path: Path) -> None:
"""EAN eingegeben → lookup_by_barcode wird aufgerufen."""
inputs = [
"0602557360561", # EAN
"", # disc insert
"n", # next album?
]
config = RipperConfig(output_dir=tmp_path)
with (
patch("musiksammlung.ripper.rip_disc", return_value=(tmp_path, None, _CDDB_TRACKS)),
patch("builtins.input", side_effect=iter(inputs)),
patch("musiksammlung.ripper.lookup_by_barcode", return_value=_MB_ALBUM) as mock_lookup,
):
interactive_rip(config)
mock_lookup.assert_called_once_with("0602557360561")
def test_mb_hit_renames_with_cddb_tracks(self, tmp_path: Path) -> None:
"""Auto-Rip nutzt CDDB-Tracks zum Umbenennen der Dateien."""
inputs = [
"0602557360561",
"", # disc insert
"n",
]
config = RipperConfig(output_dir=tmp_path)
disc_dir = tmp_path / "Abbey_Road" / "CD1"
disc_dir.mkdir(parents=True)
(disc_dir / "track01.flac").touch()
(disc_dir / "track02.flac").touch()
with (
patch("musiksammlung.ripper.rip_disc", return_value=(disc_dir, None, _CDDB_TRACKS)),
patch("builtins.input", side_effect=iter(inputs)),
patch("musiksammlung.ripper.lookup_by_barcode", return_value=_MB_ALBUM),
):
interactive_rip(config)
# _rename_files wurde aufgerufen (tracks vorhanden)
# Dateien existieren schon vorher, rename findet in _rename_files statt
assert (tmp_path / "Abbey_Road" / "album.json").exists()
# ── Fallback (kein MB-Treffer / EAN leer) ──
def test_ean_empty_falls_back_to_album_prompt(self, tmp_path: Path) -> None:
"""Leere EAN → Fallback: Albumname wird abgefragt."""
inputs = [
"", # EAN: leer → überspringen
"Abbey Road", # album name (Fallback-Flow)
"1", # disc number
"j", # CDDB korrekt?
"n", # next CD?
"n", # next album?
]
config = RipperConfig(output_dir=tmp_path)
with (
patch("musiksammlung.ripper.rip_disc", return_value=(tmp_path, None, _CDDB_TRACKS)),
patch("builtins.input", side_effect=iter(inputs)),
patch("musiksammlung.ripper.lookup_by_barcode") as mock_lookup,
):
interactive_rip(config)
mock_lookup.assert_not_called()
json_path = tmp_path / "Abbey_Road" / "album.json"
assert json_path.exists()
def test_mb_miss_falls_back_to_album_prompt(self, tmp_path: Path) -> None:
"""MB-Fehler → Fallback: Albumname wird abgefragt, CDDB-Daten genutzt."""
inputs = [
"0000000000000", # EAN (kein Treffer)
"Abbey Road", # album name (Fallback-Flow)
"1", # disc number
"j", # CDDB korrekt?
"n", # next CD?
"n", # next album?
]
config = RipperConfig(output_dir=tmp_path)
with (
@ -429,53 +475,54 @@ class TestInteractiveRipBarcode:
):
interactive_rip(config) # darf nicht werfen
# CDDB-basierte album.json wurde erstellt
json_path = tmp_path / "Abbey_Road" / "album.json"
assert json_path.exists()
def test_album_name_taken_from_musicbrainz_when_default(self, tmp_path: Path) -> None:
"""Albumnamen wird von MusicBrainz übernommen wenn kein Name manuell eingegeben."""
def test_fallback_cddb_confirm_rejected(self, tmp_path: Path) -> None:
"""Fallback: CDDB-Daten abgelehnt → keine album.json."""
inputs = [
"", # album name: leer → Default (Album1)
"0602557360561", # EAN
"1", # disc number
"j", # CDDB korrekt?
"n",
"n",
"", # EAN: leer
"TestAlbum", # album name
"1", # disc number
"n", # CDDB korrekt? → nein
"n", # next CD?
"n", # next album?
]
config = RipperConfig(output_dir=tmp_path)
with (
patch("musiksammlung.ripper.rip_disc", return_value=(tmp_path, None, _CDDB_TRACKS)),
patch("builtins.input", side_effect=iter(inputs)),
patch("musiksammlung.ripper.lookup_by_barcode", return_value=_MB_ALBUM),
patch("musiksammlung.ripper.lookup_by_barcode"),
):
interactive_rip(config)
# Verzeichnis und JSON nach MusicBrainz-Namen benannt
assert (tmp_path / "Abbey_Road" / "album.json").exists()
# Keine album.json, da CDDB-Daten verworfen
json_path = tmp_path / "TestAlbum" / "album.json"
assert not json_path.exists()
def test_manual_album_name_kept_when_not_default(self, tmp_path: Path) -> None:
"""Manuell eingegebener Albumname → album.json liegt im manuellen Verzeichnis."""
def test_fallback_multiple_discs(self, tmp_path: Path) -> None:
"""Fallback: Mehrere CDs für ein Album."""
inputs = [
"Mein Album", # manuell eingegebener Name
"0602557360561", # EAN
"1", # disc number
"j", # CDDB korrekt?
"n",
"n",
"", # EAN: leer
"TestAlbum", # album name
"1", # disc 1
"j", # CDDB korrekt?
"y", # next CD? → ja
"2", # disc 2
"j", # CDDB korrekt?
"n", # next CD?
"n", # next album?
]
config = RipperConfig(output_dir=tmp_path)
with (
patch("musiksammlung.ripper.rip_disc", return_value=(tmp_path, None, _CDDB_TRACKS)),
patch("builtins.input", side_effect=iter(inputs)),
patch("musiksammlung.ripper.lookup_by_barcode", return_value=_MB_ALBUM),
patch("musiksammlung.ripper.lookup_by_barcode"),
):
interactive_rip(config)
# album.json liegt im Verzeichnis mit dem manuellen Namen (dort liegen die Dateien),
# Inhalt stammt aber aus MusicBrainz
json_path = tmp_path / "Mein_Album" / "album.json"
json_path = tmp_path / "TestAlbum" / "album.json"
assert json_path.exists()
import json
data = json.loads(json_path.read_text())
assert data["artist"] == "The Beatles"
assert len(data["discs"]) == 2