Spaces:
Running
Running
from __future__ import annotations | |
import contextvars | |
import sys | |
import typing as t | |
from functools import update_wrapper | |
from types import TracebackType | |
from werkzeug.exceptions import HTTPException | |
from . import typing as ft | |
from .globals import _cv_app | |
from .globals import _cv_request | |
from .signals import appcontext_popped | |
from .signals import appcontext_pushed | |
if t.TYPE_CHECKING: # pragma: no cover | |
from _typeshed.wsgi import WSGIEnvironment | |
from .app import Flask | |
from .sessions import SessionMixin | |
from .wrappers import Request | |
# a singleton sentinel value for parameter defaults | |
_sentinel = object() | |
class _AppCtxGlobals: | |
"""A plain object. Used as a namespace for storing data during an | |
application context. | |
Creating an app context automatically creates this object, which is | |
made available as the :data:`g` proxy. | |
.. describe:: 'key' in g | |
Check whether an attribute is present. | |
.. versionadded:: 0.10 | |
.. describe:: iter(g) | |
Return an iterator over the attribute names. | |
.. versionadded:: 0.10 | |
""" | |
# Define attr methods to let mypy know this is a namespace object | |
# that has arbitrary attributes. | |
def __getattr__(self, name: str) -> t.Any: | |
try: | |
return self.__dict__[name] | |
except KeyError: | |
raise AttributeError(name) from None | |
def __setattr__(self, name: str, value: t.Any) -> None: | |
self.__dict__[name] = value | |
def __delattr__(self, name: str) -> None: | |
try: | |
del self.__dict__[name] | |
except KeyError: | |
raise AttributeError(name) from None | |
def get(self, name: str, default: t.Any | None = None) -> t.Any: | |
"""Get an attribute by name, or a default value. Like | |
:meth:`dict.get`. | |
:param name: Name of attribute to get. | |
:param default: Value to return if the attribute is not present. | |
.. versionadded:: 0.10 | |
""" | |
return self.__dict__.get(name, default) | |
def pop(self, name: str, default: t.Any = _sentinel) -> t.Any: | |
"""Get and remove an attribute by name. Like :meth:`dict.pop`. | |
:param name: Name of attribute to pop. | |
:param default: Value to return if the attribute is not present, | |
instead of raising a ``KeyError``. | |
.. versionadded:: 0.11 | |
""" | |
if default is _sentinel: | |
return self.__dict__.pop(name) | |
else: | |
return self.__dict__.pop(name, default) | |
def setdefault(self, name: str, default: t.Any = None) -> t.Any: | |
"""Get the value of an attribute if it is present, otherwise | |
set and return a default value. Like :meth:`dict.setdefault`. | |
:param name: Name of attribute to get. | |
:param default: Value to set and return if the attribute is not | |
present. | |
.. versionadded:: 0.11 | |
""" | |
return self.__dict__.setdefault(name, default) | |
def __contains__(self, item: str) -> bool: | |
return item in self.__dict__ | |
def __iter__(self) -> t.Iterator[str]: | |
return iter(self.__dict__) | |
def __repr__(self) -> str: | |
ctx = _cv_app.get(None) | |
if ctx is not None: | |
return f"<flask.g of '{ctx.app.name}'>" | |
return object.__repr__(self) | |
def after_this_request( | |
f: ft.AfterRequestCallable[t.Any], | |
) -> ft.AfterRequestCallable[t.Any]: | |
"""Executes a function after this request. This is useful to modify | |
response objects. The function is passed the response object and has | |
to return the same or a new one. | |
Example:: | |
@app.route('/') | |
def index(): | |
@after_this_request | |
def add_header(response): | |
response.headers['X-Foo'] = 'Parachute' | |
return response | |
return 'Hello World!' | |
This is more useful if a function other than the view function wants to | |
modify a response. For instance think of a decorator that wants to add | |
some headers without converting the return value into a response object. | |
.. versionadded:: 0.9 | |
""" | |
ctx = _cv_request.get(None) | |
if ctx is None: | |
raise RuntimeError( | |
"'after_this_request' can only be used when a request" | |
" context is active, such as in a view function." | |
) | |
ctx._after_request_functions.append(f) | |
return f | |
F = t.TypeVar("F", bound=t.Callable[..., t.Any]) | |
def copy_current_request_context(f: F) -> F: | |
"""A helper function that decorates a function to retain the current | |
request context. This is useful when working with greenlets. The moment | |
the function is decorated a copy of the request context is created and | |
then pushed when the function is called. The current session is also | |
included in the copied request context. | |
Example:: | |
import gevent | |
from flask import copy_current_request_context | |
@app.route('/') | |
def index(): | |
@copy_current_request_context | |
def do_some_work(): | |
# do some work here, it can access flask.request or | |
# flask.session like you would otherwise in the view function. | |
... | |
gevent.spawn(do_some_work) | |
return 'Regular response' | |
.. versionadded:: 0.10 | |
""" | |
ctx = _cv_request.get(None) | |
if ctx is None: | |
raise RuntimeError( | |
"'copy_current_request_context' can only be used when a" | |
" request context is active, such as in a view function." | |
) | |
ctx = ctx.copy() | |
def wrapper(*args: t.Any, **kwargs: t.Any) -> t.Any: | |
with ctx: # type: ignore[union-attr] | |
return ctx.app.ensure_sync(f)(*args, **kwargs) # type: ignore[union-attr] | |
return update_wrapper(wrapper, f) # type: ignore[return-value] | |
def has_request_context() -> bool: | |
"""If you have code that wants to test if a request context is there or | |
not this function can be used. For instance, you may want to take advantage | |
of request information if the request object is available, but fail | |
silently if it is unavailable. | |
:: | |
class User(db.Model): | |
def __init__(self, username, remote_addr=None): | |
self.username = username | |
if remote_addr is None and has_request_context(): | |
remote_addr = request.remote_addr | |
self.remote_addr = remote_addr | |
Alternatively you can also just test any of the context bound objects | |
(such as :class:`request` or :class:`g`) for truthness:: | |
class User(db.Model): | |
def __init__(self, username, remote_addr=None): | |
self.username = username | |
if remote_addr is None and request: | |
remote_addr = request.remote_addr | |
self.remote_addr = remote_addr | |
.. versionadded:: 0.7 | |
""" | |
return _cv_request.get(None) is not None | |
def has_app_context() -> bool: | |
"""Works like :func:`has_request_context` but for the application | |
context. You can also just do a boolean check on the | |
:data:`current_app` object instead. | |
.. versionadded:: 0.9 | |
""" | |
return _cv_app.get(None) is not None | |
class AppContext: | |
"""The app context contains application-specific information. An app | |
context is created and pushed at the beginning of each request if | |
one is not already active. An app context is also pushed when | |
running CLI commands. | |
""" | |
def __init__(self, app: Flask) -> None: | |
self.app = app | |
self.url_adapter = app.create_url_adapter(None) | |
self.g: _AppCtxGlobals = app.app_ctx_globals_class() | |
self._cv_tokens: list[contextvars.Token[AppContext]] = [] | |
def push(self) -> None: | |
"""Binds the app context to the current context.""" | |
self._cv_tokens.append(_cv_app.set(self)) | |
appcontext_pushed.send(self.app, _async_wrapper=self.app.ensure_sync) | |
def pop(self, exc: BaseException | None = _sentinel) -> None: # type: ignore | |
"""Pops the app context.""" | |
try: | |
if len(self._cv_tokens) == 1: | |
if exc is _sentinel: | |
exc = sys.exc_info()[1] | |
self.app.do_teardown_appcontext(exc) | |
finally: | |
ctx = _cv_app.get() | |
_cv_app.reset(self._cv_tokens.pop()) | |
if ctx is not self: | |
raise AssertionError( | |
f"Popped wrong app context. ({ctx!r} instead of {self!r})" | |
) | |
appcontext_popped.send(self.app, _async_wrapper=self.app.ensure_sync) | |
def __enter__(self) -> AppContext: | |
self.push() | |
return self | |
def __exit__( | |
self, | |
exc_type: type | None, | |
exc_value: BaseException | None, | |
tb: TracebackType | None, | |
) -> None: | |
self.pop(exc_value) | |
class RequestContext: | |
"""The request context contains per-request information. The Flask | |
app creates and pushes it at the beginning of the request, then pops | |
it at the end of the request. It will create the URL adapter and | |
request object for the WSGI environment provided. | |
Do not attempt to use this class directly, instead use | |
:meth:`~flask.Flask.test_request_context` and | |
:meth:`~flask.Flask.request_context` to create this object. | |
When the request context is popped, it will evaluate all the | |
functions registered on the application for teardown execution | |
(:meth:`~flask.Flask.teardown_request`). | |
The request context is automatically popped at the end of the | |
request. When using the interactive debugger, the context will be | |
restored so ``request`` is still accessible. Similarly, the test | |
client can preserve the context after the request ends. However, | |
teardown functions may already have closed some resources such as | |
database connections. | |
""" | |
def __init__( | |
self, | |
app: Flask, | |
environ: WSGIEnvironment, | |
request: Request | None = None, | |
session: SessionMixin | None = None, | |
) -> None: | |
self.app = app | |
if request is None: | |
request = app.request_class(environ) | |
request.json_module = app.json | |
self.request: Request = request | |
self.url_adapter = None | |
try: | |
self.url_adapter = app.create_url_adapter(self.request) | |
except HTTPException as e: | |
self.request.routing_exception = e | |
self.flashes: list[tuple[str, str]] | None = None | |
self.session: SessionMixin | None = session | |
# Functions that should be executed after the request on the response | |
# object. These will be called before the regular "after_request" | |
# functions. | |
self._after_request_functions: list[ft.AfterRequestCallable[t.Any]] = [] | |
self._cv_tokens: list[ | |
tuple[contextvars.Token[RequestContext], AppContext | None] | |
] = [] | |
def copy(self) -> RequestContext: | |
"""Creates a copy of this request context with the same request object. | |
This can be used to move a request context to a different greenlet. | |
Because the actual request object is the same this cannot be used to | |
move a request context to a different thread unless access to the | |
request object is locked. | |
.. versionadded:: 0.10 | |
.. versionchanged:: 1.1 | |
The current session object is used instead of reloading the original | |
data. This prevents `flask.session` pointing to an out-of-date object. | |
""" | |
return self.__class__( | |
self.app, | |
environ=self.request.environ, | |
request=self.request, | |
session=self.session, | |
) | |
def match_request(self) -> None: | |
"""Can be overridden by a subclass to hook into the matching | |
of the request. | |
""" | |
try: | |
result = self.url_adapter.match(return_rule=True) # type: ignore | |
self.request.url_rule, self.request.view_args = result # type: ignore | |
except HTTPException as e: | |
self.request.routing_exception = e | |
def push(self) -> None: | |
# Before we push the request context we have to ensure that there | |
# is an application context. | |
app_ctx = _cv_app.get(None) | |
if app_ctx is None or app_ctx.app is not self.app: | |
app_ctx = self.app.app_context() | |
app_ctx.push() | |
else: | |
app_ctx = None | |
self._cv_tokens.append((_cv_request.set(self), app_ctx)) | |
# Open the session at the moment that the request context is available. | |
# This allows a custom open_session method to use the request context. | |
# Only open a new session if this is the first time the request was | |
# pushed, otherwise stream_with_context loses the session. | |
if self.session is None: | |
session_interface = self.app.session_interface | |
self.session = session_interface.open_session(self.app, self.request) | |
if self.session is None: | |
self.session = session_interface.make_null_session(self.app) | |
# Match the request URL after loading the session, so that the | |
# session is available in custom URL converters. | |
if self.url_adapter is not None: | |
self.match_request() | |
def pop(self, exc: BaseException | None = _sentinel) -> None: # type: ignore | |
"""Pops the request context and unbinds it by doing that. This will | |
also trigger the execution of functions registered by the | |
:meth:`~flask.Flask.teardown_request` decorator. | |
.. versionchanged:: 0.9 | |
Added the `exc` argument. | |
""" | |
clear_request = len(self._cv_tokens) == 1 | |
try: | |
if clear_request: | |
if exc is _sentinel: | |
exc = sys.exc_info()[1] | |
self.app.do_teardown_request(exc) | |
request_close = getattr(self.request, "close", None) | |
if request_close is not None: | |
request_close() | |
finally: | |
ctx = _cv_request.get() | |
token, app_ctx = self._cv_tokens.pop() | |
_cv_request.reset(token) | |
# get rid of circular dependencies at the end of the request | |
# so that we don't require the GC to be active. | |
if clear_request: | |
ctx.request.environ["werkzeug.request"] = None | |
if app_ctx is not None: | |
app_ctx.pop(exc) | |
if ctx is not self: | |
raise AssertionError( | |
f"Popped wrong request context. ({ctx!r} instead of {self!r})" | |
) | |
def __enter__(self) -> RequestContext: | |
self.push() | |
return self | |
def __exit__( | |
self, | |
exc_type: type | None, | |
exc_value: BaseException | None, | |
tb: TracebackType | None, | |
) -> None: | |
self.pop(exc_value) | |
def __repr__(self) -> str: | |
return ( | |
f"<{type(self).__name__} {self.request.url!r}" | |
f" [{self.request.method}] of {self.app.name}>" | |
) | |