from pathlib import Path import httpx from langflow.services.database.models.flow.model import FlowBase class UploadError(Exception): """Raised when an error occurs during the upload process.""" def upload(file_path: str, host: str, flow_id: str): """Upload a file to Langflow and return the file path. Args: file_path (str): The path to the file to be uploaded. host (str): The host URL of Langflow. flow_id (UUID): The ID of the flow to which the file belongs. Returns: dict: A dictionary containing the file path. Raises: UploadError: If an error occurs during the upload process. """ try: url = f"{host}/api/v1/upload/{flow_id}" with Path(file_path).open("rb") as file: response = httpx.post(url, files={"file": file}) if response.status_code in {httpx.codes.OK, httpx.codes.CREATED}: return response.json() except Exception as e: msg = f"Error uploading file: {e}" raise UploadError(msg) from e msg = f"Error uploading file: {response.status_code}" raise UploadError(msg) def upload_file(file_path: str, host: str, flow_id: str, components: list[str], tweaks: dict | None = None): """Upload a file to Langflow and return the file path. Args: file_path (str): The path to the file to be uploaded. host (str): The host URL of Langflow. port (int): The port number of Langflow. flow_id (UUID): The ID of the flow to which the file belongs. components (str): List of component IDs or names that need the file. tweaks (dict): A dictionary of tweaks to be applied to the file. Returns: dict: A dictionary containing the file path and any tweaks that were applied. Raises: UploadError: If an error occurs during the upload process. """ try: response = upload(file_path, host, flow_id) except Exception as e: msg = f"Error uploading file: {e}" raise UploadError(msg) from e if not tweaks: tweaks = {} if response["file_path"]: for component in components: if isinstance(component, str): tweaks[component] = {"path": response["file_path"]} else: msg = f"Error uploading file: component ID or name must be a string. Got {type(component)}" raise UploadError(msg) return tweaks msg = "Error uploading file" raise UploadError(msg) def get_flow(url: str, flow_id: str): """Get the details of a flow from Langflow. Args: url (str): The host URL of Langflow. port (int): The port number of Langflow. flow_id (UUID): The ID of the flow to retrieve. Returns: dict: A dictionary containing the details of the flow. Raises: UploadError: If an error occurs during the retrieval process. """ try: flow_url = f"{url}/api/v1/flows/{flow_id}" response = httpx.get(flow_url) if response.status_code == httpx.codes.OK: json_response = response.json() return FlowBase(**json_response).model_dump() except Exception as e: msg = f"Error retrieving flow: {e}" raise UploadError(msg) from e msg = f"Error retrieving flow: {response.status_code}" raise UploadError(msg)