Musiksammlung/src/musiksammlung/ripper.py

528 lines
17 KiB
Python
Raw Normal View History

"""CD-Ripping via abcde with interactive multi-disc workflow."""
from __future__ import annotations
import logging
import re
import subprocess
from pathlib import Path
from typing import NamedTuple
from pydantic import BaseModel
from musiksammlung.config import AudioFormat
from musiksammlung.models import Album as AlbumModel
from musiksammlung.models import Disc as DiscModel
from musiksammlung.models import Track as TrackModel
logger = logging.getLogger(__name__)
# ANSI escape sequence pattern (e.g. arrow keys from broken readline)
_ANSI_ESC = re.compile(r"(\x1b|\^)\[[\d;]*[A-Za-z@]?")
class TrackInfo(NamedTuple):
    """One track entry parsed from a 'N: Artist - Title' line of abcde output."""

    # Number printed in front of the colon of the listing line.
    track_number: int
    # Text between the colon and the first " - " separator.
    artist: str
    # Text after the first " - " separator.
    title: str
class RipperConfig(BaseModel):
    """Settings for one ripping session; read by ``interactive_rip``."""

    device: str = "/dev/cdrom"  # CD drive, handed to abcde via -d
    audio_format: AudioFormat = AudioFormat.FLAC  # Target format of the encoded files
    output_dir: Path = Path("temp")  # Root directory; album folders are created below it
    quality: str = "high"  # low, medium, high
    parallel_jobs: int = 1  # Number of parallel encoder processes (abcde -j, only if > 1)
    use_pipes: bool = False  # True = faster, no WAV files (abcde -P)
    use_cddb: bool = True  # Use CDDB lookup
def _clean_input(raw: str) -> str:
    """Normalize a line read from input().

    Intended for terminals with a broken readline, where arrow keys end up
    in the typed text as escape sequences instead of moving the cursor.

    Args:
        raw: Raw string from input()

    Returns:
        The text with everything matched by ``_ANSI_ESC`` and all ASCII
        control characters removed, then stripped of surrounding whitespace
        and surrounding single/double quotes.
    """
    without_escapes = _ANSI_ESC.sub("", raw)
    # Drop C0 control characters and DEL that survived the escape filter.
    printable = re.sub(r"[\x00-\x1f\x7f]", "", without_escapes)
    return printable.strip().strip('"\'')
