zstd: vendor python-zstandard 0.8.0...
Gregory Szorc
r31796:e0dc4053 default
@@ -0,0 +1,770 b''
1 /**
2 * Copyright (c) 2017-present, Gregory Szorc
3 * All rights reserved.
4 *
5 * This software may be modified and distributed under the terms
6 * of the BSD license. See the LICENSE file for details.
7 */
8
9 #include "python-zstandard.h"
10
11 extern PyObject* ZstdError;
12
13 PyDoc_STRVAR(BufferWithSegments__doc__,
14 "BufferWithSegments - A memory buffer holding known sub-segments.\n"
15 "\n"
16 "This type represents a contiguous chunk of memory containing N discrete\n"
17 "items within sub-segments of that memory.\n"
18 "\n"
19 "Segments within the buffer are stored as an array of\n"
20 "``(offset, length)`` pairs, where each element is an unsigned 64-bit\n"
21 "integer using the host/native bit order representation.\n"
22 "\n"
23 "The type exists to facilitate operations against N>1 items without the\n"
24 "overhead of Python object creation and management.\n"
25 );
26
27 static void BufferWithSegments_dealloc(ZstdBufferWithSegments* self) {
28 /* Backing memory is either canonically owned by a Py_buffer or by us. */
29 if (self->parent.buf) {
30 PyBuffer_Release(&self->parent);
31 }
32 else if (self->useFree) {
33 free(self->data);
34 }
35 else {
36 PyMem_Free(self->data);
37 }
38
39 self->data = NULL;
40
41 if (self->useFree) {
42 free(self->segments);
43 }
44 else {
45 PyMem_Free(self->segments);
46 }
47
48 self->segments = NULL;
49
50 PyObject_Del(self);
51 }
52
53 static int BufferWithSegments_init(ZstdBufferWithSegments* self, PyObject* args, PyObject* kwargs) {
54 static char* kwlist[] = {
55 "data",
56 "segments",
57 NULL
58 };
59
60 Py_buffer segments;
61 Py_ssize_t segmentCount;
62 Py_ssize_t i;
63
64 memset(&self->parent, 0, sizeof(self->parent));
65
66 #if PY_MAJOR_VERSION >= 3
67 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "y*y*:BufferWithSegments",
68 #else
69 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s*s*:BufferWithSegments",
70 #endif
71 kwlist, &self->parent, &segments)) {
72 return -1;
73 }
74
75 if (!PyBuffer_IsContiguous(&self->parent, 'C') || self->parent.ndim > 1) {
76 PyErr_SetString(PyExc_ValueError, "data buffer should be contiguous and have a single dimension");
77 goto except;
78 }
79
80 if (!PyBuffer_IsContiguous(&segments, 'C') || segments.ndim > 1) {
81 PyErr_SetString(PyExc_ValueError, "segments buffer should be contiguous and have a single dimension");
82 goto except;
83 }
84
85 if (segments.len % sizeof(BufferSegment)) {
86 PyErr_Format(PyExc_ValueError, "segments array size is not a multiple of %zu",
87 sizeof(BufferSegment));
88 goto except;
89 }
90
91 segmentCount = segments.len / sizeof(BufferSegment);
92
93 /* Validate segments data, as blindly trusting it could lead to arbitrary
94 memory access. */
95 for (i = 0; i < segmentCount; i++) {
96 BufferSegment* segment = &((BufferSegment*)(segments.buf))[i];
97
98 if (segment->offset + segment->length > (unsigned long long)self->parent.len) {
99 PyErr_SetString(PyExc_ValueError, "offset within segments array references memory outside buffer");
100 goto except;
102 }
103 }
104
105 /* Make a copy of the segments data. It is cheap to do so and is a guard
106 against caller changing offsets, which has security implications. */
107 self->segments = PyMem_Malloc(segments.len);
108 if (!self->segments) {
109 PyErr_NoMemory();
110 goto except;
111 }
112
113 memcpy(self->segments, segments.buf, segments.len);
114 PyBuffer_Release(&segments);
115
116 self->data = self->parent.buf;
117 self->dataSize = self->parent.len;
118 self->segmentCount = segmentCount;
119
120 return 0;
121
122 except:
123 PyBuffer_Release(&self->parent);
124 PyBuffer_Release(&segments);
125 return -1;
126 }
127
128 /**
129 * Construct a BufferWithSegments from existing memory and offsets.
130 *
131 * Ownership of the backing memory and BufferSegments will be transferred to
132 * the created object and freed when the BufferWithSegments is destroyed.
133 */
134 ZstdBufferWithSegments* BufferWithSegments_FromMemory(void* data, unsigned long long dataSize,
135 BufferSegment* segments, Py_ssize_t segmentsSize) {
136 ZstdBufferWithSegments* result = NULL;
137 Py_ssize_t i;
138
139 if (NULL == data) {
140 PyErr_SetString(PyExc_ValueError, "data is NULL");
141 return NULL;
142 }
143
144 if (NULL == segments) {
145 PyErr_SetString(PyExc_ValueError, "segments is NULL");
146 return NULL;
147 }
148
149 for (i = 0; i < segmentsSize; i++) {
150 BufferSegment* segment = &segments[i];
151
152 if (segment->offset + segment->length > dataSize) {
153 PyErr_SetString(PyExc_ValueError, "offset in segments overflows buffer size");
154 return NULL;
155 }
156 }
157
158 result = PyObject_New(ZstdBufferWithSegments, &ZstdBufferWithSegmentsType);
159 if (NULL == result) {
160 return NULL;
161 }
162
163 result->useFree = 0;
164
165 memset(&result->parent, 0, sizeof(result->parent));
166 result->data = data;
167 result->dataSize = dataSize;
168 result->segments = segments;
169 result->segmentCount = segmentsSize;
170
171 return result;
172 }
173
174 static Py_ssize_t BufferWithSegments_length(ZstdBufferWithSegments* self) {
175 return self->segmentCount;
176 }
177
178 static ZstdBufferSegment* BufferWithSegments_item(ZstdBufferWithSegments* self, Py_ssize_t i) {
179 ZstdBufferSegment* result = NULL;
180
181 if (i < 0) {
182 PyErr_SetString(PyExc_IndexError, "offset must be non-negative");
183 return NULL;
184 }
185
186 if (i >= self->segmentCount) {
187 PyErr_Format(PyExc_IndexError, "offset must be less than %zd", self->segmentCount);
188 return NULL;
189 }
190
191 result = (ZstdBufferSegment*)PyObject_CallObject((PyObject*)&ZstdBufferSegmentType, NULL);
192 if (NULL == result) {
193 return NULL;
194 }
195
196 result->parent = (PyObject*)self;
197 Py_INCREF(self);
198
199 result->data = (char*)self->data + self->segments[i].offset;
200 result->dataSize = self->segments[i].length;
201 result->offset = self->segments[i].offset;
202
203 return result;
204 }
205
206 #if PY_MAJOR_VERSION >= 3
207 static int BufferWithSegments_getbuffer(ZstdBufferWithSegments* self, Py_buffer* view, int flags) {
208 return PyBuffer_FillInfo(view, (PyObject*)self, self->data, self->dataSize, 1, flags);
209 }
210 #else
211 static Py_ssize_t BufferWithSegments_getreadbuffer(ZstdBufferWithSegments* self, Py_ssize_t segment, void **ptrptr) {
212 if (segment != 0) {
213 PyErr_SetString(PyExc_ValueError, "segment number must be 0");
214 return -1;
215 }
216
217 *ptrptr = self->data;
218 return self->dataSize;
219 }
220
221 static Py_ssize_t BufferWithSegments_getsegcount(ZstdBufferWithSegments* self, Py_ssize_t* len) {
222 if (len) {
223 *len = 1;
224 }
225
226 return 1;
227 }
228 #endif
229
230 PyDoc_STRVAR(BufferWithSegments_tobytes__doc__,
231 "Obtain a bytes instance for this buffer.\n"
232 );
233
234 static PyObject* BufferWithSegments_tobytes(ZstdBufferWithSegments* self) {
235 return PyBytes_FromStringAndSize(self->data, self->dataSize);
236 }
237
238 PyDoc_STRVAR(BufferWithSegments_segments__doc__,
239 "Obtain a BufferSegments describing segments in this instance.\n"
240 );
241
242 static ZstdBufferSegments* BufferWithSegments_segments(ZstdBufferWithSegments* self) {
243 ZstdBufferSegments* result = (ZstdBufferSegments*)PyObject_CallObject((PyObject*)&ZstdBufferSegmentsType, NULL);
244 if (NULL == result) {
245 return NULL;
246 }
247
248 result->parent = (PyObject*)self;
249 Py_INCREF(self);
250 result->segments = self->segments;
251 result->segmentCount = self->segmentCount;
252
253 return result;
254 }
255
256 static PySequenceMethods BufferWithSegments_sq = {
257 (lenfunc)BufferWithSegments_length, /* sq_length */
258 0, /* sq_concat */
259 0, /* sq_repeat */
260 (ssizeargfunc)BufferWithSegments_item, /* sq_item */
261 0, /* sq_ass_item */
262 0, /* sq_contains */
263 0, /* sq_inplace_concat */
264 0 /* sq_inplace_repeat */
265 };
266
267 static PyBufferProcs BufferWithSegments_as_buffer = {
268 #if PY_MAJOR_VERSION >= 3
269 (getbufferproc)BufferWithSegments_getbuffer, /* bf_getbuffer */
270 0 /* bf_releasebuffer */
271 #else
272 (readbufferproc)BufferWithSegments_getreadbuffer, /* bf_getreadbuffer */
273 0, /* bf_getwritebuffer */
274 (segcountproc)BufferWithSegments_getsegcount, /* bf_getsegcount */
275 0 /* bf_getcharbuffer */
276 #endif
277 };
278
279 static PyMethodDef BufferWithSegments_methods[] = {
280 { "segments", (PyCFunction)BufferWithSegments_segments,
281 METH_NOARGS, BufferWithSegments_segments__doc__ },
282 { "tobytes", (PyCFunction)BufferWithSegments_tobytes,
283 METH_NOARGS, BufferWithSegments_tobytes__doc__ },
284 { NULL, NULL }
285 };
286
287 static PyMemberDef BufferWithSegments_members[] = {
288 { "size", T_ULONGLONG, offsetof(ZstdBufferWithSegments, dataSize),
289 READONLY, "total size of the buffer in bytes" },
290 { NULL }
291 };
292
293 PyTypeObject ZstdBufferWithSegmentsType = {
294 PyVarObject_HEAD_INIT(NULL, 0)
295 "zstd.BufferWithSegments", /* tp_name */
296 sizeof(ZstdBufferWithSegments),/* tp_basicsize */
297 0, /* tp_itemsize */
298 (destructor)BufferWithSegments_dealloc, /* tp_dealloc */
299 0, /* tp_print */
300 0, /* tp_getattr */
301 0, /* tp_setattr */
302 0, /* tp_compare */
303 0, /* tp_repr */
304 0, /* tp_as_number */
305 &BufferWithSegments_sq, /* tp_as_sequence */
306 0, /* tp_as_mapping */
307 0, /* tp_hash */
308 0, /* tp_call */
309 0, /* tp_str */
310 0, /* tp_getattro */
311 0, /* tp_setattro */
312 &BufferWithSegments_as_buffer, /* tp_as_buffer */
313 Py_TPFLAGS_DEFAULT, /* tp_flags */
314 BufferWithSegments__doc__, /* tp_doc */
315 0, /* tp_traverse */
316 0, /* tp_clear */
317 0, /* tp_richcompare */
318 0, /* tp_weaklistoffset */
319 0, /* tp_iter */
320 0, /* tp_iternext */
321 BufferWithSegments_methods, /* tp_methods */
322 BufferWithSegments_members, /* tp_members */
323 0, /* tp_getset */
324 0, /* tp_base */
325 0, /* tp_dict */
326 0, /* tp_descr_get */
327 0, /* tp_descr_set */
328 0, /* tp_dictoffset */
329 (initproc)BufferWithSegments_init, /* tp_init */
330 0, /* tp_alloc */
331 PyType_GenericNew, /* tp_new */
332 };
333
334 PyDoc_STRVAR(BufferSegments__doc__,
335 "BufferSegments - Represents segments/offsets within a BufferWithSegments\n"
336 );
337
338 static void BufferSegments_dealloc(ZstdBufferSegments* self) {
339 Py_CLEAR(self->parent);
340 PyObject_Del(self);
341 }
342
343 #if PY_MAJOR_VERSION >= 3
344 static int BufferSegments_getbuffer(ZstdBufferSegments* self, Py_buffer* view, int flags) {
345 return PyBuffer_FillInfo(view, (PyObject*)self,
346 (void*)self->segments, self->segmentCount * sizeof(BufferSegment),
347 1, flags);
348 }
349 #else
350 static Py_ssize_t BufferSegments_getreadbuffer(ZstdBufferSegments* self, Py_ssize_t segment, void **ptrptr) {
351 if (segment != 0) {
352 PyErr_SetString(PyExc_ValueError, "segment number must be 0");
353 return -1;
354 }
355
356 *ptrptr = (void*)self->segments;
357 return self->segmentCount * sizeof(BufferSegment);
358 }
359
360 static Py_ssize_t BufferSegments_getsegcount(ZstdBufferSegments* self, Py_ssize_t* len) {
361 if (len) {
362 *len = 1;
363 }
364
365 return 1;
366 }
367 #endif
368
369 static PyBufferProcs BufferSegments_as_buffer = {
370 #if PY_MAJOR_VERSION >= 3
371 (getbufferproc)BufferSegments_getbuffer,
372 0
373 #else
374 (readbufferproc)BufferSegments_getreadbuffer,
375 0,
376 (segcountproc)BufferSegments_getsegcount,
377 0
378 #endif
379 };
380
381 PyTypeObject ZstdBufferSegmentsType = {
382 PyVarObject_HEAD_INIT(NULL, 0)
383 "zstd.BufferSegments", /* tp_name */
384 sizeof(ZstdBufferSegments),/* tp_basicsize */
385 0, /* tp_itemsize */
386 (destructor)BufferSegments_dealloc, /* tp_dealloc */
387 0, /* tp_print */
388 0, /* tp_getattr */
389 0, /* tp_setattr */
390 0, /* tp_compare */
391 0, /* tp_repr */
392 0, /* tp_as_number */
393 0, /* tp_as_sequence */
394 0, /* tp_as_mapping */
395 0, /* tp_hash */
396 0, /* tp_call */
397 0, /* tp_str */
398 0, /* tp_getattro */
399 0, /* tp_setattro */
400 &BufferSegments_as_buffer, /* tp_as_buffer */
401 Py_TPFLAGS_DEFAULT, /* tp_flags */
402 BufferSegments__doc__, /* tp_doc */
403 0, /* tp_traverse */
404 0, /* tp_clear */
405 0, /* tp_richcompare */
406 0, /* tp_weaklistoffset */
407 0, /* tp_iter */
408 0, /* tp_iternext */
409 0, /* tp_methods */
410 0, /* tp_members */
411 0, /* tp_getset */
412 0, /* tp_base */
413 0, /* tp_dict */
414 0, /* tp_descr_get */
415 0, /* tp_descr_set */
416 0, /* tp_dictoffset */
417 0, /* tp_init */
418 0, /* tp_alloc */
419 PyType_GenericNew, /* tp_new */
420 };
421
422 PyDoc_STRVAR(BufferSegment__doc__,
423 "BufferSegment - Represents a segment within a BufferWithSegments\n"
424 );
425
426 static void BufferSegment_dealloc(ZstdBufferSegment* self) {
427 Py_CLEAR(self->parent);
428 PyObject_Del(self);
429 }
430
431 static Py_ssize_t BufferSegment_length(ZstdBufferSegment* self) {
432 return self->dataSize;
433 }
434
435 #if PY_MAJOR_VERSION >= 3
436 static int BufferSegment_getbuffer(ZstdBufferSegment* self, Py_buffer* view, int flags) {
437 return PyBuffer_FillInfo(view, (PyObject*)self,
438 self->data, self->dataSize, 1, flags);
439 }
440 #else
441 static Py_ssize_t BufferSegment_getreadbuffer(ZstdBufferSegment* self, Py_ssize_t segment, void **ptrptr) {
442 if (segment != 0) {
443 PyErr_SetString(PyExc_ValueError, "segment number must be 0");
444 return -1;
445 }
446
447 *ptrptr = self->data;
448 return self->dataSize;
449 }
450
451 static Py_ssize_t BufferSegment_getsegcount(ZstdBufferSegment* self, Py_ssize_t* len) {
452 if (len) {
453 *len = 1;
454 }
455
456 return 1;
457 }
458 #endif
459
460 PyDoc_STRVAR(BufferSegment_tobytes__doc__,
461 "Obtain a bytes instance for this segment.\n"
462 );
463
464 static PyObject* BufferSegment_tobytes(ZstdBufferSegment* self) {
465 return PyBytes_FromStringAndSize(self->data, self->dataSize);
466 }
467
468 static PySequenceMethods BufferSegment_sq = {
469 (lenfunc)BufferSegment_length, /* sq_length */
470 0, /* sq_concat */
471 0, /* sq_repeat */
472 0, /* sq_item */
473 0, /* sq_ass_item */
474 0, /* sq_contains */
475 0, /* sq_inplace_concat */
476 0 /* sq_inplace_repeat */
477 };
478
479 static PyBufferProcs BufferSegment_as_buffer = {
480 #if PY_MAJOR_VERSION >= 3
481 (getbufferproc)BufferSegment_getbuffer,
482 0
483 #else
484 (readbufferproc)BufferSegment_getreadbuffer,
485 0,
486 (segcountproc)BufferSegment_getsegcount,
487 0
488 #endif
489 };
490
491 static PyMethodDef BufferSegment_methods[] = {
492 { "tobytes", (PyCFunction)BufferSegment_tobytes,
493 METH_NOARGS, BufferSegment_tobytes__doc__ },
494 { NULL, NULL }
495 };
496
497 static PyMemberDef BufferSegment_members[] = {
498 { "offset", T_ULONGLONG, offsetof(ZstdBufferSegment, offset), READONLY,
499 "offset of segment within parent buffer" },
500 { NULL }
501 };
502
503 PyTypeObject ZstdBufferSegmentType = {
504 PyVarObject_HEAD_INIT(NULL, 0)
505 "zstd.BufferSegment", /* tp_name */
506 sizeof(ZstdBufferSegment),/* tp_basicsize */
507 0, /* tp_itemsize */
508 (destructor)BufferSegment_dealloc, /* tp_dealloc */
509 0, /* tp_print */
510 0, /* tp_getattr */
511 0, /* tp_setattr */
512 0, /* tp_compare */
513 0, /* tp_repr */
514 0, /* tp_as_number */
515 &BufferSegment_sq, /* tp_as_sequence */
516 0, /* tp_as_mapping */
517 0, /* tp_hash */
518 0, /* tp_call */
519 0, /* tp_str */
520 0, /* tp_getattro */
521 0, /* tp_setattro */
522 &BufferSegment_as_buffer, /* tp_as_buffer */
523 Py_TPFLAGS_DEFAULT, /* tp_flags */
524 BufferSegment__doc__, /* tp_doc */
525 0, /* tp_traverse */
526 0, /* tp_clear */
527 0, /* tp_richcompare */
528 0, /* tp_weaklistoffset */
529 0, /* tp_iter */
530 0, /* tp_iternext */
531 BufferSegment_methods, /* tp_methods */
532 BufferSegment_members, /* tp_members */
533 0, /* tp_getset */
534 0, /* tp_base */
535 0, /* tp_dict */
536 0, /* tp_descr_get */
537 0, /* tp_descr_set */
538 0, /* tp_dictoffset */
539 0, /* tp_init */
540 0, /* tp_alloc */
541 PyType_GenericNew, /* tp_new */
542 };
543
544 PyDoc_STRVAR(BufferWithSegmentsCollection__doc__,
545 "Represents a collection of BufferWithSegments.\n"
546 );
547
548 static void BufferWithSegmentsCollection_dealloc(ZstdBufferWithSegmentsCollection* self) {
549 Py_ssize_t i;
550
551 if (self->firstElements) {
552 PyMem_Free(self->firstElements);
553 self->firstElements = NULL;
554 }
555
556 if (self->buffers) {
557 for (i = 0; i < self->bufferCount; i++) {
558 Py_CLEAR(self->buffers[i]);
559 }
560
561 PyMem_Free(self->buffers);
562 self->buffers = NULL;
563 }
564
565 PyObject_Del(self);
566 }
567
568 static int BufferWithSegmentsCollection_init(ZstdBufferWithSegmentsCollection* self, PyObject* args) {
569 Py_ssize_t size;
570 Py_ssize_t i;
571 Py_ssize_t offset = 0;
572
573 size = PyTuple_Size(args);
574 if (-1 == size) {
575 return -1;
576 }
577
578 if (0 == size) {
579 PyErr_SetString(PyExc_ValueError, "must pass at least 1 argument");
580 return -1;
581 }
582
583 for (i = 0; i < size; i++) {
584 PyObject* item = PyTuple_GET_ITEM(args, i);
585 if (!PyObject_TypeCheck(item, &ZstdBufferWithSegmentsType)) {
586 PyErr_SetString(PyExc_TypeError, "arguments must be BufferWithSegments instances");
587 return -1;
588 }
589
590 if (0 == ((ZstdBufferWithSegments*)item)->segmentCount ||
591 0 == ((ZstdBufferWithSegments*)item)->dataSize) {
592 PyErr_SetString(PyExc_ValueError, "ZstdBufferWithSegments cannot be empty");
593 return -1;
594 }
595 }
596
597 self->buffers = PyMem_Malloc(size * sizeof(ZstdBufferWithSegments*));
598 if (NULL == self->buffers) {
599 PyErr_NoMemory();
600 return -1;
601 }
602
603 self->firstElements = PyMem_Malloc(size * sizeof(Py_ssize_t));
604 if (NULL == self->firstElements) {
605 PyMem_Free(self->buffers);
606 self->buffers = NULL;
607 PyErr_NoMemory();
608 return -1;
609 }
610
611 self->bufferCount = size;
612
613 for (i = 0; i < size; i++) {
614 ZstdBufferWithSegments* item = (ZstdBufferWithSegments*)PyTuple_GET_ITEM(args, i);
615
616 self->buffers[i] = item;
617 Py_INCREF(item);
618
619 if (i > 0) {
620 self->firstElements[i - 1] = offset;
621 }
622
623 offset += item->segmentCount;
624 }
625
626 self->firstElements[size - 1] = offset;
627
628 return 0;
629 }
630
631 static PyObject* BufferWithSegmentsCollection_size(ZstdBufferWithSegmentsCollection* self) {
632 Py_ssize_t i;
633 Py_ssize_t j;
634 unsigned long long size = 0;
635
636 for (i = 0; i < self->bufferCount; i++) {
637 for (j = 0; j < self->buffers[i]->segmentCount; j++) {
638 size += self->buffers[i]->segments[j].length;
639 }
640 }
641
642 return PyLong_FromUnsignedLongLong(size);
643 }
644
645 Py_ssize_t BufferWithSegmentsCollection_length(ZstdBufferWithSegmentsCollection* self) {
646 return self->firstElements[self->bufferCount - 1];
647 }
648
649 static ZstdBufferSegment* BufferWithSegmentsCollection_item(ZstdBufferWithSegmentsCollection* self, Py_ssize_t i) {
650 Py_ssize_t bufferOffset;
651
652 if (i < 0) {
653 PyErr_SetString(PyExc_IndexError, "offset must be non-negative");
654 return NULL;
655 }
656
657 if (i >= BufferWithSegmentsCollection_length(self)) {
658 PyErr_Format(PyExc_IndexError, "offset must be less than %zd",
659 BufferWithSegmentsCollection_length(self));
660 return NULL;
661 }
662
663 for (bufferOffset = 0; bufferOffset < self->bufferCount; bufferOffset++) {
664 Py_ssize_t offset = 0;
665
666 if (i < self->firstElements[bufferOffset]) {
667 if (bufferOffset > 0) {
668 offset = self->firstElements[bufferOffset - 1];
669 }
670
671 return BufferWithSegments_item(self->buffers[bufferOffset], i - offset);
672 }
673 }
674
675 PyErr_SetString(ZstdError, "error resolving segment; this should not happen");
676 return NULL;
677 }
678
679 static PySequenceMethods BufferWithSegmentsCollection_sq = {
680 (lenfunc)BufferWithSegmentsCollection_length, /* sq_length */
681 0, /* sq_concat */
682 0, /* sq_repeat */
683 (ssizeargfunc)BufferWithSegmentsCollection_item, /* sq_item */
684 0, /* sq_ass_item */
685 0, /* sq_contains */
686 0, /* sq_inplace_concat */
687 0 /* sq_inplace_repeat */
688 };
689
690 static PyMethodDef BufferWithSegmentsCollection_methods[] = {
691 { "size", (PyCFunction)BufferWithSegmentsCollection_size,
692 METH_NOARGS, PyDoc_STR("total size in bytes of all segments") },
693 { NULL, NULL }
694 };
695
696 PyTypeObject ZstdBufferWithSegmentsCollectionType = {
697 PyVarObject_HEAD_INIT(NULL, 0)
698 "zstd.BufferWithSegmentsCollection", /* tp_name */
699 sizeof(ZstdBufferWithSegmentsCollection),/* tp_basicsize */
700 0, /* tp_itemsize */
701 (destructor)BufferWithSegmentsCollection_dealloc, /* tp_dealloc */
702 0, /* tp_print */
703 0, /* tp_getattr */
704 0, /* tp_setattr */
705 0, /* tp_compare */
706 0, /* tp_repr */
707 0, /* tp_as_number */
708 &BufferWithSegmentsCollection_sq, /* tp_as_sequence */
709 0, /* tp_as_mapping */
710 0, /* tp_hash */
711 0, /* tp_call */
712 0, /* tp_str */
713 0, /* tp_getattro */
714 0, /* tp_setattro */
715 0, /* tp_as_buffer */
716 Py_TPFLAGS_DEFAULT, /* tp_flags */
717 BufferWithSegmentsCollection__doc__, /* tp_doc */
718 0, /* tp_traverse */
719 0, /* tp_clear */
720 0, /* tp_richcompare */
721 0, /* tp_weaklistoffset */
722 /* TODO implement iterator for performance. */
723 0, /* tp_iter */
724 0, /* tp_iternext */
725 BufferWithSegmentsCollection_methods, /* tp_methods */
726 0, /* tp_members */
727 0, /* tp_getset */
728 0, /* tp_base */
729 0, /* tp_dict */
730 0, /* tp_descr_get */
731 0, /* tp_descr_set */
732 0, /* tp_dictoffset */
733 (initproc)BufferWithSegmentsCollection_init, /* tp_init */
734 0, /* tp_alloc */
735 PyType_GenericNew, /* tp_new */
736 };
737
738 void bufferutil_module_init(PyObject* mod) {
739 Py_TYPE(&ZstdBufferWithSegmentsType) = &PyType_Type;
740 if (PyType_Ready(&ZstdBufferWithSegmentsType) < 0) {
741 return;
742 }
743
744 Py_INCREF(&ZstdBufferWithSegmentsType);
745 PyModule_AddObject(mod, "BufferWithSegments", (PyObject*)&ZstdBufferWithSegmentsType);
746
747 Py_TYPE(&ZstdBufferSegmentsType) = &PyType_Type;
748 if (PyType_Ready(&ZstdBufferSegmentsType) < 0) {
749 return;
750 }
751
752 Py_INCREF(&ZstdBufferSegmentsType);
753 PyModule_AddObject(mod, "BufferSegments", (PyObject*)&ZstdBufferSegmentsType);
754
755 Py_TYPE(&ZstdBufferSegmentType) = &PyType_Type;
756 if (PyType_Ready(&ZstdBufferSegmentType) < 0) {
757 return;
758 }
759
760 Py_INCREF(&ZstdBufferSegmentType);
761 PyModule_AddObject(mod, "BufferSegment", (PyObject*)&ZstdBufferSegmentType);
762
763 Py_TYPE(&ZstdBufferWithSegmentsCollectionType) = &PyType_Type;
764 if (PyType_Ready(&ZstdBufferWithSegmentsCollectionType) < 0) {
765 return;
766 }
767
768 Py_INCREF(&ZstdBufferWithSegmentsCollectionType);
769 PyModule_AddObject(mod, "BufferWithSegmentsCollection", (PyObject*)&ZstdBufferWithSegmentsCollectionType);
770 }
@@ -0,0 +1,112 b''
1 import struct
2
3 try:
4 import unittest2 as unittest
5 except ImportError:
6 import unittest
7
8 import zstd
9
10 ss = struct.Struct('=QQ')
11
12
13 class TestBufferWithSegments(unittest.TestCase):
14 def test_arguments(self):
15 with self.assertRaises(TypeError):
16 zstd.BufferWithSegments()
17
18 with self.assertRaises(TypeError):
19 zstd.BufferWithSegments(b'foo')
20
21 # Segments data should be a multiple of 16.
22 with self.assertRaisesRegexp(ValueError, 'segments array size is not a multiple of 16'):
23 zstd.BufferWithSegments(b'foo', b'\x00\x00')
24
25 def test_invalid_offset(self):
26 with self.assertRaisesRegexp(ValueError, 'offset within segments array references memory'):
27 zstd.BufferWithSegments(b'foo', ss.pack(0, 4))
28
29 def test_invalid_getitem(self):
30 b = zstd.BufferWithSegments(b'foo', ss.pack(0, 3))
31
32 with self.assertRaisesRegexp(IndexError, 'offset must be non-negative'):
33 test = b[-10]
34
35 with self.assertRaisesRegexp(IndexError, 'offset must be less than 1'):
36 test = b[1]
37
38 with self.assertRaisesRegexp(IndexError, 'offset must be less than 1'):
39 test = b[2]
40
41 def test_single(self):
42 b = zstd.BufferWithSegments(b'foo', ss.pack(0, 3))
43 self.assertEqual(len(b), 1)
44 self.assertEqual(b.size, 3)
45 self.assertEqual(b.tobytes(), b'foo')
46
47 self.assertEqual(len(b[0]), 3)
48 self.assertEqual(b[0].offset, 0)
49 self.assertEqual(b[0].tobytes(), b'foo')
50
51 def test_multiple(self):
52 b = zstd.BufferWithSegments(b'foofooxfooxy', b''.join([ss.pack(0, 3),
53 ss.pack(3, 4),
54 ss.pack(7, 5)]))
55 self.assertEqual(len(b), 3)
56 self.assertEqual(b.size, 12)
57 self.assertEqual(b.tobytes(), b'foofooxfooxy')
58
59 self.assertEqual(b[0].tobytes(), b'foo')
60 self.assertEqual(b[1].tobytes(), b'foox')
61 self.assertEqual(b[2].tobytes(), b'fooxy')
62
63
64 class TestBufferWithSegmentsCollection(unittest.TestCase):
65 def test_empty_constructor(self):
66 with self.assertRaisesRegexp(ValueError, 'must pass at least 1 argument'):
67 zstd.BufferWithSegmentsCollection()
68
69 def test_argument_validation(self):
70 with self.assertRaisesRegexp(TypeError, 'arguments must be BufferWithSegments'):
71 zstd.BufferWithSegmentsCollection(None)
72
73 with self.assertRaisesRegexp(TypeError, 'arguments must be BufferWithSegments'):
74 zstd.BufferWithSegmentsCollection(zstd.BufferWithSegments(b'foo', ss.pack(0, 3)),
75 None)
76
77 with self.assertRaisesRegexp(ValueError, 'ZstdBufferWithSegments cannot be empty'):
78 zstd.BufferWithSegmentsCollection(zstd.BufferWithSegments(b'', b''))
79
80 def test_length(self):
81 b1 = zstd.BufferWithSegments(b'foo', ss.pack(0, 3))
82 b2 = zstd.BufferWithSegments(b'barbaz', b''.join([ss.pack(0, 3),
83 ss.pack(3, 3)]))
84
85 c = zstd.BufferWithSegmentsCollection(b1)
86 self.assertEqual(len(c), 1)
87 self.assertEqual(c.size(), 3)
88
89 c = zstd.BufferWithSegmentsCollection(b2)
90 self.assertEqual(len(c), 2)
91 self.assertEqual(c.size(), 6)
92
93 c = zstd.BufferWithSegmentsCollection(b1, b2)
94 self.assertEqual(len(c), 3)
95 self.assertEqual(c.size(), 9)
96
97 def test_getitem(self):
98 b1 = zstd.BufferWithSegments(b'foo', ss.pack(0, 3))
99 b2 = zstd.BufferWithSegments(b'barbaz', b''.join([ss.pack(0, 3),
100 ss.pack(3, 3)]))
101
102 c = zstd.BufferWithSegmentsCollection(b1, b2)
103
104 with self.assertRaisesRegexp(IndexError, 'offset must be less than 3'):
105 c[3]
106
107 with self.assertRaisesRegexp(IndexError, 'offset must be less than 3'):
108 c[4]
109
110 self.assertEqual(c[0].tobytes(), b'foo')
111 self.assertEqual(c[1].tobytes(), b'bar')
112 self.assertEqual(c[2].tobytes(), b'baz')
@@ -0,0 +1,143 b''
1 import io
2 import os
3
4 try:
5 import unittest2 as unittest
6 except ImportError:
7 import unittest
8
9 try:
10 import hypothesis
11 import hypothesis.strategies as strategies
12 except ImportError:
13 raise unittest.SkipTest('hypothesis not available')
14
15 import zstd
16
17 from .common import (
18 make_cffi,
19 random_input_data,
20 )
21
22
23 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
24 @make_cffi
25 class TestCompressor_write_to_fuzzing(unittest.TestCase):
26 @hypothesis.given(original=strategies.sampled_from(random_input_data()),
27 level=strategies.integers(min_value=1, max_value=5),
28 write_size=strategies.integers(min_value=1, max_value=1048576))
29 def test_write_size_variance(self, original, level, write_size):
30 refctx = zstd.ZstdCompressor(level=level)
31 ref_frame = refctx.compress(original)
32
33 cctx = zstd.ZstdCompressor(level=level)
34 b = io.BytesIO()
35 with cctx.write_to(b, size=len(original), write_size=write_size) as compressor:
36 compressor.write(original)
37
38 self.assertEqual(b.getvalue(), ref_frame)
39
40
41 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
42 @make_cffi
43 class TestCompressor_copy_stream_fuzzing(unittest.TestCase):
44 @hypothesis.given(original=strategies.sampled_from(random_input_data()),
45 level=strategies.integers(min_value=1, max_value=5),
46 read_size=strategies.integers(min_value=1, max_value=1048576),
47 write_size=strategies.integers(min_value=1, max_value=1048576))
48 def test_read_write_size_variance(self, original, level, read_size, write_size):
49 refctx = zstd.ZstdCompressor(level=level)
50 ref_frame = refctx.compress(original)
51
52 cctx = zstd.ZstdCompressor(level=level)
53 source = io.BytesIO(original)
54 dest = io.BytesIO()
55
56 cctx.copy_stream(source, dest, size=len(original), read_size=read_size,
57 write_size=write_size)
58
59 self.assertEqual(dest.getvalue(), ref_frame)
60
61
62 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
63 @make_cffi
64 class TestCompressor_compressobj_fuzzing(unittest.TestCase):
65 @hypothesis.given(original=strategies.sampled_from(random_input_data()),
66 level=strategies.integers(min_value=1, max_value=5),
67 chunk_sizes=strategies.streaming(
68 strategies.integers(min_value=1, max_value=4096)))
69 def test_random_input_sizes(self, original, level, chunk_sizes):
70 chunk_sizes = iter(chunk_sizes)
71
72 refctx = zstd.ZstdCompressor(level=level)
73 ref_frame = refctx.compress(original)
74
75 cctx = zstd.ZstdCompressor(level=level)
76 cobj = cctx.compressobj(size=len(original))
77
78 chunks = []
79 i = 0
80 while True:
81 chunk_size = next(chunk_sizes)
82 source = original[i:i + chunk_size]
83 if not source:
84 break
85
86 chunks.append(cobj.compress(source))
87 i += chunk_size
88
89 chunks.append(cobj.flush())
90
91 self.assertEqual(b''.join(chunks), ref_frame)
92
93
94 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
95 @make_cffi
96 class TestCompressor_read_from_fuzzing(unittest.TestCase):
97 @hypothesis.given(original=strategies.sampled_from(random_input_data()),
98 level=strategies.integers(min_value=1, max_value=5),
99 read_size=strategies.integers(min_value=1, max_value=4096),
100 write_size=strategies.integers(min_value=1, max_value=4096))
101 def test_read_write_size_variance(self, original, level, read_size, write_size):
102 refcctx = zstd.ZstdCompressor(level=level)
103 ref_frame = refcctx.compress(original)
104
105 source = io.BytesIO(original)
106
107 cctx = zstd.ZstdCompressor(level=level)
108 chunks = list(cctx.read_from(source, size=len(original), read_size=read_size,
109 write_size=write_size))
110
111 self.assertEqual(b''.join(chunks), ref_frame)
112
113
114 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
115 class TestCompressor_multi_compress_to_buffer_fuzzing(unittest.TestCase):
116 @hypothesis.given(original=strategies.lists(strategies.sampled_from(random_input_data()),
117 min_size=1, max_size=1024),
118 threads=strategies.integers(min_value=1, max_value=8),
119 use_dict=strategies.booleans())
120 def test_data_equivalence(self, original, threads, use_dict):
121 kwargs = {}
122
123 # Use a content dictionary because it is cheap to create.
124 if use_dict:
125 kwargs['dict_data'] = zstd.ZstdCompressionDict(original[0])
126
127 cctx = zstd.ZstdCompressor(level=1,
128 write_content_size=True,
129 write_checksum=True,
130 **kwargs)
131
132 result = cctx.multi_compress_to_buffer(original, threads=-1)
133
134 self.assertEqual(len(result), len(original))
135
136 # The frame produced via the batch APIs may not be bit identical to that
137 # produced by compress() because compression parameters are adjusted
138 # from the first input in batch mode. So the only thing we can do is
139 # verify the decompressed data matches the input.
140 dctx = zstd.ZstdDecompressor(**kwargs)
141
142 for i, frame in enumerate(result):
143 self.assertEqual(dctx.decompress(frame), original[i])
@@ -0,0 +1,79 b''
1 import io
2 import os
3
4 try:
5 import unittest2 as unittest
6 except ImportError:
7 import unittest
8
9 try:
10 import hypothesis
11 import hypothesis.strategies as strategies
12 except ImportError:
13 raise unittest.SkipTest('hypothesis not available')
14
15 import zstd
16
17 from .common import (
18 make_cffi,
19 )
20
21
22 s_windowlog = strategies.integers(min_value=zstd.WINDOWLOG_MIN,
23 max_value=zstd.WINDOWLOG_MAX)
24 s_chainlog = strategies.integers(min_value=zstd.CHAINLOG_MIN,
25 max_value=zstd.CHAINLOG_MAX)
26 s_hashlog = strategies.integers(min_value=zstd.HASHLOG_MIN,
27 max_value=zstd.HASHLOG_MAX)
28 s_searchlog = strategies.integers(min_value=zstd.SEARCHLOG_MIN,
29 max_value=zstd.SEARCHLOG_MAX)
30 s_searchlength = strategies.integers(min_value=zstd.SEARCHLENGTH_MIN,
31 max_value=zstd.SEARCHLENGTH_MAX)
32 s_targetlength = strategies.integers(min_value=zstd.TARGETLENGTH_MIN,
33 max_value=zstd.TARGETLENGTH_MAX)
34 s_strategy = strategies.sampled_from((zstd.STRATEGY_FAST,
35 zstd.STRATEGY_DFAST,
36 zstd.STRATEGY_GREEDY,
37 zstd.STRATEGY_LAZY,
38 zstd.STRATEGY_LAZY2,
39 zstd.STRATEGY_BTLAZY2,
40 zstd.STRATEGY_BTOPT))
41
42
43 @make_cffi
44 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
45 class TestCompressionParametersHypothesis(unittest.TestCase):
46 @hypothesis.given(s_windowlog, s_chainlog, s_hashlog, s_searchlog,
47 s_searchlength, s_targetlength, s_strategy)
48 def test_valid_init(self, windowlog, chainlog, hashlog, searchlog,
49 searchlength, targetlength, strategy):
50 # ZSTD_checkCParams moves the goal posts on us from what's advertised
51 # in the constants. So move along with them.
52 if searchlength == zstd.SEARCHLENGTH_MIN and strategy in (zstd.STRATEGY_FAST, zstd.STRATEGY_GREEDY):
53 searchlength += 1
54 elif searchlength == zstd.SEARCHLENGTH_MAX and strategy != zstd.STRATEGY_FAST:
55 searchlength -= 1
56
57 p = zstd.CompressionParameters(windowlog, chainlog, hashlog,
58 searchlog, searchlength,
59 targetlength, strategy)
60
61 cctx = zstd.ZstdCompressor(compression_params=p)
62 with cctx.write_to(io.BytesIO()):
63 pass
64
65 @hypothesis.given(s_windowlog, s_chainlog, s_hashlog, s_searchlog,
66 s_searchlength, s_targetlength, s_strategy)
67 def test_estimate_compression_context_size(self, windowlog, chainlog,
68 hashlog, searchlog,
69 searchlength, targetlength,
70 strategy):
71 if searchlength == zstd.SEARCHLENGTH_MIN and strategy in (zstd.STRATEGY_FAST, zstd.STRATEGY_GREEDY):
72 searchlength += 1
73 elif searchlength == zstd.SEARCHLENGTH_MAX and strategy != zstd.STRATEGY_FAST:
74 searchlength -= 1
75
76 p = zstd.CompressionParameters(windowlog, chainlog, hashlog,
77 searchlog, searchlength,
78 targetlength, strategy)
79 size = zstd.estimate_compression_context_size(p)
@@ -0,0 +1,151 b''
1 import io
2 import os
3
4 try:
5 import unittest2 as unittest
6 except ImportError:
7 import unittest
8
9 try:
10 import hypothesis
11 import hypothesis.strategies as strategies
12 except ImportError:
13 raise unittest.SkipTest('hypothesis not available')
14
15 import zstd
16
17 from .common import (
18 make_cffi,
19 random_input_data,
20 )
21
22
23 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
24 @make_cffi
25 class TestDecompressor_write_to_fuzzing(unittest.TestCase):
26 @hypothesis.given(original=strategies.sampled_from(random_input_data()),
27 level=strategies.integers(min_value=1, max_value=5),
28 write_size=strategies.integers(min_value=1, max_value=8192),
29 input_sizes=strategies.streaming(
30 strategies.integers(min_value=1, max_value=4096)))
31 def test_write_size_variance(self, original, level, write_size, input_sizes):
32 input_sizes = iter(input_sizes)
33
34 cctx = zstd.ZstdCompressor(level=level)
35 frame = cctx.compress(original)
36
37 dctx = zstd.ZstdDecompressor()
38 source = io.BytesIO(frame)
39 dest = io.BytesIO()
40
41 with dctx.write_to(dest, write_size=write_size) as decompressor:
42 while True:
43 chunk = source.read(next(input_sizes))
44 if not chunk:
45 break
46
47 decompressor.write(chunk)
48
49 self.assertEqual(dest.getvalue(), original)
50
51
52 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
53 @make_cffi
54 class TestDecompressor_copy_stream_fuzzing(unittest.TestCase):
55 @hypothesis.given(original=strategies.sampled_from(random_input_data()),
56 level=strategies.integers(min_value=1, max_value=5),
57 read_size=strategies.integers(min_value=1, max_value=8192),
58 write_size=strategies.integers(min_value=1, max_value=8192))
59 def test_read_write_size_variance(self, original, level, read_size, write_size):
60 cctx = zstd.ZstdCompressor(level=level)
61 frame = cctx.compress(original)
62
63 source = io.BytesIO(frame)
64 dest = io.BytesIO()
65
66 dctx = zstd.ZstdDecompressor()
67 dctx.copy_stream(source, dest, read_size=read_size, write_size=write_size)
68
69 self.assertEqual(dest.getvalue(), original)
70
71
72 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
73 @make_cffi
74 class TestDecompressor_decompressobj_fuzzing(unittest.TestCase):
75 @hypothesis.given(original=strategies.sampled_from(random_input_data()),
76 level=strategies.integers(min_value=1, max_value=5),
77 chunk_sizes=strategies.streaming(
78 strategies.integers(min_value=1, max_value=4096)))
79 def test_random_input_sizes(self, original, level, chunk_sizes):
80 chunk_sizes = iter(chunk_sizes)
81
82 cctx = zstd.ZstdCompressor(level=level)
83 frame = cctx.compress(original)
84
85 source = io.BytesIO(frame)
86
87 dctx = zstd.ZstdDecompressor()
88 dobj = dctx.decompressobj()
89
90 chunks = []
91 while True:
92 chunk = source.read(next(chunk_sizes))
93 if not chunk:
94 break
95
96 chunks.append(dobj.decompress(chunk))
97
98 self.assertEqual(b''.join(chunks), original)
99
100
101 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
102 @make_cffi
103 class TestDecompressor_read_from_fuzzing(unittest.TestCase):
104 @hypothesis.given(original=strategies.sampled_from(random_input_data()),
105 level=strategies.integers(min_value=1, max_value=5),
106 read_size=strategies.integers(min_value=1, max_value=4096),
107 write_size=strategies.integers(min_value=1, max_value=4096))
108 def test_read_write_size_variance(self, original, level, read_size, write_size):
109 cctx = zstd.ZstdCompressor(level=level)
110 frame = cctx.compress(original)
111
112 source = io.BytesIO(frame)
113
114 dctx = zstd.ZstdDecompressor()
115 chunks = list(dctx.read_from(source, read_size=read_size, write_size=write_size))
116
117 self.assertEqual(b''.join(chunks), original)
118
119
120 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
121 class TestDecompressor_multi_decompress_to_buffer_fuzzing(unittest.TestCase):
122 @hypothesis.given(original=strategies.lists(strategies.sampled_from(random_input_data()),
123 min_size=1, max_size=1024),
124 threads=strategies.integers(min_value=1, max_value=8),
125 use_dict=strategies.booleans())
126 def test_data_equivalence(self, original, threads, use_dict):
127 kwargs = {}
128 if use_dict:
129 kwargs['dict_data'] = zstd.ZstdCompressionDict(original[0])
130
131 cctx = zstd.ZstdCompressor(level=1,
132 write_content_size=True,
133 write_checksum=True,
134 **kwargs)
135
136 frames_buffer = cctx.multi_compress_to_buffer(original, threads=-1)
137
138 dctx = zstd.ZstdDecompressor(**kwargs)
139
140 result = dctx.multi_decompress_to_buffer(frames_buffer)
141
142 self.assertEqual(len(result), len(original))
143 for i, frame in enumerate(result):
144 self.assertEqual(frame.tobytes(), original[i])
145
146 frames_list = [f.tobytes() for f in frames_buffer]
147 result = dctx.multi_decompress_to_buffer(frames_list)
148
149 self.assertEqual(len(result), len(original))
150 for i, frame in enumerate(result):
151 self.assertEqual(frame.tobytes(), original[i])
@@ -1,6 +1,34 b''
1 1 Version History
2 2 ===============
3 3
4 0.8.0 (released 2017-03-08)
5 ---------------------------
6
7 * CompressionParameters now has an estimated_compression_context_size() method.
8 zstd.estimate_compression_context_size() is now deprecated and slated for
9 removal.
10 * Implemented a lot of fuzzing tests.
11 * CompressionParameters instances now perform extra validation by calling
12 ZSTD_checkCParams() at construction time.
13 * multi_compress_to_buffer() API for compressing multiple inputs as a
14 single operation, as efficiently as possible.
15 * ZSTD_CStream instances are now used across multiple operations on
16 ZstdCompressor instances, resulting in much better performance for
17 APIs that do streaming.
18 * ZSTD_DStream instances are now used across multiple operations on
19 ZstdDecompressor instances, resulting in much better performance for
20 APIs that do streaming.
21 * train_dictionary() now releases the GIL.
22 * Support for training dictionaries using the COVER algorithm.
23 * multi_decompress_to_buffer() API for decompressing multiple frames as a
24 single operation, as efficiently as possible.
25 * Support for multi-threaded compression.
26 * Disable deprecation warnings when compiling CFFI module.
27 * Fixed memory leak in train_dictionary().
28 * Removed DictParameters type.
29 * train_dictionary() now accepts keyword arguments instead of a
30 DictParameters instance to control dictionary generation.
31
4 32 0.7.0 (released 2017-02-07)
5 33 ---------------------------
6 34
@@ -20,9 +20,11 b' State of Project'
20 20 ================
21 21
22 22 The project is officially in beta state. The author is reasonably satisfied
23 with the current API and that functionality works as advertised. There
24 may be some backwards incompatible changes before 1.0. Though the author
25 does not intend to make any major changes to the Python API.
23 that functionality works as advertised. **There will be some backwards
24 incompatible changes before 1.0, probably in the 0.9 release.** This may
25 involve renaming the main module from *zstd* to *zstandard* and renaming
26 various types and methods. Pin the package version to prevent unwanted
27 breakage when this change occurs!
26 28
27 29 This project is vendored and distributed with Mercurial 4.1, where it is
28 30 used in a production capacity.
@@ -32,6 +34,10 b' on Linux x86_x64 and Windows x86 and x86'
32 34 confident the extension is stable and works as advertised on these
33 35 platforms.
34 36
37 The CFFI bindings are mostly feature complete. Where a feature is implemented
38 in CFFI, unit tests run against both C extension and CFFI implementation to
39 ensure behavior parity.
40
35 41 Expected Changes
36 42 ----------------
37 43
@@ -47,13 +53,20 b" sizes using zstd's preferred defaults)."
47 53 There should be an API that accepts an object that conforms to the buffer
48 54 interface and returns an iterator over compressed or decompressed output.
49 55
56 There should be an API that exposes an ``io.RawIOBase`` interface to
57 compressor and decompressor streams, like how ``gzip.GzipFile`` from
58 the standard library works (issue 13).
59
50 60 The author is on the fence as to whether to support the extremely
51 61 low level compression and decompression APIs. It could be useful to
52 62 support compression without the framing headers. But the author doesn't
53 63 believe it a high priority at this time.
54 64
55 The CFFI bindings are feature complete and all tests run against both
56 the C extension and CFFI bindings to ensure behavior parity.
65 There will likely be a refactoring of the module names. Currently,
66 ``zstd`` is a C extension and ``zstd_cffi`` is the CFFI interface.
67 This means that all code for the C extension must be implemented in
68 C. ``zstd`` may be converted to a Python module so code can be reused
69 between CFFI and C and so not all code in the C extension has to be C.
57 70
58 71 Requirements
59 72 ============
@@ -152,10 +165,13 b' A Tox configuration is present to test a'
152 165 $ tox
153 166
154 167 Tests use the ``hypothesis`` Python package to perform fuzzing. If you
155 don't have it, those tests won't run.
168 don't have it, those tests won't run. Since the fuzzing tests take longer
169 to execute than normal tests, you'll need to opt in to running them by
170 setting the ``ZSTD_SLOW_TESTS`` environment variable. This is set
171 automatically when using ``tox``.
156 172
157 There is also an experimental CFFI module. You need the ``cffi`` Python
158 package installed to build and test that.
173 The ``cffi`` Python package needs to be installed in order to build the CFFI
174 bindings. If it isn't present, the CFFI bindings won't be built.
159 175
160 176 To create a virtualenv with all development dependencies, do something
161 177 like the following::
@@ -172,8 +188,16 b' like the following::'
172 188 API
173 189 ===
174 190
175 The compiled C extension provides a ``zstd`` Python module. This module
176 exposes the following interfaces.
191 The compiled C extension provides a ``zstd`` Python module. The CFFI
192 bindings provide a ``zstd_cffi`` module. Both expose an identical API.
193 The types, functions, and attributes exposed by these modules
194 are documented in the sections below.
195
196 .. note::
197
198 The documentation in this section makes references to various zstd
199 concepts and functionality. The ``Concepts`` section below explains
200 these concepts in more detail.
177 201
178 202 ZstdCompressor
179 203 --------------
@@ -209,6 +233,14 b' write_dict_id'
209 233 Whether to write the dictionary ID into the compressed data.
210 234 Defaults to True. The dictionary ID is only written if a dictionary
211 235 is being used.
236 threads
237 Enables and sets the number of threads to use for multi-threaded compression
238 operations. Defaults to 0, which means to use single-threaded compression.
239 Negative values will resolve to the number of logical CPUs in the system.
240 Read below for more info on multi-threaded compression. This argument only
241 controls thread count for operations that operate on individual pieces of
242 data. APIs that spawn multiple threads for working on multiple pieces of
243 data have their own ``threads`` argument.
212 244
213 245 Unless specified otherwise, assume that no two methods of ``ZstdCompressor``
214 246 instances can be called from multiple Python threads simultaneously. In other
@@ -222,6 +254,8 b' Simple API'
222 254 cctx = zstd.ZstdCompressor()
223 255 compressed = cctx.compress(b'data to compress')
224 256
257 The ``data`` argument can be any object that implements the *buffer protocol*.
258
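For example, a ``bytearray`` or a ``memoryview`` can be compressed directly
(a minimal sketch; the variable names are illustrative)::

    cctx = zstd.ZstdCompressor()
    data = bytearray(b'data to compress')
    compressed = cctx.compress(memoryview(data))
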
225 259 Unless ``compression_params`` or ``dict_data`` are passed to the
226 260 ``ZstdCompressor``, each invocation of ``compress()`` will calculate the
227 261 optimal compression parameters for the configured compression ``level`` and
@@ -411,6 +445,42 b' the compressor::'
411 445 data = cobj.compress(b'foobar')
412 446 data = cobj.flush()
413 447
448 Batch Compression API
449 ^^^^^^^^^^^^^^^^^^^^^
450
451 (Experimental. Not yet supported in CFFI bindings.)
452
453 ``multi_compress_to_buffer(data, [threads=0])`` performs compression of multiple
454 inputs as a single operation.
455
456 Data to be compressed can be passed as a ``BufferWithSegmentsCollection``, a
457 ``BufferWithSegments``, or a list containing byte-like objects. Each element of
458 the container will be compressed individually using the configured parameters
459 on the ``ZstdCompressor`` instance.
460
461 The ``threads`` argument controls how many threads to use for compression. The
462 default is ``0`` which means to use a single thread. Negative values use the
463 number of logical CPUs in the machine.
464
465 The function returns a ``BufferWithSegmentsCollection``. This type represents
466 N discrete memory allocations, each holding 1 or more compressed frames.
467
468 Output data is written to shared memory buffers. This means that unlike
469 regular Python objects, a reference to *any* object within the collection
470 keeps the shared buffer and therefore memory backing it alive. This can have
471 undesirable effects on process memory usage.
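
A minimal usage sketch (the input list, level, and thread count are
illustrative)::

    cctx = zstd.ZstdCompressor(level=3)
    inputs = [b'chunk 0' * 1000, b'chunk 1' * 1000, b'chunk 2' * 1000]

    # Compress all inputs in one call, using all logical CPUs.
    result = cctx.multi_compress_to_buffer(inputs, threads=-1)

    assert len(result) == len(inputs)

    # Each element is a segment of a shared buffer; copy it out if needed.
    frames = [segment.tobytes() for segment in result]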
472
473 The API and behavior of this function are experimental and will likely change.
474 Known deficiencies include:
475
476 * If asked to use multiple threads, it will always spawn that many threads,
477 even if the input is too small to use them. It should automatically lower
478 the thread count when the extra threads would just add overhead.
479 * The buffer allocation strategy is fixed. There is room to make it dynamic,
480 perhaps even to allow one output buffer per input, facilitating a variation
481 of the API to return a list without the adverse effects of shared memory
482 buffers.
483
414 484 ZstdDecompressor
415 485 ----------------
416 486
@@ -585,6 +655,60 b' Here is how this API should be used::'
585 655 data = dobj.decompress(compressed_chunk_0)
586 656 data = dobj.decompress(compressed_chunk_1)
587 657
658 Batch Decompression API
659 ^^^^^^^^^^^^^^^^^^^^^^^
660
661 (Experimental. Not yet supported in CFFI bindings.)
662
663 ``multi_decompress_to_buffer()`` performs decompression of multiple
664 frames as a single operation and returns a ``BufferWithSegmentsCollection``
665 containing decompressed data for all inputs.
666
667 Compressed frames can be passed to the function as a ``BufferWithSegments``,
668 a ``BufferWithSegmentsCollection``, or as a list containing objects that
669 conform to the buffer protocol. For best performance, pass a
670 ``BufferWithSegmentsCollection`` or a ``BufferWithSegments``, as
671 minimal input validation will be done for those types. If calling from
672 Python (as opposed to C), constructing one of these instances may add
673 enough overhead to cancel out the performance gained by avoiding the
674 extra validation performed on list inputs.
675
676 The decompressed size of each frame must be discoverable. It can either be
677 embedded within the zstd frame (``write_content_size=True`` argument to
678 ``ZstdCompressor``) or passed in via the ``decompressed_sizes`` argument.
679
680 The ``decompressed_sizes`` argument is an object conforming to the buffer
681 protocol which holds an array of 64-bit unsigned integers in the machine's
682 native format defining the decompressed sizes of each frame. If this argument
683 is passed, it avoids having to scan each frame for its decompressed size.
684 This frame scanning can add noticeable overhead in some scenarios.
685
686 The ``threads`` argument controls the number of threads to use to perform
687 decompression operations. The default (``0``) or the value ``1`` means to
688 use a single thread. Negative values use the number of logical CPUs in the
689 machine.
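
A minimal sketch, assuming the frames were written without an embedded
content size so the sizes are supplied out of band (the ``struct`` packing
and argument names follow the description above)::

    import struct

    originals = [b'foo' * 1024, b'bar' * 2048]
    cctx = zstd.ZstdCompressor(level=1)
    frames = [cctx.compress(data) for data in originals]

    # Native-format array of 64-bit unsigned integers, one per frame.
    sizes = struct.pack('=' + 'Q' * len(originals),
                        *(len(data) for data in originals))

    dctx = zstd.ZstdDecompressor()
    result = dctx.multi_decompress_to_buffer(frames,
                                             decompressed_sizes=sizes,
                                             threads=-1)

    assert [segment.tobytes() for segment in result] == originals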
690
691 .. note::
692
693 It is possible to pass a ``mmap.mmap()`` instance into this function by
694 wrapping it with a ``BufferWithSegments`` instance (which will define the
695 offsets of frames within the memory mapped region).
696
697 This function is logically equivalent to performing ``dctx.decompress()``
698 on each input frame and returning the result.
699
700 This function exists to perform decompression on multiple frames as fast
701 as possible by having as little overhead as possible. Since decompression is
702 performed as a single operation and since the decompressed output is stored in
703 a single buffer, extra memory allocations, Python objects, and Python function
704 calls are avoided. This is ideal for scenarios where callers need to access
705 decompressed data for multiple frames.
706
707 Currently, the implementation always spawns multiple threads when requested,
708 even if the amount of work to do is small. In the future, it will be smarter
709 about avoiding threads and their associated overhead when the amount of
710 work to do is small.
711
588 712 Content-Only Dictionary Chain Decompression
589 713 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
590 714
@@ -609,20 +733,20 b' Each zstd frame **must** have the conten'
609 733 The following Python code can be used to produce a *content-only dictionary
610 734 chain*::
611 735
612 def make_chain(inputs):
613 frames = []
736 def make_chain(inputs):
737 frames = []
614 738
615 # First frame is compressed in standalone/discrete mode.
616 zctx = zstd.ZstdCompressor(write_content_size=True)
617 frames.append(zctx.compress(inputs[0]))
739 # First frame is compressed in standalone/discrete mode.
740 zctx = zstd.ZstdCompressor(write_content_size=True)
741 frames.append(zctx.compress(inputs[0]))
618 742
619 # Subsequent frames use the previous fulltext as a content-only dictionary
620 for i, raw in enumerate(inputs[1:]):
621 dict_data = zstd.ZstdCompressionDict(inputs[i])
622 zctx = zstd.ZstdCompressor(write_content_size=True, dict_data=dict_data)
623 frames.append(zctx.compress(raw))
743 # Subsequent frames use the previous fulltext as a content-only dictionary
744 for i, raw in enumerate(inputs[1:]):
745 dict_data = zstd.ZstdCompressionDict(inputs[i])
746 zctx = zstd.ZstdCompressor(write_content_size=True, dict_data=dict_data)
747 frames.append(zctx.compress(raw))
624 748
625 return frames
749 return frames
626 750
627 751 ``decompress_content_dict_chain()`` returns the uncompressed data of the last
628 752 element in the input chain.
@@ -632,59 +756,42 b' on top of other Python APIs. However, th'
632 756 faster, especially for long input chains, as it avoids the overhead of
633 757 instantiating and passing around intermediate objects between C and Python.
634 758
635 Choosing an API
636 ---------------
637
638 Various forms of compression and decompression APIs are provided because each
639 are suitable for different use cases.
759 Multi-Threaded Compression
760 --------------------------
640 761
641 The simple/one-shot APIs are useful for small data, when the decompressed
642 data size is known (either recorded in the zstd frame header via
643 ``write_content_size`` or known via an out-of-band mechanism, such as a file
644 size).
762 ``ZstdCompressor`` accepts a ``threads`` argument that controls the number
763 of threads to use for compression. The way this works is that input is split
764 into segments and each segment is fed into a worker pool for compression. Once
765 a segment is compressed, it is flushed/appended to the output.
645 766
646 A limitation of the simple APIs is that input or output data must fit in memory.
647 And unless using advanced tricks with Python *buffer objects*, both input and
648 output must fit in memory simultaneously.
649
650 Another limitation is that compression or decompression is performed as a single
651 operation. So if you feed large input, it could take a long time for the
652 function to return.
767 The segment size for multi-threaded compression is chosen from the window size
768 of the compressor. This is derived from the ``window_log`` attribute of a
769 ``CompressionParameters`` instance. By default, segment sizes are in the 1+MB
770 range.
653 771
654 The streaming APIs do not have the limitations of the simple API. The cost to
655 this is they are more complex to use than a single function call.
656
657 The streaming APIs put the caller in control of compression and decompression
658 behavior by allowing them to directly control either the input or output side
659 of the operation.
660
661 With the streaming input APIs, the caller feeds data into the compressor or
662 decompressor as they see fit. Output data will only be written after the caller
663 has explicitly written data.
772 If multi-threaded compression is requested and the input is smaller than the
773 configured segment size, only a single compression thread will be used. If the
774 input is smaller than the segment size multiplied by the thread pool size or
775 if data cannot be delivered to the compressor fast enough, not all requested
776 compressor threads may be active simultaneously.
664 777
665 With the streaming output APIs, the caller consumes output from the compressor
666 or decompressor as they see fit. The compressor or decompressor will only
667 consume data from the source when the caller is ready to receive it.
778 Compared to non-multi-threaded compression, multi-threaded compression has
779 higher per-operation overhead. This includes extra memory operations,
780 thread creation, lock acquisition, etc.
668 781
669 One end of the streaming APIs involves a file-like object that must
670 ``write()`` output data or ``read()`` input data. Depending on what the
671 backing storage for these objects is, those operations may not complete quickly.
672 For example, when streaming compressed data to a file, the ``write()`` into
673 a streaming compressor could result in a ``write()`` to the filesystem, which
674 may take a long time to finish due to slow I/O on the filesystem. So, there
675 may be overhead in streaming APIs beyond the compression and decompression
676 operations.
782 Due to the nature of multi-threaded compression using *N* compression
783 *states*, the output from multi-threaded compression will likely be larger
784 than non-multi-threaded compression. The difference is usually small. But
785 there is a CPU/wall time versus size trade off that may warrant investigation.
786
787 Output from multi-threaded compression does not require any special handling
788 on the decompression side. In other words, any zstd decompressor should be able
789 to consume data produced with multi-threaded compression.
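
As a sketch (assuming the module is importable as ``zstd`` and using the
``threads`` argument described above; sizes and values are illustrative)::

   import zstd

   data = b'many megabytes of input ' * 1000000

   # threads=4 requests 4 worker threads; threads=-1 would use one per
   # detected logical CPU.
   cctx = zstd.ZstdCompressor(level=3, threads=4)
   compressed = cctx.compress(data)

   # No special handling is needed to decompress multi-threaded output.
   dctx = zstd.ZstdDecompressor()
   assert dctx.decompress(compressed, max_output_size=len(data)) == data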
677 790
678 791 Dictionary Creation and Management
679 792 ----------------------------------
680 793
681 Zstandard allows *dictionaries* to be used when compressing and
682 decompressing data. The idea is that if you are compressing a lot of similar
683 data, you can precompute common properties of that data (such as recurring
684 byte sequences) to achieve better compression ratios.
685
686 In Python, compression dictionaries are represented as the
687 ``ZstdCompressionDict`` type.
794 Compression dictionaries are represented as the ``ZstdCompressionDict`` type.
688 795
689 796 Instances can be constructed from bytes::
690 797
@@ -736,6 +843,88 b' a ``ZstdCompressionDict`` later) via ``a'
736 843 dict_data = zstd.train_dictionary(size, samples)
737 844 raw_data = dict_data.as_bytes()
738 845
846 The following named arguments to ``train_dictionary`` can also be used
847 to further control dictionary generation.
848
849 selectivity
850 Integer selectivity level. Default is 9. Larger values yield more data in
851 dictionary.
852 level
853 Integer compression level. Default is 6.
854 dict_id
855 Integer dictionary ID for the produced dictionary. Default is 0, which
856 means to use a random value.
857 notifications
858 Controls writing of informational messages to ``stderr``. ``0`` (the
859 default) means to write nothing. ``1`` writes errors. ``2`` writes
860 progression info. ``3`` writes more details. And ``4`` writes all info.
861
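For example (a sketch; the sample corpus and argument values are
illustrative)::

   import zstd

   # A hypothetical corpus of small, similar documents.
   samples = [('{"record": %d, "status": "ok"}' % i).encode('ascii')
              for i in range(10000)]

   dict_data = zstd.train_dictionary(
       16384,              # target dictionary size in bytes
       samples,
       selectivity=9,      # default selectivity level
       level=6,            # compression level used while training
       dict_id=0,          # 0 picks a random dictionary ID
       notifications=1)    # write errors to stderr
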
862 Cover Dictionaries
863 ^^^^^^^^^^^^^^^^^^
864
865 An alternate dictionary training mechanism named *cover* is also available.
866 More details about this training mechanism are available in the paper
867 *Effective Construction of Relative Lempel-Ziv Dictionaries* (authors:
868 Liao, Petri, Moffat, Wirth).
869
870 To use this mechanism, use ``zstd.train_cover_dictionary()`` instead of
871 ``zstd.train_dictionary()``. The function behaves nearly the same except
872 its arguments are different and the returned dictionary will contain ``k``
873 and ``d`` attributes reflecting the parameters to the cover algorithm.
874
875 .. note::
876
877 The ``k`` and ``d`` attributes are only populated on dictionary
878 instances created by this function. If a ``ZstdCompressionDict`` is
879 constructed from raw bytes data, the ``k`` and ``d`` attributes will
880 be ``0``.
881
882 The segment and dmer size parameters to the cover algorithm can either be
883 specified manually or you can ask ``train_cover_dictionary()`` to try
884 multiple values and pick the best one, where *best* means the smallest
885 compressed data size.
886
887 In manual mode, the ``k`` and ``d`` arguments must be specified or a
888 ``ZstdError`` will be raised.
889
890 In automatic mode (triggered by specifying ``optimize=True``), ``k``
891 and ``d`` are optional. If a value isn't specified, then default values for
892 both are tested. The ``steps`` argument can control the number of steps
893 through ``k`` values. The ``level`` argument defines the compression level
894 that will be used when testing the compressed size. And ``threads`` can
895 specify the number of threads to use for concurrent operation.
896
897 This function takes the following arguments:
898
899 dict_size
900 Target size in bytes of the dictionary to generate.
901 samples
902 A list of bytes holding samples the dictionary will be trained from.
903 k
904 Parameter to cover algorithm defining the segment size. A reasonable range
905 is [16, 2048+].
906 d
907 Parameter to cover algorithm defining the dmer size. A reasonable range is
908 [6, 16]. ``d`` must be less than or equal to ``k``.
909 dict_id
910 Integer dictionary ID for the produced dictionary. Default is 0, which uses
911 a random value.
912 optimize
913 When true, test dictionary generation with multiple parameters.
914 level
915 Integer target compression level when testing compression with
916 ``optimize=True``. Default is 1.
917 steps
918 Number of steps through ``k`` values to perform when ``optimize=True``.
919 Default is 32.
920 threads
921 Number of threads to use when ``optimize=True``. Default is 0, which means
922 to use a single thread. A negative value can be specified to use as many
923 threads as there are detected logical CPUs.
924 notifications
925 Controls writing of informational messages to ``stderr``. See the
926 documentation for ``train_dictionary()`` for more.
927
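A sketch of both modes (the sample corpus and parameter values are
illustrative)::

   import zstd

   samples = [('sample record %d' % i).encode('ascii') for i in range(10000)]

   # Manual mode: k and d must be specified.
   dict_data = zstd.train_cover_dictionary(16384, samples, k=64, d=8)

   # Automatic mode: candidate k/d values are tested and the best kept.
   dict_data = zstd.train_cover_dictionary(16384, samples, optimize=True,
                                           level=1, steps=32, threads=-1)

   # The chosen parameters are exposed on the result.
   print(dict_data.k, dict_data.d)
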
739 928 Explicit Compression Parameters
740 929 -------------------------------
741 930
@@ -904,6 +1093,267 b' 100 byte inputs will be significant (pos'
904 1093 whereas 10 1,000,000 byte inputs will be more similar in speed (because the
905 1094 time spent doing compression dwarfs time spent creating new *contexts*).
906 1095
1096 Buffer Types
1097 ------------
1098
1099 The API exposes a handful of custom types for interfacing with memory buffers.
1100 The primary goal of these types is to facilitate efficient multi-object
1101 operations.
1102
1103 The essential idea is to have a single memory allocation provide backing
1104 storage for multiple logical objects. This has 2 main advantages: fewer
1105 allocations and optimal memory access patterns. This avoids having to allocate
1106 a Python object for each logical object and furthermore ensures that access of
1107 data for objects can be sequential (read: fast) in memory.
1108
1109 BufferWithSegments
1110 ^^^^^^^^^^^^^^^^^^
1111
1112 The ``BufferWithSegments`` type represents a memory buffer containing N
1113 discrete items of known lengths (segments). It is essentially a fixed-size
1114 memory buffer plus an array of ``(offset, length)`` pairs of 64-bit
1115 unsigned native-endian integers defining the byte offset and length of each
1116 segment within the buffer.
1117
1118 Instances behave like containers.
1119
1120 ``len()`` returns the number of segments within the instance.
1121
1122 ``o[index]`` or ``__getitem__`` obtains a ``BufferSegment`` representing an
1123 individual segment within the backing buffer. That returned object references
1124 (not copies) memory. This means that iterating all objects doesn't copy
1125 data within the buffer.
1126
1127 The ``.size`` attribute contains the total size in bytes of the backing
1128 buffer.
1129
1130 Instances conform to the buffer protocol. So a reference to the backing bytes
1131 can be obtained via ``memoryview(o)``. A *copy* of the backing bytes can also
1132 be obtained via ``.tobytes()``.
1133
1134 The ``.segments`` attribute exposes the array of ``(offset, length)`` for
1135 segments within the buffer. It is a ``BufferSegments`` type.
1136
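A minimal sketch of constructing and inspecting an instance (assuming the
type is exposed as ``zstd.BufferWithSegments``; the packing format follows
the description above)::

   import struct
   import zstd

   # Two logical items stored back to back in a single buffer.
   data = b'foobar'

   # Segments are (offset, length) pairs of native-endian 64-bit integers.
   segments = struct.pack('=QQQQ', 0, 3, 3, 3)

   buf = zstd.BufferWithSegments(data, segments)
   assert len(buf) == 2
   assert buf.size == 6
   assert buf[0].tobytes() == b'foo'
   assert buf[1].tobytes() == b'bar'
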
1137 BufferSegment
1138 ^^^^^^^^^^^^^
1139
1140 The ``BufferSegment`` type represents a segment within a ``BufferWithSegments``.
1141 It is essentially a reference to N bytes within a ``BufferWithSegments``.
1142
1143 ``len()`` returns the length of the segment in bytes.
1144
1145 ``.offset`` contains the byte offset of this segment within its parent
1146 ``BufferWithSegments`` instance.
1147
1148 The object conforms to the buffer protocol. ``.tobytes()`` can be called to
1149 obtain a ``bytes`` instance with a copy of the backing bytes.
1150
1151 BufferSegments
1152 ^^^^^^^^^^^^^^
1153
1154 This type represents an array of ``(offset, length)`` integers defining segments
1155 within a ``BufferWithSegments``.
1156
1157 The array members are 64-bit unsigned integers using host/native bit order.
1158
1159 Instances conform to the buffer protocol.
1160
1161 BufferWithSegmentsCollection
1162 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
1163
1164 The ``BufferWithSegmentsCollection`` type represents a virtual spanning view
1165 of multiple ``BufferWithSegments`` instances.
1166
1167 Instances are constructed from 1 or more ``BufferWithSegments`` instances. The
1168 resulting object behaves like an ordered sequence whose members are the
1169 segments within each ``BufferWithSegments``.
1170
1171 ``len()`` returns the number of segments within all ``BufferWithSegments``
1172 instances.
1173
1174 ``o[index]`` and ``__getitem__(index)`` return the ``BufferSegment`` at
1175 that offset as if all ``BufferWithSegments`` instances were a single
1176 entity.
1177
1178 If the object is composed of 2 ``BufferWithSegments`` instances with the
1179 first having 2 segments and the second having 3 segments, then ``b[0]``
1180 and ``b[1]`` access segments in the first object and ``b[2]``, ``b[3]``,
1181 and ``b[4]`` access segments from the second.
1182
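Continuing the sketch from above (the constructor usage is assumed from the
description in this section)::

   import struct
   import zstd

   def make_buffer(*items):
       # Hypothetical helper packing items into one BufferWithSegments.
       segments, offset = b'', 0
       for item in items:
           segments += struct.pack('=QQ', offset, len(item))
           offset += len(item)
       return zstd.BufferWithSegments(b''.join(items), segments)

   first = make_buffer(b'a', b'b')
   second = make_buffer(b'c', b'd', b'e')

   collection = zstd.BufferWithSegmentsCollection(first, second)
   assert len(collection) == 5
   # The third segment overall is the first segment of the second buffer.
   assert collection[2].tobytes() == b'c'
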
1183 Choosing an API
1184 ===============
1185
1186 There are multiple APIs for performing compression and decompression. This is
1187 because different applications have different needs and the library wants to
1188 facilitate optimal use in as many use cases as possible.
1189
1190 From a high-level, APIs are divided into *one-shot* and *streaming*. See
1191 the ``Concepts`` section for a description of how these are different at
1192 the C layer.
1193
1194 The *one-shot* APIs are useful for small data, where the input or output
1195 size is known. (The size can come from a buffer length, file size, or
1196 stored in the zstd frame header.) A limitation of the *one-shot* APIs is that
1197 input and output must fit in memory simultaneously. For, say, a 4 GB input,
1198 this is often not feasible.
1199
1200 The *one-shot* APIs also perform all work as a single operation. So, if you
1201 feed it large input, it could take a long time for the function to return.
1202
1203 The streaming APIs do not have the limitations of the simple API. But the
1204 price you pay for this flexibility is that they are more complex than a
1205 single function call.
1206
1207 The streaming APIs put the caller in control of compression and decompression
1208 behavior by allowing them to directly control either the input or output side
1209 of the operation.
1210
1211 With the *streaming input*, *compressor*, and *decompressor* APIs, the caller
1212 has full control over the input to the compression or decompression stream.
1213 They can directly choose when new data is operated on.
1214
1215 With the *streaming output* APIs, the caller has full control over the output
1216 of the compression or decompression stream. It can choose when to receive
1217 new data.
1218
1219 When using the *streaming* APIs that operate on file-like or stream objects,
1220 it is important to consider what happens in that object when I/O is requested.
1221 There is potential for long pauses as data is read or written from the
1222 underlying stream (say from interacting with a filesystem or network). This
1223 could add considerable overhead.
1224
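To make the distinction concrete, here is a sketch of the same data
compressed with a one-shot call and with the streaming ``write_to`` writer
(the destination object and sizes are illustrative)::

   import io
   import zstd

   data = b'data to compress ' * 100000
   cctx = zstd.ZstdCompressor()

   # One-shot: all input and all output are held in memory at once.
   compressed = cctx.compress(data)

   # Streaming output: compressed bytes are written to the destination as
   # input is fed in, so neither side must be materialized all at once.
   destination = io.BytesIO()
   with cctx.write_to(destination) as compressor:
       compressor.write(data[:65536])
       compressor.write(data[65536:])
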
1225 Concepts
1226 ========
1227
1228 It is important to have a basic understanding of how Zstandard works in order
1229 to optimally use this library. In addition, there are some low-level Python
1230 concepts that are worth explaining to aid understanding. This section aims to
1231 provide that knowledge.
1232
1233 Zstandard Frames and Compression Format
1234 ---------------------------------------
1235
1236 Compressed zstandard data almost always exists within a container called a
1237 *frame*. (For the technically curious, see the
1238 `specification <https://github.com/facebook/zstd/blob/3bee41a70eaf343fbcae3637b3f6edbe52f35ed8/doc/zstd_compression_format.md>`_.)
1239
1240 The frame contains a header and optional trailer. The header contains a
1241 magic number to self-identify as a zstd frame and a description of the
1242 compressed data that follows.
1243
1244 Among other things, the frame *optionally* contains the size of the
1245 decompressed data the frame represents, a 32-bit checksum of the
1246 decompressed data (to facilitate verification during decompression),
1247 and the ID of the dictionary used to compress the data.
1248
1249 Storing the original content size in the frame (``write_content_size=True``
1250 to ``ZstdCompressor``) is important for performance in some scenarios. Having
1251 the decompressed size stored there (or storing it elsewhere) allows
1252 decompression to perform a single memory allocation that is exactly sized to
1253 the output. This is faster than continuously growing a memory buffer to hold
1254 output.
1255
1256 Compression and Decompression Contexts
1257 --------------------------------------
1258
1259 In order to perform a compression or decompression operation with the zstd
1260 C API, you need what's called a *context*. A context essentially holds
1261 configuration and state for a compression or decompression operation. For
1262 example, a compression context holds the configured compression level.
1263
1264 Contexts can be reused for multiple operations. Since creating and
1265 destroying contexts is not free, there are performance advantages to
1266 reusing contexts.
1267
1268 The ``ZstdCompressor`` and ``ZstdDecompressor`` types are essentially
1269 wrappers around these contexts in the zstd C API.
1270
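Reuse simply means constructing the wrapper once and performing many
operations with it, as in this sketch::

   import zstd

   cctx = zstd.ZstdCompressor(level=3)

   # The underlying compression context is created once and reused for
   # every call, avoiding repeated setup costs.
   frames = [cctx.compress(chunk) for chunk in (b'one', b'two', b'three')]
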
1271 One-shot And Streaming Operations
1272 ---------------------------------
1273
1274 A compression or decompression operation can either be performed as a
1275 single *one-shot* operation or as a continuous *streaming* operation.
1276
1277 In one-shot mode (the *simple* APIs provided by the Python interface),
1278 **all** input is handed to the compressor or decompressor as a single buffer
1279 and **all** output is returned as a single buffer.
1280
1281 In streaming mode, input is delivered to the compressor or decompressor as
1282 a series of chunks via multiple function calls. Likewise, output is
1283 obtained in chunks as well.
1284
1285 Streaming operations require an additional *stream* object to be created
1286 to track the operation. These are logical extensions of *context*
1287 instances.
1288
1289 There are advantages and disadvantages to each mode of operation. There
1290 are scenarios where certain modes can't be used. See the
1291 ``Choosing an API`` section for more.
1292
1293 Dictionaries
1294 ------------
1295
1296 A compression *dictionary* is essentially data used to seed the compressor
1297 state so it can achieve better compression. The idea is that if you are
1298 compressing a lot of similar pieces of data (e.g. JSON documents or anything
1299 sharing similar structure), then you can find common patterns across multiple
1300 objects and then leverage those common patterns during compression and
1301 decompression operations to achieve better compression ratios.
1302
1303 Dictionary compression is generally only useful for small inputs - data no
1304 larger than a few kilobytes. The upper bound on this range is highly dependent
1305 on the input data and the dictionary.
1306
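A sketch of the typical flow (the training corpus is illustrative; the
training arguments are documented earlier in this document)::

   import zstd

   # Many small, structurally similar inputs.
   samples = [('{"user": %d, "payload": "..."}' % i).encode('ascii')
              for i in range(10000)]

   dict_data = zstd.train_dictionary(8192, samples)

   # The same dictionary must be used for compression and decompression.
   cctx = zstd.ZstdCompressor(dict_data=dict_data)
   dctx = zstd.ZstdDecompressor(dict_data=dict_data)

   frame = cctx.compress(samples[0])
   assert dctx.decompress(frame, max_output_size=1024) == samples[0]
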
1307 Python Buffer Protocol
1308 ----------------------
1309
1310 Many functions in the library operate on objects that implement Python's
1311 `buffer protocol <https://docs.python.org/3.6/c-api/buffer.html>`_.
1312
1313 The *buffer protocol* is an internal implementation detail of a Python
1314 type that allows instances of that type (objects) to be exposed as a raw
1315 pointer (or buffer) in the C API. In other words, it allows objects to be
1316 exposed as an array of bytes.
1317
1318 From the perspective of the C API, objects implementing the *buffer protocol*
1319 all look the same: they are just a pointer to a memory address of a defined
1320 length. This allows the C API to be largely type agnostic when accessing their
1321 data. It also allows custom types to be passed in without first converting them
1322 to a specific type.
1323
1324 Many Python types implement the buffer protocol. These include ``bytes``
1325 (``str`` on Python 2), ``bytearray``, ``array.array``, ``io.BytesIO``,
1326 ``mmap.mmap``, and ``memoryview``.
1327
1328 ``python-zstandard`` APIs that accept objects conforming to the buffer
1329 protocol require that the buffer is *C contiguous* and has a single
1330 dimension (``ndim==1``). This is usually the case. An example of where it
1331 is not is a Numpy matrix type.
1332
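For example, any of the following can be handed to ``compress()`` without an
intermediate copy (a sketch)::

   import array
   import zstd

   cctx = zstd.ZstdCompressor()

   cctx.compress(b'raw bytes')
   cctx.compress(bytearray(b'mutable bytes'))
   cctx.compress(memoryview(b'a view onto bytes'))
   cctx.compress(array.array('B', [0, 1, 2, 3]))
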
1333 Requiring Output Sizes for Non-Streaming Decompression APIs
1334 -----------------------------------------------------------
1335
1336 Non-streaming decompression APIs require that either the output size is
1337 explicitly defined (either in the zstd frame header or passed into the
1338 function) or that a max output size is specified. This restriction is for
1339 your safety.
1340
1341 The *one-shot* decompression APIs store the decompressed result in a
1342 single buffer. This means that a buffer needs to be pre-allocated to hold
1343 the result. If the decompressed size is not known, then there is no universal
1344 good default size to use. Any default will fail or will be highly sub-optimal
1345 in some scenarios (it will either be too small or will put stress on the
1346 memory allocator by requesting an excessively large block).
1347
1348 A *helpful* API may retry decompression with buffers of increasing size.
1349 While useful, there are obvious performance disadvantages, namely redoing
1350 decompression N times until it works. In addition, there is a security
1351 concern. Say the input came from highly compressible data, like 1 GB of the
1352 same byte value. The output size could be several orders of magnitude larger than the
1353 input size. An input of <100KB could decompress to >1GB. Without a bounds
1354 restriction on the decompressed size, certain inputs could exhaust all system
1355 memory. That's not good and is why the maximum output size is limited.
1356
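In practice this means supplying a bound whenever the frame does not declare
its decompressed size (a sketch; sizes are illustrative)::

   import zstd

   # Content size is not written into the frame by default.
   frame = zstd.ZstdCompressor().compress(b'x' * 100000)

   dctx = zstd.ZstdDecompressor()

   # dctx.decompress(frame) would raise ZstdError: the output size is unknown.
   # Supplying a maximum bounds memory usage and allows a single allocation.
   data = dctx.decompress(frame, max_output_size=200000)
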
907 1357 Note on Zstandard's *Experimental* API
908 1358 ======================================
909 1359
@@ -11,46 +11,48 b''
11 11 extern PyObject* ZstdError;
12 12
13 13 ZstdCompressionDict* train_dictionary(PyObject* self, PyObject* args, PyObject* kwargs) {
14 static char *kwlist[] = { "dict_size", "samples", "parameters", NULL };
14 static char* kwlist[] = {
15 "dict_size",
16 "samples",
17 "selectivity",
18 "level",
19 "notifications",
20 "dict_id",
21 NULL
22 };
15 23 size_t capacity;
16 24 PyObject* samples;
17 25 Py_ssize_t samplesLen;
18 PyObject* parameters = NULL;
26 unsigned selectivity = 0;
27 int level = 0;
28 unsigned notifications = 0;
29 unsigned dictID = 0;
19 30 ZDICT_params_t zparams;
20 31 Py_ssize_t sampleIndex;
21 32 Py_ssize_t sampleSize;
22 33 PyObject* sampleItem;
23 34 size_t zresult;
24 void* sampleBuffer;
35 void* sampleBuffer = NULL;
25 36 void* sampleOffset;
26 37 size_t samplesSize = 0;
27 size_t* sampleSizes;
28 void* dict;
29 ZstdCompressionDict* result;
38 size_t* sampleSizes = NULL;
39 void* dict = NULL;
40 ZstdCompressionDict* result = NULL;
30 41
31 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "nO!|O!:train_dictionary",
42 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "nO!|IiII:train_dictionary",
32 43 kwlist,
33 44 &capacity,
34 45 &PyList_Type, &samples,
35 (PyObject*)&DictParametersType, &parameters)) {
46 &selectivity, &level, &notifications, &dictID)) {
36 47 return NULL;
37 48 }
38 49
39 /* Validate parameters first since it is easiest. */
40 zparams.selectivityLevel = 0;
41 zparams.compressionLevel = 0;
42 zparams.notificationLevel = 0;
43 zparams.dictID = 0;
44 zparams.reserved[0] = 0;
45 zparams.reserved[1] = 0;
50 memset(&zparams, 0, sizeof(zparams));
46 51
47 if (parameters) {
48 /* TODO validate data ranges */
49 zparams.selectivityLevel = PyLong_AsUnsignedLong(PyTuple_GetItem(parameters, 0));
50 zparams.compressionLevel = PyLong_AsLong(PyTuple_GetItem(parameters, 1));
51 zparams.notificationLevel = PyLong_AsUnsignedLong(PyTuple_GetItem(parameters, 2));
52 zparams.dictID = PyLong_AsUnsignedLong(PyTuple_GetItem(parameters, 3));
53 }
52 zparams.selectivityLevel = selectivity;
53 zparams.compressionLevel = level;
54 zparams.notificationLevel = notifications;
55 zparams.dictID = dictID;
54 56
55 57 /* Figure out the size of the raw samples */
56 58 samplesLen = PyList_Size(samples);
@@ -68,13 +70,12 b' ZstdCompressionDict* train_dictionary(Py'
68 70 sampleBuffer = PyMem_Malloc(samplesSize);
69 71 if (!sampleBuffer) {
70 72 PyErr_NoMemory();
71 return NULL;
73 goto finally;
72 74 }
73 75 sampleSizes = PyMem_Malloc(samplesLen * sizeof(size_t));
74 76 if (!sampleSizes) {
75 PyMem_Free(sampleBuffer);
76 77 PyErr_NoMemory();
77 return NULL;
78 goto finally;
78 79 }
79 80
80 81 sampleOffset = sampleBuffer;
@@ -89,33 +90,168 b' ZstdCompressionDict* train_dictionary(Py'
89 90
90 91 dict = PyMem_Malloc(capacity);
91 92 if (!dict) {
92 PyMem_Free(sampleSizes);
93 PyMem_Free(sampleBuffer);
94 93 PyErr_NoMemory();
95 return NULL;
94 goto finally;
96 95 }
97 96
97 /* TODO consider using dup2() to redirect zstd's stderr writing to a buffer */
98 Py_BEGIN_ALLOW_THREADS
98 99 zresult = ZDICT_trainFromBuffer_advanced(dict, capacity,
99 100 sampleBuffer, sampleSizes, (unsigned int)samplesLen,
100 101 zparams);
102 Py_END_ALLOW_THREADS
101 103 if (ZDICT_isError(zresult)) {
102 104 PyErr_Format(ZstdError, "Cannot train dict: %s", ZDICT_getErrorName(zresult));
103 105 PyMem_Free(dict);
104 PyMem_Free(sampleSizes);
105 PyMem_Free(sampleBuffer);
106 return NULL;
106 goto finally;
107 107 }
108 108
109 109 result = PyObject_New(ZstdCompressionDict, &ZstdCompressionDictType);
110 110 if (!result) {
111 return NULL;
111 goto finally;
112 112 }
113 113
114 114 result->dictData = dict;
115 115 result->dictSize = zresult;
116 result->d = 0;
117 result->k = 0;
118
119 finally:
120 PyMem_Free(sampleBuffer);
121 PyMem_Free(sampleSizes);
122
116 123 return result;
117 124 }
118 125
126 ZstdCompressionDict* train_cover_dictionary(PyObject* self, PyObject* args, PyObject* kwargs) {
127 static char* kwlist[] = {
128 "dict_size",
129 "samples",
130 "k",
131 "d",
132 "notifications",
133 "dict_id",
134 "level",
135 "optimize",
136 "steps",
137 "threads",
138 NULL
139 };
140
141 size_t capacity;
142 PyObject* samples;
143 unsigned k = 0;
144 unsigned d = 0;
145 unsigned notifications = 0;
146 unsigned dictID = 0;
147 int level = 0;
148 PyObject* optimize = NULL;
149 unsigned steps = 0;
150 int threads = 0;
151 COVER_params_t params;
152 Py_ssize_t samplesLen;
153 Py_ssize_t i;
154 size_t samplesSize = 0;
155 void* sampleBuffer = NULL;
156 size_t* sampleSizes = NULL;
157 void* sampleOffset;
158 Py_ssize_t sampleSize;
159 void* dict = NULL;
160 size_t zresult;
161 ZstdCompressionDict* result = NULL;
162
163 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "nO!|IIIIiOIi:train_cover_dictionary",
164 kwlist, &capacity, &PyList_Type, &samples,
165 &k, &d, &notifications, &dictID, &level, &optimize, &steps, &threads)) {
166 return NULL;
167 }
168
169 if (threads < 0) {
170 threads = cpu_count();
171 }
172
173 memset(&params, 0, sizeof(params));
174 params.k = k;
175 params.d = d;
176 params.steps = steps;
177 params.nbThreads = threads;
178 params.notificationLevel = notifications;
179 params.dictID = dictID;
180 params.compressionLevel = level;
181
182 /* Figure out total size of input samples. */
183 samplesLen = PyList_Size(samples);
184 for (i = 0; i < samplesLen; i++) {
185 PyObject* sampleItem = PyList_GET_ITEM(samples, i);
186
187 if (!PyBytes_Check(sampleItem)) {
188 PyErr_SetString(PyExc_ValueError, "samples must be bytes");
189 return NULL;
190 }
191 samplesSize += PyBytes_GET_SIZE(sampleItem);
192 }
193
194 sampleBuffer = PyMem_Malloc(samplesSize);
195 if (!sampleBuffer) {
196 PyErr_NoMemory();
197 goto finally;
198 }
199
200 sampleSizes = PyMem_Malloc(samplesLen * sizeof(size_t));
201 if (!sampleSizes) {
202 PyErr_NoMemory();
203 goto finally;
204 }
205
206 sampleOffset = sampleBuffer;
207 for (i = 0; i < samplesLen; i++) {
208 PyObject* sampleItem = PyList_GET_ITEM(samples, i);
209 sampleSize = PyBytes_GET_SIZE(sampleItem);
210 sampleSizes[i] = sampleSize;
211 memcpy(sampleOffset, PyBytes_AS_STRING(sampleItem), sampleSize);
212 sampleOffset = (char*)sampleOffset + sampleSize;
213 }
214
215 dict = PyMem_Malloc(capacity);
216 if (!dict) {
217 PyErr_NoMemory();
218 goto finally;
219 }
220
221 Py_BEGIN_ALLOW_THREADS
222 if (optimize && PyObject_IsTrue(optimize)) {
223 zresult = COVER_optimizeTrainFromBuffer(dict, capacity,
224 sampleBuffer, sampleSizes, (unsigned)samplesLen, &params);
225 }
226 else {
227 zresult = COVER_trainFromBuffer(dict, capacity,
228 sampleBuffer, sampleSizes, (unsigned)samplesLen, params);
229 }
230 Py_END_ALLOW_THREADS
231
232 if (ZDICT_isError(zresult)) {
233 PyMem_Free(dict);
234 PyErr_Format(ZstdError, "cannot train dict: %s", ZDICT_getErrorName(zresult));
235 goto finally;
236 }
237
238 result = PyObject_New(ZstdCompressionDict, &ZstdCompressionDictType);
239 if (!result) {
240 PyMem_Free(dict);
241 goto finally;
242 }
243
244 result->dictData = dict;
245 result->dictSize = zresult;
246 result->d = params.d;
247 result->k = params.k;
248
249 finally:
250 PyMem_Free(sampleBuffer);
251 PyMem_Free(sampleSizes);
252
253 return result;
254 }
119 255
120 256 PyDoc_STRVAR(ZstdCompressionDict__doc__,
121 257 "ZstdCompressionDict(data) - Represents a computed compression dictionary\n"
@@ -180,6 +316,14 b' static PyMethodDef ZstdCompressionDict_m'
180 316 { NULL, NULL }
181 317 };
182 318
319 static PyMemberDef ZstdCompressionDict_members[] = {
320 { "k", T_UINT, offsetof(ZstdCompressionDict, k), READONLY,
321 "segment size" },
322 { "d", T_UINT, offsetof(ZstdCompressionDict, d), READONLY,
323 "dmer size" },
324 { NULL }
325 };
326
183 327 static Py_ssize_t ZstdCompressionDict_length(ZstdCompressionDict* self) {
184 328 return self->dictSize;
185 329 }
@@ -224,7 +368,7 b' PyTypeObject ZstdCompressionDictType = {'
224 368 0, /* tp_iter */
225 369 0, /* tp_iternext */
226 370 ZstdCompressionDict_methods, /* tp_methods */
227 0, /* tp_members */
371 ZstdCompressionDict_members, /* tp_members */
228 372 0, /* tp_getset */
229 373 0, /* tp_base */
230 374 0, /* tp_dict */
@@ -67,6 +67,8 b' static int CompressionParameters_init(Co'
67 67 unsigned searchLength;
68 68 unsigned targetLength;
69 69 unsigned strategy;
70 ZSTD_compressionParameters params;
71 size_t zresult;
70 72
71 73 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "IIIIIII:CompressionParameters",
72 74 kwlist, &windowLog, &chainLog, &hashLog, &searchLog, &searchLength,
@@ -117,9 +119,30 b' static int CompressionParameters_init(Co'
117 119 self->targetLength = targetLength;
118 120 self->strategy = strategy;
119 121
122 ztopy_compression_parameters(self, &params);
123 zresult = ZSTD_checkCParams(params);
124
125 if (ZSTD_isError(zresult)) {
126 PyErr_Format(PyExc_ValueError, "invalid compression parameters: %s",
127 ZSTD_getErrorName(zresult));
128 return -1;
129 }
130
120 131 return 0;
121 132 }
122 133
134 PyDoc_STRVAR(CompressionParameters_estimated_compression_context_size__doc__,
135 "Estimate the size in bytes of a compression context for compression parameters\n"
136 );
137
138 PyObject* CompressionParameters_estimated_compression_context_size(CompressionParametersObject* self) {
139 ZSTD_compressionParameters params;
140
141 ztopy_compression_parameters(self, &params);
142
143 return PyLong_FromSize_t(ZSTD_estimateCCtxSize(params));
144 }
145
123 146 PyObject* estimate_compression_context_size(PyObject* self, PyObject* args) {
124 147 CompressionParametersObject* params;
125 148 ZSTD_compressionParameters zparams;
@@ -142,6 +165,16 b' static void CompressionParameters_deallo'
142 165 PyObject_Del(self);
143 166 }
144 167
168 static PyMethodDef CompressionParameters_methods[] = {
169 {
170 "estimated_compression_context_size",
171 (PyCFunction)CompressionParameters_estimated_compression_context_size,
172 METH_NOARGS,
173 CompressionParameters_estimated_compression_context_size__doc__
174 },
175 { NULL, NULL }
176 };
177
145 178 static PyMemberDef CompressionParameters_members[] = {
146 179 { "window_log", T_UINT,
147 180 offsetof(CompressionParametersObject, windowLog), READONLY,
@@ -195,7 +228,7 b' PyTypeObject CompressionParametersType ='
195 228 0, /* tp_weaklistoffset */
196 229 0, /* tp_iter */
197 230 0, /* tp_iternext */
198 0, /* tp_methods */
231 CompressionParameters_methods, /* tp_methods */
199 232 CompressionParameters_members, /* tp_members */
200 233 0, /* tp_getset */
201 234 0, /* tp_base */
@@ -214,7 +247,7 b' void compressionparams_module_init(PyObj'
214 247 return;
215 248 }
216 249
217 Py_IncRef((PyObject*)&CompressionParametersType);
250 Py_INCREF(&CompressionParametersType);
218 251 PyModule_AddObject(mod, "CompressionParameters",
219 252 (PyObject*)&CompressionParametersType);
220 253 }
@@ -18,11 +18,6 b' static void ZstdCompressionWriter_deallo'
18 18 Py_XDECREF(self->compressor);
19 19 Py_XDECREF(self->writer);
20 20
21 if (self->cstream) {
22 ZSTD_freeCStream(self->cstream);
23 self->cstream = NULL;
24 }
25
26 21 PyObject_Del(self);
27 22 }
28 23
@@ -32,9 +27,15 b' static PyObject* ZstdCompressionWriter_e'
32 27 return NULL;
33 28 }
34 29
35 self->cstream = CStream_from_ZstdCompressor(self->compressor, self->sourceSize);
36 if (!self->cstream) {
37 return NULL;
30 if (self->compressor->mtcctx) {
31 if (init_mtcstream(self->compressor, self->sourceSize)) {
32 return NULL;
33 }
34 }
35 else {
36 if (0 != init_cstream(self->compressor, self->sourceSize)) {
37 return NULL;
38 }
38 39 }
39 40
40 41 self->entered = 1;
@@ -58,8 +59,8 b' static PyObject* ZstdCompressionWriter_e'
58 59
59 60 self->entered = 0;
60 61
61 if (self->cstream && exc_type == Py_None && exc_value == Py_None &&
62 exc_tb == Py_None) {
62 if ((self->compressor->cstream || self->compressor->mtcctx) && exc_type == Py_None
63 && exc_value == Py_None && exc_tb == Py_None) {
63 64
64 65 output.dst = PyMem_Malloc(self->outSize);
65 66 if (!output.dst) {
@@ -69,7 +70,12 b' static PyObject* ZstdCompressionWriter_e'
69 70 output.pos = 0;
70 71
71 72 while (1) {
72 zresult = ZSTD_endStream(self->cstream, &output);
73 if (self->compressor->mtcctx) {
74 zresult = ZSTDMT_endStream(self->compressor->mtcctx, &output);
75 }
76 else {
77 zresult = ZSTD_endStream(self->compressor->cstream, &output);
78 }
73 79 if (ZSTD_isError(zresult)) {
74 80 PyErr_Format(ZstdError, "error ending compression stream: %s",
75 81 ZSTD_getErrorName(zresult));
@@ -95,21 +101,19 b' static PyObject* ZstdCompressionWriter_e'
95 101 }
96 102
97 103 PyMem_Free(output.dst);
98 ZSTD_freeCStream(self->cstream);
99 self->cstream = NULL;
100 104 }
101 105
102 106 Py_RETURN_FALSE;
103 107 }
104 108
105 109 static PyObject* ZstdCompressionWriter_memory_size(ZstdCompressionWriter* self) {
106 if (!self->cstream) {
110 if (!self->compressor->cstream) {
107 111 PyErr_SetString(ZstdError, "cannot determine size of an inactive compressor; "
108 112 "call when a context manager is active");
109 113 return NULL;
110 114 }
111 115
112 return PyLong_FromSize_t(ZSTD_sizeof_CStream(self->cstream));
116 return PyLong_FromSize_t(ZSTD_sizeof_CStream(self->compressor->cstream));
113 117 }
114 118
115 119 static PyObject* ZstdCompressionWriter_write(ZstdCompressionWriter* self, PyObject* args) {
@@ -147,7 +151,13 b' static PyObject* ZstdCompressionWriter_w'
147 151
148 152 while ((ssize_t)input.pos < sourceSize) {
149 153 Py_BEGIN_ALLOW_THREADS
150 zresult = ZSTD_compressStream(self->cstream, &output, &input);
154 if (self->compressor->mtcctx) {
155 zresult = ZSTDMT_compressStream(self->compressor->mtcctx,
156 &output, &input);
157 }
158 else {
159 zresult = ZSTD_compressStream(self->compressor->cstream, &output, &input);
160 }
151 161 Py_END_ALLOW_THREADS
152 162
153 163 if (ZSTD_isError(zresult)) {
@@ -195,7 +205,12 b' static PyObject* ZstdCompressionWriter_f'
195 205
196 206 while (1) {
197 207 Py_BEGIN_ALLOW_THREADS
198 zresult = ZSTD_flushStream(self->cstream, &output);
208 if (self->compressor->mtcctx) {
209 zresult = ZSTDMT_flushStream(self->compressor->mtcctx, &output);
210 }
211 else {
212 zresult = ZSTD_flushStream(self->compressor->cstream, &output);
213 }
199 214 Py_END_ALLOW_THREADS
200 215
201 216 if (ZSTD_isError(zresult)) {
@@ -18,11 +18,6 b' static void ZstdCompressionObj_dealloc(Z'
18 18 PyMem_Free(self->output.dst);
19 19 self->output.dst = NULL;
20 20
21 if (self->cstream) {
22 ZSTD_freeCStream(self->cstream);
23 self->cstream = NULL;
24 }
25
26 21 Py_XDECREF(self->compressor);
27 22
28 23 PyObject_Del(self);
@@ -55,7 +50,13 b' static PyObject* ZstdCompressionObj_comp'
55 50
56 51 while ((ssize_t)input.pos < sourceSize) {
57 52 Py_BEGIN_ALLOW_THREADS
58 zresult = ZSTD_compressStream(self->cstream, &self->output, &input);
53 if (self->compressor->mtcctx) {
54 zresult = ZSTDMT_compressStream(self->compressor->mtcctx,
55 &self->output, &input);
56 }
57 else {
58 zresult = ZSTD_compressStream(self->compressor->cstream, &self->output, &input);
59 }
59 60 Py_END_ALLOW_THREADS
60 61
61 62 if (ZSTD_isError(zresult)) {
@@ -118,7 +119,12 b' static PyObject* ZstdCompressionObj_flus'
118 119 /* The output buffer is of size ZSTD_CStreamOutSize(), which is
119 120 guaranteed to hold a full block. */
120 121 Py_BEGIN_ALLOW_THREADS
121 zresult = ZSTD_flushStream(self->cstream, &self->output);
122 if (self->compressor->mtcctx) {
123 zresult = ZSTDMT_flushStream(self->compressor->mtcctx, &self->output);
124 }
125 else {
126 zresult = ZSTD_flushStream(self->compressor->cstream, &self->output);
127 }
122 128 Py_END_ALLOW_THREADS
123 129
124 130 if (ZSTD_isError(zresult)) {
@@ -150,7 +156,12 b' static PyObject* ZstdCompressionObj_flus'
150 156 self->finished = 1;
151 157
152 158 while (1) {
153 zresult = ZSTD_endStream(self->cstream, &self->output);
159 if (self->compressor->mtcctx) {
160 zresult = ZSTDMT_endStream(self->compressor->mtcctx, &self->output);
161 }
162 else {
163 zresult = ZSTD_endStream(self->compressor->cstream, &self->output);
164 }
154 165 if (ZSTD_isError(zresult)) {
155 166 PyErr_Format(ZstdError, "error ending compression stream: %s",
156 167 ZSTD_getErrorName(zresult));
@@ -182,9 +193,6 b' static PyObject* ZstdCompressionObj_flus'
182 193 }
183 194 }
184 195
185 ZSTD_freeCStream(self->cstream);
186 self->cstream = NULL;
187
188 196 if (result) {
189 197 return result;
190 198 }
This diff has been collapsed as it changes many lines (957 lines changed).
@@ -7,12 +7,17 b''
7 7 */
8 8
9 9 #include "python-zstandard.h"
10 #include "pool.h"
10 11
11 12 extern PyObject* ZstdError;
12 13
13 int populate_cdict(ZstdCompressor* compressor, void* dictData, size_t dictSize, ZSTD_parameters* zparams) {
14 int populate_cdict(ZstdCompressor* compressor, ZSTD_parameters* zparams) {
14 15 ZSTD_customMem zmem;
15 assert(!compressor->cdict);
16
17 if (compressor->cdict || !compressor->dict || !compressor->dict->dictData) {
18 return 0;
19 }
20
16 21 Py_BEGIN_ALLOW_THREADS
17 22 memset(&zmem, 0, sizeof(zmem));
18 23 compressor->cdict = ZSTD_createCDict_advanced(compressor->dict->dictData,
@@ -28,22 +33,32 b' int populate_cdict(ZstdCompressor* compr'
28 33 }
29 34
30 35 /**
31 * Initialize a zstd CStream from a ZstdCompressor instance.
32 *
33 * Returns a ZSTD_CStream on success or NULL on failure. If NULL, a Python
34 * exception will be set.
35 */
36 ZSTD_CStream* CStream_from_ZstdCompressor(ZstdCompressor* compressor, Py_ssize_t sourceSize) {
37 ZSTD_CStream* cstream;
36 * Ensure the ZSTD_CStream on a ZstdCompressor instance is initialized.
37 *
38 * Returns 0 on success. Other value on failure. Will set a Python exception
39 * on failure.
40 */
41 int init_cstream(ZstdCompressor* compressor, unsigned long long sourceSize) {
38 42 ZSTD_parameters zparams;
39 43 void* dictData = NULL;
40 44 size_t dictSize = 0;
41 45 size_t zresult;
42 46
43 cstream = ZSTD_createCStream();
44 if (!cstream) {
45 PyErr_SetString(ZstdError, "cannot create CStream");
46 return NULL;
47 if (compressor->cstream) {
48 zresult = ZSTD_resetCStream(compressor->cstream, sourceSize);
49 if (ZSTD_isError(zresult)) {
50 PyErr_Format(ZstdError, "could not reset CStream: %s",
51 ZSTD_getErrorName(zresult));
52 return -1;
53 }
54
55 return 0;
56 }
57
58 compressor->cstream = ZSTD_createCStream();
59 if (!compressor->cstream) {
60 PyErr_SetString(ZstdError, "could not create CStream");
61 return -1;
47 62 }
48 63
49 64 if (compressor->dict) {
@@ -63,15 +78,51 b' ZSTD_CStream* CStream_from_ZstdCompresso'
63 78
64 79 zparams.fParams = compressor->fparams;
65 80
66 zresult = ZSTD_initCStream_advanced(cstream, dictData, dictSize, zparams, sourceSize);
81 zresult = ZSTD_initCStream_advanced(compressor->cstream, dictData, dictSize,
82 zparams, sourceSize);
67 83
68 84 if (ZSTD_isError(zresult)) {
69 ZSTD_freeCStream(cstream);
85 ZSTD_freeCStream(compressor->cstream);
86 compressor->cstream = NULL;
70 87 PyErr_Format(ZstdError, "cannot init CStream: %s", ZSTD_getErrorName(zresult));
71 return NULL;
88 return -1;
72 89 }
73 90
74 return cstream;
91 return 0;
92 }
93
94 int init_mtcstream(ZstdCompressor* compressor, Py_ssize_t sourceSize) {
95 size_t zresult;
96 void* dictData = NULL;
97 size_t dictSize = 0;
98 ZSTD_parameters zparams;
99
100 assert(compressor->mtcctx);
101
102 if (compressor->dict) {
103 dictData = compressor->dict->dictData;
104 dictSize = compressor->dict->dictSize;
105 }
106
107 memset(&zparams, 0, sizeof(zparams));
108 if (compressor->cparams) {
109 ztopy_compression_parameters(compressor->cparams, &zparams.cParams);
110 }
111 else {
112 zparams.cParams = ZSTD_getCParams(compressor->compressionLevel, sourceSize, dictSize);
113 }
114
115 zparams.fParams = compressor->fparams;
116
117 zresult = ZSTDMT_initCStream_advanced(compressor->mtcctx, dictData, dictSize,
118 zparams, sourceSize);
119
120 if (ZSTD_isError(zresult)) {
121 PyErr_Format(ZstdError, "cannot init CStream: %s", ZSTD_getErrorName(zresult));
122 return -1;
123 }
124
125 return 0;
75 126 }
76 127
77 128 PyDoc_STRVAR(ZstdCompressor__doc__,
@@ -103,6 +154,11 b' PyDoc_STRVAR(ZstdCompressor__doc__,'
103 154 " Determines whether the dictionary ID will be written into the compressed\n"
104 155 " data. Defaults to True. Only adds content to the compressed data if\n"
105 156 " a dictionary is being used.\n"
157 "threads\n"
158 " Number of threads to use to compress data concurrently. When set,\n"
159 " compression operations are performed on multiple threads. The default\n"
160 " value (0) disables multi-threaded compression. A value of ``-1`` means to\n"
161 " set the number of threads to the number of detected logical CPUs.\n"
106 162 );
107 163
108 164 static int ZstdCompressor_init(ZstdCompressor* self, PyObject* args, PyObject* kwargs) {
@@ -113,6 +169,7 b' static int ZstdCompressor_init(ZstdCompr'
113 169 "write_checksum",
114 170 "write_content_size",
115 171 "write_dict_id",
172 "threads",
116 173 NULL
117 174 };
118 175
@@ -122,16 +179,12 b' static int ZstdCompressor_init(ZstdCompr'
122 179 PyObject* writeChecksum = NULL;
123 180 PyObject* writeContentSize = NULL;
124 181 PyObject* writeDictID = NULL;
182 int threads = 0;
125 183
126 self->cctx = NULL;
127 self->dict = NULL;
128 self->cparams = NULL;
129 self->cdict = NULL;
130
131 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|iO!O!OOO:ZstdCompressor",
184 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|iO!O!OOOi:ZstdCompressor",
132 185 kwlist, &level, &ZstdCompressionDictType, &dict,
133 186 &CompressionParametersType, &params,
134 &writeChecksum, &writeContentSize, &writeDictID)) {
187 &writeChecksum, &writeContentSize, &writeDictID, &threads)) {
135 188 return -1;
136 189 }
137 190
@@ -146,12 +199,27 b' static int ZstdCompressor_init(ZstdCompr'
146 199 return -1;
147 200 }
148 201
202 if (threads < 0) {
203 threads = cpu_count();
204 }
205
206 self->threads = threads;
207
149 208 /* We create a ZSTD_CCtx for reuse among multiple operations to reduce the
150 209 overhead of each compression operation. */
151 self->cctx = ZSTD_createCCtx();
152 if (!self->cctx) {
153 PyErr_NoMemory();
154 return -1;
210 if (threads) {
211 self->mtcctx = ZSTDMT_createCCtx(threads);
212 if (!self->mtcctx) {
213 PyErr_NoMemory();
214 return -1;
215 }
216 }
217 else {
218 self->cctx = ZSTD_createCCtx();
219 if (!self->cctx) {
220 PyErr_NoMemory();
221 return -1;
222 }
155 223 }
156 224
157 225 self->compressionLevel = level;
@@ -182,6 +250,11 b' static int ZstdCompressor_init(ZstdCompr'
182 250 }
183 251
184 252 static void ZstdCompressor_dealloc(ZstdCompressor* self) {
253 if (self->cstream) {
254 ZSTD_freeCStream(self->cstream);
255 self->cstream = NULL;
256 }
257
185 258 Py_XDECREF(self->cparams);
186 259 Py_XDECREF(self->dict);
187 260
@@ -195,6 +268,11 b' static void ZstdCompressor_dealloc(ZstdC'
195 268 self->cctx = NULL;
196 269 }
197 270
271 if (self->mtcctx) {
272 ZSTDMT_freeCCtx(self->mtcctx);
273 self->mtcctx = NULL;
274 }
275
198 276 PyObject_Del(self);
199 277 }
200 278
@@ -229,7 +307,6 b' static PyObject* ZstdCompressor_copy_str'
229 307 Py_ssize_t sourceSize = 0;
230 308 size_t inSize = ZSTD_CStreamInSize();
231 309 size_t outSize = ZSTD_CStreamOutSize();
232 ZSTD_CStream* cstream;
233 310 ZSTD_inBuffer input;
234 311 ZSTD_outBuffer output;
235 312 Py_ssize_t totalRead = 0;
@@ -261,10 +338,17 b' static PyObject* ZstdCompressor_copy_str'
261 338 /* Prevent free on uninitialized memory in finally. */
262 339 output.dst = NULL;
263 340
264 cstream = CStream_from_ZstdCompressor(self, sourceSize);
265 if (!cstream) {
266 res = NULL;
267 goto finally;
341 if (self->mtcctx) {
342 if (init_mtcstream(self, sourceSize)) {
343 res = NULL;
344 goto finally;
345 }
346 }
347 else {
348 if (0 != init_cstream(self, sourceSize)) {
349 res = NULL;
350 goto finally;
351 }
268 352 }
269 353
270 354 output.dst = PyMem_Malloc(outSize);
@@ -300,7 +384,12 b' static PyObject* ZstdCompressor_copy_str'
300 384
301 385 while (input.pos < input.size) {
302 386 Py_BEGIN_ALLOW_THREADS
303 zresult = ZSTD_compressStream(cstream, &output, &input);
387 if (self->mtcctx) {
388 zresult = ZSTDMT_compressStream(self->mtcctx, &output, &input);
389 }
390 else {
391 zresult = ZSTD_compressStream(self->cstream, &output, &input);
392 }
304 393 Py_END_ALLOW_THREADS
305 394
306 395 if (ZSTD_isError(zresult)) {
@@ -325,7 +414,12 b' static PyObject* ZstdCompressor_copy_str'
325 414
326 415 /* We've finished reading. Now flush the compressor stream. */
327 416 while (1) {
328 zresult = ZSTD_endStream(cstream, &output);
417 if (self->mtcctx) {
418 zresult = ZSTDMT_endStream(self->mtcctx, &output);
419 }
420 else {
421 zresult = ZSTD_endStream(self->cstream, &output);
422 }
329 423 if (ZSTD_isError(zresult)) {
330 424 PyErr_Format(ZstdError, "error ending compression stream: %s",
331 425 ZSTD_getErrorName(zresult));
@@ -350,24 +444,17 b' static PyObject* ZstdCompressor_copy_str'
350 444 }
351 445 }
352 446
353 ZSTD_freeCStream(cstream);
354 cstream = NULL;
355
356 447 totalReadPy = PyLong_FromSsize_t(totalRead);
357 448 totalWritePy = PyLong_FromSsize_t(totalWrite);
358 449 res = PyTuple_Pack(2, totalReadPy, totalWritePy);
359 Py_DecRef(totalReadPy);
360 Py_DecRef(totalWritePy);
450 Py_DECREF(totalReadPy);
451 Py_DECREF(totalWritePy);
361 452
362 453 finally:
363 454 if (output.dst) {
364 455 PyMem_Free(output.dst);
365 456 }
366 457
367 if (cstream) {
368 ZSTD_freeCStream(cstream);
369 }
370
371 458 return res;
372 459 }
373 460
@@ -410,6 +497,18 b' static PyObject* ZstdCompressor_compress'
410 497 return NULL;
411 498 }
412 499
500 if (self->threads && self->dict) {
501 PyErr_SetString(ZstdError,
502 "compress() cannot be used with both dictionaries and multi-threaded compression");
503 return NULL;
504 }
505
506 if (self->threads && self->cparams) {
507 PyErr_SetString(ZstdError,
508 "compress() cannot be used with both compression parameters and multi-threaded compression");
509 return NULL;
510 }
511
413 512 /* Limitation in zstd C API doesn't let decompression side distinguish
414 513 between content size of 0 and unknown content size. This can make round
415 514 tripping via Python difficult. Until this is fixed, require a flag
@@ -456,24 +555,28 b' static PyObject* ZstdCompressor_compress'
456 555 https://github.com/facebook/zstd/issues/358 contains more info. We could
457 556 potentially add an argument somewhere to control this behavior.
458 557 */
459 if (dictData && !self->cdict) {
460 if (populate_cdict(self, dictData, dictSize, &zparams)) {
461 Py_DECREF(output);
462 return NULL;
463 }
558 if (0 != populate_cdict(self, &zparams)) {
559 Py_DECREF(output);
560 return NULL;
464 561 }
465 562
466 563 Py_BEGIN_ALLOW_THREADS
467 /* By avoiding ZSTD_compress(), we don't necessarily write out content
468 size. This means the argument to ZstdCompressor to control frame
469 parameters is honored. */
470 if (self->cdict) {
471 zresult = ZSTD_compress_usingCDict(self->cctx, dest, destSize,
472 source, sourceSize, self->cdict);
564 if (self->mtcctx) {
565 zresult = ZSTDMT_compressCCtx(self->mtcctx, dest, destSize,
566 source, sourceSize, self->compressionLevel);
473 567 }
474 568 else {
475 zresult = ZSTD_compress_advanced(self->cctx, dest, destSize,
476 source, sourceSize, dictData, dictSize, zparams);
569 /* By avoiding ZSTD_compress(), we don't necessarily write out content
570 size. This means the argument to ZstdCompressor to control frame
571 parameters is honored. */
572 if (self->cdict) {
573 zresult = ZSTD_compress_usingCDict(self->cctx, dest, destSize,
574 source, sourceSize, self->cdict);
575 }
576 else {
577 zresult = ZSTD_compress_advanced(self->cctx, dest, destSize,
578 source, sourceSize, dictData, dictSize, zparams);
579 }
477 580 }
478 581 Py_END_ALLOW_THREADS
479 582
@@ -507,21 +610,30 b' static ZstdCompressionObj* ZstdCompresso'
507 610
508 611 Py_ssize_t inSize = 0;
509 612 size_t outSize = ZSTD_CStreamOutSize();
510 ZstdCompressionObj* result = PyObject_New(ZstdCompressionObj, &ZstdCompressionObjType);
511 if (!result) {
512 return NULL;
513 }
613 ZstdCompressionObj* result = NULL;
514 614
515 615 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|n:compressobj", kwlist, &inSize)) {
516 616 return NULL;
517 617 }
518 618
519 result->cstream = CStream_from_ZstdCompressor(self, inSize);
520 if (!result->cstream) {
521 Py_DECREF(result);
619 result = (ZstdCompressionObj*)PyObject_CallObject((PyObject*)&ZstdCompressionObjType, NULL);
620 if (!result) {
522 621 return NULL;
523 622 }
524 623
624 if (self->mtcctx) {
625 if (init_mtcstream(self, inSize)) {
626 Py_DECREF(result);
627 return NULL;
628 }
629 }
630 else {
631 if (0 != init_cstream(self, inSize)) {
632 Py_DECREF(result);
633 return NULL;
634 }
635 }
636
525 637 result->output.dst = PyMem_Malloc(outSize);
526 638 if (!result->output.dst) {
527 639 PyErr_NoMemory();
@@ -529,13 +641,9 b' static ZstdCompressionObj* ZstdCompresso'
529 641 return NULL;
530 642 }
531 643 result->output.size = outSize;
532 result->output.pos = 0;
533
534 644 result->compressor = self;
535 645 Py_INCREF(result->compressor);
536 646
537 result->finished = 0;
538
539 647 return result;
540 648 }
541 649
@@ -579,19 +687,10 b' static ZstdCompressorIterator* ZstdCompr'
579 687 return NULL;
580 688 }
581 689
582 result = PyObject_New(ZstdCompressorIterator, &ZstdCompressorIteratorType);
690 result = (ZstdCompressorIterator*)PyObject_CallObject((PyObject*)&ZstdCompressorIteratorType, NULL);
583 691 if (!result) {
584 692 return NULL;
585 693 }
586
587 result->compressor = NULL;
588 result->reader = NULL;
589 result->buffer = NULL;
590 result->cstream = NULL;
591 result->input.src = NULL;
592 result->output.dst = NULL;
593 result->readResult = NULL;
594
595 694 if (PyObject_HasAttrString(reader, "read")) {
596 695 result->reader = reader;
597 696 Py_INCREF(result->reader);
@@ -608,7 +707,6 b' static ZstdCompressorIterator* ZstdCompr'
608 707 goto except;
609 708 }
610 709
611 result->bufferOffset = 0;
612 710 sourceSize = result->buffer->len;
613 711 }
614 712 else {
@@ -621,9 +719,16 b' static ZstdCompressorIterator* ZstdCompr'
621 719 Py_INCREF(result->compressor);
622 720
623 721 result->sourceSize = sourceSize;
624 result->cstream = CStream_from_ZstdCompressor(self, sourceSize);
625 if (!result->cstream) {
626 goto except;
722
723 if (self->mtcctx) {
724 if (init_mtcstream(self, sourceSize)) {
725 goto except;
726 }
727 }
728 else {
729 if (0 != init_cstream(self, sourceSize)) {
730 goto except;
731 }
627 732 }
628 733
629 734 result->inSize = inSize;
@@ -635,26 +740,12 b' static ZstdCompressorIterator* ZstdCompr'
635 740 goto except;
636 741 }
637 742 result->output.size = outSize;
638 result->output.pos = 0;
639
640 result->input.src = NULL;
641 result->input.size = 0;
642 result->input.pos = 0;
643
644 result->finishedInput = 0;
645 result->finishedOutput = 0;
646 743
647 744 goto finally;
648 745
649 746 except:
650 if (result->cstream) {
651 ZSTD_freeCStream(result->cstream);
652 result->cstream = NULL;
653 }
654
655 Py_DecRef((PyObject*)result->compressor);
656 Py_DecRef(result->reader);
657
747 Py_XDECREF(result->compressor);
748 Py_XDECREF(result->reader);
658 749 Py_DECREF(result);
659 750 result = NULL;
660 751
@@ -703,7 +794,7 b' static ZstdCompressionWriter* ZstdCompre'
703 794 return NULL;
704 795 }
705 796
706 result = PyObject_New(ZstdCompressionWriter, &ZstdCompressionWriterType);
797 result = (ZstdCompressionWriter*)PyObject_CallObject((PyObject*)&ZstdCompressionWriterType, NULL);
707 798 if (!result) {
708 799 return NULL;
709 800 }
@@ -715,11 +806,671 b' static ZstdCompressionWriter* ZstdCompre'
715 806 Py_INCREF(result->writer);
716 807
717 808 result->sourceSize = sourceSize;
718
719 809 result->outSize = outSize;
720 810
721 result->entered = 0;
722 result->cstream = NULL;
811 return result;
812 }
813
814 typedef struct {
815 void* sourceData;
816 size_t sourceSize;
817 } DataSource;
818
819 typedef struct {
820 DataSource* sources;
821 Py_ssize_t sourcesSize;
822 unsigned long long totalSourceSize;
823 } DataSources;
824
825 typedef struct {
826 void* dest;
827 Py_ssize_t destSize;
828 BufferSegment* segments;
829 Py_ssize_t segmentsSize;
830 } DestBuffer;
831
832 typedef enum {
833 WorkerError_none = 0,
834 WorkerError_zstd = 1,
835 WorkerError_no_memory = 2,
836 } WorkerError;
837
838 /**
839 * Holds state for an individual worker performing multi_compress_to_buffer work.
840 */
841 typedef struct {
842 /* Used for compression. */
843 ZSTD_CCtx* cctx;
844 ZSTD_CDict* cdict;
845 int cLevel;
846 CompressionParametersObject* cParams;
847 ZSTD_frameParameters fParams;
848
849 /* What to compress. */
850 DataSource* sources;
851 Py_ssize_t sourcesSize;
852 Py_ssize_t startOffset;
853 Py_ssize_t endOffset;
854 unsigned long long totalSourceSize;
855
856 /* Result storage. */
857 DestBuffer* destBuffers;
858 Py_ssize_t destCount;
859
860 /* Error tracking. */
861 WorkerError error;
862 size_t zresult;
863 Py_ssize_t errorOffset;
864 } WorkerState;
865
866 static void compress_worker(WorkerState* state) {
867 Py_ssize_t inputOffset = state->startOffset;
868 Py_ssize_t remainingItems = state->endOffset - state->startOffset + 1;
869 Py_ssize_t currentBufferStartOffset = state->startOffset;
870 size_t zresult;
871 ZSTD_parameters zparams;
872 void* newDest;
873 size_t allocationSize;
874 size_t boundSize;
875 Py_ssize_t destOffset = 0;
876 DataSource* sources = state->sources;
877 DestBuffer* destBuffer;
878
879 assert(!state->destBuffers);
880 assert(0 == state->destCount);
881
882 if (state->cParams) {
883 ztopy_compression_parameters(state->cParams, &zparams.cParams);
884 }
885
886 zparams.fParams = state->fParams;
887
888 /*
889 * The total size of the compressed data is unknown until we actually
890 * compress data. That means we can't pre-allocate the exact size we need.
891 *
892 * There is a cost to every allocation and reallocation. So, it is in our
893 * interest to minimize the number of allocations.
894 *
895 * There is also a cost to too few allocations. If allocations are too
896 * large they may fail. If buffers are shared and all inputs become
897 * irrelevant at different lifetimes, then a reference to one segment
898 * in the buffer will keep the entire buffer alive. This leads to excessive
899 * memory usage.
900 *
901 * Our current strategy is to assume a compression ratio of 16:1 and
902 * allocate buffers of that size, rounded up to the nearest power of 2
903 * (because computers like round numbers). That ratio is greater than what
904 * most inputs achieve. This is by design: we don't want to over-allocate.
905 * But we don't want to under-allocate and lead to too many buffers either.
906 */
907
908 state->destCount = 1;
909
910 state->destBuffers = calloc(1, sizeof(DestBuffer));
911 if (NULL == state->destBuffers) {
912 state->error = WorkerError_no_memory;
913 return;
914 }
915
916 destBuffer = &state->destBuffers[state->destCount - 1];
917
918 /*
919 * Rather than track bounds and grow the segments buffer, allocate space
920 * to hold remaining items then truncate when we're done with it.
921 */
922 destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment));
923 if (NULL == destBuffer->segments) {
924 state->error = WorkerError_no_memory;
925 return;
926 }
927
928 destBuffer->segmentsSize = remainingItems;
929
930 allocationSize = roundpow2(state->totalSourceSize >> 4);
931
932 /* If the maximum size of the output is larger than that, round up. */
933 boundSize = ZSTD_compressBound(sources[inputOffset].sourceSize);
934
935 if (boundSize > allocationSize) {
936 allocationSize = roundpow2(boundSize);
937 }
938
939 destBuffer->dest = malloc(allocationSize);
940 if (NULL == destBuffer->dest) {
941 state->error = WorkerError_no_memory;
942 return;
943 }
944
945 destBuffer->destSize = allocationSize;
946
947 for (inputOffset = state->startOffset; inputOffset <= state->endOffset; inputOffset++) {
948 void* source = sources[inputOffset].sourceData;
949 size_t sourceSize = sources[inputOffset].sourceSize;
950 size_t destAvailable;
951 void* dest;
952
953 destAvailable = destBuffer->destSize - destOffset;
954 boundSize = ZSTD_compressBound(sourceSize);
955
956 /*
957 * Not enough space in current buffer to hold largest compressed output.
958 * So allocate and switch to a new output buffer.
959 */
960 if (boundSize > destAvailable) {
961 /*
962 * The downsizing of the existing buffer is optional. It should be cheap
963 * (unlike growing). So we just do it.
964 */
965 if (destAvailable) {
966 newDest = realloc(destBuffer->dest, destOffset);
967 if (NULL == newDest) {
968 state->error = WorkerError_no_memory;
969 return;
970 }
971
972 destBuffer->dest = newDest;
973 destBuffer->destSize = destOffset;
974 }
975
976 /* Truncate segments buffer. */
977 newDest = realloc(destBuffer->segments,
978 (inputOffset - currentBufferStartOffset + 1) * sizeof(BufferSegment));
979 if (NULL == newDest) {
980 state->error = WorkerError_no_memory;
981 return;
982 }
983
984 destBuffer->segments = newDest;
985 destBuffer->segmentsSize = inputOffset - currentBufferStartOffset;
986
987 /* Grow space for new struct. */
988 /* TODO consider over-allocating so we don't do this every time. */
989 newDest = realloc(state->destBuffers, (state->destCount + 1) * sizeof(DestBuffer));
990 if (NULL == newDest) {
991 state->error = WorkerError_no_memory;
992 return;
993 }
994
995 state->destBuffers = newDest;
996 state->destCount++;
997
998 destBuffer = &state->destBuffers[state->destCount - 1];
999
1000 /* Don't take any chances with non-NULL pointers. */
1001 memset(destBuffer, 0, sizeof(DestBuffer));
1002
1003 /**
1004 * We could dynamically update allocation size based on work done so far.
1005 * For now, keep it simple.
1006 */
1007 allocationSize = roundpow2(state->totalSourceSize >> 4);
1008
1009 if (boundSize > allocationSize) {
1010 allocationSize = roundpow2(boundSize);
1011 }
1012
1013 destBuffer->dest = malloc(allocationSize);
1014 if (NULL == destBuffer->dest) {
1015 state->error = WorkerError_no_memory;
1016 return;
1017 }
1018
1019 destBuffer->destSize = allocationSize;
1020 destAvailable = allocationSize;
1021 destOffset = 0;
1022
1023 destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment));
1024 if (NULL == destBuffer->segments) {
1025 state->error = WorkerError_no_memory;
1026 return;
1027 }
1028
1029 destBuffer->segmentsSize = remainingItems;
1030 currentBufferStartOffset = inputOffset;
1031 }
1032
1033 dest = (char*)destBuffer->dest + destOffset;
1034
1035 if (state->cdict) {
1036 zresult = ZSTD_compress_usingCDict(state->cctx, dest, destAvailable,
1037 source, sourceSize, state->cdict);
1038 }
1039 else {
1040 if (!state->cParams) {
1041 zparams.cParams = ZSTD_getCParams(state->cLevel, sourceSize, 0);
1042 }
1043
1044 zresult = ZSTD_compress_advanced(state->cctx, dest, destAvailable,
1045 source, sourceSize, NULL, 0, zparams);
1046 }
1047
1048 if (ZSTD_isError(zresult)) {
1049 state->error = WorkerError_zstd;
1050 state->zresult = zresult;
1051 state->errorOffset = inputOffset;
1052 break;
1053 }
1054
1055 destBuffer->segments[inputOffset - currentBufferStartOffset].offset = destOffset;
1056 destBuffer->segments[inputOffset - currentBufferStartOffset].length = zresult;
1057
1058 destOffset += zresult;
1059 remainingItems--;
1060 }
1061
1062 if (destBuffer->destSize > destOffset) {
1063 newDest = realloc(destBuffer->dest, destOffset);
1064 if (NULL == newDest) {
1065 state->error = WorkerError_no_memory;
1066 return;
1067 }
1068
1069 destBuffer->dest = newDest;
1070 destBuffer->destSize = destOffset;
1071 }
1072 }
1073
1074 ZstdBufferWithSegmentsCollection* compress_from_datasources(ZstdCompressor* compressor,
1075 DataSources* sources, unsigned int threadCount) {
1076 ZSTD_parameters zparams;
1077 unsigned long long bytesPerWorker;
1078 POOL_ctx* pool = NULL;
1079 WorkerState* workerStates = NULL;
1080 Py_ssize_t i;
1081 unsigned long long workerBytes = 0;
1082 Py_ssize_t workerStartOffset = 0;
1083 size_t currentThread = 0;
1084 int errored = 0;
1085 Py_ssize_t segmentsCount = 0;
1086 Py_ssize_t segmentIndex;
1087 PyObject* segmentsArg = NULL;
1088 ZstdBufferWithSegments* buffer;
1089 ZstdBufferWithSegmentsCollection* result = NULL;
1090
1091 assert(sources->sourcesSize > 0);
1092 assert(sources->totalSourceSize > 0);
1093 assert(threadCount >= 1);
1094
1095 /* More threads than inputs makes no sense. */
1096 threadCount = sources->sourcesSize < threadCount ? (unsigned int)sources->sourcesSize
1097 : threadCount;
1098
1099 /* TODO lower thread count when input size is too small and threads would add
1100 overhead. */
1101
1102 /*
1103 * When dictionaries are used, parameters are derived from the size of the
1104 * first element.
1105 *
1106 * TODO come up with a better mechanism.
1107 */
1108 memset(&zparams, 0, sizeof(zparams));
1109 if (compressor->cparams) {
1110 ztopy_compression_parameters(compressor->cparams, &zparams.cParams);
1111 }
1112 else {
1113 zparams.cParams = ZSTD_getCParams(compressor->compressionLevel,
1114 sources->sources[0].sourceSize,
1115 compressor->dict ? compressor->dict->dictSize : 0);
1116 }
1117
1118 zparams.fParams = compressor->fparams;
1119
1120 if (0 != populate_cdict(compressor, &zparams)) {
1121 return NULL;
1122 }
1123
1124 workerStates = PyMem_Malloc(threadCount * sizeof(WorkerState));
1125 if (NULL == workerStates) {
1126 PyErr_NoMemory();
1127 goto finally;
1128 }
1129
1130 memset(workerStates, 0, threadCount * sizeof(WorkerState));
1131
1132 if (threadCount > 1) {
1133 pool = POOL_create(threadCount, 1);
1134 if (NULL == pool) {
1135 PyErr_SetString(ZstdError, "could not initialize zstd thread pool");
1136 goto finally;
1137 }
1138 }
1139
1140 bytesPerWorker = sources->totalSourceSize / threadCount;
1141
1142 for (i = 0; i < threadCount; i++) {
1143 workerStates[i].cctx = ZSTD_createCCtx();
1144 if (!workerStates[i].cctx) {
1145 PyErr_NoMemory();
1146 goto finally;
1147 }
1148
1149 workerStates[i].cdict = compressor->cdict;
1150 workerStates[i].cLevel = compressor->compressionLevel;
1151 workerStates[i].cParams = compressor->cparams;
1152 workerStates[i].fParams = compressor->fparams;
1153
1154 workerStates[i].sources = sources->sources;
1155 workerStates[i].sourcesSize = sources->sourcesSize;
1156 }
1157
1158 Py_BEGIN_ALLOW_THREADS
1159 for (i = 0; i < sources->sourcesSize; i++) {
1160 workerBytes += sources->sources[i].sourceSize;
1161
1162 /*
1163 * The last worker/thread needs to handle all remaining work. Don't
1164 * trigger it prematurely. Defer to the block outside of the loop
1165 * to run the last worker/thread. But do still process this loop
1166 * so workerBytes is correct.
1167 */
1168 if (currentThread == threadCount - 1) {
1169 continue;
1170 }
1171
1172 if (workerBytes >= bytesPerWorker) {
1173 assert(currentThread < threadCount);
1174 workerStates[currentThread].totalSourceSize = workerBytes;
1175 workerStates[currentThread].startOffset = workerStartOffset;
1176 workerStates[currentThread].endOffset = i;
1177
1178 if (threadCount > 1) {
1179 POOL_add(pool, (POOL_function)compress_worker, &workerStates[currentThread]);
1180 }
1181 else {
1182 compress_worker(&workerStates[currentThread]);
1183 }
1184
1185 currentThread++;
1186 workerStartOffset = i + 1;
1187 workerBytes = 0;
1188 }
1189 }
1190
1191 if (workerBytes) {
1192 assert(currentThread < threadCount);
1193 workerStates[currentThread].totalSourceSize = workerBytes;
1194 workerStates[currentThread].startOffset = workerStartOffset;
1195 workerStates[currentThread].endOffset = sources->sourcesSize - 1;
1196
1197 if (threadCount > 1) {
1198 POOL_add(pool, (POOL_function)compress_worker, &workerStates[currentThread]);
1199 }
1200 else {
1201 compress_worker(&workerStates[currentThread]);
1202 }
1203 }
1204
1205 if (threadCount > 1) {
1206 POOL_free(pool);
1207 pool = NULL;
1208 }
1209
1210 Py_END_ALLOW_THREADS
1211
1212 for (i = 0; i < threadCount; i++) {
1213 switch (workerStates[i].error) {
1214 case WorkerError_no_memory:
1215 PyErr_NoMemory();
1216 errored = 1;
1217 break;
1218
1219 case WorkerError_zstd:
1220 PyErr_Format(ZstdError, "error compressing item %zd: %s",
1221 workerStates[i].errorOffset, ZSTD_getErrorName(workerStates[i].zresult));
1222 errored = 1;
1223 break;
1224 default:
1225 ;
1226 }
1227
1228 if (errored) {
1229 break;
1230 }
1231
1232 }
1233
1234 if (errored) {
1235 goto finally;
1236 }
1237
1238 segmentsCount = 0;
1239 for (i = 0; i < threadCount; i++) {
1240 WorkerState* state = &workerStates[i];
1241 segmentsCount += state->destCount;
1242 }
1243
1244 segmentsArg = PyTuple_New(segmentsCount);
1245 if (NULL == segmentsArg) {
1246 goto finally;
1247 }
1248
1249 segmentIndex = 0;
1250
1251 for (i = 0; i < threadCount; i++) {
1252 Py_ssize_t j;
1253 WorkerState* state = &workerStates[i];
1254
1255 for (j = 0; j < state->destCount; j++) {
1256 DestBuffer* destBuffer = &state->destBuffers[j];
1257 buffer = BufferWithSegments_FromMemory(destBuffer->dest, destBuffer->destSize,
1258 destBuffer->segments, destBuffer->segmentsSize);
1259
1260 if (NULL == buffer) {
1261 goto finally;
1262 }
1263
1264 /* Tell instance to use free() instead of PyMem_Free(). */
1265 buffer->useFree = 1;
1266
1267 /*
1268 * BufferWithSegments_FromMemory takes ownership of the backing memory.
1269 * Unset it here so it doesn't get freed below.
1270 */
1271 destBuffer->dest = NULL;
1272 destBuffer->segments = NULL;
1273
1274 PyTuple_SET_ITEM(segmentsArg, segmentIndex++, (PyObject*)buffer);
1275 }
1276 }
1277
1278 result = (ZstdBufferWithSegmentsCollection*)PyObject_CallObject(
1279 (PyObject*)&ZstdBufferWithSegmentsCollectionType, segmentsArg);
1280
1281 finally:
1282 Py_CLEAR(segmentsArg);
1283
1284 if (pool) {
1285 POOL_free(pool);
1286 }
1287
1288 if (workerStates) {
1289 Py_ssize_t j;
1290
1291 for (i = 0; i < threadCount; i++) {
1292 WorkerState state = workerStates[i];
1293
1294 if (state.cctx) {
1295 ZSTD_freeCCtx(state.cctx);
1296 }
1297
1298 /* malloc() is used in worker thread. */
1299
1300 for (j = 0; j < state.destCount; j++) {
1301 if (state.destBuffers) {
1302 free(state.destBuffers[j].dest);
1303 free(state.destBuffers[j].segments);
1304 }
1305 }
1306
1307
1308 free(state.destBuffers);
1309 }
1310
1311 PyMem_Free(workerStates);
1312 }
1313
1314 return result;
1315 }
1316
1317 PyDoc_STRVAR(ZstdCompressor_multi_compress_to_buffer__doc__,
1318 "Compress multiple pieces of data as a single operation\n"
1319 "\n"
1320 "Receives a ``BufferWithSegmentsCollection``, a ``BufferWithSegments``, or\n"
1321 "a list of bytes like objects holding data to compress.\n"
1322 "\n"
1323 "Returns a ``BufferWithSegmentsCollection`` holding compressed data.\n"
1324 "\n"
1325 "This function is optimized to perform multiple compression operations as\n"
1326 "as possible with as little overhead as possbile.\n"
1327 );
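/*
 * Example Python-level usage (a minimal sketch, assuming the returned
 * collection supports len() and item access for reading results):
 *
 *   import zstandard as zstd
 *
 *   cctx = zstd.ZstdCompressor(level=3)
 *   # threads=-1 requests one worker per logical CPU.
 *   result = cctx.multi_compress_to_buffer([b'chunk 0' * 64, b'chunk 1' * 64],
 *                                          threads=-1)
 *   # Each entry in the returned collection is one compressed zstd frame.
 */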
1328
1329 static ZstdBufferWithSegmentsCollection* ZstdCompressor_multi_compress_to_buffer(ZstdCompressor* self, PyObject* args, PyObject* kwargs) {
1330 static char* kwlist[] = {
1331 "data",
1332 "threads",
1333 NULL
1334 };
1335
1336 PyObject* data;
1337 int threads = 0;
1338 Py_buffer* dataBuffers = NULL;
1339 DataSources sources;
1340 Py_ssize_t i;
1341 Py_ssize_t sourceCount = 0;
1342 ZstdBufferWithSegmentsCollection* result = NULL;
1343
1344 if (self->mtcctx) {
1345 PyErr_SetString(ZstdError,
1346 "function cannot be called on ZstdCompressor configured for multi-threaded compression");
1347 return NULL;
1348 }
1349
1350 memset(&sources, 0, sizeof(sources));
1351
1352 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|i:multi_compress_to_buffer", kwlist,
1353 &data, &threads)) {
1354 return NULL;
1355 }
1356
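/* Negative thread counts request one worker per logical CPU; 0 and 1 both
   mean single-threaded operation. */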
1357 if (threads < 0) {
1358 threads = cpu_count();
1359 }
1360
1361 if (threads < 2) {
1362 threads = 1;
1363 }
1364
1365 if (PyObject_TypeCheck(data, &ZstdBufferWithSegmentsType)) {
1366 ZstdBufferWithSegments* buffer = (ZstdBufferWithSegments*)data;
1367
1368 sources.sources = PyMem_Malloc(buffer->segmentCount * sizeof(DataSource));
1369 if (NULL == sources.sources) {
1370 PyErr_NoMemory();
1371 goto finally;
1372 }
1373
1374 for (i = 0; i < buffer->segmentCount; i++) {
1375 sources.sources[i].sourceData = (char*)buffer->data + buffer->segments[i].offset;
1376 sources.sources[i].sourceSize = buffer->segments[i].length;
1377 sources.totalSourceSize += buffer->segments[i].length;
1378 }
1379
1380 sources.sourcesSize = buffer->segmentCount;
1381 }
1382 else if (PyObject_TypeCheck(data, &ZstdBufferWithSegmentsCollectionType)) {
1383 Py_ssize_t j;
1384 Py_ssize_t offset = 0;
1385 ZstdBufferWithSegments* buffer;
1386 ZstdBufferWithSegmentsCollection* collection = (ZstdBufferWithSegmentsCollection*)data;
1387
1388 sourceCount = BufferWithSegmentsCollection_length(collection);
1389
1390 sources.sources = PyMem_Malloc(sourceCount * sizeof(DataSource));
1391 if (NULL == sources.sources) {
1392 PyErr_NoMemory();
1393 goto finally;
1394 }
1395
1396 for (i = 0; i < collection->bufferCount; i++) {
1397 buffer = collection->buffers[i];
1398
1399 for (j = 0; j < buffer->segmentCount; j++) {
1400 sources.sources[offset].sourceData = (char*)buffer->data + buffer->segments[j].offset;
1401 sources.sources[offset].sourceSize = buffer->segments[j].length;
1402 sources.totalSourceSize += buffer->segments[j].length;
1403
1404 offset++;
1405 }
1406 }
1407
1408 sources.sourcesSize = sourceCount;
1409 }
1410 else if (PyList_Check(data)) {
1411 sourceCount = PyList_GET_SIZE(data);
1412
1413 sources.sources = PyMem_Malloc(sourceCount * sizeof(DataSource));
1414 if (NULL == sources.sources) {
1415 PyErr_NoMemory();
1416 goto finally;
1417 }
1418
1419 /*
1420 * It isn't clear whether the address referred to by Py_buffer.buf
1421 * is still valid after PyBuffer_Release. So we hold a reference to all
1422 * Py_buffer instances for the duration of the operation.
1423 */
1424 dataBuffers = PyMem_Malloc(sourceCount * sizeof(Py_buffer));
1425 if (NULL == dataBuffers) {
1426 PyErr_NoMemory();
1427 goto finally;
1428 }
1429
1430 memset(dataBuffers, 0, sourceCount * sizeof(Py_buffer));
1431
1432 for (i = 0; i < sourceCount; i++) {
1433 if (0 != PyObject_GetBuffer(PyList_GET_ITEM(data, i),
1434 &dataBuffers[i], PyBUF_CONTIG_RO)) {
1435 PyErr_Clear();
1436 PyErr_Format(PyExc_TypeError, "item %zd not a bytes like object", i);
1437 goto finally;
1438 }
1439
1440 sources.sources[i].sourceData = dataBuffers[i].buf;
1441 sources.sources[i].sourceSize = dataBuffers[i].len;
1442 sources.totalSourceSize += dataBuffers[i].len;
1443 }
1444
1445 sources.sourcesSize = sourceCount;
1446 }
1447 else {
1448 PyErr_SetString(PyExc_TypeError, "argument must be list of BufferWithSegments");
1449 goto finally;
1450 }
1451
1452 if (0 == sources.sourcesSize) {
1453 PyErr_SetString(PyExc_ValueError, "no source elements found");
1454 goto finally;
1455 }
1456
1457 if (0 == sources.totalSourceSize) {
1458 PyErr_SetString(PyExc_ValueError, "source elements are empty");
1459 goto finally;
1460 }
1461
1462 result = compress_from_datasources(self, &sources, threads);
1463
1464 finally:
1465 PyMem_Free(sources.sources);
1466
1467 if (dataBuffers) {
1468 for (i = 0; i < sourceCount; i++) {
1469 PyBuffer_Release(&dataBuffers[i]);
1470 }
1471
1472 PyMem_Free(dataBuffers);
1473 }
723 1474
724 1475 return result;
725 1476 }
@@ -735,6 +1486,8 b' static PyMethodDef ZstdCompressor_method'
735 1486 METH_VARARGS | METH_KEYWORDS, ZstdCompressor_read_from__doc__ },
736 1487 { "write_to", (PyCFunction)ZstdCompressor_write_to,
737 1488 METH_VARARGS | METH_KEYWORDS, ZstdCompressor_write_to___doc__ },
1489 { "multi_compress_to_buffer", (PyCFunction)ZstdCompressor_multi_compress_to_buffer,
1490 METH_VARARGS | METH_KEYWORDS, ZstdCompressor_multi_compress_to_buffer__doc__ },
738 1491 { NULL, NULL }
739 1492 };
740 1493
@@ -27,11 +27,6 b' static void ZstdCompressorIterator_deall'
27 27 self->buffer = NULL;
28 28 }
29 29
30 if (self->cstream) {
31 ZSTD_freeCStream(self->cstream);
32 self->cstream = NULL;
33 }
34
35 30 if (self->output.dst) {
36 31 PyMem_Free(self->output.dst);
37 32 self->output.dst = NULL;
@@ -63,7 +58,14 b' feedcompressor:'
63 58 /* If we have data left in the input, consume it. */
64 59 if (self->input.pos < self->input.size) {
65 60 Py_BEGIN_ALLOW_THREADS
66 zresult = ZSTD_compressStream(self->cstream, &self->output, &self->input);
61 if (self->compressor->mtcctx) {
62 zresult = ZSTDMT_compressStream(self->compressor->mtcctx,
63 &self->output, &self->input);
64 }
65 else {
66 zresult = ZSTD_compressStream(self->compressor->cstream, &self->output,
67 &self->input);
68 }
67 69 Py_END_ALLOW_THREADS
68 70
69 71 /* Release the Python object holding the input buffer. */
@@ -128,7 +130,12 b' feedcompressor:'
128 130
129 131 /* EOF */
130 132 if (0 == readSize) {
131 zresult = ZSTD_endStream(self->cstream, &self->output);
133 if (self->compressor->mtcctx) {
134 zresult = ZSTDMT_endStream(self->compressor->mtcctx, &self->output);
135 }
136 else {
137 zresult = ZSTD_endStream(self->compressor->cstream, &self->output);
138 }
132 139 if (ZSTD_isError(zresult)) {
133 140 PyErr_Format(ZstdError, "error ending compression stream: %s",
134 141 ZSTD_getErrorName(zresult));
@@ -152,7 +159,13 b' feedcompressor:'
152 159 self->input.pos = 0;
153 160
154 161 Py_BEGIN_ALLOW_THREADS
155 zresult = ZSTD_compressStream(self->cstream, &self->output, &self->input);
162 if (self->compressor->mtcctx) {
163 zresult = ZSTDMT_compressStream(self->compressor->mtcctx, &self->output,
164 &self->input);
165 }
166 else {
167 zresult = ZSTD_compressStream(self->compressor->cstream, &self->output, &self->input);
168 }
156 169 Py_END_ALLOW_THREADS
157 170
158 171 /* The input buffer currently points to memory managed by Python
@@ -41,7 +41,7 b' void constants_module_init(PyObject* mod'
41 41 PyTuple_SetItem(zstdVersion, 0, PyLong_FromLong(ZSTD_VERSION_MAJOR));
42 42 PyTuple_SetItem(zstdVersion, 1, PyLong_FromLong(ZSTD_VERSION_MINOR));
43 43 PyTuple_SetItem(zstdVersion, 2, PyLong_FromLong(ZSTD_VERSION_RELEASE));
44 Py_IncRef(zstdVersion);
44 Py_INCREF(zstdVersion);
45 45 PyModule_AddObject(mod, "ZSTD_VERSION", zstdVersion);
46 46
47 47 frameHeader = PyBytes_FromStringAndSize(frame_header, sizeof(frame_header));
@@ -18,11 +18,6 b' static void ZstdDecompressionWriter_deal'
18 18 Py_XDECREF(self->decompressor);
19 19 Py_XDECREF(self->writer);
20 20
21 if (self->dstream) {
22 ZSTD_freeDStream(self->dstream);
23 self->dstream = NULL;
24 }
25
26 21 PyObject_Del(self);
27 22 }
28 23
@@ -32,8 +27,7 b' static PyObject* ZstdDecompressionWriter'
32 27 return NULL;
33 28 }
34 29
35 self->dstream = DStream_from_ZstdDecompressor(self->decompressor);
36 if (!self->dstream) {
30 if (0 != init_dstream(self->decompressor)) {
37 31 return NULL;
38 32 }
39 33
@@ -46,22 +40,17 b' static PyObject* ZstdDecompressionWriter'
46 40 static PyObject* ZstdDecompressionWriter_exit(ZstdDecompressionWriter* self, PyObject* args) {
47 41 self->entered = 0;
48 42
49 if (self->dstream) {
50 ZSTD_freeDStream(self->dstream);
51 self->dstream = NULL;
52 }
53
54 43 Py_RETURN_FALSE;
55 44 }
56 45
57 46 static PyObject* ZstdDecompressionWriter_memory_size(ZstdDecompressionWriter* self) {
58 if (!self->dstream) {
47 if (!self->decompressor->dstream) {
59 48 PyErr_SetString(ZstdError, "cannot determine size of inactive decompressor; "
60 49 "call when context manager is active");
61 50 return NULL;
62 51 }
63 52
64 return PyLong_FromSize_t(ZSTD_sizeof_DStream(self->dstream));
53 return PyLong_FromSize_t(ZSTD_sizeof_DStream(self->decompressor->dstream));
65 54 }
66 55
67 56 static PyObject* ZstdDecompressionWriter_write(ZstdDecompressionWriter* self, PyObject* args) {
@@ -86,6 +75,8 b' static PyObject* ZstdDecompressionWriter'
86 75 return NULL;
87 76 }
88 77
78 assert(self->decompressor->dstream);
79
89 80 output.dst = PyMem_Malloc(self->outSize);
90 81 if (!output.dst) {
91 82 return PyErr_NoMemory();
@@ -99,7 +90,7 b' static PyObject* ZstdDecompressionWriter'
99 90
100 91 while ((ssize_t)input.pos < sourceSize) {
101 92 Py_BEGIN_ALLOW_THREADS
102 zresult = ZSTD_decompressStream(self->dstream, &output, &input);
93 zresult = ZSTD_decompressStream(self->decompressor->dstream, &output, &input);
103 94 Py_END_ALLOW_THREADS
104 95
105 96 if (ZSTD_isError(zresult)) {
@@ -15,11 +15,6 b' PyDoc_STRVAR(DecompressionObj__doc__,'
15 15 );
16 16
17 17 static void DecompressionObj_dealloc(ZstdDecompressionObj* self) {
18 if (self->dstream) {
19 ZSTD_freeDStream(self->dstream);
20 self->dstream = NULL;
21 }
22
23 18 Py_XDECREF(self->decompressor);
24 19
25 20 PyObject_Del(self);
@@ -35,6 +30,9 b' static PyObject* DecompressionObj_decomp'
35 30 PyObject* result = NULL;
36 31 Py_ssize_t resultSize = 0;
37 32
33 /* Constructor should ensure stream is populated. */
34 assert(self->decompressor->dstream);
35
38 36 if (self->finished) {
39 37 PyErr_SetString(ZstdError, "cannot use a decompressobj multiple times");
40 38 return NULL;
@@ -64,7 +62,7 b' static PyObject* DecompressionObj_decomp'
64 62 /* Read input until exhausted. */
65 63 while (input.pos < input.size) {
66 64 Py_BEGIN_ALLOW_THREADS
67 zresult = ZSTD_decompressStream(self->dstream, &output, &input);
65 zresult = ZSTD_decompressStream(self->decompressor->dstream, &output, &input);
68 66 Py_END_ALLOW_THREADS
69 67
70 68 if (ZSTD_isError(zresult)) {
@@ -106,8 +104,7 b' static PyObject* DecompressionObj_decomp'
106 104 goto finally;
107 105
108 106 except:
109 Py_DecRef(result);
110 result = NULL;
107 Py_CLEAR(result);
111 108
112 109 finally:
113 110 PyMem_Free(output.dst);
@@ -7,19 +7,37 b''
7 7 */
8 8
9 9 #include "python-zstandard.h"
10 #include "pool.h"
10 11
11 12 extern PyObject* ZstdError;
12 13
13 ZSTD_DStream* DStream_from_ZstdDecompressor(ZstdDecompressor* decompressor) {
14 ZSTD_DStream* dstream;
14 /**
15 * Ensure the ZSTD_DStream on a ZstdDecompressor is initialized and reset.
16 *
17 * This should be called before starting a decompression operation with a
18 * ZSTD_DStream on a ZstdDecompressor.
19 */
20 int init_dstream(ZstdDecompressor* decompressor) {
15 21 void* dictData = NULL;
16 22 size_t dictSize = 0;
17 23 size_t zresult;
18 24
19 dstream = ZSTD_createDStream();
20 if (!dstream) {
25 /* Simple case: dstream already exists. Just reset it. */
26 if (decompressor->dstream) {
27 zresult = ZSTD_resetDStream(decompressor->dstream);
28 if (ZSTD_isError(zresult)) {
29 PyErr_Format(ZstdError, "could not reset DStream: %s",
30 ZSTD_getErrorName(zresult));
31 return -1;
32 }
33
34 return 0;
35 }
36
37 decompressor->dstream = ZSTD_createDStream();
38 if (!decompressor->dstream) {
21 39 PyErr_SetString(ZstdError, "could not create DStream");
22 return NULL;
40 return -1;
23 41 }
24 42
25 43 if (decompressor->dict) {
@@ -28,19 +46,23 b' ZSTD_DStream* DStream_from_ZstdDecompres'
28 46 }
29 47
30 48 if (dictData) {
31 zresult = ZSTD_initDStream_usingDict(dstream, dictData, dictSize);
49 zresult = ZSTD_initDStream_usingDict(decompressor->dstream, dictData, dictSize);
32 50 }
33 51 else {
34 zresult = ZSTD_initDStream(dstream);
52 zresult = ZSTD_initDStream(decompressor->dstream);
35 53 }
36 54
37 55 if (ZSTD_isError(zresult)) {
56 /* Don't leave a reference to an invalid object. */
57 ZSTD_freeDStream(decompressor->dstream);
58 decompressor->dstream = NULL;
59
38 60 PyErr_Format(ZstdError, "could not initialize DStream: %s",
39 61 ZSTD_getErrorName(zresult));
40 return NULL;
62 return -1;
41 63 }
42 64
43 return dstream;
65 return 0;
44 66 }
45 67
46 68 PyDoc_STRVAR(Decompressor__doc__,
@@ -93,17 +115,23 b' except:'
93 115 }
94 116
95 117 static void Decompressor_dealloc(ZstdDecompressor* self) {
96 if (self->dctx) {
97 ZSTD_freeDCtx(self->dctx);
98 }
99
100 Py_XDECREF(self->dict);
118 Py_CLEAR(self->dict);
101 119
102 120 if (self->ddict) {
103 121 ZSTD_freeDDict(self->ddict);
104 122 self->ddict = NULL;
105 123 }
106 124
125 if (self->dstream) {
126 ZSTD_freeDStream(self->dstream);
127 self->dstream = NULL;
128 }
129
130 if (self->dctx) {
131 ZSTD_freeDCtx(self->dctx);
132 self->dctx = NULL;
133 }
134
107 135 PyObject_Del(self);
108 136 }
109 137
@@ -132,7 +160,6 b' static PyObject* Decompressor_copy_strea'
132 160 PyObject* dest;
133 161 size_t inSize = ZSTD_DStreamInSize();
134 162 size_t outSize = ZSTD_DStreamOutSize();
135 ZSTD_DStream* dstream;
136 163 ZSTD_inBuffer input;
137 164 ZSTD_outBuffer output;
138 165 Py_ssize_t totalRead = 0;
@@ -164,8 +191,7 b' static PyObject* Decompressor_copy_strea'
164 191 /* Prevent free on uninitialized memory in finally. */
165 192 output.dst = NULL;
166 193
167 dstream = DStream_from_ZstdDecompressor(self);
168 if (!dstream) {
194 if (0 != init_dstream(self)) {
169 195 res = NULL;
170 196 goto finally;
171 197 }
@@ -203,7 +229,7 b' static PyObject* Decompressor_copy_strea'
203 229
204 230 while (input.pos < input.size) {
205 231 Py_BEGIN_ALLOW_THREADS
206 zresult = ZSTD_decompressStream(dstream, &output, &input);
232 zresult = ZSTD_decompressStream(self->dstream, &output, &input);
207 233 Py_END_ALLOW_THREADS
208 234
209 235 if (ZSTD_isError(zresult)) {
@@ -230,24 +256,17 b' static PyObject* Decompressor_copy_strea'
230 256
231 257 /* Source stream is exhausted. Finish up. */
232 258
233 ZSTD_freeDStream(dstream);
234 dstream = NULL;
235
236 259 totalReadPy = PyLong_FromSsize_t(totalRead);
237 260 totalWritePy = PyLong_FromSsize_t(totalWrite);
238 261 res = PyTuple_Pack(2, totalReadPy, totalWritePy);
239 Py_DecRef(totalReadPy);
240 Py_DecRef(totalWritePy);
262 Py_DECREF(totalReadPy);
263 Py_DECREF(totalWritePy);
241 264
242 265 finally:
243 266 if (output.dst) {
244 267 PyMem_Free(output.dst);
245 268 }
246 269
247 if (dstream) {
248 ZSTD_freeDStream(dstream);
249 }
250
251 270 return res;
252 271 }
253 272
@@ -352,18 +371,18 b' PyObject* Decompressor_decompress(ZstdDe'
352 371
353 372 if (ZSTD_isError(zresult)) {
354 373 PyErr_Format(ZstdError, "decompression error: %s", ZSTD_getErrorName(zresult));
355 Py_DecRef(result);
374 Py_DECREF(result);
356 375 return NULL;
357 376 }
358 377 else if (decompressedSize && zresult != decompressedSize) {
359 378 PyErr_Format(ZstdError, "decompression error: decompressed %zu bytes; expected %llu",
360 379 zresult, decompressedSize);
361 Py_DecRef(result);
380 Py_DECREF(result);
362 381 return NULL;
363 382 }
364 383 else if (zresult < destCapacity) {
365 384 if (_PyBytes_Resize(&result, zresult)) {
366 Py_DecRef(result);
385 Py_DECREF(result);
367 386 return NULL;
368 387 }
369 388 }
@@ -382,22 +401,19 b' PyDoc_STRVAR(Decompressor_decompressobj_'
382 401 );
383 402
384 403 static ZstdDecompressionObj* Decompressor_decompressobj(ZstdDecompressor* self) {
385 ZstdDecompressionObj* result = PyObject_New(ZstdDecompressionObj, &ZstdDecompressionObjType);
404 ZstdDecompressionObj* result = (ZstdDecompressionObj*)PyObject_CallObject((PyObject*)&ZstdDecompressionObjType, NULL);
386 405 if (!result) {
387 406 return NULL;
388 407 }
389 408
390 result->dstream = DStream_from_ZstdDecompressor(self);
391 if (!result->dstream) {
392 Py_DecRef((PyObject*)result);
409 if (0 != init_dstream(self)) {
410 Py_DECREF(result);
393 411 return NULL;
394 412 }
395 413
396 414 result->decompressor = self;
397 415 Py_INCREF(result->decompressor);
398 416
399 result->finished = 0;
400
401 417 return result;
402 418 }
403 419
@@ -447,18 +463,11 b' static ZstdDecompressorIterator* Decompr'
447 463 return NULL;
448 464 }
449 465
450 result = PyObject_New(ZstdDecompressorIterator, &ZstdDecompressorIteratorType);
466 result = (ZstdDecompressorIterator*)PyObject_CallObject((PyObject*)&ZstdDecompressorIteratorType, NULL);
451 467 if (!result) {
452 468 return NULL;
453 469 }
454 470
455 result->decompressor = NULL;
456 result->reader = NULL;
457 result->buffer = NULL;
458 result->dstream = NULL;
459 result->input.src = NULL;
460 result->output.dst = NULL;
461
462 471 if (PyObject_HasAttrString(reader, "read")) {
463 472 result->reader = reader;
464 473 Py_INCREF(result->reader);
@@ -475,8 +484,6 b' static ZstdDecompressorIterator* Decompr'
475 484 if (0 != PyObject_GetBuffer(reader, result->buffer, PyBUF_CONTIG_RO)) {
476 485 goto except;
477 486 }
478
479 result->bufferOffset = 0;
480 487 }
481 488 else {
482 489 PyErr_SetString(PyExc_ValueError,
@@ -491,8 +498,7 b' static ZstdDecompressorIterator* Decompr'
491 498 result->outSize = outSize;
492 499 result->skipBytes = skipBytes;
493 500
494 result->dstream = DStream_from_ZstdDecompressor(self);
495 if (!result->dstream) {
501 if (0 != init_dstream(self)) {
496 502 goto except;
497 503 }
498 504
@@ -501,16 +507,6 b' static ZstdDecompressorIterator* Decompr'
501 507 PyErr_NoMemory();
502 508 goto except;
503 509 }
504 result->input.size = 0;
505 result->input.pos = 0;
506
507 result->output.dst = NULL;
508 result->output.size = 0;
509 result->output.pos = 0;
510
511 result->readCount = 0;
512 result->finishedInput = 0;
513 result->finishedOutput = 0;
514 510
515 511 goto finally;
516 512
@@ -563,7 +559,7 b' static ZstdDecompressionWriter* Decompre'
563 559 return NULL;
564 560 }
565 561
566 result = PyObject_New(ZstdDecompressionWriter, &ZstdDecompressionWriterType);
562 result = (ZstdDecompressionWriter*)PyObject_CallObject((PyObject*)&ZstdDecompressionWriterType, NULL);
567 563 if (!result) {
568 564 return NULL;
569 565 }
@@ -576,9 +572,6 b' static ZstdDecompressionWriter* Decompre'
576 572
577 573 result->outSize = outSize;
578 574
579 result->entered = 0;
580 result->dstream = NULL;
581
582 575 return result;
583 576 }
584 577
@@ -776,6 +769,746 b' finally:'
776 769 return result;
777 770 }
778 771
772 typedef struct {
773 void* sourceData;
774 size_t sourceSize;
775 unsigned long long destSize;
776 } FramePointer;
777
778 typedef struct {
779 FramePointer* frames;
780 Py_ssize_t framesSize;
781 unsigned long long compressedSize;
782 } FrameSources;
783
784 typedef struct {
785 void* dest;
786 Py_ssize_t destSize;
787 BufferSegment* segments;
788 Py_ssize_t segmentsSize;
789 } DestBuffer;
790
791 typedef enum {
792 WorkerError_none = 0,
793 WorkerError_zstd = 1,
794 WorkerError_memory = 2,
795 WorkerError_sizeMismatch = 3,
796 WorkerError_unknownSize = 4,
797 } WorkerError;
798
799 typedef struct {
800 /* Source records and length */
801 FramePointer* framePointers;
802 /* Which records to process. */
803 Py_ssize_t startOffset;
804 Py_ssize_t endOffset;
805 unsigned long long totalSourceSize;
806
807 /* Decompression state and settings. */
808 ZSTD_DCtx* dctx;
809 ZSTD_DDict* ddict;
810 int requireOutputSizes;
811
812 /* Output storage. */
813 DestBuffer* destBuffers;
814 Py_ssize_t destCount;
815
816 /* Item that error occurred on. */
817 Py_ssize_t errorOffset;
818 /* If an error occurred. */
819 WorkerError error;
820 /* result from zstd decompression operation */
821 size_t zresult;
822 } WorkerState;
823
824 static void decompress_worker(WorkerState* state) {
825 size_t allocationSize;
826 DestBuffer* destBuffer;
827 Py_ssize_t frameIndex;
828 Py_ssize_t localOffset = 0;
829 Py_ssize_t currentBufferStartIndex = state->startOffset;
830 Py_ssize_t remainingItems = state->endOffset - state->startOffset + 1;
831 void* tmpBuf;
832 Py_ssize_t destOffset = 0;
833 FramePointer* framePointers = state->framePointers;
834 size_t zresult;
835 unsigned long long totalOutputSize = 0;
836
837 assert(NULL == state->destBuffers);
838 assert(0 == state->destCount);
839 assert(state->endOffset - state->startOffset >= 0);
840
841 /*
842 * We need to allocate a buffer to hold decompressed data. How we do this
843 * depends on what we know about the output. The following scenarios are
844 * possible:
845 *
846 * 1. All structs defining frames declare the output size.
847 * 2. The decompressed size is embedded within the zstd frame.
848 * 3. The decompressed size is not stored anywhere.
849 *
850 * For now, we only support #1 and #2.
851 */
852
853 /* Resolve output segments. */
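/* Scenario #1 uses the caller-supplied size as-is; for scenario #2 the
   content size is read from the zstd frame header below (a result of 0 is
   treated as unknown). */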
854 for (frameIndex = state->startOffset; frameIndex <= state->endOffset; frameIndex++) {
855 FramePointer* fp = &framePointers[frameIndex];
856
857 if (0 == fp->destSize) {
858 fp->destSize = ZSTD_getDecompressedSize(fp->sourceData, fp->sourceSize);
859 if (0 == fp->destSize && state->requireOutputSizes) {
860 state->error = WorkerError_unknownSize;
861 state->errorOffset = frameIndex;
862 return;
863 }
864 }
865
866 totalOutputSize += fp->destSize;
867 }
868
869 state->destBuffers = calloc(1, sizeof(DestBuffer));
870 if (NULL == state->destBuffers) {
871 state->error = WorkerError_memory;
872 return;
873 }
874
875 state->destCount = 1;
876
877 destBuffer = &state->destBuffers[state->destCount - 1];
878
879 assert(framePointers[state->startOffset].destSize > 0); /* For now. */
880
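/* Heuristic: size the first output buffer to this worker's total compressed
   input, rounded up to a power of two; grow it below if the first frame
   declares a larger decompressed size. */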
881 allocationSize = roundpow2(state->totalSourceSize);
882
883 if (framePointers[state->startOffset].destSize > allocationSize) {
884 allocationSize = roundpow2(framePointers[state->startOffset].destSize);
885 }
886
887 destBuffer->dest = malloc(allocationSize);
888 if (NULL == destBuffer->dest) {
889 state->error = WorkerError_memory;
890 return;
891 }
892
893 destBuffer->destSize = allocationSize;
894
895 destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment));
896 if (NULL == destBuffer->segments) {
897 /* Caller will free destBuffer->dest as part of cleanup. */
898 state->error = WorkerError_memory;
899 return;
900 }
901
902 destBuffer->segmentsSize = remainingItems;
903
904 for (frameIndex = state->startOffset; frameIndex <= state->endOffset; frameIndex++) {
905 const void* source = framePointers[frameIndex].sourceData;
906 const size_t sourceSize = framePointers[frameIndex].sourceSize;
907 void* dest;
908 const size_t decompressedSize = framePointers[frameIndex].destSize;
909 size_t destAvailable = destBuffer->destSize - destOffset;
910
911 assert(decompressedSize > 0); /* For now. */
912
913 /*
914 * Not enough space in the current buffer. Finish the current one, then
915 * allocate and switch to a new one.
916 */
917 if (decompressedSize > destAvailable) {
918 /*
919 * Shrinking the destination buffer is optional. But it should be cheap,
920 * so we just do it.
921 */
922 if (destAvailable) {
923 tmpBuf = realloc(destBuffer->dest, destOffset);
924 if (NULL == tmpBuf) {
925 state->error = WorkerError_memory;
926 return;
927 }
928
929 destBuffer->dest = tmpBuf;
930 destBuffer->destSize = destOffset;
931 }
932
933 /* Truncate segments buffer. */
934 tmpBuf = realloc(destBuffer->segments,
935 (frameIndex - currentBufferStartIndex) * sizeof(BufferSegment));
936 if (NULL == tmpBuf) {
937 state->error = WorkerError_memory;
938 return;
939 }
940
941 destBuffer->segments = tmpBuf;
942 destBuffer->segmentsSize = frameIndex - currentBufferStartIndex;
943
944 /* Grow space for new DestBuffer. */
945 tmpBuf = realloc(state->destBuffers, (state->destCount + 1) * sizeof(DestBuffer));
946 if (NULL == tmpBuf) {
947 state->error = WorkerError_memory;
948 return;
949 }
950
951 state->destBuffers = tmpBuf;
952 state->destCount++;
953
954 destBuffer = &state->destBuffers[state->destCount - 1];
955
956 /* Don't take any chances with non-NULL pointers. */
957 memset(destBuffer, 0, sizeof(DestBuffer));
958
959 allocationSize = roundpow2(state->totalSourceSize);
960
961 if (decompressedSize > allocationSize) {
962 allocationSize = roundpow2(decompressedSize);
963 }
964
965 destBuffer->dest = malloc(allocationSize);
966 if (NULL == destBuffer->dest) {
967 state->error = WorkerError_memory;
968 return;
969 }
970
971 destBuffer->destSize = allocationSize;
972 destAvailable = allocationSize;
973 destOffset = 0;
974 localOffset = 0;
975
976 destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment));
977 if (NULL == destBuffer->segments) {
978 state->error = WorkerError_memory;
979 return;
980 }
981
982 destBuffer->segmentsSize = remainingItems;
983 currentBufferStartIndex = frameIndex;
984 }
985
986 dest = (char*)destBuffer->dest + destOffset;
987
988 if (state->ddict) {
989 zresult = ZSTD_decompress_usingDDict(state->dctx, dest, decompressedSize,
990 source, sourceSize, state->ddict);
991 }
992 else {
993 zresult = ZSTD_decompressDCtx(state->dctx, dest, decompressedSize,
994 source, sourceSize);
995 }
996
997 if (ZSTD_isError(zresult)) {
998 state->error = WorkerError_zstd;
999 state->zresult = zresult;
1000 state->errorOffset = frameIndex;
1001 return;
1002 }
1003 else if (zresult != decompressedSize) {
1004 state->error = WorkerError_sizeMismatch;
1005 state->zresult = zresult;
1006 state->errorOffset = frameIndex;
1007 return;
1008 }
1009
1010 destBuffer->segments[localOffset].offset = destOffset;
1011 destBuffer->segments[localOffset].length = decompressedSize;
1012 destOffset += zresult;
1013 localOffset++;
1014 remainingItems--;
1015 }
1016
1017 if (destBuffer->destSize > destOffset) {
1018 tmpBuf = realloc(destBuffer->dest, destOffset);
1019 if (NULL == tmpBuf) {
1020 state->error = WorkerError_memory;
1021 return;
1022 }
1023
1024 destBuffer->dest = tmpBuf;
1025 destBuffer->destSize = destOffset;
1026 }
1027 }
1028
1029 ZstdBufferWithSegmentsCollection* decompress_from_framesources(ZstdDecompressor* decompressor, FrameSources* frames,
1030 unsigned int threadCount) {
1031 void* dictData = NULL;
1032 size_t dictSize = 0;
1033 Py_ssize_t i = 0;
1034 int errored = 0;
1035 Py_ssize_t segmentsCount;
1036 ZstdBufferWithSegments* bws = NULL;
1037 PyObject* resultArg = NULL;
1038 Py_ssize_t resultIndex;
1039 ZstdBufferWithSegmentsCollection* result = NULL;
1040 FramePointer* framePointers = frames->frames;
1041 unsigned long long workerBytes = 0;
1042 int currentThread = 0;
1043 Py_ssize_t workerStartOffset = 0;
1044 POOL_ctx* pool = NULL;
1045 WorkerState* workerStates = NULL;
1046 unsigned long long bytesPerWorker;
1047
1048 /* Caller should normalize 0 and negative values to 1 or larger. */
1049 assert(threadCount >= 1);
1050
1051 /* More threads than inputs makes no sense under any conditions. */
1052 threadCount = frames->framesSize < threadCount ? (unsigned int)frames->framesSize
1053 : threadCount;
1054
1055 /* TODO lower thread count if input size is too small and threads would just
1056 add overhead. */
1057
1058 if (decompressor->dict) {
1059 dictData = decompressor->dict->dictData;
1060 dictSize = decompressor->dict->dictSize;
1061 }
1062
1063 if (dictData && !decompressor->ddict) {
1064 Py_BEGIN_ALLOW_THREADS
1065 decompressor->ddict = ZSTD_createDDict_byReference(dictData, dictSize);
1066 Py_END_ALLOW_THREADS
1067
1068 if (!decompressor->ddict) {
1069 PyErr_SetString(ZstdError, "could not create decompression dict");
1070 return NULL;
1071 }
1072 }
1073
1074 /* If threadCount==1, we don't start a thread pool. But we do leverage the
1075 same API for dispatching work. */
1076 workerStates = PyMem_Malloc(threadCount * sizeof(WorkerState));
1077 if (NULL == workerStates) {
1078 PyErr_NoMemory();
1079 goto finally;
1080 }
1081
1082 memset(workerStates, 0, threadCount * sizeof(WorkerState));
1083
1084 if (threadCount > 1) {
1085 pool = POOL_create(threadCount, 1);
1086 if (NULL == pool) {
1087 PyErr_SetString(ZstdError, "could not initialize zstd thread pool");
1088 goto finally;
1089 }
1090 }
1091
1092 bytesPerWorker = frames->compressedSize / threadCount;
1093
1094 for (i = 0; i < threadCount; i++) {
1095 workerStates[i].dctx = ZSTD_createDCtx();
1096 if (NULL == workerStates[i].dctx) {
1097 PyErr_NoMemory();
1098 goto finally;
1099 }
1100
1101 ZSTD_copyDCtx(workerStates[i].dctx, decompressor->dctx);
1102
1103 workerStates[i].ddict = decompressor->ddict;
1104 workerStates[i].framePointers = framePointers;
1105 workerStates[i].requireOutputSizes = 1;
1106 }
1107
1108 Py_BEGIN_ALLOW_THREADS
1109 /* There are many ways to split work among workers.
1110
1111 For now, we take a simple approach of splitting work so each worker
1112 gets roughly the same number of input bytes. This will result in more
1113 starvation than running N>threadCount jobs. But it avoids complications
1114 around state tracking, which could involve extra locking.
1115 */
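/* For example, 5 inputs of 5, 1, 9, 2 and 3 MB split across 3 workers gives
   bytesPerWorker = 20/3 = 6 MB: worker 0 takes items 0-1 (6 MB), worker 1
   takes item 2 (9 MB), and the final worker picks up the remaining items
   3-4 (5 MB) in the block after the loop. */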
1116 for (i = 0; i < frames->framesSize; i++) {
1117 workerBytes += frames->frames[i].sourceSize;
1118
1119 /*
1120 * The last worker/thread needs to handle all remaining work. Don't
1121 * trigger it prematurely. Defer to the block outside of the loop.
1122 * (But still process this loop so workerBytes is correct.)
1123 */
1124 if (currentThread == threadCount - 1) {
1125 continue;
1126 }
1127
1128 if (workerBytes >= bytesPerWorker) {
1129 workerStates[currentThread].startOffset = workerStartOffset;
1130 workerStates[currentThread].endOffset = i;
1131 workerStates[currentThread].totalSourceSize = workerBytes;
1132
1133 if (threadCount > 1) {
1134 POOL_add(pool, (POOL_function)decompress_worker, &workerStates[currentThread]);
1135 }
1136 else {
1137 decompress_worker(&workerStates[currentThread]);
1138 }
1139 currentThread++;
1140 workerStartOffset = i + 1;
1141 workerBytes = 0;
1142 }
1143 }
1144
1145 if (workerBytes) {
1146 workerStates[currentThread].startOffset = workerStartOffset;
1147 workerStates[currentThread].endOffset = frames->framesSize - 1;
1148 workerStates[currentThread].totalSourceSize = workerBytes;
1149
1150 if (threadCount > 1) {
1151 POOL_add(pool, (POOL_function)decompress_worker, &workerStates[currentThread]);
1152 }
1153 else {
1154 decompress_worker(&workerStates[currentThread]);
1155 }
1156 }
1157
1158 if (threadCount > 1) {
1159 POOL_free(pool);
1160 pool = NULL;
1161 }
1162 Py_END_ALLOW_THREADS
1163
1164 for (i = 0; i < threadCount; i++) {
1165 switch (workerStates[i].error) {
1166 case WorkerError_none:
1167 break;
1168
1169 case WorkerError_zstd:
1170 PyErr_Format(ZstdError, "error decompressing item %zd: %s",
1171 workerStates[i].errorOffset, ZSTD_getErrorName(workerStates[i].zresult));
1172 errored = 1;
1173 break;
1174
1175 case WorkerError_memory:
1176 PyErr_NoMemory();
1177 errored = 1;
1178 break;
1179
1180 case WorkerError_sizeMismatch:
1181 PyErr_Format(ZstdError, "error decompressing item %zd: decompressed %zu bytes; expected %llu",
1182 workerStates[i].errorOffset, workerStates[i].zresult,
1183 framePointers[workerStates[i].errorOffset].destSize);
1184 errored = 1;
1185 break;
1186
1187 case WorkerError_unknownSize:
1188 PyErr_Format(PyExc_ValueError, "could not determine decompressed size of item %zd",
1189 workerStates[i].errorOffset);
1190 errored = 1;
1191 break;
1192
1193 default:
1194 PyErr_Format(ZstdError, "unhandled error type: %d; this is a bug",
1195 workerStates[i].error);
1196 errored = 1;
1197 break;
1198 }
1199
1200 if (errored) {
1201 break;
1202 }
1203 }
1204
1205 if (errored) {
1206 goto finally;
1207 }
1208
1209 segmentsCount = 0;
1210 for (i = 0; i < threadCount; i++) {
1211 segmentsCount += workerStates[i].destCount;
1212 }
1213
1214 resultArg = PyTuple_New(segmentsCount);
1215 if (NULL == resultArg) {
1216 goto finally;
1217 }
1218
1219 resultIndex = 0;
1220
1221 for (i = 0; i < threadCount; i++) {
1222 Py_ssize_t bufferIndex;
1223 WorkerState* state = &workerStates[i];
1224
1225 for (bufferIndex = 0; bufferIndex < state->destCount; bufferIndex++) {
1226 DestBuffer* destBuffer = &state->destBuffers[bufferIndex];
1227
1228 bws = BufferWithSegments_FromMemory(destBuffer->dest, destBuffer->destSize,
1229 destBuffer->segments, destBuffer->segmentsSize);
1230 if (NULL == bws) {
1231 goto finally;
1232 }
1233
1234 /*
1235 * Memory for the buffer and segments was allocated using malloc() in the
1236 * worker and is transferred to the BufferWithSegments instance. So tell
1237 * the instance to use free() and NULL the reference in the state struct
1238 * so it isn't freed below.
1239 */
1240 bws->useFree = 1;
1241 destBuffer->dest = NULL;
1242 destBuffer->segments = NULL;
1243
1244 PyTuple_SET_ITEM(resultArg, resultIndex++, (PyObject*)bws);
1245 }
1246 }
1247
1248 result = (ZstdBufferWithSegmentsCollection*)PyObject_CallObject(
1249 (PyObject*)&ZstdBufferWithSegmentsCollectionType, resultArg);
1250
1251 finally:
1252 Py_CLEAR(resultArg);
1253
1254 if (workerStates) {
1255 for (i = 0; i < threadCount; i++) {
1256 Py_ssize_t bufferIndex;
1257 WorkerState* state = &workerStates[i];
1258
1259 if (state->dctx) {
1260 ZSTD_freeDCtx(state->dctx);
1261 }
1262
1263 for (bufferIndex = 0; bufferIndex < state->destCount; bufferIndex++) {
1264 if (state->destBuffers) {
1265 /*
1266 * Will be NULL if the memory was transferred to a BufferWithSegments.
1267 * Otherwise it is left over after an error occurred.
1268 */
1269 free(state->destBuffers[bufferIndex].dest);
1270 free(state->destBuffers[bufferIndex].segments);
1271 }
1272 }
1273
1274 free(state->destBuffers);
1275 }
1276
1277 PyMem_Free(workerStates);
1278 }
1279
1280 POOL_free(pool);
1281
1282 return result;
1283 }
1284
1285 PyDoc_STRVAR(Decompressor_multi_decompress_to_buffer__doc__,
1286 "Decompress multiple frames to output buffers\n"
1287 "\n"
1288 "Receives a ``BufferWithSegments``, a ``BufferWithSegmentsCollection`` or a\n"
1289 "list of bytes-like objects. Each item in the passed collection should be a\n"
1290 "compressed zstd frame.\n"
1291 "\n"
1292 "Unless ``decompressed_sizes`` is specified, the content size *must* be\n"
1293 "written into the zstd frame header. If ``decompressed_sizes`` is specified,\n"
1294 "it is an object conforming to the buffer protocol that represents an array\n"
1295 "of 64-bit unsigned integers in the machine's native format. Specifying\n"
1296 "``decompressed_sizes`` avoids a pre-scan of each frame to determine its\n"
1297 "output size.\n"
1298 "\n"
1299 "Returns a ``BufferWithSegmentsCollection`` containing the decompressed\n"
1300 "data. All decompressed data is allocated in a single memory buffer. The\n"
1301 "``BufferWithSegments`` instance tracks which objects are at which offsets\n"
1302 "and their respective lengths.\n"
1303 "\n"
1304 "The ``threads`` argument controls how many threads to use for operations.\n"
1305 "Negative values will use the same number of threads as logical CPUs on the\n"
1306 "machine.\n"
1307 );
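/*
 * Example Python-level usage (a minimal sketch; ``frame0``/``frame1`` and the
 * original lengths are placeholders, and ``decompressed_sizes`` is optional,
 * shown only to illustrate the native uint64 array layout described above):
 *
 *   import struct
 *   import zstandard as zstd
 *
 *   dctx = zstd.ZstdDecompressor()
 *   sizes = struct.pack('=QQ', len(original0), len(original1))
 *   result = dctx.multi_decompress_to_buffer([frame0, frame1],
 *                                            decompressed_sizes=sizes,
 *                                            threads=-1)
 */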
1308
1309 static ZstdBufferWithSegmentsCollection* Decompressor_multi_decompress_to_buffer(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
1310 static char* kwlist[] = {
1311 "frames",
1312 "decompressed_sizes",
1313 "threads",
1314 NULL
1315 };
1316
1317 PyObject* frames;
1318 Py_buffer frameSizes;
1319 int threads = 0;
1320 Py_ssize_t frameCount;
1321 Py_buffer* frameBuffers = NULL;
1322 FramePointer* framePointers = NULL;
1323 unsigned long long* frameSizesP = NULL;
1324 unsigned long long totalInputSize = 0;
1325 FrameSources frameSources;
1326 ZstdBufferWithSegmentsCollection* result = NULL;
1327 Py_ssize_t i;
1328
1329 memset(&frameSizes, 0, sizeof(frameSizes));
1330
1331 #if PY_MAJOR_VERSION >= 3
1332 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|y*i:multi_decompress_to_buffer",
1333 #else
1334 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|s*i:multi_decompress_to_buffer",
1335 #endif
1336 kwlist, &frames, &frameSizes, &threads)) {
1337 return NULL;
1338 }
1339
1340 if (frameSizes.buf) {
1341 if (!PyBuffer_IsContiguous(&frameSizes, 'C') || frameSizes.ndim > 1) {
1342 PyErr_SetString(PyExc_ValueError, "decompressed_sizes buffer should be contiguous and have a single dimension");
1343 goto finally;
1344 }
1345
1346 frameSizesP = (unsigned long long*)frameSizes.buf;
1347 }
1348
1349 if (threads < 0) {
1350 threads = cpu_count();
1351 }
1352
1353 if (threads < 2) {
1354 threads = 1;
1355 }
1356
1357 if (PyObject_TypeCheck(frames, &ZstdBufferWithSegmentsType)) {
1358 ZstdBufferWithSegments* buffer = (ZstdBufferWithSegments*)frames;
1359 frameCount = buffer->segmentCount;
1360
1361 if (frameSizes.buf && frameSizes.len != frameCount * (Py_ssize_t)sizeof(unsigned long long)) {
1362 PyErr_Format(PyExc_ValueError, "decompressed_sizes size mismatch; expected %zd, got %zd",
1363 frameCount * sizeof(unsigned long long), frameSizes.len);
1364 goto finally;
1365 }
1366
1367 framePointers = PyMem_Malloc(frameCount * sizeof(FramePointer));
1368 if (!framePointers) {
1369 PyErr_NoMemory();
1370 goto finally;
1371 }
1372
1373 for (i = 0; i < frameCount; i++) {
1374 void* sourceData;
1375 unsigned long long sourceSize;
1376 unsigned long long decompressedSize = 0;
1377
1378 if (buffer->segments[i].offset + buffer->segments[i].length > buffer->dataSize) {
1379 PyErr_Format(PyExc_ValueError, "item %zd has offset outside memory area", i);
1380 goto finally;
1381 }
1382
1383 sourceData = (char*)buffer->data + buffer->segments[i].offset;
1384 sourceSize = buffer->segments[i].length;
1385 totalInputSize += sourceSize;
1386
1387 if (frameSizesP) {
1388 decompressedSize = frameSizesP[i];
1389 }
1390
1391 framePointers[i].sourceData = sourceData;
1392 framePointers[i].sourceSize = sourceSize;
1393 framePointers[i].destSize = decompressedSize;
1394 }
1395 }
1396 else if (PyObject_TypeCheck(frames, &ZstdBufferWithSegmentsCollectionType)) {
1397 Py_ssize_t offset = 0;
1398 ZstdBufferWithSegments* buffer;
1399 ZstdBufferWithSegmentsCollection* collection = (ZstdBufferWithSegmentsCollection*)frames;
1400
1401 frameCount = BufferWithSegmentsCollection_length(collection);
1402
1403 if (frameSizes.buf && frameSizes.len != frameCount) {
1404 PyErr_Format(PyExc_ValueError,
1405 "decompressed_sizes size mismatch; expected %zd; got %zd",
1406 frameCount * sizeof(unsigned long long), frameSizes.len);
1407 goto finally;
1408 }
1409
1410 framePointers = PyMem_Malloc(frameCount * sizeof(FramePointer));
1411 if (NULL == framePointers) {
1412 PyErr_NoMemory();
1413 goto finally;
1414 }
1415
1416 /* Iterate the data structure directly because it is faster. */
1417 for (i = 0; i < collection->bufferCount; i++) {
1418 Py_ssize_t segmentIndex;
1419 buffer = collection->buffers[i];
1420
1421 for (segmentIndex = 0; segmentIndex < buffer->segmentCount; segmentIndex++) {
1422 if (buffer->segments[segmentIndex].offset + buffer->segments[segmentIndex].length > buffer->dataSize) {
1423 PyErr_Format(PyExc_ValueError, "item %zd has offset outside memory area",
1424 offset);
1425 goto finally;
1426 }
1427
1428 totalInputSize += buffer->segments[segmentIndex].length;
1429
1430 framePointers[offset].sourceData = (char*)buffer->data + buffer->segments[segmentIndex].offset;
1431 framePointers[offset].sourceSize = buffer->segments[segmentIndex].length;
1432 framePointers[offset].destSize = frameSizesP ? frameSizesP[offset] : 0;
1433
1434 offset++;
1435 }
1436 }
1437 }
1438 else if (PyList_Check(frames)) {
1439 frameCount = PyList_GET_SIZE(frames);
1440
1441 if (frameSizes.buf && frameSizes.len != frameCount * (Py_ssize_t)sizeof(unsigned long long)) {
1442 PyErr_Format(PyExc_ValueError, "decompressed_sizes size mismatch; expected %zd, got %zd",
1443 frameCount * sizeof(unsigned long long), frameSizes.len);
1444 goto finally;
1445 }
1446
1447 framePointers = PyMem_Malloc(frameCount * sizeof(FramePointer));
1448 if (!framePointers) {
1449 PyErr_NoMemory();
1450 goto finally;
1451 }
1452
1453 /*
1454 * It is not clear whether Py_buffer.buf is still valid after
1455 * PyBuffer_Release. So, we hold a reference to all Py_buffer instances
1456 * for the duration of the operation.
1457 */
1458 frameBuffers = PyMem_Malloc(frameCount * sizeof(Py_buffer));
1459 if (NULL == frameBuffers) {
1460 PyErr_NoMemory();
1461 goto finally;
1462 }
1463
1464 memset(frameBuffers, 0, frameCount * sizeof(Py_buffer));
1465
1466 /* Do a pass to assemble info about our input buffers and output sizes. */
1467 for (i = 0; i < frameCount; i++) {
1468 if (0 != PyObject_GetBuffer(PyList_GET_ITEM(frames, i),
1469 &frameBuffers[i], PyBUF_CONTIG_RO)) {
1470 PyErr_Clear();
1471 PyErr_Format(PyExc_TypeError, "item %zd not a bytes like object", i);
1472 goto finally;
1473 }
1474
1475 totalInputSize += frameBuffers[i].len;
1476
1477 framePointers[i].sourceData = frameBuffers[i].buf;
1478 framePointers[i].sourceSize = frameBuffers[i].len;
1479 framePointers[i].destSize = frameSizesP ? frameSizesP[i] : 0;
1480 }
1481 }
1482 else {
1483 PyErr_SetString(PyExc_TypeError, "argument must be list or BufferWithSegments");
1484 goto finally;
1485 }
1486
1487 /* We now have an array with info about our inputs and outputs. Feed it into
1488 our generic decompression function. */
1489 frameSources.frames = framePointers;
1490 frameSources.framesSize = frameCount;
1491 frameSources.compressedSize = totalInputSize;
1492
1493 result = decompress_from_framesources(self, &frameSources, threads);
1494
1495 finally:
1496 if (frameSizes.buf) {
1497 PyBuffer_Release(&frameSizes);
1498 }
1499 PyMem_Free(framePointers);
1500
1501 if (frameBuffers) {
1502 for (i = 0; i < frameCount; i++) {
1503 PyBuffer_Release(&frameBuffers[i]);
1504 }
1505
1506 PyMem_Free(frameBuffers);
1507 }
1508
1509 return result;
1510 }
1511
779 1512 static PyMethodDef Decompressor_methods[] = {
780 1513 { "copy_stream", (PyCFunction)Decompressor_copy_stream, METH_VARARGS | METH_KEYWORDS,
781 1514 Decompressor_copy_stream__doc__ },
@@ -789,6 +1522,8 b' static PyMethodDef Decompressor_methods['
789 1522 Decompressor_write_to__doc__ },
790 1523 { "decompress_content_dict_chain", (PyCFunction)Decompressor_decompress_content_dict_chain,
791 1524 METH_VARARGS | METH_KEYWORDS, Decompressor_decompress_content_dict_chain__doc__ },
1525 { "multi_decompress_to_buffer", (PyCFunction)Decompressor_multi_decompress_to_buffer,
1526 METH_VARARGS | METH_KEYWORDS, Decompressor_multi_decompress_to_buffer__doc__ },
792 1527 { NULL, NULL }
793 1528 };
794 1529
@@ -26,11 +26,6 b' static void ZstdDecompressorIterator_dea'
26 26 self->buffer = NULL;
27 27 }
28 28
29 if (self->dstream) {
30 ZSTD_freeDStream(self->dstream);
31 self->dstream = NULL;
32 }
33
34 29 if (self->input.src) {
35 30 PyMem_Free((void*)self->input.src);
36 31 self->input.src = NULL;
@@ -50,6 +45,8 b' static DecompressorIteratorResult read_d'
50 45 DecompressorIteratorResult result;
51 46 size_t oldInputPos = self->input.pos;
52 47
48 assert(self->decompressor->dstream);
49
53 50 result.chunk = NULL;
54 51
55 52 chunk = PyBytes_FromStringAndSize(NULL, self->outSize);
@@ -63,7 +60,7 b' static DecompressorIteratorResult read_d'
63 60 self->output.pos = 0;
64 61
65 62 Py_BEGIN_ALLOW_THREADS
66 zresult = ZSTD_decompressStream(self->dstream, &self->output, &self->input);
63 zresult = ZSTD_decompressStream(self->decompressor->dstream, &self->output, &self->input);
67 64 Py_END_ALLOW_THREADS
68 65
69 66 /* We're done with the pointer. Nullify to prevent anyone from getting a
@@ -160,7 +157,7 b' read_from_source:'
160 157 PyErr_SetString(PyExc_ValueError,
161 158 "skip_bytes larger than first input chunk; "
162 159 "this scenario is currently unsupported");
163 Py_DecRef(readResult);
160 Py_XDECREF(readResult);
164 161 return NULL;
165 162 }
166 163
@@ -179,7 +176,7 b' read_from_source:'
179 176 else if (!self->readCount) {
180 177 self->finishedInput = 1;
181 178 self->finishedOutput = 1;
182 Py_DecRef(readResult);
179 Py_XDECREF(readResult);
183 180 PyErr_SetString(PyExc_StopIteration, "empty input");
184 181 return NULL;
185 182 }
@@ -188,7 +185,7 b' read_from_source:'
188 185 }
189 186
190 187 /* We've copied the data into memory we manage. Discard the Python object. */
191 Py_DecRef(readResult);
188 Py_XDECREF(readResult);
192 189 }
193 190
194 191 result = read_decompressor_iterator(self);
@@ -127,6 +127,6 b' void frameparams_module_init(PyObject* m'
127 127 return;
128 128 }
129 129
130 Py_IncRef((PyObject*)&FrameParametersType);
130 Py_INCREF(&FrameParametersType);
131 131 PyModule_AddObject(mod, "FrameParameters", (PyObject*)&FrameParametersType);
132 132 }
@@ -15,14 +15,20 b''
15 15 #include "mem.h"
16 16 #include "zstd.h"
17 17 #include "zdict.h"
18 #include "zstdmt_compress.h"
18 19
19 #define PYTHON_ZSTANDARD_VERSION "0.7.0"
20 #define PYTHON_ZSTANDARD_VERSION "0.8.0"
20 21
21 22 typedef enum {
22 23 compressorobj_flush_finish,
23 24 compressorobj_flush_block,
24 25 } CompressorObj_Flush;
25 26
27 /*
28 Represents a CompressionParameters type.
29
30 This type is basically a wrapper around ZSTD_compressionParameters.
31 */
26 32 typedef struct {
27 33 PyObject_HEAD
28 34 unsigned windowLog;
@@ -36,6 +42,11 b' typedef struct {'
36 42
37 43 extern PyTypeObject CompressionParametersType;
38 44
45 /*
46 Represents a FrameParameters type.
47
48 This type is basically a wrapper around ZSTD_frameParams.
49 */
39 50 typedef struct {
40 51 PyObject_HEAD
41 52 unsigned long long frameContentSize;
@@ -46,34 +57,55 b' typedef struct {'
46 57
47 58 extern PyTypeObject FrameParametersType;
48 59
49 typedef struct {
50 PyObject_HEAD
51 unsigned selectivityLevel;
52 int compressionLevel;
53 unsigned notificationLevel;
54 unsigned dictID;
55 } DictParametersObject;
60 /*
61 Represents a ZstdCompressionDict type.
56 62
57 extern PyTypeObject DictParametersType;
58
63 Instances hold data used for a zstd compression dictionary.
64 */
59 65 typedef struct {
60 66 PyObject_HEAD
61 67
68 /* Pointer to dictionary data. Owned by self. */
62 69 void* dictData;
70 /* Size of dictionary data. */
63 71 size_t dictSize;
72 /* k parameter for cover dictionaries. Only populated by train_cover_dict(). */
73 unsigned k;
74 /* d parameter for cover dictionaries. Only populated by train_cover_dict(). */
75 unsigned d;
64 76 } ZstdCompressionDict;
65 77
66 78 extern PyTypeObject ZstdCompressionDictType;
67 79
80 /*
81 Represents a ZstdCompressor type.
82 */
68 83 typedef struct {
69 84 PyObject_HEAD
70 85
86 /* Configured compression level. Should be always set. */
71 87 int compressionLevel;
88 /* Number of threads to use for operations. */
89 unsigned int threads;
90 /* Pointer to compression dictionary to use. NULL if not using dictionary
91 compression. */
72 92 ZstdCompressionDict* dict;
93 /* Compression context to use. Populated during object construction. NULL
94 if using multi-threaded compression. */
73 95 ZSTD_CCtx* cctx;
96 /* Multi-threaded compression context to use. Populated during object
97 construction. NULL if not using multi-threaded compression. */
98 ZSTDMT_CCtx* mtcctx;
99 /* Digested compression dictionary. NULL initially. Populated on first use. */
74 100 ZSTD_CDict* cdict;
101 /* Low-level compression parameter control. NULL unless passed to
102 constructor. Takes precedence over `compressionLevel` if defined. */
75 103 CompressionParametersObject* cparams;
104 /* Controls zstd frame options. */
76 105 ZSTD_frameParameters fparams;
106 /* Holds state for streaming compression. Shared across all invocations.
107 Populated on first use. */
108 ZSTD_CStream* cstream;
77 109 } ZstdCompressor;
78 110
79 111 extern PyTypeObject ZstdCompressorType;
@@ -82,7 +114,6 b' typedef struct {'
82 114 PyObject_HEAD
83 115
84 116 ZstdCompressor* compressor;
85 ZSTD_CStream* cstream;
86 117 ZSTD_outBuffer output;
87 118 int finished;
88 119 } ZstdCompressionObj;
@@ -96,7 +127,6 b' typedef struct {'
96 127 PyObject* writer;
97 128 Py_ssize_t sourceSize;
98 129 size_t outSize;
99 ZSTD_CStream* cstream;
100 130 int entered;
101 131 } ZstdCompressionWriter;
102 132
@@ -113,7 +143,6 b' typedef struct {'
113 143 size_t inSize;
114 144 size_t outSize;
115 145
116 ZSTD_CStream* cstream;
117 146 ZSTD_inBuffer input;
118 147 ZSTD_outBuffer output;
119 148 int finishedOutput;
@@ -130,6 +159,7 b' typedef struct {'
130 159
131 160 ZstdCompressionDict* dict;
132 161 ZSTD_DDict* ddict;
162 ZSTD_DStream* dstream;
133 163 } ZstdDecompressor;
134 164
135 165 extern PyTypeObject ZstdDecompressorType;
@@ -138,7 +168,6 b' typedef struct {'
138 168 PyObject_HEAD
139 169
140 170 ZstdDecompressor* decompressor;
141 ZSTD_DStream* dstream;
142 171 int finished;
143 172 } ZstdDecompressionObj;
144 173
@@ -150,7 +179,6 b' typedef struct {'
150 179 ZstdDecompressor* decompressor;
151 180 PyObject* writer;
152 181 size_t outSize;
153 ZSTD_DStream* dstream;
154 182 int entered;
155 183 } ZstdDecompressionWriter;
156 184
@@ -166,7 +194,6 b' typedef struct {'
166 194 size_t inSize;
167 195 size_t outSize;
168 196 size_t skipBytes;
169 ZSTD_DStream* dstream;
170 197 ZSTD_inBuffer input;
171 198 ZSTD_outBuffer output;
172 199 Py_ssize_t readCount;
@@ -181,10 +208,78 b' typedef struct {'
181 208 PyObject* chunk;
182 209 } DecompressorIteratorResult;
183 210
211 typedef struct {
212 unsigned long long offset;
213 unsigned long long length;
214 } BufferSegment;
215
216 typedef struct {
217 PyObject_HEAD
218
219 PyObject* parent;
220 BufferSegment* segments;
221 Py_ssize_t segmentCount;
222 } ZstdBufferSegments;
223
224 extern PyTypeObject ZstdBufferSegmentsType;
225
226 typedef struct {
227 PyObject_HEAD
228
229 PyObject* parent;
230 void* data;
231 Py_ssize_t dataSize;
232 unsigned long long offset;
233 } ZstdBufferSegment;
234
235 extern PyTypeObject ZstdBufferSegmentType;
236
237 typedef struct {
238 PyObject_HEAD
239
240 Py_buffer parent;
241 void* data;
242 unsigned long long dataSize;
243 BufferSegment* segments;
244 Py_ssize_t segmentCount;
245 int useFree;
246 } ZstdBufferWithSegments;
247
248 extern PyTypeObject ZstdBufferWithSegmentsType;
249
250 /**
251 * An ordered collection of BufferWithSegments exposed as a squashed collection.
252 *
253 * This type provides a virtual view spanning multiple BufferWithSegments
254 * instances. It allows multiple instances to be "chained" together and
255 * exposed as a single collection. e.g. if there are 2 buffers holding
256 * 10 segments each, then o[14] will access the 5th segment in the 2nd buffer.
257 */
258 typedef struct {
259 PyObject_HEAD
260
261 /* An array of buffers that should be exposed through this instance. */
262 ZstdBufferWithSegments** buffers;
263 /* Number of elements in buffers array. */
264 Py_ssize_t bufferCount;
265 /* Array of cumulative element counts. The 0th entry is the number of
266 elements in the 0th buffer. The 1st entry is the sum of elements in the
267 0th and 1st buffers. */
268 Py_ssize_t* firstElements;
269 } ZstdBufferWithSegmentsCollection;
270
271 extern PyTypeObject ZstdBufferWithSegmentsCollectionType;
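
To make the chained view concrete, here is a small hypothetical sketch from the Python side, using only constructs that appear in the tests later in this diff: segments are described by packed native-endian (offset, length) uint64 pairs, and a collection spans multiple buffers transparently.

    import struct
    import zstd

    # One backing buffer holding two segments of 12 and 18 bytes.
    data = b'foo' * 4 + b'bar' * 6
    offsets = struct.pack('=QQQQ', 0, 12, 12, 18)
    b1 = zstd.BufferWithSegments(data, offsets)

    # A second buffer with a single 6-byte segment.
    b2 = zstd.BufferWithSegments(b'bazbaz', struct.pack('=QQ', 0, 6))

    # The collection chains both buffers: index 2 resolves to the first
    # (and only) segment of the second buffer.
    coll = zstd.BufferWithSegmentsCollection(b1, b2)
    assert len(coll) == 3
    assert coll[2].tobytes() == b'bazbaz'
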
272
184 273 void ztopy_compression_parameters(CompressionParametersObject* params, ZSTD_compressionParameters* zparams);
185 274 CompressionParametersObject* get_compression_parameters(PyObject* self, PyObject* args);
186 275 FrameParametersObject* get_frame_parameters(PyObject* self, PyObject* args);
187 276 PyObject* estimate_compression_context_size(PyObject* self, PyObject* args);
188 ZSTD_CStream* CStream_from_ZstdCompressor(ZstdCompressor* compressor, Py_ssize_t sourceSize);
189 ZSTD_DStream* DStream_from_ZstdDecompressor(ZstdDecompressor* decompressor);
277 int init_cstream(ZstdCompressor* compressor, unsigned long long sourceSize);
278 int init_mtcstream(ZstdCompressor* compressor, Py_ssize_t sourceSize);
279 int init_dstream(ZstdDecompressor* decompressor);
190 280 ZstdCompressionDict* train_dictionary(PyObject* self, PyObject* args, PyObject* kwargs);
281 ZstdCompressionDict* train_cover_dictionary(PyObject* self, PyObject* args, PyObject* kwargs);
282 ZstdBufferWithSegments* BufferWithSegments_FromMemory(void* data, unsigned long long dataSize, BufferSegment* segments, Py_ssize_t segmentsSize);
283 Py_ssize_t BufferWithSegmentsCollection_length(ZstdBufferWithSegmentsCollection*);
284 int cpu_count(void);
285 size_t roundpow2(size_t);
@@ -27,6 +27,7 b" SOURCES = ['zstd/%s' % p for p in ("
27 27 'compress/fse_compress.c',
28 28 'compress/huf_compress.c',
29 29 'compress/zstd_compress.c',
30 'compress/zstdmt_compress.c',
30 31 'decompress/huf_decompress.c',
31 32 'decompress/zstd_decompress.c',
32 33 'dictBuilder/cover.c',
@@ -34,9 +35,10 b" SOURCES = ['zstd/%s' % p for p in ("
34 35 'dictBuilder/zdict.c',
35 36 )]
36 37
38 # Headers whose preprocessed output will be fed into cdef().
37 39 HEADERS = [os.path.join(HERE, 'zstd', *p) for p in (
38 40 ('zstd.h',),
39 ('common', 'pool.h'),
41 ('compress', 'zstdmt_compress.h'),
40 42 ('dictBuilder', 'zdict.h'),
41 43 )]
42 44
@@ -76,11 +78,30 b' else:'
76 78 raise Exception('unsupported compiler type: %s' % compiler.compiler_type)
77 79
78 80 def preprocess(path):
79 # zstd.h includes <stddef.h>, which is also included by cffi's boilerplate.
80 # This can lead to duplicate declarations. So we strip this include from the
81 # preprocessor invocation.
82 81 with open(path, 'rb') as fh:
83 lines = [l for l in fh if not l.startswith(b'#include <stddef.h>')]
82 lines = []
83 for l in fh:
84 # zstd.h includes <stddef.h>, which is also included by cffi's
85 # boilerplate. This can lead to duplicate declarations. So we strip
86 # this include from the preprocessor invocation.
87 #
88 # The same thing happens when including zstd.h, so give it the same
89 # treatment.
90 #
91 # We define ZSTD_STATIC_LINKING_ONLY, which is redundant with the inline
92 # #define in zstdmt_compress.h and results in a compiler warning. So drop
93 # the inline #define.
94 if l.startswith((b'#include <stddef.h>',
95 b'#include "zstd.h"',
96 b'#define ZSTD_STATIC_LINKING_ONLY')):
97 continue
98
99 # ZSTDLIB_API may not be defined if we dropped zstd.h. It isn't
100 # important so just filter it out.
101 if l.startswith(b'ZSTDLIB_API'):
102 l = l[len(b'ZSTDLIB_API '):]
103
104 lines.append(l)
84 105
85 106 fd, input_file = tempfile.mkstemp(suffix='.h')
86 107 os.write(fd, b''.join(lines))
@@ -116,25 +137,30 b' def normalize_output(output):'
116 137
117 138
118 139 ffi = cffi.FFI()
140 # *_DISABLE_DEPRECATE_WARNINGS prevents the compiler from emitting a warning
141 # when cffi uses the function. Since we statically link against zstd, even
142 # if we use the deprecated functions it shouldn't be a huge problem.
119 143 ffi.set_source('_zstd_cffi', '''
120 144 #include "mem.h"
121 145 #define ZSTD_STATIC_LINKING_ONLY
122 146 #include "zstd.h"
123 147 #define ZDICT_STATIC_LINKING_ONLY
124 #include "pool.h"
148 #define ZDICT_DISABLE_DEPRECATE_WARNINGS
125 149 #include "zdict.h"
150 #include "zstdmt_compress.h"
126 151 ''', sources=SOURCES, include_dirs=INCLUDE_DIRS)
127 152
128 153 DEFINE = re.compile(b'^\\#define ([a-zA-Z0-9_]+) ')
129 154
130 155 sources = []
131 156
157 # Feed normalized preprocessor output for headers into the cdef parser.
132 158 for header in HEADERS:
133 159 preprocessed = preprocess(header)
134 160 sources.append(normalize_output(preprocessed))
135 161
136 # Do another pass over source and find constants that were preprocessed
137 # away.
162 # #define's are effectively erased when run through the preprocessor.
163 # So perform a manual pass to re-add them to the cdef source.
138 164 with open(header, 'rb') as fh:
139 165 for line in fh:
140 166 line = line.strip()
@@ -142,13 +168,20 b' for header in HEADERS:'
142 168 if not m:
143 169 continue
144 170
171 if m.group(1) == b'ZSTD_STATIC_LINKING_ONLY':
172 continue
173
145 174 # The parser doesn't like some constants with complex values.
146 175 if m.group(1) in (b'ZSTD_LIB_VERSION', b'ZSTD_VERSION_STRING'):
147 176 continue
148 177
178 # The ... is magic syntax by the cdef parser to resolve the
179 # value at compile time.
149 180 sources.append(m.group(0) + b' ...')
150 181
151 ffi.cdef(u'\n'.join(s.decode('latin1') for s in sources))
182 cdeflines = b'\n'.join(sources).splitlines()
183 cdeflines = [l for l in cdeflines if l.strip()]
184 ffi.cdef(b'\n'.join(cdeflines).decode('latin1'))
152 185
153 186 if __name__ == '__main__':
154 187 ffi.compile()
@@ -25,10 +25,15 b' if "--legacy" in sys.argv:'
25 25 # facilitate reuse in other projects.
26 26 extensions = [setup_zstd.get_c_extension(SUPPORT_LEGACY, 'zstd')]
27 27
28 install_requires = []
29
28 30 if cffi:
29 31 import make_cffi
30 32 extensions.append(make_cffi.ffi.distutils_extension())
31 33
34 # Need change in 1.8 for ffi.from_buffer() behavior.
35 install_requires.append('cffi>=1.8')
36
32 37 version = None
33 38
34 39 with open('c-ext/python-zstandard.h', 'r') as fh:
@@ -67,4 +72,5 b' setup('
67 72 keywords='zstandard zstd compression',
68 73 ext_modules=extensions,
69 74 test_suite='tests',
75 install_requires=install_requires,
70 76 )
@@ -19,6 +19,7 b" zstd_sources = ['zstd/%s' % p for p in ("
19 19 'compress/fse_compress.c',
20 20 'compress/huf_compress.c',
21 21 'compress/zstd_compress.c',
22 'compress/zstdmt_compress.c',
22 23 'decompress/huf_decompress.c',
23 24 'decompress/zstd_decompress.c',
24 25 'dictBuilder/cover.c',
@@ -55,6 +56,7 b' zstd_includes_legacy = ['
55 56
56 57 ext_sources = [
57 58 'zstd.c',
59 'c-ext/bufferutil.c',
58 60 'c-ext/compressiondict.c',
59 61 'c-ext/compressobj.c',
60 62 'c-ext/compressor.c',
@@ -66,7 +68,6 b' ext_sources = ['
66 68 'c-ext/decompressor.c',
67 69 'c-ext/decompressoriterator.c',
68 70 'c-ext/decompressionwriter.c',
69 'c-ext/dictparams.c',
70 71 'c-ext/frameparams.c',
71 72 ]
72 73
@@ -89,8 +90,13 b' def get_c_extension(support_legacy=False'
89 90
90 91 depends = [os.path.join(root, p) for p in zstd_depends]
91 92
93 extra_args = ['-DZSTD_MULTITHREAD']
94
95 if support_legacy:
96 extra_args.append('-DZSTD_LEGACY_SUPPORT=1')
97
92 98 # TODO compile with optimizations.
93 99 return Extension(name, sources,
94 100 include_dirs=include_dirs,
95 101 depends=depends,
96 extra_compile_args=["-DZSTD_LEGACY_SUPPORT=1"] if support_legacy else [])
102 extra_compile_args=extra_args)
@@ -1,5 +1,6 b''
1 1 import inspect
2 2 import io
3 import os
3 4 import types
4 5
5 6
@@ -59,3 +60,29 b' class OpCountingBytesIO(io.BytesIO):'
59 60 def write(self, data):
60 61 self._write_count += 1
61 62 return super(OpCountingBytesIO, self).write(data)
63
64
65 _source_files = []
66
67
68 def random_input_data():
69 """Obtain the raw content of source files.
70
71 This is used for generating "random" data to feed into fuzzing, since it is
72 faster than random content generation.
73 """
74 if _source_files:
75 return _source_files
76
77 for root, dirs, files in os.walk(os.path.dirname(__file__)):
78 dirs[:] = list(sorted(dirs))
79 for f in sorted(files):
80 try:
81 with open(os.path.join(root, f), 'rb') as fh:
82 data = fh.read()
83 if data:
84 _source_files.append(data)
85 except OSError:
86 pass
87
88 return _source_files
@@ -22,6 +22,12 b' else:'
22 22 next = lambda it: it.next()
23 23
24 24
25 def multithreaded_chunk_size(level, source_size=0):
26 params = zstd.get_compression_parameters(level, source_size)
27
28 return 1 << (params.window_log + 2)
29
30
25 31 @make_cffi
26 32 class TestCompressor(unittest.TestCase):
27 33 def test_level_bounds(self):
@@ -34,6 +40,24 b' class TestCompressor(unittest.TestCase):'
34 40
35 41 @make_cffi
36 42 class TestCompressor_compress(unittest.TestCase):
43 def test_multithreaded_unsupported(self):
44 samples = []
45 for i in range(128):
46 samples.append(b'foo' * 64)
47 samples.append(b'bar' * 64)
48
49 d = zstd.train_dictionary(8192, samples)
50
51 cctx = zstd.ZstdCompressor(dict_data=d, threads=2)
52
53 with self.assertRaisesRegexp(zstd.ZstdError, 'compress\(\) cannot be used with both dictionaries and multi-threaded compression'):
54 cctx.compress(b'foo')
55
56 params = zstd.get_compression_parameters(3)
57 cctx = zstd.ZstdCompressor(compression_params=params, threads=2)
58 with self.assertRaisesRegexp(zstd.ZstdError, 'compress\(\) cannot be used with both compression parameters and multi-threaded compression'):
59 cctx.compress(b'foo')
60
37 61 def test_compress_empty(self):
38 62 cctx = zstd.ZstdCompressor(level=1)
39 63 result = cctx.compress(b'')
@@ -132,6 +156,21 b' class TestCompressor_compress(unittest.T'
132 156 for i in range(32):
133 157 cctx.compress(b'foo bar foobar foo bar foobar')
134 158
159 def test_multithreaded(self):
160 chunk_size = multithreaded_chunk_size(1)
161 source = b''.join([b'x' * chunk_size, b'y' * chunk_size])
162
163 cctx = zstd.ZstdCompressor(level=1, threads=2)
164 compressed = cctx.compress(source)
165
166 params = zstd.get_frame_parameters(compressed)
167 self.assertEqual(params.content_size, chunk_size * 2)
168 self.assertEqual(params.dict_id, 0)
169 self.assertFalse(params.has_checksum)
170
171 dctx = zstd.ZstdDecompressor()
172 self.assertEqual(dctx.decompress(compressed), source)
173
135 174
136 175 @make_cffi
137 176 class TestCompressor_compressobj(unittest.TestCase):
@@ -237,6 +276,30 b' class TestCompressor_compressobj(unittes'
237 276 header = trailing[0:3]
238 277 self.assertEqual(header, b'\x01\x00\x00')
239 278
279 def test_multithreaded(self):
280 source = io.BytesIO()
281 source.write(b'a' * 1048576)
282 source.write(b'b' * 1048576)
283 source.write(b'c' * 1048576)
284 source.seek(0)
285
286 cctx = zstd.ZstdCompressor(level=1, threads=2)
287 cobj = cctx.compressobj()
288
289 chunks = []
290 while True:
291 d = source.read(8192)
292 if not d:
293 break
294
295 chunks.append(cobj.compress(d))
296
297 chunks.append(cobj.flush())
298
299 compressed = b''.join(chunks)
300
301 self.assertEqual(len(compressed), 295)
302
240 303
241 304 @make_cffi
242 305 class TestCompressor_copy_stream(unittest.TestCase):
@@ -355,6 +418,36 b' class TestCompressor_copy_stream(unittes'
355 418 self.assertEqual(source._read_count, len(source.getvalue()) + 1)
356 419 self.assertEqual(dest._write_count, len(dest.getvalue()))
357 420
421 def test_multithreaded(self):
422 source = io.BytesIO()
423 source.write(b'a' * 1048576)
424 source.write(b'b' * 1048576)
425 source.write(b'c' * 1048576)
426 source.seek(0)
427
428 dest = io.BytesIO()
429 cctx = zstd.ZstdCompressor(threads=2)
430 r, w = cctx.copy_stream(source, dest)
431 self.assertEqual(r, 3145728)
432 self.assertEqual(w, 295)
433
434 params = zstd.get_frame_parameters(dest.getvalue())
435 self.assertEqual(params.content_size, 0)
436 self.assertEqual(params.dict_id, 0)
437 self.assertFalse(params.has_checksum)
438
439 # Writing content size and checksum works.
440 cctx = zstd.ZstdCompressor(threads=2, write_content_size=True,
441 write_checksum=True)
442 dest = io.BytesIO()
443 source.seek(0)
444 cctx.copy_stream(source, dest, size=len(source.getvalue()))
445
446 params = zstd.get_frame_parameters(dest.getvalue())
447 self.assertEqual(params.content_size, 3145728)
448 self.assertEqual(params.dict_id, 0)
449 self.assertTrue(params.has_checksum)
450
358 451
359 452 def compress(data, level):
360 453 buffer = io.BytesIO()
@@ -584,6 +677,16 b' class TestCompressor_write_to(unittest.T'
584 677 header = trailing[0:3]
585 678 self.assertEqual(header, b'\x01\x00\x00')
586 679
680 def test_multithreaded(self):
681 dest = io.BytesIO()
682 cctx = zstd.ZstdCompressor(threads=2)
683 with cctx.write_to(dest) as compressor:
684 compressor.write(b'a' * 1048576)
685 compressor.write(b'b' * 1048576)
686 compressor.write(b'c' * 1048576)
687
688 self.assertEqual(len(dest.getvalue()), 295)
689
587 690
588 691 @make_cffi
589 692 class TestCompressor_read_from(unittest.TestCase):
@@ -673,3 +776,130 b' class TestCompressor_read_from(unittest.'
673 776 self.assertEqual(len(chunk), 1)
674 777
675 778 self.assertEqual(source._read_count, len(source.getvalue()) + 1)
779
780 def test_multithreaded(self):
781 source = io.BytesIO()
782 source.write(b'a' * 1048576)
783 source.write(b'b' * 1048576)
784 source.write(b'c' * 1048576)
785 source.seek(0)
786
787 cctx = zstd.ZstdCompressor(threads=2)
788
789 compressed = b''.join(cctx.read_from(source))
790 self.assertEqual(len(compressed), 295)
791
792
793 class TestCompressor_multi_compress_to_buffer(unittest.TestCase):
794 def test_multithreaded_unsupported(self):
795 cctx = zstd.ZstdCompressor(threads=2)
796
797 with self.assertRaisesRegexp(zstd.ZstdError, 'function cannot be called on ZstdCompressor configured for multi-threaded compression'):
798 cctx.multi_compress_to_buffer([b'foo'])
799
800 def test_invalid_inputs(self):
801 cctx = zstd.ZstdCompressor()
802
803 with self.assertRaises(TypeError):
804 cctx.multi_compress_to_buffer(True)
805
806 with self.assertRaises(TypeError):
807 cctx.multi_compress_to_buffer((1, 2))
808
809 with self.assertRaisesRegexp(TypeError, 'item 0 not a bytes like object'):
810 cctx.multi_compress_to_buffer([u'foo'])
811
812 def test_empty_input(self):
813 cctx = zstd.ZstdCompressor()
814
815 with self.assertRaisesRegexp(ValueError, 'no source elements found'):
816 cctx.multi_compress_to_buffer([])
817
818 with self.assertRaisesRegexp(ValueError, 'source elements are empty'):
819 cctx.multi_compress_to_buffer([b'', b'', b''])
820
821 def test_list_input(self):
822 cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True)
823
824 original = [b'foo' * 12, b'bar' * 6]
825 frames = [cctx.compress(c) for c in original]
826 b = cctx.multi_compress_to_buffer(original)
827
828 self.assertIsInstance(b, zstd.BufferWithSegmentsCollection)
829
830 self.assertEqual(len(b), 2)
831 self.assertEqual(b.size(), 44)
832
833 self.assertEqual(b[0].tobytes(), frames[0])
834 self.assertEqual(b[1].tobytes(), frames[1])
835
836 def test_buffer_with_segments_input(self):
837 cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True)
838
839 original = [b'foo' * 4, b'bar' * 6]
840 frames = [cctx.compress(c) for c in original]
841
842 offsets = struct.pack('=QQQQ', 0, len(original[0]),
843 len(original[0]), len(original[1]))
844 segments = zstd.BufferWithSegments(b''.join(original), offsets)
845
846 result = cctx.multi_compress_to_buffer(segments)
847
848 self.assertEqual(len(result), 2)
849 self.assertEqual(result.size(), 47)
850
851 self.assertEqual(result[0].tobytes(), frames[0])
852 self.assertEqual(result[1].tobytes(), frames[1])
853
854 def test_buffer_with_segments_collection_input(self):
855 cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True)
856
857 original = [
858 b'foo1',
859 b'foo2' * 2,
860 b'foo3' * 3,
861 b'foo4' * 4,
862 b'foo5' * 5,
863 ]
864
865 frames = [cctx.compress(c) for c in original]
866
867 b = b''.join([original[0], original[1]])
868 b1 = zstd.BufferWithSegments(b, struct.pack('=QQQQ',
869 0, len(original[0]),
870 len(original[0]), len(original[1])))
871 b = b''.join([original[2], original[3], original[4]])
872 b2 = zstd.BufferWithSegments(b, struct.pack('=QQQQQQ',
873 0, len(original[2]),
874 len(original[2]), len(original[3]),
875 len(original[2]) + len(original[3]), len(original[4])))
876
877 c = zstd.BufferWithSegmentsCollection(b1, b2)
878
879 result = cctx.multi_compress_to_buffer(c)
880
881 self.assertEqual(len(result), len(frames))
882
883 for i, frame in enumerate(frames):
884 self.assertEqual(result[i].tobytes(), frame)
885
886 def test_multiple_threads(self):
887 # threads argument will cause multi-threaded ZSTD APIs to be used, which will
888 # make output different.
889 refcctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True)
890 reference = [refcctx.compress(b'x' * 64), refcctx.compress(b'y' * 64)]
891
892 cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True)
893
894 frames = []
895 frames.extend(b'x' * 64 for i in range(256))
896 frames.extend(b'y' * 64 for i in range(256))
897
898 result = cctx.multi_compress_to_buffer(frames, threads=-1)
899
900 self.assertEqual(len(result), 512)
901 for i in range(512):
902 if i < 256:
903 self.assertEqual(result[i].tobytes(), reference[0])
904 else:
905 self.assertEqual(result[i].tobytes(), reference[1])
@@ -1,16 +1,8 b''
1 import io
2
3 1 try:
4 2 import unittest2 as unittest
5 3 except ImportError:
6 4 import unittest
7 5
8 try:
9 import hypothesis
10 import hypothesis.strategies as strategies
11 except ImportError:
12 hypothesis = None
13
14 6 import zstd
15 7
16 8 from . common import (
@@ -32,7 +24,7 b' class TestCompressionParameters(unittest'
32 24 zstd.CHAINLOG_MIN,
33 25 zstd.HASHLOG_MIN,
34 26 zstd.SEARCHLOG_MIN,
35 zstd.SEARCHLENGTH_MIN,
27 zstd.SEARCHLENGTH_MIN + 1,
36 28 zstd.TARGETLENGTH_MIN,
37 29 zstd.STRATEGY_FAST)
38 30
@@ -40,7 +32,7 b' class TestCompressionParameters(unittest'
40 32 zstd.CHAINLOG_MAX,
41 33 zstd.HASHLOG_MAX,
42 34 zstd.SEARCHLOG_MAX,
43 zstd.SEARCHLENGTH_MAX,
35 zstd.SEARCHLENGTH_MAX - 1,
44 36 zstd.TARGETLENGTH_MAX,
45 37 zstd.STRATEGY_BTOPT)
46 38
@@ -60,6 +52,13 b' class TestCompressionParameters(unittest'
60 52 self.assertEqual(p.target_length, 8)
61 53 self.assertEqual(p.strategy, 1)
62 54
55 def test_estimated_compression_context_size(self):
56 p = zstd.CompressionParameters(20, 16, 17, 1, 5, 16, zstd.STRATEGY_DFAST)
57
58 # 32-bit has slightly different values from 64-bit.
59 self.assertAlmostEqual(p.estimated_compression_context_size(), 1287076,
60 delta=110)
61
63 62
64 63 @make_cffi
65 64 class TestFrameParameters(unittest.TestCase):
@@ -122,65 +121,3 b' class TestFrameParameters(unittest.TestC'
122 121 self.assertEqual(params.window_size, 262144)
123 122 self.assertEqual(params.dict_id, 15)
124 123 self.assertTrue(params.has_checksum)
125
126
127 if hypothesis:
128 s_windowlog = strategies.integers(min_value=zstd.WINDOWLOG_MIN,
129 max_value=zstd.WINDOWLOG_MAX)
130 s_chainlog = strategies.integers(min_value=zstd.CHAINLOG_MIN,
131 max_value=zstd.CHAINLOG_MAX)
132 s_hashlog = strategies.integers(min_value=zstd.HASHLOG_MIN,
133 max_value=zstd.HASHLOG_MAX)
134 s_searchlog = strategies.integers(min_value=zstd.SEARCHLOG_MIN,
135 max_value=zstd.SEARCHLOG_MAX)
136 s_searchlength = strategies.integers(min_value=zstd.SEARCHLENGTH_MIN,
137 max_value=zstd.SEARCHLENGTH_MAX)
138 s_targetlength = strategies.integers(min_value=zstd.TARGETLENGTH_MIN,
139 max_value=zstd.TARGETLENGTH_MAX)
140 s_strategy = strategies.sampled_from((zstd.STRATEGY_FAST,
141 zstd.STRATEGY_DFAST,
142 zstd.STRATEGY_GREEDY,
143 zstd.STRATEGY_LAZY,
144 zstd.STRATEGY_LAZY2,
145 zstd.STRATEGY_BTLAZY2,
146 zstd.STRATEGY_BTOPT))
147
148
149 @make_cffi
150 class TestCompressionParametersHypothesis(unittest.TestCase):
151 @hypothesis.given(s_windowlog, s_chainlog, s_hashlog, s_searchlog,
152 s_searchlength, s_targetlength, s_strategy)
153 def test_valid_init(self, windowlog, chainlog, hashlog, searchlog,
154 searchlength, targetlength, strategy):
155 p = zstd.CompressionParameters(windowlog, chainlog, hashlog,
156 searchlog, searchlength,
157 targetlength, strategy)
158
159 # Verify we can instantiate a compressor with the supplied values.
160 # ZSTD_checkCParams moves the goal posts on us from what's advertised
161 # in the constants. So move along with them.
162 if searchlength == zstd.SEARCHLENGTH_MIN and strategy in (zstd.STRATEGY_FAST, zstd.STRATEGY_GREEDY):
163 searchlength += 1
164 p = zstd.CompressionParameters(windowlog, chainlog, hashlog,
165 searchlog, searchlength,
166 targetlength, strategy)
167 elif searchlength == zstd.SEARCHLENGTH_MAX and strategy != zstd.STRATEGY_FAST:
168 searchlength -= 1
169 p = zstd.CompressionParameters(windowlog, chainlog, hashlog,
170 searchlog, searchlength,
171 targetlength, strategy)
172
173 cctx = zstd.ZstdCompressor(compression_params=p)
174 with cctx.write_to(io.BytesIO()):
175 pass
176
177 @hypothesis.given(s_windowlog, s_chainlog, s_hashlog, s_searchlog,
178 s_searchlength, s_targetlength, s_strategy)
179 def test_estimate_compression_context_size(self, windowlog, chainlog,
180 hashlog, searchlog,
181 searchlength, targetlength,
182 strategy):
183 p = zstd.CompressionParameters(windowlog, chainlog, hashlog,
184 searchlog, searchlength,
185 targetlength, strategy)
186 size = zstd.estimate_compression_context_size(p)
@@ -49,7 +49,7 b' class TestDecompressor_decompress(unitte'
49 49 compressed = cctx.compress(b'foobar')
50 50
51 51 dctx = zstd.ZstdDecompressor()
52 decompressed = dctx.decompress(compressed)
52 decompressed = dctx.decompress(compressed)
53 53 self.assertEqual(decompressed, b'foobar')
54 54
55 55 def test_max_output_size(self):
@@ -293,7 +293,6 b' class TestDecompressor_write_to(unittest'
293 293 c = s.pack(c)
294 294 decompressor.write(c)
295 295
296
297 296 self.assertEqual(dest.getvalue(), b'foobarfoobar')
298 297 self.assertEqual(dest._write_count, len(dest.getvalue()))
299 298
@@ -575,3 +574,168 b' class TestDecompressor_content_dict_chai'
575 574 dctx = zstd.ZstdDecompressor()
576 575 decompressed = dctx.decompress_content_dict_chain(chain)
577 576 self.assertEqual(decompressed, expected)
577
578
579 # TODO enable for CFFI
580 class TestDecompressor_multi_decompress_to_buffer(unittest.TestCase):
581 def test_invalid_inputs(self):
582 dctx = zstd.ZstdDecompressor()
583
584 with self.assertRaises(TypeError):
585 dctx.multi_decompress_to_buffer(True)
586
587 with self.assertRaises(TypeError):
588 dctx.multi_decompress_to_buffer((1, 2))
589
590 with self.assertRaisesRegexp(TypeError, 'item 0 not a bytes like object'):
591 dctx.multi_decompress_to_buffer([u'foo'])
592
593 with self.assertRaisesRegexp(ValueError, 'could not determine decompressed size of item 0'):
594 dctx.multi_decompress_to_buffer([b'foobarbaz'])
595
596 def test_list_input(self):
597 cctx = zstd.ZstdCompressor(write_content_size=True)
598
599 original = [b'foo' * 4, b'bar' * 6]
600 frames = [cctx.compress(d) for d in original]
601
602 dctx = zstd.ZstdDecompressor()
603 result = dctx.multi_decompress_to_buffer(frames)
604
605 self.assertEqual(len(result), len(frames))
606 self.assertEqual(result.size(), sum(map(len, original)))
607
608 for i, data in enumerate(original):
609 self.assertEqual(result[i].tobytes(), data)
610
611 self.assertEqual(result[0].offset, 0)
612 self.assertEqual(len(result[0]), 12)
613 self.assertEqual(result[1].offset, 12)
614 self.assertEqual(len(result[1]), 18)
615
616 def test_list_input_frame_sizes(self):
617 cctx = zstd.ZstdCompressor(write_content_size=False)
618
619 original = [b'foo' * 4, b'bar' * 6, b'baz' * 8]
620 frames = [cctx.compress(d) for d in original]
621 sizes = struct.pack('=' + 'Q' * len(original), *map(len, original))
622
623 dctx = zstd.ZstdDecompressor()
624 result = dctx.multi_decompress_to_buffer(frames, decompressed_sizes=sizes)
625
626 self.assertEqual(len(result), len(frames))
627 self.assertEqual(result.size(), sum(map(len, original)))
628
629 for i, data in enumerate(original):
630 self.assertEqual(result[i].tobytes(), data)
631
632 def test_buffer_with_segments_input(self):
633 cctx = zstd.ZstdCompressor(write_content_size=True)
634
635 original = [b'foo' * 4, b'bar' * 6]
636 frames = [cctx.compress(d) for d in original]
637
638 dctx = zstd.ZstdDecompressor()
639
640 segments = struct.pack('=QQQQ', 0, len(frames[0]), len(frames[0]), len(frames[1]))
641 b = zstd.BufferWithSegments(b''.join(frames), segments)
642
643 result = dctx.multi_decompress_to_buffer(b)
644
645 self.assertEqual(len(result), len(frames))
646 self.assertEqual(result[0].offset, 0)
647 self.assertEqual(len(result[0]), 12)
648 self.assertEqual(result[1].offset, 12)
649 self.assertEqual(len(result[1]), 18)
650
651 def test_buffer_with_segments_sizes(self):
652 cctx = zstd.ZstdCompressor(write_content_size=False)
653 original = [b'foo' * 4, b'bar' * 6, b'baz' * 8]
654 frames = [cctx.compress(d) for d in original]
655 sizes = struct.pack('=' + 'Q' * len(original), *map(len, original))
656
657 segments = struct.pack('=QQQQQQ', 0, len(frames[0]),
658 len(frames[0]), len(frames[1]),
659 len(frames[0]) + len(frames[1]), len(frames[2]))
660 b = zstd.BufferWithSegments(b''.join(frames), segments)
661
662 dctx = zstd.ZstdDecompressor()
663 result = dctx.multi_decompress_to_buffer(b, decompressed_sizes=sizes)
664
665 self.assertEqual(len(result), len(frames))
666 self.assertEqual(result.size(), sum(map(len, original)))
667
668 for i, data in enumerate(original):
669 self.assertEqual(result[i].tobytes(), data)
670
671 def test_buffer_with_segments_collection_input(self):
672 cctx = zstd.ZstdCompressor(write_content_size=True)
673
674 original = [
675 b'foo0' * 2,
676 b'foo1' * 3,
677 b'foo2' * 4,
678 b'foo3' * 5,
679 b'foo4' * 6,
680 ]
681
682 frames = cctx.multi_compress_to_buffer(original)
683
684 # Check round trip.
685 dctx = zstd.ZstdDecompressor()
686 decompressed = dctx.multi_decompress_to_buffer(frames, threads=3)
687
688 self.assertEqual(len(decompressed), len(original))
689
690 for i, data in enumerate(original):
691 self.assertEqual(data, decompressed[i].tobytes())
692
693 # And a manual mode.
694 b = b''.join([frames[0].tobytes(), frames[1].tobytes()])
695 b1 = zstd.BufferWithSegments(b, struct.pack('=QQQQ',
696 0, len(frames[0]),
697 len(frames[0]), len(frames[1])))
698
699 b = b''.join([frames[2].tobytes(), frames[3].tobytes(), frames[4].tobytes()])
700 b2 = zstd.BufferWithSegments(b, struct.pack('=QQQQQQ',
701 0, len(frames[2]),
702 len(frames[2]), len(frames[3]),
703 len(frames[2]) + len(frames[3]), len(frames[4])))
704
705 c = zstd.BufferWithSegmentsCollection(b1, b2)
706
707 dctx = zstd.ZstdDecompressor()
708 decompressed = dctx.multi_decompress_to_buffer(c)
709
710 self.assertEqual(len(decompressed), 5)
711 for i in range(5):
712 self.assertEqual(decompressed[i].tobytes(), original[i])
713
714 def test_multiple_threads(self):
715 cctx = zstd.ZstdCompressor(write_content_size=True)
716
717 frames = []
718 frames.extend(cctx.compress(b'x' * 64) for i in range(256))
719 frames.extend(cctx.compress(b'y' * 64) for i in range(256))
720
721 dctx = zstd.ZstdDecompressor()
722 result = dctx.multi_decompress_to_buffer(frames, threads=-1)
723
724 self.assertEqual(len(result), len(frames))
725 self.assertEqual(result.size(), 2 * 64 * 256)
726 self.assertEqual(result[0].tobytes(), b'x' * 64)
727 self.assertEqual(result[256].tobytes(), b'y' * 64)
728
729 def test_item_failure(self):
730 cctx = zstd.ZstdCompressor(write_content_size=True)
731 frames = [cctx.compress(b'x' * 128), cctx.compress(b'y' * 128)]
732
733 frames[1] = frames[1] + b'extra'
734
735 dctx = zstd.ZstdDecompressor()
736
737 with self.assertRaisesRegexp(zstd.ZstdError, 'error decompressing item 1: Src size incorrect'):
738 dctx.multi_decompress_to_buffer(frames)
739
740 with self.assertRaisesRegexp(zstd.ZstdError, 'error decompressing item 1: Src size incorrect'):
741 dctx.multi_decompress_to_buffer(frames, threads=2)
@@ -48,3 +48,63 b' class TestTrainDictionary(unittest.TestC'
48 48
49 49 data = d.as_bytes()
50 50 self.assertEqual(data[0:4], b'\x37\xa4\x30\xec')
51
52 def test_set_dict_id(self):
53 samples = []
54 for i in range(128):
55 samples.append(b'foo' * 64)
56 samples.append(b'foobar' * 64)
57
58 d = zstd.train_dictionary(8192, samples, dict_id=42)
59 self.assertEqual(d.dict_id(), 42)
60
61
62 @make_cffi
63 class TestTrainCoverDictionary(unittest.TestCase):
64 def test_no_args(self):
65 with self.assertRaises(TypeError):
66 zstd.train_cover_dictionary()
67
68 def test_bad_args(self):
69 with self.assertRaises(TypeError):
70 zstd.train_cover_dictionary(8192, u'foo')
71
72 with self.assertRaises(ValueError):
73 zstd.train_cover_dictionary(8192, [u'foo'])
74
75 def test_basic(self):
76 samples = []
77 for i in range(128):
78 samples.append(b'foo' * 64)
79 samples.append(b'foobar' * 64)
80
81 d = zstd.train_cover_dictionary(8192, samples, k=64, d=16)
82 self.assertIsInstance(d.dict_id(), int_type)
83
84 data = d.as_bytes()
85 self.assertEqual(data[0:4], b'\x37\xa4\x30\xec')
86
87 self.assertEqual(d.k, 64)
88 self.assertEqual(d.d, 16)
89
90 def test_set_dict_id(self):
91 samples = []
92 for i in range(128):
93 samples.append(b'foo' * 64)
94 samples.append(b'foobar' * 64)
95
96 d = zstd.train_cover_dictionary(8192, samples, k=64, d=16,
97 dict_id=42)
98 self.assertEqual(d.dict_id(), 42)
99
100 def test_optimize(self):
101 samples = []
102 for i in range(128):
103 samples.append(b'foo' * 64)
104 samples.append(b'foobar' * 64)
105
106 d = zstd.train_cover_dictionary(8192, samples, optimize=True,
107 threads=-1, steps=1, d=16)
108
109 self.assertEqual(d.k, 16)
110 self.assertEqual(d.d, 16)
@@ -8,6 +8,11 b''
8 8
9 9 /* A Python C extension for Zstandard. */
10 10
11 #if defined(_WIN32)
12 #define WIN32_LEAN_AND_MEAN
13 #include <Windows.h>
14 #endif
15
11 16 #include "python-zstandard.h"
12 17
13 18 PyObject *ZstdError;
@@ -49,9 +54,22 b' PyDoc_STRVAR(train_dictionary__doc__,'
49 54 "\n"
50 55 "The raw dictionary content will be returned\n");
51 56
57 PyDoc_STRVAR(train_cover_dictionary__doc__,
58 "train_cover_dictionary(dict_size, samples, k=None, d=None, notifications=0, dict_id=0, level=0)\n"
59 "\n"
60 "Train a dictionary from sample data using the COVER algorithm.\n"
61 "\n"
62 "This behaves like ``train_dictionary()`` except a different algorithm is\n"
63 "used to create the dictionary. The algorithm has 2 parameters: ``k`` and\n"
64 "``d``. These control the *segment size* and *dmer size*. A reasonable range\n"
65 "for ``k`` is ``[16, 2048+]``. A reasonable range for ``d`` is ``[6, 16]``.\n"
66 "``d`` must be less than or equal to ``k``.\n"
67 );
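
A usage sketch mirroring the tests added in this change (the sample data and parameter values are chosen for illustration only):

    import zstd

    samples = []
    for i in range(128):
        samples.append(b'foo' * 64)
        samples.append(b'foobar' * 64)

    # Explicit parameters: segment size k=64, dmer size d=16.
    d = zstd.train_cover_dictionary(8192, samples, k=64, d=16)
    print(d.k, d.d, d.dict_id())

    # Or let the trainer search for k itself (optimize mode).
    d2 = zstd.train_cover_dictionary(8192, samples, optimize=True,
                                     steps=1, d=16, threads=-1)
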
68
52 69 static char zstd_doc[] = "Interface to zstandard";
53 70
54 71 static PyMethodDef zstd_methods[] = {
72 /* TODO remove since it is a method on CompressionParameters. */
55 73 { "estimate_compression_context_size", (PyCFunction)estimate_compression_context_size,
56 74 METH_VARARGS, estimate_compression_context_size__doc__ },
57 75 { "estimate_decompression_context_size", (PyCFunction)estimate_decompression_context_size,
@@ -62,14 +80,16 b' static PyMethodDef zstd_methods[] = {'
62 80 METH_VARARGS, get_frame_parameters__doc__ },
63 81 { "train_dictionary", (PyCFunction)train_dictionary,
64 82 METH_VARARGS | METH_KEYWORDS, train_dictionary__doc__ },
83 { "train_cover_dictionary", (PyCFunction)train_cover_dictionary,
84 METH_VARARGS | METH_KEYWORDS, train_cover_dictionary__doc__ },
65 85 { NULL, NULL }
66 86 };
67 87
88 void bufferutil_module_init(PyObject* mod);
68 89 void compressobj_module_init(PyObject* mod);
69 90 void compressor_module_init(PyObject* mod);
70 91 void compressionparams_module_init(PyObject* mod);
71 92 void constants_module_init(PyObject* mod);
72 void dictparams_module_init(PyObject* mod);
73 93 void compressiondict_module_init(PyObject* mod);
74 94 void compressionwriter_module_init(PyObject* mod);
75 95 void compressoriterator_module_init(PyObject* mod);
@@ -100,8 +120,8 b' void zstd_module_init(PyObject* m) {'
100 120 return;
101 121 }
102 122
123 bufferutil_module_init(m);
103 124 compressionparams_module_init(m);
104 dictparams_module_init(m);
105 125 compressiondict_module_init(m);
106 126 compressobj_module_init(m);
107 127 compressor_module_init(m);
@@ -143,3 +163,48 b' PyMODINIT_FUNC initzstd(void) {'
143 163 }
144 164 }
145 165 #endif
166
167 /* Attempt to resolve the number of CPUs in the system. */
168 int cpu_count() {
169 int count = 0;
170
171 #if defined(_WIN32)
172 SYSTEM_INFO si;
173 si.dwNumberOfProcessors = 0;
174 GetSystemInfo(&si);
175 count = si.dwNumberOfProcessors;
176 #elif defined(__APPLE__)
177 int num;
178 size_t size = sizeof(int);
179
180 if (0 == sysctlbyname("hw.logicalcpu", &num, &size, NULL, 0)) {
181 count = num;
182 }
183 #elif defined(__linux__)
184 count = sysconf(_SC_NPROCESSORS_ONLN);
185 #elif defined(__OpenBSD__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__DragonFly__)
186 int mib[2];
187 size_t len = sizeof(count);
188 mib[0] = CTL_HW;
189 mib[1] = HW_NCPU;
190 if (0 != sysctl(mib, 2, &count, &len, NULL, 0)) {
191 count = 0;
192 }
193 #elif defined(__hpux)
194 count = mpctl(MPC_GETNUMSPUS, NULL, NULL);
195 #endif
196
197 return count;
198 }
199
200 size_t roundpow2(size_t i) {
201 i--;
202 i |= i >> 1;
203 i |= i >> 2;
204 i |= i >> 4;
205 i |= i >> 8;
206 i |= i >> 16;
207 i++;
208
209 return i;
210 }
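
roundpow2() rounds a size up to the next power of two by smearing the highest set bit into every lower bit position and then adding one. The same idea in a short Python sketch (a hypothetical helper, not part of the bindings; like the C version it only folds 16 bits, so it targets 32-bit-sized values):

    def roundpow2(i):
        # Propagate the highest set bit into every lower bit, then add one.
        i -= 1
        i |= i >> 1
        i |= i >> 2
        i |= i >> 4
        i |= i >> 8
        i |= i >> 16
        return i + 1

    assert roundpow2(1000) == 1024
    assert roundpow2(4096) == 4096
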
@@ -8,6 +8,7 b''
8 8
9 9 from __future__ import absolute_import, unicode_literals
10 10
11 import os
11 12 import sys
12 13
13 14 from _zstd_cffi import (
@@ -62,6 +63,26 b' COMPRESSOBJ_FLUSH_FINISH = 0'
62 63 COMPRESSOBJ_FLUSH_BLOCK = 1
63 64
64 65
66 def _cpu_count():
67 # os.cpu_count() was introduced in Python 3.4.
68 try:
69 return os.cpu_count() or 0
70 except AttributeError:
71 pass
72
73 # Linux.
74 try:
75 if sys.version_info[0] == 2:
76 return os.sysconf(b'SC_NPROCESSORS_ONLN')
77 else:
78 return os.sysconf(u'SC_NPROCESSORS_ONLN')
79 except (AttributeError, ValueError):
80 pass
81
82 # TODO implement on other platforms.
83 return 0
84
85
65 86 class ZstdError(Exception):
66 87 pass
67 88
@@ -98,6 +119,14 b' class CompressionParameters(object):'
98 119 self.target_length = target_length
99 120 self.strategy = strategy
100 121
122 zresult = lib.ZSTD_checkCParams(self.as_compression_parameters())
123 if lib.ZSTD_isError(zresult):
124 raise ValueError('invalid compression parameters: %s' %
125 ffi.string(lib.ZSTD_getErrorName(zresult)))
126
127 def estimated_compression_context_size(self):
128 return lib.ZSTD_estimateCCtxSize(self.as_compression_parameters())
129
101 130 def as_compression_parameters(self):
102 131 p = ffi.new('ZSTD_compressionParameters *')[0]
103 132 p.windowLog = self.window_log
@@ -140,12 +169,16 b' class ZstdCompressionWriter(object):'
140 169 self._source_size = source_size
141 170 self._write_size = write_size
142 171 self._entered = False
172 self._mtcctx = compressor._cctx if compressor._multithreaded else None
143 173
144 174 def __enter__(self):
145 175 if self._entered:
146 176 raise ZstdError('cannot __enter__ multiple times')
147 177
148 self._cstream = self._compressor._get_cstream(self._source_size)
178 if self._mtcctx:
179 self._compressor._init_mtcstream(self._source_size)
180 else:
181 self._compressor._ensure_cstream(self._source_size)
149 182 self._entered = True
150 183 return self
151 184
@@ -160,7 +193,10 b' class ZstdCompressionWriter(object):'
160 193 out_buffer.pos = 0
161 194
162 195 while True:
163 zresult = lib.ZSTD_endStream(self._cstream, out_buffer)
196 if self._mtcctx:
197 zresult = lib.ZSTDMT_endStream(self._mtcctx, out_buffer)
198 else:
199 zresult = lib.ZSTD_endStream(self._compressor._cstream, out_buffer)
164 200 if lib.ZSTD_isError(zresult):
165 201 raise ZstdError('error ending compression stream: %s' %
166 202 ffi.string(lib.ZSTD_getErrorName(zresult)))
@@ -172,7 +208,6 b' class ZstdCompressionWriter(object):'
172 208 if zresult == 0:
173 209 break
174 210
175 self._cstream = None
176 211 self._compressor = None
177 212
178 213 return False
@@ -182,7 +217,7 b' class ZstdCompressionWriter(object):'
182 217 raise ZstdError('cannot determine size of an inactive compressor; '
183 218 'call when a context manager is active')
184 219
185 return lib.ZSTD_sizeof_CStream(self._cstream)
220 return lib.ZSTD_sizeof_CStream(self._compressor._cstream)
186 221
187 222 def write(self, data):
188 223 if not self._entered:
@@ -205,7 +240,12 b' class ZstdCompressionWriter(object):'
205 240 out_buffer.pos = 0
206 241
207 242 while in_buffer.pos < in_buffer.size:
208 zresult = lib.ZSTD_compressStream(self._cstream, out_buffer, in_buffer)
243 if self._mtcctx:
244 zresult = lib.ZSTDMT_compressStream(self._mtcctx, out_buffer,
245 in_buffer)
246 else:
247 zresult = lib.ZSTD_compressStream(self._compressor._cstream, out_buffer,
248 in_buffer)
209 249 if lib.ZSTD_isError(zresult):
210 250 raise ZstdError('zstd compress error: %s' %
211 251 ffi.string(lib.ZSTD_getErrorName(zresult)))
@@ -230,7 +270,10 b' class ZstdCompressionWriter(object):'
230 270 out_buffer.pos = 0
231 271
232 272 while True:
233 zresult = lib.ZSTD_flushStream(self._cstream, out_buffer)
273 if self._mtcctx:
274 zresult = lib.ZSTDMT_flushStream(self._mtcctx, out_buffer)
275 else:
276 zresult = lib.ZSTD_flushStream(self._compressor._cstream, out_buffer)
234 277 if lib.ZSTD_isError(zresult):
235 278 raise ZstdError('zstd compress error: %s' %
236 279 ffi.string(lib.ZSTD_getErrorName(zresult)))
@@ -259,7 +302,12 b' class ZstdCompressionObj(object):'
259 302 chunks = []
260 303
261 304 while source.pos < len(data):
262 zresult = lib.ZSTD_compressStream(self._cstream, self._out, source)
305 if self._mtcctx:
306 zresult = lib.ZSTDMT_compressStream(self._mtcctx,
307 self._out, source)
308 else:
309 zresult = lib.ZSTD_compressStream(self._compressor._cstream, self._out,
310 source)
263 311 if lib.ZSTD_isError(zresult):
264 312 raise ZstdError('zstd compress error: %s' %
265 313 ffi.string(lib.ZSTD_getErrorName(zresult)))
@@ -280,7 +328,10 b' class ZstdCompressionObj(object):'
280 328 assert self._out.pos == 0
281 329
282 330 if flush_mode == COMPRESSOBJ_FLUSH_BLOCK:
283 zresult = lib.ZSTD_flushStream(self._cstream, self._out)
331 if self._mtcctx:
332 zresult = lib.ZSTDMT_flushStream(self._mtcctx, self._out)
333 else:
334 zresult = lib.ZSTD_flushStream(self._compressor._cstream, self._out)
284 335 if lib.ZSTD_isError(zresult):
285 336 raise ZstdError('zstd compress error: %s' %
286 337 ffi.string(lib.ZSTD_getErrorName(zresult)))
@@ -301,7 +352,10 b' class ZstdCompressionObj(object):'
301 352 chunks = []
302 353
303 354 while True:
304 zresult = lib.ZSTD_endStream(self._cstream, self._out)
355 if self._mtcctx:
356 zresult = lib.ZSTDMT_endStream(self._mtcctx, self._out)
357 else:
358 zresult = lib.ZSTD_endStream(self._compressor._cstream, self._out)
305 359 if lib.ZSTD_isError(zresult):
306 360 raise ZstdError('error ending compression stream: %s' %
307 361 ffi.string(lib.ZSTD_getErrorName(zresult)))
@@ -313,21 +367,21 b' class ZstdCompressionObj(object):'
313 367 if not zresult:
314 368 break
315 369
316 # GC compression stream immediately.
317 self._cstream = None
318
319 370 return b''.join(chunks)
320 371
321 372
322 373 class ZstdCompressor(object):
323 374 def __init__(self, level=3, dict_data=None, compression_params=None,
324 375 write_checksum=False, write_content_size=False,
325 write_dict_id=True):
376 write_dict_id=True, threads=0):
326 377 if level < 1:
327 378 raise ValueError('level must be greater than 0')
328 379 elif level > lib.ZSTD_maxCLevel():
329 380 raise ValueError('level must be less than %d' % lib.ZSTD_maxCLevel())
330 381
382 if threads < 0:
383 threads = _cpu_count()
384
331 385 self._compression_level = level
332 386 self._dict_data = dict_data
333 387 self._cparams = compression_params
@@ -336,16 +390,33 b' class ZstdCompressor(object):'
336 390 self._fparams.contentSizeFlag = write_content_size
337 391 self._fparams.noDictIDFlag = not write_dict_id
338 392
339 cctx = lib.ZSTD_createCCtx()
340 if cctx == ffi.NULL:
341 raise MemoryError()
393 if threads:
394 cctx = lib.ZSTDMT_createCCtx(threads)
395 if cctx == ffi.NULL:
396 raise MemoryError()
342 397
343 self._cctx = ffi.gc(cctx, lib.ZSTD_freeCCtx)
398 self._cctx = ffi.gc(cctx, lib.ZSTDMT_freeCCtx)
399 self._multithreaded = True
400 else:
401 cctx = lib.ZSTD_createCCtx()
402 if cctx == ffi.NULL:
403 raise MemoryError()
404
405 self._cctx = ffi.gc(cctx, lib.ZSTD_freeCCtx)
406 self._multithreaded = False
407
408 self._cstream = None
344 409
345 410 def compress(self, data, allow_empty=False):
346 411 if len(data) == 0 and self._fparams.contentSizeFlag and not allow_empty:
347 412 raise ValueError('cannot write empty inputs when writing content sizes')
348 413
414 if self._multithreaded and self._dict_data:
415 raise ZstdError('compress() cannot be used with both dictionaries and multi-threaded compression')
416
417 if self._multithreaded and self._cparams:
418 raise ZstdError('compress() cannot be used with both compression parameters and multi-threaded compression')
419
349 420 # TODO use a CDict for performance.
350 421 dict_data = ffi.NULL
351 422 dict_size = 0
@@ -365,11 +436,17 b' class ZstdCompressor(object):'
365 436 dest_size = lib.ZSTD_compressBound(len(data))
366 437 out = new_nonzero('char[]', dest_size)
367 438
368 zresult = lib.ZSTD_compress_advanced(self._cctx,
369 ffi.addressof(out), dest_size,
370 data, len(data),
371 dict_data, dict_size,
372 params)
439 if self._multithreaded:
440 zresult = lib.ZSTDMT_compressCCtx(self._cctx,
441 ffi.addressof(out), dest_size,
442 data, len(data),
443 self._compression_level)
444 else:
445 zresult = lib.ZSTD_compress_advanced(self._cctx,
446 ffi.addressof(out), dest_size,
447 data, len(data),
448 dict_data, dict_size,
449 params)
373 450
374 451 if lib.ZSTD_isError(zresult):
375 452 raise ZstdError('cannot compress: %s' %
@@ -378,9 +455,12 b' class ZstdCompressor(object):'
378 455 return ffi.buffer(out, zresult)[:]
379 456
380 457 def compressobj(self, size=0):
381 cstream = self._get_cstream(size)
458 if self._multithreaded:
459 self._init_mtcstream(size)
460 else:
461 self._ensure_cstream(size)
462
382 463 cobj = ZstdCompressionObj()
383 cobj._cstream = cstream
384 464 cobj._out = ffi.new('ZSTD_outBuffer *')
385 465 cobj._dst_buffer = ffi.new('char[]', COMPRESSION_RECOMMENDED_OUTPUT_SIZE)
386 466 cobj._out.dst = cobj._dst_buffer
@@ -389,6 +469,11 b' class ZstdCompressor(object):'
389 469 cobj._compressor = self
390 470 cobj._finished = False
391 471
472 if self._multithreaded:
473 cobj._mtcctx = self._cctx
474 else:
475 cobj._mtcctx = None
476
392 477 return cobj
393 478
394 479 def copy_stream(self, ifh, ofh, size=0,
@@ -400,7 +485,11 b' class ZstdCompressor(object):'
400 485 if not hasattr(ofh, 'write'):
401 486 raise ValueError('second argument must have a write() method')
402 487
403 cstream = self._get_cstream(size)
488 mt = self._multithreaded
489 if mt:
490 self._init_mtcstream(size)
491 else:
492 self._ensure_cstream(size)
404 493
405 494 in_buffer = ffi.new('ZSTD_inBuffer *')
406 495 out_buffer = ffi.new('ZSTD_outBuffer *')
@@ -424,7 +513,11 b' class ZstdCompressor(object):'
424 513 in_buffer.pos = 0
425 514
426 515 while in_buffer.pos < in_buffer.size:
427 zresult = lib.ZSTD_compressStream(cstream, out_buffer, in_buffer)
516 if mt:
517 zresult = lib.ZSTDMT_compressStream(self._cctx, out_buffer, in_buffer)
518 else:
519 zresult = lib.ZSTD_compressStream(self._cstream,
520 out_buffer, in_buffer)
428 521 if lib.ZSTD_isError(zresult):
429 522 raise ZstdError('zstd compress error: %s' %
430 523 ffi.string(lib.ZSTD_getErrorName(zresult)))
@@ -436,7 +529,10 b' class ZstdCompressor(object):'
436 529
437 530 # We've finished reading. Flush the compressor.
438 531 while True:
439 zresult = lib.ZSTD_endStream(cstream, out_buffer)
532 if mt:
533 zresult = lib.ZSTDMT_endStream(self._cctx, out_buffer)
534 else:
535 zresult = lib.ZSTD_endStream(self._cstream, out_buffer)
440 536 if lib.ZSTD_isError(zresult):
441 537 raise ZstdError('error ending compression stream: %s' %
442 538 ffi.string(lib.ZSTD_getErrorName(zresult)))
@@ -472,7 +568,10 b' class ZstdCompressor(object):'
472 568 raise ValueError('must pass an object with a read() method or '
473 569 'conforms to buffer protocol')
474 570
475 cstream = self._get_cstream(size)
571 if self._multithreaded:
572 self._init_mtcstream(size)
573 else:
574 self._ensure_cstream(size)
476 575
477 576 in_buffer = ffi.new('ZSTD_inBuffer *')
478 577 out_buffer = ffi.new('ZSTD_outBuffer *')
@@ -512,7 +611,10 b' class ZstdCompressor(object):'
512 611 in_buffer.pos = 0
513 612
514 613 while in_buffer.pos < in_buffer.size:
515 zresult = lib.ZSTD_compressStream(cstream, out_buffer, in_buffer)
614 if self._multithreaded:
615 zresult = lib.ZSTDMT_compressStream(self._cctx, out_buffer, in_buffer)
616 else:
617 zresult = lib.ZSTD_compressStream(self._cstream, out_buffer, in_buffer)
516 618 if lib.ZSTD_isError(zresult):
517 619 raise ZstdError('zstd compress error: %s' %
518 620 ffi.string(lib.ZSTD_getErrorName(zresult)))
@@ -531,7 +633,10 b' class ZstdCompressor(object):'
531 633 # remains.
532 634 while True:
533 635 assert out_buffer.pos == 0
534 zresult = lib.ZSTD_endStream(cstream, out_buffer)
636 if self._multithreaded:
637 zresult = lib.ZSTDMT_endStream(self._cctx, out_buffer)
638 else:
639 zresult = lib.ZSTD_endStream(self._cstream, out_buffer)
535 640 if lib.ZSTD_isError(zresult):
536 641 raise ZstdError('error ending compression stream: %s' %
537 642 ffi.string(lib.ZSTD_getErrorName(zresult)))
@@ -544,7 +649,15 b' class ZstdCompressor(object):'
544 649 if zresult == 0:
545 650 break
546 651
547 def _get_cstream(self, size):
652 def _ensure_cstream(self, size):
653 if self._cstream:
654 zresult = lib.ZSTD_resetCStream(self._cstream, size)
655 if lib.ZSTD_isError(zresult):
656 raise ZstdError('could not reset CStream: %s' %
657 ffi.string(lib.ZSTD_getErrorName(zresult)))
658
659 return
660
548 661 cstream = lib.ZSTD_createCStream()
549 662 if cstream == ffi.NULL:
550 663 raise MemoryError()
@@ -571,7 +684,32 b' class ZstdCompressor(object):'
571 684 raise Exception('cannot init CStream: %s' %
572 685 ffi.string(lib.ZSTD_getErrorName(zresult)))
573 686
574 return cstream
687 self._cstream = cstream
688
689 def _init_mtcstream(self, size):
690 assert self._multithreaded
691
692 dict_data = ffi.NULL
693 dict_size = 0
694 if self._dict_data:
695 dict_data = self._dict_data.as_bytes()
696 dict_size = len(self._dict_data)
697
698 zparams = ffi.new('ZSTD_parameters *')[0]
699 if self._cparams:
700 zparams.cParams = self._cparams.as_compression_parameters()
701 else:
702 zparams.cParams = lib.ZSTD_getCParams(self._compression_level,
703 size, dict_size)
704
705 zparams.fParams = self._fparams
706
707 zresult = lib.ZSTDMT_initCStream_advanced(self._cctx, dict_data, dict_size,
708 zparams, size)
709
710 if lib.ZSTD_isError(zresult):
711 raise ZstdError('cannot init CStream: %s' %
712 ffi.string(lib.ZSTD_getErrorName(zresult)))
575 713
576 714
577 715 class FrameParameters(object):
@@ -601,9 +739,11 b' def get_frame_parameters(data):'
601 739
602 740
603 741 class ZstdCompressionDict(object):
604 def __init__(self, data):
742 def __init__(self, data, k=0, d=0):
605 743 assert isinstance(data, bytes_type)
606 744 self._data = data
745 self.k = k
746 self.d = d
607 747
608 748 def __len__(self):
609 749 return len(self._data)
@@ -615,7 +755,8 b' class ZstdCompressionDict(object):'
615 755 return self._data
616 756
617 757
618 def train_dictionary(dict_size, samples, parameters=None):
758 def train_dictionary(dict_size, samples, selectivity=0, level=0,
759 notifications=0, dict_id=0):
619 760 if not isinstance(samples, list):
620 761 raise TypeError('samples must be a list')
621 762
@@ -636,10 +777,18 b' def train_dictionary(dict_size, samples,'
636 777
637 778 dict_data = new_nonzero('char[]', dict_size)
638 779
639 zresult = lib.ZDICT_trainFromBuffer(ffi.addressof(dict_data), dict_size,
640 ffi.addressof(samples_buffer),
641 ffi.addressof(sample_sizes, 0),
642 len(samples))
780 dparams = ffi.new('ZDICT_params_t *')[0]
781 dparams.selectivityLevel = selectivity
782 dparams.compressionLevel = level
783 dparams.notificationLevel = notifications
784 dparams.dictID = dict_id
785
786 zresult = lib.ZDICT_trainFromBuffer_advanced(
787 ffi.addressof(dict_data), dict_size,
788 ffi.addressof(samples_buffer),
789 ffi.addressof(sample_sizes, 0), len(samples),
790 dparams)
791
643 792 if lib.ZDICT_isError(zresult):
644 793 raise ZstdError('Cannot train dict: %s' %
645 794 ffi.string(lib.ZDICT_getErrorName(zresult)))
@@ -647,16 +796,73 b' def train_dictionary(dict_size, samples,'
647 796 return ZstdCompressionDict(ffi.buffer(dict_data, zresult)[:])
648 797
649 798
799 def train_cover_dictionary(dict_size, samples, k=0, d=0,
800 notifications=0, dict_id=0, level=0, optimize=False,
801 steps=0, threads=0):
802 if not isinstance(samples, list):
803 raise TypeError('samples must be a list')
804
805 if threads < 0:
806 threads = _cpu_count()
807
808 total_size = sum(map(len, samples))
809
810 samples_buffer = new_nonzero('char[]', total_size)
811 sample_sizes = new_nonzero('size_t[]', len(samples))
812
813 offset = 0
814 for i, sample in enumerate(samples):
815 if not isinstance(sample, bytes_type):
816 raise ValueError('samples must be bytes')
817
818 l = len(sample)
819 ffi.memmove(samples_buffer + offset, sample, l)
820 offset += l
821 sample_sizes[i] = l
822
823 dict_data = new_nonzero('char[]', dict_size)
824
825 dparams = ffi.new('COVER_params_t *')[0]
826 dparams.k = k
827 dparams.d = d
828 dparams.steps = steps
829 dparams.nbThreads = threads
830 dparams.notificationLevel = notifications
831 dparams.dictID = dict_id
832 dparams.compressionLevel = level
833
834 if optimize:
835 zresult = lib.COVER_optimizeTrainFromBuffer(
836 ffi.addressof(dict_data), dict_size,
837 ffi.addressof(samples_buffer),
838 ffi.addressof(sample_sizes, 0), len(samples),
839 ffi.addressof(dparams))
840 else:
841 zresult = lib.COVER_trainFromBuffer(
842 ffi.addressof(dict_data), dict_size,
843 ffi.addressof(samples_buffer),
844 ffi.addressof(sample_sizes, 0), len(samples),
845 dparams)
846
847 if lib.ZDICT_isError(zresult):
848 raise ZstdError('cannot train dict: %s' %
849 ffi.string(lib.ZDICT_getErrorName(zresult)))
850
851 return ZstdCompressionDict(ffi.buffer(dict_data, zresult)[:],
852 k=dparams.k, d=dparams.d)
853
854
650 855 class ZstdDecompressionObj(object):
651 856 def __init__(self, decompressor):
652 857 self._decompressor = decompressor
653 self._dstream = self._decompressor._get_dstream()
654 858 self._finished = False
655 859
656 860 def decompress(self, data):
657 861 if self._finished:
658 862 raise ZstdError('cannot use a decompressobj multiple times')
659 863
864 assert(self._decompressor._dstream)
865
660 866 in_buffer = ffi.new('ZSTD_inBuffer *')
661 867 out_buffer = ffi.new('ZSTD_outBuffer *')
662 868
@@ -673,14 +879,14 b' class ZstdDecompressionObj(object):'
673 879 chunks = []
674 880
675 881 while in_buffer.pos < in_buffer.size:
676 zresult = lib.ZSTD_decompressStream(self._dstream, out_buffer, in_buffer)
882 zresult = lib.ZSTD_decompressStream(self._decompressor._dstream,
883 out_buffer, in_buffer)
677 884 if lib.ZSTD_isError(zresult):
678 885 raise ZstdError('zstd decompressor error: %s' %
679 886 ffi.string(lib.ZSTD_getErrorName(zresult)))
680 887
681 888 if zresult == 0:
682 889 self._finished = True
683 self._dstream = None
684 890 self._decompressor = None
685 891
686 892 if out_buffer.pos:
@@ -695,28 +901,26 b' class ZstdDecompressionWriter(object):'
695 901 self._decompressor = decompressor
696 902 self._writer = writer
697 903 self._write_size = write_size
698 self._dstream = None
699 904 self._entered = False
700 905
701 906 def __enter__(self):
702 907 if self._entered:
703 908 raise ZstdError('cannot __enter__ multiple times')
704 909
705 self._dstream = self._decompressor._get_dstream()
910 self._decompressor._ensure_dstream()
706 911 self._entered = True
707 912
708 913 return self
709 914
710 915 def __exit__(self, exc_type, exc_value, exc_tb):
711 916 self._entered = False
712 self._dstream = None
713 917
714 918 def memory_size(self):
715 if not self._dstream:
919 if not self._decompressor._dstream:
716 920 raise ZstdError('cannot determine size of inactive decompressor '
717 921 'call when context manager is active')
718 922
719 return lib.ZSTD_sizeof_DStream(self._dstream)
923 return lib.ZSTD_sizeof_DStream(self._decompressor._dstream)
720 924
721 925 def write(self, data):
722 926 if not self._entered:
@@ -737,8 +941,10 b' class ZstdDecompressionWriter(object):'
737 941 out_buffer.size = len(dst_buffer)
738 942 out_buffer.pos = 0
739 943
944 dstream = self._decompressor._dstream
945
740 946 while in_buffer.pos < in_buffer.size:
741 zresult = lib.ZSTD_decompressStream(self._dstream, out_buffer, in_buffer)
947 zresult = lib.ZSTD_decompressStream(dstream, out_buffer, in_buffer)
742 948 if lib.ZSTD_isError(zresult):
743 949 raise ZstdError('zstd decompress error: %s' %
744 950 ffi.string(lib.ZSTD_getErrorName(zresult)))
@@ -760,6 +966,7 b' class ZstdDecompressor(object):'
760 966 raise MemoryError()
761 967
762 968 self._refdctx = ffi.gc(dctx, lib.ZSTD_freeDCtx)
969 self._dstream = None
763 970
764 971 @property
765 972 def _ddict(self):
@@ -816,6 +1023,7 b' class ZstdDecompressor(object):'
816 1023 return ffi.buffer(result_buffer, zresult)[:]
817 1024
818 1025 def decompressobj(self):
1026 self._ensure_dstream()
819 1027 return ZstdDecompressionObj(self)
820 1028
821 1029 def read_from(self, reader, read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
@@ -843,7 +1051,7 b' class ZstdDecompressor(object):'
843 1051
844 1052 buffer_offset = skip_bytes
845 1053
846 dstream = self._get_dstream()
1054 self._ensure_dstream()
847 1055
848 1056 in_buffer = ffi.new('ZSTD_inBuffer *')
849 1057 out_buffer = ffi.new('ZSTD_outBuffer *')
@@ -878,7 +1086,7 b' class ZstdDecompressor(object):'
878 1086 while in_buffer.pos < in_buffer.size:
879 1087 assert out_buffer.pos == 0
880 1088
881 zresult = lib.ZSTD_decompressStream(dstream, out_buffer, in_buffer)
1089 zresult = lib.ZSTD_decompressStream(self._dstream, out_buffer, in_buffer)
882 1090 if lib.ZSTD_isError(zresult):
883 1091 raise ZstdError('zstd decompress error: %s' %
884 1092 ffi.string(lib.ZSTD_getErrorName(zresult)))
@@ -910,7 +1118,7 b' class ZstdDecompressor(object):'
910 1118 if not hasattr(ofh, 'write'):
911 1119 raise ValueError('second argument must have a write() method')
912 1120
913 dstream = self._get_dstream()
1121 self._ensure_dstream()
914 1122
915 1123 in_buffer = ffi.new('ZSTD_inBuffer *')
916 1124 out_buffer = ffi.new('ZSTD_outBuffer *')
@@ -936,7 +1144,7 b' class ZstdDecompressor(object):'
936 1144
937 1145 # Flush all read data to output.
938 1146 while in_buffer.pos < in_buffer.size:
939 zresult = lib.ZSTD_decompressStream(dstream, out_buffer, in_buffer)
1147 zresult = lib.ZSTD_decompressStream(self._dstream, out_buffer, in_buffer)
940 1148 if lib.ZSTD_isError(zresult):
941 1149 raise ZstdError('zstd decompressor error: %s' %
942 1150 ffi.string(lib.ZSTD_getErrorName(zresult)))
@@ -1021,22 +1229,29 b' class ZstdDecompressor(object):'
1021 1229
1022 1230 return ffi.buffer(last_buffer, len(last_buffer))[:]
1023 1231
1024 def _get_dstream(self):
1025 dstream = lib.ZSTD_createDStream()
1026 if dstream == ffi.NULL:
1232 def _ensure_dstream(self):
1233 if self._dstream:
1234 zresult = lib.ZSTD_resetDStream(self._dstream)
1235 if lib.ZSTD_isError(zresult):
1236 raise ZstdError('could not reset DStream: %s' %
1237 ffi.string(lib.ZSTD_getErrorName(zresult)))
1238
1239 return
1240
1241 self._dstream = lib.ZSTD_createDStream()
1242 if self._dstream == ffi.NULL:
1027 1243 raise MemoryError()
1028 1244
1029 dstream = ffi.gc(dstream, lib.ZSTD_freeDStream)
1245 self._dstream = ffi.gc(self._dstream, lib.ZSTD_freeDStream)
1030 1246
1031 1247 if self._dict_data:
1032 zresult = lib.ZSTD_initDStream_usingDict(dstream,
1248 zresult = lib.ZSTD_initDStream_usingDict(self._dstream,
1033 1249 self._dict_data.as_bytes(),
1034 1250 len(self._dict_data))
1035 1251 else:
1036 zresult = lib.ZSTD_initDStream(dstream)
1252 zresult = lib.ZSTD_initDStream(self._dstream)
1037 1253
1038 1254 if lib.ZSTD_isError(zresult):
1255 self._dstream = None
1039 1256 raise ZstdError('could not initialize DStream: %s' %
1040 1257 ffi.string(lib.ZSTD_getErrorName(zresult)))
1041
1042 return dstream
@@ -7,12 +7,15 b''
7 7 contrib/python-zstandard/setup.py not using absolute_import
8 8 contrib/python-zstandard/setup_zstd.py not using absolute_import
9 9 contrib/python-zstandard/tests/common.py not using absolute_import
10 contrib/python-zstandard/tests/test_buffer_util.py not using absolute_import
10 11 contrib/python-zstandard/tests/test_compressor.py not using absolute_import
12 contrib/python-zstandard/tests/test_compressor_fuzzing.py not using absolute_import
11 13 contrib/python-zstandard/tests/test_data_structures.py not using absolute_import
14 contrib/python-zstandard/tests/test_data_structures_fuzzing.py not using absolute_import
12 15 contrib/python-zstandard/tests/test_decompressor.py not using absolute_import
16 contrib/python-zstandard/tests/test_decompressor_fuzzing.py not using absolute_import
13 17 contrib/python-zstandard/tests/test_estimate_sizes.py not using absolute_import
14 18 contrib/python-zstandard/tests/test_module_attributes.py not using absolute_import
15 contrib/python-zstandard/tests/test_roundtrip.py not using absolute_import
16 19 contrib/python-zstandard/tests/test_train_dictionary.py not using absolute_import
17 20 i18n/check-translation.py not using absolute_import
18 21 setup.py not using absolute_import
1 NO CONTENT: file was removed
1 NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now