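# NOTE(review): the three lines below look like hosting-page status text that
# was captured together with the file, not module content -- confirm and remove.
# Spaces:
# Runtime error
# Runtime error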
#!/usr/bin/env python3
"""
User State Manager for Residential Architecture Assistant
Handles saving, loading, and managing user conversation states with timestamps.
Provides persistent storage and history functionality.
"""
import json | |
import os | |
from datetime import datetime | |
from typing import Dict, List, Optional, Any | |
from state import ConversationState | |
import uuid | |
import hashlib | |
class UserStateManager:
    """Manages user conversation states with persistent JSON storage.

    Each user gets one file, ``<storage_dir>/user_<user_id>.json``, holding a
    ``conversations`` list with one record per session, kept newest first.
    The manager also remembers the most recently used user and session IDs so
    that later saves may omit them.
    """

    def __init__(self, storage_dir: str = "user_conversations"):
        """Create the manager and make sure ``storage_dir`` exists."""
        self.storage_dir = storage_dir
        self.ensure_storage_directory()
        self.current_session_id = None
        self.current_user_id = None

    def ensure_storage_directory(self):
        """Create storage directory if it doesn't exist."""
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(self.storage_dir, exist_ok=True)

    def generate_user_id(self, ip_address: Optional[str] = None) -> str:
        """Generate a user ID from an IP address, or a random one.

        With ``ip_address`` the ID is the first 12 hex digits of its MD5
        digest, so the same address always maps to the same ID (MD5 is used
        here only as a compact fingerprint, not for security). Without it,
        the ID is the first 12 characters of a random UUID4 string.
        """
        if ip_address:
            return hashlib.md5(ip_address.encode()).hexdigest()[:12]
        return str(uuid.uuid4())[:12]

    def start_new_session(
        self, user_id: Optional[str] = None, ip_address: Optional[str] = None
    ) -> str:
        """Start a new conversation session and return its session ID.

        A missing ``user_id`` is generated via :meth:`generate_user_id`.
        Both IDs are remembered as the manager's current user and session.
        """
        if not user_id:
            user_id = self.generate_user_id(ip_address)
        self.current_user_id = user_id
        self.current_session_id = str(uuid.uuid4())
        return self.current_session_id

    def save_user_state(
        self,
        state: "ConversationState",
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
    ) -> str:
        """Save ``state`` under a user and session, stamped with the current time.

        Missing IDs fall back to the manager's current ones, then to freshly
        generated ones. A record with the same ``session_id`` is replaced;
        otherwise the record is appended. Conversations are kept sorted
        newest first.

        Returns:
            The session ID the state was stored under.

        Raises:
            OSError: if the user file cannot be read or written.
            json.JSONDecodeError: if the existing user file is not valid JSON
                (the file is left untouched rather than overwritten).
        """
        if not user_id:
            user_id = self.current_user_id or self.generate_user_id()
        if not session_id:
            session_id = self.current_session_id or str(uuid.uuid4())
        timestamp = datetime.now().isoformat()

        save_record = {
            "user_id": user_id,
            "session_id": session_id,
            "timestamp": timestamp,
            "state": self._serialize_state(state),
            "summary": self._create_state_summary(state),
        }

        user_file = self._user_file_path(user_id)
        user_data = self._read_user_data(user_file)
        if user_data is None:
            user_data = {
                "user_id": user_id,
                "created": timestamp,
                "conversations": [],
            }

        conversations = user_data.setdefault("conversations", [])
        for index, conv in enumerate(conversations):
            if conv.get("session_id") == session_id:
                conversations[index] = save_record
                break
        else:
            conversations.append(save_record)

        # ISO-8601 strings from datetime.isoformat() sort chronologically.
        conversations.sort(key=lambda c: c.get("timestamp", ""), reverse=True)

        self._write_user_data(user_file, user_data)
        self.current_user_id = user_id
        self.current_session_id = session_id
        return session_id

    def load_user_state(
        self, user_id: str, session_id: Optional[str] = None
    ) -> Optional["ConversationState"]:
        """Load a saved state for ``user_id``.

        With ``session_id`` the matching session is returned; without it the
        most recent conversation is returned. Returns ``None`` when the user
        has no file, no conversations, or no matching session.
        """
        user_data = self._read_user_data(self._user_file_path(user_id))
        if user_data is None:
            return None

        conversations = user_data.get("conversations", [])
        if session_id:
            for conv in conversations:
                if conv.get("session_id") == session_id:
                    return self._deserialize_state(conv["state"])
            return None

        # Conversations are stored newest first, so index 0 is the latest.
        if conversations:
            return self._deserialize_state(conversations[0]["state"])
        return None

    def get_user_history(self, user_id: str) -> List[Dict[str, Any]]:
        """Return all saved conversation records for a user (newest first).

        Returns an empty list when the user has no file.
        """
        user_data = self._read_user_data(self._user_file_path(user_id))
        if user_data is None:
            return []
        return user_data.get("conversations", [])

    def get_all_users(self) -> List[Dict[str, Any]]:
        """Return a summary entry for every user file in the storage directory.

        Entries are sorted by last activity, newest first; users without any
        conversation (``last_activity == "Never"``) are listed last. Files
        that cannot be read or parsed are reported on stdout and skipped.
        """
        users = []
        for filename in os.listdir(self.storage_dir):
            if not (filename.startswith("user_") and filename.endswith(".json")):
                continue
            user_id = filename[5:-5]  # strip "user_" prefix and ".json" suffix
            try:
                user_data = self._read_user_data(
                    os.path.join(self.storage_dir, filename)
                )
                conversations = user_data.get("conversations", [])
                latest = conversations[0] if conversations else None
                users.append({
                    "user_id": user_id,
                    "created": user_data.get("created", "Unknown"),
                    "total_conversations": len(conversations),
                    "last_activity": latest["timestamp"] if latest else "Never",
                    "latest_summary": latest.get("summary", {}) if latest else {},
                })
            except Exception as e:
                # Best effort: one unreadable file must not hide the others.
                print(f"Error reading user file {filename}: {e}")
                continue

        # Sorting the raw strings would rank "Never" above every timestamp;
        # the leading flag keeps users without activity at the end.
        users.sort(
            key=lambda u: (u["last_activity"] != "Never", u["last_activity"]),
            reverse=True,
        )
        return users

    def search_conversations_by_time(
        self, start_time: str, end_time: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Find conversations whose timestamp lies within a time range.

        Timestamps are compared as strings, so ``start_time`` and
        ``end_time`` should use the same ISO-8601 form the records are saved
        with. Both bounds are inclusive; without ``end_time`` the range is
        open-ended. Results are sorted newest first. Files that fail to load
        are reported on stdout and skipped.
        """
        results = []
        for filename in os.listdir(self.storage_dir):
            if not filename.startswith("user_") or not filename.endswith(".json"):
                continue
            try:
                user_data = self._read_user_data(
                    os.path.join(self.storage_dir, filename)
                )
                for conv in user_data.get("conversations", []):
                    conv_time = conv["timestamp"]
                    if conv_time < start_time:
                        continue
                    if end_time and conv_time > end_time:
                        continue
                    results.append({
                        "user_id": conv["user_id"],
                        "session_id": conv["session_id"],
                        "timestamp": conv_time,
                        "summary": conv.get("summary", {}),
                    })
            except Exception as e:
                # Best effort: report the broken file and keep searching.
                print(f"Error searching in file {filename}: {e}")
                continue

        results.sort(key=lambda r: r["timestamp"], reverse=True)
        return results

    def delete_user_data(self, user_id: str) -> bool:
        """Delete all data for a specific user.

        Returns ``True`` when the user's file was removed, ``False`` when it
        did not exist or could not be removed (the error is printed).
        """
        try:
            os.remove(self._user_file_path(user_id))
        except FileNotFoundError:
            return False
        except (OSError, ValueError) as e:
            print(f"Error deleting user data: {e}")
            return False
        return True

    def _user_file_path(self, user_id: str) -> str:
        """Return the path of the JSON file that stores ``user_id``'s data."""
        # NOTE(review): user_id is interpolated into the file name unchecked;
        # if IDs can come from untrusted input, validate them before calling.
        return os.path.join(self.storage_dir, f"user_{user_id}.json")

    def _read_user_data(self, user_file: str) -> Optional[Dict[str, Any]]:
        """Load one user file; return ``None`` when the file does not exist."""
        try:
            with open(user_file, "r", encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return None

    def _write_user_data(self, user_file: str, user_data: Dict[str, Any]) -> None:
        """Write ``user_data`` to ``user_file`` without risking the old content.

        The payload is serialized in memory first and written to a sibling
        ``.tmp`` file that then replaces the target, so a serialization or
        write failure never leaves a truncated user file behind. A stale
        ``.tmp`` file from a failed write is overwritten by the next save.
        """
        payload = json.dumps(user_data, indent=2, ensure_ascii=False)
        tmp_file = f"{user_file}.tmp"
        with open(tmp_file, "w", encoding="utf-8") as f:
            f.write(payload)
        os.replace(tmp_file, user_file)

    def _serialize_state(self, state: "ConversationState") -> Dict[str, Any]:
        """Convert a conversation state into a JSON-serializable dict.

        Values are converted recursively: containers are copied, JSON-native
        scalars are kept, and any other object is replaced by ``str(obj)``.
        """
        return {key: self._to_jsonable(value) for key, value in state.items()}

    def _to_jsonable(self, value: Any) -> Any:
        """Recursively turn ``value`` into something ``json.dumps`` accepts."""
        if value is None or isinstance(value, (str, int, float, bool)):
            return value
        if isinstance(value, dict):
            return {
                # Keys JSON can encode are kept as-is; anything else becomes text.
                (k if k is None or isinstance(k, (str, int, float, bool)) else str(k)):
                    self._to_jsonable(v)
                for k, v in value.items()
            }
        if isinstance(value, (list, tuple)):
            return [self._to_jsonable(item) for item in value]
        # Complex objects are stored as their string representation.
        return str(value)

    def _deserialize_state(self, serialized_state: Dict[str, Any]) -> "ConversationState":
        """Rebuild a conversation state from saved JSON data.

        Starts from ``graph.create_initial_state()`` and copies over only the
        saved keys that exist in that initial state; other keys are ignored.
        """
        # Imported here rather than at module level, as in the original code.
        from graph import create_initial_state

        state = create_initial_state()
        for key, value in serialized_state.items():
            if key in state:
                state[key] = value
        return state

    def _create_state_summary(self, state: "ConversationState") -> Dict[str, Any]:
        """Create a summary of the conversation state for quick reference.

        The summary holds the message count, the current topic, selected user
        requirements, the floorplan status and the list of completed project
        phases with a completion percentage. Sections that are missing or
        ``None`` are treated as empty so a sparse state can still be saved.
        """
        summary = {
            "total_messages": len(state.get("messages") or []),
            "current_topic": state.get("current_topic"),
            "user_requirements": {},
            "floorplan_status": {},
            "project_progress": {},
        }

        # User requirements summary
        user_reqs = state.get("user_requirements") or {}
        if user_reqs.get("budget"):
            summary["user_requirements"]["budget"] = self._format_budget(
                user_reqs["budget"]
            )
        if user_reqs.get("location"):
            summary["user_requirements"]["location"] = user_reqs["location"]
        if user_reqs.get("family_size"):
            summary["user_requirements"]["family_size"] = user_reqs["family_size"]

        # Floorplan status
        floorplan_reqs = state.get("floorplan_requirements") or {}
        if floorplan_reqs.get("total_sqft"):
            summary["floorplan_status"]["size"] = f"{floorplan_reqs['total_sqft']} sq ft"
        if floorplan_reqs.get("num_floors"):
            summary["floorplan_status"]["floors"] = floorplan_reqs["num_floors"]
        if floorplan_reqs.get("rooms"):
            # Entries lacking "count" or "type" are left out of the summary
            # instead of aborting the whole save.
            room_summary = ", ".join(
                f"{room['count']}x {room['type']}"
                for room in floorplan_reqs["rooms"]
                if isinstance(room, dict) and "count" in room and "type" in room
            )
            if room_summary:
                summary["floorplan_status"]["rooms"] = room_summary

        # Project progress: a phase counts as completed once its marker is truthy.
        agent_memory = state.get("agent_memory") or {}
        phase_markers = [
            ("Architectural Design",
             (state.get("detailed_floorplan") or {}).get("detailed_rooms")),
            ("Budget Analysis",
             (state.get("budget_breakdown") or {}).get("total_construction_cost")),
            ("Structural Analysis", agent_memory.get("structural_analysis")),
            ("Sustainability Review", agent_memory.get("sustainability")),
            ("Permit Planning", agent_memory.get("permits")),
            ("Interior Design", agent_memory.get("interior_design")),
        ]
        completed_phases = [name for name, marker in phase_markers if marker]
        summary["project_progress"]["completed_phases"] = completed_phases
        summary["project_progress"]["completion_percentage"] = int(
            (len(completed_phases) / len(phase_markers)) * 100
        )
        return summary

    def _format_budget(self, budget: Any) -> str:
        """Format a budget as ``$1,234``; fall back to its text if not numeric."""
        if isinstance(budget, (int, float)):
            return f"${budget:,.0f}"
        try:
            return f"${float(budget):,.0f}"
        except (TypeError, ValueError):
            return str(budget)
# Global instance for easy access.
# Constructing it here runs UserStateManager.__init__ at import time, which
# creates the default "user_conversations" storage directory if it is missing.
user_state_manager = UserStateManager()