import streamlit as st
import json
import os
from datetime import datetime, timedelta
import subprocess
from huggingface_hub import HfApi
from pathlib import Path
from calendar_rag import (
    create_default_config,
    AcademicCalendarRAG,
    PipelineConfig
)


# Custom CSS for enhanced styling
def load_custom_css():
    """Inject the app's custom CSS into the page.

    NOTE(review): the stylesheet payload appears to have been stripped from
    this copy of the file (the markdown body is empty) — restore the original
    <style> block if custom styling is expected.
    """
    st.markdown("""
    """, unsafe_allow_html=True)


def initialize_pipeline():
    """Initialize the RAG pipeline with configuration and calendar data.

    Returns:
        A configured ``AcademicCalendarRAG`` instance, or ``None`` when
        initialization fails (the error is shown in the UI instead of raised).
    """
    try:
        # API key: environment variable first, then Streamlit secrets.
        openai_api_key = os.getenv('OPENAI_API_KEY') or st.secrets['OPENAI_API_KEY']
        config = create_default_config(openai_api_key)
        config.localization.enable_thai_normalization = True
        config.retriever.top_k = 5
        config.model.temperature = 0.3
        pipeline = AcademicCalendarRAG(config)
        with open("calendar.json", "r", encoding="utf-8") as f:
            calendar_data = json.load(f)
        pipeline.load_data(calendar_data)
        return pipeline
    except Exception as e:
        st.error(f"Error initializing pipeline: {str(e)}")
        return None


def load_qa_history():
    """Load QA history from the local JSON file.

    Returns:
        The parsed history list, or ``[]`` when the file is missing or
        unreadable (the error is surfaced via ``st.error``).
    """
    try:
        history_file = Path("qa_history.json")
        if history_file.exists():
            with open(history_file, "r", encoding="utf-8") as f:
                return json.load(f)
        return []
    except Exception as e:
        st.error(f"Error loading QA history: {str(e)}")
        return []


def save_qa_history(history_entry):
    """Append one QA entry to qa_history.json and mirror the file to GitHub.

    Persistence is best-effort: failures are reported or logged without
    interrupting the chat flow.

    Args:
        history_entry: JSON-serializable dict with timestamp/query/answer.
    """
    try:
        history_file = Path("qa_history.json")

        # Initialize or load existing history.
        if history_file.exists():
            with open(history_file, "r", encoding="utf-8") as f:
                try:
                    history_data = json.load(f)
                except json.JSONDecodeError:
                    st.error("Error parsing existing JSON file")
                    history_data = []
        else:
            history_data = []

        # Append the new entry and save the updated history locally.
        history_data.append(history_entry)
        with open("qa_history.json", "w", encoding="utf-8") as f:
            json.dump(history_data, f, ensure_ascii=False, indent=2)

        # Push to GitHub.
        github_token = os.getenv('GITHUB_TOKEN') or st.secrets['GITHUB_TOKEN']
        if not github_token:
            st.error("GitHub token not found!")
            return

        # PyGithub is imported lazily so local saves still work without it.
        from github import Github
        g = Github(github_token)
        repo = g.get_repo("jirasaksaimekJijo/swu-chat-bot-project")

        serialized = json.dumps(history_data, ensure_ascii=False, indent=2)
        try:
            # Update the file when it already exists on the remote.
            contents = repo.get_contents("qa_history.json")
            repo.update_file(
                path="qa_history.json",
                message="Update QA history",
                content=serialized,
                sha=contents.sha
            )
        except Exception:
            # File doesn't exist yet on the remote — create it.
            repo.create_file(
                path="qa_history.json",
                message="Create QA history",
                content=serialized
            )
    except Exception:
        # FIX: the original swallowed every error silently (it only imported
        # traceback and did nothing with it). Keep the best-effort contract
        # but at least log the failure to stderr for diagnosis.
        import traceback
        traceback.print_exc()


