Add project skeleton: CLI pipeline for CD digitization
Modular Python package with Typer CLI (scan/apply/process commands), Pydantic data models, OCR via Tesseract, LLM-based tracklist parsing, mutagen audio tagging, M3U playlist generation, and cover processing. Includes 8 passing tests and ruff lint config. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
225f6b3dbf
commit
3e073250ca
17 changed files with 1027 additions and 0 deletions
3
src/musiksammlung/__init__.py
Normal file
3
src/musiksammlung/__init__.py
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
"""Musiksammlung – CLI-Tool zum Digitalisieren von CD-Sammlungen für Jellyfin."""
|
||||
|
||||
__version__ = "0.1.0"
|
||||
175
src/musiksammlung/cli.py
Normal file
175
src/musiksammlung/cli.py
Normal file
|
|
@ -0,0 +1,175 @@
|
|||
"""CLI-Interface mit Typer."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
import typer
|
||||
|
||||
from musiksammlung.cover import copy_covers
|
||||
from musiksammlung.llm_parser import parse_tracklist
|
||||
from musiksammlung.models import Album
|
||||
from musiksammlung.ocr import ocr_images
|
||||
from musiksammlung.organizer import apply_mapping, build_mapping
|
||||
from musiksammlung.playlist import generate_playlist
|
||||
from musiksammlung.tagger import tag_album
|
||||
|
||||
app = typer.Typer(
|
||||
name="musiksammlung",
|
||||
help="CLI-Tool zum Digitalisieren von CD-Sammlungen für Jellyfin.",
|
||||
)
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
|
||||
)
|
||||
|
||||
|
||||
# NOTE: Typer renders the function docstring as the command's --help text, so
# the German docstring below is runtime output and is kept verbatim.
#
# scan: verifies that every given image exists, runs OCR over the images, has
# the LLM parse the OCR text into an Album, writes that album as JSON to
# `output`, and prints a short summary (artist, album, year, tracks per disc).
@app.command()
def scan(
    images: list[Path] = typer.Argument(..., help="Bilder der CD-Rückseite/Booklet"),
    output: Path = typer.Option("album.json", "--output", "-o", help="Ausgabe-JSON-Datei"),
    languages: str = typer.Option("deu+eng", "--lang", "-l", help="OCR-Sprachen"),
    backend: str = typer.Option("ollama", "--backend", "-b", help="LLM-Backend"),
    model: str = typer.Option("llama3", "--model", "-m", help="LLM-Modell"),
    base_url: str = typer.Option("http://localhost:11434", "--url", help="LLM-API-URL"),
) -> None:
    """OCR + LLM → Album-JSON erzeugen (zur Prüfung vor dem Anwenden)."""
    # Abort on the first image that does not exist.
    first_missing = next((img for img in images if not img.exists()), None)
    if first_missing is not None:
        typer.echo(f"Fehler: Bild nicht gefunden: {first_missing}", err=True)
        raise typer.Exit(1)

    typer.echo("Starte OCR...")
    ocr_text = ocr_images(images, languages)
    typer.echo(f"OCR-Text ({len(ocr_text)} Zeichen) erkannt.")

    typer.echo("Starte LLM-Parsing...")
    album = parse_tracklist(ocr_text, backend=backend, model=model, base_url=base_url)

    # Persist the parsed album so it can be reviewed before it is applied.
    serialized = album.model_dump_json(indent=2)
    output.write_text(serialized, encoding="utf-8")

    summary = [
        f"Album-JSON gespeichert: {output}",
        f" Artist: {album.artist}",
        f" Album: {album.album}",
        f" Year: {album.year}",
    ]
    summary.extend(
        f" Disc {disc.disc_number}: {len(disc.tracks)} Tracks" for disc in album.discs
    )
    for line in summary:
        typer.echo(line)
|
||||
|
||||
|
||||
# NOTE: Typer renders the function docstring as the command's --help text, so
# the German docstring below is runtime output and is kept verbatim.
#
# apply: loads and validates the album JSON, computes and prints the
# source -> target mapping, and (unless --dry-run) moves the files, tags them,
# copies the covers and writes the playlist.
# Exits with code 1 if the mapping is empty, i.e. there is nothing to move.
@app.command()
def apply(
    input_dir: Path = typer.Argument(..., help="Verzeichnis mit gerippten Audiodateien"),
    album_json: Path = typer.Argument(..., help="Album-JSON aus 'scan'"),
    output_dir: Path = typer.Argument(..., help="Jellyfin-Musikverzeichnis"),
    front: Path | None = typer.Option(None, "--front", help="Front-Cover-Bild"),
    back: Path | None = typer.Option(None, "--back", help="Rückseiten-Cover-Bild"),
    dry_run: bool = typer.Option(False, "--dry-run", help="Nur anzeigen, nichts ändern"),
) -> None:
    """Album-JSON + Audiodateien → Jellyfin-Struktur aufbauen."""
    # Load and validate the JSON.
    raw = json.loads(album_json.read_text(encoding="utf-8"))
    album = Album.model_validate(raw)

    # Compute the mapping and show it.
    mapping = build_mapping(album, input_dir, output_dir)
    typer.echo(f"Mapping: {len(mapping)} Dateien")
    for src, dst in mapping.items():
        typer.echo(f" {src.name} → {dst.relative_to(output_dir)}")

    if dry_run:
        typer.echo("[DRY-RUN] Keine Änderungen vorgenommen.")
        return

    # Without this guard the album directory lookup below would die with a
    # bare StopIteration traceback when no audio file was mapped.
    if not mapping:
        typer.echo("Fehler: Keine Audiodateien zum Verarbeiten gefunden.", err=True)
        raise typer.Exit(1)

    # Move the files.
    apply_mapping(mapping)

    # Derive the album directory from the first target path; multi-disc
    # targets sit one level deeper (Album/CD1/…), so step up twice there.
    first_target = next(iter(mapping.values()))
    if len(album.discs) > 1:
        album_dir = first_target.parent.parent
    else:
        album_dir = first_target.parent

    # Write tags.
    typer.echo("Setze Audio-Tags...")
    tag_album(album, album_dir)

    # Copy covers.
    copy_covers(front, back, album_dir)

    # Create the playlist.
    generate_playlist(album, album_dir)

    typer.echo(f"Fertig! Album liegt in: {album_dir}")
|
||||
|
||||
|
||||
# NOTE: Typer renders the function docstring as the command's --help text, so
# the German docstring below is runtime output and is kept verbatim.
#
# process: full pipeline in one command. Collects the OCR images (--back if it
# exists, plus every --image), runs OCR and LLM parsing, stores the parsed
# album as <input_dir>/album.json, moves the audio files, then tags them,
# copies the covers and writes the playlist.
# Exits with code 1 if no OCR image is available or if no audio file was mapped.
@app.command()
def process(
    input_dir: Path = typer.Argument(..., help="Verzeichnis mit Audiodateien und Bildern"),
    output_dir: Path = typer.Argument(..., help="Jellyfin-Musikverzeichnis"),
    front: Path | None = typer.Option(None, "--front", help="Front-Cover-Bild"),
    back: Path | None = typer.Option(None, "--back", help="Rückseiten-Bild (für OCR + Cover)"),
    images: list[Path] | None = typer.Option(
        None, "--image", "-i", help="Zusätzliche Bilder für OCR"
    ),
    languages: str = typer.Option("deu+eng", "--lang", "-l"),
    backend: str = typer.Option("ollama", "--backend", "-b"),
    model: str = typer.Option("llama3", "--model", "-m"),
    base_url: str = typer.Option("http://localhost:11434", "--url"),
    dry_run: bool = typer.Option(False, "--dry-run"),
) -> None:
    """Komplett-Pipeline: OCR → LLM → Organize → Tag → Playlist."""
    # Assemble the images that are fed to OCR.
    ocr_sources: list[Path] = []
    if back and back.exists():
        ocr_sources.append(back)
    if images:
        ocr_sources.extend(images)

    if not ocr_sources:
        typer.echo("Fehler: Mindestens ein Bild für OCR nötig (--back oder --image)", err=True)
        raise typer.Exit(1)

    # 1. OCR
    typer.echo("Schritt 1/5: OCR...")
    ocr_text = ocr_images(ocr_sources, languages)

    # 2. LLM parsing
    typer.echo("Schritt 2/5: LLM-Parsing...")
    album = parse_tracklist(ocr_text, backend=backend, model=model, base_url=base_url)
    typer.echo(f" → {album.artist} – {album.album} ({album.year})")

    # Keep the parsed result next to the input files for later inspection.
    json_path = input_dir / "album.json"
    json_path.write_text(album.model_dump_json(indent=2), encoding="utf-8")

    # 3. Organize files
    typer.echo("Schritt 3/5: Dateien organisieren...")
    mapping = build_mapping(album, input_dir, output_dir)
    apply_mapping(mapping, dry_run=dry_run)

    if dry_run:
        typer.echo("[DRY-RUN] Abbruch nach Mapping-Anzeige.")
        return

    # Without this guard the album directory lookup below would die with a
    # bare StopIteration traceback when no audio file was mapped.
    if not mapping:
        typer.echo("Fehler: Keine Audiodateien zum Verarbeiten gefunden.", err=True)
        raise typer.Exit(1)

    # Derive the album directory from the first target path; multi-disc
    # targets sit one level deeper (Album/CD1/…).
    first_target = next(iter(mapping.values()))
    album_dir = first_target.parent.parent if len(album.discs) > 1 else first_target.parent

    # 4. Tags + covers
    typer.echo("Schritt 4/5: Tags & Cover...")
    tag_album(album, album_dir)
    copy_covers(front, back, album_dir)

    # 5. Playlist
    typer.echo("Schritt 5/5: Playlist...")
    generate_playlist(album, album_dir)

    typer.echo(f"Fertig! Album: {album_dir}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app()
|
||||
28
src/musiksammlung/config.py
Normal file
28
src/musiksammlung/config.py
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
"""Konfiguration und Defaults."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
# Unterstützte Audio-Formate
|
||||
AUDIO_EXTENSIONS = {".flac", ".mp3", ".ogg", ".opus", ".wav", ".m4a"}
|
||||
|
||||
# Standard-Bilddateien, die als Cover/Rückseite erkannt werden
|
||||
DEFAULT_FRONT_PATTERNS = ["cover_front.*", "front.*", "cover.*"]
|
||||
DEFAULT_BACK_PATTERNS = ["cover_back.*", "back.*", "inlay.*", "booklet.*"]
|
||||
|
||||
|
||||
class AppConfig(BaseModel):
    """Global configuration for a single run."""

    # Required paths (no defaults).
    input_dir: Path
    output_dir: Path
    # Defaults for ripping, OCR and the LLM backend.
    # NOTE(review): nothing visible in this view reads these fields; the CLI
    # options carry matching defaults — confirm the two are meant to stay in sync.
    audio_format: str = "flac"
    cd_device: str = "/dev/cdrom"
    ocr_languages: str = "deu+eng"
    llm_backend: str = "ollama"  # "ollama", "openai", "anthropic"
    llm_model: str = "llama3"
    llm_base_url: str = "http://localhost:11434"
    dry_run: bool = False
