Spaces:
Running
Running
from __future__ import annotations | |
import collections.abc as cabc | |
import time | |
import typing as t | |
from datetime import datetime | |
from datetime import timezone | |
from .encoding import base64_decode | |
from .encoding import base64_encode | |
from .encoding import bytes_to_int | |
from .encoding import int_to_bytes | |
from .encoding import want_bytes | |
from .exc import BadSignature | |
from .exc import BadTimeSignature | |
from .exc import SignatureExpired | |
from .serializer import _TSerialized | |
from .serializer import Serializer | |
from .signer import Signer | |
class TimestampSigner(Signer): | |
"""Works like the regular :class:`.Signer` but also records the time | |
of the signing and can be used to expire signatures. The | |
:meth:`unsign` method can raise :exc:`.SignatureExpired` if the | |
unsigning failed because the signature is expired. | |
""" | |
def get_timestamp(self) -> int: | |
"""Returns the current timestamp. The function must return an | |
integer. | |
""" | |
return int(time.time()) | |
def timestamp_to_datetime(self, ts: int) -> datetime: | |
"""Convert the timestamp from :meth:`get_timestamp` into an | |
aware :class`datetime.datetime` in UTC. | |
.. versionchanged:: 2.0 | |
The timestamp is returned as a timezone-aware ``datetime`` | |
in UTC rather than a naive ``datetime`` assumed to be UTC. | |
""" | |
return datetime.fromtimestamp(ts, tz=timezone.utc) | |
def sign(self, value: str | bytes) -> bytes: | |
"""Signs the given string and also attaches time information.""" | |
value = want_bytes(value) | |
timestamp = base64_encode(int_to_bytes(self.get_timestamp())) | |
sep = want_bytes(self.sep) | |
value = value + sep + timestamp | |
return value + sep + self.get_signature(value) | |
# Ignore overlapping signatures check, return_timestamp is the only | |
# parameter that affects the return type. | |
def unsign( # type: ignore[overload-overlap] | |
self, | |
signed_value: str | bytes, | |
max_age: int | None = None, | |
return_timestamp: t.Literal[False] = False, | |
) -> bytes: ... | |
def unsign( | |
self, | |
signed_value: str | bytes, | |
max_age: int | None = None, | |
return_timestamp: t.Literal[True] = True, | |
) -> tuple[bytes, datetime]: ... | |
def unsign( | |
self, | |
signed_value: str | bytes, | |
max_age: int | None = None, | |
return_timestamp: bool = False, | |
) -> tuple[bytes, datetime] | bytes: | |
"""Works like the regular :meth:`.Signer.unsign` but can also | |
validate the time. See the base docstring of the class for | |
the general behavior. If ``return_timestamp`` is ``True`` the | |
timestamp of the signature will be returned as an aware | |
:class:`datetime.datetime` object in UTC. | |
.. versionchanged:: 2.0 | |
The timestamp is returned as a timezone-aware ``datetime`` | |
in UTC rather than a naive ``datetime`` assumed to be UTC. | |
""" | |
try: | |
result = super().unsign(signed_value) | |
sig_error = None | |
except BadSignature as e: | |
sig_error = e | |
result = e.payload or b"" | |
sep = want_bytes(self.sep) | |
# If there is no timestamp in the result there is something | |
# seriously wrong. In case there was a signature error, we raise | |
# that one directly, otherwise we have a weird situation in | |
# which we shouldn't have come except someone uses a time-based | |
# serializer on non-timestamp data, so catch that. | |
if sep not in result: | |
if sig_error: | |
raise sig_error | |
raise BadTimeSignature("timestamp missing", payload=result) | |
value, ts_bytes = result.rsplit(sep, 1) | |
ts_int: int | None = None | |
ts_dt: datetime | None = None | |
try: | |
ts_int = bytes_to_int(base64_decode(ts_bytes)) | |
except Exception: | |
pass | |
# Signature is *not* okay. Raise a proper error now that we have | |
# split the value and the timestamp. | |
if sig_error is not None: | |
if ts_int is not None: | |
try: | |
ts_dt = self.timestamp_to_datetime(ts_int) | |
except (ValueError, OSError, OverflowError) as exc: | |
# Windows raises OSError | |
# 32-bit raises OverflowError | |
raise BadTimeSignature( | |
"Malformed timestamp", payload=value | |
) from exc | |
raise BadTimeSignature(str(sig_error), payload=value, date_signed=ts_dt) | |
# Signature was okay but the timestamp is actually not there or | |
# malformed. Should not happen, but we handle it anyway. | |
if ts_int is None: | |
raise BadTimeSignature("Malformed timestamp", payload=value) | |
# Check timestamp is not older than max_age | |
if max_age is not None: | |
age = self.get_timestamp() - ts_int | |
if age > max_age: | |
raise SignatureExpired( | |
f"Signature age {age} > {max_age} seconds", | |
payload=value, | |
date_signed=self.timestamp_to_datetime(ts_int), | |
) | |
if age < 0: | |
raise SignatureExpired( | |
f"Signature age {age} < 0 seconds", | |
payload=value, | |
date_signed=self.timestamp_to_datetime(ts_int), | |
) | |
if return_timestamp: | |
return value, self.timestamp_to_datetime(ts_int) | |
return value | |
def validate(self, signed_value: str | bytes, max_age: int | None = None) -> bool: | |
"""Only validates the given signed value. Returns ``True`` if | |
the signature exists and is valid.""" | |
try: | |
self.unsign(signed_value, max_age=max_age) | |
return True | |
except BadSignature: | |
return False | |
class TimedSerializer(Serializer[_TSerialized]): | |
"""Uses :class:`TimestampSigner` instead of the default | |
:class:`.Signer`. | |
""" | |
default_signer: type[TimestampSigner] = TimestampSigner | |
def iter_unsigners( | |
self, salt: str | bytes | None = None | |
) -> cabc.Iterator[TimestampSigner]: | |
return t.cast("cabc.Iterator[TimestampSigner]", super().iter_unsigners(salt)) | |
# TODO: Signature is incompatible because parameters were added | |
# before salt. | |
def loads( # type: ignore[override] | |
self, | |
s: str | bytes, | |
max_age: int | None = None, | |
return_timestamp: bool = False, | |
salt: str | bytes | None = None, | |
) -> t.Any: | |
"""Reverse of :meth:`dumps`, raises :exc:`.BadSignature` if the | |
signature validation fails. If a ``max_age`` is provided it will | |
ensure the signature is not older than that time in seconds. In | |
case the signature is outdated, :exc:`.SignatureExpired` is | |
raised. All arguments are forwarded to the signer's | |
:meth:`~TimestampSigner.unsign` method. | |
""" | |
s = want_bytes(s) | |
last_exception = None | |
for signer in self.iter_unsigners(salt): | |
try: | |
base64d, timestamp = signer.unsign( | |
s, max_age=max_age, return_timestamp=True | |
) | |
payload = self.load_payload(base64d) | |
if return_timestamp: | |
return payload, timestamp | |
return payload | |
except SignatureExpired: | |
# The signature was unsigned successfully but was | |
# expired. Do not try the next signer. | |
raise | |
except BadSignature as err: | |
last_exception = err | |
raise t.cast(BadSignature, last_exception) | |
def loads_unsafe( # type: ignore[override] | |
self, | |
s: str | bytes, | |
max_age: int | None = None, | |
salt: str | bytes | None = None, | |
) -> tuple[bool, t.Any]: | |
return self._loads_unsafe_impl(s, salt, load_kwargs={"max_age": max_age}) | |