Spaces:
Running
Running
File size: 7,020 Bytes
47b2311 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
from __future__ import annotations
import collections.abc as cabc
import typing as t
from datetime import datetime
if t.TYPE_CHECKING:
import typing_extensions as te
T = t.TypeVar("T")
class IfRange:
"""Very simple object that represents the `If-Range` header in parsed
form. It will either have neither a etag or date or one of either but
never both.
.. versionadded:: 0.7
"""
def __init__(self, etag: str | None = None, date: datetime | None = None):
#: The etag parsed and unquoted. Ranges always operate on strong
#: etags so the weakness information is not necessary.
self.etag = etag
#: The date in parsed format or `None`.
self.date = date
def to_header(self) -> str:
"""Converts the object back into an HTTP header."""
if self.date is not None:
return http.http_date(self.date)
if self.etag is not None:
return http.quote_etag(self.etag)
return ""
def __str__(self) -> str:
return self.to_header()
def __repr__(self) -> str:
return f"<{type(self).__name__} {str(self)!r}>"
class Range:
"""Represents a ``Range`` header. All methods only support only
bytes as the unit. Stores a list of ranges if given, but the methods
only work if only one range is provided.
:raise ValueError: If the ranges provided are invalid.
.. versionchanged:: 0.15
The ranges passed in are validated.
.. versionadded:: 0.7
"""
def __init__(
self, units: str, ranges: cabc.Sequence[tuple[int, int | None]]
) -> None:
#: The units of this range. Usually "bytes".
self.units = units
#: A list of ``(begin, end)`` tuples for the range header provided.
#: The ranges are non-inclusive.
self.ranges = ranges
for start, end in ranges:
if start is None or (end is not None and (start < 0 or start >= end)):
raise ValueError(f"{(start, end)} is not a valid range.")
def range_for_length(self, length: int | None) -> tuple[int, int] | None:
"""If the range is for bytes, the length is not None and there is
exactly one range and it is satisfiable it returns a ``(start, stop)``
tuple, otherwise `None`.
"""
if self.units != "bytes" or length is None or len(self.ranges) != 1:
return None
start, end = self.ranges[0]
if end is None:
end = length
if start < 0:
start += length
if http.is_byte_range_valid(start, end, length):
return start, min(end, length)
return None
def make_content_range(self, length: int | None) -> ContentRange | None:
"""Creates a :class:`~werkzeug.datastructures.ContentRange` object
from the current range and given content length.
"""
rng = self.range_for_length(length)
if rng is not None:
return ContentRange(self.units, rng[0], rng[1], length)
return None
def to_header(self) -> str:
"""Converts the object back into an HTTP header."""
ranges = []
for begin, end in self.ranges:
if end is None:
ranges.append(f"{begin}-" if begin >= 0 else str(begin))
else:
ranges.append(f"{begin}-{end - 1}")
return f"{self.units}={','.join(ranges)}"
def to_content_range_header(self, length: int | None) -> str | None:
"""Converts the object into `Content-Range` HTTP header,
based on given length
"""
range = self.range_for_length(length)
if range is not None:
return f"{self.units} {range[0]}-{range[1] - 1}/{length}"
return None
def __str__(self) -> str:
return self.to_header()
def __repr__(self) -> str:
return f"<{type(self).__name__} {str(self)!r}>"
class _CallbackProperty(t.Generic[T]):
def __set_name__(self, owner: type[ContentRange], name: str) -> None:
self.attr = f"_{name}"
@t.overload
def __get__(self, instance: None, owner: None) -> te.Self: ...
@t.overload
def __get__(self, instance: ContentRange, owner: type[ContentRange]) -> T: ...
def __get__(
self, instance: ContentRange | None, owner: type[ContentRange] | None
) -> te.Self | T:
if instance is None:
return self
return instance.__dict__[self.attr] # type: ignore[no-any-return]
def __set__(self, instance: ContentRange, value: T) -> None:
instance.__dict__[self.attr] = value
if instance.on_update is not None:
instance.on_update(instance)
class ContentRange:
"""Represents the content range header.
.. versionadded:: 0.7
"""
def __init__(
self,
units: str | None,
start: int | None,
stop: int | None,
length: int | None = None,
on_update: cabc.Callable[[ContentRange], None] | None = None,
) -> None:
self.on_update = on_update
self.set(start, stop, length, units)
#: The units to use, usually "bytes"
units: str | None = _CallbackProperty() # type: ignore[assignment]
#: The start point of the range or `None`.
start: int | None = _CallbackProperty() # type: ignore[assignment]
#: The stop point of the range (non-inclusive) or `None`. Can only be
#: `None` if also start is `None`.
stop: int | None = _CallbackProperty() # type: ignore[assignment]
#: The length of the range or `None`.
length: int | None = _CallbackProperty() # type: ignore[assignment]
def set(
self,
start: int | None,
stop: int | None,
length: int | None = None,
units: str | None = "bytes",
) -> None:
"""Simple method to update the ranges."""
assert http.is_byte_range_valid(start, stop, length), "Bad range provided"
self._units: str | None = units
self._start: int | None = start
self._stop: int | None = stop
self._length: int | None = length
if self.on_update is not None:
self.on_update(self)
def unset(self) -> None:
"""Sets the units to `None` which indicates that the header should
no longer be used.
"""
self.set(None, None, units=None)
def to_header(self) -> str:
if self._units is None:
return ""
if self._length is None:
length: str | int = "*"
else:
length = self._length
if self._start is None:
return f"{self._units} */{length}"
return f"{self._units} {self._start}-{self._stop - 1}/{length}" # type: ignore[operator]
def __bool__(self) -> bool:
return self._units is not None
def __str__(self) -> str:
return self.to_header()
def __repr__(self) -> str:
return f"<{type(self).__name__} {str(self)!r}>"
# circular dependencies
from .. import http
|