Tai Truong
fix readme
d202ada
import threading
from collections.abc import Mapping
from enum import Enum
from typing import Any
from weakref import WeakValueDictionary
from opentelemetry import metrics
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.metrics import CallbackOptions, Observation
from opentelemetry.metrics._internal.instrument import Counter, Histogram, UpDownCounter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import Resource
# Default OpenTelemetry meter name; this module passes it to get_meter()
# whenever it obtains a meter.
langflow_meter_name = "langflow"
# Guidance on choosing an instrument type. The string below is a bare
# expression statement, not a docstring, and has no runtime effect.
"""
If the measurement values are non-additive, use an Asynchronous Gauge.
ObservableGauge reports the current absolute value when observed.
If the measurement values are additive: If the value is monotonically increasing - use an Asynchronous Counter.
If the value is NOT monotonically increasing - use an Asynchronous UpDownCounter.
UpDownCounter reports changes/deltas to the last observed value.
If the measurement values are additive and you want to observe the distribution of the values - use a Histogram.
"""
class MetricType(Enum):
    """Kinds of metric instrument a registered metric can be created as."""

    # Monotonic, additive values.
    COUNTER = "counter"
    # Non-additive values, reported as the current absolute value.
    OBSERVABLE_GAUGE = "observable_gauge"
    # Additive values whose distribution is of interest.
    HISTOGRAM = "histogram"
    # Additive values that may go up as well as down.
    UP_DOWN_COUNTER = "up_down_counter"
# Readable values for the ``labels`` mapping of a metric definition:
# a label mapped to True is required, a label mapped to False is optional.
mandatory_label = True
optional_label = False
class ObservableGaugeWrapper:
    """Settable facade over an OpenTelemetry observable gauge.

    OpenTelemetry reads an observable gauge through a callback instead of
    letting callers set its value. This wrapper stores the latest value per
    label set and reports those values from the callback it registers with
    the gauge.
    """

    def __init__(self, name: str, description: str, unit: str):
        """Create the observable gauge on the ``langflow_meter_name`` meter.

        Args:
            name: Instrument name.
            description: Instrument description.
            unit: Instrument unit.
        """
        # Latest value per label set. The key is the sorted tuple of
        # (label, value) pairs, which is hashable and independent of the
        # order in which the caller listed the labels.
        self._values: dict[tuple[tuple[str, str], ...], float] = {}
        self._meter = metrics.get_meter(langflow_meter_name)
        self._gauge = self._meter.create_observable_gauge(
            name=name, description=description, unit=unit, callbacks=[self._callback]
        )

    def _callback(self, _options: CallbackOptions) -> list[Observation]:
        """Return one ``Observation`` per stored label set.

        Args:
            _options: Callback options supplied by OpenTelemetry; unused.
        """
        # Iterate over a snapshot: if set_value() adds a new label set from
        # another thread while this runs, iterating the live dict would raise
        # "dictionary changed size during iteration".
        snapshot = tuple(self._values.items())
        return [Observation(value, attributes=dict(labels)) for labels, value in snapshot]

    def set_value(self, value: float, labels: Mapping[str, str]) -> None:
        """Store ``value`` as the current reading for the given label set.

        Args:
            value: The new gauge value.
            labels: Label names mapped to label values for this reading.
        """
        self._values[tuple(sorted(labels.items()))] = value
class Metric:
    """Definition of a single metric: identity, instrument type and labels."""

    def __init__(
        self,
        name: str,
        description: str,
        metric_type: MetricType,
        labels: dict[str, bool],
        unit: str = "",
    ):
        """Store the definition and derive the label lists.

        Args:
            name: Metric name.
            description: Human-readable description.
            metric_type: Instrument kind, stored as ``self.type``.
            labels: Label names mapped to whether the label is required.
            unit: Unit string; empty by default.
        """
        self.name = name
        self.description = description
        self.type = metric_type
        self.unit = unit
        self.labels = labels
        # Labels flagged truthy must be present on every recorded value.
        self.mandatory_labels = [label_name for label_name, is_required in labels.items() if is_required]
        # Every declared label name, required or not, in declaration order.
        self.allowed_labels = [*labels.keys()]

    def validate_labels(self, labels: Mapping[str, str]) -> None:
        """Check that ``labels`` is non-empty and contains every required label.

        Raises:
            ValueError: If ``labels`` is ``None`` or empty, or if a required
                label is missing.
        """
        if not labels:
            msg = "Labels must be provided for the metric"
            raise ValueError(msg)

        absent = set(self.mandatory_labels).difference(labels.keys())
        if absent:
            msg = f"Missing required labels: {absent}"
            raise ValueError(msg)

    def __repr__(self) -> str:
        """Return a debugging representation (the labels are not included)."""
        fields = f"name='{self.name}', description='{self.description}', type={self.type}, unit='{self.unit}'"
        return f"Metric({fields})"
