test_compressor_fuzzing.py
710 lines
| 28.7 KiB
| text/x-python
|
PythonLexer
Gregory Szorc
|
r31796 | import io | |
import os | |||
Gregory Szorc
|
r37513 | import unittest | |
Gregory Szorc
|
r31796 | ||
try: | |||
import hypothesis | |||
import hypothesis.strategies as strategies | |||
except ImportError: | |||
raise unittest.SkipTest('hypothesis not available') | |||
Gregory Szorc
|
r37513 | import zstandard as zstd | |
Gregory Szorc
|
r31796 | ||
from . common import ( | |||
make_cffi, | |||
Gregory Szorc
|
r42237 | NonClosingBytesIO, | |
Gregory Szorc
|
r31796 | random_input_data, | |
) | |||
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') | |||
@make_cffi | |||
Gregory Szorc
|
r37513 | class TestCompressor_stream_reader_fuzzing(unittest.TestCase): | |
Gregory Szorc
|
r42237 | @hypothesis.settings( | |
suppress_health_check=[hypothesis.HealthCheck.large_base_example]) | |||
@hypothesis.given(original=strategies.sampled_from(random_input_data()), | |||
level=strategies.integers(min_value=1, max_value=5), | |||
source_read_size=strategies.integers(1, 16384), | |||
read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE)) | |||
def test_stream_source_read(self, original, level, source_read_size, | |||
read_size): | |||
if read_size == 0: | |||
read_size = -1 | |||
refctx = zstd.ZstdCompressor(level=level) | |||
ref_frame = refctx.compress(original) | |||
cctx = zstd.ZstdCompressor(level=level) | |||
with cctx.stream_reader(io.BytesIO(original), size=len(original), | |||
read_size=source_read_size) as reader: | |||
chunks = [] | |||
while True: | |||
chunk = reader.read(read_size) | |||
if not chunk: | |||
break | |||
chunks.append(chunk) | |||
self.assertEqual(b''.join(chunks), ref_frame) | |||
@hypothesis.settings( | |||
suppress_health_check=[hypothesis.HealthCheck.large_base_example]) | |||
@hypothesis.given(original=strategies.sampled_from(random_input_data()), | |||
level=strategies.integers(min_value=1, max_value=5), | |||
source_read_size=strategies.integers(1, 16384), | |||
read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE)) | |||
def test_buffer_source_read(self, original, level, source_read_size, | |||
read_size): | |||
if read_size == 0: | |||
read_size = -1 | |||
refctx = zstd.ZstdCompressor(level=level) | |||
ref_frame = refctx.compress(original) | |||
cctx = zstd.ZstdCompressor(level=level) | |||
with cctx.stream_reader(original, size=len(original), | |||
read_size=source_read_size) as reader: | |||
chunks = [] | |||
while True: | |||
chunk = reader.read(read_size) | |||
if not chunk: | |||
break | |||
chunks.append(chunk) | |||
self.assertEqual(b''.join(chunks), ref_frame) | |||
@hypothesis.settings( | |||
suppress_health_check=[hypothesis.HealthCheck.large_base_example]) | |||
Gregory Szorc
|
r37513 | @hypothesis.given(original=strategies.sampled_from(random_input_data()), | |
level=strategies.integers(min_value=1, max_value=5), | |||
source_read_size=strategies.integers(1, 16384), | |||
read_sizes=strategies.data()) | |||
def test_stream_source_read_variance(self, original, level, source_read_size, | |||
read_sizes): | |||
refctx = zstd.ZstdCompressor(level=level) | |||
ref_frame = refctx.compress(original) | |||
cctx = zstd.ZstdCompressor(level=level) | |||
with cctx.stream_reader(io.BytesIO(original), size=len(original), | |||
read_size=source_read_size) as reader: | |||
chunks = [] | |||
while True: | |||
Gregory Szorc
|
r42237 | read_size = read_sizes.draw(strategies.integers(-1, 16384)) | |
Gregory Szorc
|
r37513 | chunk = reader.read(read_size) | |
Gregory Szorc
|
r42237 | if not chunk and read_size: | |
break | |||
Gregory Szorc
|
r37513 | ||
chunks.append(chunk) | |||
self.assertEqual(b''.join(chunks), ref_frame) | |||
Gregory Szorc
|
r42237 | @hypothesis.settings( | |
suppress_health_check=[hypothesis.HealthCheck.large_base_example]) | |||
Gregory Szorc
|
r37513 | @hypothesis.given(original=strategies.sampled_from(random_input_data()), | |
level=strategies.integers(min_value=1, max_value=5), | |||
source_read_size=strategies.integers(1, 16384), | |||
read_sizes=strategies.data()) | |||
def test_buffer_source_read_variance(self, original, level, source_read_size, | |||
read_sizes): | |||
refctx = zstd.ZstdCompressor(level=level) | |||
ref_frame = refctx.compress(original) | |||
cctx = zstd.ZstdCompressor(level=level) | |||
with cctx.stream_reader(original, size=len(original), | |||
read_size=source_read_size) as reader: | |||
chunks = [] | |||
while True: | |||
Gregory Szorc
|
r42237 | read_size = read_sizes.draw(strategies.integers(-1, 16384)) | |
chunk = reader.read(read_size) | |||
if not chunk and read_size: | |||
break | |||
chunks.append(chunk) | |||
self.assertEqual(b''.join(chunks), ref_frame) | |||
@hypothesis.settings( | |||
suppress_health_check=[hypothesis.HealthCheck.large_base_example]) | |||
@hypothesis.given(original=strategies.sampled_from(random_input_data()), | |||
level=strategies.integers(min_value=1, max_value=5), | |||
source_read_size=strategies.integers(1, 16384), | |||
read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE)) | |||
def test_stream_source_readinto(self, original, level, | |||
source_read_size, read_size): | |||
refctx = zstd.ZstdCompressor(level=level) | |||
ref_frame = refctx.compress(original) | |||
cctx = zstd.ZstdCompressor(level=level) | |||
with cctx.stream_reader(io.BytesIO(original), size=len(original), | |||
read_size=source_read_size) as reader: | |||
chunks = [] | |||
while True: | |||
b = bytearray(read_size) | |||
count = reader.readinto(b) | |||
if not count: | |||
break | |||
chunks.append(bytes(b[0:count])) | |||
self.assertEqual(b''.join(chunks), ref_frame) | |||
@hypothesis.settings( | |||
suppress_health_check=[hypothesis.HealthCheck.large_base_example]) | |||
@hypothesis.given(original=strategies.sampled_from(random_input_data()), | |||
level=strategies.integers(min_value=1, max_value=5), | |||
source_read_size=strategies.integers(1, 16384), | |||
read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE)) | |||
def test_buffer_source_readinto(self, original, level, | |||
source_read_size, read_size): | |||
refctx = zstd.ZstdCompressor(level=level) | |||
ref_frame = refctx.compress(original) | |||
cctx = zstd.ZstdCompressor(level=level) | |||
with cctx.stream_reader(original, size=len(original), | |||
read_size=source_read_size) as reader: | |||
chunks = [] | |||
while True: | |||
b = bytearray(read_size) | |||
count = reader.readinto(b) | |||
if not count: | |||
break | |||
chunks.append(bytes(b[0:count])) | |||
self.assertEqual(b''.join(chunks), ref_frame) | |||
@hypothesis.settings( | |||
suppress_health_check=[hypothesis.HealthCheck.large_base_example]) | |||
@hypothesis.given(original=strategies.sampled_from(random_input_data()), | |||
level=strategies.integers(min_value=1, max_value=5), | |||
source_read_size=strategies.integers(1, 16384), | |||
read_sizes=strategies.data()) | |||
def test_stream_source_readinto_variance(self, original, level, | |||
source_read_size, read_sizes): | |||
refctx = zstd.ZstdCompressor(level=level) | |||
ref_frame = refctx.compress(original) | |||
cctx = zstd.ZstdCompressor(level=level) | |||
with cctx.stream_reader(io.BytesIO(original), size=len(original), | |||
read_size=source_read_size) as reader: | |||
chunks = [] | |||
while True: | |||
Gregory Szorc
|
r37513 | read_size = read_sizes.draw(strategies.integers(1, 16384)) | |
Gregory Szorc
|
r42237 | b = bytearray(read_size) | |
count = reader.readinto(b) | |||
if not count: | |||
break | |||
chunks.append(bytes(b[0:count])) | |||
self.assertEqual(b''.join(chunks), ref_frame) | |||
@hypothesis.settings( | |||
suppress_health_check=[hypothesis.HealthCheck.large_base_example]) | |||
@hypothesis.given(original=strategies.sampled_from(random_input_data()), | |||
level=strategies.integers(min_value=1, max_value=5), | |||
source_read_size=strategies.integers(1, 16384), | |||
read_sizes=strategies.data()) | |||
def test_buffer_source_readinto_variance(self, original, level, | |||
source_read_size, read_sizes): | |||
refctx = zstd.ZstdCompressor(level=level) | |||
ref_frame = refctx.compress(original) | |||
cctx = zstd.ZstdCompressor(level=level) | |||
with cctx.stream_reader(original, size=len(original), | |||
read_size=source_read_size) as reader: | |||
chunks = [] | |||
while True: | |||
read_size = read_sizes.draw(strategies.integers(1, 16384)) | |||
b = bytearray(read_size) | |||
count = reader.readinto(b) | |||
if not count: | |||
break | |||
chunks.append(bytes(b[0:count])) | |||
self.assertEqual(b''.join(chunks), ref_frame) | |||
@hypothesis.settings( | |||
suppress_health_check=[hypothesis.HealthCheck.large_base_example]) | |||
@hypothesis.given(original=strategies.sampled_from(random_input_data()), | |||
level=strategies.integers(min_value=1, max_value=5), | |||
source_read_size=strategies.integers(1, 16384), | |||
read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE)) | |||
def test_stream_source_read1(self, original, level, source_read_size, | |||
read_size): | |||
if read_size == 0: | |||
read_size = -1 | |||
refctx = zstd.ZstdCompressor(level=level) | |||
ref_frame = refctx.compress(original) | |||
cctx = zstd.ZstdCompressor(level=level) | |||
with cctx.stream_reader(io.BytesIO(original), size=len(original), | |||
read_size=source_read_size) as reader: | |||
chunks = [] | |||
while True: | |||
chunk = reader.read1(read_size) | |||
Gregory Szorc
|
r37513 | if not chunk: | |
break | |||
Gregory Szorc
|
r42237 | ||
Gregory Szorc
|
r37513 | chunks.append(chunk) | |
self.assertEqual(b''.join(chunks), ref_frame) | |||
Gregory Szorc
|
r42237 | @hypothesis.settings( | |
suppress_health_check=[hypothesis.HealthCheck.large_base_example]) | |||
@hypothesis.given(original=strategies.sampled_from(random_input_data()), | |||
level=strategies.integers(min_value=1, max_value=5), | |||
source_read_size=strategies.integers(1, 16384), | |||
read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE)) | |||
def test_buffer_source_read1(self, original, level, source_read_size, | |||
read_size): | |||
if read_size == 0: | |||
read_size = -1 | |||
refctx = zstd.ZstdCompressor(level=level) | |||
ref_frame = refctx.compress(original) | |||
cctx = zstd.ZstdCompressor(level=level) | |||
with cctx.stream_reader(original, size=len(original), | |||
read_size=source_read_size) as reader: | |||
chunks = [] | |||
while True: | |||
chunk = reader.read1(read_size) | |||
if not chunk: | |||
break | |||
chunks.append(chunk) | |||
self.assertEqual(b''.join(chunks), ref_frame) | |||
@hypothesis.settings( | |||
suppress_health_check=[hypothesis.HealthCheck.large_base_example]) | |||
@hypothesis.given(original=strategies.sampled_from(random_input_data()), | |||
level=strategies.integers(min_value=1, max_value=5), | |||
source_read_size=strategies.integers(1, 16384), | |||
read_sizes=strategies.data()) | |||
def test_stream_source_read1_variance(self, original, level, source_read_size, | |||
read_sizes): | |||
refctx = zstd.ZstdCompressor(level=level) | |||
ref_frame = refctx.compress(original) | |||
cctx = zstd.ZstdCompressor(level=level) | |||
with cctx.stream_reader(io.BytesIO(original), size=len(original), | |||
read_size=source_read_size) as reader: | |||
chunks = [] | |||
while True: | |||
read_size = read_sizes.draw(strategies.integers(-1, 16384)) | |||
chunk = reader.read1(read_size) | |||
if not chunk and read_size: | |||
break | |||
chunks.append(chunk) | |||
self.assertEqual(b''.join(chunks), ref_frame) | |||
@hypothesis.settings( | |||
suppress_health_check=[hypothesis.HealthCheck.large_base_example]) | |||
@hypothesis.given(original=strategies.sampled_from(random_input_data()), | |||
level=strategies.integers(min_value=1, max_value=5), | |||
source_read_size=strategies.integers(1, 16384), | |||
read_sizes=strategies.data()) | |||
def test_buffer_source_read1_variance(self, original, level, source_read_size, | |||
read_sizes): | |||
refctx = zstd.ZstdCompressor(level=level) | |||
ref_frame = refctx.compress(original) | |||
cctx = zstd.ZstdCompressor(level=level) | |||
with cctx.stream_reader(original, size=len(original), | |||
read_size=source_read_size) as reader: | |||
chunks = [] | |||
while True: | |||
read_size = read_sizes.draw(strategies.integers(-1, 16384)) | |||
chunk = reader.read1(read_size) | |||
if not chunk and read_size: | |||
break | |||
chunks.append(chunk) | |||
self.assertEqual(b''.join(chunks), ref_frame) | |||
@hypothesis.settings( | |||
suppress_health_check=[hypothesis.HealthCheck.large_base_example]) | |||
@hypothesis.given(original=strategies.sampled_from(random_input_data()), | |||
level=strategies.integers(min_value=1, max_value=5), | |||
source_read_size=strategies.integers(1, 16384), | |||
read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE)) | |||
def test_stream_source_readinto1(self, original, level, source_read_size, | |||
read_size): | |||
if read_size == 0: | |||
read_size = -1 | |||
refctx = zstd.ZstdCompressor(level=level) | |||
ref_frame = refctx.compress(original) | |||
cctx = zstd.ZstdCompressor(level=level) | |||
with cctx.stream_reader(io.BytesIO(original), size=len(original), | |||
read_size=source_read_size) as reader: | |||
chunks = [] | |||
while True: | |||
b = bytearray(read_size) | |||
count = reader.readinto1(b) | |||
if not count: | |||
break | |||
chunks.append(bytes(b[0:count])) | |||
self.assertEqual(b''.join(chunks), ref_frame) | |||
@hypothesis.settings( | |||
suppress_health_check=[hypothesis.HealthCheck.large_base_example]) | |||
@hypothesis.given(original=strategies.sampled_from(random_input_data()), | |||
level=strategies.integers(min_value=1, max_value=5), | |||
source_read_size=strategies.integers(1, 16384), | |||
read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE)) | |||
def test_buffer_source_readinto1(self, original, level, source_read_size, | |||
read_size): | |||
if read_size == 0: | |||
read_size = -1 | |||
refctx = zstd.ZstdCompressor(level=level) | |||
ref_frame = refctx.compress(original) | |||
cctx = zstd.ZstdCompressor(level=level) | |||
with cctx.stream_reader(original, size=len(original), | |||
read_size=source_read_size) as reader: | |||
chunks = [] | |||
while True: | |||
b = bytearray(read_size) | |||
count = reader.readinto1(b) | |||
if not count: | |||
break | |||
chunks.append(bytes(b[0:count])) | |||
self.assertEqual(b''.join(chunks), ref_frame) | |||
@hypothesis.settings( | |||
suppress_health_check=[hypothesis.HealthCheck.large_base_example]) | |||
@hypothesis.given(original=strategies.sampled_from(random_input_data()), | |||
level=strategies.integers(min_value=1, max_value=5), | |||
source_read_size=strategies.integers(1, 16384), | |||
read_sizes=strategies.data()) | |||
def test_stream_source_readinto1_variance(self, original, level, source_read_size, | |||
read_sizes): | |||
refctx = zstd.ZstdCompressor(level=level) | |||
ref_frame = refctx.compress(original) | |||
cctx = zstd.ZstdCompressor(level=level) | |||
with cctx.stream_reader(io.BytesIO(original), size=len(original), | |||
read_size=source_read_size) as reader: | |||
chunks = [] | |||
while True: | |||
read_size = read_sizes.draw(strategies.integers(1, 16384)) | |||
b = bytearray(read_size) | |||
count = reader.readinto1(b) | |||
if not count: | |||
break | |||
chunks.append(bytes(b[0:count])) | |||
self.assertEqual(b''.join(chunks), ref_frame) | |||
@hypothesis.settings( | |||
suppress_health_check=[hypothesis.HealthCheck.large_base_example]) | |||
@hypothesis.given(original=strategies.sampled_from(random_input_data()), | |||
level=strategies.integers(min_value=1, max_value=5), | |||
source_read_size=strategies.integers(1, 16384), | |||
read_sizes=strategies.data()) | |||
def test_buffer_source_readinto1_variance(self, original, level, source_read_size, | |||
read_sizes): | |||
refctx = zstd.ZstdCompressor(level=level) | |||
ref_frame = refctx.compress(original) | |||
cctx = zstd.ZstdCompressor(level=level) | |||
with cctx.stream_reader(original, size=len(original), | |||
read_size=source_read_size) as reader: | |||
chunks = [] | |||
while True: | |||
read_size = read_sizes.draw(strategies.integers(1, 16384)) | |||
b = bytearray(read_size) | |||
count = reader.readinto1(b) | |||
if not count: | |||
break | |||
chunks.append(bytes(b[0:count])) | |||
self.assertEqual(b''.join(chunks), ref_frame) | |||
Gregory Szorc
|
r37513 | ||
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') | |||
@make_cffi | |||
class TestCompressor_stream_writer_fuzzing(unittest.TestCase): | |||
Gregory Szorc
|
r31796 | @hypothesis.given(original=strategies.sampled_from(random_input_data()), | |
level=strategies.integers(min_value=1, max_value=5), | |||
write_size=strategies.integers(min_value=1, max_value=1048576)) | |||
def test_write_size_variance(self, original, level, write_size): | |||
refctx = zstd.ZstdCompressor(level=level) | |||
ref_frame = refctx.compress(original) | |||
cctx = zstd.ZstdCompressor(level=level) | |||
Gregory Szorc
|
r42237 | b = NonClosingBytesIO() | |
Gregory Szorc
|
r37513 | with cctx.stream_writer(b, size=len(original), write_size=write_size) as compressor: | |
Gregory Szorc
|
r31796 | compressor.write(original) | |
self.assertEqual(b.getvalue(), ref_frame) | |||
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') | |||
@make_cffi | |||
class TestCompressor_copy_stream_fuzzing(unittest.TestCase): | |||
@hypothesis.given(original=strategies.sampled_from(random_input_data()), | |||
level=strategies.integers(min_value=1, max_value=5), | |||
read_size=strategies.integers(min_value=1, max_value=1048576), | |||
write_size=strategies.integers(min_value=1, max_value=1048576)) | |||
def test_read_write_size_variance(self, original, level, read_size, write_size): | |||
refctx = zstd.ZstdCompressor(level=level) | |||
ref_frame = refctx.compress(original) | |||
cctx = zstd.ZstdCompressor(level=level) | |||
source = io.BytesIO(original) | |||
dest = io.BytesIO() | |||
cctx.copy_stream(source, dest, size=len(original), read_size=read_size, | |||
write_size=write_size) | |||
self.assertEqual(dest.getvalue(), ref_frame) | |||
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') | |||
@make_cffi | |||
class TestCompressor_compressobj_fuzzing(unittest.TestCase): | |||
Gregory Szorc
|
r37513 | @hypothesis.settings( | |
suppress_health_check=[hypothesis.HealthCheck.large_base_example]) | |||
Gregory Szorc
|
r31796 | @hypothesis.given(original=strategies.sampled_from(random_input_data()), | |
level=strategies.integers(min_value=1, max_value=5), | |||
Gregory Szorc
|
r37513 | chunk_sizes=strategies.data()) | |
Gregory Szorc
|
r31796 | def test_random_input_sizes(self, original, level, chunk_sizes): | |
refctx = zstd.ZstdCompressor(level=level) | |||
ref_frame = refctx.compress(original) | |||
cctx = zstd.ZstdCompressor(level=level) | |||
cobj = cctx.compressobj(size=len(original)) | |||
chunks = [] | |||
i = 0 | |||
while True: | |||
Gregory Szorc
|
r37513 | chunk_size = chunk_sizes.draw(strategies.integers(1, 4096)) | |
Gregory Szorc
|
r31796 | source = original[i:i + chunk_size] | |
if not source: | |||
break | |||
chunks.append(cobj.compress(source)) | |||
i += chunk_size | |||
chunks.append(cobj.flush()) | |||
self.assertEqual(b''.join(chunks), ref_frame) | |||
Gregory Szorc
|
r40157 | @hypothesis.settings( | |
suppress_health_check=[hypothesis.HealthCheck.large_base_example]) | |||
@hypothesis.given(original=strategies.sampled_from(random_input_data()), | |||
level=strategies.integers(min_value=1, max_value=5), | |||
chunk_sizes=strategies.data(), | |||
flushes=strategies.data()) | |||
def test_flush_block(self, original, level, chunk_sizes, flushes): | |||
cctx = zstd.ZstdCompressor(level=level) | |||
cobj = cctx.compressobj() | |||
dctx = zstd.ZstdDecompressor() | |||
dobj = dctx.decompressobj() | |||
compressed_chunks = [] | |||
decompressed_chunks = [] | |||
i = 0 | |||
while True: | |||
input_size = chunk_sizes.draw(strategies.integers(1, 4096)) | |||
source = original[i:i + input_size] | |||
if not source: | |||
break | |||
i += input_size | |||
chunk = cobj.compress(source) | |||
compressed_chunks.append(chunk) | |||
decompressed_chunks.append(dobj.decompress(chunk)) | |||
if not flushes.draw(strategies.booleans()): | |||
continue | |||
chunk = cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK) | |||
compressed_chunks.append(chunk) | |||
decompressed_chunks.append(dobj.decompress(chunk)) | |||
self.assertEqual(b''.join(decompressed_chunks), original[0:i]) | |||
chunk = cobj.flush(zstd.COMPRESSOBJ_FLUSH_FINISH) | |||
compressed_chunks.append(chunk) | |||
decompressed_chunks.append(dobj.decompress(chunk)) | |||
self.assertEqual(dctx.decompress(b''.join(compressed_chunks), | |||
max_output_size=len(original)), | |||
original) | |||
self.assertEqual(b''.join(decompressed_chunks), original) | |||
Gregory Szorc
|
r31796 | ||
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') | |||
@make_cffi | |||
Gregory Szorc
|
r37513 | class TestCompressor_read_to_iter_fuzzing(unittest.TestCase): | |
Gregory Szorc
|
r31796 | @hypothesis.given(original=strategies.sampled_from(random_input_data()), | |
level=strategies.integers(min_value=1, max_value=5), | |||
read_size=strategies.integers(min_value=1, max_value=4096), | |||
write_size=strategies.integers(min_value=1, max_value=4096)) | |||
def test_read_write_size_variance(self, original, level, read_size, write_size): | |||
refcctx = zstd.ZstdCompressor(level=level) | |||
ref_frame = refcctx.compress(original) | |||
source = io.BytesIO(original) | |||
cctx = zstd.ZstdCompressor(level=level) | |||
Gregory Szorc
|
r37513 | chunks = list(cctx.read_to_iter(source, size=len(original), | |
read_size=read_size, | |||
write_size=write_size)) | |||
Gregory Szorc
|
r31796 | ||
self.assertEqual(b''.join(chunks), ref_frame) | |||
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') | |||
class TestCompressor_multi_compress_to_buffer_fuzzing(unittest.TestCase): | |||
@hypothesis.given(original=strategies.lists(strategies.sampled_from(random_input_data()), | |||
min_size=1, max_size=1024), | |||
threads=strategies.integers(min_value=1, max_value=8), | |||
use_dict=strategies.booleans()) | |||
def test_data_equivalence(self, original, threads, use_dict): | |||
kwargs = {} | |||
# Use a content dictionary because it is cheap to create. | |||
if use_dict: | |||
kwargs['dict_data'] = zstd.ZstdCompressionDict(original[0]) | |||
cctx = zstd.ZstdCompressor(level=1, | |||
write_checksum=True, | |||
**kwargs) | |||
Gregory Szorc
|
r42237 | if not hasattr(cctx, 'multi_compress_to_buffer'): | |
self.skipTest('multi_compress_to_buffer not available') | |||
Gregory Szorc
|
r31796 | result = cctx.multi_compress_to_buffer(original, threads=-1) | |
self.assertEqual(len(result), len(original)) | |||
# The frame produced via the batch APIs may not be bit identical to that | |||
# produced by compress() because compression parameters are adjusted | |||
# from the first input in batch mode. So the only thing we can do is | |||
# verify the decompressed data matches the input. | |||
dctx = zstd.ZstdDecompressor(**kwargs) | |||
for i, frame in enumerate(result): | |||
self.assertEqual(dctx.decompress(frame), original[i]) | |||
Gregory Szorc
|
r40157 | ||
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') | |||
@make_cffi | |||
class TestCompressor_chunker_fuzzing(unittest.TestCase): | |||
@hypothesis.settings( | |||
suppress_health_check=[hypothesis.HealthCheck.large_base_example]) | |||
@hypothesis.given(original=strategies.sampled_from(random_input_data()), | |||
level=strategies.integers(min_value=1, max_value=5), | |||
chunk_size=strategies.integers( | |||
min_value=1, | |||
max_value=32 * 1048576), | |||
input_sizes=strategies.data()) | |||
def test_random_input_sizes(self, original, level, chunk_size, input_sizes): | |||
cctx = zstd.ZstdCompressor(level=level) | |||
chunker = cctx.chunker(chunk_size=chunk_size) | |||
chunks = [] | |||
i = 0 | |||
while True: | |||
input_size = input_sizes.draw(strategies.integers(1, 4096)) | |||
source = original[i:i + input_size] | |||
if not source: | |||
break | |||
chunks.extend(chunker.compress(source)) | |||
i += input_size | |||
chunks.extend(chunker.finish()) | |||
dctx = zstd.ZstdDecompressor() | |||
self.assertEqual(dctx.decompress(b''.join(chunks), | |||
max_output_size=len(original)), | |||
original) | |||
self.assertTrue(all(len(chunk) == chunk_size for chunk in chunks[:-1])) | |||
@hypothesis.settings( | |||
suppress_health_check=[hypothesis.HealthCheck.large_base_example]) | |||
@hypothesis.given(original=strategies.sampled_from(random_input_data()), | |||
level=strategies.integers(min_value=1, max_value=5), | |||
chunk_size=strategies.integers( | |||
min_value=1, | |||
max_value=32 * 1048576), | |||
input_sizes=strategies.data(), | |||
flushes=strategies.data()) | |||
def test_flush_block(self, original, level, chunk_size, input_sizes, | |||
flushes): | |||
cctx = zstd.ZstdCompressor(level=level) | |||
chunker = cctx.chunker(chunk_size=chunk_size) | |||
dctx = zstd.ZstdDecompressor() | |||
dobj = dctx.decompressobj() | |||
compressed_chunks = [] | |||
decompressed_chunks = [] | |||
i = 0 | |||
while True: | |||
input_size = input_sizes.draw(strategies.integers(1, 4096)) | |||
source = original[i:i + input_size] | |||
if not source: | |||
break | |||
i += input_size | |||
chunks = list(chunker.compress(source)) | |||
compressed_chunks.extend(chunks) | |||
decompressed_chunks.append(dobj.decompress(b''.join(chunks))) | |||
if not flushes.draw(strategies.booleans()): | |||
continue | |||
chunks = list(chunker.flush()) | |||
compressed_chunks.extend(chunks) | |||
decompressed_chunks.append(dobj.decompress(b''.join(chunks))) | |||
self.assertEqual(b''.join(decompressed_chunks), original[0:i]) | |||
chunks = list(chunker.finish()) | |||
compressed_chunks.extend(chunks) | |||
decompressed_chunks.append(dobj.decompress(b''.join(chunks))) | |||
self.assertEqual(dctx.decompress(b''.join(compressed_chunks), | |||
max_output_size=len(original)), | |||
original) | |||
self.assertEqual(b''.join(decompressed_chunks), original) |