Spaces:
Sleeping
Sleeping
import time | |
from typing import Dict, Any, Optional | |
from backend.credentials import setup_google_credentials | |
setup_google_credentials() | |
# Simple in-memory caches, kept as module-level dicts.
# Every entry is a dict holding the cached payload plus the 'timestamp'
# (a time.time() reading) taken when the entry was stored.
# user_cache:    "user_data:<user_id>" -> {'data': ..., 'timestamp': ...}
# routing_cache: md5 hex digest of the normalised message
#                -> {'route': ..., 'timestamp': ...}
user_cache: Dict[str, Dict[str, Any]] = {}
routing_cache: Dict[str, Dict[str, Any]] = {}
CACHE_DURATION = 300  # 5 minutes in seconds
ROUTING_CACHE_DURATION = 1800  # 30 minutes for routing
def get_cache_key(user_id: str) -> str:
    """Build the user-cache dictionary key for ``user_id``.

    The key is the literal prefix ``user_data:`` followed by the id.
    """
    return "user_data:{}".format(user_id)
def is_cache_valid(cache_entry: Dict[str, Any], duration: int = CACHE_DURATION) -> bool:
    """Tell whether ``cache_entry`` is younger than ``duration`` seconds.

    The age is the difference between the current ``time.time()`` reading and
    the entry's ``'timestamp'`` value. An entry whose age equals ``duration``
    exactly is already treated as expired.
    """
    age_seconds = time.time() - cache_entry['timestamp']
    return age_seconds < duration
def get_cached_user_data(user_id: str) -> Optional[Dict[str, Any]]:
    """Return the cached data for ``user_id`` while it is still fresh.

    Returns ``None`` when nothing is cached for the user or when the cached
    entry has expired; an expired entry is removed from ``user_cache`` on the
    way out.
    """
    key = get_cache_key(user_id)
    try:
        entry = user_cache[key]
    except KeyError:
        # Nothing cached for this user.
        return None

    if is_cache_valid(entry):
        return entry['data']

    # Stale entry: evict it so it does not linger in the cache.
    del user_cache[key]
    return None
def cache_user_data(user_id: str, data: Dict[str, Any]) -> None:
    """Store ``data`` for ``user_id`` stamped with the current time.

    Any entry already cached for the same user is overwritten. A log line is
    printed after the entry has been stored.
    """
    key = get_cache_key(user_id)
    stamped_entry = {'data': data, 'timestamp': time.time()}
    user_cache[key] = stamped_entry
    print(f"[CACHE] Cached user data for user: {user_id}")
def get_cached_route(user_message: str) -> Optional[str]:
    """Look up a previously cached routing decision for ``user_message``.

    The message is lower-cased and stripped of surrounding whitespace before
    hashing, so messages differing only in those respects share one entry.
    Returns the cached route while the entry is younger than
    ``ROUTING_CACHE_DURATION`` seconds; otherwise returns ``None``, deleting
    the entry if it had expired.
    """
    import hashlib

    # MD5 only serves as a compact dictionary key here.
    normalized = user_message.lower().strip()
    digest = hashlib.md5(normalized.encode()).hexdigest()

    try:
        entry = routing_cache[digest]
    except KeyError:
        return None

    if is_cache_valid(entry, ROUTING_CACHE_DURATION):
        return entry['route']

    del routing_cache[digest]
    return None
def cache_route(user_message: str, route: str) -> None:
    """Remember ``route`` as the routing decision for ``user_message``.

    The cache key is the MD5 hex digest of the lower-cased, stripped message.
    The entry records the route together with the current time, and a log line
    showing the first eight characters of the digest is printed.
    """
    import hashlib

    normalized = user_message.lower().strip()
    digest = hashlib.md5(normalized.encode()).hexdigest()
    routing_cache[digest] = {'route': route, 'timestamp': time.time()}
    print(f"[CACHE] Cached route '{route}' for message hash: {digest[:8]}...")
def clear_user_cache(user_id: Optional[str] = None) -> None:
    """Clear the cached data of one user, or of every user.

    Args:
        user_id: Id of the user whose entry is dropped. When it is omitted,
            ``None`` or an empty string, the whole user cache is emptied
            instead.

    Clearing a user that has no cached entry is not an error. A log line is
    printed in both cases.
    """
    # No ``global`` statement is needed: the dict is only mutated in place,
    # never rebound.
    if user_id:
        user_cache.pop(get_cache_key(user_id), None)
        print(f"[CACHE] Cleared cache for user: {user_id}")
    else:
        user_cache.clear()
        print("[CACHE] Cleared all user cache")
def get_cache_stats() -> Dict[str, Any]:
    """Summarise how many entries each cache holds and how many are fresh.

    Returns a dict with a ``"user_cache"`` and a ``"routing_cache"`` section.
    Each section reports the total entry count, the number of entries still
    within their cache duration, the number past it, and the duration itself
    in seconds. Nothing is removed from either cache.
    """
    now = time.time()

    # An entry is valid while its age is strictly below the cache duration;
    # every remaining entry counts as expired.
    fresh_users = sum(
        1
        for entry in user_cache.values()
        if now - entry['timestamp'] < CACHE_DURATION
    )
    fresh_routes = sum(
        1
        for entry in routing_cache.values()
        if now - entry['timestamp'] < ROUTING_CACHE_DURATION
    )

    user_total = len(user_cache)
    routing_total = len(routing_cache)

    user_section = {
        "total_entries": user_total,
        "valid_entries": fresh_users,
        "expired_entries": user_total - fresh_users,
        "cache_duration_seconds": CACHE_DURATION,
    }
    routing_section = {
        "total_entries": routing_total,
        "valid_entries": fresh_routes,
        "expired_entries": routing_total - fresh_routes,
        "cache_duration_seconds": ROUTING_CACHE_DURATION,
    }
    return {"user_cache": user_section, "routing_cache": routing_section}
def cleanup_expired_cache() -> Dict[str, int]:
    """Delete every expired entry from both caches.

    An entry is expired once its age reaches the cache's duration
    (``CACHE_DURATION`` for user data, ``ROUTING_CACHE_DURATION`` for routes).
    A summary line is printed only when something was removed.

    Returns:
        A dict with the number of entries removed from each cache under the
        keys ``"user_entries_removed"`` and ``"routing_entries_removed"``.
    """
    now = time.time()

    # Keys are collected first so neither dict is mutated while iterating it.
    stale_users = [
        key
        for key, entry in user_cache.items()
        if now - entry['timestamp'] >= CACHE_DURATION
    ]
    for key in stale_users:
        del user_cache[key]

    stale_routes = [
        key
        for key, entry in routing_cache.items()
        if now - entry['timestamp'] >= ROUTING_CACHE_DURATION
    ]
    for key in stale_routes:
        del routing_cache[key]

    if stale_users or stale_routes:
        print(f"[CACHE] Cleaned up {len(stale_users)} user entries and {len(stale_routes)} routing entries")

    return {
        "user_entries_removed": len(stale_users),
        "routing_entries_removed": len(stale_routes),
    }