##// END OF EJS Templates
rust-pyo3: exposition of AncestorsIterator...
rust-pyo3: exposition of AncestorsIterator Compared to the early experiments, we have one less `RwLock` in the wrapping. Still that is somewhat redundant with `UnsafePyLeaked` being itself some kind of lock. In the original rust-cpython code, we were borrowing the `RefCell` with a method that can panic. Instead we are now converting the `PoisonError` that unlocking a `RwLock` can produce. Since all methods acquiring the `RwLock` are themselves protected by the GIL and do not release it before returning, nor do they leak RwLock guards, there is no risk of contention on these locks themselves.

File last commit:

r44605:5e84a96d default
r53428:4c9e3198 default
Show More
test_train_dictionary.py
102 lines | 2.8 KiB | text/x-python | PythonLexer
import struct
import sys
import unittest
import zstandard as zstd
from .common import (
generate_samples,
make_cffi,
random_input_data,
TestCase,
)
if sys.version_info[0] >= 3:
int_type = int
else:
int_type = long
@make_cffi
class TestTrainDictionary(TestCase):
def test_no_args(self):
with self.assertRaises(TypeError):
zstd.train_dictionary()
def test_bad_args(self):
with self.assertRaises(TypeError):
zstd.train_dictionary(8192, u"foo")
with self.assertRaises(ValueError):
zstd.train_dictionary(8192, [u"foo"])
def test_no_params(self):
d = zstd.train_dictionary(8192, random_input_data())
self.assertIsInstance(d.dict_id(), int_type)
# The dictionary ID may be different across platforms.
expected = b"\x37\xa4\x30\xec" + struct.pack("<I", d.dict_id())
data = d.as_bytes()
self.assertEqual(data[0:8], expected)
def test_basic(self):
d = zstd.train_dictionary(8192, generate_samples(), k=64, d=16)
self.assertIsInstance(d.dict_id(), int_type)
data = d.as_bytes()
self.assertEqual(data[0:4], b"\x37\xa4\x30\xec")
self.assertEqual(d.k, 64)
self.assertEqual(d.d, 16)
def test_set_dict_id(self):
d = zstd.train_dictionary(
8192, generate_samples(), k=64, d=16, dict_id=42
)
self.assertEqual(d.dict_id(), 42)
def test_optimize(self):
d = zstd.train_dictionary(
8192, generate_samples(), threads=-1, steps=1, d=16
)
# This varies by platform.
self.assertIn(d.k, (50, 2000))
self.assertEqual(d.d, 16)
@make_cffi
class TestCompressionDict(TestCase):
def test_bad_mode(self):
with self.assertRaisesRegex(ValueError, "invalid dictionary load mode"):
zstd.ZstdCompressionDict(b"foo", dict_type=42)
def test_bad_precompute_compress(self):
d = zstd.train_dictionary(8192, generate_samples(), k=64, d=16)
with self.assertRaisesRegex(
ValueError, "must specify one of level or "
):
d.precompute_compress()
with self.assertRaisesRegex(
ValueError, "must only specify one of level or "
):
d.precompute_compress(
level=3, compression_params=zstd.CompressionParameters()
)
def test_precompute_compress_rawcontent(self):
d = zstd.ZstdCompressionDict(
b"dictcontent" * 64, dict_type=zstd.DICT_TYPE_RAWCONTENT
)
d.precompute_compress(level=1)
d = zstd.ZstdCompressionDict(
b"dictcontent" * 64, dict_type=zstd.DICT_TYPE_FULLDICT
)
with self.assertRaisesRegex(
zstd.ZstdError, "unable to precompute dictionary"
):
d.precompute_compress(level=1)