diff --git a/src/musiksammlung/ripper.py b/src/musiksammlung/ripper.py index 3e72f7c..0f59f62 100644 --- a/src/musiksammlung/ripper.py +++ b/src/musiksammlung/ripper.py @@ -14,6 +14,9 @@ from musiksammlung.config import AudioFormat logger = logging.getLogger(__name__) +# ANSI escape sequence pattern (e.g. arrow keys from broken readline) +_ANSI_ESC = re.compile(r"(\x1b|\^)\[[\d;]*[A-Za-z@]?") + class TrackInfo(NamedTuple): """Track information from abcde.""" @@ -35,6 +38,27 @@ class RipperConfig(BaseModel): use_cddb: bool = True # Use CDDB lookup +def _clean_input(raw: str) -> str: + """Strip ANSI escape codes, control characters and surrounding quotes. + + Handles broken readline environments where arrow keys produce + literal escape sequences like ^[[D instead of moving the cursor. + + Args: + raw: Raw string from input() + + Returns: + Cleaned string + """ + # Remove ANSI escape sequences (\x1b[... and ^[[...) + cleaned = _ANSI_ESC.sub("", raw) + # Remove remaining control characters (backspace \x08, etc.) + cleaned = re.sub(r"[\x00-\x1f\x7f]", "", cleaned) + # Strip surrounding whitespace and quotes + cleaned = cleaned.strip().strip('"\'') + return cleaned + + def _sanitize_name(name: str) -> str: """Remove problematic characters and replace spaces. @@ -79,7 +103,7 @@ def _parse_cddb_response(output: str) -> list[TrackInfo]: def _get_audio_files(output_dir: Path, audio_format: AudioFormat) -> list[Path]: - """Find all audio files in directory (case-insensitive). + """Find all audio files in directory recursively (case-insensitive). Args: output_dir: Target directory @@ -93,13 +117,38 @@ def _get_audio_files(output_dir: Path, audio_format: AudioFormat) -> list[Path]: pattern = re.compile(rf".*\.{ext}$", re.IGNORECASE) audio_files = [] - for file in output_dir.iterdir(): + # rglob: search recursively so abcde subdirs are also covered + for file in output_dir.rglob("*"): if file.is_file() and pattern.match(file.name): audio_files.append(file) return sorted(audio_files) +def _write_abcde_config(output_dir: Path) -> Path: + """Write a temporary abcde config file. + + Sets OUTPUTDIR to output_dir and uses a flat filename format + (track number only) so we can rename files ourselves afterward. + + Args: + output_dir: Directory where encoded files should be placed + + Returns: + Path to the config file + """ + config = f"""\ +OUTPUTDIR="{output_dir}" +OUTPUTFORMAT="${{TRACKNUM}}" +VAOUTPUTFORMAT="${{TRACKNUM}}" +ONETRACKOUTPUTFORMAT="${{TRACKNUM}}" +PLAYLISTFORMAT="" +""" + config_path = output_dir / ".abcde.conf" + config_path.write_text(config, encoding="utf-8") + return config_path + + def _rename_files( output_dir: Path, tracks: list[TrackInfo], @@ -133,12 +182,11 @@ def _rename_files( f"{artist_clean}{audio_format.extension}" ) - old_path = file new_path = output_dir / new_name - if old_path != new_path: - logger.info("Renaming: %s -> %s", old_path.name, new_name) - old_path.rename(new_path) + if file != new_path: + logger.info("Renaming: %s -> %s", file.name, new_name) + file.rename(new_path) break @@ -167,15 +215,20 @@ def _rip_with_abcde( """ output_dir.mkdir(parents=True, exist_ok=True) + # Write abcde config: controls OUTPUTDIR and flat filename format + config_path = _write_abcde_config(output_dir) + # abcde options: - # -a: cddb,read,encode,tag if use_cddb, else read,encode + # -c config: use our config (OUTPUTDIR, OUTPUTFORMAT) + # -a actions: cddb+read+encode+tag+move, or read+encode+move # -p: pad track numbers with zeros # -o format: output format # -d device: CD drive # -x: eject CD after ripping - # -N: non-interactive (no prompts) + # -N: non-interactive (no prompts, auto-select first CDDB match) cmd = [ "abcde", + "-c", str(config_path), "-p", "-o", audio_format.get_abcde_format(), "-d", device, @@ -183,11 +236,11 @@ def _rip_with_abcde( "-N", ] - # Actions + # Actions — move is required so files land in OUTPUTDIR if use_cddb: - cmd.extend(["-a", "cddb,read,encode,tag"]) + cmd.extend(["-a", "cddb,read,encode,tag,move"]) else: - cmd.extend(["-a", "read,encode"]) + cmd.extend(["-a", "read,encode,move"]) # Parallel encodes if parallel_jobs > 1: @@ -200,14 +253,16 @@ def _rip_with_abcde( # Encoder options for quality encoder_opts = audio_format.get_encoder_options(quality) if encoder_opts: - # abcde accepts encoder options with colon - # Format: -o format:options + # abcde accepts encoder options with colon: -o format:options cmd[-2] = f"{audio_format.get_abcde_format()}:{encoder_opts}" - logger.info("Starting abcde in %s (Format: %s, Quality: %s, CDDB: %s)", - output_dir, audio_format.value, quality, use_cddb) + logger.info( + "Starting abcde in %s (Format: %s, Quality: %s, CDDB: %s)", + output_dir, audio_format.value, quality, use_cddb, + ) + logger.debug("Command: %s", " ".join(cmd)) - # Run abcde non-interactively + # Run abcde non-interactively, capture output for CDDB parsing result = subprocess.run( cmd, cwd=str(output_dir), @@ -215,24 +270,34 @@ def _rip_with_abcde( text=True, ) + # Log output for debugging + if result.stdout: + logger.debug("abcde stdout:\n%s", result.stdout) + if result.stderr: + logger.debug("abcde stderr:\n%s", result.stderr) + if result.returncode != 0: raise RuntimeError( - f"abcde failed (exit {result.returncode}). " - "Check if a CD is in the drive and readable." + f"abcde failed (exit {result.returncode}).\n" + f"{result.stderr or result.stdout}" ) - # Track information from CDDB parsing + # Parse track info from CDDB output tracks = None if use_cddb: - tracks = _parse_cddb_response(result.stdout) + combined = result.stdout + result.stderr + tracks = _parse_cddb_response(combined) if tracks: logger.info("CDDB data found: %d tracks", len(tracks)) - # Find files (case-insensitive) + # Find files (case-insensitive, recursive) audio_files = _get_audio_files(output_dir, audio_format) if not audio_files: - raise RuntimeError("No files created by abcde") + raise RuntimeError( + "No audio files found after ripping.\n" + "abcde output:\n" + (result.stderr or result.stdout) + ) logger.info("Ripping completed: %d tracks in %s", len(audio_files), output_dir) return audio_files, tracks @@ -265,34 +330,19 @@ def rip_disc( device, output_dir, audio_format, quality, parallel_jobs, use_pipes, use_cddb ) - # Extract album name from first track (artist part) album_name = None - if tracks and len(tracks) > 0: - # For Various Artists, this will be "Sampler" or similar - # For single artist, this will be the artist name - album_name = tracks[0].artist - - # If CDDB data available, rename files if tracks: + album_name = tracks[0].artist _rename_files(output_dir, tracks, audio_format) return output_dir, album_name, tracks -def interactive_rip( - config: RipperConfig, -) -> None: +def interactive_rip(config: RipperConfig) -> None: """Interactive rip workflow for multiple CDs. - Prompts for each album/CD: - - Album name (or empty for default 'Album{N}') - - CD number (e.g., 1, 2, ...) - - Optional continuation - Files are placed under config.output_dir: - temp/Album_Name/CD1/01_-_title_-_artist.flac, ... - - If CDDB is available, files are automatically named. + Album_Name/CD1/01_-_title_-_artist.flac, ... Args: config: Ripper configuration @@ -300,45 +350,41 @@ def interactive_rip( print("\n" + "=" * 60) print(" Musiksammlung - Interactive CD Ripper (abcde)") print("=" * 60) - print(f"\nCD Drive: {config.device}") - print(f"Audio Format: {config.audio_format.value}") - print(f"Quality: {config.quality}") - print(f"CDDB Lookup: {config.use_cddb}") - print(f"Parallel Encodes: {config.parallel_jobs}") - print(f"Pipes: {config.use_pipes}") - print(f"Output Directory: {config.output_dir.absolute()}\n") + print(f"\nCD Drive: {config.device}") + print(f"Audio Format: {config.audio_format.value}") + print(f"Quality: {config.quality}") + print(f"CDDB Lookup: {config.use_cddb}") + print(f"Parallel Encodes: {config.parallel_jobs}") + print(f"Pipes: {config.use_pipes}") + print(f"Output Directory: {config.output_dir.absolute()}") + print("\nNote: Do not use arrow keys while typing — press Enter to confirm.\n") album_counter = 1 while True: print(f"\n--- Album {album_counter} ---") - # Ask for album name (optional, overridden if CDDB available) - album_name = input( - "Enter album name (or Enter for CDDB/default 'Album{N}'): " - ).strip() - default_album_name = album_name if album_name else f"Album{album_counter}" + raw = input("Album name (Enter = CDDB name / default 'Album{N}'): ") + album_name = _clean_input(raw) + if not album_name: + album_name = f"Album{album_counter}" disc_counter = 1 while True: - print(f"\n Album: {default_album_name}") + print(f"\n Album: {album_name}") print(f" CD Drive: {config.device}") - # Ask for disc number - disc_input = input( - " CD number for this CD [1]: " - ).strip() - disc_num = int(disc_input) if disc_input else 1 + raw_disc = input(" CD number [1]: ") + disc_num = int(_clean_input(raw_disc)) if _clean_input(raw_disc) else 1 - # Build target directory disc_dir = ( config.output_dir - / _sanitize_name(default_album_name) + / _sanitize_name(album_name) / f"CD{disc_num}" ) - print(f" Ripping CD to: {disc_dir.relative_to(config.output_dir)}") + print(f" Ripping to: {disc_dir.relative_to(config.output_dir)}") print(" (Ripping in progress, please wait...)") try: @@ -352,55 +398,44 @@ def interactive_rip( use_cddb=config.use_cddb, ) - # Show detected information - if tracks and detected_album: - print(f" ✓ CD {disc_num} ripped successfully") - print(f" Detected: {detected_album}") - if len(tracks) > 0: - print(f" Tracks: {len(tracks)}") - # Show first and last track - first = tracks[0] - last = tracks[-1] if len(tracks) > 1 else None - print(f" {first.track_number}. {first.title} ({first.artist})") - if last: - print(f" ... {last.track_number}. {last.title} ({last.artist})") + if tracks: + print(f" ✓ CD {disc_num} ripped successfully — {len(tracks)} tracks") + first = tracks[0] + last = tracks[-1] + print(f" {first.track_number:2d}. {first.title} — {first.artist}") + if last != first: + print(f" {last.track_number:2d}. {last.title} — {last.artist}") else: print(f" ✓ CD {disc_num} ripped successfully") except RuntimeError as e: print(f" ✗ Ripping error: {e}") - retry = input(" Try again? (y/n): ").strip().lower() - if retry != "y": + raw_retry = input(" Try again? (y/n): ") + if _clean_input(raw_retry).lower() != "y": print(" Aborting disc.") break continue - # Continue? - next_disc = input( - " Next CD for this album? (y/n): " - ).strip().lower() - - if next_disc != "y": + raw_next = input(" Next CD for this album? (y/n): ") + if _clean_input(raw_next).lower() != "y": break disc_counter += 1 - # Next album? - next_album = input("\nNext album? (y/n): ").strip().lower() - if next_album != "y": + raw_album = input("\nNext album? (y/n): ") + if _clean_input(raw_album).lower() != "y": break album_counter += 1 print("\n" + "=" * 60) print("Ripping completed!") - print(f"\nFiles are in: {config.output_dir.absolute()}") + print(f"Files are in: {config.output_dir.absolute()}") print("\nNext steps:") print(" 1. Check filenames and tags") if config.use_cddb: - print(" 2. Adjust tags and covers with 'musiksammlung apply'") + print(" 2. Adjust tags/covers with 'musiksammlung apply'") else: - print(" 2. Scan CD cover images") - print(" 3. 'musiksammlung scan' for album JSON") - print(" 4. 'musiksammlung apply' to organize & tag") + print(" 2. Run 'musiksammlung scan' to extract metadata") + print(" 3. Run 'musiksammlung apply' to organize & tag") print("=" * 60 + "\n")