diff --git a/contrib/python-zstandard/NEWS.rst b/contrib/python-zstandard/NEWS.rst --- a/contrib/python-zstandard/NEWS.rst +++ b/contrib/python-zstandard/NEWS.rst @@ -1,6 +1,34 @@ Version History =============== +0.8.0 (released 2017-03-08) +--------------------------- + +* CompressionParameters now has a estimated_compression_context_size() method. + zstd.estimate_compression_context_size() is now deprecated and slated for + removal. +* Implemented a lot of fuzzing tests. +* CompressionParameters instances now perform extra validation by calling + ZSTD_checkCParams() at construction time. +* multi_compress_to_buffer() API for compressing multiple inputs as a + single operation, as efficiently as possible. +* ZSTD_CStream instances are now used across multiple operations on + ZstdCompressor instances, resulting in much better performance for + APIs that do streaming. +* ZSTD_DStream instances are now used across multiple operations on + ZstdDecompressor instances, resulting in much better performance for + APIs that do streaming. +* train_dictionary() now releases the GIL. +* Support for training dictionaries using the COVER algorithm. +* multi_decompress_to_buffer() API for decompressing multiple frames as a + single operation, as efficiently as possible. +* Support for multi-threaded compression. +* Disable deprecation warnings when compiling CFFI module. +* Fixed memory leak in train_dictionary(). +* Removed DictParameters type. +* train_dictionary() now accepts keyword arguments instead of a + DictParameters instance to control dictionary generation. + 0.7.0 (released 2017-02-07) --------------------------- diff --git a/contrib/python-zstandard/README.rst b/contrib/python-zstandard/README.rst --- a/contrib/python-zstandard/README.rst +++ b/contrib/python-zstandard/README.rst @@ -20,9 +20,11 @@ State of Project ================ The project is officially in beta state. The author is reasonably satisfied -with the current API and that functionality works as advertised. There -may be some backwards incompatible changes before 1.0. Though the author -does not intend to make any major changes to the Python API. +that functionality works as advertised. **There will be some backwards +incompatible changes before 1.0, probably in the 0.9 release.** This may +involve renaming the main module from *zstd* to *zstandard* and renaming +various types and methods. Pin the package version to prevent unwanted +breakage when this change occurs! This project is vendored and distributed with Mercurial 4.1, where it is used in a production capacity. @@ -32,6 +34,10 @@ on Linux x86_x64 and Windows x86 and x86 confident the extension is stable and works as advertised on these platforms. +The CFFI bindings are mostly feature complete. Where a feature is implemented +in CFFI, unit tests run against both C extension and CFFI implementation to +ensure behavior parity. + Expected Changes ---------------- @@ -47,13 +53,20 @@ sizes using zstd's preferred defaults). There should be an API that accepts an object that conforms to the buffer interface and returns an iterator over compressed or decompressed output. +There should be an API that exposes an ``io.RawIOBase`` interface to +compressor and decompressor streams, like how ``gzip.GzipFile`` from +the standard library works (issue 13). + The author is on the fence as to whether to support the extremely low level compression and decompression APIs. It could be useful to support compression without the framing headers. But the author doesn't believe it a high priority at this time. -The CFFI bindings are feature complete and all tests run against both -the C extension and CFFI bindings to ensure behavior parity. +There will likely be a refactoring of the module names. Currently, +``zstd`` is a C extension and ``zstd_cffi`` is the CFFI interface. +This means that all code for the C extension must be implemented in +C. ``zstd`` may be converted to a Python module so code can be reused +between CFFI and C and so not all code in the C extension has to be C. Requirements ============ @@ -152,10 +165,13 @@ A Tox configuration is present to test a $ tox Tests use the ``hypothesis`` Python package to perform fuzzing. If you -don't have it, those tests won't run. +don't have it, those tests won't run. Since the fuzzing tests take longer +to execute than normal tests, you'll need to opt in to running them by +setting the ``ZSTD_SLOW_TESTS`` environment variable. This is set +automatically when using ``tox``. -There is also an experimental CFFI module. You need the ``cffi`` Python -package installed to build and test that. +The ``cffi`` Python package needs to be installed in order to build the CFFI +bindings. If it isn't present, the CFFI bindings won't be built. To create a virtualenv with all development dependencies, do something like the following:: @@ -172,8 +188,16 @@ like the following:: API === -The compiled C extension provides a ``zstd`` Python module. This module -exposes the following interfaces. +The compiled C extension provides a ``zstd`` Python module. The CFFI +bindings provide a ``zstd_cffi`` module. Both provide an identical API +interface. The types, functions, and attributes exposed by these modules +are documented in the sections below. + +.. note:: + + The documentation in this section makes references to various zstd + concepts and functionality. The ``Concepts`` section below explains + these concepts in more detail. ZstdCompressor -------------- @@ -209,6 +233,14 @@ write_dict_id Whether to write the dictionary ID into the compressed data. Defaults to True. The dictionary ID is only written if a dictionary is being used. +threads + Enables and sets the number of threads to use for multi-threaded compression + operations. Defaults to 0, which means to use single-threaded compression. + Negative values will resolve to the number of logical CPUs in the system. + Read below for more info on multi-threaded compression. This argument only + controls thread count for operations that operate on individual pieces of + data. APIs that spawn multiple threads for working on multiple pieces of + data have their own ``threads`` argument. Unless specified otherwise, assume that no two methods of ``ZstdCompressor`` instances can be called from multiple Python threads simultaneously. In other @@ -222,6 +254,8 @@ Simple API cctx = zstd.ZstdCompressor() compressed = cctx.compress(b'data to compress') +The ``data`` argument can be any object that implements the *buffer protocol*. + Unless ``compression_params`` or ``dict_data`` are passed to the ``ZstdCompressor``, each invocation of ``compress()`` will calculate the optimal compression parameters for the configured compression ``level`` and @@ -411,6 +445,42 @@ the compressor:: data = cobj.compress(b'foobar') data = cobj.flush() +Batch Compression API +^^^^^^^^^^^^^^^^^^^^^ + +(Experimental. Not yet supported in CFFI bindings.) + +``multi_compress_to_buffer(data, [threads=0])`` performs compression of multiple +inputs as a single operation. + +Data to be compressed can be passed as a ``BufferWithSegmentsCollection``, a +``BufferWithSegments``, or a list containing byte like objects. Each element of +the container will be compressed individually using the configured parameters +on the ``ZstdCompressor`` instance. + +The ``threads`` argument controls how many threads to use for compression. The +default is ``0`` which means to use a single thread. Negative values use the +number of logical CPUs in the machine. + +The function returns a ``BufferWithSegmentsCollection``. This type represents +N discrete memory allocations, eaching holding 1 or more compressed frames. + +Output data is written to shared memory buffers. This means that unlike +regular Python objects, a reference to *any* object within the collection +keeps the shared buffer and therefore memory backing it alive. This can have +undesirable effects on process memory usage. + +The API and behavior of this function is experimental and will likely change. +Known deficiencies include: + +* If asked to use multiple threads, it will always spawn that many threads, + even if the input is too small to use them. It should automatically lower + the thread count when the extra threads would just add overhead. +* The buffer allocation strategy is fixed. There is room to make it dynamic, + perhaps even to allow one output buffer per input, facilitating a variation + of the API to return a list without the adverse effects of shared memory + buffers. + ZstdDecompressor ---------------- @@ -585,6 +655,60 @@ Here is how this API should be used:: data = dobj.decompress(compressed_chunk_0) data = dobj.decompress(compressed_chunk_1) +Batch Decompression API +^^^^^^^^^^^^^^^^^^^^^^^ + +(Experimental. Not yet supported in CFFI bindings.) + +``multi_decompress_to_buffer()`` performs decompression of multiple +frames as a single operation and returns a ``BufferWithSegmentsCollection`` +containing decompressed data for all inputs. + +Compressed frames can be passed to the function as a ``BufferWithSegments``, +a ``BufferWithSegmentsCollection``, or as a list containing objects that +conform to the buffer protocol. For best performance, pass a +``BufferWithSegmentsCollection`` or a ``BufferWithSegments``, as +minimal input validation will be done for that type. If calling from +Python (as opposed to C), constructing one of these instances may add +overhead cancelling out the performance overhead of validation for list +inputs. + +The decompressed size of each frame must be discoverable. It can either be +embedded within the zstd frame (``write_content_size=True`` argument to +``ZstdCompressor``) or passed in via the ``decompressed_sizes`` argument. + +The ``decompressed_sizes`` argument is an object conforming to the buffer +protocol which holds an array of 64-bit unsigned integers in the machine's +native format defining the decompressed sizes of each frame. If this argument +is passed, it avoids having to scan each frame for its decompressed size. +This frame scanning can add noticeable overhead in some scenarios. + +The ``threads`` argument controls the number of threads to use to perform +decompression operations. The default (``0``) or the value ``1`` means to +use a single thread. Negative values use the number of logical CPUs in the +machine. + +.. note:: + + It is possible to pass a ``mmap.mmap()`` instance into this function by + wrapping it with a ``BufferWithSegments`` instance (which will define the + offsets of frames within the memory mapped region). + +This function is logically equivalent to performing ``dctx.decompress()`` +on each input frame and returning the result. + +This function exists to perform decompression on multiple frames as fast +as possible by having as little overhead as possible. Since decompression is +performed as a single operation and since the decompressed output is stored in +a single buffer, extra memory allocations, Python objects, and Python function +calls are avoided. This is ideal for scenarios where callers need to access +decompressed data for multiple frames. + +Currently, the implementation always spawns multiple threads when requested, +even if the amount of work to do is small. In the future, it will be smarter +about avoiding threads and their associated overhead when the amount of +work to do is small. + Content-Only Dictionary Chain Decompression ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -609,20 +733,20 @@ Each zstd frame **must** have the conten The following Python code can be used to produce a *content-only dictionary chain*:: - def make_chain(inputs): - frames = [] + def make_chain(inputs): + frames = [] - # First frame is compressed in standalone/discrete mode. - zctx = zstd.ZstdCompressor(write_content_size=True) - frames.append(zctx.compress(inputs[0])) + # First frame is compressed in standalone/discrete mode. + zctx = zstd.ZstdCompressor(write_content_size=True) + frames.append(zctx.compress(inputs[0])) - # Subsequent frames use the previous fulltext as a content-only dictionary - for i, raw in enumerate(inputs[1:]): - dict_data = zstd.ZstdCompressionDict(inputs[i]) - zctx = zstd.ZstdCompressor(write_content_size=True, dict_data=dict_data) - frames.append(zctx.compress(raw)) + # Subsequent frames use the previous fulltext as a content-only dictionary + for i, raw in enumerate(inputs[1:]): + dict_data = zstd.ZstdCompressionDict(inputs[i]) + zctx = zstd.ZstdCompressor(write_content_size=True, dict_data=dict_data) + frames.append(zctx.compress(raw)) - return frames + return frames ``decompress_content_dict_chain()`` returns the uncompressed data of the last element in the input chain. @@ -632,59 +756,42 @@ on top of other Python APIs. However, th faster, especially for long input chains, as it avoids the overhead of instantiating and passing around intermediate objects between C and Python. -Choosing an API ---------------- - -Various forms of compression and decompression APIs are provided because each -are suitable for different use cases. +Multi-Threaded Compression +-------------------------- -The simple/one-shot APIs are useful for small data, when the decompressed -data size is known (either recorded in the zstd frame header via -``write_content_size`` or known via an out-of-band mechanism, such as a file -size). +``ZstdCompressor`` accepts a ``threads`` argument that controls the number +of threads to use for compression. The way this works is that input is split +into segments and each segment is fed into a worker pool for compression. Once +a segment is compressed, it is flushed/appended to the output. -A limitation of the simple APIs is that input or output data must fit in memory. -And unless using advanced tricks with Python *buffer objects*, both input and -output must fit in memory simultaneously. - -Another limitation is that compression or decompression is performed as a single -operation. So if you feed large input, it could take a long time for the -function to return. +The segment size for multi-threaded compression is chosen from the window size +of the compressor. This is derived from the ``window_log`` attribute of a +``CompressionParameters`` instance. By default, segment sizes are in the 1+MB +range. -The streaming APIs do not have the limitations of the simple API. The cost to -this is they are more complex to use than a single function call. - -The streaming APIs put the caller in control of compression and decompression -behavior by allowing them to directly control either the input or output side -of the operation. - -With the streaming input APIs, the caller feeds data into the compressor or -decompressor as they see fit. Output data will only be written after the caller -has explicitly written data. +If multi-threaded compression is requested and the input is smaller than the +configured segment size, only a single compression thread will be used. If the +input is smaller than the segment size multiplied by the thread pool size or +if data cannot be delivered to the compressor fast enough, not all requested +compressor threads may be active simultaneously. -With the streaming output APIs, the caller consumes output from the compressor -or decompressor as they see fit. The compressor or decompressor will only -consume data from the source when the caller is ready to receive it. +Compared to non-multi-threaded compression, multi-threaded compression has +higher per-operation overhead. This includes extra memory operations, +thread creation, lock acquisition, etc. -One end of the streaming APIs involves a file-like object that must -``write()`` output data or ``read()`` input data. Depending on what the -backing storage for these objects is, those operations may not complete quickly. -For example, when streaming compressed data to a file, the ``write()`` into -a streaming compressor could result in a ``write()`` to the filesystem, which -may take a long time to finish due to slow I/O on the filesystem. So, there -may be overhead in streaming APIs beyond the compression and decompression -operations. +Due to the nature of multi-threaded compression using *N* compression +*states*, the output from multi-threaded compression will likely be larger +than non-multi-threaded compression. The difference is usually small. But +there is a CPU/wall time versus size trade off that may warrant investigation. + +Output from multi-threaded compression does not require any special handling +on the decompression side. In other words, any zstd decompressor should be able +to consume data produced with multi-threaded compression. Dictionary Creation and Management ---------------------------------- -Zstandard allows *dictionaries* to be used when compressing and -decompressing data. The idea is that if you are compressing a lot of similar -data, you can precompute common properties of that data (such as recurring -byte sequences) to achieve better compression ratios. - -In Python, compression dictionaries are represented as the -``ZstdCompressionDict`` type. +Compression dictionaries are represented as the ``ZstdCompressionDict`` type. Instances can be constructed from bytes:: @@ -736,6 +843,88 @@ a ``ZstdCompressionDict`` later) via ``a dict_data = zstd.train_dictionary(size, samples) raw_data = dict_data.as_bytes() +The following named arguments to ``train_dictionary`` can also be used +to further control dictionary generation. + +selectivity + Integer selectivity level. Default is 9. Larger values yield more data in + dictionary. +level + Integer compression level. Default is 6. +dict_id + Integer dictionary ID for the produced dictionary. Default is 0, which + means to use a random value. +notifications + Controls writing of informational messages to ``stderr``. ``0`` (the + default) means to write nothing. ``1`` writes errors. ``2`` writes + progression info. ``3`` writes more details. And ``4`` writes all info. + +Cover Dictionaries +^^^^^^^^^^^^^^^^^^ + +An alternate dictionary training mechanism named *cover* is also available. +More details about this training mechanism are available in the paper +*Effective Construction of Relative Lempel-Ziv Dictionaries* (authors: +Liao, Petri, Moffat, Wirth). + +To use this mechanism, use ``zstd.train_cover_dictionary()`` instead of +``zstd.train_dictionary()``. The function behaves nearly the same except +its arguments are different and the returned dictionary will contain ``k`` +and ``d`` attributes reflecting the parameters to the cover algorithm. + +.. note:: + + The ``k`` and ``d`` attributes are only populated on dictionary + instances created by this function. If a ``ZstdCompressionDict`` is + constructed from raw bytes data, the ``k`` and ``d`` attributes will + be ``0``. + +The segment and dmer size parameters to the cover algorithm can either be +specified manually or you can ask ``train_cover_dictionary()`` to try +multiple values and pick the best one, where *best* means the smallest +compressed data size. + +In manual mode, the ``k`` and ``d`` arguments must be specified or a +``ZstdError`` will be raised. + +In automatic mode (triggered by specifying ``optimize=True``), ``k`` +and ``d`` are optional. If a value isn't specified, then default values for +both are tested. The ``steps`` argument can control the number of steps +through ``k`` values. The ``level`` argument defines the compression level +that will be used when testing the compressed size. And ``threads`` can +specify the number of threads to use for concurrent operation. + +This function takes the following arguments: + +dict_size + Target size in bytes of the dictionary to generate. +samples + A list of bytes holding samples the dictionary will be trained from. +k + Parameter to cover algorithm defining the segment size. A reasonable range + is [16, 2048+]. +d + Parameter to cover algorithm defining the dmer size. A reasonable range is + [6, 16]. ``d`` must be less than or equal to ``k``. +dict_id + Integer dictionary ID for the produced dictionary. Default is 0, which uses + a random value. +optimize + When true, test dictionary generation with multiple parameters. +level + Integer target compression level when testing compression with + ``optimize=True``. Default is 1. +steps + Number of steps through ``k`` values to perform when ``optimize=True``. + Default is 32. +threads + Number of threads to use when ``optimize=True``. Default is 0, which means + to use a single thread. A negative value can be specified to use as many + threads as there are detected logical CPUs. +notifications + Controls writing of informational messages to ``stderr``. See the + documentation for ``train_dictionary()`` for more. + Explicit Compression Parameters ------------------------------- @@ -904,6 +1093,267 @@ 100 byte inputs will be significant (pos whereas 10 1,000,000 byte inputs will be more similar in speed (because the time spent doing compression dwarfs time spent creating new *contexts*). +Buffer Types +------------ + +The API exposes a handful of custom types for interfacing with memory buffers. +The primary goal of these types is to facilitate efficient multi-object +operations. + +The essential idea is to have a single memory allocation provide backing +storage for multiple logical objects. This has 2 main advantages: fewer +allocations and optimal memory access patterns. This avoids having to allocate +a Python object for each logical object and furthermore ensures that access of +data for objects can be sequential (read: fast) in memory. + +BufferWithSegments +^^^^^^^^^^^^^^^^^^ + +The ``BufferWithSegments`` type represents a memory buffer containing N +discrete items of known lengths (segments). It is essentially a fixed size +memory address and an array of 2-tuples of ``(offset, length)`` 64-bit +unsigned native endian integers defining the byte offset and length of each +segment within the buffer. + +Instances behave like containers. + +``len()`` returns the number of segments within the instance. + +``o[index]`` or ``__getitem__`` obtains a ``BufferSegment`` representing an +individual segment within the backing buffer. That returned object references +(not copies) memory. This means that iterating all objects doesn't copy +data within the buffer. + +The ``.size`` attribute contains the total size in bytes of the backing +buffer. + +Instances conform to the buffer protocol. So a reference to the backing bytes +can be obtained via ``memoryview(o)``. A *copy* of the backing bytes can also +be obtained via ``.tobytes()``. + +The ``.segments`` attribute exposes the array of ``(offset, length)`` for +segments within the buffer. It is a ``BufferSegments`` type. + +BufferSegment +^^^^^^^^^^^^^ + +The ``BufferSegment`` type represents a segment within a ``BufferWithSegments``. +It is essentially a reference to N bytes within a ``BufferWithSegments``. + +``len()`` returns the length of the segment in bytes. + +``.offset`` contains the byte offset of this segment within its parent +``BufferWithSegments`` instance. + +The object conforms to the buffer protocol. ``.tobytes()`` can be called to +obtain a ``bytes`` instance with a copy of the backing bytes. + +BufferSegments +^^^^^^^^^^^^^^ + +This type represents an array of ``(offset, length)`` integers defining segments +within a ``BufferWithSegments``. + +The array members are 64-bit unsigned integers using host/native bit order. + +Instances conform to the buffer protocol. + +BufferWithSegmentsCollection +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The ``BufferWithSegmentsCollection`` type represents a virtual spanning view +of multiple ``BufferWithSegments`` instances. + +Instances are constructed from 1 or more ``BufferWithSegments`` instances. The +resulting object behaves like an ordered sequence whose members are the +segments within each ``BufferWithSegments``. + +``len()`` returns the number of segments within all ``BufferWithSegments`` +instances. + +``o[index]`` and ``__getitem__(index)`` return the ``BufferSegment`` at +that offset as if all ``BufferWithSegments`` instances were a single +entity. + +If the object is composed of 2 ``BufferWithSegments`` instances with the +first having 2 segments and the second have 3 segments, then ``b[0]`` +and ``b[1]`` access segments in the first object and ``b[2]``, ``b[3]``, +and ``b[4]`` access segments from the second. + +Choosing an API +=============== + +There are multiple APIs for performing compression and decompression. This is +because different applications have different needs and the library wants to +facilitate optimal use in as many use cases as possible. + +From a high-level, APIs are divided into *one-shot* and *streaming*. See +the ``Concepts`` section for a description of how these are different at +the C layer. + +The *one-shot* APIs are useful for small data, where the input or output +size is known. (The size can come from a buffer length, file size, or +stored in the zstd frame header.) A limitation of the *one-shot* APIs is that +input and output must fit in memory simultaneously. For say a 4 GB input, +this is often not feasible. + +The *one-shot* APIs also perform all work as a single operation. So, if you +feed it large input, it could take a long time for the function to return. + +The streaming APIs do not have the limitations of the simple API. But the +price you pay for this flexibility is that they are more complex than a +single function call. + +The streaming APIs put the caller in control of compression and decompression +behavior by allowing them to directly control either the input or output side +of the operation. + +With the *streaming input*, *compressor*, and *decompressor* APIs, the caller +has full control over the input to the compression or decompression stream. +They can directly choose when new data is operated on. + +With the *streaming ouput* APIs, the caller has full control over the output +of the compression or decompression stream. It can choose when to receive +new data. + +When using the *streaming* APIs that operate on file-like or stream objects, +it is important to consider what happens in that object when I/O is requested. +There is potential for long pauses as data is read or written from the +underlying stream (say from interacting with a filesystem or network). This +could add considerable overhead. + +Concepts +======== + +It is important to have a basic understanding of how Zstandard works in order +to optimally use this library. In addition, there are some low-level Python +concepts that are worth explaining to aid understanding. This section aims to +provide that knowledge. + +Zstandard Frames and Compression Format +--------------------------------------- + +Compressed zstandard data almost always exists within a container called a +*frame*. (For the technically curious, see the +`specification _.) + +The frame contains a header and optional trailer. The header contains a +magic number to self-identify as a zstd frame and a description of the +compressed data that follows. + +Among other things, the frame *optionally* contains the size of the +decompressed data the frame represents, a 32-bit checksum of the +decompressed data (to facilitate verification during decompression), +and the ID of the dictionary used to compress the data. + +Storing the original content size in the frame (``write_content_size=True`` +to ``ZstdCompressor``) is important for performance in some scenarios. Having +the decompressed size stored there (or storing it elsewhere) allows +decompression to perform a single memory allocation that is exactly sized to +the output. This is faster than continuously growing a memory buffer to hold +output. + +Compression and Decompression Contexts +-------------------------------------- + +In order to perform a compression or decompression operation with the zstd +C API, you need what's called a *context*. A context essentially holds +configuration and state for a compression or decompression operation. For +example, a compression context holds the configured compression level. + +Contexts can be reused for multiple operations. Since creating and +destroying contexts is not free, there are performance advantages to +reusing contexts. + +The ``ZstdCompressor`` and ``ZstdDecompressor`` types are essentially +wrappers around these contexts in the zstd C API. + +One-shot And Streaming Operations +--------------------------------- + +A compression or decompression operation can either be performed as a +single *one-shot* operation or as a continuous *streaming* operation. + +In one-shot mode (the *simple* APIs provided by the Python interface), +**all** input is handed to the compressor or decompressor as a single buffer +and **all** output is returned as a single buffer. + +In streaming mode, input is delivered to the compressor or decompressor as +a series of chunks via multiple function calls. Likewise, output is +obtained in chunks as well. + +Streaming operations require an additional *stream* object to be created +to track the operation. These are logical extensions of *context* +instances. + +There are advantages and disadvantages to each mode of operation. There +are scenarios where certain modes can't be used. See the +``Choosing an API`` section for more. + +Dictionaries +------------ + +A compression *dictionary* is essentially data used to seed the compressor +state so it can achieve better compression. The idea is that if you are +compressing a lot of similar pieces of data (e.g. JSON documents or anything +sharing similar structure), then you can find common patterns across multiple +objects then leverage those common patterns during compression and +decompression operations to achieve better compression ratios. + +Dictionary compression is generally only useful for small inputs - data no +larger than a few kilobytes. The upper bound on this range is highly dependent +on the input data and the dictionary. + +Python Buffer Protocol +---------------------- + +Many functions in the library operate on objects that implement Python's +`buffer protocol `_. + +The *buffer protocol* is an internal implementation detail of a Python +type that allows instances of that type (objects) to be exposed as a raw +pointer (or buffer) in the C API. In other words, it allows objects to be +exposed as an array of bytes. + +From the perspective of the C API, objects implementing the *buffer protocol* +all look the same: they are just a pointer to a memory address of a defined +length. This allows the C API to be largely type agnostic when accessing their +data. This allows custom types to be passed in without first converting them +to a specific type. + +Many Python types implement the buffer protocol. These include ``bytes`` +(``str`` on Python 2), ``bytearray``, ``array.array``, ``io.BytesIO``, +``mmap.mmap``, and ``memoryview``. + +``python-zstandard`` APIs that accept objects conforming to the buffer +protocol require that the buffer is *C contiguous* and has a single +dimension (``ndim==1``). This is usually the case. An example of where it +is not is a Numpy matrix type. + +Requiring Output Sizes for Non-Streaming Decompression APIs +----------------------------------------------------------- + +Non-streaming decompression APIs require that either the output size is +explicitly defined (either in the zstd frame header or passed into the +function) or that a max output size is specified. This restriction is for +your safety. + +The *one-shot* decompression APIs store the decompressed result in a +single buffer. This means that a buffer needs to be pre-allocated to hold +the result. If the decompressed size is not known, then there is no universal +good default size to use. Any default will fail or will be highly sub-optimal +in some scenarios (it will either be too small or will put stress on the +memory allocator to allocate a too large block). + +A *helpful* API may retry decompression with buffers of increasing size. +While useful, there are obvious performance disadvantages, namely redoing +decompression N times until it works. In addition, there is a security +concern. Say the input came from highly compressible data, like 1 GB of the +same byte value. The output size could be several magnitudes larger than the +input size. An input of <100KB could decompress to >1GB. Without a bounds +restriction on the decompressed size, certain inputs could exhaust all system +memory. That's not good and is why the maximum output size is limited. + Note on Zstandard's *Experimental* API ====================================== diff --git a/contrib/python-zstandard/c-ext/bufferutil.c b/contrib/python-zstandard/c-ext/bufferutil.c new file mode 100644 --- /dev/null +++ b/contrib/python-zstandard/c-ext/bufferutil.c @@ -0,0 +1,770 @@ +/** +* Copyright (c) 2017-present, Gregory Szorc +* All rights reserved. +* +* This software may be modified and distributed under the terms +* of the BSD license. See the LICENSE file for details. +*/ + +#include "python-zstandard.h" + +extern PyObject* ZstdError; + +PyDoc_STRVAR(BufferWithSegments__doc__, +"BufferWithSegments - A memory buffer holding known sub-segments.\n" +"\n" +"This type represents a contiguous chunk of memory containing N discrete\n" +"items within sub-segments of that memory.\n" +"\n" +"Segments within the buffer are stored as an array of\n" +"``(offset, length)`` pairs, where each element is an unsigned 64-bit\n" +"integer using the host/native bit order representation.\n" +"\n" +"The type exists to facilitate operations against N>1 items without the\n" +"overhead of Python object creation and management.\n" +); + +static void BufferWithSegments_dealloc(ZstdBufferWithSegments* self) { + /* Backing memory is either canonically owned by a Py_buffer or by us. */ + if (self->parent.buf) { + PyBuffer_Release(&self->parent); + } + else if (self->useFree) { + free(self->data); + } + else { + PyMem_Free(self->data); + } + + self->data = NULL; + + if (self->useFree) { + free(self->segments); + } + else { + PyMem_Free(self->segments); + } + + self->segments = NULL; + + PyObject_Del(self); +} + +static int BufferWithSegments_init(ZstdBufferWithSegments* self, PyObject* args, PyObject* kwargs) { + static char* kwlist[] = { + "data", + "segments", + NULL + }; + + Py_buffer segments; + Py_ssize_t segmentCount; + Py_ssize_t i; + + memset(&self->parent, 0, sizeof(self->parent)); + +#if PY_MAJOR_VERSION >= 3 + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "y*y*:BufferWithSegments", +#else + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s*s*:BufferWithSegments", +#endif + kwlist, &self->parent, &segments)) { + return -1; + } + + if (!PyBuffer_IsContiguous(&self->parent, 'C') || self->parent.ndim > 1) { + PyErr_SetString(PyExc_ValueError, "data buffer should be contiguous and have a single dimension"); + goto except; + } + + if (!PyBuffer_IsContiguous(&segments, 'C') || segments.ndim > 1) { + PyErr_SetString(PyExc_ValueError, "segments buffer should be contiguous and have a single dimension"); + goto except; + } + + if (segments.len % sizeof(BufferSegment)) { + PyErr_Format(PyExc_ValueError, "segments array size is not a multiple of %lu", + sizeof(BufferSegment)); + goto except; + } + + segmentCount = segments.len / sizeof(BufferSegment); + + /* Validate segments data, as blindly trusting it could lead to arbitrary + memory access. */ + for (i = 0; i < segmentCount; i++) { + BufferSegment* segment = &((BufferSegment*)(segments.buf))[i]; + + if (segment->offset + segment->length > (unsigned long long)self->parent.len) { + PyErr_SetString(PyExc_ValueError, "offset within segments array references memory outside buffer"); + goto except; + return -1; + } + } + + /* Make a copy of the segments data. It is cheap to do so and is a guard + against caller changing offsets, which has security implications. */ + self->segments = PyMem_Malloc(segments.len); + if (!self->segments) { + PyErr_NoMemory(); + goto except; + } + + memcpy(self->segments, segments.buf, segments.len); + PyBuffer_Release(&segments); + + self->data = self->parent.buf; + self->dataSize = self->parent.len; + self->segmentCount = segmentCount; + + return 0; + +except: + PyBuffer_Release(&self->parent); + PyBuffer_Release(&segments); + return -1; +}; + +/** + * Construct a BufferWithSegments from existing memory and offsets. + * + * Ownership of the backing memory and BufferSegments will be transferred to + * the created object and freed when the BufferWithSegments is destroyed. + */ +ZstdBufferWithSegments* BufferWithSegments_FromMemory(void* data, unsigned long long dataSize, + BufferSegment* segments, Py_ssize_t segmentsSize) { + ZstdBufferWithSegments* result = NULL; + Py_ssize_t i; + + if (NULL == data) { + PyErr_SetString(PyExc_ValueError, "data is NULL"); + return NULL; + } + + if (NULL == segments) { + PyErr_SetString(PyExc_ValueError, "segments is NULL"); + return NULL; + } + + for (i = 0; i < segmentsSize; i++) { + BufferSegment* segment = &segments[i]; + + if (segment->offset + segment->length > dataSize) { + PyErr_SetString(PyExc_ValueError, "offset in segments overflows buffer size"); + return NULL; + } + } + + result = PyObject_New(ZstdBufferWithSegments, &ZstdBufferWithSegmentsType); + if (NULL == result) { + return NULL; + } + + result->useFree = 0; + + memset(&result->parent, 0, sizeof(result->parent)); + result->data = data; + result->dataSize = dataSize; + result->segments = segments; + result->segmentCount = segmentsSize; + + return result; +} + +static Py_ssize_t BufferWithSegments_length(ZstdBufferWithSegments* self) { + return self->segmentCount; +} + +static ZstdBufferSegment* BufferWithSegments_item(ZstdBufferWithSegments* self, Py_ssize_t i) { + ZstdBufferSegment* result = NULL; + + if (i < 0) { + PyErr_SetString(PyExc_IndexError, "offset must be non-negative"); + return NULL; + } + + if (i >= self->segmentCount) { + PyErr_Format(PyExc_IndexError, "offset must be less than %zd", self->segmentCount); + return NULL; + } + + result = (ZstdBufferSegment*)PyObject_CallObject((PyObject*)&ZstdBufferSegmentType, NULL); + if (NULL == result) { + return NULL; + } + + result->parent = (PyObject*)self; + Py_INCREF(self); + + result->data = (char*)self->data + self->segments[i].offset; + result->dataSize = self->segments[i].length; + result->offset = self->segments[i].offset; + + return result; +} + +#if PY_MAJOR_VERSION >= 3 +static int BufferWithSegments_getbuffer(ZstdBufferWithSegments* self, Py_buffer* view, int flags) { + return PyBuffer_FillInfo(view, (PyObject*)self, self->data, self->dataSize, 1, flags); +} +#else +static Py_ssize_t BufferWithSegments_getreadbuffer(ZstdBufferWithSegments* self, Py_ssize_t segment, void **ptrptr) { + if (segment != 0) { + PyErr_SetString(PyExc_ValueError, "segment number must be 0"); + return -1; + } + + *ptrptr = self->data; + return self->dataSize; +} + +static Py_ssize_t BufferWithSegments_getsegcount(ZstdBufferWithSegments* self, Py_ssize_t* len) { + if (len) { + *len = 1; + } + + return 1; +} +#endif + +PyDoc_STRVAR(BufferWithSegments_tobytes__doc__, +"Obtain a bytes instance for this buffer.\n" +); + +static PyObject* BufferWithSegments_tobytes(ZstdBufferWithSegments* self) { + return PyBytes_FromStringAndSize(self->data, self->dataSize); +} + +PyDoc_STRVAR(BufferWithSegments_segments__doc__, +"Obtain a BufferSegments describing segments in this sintance.\n" +); + +static ZstdBufferSegments* BufferWithSegments_segments(ZstdBufferWithSegments* self) { + ZstdBufferSegments* result = (ZstdBufferSegments*)PyObject_CallObject((PyObject*)&ZstdBufferSegmentsType, NULL); + if (NULL == result) { + return NULL; + } + + result->parent = (PyObject*)self; + Py_INCREF(self); + result->segments = self->segments; + result->segmentCount = self->segmentCount; + + return result; +} + +static PySequenceMethods BufferWithSegments_sq = { + (lenfunc)BufferWithSegments_length, /* sq_length */ + 0, /* sq_concat */ + 0, /* sq_repeat */ + (ssizeargfunc)BufferWithSegments_item, /* sq_item */ + 0, /* sq_ass_item */ + 0, /* sq_contains */ + 0, /* sq_inplace_concat */ + 0 /* sq_inplace_repeat */ +}; + +static PyBufferProcs BufferWithSegments_as_buffer = { +#if PY_MAJOR_VERSION >= 3 + (getbufferproc)BufferWithSegments_getbuffer, /* bf_getbuffer */ + 0 /* bf_releasebuffer */ +#else + (readbufferproc)BufferWithSegments_getreadbuffer, /* bf_getreadbuffer */ + 0, /* bf_getwritebuffer */ + (segcountproc)BufferWithSegments_getsegcount, /* bf_getsegcount */ + 0 /* bf_getcharbuffer */ +#endif +}; + +static PyMethodDef BufferWithSegments_methods[] = { + { "segments", (PyCFunction)BufferWithSegments_segments, + METH_NOARGS, BufferWithSegments_segments__doc__ }, + { "tobytes", (PyCFunction)BufferWithSegments_tobytes, + METH_NOARGS, BufferWithSegments_tobytes__doc__ }, + { NULL, NULL } +}; + +static PyMemberDef BufferWithSegments_members[] = { + { "size", T_ULONGLONG, offsetof(ZstdBufferWithSegments, dataSize), + READONLY, "total size of the buffer in bytes" }, + { NULL } +}; + +PyTypeObject ZstdBufferWithSegmentsType = { + PyVarObject_HEAD_INIT(NULL, 0) + "zstd.BufferWithSegments", /* tp_name */ + sizeof(ZstdBufferWithSegments),/* tp_basicsize */ + 0, /* tp_itemsize */ + (destructor)BufferWithSegments_dealloc, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + &BufferWithSegments_sq, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + &BufferWithSegments_as_buffer, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT, /* tp_flags */ + BufferWithSegments__doc__, /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + BufferWithSegments_methods, /* tp_methods */ + BufferWithSegments_members, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc)BufferWithSegments_init, /* tp_init */ + 0, /* tp_alloc */ + PyType_GenericNew, /* tp_new */ +}; + +PyDoc_STRVAR(BufferSegments__doc__, +"BufferSegments - Represents segments/offsets within a BufferWithSegments\n" +); + +static void BufferSegments_dealloc(ZstdBufferSegments* self) { + Py_CLEAR(self->parent); + PyObject_Del(self); +} + +#if PY_MAJOR_VERSION >= 3 +static int BufferSegments_getbuffer(ZstdBufferSegments* self, Py_buffer* view, int flags) { + return PyBuffer_FillInfo(view, (PyObject*)self, + (void*)self->segments, self->segmentCount * sizeof(BufferSegment), + 1, flags); +} +#else +static Py_ssize_t BufferSegments_getreadbuffer(ZstdBufferSegments* self, Py_ssize_t segment, void **ptrptr) { + if (segment != 0) { + PyErr_SetString(PyExc_ValueError, "segment number must be 0"); + return -1; + } + + *ptrptr = (void*)self->segments; + return self->segmentCount * sizeof(BufferSegment); +} + +static Py_ssize_t BufferSegments_getsegcount(ZstdBufferSegments* self, Py_ssize_t* len) { + if (len) { + *len = 1; + } + + return 1; +} +#endif + +static PyBufferProcs BufferSegments_as_buffer = { +#if PY_MAJOR_VERSION >= 3 + (getbufferproc)BufferSegments_getbuffer, + 0 +#else + (readbufferproc)BufferSegments_getreadbuffer, + 0, + (segcountproc)BufferSegments_getsegcount, + 0 +#endif +}; + +PyTypeObject ZstdBufferSegmentsType = { + PyVarObject_HEAD_INIT(NULL, 0) + "zstd.BufferSegments", /* tp_name */ + sizeof(ZstdBufferSegments),/* tp_basicsize */ + 0, /* tp_itemsize */ + (destructor)BufferSegments_dealloc, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + &BufferSegments_as_buffer, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT, /* tp_flags */ + BufferSegments__doc__, /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + 0, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + 0, /* tp_init */ + 0, /* tp_alloc */ + PyType_GenericNew, /* tp_new */ +}; + +PyDoc_STRVAR(BufferSegment__doc__, + "BufferSegment - Represents a segment within a BufferWithSegments\n" +); + +static void BufferSegment_dealloc(ZstdBufferSegment* self) { + Py_CLEAR(self->parent); + PyObject_Del(self); +} + +static Py_ssize_t BufferSegment_length(ZstdBufferSegment* self) { + return self->dataSize; +} + +#if PY_MAJOR_VERSION >= 3 +static int BufferSegment_getbuffer(ZstdBufferSegment* self, Py_buffer* view, int flags) { + return PyBuffer_FillInfo(view, (PyObject*)self, + self->data, self->dataSize, 1, flags); +} +#else +static Py_ssize_t BufferSegment_getreadbuffer(ZstdBufferSegment* self, Py_ssize_t segment, void **ptrptr) { + if (segment != 0) { + PyErr_SetString(PyExc_ValueError, "segment number must be 0"); + return -1; + } + + *ptrptr = self->data; + return self->dataSize; +} + +static Py_ssize_t BufferSegment_getsegcount(ZstdBufferSegment* self, Py_ssize_t* len) { + if (len) { + *len = 1; + } + + return 1; +} +#endif + +PyDoc_STRVAR(BufferSegment_tobytes__doc__, +"Obtain a bytes instance for this segment.\n" +); + +static PyObject* BufferSegment_tobytes(ZstdBufferSegment* self) { + return PyBytes_FromStringAndSize(self->data, self->dataSize); +} + +static PySequenceMethods BufferSegment_sq = { + (lenfunc)BufferSegment_length, /* sq_length */ + 0, /* sq_concat */ + 0, /* sq_repeat */ + 0, /* sq_item */ + 0, /* sq_ass_item */ + 0, /* sq_contains */ + 0, /* sq_inplace_concat */ + 0 /* sq_inplace_repeat */ +}; + +static PyBufferProcs BufferSegment_as_buffer = { +#if PY_MAJOR_VERSION >= 3 + (getbufferproc)BufferSegment_getbuffer, + 0 +#else + (readbufferproc)BufferSegment_getreadbuffer, + 0, + (segcountproc)BufferSegment_getsegcount, + 0 +#endif +}; + +static PyMethodDef BufferSegment_methods[] = { + { "tobytes", (PyCFunction)BufferSegment_tobytes, + METH_NOARGS, BufferSegment_tobytes__doc__ }, + { NULL, NULL } +}; + +static PyMemberDef BufferSegment_members[] = { + { "offset", T_ULONGLONG, offsetof(ZstdBufferSegment, offset), READONLY, + "offset of segment within parent buffer" }, + { NULL } +}; + +PyTypeObject ZstdBufferSegmentType = { + PyVarObject_HEAD_INIT(NULL, 0) + "zstd.BufferSegment", /* tp_name */ + sizeof(ZstdBufferSegment),/* tp_basicsize */ + 0, /* tp_itemsize */ + (destructor)BufferSegment_dealloc, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + &BufferSegment_sq, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + &BufferSegment_as_buffer, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT, /* tp_flags */ + BufferSegment__doc__, /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + BufferSegment_methods, /* tp_methods */ + BufferSegment_members, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + 0, /* tp_init */ + 0, /* tp_alloc */ + PyType_GenericNew, /* tp_new */ +}; + +PyDoc_STRVAR(BufferWithSegmentsCollection__doc__, +"Represents a collection of BufferWithSegments.\n" +); + +static void BufferWithSegmentsCollection_dealloc(ZstdBufferWithSegmentsCollection* self) { + Py_ssize_t i; + + if (self->firstElements) { + PyMem_Free(self->firstElements); + self->firstElements = NULL; + } + + if (self->buffers) { + for (i = 0; i < self->bufferCount; i++) { + Py_CLEAR(self->buffers[i]); + } + + PyMem_Free(self->buffers); + self->buffers = NULL; + } + + PyObject_Del(self); +} + +static int BufferWithSegmentsCollection_init(ZstdBufferWithSegmentsCollection* self, PyObject* args) { + Py_ssize_t size; + Py_ssize_t i; + Py_ssize_t offset = 0; + + size = PyTuple_Size(args); + if (-1 == size) { + return -1; + } + + if (0 == size) { + PyErr_SetString(PyExc_ValueError, "must pass at least 1 argument"); + return -1; + } + + for (i = 0; i < size; i++) { + PyObject* item = PyTuple_GET_ITEM(args, i); + if (!PyObject_TypeCheck(item, &ZstdBufferWithSegmentsType)) { + PyErr_SetString(PyExc_TypeError, "arguments must be BufferWithSegments instances"); + return -1; + } + + if (0 == ((ZstdBufferWithSegments*)item)->segmentCount || + 0 == ((ZstdBufferWithSegments*)item)->dataSize) { + PyErr_SetString(PyExc_ValueError, "ZstdBufferWithSegments cannot be empty"); + return -1; + } + } + + self->buffers = PyMem_Malloc(size * sizeof(ZstdBufferWithSegments*)); + if (NULL == self->buffers) { + PyErr_NoMemory(); + return -1; + } + + self->firstElements = PyMem_Malloc(size * sizeof(Py_ssize_t)); + if (NULL == self->firstElements) { + PyMem_Free(self->buffers); + self->buffers = NULL; + PyErr_NoMemory(); + return -1; + } + + self->bufferCount = size; + + for (i = 0; i < size; i++) { + ZstdBufferWithSegments* item = (ZstdBufferWithSegments*)PyTuple_GET_ITEM(args, i); + + self->buffers[i] = item; + Py_INCREF(item); + + if (i > 0) { + self->firstElements[i - 1] = offset; + } + + offset += item->segmentCount; + } + + self->firstElements[size - 1] = offset; + + return 0; +} + +static PyObject* BufferWithSegmentsCollection_size(ZstdBufferWithSegmentsCollection* self) { + Py_ssize_t i; + Py_ssize_t j; + unsigned long long size = 0; + + for (i = 0; i < self->bufferCount; i++) { + for (j = 0; j < self->buffers[i]->segmentCount; j++) { + size += self->buffers[i]->segments[j].length; + } + } + + return PyLong_FromUnsignedLongLong(size); +} + +Py_ssize_t BufferWithSegmentsCollection_length(ZstdBufferWithSegmentsCollection* self) { + return self->firstElements[self->bufferCount - 1]; +} + +static ZstdBufferSegment* BufferWithSegmentsCollection_item(ZstdBufferWithSegmentsCollection* self, Py_ssize_t i) { + Py_ssize_t bufferOffset; + + if (i < 0) { + PyErr_SetString(PyExc_IndexError, "offset must be non-negative"); + return NULL; + } + + if (i >= BufferWithSegmentsCollection_length(self)) { + PyErr_Format(PyExc_IndexError, "offset must be less than %zd", + BufferWithSegmentsCollection_length(self)); + return NULL; + } + + for (bufferOffset = 0; bufferOffset < self->bufferCount; bufferOffset++) { + Py_ssize_t offset = 0; + + if (i < self->firstElements[bufferOffset]) { + if (bufferOffset > 0) { + offset = self->firstElements[bufferOffset - 1]; + } + + return BufferWithSegments_item(self->buffers[bufferOffset], i - offset); + } + } + + PyErr_SetString(ZstdError, "error resolving segment; this should not happen"); + return NULL; +} + +static PySequenceMethods BufferWithSegmentsCollection_sq = { + (lenfunc)BufferWithSegmentsCollection_length, /* sq_length */ + 0, /* sq_concat */ + 0, /* sq_repeat */ + (ssizeargfunc)BufferWithSegmentsCollection_item, /* sq_item */ + 0, /* sq_ass_item */ + 0, /* sq_contains */ + 0, /* sq_inplace_concat */ + 0 /* sq_inplace_repeat */ +}; + +static PyMethodDef BufferWithSegmentsCollection_methods[] = { + { "size", (PyCFunction)BufferWithSegmentsCollection_size, + METH_NOARGS, PyDoc_STR("total size in bytes of all segments") }, + { NULL, NULL } +}; + +PyTypeObject ZstdBufferWithSegmentsCollectionType = { + PyVarObject_HEAD_INIT(NULL, 0) + "zstd.BufferWithSegmentsCollection", /* tp_name */ + sizeof(ZstdBufferWithSegmentsCollection),/* tp_basicsize */ + 0, /* tp_itemsize */ + (destructor)BufferWithSegmentsCollection_dealloc, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + &BufferWithSegmentsCollection_sq, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT, /* tp_flags */ + BufferWithSegmentsCollection__doc__, /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + /* TODO implement iterator for performance. */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + BufferWithSegmentsCollection_methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc)BufferWithSegmentsCollection_init, /* tp_init */ + 0, /* tp_alloc */ + PyType_GenericNew, /* tp_new */ +}; + +void bufferutil_module_init(PyObject* mod) { + Py_TYPE(&ZstdBufferWithSegmentsType) = &PyType_Type; + if (PyType_Ready(&ZstdBufferWithSegmentsType) < 0) { + return; + } + + Py_INCREF(&ZstdBufferWithSegmentsType); + PyModule_AddObject(mod, "BufferWithSegments", (PyObject*)&ZstdBufferWithSegmentsType); + + Py_TYPE(&ZstdBufferSegmentsType) = &PyType_Type; + if (PyType_Ready(&ZstdBufferSegmentsType) < 0) { + return; + } + + Py_INCREF(&ZstdBufferSegmentsType); + PyModule_AddObject(mod, "BufferSegments", (PyObject*)&ZstdBufferSegmentsType); + + Py_TYPE(&ZstdBufferSegmentType) = &PyType_Type; + if (PyType_Ready(&ZstdBufferSegmentType) < 0) { + return; + } + + Py_INCREF(&ZstdBufferSegmentType); + PyModule_AddObject(mod, "BufferSegment", (PyObject*)&ZstdBufferSegmentType); + + Py_TYPE(&ZstdBufferWithSegmentsCollectionType) = &PyType_Type; + if (PyType_Ready(&ZstdBufferWithSegmentsCollectionType) < 0) { + return; + } + + Py_INCREF(&ZstdBufferWithSegmentsCollectionType); + PyModule_AddObject(mod, "BufferWithSegmentsCollection", (PyObject*)&ZstdBufferWithSegmentsCollectionType); +} diff --git a/contrib/python-zstandard/c-ext/compressiondict.c b/contrib/python-zstandard/c-ext/compressiondict.c --- a/contrib/python-zstandard/c-ext/compressiondict.c +++ b/contrib/python-zstandard/c-ext/compressiondict.c @@ -11,46 +11,48 @@ extern PyObject* ZstdError; ZstdCompressionDict* train_dictionary(PyObject* self, PyObject* args, PyObject* kwargs) { - static char *kwlist[] = { "dict_size", "samples", "parameters", NULL }; + static char* kwlist[] = { + "dict_size", + "samples", + "selectivity", + "level", + "notifications", + "dict_id", + NULL + }; size_t capacity; PyObject* samples; Py_ssize_t samplesLen; - PyObject* parameters = NULL; + unsigned selectivity = 0; + int level = 0; + unsigned notifications = 0; + unsigned dictID = 0; ZDICT_params_t zparams; Py_ssize_t sampleIndex; Py_ssize_t sampleSize; PyObject* sampleItem; size_t zresult; - void* sampleBuffer; + void* sampleBuffer = NULL; void* sampleOffset; size_t samplesSize = 0; - size_t* sampleSizes; - void* dict; - ZstdCompressionDict* result; + size_t* sampleSizes = NULL; + void* dict = NULL; + ZstdCompressionDict* result = NULL; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "nO!|O!:train_dictionary", + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "nO!|IiII:train_dictionary", kwlist, &capacity, &PyList_Type, &samples, - (PyObject*)&DictParametersType, ¶meters)) { + &selectivity, &level, ¬ifications, &dictID)) { return NULL; } - /* Validate parameters first since it is easiest. */ - zparams.selectivityLevel = 0; - zparams.compressionLevel = 0; - zparams.notificationLevel = 0; - zparams.dictID = 0; - zparams.reserved[0] = 0; - zparams.reserved[1] = 0; + memset(&zparams, 0, sizeof(zparams)); - if (parameters) { - /* TODO validate data ranges */ - zparams.selectivityLevel = PyLong_AsUnsignedLong(PyTuple_GetItem(parameters, 0)); - zparams.compressionLevel = PyLong_AsLong(PyTuple_GetItem(parameters, 1)); - zparams.notificationLevel = PyLong_AsUnsignedLong(PyTuple_GetItem(parameters, 2)); - zparams.dictID = PyLong_AsUnsignedLong(PyTuple_GetItem(parameters, 3)); - } + zparams.selectivityLevel = selectivity; + zparams.compressionLevel = level; + zparams.notificationLevel = notifications; + zparams.dictID = dictID; /* Figure out the size of the raw samples */ samplesLen = PyList_Size(samples); @@ -68,13 +70,12 @@ ZstdCompressionDict* train_dictionary(Py sampleBuffer = PyMem_Malloc(samplesSize); if (!sampleBuffer) { PyErr_NoMemory(); - return NULL; + goto finally; } sampleSizes = PyMem_Malloc(samplesLen * sizeof(size_t)); if (!sampleSizes) { - PyMem_Free(sampleBuffer); PyErr_NoMemory(); - return NULL; + goto finally; } sampleOffset = sampleBuffer; @@ -89,33 +90,168 @@ ZstdCompressionDict* train_dictionary(Py dict = PyMem_Malloc(capacity); if (!dict) { - PyMem_Free(sampleSizes); - PyMem_Free(sampleBuffer); PyErr_NoMemory(); - return NULL; + goto finally; } + /* TODO consider using dup2() to redirect zstd's stderr writing to a buffer */ + Py_BEGIN_ALLOW_THREADS zresult = ZDICT_trainFromBuffer_advanced(dict, capacity, sampleBuffer, sampleSizes, (unsigned int)samplesLen, zparams); + Py_END_ALLOW_THREADS if (ZDICT_isError(zresult)) { PyErr_Format(ZstdError, "Cannot train dict: %s", ZDICT_getErrorName(zresult)); PyMem_Free(dict); - PyMem_Free(sampleSizes); - PyMem_Free(sampleBuffer); - return NULL; + goto finally; } result = PyObject_New(ZstdCompressionDict, &ZstdCompressionDictType); if (!result) { - return NULL; + goto finally; } result->dictData = dict; result->dictSize = zresult; + result->d = 0; + result->k = 0; + +finally: + PyMem_Free(sampleBuffer); + PyMem_Free(sampleSizes); + return result; } +ZstdCompressionDict* train_cover_dictionary(PyObject* self, PyObject* args, PyObject* kwargs) { + static char* kwlist[] = { + "dict_size", + "samples", + "k", + "d", + "notifications", + "dict_id", + "level", + "optimize", + "steps", + "threads", + NULL + }; + + size_t capacity; + PyObject* samples; + unsigned k = 0; + unsigned d = 0; + unsigned notifications = 0; + unsigned dictID = 0; + int level = 0; + PyObject* optimize = NULL; + unsigned steps = 0; + int threads = 0; + COVER_params_t params; + Py_ssize_t samplesLen; + Py_ssize_t i; + size_t samplesSize = 0; + void* sampleBuffer = NULL; + size_t* sampleSizes = NULL; + void* sampleOffset; + Py_ssize_t sampleSize; + void* dict = NULL; + size_t zresult; + ZstdCompressionDict* result = NULL; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "nO!|IIIIiOIi:train_cover_dictionary", + kwlist, &capacity, &PyList_Type, &samples, + &k, &d, ¬ifications, &dictID, &level, &optimize, &steps, &threads)) { + return NULL; + } + + if (threads < 0) { + threads = cpu_count(); + } + + memset(¶ms, 0, sizeof(params)); + params.k = k; + params.d = d; + params.steps = steps; + params.nbThreads = threads; + params.notificationLevel = notifications; + params.dictID = dictID; + params.compressionLevel = level; + + /* Figure out total size of input samples. */ + samplesLen = PyList_Size(samples); + for (i = 0; i < samplesLen; i++) { + PyObject* sampleItem = PyList_GET_ITEM(samples, i); + + if (!PyBytes_Check(sampleItem)) { + PyErr_SetString(PyExc_ValueError, "samples must be bytes"); + return NULL; + } + samplesSize += PyBytes_GET_SIZE(sampleItem); + } + + sampleBuffer = PyMem_Malloc(samplesSize); + if (!sampleBuffer) { + PyErr_NoMemory(); + goto finally; + } + + sampleSizes = PyMem_Malloc(samplesLen * sizeof(size_t)); + if (!sampleSizes) { + PyErr_NoMemory(); + goto finally; + } + + sampleOffset = sampleBuffer; + for (i = 0; i < samplesLen; i++) { + PyObject* sampleItem = PyList_GET_ITEM(samples, i); + sampleSize = PyBytes_GET_SIZE(sampleItem); + sampleSizes[i] = sampleSize; + memcpy(sampleOffset, PyBytes_AS_STRING(sampleItem), sampleSize); + sampleOffset = (char*)sampleOffset + sampleSize; + } + + dict = PyMem_Malloc(capacity); + if (!dict) { + PyErr_NoMemory(); + goto finally; + } + + Py_BEGIN_ALLOW_THREADS + if (optimize && PyObject_IsTrue(optimize)) { + zresult = COVER_optimizeTrainFromBuffer(dict, capacity, + sampleBuffer, sampleSizes, (unsigned)samplesLen, ¶ms); + } + else { + zresult = COVER_trainFromBuffer(dict, capacity, + sampleBuffer, sampleSizes, (unsigned)samplesLen, params); + } + Py_END_ALLOW_THREADS + + if (ZDICT_isError(zresult)) { + PyMem_Free(dict); + PyErr_Format(ZstdError, "cannot train dict: %s", ZDICT_getErrorName(zresult)); + goto finally; + } + + result = PyObject_New(ZstdCompressionDict, &ZstdCompressionDictType); + if (!result) { + PyMem_Free(dict); + goto finally; + } + + result->dictData = dict; + result->dictSize = zresult; + result->d = params.d; + result->k = params.k; + +finally: + PyMem_Free(sampleBuffer); + PyMem_Free(sampleSizes); + + return result; +} PyDoc_STRVAR(ZstdCompressionDict__doc__, "ZstdCompressionDict(data) - Represents a computed compression dictionary\n" @@ -180,6 +316,14 @@ static PyMethodDef ZstdCompressionDict_m { NULL, NULL } }; +static PyMemberDef ZstdCompressionDict_members[] = { + { "k", T_UINT, offsetof(ZstdCompressionDict, k), READONLY, + "segment size" }, + { "d", T_UINT, offsetof(ZstdCompressionDict, d), READONLY, + "dmer size" }, + { NULL } +}; + static Py_ssize_t ZstdCompressionDict_length(ZstdCompressionDict* self) { return self->dictSize; } @@ -224,7 +368,7 @@ PyTypeObject ZstdCompressionDictType = { 0, /* tp_iter */ 0, /* tp_iternext */ ZstdCompressionDict_methods, /* tp_methods */ - 0, /* tp_members */ + ZstdCompressionDict_members, /* tp_members */ 0, /* tp_getset */ 0, /* tp_base */ 0, /* tp_dict */ diff --git a/contrib/python-zstandard/c-ext/compressionparams.c b/contrib/python-zstandard/c-ext/compressionparams.c --- a/contrib/python-zstandard/c-ext/compressionparams.c +++ b/contrib/python-zstandard/c-ext/compressionparams.c @@ -67,6 +67,8 @@ static int CompressionParameters_init(Co unsigned searchLength; unsigned targetLength; unsigned strategy; + ZSTD_compressionParameters params; + size_t zresult; if (!PyArg_ParseTupleAndKeywords(args, kwargs, "IIIIIII:CompressionParameters", kwlist, &windowLog, &chainLog, &hashLog, &searchLog, &searchLength, @@ -117,9 +119,30 @@ static int CompressionParameters_init(Co self->targetLength = targetLength; self->strategy = strategy; + ztopy_compression_parameters(self, ¶ms); + zresult = ZSTD_checkCParams(params); + + if (ZSTD_isError(zresult)) { + PyErr_Format(PyExc_ValueError, "invalid compression parameters: %s", + ZSTD_getErrorName(zresult)); + return -1; + } + return 0; } +PyDoc_STRVAR(CompressionParameters_estimated_compression_context_size__doc__, +"Estimate the size in bytes of a compression context for compression parameters\n" +); + +PyObject* CompressionParameters_estimated_compression_context_size(CompressionParametersObject* self) { + ZSTD_compressionParameters params; + + ztopy_compression_parameters(self, ¶ms); + + return PyLong_FromSize_t(ZSTD_estimateCCtxSize(params)); +} + PyObject* estimate_compression_context_size(PyObject* self, PyObject* args) { CompressionParametersObject* params; ZSTD_compressionParameters zparams; @@ -142,6 +165,16 @@ static void CompressionParameters_deallo PyObject_Del(self); } +static PyMethodDef CompressionParameters_methods[] = { + { + "estimated_compression_context_size", + (PyCFunction)CompressionParameters_estimated_compression_context_size, + METH_NOARGS, + CompressionParameters_estimated_compression_context_size__doc__ + }, + { NULL, NULL } +}; + static PyMemberDef CompressionParameters_members[] = { { "window_log", T_UINT, offsetof(CompressionParametersObject, windowLog), READONLY, @@ -195,7 +228,7 @@ PyTypeObject CompressionParametersType = 0, /* tp_weaklistoffset */ 0, /* tp_iter */ 0, /* tp_iternext */ - 0, /* tp_methods */ + CompressionParameters_methods, /* tp_methods */ CompressionParameters_members, /* tp_members */ 0, /* tp_getset */ 0, /* tp_base */ @@ -214,7 +247,7 @@ void compressionparams_module_init(PyObj return; } - Py_IncRef((PyObject*)&CompressionParametersType); + Py_INCREF(&CompressionParametersType); PyModule_AddObject(mod, "CompressionParameters", (PyObject*)&CompressionParametersType); } diff --git a/contrib/python-zstandard/c-ext/compressionwriter.c b/contrib/python-zstandard/c-ext/compressionwriter.c --- a/contrib/python-zstandard/c-ext/compressionwriter.c +++ b/contrib/python-zstandard/c-ext/compressionwriter.c @@ -18,11 +18,6 @@ static void ZstdCompressionWriter_deallo Py_XDECREF(self->compressor); Py_XDECREF(self->writer); - if (self->cstream) { - ZSTD_freeCStream(self->cstream); - self->cstream = NULL; - } - PyObject_Del(self); } @@ -32,9 +27,15 @@ static PyObject* ZstdCompressionWriter_e return NULL; } - self->cstream = CStream_from_ZstdCompressor(self->compressor, self->sourceSize); - if (!self->cstream) { - return NULL; + if (self->compressor->mtcctx) { + if (init_mtcstream(self->compressor, self->sourceSize)) { + return NULL; + } + } + else { + if (0 != init_cstream(self->compressor, self->sourceSize)) { + return NULL; + } } self->entered = 1; @@ -58,8 +59,8 @@ static PyObject* ZstdCompressionWriter_e self->entered = 0; - if (self->cstream && exc_type == Py_None && exc_value == Py_None && - exc_tb == Py_None) { + if ((self->compressor->cstream || self->compressor->mtcctx) && exc_type == Py_None + && exc_value == Py_None && exc_tb == Py_None) { output.dst = PyMem_Malloc(self->outSize); if (!output.dst) { @@ -69,7 +70,12 @@ static PyObject* ZstdCompressionWriter_e output.pos = 0; while (1) { - zresult = ZSTD_endStream(self->cstream, &output); + if (self->compressor->mtcctx) { + zresult = ZSTDMT_endStream(self->compressor->mtcctx, &output); + } + else { + zresult = ZSTD_endStream(self->compressor->cstream, &output); + } if (ZSTD_isError(zresult)) { PyErr_Format(ZstdError, "error ending compression stream: %s", ZSTD_getErrorName(zresult)); @@ -95,21 +101,19 @@ static PyObject* ZstdCompressionWriter_e } PyMem_Free(output.dst); - ZSTD_freeCStream(self->cstream); - self->cstream = NULL; } Py_RETURN_FALSE; } static PyObject* ZstdCompressionWriter_memory_size(ZstdCompressionWriter* self) { - if (!self->cstream) { + if (!self->compressor->cstream) { PyErr_SetString(ZstdError, "cannot determine size of an inactive compressor; " "call when a context manager is active"); return NULL; } - return PyLong_FromSize_t(ZSTD_sizeof_CStream(self->cstream)); + return PyLong_FromSize_t(ZSTD_sizeof_CStream(self->compressor->cstream)); } static PyObject* ZstdCompressionWriter_write(ZstdCompressionWriter* self, PyObject* args) { @@ -147,7 +151,13 @@ static PyObject* ZstdCompressionWriter_w while ((ssize_t)input.pos < sourceSize) { Py_BEGIN_ALLOW_THREADS - zresult = ZSTD_compressStream(self->cstream, &output, &input); + if (self->compressor->mtcctx) { + zresult = ZSTDMT_compressStream(self->compressor->mtcctx, + &output, &input); + } + else { + zresult = ZSTD_compressStream(self->compressor->cstream, &output, &input); + } Py_END_ALLOW_THREADS if (ZSTD_isError(zresult)) { @@ -195,7 +205,12 @@ static PyObject* ZstdCompressionWriter_f while (1) { Py_BEGIN_ALLOW_THREADS - zresult = ZSTD_flushStream(self->cstream, &output); + if (self->compressor->mtcctx) { + zresult = ZSTDMT_flushStream(self->compressor->mtcctx, &output); + } + else { + zresult = ZSTD_flushStream(self->compressor->cstream, &output); + } Py_END_ALLOW_THREADS if (ZSTD_isError(zresult)) { diff --git a/contrib/python-zstandard/c-ext/compressobj.c b/contrib/python-zstandard/c-ext/compressobj.c --- a/contrib/python-zstandard/c-ext/compressobj.c +++ b/contrib/python-zstandard/c-ext/compressobj.c @@ -18,11 +18,6 @@ static void ZstdCompressionObj_dealloc(Z PyMem_Free(self->output.dst); self->output.dst = NULL; - if (self->cstream) { - ZSTD_freeCStream(self->cstream); - self->cstream = NULL; - } - Py_XDECREF(self->compressor); PyObject_Del(self); @@ -55,7 +50,13 @@ static PyObject* ZstdCompressionObj_comp while ((ssize_t)input.pos < sourceSize) { Py_BEGIN_ALLOW_THREADS - zresult = ZSTD_compressStream(self->cstream, &self->output, &input); + if (self->compressor->mtcctx) { + zresult = ZSTDMT_compressStream(self->compressor->mtcctx, + &self->output, &input); + } + else { + zresult = ZSTD_compressStream(self->compressor->cstream, &self->output, &input); + } Py_END_ALLOW_THREADS if (ZSTD_isError(zresult)) { @@ -118,7 +119,12 @@ static PyObject* ZstdCompressionObj_flus /* The output buffer is of size ZSTD_CStreamOutSize(), which is guaranteed to hold a full block. */ Py_BEGIN_ALLOW_THREADS - zresult = ZSTD_flushStream(self->cstream, &self->output); + if (self->compressor->mtcctx) { + zresult = ZSTDMT_flushStream(self->compressor->mtcctx, &self->output); + } + else { + zresult = ZSTD_flushStream(self->compressor->cstream, &self->output); + } Py_END_ALLOW_THREADS if (ZSTD_isError(zresult)) { @@ -150,7 +156,12 @@ static PyObject* ZstdCompressionObj_flus self->finished = 1; while (1) { - zresult = ZSTD_endStream(self->cstream, &self->output); + if (self->compressor->mtcctx) { + zresult = ZSTDMT_endStream(self->compressor->mtcctx, &self->output); + } + else { + zresult = ZSTD_endStream(self->compressor->cstream, &self->output); + } if (ZSTD_isError(zresult)) { PyErr_Format(ZstdError, "error ending compression stream: %s", ZSTD_getErrorName(zresult)); @@ -182,9 +193,6 @@ static PyObject* ZstdCompressionObj_flus } } - ZSTD_freeCStream(self->cstream); - self->cstream = NULL; - if (result) { return result; } diff --git a/contrib/python-zstandard/c-ext/compressor.c b/contrib/python-zstandard/c-ext/compressor.c --- a/contrib/python-zstandard/c-ext/compressor.c +++ b/contrib/python-zstandard/c-ext/compressor.c @@ -7,12 +7,17 @@ */ #include "python-zstandard.h" +#include "pool.h" extern PyObject* ZstdError; -int populate_cdict(ZstdCompressor* compressor, void* dictData, size_t dictSize, ZSTD_parameters* zparams) { +int populate_cdict(ZstdCompressor* compressor, ZSTD_parameters* zparams) { ZSTD_customMem zmem; - assert(!compressor->cdict); + + if (compressor->cdict || !compressor->dict || !compressor->dict->dictData) { + return 0; + } + Py_BEGIN_ALLOW_THREADS memset(&zmem, 0, sizeof(zmem)); compressor->cdict = ZSTD_createCDict_advanced(compressor->dict->dictData, @@ -28,22 +33,32 @@ int populate_cdict(ZstdCompressor* compr } /** -* Initialize a zstd CStream from a ZstdCompressor instance. -* -* Returns a ZSTD_CStream on success or NULL on failure. If NULL, a Python -* exception will be set. -*/ -ZSTD_CStream* CStream_from_ZstdCompressor(ZstdCompressor* compressor, Py_ssize_t sourceSize) { - ZSTD_CStream* cstream; + * Ensure the ZSTD_CStream on a ZstdCompressor instance is initialized. + * + * Returns 0 on success. Other value on failure. Will set a Python exception + * on failure. + */ +int init_cstream(ZstdCompressor* compressor, unsigned long long sourceSize) { ZSTD_parameters zparams; void* dictData = NULL; size_t dictSize = 0; size_t zresult; - cstream = ZSTD_createCStream(); - if (!cstream) { - PyErr_SetString(ZstdError, "cannot create CStream"); - return NULL; + if (compressor->cstream) { + zresult = ZSTD_resetCStream(compressor->cstream, sourceSize); + if (ZSTD_isError(zresult)) { + PyErr_Format(ZstdError, "could not reset CStream: %s", + ZSTD_getErrorName(zresult)); + return -1; + } + + return 0; + } + + compressor->cstream = ZSTD_createCStream(); + if (!compressor->cstream) { + PyErr_SetString(ZstdError, "could not create CStream"); + return -1; } if (compressor->dict) { @@ -63,15 +78,51 @@ ZSTD_CStream* CStream_from_ZstdCompresso zparams.fParams = compressor->fparams; - zresult = ZSTD_initCStream_advanced(cstream, dictData, dictSize, zparams, sourceSize); + zresult = ZSTD_initCStream_advanced(compressor->cstream, dictData, dictSize, + zparams, sourceSize); if (ZSTD_isError(zresult)) { - ZSTD_freeCStream(cstream); + ZSTD_freeCStream(compressor->cstream); + compressor->cstream = NULL; PyErr_Format(ZstdError, "cannot init CStream: %s", ZSTD_getErrorName(zresult)); - return NULL; + return -1; } - return cstream; + return 0;; +} + +int init_mtcstream(ZstdCompressor* compressor, Py_ssize_t sourceSize) { + size_t zresult; + void* dictData = NULL; + size_t dictSize = 0; + ZSTD_parameters zparams; + + assert(compressor->mtcctx); + + if (compressor->dict) { + dictData = compressor->dict->dictData; + dictSize = compressor->dict->dictSize; + } + + memset(&zparams, 0, sizeof(zparams)); + if (compressor->cparams) { + ztopy_compression_parameters(compressor->cparams, &zparams.cParams); + } + else { + zparams.cParams = ZSTD_getCParams(compressor->compressionLevel, sourceSize, dictSize); + } + + zparams.fParams = compressor->fparams; + + zresult = ZSTDMT_initCStream_advanced(compressor->mtcctx, dictData, dictSize, + zparams, sourceSize); + + if (ZSTD_isError(zresult)) { + PyErr_Format(ZstdError, "cannot init CStream: %s", ZSTD_getErrorName(zresult)); + return -1; + } + + return 0; } PyDoc_STRVAR(ZstdCompressor__doc__, @@ -103,6 +154,11 @@ PyDoc_STRVAR(ZstdCompressor__doc__, " Determines whether the dictionary ID will be written into the compressed\n" " data. Defaults to True. Only adds content to the compressed data if\n" " a dictionary is being used.\n" +"threads\n" +" Number of threads to use to compress data concurrently. When set,\n" +" compression operations are performed on multiple threads. The default\n" +" value (0) disables multi-threaded compression. A value of ``-1`` means to\n" +" set the number of threads to the number of detected logical CPUs.\n" ); static int ZstdCompressor_init(ZstdCompressor* self, PyObject* args, PyObject* kwargs) { @@ -113,6 +169,7 @@ static int ZstdCompressor_init(ZstdCompr "write_checksum", "write_content_size", "write_dict_id", + "threads", NULL }; @@ -122,16 +179,12 @@ static int ZstdCompressor_init(ZstdCompr PyObject* writeChecksum = NULL; PyObject* writeContentSize = NULL; PyObject* writeDictID = NULL; + int threads = 0; - self->cctx = NULL; - self->dict = NULL; - self->cparams = NULL; - self->cdict = NULL; - - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|iO!O!OOO:ZstdCompressor", + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|iO!O!OOOi:ZstdCompressor", kwlist, &level, &ZstdCompressionDictType, &dict, &CompressionParametersType, ¶ms, - &writeChecksum, &writeContentSize, &writeDictID)) { + &writeChecksum, &writeContentSize, &writeDictID, &threads)) { return -1; } @@ -146,12 +199,27 @@ static int ZstdCompressor_init(ZstdCompr return -1; } + if (threads < 0) { + threads = cpu_count(); + } + + self->threads = threads; + /* We create a ZSTD_CCtx for reuse among multiple operations to reduce the overhead of each compression operation. */ - self->cctx = ZSTD_createCCtx(); - if (!self->cctx) { - PyErr_NoMemory(); - return -1; + if (threads) { + self->mtcctx = ZSTDMT_createCCtx(threads); + if (!self->mtcctx) { + PyErr_NoMemory(); + return -1; + } + } + else { + self->cctx = ZSTD_createCCtx(); + if (!self->cctx) { + PyErr_NoMemory(); + return -1; + } } self->compressionLevel = level; @@ -182,6 +250,11 @@ static int ZstdCompressor_init(ZstdCompr } static void ZstdCompressor_dealloc(ZstdCompressor* self) { + if (self->cstream) { + ZSTD_freeCStream(self->cstream); + self->cstream = NULL; + } + Py_XDECREF(self->cparams); Py_XDECREF(self->dict); @@ -195,6 +268,11 @@ static void ZstdCompressor_dealloc(ZstdC self->cctx = NULL; } + if (self->mtcctx) { + ZSTDMT_freeCCtx(self->mtcctx); + self->mtcctx = NULL; + } + PyObject_Del(self); } @@ -229,7 +307,6 @@ static PyObject* ZstdCompressor_copy_str Py_ssize_t sourceSize = 0; size_t inSize = ZSTD_CStreamInSize(); size_t outSize = ZSTD_CStreamOutSize(); - ZSTD_CStream* cstream; ZSTD_inBuffer input; ZSTD_outBuffer output; Py_ssize_t totalRead = 0; @@ -261,10 +338,17 @@ static PyObject* ZstdCompressor_copy_str /* Prevent free on uninitialized memory in finally. */ output.dst = NULL; - cstream = CStream_from_ZstdCompressor(self, sourceSize); - if (!cstream) { - res = NULL; - goto finally; + if (self->mtcctx) { + if (init_mtcstream(self, sourceSize)) { + res = NULL; + goto finally; + } + } + else { + if (0 != init_cstream(self, sourceSize)) { + res = NULL; + goto finally; + } } output.dst = PyMem_Malloc(outSize); @@ -300,7 +384,12 @@ static PyObject* ZstdCompressor_copy_str while (input.pos < input.size) { Py_BEGIN_ALLOW_THREADS - zresult = ZSTD_compressStream(cstream, &output, &input); + if (self->mtcctx) { + zresult = ZSTDMT_compressStream(self->mtcctx, &output, &input); + } + else { + zresult = ZSTD_compressStream(self->cstream, &output, &input); + } Py_END_ALLOW_THREADS if (ZSTD_isError(zresult)) { @@ -325,7 +414,12 @@ static PyObject* ZstdCompressor_copy_str /* We've finished reading. Now flush the compressor stream. */ while (1) { - zresult = ZSTD_endStream(cstream, &output); + if (self->mtcctx) { + zresult = ZSTDMT_endStream(self->mtcctx, &output); + } + else { + zresult = ZSTD_endStream(self->cstream, &output); + } if (ZSTD_isError(zresult)) { PyErr_Format(ZstdError, "error ending compression stream: %s", ZSTD_getErrorName(zresult)); @@ -350,24 +444,17 @@ static PyObject* ZstdCompressor_copy_str } } - ZSTD_freeCStream(cstream); - cstream = NULL; - totalReadPy = PyLong_FromSsize_t(totalRead); totalWritePy = PyLong_FromSsize_t(totalWrite); res = PyTuple_Pack(2, totalReadPy, totalWritePy); - Py_DecRef(totalReadPy); - Py_DecRef(totalWritePy); + Py_DECREF(totalReadPy); + Py_DECREF(totalWritePy); finally: if (output.dst) { PyMem_Free(output.dst); } - if (cstream) { - ZSTD_freeCStream(cstream); - } - return res; } @@ -410,6 +497,18 @@ static PyObject* ZstdCompressor_compress return NULL; } + if (self->threads && self->dict) { + PyErr_SetString(ZstdError, + "compress() cannot be used with both dictionaries and multi-threaded compression"); + return NULL; + } + + if (self->threads && self->cparams) { + PyErr_SetString(ZstdError, + "compress() cannot be used with both compression parameters and multi-threaded compression"); + return NULL; + } + /* Limitation in zstd C API doesn't let decompression side distinguish between content size of 0 and unknown content size. This can make round tripping via Python difficult. Until this is fixed, require a flag @@ -456,24 +555,28 @@ static PyObject* ZstdCompressor_compress https://github.com/facebook/zstd/issues/358 contains more info. We could potentially add an argument somewhere to control this behavior. */ - if (dictData && !self->cdict) { - if (populate_cdict(self, dictData, dictSize, &zparams)) { - Py_DECREF(output); - return NULL; - } + if (0 != populate_cdict(self, &zparams)) { + Py_DECREF(output); + return NULL; } Py_BEGIN_ALLOW_THREADS - /* By avoiding ZSTD_compress(), we don't necessarily write out content - size. This means the argument to ZstdCompressor to control frame - parameters is honored. */ - if (self->cdict) { - zresult = ZSTD_compress_usingCDict(self->cctx, dest, destSize, - source, sourceSize, self->cdict); + if (self->mtcctx) { + zresult = ZSTDMT_compressCCtx(self->mtcctx, dest, destSize, + source, sourceSize, self->compressionLevel); } else { - zresult = ZSTD_compress_advanced(self->cctx, dest, destSize, - source, sourceSize, dictData, dictSize, zparams); + /* By avoiding ZSTD_compress(), we don't necessarily write out content + size. This means the argument to ZstdCompressor to control frame + parameters is honored. */ + if (self->cdict) { + zresult = ZSTD_compress_usingCDict(self->cctx, dest, destSize, + source, sourceSize, self->cdict); + } + else { + zresult = ZSTD_compress_advanced(self->cctx, dest, destSize, + source, sourceSize, dictData, dictSize, zparams); + } } Py_END_ALLOW_THREADS @@ -507,21 +610,30 @@ static ZstdCompressionObj* ZstdCompresso Py_ssize_t inSize = 0; size_t outSize = ZSTD_CStreamOutSize(); - ZstdCompressionObj* result = PyObject_New(ZstdCompressionObj, &ZstdCompressionObjType); - if (!result) { - return NULL; - } + ZstdCompressionObj* result = NULL; if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|n:compressobj", kwlist, &inSize)) { return NULL; } - result->cstream = CStream_from_ZstdCompressor(self, inSize); - if (!result->cstream) { - Py_DECREF(result); + result = (ZstdCompressionObj*)PyObject_CallObject((PyObject*)&ZstdCompressionObjType, NULL); + if (!result) { return NULL; } + if (self->mtcctx) { + if (init_mtcstream(self, inSize)) { + Py_DECREF(result); + return NULL; + } + } + else { + if (0 != init_cstream(self, inSize)) { + Py_DECREF(result); + return NULL; + } + } + result->output.dst = PyMem_Malloc(outSize); if (!result->output.dst) { PyErr_NoMemory(); @@ -529,13 +641,9 @@ static ZstdCompressionObj* ZstdCompresso return NULL; } result->output.size = outSize; - result->output.pos = 0; - result->compressor = self; Py_INCREF(result->compressor); - result->finished = 0; - return result; } @@ -579,19 +687,10 @@ static ZstdCompressorIterator* ZstdCompr return NULL; } - result = PyObject_New(ZstdCompressorIterator, &ZstdCompressorIteratorType); + result = (ZstdCompressorIterator*)PyObject_CallObject((PyObject*)&ZstdCompressorIteratorType, NULL); if (!result) { return NULL; } - - result->compressor = NULL; - result->reader = NULL; - result->buffer = NULL; - result->cstream = NULL; - result->input.src = NULL; - result->output.dst = NULL; - result->readResult = NULL; - if (PyObject_HasAttrString(reader, "read")) { result->reader = reader; Py_INCREF(result->reader); @@ -608,7 +707,6 @@ static ZstdCompressorIterator* ZstdCompr goto except; } - result->bufferOffset = 0; sourceSize = result->buffer->len; } else { @@ -621,9 +719,16 @@ static ZstdCompressorIterator* ZstdCompr Py_INCREF(result->compressor); result->sourceSize = sourceSize; - result->cstream = CStream_from_ZstdCompressor(self, sourceSize); - if (!result->cstream) { - goto except; + + if (self->mtcctx) { + if (init_mtcstream(self, sourceSize)) { + goto except; + } + } + else { + if (0 != init_cstream(self, sourceSize)) { + goto except; + } } result->inSize = inSize; @@ -635,26 +740,12 @@ static ZstdCompressorIterator* ZstdCompr goto except; } result->output.size = outSize; - result->output.pos = 0; - - result->input.src = NULL; - result->input.size = 0; - result->input.pos = 0; - - result->finishedInput = 0; - result->finishedOutput = 0; goto finally; except: - if (result->cstream) { - ZSTD_freeCStream(result->cstream); - result->cstream = NULL; - } - - Py_DecRef((PyObject*)result->compressor); - Py_DecRef(result->reader); - + Py_XDECREF(result->compressor); + Py_XDECREF(result->reader); Py_DECREF(result); result = NULL; @@ -703,7 +794,7 @@ static ZstdCompressionWriter* ZstdCompre return NULL; } - result = PyObject_New(ZstdCompressionWriter, &ZstdCompressionWriterType); + result = (ZstdCompressionWriter*)PyObject_CallObject((PyObject*)&ZstdCompressionWriterType, NULL); if (!result) { return NULL; } @@ -715,11 +806,671 @@ static ZstdCompressionWriter* ZstdCompre Py_INCREF(result->writer); result->sourceSize = sourceSize; - result->outSize = outSize; - result->entered = 0; - result->cstream = NULL; + return result; +} + +typedef struct { + void* sourceData; + size_t sourceSize; +} DataSource; + +typedef struct { + DataSource* sources; + Py_ssize_t sourcesSize; + unsigned long long totalSourceSize; +} DataSources; + +typedef struct { + void* dest; + Py_ssize_t destSize; + BufferSegment* segments; + Py_ssize_t segmentsSize; +} DestBuffer; + +typedef enum { + WorkerError_none = 0, + WorkerError_zstd = 1, + WorkerError_no_memory = 2, +} WorkerError; + +/** + * Holds state for an individual worker performing multi_compress_to_buffer work. + */ +typedef struct { + /* Used for compression. */ + ZSTD_CCtx* cctx; + ZSTD_CDict* cdict; + int cLevel; + CompressionParametersObject* cParams; + ZSTD_frameParameters fParams; + + /* What to compress. */ + DataSource* sources; + Py_ssize_t sourcesSize; + Py_ssize_t startOffset; + Py_ssize_t endOffset; + unsigned long long totalSourceSize; + + /* Result storage. */ + DestBuffer* destBuffers; + Py_ssize_t destCount; + + /* Error tracking. */ + WorkerError error; + size_t zresult; + Py_ssize_t errorOffset; +} WorkerState; + +static void compress_worker(WorkerState* state) { + Py_ssize_t inputOffset = state->startOffset; + Py_ssize_t remainingItems = state->endOffset - state->startOffset + 1; + Py_ssize_t currentBufferStartOffset = state->startOffset; + size_t zresult; + ZSTD_parameters zparams; + void* newDest; + size_t allocationSize; + size_t boundSize; + Py_ssize_t destOffset = 0; + DataSource* sources = state->sources; + DestBuffer* destBuffer; + + assert(!state->destBuffers); + assert(0 == state->destCount); + + if (state->cParams) { + ztopy_compression_parameters(state->cParams, &zparams.cParams); + } + + zparams.fParams = state->fParams; + + /* + * The total size of the compressed data is unknown until we actually + * compress data. That means we can't pre-allocate the exact size we need. + * + * There is a cost to every allocation and reallocation. So, it is in our + * interest to minimize the number of allocations. + * + * There is also a cost to too few allocations. If allocations are too + * large they may fail. If buffers are shared and all inputs become + * irrelevant at different lifetimes, then a reference to one segment + * in the buffer will keep the entire buffer alive. This leads to excessive + * memory usage. + * + * Our current strategy is to assume a compression ratio of 16:1 and + * allocate buffers of that size, rounded up to the nearest power of 2 + * (because computers like round numbers). That ratio is greater than what + * most inputs achieve. This is by design: we don't want to over-allocate. + * But we don't want to under-allocate and lead to too many buffers either. + */ + + state->destCount = 1; + + state->destBuffers = calloc(1, sizeof(DestBuffer)); + if (NULL == state->destBuffers) { + state->error = WorkerError_no_memory; + return; + } + + destBuffer = &state->destBuffers[state->destCount - 1]; + + /* + * Rather than track bounds and grow the segments buffer, allocate space + * to hold remaining items then truncate when we're done with it. + */ + destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment)); + if (NULL == destBuffer->segments) { + state->error = WorkerError_no_memory; + return; + } + + destBuffer->segmentsSize = remainingItems; + + allocationSize = roundpow2(state->totalSourceSize >> 4); + + /* If the maximum size of the output is larger than that, round up. */ + boundSize = ZSTD_compressBound(sources[inputOffset].sourceSize); + + if (boundSize > allocationSize) { + allocationSize = roundpow2(boundSize); + } + + destBuffer->dest = malloc(allocationSize); + if (NULL == destBuffer->dest) { + state->error = WorkerError_no_memory; + return; + } + + destBuffer->destSize = allocationSize; + + for (inputOffset = state->startOffset; inputOffset <= state->endOffset; inputOffset++) { + void* source = sources[inputOffset].sourceData; + size_t sourceSize = sources[inputOffset].sourceSize; + size_t destAvailable; + void* dest; + + destAvailable = destBuffer->destSize - destOffset; + boundSize = ZSTD_compressBound(sourceSize); + + /* + * Not enough space in current buffer to hold largest compressed output. + * So allocate and switch to a new output buffer. + */ + if (boundSize > destAvailable) { + /* + * The downsizing of the existing buffer is optional. It should be cheap + * (unlike growing). So we just do it. + */ + if (destAvailable) { + newDest = realloc(destBuffer->dest, destOffset); + if (NULL == newDest) { + state->error = WorkerError_no_memory; + return; + } + + destBuffer->dest = newDest; + destBuffer->destSize = destOffset; + } + + /* Truncate segments buffer. */ + newDest = realloc(destBuffer->segments, + (inputOffset - currentBufferStartOffset + 1) * sizeof(BufferSegment)); + if (NULL == newDest) { + state->error = WorkerError_no_memory; + return; + } + + destBuffer->segments = newDest; + destBuffer->segmentsSize = inputOffset - currentBufferStartOffset; + + /* Grow space for new struct. */ + /* TODO consider over-allocating so we don't do this every time. */ + newDest = realloc(state->destBuffers, (state->destCount + 1) * sizeof(DestBuffer)); + if (NULL == newDest) { + state->error = WorkerError_no_memory; + return; + } + + state->destBuffers = newDest; + state->destCount++; + + destBuffer = &state->destBuffers[state->destCount - 1]; + + /* Don't take any chances with non-NULL pointers. */ + memset(destBuffer, 0, sizeof(DestBuffer)); + + /** + * We could dynamically update allocation size based on work done so far. + * For now, keep is simple. + */ + allocationSize = roundpow2(state->totalSourceSize >> 4); + + if (boundSize > allocationSize) { + allocationSize = roundpow2(boundSize); + } + + destBuffer->dest = malloc(allocationSize); + if (NULL == destBuffer->dest) { + state->error = WorkerError_no_memory; + return; + } + + destBuffer->destSize = allocationSize; + destAvailable = allocationSize; + destOffset = 0; + + destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment)); + if (NULL == destBuffer->segments) { + state->error = WorkerError_no_memory; + return; + } + + destBuffer->segmentsSize = remainingItems; + currentBufferStartOffset = inputOffset; + } + + dest = (char*)destBuffer->dest + destOffset; + + if (state->cdict) { + zresult = ZSTD_compress_usingCDict(state->cctx, dest, destAvailable, + source, sourceSize, state->cdict); + } + else { + if (!state->cParams) { + zparams.cParams = ZSTD_getCParams(state->cLevel, sourceSize, 0); + } + + zresult = ZSTD_compress_advanced(state->cctx, dest, destAvailable, + source, sourceSize, NULL, 0, zparams); + } + + if (ZSTD_isError(zresult)) { + state->error = WorkerError_zstd; + state->zresult = zresult; + state->errorOffset = inputOffset; + break; + } + + destBuffer->segments[inputOffset - currentBufferStartOffset].offset = destOffset; + destBuffer->segments[inputOffset - currentBufferStartOffset].length = zresult; + + destOffset += zresult; + remainingItems--; + } + + if (destBuffer->destSize > destOffset) { + newDest = realloc(destBuffer->dest, destOffset); + if (NULL == newDest) { + state->error = WorkerError_no_memory; + return; + } + + destBuffer->dest = newDest; + destBuffer->destSize = destOffset; + } +} + +ZstdBufferWithSegmentsCollection* compress_from_datasources(ZstdCompressor* compressor, + DataSources* sources, unsigned int threadCount) { + ZSTD_parameters zparams; + unsigned long long bytesPerWorker; + POOL_ctx* pool = NULL; + WorkerState* workerStates = NULL; + Py_ssize_t i; + unsigned long long workerBytes = 0; + Py_ssize_t workerStartOffset = 0; + size_t currentThread = 0; + int errored = 0; + Py_ssize_t segmentsCount = 0; + Py_ssize_t segmentIndex; + PyObject* segmentsArg = NULL; + ZstdBufferWithSegments* buffer; + ZstdBufferWithSegmentsCollection* result = NULL; + + assert(sources->sourcesSize > 0); + assert(sources->totalSourceSize > 0); + assert(threadCount >= 1); + + /* More threads than inputs makes no sense. */ + threadCount = sources->sourcesSize < threadCount ? (unsigned int)sources->sourcesSize + : threadCount; + + /* TODO lower thread count when input size is too small and threads would add + overhead. */ + + /* + * When dictionaries are used, parameters are derived from the size of the + * first element. + * + * TODO come up with a better mechanism. + */ + memset(&zparams, 0, sizeof(zparams)); + if (compressor->cparams) { + ztopy_compression_parameters(compressor->cparams, &zparams.cParams); + } + else { + zparams.cParams = ZSTD_getCParams(compressor->compressionLevel, + sources->sources[0].sourceSize, + compressor->dict ? compressor->dict->dictSize : 0); + } + + zparams.fParams = compressor->fparams; + + if (0 != populate_cdict(compressor, &zparams)) { + return NULL; + } + + workerStates = PyMem_Malloc(threadCount * sizeof(WorkerState)); + if (NULL == workerStates) { + PyErr_NoMemory(); + goto finally; + } + + memset(workerStates, 0, threadCount * sizeof(WorkerState)); + + if (threadCount > 1) { + pool = POOL_create(threadCount, 1); + if (NULL == pool) { + PyErr_SetString(ZstdError, "could not initialize zstd thread pool"); + goto finally; + } + } + + bytesPerWorker = sources->totalSourceSize / threadCount; + + for (i = 0; i < threadCount; i++) { + workerStates[i].cctx = ZSTD_createCCtx(); + if (!workerStates[i].cctx) { + PyErr_NoMemory(); + goto finally; + } + + workerStates[i].cdict = compressor->cdict; + workerStates[i].cLevel = compressor->compressionLevel; + workerStates[i].cParams = compressor->cparams; + workerStates[i].fParams = compressor->fparams; + + workerStates[i].sources = sources->sources; + workerStates[i].sourcesSize = sources->sourcesSize; + } + + Py_BEGIN_ALLOW_THREADS + for (i = 0; i < sources->sourcesSize; i++) { + workerBytes += sources->sources[i].sourceSize; + + /* + * The last worker/thread needs to handle all remaining work. Don't + * trigger it prematurely. Defer to the block outside of the loop + * to run the last worker/thread. But do still process this loop + * so workerBytes is correct. + */ + if (currentThread == threadCount - 1) { + continue; + } + + if (workerBytes >= bytesPerWorker) { + assert(currentThread < threadCount); + workerStates[currentThread].totalSourceSize = workerBytes; + workerStates[currentThread].startOffset = workerStartOffset; + workerStates[currentThread].endOffset = i; + + if (threadCount > 1) { + POOL_add(pool, (POOL_function)compress_worker, &workerStates[currentThread]); + } + else { + compress_worker(&workerStates[currentThread]); + } + + currentThread++; + workerStartOffset = i + 1; + workerBytes = 0; + } + } + + if (workerBytes) { + assert(currentThread < threadCount); + workerStates[currentThread].totalSourceSize = workerBytes; + workerStates[currentThread].startOffset = workerStartOffset; + workerStates[currentThread].endOffset = sources->sourcesSize - 1; + + if (threadCount > 1) { + POOL_add(pool, (POOL_function)compress_worker, &workerStates[currentThread]); + } + else { + compress_worker(&workerStates[currentThread]); + } + } + + if (threadCount > 1) { + POOL_free(pool); + pool = NULL; + } + + Py_END_ALLOW_THREADS + + for (i = 0; i < threadCount; i++) { + switch (workerStates[i].error) { + case WorkerError_no_memory: + PyErr_NoMemory(); + errored = 1; + break; + + case WorkerError_zstd: + PyErr_Format(ZstdError, "error compressing item %zd: %s", + workerStates[i].errorOffset, ZSTD_getErrorName(workerStates[i].zresult)); + errored = 1; + break; + default: + ; + } + + if (errored) { + break; + } + + } + + if (errored) { + goto finally; + } + + segmentsCount = 0; + for (i = 0; i < threadCount; i++) { + WorkerState* state = &workerStates[i]; + segmentsCount += state->destCount; + } + + segmentsArg = PyTuple_New(segmentsCount); + if (NULL == segmentsArg) { + goto finally; + } + + segmentIndex = 0; + + for (i = 0; i < threadCount; i++) { + Py_ssize_t j; + WorkerState* state = &workerStates[i]; + + for (j = 0; j < state->destCount; j++) { + DestBuffer* destBuffer = &state->destBuffers[j]; + buffer = BufferWithSegments_FromMemory(destBuffer->dest, destBuffer->destSize, + destBuffer->segments, destBuffer->segmentsSize); + + if (NULL == buffer) { + goto finally; + } + + /* Tell instance to use free() instsead of PyMem_Free(). */ + buffer->useFree = 1; + + /* + * BufferWithSegments_FromMemory takes ownership of the backing memory. + * Unset it here so it doesn't get freed below. + */ + destBuffer->dest = NULL; + destBuffer->segments = NULL; + + PyTuple_SET_ITEM(segmentsArg, segmentIndex++, (PyObject*)buffer); + } + } + + result = (ZstdBufferWithSegmentsCollection*)PyObject_CallObject( + (PyObject*)&ZstdBufferWithSegmentsCollectionType, segmentsArg); + +finally: + Py_CLEAR(segmentsArg); + + if (pool) { + POOL_free(pool); + } + + if (workerStates) { + Py_ssize_t j; + + for (i = 0; i < threadCount; i++) { + WorkerState state = workerStates[i]; + + if (state.cctx) { + ZSTD_freeCCtx(state.cctx); + } + + /* malloc() is used in worker thread. */ + + for (j = 0; j < state.destCount; j++) { + if (state.destBuffers) { + free(state.destBuffers[j].dest); + free(state.destBuffers[j].segments); + } + } + + + free(state.destBuffers); + } + + PyMem_Free(workerStates); + } + + return result; +} + +PyDoc_STRVAR(ZstdCompressor_multi_compress_to_buffer__doc__, +"Compress multiple pieces of data as a single operation\n" +"\n" +"Receives a ``BufferWithSegmentsCollection``, a ``BufferWithSegments``, or\n" +"a list of bytes like objects holding data to compress.\n" +"\n" +"Returns a ``BufferWithSegmentsCollection`` holding compressed data.\n" +"\n" +"This function is optimized to perform multiple compression operations as\n" +"as possible with as little overhead as possbile.\n" +); + +static ZstdBufferWithSegmentsCollection* ZstdCompressor_multi_compress_to_buffer(ZstdCompressor* self, PyObject* args, PyObject* kwargs) { + static char* kwlist[] = { + "data", + "threads", + NULL + }; + + PyObject* data; + int threads = 0; + Py_buffer* dataBuffers = NULL; + DataSources sources; + Py_ssize_t i; + Py_ssize_t sourceCount = 0; + ZstdBufferWithSegmentsCollection* result = NULL; + + if (self->mtcctx) { + PyErr_SetString(ZstdError, + "function cannot be called on ZstdCompressor configured for multi-threaded compression"); + return NULL; + } + + memset(&sources, 0, sizeof(sources)); + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|i:multi_compress_to_buffer", kwlist, + &data, &threads)) { + return NULL; + } + + if (threads < 0) { + threads = cpu_count(); + } + + if (threads < 2) { + threads = 1; + } + + if (PyObject_TypeCheck(data, &ZstdBufferWithSegmentsType)) { + ZstdBufferWithSegments* buffer = (ZstdBufferWithSegments*)data; + + sources.sources = PyMem_Malloc(buffer->segmentCount * sizeof(DataSource)); + if (NULL == sources.sources) { + PyErr_NoMemory(); + goto finally; + } + + for (i = 0; i < buffer->segmentCount; i++) { + sources.sources[i].sourceData = (char*)buffer->data + buffer->segments[i].offset; + sources.sources[i].sourceSize = buffer->segments[i].length; + sources.totalSourceSize += buffer->segments[i].length; + } + + sources.sourcesSize = buffer->segmentCount; + } + else if (PyObject_TypeCheck(data, &ZstdBufferWithSegmentsCollectionType)) { + Py_ssize_t j; + Py_ssize_t offset = 0; + ZstdBufferWithSegments* buffer; + ZstdBufferWithSegmentsCollection* collection = (ZstdBufferWithSegmentsCollection*)data; + + sourceCount = BufferWithSegmentsCollection_length(collection); + + sources.sources = PyMem_Malloc(sourceCount * sizeof(DataSource)); + if (NULL == sources.sources) { + PyErr_NoMemory(); + goto finally; + } + + for (i = 0; i < collection->bufferCount; i++) { + buffer = collection->buffers[i]; + + for (j = 0; j < buffer->segmentCount; j++) { + sources.sources[offset].sourceData = (char*)buffer->data + buffer->segments[j].offset; + sources.sources[offset].sourceSize = buffer->segments[j].length; + sources.totalSourceSize += buffer->segments[j].length; + + offset++; + } + } + + sources.sourcesSize = sourceCount; + } + else if (PyList_Check(data)) { + sourceCount = PyList_GET_SIZE(data); + + sources.sources = PyMem_Malloc(sourceCount * sizeof(DataSource)); + if (NULL == sources.sources) { + PyErr_NoMemory(); + goto finally; + } + + /* + * It isn't clear whether the address referred to by Py_buffer.buf + * is still valid after PyBuffer_Release. We we hold a reference to all + * Py_buffer instances for the duration of the operation. + */ + dataBuffers = PyMem_Malloc(sourceCount * sizeof(Py_buffer)); + if (NULL == dataBuffers) { + PyErr_NoMemory(); + goto finally; + } + + memset(dataBuffers, 0, sourceCount * sizeof(Py_buffer)); + + for (i = 0; i < sourceCount; i++) { + if (0 != PyObject_GetBuffer(PyList_GET_ITEM(data, i), + &dataBuffers[i], PyBUF_CONTIG_RO)) { + PyErr_Clear(); + PyErr_Format(PyExc_TypeError, "item %zd not a bytes like object", i); + goto finally; + } + + sources.sources[i].sourceData = dataBuffers[i].buf; + sources.sources[i].sourceSize = dataBuffers[i].len; + sources.totalSourceSize += dataBuffers[i].len; + } + + sources.sourcesSize = sourceCount; + } + else { + PyErr_SetString(PyExc_TypeError, "argument must be list of BufferWithSegments"); + goto finally; + } + + if (0 == sources.sourcesSize) { + PyErr_SetString(PyExc_ValueError, "no source elements found"); + goto finally; + } + + if (0 == sources.totalSourceSize) { + PyErr_SetString(PyExc_ValueError, "source elements are empty"); + goto finally; + } + + result = compress_from_datasources(self, &sources, threads); + +finally: + PyMem_Free(sources.sources); + + if (dataBuffers) { + for (i = 0; i < sourceCount; i++) { + PyBuffer_Release(&dataBuffers[i]); + } + + PyMem_Free(dataBuffers); + } return result; } @@ -735,6 +1486,8 @@ static PyMethodDef ZstdCompressor_method METH_VARARGS | METH_KEYWORDS, ZstdCompressor_read_from__doc__ }, { "write_to", (PyCFunction)ZstdCompressor_write_to, METH_VARARGS | METH_KEYWORDS, ZstdCompressor_write_to___doc__ }, + { "multi_compress_to_buffer", (PyCFunction)ZstdCompressor_multi_compress_to_buffer, + METH_VARARGS | METH_KEYWORDS, ZstdCompressor_multi_compress_to_buffer__doc__ }, { NULL, NULL } }; diff --git a/contrib/python-zstandard/c-ext/compressoriterator.c b/contrib/python-zstandard/c-ext/compressoriterator.c --- a/contrib/python-zstandard/c-ext/compressoriterator.c +++ b/contrib/python-zstandard/c-ext/compressoriterator.c @@ -27,11 +27,6 @@ static void ZstdCompressorIterator_deall self->buffer = NULL; } - if (self->cstream) { - ZSTD_freeCStream(self->cstream); - self->cstream = NULL; - } - if (self->output.dst) { PyMem_Free(self->output.dst); self->output.dst = NULL; @@ -63,7 +58,14 @@ feedcompressor: /* If we have data left in the input, consume it. */ if (self->input.pos < self->input.size) { Py_BEGIN_ALLOW_THREADS - zresult = ZSTD_compressStream(self->cstream, &self->output, &self->input); + if (self->compressor->mtcctx) { + zresult = ZSTDMT_compressStream(self->compressor->mtcctx, + &self->output, &self->input); + } + else { + zresult = ZSTD_compressStream(self->compressor->cstream, &self->output, + &self->input); + } Py_END_ALLOW_THREADS /* Release the Python object holding the input buffer. */ @@ -128,7 +130,12 @@ feedcompressor: /* EOF */ if (0 == readSize) { - zresult = ZSTD_endStream(self->cstream, &self->output); + if (self->compressor->mtcctx) { + zresult = ZSTDMT_endStream(self->compressor->mtcctx, &self->output); + } + else { + zresult = ZSTD_endStream(self->compressor->cstream, &self->output); + } if (ZSTD_isError(zresult)) { PyErr_Format(ZstdError, "error ending compression stream: %s", ZSTD_getErrorName(zresult)); @@ -152,7 +159,13 @@ feedcompressor: self->input.pos = 0; Py_BEGIN_ALLOW_THREADS - zresult = ZSTD_compressStream(self->cstream, &self->output, &self->input); + if (self->compressor->mtcctx) { + zresult = ZSTDMT_compressStream(self->compressor->mtcctx, &self->output, + &self->input); + } + else { + zresult = ZSTD_compressStream(self->compressor->cstream, &self->output, &self->input); + } Py_END_ALLOW_THREADS /* The input buffer currently points to memory managed by Python diff --git a/contrib/python-zstandard/c-ext/constants.c b/contrib/python-zstandard/c-ext/constants.c --- a/contrib/python-zstandard/c-ext/constants.c +++ b/contrib/python-zstandard/c-ext/constants.c @@ -41,7 +41,7 @@ void constants_module_init(PyObject* mod PyTuple_SetItem(zstdVersion, 0, PyLong_FromLong(ZSTD_VERSION_MAJOR)); PyTuple_SetItem(zstdVersion, 1, PyLong_FromLong(ZSTD_VERSION_MINOR)); PyTuple_SetItem(zstdVersion, 2, PyLong_FromLong(ZSTD_VERSION_RELEASE)); - Py_IncRef(zstdVersion); + Py_INCREF(zstdVersion); PyModule_AddObject(mod, "ZSTD_VERSION", zstdVersion); frameHeader = PyBytes_FromStringAndSize(frame_header, sizeof(frame_header)); diff --git a/contrib/python-zstandard/c-ext/decompressionwriter.c b/contrib/python-zstandard/c-ext/decompressionwriter.c --- a/contrib/python-zstandard/c-ext/decompressionwriter.c +++ b/contrib/python-zstandard/c-ext/decompressionwriter.c @@ -18,11 +18,6 @@ static void ZstdDecompressionWriter_deal Py_XDECREF(self->decompressor); Py_XDECREF(self->writer); - if (self->dstream) { - ZSTD_freeDStream(self->dstream); - self->dstream = NULL; - } - PyObject_Del(self); } @@ -32,8 +27,7 @@ static PyObject* ZstdDecompressionWriter return NULL; } - self->dstream = DStream_from_ZstdDecompressor(self->decompressor); - if (!self->dstream) { + if (0 != init_dstream(self->decompressor)) { return NULL; } @@ -46,22 +40,17 @@ static PyObject* ZstdDecompressionWriter static PyObject* ZstdDecompressionWriter_exit(ZstdDecompressionWriter* self, PyObject* args) { self->entered = 0; - if (self->dstream) { - ZSTD_freeDStream(self->dstream); - self->dstream = NULL; - } - Py_RETURN_FALSE; } static PyObject* ZstdDecompressionWriter_memory_size(ZstdDecompressionWriter* self) { - if (!self->dstream) { + if (!self->decompressor->dstream) { PyErr_SetString(ZstdError, "cannot determine size of inactive decompressor; " "call when context manager is active"); return NULL; } - return PyLong_FromSize_t(ZSTD_sizeof_DStream(self->dstream)); + return PyLong_FromSize_t(ZSTD_sizeof_DStream(self->decompressor->dstream)); } static PyObject* ZstdDecompressionWriter_write(ZstdDecompressionWriter* self, PyObject* args) { @@ -86,6 +75,8 @@ static PyObject* ZstdDecompressionWriter return NULL; } + assert(self->decompressor->dstream); + output.dst = PyMem_Malloc(self->outSize); if (!output.dst) { return PyErr_NoMemory(); @@ -99,7 +90,7 @@ static PyObject* ZstdDecompressionWriter while ((ssize_t)input.pos < sourceSize) { Py_BEGIN_ALLOW_THREADS - zresult = ZSTD_decompressStream(self->dstream, &output, &input); + zresult = ZSTD_decompressStream(self->decompressor->dstream, &output, &input); Py_END_ALLOW_THREADS if (ZSTD_isError(zresult)) { diff --git a/contrib/python-zstandard/c-ext/decompressobj.c b/contrib/python-zstandard/c-ext/decompressobj.c --- a/contrib/python-zstandard/c-ext/decompressobj.c +++ b/contrib/python-zstandard/c-ext/decompressobj.c @@ -15,11 +15,6 @@ PyDoc_STRVAR(DecompressionObj__doc__, ); static void DecompressionObj_dealloc(ZstdDecompressionObj* self) { - if (self->dstream) { - ZSTD_freeDStream(self->dstream); - self->dstream = NULL; - } - Py_XDECREF(self->decompressor); PyObject_Del(self); @@ -35,6 +30,9 @@ static PyObject* DecompressionObj_decomp PyObject* result = NULL; Py_ssize_t resultSize = 0; + /* Constructor should ensure stream is populated. */ + assert(self->decompressor->dstream); + if (self->finished) { PyErr_SetString(ZstdError, "cannot use a decompressobj multiple times"); return NULL; @@ -64,7 +62,7 @@ static PyObject* DecompressionObj_decomp /* Read input until exhausted. */ while (input.pos < input.size) { Py_BEGIN_ALLOW_THREADS - zresult = ZSTD_decompressStream(self->dstream, &output, &input); + zresult = ZSTD_decompressStream(self->decompressor->dstream, &output, &input); Py_END_ALLOW_THREADS if (ZSTD_isError(zresult)) { @@ -106,8 +104,7 @@ static PyObject* DecompressionObj_decomp goto finally; except: - Py_DecRef(result); - result = NULL; + Py_CLEAR(result); finally: PyMem_Free(output.dst); diff --git a/contrib/python-zstandard/c-ext/decompressor.c b/contrib/python-zstandard/c-ext/decompressor.c --- a/contrib/python-zstandard/c-ext/decompressor.c +++ b/contrib/python-zstandard/c-ext/decompressor.c @@ -7,19 +7,37 @@ */ #include "python-zstandard.h" +#include "pool.h" extern PyObject* ZstdError; -ZSTD_DStream* DStream_from_ZstdDecompressor(ZstdDecompressor* decompressor) { - ZSTD_DStream* dstream; +/** + * Ensure the ZSTD_DStream on a ZstdDecompressor is initialized and reset. + * + * This should be called before starting a decompression operation with a + * ZSTD_DStream on a ZstdDecompressor. + */ +int init_dstream(ZstdDecompressor* decompressor) { void* dictData = NULL; size_t dictSize = 0; size_t zresult; - dstream = ZSTD_createDStream(); - if (!dstream) { + /* Simple case of dstream already exists. Just reset it. */ + if (decompressor->dstream) { + zresult = ZSTD_resetDStream(decompressor->dstream); + if (ZSTD_isError(zresult)) { + PyErr_Format(ZstdError, "could not reset DStream: %s", + ZSTD_getErrorName(zresult)); + return -1; + } + + return 0; + } + + decompressor->dstream = ZSTD_createDStream(); + if (!decompressor->dstream) { PyErr_SetString(ZstdError, "could not create DStream"); - return NULL; + return -1; } if (decompressor->dict) { @@ -28,19 +46,23 @@ ZSTD_DStream* DStream_from_ZstdDecompres } if (dictData) { - zresult = ZSTD_initDStream_usingDict(dstream, dictData, dictSize); + zresult = ZSTD_initDStream_usingDict(decompressor->dstream, dictData, dictSize); } else { - zresult = ZSTD_initDStream(dstream); + zresult = ZSTD_initDStream(decompressor->dstream); } if (ZSTD_isError(zresult)) { + /* Don't leave a reference to an invalid object. */ + ZSTD_freeDStream(decompressor->dstream); + decompressor->dstream = NULL; + PyErr_Format(ZstdError, "could not initialize DStream: %s", ZSTD_getErrorName(zresult)); - return NULL; + return -1; } - return dstream; + return 0; } PyDoc_STRVAR(Decompressor__doc__, @@ -93,17 +115,23 @@ except: } static void Decompressor_dealloc(ZstdDecompressor* self) { - if (self->dctx) { - ZSTD_freeDCtx(self->dctx); - } - - Py_XDECREF(self->dict); + Py_CLEAR(self->dict); if (self->ddict) { ZSTD_freeDDict(self->ddict); self->ddict = NULL; } + if (self->dstream) { + ZSTD_freeDStream(self->dstream); + self->dstream = NULL; + } + + if (self->dctx) { + ZSTD_freeDCtx(self->dctx); + self->dctx = NULL; + } + PyObject_Del(self); } @@ -132,7 +160,6 @@ static PyObject* Decompressor_copy_strea PyObject* dest; size_t inSize = ZSTD_DStreamInSize(); size_t outSize = ZSTD_DStreamOutSize(); - ZSTD_DStream* dstream; ZSTD_inBuffer input; ZSTD_outBuffer output; Py_ssize_t totalRead = 0; @@ -164,8 +191,7 @@ static PyObject* Decompressor_copy_strea /* Prevent free on uninitialized memory in finally. */ output.dst = NULL; - dstream = DStream_from_ZstdDecompressor(self); - if (!dstream) { + if (0 != init_dstream(self)) { res = NULL; goto finally; } @@ -203,7 +229,7 @@ static PyObject* Decompressor_copy_strea while (input.pos < input.size) { Py_BEGIN_ALLOW_THREADS - zresult = ZSTD_decompressStream(dstream, &output, &input); + zresult = ZSTD_decompressStream(self->dstream, &output, &input); Py_END_ALLOW_THREADS if (ZSTD_isError(zresult)) { @@ -230,24 +256,17 @@ static PyObject* Decompressor_copy_strea /* Source stream is exhausted. Finish up. */ - ZSTD_freeDStream(dstream); - dstream = NULL; - totalReadPy = PyLong_FromSsize_t(totalRead); totalWritePy = PyLong_FromSsize_t(totalWrite); res = PyTuple_Pack(2, totalReadPy, totalWritePy); - Py_DecRef(totalReadPy); - Py_DecRef(totalWritePy); + Py_DECREF(totalReadPy); + Py_DECREF(totalWritePy); finally: if (output.dst) { PyMem_Free(output.dst); } - if (dstream) { - ZSTD_freeDStream(dstream); - } - return res; } @@ -352,18 +371,18 @@ PyObject* Decompressor_decompress(ZstdDe if (ZSTD_isError(zresult)) { PyErr_Format(ZstdError, "decompression error: %s", ZSTD_getErrorName(zresult)); - Py_DecRef(result); + Py_DECREF(result); return NULL; } else if (decompressedSize && zresult != decompressedSize) { PyErr_Format(ZstdError, "decompression error: decompressed %zu bytes; expected %llu", zresult, decompressedSize); - Py_DecRef(result); + Py_DECREF(result); return NULL; } else if (zresult < destCapacity) { if (_PyBytes_Resize(&result, zresult)) { - Py_DecRef(result); + Py_DECREF(result); return NULL; } } @@ -382,22 +401,19 @@ PyDoc_STRVAR(Decompressor_decompressobj_ ); static ZstdDecompressionObj* Decompressor_decompressobj(ZstdDecompressor* self) { - ZstdDecompressionObj* result = PyObject_New(ZstdDecompressionObj, &ZstdDecompressionObjType); + ZstdDecompressionObj* result = (ZstdDecompressionObj*)PyObject_CallObject((PyObject*)&ZstdDecompressionObjType, NULL); if (!result) { return NULL; } - result->dstream = DStream_from_ZstdDecompressor(self); - if (!result->dstream) { - Py_DecRef((PyObject*)result); + if (0 != init_dstream(self)) { + Py_DECREF(result); return NULL; } result->decompressor = self; Py_INCREF(result->decompressor); - result->finished = 0; - return result; } @@ -447,18 +463,11 @@ static ZstdDecompressorIterator* Decompr return NULL; } - result = PyObject_New(ZstdDecompressorIterator, &ZstdDecompressorIteratorType); + result = (ZstdDecompressorIterator*)PyObject_CallObject((PyObject*)&ZstdDecompressorIteratorType, NULL); if (!result) { return NULL; } - result->decompressor = NULL; - result->reader = NULL; - result->buffer = NULL; - result->dstream = NULL; - result->input.src = NULL; - result->output.dst = NULL; - if (PyObject_HasAttrString(reader, "read")) { result->reader = reader; Py_INCREF(result->reader); @@ -475,8 +484,6 @@ static ZstdDecompressorIterator* Decompr if (0 != PyObject_GetBuffer(reader, result->buffer, PyBUF_CONTIG_RO)) { goto except; } - - result->bufferOffset = 0; } else { PyErr_SetString(PyExc_ValueError, @@ -491,8 +498,7 @@ static ZstdDecompressorIterator* Decompr result->outSize = outSize; result->skipBytes = skipBytes; - result->dstream = DStream_from_ZstdDecompressor(self); - if (!result->dstream) { + if (0 != init_dstream(self)) { goto except; } @@ -501,16 +507,6 @@ static ZstdDecompressorIterator* Decompr PyErr_NoMemory(); goto except; } - result->input.size = 0; - result->input.pos = 0; - - result->output.dst = NULL; - result->output.size = 0; - result->output.pos = 0; - - result->readCount = 0; - result->finishedInput = 0; - result->finishedOutput = 0; goto finally; @@ -563,7 +559,7 @@ static ZstdDecompressionWriter* Decompre return NULL; } - result = PyObject_New(ZstdDecompressionWriter, &ZstdDecompressionWriterType); + result = (ZstdDecompressionWriter*)PyObject_CallObject((PyObject*)&ZstdDecompressionWriterType, NULL); if (!result) { return NULL; } @@ -576,9 +572,6 @@ static ZstdDecompressionWriter* Decompre result->outSize = outSize; - result->entered = 0; - result->dstream = NULL; - return result; } @@ -776,6 +769,746 @@ finally: return result; } +typedef struct { + void* sourceData; + size_t sourceSize; + unsigned long long destSize; +} FramePointer; + +typedef struct { + FramePointer* frames; + Py_ssize_t framesSize; + unsigned long long compressedSize; +} FrameSources; + +typedef struct { + void* dest; + Py_ssize_t destSize; + BufferSegment* segments; + Py_ssize_t segmentsSize; +} DestBuffer; + +typedef enum { + WorkerError_none = 0, + WorkerError_zstd = 1, + WorkerError_memory = 2, + WorkerError_sizeMismatch = 3, + WorkerError_unknownSize = 4, +} WorkerError; + +typedef struct { + /* Source records and length */ + FramePointer* framePointers; + /* Which records to process. */ + Py_ssize_t startOffset; + Py_ssize_t endOffset; + unsigned long long totalSourceSize; + + /* Compression state and settings. */ + ZSTD_DCtx* dctx; + ZSTD_DDict* ddict; + int requireOutputSizes; + + /* Output storage. */ + DestBuffer* destBuffers; + Py_ssize_t destCount; + + /* Item that error occurred on. */ + Py_ssize_t errorOffset; + /* If an error occurred. */ + WorkerError error; + /* result from zstd decompression operation */ + size_t zresult; +} WorkerState; + +static void decompress_worker(WorkerState* state) { + size_t allocationSize; + DestBuffer* destBuffer; + Py_ssize_t frameIndex; + Py_ssize_t localOffset = 0; + Py_ssize_t currentBufferStartIndex = state->startOffset; + Py_ssize_t remainingItems = state->endOffset - state->startOffset + 1; + void* tmpBuf; + Py_ssize_t destOffset = 0; + FramePointer* framePointers = state->framePointers; + size_t zresult; + unsigned long long totalOutputSize = 0; + + assert(NULL == state->destBuffers); + assert(0 == state->destCount); + assert(state->endOffset - state->startOffset >= 0); + + /* + * We need to allocate a buffer to hold decompressed data. How we do this + * depends on what we know about the output. The following scenarios are + * possible: + * + * 1. All structs defining frames declare the output size. + * 2. The decompressed size is embedded within the zstd frame. + * 3. The decompressed size is not stored anywhere. + * + * For now, we only support #1 and #2. + */ + + /* Resolve ouput segments. */ + for (frameIndex = state->startOffset; frameIndex <= state->endOffset; frameIndex++) { + FramePointer* fp = &framePointers[frameIndex]; + + if (0 == fp->destSize) { + fp->destSize = ZSTD_getDecompressedSize(fp->sourceData, fp->sourceSize); + if (0 == fp->destSize && state->requireOutputSizes) { + state->error = WorkerError_unknownSize; + state->errorOffset = frameIndex; + return; + } + } + + totalOutputSize += fp->destSize; + } + + state->destBuffers = calloc(1, sizeof(DestBuffer)); + if (NULL == state->destBuffers) { + state->error = WorkerError_memory; + return; + } + + state->destCount = 1; + + destBuffer = &state->destBuffers[state->destCount - 1]; + + assert(framePointers[state->startOffset].destSize > 0); /* For now. */ + + allocationSize = roundpow2(state->totalSourceSize); + + if (framePointers[state->startOffset].destSize > allocationSize) { + allocationSize = roundpow2(framePointers[state->startOffset].destSize); + } + + destBuffer->dest = malloc(allocationSize); + if (NULL == destBuffer->dest) { + state->error = WorkerError_memory; + return; + } + + destBuffer->destSize = allocationSize; + + destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment)); + if (NULL == destBuffer->segments) { + /* Caller will free state->dest as part of cleanup. */ + state->error = WorkerError_memory; + return; + } + + destBuffer->segmentsSize = remainingItems; + + for (frameIndex = state->startOffset; frameIndex <= state->endOffset; frameIndex++) { + const void* source = framePointers[frameIndex].sourceData; + const size_t sourceSize = framePointers[frameIndex].sourceSize; + void* dest; + const size_t decompressedSize = framePointers[frameIndex].destSize; + size_t destAvailable = destBuffer->destSize - destOffset; + + assert(decompressedSize > 0); /* For now. */ + + /* + * Not enough space in current buffer. Finish current before and allocate and + * switch to a new one. + */ + if (decompressedSize > destAvailable) { + /* + * Shrinking the destination buffer is optional. But it should be cheap, + * so we just do it. + */ + if (destAvailable) { + tmpBuf = realloc(destBuffer->dest, destOffset); + if (NULL == tmpBuf) { + state->error = WorkerError_memory; + return; + } + + destBuffer->dest = tmpBuf; + destBuffer->destSize = destOffset; + } + + /* Truncate segments buffer. */ + tmpBuf = realloc(destBuffer->segments, + (frameIndex - currentBufferStartIndex) * sizeof(BufferSegment)); + if (NULL == tmpBuf) { + state->error = WorkerError_memory; + return; + } + + destBuffer->segments = tmpBuf; + destBuffer->segmentsSize = frameIndex - currentBufferStartIndex; + + /* Grow space for new DestBuffer. */ + tmpBuf = realloc(state->destBuffers, (state->destCount + 1) * sizeof(DestBuffer)); + if (NULL == tmpBuf) { + state->error = WorkerError_memory; + return; + } + + state->destBuffers = tmpBuf; + state->destCount++; + + destBuffer = &state->destBuffers[state->destCount - 1]; + + /* Don't take any chances will non-NULL pointers. */ + memset(destBuffer, 0, sizeof(DestBuffer)); + + allocationSize = roundpow2(state->totalSourceSize); + + if (decompressedSize > allocationSize) { + allocationSize = roundpow2(decompressedSize); + } + + destBuffer->dest = malloc(allocationSize); + if (NULL == destBuffer->dest) { + state->error = WorkerError_memory; + return; + } + + destBuffer->destSize = allocationSize; + destAvailable = allocationSize; + destOffset = 0; + localOffset = 0; + + destBuffer->segments = calloc(remainingItems, sizeof(BufferSegment)); + if (NULL == destBuffer->segments) { + state->error = WorkerError_memory; + return; + } + + destBuffer->segmentsSize = remainingItems; + currentBufferStartIndex = frameIndex; + } + + dest = (char*)destBuffer->dest + destOffset; + + if (state->ddict) { + zresult = ZSTD_decompress_usingDDict(state->dctx, dest, decompressedSize, + source, sourceSize, state->ddict); + } + else { + zresult = ZSTD_decompressDCtx(state->dctx, dest, decompressedSize, + source, sourceSize); + } + + if (ZSTD_isError(zresult)) { + state->error = WorkerError_zstd; + state->zresult = zresult; + state->errorOffset = frameIndex; + return; + } + else if (zresult != decompressedSize) { + state->error = WorkerError_sizeMismatch; + state->zresult = zresult; + state->errorOffset = frameIndex; + return; + } + + destBuffer->segments[localOffset].offset = destOffset; + destBuffer->segments[localOffset].length = decompressedSize; + destOffset += zresult; + localOffset++; + remainingItems--; + } + + if (destBuffer->destSize > destOffset) { + tmpBuf = realloc(destBuffer->dest, destOffset); + if (NULL == tmpBuf) { + state->error = WorkerError_memory; + return; + } + + destBuffer->dest = tmpBuf; + destBuffer->destSize = destOffset; + } +} + +ZstdBufferWithSegmentsCollection* decompress_from_framesources(ZstdDecompressor* decompressor, FrameSources* frames, + unsigned int threadCount) { + void* dictData = NULL; + size_t dictSize = 0; + Py_ssize_t i = 0; + int errored = 0; + Py_ssize_t segmentsCount; + ZstdBufferWithSegments* bws = NULL; + PyObject* resultArg = NULL; + Py_ssize_t resultIndex; + ZstdBufferWithSegmentsCollection* result = NULL; + FramePointer* framePointers = frames->frames; + unsigned long long workerBytes = 0; + int currentThread = 0; + Py_ssize_t workerStartOffset = 0; + POOL_ctx* pool = NULL; + WorkerState* workerStates = NULL; + unsigned long long bytesPerWorker; + + /* Caller should normalize 0 and negative values to 1 or larger. */ + assert(threadCount >= 1); + + /* More threads than inputs makes no sense under any conditions. */ + threadCount = frames->framesSize < threadCount ? (unsigned int)frames->framesSize + : threadCount; + + /* TODO lower thread count if input size is too small and threads would just + add overhead. */ + + if (decompressor->dict) { + dictData = decompressor->dict->dictData; + dictSize = decompressor->dict->dictSize; + } + + if (dictData && !decompressor->ddict) { + Py_BEGIN_ALLOW_THREADS + decompressor->ddict = ZSTD_createDDict_byReference(dictData, dictSize); + Py_END_ALLOW_THREADS + + if (!decompressor->ddict) { + PyErr_SetString(ZstdError, "could not create decompression dict"); + return NULL; + } + } + + /* If threadCount==1, we don't start a thread pool. But we do leverage the + same API for dispatching work. */ + workerStates = PyMem_Malloc(threadCount * sizeof(WorkerState)); + if (NULL == workerStates) { + PyErr_NoMemory(); + goto finally; + } + + memset(workerStates, 0, threadCount * sizeof(WorkerState)); + + if (threadCount > 1) { + pool = POOL_create(threadCount, 1); + if (NULL == pool) { + PyErr_SetString(ZstdError, "could not initialize zstd thread pool"); + goto finally; + } + } + + bytesPerWorker = frames->compressedSize / threadCount; + + for (i = 0; i < threadCount; i++) { + workerStates[i].dctx = ZSTD_createDCtx(); + if (NULL == workerStates[i].dctx) { + PyErr_NoMemory(); + goto finally; + } + + ZSTD_copyDCtx(workerStates[i].dctx, decompressor->dctx); + + workerStates[i].ddict = decompressor->ddict; + workerStates[i].framePointers = framePointers; + workerStates[i].requireOutputSizes = 1; + } + + Py_BEGIN_ALLOW_THREADS + /* There are many ways to split work among workers. + + For now, we take a simple approach of splitting work so each worker + gets roughly the same number of input bytes. This will result in more + starvation than running N>threadCount jobs. But it avoids complications + around state tracking, which could involve extra locking. + */ + for (i = 0; i < frames->framesSize; i++) { + workerBytes += frames->frames[i].sourceSize; + + /* + * The last worker/thread needs to handle all remaining work. Don't + * trigger it prematurely. Defer to the block outside of the loop. + * (But still process this loop so workerBytes is correct. + */ + if (currentThread == threadCount - 1) { + continue; + } + + if (workerBytes >= bytesPerWorker) { + workerStates[currentThread].startOffset = workerStartOffset; + workerStates[currentThread].endOffset = i; + workerStates[currentThread].totalSourceSize = workerBytes; + + if (threadCount > 1) { + POOL_add(pool, (POOL_function)decompress_worker, &workerStates[currentThread]); + } + else { + decompress_worker(&workerStates[currentThread]); + } + currentThread++; + workerStartOffset = i + 1; + workerBytes = 0; + } + } + + if (workerBytes) { + workerStates[currentThread].startOffset = workerStartOffset; + workerStates[currentThread].endOffset = frames->framesSize - 1; + workerStates[currentThread].totalSourceSize = workerBytes; + + if (threadCount > 1) { + POOL_add(pool, (POOL_function)decompress_worker, &workerStates[currentThread]); + } + else { + decompress_worker(&workerStates[currentThread]); + } + } + + if (threadCount > 1) { + POOL_free(pool); + pool = NULL; + } + Py_END_ALLOW_THREADS + + for (i = 0; i < threadCount; i++) { + switch (workerStates[i].error) { + case WorkerError_none: + break; + + case WorkerError_zstd: + PyErr_Format(ZstdError, "error decompressing item %zd: %s", + workerStates[i].errorOffset, ZSTD_getErrorName(workerStates[i].zresult)); + errored = 1; + break; + + case WorkerError_memory: + PyErr_NoMemory(); + errored = 1; + break; + + case WorkerError_sizeMismatch: + PyErr_Format(ZstdError, "error decompressing item %zd: decompressed %zu bytes; expected %llu", + workerStates[i].errorOffset, workerStates[i].zresult, + framePointers[workerStates[i].errorOffset].destSize); + errored = 1; + break; + + case WorkerError_unknownSize: + PyErr_Format(PyExc_ValueError, "could not determine decompressed size of item %zd", + workerStates[i].errorOffset); + errored = 1; + break; + + default: + PyErr_Format(ZstdError, "unhandled error type: %d; this is a bug", + workerStates[i].error); + errored = 1; + break; + } + + if (errored) { + break; + } + } + + if (errored) { + goto finally; + } + + segmentsCount = 0; + for (i = 0; i < threadCount; i++) { + segmentsCount += workerStates[i].destCount; + } + + resultArg = PyTuple_New(segmentsCount); + if (NULL == resultArg) { + goto finally; + } + + resultIndex = 0; + + for (i = 0; i < threadCount; i++) { + Py_ssize_t bufferIndex; + WorkerState* state = &workerStates[i]; + + for (bufferIndex = 0; bufferIndex < state->destCount; bufferIndex++) { + DestBuffer* destBuffer = &state->destBuffers[bufferIndex]; + + bws = BufferWithSegments_FromMemory(destBuffer->dest, destBuffer->destSize, + destBuffer->segments, destBuffer->segmentsSize); + if (NULL == bws) { + goto finally; + } + + /* + * Memory for buffer and segments was allocated using malloc() in worker + * and the memory is transferred to the BufferWithSegments instance. So + * tell instance to use free() and NULL the reference in the state struct + * so it isn't freed below. + */ + bws->useFree = 1; + destBuffer->dest = NULL; + destBuffer->segments = NULL; + + PyTuple_SET_ITEM(resultArg, resultIndex++, (PyObject*)bws); + } + } + + result = (ZstdBufferWithSegmentsCollection*)PyObject_CallObject( + (PyObject*)&ZstdBufferWithSegmentsCollectionType, resultArg); + +finally: + Py_CLEAR(resultArg); + + if (workerStates) { + for (i = 0; i < threadCount; i++) { + Py_ssize_t bufferIndex; + WorkerState* state = &workerStates[i]; + + if (state->dctx) { + ZSTD_freeDCtx(state->dctx); + } + + for (bufferIndex = 0; bufferIndex < state->destCount; bufferIndex++) { + if (state->destBuffers) { + /* + * Will be NULL if memory transfered to a BufferWithSegments. + * Otherwise it is left over after an error occurred. + */ + free(state->destBuffers[bufferIndex].dest); + free(state->destBuffers[bufferIndex].segments); + } + } + + free(state->destBuffers); + } + + PyMem_Free(workerStates); + } + + POOL_free(pool); + + return result; +} + +PyDoc_STRVAR(Decompressor_multi_decompress_to_buffer__doc__, +"Decompress multiple frames to output buffers\n" +"\n" +"Receives a ``BufferWithSegments``, a ``BufferWithSegmentsCollection`` or a\n" +"list of bytes-like objects. Each item in the passed collection should be a\n" +"compressed zstd frame.\n" +"\n" +"Unless ``decompressed_sizes`` is specified, the content size *must* be\n" +"written into the zstd frame header. If ``decompressed_sizes`` is specified,\n" +"it is an object conforming to the buffer protocol that represents an array\n" +"of 64-bit unsigned integers in the machine's native format. Specifying\n" +"``decompressed_sizes`` avoids a pre-scan of each frame to determine its\n" +"output size.\n" +"\n" +"Returns a ``BufferWithSegmentsCollection`` containing the decompressed\n" +"data. All decompressed data is allocated in a single memory buffer. The\n" +"``BufferWithSegments`` instance tracks which objects are at which offsets\n" +"and their respective lengths.\n" +"\n" +"The ``threads`` argument controls how many threads to use for operations.\n" +"Negative values will use the same number of threads as logical CPUs on the\n" +"machine.\n" +); + +static ZstdBufferWithSegmentsCollection* Decompressor_multi_decompress_to_buffer(ZstdDecompressor* self, PyObject* args, PyObject* kwargs) { + static char* kwlist[] = { + "frames", + "decompressed_sizes", + "threads", + NULL + }; + + PyObject* frames; + Py_buffer frameSizes; + int threads = 0; + Py_ssize_t frameCount; + Py_buffer* frameBuffers = NULL; + FramePointer* framePointers = NULL; + unsigned long long* frameSizesP = NULL; + unsigned long long totalInputSize = 0; + FrameSources frameSources; + ZstdBufferWithSegmentsCollection* result = NULL; + Py_ssize_t i; + + memset(&frameSizes, 0, sizeof(frameSizes)); + +#if PY_MAJOR_VERSION >= 3 + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|y*i:multi_decompress_to_buffer", +#else + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|s*i:multi_decompress_to_buffer", +#endif + kwlist, &frames, &frameSizes, &threads)) { + return NULL; + } + + if (frameSizes.buf) { + if (!PyBuffer_IsContiguous(&frameSizes, 'C') || frameSizes.ndim > 1) { + PyErr_SetString(PyExc_ValueError, "decompressed_sizes buffer should be contiguous and have a single dimension"); + goto finally; + } + + frameSizesP = (unsigned long long*)frameSizes.buf; + } + + if (threads < 0) { + threads = cpu_count(); + } + + if (threads < 2) { + threads = 1; + } + + if (PyObject_TypeCheck(frames, &ZstdBufferWithSegmentsType)) { + ZstdBufferWithSegments* buffer = (ZstdBufferWithSegments*)frames; + frameCount = buffer->segmentCount; + + if (frameSizes.buf && frameSizes.len != frameCount * (Py_ssize_t)sizeof(unsigned long long)) { + PyErr_Format(PyExc_ValueError, "decompressed_sizes size mismatch; expected %zd, got %zd", + frameCount * sizeof(unsigned long long), frameSizes.len); + goto finally; + } + + framePointers = PyMem_Malloc(frameCount * sizeof(FramePointer)); + if (!framePointers) { + PyErr_NoMemory(); + goto finally; + } + + for (i = 0; i < frameCount; i++) { + void* sourceData; + unsigned long long sourceSize; + unsigned long long decompressedSize = 0; + + if (buffer->segments[i].offset + buffer->segments[i].length > buffer->dataSize) { + PyErr_Format(PyExc_ValueError, "item %zd has offset outside memory area", i); + goto finally; + } + + sourceData = (char*)buffer->data + buffer->segments[i].offset; + sourceSize = buffer->segments[i].length; + totalInputSize += sourceSize; + + if (frameSizesP) { + decompressedSize = frameSizesP[i]; + } + + framePointers[i].sourceData = sourceData; + framePointers[i].sourceSize = sourceSize; + framePointers[i].destSize = decompressedSize; + } + } + else if (PyObject_TypeCheck(frames, &ZstdBufferWithSegmentsCollectionType)) { + Py_ssize_t offset = 0; + ZstdBufferWithSegments* buffer; + ZstdBufferWithSegmentsCollection* collection = (ZstdBufferWithSegmentsCollection*)frames; + + frameCount = BufferWithSegmentsCollection_length(collection); + + if (frameSizes.buf && frameSizes.len != frameCount) { + PyErr_Format(PyExc_ValueError, + "decompressed_sizes size mismatch; expected %zd; got %zd", + frameCount * sizeof(unsigned long long), frameSizes.len); + goto finally; + } + + framePointers = PyMem_Malloc(frameCount * sizeof(FramePointer)); + if (NULL == framePointers) { + PyErr_NoMemory(); + goto finally; + } + + /* Iterate the data structure directly because it is faster. */ + for (i = 0; i < collection->bufferCount; i++) { + Py_ssize_t segmentIndex; + buffer = collection->buffers[i]; + + for (segmentIndex = 0; segmentIndex < buffer->segmentCount; segmentIndex++) { + if (buffer->segments[segmentIndex].offset + buffer->segments[segmentIndex].length > buffer->dataSize) { + PyErr_Format(PyExc_ValueError, "item %zd has offset outside memory area", + offset); + goto finally; + } + + totalInputSize += buffer->segments[segmentIndex].length; + + framePointers[offset].sourceData = (char*)buffer->data + buffer->segments[segmentIndex].offset; + framePointers[offset].sourceSize = buffer->segments[segmentIndex].length; + framePointers[offset].destSize = frameSizesP ? frameSizesP[offset] : 0; + + offset++; + } + } + } + else if (PyList_Check(frames)) { + frameCount = PyList_GET_SIZE(frames); + + if (frameSizes.buf && frameSizes.len != frameCount * (Py_ssize_t)sizeof(unsigned long long)) { + PyErr_Format(PyExc_ValueError, "decompressed_sizes size mismatch; expected %zd, got %zd", + frameCount * sizeof(unsigned long long), frameSizes.len); + goto finally; + } + + framePointers = PyMem_Malloc(frameCount * sizeof(FramePointer)); + if (!framePointers) { + PyErr_NoMemory(); + goto finally; + } + + /* + * It is not clear whether Py_buffer.buf is still valid after + * PyBuffer_Release. So, we hold a reference to all Py_buffer instances + * for the duration of the operation. + */ + frameBuffers = PyMem_Malloc(frameCount * sizeof(Py_buffer)); + if (NULL == frameBuffers) { + PyErr_NoMemory(); + goto finally; + } + + memset(frameBuffers, 0, frameCount * sizeof(Py_buffer)); + + /* Do a pass to assemble info about our input buffers and output sizes. */ + for (i = 0; i < frameCount; i++) { + if (0 != PyObject_GetBuffer(PyList_GET_ITEM(frames, i), + &frameBuffers[i], PyBUF_CONTIG_RO)) { + PyErr_Clear(); + PyErr_Format(PyExc_TypeError, "item %zd not a bytes like object", i); + goto finally; + } + + totalInputSize += frameBuffers[i].len; + + framePointers[i].sourceData = frameBuffers[i].buf; + framePointers[i].sourceSize = frameBuffers[i].len; + framePointers[i].destSize = frameSizesP ? frameSizesP[i] : 0; + } + } + else { + PyErr_SetString(PyExc_TypeError, "argument must be list or BufferWithSegments"); + goto finally; + } + + /* We now have an array with info about our inputs and outputs. Feed it into + our generic decompression function. */ + frameSources.frames = framePointers; + frameSources.framesSize = frameCount; + frameSources.compressedSize = totalInputSize; + + result = decompress_from_framesources(self, &frameSources, threads); + +finally: + if (frameSizes.buf) { + PyBuffer_Release(&frameSizes); + } + PyMem_Free(framePointers); + + if (frameBuffers) { + for (i = 0; i < frameCount; i++) { + PyBuffer_Release(&frameBuffers[i]); + } + + PyMem_Free(frameBuffers); + } + + return result; +} + static PyMethodDef Decompressor_methods[] = { { "copy_stream", (PyCFunction)Decompressor_copy_stream, METH_VARARGS | METH_KEYWORDS, Decompressor_copy_stream__doc__ }, @@ -789,6 +1522,8 @@ static PyMethodDef Decompressor_methods[ Decompressor_write_to__doc__ }, { "decompress_content_dict_chain", (PyCFunction)Decompressor_decompress_content_dict_chain, METH_VARARGS | METH_KEYWORDS, Decompressor_decompress_content_dict_chain__doc__ }, + { "multi_decompress_to_buffer", (PyCFunction)Decompressor_multi_decompress_to_buffer, + METH_VARARGS | METH_KEYWORDS, Decompressor_multi_decompress_to_buffer__doc__ }, { NULL, NULL } }; diff --git a/contrib/python-zstandard/c-ext/decompressoriterator.c b/contrib/python-zstandard/c-ext/decompressoriterator.c --- a/contrib/python-zstandard/c-ext/decompressoriterator.c +++ b/contrib/python-zstandard/c-ext/decompressoriterator.c @@ -26,11 +26,6 @@ static void ZstdDecompressorIterator_dea self->buffer = NULL; } - if (self->dstream) { - ZSTD_freeDStream(self->dstream); - self->dstream = NULL; - } - if (self->input.src) { PyMem_Free((void*)self->input.src); self->input.src = NULL; @@ -50,6 +45,8 @@ static DecompressorIteratorResult read_d DecompressorIteratorResult result; size_t oldInputPos = self->input.pos; + assert(self->decompressor->dstream); + result.chunk = NULL; chunk = PyBytes_FromStringAndSize(NULL, self->outSize); @@ -63,7 +60,7 @@ static DecompressorIteratorResult read_d self->output.pos = 0; Py_BEGIN_ALLOW_THREADS - zresult = ZSTD_decompressStream(self->dstream, &self->output, &self->input); + zresult = ZSTD_decompressStream(self->decompressor->dstream, &self->output, &self->input); Py_END_ALLOW_THREADS /* We're done with the pointer. Nullify to prevent anyone from getting a @@ -160,7 +157,7 @@ read_from_source: PyErr_SetString(PyExc_ValueError, "skip_bytes larger than first input chunk; " "this scenario is currently unsupported"); - Py_DecRef(readResult); + Py_XDECREF(readResult); return NULL; } @@ -179,7 +176,7 @@ read_from_source: else if (!self->readCount) { self->finishedInput = 1; self->finishedOutput = 1; - Py_DecRef(readResult); + Py_XDECREF(readResult); PyErr_SetString(PyExc_StopIteration, "empty input"); return NULL; } @@ -188,7 +185,7 @@ read_from_source: } /* We've copied the data managed by memory. Discard the Python object. */ - Py_DecRef(readResult); + Py_XDECREF(readResult); } result = read_decompressor_iterator(self); diff --git a/contrib/python-zstandard/c-ext/dictparams.c b/contrib/python-zstandard/c-ext/dictparams.c deleted file mode 100644 --- a/contrib/python-zstandard/c-ext/dictparams.c +++ /dev/null @@ -1,141 +0,0 @@ -/** -* Copyright (c) 2016-present, Gregory Szorc -* All rights reserved. -* -* This software may be modified and distributed under the terms -* of the BSD license. See the LICENSE file for details. -*/ - -#include "python-zstandard.h" - -PyDoc_STRVAR(DictParameters__doc__, -"DictParameters: low-level control over dictionary generation"); - -static PyObject* DictParameters_new(PyTypeObject* subtype, PyObject* args, PyObject* kwargs) { - DictParametersObject* self; - unsigned selectivityLevel; - int compressionLevel; - unsigned notificationLevel; - unsigned dictID; - - if (!PyArg_ParseTuple(args, "IiII:DictParameters", - &selectivityLevel, &compressionLevel, ¬ificationLevel, &dictID)) { - return NULL; - } - - self = (DictParametersObject*)subtype->tp_alloc(subtype, 1); - if (!self) { - return NULL; - } - - self->selectivityLevel = selectivityLevel; - self->compressionLevel = compressionLevel; - self->notificationLevel = notificationLevel; - self->dictID = dictID; - - return (PyObject*)self; -} - -static void DictParameters_dealloc(PyObject* self) { - PyObject_Del(self); -} - -static PyMemberDef DictParameters_members[] = { - { "selectivity_level", T_UINT, - offsetof(DictParametersObject, selectivityLevel), READONLY, - "selectivity level" }, - { "compression_level", T_INT, - offsetof(DictParametersObject, compressionLevel), READONLY, - "compression level" }, - { "notification_level", T_UINT, - offsetof(DictParametersObject, notificationLevel), READONLY, - "notification level" }, - { "dict_id", T_UINT, - offsetof(DictParametersObject, dictID), READONLY, - "dictionary ID" }, - { NULL } -}; - -static Py_ssize_t DictParameters_length(PyObject* self) { - return 4; -} - -static PyObject* DictParameters_item(PyObject* o, Py_ssize_t i) { - DictParametersObject* self = (DictParametersObject*)o; - - switch (i) { - case 0: - return PyLong_FromLong(self->selectivityLevel); - case 1: - return PyLong_FromLong(self->compressionLevel); - case 2: - return PyLong_FromLong(self->notificationLevel); - case 3: - return PyLong_FromLong(self->dictID); - default: - PyErr_SetString(PyExc_IndexError, "index out of range"); - return NULL; - } -} - -static PySequenceMethods DictParameters_sq = { - DictParameters_length, /* sq_length */ - 0, /* sq_concat */ - 0, /* sq_repeat */ - DictParameters_item, /* sq_item */ - 0, /* sq_ass_item */ - 0, /* sq_contains */ - 0, /* sq_inplace_concat */ - 0 /* sq_inplace_repeat */ -}; - -PyTypeObject DictParametersType = { - PyVarObject_HEAD_INIT(NULL, 0) - "DictParameters", /* tp_name */ - sizeof(DictParametersObject), /* tp_basicsize */ - 0, /* tp_itemsize */ - (destructor)DictParameters_dealloc, /* tp_dealloc */ - 0, /* tp_print */ - 0, /* tp_getattr */ - 0, /* tp_setattr */ - 0, /* tp_compare */ - 0, /* tp_repr */ - 0, /* tp_as_number */ - &DictParameters_sq, /* tp_as_sequence */ - 0, /* tp_as_mapping */ - 0, /* tp_hash */ - 0, /* tp_call */ - 0, /* tp_str */ - 0, /* tp_getattro */ - 0, /* tp_setattro */ - 0, /* tp_as_buffer */ - Py_TPFLAGS_DEFAULT, /* tp_flags */ - DictParameters__doc__, /* tp_doc */ - 0, /* tp_traverse */ - 0, /* tp_clear */ - 0, /* tp_richcompare */ - 0, /* tp_weaklistoffset */ - 0, /* tp_iter */ - 0, /* tp_iternext */ - 0, /* tp_methods */ - DictParameters_members, /* tp_members */ - 0, /* tp_getset */ - 0, /* tp_base */ - 0, /* tp_dict */ - 0, /* tp_descr_get */ - 0, /* tp_descr_set */ - 0, /* tp_dictoffset */ - 0, /* tp_init */ - 0, /* tp_alloc */ - DictParameters_new, /* tp_new */ -}; - -void dictparams_module_init(PyObject* mod) { - Py_TYPE(&DictParametersType) = &PyType_Type; - if (PyType_Ready(&DictParametersType) < 0) { - return; - } - - Py_IncRef((PyObject*)&DictParametersType); - PyModule_AddObject(mod, "DictParameters", (PyObject*)&DictParametersType); -} diff --git a/contrib/python-zstandard/c-ext/frameparams.c b/contrib/python-zstandard/c-ext/frameparams.c --- a/contrib/python-zstandard/c-ext/frameparams.c +++ b/contrib/python-zstandard/c-ext/frameparams.c @@ -127,6 +127,6 @@ void frameparams_module_init(PyObject* m return; } - Py_IncRef((PyObject*)&FrameParametersType); + Py_INCREF(&FrameParametersType); PyModule_AddObject(mod, "FrameParameters", (PyObject*)&FrameParametersType); } diff --git a/contrib/python-zstandard/c-ext/python-zstandard.h b/contrib/python-zstandard/c-ext/python-zstandard.h --- a/contrib/python-zstandard/c-ext/python-zstandard.h +++ b/contrib/python-zstandard/c-ext/python-zstandard.h @@ -15,14 +15,20 @@ #include "mem.h" #include "zstd.h" #include "zdict.h" +#include "zstdmt_compress.h" -#define PYTHON_ZSTANDARD_VERSION "0.7.0" +#define PYTHON_ZSTANDARD_VERSION "0.8.0" typedef enum { compressorobj_flush_finish, compressorobj_flush_block, } CompressorObj_Flush; +/* + Represents a CompressionParameters type. + + This type is basically a wrapper around ZSTD_compressionParameters. +*/ typedef struct { PyObject_HEAD unsigned windowLog; @@ -36,6 +42,11 @@ typedef struct { extern PyTypeObject CompressionParametersType; +/* + Represents a FrameParameters type. + + This type is basically a wrapper around ZSTD_frameParams. +*/ typedef struct { PyObject_HEAD unsigned long long frameContentSize; @@ -46,34 +57,55 @@ typedef struct { extern PyTypeObject FrameParametersType; -typedef struct { - PyObject_HEAD - unsigned selectivityLevel; - int compressionLevel; - unsigned notificationLevel; - unsigned dictID; -} DictParametersObject; +/* + Represents a ZstdCompressionDict type. -extern PyTypeObject DictParametersType; - + Instances hold data used for a zstd compression dictionary. +*/ typedef struct { PyObject_HEAD + /* Pointer to dictionary data. Owned by self. */ void* dictData; + /* Size of dictionary data. */ size_t dictSize; + /* k parameter for cover dictionaries. Only populated by train_cover_dict(). */ + unsigned k; + /* d parameter for cover dictionaries. Only populated by train_cover_dict(). */ + unsigned d; } ZstdCompressionDict; extern PyTypeObject ZstdCompressionDictType; +/* + Represents a ZstdCompressor type. +*/ typedef struct { PyObject_HEAD + /* Configured compression level. Should be always set. */ int compressionLevel; + /* Number of threads to use for operations. */ + unsigned int threads; + /* Pointer to compression dictionary to use. NULL if not using dictionary + compression. */ ZstdCompressionDict* dict; + /* Compression context to use. Populated during object construction. NULL + if using multi-threaded compression. */ ZSTD_CCtx* cctx; + /* Multi-threaded compression context to use. Populated during object + construction. NULL if not using multi-threaded compression. */ + ZSTDMT_CCtx* mtcctx; + /* Digest compression dictionary. NULL initially. Populated on first use. */ ZSTD_CDict* cdict; + /* Low-level compression parameter control. NULL unless passed to + constructor. Takes precedence over `compressionLevel` if defined. */ CompressionParametersObject* cparams; + /* Controls zstd frame options. */ ZSTD_frameParameters fparams; + /* Holds state for streaming compression. Shared across all invocation. + Populated on first use. */ + ZSTD_CStream* cstream; } ZstdCompressor; extern PyTypeObject ZstdCompressorType; @@ -82,7 +114,6 @@ typedef struct { PyObject_HEAD ZstdCompressor* compressor; - ZSTD_CStream* cstream; ZSTD_outBuffer output; int finished; } ZstdCompressionObj; @@ -96,7 +127,6 @@ typedef struct { PyObject* writer; Py_ssize_t sourceSize; size_t outSize; - ZSTD_CStream* cstream; int entered; } ZstdCompressionWriter; @@ -113,7 +143,6 @@ typedef struct { size_t inSize; size_t outSize; - ZSTD_CStream* cstream; ZSTD_inBuffer input; ZSTD_outBuffer output; int finishedOutput; @@ -130,6 +159,7 @@ typedef struct { ZstdCompressionDict* dict; ZSTD_DDict* ddict; + ZSTD_DStream* dstream; } ZstdDecompressor; extern PyTypeObject ZstdDecompressorType; @@ -138,7 +168,6 @@ typedef struct { PyObject_HEAD ZstdDecompressor* decompressor; - ZSTD_DStream* dstream; int finished; } ZstdDecompressionObj; @@ -150,7 +179,6 @@ typedef struct { ZstdDecompressor* decompressor; PyObject* writer; size_t outSize; - ZSTD_DStream* dstream; int entered; } ZstdDecompressionWriter; @@ -166,7 +194,6 @@ typedef struct { size_t inSize; size_t outSize; size_t skipBytes; - ZSTD_DStream* dstream; ZSTD_inBuffer input; ZSTD_outBuffer output; Py_ssize_t readCount; @@ -181,10 +208,78 @@ typedef struct { PyObject* chunk; } DecompressorIteratorResult; +typedef struct { + unsigned long long offset; + unsigned long long length; +} BufferSegment; + +typedef struct { + PyObject_HEAD + + PyObject* parent; + BufferSegment* segments; + Py_ssize_t segmentCount; +} ZstdBufferSegments; + +extern PyTypeObject ZstdBufferSegmentsType; + +typedef struct { + PyObject_HEAD + + PyObject* parent; + void* data; + Py_ssize_t dataSize; + unsigned long long offset; +} ZstdBufferSegment; + +extern PyTypeObject ZstdBufferSegmentType; + +typedef struct { + PyObject_HEAD + + Py_buffer parent; + void* data; + unsigned long long dataSize; + BufferSegment* segments; + Py_ssize_t segmentCount; + int useFree; +} ZstdBufferWithSegments; + +extern PyTypeObject ZstdBufferWithSegmentsType; + +/** + * An ordered collection of BufferWithSegments exposed as a squashed collection. + * + * This type provides a virtual view spanning multiple BufferWithSegments + * instances. It allows multiple instances to be "chained" together and + * exposed as a single collection. e.g. if there are 2 buffers holding + * 10 segments each, then o[14] will access the 5th segment in the 2nd buffer. + */ +typedef struct { + PyObject_HEAD + + /* An array of buffers that should be exposed through this instance. */ + ZstdBufferWithSegments** buffers; + /* Number of elements in buffers array. */ + Py_ssize_t bufferCount; + /* Array of first offset in each buffer instance. 0th entry corresponds + to number of elements in the 0th buffer. 1st entry corresponds to the + sum of elements in 0th and 1st buffers. */ + Py_ssize_t* firstElements; +} ZstdBufferWithSegmentsCollection; + +extern PyTypeObject ZstdBufferWithSegmentsCollectionType; + void ztopy_compression_parameters(CompressionParametersObject* params, ZSTD_compressionParameters* zparams); CompressionParametersObject* get_compression_parameters(PyObject* self, PyObject* args); FrameParametersObject* get_frame_parameters(PyObject* self, PyObject* args); PyObject* estimate_compression_context_size(PyObject* self, PyObject* args); -ZSTD_CStream* CStream_from_ZstdCompressor(ZstdCompressor* compressor, Py_ssize_t sourceSize); -ZSTD_DStream* DStream_from_ZstdDecompressor(ZstdDecompressor* decompressor); +int init_cstream(ZstdCompressor* compressor, unsigned long long sourceSize); +int init_mtcstream(ZstdCompressor* compressor, Py_ssize_t sourceSize); +int init_dstream(ZstdDecompressor* decompressor); ZstdCompressionDict* train_dictionary(PyObject* self, PyObject* args, PyObject* kwargs); +ZstdCompressionDict* train_cover_dictionary(PyObject* self, PyObject* args, PyObject* kwargs); +ZstdBufferWithSegments* BufferWithSegments_FromMemory(void* data, unsigned long long dataSize, BufferSegment* segments, Py_ssize_t segmentsSize); +Py_ssize_t BufferWithSegmentsCollection_length(ZstdBufferWithSegmentsCollection*); +int cpu_count(void); +size_t roundpow2(size_t); diff --git a/contrib/python-zstandard/make_cffi.py b/contrib/python-zstandard/make_cffi.py --- a/contrib/python-zstandard/make_cffi.py +++ b/contrib/python-zstandard/make_cffi.py @@ -27,6 +27,7 @@ SOURCES = ['zstd/%s' % p for p in ( 'compress/fse_compress.c', 'compress/huf_compress.c', 'compress/zstd_compress.c', + 'compress/zstdmt_compress.c', 'decompress/huf_decompress.c', 'decompress/zstd_decompress.c', 'dictBuilder/cover.c', @@ -34,9 +35,10 @@ SOURCES = ['zstd/%s' % p for p in ( 'dictBuilder/zdict.c', )] +# Headers whose preprocessed output will be fed into cdef(). HEADERS = [os.path.join(HERE, 'zstd', *p) for p in ( ('zstd.h',), - ('common', 'pool.h'), + ('compress', 'zstdmt_compress.h'), ('dictBuilder', 'zdict.h'), )] @@ -76,11 +78,30 @@ else: raise Exception('unsupported compiler type: %s' % compiler.compiler_type) def preprocess(path): - # zstd.h includes , which is also included by cffi's boilerplate. - # This can lead to duplicate declarations. So we strip this include from the - # preprocessor invocation. with open(path, 'rb') as fh: - lines = [l for l in fh if not l.startswith(b'#include ')] + lines = [] + for l in fh: + # zstd.h includes , which is also included by cffi's + # boilerplate. This can lead to duplicate declarations. So we strip + # this include from the preprocessor invocation. + # + # The same things happens for including zstd.h, so give it the same + # treatment. + # + # We define ZSTD_STATIC_LINKING_ONLY, which is redundant with the inline + # #define in zstdmt_compress.h and results in a compiler warning. So drop + # the inline #define. + if l.startswith((b'#include ', + b'#include "zstd.h"', + b'#define ZSTD_STATIC_LINKING_ONLY')): + continue + + # ZSTDLIB_API may not be defined if we dropped zstd.h. It isn't + # important so just filter it out. + if l.startswith(b'ZSTDLIB_API'): + l = l[len(b'ZSTDLIB_API '):] + + lines.append(l) fd, input_file = tempfile.mkstemp(suffix='.h') os.write(fd, b''.join(lines)) @@ -116,25 +137,30 @@ def normalize_output(output): ffi = cffi.FFI() +# *_DISABLE_DEPRECATE_WARNINGS prevents the compiler from emitting a warning +# when cffi uses the function. Since we statically link against zstd, even +# if we use the deprecated functions it shouldn't be a huge problem. ffi.set_source('_zstd_cffi', ''' #include "mem.h" #define ZSTD_STATIC_LINKING_ONLY #include "zstd.h" #define ZDICT_STATIC_LINKING_ONLY -#include "pool.h" +#define ZDICT_DISABLE_DEPRECATE_WARNINGS #include "zdict.h" +#include "zstdmt_compress.h" ''', sources=SOURCES, include_dirs=INCLUDE_DIRS) DEFINE = re.compile(b'^\\#define ([a-zA-Z0-9_]+) ') sources = [] +# Feed normalized preprocessor output for headers into the cdef parser. for header in HEADERS: preprocessed = preprocess(header) sources.append(normalize_output(preprocessed)) - # Do another pass over source and find constants that were preprocessed - # away. + # #define's are effectively erased as part of going through preprocessor. + # So perform a manual pass to re-add those to the cdef source. with open(header, 'rb') as fh: for line in fh: line = line.strip() @@ -142,13 +168,20 @@ for header in HEADERS: if not m: continue + if m.group(1) == b'ZSTD_STATIC_LINKING_ONLY': + continue + # The parser doesn't like some constants with complex values. if m.group(1) in (b'ZSTD_LIB_VERSION', b'ZSTD_VERSION_STRING'): continue + # The ... is magic syntax by the cdef parser to resolve the + # value at compile time. sources.append(m.group(0) + b' ...') -ffi.cdef(u'\n'.join(s.decode('latin1') for s in sources)) +cdeflines = b'\n'.join(sources).splitlines() +cdeflines = [l for l in cdeflines if l.strip()] +ffi.cdef(b'\n'.join(cdeflines).decode('latin1')) if __name__ == '__main__': ffi.compile() diff --git a/contrib/python-zstandard/setup.py b/contrib/python-zstandard/setup.py --- a/contrib/python-zstandard/setup.py +++ b/contrib/python-zstandard/setup.py @@ -25,10 +25,15 @@ if "--legacy" in sys.argv: # facilitate reuse in other projects. extensions = [setup_zstd.get_c_extension(SUPPORT_LEGACY, 'zstd')] +install_requires = [] + if cffi: import make_cffi extensions.append(make_cffi.ffi.distutils_extension()) + # Need change in 1.8 for ffi.from_buffer() behavior. + install_requires.append('cffi>=1.8') + version = None with open('c-ext/python-zstandard.h', 'r') as fh: @@ -67,4 +72,5 @@ setup( keywords='zstandard zstd compression', ext_modules=extensions, test_suite='tests', + install_requires=install_requires, ) diff --git a/contrib/python-zstandard/setup_zstd.py b/contrib/python-zstandard/setup_zstd.py --- a/contrib/python-zstandard/setup_zstd.py +++ b/contrib/python-zstandard/setup_zstd.py @@ -19,6 +19,7 @@ zstd_sources = ['zstd/%s' % p for p in ( 'compress/fse_compress.c', 'compress/huf_compress.c', 'compress/zstd_compress.c', + 'compress/zstdmt_compress.c', 'decompress/huf_decompress.c', 'decompress/zstd_decompress.c', 'dictBuilder/cover.c', @@ -55,6 +56,7 @@ zstd_includes_legacy = [ ext_sources = [ 'zstd.c', + 'c-ext/bufferutil.c', 'c-ext/compressiondict.c', 'c-ext/compressobj.c', 'c-ext/compressor.c', @@ -66,7 +68,6 @@ ext_sources = [ 'c-ext/decompressor.c', 'c-ext/decompressoriterator.c', 'c-ext/decompressionwriter.c', - 'c-ext/dictparams.c', 'c-ext/frameparams.c', ] @@ -89,8 +90,13 @@ def get_c_extension(support_legacy=False depends = [os.path.join(root, p) for p in zstd_depends] + extra_args = ['-DZSTD_MULTITHREAD'] + + if support_legacy: + extra_args.append('-DZSTD_LEGACY_SUPPORT=1') + # TODO compile with optimizations. return Extension(name, sources, include_dirs=include_dirs, depends=depends, - extra_compile_args=["-DZSTD_LEGACY_SUPPORT=1"] if support_legacy else []) + extra_compile_args=extra_args) diff --git a/contrib/python-zstandard/tests/common.py b/contrib/python-zstandard/tests/common.py --- a/contrib/python-zstandard/tests/common.py +++ b/contrib/python-zstandard/tests/common.py @@ -1,5 +1,6 @@ import inspect import io +import os import types @@ -59,3 +60,29 @@ class OpCountingBytesIO(io.BytesIO): def write(self, data): self._write_count += 1 return super(OpCountingBytesIO, self).write(data) + + +_source_files = [] + + +def random_input_data(): + """Obtain the raw content of source files. + + This is used for generating "random" data to feed into fuzzing, since it is + faster than random content generation. + """ + if _source_files: + return _source_files + + for root, dirs, files in os.walk(os.path.dirname(__file__)): + dirs[:] = list(sorted(dirs)) + for f in sorted(files): + try: + with open(os.path.join(root, f), 'rb') as fh: + data = fh.read() + if data: + _source_files.append(data) + except OSError: + pass + + return _source_files diff --git a/contrib/python-zstandard/tests/test_buffer_util.py b/contrib/python-zstandard/tests/test_buffer_util.py new file mode 100644 --- /dev/null +++ b/contrib/python-zstandard/tests/test_buffer_util.py @@ -0,0 +1,112 @@ +import struct + +try: + import unittest2 as unittest +except ImportError: + import unittest + +import zstd + +ss = struct.Struct('=QQ') + + +class TestBufferWithSegments(unittest.TestCase): + def test_arguments(self): + with self.assertRaises(TypeError): + zstd.BufferWithSegments() + + with self.assertRaises(TypeError): + zstd.BufferWithSegments(b'foo') + + # Segments data should be a multiple of 16. + with self.assertRaisesRegexp(ValueError, 'segments array size is not a multiple of 16'): + zstd.BufferWithSegments(b'foo', b'\x00\x00') + + def test_invalid_offset(self): + with self.assertRaisesRegexp(ValueError, 'offset within segments array references memory'): + zstd.BufferWithSegments(b'foo', ss.pack(0, 4)) + + def test_invalid_getitem(self): + b = zstd.BufferWithSegments(b'foo', ss.pack(0, 3)) + + with self.assertRaisesRegexp(IndexError, 'offset must be non-negative'): + test = b[-10] + + with self.assertRaisesRegexp(IndexError, 'offset must be less than 1'): + test = b[1] + + with self.assertRaisesRegexp(IndexError, 'offset must be less than 1'): + test = b[2] + + def test_single(self): + b = zstd.BufferWithSegments(b'foo', ss.pack(0, 3)) + self.assertEqual(len(b), 1) + self.assertEqual(b.size, 3) + self.assertEqual(b.tobytes(), b'foo') + + self.assertEqual(len(b[0]), 3) + self.assertEqual(b[0].offset, 0) + self.assertEqual(b[0].tobytes(), b'foo') + + def test_multiple(self): + b = zstd.BufferWithSegments(b'foofooxfooxy', b''.join([ss.pack(0, 3), + ss.pack(3, 4), + ss.pack(7, 5)])) + self.assertEqual(len(b), 3) + self.assertEqual(b.size, 12) + self.assertEqual(b.tobytes(), b'foofooxfooxy') + + self.assertEqual(b[0].tobytes(), b'foo') + self.assertEqual(b[1].tobytes(), b'foox') + self.assertEqual(b[2].tobytes(), b'fooxy') + + +class TestBufferWithSegmentsCollection(unittest.TestCase): + def test_empty_constructor(self): + with self.assertRaisesRegexp(ValueError, 'must pass at least 1 argument'): + zstd.BufferWithSegmentsCollection() + + def test_argument_validation(self): + with self.assertRaisesRegexp(TypeError, 'arguments must be BufferWithSegments'): + zstd.BufferWithSegmentsCollection(None) + + with self.assertRaisesRegexp(TypeError, 'arguments must be BufferWithSegments'): + zstd.BufferWithSegmentsCollection(zstd.BufferWithSegments(b'foo', ss.pack(0, 3)), + None) + + with self.assertRaisesRegexp(ValueError, 'ZstdBufferWithSegments cannot be empty'): + zstd.BufferWithSegmentsCollection(zstd.BufferWithSegments(b'', b'')) + + def test_length(self): + b1 = zstd.BufferWithSegments(b'foo', ss.pack(0, 3)) + b2 = zstd.BufferWithSegments(b'barbaz', b''.join([ss.pack(0, 3), + ss.pack(3, 3)])) + + c = zstd.BufferWithSegmentsCollection(b1) + self.assertEqual(len(c), 1) + self.assertEqual(c.size(), 3) + + c = zstd.BufferWithSegmentsCollection(b2) + self.assertEqual(len(c), 2) + self.assertEqual(c.size(), 6) + + c = zstd.BufferWithSegmentsCollection(b1, b2) + self.assertEqual(len(c), 3) + self.assertEqual(c.size(), 9) + + def test_getitem(self): + b1 = zstd.BufferWithSegments(b'foo', ss.pack(0, 3)) + b2 = zstd.BufferWithSegments(b'barbaz', b''.join([ss.pack(0, 3), + ss.pack(3, 3)])) + + c = zstd.BufferWithSegmentsCollection(b1, b2) + + with self.assertRaisesRegexp(IndexError, 'offset must be less than 3'): + c[3] + + with self.assertRaisesRegexp(IndexError, 'offset must be less than 3'): + c[4] + + self.assertEqual(c[0].tobytes(), b'foo') + self.assertEqual(c[1].tobytes(), b'bar') + self.assertEqual(c[2].tobytes(), b'baz') diff --git a/contrib/python-zstandard/tests/test_compressor.py b/contrib/python-zstandard/tests/test_compressor.py --- a/contrib/python-zstandard/tests/test_compressor.py +++ b/contrib/python-zstandard/tests/test_compressor.py @@ -22,6 +22,12 @@ else: next = lambda it: it.next() +def multithreaded_chunk_size(level, source_size=0): + params = zstd.get_compression_parameters(level, source_size) + + return 1 << (params.window_log + 2) + + @make_cffi class TestCompressor(unittest.TestCase): def test_level_bounds(self): @@ -34,6 +40,24 @@ class TestCompressor(unittest.TestCase): @make_cffi class TestCompressor_compress(unittest.TestCase): + def test_multithreaded_unsupported(self): + samples = [] + for i in range(128): + samples.append(b'foo' * 64) + samples.append(b'bar' * 64) + + d = zstd.train_dictionary(8192, samples) + + cctx = zstd.ZstdCompressor(dict_data=d, threads=2) + + with self.assertRaisesRegexp(zstd.ZstdError, 'compress\(\) cannot be used with both dictionaries and multi-threaded compression'): + cctx.compress(b'foo') + + params = zstd.get_compression_parameters(3) + cctx = zstd.ZstdCompressor(compression_params=params, threads=2) + with self.assertRaisesRegexp(zstd.ZstdError, 'compress\(\) cannot be used with both compression parameters and multi-threaded compression'): + cctx.compress(b'foo') + def test_compress_empty(self): cctx = zstd.ZstdCompressor(level=1) result = cctx.compress(b'') @@ -132,6 +156,21 @@ class TestCompressor_compress(unittest.T for i in range(32): cctx.compress(b'foo bar foobar foo bar foobar') + def test_multithreaded(self): + chunk_size = multithreaded_chunk_size(1) + source = b''.join([b'x' * chunk_size, b'y' * chunk_size]) + + cctx = zstd.ZstdCompressor(level=1, threads=2) + compressed = cctx.compress(source) + + params = zstd.get_frame_parameters(compressed) + self.assertEqual(params.content_size, chunk_size * 2) + self.assertEqual(params.dict_id, 0) + self.assertFalse(params.has_checksum) + + dctx = zstd.ZstdDecompressor() + self.assertEqual(dctx.decompress(compressed), source) + @make_cffi class TestCompressor_compressobj(unittest.TestCase): @@ -237,6 +276,30 @@ class TestCompressor_compressobj(unittes header = trailing[0:3] self.assertEqual(header, b'\x01\x00\x00') + def test_multithreaded(self): + source = io.BytesIO() + source.write(b'a' * 1048576) + source.write(b'b' * 1048576) + source.write(b'c' * 1048576) + source.seek(0) + + cctx = zstd.ZstdCompressor(level=1, threads=2) + cobj = cctx.compressobj() + + chunks = [] + while True: + d = source.read(8192) + if not d: + break + + chunks.append(cobj.compress(d)) + + chunks.append(cobj.flush()) + + compressed = b''.join(chunks) + + self.assertEqual(len(compressed), 295) + @make_cffi class TestCompressor_copy_stream(unittest.TestCase): @@ -355,6 +418,36 @@ class TestCompressor_copy_stream(unittes self.assertEqual(source._read_count, len(source.getvalue()) + 1) self.assertEqual(dest._write_count, len(dest.getvalue())) + def test_multithreaded(self): + source = io.BytesIO() + source.write(b'a' * 1048576) + source.write(b'b' * 1048576) + source.write(b'c' * 1048576) + source.seek(0) + + dest = io.BytesIO() + cctx = zstd.ZstdCompressor(threads=2) + r, w = cctx.copy_stream(source, dest) + self.assertEqual(r, 3145728) + self.assertEqual(w, 295) + + params = zstd.get_frame_parameters(dest.getvalue()) + self.assertEqual(params.content_size, 0) + self.assertEqual(params.dict_id, 0) + self.assertFalse(params.has_checksum) + + # Writing content size and checksum works. + cctx = zstd.ZstdCompressor(threads=2, write_content_size=True, + write_checksum=True) + dest = io.BytesIO() + source.seek(0) + cctx.copy_stream(source, dest, size=len(source.getvalue())) + + params = zstd.get_frame_parameters(dest.getvalue()) + self.assertEqual(params.content_size, 3145728) + self.assertEqual(params.dict_id, 0) + self.assertTrue(params.has_checksum) + def compress(data, level): buffer = io.BytesIO() @@ -584,6 +677,16 @@ class TestCompressor_write_to(unittest.T header = trailing[0:3] self.assertEqual(header, b'\x01\x00\x00') + def test_multithreaded(self): + dest = io.BytesIO() + cctx = zstd.ZstdCompressor(threads=2) + with cctx.write_to(dest) as compressor: + compressor.write(b'a' * 1048576) + compressor.write(b'b' * 1048576) + compressor.write(b'c' * 1048576) + + self.assertEqual(len(dest.getvalue()), 295) + @make_cffi class TestCompressor_read_from(unittest.TestCase): @@ -673,3 +776,130 @@ class TestCompressor_read_from(unittest. self.assertEqual(len(chunk), 1) self.assertEqual(source._read_count, len(source.getvalue()) + 1) + + def test_multithreaded(self): + source = io.BytesIO() + source.write(b'a' * 1048576) + source.write(b'b' * 1048576) + source.write(b'c' * 1048576) + source.seek(0) + + cctx = zstd.ZstdCompressor(threads=2) + + compressed = b''.join(cctx.read_from(source)) + self.assertEqual(len(compressed), 295) + + +class TestCompressor_multi_compress_to_buffer(unittest.TestCase): + def test_multithreaded_unsupported(self): + cctx = zstd.ZstdCompressor(threads=2) + + with self.assertRaisesRegexp(zstd.ZstdError, 'function cannot be called on ZstdCompressor configured for multi-threaded compression'): + cctx.multi_compress_to_buffer([b'foo']) + + def test_invalid_inputs(self): + cctx = zstd.ZstdCompressor() + + with self.assertRaises(TypeError): + cctx.multi_compress_to_buffer(True) + + with self.assertRaises(TypeError): + cctx.multi_compress_to_buffer((1, 2)) + + with self.assertRaisesRegexp(TypeError, 'item 0 not a bytes like object'): + cctx.multi_compress_to_buffer([u'foo']) + + def test_empty_input(self): + cctx = zstd.ZstdCompressor() + + with self.assertRaisesRegexp(ValueError, 'no source elements found'): + cctx.multi_compress_to_buffer([]) + + with self.assertRaisesRegexp(ValueError, 'source elements are empty'): + cctx.multi_compress_to_buffer([b'', b'', b'']) + + def test_list_input(self): + cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True) + + original = [b'foo' * 12, b'bar' * 6] + frames = [cctx.compress(c) for c in original] + b = cctx.multi_compress_to_buffer(original) + + self.assertIsInstance(b, zstd.BufferWithSegmentsCollection) + + self.assertEqual(len(b), 2) + self.assertEqual(b.size(), 44) + + self.assertEqual(b[0].tobytes(), frames[0]) + self.assertEqual(b[1].tobytes(), frames[1]) + + def test_buffer_with_segments_input(self): + cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True) + + original = [b'foo' * 4, b'bar' * 6] + frames = [cctx.compress(c) for c in original] + + offsets = struct.pack('=QQQQ', 0, len(original[0]), + len(original[0]), len(original[1])) + segments = zstd.BufferWithSegments(b''.join(original), offsets) + + result = cctx.multi_compress_to_buffer(segments) + + self.assertEqual(len(result), 2) + self.assertEqual(result.size(), 47) + + self.assertEqual(result[0].tobytes(), frames[0]) + self.assertEqual(result[1].tobytes(), frames[1]) + + def test_buffer_with_segments_collection_input(self): + cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True) + + original = [ + b'foo1', + b'foo2' * 2, + b'foo3' * 3, + b'foo4' * 4, + b'foo5' * 5, + ] + + frames = [cctx.compress(c) for c in original] + + b = b''.join([original[0], original[1]]) + b1 = zstd.BufferWithSegments(b, struct.pack('=QQQQ', + 0, len(original[0]), + len(original[0]), len(original[1]))) + b = b''.join([original[2], original[3], original[4]]) + b2 = zstd.BufferWithSegments(b, struct.pack('=QQQQQQ', + 0, len(original[2]), + len(original[2]), len(original[3]), + len(original[2]) + len(original[3]), len(original[4]))) + + c = zstd.BufferWithSegmentsCollection(b1, b2) + + result = cctx.multi_compress_to_buffer(c) + + self.assertEqual(len(result), len(frames)) + + for i, frame in enumerate(frames): + self.assertEqual(result[i].tobytes(), frame) + + def test_multiple_threads(self): + # threads argument will cause multi-threaded ZSTD APIs to be used, which will + # make output different. + refcctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True) + reference = [refcctx.compress(b'x' * 64), refcctx.compress(b'y' * 64)] + + cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True) + + frames = [] + frames.extend(b'x' * 64 for i in range(256)) + frames.extend(b'y' * 64 for i in range(256)) + + result = cctx.multi_compress_to_buffer(frames, threads=-1) + + self.assertEqual(len(result), 512) + for i in range(512): + if i < 256: + self.assertEqual(result[i].tobytes(), reference[0]) + else: + self.assertEqual(result[i].tobytes(), reference[1]) diff --git a/contrib/python-zstandard/tests/test_compressor_fuzzing.py b/contrib/python-zstandard/tests/test_compressor_fuzzing.py new file mode 100644 --- /dev/null +++ b/contrib/python-zstandard/tests/test_compressor_fuzzing.py @@ -0,0 +1,143 @@ +import io +import os + +try: + import unittest2 as unittest +except ImportError: + import unittest + +try: + import hypothesis + import hypothesis.strategies as strategies +except ImportError: + raise unittest.SkipTest('hypothesis not available') + +import zstd + +from . common import ( + make_cffi, + random_input_data, +) + + +@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +@make_cffi +class TestCompressor_write_to_fuzzing(unittest.TestCase): + @hypothesis.given(original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + write_size=strategies.integers(min_value=1, max_value=1048576)) + def test_write_size_variance(self, original, level, write_size): + refctx = zstd.ZstdCompressor(level=level) + ref_frame = refctx.compress(original) + + cctx = zstd.ZstdCompressor(level=level) + b = io.BytesIO() + with cctx.write_to(b, size=len(original), write_size=write_size) as compressor: + compressor.write(original) + + self.assertEqual(b.getvalue(), ref_frame) + + +@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +@make_cffi +class TestCompressor_copy_stream_fuzzing(unittest.TestCase): + @hypothesis.given(original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + read_size=strategies.integers(min_value=1, max_value=1048576), + write_size=strategies.integers(min_value=1, max_value=1048576)) + def test_read_write_size_variance(self, original, level, read_size, write_size): + refctx = zstd.ZstdCompressor(level=level) + ref_frame = refctx.compress(original) + + cctx = zstd.ZstdCompressor(level=level) + source = io.BytesIO(original) + dest = io.BytesIO() + + cctx.copy_stream(source, dest, size=len(original), read_size=read_size, + write_size=write_size) + + self.assertEqual(dest.getvalue(), ref_frame) + + +@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +@make_cffi +class TestCompressor_compressobj_fuzzing(unittest.TestCase): + @hypothesis.given(original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + chunk_sizes=strategies.streaming( + strategies.integers(min_value=1, max_value=4096))) + def test_random_input_sizes(self, original, level, chunk_sizes): + chunk_sizes = iter(chunk_sizes) + + refctx = zstd.ZstdCompressor(level=level) + ref_frame = refctx.compress(original) + + cctx = zstd.ZstdCompressor(level=level) + cobj = cctx.compressobj(size=len(original)) + + chunks = [] + i = 0 + while True: + chunk_size = next(chunk_sizes) + source = original[i:i + chunk_size] + if not source: + break + + chunks.append(cobj.compress(source)) + i += chunk_size + + chunks.append(cobj.flush()) + + self.assertEqual(b''.join(chunks), ref_frame) + + +@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +@make_cffi +class TestCompressor_read_from_fuzzing(unittest.TestCase): + @hypothesis.given(original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + read_size=strategies.integers(min_value=1, max_value=4096), + write_size=strategies.integers(min_value=1, max_value=4096)) + def test_read_write_size_variance(self, original, level, read_size, write_size): + refcctx = zstd.ZstdCompressor(level=level) + ref_frame = refcctx.compress(original) + + source = io.BytesIO(original) + + cctx = zstd.ZstdCompressor(level=level) + chunks = list(cctx.read_from(source, size=len(original), read_size=read_size, + write_size=write_size)) + + self.assertEqual(b''.join(chunks), ref_frame) + + +@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +class TestCompressor_multi_compress_to_buffer_fuzzing(unittest.TestCase): + @hypothesis.given(original=strategies.lists(strategies.sampled_from(random_input_data()), + min_size=1, max_size=1024), + threads=strategies.integers(min_value=1, max_value=8), + use_dict=strategies.booleans()) + def test_data_equivalence(self, original, threads, use_dict): + kwargs = {} + + # Use a content dictionary because it is cheap to create. + if use_dict: + kwargs['dict_data'] = zstd.ZstdCompressionDict(original[0]) + + cctx = zstd.ZstdCompressor(level=1, + write_content_size=True, + write_checksum=True, + **kwargs) + + result = cctx.multi_compress_to_buffer(original, threads=-1) + + self.assertEqual(len(result), len(original)) + + # The frame produced via the batch APIs may not be bit identical to that + # produced by compress() because compression parameters are adjusted + # from the first input in batch mode. So the only thing we can do is + # verify the decompressed data matches the input. + dctx = zstd.ZstdDecompressor(**kwargs) + + for i, frame in enumerate(result): + self.assertEqual(dctx.decompress(frame), original[i]) diff --git a/contrib/python-zstandard/tests/test_data_structures.py b/contrib/python-zstandard/tests/test_data_structures.py --- a/contrib/python-zstandard/tests/test_data_structures.py +++ b/contrib/python-zstandard/tests/test_data_structures.py @@ -1,16 +1,8 @@ -import io - try: import unittest2 as unittest except ImportError: import unittest -try: - import hypothesis - import hypothesis.strategies as strategies -except ImportError: - hypothesis = None - import zstd from . common import ( @@ -32,7 +24,7 @@ class TestCompressionParameters(unittest zstd.CHAINLOG_MIN, zstd.HASHLOG_MIN, zstd.SEARCHLOG_MIN, - zstd.SEARCHLENGTH_MIN, + zstd.SEARCHLENGTH_MIN + 1, zstd.TARGETLENGTH_MIN, zstd.STRATEGY_FAST) @@ -40,7 +32,7 @@ class TestCompressionParameters(unittest zstd.CHAINLOG_MAX, zstd.HASHLOG_MAX, zstd.SEARCHLOG_MAX, - zstd.SEARCHLENGTH_MAX, + zstd.SEARCHLENGTH_MAX - 1, zstd.TARGETLENGTH_MAX, zstd.STRATEGY_BTOPT) @@ -60,6 +52,13 @@ class TestCompressionParameters(unittest self.assertEqual(p.target_length, 8) self.assertEqual(p.strategy, 1) + def test_estimated_compression_context_size(self): + p = zstd.CompressionParameters(20, 16, 17, 1, 5, 16, zstd.STRATEGY_DFAST) + + # 32-bit has slightly different values from 64-bit. + self.assertAlmostEqual(p.estimated_compression_context_size(), 1287076, + delta=110) + @make_cffi class TestFrameParameters(unittest.TestCase): @@ -122,65 +121,3 @@ class TestFrameParameters(unittest.TestC self.assertEqual(params.window_size, 262144) self.assertEqual(params.dict_id, 15) self.assertTrue(params.has_checksum) - - -if hypothesis: - s_windowlog = strategies.integers(min_value=zstd.WINDOWLOG_MIN, - max_value=zstd.WINDOWLOG_MAX) - s_chainlog = strategies.integers(min_value=zstd.CHAINLOG_MIN, - max_value=zstd.CHAINLOG_MAX) - s_hashlog = strategies.integers(min_value=zstd.HASHLOG_MIN, - max_value=zstd.HASHLOG_MAX) - s_searchlog = strategies.integers(min_value=zstd.SEARCHLOG_MIN, - max_value=zstd.SEARCHLOG_MAX) - s_searchlength = strategies.integers(min_value=zstd.SEARCHLENGTH_MIN, - max_value=zstd.SEARCHLENGTH_MAX) - s_targetlength = strategies.integers(min_value=zstd.TARGETLENGTH_MIN, - max_value=zstd.TARGETLENGTH_MAX) - s_strategy = strategies.sampled_from((zstd.STRATEGY_FAST, - zstd.STRATEGY_DFAST, - zstd.STRATEGY_GREEDY, - zstd.STRATEGY_LAZY, - zstd.STRATEGY_LAZY2, - zstd.STRATEGY_BTLAZY2, - zstd.STRATEGY_BTOPT)) - - - @make_cffi - class TestCompressionParametersHypothesis(unittest.TestCase): - @hypothesis.given(s_windowlog, s_chainlog, s_hashlog, s_searchlog, - s_searchlength, s_targetlength, s_strategy) - def test_valid_init(self, windowlog, chainlog, hashlog, searchlog, - searchlength, targetlength, strategy): - p = zstd.CompressionParameters(windowlog, chainlog, hashlog, - searchlog, searchlength, - targetlength, strategy) - - # Verify we can instantiate a compressor with the supplied values. - # ZSTD_checkCParams moves the goal posts on us from what's advertised - # in the constants. So move along with them. - if searchlength == zstd.SEARCHLENGTH_MIN and strategy in (zstd.STRATEGY_FAST, zstd.STRATEGY_GREEDY): - searchlength += 1 - p = zstd.CompressionParameters(windowlog, chainlog, hashlog, - searchlog, searchlength, - targetlength, strategy) - elif searchlength == zstd.SEARCHLENGTH_MAX and strategy != zstd.STRATEGY_FAST: - searchlength -= 1 - p = zstd.CompressionParameters(windowlog, chainlog, hashlog, - searchlog, searchlength, - targetlength, strategy) - - cctx = zstd.ZstdCompressor(compression_params=p) - with cctx.write_to(io.BytesIO()): - pass - - @hypothesis.given(s_windowlog, s_chainlog, s_hashlog, s_searchlog, - s_searchlength, s_targetlength, s_strategy) - def test_estimate_compression_context_size(self, windowlog, chainlog, - hashlog, searchlog, - searchlength, targetlength, - strategy): - p = zstd.CompressionParameters(windowlog, chainlog, hashlog, - searchlog, searchlength, - targetlength, strategy) - size = zstd.estimate_compression_context_size(p) diff --git a/contrib/python-zstandard/tests/test_data_structures_fuzzing.py b/contrib/python-zstandard/tests/test_data_structures_fuzzing.py new file mode 100644 --- /dev/null +++ b/contrib/python-zstandard/tests/test_data_structures_fuzzing.py @@ -0,0 +1,79 @@ +import io +import os + +try: + import unittest2 as unittest +except ImportError: + import unittest + +try: + import hypothesis + import hypothesis.strategies as strategies +except ImportError: + raise unittest.SkipTest('hypothesis not available') + +import zstd + +from .common import ( + make_cffi, +) + + +s_windowlog = strategies.integers(min_value=zstd.WINDOWLOG_MIN, + max_value=zstd.WINDOWLOG_MAX) +s_chainlog = strategies.integers(min_value=zstd.CHAINLOG_MIN, + max_value=zstd.CHAINLOG_MAX) +s_hashlog = strategies.integers(min_value=zstd.HASHLOG_MIN, + max_value=zstd.HASHLOG_MAX) +s_searchlog = strategies.integers(min_value=zstd.SEARCHLOG_MIN, + max_value=zstd.SEARCHLOG_MAX) +s_searchlength = strategies.integers(min_value=zstd.SEARCHLENGTH_MIN, + max_value=zstd.SEARCHLENGTH_MAX) +s_targetlength = strategies.integers(min_value=zstd.TARGETLENGTH_MIN, + max_value=zstd.TARGETLENGTH_MAX) +s_strategy = strategies.sampled_from((zstd.STRATEGY_FAST, + zstd.STRATEGY_DFAST, + zstd.STRATEGY_GREEDY, + zstd.STRATEGY_LAZY, + zstd.STRATEGY_LAZY2, + zstd.STRATEGY_BTLAZY2, + zstd.STRATEGY_BTOPT)) + + +@make_cffi +@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +class TestCompressionParametersHypothesis(unittest.TestCase): + @hypothesis.given(s_windowlog, s_chainlog, s_hashlog, s_searchlog, + s_searchlength, s_targetlength, s_strategy) + def test_valid_init(self, windowlog, chainlog, hashlog, searchlog, + searchlength, targetlength, strategy): + # ZSTD_checkCParams moves the goal posts on us from what's advertised + # in the constants. So move along with them. + if searchlength == zstd.SEARCHLENGTH_MIN and strategy in (zstd.STRATEGY_FAST, zstd.STRATEGY_GREEDY): + searchlength += 1 + elif searchlength == zstd.SEARCHLENGTH_MAX and strategy != zstd.STRATEGY_FAST: + searchlength -= 1 + + p = zstd.CompressionParameters(windowlog, chainlog, hashlog, + searchlog, searchlength, + targetlength, strategy) + + cctx = zstd.ZstdCompressor(compression_params=p) + with cctx.write_to(io.BytesIO()): + pass + + @hypothesis.given(s_windowlog, s_chainlog, s_hashlog, s_searchlog, + s_searchlength, s_targetlength, s_strategy) + def test_estimate_compression_context_size(self, windowlog, chainlog, + hashlog, searchlog, + searchlength, targetlength, + strategy): + if searchlength == zstd.SEARCHLENGTH_MIN and strategy in (zstd.STRATEGY_FAST, zstd.STRATEGY_GREEDY): + searchlength += 1 + elif searchlength == zstd.SEARCHLENGTH_MAX and strategy != zstd.STRATEGY_FAST: + searchlength -= 1 + + p = zstd.CompressionParameters(windowlog, chainlog, hashlog, + searchlog, searchlength, + targetlength, strategy) + size = zstd.estimate_compression_context_size(p) diff --git a/contrib/python-zstandard/tests/test_decompressor.py b/contrib/python-zstandard/tests/test_decompressor.py --- a/contrib/python-zstandard/tests/test_decompressor.py +++ b/contrib/python-zstandard/tests/test_decompressor.py @@ -49,7 +49,7 @@ class TestDecompressor_decompress(unitte compressed = cctx.compress(b'foobar') dctx = zstd.ZstdDecompressor() - decompressed = dctx.decompress(compressed) + decompressed = dctx.decompress(compressed) self.assertEqual(decompressed, b'foobar') def test_max_output_size(self): @@ -293,7 +293,6 @@ class TestDecompressor_write_to(unittest c = s.pack(c) decompressor.write(c) - self.assertEqual(dest.getvalue(), b'foobarfoobar') self.assertEqual(dest._write_count, len(dest.getvalue())) @@ -575,3 +574,168 @@ class TestDecompressor_content_dict_chai dctx = zstd.ZstdDecompressor() decompressed = dctx.decompress_content_dict_chain(chain) self.assertEqual(decompressed, expected) + + +# TODO enable for CFFI +class TestDecompressor_multi_decompress_to_buffer(unittest.TestCase): + def test_invalid_inputs(self): + dctx = zstd.ZstdDecompressor() + + with self.assertRaises(TypeError): + dctx.multi_decompress_to_buffer(True) + + with self.assertRaises(TypeError): + dctx.multi_decompress_to_buffer((1, 2)) + + with self.assertRaisesRegexp(TypeError, 'item 0 not a bytes like object'): + dctx.multi_decompress_to_buffer([u'foo']) + + with self.assertRaisesRegexp(ValueError, 'could not determine decompressed size of item 0'): + dctx.multi_decompress_to_buffer([b'foobarbaz']) + + def test_list_input(self): + cctx = zstd.ZstdCompressor(write_content_size=True) + + original = [b'foo' * 4, b'bar' * 6] + frames = [cctx.compress(d) for d in original] + + dctx = zstd.ZstdDecompressor() + result = dctx.multi_decompress_to_buffer(frames) + + self.assertEqual(len(result), len(frames)) + self.assertEqual(result.size(), sum(map(len, original))) + + for i, data in enumerate(original): + self.assertEqual(result[i].tobytes(), data) + + self.assertEqual(result[0].offset, 0) + self.assertEqual(len(result[0]), 12) + self.assertEqual(result[1].offset, 12) + self.assertEqual(len(result[1]), 18) + + def test_list_input_frame_sizes(self): + cctx = zstd.ZstdCompressor(write_content_size=False) + + original = [b'foo' * 4, b'bar' * 6, b'baz' * 8] + frames = [cctx.compress(d) for d in original] + sizes = struct.pack('=' + 'Q' * len(original), *map(len, original)) + + dctx = zstd.ZstdDecompressor() + result = dctx.multi_decompress_to_buffer(frames, decompressed_sizes=sizes) + + self.assertEqual(len(result), len(frames)) + self.assertEqual(result.size(), sum(map(len, original))) + + for i, data in enumerate(original): + self.assertEqual(result[i].tobytes(), data) + + def test_buffer_with_segments_input(self): + cctx = zstd.ZstdCompressor(write_content_size=True) + + original = [b'foo' * 4, b'bar' * 6] + frames = [cctx.compress(d) for d in original] + + dctx = zstd.ZstdDecompressor() + + segments = struct.pack('=QQQQ', 0, len(frames[0]), len(frames[0]), len(frames[1])) + b = zstd.BufferWithSegments(b''.join(frames), segments) + + result = dctx.multi_decompress_to_buffer(b) + + self.assertEqual(len(result), len(frames)) + self.assertEqual(result[0].offset, 0) + self.assertEqual(len(result[0]), 12) + self.assertEqual(result[1].offset, 12) + self.assertEqual(len(result[1]), 18) + + def test_buffer_with_segments_sizes(self): + cctx = zstd.ZstdCompressor(write_content_size=False) + original = [b'foo' * 4, b'bar' * 6, b'baz' * 8] + frames = [cctx.compress(d) for d in original] + sizes = struct.pack('=' + 'Q' * len(original), *map(len, original)) + + segments = struct.pack('=QQQQQQ', 0, len(frames[0]), + len(frames[0]), len(frames[1]), + len(frames[0]) + len(frames[1]), len(frames[2])) + b = zstd.BufferWithSegments(b''.join(frames), segments) + + dctx = zstd.ZstdDecompressor() + result = dctx.multi_decompress_to_buffer(b, decompressed_sizes=sizes) + + self.assertEqual(len(result), len(frames)) + self.assertEqual(result.size(), sum(map(len, original))) + + for i, data in enumerate(original): + self.assertEqual(result[i].tobytes(), data) + + def test_buffer_with_segments_collection_input(self): + cctx = zstd.ZstdCompressor(write_content_size=True) + + original = [ + b'foo0' * 2, + b'foo1' * 3, + b'foo2' * 4, + b'foo3' * 5, + b'foo4' * 6, + ] + + frames = cctx.multi_compress_to_buffer(original) + + # Check round trip. + dctx = zstd.ZstdDecompressor() + decompressed = dctx.multi_decompress_to_buffer(frames, threads=3) + + self.assertEqual(len(decompressed), len(original)) + + for i, data in enumerate(original): + self.assertEqual(data, decompressed[i].tobytes()) + + # And a manual mode. + b = b''.join([frames[0].tobytes(), frames[1].tobytes()]) + b1 = zstd.BufferWithSegments(b, struct.pack('=QQQQ', + 0, len(frames[0]), + len(frames[0]), len(frames[1]))) + + b = b''.join([frames[2].tobytes(), frames[3].tobytes(), frames[4].tobytes()]) + b2 = zstd.BufferWithSegments(b, struct.pack('=QQQQQQ', + 0, len(frames[2]), + len(frames[2]), len(frames[3]), + len(frames[2]) + len(frames[3]), len(frames[4]))) + + c = zstd.BufferWithSegmentsCollection(b1, b2) + + dctx = zstd.ZstdDecompressor() + decompressed = dctx.multi_decompress_to_buffer(c) + + self.assertEqual(len(decompressed), 5) + for i in range(5): + self.assertEqual(decompressed[i].tobytes(), original[i]) + + def test_multiple_threads(self): + cctx = zstd.ZstdCompressor(write_content_size=True) + + frames = [] + frames.extend(cctx.compress(b'x' * 64) for i in range(256)) + frames.extend(cctx.compress(b'y' * 64) for i in range(256)) + + dctx = zstd.ZstdDecompressor() + result = dctx.multi_decompress_to_buffer(frames, threads=-1) + + self.assertEqual(len(result), len(frames)) + self.assertEqual(result.size(), 2 * 64 * 256) + self.assertEqual(result[0].tobytes(), b'x' * 64) + self.assertEqual(result[256].tobytes(), b'y' * 64) + + def test_item_failure(self): + cctx = zstd.ZstdCompressor(write_content_size=True) + frames = [cctx.compress(b'x' * 128), cctx.compress(b'y' * 128)] + + frames[1] = frames[1] + b'extra' + + dctx = zstd.ZstdDecompressor() + + with self.assertRaisesRegexp(zstd.ZstdError, 'error decompressing item 1: Src size incorrect'): + dctx.multi_decompress_to_buffer(frames) + + with self.assertRaisesRegexp(zstd.ZstdError, 'error decompressing item 1: Src size incorrect'): + dctx.multi_decompress_to_buffer(frames, threads=2) diff --git a/contrib/python-zstandard/tests/test_decompressor_fuzzing.py b/contrib/python-zstandard/tests/test_decompressor_fuzzing.py new file mode 100644 --- /dev/null +++ b/contrib/python-zstandard/tests/test_decompressor_fuzzing.py @@ -0,0 +1,151 @@ +import io +import os + +try: + import unittest2 as unittest +except ImportError: + import unittest + +try: + import hypothesis + import hypothesis.strategies as strategies +except ImportError: + raise unittest.SkipTest('hypothesis not available') + +import zstd + +from . common import ( + make_cffi, + random_input_data, +) + + +@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +@make_cffi +class TestDecompressor_write_to_fuzzing(unittest.TestCase): + @hypothesis.given(original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + write_size=strategies.integers(min_value=1, max_value=8192), + input_sizes=strategies.streaming( + strategies.integers(min_value=1, max_value=4096))) + def test_write_size_variance(self, original, level, write_size, input_sizes): + input_sizes = iter(input_sizes) + + cctx = zstd.ZstdCompressor(level=level) + frame = cctx.compress(original) + + dctx = zstd.ZstdDecompressor() + source = io.BytesIO(frame) + dest = io.BytesIO() + + with dctx.write_to(dest, write_size=write_size) as decompressor: + while True: + chunk = source.read(next(input_sizes)) + if not chunk: + break + + decompressor.write(chunk) + + self.assertEqual(dest.getvalue(), original) + + +@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +@make_cffi +class TestDecompressor_copy_stream_fuzzing(unittest.TestCase): + @hypothesis.given(original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + read_size=strategies.integers(min_value=1, max_value=8192), + write_size=strategies.integers(min_value=1, max_value=8192)) + def test_read_write_size_variance(self, original, level, read_size, write_size): + cctx = zstd.ZstdCompressor(level=level) + frame = cctx.compress(original) + + source = io.BytesIO(frame) + dest = io.BytesIO() + + dctx = zstd.ZstdDecompressor() + dctx.copy_stream(source, dest, read_size=read_size, write_size=write_size) + + self.assertEqual(dest.getvalue(), original) + + +@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +@make_cffi +class TestDecompressor_decompressobj_fuzzing(unittest.TestCase): + @hypothesis.given(original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + chunk_sizes=strategies.streaming( + strategies.integers(min_value=1, max_value=4096))) + def test_random_input_sizes(self, original, level, chunk_sizes): + chunk_sizes = iter(chunk_sizes) + + cctx = zstd.ZstdCompressor(level=level) + frame = cctx.compress(original) + + source = io.BytesIO(frame) + + dctx = zstd.ZstdDecompressor() + dobj = dctx.decompressobj() + + chunks = [] + while True: + chunk = source.read(next(chunk_sizes)) + if not chunk: + break + + chunks.append(dobj.decompress(chunk)) + + self.assertEqual(b''.join(chunks), original) + + +@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +@make_cffi +class TestDecompressor_read_from_fuzzing(unittest.TestCase): + @hypothesis.given(original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + read_size=strategies.integers(min_value=1, max_value=4096), + write_size=strategies.integers(min_value=1, max_value=4096)) + def test_read_write_size_variance(self, original, level, read_size, write_size): + cctx = zstd.ZstdCompressor(level=level) + frame = cctx.compress(original) + + source = io.BytesIO(frame) + + dctx = zstd.ZstdDecompressor() + chunks = list(dctx.read_from(source, read_size=read_size, write_size=write_size)) + + self.assertEqual(b''.join(chunks), original) + + +@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +class TestDecompressor_multi_decompress_to_buffer_fuzzing(unittest.TestCase): + @hypothesis.given(original=strategies.lists(strategies.sampled_from(random_input_data()), + min_size=1, max_size=1024), + threads=strategies.integers(min_value=1, max_value=8), + use_dict=strategies.booleans()) + def test_data_equivalence(self, original, threads, use_dict): + kwargs = {} + if use_dict: + kwargs['dict_data'] = zstd.ZstdCompressionDict(original[0]) + + cctx = zstd.ZstdCompressor(level=1, + write_content_size=True, + write_checksum=True, + **kwargs) + + frames_buffer = cctx.multi_compress_to_buffer(original, threads=-1) + + dctx = zstd.ZstdDecompressor(**kwargs) + + result = dctx.multi_decompress_to_buffer(frames_buffer) + + self.assertEqual(len(result), len(original)) + for i, frame in enumerate(result): + self.assertEqual(frame.tobytes(), original[i]) + + frames_list = [f.tobytes() for f in frames_buffer] + result = dctx.multi_decompress_to_buffer(frames_list) + + self.assertEqual(len(result), len(original)) + for i, frame in enumerate(result): + self.assertEqual(frame.tobytes(), original[i]) diff --git a/contrib/python-zstandard/tests/test_roundtrip.py b/contrib/python-zstandard/tests/test_roundtrip.py deleted file mode 100644 --- a/contrib/python-zstandard/tests/test_roundtrip.py +++ /dev/null @@ -1,68 +0,0 @@ -import io - -try: - import unittest2 as unittest -except ImportError: - import unittest - -try: - import hypothesis - import hypothesis.strategies as strategies -except ImportError: - raise unittest.SkipTest('hypothesis not available') - -import zstd - -from .common import ( - make_cffi, -) - -compression_levels = strategies.integers(min_value=1, max_value=22) - - -@make_cffi -class TestRoundTrip(unittest.TestCase): - @hypothesis.given(strategies.binary(), compression_levels) - def test_compress_write_to(self, data, level): - """Random data from compress() roundtrips via write_to.""" - cctx = zstd.ZstdCompressor(level=level) - compressed = cctx.compress(data) - - buffer = io.BytesIO() - dctx = zstd.ZstdDecompressor() - with dctx.write_to(buffer) as decompressor: - decompressor.write(compressed) - - self.assertEqual(buffer.getvalue(), data) - - @hypothesis.given(strategies.binary(), compression_levels) - def test_compressor_write_to_decompressor_write_to(self, data, level): - """Random data from compressor write_to roundtrips via write_to.""" - compress_buffer = io.BytesIO() - decompressed_buffer = io.BytesIO() - - cctx = zstd.ZstdCompressor(level=level) - with cctx.write_to(compress_buffer) as compressor: - compressor.write(data) - - dctx = zstd.ZstdDecompressor() - with dctx.write_to(decompressed_buffer) as decompressor: - decompressor.write(compress_buffer.getvalue()) - - self.assertEqual(decompressed_buffer.getvalue(), data) - - @hypothesis.given(strategies.binary(average_size=1048576)) - @hypothesis.settings(perform_health_check=False) - def test_compressor_write_to_decompressor_write_to_larger(self, data): - compress_buffer = io.BytesIO() - decompressed_buffer = io.BytesIO() - - cctx = zstd.ZstdCompressor(level=5) - with cctx.write_to(compress_buffer) as compressor: - compressor.write(data) - - dctx = zstd.ZstdDecompressor() - with dctx.write_to(decompressed_buffer) as decompressor: - decompressor.write(compress_buffer.getvalue()) - - self.assertEqual(decompressed_buffer.getvalue(), data) diff --git a/contrib/python-zstandard/tests/test_train_dictionary.py b/contrib/python-zstandard/tests/test_train_dictionary.py --- a/contrib/python-zstandard/tests/test_train_dictionary.py +++ b/contrib/python-zstandard/tests/test_train_dictionary.py @@ -48,3 +48,63 @@ class TestTrainDictionary(unittest.TestC data = d.as_bytes() self.assertEqual(data[0:4], b'\x37\xa4\x30\xec') + + def test_set_dict_id(self): + samples = [] + for i in range(128): + samples.append(b'foo' * 64) + samples.append(b'foobar' * 64) + + d = zstd.train_dictionary(8192, samples, dict_id=42) + self.assertEqual(d.dict_id(), 42) + + +@make_cffi +class TestTrainCoverDictionary(unittest.TestCase): + def test_no_args(self): + with self.assertRaises(TypeError): + zstd.train_cover_dictionary() + + def test_bad_args(self): + with self.assertRaises(TypeError): + zstd.train_cover_dictionary(8192, u'foo') + + with self.assertRaises(ValueError): + zstd.train_cover_dictionary(8192, [u'foo']) + + def test_basic(self): + samples = [] + for i in range(128): + samples.append(b'foo' * 64) + samples.append(b'foobar' * 64) + + d = zstd.train_cover_dictionary(8192, samples, k=64, d=16) + self.assertIsInstance(d.dict_id(), int_type) + + data = d.as_bytes() + self.assertEqual(data[0:4], b'\x37\xa4\x30\xec') + + self.assertEqual(d.k, 64) + self.assertEqual(d.d, 16) + + def test_set_dict_id(self): + samples = [] + for i in range(128): + samples.append(b'foo' * 64) + samples.append(b'foobar' * 64) + + d = zstd.train_cover_dictionary(8192, samples, k=64, d=16, + dict_id=42) + self.assertEqual(d.dict_id(), 42) + + def test_optimize(self): + samples = [] + for i in range(128): + samples.append(b'foo' * 64) + samples.append(b'foobar' * 64) + + d = zstd.train_cover_dictionary(8192, samples, optimize=True, + threads=-1, steps=1, d=16) + + self.assertEqual(d.k, 16) + self.assertEqual(d.d, 16) diff --git a/contrib/python-zstandard/zstd.c b/contrib/python-zstandard/zstd.c --- a/contrib/python-zstandard/zstd.c +++ b/contrib/python-zstandard/zstd.c @@ -8,6 +8,11 @@ /* A Python C extension for Zstandard. */ +#if defined(_WIN32) +#define WIN32_LEAN_AND_MEAN +#include +#endif + #include "python-zstandard.h" PyObject *ZstdError; @@ -49,9 +54,22 @@ PyDoc_STRVAR(train_dictionary__doc__, "\n" "The raw dictionary content will be returned\n"); +PyDoc_STRVAR(train_cover_dictionary__doc__, +"train_cover_dictionary(dict_size, samples, k=None, d=None, notifications=0, dict_id=0, level=0)\n" +"\n" +"Train a dictionary from sample data using the COVER algorithm.\n" +"\n" +"This behaves like ``train_dictionary()`` except a different algorithm is\n" +"used to create the dictionary. The algorithm has 2 parameters: ``k`` and\n" +"``d``. These control the *segment size* and *dmer size*. A reasonable range\n" +"for ``k`` is ``[16, 2048+]``. A reasonable range for ``d`` is ``[6, 16]``.\n" +"``d`` must be less than or equal to ``k``.\n" +); + static char zstd_doc[] = "Interface to zstandard"; static PyMethodDef zstd_methods[] = { + /* TODO remove since it is a method on CompressionParameters. */ { "estimate_compression_context_size", (PyCFunction)estimate_compression_context_size, METH_VARARGS, estimate_compression_context_size__doc__ }, { "estimate_decompression_context_size", (PyCFunction)estimate_decompression_context_size, @@ -62,14 +80,16 @@ static PyMethodDef zstd_methods[] = { METH_VARARGS, get_frame_parameters__doc__ }, { "train_dictionary", (PyCFunction)train_dictionary, METH_VARARGS | METH_KEYWORDS, train_dictionary__doc__ }, + { "train_cover_dictionary", (PyCFunction)train_cover_dictionary, + METH_VARARGS | METH_KEYWORDS, train_cover_dictionary__doc__ }, { NULL, NULL } }; +void bufferutil_module_init(PyObject* mod); void compressobj_module_init(PyObject* mod); void compressor_module_init(PyObject* mod); void compressionparams_module_init(PyObject* mod); void constants_module_init(PyObject* mod); -void dictparams_module_init(PyObject* mod); void compressiondict_module_init(PyObject* mod); void compressionwriter_module_init(PyObject* mod); void compressoriterator_module_init(PyObject* mod); @@ -100,8 +120,8 @@ void zstd_module_init(PyObject* m) { return; } + bufferutil_module_init(m); compressionparams_module_init(m); - dictparams_module_init(m); compressiondict_module_init(m); compressobj_module_init(m); compressor_module_init(m); @@ -143,3 +163,48 @@ PyMODINIT_FUNC initzstd(void) { } } #endif + +/* Attempt to resolve the number of CPUs in the system. */ +int cpu_count() { + int count = 0; + +#if defined(_WIN32) + SYSTEM_INFO si; + si.dwNumberOfProcessors = 0; + GetSystemInfo(&si); + count = si.dwNumberOfProcessors; +#elif defined(__APPLE__) + int num; + size_t size = sizeof(int); + + if (0 == sysctlbyname("hw.logicalcpu", &num, &size, NULL, 0)) { + count = num; + } +#elif defined(__linux__) + count = sysconf(_SC_NPROCESSORS_ONLN); +#elif defined(__OpenBSD__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__DragonFly__) + int mib[2]; + size_t len = sizeof(count); + mib[0] = CTL_HW; + mib[1] = HW_NCPU; + if (0 != sysctl(mib, 2, &count, &len, NULL, 0)) { + count = 0; + } +#elif defined(__hpux) + count = mpctl(MPC_GETNUMSPUS, NULL, NULL); +#endif + + return count; +} + +size_t roundpow2(size_t i) { + i--; + i |= i >> 1; + i |= i >> 2; + i |= i >> 4; + i |= i >> 8; + i |= i >> 16; + i++; + + return i; +} diff --git a/contrib/python-zstandard/zstd_cffi.py b/contrib/python-zstandard/zstd_cffi.py --- a/contrib/python-zstandard/zstd_cffi.py +++ b/contrib/python-zstandard/zstd_cffi.py @@ -8,6 +8,7 @@ from __future__ import absolute_import, unicode_literals +import os import sys from _zstd_cffi import ( @@ -62,6 +63,26 @@ COMPRESSOBJ_FLUSH_FINISH = 0 COMPRESSOBJ_FLUSH_BLOCK = 1 +def _cpu_count(): + # os.cpu_count() was introducd in Python 3.4. + try: + return os.cpu_count() or 0 + except AttributeError: + pass + + # Linux. + try: + if sys.version_info[0] == 2: + return os.sysconf(b'SC_NPROCESSORS_ONLN') + else: + return os.sysconf(u'SC_NPROCESSORS_ONLN') + except (AttributeError, ValueError): + pass + + # TODO implement on other platforms. + return 0 + + class ZstdError(Exception): pass @@ -98,6 +119,14 @@ class CompressionParameters(object): self.target_length = target_length self.strategy = strategy + zresult = lib.ZSTD_checkCParams(self.as_compression_parameters()) + if lib.ZSTD_isError(zresult): + raise ValueError('invalid compression parameters: %s', + ffi.string(lib.ZSTD_getErrorName(zresult))) + + def estimated_compression_context_size(self): + return lib.ZSTD_estimateCCtxSize(self.as_compression_parameters()) + def as_compression_parameters(self): p = ffi.new('ZSTD_compressionParameters *')[0] p.windowLog = self.window_log @@ -140,12 +169,16 @@ class ZstdCompressionWriter(object): self._source_size = source_size self._write_size = write_size self._entered = False + self._mtcctx = compressor._cctx if compressor._multithreaded else None def __enter__(self): if self._entered: raise ZstdError('cannot __enter__ multiple times') - self._cstream = self._compressor._get_cstream(self._source_size) + if self._mtcctx: + self._compressor._init_mtcstream(self._source_size) + else: + self._compressor._ensure_cstream(self._source_size) self._entered = True return self @@ -160,7 +193,10 @@ class ZstdCompressionWriter(object): out_buffer.pos = 0 while True: - zresult = lib.ZSTD_endStream(self._cstream, out_buffer) + if self._mtcctx: + zresult = lib.ZSTDMT_endStream(self._mtcctx, out_buffer) + else: + zresult = lib.ZSTD_endStream(self._compressor._cstream, out_buffer) if lib.ZSTD_isError(zresult): raise ZstdError('error ending compression stream: %s' % ffi.string(lib.ZSTD_getErrorName(zresult))) @@ -172,7 +208,6 @@ class ZstdCompressionWriter(object): if zresult == 0: break - self._cstream = None self._compressor = None return False @@ -182,7 +217,7 @@ class ZstdCompressionWriter(object): raise ZstdError('cannot determine size of an inactive compressor; ' 'call when a context manager is active') - return lib.ZSTD_sizeof_CStream(self._cstream) + return lib.ZSTD_sizeof_CStream(self._compressor._cstream) def write(self, data): if not self._entered: @@ -205,7 +240,12 @@ class ZstdCompressionWriter(object): out_buffer.pos = 0 while in_buffer.pos < in_buffer.size: - zresult = lib.ZSTD_compressStream(self._cstream, out_buffer, in_buffer) + if self._mtcctx: + zresult = lib.ZSTDMT_compressStream(self._mtcctx, out_buffer, + in_buffer) + else: + zresult = lib.ZSTD_compressStream(self._compressor._cstream, out_buffer, + in_buffer) if lib.ZSTD_isError(zresult): raise ZstdError('zstd compress error: %s' % ffi.string(lib.ZSTD_getErrorName(zresult))) @@ -230,7 +270,10 @@ class ZstdCompressionWriter(object): out_buffer.pos = 0 while True: - zresult = lib.ZSTD_flushStream(self._cstream, out_buffer) + if self._mtcctx: + zresult = lib.ZSTDMT_flushStream(self._mtcctx, out_buffer) + else: + zresult = lib.ZSTD_flushStream(self._compressor._cstream, out_buffer) if lib.ZSTD_isError(zresult): raise ZstdError('zstd compress error: %s' % ffi.string(lib.ZSTD_getErrorName(zresult))) @@ -259,7 +302,12 @@ class ZstdCompressionObj(object): chunks = [] while source.pos < len(data): - zresult = lib.ZSTD_compressStream(self._cstream, self._out, source) + if self._mtcctx: + zresult = lib.ZSTDMT_compressStream(self._mtcctx, + self._out, source) + else: + zresult = lib.ZSTD_compressStream(self._compressor._cstream, self._out, + source) if lib.ZSTD_isError(zresult): raise ZstdError('zstd compress error: %s' % ffi.string(lib.ZSTD_getErrorName(zresult))) @@ -280,7 +328,10 @@ class ZstdCompressionObj(object): assert self._out.pos == 0 if flush_mode == COMPRESSOBJ_FLUSH_BLOCK: - zresult = lib.ZSTD_flushStream(self._cstream, self._out) + if self._mtcctx: + zresult = lib.ZSTDMT_flushStream(self._mtcctx, self._out) + else: + zresult = lib.ZSTD_flushStream(self._compressor._cstream, self._out) if lib.ZSTD_isError(zresult): raise ZstdError('zstd compress error: %s' % ffi.string(lib.ZSTD_getErrorName(zresult))) @@ -301,7 +352,10 @@ class ZstdCompressionObj(object): chunks = [] while True: - zresult = lib.ZSTD_endStream(self._cstream, self._out) + if self._mtcctx: + zresult = lib.ZSTDMT_endStream(self._mtcctx, self._out) + else: + zresult = lib.ZSTD_endStream(self._compressor._cstream, self._out) if lib.ZSTD_isError(zresult): raise ZstdError('error ending compression stream: %s' % ffi.string(lib.ZSTD_getErroName(zresult))) @@ -313,21 +367,21 @@ class ZstdCompressionObj(object): if not zresult: break - # GC compression stream immediately. - self._cstream = None - return b''.join(chunks) class ZstdCompressor(object): def __init__(self, level=3, dict_data=None, compression_params=None, write_checksum=False, write_content_size=False, - write_dict_id=True): + write_dict_id=True, threads=0): if level < 1: raise ValueError('level must be greater than 0') elif level > lib.ZSTD_maxCLevel(): raise ValueError('level must be less than %d' % lib.ZSTD_maxCLevel()) + if threads < 0: + threads = _cpu_count() + self._compression_level = level self._dict_data = dict_data self._cparams = compression_params @@ -336,16 +390,33 @@ class ZstdCompressor(object): self._fparams.contentSizeFlag = write_content_size self._fparams.noDictIDFlag = not write_dict_id - cctx = lib.ZSTD_createCCtx() - if cctx == ffi.NULL: - raise MemoryError() + if threads: + cctx = lib.ZSTDMT_createCCtx(threads) + if cctx == ffi.NULL: + raise MemoryError() - self._cctx = ffi.gc(cctx, lib.ZSTD_freeCCtx) + self._cctx = ffi.gc(cctx, lib.ZSTDMT_freeCCtx) + self._multithreaded = True + else: + cctx = lib.ZSTD_createCCtx() + if cctx == ffi.NULL: + raise MemoryError() + + self._cctx = ffi.gc(cctx, lib.ZSTD_freeCCtx) + self._multithreaded = False + + self._cstream = None def compress(self, data, allow_empty=False): if len(data) == 0 and self._fparams.contentSizeFlag and not allow_empty: raise ValueError('cannot write empty inputs when writing content sizes') + if self._multithreaded and self._dict_data: + raise ZstdError('compress() cannot be used with both dictionaries and multi-threaded compression') + + if self._multithreaded and self._cparams: + raise ZstdError('compress() cannot be used with both compression parameters and multi-threaded compression') + # TODO use a CDict for performance. dict_data = ffi.NULL dict_size = 0 @@ -365,11 +436,17 @@ class ZstdCompressor(object): dest_size = lib.ZSTD_compressBound(len(data)) out = new_nonzero('char[]', dest_size) - zresult = lib.ZSTD_compress_advanced(self._cctx, - ffi.addressof(out), dest_size, - data, len(data), - dict_data, dict_size, - params) + if self._multithreaded: + zresult = lib.ZSTDMT_compressCCtx(self._cctx, + ffi.addressof(out), dest_size, + data, len(data), + self._compression_level) + else: + zresult = lib.ZSTD_compress_advanced(self._cctx, + ffi.addressof(out), dest_size, + data, len(data), + dict_data, dict_size, + params) if lib.ZSTD_isError(zresult): raise ZstdError('cannot compress: %s' % @@ -378,9 +455,12 @@ class ZstdCompressor(object): return ffi.buffer(out, zresult)[:] def compressobj(self, size=0): - cstream = self._get_cstream(size) + if self._multithreaded: + self._init_mtcstream(size) + else: + self._ensure_cstream(size) + cobj = ZstdCompressionObj() - cobj._cstream = cstream cobj._out = ffi.new('ZSTD_outBuffer *') cobj._dst_buffer = ffi.new('char[]', COMPRESSION_RECOMMENDED_OUTPUT_SIZE) cobj._out.dst = cobj._dst_buffer @@ -389,6 +469,11 @@ class ZstdCompressor(object): cobj._compressor = self cobj._finished = False + if self._multithreaded: + cobj._mtcctx = self._cctx + else: + cobj._mtcctx = None + return cobj def copy_stream(self, ifh, ofh, size=0, @@ -400,7 +485,11 @@ class ZstdCompressor(object): if not hasattr(ofh, 'write'): raise ValueError('second argument must have a write() method') - cstream = self._get_cstream(size) + mt = self._multithreaded + if mt: + self._init_mtcstream(size) + else: + self._ensure_cstream(size) in_buffer = ffi.new('ZSTD_inBuffer *') out_buffer = ffi.new('ZSTD_outBuffer *') @@ -424,7 +513,11 @@ class ZstdCompressor(object): in_buffer.pos = 0 while in_buffer.pos < in_buffer.size: - zresult = lib.ZSTD_compressStream(cstream, out_buffer, in_buffer) + if mt: + zresult = lib.ZSTDMT_compressStream(self._cctx, out_buffer, in_buffer) + else: + zresult = lib.ZSTD_compressStream(self._cstream, + out_buffer, in_buffer) if lib.ZSTD_isError(zresult): raise ZstdError('zstd compress error: %s' % ffi.string(lib.ZSTD_getErrorName(zresult))) @@ -436,7 +529,10 @@ class ZstdCompressor(object): # We've finished reading. Flush the compressor. while True: - zresult = lib.ZSTD_endStream(cstream, out_buffer) + if mt: + zresult = lib.ZSTDMT_endStream(self._cctx, out_buffer) + else: + zresult = lib.ZSTD_endStream(self._cstream, out_buffer) if lib.ZSTD_isError(zresult): raise ZstdError('error ending compression stream: %s' % ffi.string(lib.ZSTD_getErrorName(zresult))) @@ -472,7 +568,10 @@ class ZstdCompressor(object): raise ValueError('must pass an object with a read() method or ' 'conforms to buffer protocol') - cstream = self._get_cstream(size) + if self._multithreaded: + self._init_mtcstream(size) + else: + self._ensure_cstream(size) in_buffer = ffi.new('ZSTD_inBuffer *') out_buffer = ffi.new('ZSTD_outBuffer *') @@ -512,7 +611,10 @@ class ZstdCompressor(object): in_buffer.pos = 0 while in_buffer.pos < in_buffer.size: - zresult = lib.ZSTD_compressStream(cstream, out_buffer, in_buffer) + if self._multithreaded: + zresult = lib.ZSTDMT_compressStream(self._cctx, out_buffer, in_buffer) + else: + zresult = lib.ZSTD_compressStream(self._cstream, out_buffer, in_buffer) if lib.ZSTD_isError(zresult): raise ZstdError('zstd compress error: %s' % ffi.string(lib.ZSTD_getErrorName(zresult))) @@ -531,7 +633,10 @@ class ZstdCompressor(object): # remains. while True: assert out_buffer.pos == 0 - zresult = lib.ZSTD_endStream(cstream, out_buffer) + if self._multithreaded: + zresult = lib.ZSTDMT_endStream(self._cctx, out_buffer) + else: + zresult = lib.ZSTD_endStream(self._cstream, out_buffer) if lib.ZSTD_isError(zresult): raise ZstdError('error ending compression stream: %s' % ffi.string(lib.ZSTD_getErrorName(zresult))) @@ -544,7 +649,15 @@ class ZstdCompressor(object): if zresult == 0: break - def _get_cstream(self, size): + def _ensure_cstream(self, size): + if self._cstream: + zresult = lib.ZSTD_resetCStream(self._cstream, size) + if lib.ZSTD_isError(zresult): + raise ZstdError('could not reset CStream: %s' % + ffi.string(lib.ZSTD_getErrorName(zresult))) + + return + cstream = lib.ZSTD_createCStream() if cstream == ffi.NULL: raise MemoryError() @@ -571,7 +684,32 @@ class ZstdCompressor(object): raise Exception('cannot init CStream: %s' % ffi.string(lib.ZSTD_getErrorName(zresult))) - return cstream + self._cstream = cstream + + def _init_mtcstream(self, size): + assert self._multithreaded + + dict_data = ffi.NULL + dict_size = 0 + if self._dict_data: + dict_data = self._dict_data.as_bytes() + dict_size = len(self._dict_data) + + zparams = ffi.new('ZSTD_parameters *')[0] + if self._cparams: + zparams.cParams = self._cparams.as_compression_parameters() + else: + zparams.cParams = lib.ZSTD_getCParams(self._compression_level, + size, dict_size) + + zparams.fParams = self._fparams + + zresult = lib.ZSTDMT_initCStream_advanced(self._cctx, dict_data, dict_size, + zparams, size) + + if lib.ZSTD_isError(zresult): + raise ZstdError('cannot init CStream: %s' % + ffi.string(lib.ZSTD_getErrorName(zresult))) class FrameParameters(object): @@ -601,9 +739,11 @@ def get_frame_parameters(data): class ZstdCompressionDict(object): - def __init__(self, data): + def __init__(self, data, k=0, d=0): assert isinstance(data, bytes_type) self._data = data + self.k = k + self.d = d def __len__(self): return len(self._data) @@ -615,7 +755,8 @@ class ZstdCompressionDict(object): return self._data -def train_dictionary(dict_size, samples, parameters=None): +def train_dictionary(dict_size, samples, selectivity=0, level=0, + notifications=0, dict_id=0): if not isinstance(samples, list): raise TypeError('samples must be a list') @@ -636,10 +777,18 @@ def train_dictionary(dict_size, samples, dict_data = new_nonzero('char[]', dict_size) - zresult = lib.ZDICT_trainFromBuffer(ffi.addressof(dict_data), dict_size, - ffi.addressof(samples_buffer), - ffi.addressof(sample_sizes, 0), - len(samples)) + dparams = ffi.new('ZDICT_params_t *')[0] + dparams.selectivityLevel = selectivity + dparams.compressionLevel = level + dparams.notificationLevel = notifications + dparams.dictID = dict_id + + zresult = lib.ZDICT_trainFromBuffer_advanced( + ffi.addressof(dict_data), dict_size, + ffi.addressof(samples_buffer), + ffi.addressof(sample_sizes, 0), len(samples), + dparams) + if lib.ZDICT_isError(zresult): raise ZstdError('Cannot train dict: %s' % ffi.string(lib.ZDICT_getErrorName(zresult))) @@ -647,16 +796,73 @@ def train_dictionary(dict_size, samples, return ZstdCompressionDict(ffi.buffer(dict_data, zresult)[:]) +def train_cover_dictionary(dict_size, samples, k=0, d=0, + notifications=0, dict_id=0, level=0, optimize=False, + steps=0, threads=0): + if not isinstance(samples, list): + raise TypeError('samples must be a list') + + if threads < 0: + threads = _cpu_count() + + total_size = sum(map(len, samples)) + + samples_buffer = new_nonzero('char[]', total_size) + sample_sizes = new_nonzero('size_t[]', len(samples)) + + offset = 0 + for i, sample in enumerate(samples): + if not isinstance(sample, bytes_type): + raise ValueError('samples must be bytes') + + l = len(sample) + ffi.memmove(samples_buffer + offset, sample, l) + offset += l + sample_sizes[i] = l + + dict_data = new_nonzero('char[]', dict_size) + + dparams = ffi.new('COVER_params_t *')[0] + dparams.k = k + dparams.d = d + dparams.steps = steps + dparams.nbThreads = threads + dparams.notificationLevel = notifications + dparams.dictID = dict_id + dparams.compressionLevel = level + + if optimize: + zresult = lib.COVER_optimizeTrainFromBuffer( + ffi.addressof(dict_data), dict_size, + ffi.addressof(samples_buffer), + ffi.addressof(sample_sizes, 0), len(samples), + ffi.addressof(dparams)) + else: + zresult = lib.COVER_trainFromBuffer( + ffi.addressof(dict_data), dict_size, + ffi.addressof(samples_buffer), + ffi.addressof(sample_sizes, 0), len(samples), + dparams) + + if lib.ZDICT_isError(zresult): + raise ZstdError('cannot train dict: %s' % + ffi.string(lib.ZDICT_getErrorName(zresult))) + + return ZstdCompressionDict(ffi.buffer(dict_data, zresult)[:], + k=dparams.k, d=dparams.d) + + class ZstdDecompressionObj(object): def __init__(self, decompressor): self._decompressor = decompressor - self._dstream = self._decompressor._get_dstream() self._finished = False def decompress(self, data): if self._finished: raise ZstdError('cannot use a decompressobj multiple times') + assert(self._decompressor._dstream) + in_buffer = ffi.new('ZSTD_inBuffer *') out_buffer = ffi.new('ZSTD_outBuffer *') @@ -673,14 +879,14 @@ class ZstdDecompressionObj(object): chunks = [] while in_buffer.pos < in_buffer.size: - zresult = lib.ZSTD_decompressStream(self._dstream, out_buffer, in_buffer) + zresult = lib.ZSTD_decompressStream(self._decompressor._dstream, + out_buffer, in_buffer) if lib.ZSTD_isError(zresult): raise ZstdError('zstd decompressor error: %s' % ffi.string(lib.ZSTD_getErrorName(zresult))) if zresult == 0: self._finished = True - self._dstream = None self._decompressor = None if out_buffer.pos: @@ -695,28 +901,26 @@ class ZstdDecompressionWriter(object): self._decompressor = decompressor self._writer = writer self._write_size = write_size - self._dstream = None self._entered = False def __enter__(self): if self._entered: raise ZstdError('cannot __enter__ multiple times') - self._dstream = self._decompressor._get_dstream() + self._decompressor._ensure_dstream() self._entered = True return self def __exit__(self, exc_type, exc_value, exc_tb): self._entered = False - self._dstream = None def memory_size(self): - if not self._dstream: + if not self._decompressor._dstream: raise ZstdError('cannot determine size of inactive decompressor ' 'call when context manager is active') - return lib.ZSTD_sizeof_DStream(self._dstream) + return lib.ZSTD_sizeof_DStream(self._decompressor._dstream) def write(self, data): if not self._entered: @@ -737,8 +941,10 @@ class ZstdDecompressionWriter(object): out_buffer.size = len(dst_buffer) out_buffer.pos = 0 + dstream = self._decompressor._dstream + while in_buffer.pos < in_buffer.size: - zresult = lib.ZSTD_decompressStream(self._dstream, out_buffer, in_buffer) + zresult = lib.ZSTD_decompressStream(dstream, out_buffer, in_buffer) if lib.ZSTD_isError(zresult): raise ZstdError('zstd decompress error: %s' % ffi.string(lib.ZSTD_getErrorName(zresult))) @@ -760,6 +966,7 @@ class ZstdDecompressor(object): raise MemoryError() self._refdctx = ffi.gc(dctx, lib.ZSTD_freeDCtx) + self._dstream = None @property def _ddict(self): @@ -816,6 +1023,7 @@ class ZstdDecompressor(object): return ffi.buffer(result_buffer, zresult)[:] def decompressobj(self): + self._ensure_dstream() return ZstdDecompressionObj(self) def read_from(self, reader, read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE, @@ -843,7 +1051,7 @@ class ZstdDecompressor(object): buffer_offset = skip_bytes - dstream = self._get_dstream() + self._ensure_dstream() in_buffer = ffi.new('ZSTD_inBuffer *') out_buffer = ffi.new('ZSTD_outBuffer *') @@ -878,7 +1086,7 @@ class ZstdDecompressor(object): while in_buffer.pos < in_buffer.size: assert out_buffer.pos == 0 - zresult = lib.ZSTD_decompressStream(dstream, out_buffer, in_buffer) + zresult = lib.ZSTD_decompressStream(self._dstream, out_buffer, in_buffer) if lib.ZSTD_isError(zresult): raise ZstdError('zstd decompress error: %s' % ffi.string(lib.ZSTD_getErrorName(zresult))) @@ -910,7 +1118,7 @@ class ZstdDecompressor(object): if not hasattr(ofh, 'write'): raise ValueError('second argument must have a write() method') - dstream = self._get_dstream() + self._ensure_dstream() in_buffer = ffi.new('ZSTD_inBuffer *') out_buffer = ffi.new('ZSTD_outBuffer *') @@ -936,7 +1144,7 @@ class ZstdDecompressor(object): # Flush all read data to output. while in_buffer.pos < in_buffer.size: - zresult = lib.ZSTD_decompressStream(dstream, out_buffer, in_buffer) + zresult = lib.ZSTD_decompressStream(self._dstream, out_buffer, in_buffer) if lib.ZSTD_isError(zresult): raise ZstdError('zstd decompressor error: %s' % ffi.string(lib.ZSTD_getErrorName(zresult))) @@ -1021,22 +1229,29 @@ class ZstdDecompressor(object): return ffi.buffer(last_buffer, len(last_buffer))[:] - def _get_dstream(self): - dstream = lib.ZSTD_createDStream() - if dstream == ffi.NULL: + def _ensure_dstream(self): + if self._dstream: + zresult = lib.ZSTD_resetDStream(self._dstream) + if lib.ZSTD_isError(zresult): + raise ZstdError('could not reset DStream: %s' % + ffi.string(lib.ZSTD_getErrorName(zresult))) + + return + + self._dstream = lib.ZSTD_createDStream() + if self._dstream == ffi.NULL: raise MemoryError() - dstream = ffi.gc(dstream, lib.ZSTD_freeDStream) + self._dstream = ffi.gc(self._dstream, lib.ZSTD_freeDStream) if self._dict_data: - zresult = lib.ZSTD_initDStream_usingDict(dstream, + zresult = lib.ZSTD_initDStream_usingDict(self._dstream, self._dict_data.as_bytes(), len(self._dict_data)) else: - zresult = lib.ZSTD_initDStream(dstream) + zresult = lib.ZSTD_initDStream(self._dstream) if lib.ZSTD_isError(zresult): + self._dstream = None raise ZstdError('could not initialize DStream: %s' % ffi.string(lib.ZSTD_getErrorName(zresult))) - - return dstream diff --git a/tests/test-check-py3-compat.t b/tests/test-check-py3-compat.t --- a/tests/test-check-py3-compat.t +++ b/tests/test-check-py3-compat.t @@ -7,12 +7,15 @@ contrib/python-zstandard/setup.py not using absolute_import contrib/python-zstandard/setup_zstd.py not using absolute_import contrib/python-zstandard/tests/common.py not using absolute_import + contrib/python-zstandard/tests/test_buffer_util.py not using absolute_import contrib/python-zstandard/tests/test_compressor.py not using absolute_import + contrib/python-zstandard/tests/test_compressor_fuzzing.py not using absolute_import contrib/python-zstandard/tests/test_data_structures.py not using absolute_import + contrib/python-zstandard/tests/test_data_structures_fuzzing.py not using absolute_import contrib/python-zstandard/tests/test_decompressor.py not using absolute_import + contrib/python-zstandard/tests/test_decompressor_fuzzing.py not using absolute_import contrib/python-zstandard/tests/test_estimate_sizes.py not using absolute_import contrib/python-zstandard/tests/test_module_attributes.py not using absolute_import - contrib/python-zstandard/tests/test_roundtrip.py not using absolute_import contrib/python-zstandard/tests/test_train_dictionary.py not using absolute_import i18n/check-translation.py not using absolute_import setup.py not using absolute_import