##// END OF EJS Templates
copies-rust: start recording overwrites as they happen...
copies-rust: start recording overwrite as they happens If a revision has information overwriting data from another revision, the overwriting revision is a descendant of the overwritten one. So we could warm the Oracle cache with such information to avoid potential future `is_ancestors` call. This provide us with a large speedup in the most expensive cases: Repo Case Source-Rev Dest-Rev # of revisions old time new time Difference Factor time per rev --------------------------------------------------------------------------------------------------------------------------------------------------------------- mozilla-try x00000_revs_x00000_added_x0000_copies 1b661134e2ca 1ae03d022d6d : 228985 revs, 41.113063 s, 36.001255 s, -5.111808 s, × 0.8757, 157 µs/rev mozilla-try x00000_revs_x00000_added_x000_copies 9b2a99adc05e 8e29777b48e6 : 382065 revs, 27.891612 s, 14.340641 s, -13.550971 s, × 0.5142, 37 µs/rev Full comparison below: Repo Case Source-Rev Dest-Rev # of revisions old time new time Difference Factor time per rev --------------------------------------------------------------------------------------------------------------------------------------------------------------- mercurial x_revs_x_added_0_copies ad6b123de1c7 39cfcef4f463 : 1 revs, 0.000042 s, 0.000042 s, +0.000000 s, × 1.0000, 42 µs/rev mercurial x_revs_x_added_x_copies 2b1c78674230 0c1d10351869 : 6 revs, 0.000114 s, 0.000109 s, -0.000005 s, × 0.9561, 18 µs/rev mercurial x000_revs_x000_added_x_copies 81f8ff2a9bf2 dd3267698d84 : 1032 revs, 0.004934 s, 0.004953 s, +0.000019 s, × 1.0039, 4 µs/rev pypy x_revs_x_added_0_copies aed021ee8ae8 099ed31b181b : 9 revs, 0.000195 s, 0.000237 s, +0.000042 s, × 1.2154, 26 µs/rev pypy x_revs_x000_added_0_copies 4aa4e1f8e19a 359343b9ac0e : 1 revs, 0.000050 s, 0.000050 s, +0.000000 s, × 1.0000, 50 µs/rev pypy x_revs_x_added_x_copies ac52eb7bbbb0 72e022663155 : 7 revs, 0.000113 s, 0.000113 s, +0.000000 s, × 1.0000, 16 µs/rev pypy x_revs_x00_added_x_copies c3b14617fbd7 
ace7255d9a26 : 1 revs, 0.6f1f4a s, 0.6f1f4a s, +0.000000 s, × 1.0000, 322 µs/rev pypy x_revs_x000_added_x000_copies df6f7a526b60 a83dc6a2d56f : 6 revs, 0.010788 s, 0.010702 s, -0.000086 s, × 0.9920, 1783 µs/rev pypy x000_revs_xx00_added_0_copies 89a76aede314 2f22446ff07e : 4785 revs, 0.050880 s, 0.050504 s, -0.000376 s, × 0.9926, 10 µs/rev pypy x000_revs_x000_added_x_copies 8a3b5bfd266e 2c68e87c3efe : 6780 revs, 0.081760 s, 0.080159 s, -0.001601 s, × 0.9804, 11 µs/rev pypy x000_revs_x000_added_x000_copies 89a76aede314 7b3dda341c84 : 5441 revs, 0.061382 s, 0.060058 s, -0.001324 s, × 0.9784, 11 µs/rev pypy x0000_revs_x_added_0_copies d1defd0dc478 c9cb1334cc78 : 43645 revs, 0.585802 s, 0.536950 s, -0.048852 s, × 0.9166, 12 µs/rev pypy x0000_revs_xx000_added_0_copies bf2c629d0071 4ffed77c095c : 2 revs, 0.012803 s, 0.012868 s, +0.000065 s, × 1.0051, 6434 µs/rev pypy x0000_revs_xx000_added_x000_copies 08ea3258278e d9fa043f30c0 : 11316 revs, 0.113558 s, 0.112806 s, -0.000752 s, × 0.9934, 9 µs/rev netbeans x_revs_x_added_0_copies fb0955ffcbcd a01e9239f9e7 : 2 revs, 0.000085 s, 0.000084 s, -0.000001 s, × 0.9882, 42 µs/rev netbeans x_revs_x000_added_0_copies 6f360122949f 20eb231cc7d0 : 2 revs, 0.000106 s, 0.000106 s, +0.000000 s, × 1.0000, 53 µs/rev netbeans x_revs_x_added_x_copies 1ada3faf6fb6 5a39d12eecf4 : 3 revs, 0.000175 s, 0.000174 s, -0.000001 s, × 0.9943, 58 µs/rev netbeans x_revs_x00_added_x_copies 35be93ba1e2c 9eec5e90c05f : 9 revs, 0.000721 s, 0.000726 s, +0.000005 s, × 1.0069, 80 µs/rev netbeans x000_revs_xx00_added_0_copies eac3045b4fdd 51d4ae7f1290 : 1421 revs, 0.010127 s, 0.010105 s, -0.000022 s, × 0.9978, 7 µs/rev netbeans x000_revs_x000_added_x_copies e2063d266acd 6081d72689dc : 1533 revs, 0.015616 s, 0.015748 s, +0.000132 s, × 1.0085, 10 µs/rev netbeans x000_revs_x000_added_x000_copies ff453e9fee32 411350406ec2 : 5750 revs, 0.061341 s, 0.060357 s, -0.000984 s, × 0.9840, 10 µs/rev netbeans x0000_revs_xx000_added_x000_copies 588c2d1ced70 1aad62e59ddd : 66949 
revs, 0.542214 s, 0.499356 s, -0.042858 s, × 0.9210, 7 µs/rev mozilla-central x_revs_x_added_0_copies 3697f962bb7b 7015fcdd43a2 : 2 revs, 0.000089 s, 0.000092 s, +0.000003 s, × 1.0337, 46 µs/rev mozilla-central x_revs_x000_added_0_copies dd390860c6c9 40d0c5bed75d : 8 revs, 0.000279 s, 0.000279 s, +0.000000 s, × 1.0000, 34 µs/rev mozilla-central x_revs_x_added_x_copies 8d198483ae3b 14207ffc2b2f : 9 revs, 0.000184 s, 0.000186 s, +0.000002 s, × 1.0109, 20 µs/rev mozilla-central x_revs_x00_added_x_copies 98cbc58cc6bc 446a150332c3 : 7 revs, 0.000661 s, 0.000660 s, -0.000001 s, × 0.9985, 94 µs/rev mozilla-central x_revs_x000_added_x000_copies 3c684b4b8f68 0a5e72d1b479 : 3 revs, 0.003377 s, 0.003372 s, -0.000005 s, × 0.9985, 1124 µs/rev mozilla-central x_revs_x0000_added_x0000_copies effb563bb7e5 c07a39dc4e80 : 6 revs, 0.070508 s, 0.070294 s, -0.000214 s, × 0.9970, 11715 µs/rev mozilla-central x000_revs_xx00_added_0_copies 6100d773079a 04a55431795e : 1593 revs, 0.006576 s, 0.006545 s, -0.000031 s, × 0.9953, 4 µs/rev mozilla-central x000_revs_x000_added_x_copies 9f17a6fc04f9 2d37b966abed : 41 revs, 0.004809 s, 0.004998 s, +0.000189 s, × 1.0393, 121 µs/rev mozilla-central x000_revs_x000_added_x000_copies 7c97034feb78 4407bd0c6330 : 7839 revs, 0.064872 s, 0.063348 s, -0.001524 s, × 0.9765, 8 µs/rev mozilla-central x0000_revs_xx000_added_0_copies 9eec5917337d 67118cc6dcad : 615 revs, 0.026142 s, 0.026154 s, +0.000012 s, × 1.0005, 42 µs/rev mozilla-central x0000_revs_xx000_added_x000_copies f78c615a656c 96a38b690156 : 30263 revs, 0.203956 s, 0.199063 s, -0.004893 s, × 0.9760, 6 µs/rev mozilla-central x00000_revs_x0000_added_x0000_copies 6832ae71433c 4c222a1d9a00 : 153721 revs, 1.763853 s, 1.277320 s, -0.486533 s, × 0.7242, 8 µs/rev mozilla-central x00000_revs_x00000_added_x000_copies 76caed42cf7c 1daa622bbe42 : 204976 revs, 2.609761 s, 1.698794 s, -0.910967 s, × 0.6509, 8 µs/rev mozilla-try x_revs_x_added_0_copies aaf6dde0deb8 9790f499805a : 2 revs, 0.000847 s, 0.000842 s, 
-0.000005 s, × 0.9941, 421 µs/rev mozilla-try x_revs_x000_added_0_copies d8d0222927b4 5bb8ce8c7450 : 2 revs, 0.000867 s, 0.000865 s, -0.000002 s, × 0.9977, 432 µs/rev mozilla-try x_revs_x_added_x_copies 092fcca11bdb 936255a0384a : 4 revs, 0.000161 s, 0.000160 s, -0.000001 s, × 0.9938, 40 µs/rev mozilla-try x_revs_x00_added_x_copies b53d2fadbdb5 017afae788ec : 2 revs, 0.001131 s, 0.001122 s, -0.000009 s, × 0.9920, 561 µs/rev mozilla-try x_revs_x000_added_x000_copies 20408ad61ce5 6f0ee96e21ad : 1 revs, 0.033114 s, 0.032743 s, -0.000371 s, × 0.9888, 32743 µs/rev mozilla-try x_revs_x0000_added_x0000_copies effb563bb7e5 c07a39dc4e80 : 6 revs, 0.071092 s, 0.071529 s, +0.000437 s, × 1.0061, 11921 µs/rev mozilla-try x000_revs_xx00_added_0_copies 6100d773079a 04a55431795e : 1593 revs, 0.006554 s, 0.006593 s, +0.000039 s, × 1.0060, 4 µs/rev mozilla-try x000_revs_x000_added_x_copies 9f17a6fc04f9 2d37b966abed : 41 revs, 0.005160 s, 0.005311 s, +0.000151 s, × 1.0293, 129 µs/rev mozilla-try x000_revs_x000_added_x000_copies 1346fd0130e4 4c65cbdabc1f : 6657 revs, 0.065063 s, 0.063063 s, -0.002000 s, × 0.9693, 9 µs/rev mozilla-try x0000_revs_x_added_0_copies 63519bfd42ee a36a2a865d92 : 40314 revs, 0.297118 s, 0.312363 s, +0.015245 s, × 1.0513, 7 µs/rev mozilla-try x0000_revs_x_added_x_copies 9fe69ff0762d bcabf2a78927 : 38690 revs, 0.284002 s, 0.283106 s, -0.000896 s, × 0.9968, 7 µs/rev mozilla-try x0000_revs_xx000_added_x_copies 156f6e2674f2 4d0f2c178e66 : 8598 revs, 0.086311 s, 0.083817 s, -0.002494 s, × 0.9711, 9 µs/rev mozilla-try x0000_revs_xx000_added_0_copies 9eec5917337d 67118cc6dcad : 615 revs, 0.026738 s, 0.026516 s, -0.000222 s, × 0.9917, 43 µs/rev mozilla-try x0000_revs_xx000_added_x000_copies 89294cd501d9 7ccb2fc7ccb5 : 97052 revs, 1.514270 s, 1.304865 s, -0.209405 s, × 0.8617, 13 µs/rev mozilla-try x0000_revs_x0000_added_x0000_copies e928c65095ed e951f4ad123a : 52031 revs, 0.735875 s, 0.681088 s, -0.054787 s, × 0.9255, 13 µs/rev mozilla-try x00000_revs_x_added_0_copies 
6a320851d377 1ebb79acd503 : 363753 revs, 4.843329 s, 4.454320 s, -0.389009 s, × 0.9197, 12 µs/rev mozilla-try x00000_revs_x00000_added_0_copies dc8a3ca7010e d16fde900c9c : 34414 revs, 0.591752 s, 0.567913 s, -0.023839 s, × 0.9597, 16 µs/rev mozilla-try x00000_revs_x_added_x_copies 5173c4b6f97c 95d83ee7242d : 362229 revs, 4.760563 s, 4.547043 s, -0.213520 s, × 0.9551, 12 µs/rev mozilla-try x00000_revs_x000_added_x_copies 9126823d0e9c ca82787bb23c : 359344 revs, 4.751942 s, 4.378579 s, -0.373363 s, × 0.9214, 12 µs/rev mozilla-try x00000_revs_x0000_added_x0000_copies 8d3fafa80d4b eb884023b810 : 192665 revs, 2.605014 s, 1.703622 s, -0.901392 s, × 0.6540, 8 µs/rev mozilla-try x00000_revs_x00000_added_x0000_copies 1b661134e2ca 1ae03d022d6d : 228985 revs, 41.113063 s, 36.001255 s, -5.111808 s, × 0.8757, 157 µs/rev mozilla-try x00000_revs_x00000_added_x000_copies 9b2a99adc05e 8e29777b48e6 : 382065 revs, 27.891612 s, 14.340641 s, -13.550971 s, × 0.5142, 37 µs/rev Differential Revision: https://phab.mercurial-scm.org/D9497

