Fix sanitize_filename consistency, add scanner server tests, remove stray file

- Unify sanitize_filename (organizer) and _sanitize_name (ripper): both now use
  whitelist approach — spaces→underscore, keep \w and hyphens, remove everything
  else (brackets, punctuation, commas, dots, …). _sanitize_name removed from
  ripper.py; ripper now imports sanitize_filename from organizer directly.
- Add tests/test_scanner_server.py: 15 tests covering HTTP GET/POST handlers,
  image upload queue, 404/400 error paths, _get_local_ip fallback, print_qr
  graceful degradation without qrcode installed.
- Delete empty stray file '3' from repo root.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Dieter Schlüter 2026-02-19 14:20:03 +01:00
commit 7135e681f8
5 changed files with 258 additions and 51 deletions

View file

@ -38,14 +38,17 @@ class DiscCheck:
def sanitize_filename(name: str) -> str: def sanitize_filename(name: str) -> str:
"""Ersetzt Sonderzeichen und Leerzeichen durch Unterstriche. """Bereinigt einen Namen für die Verwendung als Verzeichnis- oder Dateiname.
Buchstaben (inkl. Umlaute), Ziffern und bestehende Unterstriche bleiben erhalten. - Leerzeichen Unterstriche
Mehrere aufeinanderfolgende Unterstriche werden zu einem zusammengezogen. - Buchstaben (inkl. Umlaute), Ziffern, Unterstriche und Bindestriche bleiben erhalten
- Alle anderen Zeichen (Klammern, Satzzeichen, ) werden entfernt
- Mehrere aufeinanderfolgende Unterstriche werden zu einem zusammengezogen
""" """
name = re.sub(r'[^\w]', '_', name) # alles außer Buchstaben/Ziffern/_ → _ name = name.replace(" ", "_")
name = re.sub(r'_+', '_', name) # mehrere _ → einer name = re.sub(r"[^\w\-]", "", name) # nur \w und Bindestrich behalten, Rest entfernen
return name.strip('_') name = re.sub(r"_+", "_", name) # mehrfache _ zusammenfassen
return name.strip("_-")
def discover_audio_files(directory: Path) -> list[Path]: def discover_audio_files(directory: Path) -> list[Path]:

View file

