# coding=utf-8
# Copyright 2024 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import os
import re
import tempfile
import uuid
import warnings
from collections.abc import Generator
from contextlib import nullcontext as does_not_raise
from dataclasses import dataclass
from pathlib import Path
from textwrap import dedent
from typing import Optional
from unittest.mock import MagicMock, patch
import pytest
from huggingface_hub import (
    ChatCompletionOutputFunctionDefinition,
    ChatCompletionOutputMessage,
    ChatCompletionOutputToolCall,
)
from rich.console import Console
from smolagents import EMPTY_PROMPT_TEMPLATES
from smolagents.agent_types import AgentImage, AgentText
from smolagents.agents import (
    AgentError,
    AgentMaxStepsError,
    AgentToolCallError,
    CodeAgent,
    MultiStepAgent,
    ToolCall,
    ToolCallingAgent,
    ToolOutput,
    populate_template,
)
from smolagents.default_tools import DuckDuckGoSearchTool, FinalAnswerTool, PythonInterpreterTool, VisitWebpageTool
from smolagents.memory import (
    ActionStep,
    PlanningStep,
    TaskStep,
)
from smolagents.models import (
    ChatMessage,
    ChatMessageToolCall,
    ChatMessageToolCallFunction,
    InferenceClientModel,
    MessageRole,
    Model,
    TransformersModel,
)
from smolagents.monitoring import AgentLogger, LogLevel, TokenUsage
from smolagents.tools import Tool, tool
from smolagents.utils import (
    BASE_BUILTIN_MODULES,
    AgentExecutionError,
    AgentGenerationError,
    AgentToolExecutionError,
)
@dataclass
class ChoiceDeltaToolCallFunction:
    """Plain data holder for the function part of a tool-call delta.

    Every field is optional and defaults to ``None``.
    """

    # Arguments of the call, as text.
    arguments: Optional[str] = None
    # Name of the function being called.
    name: Optional[str] = None
@dataclass
class ChoiceDeltaToolCall:
    """Plain data holder for one tool-call delta.

    Every field is optional and defaults to ``None``.
    """

    # Position of the tool call.
    index: Optional[int] = None
    # Identifier of the tool call.
    id: Optional[str] = None
    # Function name/arguments carried by this delta.
    function: Optional[ChoiceDeltaToolCallFunction] = None
    # Kind of tool call, as text.
    type: Optional[str] = None
@dataclass
class ChoiceDelta:
    """Plain data holder for a chat-completion delta.

    Every field is optional and defaults to ``None``.
    """

    # Text content of the delta.
    content: Optional[str] = None
    function_call: Optional[str] = None
    refusal: Optional[str] = None
    # Role of the message author, as text.
    role: Optional[str] = None
    # Presumably a list of ``ChoiceDeltaToolCall`` items -- TODO confirm against callers.
    tool_calls: Optional[list] = None
def get_new_path(suffix="") -> str:
    """Return a unique path inside a freshly created temporary directory.

    Args:
        suffix: Text appended to the random file name (for example an extension).

    Returns:
        The path as a string. The directory exists; the file itself is not created.
    """
    parent_dir = tempfile.mkdtemp()
    file_name = str(uuid.uuid4()) + suffix
    return os.path.join(parent_dir, file_name)
@pytest.fixture
def agent_logger():
    """Provide a DEBUG-level ``AgentLogger`` whose console records into an in-memory buffer."""
    recording_console = Console(record=True, no_color=True, force_terminal=False, file=io.StringIO())
    return AgentLogger(LogLevel.DEBUG, console=recording_console)
class FakeToolCallModel(Model):
    """Scripted tool-calling model used by the tests.

    While fewer than three messages are supplied it requests ``python_interpreter``
    with the code ``2*3.6452``; otherwise it calls ``final_answer`` with ``"7.2904"``.
    """

    def generate(self, messages, tools_to_call_from=None, stop_sequences=None):
        """Return an assistant message carrying exactly one scripted tool call.

        ``tools_to_call_from`` and ``stop_sequences`` are accepted but not used.
        """
        if len(messages) >= 3:
            # Enough history: finish with the hard-coded answer.
            call_id = "call_1"
            function = ChatMessageToolCallFunction(name="final_answer", arguments={"answer": "7.2904"})
        else:
            call_id = "call_0"
            function = ChatMessageToolCallFunction(name="python_interpreter", arguments={"code": "2*3.6452"})

        scripted_call = ChatMessageToolCall(id=call_id, type="function", function=function)
        return ChatMessage(role=MessageRole.ASSISTANT, content="", tool_calls=[scripted_call])
class FakeToolCallModelImage(Model):
    """Scripted tool-calling model for the image-generation test.

    While fewer than three messages are supplied it requests ``fake_image_generation_tool``;
    otherwise it calls ``final_answer`` with the string ``"image.png"`` as arguments.
    """

    def generate(self, messages, tools_to_call_from=None, stop_sequences=None):
        """Return an assistant message carrying exactly one scripted tool call.

        ``tools_to_call_from`` and ``stop_sequences`` are accepted but not used.
        """
        if len(messages) >= 3:
            call_id = "call_1"
            # The arguments are a bare string here, not a dict.
            function = ChatMessageToolCallFunction(name="final_answer", arguments="image.png")
        else:
            call_id = "call_0"
            function = ChatMessageToolCallFunction(
                name="fake_image_generation_tool",
                arguments={"prompt": "An image of a cat"},
            )

        scripted_call = ChatMessageToolCall(id=call_id, type="function", function=function)
        return ChatMessage(role=MessageRole.ASSISTANT, content="", tool_calls=[scripted_call])
class FakeToolCallModelVL(Model):
    """Scripted tool-calling model for the image-understanding test.

    While fewer than three messages are supplied it requests ``fake_image_understanding_tool``;
    otherwise it calls ``final_answer`` with the string ``"The image is a cat."`` as arguments.
    """

    def generate(self, messages, tools_to_call_from=None, stop_sequences=None):
        """Return an assistant message carrying exactly one scripted tool call.

        ``tools_to_call_from`` and ``stop_sequences`` are accepted but not used.
        """
        if len(messages) >= 3:
            call_id = "call_1"
            # The arguments are a bare string here, not a dict.
            function = ChatMessageToolCallFunction(name="final_answer", arguments="The image is a cat.")
        else:
            call_id = "call_0"
            tool_arguments = {
                "prompt": "What is in this image?",
                "image": "image.png",
            }
            function = ChatMessageToolCallFunction(name="fake_image_understanding_tool", arguments=tool_arguments)

        scripted_call = ChatMessageToolCall(id=call_id, type="function", function=function)
        return ChatMessage(role=MessageRole.ASSISTANT, content="", tool_calls=[scripted_call])
class FakeCodeModel(Model):
    """Scripted code model with two canned replies.

    The first reply embeds ``special_marker``; once that marker shows up in the stringified
    messages, the model answers with a ``final_answer(7.2904)`` call instead.
    """

    def generate(self, messages, stop_sequences=None):
        """Return the canned assistant message matching the current conversation state."""
        if "special_marker" in str(messages):
            # Marker found in the messages: we're at step 2
            reply = """
Thought: I can now answer the initial question
final_answer(7.2904)
"""
        else:
            reply = """
Thought: I should multiply 2 by 3.6452. special_marker
result = 2**3.6452
"""
        return ChatMessage(role=MessageRole.ASSISTANT, content=reply)
class FakeCodeModelPlanning(Model):
    """Scripted model for planning tests.

    The reply is chosen from the stringified messages: a plan containing ``planning_marker``
    when that marker is absent, then an action containing ``action_marker`` when that marker
    is absent, and ``"llm plan again"`` once both markers are present. Every reply reports
    10 input and 10 output tokens.
    """

    def generate(self, messages, stop_sequences=None):
        """Return the canned assistant message matching the current conversation state."""
        history = str(messages)
        if "planning_marker" not in history:
            reply = "llm plan update planning_marker"
        elif "action_marker" not in history:
            reply = """
Thought: I should multiply 2 by 3.6452. action_marker
result = 2**3.6452
"""
        else:
            reply = "llm plan again"

        return ChatMessage(
            role=MessageRole.ASSISTANT,
            content=reply,
            token_usage=TokenUsage(input_tokens=10, output_tokens=10),
        )
class FakeCodeModelError(Model):
    """Scripted code model whose first reply contains code that raises ``ValueError``.

    Once ``special_marker`` shows up in the stringified messages, the model replies with
    ``final_answer("got an error")``.
    """

    def generate(self, messages, stop_sequences=None):
        """Return the canned assistant message matching the current conversation state."""
        if "special_marker" in str(messages):
            # Marker found in the messages: we're at step 2
            reply = """
Thought: I faced an error in the previous step.
final_answer("got an error")
"""
        else:
            reply = """
Thought: I should multiply 2 by 3.6452. special_marker
print("Flag!")
def error_function():
    raise ValueError("error")
error_function()
"""
        return ChatMessage(role=MessageRole.ASSISTANT, content=reply)
class FakeCodeModelSyntaxError(Model):
    """Scripted code model whose first reply contains a line with an unexpected indent.

    Once ``special_marker`` shows up in the stringified messages, the model replies with
    ``final_answer("got an error")``.
    """

    def generate(self, messages, stop_sequences=None):
        """Return the canned assistant message matching the current conversation state."""
        if "special_marker" in str(messages):
            # Marker found in the messages: we're at step 2
            reply = """
Thought: I can now answer the initial question
final_answer("got an error")
"""
        else:
            reply = """
Thought: I should multiply 2 by 3.6452. special_marker
a = 2
b = a * 2
    print("Failing due to unexpected indent")
print("Ok, calculation done!")
"""
        return ChatMessage(role=MessageRole.ASSISTANT, content=reply)
class FakeCodeModelImport(Model):
    """Scripted code model that always replies with a numpy import followed by ``final_answer``."""

    def generate(self, messages, stop_sequences=None):
        """Return the same canned assistant message regardless of ``messages``."""
        reply = """
Thought: I can answer the question
import numpy as np
final_answer("got an error")
"""
        return ChatMessage(role=MessageRole.ASSISTANT, content=reply)
