chatterbox-tts-cli/chatterbox_cli_v4.py

1084 lines
37 KiB
Python
Raw Normal View History

2026-05-16 08:56:50 +02:00
#!/usr/bin/env python3
import argparse
import importlib.util
import queue
import re
import sys
import threading
import time
from pathlib import Path
from typing import List, Optional, Tuple
# ---------------------------------------------------------------------------
# Cooperative stop mechanism
# ---------------------------------------------------------------------------
# Module-wide flag; main() passes this Event to the synthesis functions as
# their ``stop_event``.
STOP_REQUESTED = threading.Event()


def request_stop() -> None:
    """Set the module-wide stop flag."""
    STOP_REQUESTED.set()


def clear_stop() -> None:
    """Reset the module-wide stop flag."""
    STOP_REQUESTED.clear()


def stop_requested() -> bool:
    """Return True while the module-wide stop flag is set."""
    return STOP_REQUESTED.is_set()
2026-05-16 08:56:50 +02:00
import torch
import torchaudio as ta
# SDPA does not support output_attentions=True (required by AlignmentStreamAnalyzer hook);
# fall back to eager attention so attention weights are returned as tensors, not None.
import chatterbox.models.t3.llama_configs as _llama_cfg
_llama_cfg.LLAMA_520M_CONFIG_DICT["attn_implementation"] = "eager"
from chatterbox.tts import ChatterboxTTS
try:
from chatterbox.mtl_tts import ChatterboxMultilingualTTS
HAS_MULTILINGUAL = True
except Exception:
ChatterboxMultilingualTTS = None
HAS_MULTILINGUAL = False
# Language codes accepted by the synthesize_* functions; any other code is
# rejected there with a ValueError.
SUPPORTED_LANGS = {
    "ar", "da", "de", "el", "en", "es", "fi", "fr", "he", "hi", "it",
    "ja", "ko", "ms", "nl", "no", "pl", "pt", "ru", "sv", "sw", "tr", "zh"
}
# Matches one sentence: the shortest run of characters (DOTALL, so newlines
# count too) that ends in a terminator and is followed by whitespace or the
# end of the text.
SENTENCE_END_RE = re.compile(
    r'.+?(?:'
    r'\.\.\.|…|'
    r'[!?¡¿]+|'
    r'[!?。]+|'  # NOTE(review): '!' and '?' repeat the previous alternative; possibly meant as full-width forms - confirm
    r'‽|'
    r'(?<!\d)\.'  # a period, but NOT directly after a digit (no split on ordinals)
    r')(?=\s+|$)',
    re.DOTALL
)
# Upper-case tokens that preprocess_tts_text leaves intact instead of
# spelling them out letter by letter.
NON_SPELLED_ACRONYMS = {
    "NATO",
    "NASA",
    "UNESCO",
    "OPEC",
}
# Spoken German letter names used by spell_out_acronym(mode="german").
# Characters without an entry (e.g. umlauts, digits) are emitted unchanged.
GERMAN_LETTER_NAMES = {
    'A': 'Ah', 'B': 'Be', 'C': 'Tse', 'D': 'De', 'E': 'E',
    'F': 'Ef', 'G': 'Ge', 'H': 'Ha', 'I': 'I', 'J': 'Jot',
    'K': 'Ka', 'L': 'El', 'M': 'Em', 'N': 'En', 'O': 'O',
    'P': 'Pe', 'Q': 'Ku', 'R': 'Er', 'S': 'Es', 'T': 'Te',
    'U': 'U', 'V': 'Fau', 'W': 'We', 'X': 'Iks', 'Y': 'Ypsilon',
    'Z': 'Tset',
}
# Filter separator lines such as "--- Ende ---", "===", "---".
# Matches: pure rule lines OR "---word---" patterns with short content (<= 20 characters).
SEPARATOR_LINE_RE = re.compile(r'^\s*-{2,}\s*[\w\s]{0,20}\s*-{2,}\s*$|^\s*[=_-]{3,}\s*$')
# Runs of two or more upper-case letters (digits allowed in the middle).
UPPER_ACRONYM_RE = re.compile(r'\b[A-ZÄÖÜ]{2,}(?:[A-ZÄÖÜ0-9]*[A-ZÄÖÜ])?\b')
# Acronym directly before hyphen + word: "US-Präsident", "NATO-Mitglied"
ACRONYM_COMPOUND_RE = re.compile(r'\b([A-ZÄÖÜ]{2,}(?:[A-ZÄÖÜ0-9]*[A-ZÄÖÜ])?)-(?=[A-ZÄÖÜa-zäöü])')
# Supported:
# - 14:58
# - 14.58
# - 14:58 Uhr
# - 14.58 Uhr
# NOTE(review): the "." separator also makes this match decimals of the same
# shape (e.g. "3.14") - confirm this is acceptable.
TIME_RE = re.compile(r'\b([01]?\d|2[0-3])([:.])([0-5]\d)(?:\s*Uhr)?\b', re.IGNORECASE)
# Four-digit years (1900-2199)
YEAR_RE = re.compile(r'\b(19\d{2}|20\d{2}|21\d{2})\b')
# Simple German unit expansions: written unit -> spoken form.
# normalize_units() applies them longest key first, directly after a digit.
UNIT_REPLACEMENTS = {
    "km/h": "Kilometer pro Stunde",
    "km": "Kilometer",
    "m": "Meter",
    "cm": "Zentimeter",
    "mm": "Millimeter",
    "kg": "Kilogramm",
    "g": "Gramm",
    "mg": "Milligramm",
    "Hz": "Hertz",
    "kHz": "Kilohertz",
    "MHz": "Megahertz",
    "GHz": "Gigahertz",
    # Euro sign written as an escape: the literal character had been lost,
    # leaving an empty key that matched after every single number.
    "\u20ac": "Euro",
    "$": "Dollar",
    "%": "Prozent",
    "Kb": "Kilobyte",
    "Mb": "Megabyte",
    "GB": "Gigabyte",
    "TB": "Terabyte",
    "PB": "Petabyte",
}
# Built-in phonetic approximations for common foreign names (German).
# preprocess_tts_text applies this table only when lang == "de".
DEFAULT_PRONUNCIATION_DE: dict[str, str] = {
    "Xi Jinping": "Schi Jinping",
    "Xi": "Schi",
    "Jinping": "Jinping",
    "Peking": "Peking",  # unchanged - German TTS already knows it
}
def apply_pronunciation_dict(text: str, pron_dict: dict[str, str]) -> str:
    """Replace every phrase of *pron_dict* found in *text* by its substitute.

    Phrases are applied longest first, so a multi-word entry such as
    "Xi Jinping" wins over its substring "Xi". Matching is a plain,
    case-sensitive substring replacement.

    Args:
        text: Input text.
        pron_dict: Mapping of phrase -> replacement.

    Returns:
        The text with all substitutions applied.
    """
    for phrase in sorted(pron_dict, key=len, reverse=True):
        if not phrase:
            # str.replace("", x) would insert x between every character.
            continue
        text = text.replace(phrase, pron_dict[phrase])
    return text
