import threading
from collections.abc import Mapping
from enum import Enum
from typing import Any
from weakref import WeakValueDictionary

from opentelemetry import metrics
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.metrics import CallbackOptions, Observation
from opentelemetry.metrics._internal.instrument import Counter, Histogram, UpDownCounter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import Resource

# a default OpenTelemetry meter name
langflow_meter_name = "langflow"

# Choosing an instrument type:
#
# * If the measurement values are non-additive, use an Asynchronous Gauge.
#   ObservableGauge reports the current absolute value when observed.
# * If the measurement values are additive:
#     - and the value is monotonically increasing, use an Asynchronous Counter;
#     - and the value is NOT monotonically increasing, use an Asynchronous
#       UpDownCounter. UpDownCounter reports changes/deltas to the last
#       observed value.
# * If the measurement values are additive and you want to observe the
#   distribution of the values, use a Histogram.


class MetricType(Enum):
    """Kinds of instrument that ``OpenTelemetry._create_metric`` can build."""

    COUNTER = "counter"
    OBSERVABLE_GAUGE = "observable_gauge"
    HISTOGRAM = "histogram"
    UP_DOWN_COUNTER = "up_down_counter"


# Readable aliases for the boolean values of a metric's ``labels`` mapping
# (label name -> "is this label required?").
mandatory_label = True
optional_label = False


class ObservableGaugeWrapper:
    """Wrapper class for ObservableGauge.

    Since OpenTelemetry does not provide a way to set the value of an
    ObservableGauge, instead it uses a callback function to get the value,
    we need to create a wrapper class.

    The wrapper stores the latest value per label set; the registered callback
    reports every stored value as an ``Observation``.
    """

    def __init__(self, name: str, description: str, unit: str):
        """Create the observable gauge on the ``langflow_meter_name`` meter.

        Args:
            name: Instrument name.
            description: Instrument description.
            unit: Instrument unit.
        """
        # Keyed by the sorted ``(label, value)`` pairs so that the same label
        # set always maps to the same entry regardless of insertion order.
        self._values: dict[tuple[tuple[str, str], ...], float] = {}
        self._meter = metrics.get_meter(langflow_meter_name)
        self._gauge = self._meter.create_observable_gauge(
            name=name, description=description, unit=unit, callbacks=[self._callback]
        )

    def _callback(self, _options: CallbackOptions):
        """Return one ``Observation`` per stored label set."""
        # Iterate over a snapshot: ``set_value`` may add a new key while the
        # observations are being built (e.g. if the callback runs on a
        # different thread), which would otherwise raise
        # "dictionary changed size during iteration".
        snapshot = self._values.copy()
        return [Observation(value, attributes=dict(labels)) for labels, value in snapshot.items()]

    def set_value(self, value: float, labels: Mapping[str, str]) -> None:
        """Store ``value`` as the current reading for the given label set."""
        self._values[tuple(sorted(labels.items()))] = value


class Metric:
    """Static description of a metric: name, type, unit and label schema."""

    def __init__(
        self,
        name: str,
        description: str,
        metric_type: MetricType,
        labels: dict[str, bool],
        unit: str = "",
    ):
        """Record the metric definition.

        Args:
            name: Metric name.
            description: Human-readable description.
            metric_type: Which instrument kind backs the metric.
            labels: Mapping of label name to whether the label is required.
            unit: Unit string; empty by default.
        """
        self.name = name
        self.description = description
        self.type = metric_type
        self.unit = unit
        self.labels = labels
        self.mandatory_labels = [label for label, required in labels.items() if required]
        # NOTE: stored for reference only; ``validate_labels`` does not reject
        # labels that are absent from this list.
        self.allowed_labels = list(labels.keys())

    def validate_labels(self, labels: Mapping[str, str]) -> None:
        """Validate if the labels provided are valid.

        Raises:
            ValueError: If ``labels`` is ``None``/empty or a mandatory label
                is missing.
        """
        if not labels:
            msg = "Labels must be provided for the metric"
            raise ValueError(msg)

        missing_labels = set(self.mandatory_labels) - set(labels.keys())
        if missing_labels:
            msg = f"Missing required labels: {missing_labels}"
            raise ValueError(msg)

    def __repr__(self) -> str:
        return f"Metric(name='{self.name}', description='{self.description}', type={self.type}, unit='{self.unit}')"


