cmd[-2] was overwriting the -a action value instead of the -o format value when -P was appended last. Now output_fmt is computed upfront and the cmd list is built cleanly without post-hoc index manipulation. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
502 lines
16 KiB
Python
502 lines
16 KiB
Python
"""CD-Ripping via abcde with interactive multi-disc workflow."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import re
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import NamedTuple
|
|
|
|
from pydantic import BaseModel
|
|
|
|
from musiksammlung.config import AudioFormat
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ANSI escape sequence pattern (e.g. arrow keys from broken readline)
|
|
_ANSI_ESC = re.compile(r"(\x1b|\^)\[[\d;]*[A-Za-z@]?")
|
|
|
|
|
|
class TrackInfo(NamedTuple):
|
|
"""Track information from abcde."""
|
|
|
|
track_number: int
|
|
artist: str
|
|
title: str
|
|
|
|
|
|
class RipperConfig(BaseModel):
|
|
"""Configuration for ripping process."""
|
|
|
|
device: str = "/dev/cdrom"
|
|
audio_format: AudioFormat = AudioFormat.FLAC
|
|
output_dir: Path = Path("temp")
|
|
quality: str = "high" # low, medium, high
|
|
parallel_jobs: int = 1 # Number of parallel encoder processes
|
|
use_pipes: bool = False # True = faster, no WAV files
|
|
use_cddb: bool = True # Use CDDB lookup
|
|
|
|
|
|
def _clean_input(raw: str) -> str:
|
|
"""Strip ANSI escape codes, control characters and surrounding quotes.
|
|
|
|
Handles broken readline environments where arrow keys produce
|
|
literal escape sequences like ^[[D instead of moving the cursor.
|
|
|
|
Args:
|
|
raw: Raw string from input()
|
|
|
|
Returns:
|
|
Cleaned string
|
|
"""
|
|
cleaned = _ANSI_ESC.sub("", raw)
|
|
cleaned = re.sub(r"[\x00-\x1f\x7f]", "", cleaned)
|
|
cleaned = cleaned.strip().strip('"\'')
|
|
return cleaned
|
|
|
|
|
|
def _sanitize_name(name: str) -> str:
|
|
"""Remove problematic characters and replace spaces.
|
|
|
|
Args:
|
|
name: Original name
|
|
|
|
Returns:
|
|
Cleaned name (spaces -> underscores)
|
|
"""
|
|
name = name.replace(" ", "_")
|
|
name = re.sub(r'[<>:"/\\|?*]', "", name)
|
|
name = name.strip("_")
|
|
return name
|
|
|
|
|
|
def _parse_cddb_lines(lines: list[str]) -> list[TrackInfo]:
|
|
"""Parse CDDB track list from abcde output lines.
|
|
|
|
Matches lines like: "1: Wolfgang Anheisser - Wer recht in Freuden wandern will"
|
|
|
|
Args:
|
|
lines: Lines collected from abcde stdout+stderr
|
|
|
|
Returns:
|
|
List of TrackInfo (may be empty if CDDB lookup failed)
|
|
"""
|
|
tracks = []
|
|
pattern = re.compile(r"^\s*(\d+):\s*(.+?)\s+-\s+(.+)$")
|
|
for line in lines:
|
|
m = pattern.match(line)
|
|
if m:
|
|
tracks.append(TrackInfo(
|
|
track_number=int(m.group(1)),
|
|
artist=m.group(2).strip(),
|
|
title=m.group(3).strip(),
|
|
))
|
|
return tracks
|
|
|
|
|
|
def _stream_abcde(
|
|
process: subprocess.Popen,
|
|
use_cddb: bool,
|
|
) -> tuple[list[TrackInfo] | None, int]:
|
|
"""Stream abcde output live, show meaningful progress, collect CDDB data.
|
|
|
|
Filters abcde/cdparanoia output into three layers:
|
|
- Track progress: 'Grabbing track N: Title'
|
|
- Sector progress bar from cdparanoia
|
|
- CDDB/MusicBrainz info lines
|
|
|
|
Args:
|
|
process: Running abcde subprocess
|
|
use_cddb: Whether to expect and parse CDDB output
|
|
|
|
Returns:
|
|
Tuple (list of TrackInfo or None, total track count)
|
|
"""
|
|
grab_re = re.compile(r"Grabbing.*track\s+(\d+)(?:\s+of\s+(\d+))?[:\s]*(.*)", re.I)
|
|
tag_re = re.compile(r"Tagging track\s+(\d+)\s+of\s+(\d+)", re.I)
|
|
sector_re = re.compile(r"\(== PROGRESS ==.*\|\s*(\d+)\s+(\d+)\s*\]")
|
|
cddb_re = re.compile(r"^\s*(\d+):\s*(.+?)\s+-\s+(.+)$")
|
|
header_re = re.compile(r"-{2,}.+-{2,}") # ---- Artist / Album ----
|
|
total_re = re.compile(r"tracks?:\s+([\d\s]+)", re.I)
|
|
|
|
all_lines: list[str] = []
|
|
cddb_lines: list[str] = []
|
|
total_tracks = 0
|
|
current_track = 0
|
|
track_end_sector = 0
|
|
|
|
for raw in process.stdout:
|
|
line = raw.rstrip("\n\r")
|
|
all_lines.append(line)
|
|
|
|
# ── Track count from "Grabbing entire CD - tracks: 01 02 03 ..."
|
|
m = total_re.search(line)
|
|
if m and total_tracks == 0:
|
|
nums = m.group(1).split()
|
|
if nums:
|
|
total_tracks = len(nums)
|
|
|
|
# ── Grab / encode progress
|
|
m = grab_re.search(line)
|
|
if m:
|
|
current_track = int(m.group(1))
|
|
if m.group(2):
|
|
total_tracks = int(m.group(2))
|
|
title = m.group(3).strip().rstrip(".")
|
|
counter = f"{current_track}/{total_tracks}" if total_tracks else str(current_track)
|
|
print(f"\n Track {counter} {title}", flush=True)
|
|
track_end_sector = 0 # reset sector bar for new track
|
|
continue
|
|
|
|
# ── Tagging progress
|
|
m = tag_re.search(line)
|
|
if m:
|
|
print(f"\r Tagging {m.group(1)}/{m.group(2)} ", flush=True)
|
|
continue
|
|
|
|
# ── cdparanoia sector progress bar
|
|
m = sector_re.search(line)
|
|
if m:
|
|
cur = int(m.group(1))
|
|
end = int(m.group(2)) if int(m.group(2)) > 0 else cur
|
|
if track_end_sector == 0:
|
|
track_end_sector = end
|
|
pct = min(cur / track_end_sector, 1.0) if track_end_sector > 0 else 0
|
|
bar_w = 30
|
|
filled = int(pct * bar_w)
|
|
bar = "█" * filled + "░" * (bar_w - filled)
|
|
mb = cur * 2352 / 1_048_576 # rough size in MB
|
|
print(f"\r [{bar}] {pct:5.1%} {mb:5.1f} MB", end="", flush=True)
|
|
continue
|
|
|
|
# ── CDDB / MusicBrainz album header
|
|
if header_re.search(line):
|
|
print(f"\n {line.strip()}", flush=True)
|
|
continue
|
|
|
|
# ── CDDB track lines "1: Artist - Title"
|
|
m = cddb_re.match(line)
|
|
if m:
|
|
cddb_lines.append(line)
|
|
continue
|
|
|
|
# ── Other important info (errors, status)
|
|
stripped = line.strip()
|
|
if stripped and any(kw in line for kw in (
|
|
"Retrieved", "Selected", "Finished", "MusicBrainz",
|
|
"Error", "ERROR", "Cannot", "failed", "No tracks",
|
|
)):
|
|
print(f"\n {stripped}", flush=True)
|
|
|
|
returncode = process.wait()
|
|
|
|
# Newline after last progress bar
|
|
print(flush=True)
|
|
|
|
tracks = _parse_cddb_lines(cddb_lines) if use_cddb else None
|
|
return tracks, returncode
|
|
|
|
|
|
def _extract_tracks(output_dir: Path, audio_format: AudioFormat) -> list[Path]:
|
|
"""Find abcde track files recursively and move them flat into output_dir.
|
|
|
|
abcde stores encoded files inside its temp dir as:
|
|
output_dir/abcde.XXXX/track01.flac
|
|
|
|
Moves them to:
|
|
output_dir/track01.flac
|
|
|
|
Args:
|
|
output_dir: Directory to search and target for flat layout
|
|
audio_format: Audio format
|
|
|
|
Returns:
|
|
Sorted list of moved files in output_dir
|
|
"""
|
|
ext = audio_format.extension.lstrip(".")
|
|
pattern = re.compile(rf"^track(\d+)\.{ext}$", re.IGNORECASE)
|
|
|
|
moved = []
|
|
for file in sorted(output_dir.rglob("*")):
|
|
if file.is_file() and pattern.match(file.name):
|
|
dest = output_dir / file.name
|
|
if file != dest:
|
|
logger.info("Extracting: %s", file.name)
|
|
file.rename(dest)
|
|
moved.append(dest)
|
|
|
|
return moved
|
|
|
|
|
|
def _rename_files(
|
|
output_dir: Path,
|
|
tracks: list[TrackInfo],
|
|
audio_format: AudioFormat,
|
|
) -> None:
|
|
"""Rename track files according to naming scheme.
|
|
|
|
Input: track01.flac, track02.flac, ...
|
|
Output: 01_-_title_-_artist.flac, ...
|
|
|
|
Falls back to plain 01.flac etc. for tracks without CDDB info.
|
|
|
|
Args:
|
|
output_dir: Directory with files
|
|
tracks: Track information from CDDB
|
|
audio_format: Audio format
|
|
"""
|
|
ext = audio_format.extension.lstrip(".")
|
|
abcde_pattern = re.compile(rf"^track(\d+)\.{ext}$", re.IGNORECASE)
|
|
by_num = {t.track_number: t for t in tracks}
|
|
|
|
for file in sorted(output_dir.glob(f"track*.{ext}")):
|
|
m = abcde_pattern.match(file.name)
|
|
if not m:
|
|
continue
|
|
num = int(m.group(1))
|
|
track = by_num.get(num)
|
|
if track:
|
|
new_name = (
|
|
f"{num:02d}_-_{_sanitize_name(track.title)}_-_"
|
|
f"{_sanitize_name(track.artist)}{audio_format.extension}"
|
|
)
|
|
else:
|
|
new_name = f"{num:02d}{audio_format.extension}"
|
|
|
|
new_path = output_dir / new_name
|
|
if file != new_path:
|
|
logger.info("Renaming: %s → %s", file.name, new_name)
|
|
print(f" {file.name} → {new_name}", flush=True)
|
|
file.rename(new_path)
|
|
|
|
|
|
def _rip_with_abcde(
|
|
device: str,
|
|
output_dir: Path,
|
|
audio_format: AudioFormat,
|
|
quality: str = "high",
|
|
parallel_jobs: int = 1,
|
|
use_pipes: bool = False,
|
|
use_cddb: bool = True,
|
|
) -> tuple[list[Path], list[TrackInfo] | None]:
|
|
"""Rip a CD with abcde directly to desired format.
|
|
|
|
Args:
|
|
device: CD drive, e.g. '/dev/cdrom'
|
|
output_dir: Target directory for files
|
|
audio_format: Output audio format
|
|
quality: Quality setting (low, medium, high)
|
|
parallel_jobs: Number of parallel encoder processes
|
|
use_pipes: True = faster, no WAV files
|
|
use_cddb: True = use CDDB lookup
|
|
|
|
Returns:
|
|
Tuple (list of created files, track information or None)
|
|
"""
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Build output format string: "flac" or "flac:-8" (with quality options)
|
|
encoder_opts = audio_format.get_encoder_options(quality)
|
|
output_fmt = audio_format.get_abcde_format()
|
|
if encoder_opts:
|
|
output_fmt = f"{output_fmt}:{encoder_opts}"
|
|
|
|
# abcde options:
|
|
# -a actions: cddb+read+encode+tag (no 'move' — we extract files ourselves)
|
|
# -p: pad track numbers with zeros
|
|
# -o format[:options]: output format with optional encoder options
|
|
# -d device: CD drive
|
|
# -x: eject CD after ripping
|
|
# -N: non-interactive (auto-select first CDDB match, no prompts)
|
|
actions = "cddb,read,encode,tag" if use_cddb else "read,encode"
|
|
|
|
cmd = [
|
|
"abcde",
|
|
"-a", actions,
|
|
"-p",
|
|
"-o", output_fmt,
|
|
"-d", device,
|
|
"-x",
|
|
"-N",
|
|
]
|
|
|
|
if parallel_jobs > 1:
|
|
cmd.extend(["-j", str(parallel_jobs)])
|
|
|
|
if use_pipes:
|
|
cmd.append("-P")
|
|
|
|
print(f"\n Command: {' '.join(cmd)}", flush=True)
|
|
logger.info("Starting abcde: %s", " ".join(cmd))
|
|
|
|
process = subprocess.Popen(
|
|
cmd,
|
|
cwd=str(output_dir),
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT, # merge stderr into stdout
|
|
text=True,
|
|
bufsize=1, # line-buffered
|
|
)
|
|
|
|
tracks, returncode = _stream_abcde(process, use_cddb)
|
|
|
|
if returncode != 0:
|
|
raise RuntimeError(f"abcde failed (exit {returncode}).")
|
|
|
|
if use_cddb:
|
|
if tracks:
|
|
print(f"\n CDDB: {len(tracks)} tracks found", flush=True)
|
|
logger.info("CDDB data: %d tracks", len(tracks))
|
|
else:
|
|
print("\n CDDB: no track data found", flush=True)
|
|
logger.warning("CDDB lookup returned no track data")
|
|
|
|
# Extract track files from abcde's temp dir into output_dir (flat)
|
|
audio_files = _extract_tracks(output_dir, audio_format)
|
|
|
|
if not audio_files:
|
|
raise RuntimeError(
|
|
"No audio files found after ripping. "
|
|
"Check that a CD is in the drive."
|
|
)
|
|
|
|
logger.info("Ripping completed: %d tracks in %s", len(audio_files), output_dir)
|
|
return audio_files, tracks
|
|
|
|
|
|
def rip_disc(
|
|
device: str,
|
|
output_dir: Path,
|
|
audio_format: AudioFormat = AudioFormat.FLAC,
|
|
quality: str = "high",
|
|
parallel_jobs: int = 1,
|
|
use_pipes: bool = False,
|
|
use_cddb: bool = True,
|
|
) -> tuple[Path, str | None, list[TrackInfo] | None]:
|
|
"""Rip a CD directly to the desired format.
|
|
|
|
Args:
|
|
device: CD drive, e.g. '/dev/cdrom'
|
|
output_dir: Target directory for files
|
|
audio_format: Output audio format
|
|
quality: Quality setting (low, medium, high)
|
|
parallel_jobs: Number of parallel encoder processes
|
|
use_pipes: True = faster, no WAV files
|
|
use_cddb: True = use CDDB lookup
|
|
|
|
Returns:
|
|
Tuple (directory path, album name or None, track information or None)
|
|
"""
|
|
_, tracks = _rip_with_abcde(
|
|
device, output_dir, audio_format, quality, parallel_jobs, use_pipes, use_cddb
|
|
)
|
|
|
|
album_name = None
|
|
if tracks:
|
|
album_name = tracks[0].artist
|
|
print("\n Renaming files ...", flush=True)
|
|
_rename_files(output_dir, tracks, audio_format)
|
|
|
|
return output_dir, album_name, tracks
|
|
|
|
|
|
def interactive_rip(config: RipperConfig) -> None:
|
|
"""Interactive rip workflow for multiple CDs.
|
|
|
|
Files are placed under config.output_dir:
|
|
Album_Name/CD1/01_-_title_-_artist.flac, ...
|
|
|
|
Args:
|
|
config: Ripper configuration
|
|
"""
|
|
print("\n" + "=" * 60)
|
|
print(" Musiksammlung - Interactive CD Ripper (abcde)")
|
|
print("=" * 60)
|
|
print(f"\nCD Drive: {config.device}")
|
|
print(f"Audio Format: {config.audio_format.value}")
|
|
print(f"Quality: {config.quality}")
|
|
print(f"CDDB Lookup: {config.use_cddb}")
|
|
print(f"Parallel Encodes: {config.parallel_jobs}")
|
|
print(f"Pipes: {config.use_pipes}")
|
|
print(f"Output Directory: {config.output_dir.absolute()}")
|
|
print("\nNote: Do not use arrow keys while typing — press Enter to confirm.\n")
|
|
|
|
album_counter = 1
|
|
|
|
while True:
|
|
print(f"\n--- Album {album_counter} ---")
|
|
|
|
raw = input("Album name (Enter = CDDB name / default 'Album{N}'): ")
|
|
album_name = _clean_input(raw)
|
|
if not album_name:
|
|
album_name = f"Album{album_counter}"
|
|
|
|
disc_counter = 1
|
|
|
|
while True:
|
|
print(f"\n Album: {album_name}")
|
|
print(f" CD Drive: {config.device}")
|
|
|
|
raw_disc = input(" CD number [1]: ")
|
|
disc_num = int(_clean_input(raw_disc)) if _clean_input(raw_disc) else 1
|
|
|
|
disc_dir = (
|
|
config.output_dir
|
|
/ _sanitize_name(album_name)
|
|
/ f"CD{disc_num}"
|
|
)
|
|
|
|
print(f"\n Ripping to: {disc_dir}")
|
|
print(" " + "-" * 50)
|
|
|
|
try:
|
|
_, detected_album, tracks = rip_disc(
|
|
device=config.device,
|
|
output_dir=disc_dir,
|
|
audio_format=config.audio_format,
|
|
quality=config.quality,
|
|
parallel_jobs=config.parallel_jobs,
|
|
use_pipes=config.use_pipes,
|
|
use_cddb=config.use_cddb,
|
|
)
|
|
|
|
print("\n " + "-" * 50)
|
|
if tracks:
|
|
print(f" ✓ Done — {len(tracks)} tracks")
|
|
for t in tracks:
|
|
print(f" {t.track_number:2d}. {t.title} [{t.artist}]")
|
|
else:
|
|
print(" ✓ Done (no CDDB data)")
|
|
|
|
except RuntimeError as e:
|
|
print(f"\n ✗ Error: {e}")
|
|
raw_retry = input(" Try again? (y/n): ")
|
|
if _clean_input(raw_retry).lower() != "y":
|
|
print(" Aborting disc.")
|
|
break
|
|
continue
|
|
|
|
raw_next = input("\n Next CD for this album? (y/n): ")
|
|
if _clean_input(raw_next).lower() != "y":
|
|
break
|
|
|
|
disc_counter += 1
|
|
|
|
raw_album = input("\nNext album? (y/n): ")
|
|
if _clean_input(raw_album).lower() != "y":
|
|
break
|
|
|
|
album_counter += 1
|
|
|
|
print("\n" + "=" * 60)
|
|
print("Ripping completed!")
|
|
print(f"Files are in: {config.output_dir.absolute()}")
|
|
print("\nNext steps:")
|
|
print(" 1. Check filenames and tags")
|
|
if config.use_cddb:
|
|
print(" 2. Adjust tags/covers with 'musiksammlung apply'")
|
|
else:
|
|
print(" 2. Run 'musiksammlung scan' to extract metadata")
|
|
print(" 3. Run 'musiksammlung apply' to organize & tag")
|
|
print("=" * 60 + "\n")
|