"""CD-Ripping via abcde with interactive multi-disc workflow.""" from __future__ import annotations import logging import re import subprocess from pathlib import Path from typing import NamedTuple from pydantic import BaseModel from musiksammlung.config import AudioFormat logger = logging.getLogger(__name__) # ANSI escape sequence pattern (e.g. arrow keys from broken readline) _ANSI_ESC = re.compile(r"(\x1b|\^)\[[\d;]*[A-Za-z@]?") class TrackInfo(NamedTuple): """Track information from abcde.""" track_number: int artist: str title: str class RipperConfig(BaseModel): """Configuration for ripping process.""" device: str = "/dev/cdrom" audio_format: AudioFormat = AudioFormat.FLAC output_dir: Path = Path("temp") quality: str = "high" # low, medium, high parallel_jobs: int = 1 # Number of parallel encoder processes use_pipes: bool = False # True = faster, no WAV files use_cddb: bool = True # Use CDDB lookup def _clean_input(raw: str) -> str: """Strip ANSI escape codes, control characters and surrounding quotes. Handles broken readline environments where arrow keys produce literal escape sequences like ^[[D instead of moving the cursor. Args: raw: Raw string from input() Returns: Cleaned string """ cleaned = _ANSI_ESC.sub("", raw) cleaned = re.sub(r"[\x00-\x1f\x7f]", "", cleaned) cleaned = cleaned.strip().strip('"\'') return cleaned def _sanitize_name(name: str) -> str: """Remove problematic characters and replace spaces. Args: name: Original name Returns: Cleaned name (spaces -> underscores) """ name = name.replace(" ", "_") name = re.sub(r'[<>:"/\\|?*]', "", name) name = name.strip("_") return name def _parse_cddb_lines(lines: list[str]) -> list[TrackInfo]: """Parse CDDB track list from abcde output lines. Matches lines like: "1: Wolfgang Anheisser - Wer recht in Freuden wandern will" Args: lines: Lines collected from abcde stdout+stderr Returns: List of TrackInfo (may be empty if CDDB lookup failed) """ tracks = [] pattern = re.compile(r"^\s*(\d+):\s*(.+?)\s+-\s+(.+)$") for line in lines: m = pattern.match(line) if m: tracks.append(TrackInfo( track_number=int(m.group(1)), artist=m.group(2).strip(), title=m.group(3).strip(), )) return tracks def _stream_abcde( process: subprocess.Popen, use_cddb: bool, ) -> tuple[list[TrackInfo] | None, int]: """Stream abcde output live, show meaningful progress, collect CDDB data. Filters abcde/cdparanoia output into three layers: - Track progress: 'Grabbing track N: Title' - Sector progress bar from cdparanoia - CDDB/MusicBrainz info lines Args: process: Running abcde subprocess use_cddb: Whether to expect and parse CDDB output Returns: Tuple (list of TrackInfo or None, total track count) """ grab_re = re.compile(r"Grabbing.*track\s+(\d+)(?:\s+of\s+(\d+))?[:\s]*(.*)", re.I) tag_re = re.compile(r"Tagging track\s+(\d+)\s+of\s+(\d+)", re.I) sector_re = re.compile(r"\(== PROGRESS ==.*\|\s*(\d+)\s+(\d+)\s*\]") cddb_re = re.compile(r"^\s*(\d+):\s*(.+?)\s+-\s+(.+)$") header_re = re.compile(r"-{2,}.+-{2,}") # ---- Artist / Album ---- total_re = re.compile(r"tracks?:\s+([\d\s]+)", re.I) all_lines: list[str] = [] cddb_lines: list[str] = [] total_tracks = 0 current_track = 0 track_end_sector = 0 for raw in process.stdout: line = raw.rstrip("\n\r") all_lines.append(line) # ── Track count from "Grabbing entire CD - tracks: 01 02 03 ..." m = total_re.search(line) if m and total_tracks == 0: nums = m.group(1).split() if nums: total_tracks = len(nums) # ── Grab / encode progress m = grab_re.search(line) if m: current_track = int(m.group(1)) if m.group(2): total_tracks = int(m.group(2)) title = m.group(3).strip().rstrip(".") counter = f"{current_track}/{total_tracks}" if total_tracks else str(current_track) print(f"\n Track {counter} {title}", flush=True) track_end_sector = 0 # reset sector bar for new track continue # ── Tagging progress m = tag_re.search(line) if m: print(f"\r Tagging {m.group(1)}/{m.group(2)} ", flush=True) continue # ── cdparanoia sector progress bar m = sector_re.search(line) if m: cur = int(m.group(1)) end = int(m.group(2)) if int(m.group(2)) > 0 else cur if track_end_sector == 0: track_end_sector = end pct = min(cur / track_end_sector, 1.0) if track_end_sector > 0 else 0 bar_w = 30 filled = int(pct * bar_w) bar = "█" * filled + "░" * (bar_w - filled) mb = cur * 2352 / 1_048_576 # rough size in MB print(f"\r [{bar}] {pct:5.1%} {mb:5.1f} MB", end="", flush=True) continue # ── CDDB / MusicBrainz album header if header_re.search(line): print(f"\n {line.strip()}", flush=True) continue # ── CDDB track lines "1: Artist - Title" m = cddb_re.match(line) if m: cddb_lines.append(line) continue # ── Other important info (errors, status) stripped = line.strip() if stripped and any(kw in line for kw in ( "Retrieved", "Selected", "Finished", "MusicBrainz", "Error", "ERROR", "Cannot", "failed", "No tracks", )): print(f"\n {stripped}", flush=True) returncode = process.wait() # Newline after last progress bar print(flush=True) tracks = _parse_cddb_lines(cddb_lines) if use_cddb else None return tracks, returncode def _extract_tracks(output_dir: Path, audio_format: AudioFormat) -> list[Path]: """Find abcde track files recursively and move them flat into output_dir. abcde stores encoded files inside its temp dir as: output_dir/abcde.XXXX/track01.flac Moves them to: output_dir/track01.flac Args: output_dir: Directory to search and target for flat layout audio_format: Audio format Returns: Sorted list of moved files in output_dir """ ext = audio_format.extension.lstrip(".") pattern = re.compile(rf"^track(\d+)\.{ext}$", re.IGNORECASE) moved = [] for file in sorted(output_dir.rglob("*")): if file.is_file() and pattern.match(file.name): dest = output_dir / file.name if file != dest: logger.info("Extracting: %s", file.name) file.rename(dest) moved.append(dest) return moved def _rename_files( output_dir: Path, tracks: list[TrackInfo], audio_format: AudioFormat, ) -> None: """Rename track files according to naming scheme. Input: track01.flac, track02.flac, ... Output: 01_-_title_-_artist.flac, ... Falls back to plain 01.flac etc. for tracks without CDDB info. Args: output_dir: Directory with files tracks: Track information from CDDB audio_format: Audio format """ ext = audio_format.extension.lstrip(".") abcde_pattern = re.compile(rf"^track(\d+)\.{ext}$", re.IGNORECASE) by_num = {t.track_number: t for t in tracks} for file in sorted(output_dir.glob(f"track*.{ext}")): m = abcde_pattern.match(file.name) if not m: continue num = int(m.group(1)) track = by_num.get(num) if track: new_name = ( f"{num:02d}_-_{_sanitize_name(track.title)}_-_" f"{_sanitize_name(track.artist)}{audio_format.extension}" ) else: new_name = f"{num:02d}{audio_format.extension}" new_path = output_dir / new_name if file != new_path: logger.info("Renaming: %s → %s", file.name, new_name) print(f" {file.name} → {new_name}", flush=True) file.rename(new_path) def _rip_with_abcde( device: str, output_dir: Path, audio_format: AudioFormat, quality: str = "high", parallel_jobs: int = 1, use_pipes: bool = False, use_cddb: bool = True, ) -> tuple[list[Path], list[TrackInfo] | None]: """Rip a CD with abcde directly to desired format. Args: device: CD drive, e.g. '/dev/cdrom' output_dir: Target directory for files audio_format: Output audio format quality: Quality setting (low, medium, high) parallel_jobs: Number of parallel encoder processes use_pipes: True = faster, no WAV files use_cddb: True = use CDDB lookup Returns: Tuple (list of created files, track information or None) """ output_dir.mkdir(parents=True, exist_ok=True) # abcde options: # -a actions: cddb+read+encode+tag (no 'move' — we extract files ourselves) # -p: pad track numbers with zeros # -o format: output format # -d device: CD drive # -x: eject CD after ripping # -N: non-interactive (auto-select first CDDB match, no prompts) cmd = [ "abcde", "-p", "-o", audio_format.get_abcde_format(), "-d", device, "-x", "-N", ] if use_cddb: cmd.extend(["-a", "cddb,read,encode,tag"]) else: cmd.extend(["-a", "read,encode"]) if parallel_jobs > 1: cmd.extend(["-j", str(parallel_jobs)]) if use_pipes: cmd.append("-P") encoder_opts = audio_format.get_encoder_options(quality) if encoder_opts: cmd[-2] = f"{audio_format.get_abcde_format()}:{encoder_opts}" print(f"\n Command: {' '.join(cmd)}", flush=True) logger.info("Starting abcde: %s", " ".join(cmd)) process = subprocess.Popen( cmd, cwd=str(output_dir), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, # merge stderr into stdout text=True, bufsize=1, # line-buffered ) tracks, returncode = _stream_abcde(process, use_cddb) if returncode != 0: raise RuntimeError(f"abcde failed (exit {returncode}).") if use_cddb: if tracks: print(f"\n CDDB: {len(tracks)} tracks found", flush=True) logger.info("CDDB data: %d tracks", len(tracks)) else: print("\n CDDB: no track data found", flush=True) logger.warning("CDDB lookup returned no track data") # Extract track files from abcde's temp dir into output_dir (flat) audio_files = _extract_tracks(output_dir, audio_format) if not audio_files: raise RuntimeError( "No audio files found after ripping. " "Check that a CD is in the drive." ) logger.info("Ripping completed: %d tracks in %s", len(audio_files), output_dir) return audio_files, tracks def rip_disc( device: str, output_dir: Path, audio_format: AudioFormat = AudioFormat.FLAC, quality: str = "high", parallel_jobs: int = 1, use_pipes: bool = False, use_cddb: bool = True, ) -> tuple[Path, str | None, list[TrackInfo] | None]: """Rip a CD directly to the desired format. Args: device: CD drive, e.g. '/dev/cdrom' output_dir: Target directory for files audio_format: Output audio format quality: Quality setting (low, medium, high) parallel_jobs: Number of parallel encoder processes use_pipes: True = faster, no WAV files use_cddb: True = use CDDB lookup Returns: Tuple (directory path, album name or None, track information or None) """ _, tracks = _rip_with_abcde( device, output_dir, audio_format, quality, parallel_jobs, use_pipes, use_cddb ) album_name = None if tracks: album_name = tracks[0].artist print("\n Renaming files ...", flush=True) _rename_files(output_dir, tracks, audio_format) return output_dir, album_name, tracks def interactive_rip(config: RipperConfig) -> None: """Interactive rip workflow for multiple CDs. Files are placed under config.output_dir: Album_Name/CD1/01_-_title_-_artist.flac, ... Args: config: Ripper configuration """ print("\n" + "=" * 60) print(" Musiksammlung - Interactive CD Ripper (abcde)") print("=" * 60) print(f"\nCD Drive: {config.device}") print(f"Audio Format: {config.audio_format.value}") print(f"Quality: {config.quality}") print(f"CDDB Lookup: {config.use_cddb}") print(f"Parallel Encodes: {config.parallel_jobs}") print(f"Pipes: {config.use_pipes}") print(f"Output Directory: {config.output_dir.absolute()}") print("\nNote: Do not use arrow keys while typing — press Enter to confirm.\n") album_counter = 1 while True: print(f"\n--- Album {album_counter} ---") raw = input("Album name (Enter = CDDB name / default 'Album{N}'): ") album_name = _clean_input(raw) if not album_name: album_name = f"Album{album_counter}" disc_counter = 1 while True: print(f"\n Album: {album_name}") print(f" CD Drive: {config.device}") raw_disc = input(" CD number [1]: ") disc_num = int(_clean_input(raw_disc)) if _clean_input(raw_disc) else 1 disc_dir = ( config.output_dir / _sanitize_name(album_name) / f"CD{disc_num}" ) print(f"\n Ripping to: {disc_dir}") print(" " + "-" * 50) try: _, detected_album, tracks = rip_disc( device=config.device, output_dir=disc_dir, audio_format=config.audio_format, quality=config.quality, parallel_jobs=config.parallel_jobs, use_pipes=config.use_pipes, use_cddb=config.use_cddb, ) print("\n " + "-" * 50) if tracks: print(f" ✓ Done — {len(tracks)} tracks") for t in tracks: print(f" {t.track_number:2d}. {t.title} [{t.artist}]") else: print(" ✓ Done (no CDDB data)") except RuntimeError as e: print(f"\n ✗ Error: {e}") raw_retry = input(" Try again? (y/n): ") if _clean_input(raw_retry).lower() != "y": print(" Aborting disc.") break continue raw_next = input("\n Next CD for this album? (y/n): ") if _clean_input(raw_next).lower() != "y": break disc_counter += 1 raw_album = input("\nNext album? (y/n): ") if _clean_input(raw_album).lower() != "y": break album_counter += 1 print("\n" + "=" * 60) print("Ripping completed!") print(f"Files are in: {config.output_dir.absolute()}") print("\nNext steps:") print(" 1. Check filenames and tags") if config.use_cddb: print(" 2. Adjust tags/covers with 'musiksammlung apply'") else: print(" 2. Run 'musiksammlung scan' to extract metadata") print(" 3. Run 'musiksammlung apply' to organize & tag") print("=" * 60 + "\n")