Musiksammlung/src/musiksammlung/ripper.py

502 lines
16 KiB
Python
Raw Normal View History

"""CD-Ripping via abcde with interactive multi-disc workflow."""
from __future__ import annotations
import logging
import re
import subprocess
from pathlib import Path
from typing import NamedTuple
from pydantic import BaseModel
from musiksammlung.config import AudioFormat
logger = logging.getLogger(__name__)

# ANSI escape sequence pattern (e.g. arrow keys from broken readline).
# Matches a CSI sequence introduced either by a real ESC byte ("\x1b[D") or by
# its caret-notation echo, where ESC is rendered as the two characters "^["
# (so a left-arrow shows up as the literal text "^[[D").  The "\[" after the
# caret is optional so that the shorter "^[D" form keeps matching as well.
_ANSI_ESC = re.compile(r"(\x1b|\^\[?)\[[\d;]*[A-Za-z@]?")
class TrackInfo(NamedTuple):
    """A single track entry: its number plus the artist and title strings."""

    # Position of the track on the disc.
    track_number: int
    # Performer of the track.
    artist: str
    # Name of the track.
    title: str
class RipperConfig(BaseModel):
    """Settings that control a ripping session."""

    # CD drive to read from.
    device: str = "/dev/cdrom"
    # Target audio format of the encoded files.
    audio_format: AudioFormat = AudioFormat.FLAC
    # Base directory that receives the ripped files.
    output_dir: Path = Path("temp")
    # Encoder quality preset: low, medium or high.
    quality: str = "high"
    # Number of parallel encoder processes.
    parallel_jobs: int = 1
    # True = faster, no intermediate WAV files.
    use_pipes: bool = False
    # Whether to perform a CDDB lookup.
    use_cddb: bool = True
def _clean_input(raw: str) -> str:
    """Normalise a line obtained from input().

    Three passes are applied in order: everything matched by ``_ANSI_ESC`` is
    removed (escape sequences typed into a broken readline environment), then
    all remaining ASCII control characters, and finally surrounding
    whitespace followed by surrounding single/double quotes.

    Args:
        raw: Raw string from input()

    Returns:
        The cleaned string (possibly empty)
    """
    without_escapes = _ANSI_ESC.sub("", raw)
    printable_only = re.sub(r"[\x00-\x1f\x7f]", "", without_escapes)
    return printable_only.strip().strip('"\'')
def _sanitize_name(name: str) -> str:
    """Turn a free-form name into a string usable as a path component.

    Spaces become underscores, the characters ``< > : " / \\ | ? *`` are
    dropped, and leading/trailing underscores are trimmed.

    Args:
        name: Original name

    Returns:
        Cleaned name (may be empty if nothing usable remains)
    """
    underscored = "_".join(name.split(" "))
    without_reserved = re.sub(r'[<>:"/\\|?*]', "", underscored)
    return without_reserved.strip("_")
def _parse_cddb_lines(lines: list[str]) -> list[TrackInfo]:
    """Extract track entries from abcde output lines.

    A line counts as a track entry when it looks like
    "1: Wolfgang Anheisser - Wer recht in Freuden wandern will":
    a number, a colon, the artist, a dash surrounded by whitespace, the title.
    Lines of any other shape are ignored.

    Args:
        lines: Lines collected from abcde stdout+stderr

    Returns:
        List of TrackInfo in input order (empty if no line matched)
    """
    track_line = re.compile(r"^\s*(\d+):\s*(.+?)\s+-\s+(.+)$")
    hits = (track_line.match(candidate) for candidate in lines)
    return [
        TrackInfo(
            track_number=int(hit.group(1)),
            artist=hit.group(2).strip(),
            title=hit.group(3).strip(),
        )
        for hit in hits
        if hit
    ]
