##// END OF EJS Templates
rust-nodemap: building blocks for nodetree structures...
rust-nodemap: building blocks for nodetree structures This is similar to `nodetreenode` in `revlog.c`. We give it a higher level feeling for ease of handling in Rust context and provide tools for tests and debugging. The encoding choice is dictated by our ultimate goal in this series, that is to make an append-only persistent version of `nodetree`: the 0th Block must be adressed from other Blocks. Differential Revision: https://phab.mercurial-scm.org/D7787

File last commit:

r44446:de783805 default
r44600:63db6657 default
Show More
test_compressor_fuzzing.py
836 lines | 28.0 KiB | text/x-python | PythonLexer
/ contrib / python-zstandard / tests / test_compressor_fuzzing.py
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 import io
import os
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 import unittest
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
try:
import hypothesis
import hypothesis.strategies as strategies
except ImportError:
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 raise unittest.SkipTest("hypothesis not available")
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 import zstandard as zstd
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 from .common import (
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 make_cffi,
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 NonClosingBytesIO,
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 random_input_data,
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 TestCase,
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 )
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestCompressor_stream_reader_fuzzing(TestCase):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 @hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE),
)
def test_stream_source_read(self, original, level, source_read_size, read_size):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if read_size == 0:
read_size = -1
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
chunk = reader.read(read_size)
if not chunk:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE),
)
def test_buffer_source_read(self, original, level, source_read_size, read_size):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if read_size == 0:
read_size = -1
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
chunk = reader.read(read_size)
if not chunk:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_stream_source_read_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 chunks = []
while True:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 read_size = read_sizes.draw(strategies.integers(-1, 16384))
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 chunk = reader.read(read_size)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if not chunk and read_size:
break
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 @hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_buffer_source_read_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 chunks = []
while True:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 read_size = read_sizes.draw(strategies.integers(-1, 16384))
chunk = reader.read(read_size)
if not chunk and read_size:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE),
)
def test_stream_source_readinto(self, original, level, source_read_size, read_size):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
b = bytearray(read_size)
count = reader.readinto(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE),
)
def test_buffer_source_readinto(self, original, level, source_read_size, read_size):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
b = bytearray(read_size)
count = reader.readinto(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_stream_source_readinto_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 read_size = read_sizes.draw(strategies.integers(1, 16384))
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 b = bytearray(read_size)
count = reader.readinto(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_buffer_source_readinto_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(1, 16384))
b = bytearray(read_size)
count = reader.readinto(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE),
)
def test_stream_source_read1(self, original, level, source_read_size, read_size):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if read_size == 0:
read_size = -1
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
chunk = reader.read1(read_size)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 if not chunk:
break
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 @hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_size=strategies.integers(-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE),
)
def test_buffer_source_read1(self, original, level, source_read_size, read_size):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if read_size == 0:
read_size = -1
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
chunk = reader.read1(read_size)
if not chunk:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_stream_source_read1_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(-1, 16384))
chunk = reader.read1(read_size)
if not chunk and read_size:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_buffer_source_read1_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(-1, 16384))
chunk = reader.read1(read_size)
if not chunk and read_size:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE),
)
def test_stream_source_readinto1(
self, original, level, source_read_size, read_size
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if read_size == 0:
read_size = -1
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
b = bytearray(read_size)
count = reader.readinto1(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_size=strategies.integers(1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE),
)
def test_buffer_source_readinto1(
self, original, level, source_read_size, read_size
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if read_size == 0:
read_size = -1
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
b = bytearray(read_size)
count = reader.readinto1(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_stream_source_readinto1_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(1, 16384))
b = bytearray(read_size)
count = reader.readinto1(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_buffer_source_readinto1_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(1, 16384))
b = bytearray(read_size)
count = reader.readinto1(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestCompressor_stream_writer_fuzzing(TestCase):
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
write_size=strategies.integers(min_value=1, max_value=1048576),
)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_write_size_variance(self, original, level, write_size):
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 b = NonClosingBytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_writer(
b, size=len(original), write_size=write_size
) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 compressor.write(original)
self.assertEqual(b.getvalue(), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestCompressor_copy_stream_fuzzing(TestCase):
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
read_size=strategies.integers(min_value=1, max_value=1048576),
write_size=strategies.integers(min_value=1, max_value=1048576),
)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_read_write_size_variance(self, original, level, read_size, write_size):
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
source = io.BytesIO(original)
dest = io.BytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 cctx.copy_stream(
source, dest, size=len(original), read_size=read_size, write_size=write_size
)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
self.assertEqual(dest.getvalue(), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestCompressor_compressobj_fuzzing(TestCase):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 @hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
chunk_sizes=strategies.data(),
)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_random_input_sizes(self, original, level, chunk_sizes):
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
cobj = cctx.compressobj(size=len(original))
chunks = []
i = 0
while True:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 chunk_size = chunk_sizes.draw(strategies.integers(1, 4096))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 source = original[i : i + chunk_size]
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 if not source:
break
chunks.append(cobj.compress(source))
i += chunk_size
chunks.append(cobj.flush())
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 @hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
chunk_sizes=strategies.data(),
flushes=strategies.data(),
)
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 def test_flush_block(self, original, level, chunk_sizes, flushes):
cctx = zstd.ZstdCompressor(level=level)
cobj = cctx.compressobj()
dctx = zstd.ZstdDecompressor()
dobj = dctx.decompressobj()
compressed_chunks = []
decompressed_chunks = []
i = 0
while True:
input_size = chunk_sizes.draw(strategies.integers(1, 4096))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 source = original[i : i + input_size]
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 if not source:
break
i += input_size
chunk = cobj.compress(source)
compressed_chunks.append(chunk)
decompressed_chunks.append(dobj.decompress(chunk))
if not flushes.draw(strategies.booleans()):
continue
chunk = cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK)
compressed_chunks.append(chunk)
decompressed_chunks.append(dobj.decompress(chunk))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(decompressed_chunks), original[0:i])
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
chunk = cobj.flush(zstd.COMPRESSOBJ_FLUSH_FINISH)
compressed_chunks.append(chunk)
decompressed_chunks.append(dobj.decompress(chunk))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(
dctx.decompress(b"".join(compressed_chunks), max_output_size=len(original)),
original,
)
self.assertEqual(b"".join(decompressed_chunks), original)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestCompressor_read_to_iter_fuzzing(TestCase):
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
read_size=strategies.integers(min_value=1, max_value=4096),
write_size=strategies.integers(min_value=1, max_value=4096),
)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_read_write_size_variance(self, original, level, read_size, write_size):
refcctx = zstd.ZstdCompressor(level=level)
ref_frame = refcctx.compress(original)
source = io.BytesIO(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 chunks = list(
cctx.read_to_iter(
source, size=len(original), read_size=read_size, write_size=write_size
)
)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
class TestCompressor_multi_compress_to_buffer_fuzzing(TestCase):
@hypothesis.given(
original=strategies.lists(
strategies.sampled_from(random_input_data()), min_size=1, max_size=1024
),
threads=strategies.integers(min_value=1, max_value=8),
use_dict=strategies.booleans(),
)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_data_equivalence(self, original, threads, use_dict):
kwargs = {}
# Use a content dictionary because it is cheap to create.
if use_dict:
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 kwargs["dict_data"] = zstd.ZstdCompressionDict(original[0])
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 cctx = zstd.ZstdCompressor(level=1, write_checksum=True, **kwargs)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 if not hasattr(cctx, "multi_compress_to_buffer"):
self.skipTest("multi_compress_to_buffer not available")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 result = cctx.multi_compress_to_buffer(original, threads=-1)
self.assertEqual(len(result), len(original))
# The frame produced via the batch APIs may not be bit identical to that
# produced by compress() because compression parameters are adjusted
# from the first input in batch mode. So the only thing we can do is
# verify the decompressed data matches the input.
dctx = zstd.ZstdDecompressor(**kwargs)
for i, frame in enumerate(result):
self.assertEqual(dctx.decompress(frame), original[i])
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestCompressor_chunker_fuzzing(TestCase):
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 @hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
chunk_size=strategies.integers(min_value=1, max_value=32 * 1048576),
input_sizes=strategies.data(),
)
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 def test_random_input_sizes(self, original, level, chunk_size, input_sizes):
cctx = zstd.ZstdCompressor(level=level)
chunker = cctx.chunker(chunk_size=chunk_size)
chunks = []
i = 0
while True:
input_size = input_sizes.draw(strategies.integers(1, 4096))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 source = original[i : i + input_size]
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 if not source:
break
chunks.extend(chunker.compress(source))
i += input_size
chunks.extend(chunker.finish())
dctx = zstd.ZstdDecompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(
dctx.decompress(b"".join(chunks), max_output_size=len(original)), original
)
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
self.assertTrue(all(len(chunk) == chunk_size for chunk in chunks[:-1]))
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
chunk_size=strategies.integers(min_value=1, max_value=32 * 1048576),
input_sizes=strategies.data(),
flushes=strategies.data(),
)
def test_flush_block(self, original, level, chunk_size, input_sizes, flushes):
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 cctx = zstd.ZstdCompressor(level=level)
chunker = cctx.chunker(chunk_size=chunk_size)
dctx = zstd.ZstdDecompressor()
dobj = dctx.decompressobj()
compressed_chunks = []
decompressed_chunks = []
i = 0
while True:
input_size = input_sizes.draw(strategies.integers(1, 4096))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 source = original[i : i + input_size]
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 if not source:
break
i += input_size
chunks = list(chunker.compress(source))
compressed_chunks.extend(chunks)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 decompressed_chunks.append(dobj.decompress(b"".join(chunks)))
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
if not flushes.draw(strategies.booleans()):
continue
chunks = list(chunker.flush())
compressed_chunks.extend(chunks)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 decompressed_chunks.append(dobj.decompress(b"".join(chunks)))
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(decompressed_chunks), original[0:i])
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
chunks = list(chunker.finish())
compressed_chunks.extend(chunks)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 decompressed_chunks.append(dobj.decompress(b"".join(chunks)))
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(
dctx.decompress(b"".join(compressed_chunks), max_output_size=len(original)),
original,
)
self.assertEqual(b"".join(decompressed_chunks), original)