308 lines
11 KiB
Python
308 lines
11 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""Backcover-Scanner-Test: Foto hochladen → Vision-LLM → strukturierte CD-Daten.
|
|||
|
|
|
|||
|
|
Extrahiert alle sichtbaren Informationen vom Backcover und ordnet sie
|
|||
|
|
intelligent den Strukturdaten einer CD zu.
|
|||
|
|
|
|||
|
|
Starten: python3 test_backcover_scan.py
|
|||
|
|
Beenden: Strg+C
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
import base64
|
|||
|
|
import json
|
|||
|
|
import re
|
|||
|
|
import textwrap
|
|||
|
|
from pathlib import Path
|
|||
|
|
|
|||
|
|
import httpx
|
|||
|
|
from pydantic import BaseModel, Field
|
|||
|
|
|
|||
|
|
from musiksammlung.scanner_server import ScannerServer, print_qr
|
|||
|
|
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
# Konfiguration
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
|
|||
|
|
UPLOAD_DIR = Path("/tmp/backcover_scan_test")  # upload directory given to ScannerServer; result JSON files are written here too
PORT = 8765  # port given to ScannerServer
MODEL = "qwen3-vl:235b-cloud"  # model name sent in the chat request
BASE_URL = "http://localhost:11434"  # base URL of the LLM HTTP API ("/api/chat" is appended)
TIMEOUT_PHOTO = 300.0  # seconds to wait for a photo upload
TIMEOUT_LLM = 180.0  # seconds to wait for the LLM response
|
|||
|
|
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
# Datenmodell
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TrackInfo(BaseModel):
    # A single track entry of a disc's track list.
    number: int
    title: str
    artist: str | None = None  # only set when it differs from the main artist
    duration: str | None = None  # e.g. "3:45"
|
|||
|
|
|
|||
|
|
|
|||
|
|
class DiscInfo(BaseModel):
    # One disc of a release together with its tracks.
    disc_number: int
    name: str | None = None  # "CD 1", "Disc A", ...
    tracks: list[TrackInfo] = Field(default_factory=list)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class BackcoverData(BaseModel):
    # Structured result of one back-cover scan. Every field is optional or
    # defaults to an empty container, so a partial LLM answer still validates.

    # Core metadata
    title: str | None = None
    artist: str | None = None
    year: int | None = None

    # CD-specific
    ean: str | None = None  # digits ONLY
    catalog_number: str | None = None  # catalog number, e.g. "435 712-2"
    quality: str | None = None  # "DDD", "ADD", "AAD", "DDA" …
    num_discs: int | None = None

    # Classical-music fields
    composer: str | None = None
    conductor: str | None = None
    orchestra: str | None = None
    soloists: list[str] = Field(default_factory=list)

    # Production
    label: str | None = None  # record label / publisher
    producer: str | None = None
    recording_info: str | None = None  # place and/or date

    # Track lists
    discs: list[DiscInfo] = Field(default_factory=list)

    # Any further information
    additional: dict[str, str] = Field(default_factory=dict)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
