- Unify sanitize_filename (organizer) and _sanitize_name (ripper): both now use whitelist approach — spaces→underscore, keep \w and hyphens, remove everything else (brackets, punctuation, commas, dots, …). _sanitize_name removed from ripper.py; ripper now imports sanitize_filename from organizer directly. - Add tests/test_scanner_server.py: 15 tests covering HTTP GET/POST handlers, image upload queue, 404/400 error paths, _get_local_ip fallback, print_qr graceful degradation without qrcode installed. - Delete empty stray file '3' from repo root. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
217 lines
7.6 KiB
Python
217 lines
7.6 KiB
Python
"""Tests für den Scanner-Server (HTTP-Upload für Backcover/EAN-Fotos)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import json
|
|
import socket
|
|
import urllib.error
|
|
import urllib.request
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
from musiksammlung.scanner_server import ScannerServer, _get_local_ip, print_qr
|
|
|
|
|
|
def _free_port() -> int:
    """Return a TCP port number that was unused at the time of the call.

    A throw-away socket is bound to port 0 so the operating system picks
    the port; the socket is closed again before the number is returned.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind(("", 0))
        address = probe.getsockname()
        return address[1]
    finally:
        probe.close()
|
|
|
|
|
|
class TestScannerServerBasics:
    """Basic ScannerServer tests that do not start the HTTP server."""

    def test_url_format(self) -> None:
        """url() starts with http:// and contains the configured port."""
        address = ScannerServer(port=12345).url()
        assert address.startswith("http://")
        assert "12345" in address

    def test_get_photo_empty_queue_returns_none(self, tmp_path: Path) -> None:
        """get_photo() returns None when nothing has been uploaded."""
        idle_server = ScannerServer(port=_free_port(), upload_dir=tmp_path)
        photo = idle_server.get_photo(timeout=0)
        assert photo is None

    def test_custom_upload_dir(self, tmp_path: Path) -> None:
        """The upload_dir argument is stored on the server instance."""
        configured = ScannerServer(port=_free_port(), upload_dir=tmp_path)
        assert configured._upload_dir == tmp_path
