##// END OF EJS Templates
wireprotov2peer: stream decoded responses...
wireprotov2peer: stream decoded responses Previously, wire protocol version 2 would buffer all response data. Only once all data was received did we CBOR decode it and resolve the future associated with the command. This was obviously not desirable. In future commits that introduce large response payloads, this caused significant memory bloat and slowed down client operations due to waiting on the server. This commit refactors the response handling code so that response data can be streamed. Command response objects now contain a buffered CBOR decoder. As new data arrives, it is fed into the decoder. Decoded objects are made available to the generator as they are decoded. Because there is a separate thread processing incoming frames and feeding data into the response object, there is the potential for race conditions when mutating response objects. So a lock has been added to guard access to critical state variables. Because the generator emitting decoded objects needs to wait on those objects to become available, we've added an Event for the generator to wait on so it doesn't busy loop. This does mean there is the potential for deadlocks. And I'm pretty sure they can occur in some scenarios. We already have a handful of TODOs around this. But I've added some more. Fixing this will likely require moving the background thread receiving frames into clienthandler. We likely would have done this anyway when implementing the client bits for the SSH transport. Test output changes because the initial CBOR map holding the overall response state is now always handled internally by the response object. Differential Revision: https://phab.mercurial-scm.org/D4474

File last commit:

r37513:b1fb341d default
r39597:d06834e0 default
Show More
decompressor.c
1803 lines | 50.0 KiB | text/x-c | CLexer
/**
* Copyright (c) 2016-present, Gregory Szorc
* All rights reserved.
*
* This software may be modified and distributed under the terms
* of the BSD license. See the LICENSE file for details.
*/
#include "python-zstandard.h"
#include "pool.h"
extern PyObject* ZstdError;
/**
 * Reset the ZSTD_DCtx owned by a decompressor and re-apply its stored
 * settings so the context is ready for a new operation.
 *
 * The context is reset first. The max window size is only applied when it is
 * non-zero; the frame format is always applied; the dictionary is referenced
 * only when ``loadDict`` is true and the decompressor has a dictionary.
 *
 * Returns 0 on success and 1 on failure. A ``ZstdError`` is raised for
 * failures of the zstd calls made directly here; ``ensure_ddict()`` is
 * presumed to raise its own exception when it fails.
 */
int ensure_dctx(ZstdDecompressor* decompressor, int loadDict) {
    size_t rc;

    ZSTD_DCtx_reset(decompressor->dctx);

    if (0 != decompressor->maxWindowSize) {
        rc = ZSTD_DCtx_setMaxWindowSize(decompressor->dctx, decompressor->maxWindowSize);
        if (ZSTD_isError(rc)) {
            PyErr_Format(ZstdError, "unable to set max window size: %s",
                ZSTD_getErrorName(rc));
            return 1;
        }
    }

    rc = ZSTD_DCtx_setFormat(decompressor->dctx, decompressor->format);
    if (ZSTD_isError(rc)) {
        PyErr_Format(ZstdError, "unable to set decoding format: %s",
            ZSTD_getErrorName(rc));
        return 1;
    }

    /* Nothing else to do unless a dictionary was requested and is present. */
    if (!loadDict || NULL == decompressor->dict) {
        return 0;
    }

    if (0 != ensure_ddict(decompressor->dict)) {
        return 1;
    }

    rc = ZSTD_DCtx_refDDict(decompressor->dctx, decompressor->dict->ddict);
    if (ZSTD_isError(rc)) {
        PyErr_Format(ZstdError, "unable to reference prepared dictionary: %s",
            ZSTD_getErrorName(rc));
        return 1;
    }

    return 0;
}
PyDoc_STRVAR(Decompressor__doc__,
"ZstdDecompressor(dict_data=None)\n"
"\n"
"Create an object used to perform Zstandard decompression.\n"
"\n"
"An instance can perform multiple decompression operations."
);

/**
 * Initializer for ZstdDecompressor instances.
 *
 * Accepts the optional keyword arguments ``dict_data`` (a
 * ZstdCompressionDict), ``max_window_size`` and ``format``. A new ZSTD_DCtx
 * is created, the settings are stored on ``self``, a reference to the
 * dictionary (if any) is taken, and ``ensure_dctx()`` is run once so that
 * invalid settings are reported at construction time.
 *
 * Returns 0 on success. Returns -1 with a Python exception set on failure,
 * in which case the dictionary reference and the context are released.
 */
static int Decompressor_init(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
    static char* kwlist[] = {
        "dict_data",
        "max_window_size",
        "format",
        NULL
    };

    ZstdCompressionDict* dict = NULL;
    /*
     * The "I" format unit stores its result through an ``unsigned int*``.
     * Parse into variables of exactly that type and widen/convert afterwards;
     * handing it a ``size_t*`` only fills part of the object on LP64
     * platforms (and the wrong part on big-endian ones).
     */
    unsigned int maxWindowSize = 0;
    unsigned int format = ZSTD_f_zstd1;

    self->dctx = NULL;
    self->dict = NULL;

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|O!II:ZstdDecompressor", kwlist,
        &ZstdCompressionDictType, &dict, &maxWindowSize, &format)) {
        return -1;
    }

    self->dctx = ZSTD_createDCtx();
    if (!self->dctx) {
        PyErr_NoMemory();
        goto except;
    }

    self->maxWindowSize = maxWindowSize;
    self->format = (ZSTD_format_e)format;

    if (dict) {
        self->dict = dict;
        Py_INCREF(dict);
    }

    /* Validate the settings against a live context right away. */
    if (ensure_dctx(self, 1)) {
        goto except;
    }

    return 0;

except:
    Py_CLEAR(self->dict);

    if (self->dctx) {
        ZSTD_freeDCtx(self->dctx);
        self->dctx = NULL;
    }

    return -1;
}
/**
 * Deallocator for ZstdDecompressor: drops the dictionary reference, frees
 * the zstd decompression context when one exists, then releases the object.
 */
static void Decompressor_dealloc(ZstdDecompressor* self) {
    ZSTD_DCtx* ctx;

    Py_CLEAR(self->dict);

    /* Detach the context from the object before freeing it. */
    ctx = self->dctx;
    if (NULL != ctx) {
        self->dctx = NULL;
        ZSTD_freeDCtx(ctx);
    }

    PyObject_Del(self);
}
PyDoc_STRVAR(Decompressor_memory_size__doc__,
"memory_size() -- Size of decompression context, in bytes\n"
);

/**
 * Return the size reported by ZSTD_sizeof_DCtx() for this decompressor's
 * context as a Python integer. Raises ``ZstdError`` and returns NULL if the
 * decompressor has no context.
 */
static PyObject* Decompressor_memory_size(ZstdDecompressor* self) {
    if (NULL == self->dctx) {
        PyErr_SetString(ZstdError, "no decompressor context found; this should never happen");
        return NULL;
    }

    return PyLong_FromSize_t(ZSTD_sizeof_DCtx(self->dctx));
}
PyDoc_STRVAR(Decompressor_copy_stream__doc__,
"copy_stream(ifh, ofh[, read_size=default, write_size=default]) -- decompress data between streams\n"
"\n"
"Compressed data will be read from ``ifh``, decompressed, and written to\n"
"``ofh``. ``ifh`` must have a ``read(size)`` method. ``ofh`` must have a\n"
"``write(data)`` method.\n"
"\n"
"The optional ``read_size`` and ``write_size`` arguments control the chunk\n"
"size of data that is ``read()`` and ``write()`` between streams. They default\n"
"to the default input and output sizes of zstd decompressor streams.\n"
);

/**
 * Implementation of ZstdDecompressor.copy_stream().
 *
 * Repeatedly calls ``source.read(read_size)`` until it returns an empty
 * bytes object, feeds each chunk through the decompressor and passes every
 * non-empty batch of output to ``dest.write()``.
 *
 * Returns a new 2-tuple ``(bytes_read, bytes_written)`` on success, or NULL
 * with an exception set on failure:
 *  - ValueError if ``source`` lacks ``read`` or ``dest`` lacks ``write``.
 *  - ZstdError if ``read()`` raises or zstd reports a decompression error.
 *  - Whatever ``PyBytes_AsStringAndSize()`` raises if ``read()`` returns a
 *    non-bytes object.
 *  - Whatever ``dest.write()`` raises; the exception is propagated as-is.
 */
