|
import collections |
|
import math |
|
from typing import ( |
|
TYPE_CHECKING, |
|
Dict, |
|
Iterable, |
|
Iterator, |
|
Mapping, |
|
Sequence, |
|
TypeVar, |
|
Union, |
|
) |
|
|
|
from pip._vendor.resolvelib.providers import AbstractProvider |
|
|
|
from .base import Candidate, Constraint, Requirement |
|
from .candidates import REQUIRES_PYTHON_IDENTIFIER |
|
from .factory import Factory |
|
|
|
if TYPE_CHECKING:
    # These two names are only needed by static analysis: ``Preference`` is
    # referenced through quoted annotations, and ``RequirementInformation``
    # is only used to build the alias below.
    from pip._vendor.resolvelib.providers import Preference
    from pip._vendor.resolvelib.resolvers import RequirementInformation

    # Shape of the entries handed to the provider in ``information`` and
    # ``backtrack_causes``.
    PreferenceInformation = RequirementInformation[Requirement, Candidate]

    # Type checkers see the provider base parametrised with pip's own
    # requirement / candidate / identifier types ...
    _ProviderBase = AbstractProvider[Requirement, Candidate, str]
else:
    # ... while at runtime the plain, unsubscripted class is the base.
    _ProviderBase = AbstractProvider
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
D = TypeVar("D") |
|
V = TypeVar("V") |
|
|
|
|
|
def _get_with_identifier( |
|
mapping: Mapping[str, V], |
|
identifier: str, |
|
default: D, |
|
) -> Union[D, V]: |
|
"""Get item from a package name lookup mapping with a resolver identifier. |
|
|
|
This extra logic is needed when the target mapping is keyed by package |
|
name, which cannot be directly looked up with an identifier (which may |
|
contain requested extras). Additional logic is added to also look up a value |
|
by "cleaning up" the extras from the identifier. |
|
""" |
|
if identifier in mapping: |
|
return mapping[identifier] |
|
|
|
|
|
|
|
|
|
|
|
name, open_bracket, _ = identifier.partition("[") |
|
if open_bracket and name in mapping: |
|
return mapping[name] |
|
return default |
|
|
|
|
|
class PipProvider(_ProviderBase):
    """Pip's provider implementation for resolvelib.

    :param factory: The factory used to look up candidates.
    :param constraints: A mapping of constraints specified by the user. Keys
        are canonicalized project names.
    :param ignore_dependencies: Whether the user specified ``--no-deps``.
    :param upgrade_strategy: The user-specified upgrade strategy.
    :param user_requested: A mapping from the canonicalized names of the
        packages the user supplied for pip to install/upgrade to the order
        in which they were specified.
    """

    def __init__(
        self,
        factory: Factory,
        constraints: Dict[str, Constraint],
        ignore_dependencies: bool,
        upgrade_strategy: str,
        user_requested: Dict[str, int],
    ) -> None:
        self._factory = factory
        self._constraints = constraints
        self._ignore_dependencies = ignore_dependencies
        self._upgrade_strategy = upgrade_strategy
        self._user_requested = user_requested
        # Depth of every identifier seen so far by ``get_preference``;
        # identifiers not seen yet read as infinitely deep.
        self._known_depths: Dict[str, float] = collections.defaultdict(lambda: math.inf)

    def identify(self, requirement_or_candidate: Union[Requirement, Candidate]) -> str:
        """Return the identifier (the ``name``) of a requirement or candidate."""
        return requirement_or_candidate.name

    def get_preference(
        self,
        identifier: str,
        resolutions: Mapping[str, Candidate],
        candidates: Mapping[str, Iterator[Candidate]],
        information: Mapping[str, Iterable["PreferenceInformation"]],
        backtrack_causes: Sequence["PreferenceInformation"],
    ) -> "Preference":
        """Produce a sort key for given requirement based on preference.

        The lower the return value is, the more preferred this group of
        arguments is.

        Currently pip considers the following in order:

        * Prefer the Requires-Python identifier over everything else.
        * If equal, push ``setuptools`` behind every other identifier.
        * If equal, prefer if any of the known requirements is "direct",
          e.g. points to an explicit URL.
        * If equal, prefer if any requirement is "pinned", i.e. contains
          operator ``===`` or ``==``.
        * If equal, prefer identifiers involved in the backtrack causes.
        * If equal, calculate an approximate "depth" and resolve requirements
          closer to the user-specified requirements first.
        * Order user-specified requirements by the order they are specified.
        * If equal, prefers "non-free" requirements, i.e. contains at least one
          operator, such as ``>=`` or ``<``.
        * If equal, order alphabetically for consistency (helps debuggability).

        As a side effect, the depth computed for ``identifier`` is recorded so
        that it can be used when ranking its dependencies later.
        """
        # The entries are walked twice below, so materialise them once.
        requirement_infos = tuple(information[identifier])

        if requirement_infos:
            lookups = (r.get_candidate_lookup() for r, _ in requirement_infos)
            explicit_candidates, ireqs = zip(*lookups)
        else:
            # Nothing is known about this identifier: unpacking an empty
            # ``zip`` would raise, so fall back to "no candidates, no ireqs".
            explicit_candidates, ireqs = (), ()

        operators = [
            specifier.operator
            for specifier_set in (ireq.specifier for ireq in ireqs if ireq)
            for specifier in specifier_set
        ]

        # ``explicit_candidates`` is a tuple with one slot per requirement, so
        # it must be inspected element-wise; testing the tuple itself against
        # ``None`` would make every identifier look "direct".
        direct = any(c is not None for c in explicit_candidates)
        pinned = any(op[:2] == "==" for op in operators)
        unfree = bool(operators)

        try:
            requested_order: Union[int, float] = self._user_requested[identifier]
        except KeyError:
            # Not user-requested: one level deeper than the shallowest known
            # parent. A missing parent counts as depth 0.
            requested_order = math.inf
            parent_depths = (
                self._known_depths[parent.name] if parent is not None else 0.0
                for _, parent in requirement_infos
            )
            inferred_depth = min(parent_depths, default=math.inf) + 1.0
        else:
            # User-requested identifiers always sit at depth 1.
            inferred_depth = 1.0
        self._known_depths[identifier] = inferred_depth

        # Sorted ahead of every other identifier (first key element).
        requires_python = identifier == REQUIRES_PYTHON_IDENTIFIER

        # HACK: setuptools is intentionally sorted behind other identifiers.
        delay_this = identifier == "setuptools"

        # Identifiers implicated in the latest backtracking are preferred.
        backtrack_cause = self.is_backtrack_cause(identifier, backtrack_causes)

        return (
            not requires_python,
            delay_this,
            not direct,
            not pinned,
            not backtrack_cause,
            inferred_depth,
            requested_order,
            not unfree,
            identifier,
        )

    def find_matches(
        self,
        identifier: str,
        requirements: Mapping[str, Iterator[Requirement]],
        incompatibilities: Mapping[str, Iterator[Candidate]],
    ) -> Iterable[Candidate]:
        """Ask the factory for the candidates matching ``identifier``.

        The user constraint for the identifier (looked up with or without
        extras) is forwarded to the factory, and installed candidates are
        preferred unless the identifier is eligible for upgrade.
        """

        def _eligible_for_upgrade(identifier: str) -> bool:
            """Are upgrades allowed for this project?

            This checks the upgrade strategy, and whether the project was one
            that the user specified in the command line, in order to decide
            whether we should upgrade if there's a newer version available.

            (Note that we don't need access to the `--upgrade` flag, because
            an upgrade strategy of "to-satisfy-only" means that `--upgrade`
            was not specified).
            """
            if self._upgrade_strategy == "eager":
                return True
            elif self._upgrade_strategy == "only-if-needed":
                user_order = _get_with_identifier(
                    self._user_requested,
                    identifier,
                    default=None,
                )
                return user_order is not None
            return False

        constraint = _get_with_identifier(
            self._constraints,
            identifier,
            default=Constraint.empty(),
        )
        return self._factory.find_candidates(
            identifier=identifier,
            requirements=requirements,
            constraint=constraint,
            prefers_installed=(not _eligible_for_upgrade(identifier)),
            incompatibilities=incompatibilities,
        )

    def is_satisfied_by(self, requirement: Requirement, candidate: Candidate) -> bool:
        """Return whether ``candidate`` satisfies ``requirement``."""
        return requirement.is_satisfied_by(candidate)

    def get_dependencies(self, candidate: Candidate) -> Sequence[Requirement]:
        """Return the candidate's dependencies, with ``None`` entries dropped."""
        with_requires = not self._ignore_dependencies
        return [r for r in candidate.iter_dependencies(with_requires) if r is not None]

    @staticmethod
    def is_backtrack_cause(
        identifier: str, backtrack_causes: Sequence["PreferenceInformation"]
    ) -> bool:
        """Return whether ``identifier`` names the requirement, or the parent,
        of any of the given backtrack causes.
        """
        for backtrack_cause in backtrack_causes:
            if identifier == backtrack_cause.requirement.name:
                return True
            if backtrack_cause.parent and identifier == backtrack_cause.parent.name:
                return True
        return False
|
|