cffi.py
2615 lines
| 78.9 KiB
| text/x-python
|
PythonLexer
Gregory Szorc
|
r42237 | # Copyright (c) 2016-present, Gregory Szorc | ||
# All rights reserved. | ||||
# | ||||
# This software may be modified and distributed under the terms | ||||
# of the BSD license. See the LICENSE file for details. | ||||
"""Python interface to the Zstandard (zstd) compression library.""" | ||||
from __future__ import absolute_import, unicode_literals | ||||
# This should match what the C extension exports.
# Public API of this module, in the same order as the C extension's list.
__all__ = [
    # NOTE(review): the Buffer* types below are commented out here,
    # presumably because this CFFI backend does not provide them — confirm.
    #'BufferSegment',
    #'BufferSegments',
    #'BufferWithSegments',
    #'BufferWithSegmentsCollection',
    # Classes and functions.
    "CompressionParameters",
    "ZstdCompressionDict",
    "ZstdCompressionParameters",
    "ZstdCompressor",
    "ZstdError",
    "ZstdDecompressor",
    "FrameParameters",
    "estimate_decompression_context_size",
    "frame_content_size",
    "frame_header_size",
    "get_frame_parameters",
    "train_dictionary",
    # Constants.
    "FLUSH_BLOCK",
    "FLUSH_FRAME",
    "COMPRESSOBJ_FLUSH_FINISH",
    "COMPRESSOBJ_FLUSH_BLOCK",
    "ZSTD_VERSION",
    "FRAME_HEADER",
    "CONTENTSIZE_UNKNOWN",
    "CONTENTSIZE_ERROR",
    "MAX_COMPRESSION_LEVEL",
    "COMPRESSION_RECOMMENDED_INPUT_SIZE",
    "COMPRESSION_RECOMMENDED_OUTPUT_SIZE",
    "DECOMPRESSION_RECOMMENDED_INPUT_SIZE",
    "DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE",
    "MAGIC_NUMBER",
    "BLOCKSIZELOG_MAX",
    "BLOCKSIZE_MAX",
    "WINDOWLOG_MIN",
    "WINDOWLOG_MAX",
    "CHAINLOG_MIN",
    "CHAINLOG_MAX",
    "HASHLOG_MIN",
    "HASHLOG_MAX",
    "HASHLOG3_MAX",
    "MINMATCH_MIN",
    "MINMATCH_MAX",
    "SEARCHLOG_MIN",
    "SEARCHLOG_MAX",
    "SEARCHLENGTH_MIN",
    "SEARCHLENGTH_MAX",
    "TARGETLENGTH_MIN",
    "TARGETLENGTH_MAX",
    "LDM_MINMATCH_MIN",
    "LDM_MINMATCH_MAX",
    "LDM_BUCKETSIZELOG_MAX",
    "STRATEGY_FAST",
    "STRATEGY_DFAST",
    "STRATEGY_GREEDY",
    "STRATEGY_LAZY",
    "STRATEGY_LAZY2",
    "STRATEGY_BTLAZY2",
    "STRATEGY_BTOPT",
    "STRATEGY_BTULTRA",
    "STRATEGY_BTULTRA2",
    "DICT_TYPE_AUTO",
    "DICT_TYPE_RAWCONTENT",
    "DICT_TYPE_FULLDICT",
    "FORMAT_ZSTD1",
    "FORMAT_ZSTD1_MAGICLESS",
]
import io | ||||
import os | ||||
import sys | ||||
from _zstd_cffi import ( | ||||
ffi, | ||||
lib, | ||||
) | ||||
# Python 2/3 compatibility aliases for byte-string and integer types.
bytes_type = str if sys.version_info[0] == 2 else bytes
int_type = long if sys.version_info[0] == 2 else int  # noqa: F821 -- ``long`` exists only on Python 2

# Streaming buffer sizes recommended by the zstd library.
COMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_CStreamInSize()
COMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_CStreamOutSize()
DECOMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_DStreamInSize()
DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_DStreamOutSize()

# CFFI allocator that does not clear memory after allocating it.
new_nonzero = ffi.new_allocator(should_clear_after_alloc=False)

MAX_COMPRESSION_LEVEL = lib.ZSTD_maxCLevel()
MAGIC_NUMBER = lib.ZSTD_MAGICNUMBER
# The four bytes a standard zstd frame starts with (magic number, little-endian).
FRAME_HEADER = b"\x28\xb5\x2f\xfd"

# Sentinel values zstd reports for frame content sizes.
CONTENTSIZE_UNKNOWN = lib.ZSTD_CONTENTSIZE_UNKNOWN
CONTENTSIZE_ERROR = lib.ZSTD_CONTENTSIZE_ERROR

# (major, minor, release) of the zstd library compiled in.
ZSTD_VERSION = (
    lib.ZSTD_VERSION_MAJOR,
    lib.ZSTD_VERSION_MINOR,
    lib.ZSTD_VERSION_RELEASE,
)

# Bounds for compression parameters.
BLOCKSIZELOG_MAX = lib.ZSTD_BLOCKSIZELOG_MAX
BLOCKSIZE_MAX = lib.ZSTD_BLOCKSIZE_MAX
WINDOWLOG_MIN = lib.ZSTD_WINDOWLOG_MIN
WINDOWLOG_MAX = lib.ZSTD_WINDOWLOG_MAX
CHAINLOG_MIN = lib.ZSTD_CHAINLOG_MIN
CHAINLOG_MAX = lib.ZSTD_CHAINLOG_MAX
HASHLOG_MIN = lib.ZSTD_HASHLOG_MIN
HASHLOG_MAX = lib.ZSTD_HASHLOG_MAX
HASHLOG3_MAX = lib.ZSTD_HASHLOG3_MAX
MINMATCH_MIN = lib.ZSTD_MINMATCH_MIN
MINMATCH_MAX = lib.ZSTD_MINMATCH_MAX
SEARCHLOG_MIN = lib.ZSTD_SEARCHLOG_MIN
SEARCHLOG_MAX = lib.ZSTD_SEARCHLOG_MAX
# SEARCHLENGTH_* carry the same values as MINMATCH_*.
SEARCHLENGTH_MIN = lib.ZSTD_MINMATCH_MIN
SEARCHLENGTH_MAX = lib.ZSTD_MINMATCH_MAX
TARGETLENGTH_MIN = lib.ZSTD_TARGETLENGTH_MIN
TARGETLENGTH_MAX = lib.ZSTD_TARGETLENGTH_MAX
LDM_MINMATCH_MIN = lib.ZSTD_LDM_MINMATCH_MIN
LDM_MINMATCH_MAX = lib.ZSTD_LDM_MINMATCH_MAX
LDM_BUCKETSIZELOG_MAX = lib.ZSTD_LDM_BUCKETSIZELOG_MAX

# Compression strategies.
STRATEGY_FAST = lib.ZSTD_fast
STRATEGY_DFAST = lib.ZSTD_dfast
STRATEGY_GREEDY = lib.ZSTD_greedy
STRATEGY_LAZY = lib.ZSTD_lazy
STRATEGY_LAZY2 = lib.ZSTD_lazy2
STRATEGY_BTLAZY2 = lib.ZSTD_btlazy2
STRATEGY_BTOPT = lib.ZSTD_btopt
STRATEGY_BTULTRA = lib.ZSTD_btultra
STRATEGY_BTULTRA2 = lib.ZSTD_btultra2

# Dictionary content types.
DICT_TYPE_AUTO = lib.ZSTD_dct_auto
DICT_TYPE_RAWCONTENT = lib.ZSTD_dct_rawContent
DICT_TYPE_FULLDICT = lib.ZSTD_dct_fullDict

# Frame formats.
FORMAT_ZSTD1 = lib.ZSTD_f_zstd1
FORMAT_ZSTD1_MAGICLESS = lib.ZSTD_f_zstd1_magicless

# Python-level flush selectors for ZstdCompressionWriter.flush() ...
FLUSH_BLOCK = 0
FLUSH_FRAME = 1

# ... and for ZstdCompressionObj.flush().
COMPRESSOBJ_FLUSH_FINISH = 0
COMPRESSOBJ_FLUSH_BLOCK = 1
def _cpu_count():
    """Return the number of online CPUs, or 0 if it cannot be determined."""
    # os.cpu_count() was introduced in Python 3.4; it may itself return None.
    try:
        return os.cpu_count() or 0
    except AttributeError:
        pass

    # Linux. Python 2 (with unicode_literals active) needs a bytes name here.
    if sys.version_info[0] == 2:
        sysconf_name = b"SC_NPROCESSORS_ONLN"
    else:
        sysconf_name = "SC_NPROCESSORS_ONLN"

    try:
        return os.sysconf(sysconf_name)
    except (AttributeError, ValueError):
        pass

    # TODO implement on other platforms.
    return 0
class ZstdError(Exception):
    """Raised when a zstd library call fails or an operation is misused."""
def _zstd_error(zresult):
    """Return the zstd error name for ``zresult`` as a text (unicode) string."""
    # ffi.string() yields bytes on both Python 2 and 3. Error messages are
    # built from unicode literals, so decode before formatting.
    name_bytes = ffi.string(lib.ZSTD_getErrorName(zresult))
    return name_bytes.decode("utf-8")
def _make_cctx_params(params):
    """Build a new ``ZSTD_CCtx_params`` populated from ``params``.

    ``params`` is read through the same attribute names that
    ZstdCompressionParameters exposes. The native object is released with
    ``ZSTD_freeCCtxParams`` when it is garbage collected.

    Raises MemoryError if zstd cannot allocate the object, and ZstdError if
    zstd rejects any value.
    """
    cctx_params = lib.ZSTD_createCCtxParams()
    if cctx_params == ffi.NULL:
        raise MemoryError()
    cctx_params = ffi.gc(cctx_params, lib.ZSTD_freeCCtxParams)

    # All values are read up front, then applied in this order. nbWorkers
    # precedes jobSize/overlapLog, matching the constructor's note that
    # setting nbWorkers resets those.
    settings = (
        (lib.ZSTD_c_format, params.format),
        (lib.ZSTD_c_compressionLevel, params.compression_level),
        (lib.ZSTD_c_windowLog, params.window_log),
        (lib.ZSTD_c_hashLog, params.hash_log),
        (lib.ZSTD_c_chainLog, params.chain_log),
        (lib.ZSTD_c_searchLog, params.search_log),
        (lib.ZSTD_c_minMatch, params.min_match),
        (lib.ZSTD_c_targetLength, params.target_length),
        (lib.ZSTD_c_strategy, params.compression_strategy),
        (lib.ZSTD_c_contentSizeFlag, params.write_content_size),
        (lib.ZSTD_c_checksumFlag, params.write_checksum),
        (lib.ZSTD_c_dictIDFlag, params.write_dict_id),
        (lib.ZSTD_c_nbWorkers, params.threads),
        (lib.ZSTD_c_jobSize, params.job_size),
        (lib.ZSTD_c_overlapLog, params.overlap_log),
        (lib.ZSTD_c_forceMaxWindow, params.force_max_window),
        (lib.ZSTD_c_enableLongDistanceMatching, params.enable_ldm),
        (lib.ZSTD_c_ldmHashLog, params.ldm_hash_log),
        (lib.ZSTD_c_ldmMinMatch, params.ldm_min_match),
        (lib.ZSTD_c_ldmBucketSizeLog, params.ldm_bucket_size_log),
        (lib.ZSTD_c_ldmHashRateLog, params.ldm_hash_rate_log),
    )
    for native_param, value in settings:
        _set_compression_parameter(cctx_params, native_param, value)

    return cctx_params
