##// END OF EJS Templates
branching: merge stable into default
branching: merge stable into default

File last commit:

r54024:f16a7f3c stable
r54034:1b4a024f merge default
Show More
test_compressor_stream_writer.py
599 lines | 21.0 KiB | text/x-python | PythonLexer
/ contrib / python-zstandard / tests / test_compressor_stream_writer.py
import hashlib
import io
import os
import tarfile
import tempfile
import unittest
import zstandard as zstd
from .common import (
NonClosingBytesIO,
CustomBytesIO,
)
class TestCompressor_stream_writer(unittest.TestCase):
    """Tests for the writer object returned by ``ZstdCompressor.stream_writer()``.

    Several tests use helpers from ``.common``:

    * ``NonClosingBytesIO``: ``getvalue()`` still works after the stream is
      closed (relied on in ``test_close``).
    * ``CustomBytesIO``: exposes ``_write_count`` / ``_flush_count``,
      presumably counting ``write()`` / ``flush()`` calls (TODO confirm
      against ``.common``).
    """

    @staticmethod
    def _dictionary_samples():
        """Return the repetitive sample corpus used to train test dictionaries.

        The order (foo, bar, foobar, repeated 128 times) is significant:
        tests assert exact hashes of dictionaries trained on it.
        """
        samples = []
        for _ in range(128):
            samples.append(b"foo" * 64)
            samples.append(b"bar" * 64)
            samples.append(b"foobar" * 64)
        return samples

    def test_io_api(self):
        """The writer supports only the write-side subset of the io API."""
        buffer = io.BytesIO()
        cctx = zstd.ZstdCompressor()
        writer = cctx.stream_writer(buffer)

        self.assertFalse(writer.isatty())
        self.assertFalse(writer.readable())

        with self.assertRaises(io.UnsupportedOperation):
            writer.__iter__()

        with self.assertRaises(io.UnsupportedOperation):
            writer.__next__()

        with self.assertRaises(io.UnsupportedOperation):
            writer.readline()

        with self.assertRaises(io.UnsupportedOperation):
            writer.readline(42)

        with self.assertRaises(io.UnsupportedOperation):
            writer.readline(size=42)

        with self.assertRaises(io.UnsupportedOperation):
            writer.readlines()

        with self.assertRaises(io.UnsupportedOperation):
            writer.readlines(42)

        with self.assertRaises(io.UnsupportedOperation):
            writer.readlines(hint=42)

        with self.assertRaises(io.UnsupportedOperation):
            writer.seek(0)

        with self.assertRaises(io.UnsupportedOperation):
            writer.seek(10, os.SEEK_SET)

        self.assertFalse(writer.seekable())

        with self.assertRaises(io.UnsupportedOperation):
            writer.truncate()

        with self.assertRaises(io.UnsupportedOperation):
            writer.truncate(42)

        with self.assertRaises(io.UnsupportedOperation):
            writer.truncate(size=42)

        self.assertTrue(writer.writable())

        # Unlike the read/seek methods, writelines() signals
        # NotImplementedError rather than io.UnsupportedOperation.
        with self.assertRaises(NotImplementedError):
            writer.writelines([])

        with self.assertRaises(io.UnsupportedOperation):
            writer.read()

        with self.assertRaises(io.UnsupportedOperation):
            writer.read(42)

        with self.assertRaises(io.UnsupportedOperation):
            writer.read(size=42)

        with self.assertRaises(io.UnsupportedOperation):
            writer.readall()

        with self.assertRaises(io.UnsupportedOperation):
            writer.readinto(None)

        # An in-memory BytesIO destination has no OS-level descriptor.
        with self.assertRaises(io.UnsupportedOperation):
            writer.fileno()

        self.assertFalse(writer.closed)

    def test_fileno_file(self):
        """fileno() reports the descriptor of a real file destination."""
        with tempfile.TemporaryFile("wb") as tf:
            cctx = zstd.ZstdCompressor()
            writer = cctx.stream_writer(tf)

            self.assertEqual(writer.fileno(), tf.fileno())

    def test_close(self):
        """close() finishes the frame and closes the destination (closefd default)."""
        buffer = NonClosingBytesIO()
        cctx = zstd.ZstdCompressor(level=1)
        writer = cctx.stream_writer(buffer)

        writer.write(b"foo" * 1024)
        self.assertFalse(writer.closed)
        self.assertFalse(buffer.closed)
        writer.close()
        self.assertTrue(writer.closed)
        self.assertTrue(buffer.closed)

        # Every operation on a closed writer must fail, including re-entry.
        with self.assertRaisesRegex(ValueError, "stream is closed"):
            writer.write(b"foo")

        with self.assertRaisesRegex(ValueError, "stream is closed"):
            writer.flush()

        with self.assertRaisesRegex(ValueError, "stream is closed"):
            with writer:
                pass

        self.assertEqual(
            buffer.getvalue(),
            b"\x28\xb5\x2f\xfd\x00\x48\x55\x00\x00\x18\x66\x6f"
            b"\x6f\x01\x00\xfa\xd3\x77\x43",
        )

        # Context manager exit should close stream.
        buffer = CustomBytesIO()
        writer = cctx.stream_writer(buffer)

        with writer:
            writer.write(b"foo")

        self.assertTrue(writer.closed)
        self.assertTrue(buffer.closed)
        # Closing does not call flush() on the destination.
        self.assertEqual(buffer._flush_count, 0)

        # Context manager exit should close stream if an exception raised.
        buffer = CustomBytesIO()
        writer = cctx.stream_writer(buffer)

        with self.assertRaisesRegex(Exception, "ignore"):
            with writer:
                writer.write(b"foo")
                raise Exception("ignore")

        self.assertTrue(writer.closed)
        self.assertTrue(buffer.closed)
        self.assertEqual(buffer._flush_count, 0)

    def test_close_closefd_false(self):
        """With closefd=False, closing the writer leaves the destination open."""
        buffer = io.BytesIO()
        cctx = zstd.ZstdCompressor(level=1)
        writer = cctx.stream_writer(buffer, closefd=False)

        writer.write(b"foo" * 1024)
        self.assertFalse(writer.closed)
        self.assertFalse(buffer.closed)
        writer.close()
        self.assertTrue(writer.closed)
        self.assertFalse(buffer.closed)

        with self.assertRaisesRegex(ValueError, "stream is closed"):
            writer.write(b"foo")

        with self.assertRaisesRegex(ValueError, "stream is closed"):
            writer.flush()

        with self.assertRaisesRegex(ValueError, "stream is closed"):
            with writer:
                pass

        self.assertEqual(
            buffer.getvalue(),
            b"\x28\xb5\x2f\xfd\x00\x48\x55\x00\x00\x18\x66\x6f"
            b"\x6f\x01\x00\xfa\xd3\x77\x43",
        )

        # Context manager exit should not close stream.
        buffer = CustomBytesIO()
        writer = cctx.stream_writer(buffer, closefd=False)

        with writer:
            writer.write(b"foo")

        self.assertTrue(writer.closed)
        self.assertFalse(buffer.closed)
        self.assertEqual(buffer._flush_count, 0)

        # Context manager exit should not close stream even if an
        # exception is raised.
        buffer = CustomBytesIO()
        writer = cctx.stream_writer(buffer, closefd=False)

        with self.assertRaisesRegex(Exception, "ignore"):
            with writer:
                writer.write(b"foo")
                raise Exception("ignore")

        self.assertTrue(writer.closed)
        self.assertFalse(buffer.closed)
        self.assertEqual(buffer._flush_count, 0)

    def test_empty(self):
        """Compressing no input still emits a complete, empty frame."""
        buffer = io.BytesIO()
        cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
        with cctx.stream_writer(buffer, closefd=False) as compressor:
            compressor.write(b"")

        result = buffer.getvalue()
        self.assertEqual(result, b"\x28\xb5\x2f\xfd\x00\x00\x01\x00\x00")

        params = zstd.get_frame_parameters(result)
        self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN)
        self.assertEqual(params.window_size, 1024)
        self.assertEqual(params.dict_id, 0)
        self.assertFalse(params.has_checksum)

        # Test without context manager.
        buffer = io.BytesIO()
        compressor = cctx.stream_writer(buffer)
        self.assertEqual(compressor.write(b""), 0)
        # Nothing reaches the destination until the frame is flushed.
        self.assertEqual(buffer.getvalue(), b"")
        self.assertEqual(compressor.flush(zstd.FLUSH_FRAME), 9)
        result = buffer.getvalue()
        self.assertEqual(result, b"\x28\xb5\x2f\xfd\x00\x00\x01\x00\x00")

        params = zstd.get_frame_parameters(result)
        self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN)
        self.assertEqual(params.window_size, 1024)
        self.assertEqual(params.dict_id, 0)
        self.assertFalse(params.has_checksum)

        # Test write_return_read=False
        compressor = cctx.stream_writer(buffer, write_return_read=False)
        self.assertEqual(compressor.write(b""), 0)

    def test_input_types(self):
        """write() accepts any buffer-protocol object, mutable or not."""
        expected = b"\x28\xb5\x2f\xfd\x00\x48\x19\x00\x00\x66\x6f\x6f"
        cctx = zstd.ZstdCompressor(level=1)

        mutable_array = bytearray(3)
        mutable_array[:] = b"foo"

        sources = [
            memoryview(b"foo"),
            bytearray(b"foo"),
            mutable_array,
        ]

        for source in sources:
            buffer = io.BytesIO()
            with cctx.stream_writer(buffer, closefd=False) as compressor:
                compressor.write(source)

            self.assertEqual(buffer.getvalue(), expected)

            # With write_return_read=False, write() reports 0 instead of the
            # number of input bytes consumed.
            compressor = cctx.stream_writer(buffer, write_return_read=False)
            self.assertEqual(compressor.write(source), 0)

    def test_multiple_compress(self):
        """Several write() calls are combined into a single frame."""
        buffer = io.BytesIO()
        cctx = zstd.ZstdCompressor(level=5)
        with cctx.stream_writer(buffer, closefd=False) as compressor:
            self.assertEqual(compressor.write(b"foo"), 3)
            self.assertEqual(compressor.write(b"bar"), 3)
            self.assertEqual(compressor.write(b"x" * 8192), 8192)

        result = buffer.getvalue()
        self.assertEqual(
            result,
            b"\x28\xb5\x2f\xfd\x00\x58\x75\x00\x00\x38\x66\x6f"
            b"\x6f\x62\x61\x72\x78\x01\x00\xfc\xdf\x03\x23",
        )

        # Test without context manager.
        buffer = io.BytesIO()
        compressor = cctx.stream_writer(buffer)
        self.assertEqual(compressor.write(b"foo"), 3)
        self.assertEqual(compressor.write(b"bar"), 3)
        self.assertEqual(compressor.write(b"x" * 8192), 8192)
        self.assertEqual(compressor.flush(zstd.FLUSH_FRAME), 23)
        result = buffer.getvalue()
        self.assertEqual(
            result,
            b"\x28\xb5\x2f\xfd\x00\x58\x75\x00\x00\x38\x66\x6f"
            b"\x6f\x62\x61\x72\x78\x01\x00\xfc\xdf\x03\x23",
        )

        # Test with write_return_read=False.
        compressor = cctx.stream_writer(buffer, write_return_read=False)
        self.assertEqual(compressor.write(b"foo"), 0)
        self.assertEqual(compressor.write(b"barbiz"), 0)
        self.assertEqual(compressor.write(b"x" * 8192), 0)

    def test_dictionary(self):
        """A trained dictionary is recorded in the frame and round-trips."""
        d = zstd.train_dictionary(8192, self._dictionary_samples())

        h = hashlib.sha1(d.as_bytes()).hexdigest()
        self.assertEqual(h, "a46d2f7a3bc3357c9d717d3dadf9a26fde23e93d")

        buffer = io.BytesIO()
        cctx = zstd.ZstdCompressor(level=9, dict_data=d)
        with cctx.stream_writer(buffer, closefd=False) as compressor:
            self.assertEqual(compressor.write(b"foo"), 3)
            self.assertEqual(compressor.write(b"bar"), 3)
            self.assertEqual(compressor.write(b"foo" * 16384), 3 * 16384)

        compressed = buffer.getvalue()

        params = zstd.get_frame_parameters(compressed)
        self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN)
        self.assertEqual(params.window_size, 4194304)
        self.assertEqual(params.dict_id, d.dict_id())
        self.assertFalse(params.has_checksum)

        h = hashlib.sha1(compressed).hexdigest()
        self.assertEqual(h, "f8ca6ebe269a822615e86d710c74d61cb4d4e3ca")

        source = b"foo" + b"bar" + (b"foo" * 16384)

        dctx = zstd.ZstdDecompressor(dict_data=d)

        # The frame has no content size, so the decompressor needs an
        # explicit output bound.
        self.assertEqual(
            dctx.decompress(compressed, max_output_size=len(source)), source
        )

    def test_compression_params(self):
        """Explicit compression parameters are honored (e.g. window_log=20)."""
        params = zstd.ZstdCompressionParameters(
            window_log=20,
            chain_log=6,
            hash_log=12,
            min_match=5,
            search_log=4,
            target_length=10,
            strategy=zstd.STRATEGY_FAST,
        )

        buffer = io.BytesIO()
        cctx = zstd.ZstdCompressor(compression_params=params)
        with cctx.stream_writer(buffer, closefd=False) as compressor:
            self.assertEqual(compressor.write(b"foo"), 3)
            self.assertEqual(compressor.write(b"bar"), 3)
            self.assertEqual(compressor.write(b"foobar" * 16384), 6 * 16384)

        compressed = buffer.getvalue()

        params = zstd.get_frame_parameters(compressed)
        self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN)
        # 2 ** window_log
        self.assertEqual(params.window_size, 1048576)
        self.assertEqual(params.dict_id, 0)
        self.assertFalse(params.has_checksum)

        h = hashlib.sha1(compressed).hexdigest()
        self.assertEqual(h, "dd4bb7d37c1a0235b38a2f6b462814376843ef0b")

    def test_write_checksum(self):
        """write_checksum=True flags the frame and appends a 4-byte checksum."""
        no_checksum = io.BytesIO()
        cctx = zstd.ZstdCompressor(level=1)
        with cctx.stream_writer(no_checksum, closefd=False) as compressor:
            self.assertEqual(compressor.write(b"foobar"), 6)

        with_checksum = io.BytesIO()
        cctx = zstd.ZstdCompressor(level=1, write_checksum=True)
        with cctx.stream_writer(with_checksum, closefd=False) as compressor:
            self.assertEqual(compressor.write(b"foobar"), 6)

        no_params = zstd.get_frame_parameters(no_checksum.getvalue())
        with_params = zstd.get_frame_parameters(with_checksum.getvalue())
        self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
        self.assertEqual(with_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
        self.assertEqual(no_params.dict_id, 0)
        self.assertEqual(with_params.dict_id, 0)
        self.assertFalse(no_params.has_checksum)
        self.assertTrue(with_params.has_checksum)

        self.assertEqual(
            len(with_checksum.getvalue()), len(no_checksum.getvalue()) + 4
        )

    def test_write_content_size(self):
        """Content size is written only when declared via ``size=``."""
        no_size = io.BytesIO()
        cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
        with cctx.stream_writer(no_size, closefd=False) as compressor:
            self.assertEqual(
                compressor.write(b"foobar" * 256), len(b"foobar" * 256)
            )

        with_size = io.BytesIO()
        cctx = zstd.ZstdCompressor(level=1)
        with cctx.stream_writer(with_size, closefd=False) as compressor:
            self.assertEqual(
                compressor.write(b"foobar" * 256), len(b"foobar" * 256)
            )

        # Source size is not known in streaming mode, so header not
        # written.
        self.assertEqual(len(with_size.getvalue()), len(no_size.getvalue()))

        # Declaring size will write the header.
        with_size = io.BytesIO()
        with cctx.stream_writer(
            with_size, size=len(b"foobar" * 256), closefd=False
        ) as compressor:
            self.assertEqual(
                compressor.write(b"foobar" * 256), len(b"foobar" * 256)
            )

        no_params = zstd.get_frame_parameters(no_size.getvalue())
        with_params = zstd.get_frame_parameters(with_size.getvalue())
        self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
        self.assertEqual(with_params.content_size, 1536)
        self.assertEqual(no_params.dict_id, 0)
        self.assertEqual(with_params.dict_id, 0)
        self.assertFalse(no_params.has_checksum)
        self.assertFalse(with_params.has_checksum)

        # Net cost of the declared content size in this case: one byte.
        self.assertEqual(len(with_size.getvalue()), len(no_size.getvalue()) + 1)

    def test_no_dict_id(self):
        """write_dict_id=False omits the dictionary ID from the frame header."""
        d = zstd.train_dictionary(1024, self._dictionary_samples())

        with_dict_id = io.BytesIO()
        cctx = zstd.ZstdCompressor(level=1, dict_data=d)
        with cctx.stream_writer(with_dict_id, closefd=False) as compressor:
            self.assertEqual(compressor.write(b"foobarfoobar"), 12)

        # Byte 4 (right after the 4-byte magic number) is the frame header
        # descriptor; its low two bits give the dictionary-ID field size
        # (3 -> 4 bytes, 0 -> no field).
        self.assertEqual(with_dict_id.getvalue()[4:5], b"\x03")

        cctx = zstd.ZstdCompressor(level=1, dict_data=d, write_dict_id=False)
        no_dict_id = io.BytesIO()
        with cctx.stream_writer(no_dict_id, closefd=False) as compressor:
            self.assertEqual(compressor.write(b"foobarfoobar"), 12)

        self.assertEqual(no_dict_id.getvalue()[4:5], b"\x00")

        no_params = zstd.get_frame_parameters(no_dict_id.getvalue())
        with_params = zstd.get_frame_parameters(with_dict_id.getvalue())
        self.assertEqual(no_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
        self.assertEqual(with_params.content_size, zstd.CONTENTSIZE_UNKNOWN)
        self.assertEqual(no_params.dict_id, 0)
        self.assertEqual(with_params.dict_id, d.dict_id())
        self.assertFalse(no_params.has_checksum)
        self.assertFalse(with_params.has_checksum)

        self.assertEqual(
            len(with_dict_id.getvalue()), len(no_dict_id.getvalue()) + 4
        )

    def test_memory_size(self):
        """memory_size() reports a plausible (non-trivial) allocation."""
        cctx = zstd.ZstdCompressor(level=3)
        buffer = io.BytesIO()
        with cctx.stream_writer(buffer) as compressor:
            compressor.write(b"foo")
            size = compressor.memory_size()

        self.assertGreater(size, 100000)

    def test_write_size(self):
        """With write_size=1, every output byte goes out in its own write()."""
        cctx = zstd.ZstdCompressor(level=3)
        dest = CustomBytesIO()
        with cctx.stream_writer(
            dest, write_size=1, closefd=False
        ) as compressor:
            self.assertEqual(compressor.write(b"foo"), 3)
            self.assertEqual(compressor.write(b"bar"), 3)
            self.assertEqual(compressor.write(b"foobar"), 6)

        self.assertEqual(len(dest.getvalue()), dest._write_count)

    def test_flush_repeated(self):
        """Each flush() pushes buffered output and flushes the destination."""
        cctx = zstd.ZstdCompressor(level=3)
        dest = CustomBytesIO()
        with cctx.stream_writer(dest, closefd=False) as compressor:
            self.assertEqual(compressor.write(b"foo"), 3)
            # Input is buffered; nothing is written before a flush.
            self.assertEqual(dest._write_count, 0)
            self.assertEqual(compressor.flush(), 12)
            self.assertEqual(dest._flush_count, 1)
            self.assertEqual(dest._write_count, 1)
            self.assertEqual(compressor.write(b"bar"), 3)
            self.assertEqual(dest._write_count, 1)
            self.assertEqual(compressor.flush(), 6)
            self.assertEqual(dest._flush_count, 2)
            self.assertEqual(dest._write_count, 2)
            self.assertEqual(compressor.write(b"baz"), 3)

        # Leaving the context ends the frame with one more write, but does
        # not call flush() on the destination.
        self.assertEqual(dest._write_count, 3)
        self.assertEqual(dest._flush_count, 2)

    def test_flush_empty_block(self):
        """Ending a frame right after a flush emits an empty final block."""
        cctx = zstd.ZstdCompressor(level=3, write_checksum=True)
        dest = CustomBytesIO()
        with cctx.stream_writer(dest, closefd=False) as compressor:
            self.assertEqual(compressor.write(b"foobar" * 8192), 6 * 8192)
            count = dest._write_count
            offset = dest.tell()
            self.assertEqual(compressor.flush(), 23)
            self.assertEqual(dest._flush_count, 1)
            self.assertGreater(dest._write_count, count)
            self.assertGreater(dest.tell(), offset)
            offset = dest.tell()
            # Ending the write here should cause an empty block to be written
            # to denote end of frame.

        self.assertEqual(dest._flush_count, 1)

        trailing = dest.getvalue()[offset:]
        # 3 bytes block header + 4 bytes frame checksum
        self.assertEqual(len(trailing), 7)

        # Last-block flag set, raw block type, zero length.
        header = trailing[0:3]
        self.assertEqual(header, b"\x01\x00\x00")

    def test_flush_frame(self):
        """flush(FLUSH_FRAME) ends the frame; later writes start a new one."""
        cctx = zstd.ZstdCompressor(level=3)
        dest = CustomBytesIO()

        with cctx.stream_writer(dest, closefd=False) as compressor:
            self.assertEqual(compressor.write(b"foobar" * 8192), 6 * 8192)
            self.assertEqual(compressor.flush(zstd.FLUSH_FRAME), 23)
            self.assertEqual(dest._flush_count, 1)
            compressor.write(b"biz" * 16384)

        self.assertEqual(dest._flush_count, 1)

        self.assertEqual(
            dest.getvalue(),
            # Frame 1.
            b"\x28\xb5\x2f\xfd\x00\x58\x75\x00\x00\x30\x66\x6f\x6f"
            b"\x62\x61\x72\x01\x00\xf7\xbf\xe8\xa5\x08"
            # Frame 2.
            b"\x28\xb5\x2f\xfd\x00\x58\x5d\x00\x00\x18\x62\x69\x7a"
            b"\x01\x00\xfa\x3f\x75\x37\x04",
        )

    def test_bad_flush_mode(self):
        """An unrecognized flush_mode is rejected with ValueError."""
        cctx = zstd.ZstdCompressor()
        dest = io.BytesIO()
        with cctx.stream_writer(dest) as compressor:
            with self.assertRaisesRegex(ValueError, "unknown flush_mode: 42"):
                compressor.flush(flush_mode=42)

    def test_multithreaded(self):
        """A multi-threaded compressor produces the expected output size."""
        dest = io.BytesIO()
        cctx = zstd.ZstdCompressor(threads=2)
        with cctx.stream_writer(dest, closefd=False) as compressor:
            compressor.write(b"a" * 1048576)
            compressor.write(b"b" * 1048576)
            compressor.write(b"c" * 1048576)

        self.assertEqual(len(dest.getvalue()), 111)

    def test_tell(self):
        """tell() tracks the number of compressed bytes written to dest."""
        dest = io.BytesIO()
        cctx = zstd.ZstdCompressor()
        with cctx.stream_writer(dest) as compressor:
            self.assertEqual(compressor.tell(), 0)

            for i in range(256):
                compressor.write(b"foo" * (i + 1))
                self.assertEqual(compressor.tell(), dest.tell())

    def test_bad_size(self):
        """Writing more data than the declared size raises ZstdError."""
        cctx = zstd.ZstdCompressor()

        dest = io.BytesIO()

        with self.assertRaisesRegex(zstd.ZstdError, "Src size is incorrect"):
            with cctx.stream_writer(dest, size=2) as compressor:
                compressor.write(b"foo")

        # Reuse the same compressor after the failed operation above; this
        # must not raise.
        with cctx.stream_writer(dest, size=42):
            pass

    def test_tarfile_compat(self):
        """A writer works as the fileobj of a streaming tarfile."""
        dest = io.BytesIO()
        cctx = zstd.ZstdCompressor()
        with cctx.stream_writer(dest, closefd=False) as compressor:
            with tarfile.open("tf", mode="w|", fileobj=compressor) as tf:
                tf.add(__file__, "test_compressor.py")

        dest = io.BytesIO(dest.getvalue())

        dctx = zstd.ZstdDecompressor()
        with dctx.stream_reader(dest) as reader:
            with tarfile.open(mode="r|", fileobj=reader) as tf:
                # Collect the names so an empty archive fails the test
                # instead of silently skipping a per-member assertion.
                names = [member.name for member in tf]

        self.assertEqual(names, ["test_compressor.py"])