##// END OF EJS Templates
util: make strdate's defaults default value a dict...
util: make strdate's defaults default value a dict It was specified to be an empty list in c6adf2be6069 in 2007. It was correct at the time. But when the function was refactored in 91bc001a592f (2010), it started expecting a dict. I guess this code path is untested? Thanks to Yuya for spotting this.

File last commit:

r30895:c32454d6 default
r31404:7409eb69 default
Show More
test_roundtrip.py
68 lines | 2.2 KiB | text/x-python | PythonLexer
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 import io
try:
import unittest2 as unittest
except ImportError:
import unittest
try:
import hypothesis
import hypothesis.strategies as strategies
except ImportError:
raise unittest.SkipTest('hypothesis not available')
import zstd
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 from .common import (
make_cffi,
)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
compression_levels = strategies.integers(min_value=1, max_value=22)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 @make_cffi
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 class TestRoundTrip(unittest.TestCase):
@hypothesis.given(strategies.binary(), compression_levels)
def test_compress_write_to(self, data, level):
"""Random data from compress() roundtrips via write_to."""
cctx = zstd.ZstdCompressor(level=level)
compressed = cctx.compress(data)
buffer = io.BytesIO()
dctx = zstd.ZstdDecompressor()
with dctx.write_to(buffer) as decompressor:
decompressor.write(compressed)
self.assertEqual(buffer.getvalue(), data)
@hypothesis.given(strategies.binary(), compression_levels)
def test_compressor_write_to_decompressor_write_to(self, data, level):
"""Random data from compressor write_to roundtrips via write_to."""
compress_buffer = io.BytesIO()
decompressed_buffer = io.BytesIO()
cctx = zstd.ZstdCompressor(level=level)
with cctx.write_to(compress_buffer) as compressor:
compressor.write(data)
dctx = zstd.ZstdDecompressor()
with dctx.write_to(decompressed_buffer) as decompressor:
decompressor.write(compress_buffer.getvalue())
self.assertEqual(decompressed_buffer.getvalue(), data)
@hypothesis.given(strategies.binary(average_size=1048576))
@hypothesis.settings(perform_health_check=False)
def test_compressor_write_to_decompressor_write_to_larger(self, data):
compress_buffer = io.BytesIO()
decompressed_buffer = io.BytesIO()
cctx = zstd.ZstdCompressor(level=5)
with cctx.write_to(compress_buffer) as compressor:
compressor.write(data)
dctx = zstd.ZstdDecompressor()
with dctx.write_to(decompressed_buffer) as decompressor:
decompressor.write(compress_buffer.getvalue())
self.assertEqual(decompressed_buffer.getvalue(), data)