r42237 | class ZstdCompressionParameters(object): | ||
@staticmethod | ||||
def from_level(level, source_size=0, dict_size=0, **kwargs): | ||||
params = lib.ZSTD_getCParams(level, source_size, dict_size) | ||||
args = { | ||||
Gregory Szorc
|
r44446 | "window_log": "windowLog", | ||
"chain_log": "chainLog", | ||||
"hash_log": "hashLog", | ||||
"search_log": "searchLog", | ||||
"min_match": "minMatch", | ||||
"target_length": "targetLength", | ||||
"compression_strategy": "strategy", | ||||
Gregory Szorc
|
r42237 | } | ||
for arg, attr in args.items(): | ||||
if arg not in kwargs: | ||||
kwargs[arg] = getattr(params, attr) | ||||
return ZstdCompressionParameters(**kwargs) | ||||
Gregory Szorc
|
r44446 | def __init__( | ||
self, | ||||
format=0, | ||||
compression_level=0, | ||||
window_log=0, | ||||
hash_log=0, | ||||
chain_log=0, | ||||
search_log=0, | ||||
min_match=0, | ||||
target_length=0, | ||||
strategy=-1, | ||||
compression_strategy=-1, | ||||
write_content_size=1, | ||||
write_checksum=0, | ||||
write_dict_id=0, | ||||
job_size=0, | ||||
overlap_log=-1, | ||||
overlap_size_log=-1, | ||||
force_max_window=0, | ||||
enable_ldm=0, | ||||
ldm_hash_log=0, | ||||
ldm_min_match=0, | ||||
ldm_bucket_size_log=0, | ||||
ldm_hash_rate_log=-1, | ||||
ldm_hash_every_log=-1, | ||||
threads=0, | ||||
): | ||||
Gregory Szorc
|
r42237 | |||
params = lib.ZSTD_createCCtxParams() | ||||
if params == ffi.NULL: | ||||
raise MemoryError() | ||||
params = ffi.gc(params, lib.ZSTD_freeCCtxParams) | ||||
self._params = params | ||||
if threads < 0: | ||||
threads = _cpu_count() | ||||
# We need to set ZSTD_c_nbWorkers before ZSTD_c_jobSize and ZSTD_c_overlapLog | ||||
# because setting ZSTD_c_nbWorkers resets the other parameters. | ||||
_set_compression_parameter(params, lib.ZSTD_c_nbWorkers, threads) | ||||
_set_compression_parameter(params, lib.ZSTD_c_format, format) | ||||
Gregory Szorc
|
r44446 | _set_compression_parameter( | ||
params, lib.ZSTD_c_compressionLevel, compression_level | ||||
) | ||||
Gregory Szorc
|
r42237 | _set_compression_parameter(params, lib.ZSTD_c_windowLog, window_log) | ||
_set_compression_parameter(params, lib.ZSTD_c_hashLog, hash_log) | ||||
_set_compression_parameter(params, lib.ZSTD_c_chainLog, chain_log) | ||||
_set_compression_parameter(params, lib.ZSTD_c_searchLog, search_log) | ||||
_set_compression_parameter(params, lib.ZSTD_c_minMatch, min_match) | ||||
_set_compression_parameter(params, lib.ZSTD_c_targetLength, target_length) | ||||
if strategy != -1 and compression_strategy != -1: | ||||
Gregory Szorc
|
r44446 | raise ValueError("cannot specify both compression_strategy and strategy") | ||
Gregory Szorc
|
r42237 | |||
if compression_strategy != -1: | ||||
strategy = compression_strategy | ||||
elif strategy == -1: | ||||
strategy = 0 | ||||
_set_compression_parameter(params, lib.ZSTD_c_strategy, strategy) | ||||
Gregory Szorc
|
r44446 | _set_compression_parameter( | ||
params, lib.ZSTD_c_contentSizeFlag, write_content_size | ||||
) | ||||
Gregory Szorc
|
r42237 | _set_compression_parameter(params, lib.ZSTD_c_checksumFlag, write_checksum) | ||
_set_compression_parameter(params, lib.ZSTD_c_dictIDFlag, write_dict_id) | ||||
_set_compression_parameter(params, lib.ZSTD_c_jobSize, job_size) | ||||
if overlap_log != -1 and overlap_size_log != -1: | ||||
Gregory Szorc
|
r44446 | raise ValueError("cannot specify both overlap_log and overlap_size_log") | ||
Gregory Szorc
|
r42237 | |||
if overlap_size_log != -1: | ||||
overlap_log = overlap_size_log | ||||
elif overlap_log == -1: | ||||
overlap_log = 0 | ||||
_set_compression_parameter(params, lib.ZSTD_c_overlapLog, overlap_log) | ||||
_set_compression_parameter(params, lib.ZSTD_c_forceMaxWindow, force_max_window) | ||||
Gregory Szorc
|
r44446 | _set_compression_parameter( | ||
params, lib.ZSTD_c_enableLongDistanceMatching, enable_ldm | ||||
) | ||||
Gregory Szorc
|
r42237 | _set_compression_parameter(params, lib.ZSTD_c_ldmHashLog, ldm_hash_log) | ||
_set_compression_parameter(params, lib.ZSTD_c_ldmMinMatch, ldm_min_match) | ||||
Gregory Szorc
|
r44446 | _set_compression_parameter( | ||
params, lib.ZSTD_c_ldmBucketSizeLog, ldm_bucket_size_log | ||||
) | ||||
Gregory Szorc
|
r42237 | |||
if ldm_hash_rate_log != -1 and ldm_hash_every_log != -1: | ||||
Gregory Szorc
|
r44446 | raise ValueError( | ||
"cannot specify both ldm_hash_rate_log and ldm_hash_every_log" | ||||
) | ||||
Gregory Szorc
|
r42237 | |||
if ldm_hash_every_log != -1: | ||||
ldm_hash_rate_log = ldm_hash_every_log | ||||
elif ldm_hash_rate_log == -1: | ||||
ldm_hash_rate_log = 0 | ||||
_set_compression_parameter(params, lib.ZSTD_c_ldmHashRateLog, ldm_hash_rate_log) | ||||
@property | ||||
def format(self): | ||||
return _get_compression_parameter(self._params, lib.ZSTD_c_format) | ||||
@property | ||||
def compression_level(self): | ||||
return _get_compression_parameter(self._params, lib.ZSTD_c_compressionLevel) | ||||
@property | ||||
def window_log(self): | ||||
return _get_compression_parameter(self._params, lib.ZSTD_c_windowLog) | ||||
@property | ||||
def hash_log(self): | ||||
return _get_compression_parameter(self._params, lib.ZSTD_c_hashLog) | ||||
@property | ||||
def chain_log(self): | ||||
return _get_compression_parameter(self._params, lib.ZSTD_c_chainLog) | ||||
@property | ||||
def search_log(self): | ||||
return _get_compression_parameter(self._params, lib.ZSTD_c_searchLog) | ||||
@property | ||||
def min_match(self): | ||||
return _get_compression_parameter(self._params, lib.ZSTD_c_minMatch) | ||||
@property | ||||
def target_length(self): | ||||
return _get_compression_parameter(self._params, lib.ZSTD_c_targetLength) | ||||
@property | ||||
def compression_strategy(self): | ||||
return _get_compression_parameter(self._params, lib.ZSTD_c_strategy) | ||||
@property | ||||
def write_content_size(self): | ||||
return _get_compression_parameter(self._params, lib.ZSTD_c_contentSizeFlag) | ||||
@property | ||||
def write_checksum(self): | ||||
return _get_compression_parameter(self._params, lib.ZSTD_c_checksumFlag) | ||||
@property | ||||
def write_dict_id(self): | ||||
return _get_compression_parameter(self._params, lib.ZSTD_c_dictIDFlag) | ||||
@property | ||||
def job_size(self): | ||||
return _get_compression_parameter(self._params, lib.ZSTD_c_jobSize) | ||||
@property | ||||
def overlap_log(self): | ||||
return _get_compression_parameter(self._params, lib.ZSTD_c_overlapLog) | ||||
@property | ||||
def overlap_size_log(self): | ||||
return self.overlap_log | ||||
@property | ||||
def force_max_window(self): | ||||
return _get_compression_parameter(self._params, lib.ZSTD_c_forceMaxWindow) | ||||
@property | ||||
def enable_ldm(self): | ||||
Gregory Szorc
|
r44446 | return _get_compression_parameter( | ||
self._params, lib.ZSTD_c_enableLongDistanceMatching | ||||
) | ||||
Gregory Szorc
|
r42237 | |||
@property | ||||
def ldm_hash_log(self): | ||||
return _get_compression_parameter(self._params, lib.ZSTD_c_ldmHashLog) | ||||
@property | ||||
def ldm_min_match(self): | ||||
return _get_compression_parameter(self._params, lib.ZSTD_c_ldmMinMatch) | ||||
@property | ||||
def ldm_bucket_size_log(self): | ||||
return _get_compression_parameter(self._params, lib.ZSTD_c_ldmBucketSizeLog) | ||||
@property | ||||
def ldm_hash_rate_log(self): | ||||
return _get_compression_parameter(self._params, lib.ZSTD_c_ldmHashRateLog) | ||||
@property | ||||
def ldm_hash_every_log(self): | ||||
return self.ldm_hash_rate_log | ||||
@property | ||||
def threads(self): | ||||
return _get_compression_parameter(self._params, lib.ZSTD_c_nbWorkers) | ||||
def estimated_compression_context_size(self): | ||||
return lib.ZSTD_estimateCCtxSize_usingCCtxParams(self._params) | ||||
Gregory Szorc
|
r44446 | |||
Gregory Szorc
|
r42237 | CompressionParameters = ZstdCompressionParameters | ||
Gregory Szorc
|
r44446 | |||
Gregory Szorc
|
r42237 | def estimate_decompression_context_size(): | ||
return lib.ZSTD_estimateDCtxSize() | ||||
def _set_compression_parameter(params, param, value):
    """Set ``param`` to ``value`` on a ``ZSTD_CCtx_params`` object.

    Raises ZstdError if zstd rejects the parameter or the value.
    """
    zresult = lib.ZSTD_CCtxParams_setParameter(params, param, value)
    if not lib.ZSTD_isError(zresult):
        return

    raise ZstdError(
        "unable to set compression context parameter: %s" % _zstd_error(zresult)
    )
def _get_compression_parameter(params, param):
    """Return the current int value of ``param`` in a ``ZSTD_CCtx_params``.

    Raises ZstdError if zstd cannot report the parameter.
    """
    value_ptr = ffi.new("int *")

    zresult = lib.ZSTD_CCtxParams_getParameter(params, param, value_ptr)
    if lib.ZSTD_isError(zresult):
        raise ZstdError(
            "unable to get compression context parameter: %s" % _zstd_error(zresult)
        )

    return value_ptr[0]