File last commit:

r42237:675775c3 default
r46770:fce2f20a default
Show More
compressor.c
1670 lines | 44.6 KiB | text/x-c | CLexer
/**
* Copyright (c) 2016-present, Gregory Szorc
* All rights reserved.
*
* This software may be modified and distributed under the terms
* of the BSD license. See the LICENSE file for details.
*/
#include "python-zstandard.h"
#include "pool.h"
extern PyObject* ZstdError;
/*
 * Push the compressor's stored parameter set, and its dictionary when one is
 * attached, into its ZSTD_CCtx.
 *
 * Returns 0 on success. On failure a ZstdError is raised and 1 is returned.
 */
int setup_cctx(ZstdCompressor* compressor) {
	ZstdCompressionDict* dict;
	size_t rc;

	assert(compressor);
	assert(compressor->cctx);
	assert(compressor->params);

	rc = ZSTD_CCtx_setParametersUsingCCtxParams(compressor->cctx, compressor->params);
	if (ZSTD_isError(rc)) {
		PyErr_Format(ZstdError, "could not set compression parameters: %s",
			ZSTD_getErrorName(rc));
		return 1;
	}

	dict = compressor->dict;
	if (!dict) {
		/* Without a dictionary the parameters fully configure the context. */
		return 0;
	}

	/* Use the prepared CDict when available; otherwise let zstd reference
	   (ZSTD_dlm_byRef) the raw dictionary bytes held by the dict object. */
	rc = dict->cdict
		? ZSTD_CCtx_refCDict(compressor->cctx, dict->cdict)
		: ZSTD_CCtx_loadDictionary_advanced(compressor->cctx,
			dict->dictData, dict->dictSize,
			ZSTD_dlm_byRef, dict->dictType);

	if (ZSTD_isError(rc)) {
		PyErr_Format(ZstdError, "could not load compression dictionary: %s",
			ZSTD_getErrorName(rc));
		return 1;
	}

	return 0;
}
/*
 * Build a (ingested, consumed, produced) tuple from the counters reported by
 * ZSTD_getFrameProgression() for the given context.
 *
 * Returns a new reference, or NULL with an exception set.
 */
static PyObject* frame_progression(ZSTD_CCtx* cctx) {
	PyObject* tuple;
	ZSTD_frameProgression fp;
	unsigned long long counters[3];
	Py_ssize_t i;

	tuple = PyTuple_New(3);
	if (NULL == tuple) {
		return NULL;
	}

	fp = ZSTD_getFrameProgression(cctx);
	counters[0] = fp.ingested;
	counters[1] = fp.consumed;
	counters[2] = fp.produced;

	for (i = 0; i < 3; i++) {
		PyObject* item = PyLong_FromUnsignedLongLong(counters[i]);
		if (NULL == item) {
			Py_DECREF(tuple);
			return NULL;
		}
		/* PyTuple_SET_ITEM steals the reference to item. */
		PyTuple_SET_ITEM(tuple, i, item);
	}

	return tuple;
}
/* Docstring for the ZstdCompressor type; defaults mirror ZstdCompressor_init. */
PyDoc_STRVAR(ZstdCompressor__doc__,
"ZstdCompressor(level=3, dict_data=None, compression_params=None,\n"
"               write_checksum=False, write_content_size=True,\n"
"               write_dict_id=True, threads=0)\n"
"\n"
"Create an object used to perform Zstandard compression.\n"
"\n"
"An instance can compress data in various ways. Instances can be used\n"
"multiple times. Each compression operation will use the compression\n"
"parameters defined at construction time.\n"
"\n"
"Compression can be configured via the following named arguments:\n"
"\n"
"level\n"
" Integer compression level. Defaults to 3. Values above the maximum zstd\n"
" compression level are rejected.\n"
"dict_data\n"
" A ``ZstdCompressionDict`` to be used to compress with dictionary data.\n"
"compression_params\n"
" A ``ZstdCompressionParameters`` instance defining low-level compression\n"
" parameters. If defined, it is used instead of the ``level`` argument and\n"
" cannot be combined with ``write_checksum``, ``write_content_size``,\n"
" ``write_dict_id`` or a non-zero ``threads``.\n"
"write_checksum\n"
" If True, a 4 byte content checksum will be written with the compressed\n"
" data, allowing the decompressor to perform content verification.\n"
"write_content_size\n"
" If True (the default), the decompressed content size will be included in\n"
" the header of the compressed data. This data will only be written if the\n"
" compressor knows the size of the input data.\n"
"write_dict_id\n"
" Determines whether the dictionary ID will be written into the compressed\n"
" data. Defaults to True. Only adds content to the compressed data if\n"
" a dictionary is being used.\n"
"threads\n"
" Number of threads to use to compress data concurrently. When set,\n"
" compression operations are performed on multiple threads. The default\n"
" value (0) disables multi-threaded compression. A value of ``-1`` means to\n"
" set the number of threads to the number of detected logical CPUs.\n"
);
/*
 * tp_init for ZstdCompressor.
 *
 * Validates the constructor arguments, creates the reusable ZSTD_CCtx and a
 * ZSTD_CCtx_params object holding the requested settings, then applies them
 * (plus any dictionary) to the context via setup_cctx().
 *
 * Returns 0 on success or -1 with an exception set. Anything allocated
 * before a failure is left on self and released by ZstdCompressor_dealloc.
 */
