Spaces:
Running
Running
File size: 37,093 Bytes
47b2311 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 |
from __future__ import annotations
import ast
import collections.abc as cabc
import importlib.metadata
import inspect
import os
import platform
import re
import sys
import traceback
import typing as t
from functools import update_wrapper
from operator import itemgetter
from types import ModuleType
import click
from click.core import ParameterSource
from werkzeug import run_simple
from werkzeug.serving import is_running_from_reloader
from werkzeug.utils import import_string
from .globals import current_app
from .helpers import get_debug_flag
from .helpers import get_load_dotenv
if t.TYPE_CHECKING:
import ssl
from _typeshed.wsgi import StartResponse
from _typeshed.wsgi import WSGIApplication
from _typeshed.wsgi import WSGIEnvironment
from .app import Flask
class NoAppException(click.UsageError):
"""Raised if an application cannot be found or loaded."""
def find_best_app(module: ModuleType) -> Flask:
"""Given a module instance this tries to find the best possible
application in the module or raises an exception.
"""
from . import Flask
# Search for the most common names first.
for attr_name in ("app", "application"):
app = getattr(module, attr_name, None)
if isinstance(app, Flask):
return app
# Otherwise find the only object that is a Flask instance.
matches = [v for v in module.__dict__.values() if isinstance(v, Flask)]
if len(matches) == 1:
return matches[0]
elif len(matches) > 1:
raise NoAppException(
"Detected multiple Flask applications in module"
f" '{module.__name__}'. Use '{module.__name__}:name'"
" to specify the correct one."
)
# Search for app factory functions.
for attr_name in ("create_app", "make_app"):
app_factory = getattr(module, attr_name, None)
if inspect.isfunction(app_factory):
try:
app = app_factory()
if isinstance(app, Flask):
return app
except TypeError as e:
if not _called_with_wrong_args(app_factory):
raise
raise NoAppException(
f"Detected factory '{attr_name}' in module '{module.__name__}',"
" but could not call it without arguments. Use"
f" '{module.__name__}:{attr_name}(args)'"
" to specify arguments."
) from e
raise NoAppException(
"Failed to find Flask application or factory in module"
f" '{module.__name__}'. Use '{module.__name__}:name'"
" to specify one."
)
def _called_with_wrong_args(f: t.Callable[..., Flask]) -> bool:
"""Check whether calling a function raised a ``TypeError`` because
the call failed or because something in the factory raised the
error.
:param f: The function that was called.
:return: ``True`` if the call failed.
"""
tb = sys.exc_info()[2]
try:
while tb is not None:
if tb.tb_frame.f_code is f.__code__:
# In the function, it was called successfully.
return False
tb = tb.tb_next
# Didn't reach the function.
return True
finally:
# Delete tb to break a circular reference.
# https://docs.python.org/2/library/sys.html#sys.exc_info
del tb
def find_app_by_string(module: ModuleType, app_name: str) -> Flask:
"""Check if the given string is a variable name or a function. Call
a function to get the app instance, or return the variable directly.
"""
from . import Flask
# Parse app_name as a single expression to determine if it's a valid
# attribute name or function call.
try:
expr = ast.parse(app_name.strip(), mode="eval").body
except SyntaxError:
raise NoAppException(
f"Failed to parse {app_name!r} as an attribute name or function call."
) from None
if isinstance(expr, ast.Name):
name = expr.id
args = []
kwargs = {}
elif isinstance(expr, ast.Call):
# Ensure the function name is an attribute name only.
if not isinstance(expr.func, ast.Name):
raise NoAppException(
f"Function reference must be a simple name: {app_name!r}."
)
name = expr.func.id
# Parse the positional and keyword arguments as literals.
try:
args = [ast.literal_eval(arg) for arg in expr.args]
kwargs = {
kw.arg: ast.literal_eval(kw.value)
for kw in expr.keywords
if kw.arg is not None
}
except ValueError:
# literal_eval gives cryptic error messages, show a generic
# message with the full expression instead.
raise NoAppException(
f"Failed to parse arguments as literal values: {app_name!r}."
) from None
else:
raise NoAppException(
f"Failed to parse {app_name!r} as an attribute name or function call."
)
try:
attr = getattr(module, name)
except AttributeError as e:
raise NoAppException(
f"Failed to find attribute {name!r} in {module.__name__!r}."
) from e
# If the attribute is a function, call it with any args and kwargs
# to get the real application.
if inspect.isfunction(attr):
try:
app = attr(*args, **kwargs)
except TypeError as e:
if not _called_with_wrong_args(attr):
raise
raise NoAppException(
f"The factory {app_name!r} in module"
f" {module.__name__!r} could not be called with the"
" specified arguments."
) from e
else:
app = attr
if isinstance(app, Flask):
return app
raise NoAppException(
"A valid Flask application was not obtained from"
f" '{module.__name__}:{app_name}'."
)
def prepare_import(path: str) -> str:
"""Given a filename this will try to calculate the python path, add it
to the search path and return the actual module name that is expected.
"""
path = os.path.realpath(path)
fname, ext = os.path.splitext(path)
if ext == ".py":
path = fname
if os.path.basename(path) == "__init__":
path = os.path.dirname(path)
module_name = []
# move up until outside package structure (no __init__.py)
while True:
path, name = os.path.split(path)
module_name.append(name)
if not os.path.exists(os.path.join(path, "__init__.py")):
break
if sys.path[0] != path:
sys.path.insert(0, path)
return ".".join(module_name[::-1])
@t.overload
def locate_app(
module_name: str, app_name: str | None, raise_if_not_found: t.Literal[True] = True
) -> Flask: ...
@t.overload
def locate_app(
module_name: str, app_name: str | None, raise_if_not_found: t.Literal[False] = ...
) -> Flask | None: ...
def locate_app(
module_name: str, app_name: str | None, raise_if_not_found: bool = True
) -> Flask | None:
try:
__import__(module_name)
except ImportError:
# Reraise the ImportError if it occurred within the imported module.
# Determine this by checking whether the trace has a depth > 1.
if sys.exc_info()[2].tb_next: # type: ignore[union-attr]
raise NoAppException(
f"While importing {module_name!r}, an ImportError was"
f" raised:\n\n{traceback.format_exc()}"
) from None
elif raise_if_not_found:
raise NoAppException(f"Could not import {module_name!r}.") from None
else:
return None
module = sys.modules[module_name]
if app_name is None:
return find_best_app(module)
else:
return find_app_by_string(module, app_name)
def get_version(ctx: click.Context, param: click.Parameter, value: t.Any) -> None:
if not value or ctx.resilient_parsing:
return
flask_version = importlib.metadata.version("flask")
werkzeug_version = importlib.metadata.version("werkzeug")
click.echo(
f"Python {platform.python_version()}\n"
f"Flask {flask_version}\n"
f"Werkzeug {werkzeug_version}",
color=ctx.color,
)
ctx.exit()
version_option = click.Option(
["--version"],
help="Show the Flask version.",
expose_value=False,
callback=get_version,
is_flag=True,
is_eager=True,
)
class ScriptInfo:
"""Helper object to deal with Flask applications. This is usually not
necessary to interface with as it's used internally in the dispatching
to click. In future versions of Flask this object will most likely play
a bigger role. Typically it's created automatically by the
:class:`FlaskGroup` but you can also manually create it and pass it
onwards as click object.
.. versionchanged:: 3.1
Added the ``load_dotenv_defaults`` parameter and attribute.
"""
def __init__(
self,
app_import_path: str | None = None,
create_app: t.Callable[..., Flask] | None = None,
set_debug_flag: bool = True,
load_dotenv_defaults: bool = True,
) -> None:
#: Optionally the import path for the Flask application.
self.app_import_path = app_import_path
#: Optionally a function that is passed the script info to create
#: the instance of the application.
self.create_app = create_app
#: A dictionary with arbitrary data that can be associated with
#: this script info.
self.data: dict[t.Any, t.Any] = {}
self.set_debug_flag = set_debug_flag
self.load_dotenv_defaults = get_load_dotenv(load_dotenv_defaults)
"""Whether default ``.flaskenv`` and ``.env`` files should be loaded.
``ScriptInfo`` doesn't load anything, this is for reference when doing
the load elsewhere during processing.
.. versionadded:: 3.1
"""
self._loaded_app: Flask | None = None
def load_app(self) -> Flask:
"""Loads the Flask app (if not yet loaded) and returns it. Calling
this multiple times will just result in the already loaded app to
be returned.
"""
if self._loaded_app is not None:
return self._loaded_app
app: Flask | None = None
if self.create_app is not None:
app = self.create_app()
else:
if self.app_import_path:
path, name = (
re.split(r":(?![\\/])", self.app_import_path, maxsplit=1) + [None]
)[:2]
import_name = prepare_import(path)
app = locate_app(import_name, name)
else:
for path in ("wsgi.py", "app.py"):
import_name = prepare_import(path)
app = locate_app(import_name, None, raise_if_not_found=False)
if app is not None:
break
if app is None:
raise NoAppException(
"Could not locate a Flask application. Use the"
" 'flask --app' option, 'FLASK_APP' environment"
" variable, or a 'wsgi.py' or 'app.py' file in the"
" current directory."
)
if self.set_debug_flag:
# Update the app's debug flag through the descriptor so that
# other values repopulate as well.
app.debug = get_debug_flag()
self._loaded_app = app
return app
pass_script_info = click.make_pass_decorator(ScriptInfo, ensure=True)
F = t.TypeVar("F", bound=t.Callable[..., t.Any])
def with_appcontext(f: F) -> F:
"""Wraps a callback so that it's guaranteed to be executed with the
script's application context.
Custom commands (and their options) registered under ``app.cli`` or
``blueprint.cli`` will always have an app context available, this
decorator is not required in that case.
.. versionchanged:: 2.2
The app context is active for subcommands as well as the
decorated callback. The app context is always available to
``app.cli`` command and parameter callbacks.
"""
@click.pass_context
def decorator(ctx: click.Context, /, *args: t.Any, **kwargs: t.Any) -> t.Any:
if not current_app:
app = ctx.ensure_object(ScriptInfo).load_app()
ctx.with_resource(app.app_context())
return ctx.invoke(f, *args, **kwargs)
return update_wrapper(decorator, f) # type: ignore[return-value]
class AppGroup(click.Group):
"""This works similar to a regular click :class:`~click.Group` but it
changes the behavior of the :meth:`command` decorator so that it
automatically wraps the functions in :func:`with_appcontext`.
Not to be confused with :class:`FlaskGroup`.
"""
def command( # type: ignore[override]
self, *args: t.Any, **kwargs: t.Any
) -> t.Callable[[t.Callable[..., t.Any]], click.Command]:
"""This works exactly like the method of the same name on a regular
:class:`click.Group` but it wraps callbacks in :func:`with_appcontext`
unless it's disabled by passing ``with_appcontext=False``.
"""
wrap_for_ctx = kwargs.pop("with_appcontext", True)
def decorator(f: t.Callable[..., t.Any]) -> click.Command:
if wrap_for_ctx:
f = with_appcontext(f)
return super(AppGroup, self).command(*args, **kwargs)(f) # type: ignore[no-any-return]
return decorator
def group( # type: ignore[override]
self, *args: t.Any, **kwargs: t.Any
) -> t.Callable[[t.Callable[..., t.Any]], click.Group]:
"""This works exactly like the method of the same name on a regular
:class:`click.Group` but it defaults the group class to
:class:`AppGroup`.
"""
kwargs.setdefault("cls", AppGroup)
return super().group(*args, **kwargs) # type: ignore[no-any-return]
def _set_app(ctx: click.Context, param: click.Option, value: str | None) -> str | None:
if value is None:
return None
info = ctx.ensure_object(ScriptInfo)
info.app_import_path = value
return value
# This option is eager so the app will be available if --help is given.
# --help is also eager, so --app must be before it in the param list.
# no_args_is_help bypasses eager processing, so this option must be
# processed manually in that case to ensure FLASK_APP gets picked up.
_app_option = click.Option(
["-A", "--app"],
metavar="IMPORT",
help=(
"The Flask application or factory function to load, in the form 'module:name'."
" Module can be a dotted import or file path. Name is not required if it is"
" 'app', 'application', 'create_app', or 'make_app', and can be 'name(args)' to"
" pass arguments."
),
is_eager=True,
expose_value=False,
callback=_set_app,
)
def _set_debug(ctx: click.Context, param: click.Option, value: bool) -> bool | None:
# If the flag isn't provided, it will default to False. Don't use
# that, let debug be set by env in that case.
source = ctx.get_parameter_source(param.name) # type: ignore[arg-type]
if source is not None and source in (
ParameterSource.DEFAULT,
ParameterSource.DEFAULT_MAP,
):
return None
# Set with env var instead of ScriptInfo.load so that it can be
# accessed early during a factory function.
os.environ["FLASK_DEBUG"] = "1" if value else "0"
return value
_debug_option = click.Option(
["--debug/--no-debug"],
help="Set debug mode.",
expose_value=False,
callback=_set_debug,
)
def _env_file_callback(
ctx: click.Context, param: click.Option, value: str | None
) -> str | None:
try:
import dotenv # noqa: F401
except ImportError:
# Only show an error if a value was passed, otherwise we still want to
# call load_dotenv and show a message without exiting.
if value is not None:
raise click.BadParameter(
"python-dotenv must be installed to load an env file.",
ctx=ctx,
param=param,
) from None
# Load if a value was passed, or we want to load default files, or both.
if value is not None or ctx.obj.load_dotenv_defaults:
load_dotenv(value, load_defaults=ctx.obj.load_dotenv_defaults)
return value
# This option is eager so env vars are loaded as early as possible to be
# used by other options.
_env_file_option = click.Option(
["-e", "--env-file"],
type=click.Path(exists=True, dir_okay=False),
help=(
"Load environment variables from this file, taking precedence over"
" those set by '.env' and '.flaskenv'. Variables set directly in the"
" environment take highest precedence. python-dotenv must be installed."
),
is_eager=True,
expose_value=False,
callback=_env_file_callback,
)
class FlaskGroup(AppGroup):
"""Special subclass of the :class:`AppGroup` group that supports
loading more commands from the configured Flask app. Normally a
developer does not have to interface with this class but there are
some very advanced use cases for which it makes sense to create an
instance of this. see :ref:`custom-scripts`.
:param add_default_commands: if this is True then the default run and
shell commands will be added.
:param add_version_option: adds the ``--version`` option.
:param create_app: an optional callback that is passed the script info and
returns the loaded app.
:param load_dotenv: Load the nearest :file:`.env` and :file:`.flaskenv`
files to set environment variables. Will also change the working
directory to the directory containing the first file found.
:param set_debug_flag: Set the app's debug flag.
.. versionchanged:: 3.1
``-e path`` takes precedence over default ``.env`` and ``.flaskenv`` files.
.. versionchanged:: 2.2
Added the ``-A/--app``, ``--debug/--no-debug``, ``-e/--env-file`` options.
.. versionchanged:: 2.2
An app context is pushed when running ``app.cli`` commands, so
``@with_appcontext`` is no longer required for those commands.
.. versionchanged:: 1.0
If installed, python-dotenv will be used to load environment variables
from :file:`.env` and :file:`.flaskenv` files.
"""
def __init__(
self,
add_default_commands: bool = True,
create_app: t.Callable[..., Flask] | None = None,
add_version_option: bool = True,
load_dotenv: bool = True,
set_debug_flag: bool = True,
**extra: t.Any,
) -> None:
params: list[click.Parameter] = list(extra.pop("params", None) or ())
# Processing is done with option callbacks instead of a group
# callback. This allows users to make a custom group callback
# without losing the behavior. --env-file must come first so
# that it is eagerly evaluated before --app.
params.extend((_env_file_option, _app_option, _debug_option))
if add_version_option:
params.append(version_option)
if "context_settings" not in extra:
extra["context_settings"] = {}
extra["context_settings"].setdefault("auto_envvar_prefix", "FLASK")
super().__init__(params=params, **extra)
self.create_app = create_app
self.load_dotenv = load_dotenv
self.set_debug_flag = set_debug_flag
if add_default_commands:
self.add_command(run_command)
self.add_command(shell_command)
self.add_command(routes_command)
self._loaded_plugin_commands = False
def _load_plugin_commands(self) -> None:
if self._loaded_plugin_commands:
return
if sys.version_info >= (3, 10):
from importlib import metadata
else:
# Use a backport on Python < 3.10. We technically have
# importlib.metadata on 3.8+, but the API changed in 3.10,
# so use the backport for consistency.
import importlib_metadata as metadata # pyright: ignore
for ep in metadata.entry_points(group="flask.commands"):
self.add_command(ep.load(), ep.name)
self._loaded_plugin_commands = True
def get_command(self, ctx: click.Context, name: str) -> click.Command | None:
self._load_plugin_commands()
# Look up built-in and plugin commands, which should be
# available even if the app fails to load.
rv = super().get_command(ctx, name)
if rv is not None:
return rv
info = ctx.ensure_object(ScriptInfo)
# Look up commands provided by the app, showing an error and
# continuing if the app couldn't be loaded.
try:
app = info.load_app()
except NoAppException as e:
click.secho(f"Error: {e.format_message()}\n", err=True, fg="red")
return None
# Push an app context for the loaded app unless it is already
# active somehow. This makes the context available to parameter
# and command callbacks without needing @with_appcontext.
if not current_app or current_app._get_current_object() is not app: # type: ignore[attr-defined]
ctx.with_resource(app.app_context())
return app.cli.get_command(ctx, name)
def list_commands(self, ctx: click.Context) -> list[str]:
self._load_plugin_commands()
# Start with the built-in and plugin commands.
rv = set(super().list_commands(ctx))
info = ctx.ensure_object(ScriptInfo)
# Add commands provided by the app, showing an error and
# continuing if the app couldn't be loaded.
try:
rv.update(info.load_app().cli.list_commands(ctx))
except NoAppException as e:
# When an app couldn't be loaded, show the error message
# without the traceback.
click.secho(f"Error: {e.format_message()}\n", err=True, fg="red")
except Exception:
# When any other errors occurred during loading, show the
# full traceback.
click.secho(f"{traceback.format_exc()}\n", err=True, fg="red")
return sorted(rv)
def make_context(
self,
info_name: str | None,
args: list[str],
parent: click.Context | None = None,
**extra: t.Any,
) -> click.Context:
# Set a flag to tell app.run to become a no-op. If app.run was
# not in a __name__ == __main__ guard, it would start the server
# when importing, blocking whatever command is being called.
os.environ["FLASK_RUN_FROM_CLI"] = "true"
if "obj" not in extra and "obj" not in self.context_settings:
extra["obj"] = ScriptInfo(
create_app=self.create_app,
set_debug_flag=self.set_debug_flag,
load_dotenv_defaults=self.load_dotenv,
)
return super().make_context(info_name, args, parent=parent, **extra)
def parse_args(self, ctx: click.Context, args: list[str]) -> list[str]:
if not args and self.no_args_is_help:
# Attempt to load --env-file and --app early in case they
# were given as env vars. Otherwise no_args_is_help will not
# see commands from app.cli.
_env_file_option.handle_parse_result(ctx, {}, [])
_app_option.handle_parse_result(ctx, {}, [])
return super().parse_args(ctx, args)
def _path_is_ancestor(path: str, other: str) -> bool:
"""Take ``other`` and remove the length of ``path`` from it. Then join it
to ``path``. If it is the original value, ``path`` is an ancestor of
``other``."""
return os.path.join(path, other[len(path) :].lstrip(os.sep)) == other
def load_dotenv(
path: str | os.PathLike[str] | None = None, load_defaults: bool = True
) -> bool:
"""Load "dotenv" files to set environment variables. A given path takes
precedence over ``.env``, which takes precedence over ``.flaskenv``. After
loading and combining these files, values are only set if the key is not
already set in ``os.environ``.
This is a no-op if `python-dotenv`_ is not installed.
.. _python-dotenv: https://github.com/theskumar/python-dotenv#readme
:param path: Load the file at this location.
:param load_defaults: Search for and load the default ``.flaskenv`` and
``.env`` files.
:return: ``True`` if at least one env var was loaded.
.. versionchanged:: 3.1
Added the ``load_defaults`` parameter. A given path takes precedence
over default files.
.. versionchanged:: 2.0
The current directory is not changed to the location of the
loaded file.
.. versionchanged:: 2.0
When loading the env files, set the default encoding to UTF-8.
.. versionchanged:: 1.1.0
Returns ``False`` when python-dotenv is not installed, or when
the given path isn't a file.
.. versionadded:: 1.0
"""
try:
import dotenv
except ImportError:
if path or os.path.isfile(".env") or os.path.isfile(".flaskenv"):
click.secho(
" * Tip: There are .env files present. Install python-dotenv"
" to use them.",
fg="yellow",
err=True,
)
return False
data: dict[str, str | None] = {}
if load_defaults:
for default_name in (".flaskenv", ".env"):
if not (default_path := dotenv.find_dotenv(default_name, usecwd=True)):
continue
data |= dotenv.dotenv_values(default_path, encoding="utf-8")
if path is not None and os.path.isfile(path):
data |= dotenv.dotenv_values(path, encoding="utf-8")
for key, value in data.items():
if key in os.environ or value is None:
continue
os.environ[key] = value
return bool(data) # True if at least one env var was loaded.
def show_server_banner(debug: bool, app_import_path: str | None) -> None:
"""Show extra startup messages the first time the server is run,
ignoring the reloader.
"""
if is_running_from_reloader():
return
if app_import_path is not None:
click.echo(f" * Serving Flask app '{app_import_path}'")
if debug is not None:
click.echo(f" * Debug mode: {'on' if debug else 'off'}")
class CertParamType(click.ParamType):
"""Click option type for the ``--cert`` option. Allows either an
existing file, the string ``'adhoc'``, or an import for a
:class:`~ssl.SSLContext` object.
"""
name = "path"
def __init__(self) -> None:
self.path_type = click.Path(exists=True, dir_okay=False, resolve_path=True)
def convert(
self, value: t.Any, param: click.Parameter | None, ctx: click.Context | None
) -> t.Any:
try:
import ssl
except ImportError:
raise click.BadParameter(
'Using "--cert" requires Python to be compiled with SSL support.',
ctx,
param,
) from None
try:
return self.path_type(value, param, ctx)
except click.BadParameter:
value = click.STRING(value, param, ctx).lower()
if value == "adhoc":
try:
import cryptography # noqa: F401
except ImportError:
raise click.BadParameter(
"Using ad-hoc certificates requires the cryptography library.",
ctx,
param,
) from None
return value
obj = import_string(value, silent=True)
if isinstance(obj, ssl.SSLContext):
return obj
raise
def _validate_key(ctx: click.Context, param: click.Parameter, value: t.Any) -> t.Any:
"""The ``--key`` option must be specified when ``--cert`` is a file.
Modifies the ``cert`` param to be a ``(cert, key)`` pair if needed.
"""
cert = ctx.params.get("cert")
is_adhoc = cert == "adhoc"
try:
import ssl
except ImportError:
is_context = False
else:
is_context = isinstance(cert, ssl.SSLContext)
if value is not None:
if is_adhoc:
raise click.BadParameter(
'When "--cert" is "adhoc", "--key" is not used.', ctx, param
)
if is_context:
raise click.BadParameter(
'When "--cert" is an SSLContext object, "--key" is not used.',
ctx,
param,
)
if not cert:
raise click.BadParameter('"--cert" must also be specified.', ctx, param)
ctx.params["cert"] = cert, value
else:
if cert and not (is_adhoc or is_context):
raise click.BadParameter('Required when using "--cert".', ctx, param)
return value
class SeparatedPathType(click.Path):
"""Click option type that accepts a list of values separated by the
OS's path separator (``:``, ``;`` on Windows). Each value is
validated as a :class:`click.Path` type.
"""
def convert(
self, value: t.Any, param: click.Parameter | None, ctx: click.Context | None
) -> t.Any:
items = self.split_envvar_value(value)
# can't call no-arg super() inside list comprehension until Python 3.12
super_convert = super().convert
return [super_convert(item, param, ctx) for item in items]
@click.command("run", short_help="Run a development server.")
@click.option("--host", "-h", default="127.0.0.1", help="The interface to bind to.")
@click.option("--port", "-p", default=5000, help="The port to bind to.")
@click.option(
"--cert",
type=CertParamType(),
help="Specify a certificate file to use HTTPS.",
is_eager=True,
)
@click.option(
"--key",
type=click.Path(exists=True, dir_okay=False, resolve_path=True),
callback=_validate_key,
expose_value=False,
help="The key file to use when specifying a certificate.",
)
@click.option(
"--reload/--no-reload",
default=None,
help="Enable or disable the reloader. By default the reloader "
"is active if debug is enabled.",
)
@click.option(
"--debugger/--no-debugger",
default=None,
help="Enable or disable the debugger. By default the debugger "
"is active if debug is enabled.",
)
@click.option(
"--with-threads/--without-threads",
default=True,
help="Enable or disable multithreading.",
)
@click.option(
"--extra-files",
default=None,
type=SeparatedPathType(),
help=(
"Extra files that trigger a reload on change. Multiple paths"
f" are separated by {os.path.pathsep!r}."
),
)
@click.option(
"--exclude-patterns",
default=None,
type=SeparatedPathType(),
help=(
"Files matching these fnmatch patterns will not trigger a reload"
" on change. Multiple patterns are separated by"
f" {os.path.pathsep!r}."
),
)
@pass_script_info
def run_command(
info: ScriptInfo,
host: str,
port: int,
reload: bool,
debugger: bool,
with_threads: bool,
cert: ssl.SSLContext | tuple[str, str | None] | t.Literal["adhoc"] | None,
extra_files: list[str] | None,
exclude_patterns: list[str] | None,
) -> None:
"""Run a local development server.
This server is for development purposes only. It does not provide
the stability, security, or performance of production WSGI servers.
The reloader and debugger are enabled by default with the '--debug'
option.
"""
try:
app: WSGIApplication = info.load_app() # pyright: ignore
except Exception as e:
if is_running_from_reloader():
# When reloading, print out the error immediately, but raise
# it later so the debugger or server can handle it.
traceback.print_exc()
err = e
def app(
environ: WSGIEnvironment, start_response: StartResponse
) -> cabc.Iterable[bytes]:
raise err from None
else:
# When not reloading, raise the error immediately so the
# command fails.
raise e from None
debug = get_debug_flag()
if reload is None:
reload = debug
if debugger is None:
debugger = debug
show_server_banner(debug, info.app_import_path)
run_simple(
host,
port,
app,
use_reloader=reload,
use_debugger=debugger,
threaded=with_threads,
ssl_context=cert,
extra_files=extra_files,
exclude_patterns=exclude_patterns,
)
run_command.params.insert(0, _debug_option)
@click.command("shell", short_help="Run a shell in the app context.")
@with_appcontext
def shell_command() -> None:
"""Run an interactive Python shell in the context of a given
Flask application. The application will populate the default
namespace of this shell according to its configuration.
This is useful for executing small snippets of management code
without having to manually configure the application.
"""
import code
banner = (
f"Python {sys.version} on {sys.platform}\n"
f"App: {current_app.import_name}\n"
f"Instance: {current_app.instance_path}"
)
ctx: dict[str, t.Any] = {}
# Support the regular Python interpreter startup script if someone
# is using it.
startup = os.environ.get("PYTHONSTARTUP")
if startup and os.path.isfile(startup):
with open(startup) as f:
eval(compile(f.read(), startup, "exec"), ctx)
ctx.update(current_app.make_shell_context())
# Site, customize, or startup script can set a hook to call when
# entering interactive mode. The default one sets up readline with
# tab and history completion.
interactive_hook = getattr(sys, "__interactivehook__", None)
if interactive_hook is not None:
try:
import readline
from rlcompleter import Completer
except ImportError:
pass
else:
# rlcompleter uses __main__.__dict__ by default, which is
# flask.__main__. Use the shell context instead.
readline.set_completer(Completer(ctx).complete)
interactive_hook()
code.interact(banner=banner, local=ctx)
@click.command("routes", short_help="Show the routes for the app.")
@click.option(
"--sort",
"-s",
type=click.Choice(("endpoint", "methods", "domain", "rule", "match")),
default="endpoint",
help=(
"Method to sort routes by. 'match' is the order that Flask will match routes"
" when dispatching a request."
),
)
@click.option("--all-methods", is_flag=True, help="Show HEAD and OPTIONS methods.")
@with_appcontext
def routes_command(sort: str, all_methods: bool) -> None:
"""Show all registered routes with endpoints and methods."""
rules = list(current_app.url_map.iter_rules())
if not rules:
click.echo("No routes were registered.")
return
ignored_methods = set() if all_methods else {"HEAD", "OPTIONS"}
host_matching = current_app.url_map.host_matching
has_domain = any(rule.host if host_matching else rule.subdomain for rule in rules)
rows = []
for rule in rules:
row = [
rule.endpoint,
", ".join(sorted((rule.methods or set()) - ignored_methods)),
]
if has_domain:
row.append((rule.host if host_matching else rule.subdomain) or "")
row.append(rule.rule)
rows.append(row)
headers = ["Endpoint", "Methods"]
sorts = ["endpoint", "methods"]
if has_domain:
headers.append("Host" if host_matching else "Subdomain")
sorts.append("domain")
headers.append("Rule")
sorts.append("rule")
try:
rows.sort(key=itemgetter(sorts.index(sort)))
except ValueError:
pass
rows.insert(0, headers)
widths = [max(len(row[i]) for row in rows) for i in range(len(headers))]
rows.insert(1, ["-" * w for w in widths])
template = " ".join(f"{{{i}:<{w}}}" for i, w in enumerate(widths))
for row in rows:
click.echo(template.format(*row))
cli = FlaskGroup(
name="flask",
help="""\
A general utility script for Flask applications.
An application to load must be given with the '--app' option,
'FLASK_APP' environment variable, or with a 'wsgi.py' or 'app.py' file
in the current directory.
""",
)
def main() -> None:
cli.main()
if __name__ == "__main__":
main()
|