Spaces:
Runtime error
Runtime error
nananie143
Enhanced app creator with autonomous file creation, project building, and download capabilities
a74a84f
import gradio as gr | |
import torch | |
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline | |
from langchain_community.llms import HuggingFacePipeline | |
from langchain.agents import initialize_agent, Tool | |
from langchain.prompts import PromptTemplate | |
import json | |
import subprocess | |
import logging | |
import asyncio | |
from typing import Dict, List, Optional | |
from dataclasses import dataclass | |
from enum import Enum | |
import networkx as nx | |
from pathlib import Path | |
from datetime import datetime | |
from typing import Set, Union, Any | |
import hashlib | |
import os | |
import json | |
from dataclasses import asdict, field | |
import shutil | |
import tempfile | |
from zipfile import ZipFile | |
# Send INFO-and-above records through the root logger with a timestamped format.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Hugging Face Hub id of the model that load_model() downloads.
MODEL_NAME = "unit-mesh/autodev-coder-deepseek-6.7b-finetunes"
def load_model():
    """Load the tokenizer and causal language model named by ``MODEL_NAME``.

    The model is loaded in half precision with ``device_map="auto"`` when CUDA
    is available and device 0 reports at least 8 GB of total memory. Otherwise
    it is loaded on the CPU in full precision.

    Returns:
        A ``(tokenizer, model)`` tuple.

    Raises:
        RuntimeError: If the tokenizer or model cannot be loaded. The
            underlying exception is chained as ``__cause__`` so the original
            traceback is not lost.
    """
    try:
        tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

        device = "cpu"
        if torch.cuda.is_available():
            # total_memory is the card's capacity in bytes, not the amount
            # that is currently free; convert it to GB for the threshold.
            gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1024**3
            if gpu_memory < 8:
                logger.warning("Limited GPU memory available. Using CPU instead.")
            else:
                device = "cuda"
        else:
            logger.info("No GPU detected. Using CPU.")

        use_gpu = device == "cuda"
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_NAME,
            torch_dtype=torch.float16 if use_gpu else torch.float32,
            device_map="auto" if use_gpu else None,
            low_cpu_mem_usage=True,
        )
        return tokenizer, model
    except Exception as e:
        logger.error(f"Failed to load model: {str(e)}")
        raise RuntimeError(f"Model initialization failed: {str(e)}") from e
# Initialize models lazily | |
# Module-level cache: all four names stay None until get_llm() is first called.
tokenizer = None
model = None
hf_pipeline = None
llm = None


def get_llm():
    """Return the shared LangChain LLM, creating it on the first call.

    The first call loads the tokenizer and model, wraps them in a
    ``text-generation`` pipeline (``max_length=500``, ``temperature=0.7``) and
    stores everything in the module-level globals. Later calls return the
    cached ``llm`` without reloading anything.
    """
    global tokenizer, model, hf_pipeline, llm

    if llm is not None:
        return llm

    tokenizer, model = load_model()
    generation_settings = {"max_length": 500, "temperature": 0.7}
    hf_pipeline = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        **generation_settings,
    )
    llm = HuggingFacePipeline(pipeline=hf_pipeline)
    return llm
# Lazy initialization of agents | |
def _format_with_black(code):
    """Format ``code`` with the Black CLI; return it unchanged if Black fails.

    Black exits non-zero (with empty stdout) on input it cannot parse, and the
    executable may not be installed at all. In both cases the caller gets the
    original text back instead of an empty string or an exception.
    """
    try:
        completed = subprocess.run(
            ["black", "-"], input=code.encode(), capture_output=True
        )
    except OSError as exc:
        logger.warning("Black is not available: %s", exc)
        return code
    if completed.returncode != 0:
        logger.warning(
            "Black could not format the code: %s",
            completed.stderr.decode(errors="replace").strip(),
        )
        return code
    return completed.stdout.decode()


def get_agent(agent_type):
    """Build a zero-shot ReAct agent on top of the shared LLM.

    Args:
        agent_type: Role label supplied by the caller. It is not used yet:
            every role receives the same three tools and the same LLM.

    Returns:
        The agent object produced by ``initialize_agent``.
    """
    llm = get_llm()
    tools = [
        Tool(
            name="Code Formatter",
            func=_format_with_black,
            description="Formats code using Black.",
        ),
        Tool(
            name="API Generator",
            # Placeholder: always returns the same example endpoint.
            func=lambda x: json.dumps({"endpoints": {"example": "POST - Example endpoint."}}),
            description="Generates API details from code.",
        ),
        Tool(
            name="Task Decomposer",
            # Placeholder: always returns the same four tasks.
            func=lambda x: json.dumps({"tasks": ["Design UI", "Develop Backend", "Test App", "Deploy App"]}),
            description="Breaks down app requirements into smaller tasks.",
        ),
    ]
    return initialize_agent(
        tools=tools,
        llm=llm,
        agent="zero-shot-react-description",
        verbose=True,
    )
