import asyncio
import base64
import hashlib
import json
import logging
import os
import threading
import time
from datetime import datetime

import edge_tts
import nest_asyncio
import streamlit as st
from PyPDF2 import PdfReader

# Patch asyncio so asyncio.run() may be called while a loop is already running
# (see the nest_asyncio README); must run before any asyncio.run() call below.
nest_asyncio.apply()


# Voice identifiers offered in the sidebar selector; the first entry is the
# default voice for a new session.
EDGE_TTS_VOICES = [
    "en-US-AriaNeural",
    "en-US-GuyNeural",
    "en-US-JennyNeural",
    "en-GB-SoniaNeural",
    "en-GB-RyanNeural",
    "en-AU-NatashaNeural",
    "en-AU-WilliamNeural",
    "en-CA-ClaraNeural",
    "en-CA-LiamNeural",
]


# Seed the selected voice the first time the script runs in a session; later
# runs keep whatever value is already stored in session state.
if 'tts_voice' not in st.session_state:
    st.session_state['tts_voice'] = EDGE_TTS_VOICES[0]


class AudioProcessor:
    """Convert text to MP3 speech with edge-tts, caching results on disk.

    Each clip is stored as ``<md5 of "text:voice">.mp3`` inside ``cache_dir``.
    ``metadata.json`` in the same directory records which clips were written
    completely; a clip is served from cache only when it is listed there and
    its file exists.
    """

    def __init__(self):
        self.cache_dir = "audio_cache"
        os.makedirs(self.cache_dir, exist_ok=True)
        # create_audio() is driven from several worker threads by
        # process_pdf(), so updates to self.metadata / metadata.json are
        # serialized. Re-entrant because create_audio() holds the lock while
        # calling _save_metadata(), which acquires it again.
        self._metadata_lock = threading.RLock()
        self.metadata = self._load_metadata()

    def _load_metadata(self):
        """Return the cache index read from metadata.json.

        Returns an empty dict when the file is missing, unreadable, or does
        not contain a JSON object, so a damaged index only costs
        re-synthesis instead of crashing the app.
        """
        metadata_file = os.path.join(self.cache_dir, "metadata.json")
        try:
            with open(metadata_file) as f:
                data = json.load(f)
        except FileNotFoundError:
            return {}
        except (ValueError, OSError):
            # ValueError covers json.JSONDecodeError and decoding errors.
            logging.getLogger(__name__).warning(
                "Ignoring unreadable cache index %s", metadata_file
            )
            return {}
        return data if isinstance(data, dict) else {}

    def _save_metadata(self):
        """Write the in-memory cache index to metadata.json."""
        metadata_file = os.path.join(self.cache_dir, "metadata.json")
        with self._metadata_lock, open(metadata_file, 'w') as f:
            json.dump(self.metadata, f)

    async def create_audio(self, text, voice='en-US-AriaNeural'):
        """Return MP3 bytes for *text* spoken by *voice*.

        Args:
            text: Text to synthesize. Newlines and ``</s>`` markers are
                replaced by spaces before synthesis.
            voice: edge-tts voice identifier.

        Returns:
            The MP3 file contents as ``bytes``, or ``None`` when *text* is
            ``None``, empty, or blank after normalization.
        """
        if not text:
            return None

        # The key is derived from the raw text so existing cache entries
        # keep matching.
        cache_key = hashlib.md5(f"{text}:{voice}".encode()).hexdigest()
        cache_path = os.path.join(self.cache_dir, f"{cache_key}.mp3")

        if cache_key in self.metadata and os.path.exists(cache_path):
            with open(cache_path, 'rb') as f:
                return f.read()

        text = text.replace("\n", " ").replace("</s>", " ").strip()
        if not text:
            return None

        communicate = edge_tts.Communicate(text, voice)
        await communicate.save(cache_path)

        # Record the clip only after save() succeeded, so a partially written
        # file is never treated as a cache hit.
        with self._metadata_lock:
            self.metadata[cache_key] = {
                'timestamp': datetime.now().isoformat(),
                'text_length': len(text),
                'voice': voice
            }
            self._save_metadata()

        with open(cache_path, 'rb') as f:
            return f.read()


