Spaces:
Runtime error
Runtime error
| """Routines related to PyPI, indexes""" | |
| # The following comment should be removed at some point in the future. | |
| # mypy: strict-optional=False | |
| import functools | |
| import itertools | |
| import logging | |
| import re | |
| from typing import FrozenSet, Iterable, List, Optional, Set, Tuple, Union | |
| from pip._vendor.packaging import specifiers | |
| from pip._vendor.packaging.tags import Tag | |
| from pip._vendor.packaging.utils import canonicalize_name | |
| from pip._vendor.packaging.version import _BaseVersion | |
| from pip._vendor.packaging.version import parse as parse_version | |
| from pip._internal.exceptions import ( | |
| BestVersionAlreadyInstalled, | |
| DistributionNotFound, | |
| InvalidWheelFilename, | |
| UnsupportedWheel, | |
| ) | |
| from pip._internal.index.collector import LinkCollector, parse_links | |
| from pip._internal.models.candidate import InstallationCandidate | |
| from pip._internal.models.format_control import FormatControl | |
| from pip._internal.models.link import Link | |
| from pip._internal.models.search_scope import SearchScope | |
| from pip._internal.models.selection_prefs import SelectionPreferences | |
| from pip._internal.models.target_python import TargetPython | |
| from pip._internal.models.wheel import Wheel | |
| from pip._internal.req import InstallRequirement | |
| from pip._internal.utils._log import getLogger | |
| from pip._internal.utils.filetypes import WHEEL_EXTENSION | |
| from pip._internal.utils.hashes import Hashes | |
| from pip._internal.utils.logging import indent_log | |
| from pip._internal.utils.misc import build_netloc | |
| from pip._internal.utils.packaging import check_requires_python | |
| from pip._internal.utils.unpacking import SUPPORTED_EXTENSIONS | |
# Names exported by ``from <this module> import *``.
__all__ = ["FormatControl", "BestCandidateResult", "PackageFinder"]

# Module-level logger, obtained through pip's own getLogger helper.
logger = getLogger(__name__)

# A wheel build tag as it appears in a sort key: either the empty tuple
# (no build tag) or an (int, str) pair.
BuildTag = Union[Tuple[()], Tuple[int, str]]
# Shape of the tuple returned by CandidateEvaluator._sort_key().
CandidateSortingKey = Tuple[int, int, int, _BaseVersion, Optional[int], BuildTag]
def _check_link_requires_python(
    link: Link,
    version_info: Tuple[int, int, int],
    ignore_requires_python: bool = False,
) -> bool:
    """
    Tell whether a link's "Requires-Python" value admits a Python version.

    :param link: The link whose ``requires_python`` value is checked.
    :param version_info: A 3-tuple of ints representing the Python
        major-minor-micro version to check.
    :param ignore_requires_python: Whether to ignore the "Requires-Python"
        value if the given Python version isn't compatible.
    :return: False only when the version is incompatible and the mismatch
        is not being ignored. An invalid "Requires-Python" specifier is
        logged and treated as compatible.
    """
    try:
        compatible = check_requires_python(
            link.requires_python,
            version_info=version_info,
        )
    except specifiers.InvalidSpecifier:
        # An unparsable specifier cannot rule the link out.
        logger.debug(
            "Ignoring invalid Requires-Python (%r) for link: %s",
            link.requires_python,
            link,
        )
        return True

    if compatible:
        return True

    dotted_version = ".".join(str(part) for part in version_info)
    if ignore_requires_python:
        logger.debug(
            "Ignoring failed Requires-Python check (%s not in: %r) for link: %s",
            dotted_version,
            link.requires_python,
            link,
        )
        return True

    logger.verbose(
        "Link requires a different Python (%s not in: %r): %s",
        dotted_version,
        link.requires_python,
        link,
    )
    return False