static int ZstdCompressor_init(ZstdCompressor* self, PyObject* args, PyObject* kwargs) {
	static char* kwlist[] = {
		"level",
		"dict_data",
		"compression_params",
		"write_checksum",
		"write_content_size",
		"write_dict_id",
		"threads",
		NULL
	};

	int level = 3;
	ZstdCompressionDict* dict = NULL;
	ZstdCompressionParametersObject* params = NULL;
	PyObject* writeChecksum = NULL;
	PyObject* writeContentSize = NULL;
	PyObject* writeDictID = NULL;
	int threads = 0;
	/* Effective frame flags; these defaults apply when a keyword is omitted. */
	int contentSizeFlag = 1;
	int checksumFlag = 0;
	int dictIDFlag = 1;

	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|iO!O!OOOi:ZstdCompressor",
		kwlist, &level, &ZstdCompressionDictType, &dict,
		&ZstdCompressionParametersType, &params,
		&writeChecksum, &writeContentSize, &writeDictID, &threads)) {
		return -1;
	}

	if (level > ZSTD_maxCLevel()) {
		PyErr_Format(PyExc_ValueError, "level must be less than %d",
			ZSTD_maxCLevel() + 1);
		return -1;
	}

	/* A negative thread count means one worker per detected CPU. */
	if (threads < 0) {
		threads = cpu_count();
	}

	/* compression_params carries its own frame settings, so it cannot be
	   combined with the individual options. Validate before allocating. */
	if (params && writeChecksum) {
		PyErr_SetString(PyExc_ValueError,
			"cannot define compression_params and write_checksum");
		return -1;
	}

	if (params && writeContentSize) {
		PyErr_SetString(PyExc_ValueError,
			"cannot define compression_params and write_content_size");
		return -1;
	}

	if (params && writeDictID) {
		PyErr_SetString(PyExc_ValueError,
			"cannot define compression_params and write_dict_id");
		return -1;
	}

	if (params && threads) {
		PyErr_SetString(PyExc_ValueError,
			"cannot define compression_params and threads");
		return -1;
	}

	/* Resolve truthiness before touching zstd. PyObject_IsTrue() returns -1
	   with an exception set if __bool__ raises; forwarding that as a flag
	   value could let __init__ continue with a pending exception. */
	if (writeContentSize) {
		contentSizeFlag = PyObject_IsTrue(writeContentSize);
		if (-1 == contentSizeFlag) {
			return -1;
		}
	}

	if (writeChecksum) {
		checksumFlag = PyObject_IsTrue(writeChecksum);
		if (-1 == checksumFlag) {
			return -1;
		}
	}

	if (writeDictID) {
		dictIDFlag = PyObject_IsTrue(writeDictID);
		if (-1 == dictIDFlag) {
			return -1;
		}
	}

	/* We create a ZSTD_CCtx for reuse among multiple operations to reduce the
	   overhead of each compression operation. */
	self->cctx = ZSTD_createCCtx();
	if (!self->cctx) {
		PyErr_NoMemory();
		return -1;
	}

	/* TODO stuff the original parameters away somewhere so we can reset later. This
	   will allow us to do things like automatically adjust cparams based on input
	   size (assuming zstd isn't doing that internally). */
	self->params = ZSTD_createCCtxParams();
	if (!self->params) {
		PyErr_NoMemory();
		return -1;
	}

	if (params) {
		if (set_parameters(self->params, params)) {
			return -1;
		}
	}
	else {
		if (set_parameter(self->params, ZSTD_c_compressionLevel, level)) {
			return -1;
		}

		if (set_parameter(self->params, ZSTD_c_contentSizeFlag, contentSizeFlag)) {
			return -1;
		}

		if (set_parameter(self->params, ZSTD_c_checksumFlag, checksumFlag)) {
			return -1;
		}

		if (set_parameter(self->params, ZSTD_c_dictIDFlag, dictIDFlag)) {
			return -1;
		}

		if (threads) {
			if (set_parameter(self->params, ZSTD_c_nbWorkers, threads)) {
				return -1;
			}
		}
	}

	if (dict) {
		self->dict = dict;
		Py_INCREF(dict);
	}

	if (setup_cctx(self)) {
		return -1;
	}

	return 0;
}
/*
 * tp_dealloc for ZstdCompressor: release the zstd context, the parameter
 * set and the dictionary reference, then free the object itself.
 */
static void ZstdCompressor_dealloc(ZstdCompressor* self) {
	ZSTD_CCtx* cctx = self->cctx;
	ZSTD_CCtx_params* cparams = self->params;

	/* Detach first so the object never points at freed zstd state. */
	self->cctx = NULL;
	self->params = NULL;

	if (cctx) {
		ZSTD_freeCCtx(cctx);
	}

	if (cparams) {
		ZSTD_freeCCtxParams(cparams);
	}

	Py_XDECREF(self->dict);
	PyObject_Del(self);
}
PyDoc_STRVAR(ZstdCompressor_memory_size__doc__,
	"memory_size()\n\n"
	"Obtain the memory usage of this compressor, in bytes.\n");

/*
 * Report ZSTD_sizeof_CCtx() for this compressor's context as a Python int.
 * Raises ZstdError if the context is missing.
 */
static PyObject* ZstdCompressor_memory_size(ZstdCompressor* self) {
	if (!self->cctx) {
		PyErr_SetString(ZstdError, "no compressor context found; this should never happen");
		return NULL;
	}

	return PyLong_FromSize_t(ZSTD_sizeof_CCtx(self->cctx));
}
PyDoc_STRVAR(ZstdCompressor_frame_progression__doc__,
	"frame_progression()\n\n"
	"Return information on how much work the compressor has done.\n\n"
	"Returns a 3-tuple of (ingested, consumed, produced).\n");

/* Python-visible method: frame_progression() for this compressor's context. */
static PyObject* ZstdCompressor_frame_progression(ZstdCompressor* self) {
	ZSTD_CCtx* cctx = self->cctx;

	return frame_progression(cctx);
}
PyDoc_STRVAR(ZstdCompressor_copy_stream__doc__,
"copy_stream(ifh, ofh[, size=-1, read_size=default, write_size=default])\n"
"compress data between streams\n"
"\n"
"Data will be read from ``ifh``, compressed, and written to ``ofh``.\n"
"``ifh`` must have a ``read(size)`` method. ``ofh`` must have a ``write(data)``\n"
"method.\n"
"\n"
"An optional ``size`` argument specifies the size of the source stream.\n"
"If defined, compression parameters will be tuned based on the size.\n"
"If omitted (or -1), the size is treated as unknown.\n"
"\n"
"Optional arguments ``read_size`` and ``write_size`` define the chunk sizes\n"
"of ``read()`` and ``write()`` operations, respectively. By default, they use\n"
"the default compression stream input and output sizes, respectively.\n"
"\n"
"Returns a 2-tuple of (bytes read, bytes written).\n"
);

/*
 * Compress everything readable from ``source`` and write it to ``dest``.
 *
 * Data is pulled with source.read(read_size) until an empty result signals
 * EOF and fed through the compressor's context with ZSTD_e_continue. Every
 * non-empty output chunk is passed to dest.write(). The frame is finished
 * with ZSTD_e_end.
 *
 * Returns a (totalRead, totalWrite) tuple, or NULL with an exception set.
 * Exceptions raised by read() are replaced with ZstdError. Exceptions
 * raised by write(), or a read() result that is not bytes, propagate as-is.
 */
