"""CD-Ripping via abcde with interactive multi-disc workflow.""" from __future__ import annotations import logging import re import subprocess from pathlib import Path from typing import NamedTuple from pydantic import BaseModel from musiksammlung.config import AudioFormat logger = logging.getLogger(__name__) # ANSI escape sequence pattern (e.g. arrow keys from broken readline) _ANSI_ESC = re.compile(r"(\x1b|\^)\[[\d;]*[A-Za-z@]?") class TrackInfo(NamedTuple): """Track information from abcde.""" track_number: int artist: str title: str class RipperConfig(BaseModel): """Configuration for ripping process.""" device: str = "/dev/cdrom" audio_format: AudioFormat = AudioFormat.FLAC output_dir: Path = Path("temp") quality: str = "medium" # low, medium, high parallel_jobs: int = 1 # Number of parallel encoder processes use_pipes: bool = False # True = faster, no WAV files use_cddb: bool = True # Use CDDB lookup def _clean_input(raw: str) -> str: """Strip ANSI escape codes, control characters and surrounding quotes. Handles broken readline environments where arrow keys produce literal escape sequences like ^[[D instead of moving the cursor. Args: raw: Raw string from input() Returns: Cleaned string """ # Remove ANSI escape sequences (\x1b[... and ^[[...) cleaned = _ANSI_ESC.sub("", raw) # Remove remaining control characters (backspace \x08, etc.) cleaned = re.sub(r"[\x00-\x1f\x7f]", "", cleaned) # Strip surrounding whitespace and quotes cleaned = cleaned.strip().strip('"\'') return cleaned def _sanitize_name(name: str) -> str: """Remove problematic characters and replace spaces. Args: name: Original name Returns: Cleaned name (spaces -> underscores) """ # Replace spaces with underscores name = name.replace(" ", "_") # Keep umlauts and special characters # Only remove problematic filename characters name = re.sub(r'[<>:"/\\|?*]', "", name) # Remove leading/trailing underscores name = name.strip("_") return name def _parse_cddb_response(output: str) -> list[TrackInfo]: """Parse CDDB data from abcde output. Args: output: abcde stdout/stderr output Returns: List of track information """ tracks = [] # Pattern: "N: Artist - Title" pattern = re.compile(r"^\s*(\d+):\s*(.+?)\s*-\s*(.+)$") for line in output.split("\n"): match = pattern.match(line) if match: track_num = int(match.group(1)) artist = match.group(2).strip() title = match.group(3).strip() tracks.append(TrackInfo(track_num, artist, title)) return tracks def _get_audio_files(output_dir: Path, audio_format: AudioFormat) -> list[Path]: """Find all audio files in directory recursively (case-insensitive). Args: output_dir: Target directory audio_format: Audio format Returns: Sorted list of found files """ # Regex pattern for case-insensitive search ext = audio_format.extension.lstrip(".") pattern = re.compile(rf".*\.{ext}$", re.IGNORECASE) audio_files = [] # rglob: search recursively so abcde subdirs are also covered for file in output_dir.rglob("*"): if file.is_file() and pattern.match(file.name): audio_files.append(file) return sorted(audio_files) def _write_abcde_config(output_dir: Path) -> Path: """Write a temporary abcde config file. Sets OUTPUTDIR to output_dir and uses a flat filename format (track number only) so we can rename files ourselves afterward. Args: output_dir: Directory where encoded files should be placed Returns: Path to the config file """ config = f"""\ OUTPUTDIR="{output_dir}" OUTPUTFORMAT="${{TRACKNUM}}" VAOUTPUTFORMAT="${{TRACKNUM}}" ONETRACKOUTPUTFORMAT="${{TRACKNUM}}" PLAYLISTFORMAT="" """ config_path = output_dir / ".abcde.conf" config_path.write_text(config, encoding="utf-8") return config_path def _rename_files( output_dir: Path, tracks: list[TrackInfo], audio_format: AudioFormat, ) -> None: """Rename files according to naming scheme. Format: _-_title_-_artist.extension Args: output_dir: Directory with files tracks: Track information audio_format: Audio format """ audio_files = _get_audio_files(output_dir, audio_format) # Pattern for abcde filenames: 01, 02, ..., 10, 11, ... abcde_pattern = re.compile(r"^(\d+)\.") for track in tracks: # Find matching file for file in audio_files: match = abcde_pattern.match(file.name) if match and int(match.group(1)) == track.track_number: # New name: _-_title_-_artist.extension track_num_padded = f"{track.track_number:02d}" artist_clean = _sanitize_name(track.artist) title_clean = _sanitize_name(track.title) new_name = ( f"{track_num_padded}_-_{title_clean}_-_" f"{artist_clean}{audio_format.extension}" ) new_path = output_dir / new_name if file != new_path: logger.info("Renaming: %s -> %s", file.name, new_name) file.rename(new_path) break def _rip_with_abcde( device: str, output_dir: Path, audio_format: AudioFormat, quality: str = "medium", parallel_jobs: int = 1, use_pipes: bool = False, use_cddb: bool = True, ) -> tuple[list[Path], list[TrackInfo] | None]: """Rip a CD with abcde directly to desired format. Args: device: CD drive, e.g. '/dev/cdrom' output_dir: Target directory for files audio_format: Output audio format quality: Quality setting (low, medium, high) parallel_jobs: Number of parallel encoder processes use_pipes: True = faster, no WAV files use_cddb: True = use CDDB lookup Returns: Tuple (list of created files, track information or None) """ output_dir.mkdir(parents=True, exist_ok=True) # Write abcde config: controls OUTPUTDIR and flat filename format config_path = _write_abcde_config(output_dir) # abcde options: # -c config: use our config (OUTPUTDIR, OUTPUTFORMAT) # -a actions: cddb+read+encode+tag+move, or read+encode+move # -p: pad track numbers with zeros # -o format: output format # -d device: CD drive # -x: eject CD after ripping # -N: non-interactive (no prompts, auto-select first CDDB match) cmd = [ "abcde", "-c", str(config_path), "-p", "-o", audio_format.get_abcde_format(), "-d", device, "-x", "-N", ] # Actions — move is required so files land in OUTPUTDIR if use_cddb: cmd.extend(["-a", "cddb,read,encode,tag,move"]) else: cmd.extend(["-a", "read,encode,move"]) # Parallel encodes if parallel_jobs > 1: cmd.extend(["-j", str(parallel_jobs)]) # Use pipes if use_pipes: cmd.append("-P") # Encoder options for quality encoder_opts = audio_format.get_encoder_options(quality) if encoder_opts: # abcde accepts encoder options with colon: -o format:options cmd[-2] = f"{audio_format.get_abcde_format()}:{encoder_opts}" logger.info( "Starting abcde in %s (Format: %s, Quality: %s, CDDB: %s)", output_dir, audio_format.value, quality, use_cddb, ) logger.debug("Command: %s", " ".join(cmd)) # Run abcde non-interactively, capture output for CDDB parsing result = subprocess.run( cmd, cwd=str(output_dir), capture_output=True, text=True, ) # Log output for debugging if result.stdout: logger.debug("abcde stdout:\n%s", result.stdout) if result.stderr: logger.debug("abcde stderr:\n%s", result.stderr) if result.returncode != 0: raise RuntimeError( f"abcde failed (exit {result.returncode}).\n" f"{result.stderr or result.stdout}" ) # Parse track info from CDDB output tracks = None if use_cddb: combined = result.stdout + result.stderr tracks = _parse_cddb_response(combined) if tracks: logger.info("CDDB data found: %d tracks", len(tracks)) # Find files (case-insensitive, recursive) audio_files = _get_audio_files(output_dir, audio_format) if not audio_files: raise RuntimeError( "No audio files found after ripping.\n" "abcde output:\n" + (result.stderr or result.stdout) ) logger.info("Ripping completed: %d tracks in %s", len(audio_files), output_dir) return audio_files, tracks def rip_disc( device: str, output_dir: Path, audio_format: AudioFormat = AudioFormat.FLAC, quality: str = "medium", parallel_jobs: int = 1, use_pipes: bool = False, use_cddb: bool = True, ) -> tuple[Path, str | None, list[TrackInfo] | None]: """Rip a CD directly to the desired format. Args: device: CD drive, e.g. '/dev/cdrom' output_dir: Target directory for files audio_format: Output audio format quality: Quality setting (low, medium, high) parallel_jobs: Number of parallel encoder processes use_pipes: True = faster, no WAV files use_cddb: True = use CDDB lookup Returns: Tuple (directory path, album name or None, track information or None) """ _, tracks = _rip_with_abcde( device, output_dir, audio_format, quality, parallel_jobs, use_pipes, use_cddb ) album_name = None if tracks: album_name = tracks[0].artist _rename_files(output_dir, tracks, audio_format) return output_dir, album_name, tracks def interactive_rip(config: RipperConfig) -> None: """Interactive rip workflow for multiple CDs. Files are placed under config.output_dir: Album_Name/CD1/01_-_title_-_artist.flac, ... Args: config: Ripper configuration """ print("\n" + "=" * 60) print(" Musiksammlung - Interactive CD Ripper (abcde)") print("=" * 60) print(f"\nCD Drive: {config.device}") print(f"Audio Format: {config.audio_format.value}") print(f"Quality: {config.quality}") print(f"CDDB Lookup: {config.use_cddb}") print(f"Parallel Encodes: {config.parallel_jobs}") print(f"Pipes: {config.use_pipes}") print(f"Output Directory: {config.output_dir.absolute()}") print("\nNote: Do not use arrow keys while typing — press Enter to confirm.\n") album_counter = 1 while True: print(f"\n--- Album {album_counter} ---") raw = input("Album name (Enter = CDDB name / default 'Album{N}'): ") album_name = _clean_input(raw) if not album_name: album_name = f"Album{album_counter}" disc_counter = 1 while True: print(f"\n Album: {album_name}") print(f" CD Drive: {config.device}") raw_disc = input(" CD number [1]: ") disc_num = int(_clean_input(raw_disc)) if _clean_input(raw_disc) else 1 disc_dir = ( config.output_dir / _sanitize_name(album_name) / f"CD{disc_num}" ) print(f" Ripping to: {disc_dir.relative_to(config.output_dir)}") print(" (Ripping in progress, please wait...)") try: _, detected_album, tracks = rip_disc( device=config.device, output_dir=disc_dir, audio_format=config.audio_format, quality=config.quality, parallel_jobs=config.parallel_jobs, use_pipes=config.use_pipes, use_cddb=config.use_cddb, ) if tracks: print(f" ✓ CD {disc_num} ripped successfully — {len(tracks)} tracks") first = tracks[0] last = tracks[-1] print(f" {first.track_number:2d}. {first.title} — {first.artist}") if last != first: print(f" {last.track_number:2d}. {last.title} — {last.artist}") else: print(f" ✓ CD {disc_num} ripped successfully") except RuntimeError as e: print(f" ✗ Ripping error: {e}") raw_retry = input(" Try again? (y/n): ") if _clean_input(raw_retry).lower() != "y": print(" Aborting disc.") break continue raw_next = input(" Next CD for this album? (y/n): ") if _clean_input(raw_next).lower() != "y": break disc_counter += 1 raw_album = input("\nNext album? (y/n): ") if _clean_input(raw_album).lower() != "y": break album_counter += 1 print("\n" + "=" * 60) print("Ripping completed!") print(f"Files are in: {config.output_dir.absolute()}") print("\nNext steps:") print(" 1. Check filenames and tags") if config.use_cddb: print(" 2. Adjust tags/covers with 'musiksammlung apply'") else: print(" 2. Run 'musiksammlung scan' to extract metadata") print(" 3. Run 'musiksammlung apply' to organize & tag") print("=" * 60 + "\n")