Spaces:
Running
Running
from pathlib import Path | |
import httpx | |
from langflow.services.database.models.flow.model import FlowBase | |
class UploadError(Exception):
    """Raised when uploading a file to Langflow, or retrieving a flow, fails."""
def upload(file_path: str, host: str, flow_id: str):
    """Upload a file to Langflow and return the decoded server response.

    Args:
        file_path (str): The path to the file to be uploaded.
        host (str): The host URL of Langflow. Trailing slashes are ignored.
        flow_id (str): The ID of the flow to which the file belongs.

    Returns:
        The JSON-decoded body of the upload response; expected to be a
        dictionary containing the file path.

    Raises:
        UploadError: If the file cannot be opened, the request or the JSON
            decoding fails, or the server answers with a status code other
            than 200 (OK) or 201 (Created).
    """
    try:
        # Strip trailing slashes so "http://host/" does not produce
        # "http://host//api/v1/upload/...".
        url = f"{host.rstrip('/')}/api/v1/upload/{flow_id}"
        with Path(file_path).open("rb") as file:
            response = httpx.post(url, files={"file": file})
        # The request has completed, so the file handle is already released
        # by the time the response is inspected.
        if response.status_code in {httpx.codes.OK, httpx.codes.CREATED}:
            return response.json()
    except Exception as e:
        msg = f"Error uploading file: {e}"
        raise UploadError(msg) from e

    # Reached only when the request succeeded but the status was not 200/201.
    msg = f"Error uploading file: {response.status_code}"
    raise UploadError(msg)
def upload_file(file_path: str, host: str, flow_id: str, components: list[str], tweaks: dict | None = None) -> dict:
    """Upload a file to Langflow and build tweaks pointing components at it.

    Args:
        file_path (str): The path to the file to be uploaded.
        host (str): The host URL of Langflow.
        flow_id (str): The ID of the flow to which the file belongs.
        components (list[str]): Component IDs or names that need the file.
        tweaks (dict | None): Existing tweaks to extend. A non-empty dict is
            updated in place and returned; otherwise a new dict is created.

    Returns:
        dict: The tweaks, with ``{"path": <uploaded file path>}`` set for
        every entry in ``components``.

    Raises:
        UploadError: If a component is not a string, the upload fails, or the
            upload response does not contain a usable ``"file_path"``.
    """
    # Validate up front so a bad component neither triggers a pointless upload
    # nor leaves the caller's tweaks dict partially updated.
    component_ids = list(components)
    for component in component_ids:
        if not isinstance(component, str):
            msg = f"Error uploading file: component ID or name must be a string. Got {type(component)}"
            raise UploadError(msg)

    try:
        response = upload(file_path, host, flow_id)
    except UploadError:
        # Already carries a complete message; wrapping it again would repeat
        # the "Error uploading file:" prefix.
        raise
    except Exception as e:
        msg = f"Error uploading file: {e}"
        raise UploadError(msg) from e

    # A malformed response must surface as UploadError, not KeyError/TypeError.
    uploaded_path = response.get("file_path") if isinstance(response, dict) else None
    if not uploaded_path:
        msg = "Error uploading file"
        raise UploadError(msg)

    if not tweaks:
        tweaks = {}
    for component in component_ids:
        tweaks[component] = {"path": uploaded_path}
    return tweaks
def get_flow(url: str, flow_id: str):
    """Get the details of a flow from Langflow.

    Args:
        url (str): The host URL of Langflow. Trailing slashes are ignored.
        flow_id (str): The ID of the flow to retrieve.

    Returns:
        The flow details: the JSON response loaded into ``FlowBase`` and
        returned via its ``model_dump()``.

    Raises:
        UploadError: If the request fails, the response cannot be loaded into
            ``FlowBase``, or the server answers with a status code other
            than 200 (OK).
    """
    try:
        # Strip trailing slashes so "http://host/" does not produce
        # "http://host//api/v1/flows/...".
        flow_url = f"{url.rstrip('/')}/api/v1/flows/{flow_id}"
        response = httpx.get(flow_url)
        if response.status_code == httpx.codes.OK:
            json_response = response.json()
            return FlowBase(**json_response).model_dump()
    except Exception as e:
        msg = f"Error retrieving flow: {e}"
        raise UploadError(msg) from e

    # Reached only when the request succeeded but the status was not 200.
    msg = f"Error retrieving flow: {response.status_code}"
    raise UploadError(msg)