#!/usr/bin/env python3 import argparse import importlib.util import queue import re import sys import threading import time from pathlib import Path from typing import List, Optional, Tuple # --------------------------------------------------------------------------- # Kooperativer Stop-Mechanismus # --------------------------------------------------------------------------- STOP_REQUESTED = threading.Event() def request_stop() -> None: STOP_REQUESTED.set() def clear_stop() -> None: STOP_REQUESTED.clear() def stop_requested() -> bool: return STOP_REQUESTED.is_set() import torch import torchaudio as ta # SDPA does not support output_attentions=True (required by AlignmentStreamAnalyzer hook); # fall back to eager attention so attention weights are returned as tensors, not None. import chatterbox.models.t3.llama_configs as _llama_cfg _llama_cfg.LLAMA_520M_CONFIG_DICT["attn_implementation"] = "eager" from chatterbox.tts import ChatterboxTTS try: from chatterbox.mtl_tts import ChatterboxMultilingualTTS HAS_MULTILINGUAL = True except Exception: ChatterboxMultilingualTTS = None HAS_MULTILINGUAL = False SUPPORTED_LANGS = { "ar", "da", "de", "el", "en", "es", "fi", "fr", "he", "hi", "it", "ja", "ko", "ms", "nl", "no", "pl", "pt", "ru", "sv", "sw", "tr", "zh" } SENTENCE_END_RE = re.compile( r'.+?(?:' r'\.\.\.|…|' r'[!?¡¿]+|' r'[!?。]+|' r'‽|' r'(? 
# NOTE(review): SOURCE is truncated between the module constants and this
# function; the header below was reconstructed from the function body and its
# call sites (``apply_pronunciation_dict(text, <dict>)``) - confirm against the
# original file.
def apply_pronunciation_dict(text: str, pron_dict: dict) -> str:
    """Replace every key of *pron_dict* found in *text* by its value.

    Longer phrases are applied first so that a short key cannot clobber part
    of a longer one. Empty keys are skipped: ``str.replace("", x)`` would
    insert *x* between every character.
    """
    for phrase, replacement in sorted(pron_dict.items(), key=lambda x: len(x[0]), reverse=True):
        if not phrase:
            continue
        text = text.replace(phrase, replacement)
    return text


# Invisible characters removed before splitting/synthesis: zero-width space,
# zero-width non-joiner, zero-width joiner and BOM / zero-width no-break space.
# NOTE(review): the fourth literal appears as an empty string in SOURCE
# (a no-op); presumably it was U+FEFF - confirm.
_INVISIBLE_CHARS = ("\u200b", "\u200c", "\u200d", "\ufeff")


def clean_raw_text(text: str) -> str:
    """Remove invisible control characters that disturb splitting or TTS."""
    for ch in _INVISIBLE_CHARS:
        text = text.replace(ch, "")
    return text


def has_module(name: str) -> bool:
    """Return True if module *name* can be located (without importing it)."""
    return importlib.util.find_spec(name) is not None


def get_device(explicit_device: Optional[str] = None) -> str:
    """Resolve the torch device string to use.

    An explicit device is validated (CUDA must be available when a ``cuda``
    device is requested) and returned unchanged; for CUDA devices the given
    index is also made the current device. Without an explicit device,
    ``cuda:0`` is chosen when CUDA is available, otherwise ``cpu``.

    Raises:
        RuntimeError: a CUDA device was requested but CUDA is unavailable.
    """
    if explicit_device:
        if explicit_device.startswith("cuda") and not torch.cuda.is_available():
            raise RuntimeError("CUDA angefordert, aber keine CUDA-GPU verfügbar.")
        if explicit_device.startswith("cuda"):
            try:
                idx = int(explicit_device.split(":")[1]) if ":" in explicit_device else 0
            except (IndexError, ValueError):
                # Malformed index such as "cuda:" or "cuda:x": fall back to GPU 0.
                idx = 0
            torch.cuda.set_device(idx)
        return explicit_device

    if torch.cuda.is_available():
        torch.cuda.set_device(0)
        return "cuda:0"
    return "cpu"


_DE_ONES = {
    0: "null", 1: "eins", 2: "zwei", 3: "drei", 4: "vier", 5: "fünf",
    6: "sechs", 7: "sieben", 8: "acht", 9: "neun", 10: "zehn", 11: "elf",
    12: "zwölf", 13: "dreizehn", 14: "vierzehn", 15: "fünfzehn",
    16: "sechzehn", 17: "siebzehn", 18: "achtzehn", 19: "neunzehn",
}
_DE_TENS = {
    20: "zwanzig", 30: "dreißig", 40: "vierzig", 50: "fünfzig",
    60: "sechzig", 70: "siebzig", 80: "achtzig", 90: "neunzig",
}
_EN_ONES = {
    0: "zero", 1: "one", 2: "two", 3: "three", 4: "four", 5: "five",
    6: "six", 7: "seven", 8: "eight", 9: "nine", 10: "ten", 11: "eleven",
    12: "twelve", 13: "thirteen", 14: "fourteen", 15: "fifteen",
    16: "sixteen", 17: "seventeen", 18: "eighteen", 19: "nineteen",
}
_EN_TENS = {
    20: "twenty", 30: "thirty", 40: "forty", 50: "fifty",
    60: "sixty", 70: "seventy", 80: "eighty", 90: "ninety",
}


