Musiksammlung/src/musiksammlung/ripper.py
dschlueter 09c01c9370 Fix CDDB parser for compilations and add grab-progress fallback
- _parse_cddb_lines now handles both 'Artist - Title' and 'Artist / Title'
  (slash separator used by abcde for compilation albums like Various Artists)
- _stream_abcde collects grab-progress lines (track N: Artist / Title)
  as a fallback TrackInfo source when no CDDB lines are found
- New _parse_grab_tracks() splits grab titles on ' / ' into artist+title
- 5 new tests (TestParseCddbLines.test_compilation_slash_separator,
  TestParseGrabTracks.*)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 09:42:03 +01:00

606 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""CD-Ripping via abcde with interactive multi-disc workflow."""
from __future__ import annotations
import logging
import re
import subprocess
from pathlib import Path
from pydantic import BaseModel
from musiksammlung.cddb import get_discid, lookup_by_discid
from musiksammlung.config import AudioFormat
from musiksammlung.models import Album as AlbumModel
from musiksammlung.models import Disc as DiscModel
from musiksammlung.models import Track as TrackModel
from musiksammlung.models import TrackInfo
from musiksammlung.musicbrainz import lookup_by_barcode
logger = logging.getLogger(__name__)

# ANSI escape sequences as they end up in input() when readline is broken
# (e.g. arrow keys). The introducer is either a real ESC byte (\x1b) or its
# caret notation "^[" as echoed by the terminal, followed by the CSI "[",
# optional numeric parameters and an optional final byte.
# The "[" of the caret notation is optional so that a bare "^[D" is still
# removed; previously "^[[D" was only stripped up to "^[" and left "[D" behind.
_ANSI_ESC = re.compile(r"(\x1b|\^\[?)\[[\d;]*[A-Za-z@]?")
class RipperConfig(BaseModel):
    """Settings for one interactive ripping session."""

    # CD drive that is handed to abcde.
    device: str = "/dev/cdrom"
    # Target audio format of the ripped tracks.
    audio_format: AudioFormat = AudioFormat.FLAC
    # Base directory below which the album folders are created.
    output_dir: Path = Path("temp")
    # Encoder quality preset: low, medium, high.
    quality: str = "high"
    # Number of parallel encoder processes.
    parallel_jobs: int = 1
    # True = faster, no WAV files.
    use_pipes: bool = False
    # Whether to use the CDDB lookup.
    use_cddb: bool = True
def _clean_input(raw: str) -> str:
    """Normalise a raw answer typed at an ``input()`` prompt.

    Broken readline environments may deliver arrow keys as literal escape
    sequences instead of moving the cursor. Everything matched by
    ``_ANSI_ESC`` is removed first, then any remaining control characters,
    and finally surrounding whitespace and quote characters.

    Args:
        raw: Raw string from input()

    Returns:
        Cleaned string
    """
    without_escapes = _ANSI_ESC.sub("", raw)
    without_controls = re.sub(r"[\x00-\x1f\x7f]", "", without_escapes)
    # Whitespace goes first, then quotes that wrap the remaining text.
    return without_controls.strip().strip('"\'')
def _sanitize_name(name: str) -> str:
    """Turn a free-form name into a path-friendly component.

    Spaces become underscores; the characters ``<``, ``>``, ``:``, ``"``,
    ``/``, backslash, ``|``, ``?`` and ``*`` are dropped; leading and
    trailing underscores are trimmed afterwards.

    Args:
        name: Original name

    Returns:
        Cleaned name (may be empty if nothing usable remains)
    """
    # One translation pass: space -> underscore, forbidden characters -> gone.
    table = str.maketrans({" ": "_", **dict.fromkeys('<>:"/\\|?*')})
    return name.translate(table).strip("_")
def _parse_cddb_lines(lines: list[str]) -> list[TrackInfo]:
    """Extract the CDDB track list from abcde output lines.

    A line is accepted when it looks like ``<number>: <artist> <sep> <title>``
    where the separator is a dash or a slash surrounded by whitespace:

        "1: Wolfgang Anheisser - Wer recht in Freuden wandern will"  (regular albums)
        "1: Trini Lopez / This Land Is Your Land (live)"             (compilations)

    Lines that do not match are ignored.

    Args:
        lines: Lines collected from abcde stdout+stderr

    Returns:
        List of TrackInfo (may be empty if CDDB lookup failed)
    """
    track_line = re.compile(r"^\s*(\d+):\s*(.+?)\s+(?:-|/)\s+(.+)$")
    # Match objects are always truthy, so filter(None, ...) drops the misses.
    hits = filter(None, map(track_line.match, lines))
    return [
        TrackInfo(
            track_number=int(hit.group(1)),
            artist=hit.group(2).strip(),
            title=hit.group(3).strip(),
        )
        for hit in hits
    ]
def _parse_grab_tracks(grab_data: list[tuple[int, str]]) -> list[TrackInfo]:
    """Convert captured grab-progress entries into TrackInfo objects.

    Each raw title is either "Artist / Title" or just "Title". It is cut at
    the first slash that has whitespace on both sides; without such a slash
    the artist stays empty. Used as a fallback when no CDDB lines were seen.

    Args:
        grab_data: List of (track_number, raw_title) from grab_re matches

    Returns:
        List of TrackInfo
    """
    divider = re.compile(r"\s+/\s+")
    result = []
    for number, text in grab_data:
        cut = divider.search(text)
        if cut is None:
            artist, title = "", text.strip()
        else:
            artist = text[:cut.start()].strip()
            title = text[cut.end():].strip()
        result.append(TrackInfo(track_number=number, artist=artist, title=title))
    return result
def _stream_abcde(
    process: subprocess.Popen,
    use_cddb: bool,
) -> tuple[list[TrackInfo] | None, int]:
    """Stream abcde output live, show meaningful progress, collect CDDB data.

    The merged abcde/cdparanoia output is read line by line and reduced to:
    - Track progress: 'Grabbing track N: Title'
    - Tagging progress
    - Sector progress bar from cdparanoia
    - CDDB/MusicBrainz header and selected status/error lines

    CDDB track lines are recognised in both the regular ("Artist - Title")
    and the compilation ("Artist / Title") form. Titles seen in the grab
    progress lines are kept as a fallback in case no CDDB lines appear.

    Args:
        process: Running abcde subprocess whose ``stdout`` yields text lines
        use_cddb: Whether to expect and parse CDDB output

    Returns:
        Tuple (list of TrackInfo or None, return code). The list is None when
        ``use_cddb`` is false or no track data could be collected.
    """
    grab_re = re.compile(r"Grabbing.*track\s+(\d+)(?:\s+of\s+(\d+))?[:\s]*(.*)", re.I)
    tag_re = re.compile(r"Tagging track\s+(\d+)\s+of\s+(\d+)", re.I)
    sector_re = re.compile(r"\(== PROGRESS ==.*\|\s*(\d+)\s+(\d+)\s*\]")
    # Handle both "Artist - Title" and "Artist / Title" (compilations)
    cddb_re = re.compile(r"^\s*(\d+):\s*(.+?)\s+(?:-|/)\s+(.+)$")
    header_re = re.compile(r"-{2,}.+-{2,}")  # ---- Artist / Album ----
    total_re = re.compile(r"tracks?:\s+([\d\s]+)", re.I)
    status_keywords = (
        "Retrieved", "Selected", "Finished", "MusicBrainz",
        "Error", "ERROR", "Cannot", "failed", "No tracks",
    )
    bar_w = 30  # width of the sector progress bar in characters

    cddb_lines: list[str] = []
    grab_data: list[tuple[int, str]] = []  # (track_number, raw_title) fallback
    total_tracks = 0
    current_track = 0
    track_end_sector = 0

    # stdout is None when the process was started without a pipe; treat that
    # as "no output" instead of failing with a TypeError.
    for raw in process.stdout or ():
        line = raw.rstrip("\n\r")

        # ── Track count from "Grabbing entire CD - tracks: 01 02 03 ..."
        m = total_re.search(line)
        if m and total_tracks == 0:
            nums = m.group(1).split()
            if nums:
                total_tracks = len(nums)

        # ── Grab / encode progress
        m = grab_re.search(line)
        if m:
            current_track = int(m.group(1))
            if m.group(2):
                total_tracks = int(m.group(2))
            title = m.group(3).strip().rstrip(".")
            counter = f"{current_track}/{total_tracks}" if total_tracks else str(current_track)
            print(f"\n Track {counter} {title}", flush=True)
            if title:
                grab_data.append((current_track, title))
            track_end_sector = 0  # reset sector bar for new track
            continue

        # ── Tagging progress
        m = tag_re.search(line)
        if m:
            print(f"\r Tagging {m.group(1)}/{m.group(2)} ", flush=True)
            continue

        # ── cdparanoia sector progress bar
        m = sector_re.search(line)
        if m:
            cur = int(m.group(1))
            end = int(m.group(2)) if int(m.group(2)) > 0 else cur
            if track_end_sector == 0:
                # The first end value seen for a track fixes the bar's scale.
                track_end_sector = end
            pct = min(cur / track_end_sector, 1.0) if track_end_sector > 0 else 0
            filled = int(pct * bar_w)
            # Visible glyphs for the done / remaining part; with empty
            # strings here the bar always rendered as "[]".
            bar = "█" * filled + "░" * (bar_w - filled)
            mb = cur * 2352 / 1_048_576  # rough size in MB
            print(f"\r [{bar}] {pct:5.1%} {mb:5.1f} MB", end="", flush=True)
            continue

        # ── CDDB / MusicBrainz album header
        if header_re.search(line):
            print(f"\n {line.strip()}", flush=True)
            continue

        # ── CDDB track lines "1: Artist - Title" or "1: Artist / Title"
        if cddb_re.match(line):
            cddb_lines.append(line)
            continue

        # ── Other important info (errors, status)
        stripped = line.strip()
        if stripped and any(kw in line for kw in status_keywords):
            print(f"\n {stripped}", flush=True)

    returncode = process.wait()
    # Newline after last progress bar
    print(flush=True)

    if not use_cddb:
        return None, returncode

    tracks = _parse_cddb_lines(cddb_lines)
    if not tracks and grab_data:
        # CDDB lines absent but grab progress contained track titles
        tracks = _parse_grab_tracks(grab_data)
        if tracks:
            print(f" (Tracklist aus Grab-Fortschritt: {len(tracks)} Tracks)", flush=True)
    return tracks or None, returncode
def _extract_tracks(output_dir: Path, audio_format: AudioFormat) -> list[Path]:
    """Find abcde track files recursively and move them flat into output_dir.

    abcde stores encoded files inside its temp dir as:
        output_dir/abcde.XXXX/track01.flac
    Moves them to:
        output_dir/track01.flac

    Files that already sit directly in output_dir are kept and reported too.
    If the same track file name exists in more than one place, the later one
    (in sorted path order) replaces the earlier one; it is listed only once.

    Args:
        output_dir: Directory to search and target for flat layout
        audio_format: Audio format (its ``extension`` selects the files)

    Returns:
        List of track files in output_dir, sorted by path, without duplicates
    """
    # Escape the extension so regex metacharacters in it are taken literally.
    ext = re.escape(audio_format.extension.lstrip("."))
    pattern = re.compile(rf"^track(\d+)\.{ext}$", re.IGNORECASE)
    moved: set[Path] = set()
    # sorted() materialises the listing before any file is renamed.
    for file in sorted(output_dir.rglob("*")):
        if not (file.is_file() and pattern.match(file.name)):
            continue
        dest = output_dir / file.name
        if file != dest:
            logger.info("Extracting: %s", file.name)
            file.rename(dest)
        moved.add(dest)
    # Sort by destination: the scan order follows the *source* paths, which
    # would otherwise put nested files ahead of already-flat ones.
    return sorted(moved)
def _rename_files(
    output_dir: Path,
    tracks: list[TrackInfo],
    audio_format: AudioFormat,
) -> None:
    """Rename track files according to naming scheme.

    Input:  track01.flac, track02.flac, ...
    Output: 01_-_title_-_artist.flac, ...

    Falls back to plain 01.flac etc. for tracks without CDDB info.
    Only files directly inside output_dir are considered. The file name is
    matched case-insensitively, the same way _extract_tracks selects files,
    so e.g. "track02.FLAC" is renamed as well.

    Args:
        output_dir: Directory with files
        tracks: Track information from CDDB
        audio_format: Audio format (its ``extension`` is used for matching
            and as suffix of the new names)
    """
    # Escape the extension so regex metacharacters in it are taken literally.
    ext = re.escape(audio_format.extension.lstrip("."))
    abcde_pattern = re.compile(rf"^track(\d+)\.{ext}$", re.IGNORECASE)
    by_num = {t.track_number: t for t in tracks}

    # glob("*") plus the regex instead of glob("track*.ext"): the glob is
    # case-sensitive on POSIX while the regex is not.
    for file in sorted(output_dir.glob("*")):
        m = abcde_pattern.match(file.name)
        if not m:
            continue
        num = int(m.group(1))
        track = by_num.get(num)
        if track:
            new_name = (
                f"{num:02d}_-_{_sanitize_name(track.title)}_-_"
                f"{_sanitize_name(track.artist)}{audio_format.extension}"
            )
        else:
            new_name = f"{num:02d}{audio_format.extension}"
        new_path = output_dir / new_name
        if file != new_path:
            # Old and new name are separated by an arrow; without it the two
            # names ran together in the log and on the console.
            logger.info("Renaming: %s → %s", file.name, new_name)
            print(f" {file.name} → {new_name}", flush=True)
            file.rename(new_path)
def _rip_with_abcde(
    device: str,
    output_dir: Path,
    audio_format: AudioFormat,
    quality: str = "high",
    parallel_jobs: int = 1,
    use_pipes: bool = False,
    use_cddb: bool = True,
) -> tuple[list[Path], list[TrackInfo] | None]:
    """Rip a CD with abcde directly to desired format.

    Runs abcde inside output_dir, streams its output, and afterwards moves
    the encoded track files flat into output_dir. When CDDB is enabled but
    abcde's output yielded no track data, a direct lookup with the disc id
    obtained before ripping is tried as a fallback.

    Args:
        device: CD drive, e.g. '/dev/cdrom'
        output_dir: Target directory for files (created if missing)
        audio_format: Output audio format
        quality: Quality setting (low, medium, high)
        parallel_jobs: Number of parallel encoder processes
        use_pipes: True = faster, no WAV files
        use_cddb: True = use CDDB lookup

    Returns:
        Tuple (list of created files, track information or None)

    Raises:
        RuntimeError: If abcde exits with a non-zero code or no audio files
            are found afterwards.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    # Build output format string: "flac" or "flac:-8" (with quality options)
    encoder_opts = audio_format.get_encoder_options(quality)
    output_fmt = audio_format.get_abcde_format()
    if encoder_opts:
        output_fmt = f"{output_fmt}:{encoder_opts}"

    # abcde options:
    # -a actions: cddb+read+encode+tag (no 'move' — we extract files ourselves)
    # -p: pad track numbers with zeros
    # -o format[:options]: output format with optional encoder options
    # -d device: CD drive
    # -x: eject CD after ripping
    # -N: non-interactive (auto-select first CDDB match, no prompts)
    actions = "cddb,read,encode,tag" if use_cddb else "read,encode"
    cmd = [
        "abcde",
        "-a", actions,
        "-p",
        "-o", output_fmt,
        "-d", device,
        "-x",
        "-N",
    ]
    if parallel_jobs > 1:
        cmd.extend(["-j", str(parallel_jobs)])
    if use_pipes:
        cmd.append("-P")

    # Fetch the disc fingerprint before ripping (for the GnuDB fallback);
    # the disc is ejected once abcde has finished.
    discid_line = get_discid(device)

    print(f"\n Command: {' '.join(cmd)}", flush=True)
    logger.info("Starting abcde: %s", " ".join(cmd))

    # The context manager closes the stdout pipe and reaps the process even
    # if streaming fails; previously the pipe was never closed.
    with subprocess.Popen(
        cmd,
        cwd=str(output_dir),
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr into stdout
        text=True,
        bufsize=1,  # line-buffered
    ) as process:
        tracks, returncode = _stream_abcde(process, use_cddb)

    if returncode != 0:
        raise RuntimeError(f"abcde failed (exit {returncode}).")

    if use_cddb:
        if tracks:
            print(f"\n CDDB: {len(tracks)} tracks found", flush=True)
            logger.info("CDDB data: %d tracks", len(tracks))
        else:
            print("\n CDDB: no track data found", flush=True)
            logger.warning("CDDB lookup returned no track data")
            # Fallback: query GnuDB directly using the disc id fetched above
            if discid_line:
                print(" GnuDB-Fallback: direkter Lookup mit Retries...", flush=True)
                tracks = lookup_by_discid(discid_line) or None
                if tracks:
                    print(f" GnuDB: {len(tracks)} Tracks gefunden", flush=True)
                else:
                    print(" GnuDB: kein Treffer.", flush=True)

    # Extract track files from abcde's temp dir into output_dir (flat)
    audio_files = _extract_tracks(output_dir, audio_format)
    if not audio_files:
        raise RuntimeError(
            "No audio files found after ripping. "
            "Check that a CD is in the drive."
        )

    logger.info("Ripping completed: %d tracks in %s", len(audio_files), output_dir)
    return audio_files, tracks