class ZstdCompressionWriter(object):
    """Write-only file-like stream that compresses into another writer.

    Data passed to write() is fed to the compressor's ``ZSTD_CCtx``. Any
    compressed output is forwarded to ``writer.write()`` as it is produced.
    close() ends the zstd frame, then calls ``writer.close()`` if the writer
    has one.
    """

    def __init__(self, compressor, writer, source_size, write_size, write_return_read):
        self._compressor = compressor
        self._writer = writer
        self._write_size = write_size
        # When true, write() returns input bytes consumed instead of
        # compressed bytes emitted.
        self._write_return_read = bool(write_return_read)
        self._entered = False
        self._closed = False
        # Total compressed bytes handed to the writer; reported by tell().
        self._bytes_compressed = 0

        # One reusable output buffer of ``write_size`` bytes.
        self._dst_buffer = ffi.new("char[]", write_size)
        out = ffi.new("ZSTD_outBuffer *")
        out.dst = self._dst_buffer
        out.size = len(self._dst_buffer)
        out.pos = 0
        self._out_buffer = out

        zresult = lib.ZSTD_CCtx_setPledgedSrcSize(compressor._cctx, source_size)
        if lib.ZSTD_isError(zresult):
            raise ZstdError("error setting source size: %s" % _zstd_error(zresult))

    def __enter__(self):
        if self._closed:
            raise ValueError("stream is closed")
        if self._entered:
            raise ZstdError("cannot __enter__ multiple times")

        self._entered = True
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self._entered = False

        # The frame is only finished (and the writer closed) on a clean exit.
        if not (exc_type or exc_value or exc_tb):
            self.close()

        self._compressor = None
        return False

    def memory_size(self):
        """Return the size reported by ``ZSTD_sizeof_CCtx`` for the context."""
        return lib.ZSTD_sizeof_CCtx(self._compressor._cctx)

    def fileno(self):
        """Delegate to the writer's fileno(); raise OSError if it has none."""
        writer_fileno = getattr(self._writer, "fileno", None)
        if not writer_fileno:
            raise OSError("fileno not available on underlying writer")
        return writer_fileno()

    def close(self):
        """End the zstd frame, then close the underlying writer if possible."""
        if self._closed:
            return

        try:
            self.flush(FLUSH_FRAME)
        finally:
            # The stream counts as closed even if the final flush failed.
            self._closed = True

        # Call close() on the underlying stream as well.
        close_writer = getattr(self._writer, "close", None)
        if close_writer:
            close_writer()

    @property
    def closed(self):
        return self._closed

    def isatty(self):
        return False

    def readable(self):
        return False

    def readline(self, size=-1):
        raise io.UnsupportedOperation()

    def readlines(self, hint=-1):
        raise io.UnsupportedOperation()

    def seek(self, offset, whence=None):
        raise io.UnsupportedOperation()

    def seekable(self):
        return False

    def truncate(self, size=None):
        raise io.UnsupportedOperation()

    def writable(self):
        return True

    def writelines(self, lines):
        raise NotImplementedError("writelines() is not yet implemented")

    def read(self, size=-1):
        raise io.UnsupportedOperation()

    def readall(self):
        raise io.UnsupportedOperation()

    def readinto(self, b):
        raise io.UnsupportedOperation()

    def _emit_output(self):
        """Send any bytes in the output buffer to the writer.

        Resets the buffer and returns the number of bytes sent (0 if none).
        """
        out = self._out_buffer
        produced = out.pos
        if produced:
            self._writer.write(ffi.buffer(out.dst, produced)[:])
            self._bytes_compressed += produced
            out.pos = 0
        return produced

    def write(self, data):
        """Compress ``data`` and forward the output produced so far.

        Returns the number of input bytes consumed if ``write_return_read``
        was set; otherwise returns the number of compressed bytes written.
        """
        if self._closed:
            raise ValueError("stream is closed")

        data_buffer = ffi.from_buffer(data)
        in_buffer = ffi.new("ZSTD_inBuffer *")
        in_buffer.src = data_buffer
        in_buffer.size = len(data_buffer)
        in_buffer.pos = 0

        self._out_buffer.pos = 0
        total_write = 0
        while in_buffer.pos < in_buffer.size:
            zresult = lib.ZSTD_compressStream2(
                self._compressor._cctx,
                self._out_buffer,
                in_buffer,
                lib.ZSTD_e_continue,
            )
            if lib.ZSTD_isError(zresult):
                raise ZstdError("zstd compress error: %s" % _zstd_error(zresult))

            total_write += self._emit_output()

        return in_buffer.pos if self._write_return_read else total_write

    def flush(self, flush_mode=FLUSH_BLOCK):
        """Flush pending data: end the block (FLUSH_BLOCK) or frame (FLUSH_FRAME).

        Returns the number of compressed bytes written to the writer.
        """
        if flush_mode == FLUSH_BLOCK:
            end_directive = lib.ZSTD_e_flush
        elif flush_mode == FLUSH_FRAME:
            end_directive = lib.ZSTD_e_end
        else:
            raise ValueError("unknown flush_mode: %r" % flush_mode)

        if self._closed:
            raise ValueError("stream is closed")

        # No new input; zstd is only asked to drain what it already holds.
        in_buffer = ffi.new("ZSTD_inBuffer *")
        in_buffer.src = ffi.NULL
        in_buffer.size = 0
        in_buffer.pos = 0

        self._out_buffer.pos = 0
        total_write = 0
        while True:
            zresult = lib.ZSTD_compressStream2(
                self._compressor._cctx, self._out_buffer, in_buffer, end_directive
            )
            if lib.ZSTD_isError(zresult):
                raise ZstdError("zstd compress error: %s" % _zstd_error(zresult))

            total_write += self._emit_output()

            # A zero result means zstd has nothing left to flush.
            if not zresult:
                return total_write

    def tell(self):
        """Return the total number of compressed bytes written so far."""
        return self._bytes_compressed
class ZstdCompressionObj(object):
    """zlib-style compression object: call compress() repeatedly, then flush().

    This class defines no ``__init__``. ``_compressor``, ``_out`` (a
    ``ZSTD_outBuffer *``) and ``_finished`` must be assigned by whatever
    creates the instance. NOTE(review): that factory is outside this view,
    presumably ``ZstdCompressor.compressobj()``; confirm.
    """

    def compress(self, data):
        """Feed ``data`` to the compressor and return any output produced.

        May return ``b""`` if zstd buffered all of the input internally.

        Raises ZstdError if called after a finishing flush() or if zstd
        reports an error.
        """
        if self._finished:
            raise ZstdError("cannot call compress() after compressor finished")

        data_buffer = ffi.from_buffer(data)
        source = ffi.new("ZSTD_inBuffer *")
        source.src = data_buffer
        source.size = len(data_buffer)
        source.pos = 0

        chunks = []
        # Loop on the buffer's byte length, not len(data). For objects whose
        # items are wider than one byte (e.g. array.array("I") or a cast
        # memoryview), len(data) counts items. The loop would then stop with
        # input still unconsumed, silently dropping it.
        while source.pos < source.size:
            zresult = lib.ZSTD_compressStream2(
                self._compressor._cctx, self._out, source, lib.ZSTD_e_continue
            )
            if lib.ZSTD_isError(zresult):
                raise ZstdError("zstd compress error: %s" % _zstd_error(zresult))

            if self._out.pos:
                chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:])
                self._out.pos = 0

        return b"".join(chunks)

    def flush(self, flush_mode=COMPRESSOBJ_FLUSH_FINISH):
        """Return buffered output according to ``flush_mode``.

        COMPRESSOBJ_FLUSH_BLOCK ends the current block (``ZSTD_e_flush``).
        COMPRESSOBJ_FLUSH_FINISH ends the frame (``ZSTD_e_end``) and marks
        the object finished, so later compress()/flush() calls raise
        ZstdError.

        Raises ValueError for an unknown ``flush_mode``.
        """
        if flush_mode not in (COMPRESSOBJ_FLUSH_FINISH, COMPRESSOBJ_FLUSH_BLOCK):
            raise ValueError("flush mode not recognized")

        if self._finished:
            raise ZstdError("compressor object already finished")

        if flush_mode == COMPRESSOBJ_FLUSH_BLOCK:
            z_flush_mode = lib.ZSTD_e_flush
        elif flush_mode == COMPRESSOBJ_FLUSH_FINISH:
            z_flush_mode = lib.ZSTD_e_end
            self._finished = True
        else:
            raise ZstdError("unhandled flush mode")

        # compress() always drains the output buffer before returning.
        assert self._out.pos == 0

        in_buffer = ffi.new("ZSTD_inBuffer *")
        in_buffer.src = ffi.NULL
        in_buffer.size = 0
        in_buffer.pos = 0

        chunks = []
        while True:
            zresult = lib.ZSTD_compressStream2(
                self._compressor._cctx, self._out, in_buffer, z_flush_mode
            )
            if lib.ZSTD_isError(zresult):
                raise ZstdError(
                    "error ending compression stream: %s" % _zstd_error(zresult)
                )

            if self._out.pos:
                chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:])
                self._out.pos = 0

            # Zero means zstd has nothing left to emit for this directive.
            if not zresult:
                break

        return b"".join(chunks)
class ZstdCompressionChunker(object):
    """Compress input and hand back output in ``chunk_size`` pieces.

    compress(), flush() and finish() are generators, so their state checks
    run when iteration begins. Input passed to compress() stays referenced
    by ``self._in`` until zstd has consumed all of it. Until then, any
    further compress(), flush() or finish() raises ZstdError.
    """

    def __init__(self, compressor, chunk_size):
        self._compressor = compressor

        self._dst_buffer = ffi.new("char[]", chunk_size)
        self._out = ffi.new("ZSTD_outBuffer *")
        self._out.dst = self._dst_buffer
        self._out.size = chunk_size
        self._out.pos = 0

        # Starts empty; a NULL src means no input is pending.
        self._in = ffi.new("ZSTD_inBuffer *")
        self._in.src = ffi.NULL
        self._in.size = 0
        self._in.pos = 0

        self._finished = False

    def compress(self, data):
        """Yield full ``chunk_size`` chunks of compressed output for ``data``.

        Output that does not fill a whole chunk stays buffered until a later
        compress(), flush() or finish().
        """
        if self._finished:
            raise ZstdError("cannot call compress() after compression finished")

        if self._in.src != ffi.NULL:
            raise ZstdError(
                "cannot perform operation before consuming output "
                "from previous operation"
            )

        data_buffer = ffi.from_buffer(data)
        if len(data_buffer) == 0:
            return

        pending = self._in
        out = self._out
        pending.src = data_buffer
        pending.size = len(data_buffer)
        pending.pos = 0

        while pending.pos < pending.size:
            zresult = lib.ZSTD_compressStream2(
                self._compressor._cctx, out, pending, lib.ZSTD_e_continue
            )

            # Release the pointer to caller memory once it is fully consumed.
            if pending.pos == pending.size:
                pending.src = ffi.NULL
                pending.size = 0
                pending.pos = 0

            if lib.ZSTD_isError(zresult):
                raise ZstdError("zstd compress error: %s" % _zstd_error(zresult))

            if out.pos == out.size:
                yield ffi.buffer(out.dst, out.pos)[:]
                out.pos = 0

    def _drain(self, directive):
        """Run ZSTD_compressStream2 with ``directive`` until it reports no
        remaining work, yielding whatever output each call produced."""
        out = self._out
        while True:
            zresult = lib.ZSTD_compressStream2(
                self._compressor._cctx, out, self._in, directive
            )
            if lib.ZSTD_isError(zresult):
                raise ZstdError("zstd compress error: %s" % _zstd_error(zresult))

            if out.pos:
                yield ffi.buffer(out.dst, out.pos)[:]
                out.pos = 0

            if not zresult:
                return

    def flush(self):
        """Yield all buffered output, ending the current block."""
        if self._finished:
            raise ZstdError("cannot call flush() after compression finished")

        if self._in.src != ffi.NULL:
            raise ZstdError(
                "cannot call flush() before consuming output from "
                "previous operation"
            )

        for chunk in self._drain(lib.ZSTD_e_flush):
            yield chunk

    def finish(self):
        """Yield the remaining output and end the frame.

        Once fully consumed, the chunker is marked finished and every later
        operation raises ZstdError.
        """
        if self._finished:
            raise ZstdError("cannot call finish() after compression finished")

        if self._in.src != ffi.NULL:
            raise ZstdError(
                "cannot call finish() before consuming output from "
                "previous operation"
            )

        for chunk in self._drain(lib.ZSTD_e_end):
            yield chunk

        self._finished = True
class ZstdCompressionReader(object): | ||||
def __init__(self, compressor, source, read_size):
    """Prepare to compress ``source`` lazily as the stream is read.

    ``source`` is either an object with a ``read(size)`` method, polled for
    ``read_size`` bytes at a time, or a buffer-protocol object consumed
    whole.
    """
    self._compressor = compressor
    self._source = source
    self._read_size = read_size

    # Lifecycle state.
    self._entered = False
    self._closed = False
    self._finished_input = False
    self._finished_output = False

    # Compressed bytes produced so far; reported by tell().
    self._bytes_compressed = 0

    self._in_buffer = ffi.new("ZSTD_inBuffer *")
    # Holds a ref so the bytes behind self._in_buffer.src stay alive.
    self._source_buffer = None