def number_to_words_de(n: int) -> str:
    """Spell out the integer *n* in German.

    Values with an absolute value of one million or more are returned as
    digits. Negative values are prefixed with "minus".
    """
    if n < 0:
        return f"minus {number_to_words_de(-n)}"
    if n < 20:
        return _DE_ONES[n]
    if n < 100:
        t = (n // 10) * 10
        o = n % 10
        if o == 0:
            return _DE_TENS[t]
        # "eins" loses its "s" inside compounds: einundzwanzig.
        one_prefix = "ein" if o == 1 else _DE_ONES[o]
        return f"{one_prefix}und{_DE_TENS[t]}"
    if n < 1000:
        h = n // 100
        r = n % 100
        prefix = "einhundert" if h == 1 else f"{_DE_ONES[h]}hundert"
        return prefix if r == 0 else f"{prefix}{number_to_words_de(r)}"
    if n < 1000000:
        th = n // 1000
        r = n % 1000
        if th == 1:
            prefix = "eintausend"
        else:
            th_words = number_to_words_de(th)
            # A trailing "eins" becomes "ein" before "tausend":
            # 101000 -> einhunderteintausend.
            if th_words.endswith("eins"):
                th_words = th_words[:-1]
            prefix = f"{th_words}tausend"
        return prefix if r == 0 else f"{prefix}{number_to_words_de(r)}"
    return str(n)


def number_to_words_en(n: int) -> str:
    """Spell out the integer *n* in English.

    Values with an absolute value of one million or more are returned as
    digits. Negative values are prefixed with "minus".
    """
    if n < 0:
        return f"minus {number_to_words_en(-n)}"
    if n < 20:
        return _EN_ONES[n]
    if n < 100:
        t = (n // 10) * 10
        o = n % 10
        return _EN_TENS[t] if o == 0 else f"{_EN_TENS[t]}-{_EN_ONES[o]}"
    if n < 1000:
        h = n // 100
        r = n % 100
        prefix = f"{_EN_ONES[h]} hundred"
        return prefix if r == 0 else f"{prefix} {number_to_words_en(r)}"
    if n < 1000000:
        th = n // 1000
        r = n % 1000
        prefix = f"{number_to_words_en(th)} thousand"
        return prefix if r == 0 else f"{prefix} {number_to_words_en(r)}"
    return str(n)


def year_to_words_de(year: int) -> str:
    """Spell out a four-digit year the way it is spoken in German.

    Years outside 1000..9999 are returned as digits. 1100..1999 use the
    hundreds form ("neunzehnhundertvierundachtzig"); all other years are read
    as ordinary numbers ("zweitausendvierundzwanzig").
    """
    if year < 1000 or year > 9999:
        return str(year)
    if 1100 <= year <= 1999:
        century, rest = divmod(year, 100)
        prefix = f"{number_to_words_de(century)}hundert"
        return prefix if rest == 0 else f"{prefix}{number_to_words_de(rest)}"
    if year == 2000:
        return "zweitausend"
    if 2001 <= year <= 2099:
        return f"zweitausend{number_to_words_de(year - 2000)}"
    return number_to_words_de(year)


def year_to_words_en(year: int) -> str:
    """Spell out a four-digit year the way it is spoken in English.

    Years outside 1000..9999 are returned as digits. Examples: 2000 ->
    "two thousand", 2005 -> "two thousand five", 2015 -> "twenty fifteen",
    1900 -> "nineteen hundred", 1905 -> "nineteen oh five",
    1984 -> "nineteen eighty-four".
    """
    if year < 1000 or year > 9999:
        return str(year)
    century, rest = divmod(year, 100)
    # X000..X009 are read as plain numbers ("one thousand", "two thousand five").
    if century % 10 == 0 and rest < 10:
        return number_to_words_en(year)
    if 2010 <= year <= 2099:
        return f"twenty {number_to_words_en(rest)}"
    if rest == 0:
        return f"{number_to_words_en(century)} hundred"
    if rest < 10:
        return f"{number_to_words_en(century)} oh {number_to_words_en(rest)}"
    return f"{number_to_words_en(century)} {number_to_words_en(rest)}"


def spell_out_acronym(token: str, mode: str = "period_space") -> str:
    """Render *token* letter by letter in the requested output *mode*.

    Modes: ``german`` (letters mapped through GERMAN_LETTER_NAMES, unknown
    characters kept), ``space``, ``period``, ``comma``, ``period_space``.

    Raises:
        ValueError: unknown *mode*.
    """
    chars = list(token)
    if mode == "german":
        return " ".join(GERMAN_LETTER_NAMES.get(c, c) for c in chars)
    if mode == "space":
        return " ".join(chars)
    if mode == "period":
        return ".".join(chars) + "."
    if mode == "comma":
        return ", ".join(chars)
    if mode == "period_space":
        return ". ".join(chars) + "."
    raise ValueError(f"Unbekannter mode: {mode}")


def normalize_units(text: str, lang: str) -> str:
    """Expand unit abbreviations that directly follow a digit (German only).

    Units come from UNIT_REPLACEMENTS and are tried longest first. The unit
    must not be followed by a word character; unlike ``\\b`` this also works
    for units that end in a symbol.
    """
    if lang != "de":
        return text
    for unit, expanded in sorted(UNIT_REPLACEMENTS.items(), key=lambda x: len(x[0]), reverse=True):
        pattern = rf'(?<=\d)\s*{re.escape(unit)}(?!\w)'
        # Callable replacement: the expansion is inserted literally, without
        # regex-template escape processing.
        text = re.sub(pattern, lambda _m, _e=expanded: f" {_e}", text)
    return text


def normalize_times(text: str, lang: str) -> str:
    """Replace clock times matched by TIME_RE with speakable words.

    Group 1 of the match is used as the hour and group 3 as the minute.
    Only ``de`` and ``en`` are converted; other languages and impossible
    values (hour > 24, minute > 59) are left untouched.
    """
    def repl(match: re.Match) -> str:
        hh = int(match.group(1))
        mm = int(match.group(3))
        if hh > 24 or mm > 59:
            return match.group(0)

        if lang == "de":
            # Spoken German uses "ein Uhr", not "eins Uhr".
            hour_words = "ein" if hh == 1 else number_to_words_de(hh)
            if mm == 0:
                return f"{hour_words} Uhr"
            return f"{hour_words} Uhr {number_to_words_de(mm)}"

        if lang == "en":
            if hh == 0 and mm == 0:
                return "twelve midnight"
            if hh == 12 and mm == 0:
                return "twelve noon"
            hour12 = hh % 12
            if hour12 == 0:
                hour12 = 12
            suffix = "a m" if hh < 12 else "p m"
            if mm == 0:
                return f"{number_to_words_en(hour12)} {suffix}"
            if mm < 10:
                return f"{number_to_words_en(hour12)} oh {number_to_words_en(mm)} {suffix}"
            return f"{number_to_words_en(hour12)} {number_to_words_en(mm)} {suffix}"

        return match.group(0)

    return TIME_RE.sub(repl, text)


def normalize_years(text: str, lang: str) -> str:
    """Replace years matched by YEAR_RE (group 1) with spoken words (de/en)."""
    def repl(match: re.Match) -> str:
        year = int(match.group(1))
        if lang == "de":
            return year_to_words_de(year)
        if lang == "en":
            return year_to_words_en(year)
        return match.group(0)

    return YEAR_RE.sub(repl, text)


def preprocess_tts_text(
    text: str,
    lang: str,
    spell_uppercase_acronyms: bool = True,
    acronym_mode: Optional[str] = None,  # None = auto: 'german' for de, else 'period_space'
    normalize_time_values: bool = True,
    normalize_year_values: bool = True,
    normalize_units_values: bool = True,
    pronunciation_dict: Optional[dict] = None,
) -> str:
    """Normalize *text* so the TTS model can pronounce it.

    Steps, in order: pronunciation dictionaries, units, times, years,
    acronym spelling, whitespace collapsing. Each normalization can be
    switched off through its flag.
    """
    if acronym_mode is None:
        acronym_mode = "german" if lang == "de" else "period_space"

    # Pronunciation dictionary first (before acronym expansion, so that
    # proper names are matched in their original spelling).
    if lang == "de":
        text = apply_pronunciation_dict(text, DEFAULT_PRONUNCIATION_DE)
    if pronunciation_dict:
        text = apply_pronunciation_dict(text, pronunciation_dict)

    if normalize_units_values:
        text = normalize_units(text, lang)
    if normalize_time_values:
        text = normalize_times(text, lang)
    if normalize_year_values:
        text = normalize_years(text, lang)

    if spell_uppercase_acronyms:
        def repl_compound(match: re.Match) -> str:
            acr = match.group(1)
            if acr in NON_SPELLED_ACRONYMS:
                return acr + " "
            return spell_out_acronym(acr, mode=acronym_mode) + " "

        def repl(match: re.Match) -> str:
            token = match.group(0)
            if token in NON_SPELLED_ACRONYMS:
                return token
            return spell_out_acronym(token, mode=acronym_mode)

        # Compounds first: "US-Präsident" -> "U Es Präsident" (hyphen dropped).
        text = ACRONYM_COMPOUND_RE.sub(repl_compound, text)
        # Then spell out the remaining acronyms.
        text = UPPER_ACRONYM_RE.sub(repl, text)

    text = re.sub(r'\s+', ' ', text).strip()
    return text


def _extract_sentences(part: str) -> List[str]:
    """Split one paragraph into sentences using SENTENCE_END_RE.

    Text after the last recognized sentence terminator is appended as a
    final, unterminated sentence.
    """
    sentences = SENTENCE_END_RE.findall(part)
    consumed = "".join(sentences).strip()
    rest = part[len(consumed):].strip()
    if rest:
        sentences.append(rest)
    return sentences


def split_long_text(text: str, max_len: int = 400) -> List[str]:
    """Split *text* into chunks of at most *max_len* characters.

    Paragraphs (separated by blank lines) are never merged with each other.
    Within a paragraph, consecutive sentences are packed into one chunk while
    they fit; a single sentence longer than *max_len* is force-split at
    whitespace.
    """
    chunks = []
    current = ""

    for part in text.split("\n\n"):
        part = part.strip()
        if not part:
            continue

        for sentence in _extract_sentences(part):
            sentence = sentence.strip()
            if not sentence:
                continue

            if len(sentence) > max_len:
                # Flush what has been collected, then split the oversized sentence.
                if current:
                    chunks.append(current.strip())
                    current = ""
                chunks.extend(force_split_sentence(sentence, max_len))
                continue

            if current and len(current) + 1 + len(sentence) > max_len:
                chunks.append(current.strip())
                current = sentence
            else:
                current = f"{current} {sentence}".strip() if current else sentence

        # Paragraph boundary: never carry text over into the next paragraph.
        if current:
            chunks.append(current.strip())
            current = ""

    return chunks


def split_for_conversation(text: str, first_chunk_len: int = 120, max_len: int = 400) -> List[str]:
    """Like split_long_text, but with a short leading chunk for low latency.

    If the first chunk is longer than *first_chunk_len* it is re-split into
    pieces of at most *first_chunk_len* characters; the remaining chunks are
    kept as produced by split_long_text.
    """
    base_chunks = split_long_text(text, max_len=max_len)
    if not base_chunks:
        return []

    first = base_chunks[0]
    if len(first) <= first_chunk_len:
        return base_chunks

    early = force_split_sentence(first, first_chunk_len)
    return early + base_chunks[1:]


def force_split_sentence(text: str, max_len: int) -> List[str]:
    """Split *text* into pieces of at most *max_len* characters.

    Whitespace is collapsed first. Pieces are cut at the last space that
    fits; a single word longer than *max_len* is cut hard.

    Raises:
        ValueError: *max_len* is smaller than 1 (the loop could never make
            progress).
    """
    if max_len < 1:
        raise ValueError(f"max_len must be >= 1, got {max_len}")

    text = re.sub(r"\s+", " ", text).strip()
    if len(text) <= max_len:
        return [text]

    parts = []
    remaining = text
    while len(remaining) > max_len:
        split_pos = remaining.rfind(" ", 0, max_len + 1)
        if split_pos <= 0:
            # No usable space: hard cut inside the word.
            split_pos = max_len
        parts.append(remaining[:split_pos].strip())
        remaining = remaining[split_pos:].strip()
    if remaining:
        parts.append(remaining)
    return parts


def split_into_sentences(text: str, max_len: int = 200) -> List[str]:
    """Split *text* into single sentences (one chunk per sentence).

    Paragraphs matching SEPARATOR_LINE_RE are dropped. Sentences longer than
    *max_len* are force-split at whitespace.
    """
    result = []
    for part in text.split("\n\n"):
        part = part.strip()
        if not part:
            continue
        if SEPARATOR_LINE_RE.match(part):
            continue

        for sentence in _extract_sentences(part):
            sentence = sentence.strip()
            if not sentence:
                continue
            if len(sentence) > max_len:
                result.extend(force_split_sentence(sentence, max_len))
            else:
                result.append(sentence)
    return result


def read_input_text(text_arg: Optional[str], input_path: Optional[str]) -> str:
    """Return the stripped input text from either *text_arg* or a UTF-8 file.

    Raises:
        ValueError: both or neither of the two sources were given.
        FileNotFoundError: *input_path* does not exist.
    """
    if text_arg and input_path:
        raise ValueError("Bitte entweder --text oder --input angeben, nicht beides.")
    if not text_arg and not input_path:
        raise ValueError("Bitte --text oder --input angeben.")
    if text_arg:
        return text_arg.strip()

    path = Path(input_path)
    if not path.exists():
        raise FileNotFoundError(f"Input-Datei nicht gefunden: {path}")
    return path.read_text(encoding="utf-8").strip()


def default_output_path(input_path: Optional[str], lang: str) -> Path:
    """Derive the default WAV path: ``<input stem>.<lang>.wav`` or ``tts_output.<lang>.wav``."""
    if input_path:
        src = Path(input_path)
        return src.with_suffix(f".{lang}.wav")
    return Path(f"tts_output.{lang}.wav")


def load_model(lang: str, device: str, t3_model: Optional[str] = None):
    """Load the TTS model for *lang*.

    English uses the monolingual ChatterboxTTS (``t3_model`` is not used
    there); every other language uses ChatterboxMultilingualTTS.

    Returns:
        Tuple ``(model, model_kind, sample_rate)`` with *model_kind* being
        ``"mono"`` or ``"multi"``.

    Raises:
        RuntimeError: the multilingual model is needed but not importable.
    """
    if lang == "en":
        model = ChatterboxTTS.from_pretrained(device=device)
        return model, "mono", model.sr

    if not HAS_MULTILINGUAL:
        raise RuntimeError(
            "Multilingual-Modell nicht verfügbar. Installiere ein Chatterbox-Paket mit chatterbox.mtl_tts."
        )
    model = ChatterboxMultilingualTTS.from_pretrained(device=device, t3_model=t3_model)
    return model, "multi", model.sr


def generate_chunk(model, model_kind: str, text: str, lang: str, voice_path: Optional[str]):
    """Synthesize one text chunk and return whatever ``model.generate`` returns."""
    kwargs = {}
    if voice_path:
        kwargs["audio_prompt_path"] = voice_path

    if model_kind == "mono":
        return model.generate(text, **kwargs)
    return model.generate(text, language_id=lang, **kwargs)


def generate_stream_chunk(
    model,
    model_kind: str,
    text: str,
    lang: str,
    voice_path: Optional[str],
    stream_chunk_size: int,
):
    """Start streaming synthesis for one text chunk via ``model.generate_stream``."""
    kwargs = {
        "chunk_size": stream_chunk_size,
        "print_metrics": False,
    }
    if voice_path:
        kwargs["audio_prompt_path"] = voice_path

    if model_kind == "mono":
        return model.generate_stream(text, **kwargs)
    return model.generate_stream(text, language_id=lang, **kwargs)


class PlaybackWorker:
    """Plays queued audio tensors through a sounddevice output stream.

    Tensors are handed in with put(); a producer thread converts them into
    fixed-size float32 blocks which the sounddevice callback consumes.
    stop() signals end of input, waits for playback to finish and re-raises
    playback errors as RuntimeError.
    """

    PLAYBACK_RATE = 48000   # PipeWire/PulseAudio standard
    CALLBACK_BLOCK = 2048   # ~43 ms per callback block at 48 kHz
    _POLL_INTERVAL = 0.05   # seconds between stop checks while the block queue is full

    def __init__(self, sample_rate: int, device: Optional[str] = "pulse", speed: float = 1.0,
                 stop_event: Optional[threading.Event] = None):
        self.sample_rate = sample_rate
        self.device = device
        self.speed = speed
        self.stop_event = stop_event
        # Input: torch tensors from the TTS model; None marks end of input.
        self.audio_queue: "queue.Queue[Optional[torch.Tensor]]" = queue.Queue()
        # Internal: ready-made numpy blocks for the callback.
        self._block_queue: "queue.Queue" = queue.Queue(maxsize=500)
        self._blocks_produced = 0
        self._blocks_consumed = 0
        # Set when the output stream failed, so the producer stops as well.
        self._abort = threading.Event()
        self.thread = None
        self.error = None

    def _stopping(self) -> bool:
        """Return True once playback was aborted or an external stop was requested."""
        if self._abort.is_set():
            return True
        return self.stop_event is not None and self.stop_event.is_set()

    def _put_block(self, block) -> bool:
        """Enqueue one block; return False if a stop was requested instead.

        The queue is bounded and the callback no longer consumes blocks after
        a stop request, so an unconditional blocking put could hang forever.
        """
        while not self._stopping():
            try:
                self._block_queue.put(block, timeout=self._POLL_INTERVAL)
            except queue.Full:
                continue
            self._blocks_produced += 1
            return True
        return False

    def start(self):
        """Start the playback thread.

        Raises:
            RuntimeError: the ``sounddevice`` module is not installed.
        """
        if not has_module("sounddevice"):
            raise RuntimeError(
                "Für Live-Wiedergabe ist das Modul 'sounddevice' nötig. Installiere z. B. 'pip install sounddevice'."
            )
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def _callback(self, outdata, frames, time_info, status):
        """sounddevice output callback: emit the next block, or silence."""
        # Called by sounddevice for every output block: keep it short and non-blocking.
        if self.stop_event and self.stop_event.is_set():
            outdata[:] = 0.0
            return
        try:
            data = self._block_queue.get_nowait()
            outdata[:, 0] = data
            self._blocks_consumed += 1
        except queue.Empty:
            outdata[:] = 0.0  # silence instead of an underrun click

    def _produce(self):
        """Convert queued torch tensors into CALLBACK_BLOCK-sized numpy arrays."""
        import numpy as np

        remainder = np.zeros(0, dtype="float32")
        while True:
            if self._stopping():
                return
            item = self.audio_queue.get()
            if item is None:
                break

            chunk = item.detach().cpu()
            if chunk.ndim == 2:
                chunk = chunk.squeeze(0)

            if self.speed != 1.0:
                import pyrubberband as pyrb
                # Rubber Band with "--fine" (comment in the original: R3 engine,
                # much less phasiness than R2, better for speech).
                # rate < 1.0 = slower, rate > 1.0 = faster; pitch is preserved.
                stretched = pyrb.time_stretch(
                    chunk.numpy().astype("float64"),
                    self.sample_rate,
                    self.speed,
                    rbargs={"--fine": ""},
                )
                chunk = torch.from_numpy(stretched.astype("float32"))

            chunk = ta.functional.resample(chunk, self.sample_rate, self.PLAYBACK_RATE)
            samples = np.concatenate([remainder, chunk.numpy().astype("float32")])

            i = 0
            while i + self.CALLBACK_BLOCK <= len(samples):
                if not self._put_block(samples[i : i + self.CALLBACK_BLOCK]):
                    return
                i += self.CALLBACK_BLOCK
            remainder = samples[i:]

        # Pad the remaining samples (< CALLBACK_BLOCK) with silence.
        if len(remainder) > 0:
            block = np.zeros(self.CALLBACK_BLOCK, dtype="float32")
            block[: len(remainder)] = remainder
            self._put_block(block)

    def _run(self):
        """Thread body: open the output stream and play until input is drained."""
        try:
            import sounddevice as sd

            producer = threading.Thread(target=self._produce, daemon=True)
            producer.start()

            with sd.OutputStream(
                samplerate=self.PLAYBACK_RATE,
                channels=1,
                dtype="float32",
                device=self.device,
                blocksize=self.CALLBACK_BLOCK,
                callback=self._callback,
            ):
                producer.join()  # all tensors have been converted to blocks
                # Wait until the callback has played every block. After a stop
                # request the callback no longer consumes, so do not wait then.
                while self._blocks_consumed < self._blocks_produced and not self._stopping():
                    time.sleep(0.02)
                if not self._stopping():
                    # Let the last block drain from the hardware buffer.
                    time.sleep(self.CALLBACK_BLOCK / self.PLAYBACK_RATE + 0.1)
        except Exception as e:
            self.error = e
            # Release a producer that may be blocked on the bounded queue.
            self._abort.set()

    def put(self, chunk: "torch.Tensor"):
        """Queue one audio tensor for playback."""
        self.audio_queue.put(chunk)

    def stop(self):
        """Signal end of input, wait for playback to end, re-raise playback errors.

        Raises:
            RuntimeError: the playback thread failed.
        """
        self.audio_queue.put(None)
        if self.thread:
            self.thread.join()
        if self.error:
            raise RuntimeError(f"Fehler bei Live-Wiedergabe: {self.error}")


def synthesize_non_streaming(
    text: str,
    lang: str,
    output_path: Optional[Path],
    max_len: int,
    first_chunk_len: int,
    voice_path: Optional[str],
    device: str,
    show_progress: bool = True,
    spell_uppercase_acronyms: bool = True,
    acronym_mode: Optional[str] = None,
    normalize_time_values: bool = True,
    normalize_year_values: bool = True,
    normalize_units_values: bool = True,
    conversation_mode: bool = True,
    play_audio: bool = False,
    save_wav: bool = True,
    audio_device: Optional[str] = "pulse",
    sentence_mode: bool = True,
    speed: float = 1.0,
    debug_delay: float = 0.0,
    t3_model: Optional[str] = None,
    pronunciation_dict: Optional[dict] = None,
    stop_event: Optional[threading.Event] = None,
) -> Optional[Path]:
    """Synthesize *text* chunk by chunk, optionally playing and/or saving it.

    The text is split first and normalized per chunk afterwards, so that
    periods introduced by acronym spelling cannot create sentence boundaries.

    Returns:
        The written WAV path, or None if nothing was saved (saving disabled,
        or synthesis stopped before any audio was produced).

    Raises:
        ValueError: unsupported language or no usable text.
        FileNotFoundError: *voice_path* does not exist.
        RuntimeError: model or playback failure.
    """
    if lang not in SUPPORTED_LANGS:
        raise ValueError(
            f"Nicht unterstützte Sprache '{lang}'. Unterstützt: {', '.join(sorted(SUPPORTED_LANGS))}"
        )

    if voice_path and not Path(voice_path).exists():
        raise FileNotFoundError(f"Voice-Referenz nicht gefunden: {voice_path}")

    # Remove invisible characters first, then split into sentences (keeping
    # the paragraph structure), and only then expand acronyms - otherwise the
    # periods of "A. R. D." would create false sentence boundaries.
    text = clean_raw_text(text)

    if sentence_mode:
        raw_chunks = split_into_sentences(text, max_len=max_len)
    elif conversation_mode:
        raw_chunks = split_for_conversation(text, first_chunk_len=first_chunk_len, max_len=max_len)
    else:
        raw_chunks = split_long_text(text, max_len=max_len)

    preprocess_kw = dict(
        lang=lang,
        spell_uppercase_acronyms=spell_uppercase_acronyms,
        acronym_mode=acronym_mode,
        normalize_time_values=normalize_time_values,
        normalize_year_values=normalize_year_values,
        normalize_units_values=normalize_units_values,
        pronunciation_dict=pronunciation_dict,
    )
    chunks = [preprocess_tts_text(c, **preprocess_kw) for c in raw_chunks]
    chunks = [c for c in chunks if c.strip()]

    if not chunks:
        raise ValueError("Kein verwertbarer Text nach dem Einlesen gefunden.")

    # Load the model only after the input has been validated.
    model, model_kind, sr = load_model(lang, device, t3_model=t3_model)

    if show_progress:
        print(f"Sprache: {lang}")
        print(f"Gerät: {device}")
        print(f"Modell: {'ChatterboxTTS (monolingual)' if model_kind == 'mono' else 'ChatterboxMultilingualTTS'}")
        print(f"Sätze: {len(chunks)}")
        print(f"Modus: {'Satz-für-Satz' if sentence_mode else 'non-streaming'} + Playback")
        print(f"Live-Wiedergabe: {'ja' if play_audio else 'nein'}")
        print(f"WAV speichern: {'ja' if save_wav and output_path else 'nein'}")
        if output_path and save_wav:
            print(f"Ausgabe: {output_path}")

    if play_audio:
        playback = PlaybackWorker(sample_rate=sr, device=audio_device, speed=speed, stop_event=stop_event)
        playback.start()
    else:
        playback = None

    wavs = []
    try:
        for i, chunk in enumerate(chunks, start=1):
            if stop_event and stop_event.is_set():
                if show_progress:
                    print("Abbruch angefordert – Synthese gestoppt.")
                break
            if debug_delay > 0:
                if show_progress:
                    print(f"[{i}/{len(chunks)}] Warte {debug_delay:.0f}s (debug_delay) ...")
                time.sleep(debug_delay)
            if show_progress:
                print(f"[{i}/{len(chunks)}] Generiere ({len(chunk)} Zeichen) ...")
            wav = generate_chunk(model, model_kind, chunk, lang, voice_path)
            wavs.append(wav)
            if playback is not None:
                playback.put(wav)
    finally:
        if playback is not None:
            playback.stop()

    if not wavs:
        return None

    final_wav = wavs[0] if len(wavs) == 1 else torch.cat(wavs, dim=-1)

    if save_wav and output_path:
        output_path.parent.mkdir(parents=True, exist_ok=True)
        ta.save(str(output_path), final_wav, sr)
        return output_path

    return None


def synthesize_streaming(
    text: str,
    lang: str,
    output_path: Optional[Path],
    max_len: int,
    first_chunk_len: int,
    voice_path: Optional[str],
    device: str,
    show_progress: bool = True,
    spell_uppercase_acronyms: bool = True,
    acronym_mode: Optional[str] = "period_space",
    normalize_time_values: bool = True,
    normalize_year_values: bool = True,
    normalize_units_values: bool = True,
    conversation_mode: bool = True,
    play_audio: bool = True,
    save_wav: bool = True,
    stream_chunk_size: int = 25,
    audio_device: Optional[str] = None,
    stop_event: Optional[threading.Event] = None,
) -> Optional[Path]:
    """Synthesize *text* with the model's streaming API.

    Audio chunks are forwarded to live playback as soon as the model yields
    them and are collected for the optional WAV file.

    Returns:
        The written WAV path, or None if nothing was saved (saving disabled,
        or streaming stopped before any audio was produced).

    Raises:
        ValueError: unsupported language or no usable text.
        FileNotFoundError: *voice_path* does not exist.
        RuntimeError: the model has no ``generate_stream`` or playback failed.
    """
    if lang not in SUPPORTED_LANGS:
        raise ValueError(
            f"Nicht unterstützte Sprache '{lang}'. Unterstützt: {', '.join(sorted(SUPPORTED_LANGS))}"
        )

    if voice_path and not Path(voice_path).exists():
        raise FileNotFoundError(f"Voice-Referenz nicht gefunden: {voice_path}")

    # Same invisible-character cleanup as the non-streaming path.
    text = clean_raw_text(text)
    text = preprocess_tts_text(
        text=text,
        lang=lang,
        spell_uppercase_acronyms=spell_uppercase_acronyms,
        acronym_mode=acronym_mode,
        normalize_time_values=normalize_time_values,
        normalize_year_values=normalize_year_values,
        normalize_units_values=normalize_units_values,
    )

    model, model_kind, sr = load_model(lang, device)

    if not hasattr(model, "generate_stream"):
        raise RuntimeError(
            "Dieses Chatterbox-Paket bietet kein generate_stream(). "
            "Installiere z. B. 'chatterbox-streaming'."
        )

    if conversation_mode:
        text_chunks = split_for_conversation(text, first_chunk_len=first_chunk_len, max_len=max_len)
    else:
        text_chunks = split_long_text(text, max_len=max_len)

    if not text_chunks:
        raise ValueError("Kein verwertbarer Text nach dem Einlesen gefunden.")

    if play_audio:
        playback = PlaybackWorker(sample_rate=sr, device=audio_device, stop_event=stop_event)
        playback.start()
    else:
        playback = None

    all_audio_chunks: List["torch.Tensor"] = []
    t0 = time.perf_counter()
    first_audio_started = False

    if show_progress:
        print(f"Sprache: {lang}")
        print(f"Gerät: {device}")
        print(f"Modell: {'ChatterboxTTS (monolingual)' if model_kind == 'mono' else 'ChatterboxMultilingualTTS'}")
        print(f"Text-Chunks: {len(text_chunks)}")
        print("Modus: streaming")
        print(f"Gesprächsmodus: {'ja' if conversation_mode else 'nein'}")
        print(f"Live-Wiedergabe: {'ja' if play_audio else 'nein'}")
        print(f"WAV speichern: {'ja' if save_wav and output_path else 'nein'}")
        print(f"Streaming chunk_size: {stream_chunk_size}")
        if output_path:
            print(f"Ausgabe: {output_path}")

    try:
        for text_idx, text_chunk in enumerate(text_chunks, start=1):
            if stop_event and stop_event.is_set():
                if show_progress:
                    print("Abbruch angefordert – Streaming gestoppt.")
                break
            if show_progress:
                print(f"[Text {text_idx}/{len(text_chunks)}] Starte Streaming für {len(text_chunk)} Zeichen ...")

            stream_iter = generate_stream_chunk(
                model=model,
                model_kind=model_kind,
                text=text_chunk,
                lang=lang,
                voice_path=voice_path,
                stream_chunk_size=stream_chunk_size,
            )

            for audio_idx, item in enumerate(stream_iter, start=1):
                if stop_event and stop_event.is_set():
                    break
                # The stream yields either (audio, metrics) tuples or bare audio.
                if isinstance(item, tuple) and len(item) == 2:
                    audio_chunk, metrics = item
                else:
                    audio_chunk, metrics = item, None

                all_audio_chunks.append(audio_chunk)

                if playback is not None:
                    playback.put(audio_chunk)

                if not first_audio_started:
                    first_audio_started = True
                    if show_progress:
                        dt = time.perf_counter() - t0
                        print(f"Audio-Wiedergabe gestartet nach {dt:.3f}s")

                if show_progress:
                    msg = f" -> Audio-Chunk {audio_idx}"
                    if metrics is not None:
                        latency = getattr(metrics, "latency_to_first_chunk", None)
                        rtf = getattr(metrics, "rtf", None)
                        chunk_count = getattr(metrics, "chunk_count", None)
                        if chunk_count is not None:
                            msg += f", model_chunk={chunk_count}"
                        if latency:
                            msg += f", first_latency={latency:.3f}s"
                        if rtf:
                            msg += f", rtf={rtf:.3f}"
                    print(msg)

    finally:
        if playback is not None:
            playback.stop()

    # Stopped before any audio arrived: nothing to concatenate or save.
    if not all_audio_chunks:
        return None

    final_output = None
    if save_wav and output_path:
        final_audio = all_audio_chunks[0] if len(all_audio_chunks) == 1 else torch.cat(all_audio_chunks, dim=-1)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        ta.save(str(output_path), final_audio, sr)
        final_output = output_path

    return final_output


def build_argparser() -> argparse.ArgumentParser:
    """Build the command-line parser for the TTS CLI."""
    p = argparse.ArgumentParser(
        description="Low-Latency Chatterbox TTS CLI mit deutscher Text-Normalisierung und optionalem Streaming."
    )
    p.add_argument("--text", type=str, help="Direkter Eingabetext.")
    p.add_argument("--input", type=str, help="Pfad zu UTF-8-Textdatei.")
    p.add_argument("--lang", type=str, default="de", help="Sprachcode, default: de.")
    p.add_argument("--len", dest="max_len", type=int, default=400, help="Maximale Chunk-Länge, default: 400.")
    p.add_argument("--first-chunk-len", type=int, default=80,
                   help="Kleinere Zielgröße für den ersten Chunk im Gesprächsmodus. Default: 80.")
    p.add_argument("--output", type=str, help="Ausgabedatei .wav")
    p.add_argument("--voice", type=str, help="Optionale Referenz-WAV für Voice-Cloning.")
    p.add_argument("--device", type=str, default=None, help="z. B. cuda:0 oder cpu.")
    p.add_argument("--no-progress", action="store_true", help="Weniger Konsolen-Output.")
    p.add_argument("--no-spell-acronyms", action="store_true",
                   help="Großgeschriebene Akronyme nicht buchstabieren.")
    p.add_argument(
        "--acronym-mode",
        type=str,
        default=None,  # None = automatic: 'german' for de, 'period_space' otherwise
        choices=["space", "period", "comma", "period_space", "german"],
        help="Ausgabeformat für buchstabierte Akronyme. Default: 'german' bei --lang de, sonst 'period_space'."
    )
    p.add_argument("--pronunciation-dict", type=str, default=None,
                   help="Pfad zu einer JSON-Datei mit Aussprache-Substitutionen (Eigenname → Lautschrift).")
    p.add_argument("--no-normalize-times", action="store_true",
                   help="Uhrzeiten nicht in sprechbaren Text umwandeln.")
    p.add_argument("--no-normalize-years", action="store_true",
                   help="Jahreszahlen nicht in sprechbaren Text umwandeln.")
    p.add_argument("--no-normalize-units", action="store_true",
                   help="Einheiten nicht in sprechbaren Text umwandeln.")
    p.add_argument("--stream", action="store_true",
                   help="Streaming-TTS-Modus (experimentell, kann abgehackt klingen).")
    p.add_argument("--no-play", action="store_true", help="Nicht live abspielen.")
    p.add_argument("--audio-device", type=str, default="pulse",
                   help="Sounddevice-Ausgabegerät, z. B. 'pulse' oder 'M2: USB Audio'. Standard: pulse.")
    p.add_argument("--save", action="store_true", help="WAV-Datei speichern (Standard: nein).")
    p.add_argument("--stream-chunk-size", type=int, default=12,
                   help="Streaming chunk_size (nur mit --stream). Default: 12.")
    p.add_argument("--no-sentence-mode", action="store_true",
                   help="Sätze zu größeren Chunks gruppieren statt einzeln ausgeben.")
    p.add_argument("--speed", type=float, default=1.0,
                   help="Wiedergabegeschwindigkeit: 0.8 = 20%% langsamer, 1.2 = 20%% schneller. Default: 1.0.")
    p.add_argument("--debug-delay", type=float, default=0.0,
                   help="Sekunden Pause vor jedem Satz (simuliert langsame KI). Nur zum Testen.")
    p.add_argument("--t3-model", type=str, default="v3",
                   help="Multilingual T3-Modell: 'v3' (default), 'v2' oder Dateiname.")
    p.add_argument("--no-conversation-mode", action="store_true",
                   help="Ersten Chunk nicht künstlich kleiner machen (nur ohne --no-sentence-mode).")
    p.add_argument("--stop", action="store_true",
                   help="Globales Stop-Signal setzen (für Tests und Service-Integration).")
    return p


def main() -> int:
    """CLI entry point. Returns the process exit code (0 = success, 1 = error)."""
    parser = build_argparser()
    args = parser.parse_args()

    if args.stop:
        request_stop()
        print("Stop-Signal gesetzt.")
        return 0

    try:
        text = read_input_text(args.text, args.input)
        device = get_device(args.device)
        output_path = Path(args.output) if args.output else default_output_path(args.input, args.lang)
        save_wav = args.save or bool(args.output)

        # Acronym mode default: 'german' for German, 'period_space' otherwise.
        acronym_mode = args.acronym_mode or ("german" if args.lang == "de" else "period_space")

        # Load the optional pronunciation dictionary.
        pronunciation_dict: Optional[dict] = None
        if args.pronunciation_dict:
            import json
            pron_path = Path(args.pronunciation_dict)
            if not pron_path.exists():
                raise FileNotFoundError(f"Aussprache-Dict nicht gefunden: {pron_path}")
            pronunciation_dict = json.loads(pron_path.read_text(encoding="utf-8"))
            if not isinstance(pronunciation_dict, dict):
                # A list or scalar would only fail later, deep inside preprocessing.
                raise ValueError(f"Aussprache-Dict muss ein JSON-Objekt sein: {pron_path}")

        clear_stop()

        if args.stream:
            out = synthesize_streaming(
                text=text,
                lang=args.lang,
                output_path=output_path if save_wav else None,
                max_len=args.max_len,
                first_chunk_len=args.first_chunk_len,
                voice_path=args.voice,
                device=device,
                show_progress=not args.no_progress,
                spell_uppercase_acronyms=not args.no_spell_acronyms,
                acronym_mode=acronym_mode,
                normalize_time_values=not args.no_normalize_times,
                normalize_year_values=not args.no_normalize_years,
                normalize_units_values=not args.no_normalize_units,
                conversation_mode=not args.no_conversation_mode,
                play_audio=not args.no_play,
                save_wav=save_wav,
                stream_chunk_size=args.stream_chunk_size,
                audio_device=args.audio_device,
                stop_event=STOP_REQUESTED,
            )
        else:
            out = synthesize_non_streaming(
                text=text,
                lang=args.lang,
                output_path=output_path if save_wav else None,
                max_len=args.max_len,
                first_chunk_len=args.first_chunk_len,
                voice_path=args.voice,
                device=device,
                show_progress=not args.no_progress,
                spell_uppercase_acronyms=not args.no_spell_acronyms,
                acronym_mode=acronym_mode,
                normalize_time_values=not args.no_normalize_times,
                normalize_year_values=not args.no_normalize_years,
                normalize_units_values=not args.no_normalize_units,
                conversation_mode=not args.no_conversation_mode,
                play_audio=not args.no_play,
                save_wav=save_wav,
                audio_device=args.audio_device,
                sentence_mode=not args.no_sentence_mode,
                speed=args.speed,
                debug_delay=args.debug_delay,
                t3_model=args.t3_model,
                pronunciation_dict=pronunciation_dict,
                stop_event=STOP_REQUESTED,
            )

        if out is not None:
            print(f"Fertig: {out}")
        else:
            print("Fertig.")
        return 0

    except Exception as e:
        # Top-level boundary: report any failure as a one-line message and exit code 1.
        print(f"Fehler: {e}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    raise SystemExit(main())