Spaces:
Running
Running
import json | |
from pathlib import Path | |
from json_repair import repair_json | |
from langflow.custom import Component | |
from langflow.io import FileInput, MessageTextInput, MultilineInput, Output | |
from langflow.schema import Data | |
class JSONToDataComponent(Component): | |
display_name = "Load JSON" | |
description = ( | |
"Convert a JSON file, JSON from a file path, or a JSON string to a Data object or a list of Data objects" | |
) | |
icon = "braces" | |
name = "JSONtoData" | |
legacy = True | |
inputs = [ | |
FileInput( | |
name="json_file", | |
display_name="JSON File", | |
file_types=["json"], | |
info="Upload a JSON file to convert to a Data object or list of Data objects", | |
), | |
MessageTextInput( | |
name="json_path", | |
display_name="JSON File Path", | |
info="Provide the path to the JSON file as pure text", | |
), | |
MultilineInput( | |
name="json_string", | |
display_name="JSON String", | |
info="Enter a valid JSON string (object or array) to convert to a Data object or list of Data objects", | |
), | |
] | |
outputs = [ | |
Output(name="data", display_name="Data", method="convert_json_to_data"), | |
] | |
def convert_json_to_data(self) -> Data | list[Data]: | |
if sum(bool(field) for field in [self.json_file, self.json_path, self.json_string]) != 1: | |
msg = "Please provide exactly one of: JSON file, file path, or JSON string." | |
self.status = msg | |
raise ValueError(msg) | |
json_data = None | |
try: | |
if self.json_file: | |
resolved_path = self.resolve_path(self.json_file) | |
file_path = Path(resolved_path) | |
if file_path.suffix.lower() != ".json": | |
self.status = "The provided file must be a JSON file." | |
else: | |
json_data = file_path.read_text(encoding="utf-8") | |
elif self.json_path: | |
file_path = Path(self.json_path) | |
if file_path.suffix.lower() != ".json": | |
self.status = "The provided file must be a JSON file." | |
else: | |
json_data = file_path.read_text(encoding="utf-8") | |
else: | |
json_data = self.json_string | |
if json_data: | |
# Try to parse the JSON string | |
try: | |
parsed_data = json.loads(json_data) | |
except json.JSONDecodeError: | |
# If JSON parsing fails, try to repair the JSON string | |
repaired_json_string = repair_json(json_data) | |
parsed_data = json.loads(repaired_json_string) | |
# Check if the parsed data is a list | |
if isinstance(parsed_data, list): | |
result = [Data(data=item) for item in parsed_data] | |
else: | |
result = Data(data=parsed_data) | |
self.status = result | |
return result | |
except (json.JSONDecodeError, SyntaxError, ValueError) as e: | |
error_message = f"Invalid JSON or Python literal: {e}" | |
self.status = error_message | |
raise ValueError(error_message) from e | |
except Exception as e: | |
error_message = f"An error occurred: {e}" | |
self.status = error_message | |
raise ValueError(error_message) from e | |
# An error occurred | |
raise ValueError(self.status) | |