def __enter__(self): | ||||
if self._entered: | ||||
Gregory Szorc
|
r44446 | raise ValueError("cannot __enter__ multiple times") | ||
Gregory Szorc
|
r42237 | |||
self._entered = True | ||||
return self | ||||
def __exit__(self, exc_type, exc_value, exc_tb): | ||||
self._entered = False | ||||
self._closed = True | ||||
self._source = None | ||||
self._compressor = None | ||||
return False | ||||
def readable(self): | ||||
return True | ||||
def writable(self): | ||||
return False | ||||
def seekable(self): | ||||
return False | ||||
def readline(self): | ||||
raise io.UnsupportedOperation() | ||||
def readlines(self): | ||||
raise io.UnsupportedOperation() | ||||
def write(self, data): | ||||
Gregory Szorc
|
r44446 | raise OSError("stream is not writable") | ||
Gregory Szorc
|
r42237 | |||
def writelines(self, ignored): | ||||
Gregory Szorc
|
r44446 | raise OSError("stream is not writable") | ||
Gregory Szorc
|
r42237 | |||
def isatty(self): | ||||
return False | ||||
def flush(self): | ||||
return None | ||||
def close(self): | ||||
self._closed = True | ||||
return None | ||||
@property | ||||
def closed(self): | ||||
return self._closed | ||||
def tell(self): | ||||
return self._bytes_compressed | ||||
def readall(self): | ||||
chunks = [] | ||||
while True: | ||||
chunk = self.read(1048576) | ||||
if not chunk: | ||||
break | ||||
chunks.append(chunk) | ||||
Gregory Szorc
|
r44446 | return b"".join(chunks) | ||
Gregory Szorc
|
r42237 | |||
def __iter__(self): | ||||
raise io.UnsupportedOperation() | ||||
def __next__(self): | ||||
raise io.UnsupportedOperation() | ||||
next = __next__ | ||||
def _read_input(self): | ||||
if self._finished_input: | ||||
return | ||||
Gregory Szorc
|
r44446 | if hasattr(self._source, "read"): | ||
Gregory Szorc
|
r42237 | data = self._source.read(self._read_size) | ||
if not data: | ||||
self._finished_input = True | ||||
return | ||||
self._source_buffer = ffi.from_buffer(data) | ||||
self._in_buffer.src = self._source_buffer | ||||
self._in_buffer.size = len(self._source_buffer) | ||||
self._in_buffer.pos = 0 | ||||
else: | ||||
self._source_buffer = ffi.from_buffer(self._source) | ||||
self._in_buffer.src = self._source_buffer | ||||
self._in_buffer.size = len(self._source_buffer) | ||||
self._in_buffer.pos = 0 | ||||
def _compress_into_buffer(self, out_buffer): | ||||
if self._in_buffer.pos >= self._in_buffer.size: | ||||
return | ||||
old_pos = out_buffer.pos | ||||
Gregory Szorc
|
r44446 | zresult = lib.ZSTD_compressStream2( | ||
self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_continue | ||||
) | ||||
Gregory Szorc
|
r42237 | |||
self._bytes_compressed += out_buffer.pos - old_pos | ||||
if self._in_buffer.pos == self._in_buffer.size: | ||||
self._in_buffer.src = ffi.NULL | ||||
self._in_buffer.pos = 0 | ||||
self._in_buffer.size = 0 | ||||
self._source_buffer = None | ||||
Gregory Szorc
|
r44446 | if not hasattr(self._source, "read"): | ||
Gregory Szorc
|
r42237 | self._finished_input = True | ||
if lib.ZSTD_isError(zresult): | ||||
Gregory Szorc
|
r44446 | raise ZstdError("zstd compress error: %s", _zstd_error(zresult)) | ||
Gregory Szorc
|
r42237 | |||
return out_buffer.pos and out_buffer.pos == out_buffer.size | ||||
def read(self, size=-1):
    """Read up to ``size`` bytes of compressed output.

    ``size=-1`` reads everything via ``readall()``. Returns ``b""`` when
    ``size`` is 0 or the frame has already been fully emitted. Otherwise keeps
    pulling input until ``size`` bytes have been produced. If input runs out
    first, the frame is ended and whatever fits is returned.

    Raises ValueError if the stream is closed or ``size`` < -1, and ZstdError
    if zstd fails to end the frame.
    """
    if self._closed:
        raise ValueError("stream is closed")

    if size < -1:
        raise ValueError("cannot read negative amounts less than -1")

    if size == -1:
        return self.readall()

    if self._finished_output or size == 0:
        return b""

    # Need a dedicated ref to dest buffer otherwise it gets collected.
    dst_buffer = ffi.new("char[]", size)
    out_buffer = ffi.new("ZSTD_outBuffer *")
    out_buffer.dst = dst_buffer
    out_buffer.size = size
    out_buffer.pos = 0

    # Drain input left over from a previous call first.
    if self._compress_into_buffer(out_buffer):
        return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]

    while not self._finished_input:
        self._read_input()
        if self._compress_into_buffer(out_buffer):
            return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]

    # EOF: end the frame, flushing as much as fits in the remaining space.
    old_pos = out_buffer.pos
    zresult = lib.ZSTD_compressStream2(
        self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end
    )
    self._bytes_compressed += out_buffer.pos - old_pos

    if lib.ZSTD_isError(zresult):
        raise ZstdError(
            "error ending compression stream: %s" % _zstd_error(zresult)
        )

    # ZSTD_e_end returns 0 once the frame has been completely flushed.
    if zresult == 0:
        self._finished_output = True

    return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
def read1(self, size=-1):
    """Return compressed bytes, stopping as soon as any output is available.

    ``size`` caps the number of bytes returned; ``-1`` means
    ``COMPRESSION_RECOMMENDED_OUTPUT_SIZE``. Because not all input yields
    output, the source may still be read several times, but unlike ``read()``
    this returns as soon as some output exists.

    Raises ValueError if the stream is closed or ``size`` < -1, and ZstdError
    if zstd fails to end the frame.
    """
    if self._closed:
        raise ValueError("stream is closed")
    if size < -1:
        raise ValueError("cannot read negative amounts less than -1")
    if self._finished_output or size == 0:
        return b""

    capacity = COMPRESSION_RECOMMENDED_OUTPUT_SIZE if size == -1 else size

    # The cdata array must stay referenced while zstd writes into it.
    storage = ffi.new("char[]", capacity)
    out = ffi.new("ZSTD_outBuffer *")
    out.dst = storage
    out.size = capacity
    out.pos = 0

    def emit():
        return ffi.buffer(out.dst, out.pos)[:]

    # Leftover input from an earlier call may already yield output.
    self._compress_into_buffer(out)
    if out.pos:
        return emit()

    while not self._finished_input:
        self._read_input()
        # Return when the buffer is full, or when there is some output and
        # more input remains. At EOF, fall through so the frame end can be
        # appended as well.
        if self._compress_into_buffer(out) or (
            out.pos and not self._finished_input
        ):
            return emit()

    # Input exhausted: finish the frame.
    before = out.pos
    zresult = lib.ZSTD_compressStream2(
        self._compressor._cctx, out, self._in_buffer, lib.ZSTD_e_end
    )
    self._bytes_compressed += out.pos - before

    if lib.ZSTD_isError(zresult):
        raise ZstdError(
            "error ending compression stream: %s" % _zstd_error(zresult)
        )
    if zresult == 0:
        self._finished_output = True

    return emit()
def readinto(self, b):
    """Compress into the writable buffer ``b`` and return the bytes written.

    Keeps pulling input until ``b`` is full or input is exhausted. In the
    latter case the frame is ended. Returns 0 once all output has been emitted.

    Raises ValueError if the stream is closed and ZstdError if zstd fails to
    end the frame.
    """
    if self._closed:
        raise ValueError("stream is closed")

    if self._finished_output:
        return 0

    # TODO use writable=True once we require CFFI >= 1.12.
    dest_buffer = ffi.from_buffer(b)
    # Zero-length copy into ``b``: cffi's memmove needs a writable
    # destination, so this presumably rejects read-only buffers up front
    # (stand-in for writable=True above).
    ffi.memmove(b, b"", 0)

    out_buffer = ffi.new("ZSTD_outBuffer *")
    out_buffer.dst = dest_buffer
    out_buffer.size = len(dest_buffer)
    out_buffer.pos = 0

    # Drain input left over from a previous call first.
    if self._compress_into_buffer(out_buffer):
        return out_buffer.pos

    while not self._finished_input:
        self._read_input()
        if self._compress_into_buffer(out_buffer):
            return out_buffer.pos

    # EOF: end the frame, flushing as much as fits in ``b``.
    old_pos = out_buffer.pos
    zresult = lib.ZSTD_compressStream2(
        self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end
    )
    self._bytes_compressed += out_buffer.pos - old_pos

    if lib.ZSTD_isError(zresult):
        raise ZstdError(
            "error ending compression stream: %s" % _zstd_error(zresult)
        )

    # ZSTD_e_end returns 0 once the frame has been completely flushed.
    if zresult == 0:
        self._finished_output = True

    return out_buffer.pos
def readinto1(self, b):
    """Compress into the writable buffer ``b``, returning once any output exists.

    Returns the number of bytes written (0 once all output has been
    emitted). Raises ValueError if the stream is closed and ZstdError if zstd
    fails to end the frame.
    """
    if self._closed:
        raise ValueError("stream is closed")
    if self._finished_output:
        return 0

    # TODO use writable=True once we require CFFI >= 1.12.
    target = ffi.from_buffer(b)
    # Zero-length copy into ``b``: presumably fails fast for read-only
    # buffers (stand-in for writable=True above).
    ffi.memmove(b, b"", 0)

    out = ffi.new("ZSTD_outBuffer *")
    out.dst = target
    out.size = len(target)
    out.pos = 0

    # Leftover input from an earlier call may already yield output.
    self._compress_into_buffer(out)
    if out.pos:
        return out.pos

    while not self._finished_input:
        self._read_input()
        # Full buffer, or some output with more input still to come.
        if self._compress_into_buffer(out) or (
            out.pos and not self._finished_input
        ):
            return out.pos

    # Input exhausted: finish the frame.
    before = out.pos
    zresult = lib.ZSTD_compressStream2(
        self._compressor._cctx, out, self._in_buffer, lib.ZSTD_e_end
    )
    self._bytes_compressed += out.pos - before

    if lib.ZSTD_isError(zresult):
        raise ZstdError(
            "error ending compression stream: %s" % _zstd_error(zresult)
        )
    if zresult == 0:
        self._finished_output = True

    return out.pos
