"""Streamlit app that converts PDF pages or free text to speech with Edge TTS.

Generated MP3 files are cached on disk in ``audio_cache/`` and keyed by an
MD5 digest of the text and the selected voice.
"""

import asyncio
import base64
import hashlib
import json
import logging
import os
import threading
import time
from datetime import datetime

import edge_tts
import nest_asyncio
import streamlit as st
from PyPDF2 import PdfReader

# Patch asyncio for nested event loops
nest_asyncio.apply()

logger = logging.getLogger(__name__)

# Available English voices for Edge TTS
EDGE_TTS_VOICES = [
    "en-US-AriaNeural",
    "en-US-GuyNeural",
    "en-US-JennyNeural",
    "en-GB-SoniaNeural",
    "en-GB-RyanNeural",
    "en-AU-NatashaNeural",
    "en-AU-WilliamNeural",
    "en-CA-ClaraNeural",
    "en-CA-LiamNeural",
]

# Sidebar help text describing each voice. Kept at column 0 so Markdown does
# not interpret leading indentation as a code block.
VOICE_GUIDE_MARKDOWN = """
# 🎙️ Voice Character Agent Selector 🎭
*Female Voices*:
- 🌸 **Aria** – Elegant, creative storytelling
- 🎶 **Jenny** – Friendly, conversational
- 🌺 **Sonia** – Bold, confident
- 🌌 **Natasha** – Sophisticated, mysterious
- 🌷 **Clara** – Cheerful, empathetic

*Male Voices*:
- 🌟 **Guy** – Authoritative, versatile
- 🛠️ **Ryan** – Approachable, casual
- 🎻 **William** – Classic, scholarly
- 🌟 **Liam** – Energetic, engaging
"""

# Initialize session state for voice selection
if 'tts_voice' not in st.session_state:
    st.session_state['tts_voice'] = EDGE_TTS_VOICES[0]


class AudioProcessor:
    """Synthesise speech with Edge TTS and cache the MP3 output on disk.

    ``metadata`` maps a cache key to a small record (timestamp, text length,
    voice) and is persisted as ``metadata.json`` inside ``cache_dir``.
    """

    def __init__(self):
        self.cache_dir = "audio_cache"
        os.makedirs(self.cache_dir, exist_ok=True)
        # process_pdf() calls create_audio() from several threads at once, so
        # updates to ``metadata`` and writes of metadata.json are serialised.
        # Re-entrant because create_audio() calls _save_metadata() while
        # already holding the lock.
        self._lock = threading.RLock()
        self.metadata = self._load_metadata()

    def _metadata_path(self):
        """Return the path of the JSON file that stores the cache metadata."""
        return os.path.join(self.cache_dir, "metadata.json")

    def _load_metadata(self):
        """Load the cache metadata, returning ``{}`` if missing or unreadable."""
        metadata_file = self._metadata_path()
        if not os.path.exists(metadata_file):
            return {}
        try:
            with open(metadata_file, encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, json.JSONDecodeError):
            # A corrupt or unreadable index only costs a cache rebuild; it
            # should not prevent the app from starting.
            logger.warning("Ignoring unreadable cache metadata: %s", metadata_file)
            return {}
        return loaded if isinstance(loaded, dict) else {}

    def _save_metadata(self):
        """Write the current metadata dictionary to ``metadata.json``."""
        with self._lock:
            with open(self._metadata_path(), 'w', encoding="utf-8") as f:
                json.dump(self.metadata, f)

    async def create_audio(self, text, voice='en-US-AriaNeural'):
        """Return MP3 bytes for *text* spoken by *voice*.

        A previously generated file is reused when the cache key is present
        in the metadata and the file still exists. Otherwise the audio is
        generated with ``edge_tts`` and added to the cache.

        Args:
            text: Text to speak.
            voice: Edge TTS voice name.

        Returns:
            The MP3 data as ``bytes``, or ``None`` when there is nothing to
            speak (empty or whitespace-only text).

        Raises:
            Any exception raised by ``edge_tts`` or by file I/O propagates to
            the caller.
        """
        if not text:
            return None

        # The key is derived from the raw (uncleaned) text so that entries
        # written by earlier runs remain valid.
        cache_key = hashlib.md5(f"{text}:{voice}".encode()).hexdigest()
        cache_path = os.path.join(self.cache_dir, f"{cache_key}.mp3")

        if cache_key in self.metadata and os.path.exists(cache_path):
            with open(cache_path, 'rb') as f:
                return f.read()

        # Clean text for speech.
        # NOTE(review): replacing an empty search string would insert a space
        # between every character; the "</s>" end-of-sequence marker is
        # presumably the token meant to be removed here - confirm.
        cleaned = text.replace("\n", " ").replace("</s>", " ").strip()
        if not cleaned:
            return None

        # Generate audio with edge_tts
        communicate = edge_tts.Communicate(cleaned, voice)
        await communicate.save(cache_path)

        # Update metadata
        with self._lock:
            self.metadata[cache_key] = {
                'timestamp': datetime.now().isoformat(),
                'text_length': len(cleaned),
                'voice': voice,
            }
            self._save_metadata()

        with open(cache_path, 'rb') as f:
            return f.read()


