# Matches one CDDB track line such as "1: Artist - Title".  The separator is a
# hyphen surrounded by whitespace so hyphenated names ("Jean-Pierre") stay
# intact.  Shared by the live stream filter and the parser so both always
# agree on what counts as a track line.
_CDDB_TRACK_RE = re.compile(r"^\s*(\d+):\s*(.+?)\s+-\s+(.+)$")

# Substrings that make an otherwise unclassified abcde line worth showing.
_STATUS_KEYWORDS = (
    "Retrieved", "Selected", "Finished", "MusicBrainz",
    "Error", "ERROR", "Cannot", "failed", "No tracks",
)


def _parse_cddb_lines(lines: list[str]) -> list[TrackInfo]:
    """Parse CDDB track list from abcde output lines.

    Matches lines like:
        "1: Wolfgang Anheisser - Wer recht in Freuden wandern will"

    Lines that do not have the "N: Artist - Title" shape are ignored.

    Args:
        lines: Lines collected from abcde stdout+stderr

    Returns:
        List of TrackInfo (may be empty if CDDB lookup failed)
    """
    return [
        TrackInfo(
            track_number=int(m.group(1)),
            artist=m.group(2).strip(),
            title=m.group(3).strip(),
        )
        for line in lines
        if (m := _CDDB_TRACK_RE.match(line))
    ]


def _stream_abcde(
    process: subprocess.Popen,
    use_cddb: bool,
) -> tuple[list[TrackInfo] | None, int]:
    """Stream abcde output live, show meaningful progress, collect CDDB data.

    Filters abcde/cdparanoia output into three layers:
    - Track progress: 'Grabbing track N: Title'
    - Sector progress bar from cdparanoia
    - CDDB/MusicBrainz info lines

    The output stream is read until EOF, closed, and the process is then
    waited for.

    Args:
        process: Running abcde subprocess whose stdout is a text-mode pipe
        use_cddb: Whether to expect and parse CDDB output

    Returns:
        Tuple (tracks, exit code of the process).  ``tracks`` is None when
        use_cddb is false, otherwise the parsed list of TrackInfo (may be
        empty if no CDDB track lines were seen).

    Raises:
        ValueError: If the process was started without a captured stdout
    """
    stream = process.stdout
    if stream is None:
        raise ValueError("abcde process must be started with stdout=subprocess.PIPE")

    grab_re = re.compile(r"Grabbing.*track\s+(\d+)(?:\s+of\s+(\d+))?[:\s]*(.*)", re.I)
    tag_re = re.compile(r"Tagging track\s+(\d+)\s+of\s+(\d+)", re.I)
    sector_re = re.compile(r"\(== PROGRESS ==.*\|\s*(\d+)\s+(\d+)\s*\]")
    header_re = re.compile(r"-{2,}.+-{2,}")  # ---- Artist / Album ----
    total_re = re.compile(r"tracks?:\s+([\d\s]+)", re.I)

    cddb_lines: list[str] = []
    total_tracks = 0
    current_track = 0
    track_end_sector = 0
    bar_w = 30

    # Close the pipe once it is drained so the descriptor is not left to the GC.
    with stream:
        for raw in stream:
            line = raw.rstrip("\n\r")

            # ── Track count from "Grabbing entire CD - tracks: 01 02 03 ..."
            # Only the first such line is used; no `continue`, the line is
            # still offered to the filters below.
            if total_tracks == 0:
                m = total_re.search(line)
                if m:
                    nums = m.group(1).split()
                    if nums:
                        total_tracks = len(nums)

            # ── Grab / encode progress
            m = grab_re.search(line)
            if m:
                current_track = int(m.group(1))
                if m.group(2):
                    total_tracks = int(m.group(2))
                title = m.group(3).strip().rstrip(".")
                counter = f"{current_track}/{total_tracks}" if total_tracks else str(current_track)
                print(f"\n Track {counter} {title}", flush=True)
                track_end_sector = 0  # reset sector bar for new track
                continue

            # ── Tagging progress
            m = tag_re.search(line)
            if m:
                print(f"\r Tagging {m.group(1)}/{m.group(2)} ", flush=True)
                continue

            # ── cdparanoia sector progress bar
            m = sector_re.search(line)
            if m:
                cur = int(m.group(1))
                # NOTE(review): this treats the second number as the track's
                # end sector; in cdparanoia's progress line it may actually be
                # the overlap value — confirm against real output, otherwise
                # the bar jumps to 100 % on the first update.
                end = int(m.group(2)) or cur
                if track_end_sector == 0:
                    track_end_sector = end
                pct = min(cur / track_end_sector, 1.0) if track_end_sector > 0 else 0
                filled = int(pct * bar_w)
                bar = "█" * filled + "░" * (bar_w - filled)
                mb = cur * 2352 / 1_048_576  # rough size in MB (2352 bytes per sector)
                print(f"\r [{bar}] {pct:5.1%} {mb:5.1f} MB", end="", flush=True)
                continue

            # ── CDDB / MusicBrainz album header
            if header_re.search(line):
                print(f"\n {line.strip()}", flush=True)
                continue

            # ── CDDB track lines "1: Artist - Title" (collected, not echoed)
            if _CDDB_TRACK_RE.match(line):
                cddb_lines.append(line)
                continue

            # ── Other important info (errors, status)
            stripped = line.strip()
            if stripped and any(kw in line for kw in _STATUS_KEYWORDS):
                print(f"\n {stripped}", flush=True)

    returncode = process.wait()

    # Newline after last progress bar
    print(flush=True)

    tracks = _parse_cddb_lines(cddb_lines) if use_cddb else None
    return tracks, returncode
