|
import os
|
|
import tempfile
|
|
import threading
|
|
from abc import abstractmethod
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from zipfile import ZipFile
|
|
|
|
import requests
|
|
|
|
from openhands.core.config import AppConfig
|
|
from openhands.core.exceptions import (
|
|
AgentRuntimeTimeoutError,
|
|
)
|
|
from openhands.events import EventStream
|
|
from openhands.events.action import (
|
|
ActionConfirmationStatus,
|
|
BrowseInteractiveAction,
|
|
BrowseURLAction,
|
|
CmdRunAction,
|
|
FileEditAction,
|
|
FileReadAction,
|
|
FileWriteAction,
|
|
IPythonRunCellAction,
|
|
)
|
|
from openhands.events.action.action import Action
|
|
from openhands.events.observation import (
|
|
ErrorObservation,
|
|
NullObservation,
|
|
Observation,
|
|
UserRejectObservation,
|
|
)
|
|
from openhands.events.serialization import event_to_dict, observation_from_dict
|
|
from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
|
|
from openhands.runtime.base import Runtime
|
|
from openhands.runtime.plugins import PluginRequirement
|
|
from openhands.runtime.utils.request import send_request
|
|
from openhands.utils.http_session import HttpSession
|
|
|
|
|
|
class ActionExecutionClient(Runtime):
|
|
"""Base class for runtimes that interact with the action execution server.
|
|
|
|
This class contains shared logic between DockerRuntime and RemoteRuntime
|
|
for interacting with the HTTP server defined in action_execution_server.py.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
config: AppConfig,
|
|
event_stream: EventStream,
|
|
sid: str = 'default',
|
|
plugins: list[PluginRequirement] | None = None,
|
|
env_vars: dict[str, str] | None = None,
|
|
status_callback: Any | None = None,
|
|
attach_to_existing: bool = False,
|
|
headless_mode: bool = True,
|
|
):
|
|
self.session = HttpSession()
|
|
self.action_semaphore = threading.Semaphore(1)
|
|
self._runtime_initialized: bool = False
|
|
self._runtime_closed: bool = False
|
|
self._vscode_token: str | None = None
|
|
super().__init__(
|
|
config,
|
|
event_stream,
|
|
sid,
|
|
plugins,
|
|
env_vars,
|
|
status_callback,
|
|
attach_to_existing,
|
|
headless_mode,
|
|
)
|
|
|
|
@abstractmethod
|
|
def _get_action_execution_server_host(self) -> str:
|
|
pass
|
|
|
|
def _send_action_server_request(
|
|
self,
|
|
method: str,
|
|
url: str,
|
|
**kwargs,
|
|
) -> requests.Response:
|
|
"""Send a request to the action execution server.
|
|
|
|
Args:
|
|
method: HTTP method (GET, POST, etc.)
|
|
url: URL to send the request to
|
|
**kwargs: Additional arguments to pass to requests.request()
|
|
|
|
Returns:
|
|
Response from the server
|
|
|
|
Raises:
|
|
AgentRuntimeError: If the request fails
|
|
"""
|
|
return send_request(self.session, method, url, **kwargs)
|
|
|
|
def check_if_alive(self) -> None:
|
|
with self._send_action_server_request(
|
|
'GET',
|
|
f'{self._get_action_execution_server_host()}/alive',
|
|
timeout=5,
|
|
):
|
|
pass
|
|
|
|
def list_files(self, path: str | None = None) -> list[str]:
|
|
"""List files in the sandbox.
|
|
|
|
If path is None, list files in the sandbox's initial working directory (e.g., /workspace).
|
|
"""
|
|
|
|
try:
|
|
data = {}
|
|
if path is not None:
|
|
data['path'] = path
|
|
|
|
with self._send_action_server_request(
|
|
'POST',
|
|
f'{self._get_action_execution_server_host()}/list_files',
|
|
json=data,
|
|
timeout=10,
|
|
) as response:
|
|
response_json = response.json()
|
|
assert isinstance(response_json, list)
|
|
return response_json
|
|
except requests.Timeout:
|
|
raise TimeoutError('List files operation timed out')
|
|
|
|
def copy_from(self, path: str) -> Path:
|
|
"""Zip all files in the sandbox and return as a stream of bytes."""
|
|
|
|
try:
|
|
params = {'path': path}
|
|
with self._send_action_server_request(
|
|
'GET',
|
|
f'{self._get_action_execution_server_host()}/download_files',
|
|
params=params,
|
|
stream=True,
|
|
timeout=30,
|
|
) as response:
|
|
temp_file = tempfile.NamedTemporaryFile(delete=False)
|
|
for chunk in response.iter_content(chunk_size=8192):
|
|
if chunk:
|
|
temp_file.write(chunk)
|
|
return Path(temp_file.name)
|
|
except requests.Timeout:
|
|
raise TimeoutError('Copy operation timed out')
|
|
|
|
def copy_to(
|
|
self, host_src: str, sandbox_dest: str, recursive: bool = False
|
|
) -> None:
|
|
if not os.path.exists(host_src):
|
|
raise FileNotFoundError(f'Source file {host_src} does not exist')
|
|
|
|
try:
|
|
if recursive:
|
|
with tempfile.NamedTemporaryFile(
|
|
suffix='.zip', delete=False
|
|
) as temp_zip:
|
|
temp_zip_path = temp_zip.name
|
|
|
|
with ZipFile(temp_zip_path, 'w') as zipf:
|
|
for root, _, files in os.walk(host_src):
|
|
for file in files:
|
|
file_path = os.path.join(root, file)
|
|
arcname = os.path.relpath(
|
|
file_path, os.path.dirname(host_src)
|
|
)
|
|
zipf.write(file_path, arcname)
|
|
|
|
upload_data = {'file': open(temp_zip_path, 'rb')}
|
|
else:
|
|
upload_data = {'file': open(host_src, 'rb')}
|
|
|
|
params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()}
|
|
|
|
with self._send_action_server_request(
|
|
'POST',
|
|
f'{self._get_action_execution_server_host()}/upload_file',
|
|
files=upload_data,
|
|
params=params,
|
|
timeout=300,
|
|
) as response:
|
|
self.log(
|
|
'debug',
|
|
f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}. Response: {response.text}',
|
|
)
|
|
finally:
|
|
if recursive:
|
|
os.unlink(temp_zip_path)
|
|
self.log(
|
|
'debug', f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}'
|
|
)
|
|
|
|
def get_vscode_token(self) -> str:
|
|
if self.vscode_enabled and self._runtime_initialized:
|
|
if self._vscode_token is not None:
|
|
return self._vscode_token
|
|
with self._send_action_server_request(
|
|
'GET',
|
|
f'{self._get_action_execution_server_host()}/vscode/connection_token',
|
|
timeout=10,
|
|
) as response:
|
|
response_json = response.json()
|
|
assert isinstance(response_json, dict)
|
|
if response_json['token'] is None:
|
|
return ''
|
|
self._vscode_token = response_json['token']
|
|
return response_json['token']
|
|
else:
|
|
return ''
|
|
|
|
def send_action_for_execution(self, action: Action) -> Observation:
|
|
if isinstance(action, FileEditAction):
|
|
return self.edit(action)
|
|
|
|
|
|
if action.timeout is None:
|
|
|
|
action.set_hard_timeout(self.config.sandbox.timeout, blocking=False)
|
|
|
|
with self.action_semaphore:
|
|
if not action.runnable:
|
|
return NullObservation('')
|
|
if (
|
|
hasattr(action, 'confirmation_state')
|
|
and action.confirmation_state
|
|
== ActionConfirmationStatus.AWAITING_CONFIRMATION
|
|
):
|
|
return NullObservation('')
|
|
action_type = action.action
|
|
if action_type not in ACTION_TYPE_TO_CLASS:
|
|
raise ValueError(f'Action {action_type} does not exist.')
|
|
if not hasattr(self, action_type):
|
|
return ErrorObservation(
|
|
f'Action {action_type} is not supported in the current runtime.',
|
|
error_id='AGENT_ERROR$BAD_ACTION',
|
|
)
|
|
if (
|
|
getattr(action, 'confirmation_state', None)
|
|
== ActionConfirmationStatus.REJECTED
|
|
):
|
|
return UserRejectObservation(
|
|
'Action has been rejected by the user! Waiting for further user input.'
|
|
)
|
|
|
|
assert action.timeout is not None
|
|
|
|
try:
|
|
with self._send_action_server_request(
|
|
'POST',
|
|
f'{self._get_action_execution_server_host()}/execute_action',
|
|
json={'action': event_to_dict(action)},
|
|
|
|
timeout=action.timeout + 5,
|
|
) as response:
|
|
output = response.json()
|
|
obs = observation_from_dict(output)
|
|
obs._cause = action.id
|
|
except requests.Timeout:
|
|
raise AgentRuntimeTimeoutError(
|
|
f'Runtime failed to return execute_action before the requested timeout of {action.timeout}s'
|
|
)
|
|
return obs
|
|
|
|
def run(self, action: CmdRunAction) -> Observation:
|
|
return self.send_action_for_execution(action)
|
|
|
|
def run_ipython(self, action: IPythonRunCellAction) -> Observation:
|
|
return self.send_action_for_execution(action)
|
|
|
|
def read(self, action: FileReadAction) -> Observation:
|
|
return self.send_action_for_execution(action)
|
|
|
|
def write(self, action: FileWriteAction) -> Observation:
|
|
return self.send_action_for_execution(action)
|
|
|
|
def browse(self, action: BrowseURLAction) -> Observation:
|
|
return self.send_action_for_execution(action)
|
|
|
|
def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
|
|
return self.send_action_for_execution(action)
|
|
|
|
def close(self) -> None:
|
|
|
|
|
|
if self._runtime_closed:
|
|
return
|
|
self._runtime_closed = True
|
|
self.session.close()
|
|
|