"""Streamlit app: voice chat notes, ArXiv search, and text-to-speech playback.

Inputs captured from a custom voice component are saved as Markdown files in
``SAVED_INPUTS_DIR``. ArXiv search results are summarized, saved, and read
aloud with edge-tts using a voice mapped to the currently selected user.
"""

import asyncio
import base64
import glob
import os
import re
from collections import defaultdict
from datetime import datetime
from urllib.parse import quote
from xml.etree import ElementTree as ET

import edge_tts
import requests
import streamlit as st
import streamlit.components.v1 as components

# -------------------- Configuration --------------------
# Exactly 11 user names and 11 voices (as an example)
USER_NAMES = [
    "Aria", "Guy", "Sonia", "Tony", "Jenny", "Davis",
    "Libby", "Clara", "Liam", "Natasha", "William",
]

ENGLISH_VOICES = [
    "en-US-AriaNeural", "en-US-GuyNeural", "en-GB-SoniaNeural",
    "en-GB-TonyNeural", "en-US-JennyNeural", "en-US-DavisNeural",
    "en-GB-LibbyNeural", "en-CA-ClaraNeural", "en-CA-LiamNeural",
    "en-AU-NatashaNeural", "en-AU-WilliamNeural",
]

# Each user name is paired positionally with one voice.
USER_VOICES = dict(zip(USER_NAMES, ENGLISH_VOICES))

SAVED_INPUTS_DIR = "saved_inputs"
os.makedirs(SAVED_INPUTS_DIR, exist_ok=True)

# Session state
if 'user_name' not in st.session_state:
    st.session_state['user_name'] = USER_NAMES[0]
if 'old_val' not in st.session_state:
    st.session_state['old_val'] = None
if 'should_rerun' not in st.session_state:
    st.session_state['should_rerun'] = False
if 'viewing_prefix' not in st.session_state:
    st.session_state['viewing_prefix'] = None


# -------------------- Utility Functions --------------------
def clean_for_speech(text: str) -> str:
    """Return *text* flattened to a single line suitable for TTS.

    Newlines become spaces, ``</s>`` markers and ``#`` characters are
    removed, parenthesized http(s) URLs are dropped, and runs of whitespace
    are collapsed.
    """
    text = text.replace("\n", " ")
    # NOTE: the pattern here must be non-empty; replacing "" would insert a
    # space between every character and make the speech unintelligible.
    text = text.replace("</s>", " ")
    text = text.replace("#", "")
    text = re.sub(r"\(https?:\/\/[^\)]+\)", "", text)
    text = re.sub(r"\s+", " ", text).strip()
    return text


async def edge_tts_generate_audio(text, voice="en-US-AriaNeural"):
    """Synthesize *text* with edge-tts and return the MP3 file name.

    Returns ``None`` when the cleaned text is empty or when the service
    returns no audio (an error is shown in the UI in the latter case).
    """
    text = clean_for_speech(text)
    if not text.strip():
        return None
    communicate = edge_tts.Communicate(text, voice)
    # Microseconds keep file names unique when several clips are generated
    # within the same second (e.g. "Read Entire Conversation").
    out_fn = f"speech_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.mp3"
    try:
        await communicate.save(out_fn)
    except edge_tts.exceptions.NoAudioReceived:
        st.error("No audio received from TTS service.")
        return None
    return out_fn


def speak_with_edge_tts(text, voice="en-US-AriaNeural"):
    """Synchronous wrapper around :func:`edge_tts_generate_audio`."""
    return asyncio.run(edge_tts_generate_audio(text, voice))


def play_and_download_audio(file_path):
    """Render an audio player and a download link for *file_path*.

    Does nothing when *file_path* is falsy or the file does not exist.
    """
    if file_path and os.path.exists(file_path):
        st.audio(file_path)
        name = os.path.basename(file_path)
        # Embed the file as a data URI so the link actually downloads it.
        with open(file_path, 'rb') as f:
            b64 = base64.b64encode(f.read()).decode('ascii')
        dl_link = (
            f'<a href="data:audio/mpeg;base64,{b64}" download="{name}">'
            f'Download {name}</a>'
        )
        st.markdown(dl_link, unsafe_allow_html=True)


