# OmniAgent — dual-model (Cerebras + Groq) Gradio assistant.
import gradio as gr
import os
import time
import asyncio
from cerebras.cloud.sdk import Cerebras
from groq import Groq
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import re
import json
import numpy as np
from datetime import datetime
import logging
import aiohttp

# Enhanced API Setup: both provider keys are mandatory — fail fast at import
# time rather than erroring mid-conversation.
CEREBRAS_API_KEY = os.getenv("CEREBRAS_API_KEY")
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
if not CEREBRAS_API_KEY or not GROQ_API_KEY:
    raise ValueError("Both CEREBRAS_API_KEY and GROQ_API_KEY environment variables must be set.")
# Module-level singleton clients shared by every conversation turn.
cerebras_client = Cerebras(api_key=CEREBRAS_API_KEY)
groq_client = Groq(api_key=GROQ_API_KEY)

# Configure logging to a file (not stdout) so the Gradio console stays clean.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='agent.log'
)
class EnhancedToolkit:
    """Stateless helpers for fetching, cleaning and analyzing web text.

    All methods are ``@staticmethod``: the original definitions declared no
    ``self``, so calling them through an instance (as AgentCore does) bound
    the instance to the first positional parameter — e.g. the toolkit object
    landed in ``url`` and the real URL in ``timeout``. The decorators fix
    that misbinding without changing any call site.
    """

    @staticmethod
    async def fetch_webpage_async(url, timeout=10):
        """Fetch *url* and return the body text, or an "Error..." string.

        Errors are reported as strings (never raised) so callers can do a
        simple ``startswith("Error")`` check.
        """
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, timeout=timeout) as response:
                    if response.status == 200:
                        return await response.text()
                    return f"Error: HTTP {response.status}"
        except Exception as e:
            return f"Error fetching URL: {str(e)}"

    @staticmethod
    def extract_text_from_html(html):
        """Strip scripts/styles from *html* and return whitespace-normalized text."""
        soup = BeautifulSoup(html, 'html.parser')
        # Remove script and style elements
        for script in soup(["script", "style"]):
            script.decompose()
        text = soup.get_text(separator=' ', strip=True)
        # Normalize whitespace
        text = ' '.join(text.split())
        return text

    @staticmethod
    def validate_url(url):
        """Return True iff *url* parses with both a scheme and a host."""
        try:
            result = urlparse(url)
            return all([result.scheme, result.netloc])
        # urlparse raises TypeError/AttributeError (not ValueError) for
        # non-string input such as None — treat all of them as invalid.
        except (ValueError, TypeError, AttributeError):
            return False

    @staticmethod
    def summarize_text(text, max_length=500):
        """Simple text summarization by extracting key sentences.

        NOTE(review): ``max_length`` is currently unused — kept for
        interface compatibility; confirm whether truncation was intended.
        """
        sentences = text.split('. ')
        if len(sentences) <= 3:
            return text
        # Simple importance scoring based on sentence length and position
        scores = []
        for i, sentence in enumerate(sentences):
            score = len(sentence.split()) * (1.0 / (i + 1))  # Length and position weight
            scores.append((score, sentence))
        # Get top sentences (highest score first; order is by score, not
        # original position).
        scores.sort(reverse=True)
        summary = '. '.join(sent for _, sent in scores[:3]) + '.'
        return summary

    @staticmethod
    def analyze_sentiment(text):
        """Simple lexicon-based sentiment: 'positive', 'negative' or 'neutral'."""
        positive_words = set(['good', 'great', 'excellent', 'positive', 'amazing', 'wonderful'])
        negative_words = set(['bad', 'poor', 'negative', 'terrible', 'awful', 'horrible'])
        words = text.lower().split()
        pos_count = sum(1 for word in words if word in positive_words)
        neg_count = sum(1 for word in words if word in negative_words)
        if pos_count > neg_count:
            return 'positive'
        elif neg_count > pos_count:
            return 'negative'
        return 'neutral'
