#!/usr/bin/env python3 """ User State Manager for Residential Architecture Assistant Handles saving, loading, and managing user conversation states with timestamps. Provides persistent storage and history functionality. """ import json import os from datetime import datetime from typing import Dict, List, Optional, Any from state import ConversationState import uuid import hashlib class UserStateManager: """Manages user conversation states with persistent JSON storage""" def __init__(self, storage_dir: str = "user_conversations"): self.storage_dir = storage_dir self.ensure_storage_directory() self.current_session_id = None self.current_user_id = None def ensure_storage_directory(self): """Create storage directory if it doesn't exist""" if not os.path.exists(self.storage_dir): os.makedirs(self.storage_dir) def generate_user_id(self, ip_address: str = None) -> str: """Generate a unique user ID based on session or IP""" if ip_address: # Create consistent user ID from IP return hashlib.md5(ip_address.encode()).hexdigest()[:12] else: # Generate random session-based ID return str(uuid.uuid4())[:12] def start_new_session(self, user_id: str = None, ip_address: str = None) -> str: """Start a new conversation session""" if not user_id: user_id = self.generate_user_id(ip_address) self.current_user_id = user_id self.current_session_id = str(uuid.uuid4()) return self.current_session_id def save_user_state(self, state: ConversationState, user_id: str = None, session_id: str = None) -> str: """Save user state with timestamp""" if not user_id: user_id = self.current_user_id or self.generate_user_id() if not session_id: session_id = self.current_session_id or str(uuid.uuid4()) timestamp = datetime.now().isoformat() # Create save record save_record = { "user_id": user_id, "session_id": session_id, "timestamp": timestamp, "state": self._serialize_state(state), "summary": self._create_state_summary(state) } # Save to user-specific file user_file = os.path.join(self.storage_dir, f"user_{user_id}.json") # Load existing conversations or create new if os.path.exists(user_file): with open(user_file, 'r', encoding='utf-8') as f: user_data = json.load(f) else: user_data = { "user_id": user_id, "created": timestamp, "conversations": [] } # Add or update conversation conversation_exists = False for i, conv in enumerate(user_data["conversations"]): if conv["session_id"] == session_id: user_data["conversations"][i] = save_record conversation_exists = True break if not conversation_exists: user_data["conversations"].append(save_record) # Sort conversations by timestamp (newest first) user_data["conversations"].sort(key=lambda x: x["timestamp"], reverse=True) # Save to file with open(user_file, 'w', encoding='utf-8') as f: json.dump(user_data, f, indent=2, ensure_ascii=False) self.current_user_id = user_id self.current_session_id = session_id return session_id def load_user_state(self, user_id: str, session_id: str = None) -> Optional[ConversationState]: """Load user state by user_id and optional session_id""" user_file = os.path.join(self.storage_dir, f"user_{user_id}.json") if not os.path.exists(user_file): return None with open(user_file, 'r', encoding='utf-8') as f: user_data = json.load(f) conversations = user_data.get("conversations", []) if session_id: # Load specific session for conv in conversations: if conv["session_id"] == session_id: return self._deserialize_state(conv["state"]) else: # Load most recent conversation if conversations: return self._deserialize_state(conversations[0]["state"]) return None def get_user_history(self, user_id: str) -> List[Dict[str, Any]]: """Get conversation history for a user""" user_file = os.path.join(self.storage_dir, f"user_{user_id}.json") if not os.path.exists(user_file): return [] with open(user_file, 'r', encoding='utf-8') as f: user_data = json.load(f) return user_data.get("conversations", []) def get_all_users(self) -> List[Dict[str, Any]]: """Get summary of all users and their conversations""" users = [] for filename in os.listdir(self.storage_dir): if filename.startswith("user_") and filename.endswith(".json"): user_id = filename[5:-5] # Remove "user_" prefix and ".json" suffix try: user_file = os.path.join(self.storage_dir, filename) with open(user_file, 'r', encoding='utf-8') as f: user_data = json.load(f) conversations = user_data.get("conversations", []) users.append({ "user_id": user_id, "created": user_data.get("created", "Unknown"), "total_conversations": len(conversations), "last_activity": conversations[0]["timestamp"] if conversations else "Never", "latest_summary": conversations[0].get("summary", {}) if conversations else {} }) except Exception as e: print(f"Error reading user file {filename}: {e}") continue # Sort by last activity users.sort(key=lambda x: x["last_activity"], reverse=True) return users def search_conversations_by_time(self, start_time: str, end_time: str = None) -> List[Dict[str, Any]]: """Search conversations within a time range""" results = [] for filename in os.listdir(self.storage_dir): if not filename.startswith("user_") or not filename.endswith(".json"): continue try: user_file = os.path.join(self.storage_dir, filename) with open(user_file, 'r', encoding='utf-8') as f: user_data = json.load(f) conversations = user_data.get("conversations", []) for conv in conversations: conv_time = conv["timestamp"] if end_time: if start_time <= conv_time <= end_time: results.append({ "user_id": conv["user_id"], "session_id": conv["session_id"], "timestamp": conv_time, "summary": conv.get("summary", {}) }) else: if conv_time >= start_time: results.append({ "user_id": conv["user_id"], "session_id": conv["session_id"], "timestamp": conv_time, "summary": conv.get("summary", {}) }) except Exception as e: print(f"Error searching in file {filename}: {e}") continue # Sort by timestamp results.sort(key=lambda x: x["timestamp"], reverse=True) return results def delete_user_data(self, user_id: str) -> bool: """Delete all data for a specific user""" user_file = os.path.join(self.storage_dir, f"user_{user_id}.json") if os.path.exists(user_file): try: os.remove(user_file) return True except Exception as e: print(f"Error deleting user data: {e}") return False return False def _serialize_state(self, state: ConversationState) -> Dict[str, Any]: """Convert ConversationState to JSON-serializable format""" # Create a deep copy of the state for serialization serialized = {} for key, value in state.items(): if isinstance(value, (dict, list, str, int, float, bool)) or value is None: serialized[key] = value else: # Convert complex objects to string representation serialized[key] = str(value) return serialized def _deserialize_state(self, serialized_state: Dict[str, Any]) -> ConversationState: """Convert JSON data back to ConversationState""" from graph import create_initial_state # Start with fresh state structure state = create_initial_state() # Update with saved values for key, value in serialized_state.items(): if key in state: state[key] = value return state def _create_state_summary(self, state: ConversationState) -> Dict[str, Any]: """Create a summary of the conversation state for quick reference""" summary = { "total_messages": len(state.get("messages", [])), "current_topic": state.get("current_topic"), "user_requirements": {}, "floorplan_status": {}, "project_progress": {} } # User requirements summary user_reqs = state.get("user_requirements", {}) if user_reqs.get("budget"): summary["user_requirements"]["budget"] = f"${user_reqs['budget']:,.0f}" if user_reqs.get("location"): summary["user_requirements"]["location"] = user_reqs["location"] if user_reqs.get("family_size"): summary["user_requirements"]["family_size"] = user_reqs["family_size"] # Floorplan status floorplan_reqs = state.get("floorplan_requirements", {}) if floorplan_reqs.get("total_sqft"): summary["floorplan_status"]["size"] = f"{floorplan_reqs['total_sqft']} sq ft" if floorplan_reqs.get("num_floors"): summary["floorplan_status"]["floors"] = floorplan_reqs["num_floors"] if floorplan_reqs.get("rooms"): rooms = floorplan_reqs["rooms"] room_summary = ", ".join([f"{r['count']}x {r['type']}" for r in rooms]) summary["floorplan_status"]["rooms"] = room_summary # Project progress completed_phases = [] if state.get("detailed_floorplan", {}).get("detailed_rooms"): completed_phases.append("Architectural Design") if state.get("budget_breakdown", {}).get("total_construction_cost"): completed_phases.append("Budget Analysis") if state.get("agent_memory", {}).get("structural_analysis"): completed_phases.append("Structural Analysis") if state.get("agent_memory", {}).get("sustainability"): completed_phases.append("Sustainability Review") if state.get("agent_memory", {}).get("permits"): completed_phases.append("Permit Planning") if state.get("agent_memory", {}).get("interior_design"): completed_phases.append("Interior Design") summary["project_progress"]["completed_phases"] = completed_phases summary["project_progress"]["completion_percentage"] = int((len(completed_phases) / 6) * 100) return summary # Global instance for easy access user_state_manager = UserStateManager()