Spaces:
Running
Running
import json | |
from json import JSONDecodeError | |
import jq | |
from json_repair import repair_json | |
from loguru import logger | |
from langflow.custom import Component | |
from langflow.inputs import HandleInput, MessageTextInput | |
from langflow.io import Output | |
from langflow.schema import Data | |
from langflow.schema.message import Message | |
class ParseJSONDataComponent(Component): | |
display_name = "Parse JSON" | |
description = "Convert and extract JSON fields." | |
icon = "braces" | |
name = "ParseJSONData" | |
legacy: bool = True | |
inputs = [ | |
HandleInput( | |
name="input_value", | |
display_name="Input", | |
info="Data object to filter.", | |
required=True, | |
input_types=["Message", "Data"], | |
), | |
MessageTextInput( | |
name="query", | |
display_name="JQ Query", | |
info="JQ Query to filter the data. The input is always a JSON list.", | |
required=True, | |
), | |
] | |
outputs = [ | |
Output(display_name="Filtered Data", name="filtered_data", method="filter_data"), | |
] | |
def _parse_data(self, input_value) -> str: | |
if isinstance(input_value, Message) and isinstance(input_value.text, str): | |
return input_value.text | |
if isinstance(input_value, Data): | |
return json.dumps(input_value.data) | |
return str(input_value) | |
def filter_data(self) -> list[Data]: | |
to_filter = self.input_value | |
if not to_filter: | |
return [] | |
# Check if input is a list | |
if isinstance(to_filter, list): | |
to_filter = [self._parse_data(f) for f in to_filter] | |
else: | |
to_filter = self._parse_data(to_filter) | |
# If input is not a list, don't wrap it in a list | |
if not isinstance(to_filter, list): | |
to_filter = repair_json(to_filter) | |
try: | |
to_filter_as_dict = json.loads(to_filter) | |
except JSONDecodeError: | |
try: | |
to_filter_as_dict = json.loads(repair_json(to_filter)) | |
except JSONDecodeError as e: | |
msg = f"Invalid JSON: {e}" | |
raise ValueError(msg) from e | |
else: | |
to_filter = [repair_json(f) for f in to_filter] | |
to_filter_as_dict = [] | |
for f in to_filter: | |
try: | |
to_filter_as_dict.append(json.loads(f)) | |
except JSONDecodeError: | |
try: | |
to_filter_as_dict.append(json.loads(repair_json(f))) | |
except JSONDecodeError as e: | |
msg = f"Invalid JSON: {e}" | |
raise ValueError(msg) from e | |
to_filter = to_filter_as_dict | |
full_filter_str = json.dumps(to_filter_as_dict) | |
logger.info("to_filter: ", to_filter) | |
results = jq.compile(self.query).input_text(full_filter_str).all() | |
logger.info("results: ", results) | |
return [Data(data=value) if isinstance(value, dict) else Data(text=str(value)) for value in results] | |