|
||||
51
src/musiksammlung/cover.py
Normal file
51
src/musiksammlung/cover.py
Normal file
|
|
@ -0,0 +1,51 @@
|
|||
"""Cover-Bilder verarbeiten und ins Album-Verzeichnis kopieren."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from PIL import Image
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Jellyfin erkennt diese Dateinamen automatisch
|
||||
FRONT_COVER_NAME = "cover.jpg"
|
||||
BACK_COVER_NAME = "back.jpg"
|
||||
|
||||
|
||||
def prepare_cover(source: Path, target: Path, max_size: int = 1200) -> None:
    """Copy a cover image to *target* as an optimized JPEG.

    The image is scaled down (keeping its aspect ratio) so that its longest
    side is at most ``max_size`` pixels, converted to a mode the JPEG writer
    accepts, and saved with quality 90. Missing parent directories of
    *target* are created.

    Args:
        source: Path of the image to read.
        target: Path of the JPEG file to write.
        max_size: Maximum length of the longest side in pixels.
    """
    # Modes Pillow's JPEG encoder can write directly. Everything else
    # (RGBA, P, LA, PA, I;16, ...) must be converted first or save() raises.
    jpeg_modes = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")

    # The context manager closes the underlying file handle even on errors.
    with Image.open(source) as img:
        if max(img.size) > max_size:
            img.thumbnail((max_size, max_size), Image.LANCZOS)

        if img.mode not in jpeg_modes:
            img = img.convert("RGB")

        target.parent.mkdir(parents=True, exist_ok=True)
        img.save(target, "JPEG", quality=90)

    logger.info("Cover gespeichert: %s → %s", source.name, target)
|
||||
|
||||
|
||||
def copy_covers(
    front_image: Path | None,
    back_image: Path | None,
    album_dir: Path,
) -> None:
    """Place the front and back cover in the album directory.

    Each image that is given and exists is written via ``prepare_cover`` under
    the fixed names ``FRONT_COVER_NAME`` / ``BACK_COVER_NAME``. A missing front
    cover is logged as a warning, a missing back cover only at debug level.
    """
    has_front = front_image is not None and front_image.exists()
    if not has_front:
        logger.warning("Kein Front-Cover gefunden")
    else:
        prepare_cover(front_image, album_dir / FRONT_COVER_NAME)

    has_back = back_image is not None and back_image.exists()
    if not has_back:
        logger.debug("Kein Back-Cover angegeben")
    else:
        prepare_cover(back_image, album_dir / BACK_COVER_NAME)
|
||||
126
src/musiksammlung/llm_parser.py
Normal file
126
src/musiksammlung/llm_parser.py
Normal file
|
|
@ -0,0 +1,126 @@
|
|||
"""LLM-basiertes Parsing von OCR-Text zu strukturierten Album-Daten."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
import httpx
|
||||
from pydantic import ValidationError
|
||||
|
||||
from musiksammlung.models import Album
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
SYSTEM_PROMPT = """\
|
||||
Du bist ein Parser für CD-Rückseiten und Tracklisten.
|
||||
Analysiere den OCR-Text und extrahiere: Artist, Albumtitel, Jahr (falls vorhanden) \
|
||||
und für jede CD die Tracks in korrekter Reihenfolge.
|
||||
Ignoriere Werbung, Copyright-Hinweise und Kleingedrucktes.
|
||||
|
||||
Regeln:
|
||||
- Wenn es Hinweise wie "CD 1", "CD 2", "Disc 1", "Disc 2" gibt, ordne die Tracks \
|
||||
der entsprechenden disc_number zu.
|
||||
- Ohne Disc-Angabe: alles als disc_number=1 behandeln.
|
||||
- Zusätze wie "live", "bonus track", "remastered" gehören in den Tracktitel.
|
||||
- Bei Unsicherheit: Feld weglassen oder null setzen, nichts erfinden.
|
||||
|
||||
Gib ausschließlich valides JSON zurück, kein anderer Text. Format:
|
||||
{
|
||||
"artist": "...",
|
||||
"album": "...",
|
||||
"year": 1987,
|
||||
"discs": [
|
||||
{
|
||||
"disc_number": 1,
|
||||
"name": null,
|
||||
"tracks": [
|
||||
{"track_number": 1, "title": "..."},
|
||||
{"track_number": 2, "title": "..."}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
def _call_ollama(ocr_text: str, model: str, base_url: str) -> str:
    """Call the Ollama generate API and return the model's answer as a string.

    Args:
        ocr_text: OCR text sent as the prompt.
        model: Name of the model to use.
        base_url: Base URL of the Ollama server; a trailing slash is tolerated.

    Returns:
        The ``response`` field of the JSON reply.

    Raises:
        httpx.HTTPStatusError: If the server answers with an error status.
    """
    # Strip a trailing slash so "http://host:11434/" does not produce
    # "//api/generate".
    endpoint = f"{base_url.rstrip('/')}/api/generate"
    payload = {
        "model": model,
        "system": SYSTEM_PROMPT,
        "prompt": ocr_text,
        "stream": False,
        "format": "json",
    }
    response = httpx.post(endpoint, json=payload, timeout=120.0)
    response.raise_for_status()
    return response.json()["response"]
|
||||
|
||||
|
||||
def _call_openai_compatible(
    ocr_text: str, model: str, base_url: str, api_key: str | None = None
) -> str:
    """Call an OpenAI-compatible chat-completions API and return the answer text.

    Args:
        ocr_text: OCR text sent as the user message.
        model: Name of the model to use.
        base_url: Base URL of the API (without ``/v1``); a trailing slash is
            tolerated.
        api_key: Optional key, sent as a ``Bearer`` token when given.

    Returns:
        The message content of the first choice in the reply.

    Raises:
        httpx.HTTPStatusError: If the server answers with an error status.
    """
    headers = {}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"

    # Strip a trailing slash so "https://host/" does not produce
    # "//v1/chat/completions".
    endpoint = f"{base_url.rstrip('/')}/v1/chat/completions"
    payload = {
        "model": model,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": ocr_text},
        ],
        "response_format": {"type": "json_object"},
    }
    response = httpx.post(endpoint, headers=headers, json=payload, timeout=120.0)
    response.raise_for_status()
    return response.json()["choices"][0]["message"]["content"]
|
||||
|
||||
|
||||
def parse_tracklist(
    ocr_text: str,
    backend: str = "ollama",
    model: str = "llama3",
    base_url: str = "http://localhost:11434",
    api_key: str | None = None,
    max_retries: int = 2,
) -> Album:
    """Parse OCR text into an ``Album`` model by asking an LLM.

    Args:
        ocr_text: Raw text from OCR.
        backend: ``'ollama'`` selects the Ollama API; any other value uses the
            OpenAI-compatible API.
        model: Model name.
        base_url: API base URL.
        api_key: API key (only used by the OpenAI-compatible backend).
        max_retries: Number of additional attempts after an invalid answer.
            Negative values are treated as 0, so at least one attempt is made.

    Returns:
        The validated album.

    Raises:
        ValueError: If no attempt yielded JSON that validates as an ``Album``.
    """
    # Clamp so the loop always runs once; previously a negative value skipped
    # the loop and the function silently returned None.
    attempts = max(max_retries, 0) + 1
    last_error: Exception | None = None

    for attempt in range(1, attempts + 1):
        try:
            if backend == "ollama":
                raw = _call_ollama(ocr_text, model, base_url)
            else:
                raw = _call_openai_compatible(ocr_text, model, base_url, api_key)

            album = Album.model_validate(json.loads(raw))
        except (json.JSONDecodeError, ValidationError) as e:
            # Only malformed or invalid answers are retried; transport errors
            # raised by the API call propagate to the caller unchanged.
            logger.warning("Versuch %d/%d fehlgeschlagen: %s", attempt, attempts, e)
            last_error = e
            continue

        logger.info("LLM-Parsing erfolgreich: %s - %s", album.artist, album.album)
        return album

    msg = f"LLM lieferte nach {attempts} Versuchen kein valides JSON"
    raise ValueError(msg) from last_error
|
||||
38
src/musiksammlung/models.py
Normal file
38
src/musiksammlung/models.py
Normal file
|
|
@ -0,0 +1,38 @@
|
|||
"""Zentrale Datenmodelle für Album, Disc und Track."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
|
||||
from pydantic import BaseModel, field_validator
|
||||
|
||||
|
||||
# A single track: its number and its title.
class Track(BaseModel):
    track_number: int
    title: str
