"""Streamlit chat UI helpers: session state, chat rendering, TTS and STT."""
import asyncio
import base64
import glob
import os
import time
from hashlib import md5
from io import BytesIO

import edge_tts
import gtts
import streamlit as st
from dotenv import load_dotenv
from streamlit_chat import message
from streamlit_mic_recorder import speech_to_text

from app.db import supabase

load_dotenv()


def clean_old_cache(tts_dir="cache_tts", max_age_hours=12):
    """Delete cached ``*.mp3`` files in *tts_dir* older than *max_age_hours*.

    Cleanup is best-effort: a file that disappears or cannot be removed is
    skipped, so a stale cache entry never prevents the app from starting.
    """
    cutoff = time.time() - max_age_hours * 3600
    for audio_path in glob.glob(os.path.join(tts_dir, "*.mp3")):
        try:
            if os.stat(audio_path).st_mtime < cutoff:
                os.remove(audio_path)
        except OSError:
            # The file vanished or is not removable; leave it for a later run.
            continue


# Run the cache cleanup once at startup.
clean_old_cache()


def save_feedback_to_supabase(feedback_text):
    """Insert *feedback_text* into the ``feedback`` table.

    Returns:
        True when the insert call completed, False when it raised (the error
        is also shown in the UI through ``st.error``).
    """
    try:
        data = {"message": feedback_text}
        supabase.table("feedback").insert(data).execute()
        return True
    except Exception as e:
        # UI boundary: report the failure to the user instead of crashing.
        st.error(f"Gagal menyimpan feedback: {e}")
        return False


