Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| from streamlit_chat import message | |
| from streamlit_mic_recorder import speech_to_text | |
| import base64 | |
| import asyncio | |
| import edge_tts | |
| import gtts | |
| from hashlib import md5 | |
| from io import BytesIO | |
| from app.db import supabase | |
| import os | |
| import glob | |
| import time | |
| from dotenv import load_dotenv | |
| from uuid import uuid4 | |
| load_dotenv() | |
# Clean up stale cached audio (optional)
def clean_old_cache(tts_dir="cache_tts", max_age_hours=12):
    """Delete ``*.mp3`` files in *tts_dir* whose mtime is older than *max_age_hours*.

    Args:
        tts_dir: Directory that holds the cached audio files.
        max_age_hours: Files last modified more than this many hours ago
            are removed.

    Entries that cannot be inspected or removed (already deleted by someone
    else, not a regular file, permission problems) are logged and skipped so
    that a single bad entry cannot abort the caller.
    """
    cutoff = time.time() - max_age_hours * 3600
    for mp3_path in glob.glob(os.path.join(tts_dir, "*.mp3")):
        try:
            if os.stat(mp3_path).st_mtime < cutoff:
                os.remove(mp3_path)
        except OSError as e:
            # Best-effort cleanup: keep going with the remaining files.
            print(f"Info: gagal membersihkan cache audio {mp3_path}: {e}")
# Run the cache cleanup once at startup (module import time)
clean_old_cache()
def save_feedback_to_supabase(feedback_text):
    """Insert *feedback_text* as a row of the ``feedback`` table.

    Returns:
        ``True`` when the insert ran without raising; ``False`` otherwise,
        in which case the error is shown in the UI via ``st.error``.
    """
    row = {"message": feedback_text}
    try:
        supabase.table("feedback").insert(row).execute()
    except Exception as err:
        st.error(f"Gagal menyimpan feedback: {err}")
        return False
    return True
def initialize_session_state():
    """Seed ``st.session_state`` with a default for every key not yet present.

    Keys that already exist are left untouched.
    """
    defaults = {
        'history': [],
        'generated': ["Halo! Saya bisa membantu anda menjawab pertanyaan seputar Politeknik Negeri Padang!"],
        'past': ["Hai! π"],
        'data_len': 0,
        'vector_store': None,
        'should_speak': True,
        'input_text': "",
        'tts_output': "",
        'tts_played': True,
    }
    for key, initial in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = initial
    # Do not auto-generate session_id here, so that a page refresh does not
    # create a new session every time. The session is created/assigned only by
    # ensure_active_session(), based on the data in the DB.
def _current_user():
    """Return the value stored under ``'user'`` in the session state, or ``None``."""
    session = st.session_state
    return session.get('user')
def save_message_to_supabase(user_id: str, session_id: str, role: str, content: str):
    """Insert one chat message into the ``chat_messages`` table.

    Failures are non-fatal for the UI: the exception is printed to the
    console and the function returns normally.
    """
    record = {
        "user_id": user_id,
        "session_id": session_id,
        "role": role,
        "content": content,
    }
    try:
        supabase.table("chat_messages").insert(record).execute()
    except Exception as err:
        # Deliberately swallowed; only logged to the console.
        print(f"Gagal menyimpan pesan: {err}")
def load_history_from_supabase(user_id: str, session_id: str, limit: int = 100):
    """Load stored messages of one session, ordered by ``created_at`` ascending.

    Returns:
        A ``(past, generated, history)`` triple: the user messages, the
        assistant messages, and ``(user, assistant)`` tuples paired by
        position up to the shorter of the two lists. All three are empty
        when there are no rows or when the query fails (the error is
        printed).
    """
    try:
        query = supabase.table("chat_messages").select("role, content, created_at")
        query = query.eq("user_id", user_id).eq("session_id", session_id)
        response = query.order("created_at", desc=False).limit(limit).execute()
        rows = getattr(response, 'data', None) or []

        past, generated = [], []
        for row in rows:
            role = row["role"]
            if role == "assistant":
                generated.append(row["content"])
            elif role == "user":
                past.append(row["content"])

        # Pair the i-th user message with the i-th assistant message;
        # zip stops at the shorter list.
        history = list(zip(past, generated))
        return past, generated, history
    except Exception as err:
        print(f"Gagal memuat riwayat: {err}")
        return [], [], []
