|
""" |
|
Helper functions to query prometheus API |
|
""" |
|
|
|
import time |
|
from datetime import datetime, timedelta |
|
from typing import Optional |
|
|
|
from litellm import get_secret |
|
from litellm._logging import verbose_logger |
|
from litellm.llms.custom_httpx.http_handler import ( |
|
get_async_httpx_client, |
|
httpxSpecialProvider, |
|
) |
|
|
|
PROMETHEUS_URL: Optional[str] = get_secret("PROMETHEUS_URL") |
|
PROMETHEUS_SELECTED_INSTANCE: Optional[str] = get_secret("PROMETHEUS_SELECTED_INSTANCE") |
|
async_http_handler = get_async_httpx_client( |
|
llm_provider=httpxSpecialProvider.LoggingCallback |
|
) |
|
|
|
|
|
async def get_metric_from_prometheus( |
|
metric_name: str, |
|
): |
|
|
|
if PROMETHEUS_URL is None: |
|
raise ValueError( |
|
"PROMETHEUS_URL not set please set 'PROMETHEUS_URL=<>' in .env" |
|
) |
|
|
|
query = f"{metric_name}[24h]" |
|
now = int(time.time()) |
|
response = await async_http_handler.get( |
|
f"{PROMETHEUS_URL}/api/v1/query", params={"query": query, "time": now} |
|
) |
|
_json_response = response.json() |
|
verbose_logger.debug("json response from prometheus /query api %s", _json_response) |
|
results = response.json()["data"]["result"] |
|
return results |
|
|
|
|
|
async def get_fallback_metric_from_prometheus(): |
|
""" |
|
Gets fallback metrics from prometheus for the last 24 hours |
|
""" |
|
response_message = "" |
|
relevant_metrics = [ |
|
"litellm_deployment_successful_fallbacks_total", |
|
"litellm_deployment_failed_fallbacks_total", |
|
] |
|
for metric in relevant_metrics: |
|
response_json = await get_metric_from_prometheus( |
|
metric_name=metric, |
|
) |
|
|
|
if response_json: |
|
verbose_logger.debug("response json %s", response_json) |
|
for result in response_json: |
|
verbose_logger.debug("result= %s", result) |
|
metric = result["metric"] |
|
metric_values = result["values"] |
|
most_recent_value = metric_values[0] |
|
|
|
if PROMETHEUS_SELECTED_INSTANCE is not None: |
|
if metric.get("instance") != PROMETHEUS_SELECTED_INSTANCE: |
|
continue |
|
|
|
value = int(float(most_recent_value[1])) |
|
primary_model = metric.get("primary_model", "Unknown") |
|
fallback_model = metric.get("fallback_model", "Unknown") |
|
response_message += f"`{value} successful fallback requests` with primary model=`{primary_model}` -> fallback model=`{fallback_model}`" |
|
response_message += "\n" |
|
verbose_logger.debug("response message %s", response_message) |
|
return response_message |
|
|
|
|
|
def is_prometheus_connected() -> bool: |
|
if PROMETHEUS_URL is not None: |
|
return True |
|
return False |
|
|
|
|
|
async def get_daily_spend_from_prometheus(api_key: Optional[str]): |
|
""" |
|
Expected Response Format: |
|
[ |
|
{ |
|
"date": "2024-08-18T00:00:00+00:00", |
|
"spend": 1.001818099998933 |
|
}, |
|
...] |
|
""" |
|
if PROMETHEUS_URL is None: |
|
raise ValueError( |
|
"PROMETHEUS_URL not set please set 'PROMETHEUS_URL=<>' in .env" |
|
) |
|
|
|
|
|
end_date = datetime.utcnow() |
|
start_date = end_date - timedelta(days=30) |
|
|
|
|
|
start_str = start_date.isoformat() + "+00:00" |
|
end_str = end_date.isoformat() + "+00:00" |
|
|
|
url = f"{PROMETHEUS_URL}/api/v1/query_range" |
|
|
|
if api_key is None: |
|
query = "sum(delta(litellm_spend_metric_total[1d]))" |
|
else: |
|
query = ( |
|
f'sum(delta(litellm_spend_metric_total{{hashed_api_key="{api_key}"}}[1d]))' |
|
) |
|
|
|
params = { |
|
"query": query, |
|
"start": start_str, |
|
"end": end_str, |
|
"step": "86400", |
|
} |
|
|
|
response = await async_http_handler.get(url, params=params) |
|
_json_response = response.json() |
|
verbose_logger.debug("json response from prometheus /query api %s", _json_response) |
|
results = response.json()["data"]["result"] |
|
formatted_results = [] |
|
|
|
for result in results: |
|
metric_data = result["values"] |
|
for timestamp, value in metric_data: |
|
|
|
date = datetime.fromtimestamp(float(timestamp)).isoformat() + "+00:00" |
|
spend = float(value) |
|
formatted_results.append({"date": date, "spend": spend}) |
|
|
|
return formatted_results |
|
|