##// END OF EJS Templates
resourceutil: correct the root path for file based lookup under py2exe...
resourceutil: correct the root path for file based lookup under py2exe This silly copy/paste error caused "Mercurial" to be truncated from "C:\Program Files". The fact that "helptext" and "defaultrc" are now in a subpackage of "mercurial" added it back on, and everything seemed to work. But that broke if not installed to the default directory, and also caused TortoiseHg to look at Mercurial's config files instead of its own. Differential Revision: https://phab.mercurial-scm.org/D8054

File last commit:

r44605:5e84a96d default
r44678:9e367157 stable
Show More
test_decompressor_fuzzing.py
576 lines | 18.7 KiB | text/x-python | PythonLexer
/ contrib / python-zstandard / tests / test_decompressor_fuzzing.py
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 import io
import os
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 import unittest
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
try:
import hypothesis
import hypothesis.strategies as strategies
except ImportError:
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 raise unittest.SkipTest("hypothesis not available")
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 import zstandard as zstd
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 from .common import (
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 make_cffi,
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 NonClosingBytesIO,
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 random_input_data,
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 TestCase,
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 )
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestDecompressor_stream_reader_fuzzing(TestCase):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 @hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
streaming=strategies.booleans(),
source_read_size=strategies.integers(1, 1048576),
read_sizes=strategies.data(),
)
def test_stream_source_read_variance(
self, original, level, streaming, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
if streaming:
source = io.BytesIO()
writer = cctx.stream_writer(source)
writer.write(original)
writer.flush(zstd.FLUSH_FRAME)
source.seek(0)
else:
frame = cctx.compress(original)
source = io.BytesIO(frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
dctx = zstd.ZstdDecompressor()
chunks = []
with dctx.stream_reader(source, read_size=source_read_size) as reader:
while True:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 read_size = read_sizes.draw(strategies.integers(-1, 131072))
chunk = reader.read(read_size)
if not chunk and read_size:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), original)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
# Similar to above except we have a constant read() size.
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
streaming=strategies.booleans(),
source_read_size=strategies.integers(1, 1048576),
read_size=strategies.integers(-1, 131072),
)
def test_stream_source_read_size(
self, original, level, streaming, source_read_size, read_size
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if read_size == 0:
read_size = 1
cctx = zstd.ZstdCompressor(level=level)
if streaming:
source = io.BytesIO()
writer = cctx.stream_writer(source)
writer.write(original)
writer.flush(zstd.FLUSH_FRAME)
source.seek(0)
else:
frame = cctx.compress(original)
source = io.BytesIO(frame)
dctx = zstd.ZstdDecompressor()
chunks = []
reader = dctx.stream_reader(source, read_size=source_read_size)
while True:
chunk = reader.read(read_size)
if not chunk and read_size:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), original)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
streaming=strategies.booleans(),
source_read_size=strategies.integers(1, 1048576),
read_sizes=strategies.data(),
)
def test_buffer_source_read_variance(
self, original, level, streaming, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 cctx = zstd.ZstdCompressor(level=level)
if streaming:
source = io.BytesIO()
writer = cctx.stream_writer(source)
writer.write(original)
writer.flush(zstd.FLUSH_FRAME)
frame = source.getvalue()
else:
frame = cctx.compress(original)
dctx = zstd.ZstdDecompressor()
chunks = []
with dctx.stream_reader(frame, read_size=source_read_size) as reader:
while True:
read_size = read_sizes.draw(strategies.integers(-1, 131072))
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 chunk = reader.read(read_size)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if not chunk and read_size:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), original)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
# Similar to above except we have a constant read() size.
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
streaming=strategies.booleans(),
source_read_size=strategies.integers(1, 1048576),
read_size=strategies.integers(-1, 131072),
)
def test_buffer_source_constant_read_size(
self, original, level, streaming, source_read_size, read_size
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 if read_size == 0:
read_size = -1
cctx = zstd.ZstdCompressor(level=level)
if streaming:
source = io.BytesIO()
writer = cctx.stream_writer(source)
writer.write(original)
writer.flush(zstd.FLUSH_FRAME)
frame = source.getvalue()
else:
frame = cctx.compress(original)
dctx = zstd.ZstdDecompressor()
chunks = []
reader = dctx.stream_reader(frame, read_size=source_read_size)
while True:
chunk = reader.read(read_size)
if not chunk and read_size:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), original)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[hypothesis.HealthCheck.large_base_example]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
streaming=strategies.booleans(),
source_read_size=strategies.integers(1, 1048576),
)
def test_stream_source_readall(self, original, level, streaming, source_read_size):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 cctx = zstd.ZstdCompressor(level=level)
if streaming:
source = io.BytesIO()
writer = cctx.stream_writer(source)
writer.write(original)
writer.flush(zstd.FLUSH_FRAME)
source.seek(0)
else:
frame = cctx.compress(original)
source = io.BytesIO(frame)
dctx = zstd.ZstdDecompressor()
data = dctx.stream_reader(source, read_size=source_read_size).readall()
self.assertEqual(data, original)
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
streaming=strategies.booleans(),
source_read_size=strategies.integers(1, 1048576),
read_sizes=strategies.data(),
)
def test_stream_source_read1_variance(
self, original, level, streaming, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 cctx = zstd.ZstdCompressor(level=level)
if streaming:
source = io.BytesIO()
writer = cctx.stream_writer(source)
writer.write(original)
writer.flush(zstd.FLUSH_FRAME)
source.seek(0)
else:
frame = cctx.compress(original)
source = io.BytesIO(frame)
dctx = zstd.ZstdDecompressor()
chunks = []
with dctx.stream_reader(source, read_size=source_read_size) as reader:
while True:
read_size = read_sizes.draw(strategies.integers(-1, 131072))
chunk = reader.read1(read_size)
if not chunk and read_size:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), original)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
streaming=strategies.booleans(),
source_read_size=strategies.integers(1, 1048576),
read_sizes=strategies.data(),
)
def test_stream_source_readinto1_variance(
self, original, level, streaming, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor(level=level)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
if streaming:
source = io.BytesIO()
writer = cctx.stream_writer(source)
writer.write(original)
writer.flush(zstd.FLUSH_FRAME)
source.seek(0)
else:
frame = cctx.compress(original)
source = io.BytesIO(frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
dctx = zstd.ZstdDecompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 chunks = []
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 with dctx.stream_reader(source, read_size=source_read_size) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 while True:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 read_size = read_sizes.draw(strategies.integers(1, 131072))
b = bytearray(read_size)
count = reader.readinto1(b)
if not count:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 break
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 chunks.append(bytes(b[0:count]))
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), original)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
@hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 @hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 source_read_size=strategies.integers(1, 1048576),
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 seek_amounts=strategies.data(),
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 read_sizes=strategies.data(),
)
def test_relative_seeks(
self, original, level, source_read_size, seek_amounts, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor(level=level)
frame = cctx.compress(original)
dctx = zstd.ZstdDecompressor()
with dctx.stream_reader(frame, read_size=source_read_size) as reader:
while True:
amount = seek_amounts.draw(strategies.integers(0, 16384))
reader.seek(amount, os.SEEK_CUR)
offset = reader.tell()
read_amount = read_sizes.draw(strategies.integers(1, 16384))
chunk = reader.read(read_amount)
if not chunk:
break
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(original[offset : offset + len(chunk)], chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 @hypothesis.settings(
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 @hypothesis.given(
originals=strategies.data(),
frame_count=strategies.integers(min_value=2, max_value=10),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 1048576),
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 read_sizes=strategies.data(),
)
def test_multiple_frames(
self, originals, frame_count, level, source_read_size, read_sizes
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
cctx = zstd.ZstdCompressor(level=level)
source = io.BytesIO()
buffer = io.BytesIO()
writer = cctx.stream_writer(buffer)
for i in range(frame_count):
data = originals.draw(strategies.sampled_from(random_input_data()))
source.write(data)
writer.write(data)
writer.flush(zstd.FLUSH_FRAME)
dctx = zstd.ZstdDecompressor()
buffer.seek(0)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 reader = dctx.stream_reader(
buffer, read_size=source_read_size, read_across_frames=True
)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
chunks = []
while True:
read_amount = read_sizes.draw(strategies.integers(-1, 16384))
chunk = reader.read(read_amount)
if not chunk and read_amount:
break
chunks.append(chunk)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(source.getvalue(), b"".join(chunks))
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestDecompressor_stream_writer_fuzzing(TestCase):
@hypothesis.settings(
suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
write_size=strategies.integers(min_value=1, max_value=8192),
input_sizes=strategies.data(),
)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_write_size_variance(self, original, level, write_size, input_sizes):
cctx = zstd.ZstdCompressor(level=level)
frame = cctx.compress(original)
dctx = zstd.ZstdDecompressor()
source = io.BytesIO(frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 dest = NonClosingBytesIO()
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 with dctx.stream_writer(dest, write_size=write_size) as decompressor:
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 while True:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 input_size = input_sizes.draw(strategies.integers(1, 4096))
chunk = source.read(input_size)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 if not chunk:
break
decompressor.write(chunk)
self.assertEqual(dest.getvalue(), original)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestDecompressor_copy_stream_fuzzing(TestCase):
@hypothesis.settings(
suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
read_size=strategies.integers(min_value=1, max_value=8192),
write_size=strategies.integers(min_value=1, max_value=8192),
)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_read_write_size_variance(self, original, level, read_size, write_size):
cctx = zstd.ZstdCompressor(level=level)
frame = cctx.compress(original)
source = io.BytesIO(frame)
dest = io.BytesIO()
dctx = zstd.ZstdDecompressor()
dctx.copy_stream(source, dest, read_size=read_size, write_size=write_size)
self.assertEqual(dest.getvalue(), original)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestDecompressor_decompressobj_fuzzing(TestCase):
@hypothesis.settings(
suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
chunk_sizes=strategies.data(),
)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_random_input_sizes(self, original, level, chunk_sizes):
cctx = zstd.ZstdCompressor(level=level)
frame = cctx.compress(original)
source = io.BytesIO(frame)
dctx = zstd.ZstdDecompressor()
dobj = dctx.decompressobj()
chunks = []
while True:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 chunk_size = chunk_sizes.draw(strategies.integers(1, 4096))
chunk = source.read(chunk_size)
if not chunk:
break
chunks.append(dobj.decompress(chunk))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), original)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @hypothesis.settings(
suppress_health_check=[
hypothesis.HealthCheck.large_base_example,
hypothesis.HealthCheck.too_slow,
]
)
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
write_size=strategies.integers(
min_value=1, max_value=4 * zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE
),
chunk_sizes=strategies.data(),
)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 def test_random_output_sizes(self, original, level, write_size, chunk_sizes):
cctx = zstd.ZstdCompressor(level=level)
frame = cctx.compress(original)
source = io.BytesIO(frame)
dctx = zstd.ZstdDecompressor()
dobj = dctx.decompressobj(write_size=write_size)
chunks = []
while True:
chunk_size = chunk_sizes.draw(strategies.integers(1, 4096))
chunk = source.read(chunk_size)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 if not chunk:
break
chunks.append(dobj.decompress(chunk))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), original)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestDecompressor_read_to_iter_fuzzing(TestCase):
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
read_size=strategies.integers(min_value=1, max_value=4096),
write_size=strategies.integers(min_value=1, max_value=4096),
)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_read_write_size_variance(self, original, level, read_size, write_size):
cctx = zstd.ZstdCompressor(level=level)
frame = cctx.compress(original)
source = io.BytesIO(frame)
dctx = zstd.ZstdDecompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 chunks = list(
dctx.read_to_iter(source, read_size=read_size, write_size=write_size)
)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), original)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set")
class TestDecompressor_multi_decompress_to_buffer_fuzzing(TestCase):
@hypothesis.given(
original=strategies.lists(
strategies.sampled_from(random_input_data()), min_size=1, max_size=1024
),
threads=strategies.integers(min_value=1, max_value=8),
use_dict=strategies.booleans(),
)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 def test_data_equivalence(self, original, threads, use_dict):
kwargs = {}
if use_dict:
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 kwargs["dict_data"] = zstd.ZstdCompressionDict(original[0])
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 cctx = zstd.ZstdCompressor(
level=1, write_content_size=True, write_checksum=True, **kwargs
)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 if not hasattr(cctx, "multi_compress_to_buffer"):
self.skipTest("multi_compress_to_buffer not available")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 frames_buffer = cctx.multi_compress_to_buffer(original, threads=-1)
dctx = zstd.ZstdDecompressor(**kwargs)
result = dctx.multi_decompress_to_buffer(frames_buffer)
self.assertEqual(len(result), len(original))
for i, frame in enumerate(result):
self.assertEqual(frame.tobytes(), original[i])
frames_list = [f.tobytes() for f in frames_buffer]
result = dctx.multi_decompress_to_buffer(frames_list)
self.assertEqual(len(result), len(original))
for i, frame in enumerate(result):
self.assertEqual(frame.tobytes(), original[i])