|
||||
|
||||
|
||||
# One disc of an album: its number, an optional name and its tracks.
class Disc(BaseModel):
    disc_number: int
    name: str | None = None  # e.g. "Live in Berlin"
    tracks: list[Track]
|
||||
|
||||
|
||||
# An album: artist, title, optional year and the list of discs.
# The artist and album names are sanitized on validation, see sanitize_name.
class Album(BaseModel):
    artist: str
    album: str
    year: int | None = None
    discs: list[Disc]

    @field_validator("album", "artist")
    @classmethod
    def sanitize_name(cls, v: str) -> str:
        """Replace characters that are problematic in file names with ``_``.

        Surrounding whitespace is stripped afterwards.
        """
        replaced = re.sub(r'[<>:"/\\|?*]', "_", v)
        return replaced.strip()

    @property
    def folder_name(self) -> str:
        """Folder name for the album: ``'Album (Year)'``, or just ``'Album'``
        when no year is set."""
        return f"{self.album} ({self.year})" if self.year else self.album
|
||||
70
src/musiksammlung/ocr.py
Normal file
70
src/musiksammlung/ocr.py
Normal file
|
|
@ -0,0 +1,70 @@
|
|||
"""OCR via Tesseract mit optionaler Bildvorverarbeitung."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import subprocess
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
from PIL import Image, ImageFilter, ImageOps
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def preprocess_image(image_path: Path) -> Path:
    """Improve contrast and sharpness of an image for better OCR results.

    The image is converted to grayscale, auto-contrasted (cutoff 2) and
    sharpened, then written as a PNG tagged with 300 dpi.

    Args:
        image_path: Path of the image to preprocess.

    Returns:
        Path of the preprocessed image. It is a temporary file that the
        caller is responsible for deleting.
    """
    # Close the source file as soon as the pixel data has been processed.
    with Image.open(image_path) as source:
        img = ImageOps.grayscale(source)
        img = ImageOps.autocontrast(img, cutoff=2)
        img = img.filter(ImageFilter.SHARPEN)

    # Reserve a unique file name, then close the handle before Pillow writes
    # to it: this avoids leaking the descriptor and writing to a file that is
    # still open elsewhere.
    with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
        tmp_path = Path(tmp.name)

    try:
        img.save(tmp_path, dpi=(300, 300))
    except Exception:
        # Do not leave an orphaned temp file behind when saving fails.
        tmp_path.unlink(missing_ok=True)
        raise

    logger.debug("Vorverarbeitetes Bild: %s → %s", image_path, tmp_path)
    return tmp_path
|
||||
|
||||
|
||||
def run_ocr(image_path: Path, languages: str = "deu+eng") -> str:
    """Run Tesseract OCR on one image.

    Args:
        image_path: Path of the image.
        languages: Tesseract language codes, e.g. ``'deu+eng'``.

    Returns:
        The recognized text, stripped of surrounding whitespace.

    Raises:
        RuntimeError: If Tesseract exits with a non-zero status.
    """
    cmd = [
        "tesseract",
        str(image_path),
        "stdout",
        "-l", languages,
        "--psm", "6",  # treat the page as one uniform block of text
    ]

    logger.info("OCR: %s", " ".join(cmd))
    # Tesseract writes UTF-8; decode explicitly instead of relying on the
    # locale's preferred encoding, which would garble umlauts on non-UTF-8
    # systems. Undecodable bytes are replaced rather than raising.
    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )

    if result.returncode != 0:
        raise RuntimeError(f"Tesseract fehlgeschlagen: {result.stderr}")

    return result.stdout.strip()
|
||||
|
||||
|
||||
def ocr_images(image_paths: list[Path], languages: str = "deu+eng") -> str:
    """Run OCR on several images and join the recognized texts.

    Every image is preprocessed into a temporary file, which is deleted again
    whether or not OCR succeeds. Empty results are skipped; the remaining
    texts are separated by a blank line.
    """
    recognized: list[str] = []
    for source in image_paths:
        temp_image = preprocess_image(source)
        try:
            page_text = run_ocr(temp_image, languages)
        finally:
            temp_image.unlink(missing_ok=True)
        if page_text:
            recognized.append(page_text)
    return "\n\n".join(recognized)
|
||||
92
src/musiksammlung/organizer.py
Normal file
92
src/musiksammlung/organizer.py
Normal file
|
|
@ -0,0 +1,92 @@
|
|||
"""Verzeichnisstruktur anlegen und Audiodateien umbenennen."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import re
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
from musiksammlung.config import AUDIO_EXTENSIONS
|
||||
from musiksammlung.models import Album
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _sanitize_filename(name: str) -> str:
|
||||
"""Entfernt problematische Zeichen aus Dateinamen."""
|
||||
return re.sub(r'[<>:"/\\|?*]', "_", name).strip()
|
||||
|
||||
|
||||
def discover_audio_files(directory: Path) -> list[Path]:
    """Find the audio files in *directory*, sorted numerically by track number.

    Only regular files whose suffix (case-insensitive) is in
    ``AUDIO_EXTENSIONS`` are returned. The sort key is the first number found
    in the file stem (``Track_01`` → 1, no number → 0); files with the same
    number are ordered by name so the result does not depend on the order in
    which the filesystem lists them.
    """
    def sort_key(p: Path) -> tuple[int, str]:
        match = re.search(r"(\d+)", p.stem)
        number = int(match.group(1)) if match else 0
        return number, p.name

    candidates = (
        entry
        for entry in directory.iterdir()
        # is_file() keeps out directories that merely carry an audio suffix.
        if entry.suffix.lower() in AUDIO_EXTENSIONS and entry.is_file()
    )
    return sorted(candidates, key=sort_key)
