|
|
import io
|
|
|
|
|
|
try:
|
|
|
import unittest2 as unittest
|
|
|
except ImportError:
|
|
|
import unittest
|
|
|
|
|
|
try:
|
|
|
import hypothesis
|
|
|
import hypothesis.strategies as strategies
|
|
|
except ImportError:
|
|
|
hypothesis = None
|
|
|
|
|
|
import zstd
|
|
|
|
|
|
from . common import (
|
|
|
make_cffi,
|
|
|
)
|
|
|
|
|
|
|
|
|
@make_cffi
|
|
|
class TestCompressionParameters(unittest.TestCase):
|
|
|
def test_init_bad_arg_type(self):
|
|
|
with self.assertRaises(TypeError):
|
|
|
zstd.CompressionParameters()
|
|
|
|
|
|
with self.assertRaises(TypeError):
|
|
|
zstd.CompressionParameters(0, 1)
|
|
|
|
|
|
def test_bounds(self):
|
|
|
zstd.CompressionParameters(zstd.WINDOWLOG_MIN,
|
|
|
zstd.CHAINLOG_MIN,
|
|
|
zstd.HASHLOG_MIN,
|
|
|
zstd.SEARCHLOG_MIN,
|
|
|
zstd.SEARCHLENGTH_MIN,
|
|
|
zstd.TARGETLENGTH_MIN,
|
|
|
zstd.STRATEGY_FAST)
|
|
|
|
|
|
zstd.CompressionParameters(zstd.WINDOWLOG_MAX,
|
|
|
zstd.CHAINLOG_MAX,
|
|
|
zstd.HASHLOG_MAX,
|
|
|
zstd.SEARCHLOG_MAX,
|
|
|
zstd.SEARCHLENGTH_MAX,
|
|
|
zstd.TARGETLENGTH_MAX,
|
|
|
zstd.STRATEGY_BTOPT)
|
|
|
|
|
|
def test_get_compression_parameters(self):
|
|
|
p = zstd.get_compression_parameters(1)
|
|
|
self.assertIsInstance(p, zstd.CompressionParameters)
|
|
|
|
|
|
self.assertEqual(p.window_log, 19)
|
|
|
|
|
|
def test_members(self):
|
|
|
p = zstd.CompressionParameters(10, 6, 7, 4, 5, 8, 1)
|
|
|
self.assertEqual(p.window_log, 10)
|
|
|
self.assertEqual(p.chain_log, 6)
|
|
|
self.assertEqual(p.hash_log, 7)
|
|
|
self.assertEqual(p.search_log, 4)
|
|
|
self.assertEqual(p.search_length, 5)
|
|
|
self.assertEqual(p.target_length, 8)
|
|
|
self.assertEqual(p.strategy, 1)
|
|
|
|
|
|
|
|
|
@make_cffi
|
|
|
class TestFrameParameters(unittest.TestCase):
|
|
|
def test_invalid_type(self):
|
|
|
with self.assertRaises(TypeError):
|
|
|
zstd.get_frame_parameters(None)
|
|
|
|
|
|
with self.assertRaises(TypeError):
|
|
|
zstd.get_frame_parameters(u'foobarbaz')
|
|
|
|
|
|
def test_invalid_input_sizes(self):
|
|
|
with self.assertRaisesRegexp(zstd.ZstdError, 'not enough data for frame'):
|
|
|
zstd.get_frame_parameters(b'')
|
|
|
|
|
|
with self.assertRaisesRegexp(zstd.ZstdError, 'not enough data for frame'):
|
|
|
zstd.get_frame_parameters(zstd.FRAME_HEADER)
|
|
|
|
|
|
def test_invalid_frame(self):
|
|
|
with self.assertRaisesRegexp(zstd.ZstdError, 'Unknown frame descriptor'):
|
|
|
zstd.get_frame_parameters(b'foobarbaz')
|
|
|
|
|
|
def test_attributes(self):
|
|
|
params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x00\x00')
|
|
|
self.assertEqual(params.content_size, 0)
|
|
|
self.assertEqual(params.window_size, 1024)
|
|
|
self.assertEqual(params.dict_id, 0)
|
|
|
self.assertFalse(params.has_checksum)
|
|
|
|
|
|
# Lowest 2 bits indicate a dictionary and length. Here, the dict id is 1 byte.
|
|
|
params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x01\x00\xff')
|
|
|
self.assertEqual(params.content_size, 0)
|
|
|
self.assertEqual(params.window_size, 1024)
|
|
|
self.assertEqual(params.dict_id, 255)
|
|
|
self.assertFalse(params.has_checksum)
|
|
|
|
|
|
# Lowest 3rd bit indicates if checksum is present.
|
|
|
params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x04\x00')
|
|
|
self.assertEqual(params.content_size, 0)
|
|
|
self.assertEqual(params.window_size, 1024)
|
|
|
self.assertEqual(params.dict_id, 0)
|
|
|
self.assertTrue(params.has_checksum)
|
|
|
|
|
|
# Upper 2 bits indicate content size.
|
|
|
params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x40\x00\xff\x00')
|
|
|
self.assertEqual(params.content_size, 511)
|
|
|
self.assertEqual(params.window_size, 1024)
|
|
|
self.assertEqual(params.dict_id, 0)
|
|
|
self.assertFalse(params.has_checksum)
|
|
|
|
|
|
# Window descriptor is 2nd byte after frame header.
|
|
|
params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x00\x40')
|
|
|
self.assertEqual(params.content_size, 0)
|
|
|
self.assertEqual(params.window_size, 262144)
|
|
|
self.assertEqual(params.dict_id, 0)
|
|
|
self.assertFalse(params.has_checksum)
|
|
|
|
|
|
# Set multiple things.
|
|
|
params = zstd.get_frame_parameters(zstd.FRAME_HEADER + b'\x45\x40\x0f\x10\x00')
|
|
|
self.assertEqual(params.content_size, 272)
|
|
|
self.assertEqual(params.window_size, 262144)
|
|
|
self.assertEqual(params.dict_id, 15)
|
|
|
self.assertTrue(params.has_checksum)
|
|
|
|
|
|
|
|
|
if hypothesis:
|
|
|
s_windowlog = strategies.integers(min_value=zstd.WINDOWLOG_MIN,
|
|
|
max_value=zstd.WINDOWLOG_MAX)
|
|
|
s_chainlog = strategies.integers(min_value=zstd.CHAINLOG_MIN,
|
|
|
max_value=zstd.CHAINLOG_MAX)
|
|
|
s_hashlog = strategies.integers(min_value=zstd.HASHLOG_MIN,
|
|
|
max_value=zstd.HASHLOG_MAX)
|
|
|
s_searchlog = strategies.integers(min_value=zstd.SEARCHLOG_MIN,
|
|
|
max_value=zstd.SEARCHLOG_MAX)
|
|
|
s_searchlength = strategies.integers(min_value=zstd.SEARCHLENGTH_MIN,
|
|
|
max_value=zstd.SEARCHLENGTH_MAX)
|
|
|
s_targetlength = strategies.integers(min_value=zstd.TARGETLENGTH_MIN,
|
|
|
max_value=zstd.TARGETLENGTH_MAX)
|
|
|
s_strategy = strategies.sampled_from((zstd.STRATEGY_FAST,
|
|
|
zstd.STRATEGY_DFAST,
|
|
|
zstd.STRATEGY_GREEDY,
|
|
|
zstd.STRATEGY_LAZY,
|
|
|
zstd.STRATEGY_LAZY2,
|
|
|
zstd.STRATEGY_BTLAZY2,
|
|
|
zstd.STRATEGY_BTOPT))
|
|
|
|
|
|
|
|
|
@make_cffi
|
|
|
class TestCompressionParametersHypothesis(unittest.TestCase):
|
|
|
@hypothesis.given(s_windowlog, s_chainlog, s_hashlog, s_searchlog,
|
|
|
s_searchlength, s_targetlength, s_strategy)
|
|
|
def test_valid_init(self, windowlog, chainlog, hashlog, searchlog,
|
|
|
searchlength, targetlength, strategy):
|
|
|
p = zstd.CompressionParameters(windowlog, chainlog, hashlog,
|
|
|
searchlog, searchlength,
|
|
|
targetlength, strategy)
|
|
|
|
|
|
# Verify we can instantiate a compressor with the supplied values.
|
|
|
# ZSTD_checkCParams moves the goal posts on us from what's advertised
|
|
|
# in the constants. So move along with them.
|
|
|
if searchlength == zstd.SEARCHLENGTH_MIN and strategy in (zstd.STRATEGY_FAST, zstd.STRATEGY_GREEDY):
|
|
|
searchlength += 1
|
|
|
p = zstd.CompressionParameters(windowlog, chainlog, hashlog,
|
|
|
searchlog, searchlength,
|
|
|
targetlength, strategy)
|
|
|
elif searchlength == zstd.SEARCHLENGTH_MAX and strategy != zstd.STRATEGY_FAST:
|
|
|
searchlength -= 1
|
|
|
p = zstd.CompressionParameters(windowlog, chainlog, hashlog,
|
|
|
searchlog, searchlength,
|
|
|
targetlength, strategy)
|
|
|
|
|
|
cctx = zstd.ZstdCompressor(compression_params=p)
|
|
|
with cctx.write_to(io.BytesIO()):
|
|
|
pass
|
|
|
|
|
|
@hypothesis.given(s_windowlog, s_chainlog, s_hashlog, s_searchlog,
|
|
|
s_searchlength, s_targetlength, s_strategy)
|
|
|
def test_estimate_compression_context_size(self, windowlog, chainlog,
|
|
|
hashlog, searchlog,
|
|
|
searchlength, targetlength,
|
|
|
strategy):
|
|
|
p = zstd.CompressionParameters(windowlog, chainlog, hashlog,
|
|
|
searchlog, searchlength,
|
|
|
targetlength, strategy)
|
|
|
size = zstd.estimate_compression_context_size(p)
|
|
|
|