def add_to_qa_history(query: str, answer: str):
    """Build a timestamped QA entry, persist it, and return it.

    NOTE(review): the +5h shift on ``datetime.now()`` looks like a hand-rolled
    timezone correction for the hosting server; Thailand is UTC+7, so confirm
    the intended offset — a timezone-aware datetime would be safer.
    """
    history_entry = {
        "timestamp": (datetime.now() + timedelta(hours=5)).isoformat(),
        "query": query,
        "answer": answer
    }
    save_qa_history(history_entry)
    return history_entry


def add_to_history(role: str, message: str):
    """Add a message to chat history; persist complete QA pairs.

    NOTE(review): currently unused — ``handle_submit``/``main`` manage the
    chat history directly. Kept for backward compatibility.
    """
    st.session_state.chat_history.append((role, message))
    # An assistant response completes a QA pair with the preceding user query.
    if role == "assistant" and len(st.session_state.chat_history) >= 2:
        user_query = st.session_state.chat_history[-2][1]
        add_to_qa_history(user_query, message)


def display_chat_history():
    """Render the chat history.

    NOTE(review): currently unused — ``main`` renders the history inline with
    dict-aware handling; this helper predates that and shows raw messages.
    """
    for role, message in st.session_state.chat_history:
        if role == "user":
            st.markdown(f"""
🧑 āļ„āļģāļ–āļēāļĄ:
{message}
""", unsafe_allow_html=True)
        else:
            st.markdown(f"""
ðŸĪ– āļ„āļģāļ•āļ­āļš:
{message}
""", unsafe_allow_html=True)


# Module-level guard so context memory exists even before main() runs.
if 'context_memory' not in st.session_state:
    st.session_state.context_memory = []


def handle_submit(user_query: str):
    """Handle a chat-form submission: query the pipeline and record results."""
    if not user_query:
        st.warning("⚠ïļ āļāļĢāļļāļ“āļēāļĢāļ°āļšāļļāļ„āļģāļ–āļēāļĄ")
        return

    user_query = user_query.strip()

    # Prevent duplicate submissions by checking the last recorded message.
    if not st.session_state.chat_history or st.session_state.chat_history[-1][1] != user_query:
        try:
            st.session_state.processing_query = True

            # Add the user message to chat history.
            st.session_state.chat_history.append(("user", user_query))

            # Keep at most the 5 most recent QA pairs as context.
            if len(st.session_state.context_memory) > 5:
                st.session_state.context_memory.pop(0)

            # Build the query with prior Q/A turns as conversational context.
            query_with_context = "\n".join(
                [f"Q: {qa['query']}\nA: {qa['answer']}" for qa in st.session_state.context_memory]
            ) + f"\nQ: {user_query}"

            # Process the query through the RAG pipeline.
            result = st.session_state.pipeline.process_query(query_with_context)

            # Response dict keeps the answer plus retrieved source documents
            # so the UI can render a references expander.
            response_dict = {
                "answer": result.get("answer", ""),
                "documents": result.get("documents", [])
            }

            # Update chat history with the full response dict (UI needs the
            # documents), but keep only the plain answer text in context
            # memory and persisted history.
            st.session_state.chat_history.append(("assistant", response_dict))

            # FIX: the original stored the whole response dict here, which
            # (a) leaked a dict repr into the next LLM prompt and (b) made
            # json.dump fail on non-serializable Document objects, silently
            # breaking QA-history persistence.
            st.session_state.context_memory.append(
                {"query": user_query, "answer": response_dict["answer"]}
            )
            add_to_qa_history(user_query, response_dict["answer"])

        except Exception as e:
            st.session_state.chat_history.append(("assistant", f"❌ āđ€āļāļīāļ”āļ‚āđ‰āļ­āļœāļīāļ”āļžāļĨāļēāļ”: {str(e)}"))
            st.error(f"Query processing error: {e}")
        finally:
            st.session_state.processing_query = False
            st.rerun()


