Spaces:
Sleeping
Sleeping
import streamlit as st | |
from streamlit_chat import message | |
from streamlit_mic_recorder import speech_to_text | |
import base64 | |
import asyncio | |
import edge_tts | |
import gtts | |
from hashlib import md5 | |
import os | |
import glob | |
import time | |
from dotenv import load_dotenv | |
from app.db import supabase | |
load_dotenv() | |
# ----------------------------
# Clean up old cached audio
# ----------------------------
def clean_old_cache(tts_dir="cache_tts", max_age_hours=12):
    """Delete cached ``.mp3`` files that are older than ``max_age_hours``.

    Cleanup is best-effort: an entry that disappears between listing and
    removal, or that cannot be removed at all, is skipped so that one bad
    entry never aborts the run.

    Args:
        tts_dir: Directory that holds the cached audio files.
        max_age_hours: Files whose modification time is older than this many
            hours are removed.
    """
    cutoff = time.time() - max_age_hours * 3600
    for audio_path in glob.glob(os.path.join(tts_dir, "*.mp3")):
        try:
            if os.stat(audio_path).st_mtime < cutoff:
                os.remove(audio_path)
        except OSError:
            # Already gone, or not removable (e.g. a directory named *.mp3);
            # stale-cache cleanup must not take the application down.
            continue


# Purge stale audio each time this module is executed.
clean_old_cache()
# ----------------------------
# Save feedback to Supabase
# ----------------------------
def save_feedback_to_supabase(feedback_text):
    """Insert one feedback entry into the Supabase ``feedback`` table.

    Args:
        feedback_text: Text stored under the ``message`` key of the new row.

    Returns:
        True when the insert completes without raising. On any exception the
        error is shown in the UI via ``st.error`` and False is returned.
    """
    try:
        supabase.table("feedback").insert({"message": feedback_text}).execute()
    except Exception as e:
        st.error(f"Gagal menyimpan feedback: {e}")
        return False
    else:
        return True
# ----------------------------
# Initial state
# ----------------------------
def initialize_session_state():
    """Seed ``st.session_state`` with a default for every key not set yet.

    Keys that already exist are left untouched, so calling this on every
    script run keeps the values accumulated so far.
    """
    # NOTE(review): the "π" in the greeting looks like a mis-decoded emoji;
    # confirm the intended character against the original file.
    defaults = {
        'history': [],  # (question, answer) tuples handed to the chain
        'generated': ["Halo! Saya bisa membantu anda menjawab pertanyaan seputar Politeknik Negeri Padang!"],
        'past': ["Hai! π"],  # user messages, index-aligned with 'generated'
        'should_speak': True,  # whether new answers are auto-played
        'input_text': "",  # pending text captured from speech input
        'tts_cache': {},  # answer text -> path of its audio file
        'last_tts': None,  # answer waiting to be auto-played, if any
    }
    for state_key, initial_value in defaults.items():
        if state_key not in st.session_state:
            st.session_state[state_key] = initial_value
# ----------------------------
# TTS: gTTS first, Edge TTS as fallback
# ----------------------------
async def generate_audio_edge(text, path, voice="id-ID-GadisNeural"):
    """Synthesize ``text`` with Edge TTS and save the audio to ``path``.

    Args:
        text: Text to synthesize.
        path: Destination file for the generated audio.
        voice: Edge TTS voice name passed to ``edge_tts.Communicate``.
    """
    await edge_tts.Communicate(text, voice=voice).save(path)
def get_tts_path(text, cache_dir="cache_tts"):
    """Return the cache file path for ``text``, creating the cache directory.

    The file name is the MD5 hex digest of the UTF-8 encoded text, so the
    same text always maps to the same ``.mp3`` path. MD5 serves only as a
    file-name fingerprint here, not for any security purpose.

    Args:
        text: Text whose audio file path is wanted.
        cache_dir: Directory for cached audio; created if it does not exist.

    Returns:
        Path of the form ``<cache_dir>/<md5 hex digest>.mp3``. The file
        itself is not created.
    """
    os.makedirs(cache_dir, exist_ok=True)
    digest = md5(text.encode("utf-8")).hexdigest()
    return os.path.join(cache_dir, f"{digest}.mp3")
def _has_cached_audio(path):
    """Return True when ``path`` exists and holds at least one byte."""
    try:
        return os.path.getsize(path) > 0
    except OSError:
        return False


def _discard_partial_audio(path):
    """Remove whatever a failed synthesis left at ``path``, if anything."""
    try:
        os.remove(path)
    except OSError:
        # Nothing was written, or the file is already gone; either is fine.
        pass


def text_to_speech(text):
    """Return the path of an audio file speaking ``text``, or None on failure.

    The audio is cached on disk under the path from ``get_tts_path``. On a
    cache miss gTTS is tried first, and Edge TTS is the fallback when gTTS
    raises. A failed attempt may leave an empty or truncated file behind, so
    leftovers are removed and a zero-byte file is treated as a cache miss;
    otherwise a broken file would be served on every later call.

    Args:
        text: Text to synthesize. Empty or whitespace-only text yields None.

    Returns:
        Path to the ``.mp3`` file, or None when there is nothing to speak or
        both engines failed.
    """
    if not text or not text.strip():
        return None

    path = get_tts_path(text)
    if _has_cached_audio(path):
        return path

    try:
        gtts.gTTS(text, lang="id").save(path)
    except Exception as e:
        print(f"[gTTS gagal] {e}")
        _discard_partial_audio(path)
        try:
            asyncio.run(generate_audio_edge(text, path))
        except Exception as e2:
            print(f"[Edge-TTS gagal] {e2}")
            _discard_partial_audio(path)
            return None
    return path
