import asyncio
import json
import logging
import os
import re
import time
from datetime import datetime
from urllib.parse import urlparse

import aiohttp
import gradio as gr
from bs4 import BeautifulSoup
from cerebras.cloud.sdk import Cerebras
from groq import Groq

# API Setup
CEREBRAS_API_KEY = os.getenv("CEREBRAS_API_KEY")
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
if not CEREBRAS_API_KEY or not GROQ_API_KEY:
    raise ValueError("Both CEREBRAS_API_KEY and GROQ_API_KEY environment variables must be set.")

cerebras_client = Cerebras(api_key=CEREBRAS_API_KEY)
groq_client = Groq(api_key=GROQ_API_KEY)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='agent.log'
)


# Helper Functions
class EnhancedToolkit:
    """Stateless utilities for fetching, cleaning, and analyzing web text."""

    @staticmethod
    async def fetch_webpage_async(url, timeout=10):
        """Fetch a page's raw HTML, or return an 'Error: ...' string on failure."""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as response:
                    if response.status == 200:
                        return await response.text()
                    return f"Error: HTTP {response.status}"
        except Exception as e:
            logging.error(f"Error fetching URL: {e}")
            return f"Error fetching URL: {e}"

    @staticmethod
    def extract_text_from_html(html):
        """Strip scripts and styles, then collapse all whitespace."""
        soup = BeautifulSoup(html, 'html.parser')
        for script in soup(["script", "style"]):
            script.decompose()
        text = soup.get_text(separator=' ', strip=True)
        return ' '.join(text.split())

    @staticmethod
    def validate_url(url):
        try:
            result = urlparse(url)
            return all([result.scheme, result.netloc])
        except ValueError:
            return False

    @staticmethod
    def summarize_text(text):
        """Naive extractive summary: keep the three highest-scoring sentences,
        scoring by word count weighted toward the start of the text."""
        sentences = text.split('. ')
        if len(sentences) <= 3:
            return text
        scores = [(len(sentence.split()) / (i + 1), sentence)
                  for i, sentence in enumerate(sentences)]
        scores.sort(reverse=True)
        return '. '.join(sentence for _, sentence in scores[:3]) + '.'

    @staticmethod
    def analyze_sentiment(text):
        """Crude lexicon-based polarity: count positive vs. negative words."""
        positive_words = {'good', 'great', 'excellent', 'positive', 'amazing'}
        negative_words = {'bad', 'poor', 'negative', 'terrible', 'horrible'}
        words = text.lower().split()
        pos_count = sum(1 for word in words if word in positive_words)
        neg_count = sum(1 for word in words if word in negative_words)
        if pos_count > neg_count:
            return 'positive'
        if neg_count > pos_count:
            return 'negative'
        return 'neutral'


class AgentCore:
    """Per-session state: a tool-use budget and a rolling context window."""

    def __init__(self):
        self.toolkit = EnhancedToolkit()
        self.tool_execution_count = 0
        self.max_tools_per_turn = 5
        self.context_window = []
        self.max_context_items = 10

    def update_context(self, user_input, ai_response):
        self.context_window.append({
            'user_input': user_input,
            'ai_response': ai_response,
            'timestamp': datetime.now().isoformat()
        })
        # Evict the oldest exchange once the window is full.
        if len(self.context_window) > self.max_context_items:
            self.context_window.pop(0)

    async def execute_tool(self, action, parameters):
        if self.tool_execution_count >= self.max_tools_per_turn:
            return "Tool usage limit reached for this turn."
        self.tool_execution_count += 1

        if action == "scrape":
            url = parameters.get("url")
            if not self.toolkit.validate_url(url):
                return "Invalid URL provided."
            html_content = await self.toolkit.fetch_webpage_async(url)
            if html_content.startswith("Error"):
                return html_content
            text_content = self.toolkit.extract_text_from_html(html_content)
            return {
                'summary': self.toolkit.summarize_text(text_content),
                'sentiment': self.toolkit.analyze_sentiment(text_content),
                'full_text': text_content[:1000] + '...' if len(text_content) > 1000 else text_content
            }

        if action == "analyze":
            text = parameters.get("text")
            if not text:
                return "No text provided for analysis"
            return {
                'sentiment': self.toolkit.analyze_sentiment(text),
                'summary': self.toolkit.summarize_text(text)
            }

        return f"Unknown tool: {action}"