class FakeCodeModelFunctionDef(Model):
    """Scripted code model that defines ``moving_average`` first and calls it afterwards.

    The first reply embeds ``special_marker``; once that marker shows up in the stringified
    messages, the model replies with code that calls ``moving_average`` and ``final_answer``.
    """

    def generate(self, messages, stop_sequences=None):
        """Return the canned assistant message matching the current conversation state."""
        if "special_marker" in str(messages):
            # Marker found in the messages: we're at step 2
            reply = """
Thought: I can now answer the initial question
x, w = [0, 1, 2, 3, 4, 5], 2
res = moving_average(x, w)
final_answer(res)
"""
        else:
            # The trailing indentation before the closing quotes is part of the reply text.
            reply = """
Thought: Let's define the function. special_marker
import numpy as np
def moving_average(x, w):
    return np.convolve(x, np.ones(w), 'valid') / w
    """
        return ChatMessage(role=MessageRole.ASSISTANT, content=reply)
class FakeCodeModelSingleStep(Model):
    """Scripted code model whose single reply calls ``python_interpreter`` and then ``final_answer``."""

    def generate(self, messages, stop_sequences=None):
        """Return the same canned assistant message regardless of ``messages``."""
        reply = """
Thought: I should multiply 2 by 3.6452. special_marker
result = python_interpreter(code="2*3.6452")
final_answer(result)
```
"""
        return ChatMessage(role=MessageRole.ASSISTANT, content=reply)
class FakeCodeModelNoReturn(Model):
    """Scripted code model whose reply prints a result and never calls ``final_answer``."""

    def generate(self, messages, stop_sequences=None):
        """Return the same canned assistant message regardless of ``messages``."""
        reply = """
Thought: I should multiply 2 by 3.6452. special_marker
result = python_interpreter(code="2*3.6452")
print(result)
```
"""
        return ChatMessage(role=MessageRole.ASSISTANT, content=reply)
class TestAgent:
    def test_fake_toolcalling_agent(self):
        """Run a ToolCallingAgent with the scripted model and check its output and memory steps."""
        task = "What is 2 multiplied by 3.6452?"
        agent = ToolCallingAgent(tools=[PythonInterpreterTool()], model=FakeToolCallModel())

        result = agent.run(task)
        assert isinstance(result, str)
        assert "7.2904" in result

        steps = agent.memory.steps
        assert steps[0].task == task
        assert "7.2904" in steps[1].observations
        expected_model_output = "Tool call call_1: calling 'final_answer' with arguments: {'answer': '7.2904'}"
        assert steps[2].model_output == expected_model_output
    def test_toolcalling_agent_handles_image_tool_outputs(self, shared_datadir):
        """Check that an image returned by a tool comes back as ``AgentImage`` and is kept in agent state."""
        import PIL.Image

        # The tool definition is kept verbatim: the ``tool`` decorator consumes the function as written.
        @tool
        def fake_image_generation_tool(prompt: str) -> PIL.Image.Image:
            """Tool that generates an image.
            Args:
                prompt: The prompt
            """
            import PIL.Image
            return PIL.Image.open(shared_datadir / "000000039769.png")

        image_agent = ToolCallingAgent(
            tools=[fake_image_generation_tool],
            model=FakeToolCallModelImage(),
            verbosity_level=10,
        )
        result = image_agent.run("Make me an image.")

        assert isinstance(result, AgentImage)
        stored_image = image_agent.state["image.png"]
        assert isinstance(stored_image, PIL.Image.Image)
    def test_toolcalling_agent_handles_image_inputs(self, shared_datadir):
        """Check that a run given an input image ends with the answer scripted by the fake model."""
        import PIL.Image

        dummy_image = PIL.Image.open(shared_datadir / "000000039769.png")  # dummy input

        # The tool definition is kept verbatim: the ``tool`` decorator consumes the function as written.
        @tool
        def fake_image_understanding_tool(prompt: str, image: PIL.Image.Image) -> str:
            """Tool that creates a caption for an image.
            Args:
                prompt: The prompt
                image: The image
            """
            return "The image is a cat."

        captioning_agent = ToolCallingAgent(tools=[fake_image_understanding_tool], model=FakeToolCallModelVL())
        result = captioning_agent.run("Caption this image.", images=[dummy_image])
        assert result == "The image is a cat."
    def test_fake_code_agent(self):
        """Run a CodeAgent with the scripted model and check the float answer and the recorded tool call."""
        task = "What is 2 multiplied by 3.6452?"
        agent = CodeAgent(tools=[PythonInterpreterTool()], model=FakeCodeModel(), verbosity_level=10)

        result = agent.run(task)
        assert isinstance(result, float)
        assert result == 7.2904

        steps = agent.memory.steps
        assert steps[0].task == task
        expected_call = ToolCall(name="python_interpreter", arguments="final_answer(7.2904)", id="call_2")
        assert steps[2].tool_calls == [expected_call]
    def test_additional_args_added_to_task(self):
        """Check that the text passed through ``additional_args`` ends up in ``agent.task``."""
        extra_args = {"instruction": "Remember this."}
        agent = CodeAgent(tools=[], model=FakeCodeModel())
        agent.run("What is 2 multiplied by 3.6452?", additional_args=extra_args)
        assert "Remember this" in agent.task
    def test_reset_conversations(self):
        """Check how the ``reset`` flag of ``run`` affects the number of memory steps."""
        task = "What is 2 multiplied by 3.6452?"
        agent = CodeAgent(tools=[PythonInterpreterTool()], model=FakeCodeModel())

        # (reset flag, expected number of memory steps after the run), executed in order.
        scenarios = ((True, 3), (False, 5), (True, 3))
        for reset_flag, expected_step_count in scenarios:
            answer = agent.run(task, reset=reset_flag)
            assert answer == 7.2904
            assert len(agent.memory.steps) == expected_step_count
    def test_setup_agent_with_empty_toolbox(self):
        """Constructing a ToolCallingAgent with an empty tool list must not raise."""
        no_tools = []
        ToolCallingAgent(model=FakeToolCallModel(), tools=no_tools)
    def test_fails_max_steps(self):
        """Check that a run that never answers stops at the step limit with ``AgentMaxStepsError``.

        Covers both the limit given at construction and a limit passed to ``run``.
        """
        task = "What is 2 multiplied by 3.6452?"
        scenarios = [
            ({}, 7),  # Task step + 5 action steps + Final answer
            ({"max_steps": 3}, 5),  # Task step + 3 action steps + Final answer
        ]
        for run_kwargs, expected_step_count in scenarios:
            agent = CodeAgent(
                tools=[PythonInterpreterTool()],
                model=FakeCodeModelNoReturn(),  # use this callable because it never ends
                max_steps=5,
            )
            answer = agent.run(task, **run_kwargs)
            assert len(agent.memory.steps) == expected_step_count
            assert type(agent.memory.steps[-1].error) is AgentMaxStepsError
            assert isinstance(answer, str)
    def test_tool_descriptions_get_baked_in_system_prompt(self):
        """Check that a tool's name and description appear in the CodeAgent system prompt."""
        # Local name chosen so it does not shadow the imported ``tool`` decorator.
        renamed_tool = PythonInterpreterTool()
        renamed_tool.name = "fake_tool_name"
        renamed_tool.description = "fake_tool_description"

        agent = CodeAgent(tools=[renamed_tool], model=FakeCodeModel())
        agent.run("Empty task")

        assert agent.system_prompt is not None
        expected_signature_start = f"def {renamed_tool.name}("
        assert expected_signature_start in agent.system_prompt
        expected_docstring_start = f'"""{renamed_tool.description}'
        assert expected_docstring_start in agent.system_prompt
    def test_module_imports_get_baked_in_system_prompt(self):
        """Check that every entry of ``BASE_BUILTIN_MODULES`` appears in the CodeAgent system prompt."""
        agent = CodeAgent(tools=[], model=FakeCodeModel())
        agent.run("Empty task")
        missing_modules = [name for name in BASE_BUILTIN_MODULES if name not in agent.system_prompt]
        assert not missing_modules
    def test_init_agent_with_different_toolsets(self):
        """Check tool-count defaults and duplicate-name rejection when constructing agents.

        Covers: an empty toolset, two tools sharing a name, a managed agent whose name
        collides with a tool, and the effect of ``add_base_tools`` on both agent classes.
        """
        duplicate_name_message = "Each tool or managed_agent should have a unique name!"

        # when no tools are provided, only the final_answer tool is added by default
        agent = CodeAgent(tools=[], model=FakeCodeModel())
        assert len(agent.tools) == 1

        # Two tools with the same name must be rejected.
        duplicated_tools = [PythonInterpreterTool(), PythonInterpreterTool()]
        with pytest.raises(ValueError) as exc_info:
            CodeAgent(tools=duplicated_tools, model=FakeCodeModel())
        # Match against the exception itself rather than the ExceptionInfo wrapper.
        assert duplicate_name_message in str(exc_info.value)

        # A managed agent named like a tool must be rejected as well. The setup happens
        # outside the ``raises`` block so only the constructor can satisfy the expectation.
        agent.name = "python_interpreter"
        agent.description = "empty"
        with pytest.raises(ValueError) as exc_info:
            CodeAgent(tools=[PythonInterpreterTool()], model=FakeCodeModel(), managed_agents=[agent])
        assert duplicate_name_message in str(exc_info.value)

        # check that python_interpreter base tool does not get added to CodeAgent
        agent = CodeAgent(tools=[], model=FakeCodeModel(), add_base_tools=True)
        assert len(agent.tools) == 3  # added final_answer tool + search + visit_webpage

        # check that python_interpreter base tool gets added to ToolCallingAgent
        agent = ToolCallingAgent(tools=[], model=FakeCodeModel(), add_base_tools=True)
        assert len(agent.tools) == 4  # added final_answer tool + search + visit_webpage + python_interpreter
    def test_function_persistence_across_steps(self):
        """Check that a function defined in one step can be called in the next step."""
        agent_kwargs = {
            "tools": [],
            "model": FakeCodeModelFunctionDef(),
            "max_steps": 2,
            "additional_authorized_imports": ["numpy"],
            "verbosity_level": 100,
        }
        agent = CodeAgent(**agent_kwargs)
        averages = agent.run("ok")
        assert averages[0] == 0.5
    def test_init_managed_agent(self):
        """Check that ``name`` and ``description`` given at construction are stored on the agent."""
        managed = CodeAgent(tools=[], model=FakeCodeModelFunctionDef(), name="managed_agent", description="Empty")
        assert (managed.name, managed.description) == ("managed_agent", "Empty")
    def test_agent_description_gets_correctly_inserted_in_system_prompt(self):
        """Check that only the manager's system prompt mentions team members."""
        team_sentence = "You can also give tasks to team members."

        managed_agent = CodeAgent(
            tools=[], model=FakeCodeModelFunctionDef(), name="managed_agent", description="Empty"
        )
        manager_agent = CodeAgent(tools=[], model=FakeCodeModelFunctionDef(), managed_agents=[managed_agent])

        # The managed agent has no team of its own, and no unrendered placeholder may remain.
        assert team_sentence not in managed_agent.system_prompt
        assert "{{managed_agents_descriptions}}" not in managed_agent.system_prompt
        assert team_sentence in manager_agent.system_prompt
    def test_replay_shows_logs(self, agent_logger):
        """Check that run output and ``replay`` output reach the recording console of the logger."""
        code_agent = CodeAgent(
            tools=[],
            model=FakeCodeModelImport(),
            verbosity_level=0,
            additional_authorized_imports=["numpy"],
            logger=agent_logger,
        )
        code_agent.run("Count to 3")

        captured = agent_logger.console.export_text()
        assert "New run" in captured
        assert 'final_answer("got' in captured
        # NOTE(review): the empty string is contained in every string, so this assertion can
        # never fail. The intended substring is not recoverable from this file -- confirm.
        assert "" in captured

        tool_agent = ToolCallingAgent(tools=[PythonInterpreterTool()], model=FakeToolCallModel(), verbosity_level=0)
        tool_agent.logger = agent_logger
        tool_agent.run("What is 2 multiplied by 3.6452?")
        tool_agent.replay()

        captured = agent_logger.console.export_text()
        for expected_fragment in ("Tool call", "arguments"):
            assert expected_fragment in captured
    def test_code_nontrivial_final_answer_works(self):
        """Check that ``final_answer`` called from inside a nested function still ends the run."""

        class FakeCodeModelFinalAnswer(Model):
            # Always replies with code that calls final_answer from within a helper function.
            def generate(self, messages, stop_sequences=None):
                reply = """
def nested_answer():
    final_answer("Correct!")
nested_answer()
"""
                return ChatMessage(role=MessageRole.ASSISTANT, content=reply)

        agent = CodeAgent(tools=[], model=FakeCodeModelFinalAnswer())
        result = agent.run("Count to 3")
        assert result == "Correct!"
    def test_transformers_toolcalling_agent(self):
        """Run a ToolCallingAgent on a ``TransformersModel`` and inspect the recorded step data."""

        # The tool definition is kept verbatim: the ``tool`` decorator consumes the function as written.
        @tool
        def weather_api(location: str, celsius: str = "") -> str:
            """
            Gets the weather in the next days at given location.
            Secretly this tool does not care about the location, it hates the weather everywhere.
            Args:
                location: the location
                celsius: the temperature type
            """
            return "The weather is UNGODLY with torrential rains and temperatures below -10°C"

        model = TransformersModel(
            model_id="HuggingFaceTB/SmolLM2-360M-Instruct",
            max_new_tokens=100,
            device_map="auto",
            do_sample=False,
        )
        agent = ToolCallingAgent(model=model, tools=[weather_api], max_steps=1, verbosity_level=10)
        task = "What is the weather in Paris? "
        agent.run(task)

        steps = agent.memory.steps
        assert steps[0].task == task
        assert steps[1].tool_calls[0].name == "weather_api"

        succinct_step = agent.memory.get_succinct_steps()[1]
        output_message = succinct_step["model_output_message"]
        assert output_message["tool_calls"][0]["function"]["name"] == "weather_api"
        assert output_message["raw"]["completion_kwargs"]["max_new_tokens"] == 100
        assert "model_input_messages" in agent.memory.get_full_steps()[1]
        assert succinct_step["token_usage"]["total_tokens"] > 100
        assert succinct_step["timing"]["duration"] > 0.1
    def test_final_answer_checks(self):
        """Check that a failing final-answer check is reported in memory and a passing one is not."""
        error_string = "failed with error"

        def check_always_fails(final_answer, agent_memory):
            assert False, "Error raised in check"

        # A check that always fails: its error text must show up in the written messages.
        failing_agent = CodeAgent(model=FakeCodeModel(), tools=[], final_answer_checks=[check_always_fails])
        failing_agent.run("Dummy task.")
        assert error_string in str(failing_agent.write_memory_to_messages())
        assert "Error raised in check" in str(failing_agent.write_memory_to_messages())

        # A check that accepts the scripted answer: no error text is expected.
        passing_agent = CodeAgent(
            model=FakeCodeModel(),
            tools=[],
            final_answer_checks=[lambda x, y: x == 7.2904],
            verbosity_level=1000,
        )
        result = passing_agent.run("Dummy task.")
        assert result == 7.2904  # Check that output is correct
        action_steps = [step for step in passing_agent.memory.steps if isinstance(step, ActionStep)]
        assert len(action_steps) == 2
        assert error_string not in str(passing_agent.write_memory_to_messages())
    def test_generation_errors_are_raised(self):
        """Check that a failure inside ``generate`` surfaces from ``run`` as ``AgentGenerationError``."""

        # Local stand-in whose generate() always fails; it shadows the module-level
        # FakeCodeModel inside this test only.
        class FakeCodeModel(Model):
            def generate(self, messages, stop_sequences=None):
                assert False, "Generation failed"

        failing_model = FakeCodeModel()
        agent = CodeAgent(model=failing_model, tools=[])
        with pytest.raises(AgentGenerationError) as e:
            agent.run("Dummy task.")

        step_count = len(agent.memory.steps)
        assert step_count == 2
        assert "Generation failed" in str(e)
    def test_planning_step_with_injected_memory(self):
        """Test that planning uses the update-plan prompts when memory is injected before a run.

        The test checks that:
        1. Several planning steps are produced
        2. The injected task step is part of the planning input messages
        3. The input messages have the expected count, roles and content
        """
        task = "Continuous task"
        previous_task = "Previous user request"

        # Agent with planning enabled at an interval of 1, limited to 4 steps
        agent = CodeAgent(
            tools=[],
            planning_interval=1,
            model=FakeCodeModelPlanning(),
            max_steps=4,
        )

        # Simulate existing conversation history by injecting a task step before the run
        agent.memory.steps.append(TaskStep(task=previous_task))

        # Run without resetting memory
        agent.run(task, reset=False)

        # Collect the planning steps recorded during the run
        planning_steps = [entry for entry in agent.memory.steps if isinstance(entry, PlanningStep)]
        assert len(planning_steps) > 2, "Expected multiple planning steps to be generated"

        # First planning step: message count, then each message in turn
        first_inputs = planning_steps[0].model_input_messages
        assert len(first_inputs) == 4, (
            "First planning step should have 4 messages: system-plan-pre-update + memory + task + user-plan-post-update"
        )
        system_message, memory_message, task_message, user_message = first_inputs

        # System message carries the current task
        assert system_message.role == "system", "First message should have system role"
        assert task in system_message.content[0]["text"], f"System message should contain the current task: '{task}'"

        # Memory message carries the injected previous task
        assert previous_task in memory_message.content[0]["text"], (
            f"Memory message should contain previous task: '{previous_task}'"
        )

        # Task message carries the current task
        assert task in task_message.content[0]["text"], f"Task message should contain current task: '{task}'"

        # Last message is the user message for planning
        assert user_message.role == "user", "Fourth message should have user role"

        # Second planning step: the input has grown to six messages
        second_inputs = planning_steps[1].model_input_messages
        assert len(second_inputs) == 6, "Second planning step should have 6 messages including tool interactions"

        # All conversation elements must be present in the concatenated text
        conversation_text = "".join(
            message.content[0]["text"] for message in second_inputs if hasattr(message, "content")
        )
        assert previous_task in conversation_text, "Previous task should be included in the conversation history"
        assert task in conversation_text, "Current task should be included in the conversation history"
        assert "tools" in conversation_text, "Tool interactions should be included in the conversation history"
