feat: GnuDB fallback with retries when abcde CDDB lookup returns nothing

- New module cddb.py: direct GnuDB/FreeDB HTTP lookup using the CDDB protocol,
  with the same retry + random-delay logic as the MusicBrainz barcode lookup
- get_discid() reads disc fingerprint via cd-discid before ripping
- If abcde returns no CDDB track data, lookup_by_discid() queries GnuDB
  directly (up to 3 retries, 2-6 s random pause between attempts)
- TrackInfo moved from ripper.py to models.py to break circular import
  (cddb.py and ripper.py both use TrackInfo)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Dieter Schlüter 2026-02-18 07:24:16 +01:00
commit e75e5d7de0
4 changed files with 229 additions and 11 deletions

206
src/musiksammlung/cddb.py Normal file
View file

@ -0,0 +1,206 @@
"""Direkter GnuDB/CDDB-Lookup via HTTP mit Retry-Logik.
Wird als Fallback verwendet, wenn abcde keinen CDDB-Treffer liefert.
GnuDB (gnudb.gnudb.org) ist der freie Nachfolger von FreeDB/CDDB.
"""
from __future__ import annotations
import logging
import random
import subprocess
import time
import httpx
from musiksammlung.models import TrackInfo
logger = logging.getLogger(__name__)
_GNUDB_URL = "https://gnudb.gnudb.org/~cddb/cddb.cgi"
_HELLO = "musiksammlung+localhost+musiksammlung+0.1"
_RATE_SLEEP = 1.1
def get_discid(device: str = "/dev/cdrom") -> str | None:
    """Read the CDDB disc fingerprint of the inserted CD via ``cd-discid``.

    Args:
        device: Path of the CD drive to query.

    Returns:
        The stripped stdout of ``cd-discid``
        (e.g. ``'b30d3a0d 13 150 22987 ... 3512'``), or None when the tool
        is not installed, exits non-zero, prints nothing, or fails in any
        other way.
    """
    command = ["cd-discid", device]
    try:
        proc = subprocess.run(command, capture_output=True, text=True, timeout=15)
        fingerprint = proc.stdout.strip()
        if proc.returncode != 0 or not fingerprint:
            logger.warning("cd-discid fehlgeschlagen (exit %d): %s", proc.returncode, proc.stderr)
            return None
        logger.info("cd-discid: %s", fingerprint)
        return fingerprint
    except FileNotFoundError:
        # The binary is missing: not an error, the fallback is simply unavailable.
        logger.info("cd-discid nicht installiert — kein direkter CDDB-Fallback möglich")
    except Exception as e:
        # Best effort: a failing fingerprint read must never abort the rip.
        logger.warning("cd-discid Fehler: %s", e)
    return None
def _query_gnudb(discid_line: str) -> tuple[str, str] | None:
    """Ask GnuDB which category/disc ID matches a disc fingerprint.

    Args:
        discid_line: Output of ``cd-discid``
            (``discid numtracks offsets... total``).

    Returns:
        ``(category, discid)`` of the exact match, or of the first entry of
        a multi-match list; None when the fingerprint is too short, the
        response is empty, or no usable match line is found.

    Raises:
        httpx.HTTPError: If the request fails or returns an HTTP error status.
    """
    fields = discid_line.split()
    if len(fields) < 3:
        return None

    discid, *toc = fields
    command = f"cddb query {discid} {' '.join(toc)}"
    reply = httpx.get(
        _GNUDB_URL,
        params={"cmd": command, "hello": _HELLO, "proto": "6"},
        timeout=20.0,
    )
    reply.raise_for_status()

    body = reply.text.strip()
    logger.debug("GnuDB query response: %s", body[:200])
    response_lines = body.splitlines()
    if not response_lines:
        return None

    status_line, *candidates = response_lines
    status = status_line[:3]

    if status == "200":
        # Exact match: "200 category discid title"
        tokens = status_line.split(None, 3)
        return (tokens[1], tokens[2]) if len(tokens) >= 3 else None

    if status not in ("211", "210"):
        return None

    # Several matches: take the first well-formed "category discid title" line.
    for candidate in candidates:
        if candidate.strip() == ".":
            # Terminator reached without a usable entry.
            return None
        tokens = candidate.split(None, 2)
        if len(tokens) >= 2:
            return tokens[0], tokens[1]
    return None
