|
import json
import logging
from typing import List, Optional

import litellm
from litellm._logging import verbose_proxy_logger
from litellm.caching.caching import DualCache
from litellm.integrations.custom_logger import Span
from litellm.proxy._types import UserAPIKeyAuth
from litellm.router_strategy.budget_limiter import RouterBudgetLimiting
from litellm.types.llms.openai import AllMessageValues
from litellm.types.utils import (
    BudgetConfig,
    GenericBudgetConfigType,
    StandardLoggingPayload,
)
|
|
|
# Prefix of every cache entry that stores per-virtual-key, per-model spend.
# Full key layout: "{prefix}:{key_hash}:{model}:{budget_duration}".
VIRTUAL_KEY_SPEND_CACHE_KEY_PREFIX = "virtual_key_spend"
|
|
|
|
|
class _PROXY_VirtualKeyModelMaxBudgetLimiter(RouterBudgetLimiting):
    """
    Handles budgets for model + virtual key

    Example: key=sk-1234567890, model=gpt-4o, max_budget=100, time_period=1d

    Spend is recorded by `async_log_success_event` and checked by
    `is_key_within_model_budget`. Both sides build the cache key through
    `_get_spend_cache_key`, so they always agree on its layout:
    `virtual_key_spend:{key_hash}:{model}:{budget_duration}`.
    """

    def __init__(self, dual_cache: DualCache):
        """
        Args:
            dual_cache: cache used to read and write per-key, per-model spend.
        """
        # Only these two attributes are set; the parent initializer is not
        # invoked here.
        # NOTE(review): presumably the inherited `_increment_spend_for_key`
        # relies on `redis_increment_operation_queue` - confirm against
        # RouterBudgetLimiting.
        self.dual_cache = dual_cache
        self.redis_increment_operation_queue = []

    async def is_key_within_model_budget(
        self,
        user_api_key_dict: UserAPIKeyAuth,
        model: str,
    ) -> bool:
        """
        Check if the user_api_key_dict is within the model budget

        Args:
            user_api_key_dict: the authenticated virtual key; its
                `model_max_budget` maps model name -> budget settings.
            model: the model requested with this key.

        Returns:
            True when no budget applies to `model` or the recorded spend does
            not exceed the budget.

        Raises:
            BudgetExceededError: If the user_api_key_dict has exceeded the model budget
        """
        # Treat a missing budget mapping the same as an empty one.
        _model_max_budget = user_api_key_dict.model_max_budget or {}
        internal_model_max_budget: GenericBudgetConfigType = {
            _model: BudgetConfig(**_budget_info)
            for _model, _budget_info in _model_max_budget.items()
        }

        # Serialising the budget map is only worth it when it will be logged.
        if verbose_proxy_logger.isEnabledFor(logging.DEBUG):
            verbose_proxy_logger.debug(
                "internal_model_max_budget %s",
                json.dumps(internal_model_max_budget, indent=4, default=str),
            )

        # Check whether the requested model has a budget configured.
        _current_model_budget_info = self._get_request_model_budget_config(
            model=model, internal_model_max_budget=internal_model_max_budget
        )
        if _current_model_budget_info is None:
            verbose_proxy_logger.debug(
                "Model %s not found in internal_model_max_budget", model
            )
            return True

        # A missing, zero or negative max_budget means nothing is enforced.
        max_budget = _current_model_budget_info.max_budget
        if max_budget is None or max_budget <= 0:
            return True

        _current_spend = await self._get_virtual_key_spend_for_model(
            user_api_key_hash=user_api_key_dict.token,
            model=model,
            key_budget_config=_current_model_budget_info,
        )
        if _current_spend is not None and _current_spend > max_budget:
            raise litellm.BudgetExceededError(
                message=f"LiteLLM Virtual Key: {user_api_key_dict.token}, key_alias: {user_api_key_dict.key_alias}, exceeded budget for model={model}",
                current_cost=_current_spend,
                max_budget=max_budget,
            )

        return True

    @staticmethod
    def _get_spend_cache_key(
        user_api_key_hash: Optional[str],
        model: Optional[str],
        budget_duration: Optional[str],
    ) -> str:
        """Build the cache key under which spend for (key, model, duration) is stored."""
        return f"{VIRTUAL_KEY_SPEND_CACHE_KEY_PREFIX}:{user_api_key_hash}:{model}:{budget_duration}"

    async def _get_virtual_key_spend_for_model(
        self,
        user_api_key_hash: Optional[str],
        model: str,
        key_budget_config: BudgetConfig,
    ) -> Optional[float]:
        """
        Get the current spend for a virtual key for a model

        Lookup model in this order:
            1. model: directly look up `model`
            2. If 1, does not exist, check if passed as {custom_llm_provider}/model

        Returns:
            The cached spend, or None when neither lookup finds an entry.
        """
        budget_duration = key_budget_config.budget_duration

        # 1. model: directly look up `model`
        _current_spend = await self.dual_cache.async_get_cache(
            key=self._get_spend_cache_key(user_api_key_hash, model, budget_duration),
        )
        if _current_spend is not None:
            return _current_spend

        # 2. Retry without the provider prefix. Skip the lookup when stripping
        #    changes nothing, since it would hit the exact same key again.
        model_without_provider = self._get_model_without_custom_llm_provider(model)
        if model_without_provider == model:
            return None
        return await self.dual_cache.async_get_cache(
            key=self._get_spend_cache_key(
                user_api_key_hash, model_without_provider, budget_duration
            ),
        )

    def _get_request_model_budget_config(
        self, model: str, internal_model_max_budget: GenericBudgetConfigType
    ) -> Optional[BudgetConfig]:
        """
        Get the budget config for the request model

        1. Check if `model` is in `internal_model_max_budget`
        2. If not, check if `model` without custom llm provider is in `internal_model_max_budget`

        Returns:
            The matching budget config, or None when neither name is configured.
        """
        return internal_model_max_budget.get(
            model, None
        ) or internal_model_max_budget.get(
            self._get_model_without_custom_llm_provider(model), None
        )

    def _get_model_without_custom_llm_provider(self, model: str) -> str:
        """
        Return the text after the last "/" in `model`, or `model` itself when
        it contains no "/".

        Example: "openai/gpt-4o" -> "gpt-4o"; "gpt-4o" -> "gpt-4o".
        Only the final segment is kept, so "a/b/c" -> "c".
        """
        if "/" in model:
            return model.split("/")[-1]
        return model

    def _get_budget_config_for_logged_model(
        self,
        model: Optional[str],
        user_api_key_model_max_budget: Optional[dict],
    ) -> BudgetConfig:
        """
        Resolve the budget config used to record spend for `model`.

        The configured `budget_duration` is part of the spend cache key that
        `is_key_within_model_budget` reads, so spend must be written under the
        same duration or the budget check never sees it.

        Lookup order: `model`, then `model` without its provider prefix, then
        any configured entry whose name without provider prefix matches.

        Falls back to a 1 day window when the model has no usable
        configuration (not configured, malformed entry, or no duration).
        """
        default_budget_config = BudgetConfig(time_period="1d", budget_limit=0.1)
        if not model or not isinstance(user_api_key_model_max_budget, dict):
            return default_budget_config

        model_without_provider = self._get_model_without_custom_llm_provider(model)
        raw_budget_info = user_api_key_model_max_budget.get(model)
        if raw_budget_info is None:
            raw_budget_info = user_api_key_model_max_budget.get(model_without_provider)
        if raw_budget_info is None:
            # The budget may be configured as "{provider}/{model}" while the
            # logged model name carries no provider prefix.
            for configured_model, configured_info in user_api_key_model_max_budget.items():
                if (
                    isinstance(configured_model, str)
                    and self._get_model_without_custom_llm_provider(configured_model)
                    == model_without_provider
                ):
                    raw_budget_info = configured_info
                    break

        if isinstance(raw_budget_info, BudgetConfig):
            configured_budget = raw_budget_info
        elif isinstance(raw_budget_info, dict):
            try:
                configured_budget = BudgetConfig(**raw_budget_info)
            except (TypeError, ValueError):
                # A malformed entry must not stop spend from being recorded.
                return default_budget_config
        else:
            return default_budget_config

        # NOTE(review): when no duration is configured the budget check reads a
        # key ending in ":None"; spend is still written under the 1 day default
        # here, exactly as before this change.
        if not configured_budget.budget_duration:
            return default_budget_config
        return configured_budget

    async def async_filter_deployments(
        self,
        model: str,
        healthy_deployments: List,
        messages: Optional[List[AllMessageValues]],
        request_kwargs: Optional[dict] = None,
        parent_otel_span: Optional[Span] = None,
    ) -> List[dict]:
        """
        Return `healthy_deployments` unchanged.

        This limiter does not filter deployments; every other argument is
        ignored.
        """
        return healthy_deployments

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        """
        Track spend for virtual key + model in DualCache

        Example: key=sk-1234567890, model=gpt-4o, max_budget=100, time_period=1d

        Does nothing when the request metadata carries no
        `user_api_key_model_max_budget`.

        Raises:
            ValueError: if `kwargs` has no `standard_logging_object`.
        """
        verbose_proxy_logger.debug(
            "in _PROXY_VirtualKeyModelMaxBudgetLimiter.async_log_success_event"
        )
        standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get(
            "standard_logging_object", None
        )
        if standard_logging_payload is None:
            raise ValueError("standard_logging_payload is required")

        _litellm_params: dict = kwargs.get("litellm_params", {}) or {}
        _metadata: dict = _litellm_params.get("metadata", {}) or {}
        user_api_key_model_max_budget: Optional[dict] = _metadata.get(
            "user_api_key_model_max_budget", None
        )
        if not user_api_key_model_max_budget:
            verbose_proxy_logger.debug(
                "Not running _PROXY_VirtualKeyModelMaxBudgetLimiter.async_log_success_event because user_api_key_model_max_budget is None or empty. `user_api_key_model_max_budget`=%s",
                user_api_key_model_max_budget,
            )
            return

        response_cost: float = standard_logging_payload.get("response_cost", 0)
        model = standard_logging_payload.get("model")
        # The payload's metadata may be absent or None.
        virtual_key = (standard_logging_payload.get("metadata") or {}).get(
            "user_api_key_hash"
        )

        if virtual_key is not None:
            # Record spend under the duration configured for this model so the
            # key matches the one read by `is_key_within_model_budget`.
            budget_config = self._get_budget_config_for_logged_model(
                model=model,
                user_api_key_model_max_budget=user_api_key_model_max_budget,
            )
            virtual_spend_key = self._get_spend_cache_key(
                virtual_key, model, budget_config.budget_duration
            )
            # NOTE(review): the start-time key is shared by every model of a
            # virtual key - confirm this is what `_increment_spend_for_key`
            # expects when models use different budget durations.
            virtual_start_time_key = f"virtual_key_budget_start_time:{virtual_key}"
            await self._increment_spend_for_key(
                budget_config=budget_config,
                spend_key=virtual_spend_key,
                start_time_key=virtual_start_time_key,
                response_cost=response_cost,
            )

        # Dumping the whole in-memory cache is expensive; only do it when the
        # message will actually be emitted.
        if verbose_proxy_logger.isEnabledFor(logging.DEBUG):
            verbose_proxy_logger.debug(
                "current state of in memory cache %s",
                json.dumps(
                    self.dual_cache.in_memory_cache.cache_dict, indent=4, default=str
                ),
            )
|
|