diff --git a/src/musiksammlung/tagger.py b/src/musiksammlung/tagger.py index 9ff573e..298618a 100644 --- a/src/musiksammlung/tagger.py +++ b/src/musiksammlung/tagger.py @@ -2,6 +2,7 @@ from __future__ import annotations +import base64 import io import logging from pathlib import Path @@ -9,6 +10,8 @@ from pathlib import Path from mutagen import File as MutagenFile from mutagen.flac import FLAC, Picture from mutagen.id3 import APIC, ID3, ID3NoHeaderError +from mutagen.mp4 import MP4, MP4Cover +from mutagen.oggopus import OggOpus from PIL import Image from musiksammlung.models import Album, Disc, Track @@ -127,6 +130,24 @@ def embed_cover(audio_path: Path, cover_path: Path) -> None: )) audio.save(str(audio_path)) + elif suffix == ".opus": + audio = OggOpus(str(audio_path)) + pic = Picture() + pic.type = 3 # Front cover + pic.mime = mime + pic.data = cover_data + pic.width = 0 + pic.height = 0 + pic.depth = 0 + pic.colors = 0 + audio["metadata_block_picture"] = [base64.b64encode(pic.write()).decode("ascii")] + audio.save() + + elif suffix == ".m4a": + audio = MP4(str(audio_path)) + audio["covr"] = [MP4Cover(cover_data, imageformat=MP4Cover.FORMAT_JPEG)] + audio.save() + else: logger.debug("Cover-Embedding für %s nicht unterstützt", suffix) return diff --git a/tests/test_cover.py b/tests/test_cover.py index 70341df..264743d 100644 --- a/tests/test_cover.py +++ b/tests/test_cover.py @@ -2,7 +2,18 @@ from pathlib import Path -from musiksammlung.cover import find_cover +from PIL import Image + +from musiksammlung.cover import copy_covers, find_cover, prepare_cover + + +def _make_image(path: Path, size: tuple[int, int] = (100, 100), mode: str = "RGB") -> Path: + img = Image.new(mode, size, color=(200, 100, 50)) + fmt = "PNG" if path.suffix.lower() == ".png" else "JPEG" + if mode == "RGBA" and fmt == "JPEG": + img = img.convert("RGB") + img.save(str(path), fmt) + return path class TestFindCover: @@ -41,3 +52,70 @@ class TestFindCover: (tmp_path / "cover.jpg").touch() (tmp_path / "back.jpg").touch() assert find_cover(tmp_path, "front") is None + + +class TestPrepareCover: + def test_creates_jpeg_output(self, tmp_path: Path) -> None: + src = _make_image(tmp_path / "src.jpg") + dst = tmp_path / "out.jpg" + prepare_cover(src, dst) + assert dst.exists() + assert Image.open(dst).format == "JPEG" + + def test_scales_down_large_image(self, tmp_path: Path) -> None: + src = _make_image(tmp_path / "src.jpg", size=(2000, 2000)) + dst = tmp_path / "out.jpg" + prepare_cover(src, dst, max_size=1200) + assert max(Image.open(dst).size) <= 1200 + + def test_does_not_upscale_small_image(self, tmp_path: Path) -> None: + src = _make_image(tmp_path / "src.jpg", size=(300, 300)) + dst = tmp_path / "out.jpg" + prepare_cover(src, dst, max_size=1200) + assert max(Image.open(dst).size) <= 300 + + def test_converts_rgba_to_rgb(self, tmp_path: Path) -> None: + src = tmp_path / "src.png" + Image.new("RGBA", (100, 100), (255, 0, 0, 128)).save(str(src), "PNG") + dst = tmp_path / "out.jpg" + prepare_cover(src, dst) + assert Image.open(dst).mode == "RGB" + + def test_creates_parent_directory(self, tmp_path: Path) -> None: + src = _make_image(tmp_path / "src.jpg") + dst = tmp_path / "subdir" / "nested" / "out.jpg" + prepare_cover(src, dst) + assert dst.exists() + + +class TestCopyCovers: + def test_copies_front_cover(self, tmp_path: Path) -> None: + src = _make_image(tmp_path / "src.jpg") + copy_covers(src, None, tmp_path) + assert (tmp_path / "frontcover.jpg").exists() + + def test_copies_back_cover(self, tmp_path: Path) -> None: + src = _make_image(tmp_path / "src.jpg") + copy_covers(None, src, tmp_path) + assert (tmp_path / "backcover.jpg").exists() + + def test_copies_both_covers(self, tmp_path: Path) -> None: + front = _make_image(tmp_path / "front.jpg") + back = _make_image(tmp_path / "back.jpg") + copy_covers(front, back, tmp_path) + assert (tmp_path / "frontcover.jpg").exists() + assert (tmp_path / "backcover.jpg").exists() + + def test_skips_nonexistent_front(self, tmp_path: Path) -> None: + copy_covers(tmp_path / "nope.jpg", None, tmp_path) + assert not (tmp_path / "frontcover.jpg").exists() + + def test_skips_nonexistent_back(self, tmp_path: Path) -> None: + copy_covers(None, tmp_path / "nope.jpg", tmp_path) + assert not (tmp_path / "backcover.jpg").exists() + + def test_existing_frontcover_not_overwritten_when_no_source(self, tmp_path: Path) -> None: + existing = _make_image(tmp_path / "frontcover.jpg") + original_mtime = existing.stat().st_mtime + copy_covers(None, None, tmp_path) + assert (tmp_path / "frontcover.jpg").stat().st_mtime == original_mtime diff --git a/tests/test_ocr.py b/tests/test_ocr.py new file mode 100644 index 0000000..5601c7b --- /dev/null +++ b/tests/test_ocr.py @@ -0,0 +1,191 @@ +"""Tests für OCR-Funktionen (subprocess via Mock).""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import MagicMock, patch + +from PIL import Image + +from musiksammlung.ocr import _detect_and_fix_rotation, ocr_images, run_ocr + + +def _fake_run(stdout: str = "", returncode: int = 0) -> MagicMock: + """Erstellt ein Mock-subprocess.CompletedProcess.""" + result = MagicMock() + result.returncode = returncode + result.stdout = stdout + result.stderr = "" + return result + + +def _make_image(path: Path, size: tuple[int, int] = (100, 100)) -> Path: + Image.new("RGB", size, color=(200, 100, 50)).save(str(path), "JPEG") + return path + + +# --------------------------------------------------------------------------- +# run_ocr +# --------------------------------------------------------------------------- + + +class TestRunOcr: + def test_returns_stdout_on_success(self, tmp_path: Path) -> None: + img = _make_image(tmp_path / "test.jpg") + with patch("musiksammlung.ocr.subprocess.run", return_value=_fake_run("Trackliste\n")): + result = run_ocr(img) + assert result == "Trackliste" + + def test_calls_tesseract_with_correct_args(self, tmp_path: Path) -> None: + img = _make_image(tmp_path / "test.jpg") + with patch("musiksammlung.ocr.subprocess.run", return_value=_fake_run()) as mock_run: + run_ocr(img, languages="deu+eng") + args = mock_run.call_args[0][0] + assert args[0] == "tesseract" + assert str(img) in args + assert "deu+eng" in args + assert "--psm" in args + + def test_raises_on_nonzero_returncode(self, tmp_path: Path) -> None: + img = _make_image(tmp_path / "test.jpg") + with patch( + "musiksammlung.ocr.subprocess.run", + return_value=_fake_run("", returncode=1), + ): + try: + run_ocr(img) + assert False, "RuntimeError expected" + except RuntimeError as e: + assert "Tesseract" in str(e) + + def test_custom_language(self, tmp_path: Path) -> None: + img = _make_image(tmp_path / "test.jpg") + with patch("musiksammlung.ocr.subprocess.run", return_value=_fake_run()) as mock_run: + run_ocr(img, languages="fra") + args = mock_run.call_args[0][0] + assert "fra" in args + + +# --------------------------------------------------------------------------- +# _detect_and_fix_rotation +# --------------------------------------------------------------------------- + + +class TestDetectAndFixRotation: + def test_no_rotation_needed(self) -> None: + img = Image.new("L", (200, 100)) + osd_output = "Rotate: 0\nOrientation in degrees: 0\n" + with patch("musiksammlung.ocr.subprocess.run", return_value=_fake_run(osd_output)): + result = _detect_and_fix_rotation(img) + assert result.size == (200, 100) + + def test_rotates_90_degrees(self) -> None: + img = Image.new("L", (200, 100)) # breit + osd_output = "Rotate: 90\nOrientation in degrees: 90\n" + with patch("musiksammlung.ocr.subprocess.run", return_value=_fake_run(osd_output)): + result = _detect_and_fix_rotation(img) + # 200×100 rotiert um 90° → 100×200 + assert result.size == (100, 200) + + def test_rotates_180_degrees(self) -> None: + img = Image.new("L", (200, 100)) + osd_output = "Rotate: 180\n" + with patch("musiksammlung.ocr.subprocess.run", return_value=_fake_run(osd_output)): + result = _detect_and_fix_rotation(img) + # 180° Rotation ändert Größe nicht + assert result.size == (200, 100) + + def test_fallback_brute_force_when_osd_fails(self) -> None: + img = Image.new("L", (200, 100)) + call_count = [0] + + def side_effect(cmd, **kwargs): + call_count[0] += 1 + if call_count[0] == 1: + # Erster Aufruf (OSD) schlägt fehl + return _fake_run("", returncode=1) + # Weitere Aufrufe (brute-force): 90° liefert die meisten Buchstaben + if "--psm" in cmd and "6" in cmd: + return _fake_run("Allegro Andante Beethoven" if call_count[0] == 3 else "a") + return _fake_run("") + + with patch("musiksammlung.ocr.subprocess.run", side_effect=side_effect): + _detect_and_fix_rotation(img) + # Brute-force wurde verwendet (mind. 4 Subprocess-Aufrufe) + assert call_count[0] >= 2 + + +# --------------------------------------------------------------------------- +# ocr_images +# --------------------------------------------------------------------------- + + +class TestOcrImages: + def test_returns_text_for_single_image(self, tmp_path: Path) -> None: + img = _make_image(tmp_path / "img.jpg") + + with ( + patch("musiksammlung.ocr.preprocess_image", return_value=img), + patch("musiksammlung.ocr.run_ocr", return_value="Track 1\nTrack 2"), + ): + result = ocr_images([img]) + + assert result == "Track 1\nTrack 2" + + def test_concatenates_multiple_images(self, tmp_path: Path) -> None: + img1 = _make_image(tmp_path / "img1.jpg") + img2 = _make_image(tmp_path / "img2.jpg") + preprocessed = tmp_path / "pre.png" + preprocessed.write_bytes(b"fake") + + with ( + patch("musiksammlung.ocr.preprocess_image", return_value=preprocessed), + patch("musiksammlung.ocr.run_ocr", side_effect=["Text A", "Text B"]), + ): + result = ocr_images([img1, img2]) + + assert result == "Text A\n\nText B" + + def test_skips_empty_ocr_result(self, tmp_path: Path) -> None: + img = _make_image(tmp_path / "img.jpg") + preprocessed = tmp_path / "pre.png" + preprocessed.write_bytes(b"fake") + + with ( + patch("musiksammlung.ocr.preprocess_image", return_value=preprocessed), + patch("musiksammlung.ocr.run_ocr", return_value=""), + ): + result = ocr_images([img]) + + assert result == "" + + def test_cleans_up_temp_file(self, tmp_path: Path) -> None: + img = _make_image(tmp_path / "img.jpg") + preprocessed = tmp_path / "pre.png" + preprocessed.write_bytes(b"fake") + + with ( + patch("musiksammlung.ocr.preprocess_image", return_value=preprocessed), + patch("musiksammlung.ocr.run_ocr", return_value="text"), + ): + ocr_images([img]) + + # Temporäre Datei wurde gelöscht + assert not preprocessed.exists() + + def test_cleans_up_even_on_error(self, tmp_path: Path) -> None: + img = _make_image(tmp_path / "img.jpg") + preprocessed = tmp_path / "pre.png" + preprocessed.write_bytes(b"fake") + + err = RuntimeError("Tesseract fehlgeschlagen") + with ( + patch("musiksammlung.ocr.preprocess_image", return_value=preprocessed), + patch("musiksammlung.ocr.run_ocr", side_effect=err), + ): + try: + ocr_images([img]) + except RuntimeError: + pass + + assert not preprocessed.exists() diff --git a/tests/test_tagger.py b/tests/test_tagger.py index 989a0eb..cbf793f 100644 --- a/tests/test_tagger.py +++ b/tests/test_tagger.py @@ -2,6 +2,7 @@ from __future__ import annotations +import base64 import io import struct from pathlib import Path @@ -9,6 +10,8 @@ from pathlib import Path from mutagen import File as MutagenFile from mutagen.flac import FLAC from mutagen.id3 import ID3 +from mutagen.mp4 import MP4 +from mutagen.oggopus import OggOpus from PIL import Image from musiksammlung.models import Album, Disc, Track @@ -21,6 +24,24 @@ from musiksammlung.tagger import ( tag_file, ) +# --------------------------------------------------------------------------- +# Audio-Fixtures (minimal gültige Binärdaten) +# --------------------------------------------------------------------------- + +# Minimale Opus-Datei (176 Bytes, Mono, 48kHz, 0.05s Stille, via ffmpeg erzeugt) +_OPUS_BYTES = base64.b64decode( + "T2dnUwACAAAAAAAAAABrYLxYAAAAACpOxZABE09wdXNIZWFkAQE4AYC7AAAAAABPZ2dTAAAAAAAA" + "AAAAAGtgvFgBAAAAB1dgAwE+T3B1c1RhZ3MNAAAATGF2ZjYwLjE2LjEwMAEAAAAdAAAAZW5jb2Rl" + "cj1MYXZjNjAuMzEuMTAyIGxpYm9wdXNPZ2dTAASYCgAAAAAAAGtgvFgCAAAAezPduwMDAwP4//74" + "//74//4=" +) + +# Minimale M4A-Datei (856 Bytes, AAC-Mono, 44100Hz, 0.05s Stille) +_M4A_BYTES = base64.b64decode( + "AAAAHGZ0eXBNNEEgAAACAE00QSBpc29taXNvMgAAAAhmcmVlAAAAKW1kYXTeAgBMYXZjNjAuMzEu" + "MTAyAAIwQA4BGCAHARggBwEYIAcAAAMLbW9vdgAAAGxtdmhkAAAAAAAAAAAAAAAAAAAD6AAAADIAAQAAAQAAAAAAAAAAAAAAAAEAAAAAAAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAgAAAjV0cmFrAAAAXHRraGQAAAADAAAAAAAAAAAAAAABAAAAAAAAADIAAAAAAAAAAAAAAAEBAAAAAAEAAAAAAAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAkZWR0cwAAABxlbHN0AAAAAAAAAAEAAAAyAAAEAAABAAAAAAGtbWRpYQAAACBtZGhkAAAAAAAAAAAAAAAAAACsRAAADJ1VxAAAAAAALWhkbHIAAAAAAAAAAHNvdW4AAAAAAAAAAAAAAABTb3VuZEhhbmRsZXIAAAABWG1pbmYAAAAQc21oZAAAAAAAAAAAAAAAJGRpbmYAAAAcZHJlZgAAAAAAAAABAAAADHVybCAAAAABAAABHHN0YmwAAABqc3RzZAAAAAAAAAABAAAAWm1wNGEAAAAAAAAAAQAAAAAAAAAAAAEAEAAAAACsRAAAAAAANmVzZHMAAAAAA4CAgCUAAQAEgICAF0AVAAAAAAENiAAADhUFgICABRIIVuUABoCAgAECAAAAIHN0dHMAAAAAAAAAAgAAAAMAAAQAAAAAAQAAAJ0AAAAcc3RzYwAAAAAAAAABAAAAAQAAAAQAAAABAAAAJHN0c3oAAAAAAAAAAAAAAAQAAAAVAAAABAAAAAQAAAAEAAAAFHN0Y28AAAAAAAAAAQAAACwAAAAac2dwZAEAAAByb2xsAAAAAgAAAAH//wAAABxzYmdwAAAAAHJvbGwAAAABAAAABAAAAAEAAABidWR0YQAAAFptZXRhAAAAAAAAACFoZGxyAAAAAAAAAABtZGlyYXBwbAAAAAAAAAAAAAAAAC1pbHN0AAAAJal0b28AAAAdZGF0YQAAAAEAAAAATGF2ZjYwLjE2LjEwMA==" +) + # --------------------------------------------------------------------------- # Hilfsfunktionen # --------------------------------------------------------------------------- @@ -41,6 +62,18 @@ def _make_flac(path: Path) -> Path: return path +def _make_opus(path: Path) -> Path: + """Erstellt eine minimale gültige Opus-Datei aus eingebettetem Fixture.""" + path.write_bytes(_OPUS_BYTES) + return path + + +def _make_m4a(path: Path) -> Path: + """Erstellt eine minimale gültige M4A-Datei aus eingebettetem Fixture.""" + path.write_bytes(_M4A_BYTES) + return path + + def _make_mp3(path: Path) -> Path: """Erstellt eine minimale gültige MP3-Datei (MPEG1 Layer3, 128kbps, 44100Hz, mono).""" # Frame-Header: 0xFF 0xFB 0x90 0xC4 @@ -281,6 +314,25 @@ class TestEmbedCover: tags = ID3(str(path)) assert any(k.startswith("APIC") for k in tags.keys()) + def test_embeds_cover_in_opus(self, tmp_path: Path) -> None: + path = _make_opus(tmp_path / "t.opus") + cover = _make_cover(tmp_path / "c.jpg") + + embed_cover(path, cover) + + audio = OggOpus(str(path)) + assert "metadata_block_picture" in audio + + def test_embeds_cover_in_m4a(self, tmp_path: Path) -> None: + path = _make_m4a(tmp_path / "t.m4a") + cover = _make_cover(tmp_path / "c.jpg") + + embed_cover(path, cover) + + audio = MP4(str(path)) + assert "covr" in audio + assert len(audio["covr"]) == 1 + def test_unsupported_format_does_not_crash(self, tmp_path: Path) -> None: path = tmp_path / "t.ogg" path.write_bytes(b"\x00" * 64)