|
||||
|
||||
|
||||
def build_mapping(
    album: Album,
    input_dir: Path,
    output_root: Path,
) -> dict[Path, Path]:
    """Compute the source → target mapping for all audio files.

    Targets live under ``output_root/<artist>/<album folder>``. For albums
    with more than one disc, sources are read from ``input_dir/CD<n>`` and
    written to a matching ``CD<n>`` subfolder; otherwise ``input_dir`` and the
    album folder are used directly. Files and tracks are paired in order; a
    count mismatch is logged as a warning and the surplus is left unmapped.

    Args:
        album: Validated album model.
        input_dir: Directory containing the ripped files.
        output_root: Root of the music library.

    Returns:
        Dict of source path → target path.
    """
    album_root = (
        output_root
        / _sanitize_filename(album.artist)
        / _sanitize_filename(album.folder_name)
    )
    has_several_discs = len(album.discs) > 1
    result: dict[Path, Path] = {}

    for disc in album.discs:
        disc_folder = f"CD{disc.disc_number}"
        source_dir = input_dir / disc_folder if has_several_discs else input_dir
        target_dir = album_root / disc_folder if has_several_discs else album_root

        found = discover_audio_files(source_dir)
        if len(found) != len(disc.tracks):
            logger.warning(
                "Disc %d: %d Dateien gefunden, aber %d Tracks im JSON",
                disc.disc_number,
                len(found),
                len(disc.tracks),
            )

        for source_file, track in zip(found, disc.tracks):
            title = _sanitize_filename(track.title)
            result[source_file] = (
                target_dir / f"{track.track_number:02d} {title}{source_file.suffix}"
            )

    return result
