import streamlit as st
import base64
import os
from PyPDF2 import PdfReader
import threading
import time
import hashlib
from datetime import datetime
import json
import asyncio
import edge_tts
# Patch asyncio for nested event loops
import nest_asyncio
nest_asyncio.apply()
# English Edge TTS voice identifiers offered in the sidebar voice picker.
# The first entry doubles as the default voice for a new session.
EDGE_TTS_VOICES = [
    # United States
    "en-US-AriaNeural",
    "en-US-GuyNeural",
    "en-US-JennyNeural",
    # United Kingdom
    "en-GB-SoniaNeural",
    "en-GB-RyanNeural",
    # Australia
    "en-AU-NatashaNeural",
    "en-AU-WilliamNeural",
    # Canada
    "en-CA-ClaraNeural",
    "en-CA-LiamNeural",
]
# Seed the per-session voice choice with the first catalogue entry the first
# time the script runs for a session; later reruns keep the user's selection.
if "tts_voice" not in st.session_state:
    st.session_state.tts_voice = EDGE_TTS_VOICES[0]
class AudioProcessor:
    """Turn text into MP3 speech with edge-tts, caching results on disk.

    Audio is stored as ``audio_cache/<md5>.mp3``. ``metadata.json`` in the
    same directory lists the cache keys whose audio was written completely;
    a file without a metadata entry is treated as absent and regenerated.
    """

    def __init__(self):
        self.cache_dir = "audio_cache"
        os.makedirs(self.cache_dir, exist_ok=True)
        # create_audio() is driven from several worker threads at once, so
        # updates to and dumps of the shared metadata dict are serialized.
        self._metadata_lock = threading.Lock()
        self.metadata = self._load_metadata()

    def _metadata_path(self):
        """Return the path of the JSON index inside the cache directory."""
        return os.path.join(self.cache_dir, "metadata.json")

    def _load_metadata(self):
        """Load the cache index from disk.

        Returns:
            The stored mapping of cache key to entry details, or an empty
            dict when the index is missing, unreadable as JSON, or not a
            JSON object.
        """
        try:
            with open(self._metadata_path(), encoding="utf-8") as f:
                loaded = json.load(f)
        except FileNotFoundError:
            return {}
        except ValueError:
            # Covers json.JSONDecodeError and decoding errors: a damaged
            # index only means entries get regenerated, so start empty.
            return {}
        return loaded if isinstance(loaded, dict) else {}

    def _save_metadata(self):
        """Write the in-memory cache index to ``metadata.json``."""
        with self._metadata_lock:
            with open(self._metadata_path(), 'w', encoding="utf-8") as f:
                json.dump(self.metadata, f)

    @staticmethod
    def _read_bytes(path):
        """Return the full binary content of ``path``, closing the file."""
        with open(path, 'rb') as f:
            return f.read()

    @staticmethod
    def _clean_text(text):
        """Collapse newlines and whitespace runs into single spaces.

        The previous implementation also called ``replace("", " ")``, which
        inserts a space between every character and garbles the speech.
        NOTE(review): that empty target may have been a token removed by
        markup stripping - confirm whether one should be filtered here.
        """
        # str.split() without arguments splits on any whitespace run and
        # drops leading/trailing whitespace.
        return " ".join(text.split())

    async def create_audio(self, text, voice='en-US-AriaNeural'):
        """Return MP3 bytes for ``text`` spoken by ``voice``.

        Args:
            text: Text to speak.
            voice: Edge TTS voice identifier.

        Returns:
            The MP3 data as ``bytes`` (from the cache when available), or
            ``None`` when ``text`` is ``None`` or contains nothing but
            whitespace.
        """
        if text is None:
            return None

        # The key is derived from the raw text so existing cache entries
        # keep matching.
        cache_key = hashlib.md5(f"{text}:{voice}".encode()).hexdigest()
        cache_path = os.path.join(self.cache_dir, f"{cache_key}.mp3")
        if cache_key in self.metadata and os.path.exists(cache_path):
            return self._read_bytes(cache_path)

        # Clean text for speech
        text = self._clean_text(text)
        if not text:
            return None

        # Generate audio with edge_tts
        communicate = edge_tts.Communicate(text, voice)
        await communicate.save(cache_path)

        # Record the entry only after the file was saved, so an interrupted
        # download is never served from the cache.
        with self._metadata_lock:
            self.metadata[cache_key] = {
                'timestamp': datetime.now().isoformat(),
                'text_length': len(text),
                'voice': voice,
            }
        self._save_metadata()
        return self._read_bytes(cache_path)
