import re
import time
import pathlib
import requests
from io import BytesIO
from typing import List, Dict, Tuple, Set, Optional, Pattern
from concurrent.futures import ThreadPoolExecutor, as_completed

from playsound import playsound
from webscout import exceptions
from webscout.AIbase import TTSProvider
from webscout.litagent import LitAgent

# Text processing utilities for TTS providers.
class SentenceTokenizer:
"""Advanced sentence tokenizer with support for complex cases and proper formatting."""
def __init__(self) -> None:
# Common abbreviations by category
        self.TITLES: Set[str] = {
            'mr', 'mrs', 'ms', 'dr', 'prof', 'rev', 'sr', 'jr', 'esq',
            'hon', 'pres', 'gov', 'atty', 'supt', 'det', 'col', 'maj',
            'gen', 'capt', 'cmdr', 'lt', 'sgt', 'cpl', 'pvt'
        }
self.ACADEMIC: Set[str] = {
'ph.d', 'phd', 'm.d', 'md', 'b.a', 'ba', 'm.a', 'ma', 'd.d.s', 'dds',
'm.b.a', 'mba', 'b.sc', 'bsc', 'm.sc', 'msc', 'llb', 'll.b', 'bl'
}
self.ORGANIZATIONS: Set[str] = {
'inc', 'ltd', 'co', 'corp', 'llc', 'llp', 'assn', 'bros', 'plc', 'cos',
'intl', 'dept', 'est', 'dist', 'mfg', 'div'
}
self.MONTHS: Set[str] = {
'jan', 'feb', 'mar', 'apr', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec'
}
self.UNITS: Set[str] = {
'oz', 'pt', 'qt', 'gal', 'ml', 'cc', 'km', 'cm', 'mm', 'ft', 'in',
'kg', 'lb', 'lbs', 'hz', 'khz', 'mhz', 'ghz', 'kb', 'mb', 'gb', 'tb'
}
self.TECHNOLOGY: Set[str] = {
'v', 'ver', 'app', 'sys', 'dir', 'exe', 'lib', 'api', 'sdk', 'url',
'cpu', 'gpu', 'ram', 'rom', 'hdd', 'ssd', 'lan', 'wan', 'sql', 'html'
}
self.MISC: Set[str] = {
'vs', 'etc', 'ie', 'eg', 'no', 'al', 'ca', 'cf', 'pp', 'est', 'st',
'approx', 'appt', 'apt', 'dept', 'depts', 'min', 'max', 'avg'
}
# Combine all abbreviations
self.all_abbreviations: Set[str] = (
self.TITLES | self.ACADEMIC | self.ORGANIZATIONS |
self.MONTHS | self.UNITS | self.TECHNOLOGY | self.MISC
)
# Special patterns
self.ELLIPSIS: str = r'\.{2,}|…'
self.URL_PATTERN: str = (
r'(?:https?:\/\/|www\.)[\w\-\.]+\.[a-zA-Z]{2,}(?:\/[^\s]*)?'
)
self.EMAIL_PATTERN: str = r'[\w\.-]+@[\w\.-]+\.\w+'
self.NUMBER_PATTERN: str = (
r'\d+(?:\.\d+)?(?:%|Β°|km|cm|mm|m|kg|g|lb|ft|in|mph|kmh|hz|mhz|ghz)?'
)
# Quote and bracket pairs
        self.QUOTE_PAIRS: Dict[str, str] = {
            '"': '"', "'": "'", '“': '”', 'γ€Œ': '」', 'γ€Ž': '』',
            'Β«': 'Β»', 'β€Ή': 'β€Ί', '‘': '’', 'β€š': '’'
        }
self.BRACKETS: Dict[str, str] = {
'(': ')', '[': ']', '{': '}', '⟨': '⟩', 'γ€Œ': '」',
'γ€Ž': '』', '【': '】', 'γ€–': 'γ€—', 'ο½’': 'ο½£'
}
# Compile regex patterns
self._compile_patterns()
def _compile_patterns(self) -> None:
"""Compile regex patterns for better performance."""
# Pattern for finding potential sentence boundaries
self.SENTENCE_END: Pattern = re.compile(
r'''
# Group for sentence endings
(?:
# Standard endings with optional quotes/brackets
(?<=[.!?])[\"\'\)\]\}»›」』\s]*
# Ellipsis
|(?:\.{2,}|…)
# Asian-style endings
|(?<=[γ€‚οΌοΌŸγ€γ€γ€‘\s])
)
# Must be followed by whitespace and capital letter or number
            (?=\s+(?:[A-Z0-9]|["'({\[γ€Œγ€Žγ€Šβ€Ήγ€ˆ][A-Z]))
''',
re.VERBOSE
)
# Pattern for abbreviations
abbrev_pattern = '|'.join(re.escape(abbr) for abbr in self.all_abbreviations)
self.ABBREV_PATTERN: Pattern = re.compile(
fr'\b(?:{abbrev_pattern})\.?',
re.IGNORECASE
)
def _protect_special_cases(self, text: str) -> Tuple[str, Dict[str, str]]:
"""Protect URLs, emails, and other special cases from being split."""
protected = text
placeholders: Dict[str, str] = {}
counter = 0
# Protect URLs and emails
for pattern in [self.URL_PATTERN, self.EMAIL_PATTERN]:
for match in re.finditer(pattern, protected):
placeholder = f'__PROTECTED_{counter}__'
placeholders[placeholder] = match.group()
protected = protected.replace(match.group(), placeholder)
counter += 1
        # Protect quoted content. Check for a closing quote before treating a
        # character as a new opener, so symmetric quotes like " and ' (which
        # map to themselves in QUOTE_PAIRS) can actually close a pair.
        stack = []
        protected_chars = list(protected)
        i = 0
        while i < len(protected_chars):
            char = protected_chars[i]
            if stack and char == self.QUOTE_PAIRS[stack[-1][0]]:
                start_quote, start_idx = stack.pop()
                content = ''.join(protected_chars[start_idx:i + 1])
                placeholder = f'__PROTECTED_{counter}__'
                placeholders[placeholder] = content
                protected_chars[start_idx:i + 1] = list(placeholder)
                counter += 1
                # The slice assignment changed the list length, so resume
                # scanning immediately after the inserted placeholder
                i = start_idx + len(placeholder)
                continue
            elif char in self.QUOTE_PAIRS:
                stack.append((char, i))
            i += 1
        return ''.join(protected_chars), placeholders
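    # Illustrative sketch of what _protect_special_cases produces, assuming
    # the URL pattern above matches as expected:
    #   _protect_special_cases("See https://example.com today")
    #   -> ("See __PROTECTED_0__ today",
    #       {"__PROTECTED_0__": "https://example.com"})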
def _restore_special_cases(self, text: str, placeholders: Dict[str, str]) -> str:
"""Restore protected content."""
restored = text
for placeholder, original in placeholders.items():
restored = restored.replace(placeholder, original)
return restored
def _handle_abbreviations(self, text: str) -> str:
"""Handle abbreviations to prevent incorrect sentence splitting."""
def replace_abbrev(match: re.Match) -> str:
abbr = match.group().lower().rstrip('.')
if abbr in self.all_abbreviations:
return match.group().replace('.', '__DOT__')
return match.group()
return self.ABBREV_PATTERN.sub(replace_abbrev, text)
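    # For example, "Dr. Smith arrived." becomes "Dr__DOT__ Smith arrived.",
    # so the period after the title no longer looks like a sentence boundary;
    # _restore_formatting later turns __DOT__ back into "."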
def _normalize_whitespace(self, text: str) -> str:
"""Normalize whitespace while preserving paragraph breaks."""
# Replace multiple newlines with special marker
text = re.sub(r'\n\s*\n', ' __PARA__ ', text)
# Normalize remaining whitespace
text = re.sub(r'\s+', ' ', text)
return text.strip()
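    # For example, "First paragraph.\n\nSecond paragraph." becomes
    # "First paragraph. __PARA__ Second paragraph.", and the marker is
    # expanded back into a blank line by _restore_formatting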
def _restore_formatting(self, sentences: List[str]) -> List[str]:
"""Restore original formatting and clean up sentences."""
restored = []
for sentence in sentences:
            # Restore dots in abbreviations
            sentence = sentence.replace('__DOT__', '.')
            # Collapse runs of whitespace first, then restore paragraph breaks,
            # so the restored blank lines are not flattened by the cleanup
            sentence = re.sub(r'\s+', ' ', sentence).strip()
            sentence = re.sub(r'\s*__PARA__\s*', '\n\n', sentence)
            # Capitalize the first letter unless the sentence starts with an abbreviation
            words = sentence.split()
            if words and words[0].lower() not in self.all_abbreviations:
                sentence = sentence[0].upper() + sentence[1:]
if sentence:
restored.append(sentence)
return restored
def tokenize(self, text: str) -> List[str]:
"""
Split text into sentences while handling complex cases.
Args:
text (str): Input text to split into sentences.
Returns:
List[str]: List of properly formatted sentences.
"""
if not text or not text.strip():
return []
# Step 1: Protect special cases
protected_text, placeholders = self._protect_special_cases(text)
# Step 2: Normalize whitespace
protected_text = self._normalize_whitespace(protected_text)
# Step 3: Handle abbreviations
protected_text = self._handle_abbreviations(protected_text)
# Step 4: Split into potential sentences
potential_sentences = self.SENTENCE_END.split(protected_text)
# Step 5: Process and restore formatting
sentences = self._restore_formatting(potential_sentences)
# Step 6: Restore special cases
sentences = [self._restore_special_cases(s, placeholders) for s in sentences]
# Step 7: Post-process sentences
final_sentences = []
current_sentence = []
for sentence in sentences:
# Skip empty sentences
if not sentence.strip():
continue
# Check if sentence might be continuation of previous
if current_sentence and sentence[0].islower():
current_sentence.append(sentence)
else:
if current_sentence:
final_sentences.append(' '.join(current_sentence))
current_sentence = [sentence]
# Add last sentence if exists
if current_sentence:
final_sentences.append(' '.join(current_sentence))
return final_sentences
def split_sentences(text: str) -> List[str]:
"""
Convenience function to split text into sentences using SentenceTokenizer.
Args:
text (str): Input text to split into sentences.
Returns:
List[str]: List of properly formatted sentences.
"""
tokenizer = SentenceTokenizer()
return tokenizer.tokenize(text)
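# Illustrative (untested) example: protected spans survive tokenization, so
#   split_sentences("Visit https://example.com. Dr. Smith will help.")
# is expected to keep the URL and the "Dr." title intact rather than
# splitting on their internal periods.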
class ElevenlabsTTS(TTSProvider):
"""
Text-to-speech provider using the ElevenlabsTTS API.
"""
# Request headers
headers: dict[str, str] = {
"User-Agent": LitAgent().random()
}
cache_dir = pathlib.Path("./audio_cache")
    all_voices: dict[str, str] = {
        "Brian": "nPczCjzI2devNBz1zQrb", "Alice": "Xb7hH8MSUJpSbSDYk0k2",
        "Bill": "pqHfZKP75CvOlQylNhV4", "Callum": "N2lVS1w4EtoT3dr4eOWO",
        "Charlie": "IKne3meq5aSn9XLyUdCD", "Charlotte": "XB0fDUnXU5powFXDhCwa",
        "Chris": "iP95p4xoKVk53GoZ742B", "Daniel": "onwK4e9ZLuTAKqWW03F9",
        "Eric": "cjVigY5qzO86Huf0OWal", "George": "JBFqnCBsd6RMkjVDRZzb",
        "Jessica": "cgSgspJ2msm6clMCkdW9", "Laura": "FGY2WhTYpPnrIDTdsKH5",
        "Liam": "TX3LPaxmHKxFdv7VOQHJ", "Lily": "pFZP5JQG7iQjIQuC4Bku",
        "Matilda": "XrExE9yKIg1WjnnlVkGX", "Sarah": "EXAVITQu4vr4xnSDxMaL",
        "Will": "bIHbv24MWmeRgasZH58o", "Neal": "Zp1aWhL05Pi5BkhizFC3",
    }
    def __init__(self, timeout: int = 20, proxies: Optional[dict] = None):
"""Initializes the ElevenlabsTTS TTS client."""
self.session = requests.Session()
self.session.headers.update(self.headers)
if proxies:
self.session.proxies.update(proxies)
self.timeout = timeout
self.params = {'allow_unauthenticated': '1'}
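    # A minimal sketch of constructing the client through an HTTP proxy,
    # using the standard requests proxies mapping (the proxy URL below is a
    # hypothetical placeholder):
    #   client = ElevenlabsTTS(timeout=30, proxies={"https": "http://127.0.0.1:8080"})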
    def tts(self, text: str, voice: str = "Brian", verbose: bool = True) -> str:
        """
        Converts text to speech using the ElevenlabsTTS API, saves it to a file,
        and returns the path to that file.
        """
assert (
voice in self.all_voices
), f"Voice '{voice}' not one of [{', '.join(self.all_voices.keys())}]"
filename = self.cache_dir / f"{int(time.time())}.mp3"
# Split text into sentences
sentences = split_sentences(text)
        # Request audio for each chunk, retrying a bounded number of times
        # instead of looping forever on network errors
        def generate_audio_for_chunk(part_text: str, part_number: int, max_retries: int = 3):
            for attempt in range(1, max_retries + 1):
                try:
                    json_data = {'text': part_text, 'model_id': 'eleven_multilingual_v2'}
                    response = self.session.post(
                        f'https://api.elevenlabs.io/v1/text-to-speech/{self.all_voices[voice]}',
                        params=self.params,
                        headers=self.headers,
                        json=json_data,
                        timeout=self.timeout,
                    )
                    # Raises for any non-2xx status, so reaching the return
                    # below means the request succeeded
                    response.raise_for_status()
                    # Create the audio_cache directory if it doesn't exist
                    self.cache_dir.mkdir(parents=True, exist_ok=True)
                    return part_number, response.content
                except requests.RequestException as e:
                    if attempt == max_retries:
                        raise exceptions.FailedToGenerateResponseError(
                            f"Failed to generate audio for chunk {part_number}: {e}"
                        )
                    time.sleep(1)
try:
# Using ThreadPoolExecutor to handle requests concurrently
with ThreadPoolExecutor() as executor:
futures = {executor.submit(generate_audio_for_chunk, sentence.strip(), chunk_num): chunk_num
for chunk_num, sentence in enumerate(sentences, start=1)}
# Dictionary to store results with order preserved
audio_chunks = {}
for future in as_completed(futures):
chunk_num = futures[future]
try:
part_number, audio_data = future.result()
audio_chunks[part_number] = audio_data
except Exception as e:
raise exceptions.FailedToGenerateResponseError(
f"Failed to generate audio for chunk {chunk_num}: {e}"
)
# Combine audio chunks in the correct sequence
combined_audio = BytesIO()
for part_number in sorted(audio_chunks.keys()):
combined_audio.write(audio_chunks[part_number])
# Save the combined audio data to a single file
with open(filename, 'wb') as f:
f.write(combined_audio.getvalue())
return filename.as_posix()
except requests.exceptions.RequestException as e:
raise exceptions.FailedToGenerateResponseError(
f"Failed to perform the operation: {e}"
)
# Example usage
if __name__ == "__main__":
    elevenlabs = ElevenlabsTTS()
    text = "This is a test of the ElevenlabsTTS text-to-speech API. It supports multiple sentences and advanced logging."
    audio_file = elevenlabs.tts(text, voice="Brian")
    print(f"Audio saved to {audio_file}")
    playsound(audio_file)