|
import streamlit as st |
|
import base64 |
|
import os |
|
import random |
|
import glob |
|
from PyPDF2 import PdfReader |
|
import threading |
|
import time |
|
import hashlib |
|
from datetime import datetime |
|
import json |
|
import asyncio |
|
import edge_tts |
|
|
|
|
|
import nest_asyncio |
|
nest_asyncio.apply() |
|
|
|
|
|
# Selectable personas: display name -> emoji badge and edge-tts voice id.
# NOTE(review): the emoji were restored from mis-decoded UTF-8. The glyphs for
# Natasha, Guy and Liam could not be recovered unambiguously from the damaged
# text and are a best guess -- confirm against the intended design.
CHARACTERS = {
    "Aria": {"emoji": "🌸", "voice": "en-US-AriaNeural"},
    "Jenny": {"emoji": "🎶", "voice": "en-US-JennyNeural"},
    "Sonia": {"emoji": "🌺", "voice": "en-GB-SoniaNeural"},
    "Natasha": {"emoji": "🌌", "voice": "en-AU-NatashaNeural"},
    "Clara": {"emoji": "🌷", "voice": "en-CA-ClaraNeural"},
    "Guy": {"emoji": "🌟", "voice": "en-US-GuyNeural"},
    "Ryan": {"emoji": "🛠️", "voice": "en-GB-RyanNeural"},
    "William": {"emoji": "🎻", "voice": "en-AU-WilliamNeural"},
    "Liam": {"emoji": "🌟", "voice": "en-CA-LiamNeural"},
}
|
|
|
|
|
# Seed per-session defaults on the first script run.
# The persona is drawn first and the voice is derived from it, so the two
# values always describe the same CHARACTERS entry. Drawing them independently
# would usually start the session with a mismatched pair, which main() then
# reports as a "Voice Change" before the user has touched anything.
if 'character' not in st.session_state:
    st.session_state['character'] = random.choice(list(CHARACTERS))
if 'tts_voice' not in st.session_state:
    st.session_state['tts_voice'] = CHARACTERS[st.session_state['character']]["voice"]
if 'history' not in st.session_state:
    st.session_state['history'] = []
