Spaces:
Running
Running
from __future__ import annotations | |
from typing import TYPE_CHECKING, Any | |
from asgiref.sync import async_to_sync | |
from celery.exceptions import SoftTimeLimitExceeded | |
from langflow.core.celery_app import celery_app | |
if TYPE_CHECKING: | |
from langflow.graph.vertex.base import Vertex | |
def test_celery(word: str) -> str: | |
return f"test task return {word}" | |
def build_vertex(self, vertex: Vertex) -> Vertex:
    """Build a vertex synchronously and hand it back.

    Stamps ``vertex.task_id`` with ``self.request.id`` and then runs
    ``vertex.build`` to completion through ``async_to_sync``.

    NOTE(review): ``self`` is used like a bound Celery task (``request``,
    ``retry``); the decorator that would register it is not visible
    here -- confirm against the task registration.

    Args:
        vertex: The vertex to build.

    Returns:
        The same ``vertex`` object that was passed in, after ``build`` ran.

    Raises:
        Whatever ``self.retry`` returns when the build is interrupted by
        ``SoftTimeLimitExceeded``. The retry is requested with a fresh
        ``SoftTimeLimitExceeded("Task took too long")`` and ``countdown=2``,
        chained to the original exception.
    """
    try:
        vertex.task_id = self.request.id
        run_build = async_to_sync(vertex.build)
        run_build()
    except SoftTimeLimitExceeded as timeout_err:
        # Ask for another attempt instead of failing outright on a timeout.
        retry_reason = SoftTimeLimitExceeded("Task took too long")
        raise self.retry(exc=retry_reason, countdown=2) from timeout_err
    else:
        return vertex
def process_graph_cached_task() -> dict[str, Any]:
    """Placeholder that always fails.

    Raises:
        NotImplementedError: Always, with the message
            ``"This task is not implemented yet"``.
    """
    not_ready = "This task is not implemented yet"
    raise NotImplementedError(not_ready)