Spaces:
Running
Running
from typing import Any | |
from langflow.custom import Component | |
from langflow.field_typing.range_spec import RangeSpec | |
from langflow.inputs.inputs import ( | |
BoolInput, | |
DataInput, | |
DictInput, | |
IntInput, | |
MessageTextInput, | |
) | |
from langflow.io import Output | |
from langflow.schema import Data | |
from langflow.schema.dotdict import dotdict | |
class UpdateDataComponent(Component):
    """Component that merges user-defined key/value fields into existing Data.

    The number of dynamic ``field_<i>_key`` inputs shown in the template is
    driven by the ``number_of_fields`` input (see ``update_build_config``).
    When the component runs, the values collected from those dynamic inputs
    are merged into each incoming ``Data`` object in place.
    """

    display_name: str = "Update Data"
    description: str = "Dynamically update or append data with the specified fields."
    name: str = "UpdateData"
    MAX_FIELDS = 15  # Upper bound on the number of dynamic fields
    # Template keys that belong to the component itself; everything else in
    # the build config / attributes is treated as a dynamic field.
    DEFAULT_KEYS = frozenset(
        {
            "code",
            "_type",
            "number_of_fields",
            "text_key",
            "old_data",
            "text_key_validator",
        }
    )
    icon = "FolderSync"

    inputs = [
        DataInput(
            name="old_data",
            display_name="Data",
            info="The record to update.",
            is_list=True,  # Accepts a list of Data objects
        ),
        IntInput(
            name="number_of_fields",
            display_name="Number of Fields",
            info="Number of fields to be added to the record.",
            real_time_refresh=True,
            value=0,
            range_spec=RangeSpec(min=1, max=MAX_FIELDS, step=1, step_type="int"),
        ),
        MessageTextInput(
            name="text_key",
            display_name="Text Key",
            info="Key that identifies the field to be used as the text content.",
            advanced=True,
        ),
        BoolInput(
            name="text_key_validator",
            display_name="Text Key Validator",
            advanced=True,
            info="If enabled, checks if the given 'Text Key' is present in the given 'Data'.",
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="build_data"),
    ]

    def update_build_config(
        self, build_config: dotdict, field_value: Any, field_name: str | None = None
    ) -> dotdict:
        """Update the build configuration when the number of fields changes.

        Only reacts to changes of ``number_of_fields``; any other field name
        leaves ``build_config`` untouched.

        Args:
            build_config (dotdict): The current build configuration.
            field_value (Any): The new value for the field.
            field_name (Optional[str]): The name of the field being updated.

        Returns:
            dotdict: The same ``build_config`` object, updated in place.

        Raises:
            ValueError: If the requested number of fields exceeds ``MAX_FIELDS``.
        """
        if field_name != "number_of_fields":
            return build_config

        try:
            field_value_int = int(field_value)
        except (TypeError, ValueError):
            # int() raises TypeError for None / non-numeric objects and
            # ValueError for unparsable strings; ignore both.
            return build_config

        if field_value_int > self.MAX_FIELDS:
            build_config["number_of_fields"]["value"] = self.MAX_FIELDS
            msg = f"Number of fields cannot exceed {self.MAX_FIELDS}. Try using a Component to combine two Data."
            raise ValueError(msg)

        # Pull every dynamic field out of the template so that fields beyond
        # the new count are dropped, while remembering them for re-use.
        existing_fields = {
            key: build_config.pop(key)
            for key in list(build_config.keys())
            if key not in self.DEFAULT_KEYS
        }

        for i in range(1, field_value_int + 1):
            key = f"field_{i}_key"
            if key in existing_fields:
                # Keep the previously configured field (and its value).
                build_config[key] = existing_fields[key]
            else:
                field = DictInput(
                    display_name=f"Field {i}",
                    name=key,
                    info=f"Key for field {i}.",
                    input_types=["Text", "Data"],
                )
                build_config[field.name] = field.to_dict()

        build_config["number_of_fields"]["value"] = field_value_int
        return build_config

    async def build_data(self) -> Data | list[Data]:
        """Build the updated data by combining the old data with new fields.

        The incoming ``Data`` objects are mutated in place and returned.

        Returns:
            Data | list[Data]: The updated object(s), in the same shape
            (single item or list) as ``old_data``.

        Raises:
            ValueError: If ``old_data`` is neither a ``Data`` nor a list, or if
                the text key validator is enabled and the text key is missing.
        """
        new_data = self.get_data()

        if isinstance(self.old_data, list):
            for data_item in self.old_data:
                if not isinstance(data_item, Data):
                    continue  # Skip invalid items
                self._apply_update(data_item, new_data)
            self.status = self.old_data
            return self.old_data  # Returns List[Data]

        if isinstance(self.old_data, Data):
            self._apply_update(self.old_data, new_data)
            self.status = self.old_data
            return self.old_data  # Returns Data

        msg = "old_data is not a Data object or list of Data objects."
        raise ValueError(msg)

    def _apply_update(self, data_item: Data, new_data: dict) -> None:
        """Merge ``new_data`` into one Data item and apply the text key settings."""
        data_item.data.update(new_data)
        if self.text_key:
            data_item.text_key = self.text_key
        # Validation is opt-in via the "Text Key Validator" input.
        if self.text_key_validator:
            self.validate_text_key(data_item)

    def get_data(self) -> dict:
        """Collect the dynamic field values from the component attributes.

        Attributes listed in ``DEFAULT_KEYS`` are skipped. Dict-valued
        attributes are flattened into the result; ``Data`` values are replaced
        by their text.

        Returns:
            dict: Mapping of keys to the values to merge into the old data.
        """
        data = {}
        for attr_name, attr_value in self._attributes.items():
            if attr_name in self.DEFAULT_KEYS:
                continue  # Skip default attributes
            if isinstance(attr_value, dict):
                data.update(
                    {
                        key: value.get_text() if isinstance(value, Data) else value
                        for key, value in attr_value.items()
                    }
                )
            elif isinstance(attr_value, Data):
                data[attr_name] = attr_value.get_text()
            else:
                data[attr_name] = attr_value
        return data

    def validate_text_key(self, data: Data) -> None:
        """Validate that the Text Key is one of the keys in the Data.

        An empty text key is accepted.

        Args:
            data (Data): The object whose keys are checked.

        Raises:
            ValueError: If a text key is set and it is not a key of ``data.data``.
        """
        data_keys = data.data.keys()
        if self.text_key and self.text_key not in data_keys:
            # str() each key so non-string keys cannot break the error message.
            formatted_keys = ", ".join(map(str, data_keys))
            msg = f"Text Key: '{self.text_key}' not found in the Data keys: {formatted_keys}"
            raise ValueError(msg)