static PyObject* ZstdCompressor_copy_stream(ZstdCompressor* self, PyObject* args, PyObject* kwargs) {
	static char* kwlist[] = {
		"ifh",
		"ofh",
		"size",
		"read_size",
		"write_size",
		NULL
	};

	PyObject* source;
	PyObject* dest;
	unsigned long long sourceSize = ZSTD_CONTENTSIZE_UNKNOWN;
	size_t inSize = ZSTD_CStreamInSize();
	size_t outSize = ZSTD_CStreamOutSize();
	ZSTD_inBuffer input;
	ZSTD_outBuffer output;
	Py_ssize_t totalRead = 0;
	Py_ssize_t totalWrite = 0;
	char* readBuffer;
	Py_ssize_t readSize;
	PyObject* readResult = NULL;
	PyObject* res = NULL;
	size_t zresult;
	PyObject* writeResult;
	PyObject* totalReadPy;
	PyObject* totalWritePy;

	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|Kkk:copy_stream", kwlist,
		&source, &dest, &sourceSize, &inSize, &outSize)) {
		return NULL;
	}

	if (!PyObject_HasAttrString(source, "read")) {
		PyErr_SetString(PyExc_ValueError, "first argument must have a read() method");
		return NULL;
	}

	if (!PyObject_HasAttrString(dest, "write")) {
		PyErr_SetString(PyExc_ValueError, "second argument must have a write() method");
		return NULL;
	}

	ZSTD_CCtx_reset(self->cctx, ZSTD_reset_session_only);

	zresult = ZSTD_CCtx_setPledgedSrcSize(self->cctx, sourceSize);
	if (ZSTD_isError(zresult)) {
		PyErr_Format(ZstdError, "error setting source size: %s",
			ZSTD_getErrorName(zresult));
		return NULL;
	}

	/* From here on every exit goes through finally, which frees output.dst. */
	output.dst = PyMem_Malloc(outSize);
	if (!output.dst) {
		PyErr_NoMemory();
		goto finally;
	}
	output.size = outSize;
	output.pos = 0;

	input.src = NULL;
	input.size = 0;
	input.pos = 0;

	while (1) {
		/* Try to read from source stream. */
		readResult = PyObject_CallMethod(source, "read", "n", inSize);
		if (!readResult) {
			PyErr_SetString(ZstdError, "could not read() from source");
			goto finally;
		}

		/* read() must return bytes. On failure readBuffer/readSize are not
		   filled in, so bail out with the exception the conversion raised. */
		if (-1 == PyBytes_AsStringAndSize(readResult, &readBuffer, &readSize)) {
			goto finally;
		}

		/* If no data was read, we're at EOF. */
		if (0 == readSize) {
			break;
		}

		totalRead += readSize;

		/* Send data to compressor */
		input.src = readBuffer;
		input.size = readSize;
		input.pos = 0;

		while (input.pos < input.size) {
			Py_BEGIN_ALLOW_THREADS
			zresult = ZSTD_compressStream2(self->cctx, &output, &input, ZSTD_e_continue);
			Py_END_ALLOW_THREADS

			if (ZSTD_isError(zresult)) {
				PyErr_Format(ZstdError, "zstd compress error: %s", ZSTD_getErrorName(zresult));
				goto finally;
			}

			if (output.pos) {
#if PY_MAJOR_VERSION >= 3
				writeResult = PyObject_CallMethod(dest, "write", "y#",
#else
				writeResult = PyObject_CallMethod(dest, "write", "s#",
#endif
					output.dst, output.pos);
				/* Propagate exceptions from write() instead of carrying on
				   with the error indicator set. */
				if (!writeResult) {
					goto finally;
				}
				Py_DECREF(writeResult);
				totalWrite += output.pos;
				output.pos = 0;
			}
		}

		Py_CLEAR(readResult);
	}

	/* We've finished reading. Now flush the compressor stream. */
	assert(input.pos == input.size);

	/* The last non-empty chunk's bytes object has already been released, so
	   stop pointing at it; there is no input left to feed anyway. */
	input.src = NULL;
	input.size = 0;
	input.pos = 0;

	while (1) {
		Py_BEGIN_ALLOW_THREADS
		zresult = ZSTD_compressStream2(self->cctx, &output, &input, ZSTD_e_end);
		Py_END_ALLOW_THREADS

		if (ZSTD_isError(zresult)) {
			PyErr_Format(ZstdError, "error ending compression stream: %s",
				ZSTD_getErrorName(zresult));
			goto finally;
		}

		if (output.pos) {
#if PY_MAJOR_VERSION >= 3
			writeResult = PyObject_CallMethod(dest, "write", "y#",
#else
			writeResult = PyObject_CallMethod(dest, "write", "s#",
#endif
				output.dst, output.pos);
			if (!writeResult) {
				goto finally;
			}
			Py_DECREF(writeResult);
			totalWrite += output.pos;
			output.pos = 0;
		}

		/* With ZSTD_e_end, 0 means the frame has been fully flushed. */
		if (!zresult) {
			break;
		}
	}

	totalReadPy = PyLong_FromSsize_t(totalRead);
	totalWritePy = PyLong_FromSsize_t(totalWrite);
	/* Either conversion may fail; res stays NULL with the exception set. */
	if (totalReadPy && totalWritePy) {
		res = PyTuple_Pack(2, totalReadPy, totalWritePy);
	}
	Py_XDECREF(totalReadPy);
	Py_XDECREF(totalWritePy);

finally:
	if (output.dst) {
		PyMem_Free(output.dst);
	}

	Py_XDECREF(readResult);

	return res;
}
PyDoc_STRVAR(ZstdCompressor_stream_reader__doc__,
"stream_reader(source, [size=-1, read_size=default])\n"
"\n"
"Obtain an object that behaves like an I/O stream.\n"
"\n"
"The source object can be any object with a ``read(size)`` method\n"
"or an object that conforms to the buffer protocol.\n"
"\n"
"``size`` is the size of the source data, if known. It is ignored for\n"
"buffer protocol sources, whose length is used instead. ``read_size`` only\n"
"applies to sources with a ``read()`` method and defaults to the zstd\n"
"recommended compression stream input size.\n"
);

/*
 * Create a ZstdCompressionReader bound to this compressor.
 *
 * ``source`` is either an object with a read() method (kept with
 * ``read_size``) or a buffer-protocol object. For a buffer, its length
 * replaces ``size``. The compressor's context is reset and the pledged size
 * applied. Returns a new reference, or NULL with an exception set.
 */
static ZstdCompressionReader* ZstdCompressor_stream_reader(ZstdCompressor* self, PyObject* args, PyObject* kwargs) {
	static char* kwlist[] = {
		"source",
		"size",
		"read_size",
		NULL
	};

	PyObject* source;
	unsigned long long sourceSize = ZSTD_CONTENTSIZE_UNKNOWN;
	size_t readSize = ZSTD_CStreamInSize();
	ZstdCompressionReader* result = NULL;
	size_t zresult;

	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|Kk:stream_reader", kwlist,
		&source, &sourceSize, &readSize)) {
		return NULL;
	}

	result = (ZstdCompressionReader*)PyObject_CallObject((PyObject*)&ZstdCompressionReaderType, NULL);
	if (!result) {
		return NULL;
	}

	if (PyObject_HasAttrString(source, "read")) {
		result->reader = source;
		Py_INCREF(source);
		result->readSize = readSize;
	}
	else if (1 == PyObject_CheckBuffer(source)) {
		if (0 != PyObject_GetBuffer(source, &result->buffer, PyBUF_CONTIG_RO)) {
			goto except;
		}

		assert(result->buffer.len >= 0);

		sourceSize = result->buffer.len;
	}
	else {
		PyErr_SetString(PyExc_TypeError,
			"must pass an object with a read() method or that conforms to the buffer protocol");
		goto except;
	}

	ZSTD_CCtx_reset(self->cctx, ZSTD_reset_session_only);

	zresult = ZSTD_CCtx_setPledgedSrcSize(self->cctx, sourceSize);
	if (ZSTD_isError(zresult)) {
		PyErr_Format(ZstdError, "error setting source size: %s",
			ZSTD_getErrorName(zresult));
		goto except;
	}

	result->compressor = self;
	Py_INCREF(self);

	return result;

except:
	Py_CLEAR(result);

	return NULL;
}
PyDoc_STRVAR(ZstdCompressor_compress__doc__,
"compress(data)\n"
"\n"
"Compress data in a single operation.\n"
"\n"
"This is the simplest mechanism to perform compression: simply pass in a\n"
"value and get a compressed value back. It is also the most prone to abuse.\n"
"The input and output values must fit in memory, so passing in very large\n"
"values can result in excessive memory usage. For this reason, one of the\n"
"streaming based APIs is preferred for larger values.\n"
);

/*
 * Compress a contiguous buffer into one complete zstd frame.
 *
 * The output bytes object is allocated at ZSTD_compressBound() size and
 * filled by a single ZSTD_compressStream2(ZSTD_e_end) call. It is then
 * shrunk to the number of bytes actually produced.
 *
 * Returns a new bytes object, or NULL with an exception set.
 */
static PyObject* ZstdCompressor_compress(ZstdCompressor* self, PyObject* args, PyObject* kwargs) {
	static char* kwlist[] = {
		"data",
		NULL
	};

	Py_buffer source;
	size_t destSize;
	PyObject* output = NULL;
	size_t zresult;
	ZSTD_outBuffer outBuffer;
	ZSTD_inBuffer inBuffer;

	/* The format describes exactly the one ``data`` entry in kwlist. */
#if PY_MAJOR_VERSION >= 3
	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "y*:compress",
#else
	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s*:compress",
#endif
		kwlist, &source)) {
		return NULL;
	}

	if (!PyBuffer_IsContiguous(&source, 'C') || source.ndim > 1) {
		PyErr_SetString(PyExc_ValueError,
			"data buffer should be contiguous and have at most one dimension");
		goto finally;
	}

	ZSTD_CCtx_reset(self->cctx, ZSTD_reset_session_only);

	destSize = ZSTD_compressBound(source.len);
	output = PyBytes_FromStringAndSize(NULL, destSize);
	if (!output) {
		goto finally;
	}

	zresult = ZSTD_CCtx_setPledgedSrcSize(self->cctx, source.len);
	if (ZSTD_isError(zresult)) {
		PyErr_Format(ZstdError, "error setting source size: %s",
			ZSTD_getErrorName(zresult));
		Py_CLEAR(output);
		goto finally;
	}

	inBuffer.src = source.buf;
	inBuffer.size = source.len;
	inBuffer.pos = 0;

	outBuffer.dst = PyBytes_AsString(output);
	outBuffer.size = destSize;
	outBuffer.pos = 0;

	Py_BEGIN_ALLOW_THREADS
	/* By avoiding ZSTD_compress(), we don't necessarily write out content
	   size. This means the argument to ZstdCompressor to control frame
	   parameters is honored. */
	zresult = ZSTD_compressStream2(self->cctx, &outBuffer, &inBuffer, ZSTD_e_end);
	Py_END_ALLOW_THREADS

	if (ZSTD_isError(zresult)) {
		PyErr_Format(ZstdError, "cannot compress: %s", ZSTD_getErrorName(zresult));
		Py_CLEAR(output);
		goto finally;
	}
	else if (zresult) {
		PyErr_SetString(ZstdError, "unexpected partial frame flush");
		Py_CLEAR(output);
		goto finally;
	}

	/* Shrink to the produced size. Py_SIZE() is not assignable on Python
	   3.11+, and assigning it directly would also skip the trailing NUL
	   that bytes objects guarantee. _PyBytes_Resize() handles both. On
	   failure it frees the object, sets output to NULL and raises
	   MemoryError, which is exactly what we return. */
	_PyBytes_Resize(&output, (Py_ssize_t)outBuffer.pos);

finally:
	PyBuffer_Release(&source);
	return output;
}
PyDoc_STRVAR(ZstdCompressionObj__doc__,
"compressobj([size=-1])\n"
"\n"
"Return an object exposing ``compress(data)`` and ``flush()`` methods.\n"
"\n"
"The returned object exposes an API similar to ``zlib.compressobj`` and\n"
"``bz2.BZ2Compressor`` so that callers can swap in the zstd compressor\n"
"without changing how compression is performed.\n"
"\n"
"An optional ``size`` argument specifies the total size of the input data,\n"
"if known.\n"
);

/*
 * Create a ZstdCompressionObj bound to this compressor.
 *
 * The compressor's context is reset and the pledged source size applied
 * (``size``, unknown if omitted). The object then gets an output buffer of
 * ZSTD_CStreamOutSize() bytes. Returns a new reference, or NULL with an
 * exception set.
 */
