From bed29fb1c8e2e7b320cf6dcf03a4cf9ff913bbd9 Mon Sep 17 00:00:00 2001 From: dschlueter Date: Sat, 16 May 2026 08:56:50 +0200 Subject: [PATCH] Initial commit: chatterbox TTS CLI v4 --- .gitignore | 27 + BEDIENUNGSANLEITUNG.md | 200 ++++++++ README.md | 139 ++++++ Trump_in_China_kurz.txt | 18 + chatterbox_cli_v4.py | 1033 +++++++++++++++++++++++++++++++++++++++ requirements.txt | 17 + 6 files changed, 1434 insertions(+) create mode 100644 .gitignore create mode 100644 BEDIENUNGSANLEITUNG.md create mode 100644 README.md create mode 100644 Trump_in_China_kurz.txt create mode 100755 chatterbox_cli_v4.py create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0c215f6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,27 @@ +# Python +__pycache__/ +*.py[cod] +*.egg-info/ +.eggs/ + +# Ausgabe-Dateien +*.wav +*.mp3 +*.ogg + +# Persönliche Daten (Stimmaufnahmen) +my_voice*.wav +voice_*.wav + +# Umgebung +.env +*.log +.venv/ +env/ + +# IDE +.vscode/ +.idea/ + +# Claude Code +.claude/ diff --git a/BEDIENUNGSANLEITUNG.md b/BEDIENUNGSANLEITUNG.md new file mode 100644 index 0000000..2cff9c1 --- /dev/null +++ b/BEDIENUNGSANLEITUNG.md @@ -0,0 +1,200 @@ +# Bedienungsanleitung: Chatterbox TTS-Assistent + +Dieses Programm liest Texte laut vor — ähnlich wie ein Vorlesedienst. +Es wandelt geschriebenen Text in natürlich klingende Sprache um. + +--- + +## Was das Programm braucht + +- Einen Computer mit Linux +- Eine installierte Conda-Umgebung namens `chatterbox` +- Eine Grafikkarte (GPU) — macht das Programm deutlich schneller + +--- + +## Das Programm starten + +Öffne ein Terminal und gib folgende Befehle ein: + +```bash +conda activate chatterbox +cd ~/Python_Programs/chatterbox +``` + +--- + +## Einen Text vorlesen lassen + +### Text aus einer Datei vorlesen + +```bash +python chatterbox_cli_v4.py --lang de --input mein_text.txt +``` + +Ersetze `mein_text.txt` durch den Pfad zu deiner Textdatei. +Die Datei muss im Format **UTF-8** gespeichert sein (das ist der Standard +bei modernen Texteditoren). + +### Einen kurzen Text direkt eingeben + +```bash +python chatterbox_cli_v4.py --lang de --text "Guten Morgen! Wie geht es Ihnen heute?" +``` + +--- + +## Die eigene Stimme verwenden + +Wenn du eine Aufnahme deiner Stimme hast (eine WAV-Datei von ca. 10–30 Sekunden), +kann das Programm diese Stimme nachahmen: + +```bash +python chatterbox_cli_v4.py --lang de \ + --voice meine_stimme.wav \ + --input mein_text.txt +``` + +**Tipp:** Eine Aufnahme von 20 Sekunden reicht aus. Am besten in ruhiger Umgebung +und deutlich sprechen. + +--- + +## Sprache wählen + +Das Programm kann in vielen Sprachen vorlesen. Die Sprache wählt man mit `--lang`: + +| Befehl | Sprache | +|--------|---------| +| `--lang de` | Deutsch (Standard) | +| `--lang en` | Englisch | +| `--lang fr` | Französisch | +| `--lang es` | Spanisch | +| `--lang it` | Italienisch | + +Beispiel auf Englisch: + +```bash +python chatterbox_cli_v4.py --lang en --text "Good morning, how are you?" +``` + +--- + +## Sprechgeschwindigkeit anpassen + +Mit `--speed` kann man einstellen, wie schnell der Text gesprochen wird. + +- `1.0` = normale Geschwindigkeit (Standard) +- `0.85` = etwas langsamer — gut für ältere Hörer +- `0.75` = deutlich langsamer +- `1.2` = etwas schneller + +```bash +python chatterbox_cli_v4.py --lang de --speed 0.85 --input mein_text.txt +``` + +**Hinweis:** Die Stimmhöhe bleibt gleich — nur das Tempo ändert sich. + +--- + +## Audio als Datei speichern + +Wenn du die Audiodatei behalten möchtest: + +```bash +python chatterbox_cli_v4.py --lang de --save --input mein_text.txt +``` + +Die Datei wird automatisch als `mein_text.de.wav` gespeichert — im selben +Ordner wie die Eingabedatei. + +Oder mit eigenem Dateinamen: + +```bash +python chatterbox_cli_v4.py --lang de --output ausgabe.wav --input mein_text.txt +``` + +--- + +## Nur speichern, nicht abspielen + +```bash +python chatterbox_cli_v4.py --lang de --no-play --output ausgabe.wav --input mein_text.txt +``` + +--- + +## Aussprache von Eigennamen anpassen + +Manche Namen — vor allem aus anderen Sprachen — werden falsch ausgesprochen. +Du kannst das mit einer einfachen Textdatei im JSON-Format korrigieren. + +**Beispiel:** Datei `aussprache.json` anlegen: + +```json +{ + "Xi Jinping": "Schi Dschinping", + "Seoul": "Söul", + "Macron": "Makron" +} +``` + +Dann so aufrufen: + +```bash +python chatterbox_cli_v4.py --lang de \ + --pronunciation-dict aussprache.json \ + --input nachricht.txt +``` + +--- + +## Typischer Arbeitsablauf + +1. Text in einem Editor schreiben und als `.txt`-Datei speichern +2. Terminal öffnen, `conda activate chatterbox` +3. Programm aufrufen: + ```bash + python chatterbox_cli_v4.py --lang de --voice meine_stimme.wav --input text.txt + ``` +4. Das Programm beginnt sofort zu sprechen — Satz für Satz + +--- + +## Was das Programm automatisch macht + +- Abkürzungen buchstabieren: ARD wird zu „Ah Er De", YMCA zu „Ypsilon Em Tse Ah" +- Zusammengesetzte Wörter mit Abkürzung: „US-Präsident" wird zu „U Es Präsident" +- Uhrzeiten vorlesen: „14:58" wird zu „vierzehn Uhr achtundfünfzig" +- Jahreszahlen aussprechen: „2026" wird zu „zweitausendsechsundzwanzig" +- Trennzeilen wie „--- Ende ---" werden stillschweigend übersprungen + +--- + +## Wenn etwas nicht klappt + +**Kein Ton zu hören:** +```bash +# Ausgabegerät prüfen +python -c "import sounddevice; print(sounddevice.query_devices())" +``` +Dann `--audio-device pulse` oder das passende Gerät angeben. + +**„Modell nicht gefunden":** +Beim ersten Start wird das Modell heruntergeladen (~2 GB). +Sicherstellen, dass eine Internetverbindung besteht. + +**Programm ist sehr langsam:** +Ohne GPU dauert die Generierung länger als die Wiedergabe — ein Satz +kann 30–60 Sekunden brauchen. Mit GPU (CUDA) dauert es ca. 5–10 Sekunden. + +--- + +## Bekannte Grenzen + +- **Betonung einzelner Wörter** lässt sich nicht direkt steuern. + Eine Aufnahme der eigenen Stimme mit natürlicher Betonung kann helfen. +- **Manche Fremdwörter** (z. B. chinesische oder arabische Namen) klingen + nicht immer perfekt — mit der Aussprache-Datei lässt sich das korrigieren. +- Das Programm liest alles vor, was in der Datei steht — also auch + Überschriften und Metadaten wie „Schlagzeile:" oder „Stand:". diff --git a/README.md b/README.md new file mode 100644 index 0000000..fb68309 --- /dev/null +++ b/README.md @@ -0,0 +1,139 @@ +# chatterbox-tts-cli + +Ein kommandozeilenbasierter TTS-Assistent (Text-to-Speech) auf Basis von +[Chatterbox TTS](https://github.com/resemble-ai/chatterbox) (Resemble AI). +Optimiert für deutsche Sprache und den Einsatz als Audio-Vorlesehilfe, z. B. +für Senioren oder Accessibility-Anwendungen. + +## Features + +- **Satz-für-Satz-Streaming** — gibt den ersten Satz aus, während die nächsten + bereits generiert werden; minimale Latenz +- **Lückenlose Audiowiedergabe** — Callback-basierter OutputStream mit + vorgefertigten Blöcken; keine Unterbrechungen zwischen Sätzen +- **Geschwindigkeitsanpassung** — pitch-erhaltende Zeitstreckung via + pyrubberband (R3-Engine); konfigurierbar per `--speed` +- **Voice Cloning** — optionale WAV-Referenz für Akzent und Klang +- **Mehrsprachig** — Deutsch, Englisch und 20+ weitere Sprachen via + `ChatterboxMultilingualTTS` (erfordert Multilingual-Setup, s. u.) +- **Deutsche Textnormalisierung** + - Abkürzungen: ARD → "Ah Er De", YMCA → "Ypsilon Em Tse Ah" + - Komposita: US-Präsident → "U Es Präsident" + - Uhrzeiten: 14:58 → "vierzehn Uhr achtundfünfzig" + - Jahreszahlen: 2026 → "zweitausendsechsundzwanzig" + - Einheiten: 120 km/h → "120 Kilometer pro Stunde" +- **Konfigurierbares Aussprache-Wörterbuch** — Eigennamen und Fremdwörter + per JSON-Datei phonetisch überschreiben +- **Automatische Satz-Erkennung** — intelligentes Splitting inkl. + Ordinalzahlen, Paragraphen und Trennzeilen + +## Systemvoraussetzungen + +- Python 3.11+ +- CUDA-GPU empfohlen (RTX 3070 oder besser; CPU möglich aber langsam) +- Linux mit PipeWire oder PulseAudio (für `--audio-device pulse`) +- `rubberband-cli` (nur wenn `--speed` != 1.0 genutzt wird): + ```bash + sudo apt install rubberband-cli + ``` + +## Installation + +```bash +# 1. Conda-Umgebung erstellen (empfohlen) +conda create -n chatterbox python=3.11 +conda activate chatterbox + +# 2. PyTorch mit CUDA installieren (Beispiel für CUDA 12.4) +pip install torch torchaudio --index-url https://download.pytorch.org/whl/cu124 + +# 3. Chatterbox und weitere Abhängigkeiten +pip install chatterbox-tts sounddevice pyrubberband + +# 4. Skript herunterladen +# chatterbox_cli_v4.py in das Arbeitsverzeichnis legen +``` + +### Multilingual-Setup (für Deutsch und andere Nicht-Englisch-Sprachen) + +Das Standard-Paket `chatterbox-tts` enthält die Multilingual-Unterstützung +noch nicht vollständig. Notwendige Schritte: + +```bash +# Multilingual-Modell herunterladen (beim ersten Start automatisch) +# Modell-Auswahl: v3 (Standard, besser) oder v2 +# Wird in ~/.cache/huggingface/ gespeichert +``` + +Beim ersten Start mit `--lang de` werden die Modelle automatisch heruntergeladen +(ca. 2–3 GB). + +## Schnellstart + +```bash +# Deutschen Text vorlesen (aus Datei) +python chatterbox_cli_v4.py --lang de --input mein_text.txt + +# Mit eigener Stimme (Voice Cloning) +python chatterbox_cli_v4.py --lang de \ + --voice meine_stimme.wav \ + --input mein_text.txt + +# Etwas langsamer sprechen +python chatterbox_cli_v4.py --lang de --speed 0.85 --input mein_text.txt + +# Englisch +python chatterbox_cli_v4.py --lang en --text "Hello, how are you today?" +``` + +## Optionen + +| Option | Standard | Beschreibung | +|--------|----------|--------------| +| `--text TEXT` | — | Text direkt als Argument | +| `--input DATEI` | — | UTF-8-Textdatei als Eingabe | +| `--lang CODE` | `de` | Sprachcode (de, en, fr, es, …) | +| `--voice DATEI.wav` | — | Referenz-WAV für Voice Cloning | +| `--speed 0.85` | `1.0` | Geschwindigkeit (0.7–1.3); pitch bleibt gleich | +| `--audio-device` | `pulse` | Ausgabegerät (z. B. `pulse`, `default`) | +| `--t3-model` | `v3` | Multilingual-Modell: `v3` oder `v2` | +| `--acronym-mode` | `german` | Akronym-Modus: `german`, `space`, `period_space` | +| `--pronunciation-dict` | — | JSON-Datei mit Aussprache-Substitutionen | +| `--save` | nein | WAV-Datei speichern | +| `--output DATEI.wav` | — | Ausgabepfad (impliziert `--save`) | +| `--no-play` | — | Nur speichern, nicht abspielen | +| `--no-sentence-mode` | — | Text als Ganzes statt satzweise verarbeiten | +| `--debug-delay N` | `0` | Pause in Sekunden vor jedem Satz (zum Testen) | +| `--speed` | `1.0` | Sprechgeschwindigkeit | + +## Aussprache-Wörterbuch + +Für Eigennamen und Fremdwörter, die das Modell falsch ausspricht: + +```json +{ + "Xi Jinping": "Schi Dschinping", + "Putin": "Pjutin", + "Seoul": "Söul" +} +``` + +```bash +python chatterbox_cli_v4.py --lang de \ + --pronunciation-dict aussprache.json \ + --input nachricht.txt +``` + +## Bekannte Einschränkungen + +- **Wortbetonung** lässt sich nicht steuern — das Modell kennt kein SSML. + Abhilfe: Voice-Referenz mit gewünschter Betonung aufnehmen. +- **Chinesische/japanische Namen** werden phonetisch angenähert; das Modell + ist nicht für asiatische Phonetik optimiert. +- **Sehr lange Texte** werden satzweise verarbeitet; zwischen Absätzen können + kurze Pausen entstehen (Generierungszeit für den nächsten Satz). + +## Lizenz + +MIT — dieses Skript. Das Chatterbox-Modell unterliegt der MIT-Lizenz von +Resemble AI. Die Modellgewichte sind nicht-kommerziell (CC BY-NC 4.0). diff --git a/Trump_in_China_kurz.txt b/Trump_in_China_kurz.txt new file mode 100644 index 0000000..556cd4c --- /dev/null +++ b/Trump_in_China_kurz.txt @@ -0,0 +1,18 @@ + +Schlagzeile: Die neue Harmonie - mit Schlagseite + +Stand: 15. Mai 2026 - 14:58 + +​Unterschlagzeile: Ein Besuch in großer Eintracht, so hatte es US-Präsident Trump schon vor Abreise nach China versprochen, und so kam es auch. Doch der Eindruck bleibt, dass die Gegenleistung überschaubar ausfällt. + +Eine Analyse von Marie von Mallinckrodt, ARD Peking + +Die Blaskapelle der Volksbefreiungsarmee spielt eine Version von "YMCA", eines der Lieblingslieder von US-Präsident Donald Trump. Serviert wird Lobster in Tomatensuppe, Peking-Ente und vieles mehr. Gegessen wird mit goldenem Besteck. Die Stimmung beim pompösen Staatsbankett in der Halle des Volkes ist feierlich und freundlich. Zwei ehemalige Rivalen stoßen auf ihre Annäherung an. Es ist diese Aussage des chinesischen Staats- und Parteichefs Xi Jinping an diesem Abend, die den Wesenskern der Beziehung der beiden Staatsoberhäupter wohl am besten beschreibt: "Die große Wiederbelebung der chinesischen Nation und das Ziel 'Make America great again' können Hand in Hand gehen. Wir können uns gegenseitig zum Erfolg verhelfen." + +Zwischenzeile: Beide können zufrieden sein + +Sowohl Xi als auch Trump führen eine nationalistische und auf Eigeninteressen bedachte Außenpolitik. Mit dem Unterschied, dass Xi einen langfristigen, strategischen Plan hat, wie er die "Wiederbelebung der Nation" erreichen will und Trump ein eher impulsives Deal-Making betreibt. Xi möchte China gern bis 2049 zur dominierenden, globalen Supermacht machen. So betrachtet fällt die Bilanz des zweitägigen Treffens ganz offenbar für beide nicht schlecht aus, wenn man die beiden wichtigsten Krisenfelder aus ihrer jeweiligen Perspektive betrachtet: Handel und Geopolitik. + + +--- Ende --- + diff --git a/chatterbox_cli_v4.py b/chatterbox_cli_v4.py new file mode 100755 index 0000000..14c2a64 --- /dev/null +++ b/chatterbox_cli_v4.py @@ -0,0 +1,1033 @@ +#!/usr/bin/env python3 + +import argparse +import importlib.util +import queue +import re +import sys +import threading +import time +from pathlib import Path +from typing import List, Optional, Tuple + +import torch +import torchaudio as ta + +# SDPA does not support output_attentions=True (required by AlignmentStreamAnalyzer hook); +# fall back to eager attention so attention weights are returned as tensors, not None. +import chatterbox.models.t3.llama_configs as _llama_cfg +_llama_cfg.LLAMA_520M_CONFIG_DICT["attn_implementation"] = "eager" + +from chatterbox.tts import ChatterboxTTS + +try: + from chatterbox.mtl_tts import ChatterboxMultilingualTTS + HAS_MULTILINGUAL = True +except Exception: + ChatterboxMultilingualTTS = None + HAS_MULTILINGUAL = False + + +SUPPORTED_LANGS = { + "ar", "da", "de", "el", "en", "es", "fi", "fr", "he", "hi", "it", + "ja", "ko", "ms", "nl", "no", "pl", "pt", "ru", "sv", "sw", "tr", "zh" +} + +SENTENCE_END_RE = re.compile( + r'.+?(?:' + r'\.\.\.|…|' + r'[!?¡¿]+|' + r'[!?。]+|' + r'‽|' + r'(? str: + for phrase, replacement in sorted(pron_dict.items(), key=lambda x: len(x[0]), reverse=True): + text = text.replace(phrase, replacement) + return text + + +def clean_raw_text(text: str) -> str: + """Unsichtbare Steuerzeichen entfernen, die Splitting oder TTS stoeren.""" + for ch in ('​', '‌', '‍', ''): + text = text.replace(ch, '') + return text + + +def has_module(name: str) -> bool: + return importlib.util.find_spec(name) is not None + + +def get_device(explicit_device: Optional[str] = None) -> str: + if explicit_device: + if explicit_device.startswith("cuda") and not torch.cuda.is_available(): + raise RuntimeError("CUDA angefordert, aber keine CUDA-GPU verfügbar.") + if explicit_device.startswith("cuda"): + try: + idx = int(explicit_device.split(":")[1]) if ":" in explicit_device else 0 + except (IndexError, ValueError): + idx = 0 + torch.cuda.set_device(idx) + return explicit_device + + if torch.cuda.is_available(): + torch.cuda.set_device(0) + return "cuda:0" + + return "cpu" + + +def number_to_words_de(n: int) -> str: + ones = { + 0: "null", 1: "eins", 2: "zwei", 3: "drei", 4: "vier", 5: "fünf", + 6: "sechs", 7: "sieben", 8: "acht", 9: "neun", 10: "zehn", + 11: "elf", 12: "zwölf", 13: "dreizehn", 14: "vierzehn", + 15: "fünfzehn", 16: "sechzehn", 17: "siebzehn", 18: "achtzehn", + 19: "neunzehn" + } + tens = { + 20: "zwanzig", 30: "dreißig", 40: "vierzig", 50: "fünfzig", + 60: "sechzig", 70: "siebzig", 80: "achtzig", 90: "neunzig" + } + + if n < 20: + return ones[n] + + if n < 100: + t = (n // 10) * 10 + o = n % 10 + if o == 0: + return tens[t] + one_prefix = "ein" if o == 1 else ones[o] + return f"{one_prefix}und{tens[t]}" + + if n < 1000: + h = n // 100 + r = n % 100 + prefix = "einhundert" if h == 1 else f"{ones[h]}hundert" + return prefix if r == 0 else f"{prefix}{number_to_words_de(r)}" + + if n < 1000000: + th = n // 1000 + r = n % 1000 + prefix = "eintausend" if th == 1 else f"{number_to_words_de(th)}tausend" + return prefix if r == 0 else f"{prefix}{number_to_words_de(r)}" + + return str(n) + + +def number_to_words_en(n: int) -> str: + ones = { + 0: "zero", 1: "one", 2: "two", 3: "three", 4: "four", 5: "five", + 6: "six", 7: "seven", 8: "eight", 9: "nine", 10: "ten", + 11: "eleven", 12: "twelve", 13: "thirteen", 14: "fourteen", + 15: "fifteen", 16: "sixteen", 17: "seventeen", 18: "eighteen", + 19: "nineteen" + } + tens = { + 20: "twenty", 30: "thirty", 40: "forty", 50: "fifty", + 60: "sixty", 70: "seventy", 80: "eighty", 90: "ninety" + } + + if n < 20: + return ones[n] + + if n < 100: + t = (n // 10) * 10 + o = n % 10 + return tens[t] if o == 0 else f"{tens[t]}-{ones[o]}" + + if n < 1000: + h = n // 100 + r = n % 100 + prefix = f"{ones[h]} hundred" + return prefix if r == 0 else f"{prefix} {number_to_words_en(r)}" + + if n < 1000000: + th = n // 1000 + r = n % 1000 + prefix = f"{number_to_words_en(th)} thousand" + return prefix if r == 0 else f"{prefix} {number_to_words_en(r)}" + + return str(n) + + +def year_to_words_de(year: int) -> str: + if year < 1000 or year > 9999: + return str(year) + + if year == 2000: + return "zweitausend" + + if 2001 <= year <= 2099: + return f"zweitausend{number_to_words_de(year - 2000)}" + + return number_to_words_de(year) + + +def year_to_words_en(year: int) -> str: + if year < 1000 or year > 9999: + return str(year) + + if 2000 <= year <= 2009: + if year == 2000: + return "two thousand" + return f"two thousand {number_to_words_en(year - 2000)}" + + if 2010 <= year <= 2099: + last_two = year % 100 + return f"twenty {number_to_words_en(last_two)}" + + first_two = year // 100 + last_two = year % 100 + if last_two == 0: + return f"{number_to_words_en(first_two)} hundred" + return f"{number_to_words_en(first_two)} {number_to_words_en(last_two)}" + + +def spell_out_acronym(token: str, mode: str = "period_space") -> str: + chars = list(token) + + if mode == "german": + return " ".join(GERMAN_LETTER_NAMES.get(c, c) for c in chars) + + if mode == "space": + return " ".join(chars) + + if mode == "period": + return ".".join(chars) + "." + + if mode == "comma": + return ", ".join(chars) + + if mode == "period_space": + return ". ".join(chars) + "." + + raise ValueError(f"Unbekannter mode: {mode}") + + +def normalize_units(text: str, lang: str) -> str: + if lang != "de": + return text + + for unit, expanded in sorted(UNIT_REPLACEMENTS.items(), key=lambda x: len(x[0]), reverse=True): + text = re.sub(rf'(?<=\d)\s*{re.escape(unit)}\b', f" {expanded}", text) + + return text + + +def normalize_times(text: str, lang: str) -> str: + def repl(match: re.Match) -> str: + hh = int(match.group(1)) + mm = int(match.group(3)) + + if lang == "de": + if mm == 0: + return f"{number_to_words_de(hh)} Uhr" + return f"{number_to_words_de(hh)} Uhr {number_to_words_de(mm)}" + + if lang == "en": + if hh == 0 and mm == 0: + return "twelve midnight" + if hh == 12 and mm == 0: + return "twelve noon" + + hour12 = hh % 12 + if hour12 == 0: + hour12 = 12 + suffix = "a m" if hh < 12 else "p m" + + if mm == 0: + return f"{number_to_words_en(hour12)} {suffix}" + if mm < 10: + return f"{number_to_words_en(hour12)} oh {number_to_words_en(mm)} {suffix}" + return f"{number_to_words_en(hour12)} {number_to_words_en(mm)} {suffix}" + + return match.group(0) + + return TIME_RE.sub(repl, text) + + +def normalize_years(text: str, lang: str) -> str: + def repl(match: re.Match) -> str: + year = int(match.group(1)) + + if lang == "de": + return year_to_words_de(year) + + if lang == "en": + return year_to_words_en(year) + + return match.group(0) + + return YEAR_RE.sub(repl, text) + + +def preprocess_tts_text( + text: str, + lang: str, + spell_uppercase_acronyms: bool = True, + acronym_mode: Optional[str] = None, # None = auto: 'german' bei de, sonst 'period_space' + normalize_time_values: bool = True, + normalize_year_values: bool = True, + normalize_units_values: bool = True, + pronunciation_dict: Optional[dict] = None, +) -> str: + if acronym_mode is None: + acronym_mode = "german" if lang == "de" else "period_space" + + # 1. Aussprache-Wörterbuch zuerst (vor Akronym-Expansion, damit Eigennamen greifen) + if lang == "de": + text = apply_pronunciation_dict(text, DEFAULT_PRONUNCIATION_DE) + if pronunciation_dict: + text = apply_pronunciation_dict(text, pronunciation_dict) + + if normalize_units_values: + text = normalize_units(text, lang) + + if normalize_time_values: + text = normalize_times(text, lang) + + if normalize_year_values: + text = normalize_years(text, lang) + + if spell_uppercase_acronyms: + def repl_compound(match: re.Match) -> str: + acr = match.group(1) + if acr in NON_SPELLED_ACRONYMS: + return acr + " " + return spell_out_acronym(acr, mode=acronym_mode) + " " + + def repl(match: re.Match) -> str: + token = match.group(0) + if token in NON_SPELLED_ACRONYMS: + return token + return spell_out_acronym(token, mode=acronym_mode) + + # Compound zuerst: "US-Präsident" → "U Es Präsident" (Bindestrich weg) + text = ACRONYM_COMPOUND_RE.sub(repl_compound, text) + # Dann verbleibende Akronyme buchstabieren + text = UPPER_ACRONYM_RE.sub(repl, text) + + text = re.sub(r'\s+', ' ', text).strip() + return text + + +def split_long_text(text: str, max_len: int = 400) -> List[str]: + chunks = [] + current = "" + + for part in text.split("\n\n"): + part = part.strip() + if not part: + continue + + sentences = SENTENCE_END_RE.findall(part) + + consumed = "".join(sentences).strip() + rest = part[len(consumed):].strip() + if rest: + sentences.append(rest) + + for sentence in sentences: + sentence = sentence.strip() + if not sentence: + continue + + if len(sentence) > max_len: + if current: + chunks.append(current.strip()) + current = "" + + chunks.extend(force_split_sentence(sentence, max_len)) + continue + + if current and len(current) + 1 + len(sentence) > max_len: + chunks.append(current.strip()) + current = sentence + else: + current = f"{current} {sentence}".strip() if current else sentence + + if current: + chunks.append(current.strip()) + current = "" + + return chunks + + +def split_for_conversation(text: str, first_chunk_len: int = 120, max_len: int = 400) -> List[str]: + base_chunks = split_long_text(text, max_len=max_len) + if not base_chunks: + return [] + + first = base_chunks[0] + if len(first) <= first_chunk_len: + return base_chunks + + early = force_split_sentence(first, first_chunk_len) + return early + base_chunks[1:] + + +def force_split_sentence(text: str, max_len: int) -> List[str]: + text = re.sub(r"\s+", " ", text).strip() + if len(text) <= max_len: + return [text] + + parts = [] + remaining = text + + while len(remaining) > max_len: + split_pos = remaining.rfind(" ", 0, max_len + 1) + if split_pos <= 0: + split_pos = max_len + parts.append(remaining[:split_pos].strip()) + remaining = remaining[split_pos:].strip() + + if remaining: + parts.append(remaining) + + return parts + + +def split_into_sentences(text: str, max_len: int = 200) -> List[str]: + result = [] + for part in text.split("\n\n"): + part = part.strip() + if not part: + continue + if SEPARATOR_LINE_RE.match(part): + continue + sentences = SENTENCE_END_RE.findall(part) + consumed = "".join(sentences).strip() + rest = part[len(consumed):].strip() + if rest: + sentences.append(rest) + for sentence in sentences: + sentence = sentence.strip() + if not sentence: + continue + if len(sentence) > max_len: + result.extend(force_split_sentence(sentence, max_len)) + else: + result.append(sentence) + return result + + +def read_input_text(text_arg: Optional[str], input_path: Optional[str]) -> str: + if text_arg and input_path: + raise ValueError("Bitte entweder --text oder --input angeben, nicht beides.") + if not text_arg and not input_path: + raise ValueError("Bitte --text oder --input angeben.") + + if text_arg: + return text_arg.strip() + + path = Path(input_path) + if not path.exists(): + raise FileNotFoundError(f"Input-Datei nicht gefunden: {path}") + + return path.read_text(encoding="utf-8").strip() + + +def default_output_path(input_path: Optional[str], lang: str) -> Path: + if input_path: + src = Path(input_path) + return src.with_suffix(f".{lang}.wav") + return Path(f"tts_output.{lang}.wav") + + +def load_model(lang: str, device: str, t3_model: Optional[str] = None): + if lang == "en": + model = ChatterboxTTS.from_pretrained(device=device) + return model, "mono", model.sr + + if not HAS_MULTILINGUAL: + raise RuntimeError( + "Multilingual-Modell nicht verfügbar. Installiere ein Chatterbox-Paket mit chatterbox.mtl_tts." + ) + + model = ChatterboxMultilingualTTS.from_pretrained(device=device, t3_model=t3_model) + return model, "multi", model.sr + + +def generate_chunk(model, model_kind: str, text: str, lang: str, voice_path: Optional[str]): + kwargs = {} + if voice_path: + kwargs["audio_prompt_path"] = voice_path + + if model_kind == "mono": + return model.generate(text, **kwargs) + + return model.generate(text, language_id=lang, **kwargs) + + +def generate_stream_chunk( + model, + model_kind: str, + text: str, + lang: str, + voice_path: Optional[str], + stream_chunk_size: int, +): + kwargs = { + "chunk_size": stream_chunk_size, + "print_metrics": False, + } + if voice_path: + kwargs["audio_prompt_path"] = voice_path + + if model_kind == "mono": + return model.generate_stream(text, **kwargs) + + return model.generate_stream(text, language_id=lang, **kwargs) + + +class PlaybackWorker: + PLAYBACK_RATE = 48000 # PipeWire/PulseAudio standard + CALLBACK_BLOCK = 2048 # ~43 ms pro Callback-Block bei 48 kHz + + def __init__(self, sample_rate: int, device: Optional[str] = "pulse", speed: float = 1.0): + self.sample_rate = sample_rate + self.device = device + self.speed = speed + # Eingang: Torch-Tensoren vom TTS-Modell + self.audio_queue: "queue.Queue[Optional[torch.Tensor]]" = queue.Queue() + # Intern: fertig vorbereitete numpy-Blöcke für den Callback + self._block_queue: "queue.Queue" = queue.Queue(maxsize=500) + self._blocks_produced = 0 + self._blocks_consumed = 0 + self.thread = None + self.error = None + + def start(self): + if not has_module("sounddevice"): + raise RuntimeError( + "Für Live-Wiedergabe ist das Modul 'sounddevice' nötig. Installiere z. B. 'pip install sounddevice'." + ) + self.thread = threading.Thread(target=self._run, daemon=True) + self.thread.start() + + def _callback(self, outdata, frames, time_info, status): + # Läuft im Audio-Thread: so schnell wie möglich, kein Lock nötig. + try: + data = self._block_queue.get_nowait() + outdata[:, 0] = data + self._blocks_consumed += 1 + except queue.Empty: + outdata[:] = 0.0 # Stille statt Underrun-Klick + + def _produce(self): + """Wandelt Torch-Tensoren in CALLBACK_BLOCK-große numpy-Arrays um.""" + import numpy as np + + remainder = np.zeros(0, dtype="float32") + + while True: + item = self.audio_queue.get() + if item is None: + break + + chunk = item.detach().cpu() + if chunk.ndim == 2: + chunk = chunk.squeeze(0) + + if self.speed != 1.0: + import pyrubberband as pyrb + # R3-Engine (--fine): deutlich weniger Phasiness als R2, besser für Sprache. + # rate < 1.0 = langsamer, rate > 1.0 = schneller; Pitch bleibt gleich. + stretched = pyrb.time_stretch( + chunk.numpy().astype("float64"), self.sample_rate, self.speed, + rbargs={"--fine": ""}, + ) + chunk = torch.from_numpy(stretched.astype("float32")) + + chunk = ta.functional.resample(chunk, self.sample_rate, self.PLAYBACK_RATE) + + samples = np.concatenate([remainder, chunk.numpy().astype("float32")]) + + i = 0 + while i + self.CALLBACK_BLOCK <= len(samples): + self._block_queue.put(samples[i : i + self.CALLBACK_BLOCK]) + self._blocks_produced += 1 + i += self.CALLBACK_BLOCK + remainder = samples[i:] + + # Restliche Samples (< CALLBACK_BLOCK) mit Stille auffüllen + if len(remainder) > 0: + block = np.zeros(self.CALLBACK_BLOCK, dtype="float32") + block[: len(remainder)] = remainder + self._block_queue.put(block) + self._blocks_produced += 1 + + def _run(self): + try: + import sounddevice as sd + + producer = threading.Thread(target=self._produce, daemon=True) + producer.start() + + with sd.OutputStream( + samplerate=self.PLAYBACK_RATE, + channels=1, + dtype="float32", + device=self.device, + blocksize=self.CALLBACK_BLOCK, + callback=self._callback, + ): + producer.join() # alle Tensoren sind zu Blöcken konvertiert + + # Warten bis der Callback alle Blöcke abgespielt hat + while self._blocks_consumed < self._blocks_produced: + time.sleep(0.02) + + # Letzten Block aus Hardware-Buffer ausspielen lassen + time.sleep(self.CALLBACK_BLOCK / self.PLAYBACK_RATE + 0.1) + + except Exception as e: + self.error = e + + def put(self, chunk: torch.Tensor): + self.audio_queue.put(chunk) + + def stop(self): + self.audio_queue.put(None) + if self.thread: + self.thread.join() + if self.error: + raise RuntimeError(f"Fehler bei Live-Wiedergabe: {self.error}") + + +def synthesize_non_streaming( + text: str, + lang: str, + output_path: Optional[Path], + max_len: int, + first_chunk_len: int, + voice_path: Optional[str], + device: str, + show_progress: bool = True, + spell_uppercase_acronyms: bool = True, + acronym_mode: Optional[str] = None, + normalize_time_values: bool = True, + normalize_year_values: bool = True, + normalize_units_values: bool = True, + conversation_mode: bool = True, + play_audio: bool = False, + save_wav: bool = True, + audio_device: Optional[str] = "pulse", + sentence_mode: bool = True, + speed: float = 1.0, + debug_delay: float = 0.0, + t3_model: Optional[str] = None, + pronunciation_dict: Optional[dict] = None, +) -> Optional[Path]: + if lang not in SUPPORTED_LANGS: + raise ValueError( + f"Nicht unterstützte Sprache '{lang}'. Unterstützt: {', '.join(sorted(SUPPORTED_LANGS))}" + ) + + if voice_path and not Path(voice_path).exists(): + raise FileNotFoundError(f"Voice-Referenz nicht gefunden: {voice_path}") + + # Erst unsichtbare Zeichen entfernen, dann Sätze splitten (Paragraphen-Struktur erhalten), + # danach erst Akronym-Expansion — sonst erzeugen "A. R. D."-Punkte falsche Satzgrenzen. + text = clean_raw_text(text) + + model, model_kind, sr = load_model(lang, device, t3_model=t3_model) + + if sentence_mode: + raw_chunks = split_into_sentences(text, max_len=max_len) + elif conversation_mode: + raw_chunks = split_for_conversation(text, first_chunk_len=first_chunk_len, max_len=max_len) + else: + raw_chunks = split_long_text(text, max_len=max_len) + + preprocess_kw = dict( + lang=lang, + spell_uppercase_acronyms=spell_uppercase_acronyms, + acronym_mode=acronym_mode, + normalize_time_values=normalize_time_values, + normalize_year_values=normalize_year_values, + normalize_units_values=normalize_units_values, + pronunciation_dict=pronunciation_dict, + ) + chunks = [preprocess_tts_text(c, **preprocess_kw) for c in raw_chunks] + chunks = [c for c in chunks if c.strip()] + + if not chunks: + raise ValueError("Kein verwertbarer Text nach dem Einlesen gefunden.") + + if show_progress: + print(f"Sprache: {lang}") + print(f"Gerät: {device}") + print(f"Modell: {'ChatterboxTTS (monolingual)' if model_kind == 'mono' else 'ChatterboxMultilingualTTS'}") + print(f"Sätze: {len(chunks)}") + print(f"Modus: {'Satz-für-Satz' if sentence_mode else 'non-streaming'} + Playback") + print(f"Live-Wiedergabe: {'ja' if play_audio else 'nein'}") + print(f"WAV speichern: {'ja' if save_wav and output_path else 'nein'}") + if output_path and save_wav: + print(f"Ausgabe: {output_path}") + + if play_audio: + playback = PlaybackWorker(sample_rate=sr, device=audio_device, speed=speed) + playback.start() + else: + playback = None + + wavs = [] + try: + for i, chunk in enumerate(chunks, start=1): + if debug_delay > 0: + if show_progress: + print(f"[{i}/{len(chunks)}] Warte {debug_delay:.0f}s (debug_delay) ...") + time.sleep(debug_delay) + if show_progress: + print(f"[{i}/{len(chunks)}] Generiere ({len(chunk)} Zeichen) ...") + wav = generate_chunk(model, model_kind, chunk, lang, voice_path) + wavs.append(wav) + if playback is not None: + playback.put(wav) + finally: + if playback is not None: + playback.stop() + + if not wavs: + return None + + final_wav = wavs[0] if len(wavs) == 1 else torch.cat(wavs, dim=-1) + + if save_wav and output_path: + output_path.parent.mkdir(parents=True, exist_ok=True) + ta.save(str(output_path), final_wav, sr) + return output_path + + return None + + +def synthesize_streaming( + text: str, + lang: str, + output_path: Optional[Path], + max_len: int, + first_chunk_len: int, + voice_path: Optional[str], + device: str, + show_progress: bool = True, + spell_uppercase_acronyms: bool = True, + acronym_mode: str = "period_space", + normalize_time_values: bool = True, + normalize_year_values: bool = True, + normalize_units_values: bool = True, + conversation_mode: bool = True, + play_audio: bool = True, + save_wav: bool = True, + stream_chunk_size: int = 25, + audio_device: Optional[str] = None, +) -> Optional[Path]: + if lang not in SUPPORTED_LANGS: + raise ValueError( + f"Nicht unterstützte Sprache '{lang}'. Unterstützt: {', '.join(sorted(SUPPORTED_LANGS))}" + ) + + if voice_path and not Path(voice_path).exists(): + raise FileNotFoundError(f"Voice-Referenz nicht gefunden: {voice_path}") + + text = preprocess_tts_text( + text=text, + lang=lang, + spell_uppercase_acronyms=spell_uppercase_acronyms, + acronym_mode=acronym_mode, + normalize_time_values=normalize_time_values, + normalize_year_values=normalize_year_values, + normalize_units_values=normalize_units_values, + ) + + model, model_kind, sr = load_model(lang, device) + + if not hasattr(model, "generate_stream"): + raise RuntimeError( + "Dieses Chatterbox-Paket bietet kein generate_stream(). " + "Installiere z. B. 'chatterbox-streaming'." + ) + + if conversation_mode: + text_chunks = split_for_conversation(text, first_chunk_len=first_chunk_len, max_len=max_len) + else: + text_chunks = split_long_text(text, max_len=max_len) + + if not text_chunks: + raise ValueError("Kein verwertbarer Text nach dem Einlesen gefunden.") + + if play_audio: + playback = PlaybackWorker(sample_rate=sr, device=audio_device) + playback.start() + else: + playback = None + + all_audio_chunks: List[torch.Tensor] = [] + t0 = time.perf_counter() + first_audio_started = False + + if show_progress: + print(f"Sprache: {lang}") + print(f"Gerät: {device}") + print(f"Modell: {'ChatterboxTTS (monolingual)' if model_kind == 'mono' else 'ChatterboxMultilingualTTS'}") + print(f"Text-Chunks: {len(text_chunks)}") + print(f"Modus: streaming") + print(f"Gesprächsmodus: {'ja' if conversation_mode else 'nein'}") + print(f"Live-Wiedergabe: {'ja' if play_audio else 'nein'}") + print(f"WAV speichern: {'ja' if save_wav and output_path else 'nein'}") + print(f"Streaming chunk_size: {stream_chunk_size}") + if output_path: + print(f"Ausgabe: {output_path}") + + try: + for text_idx, text_chunk in enumerate(text_chunks, start=1): + if show_progress: + print(f"[Text {text_idx}/{len(text_chunks)}] Starte Streaming für {len(text_chunk)} Zeichen ...") + + stream_iter = generate_stream_chunk( + model=model, + model_kind=model_kind, + text=text_chunk, + lang=lang, + voice_path=voice_path, + stream_chunk_size=stream_chunk_size, + ) + + for audio_idx, item in enumerate(stream_iter, start=1): + if isinstance(item, tuple) and len(item) == 2: + audio_chunk, metrics = item + else: + audio_chunk, metrics = item, None + + all_audio_chunks.append(audio_chunk) + + if playback is not None: + playback.put(audio_chunk) + if not first_audio_started: + first_audio_started = True + if show_progress: + dt = time.perf_counter() - t0 + print(f"Audio-Wiedergabe gestartet nach {dt:.3f}s") + + if show_progress: + msg = f" -> Audio-Chunk {audio_idx}" + if metrics is not None: + latency = getattr(metrics, "latency_to_first_chunk", None) + rtf = getattr(metrics, "rtf", None) + chunk_count = getattr(metrics, "chunk_count", None) + if chunk_count is not None: + msg += f", model_chunk={chunk_count}" + if latency: + msg += f", first_latency={latency:.3f}s" + if rtf: + msg += f", rtf={rtf:.3f}" + print(msg) + + finally: + if playback is not None: + playback.stop() + + final_output = None + if save_wav and output_path: + final_audio = all_audio_chunks[0] if len(all_audio_chunks) == 1 else torch.cat(all_audio_chunks, dim=-1) + output_path.parent.mkdir(parents=True, exist_ok=True) + ta.save(str(output_path), final_audio, sr) + final_output = output_path + + return final_output + + +def build_argparser() -> argparse.ArgumentParser: + p = argparse.ArgumentParser( + description="Low-Latency Chatterbox TTS CLI mit deutscher Text-Normalisierung und optionalem Streaming." + ) + p.add_argument("--text", type=str, help="Direkter Eingabetext.") + p.add_argument("--input", type=str, help="Pfad zu UTF-8-Textdatei.") + p.add_argument("--lang", type=str, default="de", help="Sprachcode, default: de.") + p.add_argument("--len", dest="max_len", type=int, default=400, help="Maximale Chunk-Länge, default: 400.") + p.add_argument("--first-chunk-len", type=int, default=80, help="Kleinere Zielgröße für den ersten Chunk im Gesprächsmodus. Default: 80.") + p.add_argument("--output", type=str, help="Ausgabedatei .wav") + p.add_argument("--voice", type=str, help="Optionale Referenz-WAV für Voice-Cloning.") + p.add_argument("--device", type=str, default=None, help="z. B. cuda:0 oder cpu.") + p.add_argument("--no-progress", action="store_true", help="Weniger Konsolen-Output.") + p.add_argument("--no-spell-acronyms", action="store_true", help="Großgeschriebene Akronyme nicht buchstabieren.") + p.add_argument( + "--acronym-mode", + type=str, + default=None, # None = automatisch: 'german' bei de, 'period_space' sonst + choices=["space", "period", "comma", "period_space", "german"], + help="Ausgabeformat für buchstabierte Akronyme. Default: 'german' bei --lang de, sonst 'period_space'." + ) + p.add_argument("--pronunciation-dict", type=str, default=None, help="Pfad zu einer JSON-Datei mit Aussprache-Substitutionen (Eigenname → Lautschrift).") + p.add_argument("--no-normalize-times", action="store_true", help="Uhrzeiten nicht in sprechbaren Text umwandeln.") + p.add_argument("--no-normalize-years", action="store_true", help="Jahreszahlen nicht in sprechbaren Text umwandeln.") + p.add_argument("--no-normalize-units", action="store_true", help="Einheiten nicht in sprechbaren Text umwandeln.") + p.add_argument("--stream", action="store_true", help="Streaming-TTS-Modus (experimentell, kann abgehackt klingen).") + p.add_argument("--no-play", action="store_true", help="Nicht live abspielen.") + p.add_argument("--audio-device", type=str, default="pulse", help="Sounddevice-Ausgabegerät, z. B. 'pulse' oder 'M2: USB Audio'. Standard: pulse.") + p.add_argument("--save", action="store_true", help="WAV-Datei speichern (Standard: nein).") + p.add_argument("--stream-chunk-size", type=int, default=12, help="Streaming chunk_size (nur mit --stream). Default: 12.") + p.add_argument("--no-sentence-mode", action="store_true", help="Sätze zu größeren Chunks gruppieren statt einzeln ausgeben.") + p.add_argument("--speed", type=float, default=1.0, help="Wiedergabegeschwindigkeit: 0.8 = 20%% langsamer, 1.2 = 20%% schneller. Default: 1.0.") + p.add_argument("--debug-delay", type=float, default=0.0, help="Sekunden Pause vor jedem Satz (simuliert langsame KI). Nur zum Testen.") + p.add_argument("--t3-model", type=str, default="v3", help="Multilingual T3-Modell: 'v3' (default), 'v2' oder Dateiname.") + p.add_argument("--no-conversation-mode", action="store_true", help="Ersten Chunk nicht künstlich kleiner machen (nur ohne --no-sentence-mode).") + return p + + +def main() -> int: + parser = build_argparser() + args = parser.parse_args() + + try: + text = read_input_text(args.text, args.input) + device = get_device(args.device) + output_path = Path(args.output) if args.output else default_output_path(args.input, args.lang) + + save_wav = args.save or bool(args.output) + + # Acronym-Mode-Default: 'german' bei Deutsch, 'period_space' sonst + acronym_mode = args.acronym_mode or ("german" if args.lang == "de" else "period_space") + + # Optionales Aussprache-Wörterbuch laden + pronunciation_dict: Optional[dict] = None + if args.pronunciation_dict: + import json + pron_path = Path(args.pronunciation_dict) + if not pron_path.exists(): + raise FileNotFoundError(f"Aussprache-Dict nicht gefunden: {pron_path}") + pronunciation_dict = json.loads(pron_path.read_text(encoding="utf-8")) + + if args.stream: + out = synthesize_streaming( + text=text, + lang=args.lang, + output_path=output_path if save_wav else None, + max_len=args.max_len, + first_chunk_len=args.first_chunk_len, + voice_path=args.voice, + device=device, + show_progress=not args.no_progress, + spell_uppercase_acronyms=not args.no_spell_acronyms, + acronym_mode=acronym_mode, + normalize_time_values=not args.no_normalize_times, + normalize_year_values=not args.no_normalize_years, + normalize_units_values=not args.no_normalize_units, + conversation_mode=not args.no_conversation_mode, + play_audio=not args.no_play, + save_wav=save_wav, + stream_chunk_size=args.stream_chunk_size, + audio_device=args.audio_device, + ) + else: + out = synthesize_non_streaming( + text=text, + lang=args.lang, + output_path=output_path if save_wav else None, + max_len=args.max_len, + first_chunk_len=args.first_chunk_len, + voice_path=args.voice, + device=device, + show_progress=not args.no_progress, + spell_uppercase_acronyms=not args.no_spell_acronyms, + acronym_mode=acronym_mode, + normalize_time_values=not args.no_normalize_times, + normalize_year_values=not args.no_normalize_years, + normalize_units_values=not args.no_normalize_units, + conversation_mode=not args.no_conversation_mode, + play_audio=not args.no_play, + save_wav=save_wav, + audio_device=args.audio_device, + sentence_mode=not args.no_sentence_mode, + speed=args.speed, + debug_delay=args.debug_delay, + t3_model=args.t3_model, + pronunciation_dict=pronunciation_dict, + ) + + if out is not None: + print(f"Fertig: {out}") + else: + print("Fertig.") + return 0 + + except Exception as e: + print(f"Fehler: {e}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + raise SystemExit(main()) + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..6408a4d --- /dev/null +++ b/requirements.txt @@ -0,0 +1,17 @@ +# Chatterbox TTS CLI — Abhängigkeiten +# Getestet mit Python 3.11, CUDA 12.x, Ubuntu 22.04/24.04 + +# TTS-Kern +chatterbox-tts>=0.1.7 + +# PyTorch (passende CUDA-Version separat installieren, z. B. via pytorch.org) +torch>=2.6.0 +torchaudio>=2.6.0 + +# Audio-Ausgabe (Linux/PipeWire/PulseAudio) +sounddevice>=0.4.0 + +# Pitch-erhaltende Zeitstreckung (Geschwindigkeitsanpassung) +pyrubberband>=0.4.0 +# rubberband-cli muss zusätzlich als Systempakete installiert sein: +# sudo apt install rubberband-cli