# Home_Design_Agent / user_state_manager.py
# wangzerui's picture
# init
# 10b617b
#!/usr/bin/env python3
"""
User State Manager for Residential Architecture Assistant
Handles saving, loading, and managing user conversation states with timestamps.
Provides persistent storage and history functionality.
"""
import json
import os
from datetime import datetime
from typing import Dict, List, Optional, Any
from state import ConversationState
import uuid
import hashlib
class UserStateManager:
    """Manage user conversation states with persistent JSON storage.

    Every user is stored in one file, ``user_<user_id>.json``, inside
    ``storage_dir``. A file holds the user id, the creation timestamp and a
    ``conversations`` list (one record per session) kept sorted newest first.

    NOTE(review): ``user_id`` is interpolated into the file name unchecked, so
    callers should not pass untrusted ids that contain path separators.
    """

    # Value reported as "last_activity" for users without any conversation.
    _NEVER = "Never"

    def __init__(self, storage_dir: str = "user_conversations"):
        """Remember ``storage_dir``, create it if needed and reset the session."""
        self.storage_dir = storage_dir
        self.ensure_storage_directory()
        self.current_session_id = None
        self.current_user_id = None

    def ensure_storage_directory(self):
        """Create the storage directory if it doesn't exist."""
        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(self.storage_dir, exist_ok=True)

    def generate_user_id(self, ip_address: Optional[str] = None) -> str:
        """Return a 12-character hexadecimal user id.

        With ``ip_address`` the id is derived from its MD5 digest, so the same
        address always maps to the same id. Without it a random id is returned.
        """
        if ip_address:
            return hashlib.md5(ip_address.encode()).hexdigest()[:12]
        # ``.hex`` has no hyphens, so all 12 characters are hex digits, matching
        # the format of the IP-derived ids.
        return uuid.uuid4().hex[:12]

    def start_new_session(self, user_id: Optional[str] = None,
                          ip_address: Optional[str] = None) -> str:
        """Start a new conversation session and return its session id.

        A missing ``user_id`` is generated from ``ip_address`` (or randomly).
        The user id and the new session id become the current ones.
        """
        if not user_id:
            user_id = self.generate_user_id(ip_address)
        self.current_user_id = user_id
        self.current_session_id = str(uuid.uuid4())
        return self.current_session_id

    def save_user_state(self, state: "ConversationState",
                        user_id: Optional[str] = None,
                        session_id: Optional[str] = None) -> str:
        """Save ``state`` for a user session with a timestamp.

        Missing ids fall back to the current ones, then to freshly generated
        ones. A record with the same ``session_id`` is replaced, otherwise the
        record is appended. Returns the session id that was used and makes
        both ids the current ones.

        Raises:
            json.JSONDecodeError: the existing user file is not valid JSON
                (it is left untouched rather than overwritten).
            OSError: the user file cannot be read or written.
        """
        if not user_id:
            user_id = self.current_user_id or self.generate_user_id()
        if not session_id:
            session_id = self.current_session_id or str(uuid.uuid4())

        # Naive local time, kept so new records sort consistently with the
        # timestamps already stored on disk.
        timestamp = datetime.now().isoformat()
        save_record = {
            "user_id": user_id,
            "session_id": session_id,
            "timestamp": timestamp,
            "state": self._serialize_state(state),
            "summary": self._create_state_summary(state),
        }

        user_file = self._user_file(user_id)
        user_data = self._read_user_data(user_file)
        if user_data is None:
            user_data = {
                "user_id": user_id,
                "created": timestamp,
                "conversations": [],
            }
        conversations = user_data.setdefault("conversations", [])

        # Replace the record of an existing session, otherwise append.
        for index, conv in enumerate(conversations):
            if conv.get("session_id") == session_id:
                conversations[index] = save_record
                break
        else:
            conversations.append(save_record)

        # Newest first: load_user_state and get_all_users rely on index 0
        # being the most recent conversation.
        conversations.sort(key=lambda conv: conv["timestamp"], reverse=True)

        self._write_user_data(user_file, user_data)
        self.current_user_id = user_id
        self.current_session_id = session_id
        return session_id

    def load_user_state(self, user_id: str,
                        session_id: Optional[str] = None) -> Optional["ConversationState"]:
        """Load a saved state by ``user_id`` and optional ``session_id``.

        Without ``session_id`` the most recent conversation is returned.
        Returns ``None`` when the user or the session does not exist.
        """
        user_data = self._read_user_data(self._user_file(user_id))
        if user_data is None:
            return None
        conversations = user_data.get("conversations", [])

        if session_id:
            for conv in conversations:
                if conv.get("session_id") == session_id:
                    return self._deserialize_state(conv["state"])
            return None

        if conversations:
            return self._deserialize_state(conversations[0]["state"])
        return None

    def get_user_history(self, user_id: str) -> List[Dict[str, Any]]:
        """Return all saved conversation records of a user (``[]`` if none)."""
        user_data = self._read_user_data(self._user_file(user_id))
        if user_data is None:
            return []
        return user_data.get("conversations", [])

    def get_all_users(self) -> List[Dict[str, Any]]:
        """Return a summary of every stored user, most recently active first.

        Users without conversations report ``"Never"`` as ``last_activity``
        and are listed after all users that have activity. Unreadable files
        are reported on stdout and skipped.
        """
        users = []
        for filename in self._user_filenames():
            try:
                user_data = self._read_user_data(
                    os.path.join(self.storage_dir, filename)
                )
                if user_data is None:
                    # File disappeared between listing and reading.
                    continue
                conversations = user_data.get("conversations", [])
                latest = conversations[0] if conversations else None
                users.append({
                    # Strip the "user_" prefix and ".json" suffix.
                    "user_id": filename[len("user_"):-len(".json")],
                    "created": user_data.get("created", "Unknown"),
                    "total_conversations": len(conversations),
                    "last_activity": latest["timestamp"] if latest else self._NEVER,
                    "latest_summary": latest.get("summary", {}) if latest else {},
                })
            except Exception as e:
                # Best-effort listing: one bad file must not hide the others.
                print(f"Error reading user file {filename}: {e}")
                continue

        # A plain string sort would rank "Never" above every ISO timestamp;
        # the leading flag keeps users without activity at the end.
        users.sort(
            key=lambda user: (
                user["last_activity"] != self._NEVER,
                user["last_activity"],
            ),
            reverse=True,
        )
        return users

    def search_conversations_by_time(self, start_time: str,
                                     end_time: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return conversations whose timestamp lies within a time range.

        Timestamps are compared as strings against ``start_time`` and, when
        given, ``end_time`` (both inclusive). Results are sorted newest first.
        Unreadable files are reported on stdout and skipped.
        """
        results = []
        for filename in self._user_filenames():
            try:
                user_data = self._read_user_data(
                    os.path.join(self.storage_dir, filename)
                )
                if user_data is None:
                    continue
                for conv in user_data.get("conversations", []):
                    conv_time = conv["timestamp"]
                    if conv_time < start_time:
                        continue
                    if end_time and conv_time > end_time:
                        continue
                    results.append({
                        "user_id": conv["user_id"],
                        "session_id": conv["session_id"],
                        "timestamp": conv_time,
                        "summary": conv.get("summary", {}),
                    })
            except Exception as e:
                # Best-effort search: one bad file must not abort the scan.
                print(f"Error searching in file {filename}: {e}")
                continue

        results.sort(key=lambda item: item["timestamp"], reverse=True)
        return results

    def delete_user_data(self, user_id: str) -> bool:
        """Delete all data of a user; return ``True`` only if a file was removed."""
        user_file = self._user_file(user_id)
        if not os.path.exists(user_file):
            return False
        try:
            os.remove(user_file)
        except OSError as e:
            print(f"Error deleting user data: {e}")
            return False
        return True

    def _user_file(self, user_id: str) -> str:
        """Return the path of the JSON file that stores ``user_id``."""
        return os.path.join(self.storage_dir, f"user_{user_id}.json")

    def _user_filenames(self) -> List[str]:
        """Return the names of all ``user_*.json`` files in the storage directory."""
        return [
            name for name in os.listdir(self.storage_dir)
            if name.startswith("user_") and name.endswith(".json")
        ]

    def _read_user_data(self, user_file: str) -> Optional[Dict[str, Any]]:
        """Return the parsed content of ``user_file`` or ``None`` if it is missing."""
        if not os.path.exists(user_file):
            return None
        try:
            with open(user_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            # Removed between the existence check and the open call.
            return None

    def _write_user_data(self, user_file: str, user_data: Dict[str, Any]) -> None:
        """Write ``user_data`` to ``user_file`` without risking the old content.

        The data is serialized before any file is opened and written to a
        temporary sibling that then replaces the target, so a serialization
        or write failure never leaves a truncated user file behind.
        """
        payload = json.dumps(user_data, indent=2, ensure_ascii=False)
        self.ensure_storage_directory()
        # The ".tmp" suffix keeps the file out of the "user_*.json" scans.
        tmp_file = f"{user_file}.tmp"
        try:
            with open(tmp_file, 'w', encoding='utf-8') as f:
                f.write(payload)
            os.replace(tmp_file, user_file)
        except OSError:
            if os.path.exists(tmp_file):
                os.remove(tmp_file)
            raise

    def _serialize_state(self, state: "ConversationState") -> Dict[str, Any]:
        """Convert a conversation state to a JSON-serializable deep copy.

        Nested dicts, lists and tuples are converted recursively (tuples
        become lists); any other non-primitive value is replaced by its
        ``str()`` representation.
        """
        return {key: self._to_jsonable(value) for key, value in state.items()}

    def _to_jsonable(self, value: Any) -> Any:
        """Return ``value`` rebuilt from JSON-serializable types only."""
        if value is None or isinstance(value, (str, int, float, bool)):
            return value
        if isinstance(value, dict):
            converted = {}
            for key, item in value.items():
                # json only accepts primitive keys; stringify anything else.
                if not (key is None or isinstance(key, (str, int, float, bool))):
                    key = str(key)
                converted[key] = self._to_jsonable(item)
            return converted
        if isinstance(value, (list, tuple)):
            return [self._to_jsonable(item) for item in value]
        # Complex objects are stored as their string representation.
        return str(value)

    def _deserialize_state(self, serialized_state: Dict[str, Any]) -> "ConversationState":
        """Convert saved JSON data back to a conversation state.

        Starts from ``create_initial_state()`` and copies over only the saved
        keys that the fresh state already contains.
        """
        # Imported here, as in the original code, rather than at module level.
        from graph import create_initial_state

        state = create_initial_state()
        for key, value in serialized_state.items():
            if key in state:
                state[key] = value
        return state

    def _create_state_summary(self, state: "ConversationState") -> Dict[str, Any]:
        """Create a compact summary of ``state`` for quick reference.

        The summary is best effort: sections that are missing or ``None`` and
        room entries without ``count``/``type`` are skipped instead of raising,
        so a malformed state can still be saved.
        """
        summary = {
            "total_messages": len(state.get("messages") or []),
            "current_topic": state.get("current_topic"),
            "user_requirements": {},
            "floorplan_status": {},
            "project_progress": {},
        }

        # User requirements summary
        user_reqs = state.get("user_requirements") or {}
        budget = user_reqs.get("budget")
        if budget:
            try:
                summary["user_requirements"]["budget"] = f"${budget:,.0f}"
            except (TypeError, ValueError):
                # Non-numeric budget (e.g. free text): keep it verbatim.
                summary["user_requirements"]["budget"] = str(budget)
        for field in ("location", "family_size"):
            if user_reqs.get(field):
                summary["user_requirements"][field] = user_reqs[field]

        # Floorplan status
        floorplan_reqs = state.get("floorplan_requirements") or {}
        if floorplan_reqs.get("total_sqft"):
            summary["floorplan_status"]["size"] = f"{floorplan_reqs['total_sqft']} sq ft"
        if floorplan_reqs.get("num_floors"):
            summary["floorplan_status"]["floors"] = floorplan_reqs["num_floors"]
        rooms = floorplan_reqs.get("rooms")
        if rooms:
            room_summary = ", ".join(
                f"{room['count']}x {room['type']}"
                for room in rooms
                if isinstance(room, dict) and "count" in room and "type" in room
            )
            if room_summary:
                summary["floorplan_status"]["rooms"] = room_summary

        # Project progress: each phase counts as done once its data is present.
        agent_memory = state.get("agent_memory") or {}
        phase_checks = (
            ("Architectural Design",
             (state.get("detailed_floorplan") or {}).get("detailed_rooms")),
            ("Budget Analysis",
             (state.get("budget_breakdown") or {}).get("total_construction_cost")),
            ("Structural Analysis", agent_memory.get("structural_analysis")),
            ("Sustainability Review", agent_memory.get("sustainability")),
            ("Permit Planning", agent_memory.get("permits")),
            ("Interior Design", agent_memory.get("interior_design")),
        )
        completed_phases = [name for name, done in phase_checks if done]
        summary["project_progress"]["completed_phases"] = completed_phases
        summary["project_progress"]["completion_percentage"] = (
            len(completed_phases) * 100 // len(phase_checks)
        )
        return summary
# Module-level shared instance for easy access. Constructing it here runs
# UserStateManager.__init__ at import time, which creates the default
# "user_conversations" storage directory if it does not already exist.
user_state_manager = UserStateManager()