import os
from pathlib import Path

from openhands.events.observation import (
    ErrorObservation,
    FileReadObservation,
    FileWriteObservation,
    Observation,
)


def resolve_path(
    file_path: str,
    working_directory: str,
    workspace_base: str,
    workspace_mount_path_in_sandbox: str,
) -> Path:
    """Resolve a file path to a path on the host filesystem.

    Args:
        file_path: The path to resolve.
        working_directory: The working directory of the agent.
        workspace_base: The base path of the workspace on the host filesystem.
        workspace_mount_path_in_sandbox: The path to the workspace inside the sandbox.

    Returns:
        The resolved path on the host filesystem.

    Raises:
        PermissionError: If the resolved path is not located under
            ``workspace_mount_path_in_sandbox``.
    """
    path_in_sandbox = Path(file_path)

    # Relative paths are interpreted relative to the agent's working directory.
    if not path_in_sandbox.is_absolute():
        path_in_sandbox = Path(working_directory) / path_in_sandbox

    # Sanitize the path with respect to the root of the full sandbox:
    # resolve() makes it absolute and collapses any `..` segments, so the
    # containment check below cannot be bypassed by path traversal.
    abs_path_in_sandbox = path_in_sandbox.resolve()

    # If the path is outside the workspace, deny it.
    if not abs_path_in_sandbox.is_relative_to(workspace_mount_path_in_sandbox):
        raise PermissionError(f'File access not permitted: {file_path}')

    # Re-root the workspace-relative part of the path onto the host workspace.
    path_in_workspace = abs_path_in_sandbox.relative_to(
        Path(workspace_mount_path_in_sandbox)
    )
    return Path(workspace_base) / path_in_workspace


def read_lines(all_lines: list[str], start: int = 0, end: int = -1) -> list[str]:
    """Return the slice of ``all_lines`` selected by ``start`` and ``end``.

    Args:
        all_lines: The lines to select from.
        start: Zero-based index of the first line; clamped to
            ``[0, len(all_lines)]``.
        end: Exclusive end index, or ``-1`` to read through the last line.
            Any other value is clamped to ``[0, len(all_lines)]``.

    Returns:
        The selected lines. With ``end == -1`` and ``start == 0`` the input
        list itself is returned (not a copy). With an explicit ``end`` the
        first index is additionally capped at the second-to-last line and the
        window is widened to at least one line, so a non-empty input never
        yields an empty result.
    """
    num_lines = len(all_lines)
    start = min(max(start, 0), num_lines)

    if end == -1:
        return all_lines if start == 0 else all_lines[start:]

    # `end` is clamped to num_lines here, so it can never exceed the list
    # length afterwards; only the "at least one line" widening is needed.
    end = min(max(end, 0), num_lines)
    begin = max(0, min(start, num_lines - 2))
    return all_lines[begin : max(begin + 1, end)]


async def read_file(
    path: str,
    workdir: str,
    workspace_base: str,
    workspace_mount_path_in_sandbox: str,
    start: int = 0,
    end: int = -1,
) -> Observation:
    """Read a range of lines from a file inside the workspace.

    Args:
        path: The path to read, as given by the agent.
        workdir: The working directory used to resolve a relative ``path``.
        workspace_base: The base path of the workspace on the host filesystem.
        workspace_mount_path_in_sandbox: The path to the workspace inside the sandbox.
        start: First line to return (see ``read_lines``).
        end: Exclusive end line, or ``-1`` for the rest of the file.

    Returns:
        A ``FileReadObservation`` carrying the selected text, or an
        ``ErrorObservation`` if the path is outside the workspace, does not
        exist, is a directory, or is not valid UTF-8.
    """
    try:
        whole_path = resolve_path(
            path, workdir, workspace_base, workspace_mount_path_in_sandbox
        )
    except PermissionError:
        return ErrorObservation(
            f"You're not allowed to access this path: {path}. You can only access paths inside the workspace."
        )

    try:
        with open(whole_path, 'r', encoding='utf-8') as file:
            lines = read_lines(file.readlines(), start, end)
    except FileNotFoundError:
        return ErrorObservation(f'File not found: {path}')
    except UnicodeDecodeError:
        return ErrorObservation(f'File could not be decoded as utf-8: {path}')
    except IsADirectoryError:
        return ErrorObservation(f'Path is a directory: {path}. You can only read files')

    code_view = ''.join(lines)
    return FileReadObservation(path=path, content=code_view)


