Raju2024's picture
Upload 1072 files
e3278e4 verified
raw
history blame
4.59 kB
"""
Helper functions to query the Prometheus HTTP API
"""
import time
from datetime import datetime, timedelta, timezone
from typing import Optional

from litellm import get_secret
from litellm._logging import verbose_logger
from litellm.llms.custom_httpx.http_handler import (
    get_async_httpx_client,
    httpxSpecialProvider,
)
# Base URL of the Prometheus server, read once at import time from the
# "PROMETHEUS_URL" secret. The query helpers in this module raise ValueError
# when it is None.
PROMETHEUS_URL: Optional[str] = get_secret("PROMETHEUS_URL") # type: ignore
# Optional value compared against each series' "instance" label when building
# the fallback report; when None, no instance filtering is applied.
PROMETHEUS_SELECTED_INSTANCE: Optional[str] = get_secret("PROMETHEUS_SELECTED_INSTANCE") # type: ignore
# Shared async HTTP client used for every Prometheus request made by this module.
async_http_handler = get_async_httpx_client(
llm_provider=httpxSpecialProvider.LoggingCallback
)
async def get_metric_from_prometheus(
    metric_name: str,
):
    """
    Query Prometheus for the samples of `metric_name` over the trailing 24 hours.

    Sends the selector `<metric_name>[24h]` to `{PROMETHEUS_URL}/api/v1/query`,
    evaluated at the current Unix time.

    Args:
        metric_name: Metric name, used verbatim as the PromQL selector.

    Returns:
        The `["data"]["result"]` entry of the decoded JSON response.

    Raises:
        ValueError: If PROMETHEUS_URL is not set.
        KeyError: If the decoded response has no `data` / `result` entry.
    """
    if PROMETHEUS_URL is None:
        raise ValueError(
            "PROMETHEUS_URL not set please set 'PROMETHEUS_URL=<>' in .env"
        )

    # Range-vector selector: all samples in the 24h window ending at `time`.
    query = f"{metric_name}[24h]"
    # Evaluate the query at the current moment.
    now = int(time.time())
    response = await async_http_handler.get(
        f"{PROMETHEUS_URL}/api/v1/query", params={"query": query, "time": now}
    )

    # Decode the body once and reuse it for both the debug log and the result.
    _json_response = response.json()
    verbose_logger.debug("json response from prometheus /query api %s", _json_response)
    return _json_response["data"]["result"]
async def get_fallback_metric_from_prometheus():
    """
    Gets fallback metrics from prometheus for the last 24 hours

    For each fallback counter (successful and failed), fetches the series via
    `get_metric_from_prometheus` and reports the most recent sample of every
    series as one line of text.

    When PROMETHEUS_SELECTED_INSTANCE is set, series whose `instance` label
    differs from it are skipped. Series without any samples are skipped too.

    Returns:
        A string with one newline-terminated line per reported series; the
        empty string when nothing was reported.
    """
    # Metric name -> outcome word used in the report line, so that failed
    # fallbacks are not reported as "successful".
    relevant_metrics = {
        "litellm_deployment_successful_fallbacks_total": "successful",
        "litellm_deployment_failed_fallbacks_total": "failed",
    }

    message_lines = []
    for metric_name, outcome in relevant_metrics.items():
        response_json = await get_metric_from_prometheus(
            metric_name=metric_name,
        )
        if not response_json:
            continue

        verbose_logger.debug("response json %s", response_json)
        for result in response_json:
            verbose_logger.debug("result= %s", result)
            labels = result["metric"]

            # Filter on instance first so unrelated series are never parsed.
            if (
                PROMETHEUS_SELECTED_INSTANCE is not None
                and labels.get("instance") != PROMETHEUS_SELECTED_INSTANCE
            ):
                continue

            samples = result["values"]
            if not samples:
                continue

            # Prometheus returns [timestamp, "value"] pairs in chronological
            # order, so the most recent sample is the last one.
            most_recent_value = samples[-1]
            value = int(float(most_recent_value[1]))  # Convert value to integer

            primary_model = labels.get("primary_model", "Unknown")
            fallback_model = labels.get("fallback_model", "Unknown")
            message_lines.append(
                f"`{value} {outcome} fallback requests` with primary model=`{primary_model}` -> fallback model=`{fallback_model}`"
            )

    response_message = "".join(f"{line}\n" for line in message_lines)
    verbose_logger.debug("response message %s", response_message)
    return response_message
def is_prometheus_connected() -> bool:
    """
    Tell whether a Prometheus URL has been configured.

    This only checks that PROMETHEUS_URL is not None; it does not contact
    the server.
    """
    return PROMETHEUS_URL is not None
async def get_daily_spend_from_prometheus(api_key: Optional[str]):
    """
    Fetch per-day spend for the last 30 days from Prometheus.

    Runs `sum(delta(litellm_spend_metric_total[1d]))` against
    `{PROMETHEUS_URL}/api/v1/query_range` with a one-day step. When `api_key`
    is given, the metric is restricted to series whose `hashed_api_key` label
    equals it.

    Args:
        api_key: Value to match against the `hashed_api_key` label, or None
            to sum across all keys.

    Returns:
        A list of `{"date": <ISO 8601 UTC string>, "spend": <float>}` dicts.

    Raises:
        ValueError: If PROMETHEUS_URL is not set.
        KeyError: If the decoded response has no `data` / `result` entry.

    Expected Response Format:
    [
        {
            "date": "2024-08-18T00:00:00+00:00",
            "spend": 1.001818099998933
        },
    ...]
    """
    if PROMETHEUS_URL is None:
        raise ValueError(
            "PROMETHEUS_URL not set please set 'PROMETHEUS_URL=<>' in .env"
        )

    # Query window: the last 30 days, as timezone-aware UTC datetimes so that
    # isoformat() emits the "+00:00" offset itself.
    end_date = datetime.now(timezone.utc)
    start_date = end_date - timedelta(days=30)
    start_str = start_date.isoformat()
    end_str = end_date.isoformat()

    url = f"{PROMETHEUS_URL}/api/v1/query_range"

    if api_key is None:
        query = "sum(delta(litellm_spend_metric_total[1d]))"
    else:
        # Escape characters that would terminate or corrupt the quoted PromQL
        # label value; ordinary keys pass through unchanged.
        escaped_key = (
            api_key.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
        )
        query = (
            f'sum(delta(litellm_spend_metric_total{{hashed_api_key="{escaped_key}"}}[1d]))'
        )

    params = {
        "query": query,
        "start": start_str,
        "end": end_str,
        "step": "86400",  # Step size of 1 day in seconds
    }

    response = await async_http_handler.get(url, params=params)
    # Decode the body once and reuse it for both the debug log and the result.
    _json_response = response.json()
    verbose_logger.debug(
        "json response from prometheus /query_range api %s", _json_response
    )
    results = _json_response["data"]["result"]

    formatted_results = []
    for result in results:
        for timestamp, value in result["values"]:
            # Interpret the Unix timestamp as UTC (not local time) so the
            # "+00:00" offset in the output is actually correct.
            date = datetime.fromtimestamp(
                float(timestamp), tz=timezone.utc
            ).isoformat()
            formatted_results.append({"date": date, "spend": float(value)})

    return formatted_results