class CustomFinalAnswerTool(FinalAnswerTool):
    """``FinalAnswerTool`` variant whose ``forward`` appends ``"CUSTOM"`` to the answer."""

    def forward(self, answer) -> str:
        """Return ``answer`` with the literal ``"CUSTOM"`` concatenated at the end."""
        marked_answer = answer + "CUSTOM"
        return marked_answer
class MockTool(Tool):
    """Input-less tool stub with a configurable name and a fixed string output."""

    def __init__(self, name):
        """Set the tool attributes directly; the base-class initializer is not called here."""
        self.name = name
        self.description = "Mock tool description"
        # No inputs; the output is declared as a string.
        self.inputs = {}
        self.output_type = "string"

    def forward(self):
        """Return the fixed mock output."""
        mock_output = "Mock tool output"
        return mock_output
class MockAgent:
    """Lightweight stand-in for an agent exposing ``name``, ``tools`` and ``description``."""

    def __init__(self, name, tools, description="Mock agent description"):
        """Store the attributes; ``tools`` is an iterable of objects that each have a ``name``.

        The tools are indexed by their ``name``; on duplicate names the later tool wins.
        """
        self.name = name
        tools_by_name = {}
        for item in tools:
            tools_by_name[item.name] = item
        self.tools = tools_by_name
        self.description = description
class DummyMultiStepAgent(MultiStepAgent):
    """Minimal ``MultiStepAgent`` subclass for tests: a no-op step and no system prompt."""

    def step(self, memory_step: ActionStep) -> Generator[None]:
        """Yield a single ``None`` without using ``memory_step``."""
        yield

    def initialize_system_prompt(self):
        """Return ``None``; no system prompt is built."""
        return None
