import streamlit as st import json import os from datetime import datetime, timedelta import subprocess from huggingface_hub import HfApi from pathlib import Path from calendar_rag import ( create_default_config, AcademicCalendarRAG, PipelineConfig ) # Custom CSS for enhanced styling def load_custom_css(): st.markdown(""" """, unsafe_allow_html=True) def initialize_pipeline(): """Initialize RAG pipeline with configurations""" try: openai_api_key = os.getenv('OPENAI_API_KEY') or st.secrets['OPENAI_API_KEY'] config = create_default_config(openai_api_key) config.localization.enable_thai_normalization = True config.retriever.top_k = 5 config.model.temperature = 0.3 pipeline = AcademicCalendarRAG(config) with open("calendar.json", "r", encoding="utf-8") as f: calendar_data = json.load(f) pipeline.load_data(calendar_data) return pipeline except Exception as e: st.error(f"Error initializing pipeline: {str(e)}") return None def load_qa_history(): """Load QA history from local JSON file""" try: history_file = Path("qa_history.json") if history_file.exists(): with open(history_file, "r", encoding="utf-8") as f: return json.load(f) return [] except Exception as e: st.error(f"Error loading QA history: {str(e)}") return [] def save_qa_history(history_entry): """Save QA history entry to local JSON file and push to GitHub""" try: history_file = Path("qa_history.json") # Initialize or load existing history if history_file.exists(): try: with open(history_file, "r", encoding="utf-8") as f: file_content = f.read() if not file_content.strip(): st.warning("JSON file is empty, initializing new history") history_data = [] else: try: history_data = json.loads(file_content) if not isinstance(history_data, list): st.error("JSON file does not contain a list, resetting history") history_data = [] except json.JSONDecodeError as json_err: st.error(f"JSON parsing error: {str(json_err)}") # Try to salvage valid JSON if possible try: # Remove any trailing commas file_content = file_content.replace(",]", "]").replace(",}", "}") history_data = json.loads(file_content) except: st.error("Could not salvage JSON, initializing new history") history_data = [] except Exception as file_err: st.error(f"File reading error: {str(file_err)}") history_data = [] else: history_data = [] # Append new entry history_data.append(history_entry) # Validate history data before saving if not isinstance(history_data, list): st.error("Invalid history data format, must be a list") history_data = [] # Process and validate each entry processed_history = [] for entry in history_data: if isinstance(entry, dict) and all(key in entry for key in ["timestamp", "query", "answer"]): # Process answer if it's a Document or dict if isinstance(entry["answer"], dict): entry["answer"] = entry["answer"].get('answer', str(entry["answer"])) elif hasattr(entry["answer"], 'content'): entry["answer"] = entry["answer"].content else: entry["answer"] = str(entry["answer"]) processed_history.append(entry) history_data = processed_history # Save updated history locally try: json_content = json.dumps(history_data, ensure_ascii=False, indent=2) with open("qa_history.json", "w", encoding="utf-8") as f: f.write(json_content) except Exception as save_err: st.error(f"Error saving history locally: {str(save_err)}") return # Push to GitHub with error logging github_token = os.getenv('GITHUB_TOKEN') or st.secrets.get('GITHUB_TOKEN') if not github_token: st.error("GitHub token not found in environment or secrets!") return try: from github import Github g = Github(github_token) repo = g.get_repo("jirasaksaimekJijo/swu-chat-bot-project") try: # Try to get the file first contents = repo.get_contents("qa_history.json") # Ensure content is properly encoded content = json.dumps(history_data, ensure_ascii=False, indent=2) response = repo.update_file( path="qa_history.json", message="Update QA history", content=content, sha=contents.sha, branch="main" # Explicitly specify branch ) except Exception as file_error: # File doesn't exist, create it content = json.dumps(history_data, ensure_ascii=False, indent=2) response = repo.create_file( path="qa_history.json", message="Create QA history", content=content, branch="main" # Explicitly specify branch ) st.success("Successfully created qa_history.json on GitHub") except Exception as github_error: st.error(f"GitHub API error: {str(github_error)}") import traceback st.error(f"Full GitHub error trace: {traceback.format_exc()}") except Exception as e: st.error(f"General error in save_qa_history: {str(e)}") import traceback st.error(f"Full error trace: {traceback.format_exc()}") def add_to_qa_history(query: str, answer: str): """Add new QA pair to history with validation""" try: # Validate inputs if not query or not answer: st.warning("Empty query or answer detected, skipping history update") return None # Handle different answer types if isinstance(answer, dict): # If answer is a dict with 'answer' key, extract it processed_answer = answer.get('answer', str(answer)) elif hasattr(answer, 'content'): # If answer is a Document-like object with content attribute processed_answer = answer.content else: # Convert answer to string for any other type processed_answer = str(answer) # Create history entry with proper timestamp history_entry = { "timestamp": (datetime.now() + timedelta(hours=5)).strftime("%Y-%m-%dT%H:%M:%S"), "query": query, "answer": processed_answer } # Save entry save_qa_history(history_entry) return history_entry except Exception as e: st.error(f"Error in add_to_qa_history: {str(e)}") return None def add_to_history(role: str, message: str): """Add message to chat history and save if it's a complete QA pair""" st.session_state.chat_history.append((role, message)) # If this is an assistant response, save the QA pair if role == "assistant" and len(st.session_state.chat_history) >= 2: # Get the corresponding user query (previous message) user_query = st.session_state.chat_history[-2][1] add_to_qa_history(user_query, message) def display_chat_history(): """Display chat history with enhanced styling""" for i, (role, message) in enumerate(st.session_state.chat_history): if role == "user": st.markdown(f"""
""", unsafe_allow_html=True) else: st.markdown(f""" """, unsafe_allow_html=True) if 'context_memory' not in st.session_state: st.session_state.context_memory = [] def handle_submit(user_query: str): """Handle form submission logic""" if not user_query: st.warning("â ïļ āļāļĢāļļāļāļēāļĢāļ°āļāļļāļāļģāļāļēāļĄ") return user_query = user_query.strip() # Prevent duplicate submissions by checking last message if not st.session_state.chat_history or st.session_state.chat_history[-1][1] != user_query: try: st.session_state.processing_query = True # Add user message to chat history st.session_state.chat_history.append(("user", user_query)) # Maintain context memory if len(st.session_state.context_memory) > 5: st.session_state.context_memory.pop(0) # Build query with context query_with_context = "\n".join( [f"Q: {qa['query']}\nA: {qa['answer']}" for qa in st.session_state.context_memory] ) + f"\nQ: {user_query}" # Process query result = st.session_state.pipeline.process_query(query_with_context) # Create response dictionary with answer and documents response_dict = { "answer": result.get("answer", ""), "documents": result.get("documents", []) } # Update chat history and context st.session_state.chat_history.append(("assistant", response_dict)) st.session_state.context_memory.append({"query": user_query, "answer": response_dict}) # Save to QA history add_to_qa_history(user_query, response_dict) except Exception as e: st.session_state.chat_history.append(("assistant", f"â āđāļāļīāļāļāđāļāļāļīāļāļāļĨāļēāļ: {str(e)}")) st.error(f"Query processing error: {e}") finally: st.session_state.processing_query = False st.rerun() def create_chat_input(): """Create the chat input section with form handling""" # Create the form for chat input with st.form(key="chat_form", clear_on_submit=True): st.markdown(""" """, unsafe_allow_html=True) # Text input query = st.text_input( "", key="query_input", placeholder="āđāļāđāļ: āļ§āļąāļāļŠāļļāļāļāđāļēāļĒāļāļāļāļāļēāļĢāļŠāļāļāļāļēāļāđāļāļĨāđāļēāđāļāļ āļēāļāđāļĢāļĩāļĒāļāļāļĩāđ 1/2567 āļāļ·āļāļ§āļąāļāļāļĩāđāđāļāđāļēāđāļĢ?" ) # Create two columns for buttons with a 7:3 ratio col1, col2 = st.columns([7, 3]) with col1: # Submit button in form submitted = st.form_submit_button( "ðĪ āļŠāđāļāļāļģāļāļēāļĄ", type="primary", use_container_width=True ) with col2: # Clear history button inside the form clear_button = st.form_submit_button( "ðïļ āļĨāđāļēāļāļāļĢāļ°āļ§āļąāļāļī", type="secondary", use_container_width=True ) if submitted: handle_submit(query) if clear_button: st.session_state.context_memory = [] st.session_state.chat_history = [] st.rerun() def main(): # Page config st.set_page_config( page_title="Academic Calendar Assistant", page_icon="ð ", layout="wide", initial_sidebar_state="collapsed" ) # Load custom CSS load_custom_css() # Initialize session states if 'pipeline' not in st.session_state: st.session_state.pipeline = None if 'chat_history' not in st.session_state: st.session_state.chat_history = [] if 'context_memory' not in st.session_state: st.session_state.context_memory = [] if 'processing_query' not in st.session_state: st.session_state.processing_query = False # Load QA history at startup if 'qa_history_loaded' not in st.session_state: st.session_state.qa_history_loaded = True load_qa_history() # Initialize pipeline if st.session_state.pipeline is None: with st.spinner("āļāļģāļĨāļąāļāđāļĢāļīāđāļĄāļāđāļāļĢāļ°āļāļ..."): st.session_state.pipeline = initialize_pipeline() # Header st.markdown("""āļāļąāļāļāļīāļāļ§āļīāļāļĒāļēāļĨāļąāļĒ āļĄāļŦāļēāļ§āļīāļāļĒāļēāļĨāļąāļĒāļĻāļĢāļĩāļāļāļĢāļīāļāļāļĢāļ§āļīāđāļĢāļ
āļĢāļ°āļāļāļāļĩāđāđāļāđāđāļāļāđāļāđāļĨāļĒāļĩ RAG (Retrieval-Augmented Generation) āđāļāļāļēāļĢāļāđāļāļŦāļēāđāļĨāļ°āļāļāļāļāļģāļāļēāļĄāđāļāļĩāđāļĒāļ§āļāļąāļāļāļāļīāļāļīāļāļāļēāļĢāļĻāļķāļāļĐāļē
â° āđāļ§āļĨāļēāļāļąāļāļāļļāļāļąāļ:
{}
ðĄ āļŠāļāļēāļāļ°āļĢāļ°āļāļ:
{} {}