# Source of a Hugging Face Space; the Space page reported "Runtime error" when this file was captured.
import os | |
import json | |
import logging | |
import asyncio | |
import time | |
from datetime import datetime | |
from enum import Enum | |
from typing import Dict, List, Optional, Set, Union, Any | |
from dataclasses import dataclass, field | |
from pathlib import Path | |
import hashlib | |
import tempfile | |
import shutil | |
import gradio as gr | |
import networkx as nx | |
from langchain.prompts import PromptTemplate | |
import torch | |
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline | |
from langchain.llms import HuggingFacePipeline | |
from langchain.agents import initialize_agent, Tool | |
import subprocess | |
import asyncio | |
# Configure logging | |
# Root logging setup: INFO level, each record rendered as "time - level - message".
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Module-level logger used by the functions and classes in this file.
logger = logging.getLogger(__name__)
# Load the LLM and tokenizer | |
def load_model():
    """Load the tokenizer and causal language model.

    Returns:
        A ``(tokenizer, model)`` tuple for the ``"gpt2"`` checkpoint.

    Raises:
        Exception: Any failure while importing or loading is logged and
            re-raised unchanged.
    """
    # Using a smaller model for testing
    checkpoint = "gpt2"
    try:
        from transformers import AutoModelForCausalLM, AutoTokenizer

        loaded_tokenizer = AutoTokenizer.from_pretrained(checkpoint)
        loaded_model = AutoModelForCausalLM.from_pretrained(checkpoint)
    except Exception as e:
        logger.error(f"Failed to load model: {str(e)}")
        raise
    return loaded_tokenizer, loaded_model
# Initialize models lazily | |
# All four handles start as None; get_llm() fills them in on its first call.
tokenizer = model = hf_pipeline = llm = None
def get_llm():
    """Return the shared LangChain LLM, building it on first use.

    On the first call the model and tokenizer are loaded, wrapped in a
    ``text-generation`` pipeline and stored in the module-level globals;
    later calls return the cached ``llm``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    global llm, tokenizer, model, hf_pipeline
    try:
        if llm is not None:
            return llm
        tokenizer, model = load_model()
        hf_pipeline = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            max_length=500,
            temperature=0.7,
        )
        llm = HuggingFacePipeline(pipeline=hf_pipeline)
        return llm
    except Exception as e:
        logger.error(f"Failed to get LLM: {str(e)}")
        raise
def get_agent(agent_type):
    """Build a zero-shot ReAct agent backed by the shared LLM.

    Args:
        agent_type: Accepted for the caller's convenience; it is not read, so
            every agent type gets the same three tools and the same LLM.

    Returns:
        The agent object returned by ``initialize_agent``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        language_model = get_llm()

        def format_with_black(text):
            # Pipe the code through `black -` and hand back whatever it prints.
            completed = subprocess.run(["black", "-"], input=text.encode(), capture_output=True)
            return completed.stdout.decode()

        def describe_api(text):
            # Fixed placeholder payload; the input is ignored.
            return json.dumps({"endpoints": {"example": "POST - Example endpoint."}})

        def decompose_tasks(text):
            # Fixed placeholder payload; the input is ignored.
            return json.dumps({"tasks": ["Design UI", "Develop Backend", "Test App", "Deploy App"]})

        toolbox = [
            Tool(
                name="Code Formatter",
                func=format_with_black,
                description="Formats code using Black.",
            ),
            Tool(
                name="API Generator",
                func=describe_api,
                description="Generates API details from code.",
            ),
            Tool(
                name="Task Decomposer",
                func=decompose_tasks,
                description="Breaks down app requirements into smaller tasks.",
            ),
        ]
        return initialize_agent(
            tools=toolbox,
            llm=language_model,
            agent="zero-shot-react-description",
            verbose=True,
        )
    except Exception as e:
        logger.error(f"Failed to get agent: {str(e)}")
        raise
# Enhanced prompt templates with more specific instructions | |
# Prompt for the UI designer role; the task text is substituted for {input}.
ui_designer_prompt = PromptTemplate(
    template="""You are an expert UI Designer specializing in modern, responsive web applications.
Task: {input}
Focus on:
1. Clean, intuitive user interface
2. Responsive design principles
3. Modern UI components
4. Accessibility standards
5. Cross-browser compatibility
Generate code using:
- HTML5 semantic elements
- Modern CSS (Flexbox/Grid)
- React/Vue.js best practices
- Material UI or Tailwind CSS
Provide detailed component structure and styling.""",
    input_variables=["input"],
)
# Prompt for the backend developer role; the task text is substituted for {input}.
backend_developer_prompt = PromptTemplate(
    template="""You are an expert Backend Developer specializing in scalable applications.
Task: {input}
Focus on:
1. RESTful API design
2. Database schema optimization
3. Security best practices
4. Error handling
5. Performance optimization
Include:
- API endpoint definitions
- Database models
- Authentication/Authorization
- Input validation
- Error handling middleware
- Rate limiting
- Logging
Use modern backend frameworks (FastAPI/Django/Express).""",
    input_variables=["input"],
)
# Prompt for the QA engineer role; the task text is substituted for {input}.
qa_engineer_prompt = PromptTemplate(
    template="""You are an expert QA Engineer focusing on comprehensive testing.
Task: {input}
Implement:
1. Unit tests
2. Integration tests
3. API endpoint tests
4. UI component tests
5. Performance tests
Include:
- Test cases for edge cases
- Input validation tests
- Error handling tests
- Load testing scenarios
- Security testing checks""",
    input_variables=["input"],
)
# Prompt for the DevOps engineer role; the task text is substituted for {input}.
devops_engineer_prompt = PromptTemplate(
    template="""You are an expert DevOps Engineer specializing in modern deployment practices.
Task: {input}
Provide:
1. Dockerfile configuration
2. Docker Compose setup
3. CI/CD pipeline configuration
4. Environment configuration
5. Monitoring setup
Include:
- Development/Production configs
- Environment variables
- Health checks
- Logging setup
- Monitoring integration
- Backup strategies""",
    input_variables=["input"],
)
def generate_project_structure(app_name, features):
    """Render the fixed project directory tree as text.

    Args:
        app_name: Name printed as the root directory of the tree.
        features: Not read by this function; the tree is the same for any value.

    Returns:
        A multi-line string that starts with a newline followed by
        ``<app_name>/`` and the static frontend/backend/tests/docs layout.
    """
    layout = f"""
{app_name}/
├── frontend/
│ ├── src/
│ │ ├── components/
│ │ ├── pages/
│ │ ├── hooks/
│ │ ├── utils/
│ │ └── styles/
│ ├── package.json
│ └── README.md
├── backend/
│ ├── src/
│ │ ├── routes/
│ │ ├── controllers/
│ │ ├── models/
│ │ ├── middleware/
│ │ └── utils/
│ ├── requirements.txt
│ └── README.md
├── tests/
│ ├── unit/
│ ├── integration/
│ └── e2e/
├── docs/
│ ├── API.md
│ ├── SETUP.md
│ └── DEPLOYMENT.md
├── docker-compose.yml
├── .env.example
└── README.md
"""
    return layout
