Spaces:
Running
Running
from langflow.custom import Component | |
from langflow.io import DataInput, Output, StrInput | |
from langflow.schema import Data | |
class ExtractDataKeyComponent(Component): | |
display_name = "Extract Key" | |
description = ( | |
"Extract a specific key from a Data object or a list of " | |
"Data objects and return the extracted value(s) as Data object(s)." | |
) | |
icon = "key" | |
name = "ExtractaKey" | |
legacy = True | |
inputs = [ | |
DataInput( | |
name="data_input", | |
display_name="Data Input", | |
info="The Data object or list of Data objects to extract the key from.", | |
), | |
StrInput( | |
name="key", | |
display_name="Key to Extract", | |
info="The key in the Data object(s) to extract.", | |
), | |
] | |
outputs = [ | |
Output(display_name="Extracted Data", name="extracted_data", method="extract_key"), | |
] | |
def extract_key(self) -> Data | list[Data]: | |
key = self.key | |
if isinstance(self.data_input, list): | |
result = [] | |
for item in self.data_input: | |
if isinstance(item, Data) and key in item.data: | |
extracted_value = item.data[key] | |
result.append(Data(data={key: extracted_value})) | |
self.status = result | |
return result | |
if isinstance(self.data_input, Data): | |
if key in self.data_input.data: | |
extracted_value = self.data_input.data[key] | |
result = Data(data={key: extracted_value}) | |
self.status = result | |
return result | |
self.status = f"Key '{key}' not found in Data object." | |
return Data(data={"error": f"Key '{key}' not found in Data object."}) | |
self.status = "Invalid input. Expected Data object or list of Data objects." | |
return Data(data={"error": "Invalid input. Expected Data object or list of Data objects."}) | |