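"""API Request component: performs HTTP requests against one or more URLs and returns Data objects."""
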
import asyncio
import json
import mimetypes
import re
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
from zoneinfo import ZoneInfo

import httpx
import validators

from langflow.base.curl.parse import parse_context
from langflow.custom import Component
from langflow.io import BoolInput, DataInput, DropdownInput, IntInput, MessageTextInput, NestedDictInput, Output
from langflow.schema import Data
from langflow.schema.dotdict import dotdict


class APIRequestComponent(Component):
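    """Make HTTP requests to one or more URLs and return each response as a Data object."""
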
    display_name = "API Request"
    description = (
        "This component allows you to make HTTP requests to one or more URLs. "
        "You can provide headers and body as either dictionaries or Data objects. "
        "Additionally, you can append query parameters to the URLs.\n\n"
        "**Note:** Check advanced options for more settings."
    )
    icon = "Globe"
    name = "APIRequest"

    inputs = [
        MessageTextInput(
            name="urls",
            display_name="URLs",
            list=True,
            info="Enter one or more URLs, separated by commas.",
        ),
        MessageTextInput(
            name="curl",
            display_name="cURL",
            info="Paste a curl command to populate the fields. "
            "This will fill in the dictionary fields for headers and body.",
            advanced=False,
            refresh_button=True,
            real_time_refresh=True,
            tool_mode=True,
        ),
        DropdownInput(
            name="method",
            display_name="Method",
            options=["GET", "POST", "PATCH", "PUT", "DELETE"],
            value="GET",
            info="The HTTP method to use (GET, POST, PATCH, PUT, DELETE).",
        ),
        NestedDictInput(
            name="headers",
            display_name="Headers",
            info="The headers to send with the request as a dictionary. This is populated when using the cURL field.",
            input_types=["Data"],
        ),
        NestedDictInput(
            name="body",
            display_name="Body",
            info="The body to send with the request as a dictionary (for POST, PATCH, PUT). "
            "This is populated when using the cURL field.",
            input_types=["Data"],
        ),
        DataInput(
            name="query_params",
            display_name="Query Parameters",
            info="The query parameters to append to the URL.",
            tool_mode=True,
        ),
        IntInput(
            name="timeout",
            display_name="Timeout",
            value=5,
            info="The timeout (in seconds) to use for the request.",
        ),
        BoolInput(
            name="follow_redirects",
            display_name="Follow Redirects",
            value=True,
            info="Whether to follow HTTP redirects.",
            advanced=True,
        ),
        BoolInput(
            name="save_to_file",
            display_name="Save to File",
            value=False,
            info="Save the API response to a temporary file.",
            advanced=True,
        ),
        BoolInput(
            name="include_httpx_metadata",
            display_name="Include HTTPx Metadata",
            value=False,
            info=(
                "Include properties such as headers, status_code, response_headers, "
                "and redirection_history in the output."
            ),
            advanced=True,
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="make_requests"),
    ]

    def parse_curl(self, curl: str, build_config: dotdict) -> dotdict:
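        """Parse a curl command and update the URL, method, headers, and body fields in the build config."""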
        try:
            parsed = parse_context(curl)
            build_config["urls"]["value"] = [parsed.url]
            build_config["method"]["value"] = parsed.method.upper()
            build_config["headers"]["value"] = dict(parsed.headers)
            if parsed.data:
                try:
                    json_data = json.loads(parsed.data)
                    build_config["body"]["value"] = json_data
                except json.JSONDecodeError:
                    self.log("Error decoding JSON data")
            else:
                build_config["body"]["value"] = {}
        except Exception as exc:
            msg = f"Error parsing curl: {exc}"
            self.log(msg)
            raise ValueError(msg) from exc
        return build_config

    def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
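        """Re-parse the cURL field whenever it changes so the dependent fields stay in sync."""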
        if field_name == "curl" and field_value:
            build_config = self.parse_curl(field_value, build_config)
        return build_config

    async def make_request(
        self,
        client: httpx.AsyncClient,
        method: str,
        url: str,
        headers: dict | None = None,
        body: dict | None = None,
        timeout: int = 5,
        *,
        follow_redirects: bool = True,
        save_to_file: bool = False,
        include_httpx_metadata: bool = False,
    ) -> Data:
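        """Send a single HTTP request and wrap the response (or any error) in a Data object."""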
        method = method.upper()
        if method not in {"GET", "POST", "PATCH", "PUT", "DELETE"}:
            msg = f"Unsupported method: {method}"
            raise ValueError(msg)
        # Tolerate a JSON string body by decoding it into a dictionary.
        if isinstance(body, str) and body:
            try:
                body = json.loads(body)
            except Exception as e:
                msg = f"Error decoding JSON data: {e}"
                self.log(msg)
                raise ValueError(msg) from e
        data = body or None
        redirection_history = []
        try:
            response = await client.request(
                method,
                url,
                headers=headers,
                json=data,
                timeout=timeout,
                follow_redirects=follow_redirects,
            )
            redirection_history = [
                {"url": str(redirect.url), "status_code": redirect.status_code} for redirect in response.history
            ]
            if response.is_redirect:
                redirection_history.append({"url": str(response.url), "status_code": response.status_code})
            is_binary, file_path = self._response_info(response, with_file_path=save_to_file)
            response_headers = self._headers_to_dict(response.headers)
            metadata: dict[str, Any] = {
                "source": url,
            }
            # When saving to a file, write the response to disk and return early
            # with file metadata instead of the response body.
            if save_to_file:
                mode = "wb" if is_binary else "w"
                encoding = response.encoding if mode == "w" else None
                if file_path:
                    with file_path.open(mode, encoding=encoding) as f:
                        f.write(response.content if is_binary else response.text)
                if include_httpx_metadata:
                    metadata.update(
                        {
                            "file_path": str(file_path),
                            "headers": headers,
                            "status_code": response.status_code,
                            "response_headers": response_headers,
                            **({"redirection_history": redirection_history} if redirection_history else {}),
                        }
                    )
                return Data(data=metadata)
            # Populate result when not saving to a file
            if is_binary:
                result = response.content
            else:
                try:
                    result = response.json()
                except Exception:  # noqa: BLE001
                    self.log("Error decoding JSON response")
                    result = response.text
            # Add result to metadata
            metadata.update({"result": result})
            # Add metadata to the output
            if include_httpx_metadata:
                metadata.update(
                    {
                        "headers": headers,
                        "status_code": response.status_code,
                        "response_headers": response_headers,
                        **({"redirection_history": redirection_history} if redirection_history else {}),
                    }
                )
            return Data(data=metadata)
        except httpx.TimeoutException:
            return Data(
                data={
                    "source": url,
                    "headers": headers,
                    "status_code": 408,
                    "error": "Request timed out",
                },
            )
        except Exception as exc:  # noqa: BLE001
            self.log(f"Error making request to {url}")
            return Data(
                data={
                    "source": url,
                    "headers": headers,
                    "status_code": 500,
                    "error": str(exc),
                    **({"redirection_history": redirection_history} if redirection_history else {}),
                },
            )

    def add_query_params(self, url: str, params: dict) -> str:
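        """Merge the given params into the URL's existing query string and return the rebuilt URL."""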
        url_parts = list(urlparse(url))
        query = dict(parse_qsl(url_parts[4]))  # Index 4 of the parse result is the query string.
        query.update(params)
        url_parts[4] = urlencode(query)
        return urlunparse(url_parts)

    async def make_requests(self) -> list[Data]:
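        """Validate the inputs, then issue all requests concurrently and return one Data object per URL."""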
        method = self.method
        urls = [url.strip() for url in self.urls if url.strip()]
        curl = self.curl
        headers = self.headers or {}
        body = self.body or {}
        timeout = self.timeout
        follow_redirects = self.follow_redirects
        save_to_file = self.save_to_file
        include_httpx_metadata = self.include_httpx_metadata

        invalid_urls = [url for url in urls if not validators.url(url)]
        if invalid_urls:
            msg = f"Invalid URLs provided: {invalid_urls}"
            raise ValueError(msg)

        if isinstance(self.query_params, str):
            query_params = dict(parse_qsl(self.query_params))
        else:
            query_params = self.query_params.data if self.query_params else {}

        if curl:
            self._build_config = self.parse_curl(curl, dotdict())

        if isinstance(headers, Data):
            headers = headers.data

        if isinstance(body, Data):
            body = body.data

        bodies = [body] * len(urls)
        urls = [self.add_query_params(url, query_params) for url in urls]

        async with httpx.AsyncClient() as client:
            results = await asyncio.gather(
                *[
                    self.make_request(
                        client,
                        method,
                        u,
                        headers,
                        rec,
                        timeout,
                        follow_redirects=follow_redirects,
                        save_to_file=save_to_file,
                        include_httpx_metadata=include_httpx_metadata,
                    )
                    for u, rec in zip(urls, bodies, strict=True)
                ]
            )
        self.status = results
        return results

    def _response_info(self, response: httpx.Response, *, with_file_path: bool = False) -> tuple[bool, Path | None]:
        """Determine the file path and whether the response content is binary.

        Args:
            response (Response): The HTTP response object.
            with_file_path (bool): Whether to save the response content to a file.

        Returns:
            Tuple[bool, Path | None]:
                A tuple containing a boolean indicating if the content is binary and the full file path
                (if applicable).
        """
        # Determine if the content is binary
        content_type = response.headers.get("Content-Type", "")
        is_binary = "application/octet-stream" in content_type or "application/binary" in content_type

        if not with_file_path:
            return is_binary, None

        # Step 1: Set up a subdirectory for the component in the OS temp directory
        component_temp_dir = Path(tempfile.gettempdir()) / self.__class__.__name__
        component_temp_dir.mkdir(parents=True, exist_ok=True)

        # Step 2: Extract filename from Content-Disposition
        filename = None
        if "Content-Disposition" in response.headers:
            content_disposition = response.headers["Content-Disposition"]
            filename_match = re.search(r'filename="(.+?)"', content_disposition)
            if not filename_match:  # Try to match RFC 5987 style
                filename_match = re.search(r"filename\*=(?:UTF-8'')?(.+)", content_disposition)
            if filename_match:
                extracted_filename = filename_match.group(1)
                # Ensure the filename is unique
                if (component_temp_dir / extracted_filename).exists():
                    timestamp = datetime.now(ZoneInfo("UTC")).strftime("%Y%m%d%H%M%S%f")
                    filename = f"{timestamp}-{extracted_filename}"
                else:
                    filename = extracted_filename

        # Step 3: Infer file extension or use part of the request URL if no filename
        if not filename:
            # Extract the last segment of the URL path
            url_path = urlparse(str(response.request.url)).path
            base_name = Path(url_path).name  # Get the last segment of the path
            if not base_name:  # If the path ends with a slash or is empty
                base_name = "response"

            # Infer file extension
            extension = mimetypes.guess_extension(content_type.split(";")[0]) if content_type else None
            if not extension:
                extension = ".bin" if is_binary else ".txt"  # Default extensions

            # Combine the base name with timestamp and extension
            timestamp = datetime.now(ZoneInfo("UTC")).strftime("%Y%m%d%H%M%S%f")
            filename = f"{timestamp}-{base_name}{extension}"

        # Step 4: Define the full file path
        file_path = component_temp_dir / filename

        return is_binary, file_path

    def _headers_to_dict(self, headers: httpx.Headers) -> dict[str, str]:
        """Convert HTTP headers to a dictionary with lowercased keys."""
        return {k.lower(): v for k, v in headers.items()}
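

# A minimal sketch of how this component might be exercised outside the
# Langflow UI, assuming the runtime normally injects the declared inputs as
# instance attributes. The URL below is a placeholder, not part of the
# component.
#
#   component = APIRequestComponent()
#   component.urls = ["https://example.com/api"]
#   component.curl = ""
#   component.method = "GET"
#   component.headers = {}
#   component.body = {}
#   component.query_params = None
#   component.timeout = 5
#   component.follow_redirects = True
#   component.save_to_file = False
#   component.include_httpx_metadata = False
#   results = asyncio.run(component.make_requests())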