From 7135e681f8bdf2bab2c34becfdd6c73a011e8835 Mon Sep 17 00:00:00 2001 From: dschlueter Date: Thu, 19 Feb 2026 14:20:03 +0100 Subject: [PATCH] Fix sanitize_filename consistency, add scanner server tests, remove stray file MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Unify sanitize_filename (organizer) and _sanitize_name (ripper): both now use whitelist approach — spaces→underscore, keep \w and hyphens, remove everything else (brackets, punctuation, commas, dots, …). _sanitize_name removed from ripper.py; ripper now imports sanitize_filename from organizer directly. - Add tests/test_scanner_server.py: 15 tests covering HTTP GET/POST handlers, image upload queue, 404/400 error paths, _get_local_ip fallback, print_qr graceful degradation without qrcode installed. - Delete empty stray file '3' from repo root. Co-Authored-By: Claude Sonnet 4.6 --- src/musiksammlung/organizer.py | 15 ++- src/musiksammlung/ripper.py | 31 +---- tests/test_organizer.py | 12 +- tests/test_ripper.py | 34 +++--- tests/test_scanner_server.py | 217 +++++++++++++++++++++++++++++++++ 5 files changed, 258 insertions(+), 51 deletions(-) create mode 100644 tests/test_scanner_server.py diff --git a/src/musiksammlung/organizer.py b/src/musiksammlung/organizer.py index cb0ed1f..2dd1060 100644 --- a/src/musiksammlung/organizer.py +++ b/src/musiksammlung/organizer.py @@ -38,14 +38,17 @@ class DiscCheck: def sanitize_filename(name: str) -> str: - """Ersetzt Sonderzeichen und Leerzeichen durch Unterstriche. + """Bereinigt einen Namen für die Verwendung als Verzeichnis- oder Dateiname. - Buchstaben (inkl. Umlaute), Ziffern und bestehende Unterstriche bleiben erhalten. - Mehrere aufeinanderfolgende Unterstriche werden zu einem zusammengezogen. + - Leerzeichen → Unterstriche + - Buchstaben (inkl. Umlaute), Ziffern, Unterstriche und Bindestriche bleiben erhalten + - Alle anderen Zeichen (Klammern, Satzzeichen, …) werden entfernt + - Mehrere aufeinanderfolgende Unterstriche werden zu einem zusammengezogen """ - name = re.sub(r'[^\w]', '_', name) # alles außer Buchstaben/Ziffern/_ → _ - name = re.sub(r'_+', '_', name) # mehrere _ → einer - return name.strip('_') + name = name.replace(" ", "_") + name = re.sub(r"[^\w\-]", "", name) # nur \w und Bindestrich behalten, Rest entfernen + name = re.sub(r"_+", "_", name) # mehrfache _ zusammenfassen + return name.strip("_-") def discover_audio_files(directory: Path) -> list[Path]: diff --git a/src/musiksammlung/ripper.py b/src/musiksammlung/ripper.py index 651846e..729b817 100644 --- a/src/musiksammlung/ripper.py +++ b/src/musiksammlung/ripper.py @@ -20,6 +20,7 @@ from musiksammlung.models import Disc as DiscModel from musiksammlung.models import Track as TrackModel from musiksammlung.models import TrackInfo from musiksammlung.musicbrainz import lookup_by_barcode +from musiksammlung.organizer import sanitize_filename from musiksammlung.scanner_server import ScannerServer, print_qr from musiksammlung.vision_llm import extract_barcode_from_image, parse_image @@ -182,26 +183,6 @@ def _input_or_scan( continue -def _sanitize_name(name: str) -> str: - """Bereinigt einen Namen für die Verwendung als Verzeichnis- oder Dateiname. - - - Leerzeichen → Unterstrich - - Alle Sonder- und Satzzeichen (Klammern, Komma, Punkt, …) werden entfernt - - Nur Buchstaben (inkl. Umlaute), Ziffern, Unterstrich und Bindestrich bleiben - - Mehrfache Unterstriche werden zusammengefasst - - Args: - name: Original-Name - - Returns: - Bereinigter Name - """ - name = name.replace(" ", "_") - name = re.sub(r"[^\w\-]", "", name) # nur \w (Buchstaben/Ziffern/_) und - behalten - name = re.sub(r"_+", "_", name) # mehrfache Unterstriche zusammenfassen - name = name.strip("_-") - return name - def _parse_cddb_lines(lines: list[str]) -> list[TrackInfo]: """Parse CDDB track list from abcde output lines. @@ -458,11 +439,11 @@ def _rename_files( num = int(m.group(1)) track = by_num.get(num) if track: - safe_title = _sanitize_name(track.title) + safe_title = sanitize_filename(track.title) if track.artist: new_name = ( f"{num:02d}_-_{safe_title}_-_" - f"{_sanitize_name(track.artist)}{audio_format.extension}" + f"{sanitize_filename(track.artist)}{audio_format.extension}" ) else: new_name = f"{num:02d}_-_{safe_title}{audio_format.extension}" @@ -709,7 +690,7 @@ def interactive_rip(config: RipperConfig) -> None: # Album-Root und Cover VOR dem Ripping anlegen, # damit backcover.jpg für Vision-LLM verfügbar ist. - album_root = config.output_dir / _sanitize_name(album_name) + album_root = config.output_dir / sanitize_filename(album_name) album_root.mkdir(parents=True, exist_ok=True) if mb_mbid: download_caa_covers(mb_mbid, album_root) @@ -844,7 +825,7 @@ def interactive_rip(config: RipperConfig) -> None: disc_dir = ( config.output_dir - / _sanitize_name(album_name) + / sanitize_filename(album_name) / f"CD{disc_num}" ) @@ -955,7 +936,7 @@ def interactive_rip(config: RipperConfig) -> None: if final_album is not None: # Verzeichnis umbenennen, wenn Vision-LLM/CDDB einen anderen # Namen lieferte als der Platzhalter (z. B. "Album1") - proper_name = _sanitize_name(final_album.album or album_name) + proper_name = sanitize_filename(final_album.album or album_name) if proper_name and proper_name != album_root.name: new_root = album_root.parent / proper_name if new_root.exists(): diff --git a/tests/test_organizer.py b/tests/test_organizer.py index 70891ab..6da3729 100644 --- a/tests/test_organizer.py +++ b/tests/test_organizer.py @@ -17,16 +17,22 @@ class TestSanitizeFilename: def test_replaces_spaces(self) -> None: assert sanitize_filename("Hello World") == "Hello_World" - def test_replaces_punctuation(self) -> None: + def test_removes_punctuation(self) -> None: assert sanitize_filename("Song (Live)") == "Song_Live" assert sanitize_filename("Artist: Name") == "Artist_Name" + assert sanitize_filename("Vol. 2") == "Vol_2" + assert sanitize_filename("Hello, World!") == "Hello_World" + + def test_keeps_hyphens(self) -> None: + assert sanitize_filename("AC-DC") == "AC-DC" + assert sanitize_filename("Sonata - Allegro") == "Sonata_-_Allegro" def test_collapses_multiple_underscores(self) -> None: assert sanitize_filename("A B") == "A_B" - assert sanitize_filename("A--B") == "A_B" - def test_strips_leading_trailing_underscores(self) -> None: + def test_strips_leading_trailing(self) -> None: assert sanitize_filename("_Test_") == "Test" + assert sanitize_filename("-Test-") == "Test" def test_keeps_umlauts(self) -> None: assert sanitize_filename("Für Elise") == "Für_Elise" diff --git a/tests/test_ripper.py b/tests/test_ripper.py index 159ec03..7b9ff25 100644 --- a/tests/test_ripper.py +++ b/tests/test_ripper.py @@ -5,6 +5,7 @@ from unittest.mock import MagicMock, patch from musiksammlung.config import AudioFormat from musiksammlung.models import Album, Disc, Track, TrackInfo +from musiksammlung.organizer import sanitize_filename from musiksammlung.ripper import ( RipperConfig, _clean_input, @@ -12,38 +13,37 @@ from musiksammlung.ripper import ( _parse_cddb_lines, _parse_grab_tracks, _rename_files, - _sanitize_name, interactive_rip, ) class TestSanitizeName: - """Tests für _sanitize_name.""" + """Tests für sanitize_filename (aus organizer, genutzt von ripper und apply).""" def test_replace_spaces(self) -> None: - assert _sanitize_name("Hello World") == "Hello_World" - assert _sanitize_name("Test Track Title") == "Test_Track_Title" + assert sanitize_filename("Hello World") == "Hello_World" + assert sanitize_filename("Test Track Title") == "Test_Track_Title" def test_remove_invalid_chars(self) -> None: - assert _sanitize_name("Test/Track:Name") == "TestTrackName" - assert _sanitize_name('Test"Name') == "TestTrackName" - assert _sanitize_name("Test|Track?Name*") == "TestTrackName" - assert _sanitize_name("It's_a_Test") == "Its_a_Test" + assert sanitize_filename("Test/Track:Name") == "TestTrackName" + assert sanitize_filename('Test"Name') == "TestTrackName" + assert sanitize_filename("Test|Track?Name*") == "TestTrackName" + assert sanitize_filename("It's_a_Test") == "Its_a_Test" def test_remove_brackets_and_punctuation(self) -> None: - assert _sanitize_name("Best of (1990)") == "Best_of_1990" - assert _sanitize_name("Hello, World!") == "Hello_World" - assert _sanitize_name("Vol. 2") == "Vol_2" - assert _sanitize_name("Salt & Pepper [Remix]") == "Salt_Pepper_Remix" - assert _sanitize_name("The Best of... (Deluxe Edition)") == "The_Best_of_Deluxe_Edition" + assert sanitize_filename("Best of (1990)") == "Best_of_1990" + assert sanitize_filename("Hello, World!") == "Hello_World" + assert sanitize_filename("Vol. 2") == "Vol_2" + assert sanitize_filename("Salt & Pepper [Remix]") == "Salt_Pepper_Remix" + assert sanitize_filename("The Best of... (Deluxe Edition)") == "The_Best_of_Deluxe_Edition" def test_keep_umlauts(self) -> None: - assert _sanitize_name("Grüße aus Österreich") == "Grüße_aus_Österreich" - assert _sanitize_name("Schöne Sänger") == "Schöne_Sänger" + assert sanitize_filename("Grüße aus Österreich") == "Grüße_aus_Österreich" + assert sanitize_filename("Schöne Sänger") == "Schöne_Sänger" def test_strip_underscores(self) -> None: - assert _sanitize_name("_Test_") == "Test" - assert _sanitize_name(" _Test_ ") == "Test" + assert sanitize_filename("_Test_") == "Test" + assert sanitize_filename(" _Test_ ") == "Test" class TestCleanInput: diff --git a/tests/test_scanner_server.py b/tests/test_scanner_server.py new file mode 100644 index 0000000..8b8819c --- /dev/null +++ b/tests/test_scanner_server.py @@ -0,0 +1,217 @@ +"""Tests für den Scanner-Server (HTTP-Upload für Backcover/EAN-Fotos).""" + +from __future__ import annotations + +import base64 +import json +import socket +import urllib.error +import urllib.request +from pathlib import Path +from unittest.mock import patch + +from musiksammlung.scanner_server import ScannerServer, _get_local_ip, print_qr + + +def _free_port() -> int: + """Gibt einen freien TCP-Port zurück.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("", 0)) + return s.getsockname()[1] + + +class TestScannerServerBasics: + """Grundlegende ScannerServer-Tests ohne Netzwerk.""" + + def test_url_format(self) -> None: + server = ScannerServer(port=12345) + url = server.url() + assert url.startswith("http://") + assert "12345" in url + + def test_get_photo_empty_queue_returns_none(self, tmp_path: Path) -> None: + server = ScannerServer(port=_free_port(), upload_dir=tmp_path) + assert server.get_photo(timeout=0) is None + + def test_custom_upload_dir(self, tmp_path: Path) -> None: + server = ScannerServer(port=_free_port(), upload_dir=tmp_path) + assert server._upload_dir == tmp_path + + +class TestScannerServerHttp: + """Integrationstests mit echtem HTTP-Server.""" + + def test_get_root_returns_html(self, tmp_path: Path) -> None: + port = _free_port() + server = ScannerServer(port=port, upload_dir=tmp_path) + server.start() + try: + with urllib.request.urlopen(f"http://127.0.0.1:{port}/") as resp: + assert resp.status == 200 + html = resp.read().decode("utf-8") + assert " None: + port = _free_port() + server = ScannerServer(port=port, upload_dir=tmp_path) + server.start() + try: + with urllib.request.urlopen(f"http://127.0.0.1:{port}/index.html") as resp: + assert resp.status == 200 + finally: + server.stop() + + def test_get_unknown_path_returns_404(self, tmp_path: Path) -> None: + port = _free_port() + server = ScannerServer(port=port, upload_dir=tmp_path) + server.start() + try: + try: + urllib.request.urlopen(f"http://127.0.0.1:{port}/unknown") + assert False, "Should have raised HTTPError" + except urllib.error.HTTPError as e: + assert e.code == 404 + finally: + server.stop() + + def test_post_upload_valid_image(self, tmp_path: Path) -> None: + port = _free_port() + server = ScannerServer(port=port, upload_dir=tmp_path) + server.start() + try: + fake_img = b"fake image bytes for testing" + b64 = base64.b64encode(fake_img).decode() + payload = json.dumps({"image": f"data:image/jpeg;base64,{b64}"}).encode() + req = urllib.request.Request( + f"http://127.0.0.1:{port}/upload", + data=payload, + headers={"Content-Type": "application/json"}, + ) + with urllib.request.urlopen(req) as resp: + data = json.loads(resp.read()) + assert data["status"] == "ok" + + photo = server.get_photo(timeout=2.0) + assert photo is not None + assert photo.suffix == ".jpg" + assert photo.read_bytes() == fake_img + finally: + server.stop() + + def test_post_upload_png_extension(self, tmp_path: Path) -> None: + port = _free_port() + server = ScannerServer(port=port, upload_dir=tmp_path) + server.start() + try: + b64 = base64.b64encode(b"png data").decode() + payload = json.dumps({"image": f"data:image/png;base64,{b64}"}).encode() + req = urllib.request.Request( + f"http://127.0.0.1:{port}/upload", + data=payload, + headers={"Content-Type": "application/json"}, + ) + with urllib.request.urlopen(req): + pass + + photo = server.get_photo(timeout=2.0) + assert photo is not None + assert photo.suffix == ".png" + finally: + server.stop() + + def test_post_upload_invalid_json_returns_400(self, tmp_path: Path) -> None: + port = _free_port() + server = ScannerServer(port=port, upload_dir=tmp_path) + server.start() + try: + payload = b"not valid json at all" + req = urllib.request.Request( + f"http://127.0.0.1:{port}/upload", + data=payload, + headers={"Content-Type": "application/json"}, + ) + try: + urllib.request.urlopen(req) + assert False, "Should have raised HTTPError" + except urllib.error.HTTPError as e: + assert e.code == 400 + finally: + server.stop() + + def test_post_upload_missing_image_key_returns_400(self, tmp_path: Path) -> None: + port = _free_port() + server = ScannerServer(port=port, upload_dir=tmp_path) + server.start() + try: + payload = json.dumps({"wrong_key": "data"}).encode() + req = urllib.request.Request( + f"http://127.0.0.1:{port}/upload", + data=payload, + headers={"Content-Type": "application/json"}, + ) + try: + urllib.request.urlopen(req) + assert False, "Should have raised HTTPError" + except urllib.error.HTTPError as e: + assert e.code == 400 + finally: + server.stop() + + def test_post_to_unknown_path_returns_404(self, tmp_path: Path) -> None: + port = _free_port() + server = ScannerServer(port=port, upload_dir=tmp_path) + server.start() + try: + req = urllib.request.Request( + f"http://127.0.0.1:{port}/other", + data=b"data", + headers={"Content-Type": "application/json"}, + ) + try: + urllib.request.urlopen(req) + assert False, "Should have raised HTTPError" + except urllib.error.HTTPError as e: + assert e.code == 404 + finally: + server.stop() + + +class TestGetLocalIp: + """Tests für _get_local_ip.""" + + def test_returns_ip_string(self) -> None: + ip = _get_local_ip() + assert isinstance(ip, str) + parts = ip.split(".") + assert len(parts) == 4 + + def test_fallback_on_network_error(self) -> None: + with patch("socket.socket") as mock_sock: + mock_sock.return_value.__enter__.return_value.connect.side_effect = OSError + result = _get_local_ip() + assert result == "127.0.0.1" + + +class TestPrintQr: + """Tests für print_qr.""" + + def test_with_qrcode_installed(self, capsys) -> None: + # qrcode ist installiert (steht in pyproject.toml); kein Fehler erwartet + print_qr("http://192.168.1.1:8765") + # Ausgabe enthält mindestens etwas (QR oder leer — beides ok) + + def test_without_qrcode_graceful(self, monkeypatch) -> None: + """Wenn qrcode nicht installiert ist, wird kein Fehler geworfen.""" + import builtins + original_import = builtins.__import__ + + def mock_import(name, *args, **kwargs): + if name == "qrcode": + raise ImportError("No module named 'qrcode'") + return original_import(name, *args, **kwargs) + + monkeypatch.setattr(builtins, "__import__", mock_import) + print_qr("http://example.com") # darf keinen Fehler werfen