from __future__ import annotations import os from datetime import datetime, timezone from typing import TYPE_CHECKING, Any from loguru import logger from typing_extensions import override from langflow.services.tracing.base import BaseTracer if TYPE_CHECKING: from collections.abc import Sequence from uuid import UUID from langchain.callbacks.base import BaseCallbackHandler from langfuse.client import StatefulSpanClient from langflow.graph.vertex.base import Vertex from langflow.services.tracing.schema import Log class LangFuseTracer(BaseTracer): flow_id: str def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID): self.project_name = project_name self.trace_name = trace_name self.trace_type = trace_type self.trace_id = trace_id self.flow_id = trace_name.split(" - ")[-1] self.last_span: StatefulSpanClient | None = None self.spans: dict = {} config = self._get_config() self._ready: bool = self.setup_langfuse(config) if config else False @property def ready(self): return self._ready def setup_langfuse(self, config) -> bool: try: from langfuse import Langfuse from langfuse.callback.langchain import LangchainCallbackHandler self._client = Langfuse(**config) self.trace = self._client.trace(id=str(self.trace_id), name=self.flow_id) config |= { "trace_name": self.flow_id, "stateful_client": self.trace, "update_stateful_client": True, } self._callback = LangchainCallbackHandler(**config) except ImportError: logger.exception("Could not import langfuse. Please install it with `pip install langfuse`.") return False except Exception as e: # noqa: BLE001 logger.debug(f"Error setting up LangSmith tracer: {e}") return False return True @override def add_trace( self, trace_id: str, trace_name: str, trace_type: str, inputs: dict[str, Any], metadata: dict[str, Any] | None = None, vertex: Vertex | None = None, ) -> None: start_time = datetime.now(tz=timezone.utc) if not self._ready: return metadata_: dict = {} metadata_ |= {"trace_type": trace_type} if trace_type else {} metadata_ |= metadata or {} name = trace_name.removesuffix(f" ({trace_id})") content_span = { "name": name, "input": inputs, "metadata": metadata_, "start_time": start_time, } span = self.last_span.span(**content_span) if self.last_span else self.trace.span(**content_span) self.last_span = span self.spans[trace_id] = span @override def end_trace( self, trace_id: str, trace_name: str, outputs: dict[str, Any] | None = None, error: Exception | None = None, logs: Sequence[Log | dict] = (), ) -> None: end_time = datetime.now(tz=timezone.utc) if not self._ready: return span = self.spans.get(trace_id, None) if span: output: dict = {} output |= outputs or {} output |= {"error": str(error)} if error else {} output |= {"logs": list(logs)} if logs else {} content = {"output": output, "end_time": end_time} span.update(**content) @override def end( self, inputs: dict[str, Any], outputs: dict[str, Any], error: Exception | None = None, metadata: dict[str, Any] | None = None, ) -> None: if not self._ready: return self._client.flush() def get_langchain_callback(self) -> BaseCallbackHandler | None: if not self._ready: return None return None # self._callback def _get_config(self) -> dict: secret_key = os.getenv("LANGFUSE_SECRET_KEY", None) public_key = os.getenv("LANGFUSE_PUBLIC_KEY", None) host = os.getenv("LANGFUSE_HOST", None) if secret_key and public_key and host: return {"secret_key": secret_key, "public_key": public_key, "host": host} return {}