"""Test for object db""" |
|
from gitdb.test.lib import (
    TestBase,
    DummyStream,
    make_bytes,
    make_object,
    fixture_path
)
|
from gitdb import (
    DecompressMemMapReader,
    FDCompressedSha1Writer,
    LooseObjectDB,
    Sha1Writer,
    MemoryDB,
    IStream,
)
from gitdb.util import hex_to_bin
from gitdb.typ import str_blob_type

import zlib
import tempfile
import os
from io import BytesIO
|
|
class TestStream(TestBase):

    """Test stream classes"""

    data_sizes = (15, 10000, 1000 * 1024 + 512)
|
    def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None):
        """Run the stream tests - the given stream must be seekable, allowing it
        to be rewound and reused.

        :param cdata: the data we expect to read from the stream, i.e. its contents
        :param rewind_stream: function called to rewind the stream to make it
            ready for reuse"""
        ns = 10
        assert len(cdata) > ns - 1, "Data must be larger than %i, was %i" % (ns, len(cdata))
|
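        # read the data in ns steps, comparing each chunk with the expected slice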
        ss = len(cdata) // ns
        for i in range(ns):
            data = stream.read(ss)
            chunk = cdata[i * ss:(i + 1) * ss]
            assert data == chunk

        # the remainder, if any, must match the tail of the expected data
        rest = stream.read()
        if rest:
            assert rest == cdata[-len(rest):]
|
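        # the decompressing reader must have consumed its entire compressed buffer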
        if isinstance(stream, DecompressMemMapReader):
            assert len(stream.data()) == stream.compressed_bytes_read()
|
        rewind_stream(stream)

        # reading everything at once must yield the original data as well
        rdata = stream.read()
        assert rdata == cdata
|
        if isinstance(stream, DecompressMemMapReader):
            assert len(stream.data()) == stream.compressed_bytes_read()
|
    def test_decompress_reader(self):
        for close_on_deletion in range(2):
            for with_size in range(2):
                for ds in self.data_sizes:
                    cdata = make_bytes(ds, randomize=False)
|
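                    # the reader can either parse type and size from a git-style
                    # object header, or be given the size explicitly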
                    if with_size:
                        # prepend a git-style object header so the reader
                        # parses type and size itself
                        zdata = zlib.compress(make_object(str_blob_type, cdata))
                        typ, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion)
                        assert size == len(cdata)
                        assert typ == str_blob_type

                        # the parsed size must match the one stored in the header
                        test_reader = DecompressMemMapReader(zdata, close_on_deletion=False)
                        assert test_reader._s == len(cdata)
                    else:
                        # no header - pass the expected size in explicitly
                        zdata = zlib.compress(cdata)
                        reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata))
                        assert reader._s == len(cdata)
|
                    self._assert_stream_reader(reader, cdata, lambda r: r.seek(0))
|
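                    # swap in a dummy stream to verify the close-on-deletion behaviour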
                    dummy = DummyStream()
                    reader._m = dummy

                    assert not dummy.closed
                    del reader
                    assert dummy.closed == close_on_deletion
|
    def test_sha_writer(self):
        writer = Sha1Writer()
        assert 2 == writer.write(b"hi")
        assert len(writer.sha(as_hex=1)) == 40
        assert len(writer.sha(as_hex=0)) == 20
|
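        # writing additional data must alter the resulting sha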
        prev_sha = writer.sha()
        writer.write(b"hi again")
        assert writer.sha() != prev_sha
|
    def test_compressed_writer(self):
        for ds in self.data_sizes:
            fd, path = tempfile.mkstemp()
            ostream = FDCompressedSha1Writer(fd)
            data = make_bytes(ds, randomize=False)
|
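            # for now, just a single write - the writer does not care about chunking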
            assert len(data) == ostream.write(data)
            ostream.close()
|
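            # closing the stream must have closed the file descriptor as well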
            self.assertRaises(OSError, os.close, fd)
|
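            # read everything back and verify the compressed round-trip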
            fd = os.open(path, os.O_RDONLY | getattr(os, 'O_BINARY', 0))
            written_data = os.read(fd, os.path.getsize(path))
            assert len(written_data) == os.path.getsize(path)
            os.close(fd)
            assert written_data == zlib.compress(data, 1)  # best speed

            os.remove(path)
|
    def test_decompress_reader_special_case(self):
        odb = LooseObjectDB(fixture_path('objects'))
        mdb = MemoryDB()
        for sha in (b'888401851f15db0eed60eb1bc29dec5ddcace911',
                    b'7bb839852ed5e3a069966281bb08d50012fb309b',):
            ostream = odb.stream(hex_to_bin(sha))
|
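            # if there is a bug, we will be missing one byte exactly!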
            data = ostream.read()
            assert len(data) == ostream.size
|
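            # storing the data as a new object must reproduce the original sha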
            dump = mdb.store(IStream(ostream.type, ostream.size, BytesIO(data)))
            assert dump.hexsha == sha