"""Entry point for the Status Law Assistant.

Builds a Gradio chat UI, mounts it on the project's FastAPI application and
serves both from a single uvicorn server on port 7860.  The UI callbacks talk
to the API over HTTP on that same port.
"""

import multiprocessing
import os
import sys
import threading
import time

import requests
from requests.exceptions import ConnectionError, RequestException

# Set before the gradio import below (presumably so that matplotlib, if it
# gets pulled in, uses a writable config directory -- confirm for your host).
os.environ['MPLCONFIGDIR'] = os.path.join(os.getcwd(), 'cache', 'matplotlib')
os.makedirs(os.environ['MPLCONFIGDIR'], exist_ok=True)

import gradio as gr
import uvicorn
from fastapi import FastAPI
from fastapi.responses import HTMLResponse

# Add project root to Python path
current_dir = os.path.abspath(os.path.dirname(__file__))
if current_dir not in sys.path:
    sys.path.insert(0, current_dir)

# Import our main application
from api.fastapi_server import app as fastapi_app

# Address the UI callbacks use to reach the API.
API_BASE_URL = "http://127.0.0.1:7860"
# Directory holding the on-disk knowledge base files.
VECTOR_STORE_PATH = "vector_store"
# Per-request limits (seconds) for the cheap endpoints.  /chat and /build-kb
# are intentionally left without a timeout because they may run for a long
# time.
HEALTH_PROBE_TIMEOUT = 2
STATUS_TIMEOUT = 10


