Spaces:
Paused
Paused
| import abc | |
| import importlib | |
| import io | |
| import sys | |
| import types | |
| import pathlib | |
| import contextlib | |
| from ..abc import ResourceReader | |
| from .compat.py39 import import_helper, os_helper | |
| from . import zip as zip_ | |
| from . import _path | |
| from importlib.machinery import ModuleSpec | |
class Reader(ResourceReader):
    """
    Configurable ResourceReader used as a fake loader in tests.

    Every keyword argument given to the constructor becomes an instance
    attribute.  The methods below read ``file``, ``path`` and
    ``_contents``; when ``file`` or ``path`` holds an exception
    instance, the methods consulting it raise that exception instead of
    returning it.
    """

    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

    def get_resource_reader(self, package):
        """Return this object; it serves as its own resource reader."""
        return self

    def open_resource(self, path):
        """Record the requested path, then return (or raise) ``file``."""
        self._path = path
        stream = self.file
        if isinstance(stream, Exception):
            raise stream
        return stream

    def resource_path(self, path_):
        """Record the requested path, then return (or raise) ``path``."""
        self._path = path_
        location = self.path
        if isinstance(location, Exception):
            raise location
        return location

    def is_resource(self, path_):
        """
        Record the requested path and report whether it names a
        top-level entry of ``_contents`` (an entry with no '/' parts
        that equals ``path_``).  Raises ``path`` if it is an exception.
        """
        self._path = path_
        location = self.path
        if isinstance(location, Exception):
            raise location
        for entry in self._contents:
            segments = entry.split('/')
            if len(segments) == 1 and segments[0] == path_:
                return True
        return False

    def contents(self):
        """
        Yield each entry of ``_contents``.

        This is a generator, so an exception stored in ``path`` is only
        raised once iteration starts.
        """
        location = self.path
        if isinstance(location, Exception):
            raise location
        yield from self._contents
def create_package_from_loader(loader, is_package=True):
    """
    Build an in-memory module named 'testingpackage' backed by *loader*.

    The module receives a ModuleSpec whose origin is the placeholder
    'does-not-exist', and both ``__spec__`` and ``__loader__`` are set
    on it.  The module is returned without being added to sys.modules.
    """
    package_name = 'testingpackage'
    package = types.ModuleType(package_name)
    package.__loader__ = loader
    package.__spec__ = ModuleSpec(
        package_name,
        loader,
        origin='does-not-exist',
        is_package=is_package,
    )
    return package
def create_package(file=None, path=None, is_package=True, contents=()):
    """
    Create a test package whose loader is a Reader configured with the
    given *file*, *path* and *contents* (stored as ``_contents``).
    """
    reader = Reader(file=file, path=path, _contents=contents)
    return create_package_from_loader(reader, is_package)
class CommonTestsBase(metaclass=abc.ABCMeta):
    """
    Tests shared by test_open, test_path, and test_read.

    Concrete subclasses must implement ``execute`` and provide a
    ``data`` attribute holding the fixture package.  The assertion
    helpers used below (``assertEqual``, ``assertRaises``) are expected
    to come from the test-case class mixed in by the subclass.
    """

    # Marked abstract so that a subclass which forgets to implement it
    # cannot be instantiated; otherwise every test below would run
    # against a no-op and mostly pass vacuously.
    @abc.abstractmethod
    def execute(self, package, path):
        """
        Call the pertinent legacy API function (e.g. open_text, path)
        on package and path.
        """

    def test_package_name(self):
        """
        Passing in the package name should succeed.
        """
        self.execute(self.data.__name__, 'utf-8.file')

    def test_package_object(self):
        """
        Passing in the package itself should succeed.
        """
        self.execute(self.data, 'utf-8.file')

    def test_string_path(self):
        """
        Passing in a string for the path should succeed.
        """
        path = 'utf-8.file'
        self.execute(self.data, path)

    def test_pathlib_path(self):
        """
        Passing in a pathlib.PurePath object for the path should succeed.
        """
        path = pathlib.PurePath('utf-8.file')
        self.execute(self.data, path)

    def test_importing_module_as_side_effect(self):
        """
        The anchor package can already be imported.
        """
        # Drop the package from the module cache so that referring to it
        # by name forces it to be located again.
        del sys.modules[self.data.__name__]
        self.execute(self.data.__name__, 'utf-8.file')

    def test_missing_path(self):
        """
        Attempting to open or read or request the path for a
        non-existent path should succeed if open_resource
        can return a viable data stream.
        """
        bytes_data = io.BytesIO(b'Hello, world!')
        package = create_package(file=bytes_data, path=FileNotFoundError())
        self.execute(package, 'utf-8.file')
        # The fake loader records the last path it was asked about.
        self.assertEqual(package.__loader__._path, 'utf-8.file')

    def test_extant_path(self):
        # Attempting to open or read or request the path when the
        # path does exist should still succeed. Does not assert
        # anything about the result.
        bytes_data = io.BytesIO(b'Hello, world!')
        # any path that exists
        path = __file__
        package = create_package(file=bytes_data, path=path)
        self.execute(package, 'utf-8.file')
        self.assertEqual(package.__loader__._path, 'utf-8.file')

    def test_useless_loader(self):
        """
        A loader that can supply neither a stream nor a path should
        cause FileNotFoundError to propagate.
        """
        package = create_package(file=FileNotFoundError(), path=FileNotFoundError())
        with self.assertRaises(FileNotFoundError):
            self.execute(package, 'utf-8.file')