def initialize_session_state():
    """Populate ``st.session_state`` with defaults for keys that are missing.

    Keys that already exist are left untouched.
    """
    # Built on every call so the list defaults are never shared objects.
    defaults = {
        'history': [],
        'generated': ["Halo! Saya bisa membantu anda menjawab pertanyaan seputar Politeknik Negeri Padang!"],
        'past': ["Hai! 👋"],
        'data_len': 0,
        'vector_store': None,
        'should_speak': True,
        'input_text': "",
        'tts_output': "",
        'tts_played': True,
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value


# edge-tts fallback engine
async def generate_audio_edge(text, path, voice="id-ID-GadisNeural"):
    """Synthesize *text* with edge-tts using *voice* and save it to *path*."""
    communicate = edge_tts.Communicate(text, voice=voice)
    await communicate.save(path)


def _synthesize_to_cache(text, path):
    """Create the audio file at *path*; return True on success.

    gTTS is tried first and edge-tts is the fallback. Audio is written to a
    temporary sibling file and only moved to *path* once an engine finished,
    so an interrupted write never leaves a truncated file in the cache.
    """
    tmp_path = f"{path}.part"
    try:
        try:
            # Primary engine: gTTS
            gtts.gTTS(text, lang="id").save(tmp_path)
        except Exception as e:
            print(f"[gTTS gagal] {e}")
            try:
                # Fallback engine: edge-tts
                asyncio.run(generate_audio_edge(text, tmp_path))
            except Exception as e2:
                print(f"[Edge-TTS juga gagal] {e2}")
                return False
        try:
            os.replace(tmp_path, path)
        except OSError as e:
            print(f"[Error saat membaca audio] {e}")
            return False
        return True
    finally:
        # Remove any leftover partial file (a no-op after a successful move).
        try:
            os.remove(tmp_path)
        except OSError:
            pass


# Main TTS entry point with engine fallback
def text_to_speech(text):
    """Return an HTML snippet that auto-plays *text* as speech.

    The MP3 is cached under ``cache_tts`` keyed by the MD5 of *text* (used
    only as a cache key, not for security).

    Returns:
        An ``<audio autoplay>`` element with the MP3 embedded as a base64
        data URI, or ``""`` when *text* is blank, both engines fail, or the
        cached file cannot be read.
    """
    if not text or not text.strip():
        # Nothing to speak; skip both engines instead of failing twice.
        return ""

    cache_dir = "cache_tts"
    os.makedirs(cache_dir, exist_ok=True)
    filename = f"{md5(text.encode()).hexdigest()}.mp3"
    path = os.path.join(cache_dir, filename)

    if not os.path.exists(path) and not _synthesize_to_cache(text, path):
        st.warning("🔇 Gagal membuat audio TTS.")
        return ""

    try:
        with open(path, "rb") as audio_file:
            audio_base64 = base64.b64encode(audio_file.read()).decode()
    except OSError as e:
        print(f"[Error saat membaca audio] {e}")
        return ""

    # The caller renders this with st.markdown(..., unsafe_allow_html=True);
    # the autoplay attribute is what starts playback.
    return f"""
    <audio autoplay>
        <source src="data:audio/mpeg;base64,{audio_base64}" type="audio/mpeg">
    </audio>
    """


def conversation_chat(query, chain, history):
    """Ask *chain* the *query* and return its answer.

    The ``(query, answer)`` pair is appended to *history* in place.
    """
    result = chain({"question": query, "chat_history": history})
    history.append((query, result["answer"]))
    return result["answer"]


def _rerun():
    """Rerun the script, using ``st.rerun`` when this Streamlit has it."""
    # st.experimental_rerun is deprecated and absent from newer releases.
    rerun = getattr(st, "rerun", None) or st.experimental_rerun
    rerun()


def display_chat_history(chain):
    """Render the chat UI: input widgets, chat history and TTS playback.

    Reads and updates the ``st.session_state`` keys set up by
    ``initialize_session_state``.
    """
    reply_container = st.container()
    user_input_obj = st.chat_input("Masukkan pertanyaan", key="chat_input_field")

    col2, col3 = st.columns([1, 1])

    # TTS on/off toggle button
    with col2:
        if st.button(
            "🔊 Text-to-Speech Aktif" if st.session_state['should_speak'] else "🔇 Text-to-Speech Nonaktif",
            key="toggle_tts",
            help="Aktifkan/Nonaktifkan Text-to-Speech",
            use_container_width=True,
        ):
            st.session_state['should_speak'] = not st.session_state['should_speak']
            _rerun()

    # Voice input button
    with col3:
        stt_text = speech_to_text(
            start_prompt="🎤 Input Suara",
            stop_prompt="🛑 Stop",
            language='id',
            just_once=True,
            key='stt_input',
            use_container_width=True,
        )

    # Store recognized speech and rerun so it is handled like typed input.
    if stt_text:
        st.session_state.input_text = stt_text
        _rerun()

    # Typed input takes precedence over a pending speech transcript.
    user_input = user_input_obj or st.session_state.get("input_text", "")

    if user_input:
        # Clear the pending transcript first so a failing chain call does not
        # replay the same question on every subsequent rerun.
        st.session_state.input_text = ""

        with st.spinner('Sedang membuat jawaban...'):
            output = conversation_chat(user_input, chain, st.session_state['history'])

        st.session_state['past'].append(user_input)
        st.session_state['generated'].append(output)

        # Arm the TTS flag so the new answer is spoken once below.
        if st.session_state['should_speak'] and output:
            st.session_state['tts_output'] = output
            st.session_state['tts_played'] = False

    # Show the chat history
    if st.session_state['generated']:
        with reply_container:
            turns = zip(st.session_state["past"], st.session_state["generated"])
            for i, (user_msg, bot_msg) in enumerate(turns):
                message(user_msg, is_user=True, key=str(i) + '_user', avatar_style="no-avatar")
                message(bot_msg, key=str(i), avatar_style="no-avatar")

    # TTS playback: speak the latest answer once
    if st.session_state.get('tts_output') and not st.session_state.get('tts_played'):
        st.markdown(text_to_speech(st.session_state['tts_output']), unsafe_allow_html=True)
        st.session_state['tts_played'] = True