"""Streamlit app that turns PDF pages and free text into speech with Edge TTS.

Generated audio is cached on disk (keyed by text + voice), every conversion is
also written out as a small markdown file, and user actions are appended to a
history log that is shown in the sidebar.
"""

import asyncio
import base64
import hashlib
import html
import json
import logging
import os
import random
import re
import threading
import time
from datetime import datetime

import edge_tts
import nest_asyncio
import streamlit as st
from PyPDF2 import PdfReader

try:
    # Lets background threads see the current session (st.session_state).
    from streamlit.runtime.scriptrunner import add_script_run_ctx
except ImportError:  # Streamlit version without this module layout
    add_script_run_ctx = None

# Patch asyncio for nested event loops
nest_asyncio.apply()

logger = logging.getLogger(__name__)

# Character definitions with emojis
CHARACTERS = {
    "Aria": {"emoji": "🌸", "voice": "en-US-AriaNeural"},
    "Jenny": {"emoji": "🎢", "voice": "en-US-JennyNeural"},
    "Sonia": {"emoji": "🌺", "voice": "en-GB-SoniaNeural"},
    "Natasha": {"emoji": "🌌", "voice": "en-AU-NatashaNeural"},
    "Clara": {"emoji": "🌷", "voice": "en-CA-ClaraNeural"},
    "Guy": {"emoji": "🌟", "voice": "en-US-GuyNeural"},
    "Ryan": {"emoji": "🛠️", "voice": "en-GB-RyanNeural"},
    "William": {"emoji": "🎻", "voice": "en-AU-WilliamNeural"},
    "Liam": {"emoji": "🌟", "voice": "en-CA-LiamNeural"},
}

# Available English voices for Edge TTS (one per character, in CHARACTERS order)
EDGE_TTS_VOICES = [info["voice"] for info in CHARACTERS.values()]

# Upper bound on how long main() waits for one page's background audio job.
AUDIO_WAIT_TIMEOUT_S = 300

# Sidebar help text. Kept at column 0 so markdown never treats it as a code block.
VOICE_GUIDE_MD = """
# 🎙️ Voice Character Agent Selector 🎭

*Female Voices*:
- 🌸 **Aria** – Elegant, creative storytelling
- 🎢 **Jenny** – Friendly, conversational
- 🌺 **Sonia** – Bold, confident
- 🌌 **Natasha** – Sophisticated, mysterious
- 🌷 **Clara** – Cheerful, empathetic

*Male Voices*:
- 🌟 **Guy** – Authoritative, versatile
- 🛠️ **Ryan** – Approachable, casual
- 🎻 **William** – Classic, scholarly
- 🌟 **Liam** – Energetic, engaging
"""

# Initialize session state.
# The voice is derived from the randomly chosen character so the two always
# agree; picking them independently would trigger a spurious "Voice Change"
# (and a rerun) on the very first script run.
if 'character' not in st.session_state:
    st.session_state['character'] = random.choice(list(CHARACTERS))
if 'tts_voice' not in st.session_state:
    st.session_state['tts_voice'] = CHARACTERS[st.session_state['character']]["voice"]
if 'history' not in st.session_state:
    st.session_state['history'] = []


