#!/usr/bin/env python3 """Backcover-Scanner-Test: Foto hochladen → Vision-LLM → strukturierte CD-Daten. Extrahiert alle sichtbaren Informationen vom Backcover und ordnet sie intelligent den Strukturdaten einer CD zu. Starten: python3 test_backcover_scan.py Beenden: Strg+C """ from __future__ import annotations import base64 import json import re import textwrap from pathlib import Path import httpx from pydantic import BaseModel, Field from musiksammlung.scanner_server import ScannerServer, print_qr # --------------------------------------------------------------------------- # Konfiguration # --------------------------------------------------------------------------- UPLOAD_DIR = Path("/tmp/backcover_scan_test") PORT = 8765 MODEL = "qwen3-vl:235b-cloud" BASE_URL = "http://localhost:11434" TIMEOUT_PHOTO = 300.0 # Sekunden warten auf Foto-Upload TIMEOUT_LLM = 180.0 # Sekunden warten auf LLM-Antwort # --------------------------------------------------------------------------- # Datenmodell # --------------------------------------------------------------------------- class TrackInfo(BaseModel): number: int title: str artist: str | None = None # nur bei Abweichung vom Hauptkünstler duration: str | None = None # "3:45" class DiscInfo(BaseModel): disc_number: int name: str | None = None # "CD 1", "Disc A", ... tracks: list[TrackInfo] = Field(default_factory=list) class BackcoverData(BaseModel): # Kern-Metadaten title: str | None = None artist: str | None = None year: int | None = None # CD-spezifisch ean: str | None = None # NUR Ziffern catalog_number: str | None = None # Katalognummer, z.B. "435 712-2" quality: str | None = None # "DDD", "ADD", "AAD", "DDA" … num_discs: int | None = None # Klassik-Felder composer: str | None = None conductor: str | None = None orchestra: str | None = None soloists: list[str] = Field(default_factory=list) # Produktion label: str | None = None # Plattenlabel / Verlag producer: str | None = None recording_info: str | None = None # Ort und/oder Datum # Tracklisten discs: list[DiscInfo] = Field(default_factory=list) # Alle weiteren Infos additional: dict[str, str] = Field(default_factory=dict) # --------------------------------------------------------------------------- # LLM-Prompt # --------------------------------------------------------------------------- BACKCOVER_PROMPT = """\ Du siehst das Foto einer CD-Hülle (Rückseite, Booklet oder beides). Extrahiere ALLE sichtbaren Informationen und ordne sie intelligent den Feldern zu. KERN-METADATEN: - title: Albumtitel - artist: Hauptkünstler / Interpret (bei Samplern: "Various Artists") - year: Erscheinungsjahr als Zahl (null wenn nicht sichtbar) CD-SPEZIFISCH: - ean: EAN-13 oder UPC-12 Barcode-Ziffern (NUR Ziffern, kein Leerzeichen; null wenn kein Barcode sichtbar) - catalog_number: Katalognummer des Labels (z.B. "435 712-2", "7243 5 55359 2 8") - quality: Aufnahme-/Abmischqualität falls angegeben (z.B. "DDD", "ADD", "AAD") - num_discs: Anzahl der CDs in der Box (1 wenn nicht angegeben) KLASSIK-FELDER (null / leer wenn nicht zutreffend): - composer: Komponist(en) - conductor: Dirigent - orchestra: Orchester oder Ensemble - soloists: Liste der Solisten, Format ["Vorname Name (Instrument)", ...] PRODUKTION: - label: Plattenlabel / Verlag (z.B. "Deutsche Grammophon", "EMI Classics", "Philips") - producer: Produzent(en) als Text - recording_info: Aufnahmeort und/oder -datum als Freitext TRACKLISTE: - discs: Liste aller CDs Jede CD hat: - disc_number: Nummer der CD (1, 2, ...) - name: Name der CD falls angegeben, sonst null - tracks: Liste der Tracks, jeder Track hat: - number: Tracknummer (Zahl) - title: Titel GENAU wie gedruckt (ohne Zeitangabe) - artist: Interpret NUR wenn vom Hauptkünstler abweichend, sonst null - duration: Laufzeit als "M:SS" oder "H:MM:SS" falls sichtbar, sonst null SONSTIGES: - additional: Alle weiteren Infos als Schlüssel-Wert-Paare, z.B.: {"Copyright": "© 1985 Polydor", "Tonmeister": "Max Muster", "Vertrieb": "Universal"} WICHTIGE REGELN: - Lies ALLE Spalten vollständig von oben nach unten (Backcoverss haben oft 2–4 Spalten) - EAN-Ziffern ohne Trennzeichen - Zeitangaben aus Titeln weglassen — nur in "duration" erfassen - Deutsche Umlaute und Sonderzeichen korrekt übernehmen - Felder auf null setzen wenn Information nicht sichtbar Antworte NUR mit dem JSON-Objekt, kein weiterer Text. /no_think""" # --------------------------------------------------------------------------- # Hilfsfunktionen # --------------------------------------------------------------------------- def query_llm(image_path: Path) -> str: """Schickt das Bild ans Vision-LLM, gibt Rohausgabe zurück.""" b64 = base64.b64encode(image_path.read_bytes()).decode() response = httpx.post( f"{BASE_URL}/api/chat", json={ "model": MODEL, "messages": [ {"role": "user", "content": BACKCOVER_PROMPT, "images": [b64]} ], "stream": False, }, timeout=TIMEOUT_LLM, ) response.raise_for_status() return response.json()["message"]["content"] def extract_json(raw: str) -> str: """Extrahiert JSON aus LLM-Antwort (bereinigt Think-Tags, Markdown).""" # Think-Tags entfernen raw = re.sub(r".*?", "", raw, flags=re.DOTALL).strip() # Markdown-Codeblock md = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", raw, re.DOTALL) if md: return md.group(1) # Äußerstes JSON-Objekt obj = re.search(r"\{.*\}", raw, re.DOTALL) if obj: return obj.group(0) raise ValueError(f"Kein JSON in Antwort: {raw[:300]!r}") def print_result(data: BackcoverData) -> None: """Gibt das extrahierte Album strukturiert aus.""" W = 60 print(f"\n{'═'*W}") print(f" BACKCOVER-SCAN ERGEBNIS") print(f"{'═'*W}") def row(label: str, value: object) -> None: if value is None or value == [] or value == {}: return label_str = f" {label:<22}" val_str = str(value) wrapped = textwrap.wrap(val_str, width=W - 24) print(f"{label_str}{wrapped[0] if wrapped else val_str}") for line in wrapped[1:]: print(f" {' '*22}{line}") print(f"\n ── Kern-Metadaten ──────────────────────────") row("Titel", data.title) row("Künstler", data.artist) row("Jahr", data.year) row("Qualität", data.quality) row("Anzahl CDs", data.num_discs) row("EAN", data.ean) row("Katalognummer", data.catalog_number) if any([data.composer, data.conductor, data.orchestra, data.soloists]): print(f"\n ── Klassik ─────────────────────────────────") row("Komponist", data.composer) row("Dirigent", data.conductor) row("Orchester", data.orchestra) for s in data.soloists: row("Solist", s) if any([data.label, data.producer, data.recording_info]): print(f"\n ── Produktion ──────────────────────────────") row("Label", data.label) row("Produzent", data.producer) row("Aufnahme", data.recording_info) if data.discs: print(f"\n ── Trackliste ──────────────────────────────") for disc in data.discs: disc_label = disc.name or f"CD {disc.disc_number}" print(f"\n [{disc_label}] ({len(disc.tracks)} Tracks)") for t in disc.tracks: dur = f" [{t.duration}]" if t.duration else "" art = f" ({t.artist})" if t.artist else "" print(f" {t.number:>2}. {t.title}{art}{dur}") if data.additional: print(f"\n ── Weitere Infos ───────────────────────────") for k, v in data.additional.items(): row(k, v) print(f"\n{'═'*W}") # --------------------------------------------------------------------------- # Hauptprogramm # --------------------------------------------------------------------------- def run() -> None: UPLOAD_DIR.mkdir(parents=True, exist_ok=True) server = ScannerServer(port=PORT, upload_dir=UPLOAD_DIR) server.start() print(f"\nModell: {MODEL}") print_qr(server.url()) print(f"Upload-URL: {server.url()}\n") print("Tipp: CD-Hülle so fotografieren, dass Barcode UND Trackliste sichtbar sind.") runde = 0 try: while True: runde += 1 print(f"\n{'─'*60}") print(f"Runde {runde} — Bitte Foto hochladen (URL: {server.url()})") print(f"(Formular setzt sich nach 3s automatisch zurück)") photo = server.get_photo(timeout=TIMEOUT_PHOTO) if photo is None: print("Timeout — kein Foto empfangen.") print("Nochmal? (j/n) ", end="", flush=True) if input().strip().lower() not in ("j", "ja", "y", "yes"): break continue print(f"Foto empfangen: {photo} ({photo.stat().st_size:,} Bytes)") print(f"Vision-LLM analysiert ({MODEL}) …", flush=True) try: raw = query_llm(photo) except Exception as exc: print(f"LLM-Fehler: {exc}") else: print(f"\nRohantwort ({len(raw)} Zeichen):") print(textwrap.indent(raw[:800], " ")) if len(raw) > 800: print(f" … (+{len(raw)-800} weitere Zeichen)") try: json_str = extract_json(raw) data_dict = json.loads(json_str) data = BackcoverData.model_validate(data_dict) print_result(data) # JSON-Datei speichern out = UPLOAD_DIR / f"backcover_{runde:02d}.json" out.write_text( json.dumps(data_dict, indent=2, ensure_ascii=False), encoding="utf-8", ) print(f"\n Gespeichert: {out}") except (ValueError, json.JSONDecodeError) as exc: print(f"\n JSON-Fehler: {exc}") print(f" Rohantwort vollständig:\n{raw}") print("\nNochmal? (j/n) ", end="", flush=True) if input().strip().lower() not in ("j", "ja", "y", "yes"): break except KeyboardInterrupt: print("\nAbgebrochen.") finally: server.stop() print("Server gestoppt.") if __name__ == "__main__": run()