Musiksammlung/test_backcover_scan.py

308 lines
11 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""Backcover-Scanner-Test: Foto hochladen → Vision-LLM → strukturierte CD-Daten.
Extrahiert alle sichtbaren Informationen vom Backcover und ordnet sie
intelligent den Strukturdaten einer CD zu.
Starten: python3 test_backcover_scan.py
Beenden: Strg+C
"""
from __future__ import annotations
import base64
import json
import re
import textwrap
from pathlib import Path
import httpx
from pydantic import BaseModel, Field
from musiksammlung.scanner_server import ScannerServer, print_qr
# ---------------------------------------------------------------------------
# Konfiguration
# ---------------------------------------------------------------------------
UPLOAD_DIR = Path("/tmp/backcover_scan_test")
PORT = 8765
MODEL = "qwen3-vl:235b-cloud"
BASE_URL = "http://localhost:11434"
TIMEOUT_PHOTO = 300.0 # Sekunden warten auf Foto-Upload
TIMEOUT_LLM = 180.0 # Sekunden warten auf LLM-Antwort
# ---------------------------------------------------------------------------
# Datenmodell
# ---------------------------------------------------------------------------
class TrackInfo(BaseModel):
number: int
title: str
artist: str | None = None # nur bei Abweichung vom Hauptkünstler
duration: str | None = None # "3:45"
class DiscInfo(BaseModel):
disc_number: int
name: str | None = None # "CD 1", "Disc A", ...
tracks: list[TrackInfo] = Field(default_factory=list)
class BackcoverData(BaseModel):
# Kern-Metadaten
title: str | None = None
artist: str | None = None
year: int | None = None
# CD-spezifisch
ean: str | None = None # NUR Ziffern
catalog_number: str | None = None # Katalognummer, z.B. "435 712-2"
quality: str | None = None # "DDD", "ADD", "AAD", "DDA" …
num_discs: int | None = None
# Klassik-Felder
composer: str | None = None
conductor: str | None = None
orchestra: str | None = None
soloists: list[str] = Field(default_factory=list)
# Produktion
label: str | None = None # Plattenlabel / Verlag
producer: str | None = None
recording_info: str | None = None # Ort und/oder Datum
# Tracklisten
discs: list[DiscInfo] = Field(default_factory=list)
# Alle weiteren Infos
additional: dict[str, str] = Field(default_factory=dict)
# ---------------------------------------------------------------------------
# LLM-Prompt
# ---------------------------------------------------------------------------
BACKCOVER_PROMPT = """\
Du siehst das Foto einer CD-Hülle (Rückseite, Booklet oder beides).
Extrahiere ALLE sichtbaren Informationen und ordne sie intelligent den Feldern zu.
KERN-METADATEN:
- title: Albumtitel
- artist: Hauptkünstler / Interpret (bei Samplern: "Various Artists")
- year: Erscheinungsjahr als Zahl (null wenn nicht sichtbar)
CD-SPEZIFISCH:
- ean: EAN-13 oder UPC-12 Barcode-Ziffern (NUR Ziffern, kein Leerzeichen; null wenn kein Barcode sichtbar)
- catalog_number: Katalognummer des Labels (z.B. "435 712-2", "7243 5 55359 2 8")
- quality: Aufnahme-/Abmischqualität falls angegeben (z.B. "DDD", "ADD", "AAD")
- num_discs: Anzahl der CDs in der Box (1 wenn nicht angegeben)
KLASSIK-FELDER (null / leer wenn nicht zutreffend):
- composer: Komponist(en)
- conductor: Dirigent
- orchestra: Orchester oder Ensemble
- soloists: Liste der Solisten, Format ["Vorname Name (Instrument)", ...]
PRODUKTION:
- label: Plattenlabel / Verlag (z.B. "Deutsche Grammophon", "EMI Classics", "Philips")
- producer: Produzent(en) als Text
- recording_info: Aufnahmeort und/oder -datum als Freitext
TRACKLISTE:
- discs: Liste aller CDs
Jede CD hat:
- disc_number: Nummer der CD (1, 2, ...)
- name: Name der CD falls angegeben, sonst null
- tracks: Liste der Tracks, jeder Track hat:
- number: Tracknummer (Zahl)
- title: Titel GENAU wie gedruckt (ohne Zeitangabe)
- artist: Interpret NUR wenn vom Hauptkünstler abweichend, sonst null
- duration: Laufzeit als "M:SS" oder "H:MM:SS" falls sichtbar, sonst null
SONSTIGES:
- additional: Alle weiteren Infos als Schlüssel-Wert-Paare, z.B.:
{"Copyright": "© 1985 Polydor", "Tonmeister": "Max Muster", "Vertrieb": "Universal"}
WICHTIGE REGELN:
- Lies ALLE Spalten vollständig von oben nach unten (Backcoverss haben oft 24 Spalten)
- EAN-Ziffern ohne Trennzeichen
- Zeitangaben aus Titeln weglassen nur in "duration" erfassen
- Deutsche Umlaute und Sonderzeichen korrekt übernehmen
- Felder auf null setzen wenn Information nicht sichtbar
Antworte NUR mit dem JSON-Objekt, kein weiterer Text. /no_think"""
# ---------------------------------------------------------------------------
# Hilfsfunktionen
# ---------------------------------------------------------------------------
def query_llm(image_path: Path) -> str:
"""Schickt das Bild ans Vision-LLM, gibt Rohausgabe zurück."""
b64 = base64.b64encode(image_path.read_bytes()).decode()
response = httpx.post(
f"{BASE_URL}/api/chat",
json={
"model": MODEL,
"messages": [
{"role": "user", "content": BACKCOVER_PROMPT, "images": [b64]}
],
"stream": False,
},
timeout=TIMEOUT_LLM,
)
response.raise_for_status()
return response.json()["message"]["content"]
def extract_json(raw: str) -> str:
"""Extrahiert JSON aus LLM-Antwort (bereinigt Think-Tags, Markdown)."""
# Think-Tags entfernen
raw = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()
# Markdown-Codeblock
md = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", raw, re.DOTALL)
if md:
return md.group(1)
# Äußerstes JSON-Objekt
obj = re.search(r"\{.*\}", raw, re.DOTALL)
if obj:
return obj.group(0)
raise ValueError(f"Kein JSON in Antwort: {raw[:300]!r}")
def print_result(data: BackcoverData) -> None:
"""Gibt das extrahierte Album strukturiert aus."""
W = 60
print(f"\n{''*W}")
print(f" BACKCOVER-SCAN ERGEBNIS")
print(f"{''*W}")
def row(label: str, value: object) -> None:
if value is None or value == [] or value == {}:
return
label_str = f" {label:<22}"
val_str = str(value)
wrapped = textwrap.wrap(val_str, width=W - 24)
print(f"{label_str}{wrapped[0] if wrapped else val_str}")
for line in wrapped[1:]:
print(f" {' '*22}{line}")
print(f"\n ── Kern-Metadaten ──────────────────────────")
row("Titel", data.title)
row("Künstler", data.artist)
row("Jahr", data.year)
row("Qualität", data.quality)
row("Anzahl CDs", data.num_discs)
row("EAN", data.ean)
row("Katalognummer", data.catalog_number)
if any([data.composer, data.conductor, data.orchestra, data.soloists]):
print(f"\n ── Klassik ─────────────────────────────────")
row("Komponist", data.composer)
row("Dirigent", data.conductor)
row("Orchester", data.orchestra)
for s in data.soloists:
row("Solist", s)
if any([data.label, data.producer, data.recording_info]):
print(f"\n ── Produktion ──────────────────────────────")
row("Label", data.label)
row("Produzent", data.producer)
row("Aufnahme", data.recording_info)
if data.discs:
print(f"\n ── Trackliste ──────────────────────────────")
for disc in data.discs:
disc_label = disc.name or f"CD {disc.disc_number}"
print(f"\n [{disc_label}] ({len(disc.tracks)} Tracks)")
for t in disc.tracks:
dur = f" [{t.duration}]" if t.duration else ""
art = f" ({t.artist})" if t.artist else ""
print(f" {t.number:>2}. {t.title}{art}{dur}")
if data.additional:
print(f"\n ── Weitere Infos ───────────────────────────")
for k, v in data.additional.items():
row(k, v)
print(f"\n{''*W}")
# ---------------------------------------------------------------------------
# Hauptprogramm
# ---------------------------------------------------------------------------
def run() -> None:
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
server = ScannerServer(port=PORT, upload_dir=UPLOAD_DIR)
server.start()
print(f"\nModell: {MODEL}")
print_qr(server.url())
print(f"Upload-URL: {server.url()}\n")
print("Tipp: CD-Hülle so fotografieren, dass Barcode UND Trackliste sichtbar sind.")
runde = 0
try:
while True:
runde += 1
print(f"\n{''*60}")
print(f"Runde {runde} — Bitte Foto hochladen (URL: {server.url()})")
print(f"(Formular setzt sich nach 3s automatisch zurück)")
photo = server.get_photo(timeout=TIMEOUT_PHOTO)
if photo is None:
print("Timeout — kein Foto empfangen.")
print("Nochmal? (j/n) ", end="", flush=True)
if input().strip().lower() not in ("j", "ja", "y", "yes"):
break
continue
print(f"Foto empfangen: {photo} ({photo.stat().st_size:,} Bytes)")
print(f"Vision-LLM analysiert ({MODEL}) …", flush=True)
try:
raw = query_llm(photo)
except Exception as exc:
print(f"LLM-Fehler: {exc}")
else:
print(f"\nRohantwort ({len(raw)} Zeichen):")
print(textwrap.indent(raw[:800], " "))
if len(raw) > 800:
print(f" … (+{len(raw)-800} weitere Zeichen)")
try:
json_str = extract_json(raw)
data_dict = json.loads(json_str)
data = BackcoverData.model_validate(data_dict)
print_result(data)
# JSON-Datei speichern
out = UPLOAD_DIR / f"backcover_{runde:02d}.json"
out.write_text(
json.dumps(data_dict, indent=2, ensure_ascii=False),
encoding="utf-8",
)
print(f"\n Gespeichert: {out}")
except (ValueError, json.JSONDecodeError) as exc:
print(f"\n JSON-Fehler: {exc}")
print(f" Rohantwort vollständig:\n{raw}")
print("\nNochmal? (j/n) ", end="", flush=True)
if input().strip().lower() not in ("j", "ja", "y", "yes"):
break
except KeyboardInterrupt:
print("\nAbgebrochen.")
finally:
server.stop()
print("Server gestoppt.")
if __name__ == "__main__":
run()