def rip_disc(
    device: str,
    output_dir: Path,
    audio_format: AudioFormat = AudioFormat.FLAC,
    quality: str = "high",
    parallel_jobs: int = 1,
    use_pipes: bool = False,
    use_cddb: bool = True,
) -> tuple[Path, str | None, list[TrackInfo] | None]:
    """Rip one CD into output_dir and rename the files if track data exists.

    Without track data the files keep the names they have after ripping and
    no album name is reported.

    Args:
        device: CD drive, e.g. '/dev/cdrom'
        output_dir: Target directory for files
        audio_format: Output audio format
        quality: Quality setting (low, medium, high)
        parallel_jobs: Number of parallel encoder processes
        use_pipes: True = faster, no WAV files
        use_cddb: True = use CDDB lookup

    Returns:
        Tuple (directory path, album name or None, track information or None).
        The reported album name is the artist of the first track.
    """
    _files, tracks = _rip_with_abcde(
        device=device,
        output_dir=output_dir,
        audio_format=audio_format,
        quality=quality,
        parallel_jobs=parallel_jobs,
        use_pipes=use_pipes,
        use_cddb=use_cddb,
    )

    # Nothing to rename without track information.
    if not tracks:
        return output_dir, None, tracks

    first_artist = tracks[0].artist
    print("\n Renaming files ...", flush=True)
    _rename_files(output_dir, tracks, audio_format)
    return output_dir, first_artist, tracks
