Tai Truong
fix readme
d202ada
raw
history blame contribute delete
939 Bytes
from collections.abc import Callable
from typing import TYPE_CHECKING, Any
from celery.result import AsyncResult
from langflow.services.task.backends.base import TaskBackend
from langflow.worker import celery_app
if TYPE_CHECKING:
from celery import Task
class CeleryBackend(TaskBackend):
name = "celery"
def __init__(self) -> None:
self.celery_app = celery_app
def launch_task(self, task_func: Callable[..., Any], *args: Any, **kwargs: Any) -> tuple[str, AsyncResult]:
# I need to type the delay method to make it easier
if not hasattr(task_func, "delay"):
msg = f"Task function {task_func} does not have a delay method"
raise ValueError(msg)
task: Task = task_func.delay(*args, **kwargs)
return task.id, AsyncResult(task.id, app=self.celery_app)
def get_task(self, task_id: str) -> Any:
return AsyncResult(task_id, app=self.celery_app)