def _sanitize_name(name: str) -> str:
"""Remove problematic characters and replace spaces.
Args:
name: Original name
Returns:
Cleaned name (spaces -> underscores)
"""
name = name.replace(" ", "_")
name = re.sub(r'[<>:"/\\|?*]', "", name)
name = name.strip("_")
return name
def _parse_cddb_lines(lines: list[str]) -> list[TrackInfo]:
"""Parse CDDB track list from abcde output lines.
Matches lines like: "1: Wolfgang Anheisser - Wer recht in Freuden wandern will"
Args:
lines: Lines collected from abcde stdout+stderr
Returns:
List of TrackInfo (may be empty if CDDB lookup failed)
"""
tracks = []
pattern = re.compile(r"^\s*(\d+):\s*(.+?)\s+-\s+(.+)$")
for line in lines:
m = pattern.match(line)
if m:
tracks.append(TrackInfo(
track_number=int(m.group(1)),
artist=m.group(2).strip(),
title=m.group(3).strip(),
))
return tracks
def _stream_abcde(
process: subprocess.Popen,
use_cddb: bool,
) -> tuple[list[TrackInfo] | None, int]:
"""Stream abcde output live, show meaningful progress, collect CDDB data.
Filters abcde/cdparanoia output into three layers:
- Track progress: 'Grabbing track N: Title'
- Sector progress bar from cdparanoia
- CDDB/MusicBrainz info lines
Args:
process: Running abcde subprocess
use_cddb: Whether to expect and parse CDDB output
Returns:
Tuple (list of TrackInfo or None, total track count)
"""
grab_re = re.compile(r"Grabbing.*track\s+(\d+)(?:\s+of\s+(\d+))?[:\s]*(.*)", re.I)
tag_re = re.compile(r"Tagging track\s+(\d+)\s+of\s+(\d+)", re.I)
sector_re = re.compile(r"\(== PROGRESS ==.*\|\s*(\d+)\s+(\d+)\s*\]")
cddb_re = re.compile(r"^\s*(\d+):\s*(.+?)\s+-\s+(.+)$")
header_re = re.compile(r"-{2,}.+-{2,}") # ---- Artist / Album ----
total_re = re.compile(r"tracks?:\s+([\d\s]+)", re.I)
all_lines: list[str] = []
cddb_lines: list[str] = []
total_tracks = 0
current_track = 0
track_end_sector = 0
for raw in process.stdout:
line = raw.rstrip("\n\r")
all_lines.append(line)
# ── Track count from "Grabbing entire CD - tracks: 01 02 03 ..."
m = total_re.search(line)
if m and total_tracks == 0:
nums = m.group(1).split()
if nums:
total_tracks = len(nums)
# ── Grab / encode progress
m = grab_re.search(line)
if m:
current_track = int(m.group(1))
if m.group(2):
total_tracks = int(m.group(2))
title = m.group(3).strip().rstrip(".")
counter = f"{current_track}/{total_tracks}" if total_tracks else str(current_track)
print(f"\n Track {counter} {title}", flush=True)
track_end_sector = 0 # reset sector bar for new track
continue
# ── Tagging progress
m = tag_re.search(line)
if m:
print(f"\r Tagging {m.group(1)}/{m.group(2)} ", flush=True)
continue
# ── cdparanoia sector progress bar
m = sector_re.search(line)
if m:
cur = int(m.group(1))
end = int(m.group(2)) if int(m.group(2)) > 0 else cur
if track_end_sector == 0:
track_end_sector = end
pct = min(cur / track_end_sector, 1.0) if track_end_sector > 0 else 0
bar_w = 30
filled = int(pct * bar_w)
bar = "" * filled + "" * (bar_w - filled)
mb = cur * 2352 / 1_048_576 # rough size in MB
print(f"\r [{bar}] {pct:5.1%} {mb:5.1f} MB", end="", flush=True)
continue
# ── CDDB / MusicBrainz album header
if header_re.search(line):
print(f"\n {line.strip()}", flush=True)
continue
# ── CDDB track lines "1: Artist - Title"
m = cddb_re.match(line)
if m:
cddb_lines.append(line)
continue
# ── Other important info (errors, status)
stripped = line.strip()
if stripped and any(kw in line for kw in (
"Retrieved", "Selected", "Finished", "MusicBrainz",
"Error", "ERROR", "Cannot", "failed", "No tracks",
)):
print(f"\n {stripped}", flush=True)
returncode = process.wait()
# Newline after last progress bar
print(flush=True)
tracks = _parse_cddb_lines(cddb_lines) if use_cddb else None
return tracks, returncode
def _extract_tracks(output_dir: Path, audio_format: AudioFormat) -> list[Path]:
"""Find abcde track files recursively and move them flat into output_dir.
abcde stores encoded files inside its temp dir as:
output_dir/abcde.XXXX/track01.flac
Moves them to:
output_dir/track01.flac
Args:
output_dir: Directory to search and target for flat layout
audio_format: Audio format
Returns:
Sorted list of moved files in output_dir
"""
ext = audio_format.extension.lstrip(".")
pattern = re.compile(rf"^track(\d+)\.{ext}$", re.IGNORECASE)
moved = []
for file in sorted(output_dir.rglob("*")):
if file.is_file() and pattern.match(file.name):
dest = output_dir / file.name
if file != dest:
logger.info("Extracting: %s", file.name)
file.rename(dest)
moved.append(dest)
return moved
def _rename_files(
    output_dir: Path,
    tracks: list[TrackInfo],
    audio_format: AudioFormat,
) -> None:
    """Rename track files according to naming scheme.

    Input: track01.flac, track02.flac, ...
    Output: 01_-_title_-_artist.flac, ...
    Falls back to plain 01.flac etc. for tracks without CDDB info.
    Only files directly inside output_dir are considered.

    Args:
        output_dir: Directory with files
        tracks: Track information from CDDB
        audio_format: Audio format
    """
    ext = audio_format.extension.lstrip(".")
    # Escape the extension so it is matched literally, not as regex syntax.
    abcde_pattern = re.compile(rf"^track(\d+)\.{re.escape(ext)}$", re.IGNORECASE)
    by_num = {t.track_number: t for t in tracks}

    for file in sorted(output_dir.glob(f"track*.{ext}")):
        m = abcde_pattern.match(file.name)
        if not m:
            # e.g. "trackXY.flac": matched by the glob but carries no number
            continue
        num = int(m.group(1))
        track = by_num.get(num)
        if track:
            new_name = (
                f"{num:02d}_-_{_sanitize_name(track.title)}_-_"
                f"{_sanitize_name(track.artist)}{audio_format.extension}"
            )
        else:
            new_name = f"{num:02d}{audio_format.extension}"
        new_path = output_dir / new_name
        if file != new_path:
            # Old and new name were printed back to back without a separator.
            logger.info("Renaming: %s → %s", file.name, new_name)
            print(f" {file.name} → {new_name}", flush=True)
            file.rename(new_path)
