Spaces:
Running
Running
from collections.abc import Callable | |
from typing import TYPE_CHECKING, Any | |
from celery.result import AsyncResult | |
from langflow.services.task.backends.base import TaskBackend | |
from langflow.worker import celery_app | |
if TYPE_CHECKING: | |
from celery import Task | |
class CeleryBackend(TaskBackend):
    """Task backend that submits work through the shared Celery application.

    Tasks are enqueued via the ``delay`` method of the supplied task function,
    and result handles are always created against ``self.celery_app``.
    """

    name = "celery"

    def __init__(self) -> None:
        """Store a reference to the module-level Celery application."""
        self.celery_app = celery_app

    def launch_task(self, task_func: Callable[..., Any], *args: Any, **kwargs: Any) -> tuple[str, AsyncResult]:
        """Enqueue ``task_func`` and return its id together with a result handle.

        Args:
            task_func: A callable exposing a ``delay`` method (presumably a
                Celery task registered on ``celery_app`` -- verify against callers).
            *args: Positional arguments forwarded to ``task_func.delay``.
            **kwargs: Keyword arguments forwarded to ``task_func.delay``.

        Returns:
            A ``(task_id, result)`` tuple, where ``result`` is an ``AsyncResult``
            bound to ``self.celery_app``.

        Raises:
            ValueError: If ``task_func`` has no ``delay`` attribute.
        """
        if not hasattr(task_func, "delay"):
            msg = f"Task function {task_func} does not have a delay method"
            raise ValueError(msg)

        # ``delay`` hands back the result handle for the submitted call (an
        # ``AsyncResult`` per the Celery API), not the ``Task`` object itself.
        submitted: AsyncResult = task_func.delay(*args, **kwargs)

        # Build a fresh handle explicitly bound to this backend's app rather
        # than returning the one produced by ``delay``.
        return submitted.id, AsyncResult(submitted.id, app=self.celery_app)

    def get_task(self, task_id: str) -> AsyncResult:
        """Return an ``AsyncResult`` for ``task_id`` bound to ``self.celery_app``.

        No lookup is performed here; the handle is simply constructed from the id.
        """
        return AsyncResult(task_id, app=self.celery_app)