|
import os |
|
from dataclasses import dataclass |
|
from datetime import datetime |
|
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union |
|
|
|
import litellm |
|
from litellm._logging import verbose_logger |
|
from litellm.integrations.custom_logger import CustomLogger |
|
from litellm.types.services import ServiceLoggerPayload |
|
from litellm.types.utils import ( |
|
ChatCompletionMessageToolCall, |
|
Function, |
|
StandardLoggingPayload, |
|
) |
|
|
|
if TYPE_CHECKING: |
|
from opentelemetry.sdk.trace.export import SpanExporter as _SpanExporter |
|
from opentelemetry.trace import Span as _Span |
|
|
|
from litellm.proxy._types import ( |
|
ManagementEndpointLoggingPayload as _ManagementEndpointLoggingPayload, |
|
) |
|
from litellm.proxy.proxy_server import UserAPIKeyAuth as _UserAPIKeyAuth |
|
|
|
Span = _Span |
|
SpanExporter = _SpanExporter |
|
UserAPIKeyAuth = _UserAPIKeyAuth |
|
ManagementEndpointLoggingPayload = _ManagementEndpointLoggingPayload |
|
else: |
|
Span = Any |
|
SpanExporter = Any |
|
UserAPIKeyAuth = Any |
|
ManagementEndpointLoggingPayload = Any |
|
|
|
|
|
LITELLM_TRACER_NAME = os.getenv("OTEL_TRACER_NAME", "litellm") |
|
LITELLM_RESOURCE: Dict[Any, Any] = { |
|
"service.name": os.getenv("OTEL_SERVICE_NAME", "litellm"), |
|
"deployment.environment": os.getenv("OTEL_ENVIRONMENT_NAME", "production"), |
|
"model_id": os.getenv("OTEL_SERVICE_NAME", "litellm"), |
|
} |
|
RAW_REQUEST_SPAN_NAME = "raw_gen_ai_request" |
|
LITELLM_REQUEST_SPAN_NAME = "litellm_request" |
|
|
|
|
|
@dataclass |
|
class OpenTelemetryConfig: |
|
|
|
exporter: Union[str, SpanExporter] = "console" |
|
endpoint: Optional[str] = None |
|
headers: Optional[str] = None |
|
|
|
@classmethod |
|
def from_env(cls): |
|
""" |
|
OTEL_HEADERS=x-honeycomb-team=B85YgLm9**** |
|
OTEL_EXPORTER="otlp_http" |
|
OTEL_ENDPOINT="https://api.honeycomb.io/v1/traces" |
|
|
|
OTEL_HEADERS gets sent as headers = {"x-honeycomb-team": "B85YgLm96******"} |
|
""" |
|
from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( |
|
InMemorySpanExporter, |
|
) |
|
|
|
if os.getenv("OTEL_EXPORTER") == "in_memory": |
|
return cls(exporter=InMemorySpanExporter()) |
|
return cls( |
|
exporter=os.getenv("OTEL_EXPORTER", "console"), |
|
endpoint=os.getenv("OTEL_ENDPOINT"), |
|
headers=os.getenv( |
|
"OTEL_HEADERS" |
|
), |
|
) |
|
|
|
|
|
class OpenTelemetry(CustomLogger): |
|
def __init__( |
|
self, |
|
config: Optional[OpenTelemetryConfig] = None, |
|
callback_name: Optional[str] = None, |
|
**kwargs, |
|
): |
|
from opentelemetry import trace |
|
from opentelemetry.sdk.resources import Resource |
|
from opentelemetry.sdk.trace import TracerProvider |
|
from opentelemetry.trace import SpanKind |
|
|
|
if config is None: |
|
config = OpenTelemetryConfig.from_env() |
|
|
|
self.config = config |
|
self.OTEL_EXPORTER = self.config.exporter |
|
self.OTEL_ENDPOINT = self.config.endpoint |
|
self.OTEL_HEADERS = self.config.headers |
|
provider = TracerProvider(resource=Resource(attributes=LITELLM_RESOURCE)) |
|
provider.add_span_processor(self._get_span_processor()) |
|
self.callback_name = callback_name |
|
|
|
trace.set_tracer_provider(provider) |
|
self.tracer = trace.get_tracer(LITELLM_TRACER_NAME) |
|
|
|
self.span_kind = SpanKind |
|
|
|
_debug_otel = str(os.getenv("DEBUG_OTEL", "False")).lower() |
|
|
|
if _debug_otel == "true": |
|
|
|
import logging |
|
|
|
logging.basicConfig(level=logging.DEBUG) |
|
logging.getLogger(__name__) |
|
|
|
|
|
otel_exporter_logger = logging.getLogger("opentelemetry.sdk.trace.export") |
|
otel_exporter_logger.setLevel(logging.DEBUG) |
|
|
|
|
|
super().__init__(**kwargs) |
|
self._init_otel_logger_on_litellm_proxy() |
|
|
|
def _init_otel_logger_on_litellm_proxy(self): |
|
""" |
|
Initializes OpenTelemetry for litellm proxy server |
|
|
|
- Adds Otel as a service callback |
|
- Sets `proxy_server.open_telemetry_logger` to self |
|
""" |
|
from litellm.proxy import proxy_server |
|
|
|
|
|
if "otel" not in litellm.service_callback: |
|
litellm.service_callback.append("otel") |
|
setattr(proxy_server, "open_telemetry_logger", self) |
|
|
|
def log_success_event(self, kwargs, response_obj, start_time, end_time): |
|
self._handle_sucess(kwargs, response_obj, start_time, end_time) |
|
|
|
def log_failure_event(self, kwargs, response_obj, start_time, end_time): |
|
self._handle_failure(kwargs, response_obj, start_time, end_time) |
|
|
|
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): |
|
self._handle_sucess(kwargs, response_obj, start_time, end_time) |
|
|
|
async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time): |
|
self._handle_failure(kwargs, response_obj, start_time, end_time) |
|
|
|
async def async_service_success_hook( |
|
self, |
|
payload: ServiceLoggerPayload, |
|
parent_otel_span: Optional[Span] = None, |
|
start_time: Optional[Union[datetime, float]] = None, |
|
end_time: Optional[Union[datetime, float]] = None, |
|
event_metadata: Optional[dict] = None, |
|
): |
|
|
|
from opentelemetry import trace |
|
from opentelemetry.trace import Status, StatusCode |
|
|
|
_start_time_ns = 0 |
|
_end_time_ns = 0 |
|
|
|
if isinstance(start_time, float): |
|
_start_time_ns = int(start_time * 1e9) |
|
else: |
|
_start_time_ns = self._to_ns(start_time) |
|
|
|
if isinstance(end_time, float): |
|
_end_time_ns = int(end_time * 1e9) |
|
else: |
|
_end_time_ns = self._to_ns(end_time) |
|
|
|
if parent_otel_span is not None: |
|
_span_name = payload.service |
|
service_logging_span = self.tracer.start_span( |
|
name=_span_name, |
|
context=trace.set_span_in_context(parent_otel_span), |
|
start_time=_start_time_ns, |
|
) |
|
self.safe_set_attribute( |
|
span=service_logging_span, |
|
key="call_type", |
|
value=payload.call_type, |
|
) |
|
self.safe_set_attribute( |
|
span=service_logging_span, |
|
key="service", |
|
value=payload.service.value, |
|
) |
|
|
|
if event_metadata: |
|
for key, value in event_metadata.items(): |
|
if value is None: |
|
value = "None" |
|
if isinstance(value, dict): |
|
try: |
|
value = str(value) |
|
except Exception: |
|
value = "litellm logging error - could_not_json_serialize" |
|
self.safe_set_attribute( |
|
span=service_logging_span, |
|
key=key, |
|
value=value, |
|
) |
|
service_logging_span.set_status(Status(StatusCode.OK)) |
|
service_logging_span.end(end_time=_end_time_ns) |
|
|
|
async def async_service_failure_hook( |
|
self, |
|
payload: ServiceLoggerPayload, |
|
error: Optional[str] = "", |
|
parent_otel_span: Optional[Span] = None, |
|
start_time: Optional[Union[datetime, float]] = None, |
|
end_time: Optional[Union[float, datetime]] = None, |
|
event_metadata: Optional[dict] = None, |
|
): |
|
|
|
from opentelemetry import trace |
|
from opentelemetry.trace import Status, StatusCode |
|
|
|
_start_time_ns = 0 |
|
_end_time_ns = 0 |
|
|
|
if isinstance(start_time, float): |
|
_start_time_ns = int(int(start_time) * 1e9) |
|
else: |
|
_start_time_ns = self._to_ns(start_time) |
|
|
|
if isinstance(end_time, float): |
|
_end_time_ns = int(int(end_time) * 1e9) |
|
else: |
|
_end_time_ns = self._to_ns(end_time) |
|
|
|
if parent_otel_span is not None: |
|
_span_name = payload.service |
|
service_logging_span = self.tracer.start_span( |
|
name=_span_name, |
|
context=trace.set_span_in_context(parent_otel_span), |
|
start_time=_start_time_ns, |
|
) |
|
self.safe_set_attribute( |
|
span=service_logging_span, |
|
key="call_type", |
|
value=payload.call_type, |
|
) |
|
self.safe_set_attribute( |
|
span=service_logging_span, |
|
key="service", |
|
value=payload.service.value, |
|
) |
|
if error: |
|
self.safe_set_attribute( |
|
span=service_logging_span, |
|
key="error", |
|
value=error, |
|
) |
|
if event_metadata: |
|
for key, value in event_metadata.items(): |
|
if isinstance(value, dict): |
|
try: |
|
value = str(value) |
|
except Exception: |
|
value = "litllm logging error - could_not_json_serialize" |
|
self.safe_set_attribute( |
|
span=service_logging_span, |
|
key=key, |
|
value=value, |
|
) |
|
|
|
service_logging_span.set_status(Status(StatusCode.ERROR)) |
|
service_logging_span.end(end_time=_end_time_ns) |
|
|
|
async def async_post_call_failure_hook( |
|
self, |
|
request_data: dict, |
|
original_exception: Exception, |
|
user_api_key_dict: UserAPIKeyAuth, |
|
): |
|
from opentelemetry import trace |
|
from opentelemetry.trace import Status, StatusCode |
|
|
|
parent_otel_span = user_api_key_dict.parent_otel_span |
|
if parent_otel_span is not None: |
|
parent_otel_span.set_status(Status(StatusCode.ERROR)) |
|
_span_name = "Failed Proxy Server Request" |
|
|
|
|
|
exception_logging_span = self.tracer.start_span( |
|
name=_span_name, |
|
context=trace.set_span_in_context(parent_otel_span), |
|
) |
|
self.safe_set_attribute( |
|
span=exception_logging_span, |
|
key="exception", |
|
value=str(original_exception), |
|
) |
|
exception_logging_span.set_status(Status(StatusCode.ERROR)) |
|
exception_logging_span.end(end_time=self._to_ns(datetime.now())) |
|
|
|
|
|
parent_otel_span.end(end_time=self._to_ns(datetime.now())) |
|
|
|
def _handle_sucess(self, kwargs, response_obj, start_time, end_time): |
|
from opentelemetry import trace |
|
from opentelemetry.trace import Status, StatusCode |
|
|
|
verbose_logger.debug( |
|
"OpenTelemetry Logger: Logging kwargs: %s, OTEL config settings=%s", |
|
kwargs, |
|
self.config, |
|
) |
|
_parent_context, parent_otel_span = self._get_span_context(kwargs) |
|
|
|
|
|
span = self.tracer.start_span( |
|
name=self._get_span_name(kwargs), |
|
start_time=self._to_ns(start_time), |
|
context=_parent_context, |
|
) |
|
span.set_status(Status(StatusCode.OK)) |
|
self.set_attributes(span, kwargs, response_obj) |
|
|
|
if litellm.turn_off_message_logging is True: |
|
pass |
|
elif self.message_logging is not True: |
|
pass |
|
else: |
|
|
|
raw_request_span = self.tracer.start_span( |
|
name=RAW_REQUEST_SPAN_NAME, |
|
start_time=self._to_ns(start_time), |
|
context=trace.set_span_in_context(span), |
|
) |
|
|
|
raw_request_span.set_status(Status(StatusCode.OK)) |
|
self.set_raw_request_attributes(raw_request_span, kwargs, response_obj) |
|
raw_request_span.end(end_time=self._to_ns(end_time)) |
|
|
|
span.end(end_time=self._to_ns(end_time)) |
|
|
|
if parent_otel_span is not None: |
|
parent_otel_span.end(end_time=self._to_ns(datetime.now())) |
|
|
|
def _handle_failure(self, kwargs, response_obj, start_time, end_time): |
|
from opentelemetry.trace import Status, StatusCode |
|
|
|
verbose_logger.debug( |
|
"OpenTelemetry Logger: Failure HandlerLogging kwargs: %s, OTEL config settings=%s", |
|
kwargs, |
|
self.config, |
|
) |
|
_parent_context, parent_otel_span = self._get_span_context(kwargs) |
|
|
|
|
|
span = self.tracer.start_span( |
|
name=self._get_span_name(kwargs), |
|
start_time=self._to_ns(start_time), |
|
context=_parent_context, |
|
) |
|
span.set_status(Status(StatusCode.ERROR)) |
|
self.set_attributes(span, kwargs, response_obj) |
|
span.end(end_time=self._to_ns(end_time)) |
|
|
|
if parent_otel_span is not None: |
|
parent_otel_span.end(end_time=self._to_ns(datetime.now())) |
|
|
|
def set_tools_attributes(self, span: Span, tools): |
|
import json |
|
|
|
from litellm.proxy._types import SpanAttributes |
|
|
|
if not tools: |
|
return |
|
|
|
try: |
|
for i, tool in enumerate(tools): |
|
function = tool.get("function") |
|
if not function: |
|
continue |
|
|
|
prefix = f"{SpanAttributes.LLM_REQUEST_FUNCTIONS}.{i}" |
|
self.safe_set_attribute( |
|
span=span, |
|
key=f"{prefix}.name", |
|
value=function.get("name"), |
|
) |
|
self.safe_set_attribute( |
|
span=span, |
|
key=f"{prefix}.description", |
|
value=function.get("description"), |
|
) |
|
self.safe_set_attribute( |
|
span=span, |
|
key=f"{prefix}.parameters", |
|
value=json.dumps(function.get("parameters")), |
|
) |
|
except Exception as e: |
|
verbose_logger.error( |
|
"OpenTelemetry: Error setting tools attributes: %s", str(e) |
|
) |
|
pass |
|
|
|
def cast_as_primitive_value_type(self, value) -> Union[str, bool, int, float]: |
|
""" |
|
Casts the value to a primitive OTEL type if it is not already a primitive type. |
|
|
|
OTEL supports - str, bool, int, float |
|
|
|
If it's not a primitive type, then it's converted to a string |
|
""" |
|
if value is None: |
|
return "" |
|
if isinstance(value, (str, bool, int, float)): |
|
return value |
|
try: |
|
return str(value) |
|
except Exception: |
|
return "" |
|
|
|
@staticmethod |
|
def _tool_calls_kv_pair( |
|
tool_calls: List[ChatCompletionMessageToolCall], |
|
) -> Dict[str, Any]: |
|
from litellm.proxy._types import SpanAttributes |
|
|
|
kv_pairs: Dict[str, Any] = {} |
|
for idx, tool_call in enumerate(tool_calls): |
|
_function = tool_call.get("function") |
|
if not _function: |
|
continue |
|
|
|
keys = Function.__annotations__.keys() |
|
for key in keys: |
|
_value = _function.get(key) |
|
if _value: |
|
kv_pairs[ |
|
f"{SpanAttributes.LLM_COMPLETIONS}.{idx}.function_call.{key}" |
|
] = _value |
|
|
|
return kv_pairs |
|
|
|
def set_attributes( |
|
self, span: Span, kwargs, response_obj: Optional[Any] |
|
): |
|
try: |
|
if self.callback_name == "arize": |
|
from litellm.integrations.arize_ai import ArizeLogger |
|
|
|
ArizeLogger.set_arize_ai_attributes(span, kwargs, response_obj) |
|
return |
|
elif self.callback_name == "langtrace": |
|
from litellm.integrations.langtrace import LangtraceAttributes |
|
|
|
LangtraceAttributes().set_langtrace_attributes( |
|
span, kwargs, response_obj |
|
) |
|
return |
|
from litellm.proxy._types import SpanAttributes |
|
|
|
optional_params = kwargs.get("optional_params", {}) |
|
litellm_params = kwargs.get("litellm_params", {}) or {} |
|
standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get( |
|
"standard_logging_object" |
|
) |
|
if standard_logging_payload is None: |
|
raise ValueError("standard_logging_object not found in kwargs") |
|
|
|
|
|
|
|
|
|
|
|
|
|
metadata = standard_logging_payload["metadata"] |
|
for key, value in metadata.items(): |
|
self.safe_set_attribute( |
|
span=span, key="metadata.{}".format(key), value=value |
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
if kwargs.get("model"): |
|
self.safe_set_attribute( |
|
span=span, |
|
key=SpanAttributes.LLM_REQUEST_MODEL, |
|
value=kwargs.get("model"), |
|
) |
|
|
|
|
|
self.safe_set_attribute( |
|
span=span, |
|
key=SpanAttributes.LLM_REQUEST_TYPE, |
|
value=standard_logging_payload["call_type"], |
|
) |
|
|
|
|
|
self.safe_set_attribute( |
|
span=span, |
|
key=SpanAttributes.LLM_SYSTEM, |
|
value=litellm_params.get("custom_llm_provider", "Unknown"), |
|
) |
|
|
|
|
|
if optional_params.get("max_tokens"): |
|
self.safe_set_attribute( |
|
span=span, |
|
key=SpanAttributes.LLM_REQUEST_MAX_TOKENS, |
|
value=optional_params.get("max_tokens"), |
|
) |
|
|
|
|
|
if optional_params.get("temperature"): |
|
self.safe_set_attribute( |
|
span=span, |
|
key=SpanAttributes.LLM_REQUEST_TEMPERATURE, |
|
value=optional_params.get("temperature"), |
|
) |
|
|
|
|
|
if optional_params.get("top_p"): |
|
self.safe_set_attribute( |
|
span=span, |
|
key=SpanAttributes.LLM_REQUEST_TOP_P, |
|
value=optional_params.get("top_p"), |
|
) |
|
|
|
self.safe_set_attribute( |
|
span=span, |
|
key=SpanAttributes.LLM_IS_STREAMING, |
|
value=str(optional_params.get("stream", False)), |
|
) |
|
|
|
if optional_params.get("user"): |
|
self.safe_set_attribute( |
|
span=span, |
|
key=SpanAttributes.LLM_USER, |
|
value=optional_params.get("user"), |
|
) |
|
|
|
|
|
if response_obj and response_obj.get("id"): |
|
self.safe_set_attribute( |
|
span=span, key="gen_ai.response.id", value=response_obj.get("id") |
|
) |
|
|
|
|
|
if response_obj and response_obj.get("model"): |
|
self.safe_set_attribute( |
|
span=span, |
|
key=SpanAttributes.LLM_RESPONSE_MODEL, |
|
value=response_obj.get("model"), |
|
) |
|
|
|
usage = response_obj and response_obj.get("usage") |
|
if usage: |
|
self.safe_set_attribute( |
|
span=span, |
|
key=SpanAttributes.LLM_USAGE_TOTAL_TOKENS, |
|
value=usage.get("total_tokens"), |
|
) |
|
|
|
|
|
self.safe_set_attribute( |
|
span=span, |
|
key=SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, |
|
value=usage.get("completion_tokens"), |
|
) |
|
|
|
|
|
self.safe_set_attribute( |
|
span=span, |
|
key=SpanAttributes.LLM_USAGE_PROMPT_TOKENS, |
|
value=usage.get("prompt_tokens"), |
|
) |
|
|
|
|
|
|
|
|
|
|
|
if litellm.turn_off_message_logging is True: |
|
return |
|
if self.message_logging is not True: |
|
return |
|
|
|
if optional_params.get("tools"): |
|
tools = optional_params["tools"] |
|
self.set_tools_attributes(span, tools) |
|
|
|
if kwargs.get("messages"): |
|
for idx, prompt in enumerate(kwargs.get("messages")): |
|
if prompt.get("role"): |
|
self.safe_set_attribute( |
|
span=span, |
|
key=f"{SpanAttributes.LLM_PROMPTS}.{idx}.role", |
|
value=prompt.get("role"), |
|
) |
|
|
|
if prompt.get("content"): |
|
if not isinstance(prompt.get("content"), str): |
|
prompt["content"] = str(prompt.get("content")) |
|
self.safe_set_attribute( |
|
span=span, |
|
key=f"{SpanAttributes.LLM_PROMPTS}.{idx}.content", |
|
value=prompt.get("content"), |
|
) |
|
|
|
|
|
|
|
if response_obj is not None: |
|
if response_obj.get("choices"): |
|
for idx, choice in enumerate(response_obj.get("choices")): |
|
if choice.get("finish_reason"): |
|
self.safe_set_attribute( |
|
span=span, |
|
key=f"{SpanAttributes.LLM_COMPLETIONS}.{idx}.finish_reason", |
|
value=choice.get("finish_reason"), |
|
) |
|
if choice.get("message"): |
|
if choice.get("message").get("role"): |
|
self.safe_set_attribute( |
|
span=span, |
|
key=f"{SpanAttributes.LLM_COMPLETIONS}.{idx}.role", |
|
value=choice.get("message").get("role"), |
|
) |
|
if choice.get("message").get("content"): |
|
if not isinstance( |
|
choice.get("message").get("content"), str |
|
): |
|
choice["message"]["content"] = str( |
|
choice.get("message").get("content") |
|
) |
|
self.safe_set_attribute( |
|
span=span, |
|
key=f"{SpanAttributes.LLM_COMPLETIONS}.{idx}.content", |
|
value=choice.get("message").get("content"), |
|
) |
|
|
|
message = choice.get("message") |
|
tool_calls = message.get("tool_calls") |
|
if tool_calls: |
|
kv_pairs = OpenTelemetry._tool_calls_kv_pair(tool_calls) |
|
for key, value in kv_pairs.items(): |
|
self.safe_set_attribute( |
|
span=span, |
|
key=key, |
|
value=value, |
|
) |
|
|
|
except Exception as e: |
|
verbose_logger.exception( |
|
"OpenTelemetry logging error in set_attributes %s", str(e) |
|
) |
|
|
|
def _cast_as_primitive_value_type(self, value) -> Union[str, bool, int, float]: |
|
""" |
|
Casts the value to a primitive OTEL type if it is not already a primitive type. |
|
|
|
OTEL supports - str, bool, int, float |
|
|
|
If it's not a primitive type, then it's converted to a string |
|
""" |
|
if value is None: |
|
return "" |
|
if isinstance(value, (str, bool, int, float)): |
|
return value |
|
try: |
|
return str(value) |
|
except Exception: |
|
return "" |
|
|
|
def safe_set_attribute(self, span: Span, key: str, value: Any): |
|
""" |
|
Safely sets an attribute on the span, ensuring the value is a primitive type. |
|
""" |
|
primitive_value = self._cast_as_primitive_value_type(value) |
|
span.set_attribute(key, primitive_value) |
|
|
|
def set_raw_request_attributes(self, span: Span, kwargs, response_obj): |
|
|
|
kwargs.get("optional_params", {}) |
|
litellm_params = kwargs.get("litellm_params", {}) or {} |
|
custom_llm_provider = litellm_params.get("custom_llm_provider", "Unknown") |
|
|
|
_raw_response = kwargs.get("original_response") |
|
_additional_args = kwargs.get("additional_args", {}) or {} |
|
complete_input_dict = _additional_args.get("complete_input_dict") |
|
|
|
|
|
|
|
|
|
|
|
if complete_input_dict and isinstance(complete_input_dict, dict): |
|
for param, val in complete_input_dict.items(): |
|
self.safe_set_attribute( |
|
span=span, key=f"llm.{custom_llm_provider}.{param}", value=val |
|
) |
|
|
|
|
|
|
|
|
|
if _raw_response and isinstance(_raw_response, str): |
|
|
|
import json |
|
|
|
try: |
|
_raw_response = json.loads(_raw_response) |
|
for param, val in _raw_response.items(): |
|
self.safe_set_attribute( |
|
span=span, |
|
key=f"llm.{custom_llm_provider}.{param}", |
|
value=val, |
|
) |
|
except json.JSONDecodeError: |
|
verbose_logger.debug( |
|
"litellm.integrations.opentelemetry.py::set_raw_request_attributes() - raw_response not json string - {}".format( |
|
_raw_response |
|
) |
|
) |
|
|
|
self.safe_set_attribute( |
|
span=span, |
|
key=f"llm.{custom_llm_provider}.stringified_raw_response", |
|
value=_raw_response, |
|
) |
|
|
|
def _to_ns(self, dt): |
|
return int(dt.timestamp() * 1e9) |
|
|
|
def _get_span_name(self, kwargs): |
|
return LITELLM_REQUEST_SPAN_NAME |
|
|
|
def get_traceparent_from_header(self, headers): |
|
if headers is None: |
|
return None |
|
_traceparent = headers.get("traceparent", None) |
|
if _traceparent is None: |
|
return None |
|
|
|
from opentelemetry.trace.propagation.tracecontext import ( |
|
TraceContextTextMapPropagator, |
|
) |
|
|
|
propagator = TraceContextTextMapPropagator() |
|
carrier = {"traceparent": _traceparent} |
|
_parent_context = propagator.extract(carrier=carrier) |
|
|
|
return _parent_context |
|
|
|
def _get_span_context(self, kwargs): |
|
from opentelemetry import trace |
|
from opentelemetry.trace.propagation.tracecontext import ( |
|
TraceContextTextMapPropagator, |
|
) |
|
|
|
litellm_params = kwargs.get("litellm_params", {}) or {} |
|
proxy_server_request = litellm_params.get("proxy_server_request", {}) or {} |
|
headers = proxy_server_request.get("headers", {}) or {} |
|
traceparent = headers.get("traceparent", None) |
|
_metadata = litellm_params.get("metadata", {}) or {} |
|
parent_otel_span = _metadata.get("litellm_parent_otel_span", None) |
|
|
|
""" |
|
Two way to use parents in opentelemetry |
|
- using the traceparent header |
|
- using the parent_otel_span in the [metadata][parent_otel_span] |
|
""" |
|
if parent_otel_span is not None: |
|
return trace.set_span_in_context(parent_otel_span), parent_otel_span |
|
|
|
if traceparent is None: |
|
return None, None |
|
else: |
|
carrier = {"traceparent": traceparent} |
|
return TraceContextTextMapPropagator().extract(carrier=carrier), None |
|
|
|
def _get_span_processor(self): |
|
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( |
|
OTLPSpanExporter as OTLPSpanExporterGRPC, |
|
) |
|
from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( |
|
OTLPSpanExporter as OTLPSpanExporterHTTP, |
|
) |
|
from opentelemetry.sdk.trace.export import ( |
|
BatchSpanProcessor, |
|
ConsoleSpanExporter, |
|
SimpleSpanProcessor, |
|
SpanExporter, |
|
) |
|
|
|
verbose_logger.debug( |
|
"OpenTelemetry Logger, initializing span processor \nself.OTEL_EXPORTER: %s\nself.OTEL_ENDPOINT: %s\nself.OTEL_HEADERS: %s", |
|
self.OTEL_EXPORTER, |
|
self.OTEL_ENDPOINT, |
|
self.OTEL_HEADERS, |
|
) |
|
_split_otel_headers = {} |
|
if self.OTEL_HEADERS is not None and isinstance(self.OTEL_HEADERS, str): |
|
_split_otel_headers = self.OTEL_HEADERS.split("=") |
|
_split_otel_headers = {_split_otel_headers[0]: _split_otel_headers[1]} |
|
|
|
if isinstance(self.OTEL_EXPORTER, SpanExporter): |
|
verbose_logger.debug( |
|
"OpenTelemetry: intiializing SpanExporter. Value of OTEL_EXPORTER: %s", |
|
self.OTEL_EXPORTER, |
|
) |
|
return SimpleSpanProcessor(self.OTEL_EXPORTER) |
|
|
|
if self.OTEL_EXPORTER == "console": |
|
verbose_logger.debug( |
|
"OpenTelemetry: intiializing console exporter. Value of OTEL_EXPORTER: %s", |
|
self.OTEL_EXPORTER, |
|
) |
|
return BatchSpanProcessor(ConsoleSpanExporter()) |
|
elif self.OTEL_EXPORTER == "otlp_http": |
|
verbose_logger.debug( |
|
"OpenTelemetry: intiializing http exporter. Value of OTEL_EXPORTER: %s", |
|
self.OTEL_EXPORTER, |
|
) |
|
return BatchSpanProcessor( |
|
OTLPSpanExporterHTTP( |
|
endpoint=self.OTEL_ENDPOINT, headers=_split_otel_headers |
|
), |
|
) |
|
elif self.OTEL_EXPORTER == "otlp_grpc": |
|
verbose_logger.debug( |
|
"OpenTelemetry: intiializing grpc exporter. Value of OTEL_EXPORTER: %s", |
|
self.OTEL_EXPORTER, |
|
) |
|
return BatchSpanProcessor( |
|
OTLPSpanExporterGRPC( |
|
endpoint=self.OTEL_ENDPOINT, headers=_split_otel_headers |
|
), |
|
) |
|
else: |
|
verbose_logger.debug( |
|
"OpenTelemetry: intiializing console exporter. Value of OTEL_EXPORTER: %s", |
|
self.OTEL_EXPORTER, |
|
) |
|
return BatchSpanProcessor(ConsoleSpanExporter()) |
|
|
|
async def async_management_endpoint_success_hook( |
|
self, |
|
logging_payload: ManagementEndpointLoggingPayload, |
|
parent_otel_span: Optional[Span] = None, |
|
): |
|
|
|
from opentelemetry import trace |
|
from opentelemetry.trace import Status, StatusCode |
|
|
|
_start_time_ns = 0 |
|
_end_time_ns = 0 |
|
|
|
start_time = logging_payload.start_time |
|
end_time = logging_payload.end_time |
|
|
|
if isinstance(start_time, float): |
|
_start_time_ns = int(start_time * 1e9) |
|
else: |
|
_start_time_ns = self._to_ns(start_time) |
|
|
|
if isinstance(end_time, float): |
|
_end_time_ns = int(end_time * 1e9) |
|
else: |
|
_end_time_ns = self._to_ns(end_time) |
|
|
|
if parent_otel_span is not None: |
|
_span_name = logging_payload.route |
|
management_endpoint_span = self.tracer.start_span( |
|
name=_span_name, |
|
context=trace.set_span_in_context(parent_otel_span), |
|
start_time=_start_time_ns, |
|
) |
|
|
|
_request_data = logging_payload.request_data |
|
if _request_data is not None: |
|
for key, value in _request_data.items(): |
|
self.safe_set_attribute( |
|
span=management_endpoint_span, |
|
key=f"request.{key}", |
|
value=value, |
|
) |
|
|
|
_response = logging_payload.response |
|
if _response is not None: |
|
for key, value in _response.items(): |
|
self.safe_set_attribute( |
|
span=management_endpoint_span, |
|
key=f"response.{key}", |
|
value=value, |
|
) |
|
|
|
management_endpoint_span.set_status(Status(StatusCode.OK)) |
|
management_endpoint_span.end(end_time=_end_time_ns) |
|
|
|
async def async_management_endpoint_failure_hook( |
|
self, |
|
logging_payload: ManagementEndpointLoggingPayload, |
|
parent_otel_span: Optional[Span] = None, |
|
): |
|
|
|
from opentelemetry import trace |
|
from opentelemetry.trace import Status, StatusCode |
|
|
|
_start_time_ns = 0 |
|
_end_time_ns = 0 |
|
|
|
start_time = logging_payload.start_time |
|
end_time = logging_payload.end_time |
|
|
|
if isinstance(start_time, float): |
|
_start_time_ns = int(int(start_time) * 1e9) |
|
else: |
|
_start_time_ns = self._to_ns(start_time) |
|
|
|
if isinstance(end_time, float): |
|
_end_time_ns = int(int(end_time) * 1e9) |
|
else: |
|
_end_time_ns = self._to_ns(end_time) |
|
|
|
if parent_otel_span is not None: |
|
_span_name = logging_payload.route |
|
management_endpoint_span = self.tracer.start_span( |
|
name=_span_name, |
|
context=trace.set_span_in_context(parent_otel_span), |
|
start_time=_start_time_ns, |
|
) |
|
|
|
_request_data = logging_payload.request_data |
|
if _request_data is not None: |
|
for key, value in _request_data.items(): |
|
self.safe_set_attribute( |
|
span=management_endpoint_span, |
|
key=f"request.{key}", |
|
value=value, |
|
) |
|
|
|
_exception = logging_payload.exception |
|
self.safe_set_attribute( |
|
span=management_endpoint_span, |
|
key="exception", |
|
value=str(_exception), |
|
) |
|
management_endpoint_span.set_status(Status(StatusCode.ERROR)) |
|
management_endpoint_span.end(end_time=_end_time_ns) |
|
|