static PyObject* Decompressor_copy_stream(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
    static char* kwlist[] = {
        "ifh",
        "ofh",
        "read_size",
        "write_size",
        NULL
    };

    PyObject* source;
    PyObject* dest;
    size_t inSize = ZSTD_DStreamInSize();
    size_t outSize = ZSTD_DStreamOutSize();
    ZSTD_inBuffer input;
    ZSTD_outBuffer output;
    Py_ssize_t totalRead = 0;
    Py_ssize_t totalWrite = 0;
    char* readBuffer;
    Py_ssize_t readSize;
    PyObject* readResult = NULL;
    PyObject* res = NULL;
    size_t zresult = 0;
    PyObject* writeResult;
    PyObject* totalReadPy;
    PyObject* totalWritePy;

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|kk:copy_stream", kwlist,
        &source, &dest, &inSize, &outSize)) {
        return NULL;
    }

    if (!PyObject_HasAttrString(source, "read")) {
        PyErr_SetString(PyExc_ValueError, "first argument must have a read() method");
        return NULL;
    }

    if (!PyObject_HasAttrString(dest, "write")) {
        PyErr_SetString(PyExc_ValueError, "second argument must have a write() method");
        return NULL;
    }

    /* Prevent free on uninitialized memory in finally. */
    output.dst = NULL;

    if (ensure_dctx(self, 1)) {
        res = NULL;
        goto finally;
    }

    output.dst = PyMem_Malloc(outSize);
    if (!output.dst) {
        PyErr_NoMemory();
        res = NULL;
        goto finally;
    }
    output.size = outSize;
    output.pos = 0;

    /* Read source stream until EOF */
    while (1) {
        readResult = PyObject_CallMethod(source, "read", "n", inSize);
        if (!readResult) {
            PyErr_SetString(ZstdError, "could not read() from source");
            goto finally;
        }

        /*
         * read() may hand back something that is not bytes. In that case the
         * out-parameters are not filled in, so bail out with the exception
         * set by PyBytes_AsStringAndSize() instead of using garbage.
         */
        if (PyBytes_AsStringAndSize(readResult, &readBuffer, &readSize) < 0) {
            goto finally;
        }

        /* If no data was read, we're at EOF. */
        if (0 == readSize) {
            break;
        }

        totalRead += readSize;

        /* Send data to decompressor */
        input.src = readBuffer;
        input.size = readSize;
        input.pos = 0;

        while (input.pos < input.size) {
            Py_BEGIN_ALLOW_THREADS
            zresult = ZSTD_decompress_generic(self->dctx, &output, &input);
            Py_END_ALLOW_THREADS

            if (ZSTD_isError(zresult)) {
                PyErr_Format(ZstdError, "zstd decompressor error: %s",
                    ZSTD_getErrorName(zresult));
                res = NULL;
                goto finally;
            }

            if (output.pos) {
#if PY_MAJOR_VERSION >= 3
                writeResult = PyObject_CallMethod(dest, "write", "y#",
#else
                writeResult = PyObject_CallMethod(dest, "write", "s#",
#endif
                    output.dst, output.pos);

                /* Propagate a failed write() rather than carrying on with a
                   pending exception. */
                if (NULL == writeResult) {
                    goto finally;
                }
                Py_DECREF(writeResult);

                totalWrite += output.pos;
                output.pos = 0;
            }
        }

        Py_CLEAR(readResult);
    }

    /* Source stream is exhausted. Finish up. */
    totalReadPy = PyLong_FromSsize_t(totalRead);
    totalWritePy = PyLong_FromSsize_t(totalWrite);

    /* Either conversion can fail; only build the tuple when both worked. */
    if (totalReadPy && totalWritePy) {
        res = PyTuple_Pack(2, totalReadPy, totalWritePy);
    }

    Py_XDECREF(totalReadPy);
    Py_XDECREF(totalWritePy);

finally:
    if (output.dst) {
        PyMem_Free(output.dst);
    }

    Py_XDECREF(readResult);

    return res;
}
PyDoc_STRVAR(Decompressor_decompress__doc__,
"decompress(data[, max_output_size=None]) -- Decompress data in its entirety\n"
"\n"
"This method will decompress the entirety of the argument and return the\n"
"result.\n"
"\n"
"The input bytes are expected to contain a full Zstandard frame (something\n"
"compressed with ``ZstdCompressor.compress()`` or similar). If the input does\n"
"not contain a full frame, an exception will be raised.\n"
"\n"
"If the frame header of the compressed data does not contain the content size\n"
"``max_output_size`` must be specified or ``ZstdError`` will be raised. An\n"
"allocation of size ``max_output_size`` will be performed and an attempt will\n"
"be made to perform decompression into that buffer. If the buffer is too\n"
"small or cannot be allocated, ``ZstdError`` will be raised. The buffer will\n"
"be resized if it is too large.\n"
"\n"
"Uncompressed data could be much larger than compressed data. As a result,\n"
"calling this function could result in a very large memory allocation being\n"
"performed to hold the uncompressed data. Therefore it is **highly**\n"
"recommended to use a streaming decompression method instead of this one.\n"
);

/**
 * Implementation of ZstdDecompressor.decompress().
 *
 * Decompresses a single complete frame held in a contiguous buffer with at
 * most one dimension. The output size comes from the frame header when
 * present, otherwise from ``max_output_size``.
 *
 * Returns a new bytes object, or NULL with an exception set:
 *  - ValueError for a non-contiguous or multi-dimensional buffer.
 *  - ZstdError when the content size cannot be determined, the frame is too
 *    large for this platform, zstd reports an error, the frame was not
 *    fully consumed, or the produced size differs from the header's size.
 *
 * The source buffer is always released before returning once argument
 * parsing has succeeded.
 */
PyObject* Decompressor_decompress(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
    static char* kwlist[] = {
        "data",
        "max_output_size",
        NULL
    };

    Py_buffer source;
    Py_ssize_t maxOutputSize = 0;
    unsigned long long decompressedSize;
    size_t destCapacity;
    PyObject* result = NULL;
    size_t zresult;
    ZSTD_outBuffer outBuffer;
    ZSTD_inBuffer inBuffer;

#if PY_MAJOR_VERSION >= 3
    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "y*|n:decompress",
#else
    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s*|n:decompress",
#endif
        kwlist, &source, &maxOutputSize)) {
        return NULL;
    }

    if (!PyBuffer_IsContiguous(&source, 'C') || source.ndim > 1) {
        PyErr_SetString(PyExc_ValueError,
            "data buffer should be contiguous and have at most one dimension");
        goto finally;
    }

    if (ensure_dctx(self, 1)) {
        goto finally;
    }

    decompressedSize = ZSTD_getFrameContentSize(source.buf, source.len);

    if (ZSTD_CONTENTSIZE_ERROR == decompressedSize) {
        PyErr_SetString(ZstdError, "error determining content size from frame header");
        goto finally;
    }
    /* Special case of empty frame. */
    else if (0 == decompressedSize) {
        result = PyBytes_FromStringAndSize("", 0);
        goto finally;
    }
    /* Missing content size in frame header. */
    if (ZSTD_CONTENTSIZE_UNKNOWN == decompressedSize) {
        if (0 == maxOutputSize) {
            PyErr_SetString(ZstdError, "could not determine content size in frame header");
            goto finally;
        }

        result = PyBytes_FromStringAndSize(NULL, maxOutputSize);
        destCapacity = maxOutputSize;
        /* 0 marks "no expected size" for the size check further down. */
        decompressedSize = 0;
    }
    /* Size is recorded in frame header. */
    else {
        assert(SIZE_MAX >= PY_SSIZE_T_MAX);
        if (decompressedSize > PY_SSIZE_T_MAX) {
            PyErr_SetString(ZstdError, "frame is too large to decompress on this platform");
            goto finally;
        }

        result = PyBytes_FromStringAndSize(NULL, (Py_ssize_t)decompressedSize);
        destCapacity = (size_t)decompressedSize;
    }

    if (!result) {
        goto finally;
    }

    outBuffer.dst = PyBytes_AsString(result);
    outBuffer.size = destCapacity;
    outBuffer.pos = 0;

    inBuffer.src = source.buf;
    inBuffer.size = source.len;
    inBuffer.pos = 0;

    Py_BEGIN_ALLOW_THREADS
    zresult = ZSTD_decompress_generic(self->dctx, &outBuffer, &inBuffer);
    Py_END_ALLOW_THREADS

    if (ZSTD_isError(zresult)) {
        PyErr_Format(ZstdError, "decompression error: %s", ZSTD_getErrorName(zresult));
        Py_CLEAR(result);
        goto finally;
    }
    else if (zresult) {
        PyErr_Format(ZstdError, "decompression error: did not decompress full frame");
        Py_CLEAR(result);
        goto finally;
    }
    else if (decompressedSize && outBuffer.pos != decompressedSize) {
        /* zresult is necessarily 0 here; the number of bytes actually
           produced is outBuffer.pos. */
        PyErr_Format(ZstdError, "decompression error: decompressed %zu bytes; expected %llu",
            outBuffer.pos, decompressedSize);
        Py_CLEAR(result);
        goto finally;
    }
    else if (outBuffer.pos < destCapacity) {
        /* Output was smaller than the allocation: shrink the bytes object. */
        if (safe_pybytes_resize(&result, outBuffer.pos)) {
            Py_CLEAR(result);
            goto finally;
        }
    }

finally:
    PyBuffer_Release(&source);
    return result;
}
PyDoc_STRVAR(Decompressor_decompressobj__doc__,
"decompressobj([write_size=default])\n"
"\n"
"Incrementally feed data into a decompressor.\n"
"\n"
"The returned object exposes a ``decompress(data)`` method. This makes it\n"
"compatible with ``zlib.decompressobj`` and ``bz2.BZ2Decompressor`` so that\n"
"callers can swap in the zstd decompressor while using the same API.\n"
);

/**
 * Implementation of ZstdDecompressor.decompressobj().
 *
 * Creates a ZstdDecompressionObj bound to this decompressor after resetting
 * the decompression context. ``write_size`` must be non-zero, otherwise
 * ValueError is raised. Returns a new reference, or NULL with an exception
 * set.
 */