class LinkEvaluator:

    """
    Decides, link by link, whether a file is an install candidate for one
    particular project.
    """

    # Matches a trailing "-py<version>" marker at the end of a version string.
    _py_version_re = re.compile(r"-py([123]\.?[0-9]?)$")

    # Don't include an allow_yanked default value to make sure each call
    # site considers whether yanked releases are allowed. This also causes
    # that decision to be made explicit in the calling code, which helps
    # people when reading the code.
    def __init__(
        self,
        project_name: str,
        canonical_name: str,
        formats: FrozenSet[str],
        target_python: TargetPython,
        allow_yanked: bool,
        ignore_requires_python: Optional[bool] = None,
    ) -> None:
        """
        :param project_name: The user supplied package name.
        :param canonical_name: The canonical package name.
        :param formats: The formats allowed for this package. Should be a set
            with 'binary' or 'source' or both in it.
        :param target_python: The target Python interpreter to use when
            evaluating link compatibility. This is used, for example, to
            check wheel compatibility, as well as when checking the Python
            version, e.g. the Python version embedded in a link filename
            (or egg fragment) and against an HTML link's optional PEP 503
            "data-requires-python" attribute.
        :param allow_yanked: Whether files marked as yanked (in the sense
            of PEP 592) are permitted to be candidates for install.
        :param ignore_requires_python: Whether to ignore incompatible
            PEP 503 "data-requires-python" values in HTML links. Defaults
            to False.
        """
        self.project_name = project_name
        self._canonical_name = canonical_name
        self._formats = formats
        self._target_python = target_python
        self._allow_yanked = allow_yanked
        # None means "not specified" and is treated as False.
        self._ignore_requires_python = (
            False if ignore_requires_python is None else ignore_requires_python
        )

    def evaluate_link(self, link: Link) -> Tuple[bool, Optional[str]]:
        """
        Determine whether a link is a candidate for installation.

        :return: A tuple (is_candidate, result), where `result` is (1) a
            version string if `is_candidate` is True, and (2) if
            `is_candidate` is False, an optional string to log the reason
            the link fails to qualify.
        """
        if link.is_yanked and not self._allow_yanked:
            yanked_reason = link.yanked_reason or "<none given>"
            return (False, f"yanked for reason: {yanked_reason}")

        version = None
        fragment = link.egg_fragment
        if fragment:
            extension = link.ext
        else:
            # No egg fragment: derive name/version and extension from the
            # file name, and apply the archive-level checks.
            fragment, extension = link.splitext()
            if not extension:
                return (False, "not a file")
            if extension not in SUPPORTED_EXTENSIONS:
                return (False, f"unsupported archive format: {extension}")

            is_wheel = extension == WHEEL_EXTENSION
            if is_wheel and "binary" not in self._formats:
                return (False, f"No binaries permitted for {self.project_name}")
            if extension == ".zip" and "macosx10" in link.path:
                return (False, "macosx10 one")
            if is_wheel:
                try:
                    wheel = Wheel(link.filename)
                except InvalidWheelFilename:
                    return (False, "invalid wheel filename")
                if canonicalize_name(wheel.name) != self._canonical_name:
                    return (
                        False,
                        f"wrong project name (not {self.project_name})",
                    )

                if not wheel.supported(self._target_python.get_tags()):
                    # Include the wheel's tags in the reason string to
                    # simplify troubleshooting compatibility issues.
                    tags_text = ", ".join(wheel.get_formatted_file_tags())
                    return (
                        False,
                        f"none of the wheel's tags ({tags_text}) are compatible "
                        "(run pip debug --verbose to show compatible tags)",
                    )

                version = wheel.version

        # This should be up by the self.ok_binary check, but see issue 2700.
        if extension != WHEEL_EXTENSION and "source" not in self._formats:
            return (False, f"No sources permitted for {self.project_name}")

        if not version:
            version = _extract_version_from_fragment(
                fragment,
                self._canonical_name,
            )
        if not version:
            return (False, f"Missing project version for {self.project_name}")

        # Strip a trailing "-py<version>" marker and make sure it names the
        # target Python.
        py_marker = self._py_version_re.search(version)
        if py_marker:
            version = version[: py_marker.start()]
            if py_marker.group(1) != self._target_python.py_version:
                return (False, "Python version is incorrect")

        python_ok = _check_link_requires_python(
            link,
            version_info=self._target_python.py_version_info,
            ignore_requires_python=self._ignore_requires_python,
        )
        if not python_ok:
            # Return None for the reason text to suppress calling
            # _log_skipped_link().
            return (False, None)

        logger.debug("Found link %s, version: %s", link, version)

        return (True, version)
