#!/usr/bin/env python3
"""Low-latency Chatterbox TTS command-line tool.

Reads text from ``--text`` or ``--input``, normalises it for speech (units,
clock times, years, upper-case acronyms, pronunciation substitutions), splits
it into chunks and synthesises each chunk with a Chatterbox model.  Audio can
be played live through ``sounddevice`` and/or written to a WAV file.
"""
import argparse
import importlib.util
import queue
import re
import sys
import threading
import time
from pathlib import Path
from typing import List, Optional, Tuple

import torch
import torchaudio as ta

# SDPA does not support output_attentions=True (required by AlignmentStreamAnalyzer hook);
# fall back to eager attention so attention weights are returned as tensors, not None.
# This must run before chatterbox.tts is imported below.
import chatterbox.models.t3.llama_configs as _llama_cfg
_llama_cfg.LLAMA_520M_CONFIG_DICT["attn_implementation"] = "eager"

from chatterbox.tts import ChatterboxTTS

try:
    from chatterbox.mtl_tts import ChatterboxMultilingualTTS
    HAS_MULTILINGUAL = True
except Exception:
    # Any import-time failure just disables the multilingual model.
    ChatterboxMultilingualTTS = None
    HAS_MULTILINGUAL = False

SUPPORTED_LANGS = {
    "ar", "da", "de", "el", "en", "es", "fi", "fr", "he", "hi", "it", "ja",
    "ko", "ms", "nl", "no", "pl", "pt", "ru", "sv", "sw", "tr", "zh"
}

# NOTE(review): this file reached review with a span of text missing right
# here: the tail of SENTENCE_END_RE (its last alternative began with "(?"),
# everything that followed it, and the signature line of
# apply_pronunciation_dict.  The pattern below is closed over the alternatives
# that survived, so a plain "." does NOT end a sentence until the lost
# alternative is restored.  The functions below also reference these
# module-level names, which are therefore NOT defined in this file and must be
# restored from the original source before the script can run:
#   SEPARATOR_LINE_RE, TIME_RE, YEAR_RE, UPPER_ACRONYM_RE, ACRONYM_COMPOUND_RE,
#   NON_SPELLED_ACRONYMS, GERMAN_LETTER_NAMES, UNIT_REPLACEMENTS,
#   DEFAULT_PRONUNCIATION_DE
SENTENCE_END_RE = re.compile(
    r'.+?(?:'
    r'\.\.\.|…|'
    r'[!?¡¿]+|'
    r'[!?。]+|'
    r'‽'
    r')'
)

# Invisible characters removed by clean_raw_text().
# NOTE(review): the first three arrived as literal invisible characters,
# presumably U+200B/U+200C/U+200D; the fourth arrived as an empty string
# (a no-op for str.replace) and is presumably U+FEFF -- confirm against the
# original file.
_INVISIBLE_CHARS = ("\u200b", "\u200c", "\u200d", "\ufeff")

_DE_ONES = {
    0: "null", 1: "eins", 2: "zwei", 3: "drei", 4: "vier", 5: "fünf",
    6: "sechs", 7: "sieben", 8: "acht", 9: "neun", 10: "zehn", 11: "elf",
    12: "zwölf", 13: "dreizehn", 14: "vierzehn", 15: "fünfzehn",
    16: "sechzehn", 17: "siebzehn", 18: "achtzehn", 19: "neunzehn"
}
_DE_TENS = {
    20: "zwanzig", 30: "dreißig", 40: "vierzig", 50: "fünfzig",
    60: "sechzig", 70: "siebzig", 80: "achtzig", 90: "neunzig"
}
_EN_ONES = {
    0: "zero", 1: "one", 2: "two", 3: "three", 4: "four", 5: "five",
    6: "six", 7: "seven", 8: "eight", 9: "nine", 10: "ten", 11: "eleven",
    12: "twelve", 13: "thirteen", 14: "fourteen", 15: "fifteen",
    16: "sixteen", 17: "seventeen", 18: "eighteen", 19: "nineteen"
}
_EN_TENS = {
    20: "twenty", 30: "thirty", 40: "forty", 50: "fifty",
    60: "sixty", 70: "seventy", 80: "eighty", 90: "ninety"
}


# NOTE(review): signature reconstructed from the body and the call sites
# (apply_pronunciation_dict(text, <dict>)); the annotation of ``pron_dict``
# was lost -- confirm against the original file.
def apply_pronunciation_dict(text: str, pron_dict: dict) -> str:
    """Replace every key of *pron_dict* found in *text* with its value.

    Longer phrases are substituted first so that a short key cannot break up
    a longer phrase that contains it.
    """
    for phrase, replacement in sorted(pron_dict.items(), key=lambda x: len(x[0]), reverse=True):
        text = text.replace(phrase, replacement)
    return text


def clean_raw_text(text: str) -> str:
    """Remove invisible control characters that disturb splitting or TTS."""
    for ch in _INVISIBLE_CHARS:
        text = text.replace(ch, '')
    return text


def has_module(name: str) -> bool:
    """Return True if module *name* can be located without importing it."""
    return importlib.util.find_spec(name) is not None


def get_device(explicit_device: Optional[str] = None) -> str:
    """Resolve the torch device string to use.

    With *explicit_device* the value is returned unchanged; for a ``cuda``
    device the index after ``:`` (0 if absent or unparsable) is made the
    current CUDA device.  Without it, ``cuda:0`` is chosen when CUDA is
    available, otherwise ``cpu``.

    Raises:
        RuntimeError: a CUDA device was requested but CUDA is unavailable.
    """
    if explicit_device:
        if explicit_device.startswith("cuda") and not torch.cuda.is_available():
            raise RuntimeError("CUDA angefordert, aber keine CUDA-GPU verfügbar.")
        if explicit_device.startswith("cuda"):
            try:
                idx = int(explicit_device.split(":")[1]) if ":" in explicit_device else 0
            except (IndexError, ValueError):
                idx = 0
            torch.cuda.set_device(idx)
        return explicit_device

    if torch.cuda.is_available():
        torch.cuda.set_device(0)
        return "cuda:0"
    return "cpu"