# Enhanced prompt templates with more specific instructions | |
# Prompt for the UI-designer persona; the task text replaces {input}.
ui_designer_prompt = PromptTemplate(
    template=(
        "You are an expert UI Designer specializing in modern, responsive web applications.\n"
        "Task: {input}\n"
        "Focus on:\n"
        "1. Clean, intuitive user interface\n"
        "2. Responsive design principles\n"
        "3. Modern UI components\n"
        "4. Accessibility standards\n"
        "5. Cross-browser compatibility\n"
        "Generate code using:\n"
        "- HTML5 semantic elements\n"
        "- Modern CSS (Flexbox/Grid)\n"
        "- React/Vue.js best practices\n"
        "- Material UI or Tailwind CSS\n"
        "Provide detailed component structure and styling."
    ),
    input_variables=["input"],
)
# Prompt for the backend-developer persona; the task text replaces {input}.
backend_developer_prompt = PromptTemplate(
    template=(
        "You are an expert Backend Developer specializing in scalable applications.\n"
        "Task: {input}\n"
        "Focus on:\n"
        "1. RESTful API design\n"
        "2. Database schema optimization\n"
        "3. Security best practices\n"
        "4. Error handling\n"
        "5. Performance optimization\n"
        "Include:\n"
        "- API endpoint definitions\n"
        "- Database models\n"
        "- Authentication/Authorization\n"
        "- Input validation\n"
        "- Error handling middleware\n"
        "- Rate limiting\n"
        "- Logging\n"
        "Use modern backend frameworks (FastAPI/Django/Express)."
    ),
    input_variables=["input"],
)
# Prompt for the QA-engineer persona; the task text replaces {input}.
qa_engineer_prompt = PromptTemplate(
    template=(
        "You are an expert QA Engineer focusing on comprehensive testing.\n"
        "Task: {input}\n"
        "Implement:\n"
        "1. Unit tests\n"
        "2. Integration tests\n"
        "3. API endpoint tests\n"
        "4. UI component tests\n"
        "5. Performance tests\n"
        "Include:\n"
        "- Test cases for edge cases\n"
        "- Input validation tests\n"
        "- Error handling tests\n"
        "- Load testing scenarios\n"
        "- Security testing checks"
    ),
    input_variables=["input"],
)
# Prompt for the DevOps-engineer persona; the task text replaces {input}.
devops_engineer_prompt = PromptTemplate(
    template=(
        "You are an expert DevOps Engineer specializing in modern deployment practices.\n"
        "Task: {input}\n"
        "Provide:\n"
        "1. Dockerfile configuration\n"
        "2. Docker Compose setup\n"
        "3. CI/CD pipeline configuration\n"
        "4. Environment configuration\n"
        "5. Monitoring setup\n"
        "Include:\n"
        "- Development/Production configs\n"
        "- Environment variables\n"
        "- Health checks\n"
        "- Logging setup\n"
        "- Monitoring integration\n"
        "- Backup strategies"
    ),
    input_variables=["input"],
)
def generate_project_structure(app_name, features):
    """Render the fixed project layout as a text tree rooted at ``app_name``.

    Args:
        app_name: Name printed as the root directory of the tree.
        features: Accepted for interface compatibility; the layout does not
            depend on it.

    Returns:
        The tree as a string that begins and ends with a newline.
    """
    tree_lines = (
        f"{app_name}/",
        "├── frontend/",
        "│ ├── src/",
        "│ │ ├── components/",
        "│ │ ├── pages/",
        "│ │ ├── hooks/",
        "│ │ ├── utils/",
        "│ │ └── styles/",
        "│ ├── package.json",
        "│ └── README.md",
        "├── backend/",
        "│ ├── src/",
        "│ │ ├── routes/",
        "│ │ ├── controllers/",
        "│ │ ├── models/",
        "│ │ ├── middleware/",
        "│ │ └── utils/",
        "│ ├── requirements.txt",
        "│ └── README.md",
        "├── tests/",
        "│ ├── unit/",
        "│ ├── integration/",
        "│ └── e2e/",
        "├── docs/",
        "│ ├── API.md",
        "│ ├── SETUP.md",
        "│ └── DEPLOYMENT.md",
        "├── docker-compose.yml",
        "├── .env.example",
        "└── README.md",
    )
    return "\n" + "\n".join(tree_lines) + "\n"