def generate_documentation(app_name, features, api_details):
    """Render the README-style documentation for a generated app.

    Args:
        app_name: Used as the document title and in the ``cd`` instruction.
        features: Inserted verbatim under the "Overview" heading.
        api_details: Inserted verbatim under the "API Documentation" heading.

    Returns:
        The documentation as a single Markdown string.
    """
    readme = f"""
# {app_name}
## Overview
A modern web application with the following features:
{features}
## Quick Start
```bash
# Clone the repository
git clone <repository-url>
# Install dependencies
cd {app_name}
# Frontend
cd frontend && npm install
# Backend
cd ../backend && pip install -r requirements.txt
# Run the application
docker-compose up
```
## API Documentation
{api_details}
## Development
- Frontend: React.js with TypeScript
- Backend: Python with FastAPI
- Database: PostgreSQL
- Cache: Redis
- Testing: Jest, Pytest
## Deployment
Includes Docker configuration for easy deployment:
- Frontend container
- Backend container
- Database container
- Redis container
## Testing
```bash
# Run frontend tests
cd frontend && npm test
# Run backend tests
cd backend && pytest
```
## Contributing
Please read CONTRIBUTING.md for details on our code of conduct and the process for submitting pull requests.
## License
This project is licensed under the MIT License - see the LICENSE.md file for details
"""
    return readme
# AI Flow States and Types | |
class FlowState(Enum):
    """Lifecycle state of one agent within a flow run."""

    # Not started yet.
    PENDING = "pending"
    # Currently executing.
    RUNNING = "running"
    # Terminal states.
    COMPLETED = "completed"
    FAILED = "failed"
class AgentRole(Enum):
    """Roles an agent can take in the flow; each value is the role's snake_case name."""

    # Design roles
    ARCHITECT = "architect"
    UI_DESIGNER = "ui_designer"
    # Implementation roles
    BACKEND_DEVELOPER = "backend_developer"
    DATABASE_ENGINEER = "database_engineer"
    # Review, delivery and documentation roles
    SECURITY_EXPERT = "security_expert"
    QA_ENGINEER = "qa_engineer"
    DEVOPS_ENGINEER = "devops_engineer"
    DOCUMENTATION_WRITER = "documentation_writer"
@dataclass
class AgentContext:
    """Context information for each agent in the flow.

    The class is a dataclass so that it can be built with keyword arguments
    (``AgentContext(role=..., state=..., ...)``), which is how the flows in
    this file construct it. Without the decorator the class has no
    ``__init__`` accepting those keywords and construction raises TypeError.

    The project enum types are written as string annotations so this
    definition does not depend on where the enums are declared.
    """

    # Role this context belongs to.
    role: "AgentRole"
    # Current lifecycle state of the agent.
    state: "FlowState"
    # Named outputs produced by the agent; a fresh dict per instance.
    artifacts: Dict[str, str] = field(default_factory=dict)
    # Roles associated with this agent by the flow; a fresh list per instance.
    dependencies: List["AgentRole"] = field(default_factory=list)
    # Free-form feedback messages; a fresh list per instance.
    feedback: List[str] = field(default_factory=list)
