pnp-chatbot-v1 / app /chat.py
FauziIsyrinApridal
revisi 1
daa81fb
raw
history blame
14.1 kB
import streamlit as st
from streamlit_chat import message
from streamlit_mic_recorder import speech_to_text
import base64
import asyncio
import edge_tts
import gtts
from hashlib import md5
from io import BytesIO
from app.db import supabase
import os
import glob
import time
from dotenv import load_dotenv
from uuid import uuid4
# Load variables from a local .env file (if any) into the process environment.
load_dotenv()
# Clean up old cached audio (optional)
def clean_old_cache(tts_dir="cache_tts", max_age_hours=12):
    """Delete cached TTS ``.mp3`` files older than ``max_age_hours``.

    Args:
        tts_dir: Directory that holds the cached audio files.
        max_age_hours: Files whose modification time is older than this many
            hours are removed.

    Entries that cannot be inspected or removed (already deleted by someone
    else, not a regular file, permission denied) are logged and skipped so
    this best-effort cleanup never raises to its caller.
    """
    cutoff = time.time() - max_age_hours * 3600
    for mp3_path in glob.glob(os.path.join(tts_dir, "*.mp3")):
        try:
            if os.stat(mp3_path).st_mtime < cutoff:
                os.remove(mp3_path)
        except OSError as e:
            print(f"Gagal menghapus cache audio {mp3_path}: {e}")
# Run the cleanup once when this module is loaded
clean_old_cache()
def save_feedback_to_supabase(feedback_text):
    """Insert ``feedback_text`` as a new row of the ``feedback`` table.

    Returns:
        True when the insert call completes; False when it raises, in which
        case the error is also shown in the Streamlit UI.
    """
    try:
        supabase.table("feedback").insert({"message": feedback_text}).execute()
    except Exception as err:
        st.error(f"Gagal menyimpan feedback: {err}")
        return False
    return True
