Spaces:
Build error
Build error
from dataclasses import dataclass, field | |
from openhands.core.schema import ObservationType | |
from openhands.events.event import RecallType | |
from openhands.events.observation.observation import Observation | |
class AgentStateChangedObservation(Observation): | |
"""This data class represents the result from delegating to another agent""" | |
agent_state: str | |
reason: str = '' | |
observation: str = ObservationType.AGENT_STATE_CHANGED | |
def message(self) -> str: | |
return '' | |
class AgentCondensationObservation(Observation): | |
"""The output of a condensation action.""" | |
observation: str = ObservationType.CONDENSE | |
def message(self) -> str: | |
return self.content | |
class AgentThinkObservation(Observation): | |
"""The output of a think action. | |
In practice, this is a no-op, since it will just reply a static message to the agent | |
acknowledging that the thought has been logged. | |
""" | |
observation: str = ObservationType.THINK | |
def message(self) -> str: | |
return self.content | |
class MicroagentKnowledge: | |
""" | |
Represents knowledge from a triggered microagent. | |
Attributes: | |
name: The name of the microagent that was triggered | |
trigger: The word that triggered this microagent | |
content: The actual content/knowledge from the microagent | |
""" | |
name: str | |
trigger: str | |
content: str | |
class RecallObservation(Observation): | |
"""The retrieval of content from a microagent or more microagents.""" | |
recall_type: RecallType | |
observation: str = ObservationType.RECALL | |
# workspace context | |
repo_name: str = '' | |
repo_directory: str = '' | |
repo_instructions: str = '' | |
runtime_hosts: dict[str, int] = field(default_factory=dict) | |
additional_agent_instructions: str = '' | |
date: str = '' | |
custom_secrets_descriptions: dict[str, str] = field(default_factory=dict) | |
conversation_instructions: str = '' | |
# knowledge | |
microagent_knowledge: list[MicroagentKnowledge] = field(default_factory=list) | |
""" | |
A list of MicroagentKnowledge objects, each containing information from a triggered microagent. | |
Example: | |
[ | |
MicroagentKnowledge( | |
name="python_best_practices", | |
trigger="python", | |
content="Always use virtual environments for Python projects." | |
), | |
MicroagentKnowledge( | |
name="git_workflow", | |
trigger="git", | |
content="Create a new branch for each feature or bugfix." | |
) | |
] | |
""" | |
def message(self) -> str: | |
return ( | |
'Added workspace context' | |
if self.recall_type == RecallType.WORKSPACE_CONTEXT | |
else 'Added microagent knowledge' | |
) | |
def __str__(self) -> str: | |
# Build a string representation | |
fields = [] | |
if self.recall_type == RecallType.WORKSPACE_CONTEXT: | |
fields.extend( | |
[ | |
f'recall_type={self.recall_type}', | |
f'repo_name={self.repo_name}', | |
f'repo_instructions={self.repo_instructions[:20]}...', | |
f'runtime_hosts={self.runtime_hosts}', | |
f'additional_agent_instructions={self.additional_agent_instructions[:20]}...', | |
f'date={self.date}' | |
f'custom_secrets_descriptions={self.custom_secrets_descriptions}', | |
f'conversation_instructions={self.conversation_instructions[0:20]}...', | |
] | |
) | |
else: | |
fields.extend( | |
[ | |
f'recall_type={self.recall_type}', | |
] | |
) | |
if self.microagent_knowledge: | |
fields.extend( | |
[ | |
f'microagent_knowledge={", ".join([m.name for m in self.microagent_knowledge])}', | |
] | |
) | |
return f'**RecallObservation**\n{", ".join(fields)}' | |