import streamlit as st import json import os from datetime import datetime, timedelta import subprocess from huggingface_hub import HfApi from pathlib import Path from calendar_rag import ( create_default_config, AcademicCalendarRAG, PipelineConfig ) # Custom CSS for enhanced styling def load_custom_css(): st.markdown(""" """, unsafe_allow_html=True) def initialize_pipeline(): """Initialize RAG pipeline with configurations""" try: openai_api_key = os.getenv('OPENAI_API_KEY') or st.secrets['OPENAI_API_KEY'] config = create_default_config(openai_api_key) config.localization.enable_thai_normalization = True config.retriever.top_k = 5 config.model.temperature = 0.3 pipeline = AcademicCalendarRAG(config) with open("calendar.json", "r", encoding="utf-8") as f: calendar_data = json.load(f) pipeline.load_data(calendar_data) return pipeline except Exception as e: st.error(f"Error initializing pipeline: {str(e)}") return None def load_qa_history(): """Load QA history from local JSON file""" try: history_file = Path("qa_history.json") if history_file.exists(): with open(history_file, "r", encoding="utf-8") as f: return json.load(f) return [] except Exception as e: st.error(f"Error loading QA history: {str(e)}") return [] def save_qa_history(history_entry): """Save QA history entry to local JSON file and push to GitHub""" try: history_file = Path("qa_history.json") # Initialize or load existing history if history_file.exists(): with open(history_file, "r", encoding="utf-8") as f: try: history_data = json.load(f) except json.JSONDecodeError: st.error("Error parsing existing JSON file") history_data = [] else: history_data = [] # Append new entry history_data.append(history_entry) # Save updated history locally with open("qa_history.json", "w", encoding="utf-8") as f: json.dump(history_data, f, ensure_ascii=False, indent=2) # st.info("Saved locally, attempting GitHub upload...") # Push to GitHub github_token = os.getenv('GITHUB_TOKEN') or st.secrets['GITHUB_TOKEN'] if not github_token: st.error("GitHub token not found!") return from github import Github g = Github(github_token) # Replace with your repository name repo = g.get_repo("jirasaksaimekJijo/swu-chat-bot-project") try: # Try to get the file first contents = repo.get_contents("qa_history.json") response = repo.update_file( path="qa_history.json", message="Update QA history", content=json.dumps(history_data, ensure_ascii=False, indent=2), sha=contents.sha ) # st.success(f"Successfully updated file on GitHub: {response.commit.html_url}") except Exception as e: # st.info(f"File doesn't exist yet, creating new one... Error was: {str(e)}") # File doesn't exist, create it response = repo.create_file( path="qa_history.json", message="Create QA history", content=json.dumps(history_data, ensure_ascii=False, indent=2) ) # st.success(f"Successfully created file on GitHub: {response['commit'].html_url}") except Exception as e: # st.error(f"Error saving QA history: {str(e)}") import traceback # st.error(f"Full error: {traceback.format_exc()}") def add_to_qa_history(query: str, answer: str): """Add new QA pair to history""" history_entry = { "timestamp": (datetime.now() + timedelta(hours=5)).isoformat(), "query": query, "answer": answer } save_qa_history(history_entry) return history_entry def add_to_history(role: str, message: str): """Add message to chat history and save if it's a complete QA pair""" st.session_state.chat_history.append((role, message)) # If this is an assistant response, save the QA pair if role == "assistant" and len(st.session_state.chat_history) >= 2: # Get the corresponding user query (previous message) user_query = st.session_state.chat_history[-2][1] add_to_qa_history(user_query, message) def display_chat_history(): """Display chat history with enhanced styling""" for i, (role, message) in enumerate(st.session_state.chat_history): if role == "user": st.markdown(f"""
""", unsafe_allow_html=True) else: st.markdown(f""" """, unsafe_allow_html=True) if 'context_memory' not in st.session_state: st.session_state.context_memory = [] def handle_submit(user_query: str): """Handle form submission logic""" if not user_query: st.warning("â ïļ āļāļĢāļļāļāļēāļĢāļ°āļāļļāļāļģāļāļēāļĄ") return user_query = user_query.strip() # Prevent duplicate submissions by checking last message if not st.session_state.chat_history or st.session_state.chat_history[-1][1] != user_query: try: st.session_state.processing_query = True # Add user message to chat history st.session_state.chat_history.append(("user", user_query)) # Maintain context memory if len(st.session_state.context_memory) > 5: st.session_state.context_memory.pop(0) # Build query with context query_with_context = "\n".join( [f"Q: {qa['query']}\nA: {qa['answer']}" for qa in st.session_state.context_memory] ) + f"\nQ: {user_query}" # Process query result = st.session_state.pipeline.process_query(query_with_context) # Create response dictionary with answer and documents response_dict = { "answer": result.get("answer", ""), "documents": result.get("documents", []) } # Update chat history and context st.session_state.chat_history.append(("assistant", response_dict)) st.session_state.context_memory.append({"query": user_query, "answer": response_dict}) # Save to QA history add_to_qa_history(user_query, response_dict) except Exception as e: st.session_state.chat_history.append(("assistant", f"â āđāļāļīāļāļāđāļāļāļīāļāļāļĨāļēāļ: {str(e)}")) st.error(f"Query processing error: {e}") finally: st.session_state.processing_query = False st.rerun() def create_chat_input(): """Create the chat input section with form handling""" # Column layout for the entire input section input_col, clear_col = st.columns([4, 1]) with input_col: # Create the form for chat input with st.form(key="chat_form", clear_on_submit=True): st.markdown(""" """, unsafe_allow_html=True) # Text input query = st.text_input( "", key="query_input", placeholder="āđāļāđāļ: āļ§āļąāļāļŠāļļāļāļāđāļēāļĒāļāļāļāļāļēāļĢāļŠāļāļāļāļēāļāđāļāļĨāđāļēāđāļāļ āļēāļāđāļĢāļĩāļĒāļāļāļĩāđ 1/2567 āļāļ·āļāļ§āļąāļāļāļĩāđāđāļāđāļēāđāļĢ?" ) # Submit button in form submitted = st.form_submit_button( "ðĪ āļŠāđāļāļāļģāļāļēāļĄ", type="primary", use_container_width=True ) if submitted: handle_submit(query) # Clear history button outside the form with clear_col: if st.button("ðïļ āļĨāđāļēāļāļāļĢāļ°āļ§āļąāļāļī", type="secondary", use_container_width=True): st.session_state.context_memory = [] st.session_state.chat_history = [] st.rerun() def main(): # Page config st.set_page_config( page_title="Academic Calendar Assistant", page_icon="ð ", layout="wide", initial_sidebar_state="collapsed" ) # Load custom CSS load_custom_css() # Initialize session states if 'pipeline' not in st.session_state: st.session_state.pipeline = None if 'chat_history' not in st.session_state: st.session_state.chat_history = [] if 'context_memory' not in st.session_state: st.session_state.context_memory = [] if 'processing_query' not in st.session_state: st.session_state.processing_query = False # Load QA history at startup if 'qa_history_loaded' not in st.session_state: st.session_state.qa_history_loaded = True load_qa_history() # Initialize pipeline if st.session_state.pipeline is None: with st.spinner("āļāļģāļĨāļąāļāđāļĢāļīāđāļĄāļāđāļāļĢāļ°āļāļ..."): st.session_state.pipeline = initialize_pipeline() # Header st.markdown("""āļāļąāļāļāļīāļāļ§āļīāļāļĒāļēāļĨāļąāļĒ āļĄāļŦāļēāļ§āļīāļāļĒāļēāļĨāļąāļĒāļĻāļĢāļĩāļāļāļĢāļīāļāļāļĢāļ§āļīāđāļĢāļ
āļĢāļ°āļāļāļāļĩāđāđāļāđāđāļāļāđāļāđāļĨāļĒāļĩ RAG (Retrieval-Augmented Generation) āđāļāļāļēāļĢāļāđāļāļŦāļēāđāļĨāļ°āļāļāļāļāļģāļāļēāļĄāđāļāļĩāđāļĒāļ§āļāļąāļāļāļāļīāļāļīāļāļāļēāļĢāļĻāļķāļāļĐāļē
â° āđāļ§āļĨāļēāļāļąāļāļāļļāļāļąāļ:
{}
ðĄ āļŠāļāļēāļāļ°āļĢāļ°āļāļ:
{} {}