import io
import os
import pathlib
import shutil
import subprocess

import gradio as gr
from loguru import logger

UPLOAD_DIRECTORY = pathlib.Path("/app/uploaded_files")
CONFIG_PATH = pathlib.Path("/app/yourbench_config.yml")


def save_files(files: list[str]) -> str:
    """Move the given files into ``UPLOAD_DIRECTORY`` and report their new paths.

    Args:
        files: Paths of the files to move. ``None`` or an empty list is
            treated as "nothing to save".

    Returns:
        A human-readable message listing the destination path of every file.
    """
    # shutil.move fails with FileNotFoundError when the destination's parent
    # directory is missing, so make sure it exists before moving anything.
    UPLOAD_DIRECTORY.mkdir(parents=True, exist_ok=True)

    saved_paths = []
    for file in files or []:
        source = pathlib.Path(file)
        destination = UPLOAD_DIRECTORY / source.name
        # Passing str arguments keeps the return value a str for the join below.
        saved_paths.append(shutil.move(str(source), str(destination)))

    return f"Files saved to: {', '.join(saved_paths)}"


class SubprocessManager:
    """Start, stop and collect the output of a single child process.

    Attributes are plain instance fields:
        command: The command handed to ``subprocess.Popen`` unchanged.
        process: The current ``Popen`` object, or ``None`` before the first start.
        output_stream: ``io.StringIO`` accumulating everything read so far
            from the child's combined stdout/stderr.
    """

    def __init__(self, command):
        self.command = command
        self.process = None
        self.output_stream = io.StringIO()

    def start_process(self, oauth_token: gr.OAuthToken | None):
        """Start the subprocess unless one is already running.

        Args:
            oauth_token: When provided, its ``token`` is exported to the child
                as ``HF_TOKEN``, overriding any value inherited from the
                current environment.
        """
        if self.is_running():
            logger.info("Process is already running")
            return

        # Each run starts with an empty capture buffer.
        self.output_stream = io.StringIO()

        new_env = os.environ.copy()
        # Override env token, when running in gradio space
        if oauth_token:
            new_env["HF_TOKEN"] = oauth_token.token

        self.process = subprocess.Popen(
            self.command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # Combine stderr with stdout
            text=True,
            bufsize=1,  # Line-buffered (only meaningful in text mode)
            start_new_session=True,
            env=new_env,
        )
        # Non-blocking reads let read_and_get_output() return immediately
        # when the child has produced nothing new.
        os.set_blocking(self.process.stdout.fileno(), False)
        logger.info("Started the process")

    def read_and_get_output(self) -> str:
        """Drain whatever output is currently available and return all of it.

        Returns:
            Everything captured since the process was last started, including
            text read by earlier calls.
        """
        if self.process and self.process.stdout:
            try:
                # readline() yields "" once nothing more can be read right now.
                while line := self.process.stdout.readline():
                    self.output_stream.write(line)  # Capture in StringIO
            except BlockingIOError:
                # Nothing available on the non-blocking pipe; not an error.
                pass
        return self.output_stream.getvalue()

    def stop_process(self) -> None:
        """Terminate the subprocess and wait for it to exit."""
        if not self.is_running():
            logger.info("Process is not running")
            return
        logger.info("Sending SIGTERM to the Process")
        self.process.terminate()
        exit_code = self.process.wait()  # Wait for process to terminate
        logger.info(f"Process stopped exit code {exit_code}")

    def kill_process(self) -> None:
        """Forcefully kill the subprocess and wait for it to exit."""
        if not self.is_running():
            logger.info("Process is not running")
            return
        logger.info("Sending SIGKILL to the Process")
        self.process.kill()
        exit_code = self.process.wait()  # Wait for process to be killed
        logger.info(f"Process killed exit code {exit_code}")

    def is_running(self) -> bool:
        """Return ``True`` if a subprocess was started and has not exited yet."""
        # Explicit None check so the result is always a bool, never None.
        return self.process is not None and self.process.poll() is None