##// END OF EJS Templates
convert: avoid wrong lfconvert defaults by moving configitems to core...
convert: avoid wrong lfconvert defaults by moving configitems to core The `hg lfconvert --to-normal` command uses the convert extension internally to work its magic, but that produced devel-warn messages if the convert extension wasn't loaded by the user. The test in fcd2f9b06629 (modified here) wasn't showing the warnings because the convert extension was loaded via $HGRCPATH. Most of the config options default to None/False, but 'hg.usebranchnames' and 'hg.tagsbranch' are supposed to default to True and 'default' respectively. The first iteration of this was to ui.setconfig() inside lfconvert, to force the convert extension to load. But there really is no precedent for doing this, and check-config complained that 'extensions.convert' isn't documented. Yuya suggested this alternative. This partially backs out 0d5a1175d0f9.

File last commit:

r31796:e0dc4053 default
r35141:28121415 stable
Show More
test_decompressor_fuzzing.py
151 lines | 5.5 KiB | text/x-python | PythonLexer
import io
import os
try:
import unittest2 as unittest
except ImportError:
import unittest
try:
import hypothesis
import hypothesis.strategies as strategies
except ImportError:
raise unittest.SkipTest('hypothesis not available')
import zstd
from . common import (
make_cffi,
random_input_data,
)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestDecompressor_write_to_fuzzing(unittest.TestCase):
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
write_size=strategies.integers(min_value=1, max_value=8192),
input_sizes=strategies.streaming(
strategies.integers(min_value=1, max_value=4096)))
def test_write_size_variance(self, original, level, write_size, input_sizes):
input_sizes = iter(input_sizes)
cctx = zstd.ZstdCompressor(level=level)
frame = cctx.compress(original)
dctx = zstd.ZstdDecompressor()
source = io.BytesIO(frame)
dest = io.BytesIO()
with dctx.write_to(dest, write_size=write_size) as decompressor:
while True:
chunk = source.read(next(input_sizes))
if not chunk:
break
decompressor.write(chunk)
self.assertEqual(dest.getvalue(), original)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestDecompressor_copy_stream_fuzzing(unittest.TestCase):
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
read_size=strategies.integers(min_value=1, max_value=8192),
write_size=strategies.integers(min_value=1, max_value=8192))
def test_read_write_size_variance(self, original, level, read_size, write_size):
cctx = zstd.ZstdCompressor(level=level)
frame = cctx.compress(original)
source = io.BytesIO(frame)
dest = io.BytesIO()
dctx = zstd.ZstdDecompressor()
dctx.copy_stream(source, dest, read_size=read_size, write_size=write_size)
self.assertEqual(dest.getvalue(), original)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestDecompressor_decompressobj_fuzzing(unittest.TestCase):
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
chunk_sizes=strategies.streaming(
strategies.integers(min_value=1, max_value=4096)))
def test_random_input_sizes(self, original, level, chunk_sizes):
chunk_sizes = iter(chunk_sizes)
cctx = zstd.ZstdCompressor(level=level)
frame = cctx.compress(original)
source = io.BytesIO(frame)
dctx = zstd.ZstdDecompressor()
dobj = dctx.decompressobj()
chunks = []
while True:
chunk = source.read(next(chunk_sizes))
if not chunk:
break
chunks.append(dobj.decompress(chunk))
self.assertEqual(b''.join(chunks), original)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestDecompressor_read_from_fuzzing(unittest.TestCase):
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
read_size=strategies.integers(min_value=1, max_value=4096),
write_size=strategies.integers(min_value=1, max_value=4096))
def test_read_write_size_variance(self, original, level, read_size, write_size):
cctx = zstd.ZstdCompressor(level=level)
frame = cctx.compress(original)
source = io.BytesIO(frame)
dctx = zstd.ZstdDecompressor()
chunks = list(dctx.read_from(source, read_size=read_size, write_size=write_size))
self.assertEqual(b''.join(chunks), original)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
class TestDecompressor_multi_decompress_to_buffer_fuzzing(unittest.TestCase):
@hypothesis.given(original=strategies.lists(strategies.sampled_from(random_input_data()),
min_size=1, max_size=1024),
threads=strategies.integers(min_value=1, max_value=8),
use_dict=strategies.booleans())
def test_data_equivalence(self, original, threads, use_dict):
kwargs = {}
if use_dict:
kwargs['dict_data'] = zstd.ZstdCompressionDict(original[0])
cctx = zstd.ZstdCompressor(level=1,
write_content_size=True,
write_checksum=True,
**kwargs)
frames_buffer = cctx.multi_compress_to_buffer(original, threads=-1)
dctx = zstd.ZstdDecompressor(**kwargs)
result = dctx.multi_decompress_to_buffer(frames_buffer)
self.assertEqual(len(result), len(original))
for i, frame in enumerate(result):
self.assertEqual(frame.tobytes(), original[i])
frames_list = [f.tobytes() for f in frames_buffer]
result = dctx.multi_decompress_to_buffer(frames_list)
self.assertEqual(len(result), len(original))
for i, frame in enumerate(result):
self.assertEqual(frame.tobytes(), original[i])