pnp-chatbot-v1 / app /chat.py
FauziIsyrinApridal
update replay
d461eb9
raw
history blame
6.79 kB
import streamlit as st
from streamlit_chat import message
from streamlit_mic_recorder import speech_to_text
import base64
import asyncio
import edge_tts
import gtts
from hashlib import md5
import os
import glob
import time
from dotenv import load_dotenv
from app.db import supabase
load_dotenv()
# ----------------------------
# Pembersihan cache audio lama
# ----------------------------
def clean_old_cache(tts_dir="cache_tts", max_age_hours=12):
    """Delete ``*.mp3`` files in *tts_dir* whose mtime is older than the cutoff.

    Args:
        tts_dir: Directory to scan (non-recursively) for ``*.mp3`` entries.
        max_age_hours: Entries last modified more than this many hours ago
            are removed.

    Returns:
        None. Entries that cannot be inspected or removed are skipped so one
        bad entry never aborts the sweep (or the module import that runs it).
    """
    # Compute the cutoff once instead of on every iteration.
    cutoff = time.time() - max_age_hours * 3600
    for mp3_path in glob.glob(os.path.join(tts_dir, "*.mp3")):
        try:
            if os.stat(mp3_path).st_mtime < cutoff:
                os.remove(mp3_path)
        except FileNotFoundError:
            # The entry vanished between glob() and stat()/remove(); the goal
            # (file gone) is already met.
            continue
        except OSError as e:
            # Permission problems, a directory named *.mp3, etc.: report and
            # carry on with the remaining entries.
            print(f"[Pembersihan cache gagal] {mp3_path}: {e}")
# Runs when this module is executed: prune stale MP3 files from the default
# "cache_tts" directory using the default 12-hour age limit.
clean_old_cache()
# ----------------------------
# Simpan feedback ke Supabase
# ----------------------------
def save_feedback_to_supabase(feedback_text):
    """Insert one feedback record into the Supabase ``feedback`` table.

    Args:
        feedback_text: Value stored under the record's ``"message"`` key.

    Returns:
        True if the insert call finished without raising. False if any
        exception was raised; the error is then also displayed through
        ``st.error``.
    """
    row = {"message": feedback_text}
    try:
        supabase.table("feedback").insert(row).execute()
    except Exception as err:
        # UI boundary: surface the failure to the user instead of crashing.
        st.error(f"Gagal menyimpan feedback: {err}")
        return False
    return True
