##// END OF EJS Templates
convert: avoid wrong lfconvert defaults by moving configitems to core...
convert: avoid wrong lfconvert defaults by moving configitems to core The `hg lfconvert --to-normal` command uses the convert extension internally to work its magic, but that produced devel-warn messages if the convert extension wasn't loaded by the user. The test in fcd2f9b06629 (modified here) wasn't showing the warnings because the convert extension was loaded via $HGRCPATH. Most of the config options default to None/False, but 'hg.usebranchnames' and 'hg.tagsbranch' are supposed to default to True and 'default' respectively. The first iteration of this was to ui.setconfig() inside lfconvert, to force the convert extension to load. But there really is no precedent for doing this, and check-config complained that 'extensions.convert' isn't documented. Yuya suggested this alternative. This partially backs out 0d5a1175d0f9.

File last commit:

r31796:e0dc4053 default
r35141:28121415 stable
Show More
test_compressor_fuzzing.py
143 lines | 5.5 KiB | text/x-python | PythonLexer
import io
import os
try:
import unittest2 as unittest
except ImportError:
import unittest
try:
import hypothesis
import hypothesis.strategies as strategies
except ImportError:
raise unittest.SkipTest('hypothesis not available')
import zstd
from . common import (
make_cffi,
random_input_data,
)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_write_to_fuzzing(unittest.TestCase):
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
write_size=strategies.integers(min_value=1, max_value=1048576))
def test_write_size_variance(self, original, level, write_size):
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
b = io.BytesIO()
with cctx.write_to(b, size=len(original), write_size=write_size) as compressor:
compressor.write(original)
self.assertEqual(b.getvalue(), ref_frame)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_copy_stream_fuzzing(unittest.TestCase):
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
read_size=strategies.integers(min_value=1, max_value=1048576),
write_size=strategies.integers(min_value=1, max_value=1048576))
def test_read_write_size_variance(self, original, level, read_size, write_size):
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
source = io.BytesIO(original)
dest = io.BytesIO()
cctx.copy_stream(source, dest, size=len(original), read_size=read_size,
write_size=write_size)
self.assertEqual(dest.getvalue(), ref_frame)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_compressobj_fuzzing(unittest.TestCase):
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
chunk_sizes=strategies.streaming(
strategies.integers(min_value=1, max_value=4096)))
def test_random_input_sizes(self, original, level, chunk_sizes):
chunk_sizes = iter(chunk_sizes)
refctx = zstd.ZstdCompressor(level=level)
ref_frame = refctx.compress(original)
cctx = zstd.ZstdCompressor(level=level)
cobj = cctx.compressobj(size=len(original))
chunks = []
i = 0
while True:
chunk_size = next(chunk_sizes)
source = original[i:i + chunk_size]
if not source:
break
chunks.append(cobj.compress(source))
i += chunk_size
chunks.append(cobj.flush())
self.assertEqual(b''.join(chunks), ref_frame)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_read_from_fuzzing(unittest.TestCase):
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
read_size=strategies.integers(min_value=1, max_value=4096),
write_size=strategies.integers(min_value=1, max_value=4096))
def test_read_write_size_variance(self, original, level, read_size, write_size):
refcctx = zstd.ZstdCompressor(level=level)
ref_frame = refcctx.compress(original)
source = io.BytesIO(original)
cctx = zstd.ZstdCompressor(level=level)
chunks = list(cctx.read_from(source, size=len(original), read_size=read_size,
write_size=write_size))
self.assertEqual(b''.join(chunks), ref_frame)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
class TestCompressor_multi_compress_to_buffer_fuzzing(unittest.TestCase):
@hypothesis.given(original=strategies.lists(strategies.sampled_from(random_input_data()),
min_size=1, max_size=1024),
threads=strategies.integers(min_value=1, max_value=8),
use_dict=strategies.booleans())
def test_data_equivalence(self, original, threads, use_dict):
kwargs = {}
# Use a content dictionary because it is cheap to create.
if use_dict:
kwargs['dict_data'] = zstd.ZstdCompressionDict(original[0])
cctx = zstd.ZstdCompressor(level=1,
write_content_size=True,
write_checksum=True,
**kwargs)
result = cctx.multi_compress_to_buffer(original, threads=-1)
self.assertEqual(len(result), len(original))
# The frame produced via the batch APIs may not be bit identical to that
# produced by compress() because compression parameters are adjusted
# from the first input in batch mode. So the only thing we can do is
# verify the decompressed data matches the input.
dctx = zstd.ZstdDecompressor(**kwargs)
for i, frame in enumerate(result):
self.assertEqual(dctx.decompress(frame), original[i])