##// END OF EJS Templates
wireprotov2peer: stream decoded responses...
wireprotov2peer: stream decoded responses

Previously, wire protocol version 2 would buffer all response data. Only once all data was received did we CBOR decode it and resolve the future associated with the command. This was obviously not desirable. In future commits that introduce large response payloads, this would cause significant memory bloat and slow down client operations due to waiting on the server. This commit refactors the response handling code so that response data can be streamed. Command response objects now contain a buffered CBOR decoder. As new data arrives, it is fed into the decoder. Decoded objects are made available to the generator as they are decoded. Because there is a separate thread processing incoming frames and feeding data into the response object, there is the potential for race conditions when mutating response objects, so a lock has been added to guard access to critical state variables. Because the generator emitting decoded objects needs to wait on those objects to become available, we've added an Event for the generator to wait on so it doesn't busy-loop. This does mean there is the potential for deadlocks, and I'm fairly sure they can occur in some scenarios. We already have a handful of TODOs around this, and I've added some more. Fixing this will likely require moving the background thread receiving frames into clienthandler, which we likely would have done anyway when implementing the client bits for the SSH transport. Test output changes because the initial CBOR map holding the overall response state is now always handled internally by the response object.

Differential Revision: https://phab.mercurial-scm.org/D4474

File last commit:

