##// END OF EJS Templates
match: simplify nevermatcher...
match: simplify nevermatcher Most of it does the same as its superclass, so it can simply be removed. It also seems to make more sense for it to use relative paths, as we do for everything except alwaysmatcher, although nevermatcher.uipath() will probably never get called anyway, so it won't matter.

File last commit:

r31796:e0dc4053 default
r32650:783394c0 default
Show More
test_compressor.py
905 lines | 32.4 KiB | text/x-python | PythonLexer
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 import hashlib
import io
import struct
import sys
try:
import unittest2 as unittest
except ImportError:
import unittest
import zstd
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 from .common import (
make_cffi,
OpCountingBytesIO,
)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
if sys.version_info[0] >= 3:
next = lambda it: it.__next__()
else:
next = lambda it: it.next()
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def multithreaded_chunk_size(level, source_size=0):
params = zstd.get_compression_parameters(level, source_size)
return 1 << (params.window_log + 2)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 @make_cffi
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 class TestCompressor(unittest.TestCase):
def test_level_bounds(self):
with self.assertRaises(ValueError):
zstd.ZstdCompressor(level=0)
with self.assertRaises(ValueError):
zstd.ZstdCompressor(level=23)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 @make_cffi
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 class TestCompressor_compress(unittest.TestCase):
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_multithreaded_unsupported(self):
samples = []
for i in range(128):
samples.append(b'foo' * 64)
samples.append(b'bar' * 64)
d = zstd.train_dictionary(8192, samples)
cctx = zstd.ZstdCompressor(dict_data=d, threads=2)
with self.assertRaisesRegexp(zstd.ZstdError, 'compress\(\) cannot be used with both dictionaries and multi-threaded compression'):
cctx.compress(b'foo')
params = zstd.get_compression_parameters(3)
cctx = zstd.ZstdCompressor(compression_params=params, threads=2)
with self.assertRaisesRegexp(zstd.ZstdError, 'compress\(\) cannot be used with both compression parameters and multi-threaded compression'):
cctx.compress(b'foo')
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_compress_empty(self):
cctx = zstd.ZstdCompressor(level=1)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 result = cctx.compress(b'')
self.assertEqual(result, b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00')
params = zstd.get_frame_parameters(result)
self.assertEqual(params.content_size, 0)
self.assertEqual(params.window_size, 524288)
self.assertEqual(params.dict_id, 0)
self.assertFalse(params.has_checksum, 0)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822 # TODO should be temporary until https://github.com/facebook/zstd/issues/506
# is fixed.
cctx = zstd.ZstdCompressor(write_content_size=True)
with self.assertRaises(ValueError):
cctx.compress(b'')
cctx.compress(b'', allow_empty=True)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_compress_large(self):
chunks = []
for i in range(255):
chunks.append(struct.Struct('>B').pack(i) * 16384)
cctx = zstd.ZstdCompressor(level=3)
result = cctx.compress(b''.join(chunks))
self.assertEqual(len(result), 999)
self.assertEqual(result[0:4], b'\x28\xb5\x2f\xfd')
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 # This matches the test for read_from() below.
cctx = zstd.ZstdCompressor(level=1)
result = cctx.compress(b'f' * zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE + b'o')
self.assertEqual(result, b'\x28\xb5\x2f\xfd\x00\x40\x54\x00\x00'
b'\x10\x66\x66\x01\x00\xfb\xff\x39\xc0'
b'\x02\x09\x00\x00\x6f')
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_write_checksum(self):
cctx = zstd.ZstdCompressor(level=1)
no_checksum = cctx.compress(b'foobar')
cctx = zstd.ZstdCompressor(level=1, write_checksum=True)
with_checksum = cctx.compress(b'foobar')
self.assertEqual(len(with_checksum), len(no_checksum) + 4)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 no_params = zstd.get_frame_parameters(no_checksum)
with_params = zstd.get_frame_parameters(with_checksum)
self.assertFalse(no_params.has_checksum)
self.assertTrue(with_params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_write_content_size(self):
cctx = zstd.ZstdCompressor(level=1)
no_size = cctx.compress(b'foobar' * 256)
cctx = zstd.ZstdCompressor(level=1, write_content_size=True)
with_size = cctx.compress(b'foobar' * 256)
self.assertEqual(len(with_size), len(no_size) + 1)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 no_params = zstd.get_frame_parameters(no_size)
with_params = zstd.get_frame_parameters(with_size)
self.assertEqual(no_params.content_size, 0)
self.assertEqual(with_params.content_size, 1536)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_no_dict_id(self):
samples = []
for i in range(128):
samples.append(b'foo' * 64)
samples.append(b'bar' * 64)
samples.append(b'foobar' * 64)
d = zstd.train_dictionary(1024, samples)
cctx = zstd.ZstdCompressor(level=1, dict_data=d)
with_dict_id = cctx.compress(b'foobarfoobar')
cctx = zstd.ZstdCompressor(level=1, dict_data=d, write_dict_id=False)
no_dict_id = cctx.compress(b'foobarfoobar')
self.assertEqual(len(with_dict_id), len(no_dict_id) + 4)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 no_params = zstd.get_frame_parameters(no_dict_id)
with_params = zstd.get_frame_parameters(with_dict_id)
self.assertEqual(no_params.dict_id, 0)
self.assertEqual(with_params.dict_id, 1584102229)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_compress_dict_multiple(self):
samples = []
for i in range(128):
samples.append(b'foo' * 64)
samples.append(b'bar' * 64)
samples.append(b'foobar' * 64)
d = zstd.train_dictionary(8192, samples)
cctx = zstd.ZstdCompressor(level=1, dict_data=d)
for i in range(32):
cctx.compress(b'foo bar foobar foo bar foobar')
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_multithreaded(self):
chunk_size = multithreaded_chunk_size(1)
source = b''.join([b'x' * chunk_size, b'y' * chunk_size])
cctx = zstd.ZstdCompressor(level=1, threads=2)
compressed = cctx.compress(source)
params = zstd.get_frame_parameters(compressed)
self.assertEqual(params.content_size, chunk_size * 2)
self.assertEqual(params.dict_id, 0)
self.assertFalse(params.has_checksum)
dctx = zstd.ZstdDecompressor()
self.assertEqual(dctx.decompress(compressed), source)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 @make_cffi
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 class TestCompressor_compressobj(unittest.TestCase):
def test_compressobj_empty(self):
cctx = zstd.ZstdCompressor(level=1)
cobj = cctx.compressobj()
self.assertEqual(cobj.compress(b''), b'')
self.assertEqual(cobj.flush(),
b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00')
def test_compressobj_large(self):
chunks = []
for i in range(255):
chunks.append(struct.Struct('>B').pack(i) * 16384)
cctx = zstd.ZstdCompressor(level=3)
cobj = cctx.compressobj()
result = cobj.compress(b''.join(chunks)) + cobj.flush()
self.assertEqual(len(result), 999)
self.assertEqual(result[0:4], b'\x28\xb5\x2f\xfd')
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 params = zstd.get_frame_parameters(result)
self.assertEqual(params.content_size, 0)
self.assertEqual(params.window_size, 1048576)
self.assertEqual(params.dict_id, 0)
self.assertFalse(params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_write_checksum(self):
cctx = zstd.ZstdCompressor(level=1)
cobj = cctx.compressobj()
no_checksum = cobj.compress(b'foobar') + cobj.flush()
cctx = zstd.ZstdCompressor(level=1, write_checksum=True)
cobj = cctx.compressobj()
with_checksum = cobj.compress(b'foobar') + cobj.flush()
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 no_params = zstd.get_frame_parameters(no_checksum)
with_params = zstd.get_frame_parameters(with_checksum)
self.assertEqual(no_params.content_size, 0)
self.assertEqual(with_params.content_size, 0)
self.assertEqual(no_params.dict_id, 0)
self.assertEqual(with_params.dict_id, 0)
self.assertFalse(no_params.has_checksum)
self.assertTrue(with_params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 self.assertEqual(len(with_checksum), len(no_checksum) + 4)
def test_write_content_size(self):
cctx = zstd.ZstdCompressor(level=1)
cobj = cctx.compressobj(size=len(b'foobar' * 256))
no_size = cobj.compress(b'foobar' * 256) + cobj.flush()
cctx = zstd.ZstdCompressor(level=1, write_content_size=True)
cobj = cctx.compressobj(size=len(b'foobar' * 256))
with_size = cobj.compress(b'foobar' * 256) + cobj.flush()
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 no_params = zstd.get_frame_parameters(no_size)
with_params = zstd.get_frame_parameters(with_size)
self.assertEqual(no_params.content_size, 0)
self.assertEqual(with_params.content_size, 1536)
self.assertEqual(no_params.dict_id, 0)
self.assertEqual(with_params.dict_id, 0)
self.assertFalse(no_params.has_checksum)
self.assertFalse(with_params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 self.assertEqual(len(with_size), len(no_size) + 1)
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822 def test_compress_after_finished(self):
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 cctx = zstd.ZstdCompressor()
cobj = cctx.compressobj()
cobj.compress(b'foo')
cobj.flush()
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822 with self.assertRaisesRegexp(zstd.ZstdError, 'cannot call compress\(\) after compressor'):
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 cobj.compress(b'foo')
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822 with self.assertRaisesRegexp(zstd.ZstdError, 'compressor object already finished'):
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 cobj.flush()
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822 def test_flush_block_repeated(self):
cctx = zstd.ZstdCompressor(level=1)
cobj = cctx.compressobj()
self.assertEqual(cobj.compress(b'foo'), b'')
self.assertEqual(cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK),
b'\x28\xb5\x2f\xfd\x00\x48\x18\x00\x00foo')
self.assertEqual(cobj.compress(b'bar'), b'')
# 3 byte header plus content.
self.assertEqual(cobj.flush(), b'\x19\x00\x00bar')
def test_flush_empty_block(self):
cctx = zstd.ZstdCompressor(write_checksum=True)
cobj = cctx.compressobj()
cobj.compress(b'foobar')
cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK)
# No-op if no block is active (this is internal to zstd).
self.assertEqual(cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), b'')
trailing = cobj.flush()
# 3 bytes block header + 4 bytes frame checksum
self.assertEqual(len(trailing), 7)
header = trailing[0:3]
self.assertEqual(header, b'\x01\x00\x00')
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_multithreaded(self):
source = io.BytesIO()
source.write(b'a' * 1048576)
source.write(b'b' * 1048576)
source.write(b'c' * 1048576)
source.seek(0)
cctx = zstd.ZstdCompressor(level=1, threads=2)
cobj = cctx.compressobj()
chunks = []
while True:
d = source.read(8192)
if not d:
break
chunks.append(cobj.compress(d))
chunks.append(cobj.flush())
compressed = b''.join(chunks)
self.assertEqual(len(compressed), 295)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 @make_cffi
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 class TestCompressor_copy_stream(unittest.TestCase):
def test_no_read(self):
source = object()
dest = io.BytesIO()
cctx = zstd.ZstdCompressor()
with self.assertRaises(ValueError):
cctx.copy_stream(source, dest)
def test_no_write(self):
source = io.BytesIO()
dest = object()
cctx = zstd.ZstdCompressor()
with self.assertRaises(ValueError):
cctx.copy_stream(source, dest)
def test_empty(self):
source = io.BytesIO()
dest = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1)
r, w = cctx.copy_stream(source, dest)
self.assertEqual(int(r), 0)
self.assertEqual(w, 9)
self.assertEqual(dest.getvalue(),
b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00')
def test_large_data(self):
source = io.BytesIO()
for i in range(255):
source.write(struct.Struct('>B').pack(i) * 16384)
source.seek(0)
dest = io.BytesIO()
cctx = zstd.ZstdCompressor()
r, w = cctx.copy_stream(source, dest)
self.assertEqual(r, 255 * 16384)
self.assertEqual(w, 999)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 params = zstd.get_frame_parameters(dest.getvalue())
self.assertEqual(params.content_size, 0)
self.assertEqual(params.window_size, 1048576)
self.assertEqual(params.dict_id, 0)
self.assertFalse(params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_write_checksum(self):
source = io.BytesIO(b'foobar')
no_checksum = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1)
cctx.copy_stream(source, no_checksum)
source.seek(0)
with_checksum = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1, write_checksum=True)
cctx.copy_stream(source, with_checksum)
self.assertEqual(len(with_checksum.getvalue()),
len(no_checksum.getvalue()) + 4)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 no_params = zstd.get_frame_parameters(no_checksum.getvalue())
with_params = zstd.get_frame_parameters(with_checksum.getvalue())
self.assertEqual(no_params.content_size, 0)
self.assertEqual(with_params.content_size, 0)
self.assertEqual(no_params.dict_id, 0)
self.assertEqual(with_params.dict_id, 0)
self.assertFalse(no_params.has_checksum)
self.assertTrue(with_params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_write_content_size(self):
source = io.BytesIO(b'foobar' * 256)
no_size = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1)
cctx.copy_stream(source, no_size)
source.seek(0)
with_size = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1, write_content_size=True)
cctx.copy_stream(source, with_size)
# Source content size is unknown, so no content size written.
self.assertEqual(len(with_size.getvalue()),
len(no_size.getvalue()))
source.seek(0)
with_size = io.BytesIO()
cctx.copy_stream(source, with_size, size=len(source.getvalue()))
# We specified source size, so content size header is present.
self.assertEqual(len(with_size.getvalue()),
len(no_size.getvalue()) + 1)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 no_params = zstd.get_frame_parameters(no_size.getvalue())
with_params = zstd.get_frame_parameters(with_size.getvalue())
self.assertEqual(no_params.content_size, 0)
self.assertEqual(with_params.content_size, 1536)
self.assertEqual(no_params.dict_id, 0)
self.assertEqual(with_params.dict_id, 0)
self.assertFalse(no_params.has_checksum)
self.assertFalse(with_params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_read_write_size(self):
source = OpCountingBytesIO(b'foobarfoobar')
dest = OpCountingBytesIO()
cctx = zstd.ZstdCompressor()
r, w = cctx.copy_stream(source, dest, read_size=1, write_size=1)
self.assertEqual(r, len(source.getvalue()))
self.assertEqual(w, 21)
self.assertEqual(source._read_count, len(source.getvalue()) + 1)
self.assertEqual(dest._write_count, len(dest.getvalue()))
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_multithreaded(self):
source = io.BytesIO()
source.write(b'a' * 1048576)
source.write(b'b' * 1048576)
source.write(b'c' * 1048576)
source.seek(0)
dest = io.BytesIO()
cctx = zstd.ZstdCompressor(threads=2)
r, w = cctx.copy_stream(source, dest)
self.assertEqual(r, 3145728)
self.assertEqual(w, 295)
params = zstd.get_frame_parameters(dest.getvalue())
self.assertEqual(params.content_size, 0)
self.assertEqual(params.dict_id, 0)
self.assertFalse(params.has_checksum)
# Writing content size and checksum works.
cctx = zstd.ZstdCompressor(threads=2, write_content_size=True,
write_checksum=True)
dest = io.BytesIO()
source.seek(0)
cctx.copy_stream(source, dest, size=len(source.getvalue()))
params = zstd.get_frame_parameters(dest.getvalue())
self.assertEqual(params.content_size, 3145728)
self.assertEqual(params.dict_id, 0)
self.assertTrue(params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
def compress(data, level):
buffer = io.BytesIO()
cctx = zstd.ZstdCompressor(level=level)
with cctx.write_to(buffer) as compressor:
compressor.write(data)
return buffer.getvalue()
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 @make_cffi
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 class TestCompressor_write_to(unittest.TestCase):
def test_empty(self):
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 result = compress(b'', 1)
self.assertEqual(result, b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00')
params = zstd.get_frame_parameters(result)
self.assertEqual(params.content_size, 0)
self.assertEqual(params.window_size, 524288)
self.assertEqual(params.dict_id, 0)
self.assertFalse(params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
def test_multiple_compress(self):
buffer = io.BytesIO()
cctx = zstd.ZstdCompressor(level=5)
with cctx.write_to(buffer) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foo'), 0)
self.assertEqual(compressor.write(b'bar'), 0)
self.assertEqual(compressor.write(b'x' * 8192), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
result = buffer.getvalue()
self.assertEqual(result,
b'\x28\xb5\x2f\xfd\x00\x50\x75\x00\x00\x38\x66\x6f'
b'\x6f\x62\x61\x72\x78\x01\x00\xfc\xdf\x03\x23')
def test_dictionary(self):
samples = []
for i in range(128):
samples.append(b'foo' * 64)
samples.append(b'bar' * 64)
samples.append(b'foobar' * 64)
d = zstd.train_dictionary(8192, samples)
buffer = io.BytesIO()
cctx = zstd.ZstdCompressor(level=9, dict_data=d)
with cctx.write_to(buffer) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foo'), 0)
self.assertEqual(compressor.write(b'bar'), 0)
self.assertEqual(compressor.write(b'foo' * 16384), 634)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
compressed = buffer.getvalue()
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895
params = zstd.get_frame_parameters(compressed)
self.assertEqual(params.content_size, 0)
self.assertEqual(params.window_size, 1024)
self.assertEqual(params.dict_id, d.dict_id())
self.assertFalse(params.has_checksum)
self.assertEqual(compressed[0:32],
b'\x28\xb5\x2f\xfd\x03\x00\x55\x7b\x6b\x5e\x54\x00'
b'\x00\x00\x02\xfc\xf4\xa5\xba\x23\x3f\x85\xb3\x54'
b'\x00\x00\x18\x6f\x6f\x66\x01\x00')
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 h = hashlib.sha1(compressed).hexdigest()
self.assertEqual(h, '1c5bcd25181bcd8c1a73ea8773323e0056129f92')
def test_compression_params(self):
params = zstd.CompressionParameters(20, 6, 12, 5, 4, 10, zstd.STRATEGY_FAST)
buffer = io.BytesIO()
cctx = zstd.ZstdCompressor(compression_params=params)
with cctx.write_to(buffer) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foo'), 0)
self.assertEqual(compressor.write(b'bar'), 0)
self.assertEqual(compressor.write(b'foobar' * 16384), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
compressed = buffer.getvalue()
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895
params = zstd.get_frame_parameters(compressed)
self.assertEqual(params.content_size, 0)
self.assertEqual(params.window_size, 1048576)
self.assertEqual(params.dict_id, 0)
self.assertFalse(params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 h = hashlib.sha1(compressed).hexdigest()
self.assertEqual(h, '1ae31f270ed7de14235221a604b31ecd517ebd99')
def test_write_checksum(self):
no_checksum = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1)
with cctx.write_to(no_checksum) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foobar'), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
with_checksum = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1, write_checksum=True)
with cctx.write_to(with_checksum) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foobar'), 0)
no_params = zstd.get_frame_parameters(no_checksum.getvalue())
with_params = zstd.get_frame_parameters(with_checksum.getvalue())
self.assertEqual(no_params.content_size, 0)
self.assertEqual(with_params.content_size, 0)
self.assertEqual(no_params.dict_id, 0)
self.assertEqual(with_params.dict_id, 0)
self.assertFalse(no_params.has_checksum)
self.assertTrue(with_params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
self.assertEqual(len(with_checksum.getvalue()),
len(no_checksum.getvalue()) + 4)
def test_write_content_size(self):
no_size = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1)
with cctx.write_to(no_size) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foobar' * 256), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
with_size = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1, write_content_size=True)
with cctx.write_to(with_size) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foobar' * 256), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
# Source size is not known in streaming mode, so header not
# written.
self.assertEqual(len(with_size.getvalue()),
len(no_size.getvalue()))
# Declaring size will write the header.
with_size = io.BytesIO()
with cctx.write_to(with_size, size=len(b'foobar' * 256)) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foobar' * 256), 0)
no_params = zstd.get_frame_parameters(no_size.getvalue())
with_params = zstd.get_frame_parameters(with_size.getvalue())
self.assertEqual(no_params.content_size, 0)
self.assertEqual(with_params.content_size, 1536)
self.assertEqual(no_params.dict_id, 0)
self.assertEqual(with_params.dict_id, 0)
self.assertFalse(no_params.has_checksum)
self.assertFalse(with_params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
self.assertEqual(len(with_size.getvalue()),
len(no_size.getvalue()) + 1)
def test_no_dict_id(self):
samples = []
for i in range(128):
samples.append(b'foo' * 64)
samples.append(b'bar' * 64)
samples.append(b'foobar' * 64)
d = zstd.train_dictionary(1024, samples)
with_dict_id = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1, dict_data=d)
with cctx.write_to(with_dict_id) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foobarfoobar'), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
cctx = zstd.ZstdCompressor(level=1, dict_data=d, write_dict_id=False)
no_dict_id = io.BytesIO()
with cctx.write_to(no_dict_id) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foobarfoobar'), 0)
no_params = zstd.get_frame_parameters(no_dict_id.getvalue())
with_params = zstd.get_frame_parameters(with_dict_id.getvalue())
self.assertEqual(no_params.content_size, 0)
self.assertEqual(with_params.content_size, 0)
self.assertEqual(no_params.dict_id, 0)
self.assertEqual(with_params.dict_id, d.dict_id())
self.assertFalse(no_params.has_checksum)
self.assertFalse(with_params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
self.assertEqual(len(with_dict_id.getvalue()),
len(no_dict_id.getvalue()) + 4)
def test_memory_size(self):
cctx = zstd.ZstdCompressor(level=3)
buffer = io.BytesIO()
with cctx.write_to(buffer) as compressor:
size = compressor.memory_size()
self.assertGreater(size, 100000)
def test_write_size(self):
cctx = zstd.ZstdCompressor(level=3)
dest = OpCountingBytesIO()
with cctx.write_to(dest, write_size=1) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foo'), 0)
self.assertEqual(compressor.write(b'bar'), 0)
self.assertEqual(compressor.write(b'foobar'), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
self.assertEqual(len(dest.getvalue()), dest._write_count)
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822 def test_flush_repeated(self):
cctx = zstd.ZstdCompressor(level=3)
dest = OpCountingBytesIO()
with cctx.write_to(dest) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foo'), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822 self.assertEqual(dest._write_count, 0)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.flush(), 12)
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822 self.assertEqual(dest._write_count, 1)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'bar'), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822 self.assertEqual(dest._write_count, 1)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.flush(), 6)
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822 self.assertEqual(dest._write_count, 2)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'baz'), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822
self.assertEqual(dest._write_count, 3)
def test_flush_empty_block(self):
cctx = zstd.ZstdCompressor(level=3, write_checksum=True)
dest = OpCountingBytesIO()
with cctx.write_to(dest) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.write(b'foobar' * 8192), 0)
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822 count = dest._write_count
offset = dest.tell()
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 self.assertEqual(compressor.flush(), 23)
Gregory Szorc
zstd: vendor python-zstandard 0.6.0...
r30822 self.assertGreater(dest._write_count, count)
self.assertGreater(dest.tell(), offset)
offset = dest.tell()
# Ending the write here should cause an empty block to be written
# to denote end of frame.
trailing = dest.getvalue()[offset:]
# 3 bytes block header + 4 bytes frame checksum
self.assertEqual(len(trailing), 7)
header = trailing[0:3]
self.assertEqual(header, b'\x01\x00\x00')
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_multithreaded(self):
dest = io.BytesIO()
cctx = zstd.ZstdCompressor(threads=2)
with cctx.write_to(dest) as compressor:
compressor.write(b'a' * 1048576)
compressor.write(b'b' * 1048576)
compressor.write(b'c' * 1048576)
self.assertEqual(len(dest.getvalue()), 295)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 @make_cffi
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 class TestCompressor_read_from(unittest.TestCase):
def test_type_validation(self):
cctx = zstd.ZstdCompressor()
# Object with read() works.
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 for chunk in cctx.read_from(io.BytesIO()):
pass
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
# Buffer protocol works.
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 for chunk in cctx.read_from(b'foobar'):
pass
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
with self.assertRaisesRegexp(ValueError, 'must pass an object with a read'):
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 for chunk in cctx.read_from(True):
pass
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
def test_read_empty(self):
cctx = zstd.ZstdCompressor(level=1)
source = io.BytesIO()
it = cctx.read_from(source)
chunks = list(it)
self.assertEqual(len(chunks), 1)
compressed = b''.join(chunks)
self.assertEqual(compressed, b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00')
# And again with the buffer protocol.
it = cctx.read_from(b'')
chunks = list(it)
self.assertEqual(len(chunks), 1)
compressed2 = b''.join(chunks)
self.assertEqual(compressed2, compressed)
def test_read_large(self):
cctx = zstd.ZstdCompressor(level=1)
source = io.BytesIO()
source.write(b'f' * zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE)
source.write(b'o')
source.seek(0)
# Creating an iterator should not perform any compression until
# first read.
it = cctx.read_from(source, size=len(source.getvalue()))
self.assertEqual(source.tell(), 0)
# We should have exactly 2 output chunks.
chunks = []
chunk = next(it)
self.assertIsNotNone(chunk)
self.assertEqual(source.tell(), zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE)
chunks.append(chunk)
chunk = next(it)
self.assertIsNotNone(chunk)
chunks.append(chunk)
self.assertEqual(source.tell(), len(source.getvalue()))
with self.assertRaises(StopIteration):
next(it)
# And again for good measure.
with self.assertRaises(StopIteration):
next(it)
# We should get the same output as the one-shot compression mechanism.
self.assertEqual(b''.join(chunks), cctx.compress(source.getvalue()))
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 params = zstd.get_frame_parameters(b''.join(chunks))
self.assertEqual(params.content_size, 0)
self.assertEqual(params.window_size, 262144)
self.assertEqual(params.dict_id, 0)
self.assertFalse(params.has_checksum)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 # Now check the buffer protocol.
it = cctx.read_from(source.getvalue())
chunks = list(it)
self.assertEqual(len(chunks), 2)
self.assertEqual(b''.join(chunks), cctx.compress(source.getvalue()))
def test_read_write_size(self):
source = OpCountingBytesIO(b'foobarfoobar')
cctx = zstd.ZstdCompressor(level=3)
for chunk in cctx.read_from(source, read_size=1, write_size=1):
self.assertEqual(len(chunk), 1)
self.assertEqual(source._read_count, len(source.getvalue()) + 1)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
def test_multithreaded(self):
source = io.BytesIO()
source.write(b'a' * 1048576)
source.write(b'b' * 1048576)
source.write(b'c' * 1048576)
source.seek(0)
cctx = zstd.ZstdCompressor(threads=2)
compressed = b''.join(cctx.read_from(source))
self.assertEqual(len(compressed), 295)
class TestCompressor_multi_compress_to_buffer(unittest.TestCase):
def test_multithreaded_unsupported(self):
cctx = zstd.ZstdCompressor(threads=2)
with self.assertRaisesRegexp(zstd.ZstdError, 'function cannot be called on ZstdCompressor configured for multi-threaded compression'):
cctx.multi_compress_to_buffer([b'foo'])
def test_invalid_inputs(self):
cctx = zstd.ZstdCompressor()
with self.assertRaises(TypeError):
cctx.multi_compress_to_buffer(True)
with self.assertRaises(TypeError):
cctx.multi_compress_to_buffer((1, 2))
with self.assertRaisesRegexp(TypeError, 'item 0 not a bytes like object'):
cctx.multi_compress_to_buffer([u'foo'])
def test_empty_input(self):
cctx = zstd.ZstdCompressor()
with self.assertRaisesRegexp(ValueError, 'no source elements found'):
cctx.multi_compress_to_buffer([])
with self.assertRaisesRegexp(ValueError, 'source elements are empty'):
cctx.multi_compress_to_buffer([b'', b'', b''])
def test_list_input(self):
cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True)
original = [b'foo' * 12, b'bar' * 6]
frames = [cctx.compress(c) for c in original]
b = cctx.multi_compress_to_buffer(original)
self.assertIsInstance(b, zstd.BufferWithSegmentsCollection)
self.assertEqual(len(b), 2)
self.assertEqual(b.size(), 44)
self.assertEqual(b[0].tobytes(), frames[0])
self.assertEqual(b[1].tobytes(), frames[1])
def test_buffer_with_segments_input(self):
cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True)
original = [b'foo' * 4, b'bar' * 6]
frames = [cctx.compress(c) for c in original]
offsets = struct.pack('=QQQQ', 0, len(original[0]),
len(original[0]), len(original[1]))
segments = zstd.BufferWithSegments(b''.join(original), offsets)
result = cctx.multi_compress_to_buffer(segments)
self.assertEqual(len(result), 2)
self.assertEqual(result.size(), 47)
self.assertEqual(result[0].tobytes(), frames[0])
self.assertEqual(result[1].tobytes(), frames[1])
def test_buffer_with_segments_collection_input(self):
cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True)
original = [
b'foo1',
b'foo2' * 2,
b'foo3' * 3,
b'foo4' * 4,
b'foo5' * 5,
]
frames = [cctx.compress(c) for c in original]
b = b''.join([original[0], original[1]])
b1 = zstd.BufferWithSegments(b, struct.pack('=QQQQ',
0, len(original[0]),
len(original[0]), len(original[1])))
b = b''.join([original[2], original[3], original[4]])
b2 = zstd.BufferWithSegments(b, struct.pack('=QQQQQQ',
0, len(original[2]),
len(original[2]), len(original[3]),
len(original[2]) + len(original[3]), len(original[4])))
c = zstd.BufferWithSegmentsCollection(b1, b2)
result = cctx.multi_compress_to_buffer(c)
self.assertEqual(len(result), len(frames))
for i, frame in enumerate(frames):
self.assertEqual(result[i].tobytes(), frame)
def test_multiple_threads(self):
# threads argument will cause multi-threaded ZSTD APIs to be used, which will
# make output different.
refcctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True)
reference = [refcctx.compress(b'x' * 64), refcctx.compress(b'y' * 64)]
cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True)
frames = []
frames.extend(b'x' * 64 for i in range(256))
frames.extend(b'y' * 64 for i in range(256))
result = cctx.multi_compress_to_buffer(frames, threads=-1)
self.assertEqual(len(result), 512)
for i in range(512):
if i < 256:
self.assertEqual(result[i].tobytes(), reference[0])
else:
self.assertEqual(result[i].tobytes(), reference[1])