import streamlit as st
import base64
import os
import random
from PyPDF2 import PdfReader
import threading
import time
import hashlib
from datetime import datetime
import json
import asyncio
import edge_tts
# Patch asyncio for nested event loops
import nest_asyncio
nest_asyncio.apply()
# Character definitions with emojis.
# Maps a display name to the emoji shown in the UI and the Edge TTS voice id
# used for synthesis.
# NOTE(review): the emoji values look mis-decoded (multi-byte characters read
# with a single-byte code page); restore them from the original file if it is
# available. They are kept here exactly as found.
CHARACTERS = {
    "Aria": {"emoji": "πΈ", "voice": "en-US-AriaNeural"},
    "Jenny": {"emoji": "πΆ", "voice": "en-US-JennyNeural"},
    "Sonia": {"emoji": "πΊ", "voice": "en-GB-SoniaNeural"},
    "Natasha": {"emoji": "π", "voice": "en-AU-NatashaNeural"},
    "Clara": {"emoji": "π·", "voice": "en-CA-ClaraNeural"},
    "Guy": {"emoji": "π", "voice": "en-US-GuyNeural"},
    "Ryan": {"emoji": "π οΈ", "voice": "en-GB-RyanNeural"},
    "William": {"emoji": "π»", "voice": "en-AU-WilliamNeural"},
    "Liam": {"emoji": "π", "voice": "en-CA-LiamNeural"}
}
# Available English voices for Edge TTS: every voice id in CHARACTERS, in
# declaration order (previously this held only the first voice as a string).
EDGE_TTS_VOICES = [info["voice"] for info in CHARACTERS.values()]
# Initialize session state.
# The character is chosen first and the voice is derived from it, so the two
# always refer to the same CHARACTERS entry. Drawing them independently made
# them disagree on most first loads.
if 'character' not in st.session_state:
    st.session_state['character'] = random.choice(list(CHARACTERS.keys()))
if 'tts_voice' not in st.session_state:
    st.session_state['tts_voice'] = CHARACTERS[st.session_state['character']]["voice"]
if 'history' not in st.session_state:
    st.session_state['history'] = []