# Chat Interaction
async def chat_with_agent(user_input, chat_history, agent_core):
    start_time = time.time()
    try:
        # Reset the tool budget for the new turn.
        agent_core.tool_execution_count = 0

        # The parser below looks for 'Chain of Thought:' and
        # 'Action: <tool>, Parameters: {...}' markers, so ask the model for them.
        system_prompt = (
            "You are OmniAgent, a highly advanced AI assistant with multiple capabilities. "
            "Prefix your reasoning with 'Chain of Thought:'. To use a tool, write "
            "'Action: <tool>, Parameters: <JSON object>' (available tools: scrape, analyze)."
        )
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_input}
        ]

        async def get_cerebras_response():
            # The Cerebras SDK client is synchronous, so run it in a worker
            # thread to genuinely overlap with the Groq stream. The model name
            # is an assumption; substitute one your account can access.
            response = await asyncio.to_thread(
                cerebras_client.completions.create,
                model="llama3.1-8b",
                prompt=f"{system_prompt}\n\nUser: {user_input}",
                max_tokens=1000,
                temperature=0.7,
            )
            return response.choices[0].text

        def get_groq_stream():
            # Groq requires a `model` argument; this name is an assumption.
            return groq_client.chat.completions.create(
                model="llama-3.3-70b-versatile",
                messages=messages,
                temperature=0.7,
                max_tokens=2048,
                stream=True,
            )

        # Kick off both models in parallel.
        cerebras_future = asyncio.create_task(get_cerebras_response())
        groq_stream = get_groq_stream()

        # Accumulate the streamed Groq response. (Iterating the sync stream
        # blocks the event loop; acceptable for a demo.)
        response = ""
        for chunk in groq_stream:
            if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
                response += chunk.choices[0].delta.content

        # Parse markers from the accumulated text rather than per-chunk:
        # token-sized stream chunks rarely contain a complete marker.
        chain_of_thought = ""
        if "Chain of Thought:" in response:
            chain_of_thought = response.split("Chain of Thought:", 1)[-1]

        # Non-greedy match assumes flat (unnested) JSON parameters.
        for action_match in re.finditer(r"Action: (\w+), Parameters: (\{.*?\})", response):
            action = action_match.group(1)
            try:
                parameters = json.loads(action_match.group(2))
                tool_result = await agent_core.execute_tool(action, parameters)
                response += f"\nTool Result: {json.dumps(tool_result, indent=2)}\n"
            except json.JSONDecodeError:
                response += "\nError: Invalid tool parameters\n"

        # Combine the streamed answer with the Cerebras response.
        cerebras_response = await cerebras_future
        final_response = f"{response}\n\nAdditional Insights:\n{cerebras_response}"

        agent_core.update_context(user_input, final_response)

        compute_time = time.time() - start_time
        # Rough whitespace-token count, not the providers' billed usage.
        token_usage = len(user_input.split()) + len(final_response.split())
        return (final_response, chain_of_thought,
                f"Compute Time: {compute_time:.2f}s", f"Tokens: {token_usage}")
    except Exception as e:
        logging.error(f"Error in chat_with_agent: {e}", exc_info=True)
        return f"Error: {e}", "", "Error occurred", ""


def create_interface():
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        agent_core = AgentCore()
        gr.Markdown("# 🌟 OmniAgent: Advanced AI Assistant")

        with gr.Row():
            with gr.Column(scale=6):
                chat_history = gr.Chatbot(label="Interaction History", height=600, show_label=True)
            with gr.Column(scale=2):
                with gr.Accordion("Performance Metrics", open=True):
                    compute_time = gr.Textbox(label="Processing Time", interactive=False)
                    token_usage_display = gr.Textbox(label="Resource Usage", interactive=False)
                with gr.Accordion("Agent Insights", open=True):
                    chain_of_thought_display = gr.Textbox(label="Reasoning Process", interactive=False, lines=10)

        user_input = gr.Textbox(label="Your Request", placeholder="How can I assist you today?", lines=3)
        send_button = gr.Button("Send", variant="primary")
        clear_button = gr.Button("Clear History", variant="secondary")
        export_button = gr.Button("Export Chat", variant="secondary")

        async def handle_chat(chat_history, user_input):
            if not user_input.strip():
                return chat_history, "", "", ""
            ai_response, chain_of_thought, compute_info, token_usage = await chat_with_agent(
                user_input, chat_history, agent_core
            )
            chat_history.append((user_input, ai_response))
            return chat_history, chain_of_thought, compute_info, token_usage

        def clear_chat():
            agent_core.context_window.clear()
            return [], "", "", ""

        def export_chat(chat_history):
            # The status message is surfaced in the Processing Time box.
            if not chat_history:
                return "No chat history to export.", ""
            filename = f"omniagent_chat_{int(time.time())}.txt"
            chat_text = "\n".join(f"User: {user}\nAI: {ai}\n" for user, ai in chat_history)
            with open(filename, "w") as file:
                file.write(chat_text)
            return f"Chat exported to {filename}", ""

        # Event handlers
        chat_outputs = [chat_history, chain_of_thought_display, compute_time, token_usage_display]
        send_button.click(handle_chat, inputs=[chat_history, user_input], outputs=chat_outputs)
        user_input.submit(handle_chat, inputs=[chat_history, user_input], outputs=chat_outputs)
        clear_button.click(clear_chat, outputs=chat_outputs)
        export_button.click(export_chat, inputs=[chat_history], outputs=[compute_time, chain_of_thought_display])

        gr.Markdown(
            "### 🚀 Advanced Capabilities:\n"
            "- Dual AI Model Processing\n"
            "- Advanced Web Content Analysis\n"
            "- Sentiment Understanding\n"
            "- Intelligent Text Summarization\n"
            "- Context-Aware Responses\n"
            "- Enhanced Error Handling\n"
            "- Detailed Performance Tracking\n"
            "- Comprehensive Logging"
        )
    return demo


if __name__ == "__main__":
    demo = create_interface()
    demo.launch(share=True)