|
|
|
|
|
from __future__ import annotations |
|
|
|
import abc |
|
import contextlib |
|
import copy |
|
import enum |
|
import functools |
|
import inspect |
|
import itertools |
|
import linecache |
|
import sys |
|
import types |
|
import typing |
|
|
|
from operator import itemgetter |
|
|
|
|
|
|
|
from . import _compat, _config, setters |
|
from ._compat import ( |
|
PY_3_8_PLUS, |
|
PY_3_10_PLUS, |
|
PY_3_11_PLUS, |
|
_AnnotationExtractor, |
|
_get_annotations, |
|
get_generic_base, |
|
) |
|
from .exceptions import ( |
|
DefaultAlreadySetError, |
|
FrozenInstanceError, |
|
NotAnAttrsClassError, |
|
UnannotatedAttributeError, |
|
) |
|
|
|
|
|
|
|
_OBJ_SETATTR = object.__setattr__ |
|
_INIT_FACTORY_PAT = "__attr_factory_%s" |
|
_CLASSVAR_PREFIXES = ( |
|
"typing.ClassVar", |
|
"t.ClassVar", |
|
"ClassVar", |
|
"typing_extensions.ClassVar", |
|
) |
|
|
|
|
|
|
|
_HASH_CACHE_FIELD = "_attrs_cached_hash" |
|
|
|
_EMPTY_METADATA_SINGLETON = types.MappingProxyType({}) |
|
|
|
|
|
_SENTINEL = object() |
|
|
|
_DEFAULT_ON_SETATTR = setters.pipe(setters.convert, setters.validate) |
|
|
|
|
|
class _Nothing(enum.Enum): |
|
""" |
|
Sentinel to indicate the lack of a value when `None` is ambiguous. |
|
|
|
If extending attrs, you can use ``typing.Literal[NOTHING]`` to show |
|
that a value may be ``NOTHING``. |
|
|
|
.. versionchanged:: 21.1.0 ``bool(NOTHING)`` is now False. |
|
.. versionchanged:: 22.2.0 ``NOTHING`` is now an ``enum.Enum`` variant. |
|
""" |
|
|
|
NOTHING = enum.auto() |
|
|
|
def __repr__(self): |
|
return "NOTHING" |
|
|
|
def __bool__(self): |
|
return False |
|
|
|
|
|
NOTHING = _Nothing.NOTHING |
|
""" |
|
Sentinel to indicate the lack of a value when `None` is ambiguous. |
|
""" |
|
|
|
|
|
class _CacheHashWrapper(int): |
|
""" |
|
An integer subclass that pickles / copies as None |
|
|
|
This is used for non-slots classes with ``cache_hash=True``, to avoid |
|
serializing a potentially (even likely) invalid hash value. Since `None` |
|
is the default value for uncalculated hashes, whenever this is copied, |
|
the copy's value for the hash should automatically reset. |
|
|
|
See GH #613 for more details. |
|
""" |
|
|
|
def __reduce__(self, _none_constructor=type(None), _args=()): |
|
return _none_constructor, _args |
|
|
|
|
|
def attrib( |
|
default=NOTHING, |
|
validator=None, |
|
repr=True, |
|
cmp=None, |
|
hash=None, |
|
init=True, |
|
metadata=None, |
|
type=None, |
|
converter=None, |
|
factory=None, |
|
kw_only=False, |
|
eq=None, |
|
order=None, |
|
on_setattr=None, |
|
alias=None, |
|
): |
|
""" |
|
Create a new field / attribute on a class. |
|
|
|
Identical to `attrs.field`, except it's not keyword-only. |
|
|
|
Consider using `attrs.field` in new code (``attr.ib`` will *never* go away, |
|
though). |
|
|
|
.. warning:: |
|
|
|
Does **nothing** unless the class is also decorated with |
|
`attr.s` (or similar)! |
|
|
|
|
|
.. versionadded:: 15.2.0 *convert* |
|
.. versionadded:: 16.3.0 *metadata* |
|
.. versionchanged:: 17.1.0 *validator* can be a ``list`` now. |
|
.. versionchanged:: 17.1.0 |
|
*hash* is `None` and therefore mirrors *eq* by default. |
|
.. versionadded:: 17.3.0 *type* |
|
.. deprecated:: 17.4.0 *convert* |
|
.. versionadded:: 17.4.0 |
|
*converter* as a replacement for the deprecated *convert* to achieve |
|
consistency with other noun-based arguments. |
|
.. versionadded:: 18.1.0 |
|
``factory=f`` is syntactic sugar for ``default=attr.Factory(f)``. |
|
.. versionadded:: 18.2.0 *kw_only* |
|
.. versionchanged:: 19.2.0 *convert* keyword argument removed. |
|
.. versionchanged:: 19.2.0 *repr* also accepts a custom callable. |
|
.. deprecated:: 19.2.0 *cmp* Removal on or after 2021-06-01. |
|
.. versionadded:: 19.2.0 *eq* and *order* |
|
.. versionadded:: 20.1.0 *on_setattr* |
|
.. versionchanged:: 20.3.0 *kw_only* backported to Python 2 |
|
.. versionchanged:: 21.1.0 |
|
*eq*, *order*, and *cmp* also accept a custom callable |
|
.. versionchanged:: 21.1.0 *cmp* undeprecated |
|
.. versionadded:: 22.2.0 *alias* |
|
""" |
|
eq, eq_key, order, order_key = _determine_attrib_eq_order( |
|
cmp, eq, order, True |
|
) |
|
|
|
if hash is not None and hash is not True and hash is not False: |
|
msg = "Invalid value for hash. Must be True, False, or None." |
|
raise TypeError(msg) |
|
|
|
if factory is not None: |
|
if default is not NOTHING: |
|
msg = ( |
|
"The `default` and `factory` arguments are mutually exclusive." |
|
) |
|
raise ValueError(msg) |
|
if not callable(factory): |
|
msg = "The `factory` argument must be a callable." |
|
raise ValueError(msg) |
|
default = Factory(factory) |
|
|
|
if metadata is None: |
|
metadata = {} |
|
|
|
|
|
if isinstance(on_setattr, (list, tuple)): |
|
on_setattr = setters.pipe(*on_setattr) |
|
|
|
if validator and isinstance(validator, (list, tuple)): |
|
validator = and_(*validator) |
|
|
|
if converter and isinstance(converter, (list, tuple)): |
|
converter = pipe(*converter) |
|
|
|
return _CountingAttr( |
|
default=default, |
|
validator=validator, |
|
repr=repr, |
|
cmp=None, |
|
hash=hash, |
|
init=init, |
|
converter=converter, |
|
metadata=metadata, |
|
type=type, |
|
kw_only=kw_only, |
|
eq=eq, |
|
eq_key=eq_key, |
|
order=order, |
|
order_key=order_key, |
|
on_setattr=on_setattr, |
|
alias=alias, |
|
) |
|
|
|
|
|
def _compile_and_eval(script, globs, locs=None, filename=""): |
|
""" |
|
Evaluate the script with the given global (globs) and local (locs) |
|
variables. |
|
""" |
|
bytecode = compile(script, filename, "exec") |
|
eval(bytecode, globs, locs) |
|
|
|
|
|
def _make_method(name, script, filename, globs, locals=None): |
|
""" |
|
Create the method with the script given and return the method object. |
|
""" |
|
locs = {} if locals is None else locals |
|
|
|
|
|
|
|
count = 1 |
|
base_filename = filename |
|
while True: |
|
linecache_tuple = ( |
|
len(script), |
|
None, |
|
script.splitlines(True), |
|
filename, |
|
) |
|
old_val = linecache.cache.setdefault(filename, linecache_tuple) |
|
if old_val == linecache_tuple: |
|
break |
|
|
|
filename = f"{base_filename[:-1]}-{count}>" |
|
count += 1 |
|
|
|
_compile_and_eval(script, globs, locs, filename) |
|
|
|
return locs[name] |
|
|
|
|
|
def _make_attr_tuple_class(cls_name, attr_names): |
|
""" |
|
Create a tuple subclass to hold `Attribute`s for an `attrs` class. |
|
|
|
The subclass is a bare tuple with properties for names. |
|
|
|
class MyClassAttributes(tuple): |
|
__slots__ = () |
|
x = property(itemgetter(0)) |
|
""" |
|
attr_class_name = f"{cls_name}Attributes" |
|
attr_class_template = [ |
|
f"class {attr_class_name}(tuple):", |
|
" __slots__ = ()", |
|
] |
|
if attr_names: |
|
for i, attr_name in enumerate(attr_names): |
|
attr_class_template.append( |
|
f" {attr_name} = _attrs_property(_attrs_itemgetter({i}))" |
|
) |
|
else: |
|
attr_class_template.append(" pass") |
|
globs = {"_attrs_itemgetter": itemgetter, "_attrs_property": property} |
|
_compile_and_eval("\n".join(attr_class_template), globs) |
|
return globs[attr_class_name] |
|
|
|
|
|
|
|
|
|
_Attributes = _make_attr_tuple_class( |
|
"_Attributes", |
|
[ |
|
|
|
"attrs", |
|
|
|
"base_attrs", |
|
|
|
"base_attrs_map", |
|
], |
|
) |
|
|
|
|
|
def _is_class_var(annot): |
|
""" |
|
Check whether *annot* is a typing.ClassVar. |
|
|
|
The string comparison hack is used to avoid evaluating all string |
|
annotations which would put attrs-based classes at a performance |
|
disadvantage compared to plain old classes. |
|
""" |
|
annot = str(annot) |
|
|
|
|
|
if annot.startswith(("'", '"')) and annot.endswith(("'", '"')): |
|
annot = annot[1:-1] |
|
|
|
return annot.startswith(_CLASSVAR_PREFIXES) |
|
|
|
|
|
def _has_own_attribute(cls, attrib_name): |
|
""" |
|
Check whether *cls* defines *attrib_name* (and doesn't just inherit it). |
|
""" |
|
return attrib_name in cls.__dict__ |
|
|
|
|
|
def _collect_base_attrs(cls, taken_attr_names): |
|
""" |
|
Collect attr.ibs from base classes of *cls*, except *taken_attr_names*. |
|
""" |
|
base_attrs = [] |
|
base_attr_map = {} |
|
|
|
|
|
for base_cls in reversed(cls.__mro__[1:-1]): |
|
for a in getattr(base_cls, "__attrs_attrs__", []): |
|
if a.inherited or a.name in taken_attr_names: |
|
continue |
|
|
|
a = a.evolve(inherited=True) |
|
base_attrs.append(a) |
|
base_attr_map[a.name] = base_cls |
|
|
|
|
|
|
|
|
|
filtered = [] |
|
seen = set() |
|
for a in reversed(base_attrs): |
|
if a.name in seen: |
|
continue |
|
filtered.insert(0, a) |
|
seen.add(a.name) |
|
|
|
return filtered, base_attr_map |
|
|
|
|
|
def _collect_base_attrs_broken(cls, taken_attr_names): |
|
""" |
|
Collect attr.ibs from base classes of *cls*, except *taken_attr_names*. |
|
|
|
N.B. *taken_attr_names* will be mutated. |
|
|
|
Adhere to the old incorrect behavior. |
|
|
|
Notably it collects from the front and considers inherited attributes which |
|
leads to the buggy behavior reported in #428. |
|
""" |
|
base_attrs = [] |
|
base_attr_map = {} |
|
|
|
|
|
for base_cls in cls.__mro__[1:-1]: |
|
for a in getattr(base_cls, "__attrs_attrs__", []): |
|
if a.name in taken_attr_names: |
|
continue |
|
|
|
a = a.evolve(inherited=True) |
|
taken_attr_names.add(a.name) |
|
base_attrs.append(a) |
|
base_attr_map[a.name] = base_cls |
|
|
|
return base_attrs, base_attr_map |
|
|
|
|
|
def _transform_attrs( |
|
cls, these, auto_attribs, kw_only, collect_by_mro, field_transformer |
|
): |
|
""" |
|
Transform all `_CountingAttr`s on a class into `Attribute`s. |
|
|
|
If *these* is passed, use that and don't look for them on the class. |
|
|
|
If *collect_by_mro* is True, collect them in the correct MRO order, |
|
otherwise use the old -- incorrect -- order. See #428. |
|
|
|
Return an `_Attributes`. |
|
""" |
|
cd = cls.__dict__ |
|
anns = _get_annotations(cls) |
|
|
|
if these is not None: |
|
ca_list = list(these.items()) |
|
elif auto_attribs is True: |
|
ca_names = { |
|
name |
|
for name, attr in cd.items() |
|
if isinstance(attr, _CountingAttr) |
|
} |
|
ca_list = [] |
|
annot_names = set() |
|
for attr_name, type in anns.items(): |
|
if _is_class_var(type): |
|
continue |
|
annot_names.add(attr_name) |
|
a = cd.get(attr_name, NOTHING) |
|
|
|
if not isinstance(a, _CountingAttr): |
|
a = attrib() if a is NOTHING else attrib(default=a) |
|
ca_list.append((attr_name, a)) |
|
|
|
unannotated = ca_names - annot_names |
|
if len(unannotated) > 0: |
|
raise UnannotatedAttributeError( |
|
"The following `attr.ib`s lack a type annotation: " |
|
+ ", ".join( |
|
sorted(unannotated, key=lambda n: cd.get(n).counter) |
|
) |
|
+ "." |
|
) |
|
else: |
|
ca_list = sorted( |
|
( |
|
(name, attr) |
|
for name, attr in cd.items() |
|
if isinstance(attr, _CountingAttr) |
|
), |
|
key=lambda e: e[1].counter, |
|
) |
|
|
|
own_attrs = [ |
|
Attribute.from_counting_attr( |
|
name=attr_name, ca=ca, type=anns.get(attr_name) |
|
) |
|
for attr_name, ca in ca_list |
|
] |
|
|
|
if collect_by_mro: |
|
base_attrs, base_attr_map = _collect_base_attrs( |
|
cls, {a.name for a in own_attrs} |
|
) |
|
else: |
|
base_attrs, base_attr_map = _collect_base_attrs_broken( |
|
cls, {a.name for a in own_attrs} |
|
) |
|
|
|
if kw_only: |
|
own_attrs = [a.evolve(kw_only=True) for a in own_attrs] |
|
base_attrs = [a.evolve(kw_only=True) for a in base_attrs] |
|
|
|
attrs = base_attrs + own_attrs |
|
|
|
|
|
|
|
|
|
|
|
had_default = False |
|
for a in (a for a in attrs if a.init is not False and a.kw_only is False): |
|
if had_default is True and a.default is NOTHING: |
|
msg = f"No mandatory attributes allowed after an attribute with a default value or factory. Attribute in question: {a!r}" |
|
raise ValueError(msg) |
|
|
|
if had_default is False and a.default is not NOTHING: |
|
had_default = True |
|
|
|
if field_transformer is not None: |
|
attrs = field_transformer(cls, attrs) |
|
|
|
|
|
|
|
|
|
attrs = [ |
|
a.evolve(alias=_default_init_alias_for(a.name)) if not a.alias else a |
|
for a in attrs |
|
] |
|
|
|
|
|
|
|
attr_names = [a.name for a in attrs] |
|
AttrsClass = _make_attr_tuple_class(cls.__name__, attr_names) |
|
|
|
return _Attributes((AttrsClass(attrs), base_attrs, base_attr_map)) |
|
|
|
|
|
def _make_cached_property_getattr(cached_properties, original_getattr, cls): |
|
lines = [ |
|
|
|
|
|
"def wrapper(_cls):", |
|
" __class__ = _cls", |
|
" def __getattr__(self, item, cached_properties=cached_properties, original_getattr=original_getattr, _cached_setattr_get=_cached_setattr_get):", |
|
" func = cached_properties.get(item)", |
|
" if func is not None:", |
|
" result = func(self)", |
|
" _setter = _cached_setattr_get(self)", |
|
" _setter(item, result)", |
|
" return result", |
|
] |
|
if original_getattr is not None: |
|
lines.append( |
|
" return original_getattr(self, item)", |
|
) |
|
else: |
|
lines.extend( |
|
[ |
|
" try:", |
|
" return super().__getattribute__(item)", |
|
" except AttributeError:", |
|
" if not hasattr(super(), '__getattr__'):", |
|
" raise", |
|
" return super().__getattr__(item)", |
|
" original_error = f\"'{self.__class__.__name__}' object has no attribute '{item}'\"", |
|
" raise AttributeError(original_error)", |
|
] |
|
) |
|
|
|
lines.extend( |
|
[ |
|
" return __getattr__", |
|
"__getattr__ = wrapper(_cls)", |
|
] |
|
) |
|
|
|
unique_filename = _generate_unique_filename(cls, "getattr") |
|
|
|
glob = { |
|
"cached_properties": cached_properties, |
|
"_cached_setattr_get": _OBJ_SETATTR.__get__, |
|
"original_getattr": original_getattr, |
|
} |
|
|
|
return _make_method( |
|
"__getattr__", |
|
"\n".join(lines), |
|
unique_filename, |
|
glob, |
|
locals={ |
|
"_cls": cls, |
|
}, |
|
) |
|
|
|
|
|
def _frozen_setattrs(self, name, value): |
|
""" |
|
Attached to frozen classes as __setattr__. |
|
""" |
|
if isinstance(self, BaseException) and name in ( |
|
"__cause__", |
|
"__context__", |
|
"__traceback__", |
|
): |
|
BaseException.__setattr__(self, name, value) |
|
return |
|
|
|
raise FrozenInstanceError() |
|
|
|
|
|
def _frozen_delattrs(self, name): |
|
""" |
|
Attached to frozen classes as __delattr__. |
|
""" |
|
raise FrozenInstanceError() |
|
|
|
|
|
class _ClassBuilder: |
|
""" |
|
Iteratively build *one* class. |
|
""" |
|
|
|
__slots__ = ( |
|
"_attr_names", |
|
"_attrs", |
|
"_base_attr_map", |
|
"_base_names", |
|
"_cache_hash", |
|
"_cls", |
|
"_cls_dict", |
|
"_delete_attribs", |
|
"_frozen", |
|
"_has_pre_init", |
|
"_pre_init_has_args", |
|
"_has_post_init", |
|
"_is_exc", |
|
"_on_setattr", |
|
"_slots", |
|
"_weakref_slot", |
|
"_wrote_own_setattr", |
|
"_has_custom_setattr", |
|
) |
|
|
|
def __init__( |
|
self, |
|
cls, |
|
these, |
|
slots, |
|
frozen, |
|
weakref_slot, |
|
getstate_setstate, |
|
auto_attribs, |
|
kw_only, |
|
cache_hash, |
|
is_exc, |
|
collect_by_mro, |
|
on_setattr, |
|
has_custom_setattr, |
|
field_transformer, |
|
): |
|
attrs, base_attrs, base_map = _transform_attrs( |
|
cls, |
|
these, |
|
auto_attribs, |
|
kw_only, |
|
collect_by_mro, |
|
field_transformer, |
|
) |
|
|
|
self._cls = cls |
|
self._cls_dict = dict(cls.__dict__) if slots else {} |
|
self._attrs = attrs |
|
self._base_names = {a.name for a in base_attrs} |
|
self._base_attr_map = base_map |
|
self._attr_names = tuple(a.name for a in attrs) |
|
self._slots = slots |
|
self._frozen = frozen |
|
self._weakref_slot = weakref_slot |
|
self._cache_hash = cache_hash |
|
self._has_pre_init = bool(getattr(cls, "__attrs_pre_init__", False)) |
|
self._pre_init_has_args = False |
|
if self._has_pre_init: |
|
|
|
|
|
pre_init_func = cls.__attrs_pre_init__ |
|
pre_init_signature = inspect.signature(pre_init_func) |
|
self._pre_init_has_args = len(pre_init_signature.parameters) > 1 |
|
self._has_post_init = bool(getattr(cls, "__attrs_post_init__", False)) |
|
self._delete_attribs = not bool(these) |
|
self._is_exc = is_exc |
|
self._on_setattr = on_setattr |
|
|
|
self._has_custom_setattr = has_custom_setattr |
|
self._wrote_own_setattr = False |
|
|
|
self._cls_dict["__attrs_attrs__"] = self._attrs |
|
|
|
if frozen: |
|
self._cls_dict["__setattr__"] = _frozen_setattrs |
|
self._cls_dict["__delattr__"] = _frozen_delattrs |
|
|
|
self._wrote_own_setattr = True |
|
elif on_setattr in ( |
|
_DEFAULT_ON_SETATTR, |
|
setters.validate, |
|
setters.convert, |
|
): |
|
has_validator = has_converter = False |
|
for a in attrs: |
|
if a.validator is not None: |
|
has_validator = True |
|
if a.converter is not None: |
|
has_converter = True |
|
|
|
if has_validator and has_converter: |
|
break |
|
if ( |
|
( |
|
on_setattr == _DEFAULT_ON_SETATTR |
|
and not (has_validator or has_converter) |
|
) |
|
or (on_setattr == setters.validate and not has_validator) |
|
or (on_setattr == setters.convert and not has_converter) |
|
): |
|
|
|
|
|
|
|
self._on_setattr = None |
|
|
|
if getstate_setstate: |
|
( |
|
self._cls_dict["__getstate__"], |
|
self._cls_dict["__setstate__"], |
|
) = self._make_getstate_setstate() |
|
|
|
def __repr__(self): |
|
return f"<_ClassBuilder(cls={self._cls.__name__})>" |
|
|
|
def build_class(self): |
|
""" |
|
Finalize class based on the accumulated configuration. |
|
|
|
Builder cannot be used after calling this method. |
|
""" |
|
if self._slots is True: |
|
cls = self._create_slots_class() |
|
else: |
|
cls = self._patch_original_class() |
|
if PY_3_10_PLUS: |
|
cls = abc.update_abstractmethods(cls) |
|
|
|
|
|
|
|
if ( |
|
getattr(cls, "__attrs_init_subclass__", None) |
|
and "__attrs_init_subclass__" not in cls.__dict__ |
|
): |
|
cls.__attrs_init_subclass__() |
|
|
|
return cls |
|
|
|
def _patch_original_class(self): |
|
""" |
|
Apply accumulated methods and return the class. |
|
""" |
|
cls = self._cls |
|
base_names = self._base_names |
|
|
|
|
|
if self._delete_attribs: |
|
for name in self._attr_names: |
|
if ( |
|
name not in base_names |
|
and getattr(cls, name, _SENTINEL) is not _SENTINEL |
|
): |
|
|
|
|
|
|
|
with contextlib.suppress(AttributeError): |
|
delattr(cls, name) |
|
|
|
|
|
for name, value in self._cls_dict.items(): |
|
setattr(cls, name, value) |
|
|
|
|
|
|
|
if not self._wrote_own_setattr and getattr( |
|
cls, "__attrs_own_setattr__", False |
|
): |
|
cls.__attrs_own_setattr__ = False |
|
|
|
if not self._has_custom_setattr: |
|
cls.__setattr__ = _OBJ_SETATTR |
|
|
|
return cls |
|
|
|
def _create_slots_class(self): |
|
""" |
|
Build and return a new class with a `__slots__` attribute. |
|
""" |
|
cd = { |
|
k: v |
|
for k, v in self._cls_dict.items() |
|
if k not in (*tuple(self._attr_names), "__dict__", "__weakref__") |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if not self._wrote_own_setattr: |
|
cd["__attrs_own_setattr__"] = False |
|
|
|
if not self._has_custom_setattr: |
|
for base_cls in self._cls.__bases__: |
|
if base_cls.__dict__.get("__attrs_own_setattr__", False): |
|
cd["__setattr__"] = _OBJ_SETATTR |
|
break |
|
|
|
|
|
|
|
existing_slots = {} |
|
weakref_inherited = False |
|
for base_cls in self._cls.__mro__[1:-1]: |
|
if base_cls.__dict__.get("__weakref__", None) is not None: |
|
weakref_inherited = True |
|
existing_slots.update( |
|
{ |
|
name: getattr(base_cls, name) |
|
for name in getattr(base_cls, "__slots__", []) |
|
} |
|
) |
|
|
|
base_names = set(self._base_names) |
|
|
|
names = self._attr_names |
|
if ( |
|
self._weakref_slot |
|
and "__weakref__" not in getattr(self._cls, "__slots__", ()) |
|
and "__weakref__" not in names |
|
and not weakref_inherited |
|
): |
|
names += ("__weakref__",) |
|
|
|
if PY_3_8_PLUS: |
|
cached_properties = { |
|
name: cached_property.func |
|
for name, cached_property in cd.items() |
|
if isinstance(cached_property, functools.cached_property) |
|
} |
|
else: |
|
|
|
|
|
cached_properties = {} |
|
|
|
|
|
|
|
additional_closure_functions_to_update = [] |
|
if cached_properties: |
|
class_annotations = _get_annotations(self._cls) |
|
for name, func in cached_properties.items(): |
|
|
|
names += (name,) |
|
|
|
del cd[name] |
|
additional_closure_functions_to_update.append(func) |
|
annotation = inspect.signature(func).return_annotation |
|
if annotation is not inspect.Parameter.empty: |
|
class_annotations[name] = annotation |
|
|
|
original_getattr = cd.get("__getattr__") |
|
if original_getattr is not None: |
|
additional_closure_functions_to_update.append(original_getattr) |
|
|
|
cd["__getattr__"] = _make_cached_property_getattr( |
|
cached_properties, original_getattr, self._cls |
|
) |
|
|
|
|
|
|
|
slot_names = [name for name in names if name not in base_names] |
|
|
|
|
|
|
|
|
|
|
|
reused_slots = { |
|
slot: slot_descriptor |
|
for slot, slot_descriptor in existing_slots.items() |
|
if slot in slot_names |
|
} |
|
slot_names = [name for name in slot_names if name not in reused_slots] |
|
cd.update(reused_slots) |
|
if self._cache_hash: |
|
slot_names.append(_HASH_CACHE_FIELD) |
|
|
|
cd["__slots__"] = tuple(slot_names) |
|
|
|
cd["__qualname__"] = self._cls.__qualname__ |
|
|
|
|
|
cls = type(self._cls)(self._cls.__name__, self._cls.__bases__, cd) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for item in itertools.chain( |
|
cls.__dict__.values(), additional_closure_functions_to_update |
|
): |
|
if isinstance(item, (classmethod, staticmethod)): |
|
|
|
|
|
closure_cells = getattr(item.__func__, "__closure__", None) |
|
elif isinstance(item, property): |
|
|
|
|
|
closure_cells = getattr(item.fget, "__closure__", None) |
|
else: |
|
closure_cells = getattr(item, "__closure__", None) |
|
|
|
if not closure_cells: |
|
continue |
|
for cell in closure_cells: |
|
try: |
|
match = cell.cell_contents is self._cls |
|
except ValueError: |
|
|
|
pass |
|
else: |
|
if match: |
|
cell.cell_contents = cls |
|
return cls |
|
|
|
def add_repr(self, ns): |
|
self._cls_dict["__repr__"] = self._add_method_dunders( |
|
_make_repr(self._attrs, ns, self._cls) |
|
) |
|
return self |
|
|
|
def add_str(self): |
|
repr = self._cls_dict.get("__repr__") |
|
if repr is None: |
|
msg = "__str__ can only be generated if a __repr__ exists." |
|
raise ValueError(msg) |
|
|
|
def __str__(self): |
|
return self.__repr__() |
|
|
|
self._cls_dict["__str__"] = self._add_method_dunders(__str__) |
|
return self |
|
|
|
def _make_getstate_setstate(self): |
|
""" |
|
Create custom __setstate__ and __getstate__ methods. |
|
""" |
|
|
|
state_attr_names = tuple( |
|
an for an in self._attr_names if an != "__weakref__" |
|
) |
|
|
|
def slots_getstate(self): |
|
""" |
|
Automatically created by attrs. |
|
""" |
|
return {name: getattr(self, name) for name in state_attr_names} |
|
|
|
hash_caching_enabled = self._cache_hash |
|
|
|
def slots_setstate(self, state): |
|
""" |
|
Automatically created by attrs. |
|
""" |
|
__bound_setattr = _OBJ_SETATTR.__get__(self) |
|
if isinstance(state, tuple): |
|
|
|
|
|
for name, value in zip(state_attr_names, state): |
|
__bound_setattr(name, value) |
|
else: |
|
for name in state_attr_names: |
|
if name in state: |
|
__bound_setattr(name, state[name]) |
|
|
|
|
|
|
|
|
|
|
|
if hash_caching_enabled: |
|
__bound_setattr(_HASH_CACHE_FIELD, None) |
|
|
|
return slots_getstate, slots_setstate |
|
|
|
def make_unhashable(self): |
|
self._cls_dict["__hash__"] = None |
|
return self |
|
|
|
def add_hash(self): |
|
self._cls_dict["__hash__"] = self._add_method_dunders( |
|
_make_hash( |
|
self._cls, |
|
self._attrs, |
|
frozen=self._frozen, |
|
cache_hash=self._cache_hash, |
|
) |
|
) |
|
|
|
return self |
|
|
|
def add_init(self): |
|
self._cls_dict["__init__"] = self._add_method_dunders( |
|
_make_init( |
|
self._cls, |
|
self._attrs, |
|
self._has_pre_init, |
|
self._pre_init_has_args, |
|
self._has_post_init, |
|
self._frozen, |
|
self._slots, |
|
self._cache_hash, |
|
self._base_attr_map, |
|
self._is_exc, |
|
self._on_setattr, |
|
attrs_init=False, |
|
) |
|
) |
|
|
|
return self |
|
|
|
def add_match_args(self): |
|
self._cls_dict["__match_args__"] = tuple( |
|
field.name |
|
for field in self._attrs |
|
if field.init and not field.kw_only |
|
) |
|
|
|
def add_attrs_init(self): |
|
self._cls_dict["__attrs_init__"] = self._add_method_dunders( |
|
_make_init( |
|
self._cls, |
|
self._attrs, |
|
self._has_pre_init, |
|
self._pre_init_has_args, |
|
self._has_post_init, |
|
self._frozen, |
|
self._slots, |
|
self._cache_hash, |
|
self._base_attr_map, |
|
self._is_exc, |
|
self._on_setattr, |
|
attrs_init=True, |
|
) |
|
) |
|
|
|
return self |
|
|
|
def add_eq(self): |
|
cd = self._cls_dict |
|
|
|
cd["__eq__"] = self._add_method_dunders( |
|
_make_eq(self._cls, self._attrs) |
|
) |
|
cd["__ne__"] = self._add_method_dunders(_make_ne()) |
|
|
|
return self |
|
|
|
def add_order(self): |
|
cd = self._cls_dict |
|
|
|
cd["__lt__"], cd["__le__"], cd["__gt__"], cd["__ge__"] = ( |
|
self._add_method_dunders(meth) |
|
for meth in _make_order(self._cls, self._attrs) |
|
) |
|
|
|
return self |
|
|
|
def add_setattr(self): |
|
if self._frozen: |
|
return self |
|
|
|
sa_attrs = {} |
|
for a in self._attrs: |
|
on_setattr = a.on_setattr or self._on_setattr |
|
if on_setattr and on_setattr is not setters.NO_OP: |
|
sa_attrs[a.name] = a, on_setattr |
|
|
|
if not sa_attrs: |
|
return self |
|
|
|
if self._has_custom_setattr: |
|
|
|
msg = "Can't combine custom __setattr__ with on_setattr hooks." |
|
raise ValueError(msg) |
|
|
|
|
|
def __setattr__(self, name, val): |
|
try: |
|
a, hook = sa_attrs[name] |
|
except KeyError: |
|
nval = val |
|
else: |
|
nval = hook(self, a, val) |
|
|
|
_OBJ_SETATTR(self, name, nval) |
|
|
|
self._cls_dict["__attrs_own_setattr__"] = True |
|
self._cls_dict["__setattr__"] = self._add_method_dunders(__setattr__) |
|
self._wrote_own_setattr = True |
|
|
|
return self |
|
|
|
def _add_method_dunders(self, method): |
|
""" |
|
Add __module__ and __qualname__ to a *method* if possible. |
|
""" |
|
with contextlib.suppress(AttributeError): |
|
method.__module__ = self._cls.__module__ |
|
|
|
with contextlib.suppress(AttributeError): |
|
method.__qualname__ = f"{self._cls.__qualname__}.{method.__name__}" |
|
|
|
with contextlib.suppress(AttributeError): |
|
method.__doc__ = ( |
|
"Method generated by attrs for class " |
|
f"{self._cls.__qualname__}." |
|
) |
|
|
|
return method |
|
|
|
|
|
def _determine_attrs_eq_order(cmp, eq, order, default_eq): |
|
""" |
|
Validate the combination of *cmp*, *eq*, and *order*. Derive the effective |
|
values of eq and order. If *eq* is None, set it to *default_eq*. |
|
""" |
|
if cmp is not None and any((eq is not None, order is not None)): |
|
msg = "Don't mix `cmp` with `eq' and `order`." |
|
raise ValueError(msg) |
|
|
|
|
|
if cmp is not None: |
|
return cmp, cmp |
|
|
|
|
|
|
|
if eq is None: |
|
eq = default_eq |
|
|
|
if order is None: |
|
order = eq |
|
|
|
if eq is False and order is True: |
|
msg = "`order` can only be True if `eq` is True too." |
|
raise ValueError(msg) |
|
|
|
return eq, order |
|
|
|
|
|
def _determine_attrib_eq_order(cmp, eq, order, default_eq): |
|
""" |
|
Validate the combination of *cmp*, *eq*, and *order*. Derive the effective |
|
values of eq and order. If *eq* is None, set it to *default_eq*. |
|
""" |
|
if cmp is not None and any((eq is not None, order is not None)): |
|
msg = "Don't mix `cmp` with `eq' and `order`." |
|
raise ValueError(msg) |
|
|
|
def decide_callable_or_boolean(value): |
|
""" |
|
Decide whether a key function is used. |
|
""" |
|
if callable(value): |
|
value, key = True, value |
|
else: |
|
key = None |
|
return value, key |
|
|
|
|
|
if cmp is not None: |
|
cmp, cmp_key = decide_callable_or_boolean(cmp) |
|
return cmp, cmp_key, cmp, cmp_key |
|
|
|
|
|
|
|
if eq is None: |
|
eq, eq_key = default_eq, None |
|
else: |
|
eq, eq_key = decide_callable_or_boolean(eq) |
|
|
|
if order is None: |
|
order, order_key = eq, eq_key |
|
else: |
|
order, order_key = decide_callable_or_boolean(order) |
|
|
|
if eq is False and order is True: |
|
msg = "`order` can only be True if `eq` is True too." |
|
raise ValueError(msg) |
|
|
|
return eq, eq_key, order, order_key |
|
|
|
|
|
def _determine_whether_to_implement( |
|
cls, flag, auto_detect, dunders, default=True |
|
): |
|
""" |
|
Check whether we should implement a set of methods for *cls*. |
|
|
|
*flag* is the argument passed into @attr.s like 'init', *auto_detect* the |
|
same as passed into @attr.s and *dunders* is a tuple of attribute names |
|
whose presence signal that the user has implemented it themselves. |
|
|
|
Return *default* if no reason for either for or against is found. |
|
""" |
|
if flag is True or flag is False: |
|
return flag |
|
|
|
if flag is None and auto_detect is False: |
|
return default |
|
|
|
|
|
for dunder in dunders: |
|
if _has_own_attribute(cls, dunder): |
|
return False |
|
|
|
return default |
|
|
|
|
|
def attrs( |
|
maybe_cls=None, |
|
these=None, |
|
repr_ns=None, |
|
repr=None, |
|
cmp=None, |
|
hash=None, |
|
init=None, |
|
slots=False, |
|
frozen=False, |
|
weakref_slot=True, |
|
str=False, |
|
auto_attribs=False, |
|
kw_only=False, |
|
cache_hash=False, |
|
auto_exc=False, |
|
eq=None, |
|
order=None, |
|
auto_detect=False, |
|
collect_by_mro=False, |
|
getstate_setstate=None, |
|
on_setattr=None, |
|
field_transformer=None, |
|
match_args=True, |
|
unsafe_hash=None, |
|
): |
|
r""" |
|
A class decorator that adds :term:`dunder methods` according to the |
|
specified attributes using `attr.ib` or the *these* argument. |
|
|
|
Consider using `attrs.define` / `attrs.frozen` in new code (``attr.s`` will |
|
*never* go away, though). |
|
|
|
Args: |
|
repr_ns (str): |
|
When using nested classes, there was no way in Python 2 to |
|
automatically detect that. This argument allows to set a custom |
|
name for a more meaningful ``repr`` output. This argument is |
|
pointless in Python 3 and is therefore deprecated. |
|
|
|
.. caution:: |
|
Refer to `attrs.define` for the rest of the parameters, but note that they |
|
can have different defaults. |
|
|
|
Notably, leaving *on_setattr* as `None` will **not** add any hooks. |
|
|
|
.. versionadded:: 16.0.0 *slots* |
|
.. versionadded:: 16.1.0 *frozen* |
|
.. versionadded:: 16.3.0 *str* |
|
.. versionadded:: 16.3.0 Support for ``__attrs_post_init__``. |
|
.. versionchanged:: 17.1.0 |
|
*hash* supports `None` as value which is also the default now. |
|
.. versionadded:: 17.3.0 *auto_attribs* |
|
.. versionchanged:: 18.1.0 |
|
If *these* is passed, no attributes are deleted from the class body. |
|
.. versionchanged:: 18.1.0 If *these* is ordered, the order is retained. |
|
.. versionadded:: 18.2.0 *weakref_slot* |
|
.. deprecated:: 18.2.0 |
|
``__lt__``, ``__le__``, ``__gt__``, and ``__ge__`` now raise a |
|
`DeprecationWarning` if the classes compared are subclasses of |
|
each other. ``__eq`` and ``__ne__`` never tried to compared subclasses |
|
to each other. |
|
.. versionchanged:: 19.2.0 |
|
``__lt__``, ``__le__``, ``__gt__``, and ``__ge__`` now do not consider |
|
subclasses comparable anymore. |
|
.. versionadded:: 18.2.0 *kw_only* |
|
.. versionadded:: 18.2.0 *cache_hash* |
|
.. versionadded:: 19.1.0 *auto_exc* |
|
.. deprecated:: 19.2.0 *cmp* Removal on or after 2021-06-01. |
|
.. versionadded:: 19.2.0 *eq* and *order* |
|
.. versionadded:: 20.1.0 *auto_detect* |
|
.. versionadded:: 20.1.0 *collect_by_mro* |
|
.. versionadded:: 20.1.0 *getstate_setstate* |
|
.. versionadded:: 20.1.0 *on_setattr* |
|
.. versionadded:: 20.3.0 *field_transformer* |
|
.. versionchanged:: 21.1.0 |
|
``init=False`` injects ``__attrs_init__`` |
|
.. versionchanged:: 21.1.0 Support for ``__attrs_pre_init__`` |
|
.. versionchanged:: 21.1.0 *cmp* undeprecated |
|
.. versionadded:: 21.3.0 *match_args* |
|
.. versionadded:: 22.2.0 |
|
*unsafe_hash* as an alias for *hash* (for :pep:`681` compliance). |
|
.. deprecated:: 24.1.0 *repr_ns* |
|
.. versionchanged:: 24.1.0 |
|
Instances are not compared as tuples of attributes anymore, but using a |
|
big ``and`` condition. This is faster and has more correct behavior for |
|
uncomparable values like `math.nan`. |
|
.. versionadded:: 24.1.0 |
|
If a class has an *inherited* classmethod called |
|
``__attrs_init_subclass__``, it is executed after the class is created. |
|
.. deprecated:: 24.1.0 *hash* is deprecated in favor of *unsafe_hash*. |
|
""" |
|
if repr_ns is not None: |
|
import warnings |
|
|
|
warnings.warn( |
|
DeprecationWarning( |
|
"The `repr_ns` argument is deprecated and will be removed in or after August 2025." |
|
), |
|
stacklevel=2, |
|
) |
|
|
|
eq_, order_ = _determine_attrs_eq_order(cmp, eq, order, None) |
|
|
|
|
|
if unsafe_hash is not None: |
|
hash = unsafe_hash |
|
|
|
if isinstance(on_setattr, (list, tuple)): |
|
on_setattr = setters.pipe(*on_setattr) |
|
|
|
def wrap(cls): |
|
is_frozen = frozen or _has_frozen_base_class(cls) |
|
is_exc = auto_exc is True and issubclass(cls, BaseException) |
|
has_own_setattr = auto_detect and _has_own_attribute( |
|
cls, "__setattr__" |
|
) |
|
|
|
if has_own_setattr and is_frozen: |
|
msg = "Can't freeze a class with a custom __setattr__." |
|
raise ValueError(msg) |
|
|
|
builder = _ClassBuilder( |
|
cls, |
|
these, |
|
slots, |
|
is_frozen, |
|
weakref_slot, |
|
_determine_whether_to_implement( |
|
cls, |
|
getstate_setstate, |
|
auto_detect, |
|
("__getstate__", "__setstate__"), |
|
default=slots, |
|
), |
|
auto_attribs, |
|
kw_only, |
|
cache_hash, |
|
is_exc, |
|
collect_by_mro, |
|
on_setattr, |
|
has_own_setattr, |
|
field_transformer, |
|
) |
|
if _determine_whether_to_implement( |
|
cls, repr, auto_detect, ("__repr__",) |
|
): |
|
builder.add_repr(repr_ns) |
|
if str is True: |
|
builder.add_str() |
|
|
|
eq = _determine_whether_to_implement( |
|
cls, eq_, auto_detect, ("__eq__", "__ne__") |
|
) |
|
if not is_exc and eq is True: |
|
builder.add_eq() |
|
if not is_exc and _determine_whether_to_implement( |
|
cls, order_, auto_detect, ("__lt__", "__le__", "__gt__", "__ge__") |
|
): |
|
builder.add_order() |
|
|
|
builder.add_setattr() |
|
|
|
nonlocal hash |
|
if ( |
|
hash is None |
|
and auto_detect is True |
|
and _has_own_attribute(cls, "__hash__") |
|
): |
|
hash = False |
|
|
|
if hash is not True and hash is not False and hash is not None: |
|
|
|
msg = "Invalid value for hash. Must be True, False, or None." |
|
raise TypeError(msg) |
|
|
|
if hash is False or (hash is None and eq is False) or is_exc: |
|
|
|
|
|
if cache_hash: |
|
msg = "Invalid value for cache_hash. To use hash caching, hashing must be either explicitly or implicitly enabled." |
|
raise TypeError(msg) |
|
elif hash is True or ( |
|
hash is None and eq is True and is_frozen is True |
|
): |
|
|
|
builder.add_hash() |
|
else: |
|
|
|
if cache_hash: |
|
msg = "Invalid value for cache_hash. To use hash caching, hashing must be either explicitly or implicitly enabled." |
|
raise TypeError(msg) |
|
builder.make_unhashable() |
|
|
|
if _determine_whether_to_implement( |
|
cls, init, auto_detect, ("__init__",) |
|
): |
|
builder.add_init() |
|
else: |
|
builder.add_attrs_init() |
|
if cache_hash: |
|
msg = "Invalid value for cache_hash. To use hash caching, init must be True." |
|
raise TypeError(msg) |
|
|
|
if ( |
|
PY_3_10_PLUS |
|
and match_args |
|
and not _has_own_attribute(cls, "__match_args__") |
|
): |
|
builder.add_match_args() |
|
|
|
return builder.build_class() |
|
|
|
|
|
|
|
if maybe_cls is None: |
|
return wrap |
|
|
|
return wrap(maybe_cls) |
|
|
|
|
|
_attrs = attrs |
|
""" |
|
Internal alias so we can use it in functions that take an argument called |
|
*attrs*. |
|
""" |
|
|
|
|
|
def _has_frozen_base_class(cls): |
|
""" |
|
Check whether *cls* has a frozen ancestor by looking at its |
|
__setattr__. |
|
""" |
|
return cls.__setattr__ is _frozen_setattrs |
|
|
|
|
|
def _generate_unique_filename(cls, func_name): |
|
""" |
|
Create a "filename" suitable for a function being generated. |
|
""" |
|
return ( |
|
f"<attrs generated {func_name} {cls.__module__}." |
|
f"{getattr(cls, '__qualname__', cls.__name__)}>" |
|
) |
|
|
|
|
|
def _make_hash(cls, attrs, frozen, cache_hash): |
|
attrs = tuple( |
|
a for a in attrs if a.hash is True or (a.hash is None and a.eq is True) |
|
) |
|
|
|
tab = " " |
|
|
|
unique_filename = _generate_unique_filename(cls, "hash") |
|
type_hash = hash(unique_filename) |
|
|
|
globs = {} |
|
|
|
hash_def = "def __hash__(self" |
|
hash_func = "hash((" |
|
closing_braces = "))" |
|
if not cache_hash: |
|
hash_def += "):" |
|
else: |
|
hash_def += ", *" |
|
|
|
hash_def += ", _cache_wrapper=__import__('attr._make')._make._CacheHashWrapper):" |
|
hash_func = "_cache_wrapper(" + hash_func |
|
closing_braces += ")" |
|
|
|
method_lines = [hash_def] |
|
|
|
def append_hash_computation_lines(prefix, indent): |
|
""" |
|
Generate the code for actually computing the hash code. |
|
Below this will either be returned directly or used to compute |
|
a value which is then cached, depending on the value of cache_hash |
|
""" |
|
|
|
method_lines.extend( |
|
[ |
|
indent + prefix + hash_func, |
|
indent + f" {type_hash},", |
|
] |
|
) |
|
|
|
for a in attrs: |
|
if a.eq_key: |
|
cmp_name = f"_{a.name}_key" |
|
globs[cmp_name] = a.eq_key |
|
method_lines.append( |
|
indent + f" {cmp_name}(self.{a.name})," |
|
) |
|
else: |
|
method_lines.append(indent + f" self.{a.name},") |
|
|
|
method_lines.append(indent + " " + closing_braces) |
|
|
|
if cache_hash: |
|
method_lines.append(tab + f"if self.{_HASH_CACHE_FIELD} is None:") |
|
if frozen: |
|
append_hash_computation_lines( |
|
f"object.__setattr__(self, '{_HASH_CACHE_FIELD}', ", tab * 2 |
|
) |
|
method_lines.append(tab * 2 + ")") |
|
else: |
|
append_hash_computation_lines( |
|
f"self.{_HASH_CACHE_FIELD} = ", tab * 2 |
|
) |
|
method_lines.append(tab + f"return self.{_HASH_CACHE_FIELD}") |
|
else: |
|
append_hash_computation_lines("return ", tab) |
|
|
|
script = "\n".join(method_lines) |
|
return _make_method("__hash__", script, unique_filename, globs) |
|
|
|
|
|
def _add_hash(cls, attrs): |
|
""" |
|
Add a hash method to *cls*. |
|
""" |
|
cls.__hash__ = _make_hash(cls, attrs, frozen=False, cache_hash=False) |
|
return cls |
|
|
|
|
|
def _make_ne(): |
|
""" |
|
Create __ne__ method. |
|
""" |
|
|
|
def __ne__(self, other): |
|
""" |
|
Check equality and either forward a NotImplemented or |
|
return the result negated. |
|
""" |
|
result = self.__eq__(other) |
|
if result is NotImplemented: |
|
return NotImplemented |
|
|
|
return not result |
|
|
|
return __ne__ |
|
|
|
|
|
def _make_eq(cls, attrs): |
|
""" |
|
Create __eq__ method for *cls* with *attrs*. |
|
""" |
|
attrs = [a for a in attrs if a.eq] |
|
|
|
unique_filename = _generate_unique_filename(cls, "eq") |
|
lines = [ |
|
"def __eq__(self, other):", |
|
" if other.__class__ is not self.__class__:", |
|
" return NotImplemented", |
|
] |
|
|
|
|
|
|
|
globs = {} |
|
if attrs: |
|
lines.append(" return (") |
|
for a in attrs: |
|
if a.eq_key: |
|
cmp_name = f"_{a.name}_key" |
|
|
|
|
|
globs[cmp_name] = a.eq_key |
|
lines.append( |
|
f" {cmp_name}(self.{a.name}) == {cmp_name}(other.{a.name})" |
|
) |
|
else: |
|
lines.append(f" self.{a.name} == other.{a.name}") |
|
if a is not attrs[-1]: |
|
lines[-1] = f"{lines[-1]} and" |
|
lines.append(" )") |
|
else: |
|
lines.append(" return True") |
|
|
|
script = "\n".join(lines) |
|
|
|
return _make_method("__eq__", script, unique_filename, globs) |
|
|
|
|
|
def _make_order(cls, attrs): |
|
""" |
|
Create ordering methods for *cls* with *attrs*. |
|
""" |
|
attrs = [a for a in attrs if a.order] |
|
|
|
def attrs_to_tuple(obj): |
|
""" |
|
Save us some typing. |
|
""" |
|
return tuple( |
|
key(value) if key else value |
|
for value, key in ( |
|
(getattr(obj, a.name), a.order_key) for a in attrs |
|
) |
|
) |
|
|
|
def __lt__(self, other): |
|
""" |
|
Automatically created by attrs. |
|
""" |
|
if other.__class__ is self.__class__: |
|
return attrs_to_tuple(self) < attrs_to_tuple(other) |
|
|
|
return NotImplemented |
|
|
|
def __le__(self, other): |
|
""" |
|
Automatically created by attrs. |
|
""" |
|
if other.__class__ is self.__class__: |
|
return attrs_to_tuple(self) <= attrs_to_tuple(other) |
|
|
|
return NotImplemented |
|
|
|
def __gt__(self, other): |
|
""" |
|
Automatically created by attrs. |
|
""" |
|
if other.__class__ is self.__class__: |
|
return attrs_to_tuple(self) > attrs_to_tuple(other) |
|
|
|
return NotImplemented |
|
|
|
def __ge__(self, other): |
|
""" |
|
Automatically created by attrs. |
|
""" |
|
if other.__class__ is self.__class__: |
|
return attrs_to_tuple(self) >= attrs_to_tuple(other) |
|
|
|
return NotImplemented |
|
|
|
return __lt__, __le__, __gt__, __ge__ |
|
|
|
|
|
def _add_eq(cls, attrs=None): |
|
""" |
|
Add equality methods to *cls* with *attrs*. |
|
""" |
|
if attrs is None: |
|
attrs = cls.__attrs_attrs__ |
|
|
|
cls.__eq__ = _make_eq(cls, attrs) |
|
cls.__ne__ = _make_ne() |
|
|
|
return cls |
|
|
|
|
|
def _make_repr(attrs, ns, cls): |
|
unique_filename = _generate_unique_filename(cls, "repr") |
|
|
|
|
|
|
|
attr_names_with_reprs = tuple( |
|
(a.name, (repr if a.repr is True else a.repr), a.init) |
|
for a in attrs |
|
if a.repr is not False |
|
) |
|
globs = { |
|
name + "_repr": r for name, r, _ in attr_names_with_reprs if r != repr |
|
} |
|
globs["_compat"] = _compat |
|
globs["AttributeError"] = AttributeError |
|
globs["NOTHING"] = NOTHING |
|
attribute_fragments = [] |
|
for name, r, i in attr_names_with_reprs: |
|
accessor = ( |
|
"self." + name if i else 'getattr(self, "' + name + '", NOTHING)' |
|
) |
|
fragment = ( |
|
"%s={%s!r}" % (name, accessor) |
|
if r == repr |
|
else "%s={%s_repr(%s)}" % (name, name, accessor) |
|
) |
|
attribute_fragments.append(fragment) |
|
repr_fragment = ", ".join(attribute_fragments) |
|
|
|
if ns is None: |
|
cls_name_fragment = '{self.__class__.__qualname__.rsplit(">.", 1)[-1]}' |
|
else: |
|
cls_name_fragment = ns + ".{self.__class__.__name__}" |
|
|
|
lines = [ |
|
"def __repr__(self):", |
|
" try:", |
|
" already_repring = _compat.repr_context.already_repring", |
|
" except AttributeError:", |
|
" already_repring = {id(self),}", |
|
" _compat.repr_context.already_repring = already_repring", |
|
" else:", |
|
" if id(self) in already_repring:", |
|
" return '...'", |
|
" else:", |
|
" already_repring.add(id(self))", |
|
" try:", |
|
f" return f'{cls_name_fragment}({repr_fragment})'", |
|
" finally:", |
|
" already_repring.remove(id(self))", |
|
] |
|
|
|
return _make_method( |
|
"__repr__", "\n".join(lines), unique_filename, globs=globs |
|
) |
|
|
|
|
|
def _add_repr(cls, ns=None, attrs=None): |
|
""" |
|
Add a repr method to *cls*. |
|
""" |
|
if attrs is None: |
|
attrs = cls.__attrs_attrs__ |
|
|
|
cls.__repr__ = _make_repr(attrs, ns, cls) |
|
return cls |
|
|
|
|
|
def fields(cls): |
|
""" |
|
Return the tuple of *attrs* attributes for a class. |
|
|
|
The tuple also allows accessing the fields by their names (see below for |
|
examples). |
|
|
|
Args: |
|
cls (type): Class to introspect. |
|
|
|
Raises: |
|
TypeError: If *cls* is not a class. |
|
|
|
attrs.exceptions.NotAnAttrsClassError: |
|
If *cls* is not an *attrs* class. |
|
|
|
Returns: |
|
tuple (with name accessors) of `attrs.Attribute` |
|
|
|
.. versionchanged:: 16.2.0 Returned tuple allows accessing the fields |
|
by name. |
|
.. versionchanged:: 23.1.0 Add support for generic classes. |
|
""" |
|
generic_base = get_generic_base(cls) |
|
|
|
if generic_base is None and not isinstance(cls, type): |
|
msg = "Passed object must be a class." |
|
raise TypeError(msg) |
|
|
|
attrs = getattr(cls, "__attrs_attrs__", None) |
|
|
|
if attrs is None: |
|
if generic_base is not None: |
|
attrs = getattr(generic_base, "__attrs_attrs__", None) |
|
if attrs is not None: |
|
|
|
|
|
|
|
cls.__attrs_attrs__ = attrs |
|
return attrs |
|
msg = f"{cls!r} is not an attrs-decorated class." |
|
raise NotAnAttrsClassError(msg) |
|
|
|
return attrs |
|
|
|
|
|
def fields_dict(cls): |
|
""" |
|
Return an ordered dictionary of *attrs* attributes for a class, whose keys |
|
are the attribute names. |
|
|
|
Args: |
|
cls (type): Class to introspect. |
|
|
|
Raises: |
|
TypeError: If *cls* is not a class. |
|
|
|
attrs.exceptions.NotAnAttrsClassError: |
|
If *cls* is not an *attrs* class. |
|
|
|
Returns: |
|
dict[str, attrs.Attribute]: Dict of attribute name to definition |
|
|
|
.. versionadded:: 18.1.0 |
|
""" |
|
if not isinstance(cls, type): |
|
msg = "Passed object must be a class." |
|
raise TypeError(msg) |
|
attrs = getattr(cls, "__attrs_attrs__", None) |
|
if attrs is None: |
|
msg = f"{cls!r} is not an attrs-decorated class." |
|
raise NotAnAttrsClassError(msg) |
|
return {a.name: a for a in attrs} |
|
|
|
|
|
def validate(inst): |
|
""" |
|
Validate all attributes on *inst* that have a validator. |
|
|
|
Leaves all exceptions through. |
|
|
|
Args: |
|
inst: Instance of a class with *attrs* attributes. |
|
""" |
|
if _config._run_validators is False: |
|
return |
|
|
|
for a in fields(inst.__class__): |
|
v = a.validator |
|
if v is not None: |
|
v(inst, a, getattr(inst, a.name)) |
|
|
|
|
|
def _is_slot_attr(a_name, base_attr_map): |
|
""" |
|
Check if the attribute name comes from a slot class. |
|
""" |
|
cls = base_attr_map.get(a_name) |
|
return cls and "__slots__" in cls.__dict__ |
|
|
|
|
|
def _make_init( |
|
cls, |
|
attrs, |
|
pre_init, |
|
pre_init_has_args, |
|
post_init, |
|
frozen, |
|
slots, |
|
cache_hash, |
|
base_attr_map, |
|
is_exc, |
|
cls_on_setattr, |
|
attrs_init, |
|
): |
|
has_cls_on_setattr = ( |
|
cls_on_setattr is not None and cls_on_setattr is not setters.NO_OP |
|
) |
|
|
|
if frozen and has_cls_on_setattr: |
|
msg = "Frozen classes can't use on_setattr." |
|
raise ValueError(msg) |
|
|
|
needs_cached_setattr = cache_hash or frozen |
|
filtered_attrs = [] |
|
attr_dict = {} |
|
for a in attrs: |
|
if not a.init and a.default is NOTHING: |
|
continue |
|
|
|
filtered_attrs.append(a) |
|
attr_dict[a.name] = a |
|
|
|
if a.on_setattr is not None: |
|
if frozen is True: |
|
msg = "Frozen classes can't use on_setattr." |
|
raise ValueError(msg) |
|
|
|
needs_cached_setattr = True |
|
elif has_cls_on_setattr and a.on_setattr is not setters.NO_OP: |
|
needs_cached_setattr = True |
|
|
|
unique_filename = _generate_unique_filename(cls, "init") |
|
|
|
script, globs, annotations = _attrs_to_init_script( |
|
filtered_attrs, |
|
frozen, |
|
slots, |
|
pre_init, |
|
pre_init_has_args, |
|
post_init, |
|
cache_hash, |
|
base_attr_map, |
|
is_exc, |
|
needs_cached_setattr, |
|
has_cls_on_setattr, |
|
"__attrs_init__" if attrs_init else "__init__", |
|
) |
|
if cls.__module__ in sys.modules: |
|
|
|
globs.update(sys.modules[cls.__module__].__dict__) |
|
|
|
globs.update({"NOTHING": NOTHING, "attr_dict": attr_dict}) |
|
|
|
if needs_cached_setattr: |
|
|
|
|
|
globs["_cached_setattr_get"] = _OBJ_SETATTR.__get__ |
|
|
|
init = _make_method( |
|
"__attrs_init__" if attrs_init else "__init__", |
|
script, |
|
unique_filename, |
|
globs, |
|
) |
|
init.__annotations__ = annotations |
|
|
|
return init |
|
|
|
|
|
def _setattr(attr_name: str, value_var: str, has_on_setattr: bool) -> str: |
|
""" |
|
Use the cached object.setattr to set *attr_name* to *value_var*. |
|
""" |
|
return f"_setattr('{attr_name}', {value_var})" |
|
|
|
|
|
def _setattr_with_converter( |
|
attr_name: str, value_var: str, has_on_setattr: bool, converter: Converter |
|
) -> str: |
|
""" |
|
Use the cached object.setattr to set *attr_name* to *value_var*, but run |
|
its converter first. |
|
""" |
|
return f"_setattr('{attr_name}', {converter._fmt_converter_call(attr_name, value_var)})" |
|
|
|
|
|
def _assign(attr_name: str, value: str, has_on_setattr: bool) -> str: |
|
""" |
|
Unless *attr_name* has an on_setattr hook, use normal assignment. Otherwise |
|
relegate to _setattr. |
|
""" |
|
if has_on_setattr: |
|
return _setattr(attr_name, value, True) |
|
|
|
return f"self.{attr_name} = {value}" |
|
|
|
|
|
def _assign_with_converter( |
|
attr_name: str, value_var: str, has_on_setattr: bool, converter: Converter |
|
) -> str: |
|
""" |
|
Unless *attr_name* has an on_setattr hook, use normal assignment after |
|
conversion. Otherwise relegate to _setattr_with_converter. |
|
""" |
|
if has_on_setattr: |
|
return _setattr_with_converter(attr_name, value_var, True, converter) |
|
|
|
return f"self.{attr_name} = {converter._fmt_converter_call(attr_name, value_var)}" |
|
|
|
|
|
def _determine_setters( |
|
frozen: bool, slots: bool, base_attr_map: dict[str, type] |
|
): |
|
""" |
|
Determine the correct setter functions based on whether a class is frozen |
|
and/or slotted. |
|
""" |
|
if frozen is True: |
|
if slots is True: |
|
return (), _setattr, _setattr_with_converter |
|
|
|
|
|
|
|
|
|
|
|
|
|
def fmt_setter( |
|
attr_name: str, value_var: str, has_on_setattr: bool |
|
) -> str: |
|
if _is_slot_attr(attr_name, base_attr_map): |
|
return _setattr(attr_name, value_var, has_on_setattr) |
|
|
|
return f"_inst_dict['{attr_name}'] = {value_var}" |
|
|
|
def fmt_setter_with_converter( |
|
attr_name: str, |
|
value_var: str, |
|
has_on_setattr: bool, |
|
converter: Converter, |
|
) -> str: |
|
if has_on_setattr or _is_slot_attr(attr_name, base_attr_map): |
|
return _setattr_with_converter( |
|
attr_name, value_var, has_on_setattr, converter |
|
) |
|
|
|
return f"_inst_dict['{attr_name}'] = {converter._fmt_converter_call(attr_name, value_var)}" |
|
|
|
return ( |
|
("_inst_dict = self.__dict__",), |
|
fmt_setter, |
|
fmt_setter_with_converter, |
|
) |
|
|
|
|
|
return (), _assign, _assign_with_converter |
|
|
|
|
|
def _attrs_to_init_script( |
|
attrs: list[Attribute], |
|
is_frozen: bool, |
|
is_slotted: bool, |
|
call_pre_init: bool, |
|
pre_init_has_args: bool, |
|
call_post_init: bool, |
|
does_cache_hash: bool, |
|
base_attr_map: dict[str, type], |
|
is_exc: bool, |
|
needs_cached_setattr: bool, |
|
has_cls_on_setattr: bool, |
|
method_name: str, |
|
) -> tuple[str, dict, dict]: |
|
""" |
|
Return a script of an initializer for *attrs*, a dict of globals, and |
|
annotations for the initializer. |
|
|
|
The globals are required by the generated script. |
|
""" |
|
lines = ["self.__attrs_pre_init__()"] if call_pre_init else [] |
|
|
|
if needs_cached_setattr: |
|
lines.append( |
|
|
|
|
|
|
|
"_setattr = _cached_setattr_get(self)" |
|
) |
|
|
|
extra_lines, fmt_setter, fmt_setter_with_converter = _determine_setters( |
|
is_frozen, is_slotted, base_attr_map |
|
) |
|
lines.extend(extra_lines) |
|
|
|
args = [] |
|
kw_only_args = [] |
|
attrs_to_validate = [] |
|
|
|
|
|
|
|
names_for_globals = {} |
|
annotations = {"return": None} |
|
|
|
for a in attrs: |
|
if a.validator: |
|
attrs_to_validate.append(a) |
|
|
|
attr_name = a.name |
|
has_on_setattr = a.on_setattr is not None or ( |
|
a.on_setattr is not setters.NO_OP and has_cls_on_setattr |
|
) |
|
|
|
|
|
arg_name = a.alias |
|
|
|
has_factory = isinstance(a.default, Factory) |
|
maybe_self = "self" if has_factory and a.default.takes_self else "" |
|
|
|
if a.converter and not isinstance(a.converter, Converter): |
|
converter = Converter(a.converter) |
|
else: |
|
converter = a.converter |
|
|
|
if a.init is False: |
|
if has_factory: |
|
init_factory_name = _INIT_FACTORY_PAT % (a.name,) |
|
if converter is not None: |
|
lines.append( |
|
fmt_setter_with_converter( |
|
attr_name, |
|
init_factory_name + f"({maybe_self})", |
|
has_on_setattr, |
|
converter, |
|
) |
|
) |
|
names_for_globals[converter._get_global_name(a.name)] = ( |
|
converter.converter |
|
) |
|
else: |
|
lines.append( |
|
fmt_setter( |
|
attr_name, |
|
init_factory_name + f"({maybe_self})", |
|
has_on_setattr, |
|
) |
|
) |
|
names_for_globals[init_factory_name] = a.default.factory |
|
elif converter is not None: |
|
lines.append( |
|
fmt_setter_with_converter( |
|
attr_name, |
|
f"attr_dict['{attr_name}'].default", |
|
has_on_setattr, |
|
converter, |
|
) |
|
) |
|
names_for_globals[converter._get_global_name(a.name)] = ( |
|
converter.converter |
|
) |
|
else: |
|
lines.append( |
|
fmt_setter( |
|
attr_name, |
|
f"attr_dict['{attr_name}'].default", |
|
has_on_setattr, |
|
) |
|
) |
|
elif a.default is not NOTHING and not has_factory: |
|
arg = f"{arg_name}=attr_dict['{attr_name}'].default" |
|
if a.kw_only: |
|
kw_only_args.append(arg) |
|
else: |
|
args.append(arg) |
|
|
|
if converter is not None: |
|
lines.append( |
|
fmt_setter_with_converter( |
|
attr_name, arg_name, has_on_setattr, converter |
|
) |
|
) |
|
names_for_globals[converter._get_global_name(a.name)] = ( |
|
converter.converter |
|
) |
|
else: |
|
lines.append(fmt_setter(attr_name, arg_name, has_on_setattr)) |
|
|
|
elif has_factory: |
|
arg = f"{arg_name}=NOTHING" |
|
if a.kw_only: |
|
kw_only_args.append(arg) |
|
else: |
|
args.append(arg) |
|
lines.append(f"if {arg_name} is not NOTHING:") |
|
|
|
init_factory_name = _INIT_FACTORY_PAT % (a.name,) |
|
if converter is not None: |
|
lines.append( |
|
" " |
|
+ fmt_setter_with_converter( |
|
attr_name, arg_name, has_on_setattr, converter |
|
) |
|
) |
|
lines.append("else:") |
|
lines.append( |
|
" " |
|
+ fmt_setter_with_converter( |
|
attr_name, |
|
init_factory_name + "(" + maybe_self + ")", |
|
has_on_setattr, |
|
converter, |
|
) |
|
) |
|
names_for_globals[converter._get_global_name(a.name)] = ( |
|
converter.converter |
|
) |
|
else: |
|
lines.append( |
|
" " + fmt_setter(attr_name, arg_name, has_on_setattr) |
|
) |
|
lines.append("else:") |
|
lines.append( |
|
" " |
|
+ fmt_setter( |
|
attr_name, |
|
init_factory_name + "(" + maybe_self + ")", |
|
has_on_setattr, |
|
) |
|
) |
|
names_for_globals[init_factory_name] = a.default.factory |
|
else: |
|
if a.kw_only: |
|
kw_only_args.append(arg_name) |
|
else: |
|
args.append(arg_name) |
|
|
|
if converter is not None: |
|
lines.append( |
|
fmt_setter_with_converter( |
|
attr_name, arg_name, has_on_setattr, converter |
|
) |
|
) |
|
names_for_globals[converter._get_global_name(a.name)] = ( |
|
converter.converter |
|
) |
|
else: |
|
lines.append(fmt_setter(attr_name, arg_name, has_on_setattr)) |
|
|
|
if a.init is True: |
|
if a.type is not None and converter is None: |
|
annotations[arg_name] = a.type |
|
elif converter is not None and converter._first_param_type: |
|
|
|
annotations[arg_name] = converter._first_param_type |
|
|
|
if attrs_to_validate: |
|
names_for_globals["_config"] = _config |
|
lines.append("if _config._run_validators is True:") |
|
for a in attrs_to_validate: |
|
val_name = "__attr_validator_" + a.name |
|
attr_name = "__attr_" + a.name |
|
lines.append(f" {val_name}(self, {attr_name}, self.{a.name})") |
|
names_for_globals[val_name] = a.validator |
|
names_for_globals[attr_name] = a |
|
|
|
if call_post_init: |
|
lines.append("self.__attrs_post_init__()") |
|
|
|
|
|
|
|
|
|
|
|
|
|
if does_cache_hash: |
|
if is_frozen: |
|
if is_slotted: |
|
init_hash_cache = f"_setattr('{_HASH_CACHE_FIELD}', None)" |
|
else: |
|
init_hash_cache = f"_inst_dict['{_HASH_CACHE_FIELD}'] = None" |
|
else: |
|
init_hash_cache = f"self.{_HASH_CACHE_FIELD} = None" |
|
lines.append(init_hash_cache) |
|
|
|
|
|
|
|
if is_exc: |
|
vals = ",".join(f"self.{a.name}" for a in attrs if a.init) |
|
|
|
lines.append(f"BaseException.__init__(self, {vals})") |
|
|
|
args = ", ".join(args) |
|
pre_init_args = args |
|
if kw_only_args: |
|
|
|
args += f"{', ' if args else ''}*, {', '.join(kw_only_args)}" |
|
pre_init_kw_only_args = ", ".join( |
|
[ |
|
f"{kw_arg_name}={kw_arg_name}" |
|
|
|
for kw_arg_name in (kwa.split("=")[0] for kwa in kw_only_args) |
|
] |
|
) |
|
pre_init_args += ", " if pre_init_args else "" |
|
pre_init_args += pre_init_kw_only_args |
|
|
|
if call_pre_init and pre_init_has_args: |
|
|
|
lines[0] = f"self.__attrs_pre_init__({pre_init_args})" |
|
|
|
|
|
NL = "\n " |
|
return ( |
|
f"""def {method_name}(self, {args}): |
|
{NL.join(lines) if lines else 'pass'} |
|
""", |
|
names_for_globals, |
|
annotations, |
|
) |
|
|
|
|
|
def _default_init_alias_for(name: str) -> str: |
|
""" |
|
The default __init__ parameter name for a field. |
|
|
|
This performs private-name adjustment via leading-unscore stripping, |
|
and is the default value of Attribute.alias if not provided. |
|
""" |
|
|
|
return name.lstrip("_") |
|
|
|
|
|
class Attribute: |
|
""" |
|
*Read-only* representation of an attribute. |
|
|
|
.. warning:: |
|
|
|
You should never instantiate this class yourself. |
|
|
|
The class has *all* arguments of `attr.ib` (except for ``factory`` which is |
|
only syntactic sugar for ``default=Factory(...)`` plus the following: |
|
|
|
- ``name`` (`str`): The name of the attribute. |
|
- ``alias`` (`str`): The __init__ parameter name of the attribute, after |
|
any explicit overrides and default private-attribute-name handling. |
|
- ``inherited`` (`bool`): Whether or not that attribute has been inherited |
|
from a base class. |
|
- ``eq_key`` and ``order_key`` (`typing.Callable` or `None`): The |
|
callables that are used for comparing and ordering objects by this |
|
attribute, respectively. These are set by passing a callable to |
|
`attr.ib`'s ``eq``, ``order``, or ``cmp`` arguments. See also |
|
:ref:`comparison customization <custom-comparison>`. |
|
|
|
Instances of this class are frequently used for introspection purposes |
|
like: |
|
|
|
- `fields` returns a tuple of them. |
|
- Validators get them passed as the first argument. |
|
- The :ref:`field transformer <transform-fields>` hook receives a list of |
|
them. |
|
- The ``alias`` property exposes the __init__ parameter name of the field, |
|
with any overrides and default private-attribute handling applied. |
|
|
|
|
|
.. versionadded:: 20.1.0 *inherited* |
|
.. versionadded:: 20.1.0 *on_setattr* |
|
.. versionchanged:: 20.2.0 *inherited* is not taken into account for |
|
equality checks and hashing anymore. |
|
.. versionadded:: 21.1.0 *eq_key* and *order_key* |
|
.. versionadded:: 22.2.0 *alias* |
|
|
|
For the full version history of the fields, see `attr.ib`. |
|
""" |
|
|
|
__slots__ = ( |
|
"name", |
|
"default", |
|
"validator", |
|
"repr", |
|
"eq", |
|
"eq_key", |
|
"order", |
|
"order_key", |
|
"hash", |
|
"init", |
|
"metadata", |
|
"type", |
|
"converter", |
|
"kw_only", |
|
"inherited", |
|
"on_setattr", |
|
"alias", |
|
) |
|
|
|
def __init__( |
|
self, |
|
name, |
|
default, |
|
validator, |
|
repr, |
|
cmp, |
|
hash, |
|
init, |
|
inherited, |
|
metadata=None, |
|
type=None, |
|
converter=None, |
|
kw_only=False, |
|
eq=None, |
|
eq_key=None, |
|
order=None, |
|
order_key=None, |
|
on_setattr=None, |
|
alias=None, |
|
): |
|
eq, eq_key, order, order_key = _determine_attrib_eq_order( |
|
cmp, eq_key or eq, order_key or order, True |
|
) |
|
|
|
|
|
bound_setattr = _OBJ_SETATTR.__get__(self) |
|
|
|
|
|
|
|
bound_setattr("name", name) |
|
bound_setattr("default", default) |
|
bound_setattr("validator", validator) |
|
bound_setattr("repr", repr) |
|
bound_setattr("eq", eq) |
|
bound_setattr("eq_key", eq_key) |
|
bound_setattr("order", order) |
|
bound_setattr("order_key", order_key) |
|
bound_setattr("hash", hash) |
|
bound_setattr("init", init) |
|
bound_setattr("converter", converter) |
|
bound_setattr( |
|
"metadata", |
|
( |
|
types.MappingProxyType(dict(metadata)) |
|
if metadata |
|
else _EMPTY_METADATA_SINGLETON |
|
), |
|
) |
|
bound_setattr("type", type) |
|
bound_setattr("kw_only", kw_only) |
|
bound_setattr("inherited", inherited) |
|
bound_setattr("on_setattr", on_setattr) |
|
bound_setattr("alias", alias) |
|
|
|
def __setattr__(self, name, value): |
|
raise FrozenInstanceError() |
|
|
|
@classmethod |
|
def from_counting_attr(cls, name, ca, type=None): |
|
|
|
if type is None: |
|
type = ca.type |
|
elif ca.type is not None: |
|
msg = "Type annotation and type argument cannot both be present" |
|
raise ValueError(msg) |
|
inst_dict = { |
|
k: getattr(ca, k) |
|
for k in Attribute.__slots__ |
|
if k |
|
not in ( |
|
"name", |
|
"validator", |
|
"default", |
|
"type", |
|
"inherited", |
|
) |
|
} |
|
return cls( |
|
name=name, |
|
validator=ca._validator, |
|
default=ca._default, |
|
type=type, |
|
cmp=None, |
|
inherited=False, |
|
**inst_dict, |
|
) |
|
|
|
|
|
def evolve(self, **changes): |
|
""" |
|
Copy *self* and apply *changes*. |
|
|
|
This works similarly to `attrs.evolve` but that function does not work |
|
with {class}`Attribute`. |
|
|
|
It is mainly meant to be used for `transform-fields`. |
|
|
|
.. versionadded:: 20.3.0 |
|
""" |
|
new = copy.copy(self) |
|
|
|
new._setattrs(changes.items()) |
|
|
|
return new |
|
|
|
|
|
def __getstate__(self): |
|
""" |
|
Play nice with pickle. |
|
""" |
|
return tuple( |
|
getattr(self, name) if name != "metadata" else dict(self.metadata) |
|
for name in self.__slots__ |
|
) |
|
|
|
def __setstate__(self, state): |
|
""" |
|
Play nice with pickle. |
|
""" |
|
self._setattrs(zip(self.__slots__, state)) |
|
|
|
def _setattrs(self, name_values_pairs): |
|
bound_setattr = _OBJ_SETATTR.__get__(self) |
|
for name, value in name_values_pairs: |
|
if name != "metadata": |
|
bound_setattr(name, value) |
|
else: |
|
bound_setattr( |
|
name, |
|
( |
|
types.MappingProxyType(dict(value)) |
|
if value |
|
else _EMPTY_METADATA_SINGLETON |
|
), |
|
) |
|
|
|
|
|
_a = [ |
|
Attribute( |
|
name=name, |
|
default=NOTHING, |
|
validator=None, |
|
repr=True, |
|
cmp=None, |
|
eq=True, |
|
order=False, |
|
hash=(name != "metadata"), |
|
init=True, |
|
inherited=False, |
|
alias=_default_init_alias_for(name), |
|
) |
|
for name in Attribute.__slots__ |
|
] |
|
|
|
Attribute = _add_hash( |
|
_add_eq( |
|
_add_repr(Attribute, attrs=_a), |
|
attrs=[a for a in _a if a.name != "inherited"], |
|
), |
|
attrs=[a for a in _a if a.hash and a.name != "inherited"], |
|
) |
|
|
|
|
|
class _CountingAttr: |
|
""" |
|
Intermediate representation of attributes that uses a counter to preserve |
|
the order in which the attributes have been defined. |
|
|
|
*Internal* data structure of the attrs library. Running into is most |
|
likely the result of a bug like a forgotten `@attr.s` decorator. |
|
""" |
|
|
|
__slots__ = ( |
|
"counter", |
|
"_default", |
|
"repr", |
|
"eq", |
|
"eq_key", |
|
"order", |
|
"order_key", |
|
"hash", |
|
"init", |
|
"metadata", |
|
"_validator", |
|
"converter", |
|
"type", |
|
"kw_only", |
|
"on_setattr", |
|
"alias", |
|
) |
|
__attrs_attrs__ = ( |
|
*tuple( |
|
Attribute( |
|
name=name, |
|
alias=_default_init_alias_for(name), |
|
default=NOTHING, |
|
validator=None, |
|
repr=True, |
|
cmp=None, |
|
hash=True, |
|
init=True, |
|
kw_only=False, |
|
eq=True, |
|
eq_key=None, |
|
order=False, |
|
order_key=None, |
|
inherited=False, |
|
on_setattr=None, |
|
) |
|
for name in ( |
|
"counter", |
|
"_default", |
|
"repr", |
|
"eq", |
|
"order", |
|
"hash", |
|
"init", |
|
"on_setattr", |
|
"alias", |
|
) |
|
), |
|
Attribute( |
|
name="metadata", |
|
alias="metadata", |
|
default=None, |
|
validator=None, |
|
repr=True, |
|
cmp=None, |
|
hash=False, |
|
init=True, |
|
kw_only=False, |
|
eq=True, |
|
eq_key=None, |
|
order=False, |
|
order_key=None, |
|
inherited=False, |
|
on_setattr=None, |
|
), |
|
) |
|
cls_counter = 0 |
|
|
|
def __init__( |
|
self, |
|
default, |
|
validator, |
|
repr, |
|
cmp, |
|
hash, |
|
init, |
|
converter, |
|
metadata, |
|
type, |
|
kw_only, |
|
eq, |
|
eq_key, |
|
order, |
|
order_key, |
|
on_setattr, |
|
alias, |
|
): |
|
_CountingAttr.cls_counter += 1 |
|
self.counter = _CountingAttr.cls_counter |
|
self._default = default |
|
self._validator = validator |
|
self.converter = converter |
|
self.repr = repr |
|
self.eq = eq |
|
self.eq_key = eq_key |
|
self.order = order |
|
self.order_key = order_key |
|
self.hash = hash |
|
self.init = init |
|
self.metadata = metadata |
|
self.type = type |
|
self.kw_only = kw_only |
|
self.on_setattr = on_setattr |
|
self.alias = alias |
|
|
|
def validator(self, meth): |
|
""" |
|
Decorator that adds *meth* to the list of validators. |
|
|
|
Returns *meth* unchanged. |
|
|
|
.. versionadded:: 17.1.0 |
|
""" |
|
if self._validator is None: |
|
self._validator = meth |
|
else: |
|
self._validator = and_(self._validator, meth) |
|
return meth |
|
|
|
def default(self, meth): |
|
""" |
|
Decorator that allows to set the default for an attribute. |
|
|
|
Returns *meth* unchanged. |
|
|
|
Raises: |
|
DefaultAlreadySetError: If default has been set before. |
|
|
|
.. versionadded:: 17.1.0 |
|
""" |
|
if self._default is not NOTHING: |
|
raise DefaultAlreadySetError() |
|
|
|
self._default = Factory(meth, takes_self=True) |
|
|
|
return meth |
|
|
|
|
|
_CountingAttr = _add_eq(_add_repr(_CountingAttr)) |
|
|
|
|
|
class Factory: |
|
""" |
|
Stores a factory callable. |
|
|
|
If passed as the default value to `attrs.field`, the factory is used to |
|
generate a new value. |
|
|
|
Args: |
|
factory (typing.Callable): |
|
A callable that takes either none or exactly one mandatory |
|
positional argument depending on *takes_self*. |
|
|
|
takes_self (bool): |
|
Pass the partially initialized instance that is being initialized |
|
as a positional argument. |
|
|
|
.. versionadded:: 17.1.0 *takes_self* |
|
""" |
|
|
|
__slots__ = ("factory", "takes_self") |
|
|
|
def __init__(self, factory, takes_self=False): |
|
self.factory = factory |
|
self.takes_self = takes_self |
|
|
|
def __getstate__(self): |
|
""" |
|
Play nice with pickle. |
|
""" |
|
return tuple(getattr(self, name) for name in self.__slots__) |
|
|
|
def __setstate__(self, state): |
|
""" |
|
Play nice with pickle. |
|
""" |
|
for name, value in zip(self.__slots__, state): |
|
setattr(self, name, value) |
|
|
|
|
|
_f = [ |
|
Attribute( |
|
name=name, |
|
default=NOTHING, |
|
validator=None, |
|
repr=True, |
|
cmp=None, |
|
eq=True, |
|
order=False, |
|
hash=True, |
|
init=True, |
|
inherited=False, |
|
) |
|
for name in Factory.__slots__ |
|
] |
|
|
|
Factory = _add_hash(_add_eq(_add_repr(Factory, attrs=_f), attrs=_f), attrs=_f) |
|
|
|
|
|
class Converter: |
|
""" |
|
Stores a converter callable. |
|
|
|
Allows for the wrapped converter to take additional arguments. The |
|
arguments are passed in the order they are documented. |
|
|
|
Args: |
|
converter (Callable): A callable that converts the passed value. |
|
|
|
takes_self (bool): |
|
Pass the partially initialized instance that is being initialized |
|
as a positional argument. (default: `False`) |
|
|
|
takes_field (bool): |
|
Pass the field definition (an :class:`Attribute`) into the |
|
converter as a positional argument. (default: `False`) |
|
|
|
.. versionadded:: 24.1.0 |
|
""" |
|
|
|
__slots__ = ( |
|
"converter", |
|
"takes_self", |
|
"takes_field", |
|
"_first_param_type", |
|
"_global_name", |
|
"__call__", |
|
) |
|
|
|
def __init__(self, converter, *, takes_self=False, takes_field=False): |
|
self.converter = converter |
|
self.takes_self = takes_self |
|
self.takes_field = takes_field |
|
|
|
ex = _AnnotationExtractor(converter) |
|
self._first_param_type = ex.get_first_param_type() |
|
|
|
if not (self.takes_self or self.takes_field): |
|
self.__call__ = lambda value, _, __: self.converter(value) |
|
elif self.takes_self and not self.takes_field: |
|
self.__call__ = lambda value, instance, __: self.converter( |
|
value, instance |
|
) |
|
elif not self.takes_self and self.takes_field: |
|
self.__call__ = lambda value, __, field: self.converter( |
|
value, field |
|
) |
|
else: |
|
self.__call__ = lambda value, instance, field: self.converter( |
|
value, instance, field |
|
) |
|
|
|
rt = ex.get_return_type() |
|
if rt is not None: |
|
self.__call__.__annotations__["return"] = rt |
|
|
|
@staticmethod |
|
def _get_global_name(attr_name: str) -> str: |
|
""" |
|
Return the name that a converter for an attribute name *attr_name* |
|
would have. |
|
""" |
|
return f"__attr_converter_{attr_name}" |
|
|
|
def _fmt_converter_call(self, attr_name: str, value_var: str) -> str: |
|
""" |
|
Return a string that calls the converter for an attribute name |
|
*attr_name* and the value in variable named *value_var* according to |
|
`self.takes_self` and `self.takes_field`. |
|
""" |
|
if not (self.takes_self or self.takes_field): |
|
return f"{self._get_global_name(attr_name)}({value_var})" |
|
|
|
if self.takes_self and self.takes_field: |
|
return f"{self._get_global_name(attr_name)}({value_var}, self, attr_dict['{attr_name}'])" |
|
|
|
if self.takes_self: |
|
return f"{self._get_global_name(attr_name)}({value_var}, self)" |
|
|
|
return f"{self._get_global_name(attr_name)}({value_var}, attr_dict['{attr_name}'])" |
|
|
|
def __getstate__(self): |
|
""" |
|
Return a dict containing only converter and takes_self -- the rest gets |
|
computed when loading. |
|
""" |
|
return { |
|
"converter": self.converter, |
|
"takes_self": self.takes_self, |
|
"takes_field": self.takes_field, |
|
} |
|
|
|
def __setstate__(self, state): |
|
""" |
|
Load instance from state. |
|
""" |
|
self.__init__(**state) |
|
|
|
|
|
_f = [ |
|
Attribute( |
|
name=name, |
|
default=NOTHING, |
|
validator=None, |
|
repr=True, |
|
cmp=None, |
|
eq=True, |
|
order=False, |
|
hash=True, |
|
init=True, |
|
inherited=False, |
|
) |
|
for name in ("converter", "takes_self", "takes_field") |
|
] |
|
|
|
Converter = _add_hash( |
|
_add_eq(_add_repr(Converter, attrs=_f), attrs=_f), attrs=_f |
|
) |
|
|
|
|
|
def make_class( |
|
name, attrs, bases=(object,), class_body=None, **attributes_arguments |
|
): |
|
r""" |
|
A quick way to create a new class called *name* with *attrs*. |
|
|
|
Args: |
|
name (str): The name for the new class. |
|
|
|
attrs( list | dict): |
|
A list of names or a dictionary of mappings of names to `attr.ib`\ |
|
s / `attrs.field`\ s. |
|
|
|
The order is deduced from the order of the names or attributes |
|
inside *attrs*. Otherwise the order of the definition of the |
|
attributes is used. |
|
|
|
bases (tuple[type, ...]): Classes that the new class will subclass. |
|
|
|
class_body (dict): |
|
An optional dictionary of class attributes for the new class. |
|
|
|
attributes_arguments: Passed unmodified to `attr.s`. |
|
|
|
Returns: |
|
type: A new class with *attrs*. |
|
|
|
.. versionadded:: 17.1.0 *bases* |
|
.. versionchanged:: 18.1.0 If *attrs* is ordered, the order is retained. |
|
.. versionchanged:: 23.2.0 *class_body* |
|
""" |
|
if isinstance(attrs, dict): |
|
cls_dict = attrs |
|
elif isinstance(attrs, (list, tuple)): |
|
cls_dict = {a: attrib() for a in attrs} |
|
else: |
|
msg = "attrs argument must be a dict or a list." |
|
raise TypeError(msg) |
|
|
|
pre_init = cls_dict.pop("__attrs_pre_init__", None) |
|
post_init = cls_dict.pop("__attrs_post_init__", None) |
|
user_init = cls_dict.pop("__init__", None) |
|
|
|
body = {} |
|
if class_body is not None: |
|
body.update(class_body) |
|
if pre_init is not None: |
|
body["__attrs_pre_init__"] = pre_init |
|
if post_init is not None: |
|
body["__attrs_post_init__"] = post_init |
|
if user_init is not None: |
|
body["__init__"] = user_init |
|
|
|
type_ = types.new_class(name, bases, {}, lambda ns: ns.update(body)) |
|
|
|
|
|
|
|
|
|
|
|
with contextlib.suppress(AttributeError, ValueError): |
|
type_.__module__ = sys._getframe(1).f_globals.get( |
|
"__name__", "__main__" |
|
) |
|
|
|
|
|
cmp = attributes_arguments.pop("cmp", None) |
|
( |
|
attributes_arguments["eq"], |
|
attributes_arguments["order"], |
|
) = _determine_attrs_eq_order( |
|
cmp, |
|
attributes_arguments.get("eq"), |
|
attributes_arguments.get("order"), |
|
True, |
|
) |
|
|
|
cls = _attrs(these=cls_dict, **attributes_arguments)(type_) |
|
|
|
cls.__annotations__ = { |
|
k: v.type for k, v in cls_dict.items() if v.type is not None |
|
} |
|
return cls |
|
|
|
|
|
|
|
|
|
|
|
|
|
@attrs(slots=True, unsafe_hash=True) |
|
class _AndValidator: |
|
""" |
|
Compose many validators to a single one. |
|
""" |
|
|
|
_validators = attrib() |
|
|
|
def __call__(self, inst, attr, value): |
|
for v in self._validators: |
|
v(inst, attr, value) |
|
|
|
|
|
def and_(*validators): |
|
""" |
|
A validator that composes multiple validators into one. |
|
|
|
When called on a value, it runs all wrapped validators. |
|
|
|
Args: |
|
validators (~collections.abc.Iterable[typing.Callable]): |
|
Arbitrary number of validators. |
|
|
|
.. versionadded:: 17.1.0 |
|
""" |
|
vals = [] |
|
for validator in validators: |
|
vals.extend( |
|
validator._validators |
|
if isinstance(validator, _AndValidator) |
|
else [validator] |
|
) |
|
|
|
return _AndValidator(tuple(vals)) |
|
|
|
|
|
def pipe(*converters): |
|
""" |
|
A converter that composes multiple converters into one. |
|
|
|
When called on a value, it runs all wrapped converters, returning the |
|
*last* value. |
|
|
|
Type annotations will be inferred from the wrapped converters', if they |
|
have any. |
|
|
|
converters (~collections.abc.Iterable[typing.Callable]): |
|
Arbitrary number of converters. |
|
|
|
.. versionadded:: 20.1.0 |
|
""" |
|
|
|
def pipe_converter(val, inst, field): |
|
for c in converters: |
|
val = c(val, inst, field) if isinstance(c, Converter) else c(val) |
|
|
|
return val |
|
|
|
if not converters: |
|
|
|
A = typing.TypeVar("A") |
|
pipe_converter.__annotations__.update({"val": A, "return": A}) |
|
else: |
|
|
|
t = _AnnotationExtractor(converters[0]).get_first_param_type() |
|
if t: |
|
pipe_converter.__annotations__["val"] = t |
|
|
|
last = converters[-1] |
|
if not PY_3_11_PLUS and isinstance(last, Converter): |
|
last = last.__call__ |
|
|
|
|
|
rt = _AnnotationExtractor(last).get_return_type() |
|
if rt: |
|
pipe_converter.__annotations__["return"] = rt |
|
|
|
return Converter(pipe_converter, takes_self=True, takes_field=True) |
|
|