# Spaces: Runtime error
| import gradio as gr | |
| import os | |
| import time | |
| import asyncio | |
| from cerebras.cloud.sdk import Cerebras | |
| from groq import Groq | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from urllib.parse import urljoin, urlparse | |
| import re | |
| import json | |
| import numpy as np | |
| from datetime import datetime | |
| import logging | |
| import aiohttp | |
# Enhanced API setup: both provider keys are mandatory, so importing this
# module fails fast when either one is missing.
CEREBRAS_API_KEY = os.environ.get("CEREBRAS_API_KEY")
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")

if not (CEREBRAS_API_KEY and GROQ_API_KEY):
    raise ValueError("Both CEREBRAS_API_KEY and GROQ_API_KEY environment variables must be set.")

# One shared client per provider for the whole process.
cerebras_client = Cerebras(api_key=CEREBRAS_API_KEY)
groq_client = Groq(api_key=GROQ_API_KEY)

# Logging: INFO and above is written to agent.log with a timestamped format.
logging.basicConfig(
    filename='agent.log',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
class EnhancedToolkit:
    """Stateless helpers for fetching web pages and lightweight text analysis.

    Every helper is a ``@staticmethod`` so it can be called on the class
    (``EnhancedToolkit.validate_url(...)``) as well as on an instance
    (``toolkit.validate_url(...)``); without the decorator an instance call
    would pass the instance as the first argument and raise ``TypeError``.
    """

    # Small keyword lexicons used by analyze_sentiment.
    _POSITIVE_WORDS = frozenset(
        {'good', 'great', 'excellent', 'positive', 'amazing', 'wonderful'}
    )
    _NEGATIVE_WORDS = frozenset(
        {'bad', 'poor', 'negative', 'terrible', 'awful', 'horrible'}
    )
    # Word tokenizer: strips punctuation so "good." still counts as "good".
    _WORD_RE = re.compile(r"[a-z']+")

    @staticmethod
    async def fetch_webpage_async(url, timeout=10):
        """Download ``url`` and return its body as text.

        Args:
            url: Address to fetch.
            timeout: Total time budget for the request, in seconds.

        Returns:
            The response text on HTTP 200. Otherwise a string starting with
            ``"Error: HTTP "`` (non-200 status) or ``"Error fetching URL: "``
            (any exception); failures are reported as strings, never raised.
        """
        try:
            # aiohttp expects a ClientTimeout object rather than a bare number.
            client_timeout = aiohttp.ClientTimeout(total=timeout)
            async with aiohttp.ClientSession(timeout=client_timeout) as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        return await response.text()
                    return f"Error: HTTP {response.status}"
        except Exception as e:
            # Deliberately broad: callers rely on the error-as-string contract.
            # Some exceptions (e.g. timeouts) have an empty message, so fall
            # back to the exception type name.
            detail = str(e) or type(e).__name__
            return f"Error fetching URL: {detail}"

    @staticmethod
    def extract_text_from_html(html):
        """Return the visible text of ``html`` with whitespace normalized."""
        soup = BeautifulSoup(html, 'html.parser')
        # Script and style bodies are not human-readable content.
        for tag in soup(["script", "style"]):
            tag.decompose()
        text = soup.get_text(separator=' ', strip=True)
        # Collapse every run of whitespace into a single space.
        return ' '.join(text.split())

    @staticmethod
    def validate_url(url):
        """Return True when ``url`` is a string with both a scheme and a host."""
        if not isinstance(url, str):
            return False
        try:
            parsed = urlparse(url)
        except ValueError:
            # urlparse rejects some malformed inputs (e.g. broken IPv6 hosts).
            return False
        return bool(parsed.scheme and parsed.netloc)

    @staticmethod
    def summarize_text(text, max_length=500):
        """Build a short extractive summary of ``text``.

        Sentences (split on ``'. '``) are scored by word count weighted by
        ``1 / position``; the three best are kept and emitted in their
        original order so the summary still reads naturally.

        Args:
            text: Text to summarize. Empty/None yields ``''``.
            max_length: Maximum number of characters returned; ``None``
                disables the limit.

        Returns:
            ``text`` itself when it has three sentences or fewer, otherwise
            the three top-scoring sentences; clipped to ``max_length``.
        """
        if not text:
            return ''

        def clip(value):
            # Enforce the caller's character budget.
            if max_length is None or len(value) <= max_length:
                return value
            return value[:max(max_length, 0)].rstrip()

        sentences = [part.strip() for part in text.split('. ') if part.strip()]
        if len(sentences) <= 3:
            return clip(text)

        # Rank by importance; sorted() is stable, so ties favor earlier sentences.
        ranked = sorted(
            range(len(sentences)),
            key=lambda i: len(sentences[i].split()) / (i + 1),
            reverse=True,
        )
        parts = []
        for index in sorted(ranked[:3]):  # restore document order
            # The final sentence may still carry its period; avoid "..".
            sentence = sentences[index].rstrip('.').rstrip()
            if not sentence:
                continue
            parts.append(sentence if sentence.endswith(('!', '?')) else sentence + '.')
        return clip(' '.join(parts))

    @staticmethod
    def analyze_sentiment(text):
        """Classify ``text`` as 'positive', 'negative' or 'neutral'.

        Counts case-insensitive occurrences of a few positive and negative
        keywords; whichever count is larger wins, ties (and empty input)
        are 'neutral'.
        """
        if not text:
            return 'neutral'
        words = EnhancedToolkit._WORD_RE.findall(text.lower())
        pos_count = sum(1 for word in words if word in EnhancedToolkit._POSITIVE_WORDS)
        neg_count = sum(1 for word in words if word in EnhancedToolkit._NEGATIVE_WORDS)
        if pos_count > neg_count:
            return 'positive'
        if neg_count > pos_count:
            return 'negative'
        return 'neutral'
class AgentCore:
    """Per-agent state: the toolkit, a per-turn tool budget and recent history.

    Attributes:
        toolkit: ``EnhancedToolkit`` instance used by the tools.
        tool_execution_count: Tools run so far in the current turn.
        max_tools_per_turn: Upper bound on tool runs per turn.
        context_window: List of recent exchanges (oldest first).
        max_context_items: Maximum number of exchanges kept.
    """

    # Exact prefixes of the error strings returned by
    # EnhancedToolkit.fetch_webpage_async; matching these (rather than any
    # text starting with "Error") keeps real pages from being misclassified.
    _FETCH_ERROR_PREFIXES = ("Error: HTTP", "Error fetching URL:")

    def __init__(self):
        self.toolkit = EnhancedToolkit()
        self.tool_execution_count = 0
        self.max_tools_per_turn = 5
        self.context_window = []
        self.max_context_items = 10

    def update_context(self, user_input, ai_response):
        """Record one exchange and drop the oldest ones beyond the limit."""
        self.context_window.append({
            'user_input': user_input,
            'ai_response': ai_response,
            'timestamp': datetime.now().isoformat()
        })
        excess = len(self.context_window) - self.max_context_items
        if excess > 0:
            del self.context_window[:excess]

    async def execute_tool(self, action, parameters):
        """Run the tool named ``action`` with ``parameters``.

        Args:
            action: ``"scrape"``, ``"search"`` or ``"analyze"``.
            parameters: Dict of tool arguments. Anything that is not a dict
                (e.g. ``None``) is treated as "no arguments" because the
                value originates from model-generated JSON.

        Returns:
            A dict for successful ``scrape``/``analyze`` runs, otherwise a
            human-readable status or error string. Every call, including one
            for an unknown tool, counts against ``max_tools_per_turn``.
        """
        if self.tool_execution_count >= self.max_tools_per_turn:
            return "Tool usage limit reached for this turn."
        self.tool_execution_count += 1

        if not isinstance(parameters, dict):
            parameters = {}

        if action == "scrape":
            return await self._scrape(parameters)
        if action == "search":
            return self._search(parameters)
        if action == "analyze":
            return self._analyze(parameters)
        return f"Unknown tool: {action}"

    async def _scrape(self, parameters):
        """Fetch a page and return its summary, sentiment and a text preview."""
        url = parameters.get("url")
        if not self.toolkit.validate_url(url):
            return "Invalid URL provided."
        html_content = await self.toolkit.fetch_webpage_async(url)
        if html_content.startswith(self._FETCH_ERROR_PREFIXES):
            return html_content
        text_content = self.toolkit.extract_text_from_html(html_content)
        # Only the first 1000 characters are returned as a preview.
        if len(text_content) > 1000:
            preview = text_content[:1000] + '...'
        else:
            preview = text_content
        return {
            'summary': self.toolkit.summarize_text(text_content),
            'sentiment': self.toolkit.analyze_sentiment(text_content),
            'full_text': preview
        }

    def _search(self, parameters):
        """Placeholder search tool; no real search backend is called."""
        query = parameters.get("query")
        if not query:
            return "No query provided for search"
        return f"Simulated search for: {query}\nThis would connect to a search API in production."

    def _analyze(self, parameters):
        """Return sentiment and summary for the supplied text."""
        text = parameters.get("text")
        if not text:
            return "No text provided for analysis"
        return {
            'sentiment': self.toolkit.analyze_sentiment(text),
            'summary': self.toolkit.summarize_text(text)
        }
# Both SDK calls require an explicit model. The identifiers can be overridden
# per deployment through the environment.
# NOTE(review): the defaults below are assumptions — confirm they are
# available to the Cerebras/Groq accounts in use.
_CEREBRAS_MODEL = os.getenv("CEREBRAS_MODEL", "llama3.1-8b")
_GROQ_MODEL = os.getenv("GROQ_MODEL", "llama-3.1-8b-instant")

# Header of a tool call as described in the system prompt. "Action:" and
# "Parameters:" may be separated by a comma and/or a newline; the lookahead
# leaves the match positioned on the opening brace of the JSON payload.
_TOOL_CALL_HEADER = re.compile(r"Action:\s*\w+\s*,?\s*Parameters:\s*(?=\{)")

_CHAIN_OF_THOUGHT_MARKER = "Chain of Thought:"


def _extract_tool_calls(text):
    """Return one entry per tool call in ``text``: the payload dict, or None if its JSON is invalid."""
    decoder = json.JSONDecoder()
    calls = []
    for header in _TOOL_CALL_HEADER.finditer(text):
        try:
            # raw_decode parses exactly one JSON value, so nested braces and
            # multi-line payloads are handled correctly.
            payload, _ = decoder.raw_decode(text, header.end())
        except json.JSONDecodeError:
            payload = None
        calls.append(payload)
    return calls


def _collect_groq_text(messages):
    """Blocking helper: stream a Groq chat completion and return the joined text."""
    stream = groq_client.chat.completions.create(
        model=_GROQ_MODEL,
        messages=messages,
        temperature=0.7,
        max_tokens=2048,
        stream=True
    )
    pieces = []
    for chunk in stream:
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta
        if delta and delta.content:
            pieces.append(delta.content)
    return "".join(pieces)


def _fetch_cerebras_text(prompt):
    """Blocking helper: run a Cerebras text completion and return its text."""
    completion = cerebras_client.completions.create(
        model=_CEREBRAS_MODEL,
        prompt=prompt,
        max_tokens=1000,
        temperature=0.7
    )
    return completion.choices[0].text


async def chat_with_agent(user_input, chat_history, agent_core):
    """Answer ``user_input`` using Groq (primary) and Cerebras (extra insights).

    Args:
        user_input: The user's message.
        chat_history: Accepted for interface compatibility; not used here.
        agent_core: Object providing ``tool_execution_count``,
            ``execute_tool`` and ``update_context``.

    Returns:
        A 4-tuple ``(final_response, chain_of_thought, compute_info,
        token_info)``. On failure the tuple is
        ``("Error: ...", "", "Error occurred", "")``; exceptions are logged,
        not raised.
    """
    start_time = time.time()
    try:
        # Reset tool counter for new turn
        agent_core.tool_execution_count = 0
        # Prepare context-aware prompt
        system_prompt = """You are OmniAgent, a highly advanced AI assistant with multiple capabilities:
Core Abilities:
1. Task Understanding & Planning
2. Web Information Retrieval & Analysis
3. Content Summarization & Sentiment Analysis
4. Context-Aware Problem Solving
5. Creative Solution Generation
Available Tools:
- scrape: Extract and analyze web content
- search: Find relevant information
- analyze: Process and understand text
Use format:
Action: take_action
Parameters: {"action": "tool_name", "parameters": {...}}
Approach each task with:
1. Initial analysis
2. Step-by-step planning
3. Tool utilization when needed
4. Result synthesis
5. Clear explanation
Remember to maintain a helpful, professional, yet friendly tone."""
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_input}
        ]

        # Both SDK clients are synchronous. Running them in the default
        # executor keeps the event loop responsive and lets the two requests
        # actually overlap.
        loop = asyncio.get_running_loop()
        cerebras_future = loop.run_in_executor(
            None, _fetch_cerebras_text, f"{system_prompt}\n\nUser: {user_input}"
        )
        try:
            response = await loop.run_in_executor(None, _collect_groq_text, messages)
        except Exception:
            # The primary answer failed; drop the secondary request's result.
            cerebras_future.cancel()
            raise

        # Markers are searched in the complete text: a streamed chunk is
        # usually only a few characters, so per-chunk matching misses them.
        chain_of_thought = ""
        if _CHAIN_OF_THOUGHT_MARKER in response:
            chain_of_thought = response.split(_CHAIN_OF_THOUGHT_MARKER, 1)[-1]

        # Tool execution handling
        tool_notes = []
        for payload in _extract_tool_calls(response):
            if payload is None:
                tool_notes.append("\nError: Invalid tool parameters\n")
                continue
            tool_result = await agent_core.execute_tool(
                payload.get("action"),
                payload.get("parameters", {})
            )
            tool_notes.append(f"\nTool Result: {json.dumps(tool_result, indent=2)}\n")
        response += "".join(tool_notes)

        # Integrate Cerebras response. It is supplementary, so a failure
        # there must not discard the Groq answer.
        try:
            cerebras_response = await cerebras_future
        except Exception:
            logging.warning("Cerebras request failed; returning Groq response only", exc_info=True)
            final_response = response
        else:
            final_response = f"{response}\n\nAdditional Insights:\n{cerebras_response}"

        # Update context
        agent_core.update_context(user_input, final_response)
        compute_time = time.time() - start_time
        # Rough estimate: whitespace-separated words, not model tokens.
        token_usage = len(user_input.split()) + len(final_response.split())
        return final_response, chain_of_thought, f"Compute Time: {compute_time:.2f}s", f"Tokens: {token_usage}"
    except Exception as e:
        # Top-level boundary for the UI: log with traceback, return an error tuple.
        logging.error(f"Error in chat_with_agent: {str(e)}", exc_info=True)
        return f"Error: {str(e)}", "", "Error occurred", ""
