File size: 5,070 Bytes
246d201 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
import os
from pathlib import Path
from openhands.events.observation import (
ErrorObservation,
FileReadObservation,
FileWriteObservation,
Observation,
)
def resolve_path(
file_path: str,
working_directory: str,
workspace_base: str,
workspace_mount_path_in_sandbox: str,
):
"""Resolve a file path to a path on the host filesystem.
Args:
file_path: The path to resolve.
working_directory: The working directory of the agent.
workspace_mount_path_in_sandbox: The path to the workspace inside the sandbox.
workspace_base: The base path of the workspace on the host filesystem.
Returns:
The resolved path on the host filesystem.
"""
path_in_sandbox = Path(file_path)
# Apply working directory
if not path_in_sandbox.is_absolute():
path_in_sandbox = Path(working_directory) / path_in_sandbox
# Sanitize the path with respect to the root of the full sandbox
# (deny any .. path traversal to parent directories of the sandbox)
abs_path_in_sandbox = path_in_sandbox.resolve()
# If the path is outside the workspace, deny it
if not abs_path_in_sandbox.is_relative_to(workspace_mount_path_in_sandbox):
raise PermissionError(f'File access not permitted: {file_path}')
# Get path relative to the root of the workspace inside the sandbox
path_in_workspace = abs_path_in_sandbox.relative_to(
Path(workspace_mount_path_in_sandbox)
)
# Get path relative to host
path_in_host_workspace = Path(workspace_base) / path_in_workspace
return path_in_host_workspace
def read_lines(all_lines: list[str], start=0, end=-1):
start = max(start, 0)
start = min(start, len(all_lines))
end = -1 if end == -1 else max(end, 0)
end = min(end, len(all_lines))
if end == -1:
if start == 0:
return all_lines
else:
return all_lines[start:]
else:
num_lines = len(all_lines)
begin = max(0, min(start, num_lines - 2))
end = -1 if end > num_lines else max(begin + 1, end)
return all_lines[begin:end]
async def read_file(
path, workdir, workspace_base, workspace_mount_path_in_sandbox, start=0, end=-1
) -> Observation:
try:
whole_path = resolve_path(
path, workdir, workspace_base, workspace_mount_path_in_sandbox
)
except PermissionError:
return ErrorObservation(
f"You're not allowed to access this path: {path}. You can only access paths inside the workspace."
)
try:
with open(whole_path, 'r', encoding='utf-8') as file:
lines = read_lines(file.readlines(), start, end)
except FileNotFoundError:
return ErrorObservation(f'File not found: {path}')
except UnicodeDecodeError:
return ErrorObservation(f'File could not be decoded as utf-8: {path}')
except IsADirectoryError:
return ErrorObservation(f'Path is a directory: {path}. You can only read files')
code_view = ''.join(lines)
return FileReadObservation(path=path, content=code_view)
def insert_lines(
to_insert: list[str], original: list[str], start: int = 0, end: int = -1
):
"""Insert the new content to the original content based on start and end"""
new_lines = [''] if start == 0 else original[:start]
new_lines += [i + '\n' for i in to_insert]
new_lines += [''] if end == -1 else original[end:]
return new_lines
async def write_file(
path,
workdir,
workspace_base,
workspace_mount_path_in_sandbox,
content,
start=0,
end=-1,
) -> Observation:
insert = content.split('\n')
try:
whole_path = resolve_path(
path, workdir, workspace_base, workspace_mount_path_in_sandbox
)
if not os.path.exists(os.path.dirname(whole_path)):
os.makedirs(os.path.dirname(whole_path))
mode = 'w' if not os.path.exists(whole_path) else 'r+'
try:
with open(whole_path, mode, encoding='utf-8') as file:
if mode != 'w':
all_lines = file.readlines()
new_file = insert_lines(insert, all_lines, start, end)
else:
new_file = [i + '\n' for i in insert]
file.seek(0)
file.writelines(new_file)
file.truncate()
except FileNotFoundError:
return ErrorObservation(f'File not found: {path}')
except IsADirectoryError:
return ErrorObservation(
f'Path is a directory: {path}. You can only write to files'
)
except UnicodeDecodeError:
return ErrorObservation(f'File could not be decoded as utf-8: {path}')
except PermissionError:
return ErrorObservation(f'Malformed paths not permitted: {path}')
return FileWriteObservation(content='', path=path)
|