def _rip_with_abcde(
    device: str,
    output_dir: Path,
    audio_format: AudioFormat,
    quality: str = "high",
    parallel_jobs: int = 1,
    use_pipes: bool = False,
    use_cddb: bool = True,
) -> tuple[list[Path], list[TrackInfo] | None]:
    """Rip a CD with abcde directly to desired format.

    Runs abcde inside output_dir (created if missing), streams its output,
    then moves the encoded track files flat into output_dir.

    Args:
        device: CD drive, e.g. '/dev/cdrom'
        output_dir: Target directory for files
        audio_format: Output audio format
        quality: Quality setting (low, medium, high)
        parallel_jobs: Number of parallel encoder processes
        use_pipes: True = faster, no WAV files
        use_cddb: True = use CDDB lookup

    Returns:
        Tuple (list of created files, track information or None)

    Raises:
        RuntimeError: abcde could not be started, exited with a non-zero
            status, or left no audio files behind.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    # Build output format string: "flac" or "flac:-8" (with quality options)
    encoder_opts = audio_format.get_encoder_options(quality)
    output_fmt = audio_format.get_abcde_format()
    if encoder_opts:
        output_fmt = f"{output_fmt}:{encoder_opts}"

    # abcde options:
    # -a actions: cddb+read+encode+tag (no 'move' — we extract files ourselves)
    # -p: pad track numbers with zeros
    # -o format[:options]: output format with optional encoder options
    # -d device: CD drive
    # -x: eject CD after ripping
    # -N: non-interactive (auto-select first CDDB match, no prompts)
    actions = "cddb,read,encode,tag" if use_cddb else "read,encode"
    cmd = [
        "abcde",
        "-a", actions,
        "-p",
        "-o", output_fmt,
        "-d", device,
        "-x",
        "-N",
    ]
    if parallel_jobs > 1:
        cmd.extend(["-j", str(parallel_jobs)])
    if use_pipes:
        cmd.append("-P")

    print(f"\n Command: {' '.join(cmd)}", flush=True)
    logger.info("Starting abcde: %s", " ".join(cmd))

    try:
        process = subprocess.Popen(
            cmd,
            cwd=str(output_dir),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merge stderr into stdout
            text=True,
            bufsize=1,  # line-buffered
        )
    except FileNotFoundError as err:
        # Report a missing abcde binary like every other rip failure, so
        # callers that handle RuntimeError can offer a retry.
        raise RuntimeError(f"Could not start abcde: {err}") from err

    # The context manager closes the stdout pipe and reaps the child even
    # when streaming is interrupted by an exception.
    with process:
        tracks, returncode = _stream_abcde(process, use_cddb)

    if returncode != 0:
        raise RuntimeError(f"abcde failed (exit {returncode}).")

    if use_cddb:
        if tracks:
            print(f"\n CDDB: {len(tracks)} tracks found", flush=True)
            logger.info("CDDB data: %d tracks", len(tracks))
        else:
            print("\n CDDB: no track data found", flush=True)
            logger.warning("CDDB lookup returned no track data")

    # Extract track files from abcde's temp dir into output_dir (flat)
    audio_files = _extract_tracks(output_dir, audio_format)
    if not audio_files:
        raise RuntimeError(
            "No audio files found after ripping. "
            "Check that a CD is in the drive."
        )
    logger.info("Ripping completed: %d tracks in %s", len(audio_files), output_dir)
    return audio_files, tracks
def rip_disc(
    device: str,
    output_dir: Path,
    audio_format: AudioFormat = AudioFormat.FLAC,
    quality: str = "high",
    parallel_jobs: int = 1,
    use_pipes: bool = False,
    use_cddb: bool = True,
) -> tuple[Path, str | None, list[TrackInfo] | None]:
    """Rip one CD into output_dir and give the files their final names.

    Files are renamed only when track information is available; otherwise
    they keep the names produced by the rip step.

    Args:
        device: CD drive, e.g. '/dev/cdrom'
        output_dir: Target directory for files
        audio_format: Output audio format
        quality: Quality setting (low, medium, high)
        parallel_jobs: Number of parallel encoder processes
        use_pipes: True = faster, no WAV files
        use_cddb: True = use CDDB lookup

    Returns:
        Tuple (directory path, album name or None, track information or None).
        The album name is taken from the artist field of the first track.
    """
    _, tracks = _rip_with_abcde(
        device=device,
        output_dir=output_dir,
        audio_format=audio_format,
        quality=quality,
        parallel_jobs=parallel_jobs,
        use_pipes=use_pipes,
        use_cddb=use_cddb,
    )
    # Nothing to rename without track data (None or empty list).
    if not tracks:
        return output_dir, None, tracks

    print("\n Renaming files ...", flush=True)
    _rename_files(output_dir, tracks, audio_format)
    return output_dir, tracks[0].artist, tracks
def _prompt_disc_number(default: int) -> int:
    """Ask for the CD number until the answer is empty or an integer."""
    while True:
        answer = _clean_input(input(f" CD number [{default}]: "))
        if not answer:
            return default
        try:
            return int(answer)
        except ValueError:
            # Re-prompt instead of aborting the whole session on a typo.
            print(f" Invalid CD number: {answer!r}")


def interactive_rip(config: RipperConfig) -> None:
    """Interactive rip workflow for multiple CDs.

    Files are placed under config.output_dir:
        Album_Name/CD1/01_-_title_-_artist.flac, ...

    For every album the user names the album, then rips one or more discs.
    Discs that produced track information are collected and written to
    Album_Name/album.json once the album is finished. A failed rip can be
    retried or the disc can be abandoned.

    Args:
        config: Ripper configuration
    """
    print("\n" + "=" * 60)
    print(" Musiksammlung - Interactive CD Ripper (abcde)")
    print("=" * 60)
    print(f"\nCD Drive: {config.device}")
    print(f"Audio Format: {config.audio_format.value}")
    print(f"Quality: {config.quality}")
    print(f"CDDB Lookup: {config.use_cddb}")
    print(f"Parallel Encodes: {config.parallel_jobs}")
    print(f"Pipes: {config.use_pipes}")
    print(f"Output Directory: {config.output_dir.absolute()}")
    print("\nNote: Do not use arrow keys while typing — press Enter to confirm.\n")

    album_counter = 1
    while True:
        print(f"\n--- Album {album_counter} ---")
        raw = input("Album name (Enter = CDDB name / default 'Album{N}'): ")
        album_name = _clean_input(raw)
        if not album_name:
            album_name = f"Album{album_counter}"

        disc_counter = 1
        all_discs: list[DiscModel] = []
        while True:
            print(f"\n Album: {album_name}")
            print(f" CD Drive: {config.device}")
            # The default follows the disc counter; it used to be fixed at 1,
            # which sent every further disc into the CD1 directory again.
            disc_num = _prompt_disc_number(disc_counter)
            disc_dir = (
                config.output_dir
                / _sanitize_name(album_name)
                / f"CD{disc_num}"
            )
            print(f"\n Ripping to: {disc_dir}")
            print(" " + "-" * 50)
            try:
                _, _, tracks = rip_disc(
                    device=config.device,
                    output_dir=disc_dir,
                    audio_format=config.audio_format,
                    quality=config.quality,
                    parallel_jobs=config.parallel_jobs,
                    use_pipes=config.use_pipes,
                    use_cddb=config.use_cddb,
                )
                print("\n " + "-" * 50)
                if tracks:
                    print(f" ✓ Done — {len(tracks)} tracks")
                    for t in tracks:
                        print(f" {t.track_number:2d}. {t.title} [{t.artist}]")
                    all_discs.append(DiscModel(
                        disc_number=disc_num,
                        tracks=[
                            TrackModel(
                                track_number=t.track_number,
                                title=t.title,
                                artist=t.artist,
                            )
                            for t in tracks
                        ],
                    ))
                else:
                    print(" ✓ Done (no CDDB data)")
            except RuntimeError as e:
                print(f"\n ✗ Error: {e}")
                raw_retry = input(" Try again? (y/n): ")
                if _clean_input(raw_retry).lower() != "y":
                    print(" Aborting disc.")
                    break
                continue

            raw_next = input("\n Next CD for this album? (y/n): ")
            if _clean_input(raw_next).lower() != "y":
                break
            # Continue after the number actually used for this disc.
            disc_counter = disc_num + 1

        # Only discs with track data are in all_discs, so tracks[0] exists.
        if all_discs:
            artist = all_discs[0].tracks[0].artist or album_name
            album_model = AlbumModel(artist=artist, album=album_name, discs=all_discs)
            album_root = config.output_dir / _sanitize_name(album_name)
            json_path = album_root / "album.json"
            json_path.write_text(
                album_model.model_dump_json(indent=2), encoding="utf-8"
            )
            print(f"\n album.json gespeichert: {json_path}")
            print(" → Weiter mit: musiksammlung apply <album-verzeichnis> album.json")

        raw_album = input("\nNext album? (y/n): ")
        if _clean_input(raw_album).lower() != "y":
            break
        album_counter += 1

    print("\n" + "=" * 60)
    print("Ripping completed!")
    print(f"Files are in: {config.output_dir.absolute()}")
    print("\nNext steps:")
    print(" 1. Check filenames and tags")
    if config.use_cddb:
        print(" 2. Adjust tags/covers with 'musiksammlung apply'")
    else:
        print(" 2. Run 'musiksammlung scan' to extract metadata")
        print(" 3. Run 'musiksammlung apply' to organize & tag")
    print("=" * 60 + "\n")