def create_interface():
    """Build and return the Gradio Blocks UI for OmniAgent.

    The layout has a chat panel, a side column with performance metrics and
    the agent's reasoning, a request box, and Send / Clear / Export buttons.

    Returns:
        The configured ``gr.Blocks`` application (not yet launched).
    """
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        # NOTE(review): a single AgentCore is created here and captured by
        # every handler below, so its context_window appears to be shared by
        # everyone using this interface — confirm that is intended.
        agent_core = AgentCore()
        # The heading originally carried an emoji that was corrupted into a
        # stray "π"; the garbled character has been removed.
        gr.Markdown(
            "# OmniAgent: Advanced AI Assistant\n"
            "Powered by dual AI models for enhanced capabilities and deeper understanding."
        )
        with gr.Row():
            with gr.Column(scale=6):
                chat_history = gr.Chatbot(
                    label="Interaction History",
                    height=600,
                    show_label=True
                )
            with gr.Column(scale=2):
                with gr.Accordion("Performance Metrics", open=True):
                    compute_time = gr.Textbox(label="Processing Time", interactive=False)
                    token_usage_display = gr.Textbox(label="Resource Usage", interactive=False)
                with gr.Accordion("Agent Insights", open=True):
                    chain_of_thought_display = gr.Textbox(
                        label="Reasoning Process",
                        interactive=False,
                        lines=10
                    )
        user_input = gr.Textbox(
            label="Your Request",
            placeholder="How can I assist you today?",
            lines=3
        )
        with gr.Row():
            send_button = gr.Button("Send", variant="primary")
            clear_button = gr.Button("Clear History", variant="secondary")
            export_button = gr.Button("Export Chat", variant="secondary")

        async def handle_chat(chat_history, user_input):
            """Send the request to the agent and append the exchange to the history."""
            # Either value can arrive as None (e.g. an untouched component).
            chat_history = chat_history or []
            if not user_input or not user_input.strip():
                return chat_history, "", "", ""
            ai_response, chain_of_thought, compute_info, token_usage = await chat_with_agent(
                user_input,
                chat_history,
                agent_core
            )
            chat_history.append((user_input, ai_response))
            return chat_history, chain_of_thought, compute_info, token_usage

        def clear_chat():
            """Forget the agent's stored context and blank every output panel."""
            agent_core.context_window.clear()
            return [], "", "", ""

        def export_chat(chat_history):
            """Write the conversation to a timestamped text file; return a status line."""
            if not chat_history:
                return "No chat history to export.", ""
            filename = f"omnigent_chat_{int(time.time())}.txt"
            chat_text = "\n".join(
                f"User: {item[0]}\nAI: {item[1]}\n"
                for item in chat_history
            )
            # Explicit UTF-8: replies may contain characters that the
            # platform's default encoding cannot represent.
            with open(filename, "w", encoding="utf-8") as file:
                file.write(chat_text)
            return f"Chat exported to {filename}", ""

        # Event handlers. The Send button and pressing Enter in the request
        # box share the same handler and outputs.
        chat_outputs = [chat_history, chain_of_thought_display, compute_time, token_usage_display]
        send_button.click(
            handle_chat,
            inputs=[chat_history, user_input],
            outputs=chat_outputs
        )
        clear_button.click(
            clear_chat,
            outputs=chat_outputs
        )
        # The export status is shown in the "Processing Time" box.
        export_button.click(
            export_chat,
            inputs=[chat_history],
            outputs=[compute_time, chain_of_thought_display]
        )
        user_input.submit(
            handle_chat,
            inputs=[chat_history, user_input],
            outputs=chat_outputs
        )
        gr.Markdown(
            "### Advanced Capabilities:\n"
            "- Dual AI Model Processing\n"
            "- Advanced Web Content Analysis\n"
            "- Sentiment Understanding\n"
            "- Intelligent Text Summarization\n"
            "- Context-Aware Responses\n"
            "- Enhanced Error Handling\n"
            "- Detailed Performance Tracking\n"
            "- Comprehensive Logging\n"
        )
    return demo
if __name__ == "__main__":
    # Script entry point: build the interface, then start serving it with
    # sharing enabled.
    demo = create_interface()
    demo.launch(share=True)