110 lines
3.4 KiB
Python
110 lines
3.4 KiB
Python
|
|
"""MusicBrainz-Lookup via EAN/Barcode."""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import logging
|
||
|
|
import time
|
||
|
|
|
||
|
|
import httpx
|
||
|
|
|
||
|
|
from musiksammlung.models import Album, Disc, Track
|
||
|
|
|
||
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Base URL of the MusicBrainz web service; request paths are appended to it.
_MB_BASE = "https://musicbrainz.org/ws/2"
# Value sent as the User-Agent header with every request.
_USER_AGENT = "musiksammlung/0.1 ( https://kitux.de/forgejo/dschlueter/Musiksammlung )"
_RATE_SLEEP = 1.1  # seconds to pause between requests; MusicBrainz allows at most 1 request/second
|
||
|
|
|
||
|
|
|
||
|
|
def _get(path: str, params: dict) -> dict:
    """Issue a GET request against the MusicBrainz API and return the JSON body.

    Args:
        path: Path appended verbatim to the MusicBrainz base URL.
        params: Query parameters passed along with the request.

    Returns:
        The decoded JSON response.

    Raises:
        httpx.HTTPError: On network failures or when the response carries
            an error status (via ``raise_for_status``).
    """
    url = f"{_MB_BASE}{path}"
    # The request always carries the project's own User-Agent string.
    request_headers = {"User-Agent": _USER_AGENT}
    reply = httpx.get(url, params=params, headers=request_headers, timeout=30.0)
    reply.raise_for_status()
    return reply.json()
|
||
|
|
|
||
|
|
|
||
|
|
def lookup_by_barcode(ean: str) -> Album:
    """Look up an album in MusicBrainz by its EAN barcode.

    Performs two API requests:
    1. Barcode search -> MBID of the first hit
    2. Release details with artist credits and recordings -> track list

    Args:
        ean: EAN-13 or UPC-12 barcode; surrounding whitespace is ignored.

    Returns:
        Album with the complete track list.

    Raises:
        ValueError: The barcode is empty, or no entry was found for it.
        httpx.HTTPError: Network or API error.
    """
    # Reject blank input up front: an empty "barcode:" search term is not a
    # meaningful query and would only waste a rate-limited request.
    ean = ean.strip()
    if not ean:
        raise ValueError("Barcode darf nicht leer sein.")

    # Step 1: barcode search
    logger.info("MusicBrainz: Suche nach Barcode %s", ean)
    data = _get("/release/", {"query": f"barcode:{ean}", "fmt": "json"})

    releases = data.get("releases", [])
    if not releases:
        raise ValueError(f"Kein MusicBrainz-Eintrag für Barcode {ean!r} gefunden.")

    # Only the first search hit is used.
    mbid = releases[0]["id"]
    logger.info("MusicBrainz: Treffer MBID=%s, lade Details...", mbid)
    # Pause between the two requests to respect the MusicBrainz rate limit.
    time.sleep(_RATE_SLEEP)

    # Step 2: load the track list. Artist credits must be requested
    # explicitly, otherwise the release and its tracks come back without
    # "artist-credit" data. Multiple "inc" values are separated by a space,
    # which is URL-encoded by the HTTP client.
    detail = _get(
        f"/release/{mbid}",
        {"inc": "artist-credits recordings", "fmt": "json"},
    )
    return _parse_release(detail)
|
||
|
|
|
||
|
|
|
||
|
|
def _credit_name(credits: list[dict]) -> str:
    """Build a display name from a MusicBrainz ``artist-credit`` list.

    Every entry contributes its artist name followed by its join phrase
    (e.g. " & " or " feat. "), so collaborations keep all participants.
    For a single entry without a join phrase this is just the artist name.
    """
    parts: list[str] = []
    for credit in credits:
        artist_info = credit.get("artist") or {}
        # Fall back to the credited name when the nested artist has no name.
        name = artist_info.get("name") or credit.get("name") or ""
        parts.append(name + (credit.get("joinphrase") or ""))
    return "".join(parts)


def _parse_year(date_str: str | None) -> int | None:
    """Extract the year from a "YYYY", "YYYY-MM" or "YYYY-MM-DD" date string."""
    if not date_str or len(date_str) < 4:
        return None
    try:
        return int(date_str[:4])
    except ValueError:
        # Deliberately best-effort: an unparsable date just means "no year".
        return None


def _parse_release(data: dict) -> Album:
    """Convert a MusicBrainz release response into an Album model.

    Args:
        data: Decoded JSON of a release; the keys read are "artist-credit",
            "title", "date" and "media" (each medium with "position" and
            "tracks").

    Returns:
        Album with one Disc per medium and one Track per medium track.

    Raises:
        ValueError: The release contains no media.
    """
    # Album artist: all credited artists joined with their join phrases.
    artist = _credit_name(data.get("artist-credit") or [])

    # Album title
    title = data.get("title", "")

    # Release year, if the date is present and parsable.
    year = _parse_year(data.get("date"))

    # Media -> discs
    discs: list[Disc] = []
    for medium in data.get("media") or []:
        # Fall back to running numbers when the response has no position.
        disc_number = medium.get("position", len(discs) + 1)
        tracks: list[Track] = []
        for t in medium.get("tracks") or []:
            # Only set the track artist when it differs from the album artist.
            track_artist: str | None = _credit_name(t.get("artist-credit") or []) or None
            if track_artist == artist:
                track_artist = None

            tracks.append(Track(
                track_number=t.get("position", len(tracks) + 1),
                title=t.get("title", ""),
                artist=track_artist,
            ))
        discs.append(Disc(disc_number=disc_number, tracks=tracks))

    if not discs:
        raise ValueError("MusicBrainz-Release enthält keine Medien/Tracks.")

    return Album(artist=artist, album=title, year=year, discs=discs)
|