def get_download_link(bin_data, filename, size_mb=None):
    """Build an HTML snippet offering *bin_data* as an MP3 download.

    Args:
        bin_data: Raw audio bytes; embedded in the link as a base64 data URI.
        filename: Name used for the ``download`` attribute and the link text.
        size_mb: Optional size in megabytes shown under the link. ``None``
            means "unknown" and shows nothing; ``0`` is shown as ``(0.0 MB)``.

    Returns:
        The HTML markup as a string.
    """
    b64 = base64.b64encode(bin_data).decode()
    # Compare against None explicitly so a size of 0 is still displayed.
    size_str = f"({size_mb:.1f} MB)" if size_mb is not None else ""
    # NOTE(review): the glyphs before {filename} look like a mis-encoded
    # emoji; reproduced unchanged — confirm against the original file.
    return f'''
<div class="download-container">
<a href="data:audio/mpeg;base64,{b64}"
download="{filename}" class="download-link">π₯ {filename}</a>
<div class="file-info">{size_str}</div>
</div>
'''


def process_pdf(pdf_file, max_pages, voice, audio_processor):
    """Extract page text from a PDF and start speech synthesis per page.

    Synthesis runs in one background thread per page, so this function
    returns before the audio is ready; callers poll ``audios`` for results.

    Args:
        pdf_file: File path or file-like object accepted by ``PdfReader``.
        max_pages: Upper bound on the number of pages to process.
        voice: Voice identifier passed to ``audio_processor.create_audio``.
        audio_processor: Object exposing ``async create_audio(text, voice)``.

    Returns:
        ``(texts, audios, total_pages)`` where ``texts`` is the list of page
        texts, ``audios`` is a dict filled in by the workers that maps page
        index to the ``create_audio`` result (``None`` if synthesis failed
        or produced nothing), and ``total_pages`` is the number of pages
        processed.
    """
    reader = PdfReader(pdf_file)
    total_pages = min(len(reader.pages), max_pages)
    texts, audios = [], {}

    def synthesize_page(page_index, page_text):
        # Thread boundary: always store a result, even on failure, so a
        # caller polling for this index is never left waiting forever.
        try:
            audios[page_index] = asyncio.run(
                audio_processor.create_audio(page_text, voice)
            )
        except Exception:
            logging.getLogger(__name__).exception(
                "Audio synthesis failed for page %d", page_index + 1
            )
            audios[page_index] = None

    for i in range(total_pages):
        # Guard against extract_text() yielding None for pages without text.
        text = reader.pages[i].extract_text() or ""
        texts.append(text)

        # Pass the values through args= so each thread gets this iteration's
        # page; a closure over i/text would read them after the loop moved on.
        threading.Thread(target=synthesize_page, args=(i, text)).start()

    return texts, audios, total_pages


