Tai Truong
fix readme
d202ada
from __future__ import annotations
from typing import TYPE_CHECKING, Any, cast
import nanoid
from loguru import logger
from typing_extensions import override
from langflow.schema.data import Data
from langflow.services.tracing.base import BaseTracer
if TYPE_CHECKING:
from collections.abc import Sequence
from uuid import UUID
from langchain.callbacks.base import BaseCallbackHandler
from langwatch.tracer import ContextSpan
from langflow.graph.vertex.base import Vertex
from langflow.services.tracing.schema import Log
class LangWatchTracer(BaseTracer):
    """Tracer that reports a flow run to LangWatch as one trace with one span per component.

    The ``langwatch`` package is imported lazily by :meth:`setup_langwatch`. When the
    import fails, or when anything raises during construction, the tracer marks itself
    as not ready and every tracing method returns without doing anything.
    """

    flow_id: str

    def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID):
        """Create the LangWatch trace and configure its root span.

        Args:
            trace_name: Name of the trace. The text after the last ``" - "`` is kept as
                ``flow_id``; the text before it becomes the root span name.
            trace_type: Stored on the instance as-is.
            project_name: Stored on the instance; also used as the root span name when
                ``trace_name`` carries no usable name part.
            trace_id: Identifier passed (as a string) to ``langwatch.trace``.
        """
        self.trace_name = trace_name
        self.trace_type = trace_type
        self.project_name = project_name
        self.trace_id = trace_id
        # rpartition yields ("", "", trace_name) when the separator is missing, so
        # flow_id is then the whole trace name and the name part is empty.
        name_without_id, _, self.flow_id = trace_name.rpartition(" - ")

        # Defined before setup so that later attribute reads never fail, even when
        # langwatch is unavailable or setup raises part-way through.
        self.trace: Any = None
        self.spans: dict[str, ContextSpan] = {}
        self._ready = False

        try:
            self._ready = self.setup_langwatch()
            if not self._ready:
                return

            self.trace = self._client.trace(trace_id=str(self.trace_id))

            # "None" is what a missing flow name looks like once formatted into the
            # trace name; an empty string means there was no " - " separator at all.
            if name_without_id in {"", "None"}:
                name_without_id = project_name

            self.trace.root_span.update(
                # nanoid to make the span_id globally unique, which is required for LangWatch for now
                span_id=f"{self.flow_id}-{nanoid.generate(size=6)}",
                name=name_without_id,
                type="workflow",
            )
        except Exception:  # noqa: BLE001
            # Tracing is best-effort: never let a setup failure propagate, but keep
            # the traceback attached to the debug record so it can be diagnosed.
            logger.opt(exception=True).debug("Error setting up LangWatch tracer")
            self._ready = False

    @property
    def ready(self) -> bool:
        """Whether setup succeeded; when False all tracing methods are no-ops."""
        return self._ready

    def setup_langwatch(self) -> bool:
        """Import ``langwatch`` and keep the module as ``self._client``.

        Returns:
            True when the import succeeded, False (after logging) when it did not.
        """
        try:
            import langwatch

            self._client = langwatch
        except ImportError:
            logger.exception("Could not import langwatch. Please install it with `pip install langwatch`.")
            return False
        return True

    @override
    def add_trace(
        self,
        trace_id: str,
        trace_name: str,
        trace_type: str,
        inputs: dict[str, Any],
        metadata: dict[str, Any] | None = None,
        vertex: Vertex | None = None,
    ) -> None:
        """Open a component span and make it the trace's current span.

        Args:
            trace_id: Key under which the new span is stored in ``self.spans``; also
                the prefix of the generated span id.
            trace_name: Span name; a trailing ``" (...)"`` part is stripped.
            trace_type: Unused here.
            inputs: Component inputs, converted and recorded as the span input. A
                ``session_id`` entry that differs from the flow id is stored as the
                trace's ``thread_id`` metadata.
            metadata: Unused here.
            vertex: When given, its incoming edges are used to pick the parent span.
        """
        if not self._ready:
            return

        # If user is not using session_id, then it becomes the same as flow_id, but
        # we don't want to have an infinite thread with all the flow messages
        if "session_id" in inputs and inputs["session_id"] != self.flow_id:
            self.trace.update(metadata=(self.trace.metadata or {}) | {"thread_id": inputs["session_id"]})

        # Drop the trailing " (<id>)" part; keep the full name when there is none so
        # the span is never created with an empty name.
        name_without_id = trace_name.rpartition(" (")[0] or trace_name

        # Spans already opened for vertices that feed into this one, in the order
        # they were added to self.spans; the most recent one becomes the parent.
        incoming_edges = vertex.incoming_edges if vertex else ()
        previous_nodes = [
            span for key, span in self.spans.items() if any(key == edge.source_id for edge in incoming_edges)
        ]
        parent = previous_nodes[-1] if previous_nodes else self.trace.root_span

        span = self.trace.span(
            # Add a nanoid to make the span_id globally unique, which is required for LangWatch for now
            span_id=f"{trace_id}-{nanoid.generate(size=6)}",
            name=name_without_id,
            type="component",
            parent=parent,
            input=self._convert_to_langwatch_types(inputs),
        )
        self.trace.set_current_span(span)
        self.spans[trace_id] = span

    @override
    def end_trace(
        self,
        trace_id: str,
        trace_name: str,
        outputs: dict[str, Any] | None = None,
        error: Exception | None = None,
        logs: Sequence[Log | dict] = (),
    ) -> None:
        """End the span previously stored under ``trace_id``, if there is one.

        Args:
            trace_id: Key of the span in ``self.spans``.
            trace_name: Unused here.
            outputs: Component outputs, converted and recorded as the span output.
            error: Error to record on the span, if any.
            logs: Unused here.
        """
        if not self._ready:
            return

        span = self.spans.get(trace_id)
        if span:
            span.end(output=self._convert_to_langwatch_types(outputs), error=error)

    def end(
        self,
        inputs: dict[str, Any],
        outputs: dict[str, Any],
        error: Exception | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """End the root span and hand the collected spans over for sending.

        Args:
            inputs: Flow inputs, converted and recorded on the root span.
            outputs: Flow outputs, converted and recorded on the root span.
            error: Error to record on the root span, if any.
            metadata: When it contains ``flow_name``, a ``"Flow: <name>"`` label is
                written to the trace metadata.
        """
        if not self._ready:
            return

        self.trace.root_span.end(
            input=self._convert_to_langwatch_types(inputs),
            output=self._convert_to_langwatch_types(outputs),
            error=error,
        )

        if metadata and "flow_name" in metadata:
            self.trace.update(metadata=(self.trace.metadata or {}) | {"labels": [f"Flow: {metadata['flow_name']}"]})

        # Only send when an API key is set on the trace or on the langwatch module.
        if self.trace.api_key or self._client.api_key:
            self.trace.deferred_send_spans()

    def _convert_to_langwatch_types(self, io_dict: dict[str, Any] | None):
        """Convert every value of ``io_dict`` and pass the result to ``autoconvert_typed_values``.

        Returns None when ``io_dict`` is None.
        """
        if io_dict is None:
            return None

        from langwatch.utils import autoconvert_typed_values

        converted = {key: self._convert_to_langwatch_type(value) for key, value in io_dict.items()}
        return autoconvert_typed_values(converted)

    def _convert_to_langwatch_type(self, value):
        """Convert a single value, recursing into dicts and lists.

        ``Message`` values become chat messages (via their LangChain prompt or
        message form) or a LangChain document; ``Data`` values become a LangChain
        document. Anything else is returned unchanged.
        """
        from langwatch.langchain import langchain_message_to_chat_message, langchain_messages_to_chat_messages

        from langflow.schema.message import BaseMessage, Message

        if isinstance(value, dict):
            return {key: self._convert_to_langwatch_type(val) for key, val in value.items()}
        if isinstance(value, list):
            return [self._convert_to_langwatch_type(item) for item in value]
        # Message is tested before Data so that messages take the message-specific path.
        if isinstance(value, Message):
            if "prompt" in value:
                prompt = value.load_lc_prompt()
                # A prompt with no variables left and only concrete messages can be
                # sent as a chat conversation; otherwise the prompt object is kept.
                if len(prompt.input_variables) == 0 and all(isinstance(m, BaseMessage) for m in prompt.messages):
                    return langchain_messages_to_chat_messages([cast("list[BaseMessage]", prompt.messages)])
                return cast("dict", prompt)
            if value.sender:
                return langchain_message_to_chat_message(value.to_lc_message())
            return cast("dict", value.to_lc_document())
        if isinstance(value, Data):
            return cast("dict", value.to_lc_document())
        return value

    def get_langchain_callback(self) -> BaseCallbackHandler | None:
        """Return the trace's LangChain callback handler, or None when tracing is unavailable."""
        # self.trace stays None when langwatch could not be set up.
        if not self._ready or self.trace is None:
            return None

        return self.trace.get_langchain_callback()