def filter_unallowed_hashes(
    candidates: List[InstallationCandidate],
    hashes: Hashes,
    project_name: str,
) -> List[InstallationCandidate]:
    """
    Drop candidates whose hashes are not allowed and return a new list.

    When at least one candidate carries an allowed hash, the result holds
    every candidate that either has an allowed hash or has no hash at all.
    When nothing matches, a copy of the full input is returned instead.

    Keeping the hash-less candidates alongside real matches allows a warning
    to be logged if there is a more preferred candidate with no hash
    specified. Returning everything when nothing matches lets pip report the
    hash of the candidate that would otherwise have been installed (e.g.
    permitting the user to more easily update their requirements file with
    the desired hash).

    :param candidates: The candidates to filter; never returned as-is.
    :param hashes: The allowed hashes; when falsy no filtering happens.
    :param project_name: Project name, used only in log messages.
    """
    if not hashes:
        logger.debug(
            "Given no hashes to check %s links for project %r: "
            "discarding no candidates",
            len(candidates),
            project_name,
        )
        # Hand back a copy, never the caller's own list.
        return list(candidates)

    kept = []  # candidates with an allowed hash, or with no hash at all
    rejected = []  # candidates with a disallowed hash (for logging)
    matched = 0

    for candidate in candidates:
        link = candidate.link
        if link.has_hash:
            if not link.is_hash_allowed(hashes=hashes):
                rejected.append(candidate)
                continue
            matched += 1
        kept.append(candidate)

    # Hand back a copy, never the caller's own list.
    filtered = kept if matched else list(candidates)

    if len(filtered) == len(candidates):
        discard_message = "discarding no candidates"
    else:
        rejected_links = "\n ".join(str(item.link) for item in rejected)
        discard_message = "discarding {} non-matches:\n {}".format(
            len(rejected),
            rejected_links,
        )

    logger.debug(
        "Checked %s links for project %r against %s hashes "
        "(%s matches, %s no digest): %s",
        len(candidates),
        project_name,
        hashes.digest_count,
        matched,
        len(kept) - matched,
        discard_message,
    )

    return filtered
class CandidatePreferences:

    """
    Holds a pair of preference flags used when filtering and sorting
    InstallationCandidate objects.
    """

    def __init__(
        self,
        prefer_binary: bool = False,
        allow_all_prereleases: bool = False,
    ) -> None:
        """
        :param prefer_binary: Stored as-is on the instance.
        :param allow_all_prereleases: Whether to allow all pre-releases.
        """
        self.prefer_binary = prefer_binary
        self.allow_all_prereleases = allow_all_prereleases
class BestCandidateResult:
    """The outcome of a best-candidate search: all candidates, the applicable
    subset, and the single most preferred one.

    Returned by `PackageFinder.find_best_candidate`. This class is only
    intended to be instantiated by CandidateEvaluator's
    `compute_best_candidate()` method.
    """

    def __init__(
        self,
        candidates: List[InstallationCandidate],
        applicable_candidates: List[InstallationCandidate],
        best_candidate: Optional[InstallationCandidate],
    ) -> None:
        """
        :param candidates: A sequence of all available candidates found.
        :param applicable_candidates: The applicable candidates.
        :param best_candidate: The most preferred candidate found, or None
            if no applicable candidates were found.
        """
        # Internal consistency checks on the three arguments.
        assert set(applicable_candidates).issubset(set(candidates))
        if best_candidate is not None:
            assert best_candidate in applicable_candidates
        else:
            assert not applicable_candidates

        self.best_candidate = best_candidate
        self._candidates = candidates
        self._applicable_candidates = applicable_candidates

    def iter_all(self) -> Iterable[InstallationCandidate]:
        """Return a fresh iterator over every candidate found."""
        return iter(self._candidates)

    def iter_applicable(self) -> Iterable[InstallationCandidate]:
        """Return a fresh iterator over the applicable candidates only."""
        return iter(self._applicable_candidates)