def _stream_abcde(
    process: subprocess.Popen,
    use_cddb: bool,
) -> tuple[list[TrackInfo] | None, int]:
    """Stream abcde output live, show meaningful progress, collect CDDB data.

    Each output line is first checked for a track list ("tracks: 01 02 ...")
    to learn the total track count, and is then handled by the first rule
    that matches, in this order:
      - track progress: 'Grabbing track N: Title'
      - tagging progress: 'Tagging track N of M'
      - sector progress bar ('== PROGRESS ==' lines from cdparanoia)
      - CDDB/MusicBrainz album header ('---- Artist / Album ----')
      - CDDB track lines ('1: Artist - Title'), collected but not echoed
      - status/error lines containing one of a fixed set of keywords
    Lines matching none of these are dropped.

    Args:
        process: Running abcde subprocess with a text-mode stdout pipe
        use_cddb: Whether to parse the collected CDDB track lines

    Returns:
        Tuple (list of TrackInfo, or None when use_cddb is False;
        exit code of the abcde process)
    """
    grab_re = re.compile(r"Grabbing.*track\s+(\d+)(?:\s+of\s+(\d+))?[:\s]*(.*)", re.I)
    tag_re = re.compile(r"Tagging track\s+(\d+)\s+of\s+(\d+)", re.I)
    sector_re = re.compile(r"\(== PROGRESS ==.*\|\s*(\d+)\s+(\d+)\s*\]")
    cddb_re = re.compile(r"^\s*(\d+):\s*(.+?)\s+-\s+(.+)$")
    header_re = re.compile(r"-{2,}.+-{2,}")  # ---- Artist / Album ----
    total_re = re.compile(r"tracks?:\s+([\d\s]+)", re.I)
    info_keywords = (
        "Retrieved", "Selected", "Finished", "MusicBrainz",
        "Error", "ERROR", "Cannot", "failed", "No tracks",
    )
    bar_w = 30

    cddb_lines: list[str] = []
    total_tracks = 0
    track_end_sector = 0

    # stdout is None when the process was started without stdout=PIPE; there
    # is nothing to stream in that case, only the exit code to wait for.
    output = process.stdout if process.stdout is not None else ()

    for raw in output:
        line = raw.rstrip("\n\r")

        # ── Track count from "Grabbing entire CD - tracks: 01 02 03 ..."
        m = total_re.search(line)
        if m and total_tracks == 0:
            nums = m.group(1).split()
            if nums:
                total_tracks = len(nums)

        # ── Grab / encode progress
        m = grab_re.search(line)
        if m:
            current_track = int(m.group(1))
            if m.group(2):
                total_tracks = int(m.group(2))
            title = m.group(3).strip().rstrip(".")
            counter = f"{current_track}/{total_tracks}" if total_tracks else str(current_track)
            print(f"\n Track {counter} {title}", flush=True)
            track_end_sector = 0  # reset sector bar for new track
            continue

        # ── Tagging progress
        m = tag_re.search(line)
        if m:
            print(f"\r Tagging {m.group(1)}/{m.group(2)} ", flush=True)
            continue

        # ── cdparanoia sector progress bar
        m = sector_re.search(line)
        if m:
            cur = int(m.group(1))
            end = int(m.group(2)) if int(m.group(2)) > 0 else cur
            # The first progress line of a track fixes the 100 % mark.
            if track_end_sector == 0:
                track_end_sector = end
            pct = min(cur / track_end_sector, 1.0) if track_end_sector > 0 else 0.0
            filled = int(pct * bar_w)
            bar = "█" * filled + "░" * (bar_w - filled)
            mb = cur * 2352 / 1_048_576  # rough size in MB (2352 bytes per sector)
            print(f"\r [{bar}] {pct:5.1%} {mb:5.1f} MB", end="", flush=True)
            continue

        # ── CDDB / MusicBrainz album header
        if header_re.search(line):
            print(f"\n {line.strip()}", flush=True)
            continue

        # ── CDDB track lines "1: Artist - Title"
        if cddb_re.match(line):
            cddb_lines.append(line)
            continue

        # ── Other important info (errors, status)
        stripped = line.strip()
        if stripped and any(kw in line for kw in info_keywords):
            print(f"\n {stripped}", flush=True)

    returncode = process.wait()
    # Newline after last progress bar
    print(flush=True)
    tracks = _parse_cddb_lines(cddb_lines) if use_cddb else None
    return tracks, returncode