def get_download_link(bin_data, filename, size_mb=None):
    """Return an HTML snippet that downloads *bin_data* as *filename*.

    The data is embedded in the link as a base64 ``data:`` URI, so the result
    must be rendered with ``unsafe_allow_html=True``.

    Args:
        bin_data: Raw file content (``bytes``).
        filename: File name shown in the link and offered to the browser.
        size_mb: Optional size in megabytes appended after the link.

    Returns:
        The HTML markup as a string.
    """
    b64 = base64.b64encode(bin_data).decode()
    size_str = f"({size_mb:.1f} MB)" if size_mb else ""
    # NOTE(review): the anchor markup is a minimal reconstruction; any
    # project-specific wrapper elements or CSS classes are not restored.
    return f'''
<a href="data:audio/mpeg;base64,{b64}" download="{filename}">📥 {filename}</a>
{size_str}
'''


def process_pdf(pdf_file, max_pages, voice, audio_processor):
    """Extract page texts from a PDF and start audio generation per page.

    One background thread is started for every processed page. Each thread
    stores its result in the returned ``audios`` dictionary under the page
    index once it finishes, so callers can poll for ``index in audios``.

    Args:
        pdf_file: Path or file-like object accepted by ``PdfReader``.
        max_pages: Upper bound on the number of pages to process.
        voice: Edge TTS voice name.
        audio_processor: ``AudioProcessor`` used to synthesise the audio.

    Returns:
        A tuple ``(texts, audios, total_pages)``. ``texts`` is the list of
        extracted page texts, ``audios`` maps page index to MP3 ``bytes`` (or
        ``None`` when the page has no text or synthesis failed) and is filled
        asynchronously, and ``total_pages`` is the number of pages processed.
    """
    reader = PdfReader(pdf_file)
    total_pages = min(len(reader.pages), max_pages)
    texts, audios = [], {}

    def synthesize_page(index, page_text):
        # Always record a result, even on failure: the caller waits until the
        # index appears in ``audios`` and would otherwise wait forever.
        try:
            audios[index] = asyncio.run(
                audio_processor.create_audio(page_text, voice)
            )
        except Exception:
            logger.exception("Audio generation failed for page %d", index + 1)
            audios[index] = None

    # Extract text and start audio processing
    for i in range(total_pages):
        text = reader.pages[i].extract_text() or ""
        texts.append(text)
        # The index and text are passed as thread arguments rather than being
        # captured by a closure, so each thread keeps its own page even if it
        # starts running after the loop has moved on.
        threading.Thread(
            target=synthesize_page, args=(i, text), daemon=True
        ).start()

    return texts, audios, total_pages