class TestMultiStepAgent:
    def test_instantiation_disables_logging_to_terminal(self):
        """A freshly constructed agent should report a logger level of -1."""
        agent = DummyMultiStepAgent(model=MagicMock(), tools=[])
        logger_level = agent.logger.level
        assert logger_level == -1, "logging to terminal should be disabled for testing using a fixture"
    def test_instantiation_with_prompt_templates(self, prompt_templates):
        """Prompt templates given at construction are exposed unchanged on the agent."""
        agent = DummyMultiStepAgent(tools=[], model=MagicMock(), prompt_templates=prompt_templates)
        stored_templates = agent.prompt_templates
        assert stored_templates == prompt_templates
        assert stored_templates["system_prompt"] == "This is a test system prompt."
        assert "managed_agent" in stored_templates
        managed_agent_templates = stored_templates["managed_agent"]
        assert managed_agent_templates["task"] == "Task for {{name}}: {{task}}"
        assert managed_agent_templates["report"] == "Report for {{name}}: {{final_answer}}"
    @pytest.mark.parametrize(
        "tools, expected_final_answer_tool",
        [([], FinalAnswerTool), ([CustomFinalAnswerTool()], CustomFinalAnswerTool)],
    )
    def test_instantiation_with_final_answer_tool(self, tools, expected_final_answer_tool):
        """The agent always has a `final_answer` tool, of the class expected for the given tools."""
        registered_tools = DummyMultiStepAgent(tools=tools, model=MagicMock()).tools
        assert "final_answer" in registered_tools
        final_answer_tool = registered_tools["final_answer"]
        assert isinstance(final_answer_tool, expected_final_answer_tool)
    def test_instantiation_with_deprecated_grammar(self):
        """A non-None `grammar` argument emits a FutureWarning; `grammar=None` emits no warning at all."""
        class SimpleAgent(MultiStepAgent):
            def initialize_system_prompt(self) -> str:
                return "Test system prompt"
        # Test with a non-None grammar parameter.
        # `match` is interpreted as a regular expression, so the message is escaped to keep
        # its dots literal instead of letting them match any character.
        with pytest.warns(
            FutureWarning,
            match=re.escape("Parameter 'grammar' is deprecated and will be removed in version 1.20."),
        ):
            SimpleAgent(tools=[], model=MagicMock(), grammar={"format": "json"}, verbosity_level=LogLevel.DEBUG)
        # Verify no warning when grammar is None
        with warnings.catch_warnings():
            warnings.simplefilter("error")  # Turn warnings into errors
            SimpleAgent(tools=[], model=MagicMock(), grammar=None, verbosity_level=LogLevel.DEBUG)
    def test_system_prompt_property(self):
        """`system_prompt` returns what initialize_system_prompt produces and cannot be assigned to."""
        class SimpleAgent(MultiStepAgent):
            def step(self, memory_step: ActionStep) -> Generator[None]:
                yield None
            def initialize_system_prompt(self) -> str:
                return "Test system prompt"
        agent = SimpleAgent(tools=[], model=MagicMock())
        # Reading the property gives back the subclass' prompt
        assert agent.system_prompt == "Test system prompt"
        # Assigning to the property must fail with the exact explanatory message
        expected_error = re.escape(
            """The 'system_prompt' property is read-only. Use 'self.prompt_templates["system_prompt"]' instead."""
        )
        with pytest.raises(AttributeError, match=expected_error):
            agent.system_prompt = "New system prompt"
    def test_logs_display_thoughts_even_if_error(self):
        """The model's text shows up in the console output even when it contains no tool call or code action."""
        class FakeJsonModelNoCall(Model):
            def generate(self, messages, stop_sequences=None, tools_to_call_from=None):
                text = """I don't want to call tools today"""
                return ChatMessage(role=MessageRole.ASSISTANT, content=text, tool_calls=None, raw=text)
        class FakeCodeModelNoCall(Model):
            def generate(self, messages, stop_sequences=None):
                text = """I don't want to write an action today"""
                return ChatMessage(role=MessageRole.ASSISTANT, content=text)
        def captured_run_output(agent):
            # Run the dummy task and return everything the agent's console printed meanwhile.
            with agent.logger.console.capture() as capture:
                agent.run("Dummy task")
            return capture.get()
        # Tool-calling flavour
        agent_toolcalling = ToolCallingAgent(model=FakeJsonModelNoCall(), tools=[], max_steps=1, verbosity_level=10)
        toolcalling_output = captured_run_output(agent_toolcalling)
        assert "don't" in toolcalling_output and "want" in toolcalling_output
        # Code flavour
        agent_code = CodeAgent(model=FakeCodeModelNoCall(), tools=[], max_steps=1, verbosity_level=10)
        code_output = captured_run_output(agent_code)
        assert "don't" in code_output and "want" in code_output
    def test_step_number(self):
        """`step_number` is 0 right after construction and max_steps + 1 once `run` has returned."""
        max_steps = 2
        model_reply = ChatMessage(
            role=MessageRole.ASSISTANT,
            content="Model output.",
            tool_calls=None,
            raw="Model output.",
            token_usage=None,
        )
        fake_model = MagicMock()
        fake_model.generate.return_value = model_reply
        agent = CodeAgent(tools=[], model=fake_model, max_steps=max_steps)
        # Before the run
        assert hasattr(agent, "step_number"), "step_number attribute should be defined"
        assert agent.step_number == 0, "step_number should be initialized to 0"
        agent.run("Test task")
        # After the run
        assert hasattr(agent, "step_number"), "step_number attribute should be defined"
        assert agent.step_number == max_steps + 1, "step_number should be max_steps + 1 after run method is called"
    @pytest.mark.parametrize(
        "step, expected_messages_list",
        [
            (
                1,
                [
                    [
                        ChatMessage(
                            role=MessageRole.USER, content=[{"type": "text", "text": "INITIAL_PLAN_USER_PROMPT"}]
                        ),
                    ],
                ],
            ),
            (
                2,
                [
                    [
                        ChatMessage(
                            role=MessageRole.SYSTEM,
                            content=[{"type": "text", "text": "UPDATE_PLAN_SYSTEM_PROMPT"}],
                        ),
                        ChatMessage(
                            role=MessageRole.USER,
                            content=[{"type": "text", "text": "UPDATE_PLAN_USER_PROMPT"}],
                        ),
                    ],
                ],
            ),
        ],
    )
    def test_planning_step(self, step, expected_messages_list):
        """Check the messages built for a planning step, both on the step itself and in the model call.

        `step == 1` exercises the initial plan, any other value the plan update. The parametrized
        `expected_messages_list` carries placeholder texts (e.g. "INITIAL_PLAN_USER_PROMPT"); they are
        resolved to the rendered prompt templates at comparison time. The parametrized objects are
        never mutated, so the test keeps working if it is executed more than once in a process.
        """
        fake_model = MagicMock()
        agent = CodeAgent(
            tools=[],
            model=fake_model,
        )
        task = "Test task"
        # The planning generator may yield intermediate items; the last one is the planning step.
        planning_step = list(agent._generate_planning_step(task, is_first_step=(step == 1), step=step))[-1]
        expected_message_texts = {
            "INITIAL_PLAN_USER_PROMPT": populate_template(
                agent.prompt_templates["planning"]["initial_plan"],
                variables=dict(
                    task=task,
                    tools=agent.tools,
                    managed_agents=agent.managed_agents,
                    answer_facts=planning_step.model_output_message.content,
                ),
            ),
            "UPDATE_PLAN_SYSTEM_PROMPT": populate_template(
                agent.prompt_templates["planning"]["update_plan_pre_messages"], variables=dict(task=task)
            ),
            "UPDATE_PLAN_USER_PROMPT": populate_template(
                agent.prompt_templates["planning"]["update_plan_post_messages"],
                variables=dict(
                    task=task,
                    tools=agent.tools,
                    managed_agents=agent.managed_agents,
                    facts_update=planning_step.model_output_message.content,
                    remaining_steps=agent.max_steps - step,
                ),
            ),
        }
        def resolve(expected_content):
            # Return a copy of the expected content with its placeholder swapped for the rendered prompt.
            if "text" not in expected_content:
                return expected_content
            return {**expected_content, "text": expected_message_texts[expected_content["text"]]}
        def assert_messages_match(messages, expected_messages):
            # Compare roles and content entries of the actual messages against the expected ones.
            assert isinstance(messages, list)
            assert len(messages) == len(expected_messages)
            for message, expected_message in zip(messages, expected_messages):
                assert isinstance(message, ChatMessage)
                assert message.role in MessageRole.__members__.values()
                assert message.role == expected_message.role
                assert isinstance(message.content, list)
                for content, expected_content in zip(message.content, expected_message.content):
                    assert content == resolve(expected_content)
        assert isinstance(planning_step, PlanningStep)
        # Test messages stored on the planning step
        assert_messages_match(planning_step.model_input_messages, expected_messages_list[0])
        # Test calls to model
        assert len(fake_model.generate.call_args_list) == 1
        for call_args, expected_messages in zip(fake_model.generate.call_args_list, expected_messages_list):
            assert len(call_args.args) == 1
            assert_messages_match(call_args.args[0], expected_messages)
    @pytest.mark.parametrize(
        "images, expected_messages_list",
        [
            (
                None,
                [
                    [
                        ChatMessage(
                            role=MessageRole.SYSTEM,
                            content=[{"type": "text", "text": "FINAL_ANSWER_SYSTEM_PROMPT"}],
                        ),
                        ChatMessage(
                            role=MessageRole.USER,
                            content=[{"type": "text", "text": "FINAL_ANSWER_USER_PROMPT"}],
                        ),
                    ]
                ],
            ),
            (
                ["image1.png"],
                [
                    [
                        ChatMessage(
                            role=MessageRole.SYSTEM,
                            content=[
                                {"type": "text", "text": "FINAL_ANSWER_SYSTEM_PROMPT"},
                                {"type": "image", "image": "image1.png"},
                            ],
                        ),
                        ChatMessage(
                            role=MessageRole.USER,
                            content=[{"type": "text", "text": "FINAL_ANSWER_USER_PROMPT"}],
                        ),
                    ]
                ],
            ),
        ],
    )
    def test_provide_final_answer(self, images, expected_messages_list):
        """Check the answer returned by `provide_final_answer` and the messages it sends to the model.

        The parametrized `expected_messages_list` carries placeholder texts
        ("FINAL_ANSWER_SYSTEM_PROMPT", "FINAL_ANSWER_USER_PROMPT"); they are resolved to the real
        prompt texts at comparison time. The parametrized objects are never mutated, so the test
        keeps working if it is executed more than once in a process.
        """
        fake_model = MagicMock()
        fake_model.generate.return_value = ChatMessage(
            role=MessageRole.ASSISTANT,
            content="Final answer.",
            tool_calls=None,
            raw="Final answer.",
            token_usage=None,
        )
        agent = CodeAgent(
            tools=[],
            model=fake_model,
        )
        task = "Test task"
        final_answer = agent.provide_final_answer(task, images=images).content
        expected_message_texts = {
            "FINAL_ANSWER_SYSTEM_PROMPT": agent.prompt_templates["final_answer"]["pre_messages"],
            "FINAL_ANSWER_USER_PROMPT": populate_template(
                agent.prompt_templates["final_answer"]["post_messages"], variables=dict(task=task)
            ),
        }
        def resolve(expected_content):
            # Return a copy of a text entry with its placeholder swapped for the real prompt text;
            # non-text entries (e.g. images) are compared as they are.
            if "text" not in expected_content:
                return expected_content
            return {**expected_content, "text": expected_message_texts[expected_content["text"]]}
        assert final_answer == "Final answer."
        # Test calls to model
        assert len(fake_model.generate.call_args_list) == 1
        for call_args, expected_messages in zip(fake_model.generate.call_args_list, expected_messages_list):
            assert len(call_args.args) == 1
            messages = call_args.args[0]
            assert isinstance(messages, list)
            assert len(messages) == len(expected_messages)
            for message, expected_message in zip(messages, expected_messages):
                assert isinstance(message, ChatMessage)
                assert message.role in MessageRole.__members__.values()
                assert message.role == expected_message.role
                assert isinstance(message.content, list)
                for content, expected_content in zip(message.content, expected_message.content):
                    assert content == resolve(expected_content)
    def test_interrupt(self):
        """Calling `agent.interrupt()` from a step callback makes `run` raise an AgentError."""
        fake_model = MagicMock()
        fake_model.generate.return_value = ChatMessage(
            role=MessageRole.ASSISTANT,
            content="Model output.",
            tool_calls=None,
            raw="Model output.",
            token_usage=None,
        )
        def interrupt_callback(memory_step, agent):
            # Request the interruption as soon as a step callback fires.
            agent.interrupt()
        agent = CodeAgent(
            tools=[],
            model=fake_model,
            step_callbacks=[interrupt_callback],
        )
        with pytest.raises(AgentError) as exc_info:
            agent.run("Test task")
        # Check the message of the raised exception itself; `exc_info` is only pytest's wrapper
        # around it, and its string form is a repr rather than the error message.
        assert "Agent interrupted" in str(exc_info.value)
    @pytest.mark.parametrize(
        "tools, managed_agents, name, expectation",
        [
            # Valid case: no duplicates
            (
                [MockTool("tool1"), MockTool("tool2")],
                [MockAgent("agent1", [MockTool("tool3")])],
                "test_agent",
                does_not_raise(),
            ),
            # Invalid case: duplicate tool names
            ([MockTool("tool1"), MockTool("tool1")], [], "test_agent", pytest.raises(ValueError)),
            # Invalid case: tool name same as managed agent name
            (
                [MockTool("tool1")],
                [MockAgent("tool1", [MockTool("final_answer")])],
                "test_agent",
                pytest.raises(ValueError),
            ),
            # Valid case: tool name same as managed agent's tool name
            ([MockTool("tool1")], [MockAgent("agent1", [MockTool("tool1")])], "test_agent", does_not_raise()),
            # Invalid case: the agent's own name is the same as one of its tool names
            ([MockTool("tool1")], [], "tool1", pytest.raises(ValueError)),
            # Valid case: duplicate tool names across managed agents
            (
                [MockTool("tool1")],
                [
                    MockAgent("agent1", [MockTool("tool2"), MockTool("final_answer")]),
                    MockAgent("agent2", [MockTool("tool2"), MockTool("final_answer")]),
                ],
                "test_agent",
                does_not_raise(),
            ),
        ],
    )
    def test_validate_tools_and_managed_agents(self, tools, managed_agents, name, expectation):
        """Constructing the agent inside `expectation` either succeeds or raises, as the case demands."""
        init_kwargs = dict(tools=tools, model=MagicMock(), name=name, managed_agents=managed_agents)
        with expectation:
            DummyMultiStepAgent(**init_kwargs)
    def test_from_dict(self):
        """An agent built with `from_dict` carries the dictionary's settings, model and tool; kwargs override them."""
        tool_name = "valid_tool_function"
        tool_code = 'from smolagents import Tool\nfrom typing import Any, Optional\n\nclass SimpleTool(Tool):\n    name = "valid_tool_function"\n    description = "A valid tool function."\n    inputs = {"input":{"type":"string","description":"Input string."}}\n    output_type = "string"\n\n    def forward(self, input: str) -> str:\n        """A valid tool function.\n\n        Args:\n            input (str): Input string.\n        """\n        return input.upper()'
        # Dictionary form of the agent under test
        agent_dict = {
            "model": {"class": "TransformersModel", "data": {"model_id": "test/model"}},
            "tools": [{"name": tool_name, "code": tool_code, "requirements": {"smolagents"}}],
            "managed_agents": {},
            "prompt_templates": EMPTY_PROMPT_TEMPLATES,
            "max_steps": 15,
            "verbosity_level": 2,
            "planning_interval": 3,
            "name": "test_agent",
            "description": "Test agent description",
        }
        # Build the agent with the model class replaced by a mock
        with patch("smolagents.models.TransformersModel") as mock_model_class:
            agent = DummyMultiStepAgent.from_dict(agent_dict)
        mock_model_instance = mock_model_class.from_dict.return_value
        # Model and scalar settings
        assert agent.model == mock_model_instance
        assert mock_model_class.from_dict.call_args.args[0] == {"model_id": "test/model"}
        assert agent.max_steps == 15
        assert agent.logger.level == 2
        assert agent.planning_interval == 3
        assert agent.name == "test_agent"
        assert agent.description == "Test agent description"
        # Tool rebuilt from its source code
        assert sorted(agent.tools) == ["final_answer", "valid_tool_function"]
        rebuilt_tool = agent.tools[tool_name]
        assert rebuilt_tool.name == "valid_tool_function"
        assert rebuilt_tool.description == "A valid tool function."
        assert rebuilt_tool.inputs == {"input": {"type": "string", "description": "Input string."}}
        assert rebuilt_tool("test") == "TEST"
        # Keyword arguments take precedence over the dictionary
        with patch("smolagents.models.TransformersModel"):
            overridden_agent = DummyMultiStepAgent.from_dict(agent_dict, max_steps=30)
        assert overridden_agent.max_steps == 30