class AudioProcessor:
    """Create, cache and log text-to-speech audio generated with edge_tts.

    Audio files are stored in ``audio_cache`` as ``<md5 of "text:voice">.mp3``
    and described in ``audio_cache/metadata.json``. Every newly synthesised
    text is also written to a markdown file in ``markdown_files`` and each
    action is appended to ``history_log.md``.
    """

    def __init__(self):
        """Create the working directories and load any existing metadata."""
        self.cache_dir = "audio_cache"
        self.markdown_dir = "markdown_files"
        self.log_file = "history_log.md"
        os.makedirs(self.cache_dir, exist_ok=True)
        os.makedirs(self.markdown_dir, exist_ok=True)
        self.metadata = self._load_metadata()

    def _load_metadata(self):
        """Return the cached metadata dict, or ``{}`` if it is missing or unreadable."""
        metadata_file = os.path.join(self.cache_dir, "metadata.json")
        if not os.path.exists(metadata_file):
            return {}
        try:
            with open(metadata_file) as f:
                return json.load(f)
        except json.JSONDecodeError:
            # A damaged metadata file only costs us the cache; start fresh
            # instead of failing construction.
            return {}

    def _save_metadata(self):
        """Write ``self.metadata`` to ``metadata.json`` in the cache directory."""
        metadata_file = os.path.join(self.cache_dir, "metadata.json")
        with open(metadata_file, 'w') as f:
            json.dump(self.metadata, f)

    def _log_action(self, action, details):
        """Append a timestamped entry to the log file and the session history."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        entry = f"[{timestamp}] {action}: {details}"
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(f"{entry}\n")
        try:
            st.session_state['history'].append(entry)
        except KeyError:
            # No 'history' key is available in session state (presumably the
            # case when called from a worker thread -- TODO confirm). The
            # entry is already in the log file, so do not abort the caller.
            pass

    @staticmethod
    def _clean_text(text):
        """Flatten newlines to spaces and trim surrounding whitespace."""
        # The previous implementation also called ``.replace("", " ")``, which
        # inserts a space between every character of the text.
        return text.replace("\n", " ").strip()

    @staticmethod
    def _safe_filename_part(title, max_length=100):
        """Return *title* with path separators and reserved characters replaced by ``_``."""
        unsafe = set('\\/:*?"<>|')
        cleaned = ''.join('_' if ch in unsafe or ord(ch) < 32 else ch for ch in title)
        return cleaned[:max_length].strip()

    async def create_audio(self, text, voice, character):
        """Return MP3 bytes for *text* spoken by *voice*, using the cache when possible.

        Args:
            text: Text to synthesise.
            voice: Edge TTS voice id passed to ``edge_tts.Communicate``.
            character: Character name recorded in the markdown file, log and metadata.

        Returns:
            The MP3 file contents as bytes, or ``None`` when *text* is empty
            after cleaning.
        """
        # The cache key is derived from the raw text so existing cache
        # entries remain valid.
        cache_key = hashlib.md5(f"{text}:{voice}".encode()).hexdigest()
        cache_path = os.path.join(self.cache_dir, f"{cache_key}.mp3")
        if cache_key in self.metadata and os.path.exists(cache_path):
            with open(cache_path, 'rb') as f:
                return f.read()
        # Clean text for speech
        text = self._clean_text(text)
        if not text:
            return None
        # Generate audio with edge_tts
        communicate = edge_tts.Communicate(text, voice)
        await communicate.save(cache_path)
        # Save markdown file
        timestamp = datetime.now().strftime("%I%M %p %m%d%Y")
        title_words = ' '.join(text.split()[:10])
        # The title comes from user text, so strip characters that cannot
        # appear in a file name before using it in a path.
        filename = f"{timestamp} {character} {self._safe_filename_part(title_words)}.md"
        filepath = os.path.join(self.markdown_dir, filename)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(f"# {title_words}\n\n**Character:** {character}\n**Voice:** {voice}\n\n{text}")
        # Log action
        self._log_action("Text to Audio", f"Created audio for '{title_words}' with {character} ({voice})")
        # Update metadata
        self.metadata[cache_key] = {
            'timestamp': datetime.now().isoformat(),
            'text_length': len(text),
            'voice': voice,
            'character': character,
            'markdown_file': filename
        }
        self._save_metadata()
        with open(cache_path, 'rb') as f:
            return f.read()
def get_download_link(bin_data, filename, size_mb=None):
    """Return an HTML anchor that downloads *bin_data* as *filename*.

    The data is embedded in the link as a base64 ``data:`` URI, so the result
    can be rendered with ``st.markdown(..., unsafe_allow_html=True)``.

    Args:
        bin_data: Raw bytes of the file to offer for download.
        filename: Name suggested to the browser and shown as the link text.
        size_mb: Optional size in megabytes appended to the link text.

    Returns:
        An ``<a>`` element as a string.
    """
    import html  # local import: only this function needs it

    b64 = base64.b64encode(bin_data).decode()
    size_str = f"({size_mb:.1f} MB)" if size_mb else ""
    # The name ends up inside an attribute and in the link text; escape it so
    # quotes or angle brackets cannot break out of the markup.
    safe_name = html.escape(filename, quote=True)
    label = f"Download {safe_name} {size_str}".rstrip()
    return (
        f'<a href="data:audio/mpeg;base64,{b64}" '
        f'download="{safe_name}">{label}</a>'
    )
def process_pdf(pdf_file, max_pages, voice, character, audio_processor):
    """Extract page texts from a PDF and start audio generation for each page.

    Args:
        pdf_file: File path or file-like object accepted by ``PdfReader``.
        max_pages: Upper bound on the number of pages to process.
        voice: Voice id forwarded to ``audio_processor.create_audio``.
        character: Character name forwarded to ``audio_processor.create_audio``.
        audio_processor: Object providing an async ``create_audio(text, voice, character)``.

    Returns:
        ``(texts, audios, total_pages)``. ``texts`` is the list of page texts.
        ``audios`` is a dict filled in by background threads after this
        function returns: each page index maps to the audio bytes, or to
        ``None`` when no audio was produced or generation failed.
    """
    import logging  # local import: only used for the worker's error report

    reader = PdfReader(pdf_file)
    total_pages = min(len(reader.pages), max_pages)
    texts, audios = [], {}

    async def process_page(i, text):
        audio_data = await audio_processor.create_audio(text, voice, character)
        audios[i] = audio_data

    def run_page(i, text):
        # Always leave an entry for the page, even on failure, so a caller
        # polling ``audios`` for this index cannot wait forever.
        try:
            asyncio.run(process_page(i, text))
        except Exception:
            logging.getLogger(__name__).exception(
                "Audio generation failed for page %d", i + 1
            )
            audios[i] = None

    # Extract text and start audio processing
    for i in range(total_pages):
        # Guard against extractors that yield None for pages without text.
        text = reader.pages[i].extract_text() or ""
        texts.append(text)
        # Process audio in background. The index and text are passed as
        # arguments so each thread works on its own page; a lambda closing
        # over the loop variables would see whatever value they hold when the
        # thread actually runs.
        threading.Thread(target=run_page, args=(i, text)).start()
    return texts, audios, total_pages
def main():
    """Render the Streamlit page.

    Builds the sidebar (voice selection, markdown file history, action
    history, cache clearing) and the main area (PDF upload with per-page
    audio, and free-text to audio conversion).
    """
    # NOTE(review): several emoji in the UI strings below look mis-decoded;
    # they are kept as found. Restore them from the original file if available.
    st.set_page_config(page_title="πPDF πͺText to π£οΈSpeech π€Transformer", page_icon="π", layout="wide")
    # Apply styling
    # NOTE(review): the style payload is empty here -- presumably the CSS was
    # lost; restore it if available.
    st.markdown("""