def main():
    """Render the Streamlit UI: voice picker, PDF conversion, text conversion."""
    st.set_page_config(page_title="📚 PDF to Audio 🎧", page_icon="🎉", layout="wide")

    # Apply styling
    # NOTE(review): the stylesheet string is blank, so this renders nothing.
    st.markdown(""" """, unsafe_allow_html=True)

    # Initialize processor
    audio_processor = AudioProcessor()

    # Sidebar settings
    st.sidebar.title("📥 Downloads & Settings")

    # Voice selection UI
    st.sidebar.markdown("### 🎤 Voice Settings")
    selected_voice = st.sidebar.selectbox(
        "👄 Select TTS Voice:",
        options=EDGE_TTS_VOICES,
        index=EDGE_TTS_VOICES.index(st.session_state['tts_voice']),
    )
    st.sidebar.markdown(VOICE_GUIDE_MARKDOWN)

    if selected_voice != st.session_state['tts_voice']:
        st.session_state['tts_voice'] = selected_voice
        st.rerun()

    # Main interface
    # NOTE(review): heading markup reconstructed as a plain <h1>.
    st.markdown("<h1>📚 PDF to Audio Converter 🎧</h1>", unsafe_allow_html=True)

    col1, col2 = st.columns(2)
    with col1:
        uploaded_file = st.file_uploader("Choose a PDF file", "pdf")
    with col2:
        max_pages = st.slider('Select pages to process', min_value=1, max_value=100, value=10)

    if uploaded_file:
        progress_bar = st.progress(0)
        status = st.empty()

        with st.spinner('Processing PDF...'):
            texts, audios, total_pages = process_pdf(
                uploaded_file, max_pages, st.session_state['tts_voice'], audio_processor
            )

            for i, text in enumerate(texts):
                with st.expander(f"Page {i+1}", expanded=i == 0):
                    st.markdown(text)

                    # Wait for audio processing. The worker records a result
                    # (possibly None) for every page, including on failure.
                    while i not in audios:
                        time.sleep(0.1)

                    page_audio = audios[i]
                    if page_audio:
                        st.audio(page_audio, format='audio/mp3')
                    else:
                        st.caption("No audio available for this page.")

                # Add download link
                if page_audio:
                    size_mb = len(page_audio) / (1024 * 1024)
                    st.sidebar.markdown(
                        get_download_link(page_audio, f'page_{i+1}.mp3', size_mb),
                        unsafe_allow_html=True,
                    )

                progress_bar.progress((i + 1) / total_pages)
                status.text(f"Processing page {i+1}/{total_pages}")

            st.success(f"✅ Successfully processed {total_pages} pages!")

    # Text to Audio section
    st.markdown("### ✍️ Text to Audio")
    prompt = st.text_area("Enter text to convert to audio", height=200)

    if prompt:
        with st.spinner('Converting text to audio...'):
            try:
                audio_data = asyncio.run(
                    audio_processor.create_audio(prompt, st.session_state['tts_voice'])
                )
            except Exception as exc:
                # UI boundary: report the failure instead of crashing the page.
                logger.exception("Audio generation failed for custom text")
                st.error(f"Audio generation failed: {exc}")
                audio_data = None

            if audio_data:
                st.audio(audio_data, format='audio/mp3')

                size_mb = len(audio_data) / (1024 * 1024)
                st.sidebar.markdown("### 🎵 Custom Audio")
                st.sidebar.markdown(
                    get_download_link(audio_data, 'custom_text.mp3', size_mb),
                    unsafe_allow_html=True,
                )

    # Cache management
    if st.sidebar.button("Clear Cache"):
        for entry in os.listdir(audio_processor.cache_dir):
            entry_path = os.path.join(audio_processor.cache_dir, entry)
            # os.remove() raises on directories; only plain files are cached.
            if os.path.isfile(entry_path):
                os.remove(entry_path)
        audio_processor.metadata = {}
        audio_processor._save_metadata()
        st.sidebar.success("Cache cleared successfully!")


if __name__ == "__main__":
    main()