|
|
|
|
|
|
|
|
|
"""Utilities used in ODB testing""" |
|
from gitdb import OStream |
|
|
|
import sys |
|
import random |
|
from array import array |
|
|
|
from io import BytesIO |
|
|
|
import glob |
|
import unittest |
|
import tempfile |
|
import shutil |
|
import os |
|
import gc |
|
import logging |
|
from functools import wraps |
|
|
|
|
|
|
|
|
|
class TestBase(unittest.TestCase): |
|
"""Base class for all tests |
|
|
|
TestCase providing access to readonly repositories using the following member variables. |
|
|
|
* gitrepopath |
|
|
|
* read-only base path of the git source repository, i.e. .../git/.git |
|
""" |
|
|
|
|
|
k_env_git_repo = "GITDB_TEST_GIT_REPO_BASE" |
|
|
|
|
|
@classmethod |
|
def setUpClass(cls): |
|
try: |
|
super().setUpClass() |
|
except AttributeError: |
|
pass |
|
|
|
cls.gitrepopath = os.environ.get(cls.k_env_git_repo) |
|
if not cls.gitrepopath: |
|
logging.info( |
|
"You can set the %s environment variable to a .git repository of your choice - defaulting to the gitdb repository", cls.k_env_git_repo) |
|
ospd = os.path.dirname |
|
cls.gitrepopath = os.path.join(ospd(ospd(ospd(__file__))), '.git') |
|
|
|
assert cls.gitrepopath.endswith('.git') |
|
|
|
|
|
|
|
|
|
|
|
|
|
def with_rw_directory(func): |
|
"""Create a temporary directory which can be written to, remove it if the |
|
test succeeds, but leave it otherwise to aid additional debugging""" |
|
|
|
def wrapper(self): |
|
path = tempfile.mktemp(prefix=func.__name__) |
|
os.mkdir(path) |
|
keep = False |
|
try: |
|
try: |
|
return func(self, path) |
|
except Exception: |
|
sys.stderr.write(f"Test {type(self).__name__}.{func.__name__} failed, output is at {path!r}\n") |
|
keep = True |
|
raise |
|
finally: |
|
|
|
|
|
|
|
|
|
if not keep: |
|
gc.collect() |
|
shutil.rmtree(path) |
|
|
|
|
|
|
|
wrapper.__name__ = func.__name__ |
|
return wrapper |
|
|
|
|
|
def with_packs_rw(func): |
|
"""Function that provides a path into which the packs for testing should be |
|
copied. Will pass on the path to the actual function afterwards""" |
|
|
|
def wrapper(self, path): |
|
src_pack_glob = fixture_path('packs/*') |
|
copy_files_globbed(src_pack_glob, path, hard_link_ok=True) |
|
return func(self, path) |
|
|
|
|
|
wrapper.__name__ = func.__name__ |
|
return wrapper |
|
|
|
|
|
|
|
|
|
|
|
|
|
def fixture_path(relapath=''): |
|
""":return: absolute path into the fixture directory |
|
:param relapath: relative path into the fixtures directory, or '' |
|
to obtain the fixture directory itself""" |
|
return os.path.join(os.path.dirname(__file__), 'fixtures', relapath) |
|
|
|
|
|
def copy_files_globbed(source_glob, target_dir, hard_link_ok=False): |
|
"""Copy all files found according to the given source glob into the target directory |
|
:param hard_link_ok: if True, hard links will be created if possible. Otherwise |
|
the files will be copied""" |
|
for src_file in glob.glob(source_glob): |
|
if hard_link_ok and hasattr(os, 'link'): |
|
target = os.path.join(target_dir, os.path.basename(src_file)) |
|
try: |
|
os.link(src_file, target) |
|
except OSError: |
|
shutil.copy(src_file, target_dir) |
|
|
|
else: |
|
shutil.copy(src_file, target_dir) |
|
|
|
|
|
|
|
|
|
def make_bytes(size_in_bytes, randomize=False): |
|
""":return: string with given size in bytes |
|
:param randomize: try to produce a very random stream""" |
|
actual_size = size_in_bytes // 4 |
|
producer = range(actual_size) |
|
if randomize: |
|
producer = list(producer) |
|
random.shuffle(producer) |
|
|
|
a = array('i', producer) |
|
return a.tobytes() |
|
|
|
|
|
def make_object(type, data): |
|
""":return: bytes resembling an uncompressed object""" |
|
odata = "blob %i\0" % len(data) |
|
return odata.encode("ascii") + data |
|
|
|
|
|
def make_memory_file(size_in_bytes, randomize=False): |
|
""":return: tuple(size_of_stream, stream) |
|
:param randomize: try to produce a very random stream""" |
|
d = make_bytes(size_in_bytes, randomize) |
|
return len(d), BytesIO(d) |
|
|
|
|
|
|
|
|
|
|
|
|
|
class DummyStream: |
|
|
|
def __init__(self): |
|
self.was_read = False |
|
self.bytes = 0 |
|
self.closed = False |
|
|
|
def read(self, size): |
|
self.was_read = True |
|
self.bytes = size |
|
|
|
def close(self): |
|
self.closed = True |
|
|
|
def _assert(self): |
|
assert self.was_read |
|
|
|
|
|
class DeriveTest(OStream): |
|
|
|
def __init__(self, sha, type, size, stream, *args, **kwargs): |
|
self.myarg = kwargs.pop('myarg') |
|
self.args = args |
|
|
|
def _assert(self): |
|
assert self.args |
|
assert self.myarg |
|
|
|
|
|
|