import streamlit as st import json import os import requests import json import base64 from datetime import datetime, timedelta import subprocess from huggingface_hub import HfApi from pathlib import Path from calendar_rag import ( create_default_config, AcademicCalendarRAG, PipelineConfig, ModelConfig, RetrieverConfig, CacheConfig, ProcessingConfig, LocalizationConfig ) # Custom CSS for enhanced styling def load_custom_css(): st.markdown(""" """, unsafe_allow_html=True) def clear_conversation_context(): """Clear conversation context but keep chat display history""" # Clear the RAG pipeline's conversation history if 'pipeline' in st.session_state and st.session_state.pipeline: st.session_state.pipeline.conversation_history = [] # Clear the context memory st.session_state.context_memory = [] # Note: We keep st.session_state.chat_history for UI display purposes def initialize_pipeline(): """Initialize RAG pipeline with conversation memory support""" try: # Get API key from environment or secrets openai_api_key = os.getenv('OPENAI_API_KEY') or st.secrets['OPENAI_API_KEY'] # Create config with same settings as main() config = create_default_config(openai_api_key) # Create pipeline pipeline = AcademicCalendarRAG(config) # Load raw data instead of calendar.json try: with open("calendar.json", "r", encoding="utf-8") as f: raw_data = json.load(f) pipeline.load_data(raw_data) # Initialize conversation history from session state if available if 'context_memory' in st.session_state and st.session_state.context_memory: # Convert context memory to conversation history format conversation_history = [] for item in st.session_state.context_memory: conversation_history.append({"role": "user", "content": item["query"]}) conversation_history.append({"role": "assistant", "content": item["response"]}) pipeline.conversation_history = conversation_history return pipeline except FileNotFoundError: st.error("calendar.json not found. Please ensure the file exists in the same directory.") return None except Exception as e: st.error(f"Error initializing pipeline: {str(e)}") return None def load_qa_history(): """Load QA history directly from GitHub repository""" try: import requests import base64 import json # GitHub API configuration REPO_OWNER = "jirasaksaimekJijo" REPO_NAME = "swu-chat-bot-project" FILE_PATH = "qa_history.json" GITHUB_TOKEN = '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' # Set up GitHub API request api_url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/contents/{FILE_PATH}" headers = {"Accept": "application/vnd.github.v3+json"} if GITHUB_TOKEN: headers["Authorization"] = f"token {GITHUB_TOKEN}" # Make the request to GitHub API response = requests.get(api_url, headers=headers) if response.status_code == 200: # Decode the content from base64 content_data = response.json() file_content = base64.b64decode(content_data["content"]).decode("utf-8") # Parse JSON history_data = json.loads(file_content) return history_data else: st.warning(f"Failed to fetch QA history: {response.status_code} - {response.reason}") # Return empty list if file doesn't exist or can't be accessed return [] except Exception as e: st.error(f"Error loading QA history from GitHub: {str(e)}") return [] def save_qa_history(history_entry): """Save QA history entry to local JSON file and push to GitHub""" try: import requests import base64 import json from pathlib import Path # GitHub API configuration REPO_OWNER = "jirasaksaimekJijo" REPO_NAME = "swu-chat-bot-project" FILE_PATH = "qa_history.json" GITHUB_TOKEN = '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' # First, load existing data from GitHub api_url = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/contents/{FILE_PATH}" headers = { "Accept": "application/vnd.github.v3+json", "Authorization": f"token {GITHUB_TOKEN}" } # Try to get existing file first response = requests.get(api_url, headers=headers) # Initialize empty history data history_data = [] sha = None if response.status_code == 200: # File exists, get its content and SHA content_data = response.json() sha = content_data["sha"] try: # Decode and parse existing content file_content = base64.b64decode(content_data["content"]).decode("utf-8") if file_content.strip(): # Make sure content is not empty history_data = json.loads(file_content) # Ensure history_data is a list if not isinstance(history_data, list): st.warning("Existing history data is not a list. Initializing new list.") history_data = [] except Exception as e: st.warning(f"Error parsing existing history: {e}. Initializing new list.") elif response.status_code == 404: # File doesn't exist yet st.info("Creating new QA history file.") else: st.error(f"Failed to check existing history: {response.status_code} - {response.reason}") # Process history entry before appending if isinstance(history_entry, dict) and all(key in history_entry for key in ["timestamp", "query", "answer"]): # Process answer if it's a dict if isinstance(history_entry["answer"], dict): history_entry["answer"] = history_entry["answer"].get('answer', str(history_entry["answer"])) # Process answer if it's a Document-like object elif hasattr(history_entry["answer"], 'content'): history_entry["answer"] = history_entry["answer"].content # Convert to string for any other type else: history_entry["answer"] = str(history_entry["answer"]) # Append new entry to history data history_data.append(history_entry) # Also save locally for backup try: local_path = Path("qa_history.json") with open(local_path, "w", encoding="utf-8") as f: json.dump(history_data, f, ensure_ascii=False, indent=2) except Exception as local_err: st.warning(f"Failed to save local backup: {local_err}") # Prepare content for GitHub updated_content = json.dumps(history_data, ensure_ascii=False, indent=2) encoded_content = base64.b64encode(updated_content.encode('utf-8')).decode('utf-8') # Prepare the update/create payload data = { "message": "Update QA history", "content": encoded_content, } if sha: # If file exists, include its SHA data["sha"] = sha # Update or create the file update_response = requests.put(api_url, headers=headers, json=data) if update_response.status_code in [200, 201]: return True else: st.error(f"Failed to update QA history: {update_response.status_code} - {update_response.text}") return False except Exception as e: import traceback st.error(f"Error in save_qa_history: {str(e)}") st.error(f"Traceback: {traceback.format_exc()}") return False def add_to_qa_history(query: str, answer: str): """Add new QA pair to history with validation""" try: # Validate inputs if not query or not answer: st.warning("Empty query or answer detected, skipping history update") return None # Handle different answer types if isinstance(answer, dict): # If answer is a dict with 'answer' key, extract it processed_answer = answer.get('answer', str(answer)) elif hasattr(answer, 'content'): # If answer is a Document-like object with content attribute processed_answer = answer.content else: # Convert answer to string for any other type processed_answer = str(answer) # Create history entry with proper timestamp history_entry = { "timestamp": (datetime.now() + timedelta(hours=5)).strftime("%Y-%m-%dT%H:%M:%S"), "query": query, "answer": processed_answer } # Save entry save_qa_history(history_entry) return history_entry except Exception as e: st.error(f"Error in add_to_qa_history: {str(e)}") return None def add_to_history(role: str, message: str): """Add message to chat history, save if it's a complete QA pair, and update context memory""" st.session_state.chat_history.append((role, message)) # If this is an assistant response, save the QA pair if role == "assistant" and len(st.session_state.chat_history) >= 2: # Get the corresponding user query (previous message) user_query = st.session_state.chat_history[-2][1] # Process and save the QA pair history_entry = add_to_qa_history(user_query, message) # Also update context memory if needed if 'context_memory' not in st.session_state: st.session_state.context_memory = [] # Format response content for context memory if isinstance(message, dict) and "answer" in message: response_content = message["answer"] else: response_content = message st.session_state.context_memory.append({ "query": user_query, "response": response_content, "timestamp": (datetime.now() + timedelta(hours=5)).strftime("%Y-%m-%dT%H:%M:%S") }) # Limit context memory size to prevent performance issues if len(st.session_state.context_memory) > 10: # Keep last 10 exchanges st.session_state.context_memory = st.session_state.context_memory[-10:] def display_chat_history(): """Display chat history with improved document display""" for role, content in st.session_state.chat_history: if role == "user": st.markdown(f"""
""", unsafe_allow_html=True) else: if isinstance(content, dict): assistant_response = content.get('answer', 'â āđāļĄāđāļĄāļĩāļāđāļāļĄāļđāļĨāļāļģāļāļāļ') st.markdown(f""" """, unsafe_allow_html=True) # Show reference documents like in main() if content.get('documents'): with st.expander("ð āļāđāļāļĄāļđāļĨāļāđāļēāļāļāļīāļ", expanded=False): for i, doc in enumerate(content['documents'], 1): st.markdown(f"""āļāļąāļāļāļīāļāļ§āļīāļāļĒāļēāļĨāļąāļĒ āļĄāļŦāļēāļ§āļīāļāļĒāļēāļĨāļąāļĒāļĻāļĢāļĩāļāļāļĢāļīāļāļāļĢāļ§āļīāđāļĢāļ
āļĢāļ°āļāļāļāļĩāđāđāļāđāđāļāļāđāļāđāļĨāļĒāļĩ RAG (Retrieval-Augmented Generation) āđāļāļāļēāļĢāļāđāļāļŦāļēāđāļĨāļ°āļāļāļāļāļģāļāļēāļĄāđāļāļĩāđāļĒāļ§āļāļąāļāļŦāļĨāļąāļāļŠāļđāļāļĢāđāļĨāļ°āļāļāļīāļāļīāļāļāļēāļĢāļĻāļķāļāļĐāļē
â° āđāļ§āļĨāļēāļāļąāļāļāļļāļāļąāļ:
{}
ðĄ āļŠāļāļēāļāļ°āļĢāļ°āļāļ:
{} {}