def number_to_words_de(n: int) -> str:
    """Spell out *n* in German.

    Values with an absolute value of 1,000,000 or more are returned as
    digits.  Negative values are prefixed with "minus".
    """
    if n < 0:
        return f"minus {number_to_words_de(-n)}"
    if n < 20:
        return _DE_ONES[n]
    if n < 100:
        t = (n // 10) * 10
        o = n % 10
        if o == 0:
            return _DE_TENS[t]
        # "eins" loses its "s" inside a compound: einundzwanzig.
        one_prefix = "ein" if o == 1 else _DE_ONES[o]
        return f"{one_prefix}und{_DE_TENS[t]}"
    if n < 1000:
        h = n // 100
        r = n % 100
        prefix = "einhundert" if h == 1 else f"{_DE_ONES[h]}hundert"
        return prefix if r == 0 else f"{prefix}{number_to_words_de(r)}"
    if n < 1000000:
        th = n // 1000
        r = n % 1000
        multiplier = number_to_words_de(th)
        # A multiplier ending in "eins" (1, 101, 201, ...) drops the "s"
        # before "tausend": eintausend, einhunderteintausend.
        if multiplier.endswith("eins"):
            multiplier = multiplier[:-1]
        prefix = f"{multiplier}tausend"
        return prefix if r == 0 else f"{prefix}{number_to_words_de(r)}"
    return str(n)


def number_to_words_en(n: int) -> str:
    """Spell out *n* in English.

    Values with an absolute value of 1,000,000 or more are returned as
    digits.  Negative values are prefixed with "minus".
    """
    if n < 0:
        return f"minus {number_to_words_en(-n)}"
    if n < 20:
        return _EN_ONES[n]
    if n < 100:
        t = (n // 10) * 10
        o = n % 10
        return _EN_TENS[t] if o == 0 else f"{_EN_TENS[t]}-{_EN_ONES[o]}"
    if n < 1000:
        h = n // 100
        r = n % 100
        prefix = f"{_EN_ONES[h]} hundred"
        return prefix if r == 0 else f"{prefix} {number_to_words_en(r)}"
    if n < 1000000:
        th = n // 1000
        r = n % 1000
        prefix = f"{number_to_words_en(th)} thousand"
        return prefix if r == 0 else f"{prefix} {number_to_words_en(r)}"
    return str(n)


def year_to_words_de(year: int) -> str:
    """Spell out a four-digit year in German.

    Years outside 1000..9999 are returned as digits.  1100..1999 use the
    spoken "hundreds" form (1984 -> "neunzehnhundertvierundachtzig"); all
    other years read like the plain number.
    """
    if year < 1000 or year > 9999:
        return str(year)
    if year == 2000:
        return "zweitausend"
    if 2001 <= year <= 2099:
        return f"zweitausend{number_to_words_de(year - 2000)}"
    if 1100 <= year <= 1999:
        century, rest = divmod(year, 100)
        prefix = f"{number_to_words_de(century)}hundert"
        return prefix if rest == 0 else f"{prefix}{number_to_words_de(rest)}"
    return number_to_words_de(year)


def year_to_words_en(year: int) -> str:
    """Spell out a four-digit year in English.

    Years outside 1000..9999 are returned as digits.  2000..2009 read as
    "two thousand ...", 2010..2099 as "twenty ...", everything else as two
    two-digit halves, with "oh" for a single-digit second half
    (1905 -> "nineteen oh five").
    """
    if year < 1000 or year > 9999:
        return str(year)

    if 2000 <= year <= 2009:
        if year == 2000:
            return "two thousand"
        return f"two thousand {number_to_words_en(year - 2000)}"

    if 2010 <= year <= 2099:
        last_two = year % 100
        return f"twenty {number_to_words_en(last_two)}"

    first_two = year // 100
    last_two = year % 100
    if last_two == 0:
        return f"{number_to_words_en(first_two)} hundred"
    if last_two < 10:
        return f"{number_to_words_en(first_two)} oh {number_to_words_en(last_two)}"
    return f"{number_to_words_en(first_two)} {number_to_words_en(last_two)}"


def spell_out_acronym(token: str, mode: str = "period_space") -> str:
    """Spell *token* letter by letter in the given output *mode*.

    Modes: ``german`` (letters mapped through GERMAN_LETTER_NAMES, joined by
    spaces), ``space`` ("A B C"), ``period`` ("A.B.C."), ``comma``
    ("A, B, C") and ``period_space`` ("A. B. C.").

    Raises:
        ValueError: *mode* is not one of the modes above.
    """
    chars = list(token)
    if mode == "german":
        return " ".join(GERMAN_LETTER_NAMES.get(c, c) for c in chars)
    if mode == "space":
        return " ".join(chars)
    if mode == "period":
        return ".".join(chars) + "."
    if mode == "comma":
        return ", ".join(chars)
    if mode == "period_space":
        return ". ".join(chars) + "."
    raise ValueError(f"Unbekannter mode: {mode}")


def normalize_units(text: str, lang: str) -> str:
    """Expand unit abbreviations that directly follow a digit (German only).

    Longer unit strings are tried first so a short unit cannot match the
    start of a longer one.  Text in other languages is returned unchanged.
    """
    if lang != "de":
        return text
    for unit, expanded in sorted(UNIT_REPLACEMENTS.items(), key=lambda x: len(x[0]), reverse=True):
        text = re.sub(rf'(?<=\d)\s*{re.escape(unit)}\b', f" {expanded}", text)
    return text


def normalize_times(text: str, lang: str) -> str:
    """Rewrite clock times matched by TIME_RE as speakable words.

    The hour is read from group 1 and the minutes from group 3 of each match.
    Only ``de`` and ``en`` are rewritten; other languages, and matches whose
    hour exceeds 24 or whose minutes exceed 59, are left untouched.
    """
    def repl(match: re.Match) -> str:
        hh = int(match.group(1))
        mm = int(match.group(3))

        # Not a valid clock time (e.g. a ratio such as 25:70): keep as-is.
        if hh > 24 or mm > 59:
            return match.group(0)

        if lang == "de":
            if mm == 0:
                return f"{number_to_words_de(hh)} Uhr"
            return f"{number_to_words_de(hh)} Uhr {number_to_words_de(mm)}"

        if lang == "en":
            if hh == 0 and mm == 0:
                return "twelve midnight"
            if hh == 12 and mm == 0:
                return "twelve noon"
            hour12 = hh % 12
            if hour12 == 0:
                hour12 = 12
            suffix = "a m" if hh < 12 else "p m"
            if mm == 0:
                return f"{number_to_words_en(hour12)} {suffix}"
            if mm < 10:
                return f"{number_to_words_en(hour12)} oh {number_to_words_en(mm)} {suffix}"
            return f"{number_to_words_en(hour12)} {number_to_words_en(mm)} {suffix}"

        return match.group(0)

    return TIME_RE.sub(repl, text)


def normalize_years(text: str, lang: str) -> str:
    """Rewrite years matched by YEAR_RE (group 1) as words for ``de``/``en``."""
    def repl(match: re.Match) -> str:
        year = int(match.group(1))
        if lang == "de":
            return year_to_words_de(year)
        if lang == "en":
            return year_to_words_en(year)
        return match.group(0)

    return YEAR_RE.sub(repl, text)


def preprocess_tts_text(
    text: str,
    lang: str,
    spell_uppercase_acronyms: bool = True,
    acronym_mode: Optional[str] = None,  # None = auto: 'german' for de, else 'period_space'
    normalize_time_values: bool = True,
    normalize_year_values: bool = True,
    normalize_units_values: bool = True,
    pronunciation_dict: Optional[dict] = None,
) -> str:
    """Normalise *text* so the TTS model can speak it.

    Steps, in order: pronunciation substitutions (built-in German dictionary
    for ``de``, then *pronunciation_dict*), units, clock times, years,
    acronym spelling, and finally collapsing every whitespace run to a single
    space.  Each normalisation step can be switched off through its flag.

    Returns:
        The normalised, stripped text.
    """
    if acronym_mode is None:
        acronym_mode = "german" if lang == "de" else "period_space"

    # Pronunciation dictionary first (before acronym expansion) so that
    # proper names are matched in their written form.
    if lang == "de":
        text = apply_pronunciation_dict(text, DEFAULT_PRONUNCIATION_DE)
    if pronunciation_dict:
        text = apply_pronunciation_dict(text, pronunciation_dict)

    if normalize_units_values:
        text = normalize_units(text, lang)

    if normalize_time_values:
        text = normalize_times(text, lang)

    if normalize_year_values:
        text = normalize_years(text, lang)

    if spell_uppercase_acronyms:
        def repl_compound(match: re.Match) -> str:
            acr = match.group(1)
            if acr in NON_SPELLED_ACRONYMS:
                return acr + " "
            return spell_out_acronym(acr, mode=acronym_mode) + " "

        def repl(match: re.Match) -> str:
            token = match.group(0)
            if token in NON_SPELLED_ACRONYMS:
                return token
            return spell_out_acronym(token, mode=acronym_mode)

        # Compounds first: "US-Präsident" -> "U Es Präsident" (hyphen dropped).
        text = ACRONYM_COMPOUND_RE.sub(repl_compound, text)
        # Then spell the remaining stand-alone acronyms.
        text = UPPER_ACRONYM_RE.sub(repl, text)

    text = re.sub(r'\s+', ' ', text).strip()
    return text


def _paragraph_sentences(part: str) -> List[str]:
    """Split one paragraph into stripped, non-empty sentences.

    Sentences come from SENTENCE_END_RE; whatever follows the matched text
    (a trailing fragment without a terminator) is appended as a final item.
    """
    sentences = SENTENCE_END_RE.findall(part)
    consumed = "".join(sentences).strip()
    rest = part[len(consumed):].strip()
    if rest:
        sentences.append(rest)
    stripped = (sentence.strip() for sentence in sentences)
    return [sentence for sentence in stripped if sentence]


def split_long_text(text: str, max_len: int = 400) -> List[str]:
    """Group sentences into chunks of at most *max_len* characters.

    Paragraphs (separated by a blank line) are never merged into the same
    chunk.  A single sentence longer than *max_len* is hard-split with
    force_split_sentence().
    """
    chunks = []
    current = ""

    for part in text.split("\n\n"):
        part = part.strip()
        if not part:
            continue

        for sentence in _paragraph_sentences(part):
            if len(sentence) > max_len:
                # Flush what has been collected, then emit the pieces as-is.
                if current:
                    chunks.append(current.strip())
                    current = ""
                chunks.extend(force_split_sentence(sentence, max_len))
                continue

            if current and len(current) + 1 + len(sentence) > max_len:
                chunks.append(current.strip())
                current = sentence
            else:
                current = f"{current} {sentence}".strip() if current else sentence

        # End of paragraph: close the running chunk.
        if current:
            chunks.append(current.strip())
            current = ""

    return chunks


def split_for_conversation(text: str, first_chunk_len: int = 120, max_len: int = 400) -> List[str]:
    """Like split_long_text(), but keep the first chunk short.

    If the first chunk exceeds *first_chunk_len* it is hard-split to that
    length so the first audio can start sooner.
    """
    base_chunks = split_long_text(text, max_len=max_len)
    if not base_chunks:
        return []

    first = base_chunks[0]
    if len(first) <= first_chunk_len:
        return base_chunks

    early = force_split_sentence(first, first_chunk_len)
    return early + base_chunks[1:]


def force_split_sentence(text: str, max_len: int) -> List[str]:
    """Break *text* into pieces of at most *max_len* characters.

    Whitespace is collapsed first.  Pieces are cut at the last space that
    fits; a word longer than *max_len* is cut mid-word.

    Raises:
        ValueError: *max_len* is smaller than 1 (the loop could never make
            progress).
    """
    if max_len < 1:
        raise ValueError("max_len muss mindestens 1 sein.")

    text = re.sub(r"\s+", " ", text).strip()
    if len(text) <= max_len:
        return [text]

    parts = []
    remaining = text
    while len(remaining) > max_len:
        split_pos = remaining.rfind(" ", 0, max_len + 1)
        if split_pos <= 0:
            # No usable space: hard cut at the limit.
            split_pos = max_len
        parts.append(remaining[:split_pos].strip())
        remaining = remaining[split_pos:].strip()
    if remaining:
        parts.append(remaining)
    return parts


def split_into_sentences(text: str, max_len: int = 200) -> List[str]:
    """Return one item per sentence, hard-splitting overlong sentences.

    Paragraphs matching SEPARATOR_LINE_RE are skipped entirely.
    """
    result = []
    for part in text.split("\n\n"):
        part = part.strip()
        if not part:
            continue
        if SEPARATOR_LINE_RE.match(part):
            continue
        for sentence in _paragraph_sentences(part):
            if len(sentence) > max_len:
                result.extend(force_split_sentence(sentence, max_len))
            else:
                result.append(sentence)
    return result


def read_input_text(text_arg: Optional[str], input_path: Optional[str]) -> str:
    """Return the stripped input text from exactly one of the two sources.

    Raises:
        ValueError: both or neither of *text_arg* / *input_path* are given.
        FileNotFoundError: *input_path* does not exist.
    """
    if text_arg and input_path:
        raise ValueError("Bitte entweder --text oder --input angeben, nicht beides.")
    if not text_arg and not input_path:
        raise ValueError("Bitte --text oder --input angeben.")
    if text_arg:
        return text_arg.strip()

    path = Path(input_path)
    if not path.exists():
        raise FileNotFoundError(f"Input-Datei nicht gefunden: {path}")
    return path.read_text(encoding="utf-8").strip()


def default_output_path(input_path: Optional[str], lang: str) -> Path:
    """Derive the WAV path: ``<input>.<lang>.wav`` or ``tts_output.<lang>.wav``."""
    if input_path:
        src = Path(input_path)
        return src.with_suffix(f".{lang}.wav")
    return Path(f"tts_output.{lang}.wav")


def load_model(lang: str, device: str, t3_model: Optional[str] = None):
    """Load the model for *lang* and return ``(model, kind, sample_rate)``.

    English uses the monolingual ChatterboxTTS (kind ``"mono"``; *t3_model*
    is not used).  Every other language uses ChatterboxMultilingualTTS (kind
    ``"multi"``).

    Raises:
        RuntimeError: the multilingual model is needed but not importable.
    """
    if lang == "en":
        model = ChatterboxTTS.from_pretrained(device=device)
        return model, "mono", model.sr

    if not HAS_MULTILINGUAL:
        raise RuntimeError(
            "Multilingual-Modell nicht verfügbar. Installiere ein Chatterbox-Paket mit chatterbox.mtl_tts."
        )

    model = ChatterboxMultilingualTTS.from_pretrained(device=device, t3_model=t3_model)
    return model, "multi", model.sr


def generate_chunk(model, model_kind: str, text: str, lang: str, voice_path: Optional[str]):
    """Synthesise *text* in one call and return what ``model.generate`` returns."""
    kwargs = {}
    if voice_path:
        kwargs["audio_prompt_path"] = voice_path

    if model_kind == "mono":
        return model.generate(text, **kwargs)
    return model.generate(text, language_id=lang, **kwargs)


def generate_stream_chunk(
    model,
    model_kind: str,
    text: str,
    lang: str,
    voice_path: Optional[str],
    stream_chunk_size: int,
):
    """Start streaming synthesis of *text*; returns ``model.generate_stream(...)``."""
    kwargs = {
        "chunk_size": stream_chunk_size,
        "print_metrics": False,
    }
    if voice_path:
        kwargs["audio_prompt_path"] = voice_path

    if model_kind == "mono":
        return model.generate_stream(text, **kwargs)
    return model.generate_stream(text, language_id=lang, **kwargs)


class PlaybackWorker:
    """Plays queued audio tensors through a sounddevice output stream.

    Tensors handed to put() are converted by a producer thread into
    fixed-size float32 blocks (optionally time-stretched, then resampled to
    PLAYBACK_RATE); the stream callback pops one block per invocation.
    stop() drains everything and re-raises the first background error.
    """

    PLAYBACK_RATE = 48000   # PipeWire/PulseAudio standard
    CALLBACK_BLOCK = 2048   # ~43 ms per callback block at 48 kHz

    def __init__(self, sample_rate: int, device: Optional[str] = "pulse", speed: float = 1.0):
        """Store settings; no thread or stream is created until start().

        Raises:
            ValueError: *speed* is not greater than zero.
        """
        if speed <= 0:
            raise ValueError("speed muss größer als 0 sein.")
        self.sample_rate = sample_rate
        self.device = device
        self.speed = speed
        # Input side: torch tensors from the TTS model; None is the stop sentinel.
        self.audio_queue: "queue.Queue[Optional[torch.Tensor]]" = queue.Queue()
        # Internal: ready-made numpy blocks for the callback.
        self._block_queue: "queue.Queue" = queue.Queue(maxsize=500)
        # Each counter has exactly one writer (producer / callback).
        self._blocks_produced = 0
        self._blocks_consumed = 0
        self.thread = None
        self.error = None

    def start(self):
        """Start the background playback thread.

        Raises:
            RuntimeError: the ``sounddevice`` module is not installed.
        """
        if not has_module("sounddevice"):
            raise RuntimeError(
                "Für Live-Wiedergabe ist das Modul 'sounddevice' nötig. Installiere z. B. 'pip install sounddevice'."
            )
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def _callback(self, outdata, frames, time_info, status):
        """Stream callback: copy the next block into *outdata*, or silence."""
        # Runs in the audio thread: keep it as fast as possible, no lock needed.
        try:
            data = self._block_queue.get_nowait()
            outdata[:, 0] = data
            self._blocks_consumed += 1
        except queue.Empty:
            outdata[:] = 0.0  # silence instead of an underrun click

    def _produce(self):
        """Producer thread entry point; records any failure in ``self.error``.

        Without this guard an exception (e.g. pyrubberband missing) would
        end the thread silently and playback would just stay quiet.
        """
        try:
            self._produce_blocks()
        except Exception as e:
            if self.error is None:
                self.error = e

    def _produce_blocks(self):
        """Convert queued torch tensors into CALLBACK_BLOCK-sized numpy arrays."""
        import numpy as np

        pyrb = None
        if self.speed != 1.0:
            import pyrubberband as pyrb

        remainder = np.zeros(0, dtype="float32")
        while True:
            item = self.audio_queue.get()
            if item is None:
                break
            chunk = item.detach().cpu()
            if chunk.ndim == 2:
                chunk = chunk.squeeze(0)
            if pyrb is not None:
                # R3 engine (--fine): noticeably less phasiness than R2, better for speech.
                # rate < 1.0 = slower, rate > 1.0 = faster; pitch stays the same.
                stretched = pyrb.time_stretch(
                    chunk.numpy().astype("float64"),
                    self.sample_rate,
                    self.speed,
                    rbargs={"--fine": ""},
                )
                chunk = torch.from_numpy(stretched.astype("float32"))
            chunk = ta.functional.resample(chunk, self.sample_rate, self.PLAYBACK_RATE)
            # Prepend the leftover of the previous chunk so blocks stay gapless.
            samples = np.concatenate([remainder, chunk.numpy().astype("float32")])
            i = 0
            while i + self.CALLBACK_BLOCK <= len(samples):
                self._block_queue.put(samples[i : i + self.CALLBACK_BLOCK])
                self._blocks_produced += 1
                i += self.CALLBACK_BLOCK
            remainder = samples[i:]

        # Pad the remaining samples (< CALLBACK_BLOCK) with silence.
        if len(remainder) > 0:
            block = np.zeros(self.CALLBACK_BLOCK, dtype="float32")
            block[: len(remainder)] = remainder
            self._block_queue.put(block)
            self._blocks_produced += 1

    def _run(self):
        """Playback thread: run producer and stream until all audio is played."""
        try:
            import sounddevice as sd

            producer = threading.Thread(target=self._produce, daemon=True)
            producer.start()

            with sd.OutputStream(
                samplerate=self.PLAYBACK_RATE,
                channels=1,
                dtype="float32",
                device=self.device,
                blocksize=self.CALLBACK_BLOCK,
                callback=self._callback,
            ):
                producer.join()  # all tensors have been converted to blocks
                # Wait until the callback has played every block.
                while self._blocks_consumed < self._blocks_produced:
                    time.sleep(0.02)
                # Let the last block drain from the hardware buffer.
                time.sleep(self.CALLBACK_BLOCK / self.PLAYBACK_RATE + 0.1)
        except Exception as e:
            self.error = e

    def put(self, chunk: torch.Tensor):
        """Queue one audio tensor for playback."""
        self.audio_queue.put(chunk)

    def stop(self):
        """Signal end of input, wait for playback to finish, surface errors.

        Raises:
            RuntimeError: the playback or producer thread failed.
        """
        self.audio_queue.put(None)
        if self.thread:
            self.thread.join()
        if self.error:
            raise RuntimeError(f"Fehler bei Live-Wiedergabe: {self.error}") from self.error


def _validate_request(lang: str, voice_path: Optional[str]) -> None:
    """Raise if *lang* is unsupported or the voice reference file is missing."""
    if lang not in SUPPORTED_LANGS:
        raise ValueError(
            f"Nicht unterstützte Sprache '{lang}'. Unterstützt: {', '.join(sorted(SUPPORTED_LANGS))}"
        )

    if voice_path and not Path(voice_path).exists():
        raise FileNotFoundError(f"Voice-Referenz nicht gefunden: {voice_path}")


def synthesize_non_streaming(
    text: str,
    lang: str,
    output_path: Optional[Path],
    max_len: int,
    first_chunk_len: int,
    voice_path: Optional[str],
    device: str,
    show_progress: bool = True,
    spell_uppercase_acronyms: bool = True,
    acronym_mode: Optional[str] = None,
    normalize_time_values: bool = True,
    normalize_year_values: bool = True,
    normalize_units_values: bool = True,
    conversation_mode: bool = True,
    play_audio: bool = False,
    save_wav: bool = True,
    audio_device: Optional[str] = "pulse",
    sentence_mode: bool = True,
    speed: float = 1.0,
    debug_delay: float = 0.0,
    t3_model: Optional[str] = None,
    pronunciation_dict: Optional[dict] = None,
) -> Optional[Path]:
    """Synthesise *text* chunk by chunk with ``model.generate``.

    The text is cleaned, split (per sentence, conversation-style or into
    long chunks, depending on *sentence_mode* / *conversation_mode*), and
    each chunk is then normalised and generated.  Generated audio is queued
    for live playback when *play_audio* is set and concatenated into one WAV
    when *save_wav* is set and *output_path* is given.

    Returns:
        *output_path* if a WAV file was written, otherwise ``None``.

    Raises:
        ValueError: unsupported *lang*, or no usable text.
        FileNotFoundError: *voice_path* does not exist.
        RuntimeError: model unavailable or live playback failed.
    """
    _validate_request(lang, voice_path)

    # Strip invisible characters first, then split into sentences (keeping the
    # paragraph structure), and only then expand acronyms -- otherwise the
    # periods of "A. R. D." create false sentence boundaries.
    text = clean_raw_text(text)

    if sentence_mode:
        raw_chunks = split_into_sentences(text, max_len=max_len)
    elif conversation_mode:
        raw_chunks = split_for_conversation(text, first_chunk_len=first_chunk_len, max_len=max_len)
    else:
        raw_chunks = split_long_text(text, max_len=max_len)

    preprocess_kw = dict(
        lang=lang,
        spell_uppercase_acronyms=spell_uppercase_acronyms,
        acronym_mode=acronym_mode,
        normalize_time_values=normalize_time_values,
        normalize_year_values=normalize_year_values,
        normalize_units_values=normalize_units_values,
        pronunciation_dict=pronunciation_dict,
    )
    chunks = [preprocess_tts_text(c, **preprocess_kw) for c in raw_chunks]
    chunks = [c for c in chunks if c.strip()]

    if not chunks:
        raise ValueError("Kein verwertbarer Text nach dem Einlesen gefunden.")

    # Load the model only after the text proved usable: loading is the
    # expensive step.
    model, model_kind, sr = load_model(lang, device, t3_model=t3_model)

    if show_progress:
        print(f"Sprache: {lang}")
        print(f"Gerät: {device}")
        print(f"Modell: {'ChatterboxTTS (monolingual)' if model_kind == 'mono' else 'ChatterboxMultilingualTTS'}")
        print(f"Sätze: {len(chunks)}")
        print(f"Modus: {'Satz-für-Satz' if sentence_mode else 'non-streaming'} + Playback")
        print(f"Live-Wiedergabe: {'ja' if play_audio else 'nein'}")
        print(f"WAV speichern: {'ja' if save_wav and output_path else 'nein'}")
        if output_path and save_wav:
            print(f"Ausgabe: {output_path}")

    if play_audio:
        playback = PlaybackWorker(sample_rate=sr, device=audio_device, speed=speed)
        playback.start()
    else:
        playback = None

    wavs = []

    try:
        for i, chunk in enumerate(chunks, start=1):
            if debug_delay > 0:
                if show_progress:
                    print(f"[{i}/{len(chunks)}] Warte {debug_delay:.0f}s (debug_delay) ...")
                time.sleep(debug_delay)
            if show_progress:
                print(f"[{i}/{len(chunks)}] Generiere ({len(chunk)} Zeichen) ...")
            wav = generate_chunk(model, model_kind, chunk, lang, voice_path)
            wavs.append(wav)
            if playback is not None:
                playback.put(wav)
    finally:
        # Always drain and close playback, even if generation failed.
        if playback is not None:
            playback.stop()

    if not wavs:
        return None

    final_wav = wavs[0] if len(wavs) == 1 else torch.cat(wavs, dim=-1)

    if save_wav and output_path:
        output_path.parent.mkdir(parents=True, exist_ok=True)
        ta.save(str(output_path), final_wav, sr)
        return output_path

    return None


def synthesize_streaming(
    text: str,
    lang: str,
    output_path: Optional[Path],
    max_len: int,
    first_chunk_len: int,
    voice_path: Optional[str],
    device: str,
    show_progress: bool = True,
    spell_uppercase_acronyms: bool = True,
    acronym_mode: str = "period_space",
    normalize_time_values: bool = True,
    normalize_year_values: bool = True,
    normalize_units_values: bool = True,
    conversation_mode: bool = True,
    play_audio: bool = True,
    save_wav: bool = True,
    stream_chunk_size: int = 25,
    audio_device: Optional[str] = None,
    pronunciation_dict: Optional[dict] = None,
) -> Optional[Path]:
    """Synthesise *text* with ``model.generate_stream`` and play chunks as they arrive.

    As in synthesize_non_streaming(), the raw text is cleaned and split
    first and each chunk is normalised afterwards, so paragraph breaks
    survive and spelled-out acronyms cannot create false sentence
    boundaries.  Items yielded by the stream may be either an audio tensor or
    an ``(audio, metrics)`` pair.

    Returns:
        *output_path* if a WAV file was written, otherwise ``None`` (also
        when the model produced no audio at all).

    Raises:
        ValueError: unsupported *lang*, or no usable text.
        FileNotFoundError: *voice_path* does not exist.
        RuntimeError: the model has no ``generate_stream`` or playback failed.
    """
    _validate_request(lang, voice_path)

    text = clean_raw_text(text)

    if conversation_mode:
        raw_chunks = split_for_conversation(text, first_chunk_len=first_chunk_len, max_len=max_len)
    else:
        raw_chunks = split_long_text(text, max_len=max_len)

    text_chunks = [
        preprocess_tts_text(
            text=raw_chunk,
            lang=lang,
            spell_uppercase_acronyms=spell_uppercase_acronyms,
            acronym_mode=acronym_mode,
            normalize_time_values=normalize_time_values,
            normalize_year_values=normalize_year_values,
            normalize_units_values=normalize_units_values,
            pronunciation_dict=pronunciation_dict,
        )
        for raw_chunk in raw_chunks
    ]
    text_chunks = [c for c in text_chunks if c.strip()]

    if not text_chunks:
        raise ValueError("Kein verwertbarer Text nach dem Einlesen gefunden.")

    model, model_kind, sr = load_model(lang, device)

    if not hasattr(model, "generate_stream"):
        raise RuntimeError(
            "Dieses Chatterbox-Paket bietet kein generate_stream(). "
            "Installiere z. B. 'chatterbox-streaming'."
        )

    if play_audio:
        playback = PlaybackWorker(sample_rate=sr, device=audio_device)
        playback.start()
    else:
        playback = None

    all_audio_chunks: List[torch.Tensor] = []
    t0 = time.perf_counter()
    first_audio_started = False

    if show_progress:
        print(f"Sprache: {lang}")
        print(f"Gerät: {device}")
        print(f"Modell: {'ChatterboxTTS (monolingual)' if model_kind == 'mono' else 'ChatterboxMultilingualTTS'}")
        print(f"Text-Chunks: {len(text_chunks)}")
        print("Modus: streaming")
        print(f"Gesprächsmodus: {'ja' if conversation_mode else 'nein'}")
        print(f"Live-Wiedergabe: {'ja' if play_audio else 'nein'}")
        print(f"WAV speichern: {'ja' if save_wav and output_path else 'nein'}")
        print(f"Streaming chunk_size: {stream_chunk_size}")
        if output_path and save_wav:
            print(f"Ausgabe: {output_path}")

    try:
        for text_idx, text_chunk in enumerate(text_chunks, start=1):
            if show_progress:
                print(f"[Text {text_idx}/{len(text_chunks)}] Starte Streaming für {len(text_chunk)} Zeichen ...")

            stream_iter = generate_stream_chunk(
                model=model,
                model_kind=model_kind,
                text=text_chunk,
                lang=lang,
                voice_path=voice_path,
                stream_chunk_size=stream_chunk_size,
            )

            for audio_idx, item in enumerate(stream_iter, start=1):
                if isinstance(item, tuple) and len(item) == 2:
                    audio_chunk, metrics = item
                else:
                    audio_chunk, metrics = item, None

                all_audio_chunks.append(audio_chunk)

                if playback is not None:
                    playback.put(audio_chunk)

                if not first_audio_started:
                    first_audio_started = True
                    if show_progress:
                        dt = time.perf_counter() - t0
                        print(f"Audio-Wiedergabe gestartet nach {dt:.3f}s")

                if show_progress:
                    msg = f" -> Audio-Chunk {audio_idx}"
                    if metrics is not None:
                        latency = getattr(metrics, "latency_to_first_chunk", None)
                        rtf = getattr(metrics, "rtf", None)
                        chunk_count = getattr(metrics, "chunk_count", None)
                        if chunk_count is not None:
                            msg += f", model_chunk={chunk_count}"
                        if latency:
                            msg += f", first_latency={latency:.3f}s"
                        if rtf:
                            msg += f", rtf={rtf:.3f}"
                    print(msg)

    finally:
        # Always drain and close playback, even if streaming failed.
        if playback is not None:
            playback.stop()

    # Nothing was generated: torch.cat() on an empty list would raise.
    if not all_audio_chunks:
        return None

    final_output = None
    if save_wav and output_path:
        final_audio = all_audio_chunks[0] if len(all_audio_chunks) == 1 else torch.cat(all_audio_chunks, dim=-1)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        ta.save(str(output_path), final_audio, sr)
        final_output = output_path

    return final_output


def build_argparser() -> argparse.ArgumentParser:
    """Build the command-line parser for this tool."""
    p = argparse.ArgumentParser(
        description="Low-Latency Chatterbox TTS CLI mit deutscher Text-Normalisierung und optionalem Streaming."
    )
    p.add_argument("--text", type=str, help="Direkter Eingabetext.")
    p.add_argument("--input", type=str, help="Pfad zu UTF-8-Textdatei.")
    p.add_argument("--lang", type=str, default="de", help="Sprachcode, default: de.")
    p.add_argument("--len", dest="max_len", type=int, default=400, help="Maximale Chunk-Länge, default: 400.")
    p.add_argument("--first-chunk-len", type=int, default=80,
                   help="Kleinere Zielgröße für den ersten Chunk im Gesprächsmodus. Default: 80.")
    p.add_argument("--output", type=str, help="Ausgabedatei .wav")
    p.add_argument("--voice", type=str, help="Optionale Referenz-WAV für Voice-Cloning.")
    p.add_argument("--device", type=str, default=None, help="z. B. cuda:0 oder cpu.")
    p.add_argument("--no-progress", action="store_true", help="Weniger Konsolen-Output.")

    p.add_argument("--no-spell-acronyms", action="store_true",
                   help="Großgeschriebene Akronyme nicht buchstabieren.")
    p.add_argument(
        "--acronym-mode",
        type=str,
        default=None,  # None = automatic: 'german' for de, 'period_space' otherwise
        choices=["space", "period", "comma", "period_space", "german"],
        help="Ausgabeformat für buchstabierte Akronyme. Default: 'german' bei --lang de, sonst 'period_space'."
    )
    p.add_argument("--pronunciation-dict", type=str, default=None,
                   help="Pfad zu einer JSON-Datei mit Aussprache-Substitutionen (Eigenname → Lautschrift).")

    p.add_argument("--no-normalize-times", action="store_true",
                   help="Uhrzeiten nicht in sprechbaren Text umwandeln.")
    p.add_argument("--no-normalize-years", action="store_true",
                   help="Jahreszahlen nicht in sprechbaren Text umwandeln.")
    p.add_argument("--no-normalize-units", action="store_true",
                   help="Einheiten nicht in sprechbaren Text umwandeln.")

    p.add_argument("--stream", action="store_true",
                   help="Streaming-TTS-Modus (experimentell, kann abgehackt klingen).")
    p.add_argument("--no-play", action="store_true", help="Nicht live abspielen.")
    p.add_argument("--audio-device", type=str, default="pulse",
                   help="Sounddevice-Ausgabegerät, z. B. 'pulse' oder 'M2: USB Audio'. Standard: pulse.")
    p.add_argument("--save", action="store_true", help="WAV-Datei speichern (Standard: nein).")
    p.add_argument("--stream-chunk-size", type=int, default=12,
                   help="Streaming chunk_size (nur mit --stream). Default: 12.")
    p.add_argument("--no-sentence-mode", action="store_true",
                   help="Sätze zu größeren Chunks gruppieren statt einzeln ausgeben.")
    p.add_argument("--speed", type=float, default=1.0,
                   help="Wiedergabegeschwindigkeit: 0.8 = 20%% langsamer, 1.2 = 20%% schneller. Default: 1.0.")
    p.add_argument("--debug-delay", type=float, default=0.0,
                   help="Sekunden Pause vor jedem Satz (simuliert langsame KI). Nur zum Testen.")
    p.add_argument("--t3-model", type=str, default="v3",
                   help="Multilingual T3-Modell: 'v3' (default), 'v2' oder Dateiname.")
    p.add_argument("--no-conversation-mode", action="store_true",
                   help="Ersten Chunk nicht künstlich kleiner machen (nur ohne --no-sentence-mode).")
    return p


def main() -> int:
    """CLI entry point; returns the process exit code (0 ok, 1 on error)."""
    parser = build_argparser()
    args = parser.parse_args()

    try:
        text = read_input_text(args.text, args.input)
        device = get_device(args.device)
        output_path = Path(args.output) if args.output else default_output_path(args.input, args.lang)
        # Giving --output implies saving, even without --save.
        save_wav = args.save or bool(args.output)

        # Acronym mode default: 'german' for German, 'period_space' otherwise.
        acronym_mode = args.acronym_mode or ("german" if args.lang == "de" else "period_space")

        # Load the optional pronunciation dictionary.
        pronunciation_dict: Optional[dict] = None
        if args.pronunciation_dict:
            import json
            pron_path = Path(args.pronunciation_dict)
            if not pron_path.exists():
                raise FileNotFoundError(f"Aussprache-Dict nicht gefunden: {pron_path}")
            pronunciation_dict = json.loads(pron_path.read_text(encoding="utf-8"))
            # The substitution code needs a str -> str mapping; reject anything
            # else here instead of failing later with an obscure error.
            if not isinstance(pronunciation_dict, dict) or not all(
                isinstance(k, str) and isinstance(v, str) for k, v in pronunciation_dict.items()
            ):
                raise ValueError("Aussprache-Dict muss ein JSON-Objekt mit String-Werten sein.")

        if args.stream:
            # NOTE(review): --speed, --debug-delay, --t3-model and
            # --no-sentence-mode are not forwarded in streaming mode.
            out = synthesize_streaming(
                text=text,
                lang=args.lang,
                output_path=output_path if save_wav else None,
                max_len=args.max_len,
                first_chunk_len=args.first_chunk_len,
                voice_path=args.voice,
                device=device,
                show_progress=not args.no_progress,
                spell_uppercase_acronyms=not args.no_spell_acronyms,
                acronym_mode=acronym_mode,
                normalize_time_values=not args.no_normalize_times,
                normalize_year_values=not args.no_normalize_years,
                normalize_units_values=not args.no_normalize_units,
                conversation_mode=not args.no_conversation_mode,
                play_audio=not args.no_play,
                save_wav=save_wav,
                stream_chunk_size=args.stream_chunk_size,
                audio_device=args.audio_device,
                pronunciation_dict=pronunciation_dict,
            )
        else:
            out = synthesize_non_streaming(
                text=text,
                lang=args.lang,
                output_path=output_path if save_wav else None,
                max_len=args.max_len,
                first_chunk_len=args.first_chunk_len,
                voice_path=args.voice,
                device=device,
                show_progress=not args.no_progress,
                spell_uppercase_acronyms=not args.no_spell_acronyms,
                acronym_mode=acronym_mode,
                normalize_time_values=not args.no_normalize_times,
                normalize_year_values=not args.no_normalize_years,
                normalize_units_values=not args.no_normalize_units,
                conversation_mode=not args.no_conversation_mode,
                play_audio=not args.no_play,
                save_wav=save_wav,
                audio_device=args.audio_device,
                sentence_mode=not args.no_sentence_mode,
                speed=args.speed,
                debug_delay=args.debug_delay,
                t3_model=args.t3_model,
                pronunciation_dict=pronunciation_dict,
            )

        if out is not None:
            print(f"Fertig: {out}")
        else:
            print("Fertig.")
        return 0

    except Exception as e:
        # Top-level boundary: report any failure as a one-line CLI error.
        print(f"Fehler: {e}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    raise SystemExit(main())