static ZstdCompressionObj* ZstdCompressor_compressobj(ZstdCompressor* self, PyObject* args, PyObject* kwargs) {
	static char* kwlist[] = {
		"size",
		NULL
	};

	unsigned long long inSize = ZSTD_CONTENTSIZE_UNKNOWN;
	size_t outSize = ZSTD_CStreamOutSize();
	ZstdCompressionObj* result = NULL;
	size_t zresult;

	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|K:compressobj", kwlist, &inSize)) {
		return NULL;
	}

	ZSTD_CCtx_reset(self->cctx, ZSTD_reset_session_only);

	zresult = ZSTD_CCtx_setPledgedSrcSize(self->cctx, inSize);
	if (ZSTD_isError(zresult)) {
		PyErr_Format(ZstdError, "error setting source size: %s",
			ZSTD_getErrorName(zresult));
		return NULL;
	}

	result = (ZstdCompressionObj*)PyObject_CallObject((PyObject*)&ZstdCompressionObjType, NULL);
	if (!result) {
		return NULL;
	}

	result->output.dst = PyMem_Malloc(outSize);
	if (!result->output.dst) {
		PyErr_NoMemory();
		Py_DECREF(result);
		return NULL;
	}
	result->output.size = outSize;
	/* Start with an empty output buffer, as stream_writer() and chunker() do. */
	result->output.pos = 0;

	result->compressor = self;
	Py_INCREF(result->compressor);

	return result;
}
PyDoc_STRVAR(ZstdCompressor_read_to_iter__doc__,
"read_to_iter(reader, [size=-1, read_size=default, write_size=default])\n"
"Read uncompressed data from a reader and return an iterator\n"
"\n"
"Returns an iterator of compressed data produced from reading from ``reader``.\n"
"\n"
"Uncompressed data will be obtained from ``reader`` by calling the\n"
"``read(size)`` method of it. The source data will be streamed into a\n"
"compressor. As compressed data is available, it will be exposed to the\n"
"iterator. ``reader`` may also be an object conforming to the buffer\n"
"protocol, in which case its length is used as the source size.\n"
"\n"
"Data is read from the source in chunks of ``read_size``. Compressed chunks\n"
"are at most ``write_size`` bytes. Both values default to the zstd input\n"
"and output defaults, respectively.\n"
"\n"
"The caller is partially in control of how fast data is fed into the\n"
"compressor by how it consumes the returned iterator. The compressor will\n"
"not consume from the reader unless the caller consumes from the iterator.\n"
);

/*
 * Create a ZstdCompressorIterator that yields compressed chunks of ``reader``.
 *
 * ``reader`` is either an object with a read() method or a buffer-protocol
 * object. For a buffer, its length overrides ``size``. On any failure the
 * partially built iterator is released. Returns a new reference, or NULL
 * with an exception set.
 */
static ZstdCompressorIterator* ZstdCompressor_read_to_iter(ZstdCompressor* self, PyObject* args, PyObject* kwargs) {
	static char* kwlist[] = {
		"reader",
		"size",
		"read_size",
		"write_size",
		NULL
	};

	PyObject* reader;
	unsigned long long sourceSize = ZSTD_CONTENTSIZE_UNKNOWN;
	size_t inSize = ZSTD_CStreamInSize();
	size_t outSize = ZSTD_CStreamOutSize();
	ZstdCompressorIterator* result;
	size_t zresult;

	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|Kkk:read_to_iter", kwlist,
		&reader, &sourceSize, &inSize, &outSize)) {
		return NULL;
	}

	result = (ZstdCompressorIterator*)PyObject_CallObject((PyObject*)&ZstdCompressorIteratorType, NULL);
	if (!result) {
		return NULL;
	}

	if (PyObject_HasAttrString(reader, "read")) {
		result->reader = reader;
		Py_INCREF(result->reader);
	}
	else if (1 == PyObject_CheckBuffer(reader)) {
		if (0 != PyObject_GetBuffer(reader, &result->buffer, PyBUF_CONTIG_RO)) {
			goto except;
		}

		sourceSize = result->buffer.len;
	}
	else {
		PyErr_SetString(PyExc_ValueError,
			"must pass an object with a read() method or conforms to buffer protocol");
		goto except;
	}

	ZSTD_CCtx_reset(self->cctx, ZSTD_reset_session_only);

	zresult = ZSTD_CCtx_setPledgedSrcSize(self->cctx, sourceSize);
	if (ZSTD_isError(zresult)) {
		PyErr_Format(ZstdError, "error setting source size: %s",
			ZSTD_getErrorName(zresult));
		/* Release the iterator (and the reader/buffer it now holds) rather
		   than returning NULL and leaking it. */
		goto except;
	}

	result->compressor = self;
	Py_INCREF(result->compressor);

	result->inSize = inSize;
	result->outSize = outSize;

	result->output.dst = PyMem_Malloc(outSize);
	if (!result->output.dst) {
		PyErr_NoMemory();
		goto except;
	}
	result->output.size = outSize;
	/* Start with an empty output buffer, as stream_writer() and chunker() do. */
	result->output.pos = 0;

	goto finally;

except:
	Py_CLEAR(result);

finally:
	return result;
}
PyDoc_STRVAR(ZstdCompressor_stream_writer___doc__,
"stream_writer(writer, [size=-1, write_size=default, write_return_read=False])\n"
"\n"
"Create a context manager to write compressed data to an object.\n"
"\n"
"The passed object must have a ``write()`` method.\n"
"\n"
"The caller feeds input data to the object by calling ``compress(data)``.\n"
"Compressed data is written to the argument given to this function.\n"
"\n"
"The function takes an optional ``size`` argument indicating the total size\n"
"of the eventual input. If specified, the size will influence compression\n"
"parameter tuning and could result in the size being written into the\n"
"header of the compressed data.\n"
"\n"
"An optional ``write_size`` argument is also accepted. It defines the maximum\n"
"byte size of chunks fed to ``write()``. By default, it uses the zstd default\n"
"for a compressor output stream.\n"
"\n"
"An optional ``write_return_read`` boolean (default False) is forwarded to\n"
"the returned writer.\n"
);

/*
 * Create a ZstdCompressionWriter that sends compressed output to ``writer``.
 *
 * The compressor's context is reset and the pledged source size applied
 * before the writer object is created. Returns a new reference, or NULL
 * with an exception set.
 */
static ZstdCompressionWriter* ZstdCompressor_stream_writer(ZstdCompressor* self, PyObject* args, PyObject* kwargs) {
	static char* kwlist[] = {
		"writer",
		"size",
		"write_size",
		"write_return_read",
		NULL
	};

	PyObject* writer;
	ZstdCompressionWriter* result;
	size_t zresult;
	unsigned long long sourceSize = ZSTD_CONTENTSIZE_UNKNOWN;
	size_t outSize = ZSTD_CStreamOutSize();
	PyObject* writeReturnRead = NULL;
	int writeReturnReadFlag = 0;

	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|KkO:stream_writer", kwlist,
		&writer, &sourceSize, &outSize, &writeReturnRead)) {
		return NULL;
	}

	if (!PyObject_HasAttrString(writer, "write")) {
		PyErr_SetString(PyExc_ValueError, "must pass an object with a write() method");
		return NULL;
	}

	/* Evaluate truthiness up front. PyObject_IsTrue() returns -1 with an
	   exception set if __bool__ raises, and that must not be stored as a
	   (truthy) flag on a successfully returned writer. */
	if (writeReturnRead) {
		writeReturnReadFlag = PyObject_IsTrue(writeReturnRead);
		if (-1 == writeReturnReadFlag) {
			return NULL;
		}
	}

	ZSTD_CCtx_reset(self->cctx, ZSTD_reset_session_only);

	zresult = ZSTD_CCtx_setPledgedSrcSize(self->cctx, sourceSize);
	if (ZSTD_isError(zresult)) {
		PyErr_Format(ZstdError, "error setting source size: %s",
			ZSTD_getErrorName(zresult));
		return NULL;
	}

	result = (ZstdCompressionWriter*)PyObject_CallObject((PyObject*)&ZstdCompressionWriterType, NULL);
	if (!result) {
		return NULL;
	}

	result->output.dst = PyMem_Malloc(outSize);
	if (!result->output.dst) {
		Py_DECREF(result);
		return (ZstdCompressionWriter*)PyErr_NoMemory();
	}

	result->output.pos = 0;
	result->output.size = outSize;

	result->compressor = self;
	Py_INCREF(result->compressor);

	result->writer = writer;
	Py_INCREF(result->writer);

	result->outSize = outSize;
	result->bytesCompressed = 0;
	result->writeReturnRead = writeReturnReadFlag;

	return result;
}
PyDoc_STRVAR(ZstdCompressor_chunker__doc__,
	"Create an object for iterative compressing to same-sized chunks.\n");

/*
 * Create a ZstdCompressionChunker bound to this compressor.
 *
 * ``size`` is the pledged source size (unknown if omitted). ``chunk_size`` is
 * the output buffer size (ZSTD_CStreamOutSize() if omitted). The context is
 * reset and the pledged size applied before the chunker is created.
 * Returns a new reference, or NULL with an exception set.
 */
static ZstdCompressionChunker* ZstdCompressor_chunker(ZstdCompressor* self, PyObject* args, PyObject* kwargs) {
	static char* kwlist[] = { "size", "chunk_size", NULL };
	unsigned long long pledged = ZSTD_CONTENTSIZE_UNKNOWN;
	size_t chunkSize = ZSTD_CStreamOutSize();
	ZstdCompressionChunker* chunker;
	size_t rc;

	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|Kk:chunker", kwlist,
		&pledged, &chunkSize)) {
		return NULL;
	}

	ZSTD_CCtx_reset(self->cctx, ZSTD_reset_session_only);
	rc = ZSTD_CCtx_setPledgedSrcSize(self->cctx, pledged);
	if (ZSTD_isError(rc)) {
		PyErr_Format(ZstdError, "error setting source size: %s",
			ZSTD_getErrorName(rc));
		return NULL;
	}

	chunker = (ZstdCompressionChunker*)PyObject_CallObject((PyObject*)&ZstdCompressionChunkerType, NULL);
	if (NULL == chunker) {
		return NULL;
	}

	chunker->output.dst = PyMem_Malloc(chunkSize);
	if (NULL == chunker->output.dst) {
		PyErr_NoMemory();
		Py_DECREF(chunker);
		return NULL;
	}

	chunker->output.size = chunkSize;
	chunker->output.pos = 0;
	chunker->chunkSize = chunkSize;

	Py_INCREF(self);
	chunker->compressor = self;

	return chunker;
}
typedef struct {
void* sourceData;
size_t sourceSize;
} DataSource;
/* Every input of a multi_compress_to_buffer call, flattened into one array,
   together with the sum of their sizes. */
