# Last change: Tai Truong, "fix readme" (d202ada)
import json
from pathlib import Path
from json_repair import repair_json
from langflow.custom import Component
from langflow.io import FileInput, MessageTextInput, MultilineInput, Output
from langflow.schema import Data
class JSONToDataComponent(Component):
    """Legacy component that converts JSON input into ``Data`` objects.

    Exactly one of three inputs must be supplied: an uploaded JSON file, a
    path to a JSON file, or a raw JSON string. A top-level JSON array becomes
    a list with one ``Data`` per element; any other top-level value becomes a
    single ``Data``.
    """

    display_name = "Load JSON"
    description = (
        "Convert a JSON file, JSON from a file path, or a JSON string to a Data object or a list of Data objects"
    )
    icon = "braces"
    name = "JSONtoData"
    legacy = True

    inputs = [
        FileInput(
            name="json_file",
            display_name="JSON File",
            file_types=["json"],
            info="Upload a JSON file to convert to a Data object or list of Data objects",
        ),
        MessageTextInput(
            name="json_path",
            display_name="JSON File Path",
            info="Provide the path to the JSON file as pure text",
        ),
        MultilineInput(
            name="json_string",
            display_name="JSON String",
            info="Enter a valid JSON string (object or array) to convert to a Data object or list of Data objects",
        ),
    ]

    outputs = [
        Output(name="data", display_name="Data", method="convert_json_to_data"),
    ]

    @staticmethod
    def _read_json_file(file_path: Path) -> str | None:
        """Return the UTF-8 text of *file_path*, or ``None`` if it is not a ``.json`` file."""
        if file_path.suffix.lower() != ".json":
            return None
        return file_path.read_text(encoding="utf-8")

    @staticmethod
    def _parse_json(json_data: str) -> Data | list[Data]:
        """Parse *json_data*, repairing it on a decode error, and wrap the result in ``Data``."""
        try:
            parsed_data = json.loads(json_data)
        except json.JSONDecodeError:
            # Strict parsing failed: let json_repair try to fix the text, then parse again.
            parsed_data = json.loads(repair_json(json_data))

        if isinstance(parsed_data, list):
            return [Data(data=item) for item in parsed_data]
        return Data(data=parsed_data)

    def convert_json_to_data(self) -> Data | list[Data]:
        """Convert the single provided JSON input into ``Data``.

        Returns:
            A list of ``Data`` when the top-level JSON value is an array,
            otherwise a single ``Data``. The result is also stored in
            ``self.status``.

        Raises:
            ValueError: If zero or several inputs are provided, if the file
                does not have a ``.json`` suffix, if the input is empty, or
                if reading/parsing fails (the original exception is chained).
                The message is also stored in ``self.status``.
        """
        provided = [self.json_file, self.json_path, self.json_string]
        if sum(bool(field) for field in provided) != 1:
            msg = "Please provide exactly one of: JSON file, file path, or JSON string."
            self.status = msg
            raise ValueError(msg)

        try:
            if self.json_file:
                json_data = self._read_json_file(Path(self.resolve_path(self.json_file)))
            elif self.json_path:
                json_data = self._read_json_file(Path(self.json_path))
            else:
                json_data = self.json_string

            if json_data is None:
                # Only the file helpers return None, and only for a wrong suffix.
                failure_message = "The provided file must be a JSON file."
            elif not json_data:
                # An empty file used to fall through and raise with whatever
                # stale value self.status happened to hold; report it explicitly.
                failure_message = "The provided JSON input is empty."
            else:
                result = self._parse_json(json_data)
                self.status = result
                return result

        except (json.JSONDecodeError, SyntaxError, ValueError) as e:
            error_message = f"Invalid JSON or Python literal: {e}"
            self.status = error_message
            raise ValueError(error_message) from e

        except Exception as e:
            error_message = f"An error occurred: {e}"
            self.status = error_message
            raise ValueError(error_message) from e

        # Raised outside the try block so the message is not re-wrapped by the
        # ValueError handler above.
        self.status = failure_message
        raise ValueError(failure_message)