import struct import unittest import zstandard as zstd from .common import TestCase ss = struct.Struct("=QQ") class TestBufferWithSegments(TestCase): def test_arguments(self): if not hasattr(zstd, "BufferWithSegments"): self.skipTest("BufferWithSegments not available") with self.assertRaises(TypeError): zstd.BufferWithSegments() with self.assertRaises(TypeError): zstd.BufferWithSegments(b"foo") # Segments data should be a multiple of 16. with self.assertRaisesRegex( ValueError, "segments array size is not a multiple of 16" ): zstd.BufferWithSegments(b"foo", b"\x00\x00") def test_invalid_offset(self): if not hasattr(zstd, "BufferWithSegments"): self.skipTest("BufferWithSegments not available") with self.assertRaisesRegex( ValueError, "offset within segments array references memory" ): zstd.BufferWithSegments(b"foo", ss.pack(0, 4)) def test_invalid_getitem(self): if not hasattr(zstd, "BufferWithSegments"): self.skipTest("BufferWithSegments not available") b = zstd.BufferWithSegments(b"foo", ss.pack(0, 3)) with self.assertRaisesRegex(IndexError, "offset must be non-negative"): test = b[-10] with self.assertRaisesRegex(IndexError, "offset must be less than 1"): test = b[1] with self.assertRaisesRegex(IndexError, "offset must be less than 1"): test = b[2] def test_single(self): if not hasattr(zstd, "BufferWithSegments"): self.skipTest("BufferWithSegments not available") b = zstd.BufferWithSegments(b"foo", ss.pack(0, 3)) self.assertEqual(len(b), 1) self.assertEqual(b.size, 3) self.assertEqual(b.tobytes(), b"foo") self.assertEqual(len(b[0]), 3) self.assertEqual(b[0].offset, 0) self.assertEqual(b[0].tobytes(), b"foo") def test_multiple(self): if not hasattr(zstd, "BufferWithSegments"): self.skipTest("BufferWithSegments not available") b = zstd.BufferWithSegments( b"foofooxfooxy", b"".join([ss.pack(0, 3), ss.pack(3, 4), ss.pack(7, 5)]), ) self.assertEqual(len(b), 3) self.assertEqual(b.size, 12) self.assertEqual(b.tobytes(), b"foofooxfooxy") self.assertEqual(b[0].tobytes(), b"foo") self.assertEqual(b[1].tobytes(), b"foox") self.assertEqual(b[2].tobytes(), b"fooxy") class TestBufferWithSegmentsCollection(TestCase): def test_empty_constructor(self): if not hasattr(zstd, "BufferWithSegmentsCollection"): self.skipTest("BufferWithSegmentsCollection not available") with self.assertRaisesRegex( ValueError, "must pass at least 1 argument" ): zstd.BufferWithSegmentsCollection() def test_argument_validation(self): if not hasattr(zstd, "BufferWithSegmentsCollection"): self.skipTest("BufferWithSegmentsCollection not available") with self.assertRaisesRegex( TypeError, "arguments must be BufferWithSegments" ): zstd.BufferWithSegmentsCollection(None) with self.assertRaisesRegex( TypeError, "arguments must be BufferWithSegments" ): zstd.BufferWithSegmentsCollection( zstd.BufferWithSegments(b"foo", ss.pack(0, 3)), None ) with self.assertRaisesRegex( ValueError, "ZstdBufferWithSegments cannot be empty" ): zstd.BufferWithSegmentsCollection(zstd.BufferWithSegments(b"", b"")) def test_length(self): if not hasattr(zstd, "BufferWithSegmentsCollection"): self.skipTest("BufferWithSegmentsCollection not available") b1 = zstd.BufferWithSegments(b"foo", ss.pack(0, 3)) b2 = zstd.BufferWithSegments( b"barbaz", b"".join([ss.pack(0, 3), ss.pack(3, 3)]) ) c = zstd.BufferWithSegmentsCollection(b1) self.assertEqual(len(c), 1) self.assertEqual(c.size(), 3) c = zstd.BufferWithSegmentsCollection(b2) self.assertEqual(len(c), 2) self.assertEqual(c.size(), 6) c = zstd.BufferWithSegmentsCollection(b1, b2) self.assertEqual(len(c), 3) self.assertEqual(c.size(), 9) def test_getitem(self): if not hasattr(zstd, "BufferWithSegmentsCollection"): self.skipTest("BufferWithSegmentsCollection not available") b1 = zstd.BufferWithSegments(b"foo", ss.pack(0, 3)) b2 = zstd.BufferWithSegments( b"barbaz", b"".join([ss.pack(0, 3), ss.pack(3, 3)]) ) c = zstd.BufferWithSegmentsCollection(b1, b2) with self.assertRaisesRegex(IndexError, "offset must be less than 3"): c[3] with self.assertRaisesRegex(IndexError, "offset must be less than 3"): c[4] self.assertEqual(c[0].tobytes(), b"foo") self.assertEqual(c[1].tobytes(), b"bar") self.assertEqual(c[2].tobytes(), b"baz")