def save_input_as_md(user_name, text, prefix="input"):
    """Save *text* as a Markdown file and return its path.

    The file name is ``{prefix}_{timestamp}_{slug}.md`` where the slug is
    derived from the first 50 characters of *text*. Returns ``None`` (and
    writes nothing) when *text* is blank.
    """
    if not text.strip():
        return None
    now = datetime.now()
    timestamp = now.strftime("%Y%m%d_%H%M%S")
    safe_text = re.sub(r'[^\w\s-]', '', text[:50]).strip().lower()
    safe_text = re.sub(r'[-\s]+', '-', safe_text)
    fn = f"{prefix}_{timestamp}_{safe_text}.md"
    full_path = os.path.join(SAVED_INPUTS_DIR, fn)
    with open(full_path, 'w', encoding='utf-8') as f:
        f.write(f"# User: {user_name}\n")
        f.write(f"**Timestamp:** {now.strftime('%Y-%m-%d %H:%M:%S')}\n\n")
        f.write(text)
    return full_path


def list_saved_inputs():
    """Return the saved ``*.md`` paths in ``SAVED_INPUTS_DIR``, sorted by name."""
    return sorted(glob.glob(os.path.join(SAVED_INPUTS_DIR, "*.md")))


def parse_md_file(fpath):
    """Parse a saved Markdown file into ``(user, timestamp, content)``.

    Lines starting with ``# User:`` or ``**Timestamp:**`` are treated as
    header fields; every other line (stripped) is part of the content.
    """
    user_line = ""
    ts_line = ""
    content_lines = []
    with open(fpath, 'r', encoding='utf-8') as f:
        for line in f:
            if line.startswith("# User:"):
                user_line = line.replace("# User:", "").strip()
            elif line.startswith("**Timestamp:**"):
                ts_line = line.replace("**Timestamp:**", "").strip()
            else:
                content_lines.append(line.strip())
    content = "\n".join(content_lines).strip()
    return user_line, ts_line, content


def arxiv_search(query, max_results=3):
    """Query the arXiv Atom API and return ``[(title, short_summary), ...]``.

    Returns an empty list on a non-200 response, a network failure, or an
    unparsable response body.
    """
    base_url = "http://export.arxiv.org/api/query"
    # Pass the raw query: requests URL-encodes parameters itself, so
    # pre-replacing spaces with '+' would send a literal '%2B' instead.
    params = {
        'search_query': query,
        'start': 0,
        'max_results': max_results,
    }
    try:
        response = requests.get(base_url, params=params, timeout=30)
    except requests.RequestException as exc:
        st.error(f"ArXiv request failed: {exc}")
        return []
    if response.status_code != 200:
        return []
    try:
        root = ET.fromstring(response.content)
    except ET.ParseError:
        st.error("Could not parse the ArXiv response.")
        return []
    ns = {"a": "http://www.w3.org/2005/Atom"}
    results = []
    for entry in root.findall('a:entry', ns):
        # findtext with a default avoids AttributeError on missing elements.
        title = (entry.findtext('a:title', default="", namespaces=ns) or "").strip()
        summary = (entry.findtext('a:summary', default="", namespaces=ns) or "").strip()
        summary_short = summary[:300] + "..." if len(summary) > 300 else summary
        results.append((title, summary_short))
    return results


def summarize_arxiv_results(results):
    """Format ``(title, summary)`` pairs as a numbered plain-text summary."""
    lines = [
        f"Result {i}: {title}\n{summary}\n"
        for i, (title, summary) in enumerate(results, 1)
    ]
    return "\n\n".join(lines)


def concatenate_mp3(files, output_file):
    """Write the raw bytes of each file in *files*, in order, to *output_file*.

    This is a plain byte-level concatenation; no re-encoding is performed.
    """
    with open(output_file, 'wb') as outfile:
        for f in files:
            with open(f, 'rb') as infile:
                outfile.write(infile.read())


def load_groups():
    """Group saved files by the first 10 characters of their file name.

    Returns ``(groups, sorted_prefixes)`` where each group's files and the
    prefixes themselves are ordered by modification time, newest first.
    """
    groups = defaultdict(list)
    for fpath in list_saved_inputs():
        prefix = os.path.basename(fpath)[:10]
        groups[prefix].append(fpath)
    for prefix in groups:
        groups[prefix].sort(key=os.path.getmtime, reverse=True)
    sorted_prefixes = sorted(
        groups,
        key=lambda pre: max(os.path.getmtime(x) for x in groups[pre]),
        reverse=True,
    )
    return groups, sorted_prefixes


# -------------------- Main Application --------------------
st.title("🎙️ Voice Chat & ArXiv Search")

