Spaces:
Running
Running
import threading | |
from collections.abc import Mapping | |
from enum import Enum | |
from typing import Any | |
from weakref import WeakValueDictionary | |
from opentelemetry import metrics | |
from opentelemetry.exporter.prometheus import PrometheusMetricReader | |
from opentelemetry.metrics import CallbackOptions, Observation | |
from opentelemetry.metrics._internal.instrument import Counter, Histogram, UpDownCounter | |
from opentelemetry.sdk.metrics import MeterProvider | |
from opentelemetry.sdk.resources import Resource | |
# a default OpenTelemetry meter name | |
langflow_meter_name = "langflow" | |
""" | |
If the measurement values are non-additive, use an Asynchronous Gauge. | |
ObservableGauge reports the current absolute value when observed. | |
If the measurement values are additive: If the value is monotonically increasing - use an Asynchronous Counter. | |
If the value is NOT monotonically increasing - use an Asynchronous UpDownCounter. | |
UpDownCounter reports changes/deltas to the last observed value. | |
If the measurement values are additive and you want to observe the distribution of the values - use a Histogram. | |
""" | |
class MetricType(Enum): | |
COUNTER = "counter" | |
OBSERVABLE_GAUGE = "observable_gauge" | |
HISTOGRAM = "histogram" | |
UP_DOWN_COUNTER = "up_down_counter" | |
mandatory_label = True | |
optional_label = False | |
class ObservableGaugeWrapper: | |
"""Wrapper class for ObservableGauge. | |
Since OpenTelemetry does not provide a way to set the value of an ObservableGauge, | |
instead it uses a callback function to get the value, we need to create a wrapper class. | |
""" | |
def __init__(self, name: str, description: str, unit: str): | |
self._values: dict[tuple[tuple[str, str], ...], float] = {} | |
self._meter = metrics.get_meter(langflow_meter_name) | |
self._gauge = self._meter.create_observable_gauge( | |
name=name, description=description, unit=unit, callbacks=[self._callback] | |
) | |
def _callback(self, _options: CallbackOptions): | |
return [Observation(value, attributes=dict(labels)) for labels, value in self._values.items()] | |
# return [Observation(self._value)] | |
def set_value(self, value: float, labels: Mapping[str, str]) -> None: | |
self._values[tuple(sorted(labels.items()))] = value | |
class Metric: | |
def __init__( | |
self, | |
name: str, | |
description: str, | |
metric_type: MetricType, | |
labels: dict[str, bool], | |
unit: str = "", | |
): | |
self.name = name | |
self.description = description | |
self.type = metric_type | |
self.unit = unit | |
self.labels = labels | |
self.mandatory_labels = [label for label, required in labels.items() if required] | |
self.allowed_labels = list(labels.keys()) | |
def validate_labels(self, labels: Mapping[str, str]) -> None: | |
"""Validate if the labels provided are valid.""" | |
if labels is None or len(labels) == 0: | |
msg = "Labels must be provided for the metric" | |
raise ValueError(msg) | |
missing_labels = set(self.mandatory_labels) - set(labels.keys()) | |
if missing_labels: | |
msg = f"Missing required labels: {missing_labels}" | |
raise ValueError(msg) | |
def __repr__(self) -> str: | |
return f"Metric(name='{self.name}', description='{self.description}', type={self.type}, unit='{self.unit}')" | |
class ThreadSafeSingletonMetaUsingWeakref(type): | |
"""Thread-safe Singleton metaclass using WeakValueDictionary.""" | |
_instances: WeakValueDictionary[Any, Any] = WeakValueDictionary() | |
_lock: threading.Lock = threading.Lock() | |
def __call__(cls, *args, **kwargs): | |
if cls not in cls._instances: | |
with cls._lock: | |
if cls not in cls._instances: | |
instance = super().__call__(*args, **kwargs) | |
cls._instances[cls] = instance | |
return cls._instances[cls] | |
class OpenTelemetry(metaclass=ThreadSafeSingletonMetaUsingWeakref): | |
_metrics_registry: dict[str, Metric] = {} | |
_metrics: dict[str, Counter | ObservableGaugeWrapper | Histogram | UpDownCounter] = {} | |
_meter_provider: MeterProvider | None = None | |
_initialized: bool = False # Add initialization flag | |
prometheus_enabled: bool = True | |
def _add_metric( | |
self, name: str, description: str, unit: str, metric_type: MetricType, labels: dict[str, bool] | |
) -> None: | |
metric = Metric(name=name, description=description, metric_type=metric_type, unit=unit, labels=labels) | |
self._metrics_registry[name] = metric | |
if labels is None or len(labels) == 0: | |
msg = "Labels must be provided for the metric upon registration" | |
raise ValueError(msg) | |
def _register_metric(self) -> None: | |
"""Define any custom metrics here. | |
A thread safe singleton class to manage metrics. | |
""" | |
self._add_metric( | |
name="file_uploads", | |
description="The uploaded file size in bytes", | |
unit="bytes", | |
metric_type=MetricType.OBSERVABLE_GAUGE, | |
labels={"flow_id": mandatory_label}, | |
) | |
self._add_metric( | |
name="num_files_uploaded", | |
description="The number of file uploaded", | |
unit="", | |
metric_type=MetricType.COUNTER, | |
labels={"flow_id": mandatory_label}, | |
) | |
def __init__(self, *, prometheus_enabled: bool = True): | |
# Only initialize once | |
self.prometheus_enabled = prometheus_enabled | |
if OpenTelemetry._initialized: | |
return | |
if not self._metrics_registry: | |
self._register_metric() | |
if self._meter_provider is None: | |
# Get existing meter provider if any | |
existing_provider = metrics.get_meter_provider() | |
# Check if FastAPI instrumentation is already set up | |
if hasattr(existing_provider, "get_meter") and existing_provider.get_meter("http.server"): | |
self._meter_provider = existing_provider | |
else: | |
resource = Resource.create({"service.name": "langflow"}) | |
metric_readers = [] | |
if self.prometheus_enabled: | |
metric_readers.append(PrometheusMetricReader()) | |
self._meter_provider = MeterProvider(resource=resource, metric_readers=metric_readers) | |
metrics.set_meter_provider(self._meter_provider) | |
self.meter = self._meter_provider.get_meter(langflow_meter_name) | |
for name, metric in self._metrics_registry.items(): | |
if name != metric.name: | |
msg = f"Key '{name}' does not match metric name '{metric.name}'" | |
raise ValueError(msg) | |
if name not in self._metrics: | |
self._metrics[metric.name] = self._create_metric(metric) | |
OpenTelemetry._initialized = True | |
def _create_metric(self, metric): | |
# Remove _created_instruments check | |
if metric.name in self._metrics: | |
return self._metrics[metric.name] | |
if metric.type == MetricType.COUNTER: | |
return self.meter.create_counter( | |
name=metric.name, | |
unit=metric.unit, | |
description=metric.description, | |
) | |
if metric.type == MetricType.OBSERVABLE_GAUGE: | |
return ObservableGaugeWrapper( | |
name=metric.name, | |
description=metric.description, | |
unit=metric.unit, | |
) | |
if metric.type == MetricType.UP_DOWN_COUNTER: | |
return self.meter.create_up_down_counter( | |
name=metric.name, | |
unit=metric.unit, | |
description=metric.description, | |
) | |
if metric.type == MetricType.HISTOGRAM: | |
return self.meter.create_histogram( | |
name=metric.name, | |
unit=metric.unit, | |
description=metric.description, | |
) | |
msg = f"Unknown metric type: {metric.type}" | |
raise ValueError(msg) | |
def validate_labels(self, metric_name: str, labels: Mapping[str, str]) -> None: | |
reg = self._metrics_registry.get(metric_name) | |
if reg is None: | |
msg = f"Metric '{metric_name}' is not registered" | |
raise ValueError(msg) | |
reg.validate_labels(labels) | |
def increment_counter(self, metric_name: str, labels: Mapping[str, str], value: float = 1.0) -> None: | |
self.validate_labels(metric_name, labels) | |
counter = self._metrics.get(metric_name) | |
if isinstance(counter, Counter): | |
counter.add(value, labels) | |
else: | |
msg = f"Metric '{metric_name}' is not a counter" | |
raise TypeError(msg) | |
def up_down_counter(self, metric_name: str, value: float, labels: Mapping[str, str]) -> None: | |
self.validate_labels(metric_name, labels) | |
up_down_counter = self._metrics.get(metric_name) | |
if isinstance(up_down_counter, UpDownCounter): | |
up_down_counter.add(value, labels) | |
else: | |
msg = f"Metric '{metric_name}' is not an up down counter" | |
raise TypeError(msg) | |
def update_gauge(self, metric_name: str, value: float, labels: Mapping[str, str]) -> None: | |
self.validate_labels(metric_name, labels) | |
gauge = self._metrics.get(metric_name) | |
if isinstance(gauge, ObservableGaugeWrapper): | |
gauge.set_value(value, labels) | |
else: | |
msg = f"Metric '{metric_name}' is not a gauge" | |
raise TypeError(msg) | |
def observe_histogram(self, metric_name: str, value: float, labels: Mapping[str, str]) -> None: | |
self.validate_labels(metric_name, labels) | |
histogram = self._metrics.get(metric_name) | |
if isinstance(histogram, Histogram): | |
histogram.record(value, labels) | |
else: | |
msg = f"Metric '{metric_name}' is not a histogram" | |
raise TypeError(msg) | |