class ZstdCompressor(object):
    """Produce zstd frames using one reusable compression context (ZSTD_CCtx).

    Settings come either from ``compression_params`` or from ``level`` plus
    the ``write_checksum`` / ``write_content_size`` / ``write_dict_id`` /
    ``threads`` keywords. Mixing the two styles raises ValueError. A negative
    ``threads`` means one worker per CPU. ``dict_data`` is an optional
    ZstdCompressionDict applied to the context.

    Every operation resets the session before starting a new frame.
    """

    def __init__(
        self,
        level=3,
        dict_data=None,
        compression_params=None,
        write_checksum=None,
        write_content_size=None,
        write_dict_id=None,
        threads=0,
    ):
        max_level = lib.ZSTD_maxCLevel()
        if level > max_level:
            raise ValueError("level must be less than %d" % max_level)

        if threads < 0:
            threads = _cpu_count()

        if compression_params:
            # An explicit parameters object excludes every convenience keyword.
            for keyword, value in (
                ("write_checksum", write_checksum),
                ("write_content_size", write_content_size),
                ("write_dict_id", write_dict_id),
            ):
                if value is not None:
                    raise ValueError(
                        "cannot define compression_params and %s" % keyword
                    )
            if threads:
                raise ValueError("cannot define compression_params and threads")

            self._params = _make_cctx_params(compression_params)
        else:
            raw_params = lib.ZSTD_createCCtxParams()
            if raw_params == ffi.NULL:
                raise MemoryError()
            self._params = ffi.gc(raw_params, lib.ZSTD_freeCCtxParams)

            # Content size and dictionary ID default to on; checksums only
            # when requested.
            if write_content_size is None:
                content_size_flag = 1
            else:
                content_size_flag = write_content_size
            dict_id_flag = 1 if write_dict_id is None or write_dict_id else 0

            settings = [
                (lib.ZSTD_c_compressionLevel, level),
                (lib.ZSTD_c_contentSizeFlag, content_size_flag),
                (lib.ZSTD_c_checksumFlag, 1 if write_checksum else 0),
                (lib.ZSTD_c_dictIDFlag, dict_id_flag),
            ]
            if threads:
                settings.append((lib.ZSTD_c_nbWorkers, threads))

            for param, value in settings:
                _set_compression_parameter(self._params, param, value)

        cctx = lib.ZSTD_createCCtx()
        if cctx == ffi.NULL:
            raise MemoryError()

        self._cctx = cctx
        self._dict_data = dict_data

        # The GC finalizer is attached only after _setup_cctx() so its size
        # hint reflects the configured context; ``finally`` still ensures the
        # context gets a finalizer if setup raises.
        try:
            self._setup_cctx()
        finally:
            self._cctx = ffi.gc(
                cctx, lib.ZSTD_freeCCtx, size=lib.ZSTD_sizeof_CCtx(cctx)
            )

    def _setup_cctx(self):
        """Apply ``self._params`` and any dictionary to ``self._cctx``."""
        zresult = lib.ZSTD_CCtx_setParametersUsingCCtxParams(self._cctx, self._params)
        if lib.ZSTD_isError(zresult):
            raise ZstdError(
                "could not set compression parameters: %s" % _zstd_error(zresult)
            )

        dict_data = self._dict_data
        if not dict_data:
            return

        # Prefer a precomputed CDict; otherwise load the raw bytes by reference.
        if dict_data._cdict:
            zresult = lib.ZSTD_CCtx_refCDict(self._cctx, dict_data._cdict)
        else:
            zresult = lib.ZSTD_CCtx_loadDictionary_advanced(
                self._cctx,
                dict_data.as_bytes(),
                len(dict_data),
                lib.ZSTD_dlm_byRef,
                dict_data._dict_type,
            )

        if lib.ZSTD_isError(zresult):
            raise ZstdError(
                "could not load compression dictionary: %s" % _zstd_error(zresult)
            )

    def _pledge_source_size(self, size):
        """Declare the input size of the next frame; negative means unknown."""
        if size < 0:
            size = lib.ZSTD_CONTENTSIZE_UNKNOWN
        zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size)
        if lib.ZSTD_isError(zresult):
            raise ZstdError("error setting source size: %s" % _zstd_error(zresult))

    def memory_size(self):
        """Return the size of the compression context as reported by zstd."""
        return lib.ZSTD_sizeof_CCtx(self._cctx)

    def compress(self, data):
        """Compress ``data`` (any buffer) into a complete frame in one call."""
        lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only)

        source = ffi.from_buffer(data)
        source_len = len(source)
        capacity = lib.ZSTD_compressBound(source_len)
        dest = new_nonzero("char[]", capacity)

        self._pledge_source_size(source_len)

        out_buffer = ffi.new("ZSTD_outBuffer *")
        in_buffer = ffi.new("ZSTD_inBuffer *")

        out_buffer.dst = dest
        out_buffer.size = capacity
        out_buffer.pos = 0

        in_buffer.src = source
        in_buffer.size = source_len
        in_buffer.pos = 0

        zresult = lib.ZSTD_compressStream2(
            self._cctx, out_buffer, in_buffer, lib.ZSTD_e_end
        )
        if lib.ZSTD_isError(zresult):
            raise ZstdError("cannot compress: %s" % _zstd_error(zresult))
        if zresult:
            # dest is sized by ZSTD_compressBound(), so one ZSTD_e_end call is
            # expected to flush the whole frame.
            raise ZstdError("unexpected partial frame flush")

        return ffi.buffer(dest, out_buffer.pos)[:]

    def compressobj(self, size=-1):
        """Return a ZstdCompressionObj for incremental compression."""
        lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only)
        self._pledge_source_size(size)

        cobj = ZstdCompressionObj()
        cobj._out = ffi.new("ZSTD_outBuffer *")
        cobj._dst_buffer = ffi.new("char[]", COMPRESSION_RECOMMENDED_OUTPUT_SIZE)
        cobj._out.dst = cobj._dst_buffer
        cobj._out.size = COMPRESSION_RECOMMENDED_OUTPUT_SIZE
        cobj._out.pos = 0
        cobj._compressor = self
        cobj._finished = False
        return cobj

    def chunker(self, size=-1, chunk_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE):
        """Return a ZstdCompressionChunker emitting ``chunk_size`` pieces."""
        lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only)
        self._pledge_source_size(size)

        return ZstdCompressionChunker(self, chunk_size=chunk_size)

    def copy_stream(
        self,
        ifh,
        ofh,
        size=-1,
        read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE,
        write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE,
    ):
        """Compress everything read from ``ifh`` into ``ofh``.

        Returns a ``(bytes_read, bytes_written)`` tuple.
        """
        if not hasattr(ifh, "read"):
            raise ValueError("first argument must have a read() method")
        if not hasattr(ofh, "write"):
            raise ValueError("second argument must have a write() method")

        lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only)
        self._pledge_source_size(size)

        in_buffer = ffi.new("ZSTD_inBuffer *")
        out_buffer = ffi.new("ZSTD_outBuffer *")

        dst_buffer = ffi.new("char[]", write_size)
        out_buffer.dst = dst_buffer
        out_buffer.size = write_size
        out_buffer.pos = 0

        bytes_in = 0
        bytes_out = 0

        while True:
            data = ifh.read(read_size)
            if not data:
                break

            data_buffer = ffi.from_buffer(data)
            bytes_in += len(data_buffer)
            in_buffer.src = data_buffer
            in_buffer.size = len(data_buffer)
            in_buffer.pos = 0

            while in_buffer.pos < in_buffer.size:
                zresult = lib.ZSTD_compressStream2(
                    self._cctx, out_buffer, in_buffer, lib.ZSTD_e_continue
                )
                if lib.ZSTD_isError(zresult):
                    raise ZstdError("zstd compress error: %s" % _zstd_error(zresult))

                if out_buffer.pos:
                    ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
                    bytes_out += out_buffer.pos
                    out_buffer.pos = 0

        # Input exhausted: end the frame, flushing until zstd reports 0 left.
        zresult = None
        while zresult != 0:
            zresult = lib.ZSTD_compressStream2(
                self._cctx, out_buffer, in_buffer, lib.ZSTD_e_end
            )
            if lib.ZSTD_isError(zresult):
                raise ZstdError(
                    "error ending compression stream: %s" % _zstd_error(zresult)
                )

            if out_buffer.pos:
                ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
                bytes_out += out_buffer.pos
                out_buffer.pos = 0

        return bytes_in, bytes_out

    def stream_reader(
        self, source, size=-1, read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE
    ):
        """Return a ZstdCompressionReader over ``source``.

        When ``len(source)`` works, that length replaces ``size``.
        """
        lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only)

        try:
            size = len(source)
        except Exception:
            pass

        self._pledge_source_size(size)

        return ZstdCompressionReader(self, source, read_size)

    def stream_writer(
        self,
        writer,
        size=-1,
        write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE,
        write_return_read=False,
    ):
        """Return a ZstdCompressionWriter that writes frames to ``writer``."""
        if not hasattr(writer, "write"):
            raise ValueError("must pass an object with a write() method")

        lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only)

        if size < 0:
            size = lib.ZSTD_CONTENTSIZE_UNKNOWN

        return ZstdCompressionWriter(self, writer, size, write_size, write_return_read)

    write_to = stream_writer

    def read_to_iter(
        self,
        reader,
        size=-1,
        read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE,
        write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE,
    ):
        """Generate compressed chunks from ``reader``.

        ``reader`` is either an object with ``read()`` or a sliceable buffer,
        whose length then replaces ``size``. Setup happens lazily on the first
        ``next()``, as with any generator.
        """
        use_read = hasattr(reader, "read")
        if not use_read:
            if not hasattr(reader, "__getitem__"):
                raise ValueError(
                    "must pass an object with a read() method or "
                    "conforms to buffer protocol"
                )
            size = len(reader)
        offset = 0

        lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only)
        self._pledge_source_size(size)

        in_buffer = ffi.new("ZSTD_inBuffer *")
        out_buffer = ffi.new("ZSTD_outBuffer *")

        in_buffer.src = ffi.NULL
        in_buffer.size = 0
        in_buffer.pos = 0

        dst_buffer = ffi.new("char[]", write_size)
        out_buffer.dst = dst_buffer
        out_buffer.size = write_size
        out_buffer.pos = 0

        while True:
            # Output is always drained before more input is gathered.
            assert out_buffer.pos == 0

            if use_read:
                chunk = reader.read(read_size)
            else:
                take = min(len(reader) - offset, read_size)
                chunk = reader[offset : offset + take]
                offset += take

            if not chunk:
                break

            chunk_buffer = ffi.from_buffer(chunk)
            in_buffer.src = chunk_buffer
            in_buffer.size = len(chunk_buffer)
            in_buffer.pos = 0

            while in_buffer.pos < in_buffer.size:
                zresult = lib.ZSTD_compressStream2(
                    self._cctx, out_buffer, in_buffer, lib.ZSTD_e_continue
                )
                if lib.ZSTD_isError(zresult):
                    raise ZstdError("zstd compress error: %s" % _zstd_error(zresult))

                if out_buffer.pos:
                    data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
                    out_buffer.pos = 0
                    yield data

            assert out_buffer.pos == 0

        # Input exhausted: end the frame and emit whatever remains.
        while True:
            assert out_buffer.pos == 0
            zresult = lib.ZSTD_compressStream2(
                self._cctx, out_buffer, in_buffer, lib.ZSTD_e_end
            )
            if lib.ZSTD_isError(zresult):
                raise ZstdError(
                    "error ending compression stream: %s" % _zstd_error(zresult)
                )

            if out_buffer.pos:
                data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
                out_buffer.pos = 0
                yield data

            if zresult == 0:
                break

    read_from = read_to_iter

    def frame_progression(self):
        """Return ``(ingested, consumed, produced)`` for the current frame."""
        progress = lib.ZSTD_getFrameProgression(self._cctx)
        return progress.ingested, progress.consumed, progress.produced
class FrameParameters(object):
    """Plain attribute view of a parsed zstd frame header.

    Copies ``frameContentSize``, ``windowSize`` and ``dictID`` from the given
    header, and exposes ``checksumFlag`` as a bool.
    """

    def __init__(self, fparams):
        self.content_size, self.window_size, self.dict_id = (
            fparams.frameContentSize,
            fparams.windowSize,
            fparams.dictID,
        )
        self.has_checksum = bool(fparams.checksumFlag)
def frame_content_size(data):
    """Return the decompressed size recorded in the frame header of ``data``.

    Returns -1 when the header does not record a content size. Raises
    ZstdError when zstd reports ZSTD_CONTENTSIZE_ERROR.
    """
    data_buffer = ffi.from_buffer(data)
    size = lib.ZSTD_getFrameContentSize(data_buffer, len(data_buffer))

    if size == lib.ZSTD_CONTENTSIZE_UNKNOWN:
        return -1
    if size == lib.ZSTD_CONTENTSIZE_ERROR:
        raise ZstdError("error when determining content size")
    return size
def frame_header_size(data):
    """Return the size in bytes of the frame header at the start of ``data``.

    Raises ZstdError if zstd cannot determine it.
    """
    data_buffer = ffi.from_buffer(data)
    header_size = lib.ZSTD_frameHeaderSize(data_buffer, len(data_buffer))

    if not lib.ZSTD_isError(header_size):
        return header_size

    raise ZstdError(
        "could not determine frame header size: %s" % _zstd_error(header_size)
    )
def get_frame_parameters(data):
    """Parse the frame header at the start of ``data`` into FrameParameters.

    Raises ZstdError if the header is invalid, or if ``data`` is too short.
    In the latter case zstd reports how many bytes it needs.
    """
    header = ffi.new("ZSTD_frameHeader *")

    data_buffer = ffi.from_buffer(data)
    zresult = lib.ZSTD_getFrameHeader(header, data_buffer, len(data_buffer))

    if lib.ZSTD_isError(zresult):
        raise ZstdError("cannot get frame parameters: %s" % _zstd_error(zresult))
    if zresult:
        # A positive return is the number of bytes needed for a full header.
        raise ZstdError("not enough data for frame parameters; need %d bytes" % zresult)

    return FrameParameters(header[0])
