# ar08's picture
# Upload 1040 files
# 246d201 verified
import copy
import os
import shlex
import tarfile
from glob import glob

from e2b import Sandbox as E2BSandbox
from e2b.exceptions import TimeoutException

from openhands.core.config import SandboxConfig
from openhands.core.logger import openhands_logger as logger
class E2BBox:
    """Thin wrapper around an E2B sandbox.

    Starts a sandbox on construction and exposes helpers to run shell
    commands in it (`execute`) and to copy local files or directories into
    it (`copy_to`).
    """

    # Set to True by `close()`.
    closed = False
    # Working directory passed to the sandbox; `copy_to` destinations are
    # resolved relative to it.
    _cwd: str = '/home/user'
    # Class-level default environment. `__init__` gives every instance its own
    # copy so that mutating one instance's env never leaks into another.
    _env: dict[str, str] = {}
    # NOTE(review): not read anywhere in this class; presumably consumed by
    # callers -- confirm before removing.
    is_initial_session: bool = True

    def __init__(
        self,
        config: SandboxConfig,
        e2b_api_key: str,
        template: str = 'openhands',
    ):
        """Start an E2B sandbox.

        Args:
            config: Sandbox configuration; a deep copy is stored on the instance.
            e2b_api_key: API key handed to the E2B SDK.
            template: Name of the E2B template to start the sandbox from.
        """
        self.config = copy.deepcopy(config)
        self.initialize_plugins: bool = config.initialize_plugins
        # Per-instance copy of the class-level default (see `_env` above).
        self._env = dict(self._env)
        self.sandbox = E2BSandbox(
            api_key=e2b_api_key,
            template=template,
            # It's possible to stream stdout and stderr from sandbox and from each process
            on_stderr=lambda x: logger.debug(f'E2B sandbox stderr: {x}'),
            on_stdout=lambda x: logger.debug(f'E2B sandbox stdout: {x}'),
            cwd=self._cwd,  # Default workdir inside sandbox
        )
        logger.debug(f'Started E2B sandbox with ID "{self.sandbox.id}"')

    @property
    def filesystem(self):
        """Return the sandbox's `filesystem` object."""
        return self.sandbox.filesystem

    def _archive(self, host_src: str, recursive: bool = False):
        """Pack `host_src` into a tar file placed next to it.

        Args:
            host_src: Path of a local file (``recursive=False``) or of a local
                directory (``recursive=True``).
            recursive: Whether `host_src` is a directory to pack with its
                contents.

        Returns:
            Path of the created ``<basename>.tar`` archive, located in the
            parent directory of `host_src`.

        Raises:
            AssertionError: If `host_src` does not match the kind implied by
                `recursive`.
        """
        parent_dir = os.path.dirname(host_src)
        srcname = os.path.basename(host_src)
        tar_filename = os.path.join(parent_dir, srcname + '.tar')

        if recursive:
            assert os.path.isdir(
                host_src
            ), 'Source must be a directory when recursive is True'
            # Only the top-level entries are listed: `tar.add` already recurses
            # into directories, so also listing nested paths (as a `**` glob
            # does) would store every nested file several times.
            # As before, top-level dotfiles are not matched by the glob.
            entries = glob(host_src + '/*')
            with tarfile.open(tar_filename, mode='w') as tar:
                for entry in entries:
                    # Member names are relative to the parent directory, so
                    # they keep the `<basename>/` prefix.
                    tar.add(entry, arcname=os.path.relpath(entry, parent_dir))
        else:
            assert os.path.isfile(
                host_src
            ), 'Source must be a file when recursive is False'
            with tarfile.open(tar_filename, mode='w') as tar:
                tar.add(host_src, arcname=srcname)
        return tar_filename

    def execute(self, cmd: str, timeout: int | None = None) -> tuple[int, str]:
        """Run `cmd` in the sandbox and wait for it to finish.

        Args:
            cmd: Command line to start in the sandbox.
            timeout: Value passed to the process wait call; falls back to
                ``self.config.timeout`` when None.

        Returns:
            ``(exit_code, output)`` where `output` is the process's message
            lines joined with newlines. The exit code is ``-1`` when the
            command timed out (the process is killed) or when no exit code
            is available.
        """
        timeout = timeout if timeout is not None else self.config.timeout
        process = self.sandbox.process.start(cmd, env_vars=self._env)
        try:
            process_output = process.wait(timeout=timeout)
        except TimeoutException:
            logger.debug('Command timed out, killing process...')
            process.kill()
            return -1, f'Command: "{cmd}" timed out'

        logs_str = '\n'.join(m.line for m in process_output.messages)
        exit_code = process_output.exit_code
        # Report a missing exit code with the same -1 sentinel instead of
        # relying on an assert, which is stripped under `python -O`.
        if process.exit_code is None or exit_code is None:
            return -1, logs_str
        return exit_code, logs_str

    def copy_to(self, host_src: str, sandbox_dest: str, recursive: bool = False):
        """Copies a local file or directory to the sandbox.

        The source is packed into a local tar archive, uploaded, extracted
        into `sandbox_dest` (created if missing) and the uploaded archive is
        removed. The local archive is deleted whether or not the copy succeeds.

        Args:
            host_src: Local file or directory to copy.
            sandbox_dest: Destination directory; always resolved relative to
                the sandbox working directory, even when given with a leading
                ``/``.
            recursive: Must be True when `host_src` is a directory.

        Raises:
            Exception: If extracting the archive inside the sandbox fails.
        """
        tar_filename = self._archive(host_src, recursive)
        try:
            # Prepend the sandbox destination with our sandbox cwd
            sandbox_dest = os.path.join(self._cwd, sandbox_dest.removeprefix('/'))

            with open(tar_filename, 'rb') as tar_file:
                # Upload the archive to /home/user (default destination that always exists)
                uploaded_path = self.sandbox.upload_file(tar_file)

            # Quote both paths so spaces or shell metacharacters cannot break
            # (or inject into) the commands below.
            quoted_dest = shlex.quote(sandbox_dest)
            quoted_archive = shlex.quote(str(uploaded_path))

            # Check if sandbox_dest exists. If not, create it.
            process = self.sandbox.process.start_and_wait(f'test -d {quoted_dest}')
            if process.exit_code != 0:
                self.sandbox.filesystem.make_dir(sandbox_dest)

            # Extract the archive into the destination and delete the archive
            process = self.sandbox.process.start_and_wait(
                f'sudo tar -xf {quoted_archive} -C {quoted_dest} && sudo rm {quoted_archive}'
            )
            if process.exit_code != 0:
                raise Exception(
                    f'Failed to extract {uploaded_path} to {sandbox_dest}: {process.stderr}'
                )
        finally:
            # Delete the local archive, even when the upload or extraction failed
            os.remove(tar_filename)

    def close(self):
        """Close the underlying sandbox and mark this box as closed."""
        self.sandbox.close()
        self.closed = True

    def get_working_directory(self):
        """Return the sandbox's `cwd` attribute."""
        return self.sandbox.cwd