##// END OF EJS Templates
scmutil: move construction of instability count message to separate fn...
scmutil: move construction of instability count message to separate fn When the command we are running introduces new instabilities, we show a message like `5 new orphan changesets`, `2 new content-divergent changesets`, `1 new phase-divergent changesets` etc., which is very nice. Now, taking a step ahead, we want to show users how to fix them too. Something like: `5 new orphan changesets (run 'hg evolve' to resolve/stabilize them)` `2 new content-divergent changesets (run 'hg evolve --content-divergent' to resolve them)` and maybe tell the user how to learn more about those new instabilities, e.g. `hg evolve --list` or `hg log -r 'orphan()'`. The idea came from issue5855, which I want to fix because fixing it will result in a nicer UI. Taking the construction logic out will allow extensions like evolve (maybe rebase too) to wrap it and add information about how to resolve and how to better understand the instability. Differential Revision: https://phab.mercurial-scm.org/D3734

File last commit:

r37513:b1fb341d default
r38474:1cac2e8c default
Show More
test_compressor_fuzzing.py
188 lines | 7.6 KiB | text/x-python | PythonLexer
/ contrib / python-zstandard / tests / test_compressor_fuzzing.py
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 import io
import os
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 import unittest
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
try:
import hypothesis
import hypothesis.strategies as strategies
except ImportError:
raise unittest.SkipTest('hypothesis not available')
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 import zstandard as zstd
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
from . common import (
make_cffi,
random_input_data,
)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_stream_reader_fuzzing(unittest.TestCase):
    """Fuzz ``ZstdCompressor.stream_reader()`` against one-shot ``compress()``.

    Each test pulls the compressed stream out in randomly sized reads and
    requires the concatenated pieces to equal the frame that ``compress()``
    produces for the same input and level.
    """

    @hypothesis.given(
        original=strategies.sampled_from(random_input_data()),
        level=strategies.integers(min_value=1, max_value=5),
        source_read_size=strategies.integers(1, 16384),
        read_sizes=strategies.data())
    def test_stream_source_read_variance(self, original, level,
                                         source_read_size, read_sizes):
        # The source handed to stream_reader() is a file-like object.
        expected = zstd.ZstdCompressor(level=level).compress(original)

        compressor = zstd.ZstdCompressor(level=level)
        stream = io.BytesIO(original)

        with compressor.stream_reader(stream, size=len(original),
                                      read_size=source_read_size) as reader:
            pieces = []

            def next_piece():
                # Every read requests a freshly drawn number of bytes.
                amount = read_sizes.draw(strategies.integers(1, 16384))
                return reader.read(amount)

            piece = next_piece()
            while piece:
                pieces.append(piece)
                piece = next_piece()

        self.assertEqual(b''.join(pieces), expected)

    @hypothesis.given(
        original=strategies.sampled_from(random_input_data()),
        level=strategies.integers(min_value=1, max_value=5),
        source_read_size=strategies.integers(1, 16384),
        read_sizes=strategies.data())
    def test_buffer_source_read_variance(self, original, level,
                                         source_read_size, read_sizes):
        # Same check as above, but the source is the input object itself
        # rather than a file-like wrapper around it.
        expected = zstd.ZstdCompressor(level=level).compress(original)

        compressor = zstd.ZstdCompressor(level=level)

        with compressor.stream_reader(original, size=len(original),
                                      read_size=source_read_size) as reader:
            pieces = []

            def next_piece():
                # Every read requests a freshly drawn number of bytes.
                amount = read_sizes.draw(strategies.integers(1, 16384))
                return reader.read(amount)

            piece = next_piece()
            while piece:
                pieces.append(piece)
                piece = next_piece()

        self.assertEqual(b''.join(pieces), expected)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_stream_writer_fuzzing(unittest.TestCase):
    """Fuzz ``ZstdCompressor.stream_writer()`` with varying write sizes."""

    @hypothesis.given(
        original=strategies.sampled_from(random_input_data()),
        level=strategies.integers(min_value=1, max_value=5),
        write_size=strategies.integers(min_value=1, max_value=1048576))
    def test_write_size_variance(self, original, level, write_size):
        # Reference frame from the one-shot API at the same level.
        expected = zstd.ZstdCompressor(level=level).compress(original)

        compressor = zstd.ZstdCompressor(level=level)
        sink = io.BytesIO()

        writer = compressor.stream_writer(sink, size=len(original),
                                          write_size=write_size)
        with writer as stream:
            stream.write(original)

        # Compare only after the writer context has exited.
        self.assertEqual(sink.getvalue(), expected)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_copy_stream_fuzzing(unittest.TestCase):
    """Fuzz ``ZstdCompressor.copy_stream()`` with varying read/write sizes."""

    @hypothesis.given(
        original=strategies.sampled_from(random_input_data()),
        level=strategies.integers(min_value=1, max_value=5),
        read_size=strategies.integers(min_value=1, max_value=1048576),
        write_size=strategies.integers(min_value=1, max_value=1048576))
    def test_read_write_size_variance(self, original, level, read_size,
                                      write_size):
        # Reference frame from the one-shot API at the same level.
        expected = zstd.ZstdCompressor(level=level).compress(original)

        compressor = zstd.ZstdCompressor(level=level)
        reader = io.BytesIO(original)
        writer = io.BytesIO()

        compressor.copy_stream(reader, writer,
                               size=len(original),
                               read_size=read_size,
                               write_size=write_size)

        self.assertEqual(writer.getvalue(), expected)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_compressobj_fuzzing(unittest.TestCase):
    """Fuzz ``ZstdCompressor.compressobj()`` with randomly sized input chunks."""

    @hypothesis.settings(
        suppress_health_check=[hypothesis.HealthCheck.large_base_example])
    @hypothesis.given(
        original=strategies.sampled_from(random_input_data()),
        level=strategies.integers(min_value=1, max_value=5),
        chunk_sizes=strategies.data())
    def test_random_input_sizes(self, original, level, chunk_sizes):
        # Reference frame from the one-shot API at the same level.
        expected = zstd.ZstdCompressor(level=level).compress(original)

        compressor = zstd.ZstdCompressor(level=level)
        cobj = compressor.compressobj(size=len(original))

        pieces = []
        offset = 0
        while True:
            # The size is drawn before slicing, so one extra draw happens
            # once the input is exhausted.
            step = chunk_sizes.draw(strategies.integers(1, 4096))
            piece = original[offset:offset + step]
            if not piece:
                break

            pieces.append(cobj.compress(piece))
            offset += step

        pieces.append(cobj.flush())

        self.assertEqual(b''.join(pieces), expected)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_read_to_iter_fuzzing(unittest.TestCase):
    """Fuzz ``ZstdCompressor.read_to_iter()`` with varying read/write sizes."""

    @hypothesis.given(
        original=strategies.sampled_from(random_input_data()),
        level=strategies.integers(min_value=1, max_value=5),
        read_size=strategies.integers(min_value=1, max_value=4096),
        write_size=strategies.integers(min_value=1, max_value=4096))
    def test_read_write_size_variance(self, original, level, read_size,
                                      write_size):
        # Reference frame from the one-shot API at the same level.
        expected = zstd.ZstdCompressor(level=level).compress(original)

        stream = io.BytesIO(original)
        compressor = zstd.ZstdCompressor(level=level)

        pieces = compressor.read_to_iter(stream,
                                         size=len(original),
                                         read_size=read_size,
                                         write_size=write_size)

        # join() drains the iterator, so no intermediate list is needed.
        self.assertEqual(b''.join(pieces), expected)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
