"""Text processing utilities for TTS providers."""

import pathlib
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from io import BytesIO
from typing import Dict, List, Optional, Pattern, Set, Tuple

import requests
from playsound import playsound

from webscout import exceptions
from webscout.AIbase import TTSProvider
from webscout.litagent import LitAgent


class SentenceTokenizer:
    """Advanced sentence tokenizer with support for complex cases and proper formatting."""

    def __init__(self) -> None:
        # Personal and professional titles that commonly precede a name.
        self.TITLES: Set[str] = {
            'mr', 'mrs', 'ms', 'dr', 'prof', 'rev', 'sr', 'jr', 'esq',
            'hon', 'pres', 'gov', 'atty', 'supt', 'det', 'col', 'maj',
            'gen', 'capt', 'cmdr', 'lt', 'sgt', 'cpl', 'pvt'
        }

        # Academic degrees and credentials.
        self.ACADEMIC: Set[str] = {
            'ph.d', 'phd', 'm.d', 'md', 'b.a', 'ba', 'm.a', 'ma', 'd.d.s', 'dds',
            'm.b.a', 'mba', 'b.sc', 'bsc', 'm.sc', 'msc', 'llb', 'll.b', 'bl'
        }

        # Organizational suffixes.
        self.ORGANIZATIONS: Set[str] = {
            'inc', 'ltd', 'co', 'corp', 'llc', 'llp', 'assn', 'bros', 'plc', 'cos',
            'intl', 'dept', 'est', 'dist', 'mfg', 'div'
        }

        # Month abbreviations ('may' is absent, presumably to avoid
        # matching the common word).
        self.MONTHS: Set[str] = {
            'jan', 'feb', 'mar', 'apr', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec'
        }

        # Measurement and storage units.
        self.UNITS: Set[str] = {
            'oz', 'pt', 'qt', 'gal', 'ml', 'cc', 'km', 'cm', 'mm', 'ft', 'in',
            'kg', 'lb', 'lbs', 'hz', 'khz', 'mhz', 'ghz', 'kb', 'mb', 'gb', 'tb'
        }

        # Technology-related abbreviations.
        self.TECHNOLOGY: Set[str] = {
            'v', 'ver', 'app', 'sys', 'dir', 'exe', 'lib', 'api', 'sdk', 'url',
            'cpu', 'gpu', 'ram', 'rom', 'hdd', 'ssd', 'lan', 'wan', 'sql', 'html'
        }

        # Miscellaneous abbreviations.
        self.MISC: Set[str] = {
            'vs', 'etc', 'ie', 'eg', 'no', 'al', 'ca', 'cf', 'pp', 'est', 'st',
            'approx', 'appt', 'apt', 'dept', 'depts', 'min', 'max', 'avg'
        }

        # Union of all abbreviation sets, used for sentence-boundary decisions.
        self.all_abbreviations: Set[str] = (
            self.TITLES | self.ACADEMIC | self.ORGANIZATIONS |
            self.MONTHS | self.UNITS | self.TECHNOLOGY | self.MISC
        )

        # Patterns for ellipses, URLs, emails, and numbers with units.
        self.ELLIPSIS: str = r'\.{2,}|…'
        self.URL_PATTERN: str = (
            r'(?:https?:\/\/|www\.)[\w\-\.]+\.[a-zA-Z]{2,}(?:\/[^\s]*)?'
        )
        self.EMAIL_PATTERN: str = r'[\w\.-]+@[\w\.-]+\.\w+'
        self.NUMBER_PATTERN: str = (
            r'\d+(?:\.\d+)?(?:%|°|km|cm|mm|m|kg|g|lb|ft|in|mph|kmh|hz|mhz|ghz)?'
        )

        # Opening/closing quote pairs, including typographic and CJK quotes.
        self.QUOTE_PAIRS: Dict[str, str] = {
            '"': '"', "'": "'", '“': '”', '「': '」', '『': '』',
            '«': '»', '‹': '›', '‘': '’', '‚': '‘'
        }

        # Opening/closing bracket pairs, including angle and CJK brackets.
        self.BRACKETS: Dict[str, str] = {
            '(': ')', '[': ']', '{': '}', '⟨': '⟩', '〈': '〉',
            '《': '》', '【': '】', '〔': '〕', '｢': '｣'
        }

        self._compile_patterns()

    def _compile_patterns(self) -> None:
        """Compile regex patterns for better performance."""
        self.SENTENCE_END: Pattern = re.compile(
            r'''
            # Group for sentence endings
            (?:
                # Standard endings with optional closing quotes/brackets
                (?<=[.!?])["'\)\]\}»›」』\s]*

                # Ellipsis
                |(?:\.{2,}|…)

                # CJK-style endings
                |(?<=[。！？」』\s])
            )

            # Must be followed by whitespace and a capital letter or digit
            (?=\s+(?:[A-Z0-9]|["'({\[「『«‹《][A-Z]))
            ''',
            re.VERBOSE
        )

        # Match any known abbreviation, with an optional trailing period.
        abbrev_pattern = '|'.join(re.escape(abbr) for abbr in self.all_abbreviations)
        self.ABBREV_PATTERN: Pattern = re.compile(
            fr'\b(?:{abbrev_pattern})\.?',
            re.IGNORECASE
        )
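
    # Boundary sketch (illustrative, not executed): given
    #   "Dr. Smith left. He returned."
    # ABBREV_PATTERN matches "Dr." so its period can be shielded, and
    # SENTENCE_END should then fire only between "left." and "He".
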
    def _protect_special_cases(self, text: str) -> Tuple[str, Dict[str, str]]:
        """Protect URLs, emails, and other special cases from being split."""
        protected = text
        placeholders: Dict[str, str] = {}
        counter = 0

        # Replace URLs and emails with opaque placeholders so that periods
        # inside them are not mistaken for sentence boundaries.
        for pattern in [self.URL_PATTERN, self.EMAIL_PATTERN]:
            for match in re.finditer(pattern, protected):
                placeholder = f'__PROTECTED_{counter}__'
                placeholders[placeholder] = match.group()
                protected = protected.replace(match.group(), placeholder)
                counter += 1

        # Protect quoted spans: scan for matching open/close quote pairs and
        # replace each complete span with a placeholder. The close check must
        # run before the open check, otherwise symmetric quotes such as '"'
        # (which map to themselves) would never be treated as closers.
        stack = []
        protected_chars = list(protected)
        i = 0
        while i < len(protected_chars):
            char = protected_chars[i]
            if stack and char == self.QUOTE_PAIRS[stack[-1][0]]:
                start_quote, start_idx = stack.pop()
                content = ''.join(protected_chars[start_idx:i + 1])
                placeholder = f'__PROTECTED_{counter}__'
                placeholders[placeholder] = content
                protected_chars[start_idx:i + 1] = list(placeholder)
                counter += 1
                # Resume scanning just past the inserted placeholder; the
                # splice may have changed the list length.
                i = start_idx + len(placeholder)
                continue
            elif char in self.QUOTE_PAIRS:
                stack.append((char, i))
            i += 1

        return ''.join(protected_chars), placeholders
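
    # A minimal sketch of the round trip (values illustrative):
    #   text = 'See https://example.com. "Hello there." Next.'
    #   protected, ph = tokenizer._protect_special_cases(text)
    #   # protected now reads like: 'See __PROTECTED_0__. __PROTECTED_1__ Next.'
    #   tokenizer._restore_special_cases(protected, ph)  # original text back
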
    def _restore_special_cases(self, text: str, placeholders: Dict[str, str]) -> str:
        """Restore protected content."""
        restored = text
        for placeholder, original in placeholders.items():
            restored = restored.replace(placeholder, original)
        return restored

    def _handle_abbreviations(self, text: str) -> str:
        """Handle abbreviations to prevent incorrect sentence splitting."""
        def replace_abbrev(match: re.Match) -> str:
            abbr = match.group().lower().rstrip('.')
            if abbr in self.all_abbreviations:
                return match.group().replace('.', '__DOT__')
            return match.group()

        return self.ABBREV_PATTERN.sub(replace_abbrev, text)
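
    # For example (illustrative): "Dr. Smith vs. the board." becomes
    # "Dr__DOT__ Smith vs__DOT__ the board." so the splitter will not break
    # after "Dr." or "vs."; _restore_formatting later maps __DOT__ back to '.'.
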
    def _normalize_whitespace(self, text: str) -> str:
        """Normalize whitespace while preserving paragraph breaks."""
        # Mark paragraph breaks so they survive whitespace collapsing.
        text = re.sub(r'\n\s*\n', ' __PARA__ ', text)
        # Collapse all remaining runs of whitespace to single spaces.
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    def _restore_formatting(self, sentences: List[str]) -> List[str]:
        """Restore original formatting and clean up sentences."""
        restored = []
        for sentence in sentences:
            # Restore protected abbreviation dots.
            sentence = sentence.replace('__DOT__', '.')

            # Restore paragraph breaks.
            sentence = sentence.replace('__PARA__', '\n\n')

            # Clean up whitespace.
            sentence = re.sub(r'\s+', ' ', sentence).strip()

            # Capitalize the first letter unless the sentence starts with a
            # known abbreviation.
            words = sentence.split()
            if words and words[0].lower() not in self.all_abbreviations:
                sentence = sentence[0].upper() + sentence[1:]

            if sentence:
                restored.append(sentence)

        return restored

    def tokenize(self, text: str) -> List[str]:
        """
        Split text into sentences while handling complex cases.

        Args:
            text (str): Input text to split into sentences.

        Returns:
            List[str]: List of properly formatted sentences.
        """
        if not text or not text.strip():
            return []

        # Protect URLs, emails, and quoted spans from being split.
        protected_text, placeholders = self._protect_special_cases(text)

        # Normalize whitespace while keeping paragraph markers.
        protected_text = self._normalize_whitespace(protected_text)

        # Shield abbreviation periods from the boundary regex.
        protected_text = self._handle_abbreviations(protected_text)

        # Split on sentence boundaries.
        potential_sentences = self.SENTENCE_END.split(protected_text)

        # Restore dots, paragraph breaks, and capitalization.
        sentences = self._restore_formatting(potential_sentences)

        # Put the protected URLs, emails, and quotes back.
        sentences = [self._restore_special_cases(s, placeholders) for s in sentences]

        # Merge fragments: a chunk starting with a lowercase letter is assumed
        # to continue the previous sentence.
        final_sentences = []
        current_sentence = []

        for sentence in sentences:
            if not sentence.strip():
                continue

            if current_sentence and sentence[0].islower():
                current_sentence.append(sentence)
            else:
                if current_sentence:
                    final_sentences.append(' '.join(current_sentence))
                current_sentence = [sentence]

        if current_sentence:
            final_sentences.append(' '.join(current_sentence))

        return final_sentences


def split_sentences(text: str) -> List[str]:
    """
    Convenience function to split text into sentences using SentenceTokenizer.

    Args:
        text (str): Input text to split into sentences.

    Returns:
        List[str]: List of properly formatted sentences.
    """
    tokenizer = SentenceTokenizer()
    return tokenizer.tokenize(text)
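
# A quick illustration of the intended behavior (shape is indicative, not a
# verified doctest):
#
#   split_sentences("Hello world. This is a test. See https://example.com for more.")
#   # -> ['Hello world.', 'This is a test.', 'See https://example.com for more.']
#
# Periods inside the URL and after abbreviations such as "Dr." do not
# trigger a split.

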
class ElevenlabsTTS(TTSProvider):
    """
    Text-to-speech provider using the ElevenlabsTTS API.
    """

    headers: dict[str, str] = {
        "User-Agent": LitAgent().random()
    }
    cache_dir = pathlib.Path("./audio_cache")
    all_voices: dict[str, str] = {
        "Brian": "nPczCjzI2devNBz1zQrb", "Alice": "Xb7hH8MSUJpSbSDYk0k2",
        "Bill": "pqHfZKP75CvOlQylNhV4", "Callum": "N2lVS1w4EtoT3dr4eOWO",
        "Charlie": "IKne3meq5aSn9XLyUdCD", "Charlotte": "XB0fDUnXU5powFXDhCwa",
        "Chris": "iP95p4xoKVk53GoZ742B", "Daniel": "onwK4e9ZLuTAKqWW03F9",
        "Eric": "cjVigY5qzO86Huf0OWal", "George": "JBFqnCBsd6RMkjVDRZzb",
        "Jessica": "cgSgspJ2msm6clMCkdW9", "Laura": "FGY2WhTYpPnrIDTdsKH5",
        "Liam": "TX3LPaxmHKxFdv7VOQHJ", "Lily": "pFZP5JQG7iQjIQuC4Bku",
        "Matilda": "XrExE9yKIg1WjnnlVkGX", "Sarah": "EXAVITQu4vr4xnSDxMaL",
        "Will": "bIHbv24MWmeRgasZH58o", "Neal": "Zp1aWhL05Pi5BkhizFC3",
    }

    def __init__(self, timeout: int = 20, proxies: Optional[dict] = None):
        """Initializes the ElevenlabsTTS TTS client."""
        self.session = requests.Session()
        self.session.headers.update(self.headers)
        if proxies:
            self.session.proxies.update(proxies)
        self.timeout = timeout
        self.params = {'allow_unauthenticated': '1'}
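
    # Construction sketch (the proxy URL below is a placeholder, not a real
    # endpoint):
    #   client = ElevenlabsTTS(timeout=30,
    #                          proxies={"https": "http://127.0.0.1:8080"})
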
    def tts(self, text: str, voice: str = "Brian", verbose: bool = True) -> str:
        """
        Converts text to speech using the ElevenlabsTTS API and saves it to a file.
        """
        assert (
            voice in self.all_voices
        ), f"Voice '{voice}' not one of [{', '.join(self.all_voices.keys())}]"

        filename = self.cache_dir / f"{int(time.time())}.mp3"
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        # Split the input into sentences so chunks can be synthesized in parallel.
        sentences = split_sentences(text)

        def generate_audio_for_chunk(part_text: str, part_number: int):
            # Retry transient network failures a bounded number of times
            # instead of looping forever.
            max_retries = 3
            for attempt in range(1, max_retries + 1):
                try:
                    json_data = {'text': part_text, 'model_id': 'eleven_multilingual_v2'}
                    response = self.session.post(
                        f'https://api.elevenlabs.io/v1/text-to-speech/{self.all_voices[voice]}',
                        params=self.params,
                        json=json_data,
                        timeout=self.timeout,
                    )
                    # Raises on any non-2xx status, so reaching the next line
                    # means the request succeeded.
                    response.raise_for_status()
                    return part_number, response.content
                except requests.RequestException as e:
                    if attempt == max_retries:
                        raise exceptions.FailedToGenerateResponseError(
                            f"Failed to generate audio for chunk {part_number}: {e}"
                        )
                    time.sleep(1)

        try:
            # Synthesize all chunks concurrently; results are keyed by chunk
            # number so they can be reassembled in order.
            with ThreadPoolExecutor() as executor:
                futures = {
                    executor.submit(generate_audio_for_chunk, sentence.strip(), chunk_num): chunk_num
                    for chunk_num, sentence in enumerate(sentences, start=1)
                }

                audio_chunks = {}

                for future in as_completed(futures):
                    chunk_num = futures[future]
                    try:
                        part_number, audio_data = future.result()
                        audio_chunks[part_number] = audio_data
                    except Exception as e:
                        raise exceptions.FailedToGenerateResponseError(
                            f"Failed to generate audio for chunk {chunk_num}: {e}"
                        )

            # Concatenate the MP3 chunks in their original order.
            combined_audio = BytesIO()
            for part_number in sorted(audio_chunks.keys()):
                combined_audio.write(audio_chunks[part_number])

            # Write the combined audio to the cache file.
            with open(filename, 'wb') as f:
                f.write(combined_audio.getvalue())
            return filename.as_posix()

        except requests.exceptions.RequestException as e:
            raise exceptions.FailedToGenerateResponseError(
                f"Failed to perform the operation: {e}"
            )


if __name__ == "__main__":
    elevenlabs = ElevenlabsTTS()
    text = "This is a test of the ElevenlabsTTS text-to-speech API. It supports multiple sentences and advanced logging."

    audio_file = elevenlabs.tts(text, voice="Brian")
    print(f"Audio saved to {audio_file}")
    playsound(audio_file)