Spaces:
Sleeping
Sleeping
from __future__ import annotations | |
import typing | |
from types import TracebackType | |
from .._models import Request, Response | |
# Self-type variables: annotating `self` with one of these lets `__enter__` /
# `__aenter__` be typed as returning the same transport type it was called on.
T = typing.TypeVar("T", bound="BaseTransport")
A = typing.TypeVar("A", bound="AsyncBaseTransport")

# Names exported by a star-import of this module.
__all__ = ["AsyncBaseTransport", "BaseTransport"]
class BaseTransport:
    """
    Base class for synchronous transports.

    Subclasses are expected to override `handle_request`; the version here
    only raises `NotImplementedError`. `close` is a no-op by default and is
    invoked automatically when the transport is used as a context manager.
    """

    def handle_request(self, request: Request) -> Response:
        """
        Send a single HTTP request and return a response.

        Developers shouldn't typically ever need to call into this API
        directly, since the Client class provides all the higher level
        user-facing API niceties.

        In order to properly release any network resources, the response
        stream should *either* be consumed immediately, with a call to
        `response.stream.read()`, or else the `handle_request` call should
        be followed with a try/finally block to ensure the stream is
        always closed.

        Example usage:

            with httpx.HTTPTransport() as transport:
                req = httpx.Request(
                    method=b"GET",
                    url=(b"https", b"www.example.com", 443, b"/"),
                    headers=[(b"Host", b"www.example.com")],
                )
                resp = transport.handle_request(req)
                body = resp.stream.read()
                print(resp.status_code, resp.headers, body)

        Takes a `Request` instance as the only argument.

        Returns a `Response` instance.

        Raises `NotImplementedError` unless overridden by a subclass.
        """
        raise NotImplementedError(  # pragma: no cover
            "The 'handle_request' method must be implemented."
        )

    def close(self) -> None:
        """Release transport resources. The base implementation does nothing."""

    def __enter__(self: T) -> T:
        """Enter the context manager, yielding the transport itself."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """
        Leave the context manager by calling `close()`.

        The exception details are ignored and `None` is returned, so an
        exception raised inside the `with` body is not suppressed.
        """
        self.close()
class AsyncBaseTransport:
    """
    Base class for asynchronous transports.

    Subclasses are expected to override `handle_async_request`; the version
    here only raises `NotImplementedError`. `aclose` is a no-op by default
    and is awaited automatically when the transport is used with
    `async with`.
    """

    async def handle_async_request(
        self,
        request: Request,
    ) -> Response:
        """
        Send a single HTTP request and return a response.

        Takes a `Request` instance as the only argument.

        Returns a `Response` instance.

        Raises `NotImplementedError` unless overridden by a subclass.
        """
        raise NotImplementedError(  # pragma: no cover
            "The 'handle_async_request' method must be implemented."
        )

    async def aclose(self) -> None:
        """Release transport resources. The base implementation does nothing."""

    async def __aenter__(self: A) -> A:
        """Enter the async context manager, yielding the transport itself."""
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """
        Leave the async context manager by awaiting `aclose()`.

        The exception details are ignored and `None` is returned, so an
        exception raised inside the `async with` body is not suppressed.
        """
        await self.aclose()