test
This commit is contained in:
		| @ -0,0 +1,4 @@ | ||||
| from ._dists import Distribution | ||||
| from ._envs import Environment | ||||
|  | ||||
| __all__ = ["Distribution", "Environment"] | ||||
| @ -0,0 +1,55 @@ | ||||
| import importlib.metadata | ||||
| from typing import Any, Optional, Protocol, cast | ||||
|  | ||||
|  | ||||
| class BadMetadata(ValueError): | ||||
|     def __init__(self, dist: importlib.metadata.Distribution, *, reason: str) -> None: | ||||
|         self.dist = dist | ||||
|         self.reason = reason | ||||
|  | ||||
|     def __str__(self) -> str: | ||||
|         return f"Bad metadata in {self.dist} ({self.reason})" | ||||
|  | ||||
|  | ||||
| class BasePath(Protocol): | ||||
|     """A protocol that various path objects conform. | ||||
|  | ||||
|     This exists because importlib.metadata uses both ``pathlib.Path`` and | ||||
|     ``zipfile.Path``, and we need a common base for type hints (Union does not | ||||
|     work well since ``zipfile.Path`` is too new for our linter setup). | ||||
|  | ||||
|     This does not mean to be exhaustive, but only contains things that present | ||||
|     in both classes *that we need*. | ||||
|     """ | ||||
|  | ||||
|     @property | ||||
|     def name(self) -> str: | ||||
|         raise NotImplementedError() | ||||
|  | ||||
|     @property | ||||
|     def parent(self) -> "BasePath": | ||||
|         raise NotImplementedError() | ||||
|  | ||||
|  | ||||
| def get_info_location(d: importlib.metadata.Distribution) -> Optional[BasePath]: | ||||
|     """Find the path to the distribution's metadata directory. | ||||
|  | ||||
|     HACK: This relies on importlib.metadata's private ``_path`` attribute. Not | ||||
|     all distributions exist on disk, so importlib.metadata is correct to not | ||||
|     expose the attribute as public. But pip's code base is old and not as clean, | ||||
|     so we do this to avoid having to rewrite too many things. Hopefully we can | ||||
|     eliminate this some day. | ||||
|     """ | ||||
|     return getattr(d, "_path", None) | ||||
|  | ||||
|  | ||||
| def get_dist_name(dist: importlib.metadata.Distribution) -> str: | ||||
|     """Get the distribution's project name. | ||||
|  | ||||
|     The ``name`` attribute is only available in Python 3.10 or later. We are | ||||
|     targeting exactly that, but Mypy does not know this. | ||||
|     """ | ||||
|     name = cast(Any, dist).name | ||||
|     if not isinstance(name, str): | ||||
|         raise BadMetadata(dist, reason="invalid metadata entry 'name'") | ||||
|     return name | ||||
| @ -0,0 +1,224 @@ | ||||
| import email.message | ||||
| import importlib.metadata | ||||
| import os | ||||
| import pathlib | ||||
| import zipfile | ||||
| from typing import ( | ||||
|     Collection, | ||||
|     Dict, | ||||
|     Iterable, | ||||
|     Iterator, | ||||
|     Mapping, | ||||
|     Optional, | ||||
|     Sequence, | ||||
|     cast, | ||||
| ) | ||||
|  | ||||
| from pip._vendor.packaging.requirements import Requirement | ||||
| from pip._vendor.packaging.utils import NormalizedName, canonicalize_name | ||||
| from pip._vendor.packaging.version import parse as parse_version | ||||
|  | ||||
| from pip._internal.exceptions import InvalidWheel, UnsupportedWheel | ||||
| from pip._internal.metadata.base import ( | ||||
|     BaseDistribution, | ||||
|     BaseEntryPoint, | ||||
|     DistributionVersion, | ||||
|     InfoPath, | ||||
|     Wheel, | ||||
| ) | ||||
| from pip._internal.utils.misc import normalize_path | ||||
| from pip._internal.utils.packaging import safe_extra | ||||
| from pip._internal.utils.temp_dir import TempDirectory | ||||
| from pip._internal.utils.wheel import parse_wheel, read_wheel_metadata_file | ||||
|  | ||||
| from ._compat import BasePath, get_dist_name | ||||
|  | ||||
|  | ||||
| class WheelDistribution(importlib.metadata.Distribution): | ||||
|     """An ``importlib.metadata.Distribution`` read from a wheel. | ||||
|  | ||||
|     Although ``importlib.metadata.PathDistribution`` accepts ``zipfile.Path``, | ||||
|     its implementation is too "lazy" for pip's needs (we can't keep the ZipFile | ||||
|     handle open for the entire lifetime of the distribution object). | ||||
|  | ||||
|     This implementation eagerly reads the entire metadata directory into the | ||||
|     memory instead, and operates from that. | ||||
|     """ | ||||
|  | ||||
|     def __init__( | ||||
|         self, | ||||
|         files: Mapping[pathlib.PurePosixPath, bytes], | ||||
|         info_location: pathlib.PurePosixPath, | ||||
|     ) -> None: | ||||
|         self._files = files | ||||
|         self.info_location = info_location | ||||
|  | ||||
|     @classmethod | ||||
|     def from_zipfile( | ||||
|         cls, | ||||
|         zf: zipfile.ZipFile, | ||||
|         name: str, | ||||
|         location: str, | ||||
|     ) -> "WheelDistribution": | ||||
|         info_dir, _ = parse_wheel(zf, name) | ||||
|         paths = ( | ||||
|             (name, pathlib.PurePosixPath(name.split("/", 1)[-1])) | ||||
|             for name in zf.namelist() | ||||
|             if name.startswith(f"{info_dir}/") | ||||
|         ) | ||||
|         files = { | ||||
|             relpath: read_wheel_metadata_file(zf, fullpath) | ||||
|             for fullpath, relpath in paths | ||||
|         } | ||||
|         info_location = pathlib.PurePosixPath(location, info_dir) | ||||
|         return cls(files, info_location) | ||||
|  | ||||
|     def iterdir(self, path: InfoPath) -> Iterator[pathlib.PurePosixPath]: | ||||
|         # Only allow iterating through the metadata directory. | ||||
|         if pathlib.PurePosixPath(str(path)) in self._files: | ||||
|             return iter(self._files) | ||||
|         raise FileNotFoundError(path) | ||||
|  | ||||
|     def read_text(self, filename: str) -> Optional[str]: | ||||
|         try: | ||||
|             data = self._files[pathlib.PurePosixPath(filename)] | ||||
|         except KeyError: | ||||
|             return None | ||||
|         try: | ||||
|             text = data.decode("utf-8") | ||||
|         except UnicodeDecodeError as e: | ||||
|             wheel = self.info_location.parent | ||||
|             error = f"Error decoding metadata for {wheel}: {e} in {filename} file" | ||||
|             raise UnsupportedWheel(error) | ||||
|         return text | ||||
|  | ||||
|  | ||||
| class Distribution(BaseDistribution): | ||||
|     def __init__( | ||||
|         self, | ||||
|         dist: importlib.metadata.Distribution, | ||||
|         info_location: Optional[BasePath], | ||||
|         installed_location: Optional[BasePath], | ||||
|     ) -> None: | ||||
|         self._dist = dist | ||||
|         self._info_location = info_location | ||||
|         self._installed_location = installed_location | ||||
|  | ||||
|     @classmethod | ||||
|     def from_directory(cls, directory: str) -> BaseDistribution: | ||||
|         info_location = pathlib.Path(directory) | ||||
|         dist = importlib.metadata.Distribution.at(info_location) | ||||
|         return cls(dist, info_location, info_location.parent) | ||||
|  | ||||
|     @classmethod | ||||
|     def from_metadata_file_contents( | ||||
|         cls, | ||||
|         metadata_contents: bytes, | ||||
|         filename: str, | ||||
|         project_name: str, | ||||
|     ) -> BaseDistribution: | ||||
|         # Generate temp dir to contain the metadata file, and write the file contents. | ||||
|         temp_dir = pathlib.Path( | ||||
|             TempDirectory(kind="metadata", globally_managed=True).path | ||||
|         ) | ||||
|         metadata_path = temp_dir / "METADATA" | ||||
|         metadata_path.write_bytes(metadata_contents) | ||||
|         # Construct dist pointing to the newly created directory. | ||||
|         dist = importlib.metadata.Distribution.at(metadata_path.parent) | ||||
|         return cls(dist, metadata_path.parent, None) | ||||
|  | ||||
|     @classmethod | ||||
|     def from_wheel(cls, wheel: Wheel, name: str) -> BaseDistribution: | ||||
|         try: | ||||
|             with wheel.as_zipfile() as zf: | ||||
|                 dist = WheelDistribution.from_zipfile(zf, name, wheel.location) | ||||
|         except zipfile.BadZipFile as e: | ||||
|             raise InvalidWheel(wheel.location, name) from e | ||||
|         except UnsupportedWheel as e: | ||||
|             raise UnsupportedWheel(f"{name} has an invalid wheel, {e}") | ||||
|         return cls(dist, dist.info_location, pathlib.PurePosixPath(wheel.location)) | ||||
|  | ||||
|     @property | ||||
|     def location(self) -> Optional[str]: | ||||
|         if self._info_location is None: | ||||
|             return None | ||||
|         return str(self._info_location.parent) | ||||
|  | ||||
|     @property | ||||
|     def info_location(self) -> Optional[str]: | ||||
|         if self._info_location is None: | ||||
|             return None | ||||
|         return str(self._info_location) | ||||
|  | ||||
|     @property | ||||
|     def installed_location(self) -> Optional[str]: | ||||
|         if self._installed_location is None: | ||||
|             return None | ||||
|         return normalize_path(str(self._installed_location)) | ||||
|  | ||||
|     def _get_dist_name_from_location(self) -> Optional[str]: | ||||
|         """Try to get the name from the metadata directory name. | ||||
|  | ||||
|         This is much faster than reading metadata. | ||||
|         """ | ||||
|         if self._info_location is None: | ||||
|             return None | ||||
|         stem, suffix = os.path.splitext(self._info_location.name) | ||||
|         if suffix not in (".dist-info", ".egg-info"): | ||||
|             return None | ||||
|         return stem.split("-", 1)[0] | ||||
|  | ||||
|     @property | ||||
|     def canonical_name(self) -> NormalizedName: | ||||
|         name = self._get_dist_name_from_location() or get_dist_name(self._dist) | ||||
|         return canonicalize_name(name) | ||||
|  | ||||
|     @property | ||||
|     def version(self) -> DistributionVersion: | ||||
|         return parse_version(self._dist.version) | ||||
|  | ||||
|     def is_file(self, path: InfoPath) -> bool: | ||||
|         return self._dist.read_text(str(path)) is not None | ||||
|  | ||||
|     def iter_distutils_script_names(self) -> Iterator[str]: | ||||
|         # A distutils installation is always "flat" (not in e.g. egg form), so | ||||
|         # if this distribution's info location is NOT a pathlib.Path (but e.g. | ||||
|         # zipfile.Path), it can never contain any distutils scripts. | ||||
|         if not isinstance(self._info_location, pathlib.Path): | ||||
|             return | ||||
|         for child in self._info_location.joinpath("scripts").iterdir(): | ||||
|             yield child.name | ||||
|  | ||||
|     def read_text(self, path: InfoPath) -> str: | ||||
|         content = self._dist.read_text(str(path)) | ||||
|         if content is None: | ||||
|             raise FileNotFoundError(path) | ||||
|         return content | ||||
|  | ||||
|     def iter_entry_points(self) -> Iterable[BaseEntryPoint]: | ||||
|         # importlib.metadata's EntryPoint structure sasitfies BaseEntryPoint. | ||||
|         return self._dist.entry_points | ||||
|  | ||||
|     def _metadata_impl(self) -> email.message.Message: | ||||
|         # From Python 3.10+, importlib.metadata declares PackageMetadata as the | ||||
|         # return type. This protocol is unfortunately a disaster now and misses | ||||
|         # a ton of fields that we need, including get() and get_payload(). We | ||||
|         # rely on the implementation that the object is actually a Message now, | ||||
|         # until upstream can improve the protocol. (python/cpython#94952) | ||||
|         return cast(email.message.Message, self._dist.metadata) | ||||
|  | ||||
|     def iter_provided_extras(self) -> Iterable[str]: | ||||
|         return ( | ||||
|             safe_extra(extra) for extra in self.metadata.get_all("Provides-Extra", []) | ||||
|         ) | ||||
|  | ||||
|     def iter_dependencies(self, extras: Collection[str] = ()) -> Iterable[Requirement]: | ||||
|         contexts: Sequence[Dict[str, str]] = [{"extra": safe_extra(e)} for e in extras] | ||||
|         for req_string in self.metadata.get_all("Requires-Dist", []): | ||||
|             req = Requirement(req_string) | ||||
|             if not req.marker: | ||||
|                 yield req | ||||
|             elif not extras and req.marker.evaluate({"extra": ""}): | ||||
|                 yield req | ||||
|             elif any(req.marker.evaluate(context) for context in contexts): | ||||
|                 yield req | ||||
| @ -0,0 +1,188 @@ | ||||
| import functools | ||||
| import importlib.metadata | ||||
| import logging | ||||
| import os | ||||
| import pathlib | ||||
| import sys | ||||
| import zipfile | ||||
| import zipimport | ||||
| from typing import Iterator, List, Optional, Sequence, Set, Tuple | ||||
|  | ||||
| from pip._vendor.packaging.utils import NormalizedName, canonicalize_name | ||||
|  | ||||
| from pip._internal.metadata.base import BaseDistribution, BaseEnvironment | ||||
| from pip._internal.models.wheel import Wheel | ||||
| from pip._internal.utils.deprecation import deprecated | ||||
| from pip._internal.utils.filetypes import WHEEL_EXTENSION | ||||
|  | ||||
| from ._compat import BadMetadata, BasePath, get_dist_name, get_info_location | ||||
| from ._dists import Distribution | ||||
|  | ||||
| logger = logging.getLogger(__name__) | ||||
|  | ||||
|  | ||||
| def _looks_like_wheel(location: str) -> bool: | ||||
|     if not location.endswith(WHEEL_EXTENSION): | ||||
|         return False | ||||
|     if not os.path.isfile(location): | ||||
|         return False | ||||
|     if not Wheel.wheel_file_re.match(os.path.basename(location)): | ||||
|         return False | ||||
|     return zipfile.is_zipfile(location) | ||||
|  | ||||
|  | ||||
| class _DistributionFinder: | ||||
|     """Finder to locate distributions. | ||||
|  | ||||
|     The main purpose of this class is to memoize found distributions' names, so | ||||
|     only one distribution is returned for each package name. At lot of pip code | ||||
|     assumes this (because it is setuptools's behavior), and not doing the same | ||||
|     can potentially cause a distribution in lower precedence path to override a | ||||
|     higher precedence one if the caller is not careful. | ||||
|  | ||||
|     Eventually we probably want to make it possible to see lower precedence | ||||
|     installations as well. It's useful feature, after all. | ||||
|     """ | ||||
|  | ||||
|     FoundResult = Tuple[importlib.metadata.Distribution, Optional[BasePath]] | ||||
|  | ||||
|     def __init__(self) -> None: | ||||
|         self._found_names: Set[NormalizedName] = set() | ||||
|  | ||||
|     def _find_impl(self, location: str) -> Iterator[FoundResult]: | ||||
|         """Find distributions in a location.""" | ||||
|         # Skip looking inside a wheel. Since a package inside a wheel is not | ||||
|         # always valid (due to .data directories etc.), its .dist-info entry | ||||
|         # should not be considered an installed distribution. | ||||
|         if _looks_like_wheel(location): | ||||
|             return | ||||
|         # To know exactly where we find a distribution, we have to feed in the | ||||
|         # paths one by one, instead of dumping the list to importlib.metadata. | ||||
|         for dist in importlib.metadata.distributions(path=[location]): | ||||
|             info_location = get_info_location(dist) | ||||
|             try: | ||||
|                 raw_name = get_dist_name(dist) | ||||
|             except BadMetadata as e: | ||||
|                 logger.warning("Skipping %s due to %s", info_location, e.reason) | ||||
|                 continue | ||||
|             normalized_name = canonicalize_name(raw_name) | ||||
|             if normalized_name in self._found_names: | ||||
|                 continue | ||||
|             self._found_names.add(normalized_name) | ||||
|             yield dist, info_location | ||||
|  | ||||
|     def find(self, location: str) -> Iterator[BaseDistribution]: | ||||
|         """Find distributions in a location. | ||||
|  | ||||
|         The path can be either a directory, or a ZIP archive. | ||||
|         """ | ||||
|         for dist, info_location in self._find_impl(location): | ||||
|             if info_location is None: | ||||
|                 installed_location: Optional[BasePath] = None | ||||
|             else: | ||||
|                 installed_location = info_location.parent | ||||
|             yield Distribution(dist, info_location, installed_location) | ||||
|  | ||||
|     def find_linked(self, location: str) -> Iterator[BaseDistribution]: | ||||
|         """Read location in egg-link files and return distributions in there. | ||||
|  | ||||
|         The path should be a directory; otherwise this returns nothing. This | ||||
|         follows how setuptools does this for compatibility. The first non-empty | ||||
|         line in the egg-link is read as a path (resolved against the egg-link's | ||||
|         containing directory if relative). Distributions found at that linked | ||||
|         location are returned. | ||||
|         """ | ||||
|         path = pathlib.Path(location) | ||||
|         if not path.is_dir(): | ||||
|             return | ||||
|         for child in path.iterdir(): | ||||
|             if child.suffix != ".egg-link": | ||||
|                 continue | ||||
|             with child.open() as f: | ||||
|                 lines = (line.strip() for line in f) | ||||
|                 target_rel = next((line for line in lines if line), "") | ||||
|             if not target_rel: | ||||
|                 continue | ||||
|             target_location = str(path.joinpath(target_rel)) | ||||
|             for dist, info_location in self._find_impl(target_location): | ||||
|                 yield Distribution(dist, info_location, path) | ||||
|  | ||||
|     def _find_eggs_in_dir(self, location: str) -> Iterator[BaseDistribution]: | ||||
|         from pip._vendor.pkg_resources import find_distributions | ||||
|  | ||||
|         from pip._internal.metadata import pkg_resources as legacy | ||||
|  | ||||
|         with os.scandir(location) as it: | ||||
|             for entry in it: | ||||
|                 if not entry.name.endswith(".egg"): | ||||
|                     continue | ||||
|                 for dist in find_distributions(entry.path): | ||||
|                     yield legacy.Distribution(dist) | ||||
|  | ||||
|     def _find_eggs_in_zip(self, location: str) -> Iterator[BaseDistribution]: | ||||
|         from pip._vendor.pkg_resources import find_eggs_in_zip | ||||
|  | ||||
|         from pip._internal.metadata import pkg_resources as legacy | ||||
|  | ||||
|         try: | ||||
|             importer = zipimport.zipimporter(location) | ||||
|         except zipimport.ZipImportError: | ||||
|             return | ||||
|         for dist in find_eggs_in_zip(importer, location): | ||||
|             yield legacy.Distribution(dist) | ||||
|  | ||||
|     def find_eggs(self, location: str) -> Iterator[BaseDistribution]: | ||||
|         """Find eggs in a location. | ||||
|  | ||||
|         This actually uses the old *pkg_resources* backend. We likely want to | ||||
|         deprecate this so we can eventually remove the *pkg_resources* | ||||
|         dependency entirely. Before that, this should first emit a deprecation | ||||
|         warning for some versions when using the fallback since importing | ||||
|         *pkg_resources* is slow for those who don't need it. | ||||
|         """ | ||||
|         if os.path.isdir(location): | ||||
|             yield from self._find_eggs_in_dir(location) | ||||
|         if zipfile.is_zipfile(location): | ||||
|             yield from self._find_eggs_in_zip(location) | ||||
|  | ||||
|  | ||||
| @functools.lru_cache(maxsize=None)  # Warn a distribution exactly once. | ||||
| def _emit_egg_deprecation(location: Optional[str]) -> None: | ||||
|     deprecated( | ||||
|         reason=f"Loading egg at {location} is deprecated.", | ||||
|         replacement="to use pip for package installation.", | ||||
|         gone_in=None, | ||||
|     ) | ||||
|  | ||||
|  | ||||
| class Environment(BaseEnvironment): | ||||
|     def __init__(self, paths: Sequence[str]) -> None: | ||||
|         self._paths = paths | ||||
|  | ||||
|     @classmethod | ||||
|     def default(cls) -> BaseEnvironment: | ||||
|         return cls(sys.path) | ||||
|  | ||||
|     @classmethod | ||||
|     def from_paths(cls, paths: Optional[List[str]]) -> BaseEnvironment: | ||||
|         if paths is None: | ||||
|             return cls(sys.path) | ||||
|         return cls(paths) | ||||
|  | ||||
|     def _iter_distributions(self) -> Iterator[BaseDistribution]: | ||||
|         finder = _DistributionFinder() | ||||
|         for location in self._paths: | ||||
|             yield from finder.find(location) | ||||
|             for dist in finder.find_eggs(location): | ||||
|                 # _emit_egg_deprecation(dist.location)  # TODO: Enable this. | ||||
|                 yield dist | ||||
|             # This must go last because that's how pkg_resources tie-breaks. | ||||
|             yield from finder.find_linked(location) | ||||
|  | ||||
|     def get_distribution(self, name: str) -> Optional[BaseDistribution]: | ||||
|         matches = ( | ||||
|             distribution | ||||
|             for distribution in self.iter_all_distributions() | ||||
|             if distribution.canonical_name == canonicalize_name(name) | ||||
|         ) | ||||
|         return next(matches, None) | ||||
		Reference in New Issue
	
	Block a user