class AudioProcessor:
    """Generate, cache and log text-to-speech audio.

    Audio files live in ``cache_dir`` as ``<md5>.mp3`` and are indexed by
    ``metadata.json`` in the same directory. A markdown copy of every converted
    text is written to ``markdown_dir`` and actions are appended to ``log_file``.
    """

    def __init__(self):
        self.cache_dir = "audio_cache"
        self.markdown_dir = "markdown_files"
        self.log_file = "history_log.md"
        # create_audio() runs concurrently in one thread per PDF page, so the
        # shared metadata dict / index file / log file need serialising.
        # Re-entrant because create_audio() calls _save_metadata() while
        # already holding the lock.
        self._lock = threading.RLock()
        os.makedirs(self.cache_dir, exist_ok=True)
        os.makedirs(self.markdown_dir, exist_ok=True)
        self.metadata = self._load_metadata()

    def _metadata_path(self):
        """Return the path of the cache index file."""
        return os.path.join(self.cache_dir, "metadata.json")

    def _load_metadata(self):
        """Load the cache index, returning ``{}`` if it is missing or unreadable."""
        try:
            with open(self._metadata_path(), encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            return {}
        except (OSError, ValueError):
            # A corrupt index only costs us cache hits; start from scratch
            # rather than taking the whole app down.
            logger.warning("Ignoring unreadable audio cache index", exc_info=True)
            return {}
        return data if isinstance(data, dict) else {}

    def _save_metadata(self):
        """Write the in-memory cache index to disk."""
        with self._lock:
            with open(self._metadata_path(), 'w', encoding='utf-8') as f:
                json.dump(self.metadata, f)

    def _log_action(self, action, details):
        """Append a timestamped entry to the log file and the session history.

        Args:
            action: Short action label, e.g. ``"Voice Change"``.
            details: Free-form description of what happened.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        entry = f"[{timestamp}] {action}: {details}"
        with self._lock:
            with open(self.log_file, 'a', encoding='utf-8') as f:
                f.write(f"{entry}\n")
        # The key may be absent when called from a worker thread that has no
        # script-run context; create it instead of raising KeyError.
        if 'history' not in st.session_state:
            st.session_state['history'] = []
        st.session_state['history'].append(entry)

    @staticmethod
    def _read_bytes(path):
        """Return the full binary contents of ``path``."""
        with open(path, 'rb') as f:
            return f.read()

    @staticmethod
    def _safe_title(title):
        """Reduce ``title`` to characters that are safe inside a file name."""
        cleaned = re.sub(r'[^\w\- ]+', '', title).strip()
        # Cap the length: PDF text can yield one enormous "word".
        return cleaned[:80].strip() or "untitled"

    async def create_audio(self, text, voice, character):
        """Return MP3 bytes for ``text`` spoken with ``voice``.

        A cached file is reused when the (text, voice) pair was generated
        before. Otherwise the audio is synthesised with edge-tts, stored in the
        cache, mirrored to a markdown file and logged.

        Args:
            text: Text to speak.
            voice: Edge TTS voice name.
            character: Character name recorded in the markdown file and metadata.

        Returns:
            The MP3 data as ``bytes``, or ``None`` when there is nothing to speak.
        """
        if not text:
            return None

        cache_key = hashlib.md5(f"{text}:{voice}".encode()).hexdigest()
        cache_path = os.path.join(self.cache_dir, f"{cache_key}.mp3")

        if cache_key in self.metadata and os.path.exists(cache_path):
            return self._read_bytes(cache_path)

        # Clean text for speech.
        # NOTE(review): the second replace() previously had an empty search
        # string, which inserts a space between every character. "</s>"
        # (end-of-sequence marker) is presumably what was meant — confirm.
        text = text.replace("\n", " ").replace("</s>", " ").strip()
        if not text:
            return None

        # Generate audio with edge_tts
        communicate = edge_tts.Communicate(text, voice)
        await communicate.save(cache_path)

        # Save markdown file
        timestamp = datetime.now().strftime("%I%M %p %m%d%Y")
        title_words = ' '.join(text.split()[:10])
        filename = f"{timestamp} {character} {self._safe_title(title_words)}.md"
        filepath = os.path.join(self.markdown_dir, filename)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(f"# {title_words}\n\n**Character:** {character}\n**Voice:** {voice}\n\n{text}")

        # Log action
        self._log_action("Text to Audio", f"Created audio for '{title_words}' with {character} ({voice})")

        # Update metadata
        with self._lock:
            self.metadata[cache_key] = {
                'timestamp': datetime.now().isoformat(),
                'text_length': len(text),
                'voice': voice,
                'character': character,
                'markdown_file': filename,
            }
            self._save_metadata()

        return self._read_bytes(cache_path)


def get_download_link(bin_data, filename, size_mb=None):
    """Build an HTML download link that embeds ``bin_data`` as a data URI.

    Args:
        bin_data: Raw MP3 bytes to offer for download.
        filename: Name suggested to the browser and shown as the link text.
        size_mb: Optional size in megabytes shown next to the link.

    Returns:
        An HTML snippet meant for ``st.markdown(..., unsafe_allow_html=True)``.
    """
    b64 = base64.b64encode(bin_data).decode()
    size_str = f"({size_mb:.1f} MB)" if size_mb else ""
    safe_name = html.escape(filename, quote=True)
    # Single line on purpose: indented multi-line HTML can be rendered as a
    # markdown code block.
    return (
        f'<div><a href="data:audio/mpeg;base64,{b64}" download="{safe_name}">'
        f'📥 {safe_name}</a> <span>{size_str}</span></div>'
    )


def process_pdf(pdf_file, max_pages, voice, character, audio_processor):
    """Extract page texts from a PDF and start audio generation in the background.

    One thread per page runs ``audio_processor.create_audio``. Each thread
    always stores a result in the returned ``audios`` dict — ``None`` when
    there is no text or synthesis failed — so callers polling for a page index
    cannot wait forever on a crashed worker.

    Args:
        pdf_file: Path or file-like object accepted by ``PdfReader``.
        max_pages: Maximum number of pages to process.
        voice: Edge TTS voice name.
        character: Character name passed through to ``create_audio``.
        audio_processor: The ``AudioProcessor`` used to synthesise audio.

    Returns:
        ``(texts, audios, total_pages)`` where ``texts`` is the list of page
        texts, ``audios`` maps page index to MP3 bytes (filled in
        asynchronously) and ``total_pages`` is the number of pages processed.
    """
    reader = PdfReader(pdf_file)
    total_pages = min(len(reader.pages), max_pages)
    texts, audios = [], {}

    def synthesize_page(index, page_text):
        try:
            audios[index] = asyncio.run(
                audio_processor.create_audio(page_text, voice, character)
            )
        except Exception:
            # Thread boundary: record the failure so the UI can move on.
            logger.exception("Audio generation failed for page %d", index + 1)
            audios[index] = None

    # Extract text and start audio processing
    for index in range(total_pages):
        page_text = reader.pages[index].extract_text() or ""
        texts.append(page_text)

        # Pass index/text via args: a lambda here would late-bind the loop
        # variables and could synthesise the wrong page.
        worker = threading.Thread(
            target=synthesize_page, args=(index, page_text), daemon=True
        )
        if add_script_run_ctx is not None:
            add_script_run_ctx(worker)
        worker.start()

    return texts, audios, total_pages


def main():
    """Render the Streamlit UI: sidebar settings, PDF conversion and text conversion."""
    st.set_page_config(
        page_title="📚PDF 🪄Text to 🗣️Speech 🤖Transformer",
        page_icon="📚",
        layout="wide",
    )

    # Apply styling (the style block is currently empty)
    st.markdown(""" """, unsafe_allow_html=True)

    # Initialize processor
    audio_processor = AudioProcessor()

    # Sidebar settings
    st.sidebar.title(
        f"{CHARACTERS[st.session_state['character']]['emoji']} "
        f"Character Name: {st.session_state['character']}"
    )

    # Voice selection UI
    st.sidebar.markdown("### 🎀 Voice Settings")
    voice_options = [info["voice"] for info in CHARACTERS.values()]
    current_voice = st.session_state['tts_voice']
    selected_voice = st.sidebar.selectbox(
        "👄 Select TTS Voice:",
        options=voice_options,
        # Fall back to the first voice if the stored one is unknown.
        index=voice_options.index(current_voice) if current_voice in voice_options else 0,
        key="voice_select",
    )
    selected_character = next(
        name for name, info in CHARACTERS.items() if info["voice"] == selected_voice
    )

    st.sidebar.markdown(VOICE_GUIDE_MD)

    if (selected_voice != st.session_state['tts_voice']
            or selected_character != st.session_state['character']):
        st.session_state['tts_voice'] = selected_voice
        st.session_state['character'] = selected_character
        audio_processor._log_action(
            "Voice Change", f"Changed to {selected_character} ({selected_voice})"
        )
        st.rerun()

    # Markdown file history
    st.sidebar.markdown("### 📜 History")
    md_files = sorted(
        f for f in os.listdir(audio_processor.markdown_dir)
        if f.endswith('.md') and f != 'README.md'
    )
    for md_file in md_files:
        md_path = os.path.join(audio_processor.markdown_dir, md_file)
        col1, col2, col3 = st.sidebar.columns([3, 1, 1])
        with col1:
            if st.button(f"👁️ {md_file}", key=f"view_{md_file}"):
                with open(md_path, 'r', encoding='utf-8') as f:
                    st.session_state['current_md'] = f.read()
                audio_processor._log_action("View File", f"Viewed {md_file}")
        with col2:
            if st.button("🗑️", key=f"delete_{md_file}"):
                os.remove(md_path)
                audio_processor._log_action("Delete File", f"Deleted {md_file}")
                st.rerun()
        with col3:
            st.write("")

    # History log
    st.sidebar.markdown("### 📋 Action History")
    for entry in st.session_state['history']:
        st.sidebar.write(entry)

    # Main interface
    st.markdown("<h1>📚 PDF to Audio Converter 🎧</h1>", unsafe_allow_html=True)

    # Display current markdown if selected
    if 'current_md' in st.session_state:
        st.markdown(st.session_state['current_md'])

    col1, col2 = st.columns(2)
    with col1:
        uploaded_file = st.file_uploader("Choose a PDF file", type="pdf")
    with col2:
        max_pages = st.slider('Select pages to process', min_value=1, max_value=100, value=10)

    if uploaded_file:
        progress_bar = st.progress(0)
        status = st.empty()

        with st.spinner('Processing PDF...'):
            texts, audios, total_pages = process_pdf(
                uploaded_file, max_pages,
                st.session_state['tts_voice'],
                st.session_state['character'],
                audio_processor,
            )

            for i, text in enumerate(texts):
                status.text(f"Processing page {i+1}/{total_pages}")
                with st.expander(f"Page {i+1}", expanded=i == 0):
                    st.markdown(text)

                    # Wait for audio processing, but never indefinitely.
                    deadline = time.monotonic() + AUDIO_WAIT_TIMEOUT_S
                    while i not in audios and time.monotonic() < deadline:
                        time.sleep(0.1)

                    page_audio = audios.get(i)
                    if page_audio:
                        st.audio(page_audio, format='audio/mp3')

                        # Add download link
                        size_mb = len(page_audio) / (1024 * 1024)
                        st.sidebar.markdown(
                            get_download_link(page_audio, f'page_{i+1}.mp3', size_mb),
                            unsafe_allow_html=True,
                        )
                    else:
                        st.caption("No audio available for this page.")

                progress_bar.progress((i + 1) / total_pages)

            st.success(f"✅ Successfully processed {total_pages} pages!")
            audio_processor._log_action(
                "PDF Processed", f"Processed {uploaded_file.name} ({total_pages} pages)"
            )

    # Text to Audio section
    st.markdown("### ✍️ Text to Audio")
    prompt = st.text_area("Enter text to convert to audio", height=200)

    if prompt:
        with st.spinner('Converting text to audio...'):
            audio_data = asyncio.run(audio_processor.create_audio(
                prompt,
                st.session_state['tts_voice'],
                st.session_state['character'],
            ))
            if audio_data:
                st.audio(audio_data, format='audio/mp3')

                size_mb = len(audio_data) / (1024 * 1024)
                st.sidebar.markdown("### 🎡 Custom Audio")
                st.sidebar.markdown(
                    get_download_link(audio_data, 'custom_text.mp3', size_mb),
                    unsafe_allow_html=True,
                )

    # Cache management
    if st.sidebar.button("Clear Cache"):
        for name in os.listdir(audio_processor.cache_dir):
            path = os.path.join(audio_processor.cache_dir, name)
            # os.remove() fails on directories; only plain files are cache entries.
            if os.path.isfile(path):
                os.remove(path)
        audio_processor.metadata = {}
        audio_processor._save_metadata()
        audio_processor._log_action("Clear Cache", "Cleared audio cache")
        st.sidebar.success("Cache cleared successfully!")


if __name__ == "__main__":
    main()