def create_chat_session(user_id: str, title: str = None) -> str:
    """Insert a new ``chat_sessions`` row for *user_id* and return its id.

    If the insert fails or the response carries no usable ``id``, a freshly
    generated UUID4 string is returned instead.
    """
    try:
        payload = {"user_id": user_id}
        if title:
            payload["title"] = title
        response = supabase.table("chat_sessions").insert(payload).execute()
        data = getattr(response, 'data', None) or []

        if isinstance(data, list):
            new_id = data[0].get('id') if data else None
        elif isinstance(data, dict):
            new_id = data.get('id')
        else:
            new_id = None
        # Fall back to a locally generated id when the response has none.
        return new_id or str(uuid4())
    except Exception as err:
        print(f"Gagal membuat sesi: {err}")
        return str(uuid4())
def list_chat_sessions(user_id: str, limit: int = 20):
    """Return up to *limit* sessions of *user_id*, newest ``created_at`` first.

    Each row carries ``id``, ``title`` and ``created_at``. An empty list is
    returned when nothing is found or the query fails (the error is printed).
    """
    try:
        query = supabase.table("chat_sessions").select("id, title, created_at")
        query = query.eq("user_id", user_id).order("created_at", desc=True)
        response = query.limit(limit).execute()
        return getattr(response, 'data', None) or []
    except Exception as err:
        print(f"Gagal memuat daftar sesi: {err}")
        return []
def delete_chat_session(user_id: str, session_id: str) -> bool:
    """Delete one session of *user_id* together with its messages.

    Returns:
        ``True`` when the session delete ran without raising, ``False``
        otherwise. A failure while deleting the messages is only logged.
    """
    try:
        # Messages go first so the session row is not referenced any more.
        try:
            messages = supabase.table("chat_messages").delete()
            messages.eq("user_id", user_id).eq("session_id", session_id).execute()
        except Exception as msg_err:
            # Tolerated: continue with the session row regardless.
            print(f"Info: hapus pesan sesi gagal/abaikan: {msg_err}")

        sessions = supabase.table("chat_sessions").delete()
        sessions.eq("user_id", user_id).eq("id", session_id).execute()
    except Exception as err:
        print(f"Gagal menghapus sesi: {err}")
        return False
    return True
def delete_all_chat_sessions(user_id: str) -> bool:
    """Delete every message and every session that belongs to *user_id*.

    Returns:
        ``True`` when the session delete ran without raising, ``False``
        otherwise. A failure while deleting the messages is only logged.
    """
    try:
        # Remove the user's messages before the sessions themselves.
        try:
            messages = supabase.table("chat_messages").delete()
            messages.eq("user_id", user_id).execute()
        except Exception as msg_err:
            print(f"Info: hapus semua pesan gagal/abaikan: {msg_err}")

        sessions = supabase.table("chat_sessions").delete()
        sessions.eq("user_id", user_id).execute()
    except Exception as err:
        print(f"Gagal menghapus semua sesi: {err}")
        return False
    return True
def ensure_chat_session(user_id: str, session_id: str, title: str = None) -> str:
    """Ensure a ``chat_sessions`` row with id *session_id* exists.

    The table is first queried for the id. If a row is found, *session_id*
    is returned unchanged. Otherwise (or if the lookup itself fails) a row
    with that explicit id, *user_id* and optional *title* is inserted.

    Returns:
        The id reported by the insert response when available, else
        *session_id*. The function never raises: errors are printed and
        *session_id* is returned.
    """
    try:
        # Check existence
        chk = (
            supabase
            .table("chat_sessions")
            .select("id")
            .eq("id", session_id)
            .limit(1)
            .execute()
        )
        data = getattr(chk, 'data', None) or []
        if (isinstance(data, list) and data) or (isinstance(data, dict) and data.get('id')):
            return session_id
    except Exception as e:
        # A failed lookup is not fatal: log it (instead of silently
        # swallowing it) and fall through to the insert attempt below.
        print(f"Gagal memeriksa sesi: {e}")

    # Create with explicit id
    try:
        payload = {"id": session_id, "user_id": user_id}
        if title:
            payload["title"] = title
        ins = (
            supabase
            .table("chat_sessions")
            .insert(payload)
            .execute()
        )
        data = getattr(ins, 'data', None) or []
        if isinstance(data, list) and data:
            return data[0].get('id', session_id)
        if isinstance(data, dict) and data.get('id'):
            return data['id']
        return session_id
    except Exception as e:
        print(f"Gagal memastikan sesi: {e}")
        return session_id
