Add Vision-LLM mode for direct image-to-JSON extraction

Tesseract OCR fails on rotated/low-contrast CD back covers.
New vision_llm module sends images directly to qwen3-vl via
Ollama chat API, bypassing OCR entirely. Robust JSON extraction
handles thinking tags, markdown blocks, and empty responses.
CLI scan/process commands gain --vision flag.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Dieter Schlüter 2026-02-15 01:35:05 +01:00
commit 1753ab204f
5 changed files with 359 additions and 55 deletions

1
.gitignore vendored
View file

@ -11,6 +11,7 @@ dist/
*.egg *.egg
idea/ idea/
CLAUDE.md
# Virtuelle Umgebungen # Virtuelle Umgebungen
.venv/ .venv/

View file

@ -15,6 +15,7 @@ from musiksammlung.ocr import ocr_images
from musiksammlung.organizer import apply_mapping, build_mapping from musiksammlung.organizer import apply_mapping, build_mapping
from musiksammlung.playlist import generate_playlist from musiksammlung.playlist import generate_playlist
from musiksammlung.tagger import tag_album from musiksammlung.tagger import tag_album
from musiksammlung.vision_llm import parse_image
app = typer.Typer( app = typer.Typer(
name="musiksammlung", name="musiksammlung",
@ -27,53 +28,96 @@ logging.basicConfig(
) )
@app.command() def _scan_images(
def scan( images: list[Path],
images: list[Path] = typer.Argument(..., help="Bilder der CD-Rückseite/Booklet"), vision: bool,
output: Path = typer.Option("album.json", "--output", "-o", help="Ausgabe-JSON-Datei"), vision_model: str,
languages: str = typer.Option("deu+eng", "--lang", "-l", help="OCR-Sprachen"), languages: str,
backend: str = typer.Option("ollama", "--backend", "-b", help="LLM-Backend"), backend: str,
model: str = typer.Option("llama3", "--model", "-m", help="LLM-Modell"), model: str,
base_url: str = typer.Option("http://localhost:11434", "--url", help="LLM-API-URL"), base_url: str,
) -> None: ) -> Album:
"""OCR + LLM → Album-JSON erzeugen (zur Prüfung vor dem Anwenden).""" """Gemeinsame Scan-Logik für scan und process."""
# Bilder prüfen if vision:
for img in images: typer.echo(f"Vision-LLM ({vision_model})...")
if not img.exists(): return parse_image(images, model=vision_model, base_url=base_url)
typer.echo(f"Fehler: Bild nicht gefunden: {img}", err=True) else:
raise typer.Exit(1) typer.echo("OCR...")
typer.echo("Starte OCR...")
ocr_text = ocr_images(images, languages) ocr_text = ocr_images(images, languages)
typer.echo(f"OCR-Text ({len(ocr_text)} Zeichen) erkannt.") typer.echo(f"OCR-Text ({len(ocr_text)} Zeichen). LLM-Parsing...")
return parse_tracklist(
ocr_text, backend=backend, model=model, base_url=base_url
)
typer.echo("Starte LLM-Parsing...")
album = parse_tracklist(ocr_text, backend=backend, model=model, base_url=base_url)
output.write_text(album.model_dump_json(indent=2), encoding="utf-8") def _print_album_summary(album: Album) -> None:
typer.echo(f"Album-JSON gespeichert: {output}") """Gibt eine kompakte Album-Zusammenfassung aus."""
typer.echo(f" Artist: {album.artist}") typer.echo(f" Artist: {album.artist}")
typer.echo(f" Album: {album.album}") typer.echo(f" Album: {album.album}")
typer.echo(f" Year: {album.year}") typer.echo(f" Year: {album.year}")
for disc in album.discs: for disc in album.discs:
typer.echo(f" Disc {disc.disc_number}: {len(disc.tracks)} Tracks") typer.echo(f" Disc {disc.disc_number}: {len(disc.tracks)} Tracks")
for track in disc.tracks:
typer.echo(f" {track.track_number:2d}. {track.title}")
@app.command()
def scan(
images: list[Path] = typer.Argument(..., help="Bilder der CD-Rückseite/Booklet"),
output: Path = typer.Option(
"album.json", "--output", "-o", help="Ausgabe-JSON-Datei"
),
vision: bool = typer.Option(
False, "--vision", "-v", help="Vision-LLM statt OCR+Text-LLM"
),
vision_model: str = typer.Option(
"qwen3-vl:latest", "--vision-model", help="Vision-LLM-Modell"
),
languages: str = typer.Option("deu+eng", "--lang", "-l", help="OCR-Sprachen"),
backend: str = typer.Option("ollama", "--backend", "-b", help="LLM-Backend"),
model: str = typer.Option("llama3", "--model", "-m", help="Text-LLM-Modell"),
base_url: str = typer.Option(
"http://localhost:11434", "--url", help="LLM-API-URL"
),
) -> None:
"""Bilder → Album-JSON erzeugen (zur Prüfung vor dem Anwenden).
Mit --vision wird ein Vision-LLM (z.B. qwen3-vl) direkt auf die Bilder
angewendet. Ohne --vision wird Tesseract-OCR + Text-LLM verwendet.
"""
for img in images:
if not img.exists():
typer.echo(f"Fehler: Bild nicht gefunden: {img}", err=True)
raise typer.Exit(1)
album = _scan_images(
images, vision, vision_model, languages, backend, model, base_url
)
output.write_text(album.model_dump_json(indent=2), encoding="utf-8")
typer.echo(f"Album-JSON gespeichert: {output}")
_print_album_summary(album)
@app.command() @app.command()
def apply( def apply(
input_dir: Path = typer.Argument(..., help="Verzeichnis mit gerippten Audiodateien"), input_dir: Path = typer.Argument(
..., help="Verzeichnis mit gerippten Audiodateien"
),
album_json: Path = typer.Argument(..., help="Album-JSON aus 'scan'"), album_json: Path = typer.Argument(..., help="Album-JSON aus 'scan'"),
output_dir: Path = typer.Argument(..., help="Jellyfin-Musikverzeichnis"), output_dir: Path = typer.Argument(..., help="Jellyfin-Musikverzeichnis"),
front: Path | None = typer.Option(None, "--front", help="Front-Cover-Bild"), front: Path | None = typer.Option(None, "--front", help="Front-Cover-Bild"),
back: Path | None = typer.Option(None, "--back", help="Rückseiten-Cover-Bild"), back: Path | None = typer.Option(
dry_run: bool = typer.Option(False, "--dry-run", help="Nur anzeigen, nichts ändern"), None, "--back", help="Rückseiten-Cover-Bild"
),
dry_run: bool = typer.Option(
False, "--dry-run", help="Nur anzeigen, nichts ändern"
),
) -> None: ) -> None:
"""Album-JSON + Audiodateien → Jellyfin-Struktur aufbauen.""" """Album-JSON + Audiodateien → Jellyfin-Struktur aufbauen."""
# JSON laden und validieren
raw = json.loads(album_json.read_text(encoding="utf-8")) raw = json.loads(album_json.read_text(encoding="utf-8"))
album = Album.model_validate(raw) album = Album.model_validate(raw)
# Mapping berechnen und anzeigen
mapping = build_mapping(album, input_dir, output_dir) mapping = build_mapping(album, input_dir, output_dir)
typer.echo(f"Mapping: {len(mapping)} Dateien") typer.echo(f"Mapping: {len(mapping)} Dateien")
for src, dst in mapping.items(): for src, dst in mapping.items():
@ -83,24 +127,17 @@ def apply(
typer.echo("[DRY-RUN] Keine Änderungen vorgenommen.") typer.echo("[DRY-RUN] Keine Änderungen vorgenommen.")
return return
# Dateien verschieben
apply_mapping(mapping) apply_mapping(mapping)
# Album-Verzeichnis bestimmen
first_target = next(iter(mapping.values())) first_target = next(iter(mapping.values()))
if len(album.discs) > 1: if len(album.discs) > 1:
album_dir = first_target.parent.parent # CD1/ → Album/ album_dir = first_target.parent.parent
else: else:
album_dir = first_target.parent album_dir = first_target.parent
# Tags setzen
typer.echo("Setze Audio-Tags...") typer.echo("Setze Audio-Tags...")
tag_album(album, album_dir) tag_album(album, album_dir)
# Cover kopieren
copy_covers(front, back, album_dir) copy_covers(front, back, album_dir)
# Playlist erzeugen
generate_playlist(album, album_dir) generate_playlist(album, album_dir)
typer.echo(f"Fertig! Album liegt in: {album_dir}") typer.echo(f"Fertig! Album liegt in: {album_dir}")
@ -108,12 +145,22 @@ def apply(
@app.command() @app.command()
def process( def process(
input_dir: Path = typer.Argument(..., help="Verzeichnis mit Audiodateien und Bildern"), input_dir: Path = typer.Argument(
..., help="Verzeichnis mit Audiodateien und Bildern"
),
output_dir: Path = typer.Argument(..., help="Jellyfin-Musikverzeichnis"), output_dir: Path = typer.Argument(..., help="Jellyfin-Musikverzeichnis"),
front: Path | None = typer.Option(None, "--front", help="Front-Cover-Bild"), front: Path | None = typer.Option(None, "--front", help="Front-Cover-Bild"),
back: Path | None = typer.Option(None, "--back", help="Rückseiten-Bild (für OCR + Cover)"), back: Path | None = typer.Option(
None, "--back", help="Rückseiten-Bild (für Scan + Cover)"
),
images: list[Path] | None = typer.Option( images: list[Path] | None = typer.Option(
None, "--image", "-i", help="Zusätzliche Bilder für OCR" None, "--image", "-i", help="Zusätzliche Bilder für Scan"
),
vision: bool = typer.Option(
False, "--vision", "-v", help="Vision-LLM statt OCR+Text-LLM"
),
vision_model: str = typer.Option(
"qwen3-vl:latest", "--vision-model", help="Vision-LLM-Modell"
), ),
languages: str = typer.Option("deu+eng", "--lang", "-l"), languages: str = typer.Option("deu+eng", "--lang", "-l"),
backend: str = typer.Option("ollama", "--backend", "-b"), backend: str = typer.Option("ollama", "--backend", "-b"),
@ -121,33 +168,32 @@ def process(
base_url: str = typer.Option("http://localhost:11434", "--url"), base_url: str = typer.Option("http://localhost:11434", "--url"),
dry_run: bool = typer.Option(False, "--dry-run"), dry_run: bool = typer.Option(False, "--dry-run"),
) -> None: ) -> None:
"""Komplett-Pipeline: OCR → LLM → Organize → Tag → Playlist.""" """Komplett-Pipeline: Scan → Organize → Tag → Playlist."""
# OCR-Bilder zusammenstellen scan_sources: list[Path] = []
ocr_sources: list[Path] = []
if back and back.exists(): if back and back.exists():
ocr_sources.append(back) scan_sources.append(back)
if images: if images:
ocr_sources.extend(images) scan_sources.extend(images)
if not ocr_sources: if not scan_sources:
typer.echo("Fehler: Mindestens ein Bild für OCR nötig (--back oder --image)", err=True) typer.echo(
"Fehler: Mindestens ein Bild nötig (--back oder --image)", err=True
)
raise typer.Exit(1) raise typer.Exit(1)
# 1. OCR # 1. Scan (Vision oder OCR+LLM)
typer.echo("Schritt 1/5: OCR...") typer.echo("Schritt 1/4: Bilderkennung...")
ocr_text = ocr_images(ocr_sources, languages) album = _scan_images(
scan_sources, vision, vision_model, languages, backend, model, base_url
# 2. LLM-Parsing )
typer.echo("Schritt 2/5: LLM-Parsing...") _print_album_summary(album)
album = parse_tracklist(ocr_text, backend=backend, model=model, base_url=base_url)
typer.echo(f"{album.artist} {album.album} ({album.year})")
# JSON zur Kontrolle speichern # JSON zur Kontrolle speichern
json_path = input_dir / "album.json" json_path = input_dir / "album.json"
json_path.write_text(album.model_dump_json(indent=2), encoding="utf-8") json_path.write_text(album.model_dump_json(indent=2), encoding="utf-8")
# 3. Dateien organisieren # 2. Dateien organisieren
typer.echo("Schritt 3/5: Dateien organisieren...") typer.echo("Schritt 2/4: Dateien organisieren...")
mapping = build_mapping(album, input_dir, output_dir) mapping = build_mapping(album, input_dir, output_dir)
apply_mapping(mapping, dry_run=dry_run) apply_mapping(mapping, dry_run=dry_run)
@ -155,17 +201,18 @@ def process(
typer.echo("[DRY-RUN] Abbruch nach Mapping-Anzeige.") typer.echo("[DRY-RUN] Abbruch nach Mapping-Anzeige.")
return return
# Album-Verzeichnis bestimmen
first_target = next(iter(mapping.values())) first_target = next(iter(mapping.values()))
album_dir = first_target.parent.parent if len(album.discs) > 1 else first_target.parent album_dir = (
first_target.parent.parent if len(album.discs) > 1 else first_target.parent
)
# 4. Tags + Cover # 3. Tags + Cover
typer.echo("Schritt 4/5: Tags & Cover...") typer.echo("Schritt 3/4: Tags & Cover...")
tag_album(album, album_dir) tag_album(album, album_dir)
copy_covers(front, back, album_dir) copy_covers(front, back, album_dir)
# 5. Playlist # 4. Playlist
typer.echo("Schritt 5/5: Playlist...") typer.echo("Schritt 4/4: Playlist...")
generate_playlist(album, album_dir) generate_playlist(album, album_dir)
typer.echo(f"Fertig! Album: {album_dir}") typer.echo(f"Fertig! Album: {album_dir}")

View file

@ -12,17 +12,86 @@ from PIL import Image, ImageFilter, ImageOps
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _detect_and_fix_rotation(img: Image.Image) -> Image.Image:
"""Erkennt und korrigiert Rotation via Tesseract OSD.
Tesseract --psm 0 gibt die erkannte Rotation aus. Wenn das fehlschlägt
(z.B. bei zu wenig Text), probieren wir alle 90°-Rotationen und nehmen
die mit dem meisten erkannten Text.
"""
# Versuch 1: Tesseract OSD (Orientation and Script Detection)
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
img.save(tmp.name, dpi=(300, 300))
tmp_path = tmp.name
try:
result = subprocess.run(
["tesseract", tmp_path, "stdout", "--psm", "0"],
capture_output=True, text=True,
)
if result.returncode == 0:
for line in result.stdout.splitlines():
if "Rotate:" in line:
angle = int(line.split(":")[-1].strip())
if angle != 0:
logger.info("OSD erkannte Rotation: %d°, korrigiere...", angle)
return img.rotate(angle, expand=True)
return img
except Exception:
pass
finally:
Path(tmp_path).unlink(missing_ok=True)
# Versuch 2: Brute-Force — alle 90°-Rotationen testen
logger.debug("OSD fehlgeschlagen, teste alle Rotationen...")
best_text_len = 0
best_angle = 0
for angle in [0, 90, 180, 270]:
rotated = img.rotate(angle, expand=True) if angle else img
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
rotated.save(tmp.name, dpi=(300, 300))
tmp_path = tmp.name
try:
result = subprocess.run(
["tesseract", tmp_path, "stdout", "-l", "deu+eng", "--psm", "6"],
capture_output=True, text=True,
)
text = result.stdout.strip()
# Zähle alphanumerische Zeichen als Qualitätsmaß
alpha_count = sum(1 for c in text if c.isalpha())
if alpha_count > best_text_len:
best_text_len = alpha_count
best_angle = angle
finally:
Path(tmp_path).unlink(missing_ok=True)
if best_angle != 0:
logger.info("Beste Rotation: %d° (%d Buchstaben erkannt)", best_angle, best_text_len)
return img.rotate(best_angle, expand=True)
return img
def preprocess_image(image_path: Path) -> Path: def preprocess_image(image_path: Path) -> Path:
"""Verbessert Kontrast und Schärfe für bessere OCR-Ergebnisse. """Verbessert Kontrast und Schärfe für bessere OCR-Ergebnisse.
Erkennt und korrigiert automatisch die Bildrotation.
Returns: Returns:
Pfad zum vorverarbeiteten Bild (temporäre Datei). Pfad zum vorverarbeiteten Bild (temporäre Datei).
""" """
img = Image.open(image_path) img = Image.open(image_path)
# EXIF-Rotation anwenden (z.B. vom Smartphone)
img = ImageOps.exif_transpose(img)
img = ImageOps.grayscale(img) img = ImageOps.grayscale(img)
img = ImageOps.autocontrast(img, cutoff=2) img = ImageOps.autocontrast(img, cutoff=2)
img = img.filter(ImageFilter.SHARPEN) img = img.filter(ImageFilter.SHARPEN)
# Rotation erkennen und korrigieren
img = _detect_and_fix_rotation(img)
tmp = tempfile.NamedTemporaryFile(suffix=".png", delete=False) tmp = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
img.save(tmp.name, dpi=(300, 300)) img.save(tmp.name, dpi=(300, 300))
logger.debug("Vorverarbeitetes Bild: %s%s", image_path, tmp.name) logger.debug("Vorverarbeitetes Bild: %s%s", image_path, tmp.name)

View file

@ -0,0 +1,150 @@
"""Vision-LLM: Bild direkt an ein multimodales LLM senden, ohne OCR-Zwischenschritt."""
from __future__ import annotations
import base64
import json
import logging
import re
from pathlib import Path
import httpx
from pydantic import ValidationError
from musiksammlung.models import Album
logger = logging.getLogger(__name__)
VISION_PROMPT = """\
Lies das Foto einer CD-Rückseite oder eines Booklets ab. Das Bild kann gedreht sein.
Extrahiere daraus die Metadaten und die vollständige Trackliste.
WICHTIG:
- "artist" ist der Hauptinterpret oder "Various Artists" bei Samplern/Compilations.
- "album" ist der Albumtitel (z.B. "Deutsche Volkslieder", "Abbey Road").
- "year" ist das Erscheinungsjahr (Zahl oder null wenn nicht sichtbar).
- Lies die Tracktitel GENAU so ab, wie sie auf der CD stehen.
- Achte besonders auf korrekte deutsche Umlaute (ä, ö, ü, ß).
- Wenn "CD 1", "CD 2", "Disc 1" etc. sichtbar sind, erstelle mehrere Einträge in "discs".
- Ohne Disc-Angabe: eine Disc mit disc_number=1.
- Lasse Zeitangaben (z.B. "3:12") und Interpretenangaben pro Track weg.
Antworte NUR mit dem JSON, ohne Erklärung. Beispiel:
{"artist":"Various Artists","album":"Deutsche Volkslieder","year":null,""" # noqa: E501
VISION_PROMPT += """"discs":[{"disc_number":1,"name":null,"tracks":["""
VISION_PROMPT += """{"track_number":1,"title":"Erster Song"},"""
VISION_PROMPT += """{"track_number":2,"title":"Zweiter Song"}]}]}"""
VISION_PROMPT += """
Jetzt lies das Bild ab und gib das vollständige JSON aus. /no_think"""
def _encode_image(image_path: Path) -> str:
"""Liest ein Bild und gibt es als Base64-String zurück."""
return base64.b64encode(image_path.read_bytes()).decode("utf-8")
def _extract_json(text: str) -> str:
"""Extrahiert JSON aus einer LLM-Antwort.
Behandelt:
- Reines JSON
- JSON in Markdown-Codeblöcken (```json ... ```)
- Thinking-Tags (<think>...</think>) vor dem JSON
- Sonstiger Text vor/nach dem JSON
"""
if not text or not text.strip():
raise ValueError("Leere Antwort vom Vision-LLM")
# Thinking-Tags entfernen
text = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip()
# Markdown-Codeblock extrahieren
md_match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
if md_match:
return md_match.group(1)
# Äußerstes JSON-Objekt finden
brace_match = re.search(r"\{.*\}", text, re.DOTALL)
if brace_match:
return brace_match.group(0)
raise ValueError(f"Kein JSON in Antwort gefunden: {text[:200]}")
def parse_image(
image_paths: list[Path],
model: str = "qwen3-vl:latest",
base_url: str = "http://localhost:11434",
max_retries: int = 3,
) -> Album:
"""Sendet Bilder direkt an ein Vision-LLM und extrahiert Album-Daten.
Args:
image_paths: Liste von Bilddateien (Cover-Rückseite, Booklet, etc.)
model: Ollama Vision-Modell
base_url: Ollama-API-URL
max_retries: Anzahl Wiederholungsversuche bei ungültigem JSON
Returns:
Validiertes Album-Objekt
"""
images_b64 = [_encode_image(p) for p in image_paths]
messages = [
{
"role": "user",
"content": VISION_PROMPT,
"images": images_b64,
}
]
last_error: Exception | None = None
for attempt in range(max_retries + 1):
try:
response = httpx.post(
f"{base_url}/api/chat",
json={
"model": model,
"messages": messages,
"stream": False,
},
timeout=300.0,
)
response.raise_for_status()
raw_text = response.json()["message"]["content"]
logger.info(
"Vision-LLM Antwort (Versuch %d, %d Zeichen)",
attempt + 1, len(raw_text),
)
logger.debug("Rohantwort: %s", raw_text[:1000])
json_str = _extract_json(raw_text)
data = json.loads(json_str)
album = Album.model_validate(data)
logger.info(
"Vision-LLM erfolgreich: %s - %s (%d Discs, %d Tracks)",
album.artist,
album.album,
len(album.discs),
sum(len(d.tracks) for d in album.discs),
)
return album
except (json.JSONDecodeError, ValidationError, ValueError) as e:
last_error = e
logger.warning(
"Versuch %d/%d fehlgeschlagen: %s",
attempt + 1, max_retries + 1, e,
)
except httpx.HTTPStatusError as e:
logger.error("HTTP-Fehler vom Vision-LLM: %s", e)
raise
msg = f"Vision-LLM lieferte nach {max_retries + 1} Versuchen kein valides Ergebnis"
raise ValueError(msg) from last_error

37
tests/test_vision_llm.py Normal file
View file

@ -0,0 +1,37 @@
"""Tests für die Vision-LLM JSON-Extraktion."""
import pytest
from musiksammlung.vision_llm import _extract_json
def test_extract_pure_json():
text = '{"artist": "Test", "album": "Album"}'
assert '"Test"' in _extract_json(text)
def test_extract_json_from_markdown_block():
text = 'Hier ist das Ergebnis:\n```json\n{"artist": "Test"}\n```\nFertig.'
assert '"Test"' in _extract_json(text)
def test_extract_json_with_thinking_tags():
text = '<think>Ich denke nach...</think>\n{"artist": "Test", "album": "X"}'
result = _extract_json(text)
assert '"Test"' in result
def test_extract_json_with_surrounding_text():
text = 'Das JSON:\n{"artist": "A", "album": "B"}\nEnde.'
result = _extract_json(text)
assert '"A"' in result
def test_extract_json_empty_raises():
with pytest.raises(ValueError, match="Leere Antwort"):
_extract_json("")
def test_extract_json_no_json_raises():
with pytest.raises(ValueError, match="Kein JSON"):
_extract_json("Hier ist kein JSON, nur Text.")