from __future__ import annotations import os import traceback import types from datetime import datetime, timezone from typing import TYPE_CHECKING, Any from loguru import logger from langflow.schema.data import Data from langflow.services.tracing.base import BaseTracer if TYPE_CHECKING: from collections.abc import Sequence from uuid import UUID from langchain.callbacks.base import BaseCallbackHandler from langflow.graph.vertex.base import Vertex from langflow.services.tracing.schema import Log class LangSmithTracer(BaseTracer): def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID): try: self._ready = self.setup_langsmith() if not self._ready: return from langsmith.run_trees import RunTree self.trace_name = trace_name self.trace_type = trace_type self.project_name = project_name self.trace_id = trace_id self._run_tree = RunTree( project_name=self.project_name, name=self.trace_name, run_type=self.trace_type, id=self.trace_id, ) self._run_tree.add_event({"name": "Start", "time": datetime.now(timezone.utc).isoformat()}) self._children: dict[str, RunTree] = {} except Exception: # noqa: BLE001 logger.debug("Error setting up LangSmith tracer") self._ready = False @property def ready(self): return self._ready def setup_langsmith(self) -> bool: if os.getenv("LANGCHAIN_API_KEY") is None: return False try: from langsmith import Client self._client = Client() except ImportError: logger.exception("Could not import langsmith. Please install it with `pip install langsmith`.") return False os.environ["LANGCHAIN_TRACING_V2"] = "true" return True def add_trace( self, trace_id: str, # noqa: ARG002 trace_name: str, trace_type: str, inputs: dict[str, Any], metadata: dict[str, Any] | None = None, vertex: Vertex | None = None, # noqa: ARG002 ) -> None: if not self._ready or not self._run_tree: return processed_inputs = {} if inputs: processed_inputs = self._convert_to_langchain_types(inputs) child = self._run_tree.create_child( name=trace_name, run_type=trace_type, # type: ignore[arg-type] inputs=processed_inputs, ) if metadata: child.add_metadata(self._convert_to_langchain_types(metadata)) self._children[trace_name] = child self._child_link: dict[str, str] = {} def _convert_to_langchain_types(self, io_dict: dict[str, Any]): converted = {} for key, value in io_dict.items(): converted[key] = self._convert_to_langchain_type(value) return converted def _convert_to_langchain_type(self, value): from langflow.schema.message import Message if isinstance(value, dict): value = {key: self._convert_to_langchain_type(val) for key, val in value.items()} elif isinstance(value, list): value = [self._convert_to_langchain_type(v) for v in value] elif isinstance(value, Message): if "prompt" in value: value = value.load_lc_prompt() elif value.sender: value = value.to_lc_message() else: value = value.to_lc_document() elif isinstance(value, Data): value = value.to_lc_document() elif isinstance(value, types.GeneratorType): # generator is not serializable, also we can't consume it value = str(value) return value def end_trace( self, trace_id: str, # noqa: ARG002 trace_name: str, outputs: dict[str, Any] | None = None, error: Exception | None = None, logs: Sequence[Log | dict] = (), ): if not self._ready or trace_name not in self._children: return child = self._children[trace_name] raw_outputs = {} processed_outputs = {} if outputs: raw_outputs = outputs processed_outputs = self._convert_to_langchain_types(outputs) if logs: logs_dicts = [log if isinstance(log, dict) else log.model_dump() for log in logs] child.add_metadata(self._convert_to_langchain_types({"logs": {log.get("name"): log for log in logs_dicts}})) child.add_metadata(self._convert_to_langchain_types({"outputs": raw_outputs})) child.end(outputs=processed_outputs, error=self._error_to_string(error)) if error: child.patch() else: child.post() self._child_link[trace_name] = child.get_url() def _error_to_string(self, error: Exception | None): error_message = None if error: string_stacktrace = traceback.format_exception(error) error_message = f"{error.__class__.__name__}: {error}\n\n{string_stacktrace}" return error_message def end( self, inputs: dict[str, Any], outputs: dict[str, Any], error: Exception | None = None, metadata: dict[str, Any] | None = None, ) -> None: if not self._ready or not self._run_tree: return self._run_tree.add_metadata({"inputs": inputs}) if metadata: self._run_tree.add_metadata(metadata) self._run_tree.end(outputs=outputs, error=self._error_to_string(error)) self._run_tree.post() self._run_link = self._run_tree.get_url() def get_langchain_callback(self) -> BaseCallbackHandler | None: return None