def initialize_session_state():
    """Seed ``st.session_state`` with the defaults the chat UI relies on.

    Only keys that are not present yet are written, so values already stored
    in the session are never overwritten.
    """
    # Built on every call so the list defaults are fresh objects each time.
    defaults = {
        'history': [],
        'generated': ["Halo! Saya bisa membantu anda menjawab pertanyaan seputar Politeknik Negeri Padang!"],
        # Waving-hand emoji; the literal previously held the mojibake form of
        # these UTF-8 bytes and rendered as garbage in the chat.
        'past': ["Hai! 👋"],
        'data_len': 0,
        'vector_store': None,
        'should_speak': True,
        'input_text': "",
        'tts_output': "",
        # True means "nothing pending to play".
        'tts_played': True,
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value
    # Generated separately so a new UUID is only created when actually needed.
    if 'session_id' not in st.session_state:
        st.session_state['session_id'] = str(uuid4())
def _current_user():
    """Return the ``'user'`` entry of the Streamlit session state, or None if unset."""
    session = st.session_state
    return session.get('user')
def save_message_to_supabase(user_id: str, session_id: str, role: str, content: str):
    """Insert one chat message row into the ``chat_messages`` table.

    Failures are not propagated: the error is printed to the console and the
    function returns normally.
    """
    row = {
        "user_id": user_id,
        "session_id": session_id,
        "role": role,
        "content": content,
    }
    try:
        supabase.table("chat_messages").insert(row).execute()
    except Exception as err:
        # Non-fatal for the UI; only report on the console.
        print(f"Gagal menyimpan pesan: {err}")
def load_history_from_supabase(user_id: str, session_id: str, limit: int = 100):
    """Load a session's stored messages and rebuild the chat state lists.

    Args:
        user_id: Owner of the messages.
        session_id: Chat session whose messages are loaded.
        limit: Maximum number of message rows requested.

    Returns:
        A ``(past, generated, history)`` tuple where ``past`` holds user
        messages, ``generated`` the matching assistant replies and ``history``
        the same turns as ``(user, assistant)`` tuples. The three lists always
        have the same length and index ``i`` refers to the same turn in each.
        On any error the problem is printed and three empty lists are returned.
    """
    try:
        # NOTE(review): rows are ordered oldest-first before the limit is
        # applied, so a session longer than `limit` presumably yields its
        # oldest messages rather than the latest ones - confirm this is intended.
        res = (
            supabase
            .table("chat_messages")
            .select("role, content, created_at")
            .eq("user_id", user_id)
            .eq("session_id", session_id)
            .order("created_at", desc=False)
            .limit(limit)
            .execute()
        )
        rows = getattr(res, 'data', None) or []
        past, generated, history = [], [], []
        # Walk the rows in chronological order and pair every assistant reply
        # with the user message that precedes it. Pairing by position in two
        # separate lists would shift every later turn as soon as one user
        # message has no stored reply (e.g. the answer failed to generate).
        pending_question = None
        for row in rows:
            role = row["role"]
            if role == "user":
                # An unanswered earlier question is superseded by this one.
                pending_question = row["content"]
            elif role == "assistant" and pending_question is not None:
                answer = row["content"]
                past.append(pending_question)
                generated.append(answer)
                history.append((pending_question, answer))
                pending_question = None
        return past, generated, history
    except Exception as e:
        print(f"Gagal memuat riwayat: {e}")
        return [], [], []
def create_chat_session(user_id: str, title: str = None) -> str:
    """Insert a new ``chat_sessions`` row for ``user_id`` and return its id.

    ``title`` is only sent when it is truthy. If the insert fails, or the
    response carries no usable id, a freshly generated UUID string is
    returned instead.
    """
    try:
        new_row = {"user_id": user_id}
        if title:
            new_row["title"] = title
        response = supabase.table("chat_sessions").insert(new_row).execute()
        returned = getattr(response, 'data', None) or []

        # The response data is handled both as a list of rows and as a single row.
        created_id = None
        if isinstance(returned, list):
            if returned:
                created_id = returned[0].get('id')
        elif isinstance(returned, dict):
            created_id = returned.get('id')
        return created_id or str(uuid4())
    except Exception as err:
        print(f"Gagal membuat sesi: {err}")
        return str(uuid4())
def list_chat_sessions(user_id: str, limit: int = 20):
    """Return up to ``limit`` chat sessions of ``user_id``, newest first.

    Each entry carries the ``id``, ``title`` and ``created_at`` columns.
    An empty list is returned when nothing is found or the query fails.
    """
    try:
        query = supabase.table("chat_sessions").select("id, title, created_at")
        query = query.eq("user_id", user_id).order("created_at", desc=True).limit(limit)
        response = query.execute()
        return getattr(response, 'data', None) or []
    except Exception as err:
        print(f"Gagal memuat daftar sesi: {err}")
        return []
def delete_chat_session(user_id: str, session_id: str) -> bool:
    """Remove one chat session of ``user_id`` together with its messages.

    Returns:
        True when the session row delete call completes, False when it raises.
        A failure while deleting the messages is only logged.
    """
    # Messages are removed before the session row (presumably because of a
    # foreign key from messages to sessions); errors here are tolerated.
    try:
        message_rows = supabase.table("chat_messages").delete()
        message_rows.eq("user_id", user_id).eq("session_id", session_id).execute()
    except Exception as err:
        print(f"Info: hapus pesan sesi gagal/abaikan: {err}")

    # Now remove the session row itself.
    try:
        session_rows = supabase.table("chat_sessions").delete()
        session_rows.eq("user_id", user_id).eq("id", session_id).execute()
    except Exception as err:
        print(f"Gagal menghapus sesi: {err}")
        return False
    return True
def delete_all_chat_sessions(user_id: str) -> bool:
    """Remove every chat session and every chat message owned by ``user_id``.

    Returns:
        True when the sessions delete call completes, False when it raises.
        A failure while deleting the messages is only logged.
    """
    # All messages of the user go first; errors here are tolerated.
    try:
        message_rows = supabase.table("chat_messages").delete()
        message_rows.eq("user_id", user_id).execute()
    except Exception as err:
        print(f"Info: hapus semua pesan gagal/abaikan: {err}")

    # Then all session rows of the user.
    try:
        session_rows = supabase.table("chat_sessions").delete()
        session_rows.eq("user_id", user_id).execute()
    except Exception as err:
        print(f"Gagal menghapus semua sesi: {err}")
        return False
    return True
def ensure_chat_session(user_id: str, session_id: str, title: str = None) -> str:
    """Make sure a ``chat_sessions`` row with ``session_id`` exists.

    The table is first queried for the id; when no row is found (or the lookup
    fails) a row with that explicit id is inserted for ``user_id``, including
    ``title`` when it is truthy.

    Returns:
        The id reported by the insert response when available, otherwise
        ``session_id``. Never raises: errors are printed and ``session_id`` is
        returned.
    """
    # Check existence
    try:
        existing = (
            supabase
            .table("chat_sessions")
            .select("id")
            .eq("id", session_id)
            .limit(1)
            .execute()
        )
        data = getattr(existing, 'data', None) or []
        if (isinstance(data, list) and data) or (isinstance(data, dict) and data.get('id')):
            return session_id
    except Exception as e:
        # A failed lookup is not fatal: log it and fall through to the insert.
        print(f"Gagal memeriksa sesi: {e}")
    # Create with explicit id
    try:
        payload = {"id": session_id, "user_id": user_id}
        if title:
            payload["title"] = title
        inserted = (
            supabase
            .table("chat_sessions")
            .insert(payload)
            .execute()
        )
        data = getattr(inserted, 'data', None) or []
        # `or session_id` also covers a row whose 'id' is present but null,
        # so the declared str return type always holds.
        if isinstance(data, list) and data:
            return data[0].get('id') or session_id
        if isinstance(data, dict):
            return data.get('id') or session_id
        return session_id
    except Exception as e:
        print(f"Gagal memastikan sesi: {e}")
        return session_id
def _generate_session_title_from_text(text: str, max_len: int = 60) -> str:
"""Generate a concise session title from the first user message."""
if not text:
return "Percakapan Baru"
# Normalize whitespace and strip
t = " ".join(text.strip().split())
# Remove surrounding quotes or trailing punctuation if too noisy
t = t.strip('"\'\u201c\u201d')
if len(t) > max_len:
t = t[:max_len - 1].rstrip() + "…"
return t or "Percakapan Baru"
def update_chat_session_title_if_empty(user_id: str, session_id: str, candidate_title: str) -> None:
    """Give the session a title derived from ``candidate_title`` if it has none.

    The current title is looked up for the given user and session; when it is
    missing or empty, an update with the generated title is issued. Errors are
    printed and otherwise ignored.
    """
    try:
        lookup = (
            supabase.table("chat_sessions")
            .select("id, title")
            .eq("id", session_id)
            .eq("user_id", user_id)
            .limit(1)
            .execute()
        )
        found = getattr(lookup, 'data', None) or []

        # The response data is handled both as a list of rows and as a single row.
        if isinstance(found, list):
            current_title = found[0].get("title") if found else None
        elif isinstance(found, dict):
            current_title = found.get("title")
        else:
            current_title = None

        if current_title:
            return

        new_title = _generate_session_title_from_text(candidate_title)
        (
            supabase.table("chat_sessions")
            .update({"title": new_title})
            .eq("id", session_id)
            .eq("user_id", user_id)
            .execute()
        )
    except Exception as err:
        # Non-fatal; report on the console only.
        print(f"Gagal mengatur judul sesi: {err}")
# edge-tts fallback (backup engine)
async def generate_audio_edge(text, path, voice="id-ID-GadisNeural"):
    """Synthesize ``text`` with edge-tts using ``voice`` and save the audio to ``path``."""
    await edge_tts.Communicate(text, voice=voice).save(path)
def _discard_partial_audio(path):
    """Remove ``path`` if it exists, ignoring filesystem errors."""
    try:
        os.remove(path)
    except OSError:
        pass


# Main TTS function with fallback
def text_to_speech(text):
    """Return an autoplaying HTML ``<audio>`` snippet that speaks ``text``.

    The audio is cached as ``cache_tts/<md5 of text>.mp3``. On a cache miss it
    is synthesized with gTTS; if that raises, edge-tts is tried instead.

    Args:
        text: The text to speak.

    Returns:
        HTML markup embedding the mp3 as a base64 data URI, or ``""`` when
        ``text`` is empty, both engines fail, or the file cannot be read.
    """
    if not text:
        return ""
    cache_dir = "cache_tts"
    os.makedirs(cache_dir, exist_ok=True)
    filename = f"{md5(text.encode()).hexdigest()}.mp3"
    path = os.path.join(cache_dir, filename)
    # A zero-byte file is what an aborted synthesis leaves behind; treat it as
    # a cache miss instead of serving silent audio.
    cached = os.path.isfile(path) and os.path.getsize(path) > 0
    if not cached:
        try:
            # Primary engine: gTTS
            gtts.gTTS(text, lang="id").save(path)
        except Exception as e:
            print(f"[gTTS gagal] {e}")
            # Drop whatever the failed attempt wrote so it is never cached.
            _discard_partial_audio(path)
            try:
                # Backup engine: edge-tts
                asyncio.run(generate_audio_edge(text, path))
            except Exception as e2:
                print(f"[Edge-TTS juga gagal] {e2}")
                _discard_partial_audio(path)
                st.warning("🔇 Gagal membuat audio TTS.")
                return ""
    try:
        with open(path, "rb") as audio_file:
            audio_base64 = base64.b64encode(audio_file.read()).decode()
        return (
            "\n<audio autoplay>\n"
            f'<source src="data:audio/mp3;base64,{audio_base64}" type="audio/mp3">\n'
            "</audio>\n"
        )
    except Exception as e:
        print(f"[Error saat membaca audio] {e}")
        return ""
def conversation_chat(query, chain, history):
    """Answer ``query`` with ``chain`` and record the turn.

    Args:
        query: The user's question.
        chain: Callable invoked with ``{"question", "chat_history"}`` that
            returns a mapping containing an ``"answer"`` entry.
        history: List of ``(question, answer)`` tuples; the new turn is
            appended to it in place.

    Returns:
        The answer produced by ``chain``.

    When a user is present in the session, the question and the answer are
    also stored via ``save_message_to_supabase`` and the session title is
    auto-filled from the question. Persistence is best-effort: failures are
    printed and never interrupt the chat.
    """
    user = _current_user()
    session_id = st.session_state.get('session_id')
    if user:
        # Save user message first
        try:
            save_message_to_supabase(user_id=user["id"], session_id=session_id, role="user", content=query)
        except Exception as e:
            # Reached e.g. when `user` has no "id" entry; report it rather
            # than dropping the message without a trace.
            print(f"Gagal menyimpan pesan pengguna: {e}")
        # Try to auto-name the session from the first user message
        try:
            update_chat_session_title_if_empty(user_id=user["id"], session_id=session_id, candidate_title=query)
        except Exception as e:
            print(f"Gagal mengatur judul sesi: {e}")
    result = chain({"question": query, "chat_history": history})
    answer = result["answer"]
    history.append((query, answer))
    # Save assistant reply
    if user:
        try:
            save_message_to_supabase(user_id=user["id"], session_id=session_id, role="assistant", content=answer)
        except Exception as e:
            print(f"Gagal menyimpan jawaban asisten: {e}")
    return answer
def display_chat_history(chain):
    """Render the chat UI: input widgets, answer generation, history and TTS.

    Args:
        chain: Passed to ``conversation_chat`` to answer the user's input.

    Reads and updates ``st.session_state`` (``past``, ``generated``,
    ``history``, ``input_text``, ``should_speak``, ``tts_output``,
    ``tts_played``).
    """
    reply_container = st.container()
    user_input_obj = st.chat_input("Masukkan pertanyaan", key="chat_input_field")
    col2, col3 = st.columns([1, 1])
    # Text-to-speech on/off toggle button
    with col2:
        if st.button("🔊 Text-to-Speech Aktif" if st.session_state['should_speak'] else "🔇 Text-to-Speech Nonaktif",
                     key="toggle_tts",
                     help="Aktifkan/Nonaktifkan Text-to-Speech",
                     use_container_width=True):
            st.session_state['should_speak'] = not st.session_state['should_speak']
            st.rerun()
    # Voice input button
    with col3:
        stt_text = speech_to_text(
            start_prompt="🎤 Input Suara",
            stop_prompt="🛑 Stop",
            language='id',
            just_once=True,
            key='stt_input',
            use_container_width=True,
        )
    # If speech was recognized, stash it and rerun so it is handled as input
    if stt_text:
        st.session_state.input_text = stt_text
        st.rerun()
    # Take the typed input, falling back to the stashed speech-to-text result
    user_input = user_input_obj or st.session_state.get("input_text", "")
    if user_input:
        with st.spinner('Sedang membuat jawaban...'):
            output = conversation_chat(user_input, chain, st.session_state['history'])
        st.session_state['past'].append(user_input)
        st.session_state['generated'].append(output)
        st.session_state.input_text = ""
        # Reset the flag so TTS is ready to play the new answer
        if st.session_state['should_speak'] and output:
            st.session_state['tts_output'] = output
            st.session_state['tts_played'] = False
    # If user just logged in and no local history, try loading from DB
    if not st.session_state['history']:
        user = _current_user()
        if user:
            past, generated, history = load_history_from_supabase(user_id=user['id'], session_id=st.session_state.get('session_id'))
            if past or generated:
                st.session_state['past'] = past or st.session_state['past']
                st.session_state['generated'] = generated or st.session_state['generated']
                st.session_state['history'] = history or st.session_state['history']
    # Show the chat history
    if st.session_state['generated']:
        with reply_container:
            # zip() stops at the shorter list, so a length mismatch between
            # 'past' and 'generated' (possible after loading stored history)
            # cannot raise IndexError.
            turns = zip(st.session_state["past"], st.session_state["generated"])
            for i, (question, reply) in enumerate(turns):
                message(question, is_user=True, key=f"{i}_user", avatar_style="no-avatar")
                message(reply, key=str(i), avatar_style="no-avatar")
    # Play TTS once per new answer
    if st.session_state.get('tts_output') and not st.session_state.get('tts_played'):
        st.markdown(text_to_speech(st.session_state['tts_output']), unsafe_allow_html=True)
        st.session_state['tts_played'] = True