diff --git a/.gitignore b/.gitignore
index b423d6a..adb26b3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,6 +11,7 @@ dist/
*.egg
idea/
+CLAUDE.md
# Virtuelle Umgebungen
.venv/
diff --git a/src/musiksammlung/cli.py b/src/musiksammlung/cli.py
index f7404cb..5485bb3 100644
--- a/src/musiksammlung/cli.py
+++ b/src/musiksammlung/cli.py
@@ -15,6 +15,7 @@ from musiksammlung.ocr import ocr_images
from musiksammlung.organizer import apply_mapping, build_mapping
from musiksammlung.playlist import generate_playlist
from musiksammlung.tagger import tag_album
+from musiksammlung.vision_llm import parse_image
app = typer.Typer(
name="musiksammlung",
@@ -27,53 +28,96 @@ logging.basicConfig(
)
-@app.command()
-def scan(
- images: list[Path] = typer.Argument(..., help="Bilder der CD-Rückseite/Booklet"),
- output: Path = typer.Option("album.json", "--output", "-o", help="Ausgabe-JSON-Datei"),
- languages: str = typer.Option("deu+eng", "--lang", "-l", help="OCR-Sprachen"),
- backend: str = typer.Option("ollama", "--backend", "-b", help="LLM-Backend"),
- model: str = typer.Option("llama3", "--model", "-m", help="LLM-Modell"),
- base_url: str = typer.Option("http://localhost:11434", "--url", help="LLM-API-URL"),
-) -> None:
- """OCR + LLM → Album-JSON erzeugen (zur Prüfung vor dem Anwenden)."""
- # Bilder prüfen
- for img in images:
- if not img.exists():
- typer.echo(f"Fehler: Bild nicht gefunden: {img}", err=True)
- raise typer.Exit(1)
+def _scan_images(
+ images: list[Path],
+ vision: bool,
+ vision_model: str,
+ languages: str,
+ backend: str,
+ model: str,
+ base_url: str,
+) -> Album:
+ """Gemeinsame Scan-Logik für scan und process."""
+ if vision:
+ typer.echo(f"Vision-LLM ({vision_model})...")
+ return parse_image(images, model=vision_model, base_url=base_url)
+ else:
+ typer.echo("OCR...")
+ ocr_text = ocr_images(images, languages)
+ typer.echo(f"OCR-Text ({len(ocr_text)} Zeichen). LLM-Parsing...")
+ return parse_tracklist(
+ ocr_text, backend=backend, model=model, base_url=base_url
+ )
- typer.echo("Starte OCR...")
- ocr_text = ocr_images(images, languages)
- typer.echo(f"OCR-Text ({len(ocr_text)} Zeichen) erkannt.")
- typer.echo("Starte LLM-Parsing...")
- album = parse_tracklist(ocr_text, backend=backend, model=model, base_url=base_url)
-
- output.write_text(album.model_dump_json(indent=2), encoding="utf-8")
- typer.echo(f"Album-JSON gespeichert: {output}")
+def _print_album_summary(album: Album) -> None:
+ """Gibt eine kompakte Album-Zusammenfassung aus."""
typer.echo(f" Artist: {album.artist}")
typer.echo(f" Album: {album.album}")
typer.echo(f" Year: {album.year}")
for disc in album.discs:
typer.echo(f" Disc {disc.disc_number}: {len(disc.tracks)} Tracks")
+ for track in disc.tracks:
+ typer.echo(f" {track.track_number:2d}. {track.title}")
+
+
+@app.command()
+def scan(
+ images: list[Path] = typer.Argument(..., help="Bilder der CD-Rückseite/Booklet"),
+ output: Path = typer.Option(
+ "album.json", "--output", "-o", help="Ausgabe-JSON-Datei"
+ ),
+ vision: bool = typer.Option(
+ False, "--vision", "-v", help="Vision-LLM statt OCR+Text-LLM"
+ ),
+ vision_model: str = typer.Option(
+ "qwen3-vl:latest", "--vision-model", help="Vision-LLM-Modell"
+ ),
+ languages: str = typer.Option("deu+eng", "--lang", "-l", help="OCR-Sprachen"),
+ backend: str = typer.Option("ollama", "--backend", "-b", help="LLM-Backend"),
+ model: str = typer.Option("llama3", "--model", "-m", help="Text-LLM-Modell"),
+ base_url: str = typer.Option(
+ "http://localhost:11434", "--url", help="LLM-API-URL"
+ ),
+) -> None:
+ """Bilder → Album-JSON erzeugen (zur Prüfung vor dem Anwenden).
+
+ Mit --vision wird ein Vision-LLM (z.B. qwen3-vl) direkt auf die Bilder
+ angewendet. Ohne --vision wird Tesseract-OCR + Text-LLM verwendet.
+ """
+ for img in images:
+ if not img.exists():
+ typer.echo(f"Fehler: Bild nicht gefunden: {img}", err=True)
+ raise typer.Exit(1)
+
+ album = _scan_images(
+ images, vision, vision_model, languages, backend, model, base_url
+ )
+
+ output.write_text(album.model_dump_json(indent=2), encoding="utf-8")
+ typer.echo(f"Album-JSON gespeichert: {output}")
+ _print_album_summary(album)
@app.command()
def apply(
- input_dir: Path = typer.Argument(..., help="Verzeichnis mit gerippten Audiodateien"),
+ input_dir: Path = typer.Argument(
+ ..., help="Verzeichnis mit gerippten Audiodateien"
+ ),
album_json: Path = typer.Argument(..., help="Album-JSON aus 'scan'"),
output_dir: Path = typer.Argument(..., help="Jellyfin-Musikverzeichnis"),
front: Path | None = typer.Option(None, "--front", help="Front-Cover-Bild"),
- back: Path | None = typer.Option(None, "--back", help="Rückseiten-Cover-Bild"),
- dry_run: bool = typer.Option(False, "--dry-run", help="Nur anzeigen, nichts ändern"),
+ back: Path | None = typer.Option(
+ None, "--back", help="Rückseiten-Cover-Bild"
+ ),
+ dry_run: bool = typer.Option(
+ False, "--dry-run", help="Nur anzeigen, nichts ändern"
+ ),
) -> None:
"""Album-JSON + Audiodateien → Jellyfin-Struktur aufbauen."""
- # JSON laden und validieren
raw = json.loads(album_json.read_text(encoding="utf-8"))
album = Album.model_validate(raw)
- # Mapping berechnen und anzeigen
mapping = build_mapping(album, input_dir, output_dir)
typer.echo(f"Mapping: {len(mapping)} Dateien")
for src, dst in mapping.items():
@@ -83,24 +127,17 @@ def apply(
typer.echo("[DRY-RUN] Keine Änderungen vorgenommen.")
return
- # Dateien verschieben
apply_mapping(mapping)
- # Album-Verzeichnis bestimmen
first_target = next(iter(mapping.values()))
if len(album.discs) > 1:
- album_dir = first_target.parent.parent # CD1/ → Album/
+ album_dir = first_target.parent.parent
else:
album_dir = first_target.parent
- # Tags setzen
typer.echo("Setze Audio-Tags...")
tag_album(album, album_dir)
-
- # Cover kopieren
copy_covers(front, back, album_dir)
-
- # Playlist erzeugen
generate_playlist(album, album_dir)
typer.echo(f"Fertig! Album liegt in: {album_dir}")
@@ -108,12 +145,22 @@ def apply(
@app.command()
def process(
- input_dir: Path = typer.Argument(..., help="Verzeichnis mit Audiodateien und Bildern"),
+ input_dir: Path = typer.Argument(
+ ..., help="Verzeichnis mit Audiodateien und Bildern"
+ ),
output_dir: Path = typer.Argument(..., help="Jellyfin-Musikverzeichnis"),
front: Path | None = typer.Option(None, "--front", help="Front-Cover-Bild"),
- back: Path | None = typer.Option(None, "--back", help="Rückseiten-Bild (für OCR + Cover)"),
+ back: Path | None = typer.Option(
+ None, "--back", help="Rückseiten-Bild (für Scan + Cover)"
+ ),
images: list[Path] | None = typer.Option(
- None, "--image", "-i", help="Zusätzliche Bilder für OCR"
+ None, "--image", "-i", help="Zusätzliche Bilder für Scan"
+ ),
+ vision: bool = typer.Option(
+ False, "--vision", "-v", help="Vision-LLM statt OCR+Text-LLM"
+ ),
+ vision_model: str = typer.Option(
+ "qwen3-vl:latest", "--vision-model", help="Vision-LLM-Modell"
),
languages: str = typer.Option("deu+eng", "--lang", "-l"),
backend: str = typer.Option("ollama", "--backend", "-b"),
@@ -121,33 +168,32 @@ def process(
base_url: str = typer.Option("http://localhost:11434", "--url"),
dry_run: bool = typer.Option(False, "--dry-run"),
) -> None:
- """Komplett-Pipeline: OCR → LLM → Organize → Tag → Playlist."""
- # OCR-Bilder zusammenstellen
- ocr_sources: list[Path] = []
+ """Komplett-Pipeline: Scan → Organize → Tag → Playlist."""
+ scan_sources: list[Path] = []
if back and back.exists():
- ocr_sources.append(back)
+ scan_sources.append(back)
if images:
- ocr_sources.extend(images)
+ scan_sources.extend(images)
- if not ocr_sources:
- typer.echo("Fehler: Mindestens ein Bild für OCR nötig (--back oder --image)", err=True)
+ if not scan_sources:
+ typer.echo(
+ "Fehler: Mindestens ein Bild nötig (--back oder --image)", err=True
+ )
raise typer.Exit(1)
- # 1. OCR
- typer.echo("Schritt 1/5: OCR...")
- ocr_text = ocr_images(ocr_sources, languages)
-
- # 2. LLM-Parsing
- typer.echo("Schritt 2/5: LLM-Parsing...")
- album = parse_tracklist(ocr_text, backend=backend, model=model, base_url=base_url)
- typer.echo(f" → {album.artist} – {album.album} ({album.year})")
+ # 1. Scan (Vision oder OCR+LLM)
+ typer.echo("Schritt 1/4: Bilderkennung...")
+ album = _scan_images(
+ scan_sources, vision, vision_model, languages, backend, model, base_url
+ )
+ _print_album_summary(album)
# JSON zur Kontrolle speichern
json_path = input_dir / "album.json"
json_path.write_text(album.model_dump_json(indent=2), encoding="utf-8")
- # 3. Dateien organisieren
- typer.echo("Schritt 3/5: Dateien organisieren...")
+ # 2. Dateien organisieren
+ typer.echo("Schritt 2/4: Dateien organisieren...")
mapping = build_mapping(album, input_dir, output_dir)
apply_mapping(mapping, dry_run=dry_run)
@@ -155,17 +201,18 @@ def process(
typer.echo("[DRY-RUN] Abbruch nach Mapping-Anzeige.")
return
- # Album-Verzeichnis bestimmen
first_target = next(iter(mapping.values()))
- album_dir = first_target.parent.parent if len(album.discs) > 1 else first_target.parent
+ album_dir = (
+ first_target.parent.parent if len(album.discs) > 1 else first_target.parent
+ )
- # 4. Tags + Cover
- typer.echo("Schritt 4/5: Tags & Cover...")
+ # 3. Tags + Cover
+ typer.echo("Schritt 3/4: Tags & Cover...")
tag_album(album, album_dir)
copy_covers(front, back, album_dir)
- # 5. Playlist
- typer.echo("Schritt 5/5: Playlist...")
+ # 4. Playlist
+ typer.echo("Schritt 4/4: Playlist...")
generate_playlist(album, album_dir)
typer.echo(f"Fertig! Album: {album_dir}")
diff --git a/src/musiksammlung/ocr.py b/src/musiksammlung/ocr.py
index 8cbab2a..f963258 100644
--- a/src/musiksammlung/ocr.py
+++ b/src/musiksammlung/ocr.py
@@ -12,17 +12,86 @@ from PIL import Image, ImageFilter, ImageOps
logger = logging.getLogger(__name__)
+def _detect_and_fix_rotation(img: Image.Image) -> Image.Image:
+ """Erkennt und korrigiert Rotation via Tesseract OSD.
+
+ Tesseract --psm 0 gibt die erkannte Rotation aus. Wenn das fehlschlägt
+ (z.B. bei zu wenig Text), probieren wir alle 90°-Rotationen und nehmen
+ die mit dem meisten erkannten Text.
+ """
+ # Versuch 1: Tesseract OSD (Orientation and Script Detection)
+ with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
+ img.save(tmp.name, dpi=(300, 300))
+ tmp_path = tmp.name
+
+ try:
+ result = subprocess.run(
+ ["tesseract", tmp_path, "stdout", "--psm", "0"],
+ capture_output=True, text=True,
+ )
+ if result.returncode == 0:
+ for line in result.stdout.splitlines():
+ if "Rotate:" in line:
+ angle = int(line.split(":")[-1].strip())
+ if angle != 0:
+ logger.info("OSD erkannte Rotation: %d°, korrigiere...", angle)
+ return img.rotate(angle, expand=True)
+ return img
+ except Exception:
+ pass
+ finally:
+ Path(tmp_path).unlink(missing_ok=True)
+
+ # Versuch 2: Brute-Force — alle 90°-Rotationen testen
+ logger.debug("OSD fehlgeschlagen, teste alle Rotationen...")
+ best_text_len = 0
+ best_angle = 0
+
+ for angle in [0, 90, 180, 270]:
+ rotated = img.rotate(angle, expand=True) if angle else img
+ with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
+ rotated.save(tmp.name, dpi=(300, 300))
+ tmp_path = tmp.name
+ try:
+ result = subprocess.run(
+ ["tesseract", tmp_path, "stdout", "-l", "deu+eng", "--psm", "6"],
+ capture_output=True, text=True,
+ )
+ text = result.stdout.strip()
+ # Zähle alphanumerische Zeichen als Qualitätsmaß
+ alpha_count = sum(1 for c in text if c.isalpha())
+ if alpha_count > best_text_len:
+ best_text_len = alpha_count
+ best_angle = angle
+ finally:
+ Path(tmp_path).unlink(missing_ok=True)
+
+ if best_angle != 0:
+ logger.info("Beste Rotation: %d° (%d Buchstaben erkannt)", best_angle, best_text_len)
+ return img.rotate(best_angle, expand=True)
+ return img
+
+
def preprocess_image(image_path: Path) -> Path:
"""Verbessert Kontrast und Schärfe für bessere OCR-Ergebnisse.
+ Erkennt und korrigiert automatisch die Bildrotation.
+
Returns:
Pfad zum vorverarbeiteten Bild (temporäre Datei).
"""
img = Image.open(image_path)
+
+ # EXIF-Rotation anwenden (z.B. vom Smartphone)
+ img = ImageOps.exif_transpose(img)
+
img = ImageOps.grayscale(img)
img = ImageOps.autocontrast(img, cutoff=2)
img = img.filter(ImageFilter.SHARPEN)
+ # Rotation erkennen und korrigieren
+ img = _detect_and_fix_rotation(img)
+
tmp = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
img.save(tmp.name, dpi=(300, 300))
logger.debug("Vorverarbeitetes Bild: %s → %s", image_path, tmp.name)
diff --git a/src/musiksammlung/vision_llm.py b/src/musiksammlung/vision_llm.py
new file mode 100644
index 0000000..171d23e
--- /dev/null
+++ b/src/musiksammlung/vision_llm.py
@@ -0,0 +1,150 @@
+"""Vision-LLM: Bild direkt an ein multimodales LLM senden, ohne OCR-Zwischenschritt."""
+
+from __future__ import annotations
+
+import base64
+import json
+import logging
+import re
+from pathlib import Path
+
+import httpx
+from pydantic import ValidationError
+
+from musiksammlung.models import Album
+
+logger = logging.getLogger(__name__)
+
+VISION_PROMPT = """\
+Lies das Foto einer CD-Rückseite oder eines Booklets ab. Das Bild kann gedreht sein.
+Extrahiere daraus die Metadaten und die vollständige Trackliste.
+
+WICHTIG:
+- "artist" ist der Hauptinterpret oder "Various Artists" bei Samplern/Compilations.
+- "album" ist der Albumtitel (z.B. "Deutsche Volkslieder", "Abbey Road").
+- "year" ist das Erscheinungsjahr (Zahl oder null wenn nicht sichtbar).
+- Lies die Tracktitel GENAU so ab, wie sie auf der CD stehen.
+- Achte besonders auf korrekte deutsche Umlaute (ä, ö, ü, ß).
+- Wenn "CD 1", "CD 2", "Disc 1" etc. sichtbar sind, erstelle mehrere Einträge in "discs".
+- Ohne Disc-Angabe: eine Disc mit disc_number=1.
+- Lasse Zeitangaben (z.B. "3:12") und Interpretenangaben pro Track weg.
+
+Antworte NUR mit dem JSON, ohne Erklärung. Beispiel:
+
+{"artist":"Various Artists","album":"Deutsche Volkslieder","year":null,""" # noqa: E501
+VISION_PROMPT += """"discs":[{"disc_number":1,"name":null,"tracks":["""
+VISION_PROMPT += """{"track_number":1,"title":"Erster Song"},"""
+VISION_PROMPT += """{"track_number":2,"title":"Zweiter Song"}]}]}"""
+VISION_PROMPT += """
+
+Jetzt lies das Bild ab und gib das vollständige JSON aus. /no_think"""
+
+
+def _encode_image(image_path: Path) -> str:
+ """Liest ein Bild und gibt es als Base64-String zurück."""
+ return base64.b64encode(image_path.read_bytes()).decode("utf-8")
+
+
+def _extract_json(text: str) -> str:
+ """Extrahiert JSON aus einer LLM-Antwort.
+
+ Behandelt:
+ - Reines JSON
+ - JSON in Markdown-Codeblöcken (```json ... ```)
+ - Thinking-Tags (...) vor dem JSON
+ - Sonstiger Text vor/nach dem JSON
+ """
+ if not text or not text.strip():
+ raise ValueError("Leere Antwort vom Vision-LLM")
+
+ # Thinking-Tags entfernen
+ text = re.sub(r".*?", "", text, flags=re.DOTALL).strip()
+
+ # Markdown-Codeblock extrahieren
+ md_match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
+ if md_match:
+ return md_match.group(1)
+
+ # Äußerstes JSON-Objekt finden
+ brace_match = re.search(r"\{.*\}", text, re.DOTALL)
+ if brace_match:
+ return brace_match.group(0)
+
+ raise ValueError(f"Kein JSON in Antwort gefunden: {text[:200]}")
+
+
+def parse_image(
+ image_paths: list[Path],
+ model: str = "qwen3-vl:latest",
+ base_url: str = "http://localhost:11434",
+ max_retries: int = 3,
+) -> Album:
+ """Sendet Bilder direkt an ein Vision-LLM und extrahiert Album-Daten.
+
+ Args:
+ image_paths: Liste von Bilddateien (Cover-Rückseite, Booklet, etc.)
+ model: Ollama Vision-Modell
+ base_url: Ollama-API-URL
+ max_retries: Anzahl Wiederholungsversuche bei ungültigem JSON
+
+ Returns:
+ Validiertes Album-Objekt
+ """
+ images_b64 = [_encode_image(p) for p in image_paths]
+
+ messages = [
+ {
+ "role": "user",
+ "content": VISION_PROMPT,
+ "images": images_b64,
+ }
+ ]
+
+ last_error: Exception | None = None
+
+ for attempt in range(max_retries + 1):
+ try:
+ response = httpx.post(
+ f"{base_url}/api/chat",
+ json={
+ "model": model,
+ "messages": messages,
+ "stream": False,
+ },
+ timeout=300.0,
+ )
+ response.raise_for_status()
+
+ raw_text = response.json()["message"]["content"]
+ logger.info(
+ "Vision-LLM Antwort (Versuch %d, %d Zeichen)",
+ attempt + 1, len(raw_text),
+ )
+ logger.debug("Rohantwort: %s", raw_text[:1000])
+
+ json_str = _extract_json(raw_text)
+ data = json.loads(json_str)
+ album = Album.model_validate(data)
+
+ logger.info(
+ "Vision-LLM erfolgreich: %s - %s (%d Discs, %d Tracks)",
+ album.artist,
+ album.album,
+ len(album.discs),
+ sum(len(d.tracks) for d in album.discs),
+ )
+ return album
+
+ except (json.JSONDecodeError, ValidationError, ValueError) as e:
+ last_error = e
+ logger.warning(
+ "Versuch %d/%d fehlgeschlagen: %s",
+ attempt + 1, max_retries + 1, e,
+ )
+
+ except httpx.HTTPStatusError as e:
+ logger.error("HTTP-Fehler vom Vision-LLM: %s", e)
+ raise
+
+ msg = f"Vision-LLM lieferte nach {max_retries + 1} Versuchen kein valides Ergebnis"
+ raise ValueError(msg) from last_error
diff --git a/tests/test_vision_llm.py b/tests/test_vision_llm.py
new file mode 100644
index 0000000..b0dd881
--- /dev/null
+++ b/tests/test_vision_llm.py
@@ -0,0 +1,37 @@
+"""Tests für die Vision-LLM JSON-Extraktion."""
+
+import pytest
+
+from musiksammlung.vision_llm import _extract_json
+
+
+def test_extract_pure_json():
+ text = '{"artist": "Test", "album": "Album"}'
+ assert '"Test"' in _extract_json(text)
+
+
+def test_extract_json_from_markdown_block():
+ text = 'Hier ist das Ergebnis:\n```json\n{"artist": "Test"}\n```\nFertig.'
+ assert '"Test"' in _extract_json(text)
+
+
+def test_extract_json_with_thinking_tags():
+ text = 'Ich denke nach...\n{"artist": "Test", "album": "X"}'
+ result = _extract_json(text)
+ assert '"Test"' in result
+
+
+def test_extract_json_with_surrounding_text():
+ text = 'Das JSON:\n{"artist": "A", "album": "B"}\nEnde.'
+ result = _extract_json(text)
+ assert '"A"' in result
+
+
+def test_extract_json_empty_raises():
+ with pytest.raises(ValueError, match="Leere Antwort"):
+ _extract_json("")
+
+
+def test_extract_json_no_json_raises():
+ with pytest.raises(ValueError, match="Kein JSON"):
+ _extract_json("Hier ist kein JSON, nur Text.")