import gradio as gr
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from langchain_community.llms import HuggingFacePipeline
from langchain.agents import initialize_agent, Tool
from langchain.prompts import PromptTemplate
import json
import subprocess
import logging
import asyncio
import time
import hashlib
import shutil
import tempfile
from typing import Dict, List, Set, Any
from dataclasses import dataclass, field
from enum import Enum
import networkx as nx
from pathlib import Path
from datetime import datetime
from zipfile import ZipFile

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# Load the LLM and tokenizer
MODEL_NAME = "unit-mesh/autodev-coder-deepseek-6.7b-finetunes"

def load_model():
    try:
        tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

        # Check available device and memory
        if torch.cuda.is_available():
            device = "cuda"
            # Total GPU memory, converted to GB
            gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1024**3
            if gpu_memory < 8:  # Less than 8 GB available
                logger.warning("Limited GPU memory available. Using CPU instead.")
                device = "cpu"
        else:
            device = "cpu"
            logger.info("No GPU detected. Using CPU.")

        model = AutoModelForCausalLM.from_pretrained(
            MODEL_NAME,
            torch_dtype=torch.float16 if device == "cuda" else torch.float32,
            device_map="auto" if device == "cuda" else None,
            low_cpu_mem_usage=True,
        )
        return tokenizer, model
    except Exception as e:
        logger.error(f"Failed to load model: {str(e)}")
        raise RuntimeError(f"Model initialization failed: {str(e)}")
# Initialize models lazily
tokenizer = None
model = None
hf_pipeline = None
llm = None

def get_llm():
    global tokenizer, model, hf_pipeline, llm
    if llm is None:
        tokenizer, model = load_model()
        hf_pipeline = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            max_new_tokens=500,  # Cap generated tokens rather than total sequence length
            do_sample=True,  # Sampling must be enabled for temperature to take effect
            temperature=0.7,
        )
        llm = HuggingFacePipeline(pipeline=hf_pipeline)
    return llm
# Lazy initialization of agents
def get_agent(agent_type):
    """Build an agent with the shared toolset.

    Note: all agent types currently share the same tools; agent_type is
    kept for future role-specific tool selection.
    """
    llm = get_llm()
    return initialize_agent(
        tools=[
            Tool(
                name="Code Formatter",
                func=lambda x: subprocess.run(["black", "-"], input=x.encode(), capture_output=True).stdout.decode(),
                description="Formats code using Black.",
            ),
            Tool(
                name="API Generator",
                func=lambda x: json.dumps({"endpoints": {"example": "POST - Example endpoint."}}),
                description="Generates API details from code.",
            ),
            Tool(
                name="Task Decomposer",
                func=lambda x: json.dumps({"tasks": ["Design UI", "Develop Backend", "Test App", "Deploy App"]}),
                description="Breaks down app requirements into smaller tasks.",
            ),
        ],
        llm=llm,
        agent="zero-shot-react-description",
        verbose=True,
    )
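# Usage sketch (hypothetical prompt; the agent-type strings mirror the
# AgentRole values defined further down):
#     agent = get_agent("architect")
#     result = agent.run("Design a REST API for a todo app")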
# Enhanced prompt templates with more specific instructions
ui_designer_prompt = PromptTemplate(
    input_variables=["input"],
    template="""You are an expert UI Designer specializing in modern, responsive web applications.
Task: {input}

Focus on:
1. Clean, intuitive user interface
2. Responsive design principles
3. Modern UI components
4. Accessibility standards
5. Cross-browser compatibility

Generate code using:
- HTML5 semantic elements
- Modern CSS (Flexbox/Grid)
- React/Vue.js best practices
- Material UI or Tailwind CSS

Provide detailed component structure and styling.""",
)

backend_developer_prompt = PromptTemplate(
    input_variables=["input"],
    template="""You are an expert Backend Developer specializing in scalable applications.
Task: {input}

Focus on:
1. RESTful API design
2. Database schema optimization
3. Security best practices
4. Error handling
5. Performance optimization

Include:
- API endpoint definitions
- Database models
- Authentication/Authorization
- Input validation
- Error handling middleware
- Rate limiting
- Logging

Use modern backend frameworks (FastAPI/Django/Express).""",
)

qa_engineer_prompt = PromptTemplate(
    input_variables=["input"],
    template="""You are an expert QA Engineer focusing on comprehensive testing.
Task: {input}

Implement:
1. Unit tests
2. Integration tests
3. API endpoint tests
4. UI component tests
5. Performance tests

Include:
- Test cases for edge cases
- Input validation tests
- Error handling tests
- Load testing scenarios
- Security testing checks""",
)

devops_engineer_prompt = PromptTemplate(
    input_variables=["input"],
    template="""You are an expert DevOps Engineer specializing in modern deployment practices.
Task: {input}

Provide:
1. Dockerfile configuration
2. Docker Compose setup
3. CI/CD pipeline configuration
4. Environment configuration
5. Monitoring setup

Include:
- Development/Production configs
- Environment variables
- Health checks
- Logging setup
- Monitoring integration
- Backup strategies""",
)
def generate_project_structure(app_name, features):
    """Generate a complete project structure based on features."""
    return f"""
{app_name}/
├── frontend/
│   ├── src/
│   │   ├── components/
│   │   ├── pages/
│   │   ├── hooks/
│   │   ├── utils/
│   │   └── styles/
│   ├── package.json
│   └── README.md
├── backend/
│   ├── src/
│   │   ├── routes/
│   │   ├── controllers/
│   │   ├── models/
│   │   ├── middleware/
│   │   └── utils/
│   ├── requirements.txt
│   └── README.md
├── tests/
│   ├── unit/
│   ├── integration/
│   └── e2e/
├── docs/
│   ├── API.md
│   ├── SETUP.md
│   └── DEPLOYMENT.md
├── docker-compose.yml
├── .env.example
└── README.md
"""
def generate_documentation(app_name, features, api_details):
    """Generate comprehensive documentation."""
    return f"""
# {app_name}

## Overview
A modern web application with the following features:
{features}

## Quick Start
```bash
# Clone the repository
git clone <repository-url>

# Install dependencies
cd {app_name}

# Frontend
cd frontend && npm install

# Backend
cd ../backend && pip install -r requirements.txt

# Run the application
docker-compose up
```

## API Documentation
{api_details}

## Development
- Frontend: React.js with TypeScript
- Backend: Python with FastAPI
- Database: PostgreSQL
- Cache: Redis
- Testing: Jest, Pytest

## Deployment
Includes Docker configuration for easy deployment:
- Frontend container
- Backend container
- Database container
- Redis container

## Testing
```bash
# Run frontend tests
cd frontend && npm test

# Run backend tests
cd backend && pytest
```

## Contributing
Please read CONTRIBUTING.md for details on our code of conduct and the process for submitting pull requests.

## License
This project is licensed under the MIT License - see the LICENSE.md file for details.
"""
# AI Flow States and Types
class FlowState(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

class AgentRole(Enum):
    ARCHITECT = "architect"
    UI_DESIGNER = "ui_designer"
    BACKEND_DEVELOPER = "backend_developer"
    DATABASE_ENGINEER = "database_engineer"
    SECURITY_EXPERT = "security_expert"
    QA_ENGINEER = "qa_engineer"
    DEVOPS_ENGINEER = "devops_engineer"
    DOCUMENTATION_WRITER = "documentation_writer"

@dataclass
class AgentContext:
    """Context information for each agent in the flow."""
    role: AgentRole
    state: FlowState
    artifacts: Dict[str, str]
    dependencies: List[AgentRole]
    feedback: List[str]
class AIFlow:
    """Manages the flow of work between different AI agents."""
    def __init__(self):
        self.flow_graph = nx.DiGraph()
        self.contexts: Dict[AgentRole, AgentContext] = {}
        self.global_context = {}
        # The base flow needs its own context manager: _execute_paths,
        # _generate_prompt, and _compile_results all read and write memory.
        self.context_manager = ContextManager()

    def initialize_flow(self):
        """Initialize the AI Flow with agent relationships and dependencies."""
        # Define agent relationships (each role feeds the roles listed after it)
        flow_structure = {
            AgentRole.ARCHITECT: [AgentRole.UI_DESIGNER, AgentRole.BACKEND_DEVELOPER, AgentRole.DATABASE_ENGINEER],
            AgentRole.UI_DESIGNER: [AgentRole.QA_ENGINEER],
            AgentRole.BACKEND_DEVELOPER: [AgentRole.SECURITY_EXPERT, AgentRole.QA_ENGINEER],
            AgentRole.DATABASE_ENGINEER: [AgentRole.SECURITY_EXPERT],
            AgentRole.SECURITY_EXPERT: [AgentRole.QA_ENGINEER],
            AgentRole.QA_ENGINEER: [AgentRole.DEVOPS_ENGINEER],
            AgentRole.DEVOPS_ENGINEER: [AgentRole.DOCUMENTATION_WRITER],
            AgentRole.DOCUMENTATION_WRITER: [],
        }

        # Build the flow graph
        for role, dependencies in flow_structure.items():
            self.flow_graph.add_node(role)
            for dep in dependencies:
                self.flow_graph.add_edge(role, dep)
            # Initialize context for each agent
            self.contexts[role] = AgentContext(
                role=role,
                state=FlowState.PENDING,
                artifacts={},
                dependencies=dependencies,
                feedback=[],
            )
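    # Illustrative sketch (comments only, not executed): with the structure
    # above, the simple paths from ARCHITECT to DOCUMENTATION_WRITER include
    #     ARCHITECT -> UI_DESIGNER -> QA_ENGINEER -> DEVOPS_ENGINEER -> DOCUMENTATION_WRITER
    #     ARCHITECT -> BACKEND_DEVELOPER -> SECURITY_EXPERT -> QA_ENGINEER -> DEVOPS_ENGINEER -> DOCUMENTATION_WRITER
    # execute_flow enumerates all of them via nx.all_simple_paths below.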
    async def execute_flow(self, requirements: str):
        """Execute the AI Flow (paths are currently processed sequentially)."""
        try:
            self.initialize_flow()
            self.global_context["requirements"] = requirements

            # Get all paths through the flow graph
            paths = list(nx.all_simple_paths(
                self.flow_graph,
                AgentRole.ARCHITECT,
                AgentRole.DOCUMENTATION_WRITER,
            ))

            # Execute the paths
            await self._execute_paths(paths)

            return self._compile_results()
        except Exception as e:
            logger.error(f"Flow execution failed: {str(e)}")
            raise
    async def _execute_paths(self, paths: List[List[AgentRole]]):
        """Execute all paths in the flow graph."""
        try:
            results = []
            for path in paths:
                path_results = []
                for role in path:
                    # Get the agent's prompt based on previous results
                    prompt = self._generate_prompt(role, path_results)

                    # Execute the agent's task
                    result = await self._execute_agent_task(role, prompt)
                    path_results.append(result)

                    # Store result in context
                    self.context_manager.add_memory(
                        f"{role.value}_result",
                        result,
                        {"timestamp": datetime.now()},
                    )
                results.extend(path_results)

            # Store all results in context
            self.context_manager.add_memory(
                "path_results",
                results,
                {"timestamp": datetime.now()},
            )
            return results
        except Exception as e:
            logger.error(f"Failed to execute paths: {str(e)}")
            raise
    def _generate_prompt(self, role: AgentRole, previous_results: List[str]) -> str:
        """Generate a prompt for an agent based on previous results."""
        requirements = self.context_manager.global_context.get("requirements", "")

        # Base prompt with requirements
        prompt = f"Requirements: {requirements}\n\n"

        # Add context from previous results
        if previous_results:
            prompt += "Previous work:\n"
            for i, result in enumerate(previous_results):
                prompt += f"{i+1}. {result}\n"

        # Add role-specific instructions
        role_instructions = {
            AgentRole.ARCHITECT: "As the Architect, design the high-level system architecture.",
            AgentRole.UI_DESIGNER: "As the UI Designer, create the user interface design.",
            AgentRole.BACKEND_DEVELOPER: "As the Backend Developer, implement the server-side logic.",
            AgentRole.DATABASE_ENGINEER: "As the Database Engineer, design the data model and storage.",
            AgentRole.SECURITY_EXPERT: "As the Security Expert, ensure security best practices.",
            AgentRole.QA_ENGINEER: "As the QA Engineer, create test cases and validation.",
            AgentRole.DEVOPS_ENGINEER: "As the DevOps Engineer, set up deployment and CI/CD.",
            AgentRole.DOCUMENTATION_WRITER: "As the Documentation Writer, create comprehensive documentation.",
        }
        prompt += f"\n{role_instructions[role]}"

        return prompt
    def _compile_results(self) -> str:
        """Compile all results into a final output."""
        try:
            results = []
            # Get all results from memory (get_memory returns the stored
            # value directly, not a wrapper dict)
            for role in AgentRole:
                result = self.context_manager.get_memory(f"{role.value}_result")
                if result:
                    results.append(f"## {role.value}\n{result}\n")
            return "\n".join(results)
        except Exception as e:
            logger.error(f"Failed to compile results: {str(e)}")
            raise
    async def _execute_agent_task(self, role: AgentRole, prompt: str) -> str:
        """Execute a specific agent's task with the given prompt."""
        try:
            # AgentRole values match the agent-type strings used by get_agent,
            # so the role can be dispatched directly
            agent = get_agent(role.value)

            # Execute the agent's task
            result = agent.run(prompt)

            # Log the execution
            logger.info(f"Agent {role.value} completed task")
            return result
        except Exception as e:
            logger.error(f"Agent {role.value} failed: {str(e)}")
            raise
@dataclass
class FileContext:
    """Context for file operations and tracking."""
    path: Path
    content: str
    last_modified: datetime
    dependencies: Set[Path]
    checksum: str

    @classmethod
    def from_path(cls, path: Path):
        content = path.read_text()
        return cls(
            path=path,
            content=content,
            last_modified=datetime.fromtimestamp(path.stat().st_mtime),
            dependencies=set(),
            checksum=hashlib.md5(content.encode()).hexdigest(),
        )
@dataclass
class MemoryItem:
    """Represents a single memory item in the system."""
    key: str
    value: Any
    context: dict
    timestamp: datetime
    importance: float = 1.0
    references: Set[str] = field(default_factory=set)
class ContextManager:
    """Manages real-time context awareness across the system."""
    def __init__(self):
        self.file_contexts: Dict[Path, FileContext] = {}
        self.global_context: Dict[str, Any] = {}
        self.command_history: List[Dict] = []
        self.memory_store: Dict[str, MemoryItem] = {}

    def update_file_context(self, path: Path) -> FileContext:
        """Update context for a specific file."""
        context = FileContext.from_path(path)
        self.file_contexts[path] = context
        return context

    def get_related_files(self, path: Path) -> Set[Path]:
        """Find files related to the given file."""
        if path not in self.file_contexts:
            self.update_file_context(path)
        context = self.file_contexts[path]
        return context.dependencies

    def track_command(self, command: str, args: List[str], result: Any):
        """Track command execution and results."""
        self.command_history.append({
            'command': command,
            'args': args,
            'result': result,
            'timestamp': datetime.now(),
        })

    def add_memory(self, key: str, value: Any, context: dict = None):
        """Add an item to the memory store."""
        self.memory_store[key] = MemoryItem(
            key=key,
            value=value,
            context=context or {},
            timestamp=datetime.now(),
        )

    def get_memory(self, key: str) -> Any:
        """Retrieve an item's value from memory, or None if absent."""
        item = self.memory_store.get(key)
        return item.value if item else None
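# Usage sketch (hypothetical keys and values, not executed at import time):
#     cm = ContextManager()
#     cm.add_memory("requirements", "Build a todo app", {"source": "user"})
#     cm.get_memory("requirements")   # -> "Build a todo app"
#     cm.track_command("black", ["-"], "formatted")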
class FileOperationManager:
    """Manages multi-file operations and tracking."""
    def __init__(self, context_manager: ContextManager):
        self.context_manager = context_manager
        self.pending_changes: Dict[Path, str] = {}

    async def edit_files(self, changes: Dict[Path, str]):
        """Apply changes to multiple files atomically."""
        try:
            # Validate all changes first
            for path, content in changes.items():
                if not self._validate_change(path, content):
                    raise ValueError(f"Invalid change for {path}")

            # Apply changes
            for path, content in changes.items():
                await self._apply_change(path, content)

            # Update contexts
            for path in changes:
                self.context_manager.update_file_context(path)
        except Exception as e:
            logger.error(f"Failed to apply multi-file changes: {str(e)}")
            raise

    def _validate_change(self, path: Path, content: str) -> bool:
        """Validate a proposed file change."""
        try:
            # Check file exists or can be created
            if not path.parent.exists():
                path.parent.mkdir(parents=True)

            # Validate syntax if it's a Python file
            if path.suffix == '.py':
                compile(content, str(path), 'exec')
            return True
        except Exception as e:
            logger.error(f"Validation failed for {path}: {str(e)}")
            return False

    async def _apply_change(self, path: Path, content: str):
        """Apply a single file change."""
        path.write_text(content)
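# Usage sketch (hypothetical path; would normally be awaited from async code):
#     fm = FileOperationManager(ContextManager())
#     asyncio.run(fm.edit_files({
#         Path("demo/hello.py"): "print('hello')\n",
#     }))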
class CommandManager:
    """Manages command suggestions and execution."""
    def __init__(self, context_manager: ContextManager):
        self.context_manager = context_manager
        self.command_templates: Dict[str, str] = {}

    def suggest_commands(self, context: dict) -> List[Dict]:
        """Suggest relevant commands based on context."""
        suggestions = []
        for cmd_name, template in self.command_templates.items():
            if self._is_relevant(cmd_name, context):
                suggestions.append({
                    'command': cmd_name,
                    'template': template,
                    'confidence': self._calculate_confidence(cmd_name, context),
                })
        return sorted(suggestions, key=lambda x: x['confidence'], reverse=True)

    async def execute_command(self, command: str, args: List[str]) -> Any:
        """Execute a command and track its result."""
        try:
            # Execute the command
            result = await self._run_command(command, args)

            # Track the execution
            self.context_manager.track_command(command, args, result)
            return result
        except Exception as e:
            logger.error(f"Command execution failed: {str(e)}")
            raise
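    async def _run_command(self, command: str, args: List[str]) -> str:
        """Run a command asynchronously and return its stdout.

        The original file calls _run_command but never defines it; this is
        a minimal assumed implementation using asyncio's subprocess API.
        """
        proc = await asyncio.create_subprocess_exec(
            command, *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode != 0:
            raise RuntimeError(f"{command} failed: {stderr.decode()}")
        return stdout.decode()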
    def _is_relevant(self, cmd_name: str, context: dict) -> bool:
        """Determine if a command is relevant to the current context."""
        # Implementation depends on specific rules
        return True

    def _calculate_confidence(self, cmd_name: str, context: dict) -> float:
        """Calculate confidence score for a command suggestion."""
        # Implementation depends on specific metrics
        return 1.0
class RuleSystem:
    """Manages system rules and constraints."""
    def __init__(self):
        self.rules: Dict[str, callable] = {}
        self.constraints: Dict[str, callable] = {}

    def add_rule(self, name: str, rule_func: callable):
        """Add a new rule to the system."""
        self.rules[name] = rule_func

    def add_constraint(self, name: str, constraint_func: callable):
        """Add a new constraint to the system."""
        self.constraints[name] = constraint_func

    def evaluate_rules(self, context: dict) -> Dict[str, bool]:
        """Evaluate all rules against the current context."""
        return {name: rule(context) for name, rule in self.rules.items()}

    def check_constraints(self, context: dict) -> Dict[str, bool]:
        """Check all constraints against the current context."""
        return {name: constraint(context) for name, constraint in self.constraints.items()}
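# Usage sketch (hypothetical rule, not executed at import time):
#     rules = RuleSystem()
#     rules.add_rule("has_requirements", lambda ctx: bool(ctx.get("requirements")))
#     rules.evaluate_rules({"requirements": "todo app"})   # -> {"has_requirements": True}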
class ProjectBuilder:
    """Handles autonomous creation of project files and folders."""
    def __init__(self, base_path: Path):
        self.base_path = Path(base_path)
        self.current_build = None
        self.file_manifest = []

    async def create_project(self, app_name: str, structure: dict) -> Path:
        """Create a new project with the specified structure."""
        try:
            # Create temporary build directory
            build_dir = Path(tempfile.mkdtemp())
            self.current_build = build_dir / app_name
            self.current_build.mkdir(parents=True)

            # Create project structure
            await self._create_structure(self.current_build, structure)

            return self.current_build
        except Exception as e:
            logger.error(f"Project creation failed: {str(e)}")
            if self.current_build and self.current_build.exists():
                shutil.rmtree(self.current_build)
            raise

    async def _create_structure(self, parent: Path, structure: dict):
        """Recursively create project structure."""
        for name, content in structure.items():
            path = parent / name
            if isinstance(content, dict):
                path.mkdir(exist_ok=True)
                await self._create_structure(path, content)
            else:
                path.write_text(str(content))
                self.file_manifest.append(path)
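# Structure sketch: nested dicts become directories, string values become
# file contents (hypothetical example):
#     {
#         "src": {"main.py": "print('hi')\n"},
#         "README.md": "# Demo\n",
#     }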
class OutputManager:
    """Manages project outputs and creates downloadable artifacts."""
    def __init__(self, project_builder: ProjectBuilder):
        self.project_builder = project_builder
        self.output_dir = Path(tempfile.mkdtemp())
        self.downloads = {}

    def create_download(self, app_name: str) -> str:
        """Create a downloadable zip file of the project."""
        try:
            if not self.project_builder.current_build:
                raise ValueError("No project has been built yet")

            # Create zip file
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            zip_name = f"{app_name}_{timestamp}.zip"
            zip_path = self.output_dir / zip_name

            with ZipFile(zip_path, 'w') as zipf:
                for file_path in self.project_builder.file_manifest:
                    rel_path = file_path.relative_to(self.project_builder.current_build)
                    zipf.write(file_path, rel_path)

            # Store download info
            self.downloads[zip_name] = {
                'path': zip_path,
                'created_at': datetime.now(),
                'size': zip_path.stat().st_size,
            }
            return str(zip_path)
        except Exception as e:
            logger.error(f"Failed to create download: {str(e)}")
            raise
class EnhancedAIFlow(AIFlow):
    """Enhanced AI Flow with project building and output management."""
    def __init__(self):
        super().__init__()  # also sets up self.context_manager
        self.project_builder = ProjectBuilder(Path(tempfile.mkdtemp()))
        self.output_manager = OutputManager(self.project_builder)
        self.file_manager = FileOperationManager(self.context_manager)
        self.command_manager = CommandManager(self.context_manager)
        self.rule_system = RuleSystem()
    async def execute_flow(self, requirements: str):
        """Execute the AI Flow and build the project."""
        try:
            # Initialize flow with context
            self.initialize_flow()
            self.global_context["requirements"] = requirements
            self.context_manager.global_context.update(self.global_context)

            # Add memory of requirements
            self.context_manager.add_memory(
                "requirements",
                requirements,
                {"timestamp": datetime.now()},
            )

            # Get all paths through the flow graph
            paths = list(nx.all_simple_paths(
                self.flow_graph,
                AgentRole.ARCHITECT,
                AgentRole.DOCUMENTATION_WRITER,
            ))

            # Execute paths with enhanced context
            await self._execute_paths(paths)

            # Derive an app name from the first word of the requirements
            app_name = requirements.split()[0].lower()

            # Create basic project structure
            structure = {
                "frontend": {
                    "src": {
                        "components": {},
                        "pages": {},
                        "styles": {},
                    },
                    "package.json": "{\n  \"name\": \"frontend\",\n  \"version\": \"1.0.0\"\n}",
                    "README.md": "# Frontend\n",
                },
                "backend": {
                    "src": {
                        "routes": {},
                        "models": {},
                        "controllers": {},
                    },
                    "requirements.txt": "fastapi\nuvicorn\n",
                    "README.md": "# Backend\n",
                },
                "README.md": f"# {app_name}\nGenerated by AI Flow\n",
            }

            # Build the project
            await self.project_builder.create_project(app_name, structure)

            # Create download
            download_path = self.output_manager.create_download(app_name)

            # Compile results
            results = self._compile_results()

            # Add download information to results
            download_info = self.output_manager.downloads[Path(download_path).name]
            results += f"""
## Download
Your project has been created and is ready for download:
- File: {Path(download_path).name}
- Size: {download_info['size'] / 1024:.1f} KB
- Created: {download_info['created_at'].strftime('%Y-%m-%d %H:%M:%S')}

To download your project, use this path: {download_path}

## Project Structure
The following files have been created:
"""
            for file_path in self.project_builder.file_manifest:
                rel_path = file_path.relative_to(self.project_builder.current_build)
                results += f"- {rel_path}\n"

            return results
        except Exception as e:
            logger.error(f"Enhanced flow execution failed: {str(e)}")
            raise
    async def _execute_agent(self, role: AgentRole):
        """Execute a single agent's tasks with enhanced context."""
        context = self.contexts[role]
        context.state = FlowState.RUNNING

        try:
            # Get agent-specific prompt (no prior results on this path)
            prompt = self._generate_prompt(role, [])

            # Add current context to prompt (default=str handles datetimes)
            prompt += f"\n\nContext:\n{json.dumps(self.context_manager.global_context, indent=2, default=str)}"

            # Execute agent's task
            result = await self._execute_agent_task(role, prompt)

            # Store results with context
            context.artifacts["output"] = result
            context.state = FlowState.COMPLETED

            # Update memory
            self.context_manager.add_memory(
                f"agent_result_{role.value}",
                result,
                {"role": role.value, "timestamp": datetime.now()},
            )
        except Exception as e:
            context.state = FlowState.FAILED
            context.feedback.append(str(e))
            raise
# Update the multi_agent_workflow function to use AI Flows
async def multi_agent_workflow(requirements: str) -> str:
    """
    Execute a multi-agent workflow using AI Flows to generate a complex app.

    Args:
        requirements (str): App requirements.

    Returns:
        str: Generated app code and API details.
    """
    try:
        # Initialize and execute AI Flow
        flow = EnhancedAIFlow()
        results = await flow.execute_flow(requirements)

        # Extract app name
        app_name = requirements.split()[0].lower()

        # Generate project structure and documentation
        project_structure = generate_project_structure(app_name, flow.contexts[AgentRole.ARCHITECT].artifacts)
        documentation = generate_documentation(app_name, requirements, flow.contexts[AgentRole.DOCUMENTATION_WRITER].artifacts)

        return f"""
# {app_name.title()} - Generated Application

## Project Structure
```
{project_structure}
```

{results}

## Documentation
{documentation}

## Next Steps
1. Review the generated architecture and components
2. Set up the development environment
3. Implement the components following the provided structure
4. Run the test suite
5. Deploy using the provided configurations

## Support
For any issues or questions, please refer to the documentation or create an issue in the repository.
"""
    except Exception as e:
        error_msg = f"Workflow failed: {str(e)}"
        logger.error(error_msg)
        return error_msg
    finally:
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
# Update the app_generator function to handle async execution
def app_generator(requirements: str):
    """
    Generate an app based on the provided requirements using AI Flows.

    Args:
        requirements (str): App requirements.

    Returns:
        str: Generated app code and API details.
    """
    if not requirements or len(requirements.strip()) == 0:
        return "Please provide app requirements."

    loop = None
    try:
        # Run the async workflow in a new event loop
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(multi_agent_workflow(requirements))
    except Exception as e:
        error_msg = f"App generation failed: {str(e)}"
        logger.error(error_msg)
        return error_msg
    finally:
        if loop is not None:
            loop.close()
class StreamHandler:
    """Handles streaming output for the Gradio interface."""
    def __init__(self):
        self.output = []
        self.current_status = ""

    def update(self, message: str, status: str = None):
        """Update the output stream."""
        self.output.append(message)
        if status:
            self.current_status = status
        return "\n".join(self.output), self.current_status
# Gradio UI
with gr.Blocks(theme=gr.themes.Soft()) as ui:
    stream_handler = StreamHandler()

    gr.Markdown("# Autonomous App Generator with AI Flow")
    gr.Markdown("""
## Instructions
1. Describe the app you want to build in detail
2. Include any specific requirements or features
3. Click 'Generate App' to start the process
4. Download your generated app from the provided link

### Example:
```
Create a personal task management application with:
- User authentication (email/password, Google OAuth)
- Task management (CRUD, priorities, due dates, reminders)
- Modern UI with dark/light theme
- Real-time updates using WebSocket
- PostgreSQL and Redis for storage
```
""")

    with gr.Row():
        with gr.Column(scale=4):
            requirements_input = gr.Textbox(
                label="App Requirements",
                placeholder="Describe the app you want to build...",
                lines=10,
            )
            with gr.Row():
                generate_button = gr.Button("Generate App", variant="primary")
                cancel_button = gr.Button("Cancel", variant="stop")
            status = gr.Textbox(
                label="Status",
                value="Ready",
                interactive=False,
            )

        with gr.Column(scale=6):
            with gr.Tabs():
                with gr.TabItem("Output"):
                    output = gr.Markdown(
                        value="Your app details will appear here..."
                    )
                with gr.TabItem("Download"):
                    file_output = gr.File(
                        label="Download Generated App",
                        interactive=False,
                    )
                with gr.TabItem("Live Log"):
                    # label/autoscroll are not reliably supported on
                    # gr.Markdown across Gradio versions, so they are omitted
                    log_output = gr.Markdown(
                        value="Logs will appear here..."
                    )
    def stream_output(requirements, progress=gr.Progress()):
        """Stream the output during app generation."""
        try:
            # Initialize
            stream_handler.update("Starting app generation...", "Initializing")
            yield "Starting...", None, "Starting app generation...", "Initializing"

            # Step through the generation phases
            phases = [
                ("Analyzing requirements...", "Analyzing"),
                ("Generating architecture...", "Designing"),
                ("Creating project structure...", "Creating"),
                ("Implementing features...", "Implementing"),
                ("Finalizing...", "Finalizing"),
            ]
            for msg, phase in progress.tqdm(phases):
                stream_handler.update(msg, phase)
                yield None, None, stream_handler.output[-1], phase
                time.sleep(1)  # Simulate work

            # Generate the app (app_generator is synchronous and manages its
            # own event loop, so it must not be passed to asyncio.run)
            result = app_generator(requirements)

            # Extract download path and logs
            download_path = None
            logs = []
            for line in result.split('\n'):
                if line.startswith("To download your project, use this path:"):
                    download_path = line.split(": ")[1].strip()
                if line.startswith("2025-") or line.startswith("INFO") or line.startswith("ERROR"):
                    logs.append(line)
                    stream_handler.update(line)
                    yield None, None, "\n".join(stream_handler.output), "Processing"

            if download_path and Path(download_path).exists():
                stream_handler.update("App generated successfully!", "Complete")
                yield result, download_path, "\n".join(stream_handler.output), "Complete"
            else:
                error_msg = "Failed to generate download file"
                stream_handler.update(error_msg, "Error")
                yield result, None, "\n".join(stream_handler.output), "Error"
        except Exception as e:
            error_msg = f"Failed to generate app: {str(e)}"
            logger.error(error_msg)
            stream_handler.update(error_msg, "Error")
            yield error_msg, None, "\n".join(stream_handler.output), "Error"
    def cancel_generation():
        """Cancel the current generation process."""
        stream_handler.update("Generation cancelled by user", "Cancelled")
        return "Generation cancelled", None, "\n".join(stream_handler.output), "Cancelled"

    generate_button.click(
        stream_output,
        inputs=[requirements_input],
        outputs=[output, file_output, log_output, status],
        show_progress=True,
    )

    cancel_button.click(
        cancel_generation,
        outputs=[output, file_output, log_output, status],
    )
# Run the Gradio app
if __name__ == "__main__":
    try:
        ui.queue()  # Enable request queuing (replaces the removed enable_queue launch flag)
        ui.launch(
            share=True,  # Enable sharing (ignored on Hugging Face Spaces)
            server_name="0.0.0.0",
            server_port=7860,
            show_error=True,
        )
    except Exception as e:
        logger.error(f"Failed to launch Gradio interface: {str(e)}")