##// END OF EJS Templates
hg: raise Abort on invalid path...
hg: raise Abort on invalid path Currently, some os.path functions when opening repositories may raise an uncaught TypeError or ValueError if the path is invalid. Let's catch these exceptions and turn them into an Abort for convenience. Differential Revision: https://phab.mercurial-scm.org/D5772

File last commit:

r37513:b1fb341d default
r41522:2fe2f1ba default
Show More
test_decompressor_fuzzing.py
252 lines | 9.7 KiB | text/x-python | PythonLexer
/ contrib / python-zstandard / tests / test_decompressor_fuzzing.py
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 import io
import os
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 import unittest
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
try:
import hypothesis
import hypothesis.strategies as strategies
except ImportError:
raise unittest.SkipTest('hypothesis not available')
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 import zstandard as zstd
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
from . common import (
make_cffi,
random_input_data,
)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 class TestDecompressor_stream_reader_fuzzing(unittest.TestCase):
@hypothesis.settings(
suppress_health_check=[hypothesis.HealthCheck.large_base_example])
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data())
def test_stream_source_read_variance(self, original, level, source_read_size,
read_sizes):
cctx = zstd.ZstdCompressor(level=level)
frame = cctx.compress(original)
dctx = zstd.ZstdDecompressor()
source = io.BytesIO(frame)
chunks = []
with dctx.stream_reader(source, read_size=source_read_size) as reader:
while True:
read_size = read_sizes.draw(strategies.integers(1, 16384))
chunk = reader.read(read_size)
if not chunk:
break
chunks.append(chunk)
self.assertEqual(b''.join(chunks), original)
@hypothesis.settings(
suppress_health_check=[hypothesis.HealthCheck.large_base_example])
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data())
def test_buffer_source_read_variance(self, original, level, source_read_size,
read_sizes):
cctx = zstd.ZstdCompressor(level=level)
frame = cctx.compress(original)
dctx = zstd.ZstdDecompressor()
chunks = []
with dctx.stream_reader(frame, read_size=source_read_size) as reader:
while True:
read_size = read_sizes.draw(strategies.integers(1, 16384))
chunk = reader.read(read_size)
if not chunk:
break
chunks.append(chunk)
self.assertEqual(b''.join(chunks), original)
@hypothesis.settings(
suppress_health_check=[hypothesis.HealthCheck.large_base_example])
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
seek_amounts=strategies.data(),
read_sizes=strategies.data())
def test_relative_seeks(self, original, level, source_read_size, seek_amounts,
read_sizes):
cctx = zstd.ZstdCompressor(level=level)
frame = cctx.compress(original)
dctx = zstd.ZstdDecompressor()
with dctx.stream_reader(frame, read_size=source_read_size) as reader:
while True:
amount = seek_amounts.draw(strategies.integers(0, 16384))
reader.seek(amount, os.SEEK_CUR)
offset = reader.tell()
read_amount = read_sizes.draw(strategies.integers(1, 16384))
chunk = reader.read(read_amount)
if not chunk:
break
self.assertEqual(original[offset:offset + len(chunk)], chunk)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestDecompressor_stream_writer_fuzzing(unittest.TestCase):
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 @hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
write_size=strategies.integers(min_value=1, max_value=8192),
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 input_sizes=strategies.data())
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_write_size_variance(self, original, level, write_size, input_sizes):
cctx = zstd.ZstdCompressor(level=level)
frame = cctx.compress(original)
dctx = zstd.ZstdDecompressor()
source = io.BytesIO(frame)
dest = io.BytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 with dctx.stream_writer(dest, write_size=write_size) as decompressor:
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 while True:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 input_size = input_sizes.draw(strategies.integers(1, 4096))
chunk = source.read(input_size)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 if not chunk:
break
decompressor.write(chunk)
self.assertEqual(dest.getvalue(), original)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestDecompressor_copy_stream_fuzzing(unittest.TestCase):
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
read_size=strategies.integers(min_value=1, max_value=8192),
write_size=strategies.integers(min_value=1, max_value=8192))
def test_read_write_size_variance(self, original, level, read_size, write_size):
cctx = zstd.ZstdCompressor(level=level)
frame = cctx.compress(original)
source = io.BytesIO(frame)
dest = io.BytesIO()
dctx = zstd.ZstdDecompressor()
dctx.copy_stream(source, dest, read_size=read_size, write_size=write_size)
self.assertEqual(dest.getvalue(), original)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestDecompressor_decompressobj_fuzzing(unittest.TestCase):
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 chunk_sizes=strategies.data())
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_random_input_sizes(self, original, level, chunk_sizes):
cctx = zstd.ZstdCompressor(level=level)
frame = cctx.compress(original)
source = io.BytesIO(frame)
dctx = zstd.ZstdDecompressor()
dobj = dctx.decompressobj()
chunks = []
while True:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 chunk_size = chunk_sizes.draw(strategies.integers(1, 4096))
chunk = source.read(chunk_size)
if not chunk:
break
chunks.append(dobj.decompress(chunk))
self.assertEqual(b''.join(chunks), original)
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
write_size=strategies.integers(min_value=1,
max_value=4 * zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE),
chunk_sizes=strategies.data())
def test_random_output_sizes(self, original, level, write_size, chunk_sizes):
cctx = zstd.ZstdCompressor(level=level)
frame = cctx.compress(original)
source = io.BytesIO(frame)
dctx = zstd.ZstdDecompressor()
dobj = dctx.decompressobj(write_size=write_size)
chunks = []
while True:
chunk_size = chunk_sizes.draw(strategies.integers(1, 4096))
chunk = source.read(chunk_size)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 if not chunk:
break
chunks.append(dobj.decompress(chunk))
self.assertEqual(b''.join(chunks), original)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 class TestDecompressor_read_to_iter_fuzzing(unittest.TestCase):
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 @hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
read_size=strategies.integers(min_value=1, max_value=4096),
write_size=strategies.integers(min_value=1, max_value=4096))
def test_read_write_size_variance(self, original, level, read_size, write_size):
cctx = zstd.ZstdCompressor(level=level)
frame = cctx.compress(original)
source = io.BytesIO(frame)
dctx = zstd.ZstdDecompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 chunks = list(dctx.read_to_iter(source, read_size=read_size, write_size=write_size))
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
self.assertEqual(b''.join(chunks), original)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
class TestDecompressor_multi_decompress_to_buffer_fuzzing(unittest.TestCase):
@hypothesis.given(original=strategies.lists(strategies.sampled_from(random_input_data()),
min_size=1, max_size=1024),
threads=strategies.integers(min_value=1, max_value=8),
use_dict=strategies.booleans())
def test_data_equivalence(self, original, threads, use_dict):
kwargs = {}
if use_dict:
kwargs['dict_data'] = zstd.ZstdCompressionDict(original[0])
cctx = zstd.ZstdCompressor(level=1,
write_content_size=True,
write_checksum=True,
**kwargs)
frames_buffer = cctx.multi_compress_to_buffer(original, threads=-1)
dctx = zstd.ZstdDecompressor(**kwargs)
result = dctx.multi_decompress_to_buffer(frames_buffer)
self.assertEqual(len(result), len(original))
for i, frame in enumerate(result):
self.assertEqual(frame.tobytes(), original[i])
frames_list = [f.tobytes() for f in frames_buffer]
result = dctx.multi_decompress_to_buffer(frames_list)
self.assertEqual(len(result), len(original))
for i, frame in enumerate(result):
self.assertEqual(frame.tobytes(), original[i])