static ZstdDecompressionObj* Decompressor_decompressobj(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
    static char* kwlist[] = {
        "write_size",
        NULL
    };

    size_t writeSize = ZSTD_DStreamOutSize();
    ZstdDecompressionObj* obj;

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|k:decompressobj", kwlist, &writeSize)) {
        return NULL;
    }

    if (0 == writeSize) {
        PyErr_SetString(PyExc_ValueError, "write_size must be positive");
        return NULL;
    }

    obj = (ZstdDecompressionObj*)PyObject_CallObject((PyObject*)&ZstdDecompressionObjType, NULL);
    if (NULL == obj) {
        return NULL;
    }

    if (0 != ensure_dctx(self, 1)) {
        Py_DECREF(obj);
        return NULL;
    }

    /* The new object keeps its parent decompressor alive. */
    Py_INCREF(self);
    obj->decompressor = self;
    obj->outSize = writeSize;

    return obj;
}
PyDoc_STRVAR(Decompressor_read_to_iter__doc__,
"read_to_iter(reader[, read_size=default, write_size=default, skip_bytes=0])\n"
"Read compressed data and return an iterator\n"
"\n"
"Returns an iterator of decompressed data chunks produced from reading from\n"
"the ``reader``.\n"
"\n"
"Compressed data will be obtained from ``reader`` by calling the\n"
"``read(size)`` method of it. The source data will be streamed into a\n"
"decompressor. As decompressed data is available, it will be exposed to the\n"
"returned iterator.\n"
"\n"
"Data is ``read()`` in chunks of size ``read_size`` and exposed to the\n"
"iterator in chunks of size ``write_size``. The default values are the input\n"
"and output sizes for a zstd streaming decompressor.\n"
"\n"
"There is also support for skipping the first ``skip_bytes`` of data from\n"
"the source.\n"
);

/**
 * Implementation of ZstdDecompressor.read_to_iter().
 *
 * Builds a ZstdDecompressorIterator over ``reader``, which must either have
 * a ``read`` attribute or support the buffer protocol. ``skip_bytes`` must
 * be strictly smaller than ``read_size``. The decompression context is reset
 * and an input buffer of ``read_size`` bytes is allocated for the iterator.
 *
 * Returns a new reference, or NULL with an exception set. On any failure
 * after the iterator was created, the iterator is released.
 */
static ZstdDecompressorIterator* Decompressor_read_to_iter(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
    static char* kwlist[] = {
        "reader",
        "read_size",
        "write_size",
        "skip_bytes",
        NULL
    };

    PyObject* reader;
    size_t readSize = ZSTD_DStreamInSize();
    size_t writeSize = ZSTD_DStreamOutSize();
    size_t skip = 0;
    ZstdDecompressorIterator* iter;

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|kkk:read_to_iter", kwlist,
        &reader, &readSize, &writeSize, &skip)) {
        return NULL;
    }

    if (skip >= readSize) {
        PyErr_SetString(PyExc_ValueError,
            "skip_bytes must be smaller than read_size");
        return NULL;
    }

    iter = (ZstdDecompressorIterator*)PyObject_CallObject((PyObject*)&ZstdDecompressorIteratorType, NULL);
    if (NULL == iter) {
        return NULL;
    }

    /* Prefer a file-like source; otherwise fall back to the buffer protocol. */
    if (PyObject_HasAttrString(reader, "read")) {
        Py_INCREF(reader);
        iter->reader = reader;
    }
    else if (1 == PyObject_CheckBuffer(reader)) {
        /* Object claims it is a buffer. Try to get a handle to it. */
        if (0 != PyObject_GetBuffer(reader, &iter->buffer, PyBUF_CONTIG_RO)) {
            goto fail;
        }
    }
    else {
        PyErr_SetString(PyExc_ValueError,
            "must pass an object with a read() method or conforms to buffer protocol");
        goto fail;
    }

    Py_INCREF(self);
    iter->decompressor = self;

    iter->inSize = readSize;
    iter->outSize = writeSize;
    iter->skipBytes = skip;

    if (0 != ensure_dctx(self, 1)) {
        goto fail;
    }

    iter->input.src = PyMem_Malloc(readSize);
    if (NULL == iter->input.src) {
        PyErr_NoMemory();
        goto fail;
    }

    return iter;

fail:
    Py_DECREF(iter);
    return NULL;
}
PyDoc_STRVAR(Decompressor_stream_reader__doc__,
"stream_reader(source, [read_size=default])\n"
"\n"
"Obtain an object that behaves like an I/O stream that can be used for\n"
"reading decompressed output from an object.\n"
"\n"
"The source object can be any object with a ``read(size)`` method or that\n"
"conforms to the buffer protocol.\n"
);

/**
 * Implementation of ZstdDecompressor.stream_reader().
 *
 * Creates a ZstdDecompressionReader over ``source``. When ``source`` has a
 * ``read`` attribute it is stored as the reader together with ``read_size``;
 * otherwise a contiguous read-only buffer view is acquired from it. Raises
 * TypeError when ``source`` is neither. Returns a new reference, or NULL
 * with an exception set.
 */
static ZstdDecompressionReader* Decompressor_stream_reader(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
    static char* kwlist[] = {
        "source",
        "read_size",
        NULL
    };

    PyObject* source;
    size_t chunkSize = ZSTD_DStreamInSize();
    ZstdDecompressionReader* stream;

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|k:stream_reader", kwlist,
        &source, &chunkSize)) {
        return NULL;
    }

    stream = (ZstdDecompressionReader*)PyObject_CallObject((PyObject*)&ZstdDecompressionReaderType, NULL);
    if (!stream) {
        return NULL;
    }

    if (PyObject_HasAttrString(source, "read")) {
        Py_INCREF(source);
        stream->reader = source;
        stream->readSize = chunkSize;
    }
    else if (1 == PyObject_CheckBuffer(source)) {
        if (0 != PyObject_GetBuffer(source, &stream->buffer, PyBUF_CONTIG_RO)) {
            goto fail;
        }
    }
    else {
        PyErr_SetString(PyExc_TypeError,
            "must pass an object with a read() method or that conforms to the buffer protocol");
        goto fail;
    }

    /* The stream keeps its parent decompressor alive. */
    Py_INCREF(self);
    stream->decompressor = self;

    return stream;

fail:
    Py_DECREF(stream);
    return NULL;
}
PyDoc_STRVAR(Decompressor_stream_writer__doc__,
"Create a context manager to write decompressed data to an object.\n"
"\n"
"The passed object must have a ``write()`` method.\n"
"\n"
"The caller feeds intput data to the object by calling ``write(data)``.\n"
"Decompressed data is written to the argument given as it is decompressed.\n"
"\n"
"An optional ``write_size`` argument defines the size of chunks to\n"
"``write()`` to the writer. It defaults to the default output size for a zstd\n"
"streaming decompressor.\n"
);

/**
 * Implementation of ZstdDecompressor.stream_writer().
 *
 * Creates a ZstdDecompressionWriter that holds references to this
 * decompressor and to ``writer``, and records ``write_size``. Raises
 * ValueError when ``writer`` has no ``write`` attribute. Returns a new
 * reference, or NULL with an exception set.
 */
static ZstdDecompressionWriter* Decompressor_stream_writer(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
    static char* kwlist[] = {
        "writer",
        "write_size",
        NULL
    };

    PyObject* sink;
    size_t chunkSize = ZSTD_DStreamOutSize();
    ZstdDecompressionWriter* stream;

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|k:stream_writer", kwlist,
        &sink, &chunkSize)) {
        return NULL;
    }

    if (!PyObject_HasAttrString(sink, "write")) {
        PyErr_SetString(PyExc_ValueError, "must pass an object with a write() method");
        return NULL;
    }

    stream = (ZstdDecompressionWriter*)PyObject_CallObject((PyObject*)&ZstdDecompressionWriterType, NULL);
    if (NULL == stream) {
        return NULL;
    }

    Py_INCREF(self);
    stream->decompressor = self;

    Py_INCREF(sink);
    stream->writer = sink;

    stream->outSize = chunkSize;

    return stream;
}
PyDoc_STRVAR(Decompressor_decompress_content_dict_chain__doc__,
"Decompress a series of chunks using the content dictionary chaining technique\n"
);

/**
 * Implementation of ZstdDecompressor.decompress_content_dict_chain().
 *
 * ``frames`` must be a non-empty list of bytes objects. Every element must
 * be a zstd frame whose header records the content size. The first frame is
 * decompressed without any dictionary; each later frame is decompressed
 * using the previous frame's decompressed output as a raw-content prefix
 * dictionary.
 *
 * Two scratch buffers alternate roles: on odd chunk indexes buffer1 holds
 * the prefix and buffer2 receives output, on even indexes the roles swap.
 * Buffers grow when a frame needs more room and never shrink.
 *
 * Returns a new bytes object holding the decompressed content of the last
 * frame, or NULL with an exception set:
 *  - ValueError for an empty list, a non-bytes element, an invalid or
 *    truncated frame header, a missing content size, or a frame too large
 *    for this platform.
 *  - ZstdError when loading the prefix or decompressing fails, or a frame
 *    is not fully decompressed.
 *  - MemoryError when a scratch buffer cannot be allocated or grown.
 */
