##// END OF EJS Templates
extensions: ignore exceptions from an extension's `getversion()` method...
extensions: ignore exceptions from an extension's `getversion()` method This method is usually called when there's a stacktrace being generated, or with `hg version -v`. Raising another exception risks mangling the bug report info. I hit this issue when trying to add the method to the keyring extension to report the version of the extension and the underlying module, and ran into demandimport issues prior to py3.8. It seems like a wise thing to do anyway, though unfortunately there's no convenient `ui` object around to issue a warning. Use 'unknown' to signal that it tried to report a version and failed, unlike the default case of printing nothing. Differential Revision: https://phab.mercurial-scm.org/D10540

File last commit:

r44605:5e84a96d default
r47829:55345152 stable
Show More
test_decompressor.py
1714 lines | 53.3 KiB | text/x-python | PythonLexer
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 import io
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 import os
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 import random
import struct
import sys
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 import tempfile
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 import unittest
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 import zstandard as zstd
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 from .common import (
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 generate_samples,
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 make_cffi,
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 NonClosingBytesIO,
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 OpCountingBytesIO,
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 TestCase,
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 )
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
# Compatibility shim: expose a module-level ``next()`` that advances an
# iterator by calling the iterator-protocol method appropriate for the running
# major Python version (``__next__`` on Python 3+, ``next`` otherwise).
if sys.version_info[0] >= 3:

    def next(it):
        """Return the next item of *it* by calling ``it.__next__()``."""
        return it.__next__()


