Raju2024's picture
Upload 1072 files
e3278e4 verified
raw
history blame
4.33 kB
# What is this?
## On success events, log token usage and cost to OpenMeter - https://github.com/BerriAI/litellm/issues/1268
import json
import os
import httpx
import litellm
from litellm.integrations.custom_logger import CustomLogger
from litellm.llms.custom_httpx.http_handler import (
HTTPHandler,
get_async_httpx_client,
httpxSpecialProvider,
)
def get_utc_datetime():
    """Return the current time as a timezone-aware UTC ``datetime``.

    ``datetime.now(timezone.utc)`` is used unconditionally: it is available
    on every Python 3 release and yields the same value as
    ``datetime.now(datetime.UTC)`` (``UTC`` is an alias of ``timezone.utc``).
    The previous fallback, ``datetime.utcnow()``, returned a *naive* datetime,
    so the ISO-8601 string produced by callers lost its ``+00:00`` offset on
    interpreters without ``datetime.UTC``.

    Returns:
        datetime: the current instant with ``tzinfo`` set to UTC.
    """
    from datetime import datetime, timezone

    return datetime.now(timezone.utc)
class OpenMeterLogger(CustomLogger):
    """Success-callback logger that posts usage/cost events to OpenMeter.

    Each successful call is turned into a CloudEvents-style JSON document
    (see ``_common_logic``) and POSTed to ``<endpoint>/api/v1/events``.

    Environment variables read by this class:
        OPENMETER_API_KEY: required; sent as a ``Bearer`` token.
        OPENMETER_API_ENDPOINT: optional; defaults to
            ``https://openmeter.cloud``.
        OPENMETER_EVENT_TYPE: optional; defaults to ``litellm_tokens``.
    """

    def __init__(self) -> None:
        """Validate the environment and create the sync/async HTTP handlers.

        Raises:
            Exception: if a required environment variable is missing.
        """
        super().__init__()
        self.validate_environment()
        self.async_http_handler = get_async_httpx_client(
            llm_provider=httpxSpecialProvider.LoggingCallback
        )
        self.sync_http_handler = HTTPHandler()

    def validate_environment(self):
        """Check that the required environment variables are set.

        Only ``OPENMETER_API_KEY`` is mandatory. ``OPENMETER_API_ENDPOINT``
        is not checked here because the logging methods fall back to a
        default endpoint when it is unset.

        Raises:
            Exception: listing every required key that is missing.
        """
        missing_keys = []
        if os.getenv("OPENMETER_API_KEY", None) is None:
            missing_keys.append("OPENMETER_API_KEY")

        if missing_keys:
            raise Exception("Missing keys={} in environment.".format(missing_keys))

    def _common_logic(self, kwargs: dict, response_obj) -> dict:
        """Build the event payload shared by the sync and async loggers.

        Args:
            kwargs: call metadata; the keys read here are
                ``litellm_call_id``, ``response_cost``, ``model`` and
                ``user``.
            response_obj: the response being logged. Its ``id`` is used as
                the event id (falling back to ``litellm_call_id``), and its
                ``usage`` is copied when it is a ``litellm.ModelResponse``
                or ``litellm.EmbeddingResponse``.

        Returns:
            dict: the JSON-serialisable event document.

        Raises:
            Exception: if no ``user`` was supplied in ``kwargs``.
        """
        call_id = response_obj.get("id", kwargs.get("litellm_call_id"))
        dt = get_utc_datetime().isoformat()
        cost = kwargs.get("response_cost", None)
        model = kwargs.get("model")

        usage = {}
        if isinstance(
            response_obj, (litellm.ModelResponse, litellm.EmbeddingResponse)
        ) and hasattr(response_obj, "usage"):
            response_usage = response_obj["usage"]
            usage = {
                "prompt_tokens": response_usage.get("prompt_tokens", 0),
                "completion_tokens": response_usage.get("completion_tokens", 0),
                "total_tokens": response_usage.get("total_tokens"),
            }

        # End-user passed in via the 'user' param. This must be the plain
        # value, not a 1-tuple: a tuple is always truthy (so the check below
        # could never fire) and would be serialised as a JSON array instead
        # of the string the event's "subject" field is meant to carry.
        subject = kwargs.get("user", None)
        if not subject:
            raise Exception("OpenMeter: user is required")

        return {
            "specversion": "1.0",
            "type": os.getenv("OPENMETER_EVENT_TYPE", "litellm_tokens"),
            "id": call_id,
            "time": dt,
            "subject": subject,
            "source": "litellm-proxy",
            "data": {"model": model, "cost": cost, **usage},
        }

    def _build_request(self, kwargs, response_obj):
        """Return ``(url, headers, body)`` for the events POST request.

        Shared by ``log_success_event`` and ``async_log_success_event`` so
        both send byte-identical requests.
        """
        endpoint = os.getenv("OPENMETER_API_ENDPOINT", "https://openmeter.cloud")
        # Normalise trailing slashes so the path is joined with exactly one.
        url = endpoint.rstrip("/") + "/api/v1/events"

        api_key = os.getenv("OPENMETER_API_KEY")
        payload = self._common_logic(kwargs=kwargs, response_obj=response_obj)
        headers = {
            "Content-Type": "application/cloudevents+json",
            "Authorization": "Bearer {}".format(api_key),
        }
        return url, headers, json.dumps(payload)

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Synchronously POST the usage event for a successful call.

        ``start_time`` and ``end_time`` are accepted for interface
        compatibility but are not used.

        Raises:
            Exception: if the payload cannot be built (missing user), or
                wrapping the response text when the HTTP handler raises
                ``httpx.HTTPStatusError``. Any other error propagates
                unchanged.
        """
        url, headers, body = self._build_request(kwargs, response_obj)
        try:
            self.sync_http_handler.post(
                url=url,
                data=body,
                headers=headers,
            )
        except httpx.HTTPStatusError as e:
            raise Exception(f"OpenMeter logging error: {e.response.text}") from e

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Asynchronously POST the usage event for a successful call.

        ``start_time`` and ``end_time`` are accepted for interface
        compatibility but are not used.

        Raises:
            Exception: if the payload cannot be built (missing user), or
                wrapping the response text when the HTTP handler raises
                ``httpx.HTTPStatusError``. Any other error propagates
                unchanged.
        """
        url, headers, body = self._build_request(kwargs, response_obj)
        try:
            await self.async_http_handler.post(
                url=url,
                data=body,
                headers=headers,
            )
        except httpx.HTTPStatusError as e:
            raise Exception(f"OpenMeter logging error: {e.response.text}") from e