# LLM-Prompt
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
|
|||
|
|
# Instruction text sent to the vision LLM together with the photo. It lists
# the fields of the expected JSON object (mirroring BackcoverData) and ends
# with "/no_think"; extract_json() still strips <think> blocks defensively.
# The prompt is runtime text and intentionally stays in German.
BACKCOVER_PROMPT = """\
Du siehst das Foto einer CD-Hülle (Rückseite, Booklet oder beides).
Extrahiere ALLE sichtbaren Informationen und ordne sie intelligent den Feldern zu.

KERN-METADATEN:
- title: Albumtitel
- artist: Hauptkünstler / Interpret (bei Samplern: "Various Artists")
- year: Erscheinungsjahr als Zahl (null wenn nicht sichtbar)

CD-SPEZIFISCH:
- ean: EAN-13 oder UPC-12 Barcode-Ziffern (NUR Ziffern, kein Leerzeichen; null wenn kein Barcode sichtbar)
- catalog_number: Katalognummer des Labels (z.B. "435 712-2", "7243 5 55359 2 8")
- quality: Aufnahme-/Abmischqualität falls angegeben (z.B. "DDD", "ADD", "AAD")
- num_discs: Anzahl der CDs in der Box (1 wenn nicht angegeben)

KLASSIK-FELDER (null / leer wenn nicht zutreffend):
- composer: Komponist(en)
- conductor: Dirigent
- orchestra: Orchester oder Ensemble
- soloists: Liste der Solisten, Format ["Vorname Name (Instrument)", ...]

PRODUKTION:
- label: Plattenlabel / Verlag (z.B. "Deutsche Grammophon", "EMI Classics", "Philips")
- producer: Produzent(en) als Text
- recording_info: Aufnahmeort und/oder -datum als Freitext

TRACKLISTE:
- discs: Liste aller CDs
Jede CD hat:
- disc_number: Nummer der CD (1, 2, ...)
- name: Name der CD falls angegeben, sonst null
- tracks: Liste der Tracks, jeder Track hat:
- number: Tracknummer (Zahl)
- title: Titel GENAU wie gedruckt (ohne Zeitangabe)
- artist: Interpret NUR wenn vom Hauptkünstler abweichend, sonst null
- duration: Laufzeit als "M:SS" oder "H:MM:SS" falls sichtbar, sonst null

SONSTIGES:
- additional: Alle weiteren Infos als Schlüssel-Wert-Paare, z.B.:
{"Copyright": "© 1985 Polydor", "Tonmeister": "Max Muster", "Vertrieb": "Universal"}

WICHTIGE REGELN:
- Lies ALLE Spalten vollständig von oben nach unten (Backcovers haben oft 2–4 Spalten)
- EAN-Ziffern ohne Trennzeichen
- Zeitangaben aus Titeln weglassen — nur in "duration" erfassen
- Deutsche Umlaute und Sonderzeichen korrekt übernehmen
- Felder auf null setzen wenn Information nicht sichtbar

Antworte NUR mit dem JSON-Objekt, kein weiterer Text. /no_think"""
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
# Hilfsfunktionen
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
|
|||
|
|
|
|||
|
|
def query_llm(image_path: Path) -> str:
    """Send the image to the vision LLM and return its raw text answer.

    The file is read completely, base64-encoded and posted as one
    non-streaming user message (prompt plus image) to
    ``{BASE_URL}/api/chat``. The request shape looks like the Ollama chat
    API — NOTE(review): confirm against the server actually in use.

    Raises:
        httpx.HTTPStatusError: on a non-success status (``raise_for_status``).
        KeyError: if the JSON reply has no ``message.content`` entry.
    """
    encoded_image = base64.b64encode(image_path.read_bytes()).decode()
    user_message = {
        "role": "user",
        "content": BACKCOVER_PROMPT,
        "images": [encoded_image],
    }
    payload = {
        "model": MODEL,
        "messages": [user_message],
        "stream": False,
    }
    reply = httpx.post(
        f"{BASE_URL}/api/chat",
        json=payload,
        timeout=TIMEOUT_LLM,
    )
    reply.raise_for_status()
    body = reply.json()
    return body["message"]["content"]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def extract_json(raw: str) -> str:
    """Extract the JSON object text from an LLM answer.

    Steps, in order:
      1. Remove ``<think>...</think>`` blocks and surrounding whitespace.
      2. If a Markdown code fence wraps an object, return its content.
      3. Otherwise decode a JSON value starting at the first ``{`` and return
         exactly the text of that object. This keeps trailing prose out of
         the result even when that prose itself contains braces.
      4. If decoding fails, fall back to the span from the first ``{`` to the
         last ``}`` so the caller's ``json.loads`` reports the real error.

    Args:
        raw: Complete text answer of the model.

    Returns:
        The substring that should be parsed as JSON.

    Raises:
        ValueError: if the answer contains no ``{...}`` span at all.
    """
    # Strip reasoning blocks some models emit before the answer.
    cleaned = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()

    # Markdown code fence, with or without a "json" language tag.
    fenced = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", cleaned, re.DOTALL)
    if fenced:
        return fenced.group(1)

    # Only the first "{" is tried: decoding at later braces could silently
    # return a nested object of a truncated answer.
    start = cleaned.find("{")
    if start != -1:
        try:
            _, end = json.JSONDecoder().raw_decode(cleaned, start)
        except json.JSONDecodeError:
            pass
        else:
            return cleaned[start:end]

    # Outermost brace span (previous behaviour, kept as fallback).
    loose = re.search(r"\{.*\}", cleaned, re.DOTALL)
    if loose:
        return loose.group(0)
    raise ValueError(f"Kein JSON in Antwort: {cleaned[:300]!r}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def print_result(data: BackcoverData) -> None:
    """Print the extracted album data to stdout, grouped into sections.

    Sections without any content (classical fields, production, track list,
    additional info) are omitted; within a section, rows whose value is
    ``None`` or an empty list/dict are skipped.
    """
    W = 60
    ruler = "═" * W
    print(f"\n{ruler}")
    print(" BACKCOVER-SCAN ERGEBNIS")
    print(ruler)

    def row(label: str, value: object) -> None:
        """Print one label/value row, wrapping long values under the value column."""
        if value is None or value == [] or value == {}:
            return
        text = str(value)
        # textwrap.wrap() returns [] for empty/whitespace text; print it as is.
        first, *rest = textwrap.wrap(text, width=W - 24) or [text]
        print(f" {label:<22}{first}")
        for continuation in rest:
            print(f" {' '*22}{continuation}")

    print("\n ── Kern-Metadaten ──────────────────────────")
    core_rows = (
        ("Titel", data.title),
        ("Künstler", data.artist),
        ("Jahr", data.year),
        ("Qualität", data.quality),
        ("Anzahl CDs", data.num_discs),
        ("EAN", data.ean),
        ("Katalognummer", data.catalog_number),
    )
    for label, value in core_rows:
        row(label, value)

    if data.composer or data.conductor or data.orchestra or data.soloists:
        print("\n ── Klassik ─────────────────────────────────")
        row("Komponist", data.composer)
        row("Dirigent", data.conductor)
        row("Orchester", data.orchestra)
        for soloist in data.soloists:
            row("Solist", soloist)

    if data.label or data.producer or data.recording_info:
        print("\n ── Produktion ──────────────────────────────")
        row("Label", data.label)
        row("Produzent", data.producer)
        row("Aufnahme", data.recording_info)

    if data.discs:
        print("\n ── Trackliste ──────────────────────────────")
        for disc in data.discs:
            heading = disc.name or f"CD {disc.disc_number}"
            print(f"\n [{heading}] ({len(disc.tracks)} Tracks)")
            for track in disc.tracks:
                artist_part = f" ({track.artist})" if track.artist else ""
                duration_part = f" [{track.duration}]" if track.duration else ""
                print(f" {track.number:>2}. {track.title}{artist_part}{duration_part}")

    if data.additional:
        print("\n ── Weitere Infos ───────────────────────────")
        for key, value in data.additional.items():
            row(key, value)

    print(f"\n{ruler}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
# Hauptprogramm
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _ask_retry(prompt: str = "Nochmal? (j/n) ") -> bool:
    """Ask on stdin whether another round should be run.

    Returns True only for "j", "ja", "y" or "yes" (case-insensitive, stripped).
    A closed stdin (EOFError) counts as "no" so the loop ends cleanly
    instead of dying with a traceback.
    """
    print(prompt, end="", flush=True)
    try:
        answer = input()
    except EOFError:
        return False
    return answer.strip().lower() in ("j", "ja", "y", "yes")


def run() -> None:
    """Run the interactive scan loop until the user stops it.

    Starts a ScannerServer on PORT, then repeats per round: wait for a photo
    upload, send it to the vision LLM, print the start of the raw answer,
    parse and validate it as BackcoverData, print the result and save the
    parsed JSON as ``backcover_NN.json`` in UPLOAD_DIR. LLM and JSON errors
    are reported and do not end the loop. Ctrl+C aborts; the server is
    stopped in every case once it has been started.
    """
    UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
    server = ScannerServer(port=PORT, upload_dir=UPLOAD_DIR)
    server.start()

    runde = 0
    try:
        # Banner output lives inside the try so that a failure here (e.g. in
        # print_qr) cannot leave the already started server running.
        print(f"\nModell: {MODEL}")
        print_qr(server.url())
        print(f"Upload-URL: {server.url()}\n")
        print("Tipp: CD-Hülle so fotografieren, dass Barcode UND Trackliste sichtbar sind.")

        while True:
            runde += 1
            print(f"\n{'─'*60}")
            print(f"Runde {runde} — Bitte Foto hochladen (URL: {server.url()})")
            print("(Formular setzt sich nach 3s automatisch zurück)")

            photo = server.get_photo(timeout=TIMEOUT_PHOTO)
            if photo is None:
                print("Timeout — kein Foto empfangen.")
                if not _ask_retry():
                    break
                continue

            print(f"Foto empfangen: {photo} ({photo.stat().st_size:,} Bytes)")
            print(f"Vision-LLM analysiert ({MODEL}) …", flush=True)

            try:
                raw = query_llm(photo)
            except Exception as exc:
                # Top-level boundary of an interactive tool: report any
                # transport/HTTP/format problem and offer another round.
                print(f"LLM-Fehler: {exc}")
            else:
                print(f"\nRohantwort ({len(raw)} Zeichen):")
                print(textwrap.indent(raw[:800], " "))
                if len(raw) > 800:
                    print(f" … (+{len(raw)-800} weitere Zeichen)")

                try:
                    json_str = extract_json(raw)
                    data_dict = json.loads(json_str)
                    data = BackcoverData.model_validate(data_dict)
                    print_result(data)

                    # Save the parsed answer as returned by the model (not
                    # the validated model dump).
                    out = UPLOAD_DIR / f"backcover_{runde:02d}.json"
                    out.write_text(
                        json.dumps(data_dict, indent=2, ensure_ascii=False),
                        encoding="utf-8",
                    )
                    print(f"\n Gespeichert: {out}")

                except (ValueError, json.JSONDecodeError) as exc:
                    # NOTE(review): pydantic validation errors are presumably
                    # ValueError subclasses and land here too — confirm for
                    # the installed pydantic version.
                    print(f"\n JSON-Fehler: {exc}")
                    print(f" Rohantwort vollständig:\n{raw}")

            if not _ask_retry("\nNochmal? (j/n) "):
                break

    except KeyboardInterrupt:
        print("\nAbgebrochen.")
    finally:
        server.stop()
        print("Server gestoppt.")
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Start the interactive scan loop only when the file is executed as a script.
if __name__ == "__main__":
    run()
|