def create_chat_input():
    """Render the chat input form and the clear-history button."""
    # Column layout for the entire input section.
    input_col, clear_col = st.columns([4, 1])

    with input_col:
        # Form groups the text input with its submit button.
        with st.form(key="chat_form", clear_on_submit=True):
            st.markdown("""
    """, unsafe_allow_html=True)

            # Text input.
            query = st.text_input(
                "",
                key="query_input",
                placeholder="āđ€āļŠāđˆāļ™: āļ§āļąāļ™āļŠāļļāļ”āļ—āđ‰āļēāļĒāļ‚āļ­āļ‡āļāļēāļĢāļŠāļ­āļšāļ›āļēāļāđ€āļ›āļĨāđˆāļēāđƒāļ™āļ āļēāļ„āđ€āļĢāļĩāļĒāļ™āļ—āļĩāđˆ 1/2567 āļ„āļ·āļ­āļ§āļąāļ™āļ—āļĩāđˆāđ€āļ—āđˆāļēāđ„āļĢ?"
            )

            # Submit button inside the form.
            submitted = st.form_submit_button(
                "ðŸ“Ī āļŠāđˆāļ‡āļ„āļģāļ–āļēāļĄ",
                type="primary",
                use_container_width=True
            )

            if submitted:
                handle_submit(query)

    # Clear-history button lives outside the form.
    with clear_col:
        if st.button("🗑ïļ āļĨāđ‰āļēāļ‡āļ›āļĢāļ°āļ§āļąāļ•āļī", type="secondary", use_container_width=True):
            st.session_state.context_memory = []
            st.session_state.chat_history = []
            st.rerun()


