# Path: src/backend/langflow/services/database/models/flow/model.py import re from datetime import datetime, timezone from typing import TYPE_CHECKING, Optional from uuid import UUID, uuid4 import emoji from emoji import purely_emoji from fastapi import HTTPException, status from loguru import logger from pydantic import BaseModel, field_serializer, field_validator from sqlalchemy import Text, UniqueConstraint from sqlmodel import JSON, Column, Field, Relationship, SQLModel from langflow.schema import Data if TYPE_CHECKING: from langflow.services.database.models import TransactionTable from langflow.services.database.models.folder import Folder from langflow.services.database.models.message import MessageTable from langflow.services.database.models.user import User from langflow.services.database.models.vertex_builds.model import VertexBuildTable HEX_COLOR_LENGTH = 7 class FlowBase(SQLModel): name: str = Field(index=True) description: str | None = Field(default=None, sa_column=Column(Text, index=True, nullable=True)) icon: str | None = Field(default=None, nullable=True) icon_bg_color: str | None = Field(default=None, nullable=True) gradient: str | None = Field(default=None, nullable=True) data: dict | None = Field(default=None, nullable=True) is_component: bool | None = Field(default=False, nullable=True) updated_at: datetime | None = Field(default_factory=lambda: datetime.now(timezone.utc), nullable=True) webhook: bool | None = Field(default=False, nullable=True, description="Can be used on the webhook endpoint") endpoint_name: str | None = Field(default=None, nullable=True, index=True) tags: list[str] | None = None locked: bool | None = Field(default=False, nullable=True) @field_validator("endpoint_name") @classmethod def validate_endpoint_name(cls, v): # Endpoint name must be a string containing only letters, numbers, hyphens, and underscores if v is not None: if not isinstance(v, str): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Endpoint name must be a string", ) if not re.match(r"^[a-zA-Z0-9_-]+$", v): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Endpoint name must contain only letters, numbers, hyphens, and underscores", ) return v @field_validator("icon_bg_color") @classmethod def validate_icon_bg_color(cls, v): if v is not None and not isinstance(v, str): msg = "Icon background color must be a string" raise ValueError(msg) # validate that is is a hex color if v and not v.startswith("#"): msg = "Icon background color must start with #" raise ValueError(msg) # validate that it is a valid hex color if v and len(v) != HEX_COLOR_LENGTH: msg = "Icon background color must be 7 characters long" raise ValueError(msg) return v @field_validator("icon") @classmethod def validate_icon_atr(cls, v): # const emojiRegex = /\p{Emoji}/u; # const isEmoji = emojiRegex.test(data?.node?.icon!); # emoji pattern in Python if v is None: return v # we are going to use the emoji library to validate the emoji # emojis can be defined using the :emoji_name: syntax if not v.startswith(":") and not v.endswith(":"): return v if not v.startswith(":") or not v.endswith(":"): # emoji should have both starting and ending colons # so if one of them is missing, we will raise msg = f"Invalid emoji. {v} is not a valid emoji." raise ValueError(msg) emoji_value = emoji.emojize(v, variant="emoji_type") if v == emoji_value: logger.warning(f"Invalid emoji. {v} is not a valid emoji.") icon = emoji_value if purely_emoji(icon): # this is indeed an emoji return icon # otherwise it should be a valid lucide icon if v is not None and not isinstance(v, str): msg = "Icon must be a string" raise ValueError(msg) # is should be lowercase and contain only letters and hyphens if v and not v.islower(): msg = "Icon must be lowercase" raise ValueError(msg) if v and not v.replace("-", "").isalpha(): msg = "Icon must contain only letters and hyphens" raise ValueError(msg) return v @field_validator("data") @classmethod def validate_json(cls, v): if not v: return v if not isinstance(v, dict): msg = "Flow must be a valid JSON" raise ValueError(msg) # noqa: TRY004 # data must contain nodes and edges if "nodes" not in v: msg = "Flow must have nodes" raise ValueError(msg) if "edges" not in v: msg = "Flow must have edges" raise ValueError(msg) return v # updated_at can be serialized to JSON @field_serializer("updated_at") def serialize_datetime(self, value): if isinstance(value, datetime): # I'm getting 2024-05-29T17:57:17.631346 # and I want 2024-05-29T17:57:17-05:00 value = value.replace(microsecond=0) if value.tzinfo is None: value = value.replace(tzinfo=timezone.utc) return value.isoformat() return value @field_validator("updated_at", mode="before") @classmethod def validate_dt(cls, v): if v is None: return v if isinstance(v, datetime): return v return datetime.fromisoformat(v) class Flow(FlowBase, table=True): # type: ignore[call-arg] id: UUID = Field(default_factory=uuid4, primary_key=True, unique=True) data: dict | None = Field(default=None, sa_column=Column(JSON)) user_id: UUID | None = Field(index=True, foreign_key="user.id", nullable=True) user: "User" = Relationship(back_populates="flows") icon: str | None = Field(default=None, nullable=True) tags: list[str] | None = Field(sa_column=Column(JSON), default=[]) locked: bool | None = Field(default=False, nullable=True) folder_id: UUID | None = Field(default=None, foreign_key="folder.id", nullable=True, index=True) folder: Optional["Folder"] = Relationship(back_populates="flows") messages: list["MessageTable"] = Relationship(back_populates="flow") transactions: list["TransactionTable"] = Relationship(back_populates="flow") vertex_builds: list["VertexBuildTable"] = Relationship(back_populates="flow") def to_data(self): serialized = self.model_dump() data = { "id": serialized.pop("id"), "data": serialized.pop("data"), "name": serialized.pop("name"), "description": serialized.pop("description"), "updated_at": serialized.pop("updated_at"), } return Data(data=data) __table_args__ = ( UniqueConstraint("user_id", "name", name="unique_flow_name"), UniqueConstraint("user_id", "endpoint_name", name="unique_flow_endpoint_name"), ) class FlowCreate(FlowBase): user_id: UUID | None = None folder_id: UUID | None = None class FlowRead(FlowBase): id: UUID user_id: UUID | None = Field() folder_id: UUID | None = Field() class FlowHeader(BaseModel): """Model representing a header for a flow - Without the data. Attributes: ----------- id : UUID Unique identifier for the flow. name : str The name of the flow. folder_id : UUID | None, optional The ID of the folder containing the flow. None if not associated with a folder. is_component : bool | None, optional Flag indicating whether the flow is a component. endpoint_name : str | None, optional The name of the endpoint associated with this flow. description : str | None, optional A description of the flow. """ id: UUID name: str folder_id: UUID | None = None is_component: bool | None = None endpoint_name: str | None = None description: str | None = None class FlowUpdate(SQLModel): name: str | None = None description: str | None = None data: dict | None = None folder_id: UUID | None = None endpoint_name: str | None = None locked: bool | None = None @field_validator("endpoint_name") @classmethod def validate_endpoint_name(cls, v): # Endpoint name must be a string containing only letters, numbers, hyphens, and underscores if v is not None: if not isinstance(v, str): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Endpoint name must be a string", ) if not re.match(r"^[a-zA-Z0-9_-]+$", v): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Endpoint name must contain only letters, numbers, hyphens, and underscores", ) return v