from typing import Any from langflow.custom import Component from langflow.field_typing.range_spec import RangeSpec from langflow.inputs.inputs import ( BoolInput, DataInput, DictInput, IntInput, MessageTextInput, ) from langflow.io import Output from langflow.schema import Data from langflow.schema.dotdict import dotdict class UpdateDataComponent(Component): display_name: str = "Update Data" description: str = "Dynamically update or append data with the specified fields." name: str = "UpdateData" MAX_FIELDS = 15 # Define a constant for maximum number of fields icon = "FolderSync" inputs = [ DataInput( name="old_data", display_name="Data", info="The record to update.", is_list=True, # Changed to True to handle list of Data objects ), IntInput( name="number_of_fields", display_name="Number of Fields", info="Number of fields to be added to the record.", real_time_refresh=True, value=0, range_spec=RangeSpec(min=1, max=MAX_FIELDS, step=1, step_type="int"), ), MessageTextInput( name="text_key", display_name="Text Key", info="Key that identifies the field to be used as the text content.", advanced=True, ), BoolInput( name="text_key_validator", display_name="Text Key Validator", advanced=True, info="If enabled, checks if the given 'Text Key' is present in the given 'Data'.", ), ] outputs = [ Output(display_name="Data", name="data", method="build_data"), ] def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None): """Update the build configuration when the number of fields changes. Args: build_config (dotdict): The current build configuration. field_value (Any): The new value for the field. field_name (Optional[str]): The name of the field being updated. """ if field_name == "number_of_fields": default_keys = { "code", "_type", "number_of_fields", "text_key", "old_data", "text_key_validator", } try: field_value_int = int(field_value) except ValueError: return build_config if field_value_int > self.MAX_FIELDS: build_config["number_of_fields"]["value"] = self.MAX_FIELDS msg = f"Number of fields cannot exceed {self.MAX_FIELDS}. " "Try using a Component to combine two Data." raise ValueError(msg) existing_fields = {} # Back up the existing template fields for key in list(build_config.keys()): if key not in default_keys: existing_fields[key] = build_config.pop(key) for i in range(1, field_value_int + 1): key = f"field_{i}_key" if key in existing_fields: field = existing_fields[key] build_config[key] = field else: field = DictInput( display_name=f"Field {i}", name=key, info=f"Key for field {i}.", input_types=["Text", "Data"], ) build_config[field.name] = field.to_dict() build_config["number_of_fields"]["value"] = field_value_int return build_config async def build_data(self) -> Data | list[Data]: """Build the updated data by combining the old data with new fields.""" new_data = self.get_data() if isinstance(self.old_data, list): for data_item in self.old_data: if not isinstance(data_item, Data): continue # Skip invalid items data_item.data.update(new_data) if self.text_key: data_item.text_key = self.text_key self.validate_text_key(data_item) self.status = self.old_data return self.old_data # Returns List[Data] if isinstance(self.old_data, Data): self.old_data.data.update(new_data) if self.text_key: self.old_data.text_key = self.text_key self.status = self.old_data self.validate_text_key(self.old_data) return self.old_data # Returns Data msg = "old_data is not a Data object or list of Data objects." raise ValueError(msg) def get_data(self): """Function to get the Data from the attributes.""" data = {} default_keys = { "code", "_type", "number_of_fields", "text_key", "old_data", "text_key_validator", } for attr_name, attr_value in self._attributes.items(): if attr_name in default_keys: continue # Skip default attributes if isinstance(attr_value, dict): for key, value in attr_value.items(): data[key] = value.get_text() if isinstance(value, Data) else value elif isinstance(attr_value, Data): data[attr_name] = attr_value.get_text() else: data[attr_name] = attr_value return data def validate_text_key(self, data: Data) -> None: """This function validates that the Text Key is one of the keys in the Data.""" data_keys = data.data.keys() if self.text_key and self.text_key not in data_keys: msg = f"Text Key: '{self.text_key}' not found in the Data keys: " f"{', '.join(data_keys)}" raise ValueError(msg)