class CandidateEvaluator:

    """
    Responsible for filtering and sorting candidates for installation based
    on what tags are valid.
    """

    # Alternate constructor: it is invoked on the class itself (see
    # PackageFinder.make_candidate_evaluator), so it must be a classmethod.
    @classmethod
    def create(
        cls,
        project_name: str,
        target_python: Optional[TargetPython] = None,
        prefer_binary: bool = False,
        allow_all_prereleases: bool = False,
        specifier: Optional[specifiers.BaseSpecifier] = None,
        hashes: Optional[Hashes] = None,
    ) -> "CandidateEvaluator":
        """Create a CandidateEvaluator object.

        :param project_name: The project name, used in log messages.
        :param target_python: The target Python interpreter to use when
            checking compatibility. If None (the default), a TargetPython
            object will be constructed from the running Python.
        :param prefer_binary: Whether wheels sort above source archives.
        :param allow_all_prereleases: Whether to allow all pre-releases.
        :param specifier: An optional object implementing `filter`
            (e.g. `packaging.specifiers.SpecifierSet`) to filter applicable
            versions. If None, an empty SpecifierSet is used.
        :param hashes: An optional collection of allowed hashes.
        """
        if target_python is None:
            target_python = TargetPython()
        if specifier is None:
            specifier = specifiers.SpecifierSet()

        supported_tags = target_python.get_tags()

        return cls(
            project_name=project_name,
            supported_tags=supported_tags,
            specifier=specifier,
            prefer_binary=prefer_binary,
            allow_all_prereleases=allow_all_prereleases,
            hashes=hashes,
        )

    def __init__(
        self,
        project_name: str,
        supported_tags: List[Tag],
        specifier: specifiers.BaseSpecifier,
        prefer_binary: bool = False,
        allow_all_prereleases: bool = False,
        hashes: Optional[Hashes] = None,
    ) -> None:
        """
        :param project_name: The project name, used in log messages.
        :param supported_tags: The PEP 425 tags supported by the target
            Python in order of preference (most preferred first).
        :param specifier: Object whose `filter` method selects the
            applicable versions.
        :param prefer_binary: Whether wheels sort above source archives.
        :param allow_all_prereleases: Whether to allow all pre-releases.
        :param hashes: An optional collection of allowed hashes.
        """
        self._allow_all_prereleases = allow_all_prereleases
        self._hashes = hashes
        self._prefer_binary = prefer_binary
        self._project_name = project_name
        self._specifier = specifier
        self._supported_tags = supported_tags
        # Since the index of the tag in the _supported_tags list is used
        # as a priority, precompute a map from tag to index/priority to be
        # used in wheel.find_most_preferred_tag.
        self._wheel_tag_preferences = {
            tag: idx for idx, tag in enumerate(supported_tags)
        }

    def get_applicable_candidates(
        self,
        candidates: List[InstallationCandidate],
    ) -> List[InstallationCandidate]:
        """
        Return the applicable candidates from a list of candidates.

        Candidates are first filtered by the specifier, then by allowed
        hashes, and the survivors are returned sorted by `_sort_key`
        (least preferred first).
        """
        # Using None infers from the specifier instead.
        allow_prereleases = self._allow_all_prereleases or None
        specifier = self._specifier
        versions = {
            str(v)
            for v in specifier.filter(
                # We turn the version object into a str here because otherwise
                # when we're debundled but setuptools isn't, Python will see
                # packaging.version.Version and
                # pkg_resources._vendor.packaging.version.Version as different
                # types. This way we'll use a str as a common data interchange
                # format. If we stop using the pkg_resources provided specifier
                # and start using our own, we can drop the cast to str().
                (str(c.version) for c in candidates),
                prereleases=allow_prereleases,
            )
        }

        # Again, converting version to str to deal with debundling.
        applicable_candidates = [c for c in candidates if str(c.version) in versions]

        filtered_applicable_candidates = filter_unallowed_hashes(
            candidates=applicable_candidates,
            hashes=self._hashes,
            project_name=self._project_name,
        )

        return sorted(filtered_applicable_candidates, key=self._sort_key)

    def _sort_key(self, candidate: InstallationCandidate) -> CandidateSortingKey:
        """
        Function to pass as the `key` argument to a call to sorted() to sort
        InstallationCandidates by preference.

        Returns a tuple such that tuples sorting as greater using Python's
        default comparison operator are more preferred.

        The preference is as follows:

        First and foremost, candidates with allowed (matching) hashes are
        always preferred over candidates without matching hashes. This is
        because e.g. if the only candidate with an allowed hash is yanked,
        we still want to use that candidate.

        Second, excepting hash considerations, candidates that have been
        yanked (in the sense of PEP 592) are always less preferred than
        candidates that haven't been yanked. Then:

        If not finding wheels, they are sorted by version only.
        If finding wheels, then the sort order is by version, then:
          1. existing installs
          2. wheels ordered via Wheel.find_most_preferred_tag(
             self._supported_tags, self._wheel_tag_preferences)
          3. source archives
        If prefer_binary was set, then all wheels are sorted above sources.

        Note: it was considered to embed this logic into the Link
              comparison operators, but then different sdist links
              with the same version, would have to be considered equal

        :raises UnsupportedWheel: if the candidate is a wheel for which
            `find_most_preferred_tag` raises ValueError.
        """
        valid_tags = self._supported_tags
        support_num = len(valid_tags)
        build_tag: BuildTag = ()
        binary_preference = 0
        link = candidate.link
        if link.is_wheel:
            # can raise InvalidWheelFilename
            wheel = Wheel(link.filename)
            try:
                pri = -(
                    wheel.find_most_preferred_tag(
                        valid_tags, self._wheel_tag_preferences
                    )
                )
            except ValueError as exc:
                # Chain explicitly so the original lookup failure is kept
                # as the cause of the user-facing error.
                raise UnsupportedWheel(
                    "{} is not a supported wheel for this platform. It "
                    "can't be sorted.".format(wheel.filename)
                ) from exc
            if self._prefer_binary:
                binary_preference = 1
            if wheel.build_tag is not None:
                # Split the build tag into its leading integer and the
                # remaining text so it compares numerically first.
                match = re.match(r"^(\d+)(.*)$", wheel.build_tag)
                build_tag_groups = match.groups()
                build_tag = (int(build_tag_groups[0]), build_tag_groups[1])
        else:  # sdist
            # Sorts below every wheel priority, which is -(tag index).
            pri = -(support_num)
        has_allowed_hash = int(link.is_hash_allowed(self._hashes))
        yank_value = -1 * int(link.is_yanked)  # -1 for yanked.
        return (
            has_allowed_hash,
            yank_value,
            binary_preference,
            candidate.version,
            pri,
            build_tag,
        )

    def sort_best_candidate(
        self,
        candidates: List[InstallationCandidate],
    ) -> Optional[InstallationCandidate]:
        """
        Return the best candidate per the instance's sort order, or None if
        no candidate is acceptable.
        """
        return max(candidates, key=self._sort_key, default=None)

    def compute_best_candidate(
        self,
        candidates: List[InstallationCandidate],
    ) -> BestCandidateResult:
        """
        Compute and return a `BestCandidateResult` instance.

        :param candidates: All candidates found for the project.
        """
        applicable_candidates = self.get_applicable_candidates(candidates)

        best_candidate = self.sort_best_candidate(applicable_candidates)

        return BestCandidateResult(
            candidates,
            applicable_candidates=applicable_candidates,
            best_candidate=best_candidate,
        )