class ZstdCompressionDict(object):
    """Raw zstd dictionary bytes plus lazily built zstd dictionary objects.

    ``data`` must be a bytes object. ``dict_type`` selects how zstd
    interprets it and must be one of the ``DICT_TYPE_*`` constants. ``k`` and
    ``d`` are stored as attributes unchanged.
    """

    def __init__(self, data, dict_type=DICT_TYPE_AUTO, k=0, d=0):
        assert isinstance(data, bytes_type)
        self._data = data
        self.k = k
        self.d = d

        if dict_type not in (DICT_TYPE_AUTO, DICT_TYPE_RAWCONTENT, DICT_TYPE_FULLDICT):
            # %r (rather than %d) so that non-integer values are reported
            # instead of triggering a TypeError while formatting.
            raise ValueError(
                "invalid dictionary load mode: %r; must use "
                "DICT_TYPE_* constants" % (dict_type,)
            )

        self._dict_type = dict_type
        self._cdict = None

    def __len__(self):
        return len(self._data)

    def dict_id(self):
        """Return the dictionary ID zstd reads from the dictionary bytes."""
        return int_type(lib.ZDICT_getDictID(self._data, len(self._data)))

    def as_bytes(self):
        return self._data

    def precompute_compress(self, level=0, compression_params=None):
        """Build and store a ZSTD_CDict for this dictionary.

        Exactly one of ``level`` or ``compression_params`` must be given.
        Raises ValueError otherwise, and ZstdError if zstd cannot create the
        CDict.
        """
        if level and compression_params:
            raise ValueError("must only specify one of level or compression_params")

        if not level and not compression_params:
            raise ValueError("must specify one of level or compression_params")

        # Owns the struct memory when parameters are built by hand; kept in
        # scope until after ZSTD_createCDict_advanced() has copied the value.
        cparams_ptr = None
        if level:
            cparams = lib.ZSTD_getCParams(level, 0, len(self._data))
        else:
            # ffi.new() only accepts pointer or array types, so allocate via a
            # pointer and fill in the pointed-to struct. It is passed by value.
            cparams_ptr = ffi.new("ZSTD_compressionParameters *")
            cparams = cparams_ptr[0]
            cparams.chainLog = compression_params.chain_log
            cparams.hashLog = compression_params.hash_log
            cparams.minMatch = compression_params.min_match
            cparams.searchLog = compression_params.search_log
            cparams.strategy = compression_params.compression_strategy
            cparams.targetLength = compression_params.target_length
            cparams.windowLog = compression_params.window_log

        cdict = lib.ZSTD_createCDict_advanced(
            self._data,
            len(self._data),
            lib.ZSTD_dlm_byRef,
            self._dict_type,
            cparams,
            lib.ZSTD_defaultCMem,
        )
        if cdict == ffi.NULL:
            raise ZstdError("unable to precompute dictionary")

        self._cdict = ffi.gc(
            cdict, lib.ZSTD_freeCDict, size=lib.ZSTD_sizeof_CDict(cdict)
        )

    @property
    def _ddict(self):
        """Return this dictionary's ZSTD_DDict, creating it on first access.

        ``_ddict`` is a property, which is a data descriptor, so an entry
        stored under the same name in ``__dict__`` would never be consulted.
        The cached object therefore lives under a separate key. Caching also
        keeps one DDict alive for as long as this object. That matters if
        callers hand the DDict to zstd by reference (e.g. via refDDict;
        confirm at call sites).

        Raises ZstdError if zstd cannot create the DDict.
        """
        ddict = self.__dict__.get("_ddict_cache")
        if ddict is not None:
            return ddict

        ddict = lib.ZSTD_createDDict_advanced(
            self._data,
            len(self._data),
            lib.ZSTD_dlm_byRef,
            self._dict_type,
            lib.ZSTD_defaultCMem,
        )
        if ddict == ffi.NULL:
            raise ZstdError("could not create decompression dict")

        ddict = ffi.gc(ddict, lib.ZSTD_freeDDict, size=lib.ZSTD_sizeof_DDict(ddict))
        self.__dict__["_ddict_cache"] = ddict

        return ddict
def train_dictionary(
    dict_size,
    samples,
    k=0,
    d=0,
    notifications=0,
    dict_id=0,
    level=0,
    steps=0,
    threads=0,
):
    """Train a zstd dictionary of at most ``dict_size`` bytes from ``samples``.

    ``samples`` must be a list of bytes objects. With every tuning argument
    at 0, ZDICT_trainFromBuffer() is used. A non-zero ``steps`` or
    ``threads`` selects ZDICT_optimizeTrainFromBuffer_cover(). Otherwise
    ZDICT_trainFromBuffer_cover() runs with the given cover parameters. A
    negative ``threads`` means one thread per CPU.

    Returns a DICT_TYPE_FULLDICT ZstdCompressionDict carrying the ``k``/``d``
    values read back from the cover parameters. Raises TypeError if
    ``samples`` is not a list, ValueError if a sample is not bytes, and
    ZstdError if training fails.
    """
    if not isinstance(samples, list):
        raise TypeError("samples must be a list")

    if threads < 0:
        threads = _cpu_count()

    # The ZDICT trainers take every sample concatenated into one buffer plus
    # an array of per-sample lengths.
    samples_buffer = new_nonzero("char[]", sum(map(len, samples)))
    sample_sizes = new_nonzero("size_t[]", len(samples))

    offset = 0
    for index, sample in enumerate(samples):
        if not isinstance(sample, bytes_type):
            raise ValueError("samples must be bytes")
        length = len(sample)
        ffi.memmove(samples_buffer + offset, sample, length)
        sample_sizes[index] = length
        offset += length

    dict_data = new_nonzero("char[]", dict_size)
    dparams = ffi.new("ZDICT_cover_params_t *")[0]
    dparams.k = k
    dparams.d = d
    dparams.steps = steps
    dparams.nbThreads = threads
    dparams.zParams.notificationLevel = notifications
    dparams.zParams.dictID = dict_id
    dparams.zParams.compressionLevel = level

    use_plain_trainer = not any(
        (
            dparams.k,
            dparams.d,
            dparams.steps,
            dparams.nbThreads,
            dparams.zParams.notificationLevel,
            dparams.zParams.dictID,
            dparams.zParams.compressionLevel,
        )
    )

    dict_ptr = ffi.addressof(dict_data)
    samples_ptr = ffi.addressof(samples_buffer)
    sizes_ptr = ffi.addressof(sample_sizes, 0)
    sample_count = len(samples)

    if use_plain_trainer:
        zresult = lib.ZDICT_trainFromBuffer(
            dict_ptr, dict_size, samples_ptr, sizes_ptr, sample_count
        )
    elif dparams.steps or dparams.nbThreads:
        # The optimizer writes the chosen parameters back into dparams.
        zresult = lib.ZDICT_optimizeTrainFromBuffer_cover(
            dict_ptr,
            dict_size,
            samples_ptr,
            sizes_ptr,
            sample_count,
            ffi.addressof(dparams),
        )
    else:
        zresult = lib.ZDICT_trainFromBuffer_cover(
            dict_ptr, dict_size, samples_ptr, sizes_ptr, sample_count, dparams
        )

    if lib.ZDICT_isError(zresult):
        msg = ffi.string(lib.ZDICT_getErrorName(zresult)).decode("utf-8")
        raise ZstdError("cannot train dict: %s" % msg)

    return ZstdCompressionDict(
        ffi.buffer(dict_data, zresult)[:],
        dict_type=DICT_TYPE_FULLDICT,
        k=dparams.k,
        d=dparams.d,
    )
class ZstdDecompressionObj(object):
    """Incremental decompressor for a single zstd frame.

    ``decompress()`` may be called repeatedly until the frame ends. After
    that the object refuses further input.
    """

    def __init__(self, decompressor, write_size):
        self._decompressor = decompressor
        self._write_size = write_size
        self._finished = False

    def decompress(self, data):
        """Decompress ``data`` and return the output produced so far.

        Raises ZstdError if the frame already ended or zstd reports an error.
        """
        if self._finished:
            raise ZstdError("cannot use a decompressobj multiple times")

        in_buffer = ffi.new("ZSTD_inBuffer *")
        out_buffer = ffi.new("ZSTD_outBuffer *")

        source = ffi.from_buffer(data)
        if not len(source):
            return b""

        in_buffer.src = source
        in_buffer.size = len(source)
        in_buffer.pos = 0

        scratch = ffi.new("char[]", self._write_size)
        out_buffer.dst = scratch
        out_buffer.size = len(scratch)
        out_buffer.pos = 0

        pieces = []
        while True:
            zresult = lib.ZSTD_decompressStream(
                self._decompressor._dctx, out_buffer, in_buffer
            )
            if lib.ZSTD_isError(zresult):
                raise ZstdError("zstd decompressor error: %s" % _zstd_error(zresult))

            frame_done = zresult == 0
            if frame_done:
                # Frame complete: drop the decompressor and refuse more input.
                self._finished = True
                self._decompressor = None

            if out_buffer.pos:
                pieces.append(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])

            # Stop at the end of the frame, or once all input is consumed and
            # the last call produced nothing.
            input_drained = in_buffer.pos == in_buffer.size
            if frame_done or (input_drained and not out_buffer.pos):
                break

            out_buffer.pos = 0

        return b"".join(pieces)

    def flush(self, length=0):
        """Accept and ignore ``length``; does nothing and returns None."""
        pass
class ZstdDecompressionReader(object): | ||||
def __init__(self, decompressor, source, read_size, read_across_frames):
    """Set up a reader that decompresses data pulled from ``source``."""
    self._decompressor = decompressor
    self._source = source
    self._read_size = read_size
    self._read_across_frames = bool(read_across_frames)

    # Context-manager / lifecycle state.
    self._entered = False
    self._closed = False

    # Progress counter and end-of-stream markers.
    self._bytes_decompressed = 0
    self._finished_input = False
    self._finished_output = False

    self._in_buffer = ffi.new("ZSTD_inBuffer *")
    # Holds a ref to self._in_buffer.src so the underlying memory stays alive.
    self._source_buffer = None
def __enter__(self): | ||||
if self._entered: | ||||
Gregory Szorc
|
r44446 | raise ValueError("cannot __enter__ multiple times") | ||
Gregory Szorc
|
r42237 | |||
self._entered = True | ||||
return self | ||||
def __exit__(self, exc_type, exc_value, exc_tb): | ||||
self._entered = False | ||||
self._closed = True | ||||
self._source = None | ||||
self._decompressor = None | ||||
return False | ||||
def readable(self):
    """Reading is supported."""
    return True

def writable(self):
    """Writing is never supported."""
    return False

def seekable(self):
    """Report the stream as seekable."""
    return True
def readline(self):
    """Line-oriented reads are unsupported."""
    raise io.UnsupportedOperation()

def readlines(self):
    """Line-oriented reads are unsupported."""
    raise io.UnsupportedOperation()

def write(self, data):
    """The reader is read-only."""
    raise io.UnsupportedOperation()

def writelines(self, lines):
    """The reader is read-only."""
    raise io.UnsupportedOperation()
def isatty(self):
    """Never an interactive terminal."""
    return False

def flush(self):
    """Nothing is buffered for writing; always returns None."""
    return None

def close(self):
    """Mark the reader closed; returns None."""
    self._closed = True
    return None

@property
def closed(self):
    """Whether close() or __exit__() has been called."""
    return self._closed

def tell(self):
    """Return the number of decompressed bytes produced so far."""
    return self._bytes_decompressed
