Spaces:
Sleeping
Sleeping
import os | |
import uuid | |
import json | |
import fitz | |
import requests | |
import streamlit as st | |
from io import BytesIO | |
from docx import Document | |
from dotenv import load_dotenv | |
from elevenlabs.client import ElevenLabs | |
from utils import voice_map, get_voice_prompt_style, AUDIO_DIR | |
from generate_audio import generate_audio | |
from logger_setup import logger | |
# Load API keys | |
load_dotenv() | |
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") | |
ELEVENLABS_API_KEY = os.getenv("ELEVENLABS_API_KEY") | |
client = ElevenLabs(api_key=ELEVENLABS_API_KEY) | |
# Streamlit config | |
st.set_page_config(page_title="Voice Agent Pro", page_icon="π§") | |
logger.info("π¬ Streamlit app started") | |
# Inject large fonts + tips | |
st.markdown(""" | |
<style> | |
.big-title { | |
font-size: 2.4em !important; | |
font-weight: bold; | |
color: #333333; | |
text-align: center; | |
} | |
.big-answer { | |
font-size: 1.6em; | |
line-height: 1.5; | |
color: #111; | |
} | |
textarea, input { | |
font-size: 1.2em !important; | |
} | |
.instructions { | |
font-size: 1.1em; | |
padding: 0.5em; | |
background-color: #f0f4ff; | |
border-radius: 0.5em; | |
margin-bottom: 1em; | |
} | |
</style> | |
""", unsafe_allow_html=True) | |
st.markdown('<div class="big-title">π§ Voice Agent Pro</div>', unsafe_allow_html=True) | |
st.markdown("""<div class="instructions">Ask a question <b>OR</b> paste a URL <b>OR</b> upload a file β and I'll summarize it in bullet points with expressive AI narration!</div>""", unsafe_allow_html=True) | |
# Voice selection | |
st.sidebar.header("ποΈ Voice Settings") | |
voice_label = st.sidebar.selectbox("Choose a voice:", list(voice_map.keys())) | |
voice_id = voice_map[voice_label] | |
tone_prompt = get_voice_prompt_style(voice_label) | |
font_size = st.sidebar.radio("Font Size", ["Normal", "Large"]) | |
font_class = "big-answer" if font_size == "Large" else "" | |
# Add Bolt attribution to sidebar | |
st.sidebar.markdown("---") | |
st.sidebar.markdown("β‘ Made with [bolt.new](https://bolt.new)") | |
# One-liners per voice | |
preview_lines = { | |
"grandma GG": "Back in my day, we didnβt need AI to sound this fabulous.", | |
"tech wizard": "System online. You may now enter your query, human.", | |
"perky sidekick": "You got this! Letβs answer that question together!", | |
"bill the newscaster": "Breaking news β youβve just selected the perfect voice.", | |
"spunky charlie": "Whoa! Is it story time already? Letβs go!", | |
"sassy teen": "Seriously? You better ask something cool." | |
} | |
preview_line = preview_lines.get(voice_label, "Testing voice.") | |
st.markdown(f"π§ <b>{voice_label}</b> says:", unsafe_allow_html=True) | |
st.markdown(f"_{preview_line}_", unsafe_allow_html=True) | |
# Stream preview audio (no autoplay) | |
try: | |
audio_stream = client.text_to_speech.convert( | |
text=preview_line, | |
voice_id=voice_id, | |
model_id="eleven_multilingual_v2" | |
) | |
full_audio_content = b"" | |
for chunk in audio_stream: | |
full_audio_content += chunk | |
st.audio(full_audio_content) | |
except Exception as e: | |
st.warning("Voice preview unavailable.") | |
logger.exception("π§ Voice preview error") | |
# Session state | |
if "answer" not in st.session_state: st.session_state.answer = "" | |
if "audio_key" not in st.session_state: st.session_state.audio_key = None | |
if "file_text" not in st.session_state: st.session_state.file_text = "" | |
if "key_points" not in st.session_state: st.session_state.key_points = [] | |
# Inputs | |
query = st.text_area("π¨οΈ Ask your question:", value="", placeholder="Ask your question", key="query") | |
url = st.text_input("π Or paste a URL:") | |
uploaded_file = st.file_uploader("π Or upload a file (PDF, TXT, DOCX)", type=["pdf", "txt", "docx"]) | |
# File reader | |
def extract_text_from_file(file): | |
file_type = file.name.split('.')[-1].lower() | |
if file_type == "pdf": | |
try: | |
with fitz.open(stream=file.read(), filetype="pdf") as doc: | |
return "\n".join(page.get_text() for page in doc) | |
except Exception as e: | |
logger.error(f"β PDF read failed: {e}") | |
return "Failed to read the PDF." | |
elif file_type == "txt": | |
return file.read().decode("utf-8", errors="ignore") | |
elif file_type == "docx": | |
try: | |
doc = Document(file) | |
return "\n".join(p.text for p in doc.paragraphs) | |
except Exception as e: | |
logger.error(f"β DOCX read failed: {e}") | |
return "Failed to read the DOCX file." | |
return "Unsupported file type." | |
if uploaded_file: | |
st.session_state.file_text = extract_text_from_file(uploaded_file) | |
logger.info(f"π Extracted from file: {uploaded_file.name}") | |
# Clear app | |
if st.button("π§Ή Clear All"): | |
logger.info("π§Ό Reset clicked") | |
st.rerun() | |
# GPT streaming | |
def stream_openai_response(payload, headers): | |
with requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload, stream=True) as r: | |
for line in r.iter_lines(): | |
if line and line.startswith(b"data: "): | |
yield line[len(b"data: "):].decode() | |
# Summarize | |
if st.button("π Summarize"): | |
if not query and not url and not uploaded_file: | |
st.warning("Please enter a question, a URL, or upload a file.") | |
logger.warning("β οΈ Summarize clicked with no input") | |
else: | |
with st.spinner("Talking to GPT..."): | |
try: | |
context = "" | |
if st.session_state.file_text: | |
context += st.session_state.file_text + "\n\n" | |
if url: | |
context += f"Summarize this page: {url}\n\n" | |
context += ( | |
"You are a voice assistant with the following tone:\n" | |
f"{tone_prompt}\n\n" | |
) | |
if query.strip(): | |
context += f"Now answer this in bullet points:\n{query}" | |
else: | |
context += "Summarize the content above in bullet points." | |
headers = {"Authorization": f"Bearer {OPENAI_API_KEY}"} | |
payload = { | |
"model": "gpt-4o", | |
"messages": [{"role": "user", "content": context}], | |
"temperature": 0.7, | |
"stream": True | |
} | |
st.session_state.answer = "" | |
answer_box = st.empty() | |
logger.info("π§ GPT stream started") | |
for chunk in stream_openai_response(payload, headers): | |
if chunk.strip() == "[DONE]": | |
logger.info("π’ GPT done") | |
continue | |
try: | |
parsed = json.loads(chunk) | |
delta = parsed['choices'][0]['delta'].get('content', '') | |
st.session_state.answer += delta | |
answer_box.markdown(f'<div class="{font_class}">{st.session_state.answer}</div>', unsafe_allow_html=True) | |
except json.JSONDecodeError: | |
logger.warning(f"β οΈ Non-JSON chunk skipped: {chunk}") | |
continue | |
audio_key = str(uuid.uuid4()) | |
generate_audio(st.session_state.answer, voice_id, audio_key) | |
st.session_state.audio_key = audio_key | |
logger.info(f"π§ Audio ready: {audio_key}") | |
except Exception as e: | |
st.error(f"π₯ Error: {e}") | |
logger.exception("π₯ GPT/audio failed") | |
# Output | |
if st.session_state.answer: | |
st.subheader("π Answer") | |
st.success(st.session_state.answer) | |
if st.session_state.audio_key: | |
audio_path = os.path.join(AUDIO_DIR, f"{st.session_state.audio_key}.mp3") | |
if os.path.exists(audio_path): | |
st.audio(audio_path) | |
else: | |
st.error("β Audio file missing.") | |
logger.warning(f"β Missing audio file: {audio_path}") |