typedef struct {
	DataSource* sources;                /* PyMem_Malloc()ed array of inputs */
	Py_ssize_t sourcesSize;             /* number of entries in sources */
	unsigned long long totalSourceSize; /* sum of every sourceSize */
} DataSources;
/* One output region produced by a compression worker: the packed compressed
   frames plus a segment table locating each frame. Both arrays are malloc()ed
   by the worker. */
typedef struct {
	void* dest;               /* packed compressed frames */
	Py_ssize_t destSize;      /* bytes allocated/used at dest */
	BufferSegment* segments;  /* one (offset, length) entry per frame */
	Py_ssize_t segmentsSize;  /* entries in segments */
} DestBuffer;
/* Outcome recorded by a compression worker. Values are numbered from zero in
   declaration order, so WorkerError_none (0) means success. */
typedef enum {
	WorkerError_none,       /* 0: no failure */
	WorkerError_zstd,       /* 1: a zstd call returned an error code */
	WorkerError_no_memory,  /* 2: an allocation failed */
	WorkerError_nospace     /* 3: output did not fit in the reserved space */
} WorkerError;
/**
* Holds state for an individual worker performing multi_compress_to_buffer work.
*/
typedef struct {
/* Used for compression. */
ZSTD_CCtx* cctx;
/* What to compress. */
DataSource* sources;
Py_ssize_t sourcesSize;
Py_ssize_t startOffset;
Py_ssize_t endOffset;
unsigned long long totalSourceSize;
/* Result storage. */
DestBuffer* destBuffers;
Py_ssize_t destCount;
/* Error tracking. */
WorkerError error;
size_t zresult;
Py_ssize_t errorOffset;
} WorkerState;
/*
 * Compress sources[startOffset..endOffset] (inclusive) for one worker.
 *
 * Each input becomes one complete zstd frame (ZSTD_e_end). Frames are packed
 * back to back into malloc()ed DestBuffer instances appended to
 * state->destBuffers, and a BufferSegment (offset, length) is recorded per
 * input. compress_from_datasources() invokes this with the GIL released, so no
 * Python API is used here.
 *
 * On failure state->error is set (plus state->zresult / state->errorOffset for
 * zstd failures). Everything allocated so far stays reachable from
 * state->destBuffers so the caller can free() it.
 */
static void compress_worker(WorkerState* state) {
	Py_ssize_t inputOffset = state->startOffset;
	Py_ssize_t remainingItems = state->endOffset - state->startOffset + 1;
	Py_ssize_t currentBufferStartOffset = state->startOffset;
	size_t zresult;
	void* newDest;
	size_t allocationSize;
	size_t boundSize;
	Py_ssize_t destOffset = 0;
	DataSource* sources = state->sources;
	DestBuffer* destBuffer;

	assert(!state->destBuffers);
	assert(0 == state->destCount);

	/*
	 * The total size of the compressed data is unknown until we actually
	 * compress data. That means we can't pre-allocate the exact size we need.
	 *
	 * There is a cost to every allocation and reallocation. So, it is in our
	 * interest to minimize the number of allocations.
	 *
	 * There is also a cost to too few allocations. If allocations are too
	 * large they may fail. If buffers are shared and all inputs become
	 * irrelevant at different lifetimes, then a reference to one segment
	 * in the buffer will keep the entire buffer alive. This leads to excessive
	 * memory usage.
	 *
	 * Our current strategy is to assume a compression ratio of 16:1 and
	 * allocate buffers of that size, rounded up to the nearest power of 2
	 * (because computers like round numbers). That ratio is greater than what
	 * most inputs achieve. This is by design: we don't want to over-allocate.
	 * But we don't want to under-allocate and lead to too many buffers either.
	 */

	state->destCount = 1;

	state->destBuffers = calloc(1, sizeof(DestBuffer));
	if (NULL == state->destBuffers) {
		state->error = WorkerError_no_memory;
		return;
	}

	destBuffer = &state->destBuffers[state->destCount - 1];

	/*
	 * Rather than track bounds and grow the segments buffer, allocate space
	 * to hold remaining items then truncate when we're done with it.
	 */
	destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment));
	if (NULL == destBuffer->segments) {
		state->error = WorkerError_no_memory;
		return;
	}

	destBuffer->segmentsSize = remainingItems;

	assert(state->totalSourceSize <= SIZE_MAX);
	allocationSize = roundpow2((size_t)state->totalSourceSize >> 4);

	/* If the maximum size of the output is larger than that, round up. */
	boundSize = ZSTD_compressBound(sources[inputOffset].sourceSize);

	if (boundSize > allocationSize) {
		allocationSize = roundpow2(boundSize);
	}

	destBuffer->dest = malloc(allocationSize);
	if (NULL == destBuffer->dest) {
		state->error = WorkerError_no_memory;
		return;
	}

	destBuffer->destSize = allocationSize;

	for (inputOffset = state->startOffset; inputOffset <= state->endOffset; inputOffset++) {
		void* source = sources[inputOffset].sourceData;
		size_t sourceSize = sources[inputOffset].sourceSize;
		size_t destAvailable;
		void* dest;
		ZSTD_outBuffer opOutBuffer;
		ZSTD_inBuffer opInBuffer;

		destAvailable = destBuffer->destSize - destOffset;
		boundSize = ZSTD_compressBound(sourceSize);

		/*
		 * Not enough space in current buffer to hold largest compressed output.
		 * So allocate and switch to a new output buffer.
		 */
		if (boundSize > destAvailable) {
			/*
			 * The downsizing of the existing buffer is optional. It should be cheap
			 * (unlike growing). So we just do it.
			 */
			if (destAvailable) {
				newDest = realloc(destBuffer->dest, destOffset);
				if (NULL == newDest) {
					state->error = WorkerError_no_memory;
					return;
				}

				destBuffer->dest = newDest;
				destBuffer->destSize = destOffset;
			}

			/* Truncate segments buffer. */
			newDest = realloc(destBuffer->segments,
				(inputOffset - currentBufferStartOffset + 1) * sizeof(BufferSegment));
			if (NULL == newDest) {
				state->error = WorkerError_no_memory;
				return;
			}

			destBuffer->segments = newDest;
			destBuffer->segmentsSize = inputOffset - currentBufferStartOffset;

			/* Grow space for new struct. */
			/* TODO consider over-allocating so we don't do this every time. */
			newDest = realloc(state->destBuffers, (state->destCount + 1) * sizeof(DestBuffer));
			if (NULL == newDest) {
				state->error = WorkerError_no_memory;
				return;
			}

			state->destBuffers = newDest;
			state->destCount++;

			destBuffer = &state->destBuffers[state->destCount - 1];

			/* Don't take any chances with non-NULL pointers. */
			memset(destBuffer, 0, sizeof(DestBuffer));

			/**
			 * We could dynamically update allocation size based on work done so far.
			 * For now, keep is simple.
			 */
			assert(state->totalSourceSize <= SIZE_MAX);
			allocationSize = roundpow2((size_t)state->totalSourceSize >> 4);

			if (boundSize > allocationSize) {
				allocationSize = roundpow2(boundSize);
			}

			destBuffer->dest = malloc(allocationSize);
			if (NULL == destBuffer->dest) {
				state->error = WorkerError_no_memory;
				return;
			}

			destBuffer->destSize = allocationSize;
			destAvailable = allocationSize;
			destOffset = 0;

			destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment));
			if (NULL == destBuffer->segments) {
				state->error = WorkerError_no_memory;
				return;
			}

			destBuffer->segmentsSize = remainingItems;
			currentBufferStartOffset = inputOffset;
		}

		dest = (char*)destBuffer->dest + destOffset;

		opInBuffer.src = source;
		opInBuffer.size = sourceSize;
		opInBuffer.pos = 0;

		opOutBuffer.dst = dest;
		opOutBuffer.size = destAvailable;
		opOutBuffer.pos = 0;

		zresult = ZSTD_CCtx_setPledgedSrcSize(state->cctx, sourceSize);
		if (ZSTD_isError(zresult)) {
			state->error = WorkerError_zstd;
			state->zresult = zresult;
			state->errorOffset = inputOffset;
			break;
		}

		zresult = ZSTD_compressStream2(state->cctx, &opOutBuffer, &opInBuffer, ZSTD_e_end);
		if (ZSTD_isError(zresult)) {
			state->error = WorkerError_zstd;
			state->zresult = zresult;
			state->errorOffset = inputOffset;
			break;
		}
		else if (zresult) {
			/* The frame was not fully flushed despite compressBound() space. */
			state->error = WorkerError_nospace;
			state->errorOffset = inputOffset;
			break;
		}

		destBuffer->segments[inputOffset - currentBufferStartOffset].offset = destOffset;
		destBuffer->segments[inputOffset - currentBufferStartOffset].length = opOutBuffer.pos;

		destOffset += opOutBuffer.pos;
		remainingItems--;
	}

	/*
	 * On error the caller discards all of this worker's output, so skip the
	 * optional shrink below. This is also required for correctness: if the
	 * failing input was the first one placed in its buffer, destOffset is 0,
	 * and realloc(dest, 0) may free dest and return NULL. That would replace
	 * the real error with WorkerError_no_memory and leave destBuffer->dest
	 * dangling, so the caller's cleanup would free() it a second time.
	 */
	if (state->error) {
		return;
	}

	if (destBuffer->destSize > destOffset) {
		newDest = realloc(destBuffer->dest, destOffset);
		if (NULL == newDest) {
			state->error = WorkerError_no_memory;
			return;
		}

		destBuffer->dest = newDest;
		destBuffer->destSize = destOffset;
	}
}
/*
 * Compress every DataSource in ``sources`` using up to ``threadCount`` workers.
 *
 * Inputs are split into contiguous index ranges of roughly equal byte size,
 * one range per worker. Each worker gets its own ZSTD_CCtx configured from the
 * compressor's parameters and dictionary. When threadCount > 1 the workers
 * run on a zstd POOL; otherwise the single worker runs inline. Either way the
 * GIL is released while compressing.
 *
 * Returns a new BufferWithSegmentsCollection whose segments appear in input
 * order (one compressed frame per input), or NULL with a Python exception set.
 */