|
|
|
|
|
|
class TestScannerServerHttp:
    """Integration tests that talk to a started ScannerServer over HTTP.

    Every test starts its own server on a fresh port and stops it in a
    ``finally`` clause, so a failing assertion never leaves a server running.
    """

    @staticmethod
    def _start_server(tmp_path: Path) -> tuple[ScannerServer, int]:
        """Start a ScannerServer on a free port and return ``(server, port)``."""
        port = _free_port()
        server = ScannerServer(port=port, upload_dir=tmp_path)
        server.start()
        return server, port

    @staticmethod
    def _json_post(port: int, path: str, body: bytes) -> urllib.request.Request:
        """Build a request carrying *body* with a JSON content type.

        Supplying ``data`` makes urllib send the request as a POST.
        """
        return urllib.request.Request(
            f"http://127.0.0.1:{port}{path}",
            data=body,
            headers={"Content-Type": "application/json"},
        )

    @staticmethod
    def _image_payload(mime: str, raw: bytes) -> bytes:
        """Encode *raw* as a base64 data URL inside the JSON upload body."""
        b64 = base64.b64encode(raw).decode()
        return json.dumps({"image": f"data:{mime};base64,{b64}"}).encode()

    @staticmethod
    def _status_of(target: str | urllib.request.Request) -> int:
        """Return the HTTP status code for *target*.

        Error statuses arrive as ``HTTPError``; both the normal response and
        the error object are closed so no socket is left open by the test.
        """
        try:
            with urllib.request.urlopen(target) as resp:
                return resp.status
        except urllib.error.HTTPError as err:
            # HTTPError wraps the open response; release it explicitly.
            err.close()
            return err.code

    def test_get_root_returns_html(self, tmp_path: Path) -> None:
        """GET / answers 200 with an HTML page that mentions "upload"."""
        server, port = self._start_server(tmp_path)
        try:
            with urllib.request.urlopen(f"http://127.0.0.1:{port}/") as resp:
                assert resp.status == 200
                html = resp.read().decode("utf-8")
            assert "<html" in html.lower()
            assert "upload" in html.lower()
        finally:
            server.stop()

    def test_get_index_html_alias(self, tmp_path: Path) -> None:
        """GET /index.html answers 200."""
        server, port = self._start_server(tmp_path)
        try:
            assert self._status_of(f"http://127.0.0.1:{port}/index.html") == 200
        finally:
            server.stop()

    def test_get_unknown_path_returns_404(self, tmp_path: Path) -> None:
        """GET on an unknown path answers 404."""
        server, port = self._start_server(tmp_path)
        try:
            assert self._status_of(f"http://127.0.0.1:{port}/unknown") == 404
        finally:
            server.stop()

    def test_post_upload_valid_image(self, tmp_path: Path) -> None:
        """A JPEG data URL is stored as .jpg and handed out by get_photo()."""
        server, port = self._start_server(tmp_path)
        try:
            fake_img = b"fake image bytes for testing"
            req = self._json_post(
                port, "/upload", self._image_payload("image/jpeg", fake_img)
            )
            with urllib.request.urlopen(req) as resp:
                data = json.loads(resp.read())
            assert data["status"] == "ok"

            photo = server.get_photo(timeout=2.0)
            assert photo is not None
            assert photo.suffix == ".jpg"
            assert photo.read_bytes() == fake_img
        finally:
            server.stop()

    def test_post_upload_png_extension(self, tmp_path: Path) -> None:
        """A PNG data URL is stored with the .png suffix."""
        server, port = self._start_server(tmp_path)
        try:
            req = self._json_post(
                port, "/upload", self._image_payload("image/png", b"png data")
            )
            # Only the stored file matters here; the response body is ignored.
            with urllib.request.urlopen(req):
                pass

            photo = server.get_photo(timeout=2.0)
            assert photo is not None
            assert photo.suffix == ".png"
        finally:
            server.stop()

    def test_post_upload_invalid_json_returns_400(self, tmp_path: Path) -> None:
        """A body that is not JSON is rejected with 400."""
        server, port = self._start_server(tmp_path)
        try:
            req = self._json_post(port, "/upload", b"not valid json at all")
            assert self._status_of(req) == 400
        finally:
            server.stop()

    def test_post_upload_missing_image_key_returns_400(self, tmp_path: Path) -> None:
        """Valid JSON without the "image" key is rejected with 400."""
        server, port = self._start_server(tmp_path)
        try:
            payload = json.dumps({"wrong_key": "data"}).encode()
            req = self._json_post(port, "/upload", payload)
            assert self._status_of(req) == 400
        finally:
            server.stop()

    def test_post_to_unknown_path_returns_404(self, tmp_path: Path) -> None:
        """POST to a path other than /upload answers 404."""
        server, port = self._start_server(tmp_path)
        try:
            req = self._json_post(port, "/other", b"data")
            assert self._status_of(req) == 404
        finally:
            server.stop()
|
|
|
|
|
|
class TestGetLocalIp:
    """Tests for _get_local_ip."""

    def test_returns_ip_string(self) -> None:
        """The result is a string made of four dot-separated parts."""
        address = _get_local_ip()
        assert isinstance(address, str)
        octets = address.split(".")
        assert len(octets) == 4

    def test_fallback_on_network_error(self) -> None:
        """When connect() raises OSError the function returns 127.0.0.1."""
        with patch.object(socket, "socket") as fake_socket_cls:
            # The socket obtained via ``with socket.socket(...)`` fails to connect.
            entered_socket = fake_socket_cls.return_value.__enter__.return_value
            entered_socket.connect.side_effect = OSError
            fallback = _get_local_ip()
        assert fallback == "127.0.0.1"
|
|
|
|
|
|
class TestPrintQr:
    """Tests for print_qr."""

    def test_with_qrcode_installed(self, capsys) -> None:
        """print_qr runs without raising when qrcode can be imported."""
        # qrcode is expected to be installed (listed in pyproject.toml).
        # The output is not inspected: a QR code or nothing at all are both fine.
        print_qr("http://192.168.1.1:8765")

    def test_without_qrcode_graceful(self, monkeypatch) -> None:
        """print_qr raises no error when importing qrcode fails."""
        import builtins

        real_import = builtins.__import__

        def import_without_qrcode(name, *args, **kwargs):
            # Pass every import through except the one under test.
            if name != "qrcode":
                return real_import(name, *args, **kwargs)
            raise ImportError("No module named 'qrcode'")

        monkeypatch.setattr(builtins, "__import__", import_without_qrcode)
        print_qr("http://example.com")  # must not raise
|