def insert_lines(
    to_insert: list[str], original: list[str], start: int = 0, end: int = -1
) -> list[str]:
    """Insert the new content to the original content based on start and end.

    Args:
        to_insert: New lines *without* trailing newlines; one ``'\\n'`` is
            appended to each.
        original: Existing lines, each normally ending in ``'\\n'``.
        start: Number of leading lines of ``original`` to keep.
        end: Index in ``original`` from which trailing lines are kept, or
            ``-1`` to keep none.

    Returns:
        The kept prefix, the inserted lines, then the kept suffix. An empty
        string stands in for the prefix when ``start == 0`` and for the suffix
        when ``end == -1``.
    """
    new_lines = [''] if start == 0 else original[:start]

    # The last kept line may lack a newline (e.g. the final line of a file
    # with no trailing newline); terminate it so the first inserted line is
    # not glued onto it. `original[:start]` is a copy, so `original` itself
    # is left untouched.
    if new_lines and new_lines[-1] and not new_lines[-1].endswith('\n'):
        new_lines[-1] += '\n'

    new_lines += [line + '\n' for line in to_insert]
    new_lines += [''] if end == -1 else original[end:]
    return new_lines


async def write_file(
    path: str,
    workdir: str,
    workspace_base: str,
    workspace_mount_path_in_sandbox: str,
    content: str,
    start: int = 0,
    end: int = -1,
) -> Observation:
    """Write ``content`` to a file inside the workspace.

    If the file does not exist it is created (along with any missing parent
    directories) and filled with ``content``. If it exists, ``content``
    replaces the lines in ``[start, end)`` as described by ``insert_lines``.

    Args:
        path: The path to write, as given by the agent.
        workdir: The working directory used to resolve a relative ``path``.
        workspace_base: The base path of the workspace on the host filesystem.
        workspace_mount_path_in_sandbox: The path to the workspace inside the sandbox.
        content: Text to write; it is split on ``'\\n'`` into lines.
        start: Number of leading lines of an existing file to keep.
        end: Index from which trailing lines of an existing file are kept, or
            ``-1`` to keep none.

    Returns:
        A ``FileWriteObservation`` on success, otherwise an
        ``ErrorObservation`` describing the failure.
    """
    insert = content.split('\n')

    try:
        whole_path = resolve_path(
            path, workdir, workspace_base, workspace_mount_path_in_sandbox
        )
        # exist_ok avoids a check-then-create race if the directory appears
        # between an existence test and the makedirs call.
        os.makedirs(os.path.dirname(whole_path), exist_ok=True)
        mode = 'r+' if os.path.exists(whole_path) else 'w'
        try:
            with open(whole_path, mode, encoding='utf-8') as file:
                if mode != 'w':
                    all_lines = file.readlines()
                    new_file = insert_lines(insert, all_lines, start, end)
                else:
                    new_file = [line + '\n' for line in insert]

                # Rewrite from the beginning and drop whatever is left over
                # from the previous, possibly longer, content.
                file.seek(0)
                file.writelines(new_file)
                file.truncate()
        except FileNotFoundError:
            return ErrorObservation(f'File not found: {path}')
        except IsADirectoryError:
            return ErrorObservation(
                f'Path is a directory: {path}. You can only write to files'
            )
        except UnicodeDecodeError:
            return ErrorObservation(f'File could not be decoded as utf-8: {path}')
    except PermissionError:
        # Raised by resolve_path for paths outside the workspace; this handler
        # also catches OS-level permission errors from makedirs/open above.
        return ErrorObservation(f'Malformed paths not permitted: {path}')
    return FileWriteObservation(content='', path=path)