|
||||
|
||||
|
||||
def apply_mapping(mapping: dict[Path, Path], dry_run: bool = False) -> None:
    """Move/rename files according to the mapping.

    Args:
        mapping: Source path → target path.
        dry_run: If True, only log the planned moves without touching files.
    """
    for source, target in mapping.items():
        if not dry_run:
            # Create the target folder on demand, then move the file.
            target.parent.mkdir(parents=True, exist_ok=True)
            shutil.move(str(source), str(target))
            logger.info("Verschoben: %s → %s", source, target)
        else:
            logger.info("[DRY-RUN] %s → %s", source, target)
|
||||
57
src/musiksammlung/playlist.py
Normal file
57
src/musiksammlung/playlist.py
Normal file
|
|
@ -0,0 +1,57 @@
|
|||
"""M3U-Playlist-Generierung für Jellyfin."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from musiksammlung.models import Album
|
||||
from musiksammlung.organizer import _sanitize_filename
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def generate_playlist(album: Album, album_dir: Path) -> Path:
    """Create an M3U playlist for the whole album.

    The playlist is written to the album root and references all tracks via
    relative paths (``CD1/01 Title.flac``, ``CD2/01 Title.flac``, ...). For
    each track the matching file is looked up on disk; if none is found, a
    ``.flac`` name is assumed and a warning is logged.

    Args:
        album: The album whose tracks are listed.
        album_dir: Album directory containing the renamed audio files.

    Returns:
        Path of the playlist file.
    """
    import glob  # local import: only needed for glob.escape below

    playlist_path = album_dir / (_sanitize_filename(album.album) + ".m3u")
    multi_disc = len(album.discs) > 1

    lines = ["#EXTM3U"]

    for disc in album.discs:
        if multi_disc:
            disc_prefix = f"CD{disc.disc_number}/"
            search_dir = album_dir / f"CD{disc.disc_number}"
        else:
            disc_prefix = ""
            search_dir = album_dir

        for track in disc.tracks:
            safe_title = _sanitize_filename(track.title)
            base_name = f"{track.track_number:02d} {safe_title}"

            # Escape the base name: titles such as "Song [Live]" contain glob
            # metacharacters and would otherwise never match their own file.
            # Sorting makes the choice deterministic if several extensions exist.
            matches = sorted(search_dir.glob(f"{glob.escape(base_name)}.*"))
            if matches:
                filename = matches[0].name
            else:
                # Fallback: generic name with .flac
                filename = f"{base_name}.flac"
                logger.warning("Datei nicht gefunden, Fallback: %s", filename)

            lines.append(f"#EXTINF:0,{track.title}")
            lines.append(f"{disc_prefix}{filename}")

    playlist_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
    logger.info("Playlist erstellt: %s", playlist_path)
    return playlist_path
|
||||
57
src/musiksammlung/ripper.py
Normal file
57
src/musiksammlung/ripper.py
Normal file
|
|
@ -0,0 +1,57 @@
|
|||
"""CD-Ripping via abcde."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def rip_disc(
    device: str,
    output_dir: Path,
    audio_format: str = "flac",
    eject: bool = True,
) -> Path:
    """Rip a CD with abcde into *output_dir*.

    Args:
        device: CD drive, e.g. ``'/dev/cdrom'``.
        output_dir: Target directory for the ripped files; created if missing
            and used as abcde's working directory.
        audio_format: Output format (flac, mp3, ogg, opus).
        eject: Eject the CD after ripping.

    Returns:
        Path of the directory containing the ripped files.

    Raises:
        RuntimeError: If abcde exits with a non-zero status.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    # NOTE(review): the previous command line also passed "-D" with the
    # comment "no debug". According to the abcde manual, -D *enables* capture
    # of debugging information, so it was dropped to match the stated intent.
    cmd = [
        "abcde",
        "-n",  # no CDDB lookup
        "-N",  # non-interactive
        "-p",  # zero-pad track numbers
        "-o", audio_format,
        "-d", device,
    ]
    if eject:
        cmd.append("-x")

    logger.info("Starte Ripping: %s", " ".join(cmd))

    result = subprocess.run(
        cmd,
        cwd=str(output_dir),
        capture_output=True,
        text=True,
    )

    if result.returncode != 0:
        logger.error("abcde Fehler: %s", result.stderr)
        raise RuntimeError(f"abcde fehlgeschlagen (exit {result.returncode}): {result.stderr}")

    logger.info("Ripping abgeschlossen: %s", output_dir)
    return output_dir
|
||||
99
src/musiksammlung/tagger.py
Normal file
99
src/musiksammlung/tagger.py
Normal file
|
|
@ -0,0 +1,99 @@
|
|||
"""Audio-Tagging mit mutagen."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from mutagen import File as MutagenFile
|
||||
from mutagen.flac import FLAC, Picture
|
||||
from mutagen.id3 import APIC, ID3
|
||||
|
||||
from musiksammlung.models import Album, Disc, Track
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def tag_file(
    path: Path,
    album: Album,
    disc: Disc,
    track: Track,
) -> None:
    """Write the audio tags of a single file.

    Opens the file with mutagen's easy interface so the same tag keys can be
    used regardless of the audio format. If mutagen cannot open the file, a
    warning is logged and nothing is written.
    """
    audio = MutagenFile(str(path), easy=True)
    if audio is None:
        logger.warning("Kann Datei nicht öffnen: %s", path)
        return

    tags = {
        "artist": album.artist,
        "album": album.album,
        "albumartist": album.artist,
        "title": track.title,
        # "<track number>/<number of tracks on this disc>"
        "tracknumber": f"{track.track_number}/{len(disc.tracks)}",
        "discnumber": str(disc.disc_number),
    }
    if album.year:
        tags["date"] = str(album.year)

    for key, value in tags.items():
        audio[key] = value

    audio.save()
    logger.info("Tags gesetzt: %s", path.name)