class TestToolCallingAgent:
    def test_toolcalling_agent_instructions(self):
        """Custom instructions are kept on the agent and appear in its system prompt."""
        instructions = "Test instructions"
        agent = ToolCallingAgent(tools=[], model=MagicMock(), instructions=instructions)
        assert agent.instructions == instructions
        assert instructions in agent.system_prompt
    def test_toolcalling_agent_passes_both_tools_and_managed_agents(self, test_tool):
        """Test that both tools and managed agents are passed to the model."""
        tool_call = ChatMessageToolCall(
            id="call_0",
            type="function",
            function=ChatMessageToolCallFunction(name="test_tool", arguments={"input": "test_value"}),
        )
        managed_agent = MagicMock()
        managed_agent.name = "managed_agent"
        model = MagicMock()
        model.generate.return_value = ChatMessage(role=MessageRole.ASSISTANT, content="", tool_calls=[tool_call])
        agent = ToolCallingAgent(tools=[test_tool], managed_agents=[managed_agent], model=model)
        # Pull a single item from the streamed run so that the model gets called
        next(agent.run("Test task", stream=True))
        # Names of everything offered to the model through `tools_to_call_from`
        offered = model.generate.call_args.kwargs["tools_to_call_from"]
        offered_names = [candidate.name for candidate in offered]
        # The regular tool, the managed agent and the default final_answer tool must all be there
        for expected_name in ("test_tool", "managed_agent", "final_answer"):
            assert expected_name in offered_names
    @patch("huggingface_hub.InferenceClient")
    def test_toolcalling_agent_api(self, mock_inference_client):
        """A tool call is executed whether the model sends it as JSON text content or as a structured tool call."""
        mock_response = mock_inference_client.return_value.chat_completion.return_value
        mock_response.usage.prompt_tokens = 10
        mock_response.usage.completion_tokens = 20
        # First reply: the tool call is written as a JSON blob in the message content
        mock_response.choices[0].message = ChatCompletionOutputMessage(
            role=MessageRole.ASSISTANT,
            content='{"name": "weather_api", "arguments": {"location": "Paris", "date": "today"}}',
        )
        model = InferenceClientModel(model_id="test-model")
        from smolagents import tool
        @tool
        def weather_api(location: str, date: str) -> str:
            """
            Gets the weather in the next days at given location.
            Args:
                location: the location
                date: the date
            """
            return f"The weather in {location} on date:{date} is sunny."
        task = "What's the weather in Paris?"
        agent = ToolCallingAgent(model=model, tools=[weather_api], max_steps=1)
        def assert_weather_call_recorded():
            # The first memory step holds the task, the second one the executed tool call.
            steps = agent.memory.steps
            assert steps[0].task == "What's the weather in Paris?"
            recorded_call = steps[1].tool_calls[0]
            assert recorded_call.name == "weather_api"
            assert recorded_call.arguments == {"location": "Paris", "date": "today"}
            assert steps[1].observations == "The weather in Paris on date:today is sunny."
        agent.run(task)
        assert_weather_call_recorded()
        # Second reply: the same call, this time as a structured tool call with no text content
        structured_call = ChatCompletionOutputToolCall(
            function=ChatCompletionOutputFunctionDefinition(
                name="weather_api", arguments='{"location": "Paris", "date": "today"}'
            ),
            id="call_0",
            type="function",
        )
        mock_response.choices[0].message = ChatCompletionOutputMessage(
            role=MessageRole.ASSISTANT,
            content=None,
            tool_calls=[structured_call],
        )
        agent.run(task)
        assert_weather_call_recorded()
    @patch("openai.OpenAI")
    def test_toolcalling_agent_stream_outputs_multiple_tool_calls(self, mock_openai_client, test_tool):
        """Test that ToolCallingAgent with stream_outputs=True returns the final answer when the model streams
        a `final_answer` call followed by a `test_tool` call in the same message."""
        mock_client = mock_openai_client.return_value
        from smolagents import OpenAIServerModel
        def opening_delta(index, call_id, tool_name):
            # First delta of a streamed tool call: carries the call id and the function name.
            return ChoiceDelta(
                tool_calls=[
                    ChoiceDeltaToolCall(
                        index=index,
                        id=call_id,
                        function=ChoiceDeltaToolCallFunction(name=tool_name),
                        type="function",
                    )
                ]
            )
        def argument_delta(index, fragment):
            # Follow-up delta: carries one fragment of the JSON-encoded arguments.
            return ChoiceDelta(
                tool_calls=[ChoiceDeltaToolCall(index=index, function=ChoiceDeltaToolCallFunction(arguments=fragment))]
            )
        # Mock streaming response: call 0 is final_answer(answer="output1"), call 1 is test_tool(input="output2"),
        # each with its arguments split over several deltas.
        mock_deltas = [
            ChoiceDelta(role=MessageRole.ASSISTANT),
            opening_delta(0, "call_1", "final_answer"),
            *[argument_delta(0, fragment) for fragment in ('{"an', 'swer"', ': "out', "put1", '"}')],
            opening_delta(1, "call_2", "test_tool"),
            *[argument_delta(1, fragment) for fragment in ('{"in', 'put"', ': "out', "put2", '"}')],
        ]
        class MockChoice:
            def __init__(self, delta):
                self.delta = delta
        class MockChunk:
            def __init__(self, delta):
                self.choices = [MockChoice(delta)]
                self.usage = None
        mock_client.chat.completions.create.return_value = (MockChunk(delta) for delta in mock_deltas)
        model = OpenAIServerModel(model_id="fakemodel")
        agent = ToolCallingAgent(model=model, tools=[test_tool], max_steps=1, stream_outputs=True)
        result = agent.run("Make 2 calls to final answer: return both 'output1' and 'output2'")
        streamed_calls = agent.memory.steps[-1].model_output_message.tool_calls
        assert len(streamed_calls) == 2
        assert streamed_calls[0].function.name == "final_answer"
        assert streamed_calls[1].function.name == "test_tool"
        # The agent should return the final answer call
        assert result == "output1"
    @patch("huggingface_hub.InferenceClient")
    def test_toolcalling_agent_api_misformatted_output(self, mock_inference_client):
        """Test that even misformatted json blobs don't interrupt the run for a ToolCallingAgent."""
        mock_response = mock_inference_client.return_value.chat_completion.return_value
        mock_response.usage.prompt_tokens = 10
        mock_response.usage.completion_tokens = 20
        # The opening quote before weather_api is missing, so this content is not valid JSON
        mock_response.choices[0].message = ChatCompletionOutputMessage(
            role=MessageRole.ASSISTANT,
            content='{"name": weather_api", "arguments": {"location": "Paris", "date": "today"}}',
        )
        model = InferenceClientModel(model_id="test-model")
        plain_console = Console(markup=False, no_color=True)
        agent = ToolCallingAgent(
            model=model,
            tools=[],
            max_steps=2,
            verbosity_level=1,
            logger=AgentLogger(console=plain_console),
        )
        task = "What's the weather in Paris?"
        with agent.logger.console.capture() as capture:
            agent.run(task)
        steps = agent.memory.steps
        assert steps[0].task == task
        failed_step = steps[1]
        assert failed_step.tool_calls is None
        assert "The JSON blob you used is invalid" in failed_step.error.message
        assert "Error while parsing" in capture.get()
        assert len(steps) == 4
    def test_change_tools_after_init(self):
        """Entries replaced in `agent.tools` after construction are the ones used by the run."""
        from smolagents import tool
        @tool
        def fake_tool_1() -> str:
            """Fake tool"""
            return "1"
        @tool
        def fake_tool_2() -> str:
            """Fake tool"""
            return "2"
        class FakeCodeModel(Model):
            def generate(self, messages, stop_sequences=None):
                code_reply = "\nfinal_answer(fake_tool_1())\n"
                return ChatMessage(role=MessageRole.ASSISTANT, content=code_reply)
        agent = CodeAgent(tools=[fake_tool_1], model=FakeCodeModel())
        # Swap both entries: the key "fake_tool_1" now points at fake_tool_2,
        # and the final answer goes through the custom tool.
        agent.tools["fake_tool_1"] = fake_tool_2
        agent.tools["final_answer"] = CustomFinalAnswerTool()
        assert agent.run("Fake task.") == "2CUSTOM"
    def test_custom_final_answer_with_custom_inputs(self, test_tool):
        """A final-answer tool declaring its own inputs receives the arguments of the `final_answer` call."""
        class CustomFinalAnswerToolWithCustomInputs(FinalAnswerTool):
            inputs = {
                "answer1": {"type": "string", "description": "First part of the answer."},
                "answer2": {"type": "string", "description": "Second part of the answer."},
            }
            def forward(self, answer1: str, answer2: str) -> str:
                return answer1 + " and " + answer2
        final_answer_call = ChatMessageToolCall(
            id="call_0",
            type="function",
            function=ChatMessageToolCallFunction(name="final_answer", arguments={"answer1": "1", "answer2": "2"}),
        )
        test_tool_call = ChatMessageToolCall(
            id="call_1",
            type="function",
            function=ChatMessageToolCallFunction(name="test_tool", arguments={"input": "3"}),
        )
        model = MagicMock()
        model.generate.return_value = ChatMessage(
            role=MessageRole.ASSISTANT,
            content=None,
            tool_calls=[final_answer_call, test_tool_call],
        )
        agent = ToolCallingAgent(tools=[test_tool, CustomFinalAnswerToolWithCustomInputs()], model=model)
        answer = agent.run("Fake task.")
        assert answer == "1 and 2"
        # Both calls of the model message are kept, in their original order
        recorded_calls = agent.memory.steps[-1].model_output_message.tool_calls
        assert recorded_calls[0].function.name == "final_answer"
        assert recorded_calls[1].function.name == "test_tool"
    @pytest.mark.parametrize(
        "test_case",
        [
            # Case 0: Single valid tool call
            {
                "tool_calls": [
                    ChatMessageToolCall(
                        id="call_1",
                        type="function",
                        function=ChatMessageToolCallFunction(name="test_tool", arguments={"input": "test_value"}),
                    )
                ],
                "expected_model_output": "Tool call call_1: calling 'test_tool' with arguments: {'input': 'test_value'}",
                "expected_observations": "Processed: test_value",
                "expected_final_outputs": ["Processed: test_value"],
                "expected_error": None,
            },
            # Case 1: Multiple tool calls
            {
                "tool_calls": [
                    ChatMessageToolCall(
                        id="call_1",
                        type="function",
                        function=ChatMessageToolCallFunction(name="test_tool", arguments={"input": "value1"}),
                    ),
                    ChatMessageToolCall(
                        id="call_2",
                        type="function",
                        function=ChatMessageToolCallFunction(name="test_tool", arguments={"input": "value2"}),
                    ),
                ],
                "expected_model_output": "Tool call call_1: calling 'test_tool' with arguments: {'input': 'value1'}\nTool call call_2: calling 'test_tool' with arguments: {'input': 'value2'}",
                "expected_observations": "Processed: value1\nProcessed: value2",
                "expected_final_outputs": ["Processed: value1", "Processed: value2"],
                "expected_error": None,
            },
            # Case 2: Invalid tool name
            {
                "tool_calls": [
                    ChatMessageToolCall(
                        id="call_1",
                        type="function",
                        function=ChatMessageToolCallFunction(name="nonexistent_tool", arguments={"input": "test"}),
                    )
                ],
                "expected_error": AgentToolExecutionError,
            },
            # Case 3: Tool execution error
            {
                "tool_calls": [
                    ChatMessageToolCall(
                        id="call_1",
                        type="function",
                        function=ChatMessageToolCallFunction(name="test_tool", arguments={"input": "error"}),
                    )
                ],
                "expected_error": AgentToolExecutionError,
            },
            # Case 4: Empty tool calls list
            {
                "tool_calls": [],
                "expected_model_output": "",
                "expected_observations": "",
                "expected_final_outputs": [],
                "expected_error": None,
            },
            # Case 5: Final answer call
            {
                "tool_calls": [
                    ChatMessageToolCall(
                        id="call_1",
                        type="function",
                        function=ChatMessageToolCallFunction(
                            name="final_answer", arguments={"answer": "This is the final answer"}
                        ),
                    )
                ],
                "expected_model_output": "Tool call call_1: calling 'final_answer' with arguments: {'answer': 'This is the final answer'}",
                "expected_observations": "This is the final answer",
                "expected_final_outputs": ["This is the final answer"],
                "expected_error": None,
            },
            # Case 6: Invalid arguments
            {
                "tool_calls": [
                    ChatMessageToolCall(
                        id="call_1",
                        type="function",
                        function=ChatMessageToolCallFunction(name="test_tool", arguments={"wrong_param": "value"}),
                    )
                ],
                "expected_error": AgentToolCallError,
            },
        ],
    )
    def test_process_tool_calls(self, test_case, test_tool):
        """Feed the case's tool calls to `process_tool_calls` and check the raised error, or the outputs and memory."""
        tool_calls = test_case["tool_calls"]
        expected_error = test_case["expected_error"]
        # Agent, model message and memory step handed to process_tool_calls
        agent = ToolCallingAgent(tools=[test_tool], model=MagicMock())
        chat_message = ChatMessage(role=MessageRole.ASSISTANT, content="", tool_calls=tool_calls)
        memory_step = ActionStep(step_number=10, timing="mock_timing")
        if expected_error:
            # Error cases: consuming the generator must raise
            with pytest.raises(expected_error):
                list(agent.process_tool_calls(chat_message, memory_step))
            return
        events = list(agent.process_tool_calls(chat_message, memory_step))
        assert memory_step.model_output == test_case["expected_model_output"]
        assert memory_step.observations == test_case["expected_observations"]
        # Only ToolOutput items carry an output to compare
        tool_outputs = [event.output for event in events if isinstance(event, ToolOutput)]
        assert tool_outputs == test_case["expected_final_outputs"]
        # Verify memory step tool calls were updated correctly
        if tool_calls:
            expected_calls = [
                ToolCall(name=call.function.name, arguments=call.function.arguments, id=call.id)
                for call in tool_calls
            ]
            assert memory_step.tool_calls == expected_calls