class TestCompressor_multi_compress_to_buffer_fuzzing(unittest.TestCase):
    """Fuzz ``ZstdCompressor.multi_compress_to_buffer()`` via round-tripping."""

    @hypothesis.given(
        original=strategies.lists(
            strategies.sampled_from(random_input_data()),
            min_size=1, max_size=1024),
        threads=strategies.integers(min_value=1, max_value=8),
        use_dict=strategies.booleans())
    def test_data_equivalence(self, original, threads, use_dict):
        kwargs = {}

        # Use a content dictionary because it is cheap to create.
        if use_dict:
            kwargs['dict_data'] = zstd.ZstdCompressionDict(original[0])

        cctx = zstd.ZstdCompressor(level=1,
                                   write_checksum=True,
                                   **kwargs)

        # Pass the drawn thread count through. The call previously
        # hard-coded threads=-1, which left the ``threads`` strategy unused
        # and meant the thread count was never actually varied.
        result = cctx.multi_compress_to_buffer(original, threads=threads)

        self.assertEqual(len(result), len(original))

        # The frame produced via the batch APIs may not be bit identical to that
        # produced by compress() because compression parameters are adjusted
        # from the first input in batch mode. So the only thing we can do is
        # verify the decompressed data matches the input.
        dctx = zstd.ZstdDecompressor(**kwargs)

        for frame, source in zip(result, original):
            self.assertEqual(dctx.decompress(frame), source)