static PyObject* Decompressor_decompress_content_dict_chain(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
    static char* kwlist[] = {
        "frames",
        NULL
    };

    PyObject* chunks;
    Py_ssize_t chunksLen;
    Py_ssize_t chunkIndex;
    char parity = 0;
    PyObject* chunk;
    char* chunkData;
    Py_ssize_t chunkSize;
    size_t zresult;
    ZSTD_frameHeader frameHeader;
    void* buffer1 = NULL;
    size_t buffer1Size = 0;
    size_t buffer1ContentSize = 0;
    void* buffer2 = NULL;
    size_t buffer2Size = 0;
    size_t buffer2ContentSize = 0;
    void* destBuffer = NULL;
    PyObject* result = NULL;
    ZSTD_outBuffer outBuffer;
    ZSTD_inBuffer inBuffer;

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!:decompress_content_dict_chain",
        kwlist, &PyList_Type, &chunks)) {
        return NULL;
    }

    chunksLen = PyList_Size(chunks);
    if (!chunksLen) {
        PyErr_SetString(PyExc_ValueError, "empty input chain");
        return NULL;
    }

    /* The first chunk should not be using a dictionary. We handle it specially. */
    chunk = PyList_GetItem(chunks, 0);
    if (!PyBytes_Check(chunk)) {
        PyErr_SetString(PyExc_ValueError, "chunk 0 must be bytes");
        return NULL;
    }

    /* We require that all chunks be zstd frames and that they have content size set. */
    PyBytes_AsStringAndSize(chunk, &chunkData, &chunkSize);
    zresult = ZSTD_getFrameHeader(&frameHeader, (void*)chunkData, chunkSize);
    if (ZSTD_isError(zresult)) {
        PyErr_SetString(PyExc_ValueError, "chunk 0 is not a valid zstd frame");
        return NULL;
    }
    else if (zresult) {
        PyErr_SetString(PyExc_ValueError, "chunk 0 is too small to contain a zstd frame");
        return NULL;
    }

    if (ZSTD_CONTENTSIZE_UNKNOWN == frameHeader.frameContentSize) {
        PyErr_SetString(PyExc_ValueError, "chunk 0 missing content size in frame");
        return NULL;
    }

    assert(ZSTD_CONTENTSIZE_ERROR != frameHeader.frameContentSize);

    /* We check against PY_SSIZE_T_MAX here because we ultimately cast the
     * result to a Python object and its length can be no greater than
     * Py_ssize_t. In theory, we could have an intermediate frame that is
     * larger. But a) why would this API be used for frames that large b)
     * it isn't worth the complexity to support. */
    assert(SIZE_MAX >= PY_SSIZE_T_MAX);
    if (frameHeader.frameContentSize > PY_SSIZE_T_MAX) {
        PyErr_SetString(PyExc_ValueError,
            "chunk 0 is too large to decompress on this platform");
        return NULL;
    }

    /* loadDict=0: the chain supplies its own prefix dictionaries. */
    if (ensure_dctx(self, 0)) {
        goto finally;
    }

    buffer1Size = (size_t)frameHeader.frameContentSize;
    buffer1 = PyMem_Malloc(buffer1Size);
    if (!buffer1) {
        /* Never return NULL without an exception set. */
        PyErr_NoMemory();
        goto finally;
    }

    outBuffer.dst = buffer1;
    outBuffer.size = buffer1Size;
    outBuffer.pos = 0;

    inBuffer.src = chunkData;
    inBuffer.size = chunkSize;
    inBuffer.pos = 0;

    Py_BEGIN_ALLOW_THREADS
    zresult = ZSTD_decompress_generic(self->dctx, &outBuffer, &inBuffer);
    Py_END_ALLOW_THREADS
    if (ZSTD_isError(zresult)) {
        PyErr_Format(ZstdError, "could not decompress chunk 0: %s", ZSTD_getErrorName(zresult));
        goto finally;
    }
    else if (zresult) {
        PyErr_Format(ZstdError, "chunk 0 did not decompress full frame");
        goto finally;
    }

    buffer1ContentSize = outBuffer.pos;

    /* Special case of a simple chain. */
    if (1 == chunksLen) {
        /* Only expose the bytes zstd actually wrote. */
        result = PyBytes_FromStringAndSize(buffer1, buffer1ContentSize);
        goto finally;
    }

    /* This should ideally look at next chunk. But this is slightly simpler. */
    buffer2Size = (size_t)frameHeader.frameContentSize;
    buffer2 = PyMem_Malloc(buffer2Size);
    if (!buffer2) {
        PyErr_NoMemory();
        goto finally;
    }

    /* For each subsequent chunk, use the previous fulltext as a content dictionary.
       Our strategy is to have 2 buffers. One holds the previous fulltext (to be
       used as a content dictionary) and the other holds the new fulltext. The
       buffers grow when needed but never decrease in size. This limits the
       memory allocator overhead.
     */
    for (chunkIndex = 1; chunkIndex < chunksLen; chunkIndex++) {
        /* Roles of the two scratch buffers for this iteration. */
        const void* prefix;
        size_t prefixSize;
        void** destSlot;
        size_t* destCapacity;
        size_t* destContentSize;

        chunk = PyList_GetItem(chunks, chunkIndex);
        if (!PyBytes_Check(chunk)) {
            PyErr_Format(PyExc_ValueError, "chunk %zd must be bytes", chunkIndex);
            goto finally;
        }

        PyBytes_AsStringAndSize(chunk, &chunkData, &chunkSize);
        zresult = ZSTD_getFrameHeader(&frameHeader, (void*)chunkData, chunkSize);
        if (ZSTD_isError(zresult)) {
            PyErr_Format(PyExc_ValueError, "chunk %zd is not a valid zstd frame", chunkIndex);
            goto finally;
        }
        else if (zresult) {
            PyErr_Format(PyExc_ValueError, "chunk %zd is too small to contain a zstd frame", chunkIndex);
            goto finally;
        }

        if (ZSTD_CONTENTSIZE_UNKNOWN == frameHeader.frameContentSize) {
            PyErr_Format(PyExc_ValueError, "chunk %zd missing content size in frame", chunkIndex);
            goto finally;
        }

        assert(ZSTD_CONTENTSIZE_ERROR != frameHeader.frameContentSize);

        if (frameHeader.frameContentSize > PY_SSIZE_T_MAX) {
            PyErr_Format(PyExc_ValueError,
                "chunk %zd is too large to decompress on this platform", chunkIndex);
            goto finally;
        }

        inBuffer.src = chunkData;
        inBuffer.size = chunkSize;
        inBuffer.pos = 0;

        parity = chunkIndex % 2;

        /* Odd chunks read from buffer1 and write to buffer2; even chunks
           do the opposite. */
        if (parity) {
            prefix = buffer1;
            prefixSize = buffer1ContentSize;
            destSlot = &buffer2;
            destCapacity = &buffer2Size;
            destContentSize = &buffer2ContentSize;
        }
        else {
            prefix = buffer2;
            prefixSize = buffer2ContentSize;
            destSlot = &buffer1;
            destCapacity = &buffer1Size;
            destContentSize = &buffer1ContentSize;
        }

        /* Resize destination buffer to hold larger content. */
        if (*destCapacity < frameHeader.frameContentSize) {
            destBuffer = PyMem_Realloc(*destSlot, (size_t)frameHeader.frameContentSize);
            if (!destBuffer) {
                /* The old allocation is still owned by *destSlot and is
                   released in the cleanup section. */
                PyErr_NoMemory();
                goto finally;
            }
            *destSlot = destBuffer;
            *destCapacity = (size_t)frameHeader.frameContentSize;
        }

        Py_BEGIN_ALLOW_THREADS
        zresult = ZSTD_DCtx_refPrefix_advanced(self->dctx,
            prefix, prefixSize, ZSTD_dct_rawContent);
        Py_END_ALLOW_THREADS
        if (ZSTD_isError(zresult)) {
            PyErr_Format(ZstdError,
                "failed to load prefix dictionary at chunk %zd", chunkIndex);
            goto finally;
        }

        outBuffer.dst = *destSlot;
        outBuffer.size = *destCapacity;
        outBuffer.pos = 0;

        Py_BEGIN_ALLOW_THREADS
        zresult = ZSTD_decompress_generic(self->dctx, &outBuffer, &inBuffer);
        Py_END_ALLOW_THREADS
        if (ZSTD_isError(zresult)) {
            PyErr_Format(ZstdError, "could not decompress chunk %zd: %s",
                chunkIndex, ZSTD_getErrorName(zresult));
            goto finally;
        }
        else if (zresult) {
            PyErr_Format(ZstdError, "chunk %zd did not decompress full frame",
                chunkIndex);
            goto finally;
        }

        *destContentSize = outBuffer.pos;
    }

    /* parity still reflects the last chunk, i.e. which buffer holds the
       final fulltext. */
    result = PyBytes_FromStringAndSize(parity ? buffer2 : buffer1,
        parity ? buffer2ContentSize : buffer1ContentSize);

finally:
    if (buffer2) {
        PyMem_Free(buffer2);
    }
    if (buffer1) {
        PyMem_Free(buffer1);
    }

    return result;
}
/**
 * Describes one compressed frame to be decompressed by a worker.
 */