class AgentCore:
    """Per-session orchestrator: rolling conversation context plus a
    per-turn tool-usage budget, dispatching named tools to the toolkit."""

    def __init__(self):
        self.toolkit = EnhancedToolkit()
        self.tool_execution_count = 0  # tools spent in the current turn
        self.max_tools_per_turn = 5    # hard cap; counter is reset externally each turn
        self.context_window = []       # rolling list of exchange dicts
        self.max_context_items = 10    # oldest entries evicted past this size

    def update_context(self, user_input, ai_response):
        """Record one exchange with a timestamp, evicting the oldest if over the cap."""
        self.context_window.append({
            'user_input': user_input,
            'ai_response': ai_response,
            'timestamp': datetime.now().isoformat()
        })
        if len(self.context_window) > self.max_context_items:
            self.context_window.pop(0)

    async def execute_tool(self, action, parameters):
        """Dispatch one tool call.

        Args:
            action: Tool name — 'scrape', 'search' or 'analyze'.
            parameters: Dict of tool arguments; tolerates None/empty.

        Returns:
            A result dict for successful scrape/analyze calls, otherwise a
            human-readable status or error string.
        """
        if self.tool_execution_count >= self.max_tools_per_turn:
            return "Tool usage limit reached for this turn."
        self.tool_execution_count += 1
        parameters = parameters or {}  # robustness: tolerate a missing payload
        if action == "scrape":
            url = parameters.get("url")
            # Guard the missing-URL case explicitly before validating.
            if not url or not self.toolkit.validate_url(url):
                return "Invalid URL provided."
            html_content = await self.toolkit.fetch_webpage_async(url)
            # fetch_webpage_async reports failures as "Error..." strings.
            if html_content.startswith("Error"):
                return html_content
            text_content = self.toolkit.extract_text_from_html(html_content)
            summary = self.toolkit.summarize_text(text_content)
            sentiment = self.toolkit.analyze_sentiment(text_content)
            return {
                'summary': summary,
                'sentiment': sentiment,
                # Cap the echoed text so tool results stay readable.
                'full_text': text_content[:1000] + '...' if len(text_content) > 1000 else text_content
            }
        elif action == "search":
            query = parameters.get("query")
            return f"Simulated search for: {query}\nThis would connect to a search API in production."
        elif action == "analyze":
            text = parameters.get("text")
            if not text:
                return "No text provided for analysis"
            return {
                'sentiment': self.toolkit.analyze_sentiment(text),
                'summary': self.toolkit.summarize_text(text)
            }
        return f"Unknown tool: {action}"
async def chat_with_agent(user_input, chat_history, agent_core):
    """Run one conversational turn through both model providers.

    Streams a Groq chat completion (scanning chunks for chain-of-thought
    text and inline tool invocations) while a Cerebras completion runs as a
    background task, then concatenates both outputs.

    Args:
        user_input: The raw user message for this turn.
        chat_history: Prior (user, ai) pairs from the UI — currently unused
            here; persistent context lives in ``agent_core``.
        agent_core: AgentCore holding context and the per-turn tool budget.

    Returns:
        Tuple of (final_response, chain_of_thought, compute-time label,
        token-usage label); on failure the first element is an error string.
    """
    start_time = time.time()
    try:
        # Reset tool counter for new turn
        agent_core.tool_execution_count = 0
        # Prepare context-aware prompt
        system_prompt = """You are OmniAgent, a highly advanced AI assistant with multiple capabilities:
Core Abilities:
1. Task Understanding & Planning
2. Web Information Retrieval & Analysis
3. Content Summarization & Sentiment Analysis
4. Context-Aware Problem Solving
5. Creative Solution Generation
Available Tools:
- scrape: Extract and analyze web content
- search: Find relevant information
- analyze: Process and understand text
Use format:
Action: take_action
Parameters: {"action": "tool_name", "parameters": {...}}
Approach each task with:
1. Initial analysis
2. Step-by-step planning
3. Tool utilization when needed
4. Result synthesis
5. Clear explanation
Remember to maintain a helpful, professional, yet friendly tone."""
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_input}
        ]

        # Use both models for different aspects of processing
        async def get_cerebras_response():
            # NOTE(review): this SDK call looks synchronous — it blocks the
            # event loop despite the async wrapper; confirm whether the
            # Cerebras SDK offers a true async client.
            response = cerebras_client.completions.create(
                prompt=f"{system_prompt}\n\nUser: {user_input}",
                max_tokens=1000,
                temperature=0.7
            )
            return response.text

        async def get_groq_response():
            # Returns a streaming iterator; chunks are consumed below.
            completion = groq_client.chat.completions.create(
                messages=messages,
                temperature=0.7,
                max_tokens=2048,
                stream=True
            )
            return completion

        # Get responses from both models: Cerebras runs as a background task
        # while the Groq stream is consumed in the foreground.
        cerebras_future = asyncio.create_task(get_cerebras_response())
        groq_stream = await get_groq_response()

        # Process responses
        response = ""
        chain_of_thought = ""

        # Process Groq stream chunk by chunk (synchronous iteration —
        # blocks the event loop while waiting for network chunks).
        for chunk in groq_stream:
            if chunk.choices[0].delta and chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                response += content
                if "Chain of Thought:" in content:
                    chain_of_thought += content.split("Chain of Thought:", 1)[-1]
                # Tool execution handling.
                # NOTE(review): this regex expects "Action: X, Parameters: {...}"
                # within a single streamed chunk, but the system prompt asks the
                # model to put Action and Parameters on separate lines — the
                # match may rarely (or never) fire; verify against real output.
                if "Action:" in content:
                    action_match = re.search(r"Action: (\w+), Parameters: (\{.*\})", content)
                    if action_match:
                        # Captured tool name is unused; dispatch reads the
                        # "action" field of the JSON payload instead.
                        action = action_match.group(1)
                        try:
                            parameters = json.loads(action_match.group(2))
                            tool_result = await agent_core.execute_tool(
                                parameters.get("action"),
                                parameters.get("parameters", {})
                            )
                            response += f"\nTool Result: {json.dumps(tool_result, indent=2)}\n"
                        except json.JSONDecodeError:
                            response += "\nError: Invalid tool parameters\n"

        # Integrate Cerebras response
        cerebras_response = await cerebras_future
        # Combine insights from both models
        final_response = f"{response}\n\nAdditional Insights:\n{cerebras_response}"

        # Update context
        agent_core.update_context(user_input, final_response)

        compute_time = time.time() - start_time
        # Whitespace word count, not real model tokens — a rough UI estimate.
        token_usage = len(user_input.split()) + len(final_response.split())

        return final_response, chain_of_thought, f"Compute Time: {compute_time:.2f}s", f"Tokens: {token_usage}"
    except Exception as e:
        logging.error(f"Error in chat_with_agent: {str(e)}", exc_info=True)
        return f"Error: {str(e)}", "", "Error occurred", ""
