AI_Agent_Server / backend /cache_utils.py
Sidreds06's picture
bug fixes
81c40fc
raw
history blame
4.94 kB
import time
from typing import Dict, Any, Optional
from backend.credentials import setup_google_credentials
setup_google_credentials()
# Simple in-memory cache
user_cache: Dict[str, Dict[str, Any]] = {}
routing_cache: Dict[str, str] = {}
CACHE_DURATION = 300 # 5 minutes in seconds
ROUTING_CACHE_DURATION = 1800 # 30 minutes for routing
def get_cache_key(user_id: str) -> str:
"""Generate cache key for user data"""
return f"user_data:{user_id}"
def is_cache_valid(cache_entry: Dict[str, Any], duration: int = CACHE_DURATION) -> bool:
"""Check if cache entry is still valid"""
current_time = time.time()
return current_time - cache_entry['timestamp'] < duration
def get_cached_user_data(user_id: str) -> Optional[Dict[str, Any]]:
"""Retrieve cached user data if valid"""
cache_key = get_cache_key(user_id)
if cache_key in user_cache:
cache_entry = user_cache[cache_key]
if is_cache_valid(cache_entry):
return cache_entry['data']
else:
# Remove expired entry
del user_cache[cache_key]
return None
def cache_user_data(user_id: str, data: Dict[str, Any]) -> None:
"""Cache user data with timestamp"""
cache_key = get_cache_key(user_id)
user_cache[cache_key] = {
'data': data,
'timestamp': time.time()
}
print(f"[CACHE] Cached user data for user: {user_id}")
def get_cached_route(user_message: str) -> Optional[str]:
"""Get cached routing decision"""
# Create a simple hash of the message for caching
import hashlib
message_hash = hashlib.md5(user_message.lower().strip().encode()).hexdigest()
if message_hash in routing_cache:
cache_entry = routing_cache[message_hash]
if is_cache_valid(cache_entry, ROUTING_CACHE_DURATION):
return cache_entry['route']
else:
del routing_cache[message_hash]
return None
def cache_route(user_message: str, route: str) -> None:
"""Cache routing decision"""
import hashlib
message_hash = hashlib.md5(user_message.lower().strip().encode()).hexdigest()
routing_cache[message_hash] = {
'route': route,
'timestamp': time.time()
}
print(f"[CACHE] Cached route '{route}' for message hash: {message_hash[:8]}...")
def clear_user_cache(user_id: str = None) -> None:
"""Clear cache for specific user or all users"""
global user_cache
if user_id:
cache_key = get_cache_key(user_id)
user_cache.pop(cache_key, None)
print(f"[CACHE] Cleared cache for user: {user_id}")
else:
user_cache.clear()
print("[CACHE] Cleared all user cache")
def get_cache_stats() -> Dict[str, Any]:
"""Get cache statistics"""
current_time = time.time()
valid_user_entries = 0
expired_user_entries = 0
valid_routing_entries = 0
expired_routing_entries = 0
# User cache stats
for entry in user_cache.values():
if current_time - entry['timestamp'] < CACHE_DURATION:
valid_user_entries += 1
else:
expired_user_entries += 1
# Routing cache stats
for entry in routing_cache.values():
if current_time - entry['timestamp'] < ROUTING_CACHE_DURATION:
valid_routing_entries += 1
else:
expired_routing_entries += 1
return {
"user_cache": {
"total_entries": len(user_cache),
"valid_entries": valid_user_entries,
"expired_entries": expired_user_entries,
"cache_duration_seconds": CACHE_DURATION
},
"routing_cache": {
"total_entries": len(routing_cache),
"valid_entries": valid_routing_entries,
"expired_entries": expired_routing_entries,
"cache_duration_seconds": ROUTING_CACHE_DURATION
}
}
def cleanup_expired_cache() -> Dict[str, int]:
"""Remove expired cache entries and return count removed"""
global user_cache, routing_cache
current_time = time.time()
# Clean user cache
expired_user_keys = []
for key, entry in user_cache.items():
if current_time - entry['timestamp'] >= CACHE_DURATION:
expired_user_keys.append(key)
for key in expired_user_keys:
del user_cache[key]
# Clean routing cache
expired_routing_keys = []
for key, entry in routing_cache.items():
if current_time - entry['timestamp'] >= ROUTING_CACHE_DURATION:
expired_routing_keys.append(key)
for key in expired_routing_keys:
del routing_cache[key]
if expired_user_keys or expired_routing_keys:
print(f"[CACHE] Cleaned up {len(expired_user_keys)} user entries and {len(expired_routing_keys)} routing entries")
return {
"user_entries_removed": len(expired_user_keys),
"routing_entries_removed": len(expired_routing_keys)
}