|
|
|
class AudioProcessor:
    """Convert text to MP3 with edge-tts and keep an on-disk cache of the results.

    Each generated clip is stored in ``cache_dir`` together with a markdown
    transcript in ``markdown_dir``. ``metadata`` maps a cache key (MD5 of
    ``"<text>:<voice>"``) to details of the files produced for it and is
    persisted as ``metadata.json`` inside ``cache_dir``.
    """

    def __init__(self):
        """Create the working directories and load any previously saved metadata."""
        self.cache_dir = "audio_cache"
        self.markdown_dir = "markdown_files"
        self.log_file = "history_log.md"
        os.makedirs(self.cache_dir, exist_ok=True)
        os.makedirs(self.markdown_dir, exist_ok=True)
        self.metadata = self._load_metadata()

    def _load_metadata(self):
        """Return the saved metadata dict, or ``{}`` when none is usable."""
        metadata_file = os.path.join(self.cache_dir, "metadata.json")
        if not os.path.exists(metadata_file):
            return {}
        try:
            with open(metadata_file, encoding='utf-8') as f:
                return json.load(f)
        except json.JSONDecodeError:
            # The index only describes a cache; a damaged file should cost a
            # regeneration, not prevent the app from starting.
            return {}

    def _save_metadata(self):
        """Write the in-memory metadata dict to ``metadata.json``."""
        metadata_file = os.path.join(self.cache_dir, "metadata.json")
        with open(metadata_file, 'w', encoding='utf-8') as f:
            json.dump(self.metadata, f)

    def _log_action(self, action, details):
        """Append a timestamped entry to the log file and the session history."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(f"[{timestamp}] {action}: {details}\n")
        st.session_state['history'].append(f"[{timestamp}] {action}: {details}")

    async def create_audio(self, text, voice, character):
        """Synthesize ``text`` with ``voice`` and return ``(mp3_bytes, cache_key)``.

        Newlines and ``</s>`` markers are replaced by spaces before synthesis.
        Returns ``(None, None)`` when nothing remains after that clean-up.
        When the key is already in ``metadata`` and the recorded audio file
        still exists, its bytes are returned without synthesizing again.
        Otherwise an MP3 and a markdown transcript are written, the metadata
        is saved and the action is logged.
        """
        # The key is derived from the text exactly as supplied by the caller.
        cache_key = hashlib.md5(f"{text}:{voice}".encode()).hexdigest()

        text = text.replace("\n", " ").replace("</s>", " ").strip()
        if not text:
            return None, None

        # Cache hit: use the file name recorded for this key. A freshly built
        # name carries the current time and so would almost never match.
        cached = self.metadata.get(cache_key)
        if cached:
            cached_path = os.path.join(self.cache_dir, cached.get('audio_file', ''))
            if os.path.isfile(cached_path):
                with open(cached_path, 'rb') as f:
                    return f.read(), cache_key

        timestamp = datetime.now().strftime("%I%M %p %m%d%Y")
        title_words = '_'.join(text.split()[:10])
        # The title comes from user text: keep only characters that are safe in
        # a file name, and bound the length so long words cannot blow it up.
        safe_title = ''.join(
            ch if ch.isalnum() or ch in '-_' else '_' for ch in title_words
        )[:80]
        filename_base = f"{timestamp}_{character}_{safe_title}"
        audio_filename = f"{filename_base}.mp3"
        md_filename = f"{filename_base}.md"
        audio_path = os.path.join(self.cache_dir, audio_filename)

        communicate = edge_tts.Communicate(text, voice)
        await communicate.save(audio_path)

        md_filepath = os.path.join(self.markdown_dir, md_filename)
        with open(md_filepath, 'w', encoding='utf-8') as f:
            f.write(f"# {title_words.replace('_', ' ')}\n\n**Character:** {character}\n**Voice:** {voice}\n\n{text}")

        # Record the cache entry before logging so that a logging failure
        # cannot lose track of files that were already written.
        self.metadata[cache_key] = {
            'timestamp': datetime.now().isoformat(),
            'text_length': len(text),
            'voice': voice,
            'character': character,
            'markdown_file': md_filename,
            'audio_file': audio_filename
        }
        self._save_metadata()

        self._log_action("Text to Audio", f"Created audio and markdown for '{title_words}' with {character} ({voice})")

        with open(audio_path, 'rb') as f:
            return f.read(), cache_key
|
|
|
def get_download_link(bin_data, filename, size_mb=None):
    """Return an HTML snippet with a base64 ``data:`` link that downloads ``bin_data``.

    Args:
        bin_data: Raw bytes to embed (served as ``audio/mpeg``).
        filename: Name offered for the download and shown as the link text.
        size_mb: Optional size in megabytes, shown below the link when given.

    Returns:
        A string of HTML. ``filename`` is HTML-escaped because it can be
        derived from user-entered text and the result is rendered as raw HTML.
        The markup is emitted as a single line so it does not depend on how a
        Markdown renderer treats indentation or blank lines.
    """
    import html  # local import keeps this block self-contained

    b64 = base64.b64encode(bin_data).decode()
    safe_name = html.escape(filename, quote=True)
    # "is not None" so that an explicit size of 0 is still displayed.
    size_str = f"({size_mb:.1f} MB)" if size_mb is not None else ""
    return (
        '<div class="download-container">'
        f'<a href="data:audio/mpeg;base64,{b64}" '
        f'download="{safe_name}" class="download-link">📥 {safe_name}</a>'
        f'<div class="file-info">{size_str}</div>'
        '</div>'
    )
|
|
|
def process_pdf(pdf_file, max_pages, voice, character, audio_processor):
    """Extract text from a PDF and start background audio synthesis per page.

    Args:
        pdf_file: File-like object or path accepted by ``PdfReader``.
        max_pages: Upper bound on the number of pages to process.
        voice: Voice id passed through to ``audio_processor.create_audio``.
        character: Character name passed through to ``create_audio``.
        audio_processor: Object exposing the async ``create_audio`` method.

    Returns:
        ``(texts, audios, total_pages)``. ``texts`` holds the extracted text of
        each processed page. ``audios`` is filled in by the worker threads
        after this function has returned: page index -> MP3 bytes, or ``None``
        when the page had no text or synthesis failed. Callers poll ``audios``
        for the indices they need.
    """
    import logging  # local import keeps this block self-contained

    reader = PdfReader(pdf_file)
    total_pages = min(len(reader.pages), max_pages)
    texts, audios = [], {}

    def synthesize_page(index, page_text):
        # Always publish a result for this page, even on failure; otherwise a
        # caller polling `audios` for this index would wait forever.
        try:
            audio_data, _ = asyncio.run(
                audio_processor.create_audio(page_text, voice, character)
            )
        except Exception:
            logging.getLogger(__name__).exception(
                "Audio generation failed for page %d", index + 1
            )
            audio_data = None
        audios[index] = audio_data

    for i in range(total_pages):
        # extract_text() may yield nothing for image-only pages.
        text = reader.pages[i].extract_text() or ""
        texts.append(text)
        # Pass the page through `args`: a lambda closing over the loop
        # variables would read whatever values they hold when the thread
        # finally runs, not the values from this iteration.
        threading.Thread(target=synthesize_page, args=(i, text)).start()

    return texts, audios, total_pages
|
|
|
def main():
    """Render the Streamlit page for PDF/text to speech conversion.

    The sidebar shows the active character, a voice picker, the saved markdown
    and audio files (with view/play/delete buttons), the action history and a
    "Clear Cache" button. The main area shows the currently selected markdown
    and audio, a PDF uploader whose pages are converted to audio, and a text
    box for ad-hoc text-to-audio conversion.
    """
    # NOTE(review): emoji and dashes in the UI strings below were restored from
    # mis-decoded UTF-8. Several glyphs could not be recovered unambiguously and
    # are a best guess -- confirm against the intended design.
    st.set_page_config(
        page_title="📚PDF 🪄Text to 🗣️Speech 🤖Transformer",
        page_icon="📚",
        layout="wide",
    )

    # Styles for the links produced by get_download_link(). Assembled line by
    # line so the markup does not depend on this file's indentation.
    st.markdown("\n".join([
        "<style>",
        ".download-link {",
        "    color: #1E90FF;",
        "    text-decoration: none;",
        "    padding: 8px 12px;",
        "    margin: 5px;",
        "    border: 1px solid #1E90FF;",
        "    border-radius: 5px;",
        "    display: inline-block;",
        "    transition: all 0.3s ease;",
        "}",
        ".download-link:hover {",
        "    background-color: #1E90FF;",
        "    color: white;",
        "}",
        ".file-info {",
        "    font-size: 0.8em;",
        "    color: gray;",
        "    margin-top: 4px;",
        "}",
        "</style>",
    ]), unsafe_allow_html=True)

    audio_processor = AudioProcessor()

    # ---- Sidebar: active character and voice picker ----
    st.sidebar.title(f"{CHARACTERS[st.session_state['character']]['emoji']} Character Name: {st.session_state['character']}")

    st.sidebar.markdown("### 🎤 Voice Settings")
    voice_options = [char["voice"] for char in CHARACTERS.values()]
    selected_voice = st.sidebar.selectbox(
        "🔊 Select TTS Voice:",
        options=voice_options,
        index=voice_options.index(st.session_state['tts_voice']),
        key="voice_select"
    )
    selected_character = next(char for char, info in CHARACTERS.items() if info["voice"] == selected_voice)

    st.sidebar.markdown("\n".join([
        "# 🎙️ Voice Character Agent Selector 🎭",
        "*Female Voices*:",
        "- 🌸 **Aria** — Elegant, creative storytelling",
        "- 🎶 **Jenny** — Friendly, conversational",
        "- 🌺 **Sonia** — Bold, confident",
        "- 🌌 **Natasha** — Sophisticated, mysterious",
        "- 🌷 **Clara** — Cheerful, empathetic",
        "",
        "*Male Voices*:",
        "- 🌟 **Guy** — Authoritative, versatile",
        "- 🛠️ **Ryan** — Approachable, casual",
        "- 🎻 **William** — Classic, scholarly",
        "- 🌟 **Liam** — Energetic, engaging",
    ]))

    # Persist a changed selection and rerun so the sidebar title reflects it.
    if selected_voice != st.session_state['tts_voice'] or selected_character != st.session_state['character']:
        st.session_state['tts_voice'] = selected_voice
        st.session_state['character'] = selected_character
        audio_processor._log_action("Voice Change", f"Changed to {selected_character} ({selected_voice})")
        st.rerun()

    # ---- Sidebar: saved markdown transcripts ----
    st.sidebar.markdown("### 📝 Markdown History")
    md_files = [f for f in os.listdir(audio_processor.markdown_dir) if f.endswith('.md') and f != 'README.md']
    for md_file in md_files:
        col1, col2, col3 = st.sidebar.columns([3, 1, 1])
        with col1:
            if st.button(f"👁️ {md_file}", key=f"view_{md_file}"):
                with open(os.path.join(audio_processor.markdown_dir, md_file), 'r', encoding='utf-8') as f:
                    st.session_state['current_md'] = f.read()
                audio_processor._log_action("View File", f"Viewed {md_file}")
        with col2:
            if st.button("🗑️", key=f"delete_md_{md_file}"):
                os.remove(os.path.join(audio_processor.markdown_dir, md_file))
                audio_processor._log_action("Delete Markdown", f"Deleted {md_file}")
                st.rerun()
        with col3:
            st.write("")

    # ---- Sidebar: cached audio that has a matching transcript ----
    st.sidebar.markdown("### 🎵 Audio History")
    # splitext removes only the ".md" suffix; splitting on the first "." would
    # truncate any stem that itself contains a dot and match unrelated files.
    md_stems = tuple(os.path.splitext(name)[0] for name in md_files)
    audio_files = [
        path for path in glob.glob(os.path.join(audio_processor.cache_dir, "*.mp3"))
        if os.path.basename(path).startswith(md_stems)
    ]
    for audio_file in audio_files:
        audio_filename = os.path.basename(audio_file)
        col1, col2, col3 = st.sidebar.columns([3, 1, 1])
        with col1:
            if st.button(f"▶️ {audio_filename}", key=f"play_{audio_filename}"):
                with open(audio_file, 'rb') as f:
                    st.session_state['current_audio'] = {'data': f.read(), 'name': audio_filename}
                audio_processor._log_action("Play Audio", f"Played {audio_filename}")
        with col2:
            if st.button("🗑️", key=f"delete_audio_{audio_filename}"):
                os.remove(audio_file)
                audio_processor._log_action("Delete Audio", f"Deleted {audio_filename}")
                st.rerun()
        with col3:
            st.write("")

    # ---- Sidebar: action log for this session ----
    st.sidebar.markdown("### 📜 Action History")
    for entry in st.session_state['history']:
        st.sidebar.write(entry)

    # ---- Main area ----
    st.markdown("<h1>📄 PDF to Audio Converter 🎧</h1>", unsafe_allow_html=True)

    # Show whatever was last picked from the sidebar history lists.
    if 'current_md' in st.session_state:
        st.markdown(st.session_state['current_md'])
    if 'current_audio' in st.session_state:
        st.markdown(f"**Playing:** {st.session_state['current_audio']['name']}")
        st.audio(st.session_state['current_audio']['data'], format='audio/mp3')

    col1, col2 = st.columns(2)
    with col1:
        uploaded_file = st.file_uploader("Choose a PDF file", "pdf")
    with col2:
        max_pages = st.slider('Select pages to process', min_value=1, max_value=100, value=10)

    if uploaded_file:
        progress_bar = st.progress(0)
        status = st.empty()
        # Upper bound on how long to wait for one page's audio. Without it a
        # worker that never reports a result would block this script forever.
        page_timeout_s = 120

        with st.spinner('Processing PDF...'):
            texts, audios, total_pages = process_pdf(
                uploaded_file, max_pages,
                st.session_state['tts_voice'],
                st.session_state['character'],
                audio_processor
            )

            for i, text in enumerate(texts):
                with st.expander(f"Page {i+1}", expanded=i==0):
                    st.markdown(text or "")

                    # process_pdf fills `audios` from background threads; poll
                    # until this page's entry appears or the deadline passes.
                    deadline = time.monotonic() + page_timeout_s
                    while i not in audios and time.monotonic() < deadline:
                        time.sleep(0.1)
                    page_audio = audios.get(i)
                    if page_audio:
                        st.audio(page_audio, format='audio/mp3')
                    elif i not in audios:
                        st.warning(f"Timed out waiting for audio of page {i+1}.")

                if page_audio:
                    size_mb = len(page_audio) / (1024 * 1024)
                    st.sidebar.markdown(
                        get_download_link(page_audio, f'page_{i+1}.mp3', size_mb),
                        unsafe_allow_html=True
                    )

                progress_bar.progress((i + 1) / total_pages)
                status.text(f"Processing page {i+1}/{total_pages}")

            st.success(f"✅ Successfully processed {total_pages} pages!")
            audio_processor._log_action("PDF Processed", f"Processed {uploaded_file.name} ({total_pages} pages)")

    # ---- Free-text conversion ----
    st.markdown("### ✍️ Text to Audio")
    prompt = st.text_area("Enter text to convert to audio", height=200)

    if prompt:
        with st.spinner('Converting text to audio...'):
            audio_data, cache_key = asyncio.run(audio_processor.create_audio(
                prompt,
                st.session_state['tts_voice'],
                st.session_state['character']
            ))
            # create_audio returns (None, None) for text that is empty after clean-up.
            if audio_data:
                st.audio(audio_data, format='audio/mp3')

                size_mb = len(audio_data) / (1024 * 1024)
                st.sidebar.markdown("### 🎵 Custom Audio")
                audio_filename = audio_processor.metadata[cache_key]['audio_file']
                st.sidebar.markdown(
                    get_download_link(audio_data, audio_filename, size_mb),
                    unsafe_allow_html=True
                )

    # ---- Cache maintenance ----
    if st.sidebar.button("Clear Cache"):
        for file in os.listdir(audio_processor.cache_dir):
            os.remove(os.path.join(audio_processor.cache_dir, file))
        for file in os.listdir(audio_processor.markdown_dir):
            if file != 'README.md':
                os.remove(os.path.join(audio_processor.markdown_dir, file))
        # metadata.json was removed with the rest of the cache directory;
        # write back an empty index so the on-disk state matches memory.
        audio_processor.metadata = {}
        audio_processor._save_metadata()
        audio_processor._log_action("Clear Cache", "Cleared audio and markdown cache")
        st.sidebar.success("Cache cleared successfully!")
|
|
|
# Build the page only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()