def clean_raw_text(text: str) -> str:
    """Remove invisible control characters that disturb splitting or TTS.

    The characters are written as escapes so they stay visible and survive
    copy/paste; the previous literals had degraded to empty strings, which
    turned this function into a no-op.

    NOTE(review): the exact original set could not be recovered. This is the
    usual zero-width group (ZWSP, ZWNJ, ZWJ, BOM) - confirm or extend.
    """
    for ch in ("\u200b", "\u200c", "\u200d", "\ufeff"):
        text = text.replace(ch, "")
    return text
def has_module(name: str) -> bool:
    """Return True when an importable module called *name* can be located."""
    spec = importlib.util.find_spec(name)
    return spec is not None
def get_device(explicit_device: Optional[str] = None) -> str:
    """Pick the torch device string to run on.

    Without an explicit request, "cuda:0" is chosen when CUDA is available
    and "cpu" otherwise. An explicit request is returned unchanged; for
    "cuda..." requests the matching GPU is also made current.

    Raises:
        RuntimeError: CUDA was requested but is not available.
    """
    if not explicit_device:
        if not torch.cuda.is_available():
            return "cpu"
        torch.cuda.set_device(0)
        return "cuda:0"

    if explicit_device.startswith("cuda"):
        if not torch.cuda.is_available():
            raise RuntimeError("CUDA angefordert, aber keine CUDA-GPU verfügbar.")
        gpu_index = 0
        if ":" in explicit_device:
            try:
                gpu_index = int(explicit_device.split(":")[1])
            except (IndexError, ValueError):
                # Unparsable index: fall back to the first GPU.
                gpu_index = 0
        torch.cuda.set_device(gpu_index)
    return explicit_device
