##// END OF EJS Templates
branching: merge stable into default
branching: merge stable into default

File last commit:

r54024:f16a7f3c stable
r54034:1b4a024f merge default
Show More
test_compressor_stream_reader.py
375 lines | 11.5 KiB | text/x-python | PythonLexer
/ contrib / python-zstandard / tests / test_compressor_stream_reader.py
import io
import unittest
import zstandard as zstd
from .common import (
NonClosingBytesIO,
CustomBytesIO,
)
class TestCompressor_stream_reader(unittest.TestCase):
def test_context_manager(self):
cctx = zstd.ZstdCompressor()
with cctx.stream_reader(b"foo") as reader:
with self.assertRaisesRegex(
ValueError, "cannot __enter__ multiple times"
):
with reader as reader2:
pass
def test_no_context_manager(self):
cctx = zstd.ZstdCompressor()
reader = cctx.stream_reader(b"foo")
reader.read(4)
self.assertFalse(reader.closed)
reader.close()
self.assertTrue(reader.closed)
with self.assertRaisesRegex(ValueError, "stream is closed"):
reader.read(1)
def test_not_implemented(self):
cctx = zstd.ZstdCompressor()
with cctx.stream_reader(b"foo" * 60) as reader:
with self.assertRaises(io.UnsupportedOperation):
reader.readline()
with self.assertRaises(io.UnsupportedOperation):
reader.readlines()
with self.assertRaises(io.UnsupportedOperation):
iter(reader)
with self.assertRaises(io.UnsupportedOperation):
next(reader)
with self.assertRaises(OSError):
reader.writelines([])
with self.assertRaises(OSError):
reader.write(b"foo")
def test_constant_methods(self):
cctx = zstd.ZstdCompressor()
with cctx.stream_reader(b"boo") as reader:
self.assertTrue(reader.readable())
self.assertFalse(reader.writable())
self.assertFalse(reader.seekable())
self.assertFalse(reader.isatty())
self.assertFalse(reader.closed)
self.assertIsNone(reader.flush())
self.assertFalse(reader.closed)
self.assertTrue(reader.closed)
def test_read_closed(self):
cctx = zstd.ZstdCompressor()
with cctx.stream_reader(b"foo" * 60) as reader:
reader.close()
self.assertTrue(reader.closed)
with self.assertRaisesRegex(ValueError, "stream is closed"):
reader.read(10)
def test_read_sizes(self):
cctx = zstd.ZstdCompressor()
foo = cctx.compress(b"foo")
with cctx.stream_reader(b"foo") as reader:
with self.assertRaisesRegex(
ValueError, "cannot read negative amounts less than -1"
):
reader.read(-2)
self.assertEqual(reader.read(0), b"")
self.assertEqual(reader.read(), foo)
def test_read_buffer(self):
cctx = zstd.ZstdCompressor()
source = b"".join([b"foo" * 60, b"bar" * 60, b"baz" * 60])
frame = cctx.compress(source)
with cctx.stream_reader(source) as reader:
self.assertEqual(reader.tell(), 0)
# We should get entire frame in one read.
result = reader.read(8192)
self.assertEqual(result, frame)
self.assertEqual(reader.tell(), len(result))
self.assertEqual(reader.read(), b"")
self.assertEqual(reader.tell(), len(result))
def test_read_buffer_small_chunks(self):
cctx = zstd.ZstdCompressor()
source = b"foo" * 60
chunks = []
with cctx.stream_reader(source) as reader:
self.assertEqual(reader.tell(), 0)
while True:
chunk = reader.read(1)
if not chunk:
break
chunks.append(chunk)
self.assertEqual(reader.tell(), sum(map(len, chunks)))
self.assertEqual(b"".join(chunks), cctx.compress(source))
def test_read_stream(self):
cctx = zstd.ZstdCompressor()
source = b"".join([b"foo" * 60, b"bar" * 60, b"baz" * 60])
frame = cctx.compress(source)
with cctx.stream_reader(io.BytesIO(source), size=len(source)) as reader:
self.assertEqual(reader.tell(), 0)
chunk = reader.read(8192)
self.assertEqual(chunk, frame)
self.assertEqual(reader.tell(), len(chunk))
self.assertEqual(reader.read(), b"")
self.assertEqual(reader.tell(), len(chunk))
def test_read_stream_small_chunks(self):
cctx = zstd.ZstdCompressor()
source = b"foo" * 60
chunks = []
with cctx.stream_reader(io.BytesIO(source), size=len(source)) as reader:
self.assertEqual(reader.tell(), 0)
while True:
chunk = reader.read(1)
if not chunk:
break
chunks.append(chunk)
self.assertEqual(reader.tell(), sum(map(len, chunks)))
self.assertEqual(b"".join(chunks), cctx.compress(source))
def test_read_after_exit(self):
cctx = zstd.ZstdCompressor()
with cctx.stream_reader(b"foo" * 60) as reader:
while reader.read(8192):
pass
with self.assertRaisesRegex(ValueError, "stream is closed"):
reader.read(10)
def test_bad_size(self):
cctx = zstd.ZstdCompressor()
source = io.BytesIO(b"foobar")
with cctx.stream_reader(source, size=2) as reader:
with self.assertRaisesRegex(
zstd.ZstdError, "Src size is incorrect"
):
reader.read(10)
# Try another compression operation.
with cctx.stream_reader(source, size=42):
pass
def test_readall(self):
cctx = zstd.ZstdCompressor()
frame = cctx.compress(b"foo" * 1024)
reader = cctx.stream_reader(b"foo" * 1024)
self.assertEqual(reader.readall(), frame)
def test_readinto(self):
cctx = zstd.ZstdCompressor()
foo = cctx.compress(b"foo")
reader = cctx.stream_reader(b"foo")
with self.assertRaises(Exception):
reader.readinto(b"foobar")
# readinto() with sufficiently large destination.
b = bytearray(1024)
reader = cctx.stream_reader(b"foo")
self.assertEqual(reader.readinto(b), len(foo))
self.assertEqual(b[0 : len(foo)], foo)
self.assertEqual(reader.readinto(b), 0)
self.assertEqual(b[0 : len(foo)], foo)
# readinto() with small reads.
b = bytearray(1024)
reader = cctx.stream_reader(b"foo", read_size=1)
self.assertEqual(reader.readinto(b), len(foo))
self.assertEqual(b[0 : len(foo)], foo)
# Too small destination buffer.
b = bytearray(2)
reader = cctx.stream_reader(b"foo")
self.assertEqual(reader.readinto(b), 2)
self.assertEqual(b[:], foo[0:2])
self.assertEqual(reader.readinto(b), 2)
self.assertEqual(b[:], foo[2:4])
self.assertEqual(reader.readinto(b), 2)
self.assertEqual(b[:], foo[4:6])
def test_readinto1(self):
cctx = zstd.ZstdCompressor()
foo = b"".join(cctx.read_to_iter(io.BytesIO(b"foo")))
reader = cctx.stream_reader(b"foo")
with self.assertRaises(Exception):
reader.readinto1(b"foobar")
b = bytearray(1024)
source = CustomBytesIO(b"foo")
reader = cctx.stream_reader(source)
self.assertEqual(reader.readinto1(b), len(foo))
self.assertEqual(b[0 : len(foo)], foo)
self.assertEqual(source._read_count, 2)
# readinto1() with small reads.
b = bytearray(1024)
source = CustomBytesIO(b"foo")
reader = cctx.stream_reader(source, read_size=1)
self.assertEqual(reader.readinto1(b), len(foo))
self.assertEqual(b[0 : len(foo)], foo)
self.assertEqual(source._read_count, 4)
def test_read1(self):
cctx = zstd.ZstdCompressor()
foo = b"".join(cctx.read_to_iter(io.BytesIO(b"foo")))
b = CustomBytesIO(b"foo")
reader = cctx.stream_reader(b)
self.assertEqual(reader.read1(), foo)
self.assertEqual(b._read_count, 2)
b = CustomBytesIO(b"foo")
reader = cctx.stream_reader(b)
self.assertEqual(reader.read1(0), b"")
self.assertEqual(reader.read1(2), foo[0:2])
self.assertEqual(b._read_count, 2)
self.assertEqual(reader.read1(2), foo[2:4])
self.assertEqual(reader.read1(1024), foo[4:])
def test_close(self):
buffer = NonClosingBytesIO(b"foo" * 1024)
cctx = zstd.ZstdCompressor()
reader = cctx.stream_reader(buffer)
reader.read(3)
self.assertFalse(reader.closed)
self.assertFalse(buffer.closed)
reader.close()
self.assertTrue(reader.closed)
self.assertTrue(buffer.closed)
with self.assertRaisesRegex(ValueError, "stream is closed"):
reader.read(3)
with self.assertRaisesRegex(ValueError, "stream is closed"):
with reader:
pass
# Context manager exit should close stream.
buffer = io.BytesIO(b"foo" * 1024)
reader = cctx.stream_reader(buffer)
with reader:
reader.read(3)
self.assertTrue(reader.closed)
self.assertTrue(buffer.closed)
# Context manager exit should close stream if an exception raised.
buffer = io.BytesIO(b"foo" * 1024)
reader = cctx.stream_reader(buffer)
with self.assertRaisesRegex(Exception, "ignore"):
with reader:
reader.read(3)
raise Exception("ignore")
self.assertTrue(reader.closed)
self.assertTrue(buffer.closed)
# Test with non-file source.
with cctx.stream_reader(b"foo" * 1024) as reader:
reader.read(3)
self.assertFalse(reader.closed)
self.assertTrue(reader.closed)
def test_close_closefd_false(self):
buffer = NonClosingBytesIO(b"foo" * 1024)
cctx = zstd.ZstdCompressor()
reader = cctx.stream_reader(buffer, closefd=False)
reader.read(3)
self.assertFalse(reader.closed)
self.assertFalse(buffer.closed)
reader.close()
self.assertTrue(reader.closed)
self.assertFalse(buffer.closed)
with self.assertRaisesRegex(ValueError, "stream is closed"):
reader.read(3)
with self.assertRaisesRegex(ValueError, "stream is closed"):
with reader:
pass
# Context manager exit should close stream.
buffer = io.BytesIO(b"foo" * 1024)
reader = cctx.stream_reader(buffer, closefd=False)
with reader:
reader.read(3)
self.assertTrue(reader.closed)
self.assertFalse(buffer.closed)
# Context manager exit should close stream if an exception raised.
buffer = io.BytesIO(b"foo" * 1024)
reader = cctx.stream_reader(buffer, closefd=False)
with self.assertRaisesRegex(Exception, "ignore"):
with reader:
reader.read(3)
raise Exception("ignore")
self.assertTrue(reader.closed)
self.assertFalse(buffer.closed)
# Test with non-file source variant.
with cctx.stream_reader(b"foo" * 1024, closefd=False) as reader:
reader.read(3)
self.assertFalse(reader.closed)
self.assertTrue(reader.closed)
def test_write_exception(self):
b = CustomBytesIO()
b.write_exception = IOError("write")
cctx = zstd.ZstdCompressor()
writer = cctx.stream_writer(b)
# Initial write won't issue write() to underlying stream.
writer.write(b"foo")
with self.assertRaisesRegex(IOError, "write"):
writer.flush()