##// END OF EJS Templates
debian: support building a single deb for multiple py3 versions...
debian: support building a single deb for multiple py3 versions Around transitions from one python minor version to another (such as 3.7 to 3.8), the current packaging can be slightly problematic - it produces a `control` file that requires that the version of `python3` that's installed be exactly the one that was used on the build machine for the `mercurial` package, by containing a line like: Depends: sensible-utils, libc6 (>= 2.14), python3 (<< 3.8), python3 (>= 3.7~), python3:any (>= 3.5~) This is because it "knows" we only built for v3.7, which is the current default on my system. By building the native components for multiple versions, we can make it produce a line like this, which is compatible with 3.7 AND 3.8: Depends: sensible-utils, libc6 (>= 2.14), python3 (<< 3.9), python3 (>= 3.7~), python3:any (>= 3.5~) This isn't *normally* required, so I'm not making it the default. For those that receive their python3 and mercurial packages from their distro, and/or don't have to worry about a situation where the team that manages the python3 installation isn't the same as the team that manages the mercurial installation, this is probably not necessary. I chose the names `DEB_HG_*` because `DEB_*` is passed through `debuild` automatically (otherwise we'd have to explicitly allow the options through, which is a nuisance), and the `HG` part is to make it clear that this isn't a "standard" debian option that other packages might respect. Test Plan: 1. "nothing changed": - built a deb without these changes - built a deb with these changes but everything at the default - used diffoscope to compare, all differences were due to timestamps 2. "explicit is the same as implicit" (single version) - built a deb with everything at the default - built a deb with DEB_HG_PYTHON_VERSIONS=3.7 - used diffoscope to compare, all differences were due to timestamps 3. "explicit is the same as implicit" (multi version) - built a deb with DEB_HG_MULTI_VERSION=1 - built a deb with DEB_HG_PYTHON_VERSIONS=3.7 - used diffoscope to compare, all differences were due to timestamps 4. (single version, 3.7) doesn't work with python3.8 - `/usr/bin/python3.7 /usr/bin/hg debuginstall` works - `/usr/bin/python3.8 /usr/bin/hg debuginstall` crashes 5. (multi version, 3.7 + 3.8) - `/usr/bin/python3.7 /usr/bin/hg debuginstall` works - `/usr/bin/python3.8 /usr/bin/hg debuginstall` works Differential Revision: https://phab.mercurial-scm.org/D8642

File last commit:

r44605:5e84a96d default
r45543:36178b5c default
Show More
test_compressor_fuzzing.py
884 lines | 28.5 KiB | text/x-python | PythonLexer
/ contrib / python-zstandard / tests / test_compressor_fuzzing.py
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 import io
import os
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 import unittest
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
try:
import hypothesis
import hypothesis.strategies as strategies
except ImportError:
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 raise unittest.SkipTest("hypothesis not available")
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 import zstandard as zstd
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 from .common import (
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 make_cffi,
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 NonClosingBytesIO,
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 random_input_data,
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 TestCase,
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 )
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestCompressor_stream_reader_fuzzing(TestCase):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 @hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 read_size=strategies.integers(
-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE
),
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 )
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 def test_stream_source_read(
self, original, level, source_read_size, read_size
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if read_size == 0:
read_size = -1
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
chunk = reader.read(read_size)
if not chunk:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 read_size=strategies.integers(
-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE
),
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 )
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 def test_buffer_source_read(
self, original, level, source_read_size, read_size
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if read_size == 0:
read_size = -1
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
chunk = reader.read(read_size)
if not chunk:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_stream_source_read_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 chunks = []
while True:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 read_size = read_sizes.draw(strategies.integers(-1, 16384))
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 chunk = reader.read(read_size)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if not chunk and read_size:
break
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 @hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_buffer_source_read_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 chunks = []
while True:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 read_size = read_sizes.draw(strategies.integers(-1, 16384))
chunk = reader.read(read_size)
if not chunk and read_size:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 read_size=strategies.integers(
1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE
),
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 )
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 def test_stream_source_readinto(
self, original, level, source_read_size, read_size
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
b = bytearray(read_size)
count = reader.readinto(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 read_size=strategies.integers(
1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE
),
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 )
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 def test_buffer_source_readinto(
self, original, level, source_read_size, read_size
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
b = bytearray(read_size)
count = reader.readinto(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_stream_source_readinto_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 read_size = read_sizes.draw(strategies.integers(1, 16384))
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 b = bytearray(read_size)
count = reader.readinto(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_buffer_source_readinto_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(1, 16384))
b = bytearray(read_size)
count = reader.readinto(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 read_size=strategies.integers(
-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE
),
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 )
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 def test_stream_source_read1(
self, original, level, source_read_size, read_size
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if read_size == 0:
read_size = -1
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
chunk = reader.read1(read_size)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 if not chunk:
break
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 @hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 read_size=strategies.integers(
-1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE
),
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 )
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 def test_buffer_source_read1(
self, original, level, source_read_size, read_size
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if read_size == 0:
read_size = -1
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
chunk = reader.read1(read_size)
if not chunk:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_stream_source_read1_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(-1, 16384))
chunk = reader.read1(read_size)
if not chunk and read_size:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_buffer_source_read1_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(-1, 16384))
chunk = reader.read1(read_size)
if not chunk and read_size:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 read_size=strategies.integers(
1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE
),
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 )
def test_stream_source_readinto1(
self, original, level, source_read_size, read_size
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if read_size == 0:
read_size = -1
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
b = bytearray(read_size)
count = reader.readinto1(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 read_size=strategies.integers(
1, zstd.COMPRESSION_RECOMMENDED_OUTPUT_SIZE
),
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 )
def test_buffer_source_readinto1(
self, original, level, source_read_size, read_size
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if read_size == 0:
read_size = -1
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
b = bytearray(read_size)
count = reader.readinto1(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_stream_source_readinto1_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
io.BytesIO(original), size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(1, 16384))
b = bytearray(read_size)
count = reader.readinto1(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
read_sizes=strategies.data(),
)
def test_buffer_source_readinto1_variance(
self, original, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_reader(
original, size=len(original), read_size=source_read_size
) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks = []
while True:
read_size = read_sizes.draw(strategies.integers(1, 16384))
b = bytearray(read_size)
count = reader.readinto1(b)
if not count:
break
chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestCompressor_stream_writer_fuzzing(TestCase):
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
write_size=strategies.integers(min_value=1, max_value=1048576),
)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_write_size_variance(self, original, level, write_size):
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 b = NonClosingBytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with cctx.stream_writer(
b, size=len(original), write_size=write_size
) as compressor:
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 compressor.write(original)
self.assertEqual(b.getvalue(), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestCompressor_copy_stream_fuzzing(TestCase):
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
read_size=strategies.integers(min_value=1, max_value=1048576),
write_size=strategies.integers(min_value=1, max_value=1048576),
)
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 def test_read_write_size_variance(
self, original, level, read_size, write_size
):
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
source = io.BytesIO(original)
dest = io.BytesIO()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 cctx.copy_stream(
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 source,
dest,
size=len(original),
read_size=read_size,
write_size=write_size,
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 )
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
self.assertEqual(dest.getvalue(), ref_frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestCompressor_compressobj_fuzzing(TestCase):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 @hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
chunk_sizes=strategies.data(),
)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_random_input_sizes(self, original, level, chunk_sizes):
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
cobj = cctx.compressobj(size=len(original))
chunks = []
i = 0
while True:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 chunk_size = chunk_sizes.draw(strategies.integers(1, 4096))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 source = original[i : i + chunk_size]
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 if not source:
break
chunks.append(cobj.compress(source))
i += chunk_size
chunks.append(cobj.flush())
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 @hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
chunk_sizes=strategies.data(),
flushes=strategies.data(),
)
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 def test_flush_block(self, original, level, chunk_sizes, flushes):
cctx = zstd.ZstdCompressor(level=level)
cobj = cctx.compressobj()
dctx = zstd.ZstdDecompressor()
dobj = dctx.decompressobj()
compressed_chunks = []
decompressed_chunks = []
i = 0
while True:
input_size = chunk_sizes.draw(strategies.integers(1, 4096))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 source = original[i : i + input_size]
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 if not source:
break
i += input_size
chunk = cobj.compress(source)
compressed_chunks.append(chunk)
decompressed_chunks.append(dobj.decompress(chunk))
if not flushes.draw(strategies.booleans()):
continue
chunk = cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK)
compressed_chunks.append(chunk)
decompressed_chunks.append(dobj.decompress(chunk))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(decompressed_chunks), original[0:i])
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
chunk = cobj.flush(zstd.COMPRESSOBJ_FLUSH_FINISH)
compressed_chunks.append(chunk)
decompressed_chunks.append(dobj.decompress(chunk))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 dctx.decompress(
b"".join(compressed_chunks), max_output_size=len(original)
),
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 original,
)
self.assertEqual(b"".join(decompressed_chunks), original)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestCompressor_read_to_iter_fuzzing(TestCase):
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
read_size=strategies.integers(min_value=1, max_value=4096),
write_size=strategies.integers(min_value=1, max_value=4096),
)
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 def test_read_write_size_variance(
self, original, level, read_size, write_size
):
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 refcctx = zstd.ZstdCompressor(level=level)
ref_frame = refcctx.compress(original)
source = io.BytesIO(original)
cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 chunks = list(
cctx.read_to_iter(
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 source,
size=len(original),
read_size=read_size,
write_size=write_size,
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 )
)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), ref_frame)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
class TestCompressor_multi_compress_to_buffer_fuzzing(TestCase):
@hypothesis.given(
original=strategies.lists(
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 strategies.sampled_from(random_input_data()),
min_size=1,
max_size=1024,
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 ),
threads=strategies.integers(min_value=1, max_value=8),
use_dict=strategies.booleans(),
)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_data_equivalence(self, original, threads, use_dict):
kwargs = {}
# Use a content dictionary because it is cheap to create.
if use_dict:
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 kwargs["dict_data"] = zstd.ZstdCompressionDict(original[0])
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 cctx = zstd.ZstdCompressor(level=1, write_checksum=True, **kwargs)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 if not hasattr(cctx, "multi_compress_to_buffer"):
self.skipTest("multi_compress_to_buffer not available")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 result = cctx.multi_compress_to_buffer(original, threads=-1)
self.assertEqual(len(result), len(original))
# The frame produced via the batch APIs may not be bit identical to that
# produced by compress() because compression parameters are adjusted
# from the first input in batch mode. So the only thing we can do is
# verify the decompressed data matches the input.
dctx = zstd.ZstdDecompressor(**kwargs)
for i, frame in enumerate(result):
self.assertEqual(dctx.decompress(frame), original[i])
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestCompressor_chunker_fuzzing(TestCase):
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 @hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
chunk_size=strategies.integers(min_value=1, max_value=32 * 1048576),
input_sizes=strategies.data(),
)
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 def test_random_input_sizes(self, original, level, chunk_size, input_sizes):
cctx = zstd.ZstdCompressor(level=level)
chunker = cctx.chunker(chunk_size=chunk_size)
chunks = []
i = 0
while True:
input_size = input_sizes.draw(strategies.integers(1, 4096))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 source = original[i : i + input_size]
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 if not source:
break
chunks.extend(chunker.compress(source))
i += input_size
chunks.extend(chunker.finish())
dctx = zstd.ZstdDecompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 dctx.decompress(b"".join(chunks), max_output_size=len(original)),
original,
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 )
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
self.assertTrue(all(len(chunk) == chunk_size for chunk in chunks[:-1]))
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
chunk_size=strategies.integers(min_value=1, max_value=32 * 1048576),
input_sizes=strategies.data(),
flushes=strategies.data(),
)
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 def test_flush_block(
self, original, level, chunk_size, input_sizes, flushes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 cctx = zstd.ZstdCompressor(level=level)
chunker = cctx.chunker(chunk_size=chunk_size)
dctx = zstd.ZstdDecompressor()
dobj = dctx.decompressobj()
compressed_chunks = []
decompressed_chunks = []
i = 0
while True:
input_size = input_sizes.draw(strategies.integers(1, 4096))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 source = original[i : i + input_size]
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 if not source:
break
i += input_size
chunks = list(chunker.compress(source))
compressed_chunks.extend(chunks)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 decompressed_chunks.append(dobj.decompress(b"".join(chunks)))
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
if not flushes.draw(strategies.booleans()):
continue
chunks = list(chunker.flush())
compressed_chunks.extend(chunks)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 decompressed_chunks.append(dobj.decompress(b"".join(chunks)))
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(decompressed_chunks), original[0:i])
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
chunks = list(chunker.finish())
compressed_chunks.extend(chunks)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 decompressed_chunks.append(dobj.decompress(b"".join(chunks)))
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 dctx.decompress(
b"".join(compressed_chunks), max_output_size=len(original)
),
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 original,
)
self.assertEqual(b"".join(decompressed_chunks), original)