# File-tree specs keyed by top-level package name; ModuleSetup.load_fixture
# hands one of these to tree_on_path.  Presumably nested dicts are
# directories and str/bytes values are file contents -- confirm against
# the _path.build / zip_.make_zip_file helpers.
fixtures = {
    'data01': {
        '__init__.py': '',
        'binary.file': bytes(range(4)),
        'utf-16.file': 'Hello, UTF-16 world!\n'.encode('utf-16'),
        'utf-8.file': 'Hello, UTF-8 world!\n'.encode('utf-8'),
        'subdirectory': {
            '__init__.py': '',
            'binary.file': bytes(range(4, 8)),
        },
    },
    'data02': {
        '__init__.py': '',
        'one': {'__init__.py': '', 'resource1.txt': 'one resource'},
        'two': {'__init__.py': '', 'resource2.txt': 'two resource'},
        'subdirectory': {'subsubdir': {'resource.txt': 'a resource'}},
    },
    # Unlike data01, this tree contains no __init__.py files.
    'namespacedata01': {
        'binary.file': bytes(range(4)),
        'utf-16.file': 'Hello, UTF-16 world!\n'.encode('utf-16'),
        'utf-8.file': 'Hello, UTF-8 world!\n'.encode('utf-8'),
        'subdirectory': {
            'binary.file': bytes(range(12, 16)),
        },
    },
}
class ModuleSetup:
    """
    Mixin that installs a fixture package and exposes it as ``self.data``.

    Subclasses provide ``MODULE`` (a key of the module-level ``fixtures``
    mapping) and ``tree_on_path``.
    """

    def setUp(self):
        # One ExitStack owns every context entered for this test and is
        # closed through the test's cleanup hook.
        stack = contextlib.ExitStack()
        self.fixtures = stack
        self.addCleanup(stack.close)
        stack.enter_context(import_helper.isolated_modules())
        self.data = self.load_fixture(self.MODULE)

    def load_fixture(self, module):
        """Put the named fixture tree on the path, then import and return it."""
        tree = {module: fixtures[module]}
        self.tree_on_path(tree)
        return importlib.import_module(module)
class ZipSetup(ModuleSetup):
    """Serve the 'data01' fixture from a zip file placed on sys.path."""

    MODULE = 'data01'

    def tree_on_path(self, spec):
        """
        Write *spec* into 'zipped modules.zip' inside a fresh temporary
        directory and put the resulting archive on sys.path.  Both
        contexts are tied to this test's ExitStack.
        """
        stack = self.fixtures
        scratch = stack.enter_context(os_helper.temp_dir())
        target = pathlib.Path(scratch).joinpath('zipped modules.zip')
        archive = zip_.make_zip_file(spec, target)
        stack.enter_context(import_helper.DirsOnSysPath(str(archive)))
class DiskSetup(ModuleSetup):
    """Serve the 'data01' fixture from a directory tree on disk."""

    MODULE = 'data01'

    def tree_on_path(self, spec):
        """
        Build *spec* inside a fresh temporary directory and put that
        directory on sys.path.  Both contexts are tied to this test's
        ExitStack.
        """
        root = self.fixtures.enter_context(os_helper.temp_dir())
        _path.build(spec, pathlib.Path(root))
        on_sys_path = import_helper.DirsOnSysPath(root)
        self.fixtures.enter_context(on_sys_path)
class CommonTests(DiskSetup, CommonTestsBase):
    """The shared tests of CommonTestsBase combined with DiskSetup's on-disk fixture."""