abcde stores encoded files inside its temp dir as: output_dir/abcde.XXXX/track01.flac - output_dir/abcde.XXXX/track02.flac ... - This function moves them to: + Moves them to: output_dir/track01.flac - output_dir/track02.flac ... Args: output_dir: Directory to search and target for flat layout @@ -121,7 +215,6 @@ def _extract_tracks(output_dir: Path, audio_format: AudioFormat) -> list[Path]: Sorted list of moved files in output_dir """ ext = audio_format.extension.lstrip(".") - # abcde names files trackNN.ext (with -p: track01, track02, ...) pattern = re.compile(rf"^track(\d+)\.{ext}$", re.IGNORECASE) moved = [] @@ -129,6 +222,7 @@ def _extract_tracks(output_dir: Path, audio_format: AudioFormat) -> list[Path]: if file.is_file() and pattern.match(file.name): dest = output_dir / file.name if file != dest: + logger.info("Extracting: %s", file.name) file.rename(dest) moved.append(dest) @@ -142,8 +236,10 @@ def _rename_files( ) -> None: """Rename track files according to naming scheme. - Expected input: track01.flac, track02.flac, ... - Output: 01_-_title_-_artist.flac, 02_-_title_-_artist.flac, ... + Input: track01.flac, track02.flac, ... + Output: 01_-_title_-_artist.flac, ... + + Falls back to plain 01.flac etc. for tracks without CDDB info. Args: output_dir: Directory with files @@ -151,44 +247,35 @@ def _rename_files( audio_format: Audio format """ ext = audio_format.extension.lstrip(".") - # Matches track01.flac, track02.flac, ... 
(abcde naming) abcde_pattern = re.compile(rf"^track(\d+)\.{ext}$", re.IGNORECASE) + by_num = {t.track_number: t for t in tracks} - audio_files = sorted(output_dir.glob(f"track*.{ext}")) - - for track in tracks: - for file in audio_files: - match = abcde_pattern.match(file.name) - if match and int(match.group(1)) == track.track_number: - track_num_padded = f"{track.track_number:02d}" - artist_clean = _sanitize_name(track.artist) - title_clean = _sanitize_name(track.title) - new_name = ( - f"{track_num_padded}_-_{title_clean}_-_" - f"{artist_clean}{audio_format.extension}" - ) - new_path = output_dir / new_name - if file != new_path: - logger.info("Renaming: %s -> %s", file.name, new_name) - file.rename(new_path) - break - - # Rename remaining track files without CDDB info (fallback: 01.flac, ...) for file in sorted(output_dir.glob(f"track*.{ext}")): - match = abcde_pattern.match(file.name) - if match: - num = int(match.group(1)) - new_path = output_dir / f"{num:02d}{audio_format.extension}" - if file != new_path: - logger.info("Renaming (no CDDB): %s -> %s", file.name, new_path.name) - file.rename(new_path) + m = abcde_pattern.match(file.name) + if not m: + continue + num = int(m.group(1)) + track = by_num.get(num) + if track: + new_name = ( + f"{num:02d}_-_{_sanitize_name(track.title)}_-_" + f"{_sanitize_name(track.artist)}{audio_format.extension}" + ) + else: + new_name = f"{num:02d}{audio_format.extension}" + + new_path = output_dir / new_name + if file != new_path: + logger.info("Renaming: %s → %s", file.name, new_name) + print(f" {file.name} → {new_name}", flush=True) + file.rename(new_path) def _rip_with_abcde( device: str, output_dir: Path, audio_format: AudioFormat, - quality: str = "medium", + quality: str = "high", parallel_jobs: int = 1, use_pipes: bool = False, use_cddb: bool = True, @@ -215,7 +302,7 @@ def _rip_with_abcde( # -o format: output format # -d device: CD drive # -x: eject CD after ripping - # -N: non-interactive (no prompts, auto-select first 
CDDB match) + # -N: non-interactive (auto-select first CDDB match, no prompts) cmd = [ "abcde", "-p", @@ -230,61 +317,48 @@ def _rip_with_abcde( else: cmd.extend(["-a", "read,encode"]) - # Parallel encodes if parallel_jobs > 1: cmd.extend(["-j", str(parallel_jobs)]) - # Use pipes if use_pipes: cmd.append("-P") - # Encoder options for quality encoder_opts = audio_format.get_encoder_options(quality) if encoder_opts: - # abcde accepts encoder options with colon: -o format:options cmd[-2] = f"{audio_format.get_abcde_format()}:{encoder_opts}" - logger.info( - "Starting abcde in %s (Format: %s, Quality: %s, CDDB: %s)", - output_dir, audio_format.value, quality, use_cddb, - ) - logger.debug("Command: %s", " ".join(cmd)) + print(f"\n Command: {' '.join(cmd)}", flush=True) + logger.info("Starting abcde: %s", " ".join(cmd)) - # Run abcde non-interactively, capture output for CDDB parsing - result = subprocess.run( + process = subprocess.Popen( cmd, cwd=str(output_dir), - capture_output=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, # merge stderr into stdout text=True, + bufsize=1, # line-buffered ) - # Log output for debugging - if result.stdout: - logger.debug("abcde stdout:\n%s", result.stdout) - if result.stderr: - logger.debug("abcde stderr:\n%s", result.stderr) + tracks, returncode = _stream_abcde(process, use_cddb) - if result.returncode != 0: - raise RuntimeError( - f"abcde failed (exit {result.returncode}).\n" - f"{result.stderr or result.stdout}" - ) + if returncode != 0: + raise RuntimeError(f"abcde failed (exit {returncode}).") - # Parse track info from CDDB output - tracks = None if use_cddb: - combined = result.stdout + result.stderr - tracks = _parse_cddb_response(combined) if tracks: - logger.info("CDDB data found: %d tracks", len(tracks)) + print(f"\n CDDB: {len(tracks)} tracks found", flush=True) + logger.info("CDDB data: %d tracks", len(tracks)) + else: + print("\n CDDB: no track data found", flush=True) + logger.warning("CDDB lookup returned 
no track data") # Extract track files from abcde's temp dir into output_dir (flat) audio_files = _extract_tracks(output_dir, audio_format) if not audio_files: raise RuntimeError( - "No audio files found after ripping.\n" - "abcde output:\n" + (result.stderr or result.stdout) + "No audio files found after ripping. " + "Check that a CD is in the drive." ) logger.info("Ripping completed: %d tracks in %s", len(audio_files), output_dir) @@ -295,7 +369,7 @@ def rip_disc( device: str, output_dir: Path, audio_format: AudioFormat = AudioFormat.FLAC, - quality: str = "medium", + quality: str = "high", parallel_jobs: int = 1, use_pipes: bool = False, use_cddb: bool = True, @@ -321,6 +395,7 @@ def rip_disc( album_name = None if tracks: album_name = tracks[0].artist + print("\n Renaming files ...", flush=True) _rename_files(output_dir, tracks, audio_format) return output_dir, album_name, tracks @@ -372,8 +447,8 @@ def interactive_rip(config: RipperConfig) -> None: / f"CD{disc_num}" ) - print(f" Ripping to: {disc_dir.relative_to(config.output_dir)}") - print(" (Ripping in progress, please wait...)") + print(f"\n Ripping to: {disc_dir}") + print(" " + "-" * 50) try: _, detected_album, tracks = rip_disc( @@ -386,25 +461,23 @@ def interactive_rip(config: RipperConfig) -> None: use_cddb=config.use_cddb, ) + print("\n " + "-" * 50) if tracks: - print(f" ✓ CD {disc_num} ripped successfully — {len(tracks)} tracks") - first = tracks[0] - last = tracks[-1] - print(f" {first.track_number:2d}. {first.title} — {first.artist}") - if last != first: - print(f" {last.track_number:2d}. {last.title} — {last.artist}") + print(f" ✓ Done — {len(tracks)} tracks") + for t in tracks: + print(f" {t.track_number:2d}. {t.title} [{t.artist}]") else: - print(f" ✓ CD {disc_num} ripped successfully") + print(" ✓ Done (no CDDB data)") except RuntimeError as e: - print(f" ✗ Ripping error: {e}") + print(f"\n ✗ Error: {e}") raw_retry = input(" Try again? 
(y/n): ") if _clean_input(raw_retry).lower() != "y": print(" Aborting disc.") break continue - raw_next = input(" Next CD for this album? (y/n): ") + raw_next = input("\n Next CD for this album? (y/n): ") if _clean_input(raw_next).lower() != "y": break