# Home_Design_Agent / graph.py
# wangzerui's picture
# init
# 10b617b
from typing import Literal
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from state import ConversationState
from agents import (
RouterAgent,
GeneralDesignAgent,
BudgetAnalysisAgent,
FloorplanAgent,
FloorplanGeneratorAgent,
RegulationAgent
)
from detailed_budget_agent import DetailedBudgetAgent
# Removed creative_specialists - they had NoneType formatting errors
# Keeping only the core working agents
def create_initial_state() -> ConversationState:
    """Build a blank conversation state for a new session.

    Every call constructs fresh containers, so the returned state can be
    mutated freely without affecting states produced by other calls.

    Returns:
        A dict with empty message/history lists, unset (``None``) user,
        floorplan and budget fields, ``location`` pre-filled with
        ``"Montreal"``, and both readiness flags set to ``False``.
    """
    requirements = dict(
        budget=None,
        location="Montreal",  # Default as specified in requirements
        family_size=None,
        lifestyle_preferences=[],
        special_needs=[],
    )

    floorplan_inputs = dict(
        num_floors=None,
        total_sqft=None,
        lot_shape=None,
        lot_dimensions=None,
        rooms=[],
    )

    floorplan_design = dict(
        design_analysis=None,
        detailed_rooms=[],
        structural_elements=[],
        circulation_plan={},
        lot_utilization={},
        architectural_features=[],
    )

    # Every budget line item starts out unknown (None).
    budget_lines = dict.fromkeys(
        (
            "site_preparation",
            "foundation",
            "framing",
            "roofing",
            "exterior_finishes",
            "interior_finishes",
            "mechanical_systems",
            "electrical_systems",
            "plumbing_systems",
            "permits_fees",
            "professional_services",
            "contingency",
            "total_construction_cost",
            "cost_per_sqft",
            "budget_analysis",
        )
    )

    return dict(
        messages=[],
        current_topic=None,
        user_requirements=requirements,
        floorplan_requirements=floorplan_inputs,
        detailed_floorplan=floorplan_design,
        budget_breakdown=budget_lines,
        conversation_history=[],
        agent_recommendations=[],
        agent_memory={},  # Shared memory between agents
        next_agent=None,
        floorplan_ready=False,
        budget_ready=False,
    )
def should_generate_floorplan(state: ConversationState) -> Literal["generate_floorplan", "route_to_agent"]:
    """Pick the branch to follow after the floorplan agent has run.

    Args:
        state: Current conversation state; only ``floorplan_ready`` is read.

    Returns:
        ``"generate_floorplan"`` when ``floorplan_ready`` is truthy,
        otherwise ``"route_to_agent"``.
    """
    return "generate_floorplan" if state["floorplan_ready"] else "route_to_agent"
def should_generate_budget(state: ConversationState) -> Literal["generate_budget", "route_to_agent"]:
    """Decide whether to generate the detailed budget or continue the conversation.

    The detailed budget is generated only when all of the following hold:
    the user has stated a budget, the detailed floorplan contains at least
    one room, and a detailed budget has not been produced yet.

    Args:
        state: Current conversation state. Reads
            ``user_requirements["budget"]``,
            ``detailed_floorplan["detailed_rooms"]`` and ``budget_ready``.

    Returns:
        ``"generate_budget"`` when the conditions above are met, otherwise
        ``"route_to_agent"``.
    """
    has_budget = state["user_requirements"]["budget"] is not None
    # Use truthiness rather than `!= []`: a missing (None) room list must not
    # count as "floorplan designed". This matches the check made in
    # route_to_specialist.
    has_floorplan_details = bool(state["detailed_floorplan"]["detailed_rooms"])

    if has_budget and has_floorplan_details and not state["budget_ready"]:
        return "generate_budget"
    return "route_to_agent"