def play_audio_ui(path, autoplay=False):
    """Embed the audio file at ``path`` in the page as a hidden audio element.

    The file is read in full and inlined as a base64 ``data:`` URI.

    Args:
        path: Audio file to embed.
        autoplay: When true, the ``autoplay`` attribute is added to the tag.
    """
    with open(path, "rb") as audio_file:
        raw_audio = audio_file.read()
    audio_base64 = base64.b64encode(raw_audio).decode()

    if autoplay:
        autoplay_attr = "autoplay"
    else:
        autoplay_attr = ""

    audio_html = f"""
        <audio {autoplay_attr} style="display:none;">
        <source src="data:audio/mp3;base64,{audio_base64}" type="audio/mp3">
        </audio>
        """
    st.markdown(audio_html, unsafe_allow_html=True)
# ----------------------------
# Chat & TTS bubble
# ----------------------------
def conversation_chat(query, chain, history):
    """Ask ``chain`` a question and record the exchange in ``history``.

    Args:
        query: The user's question.
        chain: Callable taking a dict with ``question`` and ``chat_history``
            and returning a mapping that has an ``answer`` entry.
        history: List of ``(question, answer)`` tuples; extended in place.

    Returns:
        The answer produced by ``chain``.
    """
    payload = {"question": query, "chat_history": history}
    answer = chain(payload)["answer"]
    history.append((query, answer))
    return answer
def _rerun_app():
    """Restart the script run with whichever rerun API Streamlit provides."""
    # st.rerun replaces st.experimental_rerun; the `or` keeps the old name
    # from being looked up at all when the new one exists.
    rerun = getattr(st, "rerun", None) or st.experimental_rerun
    rerun()


def _load_audio_base64(audio_path):
    """Return the file's bytes base64-encoded, or None if it is unreadable."""
    try:
        with open(audio_path, "rb") as audio_file:
            return base64.b64encode(audio_file.read()).decode()
    except OSError:
        return None


def display_chat_history(chain):
    """Render the chat UI and process at most one new user message.

    In order, one call:
      1. draws the text input, the speech toggle and the speech-to-text
         button;
      2. if there is input (typed, or captured by speech on the previous
         run), asks ``chain`` for an answer, stores both sides in session
         state and generates the answer's audio;
      3. renders every stored exchange, adding a small play button under
         each answer that has cached audio;
      4. auto-plays the newest answer once when speech is enabled.

    Args:
        chain: Conversational chain forwarded to ``conversation_chat``.
    """
    # Created first so the conversation is placed above the input widgets.
    reply_container = st.container()
    user_input_obj = st.chat_input("Masukkan pertanyaan", key="chat_input_field")

    col2, col3 = st.columns([1, 1])
    with col2:
        # NOTE(review): the label reads "Aktifkan" (activate) while speech is
        # already enabled; confirm the two labels are not swapped. The "π"
        # glyphs also look like mis-decoded emoji.
        toggle_label = "π Aktifkan" if st.session_state['should_speak'] else "π Nonaktifkan"
        if st.button(toggle_label, key="toggle_tts", use_container_width=True):
            st.session_state['should_speak'] = not st.session_state['should_speak']
            _rerun_app()
    with col3:
        stt_text = speech_to_text(
            start_prompt="π€ Input Suara",
            stop_prompt="π Stop",
            language='id',
            just_once=True,
            key='stt_input',
            use_container_width=True,
        )
    if stt_text:
        # Park the transcript and rerun; the next run reads it as the input.
        st.session_state.input_text = stt_text
        _rerun_app()

    user_input = user_input_obj or st.session_state.get("input_text", "")
    if user_input:
        with st.spinner('Sedang membuat jawaban...'):
            output = conversation_chat(user_input, chain, st.session_state['history'])
        st.session_state['past'].append(user_input)
        st.session_state['generated'].append(output)
        st.session_state.input_text = ""

        # Cache the answer's audio so it can be replayed from its bubble.
        tts_path = text_to_speech(output)
        if tts_path:
            st.session_state['tts_cache'][output] = tts_path
            if st.session_state['should_speak']:
                st.session_state['last_tts'] = output

    tts_cache = st.session_state['tts_cache']

    # Show the conversation, with a small play icon under each spoken answer.
    if st.session_state['generated']:
        with reply_container:
            for i, bot_text in enumerate(st.session_state['generated']):
                # User bubble
                message(st.session_state["past"][i], is_user=True, key=f"{i}_user", avatar_style="no-avatar")
                # Bot bubble
                message(bot_text, key=f"{i}", avatar_style="no-avatar")

                if bot_text not in tts_cache:
                    continue
                audio_base64 = _load_audio_base64(tts_cache[bot_text])
                if audio_base64 is None:
                    # The file was removed from disk (e.g. by cache cleanup);
                    # forget the entry instead of failing the whole page.
                    tts_cache.pop(bot_text, None)
                    continue

                audio_id = f"audio_{i}"
                st.markdown(
                    f"""
                    <div style="text-align:right; margin-top:-30px; margin-bottom:10px;">
                    <button onclick="var audio = document.getElementById('{audio_id}'); audio.play();"
                    style="background:none; border:none; cursor:pointer; font-size:18px;">
                    π
                    </button>
                    <audio id="{audio_id}">
                    <source src="data:audio/mp3;base64,{audio_base64}" type="audio/mp3">
                    </audio>
                    </div>
                    """,
                    unsafe_allow_html=True
                )

    # Auto-play the newest answer once, then clear the marker.
    last_text = st.session_state['last_tts']
    if last_text:
        autoplay_path = tts_cache.get(last_text)
        if autoplay_path and os.path.exists(autoplay_path):
            play_audio_ui(autoplay_path, autoplay=True)
        st.session_state['last_tts'] = None