class TestCodeAgent:
    """Tests for ``CodeAgent``: constructor options, error reporting, code-tag handling and (de)serialization."""

    def test_code_agent_instructions(self):
        """Custom instructions are kept on the agent and appear in the system prompt, with or without structured outputs."""
        agent = CodeAgent(tools=[], model=MagicMock(), instructions="Test instructions")
        assert agent.instructions == "Test instructions"
        assert "Test instructions" in agent.system_prompt
        agent = CodeAgent(
            tools=[], model=MagicMock(), instructions="Test instructions", use_structured_outputs_internally=True
        )
        assert agent.instructions == "Test instructions"
        assert "Test instructions" in agent.system_prompt

    @pytest.mark.filterwarnings("ignore")  # Ignore FutureWarning for deprecated grammar parameter
    def test_init_with_incompatible_grammar_and_use_structured_outputs_internally(self):
        """``grammar`` together with ``use_structured_outputs_internally=True`` raises ``ValueError``; each alone is accepted."""
        # Test that using both parameters raises ValueError with correct message
        with pytest.raises(
            ValueError, match="You cannot use 'grammar' and 'use_structured_outputs_internally' at the same time."
        ):
            CodeAgent(
                tools=[],
                model=MagicMock(),
                grammar={"format": "json"},
                use_structured_outputs_internally=True,
                verbosity_level=LogLevel.DEBUG,
            )
        # Verify no error when only one option is used
        # Only grammar
        agent_with_grammar = CodeAgent(
            tools=[],
            model=MagicMock(),
            grammar={"format": "json"},
            use_structured_outputs_internally=False,
            verbosity_level=LogLevel.DEBUG,
        )
        assert agent_with_grammar.grammar is not None
        assert agent_with_grammar._use_structured_outputs_internally is False
        # Only structured output
        agent_with_structured = CodeAgent(
            tools=[],
            model=MagicMock(),
            grammar=None,
            use_structured_outputs_internally=True,
            verbosity_level=LogLevel.DEBUG,
        )
        assert agent_with_structured.grammar is None
        assert agent_with_structured._use_structured_outputs_internally is True

    @pytest.mark.parametrize("provide_run_summary", [False, True])
    def test_call_with_provide_run_summary(self, provide_run_summary):
        """Calling the agent returns the final-answer text, followed by a work summary only when ``provide_run_summary`` is set."""
        agent = CodeAgent(tools=[], model=MagicMock(), provide_run_summary=provide_run_summary)
        assert agent.provide_run_summary is provide_run_summary
        agent.name = "test_agent"
        # Stub out the actual run and the memory dump so only the report formatting is exercised
        agent.run = MagicMock(return_value="Test output")
        agent.write_memory_to_messages = MagicMock(return_value=[{"content": "Test summary"}])
        result = agent("Test request")
        expected_summary = "Here is the final answer from your managed agent 'test_agent':\nTest output"
        if provide_run_summary:
            expected_summary += (
                "\n\nFor more detail, find below a summary of this agent's work:\n"
                "\n\nTest summary\n---\n"
            )
        assert result == expected_summary

    def test_errors_logging(self):
        """Running a model whose code fails leaves ``secret`` followed by a backslash in the captured console output."""

        class FakeCodeModel(Model):
            def generate(self, messages, stop_sequences=None):
                return ChatMessage(role=MessageRole.ASSISTANT, content="\nsecret=3;['1', '2'][secret]\n")

        agent = CodeAgent(tools=[], model=FakeCodeModel(), verbosity_level=1)
        with agent.logger.console.capture() as capture:
            agent.run("Test request")
        # repr() doubles each literal backslash, hence the four backslashes in this source literal
        assert "secret\\\\" in repr(capture.get())

    def test_missing_import_triggers_advice_in_error_log(self):
        """The captured log mentions ``additional_authorized_imports`` when run with ``FakeCodeModelImport``."""
        # Set explicit verbosity level to 1 to override the default verbosity level of -1 set in CI fixture
        agent = CodeAgent(tools=[], model=FakeCodeModelImport(), verbosity_level=1)
        with agent.logger.console.capture() as capture:
            agent.run("Count to 3")
        str_output = capture.get()
        # Newlines are removed first so console line wrapping cannot split the expected text
        assert "`additional_authorized_imports`" in str_output.replace("\n", "")

    def test_errors_show_offending_line_and_error(self):
        """The step error names the failing line, and ``ValueError`` appears in the stringified memory steps."""
        agent = CodeAgent(tools=[PythonInterpreterTool()], model=FakeCodeModelError())
        output = agent.run("What is 2 multiplied by 3.6452?")
        assert isinstance(output, AgentText)
        assert output == "got an error"
        assert "Code execution failed at line 'error_function()'" in str(agent.memory.steps[1].error)
        assert "ValueError" in str(agent.memory.steps)

    def test_error_saves_previous_print_outputs(self):
        """The failing step's observations still contain the ``Flag!`` text."""
        agent = CodeAgent(tools=[PythonInterpreterTool()], model=FakeCodeModelError(), verbosity_level=10)
        agent.run("What is 2 multiplied by 3.6452?")
        assert "Flag!" in str(agent.memory.steps[1].observations)

    def test_syntax_error_show_offending_lines(self):
        """A syntax error keeps the offending line in memory and stores the exact code action on the penultimate step."""
        agent = CodeAgent(tools=[PythonInterpreterTool()], model=FakeCodeModelSyntaxError())
        output = agent.run("What is 2 multiplied by 3.6452?")
        assert isinstance(output, AgentText)
        assert output == "got an error"
        assert '    print("Failing due to unexpected indent")' in str(agent.memory.steps)
        assert isinstance(agent.memory.steps[-2], ActionStep)
        assert agent.memory.steps[-2].code_action == dedent("""a = 2
b = a * 2
    print("Failing due to unexpected indent")
print("Ok, calculation done!")""")

    def test_end_code_appending(self):
        """The raw model output lacks the closing code tag, while the outputs stored in memory end with it."""
        # An empty suffix makes ``str.endswith`` unconditionally True, which would turn the first
        # check into a guaranteed failure and the later ones into no-ops; compare against the tag.
        # NOTE(review): "</code>" is assumed to be the closing code-block tag the agent appends - confirm.
        end_code_tag = "</code>"
        # Checking original output message
        orig_output = FakeCodeModelNoReturn().generate([])
        assert not orig_output.content.endswith(end_code_tag)
        # Checking the step output
        agent = CodeAgent(
            tools=[PythonInterpreterTool()],
            model=FakeCodeModelNoReturn(),
            max_steps=1,
        )
        answer = agent.run("What is 2 multiplied by 3.6452?")
        assert answer
        memory_steps = agent.memory.steps
        actions_steps = [s for s in memory_steps if isinstance(s, ActionStep)]
        outputs = [s.model_output for s in actions_steps if s.model_output]
        assert outputs
        assert all(o.endswith(end_code_tag) for o in outputs)
        messages = [s.model_output_message for s in actions_steps if s.model_output_message]
        assert messages
        assert all(m.content.endswith(end_code_tag) for m in messages)

    def test_change_tools_after_init(self):
        """Tools swapped in ``agent.tools`` after construction are the ones used during the run."""
        from smolagents import tool

        @tool
        def fake_tool_1() -> str:
            """Fake tool"""
            return "1"

        @tool
        def fake_tool_2() -> str:
            """Fake tool"""
            return "2"

        class FakeCodeModel(Model):
            def generate(self, messages, stop_sequences=None):
                return ChatMessage(role=MessageRole.ASSISTANT, content="\nfinal_answer(fake_tool_1())\n")

        agent = CodeAgent(tools=[fake_tool_1], model=FakeCodeModel())
        # Replace both the final-answer tool and the tool registered under "fake_tool_1"
        agent.tools["final_answer"] = CustomFinalAnswerTool()
        agent.tools["fake_tool_1"] = fake_tool_2
        answer = agent.run("Fake task.")
        assert answer == "2CUSTOM"

    def test_local_python_executor_with_custom_functions(self):
        """Functions given via ``executor_kwargs['additional_functions']`` end up in the executor's ``static_tools``."""
        model = MagicMock()
        model.generate.return_value = ChatMessage(
            role=MessageRole.ASSISTANT,
            content="",
            tool_calls=None,
            raw="",
            token_usage=None,
        )
        agent = CodeAgent(tools=[], model=model, executor_kwargs={"additional_functions": {"open": open}})
        agent.run("Test run")
        assert "open" in agent.python_executor.static_tools

    @pytest.mark.parametrize("agent_dict_version", ["v1.9", "v1.10"])
    def test_from_folder(self, agent_dict_version, get_agent_dict):
        """``CodeAgent.from_folder`` rebuilds an agent from the JSON produced by the ``get_agent_dict`` fixture."""
        import json

        agent_dict = get_agent_dict(agent_dict_version)
        with (
            patch("smolagents.agents.Path") as mock_path,
            patch("smolagents.models.InferenceClientModel") as mock_model,
        ):
            # Any file read through the patched Path yields the serialized agent dict
            mock_path.return_value.__truediv__.return_value.read_text.return_value = json.dumps(agent_dict)
            mock_model.from_dict.return_value.model_id = "Qwen/Qwen2.5-Coder-32B-Instruct"
            agent = CodeAgent.from_folder("ignored_dummy_folder")
        assert isinstance(agent, CodeAgent)
        assert agent.name == "test_agent"
        assert agent.description == "dummy description"
        assert agent.max_steps == 10
        assert agent.planning_interval == 2
        assert agent.additional_authorized_imports == ["pandas"]
        assert "pandas" in agent.authorized_imports
        assert agent.executor_type == "local"
        assert agent.executor_kwargs == {}
        assert agent.max_print_outputs_length is None
        assert agent.managed_agents == {}
        assert set(agent.tools.keys()) == {"final_answer"}
        assert agent.model == mock_model.from_dict.return_value
        assert mock_model.from_dict.call_args.args[0]["model_id"] == "Qwen/Qwen2.5-Coder-32B-Instruct"
        assert agent.model.model_id == "Qwen/Qwen2.5-Coder-32B-Instruct"
        assert agent.logger.level == 2
        assert agent.prompt_templates["system_prompt"] == "dummy system prompt"

    def test_from_dict(self):
        """``CodeAgent.from_dict`` honours CodeAgent-specific keys, falls back to defaults and lets kwargs override the dict."""
        # Create a test agent dictionary
        agent_dict = {
            "model": {"class": "InferenceClientModel", "data": {"model_id": "Qwen/Qwen2.5-Coder-32B-Instruct"}},
            "tools": [
                {
                    "name": "valid_tool_function",
                    "code": 'from smolagents import Tool\nfrom typing import Any, Optional\n\nclass SimpleTool(Tool):\n    name = "valid_tool_function"\n    description = "A valid tool function."\n    inputs = {"input":{"type":"string","description":"Input string."}}\n    output_type = "string"\n\n    def forward(self, input: str) -> str:\n        """A valid tool function.\n\n        Args:\n            input (str): Input string.\n        """\n        return input.upper()',
                    "requirements": {"smolagents"},
                }
            ],
            "managed_agents": {},
            "prompt_templates": EMPTY_PROMPT_TEMPLATES,
            "max_steps": 15,
            "verbosity_level": 2,
            "use_structured_output": False,
            "planning_interval": 3,
            "name": "test_code_agent",
            "description": "Test code agent description",
            "authorized_imports": ["pandas", "numpy"],
            "executor_type": "local",
            "executor_kwargs": {"max_print_outputs_length": 10_000},
            "max_print_outputs_length": 1000,
        }
        # Call from_dict
        with patch("smolagents.models.InferenceClientModel") as mock_model_class:
            mock_model_instance = mock_model_class.from_dict.return_value
            agent = CodeAgent.from_dict(agent_dict)
        # Verify the agent was created correctly with CodeAgent-specific parameters
        assert agent.model == mock_model_instance
        assert agent.additional_authorized_imports == ["pandas", "numpy"]
        assert agent.executor_type == "local"
        assert agent.executor_kwargs == {"max_print_outputs_length": 10_000}
        assert agent.max_print_outputs_length == 1000
        # Test with missing optional parameters
        minimal_agent_dict = {
            "model": {"class": "InferenceClientModel", "data": {"model_id": "Qwen/Qwen2.5-Coder-32B-Instruct"}},
            "tools": [],
            "managed_agents": {},
        }
        with patch("smolagents.models.InferenceClientModel"):
            agent = CodeAgent.from_dict(minimal_agent_dict)
        # Verify defaults are used
        assert agent.max_steps == 20  # default from MultiStepAgent.__init__
        # Test overriding with kwargs
        with patch("smolagents.models.InferenceClientModel"):
            agent = CodeAgent.from_dict(
                agent_dict,
                additional_authorized_imports=["matplotlib"],
                executor_kwargs={"max_print_outputs_length": 5_000},
            )
        assert agent.additional_authorized_imports == ["matplotlib"]
        assert agent.executor_kwargs == {"max_print_outputs_length": 5_000}

    def test_custom_final_answer_with_custom_inputs(self):
        """A ``FinalAnswerTool`` subclass with two named inputs receives both keyword arguments from generated code."""

        class CustomFinalAnswerToolWithCustomInputs(FinalAnswerTool):
            inputs = {
                "answer1": {"type": "string", "description": "First part of the answer."},
                "answer2": {"type": "string", "description": "Second part of the answer."},
            }

            def forward(self, answer1: str, answer2: str) -> str:
                return answer1 + "CUSTOM" + answer2

        model = MagicMock()
        model.generate.return_value = ChatMessage(
            role=MessageRole.ASSISTANT, content="\nfinal_answer(answer1='1', answer2='2')\n"
        )
        agent = CodeAgent(tools=[CustomFinalAnswerToolWithCustomInputs()], model=model)
        answer = agent.run("Fake task.")
        assert answer == "1CUSTOM2"
