Remove tests/ from repo, update .gitignore, improve ripper

- Remove tests/ directory from version control (added to .gitignore)
- Add .idea/ to .gitignore
- Ripper: CDDB lookup, non-interactive mode, English UI, file renaming
- Config: abcde format mapping, per-format quality options
- CLI: English help texts, new --no-cddb / --pipes / --parallel / --quality options

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
Dieter Schlüter 2026-02-17 17:35:34 +01:00
commit 851dbf3a46
9 changed files with 511 additions and 217 deletions

2
.gitignore vendored
View file

@ -11,6 +11,8 @@ dist/
*.egg *.egg
idea/ idea/
.idea/
tests/
testdata/ testdata/
CLAUDE.md CLAUDE.md

View file

@ -8,12 +8,14 @@ from pathlib import Path
import typer import typer
from musiksammlung.config import AudioFormat
from musiksammlung.cover import copy_covers from musiksammlung.cover import copy_covers
from musiksammlung.llm_parser import parse_tracklist from musiksammlung.llm_parser import parse_tracklist
from musiksammlung.models import Album from musiksammlung.models import Album
from musiksammlung.ocr import ocr_images from musiksammlung.ocr import ocr_images
from musiksammlung.organizer import apply_mapping, build_mapping from musiksammlung.organizer import apply_mapping, build_mapping
from musiksammlung.playlist import generate_playlist from musiksammlung.playlist import generate_playlist
from musiksammlung.ripper import RipperConfig, interactive_rip
from musiksammlung.tagger import tag_album from musiksammlung.tagger import tag_album
from musiksammlung.vision_llm import parse_image from musiksammlung.vision_llm import parse_image
@ -169,6 +171,72 @@ def apply(
typer.echo(f"Fertig! Album liegt in: {album_dir}") typer.echo(f"Fertig! Album liegt in: {album_dir}")
@app.command()
def rip(
    output_dir: Path = typer.Option(
        Path("temp"), "--output", "-o", help="Output directory for ripped CDs"
    ),
    device: str = typer.Option(
        "/dev/cdrom", "--device", "-d", help="CD drive device"
    ),
    audio_format: str = typer.Option(
        "flac", "--format", "-f", help="Audio format (flac, mp3, opus, aac, wav)"
    ),
    quality: str = typer.Option(
        "medium", "--quality", "-q", help="Quality (low, medium, high)"
    ),
    parallel: int = typer.Option(
        1, "--parallel", "-j", help="Number of parallel encoder processes (0 = all)"
    ),
    pipes: bool = typer.Option(
        False, "--pipes", "-P", help="Use pipes (faster, no WAV files)"
    ),
    no_cddb: bool = typer.Option(
        False, "--no-cddb", help="Disable CDDB lookup"
    ),
) -> None:
    """Interactive CD ripping with abcde.

    Rips multiple CDs in sequence to:
    temp/Album_Name/CD1/01_-_title_-_artist.flac, ...

    Supported formats: flac, mp3, opus, aac, wav

    Quality settings per format:
    FLAC: high = -8 (best compression)
    MP3: low=-V7, medium=-V5, high=-V0 (VBR)
    OPUS: low=96kbit, medium=128kbit, high=192kbit
    AAC: low/medium/high (quality 2-4)

    Filenames (when CDDB available):
    <two-digit track_number>_-_title_-_artist.extension
    Spaces are replaced with underscores.

    abcde rips with CDDB lookup and automatically renames files.
    """
    # Local import: only needed to resolve "--parallel 0" to a core count.
    import os

    try:
        fmt = AudioFormat(audio_format.lower())
    except ValueError:
        typer.echo(f"Error: Invalid format '{audio_format}'", err=True)
        typer.echo("Valid formats: flac, mp3, opus, aac, wav", err=True)
        # The ValueError carries no extra information for the user.
        raise typer.Exit(1) from None

    # Accept "HIGH"/"Medium" the same way the format option accepts "FLAC".
    normalized_quality = quality.lower()
    if normalized_quality not in ("low", "medium", "high"):
        typer.echo(f"Error: Invalid quality '{quality}'", err=True)
        typer.echo("Valid qualities: low, medium, high", err=True)
        raise typer.Exit(1)

    if parallel < 0:
        typer.echo(f"Error: Invalid parallel job count '{parallel}'", err=True)
        typer.echo("Use 0 for all CPU cores or a positive number", err=True)
        raise typer.Exit(1)
    # The help text promises "0 = all": translate 0 into the number of CPU
    # cores here so the ripper receives a concrete job count.
    jobs = parallel if parallel > 0 else (os.cpu_count() or 1)

    config = RipperConfig(
        device=device,
        audio_format=fmt,
        output_dir=output_dir,
        quality=normalized_quality,
        parallel_jobs=jobs,
        use_pipes=pipes,
        use_cddb=not no_cddb,
    )
    interactive_rip(config)
@app.command() @app.command()
def process( def process(
input_dir: Path = typer.Argument( input_dir: Path = typer.Argument(

View file

@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
from enum import Enum
from pathlib import Path from pathlib import Path
from pydantic import BaseModel from pydantic import BaseModel
@ -9,6 +10,74 @@ from pydantic import BaseModel
# Unterstützte Audio-Formate # Unterstützte Audio-Formate
AUDIO_EXTENSIONS = {".flac", ".mp3", ".ogg", ".opus", ".wav", ".m4a"} AUDIO_EXTENSIONS = {".flac", ".mp3", ".ogg", ".opus", ".wav", ".m4a"}
class AudioFormat(str, Enum):
    """Supported output audio formats."""

    FLAC = "flac"
    MP3 = "mp3"
    OPUS = "opus"
    AAC = "aac"
    WAV = "wav"

    @property
    def extension(self) -> str:
        """File extension (with leading dot) of files in this format.

        The extension follows the abcde output type rather than the enum
        value, because AAC is requested from abcde as ``m4a``; looking for
        ``.aac`` files after a rip would find nothing.
        """
        return f".{self.get_abcde_format()}"

    @property
    def encoder_cmd(self) -> str:
        """Name of the encoder command for this format.

        Raises:
            ValueError: For WAV, which needs no encoder.
        """
        if self == AudioFormat.WAV:
            raise ValueError("WAV benötigt kein Encoding")
        encoders = {
            AudioFormat.FLAC: "flac",
            AudioFormat.MP3: "lame",
            AudioFormat.OPUS: "opusenc",
            AudioFormat.AAC: "ffmpeg",
        }
        return encoders[self]

    def get_abcde_format(self) -> str:
        """Return the output type name to pass to abcde."""
        # abcde uses 'm4a' for AAC; every other format uses its own value.
        return "m4a" if self == AudioFormat.AAC else self.value

    def get_encoder_options(self, quality: str = "medium") -> str:
        """Return the encoder option string for the given quality level.

        Args:
            quality: One of "low", "medium", "high" (default: "medium").

        Returns:
            The option string, or "" when no extra options apply.

        Raises:
            KeyError: If ``quality`` is not one of the three known levels.
        """
        options = {
            AudioFormat.FLAC: {
                "low": "",
                "medium": "",
                "high": "-8",
            },
            AudioFormat.MP3: {
                "low": "-V 7",
                "medium": "-V 5",
                "high": "-V 0",
            },
            AudioFormat.OPUS: {
                "low": "-b 96",
                "medium": "-b 128",
                "high": "-b 192",
            },
            AudioFormat.AAC: {
                "low": "-q:a 2",
                "medium": "-q:a 3",
                "high": "-q:a 4",
            },
            AudioFormat.WAV: {
                "low": "",
                "medium": "",
                "high": "",
            },
        }
        return options[self][quality]
# Standard-Bilddateien, die als Cover/Rückseite erkannt werden # Standard-Bilddateien, die als Cover/Rückseite erkannt werden
DEFAULT_FRONT_PATTERNS = ["cover_front.*", "front.*", "cover.*"] DEFAULT_FRONT_PATTERNS = ["cover_front.*", "front.*", "cover.*"]
DEFAULT_BACK_PATTERNS = ["cover_back.*", "back.*", "inlay.*", "booklet.*"] DEFAULT_BACK_PATTERNS = ["cover_back.*", "back.*", "inlay.*", "booklet.*"]

View file

@ -1,47 +1,213 @@
"""CD-Ripping via abcde.""" """CD-Ripping via abcde with interactive multi-disc workflow."""
from __future__ import annotations from __future__ import annotations
import logging import logging
import re
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from typing import NamedTuple
from pydantic import BaseModel
from musiksammlung.config import AudioFormat
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class TrackInfo(NamedTuple):
    """One track entry (number, artist, title) parsed from abcde output."""

    # Track number as it appears in the parsed listing.
    track_number: int
    # Artist part of the listing line.
    artist: str
    # Title part of the listing line.
    title: str
"""Rippt eine CD mit abcde in output_dir.
class RipperConfig(BaseModel):
    """Settings for a ripping session."""

    # CD drive device path.
    device: str = "/dev/cdrom"
    # Output audio format.
    audio_format: AudioFormat = AudioFormat.FLAC
    # Directory under which ripped albums are placed.
    output_dir: Path = Path("temp")
    # Quality level: low, medium, high.
    quality: str = "medium"
    # Number of parallel encoder processes.
    parallel_jobs: int = 1
    # True = faster, no WAV files.
    use_pipes: bool = False
    # Use CDDB lookup.
    use_cddb: bool = True
def _sanitize_name(name: str) -> str:
"""Remove problematic characters and replace spaces.
Args: Args:
device: CD-Laufwerk, z.B. '/dev/cdrom' name: Original name
output_dir: Zielverzeichnis für die gerippten Dateien
audio_format: Ausgabeformat (flac, mp3, ogg, opus)
eject: CD nach dem Rippen auswerfen
Returns: Returns:
Pfad zum Verzeichnis mit den gerippten Dateien Cleaned name (spaces -> underscores)
"""
# Replace spaces with underscores
name = name.replace(" ", "_")
# Keep umlauts and special characters
# Only remove problematic filename characters
name = re.sub(r'[<>:"/\\|?*]', "", name)
# Remove leading/trailing underscores
name = name.strip("_")
return name
def _parse_cddb_response(output: str) -> list[TrackInfo]:
"""Parse CDDB data from abcde output.
Args:
output: abcde stdout/stderr output
Returns:
List of track information
"""
tracks = []
# Pattern: "N: Artist - Title"
pattern = re.compile(r"^\s*(\d+):\s*(.+?)\s*-\s*(.+)$")
for line in output.split("\n"):
match = pattern.match(line)
if match:
track_num = int(match.group(1))
artist = match.group(2).strip()
title = match.group(3).strip()
tracks.append(TrackInfo(track_num, artist, title))
return tracks
def _get_audio_files(output_dir: Path, audio_format: AudioFormat) -> list[Path]:
"""Find all audio files in directory (case-insensitive).
Args:
output_dir: Target directory
audio_format: Audio format
Returns:
Sorted list of found files
"""
# Regex pattern for case-insensitive search
ext = audio_format.extension.lstrip(".")
pattern = re.compile(rf".*\.{ext}$", re.IGNORECASE)
audio_files = []
for file in output_dir.iterdir():
if file.is_file() and pattern.match(file.name):
audio_files.append(file)
return sorted(audio_files)
def _rename_files(
    output_dir: Path,
    tracks: list[TrackInfo],
    audio_format: AudioFormat,
) -> None:
    """Rename ripped files to ``NN_-_title_-_artist.extension``.

    A file belongs to a track when its name starts with the track number
    followed by a dot (e.g. ``01.``). Tracks without a matching file are
    skipped.

    Args:
        output_dir: Directory containing the ripped files.
        tracks: Track information used to build the new names.
        audio_format: Audio format of the files.
    """
    candidates = _get_audio_files(output_dir, audio_format)
    leading_number = re.compile(r"^(\d+)\.")

    def _first_file_for(number):
        """Return the first candidate whose numeric prefix equals *number*."""
        for candidate in candidates:
            prefix = leading_number.match(candidate.name)
            if prefix and int(prefix.group(1)) == number:
                return candidate
        return None

    for track in tracks:
        source = _first_file_for(track.track_number)
        if source is None:
            continue

        artist_clean = _sanitize_name(track.artist)
        title_clean = _sanitize_name(track.title)
        track_num_padded = f"{track.track_number:02d}"
        new_name = (
            f"{track_num_padded}_-_{title_clean}_-_"
            f"{artist_clean}{audio_format.extension}"
        )
        destination = output_dir / new_name

        # Nothing to do when the file already has its final name.
        if source == destination:
            continue
        logger.info("Renaming: %s -> %s", source.name, new_name)
        source.rename(destination)
def _rip_with_abcde(
    device: str,
    output_dir: Path,
    audio_format: AudioFormat,
    quality: str = "medium",
    parallel_jobs: int = 1,
    use_pipes: bool = False,
    use_cddb: bool = True,
) -> tuple[list[Path], list[TrackInfo] | None]:
    """Rip a CD with abcde directly to the desired format.

    Args:
        device: CD drive, e.g. '/dev/cdrom'.
        output_dir: Target directory for files (created if missing).
        audio_format: Output audio format.
        quality: Quality setting (low, medium, high).
        parallel_jobs: Number of parallel encoder processes.
        use_pipes: True = faster, no WAV files.
        use_cddb: True = use CDDB lookup.

    Returns:
        Tuple (list of created files, track information or None).

    Raises:
        RuntimeError: If abcde exits with a non-zero status or no audio
            files are found afterwards.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    # Build the "-o" value up front. Encoder options are attached with a
    # colon ("format:options"). Patching the list afterwards by position is
    # fragile because further options are appended after "-o".
    output_spec = audio_format.get_abcde_format()
    encoder_opts = audio_format.get_encoder_options(quality)
    if encoder_opts:
        output_spec = f"{output_spec}:{encoder_opts}"

    # abcde options:
    #   -p: pad track numbers with zeros
    #   -o: output format (optionally with encoder options)
    #   -d: CD drive
    #   -x: eject CD after ripping
    #   -N: non-interactive (no prompts)
    #   -a: actions to perform
    cmd = [
        "abcde",
        "-p",
        "-o", output_spec,
        "-d", device,
        "-x",
        "-N",
        "-a", "cddb,read,encode,tag" if use_cddb else "read,encode",
    ]

    # Parallel encodes
    if parallel_jobs > 1:
        cmd.extend(["-j", str(parallel_jobs)])

    # Use pipes
    if use_pipes:
        cmd.append("-P")

    logger.info("Starting abcde in %s (Format: %s, Quality: %s, CDDB: %s)",
                output_dir, audio_format.value, quality, use_cddb)

    # Run abcde non-interactively.
    # NOTE(review): the two keyword arguments after cwd are not visible in
    # the diff this block was recovered from; capture_output/text are assumed
    # because result.stdout is parsed as text below - confirm.
    result = subprocess.run(
        cmd,
        cwd=str(output_dir),
        capture_output=True,
        text=True,
    )

    if result.returncode != 0:
        # Keep abcde's own diagnostics in the log; the exception message
        # stays short for the interactive prompt.
        logger.error("abcde stderr: %s", result.stderr)
        raise RuntimeError(
            f"abcde failed (exit {result.returncode}). "
            "Check if a CD is in the drive and readable."
        )

    # Track information from CDDB parsing
    tracks = None
    if use_cddb:
        tracks = _parse_cddb_response(result.stdout)
        if tracks:
            logger.info("CDDB data found: %d tracks", len(tracks))

    # Find files (case-insensitive)
    audio_files = _get_audio_files(output_dir, audio_format)

    if not audio_files:
        raise RuntimeError("No files created by abcde")

    logger.info("Ripping completed: %d tracks in %s", len(audio_files), output_dir)
    return audio_files, tracks
def rip_disc(
    device: str,
    output_dir: Path,
    audio_format: AudioFormat = AudioFormat.FLAC,
    quality: str = "medium",
    parallel_jobs: int = 1,
    use_pipes: bool = False,
    use_cddb: bool = True,
) -> tuple[Path, str | None, list[TrackInfo] | None]:
    """Rip one CD into ``output_dir`` and rename the files if tracks are known.

    Args:
        device: CD drive, e.g. '/dev/cdrom'.
        output_dir: Target directory for files.
        audio_format: Output audio format.
        quality: Quality setting (low, medium, high).
        parallel_jobs: Number of parallel encoder processes.
        use_pipes: True = faster, no WAV files.
        use_cddb: True = use CDDB lookup.

    Returns:
        Tuple (directory path, album name or None, track information or None).
        The album name is the artist field of the first track.
    """
    _created, tracks = _rip_with_abcde(
        device=device,
        output_dir=output_dir,
        audio_format=audio_format,
        quality=quality,
        parallel_jobs=parallel_jobs,
        use_pipes=use_pipes,
        use_cddb=use_cddb,
    )

    # Without track data there is nothing to derive a name from or rename.
    if not tracks:
        return output_dir, None, tracks

    # The "album name" is taken from the first track's artist part: for
    # Various Artists this will be "Sampler" or similar, for a single artist
    # it is the artist name.
    detected_name = tracks[0].artist
    _rename_files(output_dir, tracks, audio_format)
    return output_dir, detected_name, tracks
def interactive_rip(
    config: RipperConfig,
) -> None:
    """Interactive rip workflow for multiple CDs.

    Prompts for each album/CD:
    - Album name (or empty for default 'Album{N}')
    - CD number (defaults to the next disc of the current album)
    - Optional continuation

    Files are placed under config.output_dir:
    temp/Album_Name/CD1/01_-_title_-_artist.flac, ...

    If CDDB is available, files are automatically named.

    Args:
        config: Ripper configuration
    """
    print("\n" + "=" * 60)
    print(" Musiksammlung - Interactive CD Ripper (abcde)")
    print("=" * 60)
    print(f"\nCD Drive: {config.device}")
    print(f"Audio Format: {config.audio_format.value}")
    print(f"Quality: {config.quality}")
    print(f"CDDB Lookup: {config.use_cddb}")
    print(f"Parallel Encodes: {config.parallel_jobs}")
    print(f"Pipes: {config.use_pipes}")
    print(f"Output Directory: {config.output_dir.absolute()}\n")

    album_counter = 1

    while True:
        print(f"\n--- Album {album_counter} ---")

        # Ask for album name; fall back to a numbered default.
        album_name = input(
            "Enter album name (or Enter for CDDB/default 'Album{N}'): "
        ).strip()
        default_album_name = album_name if album_name else f"Album{album_counter}"

        # Number offered as the default for the next disc of this album.
        disc_counter = 1

        while True:
            print(f"\n Album: {default_album_name}")
            print(f" CD Drive: {config.device}")

            # Ask for disc number. The default follows disc_counter so the
            # second disc of an album does not land in CD1 again.
            disc_input = input(
                f" CD number for this CD [{disc_counter}]: "
            ).strip()
            if disc_input:
                try:
                    disc_num = int(disc_input)
                except ValueError:
                    # Re-prompt instead of crashing on a typo.
                    print(f" Invalid CD number: {disc_input!r}")
                    continue
                if disc_num < 1:
                    print(f" Invalid CD number: {disc_input!r}")
                    continue
            else:
                disc_num = disc_counter

            # Build target directory
            disc_dir = (
                config.output_dir
                / _sanitize_name(default_album_name)
                / f"CD{disc_num}"
            )

            print(f" Ripping CD to: {disc_dir.relative_to(config.output_dir)}")
            print(" (Ripping in progress, please wait...)")

            try:
                _, detected_album, tracks = rip_disc(
                    device=config.device,
                    output_dir=disc_dir,
                    audio_format=config.audio_format,
                    quality=config.quality,
                    parallel_jobs=config.parallel_jobs,
                    use_pipes=config.use_pipes,
                    use_cddb=config.use_cddb,
                )
            except RuntimeError as e:
                print(f" ✗ Ripping error: {e}")
                retry = input(" Try again? (y/n): ").strip().lower()
                if retry != "y":
                    print(" Aborting disc.")
                    break
                continue

            print(f" ✓ CD {disc_num} ripped successfully")

            # Show detected information when track data is available.
            if tracks and detected_album:
                print(f" Detected: {detected_album}")
                print(f" Tracks: {len(tracks)}")
                # Show first and last track
                first = tracks[0]
                print(f" {first.track_number}. {first.title} ({first.artist})")
                if len(tracks) > 1:
                    last = tracks[-1]
                    print(f" ... {last.track_number}. {last.title} ({last.artist})")

            # Continue?
            next_disc = input(
                " Next CD for this album? (y/n): "
            ).strip().lower()
            if next_disc != "y":
                break
            # Continue numbering after the disc that was just ripped, even
            # if the user entered its number explicitly.
            disc_counter = disc_num + 1

        # Next album?
        next_album = input("\nNext album? (y/n): ").strip().lower()
        if next_album != "y":
            break
        album_counter += 1

    print("\n" + "=" * 60)
    print("Ripping completed!")
    print(f"\nFiles are in: {config.output_dir.absolute()}")
    print("\nNext steps:")
    print(" 1. Check filenames and tags")
    if config.use_cddb:
        print(" 2. Adjust tags and covers with 'musiksammlung apply'")
    else:
        print(" 2. Scan CD cover images")
        print(" 3. 'musiksammlung scan' for album JSON")
        print(" 4. 'musiksammlung apply' to organize & tag")
    print("=" * 60 + "\n")

View file

View file

@ -1,42 +0,0 @@
"""Tests für die Datenmodelle."""
from musiksammlung.models import Album
def test_album_folder_name_with_year():
    """The folder name carries the year in parentheses when a year is set."""
    dated_album = Album(artist="Test", album="Mein Album", year=1987, discs=[])
    expected_folder = "Mein Album (1987)"
    assert dated_album.folder_name == expected_folder
def test_album_folder_name_without_year():
    """Without a year the folder name is just the album title."""
    undated_album = Album(artist="Test", album="Mein Album", year=None, discs=[])
    expected_folder = "Mein Album"
    assert undated_album.folder_name == expected_folder
def test_sanitize_name():
    """Colon, slash and question mark do not survive in artist/album names."""
    cleaned = Album(artist='Art:ist', album='Al/bum?', year=None, discs=[])
    assert ":" not in cleaned.artist
    for forbidden in ("/", "?"):
        assert forbidden not in cleaned.album
def test_album_from_json():
    """A nested dict validates into an Album with its discs and tracks."""
    track_entries = [
        {"track_number": 1, "title": "Bonnie & Clyde"},
        {"track_number": 2, "title": "Zehn kleine Jägermeister"},
    ]
    payload = {
        "artist": "Die Toten Hosen",
        "album": "Opium fürs Volk",
        "year": 1996,
        "discs": [{"disc_number": 1, "tracks": track_entries}],
    }

    parsed = Album.model_validate(payload)

    assert parsed.artist == "Die Toten Hosen"
    assert len(parsed.discs) == 1
    only_disc = parsed.discs[0]
    assert len(only_disc.tracks) == 2
    assert only_disc.tracks[1].title == "Zehn kleine Jägermeister"

View file

@ -1,78 +0,0 @@
"""Tests für den Organizer."""
from pathlib import Path
from musiksammlung.models import Album, Disc, Track
from musiksammlung.organizer import build_mapping, discover_audio_files
def test_discover_audio_files(tmp_path: Path):
    """Audio files are found and returned sorted; other files are ignored."""
    # Created out of order on purpose; cover.jpg must not be picked up.
    for filename in ("Track_03.flac", "Track_01.flac", "Track_02.flac", "cover.jpg"):
        (tmp_path / filename).touch()

    discovered = discover_audio_files(tmp_path)

    assert len(discovered) == 3
    assert discovered[0].name == "Track_01.flac"
    assert discovered[2].name == "Track_03.flac"
def test_build_mapping_single_disc(tmp_path: Path):
    """Mapping for a single-CD album: numbered titles, no CD1 in the path."""
    for filename in ("Track_01.flac", "Track_02.flac"):
        (tmp_path / filename).touch()

    songs = [
        Track(track_number=1, title="Erster Song"),
        Track(track_number=2, title="Zweiter Song"),
    ]
    album = Album(
        artist="TestArtist",
        album="TestAlbum",
        year=2000,
        discs=[Disc(disc_number=1, tracks=songs)],
    )

    mapping = build_mapping(album, tmp_path, tmp_path / "output")

    assert len(mapping) == 2
    targets = list(mapping.values())
    assert targets[0].name == "01 Erster Song.flac"
    assert targets[1].name == "02 Zweiter Song.flac"
    # Single disc: no CD1 subfolder
    assert "CD1" not in str(targets[0])
def test_build_mapping_multi_disc(tmp_path: Path):
    """Mapping for a multi-CD album: each target path names its disc."""
    for disc_folder in ("CD1", "CD2"):
        folder = tmp_path / disc_folder
        folder.mkdir()
        (folder / "Track_01.flac").touch()

    first_disc = Disc(disc_number=1, tracks=[Track(track_number=1, title="Song A")])
    second_disc = Disc(disc_number=2, tracks=[Track(track_number=1, title="Song B")])
    album = Album(
        artist="Artist",
        album="Box Set",
        year=1999,
        discs=[first_disc, second_disc],
    )

    mapping = build_mapping(album, tmp_path, tmp_path / "output")

    assert len(mapping) == 2
    targets = list(mapping.values())
    assert "CD1" in str(targets[0])
    assert "CD2" in str(targets[1])

View file

@ -1,37 +0,0 @@
"""Tests für die Playlist-Generierung."""
from pathlib import Path
from musiksammlung.models import Album, Disc, Track
from musiksammlung.playlist import generate_playlist
def test_generate_playlist_single_disc(tmp_path: Path):
    """An M3U playlist is written for a single-CD album."""
    songs = [
        Track(track_number=1, title="Song Eins"),
        Track(track_number=2, title="Song Zwei"),
    ]
    album = Album(
        artist="Artist",
        album="TestAlbum",
        year=2000,
        discs=[Disc(disc_number=1, tracks=songs)],
    )

    # Create dummy audio files
    audio_names = ("01 Song Eins.flac", "02 Song Zwei.flac")
    for audio_name in audio_names:
        (tmp_path / audio_name).touch()

    playlist_path = generate_playlist(album, tmp_path)

    assert playlist_path.exists()
    content = playlist_path.read_text()
    assert "#EXTM3U" in content
    assert "01 Song Eins.flac" in content
    assert "02 Song Zwei.flac" in content
    # No CD prefix for a single disc
    assert "CD1/" not in content

View file

@ -1,37 +0,0 @@
"""Tests für die Vision-LLM JSON-Extraktion."""
import pytest
from musiksammlung.vision_llm import _extract_json
def test_extract_pure_json():
    """A response that is already pure JSON keeps its content."""
    raw_response = '{"artist": "Test", "album": "Album"}'
    extracted = _extract_json(raw_response)
    assert '"Test"' in extracted
def test_extract_json_from_markdown_block():
    """JSON inside a fenced markdown block is extracted."""
    raw_response = 'Hier ist das Ergebnis:\n```json\n{"artist": "Test"}\n```\nFertig.'
    extracted = _extract_json(raw_response)
    assert '"Test"' in extracted
def test_extract_json_with_thinking_tags():
    """JSON following a <think>...</think> section is still extracted."""
    raw_response = '<think>Ich denke nach...</think>\n{"artist": "Test", "album": "X"}'
    extracted = _extract_json(raw_response)
    assert '"Test"' in extracted
def test_extract_json_with_surrounding_text():
    """JSON embedded between plain text lines is extracted."""
    raw_response = 'Das JSON:\n{"artist": "A", "album": "B"}\nEnde.'
    extracted = _extract_json(raw_response)
    assert '"A"' in extracted
def test_extract_json_empty_raises():
    """An empty response raises ValueError mentioning 'Leere Antwort'."""
    empty_response = ""
    with pytest.raises(ValueError, match="Leere Antwort"):
        _extract_json(empty_response)
def test_extract_json_no_json_raises():
    """A response without any JSON raises ValueError mentioning 'Kein JSON'."""
    plain_text = "Hier ist kein JSON, nur Text."
    with pytest.raises(ValueError, match="Kein JSON"):
        _extract_json(plain_text)