def generate_documentation(app_name, features, api_details):
    """Render the README-style Markdown document for a generated app.

    Args:
        app_name: Used for the top-level heading and the ``cd`` command.
        features: Inserted verbatim under "Overview".
        api_details: Inserted verbatim (via ``str`` formatting) under
            "API Documentation".

    Returns:
        The Markdown text, beginning and ending with a newline.
    """
    doc_lines = [
        f"# {app_name}",
        "## Overview",
        "A modern web application with the following features:",
        f"{features}",
        "## Quick Start",
        "```bash",
        "# Clone the repository",
        "git clone <repository-url>",
        "# Install dependencies",
        f"cd {app_name}",
        "# Frontend",
        "cd frontend && npm install",
        "# Backend",
        "cd ../backend && pip install -r requirements.txt",
        "# Run the application",
        "docker-compose up",
        "```",
        "## API Documentation",
        f"{api_details}",
        "## Development",
        "- Frontend: React.js with TypeScript",
        "- Backend: Python with FastAPI",
        "- Database: PostgreSQL",
        "- Cache: Redis",
        "- Testing: Jest, Pytest",
        "## Deployment",
        "Includes Docker configuration for easy deployment:",
        "- Frontend container",
        "- Backend container",
        "- Database container",
        "- Redis container",
        "## Testing",
        "```bash",
        "# Run frontend tests",
        "cd frontend && npm test",
        "# Run backend tests",
        "cd backend && pytest",
        "```",
        "## Contributing",
        "Please read CONTRIBUTING.md for details on our code of conduct and the process for submitting pull requests.",
        "## License",
        "This project is licensed under the MIT License - see the LICENSE.md file for details",
    ]
    return "\n" + "\n".join(doc_lines) + "\n"
# AI Flow States and Types | |
class FlowState(Enum):
    """Lifecycle state of one agent inside a flow run."""

    # Assigned when the agent's context is created.
    PENDING = "pending"
    # Assigned just before the agent's task is started.
    RUNNING = "running"
    # Assigned once the agent's output has been stored.
    COMPLETED = "completed"
    # Assigned when the agent's task raised an exception.
    FAILED = "failed"
class AgentRole(Enum):
    """Roles an agent can play in a flow.

    The string values are used in generated headings and memory keys.
    """

    ARCHITECT = "architect"
    UI_DESIGNER = "ui_designer"
    BACKEND_DEVELOPER = "backend_developer"
    DATABASE_ENGINEER = "database_engineer"
    SECURITY_EXPERT = "security_expert"
    QA_ENGINEER = "qa_engineer"
    DEVOPS_ENGINEER = "devops_engineer"
    DOCUMENTATION_WRITER = "documentation_writer"
@dataclass
class AgentContext:
    """Context information for each agent in the flow.

    Declared as a dataclass so that it can be built with keyword arguments,
    which is how ``AIFlow.initialize_flow`` creates it.

    Attributes:
        role: The role this context belongs to.
        state: Current lifecycle state of the agent.
        artifacts: Named outputs produced by the agent; the flow stores the
            agent's result under the ``"output"`` key.
        dependencies: Roles listed for this role in the flow structure, i.e.
            the targets of its outgoing graph edges.
        feedback: Error messages recorded when the agent fails.
    """

    role: "AgentRole"
    state: "FlowState"
    artifacts: Dict[str, str]
    dependencies: List["AgentRole"]
    feedback: List[str]
