Spaces:
Running
Running
from __future__ import annotations | |
import base64 | |
import binascii | |
import collections.abc as cabc | |
import typing as t | |
from ..http import dump_header | |
from ..http import parse_dict_header | |
from ..http import quote_header_value | |
from .structures import CallbackDict | |
if t.TYPE_CHECKING: | |
import typing_extensions as te | |
class Authorization:
    """Represents the parts of an ``Authorization`` request header.

    :attr:`.Request.authorization` returns an instance if the header is set.

    An instance can be used with the test :class:`.Client` request methods' ``auth``
    parameter to send the header in test requests.

    Depending on the auth scheme, either :attr:`parameters` or :attr:`token` will be
    set. The ``Basic`` scheme's token is decoded into the ``username`` and ``password``
    parameters.

    For convenience, ``auth["key"]`` and ``auth.key`` both access the key in the
    :attr:`parameters` dict, along with ``auth.get("key")`` and ``"key" in auth``.

    .. versionchanged:: 2.3
        The ``token`` parameter and attribute was added to support auth schemes that use
        a token instead of parameters, such as ``Bearer``.

    .. versionchanged:: 2.3
        The object is no longer a ``dict``.

    .. versionchanged:: 0.5
        The object is an immutable dict.
    """

    def __init__(
        self,
        auth_type: str,
        data: dict[str, str | None] | None = None,
        token: str | None = None,
    ) -> None:
        self.type = auth_type
        """The authorization scheme, like ``basic``, ``digest``, or ``bearer``."""

        # Never share a default dict between instances.
        if data is None:
            data = {}

        self.parameters = data
        """A dict of parameters parsed from the header. Either this or :attr:`token`
        will have a value for a given scheme.
        """

        self.token = token
        """A token parsed from the header. Either this or :attr:`parameters` will have a
        value for a given scheme.

        .. versionadded:: 2.3
        """

    def __getattr__(self, name: str) -> str | None:
        """Look up ``name`` in :attr:`parameters` when normal attribute lookup fails.

        Returns ``None`` for unknown names instead of raising ``AttributeError``.
        """
        return self.parameters.get(name)

    def __getitem__(self, name: str) -> str | None:
        """Return the parameter ``name``, or ``None`` if it is not set."""
        return self.parameters.get(name)

    def get(self, key: str, default: str | None = None) -> str | None:
        """Return the parameter ``key``, or ``default`` if it is not set."""
        return self.parameters.get(key, default)

    def __contains__(self, key: str) -> bool:
        """Return whether ``key`` is present in :attr:`parameters`."""
        return key in self.parameters

    def __eq__(self, other: object) -> bool:
        """Two instances are equal when type, token, and parameters all match."""
        if not isinstance(other, Authorization):
            return NotImplemented

        return (
            other.type == self.type
            and other.token == self.token
            and other.parameters == self.parameters
        )

    @classmethod
    def from_header(cls, value: str | None) -> te.Self | None:
        """Parse an ``Authorization`` header value and return an instance, or ``None``
        if the value is empty.

        ``None`` is also returned when a ``Basic`` credential is not valid base64 or
        does not decode as text.

        :param value: The header value to parse.

        .. versionadded:: 2.3
        """
        if not value:
            return None

        scheme, _, rest = value.partition(" ")
        scheme = scheme.lower()
        rest = rest.strip()

        if scheme == "basic":
            try:
                username, _, password = base64.b64decode(rest).decode().partition(":")
            except (binascii.Error, UnicodeError):
                return None

            return cls(scheme, {"username": username, "password": password})

        if "=" in rest.rstrip("="):
            # = that is not trailing, this is parameters.
            return cls(scheme, parse_dict_header(rest), None)

        # No = or only trailing =, this is a token.
        return cls(scheme, None, rest)

    def to_header(self) -> str:
        """Produce an ``Authorization`` header value representing this data.

        .. versionadded:: 2.0
        """
        if self.type == "basic":
            # username/password are read from ``parameters`` through __getattr__.
            value = base64.b64encode(
                f"{self.username}:{self.password}".encode()
            ).decode("ascii")
            return f"Basic {value}"

        if self.token is not None:
            return f"{self.type.title()} {self.token}"

        return f"{self.type.title()} {dump_header(self.parameters)}"

    def __str__(self) -> str:
        return self.to_header()

    def __repr__(self) -> str:
        return f"<{type(self).__name__} {self.to_header()}>"