else:

    def next(it):
        """Return the next item of *it* by calling ``it.next()``."""
        return it.next()
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestFrameHeaderSize(TestCase):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 def test_empty(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with self.assertRaisesRegex(
zstd.ZstdError,
"could not determine frame header size: Src size " "is incorrect",
):
zstd.frame_header_size(b"")
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
def test_too_small(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with self.assertRaisesRegex(
zstd.ZstdError,
"could not determine frame header size: Src size " "is incorrect",
):
zstd.frame_header_size(b"foob")
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
def test_basic(self):
# It doesn't matter that it isn't a valid frame.
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(zstd.frame_header_size(b"long enough but no magic"), 6)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
@make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestFrameContentSize(TestCase):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 def test_empty(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with self.assertRaisesRegex(
zstd.ZstdError, "error when determining content size"
):
zstd.frame_content_size(b"")
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
def test_too_small(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with self.assertRaisesRegex(
zstd.ZstdError, "error when determining content size"
):
zstd.frame_content_size(b"foob")
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
def test_bad_frame(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with self.assertRaisesRegex(
zstd.ZstdError, "error when determining content size"
):
zstd.frame_content_size(b"invalid frame header")
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
def test_unknown(self):
cctx = zstd.ZstdCompressor(write_content_size=False)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 frame = cctx.compress(b"foobar")
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
self.assertEqual(zstd.frame_content_size(frame), -1)
def test_empty_frame(self):
    """A frame produced by compressing empty input reports content size 0."""
    # This method must not be called ``test_empty``: the class already
    # defines a ``test_empty`` (the empty-*input* error case), and a second
    # ``def`` with the same name would replace it in the class namespace,
    # so that earlier test would silently never run.
    cctx = zstd.ZstdCompressor()
    frame = cctx.compress(b"")

    self.assertEqual(zstd.frame_content_size(frame), 0)
def test_basic(self):
cctx = zstd.ZstdCompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 frame = cctx.compress(b"foobar")
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
self.assertEqual(zstd.frame_content_size(frame), 6)
@make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestDecompressor(TestCase):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 def test_memory_size(self):
dctx = zstd.ZstdDecompressor()
self.assertGreater(dctx.memory_size(), 100)
@make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestDecompressor_decompress(TestCase):
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_empty_input(self):
dctx = zstd.ZstdDecompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with self.assertRaisesRegex(
zstd.ZstdError, "error determining content size from frame header"
):
dctx.decompress(b"")
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
def test_invalid_input(self):
dctx = zstd.ZstdDecompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with self.assertRaisesRegex(
zstd.ZstdError, "error determining content size from frame header"
):
dctx.decompress(b"foobar")
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 def test_input_types(self):
cctx = zstd.ZstdCompressor(level=1)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 compressed = cctx.compress(b"foo")
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
mutable_array = bytearray(len(compressed))
mutable_array[:] = compressed
sources = [
memoryview(compressed),
bytearray(compressed),
mutable_array,
]
dctx = zstd.ZstdDecompressor()
for source in sources:
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(dctx.decompress(source), b"foo")
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_no_content_size_in_frame(self):
cctx = zstd.ZstdCompressor(write_content_size=False)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 compressed = cctx.compress(b"foobar")
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
dctx = zstd.ZstdDecompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with self.assertRaisesRegex(
zstd.ZstdError, "could not determine content size in frame header"
):
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 dctx.decompress(compressed)
def test_content_size_present(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 compressed = cctx.compress(b"foobar")
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
dctx = zstd.ZstdDecompressor()
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796 decompressed = dctx.decompress(compressed)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(decompressed, b"foobar")
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 def test_empty_roundtrip(self):
cctx = zstd.ZstdCompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 compressed = cctx.compress(b"")
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
dctx = zstd.ZstdDecompressor()
decompressed = dctx.decompress(compressed)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(decompressed, b"")
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_max_output_size(self):
cctx = zstd.ZstdCompressor(write_content_size=False)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 source = b"foobar" * 256
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 compressed = cctx.compress(source)
dctx = zstd.ZstdDecompressor()
# Will fit into buffer exactly the size of input.
decompressed = dctx.decompress(compressed, max_output_size=len(source))
self.assertEqual(decompressed, source)
# Input size - 1 fails
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with self.assertRaisesRegex(
zstd.ZstdError, "decompression error: did not decompress full frame"
):
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 dctx.decompress(compressed, max_output_size=len(source) - 1)
# Input size + 1 works
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 decompressed = dctx.decompress(
compressed, max_output_size=len(source) + 1
)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 self.assertEqual(decompressed, source)
# A much larger buffer works.
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 decompressed = dctx.decompress(
compressed, max_output_size=len(source) * 64
)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 self.assertEqual(decompressed, source)
def test_stupidly_large_output_buffer(self):
cctx = zstd.ZstdCompressor(write_content_size=False)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 compressed = cctx.compress(b"foobar" * 256)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 dctx = zstd.ZstdDecompressor()
# Will get OverflowError on some Python distributions that can't
# handle really large integers.
with self.assertRaises((MemoryError, OverflowError)):
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 dctx.decompress(compressed, max_output_size=2 ** 62)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
def test_dictionary(self):
samples = []
for i in range(128):
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 samples.append(b"foo" * 64)
samples.append(b"bar" * 64)
samples.append(b"foobar" * 64)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
d = zstd.train_dictionary(8192, samples)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 orig = b"foobar" * 16384
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor(level=1, dict_data=d)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 compressed = cctx.compress(orig)
dctx = zstd.ZstdDecompressor(dict_data=d)
decompressed = dctx.decompress(compressed)
self.assertEqual(decompressed, orig)
def test_dictionary_multiple(self):
samples = []
for i in range(128):
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 samples.append(b"foo" * 64)
samples.append(b"bar" * 64)
samples.append(b"foobar" * 64)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
d = zstd.train_dictionary(8192, samples)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 sources = (b"foobar" * 8192, b"foo" * 8192, b"bar" * 8192)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 compressed = []
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor(level=1, dict_data=d)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 for source in sources:
compressed.append(cctx.compress(source))
dctx = zstd.ZstdDecompressor(dict_data=d)
for i in range(len(sources)):
decompressed = dctx.decompress(compressed[i])
self.assertEqual(decompressed, sources[i])
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 def test_max_window_size(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with open(__file__, "rb") as fh:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 source = fh.read()
# If we write a content size, the decompressor engages single pass
# mode and the window size doesn't come into play.
cctx = zstd.ZstdCompressor(write_content_size=False)
frame = cctx.compress(source)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 dctx = zstd.ZstdDecompressor(max_window_size=2 ** zstd.WINDOWLOG_MIN)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with self.assertRaisesRegex(
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 zstd.ZstdError,
"decompression error: Frame requires too much memory",
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 ):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 dctx.decompress(frame, max_output_size=len(source))
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestDecompressor_copy_stream(TestCase):
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 def test_no_read(self):
source = object()
dest = io.BytesIO()
dctx = zstd.ZstdDecompressor()
with self.assertRaises(ValueError):
dctx.copy_stream(source, dest)
def test_no_write(self):
source = io.BytesIO()
dest = object()
dctx = zstd.ZstdDecompressor()
with self.assertRaises(ValueError):
dctx.copy_stream(source, dest)
def test_empty(self):
source = io.BytesIO()
dest = io.BytesIO()
dctx = zstd.ZstdDecompressor()
# TODO should this raise an error?
r, w = dctx.copy_stream(source, dest)
self.assertEqual(r, 0)
self.assertEqual(w, 0)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(dest.getvalue(), b"")
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
def test_large_data(self):
source = io.BytesIO()
for i in range(255):
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 source.write(struct.Struct(">B").pack(i) * 16384)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 source.seek(0)
compressed = io.BytesIO()
cctx = zstd.ZstdCompressor()
cctx.copy_stream(source, compressed)
compressed.seek(0)
dest = io.BytesIO()
dctx = zstd.ZstdDecompressor()
r, w = dctx.copy_stream(compressed, dest)
self.assertEqual(r, len(compressed.getvalue()))
self.assertEqual(w, len(source.getvalue()))
def test_read_write_size(self):
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 source = OpCountingBytesIO(
zstd.ZstdCompressor().compress(b"foobarfoobar")
)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
dest = OpCountingBytesIO()
dctx = zstd.ZstdDecompressor()
r, w = dctx.copy_stream(source, dest, read_size=1, write_size=1)
self.assertEqual(r, len(source.getvalue()))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(w, len(b"foobarfoobar"))
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435 self.assertEqual(source._read_count, len(source.getvalue()) + 1)
self.assertEqual(dest._write_count, len(dest.getvalue()))
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895 @make_cffi
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestDecompressor_stream_reader(TestCase):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 def test_context_manager(self):
dctx = zstd.ZstdDecompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with dctx.stream_reader(b"foo") as reader:
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 with self.assertRaisesRegex(
ValueError, "cannot __enter__ multiple times"
):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 with reader as reader2:
pass
def test_not_implemented(self):
dctx = zstd.ZstdDecompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with dctx.stream_reader(b"foo") as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 with self.assertRaises(io.UnsupportedOperation):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 reader.readline()
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 with self.assertRaises(io.UnsupportedOperation):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 reader.readlines()
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 with self.assertRaises(io.UnsupportedOperation):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 iter(reader)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 with self.assertRaises(io.UnsupportedOperation):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 next(reader)
with self.assertRaises(io.UnsupportedOperation):
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 reader.write(b"foo")
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
with self.assertRaises(io.UnsupportedOperation):
reader.writelines([])
def test_constant_methods(self):
dctx = zstd.ZstdDecompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with dctx.stream_reader(b"foo") as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 self.assertFalse(reader.closed)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertTrue(reader.readable())
self.assertFalse(reader.writable())
self.assertTrue(reader.seekable())
self.assertFalse(reader.isatty())
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 self.assertFalse(reader.closed)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertIsNone(reader.flush())
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 self.assertFalse(reader.closed)
self.assertTrue(reader.closed)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
def test_read_closed(self):
dctx = zstd.ZstdDecompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with dctx.stream_reader(b"foo") as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 reader.close()
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 self.assertTrue(reader.closed)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with self.assertRaisesRegex(ValueError, "stream is closed"):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 reader.read(1)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 def test_read_sizes(self):
cctx = zstd.ZstdCompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 foo = cctx.compress(b"foo")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 dctx = zstd.ZstdDecompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 with dctx.stream_reader(foo) as reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with self.assertRaisesRegex(
ValueError, "cannot read negative amounts less than -1"
):
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 reader.read(-2)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(reader.read(0), b"")
self.assertEqual(reader.read(), b"foo")
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
def test_read_buffer(self):
cctx = zstd.ZstdCompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 source = b"".join([b"foo" * 60, b"bar" * 60, b"baz" * 60])
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 frame = cctx.compress(source)
dctx = zstd.ZstdDecompressor()
with dctx.stream_reader(frame) as reader:
self.assertEqual(reader.tell(), 0)
# We should get entire frame in one read.
result = reader.read(8192)
self.assertEqual(result, source)
self.assertEqual(reader.tell(), len(source))
# Read after EOF should return empty bytes.
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(reader.read(1), b"")
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertEqual(reader.tell(), len(result))
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 self.assertTrue(reader.closed)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
def test_read_buffer_small_chunks(self):
cctx = zstd.ZstdCompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 source = b"".join([b"foo" * 60, b"bar" * 60, b"baz" * 60])
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 frame = cctx.compress(source)
dctx = zstd.ZstdDecompressor()
chunks = []
with dctx.stream_reader(frame, read_size=1) as reader:
while True:
chunk = reader.read(1)
if not chunk:
break
chunks.append(chunk)
self.assertEqual(reader.tell(), sum(map(len, chunks)))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), source)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
def test_read_stream(self):
cctx = zstd.ZstdCompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 source = b"".join([b"foo" * 60, b"bar" * 60, b"baz" * 60])
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 frame = cctx.compress(source)
dctx = zstd.ZstdDecompressor()
with dctx.stream_reader(io.BytesIO(frame)) as reader:
self.assertEqual(reader.tell(), 0)
chunk = reader.read(8192)
self.assertEqual(chunk, source)
self.assertEqual(reader.tell(), len(source))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(reader.read(1), b"")
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 self.assertEqual(reader.tell(), len(source))
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 self.assertFalse(reader.closed)
self.assertTrue(reader.closed)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
def test_read_stream_small_chunks(self):
cctx = zstd.ZstdCompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 source = b"".join([b"foo" * 60, b"bar" * 60, b"baz" * 60])
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 frame = cctx.compress(source)
dctx = zstd.ZstdDecompressor()
chunks = []
with dctx.stream_reader(io.BytesIO(frame), read_size=1) as reader:
while True:
chunk = reader.read(1)
if not chunk:
break
chunks.append(chunk)
self.assertEqual(reader.tell(), sum(map(len, chunks)))
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(b"".join(chunks), source)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
def test_read_after_exit(self):
cctx = zstd.ZstdCompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 frame = cctx.compress(b"foo" * 60)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
dctx = zstd.ZstdDecompressor()
with dctx.stream_reader(frame) as reader:
while reader.read(16):
pass
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 self.assertTrue(reader.closed)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with self.assertRaisesRegex(ValueError, "stream is closed"):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 reader.read(10)
def test_illegal_seeks(self):
cctx = zstd.ZstdCompressor()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 frame = cctx.compress(b"foo" * 60)
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
dctx = zstd.ZstdDecompressor()
with dctx.stream_reader(frame) as reader:
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 with self.assertRaisesRegex(
ValueError, "cannot seek to negative position"
):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 reader.seek(-1, os.SEEK_SET)
reader.read(1)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with self.assertRaisesRegex(
ValueError, "cannot seek zstd decompression stream backwards"
):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 reader.seek(0, os.SEEK_SET)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with self.assertRaisesRegex(
ValueError, "cannot seek zstd decompression stream backwards"
):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 reader.seek(-1, os.SEEK_CUR)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with self.assertRaisesRegex(
Gregory Szorc
python-zstandard: blacken at 80 characters...
r44605 ValueError,
"zstd decompression streams cannot be seeked with SEEK_END",
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 ):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 reader.seek(0, os.SEEK_END)
reader.close()
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with self.assertRaisesRegex(ValueError, "stream is closed"):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 reader.seek(4, os.SEEK_SET)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with self.assertRaisesRegex(ValueError, "stream is closed"):
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 reader.seek(0)
def test_seek(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 source = b"foobar" * 60
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513 cctx = zstd.ZstdCompressor()
frame = cctx.compress(source)
dctx = zstd.ZstdDecompressor()
with dctx.stream_reader(frame) as reader:
reader.seek(3)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(reader.read(3), b"bar")
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
reader.seek(4, os.SEEK_CUR)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(reader.read(2), b"ar")
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 def test_no_context_manager(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 source = b"foobar" * 60
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 cctx = zstd.ZstdCompressor()
frame = cctx.compress(source)
dctx = zstd.ZstdDecompressor()
reader = dctx.stream_reader(frame)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(reader.read(6), b"foobar")
self.assertEqual(reader.read(18), b"foobar" * 3)
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 self.assertFalse(reader.closed)
# Calling close prevents subsequent use.
reader.close()
self.assertTrue(reader.closed)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with self.assertRaisesRegex(ValueError, "stream is closed"):
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 reader.read(6)
def test_read_after_error(self):
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 source = io.BytesIO(b"")
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 dctx = zstd.ZstdDecompressor()
reader = dctx.stream_reader(source)
with reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 reader.read(0)
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157
with reader:
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 with self.assertRaisesRegex(ValueError, "stream is closed"):
Gregory Szorc
zstandard: vendor python-zstandard 0.10.1...
r40157 reader.read(100)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 def test_partial_read(self):
# Inspired by https://github.com/indygreg/python-zstandard/issues/71.
buffer = io.BytesIO()
cctx = zstd.ZstdCompressor()
writer = cctx.stream_writer(buffer)
writer.write(bytearray(os.urandom(1000000)))
writer.flush(zstd.FLUSH_FRAME)
buffer.seek(0)
dctx = zstd.ZstdDecompressor()
reader = dctx.stream_reader(buffer)
while True:
chunk = reader.read(8192)
if not chunk:
break
def test_read_multiple_frames(self):
cctx = zstd.ZstdCompressor()
source = io.BytesIO()
writer = cctx.stream_writer(source)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 writer.write(b"foo")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 writer.flush(zstd.FLUSH_FRAME)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 writer.write(b"bar")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237 writer.flush(zstd.FLUSH_FRAME)
dctx = zstd.ZstdDecompressor()
reader = dctx.stream_reader(source.getvalue())
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(reader.read(2), b"fo")
self.assertEqual(reader.read(2), b"o")
self.assertEqual(reader.read(2), b"ba")
self.assertEqual(reader.read(2), b"r")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
source.seek(0)
reader = dctx.stream_reader(source)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(reader.read(2), b"fo")
self.assertEqual(reader.read(2), b"o")
self.assertEqual(reader.read(2), b"ba")
self.assertEqual(reader.read(2), b"r")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
reader = dctx.stream_reader(source.getvalue())
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(reader.read(3), b"foo")
self.assertEqual(reader.read(3), b"bar")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
source.seek(0)
reader = dctx.stream_reader(source)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(reader.read(3), b"foo")
self.assertEqual(reader.read(3), b"bar")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
reader = dctx.stream_reader(source.getvalue())
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(reader.read(4), b"foo")
self.assertEqual(reader.read(4), b"bar")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
source.seek(0)
reader = dctx.stream_reader(source)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(reader.read(4), b"foo")
self.assertEqual(reader.read(4), b"bar")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
reader = dctx.stream_reader(source.getvalue())
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(reader.read(128), b"foo")
self.assertEqual(reader.read(128), b"bar")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
source.seek(0)
reader = dctx.stream_reader(source)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(reader.read(128), b"foo")
self.assertEqual(reader.read(128), b"bar")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
# Now tests for reads spanning frames.
reader = dctx.stream_reader(source.getvalue(), read_across_frames=True)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(reader.read(3), b"foo")
self.assertEqual(reader.read(3), b"bar")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
source.seek(0)
reader = dctx.stream_reader(source, read_across_frames=True)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(reader.read(3), b"foo")
self.assertEqual(reader.read(3), b"bar")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
reader = dctx.stream_reader(source.getvalue(), read_across_frames=True)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(reader.read(6), b"foobar")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
source.seek(0)
reader = dctx.stream_reader(source, read_across_frames=True)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(reader.read(6), b"foobar")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
reader = dctx.stream_reader(source.getvalue(), read_across_frames=True)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(reader.read(7), b"foobar")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
source.seek(0)
reader = dctx.stream_reader(source, read_across_frames=True)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(reader.read(7), b"foobar")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
reader = dctx.stream_reader(source.getvalue(), read_across_frames=True)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(reader.read(128), b"foobar")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
source.seek(0)
reader = dctx.stream_reader(source, read_across_frames=True)
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 self.assertEqual(reader.read(128), b"foobar")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
def test_readinto(self):
    # Exercise readinto() with a read-only destination, an oversized
    # destination, single-byte source reads and an undersized destination.
    frame = zstd.ZstdCompressor().compress(b"foo")
    dctx = zstd.ZstdDecompressor()

    # A bytes object is not writable, so readinto() must fail.  Only the
    # generic Exception is asserted because the concrete type varies based
    # on the backend.
    reader = dctx.stream_reader(frame)
    with self.assertRaises(Exception):
        reader.readinto(b"foobar")

    # Destination larger than the payload: the first call delivers all 3
    # bytes, the second reports 0 and leaves the buffer contents alone.
    dest = bytearray(1024)
    reader = dctx.stream_reader(frame)
    for expected_count in (3, 0):
        self.assertEqual(reader.readinto(dest), expected_count)
        self.assertEqual(dest[:3], b"foo")

    # Same payload with the reader configured for read_size=1.
    dest = bytearray(1024)
    reader = dctx.stream_reader(frame, read_size=1)
    self.assertEqual(reader.readinto(dest), 3)
    self.assertEqual(dest[:3], b"foo")

    # Destination smaller than the payload: only 2 bytes are delivered.
    dest = bytearray(2)
    reader = dctx.stream_reader(frame)
    self.assertEqual(reader.readinto(dest), 2)
    self.assertEqual(dest[:], b"fo")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
def test_readinto1(self):
    # Same scenarios as the readinto() test, driven through readinto1().
    frame = zstd.ZstdCompressor().compress(b"foo")
    dctx = zstd.ZstdDecompressor()

    # A bytes object is not a writable destination.
    reader = dctx.stream_reader(frame)
    with self.assertRaises(Exception):
        reader.readinto1(b"foobar")

    # Destination larger than the payload: 3 bytes first, then 0 with the
    # buffer contents left untouched.
    dest = bytearray(1024)
    reader = dctx.stream_reader(frame)
    for expected_count in (3, 0):
        self.assertEqual(reader.readinto1(dest), expected_count)
        self.assertEqual(dest[:3], b"foo")

    # Same payload with the reader configured for read_size=1.
    dest = bytearray(1024)
    reader = dctx.stream_reader(frame, read_size=1)
    self.assertEqual(reader.readinto1(dest), 3)
    self.assertEqual(dest[:3], b"foo")

    # Destination smaller than the payload: only 2 bytes are delivered.
    dest = bytearray(2)
    reader = dctx.stream_reader(frame)
    self.assertEqual(reader.readinto1(dest), 2)
    self.assertEqual(dest[:], b"fo")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
def test_readall(self):
    # readall() on a reader over a single small frame returns the payload.
    frame = zstd.ZstdCompressor().compress(b"foo")
    reader = zstd.ZstdDecompressor().stream_reader(frame)

    self.assertEqual(reader.readall(), b"foo")
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
def test_read1(self):
    # read1() results are checked together with the number of read() calls
    # recorded by the OpCountingBytesIO source.
    frame = zstd.ZstdCompressor().compress(b"foo")
    dctx = zstd.ZstdDecompressor()

    # Without a size argument the whole payload arrives after one read.
    counted = OpCountingBytesIO(frame)
    reader = dctx.stream_reader(counted)
    self.assertEqual(reader.read1(), b"foo")
    self.assertEqual(counted._read_count, 1)

    counted = OpCountingBytesIO(frame)
    reader = dctx.stream_reader(counted)

    # A zero-sized request yields nothing.
    self.assertEqual(reader.read1(0), b"")

    # (requested size, expected data, expected source read count so far)
    steps = (
        (2, b"fo", 1),
        (1, b"o", 1),
        (1, b"", 2),
    )
    for size, expected_data, expected_reads in steps:
        self.assertEqual(reader.read1(size), expected_data)
        self.assertEqual(counted._read_count, expected_reads)
def test_read_lines(self):
    # Wrap the stream reader in io.TextIOWrapper and pull 1024 lines out
    # of it three ways: iteration, readlines() and repeated readline().
    source = b"\n".join(
        ("line %d" % number).encode("ascii") for number in range(1024)
    )
    frame = zstd.ZstdCompressor().compress(source)
    dctx = zstd.ZstdDecompressor()

    # Iterating over the wrapper.
    text = io.TextIOWrapper(dctx.stream_reader(frame), encoding="utf-8")
    encoded = [line.encode("utf-8") for line in text]
    self.assertEqual(len(encoded), 1024)
    self.assertEqual(b"".join(encoded), source)

    # readlines() in a single call.
    text = io.TextIOWrapper(dctx.stream_reader(frame), encoding="utf-8")
    decoded = text.readlines()
    self.assertEqual(len(decoded), 1024)
    self.assertEqual("".join(decoded).encode("utf-8"), source)

    # readline() until it returns an empty string.
    text = io.TextIOWrapper(dctx.stream_reader(frame), encoding="utf-8")
    encoded = [line.encode("utf-8") for line in iter(text.readline, "")]
    self.assertEqual(len(encoded), 1024)
    self.assertEqual(b"".join(encoded), source)
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
@make_cffi
class TestDecompressor_decompressobj(TestCase):
    """Tests for objects returned by ``ZstdDecompressor.decompressobj()``."""

    def test_simple(self):
        # One decompress() call returns the payload; flush() returns None
        # with no argument, a positional length or a keyword length.
        frame = zstd.ZstdCompressor(level=1).compress(b"foobar")
        dobj = zstd.ZstdDecompressor().decompressobj()

        self.assertEqual(dobj.decompress(frame), b"foobar")
        self.assertIsNone(dobj.flush())
        self.assertIsNone(dobj.flush(10))
        self.assertIsNone(dobj.flush(length=100))

    def test_input_types(self):
        # decompress() accepts several buffer-like inputs besides bytes.
        compressed = zstd.ZstdCompressor(level=1).compress(b"foo")
        dctx = zstd.ZstdDecompressor()

        # A bytearray filled via slice assignment rather than constructed
        # directly from the compressed bytes.
        mutable_array = bytearray(len(compressed))
        mutable_array[:] = compressed

        candidates = (
            memoryview(compressed),
            bytearray(compressed),
            mutable_array,
        )
        for candidate in candidates:
            dobj = dctx.decompressobj()
            # flush() is callable before any data has been supplied.
            self.assertIsNone(dobj.flush())
            self.assertIsNone(dobj.flush(10))
            self.assertIsNone(dobj.flush(length=100))
            self.assertEqual(dobj.decompress(candidate), b"foo")
            self.assertIsNone(dobj.flush())

    def test_reuse(self):
        # A decompressobj that already decoded a frame rejects more input.
        frame = zstd.ZstdCompressor(level=1).compress(b"foobar")
        dobj = zstd.ZstdDecompressor().decompressobj()
        dobj.decompress(frame)

        with self.assertRaisesRegex(
            zstd.ZstdError, "cannot use a decompressobj"
        ):
            dobj.decompress(frame)

        # Kept outside the assertRaisesRegex block: a statement placed after
        # the raising call inside that block would never execute.
        self.assertIsNone(dobj.flush())

    def test_bad_write_size(self):
        # write_size=0 is rejected when the object is created.
        dctx = zstd.ZstdDecompressor()

        with self.assertRaisesRegex(ValueError, "write_size must be positive"):
            dctx.decompressobj(write_size=0)

    def test_write_size(self):
        # Every write_size from 1 through 128 yields the same output.
        source = b"foo" * 64 + b"bar" * 128
        frame = zstd.ZstdCompressor(level=1).compress(source)
        dctx = zstd.ZstdDecompressor()

        for write_size in range(1, 129):
            dobj = dctx.decompressobj(write_size=write_size)
            self.assertEqual(dobj.decompress(frame), source)
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
r30435
Gregory Szorc
zstandard: vendor python-zstandard 0.11...
r42237
Gregory Szorc
zstd: vendor python-zstandard 0.5.0...
def decompress_via_writer(data):
    """Decompress ``data`` through a stream writer and return the output.

    ``data`` is passed in a single ``write()`` call to a writer created by
    ``ZstdDecompressor.stream_writer()`` over an in-memory ``io.BytesIO``;
    the bytes accumulated in that buffer are returned.
    """
    sink = io.BytesIO()
    writer = zstd.ZstdDecompressor().stream_writer(sink)
    writer.write(data)
    return sink.getvalue()
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
@make_cffi
class TestDecompressor_stream_writer(TestCase):
    """Tests for the writer returned by ``ZstdDecompressor.stream_writer()``."""

    def test_io_api(self):
        # The writer reports itself as write-only; read, seek, tell,
        # truncate, writelines and fileno (on a BytesIO) are unsupported.
        writer = zstd.ZstdDecompressor().stream_writer(io.BytesIO())

        def unsupported(method, *args, **kwargs):
            # Calling the named method must raise io.UnsupportedOperation.
            with self.assertRaises(io.UnsupportedOperation):
                getattr(writer, method)(*args, **kwargs)

        self.assertFalse(writer.closed)
        self.assertFalse(writer.isatty())
        self.assertFalse(writer.readable())

        unsupported("readline")
        unsupported("readline", 42)
        unsupported("readline", size=42)
        unsupported("readlines")
        unsupported("readlines", 42)
        unsupported("readlines", hint=42)
        unsupported("seek", 0)
        unsupported("seek", 10, os.SEEK_SET)
        self.assertFalse(writer.seekable())

        unsupported("tell")
        unsupported("truncate")
        unsupported("truncate", 42)
        unsupported("truncate", size=42)
        self.assertTrue(writer.writable())

        unsupported("writelines", [])
        unsupported("read")
        unsupported("read", 42)
        unsupported("read", size=42)
        unsupported("readall")
        unsupported("readinto", None)
        unsupported("fileno")

    def test_fileno_file(self):
        # With a real file as destination, fileno() matches the file's own.
        with tempfile.TemporaryFile("wb") as tf:
            writer = zstd.ZstdDecompressor().stream_writer(tf)
            self.assertEqual(writer.fileno(), tf.fileno())

    def test_close(self):
        # close() marks both writer and destination closed; later use of
        # the writer raises ValueError.
        frame = zstd.ZstdCompressor().compress(b"foo")
        dctx = zstd.ZstdDecompressor()
        closed_message = "stream is closed"

        sink = NonClosingBytesIO()
        writer = dctx.stream_writer(sink)
        writer.write(frame)
        self.assertFalse(writer.closed)
        self.assertFalse(sink.closed)

        writer.close()
        self.assertTrue(writer.closed)
        self.assertTrue(sink.closed)

        with self.assertRaisesRegex(ValueError, closed_message):
            writer.write(b"")

        with self.assertRaisesRegex(ValueError, closed_message):
            writer.flush()

        # Entering the context manager on a closed writer fails as well.
        with self.assertRaisesRegex(ValueError, closed_message):
            with writer:
                pass

        self.assertEqual(sink.getvalue(), b"foo")

        # Context manager exit should close stream.
        sink = NonClosingBytesIO()
        writer = dctx.stream_writer(sink)
        with writer:
            writer.write(frame)

        self.assertTrue(writer.closed)
        self.assertEqual(sink.getvalue(), b"foo")

    def test_flush(self):
        # Each writer.flush() shows up as one flush on the destination.
        sink = OpCountingBytesIO()
        writer = zstd.ZstdDecompressor().stream_writer(sink)

        for expected_flushes in (1, 2):
            writer.flush()
            self.assertEqual(sink._flush_count, expected_flushes)

    def test_empty_roundtrip(self):
        # A frame holding no data decompresses to an empty byte string.
        frame = zstd.ZstdCompressor().compress(b"")
        self.assertEqual(decompress_via_writer(frame), b"")

    def test_input_types(self):
        # write() accepts several buffer-like inputs besides bytes.
        compressed = zstd.ZstdCompressor(level=1).compress(b"foo")

        # A bytearray filled via slice assignment rather than constructed
        # directly from the compressed bytes.
        mutable_array = bytearray(len(compressed))
        mutable_array[:] = compressed

        candidates = (
            memoryview(compressed),
            bytearray(compressed),
            mutable_array,
        )
        dctx = zstd.ZstdDecompressor()

        for candidate in candidates:
            # Plain writer, return value of write() ignored.
            sink = io.BytesIO()
            writer = dctx.stream_writer(sink)
            writer.write(candidate)
            self.assertEqual(sink.getvalue(), b"foo")

            # As a context manager; write() reports 3, the output length.
            sink = NonClosingBytesIO()
            with dctx.stream_writer(sink) as writer:
                self.assertEqual(writer.write(candidate), 3)
            self.assertEqual(sink.getvalue(), b"foo")

            # write_return_read=True makes write() report input length.
            sink = io.BytesIO()
            writer = dctx.stream_writer(sink, write_return_read=True)
            self.assertEqual(writer.write(candidate), len(candidate))
            self.assertEqual(sink.getvalue(), b"foo")

    def test_large_roundtrip(self):
        # 255 runs of 16384 identical bytes survive a round trip.
        pack_byte = struct.Struct(">B").pack
        orig = b"".join(pack_byte(value) * 16384 for value in range(255))

        frame = zstd.ZstdCompressor().compress(orig)
        self.assertEqual(decompress_via_writer(frame), orig)

    def test_multiple_calls(self):
        # Feed the compressed frame to the writer in 8192-byte slices.
        pack_byte = struct.Struct(">B").pack
        orig = b"".join(
            pack_byte(value) * repeat
            for repeat in range(255)
            for value in range(255)
        )
        compressed = zstd.ZstdCompressor().compress(orig)
        dctx = zstd.ZstdDecompressor()
        step = 8192

        sink = NonClosingBytesIO()
        with dctx.stream_writer(sink) as writer:
            for start in range(0, len(compressed), step):
                writer.write(compressed[start : start + step])
        self.assertEqual(sink.getvalue(), orig)

        # Again with write_return_read=True
        sink = io.BytesIO()
        writer = dctx.stream_writer(sink, write_return_read=True)
        for start in range(0, len(compressed), step):
            piece = compressed[start : start + step]
            self.assertEqual(writer.write(piece), len(piece))
        self.assertEqual(sink.getvalue(), orig)

    def test_dictionary(self):
        # Compress and decompress with the same trained dictionary.
        samples = [b"foo" * 64, b"bar" * 64, b"foobar" * 64] * 128
        d = zstd.train_dictionary(8192, samples)
        orig = b"foobar" * 16384

        packed = NonClosingBytesIO()
        cctx = zstd.ZstdCompressor(dict_data=d)
        with cctx.stream_writer(packed) as compressor:
            self.assertEqual(compressor.write(orig), 0)
        compressed = packed.getvalue()

        dctx = zstd.ZstdDecompressor(dict_data=d)

        # Plain writer.
        sink = io.BytesIO()
        writer = dctx.stream_writer(sink)
        self.assertEqual(writer.write(compressed), len(orig))
        self.assertEqual(sink.getvalue(), orig)

        # Writer used as a context manager.
        sink = NonClosingBytesIO()
        with dctx.stream_writer(sink) as writer:
            self.assertEqual(writer.write(compressed), len(orig))
        self.assertEqual(sink.getvalue(), orig)

    def test_memory_size(self):
        # memory_size() exceeds 100000 both with and without a context
        # manager.
        dctx = zstd.ZstdDecompressor()
        sink = io.BytesIO()

        plain_writer = dctx.stream_writer(sink)
        self.assertGreater(plain_writer.memory_size(), 100000)

        with dctx.stream_writer(sink) as managed_writer:
            reported = managed_writer.memory_size()
        self.assertGreater(reported, 100000)

    def test_write_size(self):
        # With write_size=1 the destination sees one write per output byte.
        frame = zstd.ZstdCompressor().compress(b"foobarfoobar")
        dest = OpCountingBytesIO()
        dctx = zstd.ZstdDecompressor()

        with dctx.stream_writer(dest, write_size=1) as writer:
            one_byte = struct.Struct(">B")
            for item in frame:
                # Iterating the frame may yield ints rather than str; ints
                # are packed back into a single byte before writing.
                piece = item if isinstance(item, str) else one_byte.pack(item)
                writer.write(piece)

        self.assertEqual(dest.getvalue(), b"foobarfoobar")
        self.assertEqual(dest._write_count, len(dest.getvalue()))
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
@make_cffi
class TestDecompressor_read_to_iter(TestCase):
    """Tests for the iterator returned by ``ZstdDecompressor.read_to_iter()``."""

    def test_type_validation(self):
        dctx = zstd.ZstdDecompressor()

        # Object with read() works.
        dctx.read_to_iter(io.BytesIO())

        # Buffer protocol works.
        dctx.read_to_iter(b"foobar")

        # Anything else is rejected once the iterator is consumed.
        with self.assertRaisesRegex(
            ValueError, "must pass an object with a read"
        ):
            b"".join(dctx.read_to_iter(True))

    def test_empty_input(self):
        dctx = zstd.ZstdDecompressor()

        # TODO this is arguably wrong. Should get an error about missing
        # frame foo.
        # Checked for a file-like object first, then for the buffer protocol.
        for empty in (io.BytesIO(), b""):
            it = dctx.read_to_iter(empty)
            with self.assertRaises(StopIteration):
                next(it)

    def test_invalid_input(self):
        # Data that is not a zstd frame fails on the first next() call,
        # for a file-like object and for the buffer protocol alike.
        dctx = zstd.ZstdDecompressor()

        for garbage in (io.BytesIO(b"foobar"), b"foobar"):
            it = dctx.read_to_iter(garbage)
            with self.assertRaisesRegex(
                zstd.ZstdError, "Unknown frame descriptor"
            ):
                next(it)

    def test_empty_roundtrip(self):
        cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
        frame = cctx.compress(b"")

        it = zstd.ZstdDecompressor().read_to_iter(io.BytesIO(frame))

        # No chunks should be emitted since there is no data; the second
        # pass repeats the check for good measure.
        for _ in range(2):
            with self.assertRaises(StopIteration):
                next(it)

    def test_skip_bytes_too_large(self):
        dctx = zstd.ZstdDecompressor()

        # skip_bytes equal to read_size is rejected.
        with self.assertRaisesRegex(
            ValueError, "skip_bytes must be smaller than read_size"
        ):
            b"".join(dctx.read_to_iter(b"", skip_bytes=1, read_size=1))

        # skip_bytes beyond the available input is rejected.
        with self.assertRaisesRegex(
            ValueError, "skip_bytes larger than first input chunk"
        ):
            b"".join(dctx.read_to_iter(b"foobar", skip_bytes=10))

    def test_skip_bytes(self):
        # A 3-byte prefix before the frame is skipped via skip_bytes=3.
        cctx = zstd.ZstdCompressor(write_content_size=False)
        frame = cctx.compress(b"foobar")

        dctx = zstd.ZstdDecompressor()
        chunks = dctx.read_to_iter(b"hdr" + frame, skip_bytes=3)
        self.assertEqual(b"".join(chunks), b"foobar")

    def test_large_output(self):
        # One byte more than the recommended output size comes back as
        # exactly two chunks.
        expected = b"f" * zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE + b"o"
        frame = zstd.ZstdCompressor(level=1).compress(expected)
        dctx = zstd.ZstdDecompressor()

        # First a file-like object, then again with buffer protocol.
        for source in (io.BytesIO(frame), frame):
            it = dctx.read_to_iter(source)
            chunks = [next(it), next(it)]
            with self.assertRaises(StopIteration):
                next(it)
            self.assertEqual(b"".join(chunks), expected)

    @unittest.skipUnless(
        "ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set"
    )
    def test_large_input(self):
        # Compress random bytes one at a time until both the compressed and
        # the raw size thresholds are exceeded, then expect three chunks.
        byte_values = [struct.Struct(">B").pack(i) for i in range(256)]
        sink = NonClosingBytesIO()
        input_size = 0

        cctx = zstd.ZstdCompressor(level=1)
        with cctx.stream_writer(sink) as compressor:
            while True:
                compressor.write(random.choice(byte_values))
                input_size += 1

                enough_compressed = (
                    len(sink.getvalue())
                    > zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE
                )
                enough_raw = (
                    input_size > zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE * 2
                )
                if enough_compressed and enough_raw:
                    break

        frame = sink.getvalue()
        self.assertGreater(
            len(frame), zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
        )

        dctx = zstd.ZstdDecompressor()

        # First a file-like object, then again with buffer protocol.
        for source in (io.BytesIO(frame), frame):
            it = dctx.read_to_iter(source)
            chunks = [next(it), next(it), next(it)]
            with self.assertRaises(StopIteration):
                next(it)
            self.assertEqual(len(b"".join(chunks)), input_size)

    def test_interesting(self):
        # Found this edge case via fuzzing.
        zero_block = b"\0" * 1024
        expected = zero_block * 256

        cctx = zstd.ZstdCompressor(level=1)
        sink = NonClosingBytesIO()
        with cctx.stream_writer(sink) as compressor:
            for _ in range(256):
                compressor.write(zero_block)
        frame = sink.getvalue()

        dctx = zstd.ZstdDecompressor()

        # One-shot API and streaming iterator must agree with the input.
        simple = dctx.decompress(frame, max_output_size=len(expected))
        self.assertEqual(simple, expected)

        streamed = b"".join(dctx.read_to_iter(io.BytesIO(frame)))
        self.assertEqual(streamed, expected)

    def test_read_write_size(self):
        # read_size=1 / write_size=1 give one-byte chunks and one source
        # read per compressed byte.
        source = OpCountingBytesIO(
            zstd.ZstdCompressor().compress(b"foobarfoobar")
        )
        dctx = zstd.ZstdDecompressor()

        for chunk in dctx.read_to_iter(source, read_size=1, write_size=1):
            self.assertEqual(len(chunk), 1)

        self.assertEqual(source._read_count, len(source.getvalue()))

    def test_magic_less(self):
        # A frame written in the magicless format needs a decompressor
        # configured for that format.
        params = zstd.CompressionParameters.from_level(
            1, format=zstd.FORMAT_ZSTD1_MAGICLESS
        )
        frame = zstd.ZstdCompressor(compression_params=params).compress(
            b"foobar"
        )

        # The frame does not begin with the 4 magic bytes below.
        self.assertNotEqual(frame[0:4], b"\x28\xb5\x2f\xfd")

        default_dctx = zstd.ZstdDecompressor()
        with self.assertRaisesRegex(
            zstd.ZstdError, "error determining content size from frame header"
        ):
            default_dctx.decompress(frame)

        magicless_dctx = zstd.ZstdDecompressor(
            format=zstd.FORMAT_ZSTD1_MAGICLESS
        )
        res = b"".join(magicless_dctx.read_to_iter(frame))
        self.assertEqual(res, b"foobar")
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
r37513
Gregory Szorc
zstd: vendor python-zstandard 0.7.0...
r30895
@make_cffi
class TestDecompressor_content_dict_chain(TestCase):
    # Tests for ZstdDecompressor.decompress_content_dict_chain(), which is
    # given a list of frames where each frame after the first was compressed
    # using the previous entry's uncompressed content as its dictionary.

    def test_bad_inputs_simple(self):
        # Argument validation and errors reported for the first chunk.
        dctx = zstd.ZstdDecompressor()

        # A bytes object or a tuple is not accepted as the chain.
        with self.assertRaises(TypeError):
            dctx.decompress_content_dict_chain(b"foo")

        with self.assertRaises(TypeError):
            dctx.decompress_content_dict_chain((b"foo", b"bar"))

        with self.assertRaisesRegex(ValueError, "empty input chain"):
            dctx.decompress_content_dict_chain([])

        # Chain members must be bytes.
        with self.assertRaisesRegex(ValueError, "chunk 0 must be bytes"):
            dctx.decompress_content_dict_chain([u"foo"])

        with self.assertRaisesRegex(ValueError, "chunk 0 must be bytes"):
            dctx.decompress_content_dict_chain([True])

        # A bare frame header is too short to be a frame.
        with self.assertRaisesRegex(
            ValueError, "chunk 0 is too small to contain a zstd frame"
        ):
            dctx.decompress_content_dict_chain([zstd.FRAME_HEADER])

        with self.assertRaisesRegex(
            ValueError, "chunk 0 is not a valid zstd frame"
        ):
            dctx.decompress_content_dict_chain([b"foo" * 8])

        # Frames written without a content size are rejected.
        no_size = zstd.ZstdCompressor(write_content_size=False).compress(
            b"foo" * 64
        )

        with self.assertRaisesRegex(
            ValueError, "chunk 0 missing content size in frame"
        ):
            dctx.decompress_content_dict_chain([no_size])

        # Corrupt first frame.
        frame = zstd.ZstdCompressor().compress(b"foo" * 64)
        frame = frame[0:12] + frame[15:]
        with self.assertRaisesRegex(
            zstd.ZstdError, "chunk 0 did not decompress full frame"
        ):
            dctx.decompress_content_dict_chain([frame])

    def test_bad_subsequent_input(self):
        # Same validation as above, but for the second element of the chain
        # (the first element is always a valid frame).
        initial = zstd.ZstdCompressor().compress(b"foo" * 64)

        dctx = zstd.ZstdDecompressor()

        with self.assertRaisesRegex(ValueError, "chunk 1 must be bytes"):
            dctx.decompress_content_dict_chain([initial, u"foo"])

        with self.assertRaisesRegex(ValueError, "chunk 1 must be bytes"):
            dctx.decompress_content_dict_chain([initial, None])

        with self.assertRaisesRegex(
            ValueError, "chunk 1 is too small to contain a zstd frame"
        ):
            dctx.decompress_content_dict_chain([initial, zstd.FRAME_HEADER])

        with self.assertRaisesRegex(
            ValueError, "chunk 1 is not a valid zstd frame"
        ):
            dctx.decompress_content_dict_chain([initial, b"foo" * 8])

        no_size = zstd.ZstdCompressor(write_content_size=False).compress(
            b"foo" * 64
        )

        with self.assertRaisesRegex(
            ValueError, "chunk 1 missing content size in frame"
        ):
            dctx.decompress_content_dict_chain([initial, no_size])

        # Corrupt second frame.
        cctx = zstd.ZstdCompressor(
            dict_data=zstd.ZstdCompressionDict(b"foo" * 64)
        )
        frame = cctx.compress(b"bar" * 64)
        frame = frame[0:12] + frame[15:]

        with self.assertRaisesRegex(
            zstd.ZstdError, "chunk 1 did not decompress full frame"
        ):
            dctx.decompress_content_dict_chain([initial, frame])

    def test_simple(self):
        # Build a chain of frames and check that decompressing each prefix of
        # the chain yields the original content of the prefix's last entry.
        original = [
            b"foo" * 64,
            b"foobar" * 64,
            b"baz" * 64,
            b"foobaz" * 64,
            b"foobarbaz" * 64,
        ]

        # The first frame is compressed standalone; every later frame uses
        # the previous entry's uncompressed content as its dictionary.
        chunks = [zstd.ZstdCompressor().compress(original[0])]
        for previous, current in zip(original, original[1:]):
            d = zstd.ZstdCompressionDict(previous)
            cctx = zstd.ZstdCompressor(dict_data=d)
            chunks.append(cctx.compress(current))

        # Exercise every prefix length from 1 up to and including the whole
        # chain.  Stopping one short (as range(1, len(original)) did) would
        # leave the final frame untested.
        for count, expected in enumerate(original, 1):
            chain = chunks[0:count]
            dctx = zstd.ZstdDecompressor()
            decompressed = dctx.decompress_content_dict_chain(chain)
            self.assertEqual(decompressed, expected)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
# TODO enable for CFFI
Gregory Szorc
zstandard: vendor python-zstandard 0.13.0...
r44446 class TestDecompressor_multi_decompress_to_buffer(TestCase):
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
def test_invalid_inputs(self):
    # Bad arguments to multi_decompress_to_buffer() must be rejected.
    decompressor = zstd.ZstdDecompressor()

    if not hasattr(decompressor, "multi_decompress_to_buffer"):
        self.skipTest("multi_decompress_to_buffer not available")

    # A bool and a tuple of ints are both refused with TypeError.
    for bad_argument in (True, (1, 2)):
        with self.assertRaises(TypeError):
            decompressor.multi_decompress_to_buffer(bad_argument)

    # List members must be bytes-like.
    with self.assertRaisesRegex(
        TypeError, "item 0 not a bytes like object"
    ):
        decompressor.multi_decompress_to_buffer([u"foo"])

    # Bytes that are not a frame give no decompressed size.
    with self.assertRaisesRegex(
        ValueError, "could not determine decompressed size of item 0"
    ):
        decompressor.multi_decompress_to_buffer([b"foobarbaz"])
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
def test_list_input(self):
    # Decompress a plain list of independently compressed frames.
    compressor = zstd.ZstdCompressor()

    payloads = [b"foo" * 4, b"bar" * 6]
    compressed = [compressor.compress(payload) for payload in payloads]

    decompressor = zstd.ZstdDecompressor()

    if not hasattr(decompressor, "multi_decompress_to_buffer"):
        self.skipTest("multi_decompress_to_buffer not available")

    buffers = decompressor.multi_decompress_to_buffer(compressed)

    self.assertEqual(len(buffers), len(compressed))
    total_length = sum(len(payload) for payload in payloads)
    self.assertEqual(buffers.size(), total_length)

    for index, expected in enumerate(payloads):
        self.assertEqual(buffers[index].tobytes(), expected)

    # Expected (offset, length) of each segment in the result.
    expected_layout = [(0, 12), (12, 18)]
    for index, (offset, length) in enumerate(expected_layout):
        self.assertEqual(buffers[index].offset, offset)
        self.assertEqual(len(buffers[index]), length)
def test_list_input_frame_sizes(self):
    # Decompress a list of frames while passing the decompressed sizes
    # explicitly through ``decompressed_sizes``.
    compressor = zstd.ZstdCompressor()

    payloads = [b"foo" * 4, b"bar" * 6, b"baz" * 8]
    compressed = [compressor.compress(payload) for payload in payloads]

    # One unsigned 64-bit integer per payload ("=" selects native byte
    # order with standard sizes and no padding).
    payload_lengths = [len(payload) for payload in payloads]
    size_format = "=" + "Q" * len(payloads)
    sizes = struct.pack(size_format, *payload_lengths)

    decompressor = zstd.ZstdDecompressor()

    if not hasattr(decompressor, "multi_decompress_to_buffer"):
        self.skipTest("multi_decompress_to_buffer not available")

    buffers = decompressor.multi_decompress_to_buffer(
        compressed, decompressed_sizes=sizes
    )

    self.assertEqual(len(buffers), len(compressed))
    self.assertEqual(buffers.size(), sum(payload_lengths))

    for index, expected in enumerate(payloads):
        self.assertEqual(buffers[index].tobytes(), expected)
def test_buffer_with_segments_input(self):
    # Feed the frames as a single BufferWithSegments instead of a list.
    compressor = zstd.ZstdCompressor()

    payloads = [b"foo" * 4, b"bar" * 6]
    compressed = [compressor.compress(payload) for payload in payloads]

    decompressor = zstd.ZstdDecompressor()

    if not hasattr(decompressor, "multi_decompress_to_buffer"):
        self.skipTest("multi_decompress_to_buffer not available")

    # Four unsigned 64-bit values locating the two frames inside the
    # joined buffer; presumably (offset, length) pairs -- confirm against
    # the BufferWithSegments documentation.
    first_length = len(compressed[0])
    second_length = len(compressed[1])
    segments = struct.pack(
        "=QQQQ", 0, first_length, first_length, second_length
    )
    joined = b"".join(compressed)
    segmented_buffer = zstd.BufferWithSegments(joined, segments)

    buffers = decompressor.multi_decompress_to_buffer(segmented_buffer)

    self.assertEqual(len(buffers), len(compressed))

    # Expected (offset, length) of each segment in the result.
    expected_layout = [(0, 12), (12, 18)]
    for index, (offset, length) in enumerate(expected_layout):
        self.assertEqual(buffers[index].offset, offset)
        self.assertEqual(len(buffers[index]), length)
def test_buffer_with_segments_sizes(self):
    # Frames are written without a content size, so the decompressed sizes
    # are passed explicitly alongside a BufferWithSegments input.
    compressor = zstd.ZstdCompressor(write_content_size=False)

    payloads = [b"foo" * 4, b"bar" * 6, b"baz" * 8]
    compressed = [compressor.compress(payload) for payload in payloads]

    payload_lengths = [len(payload) for payload in payloads]
    sizes = struct.pack("=" + "Q" * len(payloads), *payload_lengths)

    decompressor = zstd.ZstdDecompressor()

    if not hasattr(decompressor, "multi_decompress_to_buffer"):
        self.skipTest("multi_decompress_to_buffer not available")

    # Emit a running start position followed by the frame length for each
    # of the three frames in the joined buffer.
    segment_fields = []
    position = 0
    for frame in compressed:
        segment_fields.extend((position, len(frame)))
        position += len(frame)
    segments = struct.pack("=QQQQQQ", *segment_fields)

    segmented_buffer = zstd.BufferWithSegments(
        b"".join(compressed), segments
    )

    buffers = decompressor.multi_decompress_to_buffer(
        segmented_buffer, decompressed_sizes=sizes
    )

    self.assertEqual(len(buffers), len(compressed))
    self.assertEqual(buffers.size(), sum(payload_lengths))

    for index, expected in enumerate(payloads):
        self.assertEqual(buffers[index].tobytes(), expected)
def test_buffer_with_segments_collection_input(self):
    # Decompress input supplied as the result of multi_compress_to_buffer()
    # and as a hand-built BufferWithSegmentsCollection.
    cctx = zstd.ZstdCompressor()

    original = [
        b"foo0" * 2,
        b"foo1" * 3,
        b"foo2" * 4,
        b"foo3" * 5,
        b"foo4" * 6,
    ]

    if not hasattr(cctx, "multi_compress_to_buffer"):
        self.skipTest("multi_compress_to_buffer not available")

    frames = cctx.multi_compress_to_buffer(original)

    # Check round trip.
    dctx = zstd.ZstdDecompressor()

    # Skip, like the sibling tests do, instead of erroring with
    # AttributeError when the decompression API is unavailable.
    if not hasattr(dctx, "multi_decompress_to_buffer"):
        self.skipTest("multi_decompress_to_buffer not available")

    decompressed = dctx.multi_decompress_to_buffer(frames, threads=3)

    self.assertEqual(len(decompressed), len(original))

    for i, data in enumerate(original):
        self.assertEqual(data, decompressed[i].tobytes())

    # And a manual mode.
    # First buffer holds frames 0-1, second holds frames 2-4; the packed
    # values give each frame's start position and length in its buffer.
    b = b"".join([frames[0].tobytes(), frames[1].tobytes()])
    b1 = zstd.BufferWithSegments(
        b,
        struct.pack(
            "=QQQQ", 0, len(frames[0]), len(frames[0]), len(frames[1])
        ),
    )

    b = b"".join(
        [frames[2].tobytes(), frames[3].tobytes(), frames[4].tobytes()]
    )
    b2 = zstd.BufferWithSegments(
        b,
        struct.pack(
            "=QQQQQQ",
            0,
            len(frames[2]),
            len(frames[2]),
            len(frames[3]),
            len(frames[2]) + len(frames[3]),
            len(frames[4]),
        ),
    )

    c = zstd.BufferWithSegmentsCollection(b1, b2)

    dctx = zstd.ZstdDecompressor()
    decompressed = dctx.multi_decompress_to_buffer(c)

    self.assertEqual(len(decompressed), 5)
    for i in range(5):
        self.assertEqual(decompressed[i].tobytes(), original[i])
Gregory Szorc
zstandard: vendor python-zstandard 0.9.0...
def test_dict(self):
    # Round-trip the generated samples through a trained dictionary that
    # is shared by the compressor and the decompressor.
    trained = zstd.train_dictionary(16384, generate_samples(), k=64, d=16)

    compressor = zstd.ZstdCompressor(dict_data=trained, level=1)
    compressed = [
        compressor.compress(sample) for sample in generate_samples()
    ]

    decompressor = zstd.ZstdDecompressor(dict_data=trained)

    if not hasattr(decompressor, "multi_decompress_to_buffer"):
        self.skipTest("multi_decompress_to_buffer not available")

    buffers = decompressor.multi_decompress_to_buffer(compressed)

    round_tripped = [segment.tobytes() for segment in buffers]
    self.assertEqual(round_tripped, generate_samples())
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
def test_multiple_threads(self):
    # Decompress 512 small frames with threads=-1.
    compressor = zstd.ZstdCompressor()

    # 256 frames of b"x" * 64 followed by 256 frames of b"y" * 64.
    compressed = []
    for payload in (b"x" * 64, b"y" * 64):
        compressed.extend(compressor.compress(payload) for _ in range(256))

    decompressor = zstd.ZstdDecompressor()

    if not hasattr(decompressor, "multi_decompress_to_buffer"):
        self.skipTest("multi_decompress_to_buffer not available")

    buffers = decompressor.multi_decompress_to_buffer(
        compressed, threads=-1
    )

    self.assertEqual(len(buffers), len(compressed))
    self.assertEqual(buffers.size(), 2 * 64 * 256)

    # Spot-check the first frame of each run.
    self.assertEqual(buffers[0].tobytes(), b"x" * 64)
    self.assertEqual(buffers[256].tobytes(), b"y" * 64)
Gregory Szorc
zstd: vendor python-zstandard 0.8.0...
r31796
def test_item_failure(self):
    # A corrupted second frame must be reported as a failure of item 1,
    # both with default arguments and with threads=2.
    compressor = zstd.ZstdCompressor()
    intact = compressor.compress(b"x" * 128)
    damaged = compressor.compress(b"y" * 128)

    # Splice junk bytes into the second frame.
    damaged = damaged[:15] + b"extra" + damaged[15:]
    compressed = [intact, damaged]

    decompressor = zstd.ZstdDecompressor()

    if not hasattr(decompressor, "multi_decompress_to_buffer"):
        self.skipTest("multi_decompress_to_buffer not available")

    # Either of two error messages is accepted.
    expected_error = (
        "error decompressing item 1: ("
        "Corrupted block|"
        "Destination buffer is too small)"
    )

    for extra_kwargs in ({}, {"threads": 2}):
        with self.assertRaisesRegex(zstd.ZstdError, expected_error):
            decompressor.multi_decompress_to_buffer(
                compressed, **extra_kwargs
            )