class AIFlow:
    """Manages the flow of work between different AI agents.

    Roles are nodes of a directed graph. Every simple path from the architect
    to the documentation writer is executed, and a role that has already
    completed is skipped when a later path reaches it. Each role has an
    ``AgentContext`` recording its state, output and error feedback.
    """

    def __init__(self):
        self.flow_graph = nx.DiGraph()
        self.contexts: Dict[AgentRole, AgentContext] = {}
        # Values available to prompt templates, e.g. "requirements".
        self.global_context = {}

    def initialize_flow(self):
        """Initialize the AI Flow with agent relationships and dependencies."""
        # Maps each role to the roles that follow it in the graph.
        flow_structure = {
            AgentRole.ARCHITECT: [AgentRole.UI_DESIGNER, AgentRole.BACKEND_DEVELOPER, AgentRole.DATABASE_ENGINEER],
            AgentRole.UI_DESIGNER: [AgentRole.QA_ENGINEER],
            AgentRole.BACKEND_DEVELOPER: [AgentRole.SECURITY_EXPERT, AgentRole.QA_ENGINEER],
            AgentRole.DATABASE_ENGINEER: [AgentRole.SECURITY_EXPERT],
            AgentRole.SECURITY_EXPERT: [AgentRole.QA_ENGINEER],
            AgentRole.QA_ENGINEER: [AgentRole.DEVOPS_ENGINEER],
            AgentRole.DEVOPS_ENGINEER: [AgentRole.DOCUMENTATION_WRITER],
            AgentRole.DOCUMENTATION_WRITER: [],
        }

        for role, dependencies in flow_structure.items():
            self.flow_graph.add_node(role)
            for dep in dependencies:
                self.flow_graph.add_edge(role, dep)

            # Every role starts PENDING with no artifacts or feedback.
            self.contexts[role] = AgentContext(
                role=role,
                state=FlowState.PENDING,
                artifacts={},
                dependencies=dependencies,
                feedback=[],
            )

    async def execute_flow(self, requirements: str):
        """Run every agent for ``requirements`` and return the compiled report.

        Args:
            requirements: Free-text description of the app to build.

        Returns:
            The Markdown text produced by ``_compile_results``.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            self.initialize_flow()
            self.global_context["requirements"] = requirements

            paths = list(nx.all_simple_paths(
                self.flow_graph,
                AgentRole.ARCHITECT,
                AgentRole.DOCUMENTATION_WRITER,
            ))

            await self._execute_paths(paths)
            return self._compile_results()
        except Exception as e:
            logger.error(f"Flow execution failed: {str(e)}")
            raise

    async def _execute_paths(self, paths: List[List[AgentRole]]):
        """Schedule all paths together with ``asyncio.gather``."""
        path_tasks = [self._execute_path(path) for path in paths]
        await asyncio.gather(*path_tasks)

    async def _execute_path(self, path: List[AgentRole]):
        """Run, in order, every role on ``path`` that has not completed yet."""
        for role in path:
            context = self.contexts[role]
            if context.state != FlowState.COMPLETED:
                await self._execute_agent(role)

    async def _execute_agent(self, role: AgentRole):
        """Run one agent and record its outcome in its context.

        The state moves RUNNING -> COMPLETED on success. On failure it becomes
        FAILED, the error text is appended to ``feedback`` and the exception
        is re-raised.
        """
        context = self.contexts[role]
        context.state = FlowState.RUNNING
        try:
            prompt = self._get_agent_prompt(role)
            result = await self._execute_agent_task(role, prompt)

            context.artifacts["output"] = result
            context.state = FlowState.COMPLETED
        except Exception as e:
            context.state = FlowState.FAILED
            context.feedback.append(str(e))
            raise

    async def _execute_agent_task(self, role: AgentRole, prompt: str) -> str:
        """Dispatch ``prompt`` to the executor for ``role``.

        A method named ``_execute_<role value>`` is used when one exists.
        Every other role runs through a generic agent, so no role is left
        without a result.
        """
        handler = getattr(self, f"_execute_{role.value}", None)
        if handler is not None:
            return await handler(prompt)
        return get_agent(role.value).run(prompt)

    def _get_agent_prompt(self, role: AgentRole) -> str:
        """Get the appropriate prompt template for each agent role.

        Roles without a dedicated template receive a generic prompt that still
        carries the requirements, rather than an empty string.
        """
        prompts = {
            AgentRole.ARCHITECT: (
                "You are a Software Architect designing a scalable application.\n"
                "Requirements: {requirements}\n"
                "Focus on:\n"
                "1. System architecture\n"
                "2. Component interactions\n"
                "3. Technology stack selection\n"
                "4. Scalability considerations\n"
                "5. Integration patterns\n"
                "Provide:\n"
                "- High-level architecture diagram\n"
                "- Component breakdown\n"
                "- Technology recommendations\n"
                "- Integration patterns\n"
                "- Performance considerations"
            ),
        }
        base_prompt = prompts.get(role)
        if base_prompt is None:
            role_title = role.value.replace("_", " ").title()
            base_prompt = (
                "You are the " + role_title + " for this application.\n"
                "Requirements: {requirements}"
            )
        return base_prompt.format(**self.global_context)

    async def _execute_architect(self, prompt: str) -> str:
        """Execute the Architect agent's tasks."""
        agent = get_agent("architect")
        # agent.run is a blocking call; it is not awaited.
        return agent.run(prompt)

    async def _execute_ui_designer(self, prompt: str) -> str:
        """Execute the UI Designer agent's tasks."""
        agent = get_agent("ui_designer")
        return agent.run(prompt)

    def _compile_results(self) -> str:
        """Join the output of every COMPLETED agent into one Markdown string."""
        results = []
        for role, context in self.contexts.items():
            if context.state == FlowState.COMPLETED:
                results.append(f"## {role.value.replace('_', ' ').title()} Output")
                results.append(context.artifacts.get("output", "No output available"))
                results.append("")
        return "\n".join(results)
@dataclass
class FileContext:
    """Context for file operations and tracking.

    Attributes:
        path: Location of the file.
        content: Text content read from the file.
        last_modified: Modification time taken from the file's ``st_mtime``.
        dependencies: Related paths; ``from_path`` starts with an empty set.
        checksum: Hex MD5 digest of ``content`` (change detection, not security).
    """

    path: Path
    content: str
    last_modified: datetime
    dependencies: Set[Path]
    checksum: str

    @classmethod
    def from_path(cls, path: Path) -> "FileContext":
        """Build a context by reading ``path`` from disk.

        Raises:
            OSError: If the file cannot be read or stat'ed.
        """
        content = path.read_text()
        return cls(
            path=path,
            content=content,
            last_modified=datetime.fromtimestamp(path.stat().st_mtime),
            dependencies=set(),
            checksum=hashlib.md5(content.encode()).hexdigest(),
        )
@dataclass
class MemoryItem:
    """Represents a single memory item in the system.

    Declared as a dataclass so that keyword construction works and
    ``field(default_factory=set)`` gives every instance its own set.

    Attributes:
        key: Name under which the item is stored.
        value: The stored payload.
        context: Extra information supplied when the item was added.
        timestamp: When the item was created.
        importance: Weight of the item; defaults to ``1.0``.
        references: Keys of related items; defaults to an empty set.
    """

    key: str
    value: Any
    context: dict
    timestamp: datetime
    importance: float = 1.0
    references: Set[str] = field(default_factory=set)