def route_to_specialist(state: ConversationState) -> Literal[
    "general_design", "budget_analysis", "floorplan", "detailed_budget", "regulation", "end"
]:
    """Choose the node that should run after the router.

    A designed floorplan without a detailed budget takes priority over the
    router's own choice; otherwise ``next_agent`` is translated into a node
    name.

    Args:
        state: Current conversation state. Reads ``next_agent``,
            ``detailed_floorplan["detailed_rooms"]`` and ``budget_ready``.

    Returns:
        The name of the specialist node to run, or ``"end"`` when
        ``next_agent`` is unset or not a known specialist.
    """
    requested = state.get("next_agent")

    # A finished floorplan with no detailed budget yet always goes to budgeting.
    rooms_designed = state["detailed_floorplan"]["detailed_rooms"]
    if rooms_designed and not state["budget_ready"]:
        return "detailed_budget"

    # Router decision -> graph node name (core working agents only).
    specialists = {
        "general": "general_design",
        "budget": "budget_analysis",
        "floorplan": "floorplan",
        "regulation": "regulation",
    }
    return specialists[requested] if requested in specialists else "end"
def create_architecture_assistant_graph(model: ChatOpenAI) -> StateGraph:
    """Assemble and compile the LangGraph workflow for the architecture assistant.

    Topology:
        * ``router`` is the entry point and branches via
          ``route_to_specialist``.
        * ``floorplan`` continues to ``generate_floorplan`` when
          ``should_generate_floorplan`` says so, otherwise the run ends.
        * ``generate_floorplan`` continues to ``detailed_budget`` when
          ``should_generate_budget`` says so, otherwise the run ends.
        * ``general_design``, ``budget_analysis``, ``detailed_budget`` and
          ``regulation`` always end the run after responding.

    Args:
        model: Chat model handed to every agent.

    Returns:
        The result of ``workflow.compile()`` for the assembled graph.
    """
    # Node name -> agent instance; each agent's `process` method becomes the node.
    agents_by_node = {
        "router": RouterAgent(model),
        "general_design": GeneralDesignAgent(model),
        "budget_analysis": BudgetAnalysisAgent(model),
        "floorplan": FloorplanAgent(model),
        "generate_floorplan": FloorplanGeneratorAgent(model),
        "detailed_budget": DetailedBudgetAgent(model),
        "regulation": RegulationAgent(model),
    }

    workflow = StateGraph(ConversationState)
    for node_name, agent in agents_by_node.items():
        workflow.add_node(node_name, agent.process)

    workflow.set_entry_point("router")

    # Router -> specialist: each routing key maps to the node of the same
    # name, plus "end" which terminates the run.
    router_targets = {
        name: name
        for name in (
            "general_design",
            "budget_analysis",
            "floorplan",
            "detailed_budget",
            "regulation",
        )
    }
    router_targets["end"] = END
    workflow.add_conditional_edges("router", route_to_specialist, router_targets)

    # Floorplan agent -> generator once the requirements are ready.
    workflow.add_conditional_edges(
        "floorplan",
        should_generate_floorplan,
        {"generate_floorplan": "generate_floorplan", "route_to_agent": END},
    )

    # Generator -> detailed budget when a budget is available.
    workflow.add_conditional_edges(
        "generate_floorplan",
        should_generate_budget,
        {"generate_budget": "detailed_budget", "route_to_agent": END},
    )

    # These agents finish the run after responding; the user continues with new input.
    for terminal_node in ("general_design", "budget_analysis", "detailed_budget", "regulation"):
        workflow.add_edge(terminal_node, END)

    return workflow.compile()
