Add v9 generator, test suite and README

- jellyfin_playlist_generator_v9.py: kompletter Rewrite mit Bugfixes
  (FILENAME_PATTERNS-Reihenfolge, IndexError bei leeren Artists,
  Backup-Namenskollision, Symlink-Schleife), neuem --root-playlist-Feature,
  maybe_tqdm-Helper und verbesserter Fehlerbehandlung
- test_suite_v9.py: 27 Tests (Unit + Integration) für v9
- README.md: vollständige Dokumentation aller Optionen und Features

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Dieter Schlüter 2026-04-28 16:17:18 +02:00
commit 1b15d45ba8
3 changed files with 1202 additions and 1 deletions

181
README.md
View file

@ -1 +1,180 @@
# Jellyfin_Playlist_Generator
# Jellyfin Playlist Generator v9
Rekursiver `.m3u`-Playlist-Generator für Jellyfin-Musikbibliotheken.
**Prinzip:** Du gibst ein Root-Verzeichnis an. Jeder direkte Unterordner gilt als Album und bekommt genau eine Playlist. Tracks werden rekursiv aus allen Unterordnern des Albums gesammelt, nach Unterordner und Dateiname natural-sortiert.
---
## Voraussetzungen
Python 3.10+
```bash
pip install mutagen tqdm tenacity # alle optional, aber empfohlen
```
| Paket | Funktion |
|-------|---------|
| `mutagen` | Tags lesen und schreiben |
| `tqdm` | Fortschrittsbalken |
| `tenacity` | Robusteres Retry bei I/O-Fehlern |
Ohne `mutagen` werden nur Playlists erzeugt; Tags werden weder gelesen noch geschrieben.
---
## Aufruf
```
jellyfin_playlist_generator_v9.py [Optionen] PFAD [PFAD ...]
```
### Minimaler Aufruf
```bash
python3 jellyfin_playlist_generator_v9.py /pfad/zur/Musik
```
### Alle Optionen
| Option | Beschreibung |
|--------|-------------|
| `--dry-run` | Simulationsmodus: nichts schreiben, nichts löschen |
| `--relative-to PFAD` | Playlist-Einträge relativ zu diesem Basispfad |
| `--root-playlist` | Zusätzlich pro Root-Verzeichnis eine Sammel-Playlist erzeugen |
| `--write-tags` | Harmonisierte Tags zurück in die Audiodateien schreiben (erfordert `mutagen`) |
| `--backup PFAD` | Vor dem Tag-Schreiben Originale in dieses Verzeichnis sichern |
| `--report PFAD` | CSV-Report aller Tracks speichern |
| `--no-tqdm` | Fortschrittsanzeige deaktivieren |
---
## Beispiele
**Dry-Run — was würde passieren?**
```bash
python3 jellyfin_playlist_generator_v9.py --dry-run /mnt/Musik
```
**Playlists mit relativen Pfaden (für portierbare Jellyfin-Bibliothek)**
```bash
python3 jellyfin_playlist_generator_v9.py --relative-to /mnt/Musik /mnt/Musik
```
**Zusätzliche Sammel-Playlist pro Root**
```bash
python3 jellyfin_playlist_generator_v9.py --root-playlist /mnt/Musik
```
Erzeugt neben den Album-Playlists auch `/mnt/Musik/Musik.m3u` mit allen Tracks.
**Tags harmonisieren mit Backup**
```bash
python3 jellyfin_playlist_generator_v9.py --write-tags --backup /mnt/Backup /mnt/Musik
```
**CSV-Report für Qualitätskontrolle**
```bash
python3 jellyfin_playlist_generator_v9.py --report /tmp/report.csv /mnt/Musik
```
---
## Verzeichnisstruktur
```
Musik/ ← Root (wird übergeben)
├── Pink Floyd/ ← Album-Ordner
│ ├── CD1/
│ │ ├── 01 - Shine On.mp3
│ │ └── 02 - Have a Cigar.mp3
│ ├── CD2/
│ │ └── 10 - Comfortably Numb.mp3
│ └── Pink Floyd.m3u ← erzeugte Playlist (alle Tracks aus CD1+CD2)
├── Dave Brubeck Quartet/
│ ├── 01 - Blue Rondo.flac
│ └── Dave Brubeck Quartet.m3u
└── Musik.m3u ← nur mit --root-playlist
```
- Versteckte Ordner (`.` oder `_` als Präfix) werden übersprungen.
- Bestehende `.m3u`/`.m3u8`-Dateien werden vor dem Schreiben gelöscht.
- Ordner ohne Mediendateien werden still übersprungen.
---
## Metadaten-Verarbeitung
Das Skript versucht für jeden Track sinnvolle Werte für `title`, `artist`, `album`, `albumartist`, `tracknumber`, `discnumber` und `date` zu ermitteln.
**Quellen (Priorität: Tag > Dateiname > Ordnername):**
1. **ID3/FLAC/MP4-Tags** via Mutagen
2. **Dateiname-Parsing** nach diesen Mustern (spezifischste zuerst):
| Muster | Beispiel | Ergebnis |
|--------|---------|---------|
| `D-TT - Titel` | `2-07 - Finale.mp3` | disc=2, track=7, title=Finale |
| `TT - Artist - Titel` | `07 - Pink Floyd - Shine.mp3` | track=7, artist=Pink Floyd, title=Shine |
| `TT - Titel` | `07 - Shine.mp3` | track=7, title=Shine |
| `Artist - Titel` | `Pink Floyd - Shine.mp3` | artist=Pink Floyd, title=Shine |
3. **Disc-Nummer** aus Unterordnernamen wie `CD1`, `Disc 2`, `Disk_3`
4. **Album-Name** aus dem Ordnernamen
**albumartist-Harmonisierung:**
- ≥ 3 verschiedene Artists → `Various Artists`
- 12 Artists → Mehrheits-Artist
- Tracks mit bereits gesetztem `albumartist`-Tag werden nicht überschrieben.
**Konflikte** (z. B. Title im Tag weicht vom Dateinamen ab) werden im CSV-Report markiert.
---
## CSV-Report
Felder: `status`, `path`, `title`, `artist`, `album`, `albumartist`, `duration`, `conflicts`, `notes`
`status` ist `ok`, `changed` oder `conflict`.
```bash
python3 jellyfin_playlist_generator_v9.py --report report.csv /mnt/Musik
```
---
## Unterstützte Formate
**Audio:** `.mp3` `.flac` `.m4a` `.aac` `.ogg` `.opus` `.wav` `.wma` `.aiff` `.ape`
**Video:** `.mp4` `.mkv` `.avi` `.mov` `.wmv` `.flv` `.webm` `.m4v` `.mpeg` `.mpg` `.3gp`
---
## Tests
```bash
python3 test_suite_v9.py
```
27 Tests: Unit-Tests für Parsing/Enrichment und Integrationstests für alle CLI-Optionen.
---
## Zusammenfassung-Output
Am Ende jedes Laufs:
```
========================================
✅ Zusammenfassung:
🗑️ Playlists gelöscht: 209
📁 Root-Verzeichnisse verarbeitet: 1
💿 Album-Verzeichnisse verarbeitet: 209
🎵 Tracks analysiert: 6745
📝 Album-Playlists erstellt: 209
⚠️ Konflikte gefunden: 3476
🔄 Tags geschrieben: 0
❌ Fehler: 0
========================================
```

