|
import functools |
|
import logging |
|
import os |
|
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, cast |
|
|
|
from pip._vendor.packaging.utils import canonicalize_name |
|
from pip._vendor.resolvelib import BaseReporter, ResolutionImpossible |
|
from pip._vendor.resolvelib import Resolver as RLResolver |
|
from pip._vendor.resolvelib.structs import DirectedGraph |
|
|
|
from pip._internal.cache import WheelCache |
|
from pip._internal.index.package_finder import PackageFinder |
|
from pip._internal.operations.prepare import RequirementPreparer |
|
from pip._internal.req.req_install import InstallRequirement |
|
from pip._internal.req.req_set import RequirementSet |
|
from pip._internal.resolution.base import BaseResolver, InstallRequirementProvider |
|
from pip._internal.resolution.resolvelib.provider import PipProvider |
|
from pip._internal.resolution.resolvelib.reporter import ( |
|
PipDebuggingReporter, |
|
PipReporter, |
|
) |
|
|
|
from .base import Candidate, Requirement |
|
from .factory import Factory |
|
|
|
if TYPE_CHECKING: |
|
from pip._vendor.resolvelib.resolvers import Result as RLResult |
|
|
|
Result = RLResult[Requirement, Candidate, str] |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class Resolver(BaseResolver): |
|
_allowed_strategies = {"eager", "only-if-needed", "to-satisfy-only"} |
|
|
|
def __init__( |
|
self, |
|
preparer: RequirementPreparer, |
|
finder: PackageFinder, |
|
wheel_cache: Optional[WheelCache], |
|
make_install_req: InstallRequirementProvider, |
|
use_user_site: bool, |
|
ignore_dependencies: bool, |
|
ignore_installed: bool, |
|
ignore_requires_python: bool, |
|
force_reinstall: bool, |
|
upgrade_strategy: str, |
|
py_version_info: Optional[Tuple[int, ...]] = None, |
|
): |
|
super().__init__() |
|
assert upgrade_strategy in self._allowed_strategies |
|
|
|
self.factory = Factory( |
|
finder=finder, |
|
preparer=preparer, |
|
make_install_req=make_install_req, |
|
wheel_cache=wheel_cache, |
|
use_user_site=use_user_site, |
|
force_reinstall=force_reinstall, |
|
ignore_installed=ignore_installed, |
|
ignore_requires_python=ignore_requires_python, |
|
py_version_info=py_version_info, |
|
) |
|
self.ignore_dependencies = ignore_dependencies |
|
self.upgrade_strategy = upgrade_strategy |
|
self._result: Optional[Result] = None |
|
|
|
def resolve( |
|
self, root_reqs: List[InstallRequirement], check_supported_wheels: bool |
|
) -> RequirementSet: |
|
collected = self.factory.collect_root_requirements(root_reqs) |
|
provider = PipProvider( |
|
factory=self.factory, |
|
constraints=collected.constraints, |
|
ignore_dependencies=self.ignore_dependencies, |
|
upgrade_strategy=self.upgrade_strategy, |
|
user_requested=collected.user_requested, |
|
) |
|
if "PIP_RESOLVER_DEBUG" in os.environ: |
|
reporter: BaseReporter = PipDebuggingReporter() |
|
else: |
|
reporter = PipReporter() |
|
resolver: RLResolver[Requirement, Candidate, str] = RLResolver( |
|
provider, |
|
reporter, |
|
) |
|
|
|
try: |
|
try_to_avoid_resolution_too_deep = 2000000 |
|
result = self._result = resolver.resolve( |
|
collected.requirements, max_rounds=try_to_avoid_resolution_too_deep |
|
) |
|
|
|
except ResolutionImpossible as e: |
|
error = self.factory.get_installation_error( |
|
cast("ResolutionImpossible[Requirement, Candidate]", e), |
|
collected.constraints, |
|
) |
|
raise error from e |
|
|
|
req_set = RequirementSet(check_supported_wheels=check_supported_wheels) |
|
for candidate in result.mapping.values(): |
|
ireq = candidate.get_install_requirement() |
|
if ireq is None: |
|
continue |
|
|
|
|
|
|
|
installed_dist = self.factory.get_dist_to_uninstall(candidate) |
|
if installed_dist is None: |
|
|
|
ireq.should_reinstall = False |
|
elif self.factory.force_reinstall: |
|
|
|
ireq.should_reinstall = True |
|
elif installed_dist.version != candidate.version: |
|
|
|
ireq.should_reinstall = True |
|
elif candidate.is_editable or installed_dist.editable: |
|
|
|
|
|
ireq.should_reinstall = True |
|
elif candidate.source_link and candidate.source_link.is_file: |
|
|
|
if candidate.source_link.is_wheel: |
|
|
|
logger.info( |
|
"%s is already installed with the same version as the " |
|
"provided wheel. Use --force-reinstall to force an " |
|
"installation of the wheel.", |
|
ireq.name, |
|
) |
|
continue |
|
|
|
|
|
ireq.should_reinstall = True |
|
else: |
|
continue |
|
|
|
link = candidate.source_link |
|
if link and link.is_yanked: |
|
|
|
|
|
msg = ( |
|
"The candidate selected for download or install is a " |
|
"yanked version: {name!r} candidate (version {version} " |
|
"at {link})\nReason for being yanked: {reason}" |
|
).format( |
|
name=candidate.name, |
|
version=candidate.version, |
|
link=link, |
|
reason=link.yanked_reason or "<none given>", |
|
) |
|
logger.warning(msg) |
|
|
|
req_set.add_named_requirement(ireq) |
|
|
|
reqs = req_set.all_requirements |
|
self.factory.preparer.prepare_linked_requirements_more(reqs) |
|
return req_set |
|
|
|
def get_installation_order( |
|
self, req_set: RequirementSet |
|
) -> List[InstallRequirement]: |
|
"""Get order for installation of requirements in RequirementSet. |
|
|
|
The returned list contains a requirement before another that depends on |
|
it. This helps ensure that the environment is kept consistent as they |
|
get installed one-by-one. |
|
|
|
The current implementation creates a topological ordering of the |
|
dependency graph, giving more weight to packages with less |
|
or no dependencies, while breaking any cycles in the graph at |
|
arbitrary points. We make no guarantees about where the cycle |
|
would be broken, other than it *would* be broken. |
|
""" |
|
assert self._result is not None, "must call resolve() first" |
|
|
|
if not req_set.requirements: |
|
|
|
return [] |
|
|
|
graph = self._result.graph |
|
weights = get_topological_weights(graph, set(req_set.requirements.keys())) |
|
|
|
sorted_items = sorted( |
|
req_set.requirements.items(), |
|
key=functools.partial(_req_set_item_sorter, weights=weights), |
|
reverse=True, |
|
) |
|
return [ireq for _, ireq in sorted_items] |
|
|
|
|
|
def get_topological_weights( |
|
graph: "DirectedGraph[Optional[str]]", requirement_keys: Set[str] |
|
) -> Dict[Optional[str], int]: |
|
"""Assign weights to each node based on how "deep" they are. |
|
|
|
This implementation may change at any point in the future without prior |
|
notice. |
|
|
|
We first simplify the dependency graph by pruning any leaves and giving them |
|
the highest weight: a package without any dependencies should be installed |
|
first. This is done again and again in the same way, giving ever less weight |
|
to the newly found leaves. The loop stops when no leaves are left: all |
|
remaining packages have at least one dependency left in the graph. |
|
|
|
Then we continue with the remaining graph, by taking the length for the |
|
longest path to any node from root, ignoring any paths that contain a single |
|
node twice (i.e. cycles). This is done through a depth-first search through |
|
the graph, while keeping track of the path to the node. |
|
|
|
Cycles in the graph result would result in node being revisited while also |
|
being on its own path. In this case, take no action. This helps ensure we |
|
don't get stuck in a cycle. |
|
|
|
When assigning weight, the longer path (i.e. larger length) is preferred. |
|
|
|
We are only interested in the weights of packages that are in the |
|
requirement_keys. |
|
""" |
|
path: Set[Optional[str]] = set() |
|
weights: Dict[Optional[str], int] = {} |
|
|
|
def visit(node: Optional[str]) -> None: |
|
if node in path: |
|
|
|
return |
|
|
|
|
|
path.add(node) |
|
for child in graph.iter_children(node): |
|
visit(child) |
|
path.remove(node) |
|
|
|
if node not in requirement_keys: |
|
return |
|
|
|
last_known_parent_count = weights.get(node, 0) |
|
weights[node] = max(last_known_parent_count, len(path)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
while True: |
|
leaves = set() |
|
for key in graph: |
|
if key is None: |
|
continue |
|
for _child in graph.iter_children(key): |
|
|
|
break |
|
else: |
|
|
|
leaves.add(key) |
|
if not leaves: |
|
|
|
break |
|
|
|
weight = len(graph) - 1 |
|
for leaf in leaves: |
|
if leaf not in requirement_keys: |
|
continue |
|
weights[leaf] = weight |
|
|
|
for leaf in leaves: |
|
graph.remove(leaf) |
|
|
|
|
|
|
|
visit(None) |
|
|
|
|
|
|
|
difference = set(weights.keys()).difference(requirement_keys) |
|
assert not difference, difference |
|
|
|
return weights |
|
|
|
|
|
def _req_set_item_sorter( |
|
item: Tuple[str, InstallRequirement], |
|
weights: Dict[Optional[str], int], |
|
) -> Tuple[int, str]: |
|
"""Key function used to sort install requirements for installation. |
|
|
|
Based on the "weight" mapping calculated in ``get_installation_order()``. |
|
The canonical package name is returned as the second member as a tie- |
|
breaker to ensure the result is predictable, which is useful in tests. |
|
""" |
|
name = canonicalize_name(item[0]) |
|
return weights[name], name |
|
|