Musiksammlung/tests/test_ripper.py
dschlueter 32c84b9edb Add phone-based EAN scanning, scanner server for cover upload, Vision-LLM integration
New features:
- EAN/Barcode can now be entered by typing or by photographing the CD sleeve;
  Vision-LLM (extract_barcode_from_image) reads the barcode from the photo
- Scanner server (port 8765) starts at the beginning of every album loop,
  serving both EAN barcode scanning and back cover upload via QR code
- Vision-LLM analyses back cover in background thread while ripping;
  priority: Vision-LLM > MusicBrainz > CDDB
- _find_abcde_mbid reads MBID from abcde temp dirs for CAA cover download
  even when the CD barcode is not linked in MusicBrainz
- Concrete copy-paste apply commands shown after each album in 'Next steps'
- _sanitize_name: whitelist approach (removes brackets and punctuation)
- qrcode added as dependency for terminal QR code display

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-19 14:05:59 +01:00

578 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Tests für den CD-Ripper."""
from pathlib import Path
from unittest.mock import MagicMock, patch
from musiksammlung.config import AudioFormat
from musiksammlung.models import Album, Disc, Track, TrackInfo
from musiksammlung.ripper import (
RipperConfig,
_clean_input,
_extract_tracks,
_parse_cddb_lines,
_parse_grab_tracks,
_rename_files,
_sanitize_name,
interactive_rip,
)
class TestSanitizeName:
"""Tests für _sanitize_name."""
def test_replace_spaces(self) -> None:
assert _sanitize_name("Hello World") == "Hello_World"
assert _sanitize_name("Test Track Title") == "Test_Track_Title"
def test_remove_invalid_chars(self) -> None:
assert _sanitize_name("Test/Track:Name") == "TestTrackName"
assert _sanitize_name('Test<Track>"Name') == "TestTrackName"
assert _sanitize_name("Test|Track?Name*") == "TestTrackName"
assert _sanitize_name("It's_a_Test") == "Its_a_Test"
def test_remove_brackets_and_punctuation(self) -> None:
assert _sanitize_name("Best of (1990)") == "Best_of_1990"
assert _sanitize_name("Hello, World!") == "Hello_World"
assert _sanitize_name("Vol. 2") == "Vol_2"
assert _sanitize_name("Salt & Pepper [Remix]") == "Salt_Pepper_Remix"
assert _sanitize_name("The Best of... (Deluxe Edition)") == "The_Best_of_Deluxe_Edition"
def test_keep_umlauts(self) -> None:
assert _sanitize_name("Grüße aus Österreich") == "Grüße_aus_Österreich"
assert _sanitize_name("Schöne Sänger") == "Schöne_Sänger"
def test_strip_underscores(self) -> None:
assert _sanitize_name("_Test_") == "Test"
assert _sanitize_name(" _Test_ ") == "Test"
class TestCleanInput:
"""Tests für _clean_input."""
def test_strips_whitespace(self) -> None:
assert _clean_input(" hello ") == "hello"
def test_strips_quotes(self) -> None:
assert _clean_input('"Album Name"') == "Album Name"
assert _clean_input("'Album Name'") == "Album Name"
def test_removes_ansi_escape(self) -> None:
# ESC-based sequences (e.g. \x1b[A from arrow keys) are removed
assert _clean_input("\x1b[A word") == "word"
assert _clean_input("\x1b[1;5D text") == "text"
def test_empty_string(self) -> None:
assert _clean_input("") == ""
def test_plain_input_unchanged(self) -> None:
assert _clean_input("Beethoven Sinfonien") == "Beethoven Sinfonien"
class TestParseCddbLines:
"""Tests für _parse_cddb_lines."""
def test_parse_single_track_title_only(self) -> None:
"""Reguläres Album: Zeile ohne ' / ' → artist leer, gesamter Inhalt = Titel."""
lines = ["1: Für Elise"]
tracks = _parse_cddb_lines(lines)
assert len(tracks) == 1
assert tracks[0].track_number == 1
assert tracks[0].artist == ""
assert tracks[0].title == "Für Elise"
def test_parse_regular_multiple_tracks(self) -> None:
"""Mehrere reguläre Tracks werden korrekt geparst."""
lines = [
"1: First Title",
"2: Second Title",
"3: Third Title",
]
tracks = _parse_cddb_lines(lines)
assert len(tracks) == 3
assert tracks[2].track_number == 3
assert tracks[2].artist == ""
assert tracks[2].title == "Third Title"
def test_dash_in_title_not_split(self) -> None:
"""' - ' in klassischen Titeln wird NICHT als Künstler-Separator behandelt."""
lines = ['1: Sonata "Tempest": I. Largo - Allegro']
tracks = _parse_cddb_lines(lines)
assert tracks[0].artist == ""
assert tracks[0].title == 'Sonata "Tempest": I. Largo - Allegro'
def test_ignores_non_matching_lines(self) -> None:
lines = [
"Retrieving CDDB info ...",
"1: Artist - Title",
"Tagging track 1 of 1",
]
tracks = _parse_cddb_lines(lines)
assert len(tracks) == 1
def test_compilation_slash_separator(self) -> None:
"""Kompilations-Format: 'N: Artist / Title' wird korrekt geparst."""
lines = [
"1: Trini Lopez / This Land Is Your Land (live)",
"2: The Foundations / In the Bad Bad Old Days",
]
tracks = _parse_cddb_lines(lines)
assert len(tracks) == 2
assert tracks[0].artist == "Trini Lopez"
assert tracks[0].title == "This Land Is Your Land (live)"
assert tracks[1].artist == "The Foundations"
assert tracks[1].title == "In the Bad Bad Old Days"
def test_empty_input(self) -> None:
assert _parse_cddb_lines([]) == []
class TestParseGrabTracks:
"""Tests für _parse_grab_tracks."""
def test_artist_slash_title(self) -> None:
data = [(1, "Trini Lopez / This Land Is Your Land (live)")]
tracks = _parse_grab_tracks(data)
assert len(tracks) == 1
assert tracks[0].track_number == 1
assert tracks[0].artist == "Trini Lopez"
assert tracks[0].title == "This Land Is Your Land (live)"
def test_title_only_no_slash(self) -> None:
"""Ohne Slash → leerer Künstler, Titel = gesamter String."""
data = [(3, "Beethoven 5. Sinfonie")]
tracks = _parse_grab_tracks(data)
assert tracks[0].artist == ""
assert tracks[0].title == "Beethoven 5. Sinfonie"
def test_multiple_tracks(self) -> None:
data = [
(1, "KC and the Sunshine Band / Give It Up"),
(2, "Sam & Dave / Can't You Find Another Way"),
]
tracks = _parse_grab_tracks(data)
assert len(tracks) == 2
assert tracks[1].artist == "Sam & Dave"
assert tracks[1].title == "Can't You Find Another Way"
def test_empty_input(self) -> None:
assert _parse_grab_tracks([]) == []
class TestRipperConfig:
"""Tests für RipperConfig."""
def test_default_config(self) -> None:
config = RipperConfig()
assert config.device == "/dev/cdrom"
assert config.audio_format == AudioFormat.FLAC
assert config.quality == "high"
assert config.parallel_jobs == 1
assert config.use_pipes is False
assert config.use_cddb is True
def test_custom_config(self) -> None:
config = RipperConfig(
device="/dev/sr0",
audio_format=AudioFormat.MP3,
quality="low",
parallel_jobs=4,
use_pipes=True,
use_cddb=False,
)
assert config.device == "/dev/sr0"
assert config.audio_format == AudioFormat.MP3
assert config.quality == "low"
assert config.parallel_jobs == 4
assert config.use_pipes is True
assert config.use_cddb is False
class TestAudioFormat:
"""Tests für AudioFormat-Methoden."""
def test_abcde_format(self) -> None:
assert AudioFormat.FLAC.get_abcde_format() == "flac"
assert AudioFormat.MP3.get_abcde_format() == "mp3"
assert AudioFormat.OPUS.get_abcde_format() == "opus"
assert AudioFormat.AAC.get_abcde_format() == "m4a"
assert AudioFormat.WAV.get_abcde_format() == "wav"
def test_extension(self) -> None:
assert AudioFormat.FLAC.extension == ".flac"
assert AudioFormat.MP3.extension == ".mp3"
assert AudioFormat.OPUS.extension == ".opus"
assert AudioFormat.AAC.extension == ".aac"
assert AudioFormat.WAV.extension == ".wav"
def test_encoder_options_flac(self) -> None:
assert AudioFormat.FLAC.get_encoder_options("low") == ""
assert AudioFormat.FLAC.get_encoder_options("medium") == ""
assert AudioFormat.FLAC.get_encoder_options("high") == "-8"
def test_encoder_options_mp3(self) -> None:
assert AudioFormat.MP3.get_encoder_options("low") == "-V 7"
assert AudioFormat.MP3.get_encoder_options("medium") == "-V 5"
assert AudioFormat.MP3.get_encoder_options("high") == "-V 0"
def test_encoder_options_opus(self) -> None:
assert AudioFormat.OPUS.get_encoder_options("low") == "-b 96"
assert AudioFormat.OPUS.get_encoder_options("medium") == "-b 128"
assert AudioFormat.OPUS.get_encoder_options("high") == "-b 192"
def test_encoder_options_aac(self) -> None:
assert AudioFormat.AAC.get_encoder_options("low") == "-q:a 2"
assert AudioFormat.AAC.get_encoder_options("medium") == "-q:a 3"
assert AudioFormat.AAC.get_encoder_options("high") == "-q:a 4"
def test_encoder_options_wav(self) -> None:
assert AudioFormat.WAV.get_encoder_options("low") == ""
assert AudioFormat.WAV.get_encoder_options("high") == ""
class TestExtractTracks:
"""Tests für _extract_tracks."""
def test_moves_files_from_subdir(self, tmp_path: Path) -> None:
subdir = tmp_path / "abcde.XXXX"
subdir.mkdir()
(subdir / "track01.flac").touch()
(subdir / "track02.flac").touch()
result = _extract_tracks(tmp_path, AudioFormat.FLAC)
assert len(result) == 2
assert (tmp_path / "track01.flac").exists()
assert (tmp_path / "track02.flac").exists()
# Files no longer in subdir
assert not (subdir / "track01.flac").exists()
def test_leaves_already_flat_files(self, tmp_path: Path) -> None:
(tmp_path / "track01.flac").touch()
result = _extract_tracks(tmp_path, AudioFormat.FLAC)
assert len(result) == 1
def test_ignores_wrong_format(self, tmp_path: Path) -> None:
(tmp_path / "track01.mp3").touch()
result = _extract_tracks(tmp_path, AudioFormat.FLAC)
assert result == []
def test_empty_directory(self, tmp_path: Path) -> None:
result = _extract_tracks(tmp_path, AudioFormat.FLAC)
assert result == []
class TestRenameFiles:
"""Tests für _rename_files."""
def test_renames_with_cddb_data(self, tmp_path: Path) -> None:
(tmp_path / "track01.flac").touch()
(tmp_path / "track02.flac").touch()
tracks = [
TrackInfo(1, "Karajan", "Allegro con brio"),
TrackInfo(2, "Karajan", "Andante con moto"),
]
_rename_files(tmp_path, tracks, AudioFormat.FLAC)
assert (tmp_path / "01_-_Allegro_con_brio_-_Karajan.flac").exists()
assert (tmp_path / "02_-_Andante_con_moto_-_Karajan.flac").exists()
def test_fallback_without_cddb(self, tmp_path: Path) -> None:
(tmp_path / "track01.flac").touch()
_rename_files(tmp_path, [], AudioFormat.FLAC)
# No CDDB data → fallback to "01.flac"
assert (tmp_path / "01.flac").exists()
def test_sanitizes_title_in_filename(self, tmp_path: Path) -> None:
(tmp_path / "track01.flac").touch()
# _sanitize_name: spaces → _, then chars like :/\ are removed (not replaced)
tracks = [TrackInfo(1, "Art ist", "My Title")]
_rename_files(tmp_path, tracks, AudioFormat.FLAC)
assert (tmp_path / "01_-_My_Title_-_Art_ist.flac").exists()
def test_regular_track_without_artist(self, tmp_path: Path) -> None:
"""Reguläre Tracks (artist='') → kein '_-_Artist'-Suffix."""
(tmp_path / "track01.flac").touch()
tracks = [TrackInfo(1, "", "Allegro con brio")]
_rename_files(tmp_path, tracks, AudioFormat.FLAC)
assert (tmp_path / "01_-_Allegro_con_brio.flac").exists()
# ---------------------------------------------------------------------------
# interactive_rip EAN-First Workflow
# ---------------------------------------------------------------------------
_MB_ALBUM = Album(
artist="The Beatles",
album="Abbey Road",
year=1969,
discs=[
Disc(
disc_number=1,
tracks=[
Track(track_number=1, title="Come Together"),
Track(track_number=2, title="Something"),
],
)
],
)
_MB_ALBUM_2DISC = Album(
artist="The Beatles",
album="White Album",
year=1968,
discs=[
Disc(disc_number=1, tracks=[
Track(track_number=1, title="Back in the U.S.S.R."),
]),
Disc(disc_number=2, tracks=[
Track(track_number=1, title="Birthday"),
]),
],
)
_CDDB_TRACKS = [
TrackInfo(1, "The Beatles", "Come Together"),
TrackInfo(2, "The Beatles", "Something"),
]
class TestInteractiveRipEanFirst:
"""Tests für EAN-First Workflow in interactive_rip.
Neuer Flow:
1. EAN abfragen
2. MB-Treffer → Auto-Rip (kein Albumname, kein CDDB-Confirm)
3. Kein Treffer → Fallback (Albumname, CDDB-Confirm wie bisher)
"""
# ── Auto-Rip (MusicBrainz-Treffer) ──
def test_mb_hit_auto_rip_single_disc(self, tmp_path: Path) -> None:
"""MB-Treffer → Auto-Rip: eine Disc, kein Albumname-Prompt, album.json aus MB."""
inputs = [
"0602557360561", # EAN → MB-Treffer
"", # "CD 1/1 einlegen und Enter drücken"
"n", # next album?
]
config = RipperConfig(output_dir=tmp_path)
sp = self._scanner_patches()
with (
sp[0], sp[1],
patch("musiksammlung.ripper.rip_disc", return_value=(tmp_path, None, _CDDB_TRACKS)),
patch("builtins.input", side_effect=iter(inputs)),
patch("musiksammlung.ripper.lookup_by_barcode", return_value=(_MB_ALBUM, "fake-mbid")),
patch("musiksammlung.ripper.download_caa_covers") as mock_caa,
):
interactive_rip(config)
json_path = tmp_path / "Abbey_Road" / "album.json"
assert json_path.exists()
import json
data = json.loads(json_path.read_text())
assert data["artist"] == "The Beatles"
assert data["album"] == "Abbey Road"
assert data["year"] == 1969
mock_caa.assert_called_once_with("fake-mbid", tmp_path / "Abbey_Road")
def test_mb_hit_auto_rip_multi_disc(self, tmp_path: Path) -> None:
"""MB-Treffer mit 2 Discs → 2x Disc-Insert-Prompt, album.json aus MB."""
inputs = [
"1234567890123", # EAN → MB-Treffer (2 Discs)
"", # "CD 1/2 einlegen und Enter drücken"
"", # "CD 2/2 einlegen und Enter drücken"
"n", # next album?
]
config = RipperConfig(output_dir=tmp_path)
sp = self._scanner_patches()
with (
sp[0], sp[1],
patch("musiksammlung.ripper.rip_disc", return_value=(tmp_path, None, _CDDB_TRACKS)),
patch("builtins.input", side_effect=iter(inputs)),
patch("musiksammlung.ripper.lookup_by_barcode",
return_value=(_MB_ALBUM_2DISC, "fake-mbid-2")),
patch("musiksammlung.ripper.download_caa_covers"),
):
interactive_rip(config)
json_path = tmp_path / "White_Album" / "album.json"
assert json_path.exists()
import json
data = json.loads(json_path.read_text())
assert data["artist"] == "The Beatles"
assert data["album"] == "White Album"
assert data["year"] == 1968
assert len(data["discs"]) == 2
def test_mb_hit_triggers_lookup(self, tmp_path: Path) -> None:
"""EAN eingegeben → lookup_by_barcode wird aufgerufen."""
inputs = [
"0602557360561", # EAN
"", # disc insert
"n", # next album?
]
config = RipperConfig(output_dir=tmp_path)
sp = self._scanner_patches()
with (
sp[0], sp[1],
patch("musiksammlung.ripper.rip_disc", return_value=(tmp_path, None, _CDDB_TRACKS)),
patch("builtins.input", side_effect=iter(inputs)),
patch("musiksammlung.ripper.lookup_by_barcode",
return_value=(_MB_ALBUM, "fake-mbid")) as mock_lookup,
patch("musiksammlung.ripper.download_caa_covers"),
):
interactive_rip(config)
mock_lookup.assert_called_once_with("0602557360561")
def test_mb_hit_renames_with_cddb_tracks(self, tmp_path: Path) -> None:
"""Auto-Rip nutzt CDDB-Tracks zum Umbenennen der Dateien."""
inputs = [
"0602557360561",
"", # disc insert
"n",
]
config = RipperConfig(output_dir=tmp_path)
disc_dir = tmp_path / "Abbey_Road" / "CD1"
disc_dir.mkdir(parents=True)
(disc_dir / "track01.flac").touch()
(disc_dir / "track02.flac").touch()
sp = self._scanner_patches()
with (
sp[0], sp[1],
patch("musiksammlung.ripper.rip_disc", return_value=(disc_dir, None, _CDDB_TRACKS)),
patch("builtins.input", side_effect=iter(inputs)),
patch("musiksammlung.ripper.lookup_by_barcode", return_value=(_MB_ALBUM, "fake-mbid")),
patch("musiksammlung.ripper.download_caa_covers"),
):
interactive_rip(config)
# _rename_files wurde aufgerufen (tracks vorhanden)
# Dateien existieren schon vorher, rename findet in _rename_files statt
assert (tmp_path / "Abbey_Road" / "album.json").exists()
# ── Gemeinsame Scanner-Patches (alle interactive_rip-Tests) ──
#
# Ab EAN-Prompt startet interactive_rip immer einen ScannerServer und
# nutzt _input_or_scan. Beide werden in allen Tests gemockt.
@staticmethod
def _scanner_patches():
"""Patches für ScannerServer und _input_or_scan (alle interactive_rip-Tests)."""
mock_scanner = MagicMock()
mock_scanner.url.return_value = "http://127.0.0.1:8765"
mock_scanner.get_photo.return_value = None
return [
patch("musiksammlung.ripper.ScannerServer", return_value=mock_scanner),
patch(
"musiksammlung.ripper._input_or_scan",
side_effect=lambda prompt, scanner: (input(prompt), None),
),
]
@staticmethod
def _fallback_patches(inputs: list[str]):
"""Gemeinsame Patches für Fallback-Tests."""
patches = TestInteractiveRipEanFirst._scanner_patches()
patches.append(patch("builtins.input", side_effect=iter(inputs)))
return patches
def test_ean_empty_falls_back_to_album_prompt(self, tmp_path: Path) -> None:
"""Leere EAN → Fallback: Albumname wird abgefragt."""
inputs = [
"", # EAN: leer → überspringen
"Abbey Road", # album name (Fallback-Flow)
"1", # disc number
"j", # CDDB korrekt?
"n", # next CD?
"n", # next album?
]
config = RipperConfig(output_dir=tmp_path)
patches = self._fallback_patches(inputs)
with (
patches[0], patches[1], patches[2],
patch("musiksammlung.ripper.rip_disc", return_value=(tmp_path, None, _CDDB_TRACKS)),
patch("musiksammlung.ripper.lookup_by_barcode") as mock_lookup,
):
interactive_rip(config)
mock_lookup.assert_not_called()
json_path = tmp_path / "Abbey_Road" / "album.json"
assert json_path.exists()
def test_mb_miss_falls_back_to_album_prompt(self, tmp_path: Path) -> None:
"""MB-Fehler → Fallback: Albumname wird abgefragt, CDDB-Daten genutzt."""
inputs = [
"0000000000000", # EAN (kein Treffer)
"Abbey Road", # album name (Fallback-Flow)
"1", # disc number
"j", # CDDB korrekt?
"n", # next CD?
"n", # next album?
]
config = RipperConfig(output_dir=tmp_path)
patches = self._fallback_patches(inputs)
with (
patches[0], patches[1], patches[2],
patch("musiksammlung.ripper.rip_disc", return_value=(tmp_path, None, _CDDB_TRACKS)),
patch(
"musiksammlung.ripper.lookup_by_barcode",
side_effect=ValueError("Kein MusicBrainz-Eintrag"),
),
):
interactive_rip(config)
json_path = tmp_path / "Abbey_Road" / "album.json"
assert json_path.exists()
def test_fallback_cddb_confirm_rejected(self, tmp_path: Path) -> None:
"""Fallback: CDDB-Daten abgelehnt → keine album.json."""
inputs = [
"", # EAN: leer
"TestAlbum", # album name
"1", # disc number
"n", # CDDB korrekt? → nein
"n", # next CD?
"n", # next album?
]
config = RipperConfig(output_dir=tmp_path)
patches = self._fallback_patches(inputs)
with (
patches[0], patches[1], patches[2],
patch("musiksammlung.ripper.rip_disc", return_value=(tmp_path, None, _CDDB_TRACKS)),
patch("musiksammlung.ripper.lookup_by_barcode"),
):
interactive_rip(config)
# Keine album.json, da CDDB-Daten verworfen
json_path = tmp_path / "TestAlbum" / "album.json"
assert not json_path.exists()
def test_fallback_multiple_discs(self, tmp_path: Path) -> None:
"""Fallback: Mehrere CDs für ein Album."""
inputs = [
"", # EAN: leer
"TestAlbum", # album name
"1", # disc 1
"j", # CDDB korrekt?
"y", # next CD? → ja
"2", # disc 2
"j", # CDDB korrekt?
"n", # next CD?
"n", # next album?
]
config = RipperConfig(output_dir=tmp_path)
patches = self._fallback_patches(inputs)
with (
patches[0], patches[1], patches[2],
patch("musiksammlung.ripper.rip_disc", return_value=(tmp_path, None, _CDDB_TRACKS)),
patch("musiksammlung.ripper.lookup_by_barcode"),
):
interactive_rip(config)
json_path = tmp_path / "TestAlbum" / "album.json"
assert json_path.exists()
import json
data = json.loads(json_path.read_text())
assert len(data["discs"]) == 2