from datetime import datetime, timezone from enum import Enum from pathlib import Path from typing import Any from uuid import UUID from pydantic import ( BaseModel, ConfigDict, Field, field_serializer, field_validator, model_serializer, ) from langflow.graph.schema import RunOutputs from langflow.graph.utils import serialize_field from langflow.schema import dotdict from langflow.schema.graph import Tweaks from langflow.schema.schema import InputType, OutputType, OutputValue from langflow.services.database.models.api_key.model import ApiKeyRead from langflow.services.database.models.base import orjson_dumps from langflow.services.database.models.flow import FlowCreate, FlowRead from langflow.services.database.models.user import UserRead from langflow.services.settings.feature_flags import FeatureFlags from langflow.services.tracing.schema import Log from langflow.utils.util_strings import truncate_long_strings class BuildStatus(Enum): """Status of the build.""" SUCCESS = "success" FAILURE = "failure" STARTED = "started" IN_PROGRESS = "in_progress" class TweaksRequest(BaseModel): tweaks: dict[str, dict[str, Any]] | None = Field(default_factory=dict) class UpdateTemplateRequest(BaseModel): template: dict class TaskResponse(BaseModel): """Task response schema.""" id: str | None = Field(None) href: str | None = Field(None) class ProcessResponse(BaseModel): """Process response schema.""" result: Any status: str | None = None task: TaskResponse | None = None session_id: str | None = None backend: str | None = None class RunResponse(BaseModel): """Run response schema.""" outputs: list[RunOutputs] | None = [] session_id: str | None = None @model_serializer(mode="plain") def serialize(self): # Serialize all the outputs if they are base models serialized = {"session_id": self.session_id, "outputs": []} if self.outputs: serialized_outputs = [] for output in self.outputs: if isinstance(output, BaseModel) and not isinstance(output, RunOutputs): serialized_outputs.append(output.model_dump(exclude_none=True)) else: serialized_outputs.append(output) serialized["outputs"] = serialized_outputs return serialized class PreloadResponse(BaseModel): """Preload response schema.""" session_id: str | None = None is_clear: bool | None = None class TaskStatusResponse(BaseModel): """Task status response schema.""" status: str result: Any | None = None class ChatMessage(BaseModel): """Chat message schema.""" is_bot: bool = False message: str | None | dict = None chat_key: str | None = Field(None, serialization_alias="chatKey") type: str = "human" class ChatResponse(ChatMessage): """Chat response schema.""" intermediate_steps: str type: str is_bot: bool = True files: list = [] @field_validator("type") @classmethod def validate_message_type(cls, v): if v not in {"start", "stream", "end", "error", "info", "file"}: msg = "type must be start, stream, end, error, info, or file" raise ValueError(msg) return v class PromptResponse(ChatMessage): """Prompt response schema.""" prompt: str type: str = "prompt" is_bot: bool = True class FileResponse(ChatMessage): """File response schema.""" data: Any = None data_type: str type: str = "file" is_bot: bool = True @field_validator("data_type") @classmethod def validate_data_type(cls, v): if v not in {"image", "csv"}: msg = "data_type must be image or csv" raise ValueError(msg) return v class FlowListCreate(BaseModel): flows: list[FlowCreate] class FlowListIds(BaseModel): flow_ids: list[str] class FlowListRead(BaseModel): flows: list[FlowRead] class FlowListReadWithFolderName(BaseModel): flows: list[FlowRead] folder_name: str description: str class InitResponse(BaseModel): flow_id: str = Field(serialization_alias="flowId") class BuiltResponse(BaseModel): built: bool class UploadFileResponse(BaseModel): """Upload file response schema.""" flow_id: str = Field(serialization_alias="flowId") file_path: Path class StreamData(BaseModel): event: str data: dict def __str__(self) -> str: return f"event: {self.event}\ndata: {orjson_dumps(self.data, indent_2=False)}\n\n" class CustomComponentRequest(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) code: str frontend_node: dict | None = None class CustomComponentResponse(BaseModel): data: dict type: str class UpdateCustomComponentRequest(CustomComponentRequest): field: str field_value: str | int | float | bool | dict | list | None = None template: dict tool_mode: bool = False def get_template(self): return dotdict(self.template) class CustomComponentResponseError(BaseModel): detail: str traceback: str class ComponentListCreate(BaseModel): flows: list[FlowCreate] class ComponentListRead(BaseModel): flows: list[FlowRead] class UsersResponse(BaseModel): total_count: int users: list[UserRead] class ApiKeyResponse(BaseModel): id: str api_key: str name: str created_at: str last_used_at: str class ApiKeysResponse(BaseModel): total_count: int user_id: UUID api_keys: list[ApiKeyRead] class CreateApiKeyRequest(BaseModel): name: str class Token(BaseModel): access_token: str refresh_token: str token_type: str class ApiKeyCreateRequest(BaseModel): api_key: str class VerticesOrderResponse(BaseModel): ids: list[str] run_id: UUID vertices_to_run: list[str] class ResultDataResponse(BaseModel): results: Any | None = Field(default_factory=dict) outputs: dict[str, OutputValue] = Field(default_factory=dict) logs: dict[str, list[Log]] = Field(default_factory=dict) message: Any | None = Field(default_factory=dict) artifacts: Any | None = Field(default_factory=dict) timedelta: float | None = None duration: str | None = None used_frozen_result: bool | None = False @field_serializer("results") @classmethod def serialize_results(cls, v): if isinstance(v, dict): return {key: serialize_field(val) for key, val in v.items()} return serialize_field(v) class VertexBuildResponse(BaseModel): id: str | None = None inactivated_vertices: list[str] | None = None next_vertices_ids: list[str] | None = None top_level_vertices: list[str] | None = None valid: bool params: Any | None = Field(default_factory=dict) """JSON string of the params.""" data: ResultDataResponse """Mapping of vertex ids to result dict containing the param name and result value.""" timestamp: datetime | None = Field(default_factory=lambda: datetime.now(timezone.utc)) """Timestamp of the build.""" @field_serializer("data") def serialize_data(self, data: ResultDataResponse) -> dict: data_dict = data.model_dump() if isinstance(data, BaseModel) else data return truncate_long_strings(data_dict) class VerticesBuiltResponse(BaseModel): vertices: list[VertexBuildResponse] class InputValueRequest(BaseModel): components: list[str] | None = [] input_value: str | None = None session: str | None = None type: InputType | None = Field( "any", description="Defines on which components the input value should be applied. " "'any' applies to all input components.", ) # add an example model_config = ConfigDict( json_schema_extra={ "examples": [ { "components": ["components_id", "Component Name"], "input_value": "input_value", "session": "session_id", }, {"components": ["Component Name"], "input_value": "input_value"}, {"input_value": "input_value"}, { "components": ["Component Name"], "input_value": "input_value", "session": "session_id", }, {"input_value": "input_value", "session": "session_id"}, {"type": "chat", "input_value": "input_value"}, {"type": "json", "input_value": '{"key": "value"}'}, ] }, extra="forbid", ) class SimplifiedAPIRequest(BaseModel): input_value: str | None = Field(default=None, description="The input value") input_type: InputType | None = Field(default="chat", description="The input type") output_type: OutputType | None = Field(default="chat", description="The output type") output_component: str | None = Field( default="", description="If there are multiple output components, you can specify the component to get the output from.", ) tweaks: Tweaks | None = Field(default=None, description="The tweaks") session_id: str | None = Field(default=None, description="The session id") # (alias) type ReactFlowJsonObject = { # nodes: Node[]; # edges: Edge[]; # viewport: Viewport; # } # import ReactFlowJsonObject class FlowDataRequest(BaseModel): nodes: list[dict] edges: list[dict] viewport: dict | None = None class ConfigResponse(BaseModel): feature_flags: FeatureFlags frontend_timeout: int auto_saving: bool auto_saving_interval: int health_check_max_retries: int max_file_size_upload: int