Spaces:
Sleeping
Sleeping
from __future__ import annotations | |
import functools | |
import json | |
import sys | |
import typing | |
import click | |
import httpcore | |
import pygments.lexers | |
import pygments.util | |
import rich.console | |
import rich.markup | |
import rich.progress | |
import rich.syntax | |
import rich.table | |
from ._client import Client | |
from ._exceptions import RequestError | |
from ._models import Response | |
from ._status_codes import codes | |
def print_help() -> None: | |
console = rich.console.Console() | |
console.print("[bold]HTTPX :butterfly:", justify="center") | |
console.print() | |
console.print("A next generation HTTP client.", justify="center") | |
console.print() | |
console.print( | |
"Usage: [bold]httpx[/bold] [cyan]<URL> [OPTIONS][/cyan] ", justify="left" | |
) | |
console.print() | |
table = rich.table.Table.grid(padding=1, pad_edge=True) | |
table.add_column("Parameter", no_wrap=True, justify="left", style="bold") | |
table.add_column("Description") | |
table.add_row( | |
"-m, --method [cyan]METHOD", | |
"Request method, such as GET, POST, PUT, PATCH, DELETE, OPTIONS, HEAD.\n" | |
"[Default: GET, or POST if a request body is included]", | |
) | |
table.add_row( | |
"-p, --params [cyan]<NAME VALUE> ...", | |
"Query parameters to include in the request URL.", | |
) | |
table.add_row( | |
"-c, --content [cyan]TEXT", "Byte content to include in the request body." | |
) | |
table.add_row( | |
"-d, --data [cyan]<NAME VALUE> ...", "Form data to include in the request body." | |
) | |
table.add_row( | |
"-f, --files [cyan]<NAME FILENAME> ...", | |
"Form files to include in the request body.", | |
) | |
table.add_row("-j, --json [cyan]TEXT", "JSON data to include in the request body.") | |
table.add_row( | |
"-h, --headers [cyan]<NAME VALUE> ...", | |
"Include additional HTTP headers in the request.", | |
) | |
table.add_row( | |
"--cookies [cyan]<NAME VALUE> ...", "Cookies to include in the request." | |
) | |
table.add_row( | |
"--auth [cyan]<USER PASS>", | |
"Username and password to include in the request. Specify '-' for the password" | |
" to use a password prompt. Note that using --verbose/-v will expose" | |
" the Authorization header, including the password encoding" | |
" in a trivially reversible format.", | |
) | |
table.add_row( | |
"--proxy [cyan]URL", | |
"Send the request via a proxy. Should be the URL giving the proxy address.", | |
) | |
table.add_row( | |
"--timeout [cyan]FLOAT", | |
"Timeout value to use for network operations, such as establishing the" | |
" connection, reading some data, etc... [Default: 5.0]", | |
) | |
table.add_row("--follow-redirects", "Automatically follow redirects.") | |
table.add_row("--no-verify", "Disable SSL verification.") | |
table.add_row( | |
"--http2", "Send the request using HTTP/2, if the remote server supports it." | |
) | |
table.add_row( | |
"--download [cyan]FILE", | |
"Save the response content as a file, rather than displaying it.", | |
) | |
table.add_row("-v, --verbose", "Verbose output. Show request as well as response.") | |
table.add_row("--help", "Show this message and exit.") | |
console.print(table) | |
def get_lexer_for_response(response: Response) -> str: | |
content_type = response.headers.get("Content-Type") | |
if content_type is not None: | |
mime_type, _, _ = content_type.partition(";") | |
try: | |
return typing.cast( | |
str, pygments.lexers.get_lexer_for_mimetype(mime_type.strip()).name | |
) | |
except pygments.util.ClassNotFound: # pragma: no cover | |
pass | |
return "" # pragma: no cover | |
def format_request_headers(request: httpcore.Request, http2: bool = False) -> str: | |
version = "HTTP/2" if http2 else "HTTP/1.1" | |
headers = [ | |
(name.lower() if http2 else name, value) for name, value in request.headers | |
] | |
method = request.method.decode("ascii") | |
target = request.url.target.decode("ascii") | |
lines = [f"{method} {target} {version}"] + [ | |
f"{name.decode('ascii')}: {value.decode('ascii')}" for name, value in headers | |
] | |
return "\n".join(lines) | |
def format_response_headers( | |
http_version: bytes, | |
status: int, | |
reason_phrase: bytes | None, | |
headers: list[tuple[bytes, bytes]], | |
) -> str: | |
version = http_version.decode("ascii") | |
reason = ( | |
codes.get_reason_phrase(status) | |
if reason_phrase is None | |
else reason_phrase.decode("ascii") | |
) | |
lines = [f"{version} {status} {reason}"] + [ | |
f"{name.decode('ascii')}: {value.decode('ascii')}" for name, value in headers | |
] | |
return "\n".join(lines) | |
def print_request_headers(request: httpcore.Request, http2: bool = False) -> None: | |
console = rich.console.Console() | |
http_text = format_request_headers(request, http2=http2) | |
syntax = rich.syntax.Syntax(http_text, "http", theme="ansi_dark", word_wrap=True) | |
console.print(syntax) | |
syntax = rich.syntax.Syntax("", "http", theme="ansi_dark", word_wrap=True) | |
console.print(syntax) | |
def print_response_headers( | |
http_version: bytes, | |
status: int, | |
reason_phrase: bytes | None, | |
headers: list[tuple[bytes, bytes]], | |
) -> None: | |
console = rich.console.Console() | |
http_text = format_response_headers(http_version, status, reason_phrase, headers) | |
syntax = rich.syntax.Syntax(http_text, "http", theme="ansi_dark", word_wrap=True) | |
console.print(syntax) | |
syntax = rich.syntax.Syntax("", "http", theme="ansi_dark", word_wrap=True) | |
console.print(syntax) | |
def print_response(response: Response) -> None: | |
console = rich.console.Console() | |
lexer_name = get_lexer_for_response(response) | |
if lexer_name: | |
if lexer_name.lower() == "json": | |
try: | |
data = response.json() | |
text = json.dumps(data, indent=4) | |
except ValueError: # pragma: no cover | |
text = response.text | |
else: | |
text = response.text | |
syntax = rich.syntax.Syntax(text, lexer_name, theme="ansi_dark", word_wrap=True) | |
console.print(syntax) | |
else: | |
console.print(f"<{len(response.content)} bytes of binary data>") | |
_PCTRTT = typing.Tuple[typing.Tuple[str, str], ...] | |
_PCTRTTT = typing.Tuple[_PCTRTT, ...] | |
_PeerCertRetDictType = typing.Dict[str, typing.Union[str, _PCTRTTT, _PCTRTT]] | |
def format_certificate(cert: _PeerCertRetDictType) -> str: # pragma: no cover | |
lines = [] | |
for key, value in cert.items(): | |
if isinstance(value, (list, tuple)): | |
lines.append(f"* {key}:") | |
for item in value: | |
if key in ("subject", "issuer"): | |
for sub_item in item: | |
lines.append(f"* {sub_item[0]}: {sub_item[1]!r}") | |
elif isinstance(item, tuple) and len(item) == 2: | |
lines.append(f"* {item[0]}: {item[1]!r}") | |
else: | |
lines.append(f"* {item!r}") | |
else: | |
lines.append(f"* {key}: {value!r}") | |
return "\n".join(lines) | |
def trace( | |
name: str, info: typing.Mapping[str, typing.Any], verbose: bool = False | |
) -> None: | |
console = rich.console.Console() | |
if name == "connection.connect_tcp.started" and verbose: | |
host = info["host"] | |
console.print(f"* Connecting to {host!r}") | |
elif name == "connection.connect_tcp.complete" and verbose: | |
stream = info["return_value"] | |
server_addr = stream.get_extra_info("server_addr") | |
console.print(f"* Connected to {server_addr[0]!r} on port {server_addr[1]}") | |
elif name == "connection.start_tls.complete" and verbose: # pragma: no cover | |
stream = info["return_value"] | |
ssl_object = stream.get_extra_info("ssl_object") | |
version = ssl_object.version() | |
cipher = ssl_object.cipher() | |
server_cert = ssl_object.getpeercert() | |
alpn = ssl_object.selected_alpn_protocol() | |
console.print(f"* SSL established using {version!r} / {cipher[0]!r}") | |
console.print(f"* Selected ALPN protocol: {alpn!r}") | |
if server_cert: | |
console.print("* Server certificate:") | |
console.print(format_certificate(server_cert)) | |
elif name == "http11.send_request_headers.started" and verbose: | |
request = info["request"] | |
print_request_headers(request, http2=False) | |
elif name == "http2.send_request_headers.started" and verbose: # pragma: no cover | |
request = info["request"] | |
print_request_headers(request, http2=True) | |
elif name == "http11.receive_response_headers.complete": | |
http_version, status, reason_phrase, headers = info["return_value"] | |
print_response_headers(http_version, status, reason_phrase, headers) | |
elif name == "http2.receive_response_headers.complete": # pragma: no cover | |
status, headers = info["return_value"] | |
http_version = b"HTTP/2" | |
reason_phrase = None | |
print_response_headers(http_version, status, reason_phrase, headers) | |
def download_response(response: Response, download: typing.BinaryIO) -> None: | |
console = rich.console.Console() | |
console.print() | |
content_length = response.headers.get("Content-Length") | |
with rich.progress.Progress( | |
"[progress.description]{task.description}", | |
"[progress.percentage]{task.percentage:>3.0f}%", | |
rich.progress.BarColumn(bar_width=None), | |
rich.progress.DownloadColumn(), | |
rich.progress.TransferSpeedColumn(), | |
) as progress: | |
description = f"Downloading [bold]{rich.markup.escape(download.name)}" | |
download_task = progress.add_task( | |
description, | |
total=int(content_length or 0), | |
start=content_length is not None, | |
) | |
for chunk in response.iter_bytes(): | |
download.write(chunk) | |
progress.update(download_task, completed=response.num_bytes_downloaded) | |
def validate_json( | |
ctx: click.Context, | |
param: click.Option | click.Parameter, | |
value: typing.Any, | |
) -> typing.Any: | |
if value is None: | |
return None | |
try: | |
return json.loads(value) | |
except json.JSONDecodeError: # pragma: no cover | |
raise click.BadParameter("Not valid JSON") | |
def validate_auth( | |
ctx: click.Context, | |
param: click.Option | click.Parameter, | |
value: typing.Any, | |
) -> typing.Any: | |
if value == (None, None): | |
return None | |
username, password = value | |
if password == "-": # pragma: no cover | |
password = click.prompt("Password", hide_input=True) | |
return (username, password) | |
def handle_help( | |
ctx: click.Context, | |
param: click.Option | click.Parameter, | |
value: typing.Any, | |
) -> None: | |
if not value or ctx.resilient_parsing: | |
return | |
print_help() | |
ctx.exit() | |
def main( | |
url: str, | |
method: str, | |
params: list[tuple[str, str]], | |
content: str, | |
data: list[tuple[str, str]], | |
files: list[tuple[str, click.File]], | |
json: str, | |
headers: list[tuple[str, str]], | |
cookies: list[tuple[str, str]], | |
auth: tuple[str, str] | None, | |
proxy: str, | |
timeout: float, | |
follow_redirects: bool, | |
verify: bool, | |
http2: bool, | |
download: typing.BinaryIO | None, | |
verbose: bool, | |
) -> None: | |
""" | |
An HTTP command line client. | |
Sends a request and displays the response. | |
""" | |
if not method: | |
method = "POST" if content or data or files or json else "GET" | |
try: | |
with Client( | |
proxy=proxy, | |
timeout=timeout, | |
verify=verify, | |
http2=http2, | |
) as client: | |
with client.stream( | |
method, | |
url, | |
params=list(params), | |
content=content, | |
data=dict(data), | |
files=files, # type: ignore | |
json=json, | |
headers=headers, | |
cookies=dict(cookies), | |
auth=auth, | |
follow_redirects=follow_redirects, | |
extensions={"trace": functools.partial(trace, verbose=verbose)}, | |
) as response: | |
if download is not None: | |
download_response(response, download) | |
else: | |
response.read() | |
if response.content: | |
print_response(response) | |
except RequestError as exc: | |
console = rich.console.Console() | |
console.print(f"[red]{type(exc).__name__}[/red]: {exc}") | |
sys.exit(1) | |
sys.exit(0 if response.is_success else 1) | |