import json
import os
from unittest.mock import patch

import pytest

from langflow.logging.logger import SizedLogBuffer


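# Provide a fresh SizedLogBuffer for each test.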
@pytest.fixture
def sized_log_buffer():
    return SizedLogBuffer()


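# With no configuration the buffer defaults to max == 0 (i.e. disabled, see
# test_enabled below); _max_readers == 20 is presumably the concurrent-reader limit.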
def test_init_default():
    buffer = SizedLogBuffer()
    assert buffer.max == 0
    assert buffer._max_readers == 20


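# The LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE environment variable sets the
# buffer's maximum size at construction time.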
def test_init_with_env_variable():
    with patch.dict(os.environ, {"LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE": "100"}):
        buffer = SizedLogBuffer()
        assert buffer.max == 100


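# write() takes a serialized record (loguru-style, to judge by its shape). The
# float "timestamp" in seconds is stored as an integer millisecond key:
# 1625097600.1244334 -> 1625097600124.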
def test_write(sized_log_buffer):
    message = json.dumps({"text": "Test log", "record": {"time": {"timestamp": 1625097600.1244334}}})
    sized_log_buffer.max = 1  # Set max size to 1 for testing
    sized_log_buffer.write(message)
    assert len(sized_log_buffer.buffer) == 1
    assert sized_log_buffer.buffer[0][0] == 1625097600124
    assert sized_log_buffer.buffer[0][1] == "Test log"


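# Writing past capacity evicts the oldest entry: after three writes into a
# buffer of size 2, only the two most recent timestamps remain.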
def test_write_overflow(sized_log_buffer):
    sized_log_buffer.max = 2
    messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(3)]
    for message in messages:
        sized_log_buffer.write(message)
    assert len(sized_log_buffer.buffer) == 2
    assert sized_log_buffer.buffer[0][0] == 1625097601000
    assert sized_log_buffer.buffer[1][0] == 1625097602000


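# len() reports the number of buffered entries.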
def test_len(sized_log_buffer):
    sized_log_buffer.max = 3
    messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(3)]
    for message in messages:
        sized_log_buffer.write(message)
    assert len(sized_log_buffer) == 3


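# get_after_timestamp() includes the boundary timestamp itself (>=) and caps
# the result at `lines` entries.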
def test_get_after_timestamp(sized_log_buffer):
    sized_log_buffer.max = 5
    messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(5)]
    for message in messages:
        sized_log_buffer.write(message)
    result = sized_log_buffer.get_after_timestamp(1625097602000, lines=2)
    assert len(result) == 2
    assert 1625097603000 in result
    assert 1625097602000 in result


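# get_before_timestamp() excludes the boundary: it returns the `lines` most
# recent entries strictly before the given timestamp.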
def test_get_before_timestamp(sized_log_buffer):
    sized_log_buffer.max = 5
    messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(5)]
    for message in messages:
        sized_log_buffer.write(message)
    result = sized_log_buffer.get_before_timestamp(1625097603000, lines=2)
    assert len(result) == 2
    assert 1625097601000 in result
    assert 1625097602000 in result


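# get_last_n() returns the n most recent entries, keyed by timestamp.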
def test_get_last_n(sized_log_buffer):
    sized_log_buffer.max = 5
    messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(5)]
    for message in messages:
        sized_log_buffer.write(message)
    result = sized_log_buffer.get_last_n(3)
    assert len(result) == 3
    assert 1625097602000 in result
    assert 1625097603000 in result
    assert 1625097604000 in result


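# The buffer is enabled only when max > 0 (the default of 0 disables it).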
def test_enabled(sized_log_buffer):
    assert not sized_log_buffer.enabled()
    sized_log_buffer.max = 1
    assert sized_log_buffer.enabled()


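# max_size() exposes the configured maximum.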
def test_max_size(sized_log_buffer):
    assert sized_log_buffer.max_size() == 0
    sized_log_buffer.max = 100
    assert sized_log_buffer.max_size() == 100