class ThreadSafeSingletonMetaUsingWeakref(type):
    """Thread-safe Singleton metaclass using WeakValueDictionary."""

    # Instances are held weakly, so an instance only lives as long as some
    # caller keeps a strong reference to it.
    _instances: WeakValueDictionary[Any, Any] = WeakValueDictionary()
    _lock: threading.Lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        """Return the live instance of ``cls``, creating it if there is none.

        The constructor arguments are only used when a new instance is built.
        """
        # Use ``get`` and keep the result in a local: with a weak-value
        # mapping, a separate ``in`` check followed by ``[...]`` can raise
        # KeyError if the instance is collected between the two operations.
        instance = cls._instances.get(cls)
        if instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have created
                # the instance while this one was waiting.
                instance = cls._instances.get(cls)
                if instance is None:
                    instance = super().__call__(*args, **kwargs)
                    cls._instances[cls] = instance
        return instance


class OpenTelemetry(metaclass=ThreadSafeSingletonMetaUsingWeakref):
    """A thread safe singleton class to manage metrics.

    Metric definitions and the created instruments are stored in class-level
    dictionaries, so they are shared by every instance of the class.
    """

    _metrics_registry: dict[str, Metric] = {}
    _metrics: dict[str, Counter | ObservableGaugeWrapper | Histogram | UpDownCounter] = {}
    _meter_provider: MeterProvider | None = None
    _initialized: bool = False  # Add initialization flag
    prometheus_enabled: bool = True

    def _add_metric(
        self, name: str, description: str, unit: str, metric_type: MetricType, labels: dict[str, bool]
    ) -> None:
        """Register a metric definition under ``name``.

        Raises:
            ValueError: If ``labels`` is ``None`` or empty.
        """
        # Validate before touching the registry so that a rejected
        # registration does not leave an entry behind.
        if not labels:
            msg = "Labels must be provided for the metric upon registration"
            raise ValueError(msg)
        self._metrics_registry[name] = Metric(
            name=name, description=description, metric_type=metric_type, unit=unit, labels=labels
        )

    def _register_metric(self) -> None:
        """Define any custom metrics here."""
        self._add_metric(
            name="file_uploads",
            description="The uploaded file size in bytes",
            unit="bytes",
            metric_type=MetricType.OBSERVABLE_GAUGE,
            labels={"flow_id": mandatory_label},
        )
        self._add_metric(
            name="num_files_uploaded",
            description="The number of file uploaded",
            unit="",
            metric_type=MetricType.COUNTER,
            labels={"flow_id": mandatory_label},
        )

    def __init__(self, *, prometheus_enabled: bool = True):
        """Set up the meter provider and create all registered instruments.

        Args:
            prometheus_enabled: When a new ``MeterProvider`` has to be
                created, attach a ``PrometheusMetricReader`` to it.
        """
        # Only initialize once
        self.prometheus_enabled = prometheus_enabled
        # NOTE(review): ``_initialized`` is class-level while the singleton
        # instance is only weakly referenced by the metaclass. If the instance
        # is collected and a new one is created, this early return leaves the
        # new instance without ``self.meter`` -- confirm callers keep a strong
        # reference to the instance.
        if OpenTelemetry._initialized:
            return

        if not self._metrics_registry:
            self._register_metric()

        if self._meter_provider is None:
            # Get existing meter provider if any
            existing_provider = metrics.get_meter_provider()

            # Check if FastAPI instrumentation is already set up
            # NOTE(review): this only tests that ``get_meter`` exists and
            # returns a truthy object -- verify that it really distinguishes
            # an instrumented provider from the default one.
            if hasattr(existing_provider, "get_meter") and existing_provider.get_meter("http.server"):
                self._meter_provider = existing_provider
            else:
                resource = Resource.create({"service.name": "langflow"})
                metric_readers = []
                if self.prometheus_enabled:
                    metric_readers.append(PrometheusMetricReader())

                self._meter_provider = MeterProvider(resource=resource, metric_readers=metric_readers)
                metrics.set_meter_provider(self._meter_provider)

        self.meter = self._meter_provider.get_meter(langflow_meter_name)

        for name, metric in self._metrics_registry.items():
            if name != metric.name:
                msg = f"Key '{name}' does not match metric name '{metric.name}'"
                raise ValueError(msg)
            if name not in self._metrics:
                self._metrics[metric.name] = self._create_metric(metric)

        OpenTelemetry._initialized = True

    def _create_metric(self, metric: Metric):
        """Return the instrument for ``metric``, creating it if necessary.

        Raises:
            ValueError: If ``metric.type`` is not a known ``MetricType``.
        """
        # Reuse an instrument that was already created under this name.
        if metric.name in self._metrics:
            return self._metrics[metric.name]

        if metric.type == MetricType.COUNTER:
            return self.meter.create_counter(
                name=metric.name,
                unit=metric.unit,
                description=metric.description,
            )
        if metric.type == MetricType.OBSERVABLE_GAUGE:
            return ObservableGaugeWrapper(
                name=metric.name,
                description=metric.description,
                unit=metric.unit,
            )
        if metric.type == MetricType.UP_DOWN_COUNTER:
            return self.meter.create_up_down_counter(
                name=metric.name,
                unit=metric.unit,
                description=metric.description,
            )
        if metric.type == MetricType.HISTOGRAM:
            return self.meter.create_histogram(
                name=metric.name,
                unit=metric.unit,
                description=metric.description,
            )
        msg = f"Unknown metric type: {metric.type}"
        raise ValueError(msg)

    def validate_labels(self, metric_name: str, labels: Mapping[str, str]) -> None:
        """Validate ``labels`` against the registered definition of the metric.

        Raises:
            ValueError: If the metric is not registered, or the labels are
                empty or miss a mandatory label.
        """
        reg = self._metrics_registry.get(metric_name)
        if reg is None:
            msg = f"Metric '{metric_name}' is not registered"
            raise ValueError(msg)
        reg.validate_labels(labels)

    def increment_counter(self, metric_name: str, labels: Mapping[str, str], value: float = 1.0) -> None:
        """Add ``value`` (default 1.0) to the named counter.

        Raises:
            ValueError: If label validation fails.
            TypeError: If the named metric is not a ``Counter``.
        """
        self.validate_labels(metric_name, labels)
        counter = self._metrics.get(metric_name)
        if not isinstance(counter, Counter):
            msg = f"Metric '{metric_name}' is not a counter"
            raise TypeError(msg)
        counter.add(value, labels)

    def up_down_counter(self, metric_name: str, value: float, labels: Mapping[str, str]) -> None:
        """Add ``value`` to the named up/down counter.

        Raises:
            ValueError: If label validation fails.
            TypeError: If the named metric is not an ``UpDownCounter``.
        """
        self.validate_labels(metric_name, labels)
        up_down_counter = self._metrics.get(metric_name)
        if not isinstance(up_down_counter, UpDownCounter):
            msg = f"Metric '{metric_name}' is not an up down counter"
            raise TypeError(msg)
        up_down_counter.add(value, labels)

    def update_gauge(self, metric_name: str, value: float, labels: Mapping[str, str]) -> None:
        """Set the current value of the named gauge for the given labels.

        Raises:
            ValueError: If label validation fails.
            TypeError: If the named metric is not an ``ObservableGaugeWrapper``.
        """
        self.validate_labels(metric_name, labels)
        gauge = self._metrics.get(metric_name)
        if not isinstance(gauge, ObservableGaugeWrapper):
            msg = f"Metric '{metric_name}' is not a gauge"
            raise TypeError(msg)
        gauge.set_value(value, labels)

    def observe_histogram(self, metric_name: str, value: float, labels: Mapping[str, str]) -> None:
        """Record ``value`` in the named histogram.

        Raises:
            ValueError: If label validation fails.
            TypeError: If the named metric is not a ``Histogram``.
        """
        self.validate_labels(metric_name, labels)
        histogram = self._metrics.get(metric_name)
        if not isinstance(histogram, Histogram):
            msg = f"Metric '{metric_name}' is not a histogram"
            raise TypeError(msg)
        histogram.record(value, labels)