from __future__ import annotations import asyncio import json import re import traceback from collections.abc import AsyncIterator, Iterator from datetime import datetime, timezone from typing import Annotated, Any, Literal from uuid import UUID from fastapi.encoders import jsonable_encoder from langchain_core.load import load from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage from langchain_core.prompts import BaseChatPromptTemplate, ChatPromptTemplate, PromptTemplate from loguru import logger from pydantic import BaseModel, ConfigDict, Field, ValidationError, field_serializer, field_validator from langflow.base.prompts.utils import dict_values_to_string from langflow.schema.content_block import ContentBlock from langflow.schema.content_types import ErrorContent from langflow.schema.data import Data from langflow.schema.image import Image, get_file_paths, is_image_file from langflow.schema.properties import Properties, Source from langflow.schema.validators import timestamp_to_str_validator from langflow.utils.constants import ( MESSAGE_SENDER_AI, MESSAGE_SENDER_NAME_AI, MESSAGE_SENDER_NAME_USER, MESSAGE_SENDER_USER, ) from langflow.utils.image import create_data_url class Message(Data): model_config = ConfigDict(arbitrary_types_allowed=True) # Helper class to deal with image data text_key: str = "text" text: str | AsyncIterator | Iterator | None = Field(default="") sender: str | None = None sender_name: str | None = None files: list[str | Image] | None = Field(default=[]) session_id: str | None = Field(default="") timestamp: Annotated[str, timestamp_to_str_validator] = Field( default_factory=lambda: datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z") ) flow_id: str | UUID | None = None error: bool = Field(default=False) edit: bool = Field(default=False) properties: Properties = Field(default_factory=Properties) category: Literal["message", "error", "warning", "info"] | None = "message" content_blocks: list[ContentBlock] = Field(default_factory=list) @field_validator("flow_id", mode="before") @classmethod def validate_flow_id(cls, value): if isinstance(value, UUID): value = str(value) return value @field_validator("content_blocks", mode="before") @classmethod def validate_content_blocks(cls, value): # value may start with [ or not if isinstance(value, list): return [ ContentBlock.model_validate_json(v) if isinstance(v, str) else ContentBlock.model_validate(v) for v in value ] if isinstance(value, str): value = json.loads(value) if value.startswith("[") else [ContentBlock.model_validate_json(value)] return value @field_validator("properties", mode="before") @classmethod def validate_properties(cls, value): if isinstance(value, str): value = Properties.model_validate_json(value) elif isinstance(value, dict): value = Properties.model_validate(value) return value @field_serializer("flow_id") def serialize_flow_id(self, value): if isinstance(value, UUID): return str(value) return value @field_serializer("timestamp") def serialize_timestamp(self, value): try: # Try parsing with timezone return datetime.strptime(value.strip(), "%Y-%m-%d %H:%M:%S %Z").astimezone(timezone.utc) except ValueError: # Try parsing without timezone return datetime.strptime(value.strip(), "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc) @field_validator("files", mode="before") @classmethod def validate_files(cls, value): if not value: value = [] elif not isinstance(value, list): value = [value] return value def model_post_init(self, /, _context: Any) -> None: new_files: list[Any] = [] for file in self.files or []: if is_image_file(file): new_files.append(Image(path=file)) else: new_files.append(file) self.files = new_files if "timestamp" not in self.data: self.data["timestamp"] = self.timestamp def set_flow_id(self, flow_id: str) -> None: self.flow_id = flow_id def to_lc_message( self, ) -> BaseMessage: """Converts the Data to a BaseMessage. Returns: BaseMessage: The converted BaseMessage. """ # The idea of this function is to be a helper to convert a Data to a BaseMessage # It will use the "sender" key to determine if the message is Human or AI # If the key is not present, it will default to AI # But first we check if all required keys are present in the data dictionary # they are: "text", "sender" if self.text is None or not self.sender: logger.warning("Missing required keys ('text', 'sender') in Message, defaulting to HumanMessage.") text = "" if not isinstance(self.text, str) else self.text if self.sender == MESSAGE_SENDER_USER or not self.sender: if self.files: contents = [{"type": "text", "text": text}] contents.extend(self.get_file_content_dicts()) human_message = HumanMessage(content=contents) else: human_message = HumanMessage(content=text) return human_message return AIMessage(content=text) @classmethod def from_lc_message(cls, lc_message: BaseMessage) -> Message: if lc_message.type == "human": sender = MESSAGE_SENDER_USER sender_name = MESSAGE_SENDER_NAME_USER elif lc_message.type == "ai": sender = MESSAGE_SENDER_AI sender_name = MESSAGE_SENDER_NAME_AI elif lc_message.type == "system": sender = "System" sender_name = "System" else: sender = lc_message.type sender_name = lc_message.type return cls(text=lc_message.content, sender=sender, sender_name=sender_name) @classmethod def from_data(cls, data: Data) -> Message: """Converts Data to a Message. Args: data: The Data to convert. Returns: The converted Message. """ return cls( text=data.text, sender=data.sender, sender_name=data.sender_name, files=data.files, session_id=data.session_id, timestamp=data.timestamp, flow_id=data.flow_id, error=data.error, edit=data.edit, ) @field_serializer("text", mode="plain") def serialize_text(self, value): if isinstance(value, AsyncIterator | Iterator): return "" return value # Keep this async method for backwards compatibility def get_file_content_dicts(self): content_dicts = [] files = get_file_paths(self.files) for file in files: if isinstance(file, Image): content_dicts.append(file.to_content_dict()) else: image_url = create_data_url(file) content_dicts.append({"type": "image_url", "image_url": {"url": image_url}}) return content_dicts def load_lc_prompt(self): if "prompt" not in self: msg = "Prompt is required." raise ValueError(msg) # self.prompt was passed through jsonable_encoder # so inner messages are not BaseMessage # we need to convert them to BaseMessage messages = [] for message in self.prompt.get("kwargs", {}).get("messages", []): match message: case HumanMessage(): messages.append(message) case _ if message.get("type") == "human": messages.append(HumanMessage(content=message.get("content"))) case _ if message.get("type") == "system": messages.append(SystemMessage(content=message.get("content"))) case _ if message.get("type") == "ai": messages.append(AIMessage(content=message.get("content"))) self.prompt["kwargs"]["messages"] = messages return load(self.prompt) @classmethod def from_lc_prompt( cls, prompt: BaseChatPromptTemplate, ): prompt_json = prompt.to_json() return cls(prompt=prompt_json) def format_text(self): prompt_template = PromptTemplate.from_template(self.template) variables_with_str_values = dict_values_to_string(self.variables) formatted_prompt = prompt_template.format(**variables_with_str_values) self.text = formatted_prompt return formatted_prompt @classmethod async def from_template_and_variables(cls, template: str, **variables): # This method has to be async for backwards compatibility with versions # >1.0.15, <1.1 return cls.from_template(template, **variables) # Define a sync version for backwards compatibility with versions >1.0.15, <1.1 @classmethod def from_template(cls, template: str, **variables): instance = cls(template=template, variables=variables) text = instance.format_text() message = HumanMessage(content=text) contents = [] for value in variables.values(): if isinstance(value, cls) and value.files: content_dicts = value.get_file_content_dicts() contents.extend(content_dicts) if contents: message = HumanMessage(content=[{"type": "text", "text": text}, *contents]) prompt_template = ChatPromptTemplate.from_messages([message]) instance.prompt = jsonable_encoder(prompt_template.to_json()) instance.messages = instance.prompt.get("kwargs", {}).get("messages", []) return instance @classmethod async def create(cls, **kwargs): """If files are present, create the message in a separate thread as is_image_file is blocking.""" if "files" in kwargs: return await asyncio.to_thread(cls, **kwargs) return cls(**kwargs) class DefaultModel(BaseModel): class Config: from_attributes = True populate_by_name = True json_encoders = { datetime: lambda v: v.isoformat(), } def json(self, **kwargs): # Usa a função de serialização personalizada return super().model_dump_json(**kwargs, encoder=self.custom_encoder) @staticmethod def custom_encoder(obj): if isinstance(obj, datetime): return obj.isoformat() msg = f"Object of type {obj.__class__.__name__} is not JSON serializable" raise TypeError(msg) class MessageResponse(DefaultModel): id: str | UUID | None = Field(default=None) flow_id: UUID | None = Field(default=None) timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) sender: str sender_name: str session_id: str text: str files: list[str] = [] edit: bool properties: Properties | None = None category: str | None = None content_blocks: list[ContentBlock] | None = None @field_validator("files", mode="before") @classmethod def validate_files(cls, v): if isinstance(v, str): v = json.loads(v) return v @field_serializer("timestamp") @classmethod def serialize_timestamp(cls, v): v = v.replace(microsecond=0) return v.strftime("%Y-%m-%d %H:%M:%S %Z") @field_serializer("files") @classmethod def serialize_files(cls, v): if isinstance(v, list): return json.dumps(v) return v @classmethod def from_message(cls, message: Message, flow_id: str | None = None): # first check if the record has all the required fields if message.text is None or not message.sender or not message.sender_name: msg = "The message does not have the required fields (text, sender, sender_name)." raise ValueError(msg) return cls( sender=message.sender, sender_name=message.sender_name, text=message.text, session_id=message.session_id, files=message.files or [], timestamp=message.timestamp, flow_id=flow_id, ) class ErrorMessage(Message): """A message class specifically for error messages with predefined error-specific attributes.""" def __init__( self, exception: BaseException, session_id: str, source: Source, trace_name: str | None = None, flow_id: str | None = None, ) -> None: # This is done to avoid circular imports if exception.__class__.__name__ == "ExceptionWithMessageError" and exception.__cause__ is not None: exception = exception.__cause__ # Get the error reason reason = f"**{exception.__class__.__name__}**\n" if hasattr(exception, "body") and "message" in exception.body: reason += f" - **{exception.body.get('message')}**\n" elif hasattr(exception, "code"): reason += f" - **Code: {exception.code}**\n" elif hasattr(exception, "args") and exception.args: reason += f" - **Details: {exception.args[0]}**\n" elif isinstance(exception, ValidationError): reason += f" - **Details:**\n\n```python\n{exception!s}\n```\n" else: reason += " - **An unknown error occurred.**\n" # Get the sender ID if trace_name: match = re.search(r"\((.*?)\)", trace_name) if match: match.group(1) super().__init__( session_id=session_id, sender=source.display_name, sender_name=source.display_name, text=reason, properties=Properties( text_color="red", background_color="red", edited=False, source=source, icon="error", allow_markdown=False, targets=[], ), category="error", error=True, content_blocks=[ ContentBlock( title="Error", contents=[ ErrorContent( type="error", component=source.display_name, field=str(exception.field) if hasattr(exception, "field") else None, reason=reason, solution=str(exception.solution) if hasattr(exception, "solution") else None, traceback=traceback.format_exc(), ) ], ) ], flow_id=flow_id, )