ZstdBufferWithSegmentsCollection* compress_from_datasources(ZstdCompressor* compressor,
	DataSources* sources, Py_ssize_t threadCount) {
	unsigned long long bytesPerWorker;
	POOL_ctx* pool = NULL;
	WorkerState* workerStates = NULL;
	Py_ssize_t i;
	unsigned long long workerBytes = 0;
	Py_ssize_t workerStartOffset = 0;
	Py_ssize_t currentThread = 0;
	int errored = 0;
	Py_ssize_t segmentsCount = 0;
	Py_ssize_t segmentIndex;
	PyObject* segmentsArg = NULL;
	ZstdBufferWithSegments* buffer;
	ZstdBufferWithSegmentsCollection* result = NULL;

	assert(sources->sourcesSize > 0);
	assert(sources->totalSourceSize > 0);
	assert(threadCount >= 1);

	/* More threads than inputs makes no sense. */
	threadCount = sources->sourcesSize < threadCount ? sources->sourcesSize
													 : threadCount;

	/* TODO lower thread count when input size is too small and threads would add
	overhead. */

	workerStates = PyMem_Malloc(threadCount * sizeof(WorkerState));
	if (NULL == workerStates) {
		PyErr_NoMemory();
		goto finally;
	}

	memset(workerStates, 0, threadCount * sizeof(WorkerState));

	if (threadCount > 1) {
		pool = POOL_create(threadCount, 1);
		if (NULL == pool) {
			PyErr_SetString(ZstdError, "could not initialize zstd thread pool");
			goto finally;
		}
	}

	bytesPerWorker = sources->totalSourceSize / threadCount;

	for (i = 0; i < threadCount; i++) {
		size_t zresult;

		workerStates[i].cctx = ZSTD_createCCtx();
		if (!workerStates[i].cctx) {
			PyErr_NoMemory();
			goto finally;
		}

		zresult = ZSTD_CCtx_setParametersUsingCCtxParams(workerStates[i].cctx,
			compressor->params);
		if (ZSTD_isError(zresult)) {
			PyErr_Format(ZstdError, "could not set compression parameters: %s",
				ZSTD_getErrorName(zresult));
			goto finally;
		}

		if (compressor->dict) {
			if (compressor->dict->cdict) {
				zresult = ZSTD_CCtx_refCDict(workerStates[i].cctx, compressor->dict->cdict);
			}
			else {
				zresult = ZSTD_CCtx_loadDictionary_advanced(
					workerStates[i].cctx,
					compressor->dict->dictData,
					compressor->dict->dictSize,
					ZSTD_dlm_byRef,
					compressor->dict->dictType);
			}

			if (ZSTD_isError(zresult)) {
				PyErr_Format(ZstdError, "could not load compression dictionary: %s",
					ZSTD_getErrorName(zresult));
				goto finally;
			}

		}

		workerStates[i].sources = sources->sources;
		workerStates[i].sourcesSize = sources->sourcesSize;
	}

	Py_BEGIN_ALLOW_THREADS
	for (i = 0; i < sources->sourcesSize; i++) {
		workerBytes += sources->sources[i].sourceSize;

		/*
		 * The last worker/thread needs to handle all remaining work. Don't
		 * trigger it prematurely. Defer to the block outside of the loop
		 * to run the last worker/thread. But do still process this loop
		 * so workerBytes is correct.
		 */
		if (currentThread == threadCount - 1) {
			continue;
		}

		if (workerBytes >= bytesPerWorker) {
			assert(currentThread < threadCount);
			workerStates[currentThread].totalSourceSize = workerBytes;
			workerStates[currentThread].startOffset = workerStartOffset;
			workerStates[currentThread].endOffset = i;

			if (threadCount > 1) {
				POOL_add(pool, (POOL_function)compress_worker, &workerStates[currentThread]);
			}
			else {
				compress_worker(&workerStates[currentThread]);
			}

			currentThread++;
			workerStartOffset = i + 1;
			workerBytes = 0;
		}
	}

	/*
	 * Hand every input not yet assigned to the next (last) worker. Test the
	 * offset rather than workerBytes: trailing zero-length inputs add no bytes
	 * but must still be compressed, otherwise they would be silently missing
	 * from the result.
	 */
	if (workerStartOffset < sources->sourcesSize) {
		assert(currentThread < threadCount);
		workerStates[currentThread].totalSourceSize = workerBytes;
		workerStates[currentThread].startOffset = workerStartOffset;
		workerStates[currentThread].endOffset = sources->sourcesSize - 1;

		if (threadCount > 1) {
			POOL_add(pool, (POOL_function)compress_worker, &workerStates[currentThread]);
		}
		else {
			compress_worker(&workerStates[currentThread]);
		}
	}

	/* POOL_free() waits for queued jobs before tearing the pool down. */
	if (threadCount > 1) {
		POOL_free(pool);
		pool = NULL;
	}

	Py_END_ALLOW_THREADS

	for (i = 0; i < threadCount; i++) {
		switch (workerStates[i].error) {
		case WorkerError_no_memory:
			PyErr_NoMemory();
			errored = 1;
			break;

		case WorkerError_zstd:
			PyErr_Format(ZstdError, "error compressing item %zd: %s",
				workerStates[i].errorOffset, ZSTD_getErrorName(workerStates[i].zresult));
			errored = 1;
			break;

		case WorkerError_nospace:
			PyErr_Format(ZstdError, "error compressing item %zd: not enough space in output",
				workerStates[i].errorOffset);
			errored = 1;
			break;

		default:
			;
		}

		if (errored) {
			break;
		}

	}

	if (errored) {
		goto finally;
	}

	segmentsCount = 0;
	for (i = 0; i < threadCount; i++) {
		WorkerState* state = &workerStates[i];
		segmentsCount += state->destCount;
	}

	segmentsArg = PyTuple_New(segmentsCount);
	if (NULL == segmentsArg) {
		goto finally;
	}

	segmentIndex = 0;

	for (i = 0; i < threadCount; i++) {
		Py_ssize_t j;
		WorkerState* state = &workerStates[i];

		for (j = 0; j < state->destCount; j++) {
			DestBuffer* destBuffer = &state->destBuffers[j];
			buffer = BufferWithSegments_FromMemory(destBuffer->dest, destBuffer->destSize,
				destBuffer->segments, destBuffer->segmentsSize);

			if (NULL == buffer) {
				goto finally;
			}

			/* Tell instance to use free() instead of PyMem_Free(). */
			buffer->useFree = 1;

			/*
			 * BufferWithSegments_FromMemory takes ownership of the backing memory.
			 * Unset it here so it doesn't get freed below.
			 */
			destBuffer->dest = NULL;
			destBuffer->segments = NULL;

			PyTuple_SET_ITEM(segmentsArg, segmentIndex++, (PyObject*)buffer);
		}
	}

	result = (ZstdBufferWithSegmentsCollection*)PyObject_CallObject(
		(PyObject*)&ZstdBufferWithSegmentsCollectionType, segmentsArg);

finally:
	Py_CLEAR(segmentsArg);

	if (pool) {
		POOL_free(pool);
	}

	if (workerStates) {
		Py_ssize_t j;

		for (i = 0; i < threadCount; i++) {
			WorkerState state = workerStates[i];

			if (state.cctx) {
				ZSTD_freeCCtx(state.cctx);
			}

			/* malloc() is used in worker thread. */

			for (j = 0; j < state.destCount; j++) {
				if (state.destBuffers) {
					free(state.destBuffers[j].dest);
					free(state.destBuffers[j].segments);
				}
			}


			free(state.destBuffers);
		}

		PyMem_Free(workerStates);
	}

	return result;
}
/* Python docstring for ZstdCompressor.multi_compress_to_buffer(). */
PyDoc_STRVAR(ZstdCompressor_multi_compress_to_buffer__doc__,
"Compress multiple pieces of data as a single operation\n"
"\n"
"Receives a ``BufferWithSegmentsCollection``, a ``BufferWithSegments``, or\n"
"a list of bytes like objects holding data to compress.\n"
"\n"
"Returns a ``BufferWithSegmentsCollection`` holding compressed data.\n"
"\n"
"``threads`` controls parallelism: a negative value uses one thread per\n"
"CPU, while 0 or 1 compresses on the calling thread.\n"
"\n"
"This function is optimized to perform multiple compression operations as\n"
"fast as possible with as little overhead as possible.\n"
);
/*
 * Python entry point: ZstdCompressor.multi_compress_to_buffer(data, threads=0).
 *
 * Flattens ``data`` (a BufferWithSegments, a BufferWithSegmentsCollection or a
 * list of buffer-protocol objects) into a DataSources table that points
 * directly at the caller's memory, validates it, and delegates the actual
 * work to compress_from_datasources(). Buffer views acquired from list items
 * are released before returning.
 *
 * Returns a new BufferWithSegmentsCollection, or NULL with an exception set.
 */
