Tai Truong
fix readme
d202ada
from collections.abc import AsyncIterator
from typing import Any
from unittest.mock import AsyncMock
from langchain_core.agents import AgentFinish
from langflow.base.agents.agent import process_agent_events
from langflow.base.agents.events import (
handle_on_chain_end,
handle_on_chain_start,
handle_on_chain_stream,
handle_on_tool_end,
handle_on_tool_error,
handle_on_tool_start,
)
from langflow.schema.content_block import ContentBlock
from langflow.schema.content_types import ToolContent
from langflow.schema.message import Message
from langflow.utils.constants import MESSAGE_SENDER_AI
async def create_event_iterator(events: list[dict[str, Any]]) -> AsyncIterator[dict[str, Any]]:
"""Helper function to create an async iterator from a list of events."""
for event in events:
yield event
async def test_chain_start_event():
"""Test handling of on_chain_start event."""
send_message = AsyncMock(side_effect=lambda message: message)
events = [
{"event": "on_chain_start", "data": {"input": {"input": "test input", "chat_history": []}}, "start_time": 0}
]
# Initialize message with content blocks
agent_message = Message(
sender=MESSAGE_SENDER_AI,
sender_name="Agent",
properties={"icon": "Bot", "state": "partial"},
content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
session_id="test_session_id",
)
send_message.return_value = agent_message
result = await process_agent_events(create_event_iterator(events), agent_message, send_message)
assert result.properties.icon == "Bot"
assert len(result.content_blocks) == 1
assert result.content_blocks[0].title == "Agent Steps"
async def test_chain_end_event():
"""Test handling of on_chain_end event."""
send_message = AsyncMock(side_effect=lambda message: message)
# Create a mock AgentFinish output
output = AgentFinish(return_values={"output": "final output"}, log="test log")
events = [{"event": "on_chain_end", "data": {"output": output}, "start_time": 0}]
# Initialize message with content blocks
agent_message = Message(
sender=MESSAGE_SENDER_AI,
sender_name="Agent",
properties={"icon": "Bot", "state": "partial"},
content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
session_id="test_session_id",
)
send_message.return_value = agent_message
result = await process_agent_events(create_event_iterator(events), agent_message, send_message)
assert result.properties.icon == "Bot"
assert result.properties.state == "complete"
assert result.text == "final output"
async def test_tool_start_event():
"""Test handling of on_tool_start event."""
send_message = AsyncMock()
# Set up the send_message mock to return the modified message
def update_message(message):
# Return a copy of the message to simulate real behavior
return Message(**message.model_dump())
send_message.side_effect = update_message
events = [
{
"event": "on_tool_start",
"name": "test_tool",
"run_id": "test_run",
"data": {"input": {"query": "tool input"}},
"start_time": 0,
}
]
agent_message = Message(
sender=MESSAGE_SENDER_AI,
sender_name="Agent",
properties={"icon": "Bot", "state": "partial"},
content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
session_id="test_session_id",
)
result = await process_agent_events(create_event_iterator(events), agent_message, send_message)
assert result.properties.icon == "Bot"
assert len(result.content_blocks) == 1
assert result.content_blocks[0].title == "Agent Steps"
assert len(result.content_blocks[0].contents) > 0
tool_content = result.content_blocks[0].contents[-1]
assert isinstance(tool_content, ToolContent)
assert tool_content.name == "test_tool"
assert tool_content.tool_input == {"query": "tool input"}, tool_content
async def test_tool_end_event():
"""Test handling of on_tool_end event."""
send_message = AsyncMock(side_effect=lambda message: message)
events = [
{
"event": "on_tool_start",
"name": "test_tool",
"run_id": "test_run",
"data": {"input": {"query": "tool input"}},
"start_time": 0,
},
{
"event": "on_tool_end",
"name": "test_tool",
"run_id": "test_run",
"data": {"output": "tool output"},
"start_time": 0,
},
]
agent_message = Message(
sender=MESSAGE_SENDER_AI,
sender_name="Agent",
properties={"icon": "Bot", "state": "partial"},
content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
session_id="test_session_id",
)
result = await process_agent_events(create_event_iterator(events), agent_message, send_message)
assert len(result.content_blocks) == 1
tool_content = result.content_blocks[0].contents[-1]
assert tool_content.name == "test_tool"
assert tool_content.output == "tool output"
async def test_tool_error_event():
"""Test handling of on_tool_error event."""
send_message = AsyncMock(side_effect=lambda message: message)
events = [
{
"event": "on_tool_start",
"name": "test_tool",
"run_id": "test_run",
"data": {"input": {"query": "tool input"}},
"start_time": 0,
},
{
"event": "on_tool_error",
"name": "test_tool",
"run_id": "test_run",
"data": {"error": "error message"},
"start_time": 0,
},
]
agent_message = Message(
sender=MESSAGE_SENDER_AI,
sender_name="Agent",
properties={"icon": "Bot", "state": "partial"},
content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
session_id="test_session_id",
)
result = await process_agent_events(create_event_iterator(events), agent_message, send_message)
tool_content = result.content_blocks[0].contents[-1]
assert tool_content.name == "test_tool"
assert tool_content.error == "error message"
assert tool_content.header["title"] == "Error using **test_tool**"
async def test_chain_stream_event():
"""Test handling of on_chain_stream event."""
send_message = AsyncMock(side_effect=lambda message: message)
events = [{"event": "on_chain_stream", "data": {"chunk": {"output": "streamed output"}}, "start_time": 0}]
agent_message = Message(
sender=MESSAGE_SENDER_AI,
sender_name="Agent",
properties={"icon": "Bot", "state": "partial"},
content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
session_id="test_session_id",
)
result = await process_agent_events(create_event_iterator(events), agent_message, send_message)
assert result.properties.state == "complete"
assert result.text == "streamed output"
async def test_multiple_events():
"""Test handling of multiple events in sequence."""
send_message = AsyncMock(side_effect=lambda message: message)
# Create a mock AgentFinish output instead of MockOutput
output = AgentFinish(return_values={"output": "final output"}, log="test log")
events = [
{"event": "on_chain_start", "data": {"input": {"input": "initial input", "chat_history": []}}, "start_time": 0},
{
"event": "on_tool_start",
"name": "test_tool",
"run_id": "test_run",
"data": {"input": {"query": "tool input"}},
"start_time": 0,
},
{
"event": "on_tool_end",
"name": "test_tool",
"run_id": "test_run",
"data": {"output": "tool output"},
"start_time": 0,
},
{"event": "on_chain_end", "data": {"output": output}, "start_time": 0},
]
# Initialize message with content blocks
agent_message = Message(
sender=MESSAGE_SENDER_AI,
sender_name="Agent",
properties={"icon": "Bot", "state": "partial"},
content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
)
send_message.return_value = agent_message
result = await process_agent_events(create_event_iterator(events), agent_message, send_message)
assert result.properties.state == "complete"
assert result.properties.icon == "Bot"
assert len(result.content_blocks) == 1
assert result.text == "final output"
async def test_unknown_event():
"""Test handling of unknown event type."""
send_message = AsyncMock(side_effect=lambda message: message)
agent_message = Message(
sender=MESSAGE_SENDER_AI,
sender_name="Agent",
properties={"icon": "Bot", "state": "partial"},
content_blocks=[ContentBlock(title="Agent Steps", contents=[])], # Initialize with empty content block
)
send_message.return_value = agent_message
events = [{"event": "unknown_event", "data": {"some": "data"}, "start_time": 0}]
result = await process_agent_events(create_event_iterator(events), agent_message, send_message)
# Should complete without error and maintain default state
assert result.properties.state == "complete"
# Content blocks should be empty but present
assert len(result.content_blocks) == 1
assert len(result.content_blocks[0].contents) == 0
# Additional tests for individual handler functions
async def test_handle_on_chain_start_with_input():
"""Test handle_on_chain_start with input."""
send_message = AsyncMock(side_effect=lambda message: message)
agent_message = Message(
sender=MESSAGE_SENDER_AI,
sender_name="Agent",
properties={"icon": "Bot", "state": "partial"},
content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
)
event = {"event": "on_chain_start", "data": {"input": {"input": "test input", "chat_history": []}}, "start_time": 0}
updated_message, start_time = await handle_on_chain_start(event, agent_message, send_message, 0.0)
assert updated_message.properties.icon == "Bot"
assert len(updated_message.content_blocks) == 1
assert updated_message.content_blocks[0].title == "Agent Steps"
assert isinstance(start_time, float)
async def test_handle_on_chain_start_no_input():
"""Test handle_on_chain_start without input."""
send_message = AsyncMock(side_effect=lambda message: message)
agent_message = Message(
sender=MESSAGE_SENDER_AI,
sender_name="Agent",
properties={"icon": "Bot", "state": "partial"},
content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
)
event = {"event": "on_chain_start", "data": {}, "start_time": 0}
updated_message, start_time = await handle_on_chain_start(event, agent_message, send_message, 0.0)
assert updated_message.properties.icon == "Bot"
assert len(updated_message.content_blocks) == 1
assert len(updated_message.content_blocks[0].contents) == 0
assert isinstance(start_time, float)
async def test_handle_on_chain_end_with_output():
"""Test handle_on_chain_end with output."""
send_message = AsyncMock(side_effect=lambda message: message)
agent_message = Message(
sender=MESSAGE_SENDER_AI,
sender_name="Agent",
properties={"icon": "Bot", "state": "partial"},
content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
)
output = AgentFinish(return_values={"output": "final output"}, log="test log")
event = {"event": "on_chain_end", "data": {"output": output}, "start_time": 0}
updated_message, start_time = await handle_on_chain_end(event, agent_message, send_message, 0.0)
assert updated_message.properties.icon == "Bot"
assert updated_message.properties.state == "complete"
assert updated_message.text == "final output"
assert isinstance(start_time, float)
async def test_handle_on_chain_end_no_output():
"""Test handle_on_chain_end without output key in data."""
send_message = AsyncMock(side_effect=lambda message: message)
agent_message = Message(
sender=MESSAGE_SENDER_AI,
sender_name="Agent",
properties={"icon": "Bot", "state": "partial"},
content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
)
event = {"event": "on_chain_end", "data": {}, "start_time": 0}
updated_message, start_time = await handle_on_chain_end(event, agent_message, send_message, 0.0)
assert updated_message.properties.icon == "Bot"
assert updated_message.properties.state == "partial"
assert updated_message.text == ""
assert isinstance(start_time, float)
async def test_handle_on_chain_end_empty_data():
"""Test handle_on_chain_end with empty data."""
send_message = AsyncMock(side_effect=lambda message: message)
agent_message = Message(
sender=MESSAGE_SENDER_AI,
sender_name="Agent",
properties={"icon": "Bot", "state": "partial"},
content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
)
event = {"event": "on_chain_end", "data": {"output": None}, "start_time": 0}
updated_message, start_time = await handle_on_chain_end(event, agent_message, send_message, 0.0)
assert updated_message.properties.icon == "Bot"
assert updated_message.properties.state == "partial"
assert updated_message.text == ""
assert isinstance(start_time, float)
async def test_handle_on_chain_end_with_empty_return_values():
"""Test handle_on_chain_end with empty return_values."""
send_message = AsyncMock(side_effect=lambda message: message)
agent_message = Message(
sender=MESSAGE_SENDER_AI,
sender_name="Agent",
properties={"icon": "Bot", "state": "partial"},
content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
)
class MockOutputEmptyReturnValues:
def __init__(self):
self.return_values = {}
event = {"event": "on_chain_end", "data": {"output": MockOutputEmptyReturnValues()}, "start_time": 0}
updated_message, start_time = await handle_on_chain_end(event, agent_message, send_message, 0.0)
assert updated_message.properties.icon == "Bot"
assert updated_message.properties.state == "partial"
assert updated_message.text == ""
assert isinstance(start_time, float)
async def test_handle_on_tool_start():
"""Test handle_on_tool_start event."""
send_message = AsyncMock(side_effect=lambda message: message)
tool_blocks_map = {}
agent_message = Message(
sender=MESSAGE_SENDER_AI,
sender_name="Agent",
properties={"icon": "Bot", "state": "partial"},
content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
)
event = {
"event": "on_tool_start",
"name": "test_tool",
"run_id": "test_run",
"data": {"input": {"query": "tool input"}},
"start_time": 0,
}
updated_message, start_time = await handle_on_tool_start(event, agent_message, tool_blocks_map, send_message, 0.0)
assert len(updated_message.content_blocks) == 1
assert len(updated_message.content_blocks[0].contents) > 0
tool_key = f"{event['name']}_{event['run_id']}"
tool_content = updated_message.content_blocks[0].contents[-1]
assert tool_content == tool_blocks_map.get(tool_key)
assert isinstance(tool_content, ToolContent)
assert tool_content.name == "test_tool"
assert tool_content.tool_input == {"query": "tool input"}
assert isinstance(tool_content.duration, int)
assert isinstance(start_time, float)
async def test_handle_on_tool_end():
"""Test handle_on_tool_end event."""
send_message = AsyncMock(side_effect=lambda message: message)
tool_blocks_map = {}
agent_message = Message(
sender=MESSAGE_SENDER_AI,
sender_name="Agent",
properties={"icon": "Bot", "state": "partial"},
content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
)
start_event = {
"event": "on_tool_start",
"name": "test_tool",
"run_id": "test_run",
"data": {"input": {"query": "tool input"}},
}
agent_message, _ = await handle_on_tool_start(start_event, agent_message, tool_blocks_map, send_message, 0.0)
end_event = {
"event": "on_tool_end",
"name": "test_tool",
"run_id": "test_run",
"data": {"output": "tool output"},
"start_time": 0,
}
updated_message, start_time = await handle_on_tool_end(end_event, agent_message, tool_blocks_map, send_message, 0.0)
f"{end_event['name']}_{end_event['run_id']}"
tool_content = updated_message.content_blocks[0].contents[-1]
assert tool_content.name == "test_tool"
assert tool_content.output == "tool output"
assert isinstance(tool_content.duration, int)
assert isinstance(start_time, float)
async def test_handle_on_tool_error():
"""Test handle_on_tool_error event."""
send_message = AsyncMock(side_effect=lambda message: message)
tool_blocks_map = {}
agent_message = Message(
sender=MESSAGE_SENDER_AI,
sender_name="Agent",
properties={"icon": "Bot", "state": "partial"},
content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
)
start_event = {
"event": "on_tool_start",
"name": "test_tool",
"run_id": "test_run",
"data": {"input": {"query": "tool input"}},
}
agent_message, _ = await handle_on_tool_start(start_event, agent_message, tool_blocks_map, send_message, 0.0)
error_event = {
"event": "on_tool_error",
"name": "test_tool",
"run_id": "test_run",
"data": {"error": "error message"},
"start_time": 0,
}
updated_message, start_time = await handle_on_tool_error(
error_event, agent_message, tool_blocks_map, send_message, 0.0
)
tool_content = updated_message.content_blocks[0].contents[-1]
assert tool_content.name == "test_tool"
assert tool_content.error == "error message"
assert tool_content.header["title"] == "Error using **test_tool**"
assert isinstance(tool_content.duration, int)
assert isinstance(start_time, float)
async def test_handle_on_chain_stream_with_output():
"""Test handle_on_chain_stream with output."""
send_message = AsyncMock(side_effect=lambda message: message)
agent_message = Message(
sender=MESSAGE_SENDER_AI,
sender_name="Agent",
properties={"icon": "Bot", "state": "partial"},
content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
)
event = {
"event": "on_chain_stream",
"data": {"chunk": {"output": "streamed output"}},
}
updated_message, start_time = await handle_on_chain_stream(event, agent_message, send_message, 0.0)
assert updated_message.text == "streamed output"
assert updated_message.properties.state == "complete"
assert isinstance(start_time, float)
async def test_handle_on_chain_stream_no_output():
"""Test handle_on_chain_stream without output."""
send_message = AsyncMock(side_effect=lambda message: message)
agent_message = Message(
sender=MESSAGE_SENDER_AI,
sender_name="Agent",
properties={"icon": "Bot", "state": "partial"},
content_blocks=[ContentBlock(title="Agent Steps", contents=[])],
session_id="test_session_id",
)
event = {
"event": "on_chain_stream",
"data": {"chunk": {}},
}
updated_message, start_time = await handle_on_chain_stream(event, agent_message, send_message, 0.0)
assert updated_message.text == ""
assert updated_message.properties.state == "partial"
assert isinstance(start_time, float)