def wait_for_fastapi(timeout=30, interval=0.5):
    """Poll the API's /health endpoint until it answers 200.

    Args:
        timeout: Total number of seconds to keep polling.
        interval: Pause in seconds between two probes.

    Returns:
        True as soon as /health returns HTTP 200, False if ``timeout``
        elapses first.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A per-probe timeout keeps one hung connection from blocking
            # past the overall deadline.
            response = requests.get(
                f"{API_BASE_URL}/health", timeout=HEALTH_PROBE_TIMEOUT
            )
            if response.status_code == 200:
                print("FastAPI server is ready!")
                return True
        except RequestException:
            # Server not reachable (yet); retry after the pause below.
            pass
        # Sleep after every unsuccessful probe, including non-200 answers,
        # so the loop never busy-spins.
        time.sleep(interval)
    return False


def run_fastapi():
    """Start the bare FastAPI app with uvicorn in a child process (port 7860)."""
    multiprocessing.Process(
        target=uvicorn.run,
        args=(fastapi_app,),
        kwargs={"host": "0.0.0.0", "port": 7860},
    ).start()


# The UI callbacks always call the API on port 7860.
#
# * Run as a script: the combined app created at the bottom of this file
#   listens on 7860 itself and therefore answers those calls.  Starting a
#   second server here would occupy the port and make the final
#   uvicorn.run() fail to bind, so nothing is started.
# * Imported by something else (e.g. an external ASGI server): keep the
#   previous behaviour and bring up a dedicated API process on 7860.
fastapi_thread = None
if __name__ != "__main__":
    fastapi_thread = threading.Thread(target=run_fastapi, daemon=True)
    fastapi_thread.start()

    # Wait for FastAPI to start
    if not wait_for_fastapi():
        print("Failed to start FastAPI server")
        sys.exit(1)


def respond(message, history, conversation_id=None):
    """Send a chat message to the API and append the exchange to the history.

    Args:
        message: Text entered by the user.
        history: Current chat history as a list of (user, bot) tuples.
        conversation_id: Identifier previously returned by the API, or None
            for a new conversation.

    Returns:
        A ``(history, conversation_id)`` tuple.  On failure the error text is
        appended as the bot reply and the conversation id is left unchanged.
    """
    if not message:
        return history, conversation_id

    try:
        # No timeout on purpose: answering may take a long time.
        response = requests.post(
            f"{API_BASE_URL}/chat",
            json={"message": message, "conversation_id": conversation_id},
        )

        if response.status_code == 200:
            data = response.json()
            history = history + [(message, data["response"])]
            return history, data["conversation_id"]

        error_message = f"Error: {response.status_code} - {response.text}"
        history = history + [(message, error_message)]
        return history, conversation_id
    except Exception as e:
        # UI boundary: show the problem in the chat instead of raising.
        error_message = f"Error connecting to API: {str(e)}"
        history = history + [(message, error_message)]
        return history, conversation_id


def build_kb():
    """Ask the API to (re)build the knowledge base and return a status line."""
    try:
        # No timeout on purpose: building may take a long time.
        response = requests.post(f"{API_BASE_URL}/build-kb")

        if response.status_code == 200:
            return f"Success: {response.json()['message']}"
        return f"Error: {response.status_code} - {response.text}"
    except Exception as e:
        return f"API connection error: {str(e)}"


def get_kb_size():
    """Return the combined size of the knowledge base files in MB.

    Only ``index.faiss`` and ``index.pkl`` inside the vector store directory
    are counted; missing files contribute nothing.

    Returns:
        The size in megabytes as a float, or None if the file system could
        not be queried.
    """
    try:
        total_size = 0
        for filename in ("index.faiss", "index.pkl"):
            file_path = os.path.join(VECTOR_STORE_PATH, filename)
            if os.path.exists(file_path):
                total_size += os.path.getsize(file_path)
        return total_size / (1024 * 1024)  # Convert to MB
    except OSError:
        return None


def _kb_size_text():
    """Format the on-disk knowledge base size for display ("N/A" if unknown)."""
    actual_size = get_kb_size()
    return f"{actual_size:.2f} MB" if actual_size is not None else "N/A"


def _kb_index_on_disk():
    """Tell whether the FAISS index file exists in the vector store directory."""
    return os.path.exists(os.path.join(VECTOR_STORE_PATH, "index.faiss"))


def check_kb_status():
    """Query the API for the knowledge base status and describe it.

    Returns:
        A user-facing status message.  If the API cannot be reached, the
        message is based on whether the index file exists on disk.
    """
    try:
        response = requests.get(f"{API_BASE_URL}/status", timeout=STATUS_TIMEOUT)
        if response.status_code != 200:
            # Report the failure instead of returning None, which would
            # blank the status box.
            return (
                f"❌ Ошибка проверки статуса: "
                f"{response.status_code} - {response.text}"
            )

        data = response.json()
        if data["knowledge_base_exists"]:
            kb_info = data["kb_info"]
            version = kb_info.get('version', 'N/A')
            # Report the real size of the files on disk.
            size_text = _kb_size_text()
            return f"✅ База знаний готова к работе\nВерсия: {version}\nРазмер: {size_text}"

        # The API reports no knowledge base; check whether the files are
        # on disk anyway.
        if _kb_index_on_disk():
            size_text = _kb_size_text()
            return f"✅ База знаний найдена на диске\nРазмер: {size_text}\nТребуется перезагрузка сервера"
        return "❌ База знаний не создана. Нажмите кнопку 'Create/Update Knowledge Base'"
    except Exception as e:
        # Even when the request fails, look for the files on disk.
        if _kb_index_on_disk():
            size_text = _kb_size_text()
            return f"⚠️ Ошибка соединения, но база знаний существует\nРазмер: {size_text}"
        return f"❌ Ошибка проверки статуса: {str(e)}"


def clear_history():
    """Reset chat history and conversation ID."""
    return [], None  # Empty history, no conversation id


# Create the Gradio interface
demo = gr.Blocks(title="Status Law Assistant", theme=gr.themes.Soft())

with demo:
    # Conversation id returned by the API, kept per browser session.
    conversation_id = gr.State(None)

    gr.Markdown("# 🤖 Status Law Assistant")

    with gr.Row():
        # Left column for chat (larger)
        with gr.Column(scale=3):
            chatbot = gr.Chatbot(
                label="Conversation",
                height=600,
                show_label=False,
                bubble_full_width=False
            )

            with gr.Row():
                msg = gr.Textbox(
                    label="Enter your question here",
                    placeholder="Type your message and press Enter...",
                    scale=4
                )
                submit_btn = gr.Button("Send", variant="primary", scale=1)

        # Right column for controls (smaller)
        with gr.Column(scale=1):
            gr.Markdown("### Knowledge Base Controls")
            build_kb_btn = gr.Button("Create/Update Knowledge Base", variant="primary")
            check_status_btn = gr.Button("Check Status")
            kb_status = gr.Textbox(
                label="Knowledge Base Status",
                value="Checking status...",
                interactive=False
            )

            gr.Markdown("### Chat Controls")
            clear_btn = gr.Button("Clear Chat History")

            # Help section
            with gr.Accordion("ℹ️ How to use", open=False):
                gr.Markdown("""
                1. First, click **Create/Update Knowledge Base** button
                2. Wait for confirmation message
                3. Enter your question in the text field and press Enter or "Send" button
                4. Use "Clear Chat History" to start a new conversation
                """)

    # Event handlers
    msg.submit(respond, [msg, chatbot, conversation_id], [chatbot, conversation_id])
    submit_btn.click(respond, [msg, chatbot, conversation_id], [chatbot, conversation_id])
    clear_btn.click(clear_history, None, [chatbot, conversation_id])
    build_kb_btn.click(build_kb, inputs=None, outputs=kb_status)
    check_status_btn.click(check_kb_status, inputs=None, outputs=kb_status)
    # Replace the "Checking status..." placeholder as soon as the page loads.
    demo.load(check_kb_status, inputs=None, outputs=kb_status)

# Mount Gradio app to FastAPI at root path
app = gr.mount_gradio_app(fastapi_app, demo, path="/")

if __name__ == "__main__":
    # Launch the combined app (API + UI) on the port the callbacks target.
    uvicorn.run(app, host="0.0.0.0", port=7860)