def readall(self): | ||||
chunks = [] | ||||
while True: | ||||
chunk = self.read(1048576) | ||||
if not chunk: | ||||
break | ||||
chunks.append(chunk) | ||||
Gregory Szorc
|
r44446 | return b"".join(chunks) | ||
Gregory Szorc
|
r42237 | |||
def __iter__(self): | ||||
raise io.UnsupportedOperation() | ||||
def __next__(self): | ||||
raise io.UnsupportedOperation() | ||||
next = __next__ | ||||
def _read_input(self): | ||||
# We have data left over in the input buffer. Use it. | ||||
if self._in_buffer.pos < self._in_buffer.size: | ||||
return | ||||
# All input data exhausted. Nothing to do. | ||||
if self._finished_input: | ||||
return | ||||
# Else populate the input buffer from our source. | ||||
Gregory Szorc
|
r44446 | if hasattr(self._source, "read"): | ||
Gregory Szorc
|
r42237 | data = self._source.read(self._read_size) | ||
if not data: | ||||
self._finished_input = True | ||||
return | ||||
self._source_buffer = ffi.from_buffer(data) | ||||
self._in_buffer.src = self._source_buffer | ||||
self._in_buffer.size = len(self._source_buffer) | ||||
self._in_buffer.pos = 0 | ||||
else: | ||||
self._source_buffer = ffi.from_buffer(self._source) | ||||
self._in_buffer.src = self._source_buffer | ||||
self._in_buffer.size = len(self._source_buffer) | ||||
self._in_buffer.pos = 0 | ||||
def _decompress_into_buffer(self, out_buffer): | ||||
"""Decompress available input into an output buffer. | ||||
Returns True if data in output buffer should be emitted. | ||||
""" | ||||
Gregory Szorc
|
r44446 | zresult = lib.ZSTD_decompressStream( | ||
self._decompressor._dctx, out_buffer, self._in_buffer | ||||
) | ||||
Gregory Szorc
|
r42237 | |||
if self._in_buffer.pos == self._in_buffer.size: | ||||
self._in_buffer.src = ffi.NULL | ||||
self._in_buffer.pos = 0 | ||||
self._in_buffer.size = 0 | ||||
self._source_buffer = None | ||||
Gregory Szorc
|
r44446 | if not hasattr(self._source, "read"): | ||
Gregory Szorc
|
r42237 | self._finished_input = True | ||
if lib.ZSTD_isError(zresult): | ||||
Gregory Szorc
|
r44446 | raise ZstdError("zstd decompress error: %s" % _zstd_error(zresult)) | ||
Gregory Szorc
|
r42237 | |||
# Emit data if there is data AND either: | ||||
# a) output buffer is full (read amount is satisfied) | ||||
# b) we're at end of a frame and not in frame spanning mode | ||||
Gregory Szorc
|
r44446 | return out_buffer.pos and ( | ||
out_buffer.pos == out_buffer.size | ||||
or zresult == 0 | ||||
and not self._read_across_frames | ||||
) | ||||
Gregory Szorc
|
r42237 | |||
def read(self, size=-1): | ||||
if self._closed: | ||||
Gregory Szorc
|
r44446 | raise ValueError("stream is closed") | ||
Gregory Szorc
|
r42237 | |||
if size < -1: | ||||
Gregory Szorc
|
r44446 | raise ValueError("cannot read negative amounts less than -1") | ||
Gregory Szorc
|
r42237 | |||
if size == -1: | ||||
# This is recursive. But it gets the job done. | ||||
return self.readall() | ||||
if self._finished_output or size == 0: | ||||
Gregory Szorc
|
r44446 | return b"" | ||
Gregory Szorc
|
r42237 | |||
# We /could/ call into readinto() here. But that introduces more | ||||
# overhead. | ||||
Gregory Szorc
|
r44446 | dst_buffer = ffi.new("char[]", size) | ||
out_buffer = ffi.new("ZSTD_outBuffer *") | ||||
Gregory Szorc
|
r42237 | out_buffer.dst = dst_buffer | ||
out_buffer.size = size | ||||
out_buffer.pos = 0 | ||||
self._read_input() | ||||
if self._decompress_into_buffer(out_buffer): | ||||
self._bytes_decompressed += out_buffer.pos | ||||
return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] | ||||
while not self._finished_input: | ||||
self._read_input() | ||||
if self._decompress_into_buffer(out_buffer): | ||||
self._bytes_decompressed += out_buffer.pos | ||||
return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] | ||||
self._bytes_decompressed += out_buffer.pos | ||||
return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] | ||||
def readinto(self, b): | ||||
if self._closed: | ||||
Gregory Szorc
|
r44446 | raise ValueError("stream is closed") | ||
Gregory Szorc
|
r42237 | |||
if self._finished_output: | ||||
return 0 | ||||
# TODO use writable=True once we require CFFI >= 1.12. | ||||
dest_buffer = ffi.from_buffer(b) | ||||
Gregory Szorc
|
r44446 | ffi.memmove(b, b"", 0) | ||
out_buffer = ffi.new("ZSTD_outBuffer *") | ||||
Gregory Szorc
|
r42237 | out_buffer.dst = dest_buffer | ||
out_buffer.size = len(dest_buffer) | ||||
out_buffer.pos = 0 | ||||
self._read_input() | ||||
if self._decompress_into_buffer(out_buffer): | ||||
self._bytes_decompressed += out_buffer.pos | ||||
return out_buffer.pos | ||||
while not self._finished_input: | ||||
self._read_input() | ||||
if self._decompress_into_buffer(out_buffer): | ||||
self._bytes_decompressed += out_buffer.pos | ||||
return out_buffer.pos | ||||
self._bytes_decompressed += out_buffer.pos | ||||
return out_buffer.pos | ||||
def read1(self, size=-1): | ||||
if self._closed: | ||||
Gregory Szorc
|
r44446 | raise ValueError("stream is closed") | ||
Gregory Szorc
|
r42237 | |||
if size < -1: | ||||
Gregory Szorc
|
r44446 | raise ValueError("cannot read negative amounts less than -1") | ||
Gregory Szorc
|
r42237 | |||
if self._finished_output or size == 0: | ||||
Gregory Szorc
|
r44446 | return b"" | ||
Gregory Szorc
|
r42237 | |||
# -1 returns arbitrary number of bytes. | ||||
if size == -1: | ||||
size = DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE | ||||
Gregory Szorc
|
r44446 | dst_buffer = ffi.new("char[]", size) | ||
out_buffer = ffi.new("ZSTD_outBuffer *") | ||||
Gregory Szorc
|
r42237 | out_buffer.dst = dst_buffer | ||
out_buffer.size = size | ||||
out_buffer.pos = 0 | ||||
# read1() dictates that we can perform at most 1 call to underlying | ||||
# stream to get input. However, we can't satisfy this restriction with | ||||
# decompression because not all input generates output. So we allow | ||||
# multiple read(). But unlike read(), we stop once we have any output. | ||||
while not self._finished_input: | ||||
self._read_input() | ||||
self._decompress_into_buffer(out_buffer) | ||||
if out_buffer.pos: | ||||
break | ||||
self._bytes_decompressed += out_buffer.pos | ||||
return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] | ||||
def readinto1(self, b): | ||||
if self._closed: | ||||
Gregory Szorc
|
r44446 | raise ValueError("stream is closed") | ||
Gregory Szorc
|
r42237 | |||
if self._finished_output: | ||||
return 0 | ||||
# TODO use writable=True once we require CFFI >= 1.12. | ||||
dest_buffer = ffi.from_buffer(b) | ||||
Gregory Szorc
|
r44446 | ffi.memmove(b, b"", 0) | ||
out_buffer = ffi.new("ZSTD_outBuffer *") | ||||
Gregory Szorc
|
r42237 | out_buffer.dst = dest_buffer | ||
out_buffer.size = len(dest_buffer) | ||||
out_buffer.pos = 0 | ||||
while not self._finished_input and not self._finished_output: | ||||
self._read_input() | ||||
self._decompress_into_buffer(out_buffer) | ||||
if out_buffer.pos: | ||||
break | ||||
self._bytes_decompressed += out_buffer.pos | ||||
return out_buffer.pos | ||||
def seek(self, pos, whence=os.SEEK_SET): | ||||
if self._closed: | ||||
Gregory Szorc
|
r44446 | raise ValueError("stream is closed") | ||
Gregory Szorc
|
r42237 | |||
read_amount = 0 | ||||
if whence == os.SEEK_SET: | ||||
if pos < 0: | ||||
Gregory Szorc
|
r44446 | raise ValueError("cannot seek to negative position with SEEK_SET") | ||
Gregory Szorc
|
r42237 | |||
if pos < self._bytes_decompressed: | ||||
Gregory Szorc
|
r44446 | raise ValueError("cannot seek zstd decompression stream " "backwards") | ||
Gregory Szorc
|
r42237 | |||
read_amount = pos - self._bytes_decompressed | ||||
elif whence == os.SEEK_CUR: | ||||
if pos < 0: | ||||
Gregory Szorc
|
r44446 | raise ValueError("cannot seek zstd decompression stream " "backwards") | ||
Gregory Szorc
|
r42237 | |||
read_amount = pos | ||||
elif whence == os.SEEK_END: | ||||
Gregory Szorc
|
r44446 | raise ValueError( | ||
"zstd decompression streams cannot be seeked " "with SEEK_END" | ||||
) | ||||
Gregory Szorc
|
r42237 | |||
while read_amount: | ||||
Gregory Szorc
|
r44446 | result = self.read(min(read_amount, DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE)) | ||
Gregory Szorc
|
r42237 | |||
if not result: | ||||
break | ||||
read_amount -= len(result) | ||||
return self._bytes_decompressed | ||||
class ZstdDecompressionWriter(object):
    """Write-only stream that decompresses data and forwards it to ``writer``.

    Each ``write()`` decompresses its argument and passes the output to
    ``writer.write()`` in chunks of at most ``write_size`` bytes.
    """

    def __init__(self, decompressor, writer, write_size, write_return_read):
        decompressor._ensure_dctx()

        self._decompressor = decompressor
        self._writer = writer
        self._write_size = write_size
        # If true, write() reports input bytes consumed, not bytes produced.
        self._write_return_read = bool(write_return_read)
        self._entered = False
        self._closed = False

    def __enter__(self):
        if self._closed:
            raise ValueError("stream is closed")

        if self._entered:
            raise ZstdError("cannot __enter__ multiple times")

        self._entered = True
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self._entered = False
        self.close()

    def memory_size(self):
        return lib.ZSTD_sizeof_DCtx(self._decompressor._dctx)

    def close(self):
        """Flush, mark closed, then close the wrapped writer if it can be."""
        if self._closed:
            return

        try:
            self.flush()
        finally:
            self._closed = True

        f = getattr(self._writer, "close", None)
        if f:
            f()

    @property
    def closed(self):
        return self._closed

    def fileno(self):
        f = getattr(self._writer, "fileno", None)
        if f:
            return f()
        else:
            raise OSError("fileno not available on underlying writer")

    def flush(self):
        """Flush the wrapped writer if it has a ``flush()`` method."""
        if self._closed:
            raise ValueError("stream is closed")

        f = getattr(self._writer, "flush", None)
        if f:
            return f()

    def isatty(self):
        return False

    def readable(self):
        return False

    def readline(self, size=-1):
        raise io.UnsupportedOperation()

    def readlines(self, hint=-1):
        raise io.UnsupportedOperation()

    def seek(self, offset, whence=None):
        raise io.UnsupportedOperation()

    def seekable(self):
        return False

    def tell(self):
        raise io.UnsupportedOperation()

    def truncate(self, size=None):
        raise io.UnsupportedOperation()

    def writable(self):
        return True

    def writelines(self, lines):
        raise io.UnsupportedOperation()

    def read(self, size=-1):
        raise io.UnsupportedOperation()

    def readall(self):
        raise io.UnsupportedOperation()

    def readinto(self, b):
        raise io.UnsupportedOperation()

    def write(self, data):
        """Decompress ``data`` and write the output to the wrapped writer.

        Returns the number of input bytes consumed if ``write_return_read``
        was requested, else the number of decompressed bytes written.
        Raises ValueError if closed and ZstdError on decompression errors.
        """
        if self._closed:
            raise ValueError("stream is closed")

        total_write = 0

        in_buffer = ffi.new("ZSTD_inBuffer *")
        out_buffer = ffi.new("ZSTD_outBuffer *")

        data_buffer = ffi.from_buffer(data)
        in_buffer.src = data_buffer
        in_buffer.size = len(data_buffer)
        in_buffer.pos = 0

        dst_buffer = ffi.new("char[]", self._write_size)
        out_buffer.dst = dst_buffer
        out_buffer.size = len(dst_buffer)
        out_buffer.pos = 0

        dctx = self._decompressor._dctx

        # zstd may keep decoded bytes in its internal buffer when a call
        # fills the output buffer, even if all input was consumed. Keep
        # calling while the previous call filled the buffer so that tail is
        # not left behind. Empty ``data`` still makes no calls.
        output_full = False
        while in_buffer.pos < in_buffer.size or output_full:
            zresult = lib.ZSTD_decompressStream(dctx, out_buffer, in_buffer)
            if lib.ZSTD_isError(zresult):
                raise ZstdError("zstd decompress error: %s" % _zstd_error(zresult))

            # pos > 0 guards against an endless loop when write_size is 0.
            output_full = out_buffer.pos > 0 and out_buffer.pos == out_buffer.size

            if out_buffer.pos:
                self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
                total_write += out_buffer.pos
                out_buffer.pos = 0

        if self._write_return_read:
            return in_buffer.pos
        else:
            return total_write