""", unsafe_allow_html=True)
    # Initialize processor
    audio_processor = AudioProcessor()
    # Sidebar settings
    st.sidebar.title(f"{CHARACTERS[st.session_state['character']]['emoji']} Character Name: {st.session_state['character']}")
    # Voice selection UI
    st.sidebar.markdown("### π€ Voice Settings")
    voice_options = [char["voice"] for char in CHARACTERS.values()]
    selected_voice = st.sidebar.selectbox(
        "π Select TTS Voice:",
        options=voice_options,
        index=voice_options.index(st.session_state['tts_voice']),
        key="voice_select"
    )
    selected_character = next(char for char, info in CHARACTERS.items() if info["voice"] == selected_voice)
    st.sidebar.markdown("""
# ποΈ Voice Character Agent Selector π
*Female Voices*:
- πΈ **Aria** β Elegant, creative storytelling
- πΆ **Jenny** β Friendly, conversational
- πΊ **Sonia** β Bold, confident
- π **Natasha** β Sophisticated, mysterious
- π· **Clara** β Cheerful, empathetic
*Male Voices*:
- π **Guy** β Authoritative, versatile
- π οΈ **Ryan** β Approachable, casual
- π» **William** β Classic, scholarly
- π **Liam** β Energetic, engaging
""")
    if selected_voice != st.session_state['tts_voice'] or selected_character != st.session_state['character']:
        st.session_state['tts_voice'] = selected_voice
        st.session_state['character'] = selected_character
        audio_processor._log_action("Voice Change", f"Changed to {selected_character} ({selected_voice})")
        st.rerun()
    # Markdown file history
    st.sidebar.markdown("### π History")
    md_files = [f for f in os.listdir(audio_processor.markdown_dir) if f.endswith('.md') and f != 'README.md']
    for md_file in md_files:
        col1, col2, col3 = st.sidebar.columns([3, 1, 1])
        with col1:
            if st.button(f"ποΈ {md_file}", key=f"view_{md_file}"):
                with open(os.path.join(audio_processor.markdown_dir, md_file), 'r', encoding='utf-8') as f:
                    st.session_state['current_md'] = f.read()
                audio_processor._log_action("View File", f"Viewed {md_file}")
        with col2:
            if st.button("ποΈ", key=f"delete_{md_file}"):
                os.remove(os.path.join(audio_processor.markdown_dir, md_file))
                audio_processor._log_action("Delete File", f"Deleted {md_file}")
                st.rerun()
        with col3:
            st.write("")
    # History log
    st.sidebar.markdown("### π Action History")
    for entry in st.session_state['history']:
        st.sidebar.write(entry)
    # Main interface
    # This literal was split across two physical lines, which is not valid
    # for a single-quoted string; it is joined here.
    st.markdown("π PDF to Audio Converter π§", unsafe_allow_html=True)
    # Display current markdown if selected
    if 'current_md' in st.session_state:
        st.markdown(st.session_state['current_md'])
    col1, col2 = st.columns(2)
    with col1:
        uploaded_file = st.file_uploader("Choose a PDF file", "pdf")
    with col2:
        max_pages = st.slider('Select pages to process', min_value=1, max_value=100, value=10)
    if uploaded_file:
        # Upper bound on how long to wait for one page's background audio, so
        # a failed worker cannot freeze the page indefinitely.
        audio_wait_seconds = 300
        progress_bar = st.progress(0)
        status = st.empty()
        with st.spinner('Processing PDF...'):
            texts, audios, total_pages = process_pdf(
                uploaded_file, max_pages,
                st.session_state['tts_voice'],
                st.session_state['character'],
                audio_processor
            )
            for i, text in enumerate(texts):
                with st.expander(f"Page {i+1}", expanded=i==0):
                    st.markdown(text)
                    # Wait for audio processing
                    deadline = time.monotonic() + audio_wait_seconds
                    while i not in audios and time.monotonic() < deadline:
                        time.sleep(0.1)
                    page_audio = audios.get(i)
                    if page_audio:
                        st.audio(page_audio, format='audio/mp3')
                        # Add download link
                        size_mb = len(page_audio) / (1024 * 1024)
                        st.sidebar.markdown(
                            get_download_link(page_audio, f'page_{i+1}.mp3', size_mb),
                            unsafe_allow_html=True
                        )
                    elif i not in audios:
                        st.warning(f"Audio for page {i+1} was not ready in time.")
                progress_bar.progress((i + 1) / total_pages)
                status.text(f"Processing page {i+1}/{total_pages}")
            # This literal was also split across two physical lines; joined.
            st.success(f"β Successfully processed {total_pages} pages!")
            audio_processor._log_action("PDF Processed", f"Processed {uploaded_file.name} ({total_pages} pages)")
    # Text to Audio section
    st.markdown("### βοΈ Text to Audio")
    prompt = st.text_area("Enter text to convert to audio", height=200)
    if prompt:
        with st.spinner('Converting text to audio...'):
            audio_data = asyncio.run(audio_processor.create_audio(
                prompt,
                st.session_state['tts_voice'],
                st.session_state['character']
            ))
        if audio_data:
            st.audio(audio_data, format='audio/mp3')
            size_mb = len(audio_data) / (1024 * 1024)
            st.sidebar.markdown("### π΅ Custom Audio")
            st.sidebar.markdown(
                get_download_link(audio_data, 'custom_text.mp3', size_mb),
                unsafe_allow_html=True
            )
    # Cache management
    if st.sidebar.button("Clear Cache"):
        for file in os.listdir(audio_processor.cache_dir):
            cached_path = os.path.join(audio_processor.cache_dir, file)
            # os.remove cannot delete directories; skip anything that is not
            # a regular file.
            if os.path.isfile(cached_path):
                os.remove(cached_path)
        audio_processor.metadata = {}
        audio_processor._save_metadata()
        audio_processor._log_action("Clear Cache", "Cleared audio cache")
        st.sidebar.success("Cache cleared successfully!")
# Run the app only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()