class WWWAuthenticate:
    """Represents the parts of a ``WWW-Authenticate`` response header.

    Set :attr:`.Response.www_authenticate` to an instance or list of instances to set
    values for this header in the response. Modifying this instance will modify the
    header value.

    Depending on the auth scheme, either :attr:`parameters` or :attr:`token` should be
    set.

    For convenience, ``auth["key"]`` and ``auth.key`` both act on the :attr:`parameters`
    dict, and can be used to get, set, or delete parameters. ``auth.get("key")`` and
    ``"key" in auth`` are also provided. The names ``type``, ``parameters`` and
    ``token`` are real attributes; a parameter with one of those names must be set
    with item syntax (``auth["token"] = ...``).

    .. versionchanged:: 2.3
        The ``token`` parameter and attribute was added to support auth schemes that use
        a token instead of parameters, such as ``Bearer``.

    .. versionchanged:: 2.3
        The object is no longer a ``dict``.

    .. versionchanged:: 2.3
        The ``on_update`` parameter was removed.
    """

    def __init__(
        self,
        auth_type: str,
        values: dict[str, str | None] | None = None,
        token: str | None = None,
    ) -> None:
        # Set the callback slot first so _trigger_on_update can always read it.
        self._on_update: cabc.Callable[[WWWAuthenticate], None] | None = None
        self._type = auth_type.lower()
        # Mutating the dict in place notifies the same callback as the setters.
        self._parameters: dict[str, str | None] = CallbackDict(
            values, lambda _: self._trigger_on_update()
        )
        self._token = token

    def _trigger_on_update(self) -> None:
        """Call the ``_on_update`` callback with this instance, if one is set."""
        if self._on_update is not None:
            self._on_update(self)

    @property
    def type(self) -> str:
        """The authorization scheme, like ``basic``, ``digest``, or ``bearer``."""
        return self._type

    @type.setter
    def type(self, value: str) -> None:
        self._type = value
        self._trigger_on_update()

    @property
    def parameters(self) -> dict[str, str | None]:
        """A dict of parameters for the header. Only one of this or :attr:`token` should
        have a value for a given scheme.
        """
        return self._parameters

    @parameters.setter
    def parameters(self, value: dict[str, str]) -> None:
        # Wrap the new dict so later in-place changes still trigger the callback.
        self._parameters = CallbackDict(value, lambda _: self._trigger_on_update())
        self._trigger_on_update()

    @property
    def token(self) -> str | None:
        """A token for the header. Only one of this or :attr:`parameters` should have a
        value for a given scheme.

        .. versionadded:: 2.3
        """
        return self._token

    @token.setter
    def token(self, value: str | None) -> None:
        self._token = value
        self._trigger_on_update()

    def __getitem__(self, key: str) -> str | None:
        """Return the parameter ``key``, or ``None`` if it is not set."""
        return self.parameters.get(key)

    def __setitem__(self, key: str, value: str | None) -> None:
        """Set the parameter ``key``; a value of ``None`` removes it instead."""
        if value is None:
            if key in self.parameters:
                del self.parameters[key]
        else:
            self.parameters[key] = value

        self._trigger_on_update()

    def __delitem__(self, key: str) -> None:
        """Remove the parameter ``key`` if present; a missing key is not an error."""
        if key in self.parameters:
            del self.parameters[key]

        self._trigger_on_update()

    def __getattr__(self, name: str) -> str | None:
        """Look up ``name`` in :attr:`parameters` when normal attribute lookup fails.

        Returns ``None`` for unknown names instead of raising ``AttributeError``.
        """
        return self[name]

    def __setattr__(self, name: str, value: str | None) -> None:
        """Set a real attribute for internal and property names, otherwise set a
        parameter.
        """
        if name in {
            "_type",
            "_parameters",
            "_token",
            "_on_update",
            # Route the public properties to their setters; without this,
            # ``auth.token = x`` would silently create a "token" parameter.
            "type",
            "parameters",
            "token",
        }:
            super().__setattr__(name, value)
        else:
            self[name] = value

    def __delattr__(self, name: str) -> None:
        """Remove the parameter ``name`` if present."""
        del self[name]

    def __contains__(self, key: str) -> bool:
        """Return whether ``key`` is present in :attr:`parameters`."""
        return key in self.parameters

    def __eq__(self, other: object) -> bool:
        """Two instances are equal when type, token, and parameters all match."""
        if not isinstance(other, WWWAuthenticate):
            return NotImplemented

        return (
            other.type == self.type
            and other.token == self.token
            and other.parameters == self.parameters
        )

    def get(self, key: str, default: str | None = None) -> str | None:
        """Return the parameter ``key``, or ``default`` if it is not set."""
        return self.parameters.get(key, default)

    @classmethod
    def from_header(cls, value: str | None) -> te.Self | None:
        """Parse a ``WWW-Authenticate`` header value and return an instance, or ``None``
        if the value is empty.

        :param value: The header value to parse.

        .. versionadded:: 2.3
        """
        if not value:
            return None

        scheme, _, rest = value.partition(" ")
        scheme = scheme.lower()
        rest = rest.strip()

        if "=" in rest.rstrip("="):
            # = that is not trailing, this is parameters.
            return cls(scheme, parse_dict_header(rest), None)

        # No = or only trailing =, this is a token.
        return cls(scheme, None, rest)

    def to_header(self) -> str:
        """Produce a ``WWW-Authenticate`` header value representing this data."""
        if self.token is not None:
            return f"{self.type.title()} {self.token}"

        if self.type == "digest":
            items = []

            for key, value in self.parameters.items():
                # These digest parameters are always emitted as quoted strings.
                if key in {"realm", "domain", "nonce", "opaque", "qop"}:
                    value = quote_header_value(value, allow_token=False)
                else:
                    value = quote_header_value(value)

                items.append(f"{key}={value}")

            return f"Digest {', '.join(items)}"

        return f"{self.type.title()} {dump_header(self.parameters)}"

    def __str__(self) -> str:
        return self.to_header()

    def __repr__(self) -> str:
        return f"<{type(self).__name__} {self.to_header()}>"