| def _generate_session_title_from_text(text: str, max_len: int = 60) -> str: | |
| """Generate a concise session title from the first user message.""" | |
| if not text: | |
| return "Percakapan Baru" | |
| # Normalize whitespace and strip | |
| t = " ".join(text.strip().split()) | |
| # Remove surrounding quotes or trailing punctuation if too noisy | |
| t = t.strip('"\'\u201c\u201d') | |
| if len(t) > max_len: | |
| t = t[:max_len - 1].rstrip() + "β¦" | |
| return t or "Percakapan Baru" | |
def update_chat_session_title_if_empty(user_id: str, session_id: str, candidate_title: str) -> None:
    """Give the session a title derived from *candidate_title* if it has none.

    The current title is looked up by session id and user id. When it is
    missing or empty, the title produced by
    ``_generate_session_title_from_text`` is written. Errors are printed and
    never propagated.
    """
    try:
        lookup = (
            supabase.table("chat_sessions")
            .select("id, title")
            .eq("id", session_id)
            .eq("user_id", user_id)
            .limit(1)
            .execute()
        )
        found = getattr(lookup, 'data', None) or []

        if isinstance(found, list) and found:
            current_title = found[0].get("title")
        elif isinstance(found, dict):
            current_title = found.get("title")
        else:
            current_title = None

        if current_title:
            # Already titled; nothing to do.
            return

        new_title = _generate_session_title_from_text(candidate_title)
        update = supabase.table("chat_sessions").update({"title": new_title})
        update.eq("id", session_id).eq("user_id", user_id).execute()
    except Exception as err:
        # Non-fatal; just log
        print(f"Gagal mengatur judul sesi: {err}")
# edge-tts fallback (backup engine)
async def generate_audio_edge(text, path, voice="id-ID-GadisNeural"):
    """Synthesize *text* with edge-tts using *voice* and save the audio to *path*."""
    await edge_tts.Communicate(text, voice=voice).save(path)
# Main TTS function with fallback
def text_to_speech(text):
    """Return an autoplaying HTML ``<audio>`` snippet that speaks *text*.

    Audio is cached in ``cache_tts/<md5 of text>.mp3``. On a cache miss the
    text is synthesized with gTTS; if that raises, edge-tts is tried via
    ``generate_audio_edge``. Synthesis writes to a temporary file that is
    moved onto the cache path only after it succeeded, so a failed or
    interrupted run never leaves a partial file that later calls would
    treat as valid cached audio.

    Returns:
        The HTML snippet with the MP3 embedded as base64, or ``""`` when
        *text* is empty/blank, both engines fail (a Streamlit warning is
        shown), or the audio file cannot be read.
    """
    # Nothing to speak: skip both engines instead of failing through them.
    if not text or not text.strip():
        return ""

    cache_dir = "cache_tts"
    os.makedirs(cache_dir, exist_ok=True)
    filename = f"{md5(text.encode()).hexdigest()}.mp3"
    path = os.path.join(cache_dir, filename)

    if not os.path.exists(path):
        import tempfile  # local import: only needed on a cache miss

        fd, tmp_path = tempfile.mkstemp(suffix=".part", dir=cache_dir)
        os.close(fd)
        try:
            try:
                # Primary: gTTS
                tts = gtts.gTTS(text, lang="id")
                tts.save(tmp_path)
            except Exception as e:
                print(f"[gTTS gagal] {e}")
                try:
                    # Fallback: edge-tts
                    asyncio.run(generate_audio_edge(text, tmp_path))
                except Exception as e2:
                    print(f"[Edge-TTS juga gagal] {e2}")
                    st.warning("π Gagal membuat audio TTS.")
                    return ""
            # Publish the finished file onto the cache path in one step.
            os.replace(tmp_path, path)
        finally:
            # Still present only if synthesis failed; do not leave it behind.
            if os.path.exists(tmp_path):
                try:
                    os.remove(tmp_path)
                except OSError as cleanup_err:
                    print(f"[Error saat membersihkan audio sementara] {cleanup_err}")

    try:
        with open(path, "rb") as audio_file:
            audio_base64 = base64.b64encode(audio_file.read()).decode()
        return f"""
<audio autoplay>
<source src="data:audio/mp3;base64,{audio_base64}" type="audio/mp3">
</audio>
"""
    except Exception as e:
        print(f"[Error saat membaca audio] {e}")
        return ""
