Spaces:
Running
Running
""" | |
Serve Shared Static Files | |
========================= | |
.. autoclass:: SharedDataMiddleware | |
:members: is_allowed | |
:copyright: 2007 Pallets | |
:license: BSD-3-Clause | |
""" | |
from __future__ import annotations | |
import collections.abc as cabc | |
import importlib.util | |
import mimetypes | |
import os | |
import posixpath | |
import typing as t | |
from datetime import datetime | |
from datetime import timezone | |
from io import BytesIO | |
from time import time | |
from zlib import adler32 | |
from ..http import http_date | |
from ..http import is_resource_modified | |
from ..security import safe_join | |
from ..utils import get_content_type | |
from ..wsgi import get_path_info | |
from ..wsgi import wrap_file | |
_TOpener = t.Callable[[], tuple[t.IO[bytes], datetime, int]] | |
_TLoader = t.Callable[[t.Optional[str]], tuple[t.Optional[str], t.Optional[_TOpener]]] | |
if t.TYPE_CHECKING: | |
from _typeshed.wsgi import StartResponse | |
from _typeshed.wsgi import WSGIApplication | |
from _typeshed.wsgi import WSGIEnvironment | |
class SharedDataMiddleware: | |
"""A WSGI middleware which provides static content for development | |
environments or simple server setups. Its usage is quite simple:: | |
import os | |
from werkzeug.middleware.shared_data import SharedDataMiddleware | |
app = SharedDataMiddleware(app, { | |
'/shared': os.path.join(os.path.dirname(__file__), 'shared') | |
}) | |
The contents of the folder ``./shared`` will now be available on | |
``http://example.com/shared/``. This is pretty useful during development | |
because a standalone media server is not required. Files can also be | |
mounted on the root folder and still continue to use the application because | |
the shared data middleware forwards all unhandled requests to the | |
application, even if the requests are below one of the shared folders. | |
If `pkg_resources` is available you can also tell the middleware to serve | |
files from package data:: | |
app = SharedDataMiddleware(app, { | |
'/static': ('myapplication', 'static') | |
}) | |
This will then serve the ``static`` folder in the `myapplication` | |
Python package. | |
The optional `disallow` parameter can be a list of :func:`~fnmatch.fnmatch` | |
rules for files that are not accessible from the web. If `cache` is set to | |
`False` no caching headers are sent. | |
Currently the middleware does not support non-ASCII filenames. If the | |
encoding on the file system happens to match the encoding of the URI it may | |
work but this could also be by accident. We strongly suggest using ASCII | |
only file names for static files. | |
The middleware will guess the mimetype using the Python `mimetype` | |
module. If it's unable to figure out the charset it will fall back | |
to `fallback_mimetype`. | |
:param app: the application to wrap. If you don't want to wrap an | |
application you can pass it :exc:`NotFound`. | |
:param exports: a list or dict of exported files and folders. | |
:param disallow: a list of :func:`~fnmatch.fnmatch` rules. | |
:param cache: enable or disable caching headers. | |
:param cache_timeout: the cache timeout in seconds for the headers. | |
:param fallback_mimetype: The fallback mimetype for unknown files. | |
.. versionchanged:: 1.0 | |
The default ``fallback_mimetype`` is | |
``application/octet-stream``. If a filename looks like a text | |
mimetype, the ``utf-8`` charset is added to it. | |
.. versionadded:: 0.6 | |
Added ``fallback_mimetype``. | |
.. versionchanged:: 0.5 | |
Added ``cache_timeout``. | |
""" | |
def __init__( | |
self, | |
app: WSGIApplication, | |
exports: ( | |
cabc.Mapping[str, str | tuple[str, str]] | |
| t.Iterable[tuple[str, str | tuple[str, str]]] | |
), | |
disallow: None = None, | |
cache: bool = True, | |
cache_timeout: int = 60 * 60 * 12, | |
fallback_mimetype: str = "application/octet-stream", | |
) -> None: | |
self.app = app | |
self.exports: list[tuple[str, _TLoader]] = [] | |
self.cache = cache | |
self.cache_timeout = cache_timeout | |
if isinstance(exports, cabc.Mapping): | |
exports = exports.items() | |
for key, value in exports: | |
if isinstance(value, tuple): | |
loader = self.get_package_loader(*value) | |
elif isinstance(value, str): | |
if os.path.isfile(value): | |
loader = self.get_file_loader(value) | |
else: | |
loader = self.get_directory_loader(value) | |
else: | |
raise TypeError(f"unknown def {value!r}") | |
self.exports.append((key, loader)) | |
if disallow is not None: | |
from fnmatch import fnmatch | |
self.is_allowed = lambda x: not fnmatch(x, disallow) | |
self.fallback_mimetype = fallback_mimetype | |
def is_allowed(self, filename: str) -> bool: | |
"""Subclasses can override this method to disallow the access to | |
certain files. However by providing `disallow` in the constructor | |
this method is overwritten. | |
""" | |
return True | |
def _opener(self, filename: str) -> _TOpener: | |
return lambda: ( | |
open(filename, "rb"), | |
datetime.fromtimestamp(os.path.getmtime(filename), tz=timezone.utc), | |
int(os.path.getsize(filename)), | |
) | |
def get_file_loader(self, filename: str) -> _TLoader: | |
return lambda x: (os.path.basename(filename), self._opener(filename)) | |
def get_package_loader(self, package: str, package_path: str) -> _TLoader: | |
load_time = datetime.now(timezone.utc) | |
spec = importlib.util.find_spec(package) | |
reader = spec.loader.get_resource_reader(package) # type: ignore[union-attr] | |
def loader( | |
path: str | None, | |
) -> tuple[str | None, _TOpener | None]: | |
if path is None: | |
return None, None | |
path = safe_join(package_path, path) | |
if path is None: | |
return None, None | |
basename = posixpath.basename(path) | |
try: | |
resource = reader.open_resource(path) | |
except OSError: | |
return None, None | |
if isinstance(resource, BytesIO): | |
return ( | |
basename, | |
lambda: (resource, load_time, len(resource.getvalue())), | |
) | |
return ( | |
basename, | |
lambda: ( | |
resource, | |
datetime.fromtimestamp( | |
os.path.getmtime(resource.name), tz=timezone.utc | |
), | |
os.path.getsize(resource.name), | |
), | |
) | |
return loader | |
def get_directory_loader(self, directory: str) -> _TLoader: | |
def loader( | |
path: str | None, | |
) -> tuple[str | None, _TOpener | None]: | |
if path is not None: | |
path = safe_join(directory, path) | |
if path is None: | |
return None, None | |
else: | |
path = directory | |
if os.path.isfile(path): | |
return os.path.basename(path), self._opener(path) | |
return None, None | |
return loader | |
def generate_etag(self, mtime: datetime, file_size: int, real_filename: str) -> str: | |
fn_str = os.fsencode(real_filename) | |
timestamp = mtime.timestamp() | |
checksum = adler32(fn_str) & 0xFFFFFFFF | |
return f"wzsdm-{timestamp}-{file_size}-{checksum}" | |
def __call__( | |
self, environ: WSGIEnvironment, start_response: StartResponse | |
) -> t.Iterable[bytes]: | |
path = get_path_info(environ) | |
file_loader = None | |
for search_path, loader in self.exports: | |
if search_path == path: | |
real_filename, file_loader = loader(None) | |
if file_loader is not None: | |
break | |
if not search_path.endswith("/"): | |
search_path += "/" | |
if path.startswith(search_path): | |
real_filename, file_loader = loader(path[len(search_path) :]) | |
if file_loader is not None: | |
break | |
if file_loader is None or not self.is_allowed(real_filename): # type: ignore | |
return self.app(environ, start_response) | |
guessed_type = mimetypes.guess_type(real_filename) # type: ignore | |
mime_type = get_content_type(guessed_type[0] or self.fallback_mimetype, "utf-8") | |
f, mtime, file_size = file_loader() | |
headers = [("Date", http_date())] | |
if self.cache: | |
timeout = self.cache_timeout | |
etag = self.generate_etag(mtime, file_size, real_filename) # type: ignore | |
headers += [ | |
("Etag", f'"{etag}"'), | |
("Cache-Control", f"max-age={timeout}, public"), | |
] | |
if not is_resource_modified(environ, etag, last_modified=mtime): | |
f.close() | |
start_response("304 Not Modified", headers) | |
return [] | |
headers.append(("Expires", http_date(time() + timeout))) | |
else: | |
headers.append(("Cache-Control", "public")) | |
headers.extend( | |
( | |
("Content-Type", mime_type), | |
("Content-Length", str(file_size)), | |
("Last-Modified", http_date(mtime)), | |
) | |
) | |
start_response("200 OK", headers) | |
return wrap_file(environ, f) | |