##// END OF EJS Templates
vfs: use 'vfs' module directly in 'mercurial.localrepo'...
vfs: use 'vfs' module directly in 'mercurial.localrepo' Now that the 'vfs' classes moved in their own module, lets use the new module directly. We update code iteratively to help with possible bisect needs in the future.

File last commit:

r30895:c32454d6 default
r31231:0a38a313 default
Show More
test_roundtrip.py
68 lines | 2.2 KiB | text/x-python | PythonLexer
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 import io
try:
import unittest2 as unittest
except ImportError:
import unittest
try:
import hypothesis
import hypothesis.strategies as strategies
except ImportError:
raise unittest.SkipTest('hypothesis not available')
import zstd
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 from .common import (
make_cffi,
)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
compression_levels = strategies.integers(min_value=1, max_value=22)
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 @make_cffi
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 class TestRoundTrip(unittest.TestCase):
@hypothesis.given(strategies.binary(), compression_levels)
def test_compress_write_to(self, data, level):
"""Random data from compress() roundtrips via write_to."""
cctx = zstd.ZstdCompressor(level=level)
compressed = cctx.compress(data)
buffer = io.BytesIO()
dctx = zstd.ZstdDecompressor()
with dctx.write_to(buffer) as decompressor:
decompressor.write(compressed)
self.assertEqual(buffer.getvalue(), data)
@hypothesis.given(strategies.binary(), compression_levels)
def test_compressor_write_to_decompressor_write_to(self, data, level):
"""Random data from compressor write_to roundtrips via write_to."""
compress_buffer = io.BytesIO()
decompressed_buffer = io.BytesIO()
cctx = zstd.ZstdCompressor(level=level)
with cctx.write_to(compress_buffer) as compressor:
compressor.write(data)
dctx = zstd.ZstdDecompressor()
with dctx.write_to(decompressed_buffer) as decompressor:
decompressor.write(compress_buffer.getvalue())
self.assertEqual(decompressed_buffer.getvalue(), data)
@hypothesis.given(strategies.binary(average_size=1048576))
@hypothesis.settings(perform_health_check=False)
def test_compressor_write_to_decompressor_write_to_larger(self, data):
compress_buffer = io.BytesIO()
decompressed_buffer = io.BytesIO()
cctx = zstd.ZstdCompressor(level=5)
with cctx.write_to(compress_buffer) as compressor:
compressor.write(data)
dctx = zstd.ZstdDecompressor()
with dctx.write_to(decompressed_buffer) as decompressor:
decompressor.write(compress_buffer.getvalue())
self.assertEqual(decompressed_buffer.getvalue(), data)