Tai Truong
fix readme
d202ada
import asyncio
import json
from pathlib import Path
from dotenv import load_dotenv
from loguru import logger
from langflow.graph import Graph
from langflow.graph.schema import RunOutputs
from langflow.logging.logger import configure
from langflow.processing.process import process_tweaks, run_graph
from langflow.utils.async_helpers import run_until_complete
from langflow.utils.util import update_settings
def load_flow_from_json(
flow: Path | str | dict,
*,
tweaks: dict | None = None,
log_level: str | None = None,
log_file: str | None = None,
env_file: str | None = None,
cache: str | None = None,
disable_logs: bool | None = True,
) -> Graph:
"""Load a flow graph from a JSON file or a JSON object.
Args:
flow (Union[Path, str, dict]): The flow to load. It can be a file path (str or Path object)
or a JSON object (dict).
tweaks (Optional[dict]): Optional tweaks to apply to the loaded flow graph.
log_level (Optional[str]): Optional log level to configure for the flow processing.
log_file (Optional[str]): Optional log file to configure for the flow processing.
env_file (Optional[str]): Optional .env file to override environment variables.
cache (Optional[str]): Optional cache path to update the flow settings.
disable_logs (Optional[bool], default=True): Optional flag to disable logs during flow processing.
If log_level or log_file are set, disable_logs is not used.
Returns:
Graph: The loaded flow graph as a Graph object.
Raises:
TypeError: If the input is neither a file path (str or Path object) nor a JSON object (dict).
"""
# If input is a file path, load JSON from the file
log_file_path = Path(log_file) if log_file else None
configure(log_level=log_level, log_file=log_file_path, disable=disable_logs, async_file=True)
# override env variables with .env file
if env_file:
load_dotenv(env_file, override=True)
# Update settings with cache and components path
update_settings(cache=cache)
if isinstance(flow, str | Path):
with Path(flow).open(encoding="utf-8") as f:
flow_graph = json.load(f)
# If input is a dictionary, assume it's a JSON object
elif isinstance(flow, dict):
flow_graph = flow
else:
msg = "Input must be either a file path (str) or a JSON object (dict)"
raise TypeError(msg)
graph_data = flow_graph["data"]
if tweaks is not None:
graph_data = process_tweaks(graph_data, tweaks)
return Graph.from_payload(graph_data)
async def arun_flow_from_json(
flow: Path | str | dict,
input_value: str,
*,
session_id: str | None = None,
tweaks: dict | None = None,
input_type: str = "chat",
output_type: str = "chat",
output_component: str | None = None,
log_level: str | None = None,
log_file: str | None = None,
env_file: str | None = None,
cache: str | None = None,
disable_logs: bool | None = True,
fallback_to_env_vars: bool = False,
) -> list[RunOutputs]:
"""Run a flow from a JSON file or dictionary.
Args:
flow (Union[Path, str, dict]): The path to the JSON file or the JSON dictionary representing the flow.
input_value (str): The input value to be processed by the flow.
session_id (str | None, optional): The session ID to be used for the flow. Defaults to None.
tweaks (Optional[dict], optional): Optional tweaks to be applied to the flow. Defaults to None.
input_type (str, optional): The type of the input value. Defaults to "chat".
output_type (str, optional): The type of the output value. Defaults to "chat".
output_component (Optional[str], optional): The specific component to output. Defaults to None.
log_level (Optional[str], optional): The log level to use. Defaults to None.
log_file (Optional[str], optional): The log file to write logs to. Defaults to None.
env_file (Optional[str], optional): The environment file to load. Defaults to None.
cache (Optional[str], optional): The cache directory to use. Defaults to None.
disable_logs (Optional[bool], optional): Whether to disable logs. Defaults to True.
fallback_to_env_vars (bool, optional): Whether Global Variables should fallback to environment variables if
not found. Defaults to False.
Returns:
List[RunOutputs]: A list of RunOutputs objects representing the results of running the flow.
"""
if tweaks is None:
tweaks = {}
tweaks["stream"] = False
graph = await asyncio.to_thread(
load_flow_from_json,
flow=flow,
tweaks=tweaks,
log_level=log_level,
log_file=log_file,
env_file=env_file,
cache=cache,
disable_logs=disable_logs,
)
result = await run_graph(
graph=graph,
session_id=session_id,
input_value=input_value,
input_type=input_type,
output_type=output_type,
output_component=output_component,
fallback_to_env_vars=fallback_to_env_vars,
)
await logger.complete()
return result
def run_flow_from_json(
flow: Path | str | dict,
input_value: str,
*,
session_id: str | None = None,
tweaks: dict | None = None,
input_type: str = "chat",
output_type: str = "chat",
output_component: str | None = None,
log_level: str | None = None,
log_file: str | None = None,
env_file: str | None = None,
cache: str | None = None,
disable_logs: bool | None = True,
fallback_to_env_vars: bool = False,
) -> list[RunOutputs]:
"""Run a flow from a JSON file or dictionary.
Note:
This function is a synchronous wrapper around `arun_flow_from_json`.
It creates an event loop if one does not exist and runs the flow.
Args:
flow (Union[Path, str, dict]): The path to the JSON file or the JSON dictionary representing the flow.
input_value (str): The input value to be processed by the flow.
session_id (str | None, optional): The session ID to be used for the flow. Defaults to None.
tweaks (Optional[dict], optional): Optional tweaks to be applied to the flow. Defaults to None.
input_type (str, optional): The type of the input value. Defaults to "chat".
output_type (str, optional): The type of the output value. Defaults to "chat".
output_component (Optional[str], optional): The specific component to output. Defaults to None.
log_level (Optional[str], optional): The log level to use. Defaults to None.
log_file (Optional[str], optional): The log file to write logs to. Defaults to None.
env_file (Optional[str], optional): The environment file to load. Defaults to None.
cache (Optional[str], optional): The cache directory to use. Defaults to None.
disable_logs (Optional[bool], optional): Whether to disable logs. Defaults to True.
fallback_to_env_vars (bool, optional): Whether Global Variables should fallback to environment variables if
not found. Defaults to False.
Returns:
List[RunOutputs]: A list of RunOutputs objects representing the results of running the flow.
"""
return run_until_complete(
arun_flow_from_json(
flow,
input_value,
session_id=session_id,
tweaks=tweaks,
input_type=input_type,
output_type=output_type,
output_component=output_component,
log_level=log_level,
log_file=log_file,
env_file=env_file,
cache=cache,
disable_logs=disable_logs,
fallback_to_env_vars=fallback_to_env_vars,
)
)