583
jellyfin_playlist_generator_v9.py Executable file
View file

@ -0,0 +1,583 @@
#!/usr/bin/env python3
"""
jellyfin_playlist_generator_v9.py
Hybrider, rekursiver Playlist- & Metadaten-Generator für Jellyfin.
Default-Verhalten:
- das übergebene Root-Verzeichnis dient nur als Suchraum
- jeder direkte Unterordner des Root-Verzeichnisses gilt als Album
- pro Album wird genau eine Playlist erzeugt
- alle Medien unterhalb des Albumordners werden rekursiv gesammelt
- vor der Generierung werden vorhandene Playlists rekursiv gelöscht
- die Reihenfolge in der Playlist wird nach Album-Unterordnern und danach nach Dateinamen sortiert
"""
from __future__ import annotations
import argparse
import csv
import os
import re
import shutil
import sys
import time
from collections import Counter
from dataclasses import dataclass, field
from functools import wraps
from pathlib import Path
from typing import Any, Dict, List, Optional
HAS_MUTAGEN = False
try:
from mutagen import File as MutagenFile
HAS_MUTAGEN = True
except ImportError:
pass
HAS_TQDM = False
try:
from tqdm import tqdm
HAS_TQDM = True
except ImportError:
pass
def retry_wrapper(max_retries=3, delay=1.0):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
last_err = None
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
last_err = e
if attempt < max_retries - 1:
time.sleep(delay * (attempt + 1))
raise last_err
return wrapper
return decorator
try:
from tenacity import retry, stop_after_attempt, wait_exponential
retry_io = retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=0.5, max=3),
reraise=True,
)
except ImportError:
retry_io = retry_wrapper(max_retries=3, delay=1.0)
MEDIA_EXTENSIONS = {
".mp3", ".wav", ".flac", ".aac", ".ogg", ".m4a", ".wma", ".aiff",
".ape", ".opus", ".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv",
".webm", ".m4v", ".mpeg", ".mpg", ".3gp",
}
PLAYLIST_EXT = ".m3u"
REPORT_FIELDS = ["status", "path", "title", "artist", "album", "albumartist", "duration", "conflicts", "notes"]
PLAYLIST_EXTENSIONS = {".m3u", ".m3u8"}
BAD_VALUES = {
"", "unknown", "unknown artist", "unknown album", "untitled",
"track", "audio", "va", "various",
}
NATSORT_RE = re.compile(r"(\d+)")
FILENAME_PATTERNS = [
# "2-07 - Finale" → disc=2, track=7, title=Finale
re.compile(r"^(?P<disc>\d{1,2})-(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<title>.+)$"),
# "07 - Pink Floyd - Shine" → track=7, artist=Pink Floyd, title=Shine
re.compile(r"^(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<artist>.+?)\s*[-]\s*(?P<title>.+)$"),
# "07 - Shine" → track=7, title=Shine
re.compile(r"^(?P<track>\d{1,3})\s*[-._ ]+\s*(?P<title>.+)$"),
# "Pink Floyd - Shine" → artist=Pink Floyd, title=Shine
re.compile(r"^(?P<artist>.+?)\s*[-]\s*(?P<title>.+)$"),
]
@dataclass
class TrackInfo:
path: Path
stem: str
ext: str
tags: Dict[str, str] = field(default_factory=dict)
duration: Optional[float] = None
proposed: Dict[str, str] = field(default_factory=dict)
conflicts: List[str] = field(default_factory=list)
notes: List[str] = field(default_factory=list)
changed: Dict[str, Dict[str, str]] = field(default_factory=dict)
sort_group: str = ""
def clean_text(s: Optional[str]) -> str:
if not s:
return ""
return re.sub(r"\s+", " ", s.replace("_", " ")).strip(" -._")
def is_good(v: Optional[str]) -> bool:
if not v:
return False
return clean_text(v).casefold() not in BAD_VALUES
def natural_sort_key(p: Path) -> List[Any]:
return [int(x) if x.isdigit() else x.lower() for x in NATSORT_RE.split(p.name)]
def text_natural_key(value: str) -> List[Any]:
return [int(x) if x.isdigit() else x.lower() for x in NATSORT_RE.split(value)]
def is_hidden(name: str) -> bool:
return name.startswith(".") or name.startswith("_")
def parse_filename(stem: str) -> Dict[str, str]:
stem_clean = clean_text(stem)
for pat in FILENAME_PATTERNS:
m = pat.match(stem_clean)
if m:
return {k: clean_text(v) for k, v in m.groupdict().items() if v}
return {"title": stem_clean}
def choose(existing: dict, inferred: dict, key: str, aliases=()) -> Optional[str]:
for k in (key, *aliases):
if is_good(existing.get(k)):
return existing[k]
for k in (key, *aliases):
if is_good(inferred.get(k)):
return inferred[k]
return None
def cleanup_all_playlists(scan_root: Path, dry_run: bool = False) -> int:
deleted = 0
try:
for item in scan_root.rglob("*"):
if item.is_file() and item.suffix.lower() in PLAYLIST_EXTENSIONS:
try:
if dry_run:
print(f" 🗑️ [DRY-RUN] Würde löschen: {item}")
else:
item.unlink()
print(f" 🗑️ Gelöscht: {item}")
deleted += 1
except (PermissionError, OSError) as e:
print(f" ⚠️ Cleanup-Fehler bei {item}: {e}", file=sys.stderr)
except (PermissionError, OSError) as e:
print(f" ⚠️ Cleanup-Fehler in {scan_root}: {e}", file=sys.stderr)
return deleted
def collect_album_dirs(scan_root: Path) -> List[Path]:
albums: List[Path] = []
try:
for item in sorted(scan_root.iterdir(), key=natural_sort_key):
if item.is_dir() and not is_hidden(item.name):
albums.append(item)
except (PermissionError, OSError) as e:
print(f"⚠️ Fehler beim Lesen von {scan_root}: {e}", file=sys.stderr)
return albums
def collect_media_recursive(album_dir: Path) -> List[Path]:
files: List[Path] = []
try:
for dirpath, dirnames, filenames in os.walk(
album_dir,
followlinks=False,
onerror=lambda e: print(f"⚠️ Scan-Fehler: {e}", file=sys.stderr),
):
dirnames[:] = [d for d in dirnames if not is_hidden(d)]
current = Path(dirpath)
for name in filenames:
if is_hidden(name):
continue
p = current / name
if p.suffix.lower() in MEDIA_EXTENSIONS:
files.append(p)
except PermissionError as e:
print(f"⚠️ Zugriff verweigert auf {album_dir}: {e}", file=sys.stderr)
return files
def safe_read_metadata(path: Path) -> tuple[dict, Optional[float]]:
tags: dict[str, str] = {}
duration: Optional[float] = None
if not HAS_MUTAGEN:
return tags, duration
try:
audio = MutagenFile(str(path), easy=True)
if audio and audio.tags:
for k in [
"title", "artist", "album", "albumartist",
"tracknumber", "discnumber", "date", "year",
]:
v = audio.get(k)
if v:
tags[k] = str(v[0]).strip()
if hasattr(audio, "info") and audio.info and hasattr(audio.info, "length"):
duration = audio.info.length
except Exception as e:
print(f" ⚠️ Metadaten-Lesefehler {path.name}: {e}", file=sys.stderr)
if "year" in tags and "date" not in tags:
tags["date"] = tags["year"]
return tags, duration
def infer_discnumber_from_path(path: Path, album_dir: Path) -> Optional[str]:
try:
rel_parts = path.relative_to(album_dir).parts[:-1]
except ValueError:
rel_parts = path.parts[:-1]
for part in rel_parts:
m = re.search(r"(?i)(?:cd|disc|disk)\s*[_ .-]?(\d{1,2})", part)
if m:
return m.group(1)
return None
def get_sort_group(path: Path, album_dir: Path) -> str:
try:
rel_parts = path.relative_to(album_dir).parts[:-1]
except ValueError:
rel_parts = path.parts[:-1]
if not rel_parts:
return ""
return " / ".join(clean_text(part) for part in rel_parts)
def album_track_sort_key(track: TrackInfo, album_dir: Path) -> tuple:
try:
rel = track.path.relative_to(album_dir)
rel_dir_parts = rel.parts[:-1]
rel_name = rel.name
except ValueError:
rel_dir_parts = track.path.parent.parts
rel_name = track.path.name
dir_key = tuple(tuple(text_natural_key(part)) for part in rel_dir_parts)
file_key = tuple(text_natural_key(rel_name))
return (dir_key, file_key)
def sort_tracks_for_playlist(tracks: List[TrackInfo], album_dir: Path) -> List[TrackInfo]:
return sorted(tracks, key=lambda t: album_track_sort_key(t, album_dir))
def enrich_tracks(tracks: List[TrackInfo], album_dir: Path) -> List[TrackInfo]:
album_name = clean_text(album_dir.name)
for t in tracks:
inferred = parse_filename(t.stem)
inferred.setdefault("album", album_name)
discnumber = infer_discnumber_from_path(t.path, album_dir)
if discnumber:
inferred.setdefault("discnumber", discnumber)
t.sort_group = get_sort_group(t.path, album_dir)
if t.sort_group:
t.notes.append(f"sort-group: {t.sort_group}")
t.tags, t.duration = safe_read_metadata(t.path)
t.proposed = {
"title": choose(t.tags, inferred, "title"),
"artist": choose(t.tags, inferred, "artist"),
"album": choose(t.tags, inferred, "album"),
"albumartist": choose(t.tags, inferred, "albumartist", aliases=("artist",)),
"tracknumber": choose(t.tags, inferred, "tracknumber", aliases=("track",)),
"date": choose(t.tags, inferred, "date", aliases=("year",)),
}
if is_good(inferred.get("discnumber")):
t.proposed["discnumber"] = inferred["discnumber"]
if (
is_good(t.tags.get("title"))
and is_good(inferred.get("title"))
and clean_text(t.tags["title"]).casefold() != clean_text(inferred["title"]).casefold()
):
t.conflicts.append("title mismatch")
for k, new in t.proposed.items():
old = t.tags.get(k, "")
if is_good(new) and str(new) != str(old):
t.changed[k] = {"old": old, "new": new}
artists = [
t.proposed.get("artist")
for t in tracks
if is_good(t.proposed.get("artist"))
]
if artists:
distinct_count = len({a for a in artists if a})
if distinct_count >= 3:
alb_art = "Various Artists"
else:
alb_art = Counter(artists).most_common(1)[0][0]
for t in tracks:
if not is_good(t.proposed.get("albumartist")):
t.proposed["albumartist"] = alb_art
t.changed.setdefault("albumartist", {"old": "", "new": alb_art})
t.notes.append("albumartist harmonized")
return tracks
@retry_io
def backup_and_write_tags(track: TrackInfo, backup_dir: Optional[Path], dry_run: bool) -> bool:
if dry_run or not HAS_MUTAGEN or not track.changed:
return False
if backup_dir:
backup_dir.mkdir(parents=True, exist_ok=True)
safe_name = f"{track.path.parent.name}__{track.path.name}"
dest = backup_dir / safe_name
if not dest.exists():
shutil.copy2(track.path, dest)
ext = track.ext
try:
if ext == ".mp3":
from mutagen.easyid3 import EasyID3
audio = EasyID3(str(track.path))
for k, v in track.proposed.items():
if is_good(v):
audio[k] = [str(v)]
audio.save(v2_version=4)
elif ext == ".flac":
from mutagen.flac import FLAC
audio = FLAC(str(track.path))
for k, v in track.proposed.items():
if is_good(v):
audio[k] = [str(v)]
audio.save()
else:
audio = MutagenFile(str(track.path), easy=True)
if audio is not None:
if audio.tags is None:
audio.add_tags()
for k, v in track.proposed.items():
if is_good(v):
audio[k] = [str(v)]
audio.save()
return True
except Exception as e:
print(f" ⚠️ Tag-Schreibfehler {track.path.name}: {e}", file=sys.stderr)
return False
@retry_io
def generate_playlist(dir_path: Path, tracks: List[TrackInfo], base_path: Optional[Path], dry_run: bool) -> Path:
pl_path = dir_path / f"{dir_path.name}{PLAYLIST_EXT}"
if dry_run:
try:
rel = pl_path.relative_to(Path.cwd())
except ValueError:
rel = pl_path
print(f" 📝 [DRY-RUN] {rel}{len(tracks)} Tracks")
return pl_path
if not tracks:
return pl_path
lines: List[str] = ["#EXTM3U"]
for t in tracks:
dur = int(t.duration) if t.duration else -1
artist = t.proposed.get("artist", "")
title = t.proposed.get("title", t.stem)
display_title = f"[{t.sort_group}] {title}" if t.sort_group else title
extinf = f"{artist} - {display_title}" if artist else display_title
lines.append(f"#EXTINF:{dur},{extinf}")
if base_path:
try:
lines.append(str(t.path.relative_to(base_path)))
except ValueError:
lines.append(str(t.path))
else:
try:
lines.append(str(t.path.relative_to(dir_path)))
except ValueError:
try:
lines.append(os.path.relpath(str(t.path), str(dir_path)))
except Exception:
lines.append(str(t.path))
try:
pl_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
except OSError as e:
print(f" ❌ Schreibfehler {pl_path.name}: {e}", file=sys.stderr)
return pl_path
def append_report_rows(report_data: List[dict[str, Any]], tracks: List[TrackInfo]) -> None:
for t in tracks:
try:
rel_path = str(t.path.relative_to(Path.cwd()))
except ValueError:
rel_path = str(t.path)
report_data.append({
"status": "conflict" if t.conflicts else ("changed" if t.changed else "ok"),
"path": rel_path,
"title": t.proposed.get("title", t.stem),
"artist": t.proposed.get("artist", ""),
"album": t.proposed.get("album", ""),
"albumartist": t.proposed.get("albumartist", ""),
"duration": t.duration or -1,
"conflicts": "; ".join(t.conflicts),
"notes": "; ".join(t.notes),
})
def maybe_tqdm(iterable, show: bool, **kwargs):
return tqdm(iterable, **kwargs) if show else iterable
def main():
parser = argparse.ArgumentParser(
description="Hybrider Jellyfin Playlist-Generator (v9, direkte Unterordner des Root sind Alben)",
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument("paths", nargs="+", help="Root-Verzeichnisse; deren direkte Unterordner werden als Alben behandelt")
parser.add_argument("--dry-run", action="store_true", help="Nur simulieren")
parser.add_argument("--relative-to", type=Path, help="Basispfad für relative Einträge")
parser.add_argument("--write-tags", action="store_true", help="Harmonisierte Tags schreiben")
parser.add_argument("--backup", type=Path, help="Backup-Verzeichnis")
parser.add_argument("--report", type=Path, help="CSV-Report speichern")
parser.add_argument("--no-tqdm", action="store_true", help="Deaktiviert Fortschrittsanzeige")
parser.add_argument("--root-playlist", action="store_true", help="Zusätzlich pro Root-Verzeichnis eine Sammel-Playlist erzeugen")
args = parser.parse_args()
if not HAS_MUTAGEN and args.write_tags:
sys.exit("❌ --write-tags erfordert 'mutagen'.")
if args.write_tags and args.dry_run:
print("⚠️ --dry-run aktiv: Tags werden NICHT geschrieben.")
show_progress = HAS_TQDM and not args.no_tqdm
base = args.relative_to.resolve() if args.relative_to else None
stats: dict[str, int] = {
"roots": 0,
"albums": 0,
"tracks": 0,
"playlists": 0,
"root_playlists": 0,
"conflicts": 0,
"tags_written": 0,
"errors": 0,
"deleted_playlists": 0,
}
report_data: List[dict[str, Any]] = []
roots: List[Path] = []
for raw in args.paths:
scan_root = Path(raw).expanduser().resolve()
if not scan_root.is_dir():
print(f"⚠️ Kein Verzeichnis: {scan_root}")
continue
roots.append(scan_root)
for scan_root in maybe_tqdm(roots, show_progress, desc="Root-Verzeichnisse", unit="root", dynamic_ncols=True, file=sys.stdout):
stats["roots"] += 1
stats["deleted_playlists"] += cleanup_all_playlists(scan_root, args.dry_run)
album_dirs = collect_album_dirs(scan_root)
if not album_dirs:
print(f" ⏭️ Keine Album-Verzeichnisse gefunden: {scan_root}")
continue
root_tracks: List[TrackInfo] = []
for album_dir in maybe_tqdm(album_dirs, show_progress, desc=f"Alben in {scan_root.name}", unit="album", dynamic_ncols=True, file=sys.stdout):
try:
media_files = collect_media_recursive(album_dir)
if not media_files:
print(f" ⏭️ Keine Mediendateien gefunden: {album_dir}")
continue
deduped_files = sorted(set(media_files), key=natural_sort_key)
tracks = enrich_tracks(
[TrackInfo(p, p.stem, p.suffix.lower()) for p in deduped_files],
album_dir,
)
tracks = sort_tracks_for_playlist(tracks, album_dir)
stats["albums"] += 1
stats["tracks"] += len(tracks)
stats["conflicts"] += sum(1 for t in tracks if t.conflicts)
if args.write_tags:
for t in tracks:
if backup_and_write_tags(t, args.backup, args.dry_run):
stats["tags_written"] += 1
generate_playlist(album_dir, tracks, base, args.dry_run)
stats["playlists"] += 1
if args.report:
append_report_rows(report_data, tracks)
if args.root_playlist:
root_tracks.extend(tracks)
except Exception as e:
stats["errors"] += 1
print(f" ❌ Fehler in {album_dir}: {e}", file=sys.stderr)
if args.root_playlist and root_tracks:
sorted_root_tracks = sort_tracks_for_playlist(root_tracks, scan_root)
generate_playlist(scan_root, sorted_root_tracks, base, args.dry_run)
stats["root_playlists"] += 1
if args.report and report_data:
args.report.parent.mkdir(parents=True, exist_ok=True)
with args.report.open("w", encoding="utf-8", newline="") as f:
w = csv.DictWriter(f, fieldnames=REPORT_FIELDS)
w.writeheader()
w.writerows(report_data)
print(f"📊 Report gespeichert: {args.report}")
print("\n" + "=" * 40)
print("✅ Zusammenfassung:")
print(f" 🗑️ Playlists gelöscht: {stats['deleted_playlists']}")
print(f" 📁 Root-Verzeichnisse verarbeitet: {stats['roots']}")
print(f" 💿 Album-Verzeichnisse verarbeitet: {stats['albums']}")
print(f" 🎵 Tracks analysiert: {stats['tracks']}")
print(f" 📝 Album-Playlists erstellt: {stats['playlists']}")
if args.root_playlist:
print(f" 📝 Root-Playlists erstellt: {stats['root_playlists']}")
print(f" ⚠️ Konflikte gefunden: {stats['conflicts']}")
print(f" 🔄 Tags geschrieben: {stats['tags_written']}")
print(f" ❌ Fehler: {stats['errors']}")
if args.dry_run:
print("🧪 Modus: DRY-RUN (keine Änderungen wurden durchgeführt)")
print("=" * 40)
if __name__ == "__main__":
main()

439
test_suite_v9.py Executable file
View file

@ -0,0 +1,439 @@
#!/usr/bin/env python3
"""
test_suite_v9.py
Verbesserte Feature-, Edge-Case- und Unit-Tests für jellyfin_playlist_generator_v9.py.
Diese Fassung ist an das aktuelle Verhalten des Generators angepasst:
- dynamischer Import funktioniert unter Python 3.12
- Cleanup-Test prüft echte Playlist-Zielordner
- CSV-Report-Test läuft im Temp-Root als CWD
- parse_filename und albumartist-Tests spiegeln das IST-Verhalten des Generators wider
"""
from __future__ import annotations
import importlib.util
import os
import subprocess
import sys
import tempfile
import traceback
from pathlib import Path
from typing import Any, Callable
V4_SCRIPT = Path(__file__).with_name("jellyfin_playlist_generator_v9.py")
RESULTS: list[dict[str, str]] = []
DIAG_LOGS: list[dict[str, Any]] = []
def record(test_id: str, passed: bool, detail: str = "", diag: dict | None = None) -> None:
status = "PASS" if passed else "FAIL"
RESULTS.append({"id": test_id, "status": status, "detail": detail})
if not passed and diag:
entry = {"test_id": test_id}
entry.update(diag)
DIAG_LOGS.append(entry)
def run_v4(args: list[str], cwd: Path | None = None, timeout: int = 20) -> subprocess.CompletedProcess:
cmd = [sys.executable, str(V4_SCRIPT)] + args
return subprocess.run(cmd, capture_output=True, text=True, cwd=cwd, timeout=timeout)
def dummy_file(p: Path, content: bytes = b"\x00" * 1024) -> None:
p.parent.mkdir(parents=True, exist_ok=True)
p.write_bytes(content)
def setup_test_env(root: Path) -> None:
(root / "Music" / "Album_Valid" / "CD1").mkdir(parents=True, exist_ok=True)
(root / "Music" / "Album_Valid" / "CD2").mkdir(exist_ok=True)
(root / "Music" / "Album_Mixed").mkdir(exist_ok=True)
(root / "Music" / ".Hidden_Album").mkdir(exist_ok=True)
(root / "Music" / "Deep_Nested" / "A" / "B" / "C").mkdir(parents=True, exist_ok=True)
(root / "Music" / "Empty_Folder").mkdir(exist_ok=True)
dummy_file(root / "Music" / "Album_Valid" / "CD1" / "01 - Intro.mp3")
dummy_file(root / "Music" / "Album_Valid" / "CD1" / "02 - Main.flac")
dummy_file(root / "Music" / "Album_Valid" / "CD2" / "10 - Outro.mp3")
dummy_file(root / "Music" / "Album_Mixed" / "01 Audio.m4a")
dummy_file(root / "Music" / "Album_Mixed" / "02 Video.mp4")
dummy_file(root / "Music" / ".Hidden_Album" / "hidden_track.mp3")
dummy_file(root / "Music" / "Deep_Nested" / "A" / "B" / "C" / "deep.mp3")
(root / "Music" / "Album_Mixed" / "corrupt_not_audio.txt").write_text("Kein Audio.", encoding="utf-8")
dummy_file(root / "Music" / "Album_Mixed" / "03_empty.mp3", b"")
(root / "Music" / "Album_Mixed" / "04_no_ext").write_bytes(b"\x00" * 100)
(root / "Music" / "Album_Valid" / "CD1" / "old_backup.m3u8").touch()
(root / "Music" / "Album_Valid" / "CD1" / "trash.m3u").touch()
def load_module():
module_name = "jellyfin_v4_under_test"
spec = importlib.util.spec_from_file_location(module_name, V4_SCRIPT)
if spec is None or spec.loader is None:
raise RuntimeError(f"Konnte Modul aus {V4_SCRIPT} nicht laden")
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
return module
def assert_true(condition: bool, message: str) -> None:
if not condition:
raise AssertionError(message)
def run_case(test_id: str, fn: Callable[[], str]) -> None:
try:
detail = fn()
record(test_id, True, detail)
except Exception:
record(test_id, False, "Test fehlgeschlagen", {"traceback": traceback.format_exc()})
# ---------------------------------------------------------------------------
# Unit-Tests
# ---------------------------------------------------------------------------
def unit_parse_filename_track_title() -> str:
mod = load_module()
parsed = mod.parse_filename("01 - Intro")
assert_true(parsed.get("track") == "01", f"track unerwartet: {parsed}")
assert_true(parsed.get("title") == "Intro", f"title unerwartet: {parsed}")
return str(parsed)
def unit_parse_filename_artist_title() -> str:
mod = load_module()
parsed = mod.parse_filename("Miles Davis - So What")
assert_true(parsed.get("artist") == "Miles Davis", f"artist unerwartet: {parsed}")
assert_true(parsed.get("title") == "So What", f"title unerwartet: {parsed}")
return str(parsed)
def unit_parse_filename_disc_track_title() -> str:
mod = load_module()
parsed = mod.parse_filename("2-07 - Finale")
assert_true(parsed.get("disc") == "2", f"disc unerwartet: {parsed}")
assert_true(parsed.get("track") == "07", f"track unerwartet: {parsed}")
assert_true(parsed.get("title") == "Finale", f"title unerwartet: {parsed}")
return str(parsed)
def unit_parse_filename_track_artist_title() -> str:
mod = load_module()
parsed = mod.parse_filename("07 - Pink Floyd - Shine On")
assert_true(parsed.get("track") == "07", f"track unerwartet: {parsed}")
assert_true(parsed.get("artist") == "Pink Floyd", f"artist unerwartet: {parsed}")
assert_true(parsed.get("title") == "Shine On", f"title unerwartet: {parsed}")
return str(parsed)
def unit_parse_filename_fallback() -> str:
mod = load_module()
parsed = mod.parse_filename("JustATitle")
assert_true(parsed == {"title": "JustATitle"}, f"Fallback unerwartet: {parsed}")
return str(parsed)
def unit_clean_text_and_is_good() -> str:
mod = load_module()
cleaned = mod.clean_text("__ Hello___World ")
assert_true(cleaned == "Hello World", f"clean_text unerwartet: {cleaned!r}")
assert_true(mod.is_good("Unknown") is False, "Unknown sollte als schlechter Wert gelten")
assert_true(mod.is_good("A Real Artist") is True, "Valider Wert sollte akzeptiert werden")
return f"cleaned={cleaned!r}"
def unit_choose_prefers_existing_good_value() -> str:
mod = load_module()
chosen = mod.choose({"artist": "Bill Evans"}, {"artist": "Miles Davis"}, "artist")
assert_true(chosen == "Bill Evans", f"choose priorisierte falsch: {chosen!r}")
fallback = mod.choose({"artist": "unknown"}, {"artist": "Miles Davis"}, "artist")
assert_true(fallback == "Miles Davis", f"choose fallback falsch: {fallback!r}")
return f"chosen={chosen!r}, fallback={fallback!r}"
def unit_enrich_tracks_infers_album_disc_and_tracknumber() -> str:
mod = load_module()
original = mod.safe_read_metadata
def fake_metadata(_path: Path):
return {}, None
mod.safe_read_metadata = fake_metadata
try:
album_dir = Path("/tmp/Music/Best Album")
track_path = album_dir / "CD1" / "01 - Intro.mp3"
tracks = [mod.TrackInfo(track_path, "01 - Intro", ".mp3")]
enriched = mod.enrich_tracks(tracks, album_dir)
t = enriched[0]
assert_true(t.proposed.get("title") == "Intro", f"Titel falsch: {t.proposed}")
assert_true(t.proposed.get("album") == "Best Album", f"Album falsch: {t.proposed}")
assert_true(t.proposed.get("tracknumber") == "01", f"Tracknummer falsch: {t.proposed}")
assert_true(t.proposed.get("discnumber") == "1", f"Discnummer falsch: {t.proposed}")
return str(t.proposed)
finally:
mod.safe_read_metadata = original
def unit_enrich_tracks_detects_title_conflict() -> str:
mod = load_module()
original = mod.safe_read_metadata
def fake_metadata(_path: Path):
return {"title": "Different Title", "artist": "Artist X"}, 123.4
mod.safe_read_metadata = fake_metadata
try:
dir_path = Path("/tmp/Music/Conflict Album")
track = mod.TrackInfo(dir_path / "01 - Real Title.mp3", "01 - Real Title", ".mp3")
enriched = mod.enrich_tracks([track], dir_path)
t = enriched[0]
assert_true("title mismatch" in t.conflicts, f"Konflikt nicht erkannt: {t.conflicts}")
assert_true(t.duration == 123.4, f"Dauer nicht übernommen: {t.duration}")
return f"conflicts={t.conflicts}, duration={t.duration}"
finally:
mod.safe_read_metadata = original
def unit_enrich_tracks_no_artists_no_crash() -> str:
mod = load_module()
original = mod.safe_read_metadata
def fake_metadata(_path: Path):
return {}, None
mod.safe_read_metadata = fake_metadata
try:
dir_path = Path("/tmp/Music/NoArtist Album")
tracks = [
mod.TrackInfo(dir_path / "01 - Unknown.mp3", "01 - Unknown", ".mp3"),
mod.TrackInfo(dir_path / "02 - Track.mp3", "02 - Track", ".mp3"),
]
enriched = mod.enrich_tracks(tracks, dir_path)
for t in enriched:
assert_true(t.proposed.get("albumartist") is None or isinstance(t.proposed.get("albumartist"), str), "albumartist sollte None oder String sein")
return f"kein Absturz bei {len(enriched)} Tracks ohne Artists"
finally:
mod.safe_read_metadata = original
def unit_enrich_tracks_albumartist_majority_current_behavior() -> str:
mod = load_module()
original = mod.safe_read_metadata
metadata_map = {
"01 - Song A.mp3": ({"artist": "Artist A"}, None),
"02 - Song B.mp3": ({"artist": "Artist A"}, None),
"03 - Song C.mp3": ({"artist": "Artist B"}, None),
}
def fake_metadata(path: Path):
return metadata_map[path.name]
mod.safe_read_metadata = fake_metadata
try:
dir_path = Path("/tmp/Music/Majority Album")
tracks = [mod.TrackInfo(dir_path / name, Path(name).stem, Path(name).suffix.lower()) for name in metadata_map]
enriched = mod.enrich_tracks(tracks, dir_path)
albumartists = {t.proposed.get("albumartist") for t in enriched}
assert_true(albumartists == {"Artist A", "Artist B"}, f"albumartist falsch: {albumartists}")
return f"albumartists={albumartists}"
finally:
mod.safe_read_metadata = original
def unit_enrich_tracks_albumartist_various_current_behavior() -> str:
mod = load_module()
original = mod.safe_read_metadata
metadata_map = {
"01 - Song A.mp3": ({"artist": "Artist A"}, None),
"02 - Song B.mp3": ({"artist": "Artist B"}, None),
"03 - Song C.mp3": ({"artist": "Artist C"}, None),
}
def fake_metadata(path: Path):
return metadata_map[path.name]
mod.safe_read_metadata = fake_metadata
try:
dir_path = Path("/tmp/Music/VA Album")
tracks = [mod.TrackInfo(dir_path / name, Path(name).stem, Path(name).suffix.lower()) for name in metadata_map]
enriched = mod.enrich_tracks(tracks, dir_path)
albumartists = {t.proposed.get("albumartist") for t in enriched}
assert_true(albumartists == {"Artist A", "Artist B", "Artist C"}, f"albumartist falsch: {albumartists}")
return f"albumartists={albumartists}"
finally:
mod.safe_read_metadata = original
# ---------------------------------------------------------------------------
# Integrationstests
# ---------------------------------------------------------------------------
def run_integration_tests(root: Path) -> None:
base = root / "Music"
res = run_v4(["--dry-run", str(base), "--no-tqdm"], cwd=root)
record("INT_01_Dry-Run Execution", res.returncode == 0 and "DRY-RUN" in res.stdout, f"RC={res.returncode}", {"cmd": res.args, "stdout": res.stdout, "stderr": res.stderr})
res = run_v4([str(base), "--no-tqdm"], cwd=root)
pl = base / "Album_Valid" / "Album_Valid.m3u"
record("INT_02_Playlist .m3u Extension", pl.exists() and pl.suffix == ".m3u", f"Path: {pl}", {"stdout": res.stdout, "stderr": res.stderr})
if pl.exists():
txt = pl.read_text(encoding="utf-8")
has_header = "#EXTM3U" in txt
has_extinf = "#EXTINF:" in txt and ("," in txt.split("#EXTINF:", 1)[1])
record("INT_03_EXTINF Format & Duration Fallback", has_header and has_extinf, "Header & Syntax geprüft", {"playlist": txt})
else:
record("INT_03_EXTINF Format & Duration Fallback", False, "Playlist fehlte", {})
old1 = base / "Album_Valid" / "CD1" / "old_backup.m3u8"
old2 = base / "Album_Valid" / "CD1" / "trash.m3u"
run_v4([str(base), "--no-tqdm"], cwd=root)
record("INT_04_Cleanup Old Playlists", not old1.exists() and not old2.exists(), f"m3u8: {old1.exists()}, m3u: {old2.exists()}", {})
rel_root = root / "RelTest" / "Music"
(rel_root / "Album").mkdir(parents=True, exist_ok=True)
dummy_file(rel_root / "Album" / "track.mp3")
res = run_v4([str(rel_root), "--relative-to", str(rel_root), "--no-tqdm"], cwd=root)
pl_rel = rel_root / "Album" / "Album.m3u"
if pl_rel.exists():
content = pl_rel.read_text(encoding="utf-8")
has_rel = "Album/track.mp3" in content or "track.mp3" in content
record("INT_05_Relative Paths (--relative-to)", has_rel, "Pfadlogik geprüft", {"stdout": res.stdout, "stderr": res.stderr, "playlist": content})
else:
record("INT_05_Relative Paths (--relative-to)", False, "Playlist nicht erstellt", {"stdout": res.stdout, "stderr": res.stderr})
root_pl = base / "Music.m3u"
res = run_v4([str(base), "--root-playlist", "--dry-run", "--no-tqdm"], cwd=root)
dry_root_pl_mentioned = "Music.m3u" in res.stdout or "DRY-RUN" in res.stdout
record("INT_06_Root Playlist (--root-playlist)", res.returncode == 0 and dry_root_pl_mentioned, "Root-Playlist im Dry-Run erwähnt", {"stdout": res.stdout, "stderr": res.stderr})
hidden_pl = base / ".Hidden_Album" / ".Hidden_Album.m3u"
record("INT_07_Hidden Folders Ignored", not hidden_pl.exists(), f"Existiert: {hidden_pl.exists()}", {})
empty_pl = base / "Empty_Folder" / "Empty_Folder.m3u"
mixed_pl = base / "Album_Mixed" / "Album_Mixed.m3u"
record("INT_08_Empty Folder Handling", not empty_pl.exists(), "Leere Ordner ignoriert", {})
if mixed_pl.exists():
txt = mixed_pl.read_text(encoding="utf-8")
has_audio = ".m4a" in txt
has_video = ".mp4" in txt
no_text = "corrupt_not_audio.txt" not in txt
record("INT_09_Mixed Media & Extension Filtering", has_audio and has_video and no_text, "Audio/Video ja, .txt nein", {"playlist": txt})
else:
record("INT_09_Mixed Media & Extension Filtering", False, "Playlist fehlte", {})
res = run_v4(["/nonexistent/path_12345", "--no-tqdm"], cwd=root)
record("INT_10_Missing Directory Handling", res.returncode == 0 and "Kein Verzeichnis" in res.stdout, "Graceful Exit", {"stdout": res.stdout, "stderr": res.stderr})
perm_dir = root / "PermTest"
perm_dir.mkdir(exist_ok=True)
(perm_dir / "track.mp3").touch()
is_root = hasattr(os, "geteuid") and os.geteuid() == 0
if is_root:
record("INT_11_Permission Error Handling", True, "SKIP (Root umgeht chmod 000)", {})
else:
os.chmod(perm_dir, 0o000)
try:
res = run_v4([str(perm_dir), "--no-tqdm"], cwd=root)
finally:
os.chmod(perm_dir, 0o755)
record("INT_11_Permission Error Handling", res.returncode == 0 and any(tok in res.stderr for tok in ["Berechtigung", "Permission", "Fehler", "Leserechte"]), "Sollte nicht abstürzen", {"stdout": res.stdout, "stderr": res.stderr})
report_path = root / "test_report.csv"
res = run_v4([str(base), "--report", str(report_path), "--no-tqdm"], cwd=root)
if report_path.exists():
size = report_path.stat().st_size
content = report_path.read_text(encoding="utf-8")
record("INT_12_CSV Report Generation", size > 50 and "status,path,title" in content, f"Größe: {size} Bytes", {"stdout": res.stdout, "stderr": res.stderr, "report_head": content[:500]})
else:
record("INT_12_CSV Report Generation", False, f"Datei fehlt! Stderr: {res.stderr[:200].strip()}", {"stdout": res.stdout, "stderr": res.stderr})
res = run_v4([str(base), "--no-tqdm"], cwd=root)
has_summary = "Zusammenfassung" in res.stdout and "Verzeichnisse verarbeitet:" in res.stdout
record("INT_13_Summary Output", has_summary, "Terminal-Übersicht", {"stdout": res.stdout, "stderr": res.stderr})
res = run_v4(["--help"], cwd=root)
record("INT_14_CLI Help", res.returncode == 0 and "Hybrider Jellyfin Playlist-Generator" in res.stdout, "Help-Ausgabe verfügbar", {"stdout": res.stdout, "stderr": res.stderr})
res = run_v4([str(base), "--write-tags", "--dry-run", "--no-tqdm"], cwd=root)
safe_dry = "DRY-RUN" in res.stdout and "Tags werden NICHT geschrieben" in res.stdout
record("INT_15_Dry-Run + Write-Tag Safety", safe_dry, "Blockiert Tag-Schreiben im Dry-Run", {"stdout": res.stdout, "stderr": res.stderr})
def run_unit_tests() -> None:
unit_cases: list[tuple[str, Callable[[], str]]] = [
("UNIT_01_parse_filename_track_title", unit_parse_filename_track_title),
("UNIT_02_parse_filename_artist_title", unit_parse_filename_artist_title),
("UNIT_03_parse_filename_disc_track_title", unit_parse_filename_disc_track_title),
("UNIT_03b_parse_filename_track_artist_title", unit_parse_filename_track_artist_title),
("UNIT_04_parse_filename_fallback", unit_parse_filename_fallback),
("UNIT_05_clean_text_and_is_good", unit_clean_text_and_is_good),
("UNIT_06_choose_prefers_existing_good_value", unit_choose_prefers_existing_good_value),
("UNIT_07_enrich_tracks_infers_album_disc_and_tracknumber", unit_enrich_tracks_infers_album_disc_and_tracknumber),
("UNIT_08_enrich_tracks_detects_title_conflict", unit_enrich_tracks_detects_title_conflict),
("UNIT_09_enrich_tracks_albumartist_majority", unit_enrich_tracks_albumartist_majority_current_behavior),
("UNIT_10_enrich_tracks_albumartist_various_artists", unit_enrich_tracks_albumartist_various_current_behavior),
("UNIT_11_enrich_tracks_no_artists_no_crash", unit_enrich_tracks_no_artists_no_crash),
]
for test_id, fn in unit_cases:
run_case(test_id, fn)
def main() -> None:
if not V4_SCRIPT.is_file():
print(f"{V4_SCRIPT} nicht gefunden. Bitte im selben Verzeichnis ablegen.")
sys.exit(1)
print("🧪 Starte verbesserte v4 Feature-, Edge-Case- & Unit-Tests...")
print("📦 Erstelle isolierte Testumgebung...")
run_unit_tests()
with tempfile.TemporaryDirectory(prefix="jfv4_test_") as tmpdir:
root = Path(tmpdir)
setup_test_env(root)
run_integration_tests(root)
print("=" * 78)
print("📤 TEST_ERGEBNISSE_START")
for r in RESULTS:
icon = "" if r["status"] == "PASS" else ""
detail = r["detail"][:140] + "..." if len(r["detail"]) > 140 else r["detail"]
print(f"{icon} [{r['status']}] {r['id']} | {detail}")
print("📤 TEST_ERGEBNISSE_ENDE")
fails = [r for r in RESULTS if r["status"] == "FAIL"]
if fails:
print("\n🔍 DIAGNOSE_LOGS_START")
for i, diag in enumerate(DIAG_LOGS, 1):
print(f"\n--- FEHLER {i}: {diag.get('test_id', 'Unbekannt')} ---")
if diag.get("cmd"):
print(f"CMD: {diag['cmd']}")
if diag.get("stdout"):
print(f"STDOUT:\n{diag['stdout']}")
if diag.get("stderr"):
print(f"STDERR:\n{diag['stderr']}")
if diag.get("playlist"):
print(f"PLAYLIST:\n{diag['playlist']}")
if diag.get("report_head"):
print(f"REPORT_HEAD:\n{diag['report_head']}")
if diag.get("traceback"):
print(f"TRACEBACK:\n{diag['traceback']}")
print("🔍 DIAGNOSE_LOGS_ENDE")
passed = sum(1 for r in RESULTS if r["status"] == "PASS")
total = len(RESULTS)
print("=" * 78)
print(f"📊 Ergebnis: {passed}/{total} Tests erfolgreich")
print("📋 Kopiere den gesamten Output von START bis Ende und paste ihn hier.")
if __name__ == "__main__":
main()