with st.sidebar:
    st.session_state['user_name'] = st.selectbox("Current User:", USER_NAMES, index=0)

    saved_files = list_saved_inputs()
    st.write("📝 Saved Inputs:")
    for fpath in saved_files:
        user, ts, content = parse_md_file(fpath)
        fname = os.path.basename(fpath)
        st.write(f"- {fname} (User: {user})")

    if st.button("🗑️ Clear All History"):
        for fpath in saved_files:
            os.remove(fpath)
        st.session_state['viewing_prefix'] = None
        st.success("All history cleared!")
        # Same rerun API as the end of the script.
        st.rerun()

# Voice input component (replace path with your component)
mycomponent = components.declare_component("mycomponent", path="mycomponent")
voice_val = mycomponent(my_input_value="Start speaking...")

tabs = st.tabs(["🎤 Voice Chat", "🔍 ArXiv Search", "💾 History", "⚙️ Settings"])

# ------------------ Voice Chat Tab -------------------------
with tabs[0]:
    st.subheader("🎤 Voice Chat")
    if voice_val:
        voice_text = voice_val.strip()
        edited_input = st.text_area("✏️ Edit Voice Input:", value=voice_text, height=100)
        autorun = st.checkbox("⚡ Auto-Run", value=True)
        input_changed = (voice_text != st.session_state.get('old_val'))

        if autorun and input_changed:
            st.session_state['old_val'] = voice_text
            # Save input right away
            saved_path = save_input_as_md(st.session_state['user_name'], edited_input, prefix="input")
            # save_input_as_md returns None for blank text; only report real saves.
            if saved_path:
                st.success("Saved input!")

        if st.button("📝 Save Input Manually"):
            saved_path = save_input_as_md(st.session_state['user_name'], edited_input, prefix="input")
            if saved_path:
                st.success("Saved input!")

    st.write("Use the sidebar to select user and the voice input component above to record messages.")

# ------------------ ArXiv Search Tab -------------------------
with tabs[1]:
    st.subheader("🔍 ArXiv Search")
    query = st.text_input("Enter Query:")
    if query and st.button("🔍 Search ArXiv"):
        with st.spinner("Searching..."):
            results = arxiv_search(query)
        if results:
            summary = summarize_arxiv_results(results)
            # Save as response
            save_input_as_md(st.session_state['user_name'], summary, prefix="arxiv")
            st.write(summary)

            # Read aloud summary
            voice = USER_VOICES.get(st.session_state['user_name'], "en-US-AriaNeural")
            audio_file = speak_with_edge_tts(summary, voice=voice)
            if audio_file:
                play_and_download_audio(audio_file)
        else:
            st.warning("No results found.")

# ------------------ History Tab -------------------------
with tabs[2]:
    st.subheader("💾 History")
    files = list_saved_inputs()
    conversation = []
    for fpath in files:
        user, ts, content = parse_md_file(fpath)
        conversation.append((user, ts, content, fpath))

    # Show conversation and read aloud each line
    for i, (user, ts, content, fpath) in enumerate(reversed(conversation), start=1):
        with st.expander(f"{ts} - {user}", expanded=False):
            st.write(content)
            if st.button(f"🔊 Read Aloud {ts}-{user}", key=f"read_{i}_{fpath}"):
                voice = USER_VOICES.get(user, "en-US-AriaNeural")
                audio_file = speak_with_edge_tts(content, voice=voice)
                if audio_file:
                    play_and_download_audio(audio_file)

    # Read entire conversation
    if st.button("📜 Read Entire Conversation"):
        conversation_chrono = list(reversed(conversation))
        mp3_files = []
        for user, ts, content, fpath in conversation_chrono:
            voice = USER_VOICES.get(user, "en-US-AriaNeural")
            audio_file = speak_with_edge_tts(content, voice=voice)
            if audio_file:
                mp3_files.append(audio_file)
                st.write(f"**{user} ({ts}):**")
                play_and_download_audio(audio_file)

        if mp3_files:
            combined_file = f"full_conversation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp3"
            concatenate_mp3(mp3_files, combined_file)
            st.write("**Full Conversation Audio:**")
            play_and_download_audio(combined_file)

# ------------------ Settings Tab -------------------------
with tabs[3]:
    st.subheader("⚙️ Settings")
    st.write("Adjust parameters in the sidebar. Currently, no other settings to configure.")

if st.session_state.should_rerun:
    st.session_state.should_rerun = False
    st.rerun()