Spaces:
Running
Running
| import asyncio | |
| import json | |
| from pathlib import Path | |
| from dotenv import load_dotenv | |
| from loguru import logger | |
| from langflow.graph import Graph | |
| from langflow.graph.schema import RunOutputs | |
| from langflow.logging.logger import configure | |
| from langflow.processing.process import process_tweaks, run_graph | |
| from langflow.utils.async_helpers import run_until_complete | |
| from langflow.utils.util import update_settings | |
def load_flow_from_json(
    flow: Path | str | dict,
    *,
    tweaks: dict | None = None,
    log_level: str | None = None,
    log_file: str | None = None,
    env_file: str | None = None,
    cache: str | None = None,
    disable_logs: bool | None = True,
) -> Graph:
    """Build a ``Graph`` from a flow stored on disk or already held in memory.

    Logging, environment overrides and settings are applied first, in that
    order, and only then is the flow itself read.

    Args:
        flow: Either a path (``str`` or ``Path``) to a UTF-8 JSON file, or the
            already-parsed JSON object (``dict``). In both cases the graph
            definition is taken from its ``"data"`` entry.
        tweaks: Optional tweaks passed to ``process_tweaks`` together with the
            graph data. Skipped entirely when ``None``.
        log_level: Optional log level forwarded to ``configure``.
        log_file: Optional log file path; converted to a ``Path`` before being
            forwarded to ``configure``.
        env_file: Optional ``.env`` file whose values override the current
            environment variables.
        cache: Optional cache value forwarded to ``update_settings``.
        disable_logs: Forwarded to ``configure`` as ``disable``. Defaults to True.

    Returns:
        The graph created by ``Graph.from_payload``.

    Raises:
        TypeError: If ``flow`` is neither a ``str``/``Path`` nor a ``dict``.
    """
    # Set up logging before anything else so later steps are logged accordingly.
    configure(
        log_level=log_level,
        log_file=Path(log_file) if log_file else None,
        disable=disable_logs,
        async_file=True,
    )

    # Values from the .env file take precedence over the existing environment.
    if env_file:
        load_dotenv(env_file, override=True)

    update_settings(cache=cache)

    if isinstance(flow, (str, Path)):
        # A path was given: parse the JSON document it points to.
        payload = json.loads(Path(flow).read_text(encoding="utf-8"))
    elif isinstance(flow, dict):
        # Already a parsed JSON object; use it as-is.
        payload = flow
    else:
        msg = "Input must be either a file path (str) or a JSON object (dict)"
        raise TypeError(msg)

    nodes_and_edges = payload["data"]
    if tweaks is None:
        return Graph.from_payload(nodes_and_edges)
    return Graph.from_payload(process_tweaks(nodes_and_edges, tweaks))
async def arun_flow_from_json(
    flow: Path | str | dict,
    input_value: str,
    *,
    session_id: str | None = None,
    tweaks: dict | None = None,
    input_type: str = "chat",
    output_type: str = "chat",
    output_component: str | None = None,
    log_level: str | None = None,
    log_file: str | None = None,
    env_file: str | None = None,
    cache: str | None = None,
    disable_logs: bool | None = True,
    fallback_to_env_vars: bool = False,
) -> list[RunOutputs]:
    """Run a flow from a JSON file or dictionary.

    The graph is built by ``load_flow_from_json`` in a worker thread (via
    ``asyncio.to_thread``) and then executed with ``run_graph``.

    Args:
        flow: The path to the JSON file or the JSON dictionary representing the flow.
        input_value: The input value to be processed by the flow.
        session_id: The session ID to be used for the flow. Defaults to None.
        tweaks: Optional tweaks to be applied to the flow. The dictionary passed
            in is not modified; a shallow copy with ``"stream"`` forced to
            ``False`` is what gets applied. Defaults to None.
        input_type: The type of the input value. Defaults to "chat".
        output_type: The type of the output value. Defaults to "chat".
        output_component: The specific component to output. Defaults to None.
        log_level: The log level to use. Defaults to None.
        log_file: The log file to write logs to. Defaults to None.
        env_file: The environment file to load. Defaults to None.
        cache: The cache directory to use. Defaults to None.
        disable_logs: Whether to disable logs. Defaults to True.
        fallback_to_env_vars: Whether Global Variables should fallback to
            environment variables if not found. Defaults to False.

    Returns:
        A list of RunOutputs objects representing the results of running the flow.
    """
    # Copy before forcing "stream" off so the caller's dict is left untouched
    # (the previous implementation wrote the key into the caller's object).
    effective_tweaks = dict(tweaks) if tweaks is not None else {}
    effective_tweaks["stream"] = False

    # Loading is synchronous (file read, settings update), so keep it off the event loop.
    graph = await asyncio.to_thread(
        load_flow_from_json,
        flow=flow,
        tweaks=effective_tweaks,
        log_level=log_level,
        log_file=log_file,
        env_file=env_file,
        cache=cache,
        disable_logs=disable_logs,
    )

    try:
        return await run_graph(
            graph=graph,
            session_id=session_id,
            input_value=input_value,
            input_type=input_type,
            output_type=output_type,
            output_component=output_component,
            fallback_to_env_vars=fallback_to_env_vars,
        )
    finally:
        # Wait for pending log messages even when the run fails, so the
        # records describing the failure are not lost.
        await logger.complete()
def run_flow_from_json(
    flow: Path | str | dict,
    input_value: str,
    *,
    session_id: str | None = None,
    tweaks: dict | None = None,
    input_type: str = "chat",
    output_type: str = "chat",
    output_component: str | None = None,
    log_level: str | None = None,
    log_file: str | None = None,
    env_file: str | None = None,
    cache: str | None = None,
    disable_logs: bool | None = True,
    fallback_to_env_vars: bool = False,
) -> list[RunOutputs]:
    """Synchronously run a flow given as a JSON file or dictionary.

    Note:
        This is the blocking counterpart of `arun_flow_from_json`: every
        argument is forwarded unchanged and the resulting coroutine is driven
        to completion with `run_until_complete`.

    Args:
        flow: Path to the flow's JSON file, or the flow as a JSON dictionary.
        input_value: The value the flow should process.
        session_id: Session ID to use for the run. Defaults to None.
        tweaks: Optional tweaks to apply to the flow. Defaults to None.
        input_type: Type of the input value. Defaults to "chat".
        output_type: Type of the output value. Defaults to "chat".
        output_component: Specific component to output. Defaults to None.
        log_level: Log level to use. Defaults to None.
        log_file: File to write logs to. Defaults to None.
        env_file: Environment file to load. Defaults to None.
        cache: Cache directory to use. Defaults to None.
        disable_logs: Whether to disable logs. Defaults to True.
        fallback_to_env_vars: Whether Global Variables should fallback to
            environment variables if not found. Defaults to False.

    Returns:
        A list of RunOutputs objects holding the results of the run.
    """
    # Gather the keyword-only options once and forward them as-is.
    run_options = {
        "session_id": session_id,
        "tweaks": tweaks,
        "input_type": input_type,
        "output_type": output_type,
        "output_component": output_component,
        "log_level": log_level,
        "log_file": log_file,
        "env_file": env_file,
        "cache": cache,
        "disable_logs": disable_logs,
        "fallback_to_env_vars": fallback_to_env_vars,
    }
    pending_run = arun_flow_from_json(flow, input_value, **run_options)
    return run_until_complete(pending_run)