Spaces:
Running
Running
File size: 6,701 Bytes
47b2311 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 |
from __future__ import annotations
import collections.abc as cabc
import mimetypes
import os
import typing as t
from io import BytesIO
from os import fsdecode
from os import fspath
from .._internal import _plain_int
from .headers import Headers
from .structures import MultiDict
class FileStorage:
"""The :class:`FileStorage` class is a thin wrapper over incoming files.
It is used by the request object to represent uploaded files. All the
attributes of the wrapper stream are proxied by the file storage so
it's possible to do ``storage.read()`` instead of the long form
``storage.stream.read()``.
"""
def __init__(
self,
stream: t.IO[bytes] | None = None,
filename: str | None = None,
name: str | None = None,
content_type: str | None = None,
content_length: int | None = None,
headers: Headers | None = None,
):
self.name = name
self.stream = stream or BytesIO()
# If no filename is provided, attempt to get the filename from
# the stream object. Python names special streams like
# ``<stderr>`` with angular brackets, skip these streams.
if filename is None:
filename = getattr(stream, "name", None)
if filename is not None:
filename = fsdecode(filename)
if filename and filename[0] == "<" and filename[-1] == ">":
filename = None
else:
filename = fsdecode(filename)
self.filename = filename
if headers is None:
headers = Headers()
self.headers = headers
if content_type is not None:
headers["Content-Type"] = content_type
if content_length is not None:
headers["Content-Length"] = str(content_length)
def _parse_content_type(self) -> None:
if not hasattr(self, "_parsed_content_type"):
self._parsed_content_type = http.parse_options_header(self.content_type)
@property
def content_type(self) -> str | None:
"""The content-type sent in the header. Usually not available"""
return self.headers.get("content-type")
@property
def content_length(self) -> int:
"""The content-length sent in the header. Usually not available"""
if "content-length" in self.headers:
try:
return _plain_int(self.headers["content-length"])
except ValueError:
pass
return 0
@property
def mimetype(self) -> str:
"""Like :attr:`content_type`, but without parameters (eg, without
charset, type etc.) and always lowercase. For example if the content
type is ``text/HTML; charset=utf-8`` the mimetype would be
``'text/html'``.
.. versionadded:: 0.7
"""
self._parse_content_type()
return self._parsed_content_type[0].lower()
@property
def mimetype_params(self) -> dict[str, str]:
"""The mimetype parameters as dict. For example if the content
type is ``text/html; charset=utf-8`` the params would be
``{'charset': 'utf-8'}``.
.. versionadded:: 0.7
"""
self._parse_content_type()
return self._parsed_content_type[1]
def save(
self, dst: str | os.PathLike[str] | t.IO[bytes], buffer_size: int = 16384
) -> None:
"""Save the file to a destination path or file object. If the
destination is a file object you have to close it yourself after the
call. The buffer size is the number of bytes held in memory during
the copy process. It defaults to 16KB.
For secure file saving also have a look at :func:`secure_filename`.
:param dst: a filename, :class:`os.PathLike`, or open file
object to write to.
:param buffer_size: Passed as the ``length`` parameter of
:func:`shutil.copyfileobj`.
.. versionchanged:: 1.0
Supports :mod:`pathlib`.
"""
from shutil import copyfileobj
close_dst = False
if hasattr(dst, "__fspath__"):
dst = fspath(dst)
if isinstance(dst, str):
dst = open(dst, "wb")
close_dst = True
try:
copyfileobj(self.stream, dst, buffer_size)
finally:
if close_dst:
dst.close()
def close(self) -> None:
"""Close the underlying file if possible."""
try:
self.stream.close()
except Exception:
pass
def __bool__(self) -> bool:
return bool(self.filename)
def __getattr__(self, name: str) -> t.Any:
try:
return getattr(self.stream, name)
except AttributeError:
# SpooledTemporaryFile on Python < 3.11 doesn't implement IOBase,
# get the attribute from its backing file instead.
if hasattr(self.stream, "_file"):
return getattr(self.stream._file, name)
raise
def __iter__(self) -> cabc.Iterator[bytes]:
return iter(self.stream)
def __repr__(self) -> str:
return f"<{type(self).__name__}: {self.filename!r} ({self.content_type!r})>"
class FileMultiDict(MultiDict[str, FileStorage]):
"""A special :class:`MultiDict` that has convenience methods to add
files to it. This is used for :class:`EnvironBuilder` and generally
useful for unittesting.
.. versionadded:: 0.5
"""
def add_file(
self,
name: str,
file: str | os.PathLike[str] | t.IO[bytes] | FileStorage,
filename: str | None = None,
content_type: str | None = None,
) -> None:
"""Adds a new file to the dict. `file` can be a file name or
a :class:`file`-like or a :class:`FileStorage` object.
:param name: the name of the field.
:param file: a filename or :class:`file`-like object
:param filename: an optional filename
:param content_type: an optional content type
"""
if isinstance(file, FileStorage):
self.add(name, file)
return
if isinstance(file, (str, os.PathLike)):
if filename is None:
filename = os.fspath(file)
file_obj: t.IO[bytes] = open(file, "rb")
else:
file_obj = file # type: ignore[assignment]
if filename and content_type is None:
content_type = (
mimetypes.guess_type(filename)[0] or "application/octet-stream"
)
self.add(name, FileStorage(file_obj, filename, name, content_type))
# circular dependencies
from .. import http
|