import traceback
from typing import Literal, Optional

from fastapi import HTTPException

import litellm
from litellm._logging import verbose_proxy_logger
from litellm.caching.caching import DualCache, InMemoryCache, RedisCache
from litellm.integrations.custom_logger import CustomLogger
from litellm.proxy._types import UserAPIKeyAuth


class _PROXY_BatchRedisRequests(CustomLogger):
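    """
    Proxy hook that batch-fetches a key's Redis cache entries and keeps them
    in-memory for 60 seconds, reducing Redis GET traffic under high load.
    """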

    in_memory_cache: Optional[InMemoryCache] = None

    def __init__(self):
        if litellm.cache is not None:
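            # Route litellm's cache reads through this hook, so the in-memory
            # copy is consulted before Redis.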
            litellm.cache.async_get_cache = self.async_get_cache

    def print_verbose(
        self, print_statement, debug_level: Literal["INFO", "DEBUG"] = "DEBUG"
    ):
        if debug_level == "DEBUG":
            verbose_proxy_logger.debug(print_statement)
        elif debug_level == "INFO":
            verbose_proxy_logger.info(print_statement)
        if litellm.set_verbose is True:
            print(print_statement)

    async def async_pre_call_hook(
        self,
        user_api_key_dict: UserAPIKeyAuth,
        cache: DualCache,
        data: dict,
        call_type: str,
    ):
        try:
            """
            Get the user key.

            Check if a key starting with `litellm:<api_key>:<call_type>` exists in-memory.

            If not, fetch the relevant cache entries from Redis.
            """
            api_key = user_api_key_dict.api_key
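
            # All cached values for this key + call-type pair share this
            # namespace prefix in Redis.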
            cache_key_name = f"litellm:{api_key}:{call_type}"
            self.in_memory_cache = cache.in_memory_cache

            key_value_dict = {}
            in_memory_cache_exists = False
            for key in cache.in_memory_cache.cache_dict.keys():
                if isinstance(key, str) and key.startswith(cache_key_name):
                    in_memory_cache_exists = True
                    break

            if in_memory_cache_exists is False and litellm.cache is not None:
                """
                - Check if `litellm.cache` is backed by Redis
                - Get the relevant values
                """
                if litellm.cache.type is not None and isinstance(
                    litellm.cache.cache, RedisCache
                ):
                    self.print_verbose(f"cache_key_name: {cache_key_name}")
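                    # No matching key in-memory: scan Redis for the keys in
                    # this namespace.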
                    keys = await litellm.cache.cache.async_scan_iter(
                        pattern=cache_key_name, count=100
                    )

                    self.print_verbose(f"redis keys: {keys}")
                    if len(keys) > 0:
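                        # Fetch all matching values from Redis in one batch call.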
                        key_value_dict = (
                            await litellm.cache.cache.async_batch_get_cache(
                                key_list=keys
                            )
                        )
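
                    # Warm the in-memory cache with everything found, expiring
                    # after 60 seconds.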
                    if len(key_value_dict) > 0:
                        await cache.in_memory_cache.async_set_cache_pipeline(
                            cache_list=list(key_value_dict.items()), ttl=60
                        )
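
                    # Tag the request with the namespace so downstream cache
                    # lookups use the pre-fetched keys.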
                    data["metadata"]["redis_namespace"] = cache_key_name
        except HTTPException as e:
            raise e
        except Exception as e:
            verbose_proxy_logger.error(
                "litellm.proxy.hooks.batch_redis_get.py::async_pre_call_hook(): Exception occurred - {}".format(
                    str(e)
                )
            )
            verbose_proxy_logger.debug(traceback.format_exc())

    async def async_get_cache(self, *args, **kwargs):
        """
        - Check if the cache key is in-memory

        - Else:
            - add the missing cache key from Redis
            - update the in-memory cache
            - return the Redis cache result
        """
        try:
            cache_key: Optional[str] = None
            if "cache_key" in kwargs:
                cache_key = kwargs["cache_key"]
            elif litellm.cache is not None:
                cache_key = litellm.cache.get_cache_key(*args, **kwargs)

            if (
                cache_key is not None
                and self.in_memory_cache is not None
                and litellm.cache is not None
            ):
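                # Honor caller-supplied cache-control hints ("s-max-age" /
                # "s-maxage"); default to no age limit.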
                cache_control_args = kwargs.get("cache") or {}
                max_age = cache_control_args.get(
                    "s-max-age", cache_control_args.get("s-maxage", float("inf"))
                )
                cached_result = self.in_memory_cache.get_cache(
                    cache_key, *args, **kwargs
                )
                if cached_result is None:
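                    # In-memory miss: fall back to Redis, then backfill the
                    # in-memory cache for 60 seconds.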
                    cached_result = await litellm.cache.cache.async_get_cache(
                        cache_key, *args, **kwargs
                    )
                    if cached_result is not None:
                        await self.in_memory_cache.async_set_cache(
                            cache_key, cached_result, ttl=60
                        )
                return litellm.cache._get_cache_logic(
                    cached_result=cached_result, max_age=max_age
                )
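        # Any failure in the lookup path is treated as a cache miss.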
        except Exception:
            return None
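

# A minimal sketch of how this hook might be wired in, assuming the standard
# litellm callback mechanism (the proxy normally instantiates this class itself
# when batch Redis requests are enabled in the proxy config):
#
#   batch_redis_obj = _PROXY_BatchRedisRequests()
#   litellm.callbacks.append(batch_redis_obj)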