Tai Truong
fix readme
d202ada
import time
from collections.abc import Callable
import socketio
from loguru import logger
from sqlmodel import select
from langflow.api.utils import format_elapsed_time
from langflow.api.v1.schemas import ResultDataResponse, VertexBuildResponse
from langflow.graph.graph.base import Graph
from langflow.graph.utils import log_vertex_build
from langflow.graph.vertex.base import Vertex
from langflow.services.database.models.flow.model import Flow
from langflow.services.deps import get_session
def set_socketio_server(socketio_server) -> None:
from langflow.services.deps import get_socket_service
socket_service = get_socket_service()
socket_service.init(socketio_server)
async def get_vertices(sio, sid, flow_id, chat_service) -> None:
try:
session = await anext(get_session())
stmt = select(Flow).where(Flow.id == flow_id)
flow: Flow = (await session.exec(stmt)).first()
if not flow or not flow.data:
await sio.emit("error", data="Invalid flow ID", to=sid)
return
graph = Graph.from_payload(flow.data)
chat_service.set_cache(flow_id, graph)
vertices = graph.layered_topological_sort(graph.vertices)
# Emit the vertices to the client
await sio.emit("vertices_order", data=vertices, to=sid)
except Exception as exc: # noqa: BLE001
logger.opt(exception=True).debug("Error getting vertices")
await sio.emit("error", data=str(exc), to=sid)
async def build_vertex(
sio: socketio.AsyncServer,
sid: str,
flow_id: str,
vertex_id: str,
get_cache: Callable,
set_cache: Callable,
) -> None:
try:
cache = await get_cache(flow_id)
graph = cache.get("result")
if not isinstance(graph, Graph):
await sio.emit("error", data="Invalid graph", to=sid)
return
vertex = graph.get_vertex(vertex_id)
if not vertex:
await sio.emit("error", data="Invalid vertex", to=sid)
return
start_time = time.perf_counter()
try:
if isinstance(vertex, Vertex) or not vertex.built:
await vertex.build(user_id=None, session_id=sid)
params = vertex.built_object_repr()
valid = True
result_dict = vertex.get_built_result()
# We need to set the artifacts to pass information
# to the frontend
vertex.set_artifacts()
artifacts = vertex.artifacts
timedelta = time.perf_counter() - start_time
duration = format_elapsed_time(timedelta)
result_dict = ResultDataResponse(
results=result_dict,
artifacts=artifacts,
duration=duration,
timedelta=timedelta,
)
except Exception as exc: # noqa: BLE001
logger.opt(exception=True).debug("Error building vertex")
params = str(exc)
valid = False
result_dict = ResultDataResponse(results={})
artifacts = {}
await set_cache(flow_id, graph)
await log_vertex_build(
flow_id=flow_id,
vertex_id=vertex_id,
valid=valid,
params=params,
data=result_dict,
artifacts=artifacts,
)
# Emit the vertex build response
response = VertexBuildResponse(valid=valid, params=params, id=vertex.id, data=result_dict)
await sio.emit("vertex_build", data=response.model_dump(), to=sid)
except Exception as exc: # noqa: BLE001
logger.opt(exception=True).debug("Error building vertex")
await sio.emit("error", data=str(exc), to=sid)