typedef struct {
/* Start of the compressed frame data. */
void* sourceData;
/* Length of the compressed frame data, in bytes. */
size_t sourceSize;
/* Decompressed size in bytes. 0 means "not supplied": decompress_worker()
   then reads it from the frame header and stores it back here. */
size_t destSize;
} FramePointer;
/**
 * A collection of frames handed to decompress_from_framesources().
 */
typedef struct {
/* Array of frame descriptors. */
FramePointer* frames;
/* Number of entries in ``frames``. */
Py_ssize_t framesSize;
/* Total compressed size; divided by the thread count to derive the
   per-worker byte budget. */
unsigned long long compressedSize;
} FrameSources;
/**
 * One output allocation produced by a worker, plus the segments describing
 * where each decompressed frame sits inside it.
 */
typedef struct {
/* Heap buffer holding decompressed frames laid out back to back. */
void* dest;
/* Size of ``dest`` in bytes. */
Py_ssize_t destSize;
/* Per-frame offset/length records into ``dest``. */
BufferSegment* segments;
/* Number of entries allocated in ``segments``. */
Py_ssize_t segmentsSize;
} DestBuffer;
/**
 * Failure categories a decompression worker can report in
 * WorkerState.error.
 */
typedef enum {
/* No error occurred. */
WorkerError_none = 0,
/* zstd returned an error code; it is stored in WorkerState.zresult. */
WorkerError_zstd = 1,
/* An allocation failed or a size does not fit in size_t. */
WorkerError_memory = 2,
/* The frame was not fully consumed or produced an unexpected number of
   bytes. */
WorkerError_sizeMismatch = 3,
/* The decompressed size of a frame could not be determined. */
WorkerError_unknownSize = 4,
} WorkerError;
/**
 * Input parameters and results for a single decompress_worker() invocation.
 */
typedef struct {
/* Source records. Shared array; the worker only touches its own range. */
FramePointer* framePointers;
/* Which records to process. Both offsets are inclusive. */
Py_ssize_t startOffset;
Py_ssize_t endOffset;
/* Used by the worker to size its output allocations; must fit in size_t. */
unsigned long long totalSourceSize;
/* Decompression context and settings. */
ZSTD_DCtx* dctx;
/* When non-zero, a frame whose content size is unknown is an error. */
int requireOutputSizes;
/* Output storage. Allocated by the worker; must be NULL / 0 on entry. */
DestBuffer* destBuffers;
Py_ssize_t destCount;
/* Item that error occurred on. */
Py_ssize_t errorOffset;
/* If an error occurred. */
WorkerError error;
/* result from zstd decompression operation. For WorkerError_sizeMismatch
   this instead holds the number of bytes that were produced. */
size_t zresult;
} WorkerState;
/**
 * Decompress the frames state->framePointers[startOffset..endOffset]
 * (inclusive) into one or more heap-allocated DestBuffer instances.
 *
 * On entry state->destBuffers must be NULL and state->destCount 0. The
 * function first resolves the decompressed size of every frame (reading it
 * from the frame header when FramePointer.destSize is 0), then decompresses
 * frame by frame. When a frame does not fit in the remaining space of the
 * current destination buffer, that buffer and its segments array are trimmed
 * to their used size and a new DestBuffer is appended.
 *
 * The function does not report failure via a return value. On failure it
 * sets state->error (and, where known, state->errorOffset / state->zresult)
 * and returns immediately, leaving whatever was allocated so far attached to
 * ``state`` (per the original note, the caller is expected to free it).
 * It uses malloc/calloc/realloc and makes no Python API calls.
 */
static void decompress_worker(WorkerState* state) {
size_t allocationSize;
DestBuffer* destBuffer;
Py_ssize_t frameIndex;
/* Index of the next segment slot within the current DestBuffer. */
Py_ssize_t localOffset = 0;
/* Frame index at which the current DestBuffer started. */
Py_ssize_t currentBufferStartIndex = state->startOffset;
/* Frames not yet decompressed; used to size new segment arrays. */
Py_ssize_t remainingItems = state->endOffset - state->startOffset + 1;
void* tmpBuf;
/* Write position (bytes) within the current DestBuffer. */
Py_ssize_t destOffset = 0;
FramePointer* framePointers = state->framePointers;
size_t zresult;
/* Accumulated below but not otherwise read in this function. */
unsigned long long totalOutputSize = 0;
assert(NULL == state->destBuffers);
assert(0 == state->destCount);
assert(state->endOffset - state->startOffset >= 0);
/* We could get here due to the way work is allocated. Ideally we wouldn't
get here. But that would require a bit of a refactor in the caller. */
if (state->totalSourceSize > SIZE_MAX) {
state->error = WorkerError_memory;
state->errorOffset = 0;
return;
}
/*
* We need to allocate a buffer to hold decompressed data. How we do this
* depends on what we know about the output. The following scenarios are
* possible:
*
* 1. All structs defining frames declare the output size.
* 2. The decompressed size is embedded within the zstd frame.
* 3. The decompressed size is not stored anywhere.
*
* For now, we only support #1 and #2.
*/
/* Resolve output segments. */
for (frameIndex = state->startOffset; frameIndex <= state->endOffset; frameIndex++) {
FramePointer* fp = &framePointers[frameIndex];
unsigned long long decompressedSize;
/* A zero destSize means the caller did not supply a size: ask zstd. */
if (0 == fp->destSize) {
decompressedSize = ZSTD_getFrameContentSize(fp->sourceData, fp->sourceSize);
if (ZSTD_CONTENTSIZE_ERROR == decompressedSize) {
state->error = WorkerError_unknownSize;
state->errorOffset = frameIndex;
return;
}
else if (ZSTD_CONTENTSIZE_UNKNOWN == decompressedSize) {
if (state->requireOutputSizes) {
state->error = WorkerError_unknownSize;
state->errorOffset = frameIndex;
return;
}
/* This will fail the assert for .destSize > 0 below. */
decompressedSize = 0;
}
if (decompressedSize > SIZE_MAX) {
state->error = WorkerError_memory;
state->errorOffset = frameIndex;
return;
}
/* Cache the resolved size on the frame record for the second pass. */
fp->destSize = (size_t)decompressedSize;
}
totalOutputSize += fp->destSize;
}
/* Start with a single, zero-initialized DestBuffer. */
state->destBuffers = calloc(1, sizeof(DestBuffer));
if (NULL == state->destBuffers) {
state->error = WorkerError_memory;
return;
}
state->destCount = 1;
destBuffer = &state->destBuffers[state->destCount - 1];
assert(framePointers[state->startOffset].destSize > 0); /* For now. */
/* Initial allocation is derived from the source size, but is always made
large enough to hold at least the first frame. */
allocationSize = roundpow2((size_t)state->totalSourceSize);
if (framePointers[state->startOffset].destSize > allocationSize) {
allocationSize = roundpow2(framePointers[state->startOffset].destSize);
}
destBuffer->dest = malloc(allocationSize);
if (NULL == destBuffer->dest) {
state->error = WorkerError_memory;
return;
}
destBuffer->destSize = allocationSize;
destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment));
if (NULL == destBuffer->segments) {
/* Original note: the caller will free the already-allocated dest as part
of cleanup. */
state->error = WorkerError_memory;
return;
}
destBuffer->segmentsSize = remainingItems;
for (frameIndex = state->startOffset; frameIndex <= state->endOffset; frameIndex++) {
ZSTD_outBuffer outBuffer;
ZSTD_inBuffer inBuffer;
const void* source = framePointers[frameIndex].sourceData;
const size_t sourceSize = framePointers[frameIndex].sourceSize;
void* dest;
const size_t decompressedSize = framePointers[frameIndex].destSize;
size_t destAvailable = destBuffer->destSize - destOffset;
assert(decompressedSize > 0); /* For now. */
/*
* Not enough space in current buffer. Finish the current buffer, then
* allocate and switch to a new one.
*/
if (decompressedSize > destAvailable) {
/*
* Shrinking the destination buffer is optional. But it should be cheap,
* so we just do it.
*/
if (destAvailable) {
tmpBuf = realloc(destBuffer->dest, destOffset);
if (NULL == tmpBuf) {
state->error = WorkerError_memory;
return;
}
destBuffer->dest = tmpBuf;
destBuffer->destSize = destOffset;
}
/* Truncate segments buffer. */
tmpBuf = realloc(destBuffer->segments,
(frameIndex - currentBufferStartIndex) * sizeof(BufferSegment));
if (NULL == tmpBuf) {
state->error = WorkerError_memory;
return;
}
destBuffer->segments = tmpBuf;
destBuffer->segmentsSize = frameIndex - currentBufferStartIndex;
/* Grow space for new DestBuffer. */
tmpBuf = realloc(state->destBuffers, (state->destCount + 1) * sizeof(DestBuffer));
if (NULL == tmpBuf) {
state->error = WorkerError_memory;
return;
}
state->destBuffers = tmpBuf;
state->destCount++;
/* The array may have moved; re-point at the new last element. */
destBuffer = &state->destBuffers[state->destCount - 1];
/* Don't take any chances with non-NULL pointers. */
memset(destBuffer, 0, sizeof(DestBuffer));
/* Same sizing rule as the initial buffer, keyed on the current frame. */
allocationSize = roundpow2((size_t)state->totalSourceSize);
if (decompressedSize > allocationSize) {
allocationSize = roundpow2(decompressedSize);
}
destBuffer->dest = malloc(allocationSize);
if (NULL == destBuffer->dest) {
state->error = WorkerError_memory;
return;
}
destBuffer->destSize = allocationSize;
destAvailable = allocationSize;
destOffset = 0;
localOffset = 0;
destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment));
if (NULL == destBuffer->segments) {
state->error = WorkerError_memory;
return;
}
destBuffer->segmentsSize = remainingItems;
currentBufferStartIndex = frameIndex;
}
dest = (char*)destBuffer->dest + destOffset;
/* Limit the output window to exactly this frame's expected size. */
outBuffer.dst = dest;
outBuffer.size = decompressedSize;
outBuffer.pos = 0;
inBuffer.src = source;
inBuffer.size = sourceSize;
inBuffer.pos = 0;
zresult = ZSTD_decompress_generic(state->dctx, &outBuffer, &inBuffer);
if (ZSTD_isError(zresult)) {
state->error = WorkerError_zstd;
state->zresult = zresult;
state->errorOffset = frameIndex;
return;
}
/* A non-zero result here means the frame was not finished in one call. */
else if (zresult || outBuffer.pos != decompressedSize) {
state->error = WorkerError_sizeMismatch;
state->zresult = outBuffer.pos;
state->errorOffset = frameIndex;
return;
}
/* Record where this frame landed and advance the cursors. */
destBuffer->segments[localOffset].offset = destOffset;
destBuffer->segments[localOffset].length = outBuffer.pos;
destOffset += outBuffer.pos;
localOffset++;
remainingItems--;
}
/* Trim the final buffer down to the bytes actually written. */
if (destBuffer->destSize > destOffset) {
tmpBuf = realloc(destBuffer->dest, destOffset);
if (NULL == tmpBuf) {
state->error = WorkerError_memory;
return;
}
destBuffer->dest = tmpBuf;
destBuffer->destSize = destOffset;
}
}
/**
 * Decompress every frame described by ``frames`` and return the output as a
 * ``ZstdBufferWithSegmentsCollection``.
 *
 * The frames are split into at most ``threadCount`` contiguous ranges holding
 * roughly the same number of compressed bytes. Each range is handed to
 * decompress_worker(), either through a zstd thread pool (threadCount > 1) or
 * inline on the calling thread (threadCount == 1). Every destination buffer
 * produced by a worker becomes one ``BufferWithSegments`` in the result.
 *
 * decompressor  Supplies the template decompression context and the optional
 *               dictionary that each worker context references.
 * frames        Input frames plus their total compressed size. Must describe
 *               at least one frame.
 * threadCount   Requested worker count; must be >= 1. It is lowered to the
 *               number of frames when larger.
 *
 * Returns the object produced by calling the collection type, or NULL with a
 * Python exception set on failure (ValueError when ``frames`` is empty).
 */
