Spaces:
Running
Running
from __future__ import annotations | |
from abc import ABC, abstractmethod | |
from typing import TYPE_CHECKING, Any | |
if TYPE_CHECKING: | |
from collections.abc import Sequence | |
from uuid import UUID | |
from langchain.callbacks.base import BaseCallbackHandler | |
from langflow.graph.vertex.base import Vertex | |
from langflow.services.tracing.schema import Log | |
class BaseTracer(ABC):
    """Abstract interface that every tracer implementation must fulfil.

    Every method is declared with ``@abstractmethod``, so a subclass that
    forgets to override one of them fails at instantiation time with a
    ``TypeError`` instead of failing later with ``NotImplementedError`` when
    the missing method is first called.

    The base bodies still raise ``NotImplementedError``; a subclass must not
    delegate to them through ``super()``.
    """

    # Identifier of the trace handled by this tracer instance.
    trace_id: UUID

    @abstractmethod
    def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID):
        """Initialise the tracer.

        Args:
            trace_name: Name of the trace.
            trace_type: Type label of the trace.
            project_name: Name of the project the trace is recorded under.
            trace_id: Identifier of the trace.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    @abstractmethod
    def ready(self) -> bool:
        """Report whether the tracer is ready.

        NOTE(review): presumably "ready" means the backend is configured and
        usable -- confirm against the concrete tracers.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    @abstractmethod
    def add_trace(
        self,
        trace_id: str,
        trace_name: str,
        trace_type: str,
        inputs: dict[str, Any],
        metadata: dict[str, Any] | None = None,
        vertex: Vertex | None = None,
    ) -> None:
        """Add a trace entry.

        Args:
            trace_id: Identifier of the entry being added.
            trace_name: Name of the entry.
            trace_type: Type label of the entry.
            inputs: Input values associated with the entry.
            metadata: Optional extra metadata.
            vertex: Optional graph vertex associated with the entry.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    @abstractmethod
    def end_trace(
        self,
        trace_id: str,
        trace_name: str,
        outputs: dict[str, Any] | None = None,
        error: Exception | None = None,
        logs: Sequence[Log | dict] = (),
    ) -> None:
        """End a trace entry.

        Args:
            trace_id: Identifier of the entry being ended.
            trace_name: Name of the entry.
            outputs: Optional output values associated with the entry.
            error: Optional exception associated with the entry.
            logs: Log records associated with the entry; defaults to an
                empty (immutable) tuple.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    @abstractmethod
    def end(
        self,
        inputs: dict[str, Any],
        outputs: dict[str, Any],
        error: Exception | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """End the tracer's overall trace.

        Args:
            inputs: Input values of the trace.
            outputs: Output values of the trace.
            error: Optional exception associated with the trace.
            metadata: Optional extra metadata.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    @abstractmethod
    def get_langchain_callback(self) -> BaseCallbackHandler | None:
        """Return a LangChain callback handler, or ``None``.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError