import time import requests import pathlib from io import BytesIO from playsound import playsound from webscout import exceptions from webscout.AIbase import TTSProvider from webscout.litagent import LitAgent from concurrent.futures import ThreadPoolExecutor, as_completed """ Text processing utilities for TTS providers. """ from typing import List, Dict, Tuple, Set, Optional, Pattern import re class SentenceTokenizer: """Advanced sentence tokenizer with support for complex cases and proper formatting.""" def __init__(self) -> None: # Common abbreviations by category self.TITLES: Set[str] = { 'mr', 'mrs', 'ms', 'dr', 'prof', 'rev', 'sr', 'jr', 'esq', 'hon', 'pres', 'gov', 'atty', 'supt', 'det', 'rev', 'col','maj', 'gen', 'capt', 'cmdr', 'lt', 'sgt', 'cpl', 'pvt' } self.ACADEMIC: Set[str] = { 'ph.d', 'phd', 'm.d', 'md', 'b.a', 'ba', 'm.a', 'ma', 'd.d.s', 'dds', 'm.b.a', 'mba', 'b.sc', 'bsc', 'm.sc', 'msc', 'llb', 'll.b', 'bl' } self.ORGANIZATIONS: Set[str] = { 'inc', 'ltd', 'co', 'corp', 'llc', 'llp', 'assn', 'bros', 'plc', 'cos', 'intl', 'dept', 'est', 'dist', 'mfg', 'div' } self.MONTHS: Set[str] = { 'jan', 'feb', 'mar', 'apr', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec' } self.UNITS: Set[str] = { 'oz', 'pt', 'qt', 'gal', 'ml', 'cc', 'km', 'cm', 'mm', 'ft', 'in', 'kg', 'lb', 'lbs', 'hz', 'khz', 'mhz', 'ghz', 'kb', 'mb', 'gb', 'tb' } self.TECHNOLOGY: Set[str] = { 'v', 'ver', 'app', 'sys', 'dir', 'exe', 'lib', 'api', 'sdk', 'url', 'cpu', 'gpu', 'ram', 'rom', 'hdd', 'ssd', 'lan', 'wan', 'sql', 'html' } self.MISC: Set[str] = { 'vs', 'etc', 'ie', 'eg', 'no', 'al', 'ca', 'cf', 'pp', 'est', 'st', 'approx', 'appt', 'apt', 'dept', 'depts', 'min', 'max', 'avg' } # Combine all abbreviations self.all_abbreviations: Set[str] = ( self.TITLES | self.ACADEMIC | self.ORGANIZATIONS | self.MONTHS | self.UNITS | self.TECHNOLOGY | self.MISC ) # Special patterns self.ELLIPSIS: str = r'\.{2,}|…' self.URL_PATTERN: str = ( r'(?:https?:\/\/|www\.)[\w\-\.]+\.[a-zA-Z]{2,}(?:\/[^\s]*)?' ) self.EMAIL_PATTERN: str = r'[\w\.-]+@[\w\.-]+\.\w+' self.NUMBER_PATTERN: str = ( r'\d+(?:\.\d+)?(?:%|°|km|cm|mm|m|kg|g|lb|ft|in|mph|kmh|hz|mhz|ghz)?' ) # Quote and bracket pairs self.QUOTE_PAIRS: Dict[str, str] = { '"': '"', "'": "'", '"': '"', "「": "」", "『": "』", "«": "»", "‹": "›", "'": "'", "‚": "'" } self.BRACKETS: Dict[str, str] = { '(': ')', '[': ']', '{': '}', '⟨': '⟩', '「': '」', '『': '』', '【': '】', '〖': '〗', '「': '」' } # Compile regex patterns self._compile_patterns() def _compile_patterns(self) -> None: """Compile regex patterns for better performance.""" # Pattern for finding potential sentence boundaries self.SENTENCE_END: Pattern = re.compile( r''' # Group for sentence endings (?: # Standard endings with optional quotes/brackets (?<=[.!?])[\"\'\)\]\}»›」』\s]* # Ellipsis |(?:\.{2,}|…) # Asian-style endings |(?<=[。!?」』】\s]) ) # Must be followed by whitespace and capital letter or number (?=\s+(?:[A-Z0-9]|["'({[\[「『《‹〈][A-Z])) ''', re.VERBOSE ) # Pattern for abbreviations abbrev_pattern = '|'.join(re.escape(abbr) for abbr in self.all_abbreviations) self.ABBREV_PATTERN: Pattern = re.compile( fr'\b(?:{abbrev_pattern})\.?', re.IGNORECASE ) def _protect_special_cases(self, text: str) -> Tuple[str, Dict[str, str]]: """Protect URLs, emails, and other special cases from being split.""" protected = text placeholders: Dict[str, str] = {} counter = 0 # Protect URLs and emails for pattern in [self.URL_PATTERN, self.EMAIL_PATTERN]: for match in re.finditer(pattern, protected): placeholder = f'__PROTECTED_{counter}__' placeholders[placeholder] = match.group() protected = protected.replace(match.group(), placeholder) counter += 1 # Protect quoted content stack = [] protected_chars = list(protected) i = 0 while i < len(protected_chars): char = protected_chars[i] if char in self.QUOTE_PAIRS: stack.append((char, i)) elif stack and char == self.QUOTE_PAIRS[stack[-1][0]]: start_quote, start_idx = stack.pop() content = ''.join(protected_chars[start_idx:i + 1]) placeholder = f'__PROTECTED_{counter}__' placeholders[placeholder] = content protected_chars[start_idx:i + 1] = list(placeholder) counter += 1 i += 1 return ''.join(protected_chars), placeholders def _restore_special_cases(self, text: str, placeholders: Dict[str, str]) -> str: """Restore protected content.""" restored = text for placeholder, original in placeholders.items(): restored = restored.replace(placeholder, original) return restored def _handle_abbreviations(self, text: str) -> str: """Handle abbreviations to prevent incorrect sentence splitting.""" def replace_abbrev(match: re.Match) -> str: abbr = match.group().lower().rstrip('.') if abbr in self.all_abbreviations: return match.group().replace('.', '__DOT__') return match.group() return self.ABBREV_PATTERN.sub(replace_abbrev, text) def _normalize_whitespace(self, text: str) -> str: """Normalize whitespace while preserving paragraph breaks.""" # Replace multiple newlines with special marker text = re.sub(r'\n\s*\n', ' __PARA__ ', text) # Normalize remaining whitespace text = re.sub(r'\s+', ' ', text) return text.strip() def _restore_formatting(self, sentences: List[str]) -> List[str]: """Restore original formatting and clean up sentences.""" restored = [] for sentence in sentences: # Restore dots in abbreviations sentence = sentence.replace('__DOT__', '.') # Restore paragraph breaks sentence = sentence.replace('__PARA__', '\n\n') # Clean up whitespace sentence = re.sub(r'\s+', ' ', sentence).strip() # Capitalize first letter if it's lowercase and not an abbreviation words = sentence.split() if words and words[0].lower() not in self.all_abbreviations: sentence = sentence[0].upper() + sentence[1:] if sentence: restored.append(sentence) return restored def tokenize(self, text: str) -> List[str]: """ Split text into sentences while handling complex cases. Args: text (str): Input text to split into sentences. Returns: List[str]: List of properly formatted sentences. """ if not text or not text.strip(): return [] # Step 1: Protect special cases protected_text, placeholders = self._protect_special_cases(text) # Step 2: Normalize whitespace protected_text = self._normalize_whitespace(protected_text) # Step 3: Handle abbreviations protected_text = self._handle_abbreviations(protected_text) # Step 4: Split into potential sentences potential_sentences = self.SENTENCE_END.split(protected_text) # Step 5: Process and restore formatting sentences = self._restore_formatting(potential_sentences) # Step 6: Restore special cases sentences = [self._restore_special_cases(s, placeholders) for s in sentences] # Step 7: Post-process sentences final_sentences = [] current_sentence = [] for sentence in sentences: # Skip empty sentences if not sentence.strip(): continue # Check if sentence might be continuation of previous if current_sentence and sentence[0].islower(): current_sentence.append(sentence) else: if current_sentence: final_sentences.append(' '.join(current_sentence)) current_sentence = [sentence] # Add last sentence if exists if current_sentence: final_sentences.append(' '.join(current_sentence)) return final_sentences def split_sentences(text: str) -> List[str]: """ Convenience function to split text into sentences using SentenceTokenizer. Args: text (str): Input text to split into sentences. Returns: List[str]: List of properly formatted sentences. """ tokenizer = SentenceTokenizer() return tokenizer.tokenize(text) class ElevenlabsTTS(TTSProvider): """ Text-to-speech provider using the ElevenlabsTTS API. """ # Request headers headers: dict[str, str] = { "User-Agent": LitAgent().random() } cache_dir = pathlib.Path("./audio_cache") all_voices: dict[str, str] = {"Brian": "nPczCjzI2devNBz1zQrb", "Alice":"Xb7hH8MSUJpSbSDYk0k2", "Bill":"pqHfZKP75CvOlQylNhV4", "Callum":"N2lVS1w4EtoT3dr4eOWO", "Charlie":"IKne3meq5aSn9XLyUdCD", "Charlotte":"XB0fDUnXU5powFXDhCwa", "Chris":"iP95p4xoKVk53GoZ742B", "Daniel":"onwK4e9ZLuTAKqWW03F9", "Eric":"cjVigY5qzO86Huf0OWal", "George":"JBFqnCBsd6RMkjVDRZzb", "Jessica":"cgSgspJ2msm6clMCkdW9", "Laura":"FGY2WhTYpPnrIDTdsKH5", "Liam":"TX3LPaxmHKxFdv7VOQHJ", "Lily":"pFZP5JQG7iQjIQuC4Bku", "Matilda":"XrExE9yKIg1WjnnlVkGX", "Sarah":"EXAVITQu4vr4xnSDxMaL", "Will":"bIHbv24MWmeRgasZH58o", "Neal":"Zp1aWhL05Pi5BkhizFC3"} def __init__(self, timeout: int = 20, proxies: dict = None): """Initializes the ElevenlabsTTS TTS client.""" self.session = requests.Session() self.session.headers.update(self.headers) if proxies: self.session.proxies.update(proxies) self.timeout = timeout self.params = {'allow_unauthenticated': '1'} def tts(self, text: str, voice: str = "Brian", verbose:bool = True) -> str: """ Converts text to speech using the ElevenlabsTTS API and saves it to a file. """ assert ( voice in self.all_voices ), f"Voice '{voice}' not one of [{', '.join(self.all_voices.keys())}]" filename = self.cache_dir / f"{int(time.time())}.mp3" # Split text into sentences sentences = split_sentences(text) # Function to request audio for each chunk def generate_audio_for_chunk(part_text: str, part_number: int): while True: try: json_data = {'text': part_text, 'model_id': 'eleven_multilingual_v2'} response = self.session.post(f'https://api.elevenlabs.io/v1/text-to-speech/{self.all_voices[voice]}',params=self.params, headers=self.headers, json=json_data, timeout=self.timeout) response.raise_for_status() # Create the audio_cache directory if it doesn't exist self.cache_dir.mkdir(parents=True, exist_ok=True) # Check if the request was successful if response.ok and response.status_code == 200: return part_number, response.content else: raise exceptions.FailedToGenerateResponseError( f"Failed to generate audio for chunk {part_number}: {response.status_code}" ) except requests.RequestException as e: time.sleep(1) continue try: # Using ThreadPoolExecutor to handle requests concurrently with ThreadPoolExecutor() as executor: futures = {executor.submit(generate_audio_for_chunk, sentence.strip(), chunk_num): chunk_num for chunk_num, sentence in enumerate(sentences, start=1)} # Dictionary to store results with order preserved audio_chunks = {} for future in as_completed(futures): chunk_num = futures[future] try: part_number, audio_data = future.result() audio_chunks[part_number] = audio_data except Exception as e: raise exceptions.FailedToGenerateResponseError( f"Failed to generate audio for chunk {chunk_num}: {e}" ) # Combine audio chunks in the correct sequence combined_audio = BytesIO() for part_number in sorted(audio_chunks.keys()): combined_audio.write(audio_chunks[part_number]) # Save the combined audio data to a single file with open(filename, 'wb') as f: f.write(combined_audio.getvalue()) return filename.as_posix() except requests.exceptions.RequestException as e: raise exceptions.FailedToGenerateResponseError( f"Failed to perform the operation: {e}" ) # Example usage if __name__ == "__main__": elevenlabs = ElevenlabsTTS() text = "This is a test of the ElevenlabsTTS text-to-speech API. It supports multiple sentences and advanced logging." audio_file = elevenlabs.tts(text, voice="Brian")