import io try: import unittest2 as unittest except ImportError: import unittest try: import hypothesis import hypothesis.strategies as strategies except ImportError: raise unittest.SkipTest('hypothesis not available') import zstd compression_levels = strategies.integers(min_value=1, max_value=22) class TestRoundTrip(unittest.TestCase): @hypothesis.given(strategies.binary(), compression_levels) def test_compress_write_to(self, data, level): """Random data from compress() roundtrips via write_to.""" cctx = zstd.ZstdCompressor(level=level) compressed = cctx.compress(data) buffer = io.BytesIO() dctx = zstd.ZstdDecompressor() with dctx.write_to(buffer) as decompressor: decompressor.write(compressed) self.assertEqual(buffer.getvalue(), data) @hypothesis.given(strategies.binary(), compression_levels) def test_compressor_write_to_decompressor_write_to(self, data, level): """Random data from compressor write_to roundtrips via write_to.""" compress_buffer = io.BytesIO() decompressed_buffer = io.BytesIO() cctx = zstd.ZstdCompressor(level=level) with cctx.write_to(compress_buffer) as compressor: compressor.write(data) dctx = zstd.ZstdDecompressor() with dctx.write_to(decompressed_buffer) as decompressor: decompressor.write(compress_buffer.getvalue()) self.assertEqual(decompressed_buffer.getvalue(), data) @hypothesis.given(strategies.binary(average_size=1048576)) @hypothesis.settings(perform_health_check=False) def test_compressor_write_to_decompressor_write_to_larger(self, data): compress_buffer = io.BytesIO() decompressed_buffer = io.BytesIO() cctx = zstd.ZstdCompressor(level=5) with cctx.write_to(compress_buffer) as compressor: compressor.write(data) dctx = zstd.ZstdDecompressor() with dctx.write_to(decompressed_buffer) as decompressor: decompressor.write(compress_buffer.getvalue()) self.assertEqual(decompressed_buffer.getvalue(), data)