import asyncio
import json
from http import HTTPStatus
from typing import Annotated, Any

from fastapi import APIRouter, HTTPException, Query, Request
from fastapi.responses import JSONResponse, StreamingResponse

from langflow.logging.logger import log_buffer

log_router = APIRouter(tags=["Log"])

NUMBER_OF_NOT_SENT_BEFORE_KEEPALIVE = 5

async def event_generator(request: Request):
    """Yield new log entries as SSE data lines, emitting periodic keepalives when idle."""
    global log_buffer  # noqa: PLW0602

    last_read_item = None
    current_not_sent = 0
    while not await request.is_disconnected():
        to_write: list[Any] = []
        with log_buffer.get_write_lock():
            if last_read_item is None:
                # First iteration: start from the most recent entry in the buffer.
                last_read_item = log_buffer.buffer[len(log_buffer.buffer) - 1]
            else:
                # Collect every entry that arrived after the last one we sent.
                found_last = False
                for item in log_buffer.buffer:
                    if found_last:
                        to_write.append(item)
                        last_read_item = item
                        continue
                    if item is last_read_item:
                        found_last = True
                        continue
                # In case the last sent item is no longer in the buffer,
                # resend the whole buffer.
                if not found_last:
                    for item in log_buffer.buffer:
                        to_write.append(item)
                        last_read_item = item
        if to_write:
            for ts, msg in to_write:
                yield f"{json.dumps({ts: msg})}\n\n"
        else:
            # Nothing new: after a few empty polls, emit a keepalive so
            # intermediaries do not close the idle connection.
            current_not_sent += 1
            if current_not_sent == NUMBER_OF_NOT_SENT_BEFORE_KEEPALIVE:
                current_not_sent = 0
                yield "keepalive\n\n"

        await asyncio.sleep(1)

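# Note on the log_buffer interface assumed above (a minimal sketch inferred from
# the calls in this module; the real implementation lives in
# langflow.logging.logger and may differ):
#
#     import threading
#     from collections import deque
#
#     class LogBufferSketch:
#         def __init__(self, max_lines: int = 1000) -> None:
#             # Entries are (timestamp, message) pairs, oldest first.
#             self.buffer: deque[tuple[int, str]] = deque(maxlen=max_lines)
#             self._lock = threading.RLock()
#
#         def get_write_lock(self) -> threading.RLock:
#             return self._lock
#
#         def enabled(self) -> bool:
#             return self.buffer.maxlen is not None and self.buffer.maxlen > 0
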
@log_router.get("/logs-stream")
async def stream_logs(
    request: Request,
):
    """HTTP/2 Server-Sent Events (SSE) endpoint for streaming logs.

    It establishes a long-lived connection to the server and receives log messages in real time.
    The client should use the header "Accept: text/event-stream".
    """
    global log_buffer  # noqa: PLW0602
    if log_buffer.enabled() is False:
        raise HTTPException(
            status_code=HTTPStatus.NOT_IMPLEMENTED,
            detail="Log retrieval is disabled",
        )

    return StreamingResponse(event_generator(request), media_type="text/event-stream")

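# Example client (an illustrative sketch, not part of this module): consuming
# the SSE stream with httpx, assuming the route above is reachable at "/logs-stream".
#
#     import httpx
#
#     async def follow_logs(base_url: str) -> None:
#         headers = {"Accept": "text/event-stream"}
#         async with httpx.AsyncClient(base_url=base_url, timeout=None) as client:
#             async with client.stream("GET", "/logs-stream", headers=headers) as response:
#                 async for line in response.aiter_lines():
#                     if line and line != "keepalive":
#                         # Each data line is a JSON object: {"<timestamp>": "<message>"}
#                         print(line)
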
@log_router.get("/logs")
async def logs(
    lines_before: Annotated[int, Query(description="The number of logs before the timestamp or the last log")] = 0,
    lines_after: Annotated[int, Query(description="The number of logs after the timestamp")] = 0,
    timestamp: Annotated[int, Query(description="The timestamp to start getting logs from")] = 0,
):
    """Return buffered log lines around an optional timestamp as JSON."""
    global log_buffer  # noqa: PLW0602
    if log_buffer.enabled() is False:
        raise HTTPException(
            status_code=HTTPStatus.NOT_IMPLEMENTED,
            detail="Log retrieval is disabled",
        )

    if lines_after > 0 and lines_before > 0:
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail="Cannot request logs before and after the timestamp",
        )

    if timestamp <= 0:
        if lines_after > 0:
            raise HTTPException(
                status_code=HTTPStatus.BAD_REQUEST,
                detail="Timestamp is required when requesting logs after the timestamp",
            )
        # No timestamp: return the last `lines_before` lines (default 10).
        content = log_buffer.get_last_n(10) if lines_before <= 0 else log_buffer.get_last_n(lines_before)
    elif lines_before > 0:
        content = log_buffer.get_before_timestamp(timestamp=timestamp, lines=lines_before)
    elif lines_after > 0:
        content = log_buffer.get_after_timestamp(timestamp=timestamp, lines=lines_after)
    else:
        # Timestamp given but no line counts: default to 10 lines before it.
        content = log_buffer.get_before_timestamp(timestamp=timestamp, lines=10)
    return JSONResponse(content=content)
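
# Example wiring (an illustrative sketch): mounting this router in a FastAPI app.
# The "/api/v1" prefix is an assumption for the example, not defined by this module.
#
#     from fastapi import FastAPI
#
#     app = FastAPI()
#     app.include_router(log_router, prefix="/api/v1")
#
#     # GET /api/v1/logs?lines_before=50
#     #   -> the last 50 buffered log lines
#     # GET /api/v1/logs?timestamp=<ts>&lines_after=20
#     #   -> up to 20 lines logged after the given timestamp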