class TestMultiAgents:
    """Checks for a manager agent with managed agents: save/reload round trip and delegated runs."""

    def test_multiagents_save(self, tmp_path):
        """Save a manager with two managed agents, verify the on-disk layout, then reload and check its settings."""
        shared_model = InferenceClientModel(
            model_id="Qwen/Qwen2.5-Coder-32B-Instruct", max_tokens=2096, temperature=0.5
        )
        web_agent = ToolCallingAgent(
            model=shared_model,
            tools=[DuckDuckGoSearchTool(max_results=2), VisitWebpageTool()],
            name="web_agent",
            description="does web searches",
        )
        code_agent = CodeAgent(model=shared_model, tools=[], name="useless", description="does nothing in particular")
        manager = CodeAgent(
            model=shared_model,
            tools=[],
            additional_authorized_imports=["pandas", "datetime"],
            managed_agents=[web_agent, code_agent],
            max_print_outputs_length=1000,
            executor_type="local",
            executor_kwargs={"max_print_outputs_length": 10_000},
        )
        manager.save(tmp_path)

        # Expected layout: a "files" key lists files of the current directory, any other key is a sub-directory
        agent_files = ["agent.json", "prompts.yaml"]
        expected_structure = {
            "managed_agents": {
                "useless": {"tools": {"files": ["final_answer.py"]}, "files": list(agent_files)},
                "web_agent": {
                    "tools": {"files": ["final_answer.py", "visit_webpage.py", "web_search.py"]},
                    "files": list(agent_files),
                },
            },
            "tools": {"files": ["final_answer.py"]},
            "files": ["app.py", "requirements.txt", *agent_files],
        }

        def verify_structure(current_path: Path, structure: dict):
            """Recursively assert that ``structure`` exists under ``current_path``."""
            for entry_name, entry_contents in structure.items():
                if entry_name == "files":
                    # Plain files expected directly inside the current directory
                    for file_name in entry_contents:
                        file_path = current_path / file_name
                        assert file_path.exists(), f"File {file_path} does not exist"
                        assert file_path.is_file(), f"{file_path} is not a file"
                    continue
                # Any other key names a sub-directory to descend into
                dir_path = current_path / entry_name
                assert dir_path.exists(), f"Directory {dir_path} does not exist"
                assert dir_path.is_dir(), f"{dir_path} is not a directory"
                verify_structure(dir_path, entry_contents)

        verify_structure(tmp_path, expected_structure)

        # Test that re-loaded agents work as expected.
        reloaded = CodeAgent.from_folder(tmp_path, planning_interval=5)
        assert reloaded.planning_interval == 5  # Check that kwargs are used
        assert set(reloaded.authorized_imports) == {"pandas", "datetime", *BASE_BUILTIN_MODULES}
        assert reloaded.max_print_outputs_length == 1000
        assert reloaded.executor_type == "local"
        assert reloaded.executor_kwargs == {"max_print_outputs_length": 10_000}
        reloaded_search_tool = reloaded.managed_agents["web_agent"].tools["web_search"]
        assert reloaded_search_tool.max_results == 10  # For now tool init parameters are forgotten
        assert reloaded.model.kwargs["temperature"] == pytest.approx(0.5)

    def test_multiagents(self):
        """Run a code manager and a tool-calling manager that delegate to a managed agent, then visualize the tree."""

        def _tool_call_message(content, tool_name, arguments):
            # Assistant message carrying a single function tool call with the fixed id "call_0"
            return ChatMessage(
                role=MessageRole.ASSISTANT,
                content=content,
                tool_calls=[
                    ChatMessageToolCall(
                        id="call_0",
                        type="function",
                        function=ChatMessageToolCallFunction(name=tool_name, arguments=arguments),
                    )
                ],
            )

        class FakeModelMultiagentsManagerAgent(Model):
            model_id = "fake_model"

            def generate(
                self,
                messages,
                stop_sequences=None,
                tools_to_call_from=None,
            ):
                wants_tool_calls = tools_to_call_from is not None
                # Short history: delegate the question to the managed search agent
                if len(messages) < 3:
                    if wants_tool_calls:
                        return _tool_call_message("", "search_agent", "Who is the current US president?")
                    return ChatMessage(
                        role=MessageRole.ASSISTANT,
                        content="""
Thought: Let's call our search agent.
result = search_agent("Who is the current US president?")
""",
                    )
                # Longer history: the managed agent's report must be present before answering
                assert "Report on the current US president" in str(messages)
                if wants_tool_calls:
                    return _tool_call_message("", "final_answer", "Final report.")
                return ChatMessage(
                    role=MessageRole.ASSISTANT,
                    content="""
Thought: Let's return the report.
final_answer("Final report.")
""",
                )

        manager_model = FakeModelMultiagentsManagerAgent()

        class FakeModelMultiagentsManagedAgent(Model):
            model_id = "fake_model"

            def generate(
                self,
                messages,
                tools_to_call_from=None,
                stop_sequences=None,
            ):
                return _tool_call_message(
                    "Here is the secret content: FLAG1",
                    "final_answer",
                    "Report on the current US president",
                )

        managed_model = FakeModelMultiagentsManagedAgent()
        web_agent = ToolCallingAgent(
            tools=[],
            model=managed_model,
            max_steps=10,
            name="search_agent",
            description="Runs web searches for you. Give it your request as an argument. Make the request as detailed as needed, you can ask for thorough reports",
            verbosity_level=2,
        )

        # Manager written as a CodeAgent
        manager_code_agent = CodeAgent(
            tools=[],
            model=manager_model,
            managed_agents=[web_agent],
            additional_authorized_imports=["time", "numpy", "pandas"],
        )
        code_manager_report = manager_code_agent.run("Fake question.")
        assert code_manager_report == "Final report."

        # Manager written as a ToolCallingAgent; capture the managed agent's console while it runs
        manager_toolcalling_agent = ToolCallingAgent(
            tools=[],
            model=manager_model,
            managed_agents=[web_agent],
        )
        with web_agent.logger.console.capture() as managed_capture:
            toolcalling_manager_report = manager_toolcalling_agent.run("Fake question.")
        assert toolcalling_manager_report == "Final report."
        assert "FLAG1" in managed_capture.get()  # Check that managed agent's output is properly logged

        # Test that visualization works
        with manager_toolcalling_agent.logger.console.capture() as tree_capture:
            manager_toolcalling_agent.visualize()
        assert "├──" in tree_capture.get()
