"""CD-Ripping via abcde with interactive multi-disc workflow.""" from __future__ import annotations import logging import re import subprocess from pathlib import Path from typing import NamedTuple from pydantic import BaseModel from musiksammlung.config import AudioFormat logger = logging.getLogger(__name__) # ANSI escape sequence pattern (e.g. arrow keys from broken readline) _ANSI_ESC = re.compile(r"(\x1b|\^)\[[\d;]*[A-Za-z@]?") class TrackInfo(NamedTuple): """Track information from abcde.""" track_number: int artist: str title: str class RipperConfig(BaseModel): """Configuration for ripping process.""" device: str = "/dev/cdrom" audio_format: AudioFormat = AudioFormat.FLAC output_dir: Path = Path("temp") quality: str = "high" # low, medium, high parallel_jobs: int = 1 # Number of parallel encoder processes use_pipes: bool = False # True = faster, no WAV files use_cddb: bool = True # Use CDDB lookup def _clean_input(raw: str) -> str: """Strip ANSI escape codes, control characters and surrounding quotes. Handles broken readline environments where arrow keys produce literal escape sequences like ^[[D instead of moving the cursor. Args: raw: Raw string from input() Returns: Cleaned string """ # Remove ANSI escape sequences (\x1b[... and ^[[...) cleaned = _ANSI_ESC.sub("", raw) # Remove remaining control characters (backspace \x08, etc.) cleaned = re.sub(r"[\x00-\x1f\x7f]", "", cleaned) # Strip surrounding whitespace and quotes cleaned = cleaned.strip().strip('"\'') return cleaned def _sanitize_name(name: str) -> str: """Remove problematic characters and replace spaces. Args: name: Original name Returns: Cleaned name (spaces -> underscores) """ # Replace spaces with underscores name = name.replace(" ", "_") # Keep umlauts and special characters # Only remove problematic filename characters name = re.sub(r'[<>:"/\\|?*]', "", name) # Remove leading/trailing underscores name = name.strip("_") return name def _parse_cddb_response(output: str) -> list[TrackInfo]: """Parse CDDB data from abcde output. Args: output: abcde stdout/stderr output Returns: List of track information """ tracks = [] # Pattern: "N: Artist - Title" pattern = re.compile(r"^\s*(\d+):\s*(.+?)\s*-\s*(.+)$") for line in output.split("\n"): match = pattern.match(line) if match: track_num = int(match.group(1)) artist = match.group(2).strip() title = match.group(3).strip() tracks.append(TrackInfo(track_num, artist, title)) return tracks def _extract_tracks(output_dir: Path, audio_format: AudioFormat) -> list[Path]: """Find abcde track files recursively and move them flat into output_dir. abcde stores encoded files inside its temp dir as: output_dir/abcde.XXXX/track01.flac output_dir/abcde.XXXX/track02.flac ... This function moves them to: output_dir/track01.flac output_dir/track02.flac ... Args: output_dir: Directory to search and target for flat layout audio_format: Audio format Returns: Sorted list of moved files in output_dir """ ext = audio_format.extension.lstrip(".") # abcde names files trackNN.ext (with -p: track01, track02, ...) pattern = re.compile(rf"^track(\d+)\.{ext}$", re.IGNORECASE) moved = [] for file in sorted(output_dir.rglob("*")): if file.is_file() and pattern.match(file.name): dest = output_dir / file.name if file != dest: file.rename(dest) moved.append(dest) return moved def _rename_files( output_dir: Path, tracks: list[TrackInfo], audio_format: AudioFormat, ) -> None: """Rename track files according to naming scheme. Expected input: track01.flac, track02.flac, ... Output: 01_-_title_-_artist.flac, 02_-_title_-_artist.flac, ... Args: output_dir: Directory with files tracks: Track information from CDDB audio_format: Audio format """ ext = audio_format.extension.lstrip(".") # Matches track01.flac, track02.flac, ... (abcde naming) abcde_pattern = re.compile(rf"^track(\d+)\.{ext}$", re.IGNORECASE) audio_files = sorted(output_dir.glob(f"track*.{ext}")) for track in tracks: for file in audio_files: match = abcde_pattern.match(file.name) if match and int(match.group(1)) == track.track_number: track_num_padded = f"{track.track_number:02d}" artist_clean = _sanitize_name(track.artist) title_clean = _sanitize_name(track.title) new_name = ( f"{track_num_padded}_-_{title_clean}_-_" f"{artist_clean}{audio_format.extension}" ) new_path = output_dir / new_name if file != new_path: logger.info("Renaming: %s -> %s", file.name, new_name) file.rename(new_path) break # Rename remaining track files without CDDB info (fallback: 01.flac, ...) for file in sorted(output_dir.glob(f"track*.{ext}")): match = abcde_pattern.match(file.name) if match: num = int(match.group(1)) new_path = output_dir / f"{num:02d}{audio_format.extension}" if file != new_path: logger.info("Renaming (no CDDB): %s -> %s", file.name, new_path.name) file.rename(new_path) def _rip_with_abcde( device: str, output_dir: Path, audio_format: AudioFormat, quality: str = "medium", parallel_jobs: int = 1, use_pipes: bool = False, use_cddb: bool = True, ) -> tuple[list[Path], list[TrackInfo] | None]: """Rip a CD with abcde directly to desired format. Args: device: CD drive, e.g. '/dev/cdrom' output_dir: Target directory for files audio_format: Output audio format quality: Quality setting (low, medium, high) parallel_jobs: Number of parallel encoder processes use_pipes: True = faster, no WAV files use_cddb: True = use CDDB lookup Returns: Tuple (list of created files, track information or None) """ output_dir.mkdir(parents=True, exist_ok=True) # abcde options: # -a actions: cddb+read+encode+tag (no 'move' — we extract files ourselves) # -p: pad track numbers with zeros # -o format: output format # -d device: CD drive # -x: eject CD after ripping # -N: non-interactive (no prompts, auto-select first CDDB match) cmd = [ "abcde", "-p", "-o", audio_format.get_abcde_format(), "-d", device, "-x", "-N", ] if use_cddb: cmd.extend(["-a", "cddb,read,encode,tag"]) else: cmd.extend(["-a", "read,encode"]) # Parallel encodes if parallel_jobs > 1: cmd.extend(["-j", str(parallel_jobs)]) # Use pipes if use_pipes: cmd.append("-P") # Encoder options for quality encoder_opts = audio_format.get_encoder_options(quality) if encoder_opts: # abcde accepts encoder options with colon: -o format:options cmd[-2] = f"{audio_format.get_abcde_format()}:{encoder_opts}" logger.info( "Starting abcde in %s (Format: %s, Quality: %s, CDDB: %s)", output_dir, audio_format.value, quality, use_cddb, ) logger.debug("Command: %s", " ".join(cmd)) # Run abcde non-interactively, capture output for CDDB parsing result = subprocess.run( cmd, cwd=str(output_dir), capture_output=True, text=True, ) # Log output for debugging if result.stdout: logger.debug("abcde stdout:\n%s", result.stdout) if result.stderr: logger.debug("abcde stderr:\n%s", result.stderr) if result.returncode != 0: raise RuntimeError( f"abcde failed (exit {result.returncode}).\n" f"{result.stderr or result.stdout}" ) # Parse track info from CDDB output tracks = None if use_cddb: combined = result.stdout + result.stderr tracks = _parse_cddb_response(combined) if tracks: logger.info("CDDB data found: %d tracks", len(tracks)) # Extract track files from abcde's temp dir into output_dir (flat) audio_files = _extract_tracks(output_dir, audio_format) if not audio_files: raise RuntimeError( "No audio files found after ripping.\n" "abcde output:\n" + (result.stderr or result.stdout) ) logger.info("Ripping completed: %d tracks in %s", len(audio_files), output_dir) return audio_files, tracks def rip_disc( device: str, output_dir: Path, audio_format: AudioFormat = AudioFormat.FLAC, quality: str = "medium", parallel_jobs: int = 1, use_pipes: bool = False, use_cddb: bool = True, ) -> tuple[Path, str | None, list[TrackInfo] | None]: """Rip a CD directly to the desired format. Args: device: CD drive, e.g. '/dev/cdrom' output_dir: Target directory for files audio_format: Output audio format quality: Quality setting (low, medium, high) parallel_jobs: Number of parallel encoder processes use_pipes: True = faster, no WAV files use_cddb: True = use CDDB lookup Returns: Tuple (directory path, album name or None, track information or None) """ _, tracks = _rip_with_abcde( device, output_dir, audio_format, quality, parallel_jobs, use_pipes, use_cddb ) album_name = None if tracks: album_name = tracks[0].artist _rename_files(output_dir, tracks, audio_format) return output_dir, album_name, tracks def interactive_rip(config: RipperConfig) -> None: """Interactive rip workflow for multiple CDs. Files are placed under config.output_dir: Album_Name/CD1/01_-_title_-_artist.flac, ... Args: config: Ripper configuration """ print("\n" + "=" * 60) print(" Musiksammlung - Interactive CD Ripper (abcde)") print("=" * 60) print(f"\nCD Drive: {config.device}") print(f"Audio Format: {config.audio_format.value}") print(f"Quality: {config.quality}") print(f"CDDB Lookup: {config.use_cddb}") print(f"Parallel Encodes: {config.parallel_jobs}") print(f"Pipes: {config.use_pipes}") print(f"Output Directory: {config.output_dir.absolute()}") print("\nNote: Do not use arrow keys while typing — press Enter to confirm.\n") album_counter = 1 while True: print(f"\n--- Album {album_counter} ---") raw = input("Album name (Enter = CDDB name / default 'Album{N}'): ") album_name = _clean_input(raw) if not album_name: album_name = f"Album{album_counter}" disc_counter = 1 while True: print(f"\n Album: {album_name}") print(f" CD Drive: {config.device}") raw_disc = input(" CD number [1]: ") disc_num = int(_clean_input(raw_disc)) if _clean_input(raw_disc) else 1 disc_dir = ( config.output_dir / _sanitize_name(album_name) / f"CD{disc_num}" ) print(f" Ripping to: {disc_dir.relative_to(config.output_dir)}") print(" (Ripping in progress, please wait...)") try: _, detected_album, tracks = rip_disc( device=config.device, output_dir=disc_dir, audio_format=config.audio_format, quality=config.quality, parallel_jobs=config.parallel_jobs, use_pipes=config.use_pipes, use_cddb=config.use_cddb, ) if tracks: print(f" ✓ CD {disc_num} ripped successfully — {len(tracks)} tracks") first = tracks[0] last = tracks[-1] print(f" {first.track_number:2d}. {first.title} — {first.artist}") if last != first: print(f" {last.track_number:2d}. {last.title} — {last.artist}") else: print(f" ✓ CD {disc_num} ripped successfully") except RuntimeError as e: print(f" ✗ Ripping error: {e}") raw_retry = input(" Try again? (y/n): ") if _clean_input(raw_retry).lower() != "y": print(" Aborting disc.") break continue raw_next = input(" Next CD for this album? (y/n): ") if _clean_input(raw_next).lower() != "y": break disc_counter += 1 raw_album = input("\nNext album? (y/n): ") if _clean_input(raw_album).lower() != "y": break album_counter += 1 print("\n" + "=" * 60) print("Ripping completed!") print(f"Files are in: {config.output_dir.absolute()}") print("\nNext steps:") print(" 1. Check filenames and tags") if config.use_cddb: print(" 2. Adjust tags/covers with 'musiksammlung apply'") else: print(" 2. Run 'musiksammlung scan' to extract metadata") print(" 3. Run 'musiksammlung apply' to organize & tag") print("=" * 60 + "\n")