import asyncio import logging from fastapi import BackgroundTasks, HTTPException from concurrent.futures import ThreadPoolExecutor logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) class Worker: def doing_work(self, task_manager): task_manager.task_status["status"] = "Running" for i in range(1, 101): if task_manager.task_status["status"] == "Stopped": break asyncio.sleep(1) # Simulate a time-consuming task task_manager.task_status["progress"] = i logger.info('process ' + str(i) + '%' + ' done') if task_manager.task_status["status"] != "Stopped": task_manager.task_status["status"] = "Completed" class TaskManager: task_status = {"progress": 0, "status": "Not started"} task = None #def __init__(self): worker = Worker() async def doing_work(self): loop = asyncio.get_running_loop() with ThreadPoolExecutor() as pool: await loop.run_in_executor(pool, self.worker.doing_work, self) #self.worker.doing_work(self) # self.task_status["status"] = "Running" # for i in range(1, 101): # if self.task_status["status"] == "Stopped": # break # await asyncio.sleep(1) # Simulate a time-consuming task # self.task_status["progress"] = i # logger.info('process ' + str(i) + '%' + ' done') # if self.task_status["status"] != "Stopped": # self.task_status["status"] = "Completed" async def start_task(self): if self.task is None or self.task.done(): self.task_status["progress"] = 0 self.task_status["status"] = "Not started" self.task = asyncio.create_task(self.doing_work()) return {"message": "Task started"} else: raise HTTPException(status_code=409, detail="Task already running") async def get_task_status(self): return self.task_status async def stop_task(self): if self.task is not None and not self.task.done(): self.task_status["status"] = "Stopped" self.task.cancel() return {"message": "Task stopped"} else: raise HTTPException(status_code=409, detail="No task running")