ZstdBufferWithSegmentsCollection* decompress_from_framesources(ZstdDecompressor* decompressor, FrameSources* frames,
    Py_ssize_t threadCount) {
    Py_ssize_t i = 0;
    int errored = 0;
    Py_ssize_t segmentsCount;
    ZstdBufferWithSegments* bws = NULL;
    PyObject* resultArg = NULL;
    Py_ssize_t resultIndex;
    ZstdBufferWithSegmentsCollection* result = NULL;
    FramePointer* framePointers = frames->frames;
    unsigned long long workerBytes = 0;
    Py_ssize_t currentThread = 0;
    Py_ssize_t workerStartOffset = 0;
    POOL_ctx* pool = NULL;
    WorkerState* workerStates = NULL;
    unsigned long long bytesPerWorker;

    /* Caller should normalize 0 and negative values to 1 or larger. */
    assert(threadCount >= 1);

    /*
     * Without this guard an empty input clamps threadCount to 0 below and the
     * bytesPerWorker computation divides by zero. Nothing has been allocated
     * yet, so a plain return is sufficient.
     */
    if (frames->framesSize < 1) {
        PyErr_SetString(PyExc_ValueError, "no source elements found");
        return NULL;
    }

    /* More threads than inputs makes no sense under any conditions. */
    threadCount = frames->framesSize < threadCount ? frames->framesSize
                                                   : threadCount;

    /* TODO lower thread count if input size is too small and threads would just
       add overhead. */

    if (decompressor->dict) {
        if (ensure_ddict(decompressor->dict)) {
            return NULL;
        }
    }

    /* If threadCount==1, we don't start a thread pool. But we do leverage the
       same API for dispatching work. */
    workerStates = PyMem_Malloc(threadCount * sizeof(WorkerState));
    if (NULL == workerStates) {
        PyErr_NoMemory();
        goto finally;
    }

    /* Zeroed state lets the cleanup code treat untouched workers uniformly. */
    memset(workerStates, 0, threadCount * sizeof(WorkerState));

    if (threadCount > 1) {
        pool = POOL_create(threadCount, 1);
        if (NULL == pool) {
            PyErr_SetString(ZstdError, "could not initialize zstd thread pool");
            goto finally;
        }
    }

    bytesPerWorker = frames->compressedSize / threadCount;

    if (bytesPerWorker > SIZE_MAX) {
        PyErr_SetString(ZstdError, "too much data per worker for this platform");
        goto finally;
    }

    /* Give every worker its own decompression context cloned from ours. */
    for (i = 0; i < threadCount; i++) {
        size_t zresult;

        workerStates[i].dctx = ZSTD_createDCtx();
        if (NULL == workerStates[i].dctx) {
            PyErr_NoMemory();
            goto finally;
        }

        ZSTD_copyDCtx(workerStates[i].dctx, decompressor->dctx);

        if (decompressor->dict) {
            zresult = ZSTD_DCtx_refDDict(workerStates[i].dctx, decompressor->dict->ddict);
            if (zresult) {
                PyErr_Format(ZstdError, "unable to reference prepared dictionary: %s",
                    ZSTD_getErrorName(zresult));
                goto finally;
            }
        }

        workerStates[i].framePointers = framePointers;
        workerStates[i].requireOutputSizes = 1;
    }

    Py_BEGIN_ALLOW_THREADS
    /* There are many ways to split work among workers.

       For now, we take a simple approach of splitting work so each worker
       gets roughly the same number of input bytes. This will result in more
       starvation than running N>threadCount jobs. But it avoids complications
       around state tracking, which could involve extra locking.
    */
    for (i = 0; i < frames->framesSize; i++) {
        workerBytes += frames->frames[i].sourceSize;

        /*
         * The last worker/thread needs to handle all remaining work. Don't
         * trigger it prematurely. Defer to the block outside of the loop.
         * (But still process this loop so workerBytes is correct.)
         */
        if (currentThread == threadCount - 1) {
            continue;
        }

        if (workerBytes >= bytesPerWorker) {
            workerStates[currentThread].startOffset = workerStartOffset;
            workerStates[currentThread].endOffset = i;
            workerStates[currentThread].totalSourceSize = workerBytes;

            if (threadCount > 1) {
                POOL_add(pool, (POOL_function)decompress_worker, &workerStates[currentThread]);
            }
            else {
                decompress_worker(&workerStates[currentThread]);
            }
            currentThread++;
            workerStartOffset = i + 1;
            workerBytes = 0;
        }
    }

    /*
     * Dispatch whatever is left to the current (last used) worker.
     * NOTE(review): the test is on accumulated bytes, so a trailing run of
     * frames whose sourceSize values sum to 0 is never dispatched.
     */
    if (workerBytes) {
        workerStates[currentThread].startOffset = workerStartOffset;
        workerStates[currentThread].endOffset = frames->framesSize - 1;
        workerStates[currentThread].totalSourceSize = workerBytes;

        if (threadCount > 1) {
            POOL_add(pool, (POOL_function)decompress_worker, &workerStates[currentThread]);
        }
        else {
            decompress_worker(&workerStates[currentThread]);
        }
    }

    /* Pool is released (and cleared) here, before worker results are read. */
    if (threadCount > 1) {
        POOL_free(pool);
        pool = NULL;
    }
    Py_END_ALLOW_THREADS

    /* Translate the first worker error found into a Python exception. */
    for (i = 0; i < threadCount; i++) {
        switch (workerStates[i].error) {
        case WorkerError_none:
            break;

        case WorkerError_zstd:
            PyErr_Format(ZstdError, "error decompressing item %zd: %s",
                workerStates[i].errorOffset, ZSTD_getErrorName(workerStates[i].zresult));
            errored = 1;
            break;

        case WorkerError_memory:
            PyErr_NoMemory();
            errored = 1;
            break;

        case WorkerError_sizeMismatch:
            PyErr_Format(ZstdError, "error decompressing item %zd: decompressed %zu bytes; expected %zu",
                workerStates[i].errorOffset, workerStates[i].zresult,
                framePointers[workerStates[i].errorOffset].destSize);
            errored = 1;
            break;

        case WorkerError_unknownSize:
            PyErr_Format(PyExc_ValueError, "could not determine decompressed size of item %zd",
                workerStates[i].errorOffset);
            errored = 1;
            break;

        default:
            PyErr_Format(ZstdError, "unhandled error type: %d; this is a bug",
                workerStates[i].error);
            errored = 1;
            break;
        }

        if (errored) {
            break;
        }
    }

    if (errored) {
        goto finally;
    }

    /* One tuple slot per destination buffer across all workers. */
    segmentsCount = 0;
    for (i = 0; i < threadCount; i++) {
        segmentsCount += workerStates[i].destCount;
    }

    resultArg = PyTuple_New(segmentsCount);
    if (NULL == resultArg) {
        goto finally;
    }

    resultIndex = 0;

    for (i = 0; i < threadCount; i++) {
        Py_ssize_t bufferIndex;
        WorkerState* state = &workerStates[i];

        for (bufferIndex = 0; bufferIndex < state->destCount; bufferIndex++) {
            DestBuffer* destBuffer = &state->destBuffers[bufferIndex];

            bws = BufferWithSegments_FromMemory(destBuffer->dest, destBuffer->destSize,
                destBuffer->segments, destBuffer->segmentsSize);
            if (NULL == bws) {
                goto finally;
            }

            /*
             * Memory for buffer and segments was allocated using malloc() in worker
             * and the memory is transferred to the BufferWithSegments instance. So
             * tell instance to use free() and NULL the reference in the state struct
             * so it isn't freed below.
             */
            bws->useFree = 1;
            destBuffer->dest = NULL;
            destBuffer->segments = NULL;

            /* The tuple takes over the reference to bws. */
            PyTuple_SET_ITEM(resultArg, resultIndex++, (PyObject*)bws);
        }
    }

    result = (ZstdBufferWithSegmentsCollection*)PyObject_CallObject(
        (PyObject*)&ZstdBufferWithSegmentsCollectionType, resultArg);

finally:
    Py_CLEAR(resultArg);

    if (workerStates) {
        for (i = 0; i < threadCount; i++) {
            Py_ssize_t bufferIndex;
            WorkerState* state = &workerStates[i];

            if (state->dctx) {
                ZSTD_freeDCtx(state->dctx);
            }

            for (bufferIndex = 0; bufferIndex < state->destCount; bufferIndex++) {
                if (state->destBuffers) {
                    /*
                     * Will be NULL if memory transferred to a BufferWithSegments.
                     * Otherwise it is left over after an error occurred.
                     */
                    free(state->destBuffers[bufferIndex].dest);
                    free(state->destBuffers[bufferIndex].segments);
                }
            }

            free(state->destBuffers);
        }

        PyMem_Free(workerStates);
    }

    /* Still non-NULL only when we bailed out before the pool was released. */
    POOL_free(pool);

    return result;
}
/*
 * Python-visible docstring for ZstdDecompressor.multi_decompress_to_buffer().
 *
 * NOTE(review): the text below says all decompressed data lives in a single
 * memory buffer, but decompress_from_framesources() creates one
 * BufferWithSegments per worker destination buffer, so the returned
 * collection can hold several buffers. Confirm and reword if intended.
 */