def main():
    """Application entry point: page setup, state initialization, UI layout."""
    # Page config.
    st.set_page_config(
        page_title="Academic Calendar Assistant",
        page_icon="📅",
        layout="wide",
        initial_sidebar_state="collapsed"
    )

    # Load custom CSS.
    load_custom_css()

    # Initialize session states.
    if 'pipeline' not in st.session_state:
        st.session_state.pipeline = None
    if 'chat_history' not in st.session_state:
        st.session_state.chat_history = []
    if 'context_memory' not in st.session_state:
        st.session_state.context_memory = []
    if 'processing_query' not in st.session_state:
        st.session_state.processing_query = False

    # Load QA history once at startup.
    if 'qa_history_loaded' not in st.session_state:
        st.session_state.qa_history_loaded = True
        load_qa_history()

    # Initialize pipeline.
    if st.session_state.pipeline is None:
        with st.spinner("āļāļģāļĨāļąāļ‡āđ€āļĢāļīāđˆāļĄāļ•āđ‰āļ™āļĢāļ°āļšāļš..."):
            st.session_state.pipeline = initialize_pipeline()

    # Header.
    # NOTE(review): the surrounding HTML wrappers of this markdown appear to
    # have been stripped from this copy of the file — restore if styling is
    # expected.
    st.markdown("""

🎓 āļĢāļ°āļšāļšāļ„āđ‰āļ™āļŦāļēāļ‚āđ‰āļ­āļĄāļđāļĨāļ›āļāļīāļ—āļīāļ™āļāļēāļĢāļĻāļķāļāļĐāļē

āļšāļąāļ“āļ‘āļīāļ•āļ§āļīāļ—āļĒāļēāļĨāļąāļĒ āļĄāļŦāļēāļ§āļīāļ—āļĒāļēāļĨāļąāļĒāļĻāļĢāļĩāļ™āļ„āļĢāļīāļ™āļ—āļĢāļ§āļīāđ‚āļĢāļ’

""", unsafe_allow_html=True)

    chat_col, info_col = st.columns([7, 3])

    with chat_col:
        # Display chat history first.
        for role, content in st.session_state.chat_history:
            if role == "user":
                st.markdown(f"""
🧑 āļ„āļģāļ–āļēāļĄ:
{content}
""", unsafe_allow_html=True)
            else:
                # Assistant entries may be plain strings (error messages) or
                # dicts produced by handle_submit.
                if isinstance(content, dict):
                    assistant_response = content.get('answer', '❌ āđ„āļĄāđˆāļĄāļĩāļ‚āđ‰āļ­āļĄāļđāļĨāļ„āļģāļ•āļ­āļš')
                else:
                    assistant_response = content

                st.markdown(f"""
ðŸĪ– āļ„āļģāļ•āļ­āļš:
{assistant_response}
""", unsafe_allow_html=True)

                # Show retrieved source documents, if any.
                if isinstance(content, dict) and content.get('documents'):
                    with st.expander("📚 āđāļŠāļ”āļ‡āļ‚āđ‰āļ­āļĄāļđāļĨāļ­āđ‰āļēāļ‡āļ­āļīāļ‡", expanded=False):
                        # FIX: renamed the inner loop variable — the original
                        # reused `i` and shadowed the outer loop index.
                        for doc_no, doc in enumerate(content['documents'], 1):
                            st.markdown(f"""
āđ€āļ­āļāļŠāļēāļĢāļ—āļĩāđˆ {doc_no}:
{doc.content}
""", unsafe_allow_html=True)

        # Chat input section.
        create_chat_input()

    # Sidebar info column.
    with info_col:
        st.markdown("""

â„đïļ āđ€āļāļĩāđˆāļĒāļ§āļāļąāļšāļĢāļ°āļšāļš

āļĢāļ°āļšāļšāļ™āļĩāđ‰āđƒāļŠāđ‰āđ€āļ—āļ„āđ‚āļ™āđ‚āļĨāļĒāļĩ RAG (Retrieval-Augmented Generation) āđƒāļ™āļāļēāļĢāļ„āđ‰āļ™āļŦāļēāđāļĨāļ°āļ•āļ­āļšāļ„āļģāļ–āļēāļĄāđ€āļāļĩāđˆāļĒāļ§āļāļąāļšāļ›āļāļīāļ—āļīāļ™āļāļēāļĢāļĻāļķāļāļĐāļē

āļŠāļēāļĄāļēāļĢāļ–āļŠāļ­āļšāļ–āļēāļĄāļ‚āđ‰āļ­āļĄāļđāļĨāđ€āļāļĩāđˆāļĒāļ§āļāļąāļš:

""", unsafe_allow_html=True)

        # Status panel. NOTE(review): the template had 4 format arguments but
        # only 3 visible placeholders after the HTML was stripped from this
        # copy; the status CSS class ("status-online"/"status-offline") was
        # almost certainly a class attribute — a minimal span is reconstructed
        # here so all arguments keep their intended slots. Confirm against the
        # original markup.
        st.markdown("""

🔄 āļŠāļ–āļēāļ™āļ°āļĢāļ°āļšāļš

⏰ āđ€āļ§āļĨāļēāļ›āļąāļˆāļˆāļļāļšāļąāļ™:
{}

ðŸ“Ą āļŠāļ–āļēāļ™āļ°āļĢāļ°āļšāļš:
<span class="{}">{} {}</span>

""".format(
            (datetime.now() + timedelta(hours=5)).strftime('%Y-%m-%d %H:%M:%S'),
            "status-online" if st.session_state.pipeline else "status-offline",
            "ðŸŸĒ" if st.session_state.pipeline else "ðŸ”ī",
            "āļžāļĢāđ‰āļ­āļĄāđƒāļŠāđ‰āļ‡āļēāļ™" if st.session_state.pipeline else "āđ„āļĄāđˆāļžāļĢāđ‰āļ­āļĄāđƒāļŠāđ‰āļ‡āļēāļ™"
        ), unsafe_allow_html=True)


if __name__ == "__main__":
    main()