@pytest.fixture
def prompt_templates():
    """Return a small prompt-template mapping with system, managed-agent, planning and final-answer entries."""
    managed_agent_templates = {
        "task": "Task for {{name}}: {{task}}",
        "report": "Report for {{name}}: {{final_answer}}",
    }
    planning_templates = {
        "initial_plan": "The plan.",
        "update_plan_pre_messages": "custom",
        "update_plan_post_messages": "custom",
    }
    final_answer_templates = {"pre_messages": "custom", "post_messages": "custom"}
    templates = {"system_prompt": "This is a test system prompt."}
    templates["managed_agent"] = managed_agent_templates
    templates["planning"] = planning_templates
    templates["final_answer"] = final_answer_templates
    return templates
@pytest.mark.parametrize("arguments", [{}, {"arg": "bar"}, {None: None}, [1, 2, 3]])
def test_tool_calling_agents_raises_tool_call_error_being_invoked_with_wrong_arguments(arguments):
    """``execute_tool_call`` raises ``AgentToolCallError`` for each parametrized ``arguments`` value."""

    # The decorated function is kept exactly as written: its docstring is input to the @tool decorator.
    @tool
    def _sample_tool(prompt: str) -> str:
        """Tool that returns same string
        Args:
            prompt: The string to return
        Returns:
            The same string
        """
        return prompt

    calling_agent = ToolCallingAgent(tools=[_sample_tool], model=FakeToolCallModel())
    tool_name = _sample_tool.name
    with pytest.raises(AgentToolCallError):
        calling_agent.execute_tool_call(tool_name, arguments)
def test_tool_calling_agents_raises_agent_execution_error_when_tool_raises():
    """``execute_tool_call`` raises ``AgentExecutionError`` when the tool body itself raises (here a division by zero)."""

    # The decorated function is kept exactly as written: its docstring is input to the @tool decorator.
    @tool
    def _sample_tool(_: str) -> float:
        """Tool that fails
        Args:
            _: The pointless string
        Returns:
            Some number
        """
        return 1 / 0

    calling_agent = ToolCallingAgent(tools=[_sample_tool], model=FakeToolCallModel())
    tool_name = _sample_tool.name
    with pytest.raises(AgentExecutionError):
        calling_agent.execute_tool_call(tool_name, "sample")