PyDoc_STRVAR(Decompressor_multi_decompress_to_buffer__doc__,
"Decompress multiple frames to output buffers\n"
"\n"
"Receives a ``BufferWithSegments``, a ``BufferWithSegmentsCollection`` or a\n"
"list of bytes-like objects. Each item in the passed collection should be a\n"
"compressed zstd frame.\n"
"\n"
"Unless ``decompressed_sizes`` is specified, the content size *must* be\n"
"written into the zstd frame header. If ``decompressed_sizes`` is specified,\n"
"it is an object conforming to the buffer protocol that represents an array\n"
"of 64-bit unsigned integers in the machine's native format. Specifying\n"
"``decompressed_sizes`` avoids a pre-scan of each frame to determine its\n"
"output size.\n"
"\n"
"Returns a ``BufferWithSegmentsCollection`` containing the decompressed\n"
"data. All decompressed data is allocated in a single memory buffer. The\n"
"``BufferWithSegments`` instance tracks which objects are at which offsets\n"
"and their respective lengths.\n"
"\n"
"The ``threads`` argument controls how many threads to use for operations.\n"
"Negative values will use the same number of threads as logical CPUs on the\n"
"machine.\n"
);
/**
 * Implementation of ZstdDecompressor.multi_decompress_to_buffer().
 *
 * Python arguments:
 *   frames              A BufferWithSegments, a BufferWithSegmentsCollection
 *                       or a list of objects supporting the buffer protocol.
 *   decompressed_sizes  Optional contiguous, single-dimension buffer read as
 *                       an array of ``unsigned long long``; it must hold
 *                       exactly one entry per input frame.
 *   threads             Worker count. Negative values become cpu_count();
 *                       anything below 2 becomes 1.
 *
 * The inputs are flattened into an array of FramePointer entries which is then
 * handed to decompress_from_framesources().
 *
 * Returns the resulting collection, or NULL with a Python exception set.
 */
