Tai Truong
fix readme
d202ada
from __future__ import annotations
from collections.abc import Callable, Coroutine
from typing import TYPE_CHECKING, Any
from loguru import logger
from langflow.services.base import Service
from langflow.services.task.backends.anyio import AnyIOBackend
from langflow.services.task.utils import get_celery_worker_status
if TYPE_CHECKING:
from langflow.services.settings.service import SettingsService
from langflow.services.task.backends.base import TaskBackend
def check_celery_availability():
try:
from langflow.worker import celery_app
status = get_celery_worker_status(celery_app)
logger.debug(f"Celery status: {status}")
except Exception: # noqa: BLE001
logger.opt(exception=True).debug("Celery not available")
status = {"availability": None}
return status
class TaskService(Service):
name = "task_service"
def __init__(self, settings_service: SettingsService):
self.settings_service = settings_service
try:
if self.settings_service.settings.celery_enabled:
status = check_celery_availability()
use_celery = status.get("availability") is not None
else:
use_celery = False
except ImportError:
use_celery = False
self.use_celery = use_celery
self.backend = self.get_backend()
@property
def backend_name(self) -> str:
return self.backend.name
def get_backend(self) -> TaskBackend:
if self.use_celery:
from langflow.services.task.backends.celery import CeleryBackend
logger.debug("Using Celery backend")
return CeleryBackend()
logger.debug("Using AnyIO backend")
return AnyIOBackend()
# In your TaskService class
async def launch_and_await_task(
self,
task_func: Callable[..., Any],
*args: Any,
**kwargs: Any,
) -> Any:
if not self.use_celery:
return None, await task_func(*args, **kwargs)
if not hasattr(task_func, "apply"):
msg = f"Task function {task_func} does not have an apply method"
raise ValueError(msg)
task = task_func.apply(args=args, kwargs=kwargs)
result = task.get()
# if result is coroutine
if isinstance(result, Coroutine):
result = await result
return task.id, result
async def launch_task(self, task_func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
logger.debug(f"Launching task {task_func} with args {args} and kwargs {kwargs}")
logger.debug(f"Using backend {self.backend}")
task = self.backend.launch_task(task_func, *args, **kwargs)
return await task if isinstance(task, Coroutine) else task
def get_task(self, task_id: str) -> Any:
return self.backend.get_task(task_id)