Spaces:
Running
Running
from __future__ import annotations | |
from collections.abc import Callable, Coroutine | |
from typing import TYPE_CHECKING, Any | |
from loguru import logger | |
from langflow.services.base import Service | |
from langflow.services.task.backends.anyio import AnyIOBackend | |
from langflow.services.task.utils import get_celery_worker_status | |
if TYPE_CHECKING: | |
from langflow.services.settings.service import SettingsService | |
from langflow.services.task.backends.base import TaskBackend | |
def check_celery_availability():
    """Probe the Celery worker and report its status.

    Imports the project's Celery app lazily and asks
    ``get_celery_worker_status`` for its status. Any exception raised while
    importing, querying, or logging is swallowed and logged at debug level;
    in that case a fallback mapping whose ``"availability"`` is ``None`` is
    returned instead.

    Returns:
        Whatever ``get_celery_worker_status`` returned on success, otherwise
        ``{"availability": None}``.
    """
    try:
        # Imported here so a missing/broken worker module is handled by the
        # ``except`` below rather than failing at module import time.
        from langflow.worker import celery_app

        worker_status = get_celery_worker_status(celery_app)
        logger.debug(f"Celery status: {worker_status}")
        return worker_status
    except Exception:  # noqa: BLE001
        # Deliberately broad: any failure means "Celery is not usable".
        logger.opt(exception=True).debug("Celery not available")
        return {"availability": None}
class TaskService(Service):
    """Service that dispatches tasks to either a Celery or an AnyIO backend.

    The backend is chosen once, at construction time: Celery is used only when
    ``settings.celery_enabled`` is truthy and ``check_celery_availability``
    reports a non-``None`` ``"availability"``; otherwise the AnyIO backend is
    used.
    """

    name = "task_service"

    def __init__(self, settings_service: SettingsService):
        """Store the settings service and select the task backend.

        Args:
            settings_service: Settings provider; its
                ``settings.celery_enabled`` flag decides whether Celery is
                probed at all.
        """
        self.settings_service = settings_service
        try:
            if self.settings_service.settings.celery_enabled:
                status = check_celery_availability()
                use_celery = status.get("availability") is not None
            else:
                use_celery = False
        except ImportError:
            # An import failure while probing means Celery cannot be used.
            use_celery = False

        self.use_celery = use_celery
        self.backend = self.get_backend()

    def backend_name(self) -> str:
        """Return the ``name`` attribute of the active backend."""
        return self.backend.name

    def get_backend(self) -> TaskBackend:
        """Instantiate the backend matching ``self.use_celery``.

        Returns:
            A ``CeleryBackend`` when ``self.use_celery`` is truthy, otherwise
            an ``AnyIOBackend``.
        """
        if self.use_celery:
            # Imported lazily so the Celery backend module is only loaded
            # when Celery is actually going to be used.
            from langflow.services.task.backends.celery import CeleryBackend

            logger.debug("Using Celery backend")
            return CeleryBackend()
        logger.debug("Using AnyIO backend")
        return AnyIOBackend()

    async def launch_and_await_task(
        self,
        task_func: Callable[..., Any],
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        """Run ``task_func`` to completion and return ``(task_id, result)``.

        Without Celery, ``task_func`` is called directly with ``args`` and
        ``kwargs`` and the task id is ``None``. Both coroutine functions and
        plain synchronous callables are accepted: the value returned by the
        call is awaited only when it is awaitable.

        With Celery, ``task_func.apply(args=..., kwargs=...)`` is invoked and
        ``.get()`` is called on the returned task object; if that result is a
        coroutine it is awaited before being returned alongside ``task.id``.

        Args:
            task_func: Callable to execute. Must expose an ``apply`` attribute
                when Celery is in use.
            *args: Positional arguments forwarded to ``task_func``.
            **kwargs: Keyword arguments forwarded to ``task_func``.

        Returns:
            A ``(task_id, result)`` tuple; ``task_id`` is ``None`` when Celery
            is not in use.

        Raises:
            ValueError: If Celery is in use and ``task_func`` has no ``apply``
                attribute.
        """
        if not self.use_celery:
            # Local import keeps this method self-contained (no new
            # module-level dependency).
            import inspect

            outcome = task_func(*args, **kwargs)
            # Awaiting unconditionally raised TypeError for synchronous
            # callables; only await what is actually awaitable.
            if inspect.isawaitable(outcome):
                outcome = await outcome
            return None, outcome

        if not hasattr(task_func, "apply"):
            msg = f"Task function {task_func} does not have an apply method"
            raise ValueError(msg)

        # NOTE(review): Celery documents ``apply`` as executing the task
        # eagerly in the current process — confirm that is intended here
        # rather than ``apply_async``/``delay``.
        task = task_func.apply(args=args, kwargs=kwargs)
        result = task.get()
        # The task may hand back an un-awaited coroutine; resolve it here.
        if isinstance(result, Coroutine):
            result = await result
        return task.id, result

    async def launch_task(self, task_func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        """Hand ``task_func`` to the active backend.

        Args:
            task_func: Callable to launch.
            *args: Positional arguments forwarded to the backend.
            **kwargs: Keyword arguments forwarded to the backend.

        Returns:
            Whatever the backend's ``launch_task`` returns; if that value is a
            coroutine it is awaited first.
        """
        logger.debug(f"Launching task {task_func} with args {args} and kwargs {kwargs}")
        logger.debug(f"Using backend {self.backend}")
        task = self.backend.launch_task(task_func, *args, **kwargs)
        return await task if isinstance(task, Coroutine) else task

    def get_task(self, task_id: str) -> Any:
        """Look up a task on the active backend by its id.

        Args:
            task_id: Identifier passed through to the backend's ``get_task``.

        Returns:
            Whatever the backend's ``get_task`` returns for ``task_id``.
        """
        return self.backend.get_task(task_id)