Spaces:
Running
Running
"""
Application Profiler
====================

This module provides a middleware that profiles each request with the
:mod:`cProfile` module. This can help identify bottlenecks in your code
that may be slowing down your application.

.. autoclass:: ProfilerMiddleware

:copyright: 2007 Pallets
:license: BSD-3-Clause
"""
from __future__ import annotations | |
import os.path | |
import sys | |
import time | |
import typing as t | |
from pstats import Stats | |
try: | |
from cProfile import Profile | |
except ImportError: | |
from profile import Profile # type: ignore | |
if t.TYPE_CHECKING: | |
from _typeshed.wsgi import StartResponse | |
from _typeshed.wsgi import WSGIApplication | |
from _typeshed.wsgi import WSGIEnvironment | |
class ProfilerMiddleware:
    """Wrap a WSGI application and profile the execution of each
    request. Responses are buffered so that timings are more exact.

    If ``stream`` is given, :class:`pstats.Stats` are written to it
    after each request. If ``profile_dir`` is given, :mod:`cProfile`
    data files are saved to that directory, one file per request.

    The filename can be customized by passing ``filename_format``. If
    it is a string, it will be formatted using :meth:`str.format` with
    the following fields available:

    -   ``{method}`` - The request method; GET, POST, etc.
    -   ``{path}`` - The request path with slashes replaced by dots, or
        ``root`` if the path is empty or missing.
    -   ``{elapsed}`` - The elapsed time of the request in milliseconds.
    -   ``{time}`` - The time of the request.

    If it is a callable, it will be called with the WSGI ``environ`` and
    be expected to return a filename string. The ``environ`` dictionary
    will also have the ``"werkzeug.profiler"`` key populated with a
    dictionary containing the following keys (more may be added in the
    future):

    -   ``elapsed`` - The elapsed time of the request in milliseconds.
    -   ``time`` - The time of the request.

    :param app: The WSGI application to wrap.
    :param stream: Write stats to this stream. Disable with ``None``.
    :param sort_by: A tuple of columns to sort stats by. See
        :meth:`pstats.Stats.sort_stats`.
    :param restrictions: A tuple of restrictions to filter stats by. See
        :meth:`pstats.Stats.print_stats`.
    :param profile_dir: Save profile data files to this directory.
    :param filename_format: Format string for profile data file names,
        or a callable returning a name. See explanation above.

    .. code-block:: python

        from werkzeug.middleware.profiler import ProfilerMiddleware
        app = ProfilerMiddleware(app)

    .. versionchanged:: 3.0
        Added the ``"werkzeug.profiler"`` key to the ``filename_format(environ)``
        parameter with the ``elapsed`` and ``time`` fields.

    .. versionchanged:: 0.15
        Stats are written even if ``profile_dir`` is given, and can be
        disabled by passing ``stream=None``.

    .. versionadded:: 0.15
        Added ``filename_format``.

    .. versionadded:: 0.9
        Added ``restrictions`` and ``profile_dir``.
    """

    def __init__(
        self,
        app: WSGIApplication,
        stream: t.IO[str] | None = sys.stdout,
        sort_by: t.Iterable[str] = ("time", "calls"),
        restrictions: t.Iterable[str | int | float] = (),
        profile_dir: str | None = None,
        filename_format: (
            str | t.Callable[[WSGIEnvironment], str]
        ) = "{method}.{path}.{elapsed:.0f}ms.{time:.0f}.prof",
    ) -> None:
        self._app = app
        self._stream = stream
        # Materialize once: both values are unpacked on every request, so a
        # one-shot iterator would otherwise be empty from the second request on.
        self._sort_by = tuple(sort_by)
        self._restrictions = tuple(restrictions)
        self._profile_dir = profile_dir
        self._filename_format = filename_format

    def __call__(
        self, environ: WSGIEnvironment, start_response: StartResponse
    ) -> t.Iterable[bytes]:
        """Run the wrapped application under the profiler.

        The whole response is buffered while profiling, then returned as
        a single-chunk list. Exceptions raised by the application
        propagate to the caller.

        :param environ: The WSGI environment of the request.
        :param start_response: The server's ``start_response`` callable.
        :return: A one-element list containing the full response body.
        """
        response_body: list[bytes] = []

        def catching_start_response(status, headers, exc_info=None):  # type: ignore
            start_response(status, headers, exc_info)
            # Data sent through the legacy ``write()`` callable is buffered
            # together with the iterated body.
            return response_body.append

        def runapp() -> None:
            app_iter = self._app(
                environ, t.cast("StartResponse", catching_start_response)
            )

            try:
                response_body.extend(app_iter)
            finally:
                # Close the iterable even if iterating it raised, so the
                # application can release its resources.
                if hasattr(app_iter, "close"):
                    app_iter.close()

        profile = Profile()
        start = time.time()
        profile.runcall(runapp)
        body = b"".join(response_body)
        elapsed = time.time() - start

        # ``PATH_INFO`` may be omitted from the environ when it is empty.
        path_info = environ.get("PATH_INFO", "")

        if self._profile_dir is not None:
            if callable(self._filename_format):
                environ["werkzeug.profiler"] = {
                    "elapsed": elapsed * 1000.0,
                    "time": time.time(),
                }
                filename = self._filename_format(environ)
            else:
                filename = self._filename_format.format(
                    method=environ["REQUEST_METHOD"],
                    path=path_info.strip("/").replace("/", ".") or "root",
                    elapsed=elapsed * 1000.0,
                    time=time.time(),
                )

            filename = os.path.join(self._profile_dir, filename)
            profile.dump_stats(filename)

        if self._stream is not None:
            stats = Stats(profile, stream=self._stream)
            stats.sort_stats(*self._sort_by)
            print("-" * 80, file=self._stream)
            print(f"PATH: {path_info!r}", file=self._stream)
            stats.print_stats(*self._restrictions)
            print(f"{'-' * 80}\n", file=self._stream)

        return [body]