Spaces:
Runtime error
Runtime error
File size: 5,348 Bytes
dcb2a99 3921722 dcb2a99 798eb17 dcb2a99 3921722 798eb17 3921722 dcb2a99 3921722 dcb2a99 798eb17 dcb2a99 798eb17 dcb2a99 798eb17 dcb2a99 798eb17 dcb2a99 3921722 dcb2a99 798eb17 dcb2a99 798eb17 dcb2a99 798eb17 3921722 dcb2a99 798eb17 3921722 798eb17 dcb2a99 798eb17 dcb2a99 798eb17 dcb2a99 798eb17 dcb2a99 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
"""
Advanced Agentic System Interface
-------------------------------
Provides an interface to interact with the autonomous agent system
using local LLM for improved performance.
"""
import gradio as gr
import asyncio
from typing import Dict, Any, List
import json
from datetime import datetime
import logging
import os
import socket
import requests
from requests.adapters import HTTPAdapter, Retry
from agentic_system import AgenticSystem
from team_management import TeamManager
from orchestrator import AgentOrchestrator
from reasoning import UnifiedReasoningEngine
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configure network settings
TIMEOUT = int(os.getenv('REQUESTS_TIMEOUT', '30'))
RETRIES = 3
def check_network():
"""Check network connectivity."""
try:
# Test connection to Hugging Face
response = requests.get('https://huggingface.co', timeout=TIMEOUT)
return response.status_code == 200
except requests.RequestException as e:
logger.warning(f"Network connectivity issue: {e}")
return False
class AgentInterface:
"""Interface for the agentic system."""
def __init__(self):
"""Initialize the interface components."""
# Check network connectivity
if not check_network():
logger.warning("Network connectivity issues detected")
self.orchestrator = AgentOrchestrator()
self.reasoning_engine = UnifiedReasoningEngine(
min_confidence=0.7,
parallel_threshold=3,
learning_rate=0.1
)
self.team_manager = TeamManager(self.orchestrator)
self.system = AgenticSystem()
async def process_query(self, message: str) -> str:
"""Process user query through the reasoning system."""
try:
# Log incoming query
logger.info(f"Processing query: {message}")
# Get reasoning result
result = await self.reasoning_engine.reason(
query=message,
context={"timestamp": datetime.now().isoformat()}
)
# Format response
if result.success:
response = f"Answer: {result.answer}\nConfidence: {result.confidence:.2f}"
if result.meta_insights:
response += "\nInsights:\n" + "\n".join(f"- {insight}" for insight in result.meta_insights)
else:
response = "I apologize, but I couldn't process your query effectively. Please try rephrasing or providing more context."
return response
except Exception as e:
logger.error(f"Error processing query: {e}")
return f"An error occurred: {str(e)}"
def health_check(self) -> Dict[str, Any]:
"""Check system health."""
return {
"status": "healthy",
"network": check_network(),
"components": {
"orchestrator": self.orchestrator is not None,
"reasoning_engine": self.reasoning_engine is not None,
"team_manager": self.team_manager is not None,
"system": self.system is not None
},
"timestamp": datetime.now().isoformat()
}
# Initialize interface
interface = AgentInterface()
# Create Gradio interface
with gr.Blocks(title="Advanced Reasoning System", theme=gr.themes.Soft()) as demo:
gr.Markdown("""
# 🤖 Advanced Reasoning System
Welcome to the Advanced Reasoning System! This system combines multiple reasoning strategies:
- Chain of Thought
- Tree of Thoughts
- Meta Learning
- Local LLM
- And more!
Ask any question and the system will use its advanced reasoning capabilities to help you.
""")
with gr.Row():
with gr.Column(scale=4):
query_input = gr.Textbox(
label="Your Question",
placeholder="Enter your question here...",
lines=3
)
with gr.Row():
submit_btn = gr.Button("Submit", variant="primary")
clear_btn = gr.Button("Clear")
with gr.Column(scale=6):
output = gr.Textbox(
label="Response",
lines=10,
interactive=False
)
# Add examples
gr.Examples(
examples=[
"How would you approach designing a scalable microservices architecture?",
"What are the key considerations for implementing a secure authentication system?",
"Can you help me understand the differences between various machine learning algorithms?",
],
inputs=query_input,
label="Example Questions"
)
# Event handlers
submit_btn.click(
fn=interface.process_query,
inputs=query_input,
outputs=output,
api_name="process_query"
)
clear_btn.click(
fn=lambda: ("", ""),
inputs=None,
outputs=[query_input, output],
api_name="clear"
)
# Add health check endpoint
demo.load(
fn=interface.health_check,
inputs=None,
outputs=None,
api_name="health"
)
# Launch the interface
if __name__ == "__main__":
demo.launch()
|