def _extract_tracks(output_dir: Path, audio_format: AudioFormat) -> list[Path]:
    """Find abcde track files recursively and move them flat into output_dir.

    abcde stores encoded files inside its temp dir as:
        output_dir/abcde.XXXX/track01.flac
    Moves them to:
        output_dir/track01.flac
    Files that already sit directly in output_dir are left in place but are
    still part of the result.

    Args:
        output_dir: Directory to search and target for flat layout
        audio_format: Audio format (its extension selects the files)

    Returns:
        Sorted list, without duplicates, of the track files now in output_dir
    """
    ext = audio_format.extension.lstrip(".")
    # Escape the extension so it is matched literally inside the regex.
    pattern = re.compile(rf"^track(\d+)\.{re.escape(ext)}$", re.IGNORECASE)
    moved: set[Path] = set()
    # sorted() snapshots the tree first, so renaming while looping is safe.
    for file in sorted(output_dir.rglob("*")):
        if not (file.is_file() and pattern.match(file.name)):
            continue
        dest = output_dir / file.name
        if file != dest:
            logger.info("Extracting: %s", file.name)
            file.rename(dest)
        # A set keeps one entry per destination even when the same track name
        # was found in more than one sub-directory.
        moved.add(dest)
    return sorted(moved)
def _rename_files(
    output_dir: Path,
    tracks: list[TrackInfo],
    audio_format: AudioFormat,
) -> None:
    """Rename track files according to naming scheme.

    Input: track01.flac, track02.flac, ...
    Output: 01_-_title_-_artist.flac, ...
    Falls back to plain 01.flac etc. for tracks without CDDB info.

    Args:
        output_dir: Directory with files
        tracks: Track information from CDDB
        audio_format: Audio format
    """
    ext = audio_format.extension.lstrip(".")
    # Escape the extension so it is matched literally inside the regex.
    abcde_pattern = re.compile(rf"^track(\d+)\.{re.escape(ext)}$", re.IGNORECASE)
    by_num = {t.track_number: t for t in tracks}

    # Match by regex instead of a case-sensitive glob so the same files are
    # picked up here as in _extract_tracks.
    for file in sorted(output_dir.iterdir()):
        m = abcde_pattern.match(file.name)
        if not m or not file.is_file():
            continue
        num = int(m.group(1))
        track = by_num.get(num)
        if track is not None:
            new_name = (
                f"{num:02d}_-_{_sanitize_name(track.title)}_-_"
                f"{_sanitize_name(track.artist)}{audio_format.extension}"
            )
        else:
            new_name = f"{num:02d}{audio_format.extension}"
        new_path = output_dir / new_name
        if file != new_path:
            logger.info("Renaming: %s → %s", file.name, new_name)
            print(f" {file.name} → {new_name}", flush=True)
            file.rename(new_path)