class ContextManager:
    """Manages real-time context awareness across the system.

    Holds four stores: per-file contexts, a free-form global context, a
    command history list and a keyed memory store.
    """

    def __init__(self):
        self.file_contexts: Dict[Path, FileContext] = {}
        self.global_context: Dict[str, Any] = {}
        self.command_history: List[Dict] = []
        self.memory_store: Dict[str, MemoryItem] = {}

    def update_file_context(self, path: Path) -> FileContext:
        """Re-read ``path`` from disk, cache the resulting context and return it."""
        refreshed = FileContext.from_path(path)
        self.file_contexts[path] = refreshed
        return refreshed

    def get_related_files(self, path: Path) -> Set[Path]:
        """Return the dependency set of ``path``, loading its context if needed."""
        if path not in self.file_contexts:
            self.update_file_context(path)
        return self.file_contexts[path].dependencies

    def track_command(self, command: str, args: List[str], result: Any):
        """Append one executed command, its result and the current time to the history."""
        entry = {
            'command': command,
            'args': args,
            'result': result,
            'timestamp': datetime.now(),
        }
        self.command_history.append(entry)

    def add_memory(self, key: str, value: Any, context: dict = None):
        """Store ``value`` under ``key``, replacing any existing item with that key."""
        item = MemoryItem(
            key=key,
            value=value,
            context=context or {},
            timestamp=datetime.now(),
        )
        self.memory_store[key] = item

    def get_memory(self, key: str) -> Any:
        """Return the value stored under ``key``, or ``None`` when there is none."""
        entry = self.memory_store.get(key)
        if not entry:
            return None
        return entry.value
class FileOperationManager:
    """Manages multi-file operations and tracking."""

    def __init__(self, context_manager: "ContextManager"):
        self.context_manager = context_manager
        # Not used by the methods of this class.
        self.pending_changes: Dict[Path, str] = {}

    async def edit_files(self, changes: Dict[Path, str]):
        """Apply changes to multiple files as one all-or-nothing operation.

        All changes are validated before anything is written. If a write
        fails part-way, files already touched are restored to their previous
        content (files that did not exist before are removed) and the error
        is re-raised.

        Args:
            changes: Mapping of target path to the new text content.

        Raises:
            ValueError: If any change fails validation.
            Exception: Any write or context-update error, after logging.
        """
        try:
            for path, content in changes.items():
                if not self._validate_change(path, content):
                    raise ValueError(f"Invalid change for {path}")

            # Previous bytes per path; None marks a file that did not exist.
            backups = {}
            try:
                for path, content in changes.items():
                    backups[path] = path.read_bytes() if path.exists() else None
                    await self._apply_change(path, content)
            except Exception:
                self._restore(backups)
                raise

            for path in changes:
                self.context_manager.update_file_context(path)
        except Exception as e:
            logger.error(f"Failed to apply multi-file changes: {str(e)}")
            raise

    def _restore(self, backups):
        """Best-effort rollback: put back the content captured in ``backups``."""
        for path, original in backups.items():
            try:
                if original is None:
                    if path.exists():
                        path.unlink()
                else:
                    path.write_bytes(original)
            except OSError as e:
                logger.error(f"Rollback failed for {path}: {str(e)}")

    def _validate_change(self, path: Path, content: str) -> bool:
        """Validate a proposed file change.

        Creates the parent directory when it is missing and, for ``.py``
        files, checks that ``content`` compiles.

        Returns:
            ``True`` when the change is acceptable, ``False`` otherwise.
        """
        try:
            if not path.parent.exists():
                # exist_ok avoids failing if the directory appears in between.
                path.parent.mkdir(parents=True, exist_ok=True)
            if path.suffix == '.py':
                compile(content, str(path), 'exec')
            return True
        except Exception as e:
            logger.error(f"Validation failed for {path}: {str(e)}")
            return False

    async def _apply_change(self, path: Path, content: str):
        """Apply a single file change."""
        path.write_text(content)