r37513:b1fb341d default
r39597:d06834e0 default
Show More
python-zstandard.h
346 lines | 8.3 KiB | text/x-c | CLexer
/**
* Copyright (c) 2016-present, Gregory Szorc
* All rights reserved.
*
* This software may be modified and distributed under the terms
* of the BSD license. See the LICENSE file for details.
*/
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include "structmember.h"
#define ZSTD_STATIC_LINKING_ONLY
#define ZDICT_STATIC_LINKING_ONLY
#include <zstd.h>
#include <zdict.h>
#define PYTHON_ZSTANDARD_VERSION "0.9.0"
/*
Flush modes for the compression object. Going by the names, "finish"
presumably ends the frame and "block" flushes the current block -- confirm
against the compressobj implementation.
*/
typedef enum {
compressorobj_flush_finish = 0,
compressorobj_flush_block = 1
} CompressorObj_Flush;
/*
Represents a ZstdCompressionParameters type.
This type holds all the low-level compression parameters that can be set.

The scalar fields appear to cache, by name, the individual zstd compression
parameters, and `params` is the native zstd parameter set. set_parameters()
(declared near the end of this header) takes both a ZSTD_CCtx_params* and an
object of this type. Presumably it copies the cached values into a native
parameter set -- confirm against its implementation.
*/
typedef struct {
PyObject_HEAD
/* Native zstd parameter set. NOTE(review): allocation and release are not
   visible in this header -- confirm ownership in the type's init/dealloc. */
ZSTD_CCtx_params* params;
/* Frame format selector; presumably holds a ZSTD_format_e value. */
unsigned format;
/* The only signed field; presumably because negative levels are accepted. */
int compressionLevel;
/* Compression-tuning values; names presumably mirror zstd's parameters. */
unsigned windowLog;
unsigned hashLog;
unsigned chainLog;
unsigned searchLog;
unsigned minMatch;
unsigned targetLength;
unsigned compressionStrategy;
/* Flags that presumably control what is written into the frame header. */
unsigned contentSizeFlag;
unsigned checksumFlag;
unsigned dictIDFlag;
/* Multi-threading settings -- presumably worker count and job tuning. */
unsigned threads;
unsigned jobSize;
unsigned overlapSizeLog;
/* Miscellaneous advanced flags. */
unsigned compressLiterals;
unsigned forceMaxWindow;
/* Long-distance matching switch and its tuning values (ldm* prefix). */
unsigned enableLongDistanceMatching;
unsigned ldmHashLog;
unsigned ldmMinMatch;
unsigned ldmBucketSizeLog;
unsigned ldmHashEveryLog;
} ZstdCompressionParametersObject;
extern PyTypeObject ZstdCompressionParametersType;
/*
Represents a FrameParameters type.
This type is basically a wrapper around ZSTD_frameParams, i.e. values
presumably read from a zstd frame header. Instances are returned by
get_frame_parameters() (declared near the end of this header).
*/
typedef struct {
PyObject_HEAD
/* Decompressed content size recorded in the frame. NOTE(review): presumably
   holds a zstd sentinel value when the size is unknown -- confirm. */
unsigned long long frameContentSize;
/* Window size needed to decompress the frame -- presumably. */
unsigned long long windowSize;
/* Dictionary ID recorded in the frame (presumably 0 when none). */
unsigned dictID;
/* Presumably whether the frame carries a content checksum; stored as char. */
char checksumFlag;
} FrameParametersObject;
extern PyTypeObject FrameParametersType;
/*
Represents a ZstdCompressionDict type.
Instances hold data used for a zstd compression dictionary. The same object
carries both a compression digest (cdict) and a decompression digest (ddict).
Both ZstdCompressor and ZstdDecompressor reference this type through their
`dict` fields.
*/
typedef struct {
PyObject_HEAD
/* Pointer to dictionary data. Owned by self. */
void* dictData;
/* Size of dictionary data. */
size_t dictSize;
/* How dictData should be interpreted (a ZSTD_dictContentType_e value). */
ZSTD_dictContentType_e dictType;
/* k parameter for cover dictionaries. Only populated by train_cover_dict(). */
unsigned k;
/* d parameter for cover dictionaries. Only populated by train_cover_dict(). */
unsigned d;
/* Digested dictionaries, suitable for reuse. ensure_ddict() (declared near
   the end of this header) takes this type, so ddict is presumably created
   lazily there -- confirm. */
ZSTD_CDict* cdict;
ZSTD_DDict* ddict;
} ZstdCompressionDict;
extern PyTypeObject ZstdCompressionDictType;
/*
Represents a ZstdCompressor type.
The compression object, writer, iterator and reader types in this header
each hold a `compressor` pointer to an instance of this type.
*/
typedef struct {
PyObject_HEAD
/* Number of threads to use for operations. */
unsigned int threads;
/* Pointer to compression dictionary to use. NULL if not using dictionary
   compression. */
ZstdCompressionDict* dict;
/* Compression context to use. Populated during object construction. */
ZSTD_CCtx* cctx;
/* Compression parameters in use. */
ZSTD_CCtx_params* params;
} ZstdCompressor;
extern PyTypeObject ZstdCompressorType;
/*
Represents a ZstdCompressionObj type.
NOTE(review): presumably the incremental compression object created from a
ZstdCompressor and flushed with the CompressorObj_Flush modes -- confirm.
*/
typedef struct {
PyObject_HEAD
/* Parent compressor (presumably supplies the context and parameters). */
ZstdCompressor* compressor;
/* zstd output buffer -- presumably reused across calls. */
ZSTD_outBuffer output;
/* Non-zero once the stream has been finished. */
int finished;
} ZstdCompressionObj;
extern PyTypeObject ZstdCompressionObjType;
/*
Represents a ZstdCompressionWriter type.
*/
typedef struct {
PyObject_HEAD
/* Parent compressor. */
ZstdCompressor* compressor;
/* Destination for compressed output -- presumably an object with write(). */
PyObject* writer;
/* Expected size of the uncompressed input, when known -- presumably. */
unsigned long long sourceSize;
/* Output chunk size used per zstd call -- presumably. */
size_t outSize;
/* Whether the context manager is active -- presumably. */
int entered;
/* Running count of compressed bytes produced -- presumably. */
unsigned long long bytesCompressed;
} ZstdCompressionWriter;
extern PyTypeObject ZstdCompressionWriterType;
/*
Represents a ZstdCompressorIterator type.
Has both a stream source (`reader`) and a buffer source (`buffer`);
presumably only one of them is used per instance -- confirm.
*/
typedef struct {
PyObject_HEAD
/* Parent compressor. */
ZstdCompressor* compressor;
/* Object to read() input from, when reading from a stream -- presumably. */
PyObject* reader;
/* Input buffer, when reading from a buffer -- presumably. */
Py_buffer buffer;
/* Position in buffer of the next unconsumed byte -- presumably. */
Py_ssize_t bufferOffset;
/* Input read size and output chunk size -- presumably. */
size_t inSize;
size_t outSize;
/* zstd streaming input/output state. */
ZSTD_inBuffer input;
ZSTD_outBuffer output;
/* Completion flags for the output and input sides. */
int finishedOutput;
int finishedInput;
/* Most recent result read from reader -- presumably kept referenced so
   the memory input points at stays valid. */
PyObject* readResult;
} ZstdCompressorIterator;
extern PyTypeObject ZstdCompressorIteratorType;
/*
Represents a ZstdCompressionReader type.
Like the iterator, it has both a stream source (`reader`) and a buffer
source (`buffer`); presumably only one is used per instance -- confirm.
*/
typedef struct {
PyObject_HEAD
/* Parent compressor. */
ZstdCompressor* compressor;
/* Object to read() input from, when reading from a stream -- presumably. */
PyObject* reader;
/* Input buffer, when reading from a buffer -- presumably. */
Py_buffer buffer;
/* Expected size of the uncompressed input, when known -- presumably. */
unsigned long long sourceSize;
/* Size for read() operations on reader -- presumably. */
size_t readSize;
/* Whether the context manager is active -- presumably. */
int entered;
/* Whether the reader has been closed. */
int closed;
/* Running count of compressed bytes returned -- presumably. */
unsigned long long bytesCompressed;
/* zstd streaming input/output state. */
ZSTD_inBuffer input;
ZSTD_outBuffer output;
/* Completion flags for the input and output sides. */
int finishedInput;
int finishedOutput;
/* Most recent result read from reader -- presumably kept referenced so
   the memory input points at stays valid. */
PyObject* readResult;
} ZstdCompressionReader;
extern PyTypeObject ZstdCompressionReaderType;
/*
Represents a ZstdDecompressor type.
The decompression object, reader, writer and iterator types in this header
each hold a `decompressor` pointer to an instance of this type.
*/
typedef struct {
PyObject_HEAD
/* Decompression context. ensure_dctx() (declared near the end of this
   header) takes this type, so the context is presumably prepared there. */
ZSTD_DCtx* dctx;
/* Dictionary to decompress with, presumably NULL when none is used. */
ZstdCompressionDict* dict;
/* Upper bound on the accepted window size -- presumably. */
size_t maxWindowSize;
/* Expected frame format. */
ZSTD_format_e format;
} ZstdDecompressor;
extern PyTypeObject ZstdDecompressorType;
/*
Represents a ZstdDecompressionObj type.
NOTE(review): presumably the incremental decompression object created from
a ZstdDecompressor -- confirm.
*/
typedef struct {
PyObject_HEAD
/* Parent decompressor. */
ZstdDecompressor* decompressor;
/* Output chunk size -- presumably. */
size_t outSize;
/* Non-zero once the end of the stream has been reached -- presumably. */
int finished;
} ZstdDecompressionObj;
extern PyTypeObject ZstdDecompressionObjType;
/*
Represents a ZstdDecompressionReader type.
Input comes from either a stream (`reader`) or a buffer (`buffer`).
*/
typedef struct {
PyObject_HEAD
/* Parent decompressor to which this object is associated. */
ZstdDecompressor* decompressor;
/* Object to read() from (if reading from a stream). */
PyObject* reader;
/* Size for read() operations on reader. */
size_t readSize;
/* Buffer to read from (if reading from a buffer). */
Py_buffer buffer;
/* Whether the context manager is active. */
int entered;
/* Whether we've closed the stream. */
int closed;
/* Number of bytes decompressed and returned to user. */
unsigned long long bytesDecompressed;
/* Tracks data going into decompressor. */
ZSTD_inBuffer input;
/* Holds output from read() operation on reader (presumably kept referenced
   so the memory input points at stays valid). */
PyObject* readResult;
/* Whether all input has been sent to the decompressor. */
int finishedInput;
/* Whether all output has been flushed from the decompressor. */
int finishedOutput;
} ZstdDecompressionReader;
extern PyTypeObject ZstdDecompressionReaderType;
/*
Represents a ZstdDecompressionWriter type.
*/
typedef struct {
PyObject_HEAD
/* Parent decompressor. */
ZstdDecompressor* decompressor;
/* Destination for decompressed output -- presumably an object with write(). */
PyObject* writer;
/* Output chunk size used per zstd call -- presumably. */
size_t outSize;
/* Whether the context manager is active -- presumably. */
int entered;
} ZstdDecompressionWriter;
extern PyTypeObject ZstdDecompressionWriterType;
/*
Represents a ZstdDecompressorIterator type.
Has both a stream source (`reader`) and a buffer source (`buffer`);
presumably only one of them is used per instance -- confirm.
*/
typedef struct {
PyObject_HEAD
/* Parent decompressor. */
ZstdDecompressor* decompressor;
/* Object to read() input from, when reading from a stream -- presumably. */
PyObject* reader;
/* Input buffer, when reading from a buffer -- presumably. */
Py_buffer buffer;
/* Position in buffer of the next unconsumed byte -- presumably. */
Py_ssize_t bufferOffset;
/* Input read size and output chunk size -- presumably. */
size_t inSize;
size_t outSize;
/* Number of leading input bytes to skip -- presumably. */
size_t skipBytes;
/* zstd streaming input/output state. */
ZSTD_inBuffer input;
ZSTD_outBuffer output;
/* Number of reads performed so far -- presumably. */
Py_ssize_t readCount;
/* Completion flags for the input and output sides. */
int finishedInput;
int finishedOutput;
} ZstdDecompressorIterator;
extern PyTypeObject ZstdDecompressorIteratorType;
/*
Result of a single step of decompressor iteration.
NOTE(review): presumably `errored` is non-zero when a Python exception has
been raised, and `chunk` holds the produced data (possibly NULL when there
is none) -- confirm against the iterator implementation.
*/
typedef struct {
int errored;
PyObject* chunk;
} DecompressorIteratorResult;
/*
Describes one segment of a larger buffer as an offset plus a length
(presumably in bytes). Both are 64-bit unsigned integers because that is
the public API. They cannot be size_t, even though values above SIZE_MAX or
PY_SSIZE_T_MAX may be nonsensical on this platform.
*/
typedef struct {
unsigned long long offset, length;
} BufferSegment;
/*
Represents a ZstdBufferSegments type: an array of BufferSegment
(offset/length) entries together with a reference to the object they
describe. NOTE(review): `parent` is presumably a strong reference that
keeps the described data alive -- confirm in the type's dealloc.
*/
typedef struct {
PyObject_HEAD
/* Object whose data the segments presumably index into. */
PyObject* parent;
/* Segment table; presumably segmentCount entries long. */
BufferSegment* segments;
Py_ssize_t segmentCount;
} ZstdBufferSegments;
extern PyTypeObject ZstdBufferSegmentsType;
/*
Represents a ZstdBufferSegment type: a single slice of data.
*/
typedef struct {
PyObject_HEAD
/* Object owning the memory `data` points into -- presumably held to keep
   that memory alive. */
PyObject* parent;
/* Start of this segment's data and its size. */
void* data;
Py_ssize_t dataSize;
/* Offset of this segment within its parent -- presumably. */
unsigned long long offset;
} ZstdBufferSegment;
extern PyTypeObject ZstdBufferSegmentType;
/*
Represents a ZstdBufferWithSegments type: one block of memory plus a table
of BufferSegment entries describing sub-ranges of it. Instances can be
created from raw memory via BufferWithSegments_FromMemory() (declared near
the end of this header).
*/
typedef struct {
PyObject_HEAD
/* Buffer-protocol view of a source object. NOTE(review): presumably only
   populated when the data came from a Python object -- confirm. */
Py_buffer parent;
/* Start and size of the backing memory. */
void* data;
unsigned long long dataSize;
/* Segment table; presumably segmentCount entries long. */
BufferSegment* segments;
Py_ssize_t segmentCount;
/* Presumably non-zero when the memory must be released with free() rather
   than by releasing `parent` -- confirm in dealloc. */
int useFree;
} ZstdBufferWithSegments;
extern PyTypeObject ZstdBufferWithSegmentsType;
/**
* An ordered collection of BufferWithSegments exposed as a squashed collection.
*
* This type provides a virtual view spanning multiple BufferWithSegments
* instances. It allows multiple instances to be "chained" together and
* exposed as a single collection. e.g. if there are 2 buffers holding
* 10 segments each, then o[14] will access the 5th segment in the 2nd buffer.
*/
typedef struct {
PyObject_HEAD
/* An array of buffers that should be exposed through this instance. */
ZstdBufferWithSegments** buffers;
/* Number of elements in buffers array. */
Py_ssize_t bufferCount;
/* Cumulative element counts, one entry per buffer: entry i is the total
   number of segments in buffers 0 through i. Equivalently, entry i is the
   collection index of the first element of buffer i + 1. e.g. for 2
   buffers of 10 segments each, this array is {10, 20}. */
Py_ssize_t* firstElements;
} ZstdBufferWithSegmentsCollection;
extern PyTypeObject ZstdBufferWithSegmentsCollectionType;
/*
Helpers shared across the extension's source files.
NOTE(review): return-value conventions are not visible in this header. The
descriptions below are inferred from the signatures -- confirm against the
definitions before relying on them.
*/

