# Source path: TestLLM/litellm/proxy/hooks/model_max_budget_limiter.py
# Hosting-page metadata captured with this file (not part of the module):
#   uploader "Raju2024", commit e3278e4 ("Upload 1072 files"), size 7.88 kB.
import json
from typing import List, Optional
import litellm
from litellm._logging import verbose_proxy_logger
from litellm.caching.caching import DualCache
from litellm.integrations.custom_logger import Span
from litellm.proxy._types import UserAPIKeyAuth
from litellm.router_strategy.budget_limiter import RouterBudgetLimiting
from litellm.types.llms.openai import AllMessageValues
from litellm.types.utils import (
BudgetConfig,
GenericBudgetConfigType,
StandardLoggingPayload,
)
# Prefix of every cache key that stores per-virtual-key, per-model spend.
# Keys built from it in this module have the layout
# "<prefix>:<virtual key hash>:<model>:<budget duration>".
VIRTUAL_KEY_SPEND_CACHE_KEY_PREFIX = "virtual_key_spend"
class _PROXY_VirtualKeyModelMaxBudgetLimiter(RouterBudgetLimiting):
    """
    Handles budgets for model + virtual key

    Example: key=sk-1234567890, model=gpt-4o, max_budget=100, time_period=1d

    Spend is stored in the DualCache under
    ``{VIRTUAL_KEY_SPEND_CACHE_KEY_PREFIX}:{key_hash}:{model}:{budget_duration}``.
    ``is_key_within_model_budget`` reads that entry before a request and
    ``async_log_success_event`` increments it after a successful one, so both
    sides must derive ``budget_duration`` from the same budget configuration.
    """

    def __init__(self, dual_cache: DualCache):
        # NOTE(review): the parent ``__init__`` is not invoked; only these two
        # attributes are initialised here. Presumably this is deliberate so the
        # router-level setup is skipped -- confirm against RouterBudgetLimiting.
        self.dual_cache = dual_cache
        self.redis_increment_operation_queue = []

    async def is_key_within_model_budget(
        self,
        user_api_key_dict: UserAPIKeyAuth,
        model: str,
    ) -> bool:
        """
        Check if the user_api_key_dict is within the model budget

        Args:
            user_api_key_dict: Auth object of the calling virtual key. Its
                ``model_max_budget`` maps model names to budget settings.
            model: Model name of the incoming request.

        Returns:
            True when no budget applies to ``model`` or the recorded spend does
            not exceed the configured ``max_budget``.

        Raises:
            BudgetExceededError: If the user_api_key_dict has exceeded the model budget
        """
        # ``or {}`` keeps a key without any per-model budget from crashing on
        # ``None.items()``.
        internal_model_max_budget: GenericBudgetConfigType = {
            _model: self._to_budget_config(_budget_info)
            for _model, _budget_info in (
                user_api_key_dict.model_max_budget or {}
            ).items()
        }

        verbose_proxy_logger.debug(
            "internal_model_max_budget %s",
            json.dumps(internal_model_max_budget, indent=4, default=str),
        )

        # check if current model is in internal_model_max_budget
        _current_model_budget_info = self._get_request_model_budget_config(
            model=model, internal_model_max_budget=internal_model_max_budget
        )
        if _current_model_budget_info is None:
            verbose_proxy_logger.debug(
                f"Model {model} not found in internal_model_max_budget"
            )
            return True

        # A missing, zero or negative max_budget means "no limit to enforce".
        max_budget = _current_model_budget_info.max_budget
        if not max_budget or max_budget <= 0:
            return True

        # check if current model is within budget
        _current_spend = await self._get_virtual_key_spend_for_model(
            user_api_key_hash=user_api_key_dict.token,
            model=model,
            key_budget_config=_current_model_budget_info,
        )
        if _current_spend is not None and _current_spend > max_budget:
            raise litellm.BudgetExceededError(
                message=f"LiteLLM Virtual Key: {user_api_key_dict.token}, key_alias: {user_api_key_dict.key_alias}, exceeded budget for model={model}",
                current_cost=_current_spend,
                max_budget=max_budget,
            )

        return True

    async def _get_virtual_key_spend_for_model(
        self,
        user_api_key_hash: Optional[str],
        model: str,
        key_budget_config: BudgetConfig,
    ) -> Optional[float]:
        """
        Get the current spend for a virtual key for a model

        Lookup model in this order:
            1. model: directly look up `model`
            2. If 1, does not exist, check if passed as {custom_llm_provider}/model

        Returns:
            The cached spend, or None when neither cache key exists.
        """
        # 1. model: directly look up `model`
        virtual_key_model_spend_cache_key = f"{VIRTUAL_KEY_SPEND_CACHE_KEY_PREFIX}:{user_api_key_hash}:{model}:{key_budget_config.budget_duration}"
        _current_spend = await self.dual_cache.async_get_cache(
            key=virtual_key_model_spend_cache_key,
        )
        if _current_spend is not None:
            return _current_spend

        # 2. If 1, does not exist, check if passed as {custom_llm_provider}/model
        # eg. openai/o1-preview -> o1-preview
        model_without_provider = self._get_model_without_custom_llm_provider(model)
        if model_without_provider == model:
            # No provider prefix: the second key would be identical to the
            # first, so a repeated cache round-trip cannot find anything new.
            return None

        virtual_key_model_spend_cache_key = f"{VIRTUAL_KEY_SPEND_CACHE_KEY_PREFIX}:{user_api_key_hash}:{model_without_provider}:{key_budget_config.budget_duration}"
        return await self.dual_cache.async_get_cache(
            key=virtual_key_model_spend_cache_key,
        )

    def _get_request_model_budget_config(
        self, model: str, internal_model_max_budget: GenericBudgetConfigType
    ) -> Optional[BudgetConfig]:
        """
        Get the budget config for the request model

        1. Check if `model` is in `internal_model_max_budget`
        2. If not, check if `model` without custom llm provider is in `internal_model_max_budget`

        Returns:
            The matching BudgetConfig, or None when neither name is configured.
        """
        return internal_model_max_budget.get(
            model, None
        ) or internal_model_max_budget.get(
            self._get_model_without_custom_llm_provider(model), None
        )

    def _get_model_without_custom_llm_provider(self, model: str) -> str:
        """
        Return the part of `model` after its last "/", or `model` unchanged
        when it contains no "/".

        NOTE(review): for names with several "/" (e.g. "a/b/c") this yields
        only the final segment ("c"), not everything after the first "/".
        """
        if "/" in model:
            return model.split("/")[-1]
        return model

    @staticmethod
    def _to_budget_config(budget_info) -> BudgetConfig:
        """Coerce one `model_max_budget` entry into a BudgetConfig."""
        if isinstance(budget_info, BudgetConfig):
            return budget_info
        return BudgetConfig(**budget_info)

    def _get_spend_tracking_budget_config(
        self, model: Optional[str], user_api_key_model_max_budget: dict
    ) -> BudgetConfig:
        """
        Pick the budget config whose duration keys the spend entry for `model`.

        The entry is looked up the same way `_get_request_model_budget_config`
        does (exact name, then name without provider prefix) so the key written
        while logging is the key read by `is_key_within_model_budget`. When no
        usable entry exists the previous fixed "1d" default is used.
        """
        candidate_names: List[str] = []
        if model is not None:
            candidate_names = [
                model,
                self._get_model_without_custom_llm_provider(model),
            ]

        for candidate in candidate_names:
            raw_budget_info = user_api_key_model_max_budget.get(candidate)
            if raw_budget_info is None:
                continue
            try:
                budget_config = self._to_budget_config(raw_budget_info)
            except (TypeError, ValueError) as e:
                # Logging must not fail because of a malformed budget entry;
                # fall through to the next candidate / the default.
                verbose_proxy_logger.debug(
                    "Could not parse model_max_budget entry for model=%s: %s",
                    candidate,
                    str(e),
                )
                continue
            if budget_config.budget_duration is not None:
                return budget_config

        return BudgetConfig(time_period="1d", budget_limit=0.1)

    async def async_filter_deployments(
        self,
        model: str,
        healthy_deployments: List,
        messages: Optional[List[AllMessageValues]],
        request_kwargs: Optional[dict] = None,
        parent_otel_span: Optional[Span] = None,  # type: ignore
    ) -> List[dict]:
        """Return `healthy_deployments` unchanged; no filtering is done here."""
        return healthy_deployments

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        """
        Track spend for virtual key + model in DualCache

        Example: key=sk-1234567890, model=gpt-4o, max_budget=100, time_period=1d

        Does nothing when the request metadata carries no
        `user_api_key_model_max_budget` or no virtual key hash is logged.

        Raises:
            ValueError: If `kwargs` has no `standard_logging_object`.
        """
        verbose_proxy_logger.debug("in RouterBudgetLimiting.async_log_success_event")
        standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get(
            "standard_logging_object", None
        )
        if standard_logging_payload is None:
            raise ValueError("standard_logging_payload is required")

        _litellm_params: dict = kwargs.get("litellm_params", {}) or {}
        _metadata: dict = _litellm_params.get("metadata", {}) or {}
        user_api_key_model_max_budget: Optional[dict] = _metadata.get(
            "user_api_key_model_max_budget", None
        )
        if (
            user_api_key_model_max_budget is None
            or len(user_api_key_model_max_budget) == 0
        ):
            verbose_proxy_logger.debug(
                "Not running _PROXY_VirtualKeyModelMaxBudgetLimiter.async_log_success_event because user_api_key_model_max_budget is None or empty. `user_api_key_model_max_budget`=%s",
                user_api_key_model_max_budget,
            )
            return

        response_cost: float = standard_logging_payload.get("response_cost", 0)
        model = standard_logging_payload.get("model")
        # Payload metadata may be absent or None; treat that as "no virtual key".
        virtual_key = (standard_logging_payload.get("metadata") or {}).get(
            "user_api_key_hash"
        )

        if virtual_key is not None:
            # Use the duration configured for this model so the key written
            # here matches the one `_get_virtual_key_spend_for_model` reads.
            budget_config = self._get_spend_tracking_budget_config(
                model=model,
                user_api_key_model_max_budget=user_api_key_model_max_budget,
            )
            virtual_spend_key = f"{VIRTUAL_KEY_SPEND_CACHE_KEY_PREFIX}:{virtual_key}:{model}:{budget_config.budget_duration}"
            virtual_start_time_key = f"virtual_key_budget_start_time:{virtual_key}"
            await self._increment_spend_for_key(
                budget_config=budget_config,
                spend_key=virtual_spend_key,
                start_time_key=virtual_start_time_key,
                response_cost=response_cost,
            )

        verbose_proxy_logger.debug(
            "current state of in memory cache %s",
            json.dumps(
                self.dual_cache.in_memory_cache.cache_dict, indent=4, default=str
            ),
        )