static ZstdBufferWithSegmentsCollection* Decompressor_multi_decompress_to_buffer(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) {
    static char* kwlist[] = {
        "frames",
        "decompressed_sizes",
        "threads",
        NULL
    };

    PyObject* frames;
    Py_buffer frameSizes;
    int threads = 0;
    /* Initialized so the cleanup path never reads an indeterminate value. */
    Py_ssize_t frameCount = 0;
    /* Byte length decompressed_sizes must have for the current frameCount. */
    Py_ssize_t expectedSizesLen;
    Py_buffer* frameBuffers = NULL;
    FramePointer* framePointers = NULL;
    unsigned long long* frameSizesP = NULL;
    unsigned long long totalInputSize = 0;
    FrameSources frameSources;
    ZstdBufferWithSegmentsCollection* result = NULL;
    Py_ssize_t i;

    /* A zeroed Py_buffer lets us detect whether decompressed_sizes was given. */
    memset(&frameSizes, 0, sizeof(frameSizes));

#if PY_MAJOR_VERSION >= 3
    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|y*i:multi_decompress_to_buffer",
#else
    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|s*i:multi_decompress_to_buffer",
#endif
        kwlist, &frames, &frameSizes, &threads)) {
        return NULL;
    }

    if (frameSizes.buf) {
        if (!PyBuffer_IsContiguous(&frameSizes, 'C') || frameSizes.ndim > 1) {
            PyErr_SetString(PyExc_ValueError, "decompressed_sizes buffer should be contiguous and have a single dimension");
            goto finally;
        }

        frameSizesP = (unsigned long long*)frameSizes.buf;
    }

    if (threads < 0) {
        threads = cpu_count();
    }

    if (threads < 2) {
        threads = 1;
    }

    if (PyObject_TypeCheck(frames, &ZstdBufferWithSegmentsType)) {
        ZstdBufferWithSegments* buffer = (ZstdBufferWithSegments*)frames;
        frameCount = buffer->segmentCount;
        expectedSizesLen = frameCount * (Py_ssize_t)sizeof(unsigned long long);

        if (frameSizes.buf && frameSizes.len != expectedSizesLen) {
            PyErr_Format(PyExc_ValueError, "decompressed_sizes size mismatch; expected %zd, got %zd",
                expectedSizesLen, frameSizes.len);
            goto finally;
        }

        framePointers = PyMem_Malloc(frameCount * sizeof(FramePointer));
        if (!framePointers) {
            PyErr_NoMemory();
            goto finally;
        }

        for (i = 0; i < frameCount; i++) {
            void* sourceData;
            unsigned long long sourceSize;
            unsigned long long decompressedSize = 0;

            if (buffer->segments[i].offset + buffer->segments[i].length > buffer->dataSize) {
                PyErr_Format(PyExc_ValueError, "item %zd has offset outside memory area", i);
                goto finally;
            }

            sourceData = (char*)buffer->data + buffer->segments[i].offset;
            sourceSize = buffer->segments[i].length;
            totalInputSize += sourceSize;

            if (frameSizesP) {
                decompressedSize = frameSizesP[i];
            }

            if (sourceSize > SIZE_MAX) {
                PyErr_Format(PyExc_ValueError,
                    "item %zd is too large for this platform", i);
                goto finally;
            }

            if (decompressedSize > SIZE_MAX) {
                PyErr_Format(PyExc_ValueError,
                    "decompressed size of item %zd is too large for this platform", i);
                goto finally;
            }

            framePointers[i].sourceData = sourceData;
            framePointers[i].sourceSize = (size_t)sourceSize;
            framePointers[i].destSize = (size_t)decompressedSize;
        }
    }
    else if (PyObject_TypeCheck(frames, &ZstdBufferWithSegmentsCollectionType)) {
        Py_ssize_t offset = 0;
        ZstdBufferWithSegments* buffer;
        ZstdBufferWithSegmentsCollection* collection = (ZstdBufferWithSegmentsCollection*)frames;

        frameCount = BufferWithSegmentsCollection_length(collection);
        expectedSizesLen = frameCount * (Py_ssize_t)sizeof(unsigned long long);

        /*
         * frameSizes.len is a byte count, so it must be compared against the
         * byte size of frameCount entries. Comparing against the bare element
         * count would reject correctly sized input and accept a buffer that is
         * too short for the frameSizesP[offset] reads below.
         */
        if (frameSizes.buf && frameSizes.len != expectedSizesLen) {
            PyErr_Format(PyExc_ValueError,
                "decompressed_sizes size mismatch; expected %zd; got %zd",
                expectedSizesLen, frameSizes.len);
            goto finally;
        }

        framePointers = PyMem_Malloc(frameCount * sizeof(FramePointer));
        if (NULL == framePointers) {
            PyErr_NoMemory();
            goto finally;
        }

        /* Iterate the data structure directly because it is faster. */
        for (i = 0; i < collection->bufferCount; i++) {
            Py_ssize_t segmentIndex;

            buffer = collection->buffers[i];

            for (segmentIndex = 0; segmentIndex < buffer->segmentCount; segmentIndex++) {
                unsigned long long decompressedSize = frameSizesP ? frameSizesP[offset] : 0;

                if (buffer->segments[segmentIndex].offset + buffer->segments[segmentIndex].length > buffer->dataSize) {
                    PyErr_Format(PyExc_ValueError, "item %zd has offset outside memory area",
                        offset);
                    goto finally;
                }

                if (buffer->segments[segmentIndex].length > SIZE_MAX) {
                    PyErr_Format(PyExc_ValueError,
                        "item %zd in buffer %zd is too large for this platform",
                        segmentIndex, i);
                    goto finally;
                }

                if (decompressedSize > SIZE_MAX) {
                    PyErr_Format(PyExc_ValueError,
                        "decompressed size of item %zd in buffer %zd is too large for this platform",
                        segmentIndex, i);
                    goto finally;
                }

                totalInputSize += buffer->segments[segmentIndex].length;

                /* offset is the flattened frame index across all buffers. */
                framePointers[offset].sourceData = (char*)buffer->data + buffer->segments[segmentIndex].offset;
                framePointers[offset].sourceSize = (size_t)buffer->segments[segmentIndex].length;
                framePointers[offset].destSize = (size_t)decompressedSize;

                offset++;
            }
        }
    }
    else if (PyList_Check(frames)) {
        frameCount = PyList_GET_SIZE(frames);
        expectedSizesLen = frameCount * (Py_ssize_t)sizeof(unsigned long long);

        if (frameSizes.buf && frameSizes.len != expectedSizesLen) {
            PyErr_Format(PyExc_ValueError, "decompressed_sizes size mismatch; expected %zd, got %zd",
                expectedSizesLen, frameSizes.len);
            goto finally;
        }

        framePointers = PyMem_Malloc(frameCount * sizeof(FramePointer));
        if (!framePointers) {
            PyErr_NoMemory();
            goto finally;
        }

        frameBuffers = PyMem_Malloc(frameCount * sizeof(Py_buffer));
        if (NULL == frameBuffers) {
            PyErr_NoMemory();
            goto finally;
        }

        /* Zeroed entries make PyBuffer_Release() safe on never-acquired slots. */
        memset(frameBuffers, 0, frameCount * sizeof(Py_buffer));

        /* Do a pass to assemble info about our input buffers and output sizes. */
        for (i = 0; i < frameCount; i++) {
            unsigned long long decompressedSize = frameSizesP ? frameSizesP[i] : 0;

            if (0 != PyObject_GetBuffer(PyList_GET_ITEM(frames, i),
                &frameBuffers[i], PyBUF_CONTIG_RO)) {
                PyErr_Clear();
                PyErr_Format(PyExc_TypeError, "item %zd not a bytes like object", i);
                goto finally;
            }

            if (decompressedSize > SIZE_MAX) {
                PyErr_Format(PyExc_ValueError,
                    "decompressed size of item %zd is too large for this platform", i);
                goto finally;
            }

            totalInputSize += frameBuffers[i].len;

            framePointers[i].sourceData = frameBuffers[i].buf;
            framePointers[i].sourceSize = frameBuffers[i].len;
            framePointers[i].destSize = (size_t)decompressedSize;
        }
    }
    else {
        PyErr_SetString(PyExc_TypeError, "argument must be list or BufferWithSegments");
        goto finally;
    }

    /* We now have an array with info about our inputs and outputs. Feed it into
       our generic decompression function. */
    frameSources.frames = framePointers;
    frameSources.framesSize = frameCount;
    frameSources.compressedSize = totalInputSize;

    result = decompress_from_framesources(self, &frameSources, threads);

finally:
    if (frameSizes.buf) {
        PyBuffer_Release(&frameSizes);
    }
    PyMem_Free(framePointers);

    if (frameBuffers) {
        for (i = 0; i < frameCount; i++) {
            PyBuffer_Release(&frameBuffers[i]);
        }

        PyMem_Free(frameBuffers);
    }

    return result;
}
static PyMethodDef Decompressor_methods[] = {
{ "copy_stream", (PyCFunction)Decompressor_copy_stream, METH_VARARGS | METH_KEYWORDS,
Decompressor_copy_stream__doc__ },
{ "decompress", (PyCFunction)Decompressor_decompress, METH_VARARGS | METH_KEYWORDS,
Decompressor_decompress__doc__ },
{ "decompressobj", (PyCFunction)Decompressor_decompressobj, METH_VARARGS | METH_KEYWORDS,
Decompressor_decompressobj__doc__ },
{ "read_to_iter", (PyCFunction)Decompressor_read_to_iter, METH_VARARGS | METH_KEYWORDS,
Decompressor_read_to_iter__doc__ },
/* TODO Remove deprecated API */
{ "read_from", (PyCFunction)Decompressor_read_to_iter, METH_VARARGS | METH_KEYWORDS,
Decompressor_read_to_iter__doc__ },
{ "stream_reader", (PyCFunction)Decompressor_stream_reader,
METH_VARARGS | METH_KEYWORDS, Decompressor_stream_reader__doc__ },
{ "stream_writer", (PyCFunction)Decompressor_stream_writer, METH_VARARGS | METH_KEYWORDS,
Decompressor_stream_writer__doc__ },
/* TODO remove deprecated API */
{ "write_to", (PyCFunction)Decompressor_stream_writer, METH_VARARGS | METH_KEYWORDS,
Decompressor_stream_writer__doc__ },
{ "decompress_content_dict_chain", (PyCFunction)Decompressor_decompress_content_dict_chain,
METH_VARARGS | METH_KEYWORDS, Decompressor_decompress_content_dict_chain__doc__ },
{ "multi_decompress_to_buffer", (PyCFunction)Decompressor_multi_decompress_to_buffer,
METH_VARARGS | METH_KEYWORDS, Decompressor_multi_decompress_to_buffer__doc__ },
{ "memory_size", (PyCFunction)Decompressor_memory_size, METH_NOARGS,
Decompressor_memory_size__doc__ },
{ NULL, NULL }
};
/*
 * Type object exposed to Python as ``zstd.ZstdDecompressor``.
 *
 * Slots are filled positionally, so their order must not change. Only a few
 * are populated: deallocation goes through Decompressor_dealloc, methods come
 * from Decompressor_methods, instances are created with PyType_GenericNew and
 * initialized by Decompressor_init. Py_TPFLAGS_BASETYPE permits subclassing.
 */
PyTypeObject ZstdDecompressorType = {
PyVarObject_HEAD_INIT(NULL, 0)
"zstd.ZstdDecompressor", /* tp_name */
sizeof(ZstdDecompressor), /* tp_basicsize */
0, /* tp_itemsize */
(destructor)Decompressor_dealloc, /* tp_dealloc */
0, /* tp_print */
0, /* tp_getattr */
0, /* tp_setattr */
0, /* tp_compare */
0, /* tp_repr */
0, /* tp_as_number */
0, /* tp_as_sequence */
0, /* tp_as_mapping */
0, /* tp_hash */
0, /* tp_call */
0, /* tp_str */
0, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
Decompressor__doc__, /* tp_doc */
0, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
Decompressor_methods, /* tp_methods */
0, /* tp_members */
0, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
(initproc)Decompressor_init, /* tp_init */
0, /* tp_alloc */
PyType_GenericNew, /* tp_new */
};
/**
 * Finalize ZstdDecompressorType and publish it on ``mod`` under the name
 * ``ZstdDecompressor``.
 *
 * mod  Module object that receives the type.
 *
 * Returns nothing. If PyType_Ready() or PyModule_AddObject() fails the
 * function simply returns, leaving whatever exception those calls set.
 */
void decompressor_module_init(PyObject* mod) {
    /* Py_TYPE() is not assignable on newer CPython; Py_SET_TYPE exists from 3.9. */
#if PY_VERSION_HEX >= 0x03090000
    Py_SET_TYPE((PyObject*)&ZstdDecompressorType, &PyType_Type);
#else
    Py_TYPE(&ZstdDecompressorType) = &PyType_Type;
#endif
    if (PyType_Ready(&ZstdDecompressorType) < 0) {
        return;
    }

    Py_INCREF((PyObject*)&ZstdDecompressorType);
    /*
     * PyModule_AddObject() only takes over the reference on success, so give
     * back the one acquired above when it fails.
     */
    if (PyModule_AddObject(mod, "ZstdDecompressor",
        (PyObject*)&ZstdDecompressorType) < 0) {
        Py_DECREF((PyObject*)&ZstdDecompressorType);
    }
}