def conversation_chat(query, chain, history):
    """Run *query* through *chain* and record the exchange.

    When a user is present in the session, the user message is stored first,
    the session title is filled from *query* if it is still empty, and the
    assistant answer is stored after the chain returns. Persistence problems
    never interrupt the chat.

    Args:
        query: The user's question.
        chain: Callable invoked with ``{"question", "chat_history"}``; its
            result must provide an ``"answer"`` entry.
        history: List of ``(query, answer)`` tuples; the new pair is
            appended in place.

    Returns:
        The answer produced by *chain*.
    """
    user = _current_user()

    if user:
        # Store the user's message before calling the chain.
        try:
            save_message_to_supabase(
                user_id=user["id"],
                session_id=st.session_state.get('session_id'),
                role="user",
                content=query,
            )
        except Exception:
            pass
        # Try to auto-name the session from the first user message.
        try:
            update_chat_session_title_if_empty(
                user_id=user["id"],
                session_id=st.session_state.get('session_id'),
                candidate_title=query,
            )
        except Exception:
            pass

    response = chain({"question": query, "chat_history": history})
    reply = response["answer"]
    history.append((query, reply))

    if user:
        # Store the assistant's reply.
        try:
            save_message_to_supabase(
                user_id=user["id"],
                session_id=st.session_state.get('session_id'),
                role="assistant",
                content=reply,
            )
        except Exception:
            pass

    return reply
def display_chat_history(chain):
    """Render the chat UI: input widgets, TTS toggle, voice input and transcript.

    Flow on each script run:
      1. Draw the chat input, the TTS toggle button and the speech-to-text
         button.
      2. If text arrived (typed, or stashed from speech-to-text on the
         previous run), answer it via ``conversation_chat`` and append the
         exchange to the session state.
      3. If the in-memory history is empty and a user is present, try to
         load the stored history for the current session id.
      4. Render the transcript and, if pending, play the latest answer.

    Args:
        chain: Conversational chain passed through to ``conversation_chat``.
    """
    reply_container = st.container()
    user_input_obj = st.chat_input("Masukkan pertanyaan", key="chat_input_field")
    col2, col3 = st.columns([1, 1])

    # TTS on/off toggle button
    with col2:
        if st.button("π Text-to-Speech Aktif" if st.session_state['should_speak'] else "π Text-to-Speech Nonaktif",
                     key="toggle_tts",
                     help="Aktifkan/Nonaktifkan Text-to-Speech",
                     use_container_width=True):
            st.session_state['should_speak'] = not st.session_state['should_speak']
            st.rerun()

    # Voice input button
    with col3:
        stt_text = speech_to_text(
            start_prompt="π€ Input Suara",
            stop_prompt="π Stop",
            language='id',
            just_once=True,
            key='stt_input',
            use_container_width=True,
        )

    # Speech was recognized: stash it and rerun so it is handled as input
    if stt_text:
        st.session_state.input_text = stt_text
        st.rerun()

    # Typed input wins; otherwise use text stashed by speech-to-text
    user_input = user_input_obj or st.session_state.get("input_text", "")
    if user_input:
        with st.spinner('Sedang membuat jawaban...'):
            output = conversation_chat(user_input, chain, st.session_state['history'])
        st.session_state['past'].append(user_input)
        st.session_state['generated'].append(output)
        st.session_state.input_text = ""
        # Reset the flag so TTS plays the new answer
        if st.session_state['should_speak'] and output:
            st.session_state['tts_output'] = output
            st.session_state['tts_played'] = False

    # If user just logged in and no local history, try loading from DB
    if not st.session_state['history']:
        user = _current_user()
        if user:
            past, generated, history = load_history_from_supabase(user_id=user['id'], session_id=st.session_state.get('session_id'))
            if past or generated:
                st.session_state['past'] = past or st.session_state['past']
                st.session_state['generated'] = generated or st.session_state['generated']
                st.session_state['history'] = history or st.session_state['history']

    # Render the chat transcript
    if st.session_state['generated']:
        with reply_container:
            # Pair the lists with zip: stored history can hold a different
            # number of user and assistant messages, and indexing "past" by
            # the length of "generated" would raise IndexError in that case.
            pairs = zip(st.session_state["past"], st.session_state["generated"])
            for i, (user_msg, bot_msg) in enumerate(pairs):
                message(user_msg, is_user=True, key=str(i) + '_user', avatar_style="no-avatar")
                message(bot_msg, key=str(i), avatar_style="no-avatar")

    # TTS playback (once per answer)
    if st.session_state.get('tts_output') and not st.session_state.get('tts_played'):
        st.markdown(text_to_speech(st.session_state['tts_output']), unsafe_allow_html=True)
        st.session_state['tts_played'] = True