static ZstdBufferWithSegmentsCollection* ZstdCompressor_multi_compress_to_buffer(ZstdCompressor* self, PyObject* args, PyObject* kwargs) {
	static char* kwlist[] = {
		"data",
		"threads",
		NULL
	};

	PyObject* data;
	int threads = 0;
	Py_buffer* heldViews = NULL;
	Py_ssize_t heldViewCount = 0;
	DataSources inputs;
	Py_ssize_t idx;
	ZstdBufferWithSegmentsCollection* output = NULL;

	memset(&inputs, 0, sizeof(inputs));

	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|i:multi_compress_to_buffer", kwlist,
		&data, &threads)) {
		return NULL;
	}

	/* Negative requests one thread per CPU; anything below 2 means inline. */
	if (threads < 0) {
		threads = cpu_count();
	}
	if (threads < 2) {
		threads = 1;
	}

	if (PyObject_TypeCheck(data, &ZstdBufferWithSegmentsType)) {
		ZstdBufferWithSegments* segmented = (ZstdBufferWithSegments*)data;

		inputs.sources = PyMem_Malloc(segmented->segmentCount * sizeof(DataSource));
		if (NULL == inputs.sources) {
			PyErr_NoMemory();
			goto finally;
		}

		for (idx = 0; idx < segmented->segmentCount; idx++) {
			if (segmented->segments[idx].length > SIZE_MAX) {
				PyErr_Format(PyExc_ValueError,
					"buffer segment %zd is too large for this platform", idx);
				goto finally;
			}

			inputs.sources[idx].sourceData = (char*)segmented->data + segmented->segments[idx].offset;
			inputs.sources[idx].sourceSize = (size_t)segmented->segments[idx].length;
			inputs.totalSourceSize += segmented->segments[idx].length;
		}

		inputs.sourcesSize = segmented->segmentCount;
	}
	else if (PyObject_TypeCheck(data, &ZstdBufferWithSegmentsCollectionType)) {
		ZstdBufferWithSegmentsCollection* collection = (ZstdBufferWithSegmentsCollection*)data;
		Py_ssize_t total = BufferWithSegmentsCollection_length(collection);
		Py_ssize_t slot = 0;
		Py_ssize_t seg;

		inputs.sources = PyMem_Malloc(total * sizeof(DataSource));
		if (NULL == inputs.sources) {
			PyErr_NoMemory();
			goto finally;
		}

		for (idx = 0; idx < collection->bufferCount; idx++) {
			ZstdBufferWithSegments* part = collection->buffers[idx];

			for (seg = 0; seg < part->segmentCount; seg++) {
				if (part->segments[seg].length > SIZE_MAX) {
					PyErr_Format(PyExc_ValueError,
						"buffer segment %zd in buffer %zd is too large for this platform",
						seg, idx);
					goto finally;
				}

				inputs.sources[slot].sourceData = (char*)part->data + part->segments[seg].offset;
				inputs.sources[slot].sourceSize = (size_t)part->segments[seg].length;
				inputs.totalSourceSize += part->segments[seg].length;
				slot++;
			}
		}

		inputs.sourcesSize = total;
	}
	else if (PyList_Check(data)) {
		heldViewCount = PyList_GET_SIZE(data);

		inputs.sources = PyMem_Malloc(heldViewCount * sizeof(DataSource));
		if (NULL == inputs.sources) {
			PyErr_NoMemory();
			goto finally;
		}

		heldViews = PyMem_Malloc(heldViewCount * sizeof(Py_buffer));
		if (NULL == heldViews) {
			PyErr_NoMemory();
			goto finally;
		}

		/* Zeroed views are safe to PyBuffer_Release() if we bail out early. */
		memset(heldViews, 0, heldViewCount * sizeof(Py_buffer));

		for (idx = 0; idx < heldViewCount; idx++) {
			Py_buffer* view = &heldViews[idx];

			if (0 != PyObject_GetBuffer(PyList_GET_ITEM(data, idx),
				view, PyBUF_CONTIG_RO)) {
				PyErr_Clear();
				PyErr_Format(PyExc_TypeError, "item %zd not a bytes like object", idx);
				goto finally;
			}

			inputs.sources[idx].sourceData = view->buf;
			inputs.sources[idx].sourceSize = view->len;
			inputs.totalSourceSize += view->len;
		}

		inputs.sourcesSize = heldViewCount;
	}
	else {
		PyErr_SetString(PyExc_TypeError, "argument must be list of BufferWithSegments");
		goto finally;
	}

	if (0 == inputs.sourcesSize) {
		PyErr_SetString(PyExc_ValueError, "no source elements found");
	}
	else if (0 == inputs.totalSourceSize) {
		PyErr_SetString(PyExc_ValueError, "source elements are empty");
	}
	else if (inputs.totalSourceSize > SIZE_MAX) {
		PyErr_SetString(PyExc_ValueError, "sources are too large for this platform");
	}
	else {
		output = compress_from_datasources(self, &inputs, threads);
	}

finally:
	PyMem_Free(inputs.sources);

	if (heldViews) {
		for (idx = 0; idx < heldViewCount; idx++) {
			PyBuffer_Release(&heldViews[idx]);
		}

		PyMem_Free(heldViews);
	}

	return output;
}
/*
 * Method table for ZstdCompressor. ``read_from`` and ``write_to`` are
 * deprecated aliases that reuse the implementation and docstring of
 * ``read_to_iter`` and ``stream_writer`` respectively.
 */
static PyMethodDef ZstdCompressor_methods[] = {
	{ "chunker",
	  (PyCFunction)ZstdCompressor_chunker,
	  METH_VARARGS | METH_KEYWORDS,
	  ZstdCompressor_chunker__doc__ },
	{ "compress",
	  (PyCFunction)ZstdCompressor_compress,
	  METH_VARARGS | METH_KEYWORDS,
	  ZstdCompressor_compress__doc__ },
	{ "compressobj",
	  (PyCFunction)ZstdCompressor_compressobj,
	  METH_VARARGS | METH_KEYWORDS,
	  ZstdCompressionObj__doc__ },
	{ "copy_stream",
	  (PyCFunction)ZstdCompressor_copy_stream,
	  METH_VARARGS | METH_KEYWORDS,
	  ZstdCompressor_copy_stream__doc__ },
	{ "stream_reader",
	  (PyCFunction)ZstdCompressor_stream_reader,
	  METH_VARARGS | METH_KEYWORDS,
	  ZstdCompressor_stream_reader__doc__ },
	{ "stream_writer",
	  (PyCFunction)ZstdCompressor_stream_writer,
	  METH_VARARGS | METH_KEYWORDS,
	  ZstdCompressor_stream_writer___doc__ },
	{ "read_to_iter",
	  (PyCFunction)ZstdCompressor_read_to_iter,
	  METH_VARARGS | METH_KEYWORDS,
	  ZstdCompressor_read_to_iter__doc__ },
	/* TODO Remove deprecated API (alias of read_to_iter). */
	{ "read_from",
	  (PyCFunction)ZstdCompressor_read_to_iter,
	  METH_VARARGS | METH_KEYWORDS,
	  ZstdCompressor_read_to_iter__doc__ },
	/* TODO remove deprecated API (alias of stream_writer). */
	{ "write_to",
	  (PyCFunction)ZstdCompressor_stream_writer,
	  METH_VARARGS | METH_KEYWORDS,
	  ZstdCompressor_stream_writer___doc__ },
	{ "multi_compress_to_buffer",
	  (PyCFunction)ZstdCompressor_multi_compress_to_buffer,
	  METH_VARARGS | METH_KEYWORDS,
	  ZstdCompressor_multi_compress_to_buffer__doc__ },
	{ "memory_size",
	  (PyCFunction)ZstdCompressor_memory_size,
	  METH_NOARGS,
	  ZstdCompressor_memory_size__doc__ },
	{ "frame_progression",
	  (PyCFunction)ZstdCompressor_frame_progression,
	  METH_NOARGS,
	  ZstdCompressor_frame_progression__doc__ },
	/* Sentinel. */
	{ NULL, NULL }
};
/*
 * Static type object for zstd.ZstdCompressor. It is subclassable
 * (Py_TPFLAGS_BASETYPE), is created via PyType_GenericNew and initialized by
 * ZstdCompressor_init, and exposes the methods in ZstdCompressor_methods.
 *
 * NOTE(review): the initializer is positional, so each slot is identified by
 * its position only; the trailing slot-name comments are labels. Some labelled
 * slots (e.g. tp_print, tp_compare) have different names or meanings in other
 * CPython versions — confirm against the targeted Python headers before
 * filling in any of the zero slots.
 */
PyTypeObject ZstdCompressorType = {
PyVarObject_HEAD_INIT(NULL, 0)
"zstd.ZstdCompressor", /* tp_name */
sizeof(ZstdCompressor), /* tp_basicsize */
0, /* tp_itemsize */
(destructor)ZstdCompressor_dealloc, /* tp_dealloc */
0, /* tp_print */
0, /* tp_getattr */
0, /* tp_setattr */
0, /* tp_compare */
0, /* tp_repr */
0, /* tp_as_number */
0, /* tp_as_sequence */
0, /* tp_as_mapping */
0, /* tp_hash */
0, /* tp_call */
0, /* tp_str */
0, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
ZstdCompressor__doc__, /* tp_doc */
0, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
ZstdCompressor_methods, /* tp_methods */
0, /* tp_members */
0, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
(initproc)ZstdCompressor_init, /* tp_init */
0, /* tp_alloc */
PyType_GenericNew, /* tp_new */
};
/*
 * Finalize ZstdCompressorType and publish it on ``mod`` as "ZstdCompressor".
 *
 * On failure a Python exception is left set for the module initializer to
 * detect; no reference to the type is leaked.
 */
void compressor_module_init(PyObject* mod) {
	/* Py_TYPE() stopped being an lvalue in newer CPython; Py_SET_TYPE()
	   exists since 3.9.0a4. */
#if PY_VERSION_HEX >= 0x030900A4
	Py_SET_TYPE(&ZstdCompressorType, &PyType_Type);
#else
	Py_TYPE(&ZstdCompressorType) = &PyType_Type;
#endif
	if (PyType_Ready(&ZstdCompressorType) < 0) {
		return;
	}

	Py_INCREF((PyObject*)&ZstdCompressorType);
	/* PyModule_AddObject() steals the reference only on success. */
	if (PyModule_AddObject(mod, "ZstdCompressor",
		(PyObject*)&ZstdCompressorType) < 0) {
		Py_DECREF((PyObject*)&ZstdCompressorType);
	}
}