|
||||
|
||||
|
||||
def tag_album(album: Album, album_dir: Path) -> None:
    """Write tags to all audio files of an album.

    Files are looked up per track by the name prefix ``"NN "`` (zero-padded
    track number followed by a space), inside ``CD<n>`` subfolders for albums
    with more than one disc and directly in *album_dir* otherwise. Tracks
    without a matching file are logged as a warning.
    """
    has_several_discs = len(album.discs) > 1

    for disc in album.discs:
        disc_dir = album_dir / f"CD{disc.disc_number}" if has_several_discs else album_dir

        for track in disc.tracks:
            # File name pattern: "01 Title.ext"
            candidates = list(disc_dir.glob(f"{track.track_number:02d} *"))
            if not candidates:
                logger.warning(
                    "Keine Datei für Track %d: %s", track.track_number, track.title
                )
                continue
            tag_file(candidates[0], album, disc, track)
|
||||
|
||||
|
||||
def embed_cover(audio_path: Path, cover_path: Path) -> None:
    """Embed a cover image into an audio file.

    Supported are ``.flac`` (picture block) and ``.mp3`` (ID3 ``APIC`` frame);
    for any other suffix a debug message is logged and nothing is written.
    The MIME type is ``image/jpeg`` for ``.jpg``/``.jpeg`` covers and
    ``image/png`` for everything else.

    Args:
        audio_path: Audio file to modify.
        cover_path: Image file whose bytes are embedded.
    """
    # Local import: the ID3 exception type is only needed in this function.
    from mutagen.id3 import ID3NoHeaderError

    cover_data = cover_path.read_bytes()
    mime = "image/jpeg" if cover_path.suffix.lower() in (".jpg", ".jpeg") else "image/png"

    suffix = audio_path.suffix.lower()

    if suffix == ".flac":
        audio = FLAC(str(audio_path))
        pic = Picture()
        pic.type = 3  # front cover
        pic.mime = mime
        pic.data = cover_data
        audio.add_picture(pic)
        audio.save()

    elif suffix == ".mp3":
        try:
            audio = ID3(str(audio_path))
        except ID3NoHeaderError:
            # The file has no ID3 tag yet: start with an empty one instead of
            # failing; save() below writes it to the file.
            audio = ID3()
        audio.add(APIC(
            encoding=3,  # UTF-8
            mime=mime,
            type=3,  # front cover
            desc="Front cover",
            data=cover_data,
        ))
        audio.save(str(audio_path))

    else:
        logger.debug("Cover-Embedding für %s nicht unterstützt", suffix)
        return

    logger.info("Cover eingebettet: %s", audio_path.name)
|
||||
Loading…
Add table
Add a link
Reference in a new issue