##// END OF EJS Templates
chgserver: make S channel support pager request...
chgserver: make S channel support pager request This patch adds the "pager" support for the S channel. The pager API allows running some subcommands, namely attachio, and waiting for the client to be properly synchronized.

File last commit:

r30435:b86a448a default
r30739:815e1cef default
Show More
test_data_structures.py
107 lines | 5.1 KiB | text/x-python | PythonLexer
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 import io
try:
import unittest2 as unittest
except ImportError:
import unittest
try:
import hypothesis
import hypothesis.strategies as strategies
except ImportError:
hypothesis = None
import zstd
class TestCompressionParameters(unittest.TestCase):
def test_init_bad_arg_type(self):
with self.assertRaises(TypeError):
zstd.CompressionParameters()
with self.assertRaises(TypeError):
zstd.CompressionParameters(0, 1)
def test_bounds(self):
zstd.CompressionParameters(zstd.WINDOWLOG_MIN,
zstd.CHAINLOG_MIN,
zstd.HASHLOG_MIN,
zstd.SEARCHLOG_MIN,
zstd.SEARCHLENGTH_MIN,
zstd.TARGETLENGTH_MIN,
zstd.STRATEGY_FAST)
zstd.CompressionParameters(zstd.WINDOWLOG_MAX,
zstd.CHAINLOG_MAX,
zstd.HASHLOG_MAX,
zstd.SEARCHLOG_MAX,
zstd.SEARCHLENGTH_MAX,
zstd.TARGETLENGTH_MAX,
zstd.STRATEGY_BTOPT)
def test_get_compression_parameters(self):
p = zstd.get_compression_parameters(1)
self.assertIsInstance(p, zstd.CompressionParameters)
self.assertEqual(p[0], 19)
if hypothesis:
s_windowlog = strategies.integers(min_value=zstd.WINDOWLOG_MIN,
max_value=zstd.WINDOWLOG_MAX)
s_chainlog = strategies.integers(min_value=zstd.CHAINLOG_MIN,
max_value=zstd.CHAINLOG_MAX)
s_hashlog = strategies.integers(min_value=zstd.HASHLOG_MIN,
max_value=zstd.HASHLOG_MAX)
s_searchlog = strategies.integers(min_value=zstd.SEARCHLOG_MIN,
max_value=zstd.SEARCHLOG_MAX)
s_searchlength = strategies.integers(min_value=zstd.SEARCHLENGTH_MIN,
max_value=zstd.SEARCHLENGTH_MAX)
s_targetlength = strategies.integers(min_value=zstd.TARGETLENGTH_MIN,
max_value=zstd.TARGETLENGTH_MAX)
s_strategy = strategies.sampled_from((zstd.STRATEGY_FAST,
zstd.STRATEGY_DFAST,
zstd.STRATEGY_GREEDY,
zstd.STRATEGY_LAZY,
zstd.STRATEGY_LAZY2,
zstd.STRATEGY_BTLAZY2,
zstd.STRATEGY_BTOPT))
class TestCompressionParametersHypothesis(unittest.TestCase):
@hypothesis.given(s_windowlog, s_chainlog, s_hashlog, s_searchlog,
s_searchlength, s_targetlength, s_strategy)
def test_valid_init(self, windowlog, chainlog, hashlog, searchlog,
searchlength, targetlength, strategy):
p = zstd.CompressionParameters(windowlog, chainlog, hashlog,
searchlog, searchlength,
targetlength, strategy)
self.assertEqual(tuple(p),
(windowlog, chainlog, hashlog, searchlog,
searchlength, targetlength, strategy))
# Verify we can instantiate a compressor with the supplied values.
# ZSTD_checkCParams moves the goal posts on us from what's advertised
# in the constants. So move along with them.
if searchlength == zstd.SEARCHLENGTH_MIN and strategy in (zstd.STRATEGY_FAST, zstd.STRATEGY_GREEDY):
searchlength += 1
p = zstd.CompressionParameters(windowlog, chainlog, hashlog,
searchlog, searchlength,
targetlength, strategy)
elif searchlength == zstd.SEARCHLENGTH_MAX and strategy != zstd.STRATEGY_FAST:
searchlength -= 1
p = zstd.CompressionParameters(windowlog, chainlog, hashlog,
searchlog, searchlength,
targetlength, strategy)
cctx = zstd.ZstdCompressor(compression_params=p)
with cctx.write_to(io.BytesIO()):
pass
@hypothesis.given(s_windowlog, s_chainlog, s_hashlog, s_searchlog,
s_searchlength, s_targetlength, s_strategy)
def test_estimate_compression_context_size(self, windowlog, chainlog,
hashlog, searchlog,
searchlength, targetlength,
strategy):
p = zstd.CompressionParameters(windowlog, chainlog, hashlog,
searchlog, searchlength,
targetlength, strategy)
size = zstd.estimate_compression_context_size(p)