import asyncio
import json
from pathlib import Path

from dotenv import load_dotenv
from loguru import logger

from langflow.graph import Graph
from langflow.graph.schema import RunOutputs
from langflow.logging.logger import configure
from langflow.processing.process import process_tweaks, run_graph
from langflow.utils.async_helpers import run_until_complete
from langflow.utils.util import update_settings


def load_flow_from_json(
    flow: Path | str | dict,
    *,
    tweaks: dict | None = None,
    log_level: str | None = None,
    log_file: str | None = None,
    env_file: str | None = None,
    cache: str | None = None,
    disable_logs: bool | None = True,
) -> Graph:
    """Load a flow graph from a JSON file or a JSON object.

    Args:
        flow (Union[Path, str, dict]): The flow to load. It can be a file path (str or Path object)
            or a JSON object (dict).
        tweaks (Optional[dict]): Optional tweaks to apply to the loaded flow graph.
        log_level (Optional[str]): Optional log level to configure for the flow processing.
        log_file (Optional[str]): Optional log file to configure for the flow processing.
        env_file (Optional[str]): Optional .env file to override environment variables.
        cache (Optional[str]): Optional cache path to update the flow settings.
        disable_logs (Optional[bool], default=True): Optional flag to disable logs during flow processing.
            If log_level or log_file are set, disable_logs is not used.

    Returns:
        Graph: The loaded flow graph as a Graph object.

    Raises:
        TypeError: If the input is neither a file path (str or Path object) nor a JSON object (dict).

    """
    # If input is a file path, load JSON from the file
    log_file_path = Path(log_file) if log_file else None
    configure(log_level=log_level, log_file=log_file_path, disable=disable_logs, async_file=True)

    # override env variables with .env file
    if env_file:
        load_dotenv(env_file, override=True)

    # Update settings with cache and components path
    update_settings(cache=cache)

    if isinstance(flow, str | Path):
        with Path(flow).open(encoding="utf-8") as f:
            flow_graph = json.load(f)
    # If input is a dictionary, assume it's a JSON object
    elif isinstance(flow, dict):
        flow_graph = flow
    else:
        msg = "Input must be either a file path (str) or a JSON object (dict)"
        raise TypeError(msg)

    graph_data = flow_graph["data"]
    if tweaks is not None:
        graph_data = process_tweaks(graph_data, tweaks)

    return Graph.from_payload(graph_data)


async def arun_flow_from_json(
    flow: Path | str | dict,
    input_value: str,
    *,
    session_id: str | None = None,
    tweaks: dict | None = None,
    input_type: str = "chat",
    output_type: str = "chat",
    output_component: str | None = None,
    log_level: str | None = None,
    log_file: str | None = None,
    env_file: str | None = None,
    cache: str | None = None,
    disable_logs: bool | None = True,
    fallback_to_env_vars: bool = False,
) -> list[RunOutputs]:
    """Run a flow from a JSON file or dictionary.

    Args:
        flow (Union[Path, str, dict]): The path to the JSON file or the JSON dictionary representing the flow.
        input_value (str): The input value to be processed by the flow.
        session_id (str | None, optional): The session ID to be used for the flow. Defaults to None.
        tweaks (Optional[dict], optional): Optional tweaks to be applied to the flow. Defaults to None.
        input_type (str, optional): The type of the input value. Defaults to "chat".
        output_type (str, optional): The type of the output value. Defaults to "chat".
        output_component (Optional[str], optional): The specific component to output. Defaults to None.
        log_level (Optional[str], optional): The log level to use. Defaults to None.
        log_file (Optional[str], optional): The log file to write logs to. Defaults to None.
        env_file (Optional[str], optional): The environment file to load. Defaults to None.
        cache (Optional[str], optional): The cache directory to use. Defaults to None.
        disable_logs (Optional[bool], optional): Whether to disable logs. Defaults to True.
        fallback_to_env_vars (bool, optional): Whether Global Variables should fallback to environment variables if
            not found. Defaults to False.

    Returns:
        List[RunOutputs]: A list of RunOutputs objects representing the results of running the flow.
    """
    if tweaks is None:
        tweaks = {}
    tweaks["stream"] = False
    graph = await asyncio.to_thread(
        load_flow_from_json,
        flow=flow,
        tweaks=tweaks,
        log_level=log_level,
        log_file=log_file,
        env_file=env_file,
        cache=cache,
        disable_logs=disable_logs,
    )
    result = await run_graph(
        graph=graph,
        session_id=session_id,
        input_value=input_value,
        input_type=input_type,
        output_type=output_type,
        output_component=output_component,
        fallback_to_env_vars=fallback_to_env_vars,
    )
    await logger.complete()
    return result


def run_flow_from_json(
    flow: Path | str | dict,
    input_value: str,
    *,
    session_id: str | None = None,
    tweaks: dict | None = None,
    input_type: str = "chat",
    output_type: str = "chat",
    output_component: str | None = None,
    log_level: str | None = None,
    log_file: str | None = None,
    env_file: str | None = None,
    cache: str | None = None,
    disable_logs: bool | None = True,
    fallback_to_env_vars: bool = False,
) -> list[RunOutputs]:
    """Run a flow from a JSON file or dictionary.

    Note:
        This function is a synchronous wrapper around `arun_flow_from_json`.
        It creates an event loop if one does not exist and runs the flow.

    Args:
        flow (Union[Path, str, dict]): The path to the JSON file or the JSON dictionary representing the flow.
        input_value (str): The input value to be processed by the flow.
        session_id (str | None, optional): The session ID to be used for the flow. Defaults to None.
        tweaks (Optional[dict], optional): Optional tweaks to be applied to the flow. Defaults to None.
        input_type (str, optional): The type of the input value. Defaults to "chat".
        output_type (str, optional): The type of the output value. Defaults to "chat".
        output_component (Optional[str], optional): The specific component to output. Defaults to None.
        log_level (Optional[str], optional): The log level to use. Defaults to None.
        log_file (Optional[str], optional): The log file to write logs to. Defaults to None.
        env_file (Optional[str], optional): The environment file to load. Defaults to None.
        cache (Optional[str], optional): The cache directory to use. Defaults to None.
        disable_logs (Optional[bool], optional): Whether to disable logs. Defaults to True.
        fallback_to_env_vars (bool, optional): Whether Global Variables should fallback to environment variables if
            not found. Defaults to False.

    Returns:
        List[RunOutputs]: A list of RunOutputs objects representing the results of running the flow.
    """
    return run_until_complete(
        arun_flow_from_json(
            flow,
            input_value,
            session_id=session_id,
            tweaks=tweaks,
            input_type=input_type,
            output_type=output_type,
            output_component=output_component,
            log_level=log_level,
            log_file=log_file,
            env_file=env_file,
            cache=cache,
            disable_logs=disable_logs,
            fallback_to_env_vars=fallback_to_env_vars,
        )
    )