def get_download_link(bin_data, filename, size_mb=None):
    """Build an HTML download link that embeds ``bin_data`` as a data URI.

    The previous template was empty, so the encoded payload and the size
    label were computed but never emitted and no link was rendered.
    NOTE(review): the anchor markup below is a reconstruction - confirm it
    against the intended design (CSS classes, wrapper elements).

    Args:
        bin_data: Raw bytes of the file to offer (MP3 audio at the call
            sites in this file).
        filename: Name suggested to the browser and shown as the link text.
        size_mb: Optional size in megabytes, shown with one decimal.

    Returns:
        An HTML snippet meant to be rendered with
        ``st.markdown(..., unsafe_allow_html=True)``.
    """
    b64 = base64.b64encode(bin_data).decode()
    size_str = f"({size_mb:.1f} MB)" if size_mb is not None else ""
    # Kept at column 0: Markdown would treat indented lines as a code block.
    return f'''
<a href="data:audio/mpeg;base64,{b64}" download="{filename}">📥 {filename}</a> {size_str}
'''
def process_pdf(pdf_file, max_pages, voice, audio_processor):
    """Extract page text from a PDF and start speech synthesis per page.

    One background thread is started per page. The function returns
    immediately; ``audios`` is filled in as the threads finish, so callers
    must wait for a page index to appear before reading it.

    Args:
        pdf_file: PDF path or file-like object accepted by ``PdfReader``.
        max_pages: Upper bound on the number of pages to process.
        voice: Edge TTS voice identifier passed to ``create_audio``.
        audio_processor: Object exposing the ``create_audio(text, voice)``
            coroutine.

    Returns:
        ``(texts, audios, total_pages)`` where ``texts`` is the list of page
        texts, ``audios`` maps page index to MP3 bytes (or ``None`` when the
        page has no speakable text or synthesis failed), and ``total_pages``
        is the number of pages processed.
    """
    # Function-scope import keeps this block self-contained.
    import logging

    log = logging.getLogger(__name__)
    reader = PdfReader(pdf_file)
    total_pages = max(0, min(len(reader.pages), max_pages))
    texts, audios = [], {}

    def synthesize_page(page_index, page_text):
        """Thread target: synthesize one page and always record a result."""
        try:
            audios[page_index] = asyncio.run(
                audio_processor.create_audio(page_text, voice)
            )
        except Exception:
            # Thread boundary: an unrecorded failure would leave the index
            # missing and make pollers wait forever, so store None instead.
            log.exception("Audio synthesis failed for page %d", page_index + 1)
            audios[page_index] = None

    # Extract text and start audio processing
    for page_index in range(total_pages):
        # Defensive: treat a page without extractable text as empty.
        page_text = reader.pages[page_index].extract_text() or ""
        texts.append(page_text)
        # Pass the values through ``args``: a lambda closing over the loop
        # variables would see whatever they hold when the thread runs,
        # not the values of this iteration.
        threading.Thread(
            target=synthesize_page,
            args=(page_index, page_text),
        ).start()
    return texts, audios, total_pages