@ -20,6 +20,7 @@ from musiksammlung.models import Disc as DiscModel
from musiksammlung.models import Track as TrackModel from musiksammlung.models import Track as TrackModel
from musiksammlung.models import TrackInfo from musiksammlung.models import TrackInfo
from musiksammlung.musicbrainz import lookup_by_barcode from musiksammlung.musicbrainz import lookup_by_barcode
from musiksammlung.organizer import sanitize_filename
from musiksammlung.scanner_server import ScannerServer, print_qr from musiksammlung.scanner_server import ScannerServer, print_qr
from musiksammlung.vision_llm import extract_barcode_from_image, parse_image from musiksammlung.vision_llm import extract_barcode_from_image, parse_image
@ -182,26 +183,6 @@ def _input_or_scan(
continue continue
def _sanitize_name(name: str) -> str:
"""Bereinigt einen Namen für die Verwendung als Verzeichnis- oder Dateiname.
- Leerzeichen Unterstrich
- Alle Sonder- und Satzzeichen (Klammern, Komma, Punkt, ) werden entfernt
- Nur Buchstaben (inkl. Umlaute), Ziffern, Unterstrich und Bindestrich bleiben
- Mehrfache Unterstriche werden zusammengefasst
Args:
name: Original-Name
Returns:
Bereinigter Name
"""
name = name.replace(" ", "_")
name = re.sub(r"[^\w\-]", "", name) # nur \w (Buchstaben/Ziffern/_) und - behalten
name = re.sub(r"_+", "_", name) # mehrfache Unterstriche zusammenfassen
name = name.strip("_-")
return name
def _parse_cddb_lines(lines: list[str]) -> list[TrackInfo]: def _parse_cddb_lines(lines: list[str]) -> list[TrackInfo]:
"""Parse CDDB track list from abcde output lines. """Parse CDDB track list from abcde output lines.
@ -458,11 +439,11 @@ def _rename_files(
num = int(m.group(1)) num = int(m.group(1))
track = by_num.get(num) track = by_num.get(num)
if track: if track:
safe_title = _sanitize_name(track.title) safe_title = sanitize_filename(track.title)
if track.artist: if track.artist:
new_name = ( new_name = (
f"{num:02d}_-_{safe_title}_-_" f"{num:02d}_-_{safe_title}_-_"
f"{_sanitize_name(track.artist)}{audio_format.extension}" f"{sanitize_filename(track.artist)}{audio_format.extension}"
) )
else: else:
new_name = f"{num:02d}_-_{safe_title}{audio_format.extension}" new_name = f"{num:02d}_-_{safe_title}{audio_format.extension}"
@ -709,7 +690,7 @@ def interactive_rip(config: RipperConfig) -> None:
# Album-Root und Cover VOR dem Ripping anlegen, # Album-Root und Cover VOR dem Ripping anlegen,
# damit backcover.jpg für Vision-LLM verfügbar ist. # damit backcover.jpg für Vision-LLM verfügbar ist.
album_root = config.output_dir / _sanitize_name(album_name) album_root = config.output_dir / sanitize_filename(album_name)
album_root.mkdir(parents=True, exist_ok=True) album_root.mkdir(parents=True, exist_ok=True)
if mb_mbid: if mb_mbid:
download_caa_covers(mb_mbid, album_root) download_caa_covers(mb_mbid, album_root)
@ -844,7 +825,7 @@ def interactive_rip(config: RipperConfig) -> None:
disc_dir = ( disc_dir = (
config.output_dir config.output_dir
/ _sanitize_name(album_name) / sanitize_filename(album_name)
/ f"CD{disc_num}" / f"CD{disc_num}"
) )
@ -955,7 +936,7 @@ def interactive_rip(config: RipperConfig) -> None:
if final_album is not None: if final_album is not None:
# Verzeichnis umbenennen, wenn Vision-LLM/CDDB einen anderen # Verzeichnis umbenennen, wenn Vision-LLM/CDDB einen anderen
# Namen lieferte als der Platzhalter (z. B. "Album1") # Namen lieferte als der Platzhalter (z. B. "Album1")
proper_name = _sanitize_name(final_album.album or album_name) proper_name = sanitize_filename(final_album.album or album_name)
if proper_name and proper_name != album_root.name: if proper_name and proper_name != album_root.name:
new_root = album_root.parent / proper_name new_root = album_root.parent / proper_name
if new_root.exists(): if new_root.exists():

View file

@ -17,16 +17,22 @@ class TestSanitizeFilename:
def test_replaces_spaces(self) -> None: def test_replaces_spaces(self) -> None:
assert sanitize_filename("Hello World") == "Hello_World" assert sanitize_filename("Hello World") == "Hello_World"
def test_replaces_punctuation(self) -> None: def test_removes_punctuation(self) -> None:
assert sanitize_filename("Song (Live)") == "Song_Live" assert sanitize_filename("Song (Live)") == "Song_Live"
assert sanitize_filename("Artist: Name") == "Artist_Name" assert sanitize_filename("Artist: Name") == "Artist_Name"
assert sanitize_filename("Vol. 2") == "Vol_2"
assert sanitize_filename("Hello, World!") == "Hello_World"
def test_keeps_hyphens(self) -> None:
assert sanitize_filename("AC-DC") == "AC-DC"
assert sanitize_filename("Sonata - Allegro") == "Sonata_-_Allegro"
def test_collapses_multiple_underscores(self) -> None: def test_collapses_multiple_underscores(self) -> None:
assert sanitize_filename("A B") == "A_B" assert sanitize_filename("A B") == "A_B"
assert sanitize_filename("A--B") == "A_B"
def test_strips_leading_trailing_underscores(self) -> None: def test_strips_leading_trailing(self) -> None:
assert sanitize_filename("_Test_") == "Test" assert sanitize_filename("_Test_") == "Test"
assert sanitize_filename("-Test-") == "Test"
def test_keeps_umlauts(self) -> None: def test_keeps_umlauts(self) -> None:
assert sanitize_filename("Für Elise") == "Für_Elise" assert sanitize_filename("Für Elise") == "Für_Elise"

View file

@ -5,6 +5,7 @@ from unittest.mock import MagicMock, patch
from musiksammlung.config import AudioFormat from musiksammlung.config import AudioFormat
from musiksammlung.models import Album, Disc, Track, TrackInfo from musiksammlung.models import Album, Disc, Track, TrackInfo
from musiksammlung.organizer import sanitize_filename
from musiksammlung.ripper import ( from musiksammlung.ripper import (
RipperConfig, RipperConfig,
_clean_input, _clean_input,
@ -12,38 +13,37 @@ from musiksammlung.ripper import (
_parse_cddb_lines, _parse_cddb_lines,
_parse_grab_tracks, _parse_grab_tracks,
_rename_files, _rename_files,
_sanitize_name,
interactive_rip, interactive_rip,
) )
class TestSanitizeName: class TestSanitizeName:
"""Tests für _sanitize_name.""" """Tests für sanitize_filename (aus organizer, genutzt von ripper und apply)."""
def test_replace_spaces(self) -> None: def test_replace_spaces(self) -> None:
assert _sanitize_name("Hello World") == "Hello_World" assert sanitize_filename("Hello World") == "Hello_World"
assert _sanitize_name("Test Track Title") == "Test_Track_Title" assert sanitize_filename("Test Track Title") == "Test_Track_Title"
def test_remove_invalid_chars(self) -> None: def test_remove_invalid_chars(self) -> None:
assert _sanitize_name("Test/Track:Name") == "TestTrackName" assert sanitize_filename("Test/Track:Name") == "TestTrackName"
assert _sanitize_name('Test<Track>"Name') == "TestTrackName" assert sanitize_filename('Test<Track>"Name') == "TestTrackName"
assert _sanitize_name("Test|Track?Name*") == "TestTrackName" assert sanitize_filename("Test|Track?Name*") == "TestTrackName"
assert _sanitize_name("It's_a_Test") == "Its_a_Test" assert sanitize_filename("It's_a_Test") == "Its_a_Test"
def test_remove_brackets_and_punctuation(self) -> None: def test_remove_brackets_and_punctuation(self) -> None:
assert _sanitize_name("Best of (1990)") == "Best_of_1990" assert sanitize_filename("Best of (1990)") == "Best_of_1990"
assert _sanitize_name("Hello, World!") == "Hello_World" assert sanitize_filename("Hello, World!") == "Hello_World"
assert _sanitize_name("Vol. 2") == "Vol_2" assert sanitize_filename("Vol. 2") == "Vol_2"
assert _sanitize_name("Salt & Pepper [Remix]") == "Salt_Pepper_Remix" assert sanitize_filename("Salt & Pepper [Remix]") == "Salt_Pepper_Remix"
assert _sanitize_name("The Best of... (Deluxe Edition)") == "The_Best_of_Deluxe_Edition" assert sanitize_filename("The Best of... (Deluxe Edition)") == "The_Best_of_Deluxe_Edition"
def test_keep_umlauts(self) -> None: def test_keep_umlauts(self) -> None:
assert _sanitize_name("Grüße aus Österreich") == "Grüße_aus_Österreich" assert sanitize_filename("Grüße aus Österreich") == "Grüße_aus_Österreich"
assert _sanitize_name("Schöne Sänger") == "Schöne_Sänger" assert sanitize_filename("Schöne Sänger") == "Schöne_Sänger"
def test_strip_underscores(self) -> None: def test_strip_underscores(self) -> None:
assert _sanitize_name("_Test_") == "Test" assert sanitize_filename("_Test_") == "Test"
assert _sanitize_name(" _Test_ ") == "Test" assert sanitize_filename(" _Test_ ") == "Test"
class TestCleanInput: class TestCleanInput:

View file

@ -0,0 +1,217 @@
"""Tests für den Scanner-Server (HTTP-Upload für Backcover/EAN-Fotos)."""
from __future__ import annotations
import base64
import json
import socket
import urllib.error
import urllib.request
from pathlib import Path
from unittest.mock import patch
from musiksammlung.scanner_server import ScannerServer, _get_local_ip, print_qr
def _free_port() -> int:
"""Gibt einen freien TCP-Port zurück."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0))
return s.getsockname()[1]
class TestScannerServerBasics:
"""Grundlegende ScannerServer-Tests ohne Netzwerk."""
def test_url_format(self) -> None:
server = ScannerServer(port=12345)
url = server.url()
assert url.startswith("http://")
assert "12345" in url
def test_get_photo_empty_queue_returns_none(self, tmp_path: Path) -> None:
server = ScannerServer(port=_free_port(), upload_dir=tmp_path)
assert server.get_photo(timeout=0) is None
def test_custom_upload_dir(self, tmp_path: Path) -> None:
server = ScannerServer(port=_free_port(), upload_dir=tmp_path)
assert server._upload_dir == tmp_path
class TestScannerServerHttp:
"""Integrationstests mit echtem HTTP-Server."""
def test_get_root_returns_html(self, tmp_path: Path) -> None:
port = _free_port()
server = ScannerServer(port=port, upload_dir=tmp_path)
server.start()
try:
with urllib.request.urlopen(f"http://127.0.0.1:{port}/") as resp:
assert resp.status == 200
html = resp.read().decode("utf-8")
assert "<html" in html.lower()
assert "upload" in html.lower()
finally:
server.stop()
def test_get_index_html_alias(self, tmp_path: Path) -> None:
port = _free_port()
server = ScannerServer(port=port, upload_dir=tmp_path)
server.start()
try:
with urllib.request.urlopen(f"http://127.0.0.1:{port}/index.html") as resp:
assert resp.status == 200
finally:
server.stop()
def test_get_unknown_path_returns_404(self, tmp_path: Path) -> None:
port = _free_port()
server = ScannerServer(port=port, upload_dir=tmp_path)
server.start()
try:
try:
urllib.request.urlopen(f"http://127.0.0.1:{port}/unknown")
assert False, "Should have raised HTTPError"
except urllib.error.HTTPError as e:
assert e.code == 404
finally:
server.stop()
def test_post_upload_valid_image(self, tmp_path: Path) -> None:
port = _free_port()
server = ScannerServer(port=port, upload_dir=tmp_path)
server.start()
try:
fake_img = b"fake image bytes for testing"
b64 = base64.b64encode(fake_img).decode()
payload = json.dumps({"image": f"data:image/jpeg;base64,{b64}"}).encode()
req = urllib.request.Request(
f"http://127.0.0.1:{port}/upload",
data=payload,
headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as resp:
data = json.loads(resp.read())
assert data["status"] == "ok"
photo = server.get_photo(timeout=2.0)
assert photo is not None
assert photo.suffix == ".jpg"
assert photo.read_bytes() == fake_img
finally:
server.stop()
def test_post_upload_png_extension(self, tmp_path: Path) -> None:
port = _free_port()
server = ScannerServer(port=port, upload_dir=tmp_path)
server.start()
try:
b64 = base64.b64encode(b"png data").decode()
payload = json.dumps({"image": f"data:image/png;base64,{b64}"}).encode()
req = urllib.request.Request(
f"http://127.0.0.1:{port}/upload",
data=payload,
headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req):
pass
photo = server.get_photo(timeout=2.0)
assert photo is not None
assert photo.suffix == ".png"
finally:
server.stop()
def test_post_upload_invalid_json_returns_400(self, tmp_path: Path) -> None:
port = _free_port()
server = ScannerServer(port=port, upload_dir=tmp_path)
server.start()
try:
payload = b"not valid json at all"
req = urllib.request.Request(
f"http://127.0.0.1:{port}/upload",
data=payload,
headers={"Content-Type": "application/json"},
)
try:
urllib.request.urlopen(req)
assert False, "Should have raised HTTPError"
except urllib.error.HTTPError as e:
assert e.code == 400
finally:
server.stop()
def test_post_upload_missing_image_key_returns_400(self, tmp_path: Path) -> None:
port = _free_port()
server = ScannerServer(port=port, upload_dir=tmp_path)
server.start()
try:
payload = json.dumps({"wrong_key": "data"}).encode()
req = urllib.request.Request(
f"http://127.0.0.1:{port}/upload",
data=payload,
headers={"Content-Type": "application/json"},
)
try:
urllib.request.urlopen(req)
assert False, "Should have raised HTTPError"
except urllib.error.HTTPError as e:
assert e.code == 400
finally:
server.stop()
def test_post_to_unknown_path_returns_404(self, tmp_path: Path) -> None:
port = _free_port()
server = ScannerServer(port=port, upload_dir=tmp_path)
server.start()
try:
req = urllib.request.Request(
f"http://127.0.0.1:{port}/other",
data=b"data",
headers={"Content-Type": "application/json"},
)
try:
urllib.request.urlopen(req)
assert False, "Should have raised HTTPError"
except urllib.error.HTTPError as e:
assert e.code == 404
finally:
server.stop()
class TestGetLocalIp:
"""Tests für _get_local_ip."""
def test_returns_ip_string(self) -> None:
ip = _get_local_ip()
assert isinstance(ip, str)
parts = ip.split(".")
assert len(parts) == 4
def test_fallback_on_network_error(self) -> None:
with patch("socket.socket") as mock_sock:
mock_sock.return_value.__enter__.return_value.connect.side_effect = OSError
result = _get_local_ip()
assert result == "127.0.0.1"
class TestPrintQr:
"""Tests für print_qr."""
def test_with_qrcode_installed(self, capsys) -> None:
# qrcode ist installiert (steht in pyproject.toml); kein Fehler erwartet
print_qr("http://192.168.1.1:8765")
# Ausgabe enthält mindestens etwas (QR oder leer — beides ok)
def test_without_qrcode_graceful(self, monkeypatch) -> None:
"""Wenn qrcode nicht installiert ist, wird kein Fehler geworfen."""
import builtins
original_import = builtins.__import__
def mock_import(name, *args, **kwargs):
if name == "qrcode":
raise ImportError("No module named 'qrcode'")
return original_import(name, *args, **kwargs)
monkeypatch.setattr(builtins, "__import__", mock_import)
print_qr("http://example.com") # darf keinen Fehler werfen