|
import streamlit as st |
|
import json |
|
import os |
|
from datetime import datetime, timedelta |
|
import subprocess |
|
from huggingface_hub import HfApi |
|
from pathlib import Path |
|
from calendar_rag import ( |
|
create_default_config, |
|
AcademicCalendarRAG, |
|
PipelineConfig |
|
) |
|
|
|
def load_custom_css(): |
|
st.markdown(""" |
|
<style> |
|
/* General body styling */ |
|
body { |
|
font-family: "Arial", sans-serif !important; |
|
color: #000000 !important; |
|
background-color: white !important; |
|
line-height: 1.7 !important; |
|
} |
|
|
|
/* Main container styling */ |
|
.main { |
|
padding: 2rem; |
|
color: #000000; |
|
background-color: white; |
|
} |
|
|
|
/* Headers styling */ |
|
h1 { |
|
color: #000000; |
|
font-size: 2.8rem !important; |
|
font-weight: 700 !important; |
|
margin-bottom: 1.5rem !important; |
|
text-align: center; |
|
padding: 1rem 0; |
|
border-bottom: 3px solid #1E3A8A; |
|
} |
|
|
|
h3, h4 { |
|
color: #000000; |
|
font-weight: 600 !important; |
|
font-size: 1.6rem !important; |
|
margin-top: 1.5rem !important; |
|
} |
|
|
|
/* Chat message styling */ |
|
.chat-message { |
|
padding: 1.5rem; |
|
border-radius: 10px; |
|
margin: 1rem 0; |
|
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1); |
|
font-size: 1.1rem !important; |
|
line-height: 1.6 !important; |
|
font-family: "Arial", sans-serif !important; |
|
color: #000000 !important; |
|
} |
|
|
|
.user-message { |
|
background-color: #F3F4F6 !important; |
|
} |
|
|
|
.assistant-message { |
|
background-color: #EFF6FF !important; |
|
} |
|
|
|
/* Status indicators */ |
|
.status-indicator { |
|
padding: 0.5rem 1rem; |
|
border-radius: 6px; |
|
font-weight: 500; |
|
font-size: 1.2rem; |
|
color: #000000; |
|
} |
|
|
|
.status-online { |
|
background-color: #DEF7EC; |
|
color: #03543F; |
|
} |
|
|
|
.status-offline { |
|
background-color: #FDE8E8; |
|
color: rgb(255, 255, 255); |
|
} |
|
</style> |
|
""", unsafe_allow_html=True) |
|
|
|
def initialize_pipeline(): |
|
"""Initialize RAG pipeline with configurations""" |
|
try: |
|
openai_api_key = os.getenv('OPENAI_API_KEY') or st.secrets['OPENAI_API_KEY'] |
|
config = create_default_config(openai_api_key) |
|
config.localization.enable_thai_normalization = True |
|
config.retriever.top_k = 5 |
|
config.model.temperature = 0.3 |
|
pipeline = AcademicCalendarRAG(config) |
|
|
|
with open("calendar.json", "r", encoding="utf-8") as f: |
|
calendar_data = json.load(f) |
|
pipeline.load_data(calendar_data) |
|
|
|
return pipeline |
|
|
|
except Exception as e: |
|
st.error(f"Error initializing pipeline: {str(e)}") |
|
return None |
|
|
|
def load_qa_history(): |
|
"""Load QA history from local JSON file""" |
|
try: |
|
history_file = Path("qa_history.json") |
|
if history_file.exists(): |
|
with open(history_file, "r", encoding="utf-8") as f: |
|
return json.load(f) |
|
return [] |
|
except Exception as e: |
|
st.error(f"Error loading QA history: {str(e)}") |
|
return [] |
|
|
|
def save_qa_history(history_entry): |
|
"""Save QA history entry to local JSON file and push to GitHub""" |
|
try: |
|
history_file = Path("qa_history.json") |
|
|
|
|
|
if history_file.exists(): |
|
try: |
|
with open(history_file, "r", encoding="utf-8") as f: |
|
file_content = f.read() |
|
|
|
|
|
if not file_content.strip(): |
|
st.warning("JSON file is empty, initializing new history") |
|
history_data = [] |
|
else: |
|
try: |
|
history_data = json.loads(file_content) |
|
if not isinstance(history_data, list): |
|
st.error("JSON file does not contain a list, resetting history") |
|
history_data = [] |
|
except json.JSONDecodeError as json_err: |
|
st.error(f"JSON parsing error: {str(json_err)}") |
|
|
|
try: |
|
|
|
file_content = file_content.replace(",]", "]").replace(",}", "}") |
|
history_data = json.loads(file_content) |
|
except: |
|
st.error("Could not salvage JSON, initializing new history") |
|
history_data = [] |
|
except Exception as file_err: |
|
st.error(f"File reading error: {str(file_err)}") |
|
history_data = [] |
|
else: |
|
history_data = [] |
|
|
|
|
|
history_data.append(history_entry) |
|
|
|
|
|
if not isinstance(history_data, list): |
|
st.error("Invalid history data format, must be a list") |
|
history_data = [] |
|
|
|
|
|
processed_history = [] |
|
for entry in history_data: |
|
if isinstance(entry, dict) and all(key in entry for key in ["timestamp", "query", "answer"]): |
|
|
|
if isinstance(entry["answer"], dict): |
|
entry["answer"] = entry["answer"].get('answer', str(entry["answer"])) |
|
elif hasattr(entry["answer"], 'content'): |
|
entry["answer"] = entry["answer"].content |
|
else: |
|
entry["answer"] = str(entry["answer"]) |
|
processed_history.append(entry) |
|
|
|
history_data = processed_history |
|
|
|
|
|
try: |
|
json_content = json.dumps(history_data, ensure_ascii=False, indent=2) |
|
with open("qa_history.json", "w", encoding="utf-8") as f: |
|
f.write(json_content) |
|
except Exception as save_err: |
|
st.error(f"Error saving history locally: {str(save_err)}") |
|
return |
|
|
|
|
|
github_token = os.getenv('GITHUB_TOKEN') or st.secrets.get('GITHUB_TOKEN') |
|
if not github_token: |
|
st.error("GitHub token not found in environment or secrets!") |
|
return |
|
|
|
try: |
|
from github import Github |
|
g = Github(github_token) |
|
|
|
repo = g.get_repo("jirasaksaimekJijo/swu-chat-bot-project") |
|
|
|
try: |
|
|
|
contents = repo.get_contents("qa_history.json") |
|
|
|
|
|
content = json.dumps(history_data, ensure_ascii=False, indent=2) |
|
|
|
response = repo.update_file( |
|
path="qa_history.json", |
|
message="Update QA history", |
|
content=content, |
|
sha=contents.sha, |
|
branch="main" |
|
) |
|
|
|
except Exception as file_error: |
|
|
|
content = json.dumps(history_data, ensure_ascii=False, indent=2) |
|
response = repo.create_file( |
|
path="qa_history.json", |
|
message="Create QA history", |
|
content=content, |
|
branch="main" |
|
) |
|
st.success("Successfully created qa_history.json on GitHub") |
|
|
|
except Exception as github_error: |
|
st.error(f"GitHub API error: {str(github_error)}") |
|
import traceback |
|
st.error(f"Full GitHub error trace: {traceback.format_exc()}") |
|
|
|
except Exception as e: |
|
st.error(f"General error in save_qa_history: {str(e)}") |
|
import traceback |
|
st.error(f"Full error trace: {traceback.format_exc()}") |
|
|
|
def add_to_qa_history(query: str, answer: str): |
|
"""Add new QA pair to history with validation""" |
|
try: |
|
|
|
if not query or not answer: |
|
st.warning("Empty query or answer detected, skipping history update") |
|
return None |
|
|
|
|
|
if isinstance(answer, dict): |
|
|
|
processed_answer = answer.get('answer', str(answer)) |
|
elif hasattr(answer, 'content'): |
|
|
|
processed_answer = answer.content |
|
else: |
|
|
|
processed_answer = str(answer) |
|
|
|
|
|
history_entry = { |
|
"timestamp": (datetime.now() + timedelta(hours=5)).strftime("%Y-%m-%dT%H:%M:%S"), |
|
"query": query, |
|
"answer": processed_answer |
|
} |
|
|
|
|
|
save_qa_history(history_entry) |
|
return history_entry |
|
|
|
except Exception as e: |
|
st.error(f"Error in add_to_qa_history: {str(e)}") |
|
return None |
|
|
|
def add_to_history(role: str, message: str): |
|
"""Add message to chat history and save if it's a complete QA pair""" |
|
st.session_state.chat_history.append((role, message)) |
|
|
|
|
|
if role == "assistant" and len(st.session_state.chat_history) >= 2: |
|
|
|
user_query = st.session_state.chat_history[-2][1] |
|
add_to_qa_history(user_query, message) |
|
|
|
def display_chat_history(): |
|
"""Display chat history with enhanced styling""" |
|
for i, (role, message) in enumerate(st.session_state.chat_history): |
|
if role == "user": |
|
st.markdown(f""" |
|
<div class="chat-message user-message"> |
|
<strong>🧑 คำถาม:</strong><br> |
|
{message} |
|
</div> |
|
""", unsafe_allow_html=True) |
|
else: |
|
st.markdown(f""" |
|
<div class="chat-message assistant-message"> |
|
<strong>🤖 คำตอบ:</strong><br> |
|
{message} |
|
</div> |
|
""", unsafe_allow_html=True) |
|
|
|
if 'context_memory' not in st.session_state: |
|
st.session_state.context_memory = [] |
|
|
|
def handle_submit(user_query: str): |
|
"""Handle form submission logic""" |
|
if not user_query: |
|
st.warning("⚠️ กรุณาระบุคำถาม") |
|
return |
|
|
|
user_query = user_query.strip() |
|
|
|
|
|
if not st.session_state.chat_history or st.session_state.chat_history[-1][1] != user_query: |
|
try: |
|
st.session_state.processing_query = True |
|
|
|
st.session_state.chat_history.append(("user", user_query)) |
|
|
|
|
|
with st.spinner("🔍 กำลังค้นหาคำตอบ..."): |
|
|
|
|
|
st.session_state.chat_history.append(("user", user_query)) |
|
|
|
|
|
if len(st.session_state.context_memory) > 5: |
|
st.session_state.context_memory.pop(0) |
|
|
|
|
|
query_with_context = "\n".join( |
|
[f"Q: {qa['query']}\nA: {qa['answer']}" for qa in st.session_state.context_memory] |
|
) + f"\nQ: {user_query}" |
|
|
|
|
|
result = st.session_state.pipeline.process_query(query_with_context) |
|
|
|
|
|
response_dict = { |
|
"answer": result.get("answer", ""), |
|
"documents": result.get("documents", []) |
|
} |
|
|
|
|
|
st.session_state.chat_history.append(("assistant", response_dict)) |
|
st.session_state.context_memory.append({"query": user_query, "answer": response_dict}) |
|
|
|
|
|
add_to_qa_history(user_query, response_dict) |
|
|
|
except Exception as e: |
|
st.session_state.chat_history.append(("assistant", f"❌ เกิดข้อผิดพลาด: {str(e)}")) |
|
st.error(f"Query processing error: {e}") |
|
|
|
finally: |
|
st.session_state.processing_query = False |
|
st.rerun() |
|
|
|
def create_chat_input(): |
|
"""Create the chat input section with form handling""" |
|
|
|
with st.form(key="chat_form", clear_on_submit=True): |
|
st.markdown(""" |
|
<label for="query_input" style="font-size: 1.2rem; font-weight: 600; margin-bottom: 1rem; display: block;"> |
|
<span style="color: #ffffff; border-left: 4px solid #ffffff; padding-left: 0.8rem;"> |
|
โปรดระบุคำถามเกี่ยวกับปฏิทินการศึกษา: |
|
</span> |
|
</label> |
|
""", unsafe_allow_html=True) |
|
|
|
|
|
query = st.text_input( |
|
"", |
|
key="query_input", |
|
placeholder="เช่น: วันสุดท้ายของการสอบปากเปล่าในภาคเรียนที่ 1/2567 คือวันที่เท่าไร?" |
|
) |
|
|
|
|
|
col1, col2 = st.columns([7, 3]) |
|
|
|
with col1: |
|
|
|
submitted = st.form_submit_button( |
|
"📤 ส่งคำถาม", |
|
type="primary", |
|
use_container_width=True |
|
) |
|
|
|
with col2: |
|
|
|
clear_button = st.form_submit_button( |
|
"🗑️ ล้างประวัติ", |
|
type="secondary", |
|
use_container_width=True |
|
) |
|
|
|
if submitted: |
|
handle_submit(query) |
|
|
|
if clear_button: |
|
st.session_state.context_memory = [] |
|
st.session_state.chat_history = [] |
|
st.rerun() |
|
|
|
def main(): |
|
|
|
st.set_page_config( |
|
page_title="Academic Calendar Assistant", |
|
page_icon="📅", |
|
layout="wide", |
|
initial_sidebar_state="collapsed" |
|
) |
|
|
|
|
|
load_custom_css() |
|
|
|
|
|
if 'pipeline' not in st.session_state: |
|
st.session_state.pipeline = None |
|
|
|
if 'chat_history' not in st.session_state: |
|
st.session_state.chat_history = [] |
|
|
|
if 'context_memory' not in st.session_state: |
|
st.session_state.context_memory = [] |
|
|
|
if 'processing_query' not in st.session_state: |
|
st.session_state.processing_query = False |
|
|
|
|
|
if 'qa_history_loaded' not in st.session_state: |
|
st.session_state.qa_history_loaded = True |
|
load_qa_history() |
|
|
|
|
|
if st.session_state.pipeline is None: |
|
with st.spinner("กำลังเริ่มต้นระบบ..."): |
|
st.session_state.pipeline = initialize_pipeline() |
|
|
|
|
|
st.markdown(""" |
|
<div style="text-align: center; padding: 2rem 0;"> |
|
<h1>🎓 ระบบค้นหาข้อมูลปฏิทินการศึกษา</h1> |
|
<p style="font-size: 1.2rem; color: #666;">บัณฑิตวิทยาลัย มหาวิทยาลัยศรีนครินทรวิโรฒ</p> |
|
</div> |
|
""", unsafe_allow_html=True) |
|
|
|
chat_col, info_col = st.columns([7, 3]) |
|
|
|
with chat_col: |
|
|
|
for i, (role, content) in enumerate(st.session_state.chat_history): |
|
if role == "user": |
|
st.markdown(f""" |
|
<div class="chat-message user-message"> |
|
<strong>🧑 คำถาม:</strong><br> |
|
{content} |
|
</div> |
|
""", unsafe_allow_html=True) |
|
else: |
|
if isinstance(content, dict): |
|
assistant_response = content.get('answer', '❌ ไม่มีข้อมูลคำตอบ') |
|
else: |
|
assistant_response = content |
|
|
|
st.markdown(f""" |
|
<div class="chat-message assistant-message"> |
|
<strong>🤖 คำตอบ:</strong><br> |
|
{assistant_response} |
|
</div> |
|
""", unsafe_allow_html=True) |
|
|
|
if isinstance(content, dict) and content.get('documents'): |
|
with st.expander("📚 แสดงข้อมูลอ้างอิง", expanded=False): |
|
for i, doc in enumerate(content['documents'], 1): |
|
st.markdown(f""" |
|
<div style="padding: 1rem; background-color: #000000; border-radius: 8px; margin: 0.5rem 0;"> |
|
<strong>เอกสารที่ {i}:</strong><br> |
|
{doc.content} |
|
</div> |
|
""", unsafe_allow_html=True) |
|
|
|
|
|
create_chat_input() |
|
|
|
|
|
with info_col: |
|
st.markdown(""" |
|
<div style="background-color: #F9FAFB; padding: 1.5rem; border-radius: 12px; margin-bottom: 2rem;"> |
|
<h3 style="color: #1E3A8A;">ℹ️ เกี่ยวกับระบบ</h3> |
|
<p style="color: #000000;"> |
|
ระบบนี้ใช้เทคโนโลยี <strong>RAG (Retrieval-Augmented Generation)</strong> |
|
ในการค้นหาและตอบคำถามเกี่ยวกับปฏิทินการศึกษา |
|
</p> |
|
<h4 style="color: #1E3A8A; margin-top: 1rem;">สามารถสอบถามข้อมูลเกี่ยวกับ:</h4> |
|
<ul style="list-style-type: none; padding-left: 0;"> |
|
<li style="color: #000000; margin-bottom: 0.5rem;">📅 กำหนดการต่างๆ ในปฏิทินการศึกษา</li> |
|
<li style="color: #000000; margin-bottom: 0.5rem;">🎯 วันสำคัญและกิจกรรม</li> |
|
<li style="color: #000000; margin-bottom: 0.5rem;">📝 การลงทะเบียนเรียน</li> |
|
<li style="color: #000000; margin-bottom: 0.5rem;">📚 กำหนดการสอบ</li> |
|
<li style="color: #000000; margin-bottom: 0.5rem;">🏖️ วันหยุดการศึกษา</li> |
|
</ul> |
|
</div> |
|
""", unsafe_allow_html=True) |
|
|
|
st.markdown(""" |
|
<div style="background-color: #f9fafb; padding: 1.5rem; border-radius: 12px;"> |
|
<h3 style="color: #1E3A8A;">🔄 สถานะระบบ</h3> |
|
<div style="margin-top: 1rem;"> |
|
<p><strong style="color: #000000;">⏰ เวลาปัจจุบัน:</strong><br> |
|
<span style="color: #000000;">{}</span></p> |
|
<p><strong style="color: #000000;">📡 สถานะระบบ:</strong><br> |
|
<span class="status-indicator {}"> |
|
{} {} |
|
</span></p> |
|
</div> |
|
</div> |
|
""".format( |
|
(datetime.now() + timedelta(hours=5)).strftime('%Y-%m-%d %H:%M:%S'), |
|
"status-online" if st.session_state.pipeline else "status-offline", |
|
"🟢" if st.session_state.pipeline else "🔴", |
|
"พร้อมใช้งาน" if st.session_state.pipeline else "ไม่พร้อมใช้งาน" |
|
), unsafe_allow_html=True) |
|
|
|
if __name__ == "__main__": |
|
main() |