class ThreadSafeSingletonMetaUsingWeakref(type):
    """Thread-safe Singleton metaclass using WeakValueDictionary.

    Each class using this metaclass has at most one live instance. Instances
    are held through weak references only, so an instance exists just as long
    as something else keeps a reference to it; after it is collected, the next
    call builds a fresh one.
    """

    # Both attributes live on the metaclass and are therefore shared by every
    # class that uses it; instances are keyed by their class.
    _instances: WeakValueDictionary[Any, Any] = WeakValueDictionary()
    # Not re-entrant: constructing one singleton from inside another
    # singleton's __init__ would block on this lock.
    _lock: threading.Lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        """Return the live instance of ``cls``, creating it if there is none.

        ``args`` and ``kwargs`` are used only when a new instance is built;
        they are ignored when an instance already exists.
        """
        # Take a strong reference with .get() instead of testing `in` and
        # subscripting afterwards: a weakly held instance can be collected
        # between those two steps, which would surface as a KeyError.
        instance = cls._instances.get(cls)
        if instance is None:
            with cls._lock:
                # Re-check under the lock; another thread may have created
                # the instance while this one was waiting.
                instance = cls._instances.get(cls)
                if instance is None:
                    instance = super().__call__(*args, **kwargs)
                    cls._instances[cls] = instance
        return instance
