# What is this?
## Allocates dynamic tpm/rpm quota for a project based on current traffic
## Tracks num active projects per minute

import asyncio
import os
from typing import List, Literal, Optional, Tuple, Union

from fastapi import HTTPException

import litellm
from litellm import ModelResponse, Router
from litellm._logging import verbose_proxy_logger
from litellm.caching.caching import DualCache
from litellm.integrations.custom_logger import CustomLogger
from litellm.proxy._types import UserAPIKeyAuth
from litellm.types.router import ModelGroupInfo
from litellm.utils import get_utc_datetime


class DynamicRateLimiterCache:
    """
    Thin wrapper on DualCache for this file.

    Track number of active projects calling a model.
    """

    def __init__(self, cache: DualCache) -> None:
        self.cache = cache
        self.ttl = 60  # 1 min ttl

    async def async_get_cache(self, model: str) -> Optional[int]:
        # Keys are bucketed per minute: "<HH-MM>:<model>"
        dt = get_utc_datetime()
        current_minute = dt.strftime("%H-%M")
        key_name = "{}:{}".format(current_minute, model)
        _response = await self.cache.async_get_cache(key=key_name)
        response: Optional[int] = None
        if _response is not None:
            # The cached value is a set of project ids; its size is the active count
            response = len(_response)
        return response

    async def async_set_cache_sadd(self, model: str, value: List):
        """
        Add value to set.

        Parameters:
        - model: str, the name of the model group
        - value: List, the project (key/team) ids to add to the set

        Returns:
        - None

        Raises:
        - Exception, if unable to connect to cache client (if redis caching enabled)
        """
        try:
            dt = get_utc_datetime()
            current_minute = dt.strftime("%H-%M")

            key_name = "{}:{}".format(current_minute, model)
            await self.cache.async_set_cache_sadd(
                key=key_name, value=value, ttl=self.ttl
            )
        except Exception as e:
            verbose_proxy_logger.exception(
                "litellm.proxy.hooks.dynamic_rate_limiter.py::async_set_cache_sadd(): Exception occurred - {}".format(
                    str(e)
                )
            )
            raise e


class _PROXY_DynamicRateLimitHandler(CustomLogger):

    # Class variables or attributes
    def __init__(self, internal_usage_cache: DualCache):
        self.internal_usage_cache = DynamicRateLimiterCache(cache=internal_usage_cache)

    def update_variables(self, llm_router: Router):
        self.llm_router = llm_router

    async def check_available_usage(
        self, model: str, priority: Optional[str] = None
    ) -> Tuple[
        Optional[int], Optional[int], Optional[int], Optional[int], Optional[int]
    ]:
        """
        For a given model, get its available tpm and rpm

        Params:
        - model: str, the name of the model in the router model_list
        - priority: Optional[str], the priority for the request.

        Returns
        - Tuple[available_tpm, available_rpm, remaining_model_tpm, remaining_model_rpm, active_projects]
            - available_tpm: int or null - always 0 or positive.
            - available_rpm: int or null - always 0 or positive.
            - remaining_model_tpm: int or null. If available tpm is int, then this will be too.
            - remaining_model_rpm: int or null. If available rpm is int, then this will be too.
            - active_projects: int or null
        """
        try:
            weight: float = 1
            if (
                litellm.priority_reservation is None
                or priority not in litellm.priority_reservation
            ):
                verbose_proxy_logger.error(
                    "Priority Reservation not set. priority={}, but litellm.priority_reservation is {}.".format(
                        priority, litellm.priority_reservation
                    )
                )
            elif priority is not None and litellm.priority_reservation is not None:
                if os.getenv("LITELLM_LICENSE", None) is None:
                    verbose_proxy_logger.error(
                        "PREMIUM FEATURE: Reserving tpm/rpm by priority is a premium feature. Please add a 'LITELLM_LICENSE' to your .env to enable this.\nGet a license: https://docs.litellm.ai/docs/proxy/enterprise."
                    )
                else:
                    weight = litellm.priority_reservation[priority]

            active_projects = await self.internal_usage_cache.async_get_cache(
                model=model
            )
            current_model_tpm, current_model_rpm = (
                await self.llm_router.get_model_group_usage(model_group=model)
            )
            model_group_info: Optional[ModelGroupInfo] = (
                self.llm_router.get_model_group_info(model_group=model)
            )
            total_model_tpm: Optional[int] = None
            total_model_rpm: Optional[int] = None
            if model_group_info is not None:
                if model_group_info.tpm is not None:
                    total_model_tpm = model_group_info.tpm
                if model_group_info.rpm is not None:
                    total_model_rpm = model_group_info.rpm

            # Remaining capacity = configured limit minus usage in the current window
            remaining_model_tpm: Optional[int] = None
            if total_model_tpm is not None and current_model_tpm is not None:
                remaining_model_tpm = total_model_tpm - current_model_tpm
            elif total_model_tpm is not None:
                remaining_model_tpm = total_model_tpm

            remaining_model_rpm: Optional[int] = None
            if total_model_rpm is not None and current_model_rpm is not None:
                remaining_model_rpm = total_model_rpm - current_model_rpm
            elif total_model_rpm is not None:
                remaining_model_rpm = total_model_rpm

            # Per-project share = remaining * priority weight / active projects, floored at 0
            available_tpm: Optional[int] = None

            if remaining_model_tpm is not None:
                if active_projects is not None:
                    available_tpm = int(remaining_model_tpm * weight / active_projects)
                else:
                    available_tpm = int(remaining_model_tpm * weight)

            if available_tpm is not None and available_tpm < 0:
                available_tpm = 0

            available_rpm: Optional[int] = None

            if remaining_model_rpm is not None:
                if active_projects is not None:
                    available_rpm = int(remaining_model_rpm * weight / active_projects)
                else:
                    available_rpm = int(remaining_model_rpm * weight)

            if available_rpm is not None and available_rpm < 0:
                available_rpm = 0
            return (
                available_tpm,
                available_rpm,
                remaining_model_tpm,
                remaining_model_rpm,
                active_projects,
            )
        except Exception as e:
            verbose_proxy_logger.exception(
                "litellm.proxy.hooks.dynamic_rate_limiter.py::check_available_usage: Exception occurred - {}".format(
                    str(e)
                )
            )
            return None, None, None, None, None

    async def async_pre_call_hook(
        self,
        user_api_key_dict: UserAPIKeyAuth,
        cache: DualCache,
        data: dict,
        call_type: Literal[
            "completion",
            "text_completion",
            "embeddings",
            "image_generation",
            "moderation",
            "audio_transcription",
            "pass_through_endpoint",
            "rerank",
        ],
    ) -> Optional[
        Union[Exception, str, dict]
    ]:  # raise exception if invalid, return a str for the user to receive - if rejected, or return a modified dictionary for passing into litellm
        """
        - For a model group
        - Check if tpm/rpm available
        - Raise HTTPException (status 429) if no tpm/rpm available
        """
        if "model" in data:
            key_priority: Optional[str] = user_api_key_dict.metadata.get(
                "priority", None
            )
            available_tpm, available_rpm, model_tpm, model_rpm, active_projects = (
                await self.check_available_usage(
                    model=data["model"], priority=key_priority
                )
            )
            ### CHECK TPM ###
            if available_tpm is not None and available_tpm == 0:
                raise HTTPException(
                    status_code=429,
                    detail={
                        "error": "Key={} over available TPM={}. Model TPM={}, Active keys={}".format(
                            user_api_key_dict.api_key,
                            available_tpm,
                            model_tpm,
                            active_projects,
                        )
                    },
                )
            ### CHECK RPM ###
            elif available_rpm is not None and available_rpm == 0:
                raise HTTPException(
                    status_code=429,
                    detail={
                        "error": "Key={} over available RPM={}. Model RPM={}, Active keys={}".format(
                            user_api_key_dict.api_key,
                            available_rpm,
                            model_rpm,
                            active_projects,
                        )
                    },
                )
            elif available_rpm is not None or available_tpm is not None:
                ## UPDATE CACHE WITH ACTIVE PROJECT
                asyncio.create_task(
                    self.internal_usage_cache.async_set_cache_sadd(  # this is a set
                        model=data["model"],  # type: ignore
                        value=[user_api_key_dict.token or "default_key"],
                    )
                )
        return None

    async def async_post_call_success_hook(
        self, data: dict, user_api_key_dict: UserAPIKeyAuth, response
    ):
        try:
            if isinstance(response, ModelResponse):
                model_info = self.llm_router.get_model_info(
                    id=response._hidden_params["model_id"]
                )
                assert (
                    model_info is not None
                ), "Model info for model with id={} is None".format(
                    response._hidden_params["model_id"]
                )
                key_priority: Optional[str] = user_api_key_dict.metadata.get(
                    "priority", None
                )
                available_tpm, available_rpm, model_tpm, model_rpm, active_projects = (
                    await self.check_available_usage(
                        model=model_info["model_name"], priority=key_priority
                    )
                )
                response._hidden_params["additional_headers"] = (
                    {  # Add additional response headers - easier debugging
                        "x-litellm-model_group": model_info["model_name"],
                        "x-ratelimit-remaining-litellm-project-tokens": available_tpm,
                        "x-ratelimit-remaining-litellm-project-requests": available_rpm,
                        "x-ratelimit-remaining-model-tokens": model_tpm,
                        "x-ratelimit-remaining-model-requests": model_rpm,
                        "x-ratelimit-current-active-projects": active_projects,
                    }
                )

                return response
            return await super().async_post_call_success_hook(
                data=data,
                user_api_key_dict=user_api_key_dict,
                response=response,
            )
        except Exception as e:
            verbose_proxy_logger.exception(
                "litellm.proxy.hooks.dynamic_rate_limiter.py::async_post_call_success_hook(): Exception occurred - {}".format(
                    str(e)
                )
            )
            return response
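

# ---------------------------------------------------------------------------
# Illustrative sketch only, not used by the hook above.
# It restates the per-project quota arithmetic from check_available_usage as a
# pure function, so the numbers can be checked without a Router or a cache.
# The helper name `_example_available_quota` and its parameter names are
# hypothetical; they are not part of litellm.
# Assumption: active_projects is either None or >= 1. A value of 0 would raise
# ZeroDivisionError here (in the hook, that error would be caught by its
# broad `except` and reported as "no limit known").
# ---------------------------------------------------------------------------
from typing import Optional


def _example_available_quota(
    total: Optional[int],
    used: Optional[int],
    active_projects: Optional[int],
    weight: float = 1.0,
) -> Optional[int]:
    """Return remaining * weight / active_projects, floored at 0."""
    if total is None:
        # No tpm/rpm limit configured for the model group
        return None
    # If current usage is unknown, treat the full limit as remaining
    remaining = total - used if used is not None else total
    if active_projects is not None:
        available = int(remaining * weight / active_projects)
    else:
        available = int(remaining * weight)
    # Over-consumed model groups yield 0, never a negative quota
    return max(available, 0)


# Example: a 1000 tpm model group with 400 tokens already used and 3 active
# projects leaves each project a share of 200 tokens:
#   _example_available_quota(total=1000, used=400, active_projects=3)  # 200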