def _read_gnudb(category: str, discid: str) -> list[TrackInfo] | None:
    """Fetch and parse the track list of one GnuDB entry.

    Sleeps for ``_RATE_SLEEP`` seconds before sending the request.

    Args:
        category: CDDB category of the entry (as returned by the query step).
        discid: CDDB disc ID of the entry.

    Returns:
        One TrackInfo per TTITLE entry, ordered by track index, or None when
        the server does not answer with status 210 or the record contains no
        usable TTITLE lines.

    Raises:
        httpx.HTTPError: If the request fails or returns an HTTP error status.
    """
    time.sleep(_RATE_SLEEP)
    response = httpx.get(
        _GNUDB_URL,
        params={
            "cmd": f"cddb read {category} {discid}",
            "hello": _HELLO,
            "proto": "6",
        },
        timeout=20.0,
    )
    response.raise_for_status()
    text = response.text
    logger.debug("GnuDB read response (%d Zeichen)", len(text))
    lines = text.splitlines()
    if not lines or not lines[0].startswith("210"):
        logger.warning("GnuDB read: unerwarteter Statuscode: %s", lines[0] if lines else "")
        return None

    dtitle, ttitles = _parse_xmcd_titles(lines[1:])
    if not ttitles:
        return None

    # DTITLE has the form "Artist / Title"; without the separator there is
    # no album artist to inherit.
    album_artist = dtitle.split(" / ", 1)[0].strip() if " / " in dtitle else ""
    is_compilation = album_artist.lower() in ("various", "various artists", "va", "")

    tracks = []
    for i in sorted(ttitles):
        title = ttitles[i]
        track_artist = album_artist
        # Compilation entries often carry "Artist - Title" in the track title.
        if is_compilation and " - " in title:
            artist_part, title_part = title.split(" - ", 1)
            track_artist = artist_part.strip()
            title = title_part.strip()
        tracks.append(TrackInfo(
            track_number=i + 1,
            artist=track_artist,
            title=title,
        ))
    logger.info("GnuDB: %d Tracks für '%s' geladen", len(tracks), dtitle)
    return tracks


def _parse_xmcd_titles(lines: list[str]) -> tuple[str, dict[int, str]]:
    """Extract the DTITLE value and all TTITLEn values from xmcd record lines.

    Repeated lines with the same keyword are treated as continuations of one
    long value and concatenated in order. Lines without ``=`` and TTITLE
    lines whose index is not a non-negative integer are skipped instead of
    raising.

    Args:
        lines: Record lines following the status line.

    Returns:
        ``(dtitle, ttitles)`` where ``ttitles`` maps the TTITLE index to the
        stripped title text.
    """
    dtitle_parts: list[str] = []
    ttitle_parts: dict[int, list[str]] = {}
    for line in lines:
        if line.startswith("#") or line == ".":
            continue
        key, sep, value = line.partition("=")
        if not sep:
            continue
        if key == "DTITLE":
            dtitle_parts.append(value)
        elif key.startswith("TTITLE"):
            try:
                idx = int(key[6:])
            except ValueError:
                continue
            if idx < 0:
                continue
            ttitle_parts.setdefault(idx, []).append(value)
    # Strip only after joining so spaces at a continuation boundary survive.
    dtitle = "".join(dtitle_parts).strip()
    ttitles = {idx: "".join(parts).strip() for idx, parts in ttitle_parts.items()}
    return dtitle, ttitles
