|
|
|
|
|
|
|
import json |
|
import os |
|
|
|
import httpx |
|
|
|
import litellm |
|
from litellm.integrations.custom_logger import CustomLogger |
|
from litellm.llms.custom_httpx.http_handler import ( |
|
HTTPHandler, |
|
get_async_httpx_client, |
|
httpxSpecialProvider, |
|
) |
|
|
|
|
|
def get_utc_datetime():
    """Return the current time as a timezone-aware UTC ``datetime``.

    ``datetime.utcnow()`` is avoided on purpose: it yields a naive object
    (so ``isoformat()`` would omit the ``+00:00`` offset) and is deprecated
    since Python 3.12. ``timezone.utc`` exists on every Python 3 release, so
    no interpreter-version branching is required.
    """
    from datetime import datetime, timezone

    return datetime.now(timezone.utc)
|
|
|
|
|
class OpenMeterLogger(CustomLogger):
    """Success callback that reports LiteLLM usage to OpenMeter.

    Each successful call is converted into a CloudEvents-style JSON document
    (see ``_common_logic``) and POSTed to ``<endpoint>/api/v1/events``.

    Environment variables read by this class:
        OPENMETER_API_KEY: required; sent as a Bearer token.
        OPENMETER_API_ENDPOINT: optional; defaults to
            ``https://openmeter.cloud``.
        OPENMETER_EVENT_TYPE: optional; defaults to ``litellm_tokens``.
    """

    def __init__(self) -> None:
        """Validate the environment and create the sync/async HTTP handlers."""
        super().__init__()
        self.validate_environment()
        self.async_http_handler = get_async_httpx_client(
            llm_provider=httpxSpecialProvider.LoggingCallback
        )
        self.sync_http_handler = HTTPHandler()

    def validate_environment(self):
        """Ensure the required OpenMeter settings are present.

        Only ``OPENMETER_API_KEY`` is mandatory; ``OPENMETER_API_ENDPOINT``
        falls back to a default when the request is built.

        Raises:
            Exception: if a required environment variable is unset.
        """
        required_keys = ("OPENMETER_API_KEY",)
        missing_keys = [key for key in required_keys if os.getenv(key) is None]
        if missing_keys:
            raise Exception("Missing keys={} in environment.".format(missing_keys))

    def _common_logic(self, kwargs: dict, response_obj):
        """Build the event payload for one successful call.

        Args:
            kwargs: logging kwargs for the call; ``user``, ``model``,
                ``response_cost`` and ``litellm_call_id`` are read from it.
            response_obj: the call's response. Its ``id`` is used as the
                event id (falling back to ``litellm_call_id``), and token
                usage is copied when it is a ``ModelResponse`` or
                ``EmbeddingResponse`` carrying a ``usage`` attribute.

        Returns:
            A dict ready to be JSON-serialised as the event body.

        Raises:
            Exception: if ``kwargs`` has no (non-empty) ``user``.
        """
        call_id = response_obj.get("id", kwargs.get("litellm_call_id"))
        event_time = get_utc_datetime().isoformat()
        cost = kwargs.get("response_cost", None)
        model = kwargs.get("model")

        usage = {}
        if isinstance(
            response_obj, (litellm.ModelResponse, litellm.EmbeddingResponse)
        ) and hasattr(response_obj, "usage"):
            response_usage = response_obj["usage"]
            usage = {
                "prompt_tokens": response_usage.get("prompt_tokens", 0),
                "completion_tokens": response_usage.get("completion_tokens", 0),
                "total_tokens": response_usage.get("total_tokens"),
            }

        # The subject must be the plain user value. Wrapping it in a tuple
        # (as a stray trailing comma would) makes the check below always pass
        # and serialises the subject as a JSON array.
        subject = kwargs.get("user", None)
        if not subject:
            raise Exception("OpenMeter: user is required")

        return {
            "specversion": "1.0",
            "type": os.getenv("OPENMETER_EVENT_TYPE", "litellm_tokens"),
            "id": call_id,
            "time": event_time,
            "subject": subject,
            "source": "litellm-proxy",
            "data": {"model": model, "cost": cost, **usage},
        }

    def _build_request(self, kwargs, response_obj):
        """Return ``(url, body, headers)`` for the event-ingest POST.

        Shared by the sync and async logging paths so both send exactly the
        same request.
        """
        base_url = os.getenv("OPENMETER_API_ENDPOINT", "https://openmeter.cloud")
        # Strip trailing slashes so the joined URL never contains "//api".
        url = base_url.rstrip("/") + "/api/v1/events"

        api_key = os.getenv("OPENMETER_API_KEY")
        payload = self._common_logic(kwargs=kwargs, response_obj=response_obj)
        headers = {
            "Content-Type": "application/cloudevents+json",
            "Authorization": "Bearer {}".format(api_key),
        }
        return url, json.dumps(payload), headers

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Synchronously send the usage event for a successful call.

        ``start_time`` and ``end_time`` are accepted for callback-signature
        compatibility and are not used.

        Raises:
            Exception: with the response text when the POST raises
                ``httpx.HTTPStatusError``; any other error propagates as-is.
        """
        url, body, headers = self._build_request(kwargs, response_obj)
        try:
            self.sync_http_handler.post(url=url, data=body, headers=headers)
        except httpx.HTTPStatusError as e:
            raise Exception(f"OpenMeter logging error: {e.response.text}") from e

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Asynchronously send the usage event for a successful call.

        ``start_time`` and ``end_time`` are accepted for callback-signature
        compatibility and are not used.

        Raises:
            Exception: with the response text when the POST raises
                ``httpx.HTTPStatusError``; any other error propagates as-is.
        """
        url, body, headers = self._build_request(kwargs, response_obj)
        try:
            await self.async_http_handler.post(url=url, data=body, headers=headers)
        except httpx.HTTPStatusError as e:
            raise Exception(f"OpenMeter logging error: {e.response.text}") from e
|
|