Spaces:
Running
Running
from loguru import logger | |
from langflow.custom import Component | |
from langflow.io import DataInput, Output | |
from langflow.schema import Data | |
class MergeDataComponent(Component):
    """Combine multiple Data objects into a list of Data objects sharing one key set.

    Every key that appears in any input Data object is present in every merged
    Data object. Keys that an input lacks are filled with an empty string so
    that all outputs are consistent.
    """

    display_name = "Merge Data"
    description = (
        "Combines multiple Data objects into a unified list, ensuring all keys are present in each Data object."
    )
    icon = "merge"

    inputs = [
        DataInput(
            name="data_inputs",
            display_name="Data Inputs",
            is_list=True,
            info="A list of Data inputs objects to be merged.",
        ),
    ]

    outputs = [
        Output(
            display_name="Merged Data",
            name="merged_data",
            method="merge_data",
        ),
    ]

    def merge_data(self) -> list[Data]:
        """Merge the configured Data inputs into a list of Data objects with identical keys.

        Each output corresponds to the input at the same index and keeps that
        input's ``text_key`` and ``default_value``. Keys are emitted in the
        order in which they are first seen across the inputs, so the result is
        deterministic from run to run.

        Returns:
            A list of merged Data objects, one per input. An empty list is
            returned when no inputs are provided.

        Raises:
            TypeError: If any item in ``data_inputs`` is not a ``Data`` instance.
        """
        logger.info("Initiating the data merging process.")
        data_inputs: list[Data] = self.data_inputs

        # Check for "nothing to merge" before calling len(): an unset (None)
        # input must take the empty-result path rather than fail in len().
        if not data_inputs:
            logger.warning("No data inputs provided. Returning an empty list.")
            return []
        logger.debug(f"Received {len(data_inputs)} data input(s) for merging.")

        # Collect the union of keys in first-seen order. A dict is used as an
        # ordered set: iterating a plain set of strings yields an order that
        # can differ between interpreter runs.
        all_keys: dict[str, None] = {}
        for idx, data_input in enumerate(data_inputs):
            if not isinstance(data_input, Data):
                error_message = f"Data input at index {idx} is not of type Data."
                logger.error(error_message)
                type_error_message = (
                    f"All items in data_inputs must be of type Data. Item at index {idx} is {type(data_input)}"
                )
                raise TypeError(type_error_message)
            all_keys.update(dict.fromkeys(data_input.data.keys()))
        logger.debug(f"Collected {len(all_keys)} unique key(s) from input data.")

        try:
            # Build one new Data object per input, padding absent keys with "".
            merged_data_list: list[Data] = []
            for idx, data_input in enumerate(data_inputs):
                source = data_input.data
                merged_data_dict = {}
                for key in all_keys:
                    if key in source:
                        merged_data_dict[key] = source[key]
                    else:
                        logger.debug(f"Key '{key}' missing in data input at index {idx}. Assigning empty string.")
                        merged_data_dict[key] = ""
                merged_data_list.append(
                    Data(
                        text_key=data_input.text_key,
                        data=merged_data_dict,
                        default_value=data_input.default_value,
                    )
                )
                logger.debug(f"Merged Data object created for input at index {idx}.")
        except Exception:
            # Record the traceback for diagnosis, then let the caller see the
            # original exception unchanged.
            logger.exception("An error occurred during the data merging process.")
            raise

        logger.info("Data merging process completed successfully.")
        return merged_data_list