def create_interface():
    """Build and return the OmniAgent Gradio Blocks UI.

    Creates one AgentCore (context + tool budget) per interface instance and
    wires the send/clear/export controls to it.

    Returns:
        The assembled ``gr.Blocks`` demo, ready for ``.launch()``.
    """
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        agent_core = AgentCore()
        gr.Markdown("""# π OmniAgent: Advanced AI Assistant
Powered by dual AI models for enhanced capabilities and deeper understanding.""")
        with gr.Row():
            with gr.Column(scale=6):
                chat_history = gr.Chatbot(
                    label="Interaction History",
                    height=600,
                    show_label=True
                )
            with gr.Column(scale=2):
                with gr.Accordion("Performance Metrics", open=True):
                    compute_time = gr.Textbox(label="Processing Time", interactive=False)
                    token_usage_display = gr.Textbox(label="Resource Usage", interactive=False)
                with gr.Accordion("Agent Insights", open=True):
                    chain_of_thought_display = gr.Textbox(
                        label="Reasoning Process",
                        interactive=False,
                        lines=10
                    )
        user_input = gr.Textbox(
            label="Your Request",
            placeholder="How can I assist you today?",
            lines=3
        )
        with gr.Row():
            send_button = gr.Button("Send", variant="primary")
            clear_button = gr.Button("Clear History", variant="secondary")
            export_button = gr.Button("Export Chat", variant="secondary")

        async def handle_chat(chat_history, user_input):
            """Run one agent turn and append it to the visible chat log."""
            if not user_input.strip():
                return chat_history, "", "", ""
            ai_response, chain_of_thought, compute_info, token_usage = await chat_with_agent(
                user_input,
                chat_history,
                agent_core
            )
            chat_history.append((user_input, ai_response))
            return chat_history, chain_of_thought, compute_info, token_usage

        def clear_chat():
            """Wipe both the visible log and the agent's rolling context."""
            agent_core.context_window.clear()
            return [], "", "", ""

        def export_chat(chat_history):
            """Dump the visible chat to a timestamped text file in the CWD."""
            if not chat_history:
                return "No chat history to export.", ""
            filename = f"omnigent_chat_{int(time.time())}.txt"
            chat_text = "\n".join([
                f"User: {item[0]}\nAI: {item[1]}\n"
                for item in chat_history
            ])
            # utf-8 keeps non-ASCII chat content from raising on Windows.
            with open(filename, "w", encoding="utf-8") as file:
                file.write(chat_text)
            # Bug fix: report the actual file written (the filename was
            # computed but never interpolated into the status message).
            return f"Chat exported to {filename}", ""

        # Event handlers
        send_button.click(
            handle_chat,
            inputs=[chat_history, user_input],
            outputs=[chat_history, chain_of_thought_display, compute_time, token_usage_display]
        )
        clear_button.click(
            clear_chat,
            outputs=[chat_history, chain_of_thought_display, compute_time, token_usage_display]
        )
        export_button.click(
            export_chat,
            inputs=[chat_history],
            outputs=[compute_time, chain_of_thought_display]
        )
        # Enter in the textbox behaves like the Send button.
        user_input.submit(
            handle_chat,
            inputs=[chat_history, user_input],
            outputs=[chat_history, chain_of_thought_display, compute_time, token_usage_display]
        )
        gr.Markdown("""### π Advanced Capabilities:
- Dual AI Model Processing
- Advanced Web Content Analysis
- Sentiment Understanding
- Intelligent Text Summarization
- Context-Aware Responses
- Enhanced Error Handling
- Detailed Performance Tracking
- Comprehensive Logging
""")
    return demo
if __name__ == "__main__":
    # Script entry point: build the UI and expose it via a public share link.
    create_interface().launch(share=True)