def main():
    """Render the Streamlit page.

    The page has a sidebar (voice picker, download links, cache control),
    a PDF section that shows each page's text with its synthesized audio,
    and a free-text section that converts typed text to audio.
    """
    st.set_page_config(page_title="📚 PDF to Audio 🎧", page_icon="🎉", layout="wide")
    # Apply styling
    # NOTE(review): the style block is empty in this revision - confirm
    # whether CSS is meant to be injected here.
    st.markdown("""
""", unsafe_allow_html=True)

    # Initialize processor
    audio_processor = AudioProcessor()

    # Sidebar settings
    st.sidebar.title("📥 Downloads & Settings")
    st.sidebar.markdown("### 🎤 Voice Settings")
    selected_voice = st.sidebar.selectbox(
        "👄 Select TTS Voice:",
        options=EDGE_TTS_VOICES,
        index=EDGE_TTS_VOICES.index(st.session_state['tts_voice'])
    )
    # The description lines stay at column 0 so Markdown does not render
    # them as a code block.
    st.sidebar.markdown("""
# 🎙️ Voice Character Agent Selector 🎭
*Female Voices*:
- 🌸 **Aria** – Elegant, creative storytelling
- 🎶 **Jenny** – Friendly, conversational
- 🌺 **Sonia** – Bold, confident
- 🌌 **Natasha** – Sophisticated, mysterious
- 🌷 **Clara** – Cheerful, empathetic
*Male Voices*:
- 🌟 **Guy** – Authoritative, versatile
- 🛠️ **Ryan** – Approachable, casual
- 🎻 **William** – Classic, scholarly
- 🌟 **Liam** – Energetic, engaging
""")
    if selected_voice != st.session_state['tts_voice']:
        st.session_state['tts_voice'] = selected_voice
        st.rerun()

    # Main interface
    # The title literal used to be split across two lines (a syntax error).
    # NOTE(review): wrapped in <h1> because the call opts into raw HTML -
    # confirm the intended heading markup.
    st.markdown("<h1>📚 PDF to Audio Converter 🎧</h1>", unsafe_allow_html=True)
    col1, col2 = st.columns(2)
    with col1:
        uploaded_file = st.file_uploader("Choose a PDF file", "pdf")
    with col2:
        max_pages = st.slider('Select pages to process', min_value=1, max_value=100, value=10)

    if uploaded_file:
        # Upper bound for waiting on one page's audio, so a stalled or
        # failed worker cannot hang the page. The value is a judgment call.
        page_audio_timeout_s = 120
        progress_bar = st.progress(0)
        status = st.empty()
        with st.spinner('Processing PDF...'):
            texts, audios, total_pages = process_pdf(
                uploaded_file, max_pages, st.session_state['tts_voice'], audio_processor
            )
            for i, text in enumerate(texts):
                with st.expander(f"Page {i+1}", expanded=i == 0):
                    st.markdown(text or "")
                    # Wait for audio processing
                    deadline = time.monotonic() + page_audio_timeout_s
                    while i not in audios and time.monotonic() < deadline:
                        time.sleep(0.1)
                    if i not in audios:
                        st.warning(f"Audio for page {i+1} was not ready in time.")
                    page_audio = audios.get(i)
                    if page_audio:
                        st.audio(page_audio, format='audio/mp3')
                        # Add download link
                        size_mb = len(page_audio) / (1024 * 1024)
                        st.sidebar.markdown(
                            get_download_link(page_audio, f'page_{i+1}.mp3', size_mb),
                            unsafe_allow_html=True
                        )
                progress_bar.progress((i + 1) / total_pages)
                status.text(f"Processing page {i+1}/{total_pages}")
            st.success(f"✅ Successfully processed {total_pages} pages!")

    # Text to Audio section
    st.markdown("### ✍️ Text to Audio")
    prompt = st.text_area("Enter text to convert to audio", height=200)
    if prompt:
        with st.spinner('Converting text to audio...'):
            audio_data = asyncio.run(
                audio_processor.create_audio(prompt, st.session_state['tts_voice'])
            )
        if audio_data:
            st.audio(audio_data, format='audio/mp3')
            size_mb = len(audio_data) / (1024 * 1024)
            st.sidebar.markdown("### 🎵 Custom Audio")
            st.sidebar.markdown(
                get_download_link(audio_data, 'custom_text.mp3', size_mb),
                unsafe_allow_html=True
            )

    # Cache management
    if st.sidebar.button("Clear Cache"):
        for entry in os.listdir(audio_processor.cache_dir):
            entry_path = os.path.join(audio_processor.cache_dir, entry)
            # os.remove() fails on directories; only delete regular files.
            if os.path.isfile(entry_path):
                os.remove(entry_path)
        # The index file was deleted above; write a fresh, empty one.
        audio_processor.metadata = {}
        audio_processor._save_metadata()
        st.sidebar.success("Cache cleared successfully!")
# Script entry point: build the page only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()