test_decompressor_fuzzing.py
151 lines
| 5.5 KiB
| text/x-python
|
PythonLexer
Gregory Szorc
|
r31796 | import io | ||
import os | ||||
try: | ||||
import unittest2 as unittest | ||||
except ImportError: | ||||
import unittest | ||||
try: | ||||
import hypothesis | ||||
import hypothesis.strategies as strategies | ||||
except ImportError: | ||||
raise unittest.SkipTest('hypothesis not available') | ||||
import zstd | ||||
from . common import ( | ||||
make_cffi, | ||||
random_input_data, | ||||
) | ||||
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') | ||||
@make_cffi | ||||
class TestDecompressor_write_to_fuzzing(unittest.TestCase): | ||||
@hypothesis.given(original=strategies.sampled_from(random_input_data()), | ||||
level=strategies.integers(min_value=1, max_value=5), | ||||
write_size=strategies.integers(min_value=1, max_value=8192), | ||||
input_sizes=strategies.streaming( | ||||
strategies.integers(min_value=1, max_value=4096))) | ||||
def test_write_size_variance(self, original, level, write_size, input_sizes): | ||||
input_sizes = iter(input_sizes) | ||||
cctx = zstd.ZstdCompressor(level=level) | ||||
frame = cctx.compress(original) | ||||
dctx = zstd.ZstdDecompressor() | ||||
source = io.BytesIO(frame) | ||||
dest = io.BytesIO() | ||||
with dctx.write_to(dest, write_size=write_size) as decompressor: | ||||
while True: | ||||
chunk = source.read(next(input_sizes)) | ||||
if not chunk: | ||||
break | ||||
decompressor.write(chunk) | ||||
self.assertEqual(dest.getvalue(), original) | ||||
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') | ||||
@make_cffi | ||||
class TestDecompressor_copy_stream_fuzzing(unittest.TestCase): | ||||
@hypothesis.given(original=strategies.sampled_from(random_input_data()), | ||||
level=strategies.integers(min_value=1, max_value=5), | ||||
read_size=strategies.integers(min_value=1, max_value=8192), | ||||
write_size=strategies.integers(min_value=1, max_value=8192)) | ||||
def test_read_write_size_variance(self, original, level, read_size, write_size): | ||||
cctx = zstd.ZstdCompressor(level=level) | ||||
frame = cctx.compress(original) | ||||
source = io.BytesIO(frame) | ||||
dest = io.BytesIO() | ||||
dctx = zstd.ZstdDecompressor() | ||||
dctx.copy_stream(source, dest, read_size=read_size, write_size=write_size) | ||||
self.assertEqual(dest.getvalue(), original) | ||||
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') | ||||
@make_cffi | ||||
class TestDecompressor_decompressobj_fuzzing(unittest.TestCase): | ||||
@hypothesis.given(original=strategies.sampled_from(random_input_data()), | ||||
level=strategies.integers(min_value=1, max_value=5), | ||||
chunk_sizes=strategies.streaming( | ||||
strategies.integers(min_value=1, max_value=4096))) | ||||
def test_random_input_sizes(self, original, level, chunk_sizes): | ||||
chunk_sizes = iter(chunk_sizes) | ||||
cctx = zstd.ZstdCompressor(level=level) | ||||
frame = cctx.compress(original) | ||||
source = io.BytesIO(frame) | ||||
dctx = zstd.ZstdDecompressor() | ||||
dobj = dctx.decompressobj() | ||||
chunks = [] | ||||
while True: | ||||
chunk = source.read(next(chunk_sizes)) | ||||
if not chunk: | ||||
break | ||||
chunks.append(dobj.decompress(chunk)) | ||||
self.assertEqual(b''.join(chunks), original) | ||||
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') | ||||
@make_cffi | ||||
class TestDecompressor_read_from_fuzzing(unittest.TestCase): | ||||
@hypothesis.given(original=strategies.sampled_from(random_input_data()), | ||||
level=strategies.integers(min_value=1, max_value=5), | ||||
read_size=strategies.integers(min_value=1, max_value=4096), | ||||
write_size=strategies.integers(min_value=1, max_value=4096)) | ||||
def test_read_write_size_variance(self, original, level, read_size, write_size): | ||||
cctx = zstd.ZstdCompressor(level=level) | ||||
frame = cctx.compress(original) | ||||
source = io.BytesIO(frame) | ||||
dctx = zstd.ZstdDecompressor() | ||||
chunks = list(dctx.read_from(source, read_size=read_size, write_size=write_size)) | ||||
self.assertEqual(b''.join(chunks), original) | ||||
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') | ||||
class TestDecompressor_multi_decompress_to_buffer_fuzzing(unittest.TestCase): | ||||
@hypothesis.given(original=strategies.lists(strategies.sampled_from(random_input_data()), | ||||
min_size=1, max_size=1024), | ||||
threads=strategies.integers(min_value=1, max_value=8), | ||||
use_dict=strategies.booleans()) | ||||
def test_data_equivalence(self, original, threads, use_dict): | ||||
kwargs = {} | ||||
if use_dict: | ||||
kwargs['dict_data'] = zstd.ZstdCompressionDict(original[0]) | ||||
cctx = zstd.ZstdCompressor(level=1, | ||||
write_content_size=True, | ||||
write_checksum=True, | ||||
**kwargs) | ||||
frames_buffer = cctx.multi_compress_to_buffer(original, threads=-1) | ||||
dctx = zstd.ZstdDecompressor(**kwargs) | ||||
result = dctx.multi_decompress_to_buffer(frames_buffer) | ||||
self.assertEqual(len(result), len(original)) | ||||
for i, frame in enumerate(result): | ||||
self.assertEqual(frame.tobytes(), original[i]) | ||||
frames_list = [f.tobytes() for f in frames_buffer] | ||||
result = dctx.multi_decompress_to_buffer(frames_list) | ||||
self.assertEqual(len(result), len(original)) | ||||
for i, frame in enumerate(result): | ||||
self.assertEqual(frame.tobytes(), original[i]) | ||||