"""CD-Ripping via abcde with interactive multi-disc workflow.""" from __future__ import annotations import logging import re import subprocess from pathlib import Path from typing import NamedTuple from pydantic import BaseModel from musiksammlung.config import AudioFormat logger = logging.getLogger(__name__) class TrackInfo(NamedTuple): """Track information from abcde.""" track_number: int artist: str title: str class RipperConfig(BaseModel): """Configuration for ripping process.""" device: str = "/dev/cdrom" audio_format: AudioFormat = AudioFormat.FLAC output_dir: Path = Path("temp") quality: str = "medium" # low, medium, high parallel_jobs: int = 1 # Number of parallel encoder processes use_pipes: bool = False # True = faster, no WAV files use_cddb: bool = True # Use CDDB lookup def _sanitize_name(name: str) -> str: """Remove problematic characters and replace spaces. Args: name: Original name Returns: Cleaned name (spaces -> underscores) """ # Replace spaces with underscores name = name.replace(" ", "_") # Keep umlauts and special characters # Only remove problematic filename characters name = re.sub(r'[<>:"/\\|?*]', "", name) # Remove leading/trailing underscores name = name.strip("_") return name def _parse_cddb_response(output: str) -> list[TrackInfo]: """Parse CDDB data from abcde output. Args: output: abcde stdout/stderr output Returns: List of track information """ tracks = [] # Pattern: "N: Artist - Title" pattern = re.compile(r"^\s*(\d+):\s*(.+?)\s*-\s*(.+)$") for line in output.split("\n"): match = pattern.match(line) if match: track_num = int(match.group(1)) artist = match.group(2).strip() title = match.group(3).strip() tracks.append(TrackInfo(track_num, artist, title)) return tracks def _get_audio_files(output_dir: Path, audio_format: AudioFormat) -> list[Path]: """Find all audio files in directory (case-insensitive). Args: output_dir: Target directory audio_format: Audio format Returns: Sorted list of found files """ # Regex pattern for case-insensitive search ext = audio_format.extension.lstrip(".") pattern = re.compile(rf".*\.{ext}$", re.IGNORECASE) audio_files = [] for file in output_dir.iterdir(): if file.is_file() and pattern.match(file.name): audio_files.append(file) return sorted(audio_files) def _rename_files( output_dir: Path, tracks: list[TrackInfo], audio_format: AudioFormat, ) -> None: """Rename files according to naming scheme. Format: _-_title_-_artist.extension Args: output_dir: Directory with files tracks: Track information audio_format: Audio format """ audio_files = _get_audio_files(output_dir, audio_format) # Pattern for abcde filenames: 01, 02, ..., 10, 11, ... abcde_pattern = re.compile(r"^(\d+)\.") for track in tracks: # Find matching file for file in audio_files: match = abcde_pattern.match(file.name) if match and int(match.group(1)) == track.track_number: # New name: _-_title_-_artist.extension track_num_padded = f"{track.track_number:02d}" artist_clean = _sanitize_name(track.artist) title_clean = _sanitize_name(track.title) new_name = ( f"{track_num_padded}_-_{title_clean}_-_" f"{artist_clean}{audio_format.extension}" ) old_path = file new_path = output_dir / new_name if old_path != new_path: logger.info("Renaming: %s -> %s", old_path.name, new_name) old_path.rename(new_path) break def _rip_with_abcde( device: str, output_dir: Path, audio_format: AudioFormat, quality: str = "medium", parallel_jobs: int = 1, use_pipes: bool = False, use_cddb: bool = True, ) -> tuple[list[Path], list[TrackInfo] | None]: """Rip a CD with abcde directly to desired format. Args: device: CD drive, e.g. '/dev/cdrom' output_dir: Target directory for files audio_format: Output audio format quality: Quality setting (low, medium, high) parallel_jobs: Number of parallel encoder processes use_pipes: True = faster, no WAV files use_cddb: True = use CDDB lookup Returns: Tuple (list of created files, track information or None) """ output_dir.mkdir(parents=True, exist_ok=True) # abcde options: # -a: cddb,read,encode,tag if use_cddb, else read,encode # -p: pad track numbers with zeros # -o format: output format # -d device: CD drive # -x: eject CD after ripping # -N: non-interactive (no prompts) cmd = [ "abcde", "-p", "-o", audio_format.get_abcde_format(), "-d", device, "-x", "-N", ] # Actions if use_cddb: cmd.extend(["-a", "cddb,read,encode,tag"]) else: cmd.extend(["-a", "read,encode"]) # Parallel encodes if parallel_jobs > 1: cmd.extend(["-j", str(parallel_jobs)]) # Use pipes if use_pipes: cmd.append("-P") # Encoder options for quality encoder_opts = audio_format.get_encoder_options(quality) if encoder_opts: # abcde accepts encoder options with colon # Format: -o format:options cmd[-2] = f"{audio_format.get_abcde_format()}:{encoder_opts}" logger.info("Starting abcde in %s (Format: %s, Quality: %s, CDDB: %s)", output_dir, audio_format.value, quality, use_cddb) # Run abcde non-interactively result = subprocess.run( cmd, cwd=str(output_dir), capture_output=True, text=True, ) if result.returncode != 0: raise RuntimeError( f"abcde failed (exit {result.returncode}). " "Check if a CD is in the drive and readable." ) # Track information from CDDB parsing tracks = None if use_cddb: tracks = _parse_cddb_response(result.stdout) if tracks: logger.info("CDDB data found: %d tracks", len(tracks)) # Find files (case-insensitive) audio_files = _get_audio_files(output_dir, audio_format) if not audio_files: raise RuntimeError("No files created by abcde") logger.info("Ripping completed: %d tracks in %s", len(audio_files), output_dir) return audio_files, tracks def rip_disc( device: str, output_dir: Path, audio_format: AudioFormat = AudioFormat.FLAC, quality: str = "medium", parallel_jobs: int = 1, use_pipes: bool = False, use_cddb: bool = True, ) -> tuple[Path, str | None, list[TrackInfo] | None]: """Rip a CD directly to the desired format. Args: device: CD drive, e.g. '/dev/cdrom' output_dir: Target directory for files audio_format: Output audio format quality: Quality setting (low, medium, high) parallel_jobs: Number of parallel encoder processes use_pipes: True = faster, no WAV files use_cddb: True = use CDDB lookup Returns: Tuple (directory path, album name or None, track information or None) """ _, tracks = _rip_with_abcde( device, output_dir, audio_format, quality, parallel_jobs, use_pipes, use_cddb ) # Extract album name from first track (artist part) album_name = None if tracks and len(tracks) > 0: # For Various Artists, this will be "Sampler" or similar # For single artist, this will be the artist name album_name = tracks[0].artist # If CDDB data available, rename files if tracks: _rename_files(output_dir, tracks, audio_format) return output_dir, album_name, tracks def interactive_rip( config: RipperConfig, ) -> None: """Interactive rip workflow for multiple CDs. Prompts for each album/CD: - Album name (or empty for default 'Album{N}') - CD number (e.g., 1, 2, ...) - Optional continuation Files are placed under config.output_dir: temp/Album_Name/CD1/01_-_title_-_artist.flac, ... If CDDB is available, files are automatically named. Args: config: Ripper configuration """ print("\n" + "=" * 60) print(" Musiksammlung - Interactive CD Ripper (abcde)") print("=" * 60) print(f"\nCD Drive: {config.device}") print(f"Audio Format: {config.audio_format.value}") print(f"Quality: {config.quality}") print(f"CDDB Lookup: {config.use_cddb}") print(f"Parallel Encodes: {config.parallel_jobs}") print(f"Pipes: {config.use_pipes}") print(f"Output Directory: {config.output_dir.absolute()}\n") album_counter = 1 while True: print(f"\n--- Album {album_counter} ---") # Ask for album name (optional, overridden if CDDB available) album_name = input( "Enter album name (or Enter for CDDB/default 'Album{N}'): " ).strip() default_album_name = album_name if album_name else f"Album{album_counter}" disc_counter = 1 while True: print(f"\n Album: {default_album_name}") print(f" CD Drive: {config.device}") # Ask for disc number disc_input = input( " CD number for this CD [1]: " ).strip() disc_num = int(disc_input) if disc_input else 1 # Build target directory disc_dir = ( config.output_dir / _sanitize_name(default_album_name) / f"CD{disc_num}" ) print(f" Ripping CD to: {disc_dir.relative_to(config.output_dir)}") print(" (Ripping in progress, please wait...)") try: _, detected_album, tracks = rip_disc( device=config.device, output_dir=disc_dir, audio_format=config.audio_format, quality=config.quality, parallel_jobs=config.parallel_jobs, use_pipes=config.use_pipes, use_cddb=config.use_cddb, ) # Show detected information if tracks and detected_album: print(f" ✓ CD {disc_num} ripped successfully") print(f" Detected: {detected_album}") if len(tracks) > 0: print(f" Tracks: {len(tracks)}") # Show first and last track first = tracks[0] last = tracks[-1] if len(tracks) > 1 else None print(f" {first.track_number}. {first.title} ({first.artist})") if last: print(f" ... {last.track_number}. {last.title} ({last.artist})") else: print(f" ✓ CD {disc_num} ripped successfully") except RuntimeError as e: print(f" ✗ Ripping error: {e}") retry = input(" Try again? (y/n): ").strip().lower() if retry != "y": print(" Aborting disc.") break continue # Continue? next_disc = input( " Next CD for this album? (y/n): " ).strip().lower() if next_disc != "y": break disc_counter += 1 # Next album? next_album = input("\nNext album? (y/n): ").strip().lower() if next_album != "y": break album_counter += 1 print("\n" + "=" * 60) print("Ripping completed!") print(f"\nFiles are in: {config.output_dir.absolute()}") print("\nNext steps:") print(" 1. Check filenames and tags") if config.use_cddb: print(" 2. Adjust tags and covers with 'musiksammlung apply'") else: print(" 2. Scan CD cover images") print(" 3. 'musiksammlung scan' for album JSON") print(" 4. 'musiksammlung apply' to organize & tag") print("=" * 60 + "\n")