import os import re import time import traceback import uuid from enum import Enum import bashlex import libtmux from openhands.core.logger import openhands_logger as logger from import CmdRunAction from import ErrorObservation from import ( CMD_OUTPUT_PS1_END, CmdOutputMetadata, CmdOutputObservation, ) from openhands.utils.shutdown_listener import should_continue def split_bash_commands(commands): if not commands.strip(): return [''] try: parsed = bashlex.parse(commands) except (bashlex.errors.ParsingError, NotImplementedError): logger.debug( f'Failed to parse bash commands\n' f'[input]: {commands}\n' f'[warning]: {traceback.format_exc()}\n' f'The original command will be returned as is.' ) # If parsing fails, return the original commands return [commands] result: list[str] = [] last_end = 0 for node in parsed: start, end = node.pos # Include any text between the last command and this one if start > last_end: between = commands[last_end:start] logger.debug(f'BASH PARSING between: {between}') if result: result[-1] += between.rstrip() elif between.strip(): # THIS SHOULD NOT HAPPEN result.append(between.rstrip()) # Extract the command, preserving original formatting command = commands[start:end].rstrip() logger.debug(f'BASH PARSING command: {command}') result.append(command) last_end = end # Add any remaining text after the last command to the last command remaining = commands[last_end:].rstrip() logger.debug(f'BASH PARSING remaining: {remaining}') if last_end < len(commands) and result: result[-1] += remaining logger.debug(f'BASH PARSING result[-1] += remaining: {result[-1]}') elif last_end < len(commands): if remaining: result.append(remaining) logger.debug(f'BASH PARSING result.append(remaining): {result[-1]}') return result def escape_bash_special_chars(command: str) -> str: r""" Escapes characters that have different interpretations in bash vs python. Specifically handles escape sequences like \;, \|, \&, etc. """ if command.strip() == '': return '' try: parts = [] last_pos = 0 def visit_node(node): nonlocal last_pos if ( node.kind == 'redirect' and hasattr(node, 'heredoc') and node.heredoc is not None ): # We're entering a heredoc - preserve everything as-is until we see EOF # Store the heredoc end marker (usually 'EOF' but could be different) between = command[last_pos : node.pos[0]] parts.append(between) # Add the heredoc start marker parts.append(command[node.pos[0] : node.heredoc.pos[0]]) # Add the heredoc content as-is parts.append(command[node.heredoc.pos[0] : node.heredoc.pos[1]]) last_pos = node.pos[1] return if node.kind == 'word': # Get the raw text between the last position and current word between = command[last_pos : node.pos[0]] word_text = command[node.pos[0] : node.pos[1]] # Add the between text, escaping special characters between = re.sub(r'\\([;&|><])', r'\\\\\1', between) parts.append(between) # Check if word_text is a quoted string or command substitution if ( (word_text.startswith('"') and word_text.endswith('"')) or (word_text.startswith("'") and word_text.endswith("'")) or (word_text.startswith('$(') and word_text.endswith(')')) or (word_text.startswith('`') and word_text.endswith('`')) ): # Preserve quoted strings, command substitutions, and heredoc content as-is parts.append(word_text) else: # Escape special chars in unquoted text word_text = re.sub(r'\\([;&|><])', r'\\\\\1', word_text) parts.append(word_text) last_pos = node.pos[1] return # Visit child nodes if hasattr(node, 'parts'): for part in visit_node(part) # Process all nodes in the AST nodes = list(bashlex.parse(command)) for node in nodes: between = command[last_pos : node.pos[0]] between = re.sub(r'\\([;&|><])', r'\\\\\1', between) parts.append(between) last_pos = node.pos[0] visit_node(node) # Handle any remaining text after the last word remaining = command[last_pos:] parts.append(remaining) return ''.join(parts) except (bashlex.errors.ParsingError, NotImplementedError): logger.debug( f'Failed to parse bash commands for special characters escape\n' f'[input]: {command}\n' f'[warning]: {traceback.format_exc()}\n' f'The original command will be returned as is.' ) return command class BashCommandStatus(Enum): CONTINUE = 'continue' COMPLETED = 'completed' NO_CHANGE_TIMEOUT = 'no_change_timeout' HARD_TIMEOUT = 'hard_timeout' def _remove_command_prefix(command_output: str, command: str) -> str: return command_output.lstrip().removeprefix(command.lstrip()).lstrip() class BashSession: POLL_INTERVAL = 0.5 HISTORY_LIMIT = 10_000 PS1 = CmdOutputMetadata.to_ps1_prompt() def __init__( self, work_dir: str, username: str | None = None, no_change_timeout_seconds: int = 30, ): self.NO_CHANGE_TIMEOUT_SECONDS = no_change_timeout_seconds self.work_dir = work_dir self.username = username self._initialized = False def initialize(self): self.server = libtmux.Server() window_command = '/bin/bash' if self.username: # This starts a non-login (new) shell for the given user window_command = f'su {self.username} -' session_name = f'openhands-{self.username}-{uuid.uuid4()}' self.session = self.server.new_session( session_name=session_name, window_name='bash', window_command=window_command, start_directory=self.work_dir, kill_session=True, x=1000, y=1000, ) # Set history limit to a large number to avoid losing history # self.session.set_option('history-limit', str(self.HISTORY_LIMIT), _global=True) self.session.history_limit = self.HISTORY_LIMIT # We need to create a new pane because the initial pane's history limit is (default) 2000 _initial_window = self.session.attached_window self.window = self.session.new_window( window_shell=window_command, start_directory=self.work_dir, ) self.pane = self.window.attached_pane logger.debug(f'pane: {self.pane}; history_limit: {self.session.history_limit}') _initial_window.kill_window() # Configure bash to use simple PS1 and disable PS2 self.pane.send_keys( f'export PROMPT_COMMAND=\'export PS1="{self.PS1}"\'; export PS2=""' ) time.sleep(0.1) # Wait for command to take effect self._clear_screen() # Store the last command for interactive input handling self.prev_status: BashCommandStatus | None = None self.prev_output: str = '' self._closed: bool = False logger.debug(f'Bash session initialized with work dir: {self.work_dir}') # Maintain the current working directory self._cwd = os.path.abspath(self.work_dir) self._initialized = True def __del__(self): """Ensure the session is closed when the object is destroyed.""" self.close() def _get_pane_content(self) -> str: """Capture the current pane content and update the buffer.""" content = '\n'.join( map( # avoid double newlines lambda line: line.rstrip(), self.pane.cmd('capture-pane', '-J', '-pS', '-').stdout, ) ) return content def close(self): """Clean up the session.""" if self._closed: return self.session.kill_session() self._closed = True @property def cwd(self): return self._cwd def _is_special_key(self, command: str) -> bool: """Check if the command is a special key.""" # Special keys are of the form C- _command = command.strip() return _command.startswith('C-') and len(_command) == 3 def _clear_screen(self): """Clear the tmux pane screen and history.""" self.pane.send_keys('C-l', enter=False) time.sleep(0.1) self.pane.cmd('clear-history') def _get_command_output( self, command: str, raw_command_output: str, metadata: CmdOutputMetadata, continue_prefix: str = '', ) -> str: """Get the command output with the previous command output removed. Args: command: The command that was executed. raw_command_output: The raw output from the command. metadata: The metadata object to store prefix/suffix in. continue_prefix: The prefix to add to the command output if it's a continuation of the previous command. """ # remove the previous command output from the new output if any if self.prev_output: command_output = raw_command_output.removeprefix(self.prev_output) metadata.prefix = continue_prefix else: command_output = raw_command_output self.prev_output = raw_command_output # update current command output anyway command_output = _remove_command_prefix(command_output, command) return command_output.rstrip() def _handle_completed_command( self, command: str, pane_content: str, ps1_matches: list[re.Match] ) -> CmdOutputObservation: is_special_key = self._is_special_key(command) assert len(ps1_matches) >= 1, ( f'Expected at least one PS1 metadata block, but got {len(ps1_matches)}.\n' f'---FULL OUTPUT---\n{pane_content!r}\n---END OF OUTPUT---' ) metadata = CmdOutputMetadata.from_ps1_match(ps1_matches[-1]) # Special case where the previous command output is truncated due to history limit # We should get the content BEFORE the last PS1 prompt get_content_before_last_match = bool(len(ps1_matches) == 1) # Update the current working directory if it has changed if metadata.working_dir != self._cwd and metadata.working_dir: self._cwd = metadata.working_dir logger.debug(f'COMMAND OUTPUT: {pane_content}') # Extract the command output between the two PS1 prompts raw_command_output = self._combine_outputs_between_matches( pane_content, ps1_matches, get_content_before_last_match=get_content_before_last_match, ) if get_content_before_last_match: # Count the number of lines in the truncated output num_lines = len(raw_command_output.splitlines()) metadata.prefix = f'[Previous command outputs are truncated. Showing the last {num_lines} lines of the output below.]\n' metadata.suffix = ( f'\n[The command completed with exit code {metadata.exit_code}.]' if not is_special_key else f'\n[The command completed with exit code {metadata.exit_code}. CTRL+{command[-1].upper()} was sent.]' ) command_output = self._get_command_output( command, raw_command_output, metadata, ) self.prev_status = BashCommandStatus.COMPLETED self.prev_output = '' # Reset previous command output self._ready_for_next_command() return CmdOutputObservation( content=command_output, command=command, metadata=metadata, ) def _handle_nochange_timeout_command( self, command: str, pane_content: str, ps1_matches: list[re.Match], ) -> CmdOutputObservation: self.prev_status = BashCommandStatus.NO_CHANGE_TIMEOUT if len(ps1_matches) != 1: logger.warning( 'Expected exactly one PS1 metadata block BEFORE the execution of a command, ' f'but got {len(ps1_matches)} PS1 metadata blocks:\n---\n{pane_content!r}\n---' ) raw_command_output = self._combine_outputs_between_matches( pane_content, ps1_matches ) metadata = CmdOutputMetadata() # No metadata available metadata.suffix = ( f'\n[The command has no new output after {self.NO_CHANGE_TIMEOUT_SECONDS} seconds. ' "You may wait longer to see additional output by sending empty command '', " 'send other commands to interact with the current process, ' 'or send keys to interrupt/kill the command.]' ) command_output = self._get_command_output( command, raw_command_output, metadata, continue_prefix='[Below is the output of the previous command.]\n', ) return CmdOutputObservation( content=command_output, command=command, metadata=metadata, ) def _handle_hard_timeout_command( self, command: str, pane_content: str, ps1_matches: list[re.Match], timeout: float, ) -> CmdOutputObservation: self.prev_status = BashCommandStatus.HARD_TIMEOUT if len(ps1_matches) != 1: logger.warning( 'Expected exactly one PS1 metadata block BEFORE the execution of a command, ' f'but got {len(ps1_matches)} PS1 metadata blocks:\n---\n{pane_content!r}\n---' ) raw_command_output = self._combine_outputs_between_matches( pane_content, ps1_matches ) metadata = CmdOutputMetadata() # No metadata available metadata.suffix = ( f'\n[The command timed out after {timeout} seconds. ' "You may wait longer to see additional output by sending empty command '', " 'send other commands to interact with the current process, ' 'or send keys to interrupt/kill the command.]' ) command_output = self._get_command_output( command, raw_command_output, metadata, continue_prefix='[Below is the output of the previous command.]\n', ) return CmdOutputObservation( command=command, content=command_output, metadata=metadata, ) def _ready_for_next_command(self): """Reset the content buffer for a new command.""" # Clear the current content self._clear_screen() def _combine_outputs_between_matches( self, pane_content: str, ps1_matches: list[re.Match], get_content_before_last_match: bool = False, ) -> str: """Combine all outputs between PS1 matches. Args: pane_content: The full pane content containing PS1 prompts and command outputs ps1_matches: List of regex matches for PS1 prompts get_content_before_last_match: when there's only one PS1 match, whether to get the content before the last PS1 prompt (True) or after the last PS1 prompt (False) Returns: Combined string of all outputs between matches """ if len(ps1_matches) == 1: if get_content_before_last_match: # The command output is the content before the last PS1 prompt return pane_content[: ps1_matches[0].start()] else: # The command output is the content after the last PS1 prompt return pane_content[ps1_matches[0].end() + 1 :] elif len(ps1_matches) == 0: return pane_content combined_output = '' for i in range(len(ps1_matches) - 1): # Extract content between current and next PS1 prompt output_segment = pane_content[ ps1_matches[i].end() + 1 : ps1_matches[i + 1].start() ] combined_output += output_segment + '\n' logger.debug(f'COMBINED OUTPUT: {combined_output}') return combined_output def execute(self, action: CmdRunAction) -> CmdOutputObservation | ErrorObservation: """Execute a command in the bash session.""" if not self._initialized: raise RuntimeError('Bash session is not initialized') # Strip the command of any leading/trailing whitespace logger.debug(f'RECEIVED ACTION: {action}') command = action.command.strip() is_input: bool = action.is_input # If the previous command is not completed, we need to check if the command is empty if self.prev_status not in { BashCommandStatus.CONTINUE, BashCommandStatus.NO_CHANGE_TIMEOUT, BashCommandStatus.HARD_TIMEOUT, }: if command == '': return CmdOutputObservation( content='ERROR: No previous running command to retrieve logs from.', command='', metadata=CmdOutputMetadata(), ) if is_input: return CmdOutputObservation( content='ERROR: No previous running command to interact with.', command='', metadata=CmdOutputMetadata(), ) # Check if the command is a single command or multiple commands splited_commands = split_bash_commands(command) if len(splited_commands) > 1: return ErrorObservation( content=( f'ERROR: Cannot execute multiple commands at once.\n' f'Please run each command separately OR chain them into a single command via && or ;\n' f'Provided commands:\n{"\n".join(f"({i+1}) {cmd}" for i, cmd in enumerate(splited_commands))}' ) ) start_time = time.time() last_change_time = start_time last_pane_output = self._get_pane_content() # When prev command is still running, and we are trying to send a new command if ( self.prev_status in { BashCommandStatus.HARD_TIMEOUT, BashCommandStatus.NO_CHANGE_TIMEOUT, } and not last_pane_output.endswith( CMD_OUTPUT_PS1_END ) # prev command is not completed and not is_input and command != '' # not input and not empty command ): _ps1_matches = CmdOutputMetadata.matches_ps1_metadata(last_pane_output) raw_command_output = self._combine_outputs_between_matches( last_pane_output, _ps1_matches ) metadata = CmdOutputMetadata() # No metadata available metadata.suffix = ( f'\n[Your command "{command}" is NOT executed. ' f'The previous command is still running - You CANNOT send new commands until the previous command is completed. ' 'By setting `is_input` to `true`, you can interact with the current process: ' "You may wait longer to see additional output of the previous command by sending empty command '', " 'send other commands to interact with the current process, ' 'or send keys ("C-c", "C-z", "C-d") to interrupt/kill the previous command before sending your new command.]' ) logger.debug(f'PREVIOUS COMMAND OUTPUT: {raw_command_output}') command_output = self._get_command_output( command, raw_command_output, metadata, continue_prefix='[Below is the output of the previous command.]\n', ) return CmdOutputObservation( command=command, content=command_output, metadata=metadata, ) # Send actual command/inputs to the pane if command != '': is_special_key = self._is_special_key(command) if is_input: logger.debug(f'SENDING INPUT TO RUNNING PROCESS: {command!r}') self.pane.send_keys( command, enter=not is_special_key, ) else: # convert command to raw string command = escape_bash_special_chars(command) logger.debug(f'SENDING COMMAND: {command!r}') self.pane.send_keys( command, enter=not is_special_key, ) # Loop until the command completes or times out while should_continue(): _start_time = time.time() logger.debug(f'GETTING PANE CONTENT at {_start_time}') cur_pane_output = self._get_pane_content() logger.debug( f'PANE CONTENT GOT after {time.time() - _start_time:.2f} seconds' ) logger.debug(f'BEGIN OF PANE CONTENT: {cur_pane_output.split("\n")[:10]}') logger.debug(f'END OF PANE CONTENT: {cur_pane_output.split("\n")[-10:]}') ps1_matches = CmdOutputMetadata.matches_ps1_metadata(cur_pane_output) if cur_pane_output != last_pane_output: last_pane_output = cur_pane_output last_change_time = time.time() logger.debug(f'CONTENT UPDATED DETECTED at {last_change_time}') # 1) Execution completed # if the last command output contains the end marker if cur_pane_output.rstrip().endswith(CMD_OUTPUT_PS1_END.rstrip()): return self._handle_completed_command( command, pane_content=cur_pane_output, ps1_matches=ps1_matches, ) # 2) Execution timed out since there's no change in output # for a while (self.NO_CHANGE_TIMEOUT_SECONDS) # We ignore this if the command is *blocking time_since_last_change = time.time() - last_change_time logger.debug( f'CHECKING NO CHANGE TIMEOUT ({self.NO_CHANGE_TIMEOUT_SECONDS}s): elapsed {time_since_last_change}. Action blocking: {action.blocking}' ) if ( not action.blocking and time_since_last_change >= self.NO_CHANGE_TIMEOUT_SECONDS ): return self._handle_nochange_timeout_command( command, pane_content=cur_pane_output, ps1_matches=ps1_matches, ) # 3) Execution timed out due to hard timeout logger.debug( f'CHECKING HARD TIMEOUT ({action.timeout}s): elapsed {time.time() - start_time}' ) if action.timeout and time.time() - start_time >= action.timeout: return self._handle_hard_timeout_command( command, pane_content=cur_pane_output, ps1_matches=ps1_matches, timeout=action.timeout, ) logger.debug(f'SLEEPING for {self.POLL_INTERVAL} seconds for next poll') time.sleep(self.POLL_INTERVAL) raise RuntimeError('Bash session was likely interrupted...')