def main():
    """Render the Streamlit page: voice picker, PDF-to-audio, text-to-audio.

    Reads and updates ``st.session_state['tts_voice']`` and uses
    ``AudioProcessor``, ``process_pdf`` and ``get_download_link`` from this
    module. Download links for generated audio are added to the sidebar.
    """
    # NOTE(review): several user-facing strings below contain glyphs that look
    # like mis-encoded emoji/dashes; they are reproduced unchanged — confirm
    # against the original file.
    st.set_page_config(page_title="π PDF to Audio π§", page_icon="π", layout="wide")

    st.markdown("""
<style>
.download-link {
color: #1E90FF;
text-decoration: none;
padding: 8px 12px;
margin: 5px;
border: 1px solid #1E90FF;
border-radius: 5px;
display: inline-block;
transition: all 0.3s ease;
}
.download-link:hover {
background-color: #1E90FF;
color: white;
}
.file-info {
font-size: 0.8em;
color: gray;
margin-top: 4px;
}
</style>
""", unsafe_allow_html=True)

    audio_processor = AudioProcessor()

    st.sidebar.title("π₯ Downloads & Settings")

    st.sidebar.markdown("### π€ Voice Settings")
    selected_voice = st.sidebar.selectbox(
        "π Select TTS Voice:",
        options=EDGE_TTS_VOICES,
        index=EDGE_TTS_VOICES.index(st.session_state['tts_voice'])
    )
    st.sidebar.markdown("""
# ποΈ Voice Character Agent Selector π
*Female Voices*:
- πΈ **Aria** β Elegant, creative storytelling
- πΆ **Jenny** β Friendly, conversational
- πΊ **Sonia** β Bold, confident
- π **Natasha** β Sophisticated, mysterious
- π· **Clara** β Cheerful, empathetic

*Male Voices*:
- π **Guy** β Authoritative, versatile
- π οΈ **Ryan** β Approachable, casual
- π» **William** β Classic, scholarly
- π **Liam** β Energetic, engaging
""")

    # Persist a changed selection and rerun so the whole page uses it.
    if selected_voice != st.session_state['tts_voice']:
        st.session_state['tts_voice'] = selected_voice
        st.rerun()

    st.markdown("<h1>π PDF to Audio Converter π§</h1>", unsafe_allow_html=True)

    col1, col2 = st.columns(2)
    with col1:
        uploaded_file = st.file_uploader("Choose a PDF file", "pdf")
    with col2:
        max_pages = st.slider('Select pages to process', min_value=1, max_value=100, value=10)

    if uploaded_file:
        progress_bar = st.progress(0)
        status = st.empty()

        # Upper bound on how long to wait for one page's background
        # synthesis, so a worker that never reports cannot hang the page.
        audio_wait_timeout_s = 120

        with st.spinner('Processing PDF...'):
            texts, audios, total_pages = process_pdf(
                uploaded_file, max_pages, st.session_state['tts_voice'], audio_processor
            )

            for i, text in enumerate(texts):
                with st.expander(f"Page {i+1}", expanded=i==0):
                    st.markdown(text)

                    # process_pdf() fills `audios` from worker threads; poll
                    # until this page's entry appears or the deadline passes.
                    deadline = time.monotonic() + audio_wait_timeout_s
                    while i not in audios and time.monotonic() < deadline:
                        time.sleep(0.1)

                    page_audio = audios.get(i)
                    if page_audio:
                        st.audio(page_audio, format='audio/mp3')

                        size_mb = len(page_audio) / (1024 * 1024)
                        st.sidebar.markdown(
                            get_download_link(page_audio, f'page_{i+1}.mp3', size_mb),
                            unsafe_allow_html=True
                        )
                    elif i not in audios:
                        st.warning(
                            f"Audio for page {i+1} was not ready after "
                            f"{audio_wait_timeout_s} seconds."
                        )

                progress_bar.progress((i + 1) / total_pages)
                status.text(f"Processing page {i+1}/{total_pages}")

        st.success(f"β Successfully processed {total_pages} pages!")

    st.markdown("### βοΈ Text to Audio")
    prompt = st.text_area("Enter text to convert to audio", height=200)

    if prompt:
        with st.spinner('Converting text to audio...'):
            audio_data = asyncio.run(audio_processor.create_audio(prompt, st.session_state['tts_voice']))
            if audio_data:
                st.audio(audio_data, format='audio/mp3')

                size_mb = len(audio_data) / (1024 * 1024)
                st.sidebar.markdown("### π΅ Custom Audio")
                st.sidebar.markdown(
                    get_download_link(audio_data, 'custom_text.mp3', size_mb),
                    unsafe_allow_html=True
                )

    if st.sidebar.button("Clear Cache"):
        for entry in os.listdir(audio_processor.cache_dir):
            entry_path = os.path.join(audio_processor.cache_dir, entry)
            # os.remove() fails on directories; only delete regular files.
            if os.path.isfile(entry_path):
                os.remove(entry_path)
        # metadata.json was deleted above; write back an empty index.
        audio_processor.metadata = {}
        audio_processor._save_metadata()
        st.sidebar.success("Cache cleared successfully!")


# Run the app only when this file is executed as a script.
if __name__ == "__main__":
    main()