/* Apply one zstd compression parameter to params. */
int set_parameter(ZSTD_CCtx_params* params, ZSTD_cParameter param, unsigned value);
/* Apply the parameters stored on obj to params -- presumably all of them. */
int set_parameters(ZSTD_CCtx_params* params, ZstdCompressionParametersObject* obj);
/* (self, args, kwargs) matches the PyCFunctionWithKeywords calling
   convention; returns a new FrameParametersObject (presumably NULL with an
   exception set on failure). */
FrameParametersObject* get_frame_parameters(PyObject* self, PyObject* args, PyObject* kwargs);
/* Ensure dict->ddict is available -- presumably creating it lazily. */
int ensure_ddict(ZstdCompressionDict* dict);
/* Ensure decompressor->dctx is ready for use. loadDict presumably controls
   whether decompressor->dict is loaded into the context. */
int ensure_dctx(ZstdDecompressor* decompressor, int loadDict);
/* PyCFunctionWithKeywords-shaped trainer returning a new dictionary. */
ZstdCompressionDict* train_dictionary(PyObject* self, PyObject* args, PyObject* kwargs);
/* Wrap existing memory and a segment table in a new ZstdBufferWithSegments.
   NOTE(review): confirm whether ownership of data/segments transfers. */
ZstdBufferWithSegments* BufferWithSegments_FromMemory(void* data, unsigned long long dataSize, BufferSegment* segments, Py_ssize_t segmentsSize);
/* Presumably the total number of segments across all buffers in collection. */
Py_ssize_t BufferWithSegmentsCollection_length(ZstdBufferWithSegmentsCollection* collection);
/* Number of CPUs on this machine -- presumably. */
int cpu_count(void);
/* Round value to a power of two -- presumably upward; confirm. */
size_t roundpow2(size_t value);
/* Resize the bytes object *obj to size bytes; *obj may be replaced --
   presumably. */
int safe_pybytes_resize(PyObject** obj, Py_ssize_t size);