Tai Truong
fix readme
d202ada
import asyncio
import json
from http import HTTPStatus
from typing import Annotated, Any
from fastapi import APIRouter, HTTPException, Query, Request
from fastapi.responses import JSONResponse, StreamingResponse
from langflow.logging.logger import log_buffer
log_router = APIRouter(tags=["Log"])
NUMBER_OF_NOT_SENT_BEFORE_KEEPALIVE = 5
async def event_generator(request: Request):
global log_buffer # noqa: PLW0602
last_read_item = None
current_not_sent = 0
while not await request.is_disconnected():
to_write: list[Any] = []
with log_buffer.get_write_lock():
if last_read_item is None:
last_read_item = log_buffer.buffer[len(log_buffer.buffer) - 1]
else:
found_last = False
for item in log_buffer.buffer:
if found_last:
to_write.append(item)
last_read_item = item
continue
if item is last_read_item:
found_last = True
continue
# in case the last item is nomore in the buffer
if not found_last:
for item in log_buffer.buffer:
to_write.append(item)
last_read_item = item
if to_write:
for ts, msg in to_write:
yield f"{json.dumps({ts: msg})}\n\n"
else:
current_not_sent += 1
if current_not_sent == NUMBER_OF_NOT_SENT_BEFORE_KEEPALIVE:
current_not_sent = 0
yield "keepalive\n\n"
await asyncio.sleep(1)
@log_router.get("/logs-stream")
async def stream_logs(
request: Request,
):
"""HTTP/2 Server-Sent-Event (SSE) endpoint for streaming logs.
It establishes a long-lived connection to the server and receives log messages in real-time.
The client should use the header "Accept: text/event-stream".
"""
global log_buffer # noqa: PLW0602
if log_buffer.enabled() is False:
raise HTTPException(
status_code=HTTPStatus.NOT_IMPLEMENTED,
detail="Log retrieval is disabled",
)
return StreamingResponse(event_generator(request), media_type="text/event-stream")
@log_router.get("/logs")
async def logs(
lines_before: Annotated[int, Query(description="The number of logs before the timestamp or the last log")] = 0,
lines_after: Annotated[int, Query(description="The number of logs after the timestamp")] = 0,
timestamp: Annotated[int, Query(description="The timestamp to start getting logs from")] = 0,
):
global log_buffer # noqa: PLW0602
if log_buffer.enabled() is False:
raise HTTPException(
status_code=HTTPStatus.NOT_IMPLEMENTED,
detail="Log retrieval is disabled",
)
if lines_after > 0 and lines_before > 0:
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST,
detail="Cannot request logs before and after the timestamp",
)
if timestamp <= 0:
if lines_after > 0:
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST,
detail="Timestamp is required when requesting logs after the timestamp",
)
content = log_buffer.get_last_n(10) if lines_before <= 0 else log_buffer.get_last_n(lines_before)
elif lines_before > 0:
content = log_buffer.get_before_timestamp(timestamp=timestamp, lines=lines_before)
elif lines_after > 0:
content = log_buffer.get_after_timestamp(timestamp=timestamp, lines=lines_after)
else:
content = log_buffer.get_before_timestamp(timestamp=timestamp, lines=10)
return JSONResponse(content=content)