##// END OF EJS Templates
remotefilelog: replace repack lock to solve race condition...
remotefilelog: replace repack lock to solve race condition 2c74337e6483 reduced the probability of race-conditions when starting background repack and prefetch and we saw the difference in our CI instance with all failures disappearing except one where one call to waitonrepack seems to returns too early. I'm not sure what exactly goes wrong but I realized that while the prefetch operation uses a standard Mercurial lock, the repack operation is using a custom lock based on `fcntl.flock` on available platforms. As `extutil.flock` fallback on traditional Mercurial locks on other platforms and the tests are stable on my laptop, our CI environment and GCC112, I'm sending this patch to standardize the behavior across environments. Differential Revision: https://phab.mercurial-scm.org/D6844

File last commit:

r43207:69de49c4 default
r43213:5fadf610 default
Show More
test_compressor.py
1735 lines | 59.5 KiB | text/x-python | PythonLexer
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 import hashlib
import io
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 import os
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 import struct
import sys
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 import tarfile
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 import tempfile
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 import unittest
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 import zstandard as zstd
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 from .common import (
make_cffi,
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 NonClosingBytesIO,
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 OpCountingBytesIO,
)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
if sys.version_info[0] >= 3:
next = lambda it: it.__next__()
else:
next = lambda it: it.next()
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def multithreaded_chunk_size(level, source_size=0):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 params = zstd.ZstdCompressionParameters.from_level(level,
source_size=source_size)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
return 1 << (params.window_log + 2)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 @make_cffi
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 class TestCompressor(unittest.TestCase):
def test_level_bounds(self):
with self.assertRaises(ValueError):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 zstd.ZstdCompressor(level=23)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 def test_memory_size(self):
cctx = zstd.ZstdCompressor(level=1)
self.assertGreater(cctx.memory_size(), 100)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 @make_cffi
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 class TestCompressor_compress(unittest.TestCase):
def test_compress_empty(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 result = cctx.compress(b'')
self.assertEqual(result, b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00')
params = zstd.get_frame_parameters(result)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(params.window_size, 524288)
self.assertEqual(params.dict_id, 0)
self.assertFalse(params.has_checksum, 0)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor()
result = cctx.compress(b'')
self.assertEqual(result, b'\x28\xb5\x2f\xfd\x20\x00\x01\x00\x00')
params = zstd.get_frame_parameters(result)
self.assertEqual(params.content_size, 0)
def test_input_types(self):
cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
expected = b'\x28\xb5\x2f\xfd\x00\x00\x19\x00\x00\x66\x6f\x6f'
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 mutable_array = bytearray(3)
mutable_array[:] = b'foo'
sources = [
memoryview(b'foo'),
bytearray(b'foo'),
mutable_array,
]
for source in sources:
self.assertEqual(cctx.compress(source), expected)
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_compress_large(self):
chunks = []
for i in range(255):
chunks.append(struct.Struct('>B').pack(i) * 16384)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor(level=3, write_content_size=False)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 result = cctx.compress(b''.join(chunks))
self.assertEqual(len(result), 999)
self.assertEqual(result[0:4], b'\x28\xb5\x2f\xfd')
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 # This matches the test for read_to_iter() below.
cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 result = cctx.compress(b'f' * zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE + b'o')
self.assertEqual(result, b'\x28\xb5\x2f\xfd\x00\x40\x54\x00\x00'
b'\x10\x66\x66\x01\x00\xfb\xff\x39\xc0'
b'\x02\x09\x00\x00\x6f')
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 def test_negative_level(self):
cctx = zstd.ZstdCompressor(level=-4)
result = cctx.compress(b'foo' * 256)
def test_no_magic(self):
params = zstd.ZstdCompressionParameters.from_level(
1, format=zstd.FORMAT_ZSTD1)
cctx = zstd.ZstdCompressor(compression_params=params)
magic = cctx.compress(b'foobar')
params = zstd.ZstdCompressionParameters.from_level(
1, format=zstd.FORMAT_ZSTD1_MAGICLESS)
cctx = zstd.ZstdCompressor(compression_params=params)
no_magic = cctx.compress(b'foobar')
self.assertEqual(magic[0:4], b'\x28\xb5\x2f\xfd')
self.assertEqual(magic[4:], no_magic)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_write_checksum(self):
cctx = zstd.ZstdCompressor(level=1)
no_checksum = cctx.compress(b'foobar')
cctx = zstd.ZstdCompressor(level=1, write_checksum=True)
with_checksum = cctx.compress(b'foobar')
self.assertEqual(len(with_checksum), len(no_checksum) + 4)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 no_params = zstd.get_frame_parameters(no_checksum)
with_params = zstd.get_frame_parameters(with_checksum)
self.assertFalse(no_params.has_checksum)
self.assertTrue(with_params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_write_content_size(self):
cctx = zstd.ZstdCompressor(level=1)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 with_size = cctx.compress(b'foobar' * 256)
cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 no_size = cctx.compress(b'foobar' * 256)
self.assertEqual(len(with_size), len(no_size) + 1)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 no_params = zstd.get_frame_parameters(no_size)
with_params = zstd.get_frame_parameters(with_size)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(with_params.content_size, 1536)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_no_dict_id(self):
samples = []
for i in range(128):
samples.append(b'foo' * 64)
samples.append(b'bar' * 64)
samples.append(b'foobar' * 64)
d = zstd.train_dictionary(1024, samples)
cctx = zstd.ZstdCompressor(level=1, dict_data=d)
with_dict_id = cctx.compress(b'foobarfoobar')
cctx = zstd.ZstdCompressor(level=1, dict_data=d, write_dict_id=False)
no_dict_id = cctx.compress(b'foobarfoobar')
self.assertEqual(len(with_dict_id), len(no_dict_id) + 4)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 no_params = zstd.get_frame_parameters(no_dict_id)
with_params = zstd.get_frame_parameters(with_dict_id)
self.assertEqual(no_params.dict_id, 0)
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 self.assertEqual(with_params.dict_id, 1880053135)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_compress_dict_multiple(self):
samples = []
for i in range(128):
samples.append(b'foo' * 64)
samples.append(b'bar' * 64)
samples.append(b'foobar' * 64)
d = zstd.train_dictionary(8192, samples)
cctx = zstd.ZstdCompressor(level=1, dict_data=d)
for i in range(32):
cctx.compress(b'foo bar foobar foo bar foobar')
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 def test_dict_precompute(self):
samples = []
for i in range(128):
samples.append(b'foo' * 64)
samples.append(b'bar' * 64)
samples.append(b'foobar' * 64)
d = zstd.train_dictionary(8192, samples)
d.precompute_compress(level=1)
cctx = zstd.ZstdCompressor(level=1, dict_data=d)
for i in range(32):
cctx.compress(b'foo bar foobar foo bar foobar')
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_multithreaded(self):
chunk_size = multithreaded_chunk_size(1)
source = b''.join([b'x' * chunk_size, b'y' * chunk_size])
cctx = zstd.ZstdCompressor(level=1, threads=2)
compressed = cctx.compress(source)
params = zstd.get_frame_parameters(compressed)
self.assertEqual(params.content_size, chunk_size * 2)
self.assertEqual(params.dict_id, 0)
self.assertFalse(params.has_checksum)
dctx = zstd.ZstdDecompressor()
self.assertEqual(dctx.decompress(compressed), source)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 def test_multithreaded_dict(self):
samples = []
for i in range(128):
samples.append(b'foo' * 64)
samples.append(b'bar' * 64)
samples.append(b'foobar' * 64)
d = zstd.train_dictionary(1024, samples)
cctx = zstd.ZstdCompressor(dict_data=d, threads=2)
result = cctx.compress(b'foo')
params = zstd.get_frame_parameters(result);
self.assertEqual(params.content_size, 3);
self.assertEqual(params.dict_id, d.dict_id())
self.assertEqual(result,
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 b'\x28\xb5\x2f\xfd\x23\x8f\x55\x0f\x70\x03\x19\x00\x00'
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 b'\x66\x6f\x6f')
def test_multithreaded_compression_params(self):
params = zstd.ZstdCompressionParameters.from_level(0, threads=2)
cctx = zstd.ZstdCompressor(compression_params=params)
result = cctx.compress(b'foo')
params = zstd.get_frame_parameters(result);
self.assertEqual(params.content_size, 3);
self.assertEqual(result,
b'\x28\xb5\x2f\xfd\x20\x03\x19\x00\x00\x66\x6f\x6f')
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 @make_cffi
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 class TestCompressor_compressobj(unittest.TestCase):
def test_compressobj_empty(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 cobj = cctx.compressobj()
self.assertEqual(cobj.compress(b''), b'')
self.assertEqual(cobj.flush(),
b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00')
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 def test_input_types(self):
expected = b'\x28\xb5\x2f\xfd\x00\x48\x19\x00\x00\x66\x6f\x6f'
cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
mutable_array = bytearray(3)
mutable_array[:] = b'foo'
sources = [
memoryview(b'foo'),
bytearray(b'foo'),
mutable_array,
]
for source in sources:
cobj = cctx.compressobj()
self.assertEqual(cobj.compress(source), b'')
self.assertEqual(cobj.flush(), expected)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_compressobj_large(self):
chunks = []
for i in range(255):
chunks.append(struct.Struct('>B').pack(i) * 16384)
cctx = zstd.ZstdCompressor(level=3)
cobj = cctx.compressobj()
result = cobj.compress(b''.join(chunks)) + cobj.flush()
self.assertEqual(len(result), 999)
self.assertEqual(result[0:4], b'\x28\xb5\x2f\xfd')
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 params = zstd.get_frame_parameters(result)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 self.assertEqual(params.window_size, 2097152)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(params.dict_id, 0)
self.assertFalse(params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_write_checksum(self):
cctx = zstd.ZstdCompressor(level=1)
cobj = cctx.compressobj()
no_checksum = cobj.compress(b'foobar') + cobj.flush()
cctx = zstd.ZstdCompressor(level=1, write_checksum=True)
cobj = cctx.compressobj()
with_checksum = cobj.compress(b'foobar') + cobj.flush()
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 no_params = zstd.get_frame_parameters(no_checksum)
with_params = zstd.get_frame_parameters(with_checksum)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
self.assertEqual(with_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(no_params.dict_id, 0)
self.assertEqual(with_params.dict_id, 0)
self.assertFalse(no_params.has_checksum)
self.assertTrue(with_params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 self.assertEqual(len(with_checksum), len(no_checksum) + 4)
def test_write_content_size(self):
cctx = zstd.ZstdCompressor(level=1)
cobj = cctx.compressobj(size=len(b'foobar' * 256))
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 with_size = cobj.compress(b'foobar' * 256) + cobj.flush()
cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
cobj = cctx.compressobj(size=len(b'foobar' * 256))
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 no_size = cobj.compress(b'foobar' * 256) + cobj.flush()
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 no_params = zstd.get_frame_parameters(no_size)
with_params = zstd.get_frame_parameters(with_size)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(with_params.content_size, 1536)
self.assertEqual(no_params.dict_id, 0)
self.assertEqual(with_params.dict_id, 0)
self.assertFalse(no_params.has_checksum)
self.assertFalse(with_params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 self.assertEqual(len(with_size), len(no_size) + 1)
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822 def test_compress_after_finished(self):
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 cctx = zstd.ZstdCompressor()
cobj = cctx.compressobj()
cobj.compress(b'foo')
cobj.flush()
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 with self.assertRaisesRegexp(zstd.ZstdError, r'cannot call compress\(\) after compressor'):
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 cobj.compress(b'foo')
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822 with self.assertRaisesRegexp(zstd.ZstdError, 'compressor object already finished'):
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 cobj.flush()
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822 def test_flush_block_repeated(self):
cctx = zstd.ZstdCompressor(level=1)
cobj = cctx.compressobj()
self.assertEqual(cobj.compress(b'foo'), b'')
self.assertEqual(cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK),
b'\x28\xb5\x2f\xfd\x00\x48\x18\x00\x00foo')
self.assertEqual(cobj.compress(b'bar'), b'')
# 3 byte header plus content.
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 self.assertEqual(cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK),
b'\x18\x00\x00bar')
self.assertEqual(cobj.flush(), b'\x01\x00\x00')
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822
def test_flush_empty_block(self):
cctx = zstd.ZstdCompressor(write_checksum=True)
cobj = cctx.compressobj()
cobj.compress(b'foobar')
cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK)
# No-op if no block is active (this is internal to zstd).
self.assertEqual(cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), b'')
trailing = cobj.flush()
# 3 bytes block header + 4 bytes frame checksum
self.assertEqual(len(trailing), 7)
header = trailing[0:3]
self.assertEqual(header, b'\x01\x00\x00')
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_multithreaded(self):
source = io.BytesIO()
source.write(b'a' * 1048576)
source.write(b'b' * 1048576)
source.write(b'c' * 1048576)
source.seek(0)
cctx = zstd.ZstdCompressor(level=1, threads=2)
cobj = cctx.compressobj()
chunks = []
while True:
d = source.read(8192)
if not d:
break
chunks.append(cobj.compress(d))
chunks.append(cobj.flush())
compressed = b''.join(chunks)
self.assertEqual(len(compressed), 295)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 def test_frame_progression(self):
cctx = zstd.ZstdCompressor()
self.assertEqual(cctx.frame_progression(), (0, 0, 0))
cobj = cctx.compressobj()
cobj.compress(b'foobar')
self.assertEqual(cctx.frame_progression(), (6, 0, 0))
cobj.flush()
self.assertEqual(cctx.frame_progression(), (6, 6, 15))
def test_bad_size(self):
cctx = zstd.ZstdCompressor()
cobj = cctx.compressobj(size=2)
with self.assertRaisesRegexp(zstd.ZstdError, 'Src size is incorrect'):
cobj.compress(b'foo')
# Try another operation on this instance.
with self.assertRaisesRegexp(zstd.ZstdError, 'Src size is incorrect'):
cobj.compress(b'aa')
# Try another operation on the compressor.
cctx.compressobj(size=4)
cctx.compress(b'foobar')
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 @make_cffi
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 class TestCompressor_copy_stream(unittest.TestCase):
def test_no_read(self):
source = object()
dest = io.BytesIO()
cctx = zstd.ZstdCompressor()
with self.assertRaises(ValueError):
cctx.copy_stream(source, dest)
def test_no_write(self):
source = io.BytesIO()
dest = object()
cctx = zstd.ZstdCompressor()
with self.assertRaises(ValueError):
cctx.copy_stream(source, dest)
def test_empty(self):
source = io.BytesIO()
dest = io.BytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 r, w = cctx.copy_stream(source, dest)
self.assertEqual(int(r), 0)
self.assertEqual(w, 9)
self.assertEqual(dest.getvalue(),
b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00')
def test_large_data(self):
source = io.BytesIO()
for i in range(255):
source.write(struct.Struct('>B').pack(i) * 16384)
source.seek(0)
dest = io.BytesIO()
cctx = zstd.ZstdCompressor()
r, w = cctx.copy_stream(source, dest)
self.assertEqual(r, 255 * 16384)
self.assertEqual(w, 999)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 params = zstd.get_frame_parameters(dest.getvalue())
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 self.assertEqual(params.window_size, 2097152)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(params.dict_id, 0)
self.assertFalse(params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_write_checksum(self):
source = io.BytesIO(b'foobar')
no_checksum = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1)
cctx.copy_stream(source, no_checksum)
source.seek(0)
with_checksum = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1, write_checksum=True)
cctx.copy_stream(source, with_checksum)
self.assertEqual(len(with_checksum.getvalue()),
len(no_checksum.getvalue()) + 4)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 no_params = zstd.get_frame_parameters(no_checksum.getvalue())
with_params = zstd.get_frame_parameters(with_checksum.getvalue())
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
self.assertEqual(with_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(no_params.dict_id, 0)
self.assertEqual(with_params.dict_id, 0)
self.assertFalse(no_params.has_checksum)
self.assertTrue(with_params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_write_content_size(self):
source = io.BytesIO(b'foobar' * 256)
no_size = io.BytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 cctx.copy_stream(source, no_size)
source.seek(0)
with_size = io.BytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor(level=1)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 cctx.copy_stream(source, with_size)
# Source content size is unknown, so no content size written.
self.assertEqual(len(with_size.getvalue()),
len(no_size.getvalue()))
source.seek(0)
with_size = io.BytesIO()
cctx.copy_stream(source, with_size, size=len(source.getvalue()))
# We specified source size, so content size header is present.
self.assertEqual(len(with_size.getvalue()),
len(no_size.getvalue()) + 1)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 no_params = zstd.get_frame_parameters(no_size.getvalue())
with_params = zstd.get_frame_parameters(with_size.getvalue())
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(with_params.content_size, 1536)
self.assertEqual(no_params.dict_id, 0)
self.assertEqual(with_params.dict_id, 0)
self.assertFalse(no_params.has_checksum)
self.assertFalse(with_params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_read_write_size(self):
source = OpCountingBytesIO(b'foobarfoobar')
dest = OpCountingBytesIO()
cctx = zstd.ZstdCompressor()
r, w = cctx.copy_stream(source, dest, read_size=1, write_size=1)
self.assertEqual(r, len(source.getvalue()))
self.assertEqual(w, 21)
self.assertEqual(source._read_count, len(source.getvalue()) + 1)
self.assertEqual(dest._write_count, len(dest.getvalue()))
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_multithreaded(self):
source = io.BytesIO()
source.write(b'a' * 1048576)
source.write(b'b' * 1048576)
source.write(b'c' * 1048576)
source.seek(0)
dest = io.BytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor(threads=2, write_content_size=False)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 r, w = cctx.copy_stream(source, dest)
self.assertEqual(r, 3145728)
self.assertEqual(w, 295)
params = zstd.get_frame_parameters(dest.getvalue())
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 self.assertEqual(params.dict_id, 0)
self.assertFalse(params.has_checksum)
# Writing content size and checksum works.
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor(threads=2, write_checksum=True)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 dest = io.BytesIO()
source.seek(0)
cctx.copy_stream(source, dest, size=len(source.getvalue()))
params = zstd.get_frame_parameters(dest.getvalue())
self.assertEqual(params.content_size, 3145728)
self.assertEqual(params.dict_id, 0)
self.assertTrue(params.has_checksum)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 def test_bad_size(self):
source = io.BytesIO()
source.write(b'a' * 32768)
source.write(b'b' * 32768)
source.seek(0)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 dest = io.BytesIO()
cctx = zstd.ZstdCompressor()
with self.assertRaisesRegexp(zstd.ZstdError, 'Src size is incorrect'):
cctx.copy_stream(source, dest, size=42)
# Try another operation on this compressor.
source.seek(0)
dest = io.BytesIO()
cctx.copy_stream(source, dest)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 class TestCompressor_stream_reader(unittest.TestCase):
def test_context_manager(self):
cctx = zstd.ZstdCompressor()
with cctx.stream_reader(b'foo') as reader:
with self.assertRaisesRegexp(ValueError, 'cannot __enter__ multiple times'):
with reader as reader2:
pass
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 def test_no_context_manager(self):
cctx = zstd.ZstdCompressor()
reader = cctx.stream_reader(b'foo')
reader.read(4)
self.assertFalse(reader.closed)
reader.close()
self.assertTrue(reader.closed)
with self.assertRaisesRegexp(ValueError, 'stream is closed'):
reader.read(1)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 def test_not_implemented(self):
cctx = zstd.ZstdCompressor()
with cctx.stream_reader(b'foo' * 60) as reader:
with self.assertRaises(io.UnsupportedOperation):
reader.readline()
with self.assertRaises(io.UnsupportedOperation):
reader.readlines()
with self.assertRaises(io.UnsupportedOperation):
iter(reader)
with self.assertRaises(io.UnsupportedOperation):
next(reader)
with self.assertRaises(OSError):
reader.writelines([])
with self.assertRaises(OSError):
reader.write(b'foo')
def test_constant_methods(self):
cctx = zstd.ZstdCompressor()
with cctx.stream_reader(b'boo') as reader:
self.assertTrue(reader.readable())
self.assertFalse(reader.writable())
self.assertFalse(reader.seekable())
self.assertFalse(reader.isatty())
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 self.assertFalse(reader.closed)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertIsNone(reader.flush())
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 self.assertFalse(reader.closed)
self.assertTrue(reader.closed)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
def test_read_closed(self):
cctx = zstd.ZstdCompressor()
with cctx.stream_reader(b'foo' * 60) as reader:
reader.close()
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 self.assertTrue(reader.closed)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 with self.assertRaisesRegexp(ValueError, 'stream is closed'):
reader.read(10)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 def test_read_sizes(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 foo = cctx.compress(b'foo')
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
with cctx.stream_reader(b'foo') as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 with self.assertRaisesRegexp(ValueError, 'cannot read negative amounts less than -1'):
reader.read(-2)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 self.assertEqual(reader.read(0), b'')
self.assertEqual(reader.read(), foo)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
def test_read_buffer(self):
cctx = zstd.ZstdCompressor()
source = b''.join([b'foo' * 60, b'bar' * 60, b'baz' * 60])
frame = cctx.compress(source)
with cctx.stream_reader(source) as reader:
self.assertEqual(reader.tell(), 0)
# We should get entire frame in one read.
result = reader.read(8192)
self.assertEqual(result, frame)
self.assertEqual(reader.tell(), len(result))
self.assertEqual(reader.read(), b'')
self.assertEqual(reader.tell(), len(result))
def test_read_buffer_small_chunks(self):
cctx = zstd.ZstdCompressor()
source = b'foo' * 60
chunks = []
with cctx.stream_reader(source) as reader:
self.assertEqual(reader.tell(), 0)
while True:
chunk = reader.read(1)
if not chunk:
break
chunks.append(chunk)
self.assertEqual(reader.tell(), sum(map(len, chunks)))
self.assertEqual(b''.join(chunks), cctx.compress(source))
def test_read_stream(self):
cctx = zstd.ZstdCompressor()
source = b''.join([b'foo' * 60, b'bar' * 60, b'baz' * 60])
frame = cctx.compress(source)
with cctx.stream_reader(io.BytesIO(source), size=len(source)) as reader:
self.assertEqual(reader.tell(), 0)
chunk = reader.read(8192)
self.assertEqual(chunk, frame)
self.assertEqual(reader.tell(), len(chunk))
self.assertEqual(reader.read(), b'')
self.assertEqual(reader.tell(), len(chunk))
def test_read_stream_small_chunks(self):
cctx = zstd.ZstdCompressor()
source = b'foo' * 60
chunks = []
with cctx.stream_reader(io.BytesIO(source), size=len(source)) as reader:
self.assertEqual(reader.tell(), 0)
while True:
chunk = reader.read(1)
if not chunk:
break
chunks.append(chunk)
self.assertEqual(reader.tell(), sum(map(len, chunks)))
self.assertEqual(b''.join(chunks), cctx.compress(source))
def test_read_after_exit(self):
cctx = zstd.ZstdCompressor()
with cctx.stream_reader(b'foo' * 60) as reader:
while reader.read(8192):
pass
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 with self.assertRaisesRegexp(ValueError, 'stream is closed'):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 reader.read(10)
def test_bad_size(self):
cctx = zstd.ZstdCompressor()
source = io.BytesIO(b'foobar')
with cctx.stream_reader(source, size=2) as reader:
with self.assertRaisesRegexp(zstd.ZstdError, 'Src size is incorrect'):
reader.read(10)
# Try another compression operation.
with cctx.stream_reader(source, size=42):
pass
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 def test_readall(self):
cctx = zstd.ZstdCompressor()
frame = cctx.compress(b'foo' * 1024)
reader = cctx.stream_reader(b'foo' * 1024)
self.assertEqual(reader.readall(), frame)
def test_readinto(self):
cctx = zstd.ZstdCompressor()
foo = cctx.compress(b'foo')
reader = cctx.stream_reader(b'foo')
with self.assertRaises(Exception):
reader.readinto(b'foobar')
# readinto() with sufficiently large destination.
b = bytearray(1024)
reader = cctx.stream_reader(b'foo')
self.assertEqual(reader.readinto(b), len(foo))
self.assertEqual(b[0:len(foo)], foo)
self.assertEqual(reader.readinto(b), 0)
self.assertEqual(b[0:len(foo)], foo)
# readinto() with small reads.
b = bytearray(1024)
reader = cctx.stream_reader(b'foo', read_size=1)
self.assertEqual(reader.readinto(b), len(foo))
self.assertEqual(b[0:len(foo)], foo)
# Too small destination buffer.
b = bytearray(2)
reader = cctx.stream_reader(b'foo')
self.assertEqual(reader.readinto(b), 2)
self.assertEqual(b[:], foo[0:2])
self.assertEqual(reader.readinto(b), 2)
self.assertEqual(b[:], foo[2:4])
self.assertEqual(reader.readinto(b), 2)
self.assertEqual(b[:], foo[4:6])
def test_readinto1(self):
cctx = zstd.ZstdCompressor()
foo = b''.join(cctx.read_to_iter(io.BytesIO(b'foo')))
reader = cctx.stream_reader(b'foo')
with self.assertRaises(Exception):
reader.readinto1(b'foobar')
b = bytearray(1024)
source = OpCountingBytesIO(b'foo')
reader = cctx.stream_reader(source)
self.assertEqual(reader.readinto1(b), len(foo))
self.assertEqual(b[0:len(foo)], foo)
self.assertEqual(source._read_count, 2)
# readinto1() with small reads.
b = bytearray(1024)
source = OpCountingBytesIO(b'foo')
reader = cctx.stream_reader(source, read_size=1)
self.assertEqual(reader.readinto1(b), len(foo))
self.assertEqual(b[0:len(foo)], foo)
self.assertEqual(source._read_count, 4)
def test_read1(self):
cctx = zstd.ZstdCompressor()
foo = b''.join(cctx.read_to_iter(io.BytesIO(b'foo')))
b = OpCountingBytesIO(b'foo')
reader = cctx.stream_reader(b)
self.assertEqual(reader.read1(), foo)
self.assertEqual(b._read_count, 2)
b = OpCountingBytesIO(b'foo')
reader = cctx.stream_reader(b)
self.assertEqual(reader.read1(0), b'')
self.assertEqual(reader.read1(2), foo[0:2])
self.assertEqual(b._read_count, 2)
self.assertEqual(reader.read1(2), foo[2:4])
self.assertEqual(reader.read1(1024), foo[4:])
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
@make_cffi
class TestCompressor_stream_writer(unittest.TestCase):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 def test_io_api(self):
buffer = io.BytesIO()
cctx = zstd.ZstdCompressor()
writer = cctx.stream_writer(buffer)
self.assertFalse(writer.isatty())
self.assertFalse(writer.readable())
with self.assertRaises(io.UnsupportedOperation):
writer.readline()
with self.assertRaises(io.UnsupportedOperation):
writer.readline(42)
with self.assertRaises(io.UnsupportedOperation):
writer.readline(size=42)
with self.assertRaises(io.UnsupportedOperation):
writer.readlines()
with self.assertRaises(io.UnsupportedOperation):
writer.readlines(42)
with self.assertRaises(io.UnsupportedOperation):
writer.readlines(hint=42)
with self.assertRaises(io.UnsupportedOperation):
writer.seek(0)
with self.assertRaises(io.UnsupportedOperation):
writer.seek(10, os.SEEK_SET)
self.assertFalse(writer.seekable())
with self.assertRaises(io.UnsupportedOperation):
writer.truncate()
with self.assertRaises(io.UnsupportedOperation):
writer.truncate(42)
with self.assertRaises(io.UnsupportedOperation):
writer.truncate(size=42)
self.assertTrue(writer.writable())
with self.assertRaises(NotImplementedError):
writer.writelines([])
with self.assertRaises(io.UnsupportedOperation):
writer.read()
with self.assertRaises(io.UnsupportedOperation):
writer.read(42)
with self.assertRaises(io.UnsupportedOperation):
writer.read(size=42)
with self.assertRaises(io.UnsupportedOperation):
writer.readall()
with self.assertRaises(io.UnsupportedOperation):
writer.readinto(None)
with self.assertRaises(io.UnsupportedOperation):
writer.fileno()
self.assertFalse(writer.closed)
def test_fileno_file(self):
with tempfile.TemporaryFile('wb') as tf:
cctx = zstd.ZstdCompressor()
writer = cctx.stream_writer(tf)
self.assertEqual(writer.fileno(), tf.fileno())
def test_close(self):
buffer = NonClosingBytesIO()
cctx = zstd.ZstdCompressor(level=1)
writer = cctx.stream_writer(buffer)
writer.write(b'foo' * 1024)
self.assertFalse(writer.closed)
self.assertFalse(buffer.closed)
writer.close()
self.assertTrue(writer.closed)
self.assertTrue(buffer.closed)
with self.assertRaisesRegexp(ValueError, 'stream is closed'):
writer.write(b'foo')
with self.assertRaisesRegexp(ValueError, 'stream is closed'):
writer.flush()
with self.assertRaisesRegexp(ValueError, 'stream is closed'):
with writer:
pass
self.assertEqual(buffer.getvalue(),
b'\x28\xb5\x2f\xfd\x00\x48\x55\x00\x00\x18\x66\x6f'
b'\x6f\x01\x00\xfa\xd3\x77\x43')
# Context manager exit should close stream.
buffer = io.BytesIO()
writer = cctx.stream_writer(buffer)
with writer:
writer.write(b'foo')
self.assertTrue(writer.closed)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_empty(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 buffer = NonClosingBytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
with cctx.stream_writer(buffer) as compressor:
compressor.write(b'')
result = buffer.getvalue()
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(result, b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00')
params = zstd.get_frame_parameters(result)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(params.window_size, 524288)
self.assertEqual(params.dict_id, 0)
self.assertFalse(params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 # Test without context manager.
buffer = io.BytesIO()
compressor = cctx.stream_writer(buffer)
self.assertEqual(compressor.write(b''), 0)
self.assertEqual(buffer.getvalue(), b'')
self.assertEqual(compressor.flush(zstd.FLUSH_FRAME), 9)
result = buffer.getvalue()
self.assertEqual(result, b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00')
params = zstd.get_frame_parameters(result)
self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN)
self.assertEqual(params.window_size, 524288)
self.assertEqual(params.dict_id, 0)
self.assertFalse(params.has_checksum)
# Test write_return_read=True
compressor = cctx.stream_writer(buffer, write_return_read=True)
self.assertEqual(compressor.write(b''), 0)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 def test_input_types(self):
expected = b'\x28\xb5\x2f\xfd\x00\x48\x19\x00\x00\x66\x6f\x6f'
cctx = zstd.ZstdCompressor(level=1)
mutable_array = bytearray(3)
mutable_array[:] = b'foo'
sources = [
memoryview(b'foo'),
bytearray(b'foo'),
mutable_array,
]
for source in sources:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 buffer = NonClosingBytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 with cctx.stream_writer(buffer) as compressor:
compressor.write(source)
self.assertEqual(buffer.getvalue(), expected)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 compressor = cctx.stream_writer(buffer, write_return_read=True)
self.assertEqual(compressor.write(source), len(source))
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_multiple_compress(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 buffer = NonClosingBytesIO()
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 cctx = zstd.ZstdCompressor(level=5)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 with cctx.stream_writer(buffer) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foo'), 0)
self.assertEqual(compressor.write(b'bar'), 0)
self.assertEqual(compressor.write(b'x' * 8192), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
result = buffer.getvalue()
self.assertEqual(result,
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 b'\x28\xb5\x2f\xfd\x00\x58\x75\x00\x00\x38\x66\x6f'
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 b'\x6f\x62\x61\x72\x78\x01\x00\xfc\xdf\x03\x23')
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 # Test without context manager.
buffer = io.BytesIO()
compressor = cctx.stream_writer(buffer)
self.assertEqual(compressor.write(b'foo'), 0)
self.assertEqual(compressor.write(b'bar'), 0)
self.assertEqual(compressor.write(b'x' * 8192), 0)
self.assertEqual(compressor.flush(zstd.FLUSH_FRAME), 23)
result = buffer.getvalue()
self.assertEqual(result,
b'\x28\xb5\x2f\xfd\x00\x58\x75\x00\x00\x38\x66\x6f'
b'\x6f\x62\x61\x72\x78\x01\x00\xfc\xdf\x03\x23')
# Test with write_return_read=True.
compressor = cctx.stream_writer(buffer, write_return_read=True)
self.assertEqual(compressor.write(b'foo'), 3)
self.assertEqual(compressor.write(b'barbiz'), 6)
self.assertEqual(compressor.write(b'x' * 8192), 8192)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_dictionary(self):
samples = []
for i in range(128):
samples.append(b'foo' * 64)
samples.append(b'bar' * 64)
samples.append(b'foobar' * 64)
d = zstd.train_dictionary(8192, samples)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 h = hashlib.sha1(d.as_bytes()).hexdigest()
Gregory Szorc
zstandard: vendor python-zstandard 0.12...
r43207 self.assertEqual(h, '7a2e59a876db958f74257141045af8f912e00d4e')
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 buffer = NonClosingBytesIO()
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 cctx = zstd.ZstdCompressor(level=9, dict_data=d)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 with cctx.stream_writer(buffer) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foo'), 0)
self.assertEqual(compressor.write(b'bar'), 0)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertEqual(compressor.write(b'foo' * 16384), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
compressed = buffer.getvalue()
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895
params = zstd.get_frame_parameters(compressed)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN)
self.assertEqual(params.window_size, 2097152)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(params.dict_id, d.dict_id())
self.assertFalse(params.has_checksum)
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
h = hashlib.sha1(compressed).hexdigest()
Gregory Szorc
zstandard: vendor python-zstandard 0.12...
r43207 self.assertEqual(h, '0a7c05635061f58039727cdbe76388c6f4cfef06')
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
source = b'foo' + b'bar' + (b'foo' * 16384)
dctx = zstd.ZstdDecompressor(dict_data=d)
self.assertEqual(dctx.decompress(compressed, max_output_size=len(source)),
source)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
def test_compression_params(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 params = zstd.ZstdCompressionParameters(
window_log=20,
chain_log=6,
hash_log=12,
min_match=5,
search_log=4,
target_length=10,
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 strategy=zstd.STRATEGY_FAST)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 buffer = NonClosingBytesIO()
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 cctx = zstd.ZstdCompressor(compression_params=params)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 with cctx.stream_writer(buffer) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foo'), 0)
self.assertEqual(compressor.write(b'bar'), 0)
self.assertEqual(compressor.write(b'foobar' * 16384), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
compressed = buffer.getvalue()
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895
params = zstd.get_frame_parameters(compressed)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(params.window_size, 1048576)
self.assertEqual(params.dict_id, 0)
self.assertFalse(params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 h = hashlib.sha1(compressed).hexdigest()
Gregory Szorc
zstandard: vendor python-zstandard 0.12...
r43207 self.assertEqual(h, 'dd4bb7d37c1a0235b38a2f6b462814376843ef0b')
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
def test_write_checksum(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 no_checksum = NonClosingBytesIO()
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 cctx = zstd.ZstdCompressor(level=1)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 with cctx.stream_writer(no_checksum) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foobar'), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 with_checksum = NonClosingBytesIO()
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 cctx = zstd.ZstdCompressor(level=1, write_checksum=True)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 with cctx.stream_writer(with_checksum) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foobar'), 0)
no_params = zstd.get_frame_parameters(no_checksum.getvalue())
with_params = zstd.get_frame_parameters(with_checksum.getvalue())
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
self.assertEqual(with_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(no_params.dict_id, 0)
self.assertEqual(with_params.dict_id, 0)
self.assertFalse(no_params.has_checksum)
self.assertTrue(with_params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
self.assertEqual(len(with_checksum.getvalue()),
len(no_checksum.getvalue()) + 4)
def test_write_content_size(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 no_size = NonClosingBytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
with cctx.stream_writer(no_size) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foobar' * 256), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 with_size = NonClosingBytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor(level=1)
with cctx.stream_writer(with_size) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foobar' * 256), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
# Source size is not known in streaming mode, so header not
# written.
self.assertEqual(len(with_size.getvalue()),
len(no_size.getvalue()))
# Declaring size will write the header.
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 with_size = NonClosingBytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 with cctx.stream_writer(with_size, size=len(b'foobar' * 256)) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foobar' * 256), 0)
no_params = zstd.get_frame_parameters(no_size.getvalue())
with_params = zstd.get_frame_parameters(with_size.getvalue())
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(with_params.content_size, 1536)
self.assertEqual(no_params.dict_id, 0)
self.assertEqual(with_params.dict_id, 0)
self.assertFalse(no_params.has_checksum)
self.assertFalse(with_params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
self.assertEqual(len(with_size.getvalue()),
len(no_size.getvalue()) + 1)
def test_no_dict_id(self):
samples = []
for i in range(128):
samples.append(b'foo' * 64)
samples.append(b'bar' * 64)
samples.append(b'foobar' * 64)
d = zstd.train_dictionary(1024, samples)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 with_dict_id = NonClosingBytesIO()
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 cctx = zstd.ZstdCompressor(level=1, dict_data=d)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 with cctx.stream_writer(with_dict_id) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foobarfoobar'), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertEqual(with_dict_id.getvalue()[4:5], b'\x03')
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 cctx = zstd.ZstdCompressor(level=1, dict_data=d, write_dict_id=False)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 no_dict_id = NonClosingBytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 with cctx.stream_writer(no_dict_id) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foobarfoobar'), 0)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertEqual(no_dict_id.getvalue()[4:5], b'\x00')
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 no_params = zstd.get_frame_parameters(no_dict_id.getvalue())
with_params = zstd.get_frame_parameters(with_dict_id.getvalue())
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
self.assertEqual(with_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(no_params.dict_id, 0)
self.assertEqual(with_params.dict_id, d.dict_id())
self.assertFalse(no_params.has_checksum)
self.assertFalse(with_params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
self.assertEqual(len(with_dict_id.getvalue()),
len(no_dict_id.getvalue()) + 4)
def test_memory_size(self):
cctx = zstd.ZstdCompressor(level=3)
buffer = io.BytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 with cctx.stream_writer(buffer) as compressor:
compressor.write(b'foo')
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 size = compressor.memory_size()
self.assertGreater(size, 100000)
def test_write_size(self):
cctx = zstd.ZstdCompressor(level=3)
dest = OpCountingBytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 with cctx.stream_writer(dest, write_size=1) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foo'), 0)
self.assertEqual(compressor.write(b'bar'), 0)
self.assertEqual(compressor.write(b'foobar'), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
self.assertEqual(len(dest.getvalue()), dest._write_count)
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822 def test_flush_repeated(self):
cctx = zstd.ZstdCompressor(level=3)
dest = OpCountingBytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 with cctx.stream_writer(dest) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foo'), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822 self.assertEqual(dest._write_count, 0)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.flush(), 12)
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822 self.assertEqual(dest._write_count, 1)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'bar'), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822 self.assertEqual(dest._write_count, 1)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.flush(), 6)
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822 self.assertEqual(dest._write_count, 2)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'baz'), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822
self.assertEqual(dest._write_count, 3)
def test_flush_empty_block(self):
cctx = zstd.ZstdCompressor(level=3, write_checksum=True)
dest = OpCountingBytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 with cctx.stream_writer(dest) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foobar' * 8192), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822 count = dest._write_count
offset = dest.tell()
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.flush(), 23)
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822 self.assertGreater(dest._write_count, count)
self.assertGreater(dest.tell(), offset)
offset = dest.tell()
# Ending the write here should cause an empty block to be written
# to denote end of frame.
trailing = dest.getvalue()[offset:]
# 3 bytes block header + 4 bytes frame checksum
self.assertEqual(len(trailing), 7)
header = trailing[0:3]
self.assertEqual(header, b'\x01\x00\x00')
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 def test_flush_frame(self):
cctx = zstd.ZstdCompressor(level=3)
dest = OpCountingBytesIO()
with cctx.stream_writer(dest) as compressor:
self.assertEqual(compressor.write(b'foobar' * 8192), 0)
self.assertEqual(compressor.flush(zstd.FLUSH_FRAME), 23)
compressor.write(b'biz' * 16384)
self.assertEqual(dest.getvalue(),
# Frame 1.
b'\x28\xb5\x2f\xfd\x00\x58\x75\x00\x00\x30\x66\x6f\x6f'
b'\x62\x61\x72\x01\x00\xf7\xbf\xe8\xa5\x08'
# Frame 2.
b'\x28\xb5\x2f\xfd\x00\x58\x5d\x00\x00\x18\x62\x69\x7a'
b'\x01\x00\xfa\x3f\x75\x37\x04')
def test_bad_flush_mode(self):
cctx = zstd.ZstdCompressor()
dest = io.BytesIO()
with cctx.stream_writer(dest) as compressor:
with self.assertRaisesRegexp(ValueError, 'unknown flush_mode: 42'):
compressor.flush(flush_mode=42)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_multithreaded(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 dest = NonClosingBytesIO()
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 cctx = zstd.ZstdCompressor(threads=2)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 with cctx.stream_writer(dest) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 compressor.write(b'a' * 1048576)
compressor.write(b'b' * 1048576)
compressor.write(b'c' * 1048576)
self.assertEqual(len(dest.getvalue()), 295)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 def test_tell(self):
dest = io.BytesIO()
cctx = zstd.ZstdCompressor()
with cctx.stream_writer(dest) as compressor:
self.assertEqual(compressor.tell(), 0)
for i in range(256):
compressor.write(b'foo' * (i + 1))
self.assertEqual(compressor.tell(), dest.tell())
def test_bad_size(self):
cctx = zstd.ZstdCompressor()
dest = io.BytesIO()
with self.assertRaisesRegexp(zstd.ZstdError, 'Src size is incorrect'):
with cctx.stream_writer(dest, size=2) as compressor:
compressor.write(b'foo')
# Test another operation.
with cctx.stream_writer(dest, size=42):
pass
def test_tarfile_compat(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 dest = NonClosingBytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor()
with cctx.stream_writer(dest) as compressor:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 with tarfile.open('tf', mode='w|', fileobj=compressor) as tf:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 tf.add(__file__, 'test_compressor.py')
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 dest = io.BytesIO(dest.getvalue())
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
dctx = zstd.ZstdDecompressor()
with dctx.stream_reader(dest) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 with tarfile.open(mode='r|', fileobj=reader) as tf:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 for member in tf:
self.assertEqual(member.name, 'test_compressor.py')
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 class TestCompressor_read_to_iter(unittest.TestCase):
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_type_validation(self):
cctx = zstd.ZstdCompressor()
# Object with read() works.
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 for chunk in cctx.read_to_iter(io.BytesIO()):
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 pass
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
# Buffer protocol works.
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 for chunk in cctx.read_to_iter(b'foobar'):
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 pass
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
with self.assertRaisesRegexp(ValueError, 'must pass an object with a read'):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 for chunk in cctx.read_to_iter(True):
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 pass
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
def test_read_empty(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
source = io.BytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 it = cctx.read_to_iter(source)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 chunks = list(it)
self.assertEqual(len(chunks), 1)
compressed = b''.join(chunks)
self.assertEqual(compressed, b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00')
# And again with the buffer protocol.
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 it = cctx.read_to_iter(b'')
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 chunks = list(it)
self.assertEqual(len(chunks), 1)
compressed2 = b''.join(chunks)
self.assertEqual(compressed2, compressed)
def test_read_large(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
source = io.BytesIO()
source.write(b'f' * zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE)
source.write(b'o')
source.seek(0)
# Creating an iterator should not perform any compression until
# first read.
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 it = cctx.read_to_iter(source, size=len(source.getvalue()))
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 self.assertEqual(source.tell(), 0)
# We should have exactly 2 output chunks.
chunks = []
chunk = next(it)
self.assertIsNotNone(chunk)
self.assertEqual(source.tell(), zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE)
chunks.append(chunk)
chunk = next(it)
self.assertIsNotNone(chunk)
chunks.append(chunk)
self.assertEqual(source.tell(), len(source.getvalue()))
with self.assertRaises(StopIteration):
next(it)
# And again for good measure.
with self.assertRaises(StopIteration):
next(it)
# We should get the same output as the one-shot compression mechanism.
self.assertEqual(b''.join(chunks), cctx.compress(source.getvalue()))
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 params = zstd.get_frame_parameters(b''.join(chunks))
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(params.window_size, 262144)
self.assertEqual(params.dict_id, 0)
self.assertFalse(params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 # Now check the buffer protocol.
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 it = cctx.read_to_iter(source.getvalue())
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 chunks = list(it)
self.assertEqual(len(chunks), 2)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
params = zstd.get_frame_parameters(b''.join(chunks))
self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN)
#self.assertEqual(params.window_size, 262144)
self.assertEqual(params.dict_id, 0)
self.assertFalse(params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 self.assertEqual(b''.join(chunks), cctx.compress(source.getvalue()))
def test_read_write_size(self):
source = OpCountingBytesIO(b'foobarfoobar')
cctx = zstd.ZstdCompressor(level=3)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 for chunk in cctx.read_to_iter(source, read_size=1, write_size=1):
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 self.assertEqual(len(chunk), 1)
self.assertEqual(source._read_count, len(source.getvalue()) + 1)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
def test_multithreaded(self):
source = io.BytesIO()
source.write(b'a' * 1048576)
source.write(b'b' * 1048576)
source.write(b'c' * 1048576)
source.seek(0)
cctx = zstd.ZstdCompressor(threads=2)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 compressed = b''.join(cctx.read_to_iter(source))
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 self.assertEqual(len(compressed), 295)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 def test_bad_size(self):
cctx = zstd.ZstdCompressor()
source = io.BytesIO(b'a' * 42)
with self.assertRaisesRegexp(zstd.ZstdError, 'Src size is incorrect'):
b''.join(cctx.read_to_iter(source, size=2))
# Test another operation on errored compressor.
b''.join(cctx.read_to_iter(source))
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 @make_cffi
class TestCompressor_chunker(unittest.TestCase):
def test_empty(self):
cctx = zstd.ZstdCompressor(write_content_size=False)
chunker = cctx.chunker()
it = chunker.compress(b'')
with self.assertRaises(StopIteration):
next(it)
it = chunker.finish()
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 self.assertEqual(next(it), b'\x28\xb5\x2f\xfd\x00\x58\x01\x00\x00')
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
with self.assertRaises(StopIteration):
next(it)
def test_simple_input(self):
cctx = zstd.ZstdCompressor()
chunker = cctx.chunker()
it = chunker.compress(b'foobar')
with self.assertRaises(StopIteration):
next(it)
it = chunker.compress(b'baz' * 30)
with self.assertRaises(StopIteration):
next(it)
it = chunker.finish()
self.assertEqual(next(it),
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 b'\x28\xb5\x2f\xfd\x00\x58\x7d\x00\x00\x48\x66\x6f'
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 b'\x6f\x62\x61\x72\x62\x61\x7a\x01\x00\xe4\xe4\x8e')
with self.assertRaises(StopIteration):
next(it)
def test_input_size(self):
cctx = zstd.ZstdCompressor()
chunker = cctx.chunker(size=1024)
it = chunker.compress(b'x' * 1000)
with self.assertRaises(StopIteration):
next(it)
it = chunker.compress(b'y' * 24)
with self.assertRaises(StopIteration):
next(it)
chunks = list(chunker.finish())
self.assertEqual(chunks, [
b'\x28\xb5\x2f\xfd\x60\x00\x03\x65\x00\x00\x18\x78\x78\x79\x02\x00'
b'\xa0\x16\xe3\x2b\x80\x05'
])
dctx = zstd.ZstdDecompressor()
self.assertEqual(dctx.decompress(b''.join(chunks)),
(b'x' * 1000) + (b'y' * 24))
def test_small_chunk_size(self):
cctx = zstd.ZstdCompressor()
chunker = cctx.chunker(chunk_size=1)
chunks = list(chunker.compress(b'foo' * 1024))
self.assertEqual(chunks, [])
chunks = list(chunker.finish())
self.assertTrue(all(len(chunk) == 1 for chunk in chunks))
self.assertEqual(
b''.join(chunks),
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 b'\x28\xb5\x2f\xfd\x00\x58\x55\x00\x00\x18\x66\x6f\x6f\x01\x00'
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 b'\xfa\xd3\x77\x43')
dctx = zstd.ZstdDecompressor()
self.assertEqual(dctx.decompress(b''.join(chunks),
max_output_size=10000),
b'foo' * 1024)
def test_input_types(self):
cctx = zstd.ZstdCompressor()
mutable_array = bytearray(3)
mutable_array[:] = b'foo'
sources = [
memoryview(b'foo'),
bytearray(b'foo'),
mutable_array,
]
for source in sources:
chunker = cctx.chunker()
self.assertEqual(list(chunker.compress(source)), [])
self.assertEqual(list(chunker.finish()), [
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 b'\x28\xb5\x2f\xfd\x00\x58\x19\x00\x00\x66\x6f\x6f'
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 ])
def test_flush(self):
cctx = zstd.ZstdCompressor()
chunker = cctx.chunker()
self.assertEqual(list(chunker.compress(b'foo' * 1024)), [])
self.assertEqual(list(chunker.compress(b'bar' * 1024)), [])
chunks1 = list(chunker.flush())
self.assertEqual(chunks1, [
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 b'\x28\xb5\x2f\xfd\x00\x58\x8c\x00\x00\x30\x66\x6f\x6f\x62\x61\x72'
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 b'\x02\x00\xfa\x03\xfe\xd0\x9f\xbe\x1b\x02'
])
self.assertEqual(list(chunker.flush()), [])
self.assertEqual(list(chunker.flush()), [])
self.assertEqual(list(chunker.compress(b'baz' * 1024)), [])
chunks2 = list(chunker.flush())
self.assertEqual(len(chunks2), 1)
chunks3 = list(chunker.finish())
self.assertEqual(len(chunks2), 1)
dctx = zstd.ZstdDecompressor()
self.assertEqual(dctx.decompress(b''.join(chunks1 + chunks2 + chunks3),
max_output_size=10000),
(b'foo' * 1024) + (b'bar' * 1024) + (b'baz' * 1024))
def test_compress_after_finish(self):
cctx = zstd.ZstdCompressor()
chunker = cctx.chunker()
list(chunker.compress(b'foo'))
list(chunker.finish())
with self.assertRaisesRegexp(
zstd.ZstdError,
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 r'cannot call compress\(\) after compression finished'):
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 list(chunker.compress(b'foo'))
def test_flush_after_finish(self):
cctx = zstd.ZstdCompressor()
chunker = cctx.chunker()
list(chunker.compress(b'foo'))
list(chunker.finish())
with self.assertRaisesRegexp(
zstd.ZstdError,
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 r'cannot call flush\(\) after compression finished'):
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 list(chunker.flush())
def test_finish_after_finish(self):
cctx = zstd.ZstdCompressor()
chunker = cctx.chunker()
list(chunker.compress(b'foo'))
list(chunker.finish())
with self.assertRaisesRegexp(
zstd.ZstdError,
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 r'cannot call finish\(\) after compression finished'):
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 list(chunker.finish())
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 class TestCompressor_multi_compress_to_buffer(unittest.TestCase):
def test_invalid_inputs(self):
cctx = zstd.ZstdCompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if not hasattr(cctx, 'multi_compress_to_buffer'):
self.skipTest('multi_compress_to_buffer not available')
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 with self.assertRaises(TypeError):
cctx.multi_compress_to_buffer(True)
with self.assertRaises(TypeError):
cctx.multi_compress_to_buffer((1, 2))
with self.assertRaisesRegexp(TypeError, 'item 0 not a bytes like object'):
cctx.multi_compress_to_buffer([u'foo'])
def test_empty_input(self):
cctx = zstd.ZstdCompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if not hasattr(cctx, 'multi_compress_to_buffer'):
self.skipTest('multi_compress_to_buffer not available')
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 with self.assertRaisesRegexp(ValueError, 'no source elements found'):
cctx.multi_compress_to_buffer([])
with self.assertRaisesRegexp(ValueError, 'source elements are empty'):
cctx.multi_compress_to_buffer([b'', b'', b''])
def test_list_input(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor(write_checksum=True)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if not hasattr(cctx, 'multi_compress_to_buffer'):
self.skipTest('multi_compress_to_buffer not available')
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 original = [b'foo' * 12, b'bar' * 6]
frames = [cctx.compress(c) for c in original]
b = cctx.multi_compress_to_buffer(original)
self.assertIsInstance(b, zstd.BufferWithSegmentsCollection)
self.assertEqual(len(b), 2)
self.assertEqual(b.size(), 44)
self.assertEqual(b[0].tobytes(), frames[0])
self.assertEqual(b[1].tobytes(), frames[1])
def test_buffer_with_segments_input(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor(write_checksum=True)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if not hasattr(cctx, 'multi_compress_to_buffer'):
self.skipTest('multi_compress_to_buffer not available')
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 original = [b'foo' * 4, b'bar' * 6]
frames = [cctx.compress(c) for c in original]
offsets = struct.pack('=QQQQ', 0, len(original[0]),
len(original[0]), len(original[1]))
segments = zstd.BufferWithSegments(b''.join(original), offsets)
result = cctx.multi_compress_to_buffer(segments)
self.assertEqual(len(result), 2)
self.assertEqual(result.size(), 47)
self.assertEqual(result[0].tobytes(), frames[0])
self.assertEqual(result[1].tobytes(), frames[1])
def test_buffer_with_segments_collection_input(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor(write_checksum=True)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if not hasattr(cctx, 'multi_compress_to_buffer'):
self.skipTest('multi_compress_to_buffer not available')
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 original = [
b'foo1',
b'foo2' * 2,
b'foo3' * 3,
b'foo4' * 4,
b'foo5' * 5,
]
frames = [cctx.compress(c) for c in original]
b = b''.join([original[0], original[1]])
b1 = zstd.BufferWithSegments(b, struct.pack('=QQQQ',
0, len(original[0]),
len(original[0]), len(original[1])))
b = b''.join([original[2], original[3], original[4]])
b2 = zstd.BufferWithSegments(b, struct.pack('=QQQQQQ',
0, len(original[2]),
len(original[2]), len(original[3]),
len(original[2]) + len(original[3]), len(original[4])))
c = zstd.BufferWithSegmentsCollection(b1, b2)
result = cctx.multi_compress_to_buffer(c)
self.assertEqual(len(result), len(frames))
for i, frame in enumerate(frames):
self.assertEqual(result[i].tobytes(), frame)
def test_multiple_threads(self):
# threads argument will cause multi-threaded ZSTD APIs to be used, which will
# make output different.
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 refcctx = zstd.ZstdCompressor(write_checksum=True)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 reference = [refcctx.compress(b'x' * 64), refcctx.compress(b'y' * 64)]
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor(write_checksum=True)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if not hasattr(cctx, 'multi_compress_to_buffer'):
self.skipTest('multi_compress_to_buffer not available')
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 frames = []
frames.extend(b'x' * 64 for i in range(256))
frames.extend(b'y' * 64 for i in range(256))
result = cctx.multi_compress_to_buffer(frames, threads=-1)
self.assertEqual(len(result), 512)
for i in range(512):
if i < 256:
self.assertEqual(result[i].tobytes(), reference[0])
else:
self.assertEqual(result[i].tobytes(), reference[1])