# Copyright (c) 2016-present, Gregory Szorc # All rights reserved. # # This software may be modified and distributed under the terms # of the BSD license. See the LICENSE file for details. """Python interface to the Zstandard (zstd) compression library.""" from __future__ import absolute_import, unicode_literals # This should match what the C extension exports. __all__ = [ #'BufferSegment', #'BufferSegments', #'BufferWithSegments', #'BufferWithSegmentsCollection', "CompressionParameters", "ZstdCompressionDict", "ZstdCompressionParameters", "ZstdCompressor", "ZstdError", "ZstdDecompressor", "FrameParameters", "estimate_decompression_context_size", "frame_content_size", "frame_header_size", "get_frame_parameters", "train_dictionary", # Constants. "FLUSH_BLOCK", "FLUSH_FRAME", "COMPRESSOBJ_FLUSH_FINISH", "COMPRESSOBJ_FLUSH_BLOCK", "ZSTD_VERSION", "FRAME_HEADER", "CONTENTSIZE_UNKNOWN", "CONTENTSIZE_ERROR", "MAX_COMPRESSION_LEVEL", "COMPRESSION_RECOMMENDED_INPUT_SIZE", "COMPRESSION_RECOMMENDED_OUTPUT_SIZE", "DECOMPRESSION_RECOMMENDED_INPUT_SIZE", "DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE", "MAGIC_NUMBER", "BLOCKSIZELOG_MAX", "BLOCKSIZE_MAX", "WINDOWLOG_MIN", "WINDOWLOG_MAX", "CHAINLOG_MIN", "CHAINLOG_MAX", "HASHLOG_MIN", "HASHLOG_MAX", "HASHLOG3_MAX", "MINMATCH_MIN", "MINMATCH_MAX", "SEARCHLOG_MIN", "SEARCHLOG_MAX", "SEARCHLENGTH_MIN", "SEARCHLENGTH_MAX", "TARGETLENGTH_MIN", "TARGETLENGTH_MAX", "LDM_MINMATCH_MIN", "LDM_MINMATCH_MAX", "LDM_BUCKETSIZELOG_MAX", "STRATEGY_FAST", "STRATEGY_DFAST", "STRATEGY_GREEDY", "STRATEGY_LAZY", "STRATEGY_LAZY2", "STRATEGY_BTLAZY2", "STRATEGY_BTOPT", "STRATEGY_BTULTRA", "STRATEGY_BTULTRA2", "DICT_TYPE_AUTO", "DICT_TYPE_RAWCONTENT", "DICT_TYPE_FULLDICT", "FORMAT_ZSTD1", "FORMAT_ZSTD1_MAGICLESS", ] import io import os import sys from _zstd_cffi import ( ffi, lib, ) if sys.version_info[0] == 2: bytes_type = str int_type = long else: bytes_type = bytes int_type = int COMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_CStreamInSize() COMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_CStreamOutSize() DECOMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_DStreamInSize() DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_DStreamOutSize() new_nonzero = ffi.new_allocator(should_clear_after_alloc=False) MAX_COMPRESSION_LEVEL = lib.ZSTD_maxCLevel() MAGIC_NUMBER = lib.ZSTD_MAGICNUMBER FRAME_HEADER = b"\x28\xb5\x2f\xfd" CONTENTSIZE_UNKNOWN = lib.ZSTD_CONTENTSIZE_UNKNOWN CONTENTSIZE_ERROR = lib.ZSTD_CONTENTSIZE_ERROR ZSTD_VERSION = ( lib.ZSTD_VERSION_MAJOR, lib.ZSTD_VERSION_MINOR, lib.ZSTD_VERSION_RELEASE, ) BLOCKSIZELOG_MAX = lib.ZSTD_BLOCKSIZELOG_MAX BLOCKSIZE_MAX = lib.ZSTD_BLOCKSIZE_MAX WINDOWLOG_MIN = lib.ZSTD_WINDOWLOG_MIN WINDOWLOG_MAX = lib.ZSTD_WINDOWLOG_MAX CHAINLOG_MIN = lib.ZSTD_CHAINLOG_MIN CHAINLOG_MAX = lib.ZSTD_CHAINLOG_MAX HASHLOG_MIN = lib.ZSTD_HASHLOG_MIN HASHLOG_MAX = lib.ZSTD_HASHLOG_MAX HASHLOG3_MAX = lib.ZSTD_HASHLOG3_MAX MINMATCH_MIN = lib.ZSTD_MINMATCH_MIN MINMATCH_MAX = lib.ZSTD_MINMATCH_MAX SEARCHLOG_MIN = lib.ZSTD_SEARCHLOG_MIN SEARCHLOG_MAX = lib.ZSTD_SEARCHLOG_MAX SEARCHLENGTH_MIN = lib.ZSTD_MINMATCH_MIN SEARCHLENGTH_MAX = lib.ZSTD_MINMATCH_MAX TARGETLENGTH_MIN = lib.ZSTD_TARGETLENGTH_MIN TARGETLENGTH_MAX = lib.ZSTD_TARGETLENGTH_MAX LDM_MINMATCH_MIN = lib.ZSTD_LDM_MINMATCH_MIN LDM_MINMATCH_MAX = lib.ZSTD_LDM_MINMATCH_MAX LDM_BUCKETSIZELOG_MAX = lib.ZSTD_LDM_BUCKETSIZELOG_MAX STRATEGY_FAST = lib.ZSTD_fast STRATEGY_DFAST = lib.ZSTD_dfast STRATEGY_GREEDY = lib.ZSTD_greedy STRATEGY_LAZY = lib.ZSTD_lazy STRATEGY_LAZY2 = lib.ZSTD_lazy2 STRATEGY_BTLAZY2 = lib.ZSTD_btlazy2 STRATEGY_BTOPT = lib.ZSTD_btopt STRATEGY_BTULTRA = lib.ZSTD_btultra STRATEGY_BTULTRA2 = lib.ZSTD_btultra2 DICT_TYPE_AUTO = lib.ZSTD_dct_auto DICT_TYPE_RAWCONTENT = lib.ZSTD_dct_rawContent DICT_TYPE_FULLDICT = lib.ZSTD_dct_fullDict FORMAT_ZSTD1 = lib.ZSTD_f_zstd1 FORMAT_ZSTD1_MAGICLESS = lib.ZSTD_f_zstd1_magicless FLUSH_BLOCK = 0 FLUSH_FRAME = 1 COMPRESSOBJ_FLUSH_FINISH = 0 COMPRESSOBJ_FLUSH_BLOCK = 1 def _cpu_count(): # os.cpu_count() was introducd in Python 3.4. try: return os.cpu_count() or 0 except AttributeError: pass # Linux. try: if sys.version_info[0] == 2: return os.sysconf(b"SC_NPROCESSORS_ONLN") else: return os.sysconf("SC_NPROCESSORS_ONLN") except (AttributeError, ValueError): pass # TODO implement on other platforms. return 0 class ZstdError(Exception): pass def _zstd_error(zresult): # Resolves to bytes on Python 2 and 3. We use the string for formatting # into error messages, which will be literal unicode. So convert it to # unicode. return ffi.string(lib.ZSTD_getErrorName(zresult)).decode("utf-8") def _make_cctx_params(params): res = lib.ZSTD_createCCtxParams() if res == ffi.NULL: raise MemoryError() res = ffi.gc(res, lib.ZSTD_freeCCtxParams) attrs = [ (lib.ZSTD_c_format, params.format), (lib.ZSTD_c_compressionLevel, params.compression_level), (lib.ZSTD_c_windowLog, params.window_log), (lib.ZSTD_c_hashLog, params.hash_log), (lib.ZSTD_c_chainLog, params.chain_log), (lib.ZSTD_c_searchLog, params.search_log), (lib.ZSTD_c_minMatch, params.min_match), (lib.ZSTD_c_targetLength, params.target_length), (lib.ZSTD_c_strategy, params.compression_strategy), (lib.ZSTD_c_contentSizeFlag, params.write_content_size), (lib.ZSTD_c_checksumFlag, params.write_checksum), (lib.ZSTD_c_dictIDFlag, params.write_dict_id), (lib.ZSTD_c_nbWorkers, params.threads), (lib.ZSTD_c_jobSize, params.job_size), (lib.ZSTD_c_overlapLog, params.overlap_log), (lib.ZSTD_c_forceMaxWindow, params.force_max_window), (lib.ZSTD_c_enableLongDistanceMatching, params.enable_ldm), (lib.ZSTD_c_ldmHashLog, params.ldm_hash_log), (lib.ZSTD_c_ldmMinMatch, params.ldm_min_match), (lib.ZSTD_c_ldmBucketSizeLog, params.ldm_bucket_size_log), (lib.ZSTD_c_ldmHashRateLog, params.ldm_hash_rate_log), ] for param, value in attrs: _set_compression_parameter(res, param, value) return res class ZstdCompressionParameters(object): @staticmethod def from_level(level, source_size=0, dict_size=0, **kwargs): params = lib.ZSTD_getCParams(level, source_size, dict_size) args = { "window_log": "windowLog", "chain_log": "chainLog", "hash_log": "hashLog", "search_log": "searchLog", "min_match": "minMatch", "target_length": "targetLength", "compression_strategy": "strategy", } for arg, attr in args.items(): if arg not in kwargs: kwargs[arg] = getattr(params, attr) return ZstdCompressionParameters(**kwargs) def __init__( self, format=0, compression_level=0, window_log=0, hash_log=0, chain_log=0, search_log=0, min_match=0, target_length=0, strategy=-1, compression_strategy=-1, write_content_size=1, write_checksum=0, write_dict_id=0, job_size=0, overlap_log=-1, overlap_size_log=-1, force_max_window=0, enable_ldm=0, ldm_hash_log=0, ldm_min_match=0, ldm_bucket_size_log=0, ldm_hash_rate_log=-1, ldm_hash_every_log=-1, threads=0, ): params = lib.ZSTD_createCCtxParams() if params == ffi.NULL: raise MemoryError() params = ffi.gc(params, lib.ZSTD_freeCCtxParams) self._params = params if threads < 0: threads = _cpu_count() # We need to set ZSTD_c_nbWorkers before ZSTD_c_jobSize and ZSTD_c_overlapLog # because setting ZSTD_c_nbWorkers resets the other parameters. _set_compression_parameter(params, lib.ZSTD_c_nbWorkers, threads) _set_compression_parameter(params, lib.ZSTD_c_format, format) _set_compression_parameter( params, lib.ZSTD_c_compressionLevel, compression_level ) _set_compression_parameter(params, lib.ZSTD_c_windowLog, window_log) _set_compression_parameter(params, lib.ZSTD_c_hashLog, hash_log) _set_compression_parameter(params, lib.ZSTD_c_chainLog, chain_log) _set_compression_parameter(params, lib.ZSTD_c_searchLog, search_log) _set_compression_parameter(params, lib.ZSTD_c_minMatch, min_match) _set_compression_parameter( params, lib.ZSTD_c_targetLength, target_length ) if strategy != -1 and compression_strategy != -1: raise ValueError( "cannot specify both compression_strategy and strategy" ) if compression_strategy != -1: strategy = compression_strategy elif strategy == -1: strategy = 0 _set_compression_parameter(params, lib.ZSTD_c_strategy, strategy) _set_compression_parameter( params, lib.ZSTD_c_contentSizeFlag, write_content_size ) _set_compression_parameter( params, lib.ZSTD_c_checksumFlag, write_checksum ) _set_compression_parameter(params, lib.ZSTD_c_dictIDFlag, write_dict_id) _set_compression_parameter(params, lib.ZSTD_c_jobSize, job_size) if overlap_log != -1 and overlap_size_log != -1: raise ValueError( "cannot specify both overlap_log and overlap_size_log" ) if overlap_size_log != -1: overlap_log = overlap_size_log elif overlap_log == -1: overlap_log = 0 _set_compression_parameter(params, lib.ZSTD_c_overlapLog, overlap_log) _set_compression_parameter( params, lib.ZSTD_c_forceMaxWindow, force_max_window ) _set_compression_parameter( params, lib.ZSTD_c_enableLongDistanceMatching, enable_ldm ) _set_compression_parameter(params, lib.ZSTD_c_ldmHashLog, ldm_hash_log) _set_compression_parameter( params, lib.ZSTD_c_ldmMinMatch, ldm_min_match ) _set_compression_parameter( params, lib.ZSTD_c_ldmBucketSizeLog, ldm_bucket_size_log ) if ldm_hash_rate_log != -1 and ldm_hash_every_log != -1: raise ValueError( "cannot specify both ldm_hash_rate_log and ldm_hash_every_log" ) if ldm_hash_every_log != -1: ldm_hash_rate_log = ldm_hash_every_log elif ldm_hash_rate_log == -1: ldm_hash_rate_log = 0 _set_compression_parameter( params, lib.ZSTD_c_ldmHashRateLog, ldm_hash_rate_log ) @property def format(self): return _get_compression_parameter(self._params, lib.ZSTD_c_format) @property def compression_level(self): return _get_compression_parameter( self._params, lib.ZSTD_c_compressionLevel ) @property def window_log(self): return _get_compression_parameter(self._params, lib.ZSTD_c_windowLog) @property def hash_log(self): return _get_compression_parameter(self._params, lib.ZSTD_c_hashLog) @property def chain_log(self): return _get_compression_parameter(self._params, lib.ZSTD_c_chainLog) @property def search_log(self): return _get_compression_parameter(self._params, lib.ZSTD_c_searchLog) @property def min_match(self): return _get_compression_parameter(self._params, lib.ZSTD_c_minMatch) @property def target_length(self): return _get_compression_parameter(self._params, lib.ZSTD_c_targetLength) @property def compression_strategy(self): return _get_compression_parameter(self._params, lib.ZSTD_c_strategy) @property def write_content_size(self): return _get_compression_parameter( self._params, lib.ZSTD_c_contentSizeFlag ) @property def write_checksum(self): return _get_compression_parameter(self._params, lib.ZSTD_c_checksumFlag) @property def write_dict_id(self): return _get_compression_parameter(self._params, lib.ZSTD_c_dictIDFlag) @property def job_size(self): return _get_compression_parameter(self._params, lib.ZSTD_c_jobSize) @property def overlap_log(self): return _get_compression_parameter(self._params, lib.ZSTD_c_overlapLog) @property def overlap_size_log(self): return self.overlap_log @property def force_max_window(self): return _get_compression_parameter( self._params, lib.ZSTD_c_forceMaxWindow ) @property def enable_ldm(self): return _get_compression_parameter( self._params, lib.ZSTD_c_enableLongDistanceMatching ) @property def ldm_hash_log(self): return _get_compression_parameter(self._params, lib.ZSTD_c_ldmHashLog) @property def ldm_min_match(self): return _get_compression_parameter(self._params, lib.ZSTD_c_ldmMinMatch) @property def ldm_bucket_size_log(self): return _get_compression_parameter( self._params, lib.ZSTD_c_ldmBucketSizeLog ) @property def ldm_hash_rate_log(self): return _get_compression_parameter( self._params, lib.ZSTD_c_ldmHashRateLog ) @property def ldm_hash_every_log(self): return self.ldm_hash_rate_log @property def threads(self): return _get_compression_parameter(self._params, lib.ZSTD_c_nbWorkers) def estimated_compression_context_size(self): return lib.ZSTD_estimateCCtxSize_usingCCtxParams(self._params) CompressionParameters = ZstdCompressionParameters def estimate_decompression_context_size(): return lib.ZSTD_estimateDCtxSize() def _set_compression_parameter(params, param, value): zresult = lib.ZSTD_CCtxParams_setParameter(params, param, value) if lib.ZSTD_isError(zresult): raise ZstdError( "unable to set compression context parameter: %s" % _zstd_error(zresult) ) def _get_compression_parameter(params, param): result = ffi.new("int *") zresult = lib.ZSTD_CCtxParams_getParameter(params, param, result) if lib.ZSTD_isError(zresult): raise ZstdError( "unable to get compression context parameter: %s" % _zstd_error(zresult) ) return result[0] class ZstdCompressionWriter(object): def __init__( self, compressor, writer, source_size, write_size, write_return_read ): self._compressor = compressor self._writer = writer self._write_size = write_size self._write_return_read = bool(write_return_read) self._entered = False self._closed = False self._bytes_compressed = 0 self._dst_buffer = ffi.new("char[]", write_size) self._out_buffer = ffi.new("ZSTD_outBuffer *") self._out_buffer.dst = self._dst_buffer self._out_buffer.size = len(self._dst_buffer) self._out_buffer.pos = 0 zresult = lib.ZSTD_CCtx_setPledgedSrcSize(compressor._cctx, source_size) if lib.ZSTD_isError(zresult): raise ZstdError( "error setting source size: %s" % _zstd_error(zresult) ) def __enter__(self): if self._closed: raise ValueError("stream is closed") if self._entered: raise ZstdError("cannot __enter__ multiple times") self._entered = True return self def __exit__(self, exc_type, exc_value, exc_tb): self._entered = False if not exc_type and not exc_value and not exc_tb: self.close() self._compressor = None return False def memory_size(self): return lib.ZSTD_sizeof_CCtx(self._compressor._cctx) def fileno(self): f = getattr(self._writer, "fileno", None) if f: return f() else: raise OSError("fileno not available on underlying writer") def close(self): if self._closed: return try: self.flush(FLUSH_FRAME) finally: self._closed = True # Call close() on underlying stream as well. f = getattr(self._writer, "close", None) if f: f() @property def closed(self): return self._closed def isatty(self): return False def readable(self): return False def readline(self, size=-1): raise io.UnsupportedOperation() def readlines(self, hint=-1): raise io.UnsupportedOperation() def seek(self, offset, whence=None): raise io.UnsupportedOperation() def seekable(self): return False def truncate(self, size=None): raise io.UnsupportedOperation() def writable(self): return True def writelines(self, lines): raise NotImplementedError("writelines() is not yet implemented") def read(self, size=-1): raise io.UnsupportedOperation() def readall(self): raise io.UnsupportedOperation() def readinto(self, b): raise io.UnsupportedOperation() def write(self, data): if self._closed: raise ValueError("stream is closed") total_write = 0 data_buffer = ffi.from_buffer(data) in_buffer = ffi.new("ZSTD_inBuffer *") in_buffer.src = data_buffer in_buffer.size = len(data_buffer) in_buffer.pos = 0 out_buffer = self._out_buffer out_buffer.pos = 0 while in_buffer.pos < in_buffer.size: zresult = lib.ZSTD_compressStream2( self._compressor._cctx, out_buffer, in_buffer, lib.ZSTD_e_continue, ) if lib.ZSTD_isError(zresult): raise ZstdError( "zstd compress error: %s" % _zstd_error(zresult) ) if out_buffer.pos: self._writer.write( ffi.buffer(out_buffer.dst, out_buffer.pos)[:] ) total_write += out_buffer.pos self._bytes_compressed += out_buffer.pos out_buffer.pos = 0 if self._write_return_read: return in_buffer.pos else: return total_write def flush(self, flush_mode=FLUSH_BLOCK): if flush_mode == FLUSH_BLOCK: flush = lib.ZSTD_e_flush elif flush_mode == FLUSH_FRAME: flush = lib.ZSTD_e_end else: raise ValueError("unknown flush_mode: %r" % flush_mode) if self._closed: raise ValueError("stream is closed") total_write = 0 out_buffer = self._out_buffer out_buffer.pos = 0 in_buffer = ffi.new("ZSTD_inBuffer *") in_buffer.src = ffi.NULL in_buffer.size = 0 in_buffer.pos = 0 while True: zresult = lib.ZSTD_compressStream2( self._compressor._cctx, out_buffer, in_buffer, flush ) if lib.ZSTD_isError(zresult): raise ZstdError( "zstd compress error: %s" % _zstd_error(zresult) ) if out_buffer.pos: self._writer.write( ffi.buffer(out_buffer.dst, out_buffer.pos)[:] ) total_write += out_buffer.pos self._bytes_compressed += out_buffer.pos out_buffer.pos = 0 if not zresult: break return total_write def tell(self): return self._bytes_compressed class ZstdCompressionObj(object): def compress(self, data): if self._finished: raise ZstdError("cannot call compress() after compressor finished") data_buffer = ffi.from_buffer(data) source = ffi.new("ZSTD_inBuffer *") source.src = data_buffer source.size = len(data_buffer) source.pos = 0 chunks = [] while source.pos < len(data): zresult = lib.ZSTD_compressStream2( self._compressor._cctx, self._out, source, lib.ZSTD_e_continue ) if lib.ZSTD_isError(zresult): raise ZstdError( "zstd compress error: %s" % _zstd_error(zresult) ) if self._out.pos: chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:]) self._out.pos = 0 return b"".join(chunks) def flush(self, flush_mode=COMPRESSOBJ_FLUSH_FINISH): if flush_mode not in ( COMPRESSOBJ_FLUSH_FINISH, COMPRESSOBJ_FLUSH_BLOCK, ): raise ValueError("flush mode not recognized") if self._finished: raise ZstdError("compressor object already finished") if flush_mode == COMPRESSOBJ_FLUSH_BLOCK: z_flush_mode = lib.ZSTD_e_flush elif flush_mode == COMPRESSOBJ_FLUSH_FINISH: z_flush_mode = lib.ZSTD_e_end self._finished = True else: raise ZstdError("unhandled flush mode") assert self._out.pos == 0 in_buffer = ffi.new("ZSTD_inBuffer *") in_buffer.src = ffi.NULL in_buffer.size = 0 in_buffer.pos = 0 chunks = [] while True: zresult = lib.ZSTD_compressStream2( self._compressor._cctx, self._out, in_buffer, z_flush_mode ) if lib.ZSTD_isError(zresult): raise ZstdError( "error ending compression stream: %s" % _zstd_error(zresult) ) if self._out.pos: chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:]) self._out.pos = 0 if not zresult: break return b"".join(chunks) class ZstdCompressionChunker(object): def __init__(self, compressor, chunk_size): self._compressor = compressor self._out = ffi.new("ZSTD_outBuffer *") self._dst_buffer = ffi.new("char[]", chunk_size) self._out.dst = self._dst_buffer self._out.size = chunk_size self._out.pos = 0 self._in = ffi.new("ZSTD_inBuffer *") self._in.src = ffi.NULL self._in.size = 0 self._in.pos = 0 self._finished = False def compress(self, data): if self._finished: raise ZstdError("cannot call compress() after compression finished") if self._in.src != ffi.NULL: raise ZstdError( "cannot perform operation before consuming output " "from previous operation" ) data_buffer = ffi.from_buffer(data) if not len(data_buffer): return self._in.src = data_buffer self._in.size = len(data_buffer) self._in.pos = 0 while self._in.pos < self._in.size: zresult = lib.ZSTD_compressStream2( self._compressor._cctx, self._out, self._in, lib.ZSTD_e_continue ) if self._in.pos == self._in.size: self._in.src = ffi.NULL self._in.size = 0 self._in.pos = 0 if lib.ZSTD_isError(zresult): raise ZstdError( "zstd compress error: %s" % _zstd_error(zresult) ) if self._out.pos == self._out.size: yield ffi.buffer(self._out.dst, self._out.pos)[:] self._out.pos = 0 def flush(self): if self._finished: raise ZstdError("cannot call flush() after compression finished") if self._in.src != ffi.NULL: raise ZstdError( "cannot call flush() before consuming output from " "previous operation" ) while True: zresult = lib.ZSTD_compressStream2( self._compressor._cctx, self._out, self._in, lib.ZSTD_e_flush ) if lib.ZSTD_isError(zresult): raise ZstdError( "zstd compress error: %s" % _zstd_error(zresult) ) if self._out.pos: yield ffi.buffer(self._out.dst, self._out.pos)[:] self._out.pos = 0 if not zresult: return def finish(self): if self._finished: raise ZstdError("cannot call finish() after compression finished") if self._in.src != ffi.NULL: raise ZstdError( "cannot call finish() before consuming output from " "previous operation" ) while True: zresult = lib.ZSTD_compressStream2( self._compressor._cctx, self._out, self._in, lib.ZSTD_e_end ) if lib.ZSTD_isError(zresult): raise ZstdError( "zstd compress error: %s" % _zstd_error(zresult) ) if self._out.pos: yield ffi.buffer(self._out.dst, self._out.pos)[:] self._out.pos = 0 if not zresult: self._finished = True return class ZstdCompressionReader(object): def __init__(self, compressor, source, read_size): self._compressor = compressor self._source = source self._read_size = read_size self._entered = False self._closed = False self._bytes_compressed = 0 self._finished_input = False self._finished_output = False self._in_buffer = ffi.new("ZSTD_inBuffer *") # Holds a ref so backing bytes in self._in_buffer stay alive. self._source_buffer = None def __enter__(self): if self._entered: raise ValueError("cannot __enter__ multiple times") self._entered = True return self def __exit__(self, exc_type, exc_value, exc_tb): self._entered = False self._closed = True self._source = None self._compressor = None return False def readable(self): return True def writable(self): return False def seekable(self): return False def readline(self): raise io.UnsupportedOperation() def readlines(self): raise io.UnsupportedOperation() def write(self, data): raise OSError("stream is not writable") def writelines(self, ignored): raise OSError("stream is not writable") def isatty(self): return False def flush(self): return None def close(self): self._closed = True return None @property def closed(self): return self._closed def tell(self): return self._bytes_compressed def readall(self): chunks = [] while True: chunk = self.read(1048576) if not chunk: break chunks.append(chunk) return b"".join(chunks) def __iter__(self): raise io.UnsupportedOperation() def __next__(self): raise io.UnsupportedOperation() next = __next__ def _read_input(self): if self._finished_input: return if hasattr(self._source, "read"): data = self._source.read(self._read_size) if not data: self._finished_input = True return self._source_buffer = ffi.from_buffer(data) self._in_buffer.src = self._source_buffer self._in_buffer.size = len(self._source_buffer) self._in_buffer.pos = 0 else: self._source_buffer = ffi.from_buffer(self._source) self._in_buffer.src = self._source_buffer self._in_buffer.size = len(self._source_buffer) self._in_buffer.pos = 0 def _compress_into_buffer(self, out_buffer): if self._in_buffer.pos >= self._in_buffer.size: return old_pos = out_buffer.pos zresult = lib.ZSTD_compressStream2( self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_continue, ) self._bytes_compressed += out_buffer.pos - old_pos if self._in_buffer.pos == self._in_buffer.size: self._in_buffer.src = ffi.NULL self._in_buffer.pos = 0 self._in_buffer.size = 0 self._source_buffer = None if not hasattr(self._source, "read"): self._finished_input = True if lib.ZSTD_isError(zresult): raise ZstdError("zstd compress error: %s", _zstd_error(zresult)) return out_buffer.pos and out_buffer.pos == out_buffer.size def read(self, size=-1): if self._closed: raise ValueError("stream is closed") if size < -1: raise ValueError("cannot read negative amounts less than -1") if size == -1: return self.readall() if self._finished_output or size == 0: return b"" # Need a dedicated ref to dest buffer otherwise it gets collected. dst_buffer = ffi.new("char[]", size) out_buffer = ffi.new("ZSTD_outBuffer *") out_buffer.dst = dst_buffer out_buffer.size = size out_buffer.pos = 0 if self._compress_into_buffer(out_buffer): return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] while not self._finished_input: self._read_input() if self._compress_into_buffer(out_buffer): return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] # EOF old_pos = out_buffer.pos zresult = lib.ZSTD_compressStream2( self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end ) self._bytes_compressed += out_buffer.pos - old_pos if lib.ZSTD_isError(zresult): raise ZstdError( "error ending compression stream: %s", _zstd_error(zresult) ) if zresult == 0: self._finished_output = True return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] def read1(self, size=-1): if self._closed: raise ValueError("stream is closed") if size < -1: raise ValueError("cannot read negative amounts less than -1") if self._finished_output or size == 0: return b"" # -1 returns arbitrary number of bytes. if size == -1: size = COMPRESSION_RECOMMENDED_OUTPUT_SIZE dst_buffer = ffi.new("char[]", size) out_buffer = ffi.new("ZSTD_outBuffer *") out_buffer.dst = dst_buffer out_buffer.size = size out_buffer.pos = 0 # read1() dictates that we can perform at most 1 call to the # underlying stream to get input. However, we can't satisfy this # restriction with compression because not all input generates output. # It is possible to perform a block flush in order to ensure output. # But this may not be desirable behavior. So we allow multiple read() # to the underlying stream. But unlike read(), we stop once we have # any output. self._compress_into_buffer(out_buffer) if out_buffer.pos: return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] while not self._finished_input: self._read_input() # If we've filled the output buffer, return immediately. if self._compress_into_buffer(out_buffer): return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] # If we've populated the output buffer and we're not at EOF, # also return, as we've satisfied the read1() limits. if out_buffer.pos and not self._finished_input: return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] # Else if we're at EOS and we have room left in the buffer, # fall through to below and try to add more data to the output. # EOF. old_pos = out_buffer.pos zresult = lib.ZSTD_compressStream2( self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end ) self._bytes_compressed += out_buffer.pos - old_pos if lib.ZSTD_isError(zresult): raise ZstdError( "error ending compression stream: %s" % _zstd_error(zresult) ) if zresult == 0: self._finished_output = True return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] def readinto(self, b): if self._closed: raise ValueError("stream is closed") if self._finished_output: return 0 # TODO use writable=True once we require CFFI >= 1.12. dest_buffer = ffi.from_buffer(b) ffi.memmove(b, b"", 0) out_buffer = ffi.new("ZSTD_outBuffer *") out_buffer.dst = dest_buffer out_buffer.size = len(dest_buffer) out_buffer.pos = 0 if self._compress_into_buffer(out_buffer): return out_buffer.pos while not self._finished_input: self._read_input() if self._compress_into_buffer(out_buffer): return out_buffer.pos # EOF. old_pos = out_buffer.pos zresult = lib.ZSTD_compressStream2( self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end ) self._bytes_compressed += out_buffer.pos - old_pos if lib.ZSTD_isError(zresult): raise ZstdError( "error ending compression stream: %s", _zstd_error(zresult) ) if zresult == 0: self._finished_output = True return out_buffer.pos def readinto1(self, b): if self._closed: raise ValueError("stream is closed") if self._finished_output: return 0 # TODO use writable=True once we require CFFI >= 1.12. dest_buffer = ffi.from_buffer(b) ffi.memmove(b, b"", 0) out_buffer = ffi.new("ZSTD_outBuffer *") out_buffer.dst = dest_buffer out_buffer.size = len(dest_buffer) out_buffer.pos = 0 self._compress_into_buffer(out_buffer) if out_buffer.pos: return out_buffer.pos while not self._finished_input: self._read_input() if self._compress_into_buffer(out_buffer): return out_buffer.pos if out_buffer.pos and not self._finished_input: return out_buffer.pos # EOF. old_pos = out_buffer.pos zresult = lib.ZSTD_compressStream2( self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end ) self._bytes_compressed += out_buffer.pos - old_pos if lib.ZSTD_isError(zresult): raise ZstdError( "error ending compression stream: %s" % _zstd_error(zresult) ) if zresult == 0: self._finished_output = True return out_buffer.pos class ZstdCompressor(object): def __init__( self, level=3, dict_data=None, compression_params=None, write_checksum=None, write_content_size=None, write_dict_id=None, threads=0, ): if level > lib.ZSTD_maxCLevel(): raise ValueError( "level must be less than %d" % lib.ZSTD_maxCLevel() ) if threads < 0: threads = _cpu_count() if compression_params and write_checksum is not None: raise ValueError( "cannot define compression_params and " "write_checksum" ) if compression_params and write_content_size is not None: raise ValueError( "cannot define compression_params and " "write_content_size" ) if compression_params and write_dict_id is not None: raise ValueError( "cannot define compression_params and " "write_dict_id" ) if compression_params and threads: raise ValueError("cannot define compression_params and threads") if compression_params: self._params = _make_cctx_params(compression_params) else: if write_dict_id is None: write_dict_id = True params = lib.ZSTD_createCCtxParams() if params == ffi.NULL: raise MemoryError() self._params = ffi.gc(params, lib.ZSTD_freeCCtxParams) _set_compression_parameter( self._params, lib.ZSTD_c_compressionLevel, level ) _set_compression_parameter( self._params, lib.ZSTD_c_contentSizeFlag, write_content_size if write_content_size is not None else 1, ) _set_compression_parameter( self._params, lib.ZSTD_c_checksumFlag, 1 if write_checksum else 0, ) _set_compression_parameter( self._params, lib.ZSTD_c_dictIDFlag, 1 if write_dict_id else 0 ) if threads: _set_compression_parameter( self._params, lib.ZSTD_c_nbWorkers, threads ) cctx = lib.ZSTD_createCCtx() if cctx == ffi.NULL: raise MemoryError() self._cctx = cctx self._dict_data = dict_data # We defer setting up garbage collection until after calling # _setup_cctx() to ensure the memory size estimate is more accurate. try: self._setup_cctx() finally: self._cctx = ffi.gc( cctx, lib.ZSTD_freeCCtx, size=lib.ZSTD_sizeof_CCtx(cctx) ) def _setup_cctx(self): zresult = lib.ZSTD_CCtx_setParametersUsingCCtxParams( self._cctx, self._params ) if lib.ZSTD_isError(zresult): raise ZstdError( "could not set compression parameters: %s" % _zstd_error(zresult) ) dict_data = self._dict_data if dict_data: if dict_data._cdict: zresult = lib.ZSTD_CCtx_refCDict(self._cctx, dict_data._cdict) else: zresult = lib.ZSTD_CCtx_loadDictionary_advanced( self._cctx, dict_data.as_bytes(), len(dict_data), lib.ZSTD_dlm_byRef, dict_data._dict_type, ) if lib.ZSTD_isError(zresult): raise ZstdError( "could not load compression dictionary: %s" % _zstd_error(zresult) ) def memory_size(self): return lib.ZSTD_sizeof_CCtx(self._cctx) def compress(self, data): lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) data_buffer = ffi.from_buffer(data) dest_size = lib.ZSTD_compressBound(len(data_buffer)) out = new_nonzero("char[]", dest_size) zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, len(data_buffer)) if lib.ZSTD_isError(zresult): raise ZstdError( "error setting source size: %s" % _zstd_error(zresult) ) out_buffer = ffi.new("ZSTD_outBuffer *") in_buffer = ffi.new("ZSTD_inBuffer *") out_buffer.dst = out out_buffer.size = dest_size out_buffer.pos = 0 in_buffer.src = data_buffer in_buffer.size = len(data_buffer) in_buffer.pos = 0 zresult = lib.ZSTD_compressStream2( self._cctx, out_buffer, in_buffer, lib.ZSTD_e_end ) if lib.ZSTD_isError(zresult): raise ZstdError("cannot compress: %s" % _zstd_error(zresult)) elif zresult: raise ZstdError("unexpected partial frame flush") return ffi.buffer(out, out_buffer.pos)[:] def compressobj(self, size=-1): lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) if size < 0: size = lib.ZSTD_CONTENTSIZE_UNKNOWN zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) if lib.ZSTD_isError(zresult): raise ZstdError( "error setting source size: %s" % _zstd_error(zresult) ) cobj = ZstdCompressionObj() cobj._out = ffi.new("ZSTD_outBuffer *") cobj._dst_buffer = ffi.new( "char[]", COMPRESSION_RECOMMENDED_OUTPUT_SIZE ) cobj._out.dst = cobj._dst_buffer cobj._out.size = COMPRESSION_RECOMMENDED_OUTPUT_SIZE cobj._out.pos = 0 cobj._compressor = self cobj._finished = False return cobj def chunker(self, size=-1, chunk_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE): lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) if size < 0: size = lib.ZSTD_CONTENTSIZE_UNKNOWN zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) if lib.ZSTD_isError(zresult): raise ZstdError( "error setting source size: %s" % _zstd_error(zresult) ) return ZstdCompressionChunker(self, chunk_size=chunk_size) def copy_stream( self, ifh, ofh, size=-1, read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE, write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE, ): if not hasattr(ifh, "read"): raise ValueError("first argument must have a read() method") if not hasattr(ofh, "write"): raise ValueError("second argument must have a write() method") lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) if size < 0: size = lib.ZSTD_CONTENTSIZE_UNKNOWN zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) if lib.ZSTD_isError(zresult): raise ZstdError( "error setting source size: %s" % _zstd_error(zresult) ) in_buffer = ffi.new("ZSTD_inBuffer *") out_buffer = ffi.new("ZSTD_outBuffer *") dst_buffer = ffi.new("char[]", write_size) out_buffer.dst = dst_buffer out_buffer.size = write_size out_buffer.pos = 0 total_read, total_write = 0, 0 while True: data = ifh.read(read_size) if not data: break data_buffer = ffi.from_buffer(data) total_read += len(data_buffer) in_buffer.src = data_buffer in_buffer.size = len(data_buffer) in_buffer.pos = 0 while in_buffer.pos < in_buffer.size: zresult = lib.ZSTD_compressStream2( self._cctx, out_buffer, in_buffer, lib.ZSTD_e_continue ) if lib.ZSTD_isError(zresult): raise ZstdError( "zstd compress error: %s" % _zstd_error(zresult) ) if out_buffer.pos: ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) total_write += out_buffer.pos out_buffer.pos = 0 # We've finished reading. Flush the compressor. while True: zresult = lib.ZSTD_compressStream2( self._cctx, out_buffer, in_buffer, lib.ZSTD_e_end ) if lib.ZSTD_isError(zresult): raise ZstdError( "error ending compression stream: %s" % _zstd_error(zresult) ) if out_buffer.pos: ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) total_write += out_buffer.pos out_buffer.pos = 0 if zresult == 0: break return total_read, total_write def stream_reader( self, source, size=-1, read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE ): lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) try: size = len(source) except Exception: pass if size < 0: size = lib.ZSTD_CONTENTSIZE_UNKNOWN zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) if lib.ZSTD_isError(zresult): raise ZstdError( "error setting source size: %s" % _zstd_error(zresult) ) return ZstdCompressionReader(self, source, read_size) def stream_writer( self, writer, size=-1, write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE, write_return_read=False, ): if not hasattr(writer, "write"): raise ValueError("must pass an object with a write() method") lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) if size < 0: size = lib.ZSTD_CONTENTSIZE_UNKNOWN return ZstdCompressionWriter( self, writer, size, write_size, write_return_read ) write_to = stream_writer def read_to_iter( self, reader, size=-1, read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE, write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE, ): if hasattr(reader, "read"): have_read = True elif hasattr(reader, "__getitem__"): have_read = False buffer_offset = 0 size = len(reader) else: raise ValueError( "must pass an object with a read() method or " "conforms to buffer protocol" ) lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) if size < 0: size = lib.ZSTD_CONTENTSIZE_UNKNOWN zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) if lib.ZSTD_isError(zresult): raise ZstdError( "error setting source size: %s" % _zstd_error(zresult) ) in_buffer = ffi.new("ZSTD_inBuffer *") out_buffer = ffi.new("ZSTD_outBuffer *") in_buffer.src = ffi.NULL in_buffer.size = 0 in_buffer.pos = 0 dst_buffer = ffi.new("char[]", write_size) out_buffer.dst = dst_buffer out_buffer.size = write_size out_buffer.pos = 0 while True: # We should never have output data sitting around after a previous # iteration. assert out_buffer.pos == 0 # Collect input data. if have_read: read_result = reader.read(read_size) else: remaining = len(reader) - buffer_offset slice_size = min(remaining, read_size) read_result = reader[buffer_offset : buffer_offset + slice_size] buffer_offset += slice_size # No new input data. Break out of the read loop. if not read_result: break # Feed all read data into the compressor and emit output until # exhausted. read_buffer = ffi.from_buffer(read_result) in_buffer.src = read_buffer in_buffer.size = len(read_buffer) in_buffer.pos = 0 while in_buffer.pos < in_buffer.size: zresult = lib.ZSTD_compressStream2( self._cctx, out_buffer, in_buffer, lib.ZSTD_e_continue ) if lib.ZSTD_isError(zresult): raise ZstdError( "zstd compress error: %s" % _zstd_error(zresult) ) if out_buffer.pos: data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:] out_buffer.pos = 0 yield data assert out_buffer.pos == 0 # And repeat the loop to collect more data. continue # If we get here, input is exhausted. End the stream and emit what # remains. while True: assert out_buffer.pos == 0 zresult = lib.ZSTD_compressStream2( self._cctx, out_buffer, in_buffer, lib.ZSTD_e_end ) if lib.ZSTD_isError(zresult): raise ZstdError( "error ending compression stream: %s" % _zstd_error(zresult) ) if out_buffer.pos: data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:] out_buffer.pos = 0 yield data if zresult == 0: break read_from = read_to_iter def frame_progression(self): progression = lib.ZSTD_getFrameProgression(self._cctx) return progression.ingested, progression.consumed, progression.produced class FrameParameters(object): def __init__(self, fparams): self.content_size = fparams.frameContentSize self.window_size = fparams.windowSize self.dict_id = fparams.dictID self.has_checksum = bool(fparams.checksumFlag) def frame_content_size(data): data_buffer = ffi.from_buffer(data) size = lib.ZSTD_getFrameContentSize(data_buffer, len(data_buffer)) if size == lib.ZSTD_CONTENTSIZE_ERROR: raise ZstdError("error when determining content size") elif size == lib.ZSTD_CONTENTSIZE_UNKNOWN: return -1 else: return size def frame_header_size(data): data_buffer = ffi.from_buffer(data) zresult = lib.ZSTD_frameHeaderSize(data_buffer, len(data_buffer)) if lib.ZSTD_isError(zresult): raise ZstdError( "could not determine frame header size: %s" % _zstd_error(zresult) ) return zresult def get_frame_parameters(data): params = ffi.new("ZSTD_frameHeader *") data_buffer = ffi.from_buffer(data) zresult = lib.ZSTD_getFrameHeader(params, data_buffer, len(data_buffer)) if lib.ZSTD_isError(zresult): raise ZstdError( "cannot get frame parameters: %s" % _zstd_error(zresult) ) if zresult: raise ZstdError( "not enough data for frame parameters; need %d bytes" % zresult ) return FrameParameters(params[0]) class ZstdCompressionDict(object): def __init__(self, data, dict_type=DICT_TYPE_AUTO, k=0, d=0): assert isinstance(data, bytes_type) self._data = data self.k = k self.d = d if dict_type not in ( DICT_TYPE_AUTO, DICT_TYPE_RAWCONTENT, DICT_TYPE_FULLDICT, ): raise ValueError( "invalid dictionary load mode: %d; must use " "DICT_TYPE_* constants" ) self._dict_type = dict_type self._cdict = None def __len__(self): return len(self._data) def dict_id(self): return int_type(lib.ZDICT_getDictID(self._data, len(self._data))) def as_bytes(self): return self._data def precompute_compress(self, level=0, compression_params=None): if level and compression_params: raise ValueError( "must only specify one of level or " "compression_params" ) if not level and not compression_params: raise ValueError("must specify one of level or compression_params") if level: cparams = lib.ZSTD_getCParams(level, 0, len(self._data)) else: cparams = ffi.new("ZSTD_compressionParameters") cparams.chainLog = compression_params.chain_log cparams.hashLog = compression_params.hash_log cparams.minMatch = compression_params.min_match cparams.searchLog = compression_params.search_log cparams.strategy = compression_params.compression_strategy cparams.targetLength = compression_params.target_length cparams.windowLog = compression_params.window_log cdict = lib.ZSTD_createCDict_advanced( self._data, len(self._data), lib.ZSTD_dlm_byRef, self._dict_type, cparams, lib.ZSTD_defaultCMem, ) if cdict == ffi.NULL: raise ZstdError("unable to precompute dictionary") self._cdict = ffi.gc( cdict, lib.ZSTD_freeCDict, size=lib.ZSTD_sizeof_CDict(cdict) ) @property def _ddict(self): ddict = lib.ZSTD_createDDict_advanced( self._data, len(self._data), lib.ZSTD_dlm_byRef, self._dict_type, lib.ZSTD_defaultCMem, ) if ddict == ffi.NULL: raise ZstdError("could not create decompression dict") ddict = ffi.gc( ddict, lib.ZSTD_freeDDict, size=lib.ZSTD_sizeof_DDict(ddict) ) self.__dict__["_ddict"] = ddict return ddict def train_dictionary( dict_size, samples, k=0, d=0, notifications=0, dict_id=0, level=0, steps=0, threads=0, ): if not isinstance(samples, list): raise TypeError("samples must be a list") if threads < 0: threads = _cpu_count() total_size = sum(map(len, samples)) samples_buffer = new_nonzero("char[]", total_size) sample_sizes = new_nonzero("size_t[]", len(samples)) offset = 0 for i, sample in enumerate(samples): if not isinstance(sample, bytes_type): raise ValueError("samples must be bytes") l = len(sample) ffi.memmove(samples_buffer + offset, sample, l) offset += l sample_sizes[i] = l dict_data = new_nonzero("char[]", dict_size) dparams = ffi.new("ZDICT_cover_params_t *")[0] dparams.k = k dparams.d = d dparams.steps = steps dparams.nbThreads = threads dparams.zParams.notificationLevel = notifications dparams.zParams.dictID = dict_id dparams.zParams.compressionLevel = level if ( not dparams.k and not dparams.d and not dparams.steps and not dparams.nbThreads and not dparams.zParams.notificationLevel and not dparams.zParams.dictID and not dparams.zParams.compressionLevel ): zresult = lib.ZDICT_trainFromBuffer( ffi.addressof(dict_data), dict_size, ffi.addressof(samples_buffer), ffi.addressof(sample_sizes, 0), len(samples), ) elif dparams.steps or dparams.nbThreads: zresult = lib.ZDICT_optimizeTrainFromBuffer_cover( ffi.addressof(dict_data), dict_size, ffi.addressof(samples_buffer), ffi.addressof(sample_sizes, 0), len(samples), ffi.addressof(dparams), ) else: zresult = lib.ZDICT_trainFromBuffer_cover( ffi.addressof(dict_data), dict_size, ffi.addressof(samples_buffer), ffi.addressof(sample_sizes, 0), len(samples), dparams, ) if lib.ZDICT_isError(zresult): msg = ffi.string(lib.ZDICT_getErrorName(zresult)).decode("utf-8") raise ZstdError("cannot train dict: %s" % msg) return ZstdCompressionDict( ffi.buffer(dict_data, zresult)[:], dict_type=DICT_TYPE_FULLDICT, k=dparams.k, d=dparams.d, ) class ZstdDecompressionObj(object): def __init__(self, decompressor, write_size): self._decompressor = decompressor self._write_size = write_size self._finished = False def decompress(self, data): if self._finished: raise ZstdError("cannot use a decompressobj multiple times") in_buffer = ffi.new("ZSTD_inBuffer *") out_buffer = ffi.new("ZSTD_outBuffer *") data_buffer = ffi.from_buffer(data) if len(data_buffer) == 0: return b"" in_buffer.src = data_buffer in_buffer.size = len(data_buffer) in_buffer.pos = 0 dst_buffer = ffi.new("char[]", self._write_size) out_buffer.dst = dst_buffer out_buffer.size = len(dst_buffer) out_buffer.pos = 0 chunks = [] while True: zresult = lib.ZSTD_decompressStream( self._decompressor._dctx, out_buffer, in_buffer ) if lib.ZSTD_isError(zresult): raise ZstdError( "zstd decompressor error: %s" % _zstd_error(zresult) ) if zresult == 0: self._finished = True self._decompressor = None if out_buffer.pos: chunks.append(ffi.buffer(out_buffer.dst, out_buffer.pos)[:]) if zresult == 0 or ( in_buffer.pos == in_buffer.size and out_buffer.pos == 0 ): break out_buffer.pos = 0 return b"".join(chunks) def flush(self, length=0): pass class ZstdDecompressionReader(object): def __init__(self, decompressor, source, read_size, read_across_frames): self._decompressor = decompressor self._source = source self._read_size = read_size self._read_across_frames = bool(read_across_frames) self._entered = False self._closed = False self._bytes_decompressed = 0 self._finished_input = False self._finished_output = False self._in_buffer = ffi.new("ZSTD_inBuffer *") # Holds a ref to self._in_buffer.src. self._source_buffer = None def __enter__(self): if self._entered: raise ValueError("cannot __enter__ multiple times") self._entered = True return self def __exit__(self, exc_type, exc_value, exc_tb): self._entered = False self._closed = True self._source = None self._decompressor = None return False def readable(self): return True def writable(self): return False def seekable(self): return True def readline(self): raise io.UnsupportedOperation() def readlines(self): raise io.UnsupportedOperation() def write(self, data): raise io.UnsupportedOperation() def writelines(self, lines): raise io.UnsupportedOperation() def isatty(self): return False def flush(self): return None def close(self): self._closed = True return None @property def closed(self): return self._closed def tell(self): return self._bytes_decompressed def readall(self): chunks = [] while True: chunk = self.read(1048576) if not chunk: break chunks.append(chunk) return b"".join(chunks) def __iter__(self): raise io.UnsupportedOperation() def __next__(self): raise io.UnsupportedOperation() next = __next__ def _read_input(self): # We have data left over in the input buffer. Use it. if self._in_buffer.pos < self._in_buffer.size: return # All input data exhausted. Nothing to do. if self._finished_input: return # Else populate the input buffer from our source. if hasattr(self._source, "read"): data = self._source.read(self._read_size) if not data: self._finished_input = True return self._source_buffer = ffi.from_buffer(data) self._in_buffer.src = self._source_buffer self._in_buffer.size = len(self._source_buffer) self._in_buffer.pos = 0 else: self._source_buffer = ffi.from_buffer(self._source) self._in_buffer.src = self._source_buffer self._in_buffer.size = len(self._source_buffer) self._in_buffer.pos = 0 def _decompress_into_buffer(self, out_buffer): """Decompress available input into an output buffer. Returns True if data in output buffer should be emitted. """ zresult = lib.ZSTD_decompressStream( self._decompressor._dctx, out_buffer, self._in_buffer ) if self._in_buffer.pos == self._in_buffer.size: self._in_buffer.src = ffi.NULL self._in_buffer.pos = 0 self._in_buffer.size = 0 self._source_buffer = None if not hasattr(self._source, "read"): self._finished_input = True if lib.ZSTD_isError(zresult): raise ZstdError("zstd decompress error: %s" % _zstd_error(zresult)) # Emit data if there is data AND either: # a) output buffer is full (read amount is satisfied) # b) we're at end of a frame and not in frame spanning mode return out_buffer.pos and ( out_buffer.pos == out_buffer.size or zresult == 0 and not self._read_across_frames ) def read(self, size=-1): if self._closed: raise ValueError("stream is closed") if size < -1: raise ValueError("cannot read negative amounts less than -1") if size == -1: # This is recursive. But it gets the job done. return self.readall() if self._finished_output or size == 0: return b"" # We /could/ call into readinto() here. But that introduces more # overhead. dst_buffer = ffi.new("char[]", size) out_buffer = ffi.new("ZSTD_outBuffer *") out_buffer.dst = dst_buffer out_buffer.size = size out_buffer.pos = 0 self._read_input() if self._decompress_into_buffer(out_buffer): self._bytes_decompressed += out_buffer.pos return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] while not self._finished_input: self._read_input() if self._decompress_into_buffer(out_buffer): self._bytes_decompressed += out_buffer.pos return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] self._bytes_decompressed += out_buffer.pos return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] def readinto(self, b): if self._closed: raise ValueError("stream is closed") if self._finished_output: return 0 # TODO use writable=True once we require CFFI >= 1.12. dest_buffer = ffi.from_buffer(b) ffi.memmove(b, b"", 0) out_buffer = ffi.new("ZSTD_outBuffer *") out_buffer.dst = dest_buffer out_buffer.size = len(dest_buffer) out_buffer.pos = 0 self._read_input() if self._decompress_into_buffer(out_buffer): self._bytes_decompressed += out_buffer.pos return out_buffer.pos while not self._finished_input: self._read_input() if self._decompress_into_buffer(out_buffer): self._bytes_decompressed += out_buffer.pos return out_buffer.pos self._bytes_decompressed += out_buffer.pos return out_buffer.pos def read1(self, size=-1): if self._closed: raise ValueError("stream is closed") if size < -1: raise ValueError("cannot read negative amounts less than -1") if self._finished_output or size == 0: return b"" # -1 returns arbitrary number of bytes. if size == -1: size = DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE dst_buffer = ffi.new("char[]", size) out_buffer = ffi.new("ZSTD_outBuffer *") out_buffer.dst = dst_buffer out_buffer.size = size out_buffer.pos = 0 # read1() dictates that we can perform at most 1 call to underlying # stream to get input. However, we can't satisfy this restriction with # decompression because not all input generates output. So we allow # multiple read(). But unlike read(), we stop once we have any output. while not self._finished_input: self._read_input() self._decompress_into_buffer(out_buffer) if out_buffer.pos: break self._bytes_decompressed += out_buffer.pos return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] def readinto1(self, b): if self._closed: raise ValueError("stream is closed") if self._finished_output: return 0 # TODO use writable=True once we require CFFI >= 1.12. dest_buffer = ffi.from_buffer(b) ffi.memmove(b, b"", 0) out_buffer = ffi.new("ZSTD_outBuffer *") out_buffer.dst = dest_buffer out_buffer.size = len(dest_buffer) out_buffer.pos = 0 while not self._finished_input and not self._finished_output: self._read_input() self._decompress_into_buffer(out_buffer) if out_buffer.pos: break self._bytes_decompressed += out_buffer.pos return out_buffer.pos def seek(self, pos, whence=os.SEEK_SET): if self._closed: raise ValueError("stream is closed") read_amount = 0 if whence == os.SEEK_SET: if pos < 0: raise ValueError( "cannot seek to negative position with SEEK_SET" ) if pos < self._bytes_decompressed: raise ValueError( "cannot seek zstd decompression stream " "backwards" ) read_amount = pos - self._bytes_decompressed elif whence == os.SEEK_CUR: if pos < 0: raise ValueError( "cannot seek zstd decompression stream " "backwards" ) read_amount = pos elif whence == os.SEEK_END: raise ValueError( "zstd decompression streams cannot be seeked " "with SEEK_END" ) while read_amount: result = self.read( min(read_amount, DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE) ) if not result: break read_amount -= len(result) return self._bytes_decompressed class ZstdDecompressionWriter(object): def __init__(self, decompressor, writer, write_size, write_return_read): decompressor._ensure_dctx() self._decompressor = decompressor self._writer = writer self._write_size = write_size self._write_return_read = bool(write_return_read) self._entered = False self._closed = False def __enter__(self): if self._closed: raise ValueError("stream is closed") if self._entered: raise ZstdError("cannot __enter__ multiple times") self._entered = True return self def __exit__(self, exc_type, exc_value, exc_tb): self._entered = False self.close() def memory_size(self): return lib.ZSTD_sizeof_DCtx(self._decompressor._dctx) def close(self): if self._closed: return try: self.flush() finally: self._closed = True f = getattr(self._writer, "close", None) if f: f() @property def closed(self): return self._closed def fileno(self): f = getattr(self._writer, "fileno", None) if f: return f() else: raise OSError("fileno not available on underlying writer") def flush(self): if self._closed: raise ValueError("stream is closed") f = getattr(self._writer, "flush", None) if f: return f() def isatty(self): return False def readable(self): return False def readline(self, size=-1): raise io.UnsupportedOperation() def readlines(self, hint=-1): raise io.UnsupportedOperation() def seek(self, offset, whence=None): raise io.UnsupportedOperation() def seekable(self): return False def tell(self): raise io.UnsupportedOperation() def truncate(self, size=None): raise io.UnsupportedOperation() def writable(self): return True def writelines(self, lines): raise io.UnsupportedOperation() def read(self, size=-1): raise io.UnsupportedOperation() def readall(self): raise io.UnsupportedOperation() def readinto(self, b): raise io.UnsupportedOperation() def write(self, data): if self._closed: raise ValueError("stream is closed") total_write = 0 in_buffer = ffi.new("ZSTD_inBuffer *") out_buffer = ffi.new("ZSTD_outBuffer *") data_buffer = ffi.from_buffer(data) in_buffer.src = data_buffer in_buffer.size = len(data_buffer) in_buffer.pos = 0 dst_buffer = ffi.new("char[]", self._write_size) out_buffer.dst = dst_buffer out_buffer.size = len(dst_buffer) out_buffer.pos = 0 dctx = self._decompressor._dctx while in_buffer.pos < in_buffer.size: zresult = lib.ZSTD_decompressStream(dctx, out_buffer, in_buffer) if lib.ZSTD_isError(zresult): raise ZstdError( "zstd decompress error: %s" % _zstd_error(zresult) ) if out_buffer.pos: self._writer.write( ffi.buffer(out_buffer.dst, out_buffer.pos)[:] ) total_write += out_buffer.pos out_buffer.pos = 0 if self._write_return_read: return in_buffer.pos else: return total_write class ZstdDecompressor(object): def __init__(self, dict_data=None, max_window_size=0, format=FORMAT_ZSTD1): self._dict_data = dict_data self._max_window_size = max_window_size self._format = format dctx = lib.ZSTD_createDCtx() if dctx == ffi.NULL: raise MemoryError() self._dctx = dctx # Defer setting up garbage collection until full state is loaded so # the memory size is more accurate. try: self._ensure_dctx() finally: self._dctx = ffi.gc( dctx, lib.ZSTD_freeDCtx, size=lib.ZSTD_sizeof_DCtx(dctx) ) def memory_size(self): return lib.ZSTD_sizeof_DCtx(self._dctx) def decompress(self, data, max_output_size=0): self._ensure_dctx() data_buffer = ffi.from_buffer(data) output_size = lib.ZSTD_getFrameContentSize( data_buffer, len(data_buffer) ) if output_size == lib.ZSTD_CONTENTSIZE_ERROR: raise ZstdError("error determining content size from frame header") elif output_size == 0: return b"" elif output_size == lib.ZSTD_CONTENTSIZE_UNKNOWN: if not max_output_size: raise ZstdError( "could not determine content size in frame header" ) result_buffer = ffi.new("char[]", max_output_size) result_size = max_output_size output_size = 0 else: result_buffer = ffi.new("char[]", output_size) result_size = output_size out_buffer = ffi.new("ZSTD_outBuffer *") out_buffer.dst = result_buffer out_buffer.size = result_size out_buffer.pos = 0 in_buffer = ffi.new("ZSTD_inBuffer *") in_buffer.src = data_buffer in_buffer.size = len(data_buffer) in_buffer.pos = 0 zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer) if lib.ZSTD_isError(zresult): raise ZstdError("decompression error: %s" % _zstd_error(zresult)) elif zresult: raise ZstdError( "decompression error: did not decompress full frame" ) elif output_size and out_buffer.pos != output_size: raise ZstdError( "decompression error: decompressed %d bytes; expected %d" % (zresult, output_size) ) return ffi.buffer(result_buffer, out_buffer.pos)[:] def stream_reader( self, source, read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE, read_across_frames=False, ): self._ensure_dctx() return ZstdDecompressionReader( self, source, read_size, read_across_frames ) def decompressobj(self, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE): if write_size < 1: raise ValueError("write_size must be positive") self._ensure_dctx() return ZstdDecompressionObj(self, write_size=write_size) def read_to_iter( self, reader, read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE, skip_bytes=0, ): if skip_bytes >= read_size: raise ValueError("skip_bytes must be smaller than read_size") if hasattr(reader, "read"): have_read = True elif hasattr(reader, "__getitem__"): have_read = False buffer_offset = 0 size = len(reader) else: raise ValueError( "must pass an object with a read() method or " "conforms to buffer protocol" ) if skip_bytes: if have_read: reader.read(skip_bytes) else: if skip_bytes > size: raise ValueError("skip_bytes larger than first input chunk") buffer_offset = skip_bytes self._ensure_dctx() in_buffer = ffi.new("ZSTD_inBuffer *") out_buffer = ffi.new("ZSTD_outBuffer *") dst_buffer = ffi.new("char[]", write_size) out_buffer.dst = dst_buffer out_buffer.size = len(dst_buffer) out_buffer.pos = 0 while True: assert out_buffer.pos == 0 if have_read: read_result = reader.read(read_size) else: remaining = size - buffer_offset slice_size = min(remaining, read_size) read_result = reader[buffer_offset : buffer_offset + slice_size] buffer_offset += slice_size # No new input. Break out of read loop. if not read_result: break # Feed all read data into decompressor and emit output until # exhausted. read_buffer = ffi.from_buffer(read_result) in_buffer.src = read_buffer in_buffer.size = len(read_buffer) in_buffer.pos = 0 while in_buffer.pos < in_buffer.size: assert out_buffer.pos == 0 zresult = lib.ZSTD_decompressStream( self._dctx, out_buffer, in_buffer ) if lib.ZSTD_isError(zresult): raise ZstdError( "zstd decompress error: %s" % _zstd_error(zresult) ) if out_buffer.pos: data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:] out_buffer.pos = 0 yield data if zresult == 0: return # Repeat loop to collect more input data. continue # If we get here, input is exhausted. read_from = read_to_iter def stream_writer( self, writer, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE, write_return_read=False, ): if not hasattr(writer, "write"): raise ValueError("must pass an object with a write() method") return ZstdDecompressionWriter( self, writer, write_size, write_return_read ) write_to = stream_writer def copy_stream( self, ifh, ofh, read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE, ): if not hasattr(ifh, "read"): raise ValueError("first argument must have a read() method") if not hasattr(ofh, "write"): raise ValueError("second argument must have a write() method") self._ensure_dctx() in_buffer = ffi.new("ZSTD_inBuffer *") out_buffer = ffi.new("ZSTD_outBuffer *") dst_buffer = ffi.new("char[]", write_size) out_buffer.dst = dst_buffer out_buffer.size = write_size out_buffer.pos = 0 total_read, total_write = 0, 0 # Read all available input. while True: data = ifh.read(read_size) if not data: break data_buffer = ffi.from_buffer(data) total_read += len(data_buffer) in_buffer.src = data_buffer in_buffer.size = len(data_buffer) in_buffer.pos = 0 # Flush all read data to output. while in_buffer.pos < in_buffer.size: zresult = lib.ZSTD_decompressStream( self._dctx, out_buffer, in_buffer ) if lib.ZSTD_isError(zresult): raise ZstdError( "zstd decompressor error: %s" % _zstd_error(zresult) ) if out_buffer.pos: ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) total_write += out_buffer.pos out_buffer.pos = 0 # Continue loop to keep reading. return total_read, total_write def decompress_content_dict_chain(self, frames): if not isinstance(frames, list): raise TypeError("argument must be a list") if not frames: raise ValueError("empty input chain") # First chunk should not be using a dictionary. We handle it specially. chunk = frames[0] if not isinstance(chunk, bytes_type): raise ValueError("chunk 0 must be bytes") # All chunks should be zstd frames and should have content size set. chunk_buffer = ffi.from_buffer(chunk) params = ffi.new("ZSTD_frameHeader *") zresult = lib.ZSTD_getFrameHeader( params, chunk_buffer, len(chunk_buffer) ) if lib.ZSTD_isError(zresult): raise ValueError("chunk 0 is not a valid zstd frame") elif zresult: raise ValueError("chunk 0 is too small to contain a zstd frame") if params.frameContentSize == lib.ZSTD_CONTENTSIZE_UNKNOWN: raise ValueError("chunk 0 missing content size in frame") self._ensure_dctx(load_dict=False) last_buffer = ffi.new("char[]", params.frameContentSize) out_buffer = ffi.new("ZSTD_outBuffer *") out_buffer.dst = last_buffer out_buffer.size = len(last_buffer) out_buffer.pos = 0 in_buffer = ffi.new("ZSTD_inBuffer *") in_buffer.src = chunk_buffer in_buffer.size = len(chunk_buffer) in_buffer.pos = 0 zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer) if lib.ZSTD_isError(zresult): raise ZstdError( "could not decompress chunk 0: %s" % _zstd_error(zresult) ) elif zresult: raise ZstdError("chunk 0 did not decompress full frame") # Special case of chain length of 1 if len(frames) == 1: return ffi.buffer(last_buffer, len(last_buffer))[:] i = 1 while i < len(frames): chunk = frames[i] if not isinstance(chunk, bytes_type): raise ValueError("chunk %d must be bytes" % i) chunk_buffer = ffi.from_buffer(chunk) zresult = lib.ZSTD_getFrameHeader( params, chunk_buffer, len(chunk_buffer) ) if lib.ZSTD_isError(zresult): raise ValueError("chunk %d is not a valid zstd frame" % i) elif zresult: raise ValueError( "chunk %d is too small to contain a zstd frame" % i ) if params.frameContentSize == lib.ZSTD_CONTENTSIZE_UNKNOWN: raise ValueError("chunk %d missing content size in frame" % i) dest_buffer = ffi.new("char[]", params.frameContentSize) out_buffer.dst = dest_buffer out_buffer.size = len(dest_buffer) out_buffer.pos = 0 in_buffer.src = chunk_buffer in_buffer.size = len(chunk_buffer) in_buffer.pos = 0 zresult = lib.ZSTD_decompressStream( self._dctx, out_buffer, in_buffer ) if lib.ZSTD_isError(zresult): raise ZstdError( "could not decompress chunk %d: %s" % _zstd_error(zresult) ) elif zresult: raise ZstdError("chunk %d did not decompress full frame" % i) last_buffer = dest_buffer i += 1 return ffi.buffer(last_buffer, len(last_buffer))[:] def _ensure_dctx(self, load_dict=True): lib.ZSTD_DCtx_reset(self._dctx, lib.ZSTD_reset_session_only) if self._max_window_size: zresult = lib.ZSTD_DCtx_setMaxWindowSize( self._dctx, self._max_window_size ) if lib.ZSTD_isError(zresult): raise ZstdError( "unable to set max window size: %s" % _zstd_error(zresult) ) zresult = lib.ZSTD_DCtx_setFormat(self._dctx, self._format) if lib.ZSTD_isError(zresult): raise ZstdError( "unable to set decoding format: %s" % _zstd_error(zresult) ) if self._dict_data and load_dict: zresult = lib.ZSTD_DCtx_refDDict(self._dctx, self._dict_data._ddict) if lib.ZSTD_isError(zresult): raise ZstdError( "unable to reference prepared dictionary: %s" % _zstd_error(zresult) )