def number_to_words_de(n: int) -> str:
    """Spell the integer *n* as a German number word.

    Values from 0 up to 999999 are spelled out; anything at or above one
    million is returned as its decimal string.

    Fix: a thousands count ending in "eins" (101, 201, ...) now loses the
    trailing "s" before "tausend", e.g. 101000 -> "einhunderteintausend"
    rather than "einhunderteinstausend".
    """
    ones = {
        0: "null", 1: "eins", 2: "zwei", 3: "drei", 4: "vier", 5: "fünf",
        6: "sechs", 7: "sieben", 8: "acht", 9: "neun", 10: "zehn",
        11: "elf", 12: "zwölf", 13: "dreizehn", 14: "vierzehn",
        15: "fünfzehn", 16: "sechzehn", 17: "siebzehn", 18: "achtzehn",
        19: "neunzehn"
    }
    tens = {
        20: "zwanzig", 30: "dreißig", 40: "vierzig", 50: "fünfzig",
        60: "sechzig", 70: "siebzig", 80: "achtzig", 90: "neunzig"
    }
    if n < 20:
        return ones[n]
    if n < 100:
        t = (n // 10) * 10
        o = n % 10
        if o == 0:
            return tens[t]
        # "eins" shortens to "ein" inside compounds: einundzwanzig.
        one_prefix = "ein" if o == 1 else ones[o]
        return f"{one_prefix}und{tens[t]}"
    if n < 1000:
        h = n // 100
        r = n % 100
        prefix = "einhundert" if h == 1 else f"{ones[h]}hundert"
        return prefix if r == 0 else f"{prefix}{number_to_words_de(r)}"
    if n < 1000000:
        th = n // 1000
        r = n % 1000
        th_words = number_to_words_de(th)
        if th_words.endswith("eins"):
            # Covers th == 1 as well as 101, 201, ...: "...ein" + "tausend".
            th_words = th_words[:-1]
        prefix = f"{th_words}tausend"
        return prefix if r == 0 else f"{prefix}{number_to_words_de(r)}"
    return str(n)
def number_to_words_en(n: int) -> str:
    """Spell the integer *n* as an English number word.

    Values below one million are spelled out (no "and", hyphenated tens);
    anything at or above one million is returned as its decimal string.
    """
    small = dict(enumerate((
        "zero", "one", "two", "three", "four", "five", "six", "seven",
        "eight", "nine", "ten", "eleven", "twelve", "thirteen", "fourteen",
        "fifteen", "sixteen", "seventeen", "eighteen", "nineteen",
    )))
    decades = dict(zip(range(20, 100, 10), (
        "twenty", "thirty", "forty", "fifty",
        "sixty", "seventy", "eighty", "ninety",
    )))

    if n >= 1000000:
        return str(n)
    if n >= 1000:
        thousands, below = divmod(n, 1000)
        head = f"{number_to_words_en(thousands)} thousand"
        return f"{head} {number_to_words_en(below)}" if below else head
    if n >= 100:
        hundreds, below = divmod(n, 100)
        head = f"{small[hundreds]} hundred"
        return f"{head} {number_to_words_en(below)}" if below else head
    if n >= 20:
        decade, unit = divmod(n, 10)
        word = decades[decade * 10]
        return f"{word}-{small[unit]}" if unit else word
    return small[n]
def year_to_words_de(year: int) -> str:
    """Spell a four-digit year the way it is read aloud in German.

    Years outside 1000-9999 are returned as their decimal string.

    Fix: years 1100-1999 are read in hundreds ("neunzehnhundert...") and no
    longer as a plain number ("eintausendneunhundert..."). All other years
    use the regular number word, which already yields "zweitausend..." for
    2000-2099.
    """
    if year < 1000 or year > 9999:
        return str(year)
    if 1100 <= year <= 1999:
        century, rest = divmod(year, 100)
        prefix = f"{number_to_words_de(century)}hundert"
        return prefix if rest == 0 else f"{prefix}{number_to_words_de(rest)}"
    return number_to_words_de(year)
def year_to_words_en(year: int) -> str:
    """Spell a four-digit year the way it is read aloud in English.

    Years outside 1000-9999 are returned as their decimal string.
    2000-2009 read "two thousand ...", 2010-2099 read "twenty ...", and all
    other years read as two pairs ("nineteen eighty-four").

    Fix: a pair-style year whose last two digits are 01-09 now gets the
    spoken "oh" ("nineteen oh five" instead of "nineteen five").
    """
    if year < 1000 or year > 9999:
        return str(year)
    if 2000 <= year <= 2009:
        if year == 2000:
            return "two thousand"
        return f"two thousand {number_to_words_en(year - 2000)}"
    if 2010 <= year <= 2099:
        last_two = year % 100
        return f"twenty {number_to_words_en(last_two)}"
    first_two = year // 100
    last_two = year % 100
    if last_two == 0:
        return f"{number_to_words_en(first_two)} hundred"
    if last_two < 10:
        return f"{number_to_words_en(first_two)} oh {number_to_words_en(last_two)}"
    return f"{number_to_words_en(first_two)} {number_to_words_en(last_two)}"
def spell_out_acronym(token: str, mode: str = "period_space") -> str:
    """Turn an acronym into a letter-by-letter spelling.

    Modes:
        german        German letter names joined by spaces ("Ah Er De")
        space         "A R D"
        period        "A.R.D."
        comma         "A, R, D"
        period_space  "A. R. D."

    Raises:
        ValueError: *mode* is none of the above.
    """
    letters = list(token)
    if mode == "german":
        # Characters without a German letter name are kept as they are.
        return " ".join(GERMAN_LETTER_NAMES.get(letter, letter) for letter in letters)

    # mode -> (separator between letters, suffix after the last letter)
    plain_styles = {
        "space": (" ", ""),
        "period": (".", "."),
        "comma": (", ", ""),
        "period_space": (". ", "."),
    }
    if mode not in plain_styles:
        raise ValueError(f"Unbekannter mode: {mode}")
    separator, suffix = plain_styles[mode]
    return separator.join(letters) + suffix
def normalize_units(text: str, lang: str, units: Optional[dict] = None) -> str:
    """Expand unit abbreviations that directly follow a number (German only).

    Args:
        text: Input text.
        lang: Language code; anything other than "de" returns *text* as is.
        units: Optional mapping abbreviation -> spoken form. Defaults to the
            module-level UNIT_REPLACEMENTS table.

    Returns:
        The text with e.g. "5 km" turned into "5 Kilometer".

    Fixes:
        * The end of a unit is checked with ``(?!\\w)`` instead of ``\\b``.
          ``\\b`` cannot match between a symbol such as "%" or "$" and a
          following space or the end of the text, so those units were never
          expanded.
        * Empty keys are skipped; an empty pattern would match after every
          number.
    """
    if lang != "de":
        return text
    table = UNIT_REPLACEMENTS if units is None else units
    # Longest unit first so "km/h" is handled before "km" and "m".
    for unit in sorted(table, key=len, reverse=True):
        if not unit:
            continue
        pattern = rf'(?<=\d)\s*{re.escape(unit)}(?!\w)'
        # Callable replacement: the spoken form is inserted literally.
        text = re.sub(pattern, lambda _m, spoken=table[unit]: f" {spoken}", text)
    return text
def normalize_times(text: str, lang: str) -> str:
    """Rewrite clock times matched by TIME_RE into speakable words.

    German yields "<hour> Uhr [<minute>]"; English yields a 12-hour reading
    with "a m" / "p m" and special cases for midnight and noon. For any
    other language the matched text is left untouched.
    """
    def _spoken_de(hour: int, minute: int) -> str:
        hour_words = f"{number_to_words_de(hour)} Uhr"
        if minute == 0:
            return hour_words
        return f"{hour_words} {number_to_words_de(minute)}"

    def _spoken_en(hour: int, minute: int) -> str:
        if minute == 0 and hour == 0:
            return "twelve midnight"
        if minute == 0 and hour == 12:
            return "twelve noon"
        hour_words = number_to_words_en(hour % 12 or 12)
        meridiem = "a m" if hour < 12 else "p m"
        if minute == 0:
            return f"{hour_words} {meridiem}"
        minute_words = number_to_words_en(minute)
        if minute < 10:
            # Single-digit minutes are read with a leading "oh".
            return f"{hour_words} oh {minute_words} {meridiem}"
        return f"{hour_words} {minute_words} {meridiem}"

    speakers = {"de": _spoken_de, "en": _spoken_en}

    def repl(match: re.Match) -> str:
        speak = speakers.get(lang)
        if speak is None:
            return match.group(0)
        return speak(int(match.group(1)), int(match.group(3)))

    return TIME_RE.sub(repl, text)
def normalize_years(text: str, lang: str) -> str:
    """Rewrite four-digit years matched by YEAR_RE into spoken words.

    Only "de" and "en" are converted; for every other language the matched
    digits stay as they are.
    """
    converters = {"de": year_to_words_de, "en": year_to_words_en}

    def repl(match: re.Match) -> str:
        value = int(match.group(1))
        convert = converters.get(lang)
        return match.group(0) if convert is None else convert(value)

    return YEAR_RE.sub(repl, text)
def preprocess_tts_text(
    text: str,
    lang: str,
    spell_uppercase_acronyms: bool = True,
    acronym_mode: Optional[str] = None,  # None = auto: 'german' for de, otherwise 'period_space'
    normalize_time_values: bool = True,
    normalize_year_values: bool = True,
    normalize_units_values: bool = True,
    pronunciation_dict: Optional[dict] = None,
) -> str:
    """Normalize one text chunk so the TTS model can pronounce it.

    Steps, in this order: pronunciation substitutions, unit expansion, time
    expansion, year expansion, acronym spelling, whitespace collapsing.

    Args:
        text: Raw chunk text.
        lang: Language code forwarded to the individual normalizers.
        spell_uppercase_acronyms: Spell out upper-case acronyms unless they
            are listed in NON_SPELLED_ACRONYMS.
        acronym_mode: Spelling style for spell_out_acronym; None selects
            'german' for lang == "de" and 'period_space' otherwise.
        normalize_time_values: Run normalize_times.
        normalize_year_values: Run normalize_years.
        normalize_units_values: Run normalize_units.
        pronunciation_dict: Extra phrase -> replacement mapping, applied
            after the built-in German table.

    Returns:
        The normalized text, whitespace-collapsed and stripped.
    """
    if acronym_mode is None:
        acronym_mode = "german" if lang == "de" else "period_space"
    # 1. Pronunciation dictionary first (before acronym expansion, so proper names take effect)
    if lang == "de":
        text = apply_pronunciation_dict(text, DEFAULT_PRONUNCIATION_DE)
    if pronunciation_dict:
        text = apply_pronunciation_dict(text, pronunciation_dict)
    if normalize_units_values:
        text = normalize_units(text, lang)
    if normalize_time_values:
        text = normalize_times(text, lang)
    if normalize_year_values:
        text = normalize_years(text, lang)
    if spell_uppercase_acronyms:
        def repl_compound(match: re.Match) -> str:
            # The hyphen itself is part of the match and is replaced by a space.
            acr = match.group(1)
            if acr in NON_SPELLED_ACRONYMS:
                return acr + " "
            return spell_out_acronym(acr, mode=acronym_mode) + " "
        def repl(match: re.Match) -> str:
            token = match.group(0)
            if token in NON_SPELLED_ACRONYMS:
                return token
            return spell_out_acronym(token, mode=acronym_mode)
        # Compounds first: "US-Präsident" → "U Es Präsident" (hyphen removed)
        text = ACRONYM_COMPOUND_RE.sub(repl_compound, text)
        # Then spell out the remaining acronyms
        text = UPPER_ACRONYM_RE.sub(repl, text)
    # Collapse every whitespace run into a single space.
    text = re.sub(r'\s+', ' ', text).strip()
    return text
def split_long_text(text: str, max_len: int = 400) -> List[str]:
    """Split *text* into chunks of at most *max_len* characters.

    Paragraphs (separated by a blank line) are never merged with each other.
    Within a paragraph, consecutive sentences are packed into one chunk while
    they fit; a single sentence longer than *max_len* is broken up by
    force_split_sentence.
    """
    chunks: List[str] = []
    for paragraph in text.split("\n\n"):
        paragraph = paragraph.strip()
        if not paragraph:
            continue

        pieces = SENTENCE_END_RE.findall(paragraph)
        # Whatever follows the last recognised sentence end is kept as a
        # final, unterminated sentence.
        matched = "".join(pieces).strip()
        tail = paragraph[len(matched):].strip()
        if tail:
            pieces.append(tail)

        pending = ""
        for piece in (raw.strip() for raw in pieces):
            if not piece:
                continue
            if len(piece) > max_len:
                # Oversized sentence: flush what was collected, then hard-split it.
                if pending:
                    chunks.append(pending.strip())
                    pending = ""
                chunks.extend(force_split_sentence(piece, max_len))
            elif not pending:
                pending = piece
            elif len(pending) + 1 + len(piece) > max_len:
                chunks.append(pending.strip())
                pending = piece
            else:
                pending = f"{pending} {piece}".strip()

        if pending:
            chunks.append(pending.strip())
    return chunks
def split_for_conversation(text: str, first_chunk_len: int = 120, max_len: int = 400) -> List[str]:
    """Chunk *text* like split_long_text, but keep the first chunk short.

    If the first regular chunk is longer than *first_chunk_len*, it is
    re-split at that smaller limit; all later chunks stay as they are.
    """
    regular = split_long_text(text, max_len=max_len)
    if regular and len(regular[0]) > first_chunk_len:
        opener, *following = regular
        return force_split_sentence(opener, first_chunk_len) + following
    return regular
def force_split_sentence(text: str, max_len: int) -> List[str]:
    """Break *text* at spaces into pieces of at most *max_len* characters.

    Whitespace is collapsed first. A single word longer than *max_len* is
    never cut; it becomes an over-long piece of its own.
    """
    rest = " ".join(text.split()) if False else re.sub(r"\s+", " ", text).strip()
    if len(rest) <= max_len:
        return [rest]

    pieces: List[str] = []
    while len(rest) > max_len:
        cut = rest.rfind(" ", 0, max_len + 1)
        if cut <= 0:
            # No space before max_len - search forward for the end of the word.
            cut = rest.find(" ", max_len)
            if cut == -1:
                cut = len(rest)
        head, rest = rest[:cut].strip(), rest[cut:].strip()
        pieces.append(head)
    if rest:
        pieces.append(rest)
    return pieces
def split_into_sentences(text: str, max_len: int = 200) -> List[str]:
    """Split *text* into individual sentences, one list entry per sentence.

    Paragraphs that are only a separator line (SEPARATOR_LINE_RE) are
    dropped. Sentences longer than *max_len* are broken up by
    force_split_sentence.
    """
    sentences_out: List[str] = []
    paragraphs = (block.strip() for block in text.split("\n\n"))
    for paragraph in paragraphs:
        if not paragraph or SEPARATOR_LINE_RE.match(paragraph):
            continue

        found = SENTENCE_END_RE.findall(paragraph)
        # Text after the last recognised sentence end counts as a sentence too.
        covered = "".join(found).strip()
        leftover = paragraph[len(covered):].strip()
        if leftover:
            found.append(leftover)

        for candidate in found:
            candidate = candidate.strip()
            if not candidate:
                continue
            if len(candidate) <= max_len:
                sentences_out.append(candidate)
            else:
                sentences_out.extend(force_split_sentence(candidate, max_len))
    return sentences_out
def read_input_text(text_arg: Optional[str], input_path: Optional[str]) -> str:
    """Return the text to speak, taken from exactly one of the two sources.

    Args:
        text_arg: Text given directly on the command line.
        input_path: Path of a UTF-8 text file.

    Returns:
        The stripped text.

    Raises:
        ValueError: Both sources or neither source was given.
        FileNotFoundError: *input_path* does not exist.
    """
    has_text = bool(text_arg)
    has_file = bool(input_path)
    if has_text and has_file:
        raise ValueError("Bitte entweder --text oder --input angeben, nicht beides.")
    if not (has_text or has_file):
        raise ValueError("Bitte --text oder --input angeben.")
    if has_text:
        return text_arg.strip()

    source = Path(input_path)
    if source.exists():
        return source.read_text(encoding="utf-8").strip()
    raise FileNotFoundError(f"Input-Datei nicht gefunden: {source}")
def default_output_path(input_path: Optional[str], lang: str) -> Path:
    """Derive the default WAV path.

    With an input file the result sits next to it, its suffix replaced by
    ".<lang>.wav"; without one it is "tts_output.<lang>.wav" in the current
    directory.
    """
    wav_suffix = f".{lang}.wav"
    if not input_path:
        return Path(f"tts_output{wav_suffix}")
    return Path(input_path).with_suffix(wav_suffix)
def load_model(lang: str, device: str, t3_model: Optional[str] = None):
    """Load the Chatterbox model matching *lang*.

    English uses the monolingual ChatterboxTTS; every other language uses
    ChatterboxMultilingualTTS, to which *t3_model* is forwarded.

    Returns:
        A tuple (model, kind, sample_rate) with kind "mono" or "multi".

    Raises:
        RuntimeError: A non-English language was requested but the
            multilingual package could not be imported.
    """
    if lang != "en":
        if not HAS_MULTILINGUAL:
            raise RuntimeError(
                "Multilingual-Modell nicht verfügbar. Installiere ein Chatterbox-Paket mit chatterbox.mtl_tts."
            )
        multilingual = ChatterboxMultilingualTTS.from_pretrained(device=device, t3_model=t3_model)
        return multilingual, "multi", multilingual.sr

    english = ChatterboxTTS.from_pretrained(device=device)
    return english, "mono", english.sr
def generate_chunk(model, model_kind: str, text: str, lang: str, voice_path: Optional[str]):
    """Synthesize one text chunk in a single, non-streaming call.

    The multilingual model additionally receives ``language_id``; a voice
    reference is passed as ``audio_prompt_path`` only when one is given.
    """
    options = {}
    if model_kind != "mono":
        options["language_id"] = lang
    if voice_path:
        options["audio_prompt_path"] = voice_path
    return model.generate(text, **options)
def generate_stream_chunk(
    model,
    model_kind: str,
    text: str,
    lang: str,
    voice_path: Optional[str],
    stream_chunk_size: int,
):
    """Start streaming synthesis of one text chunk.

    Returns whatever ``model.generate_stream`` returns. The multilingual
    model additionally receives ``language_id``; a voice reference is passed
    as ``audio_prompt_path`` only when one is given.
    """
    options = {}
    if model_kind != "mono":
        options["language_id"] = lang
    options["chunk_size"] = stream_chunk_size
    options["print_metrics"] = False
    if voice_path:
        options["audio_prompt_path"] = voice_path
    return model.generate_stream(text, **options)
class PlaybackWorker:
    """Plays synthesized audio in the background while synthesis continues.

    Tensors handed to put() are converted by a producer thread into
    fixed-size float32 blocks, which a sounddevice output-stream callback
    plays back. stop() signals the end of input and waits for playback.

    Fix: a set ``stop_event`` used to hang stop() forever. The callback stops
    consuming blocks once the event is set, so the "wait until everything was
    played" loop never finished and a producer blocked on the full block
    queue never returned. Both places now give up as soon as a stop (or a
    playback error) is detected.
    """

    PLAYBACK_RATE = 48000  # PipeWire/PulseAudio standard
    CALLBACK_BLOCK = 2048  # ~43 ms per callback block at 48 kHz
    _QUEUE_POLL_SECONDS = 0.1  # how often a blocked producer re-checks for a stop

    def __init__(self, sample_rate: int, device: Optional[str] = "pulse", speed: float = 1.0,
                 stop_event: Optional[threading.Event] = None):
        """Store the settings; no thread or audio stream is started yet.

        Args:
            sample_rate: Sample rate of the tensors passed to put().
            device: Output device handed to sounddevice.
            speed: Time-stretch factor; 1.0 disables stretching.
            stop_event: Optional event that aborts playback when set.
        """
        self.sample_rate = sample_rate
        self.device = device
        self.speed = speed
        self.stop_event = stop_event
        # Input: torch tensors from the TTS model (None marks the end)
        self.audio_queue: "queue.Queue[Optional[torch.Tensor]]" = queue.Queue()
        # Internal: ready-made numpy blocks for the callback
        self._block_queue: "queue.Queue" = queue.Queue(maxsize=500)
        self._blocks_produced = 0
        self._blocks_consumed = 0
        self.thread = None
        self.error = None

    def start(self):
        """Start the playback thread.

        Raises:
            RuntimeError: The 'sounddevice' module is not installed.
        """
        if not has_module("sounddevice"):
            raise RuntimeError(
                "Für Live-Wiedergabe ist das Modul 'sounddevice' nötig. Installiere z. B. 'pip install sounddevice'."
            )
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def _should_abort(self) -> bool:
        """Return True once a stop was requested or playback has failed."""
        stop_set = self.stop_event is not None and self.stop_event.is_set()
        return stop_set or self.error is not None

    def _playback_finished(self) -> bool:
        """Return True when nothing is left to wait for.

        That is the case when every produced block was consumed, or when
        playback is being aborted (the callback no longer consumes then).
        """
        return self._should_abort() or self._blocks_consumed >= self._blocks_produced

    def _enqueue_block(self, block) -> bool:
        """Queue one block for the callback, waiting while the queue is full.

        Returns:
            True if the block was queued, False if playback is being aborted.
        """
        while not self._should_abort():
            try:
                self._block_queue.put(block, timeout=self._QUEUE_POLL_SECONDS)
            except queue.Full:
                continue  # re-check the abort condition, then retry
            self._blocks_produced += 1
            return True
        return False

    def _callback(self, outdata, frames, time_info, status):
        """sounddevice callback: copy the next block, or silence, into outdata."""
        # Runs in the audio thread: as fast as possible, no lock needed.
        if self.stop_event and self.stop_event.is_set():
            outdata[:] = 0.0
            return
        try:
            data = self._block_queue.get_nowait()
            outdata[:, 0] = data
            self._blocks_consumed += 1
        except queue.Empty:
            outdata[:] = 0.0  # silence instead of an underrun click

    def _produce(self):
        """Convert torch tensors into CALLBACK_BLOCK-sized numpy arrays."""
        import numpy as np
        remainder = np.zeros(0, dtype="float32")
        while True:
            if self._should_abort():
                return
            item = self.audio_queue.get()
            if item is None:
                break
            chunk = item.detach().cpu()
            if chunk.ndim == 2:
                chunk = chunk.squeeze(0)
            if self.speed != 1.0:
                import pyrubberband as pyrb
                # R3 engine (--fine): far less phasiness than R2, better for speech.
                # rate < 1.0 = slower, rate > 1.0 = faster; pitch stays the same.
                stretched = pyrb.time_stretch(
                    chunk.numpy().astype("float64"), self.sample_rate, self.speed,
                    rbargs={"--fine": ""},
                )
                chunk = torch.from_numpy(stretched.astype("float32"))
            chunk = ta.functional.resample(chunk, self.sample_rate, self.PLAYBACK_RATE)
            samples = np.concatenate([remainder, chunk.numpy().astype("float32")])
            i = 0
            while i + self.CALLBACK_BLOCK <= len(samples):
                if not self._enqueue_block(samples[i : i + self.CALLBACK_BLOCK]):
                    return
                i += self.CALLBACK_BLOCK
            remainder = samples[i:]
        # Pad the remaining samples (< CALLBACK_BLOCK) with silence
        if len(remainder) > 0:
            block = np.zeros(self.CALLBACK_BLOCK, dtype="float32")
            block[: len(remainder)] = remainder
            self._enqueue_block(block)

    def _run(self):
        """Thread body: open the output stream and play until input ends."""
        try:
            import sounddevice as sd
            producer = threading.Thread(target=self._produce, daemon=True)
            producer.start()
            with sd.OutputStream(
                samplerate=self.PLAYBACK_RATE,
                channels=1,
                dtype="float32",
                device=self.device,
                blocksize=self.CALLBACK_BLOCK,
                callback=self._callback,
            ):
                producer.join()  # all tensors have been converted to blocks
                # Wait until the callback has played all blocks (or a stop
                # was requested, after which it would never catch up).
                while not self._playback_finished():
                    time.sleep(0.02)
                if not self._should_abort():
                    # Let the last block drain from the hardware buffer
                    time.sleep(self.CALLBACK_BLOCK / self.PLAYBACK_RATE + 0.1)
        except Exception as e:
            # Reported to the caller by stop().
            self.error = e

    def put(self, chunk: torch.Tensor):
        """Queue one audio tensor for playback."""
        self.audio_queue.put(chunk)

    def stop(self):
        """Signal the end of input and wait for playback to finish.

        Raises:
            RuntimeError: The playback thread failed.
        """
        self.audio_queue.put(None)
        if self.thread:
            self.thread.join()
        if self.error:
            raise RuntimeError(f"Fehler bei Live-Wiedergabe: {self.error}")
def synthesize_non_streaming(
    text: str,
    lang: str,
    output_path: Optional[Path],
    max_len: int,
    first_chunk_len: int,
    voice_path: Optional[str],
    device: str,
    show_progress: bool = True,
    spell_uppercase_acronyms: bool = True,
    acronym_mode: Optional[str] = None,
    normalize_time_values: bool = True,
    normalize_year_values: bool = True,
    normalize_units_values: bool = True,
    conversation_mode: bool = True,
    play_audio: bool = False,
    save_wav: bool = True,
    audio_device: Optional[str] = "pulse",
    sentence_mode: bool = True,
    speed: float = 1.0,
    debug_delay: float = 0.0,
    t3_model: Optional[str] = None,
    pronunciation_dict: Optional[dict] = None,
    stop_event: Optional[threading.Event] = None,
) -> Optional[Path]:
    """Synthesize *text* chunk by chunk, optionally playing and/or saving it.

    Each chunk is generated in one model call; with *play_audio* the result
    is handed to a PlaybackWorker while the next chunk is being generated.

    Args:
        text: Raw input text.
        lang: Language code; must be in SUPPORTED_LANGS.
        output_path: Target WAV file, or None.
        max_len: Maximum chunk length in characters.
        first_chunk_len: Limit for the first chunk in conversation mode.
        voice_path: Optional reference audio for voice cloning.
        device: Torch device string.
        show_progress: Print status lines.
        spell_uppercase_acronyms, acronym_mode, normalize_time_values,
        normalize_year_values, normalize_units_values, pronunciation_dict:
            Forwarded to preprocess_tts_text.
        conversation_mode: Use split_for_conversation (only consulted when
            *sentence_mode* is off).
        play_audio: Play the audio live.
        save_wav: Write the WAV file (requires *output_path*).
        audio_device: Output device for live playback.
        sentence_mode: One chunk per sentence (split_into_sentences).
        speed: Playback speed factor passed to the PlaybackWorker.
        debug_delay: Seconds to sleep before each chunk.
        t3_model: Forwarded to load_model.
        stop_event: When set, no further chunk is generated.

    Returns:
        *output_path* if a WAV file was written, otherwise None.

    Raises:
        ValueError: Unsupported language, or no usable text.
        FileNotFoundError: *voice_path* does not exist.
    """
    if lang not in SUPPORTED_LANGS:
        raise ValueError(
            f"Nicht unterstützte Sprache '{lang}'. Unterstützt: {', '.join(sorted(SUPPORTED_LANGS))}"
        )
    if voice_path and not Path(voice_path).exists():
        raise FileNotFoundError(f"Voice-Referenz nicht gefunden: {voice_path}")
    # Remove invisible characters first, then split into sentences (keeping the paragraph structure),
    # and only afterwards expand acronyms - otherwise the "A. R. D." periods create false sentence boundaries.
    text = clean_raw_text(text)
    model, model_kind, sr = load_model(lang, device, t3_model=t3_model)
    if sentence_mode:
        raw_chunks = split_into_sentences(text, max_len=max_len)
    elif conversation_mode:
        raw_chunks = split_for_conversation(text, first_chunk_len=first_chunk_len, max_len=max_len)
    else:
        raw_chunks = split_long_text(text, max_len=max_len)
    preprocess_kw = dict(
        lang=lang,
        spell_uppercase_acronyms=spell_uppercase_acronyms,
        acronym_mode=acronym_mode,
        normalize_time_values=normalize_time_values,
        normalize_year_values=normalize_year_values,
        normalize_units_values=normalize_units_values,
        pronunciation_dict=pronunciation_dict,
    )
    chunks = [preprocess_tts_text(c, **preprocess_kw) for c in raw_chunks]
    # Normalization can leave a chunk empty; drop those.
    chunks = [c for c in chunks if c.strip()]
    if not chunks:
        raise ValueError("Kein verwertbarer Text nach dem Einlesen gefunden.")
    if show_progress:
        print(f"Sprache: {lang}")
        print(f"Gerät: {device}")
        print(f"Modell: {'ChatterboxTTS (monolingual)' if model_kind == 'mono' else 'ChatterboxMultilingualTTS'}")
        print(f"Sätze: {len(chunks)}")
        print(f"Modus: {'Satz-für-Satz' if sentence_mode else 'non-streaming'} + Playback")
        print(f"Live-Wiedergabe: {'ja' if play_audio else 'nein'}")
        print(f"WAV speichern: {'ja' if save_wav and output_path else 'nein'}")
        if output_path and save_wav:
            print(f"Ausgabe: {output_path}")
    if play_audio:
        playback = PlaybackWorker(sample_rate=sr, device=audio_device, speed=speed,
                                  stop_event=stop_event)
        playback.start()
    else:
        playback = None
    wavs = []
    try:
        for i, chunk in enumerate(chunks, start=1):
            # Cooperative stop: checked once before every chunk.
            if stop_event and stop_event.is_set():
                if show_progress:
                    print("Abbruch angefordert Synthese gestoppt.")
                break
            if debug_delay > 0:
                if show_progress:
                    print(f"[{i}/{len(chunks)}] Warte {debug_delay:.0f}s (debug_delay) ...")
                time.sleep(debug_delay)
            if show_progress:
                print(f"[{i}/{len(chunks)}] Generiere ({len(chunk)} Zeichen) ...")
            wav = generate_chunk(model, model_kind, chunk, lang, voice_path)
            wavs.append(wav)
            if playback is not None:
                playback.put(wav)
    finally:
        # Always end the playback worker, even when generation failed.
        if playback is not None:
            playback.stop()
    if not wavs:
        return None
    final_wav = wavs[0] if len(wavs) == 1 else torch.cat(wavs, dim=-1)
    if save_wav and output_path:
        output_path.parent.mkdir(parents=True, exist_ok=True)
        ta.save(str(output_path), final_wav, sr)
        return output_path
    return None
def synthesize_streaming(
    text: str,
    lang: str,
    output_path: Optional[Path],
    max_len: int,
    first_chunk_len: int,
    voice_path: Optional[str],
    device: str,
    show_progress: bool = True,
    spell_uppercase_acronyms: bool = True,
    acronym_mode: str = "period_space",
    normalize_time_values: bool = True,
    normalize_year_values: bool = True,
    normalize_units_values: bool = True,
    conversation_mode: bool = True,
    play_audio: bool = True,
    save_wav: bool = True,
    stream_chunk_size: int = 25,
    audio_device: Optional[str] = None,
    stop_event: Optional[threading.Event] = None,
) -> Optional[Path]:
    """Synthesize *text* with the model's streaming API.

    Audio pieces are played and collected as soon as the model yields them.

    Args:
        text: Raw input text.
        lang: Language code; must be in SUPPORTED_LANGS.
        output_path: Target WAV file, or None.
        max_len: Maximum text chunk length in characters.
        first_chunk_len: Limit for the first chunk in conversation mode.
        voice_path: Optional reference audio for voice cloning.
        device: Torch device string.
        show_progress: Print status lines.
        spell_uppercase_acronyms, acronym_mode, normalize_time_values,
        normalize_year_values, normalize_units_values:
            Forwarded to preprocess_tts_text.
        conversation_mode: Use split_for_conversation instead of
            split_long_text.
        play_audio: Play the audio live.
        save_wav: Write the WAV file (requires *output_path*).
        stream_chunk_size: ``chunk_size`` passed to ``generate_stream``.
        audio_device: Output device for live playback.
        stop_event: When set, streaming ends at the next check.

    Returns:
        *output_path* if a WAV file was written, otherwise None.

    Raises:
        ValueError: Unsupported language, or no usable text.
        FileNotFoundError: *voice_path* does not exist.
        RuntimeError: The loaded model has no ``generate_stream``.

    Fix: when no audio was produced at all (e.g. a stop was requested before
    the first piece arrived), saving is skipped instead of crashing in
    ``torch.cat`` on an empty list.
    """
    if lang not in SUPPORTED_LANGS:
        raise ValueError(
            f"Nicht unterstützte Sprache '{lang}'. Unterstützt: {', '.join(sorted(SUPPORTED_LANGS))}"
        )
    if voice_path and not Path(voice_path).exists():
        raise FileNotFoundError(f"Voice-Referenz nicht gefunden: {voice_path}")
    # Clean and split first, then normalize per chunk -
    # otherwise acronym periods ("ARD" → "Ah Er De.") create false sentence boundaries.
    text = clean_raw_text(text)
    model, model_kind, sr = load_model(lang, device)
    if not hasattr(model, "generate_stream"):
        raise RuntimeError(
            "Dieses Chatterbox-Paket bietet kein generate_stream(). "
            "Installiere z. B. 'chatterbox-streaming'."
        )
    if conversation_mode:
        raw_chunks = split_for_conversation(text, first_chunk_len=first_chunk_len, max_len=max_len)
    else:
        raw_chunks = split_long_text(text, max_len=max_len)
    preprocess_kw = dict(
        lang=lang,
        spell_uppercase_acronyms=spell_uppercase_acronyms,
        acronym_mode=acronym_mode,
        normalize_time_values=normalize_time_values,
        normalize_year_values=normalize_year_values,
        normalize_units_values=normalize_units_values,
    )
    text_chunks = [preprocess_tts_text(c, **preprocess_kw) for c in raw_chunks]
    # Normalization can leave a chunk empty; drop those.
    text_chunks = [c for c in text_chunks if c.strip()]
    if not text_chunks:
        raise ValueError("Kein verwertbarer Text nach dem Einlesen gefunden.")
    if play_audio:
        playback = PlaybackWorker(sample_rate=sr, device=audio_device, stop_event=stop_event)
        playback.start()
    else:
        playback = None
    all_audio_chunks: List[torch.Tensor] = []
    t0 = time.perf_counter()
    first_audio_started = False
    if show_progress:
        print(f"Sprache: {lang}")
        print(f"Gerät: {device}")
        print(f"Modell: {'ChatterboxTTS (monolingual)' if model_kind == 'mono' else 'ChatterboxMultilingualTTS'}")
        print(f"Text-Chunks: {len(text_chunks)}")
        print(f"Modus: streaming")
        print(f"Gesprächsmodus: {'ja' if conversation_mode else 'nein'}")
        print(f"Live-Wiedergabe: {'ja' if play_audio else 'nein'}")
        print(f"WAV speichern: {'ja' if save_wav and output_path else 'nein'}")
        print(f"Streaming chunk_size: {stream_chunk_size}")
        if output_path:
            print(f"Ausgabe: {output_path}")
    try:
        for text_idx, text_chunk in enumerate(text_chunks, start=1):
            # Cooperative stop: checked before every text chunk ...
            if stop_event and stop_event.is_set():
                if show_progress:
                    print("Abbruch angefordert Streaming gestoppt.")
                break
            if show_progress:
                print(f"[Text {text_idx}/{len(text_chunks)}] Starte Streaming für {len(text_chunk)} Zeichen ...")
            stream_iter = generate_stream_chunk(
                model=model,
                model_kind=model_kind,
                text=text_chunk,
                lang=lang,
                voice_path=voice_path,
                stream_chunk_size=stream_chunk_size,
            )
            for audio_idx, item in enumerate(stream_iter, start=1):
                # ... and before every audio piece.
                if stop_event and stop_event.is_set():
                    break
                # The stream yields either (audio, metrics) pairs or bare audio.
                if isinstance(item, tuple) and len(item) == 2:
                    audio_chunk, metrics = item
                else:
                    audio_chunk, metrics = item, None
                all_audio_chunks.append(audio_chunk)
                if playback is not None:
                    playback.put(audio_chunk)
                if not first_audio_started:
                    first_audio_started = True
                    if show_progress:
                        dt = time.perf_counter() - t0
                        print(f"Audio-Wiedergabe gestartet nach {dt:.3f}s")
                if show_progress:
                    msg = f" -> Audio-Chunk {audio_idx}"
                    if metrics is not None:
                        latency = getattr(metrics, "latency_to_first_chunk", None)
                        rtf = getattr(metrics, "rtf", None)
                        chunk_count = getattr(metrics, "chunk_count", None)
                        if chunk_count is not None:
                            msg += f", model_chunk={chunk_count}"
                        if latency:
                            msg += f", first_latency={latency:.3f}s"
                        if rtf:
                            msg += f", rtf={rtf:.3f}"
                    print(msg)
    finally:
        # Always end the playback worker, even when streaming failed.
        if playback is not None:
            playback.stop()
    final_output = None
    # Nothing to write when no audio piece was produced.
    if save_wav and output_path and all_audio_chunks:
        final_audio = all_audio_chunks[0] if len(all_audio_chunks) == 1 else torch.cat(all_audio_chunks, dim=-1)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        ta.save(str(output_path), final_audio, sr)
        final_output = output_path
    return final_output
def build_argparser() -> argparse.ArgumentParser:
    """Create the command-line parser with all CLI options.

    The options are declared as a table and registered in that order, which
    is also the order shown in ``--help``.
    """
    parser = argparse.ArgumentParser(
        description="Low-Latency Chatterbox TTS CLI mit deutscher Text-Normalisierung und optionalem Streaming."
    )
    # (option strings, add_argument keyword arguments)
    option_table = [
        (("--text",), dict(type=str, help="Direkter Eingabetext.")),
        (("--input",), dict(type=str, help="Pfad zu UTF-8-Textdatei.")),
        (("--lang",), dict(type=str, default="de", help="Sprachcode, default: de.")),
        (("--len",), dict(dest="max_len", type=int, default=400, help="Maximale Chunk-Länge, default: 400.")),
        (("--first-chunk-len",), dict(type=int, default=80, help="Kleinere Zielgröße für den ersten Chunk im Gesprächsmodus. Default: 80.")),
        (("--output",), dict(type=str, help="Ausgabedatei .wav")),
        (("--voice",), dict(type=str, help="Optionale Referenz-WAV für Voice-Cloning.")),
        (("--device",), dict(type=str, default=None, help="z. B. cuda:0 oder cpu.")),
        (("--no-progress",), dict(action="store_true", help="Weniger Konsolen-Output.")),
        (("--no-spell-acronyms",), dict(action="store_true", help="Großgeschriebene Akronyme nicht buchstabieren.")),
        (("--acronym-mode",), dict(
            type=str,
            default=None,  # None = automatic: 'german' for de, 'period_space' otherwise
            choices=["space", "period", "comma", "period_space", "german"],
            help="Ausgabeformat für buchstabierte Akronyme. Default: 'german' bei --lang de, sonst 'period_space'.",
        )),
        (("--pronunciation-dict",), dict(type=str, default=None, help="Pfad zu einer JSON-Datei mit Aussprache-Substitutionen (Eigenname → Lautschrift).")),
        (("--no-normalize-times",), dict(action="store_true", help="Uhrzeiten nicht in sprechbaren Text umwandeln.")),
        (("--no-normalize-years",), dict(action="store_true", help="Jahreszahlen nicht in sprechbaren Text umwandeln.")),
        (("--no-normalize-units",), dict(action="store_true", help="Einheiten nicht in sprechbaren Text umwandeln.")),
        (("--stream",), dict(action="store_true", help="Streaming-TTS-Modus (experimentell, kann abgehackt klingen).")),
        (("--no-play",), dict(action="store_true", help="Nicht live abspielen.")),
        (("--audio-device",), dict(type=str, default="pulse", help="Sounddevice-Ausgabegerät, z. B. 'pulse' oder 'M2: USB Audio'. Standard: pulse.")),
        (("--save",), dict(action="store_true", help="WAV-Datei speichern (Standard: nein).")),
        (("--stream-chunk-size",), dict(type=int, default=12, help="Streaming chunk_size (nur mit --stream). Default: 12.")),
        (("--no-sentence-mode",), dict(action="store_true", help="Sätze zu größeren Chunks gruppieren statt einzeln ausgeben.")),
        (("--speed",), dict(type=float, default=1.0, help="Wiedergabegeschwindigkeit: 0.8 = 20%% langsamer, 1.2 = 20%% schneller. Default: 1.0.")),
        (("--debug-delay",), dict(type=float, default=0.0, help="Sekunden Pause vor jedem Satz (simuliert langsame KI). Nur zum Testen.")),
        (("--t3-model",), dict(type=str, default="v3", help="Multilingual T3-Modell: 'v3' (default), 'v2' oder Dateiname.")),
        (("--no-conversation-mode",), dict(action="store_true", help="Ersten Chunk nicht künstlich kleiner machen (nur ohne --no-sentence-mode).")),
        (("--stop",), dict(action="store_true", help="Globales Stop-Signal setzen (für Tests und Service-Integration).")),
    ]
    for option_strings, settings in option_table:
        parser.add_argument(*option_strings, **settings)
    return parser
def main() -> int:
    """Run the CLI: parse arguments, synthesize, report the result.

    Returns:
        0 on success (also for ``--stop``), 1 when any error occurred; the
        error message is printed to stderr.
    """
    args = build_argparser().parse_args()

    if args.stop:
        request_stop()
        print("Stop-Signal gesetzt.")
        return 0

    try:
        text = read_input_text(args.text, args.input)
        device = get_device(args.device)
        output_path = Path(args.output) if args.output else default_output_path(args.input, args.lang)
        # An explicit --output implies saving.
        save_wav = args.save or bool(args.output)
        # Acronym mode default: 'german' for German, 'period_space' otherwise
        acronym_mode = args.acronym_mode or ("german" if args.lang == "de" else "period_space")

        # Load the optional pronunciation dictionary
        pronunciation_dict: Optional[dict] = None
        if args.pronunciation_dict:
            import json
            pron_path = Path(args.pronunciation_dict)
            if not pron_path.exists():
                raise FileNotFoundError(f"Aussprache-Dict nicht gefunden: {pron_path}")
            pronunciation_dict = json.loads(pron_path.read_text(encoding="utf-8"))

        clear_stop()

        # Arguments both synthesis functions have in common.
        shared = dict(
            text=text,
            lang=args.lang,
            output_path=output_path if save_wav else None,
            max_len=args.max_len,
            first_chunk_len=args.first_chunk_len,
            voice_path=args.voice,
            device=device,
            show_progress=not args.no_progress,
            spell_uppercase_acronyms=not args.no_spell_acronyms,
            acronym_mode=acronym_mode,
            normalize_time_values=not args.no_normalize_times,
            normalize_year_values=not args.no_normalize_years,
            normalize_units_values=not args.no_normalize_units,
            conversation_mode=not args.no_conversation_mode,
            play_audio=not args.no_play,
            save_wav=save_wav,
            audio_device=args.audio_device,
            stop_event=STOP_REQUESTED,
        )
        if args.stream:
            out = synthesize_streaming(
                stream_chunk_size=args.stream_chunk_size,
                **shared,
            )
        else:
            out = synthesize_non_streaming(
                sentence_mode=not args.no_sentence_mode,
                speed=args.speed,
                debug_delay=args.debug_delay,
                t3_model=args.t3_model,
                pronunciation_dict=pronunciation_dict,
                **shared,
            )

        print("Fertig." if out is None else f"Fertig: {out}")
        return 0
    except Exception as e:
        print(f"Fehler: {e}", file=sys.stderr)
        return 1
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())