class PackageFinder:
    """This finds packages.

    This is meant to match easy_install's technique for looking for
    packages, by reading pages and looking for appropriate links.
    """

    def __init__(
        self,
        link_collector: LinkCollector,
        target_python: TargetPython,
        allow_yanked: bool,
        use_deprecated_html5lib: bool,
        format_control: Optional[FormatControl] = None,
        candidate_prefs: Optional[CandidatePreferences] = None,
        ignore_requires_python: Optional[bool] = None,
    ) -> None:
        """
        This constructor is primarily meant to be used by the create() class
        method and from tests.

        :param link_collector: The collector used to fetch pages and gather
            link sources.
        :param target_python: The target Python used when evaluating links
            and candidates.
        :param allow_yanked: Whether yanked files may be install candidates;
            passed on to each LinkEvaluator.
        :param use_deprecated_html5lib: Passed on to `parse_links()`.
        :param format_control: A FormatControl object, used to control
            the selection of source packages / binary packages when consulting
            the index and links. Defaults to an empty FormatControl.
        :param candidate_prefs: Options to use when creating a
            CandidateEvaluator object. Defaults to CandidatePreferences().
        :param ignore_requires_python: Passed on to each LinkEvaluator.
        """
        if candidate_prefs is None:
            candidate_prefs = CandidatePreferences()

        format_control = format_control or FormatControl(set(), set())

        self._allow_yanked = allow_yanked
        self._candidate_prefs = candidate_prefs
        self._ignore_requires_python = ignore_requires_python
        self._link_collector = link_collector
        self._target_python = target_python
        self._use_deprecated_html5lib = use_deprecated_html5lib

        self.format_control = format_control

        # These are boring links that have already been logged somehow.
        self._logged_links: Set[Link] = set()

    # Don't include an allow_yanked default value to make sure each call
    # site considers whether yanked releases are allowed. This also causes
    # that decision to be made explicit in the calling code, which helps
    # people when reading the code.
    @classmethod
    def create(
        cls,
        link_collector: LinkCollector,
        selection_prefs: SelectionPreferences,
        target_python: Optional[TargetPython] = None,
        *,
        use_deprecated_html5lib: bool,
    ) -> "PackageFinder":
        """Create a PackageFinder.

        :param link_collector: The collector used to fetch pages and gather
            link sources.
        :param selection_prefs: The candidate selection preferences, as a
            SelectionPreferences object.
        :param target_python: The target Python interpreter to use when
            checking compatibility. If None (the default), a TargetPython
            object will be constructed from the running Python.
        :param use_deprecated_html5lib: Passed on to the constructor.
        """
        if target_python is None:
            target_python = TargetPython()

        candidate_prefs = CandidatePreferences(
            prefer_binary=selection_prefs.prefer_binary,
            allow_all_prereleases=selection_prefs.allow_all_prereleases,
        )

        return cls(
            candidate_prefs=candidate_prefs,
            link_collector=link_collector,
            target_python=target_python,
            allow_yanked=selection_prefs.allow_yanked,
            format_control=selection_prefs.format_control,
            ignore_requires_python=selection_prefs.ignore_requires_python,
            use_deprecated_html5lib=use_deprecated_html5lib,
        )

    @property
    def target_python(self) -> TargetPython:
        """The TargetPython object this finder was constructed with."""
        return self._target_python

    @property
    def search_scope(self) -> SearchScope:
        """The search scope held by the underlying link collector."""
        return self._link_collector.search_scope

    @search_scope.setter
    def search_scope(self, search_scope: SearchScope) -> None:
        # Stored on the link collector, not on the finder itself.
        self._link_collector.search_scope = search_scope

    @property
    def find_links(self) -> List[str]:
        """The `find_links` value of the underlying link collector."""
        return self._link_collector.find_links

    @property
    def index_urls(self) -> List[str]:
        """The `index_urls` of the current search scope."""
        return self.search_scope.index_urls

    @property
    def trusted_hosts(self) -> Iterable[str]:
        """Yield a netloc string for each of the session's trusted origins."""
        for host_port in self._link_collector.session.pip_trusted_origins:
            yield build_netloc(*host_port)

    @property
    def allow_all_prereleases(self) -> bool:
        """Whether the candidate preferences allow all pre-releases."""
        return self._candidate_prefs.allow_all_prereleases

    def set_allow_all_prereleases(self) -> None:
        """Turn on `allow_all_prereleases` in the candidate preferences."""
        self._candidate_prefs.allow_all_prereleases = True

    @property
    def prefer_binary(self) -> bool:
        """Whether the candidate preferences prefer binary distributions."""
        return self._candidate_prefs.prefer_binary

    def set_prefer_binary(self) -> None:
        """Turn on `prefer_binary` in the candidate preferences."""
        self._candidate_prefs.prefer_binary = True

    def make_link_evaluator(self, project_name: str) -> LinkEvaluator:
        """Create a LinkEvaluator for `project_name` from this finder's
        format control, target Python and yanked / Requires-Python settings.
        """
        canonical_name = canonicalize_name(project_name)
        formats = self.format_control.get_allowed_formats(canonical_name)

        return LinkEvaluator(
            project_name=project_name,
            canonical_name=canonical_name,
            formats=formats,
            target_python=self._target_python,
            allow_yanked=self._allow_yanked,
            ignore_requires_python=self._ignore_requires_python,
        )

    def _sort_links(self, links: Iterable[Link]) -> List[Link]:
        """
        Returns elements of links in order, non-egg links first, egg links
        second, while eliminating duplicates
        """
        eggs, no_eggs = [], []
        seen: Set[Link] = set()
        for link in links:
            if link not in seen:
                seen.add(link)
                if link.egg_fragment:
                    eggs.append(link)
                else:
                    no_eggs.append(link)
        return no_eggs + eggs

    def _log_skipped_link(self, link: Link, reason: str) -> None:
        """Log, at most once per link, why the link was skipped."""
        if link not in self._logged_links:
            # Put the link at the end so the reason is more visible and because
            # the link string is usually very long.
            logger.debug("Skipping link: %s: %s", reason, link)
            self._logged_links.add(link)

    def get_install_candidate(
        self, link_evaluator: LinkEvaluator, link: Link
    ) -> Optional[InstallationCandidate]:
        """
        If the link is a candidate for install, convert it to an
        InstallationCandidate and return it. Otherwise, return None.
        """
        is_candidate, result = link_evaluator.evaluate_link(link)
        if not is_candidate:
            # An empty/None reason means the skip should not be logged here.
            if result:
                self._log_skipped_link(link, reason=result)
            return None

        return InstallationCandidate(
            name=link_evaluator.project_name,
            link=link,
            version=result,
        )

    def evaluate_links(
        self, link_evaluator: LinkEvaluator, links: Iterable[Link]
    ) -> List[InstallationCandidate]:
        """
        Convert links that are candidates to InstallationCandidate objects.
        """
        candidates = []
        for link in self._sort_links(links):
            candidate = self.get_install_candidate(link_evaluator, link)
            if candidate is not None:
                candidates.append(candidate)

        return candidates

    def process_project_url(
        self, project_url: Link, link_evaluator: LinkEvaluator
    ) -> List[InstallationCandidate]:
        """Fetch a project page and return the install candidates among its
        links. Returns an empty list when the page could not be fetched.
        """
        logger.debug(
            "Fetching project page and analyzing links: %s",
            project_url,
        )
        html_page = self._link_collector.fetch_page(project_url)
        if html_page is None:
            return []

        page_links = list(parse_links(html_page, self._use_deprecated_html5lib))

        with indent_log():
            package_links = self.evaluate_links(
                link_evaluator,
                links=page_links,
            )

        return package_links

    def find_all_candidates(self, project_name: str) -> List[InstallationCandidate]:
        """Find all available InstallationCandidate for project_name

        This checks index_urls and find_links.
        All versions found are returned as an InstallationCandidate list.

        See LinkEvaluator.evaluate_link() for details on which files
        are accepted.
        """
        link_evaluator = self.make_link_evaluator(project_name)

        collected_sources = self._link_collector.collect_sources(
            project_name=project_name,
            candidates_from_page=functools.partial(
                self.process_project_url,
                link_evaluator=link_evaluator,
            ),
        )

        page_candidates_it = itertools.chain.from_iterable(
            source.page_candidates()
            for sources in collected_sources
            for source in sources
            if source is not None
        )
        page_candidates = list(page_candidates_it)

        file_links_it = itertools.chain.from_iterable(
            source.file_links()
            for sources in collected_sources
            for source in sources
            if source is not None
        )
        file_candidates = self.evaluate_links(
            link_evaluator,
            sorted(file_links_it, reverse=True),
        )

        if logger.isEnabledFor(logging.DEBUG) and file_candidates:
            paths = []
            for candidate in file_candidates:
                assert candidate.link.url  # we need to have a URL
                try:
                    paths.append(candidate.link.file_path)
                except Exception:
                    paths.append(candidate.link.url)  # it's not a local file

            logger.debug("Local files found: %s", ", ".join(paths))

        # This is an intentional priority ordering
        return file_candidates + page_candidates

    def make_candidate_evaluator(
        self,
        project_name: str,
        specifier: Optional[specifiers.BaseSpecifier] = None,
        hashes: Optional[Hashes] = None,
    ) -> CandidateEvaluator:
        """Create a CandidateEvaluator object to use."""
        candidate_prefs = self._candidate_prefs
        return CandidateEvaluator.create(
            project_name=project_name,
            target_python=self._target_python,
            prefer_binary=candidate_prefs.prefer_binary,
            allow_all_prereleases=candidate_prefs.allow_all_prereleases,
            specifier=specifier,
            hashes=hashes,
        )

    def find_best_candidate(
        self,
        project_name: str,
        specifier: Optional[specifiers.BaseSpecifier] = None,
        hashes: Optional[Hashes] = None,
    ) -> BestCandidateResult:
        """Find matches for the given project and specifier.

        :param project_name: The project to look up.
        :param specifier: An optional object implementing `filter`
            (e.g. `packaging.specifiers.SpecifierSet`) to filter applicable
            versions.
        :param hashes: An optional collection of allowed hashes.

        :return: A `BestCandidateResult` instance.
        """
        candidates = self.find_all_candidates(project_name)
        candidate_evaluator = self.make_candidate_evaluator(
            project_name=project_name,
            specifier=specifier,
            hashes=hashes,
        )
        return candidate_evaluator.compute_best_candidate(candidates)

    def find_requirement(
        self, req: InstallRequirement, upgrade: bool
    ) -> Optional[InstallationCandidate]:
        """Try to find a Link matching req

        Expects req, an InstallRequirement and upgrade, a boolean
        Returns a InstallationCandidate if found, or None when an installed
        version already satisfies the requirement and upgrade is false.
        Raises DistributionNotFound or BestVersionAlreadyInstalled otherwise
        """
        hashes = req.hashes(trust_internet=False)
        best_candidate_result = self.find_best_candidate(
            req.name,
            specifier=req.specifier,
            hashes=hashes,
        )
        best_candidate = best_candidate_result.best_candidate

        installed_version: Optional[_BaseVersion] = None
        if req.satisfied_by is not None:
            installed_version = req.satisfied_by.version

        def _format_versions(cand_iter: Iterable[InstallationCandidate]) -> str:
            # This repeated parse_version and str() conversion is needed to
            # handle different vendoring sources from pip and pkg_resources.
            # If we stop using the pkg_resources provided specifier and start
            # using our own, we can drop the cast to str().
            return (
                ", ".join(
                    sorted(
                        {str(c.version) for c in cand_iter},
                        key=parse_version,
                    )
                )
                or "none"
            )

        if installed_version is None and best_candidate is None:
            logger.critical(
                "Could not find a version that satisfies the requirement %s "
                "(from versions: %s)",
                req,
                _format_versions(best_candidate_result.iter_all()),
            )

            raise DistributionNotFound(
                f"No matching distribution found for {req}"
            )

        # The installed version is "best" when nothing applicable was found
        # or the best candidate is not newer than what is installed.
        best_installed = False
        if installed_version and (
            best_candidate is None or best_candidate.version <= installed_version
        ):
            best_installed = True

        if not upgrade and installed_version is not None:
            if best_installed:
                logger.debug(
                    "Existing installed version (%s) is most up-to-date and "
                    "satisfies requirement",
                    installed_version,
                )
            else:
                logger.debug(
                    "Existing installed version (%s) satisfies requirement "
                    "(most up-to-date version is %s)",
                    installed_version,
                    best_candidate.version,
                )
            return None

        if best_installed:
            # We have an existing version, and its the best version
            logger.debug(
                "Installed version (%s) is most up-to-date (past versions: %s)",
                installed_version,
                _format_versions(best_candidate_result.iter_applicable()),
            )
            raise BestVersionAlreadyInstalled

        logger.debug(
            "Using version %s (newest of versions: %s)",
            best_candidate.version,
            _format_versions(best_candidate_result.iter_applicable()),
        )
        return best_candidate