class ZstdDecompressor(object):
    """Decompresses zstd data using a reusable decompression context (DCtx).

    Before each operation the context is re-prepared by ``_ensure_dctx()``.
    That resets the session, applies the optional max window size and the
    frame format, and optionally references a prepared dictionary.
    """

    def __init__(self, dict_data=None, max_window_size=0, format=FORMAT_ZSTD1):
        self._dict_data = dict_data
        self._max_window_size = max_window_size
        self._format = format

        dctx = lib.ZSTD_createDCtx()
        if dctx == ffi.NULL:
            raise MemoryError()

        self._dctx = dctx

        # Defer setting up garbage collection until full state is loaded so
        # the memory size is more accurate.
        try:
            self._ensure_dctx()
        finally:
            self._dctx = ffi.gc(
                dctx, lib.ZSTD_freeDCtx, size=lib.ZSTD_sizeof_DCtx(dctx)
            )

    def memory_size(self):
        return lib.ZSTD_sizeof_DCtx(self._dctx)

    def decompress(self, data, max_output_size=0):
        """Decompress a single zstd frame held entirely in ``data``.

        The output size comes from the frame header. If the header lacks a
        content size, ``max_output_size`` must be given and bounds the
        output. Raises ZstdError if the frame cannot be fully decoded or the
        decoded size does not match the header.
        """
        self._ensure_dctx()

        data_buffer = ffi.from_buffer(data)

        output_size = lib.ZSTD_getFrameContentSize(data_buffer, len(data_buffer))

        if output_size == lib.ZSTD_CONTENTSIZE_ERROR:
            raise ZstdError("error determining content size from frame header")
        elif output_size == 0:
            return b""
        elif output_size == lib.ZSTD_CONTENTSIZE_UNKNOWN:
            if not max_output_size:
                raise ZstdError("could not determine content size in frame header")

            result_buffer = ffi.new("char[]", max_output_size)
            result_size = max_output_size
            # 0 disables the exact-size check below.
            output_size = 0
        else:
            result_buffer = ffi.new("char[]", output_size)
            result_size = output_size

        out_buffer = ffi.new("ZSTD_outBuffer *")
        out_buffer.dst = result_buffer
        out_buffer.size = result_size
        out_buffer.pos = 0

        in_buffer = ffi.new("ZSTD_inBuffer *")
        in_buffer.src = data_buffer
        in_buffer.size = len(data_buffer)
        in_buffer.pos = 0

        zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer)
        if lib.ZSTD_isError(zresult):
            raise ZstdError("decompression error: %s" % _zstd_error(zresult))
        elif zresult:
            raise ZstdError("decompression error: did not decompress full frame")
        elif output_size and out_buffer.pos != output_size:
            # Report the bytes actually produced (zresult is always 0 here).
            raise ZstdError(
                "decompression error: decompressed %d bytes; expected %d"
                % (out_buffer.pos, output_size)
            )

        return ffi.buffer(result_buffer, out_buffer.pos)[:]

    def stream_reader(
        self,
        source,
        read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
        read_across_frames=False,
    ):
        self._ensure_dctx()
        return ZstdDecompressionReader(self, source, read_size, read_across_frames)

    def decompressobj(self, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE):
        if write_size < 1:
            raise ValueError("write_size must be positive")

        self._ensure_dctx()
        return ZstdDecompressionObj(self, write_size=write_size)

    def read_to_iter(
        self,
        reader,
        read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
        write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE,
        skip_bytes=0,
    ):
        """Generate decompressed chunks from ``reader``.

        ``reader`` has a ``read()`` method or supports ``len()`` and slicing.
        The first ``skip_bytes`` input bytes are discarded. Iteration stops
        when a frame is fully decoded or the input runs out.
        """
        if skip_bytes >= read_size:
            raise ValueError("skip_bytes must be smaller than read_size")

        if hasattr(reader, "read"):
            have_read = True
        elif hasattr(reader, "__getitem__"):
            have_read = False
            buffer_offset = 0
            size = len(reader)
        else:
            raise ValueError(
                "must pass an object with a read() method or "
                "conforms to buffer protocol"
            )

        if skip_bytes:
            if have_read:
                reader.read(skip_bytes)
            else:
                if skip_bytes > size:
                    raise ValueError("skip_bytes larger than first input chunk")

                buffer_offset = skip_bytes

        self._ensure_dctx()

        in_buffer = ffi.new("ZSTD_inBuffer *")
        out_buffer = ffi.new("ZSTD_outBuffer *")

        dst_buffer = ffi.new("char[]", write_size)
        out_buffer.dst = dst_buffer
        out_buffer.size = len(dst_buffer)
        out_buffer.pos = 0

        while True:
            assert out_buffer.pos == 0

            if have_read:
                read_result = reader.read(read_size)
            else:
                remaining = size - buffer_offset
                slice_size = min(remaining, read_size)
                read_result = reader[buffer_offset : buffer_offset + slice_size]
                buffer_offset += slice_size

            # No new input. Break out of read loop.
            if not read_result:
                break

            # Feed all read data into decompressor and emit output until
            # exhausted.
            read_buffer = ffi.from_buffer(read_result)
            in_buffer.src = read_buffer
            in_buffer.size = len(read_buffer)
            in_buffer.pos = 0

            # Keep calling while input remains OR the last call filled the
            # output buffer: zstd may still hold decoded bytes internally,
            # which would otherwise be lost when the input ends.
            output_full = False
            while in_buffer.pos < in_buffer.size or output_full:
                assert out_buffer.pos == 0

                zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer)
                if lib.ZSTD_isError(zresult):
                    raise ZstdError("zstd decompress error: %s" % _zstd_error(zresult))

                output_full = out_buffer.pos > 0 and out_buffer.pos == out_buffer.size

                if out_buffer.pos:
                    data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
                    out_buffer.pos = 0
                    yield data

                # Frame fully decoded and flushed.
                if zresult == 0:
                    return

            # Repeat loop to collect more input data.
            continue

        # If we get here, input is exhausted.

    read_from = read_to_iter

    def stream_writer(
        self,
        writer,
        write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE,
        write_return_read=False,
    ):
        if not hasattr(writer, "write"):
            raise ValueError("must pass an object with a write() method")

        return ZstdDecompressionWriter(self, writer, write_size, write_return_read)

    write_to = stream_writer

    def copy_stream(
        self,
        ifh,
        ofh,
        read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
        write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE,
    ):
        """Decompress everything read from ``ifh`` and write it to ``ofh``.

        Returns ``(total_read, total_write)`` byte counts.
        """
        if not hasattr(ifh, "read"):
            raise ValueError("first argument must have a read() method")
        if not hasattr(ofh, "write"):
            raise ValueError("second argument must have a write() method")

        self._ensure_dctx()

        in_buffer = ffi.new("ZSTD_inBuffer *")
        out_buffer = ffi.new("ZSTD_outBuffer *")

        dst_buffer = ffi.new("char[]", write_size)
        out_buffer.dst = dst_buffer
        out_buffer.size = write_size
        out_buffer.pos = 0

        total_read, total_write = 0, 0

        # Read all available input.
        while True:
            data = ifh.read(read_size)
            if not data:
                break

            data_buffer = ffi.from_buffer(data)
            total_read += len(data_buffer)
            in_buffer.src = data_buffer
            in_buffer.size = len(data_buffer)
            in_buffer.pos = 0

            # Flush all read data to output. Also keep going while the last
            # call filled the output buffer, since zstd may still hold
            # decoded bytes after consuming all input.
            output_full = False
            while in_buffer.pos < in_buffer.size or output_full:
                zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer)
                if lib.ZSTD_isError(zresult):
                    raise ZstdError(
                        "zstd decompressor error: %s" % _zstd_error(zresult)
                    )

                output_full = out_buffer.pos > 0 and out_buffer.pos == out_buffer.size

                if out_buffer.pos:
                    # NOTE(review): passes a view of the reused output buffer,
                    # not a copy; assumes ofh.write() does not retain it.
                    ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
                    total_write += out_buffer.pos
                    out_buffer.pos = 0
            # Continue loop to keep reading.

        return total_read, total_write

    def decompress_content_dict_chain(self, frames):
        """Decompress a list of zstd frames and return the last fulltext.

        Every frame must be ``bytes`` with its content size in the header.
        Raises TypeError, ValueError or ZstdError on invalid input.
        """
        if not isinstance(frames, list):
            raise TypeError("argument must be a list")

        if not frames:
            raise ValueError("empty input chain")

        # First chunk should not be using a dictionary. We handle it specially.
        chunk = frames[0]
        if not isinstance(chunk, bytes_type):
            raise ValueError("chunk 0 must be bytes")

        # All chunks should be zstd frames and should have content size set.
        chunk_buffer = ffi.from_buffer(chunk)
        params = ffi.new("ZSTD_frameHeader *")
        zresult = lib.ZSTD_getFrameHeader(params, chunk_buffer, len(chunk_buffer))
        if lib.ZSTD_isError(zresult):
            raise ValueError("chunk 0 is not a valid zstd frame")
        elif zresult:
            raise ValueError("chunk 0 is too small to contain a zstd frame")

        if params.frameContentSize == lib.ZSTD_CONTENTSIZE_UNKNOWN:
            raise ValueError("chunk 0 missing content size in frame")

        self._ensure_dctx(load_dict=False)

        last_buffer = ffi.new("char[]", params.frameContentSize)

        out_buffer = ffi.new("ZSTD_outBuffer *")
        out_buffer.dst = last_buffer
        out_buffer.size = len(last_buffer)
        out_buffer.pos = 0

        in_buffer = ffi.new("ZSTD_inBuffer *")
        in_buffer.src = chunk_buffer
        in_buffer.size = len(chunk_buffer)
        in_buffer.pos = 0

        zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer)
        if lib.ZSTD_isError(zresult):
            raise ZstdError("could not decompress chunk 0: %s" % _zstd_error(zresult))
        elif zresult:
            raise ZstdError("chunk 0 did not decompress full frame")

        # Special case of chain length of 1
        if len(frames) == 1:
            return ffi.buffer(last_buffer, len(last_buffer))[:]

        i = 1
        while i < len(frames):
            chunk = frames[i]
            if not isinstance(chunk, bytes_type):
                raise ValueError("chunk %d must be bytes" % i)

            chunk_buffer = ffi.from_buffer(chunk)
            zresult = lib.ZSTD_getFrameHeader(params, chunk_buffer, len(chunk_buffer))
            if lib.ZSTD_isError(zresult):
                raise ValueError("chunk %d is not a valid zstd frame" % i)
            elif zresult:
                raise ValueError("chunk %d is too small to contain a zstd frame" % i)

            if params.frameContentSize == lib.ZSTD_CONTENTSIZE_UNKNOWN:
                raise ValueError("chunk %d missing content size in frame" % i)

            dest_buffer = ffi.new("char[]", params.frameContentSize)

            out_buffer.dst = dest_buffer
            out_buffer.size = len(dest_buffer)
            out_buffer.pos = 0

            in_buffer.src = chunk_buffer
            in_buffer.size = len(chunk_buffer)
            in_buffer.pos = 0

            # NOTE(review): the comment on chunk 0 implies later chunks use a
            # dictionary, but nothing from last_buffer is referenced as a
            # prefix/dictionary before this call. Confirm this is intended.
            zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer)
            if lib.ZSTD_isError(zresult):
                # Both placeholders need arguments; previously only one was
                # supplied, which raised TypeError instead of ZstdError.
                raise ZstdError(
                    "could not decompress chunk %d: %s" % (i, _zstd_error(zresult))
                )
            elif zresult:
                raise ZstdError("chunk %d did not decompress full frame" % i)

            last_buffer = dest_buffer
            i += 1

        return ffi.buffer(last_buffer, len(last_buffer))[:]

    def _ensure_dctx(self, load_dict=True):
        """Reset the DCtx session and re-apply decoding parameters.

        Applies the max window size (when non-zero) and the frame format.
        When ``load_dict`` is true and ``dict_data`` was supplied, it also
        references the dictionary's prepared ``_ddict``. Raises ZstdError if
        zstd rejects any setting.
        """
        lib.ZSTD_DCtx_reset(self._dctx, lib.ZSTD_reset_session_only)

        if self._max_window_size:
            zresult = lib.ZSTD_DCtx_setMaxWindowSize(self._dctx, self._max_window_size)
            if lib.ZSTD_isError(zresult):
                raise ZstdError(
                    "unable to set max window size: %s" % _zstd_error(zresult)
                )

        zresult = lib.ZSTD_DCtx_setFormat(self._dctx, self._format)
        if lib.ZSTD_isError(zresult):
            raise ZstdError("unable to set decoding format: %s" % _zstd_error(zresult))

        if self._dict_data and load_dict:
            zresult = lib.ZSTD_DCtx_refDDict(self._dctx, self._dict_data._ddict)
            if lib.ZSTD_isError(zresult):
                raise ZstdError(
                    "unable to reference prepared dictionary: %s" % _zstd_error(zresult)
                )