def _prompt_disc_number(default: int) -> int:
    """Ask for the CD number until Enter or a positive integer is given."""
    while True:
        answer = _clean_input(input(f" CD number [{default}]: "))
        if not answer:
            return default
        try:
            number = int(answer)
        except ValueError:
            number = 0  # treated like any other invalid number below
        if number >= 1:
            return number
        print(f" Invalid CD number: {answer!r}")


def interactive_rip(config: RipperConfig) -> None:
    """Interactive rip workflow for multiple CDs.

    For every album the user is asked for a name and an optional EAN/barcode
    (MusicBrainz lookup), then one or more discs are ripped. Files are placed
    under config.output_dir:
        Album_Name/CD1/01_-_title_-_artist.flac, ...

    If album data is available (from MusicBrainz or from the ripped discs) an
    album.json is written next to the disc directories.

    The CD number prompt defaults to the next disc of the album (1, then the
    last ripped number + 1); invalid input is asked again instead of aborting
    the session.

    Args:
        config: Ripper configuration
    """
    print("\n" + "=" * 60)
    print(" Musiksammlung - Interactive CD Ripper (abcde)")
    print("=" * 60)
    print(f"\nCD Drive: {config.device}")
    print(f"Audio Format: {config.audio_format.value}")
    print(f"Quality: {config.quality}")
    print(f"CDDB Lookup: {config.use_cddb}")
    print(f"Parallel Encodes: {config.parallel_jobs}")
    print(f"Pipes: {config.use_pipes}")
    print(f"Output Directory: {config.output_dir.absolute()}")
    print("\nNote: Do not use arrow keys while typing — press Enter to confirm.\n")

    album_counter = 1
    while True:
        print(f"\n--- Album {album_counter} ---")
        raw = input("Album name (Enter = CDDB name / default 'Album{N}'): ")
        album_name = _clean_input(raw)
        if not album_name:
            album_name = f"Album{album_counter}"

        # Optional: EAN/barcode for the MusicBrainz lookup
        raw_ean = input("EAN/Barcode für MusicBrainz (Enter = überspringen): ")
        ean = _clean_input(raw_ean)
        mb_album: AlbumModel | None = None
        if ean:
            # Best effort: any lookup failure is reported and ripping goes on.
            try:
                print(f" MusicBrainz-Suche nach Barcode {ean} ...", flush=True)
                mb_album = lookup_by_barcode(ean)
                print(
                    f"{mb_album.artist} {mb_album.album}"
                    f" ({mb_album.year or '?'},"
                    f" {sum(len(d.tracks) for d in mb_album.discs)} Tracks)",
                    flush=True,
                )
                # Take the album name from MusicBrainz unless set manually
                if album_name == f"Album{album_counter}":
                    album_name = mb_album.album or album_name
            except Exception as e:
                print(f" MusicBrainz: kein Treffer — {e}", flush=True)

        disc_counter = 1  # default offered at the CD number prompt
        all_discs: list[DiscModel] = []
        while True:
            print(f"\n Album: {album_name}")
            print(f" CD Drive: {config.device}")
            disc_num = _prompt_disc_number(disc_counter)
            disc_dir = (
                config.output_dir
                / _sanitize_name(album_name)
                / f"CD{disc_num}"
            )
            print(f"\n Ripping to: {disc_dir}")
            print(" " + "-" * 50)
            try:
                _, _detected_album, tracks = rip_disc(
                    device=config.device,
                    output_dir=disc_dir,
                    audio_format=config.audio_format,
                    quality=config.quality,
                    parallel_jobs=config.parallel_jobs,
                    use_pipes=config.use_pipes,
                    use_cddb=config.use_cddb,
                )
                print("\n " + "-" * 50)
                if tracks:
                    print(f" ✓ Done — {len(tracks)} tracks")
                    for t in tracks:
                        print(f" {t.track_number:2d}. {t.title} [{t.artist}]")
                    all_discs.append(DiscModel(
                        disc_number=disc_num,
                        tracks=[
                            TrackModel(
                                track_number=t.track_number,
                                title=t.title,
                                artist=t.artist,
                            )
                            for t in tracks
                        ],
                    ))
                else:
                    print(" ✓ Done (no CDDB data)")
            except RuntimeError as e:
                print(f"\n ✗ Error: {e}")
                raw_retry = input(" Try again? (y/n): ")
                if _clean_input(raw_retry).lower() != "y":
                    print(" Aborting disc.")
                    break
                continue

            raw_next = input("\n Next CD for this album? (y/n): ")
            if _clean_input(raw_next).lower() != "y":
                break
            # Offer the disc after the one just ripped as the next default,
            # so pressing Enter no longer targets CD1 again.
            disc_counter = disc_num + 1

        if mb_album:
            # MusicBrainz data takes priority (incl. year, curated titles)
            album_model = mb_album
            # NOTE(review): the discs were ripped below
            # _sanitize_name(album_name); if the user typed a name that
            # differs from mb_album.album, album.json ends up in a different
            # directory than the audio files — confirm this is intended.
            album_root = config.output_dir / _sanitize_name(mb_album.album or album_name)
        elif all_discs:
            # all_discs only holds discs with at least one track
            artist = all_discs[0].tracks[0].artist or album_name
            album_model = AlbumModel(artist=artist, album=album_name, discs=all_discs)
            album_root = config.output_dir / _sanitize_name(album_name)
        else:
            album_model = None

        if album_model is not None:
            album_root.mkdir(parents=True, exist_ok=True)
            json_path = album_root / "album.json"
            json_path.write_text(
                album_model.model_dump_json(indent=2), encoding="utf-8"
            )
            print(f"\n album.json gespeichert: {json_path}")
            print(" → Weiter mit: musiksammlung apply <album-verzeichnis> album.json")

        raw_album = input("\nNext album? (y/n): ")
        if _clean_input(raw_album).lower() != "y":
            break
        album_counter += 1

    print("\n" + "=" * 60)
    print("Ripping completed!")
    print(f"Files are in: {config.output_dir.absolute()}")
    print("\nNext steps:")
    print(" 1. Check filenames and tags")
    if config.use_cddb:
        print(" 2. Adjust tags/covers with 'musiksammlung apply'")
    else:
        print(" 2. Run 'musiksammlung scan' to extract metadata")
        print(" 3. Run 'musiksammlung apply' to organize & tag")
    print("=" * 60 + "\n")