copies-rust: add smarter approach for merging small mapping with large mapping

The current approach (finding the smaller updated set) works great when the mappings have similar sizes, but does a lot of unnecessary work when one side is much smaller than the other. This change handles these cases better. See inline documentation for details.

It gives a sizeable boost to many of our slower cases:

Repo Case Source-Rev Dest-Rev # of revisions old time new time Difference Factor time per rev
---------------------------------------------------------------------------------------------------------------------------------------------------------------
mozilla-try x00000_revs_x_added_0_copies 6a320851d377 1ebb79acd503 : 363753 revs, 18.123103 s, 5.693818 s, -12.429285 s, × 0.3142, 15 µs/rev
mozilla-try x00000_revs_x_added_x_copies 5173c4b6f97c 95d83ee7242d : 362229 revs, 17.907312 s, 5.677655 s, -12.229657 s, × 0.3171, 15 µs/rev
mozilla-try x00000_revs_x000_added_x_copies 9126823d0e9c ca82787bb23c : 359344 revs, 17.684797 s, 5.563370 s, -12.121427 s, × 0.3146, 15 µs/rev
mozilla-try x00000_revs_x0000_added_x0000_copies 8d3fafa80d4b eb884023b810 : 192665 revs, 2.881471 s, 2.864099 s, -0.017372 s, × 0.9940, 14 µs/rev
mozilla-try x00000_revs_x00000_added_x000_copies 9b2a99adc05e 8e29777b48e6 : 382065 revs, 63.148971 s, 59.498652 s, -3.650319 s, × 0.9422, 155 µs/rev

Ideally, the im-rs object would have a `merge` method, but it does not (yet).

Full timing comparison below (there is one pathological case that becomes even worse, for unclear reasons).
Repo Case Source-Rev Dest-Rev # of revisions old time new time Difference Factor time per rev
---------------------------------------------------------------------------------------------------------------------------------------------------------------
mercurial x_revs_x_added_0_copies ad6b123de1c7 39cfcef4f463 : 1 revs, 0.000043 s, 0.000042 s, -0.000001 s, × 0.9767, 42 µs/rev
mercurial x_revs_x_added_x_copies 2b1c78674230 0c1d10351869 : 6 revs, 0.000105 s, 0.000104 s, -0.000001 s, × 0.9905, 17 µs/rev
mercurial x000_revs_x000_added_x_copies 81f8ff2a9bf2 dd3267698d84 : 1032 revs, 0.004895 s, 0.004913 s, +0.000018 s, × 1.0037, 4 µs/rev
pypy x_revs_x_added_0_copies aed021ee8ae8 099ed31b181b : 9 revs, 0.000194 s, 0.000191 s, -0.000003 s, × 0.9845, 21 µs/rev
pypy x_revs_x000_added_0_copies 4aa4e1f8e19a 359343b9ac0e : 1 revs, 0.000050 s, 0.000050 s, +0.000000 s, × 1.0000, 50 µs/rev
pypy x_revs_x_added_x_copies ac52eb7bbbb0 72e022663155 : 7 revs, 0.000115 s, 0.000112 s, -0.000003 s, × 0.9739, 16 µs/rev
pypy x_revs_x00_added_x_copies c3b14617fbd7 ace7255d9a26 : 1 revs, 0.000289 s, 0.000288 s, -0.000001 s, × 0.9965, 288 µs/rev
pypy x_revs_x000_added_x000_copies df6f7a526b60 a83dc6a2d56f : 6 revs, 0.010513 s, 0.010411 s, -0.000102 s, × 0.9903, 1735 µs/rev
pypy x000_revs_xx00_added_0_copies 89a76aede314 2f22446ff07e : 4785 revs, 0.051474 s, 0.052852 s, +0.001378 s, × 1.0268, 11 µs/rev
pypy x000_revs_x000_added_x_copies 8a3b5bfd266e 2c68e87c3efe : 6780 revs, 0.088086 s, 0.092828 s, +0.004742 s, × 1.0538, 13 µs/rev
pypy x000_revs_x000_added_x000_copies 89a76aede314 7b3dda341c84 : 5441 revs, 0.062176 s, 0.063269 s, +0.001093 s, × 1.0176, 11 µs/rev
pypy x0000_revs_x_added_0_copies d1defd0dc478 c9cb1334cc78 : 43645 revs, 0.720950 s, 0.711975 s, -0.008975 s, × 0.9876, 16 µs/rev
pypy x0000_revs_xx000_added_0_copies bf2c629d0071 4ffed77c095c : 2 revs, 0.012897 s, 0.012771 s, -0.000126 s, × 0.9902, 6385 µs/rev
pypy x0000_revs_xx000_added_x000_copies 08ea3258278e d9fa043f30c0 : 11316 revs, 0.121524 s, 0.124505 s, +0.002981 s, × 1.0245, 11 µs/rev
netbeans x_revs_x_added_0_copies fb0955ffcbcd a01e9239f9e7 : 2 revs, 0.000082 s, 0.000082 s, +0.000000 s, × 1.0000, 41 µs/rev
netbeans x_revs_x000_added_0_copies 6f360122949f 20eb231cc7d0 : 2 revs, 0.000109 s, 0.000111 s, +0.000002 s, × 1.0183, 55 µs/rev
netbeans x_revs_x_added_x_copies 1ada3faf6fb6 5a39d12eecf4 : 3 revs, 0.000175 s, 0.000171 s, -0.000004 s, × 0.9771, 57 µs/rev
netbeans x_revs_x00_added_x_copies 35be93ba1e2c 9eec5e90c05f : 9 revs, 0.000719 s, 0.000708 s, -0.000011 s, × 0.9847, 78 µs/rev
netbeans x000_revs_xx00_added_0_copies eac3045b4fdd 51d4ae7f1290 : 1421 revs, 0.010426 s, 0.010608 s, +0.000182 s, × 1.0175, 7 µs/rev
netbeans x000_revs_x000_added_x_copies e2063d266acd 6081d72689dc : 1533 revs, 0.015712 s, 0.015635 s, -0.000077 s, × 0.9951, 10 µs/rev
netbeans x000_revs_x000_added_x000_copies ff453e9fee32 411350406ec2 : 5750 revs, 0.077353 s, 0.072072 s, -0.005281 s, × 0.9317, 12 µs/rev
netbeans x0000_revs_xx000_added_x000_copies 588c2d1ced70 1aad62e59ddd : 66949 revs, 0.673930 s, 0.682732 s, +0.008802 s, × 1.0131, 10 µs/rev
mozilla-central x_revs_x_added_0_copies 3697f962bb7b 7015fcdd43a2 : 2 revs, 0.000089 s, 0.000090 s, +0.000001 s, × 1.0112, 45 µs/rev
mozilla-central x_revs_x000_added_0_copies dd390860c6c9 40d0c5bed75d : 8 revs, 0.000212 s, 0.000210 s, -0.000002 s, × 0.9906, 26 µs/rev
mozilla-central x_revs_x_added_x_copies 8d198483ae3b 14207ffc2b2f : 9 revs, 0.000183 s, 0.000182 s, -0.000001 s, × 0.9945, 20 µs/rev
mozilla-central x_revs_x00_added_x_copies 98cbc58cc6bc 446a150332c3 : 7 revs, 0.000595 s, 0.000594 s, -0.000001 s, × 0.9983, 84 µs/rev
mozilla-central x_revs_x000_added_x000_copies 3c684b4b8f68 0a5e72d1b479 : 3 revs, 0.003117 s, 0.003102 s, -0.000015 s, × 0.9952, 1034 µs/rev
mozilla-central x_revs_x0000_added_x0000_copies effb563bb7e5 c07a39dc4e80 : 6 revs, 0.060197 s, 0.060234 s, +0.000037 s, × 1.0006, 10039 µs/rev
mozilla-central x000_revs_xx00_added_0_copies 6100d773079a 04a55431795e : 1593 revs, 0.006379 s, 0.006300 s, -0.000079 s, × 0.9876, 3 µs/rev
mozilla-central x000_revs_x000_added_x_copies 9f17a6fc04f9 2d37b966abed : 41 revs, 0.005008 s, 0.004817 s, -0.000191 s, × 0.9619, 117 µs/rev
mozilla-central x000_revs_x000_added_x000_copies 7c97034feb78 4407bd0c6330 : 7839 revs, 0.065123 s, 0.065451 s, +0.000328 s, × 1.0050, 8 µs/rev
mozilla-central x0000_revs_xx000_added_0_copies 9eec5917337d 67118cc6dcad : 615 revs, 0.026404 s, 0.026282 s, -0.000122 s, × 0.9954, 42 µs/rev
mozilla-central x0000_revs_xx000_added_x000_copies f78c615a656c 96a38b690156 : 30263 revs, 0.203456 s, 0.206873 s, +0.003417 s, × 1.0168, 6 µs/rev
mozilla-central x00000_revs_x0000_added_x0000_copies 6832ae71433c 4c222a1d9a00 : 153721 revs, 1.929809 s, 1.935918 s, +0.006109 s, × 1.0032, 12 µs/rev
mozilla-central x00000_revs_x00000_added_x000_copies 76caed42cf7c 1daa622bbe42 : 204976 revs, 2.825064 s, 2.827320 s, +0.002256 s, × 1.0008, 13 µs/rev
mozilla-try x_revs_x_added_0_copies aaf6dde0deb8 9790f499805a : 2 revs, 0.000857 s, 0.000842 s, -0.000015 s, × 0.9825, 421 µs/rev
mozilla-try x_revs_x000_added_0_copies d8d0222927b4 5bb8ce8c7450 : 2 revs, 0.000870 s, 0.000870 s, +0.000000 s, × 1.0000, 435 µs/rev
mozilla-try x_revs_x_added_x_copies 092fcca11bdb 936255a0384a : 4 revs, 0.000161 s, 0.000165 s, +0.000004 s, × 1.0248, 41 µs/rev
mozilla-try x_revs_x00_added_x_copies b53d2fadbdb5 017afae788ec : 2 revs, 0.001147 s, 0.001145 s, -0.000002 s, × 0.9983, 572 µs/rev
mozilla-try x_revs_x000_added_x000_copies 20408ad61ce5 6f0ee96e21ad : 1 revs, 0.026640 s, 0.026500 s, -0.000140 s, × 0.9947, 26500 µs/rev
mozilla-try x_revs_x0000_added_x0000_copies effb563bb7e5 c07a39dc4e80 : 6 revs, 0.059849 s, 0.059407 s, -0.000442 s, × 0.9926, 9901 µs/rev
mozilla-try x000_revs_xx00_added_0_copies 6100d773079a 04a55431795e : 1593 revs, 0.006326 s, 0.006325 s, -0.000001 s, × 0.9998, 3 µs/rev
mozilla-try x000_revs_x000_added_x_copies 9f17a6fc04f9 2d37b966abed : 41 revs, 0.005188 s, 0.005171 s, -0.000017 s, × 0.9967, 126 µs/rev
mozilla-try x000_revs_x000_added_x000_copies 1346fd0130e4 4c65cbdabc1f : 6657 revs, 0.067633 s, 0.066837 s, -0.000796 s, × 0.9882, 10 µs/rev
mozilla-try x0000_revs_x_added_0_copies 63519bfd42ee a36a2a865d92 : 40314 revs, 0.306969 s, 0.314252 s, +0.007283 s, × 1.0237, 7 µs/rev
mozilla-try x0000_revs_x_added_x_copies 9fe69ff0762d bcabf2a78927 : 38690 revs, 0.293370 s, 0.304160 s, +0.010790 s, × 1.0368, 7 µs/rev
mozilla-try x0000_revs_xx000_added_x_copies 156f6e2674f2 4d0f2c178e66 : 8598 revs, 0.087159 s, 0.089223 s, +0.002064 s, × 1.0237, 10 µs/rev
mozilla-try x0000_revs_xx000_added_0_copies 9eec5917337d 67118cc6dcad : 615 revs, 0.027251 s, 0.026711 s, -0.000540 s, × 0.9802, 43 µs/rev
mozilla-try x0000_revs_xx000_added_x000_copies 89294cd501d9 7ccb2fc7ccb5 : 97052 revs, 3.010011 s, 3.243010 s, +0.232999 s, × 1.0774, 33 µs/rev
mozilla-try x0000_revs_x0000_added_x0000_copies e928c65095ed e951f4ad123a : 52031 revs, 0.753434 s, 0.756500 s, +0.003066 s, × 1.0041, 14 µs/rev
mozilla-try x00000_revs_x_added_0_copies 6a320851d377 1ebb79acd503 : 363753 revs, 18.123103 s, 5.693818 s, -12.429285 s, × 0.3142, 15 µs/rev
mozilla-try x00000_revs_x00000_added_0_copies dc8a3ca7010e d16fde900c9c : 34414 revs, 0.583206 s, 0.590904 s, +0.007698 s, × 1.0132, 17 µs/rev
mozilla-try x00000_revs_x_added_x_copies 5173c4b6f97c 95d83ee7242d : 362229 revs, 17.907312 s, 5.677655 s, -12.229657 s, × 0.3171, 15 µs/rev
mozilla-try x00000_revs_x000_added_x_copies 9126823d0e9c ca82787bb23c : 359344 revs, 17.684797 s, 5.563370 s, -12.121427 s, × 0.3146, 15 µs/rev
mozilla-try x00000_revs_x0000_added_x0000_copies 8d3fafa80d4b eb884023b810 : 192665 revs, 2.881471 s, 2.864099 s, -0.017372 s, × 0.9940, 14 µs/rev
mozilla-try x00000_revs_x00000_added_x0000_copies 1b661134e2ca 1ae03d022d6d : 228985 revs, 101.062002 s, 113.297287 s, +12.235285 s, × 1.1211, 494 µs/rev
mozilla-try x00000_revs_x00000_added_x000_copies 9b2a99adc05e 8e29777b48e6 : 382065 revs, 63.148971 s, 59.498652 s, -3.650319 s, × 0.9422, 155 µs/rev

Differential Revision: https://phab.mercurial-scm.org/D9491

File last commit:

r43083:a4e32fd5 5.1.1 stable
r46744:c94d013e default
decompressor.c
1822 lines | 50.7 KiB | text/x-c | CLexer
/**
 * Copyright (c) 2016-present, Gregory Szorc
 * All rights reserved.
 *
 * This software may be modified and distributed under the terms
 * of the BSD license. See the LICENSE file for details.
 */

#include "python-zstandard.h"
#include "pool.h"

extern PyObject* ZstdError;

/**
 * Ensure the ZSTD_DCtx on a decompressor is initialized and ready for a new operation.
 */
int ensure_dctx(ZstdDecompressor* decompressor, int loadDict) {
    size_t zresult;

    ZSTD_DCtx_reset(decompressor->dctx, ZSTD_reset_session_only);

    if (decompressor->maxWindowSize) {
        zresult = ZSTD_DCtx_setMaxWindowSize(decompressor->dctx, decompressor->maxWindowSize);
        if (ZSTD_isError(zresult)) {
            PyErr_Format(ZstdError, "unable to set max window size: %s",
                ZSTD_getErrorName(zresult));
            return 1;
        }
    }

    zresult = ZSTD_DCtx_setFormat(decompressor->dctx, decompressor->format);
    if (ZSTD_isError(zresult)) {
        PyErr_Format(ZstdError, "unable to set decoding format: %s",
            ZSTD_getErrorName(zresult));
        return 1;
    }

    if (loadDict && decompressor->dict) {
        if (ensure_ddict(decompressor->dict)) {
            return 1;
        }

        zresult = ZSTD_DCtx_refDDict(decompressor->dctx, decompressor->dict->ddict);
        if (ZSTD_isError(zresult)) {
            PyErr_Format(ZstdError, "unable to reference prepared dictionary: %s",
                ZSTD_getErrorName(zresult));
            return 1;
        }
    }

    return 0;
}
PyDoc_STRVAR(Decompressor__doc__,
"ZstdDecompressor(dict_data=None, max_window_size=0, format=FORMAT_ZSTD1)\n"
"\n"
"Create an object used to perform Zstandard decompression.\n"
"\n"
"An instance can perform multiple decompression operations.\n"
);
static int Decompressor_init(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
    static char* kwlist[] = {
        "dict_data",
        "max_window_size",
        "format",
        NULL
    };

    ZstdCompressionDict* dict = NULL;
    Py_ssize_t maxWindowSize = 0;
    ZSTD_format_e format = ZSTD_f_zstd1;

    self->dctx = NULL;
    self->dict = NULL;

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|O!nI:ZstdDecompressor", kwlist,
        &ZstdCompressionDictType, &dict, &maxWindowSize, &format)) {
        return -1;
    }

    self->dctx = ZSTD_createDCtx();
    if (!self->dctx) {
        PyErr_NoMemory();
        goto except;
    }

    self->maxWindowSize = maxWindowSize;
    self->format = format;

    if (dict) {
        self->dict = dict;
        Py_INCREF(dict);
    }

    if (ensure_dctx(self, 1)) {
        goto except;
    }

    return 0;

except:
    Py_CLEAR(self->dict);

    if (self->dctx) {
        ZSTD_freeDCtx(self->dctx);
        self->dctx = NULL;
    }

    return -1;
}
static void Decompressor_dealloc(ZstdDecompressor* self) {
    Py_CLEAR(self->dict);

    if (self->dctx) {
        ZSTD_freeDCtx(self->dctx);
        self->dctx = NULL;
    }

    PyObject_Del(self);
}

PyDoc_STRVAR(Decompressor_memory_size__doc__,
"memory_size() -- Size of decompression context, in bytes\n"
);

static PyObject* Decompressor_memory_size(ZstdDecompressor* self) {
    if (self->dctx) {
        return PyLong_FromSize_t(ZSTD_sizeof_DCtx(self->dctx));
    }
    else {
        PyErr_SetString(ZstdError, "no decompressor context found; this should never happen");
        return NULL;
    }
}
PyDoc_STRVAR(Decompressor_copy_stream__doc__,
"copy_stream(ifh, ofh[, read_size=default, write_size=default]) -- decompress data between streams\n"
"\n"
"Compressed data will be read from ``ifh``, decompressed, and written to\n"
"``ofh``. ``ifh`` must have a ``read(size)`` method. ``ofh`` must have a\n"
"``write(data)`` method.\n"
"\n"
"The optional ``read_size`` and ``write_size`` arguments control the chunk\n"
"size of data that is ``read()`` and ``write()`` between streams. They default\n"
"to the default input and output sizes of zstd decompressor streams.\n"
);
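
/*
 * Illustrative sketch (not part of python-zstandard): the chunking loop behind
 * copy_stream(), modeled in plain C over in-memory buffers so it can be read
 * and tested without Python or libzstd. All sketch_* names are hypothetical.
 * sketch_transform() is an identity stand-in for ZSTD_decompressStream().
 * Like the real call, it consumes from an input buffer and fills a fixed-size
 * output buffer, which the loop flushes to the destination whenever it holds
 * data. Input is taken in read_size pieces and written in pieces of at most
 * write_size, as the docstring above describes.
 */
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef struct {
    const unsigned char* data;
    size_t len;
    size_t pos;
} sketch_source;

typedef struct {
    unsigned char* data;
    size_t cap;
    size_t len;
} sketch_sink;

/* Identity stand-in for ZSTD_decompressStream(): move as many bytes as fit. */
static void sketch_transform(const unsigned char* in, size_t inSize, size_t* inPos,
                             unsigned char* out, size_t outSize, size_t* outPos) {
    size_t n = inSize - *inPos;

    assert(*outPos <= outSize);
    if (n > outSize - *outPos) {
        n = outSize - *outPos;
    }
    memcpy(out + *outPos, in + *inPos, n);
    *inPos += n;
    *outPos += n;
}

/* Returns 0 on success, or -1 on a bad size or a full destination (a "write()" failure). */
static int sketch_copy_stream(sketch_source* src, sketch_sink* dst,
                              size_t readSize, size_t writeSize,
                              size_t* totalRead, size_t* totalWrite) {
    unsigned char readBuf[256];
    unsigned char writeBuf[256];
    size_t outPos = 0;

    *totalRead = 0;
    *totalWrite = 0;
    if (!readSize || !writeSize || readSize > sizeof(readBuf) || writeSize > sizeof(writeBuf)) {
        return -1;
    }

    for (;;) {
        /* Equivalent of read(read_size); an empty result means EOF. */
        size_t got = src->len - src->pos;
        size_t inPos = 0;

        if (got > readSize) {
            got = readSize;
        }
        if (!got) {
            break;
        }
        memcpy(readBuf, src->data + src->pos, got);
        src->pos += got;
        *totalRead += got;

        /* Drain this input chunk, flushing the output buffer each time it has data. */
        while (inPos < got) {
            sketch_transform(readBuf, got, &inPos, writeBuf, writeSize, &outPos);
            if (outPos) {
                if (dst->cap - dst->len < outPos) {
                    return -1;
                }
                memcpy(dst->data + dst->len, writeBuf, outPos);
                dst->len += outPos;
                *totalWrite += outPos;
                outPos = 0;
            }
        }
    }

    return 0;
}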
static PyObject* Decompressor_copy_stream(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
    static char* kwlist[] = {
        "ifh",
        "ofh",
        "read_size",
        "write_size",
        NULL
    };

    PyObject* source;
    PyObject* dest;
    size_t inSize = ZSTD_DStreamInSize();
    size_t outSize = ZSTD_DStreamOutSize();
    ZSTD_inBuffer input;
    ZSTD_outBuffer output;
    Py_ssize_t totalRead = 0;
    Py_ssize_t totalWrite = 0;
    char* readBuffer;
    Py_ssize_t readSize;
    PyObject* readResult = NULL;
    PyObject* res = NULL;
    size_t zresult = 0;
    PyObject* writeResult;
    PyObject* totalReadPy;
    PyObject* totalWritePy;

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|kk:copy_stream", kwlist,
        &source, &dest, &inSize, &outSize)) {
        return NULL;
    }

    if (!PyObject_HasAttrString(source, "read")) {
        PyErr_SetString(PyExc_ValueError, "first argument must have a read() method");
        return NULL;
    }

    if (!PyObject_HasAttrString(dest, "write")) {
        PyErr_SetString(PyExc_ValueError, "second argument must have a write() method");
        return NULL;
    }

    /* Prevent free on uninitialized memory in finally. */
    output.dst = NULL;

    if (ensure_dctx(self, 1)) {
        res = NULL;
        goto finally;
    }

    output.dst = PyMem_Malloc(outSize);
    if (!output.dst) {
        PyErr_NoMemory();
        res = NULL;
        goto finally;
    }
    output.size = outSize;
    output.pos = 0;

    /* Read source stream until EOF */
    while (1) {
        readResult = PyObject_CallMethod(source, "read", "n", inSize);
        if (!readResult) {
            PyErr_SetString(ZstdError, "could not read() from source");
            goto finally;
        }

        /* read() must return bytes; otherwise propagate the TypeError. */
        if (PyBytes_AsStringAndSize(readResult, &readBuffer, &readSize)) {
            goto finally;
        }

        /* If no data was read, we're at EOF. */
        if (0 == readSize) {
            break;
        }

        totalRead += readSize;

        /* Send data to decompressor */
        input.src = readBuffer;
        input.size = readSize;
        input.pos = 0;

        while (input.pos < input.size) {
            Py_BEGIN_ALLOW_THREADS
            zresult = ZSTD_decompressStream(self->dctx, &output, &input);
            Py_END_ALLOW_THREADS

            if (ZSTD_isError(zresult)) {
                PyErr_Format(ZstdError, "zstd decompressor error: %s",
                    ZSTD_getErrorName(zresult));
                res = NULL;
                goto finally;
            }

            if (output.pos) {
#if PY_MAJOR_VERSION >= 3
                writeResult = PyObject_CallMethod(dest, "write", "y#",
#else
                writeResult = PyObject_CallMethod(dest, "write", "s#",
#endif
                    output.dst, output.pos);
                /* Stop on a failed write() instead of silently dropping output. */
                if (!writeResult) {
                    res = NULL;
                    goto finally;
                }
                Py_DECREF(writeResult);
                totalWrite += output.pos;
                output.pos = 0;
            }
        }

        Py_CLEAR(readResult);
    }

    /* Source stream is exhausted. Finish up. */

    totalReadPy = PyLong_FromSsize_t(totalRead);
    totalWritePy = PyLong_FromSsize_t(totalWrite);
    res = PyTuple_Pack(2, totalReadPy, totalWritePy);
    Py_DECREF(totalReadPy);
    Py_DECREF(totalWritePy);

finally:
    if (output.dst) {
        PyMem_Free(output.dst);
    }

    Py_XDECREF(readResult);

    return res;
}
PyDoc_STRVAR(Decompressor_decompress__doc__,
"decompress(data[, max_output_size=0]) -- Decompress data in its entirety\n"
"\n"
"This method will decompress the entirety of the argument and return the\n"
"result.\n"
"\n"
"The input bytes are expected to contain a full Zstandard frame (something\n"
"compressed with ``ZstdCompressor.compress()`` or similar). If the input does\n"
"not contain a full frame, an exception will be raised.\n"
"\n"
"If the frame header of the compressed data does not contain the content size,\n"
"``max_output_size`` must be specified or ``ZstdError`` will be raised. An\n"
"allocation of size ``max_output_size`` will be performed and an attempt will\n"
"be made to perform decompression into that buffer. If the buffer is too\n"
"small, ``ZstdError`` will be raised; if it cannot be allocated, the\n"
"allocation error (typically ``MemoryError``) is raised. The buffer will be\n"
"shrunk to the decompressed size if it is too large.\n"
"\n"
"Uncompressed data could be much larger than compressed data. As a result,\n"
"calling this function could result in a very large memory allocation being\n"
"performed to hold the uncompressed data. Therefore it is **highly**\n"
"recommended to use a streaming decompression method instead of this one.\n"
);
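
/*
 * Illustrative sketch (not part of python-zstandard): how decompress() below
 * chooses its output allocation from the frame header. The zstd and CPython
 * pieces are replaced by plain values so the branches can be tested on their
 * own. SKETCH_CONTENTSIZE_UNKNOWN and SKETCH_CONTENTSIZE_ERROR are hypothetical
 * stand-ins that use the same values as zstd's ZSTD_CONTENTSIZE_UNKNOWN
 * (0ULL - 1) and ZSTD_CONTENTSIZE_ERROR (0ULL - 2). The sizeMax argument plays
 * the role of PY_SSIZE_T_MAX.
 */
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_CONTENTSIZE_UNKNOWN (0ULL - 1)
#define SKETCH_CONTENTSIZE_ERROR (0ULL - 2)

typedef enum {
    SKETCH_ALLOC_OK = 0,     /* allocate *capacity bytes, then decompress */
    SKETCH_ALLOC_EMPTY,      /* empty frame: return b"" immediately */
    SKETCH_ALLOC_BAD_HEADER, /* content size could not be determined */
    SKETCH_ALLOC_NEED_MAX,   /* size unknown and max_output_size == 0 */
    SKETCH_ALLOC_TOO_LARGE   /* recorded size exceeds sizeMax */
} sketch_alloc_result;

/* On SKETCH_ALLOC_OK, *exact is 1 when the header recorded the size (the
 * output must then match it exactly). It is 0 when max_output_size was used
 * (the buffer may be shrunk to the decompressed length afterwards). */
static sketch_alloc_result sketch_choose_capacity(unsigned long long contentSize,
                                                  size_t maxOutputSize,
                                                  unsigned long long sizeMax,
                                                  size_t* capacity, int* exact) {
    assert(sizeMax <= (unsigned long long)SIZE_MAX);
    *capacity = 0;
    *exact = 0;

    if (contentSize == SKETCH_CONTENTSIZE_ERROR) {
        return SKETCH_ALLOC_BAD_HEADER;
    }
    if (contentSize == 0) {
        return SKETCH_ALLOC_EMPTY;
    }
    if (contentSize == SKETCH_CONTENTSIZE_UNKNOWN) {
        if (maxOutputSize == 0) {
            return SKETCH_ALLOC_NEED_MAX;
        }
        *capacity = maxOutputSize;
        return SKETCH_ALLOC_OK;
    }
    if (contentSize > sizeMax) {
        return SKETCH_ALLOC_TOO_LARGE;
    }
    *capacity = (size_t)contentSize;
    *exact = 1;
    return SKETCH_ALLOC_OK;
}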
PyObject* Decompressor_decompress(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
    static char* kwlist[] = {
        "data",
        "max_output_size",
        NULL
    };

    Py_buffer source;
    Py_ssize_t maxOutputSize = 0;
    unsigned long long decompressedSize;
    size_t destCapacity;
    PyObject* result = NULL;
    size_t zresult;
    ZSTD_outBuffer outBuffer;
    ZSTD_inBuffer inBuffer;

#if PY_MAJOR_VERSION >= 3
    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "y*|n:decompress",
#else
    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s*|n:decompress",
#endif
        kwlist, &source, &maxOutputSize)) {
        return NULL;
    }

    if (!PyBuffer_IsContiguous(&source, 'C') || source.ndim > 1) {
        PyErr_SetString(PyExc_ValueError,
            "data buffer should be contiguous and have at most one dimension");
        goto finally;
    }

    if (ensure_dctx(self, 1)) {
        goto finally;
    }

    decompressedSize = ZSTD_getFrameContentSize(source.buf, source.len);

    if (ZSTD_CONTENTSIZE_ERROR == decompressedSize) {
        PyErr_SetString(ZstdError, "error determining content size from frame header");
        goto finally;
    }
    /* Special case of empty frame. */
    else if (0 == decompressedSize) {
        result = PyBytes_FromStringAndSize("", 0);
        goto finally;
    }
    /* Missing content size in frame header. */
    if (ZSTD_CONTENTSIZE_UNKNOWN == decompressedSize) {
        if (0 == maxOutputSize) {
            PyErr_SetString(ZstdError, "could not determine content size in frame header");
            goto finally;
        }

        result = PyBytes_FromStringAndSize(NULL, maxOutputSize);
        destCapacity = maxOutputSize;
        decompressedSize = 0;
    }
    /* Size is recorded in frame header. */
    else {
        assert(SIZE_MAX >= PY_SSIZE_T_MAX);
        if (decompressedSize > PY_SSIZE_T_MAX) {
            PyErr_SetString(ZstdError, "frame is too large to decompress on this platform");
            goto finally;
        }

        result = PyBytes_FromStringAndSize(NULL, (Py_ssize_t)decompressedSize);
        destCapacity = (size_t)decompressedSize;
    }

    if (!result) {
        goto finally;
    }

    outBuffer.dst = PyBytes_AsString(result);
    outBuffer.size = destCapacity;
    outBuffer.pos = 0;

    inBuffer.src = source.buf;
    inBuffer.size = source.len;
    inBuffer.pos = 0;

    Py_BEGIN_ALLOW_THREADS
    zresult = ZSTD_decompressStream(self->dctx, &outBuffer, &inBuffer);
    Py_END_ALLOW_THREADS

    if (ZSTD_isError(zresult)) {
        PyErr_Format(ZstdError, "decompression error: %s", ZSTD_getErrorName(zresult));
        Py_CLEAR(result);
        goto finally;
    }
    else if (zresult) {
        PyErr_Format(ZstdError, "decompression error: did not decompress full frame");
        Py_CLEAR(result);
        goto finally;
    }
    else if (decompressedSize && outBuffer.pos != decompressedSize) {
        /* Report the bytes actually produced; zresult is always 0 here. */
        PyErr_Format(ZstdError, "decompression error: decompressed %zu bytes; expected %llu",
            outBuffer.pos, decompressedSize);
        Py_CLEAR(result);
        goto finally;
    }
    else if (outBuffer.pos < destCapacity) {
        if (safe_pybytes_resize(&result, outBuffer.pos)) {
            Py_CLEAR(result);
            goto finally;
        }
    }

finally:
    PyBuffer_Release(&source);
    return result;
}
PyDoc_STRVAR(Decompressor_decompressobj__doc__,
"decompressobj([write_size=default])\n"
"\n"
"Incrementally feed data into a decompressor.\n"
"\n"
"The returned object exposes a ``decompress(data)`` method. This makes it\n"
"compatible with ``zlib.decompressobj`` and ``bz2.BZ2Decompressor`` so that\n"
"callers can swap in the zstd decompressor while using the same API.\n"
);
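
/*
 * Illustrative sketch (not part of python-zstandard): decompressobj() works
 * because decoder state survives between decompress(data) calls, so the
 * caller may split the input at any byte. To show that contract without
 * libzstd, this sketch uses a hypothetical toy format of (count, byte) pairs
 * instead of zstd frames. sketch_rle_decoder and sketch_rle_decompress are
 * invented for illustration. A pair split across two calls is completed by
 * the second call, because the pending count is kept in the decoder object.
 */
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef struct {
    int havePendingCount;       /* 1 if a count byte arrived without its value byte */
    unsigned char pendingCount;
} sketch_rle_decoder;

/* Decode one piece of input, appending to out (capacity cap, length *outLen).
 * Returns 0 on success, -1 if out is too small. */
static int sketch_rle_decompress(sketch_rle_decoder* d,
                                 const unsigned char* in, size_t inLen,
                                 unsigned char* out, size_t cap, size_t* outLen) {
    size_t i;

    assert(*outLen <= cap);
    for (i = 0; i < inLen; i++) {
        if (!d->havePendingCount) {
            d->pendingCount = in[i];
            d->havePendingCount = 1;
            continue;
        }
        if (cap - *outLen < d->pendingCount) {
            return -1;
        }
        memset(out + *outLen, in[i], d->pendingCount);
        *outLen += d->pendingCount;
        d->havePendingCount = 0;
    }
    return 0;
}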
static ZstdDecompressionObj* Decompressor_decompressobj(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
    static char* kwlist[] = {
        "write_size",
        NULL
    };

    ZstdDecompressionObj* result = NULL;
    size_t outSize = ZSTD_DStreamOutSize();

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|k:decompressobj", kwlist, &outSize)) {
        return NULL;
    }

    if (!outSize) {
        PyErr_SetString(PyExc_ValueError, "write_size must be positive");
        return NULL;
    }

    result = (ZstdDecompressionObj*)PyObject_CallObject((PyObject*)&ZstdDecompressionObjType, NULL);
    if (!result) {
        return NULL;
    }

    if (ensure_dctx(self, 1)) {
        Py_DECREF(result);
        return NULL;
    }

    result->decompressor = self;
    Py_INCREF(result->decompressor);
    result->outSize = outSize;

    return result;
}
PyDoc_STRVAR(Decompressor_read_to_iter__doc__,
"read_to_iter(reader[, read_size=default, write_size=default, skip_bytes=0])\n"
"\n"
"Read compressed data and return an iterator.\n"
"\n"
"Returns an iterator of decompressed data chunks produced from reading from\n"
"the ``reader``.\n"
"\n"
"Compressed data will be obtained from ``reader`` by calling the\n"
"``read(size)`` method of it. The source data will be streamed into a\n"
"decompressor. As decompressed data is available, it will be exposed to the\n"
"returned iterator.\n"
"\n"
"Data is ``read()`` in chunks of size ``read_size`` and exposed to the\n"
"iterator in chunks of size ``write_size``. The default values are the input\n"
"and output sizes for a zstd streaming decompressor.\n"
"\n"
"There is also support for skipping the first ``skip_bytes`` bytes of data\n"
"from the source. ``skip_bytes`` must be smaller than ``read_size``.\n"
);
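
/*
 * Illustrative sketch (not part of python-zstandard): the chunking contract of
 * read_to_iter(), modeled over an in-memory source. An identity transform
 * stands in for zstd decompression, and all sketch_* names are hypothetical.
 * Input is read in read_size pieces, and the first skip_bytes of the source
 * are dropped from the first read, which is why skip_bytes must be smaller
 * than read_size. Each call to the iterator yields at most write_size bytes.
 */
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef struct {
    const unsigned char* src;
    size_t srcLen;
    size_t srcPos;
    size_t readSize;
    size_t writeSize;
    size_t skipBytes;           /* bytes still to drop from the first read */
    unsigned char chunk[256];   /* most recent read(read_size) result */
    size_t chunkLen;
    size_t chunkPos;
} sketch_iter;

/* Returns 0 on success, or -1 on invalid sizes (same skip_bytes rule as read_to_iter()). */
static int sketch_iter_init(sketch_iter* it, const unsigned char* src, size_t srcLen,
                            size_t readSize, size_t writeSize, size_t skipBytes) {
    if (skipBytes >= readSize || readSize > sizeof(it->chunk) || !writeSize) {
        return -1;
    }
    memset(it, 0, sizeof(*it));
    it->src = src;
    it->srcLen = srcLen;
    it->readSize = readSize;
    it->writeSize = writeSize;
    it->skipBytes = skipBytes;
    return 0;
}

/* Copy the next piece (at most writeSize bytes) into out; 0 means exhausted. */
static size_t sketch_iter_next(sketch_iter* it, unsigned char* out) {
    size_t n;

    while (it->chunkPos == it->chunkLen) {
        size_t got = it->srcLen - it->srcPos;

        if (!got) {
            return 0;
        }
        if (got > it->readSize) {
            got = it->readSize;
        }
        memcpy(it->chunk, it->src + it->srcPos, got);
        it->srcPos += got;
        it->chunkLen = got;
        it->chunkPos = 0;
        if (it->skipBytes) {
            it->chunkPos = it->skipBytes < got ? it->skipBytes : got;
            it->skipBytes = 0;
        }
    }

    n = it->chunkLen - it->chunkPos;
    if (n > it->writeSize) {
        n = it->writeSize;
    }
    assert(n > 0);
    memcpy(out, it->chunk + it->chunkPos, n);
    it->chunkPos += n;
    return n;
}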
static ZstdDecompressorIterator* Decompressor_read_to_iter(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
    static char* kwlist[] = {
        "reader",
        "read_size",
        "write_size",
        "skip_bytes",
        NULL
    };

    PyObject* reader;
    size_t inSize = ZSTD_DStreamInSize();
    size_t outSize = ZSTD_DStreamOutSize();
    ZstdDecompressorIterator* result;
    size_t skipBytes = 0;

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|kkk:read_to_iter", kwlist,
        &reader, &inSize, &outSize, &skipBytes)) {
        return NULL;
    }

    if (skipBytes >= inSize) {
        PyErr_SetString(PyExc_ValueError,
            "skip_bytes must be smaller than read_size");
        return NULL;
    }

    result = (ZstdDecompressorIterator*)PyObject_CallObject((PyObject*)&ZstdDecompressorIteratorType, NULL);
    if (!result) {
        return NULL;
    }

    if (PyObject_HasAttrString(reader, "read")) {
        result->reader = reader;
        Py_INCREF(result->reader);
    }
    else if (1 == PyObject_CheckBuffer(reader)) {
        /* Object claims it is a buffer. Try to get a handle to it. */
        if (0 != PyObject_GetBuffer(reader, &result->buffer, PyBUF_CONTIG_RO)) {
            goto except;
        }
    }
    else {
        PyErr_SetString(PyExc_ValueError,
            "must pass an object with a read() method or that conforms to the buffer protocol");
        goto except;
    }

    result->decompressor = self;
    Py_INCREF(result->decompressor);

    result->inSize = inSize;
    result->outSize = outSize;
    result->skipBytes = skipBytes;

    if (ensure_dctx(self, 1)) {
        goto except;
    }

    result->input.src = PyMem_Malloc(inSize);
    if (!result->input.src) {
        PyErr_NoMemory();
        goto except;
    }

    goto finally;

except:
    Py_CLEAR(result);

finally:
    return result;
}
PyDoc_STRVAR(Decompressor_stream_reader__doc__,
"stream_reader(source, [read_size=default, [read_across_frames=False]])\n"
"\n"
"Obtain an object that behaves like an I/O stream that can be used for\n"
"reading decompressed output from an object.\n"
"\n"
"The source object can be any object with a ``read(size)`` method or that\n"
"conforms to the buffer protocol.\n"
"\n"
"``read_across_frames`` controls the behavior of ``read()`` when the end\n"
"of a zstd frame is reached. When ``True``, ``read()`` can potentially\n"
"return data belonging to multiple zstd frames. When ``False``, ``read()``\n"
"will return when the end of a frame is reached.\n"
);
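
/*
 * Illustrative sketch (not part of python-zstandard): the read_across_frames
 * rule described above, with each frame represented by its already-decoded
 * text instead of zstd data. sketch_reader and sketch_reader_read are
 * hypothetical. With readAcrossFrames == 0, a read that reaches the end of a
 * frame returns there, even if more bytes were requested. With it set, one
 * read can return data from several frames.
 */
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef struct {
    const char* const* frames; /* decoded contents of each frame */
    size_t frameCount;
    size_t frame;              /* index of the current frame */
    size_t offset;             /* bytes of the current frame already returned */
    int readAcrossFrames;
} sketch_reader;

/* Copy up to size bytes into out and return the count; 0 means end of input. */
static size_t sketch_reader_read(sketch_reader* r, char* out, size_t size) {
    size_t total = 0;

    while (total < size && r->frame < r->frameCount) {
        const char* f = r->frames[r->frame];
        size_t frameLen = strlen(f);
        size_t n = frameLen - r->offset;

        assert(r->offset <= frameLen);
        if (n > size - total) {
            n = size - total;
        }
        memcpy(out + total, f + r->offset, n);
        total += n;
        r->offset += n;

        if (r->offset == frameLen) {
            /* End of this frame: advance, and stop unless reading across frames. */
            r->frame++;
            r->offset = 0;
            if (!r->readAcrossFrames && total > 0) {
                break;
            }
        }
    }

    return total;
}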
static ZstdDecompressionReader* Decompressor_stream_reader(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
    static char* kwlist[] = {
        "source",
        "read_size",
        "read_across_frames",
        NULL
    };

    PyObject* source;
    size_t readSize = ZSTD_DStreamInSize();
    PyObject* readAcrossFrames = NULL;
    ZstdDecompressionReader* result;

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|kO:stream_reader", kwlist,
        &source, &readSize, &readAcrossFrames)) {
        return NULL;
    }

    if (ensure_dctx(self, 1)) {
        return NULL;
    }

    result = (ZstdDecompressionReader*)PyObject_CallObject((PyObject*)&ZstdDecompressionReaderType, NULL);
    if (NULL == result) {
        return NULL;
    }

    if (PyObject_HasAttrString(source, "read")) {
        result->reader = source;
        Py_INCREF(source);
        result->readSize = readSize;
    }
    else if (1 == PyObject_CheckBuffer(source)) {
        if (0 != PyObject_GetBuffer(source, &result->buffer, PyBUF_CONTIG_RO)) {
            Py_CLEAR(result);
            return NULL;
        }
    }
    else {
        PyErr_SetString(PyExc_TypeError,
            "must pass an object with a read() method or that conforms to the buffer protocol");
        Py_CLEAR(result);
        return NULL;
    }

    result->decompressor = self;
    Py_INCREF(self);
    result->readAcrossFrames = readAcrossFrames ? PyObject_IsTrue(readAcrossFrames) : 0;

    return result;
}
PyDoc_STRVAR(Decompressor_stream_writer__doc__,
"stream_writer(writer[, write_size=default, write_return_read=False])\n"
"\n"
"Create a context manager to write decompressed data to an object.\n"
"\n"
"The passed object must have a ``write()`` method.\n"
"\n"
"The caller feeds input data to the object by calling ``write(data)``.\n"
"Decompressed data is written to the passed object as it is decompressed.\n"
"\n"
"An optional ``write_size`` argument defines the size of chunks to\n"
"``write()`` to the writer. It defaults to the default output size for a zstd\n"
"streaming decompressor.\n"
);
static ZstdDecompressionWriter* Decompressor_stream_writer(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
    static char* kwlist[] = {
        "writer",
        "write_size",
        "write_return_read",
        NULL
    };

    PyObject* writer;
    size_t outSize = ZSTD_DStreamOutSize();
    PyObject* writeReturnRead = NULL;
    ZstdDecompressionWriter* result;

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|kO:stream_writer", kwlist,
        &writer, &outSize, &writeReturnRead)) {
        return NULL;
    }

    if (!PyObject_HasAttrString(writer, "write")) {
        PyErr_SetString(PyExc_ValueError, "must pass an object with a write() method");
        return NULL;
    }

    if (ensure_dctx(self, 1)) {
        return NULL;
    }

    result = (ZstdDecompressionWriter*)PyObject_CallObject((PyObject*)&ZstdDecompressionWriterType, NULL);
    if (!result) {
        return NULL;
    }

    result->decompressor = self;
    Py_INCREF(result->decompressor);

    result->writer = writer;
    Py_INCREF(result->writer);

    result->outSize = outSize;
    result->writeReturnRead = writeReturnRead ? PyObject_IsTrue(writeReturnRead) : 0;

    return result;
}
PyDoc_STRVAR(Decompressor_decompress_content_dict_chain__doc__,
"Decompress a series of chunks using the content dictionary chaining technique\n"
);
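
/*
 * Illustrative sketch (not part of python-zstandard): the two-buffer strategy
 * used by decompress_content_dict_chain() below. The zstd step is replaced by
 * a hypothetical delta format so the sketch runs without libzstd. Each
 * sketch_chunk reuses the first copyLen bytes of the previous fulltext (the
 * stand-in for decoding against it as a prefix dictionary) and appends lit.
 * Chunk 0 has no previous fulltext, so its copyLen must be 0. The buffers
 * alternate by parity, and each one is only ever grown with realloc(), never
 * shrunk.
 */
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

typedef struct {
    size_t copyLen;  /* leading bytes reused from the previous fulltext */
    const char* lit; /* NUL-terminated bytes appended after them */
} sketch_chunk;

/* Returns the final fulltext as a malloc'd, NUL-terminated string (caller
 * frees) and stores its length in *outLen, or returns NULL on error. */
static char* sketch_chain(const sketch_chunk* chunks, size_t count, size_t* outLen) {
    char* bufs[2] = { NULL, NULL };
    size_t caps[2] = { 0, 0 };
    size_t lens[2] = { 0, 0 };
    char* result = NULL;
    size_t i;
    int last;

    if (!count) {
        return NULL;
    }

    for (i = 0; i < count; i++) {
        int dst = (int)(i % 2); /* buffer receiving this fulltext */
        int dict = 1 - dst;     /* buffer holding the previous fulltext */
        size_t litLen = strlen(chunks[i].lit);
        size_t need = chunks[i].copyLen + litLen;

        if (chunks[i].copyLen > lens[dict]) {
            goto done; /* refers to bytes the previous fulltext does not have */
        }
        if (need > caps[dst]) {
            char* grown = realloc(bufs[dst], need);
            if (!grown) {
                goto done;
            }
            bufs[dst] = grown;
            caps[dst] = need;
        }
        if (chunks[i].copyLen) {
            memcpy(bufs[dst], bufs[dict], chunks[i].copyLen);
        }
        if (litLen) {
            memcpy(bufs[dst] + chunks[i].copyLen, chunks[i].lit, litLen);
        }
        lens[dst] = need;
        assert(lens[dst] <= caps[dst]);
    }

    last = (int)((count - 1) % 2);
    result = malloc(lens[last] + 1);
    if (result) {
        if (lens[last]) {
            memcpy(result, bufs[last], lens[last]);
        }
        result[lens[last]] = '\0';
        *outLen = lens[last];
    }

done:
    free(bufs[0]);
    free(bufs[1]);
    return result;
}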
static PyObject* Decompressor_decompress_content_dict_chain(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
    static char* kwlist[] = {
        "frames",
        NULL
    };

    PyObject* chunks;
    Py_ssize_t chunksLen;
    Py_ssize_t chunkIndex;
    char parity = 0;
    PyObject* chunk;
    char* chunkData;
    Py_ssize_t chunkSize;
    size_t zresult;
    ZSTD_frameHeader frameHeader;
    void* buffer1 = NULL;
    size_t buffer1Size = 0;
    size_t buffer1ContentSize = 0;
    void* buffer2 = NULL;
    size_t buffer2Size = 0;
    size_t buffer2ContentSize = 0;
    void* destBuffer = NULL;
    PyObject* result = NULL;
    ZSTD_outBuffer outBuffer;
    ZSTD_inBuffer inBuffer;

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!:decompress_content_dict_chain",
        kwlist, &PyList_Type, &chunks)) {
        return NULL;
    }

    chunksLen = PyList_Size(chunks);
    if (!chunksLen) {
        PyErr_SetString(PyExc_ValueError, "empty input chain");
        return NULL;
    }

    /* The first chunk should not be using a dictionary. We handle it specially. */
    chunk = PyList_GetItem(chunks, 0);
    if (!PyBytes_Check(chunk)) {
        PyErr_SetString(PyExc_ValueError, "chunk 0 must be bytes");
        return NULL;
    }

    /* We require that all chunks be zstd frames and that they have content size set. */
    PyBytes_AsStringAndSize(chunk, &chunkData, &chunkSize);
    zresult = ZSTD_getFrameHeader(&frameHeader, (void*)chunkData, chunkSize);
    if (ZSTD_isError(zresult)) {
        PyErr_SetString(PyExc_ValueError, "chunk 0 is not a valid zstd frame");
        return NULL;
    }
    else if (zresult) {
        PyErr_SetString(PyExc_ValueError, "chunk 0 is too small to contain a zstd frame");
        return NULL;
    }

    if (ZSTD_CONTENTSIZE_UNKNOWN == frameHeader.frameContentSize) {
        PyErr_SetString(PyExc_ValueError, "chunk 0 missing content size in frame");
        return NULL;
    }

    assert(ZSTD_CONTENTSIZE_ERROR != frameHeader.frameContentSize);

    /* We check against PY_SSIZE_T_MAX here because we ultimately cast the
     * result to a Python object and its length can be no greater than
     * Py_ssize_t. In theory, we could have an intermediate frame that is
     * larger. But a) it is unclear why this API would be used for frames
     * that large and b) it isn't worth the complexity to support. */
    assert(SIZE_MAX >= PY_SSIZE_T_MAX);
    if (frameHeader.frameContentSize > PY_SSIZE_T_MAX) {
        PyErr_SetString(PyExc_ValueError,
            "chunk 0 is too large to decompress on this platform");
        return NULL;
    }

    if (ensure_dctx(self, 0)) {
        goto finally;
    }

    buffer1Size = (size_t)frameHeader.frameContentSize;
    buffer1 = PyMem_Malloc(buffer1Size);
    if (!buffer1) {
        PyErr_NoMemory();
        goto finally;
    }

    outBuffer.dst = buffer1;
    outBuffer.size = buffer1Size;
    outBuffer.pos = 0;

    inBuffer.src = chunkData;
    inBuffer.size = chunkSize;
    inBuffer.pos = 0;

    Py_BEGIN_ALLOW_THREADS
    zresult = ZSTD_decompressStream(self->dctx, &outBuffer, &inBuffer);
    Py_END_ALLOW_THREADS
    if (ZSTD_isError(zresult)) {
        PyErr_Format(ZstdError, "could not decompress chunk 0: %s", ZSTD_getErrorName(zresult));
        goto finally;
    }
    else if (zresult) {
        PyErr_Format(ZstdError, "chunk 0 did not decompress full frame");
        goto finally;
    }

    buffer1ContentSize = outBuffer.pos;

    /* Special case of a simple chain. */
    if (1 == chunksLen) {
        result = PyBytes_FromStringAndSize(buffer1, buffer1ContentSize);
        goto finally;
    }

    /* This should ideally look at next chunk. But this is slightly simpler. */
    buffer2Size = (size_t)frameHeader.frameContentSize;
    buffer2 = PyMem_Malloc(buffer2Size);
    if (!buffer2) {
        PyErr_NoMemory();
        goto finally;
    }

    /* For each subsequent chunk, use the previous fulltext as a content dictionary.
       Our strategy is to have 2 buffers. One holds the previous fulltext (to be
       used as a content dictionary) and the other holds the new fulltext. The
       buffers grow when needed but never decrease in size. This limits the
       memory allocator overhead.
    */
    for (chunkIndex = 1; chunkIndex < chunksLen; chunkIndex++) {
        chunk = PyList_GetItem(chunks, chunkIndex);
        if (!PyBytes_Check(chunk)) {
            PyErr_Format(PyExc_ValueError, "chunk %zd must be bytes", chunkIndex);
            goto finally;
        }

        PyBytes_AsStringAndSize(chunk, &chunkData, &chunkSize);
        zresult = ZSTD_getFrameHeader(&frameHeader, (void*)chunkData, chunkSize);
        if (ZSTD_isError(zresult)) {
            PyErr_Format(PyExc_ValueError, "chunk %zd is not a valid zstd frame", chunkIndex);
            goto finally;
        }
        else if (zresult) {
            PyErr_Format(PyExc_ValueError, "chunk %zd is too small to contain a zstd frame", chunkIndex);
            goto finally;
        }

        if (ZSTD_CONTENTSIZE_UNKNOWN == frameHeader.frameContentSize) {
            PyErr_Format(PyExc_ValueError, "chunk %zd missing content size in frame", chunkIndex);
            goto finally;
        }

        assert(ZSTD_CONTENTSIZE_ERROR != frameHeader.frameContentSize);

        if (frameHeader.frameContentSize > PY_SSIZE_T_MAX) {
            PyErr_Format(PyExc_ValueError,
                "chunk %zd is too large to decompress on this platform", chunkIndex);
            goto finally;
        }

        inBuffer.src = chunkData;
        inBuffer.size = chunkSize;
        inBuffer.pos = 0;

        parity = chunkIndex % 2;

        /* This could definitely be abstracted to reduce code duplication. */
        if (parity) {
            /* Resize destination buffer to hold larger content. */
            if (buffer2Size < frameHeader.frameContentSize) {
                buffer2Size = (size_t)frameHeader.frameContentSize;
                destBuffer = PyMem_Realloc(buffer2, buffer2Size);
                if (!destBuffer) {
                    PyErr_NoMemory();
                    goto finally;
                }
                buffer2 = destBuffer;
            }

            Py_BEGIN_ALLOW_THREADS
            zresult = ZSTD_DCtx_refPrefix_advanced(self->dctx,
                buffer1, buffer1ContentSize, ZSTD_dct_rawContent);
            Py_END_ALLOW_THREADS
            if (ZSTD_isError(zresult)) {
                PyErr_Format(ZstdError,
                    "failed to load prefix dictionary at chunk %zd", chunkIndex);
                goto finally;
            }

            outBuffer.dst = buffer2;
            outBuffer.size = buffer2Size;
            outBuffer.pos = 0;

            Py_BEGIN_ALLOW_THREADS
            zresult = ZSTD_decompressStream(self->dctx, &outBuffer, &inBuffer);
            Py_END_ALLOW_THREADS
            if (ZSTD_isError(zresult)) {
                PyErr_Format(ZstdError, "could not decompress chunk %zd: %s",
                    chunkIndex, ZSTD_getErrorName(zresult));
                goto finally;
            }
            else if (zresult) {
                PyErr_Format(ZstdError, "chunk %zd did not decompress full frame",
                    chunkIndex);
                goto finally;
            }

            buffer2ContentSize = outBuffer.pos;
        }
        else {
            if (buffer1Size < frameHeader.frameContentSize) {
                buffer1Size = (size_t)frameHeader.frameContentSize;
                destBuffer = PyMem_Realloc(buffer1, buffer1Size);
                if (!destBuffer) {
                    PyErr_NoMemory();
                    goto finally;
                }
                buffer1 = destBuffer;
            }

            Py_BEGIN_ALLOW_THREADS
            zresult = ZSTD_DCtx_refPrefix_advanced(self->dctx,
                buffer2, buffer2ContentSize, ZSTD_dct_rawContent);
            Py_END_ALLOW_THREADS
            if (ZSTD_isError(zresult)) {
                PyErr_Format(ZstdError,
                    "failed to load prefix dictionary at chunk %zd", chunkIndex);
                goto finally;
            }

            outBuffer.dst = buffer1;
            outBuffer.size = buffer1Size;
            outBuffer.pos = 0;

            Py_BEGIN_ALLOW_THREADS
            zresult = ZSTD_decompressStream(self->dctx, &outBuffer, &inBuffer);
            Py_END_ALLOW_THREADS
            if (ZSTD_isError(zresult)) {
                PyErr_Format(ZstdError, "could not decompress chunk %zd: %s",
                    chunkIndex, ZSTD_getErrorName(zresult));
                goto finally;
            }
            else if (zresult) {
                PyErr_Format(ZstdError, "chunk %zd did not decompress full frame",
                    chunkIndex);
                goto finally;
            }

            buffer1ContentSize = outBuffer.pos;
        }
    }

    result = PyBytes_FromStringAndSize(parity ? buffer2 : buffer1,
        parity ? buffer2ContentSize : buffer1ContentSize);

finally:
    if (buffer2) {
        PyMem_Free(buffer2);
}
if (buffer1) {
PyMem_Free(buffer1);
}
return result;
}
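/*
Illustrative sketch (not part of python-zstandard): the two-buffer strategy used by
decompress_content_dict_chain() above, with zstd replaced by a hypothetical toy decoder.
In the toy, each chunk's bytes are simply appended to the previous fulltext. ToyChunk and
toy_chain_apply() are made-up names. Only the buffer handling is meant to match: the
destination alternates with chunk parity, the other buffer supplies the previous fulltext,
and buffers only ever grow.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

// Hypothetical chunk type for the toy decoder (not a zstd frame).
typedef struct {
    const char *data;
    size_t size;
} ToyChunk;

// Rebuilds the final fulltext of a chain using two grow-only buffers.
// Toy rule (assumption): fulltext[i] = fulltext[i - 1] + chunk[i]; the real code
// instead decompresses chunk i with fulltext[i - 1] as a raw-content prefix.
// Returns a malloc()ed buffer (caller frees) and stores its length in *outSize,
// or returns NULL for an empty chain or on allocation failure.
char *toy_chain_apply(const ToyChunk *chunks, size_t count, size_t *outSize) {
    char *buffers[2] = {NULL, NULL};
    size_t capacity[2] = {0, 0};
    size_t content[2] = {0, 0};
    int parity = 0;
    size_t i;
    char *result;

    if (count == 0) {
        return NULL;
    }
    // Chunk 0 is the base fulltext and lives in buffers[0].
    buffers[0] = malloc(chunks[0].size ? chunks[0].size : 1);
    if (!buffers[0]) {
        return NULL;
    }
    memcpy(buffers[0], chunks[0].data, chunks[0].size);
    capacity[0] = content[0] = chunks[0].size;

    for (i = 1; i < count; i++) {
        // Odd chunks are written to buffers[1] and even chunks to buffers[0];
        // the other buffer holds the previous fulltext (the "dictionary").
        int dst = (int)(i % 2);
        int src = 1 - dst;
        size_t needed = content[src] + chunks[i].size;
        assert(content[src] <= capacity[src]);
        // Grow the destination when needed; buffers never shrink.
        if (buffers[dst] == NULL || capacity[dst] < needed) {
            size_t newCapacity = needed ? needed : 1;
            char *grown = realloc(buffers[dst], newCapacity);
            if (!grown) {
                free(buffers[0]);
                free(buffers[1]);
                return NULL;
            }
            buffers[dst] = grown;
            capacity[dst] = newCapacity;
        }
        memcpy(buffers[dst], buffers[src], content[src]);
        memcpy(buffers[dst] + content[src], chunks[i].data, chunks[i].size);
        content[dst] = needed;
        parity = dst;
    }
    // The buffer written last holds the final fulltext; free the other one.
    result = buffers[parity];
    *outSize = content[parity];
    free(buffers[1 - parity]);
    return result;
}
```

Because the final fulltext lives in whichever buffer the last chunk was written to, the
result is selected by the parity of the last chunk index, as the real function does when
building its return value.
*/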
typedef struct {
void* sourceData;
size_t sourceSize;
size_t destSize;
} FramePointer;
typedef struct {
FramePointer* frames;
Py_ssize_t framesSize;
unsigned long long compressedSize;
} FrameSources;
typedef struct {
void* dest;
Py_ssize_t destSize;
BufferSegment* segments;
Py_ssize_t segmentsSize;
} DestBuffer;
typedef enum {
WorkerError_none = 0,
WorkerError_zstd = 1,
WorkerError_memory = 2,
WorkerError_sizeMismatch = 3,
WorkerError_unknownSize = 4,
} WorkerError;
typedef struct {
/* Source records and length */
FramePointer* framePointers;
/* Which records to process. */
Py_ssize_t startOffset;
Py_ssize_t endOffset;
unsigned long long totalSourceSize;
/* Decompression state and settings. */
ZSTD_DCtx* dctx;
int requireOutputSizes;
/* Output storage. */
DestBuffer* destBuffers;
Py_ssize_t destCount;
/* Item that error occurred on. */
Py_ssize_t errorOffset;
/* If an error occurred. */
WorkerError error;
/* result from zstd decompression operation */
size_t zresult;
} WorkerState;
static void decompress_worker(WorkerState* state) {
size_t allocationSize;
DestBuffer* destBuffer;
Py_ssize_t frameIndex;
Py_ssize_t localOffset = 0;
Py_ssize_t currentBufferStartIndex = state->startOffset;
Py_ssize_t remainingItems = state->endOffset - state->startOffset + 1;
void* tmpBuf;
Py_ssize_t destOffset = 0;
FramePointer* framePointers = state->framePointers;
size_t zresult;
unsigned long long totalOutputSize = 0;
assert(NULL == state->destBuffers);
assert(0 == state->destCount);
assert(state->endOffset - state->startOffset >= 0);
/* We could get here due to the way work is allocated. Ideally we wouldn't
get here. But that would require a bit of a refactor in the caller. */
if (state->totalSourceSize > SIZE_MAX) {
state->error = WorkerError_memory;
state->errorOffset = 0;
return;
}
/*
* We need to allocate a buffer to hold decompressed data. How we do this
* depends on what we know about the output. The following scenarios are
* possible:
*
* 1. All structs defining frames declare the output size.
* 2. The decompressed size is embedded within the zstd frame.
* 3. The decompressed size is not stored anywhere.
*
* For now, we only support #1 and #2.
*/
/* Resolve the output size of each frame. */
for (frameIndex = state->startOffset; frameIndex <= state->endOffset; frameIndex++) {
FramePointer* fp = &framePointers[frameIndex];
unsigned long long decompressedSize;
if (0 == fp->destSize) {
decompressedSize = ZSTD_getFrameContentSize(fp->sourceData, fp->sourceSize);
if (ZSTD_CONTENTSIZE_ERROR == decompressedSize) {
state->error = WorkerError_unknownSize;
state->errorOffset = frameIndex;
return;
}
else if (ZSTD_CONTENTSIZE_UNKNOWN == decompressedSize) {
if (state->requireOutputSizes) {
state->error = WorkerError_unknownSize;
state->errorOffset = frameIndex;
return;
}
/* This will fail the assert for .destSize > 0 below. */
decompressedSize = 0;
}
if (decompressedSize > SIZE_MAX) {
state->error = WorkerError_memory;
state->errorOffset = frameIndex;
return;
}
fp->destSize = (size_t)decompressedSize;
}
totalOutputSize += fp->destSize;
}
state->destBuffers = calloc(1, sizeof(DestBuffer));
if (NULL == state->destBuffers) {
state->error = WorkerError_memory;
return;
}
state->destCount = 1;
destBuffer = &state->destBuffers[state->destCount - 1];
assert(framePointers[state->startOffset].destSize > 0); /* For now. */
allocationSize = roundpow2((size_t)state->totalSourceSize);
if (framePointers[state->startOffset].destSize > allocationSize) {
allocationSize = roundpow2(framePointers[state->startOffset].destSize);
}
destBuffer->dest = malloc(allocationSize);
if (NULL == destBuffer->dest) {
state->error = WorkerError_memory;
return;
}
destBuffer->destSize = allocationSize;
destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment));
if (NULL == destBuffer->segments) {
/* Caller will free destBuffer->dest as part of cleanup. */
state->error = WorkerError_memory;
return;
}
destBuffer->segmentsSize = remainingItems;
for (frameIndex = state->startOffset; frameIndex <= state->endOffset; frameIndex++) {
ZSTD_outBuffer outBuffer;
ZSTD_inBuffer inBuffer;
const void* source = framePointers[frameIndex].sourceData;
const size_t sourceSize = framePointers[frameIndex].sourceSize;
void* dest;
const size_t decompressedSize = framePointers[frameIndex].destSize;
size_t destAvailable = destBuffer->destSize - destOffset;
assert(decompressedSize > 0); /* For now. */
/*
* Not enough space in current buffer. Finish the current buffer, then
* allocate and switch to a new one.
*/
if (decompressedSize > destAvailable) {
/*
* Shrinking the destination buffer is optional. But it should be cheap,
* so we just do it.
*/
if (destAvailable) {
tmpBuf = realloc(destBuffer->dest, destOffset);
if (NULL == tmpBuf) {
state->error = WorkerError_memory;
return;
}
destBuffer->dest = tmpBuf;
destBuffer->destSize = destOffset;
}
/* Truncate segments buffer. */
tmpBuf = realloc(destBuffer->segments,
(frameIndex - currentBufferStartIndex) * sizeof(BufferSegment));
if (NULL == tmpBuf) {
state->error = WorkerError_memory;
return;
}
destBuffer->segments = tmpBuf;
destBuffer->segmentsSize = frameIndex - currentBufferStartIndex;
/* Grow space for new DestBuffer. */
tmpBuf = realloc(state->destBuffers, (state->destCount + 1) * sizeof(DestBuffer));
if (NULL == tmpBuf) {
state->error = WorkerError_memory;
return;
}
state->destBuffers = tmpBuf;
state->destCount++;
destBuffer = &state->destBuffers[state->destCount - 1];
/* Don't take any chances with non-NULL pointers. */
memset(destBuffer, 0, sizeof(DestBuffer));
allocationSize = roundpow2((size_t)state->totalSourceSize);
if (decompressedSize > allocationSize) {
allocationSize = roundpow2(decompressedSize);
}
destBuffer->dest = malloc(allocationSize);
if (NULL == destBuffer->dest) {
state->error = WorkerError_memory;
return;
}
destBuffer->destSize = allocationSize;
destAvailable = allocationSize;
destOffset = 0;
localOffset = 0;
destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment));
if (NULL == destBuffer->segments) {
state->error = WorkerError_memory;
return;
}
destBuffer->segmentsSize = remainingItems;
currentBufferStartIndex = frameIndex;
}
dest = (char*)destBuffer->dest + destOffset;
outBuffer.dst = dest;
outBuffer.size = decompressedSize;
outBuffer.pos = 0;
inBuffer.src = source;
inBuffer.size = sourceSize;
inBuffer.pos = 0;
zresult = ZSTD_decompressStream(state->dctx, &outBuffer, &inBuffer);
if (ZSTD_isError(zresult)) {
state->error = WorkerError_zstd;
state->zresult = zresult;
state->errorOffset = frameIndex;
return;
}
else if (zresult || outBuffer.pos != decompressedSize) {
state->error = WorkerError_sizeMismatch;
state->zresult = outBuffer.pos;
state->errorOffset = frameIndex;
return;
}
destBuffer->segments[localOffset].offset = destOffset;
destBuffer->segments[localOffset].length = outBuffer.pos;
destOffset += outBuffer.pos;
localOffset++;
remainingItems--;
}
if (destBuffer->destSize > destOffset) {
tmpBuf = realloc(destBuffer->dest, destOffset);
if (NULL == tmpBuf) {
state->error = WorkerError_memory;
return;
}
destBuffer->dest = tmpBuf;
destBuffer->destSize = destOffset;
}
}
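/*
Illustrative sketch (hypothetical names, not part of python-zstandard): a simulation of
how decompress_worker() above packs decompressed frames into destination buffers. No zstd
calls are made; only the sizing and offset bookkeeping is reproduced. sketch_roundpow2()
is a stand-in for the module's roundpow2() helper, whose definition is not shown here and
is assumed to round up to a power of two.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

// Hypothetical stand-in for roundpow2(): smallest power of two >= n (1 for n == 0).
// Stops doubling before overflow, so for huge n it may return a value below n.
static size_t sketch_roundpow2(size_t n) {
    size_t p = 1;
    while (p < n && p <= SIZE_MAX / 2) {
        p <<= 1;
    }
    return p;
}

// Each new buffer gets roundpow2(totalSourceSize) bytes, or roundpow2(frame size)
// when the frame is larger. When the next frame does not fit in the remaining space,
// a new buffer is started. For every frame, records the buffer it lands in and its
// offset within that buffer. Returns the number of buffers used.
size_t sketch_plan_buffers(const size_t *frameSizes, size_t count,
                           size_t totalSourceSize,
                           size_t *bufferIndex, size_t *offsets) {
    size_t buffers = 0;
    size_t capacity = 0;
    size_t used = 0;
    size_t i;

    for (i = 0; i < count; i++) {
        size_t size = frameSizes[i];
        assert(size > 0); // the worker asserts this too ("For now.")
        if (buffers == 0 || size > capacity - used) {
            capacity = sketch_roundpow2(totalSourceSize);
            if (size > capacity) {
                capacity = sketch_roundpow2(size);
            }
            buffers++;
            used = 0;
        }
        bufferIndex[i] = buffers - 1;
        offsets[i] = used;
        used += size;
    }
    return buffers;
}
```

With frame sizes {3, 3, 3} and 8 bytes of compressed input, the first 8-byte buffer holds
frames 0 and 1 at offsets 0 and 3, and frame 2 starts a second buffer. The real worker
also shrinks each finished buffer to the bytes actually used.
*/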
ZstdBufferWithSegmentsCollection* decompress_from_framesources(ZstdDecompressor* decompressor, FrameSources* frames,
Py_ssize_t threadCount) {
Py_ssize_t i = 0;
int errored = 0;
Py_ssize_t segmentsCount;
ZstdBufferWithSegments* bws = NULL;
PyObject* resultArg = NULL;
Py_ssize_t resultIndex;
ZstdBufferWithSegmentsCollection* result = NULL;
FramePointer* framePointers = frames->frames;
unsigned long long workerBytes = 0;
Py_ssize_t currentThread = 0;
Py_ssize_t workerStartOffset = 0;
POOL_ctx* pool = NULL;
WorkerState* workerStates = NULL;
unsigned long long bytesPerWorker;
/* Caller should normalize 0 and negative values to 1 or larger. */
assert(threadCount >= 1);
/* More threads than inputs makes no sense under any conditions. */
threadCount = frames->framesSize < threadCount ? frames->framesSize
: threadCount;
/* TODO lower thread count if input size is too small and threads would just
add overhead. */
if (decompressor->dict) {
if (ensure_ddict(decompressor->dict)) {
return NULL;
}
}
/* If threadCount==1, we don't start a thread pool. But we do leverage the
same API for dispatching work. */
workerStates = PyMem_Malloc(threadCount * sizeof(WorkerState));
if (NULL == workerStates) {
PyErr_NoMemory();
goto finally;
}
memset(workerStates, 0, threadCount * sizeof(WorkerState));
if (threadCount > 1) {
pool = POOL_create(threadCount, 1);
if (NULL == pool) {
PyErr_SetString(ZstdError, "could not initialize zstd thread pool");
goto finally;
}
}
bytesPerWorker = frames->compressedSize / threadCount;
if (bytesPerWorker > SIZE_MAX) {
PyErr_SetString(ZstdError, "too much data per worker for this platform");
goto finally;
}
for (i = 0; i < threadCount; i++) {
size_t zresult;
workerStates[i].dctx = ZSTD_createDCtx();
if (NULL == workerStates[i].dctx) {
PyErr_NoMemory();
goto finally;
}
ZSTD_copyDCtx(workerStates[i].dctx, decompressor->dctx);
if (decompressor->dict) {
zresult = ZSTD_DCtx_refDDict(workerStates[i].dctx, decompressor->dict->ddict);
if (ZSTD_isError(zresult)) {
PyErr_Format(ZstdError, "unable to reference prepared dictionary: %s",
ZSTD_getErrorName(zresult));
goto finally;
}
}
workerStates[i].framePointers = framePointers;
workerStates[i].requireOutputSizes = 1;
}
Py_BEGIN_ALLOW_THREADS
/* There are many ways to split work among workers.
For now, we take a simple approach of splitting work so each worker
gets roughly the same number of input bytes. This will result in more
starvation than running N>threadCount jobs. But it avoids complications
around state tracking, which could involve extra locking.
*/
for (i = 0; i < frames->framesSize; i++) {
workerBytes += frames->frames[i].sourceSize;
/*
* The last worker/thread needs to handle all remaining work. Don't
* trigger it prematurely. Defer to the block outside of the loop.
* (But still run this loop body so workerBytes is correct.)
*/
if (currentThread == threadCount - 1) {
continue;
}
if (workerBytes >= bytesPerWorker) {
workerStates[currentThread].startOffset = workerStartOffset;
workerStates[currentThread].endOffset = i;
workerStates[currentThread].totalSourceSize = workerBytes;
if (threadCount > 1) {
POOL_add(pool, (POOL_function)decompress_worker, &workerStates[currentThread]);
}
else {
decompress_worker(&workerStates[currentThread]);
}
currentThread++;
workerStartOffset = i + 1;
workerBytes = 0;
}
}
if (workerBytes) {
workerStates[currentThread].startOffset = workerStartOffset;
workerStates[currentThread].endOffset = frames->framesSize - 1;
workerStates[currentThread].totalSourceSize = workerBytes;
if (threadCount > 1) {
POOL_add(pool, (POOL_function)decompress_worker, &workerStates[currentThread]);
}
else {
decompress_worker(&workerStates[currentThread]);
}
}
if (threadCount > 1) {
POOL_free(pool);
pool = NULL;
}
Py_END_ALLOW_THREADS
for (i = 0; i < threadCount; i++) {
switch (workerStates[i].error) {
case WorkerError_none:
break;
case WorkerError_zstd:
PyErr_Format(ZstdError, "error decompressing item %zd: %s",
workerStates[i].errorOffset, ZSTD_getErrorName(workerStates[i].zresult));
errored = 1;
break;
case WorkerError_memory:
PyErr_NoMemory();
errored = 1;
break;
case WorkerError_sizeMismatch:
PyErr_Format(ZstdError, "error decompressing item %zd: decompressed %zu bytes; expected %zu",
workerStates[i].errorOffset, workerStates[i].zresult,
framePointers[workerStates[i].errorOffset].destSize);
errored = 1;
break;
case WorkerError_unknownSize:
PyErr_Format(PyExc_ValueError, "could not determine decompressed size of item %zd",
workerStates[i].errorOffset);
errored = 1;
break;
default:
PyErr_Format(ZstdError, "unhandled error type: %d; this is a bug",
workerStates[i].error);
errored = 1;
break;
}
if (errored) {
break;
}
}
if (errored) {
goto finally;
}
segmentsCount = 0;
for (i = 0; i < threadCount; i++) {
segmentsCount += workerStates[i].destCount;
}
resultArg = PyTuple_New(segmentsCount);
if (NULL == resultArg) {
goto finally;
}
resultIndex = 0;
for (i = 0; i < threadCount; i++) {
Py_ssize_t bufferIndex;
WorkerState* state = &workerStates[i];
for (bufferIndex = 0; bufferIndex < state->destCount; bufferIndex++) {
DestBuffer* destBuffer = &state->destBuffers[bufferIndex];
bws = BufferWithSegments_FromMemory(destBuffer->dest, destBuffer->destSize,
destBuffer->segments, destBuffer->segmentsSize);
if (NULL == bws) {
goto finally;
}
/*
* Memory for buffer and segments was allocated using malloc() in worker
* and the memory is transferred to the BufferWithSegments instance. So
* tell instance to use free() and NULL the reference in the state struct
* so it isn't freed below.
*/
bws->useFree = 1;
destBuffer->dest = NULL;
destBuffer->segments = NULL;
PyTuple_SET_ITEM(resultArg, resultIndex++, (PyObject*)bws);
}
}
result = (ZstdBufferWithSegmentsCollection*)PyObject_CallObject(
(PyObject*)&ZstdBufferWithSegmentsCollectionType, resultArg);
finally:
Py_CLEAR(resultArg);
if (workerStates) {
for (i = 0; i < threadCount; i++) {
Py_ssize_t bufferIndex;
WorkerState* state = &workerStates[i];
if (state->dctx) {
ZSTD_freeDCtx(state->dctx);
}
for (bufferIndex = 0; bufferIndex < state->destCount; bufferIndex++) {
if (state->destBuffers) {
/*
* Will be NULL if memory was transferred to a BufferWithSegments.
* Otherwise it is left over after an error occurred.
*/
free(state->destBuffers[bufferIndex].dest);
free(state->destBuffers[bufferIndex].segments);
}
}
free(state->destBuffers);
}
PyMem_Free(workerStates);
}
POOL_free(pool);
return result;
}
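/*
Illustrative sketch (hypothetical names, not part of python-zstandard): the byte-based
work split performed by decompress_from_framesources() above, without the thread pool.
Each worker except the last takes consecutive frames until its compressed byte count
reaches total / workers, and the last worker takes everything that remains. start[w] and
end[w] are inclusive frame indices. The sketch also returns early for an empty frame list
so that it never divides by zero.

```c
#include <assert.h>
#include <stddef.h>

// Returns the number of workers that actually received work.
size_t sketch_split_work(const size_t *sourceSizes, size_t count,
                         size_t threads, size_t *start, size_t *end) {
    unsigned long long total = 0;
    unsigned long long bytes = 0;
    unsigned long long perWorker;
    size_t current = 0;
    size_t workerStart = 0;
    size_t i;

    assert(threads >= 1);
    // More workers than frames makes no sense.
    if (count < threads) {
        threads = count;
    }
    if (threads == 0) {
        return 0; // no frames, no work
    }
    for (i = 0; i < count; i++) {
        total += sourceSizes[i];
    }
    perWorker = total / threads;

    for (i = 0; i < count; i++) {
        bytes += sourceSizes[i];
        // The last worker is filled after the loop.
        if (current == threads - 1) {
            continue;
        }
        if (bytes >= perWorker) {
            start[current] = workerStart;
            end[current] = i;
            current++;
            workerStart = i + 1;
            bytes = 0;
        }
    }
    // As in the original, a final worker is dispatched only if bytes remain.
    if (bytes) {
        start[current] = workerStart;
        end[current] = count - 1;
        current++;
    }
    return current;
}
```

For compressed sizes {30, 5, 5} and two workers the target is 20 bytes, so worker 0 takes
only frame 0 and worker 1 takes frames 1 and 2. Splitting by input bytes balances
compressed input, not necessarily decompression work.
*/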
PyDoc_STRVAR(Decompressor_multi_decompress_to_buffer__doc__,
"Decompress multiple frames to output buffers\n"
"\n"
"Receives a ``BufferWithSegments``, a ``BufferWithSegmentsCollection`` or a\n"
"list of bytes-like objects. Each item in the passed collection should be a\n"
"compressed zstd frame.\n"
"\n"
"Unless ``decompressed_sizes`` is specified, the content size *must* be\n"
"written into the zstd frame header. If ``decompressed_sizes`` is specified,\n"
"it is an object conforming to the buffer protocol that represents an array\n"
"of 64-bit unsigned integers in the machine's native format. Specifying\n"
"``decompressed_sizes`` avoids a pre-scan of each frame to determine its\n"
"output size.\n"
"\n"
"Returns a ``BufferWithSegmentsCollection`` containing the decompressed\n"
"data. Decompressed data is written into one or more contiguous memory\n"
"buffers (at least one per worker thread that received work). Each\n"
"``BufferWithSegments`` instance in the collection tracks which objects are\n"
"at which offsets and their respective lengths.\n"
"\n"
"The ``threads`` argument controls how many threads to use for operations.\n"
"Negative values will use the same number of threads as logical CPUs on the\n"
"machine.\n"
);
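/*
Illustrative sketch (hypothetical names, not part of python-zstandard): the argument
normalization described in the docstring above and performed by
Decompressor_multi_decompress_to_buffer(). cpuCount stands in for the module's
cpu_count(), and ptrdiff_t replaces Py_ssize_t to keep the example free of Python headers.

```c
#include <assert.h>
#include <stddef.h>

// Negative values mean "one thread per logical CPU"; anything below 2 means a
// single thread, in which case no thread pool is created.
int sketch_normalize_threads(int requested, int cpuCount) {
    int threads = requested < 0 ? cpuCount : requested;
    return threads < 2 ? 1 : threads;
}

// decompressed_sizes must hold exactly one native 64-bit unsigned integer per
// frame, so its length in *bytes* must be frameCount * sizeof(unsigned long long).
int sketch_sizes_buffer_ok(ptrdiff_t bufferLen, ptrdiff_t frameCount) {
    assert(frameCount >= 0);
    return bufferLen == frameCount * (ptrdiff_t)sizeof(unsigned long long);
}
```

The same byte-length comparison applies whether the frames arrive as a list, a
``BufferWithSegments`` or a ``BufferWithSegmentsCollection``.
*/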
static ZstdBufferWithSegmentsCollection* Decompressor_multi_decompress_to_buffer(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
static char* kwlist[] = {
"frames",
"decompressed_sizes",
"threads",
NULL
};
PyObject* frames;
Py_buffer frameSizes;
int threads = 0;
Py_ssize_t frameCount;
Py_buffer* frameBuffers = NULL;
FramePointer* framePointers = NULL;
unsigned long long* frameSizesP = NULL;
unsigned long long totalInputSize = 0;
FrameSources frameSources;
ZstdBufferWithSegmentsCollection* result = NULL;
Py_ssize_t i;
memset(&frameSizes, 0, sizeof(frameSizes));
#if PY_MAJOR_VERSION >= 3
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|y*i:multi_decompress_to_buffer",
#else
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|s*i:multi_decompress_to_buffer",
#endif
kwlist, &frames, &frameSizes, &threads)) {
return NULL;
}
if (frameSizes.buf) {
if (!PyBuffer_IsContiguous(&frameSizes, 'C') || frameSizes.ndim > 1) {
PyErr_SetString(PyExc_ValueError, "decompressed_sizes buffer should be contiguous and have a single dimension");
goto finally;
}
frameSizesP = (unsigned long long*)frameSizes.buf;
}
if (threads < 0) {
threads = cpu_count();
}
if (threads < 2) {
threads = 1;
}
if (PyObject_TypeCheck(frames, &ZstdBufferWithSegmentsType)) {
ZstdBufferWithSegments* buffer = (ZstdBufferWithSegments*)frames;
frameCount = buffer->segmentCount;
if (frameSizes.buf && frameSizes.len != frameCount * (Py_ssize_t)sizeof(unsigned long long)) {
PyErr_Format(PyExc_ValueError, "decompressed_sizes size mismatch; expected %zd, got %zd",
frameCount * (Py_ssize_t)sizeof(unsigned long long), frameSizes.len);
goto finally;
}
framePointers = PyMem_Malloc(frameCount * sizeof(FramePointer));
if (!framePointers) {
PyErr_NoMemory();
goto finally;
}
for (i = 0; i < frameCount; i++) {
void* sourceData;
unsigned long long sourceSize;
unsigned long long decompressedSize = 0;
if (buffer->segments[i].offset + buffer->segments[i].length > buffer->dataSize) {
PyErr_Format(PyExc_ValueError, "item %zd has offset outside memory area", i);
goto finally;
}
sourceData = (char*)buffer->data + buffer->segments[i].offset;
sourceSize = buffer->segments[i].length;
totalInputSize += sourceSize;
if (frameSizesP) {
decompressedSize = frameSizesP[i];
}
if (sourceSize > SIZE_MAX) {
PyErr_Format(PyExc_ValueError,
"item %zd is too large for this platform", i);
goto finally;
}
if (decompressedSize > SIZE_MAX) {
PyErr_Format(PyExc_ValueError,
"decompressed size of item %zd is too large for this platform", i);
goto finally;
}
framePointers[i].sourceData = sourceData;
framePointers[i].sourceSize = (size_t)sourceSize;
framePointers[i].destSize = (size_t)decompressedSize;
}
}
else if (PyObject_TypeCheck(frames, &ZstdBufferWithSegmentsCollectionType)) {
Py_ssize_t offset = 0;
ZstdBufferWithSegments* buffer;
ZstdBufferWithSegmentsCollection* collection = (ZstdBufferWithSegmentsCollection*)frames;
frameCount = BufferWithSegmentsCollection_length(collection);
if (frameSizes.buf && frameSizes.len != frameCount * (Py_ssize_t)sizeof(unsigned long long)) {
PyErr_Format(PyExc_ValueError,
"decompressed_sizes size mismatch; expected %zd, got %zd",
frameCount * (Py_ssize_t)sizeof(unsigned long long), frameSizes.len);
goto finally;
}
framePointers = PyMem_Malloc(frameCount * sizeof(FramePointer));
if (NULL == framePointers) {
PyErr_NoMemory();
goto finally;
}
/* Iterate the data structure directly because it is faster. */
for (i = 0; i < collection->bufferCount; i++) {
Py_ssize_t segmentIndex;
buffer = collection->buffers[i];
for (segmentIndex = 0; segmentIndex < buffer->segmentCount; segmentIndex++) {
unsigned long long decompressedSize = frameSizesP ? frameSizesP[offset] : 0;
if (buffer->segments[segmentIndex].offset + buffer->segments[segmentIndex].length > buffer->dataSize) {
PyErr_Format(PyExc_ValueError, "item %zd has offset outside memory area",
offset);
goto finally;
}
if (buffer->segments[segmentIndex].length > SIZE_MAX) {
PyErr_Format(PyExc_ValueError,
"item %zd in buffer %zd is too large for this platform",
segmentIndex, i);
goto finally;
}
if (decompressedSize > SIZE_MAX) {
PyErr_Format(PyExc_ValueError,
"decompressed size of item %zd in buffer %zd is too large for this platform",
segmentIndex, i);
goto finally;
}
totalInputSize += buffer->segments[segmentIndex].length;
framePointers[offset].sourceData = (char*)buffer->data + buffer->segments[segmentIndex].offset;
framePointers[offset].sourceSize = (size_t)buffer->segments[segmentIndex].length;
framePointers[offset].destSize = (size_t)decompressedSize;
offset++;
}
}
}
else if (PyList_Check(frames)) {
frameCount = PyList_GET_SIZE(frames);
if (frameSizes.buf && frameSizes.len != frameCount * (Py_ssize_t)sizeof(unsigned long long)) {
PyErr_Format(PyExc_ValueError, "decompressed_sizes size mismatch; expected %zd, got %zd",
frameCount * (Py_ssize_t)sizeof(unsigned long long), frameSizes.len);
goto finally;
}
framePointers = PyMem_Malloc(frameCount * sizeof(FramePointer));
if (!framePointers) {
PyErr_NoMemory();
goto finally;
}
frameBuffers = PyMem_Malloc(frameCount * sizeof(Py_buffer));
if (NULL == frameBuffers) {
PyErr_NoMemory();
goto finally;
}
memset(frameBuffers, 0, frameCount * sizeof(Py_buffer));
/* Do a pass to assemble info about our input buffers and output sizes. */
for (i = 0; i < frameCount; i++) {
unsigned long long decompressedSize = frameSizesP ? frameSizesP[i] : 0;
if (0 != PyObject_GetBuffer(PyList_GET_ITEM(frames, i),
&frameBuffers[i], PyBUF_CONTIG_RO)) {
PyErr_Clear();
PyErr_Format(PyExc_TypeError, "item %zd not a bytes like object", i);
goto finally;
}
if (decompressedSize > SIZE_MAX) {
PyErr_Format(PyExc_ValueError,
"decompressed size of item %zd is too large for this platform", i);
goto finally;
}
totalInputSize += frameBuffers[i].len;
framePointers[i].sourceData = frameBuffers[i].buf;
framePointers[i].sourceSize = frameBuffers[i].len;
framePointers[i].destSize = (size_t)decompressedSize;
}
}
else {
PyErr_SetString(PyExc_TypeError, "argument must be list or BufferWithSegments");
goto finally;
}
/* We now have an array with info about our inputs and outputs. Feed it into
our generic decompression function. */
frameSources.frames = framePointers;
frameSources.framesSize = frameCount;
frameSources.compressedSize = totalInputSize;
result = decompress_from_framesources(self, &frameSources, threads);
finally:
if (frameSizes.buf) {
PyBuffer_Release(&frameSizes);
}
PyMem_Free(framePointers);
if (frameBuffers) {
for (i = 0; i < frameCount; i++) {
PyBuffer_Release(&frameBuffers[i]);
}
PyMem_Free(frameBuffers);
}
return result;
}
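/*
Illustrative sketch (hypothetical, simplified structs): how the
BufferWithSegmentsCollection branch of Decompressor_multi_decompress_to_buffer() flattens
every segment of every buffer into one frame array with a bounds check. SketchSegment,
SketchBuffer and SketchFrame are simplified stand-ins for BufferSegment,
ZstdBufferWithSegments and FramePointer, not their real definitions.

```c
#include <assert.h>
#include <limits.h>
#include <stddef.h>

typedef struct {
    unsigned long long offset;
    unsigned long long length;
} SketchSegment;

typedef struct {
    const char *data;
    unsigned long long dataSize;
    const SketchSegment *segments;
    size_t segmentCount;
} SketchBuffer;

typedef struct {
    const void *sourceData;
    size_t sourceSize;
} SketchFrame;

// Writes one frame per segment, in order, into `frames` (which must have room for
// the total segment count). Returns the number of frames written, or -1 with
// *badItem set to the flat index of the first segment that ends past its buffer.
long sketch_flatten(const SketchBuffer *buffers, size_t bufferCount,
                    SketchFrame *frames, size_t *badItem) {
    size_t flat = 0;
    size_t b, s;

    for (b = 0; b < bufferCount; b++) {
        const SketchBuffer *buf = &buffers[b];
        for (s = 0; s < buf->segmentCount; s++) {
            const SketchSegment *seg = &buf->segments[s];
            // Subtraction form: cannot overflow, unlike offset + length.
            if (seg->offset > buf->dataSize ||
                seg->length > buf->dataSize - seg->offset) {
                *badItem = flat;
                return -1;
            }
            frames[flat].sourceData = buf->data + seg->offset;
            frames[flat].sourceSize = (size_t)seg->length;
            flat++;
        }
    }
    assert(flat <= (size_t)LONG_MAX);
    return (long)flat;
}
```

The check offset > dataSize || length > dataSize - offset accepts and rejects exactly the
same segments as offset + length > dataSize, except that it still rejects a segment when
the addition would overflow.
*/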
static PyMethodDef Decompressor_methods[] = {
{ "copy_stream", (PyCFunction)Decompressor_copy_stream, METH_VARARGS | METH_KEYWORDS,
Decompressor_copy_stream__doc__ },
{ "decompress", (PyCFunction)Decompressor_decompress, METH_VARARGS | METH_KEYWORDS,
Decompressor_decompress__doc__ },
{ "decompressobj", (PyCFunction)Decompressor_decompressobj, METH_VARARGS | METH_KEYWORDS,
Decompressor_decompressobj__doc__ },
{ "read_to_iter", (PyCFunction)Decompressor_read_to_iter, METH_VARARGS | METH_KEYWORDS,
Decompressor_read_to_iter__doc__ },
/* TODO Remove deprecated API */
{ "read_from", (PyCFunction)Decompressor_read_to_iter, METH_VARARGS | METH_KEYWORDS,
Decompressor_read_to_iter__doc__ },
{ "stream_reader", (PyCFunction)Decompressor_stream_reader,
METH_VARARGS | METH_KEYWORDS, Decompressor_stream_reader__doc__ },
{ "stream_writer", (PyCFunction)Decompressor_stream_writer, METH_VARARGS | METH_KEYWORDS,
Decompressor_stream_writer__doc__ },
/* TODO remove deprecated API */
{ "write_to", (PyCFunction)Decompressor_stream_writer, METH_VARARGS | METH_KEYWORDS,
Decompressor_stream_writer__doc__ },
{ "decompress_content_dict_chain", (PyCFunction)Decompressor_decompress_content_dict_chain,
METH_VARARGS | METH_KEYWORDS, Decompressor_decompress_content_dict_chain__doc__ },
{ "multi_decompress_to_buffer", (PyCFunction)Decompressor_multi_decompress_to_buffer,
METH_VARARGS | METH_KEYWORDS, Decompressor_multi_decompress_to_buffer__doc__ },
{ "memory_size", (PyCFunction)Decompressor_memory_size, METH_NOARGS,
Decompressor_memory_size__doc__ },
{ NULL, NULL }
};
PyTypeObject ZstdDecompressorType = {
PyVarObject_HEAD_INIT(NULL, 0)
"zstd.ZstdDecompressor", /* tp_name */
sizeof(ZstdDecompressor), /* tp_basicsize */
0, /* tp_itemsize */
(destructor)Decompressor_dealloc, /* tp_dealloc */
0, /* tp_print */
0, /* tp_getattr */
0, /* tp_setattr */
0, /* tp_compare */
0, /* tp_repr */
0, /* tp_as_number */
0, /* tp_as_sequence */
0, /* tp_as_mapping */
0, /* tp_hash */
0, /* tp_call */
0, /* tp_str */
0, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
Decompressor__doc__, /* tp_doc */
0, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
Decompressor_methods, /* tp_methods */
0, /* tp_members */
0, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
(initproc)Decompressor_init, /* tp_init */
0, /* tp_alloc */
PyType_GenericNew, /* tp_new */
};
void decompressor_module_init(PyObject* mod) {
#if PY_VERSION_HEX >= 0x030900A4
Py_SET_TYPE(&ZstdDecompressorType, &PyType_Type);
#else
Py_TYPE(&ZstdDecompressorType) = &PyType_Type;
#endif
if (PyType_Ready(&ZstdDecompressorType) < 0) {
return;
}
Py_INCREF((PyObject*)&ZstdDecompressorType);
PyModule_AddObject(mod, "ZstdDecompressor",
(PyObject*)&ZstdDecompressorType);
}