##// END OF EJS Templates
push: allow to specify multiple destinations...
push: allow to specify multiple destinations I end up needing that on a regular basis and it turn out to be very simple to implement. See documentation and test for details. Differential Revision: https://phab.mercurial-scm.org/D10161

File last commit:

r44605:5e84a96d default
r47536:066b8d8f default
Show More
test_compressor_fuzzing.py
884 lines | 28.5 KiB | text/x-python | PythonLexer
/ contrib / python-zstandard / tests / test_compressor_fuzzing.py
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 import io
import os
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 import unittest
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
try:
import hypothesis
import hypothesis.strategies as strategies
except ImportError:
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 raise unittest.SkipTest("hypothesis not available")
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 import zstandard as zstd
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 from .common import (
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 make_cffi,
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 NonClosingBytesIO,
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 random_input_data,
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 TestCase,
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 )
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestCompressor_stream_reader_fuzzing(TestCase):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 @hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 read_size=strategies.integers(
-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE
),
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 )
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 def test_stream_source_read(
self, original, level, source_read_size, read_size
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if read_size == 0:
read_size = -1
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
chunk = reader.read(read_size)
if not chunk:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 read_size=strategies.integers(
-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE
),
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 )
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 def test_buffer_source_read(
self, original, level, source_read_size, read_size
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if read_size == 0:
read_size = -1
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
chunk = reader.read(read_size)
if not chunk:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_stream_source_read_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 chunks = []
while True:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 read_size = read_sizes.draw(strategies.integers(-1, 16384))
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 chunk = reader.read(read_size)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if not chunk and read_size:
break
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 @hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_buffer_source_read_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 chunks = []
while True:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 read_size = read_sizes.draw(strategies.integers(-1, 16384))
chunk = reader.read(read_size)
if not chunk and read_size:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 read_size=strategies.integers(
1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE
),
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 )
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 def test_stream_source_readinto(
self, original, level, source_read_size, read_size
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
b = bytearray(read_size)
count = reader.readinto(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 read_size=strategies.integers(
1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE
),
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 )
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 def test_buffer_source_readinto(
self, original, level, source_read_size, read_size
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
b = bytearray(read_size)
count = reader.readinto(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_stream_source_readinto_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 read_size = read_sizes.draw(strategies.integers(1, 16384))
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 b = bytearray(read_size)
count = reader.readinto(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_buffer_source_readinto_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(1, 16384))
b = bytearray(read_size)
count = reader.readinto(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 read_size=strategies.integers(
-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE
),
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 )
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 def test_stream_source_read1(
self, original, level, source_read_size, read_size
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if read_size == 0:
read_size = -1
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
chunk = reader.read1(read_size)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 if not chunk:
break
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 @hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 read_size=strategies.integers(
-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE
),
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 )
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 def test_buffer_source_read1(
self, original, level, source_read_size, read_size
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if read_size == 0:
read_size = -1
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
chunk = reader.read1(read_size)
if not chunk:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_stream_source_read1_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(-1, 16384))
chunk = reader.read1(read_size)
if not chunk and read_size:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_buffer_source_read1_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(-1, 16384))
chunk = reader.read1(read_size)
if not chunk and read_size:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 read_size=strategies.integers(
1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE
),
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 )
def test_stream_source_readinto1(
self, original, level, source_read_size, read_size
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if read_size == 0:
read_size = -1
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
b = bytearray(read_size)
count = reader.readinto1(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 read_size=strategies.integers(
1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE
),
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 )
def test_buffer_source_readinto1(
self, original, level, source_read_size, read_size
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if read_size == 0:
read_size = -1
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
b = bytearray(read_size)
count = reader.readinto1(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_stream_source_readinto1_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(1, 16384))
b = bytearray(read_size)
count = reader.readinto1(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_buffer_source_readinto1_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(1, 16384))
b = bytearray(read_size)
count = reader.readinto1(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestCompressor_stream_writer_fuzzing(TestCase):
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
write_size=strategies.integers(min_value=1, max_value=1048576),
)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_write_size_variance(self, original, level, write_size):
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 b = NonClosingBytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_writer(
b, size=len(original), write_size=write_size
) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 compressor.write(original)
self.assertEqual(b.getvalue(), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestCompressor_copy_stream_fuzzing(TestCase):
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
read_size=strategies.integers(min_value=1, max_value=1048576),
write_size=strategies.integers(min_value=1, max_value=1048576),
)
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 def test_read_write_size_variance(
self, original, level, read_size, write_size
):
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
source = io.BytesIO(original)
dest = io.BytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 cctx.copy_stream(
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 source,
dest,
size=len(original),
read_size=read_size,
write_size=write_size,
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 )
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
self.assertEqual(dest.getvalue(), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestCompressor_compressobj_fuzzing(TestCase):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 @hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
chunk_sizes=strategies.data(),
)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_random_input_sizes(self, original, level, chunk_sizes):
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
cobj = cctx.compressobj(size=len(original))
chunks = []
i = 0
while True:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 chunk_size = chunk_sizes.draw(strategies.integers(1, 4096))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 source = original[i : i + chunk_size]
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 if not source:
break
chunks.append(cobj.compress(source))
i += chunk_size
chunks.append(cobj.flush())
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 @hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
chunk_sizes=strategies.data(),
flushes=strategies.data(),
)
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 def test_flush_block(self, original, level, chunk_sizes, flushes):
cctx = zstd.ZstdCompressor(level=level)
cobj = cctx.compressobj()
dctx = zstd.ZstdDecompressor()
dobj = dctx.decompressobj()
compressed_chunks = []
decompressed_chunks = []
i = 0
while True:
input_size = chunk_sizes.draw(strategies.integers(1, 4096))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 source = original[i : i + input_size]
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 if not source:
break
i += input_size
chunk = cobj.compress(source)
compressed_chunks.append(chunk)
decompressed_chunks.append(dobj.decompress(chunk))
if not flushes.draw(strategies.booleans()):
continue
chunk = cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK)
compressed_chunks.append(chunk)
decompressed_chunks.append(dobj.decompress(chunk))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(decompressed_chunks), original[0:i])
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
chunk = cobj.flush(zstd.COMPRESSOBJ_FLUSH_FINISH)
compressed_chunks.append(chunk)
decompressed_chunks.append(dobj.decompress(chunk))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 dctx.decompress(
b"".join(compressed_chunks), max_output_size=len(original)
),
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 original,
)
self.assertEqual(b"".join(decompressed_chunks), original)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestCompressor_read_to_iter_fuzzing(TestCase):
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
read_size=strategies.integers(min_value=1, max_value=4096),
write_size=strategies.integers(min_value=1, max_value=4096),
)
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 def test_read_write_size_variance(
self, original, level, read_size, write_size
):
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 refcctx = zstd.ZstdCompressor(level=level)
ref_frame = refcctx.compress(original)
source = io.BytesIO(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 chunks = list(
cctx.read_to_iter(
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 source,
size=len(original),
read_size=read_size,
write_size=write_size,
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 )
)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
class TestCompressor_multi_compress_to_buffer_fuzzing(TestCase):
@hypothesis.given(
original=strategies.lists(
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 strategies.sampled_from(random_input_data()),
min_size=1,
max_size=1024,
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 ),
threads=strategies.integers(min_value=1, max_value=8),
use_dict=strategies.booleans(),
)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_data_equivalence(self, original, threads, use_dict):
kwargs = {}
# Use a content dictionary because it is cheap to create.
if use_dict:
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 kwargs["dict_data"] = zstd.ZstdCompressionDict(original[0])
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 cctx = zstd.ZstdCompressor(level=1, write_checksum=True, **kwargs)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 if not hasattr(cctx, "multi_compress_to_buffer"):
self.skipTest("multi_compress_to_buffer not available")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 result = cctx.multi_compress_to_buffer(original, threads=-1)
self.assertEqual(len(result), len(original))
# The frame produced via the batch APIs may not be bit identical to that
# produced by compress() because compression parameters are adjusted
# from the first input in batch mode. So the only thing we can do is
# verify the decompressed data matches the input.
dctx = zstd.ZstdDecompressor(**kwargs)
for i, frame in enumerate(result):
self.assertEqual(dctx.decompress(frame), original[i])
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestCompressor_chunker_fuzzing(TestCase):
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 @hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
chunk_size=strategies.integers(min_value=1, max_value=32 * 1048576),
input_sizes=strategies.data(),
)
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 def test_random_input_sizes(self, original, level, chunk_size, input_sizes):
cctx = zstd.ZstdCompressor(level=level)
chunker = cctx.chunker(chunk_size=chunk_size)
chunks = []
i = 0
while True:
input_size = input_sizes.draw(strategies.integers(1, 4096))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 source = original[i : i + input_size]
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 if not source:
break
chunks.extend(chunker.compress(source))
i += input_size
chunks.extend(chunker.finish())
dctx = zstd.ZstdDecompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 dctx.decompress(b"".join(chunks), max_output_size=len(original)),
original,
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 )
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
self.assertTrue(all(len(chunk) == chunk_size for chunk in chunks[:-1]))
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
chunk_size=strategies.integers(min_value=1, max_value=32 * 1048576),
input_sizes=strategies.data(),
flushes=strategies.data(),
)
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 def test_flush_block(
self, original, level, chunk_size, input_sizes, flushes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 cctx = zstd.ZstdCompressor(level=level)
chunker = cctx.chunker(chunk_size=chunk_size)
dctx = zstd.ZstdDecompressor()
dobj = dctx.decompressobj()
compressed_chunks = []
decompressed_chunks = []
i = 0
while True:
input_size = input_sizes.draw(strategies.integers(1, 4096))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 source = original[i : i + input_size]
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 if not source:
break
i += input_size
chunks = list(chunker.compress(source))
compressed_chunks.extend(chunks)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 decompressed_chunks.append(dobj.decompress(b"".join(chunks)))
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
if not flushes.draw(strategies.booleans()):
continue
chunks = list(chunker.flush())
compressed_chunks.extend(chunks)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 decompressed_chunks.append(dobj.decompress(b"".join(chunks)))
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(decompressed_chunks), original[0:i])
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
chunks = list(chunker.finish())
compressed_chunks.extend(chunks)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 decompressed_chunks.append(dobj.decompress(b"".join(chunks)))
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 dctx.decompress(
b"".join(compressed_chunks), max_output_size=len(original)
),
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 original,
)
self.assertEqual(b"".join(decompressed_chunks), original)