class ArchitectureAssistant:
    """Main architecture assistant class with persistent state management.

    Holds the chat model, the compiled workflow graph and the in-memory
    conversation state. When a ``user_id`` is set, the state is saved through
    the shared ``user_state_manager`` after each chat turn.
    """

    # Reply returned when the graph produced no assistant message for the turn.
    _FALLBACK_REPLY = "I'm here to help with your home design questions!"

    def __init__(self, openai_api_key: str, user_id: str = None):
        """Create the model, graph and initial state, and open a session.

        Args:
            openai_api_key: API key passed to ``ChatOpenAI``.
            user_id: Optional user identifier. A new session is started only
                when it is truthy; otherwise ``session_id`` stays ``None``.
        """
        self.model = ChatOpenAI(
            api_key=openai_api_key,
            model="gpt-4o-mini",
            temperature=0.7
        )
        self.graph = create_architecture_assistant_graph(self.model)
        self.state = create_initial_state()

        # Initialize user state management
        from user_state_manager import user_state_manager
        self.state_manager = user_state_manager
        self.user_id = user_id
        self.session_id = None

        # Start new session
        if user_id:
            self.session_id = self.state_manager.start_new_session(user_id)

    def chat(self, user_input: str, save_state: bool = True) -> str:
        """Process one user message and return the assistant's reply.

        Args:
            user_input: Text of the user's message.
            save_state: When true and a ``user_id`` is set, save the state
                after the graph has run.

        Returns:
            The most recent assistant message produced for this turn, or a
            generic greeting when the graph added no assistant message.
        """
        # Add user message to state
        self.state["messages"].append({
            "role": "user",
            "content": user_input
        })

        # Run the graph and adopt its result as the new state
        self.state = self.graph.invoke(self.state)

        # Save state after each interaction
        if save_state and self.user_id:
            self.state_manager.save_user_state(
                self.state,
                self.user_id,
                self.session_id
            )

        return self._reply_for_current_turn()

    def _reply_for_current_turn(self) -> str:
        """Return the newest assistant message that follows the last user message."""
        # Walk backwards and stop at the latest user message, so that an
        # assistant reply from an earlier turn is never returned as the
        # answer to the current input.
        for message in reversed(self.state["messages"]):
            role = message["role"]
            if role == "assistant":
                return message["content"]
            if role == "user":
                break
        return self._FALLBACK_REPLY

    def get_conversation_summary(self) -> dict:
        """Get a summary of the current conversation state.

        Returns:
            A dict with the user requirements, floorplan requirements,
            current topic and the total number of messages.
        """
        return {
            "user_requirements": self.state["user_requirements"],
            "floorplan_requirements": self.state["floorplan_requirements"],
            "current_topic": self.state["current_topic"],
            "total_messages": len(self.state["messages"])
        }

    def reset_conversation(self, start_new_session: bool = True):
        """Reset the conversation state to a fresh initial state.

        Args:
            start_new_session: When true and a ``user_id`` is set, also start
                a new session with the state manager.
        """
        self.state = create_initial_state()

        # Start new session if requested
        if start_new_session and self.user_id:
            self.session_id = self.state_manager.start_new_session(self.user_id)

    def load_previous_state(self, session_id: str = None) -> bool:
        """Load a previous conversation state for the current user.

        Args:
            session_id: Session to load; passed through to the state manager.
                When given and the load succeeds, it becomes the active
                session id.

        Returns:
            ``True`` when a state was loaded, ``False`` when no user is set
            or the state manager returned nothing.
        """
        if not self.user_id:
            return False

        loaded_state = self.state_manager.load_user_state(self.user_id, session_id)
        if loaded_state:
            self.state = loaded_state
            if session_id:
                self.session_id = session_id
            return True
        return False

    def get_user_history(self) -> list:
        """Get conversation history for the current user (empty without a user)."""
        if not self.user_id:
            return []
        return self.state_manager.get_user_history(self.user_id)

    def set_user_id(self, user_id: str):
        """Set or change the user ID.

        A new session is started only for a truthy ``user_id``, mirroring
        ``__init__``; clearing the user also clears the session id.

        Args:
            user_id: New user identifier, or a falsy value to detach the user.
        """
        self.user_id = user_id
        if user_id:
            self.session_id = self.state_manager.start_new_session(user_id)
        else:
            self.session_id = None