##// END OF EJS Templates
runtests: add a function to test if IPv6 is available...
runtests: add a function to test if IPv6 is available Previously, checkportisavailable returns True if the port is free either on IPv4 or IPv6, but the hg server only uses IPv4 by default. That leads to issues when IPv4 port is not free but the IPv6 one is. To address that, run-tests should stick with either IPv4 or IPv6. This patch adds a function similar to checkportisavailable to test if IPv6 is available, and assigns the result to a variable. The new function was tested in a Linux system script with the following steps: 1. Run "ip addr del ::1/128 dev lo" to delete lo's IPv6 address, Confirm checkipv6available() returns False. 2. Run "ip addr add ::1/128 dev lo" to add back lo's IPv6 address. Confirm checkipv6available() returns True. 3. Start a web server taking the 8000 port. Confirm checkipv6available(8000) is still True.

File last commit:

r30895:c32454d6 default
r30984:15f9084a default
Show More
test_data_structures.py
186 lines | 8.0 KiB | text/x-python | PythonLexer
import io
try:
import unittest2 as unittest
except ImportError:
import unittest
try:
import hypothesis
import hypothesis.strategies as strategies
except ImportError:
hypothesis = None
import zstd
from . common import (
make_cffi,
)
@make_cffi
class TestCompressionParameters(unittest.TestCase):
def test_init_bad_arg_type(self):
with self.assertRaises(TypeError):
zstd.CompressionParameters()
with self.assertRaises(TypeError):
zstd.CompressionParameters(0, 1)
def test_bounds(self):
zstd.CompressionParameters(zstd.WINDOWLOG_MIN,
zstd.CHAINLOG_MIN,
zstd.HASHLOG_MIN,
zstd.SEARCHLOG_MIN,
zstd.SEARCHLENGTH_MIN,
zstd.TARGETLENGTH_MIN,
zstd.STRATEGY_FAST)
zstd.CompressionParameters(zstd.WINDOWLOG_MAX,
zstd.CHAINLOG_MAX,
zstd.HASHLOG_MAX,
zstd.SEARCHLOG_MAX,
zstd.SEARCHLENGTH_MAX,
zstd.TARGETLENGTH_MAX,
zstd.STRATEGY_BTOPT)
def test_get_compression_parameters(self):
p = zstd.get_compression_parameters(1)
self.assertIsInstance(p, zstd.CompressionParameters)
self.assertEqual(p.window_log, 19)
def test_members(self):
p = zstd.CompressionParameters(10, 6, 7, 4, 5, 8, 1)
self.assertEqual(p.window_log, 10)
self.assertEqual(p.chain_log, 6)
self.assertEqual(p.hash_log, 7)
self.assertEqual(p.search_log, 4)
self.assertEqual(p.search_length, 5)
self.assertEqual(p.target_length, 8)
self.assertEqual(p.strategy, 1)
@make_cffi
class TestFrameParameters(unittest.TestCase):
def test_invalid_type(self):
with self.assertRaises(TypeError):
zstd.get_frame_parameters(None)
with self.assertRaises(TypeError):
zstd.get_frame_parameters(u'foobarbaz')
def test_invalid_input_sizes(self):
with self.assertRaisesRegexp(zstd.ZstdError, 'not enough data for frame'):
zstd.get_frame_parameters(b'')
with self.assertRaisesRegexp(zstd.ZstdError, 'not enough data for frame'):
zstd.get_frame_parameters(zstd.FRAME_HEADER)
def test_invalid_frame(self):
with self.assertRaisesRegexp(zstd.ZstdError, 'Unknown frame descriptor'):
zstd.get_frame_parameters(b'foobarbaz')
def test_attributes(self):
params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x00\x00')
self.assertEqual(params.content_size, 0)
self.assertEqual(params.window_size, 1024)
self.assertEqual(params.dict_id, 0)
self.assertFalse(params.has_checksum)
# Lowest 2 bits indicate a dictionary and length. Here, the dict id is 1 byte.
params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x01\x00\xff')
self.assertEqual(params.content_size, 0)
self.assertEqual(params.window_size, 1024)
self.assertEqual(params.dict_id, 255)
self.assertFalse(params.has_checksum)
# Lowest 3rd bit indicates if checksum is present.
params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x04\x00')
self.assertEqual(params.content_size, 0)
self.assertEqual(params.window_size, 1024)
self.assertEqual(params.dict_id, 0)
self.assertTrue(params.has_checksum)
# Upper 2 bits indicate content size.
params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x40\x00\xff\x00')
self.assertEqual(params.content_size, 511)
self.assertEqual(params.window_size, 1024)
self.assertEqual(params.dict_id, 0)
self.assertFalse(params.has_checksum)
# Window descriptor is 2nd byte after frame header.
params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x00\x40')
self.assertEqual(params.content_size, 0)
self.assertEqual(params.window_size, 262144)
self.assertEqual(params.dict_id, 0)
self.assertFalse(params.has_checksum)
# Set multiple things.
params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x45\x40\x0f\x10\x00')
self.assertEqual(params.content_size, 272)
self.assertEqual(params.window_size, 262144)
self.assertEqual(params.dict_id, 15)
self.assertTrue(params.has_checksum)
if hypothesis:
s_windowlog = strategies.integers(min_value=zstd.WINDOWLOG_MIN,
max_value=zstd.WINDOWLOG_MAX)
s_chainlog = strategies.integers(min_value=zstd.CHAINLOG_MIN,
max_value=zstd.CHAINLOG_MAX)
s_hashlog = strategies.integers(min_value=zstd.HASHLOG_MIN,
max_value=zstd.HASHLOG_MAX)
s_searchlog = strategies.integers(min_value=zstd.SEARCHLOG_MIN,
max_value=zstd.SEARCHLOG_MAX)
s_searchlength = strategies.integers(min_value=zstd.SEARCHLENGTH_MIN,
max_value=zstd.SEARCHLENGTH_MAX)
s_targetlength = strategies.integers(min_value=zstd.TARGETLENGTH_MIN,
max_value=zstd.TARGETLENGTH_MAX)
s_strategy = strategies.sampled_from((zstd.STRATEGY_FAST,
zstd.STRATEGY_DFAST,
zstd.STRATEGY_GREEDY,
zstd.STRATEGY_LAZY,
zstd.STRATEGY_LAZY2,
zstd.STRATEGY_BTLAZY2,
zstd.STRATEGY_BTOPT))
@make_cffi
class TestCompressionParametersHypothesis(unittest.TestCase):
@hypothesis.given(s_windowlog, s_chainlog, s_hashlog, s_searchlog,
s_searchlength, s_targetlength, s_strategy)
def test_valid_init(self, windowlog, chainlog, hashlog, searchlog,
searchlength, targetlength, strategy):
p = zstd.CompressionParameters(windowlog, chainlog, hashlog,
searchlog, searchlength,
targetlength, strategy)
# Verify we can instantiate a compressor with the supplied values.
# ZSTD_checkCParams moves the goal posts on us from what's advertised
# in the constants. So move along with them.
if searchlength == zstd.SEARCHLENGTH_MIN and strategy in (zstd.STRATEGY_FAST, zstd.STRATEGY_GREEDY):
searchlength += 1
p = zstd.CompressionParameters(windowlog, chainlog, hashlog,
searchlog, searchlength,
targetlength, strategy)
elif searchlength == zstd.SEARCHLENGTH_MAX and strategy != zstd.STRATEGY_FAST:
searchlength -= 1
p = zstd.CompressionParameters(windowlog, chainlog, hashlog,
searchlog, searchlength,
targetlength, strategy)
cctx = zstd.ZstdCompressor(compression_params=p)
with cctx.write_to(io.BytesIO()):
pass
@hypothesis.given(s_windowlog, s_chainlog, s_hashlog, s_searchlog,
s_searchlength, s_targetlength, s_strategy)
def test_estimate_compression_context_size(self, windowlog, chainlog,
hashlog, searchlog,
searchlength, targetlength,
strategy):
p = zstd.CompressionParameters(windowlog, chainlog, hashlog,
searchlog, searchlength,
targetlength, strategy)
size = zstd.estimate_compression_context_size(p)