def _rip_with_abcde(
    device: str,
    output_dir: Path,
    audio_format: AudioFormat,
    quality: str = "high",
    parallel_jobs: int = 1,
    use_pipes: bool = False,
    use_cddb: bool = True,
) -> tuple[list[Path], list[TrackInfo] | None]:
    """Rip a CD with abcde directly to desired format.

    Args:
        device: CD drive, e.g. '/dev/cdrom'
        output_dir: Target directory for files (created if missing)
        audio_format: Output audio format
        quality: Quality setting (low, medium, high)
        parallel_jobs: Number of parallel encoder processes
        use_pipes: True = faster, no WAV files
        use_cddb: True = use CDDB lookup

    Returns:
        Tuple (list of created files, track information or None)

    Raises:
        RuntimeError: abcde could not be started, exited with a non-zero
            status, or left no audio files behind.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    # Build output format string: "flac" or "flac:-8" (with quality options)
    encoder_opts = audio_format.get_encoder_options(quality)
    output_fmt = audio_format.get_abcde_format()
    if encoder_opts:
        output_fmt = f"{output_fmt}:{encoder_opts}"

    # abcde options:
    # -a actions: cddb,read,encode,tag (no 'move' — we extract files ourselves)
    # -p: pad track numbers with zeros
    # -o format[:options]: output format with optional encoder options
    # -d device: CD drive
    # -x: eject CD after ripping
    # -N: non-interactive (auto-select first CDDB match, no prompts)
    actions = "cddb,read,encode,tag" if use_cddb else "read,encode"
    cmd = [
        "abcde",
        "-a", actions,
        "-p",
        "-o", output_fmt,
        "-d", device,
        "-x",
        "-N",
    ]
    if parallel_jobs > 1:
        cmd.extend(["-j", str(parallel_jobs)])
    if use_pipes:
        cmd.append("-P")

    print(f"\n Command: {' '.join(cmd)}", flush=True)
    logger.info("Starting abcde: %s", " ".join(cmd))

    try:
        process = subprocess.Popen(
            cmd,
            cwd=str(output_dir),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merge stderr into stdout
            text=True,
            bufsize=1,  # line-buffered
        )
    except FileNotFoundError as err:
        # Surface a missing abcde binary as RuntimeError, the error type this
        # module uses for every other ripping failure.
        raise RuntimeError(f"Could not start abcde: {err}") from err

    # The context manager closes the pipe and reaps the child even when
    # streaming is interrupted by an exception.
    with process:
        tracks, returncode = _stream_abcde(process, use_cddb)

    if returncode != 0:
        raise RuntimeError(f"abcde failed (exit {returncode}).")

    if use_cddb:
        if tracks:
            print(f"\n CDDB: {len(tracks)} tracks found", flush=True)
            logger.info("CDDB data: %d tracks", len(tracks))
        else:
            print("\n CDDB: no track data found", flush=True)
            logger.warning("CDDB lookup returned no track data")

    # Extract track files from abcde's temp dir into output_dir (flat)
    audio_files = _extract_tracks(output_dir, audio_format)
    if not audio_files:
        raise RuntimeError(
            "No audio files found after ripping. "
            "Check that a CD is in the drive."
        )

    logger.info("Ripping completed: %d tracks in %s", len(audio_files), output_dir)
    return audio_files, tracks
def rip_disc(
    device: str,
    output_dir: Path,
    audio_format: AudioFormat = AudioFormat.FLAC,
    quality: str = "high",
    parallel_jobs: int = 1,
    use_pipes: bool = False,
    use_cddb: bool = True,
) -> tuple[Path, str | None, list[TrackInfo] | None]:
    """Rip one disc into output_dir and rename the files if track data exists.

    When track information is available the files are renamed from it, and
    the artist of the first track is reported as the album name.

    Args:
        device: CD drive, e.g. '/dev/cdrom'
        output_dir: Target directory for files
        audio_format: Output audio format
        quality: Quality setting (low, medium, high)
        parallel_jobs: Number of parallel encoder processes
        use_pipes: True = faster, no WAV files
        use_cddb: True = use CDDB lookup

    Returns:
        Tuple (directory path, album name or None, track information or None)
    """
    _files, tracks = _rip_with_abcde(
        device,
        output_dir,
        audio_format,
        quality,
        parallel_jobs,
        use_pipes,
        use_cddb,
    )

    # No track data (None or an empty list): nothing to rename or to name.
    if not tracks:
        return output_dir, None, tracks

    print("\n Renaming files ...", flush=True)
    _rename_files(output_dir, tracks, audio_format)
    return output_dir, tracks[0].artist, tracks
def _prompt_disc_number(default: int) -> int:
    """Ask for a CD number until the answer is empty (default) or an integer."""
    while True:
        answer = _clean_input(input(f" CD number [{default}]: "))
        if not answer:
            return default
        try:
            return int(answer)
        except ValueError:
            print(f" Not a number: {answer!r}")


def interactive_rip(config: RipperConfig) -> None:
    """Interactive rip workflow for multiple CDs.

    Asks for an album name, then rips one disc after another into it until
    the user declines, and repeats that for further albums.

    Files are placed under config.output_dir:
        Album_Name/CD1/01_-_title_-_artist.flac, ...

    Args:
        config: Ripper configuration
    """
    print("\n" + "=" * 60)
    print(" Musiksammlung - Interactive CD Ripper (abcde)")
    print("=" * 60)
    print(f"\nCD Drive: {config.device}")
    print(f"Audio Format: {config.audio_format.value}")
    print(f"Quality: {config.quality}")
    print(f"CDDB Lookup: {config.use_cddb}")
    print(f"Parallel Encodes: {config.parallel_jobs}")
    print(f"Pipes: {config.use_pipes}")
    print(f"Output Directory: {config.output_dir.absolute()}")
    print("\nNote: Do not use arrow keys while typing — press Enter to confirm.\n")

    album_counter = 1
    while True:
        print(f"\n--- Album {album_counter} ---")
        # NOTE(review): the prompt mentions a CDDB name, but the target
        # directory is fixed before the rip, so an empty answer always falls
        # back to the numbered default below.
        raw = input("Album name (Enter = CDDB name / default 'Album{N}'): ")
        album_name = _clean_input(raw)
        if not album_name:
            album_name = f"Album{album_counter}"
        # A name made only of removed characters would collapse the album
        # directory into output_dir itself; use the numbered default instead.
        album_dir_name = _sanitize_name(album_name) or f"Album{album_counter}"

        disc_counter = 1
        while True:
            print(f"\n Album: {album_name}")
            print(f" CD Drive: {config.device}")
            # Default to the next disc so a second CD does not land in CD1.
            disc_num = _prompt_disc_number(disc_counter)
            disc_dir = config.output_dir / album_dir_name / f"CD{disc_num}"

            print(f"\n Ripping to: {disc_dir}")
            print(" " + "-" * 50)
            try:
                _, _detected_album, tracks = rip_disc(
                    device=config.device,
                    output_dir=disc_dir,
                    audio_format=config.audio_format,
                    quality=config.quality,
                    parallel_jobs=config.parallel_jobs,
                    use_pipes=config.use_pipes,
                    use_cddb=config.use_cddb,
                )
            except RuntimeError as e:
                print(f"\n ✗ Error: {e}")
                raw_retry = input(" Try again? (y/n): ")
                if _clean_input(raw_retry).lower() != "y":
                    print(" Aborting disc.")
                    break
                continue

            print("\n " + "-" * 50)
            if tracks:
                print(f" ✓ Done — {len(tracks)} tracks")
                for t in tracks:
                    print(f" {t.track_number:2d}. {t.title} [{t.artist}]")
            else:
                print(" ✓ Done (no CDDB data)")

            raw_next = input("\n Next CD for this album? (y/n): ")
            if _clean_input(raw_next).lower() != "y":
                break
            disc_counter = disc_num + 1

        raw_album = input("\nNext album? (y/n): ")
        if _clean_input(raw_album).lower() != "y":
            break
        album_counter += 1

    print("\n" + "=" * 60)
    print("Ripping completed!")
    print(f"Files are in: {config.output_dir.absolute()}")
    print("\nNext steps:")
    print(" 1. Check filenames and tags")
    if config.use_cddb:
        print(" 2. Adjust tags/covers with 'musiksammlung apply'")
    else:
        print(" 2. Run 'musiksammlung scan' to extract metadata")
        print(" 3. Run 'musiksammlung apply' to organize & tag")
    print("=" * 60 + "\n")