class AIFlow:
    """Manages the flow of work between different AI agents.

    The flow is a directed graph whose edges point from a role to the roles
    that run after it. Results are stored in a ``ContextManager`` memory
    store under ``"<role value>_result"`` and compiled into Markdown.
    """

    # Role-specific instruction appended to every prompt.
    _ROLE_INSTRUCTIONS = {
        AgentRole.ARCHITECT: "\nAs the Architect, design the high-level system architecture.",
        AgentRole.UI_DESIGNER: "\nAs the UI Designer, create the user interface design.",
        AgentRole.BACKEND_DEVELOPER: "\nAs the Backend Developer, implement the server-side logic.",
        AgentRole.DATABASE_ENGINEER: "\nAs the Database Engineer, design the data model and storage.",
        AgentRole.SECURITY_EXPERT: "\nAs the Security Expert, ensure security best practices.",
        AgentRole.QA_ENGINEER: "\nAs the QA Engineer, create test cases and validation.",
        AgentRole.DEVOPS_ENGINEER: "\nAs the DevOps Engineer, set up deployment and CI/CD.",
        AgentRole.DOCUMENTATION_WRITER: "\nAs the Documentation Writer, create comprehensive documentation.",
    }

    def __init__(self):
        self.flow_graph = nx.DiGraph()
        self.contexts: Dict[AgentRole, AgentContext] = {}
        self.global_context = {}
        # The path, prompt and compile helpers all go through a context
        # manager; create it here so the base class works without a subclass
        # having to provide one.
        self.context_manager = ContextManager()

    def initialize_flow(self):
        """Initialize the AI Flow with agent relationships and dependencies.

        Rebuilds the graph edges and resets every role's context to PENDING.
        """
        # Each role maps to the roles that run after it.
        flow_structure = {
            AgentRole.ARCHITECT: [AgentRole.UI_DESIGNER, AgentRole.BACKEND_DEVELOPER, AgentRole.DATABASE_ENGINEER],
            AgentRole.UI_DESIGNER: [AgentRole.QA_ENGINEER],
            AgentRole.BACKEND_DEVELOPER: [AgentRole.SECURITY_EXPERT, AgentRole.QA_ENGINEER],
            AgentRole.DATABASE_ENGINEER: [AgentRole.SECURITY_EXPERT],
            AgentRole.SECURITY_EXPERT: [AgentRole.QA_ENGINEER],
            AgentRole.QA_ENGINEER: [AgentRole.DEVOPS_ENGINEER],
            AgentRole.DEVOPS_ENGINEER: [AgentRole.DOCUMENTATION_WRITER],
            AgentRole.DOCUMENTATION_WRITER: [],
        }
        # Build the flow graph
        for role, successors in flow_structure.items():
            self.flow_graph.add_node(role)
            for successor in successors:
                self.flow_graph.add_edge(role, successor)
        # Initialize context for each agent. Dependencies are the roles that
        # must run first, i.e. the graph predecessors, not the successors.
        for role in flow_structure:
            self.contexts[role] = AgentContext(
                role=role,
                state=FlowState.PENDING,
                artifacts={},
                dependencies=list(self.flow_graph.predecessors(role)),
                feedback=[],
            )

    async def execute_flow(self, requirements: str):
        """Execute the AI Flow over every architect-to-documentation path.

        Args:
            requirements: Free-text application requirements.

        Returns:
            The compiled Markdown produced by ``_compile_results``.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            self.initialize_flow()
            self.global_context["requirements"] = requirements
            # _generate_prompt reads the requirements from the context
            # manager, so they have to be stored there as well.
            self.context_manager.global_context["requirements"] = requirements
            # Get all paths through the flow graph
            paths = list(nx.all_simple_paths(
                self.flow_graph,
                AgentRole.ARCHITECT,
                AgentRole.DOCUMENTATION_WRITER,
            ))
            await self._execute_paths(paths)
            return self._compile_results()
        except Exception as e:
            logger.error(f"Flow execution failed: {str(e)}")
            raise

    async def _execute_paths(self, paths: List[List[AgentRole]]):
        """Execute all paths in the flow graph.

        A role that appears on several paths is executed once; later paths
        reuse its stored output instead of invoking the agent again.

        Returns:
            The per-path results concatenated into one list.
        """
        try:
            results = []
            for path in paths:
                path_results = []
                for role in path:
                    path_results.append(await self._run_role(role, path_results))
                results.extend(path_results)
            # Store all results in context
            self.context_manager.add_memory(
                "path_results",
                results,
                {"timestamp": datetime.now()},
            )
            return results
        except Exception as e:
            logger.error(f"Failed to execute paths: {str(e)}")
            raise

    async def _run_role(self, role: AgentRole, path_results: List[str]) -> str:
        """Run one role (or reuse its completed output) and record the result."""
        context = self.contexts.get(role)
        if context is not None and context.state == FlowState.COMPLETED:
            return context.artifacts.get("output", "")
        if context is not None:
            context.state = FlowState.RUNNING
        try:
            # Get the agent's prompt based on previous results
            prompt = self._generate_prompt(role, path_results)
            result = await self._execute_agent_task(role, prompt)
        except Exception:
            if context is not None:
                context.state = FlowState.FAILED
            raise
        # Store result in context
        self.context_manager.add_memory(
            f"{role.value}_result",
            result,
            {"timestamp": datetime.now()},
        )
        if context is not None:
            context.artifacts["output"] = result
            context.state = FlowState.COMPLETED
        return result

    def _generate_prompt(self, role: AgentRole, previous_results: List[str]) -> str:
        """Generate a prompt for an agent based on previous results."""
        requirements = self.context_manager.global_context.get(
            "requirements", self.global_context.get("requirements", "")
        )
        # Base prompt with requirements
        sections = [f"Requirements: {requirements}\n\n"]
        # Add context from previous results
        if previous_results:
            sections.append("Previous work:\n")
            sections.extend(
                f"{index}. {result}\n" for index, result in enumerate(previous_results, start=1)
            )
        # Add role-specific instructions (nothing is added for an unknown role)
        sections.append(self._ROLE_INSTRUCTIONS.get(role, ""))
        return "".join(sections)

    def _compile_results(self) -> str:
        """Compile all stored role results into one Markdown string."""
        try:
            compiled = []
            for role in AgentRole:
                # get_memory returns the stored value itself (or None).
                result = self.context_manager.get_memory(f"{role.value}_result")
                if result:
                    compiled.append(f"## {role.value}\n{result}\n")
            return "\n".join(compiled)
        except Exception as e:
            logger.error(f"Failed to compile results: {str(e)}")
            raise

    async def _execute_agent_task(self, role: AgentRole, prompt: str) -> str:
        """Execute a specific agent's task with the given prompt.

        Raises:
            ValueError: If ``role`` is not an ``AgentRole`` member.
            Exception: Agent failures are logged and re-raised unchanged.
        """
        # Validate before the try block: the error handler below formats
        # role.value, which only exists on AgentRole members.
        if not isinstance(role, AgentRole):
            raise ValueError(f"Unknown agent role: {role}")
        try:
            # Every role's value is exactly the agent type string.
            agent = get_agent(role.value)
            # NOTE(review): agent.run is a blocking call inside a coroutine.
            result = agent.run(prompt)
            logger.info(f"Agent {role.value} completed task")
            return result
        except Exception as e:
            logger.error(f"Agent {role.value} failed: {str(e)}")
            raise
@dataclass
class FileContext:
    """Context for file operations and tracking.

    A dataclass so that ``from_path`` can build instances with keyword
    arguments; ``from_path`` is a classmethod so it can be called as
    ``FileContext.from_path(path)``.
    """

    # Location of the file.
    path: Path
    # Text content read from the file.
    content: str
    # Modification time taken from the file's stat record.
    last_modified: datetime
    # Related files; from_path starts with an empty set.
    dependencies: Set[Path]
    # MD5 hex digest of the content (change detection, not security).
    checksum: str

    @classmethod
    def from_path(cls, path: Path) -> "FileContext":
        """Build a context by reading ``path`` from disk.

        Args:
            path: File to read.

        Returns:
            A ``FileContext`` with the file's text, modification time, an
            empty dependency set and the MD5 digest of the text.

        Raises:
            OSError: If the file cannot be read or stat'ed.
        """
        content = path.read_text()
        return cls(
            path=path,
            content=content,
            last_modified=datetime.fromtimestamp(path.stat().st_mtime),
            dependencies=set(),
            checksum=hashlib.md5(content.encode()).hexdigest(),
        )
@dataclass
class MemoryItem:
    """Represents a single memory item in the system.

    A dataclass so that items can be created with keyword arguments and so
    that ``field(default_factory=set)`` gives every item its own set instead
    of leaving a raw ``Field`` object on the class.
    """

    # Lookup key of the item.
    key: str
    # Stored payload.
    value: Any
    # Caller-supplied metadata.
    context: dict
    # Time associated with the item.
    timestamp: datetime
    # Relative weight; defaults to 1.0.
    importance: float = 1.0
    # Keys of other items this one refers to.
    references: Set[str] = field(default_factory=set)
class ContextManager:
    """Manages real-time context awareness across the system.

    Holds four in-memory stores: per-file contexts, a global key/value
    context, a command history list and a keyed memory store.
    """

    def __init__(self):
        self.file_contexts: Dict[Path, FileContext] = {}
        self.global_context: Dict[str, Any] = {}
        self.command_history: List[Dict] = []
        self.memory_store: Dict[str, MemoryItem] = {}

    def update_file_context(self, path: Path) -> FileContext:
        """Re-read ``path`` from disk, store its context and return it."""
        refreshed = FileContext.from_path(path)
        self.file_contexts[path] = refreshed
        return refreshed

    def get_related_files(self, path: Path) -> Set[Path]:
        """Return the dependency set recorded for ``path``.

        The file's context is loaded first when it has not been seen yet.
        """
        if path not in self.file_contexts:
            self.update_file_context(path)
        return self.file_contexts[path].dependencies

    def track_command(self, command: str, args: List[str], result: Any):
        """Append a record of an executed command to the history."""
        record = {
            'command': command,
            'args': args,
            'result': result,
            'timestamp': datetime.now(),
        }
        self.command_history.append(record)

    def add_memory(self, key: str, value: Any, context: dict = None):
        """Store ``value`` under ``key``, replacing any existing item."""
        entry = MemoryItem(
            key=key,
            value=value,
            context=context or {},
            timestamp=datetime.now(),
        )
        self.memory_store[key] = entry

    def get_memory(self, key: str) -> Any:
        """Return the value stored under ``key``, or None when absent."""
        entry = self.memory_store.get(key)
        if not entry:
            return None
        return entry.value
class FileOperationManager:
    """Manages multi-file operations and tracking."""

    def __init__(self, context_manager: "ContextManager"):
        self.context_manager = context_manager
        self.pending_changes: Dict[Path, str] = {}

    async def edit_files(self, changes: Dict[Path, str]):
        """Apply changes to multiple files as one all-or-nothing operation.

        Every change is validated before anything is written. While writing,
        the previous content of each touched file is kept; if any step fails,
        the files already written are restored (or removed when they did not
        exist before) and the error is re-raised.

        Args:
            changes: Mapping of file path to the new text content.

        Raises:
            ValueError: If a change fails validation.
            Exception: Any other failure is logged and re-raised unchanged.
        """
        # Previous bytes per written file; None marks a file that did not exist.
        snapshots: Dict[Path, Optional[bytes]] = {}
        try:
            # Validate all changes first
            for path, content in changes.items():
                if not self._validate_change(path, content):
                    raise ValueError(f"Invalid change for {path}")
            # Apply changes, remembering what each file looked like before
            for path, content in changes.items():
                snapshots[path] = path.read_bytes() if path.exists() else None
                await self._apply_change(path, content)
            # Update contexts
            for path in changes:
                self.context_manager.update_file_context(path)
        except Exception as e:
            self._rollback(snapshots)
            logger.error(f"Failed to apply multi-file changes: {str(e)}")
            raise

    def _rollback(self, snapshots: Dict[Path, Optional[bytes]]):
        """Restore every file in ``snapshots`` to its recorded previous state."""
        for path, previous in snapshots.items():
            try:
                if previous is None:
                    if path.exists():
                        path.unlink()
                else:
                    path.write_bytes(previous)
            except OSError as rollback_error:
                # Keep restoring the remaining files; report what could not be undone.
                logger.error(f"Rollback failed for {path}: {str(rollback_error)}")

    def _validate_change(self, path: Path, content: str) -> bool:
        """Validate a proposed file change.

        Creates the parent directory when it is missing and, for ``.py``
        files, checks that the content compiles.

        Returns:
            True when the change is acceptable, False otherwise (the reason
            is logged).
        """
        try:
            # Check file exists or can be created
            if not path.parent.exists():
                path.parent.mkdir(parents=True, exist_ok=True)
            # Validate syntax if it's a Python file
            if path.suffix == '.py':
                compile(content, str(path), 'exec')
            return True
        except Exception as e:
            logger.error(f"Validation failed for {path}: {str(e)}")
            return False

    async def _apply_change(self, path: Path, content: str):
        """Apply a single file change by overwriting ``path`` with ``content``."""
        path.write_text(content)
class CommandManager:
    """Manages command suggestions and execution."""

    def __init__(self, context_manager: "ContextManager"):
        self.context_manager = context_manager
        self.command_templates: Dict[str, str] = {}

    def suggest_commands(self, context: dict) -> List[Dict]:
        """Suggest relevant commands based on context.

        Returns:
            One dict per relevant template with ``command``, ``template`` and
            ``confidence`` keys, ordered by descending confidence.
        """
        suggestions = [
            {
                'command': cmd_name,
                'template': template,
                'confidence': self._calculate_confidence(cmd_name, context),
            }
            for cmd_name, template in self.command_templates.items()
            if self._is_relevant(cmd_name, context)
        ]
        return sorted(suggestions, key=lambda x: x['confidence'], reverse=True)

    async def execute_command(self, command: str, args: List[str]) -> Any:
        """Execute a command and track its result.

        Args:
            command: Program to run.
            args: Arguments passed to the program.

        Returns:
            The value produced by ``_run_command``.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            # Execute the command
            result = await self._run_command(command, args)
            # Track the execution
            self.context_manager.track_command(command, args, result)
            return result
        except Exception as e:
            logger.error(f"Command execution failed: {str(e)}")
            raise

    async def _run_command(self, command: str, args: List[str]) -> Dict[str, Any]:
        """Run ``command`` with ``args`` as a subprocess and capture its output.

        The program is started directly from the argument list, without a
        shell, so the arguments are not interpreted by one.

        Returns:
            A dict with ``returncode`` (int), ``stdout`` and ``stderr``
            (decoded text; undecodable bytes are replaced).
        """
        process = await asyncio.create_subprocess_exec(
            command,
            *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        return {
            'returncode': process.returncode,
            'stdout': stdout.decode(errors="replace"),
            'stderr': stderr.decode(errors="replace"),
        }

    def _is_relevant(self, cmd_name: str, context: dict) -> bool:
        """Determine if a command is relevant to the current context."""
        # Implementation depends on specific rules; every command qualifies for now.
        return True

    def _calculate_confidence(self, cmd_name: str, context: dict) -> float:
        """Calculate confidence score for a command suggestion."""
        # Implementation depends on specific metrics; a constant score for now.
        return 1.0
class RuleSystem:
    """Manages system rules and constraints.

    Rules and constraints are named callables that take a context dict; the
    two registries are kept separate and evaluated independently.
    """

    def __init__(self):
        self.rules: Dict[str, callable] = {}
        self.constraints: Dict[str, callable] = {}

    def add_rule(self, name: str, rule_func: callable):
        """Register ``rule_func`` under ``name``, replacing any previous rule."""
        self.rules[name] = rule_func

    def add_constraint(self, name: str, constraint_func: callable):
        """Register ``constraint_func`` under ``name``, replacing any previous one."""
        self.constraints[name] = constraint_func

    def evaluate_rules(self, context: dict) -> Dict[str, bool]:
        """Call every rule with ``context`` and map rule name to its result."""
        outcomes = {}
        for rule_name, rule in self.rules.items():
            outcomes[rule_name] = rule(context)
        return outcomes

    def check_constraints(self, context: dict) -> Dict[str, bool]:
        """Call every constraint with ``context`` and map name to its result."""
        outcomes = {}
        for constraint_name, constraint in self.constraints.items():
            outcomes[constraint_name] = constraint(context)
        return outcomes
class ProjectBuilder:
    """Handles autonomous creation of project files and folders."""

    def __init__(self, base_path: Path):
        self.base_path = Path(base_path)
        self.current_build = None
        self.file_manifest = []

    async def create_project(self, app_name: str, structure: dict) -> Path:
        """Create a new project with the specified structure.

        The project is written into a fresh temporary directory. The file
        manifest is restarted for every build so it only lists files of the
        current build. On failure the temporary directory is removed and the
        previous build and manifest are kept.

        Args:
            app_name: Name of the project's root directory.
            structure: Nested mapping; dict values become directories, any
                other value is written as the text content of a file.

        Returns:
            Path of the created project root.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        previous_build = self.current_build
        previous_manifest = self.file_manifest
        build_dir = None
        try:
            # Create temporary build directory
            build_dir = Path(tempfile.mkdtemp())
            self.file_manifest = []
            self.current_build = build_dir / app_name
            self.current_build.mkdir(parents=True)
            # Create project structure
            await self._create_structure(self.current_build, structure)
            return self.current_build
        except Exception as e:
            logger.error(f"Project creation failed: {str(e)}")
            # Remove the whole temporary directory, not just the project
            # folder inside it, so failed builds leave nothing behind.
            if build_dir is not None and build_dir.exists():
                shutil.rmtree(build_dir, ignore_errors=True)
            self.current_build = previous_build
            self.file_manifest = previous_manifest
            raise

    async def _create_structure(self, parent: Path, structure: dict):
        """Recursively create project structure under ``parent``."""
        for name, content in structure.items():
            path = parent / name
            if isinstance(content, dict):
                path.mkdir(exist_ok=True)
                await self._create_structure(path, content)
            else:
                path.write_text(str(content))
                self.file_manifest.append(path)
class OutputManager:
    """Manages project outputs and creates downloadable artifacts."""

    def __init__(self, project_builder: "ProjectBuilder"):
        self.project_builder = project_builder
        self.output_dir = Path(tempfile.mkdtemp())
        self.downloads = {}

    def create_download(self, app_name: str) -> str:
        """Create a downloadable zip file of the project.

        Every file in the builder's manifest is added to the archive under
        its path relative to the current build directory.

        Args:
            app_name: Prefix of the archive name (``<app_name>_<timestamp>.zip``).

        Returns:
            The archive path as a string.

        Raises:
            ValueError: If the builder has no current build.
            Exception: Any failure is logged and re-raised unchanged.
        """
        # Imported here because the module's import block does not provide ZipFile.
        from zipfile import ZipFile

        try:
            if not self.project_builder.current_build:
                raise ValueError("No project has been built yet")
            # Create zip file
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            zip_name = f"{app_name}_{timestamp}.zip"
            zip_path = self.output_dir / zip_name
            with ZipFile(zip_path, 'w') as zipf:
                for file_path in self.project_builder.file_manifest:
                    rel_path = file_path.relative_to(self.project_builder.current_build)
                    zipf.write(file_path, rel_path)
            # Store download info
            self.downloads[zip_name] = {
                'path': zip_path,
                'created_at': datetime.now(),
                'size': zip_path.stat().st_size,
            }
            return str(zip_path)
        except Exception as e:
            logger.error(f"Failed to create download: {str(e)}")
            raise
class EnhancedAIFlow(AIFlow):
    """Enhanced AI Flow with project building and output management.

    Agents run in dependency order, their outputs are written into a
    generated project, and the project is zipped for download.
    """

    # Prompt template per role. {requirements} is the user's text and
    # {dependencies} the outputs of the roles this one depends on.
    _ROLE_PROMPTS = {
        AgentRole.ARCHITECT: """You are a Software Architect. Design the high-level architecture for:
Requirements:
{requirements}
Focus on:
1. Overall system design
2. Component interactions
3. Technology stack
4. Data flow
5. Scalability considerations""",
        AgentRole.UI_DESIGNER: """You are a UI/UX Designer. Design the user interface for:
Requirements:
{requirements}
Previous Designs:
{dependencies}
Focus on:
1. User experience
2. Component layout
3. Responsive design
4. Visual hierarchy
5. Accessibility""",
        AgentRole.BACKEND_DEVELOPER: """You are a Backend Developer. Implement the server-side logic for:
Requirements:
{requirements}
Architecture:
{dependencies}
Focus on:
1. API design
2. Business logic
3. Data validation
4. Error handling
5. Performance""",
        AgentRole.DATABASE_ENGINEER: """You are a Database Engineer. Design the data layer for:
Requirements:
{requirements}
System Context:
{dependencies}
Focus on:
1. Schema design
2. Data relationships
3. Indexing strategy
4. Query optimization
5. Data integrity""",
        AgentRole.SECURITY_EXPERT: """You are a Security Expert. Review and enhance security for:
Requirements:
{requirements}
System Context:
{dependencies}
Focus on:
1. Authentication
2. Authorization
3. Data protection
4. Security best practices
5. Compliance requirements""",
        AgentRole.QA_ENGINEER: """You are a QA Engineer. Design the testing strategy for:
Requirements:
{requirements}
Implementation Details:
{dependencies}
Focus on:
1. Test coverage
2. Test automation
3. Edge cases
4. Performance testing
5. Security testing""",
        AgentRole.DEVOPS_ENGINEER: """You are a DevOps Engineer. Set up the deployment pipeline for:
Requirements:
{requirements}
System Context:
{dependencies}
Focus on:
1. CI/CD pipeline
2. Infrastructure as code
3. Monitoring
4. Scaling
5. Disaster recovery""",
        AgentRole.DOCUMENTATION_WRITER: """You are a Technical Writer. Create comprehensive documentation for:
Requirements:
{requirements}
System Details:
{dependencies}
Focus on:
1. Setup instructions
2. API documentation
3. User guides
4. Architecture overview
5. Troubleshooting guides""",
    }

    def __init__(self):
        # The base initializer creates flow_graph, contexts and global_context.
        super().__init__()
        self.project_builder = ProjectBuilder(Path(tempfile.mkdtemp()))
        self.output_manager = OutputManager(self.project_builder)
        self.context_manager = ContextManager()
        self.file_manager = FileOperationManager(self.context_manager)
        self.command_manager = CommandManager(self.context_manager)
        self.rule_system = RuleSystem()
        self.requirements = ""

    def initialize_flow(self):
        """Initialize the AI Flow with agent relationships and dependencies.

        Every role gets a fresh PENDING context; graph edges point from a
        dependency to the role that needs it.
        """
        # Create nodes for each agent role
        for role in AgentRole:
            self.flow_graph.add_node(role)
            self.contexts[role] = AgentContext(
                role=role,
                state=FlowState.PENDING,
                artifacts={},
                dependencies=[],
                feedback=[],
            )
        # Define dependencies
        dependencies = {
            AgentRole.UI_DESIGNER: [AgentRole.ARCHITECT],
            AgentRole.BACKEND_DEVELOPER: [AgentRole.ARCHITECT],
            AgentRole.DATABASE_ENGINEER: [AgentRole.ARCHITECT, AgentRole.BACKEND_DEVELOPER],
            AgentRole.SECURITY_EXPERT: [AgentRole.ARCHITECT, AgentRole.BACKEND_DEVELOPER],
            AgentRole.QA_ENGINEER: [AgentRole.UI_DESIGNER, AgentRole.BACKEND_DEVELOPER],
            AgentRole.DEVOPS_ENGINEER: [AgentRole.BACKEND_DEVELOPER, AgentRole.DATABASE_ENGINEER],
            AgentRole.DOCUMENTATION_WRITER: [AgentRole.ARCHITECT, AgentRole.UI_DESIGNER, AgentRole.BACKEND_DEVELOPER],
        }
        # Add edges based on dependencies
        for role, deps in dependencies.items():
            for dep in deps:
                self.flow_graph.add_edge(dep, role)
            self.contexts[role].dependencies.extend(deps)

    def _generate_prompt(self, role: AgentRole, previous_results: Optional[List[str]] = None) -> str:
        """Generate a prompt for an agent based on context and dependencies.

        Args:
            role: Role to build the prompt for.
            previous_results: Accepted so the signature stays compatible with
                the base class; not used here because dependency outputs are
                read from the role contexts.

        Returns:
            The role's template filled with the requirements and the outputs
            of its completed dependencies; an empty string for a role that
            has no template.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            context = self.contexts[role]
            # Gather outputs from dependencies that finished with an output
            dependencies_output = [
                f"## {dep_role.value} Output:\n{self.contexts[dep_role].artifacts['output']}"
                for dep_role in context.dependencies
                if self.contexts[dep_role].state == FlowState.COMPLETED
                and "output" in self.contexts[dep_role].artifacts
            ]
            base_prompt = self._ROLE_PROMPTS.get(role, "")
            return base_prompt.format(
                requirements=self.requirements,
                dependencies="\n\n".join(dependencies_output) if dependencies_output else "No previous context available.",
            )
        except Exception as e:
            logger.error(f"Failed to generate prompt for {role}: {str(e)}")
            raise

    @staticmethod
    def _derive_app_name(requirements: str) -> str:
        """Derive a directory-safe app name from the first word of the requirements."""
        words = requirements.split()
        first_word = words[0].lower() if words else ""
        # Keep letters and digits only: the name is used as a directory and
        # archive name, so punctuation and separators are replaced.
        cleaned = "".join(ch if ch.isalnum() else "_" for ch in first_word).strip("_")
        return cleaned or "app"

    def _build_file_tree(self, documentation: str) -> dict:
        """Build the project structure: a README plus one file per agent output."""
        agent_docs = {
            f"{role.value}.md": str(context.artifacts["output"])
            for role, context in self.contexts.items()
            if "output" in context.artifacts
        }
        tree = {"README.md": documentation}
        if agent_docs:
            tree["docs"] = agent_docs
        return tree

    async def execute_flow(self, requirements: str) -> str:
        """Execute the AI Flow and build the project.

        Args:
            requirements: Free-text application requirements.

        Returns:
            A Markdown report with the project structure, the documentation
            and the path of the downloadable archive.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            # Initialize flow with requirements
            self.requirements = requirements
            self.initialize_flow()
            # Extract app name
            app_name = self._derive_app_name(requirements)
            # A topological order visits every role after all of its
            # dependencies. Enumerating architect-to-documentation paths
            # would skip the roles that have no edge leading to the
            # documentation writer.
            execution_order = list(nx.topological_sort(self.flow_graph))
            await self._execute_paths([execution_order])
            # Generate project structure and documentation
            project_structure = generate_project_structure(app_name, self.contexts[AgentRole.ARCHITECT].artifacts)
            api_details = self.contexts[AgentRole.DOCUMENTATION_WRITER].artifacts.get("output", "")
            documentation = generate_documentation(app_name, requirements, api_details)
            # The archive is built from the project builder's current build,
            # so the project has to be written to disk first.
            await self.project_builder.create_project(app_name, self._build_file_tree(documentation))
            # Create downloadable output
            download_path = self.output_manager.create_download(app_name)
            return f"""
# {app_name.title()} - Generated Application
## Project Structure
```
{project_structure}
```
## Documentation
{documentation}
## Download
To download your project, use this path: {download_path}
## Next Steps
1. Review the generated architecture and components
2. Set up the development environment
3. Implement the components following the provided structure
4. Run the test suite
5. Deploy using the provided configurations
## Support
For any issues or questions, please refer to the documentation or create an issue in the repository.
"""
        except Exception as e:
            logger.error(f"Failed to execute flow: {str(e)}")
            raise
        finally:
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

    async def _execute_paths(self, paths: List[List[AgentRole]]):
        """Execute all paths in order, running each role at most once."""
        try:
            for path in paths:
                for role in path:
                    if self.contexts[role].state != FlowState.COMPLETED:
                        await self._execute_agent(role)
        except Exception as e:
            logger.error(f"Failed to execute paths: {str(e)}")
            raise

    async def _execute_agent(self, role: AgentRole):
        """Execute a single agent's tasks with enhanced context.

        Moves the role's state through RUNNING to COMPLETED, or to FAILED
        when the task raises.
        """
        # Looked up outside the try block so the error handler below can
        # always refer to it.
        context = self.contexts[role]
        if context.state == FlowState.COMPLETED:
            return
        try:
            context.state = FlowState.RUNNING
            prompt = self._generate_prompt(role)
            # Execute agent task
            result = await self._execute_agent_task(role, prompt)
            # Update context with results
            context.artifacts.update(result)
            context.state = FlowState.COMPLETED
        except Exception as e:
            context.state = FlowState.FAILED
            logger.error(f"Failed to execute agent {role}: {str(e)}")
            raise

    async def _execute_agent_task(self, role: AgentRole, prompt: str) -> Dict[str, str]:
        """Execute a specific agent's task with the given prompt.

        Returns:
            A dict with the agent's answer under the ``"output"`` key.
        """
        try:
            # Get the appropriate agent for the role
            agent = get_agent(role)
            # Execute the agent's task
            result = await agent.arun(prompt)
            # Process and return the result
            return {"output": result}
        except Exception as e:
            logger.error(f"Failed to execute agent task for {role}: {str(e)}")
            raise
# Update the multi_agent_workflow function to use AI Flows | |
async def multi_agent_workflow(requirements: str) -> str:
    """
    Execute a multi-agent workflow using AI Flows to generate a complex app.

    A new ``EnhancedAIFlow`` is created for every call.

    Args:
        requirements (str): App requirements.
    Returns:
        str: Whatever the flow's ``execute_flow`` returns.
    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        flow = EnhancedAIFlow()
        # Awaited inside the try so flow errors reach the handler below.
        return await flow.execute_flow(requirements)
    except Exception as e:
        logger.error(f"Failed to execute multi-agent workflow: {str(e)}")
        raise
# Update the app_generator function to handle async execution | |
async def app_generator(requirements: str) -> str:
    """
    Generate an app based on the provided requirements using AI Flows.

    Thin wrapper around ``multi_agent_workflow`` that adds its own log line
    on failure.

    Args:
        requirements (str): App requirements.
    Returns:
        str: The workflow's result.
    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        # Awaited inside the try so workflow errors reach the handler below.
        generated = await multi_agent_workflow(requirements)
    except Exception as e:
        logger.error(f"Failed to generate app: {str(e)}")
        raise
    return generated
class StreamHandler:
    """Handles streaming output for the Gradio interface.

    Keeps a rolling log of timestamped messages and the latest status text.
    """

    def __init__(self):
        self.output = []
        self.current_status = ""

    def update(self, message: str, status: str = None):
        """Append a timestamped message and optionally replace the status.

        Args:
            message: Text to log; it is prefixed with ``[HH:MM:SS]``.
            status: New status; a falsy value leaves the current one as is.

        Returns:
            A ``(log_text, status)`` tuple where ``log_text`` is the retained
            lines joined by newlines.
        """
        clock = datetime.now().strftime("%H:%M:%S")
        self.output.append(f"[{clock}] {message}")
        if status:
            self.current_status = status
        # Keep only the last 100 lines
        line_count = len(self.output)
        if line_count > 100:
            self.output = self.output[-100:]
        log_text = "\n".join(self.output)
        return log_text, self.current_status
# Gradio UI | |
with gr.Blocks(theme=gr.themes.Soft()) as ui:
    # NOTE(review): this handler is created once when the module is imported
    # and is only ever appended to / trimmed, so its log lines are shared by
    # every run that goes through this UI — confirm that is intended.
    stream_handler = StreamHandler()

    gr.Markdown("# Autonomous App Generator with AI Flow")
    gr.Markdown("""
## Instructions
1. Describe the app you want to build in detail
2. Include any specific requirements or features
3. Click 'Generate App' to start the process
4. Download your generated app from the provided link
### Example:
```
Create a personal task management application with:
- User authentication (email/password, Google OAuth)
- Task management (CRUD, priorities, due dates, reminders)
- Modern UI with dark/light theme
- Real-time updates using WebSocket
- PostgreSQL and Redis for storage
```
""")

    with gr.Row():
        # Left column: requirements input, action buttons and status box.
        with gr.Column(scale=4):
            requirements_input = gr.Textbox(
                label="App Requirements",
                placeholder="Describe the app you want to build...",
                lines=10
            )
            with gr.Row():
                generate_button = gr.Button("Generate App", variant="primary")
                cancel_button = gr.Button("Cancel", variant="stop")
            status = gr.Textbox(
                label="Status",
                value="Ready",
                interactive=False
            )
        # Right column: tabbed result, download and log views.
        with gr.Column(scale=6):
            with gr.Tabs():
                with gr.TabItem("Output"):
                    output = gr.Markdown(
                        label="Generated App Details",
                        value="Your app details will appear here..."
                    )
                with gr.TabItem("Download"):
                    file_output = gr.File(
                        label="Download Generated App",
                        interactive=False
                    )
                with gr.TabItem("Live Log"):
                    log_output = gr.Textbox(
                        label="Generation Logs",
                        value="Logs will appear here...",
                        lines=10,
                        max_lines=20,
                        interactive=False,
                        show_copy_button=True
                    )

    async def stream_output(requirements, progress=gr.Progress()):
        """Stream the output during app generation.

        Async generator wired to the "Generate App" button. Every yielded
        4-tuple maps, in order, onto the ``output``, ``file_output``,
        ``log_output`` and ``status`` components.

        Args:
            requirements: Text taken from the requirements textbox.
            progress: Gradio progress tracker used to wrap the phase loop.

        Yields:
            ``(markdown, file_path, log_text, status_text)`` tuples. Errors
            raised during generation are caught, logged and reported through
            a final tuple whose status is ``"Error"``.
        """
        download_prefix = "To download your project, use this path:"
        try:
            # Initialize
            stream_handler.update(" Starting app generation...", "Initializing")
            yield "Starting...", None, " Starting app generation...", "Initializing"

            # Update progress
            phases = [
                (" Analyzing requirements...", "Analyzing"),
                (" Generating architecture...", "Designing"),
                (" Creating project structure...", "Creating"),
                (" Implementing features...", "Implementing"),
                (" Finalizing...", "Finalizing")
            ]
            # The loop variable is deliberately not called `status`, so it
            # does not shadow the Status textbox component defined above.
            for msg, phase_status in progress.tqdm(phases):
                stream_handler.update(msg, phase_status)
                yield None, None, stream_handler.output[-1], phase_status
                await asyncio.sleep(1)  # Non-blocking sleep

            # Generate the app
            stream_handler.update(" Running AI Flow system...", "Processing")
            result = await app_generator(requirements)

            # Extract download path and logs
            download_path = None
            logs = []
            for line in result.split('\n'):
                if line.startswith(download_prefix):
                    # Take everything after the prefix so a path that itself
                    # contains ": " is kept intact; ignore an empty remainder.
                    candidate = line[len(download_prefix):].strip()
                    if candidate:
                        download_path = candidate
                        stream_handler.update(f" Generated download file: {Path(download_path).name}", "Processing")
                # Treat a line as a log record when it begins with a four-digit
                # year and a dash (any year, not just 2025) or with a level name.
                starts_with_date = line[:4].isdigit() and line[4:5] == "-"
                if starts_with_date or line.startswith(("INFO", "ERROR")):
                    logs.append(line)
                    stream_handler.update(line.split(" - ")[-1], "Processing")
                    yield None, None, "\n".join(stream_handler.output), "Processing"
                    await asyncio.sleep(0.1)  # Small delay to prevent UI freezing

            if download_path and Path(download_path).exists():
                stream_handler.update(" App generated successfully! Download is ready.", "Complete")
                yield result, download_path, "\n".join(stream_handler.output), "Complete"
            else:
                error_msg = " Failed to generate download file"
                stream_handler.update(error_msg, "Error")
                yield result, None, "\n".join(stream_handler.output), "Error"
        except Exception as e:
            error_msg = f" Error: {str(e)}"
            logger.error(error_msg)
            stream_handler.update(error_msg, "Error")
            yield error_msg, None, "\n".join(stream_handler.output), "Error"

    def cancel_generation():
        """Cancel the current generation process.

        Records the cancellation in the shared log and returns the values
        shown in the four output components. Stopping the running generate
        event itself is done by the ``cancels=`` wiring on the Cancel button.
        """
        stream_handler.update(" Generation cancelled by user", "Cancelled")
        return "Generation cancelled", None, "\n".join(stream_handler.output), "Cancelled"

    # Keep a handle on the generate event so the Cancel button can stop it.
    generate_event = generate_button.click(
        stream_output,
        inputs=[requirements_input],
        outputs=[output, file_output, log_output, status],
        show_progress=True
    )
    cancel_button.click(
        cancel_generation,
        outputs=[output, file_output, log_output, status],
        # Without this the button only rewrote the text boxes while the
        # generation kept running in the background.
        cancels=[generate_event]
    )
# Run the Gradio app | |
if __name__ == "__main__":
    # Launch settings gathered in one place before starting the server.
    launch_options = {
        "share": True,  # Enable sharing
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "show_error": True,
    }
    try:
        ui.launch(**launch_options)
    except Exception as err:
        # A launch failure is logged here and not re-raised.
        logger.error(f"Failed to launch Gradio interface: {str(err)}")