def lookup_by_discid(
    discid_line: str,
    retries: int = 3,
) -> list[TrackInfo] | None:
    """Look up a track list on GnuDB from a disc fingerprint.

    An attempt counts as failed when the query finds no match, the matched
    entry yields no tracks, or an HTTP error occurs. Failed attempts are
    repeated up to ``retries`` times, with a random pause of 2–6 s between
    attempts.

    Args:
        discid_line: Output of ``cd-discid``.
        retries: Number of additional attempts after the first one; negative
            values are treated as 0, so at least one attempt is made.

    Returns:
        List of TrackInfo, or None when the fingerprint is empty or no
        attempt produced any tracks.
    """
    fields = discid_line.split() if discid_line else []
    if not fields:
        # Nothing to query; avoids an IndexError on the log line below.
        logger.warning("GnuDB-Lookup übersprungen: leerer Disc-Fingerprint")
        return None

    attempts = max(retries, 0) + 1
    for attempt in range(1, attempts + 1):
        logger.info(
            "GnuDB-Lookup (Versuch %d/%d): %s",
            attempt, attempts, fields[0],
        )
        try:
            match = _query_gnudb(discid_line)
            if match:
                category, discid = match
                logger.info("GnuDB: Treffer in Kategorie '%s', discid=%s", category, discid)
                tracks = _read_gnudb(category, discid)
                if tracks:
                    return tracks
                # An empty read result falls through to the retry below.
        except httpx.HTTPError as e:
            logger.warning("GnuDB HTTP-Fehler (Versuch %d): %s", attempt, e)
        if attempt < attempts:
            wait = random.uniform(2.0, 6.0)
            logger.info("Kein Treffer — warte %.1f s vor erneutem Versuch...", wait)
            time.sleep(wait)
    logger.info("GnuDB: kein Treffer nach %d Versuchen", attempts)
    return None

View file

@ -3,10 +3,19 @@
from __future__ import annotations
import re
from typing import NamedTuple
from pydantic import BaseModel, field_validator
class TrackInfo(NamedTuple):
    """Track information from CDDB/GnuDB (raw rip data, before Pydantic validation)."""
    # Track position on the disc; the GnuDB reader passes the TTITLE index + 1.
    track_number: int
    # Artist credited for this track.
    artist: str
    # Track title.
    title: str
class Track(BaseModel):
track_number: int
title: str

View file

@ -6,14 +6,15 @@ import logging
import re
import subprocess
from pathlib import Path
from typing import NamedTuple
from pydantic import BaseModel
from musiksammlung.cddb import get_discid, lookup_by_discid
from musiksammlung.config import AudioFormat
from musiksammlung.models import Album as AlbumModel
from musiksammlung.models import Disc as DiscModel
from musiksammlung.models import Track as TrackModel
from musiksammlung.models import TrackInfo
from musiksammlung.musicbrainz import lookup_by_barcode
logger = logging.getLogger(__name__)
@ -22,14 +23,6 @@ logger = logging.getLogger(__name__)
_ANSI_ESC = re.compile(r"(\x1b|\^)\[[\d;]*[A-Za-z@]?")
class TrackInfo(NamedTuple):
"""Track information from abcde."""
track_number: int
artist: str
title: str
class RipperConfig(BaseModel):
"""Configuration for ripping process."""
@ -331,6 +324,9 @@ def _rip_with_abcde(
if use_pipes:
cmd.append("-P")
# Disc-Fingerprint vor dem Ripping holen (für GnuDB-Fallback)
discid_line = get_discid(device)
print(f"\n Command: {' '.join(cmd)}", flush=True)
logger.info("Starting abcde: %s", " ".join(cmd))
@ -355,6 +351,14 @@ def _rip_with_abcde(
else:
print("\n CDDB: no track data found", flush=True)
logger.warning("CDDB lookup returned no track data")
# Fallback: GnuDB direkt anfragen (mit Retries + Zufallspause)
if discid_line:
print(" GnuDB-Fallback: direkter Lookup mit Retries...", flush=True)
tracks = lookup_by_discid(discid_line) or None
if tracks:
print(f" GnuDB: {len(tracks)} Tracks gefunden", flush=True)
else:
print(" GnuDB: kein Treffer.", flush=True)
# Extract track files from abcde's temp dir into output_dir (flat)
audio_files = _extract_tracks(output_dir, audio_format)

View file

@ -4,10 +4,9 @@ from pathlib import Path
from unittest.mock import MagicMock, patch
from musiksammlung.config import AudioFormat
from musiksammlung.models import Album, Disc, Track
from musiksammlung.models import Album, Disc, Track, TrackInfo
from musiksammlung.ripper import (
RipperConfig,
TrackInfo,
_clean_input,
_extract_tracks,
_parse_cddb_lines,