class CommandManager:
    """Manages command suggestions and execution."""

    def __init__(self, context_manager: "ContextManager"):
        self.context_manager = context_manager
        # Maps a command name to its template string.
        self.command_templates: Dict[str, str] = {}

    def suggest_commands(self, context: dict) -> List[Dict]:
        """Suggest relevant commands based on context.

        Returns:
            One dict per relevant command with ``command``, ``template`` and
            ``confidence`` keys, ordered by descending confidence.
        """
        suggestions = [
            {
                'command': cmd_name,
                'template': template,
                'confidence': self._calculate_confidence(cmd_name, context),
            }
            for cmd_name, template in self.command_templates.items()
            if self._is_relevant(cmd_name, context)
        ]
        return sorted(suggestions, key=lambda x: x['confidence'], reverse=True)

    async def execute_command(self, command: str, args: List[str]) -> Any:
        """Execute a command and track its result.

        Raises:
            Exception: Any failure to run or track the command, after logging.
        """
        try:
            result = await self._run_command(command, args)
            self.context_manager.track_command(command, args, result)
            return result
        except Exception as e:
            logger.error(f"Command execution failed: {str(e)}")
            raise

    async def _run_command(self, command: str, args: List[str]) -> Dict[str, Any]:
        """Run ``command`` with ``args`` as a subprocess, without a shell.

        Returns:
            A dict with ``returncode``, ``stdout`` and ``stderr`` (both decoded
            as text, undecodable bytes replaced).
        """
        process = await asyncio.create_subprocess_exec(
            command,
            *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        return {
            'returncode': process.returncode,
            'stdout': stdout.decode(errors="replace"),
            'stderr': stderr.decode(errors="replace"),
        }

    def _is_relevant(self, cmd_name: str, context: dict) -> bool:
        """Determine if a command is relevant to the current context."""
        # Placeholder: every command is currently treated as relevant.
        return True

    def _calculate_confidence(self, cmd_name: str, context: dict) -> float:
        """Calculate confidence score for a command suggestion."""
        # Placeholder: every suggestion currently scores 1.0.
        return 1.0
class RuleSystem:
    """Registry of named rule and constraint callables.

    Each callable takes the context dict; whatever it returns is reported
    under its name.
    """

    def __init__(self):
        self.rules: Dict[str, callable] = {}
        self.constraints: Dict[str, callable] = {}

    def add_rule(self, name: str, rule_func: callable):
        """Register ``rule_func`` under ``name``, replacing any earlier rule."""
        self.rules[name] = rule_func

    def add_constraint(self, name: str, constraint_func: callable):
        """Register ``constraint_func`` under ``name``, replacing any earlier one."""
        self.constraints[name] = constraint_func

    def evaluate_rules(self, context: dict) -> Dict[str, bool]:
        """Call every rule with ``context`` and map rule name to its result."""
        outcome = {}
        for rule_name, rule_func in self.rules.items():
            outcome[rule_name] = rule_func(context)
        return outcome

    def check_constraints(self, context: dict) -> Dict[str, bool]:
        """Call every constraint with ``context`` and map name to its result."""
        outcome = {}
        for constraint_name, constraint_func in self.constraints.items():
            outcome[constraint_name] = constraint_func(context)
        return outcome
class ProjectBuilder:
    """Handles autonomous creation of project files and folders."""

    def __init__(self, base_path: Path):
        self.base_path = Path(base_path)
        # Root directory of the most recent build, or None.
        self.current_build = None
        # Files written by the most recent build.
        self.file_manifest = []

    async def create_project(self, app_name: str, structure: dict) -> Path:
        """Create a new project with the specified structure.

        The project is written to ``<fresh temp dir>/<app_name>``. The
        manifest is reset for each build so it never mixes files from
        different builds.

        Args:
            app_name: Single directory name for the project root. Empty
                names, ``.``/``..`` and names containing a path separator are
                rejected so the build cannot leave its temp directory.
            structure: Nested mapping; dict values become directories and
                anything else is written as a file via ``str()``.

        Returns:
            Path of the created project root.

        Raises:
            ValueError: If ``app_name`` is not a plain directory name.
            Exception: Any filesystem error, after logging and cleanup.
        """
        if not app_name or app_name in (".", "..") or Path(app_name).name != app_name:
            message = f"Invalid app name: {app_name!r}"
            logger.error(f"Project creation failed: {message}")
            raise ValueError(message)

        build_dir = None
        try:
            build_dir = Path(tempfile.mkdtemp())
            project_root = build_dir / app_name
            project_root.mkdir(parents=True)

            self.current_build = project_root
            self.file_manifest = []
            await self._create_structure(project_root, structure)
            return project_root
        except Exception as e:
            logger.error(f"Project creation failed: {str(e)}")
            if build_dir is not None:
                # Remove the whole temp directory, not only the project root,
                # and forget the half-built project.
                shutil.rmtree(build_dir, ignore_errors=True)
                self.current_build = None
                self.file_manifest = []
            raise

    async def _create_structure(self, parent: Path, structure: dict):
        """Recursively create project structure."""
        for name, content in structure.items():
            path = parent / name
            if isinstance(content, dict):
                path.mkdir(exist_ok=True)
                await self._create_structure(path, content)
            else:
                path.write_text(str(content))
                self.file_manifest.append(path)
class OutputManager:
    """Packages a built project into zip archives and remembers them."""

    def __init__(self, project_builder: ProjectBuilder):
        self.project_builder = project_builder
        # Archives are written into a fresh temporary directory.
        self.output_dir = Path(tempfile.mkdtemp())
        # Maps archive file name -> {'path', 'created_at', 'size'}.
        self.downloads = {}

    def create_download(self, app_name: str) -> str:
        """Zip the builder's manifest files and return the archive path.

        Args:
            app_name: Prefix of the archive name; a timestamp is appended.

        Returns:
            The archive path as a string.

        Raises:
            ValueError: If the builder has no current build.
            Exception: Any archive or filesystem error, after logging.
        """
        try:
            builder = self.project_builder
            if not builder.current_build:
                raise ValueError("No project has been built yet")

            stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            zip_name = f"{app_name}_{stamp}.zip"
            zip_path = self.output_dir / zip_name

            with ZipFile(zip_path, 'w') as archive:
                for member in builder.file_manifest:
                    # Store entries relative to the project root.
                    arcname = member.relative_to(builder.current_build)
                    archive.write(member, arcname)

            info = {
                'path': zip_path,
                'created_at': datetime.now(),
                'size': zip_path.stat().st_size,
            }
            self.downloads[zip_name] = info
            return str(zip_path)
        except Exception as e:
            logger.error(f"Failed to create download: {str(e)}")
            raise
class EnhancedAIFlow(AIFlow):
    """Enhanced AI Flow with project building and output management."""

    def __init__(self):
        super().__init__()
        self.project_builder = ProjectBuilder(Path(tempfile.mkdtemp()))
        self.output_manager = OutputManager(self.project_builder)
        self.context_manager = ContextManager()
        self.file_manager = FileOperationManager(self.context_manager)
        self.command_manager = CommandManager(self.context_manager)
        self.rule_system = RuleSystem()

    @staticmethod
    def _derive_app_name(requirements: str) -> str:
        """Turn the first word of ``requirements`` into a safe directory name.

        The name ends up in filesystem paths, so every character that is not
        alphanumeric, ``_`` or ``-`` is replaced with ``_``. ``"app"`` is used
        when there is no first word.
        """
        words = requirements.split()
        first_word = words[0].lower() if words else ""
        safe = "".join(ch if ch.isalnum() or ch in "_-" else "_" for ch in first_word)
        return safe or "app"

    async def execute_flow(self, requirements: str):
        """Execute the AI Flow and build the project.

        Runs the base flow, writes a skeleton project to disk, zips it and
        appends a "Download" and a "Project Structure" section to the report.

        Args:
            requirements: Free-text description of the app to build.

        Returns:
            The Markdown report, including the download path line.

        Raises:
            Exception: Any failure, after logging.
        """
        try:
            results = await super().execute_flow(requirements)

            app_name = self._derive_app_name(requirements)

            structure = {
                "frontend": {
                    "src": {
                        "components": {},
                        "pages": {},
                        "styles": {},
                    },
                    "package.json": "{\n \"name\": \"frontend\",\n \"version\": \"1.0.0\"\n}",
                    "README.md": "# Frontend\n",
                },
                "backend": {
                    "src": {
                        "routes": {},
                        "models": {},
                        "controllers": {},
                    },
                    "requirements.txt": "fastapi\nuvicorn\n",
                    "README.md": "# Backend\n",
                },
                "README.md": f"# {app_name}\nGenerated by AI Flow\n",
            }

            await self.project_builder.create_project(app_name, structure)
            download_path = self.output_manager.create_download(app_name)

            download_info = self.output_manager.downloads[Path(download_path).name]
            # The "To download your project" line must start at column 0:
            # the UI finds the archive by matching that prefix.
            download_section = [
                "",
                "## Download",
                "Your project has been created and is ready for download:",
                f"- File: {Path(download_path).name}",
                f"- Size: {download_info['size'] / 1024:.1f} KB",
                f"- Created: {download_info['created_at'].strftime('%Y-%m-%d %H:%M:%S')}",
                f"To download your project, use this path: {download_path}",
                "## Project Structure",
                "The following files have been created:",
                "",
            ]
            results += "\n".join(download_section)

            for file_path in self.project_builder.file_manifest:
                rel_path = file_path.relative_to(self.project_builder.current_build)
                results += f"- {rel_path}\n"
            return results
        except Exception as e:
            logger.error(f"Enhanced flow execution failed: {str(e)}")
            raise

    async def _execute_agent(self, role: AgentRole):
        """Execute a single agent's tasks with enhanced context.

        Same state handling as the base class; additionally the context
        manager's global context is appended to the prompt as JSON and the
        result is saved in memory under ``agent_result_<role value>``.
        """
        context = self.contexts[role]
        context.state = FlowState.RUNNING
        try:
            prompt = self._get_agent_prompt(role)
            prompt += f"\n\nContext:\n{json.dumps(self.context_manager.global_context, indent=2)}"

            result = await self._execute_agent_task(role, prompt)

            context.artifacts["output"] = result
            context.state = FlowState.COMPLETED

            self.context_manager.add_memory(
                f"agent_result_{role.value}",
                result,
                {"role": role.value, "timestamp": datetime.now()},
            )
        except Exception as e:
            context.state = FlowState.FAILED
            context.feedback.append(str(e))
            raise

    async def _execute_agent_task(self, role: AgentRole, prompt: str) -> str:
        """Run ``prompt`` through the executor for ``role``.

        A method named ``_execute_<role value>`` is used when one exists;
        every other role runs through a generic agent.
        """
        handler = getattr(self, f"_execute_{role.value}", None)
        if handler is not None:
            return await handler(prompt)
        return get_agent(role.value).run(prompt)
# Update the multi_agent_workflow function to use AI Flows | |
async def multi_agent_workflow(requirements: str) -> str:
    """Run the enhanced AI flow and wrap its report in a full Markdown page.

    Args:
        requirements (str): App requirements.

    Returns:
        str: The generated report, or a ``"Workflow failed: ..."`` message
        when anything goes wrong (the error is logged, not raised).
    """
    try:
        flow = EnhancedAIFlow()
        results = await flow.execute_flow(requirements)

        # The first word of the requirements serves as the app name.
        first_word = requirements.split()[0]
        app_name = first_word.lower().replace(" ", "_")

        architect_artifacts = flow.contexts[AgentRole.ARCHITECT].artifacts
        project_structure = generate_project_structure(app_name, architect_artifacts)
        writer_artifacts = flow.contexts[AgentRole.DOCUMENTATION_WRITER].artifacts
        documentation = generate_documentation(app_name, requirements, writer_artifacts)

        report_lines = [
            "",
            f"# {app_name.title()} - Generated Application",
            "## Project Structure",
            "```",
            f"{project_structure}",
            "```",
            f"{results}",
            "## Documentation",
            f"{documentation}",
            "## Next Steps",
            "1. Review the generated architecture and components",
            "2. Set up the development environment",
            "3. Implement the components following the provided structure",
            "4. Run the test suite",
            "5. Deploy using the provided configurations",
            "## Support",
            "For any issues or questions, please refer to the documentation or create an issue in the repository.",
            "",
        ]
        return "\n".join(report_lines)
    except Exception as e:
        error_msg = f"Workflow failed: {str(e)}"
        logger.error(error_msg)
        return error_msg
    finally:
        # Release cached GPU memory whether or not the run succeeded.
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
# Update the app_generator function to handle async execution | |
def app_generator(requirements: str):
    """Generate an app based on the provided requirements using AI Flows.

    Synchronous entry point: runs the async workflow to completion on a
    fresh event loop. ``asyncio.run`` creates the loop, closes it and
    uninstalls it afterwards, so no closed loop is left as the thread's
    current loop and cleanup never refers to a loop that was not created.

    Args:
        requirements (str): App requirements.

    Returns:
        str: The workflow report; a prompt to enter requirements when the
        input is empty or blank; or an ``"App generation failed: ..."``
        message when running the workflow raises.
    """
    if not requirements or not requirements.strip():
        return "Please provide app requirements."

    try:
        return asyncio.run(multi_agent_workflow(requirements))
    except Exception as e:
        error_msg = f"App generation failed: {str(e)}"
        logger.error(error_msg)
        return error_msg
# Gradio UI | |
# Prefix of the report line that carries the generated archive's path.
_DOWNLOAD_MARKER = "To download your project, use this path:"


def _extract_download_path(result):
    """Return the path following the download marker in ``result``, or None.

    Everything after the marker is taken, so a path that itself contains
    ``": "`` is not truncated, and a marker line with no path yields an empty
    string instead of raising.
    """
    for line in result.split('\n'):
        candidate = line.strip()
        if candidate.startswith(_DOWNLOAD_MARKER):
            return candidate[len(_DOWNLOAD_MARKER):].strip()
    return None


# Gradio UI
with gr.Blocks() as ui:
    gr.Markdown("# Autonomous App Generator with LangChain Agents")
    gr.Markdown(
        "\n"
        "## Instructions\n"
        "1. Describe the app you want to build in detail\n"
        "2. Include any specific requirements or features\n"
        "3. Click 'Generate App' to start the process\n"
        "4. Download your generated app from the provided link\n"
    )

    with gr.Row():
        requirements_input = gr.Textbox(
            label="App Requirements",
            placeholder="Describe the app you want to build...",
            lines=5,
        )
        generate_button = gr.Button("Generate App")

    with gr.Row():
        output = gr.Textbox(
            label="Generated App Details",
            lines=20,
        )
        file_output = gr.File(
            label="Download Generated App",
            interactive=False,
        )

    def process_output(requirements):
        """Generate the app; return ``(report text, archive path or None)``.

        The archive path is returned only when the report contains one and
        the file exists. Any exception is logged and reported as text.
        """
        try:
            result = app_generator(requirements)
            download_path = _extract_download_path(result)
            if download_path and Path(download_path).exists():
                return result, download_path
            return result, None
        except Exception as e:
            error_msg = f"Failed to generate app: {str(e)}"
            logger.error(error_msg)
            return error_msg, None

    generate_button.click(
        process_output,
        inputs=[requirements_input],
        outputs=[output, file_output],
    )
# Run the Gradio app | |
if __name__ == "__main__":
    # Start the web UI only when run as a script; a launch failure is logged
    # and not re-raised.
    try:
        ui.launch()
    except Exception as launch_error:
        logger.error(f"Failed to launch Gradio interface: {str(launch_error)}")