# ----------------------------
# State awal
# ----------------------------
def initialize_session_state():
    """Seed ``st.session_state`` with every key the chat UI reads.

    Keys that already exist are left untouched, so calling this on each
    script run does not reset an ongoing conversation.

    Keys set when missing:
        history: list passed to the chain as ``chat_history``.
        generated: bot replies, starting with a greeting.
        past: user messages, starting with a greeting.
        should_speak: whether new replies are queued for auto-play.
        input_text: pending text captured from speech input.
        tts_cache: maps reply text to the path of its synthesized audio.
        last_tts: reply text queued for auto-play, or None.
    """
    # Built on every call so the mutable defaults are never shared between
    # sessions.
    defaults = {
        'history': [],
        'generated': ["Halo! Saya bisa membantu anda menjawab pertanyaan seputar Politeknik Negeri Padang!"],
        # The source held the mis-decoded bytes of U+1F44B (waving hand);
        # restored so the greeting renders as intended.
        'past': ["Hai! 👋"],
        'should_speak': True,
        'input_text': "",
        'tts_cache': {},
        'last_tts': None,
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value
# ----------------------------
# TTS gTTS → Edge fallback
# ----------------------------
async def generate_audio_edge(text, path, voice="id-ID-GadisNeural"):
    """Synthesize *text* with edge-tts and write the audio to *path*.

    Args:
        text: Text to speak.
        path: Destination file path handed to ``Communicate.save``.
        voice: edge-tts voice name.
    """
    await edge_tts.Communicate(text, voice=voice).save(path)
def get_tts_path(text):
    """Return the cache file path for *text*, creating the cache directory.

    The file name is the MD5 hex digest of the encoded text plus ``.mp3``,
    so identical text always maps to the same path inside ``cache_tts``.
    """
    cache_dir = "cache_tts"
    os.makedirs(cache_dir, exist_ok=True)
    digest = md5(text.encode()).hexdigest()
    return os.path.join(cache_dir, digest + ".mp3")
def text_to_speech(text):
    """Synthesize *text* to a cached MP3 and return its path.

    gTTS is tried first; if it raises, edge-tts is used as a fallback.

    Args:
        text: Text to speak; also determines the cache file name.

    Returns:
        Path of the audio file, or None when both engines failed.
    """
    path = get_tts_path(text)
    # A zero-byte file is residue of an earlier failed synthesis, not a
    # usable cache hit, so it is regenerated rather than returned.
    if os.path.exists(path) and os.path.getsize(path) > 0:
        return path
    try:
        gtts.gTTS(text, lang="id").save(path)
    except Exception as e:
        print(f"[gTTS gagal] {e}")
        # A failed save may leave a truncated file behind; drop it so it can
        # never be mistaken for valid cached audio.
        _discard_partial_audio(path)
        try:
            asyncio.run(generate_audio_edge(text, path))
        except Exception as e2:
            print(f"[Edge-TTS gagal] {e2}")
            _discard_partial_audio(path)
            return None
    return path


def _discard_partial_audio(path):
    """Best-effort removal of a possibly incomplete audio file at *path*."""
    try:
        os.remove(path)
    except OSError:
        # Nothing was written, or the file cannot be removed; either way
        # there is nothing more useful to do here.
        pass
def play_audio_ui(path, autoplay=False):
    """Embed the audio file at *path* as a hidden inline ``<audio>`` element.

    Args:
        path: Audio file to read; its bytes are base64-encoded into a
            ``data:`` URI.
        autoplay: When true, the ``autoplay`` attribute is added to the tag.
    """
    with open(path, "rb") as audio_file:
        raw_audio = audio_file.read()
    audio_base64 = base64.b64encode(raw_audio).decode()

    if autoplay:
        autoplay_attr = "autoplay"
    else:
        autoplay_attr = ""

    html = f"""
<audio {autoplay_attr} style="display:none;">
<source src="data:audio/mp3;base64,{audio_base64}" type="audio/mp3">
</audio>
"""
    st.markdown(html, unsafe_allow_html=True)
# ----------------------------
# Chat & TTS Bubble
# ----------------------------
def conversation_chat(query, chain, history):
    """Ask *chain* a question and record the exchange in *history*.

    Args:
        query: The user's question.
        chain: Callable invoked with a dict holding ``"question"`` and
            ``"chat_history"``; its result must expose an ``"answer"`` item.
        history: List of ``(question, answer)`` tuples; mutated in place.

    Returns:
        The answer produced by the chain.
    """
    payload = {"question": query, "chat_history": history}
    answer = chain(payload)["answer"]
    history.append((query, answer))
    return answer
def display_chat_history(chain):
    """Render the chat UI for one script run and process any new input.

    Steps, in order:
      1. Draw the text input, the TTS toggle button and the speech-to-text
         recorder.
      2. If there is new input (typed, or captured by speech on the previous
         run), ask *chain* for an answer, store the exchange in session
         state, synthesize audio for the answer and queue it for auto-play
         when ``should_speak`` is set.
      3. Draw every user/bot bubble, with a replay button for replies that
         have cached audio.
      4. Auto-play the queued reply once, then clear the queue.

    Args:
        chain: Callable forwarded to ``conversation_chat``.
    """
    state = st.session_state
    reply_container = st.container()
    user_input_obj = st.chat_input("Masukkan pertanyaan", key="chat_input_field")

    col2, col3 = st.columns([1, 1])
    with col2:
        toggle_label = "🔊 Aktifkan" if state['should_speak'] else "🔇 Nonaktifkan"
        if st.button(toggle_label, key="toggle_tts", use_container_width=True):
            state['should_speak'] = not state['should_speak']
            _rerun_app()
    with col3:
        stt_text = speech_to_text(
            start_prompt="🎤 Input Suara",
            stop_prompt="🛑 Stop",
            language='id',
            just_once=True,
            key='stt_input',
            use_container_width=True,
        )

    if stt_text:
        # Stash the transcript and rerun; the next run picks it up below as
        # if it had been typed.
        state.input_text = stt_text
        _rerun_app()

    user_input = user_input_obj or state.get("input_text", "")
    if user_input:
        with st.spinner('Sedang membuat jawaban...'):
            output = conversation_chat(user_input, chain, state['history'])
        state['past'].append(user_input)
        state['generated'].append(output)
        state.input_text = ""

        # Cache the synthesized audio under the reply text.
        tts_path = text_to_speech(output)
        if tts_path:
            state['tts_cache'][output] = tts_path
            if state['should_speak']:
                state['last_tts'] = output

    # Show the conversation, with a small replay icon under bot replies.
    if state['generated']:
        with reply_container:
            for i, bot_text in enumerate(state['generated']):
                # User bubble
                message(state["past"][i], is_user=True, key=f"{i}_user", avatar_style="no-avatar")
                # Bot bubble + small play icon
                message(bot_text, key=f"{i}", avatar_style="no-avatar")
                audio_path = state['tts_cache'].get(bot_text)
                if audio_path:
                    _render_replay_button(f"audio_{i}", audio_path)

    # Auto-play for the newest answer; the queue is cleared so it plays once.
    last_text = state['last_tts']
    if last_text:
        autoplay_path = state['tts_cache'].get(last_text)
        # Skip silently if the cached file has since been deleted.
        if autoplay_path and os.path.exists(autoplay_path):
            play_audio_ui(autoplay_path, autoplay=True)
        state['last_tts'] = None


def _rerun_app():
    """Restart the script run with whichever rerun API this Streamlit has."""
    # `st.experimental_rerun` was deprecated in favour of `st.rerun` and is
    # absent from newer releases; only touch the old name when the new one
    # does not exist.
    rerun = getattr(st, "rerun", None) or st.experimental_rerun
    rerun()


def _render_replay_button(audio_id, audio_path):
    """Emit a replay button and an inline <audio> element for one bot reply."""
    try:
        with open(audio_path, "rb") as audio_file:
            audio_base64 = base64.b64encode(audio_file.read()).decode()
    except OSError:
        # The cached MP3 is gone or unreadable; show the bubble without a
        # replay button instead of failing the whole render.
        return
    # NOTE(review): Streamlit may strip inline `onclick` handlers from
    # markdown HTML — confirm this button actually triggers playback.
    st.markdown(
        f"""
<div style="text-align:right; margin-top:-30px; margin-bottom:10px;">
<button onclick="var audio = document.getElementById('{audio_id}'); audio.play();"
style="background:none; border:none; cursor:pointer; font-size:18px;">
🔊
</button>
<audio id="{audio_id}">
<source src="data:audio/mp3;base64,{audio_base64}" type="audio/mp3">
</audio>
</div>
""",
        unsafe_allow_html=True
    )