Spaces:
Runtime error
Runtime error
import gradio as gr | |
import os | |
import time | |
import asyncio | |
from cerebras.cloud.sdk import Cerebras | |
from groq import Groq | |
import requests | |
from bs4 import BeautifulSoup | |
from urllib.parse import urlparse | |
import re | |
import json | |
import logging | |
import aiohttp | |
# API Setup | |
CEREBRAS_API_KEY = os.getenv("CEREBRAS_API_KEY") | |
GROQ_API_KEY = os.getenv("GROQ_API_KEY") | |
if not CEREBRAS_API_KEY or not GROQ_API_KEY: | |
raise ValueError("Both CEREBRAS_API_KEY and GROQ_API_KEY environment variables must be set.") | |
cerebras_client = Cerebras(api_key=CEREBRAS_API_KEY) | |
groq_client = Groq(api_key=GROQ_API_KEY) | |
# Configure logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(levelname)s - %(message)s', | |
filename='agent.log' | |
) | |
# Helper Functions | |
class EnhancedToolkit: | |
async def fetch_webpage_async(url, timeout=10): | |
try: | |
async with aiohttp.ClientSession() as session: | |
async with session.get(url, timeout=timeout) as response: | |
if response.status == 200: | |
return await response.text() | |
return f"Error: HTTP {response.status}" | |
except Exception as e: | |
logging.error(f"Error fetching URL: {str(e)}") | |
return f"Error fetching URL: {str(e)}" | |
def extract_text_from_html(html): | |
soup = BeautifulSoup(html, 'html.parser') | |
for script in soup(["script", "style"]): | |
script.decompose() | |
text = soup.get_text(separator=' ', strip=True) | |
return ' '.join(text.split()) | |
def validate_url(url): | |
try: | |
result = urlparse(url) | |
return all([result.scheme, result.netloc]) | |
except ValueError: | |
return False | |
def summarize_text(text, max_length=500): | |
sentences = text.split('. ') | |
if len(sentences) <= 3: | |
return text | |
scores = [(len(sentence.split()) * (1.0 / (i + 1)), sentence) for i, sentence in enumerate(sentences)] | |
scores.sort(reverse=True) | |
return '. '.join([sentence for _, sentence in scores[:3]]) + '.' | |
def analyze_sentiment(text): | |
positive_words = set(['good', 'great', 'excellent', 'positive', 'amazing']) | |
negative_words = set(['bad', 'poor', 'negative', 'terrible', 'horrible']) | |
words = text.lower().split() | |
pos_count = sum(1 for word in words if word in positive_words) | |
neg_count = sum(1 for word in words if word in negative_words) | |
if pos_count > neg_count: | |
return 'positive' | |
elif neg_count > pos_count: | |
return 'negative' | |
return 'neutral' | |
class AgentCore: | |
def __init__(self): | |
self.toolkit = EnhancedToolkit() | |
self.tool_execution_count = 0 | |
self.max_tools_per_turn = 5 | |
self.context_window = [] | |
self.max_context_items = 10 | |
def update_context(self, user_input, ai_response): | |
self.context_window.append({ | |
'user_input': user_input, | |
'ai_response': ai_response, | |
'timestamp': datetime.now().isoformat() | |
}) | |
if len(self.context_window) > self.max_context_items: | |
self.context_window.pop(0) | |
async def execute_tool(self, action, parameters): | |
if self.tool_execution_count >= self.max_tools_per_turn: | |
return "Tool usage limit reached for this turn." | |
self.tool_execution_count += 1 | |
if action == "scrape": | |
url = parameters.get("url") | |
if not self.toolkit.validate_url(url): | |
return "Invalid URL provided." | |
html_content = await self.toolkit.fetch_webpage_async(url) | |
if html_content.startswith("Error"): | |
return html_content | |
text_content = self.toolkit.extract_text_from_html(html_content) | |
summary = self.toolkit.summarize_text(text_content) | |
sentiment = self.toolkit.analyze_sentiment(text_content) | |
return {'summary': summary, 'sentiment': sentiment, 'full_text': text_content[:1000] + '...' if len(text_content) > 1000 else text_content} | |
if action == "analyze": | |
text = parameters.get("text") | |
if not text: | |
return "No text provided for analysis" | |
return {'sentiment': self.toolkit.analyze_sentiment(text), 'summary': self.toolkit.summarize_text(text)} | |
return f"Unknown tool: {action}" | |
# Chat Interaction | |
async def chat_with_agent(user_input, chat_history, agent_core): | |
start_time = time.time() | |
try: | |
# Reset tool counter for new turn | |
agent_core.tool_execution_count = 0 | |
system_prompt = """You are OmniAgent, a highly advanced AI assistant with multiple capabilities.""" | |
messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_input}] | |
async def get_cerebras_response(): | |
response = cerebras_client.completions.create(prompt=f"{system_prompt}\n\nUser: {user_input}", max_tokens=1000, temperature=0.7) | |
return response.text | |
async def get_groq_response(): | |
completion = groq_client.chat.completions.create(messages=messages, temperature=0.7, max_tokens=2048, stream=True) | |
return completion | |
# Parallel AI Responses | |
cerebras_future = asyncio.create_task(get_cerebras_response()) | |
groq_stream = await get_groq_response() | |
# Process responses | |
response = "" | |
chain_of_thought = "" | |
for chunk in groq_stream: | |
if chunk.choices[0].delta and chunk.choices[0].delta.content: | |
content = chunk.choices[0].delta.content | |
response += content | |
if "Chain of Thought:" in content: | |
chain_of_thought += content.split("Chain of Thought:", 1)[-1] | |
# Tool execution handling | |
if "Action:" in content: | |
action_match = re.search(r"Action: (\w+), Parameters: (\{.*\})", content) | |
if action_match: | |
action = action_match.group(1) | |
try: | |
parameters = json.loads(action_match.group(2)) | |
tool_result = await agent_core.execute_tool(parameters.get("action"), parameters.get("parameters", {})) | |
response += f"\nTool Result: {json.dumps(tool_result, indent=2)}\n" | |
except json.JSONDecodeError: | |
response += "\nError: Invalid tool parameters\n" | |
# Get Cerebras response and combine | |
cerebras_response = await cerebras_future | |
final_response = f"{response}\n\nAdditional Insights:\n{cerebras_response}" | |
# Update context | |
agent_core.update_context(user_input, final_response) | |
compute_time = time.time() - start_time | |
token_usage = len(user_input.split()) + len(final_response.split()) | |
return final_response, chain_of_thought, f"Compute Time: {compute_time:.2f}s", f"Tokens: {token_usage}" | |
except Exception as e: | |
logging.error(f"Error in chat_with_agent: {str(e)}", exc_info=True) | |
return f"Error: {str(e)}", "", "Error occurred", "" | |
def create_interface(): | |
with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
agent_core = AgentCore() | |
gr.Markdown("""# π OmniAgent: Advanced AI Assistant""") | |
with gr.Row(): | |
with gr.Column(scale=6): | |
chat_history = gr.Chatbot(label="Interaction History", height=600, show_label=True) | |
with gr.Column(scale=2): | |
with gr.Accordion("Performance Metrics", open=True): | |
compute_time = gr.Textbox(label="Processing Time", interactive=False) | |
token_usage_display = gr.Textbox(label="Resource Usage", interactive=False) | |
with gr.Accordion("Agent Insights", open=True): | |
chain_of_thought_display = gr.Textbox(label="Reasoning Process", interactive=False, lines=10) | |
user_input = gr.Textbox(label="Your Request", placeholder="How can I assist you today?", lines=3) | |
send_button = gr.Button("Send", variant="primary") | |
clear_button = gr.Button("Clear History", variant="secondary") | |
export_button = gr.Button("Export Chat", variant="secondary") | |
async def handle_chat(chat_history, user_input): | |
if not user_input.strip(): | |
return chat_history, "", "", "" | |
ai_response, chain_of_thought, compute_info, token_usage = await chat_with_agent(user_input, chat_history, agent_core) | |
chat_history.append((user_input, ai_response)) | |
return chat_history, chain_of_thought, compute_info, token_usage | |
def clear_chat(): | |
agent_core.context_window.clear() | |
return [], "", "", "" | |
def export_chat(chat_history): | |
if not chat_history: | |
return "No chat history to export.", "" | |
filename = f"omnigent_chat_{int(time.time())}.txt" | |
chat_text = "\n".join([f"User: {item[0]}\nAI: {item[1]}\n" for item in chat_history]) | |
with open(filename, "w") as file: | |
file.write(chat_text) | |
return f"Chat exported to {filename}", "" | |
# Event handlers | |
send_button.click(handle_chat, inputs=[chat_history, user_input], outputs=[chat_history, chain_of_thought_display, compute_time, token_usage_display]) | |
clear_button.click(clear_chat, outputs=[chat_history, chain_of_thought_display, compute_time, token_usage_display]) | |
export_button.click(export_chat, inputs=[chat_history], outputs=[compute_time, chain_of_thought_display]) | |
user_input.submit(handle_chat, inputs=[chat_history, user_input], outputs=[chat_history, chain_of_thought_display, compute_time, token_usage_display]) | |
gr.Markdown("""### π Advanced Capabilities: | |
- Dual AI Model Processing | |
- Advanced Web Content Analysis | |
- Sentiment Understanding | |
- Intelligent Text Summarization | |
- Context-Aware Responses | |
- Enhanced Error Handling | |
- Detailed Performance Tracking | |
- Comprehensive Logging | |
""") | |
return demo | |
if __name__ == "__main__": | |
demo = create_interface() | |
demo.launch(share=True) | |