class OpenTelemetry(metaclass=ThreadSafeSingletonMetaUsingWeakref):
    """Singleton entry point for registering and recording langflow metrics.

    Metric definitions and the instruments created from them are kept in
    class-level dictionaries. The recording methods validate the supplied
    labels against the metric definition before touching the instrument.
    """

    # Metric definitions keyed by metric name (class-level, shared).
    _metrics_registry: dict[str, Metric] = {}
    # Created instruments keyed by metric name (class-level, shared).
    _metrics: dict[str, Counter | ObservableGaugeWrapper | Histogram | UpDownCounter] = {}
    _meter_provider: MeterProvider | None = None
    # Set on the class once __init__ has completed its setup.
    _initialized: bool = False
    prometheus_enabled: bool = True

    def _add_metric(
        self, name: str, description: str, unit: str, metric_type: MetricType, labels: dict[str, bool]
    ) -> None:
        """Add a metric definition to the registry.

        Args:
            name: Metric name; also used as the registry key.
            description: Human-readable description.
            unit: Unit string.
            metric_type: Instrument kind to create for the metric.
            labels: Label names mapped to whether the label is required.

        Raises:
            ValueError: If ``labels`` is ``None`` or empty.
        """
        # Validate before touching the registry so that a rejected definition
        # is never left registered.
        if not labels:
            msg = "Labels must be provided for the metric upon registration"
            raise ValueError(msg)
        metric = Metric(name=name, description=description, metric_type=metric_type, unit=unit, labels=labels)
        self._metrics_registry[name] = metric

    def _register_metric(self) -> None:
        """Register the built-in metrics; define any custom metrics here."""
        self._add_metric(
            name="file_uploads",
            description="The uploaded file size in bytes",
            unit="bytes",
            metric_type=MetricType.OBSERVABLE_GAUGE,
            labels={"flow_id": mandatory_label},
        )
        self._add_metric(
            name="num_files_uploaded",
            description="The number of file uploaded",
            unit="",
            metric_type=MetricType.COUNTER,
            labels={"flow_id": mandatory_label},
        )

    def __init__(self, *, prometheus_enabled: bool = True):
        """Set up the meter provider and create all registered instruments.

        The setup runs only until ``OpenTelemetry._initialized`` is set;
        later constructions only store ``prometheus_enabled``.

        Args:
            prometheus_enabled: Whether a ``PrometheusMetricReader`` is
                attached when this class creates its own meter provider.

        Raises:
            ValueError: If a registry key does not match its metric's name,
                or a registered metric has an unknown type.
        """
        self.prometheus_enabled = prometheus_enabled
        # Only initialize once.
        # NOTE(review): ``meter`` and ``_meter_provider`` are assigned on the
        # instance below, while ``_initialized`` is class-level. An instance
        # re-created after the previous one was garbage collected returns here
        # without them - confirm whether that matters to callers.
        if OpenTelemetry._initialized:
            return

        if not self._metrics_registry:
            self._register_metric()

        if self._meter_provider is None:
            # Get existing meter provider if any.
            existing_provider = metrics.get_meter_provider()
            # Reuse it when it can hand out a meter for "http.server"
            # (taken as a sign that FastAPI instrumentation is already set up).
            if hasattr(existing_provider, "get_meter") and existing_provider.get_meter("http.server"):
                self._meter_provider = existing_provider
            else:
                resource = Resource.create({"service.name": "langflow"})
                metric_readers = []
                if self.prometheus_enabled:
                    metric_readers.append(PrometheusMetricReader())
                self._meter_provider = MeterProvider(resource=resource, metric_readers=metric_readers)
                metrics.set_meter_provider(self._meter_provider)

        self.meter = self._meter_provider.get_meter(langflow_meter_name)

        for name, metric in self._metrics_registry.items():
            if name != metric.name:
                msg = f"Key '{name}' does not match metric name '{metric.name}'"
                raise ValueError(msg)
            if name not in self._metrics:
                self._metrics[metric.name] = self._create_metric(metric)

        OpenTelemetry._initialized = True

    def _create_metric(self, metric: Metric) -> Counter | ObservableGaugeWrapper | Histogram | UpDownCounter:
        """Return the instrument for ``metric``, creating it when needed.

        Args:
            metric: The metric definition to create an instrument for.

        Raises:
            ValueError: If ``metric.type`` is not a known ``MetricType``.
        """
        # Reuse an instrument that was already created for this name.
        if metric.name in self._metrics:
            return self._metrics[metric.name]

        if metric.type == MetricType.COUNTER:
            return self.meter.create_counter(
                name=metric.name,
                unit=metric.unit,
                description=metric.description,
            )
        if metric.type == MetricType.OBSERVABLE_GAUGE:
            return ObservableGaugeWrapper(
                name=metric.name,
                description=metric.description,
                unit=metric.unit,
            )
        if metric.type == MetricType.UP_DOWN_COUNTER:
            return self.meter.create_up_down_counter(
                name=metric.name,
                unit=metric.unit,
                description=metric.description,
            )
        if metric.type == MetricType.HISTOGRAM:
            return self.meter.create_histogram(
                name=metric.name,
                unit=metric.unit,
                description=metric.description,
            )
        msg = f"Unknown metric type: {metric.type}"
        raise ValueError(msg)

    def validate_labels(self, metric_name: str, labels: Mapping[str, str]) -> None:
        """Validate ``labels`` against the definition of ``metric_name``.

        Raises:
            ValueError: If the metric is not registered, or the labels are
                rejected by the metric definition.
        """
        reg = self._metrics_registry.get(metric_name)
        if reg is None:
            msg = f"Metric '{metric_name}' is not registered"
            raise ValueError(msg)
        reg.validate_labels(labels)

    def increment_counter(self, metric_name: str, labels: Mapping[str, str], value: float = 1.0) -> None:
        """Add ``value`` (default 1.0) to the counter named ``metric_name``.

        Raises:
            ValueError: If the metric is unknown or the labels are invalid.
            TypeError: If the metric's instrument is not a counter.
        """
        self.validate_labels(metric_name, labels)
        counter = self._metrics.get(metric_name)
        if not isinstance(counter, Counter):
            msg = f"Metric '{metric_name}' is not a counter"
            raise TypeError(msg)
        counter.add(value, labels)

    def up_down_counter(self, metric_name: str, value: float, labels: Mapping[str, str]) -> None:
        """Add ``value`` to the up/down counter named ``metric_name``.

        Raises:
            ValueError: If the metric is unknown or the labels are invalid.
            TypeError: If the metric's instrument is not an up/down counter.
        """
        self.validate_labels(metric_name, labels)
        up_down_counter = self._metrics.get(metric_name)
        if not isinstance(up_down_counter, UpDownCounter):
            msg = f"Metric '{metric_name}' is not an up down counter"
            raise TypeError(msg)
        up_down_counter.add(value, labels)

    def update_gauge(self, metric_name: str, value: float, labels: Mapping[str, str]) -> None:
        """Set the current value of the gauge named ``metric_name``.

        Raises:
            ValueError: If the metric is unknown or the labels are invalid.
            TypeError: If the metric's instrument is not a gauge wrapper.
        """
        self.validate_labels(metric_name, labels)
        gauge = self._metrics.get(metric_name)
        if not isinstance(gauge, ObservableGaugeWrapper):
            msg = f"Metric '{metric_name}' is not a gauge"
            raise TypeError(msg)
        gauge.set_value(value, labels)

    def observe_histogram(self, metric_name: str, value: float, labels: Mapping[str, str]) -> None:
        """Record ``value`` in the histogram named ``metric_name``.

        Raises:
            ValueError: If the metric is unknown or the labels are invalid.
            TypeError: If the metric's instrument is not a histogram.
        """
        self.validate_labels(metric_name, labels)
        histogram = self._metrics.get(metric_name)
        if not isinstance(histogram, Histogram):
            msg = f"Metric '{metric_name}' is not a histogram"
            raise TypeError(msg)
        histogram.record(value, labels)