# Tai Truong
# fix readme
# d202ada
import asyncio
import json
import mimetypes
import re
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
from zoneinfo import ZoneInfo
import httpx
import validators
from langflow.base.curl.parse import parse_context
from langflow.custom import Component
from langflow.io import BoolInput, DataInput, DropdownInput, IntInput, MessageTextInput, NestedDictInput, Output
from langflow.schema import Data
from langflow.schema.dotdict import dotdict
class APIRequestComponent(Component):
    """Component that issues HTTP requests to one or more URLs.

    The request method, headers, JSON body and query parameters come from the
    component inputs. Each URL yields one ``Data`` object holding the decoded
    response (or the path of the file it was saved to) and, optionally, httpx
    metadata such as the status code and response headers.
    """

    display_name = "API Request"
    description = (
        "This component allows you to make HTTP requests to one or more URLs. "
        "You can provide headers and body as either dictionaries or Data objects. "
        "Additionally, you can append query parameters to the URLs.\n\n"
        "**Note:** Check advanced options for more settings."
    )
    icon = "Globe"
    name = "APIRequest"

    inputs = [
        MessageTextInput(
            name="urls",
            display_name="URLs",
            list=True,
            info="Enter one or more URLs, separated by commas.",
        ),
        MessageTextInput(
            name="curl",
            display_name="cURL",
            info="Paste a curl command to populate the fields. "
            "This will fill in the dictionary fields for headers and body.",
            advanced=False,
            refresh_button=True,
            real_time_refresh=True,
            tool_mode=True,
        ),
        DropdownInput(
            name="method",
            display_name="Method",
            # DELETE is listed because make_request accepts it and parse_curl can
            # set it from a pasted command.
            options=["GET", "POST", "PATCH", "PUT", "DELETE"],
            value="GET",
            info="The HTTP method to use (GET, POST, PATCH, PUT, DELETE).",
        ),
        NestedDictInput(
            name="headers",
            display_name="Headers",
            info="The headers to send with the request as a dictionary. This is populated when using the CURL field.",
            input_types=["Data"],
        ),
        NestedDictInput(
            name="body",
            display_name="Body",
            info="The body to send with the request as a dictionary (for POST, PATCH, PUT). "
            "This is populated when using the CURL field.",
            input_types=["Data"],
        ),
        DataInput(
            name="query_params",
            display_name="Query Parameters",
            info="The query parameters to append to the URL.",
            tool_mode=True,
        ),
        IntInput(
            name="timeout",
            display_name="Timeout",
            value=5,
            info="The timeout to use for the request.",
        ),
        BoolInput(
            name="follow_redirects",
            display_name="Follow Redirects",
            value=True,
            info="Whether to follow http redirects.",
            advanced=True,
        ),
        BoolInput(
            name="save_to_file",
            display_name="Save to File",
            value=False,
            info="Save the API response to a temporary file",
            advanced=True,
        ),
        BoolInput(
            name="include_httpx_metadata",
            display_name="Include HTTPx Metadata",
            value=False,
            info=(
                "Include properties such as headers, status_code, response_headers, "
                "and redirection_history in the output."
            ),
            advanced=True,
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="make_requests"),
    ]

    def parse_curl(self, curl: str, build_config: dotdict) -> dotdict:
        """Fill the URL, method, headers and body fields from a curl command.

        Args:
            curl: The curl command line to parse.
            build_config: Build configuration whose ``urls``, ``method``,
                ``headers`` and ``body`` entries receive the parsed values.

        Returns:
            The same ``build_config`` object, updated in place.

        Raises:
            ValueError: If the command cannot be parsed or a field cannot be set.
        """
        try:
            parsed = parse_context(curl)
            build_config["urls"]["value"] = [parsed.url]
            build_config["method"]["value"] = parsed.method.upper()
            build_config["headers"]["value"] = dict(parsed.headers)

            if parsed.data:
                try:
                    json_data = json.loads(parsed.data)
                    build_config["body"]["value"] = json_data
                except json.JSONDecodeError:
                    # Best effort: a non-JSON payload leaves the body field untouched.
                    self.log("Error decoding JSON data")
            else:
                build_config["body"]["value"] = {}
        except Exception as exc:
            msg = f"Error parsing curl: {exc}"
            self.log(msg)
            raise ValueError(msg) from exc
        return build_config

    def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None) -> dotdict:
        """Re-populate the build configuration when the ``curl`` field changes.

        Any other field, or an empty curl value, leaves ``build_config`` as is.
        """
        if field_name == "curl" and field_value:
            build_config = self.parse_curl(field_value, build_config)
        return build_config

    async def make_request(
        self,
        client: httpx.AsyncClient,
        method: str,
        url: str,
        headers: dict | None = None,
        body: dict | str | None = None,
        timeout: int = 5,
        *,
        follow_redirects: bool = True,
        save_to_file: bool = False,
        include_httpx_metadata: bool = False,
    ) -> Data:
        """Send one HTTP request and wrap the outcome in a ``Data`` object.

        Args:
            client: The httpx client used to send the request.
            method: HTTP method name (case-insensitive).
            url: Target URL; also stored as ``source`` in the result.
            headers: Request headers.
            body: JSON payload, either as a dict or as a JSON string.
            timeout: Timeout passed to httpx for this request.
            follow_redirects: Whether httpx should follow redirects.
            save_to_file: Write the response to a temporary file and report its
                path as ``file_path`` instead of returning the content.
            include_httpx_metadata: Add the request headers, status code,
                response headers and any redirection history to the result.

        Returns:
            A ``Data`` object. On timeout it carries ``status_code`` 408 and on
            any other request failure ``status_code`` 500, each with an
            ``error`` message, rather than raising.

        Raises:
            ValueError: If the method is unsupported or ``body`` is a string
                that is not valid JSON.
        """
        method = method.upper()
        if method not in {"GET", "POST", "PATCH", "PUT", "DELETE"}:
            msg = f"Unsupported method: {method}"
            raise ValueError(msg)

        if isinstance(body, str) and body:
            try:
                body = json.loads(body)
            except Exception as e:
                msg = f"Error decoding JSON data: {e}"
                # self.log is a plain method (see its other call sites), so it has
                # no ``.exception`` attribute to call.
                self.log(msg)
                raise ValueError(msg) from e

        # An empty body is sent as "no JSON payload" rather than as "{}".
        data = body or None
        redirection_history = []

        try:
            response = await client.request(
                method,
                url,
                headers=headers,
                json=data,
                timeout=timeout,
                follow_redirects=follow_redirects,
            )

            redirection_history = [
                {"url": str(redirect.url), "status_code": redirect.status_code} for redirect in response.history
            ]

            # When the final response is itself a redirect, record it too.
            if response.is_redirect:
                redirection_history.append({"url": str(response.url), "status_code": response.status_code})

            is_binary, file_path = self._response_info(response, with_file_path=save_to_file)
            response_headers = self._headers_to_dict(response.headers)

            metadata: dict[str, Any] = {
                "source": url,
            }

            # Built once and merged into whichever result shape is returned below.
            httpx_metadata: dict[str, Any] = {}
            if include_httpx_metadata:
                httpx_metadata = {
                    "headers": headers,
                    "status_code": response.status_code,
                    "response_headers": response_headers,
                    **({"redirection_history": redirection_history} if redirection_history else {}),
                }

            if save_to_file:
                mode = "wb" if is_binary else "w"
                encoding = response.encoding if mode == "w" else None
                if file_path:
                    with file_path.open(mode, encoding=encoding) as f:
                        f.write(response.content if is_binary else response.text)
                    # Always report where the response went; without this the saved
                    # file cannot be located unless httpx metadata was requested.
                    metadata["file_path"] = str(file_path)

                metadata.update(httpx_metadata)
                return Data(data=metadata)

            # Populate result when not saving to a file
            if is_binary:
                result = response.content
            else:
                try:
                    result = response.json()
                except Exception:  # noqa: BLE001
                    self.log("Error decoding JSON response")
                    result = response.text.encode("utf-8")

            # Add result to metadata
            metadata.update({"result": result})

            # Add metadata to the output
            metadata.update(httpx_metadata)
            return Data(data=metadata)
        except httpx.TimeoutException:
            return Data(
                data={
                    "source": url,
                    "headers": headers,
                    "status_code": 408,
                    "error": "Request timed out",
                },
            )
        except Exception as exc:  # noqa: BLE001
            self.log(f"Error making request to {url}")
            return Data(
                data={
                    "source": url,
                    "headers": headers,
                    "status_code": 500,
                    "error": str(exc),
                    **({"redirection_history": redirection_history} if redirection_history else {}),
                },
            )

    def add_query_params(self, url: str, params: dict) -> str:
        """Return ``url`` with ``params`` merged into its query string.

        Existing query pairs are kept, including repeated keys and blank
        values. A key present in ``params`` replaces every existing pair with
        that key and is appended after the kept pairs. With no ``params`` the
        URL is returned unchanged.
        """
        if not params:
            return url

        parts = urlparse(url)
        # Work on the pair list rather than a dict so "a=1&a=2" is not collapsed.
        pairs = [(key, value) for key, value in parse_qsl(parts.query, keep_blank_values=True) if key not in params]
        pairs.extend(params.items())
        return urlunparse(parts._replace(query=urlencode(pairs)))

    async def make_requests(self) -> list[Data]:
        """Send the configured request to every URL concurrently.

        Returns:
            One ``Data`` object per URL, in the order the URLs were given.

        Raises:
            ValueError: If any URL fails validation.
        """
        method = self.method
        # Tolerate an unset URL list; blank entries are dropped.
        urls = [url.strip() for url in (self.urls or []) if url.strip()]
        curl = self.curl
        headers = self.headers or {}
        body = self.body or {}
        timeout = self.timeout
        follow_redirects = self.follow_redirects
        save_to_file = self.save_to_file
        include_httpx_metadata = self.include_httpx_metadata

        invalid_urls = [url for url in urls if not validators.url(url)]
        if invalid_urls:
            msg = f"Invalid URLs provided: {invalid_urls}"
            raise ValueError(msg)

        if isinstance(self.query_params, str):
            query_params = dict(parse_qsl(self.query_params))
        else:
            query_params = self.query_params.data if self.query_params else {}

        if curl:
            # NOTE(review): parse_curl indexes build_config["urls"] and friends; if
            # dotdict does not auto-create missing keys this raises ValueError for
            # every non-empty curl value - confirm. The parsed values are only stored
            # on self._build_config; the request locals above are not updated from it.
            self._build_config = self.parse_curl(curl, dotdict())

        if isinstance(headers, Data):
            headers = headers.data

        if isinstance(body, Data):
            body = body.data

        urls = [self.add_query_params(url, query_params) for url in urls]

        async with httpx.AsyncClient() as client:
            results = await asyncio.gather(
                *[
                    self.make_request(
                        client,
                        method,
                        u,
                        headers,
                        body,
                        timeout,
                        follow_redirects=follow_redirects,
                        save_to_file=save_to_file,
                        include_httpx_metadata=include_httpx_metadata,
                    )
                    for u in urls
                ]
            )
        self.status = results
        return results

    @staticmethod
    def _safe_filename(raw_name: str) -> str:
        """Reduce a server-supplied filename to a bare name without any directory part.

        Returns an empty string when nothing usable remains, so the caller can
        fall back to a generated name.
        """
        # Treat backslashes as separators too, then keep only the last component so
        # absolute paths and ".." segments cannot steer the write location.
        candidate = raw_name.strip().replace("\\", "/").rsplit("/", 1)[-1].strip()
        if candidate in {"", ".", ".."}:
            return ""
        return candidate

    def _response_info(self, response: httpx.Response, *, with_file_path: bool = False) -> tuple[bool, Path | None]:
        """Determine the file path and whether the response content is binary.

        Args:
            response (Response): The HTTP response object.
            with_file_path (bool): Whether to also compute a target file path
                (this creates the component's temp directory if needed).

        Returns:
            Tuple[bool, Path | None]:
                A tuple containing a boolean indicating if the content is binary and the full file path (if applicable).
        """
        # Determine if the content is binary
        content_type = response.headers.get("Content-Type", "")
        is_binary = "application/octet-stream" in content_type or "application/binary" in content_type

        if not with_file_path:
            return is_binary, None

        # Step 1: Set up a subdirectory for the component in the OS temp directory
        component_temp_dir = Path(tempfile.gettempdir()) / self.__class__.__name__
        component_temp_dir.mkdir(parents=True, exist_ok=True)

        # Step 2: Extract filename from Content-Disposition
        filename = None
        if "Content-Disposition" in response.headers:
            content_disposition = response.headers["Content-Disposition"]
            filename_match = re.search(r'filename="(.+?)"', content_disposition)
            if not filename_match:  # Try to match RFC 5987 style
                filename_match = re.search(r"filename\*=(?:UTF-8'')?(.+)", content_disposition)
            if filename_match:
                # The header is controlled by the remote server: never use it as a path.
                extracted_filename = self._safe_filename(filename_match.group(1))
                if extracted_filename:
                    # Ensure the filename is unique
                    if (component_temp_dir / extracted_filename).exists():
                        timestamp = datetime.now(ZoneInfo("UTC")).strftime("%Y%m%d%H%M%S%f")
                        filename = f"{timestamp}-{extracted_filename}"
                    else:
                        filename = extracted_filename

        # Step 3: Infer file extension or use part of the request URL if no filename
        if not filename:
            # Extract the last segment of the URL path
            url_path = urlparse(str(response.request.url)).path
            base_name = Path(url_path).name  # Get the last segment of the path
            if not base_name:  # If the path ends with a slash or is empty
                base_name = "response"

            # Infer file extension
            extension = mimetypes.guess_extension(content_type.split(";")[0]) if content_type else None
            if not extension:
                extension = ".bin" if is_binary else ".txt"  # Default extensions

            # Combine the base name with timestamp and extension
            timestamp = datetime.now(ZoneInfo("UTC")).strftime("%Y%m%d%H%M%S%f")
            filename = f"{timestamp}-{base_name}{extension}"

        # Step 4: Define the full file path
        file_path = component_temp_dir / filename

        return is_binary, file_path

    def _headers_to_dict(self, headers: httpx.Headers) -> dict[str, str]:
        """Convert HTTP headers to a dictionary with lowercased keys."""
        return {k.lower(): v for k, v in headers.items()}