| def _find_name_version_sep(fragment: str, canonical_name: str) -> int: | |
| """Find the separator's index based on the package's canonical name. | |
| :param fragment: A <package>+<version> filename "fragment" (stem) or | |
| egg fragment. | |
| :param canonical_name: The package's canonical name. | |
| This function is needed since the canonicalized name does not necessarily | |
| have the same length as the egg info's name part. An example:: | |
| >>> fragment = 'foo__bar-1.0' | |
| >>> canonical_name = 'foo-bar' | |
| >>> _find_name_version_sep(fragment, canonical_name) | |
| 8 | |
| """ | |
| # Project name and version must be separated by one single dash. Find all | |
| # occurrences of dashes; if the string in front of it matches the canonical | |
| # name, this is the one separating the name and version parts. | |
| for i, c in enumerate(fragment): | |
| if c != "-": | |
| continue | |
| if canonicalize_name(fragment[:i]) == canonical_name: | |
| return i | |
| raise ValueError(f"{fragment} does not match {canonical_name}") | |
def _extract_version_from_fragment(fragment: str, canonical_name: str) -> Optional[str]:
    """Pull the version out of a <package>+<version> filename "fragment"
    (stem) or egg fragment.

    :param fragment: The string to parse. E.g. foo-2.1
    :param canonical_name: The canonicalized name of the package this
        belongs to.
    :return: The text after the name/version separator, or None when the
        separator cannot be found or nothing follows it.
    """
    try:
        separator_at = _find_name_version_sep(fragment, canonical_name)
    except ValueError:
        return None
    # An empty remainder counts as "no version".
    return fragment[separator_at + 1 :] or None