zstd_cffi.py
1866 lines
| 61.7 KiB
| text/x-python
|
PythonLexer
Gregory Szorc
|
r30435 | # Copyright (c) 2016-present, Gregory Szorc | ||
# All rights reserved. | ||||
# | ||||
# This software may be modified and distributed under the terms | ||||
# of the BSD license. See the LICENSE file for details. | ||||
"""Python interface to the Zstandard (zstd) compression library.""" | ||||
from __future__ import absolute_import, unicode_literals | ||||
Gregory Szorc
|
# Public API of this module. This should match what the C extension exports.
__all__ = [
    # Buffer types (currently commented out of the export list).
    #'BufferSegment',
    #'BufferSegments',
    #'BufferWithSegments',
    #'BufferWithSegmentsCollection',

    # Types and functions.
    'CompressionParameters',
    'ZstdCompressionDict',
    'ZstdCompressionParameters',
    'ZstdCompressor',
    'ZstdError',
    'ZstdDecompressor',
    'FrameParameters',
    'estimate_decompression_context_size',
    'frame_content_size',
    'frame_header_size',
    'get_frame_parameters',
    'train_dictionary',

    # Constants.
    'COMPRESSOBJ_FLUSH_FINISH',
    'COMPRESSOBJ_FLUSH_BLOCK',
    'ZSTD_VERSION',
    'FRAME_HEADER',
    'CONTENTSIZE_UNKNOWN',
    'CONTENTSIZE_ERROR',
    'MAX_COMPRESSION_LEVEL',
    'COMPRESSION_RECOMMENDED_INPUT_SIZE',
    'COMPRESSION_RECOMMENDED_OUTPUT_SIZE',
    'DECOMPRESSION_RECOMMENDED_INPUT_SIZE',
    'DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE',
    'MAGIC_NUMBER',

    # Compression parameter bounds.
    'WINDOWLOG_MIN',
    'WINDOWLOG_MAX',
    'CHAINLOG_MIN',
    'CHAINLOG_MAX',
    'HASHLOG_MIN',
    'HASHLOG_MAX',
    'HASHLOG3_MAX',
    'SEARCHLOG_MIN',
    'SEARCHLOG_MAX',
    'SEARCHLENGTH_MIN',
    'SEARCHLENGTH_MAX',
    'TARGETLENGTH_MIN',
    'LDM_MINMATCH_MIN',
    'LDM_MINMATCH_MAX',
    'LDM_BUCKETSIZELOG_MAX',

    # Compression strategies.
    'STRATEGY_FAST',
    'STRATEGY_DFAST',
    'STRATEGY_GREEDY',
    'STRATEGY_LAZY',
    'STRATEGY_LAZY2',
    'STRATEGY_BTLAZY2',
    'STRATEGY_BTOPT',
    'STRATEGY_BTULTRA',

    # Dictionary types.
    'DICT_TYPE_AUTO',
    'DICT_TYPE_RAWCONTENT',
    'DICT_TYPE_FULLDICT',

    # Frame formats.
    'FORMAT_ZSTD1',
    'FORMAT_ZSTD1_MAGICLESS',
]
import io | ||||
Gregory Szorc
|
r31796 | import os | ||
Gregory Szorc
|
r30895 | import sys | ||
Gregory Szorc
|
r30435 | |||
from _zstd_cffi import ( | ||||
ffi, | ||||
lib, | ||||
) | ||||
Gregory Szorc
|
# Aliases for the byte-string and integer types of the running interpreter.
if sys.version_info[0] != 2:
    bytes_type = bytes
    int_type = int
else:
    # ``long`` only exists on Python 2; this branch is never taken elsewhere.
    bytes_type = str
    int_type = long
Gregory Szorc
|
r30435 | |||
Gregory Szorc
|
r30895 | |||
# Buffer sizes reported by the zstd streaming API.
COMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_CStreamInSize()
COMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_CStreamOutSize()
DECOMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_DStreamInSize()
DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_DStreamOutSize()

# cffi allocator that does not clear memory after allocating it.
new_nonzero = ffi.new_allocator(should_clear_after_alloc=False)

# General library constants.
MAX_COMPRESSION_LEVEL = lib.ZSTD_maxCLevel()
MAGIC_NUMBER = lib.ZSTD_MAGICNUMBER
FRAME_HEADER = b'\x28\xb5\x2f\xfd'
CONTENTSIZE_UNKNOWN = lib.ZSTD_CONTENTSIZE_UNKNOWN
CONTENTSIZE_ERROR = lib.ZSTD_CONTENTSIZE_ERROR
ZSTD_VERSION = (
    lib.ZSTD_VERSION_MAJOR,
    lib.ZSTD_VERSION_MINOR,
    lib.ZSTD_VERSION_RELEASE,
)

# Bounds for individual compression parameters.
WINDOWLOG_MIN = lib.ZSTD_WINDOWLOG_MIN
WINDOWLOG_MAX = lib.ZSTD_WINDOWLOG_MAX
CHAINLOG_MIN = lib.ZSTD_CHAINLOG_MIN
CHAINLOG_MAX = lib.ZSTD_CHAINLOG_MAX
HASHLOG_MIN = lib.ZSTD_HASHLOG_MIN
HASHLOG_MAX = lib.ZSTD_HASHLOG_MAX
HASHLOG3_MAX = lib.ZSTD_HASHLOG3_MAX
SEARCHLOG_MIN = lib.ZSTD_SEARCHLOG_MIN
SEARCHLOG_MAX = lib.ZSTD_SEARCHLOG_MAX
SEARCHLENGTH_MIN = lib.ZSTD_SEARCHLENGTH_MIN
SEARCHLENGTH_MAX = lib.ZSTD_SEARCHLENGTH_MAX
TARGETLENGTH_MIN = lib.ZSTD_TARGETLENGTH_MIN
LDM_MINMATCH_MIN = lib.ZSTD_LDM_MINMATCH_MIN
LDM_MINMATCH_MAX = lib.ZSTD_LDM_MINMATCH_MAX
LDM_BUCKETSIZELOG_MAX = lib.ZSTD_LDM_BUCKETSIZELOG_MAX

# Compression strategies.
STRATEGY_FAST = lib.ZSTD_fast
STRATEGY_DFAST = lib.ZSTD_dfast
STRATEGY_GREEDY = lib.ZSTD_greedy
STRATEGY_LAZY = lib.ZSTD_lazy
STRATEGY_LAZY2 = lib.ZSTD_lazy2
STRATEGY_BTLAZY2 = lib.ZSTD_btlazy2
STRATEGY_BTOPT = lib.ZSTD_btopt
STRATEGY_BTULTRA = lib.ZSTD_btultra

# Dictionary content types.
DICT_TYPE_AUTO = lib.ZSTD_dct_auto
DICT_TYPE_RAWCONTENT = lib.ZSTD_dct_rawContent
DICT_TYPE_FULLDICT = lib.ZSTD_dct_fullDict

# Frame formats.
FORMAT_ZSTD1 = lib.ZSTD_f_zstd1
FORMAT_ZSTD1_MAGICLESS = lib.ZSTD_f_zstd1_magicless

# Flush modes accepted by ZstdCompressionObj.flush().
COMPRESSOBJ_FLUSH_FINISH = 0
COMPRESSOBJ_FLUSH_BLOCK = 1
Gregory Szorc
|
r31796 | def _cpu_count(): | ||
# os.cpu_count() was introducd in Python 3.4. | ||||
try: | ||||
return os.cpu_count() or 0 | ||||
except AttributeError: | ||||
pass | ||||
# Linux. | ||||
try: | ||||
if sys.version_info[0] == 2: | ||||
return os.sysconf(b'SC_NPROCESSORS_ONLN') | ||||
else: | ||||
return os.sysconf(u'SC_NPROCESSORS_ONLN') | ||||
except (AttributeError, ValueError): | ||||
pass | ||||
# TODO implement on other platforms. | ||||
return 0 | ||||
Gregory Szorc
|
class ZstdError(Exception):
    """Exception type raised by this module for zstd-related failures."""
Gregory Szorc
|
r30435 | |||
Gregory Szorc
|
def _zstd_error(zresult):
    """Return the zstd error name for ``zresult`` as a unicode string."""
    # ffi.string() resolves to bytes on Python 2 and 3. The value is formatted
    # into error messages, which are literal unicode, so decode it first.
    error_name = ffi.string(lib.ZSTD_getErrorName(zresult))
    return error_name.decode('utf-8')
Gregory Szorc
|
r30895 | |||
Gregory Szorc
|
def _make_cctx_params(params):
    """Build a garbage-collected ``ZSTD_CCtx_params`` from ``params``.

    ``params`` is any object exposing the attributes read below (for example
    a ``ZstdCompressionParameters`` instance).

    Raises ``MemoryError`` if the native structure cannot be allocated and
    ``ZstdError`` if a parameter is rejected.
    """
    handle = lib.ZSTD_createCCtxParams()
    if handle == ffi.NULL:
        raise MemoryError()

    # Free the native structure when the Python wrapper is collected.
    handle = ffi.gc(handle, lib.ZSTD_freeCCtxParams)

    settings = (
        (lib.ZSTD_p_format, params.format),
        (lib.ZSTD_p_compressionLevel, params.compression_level),
        (lib.ZSTD_p_windowLog, params.window_log),
        (lib.ZSTD_p_hashLog, params.hash_log),
        (lib.ZSTD_p_chainLog, params.chain_log),
        (lib.ZSTD_p_searchLog, params.search_log),
        (lib.ZSTD_p_minMatch, params.min_match),
        (lib.ZSTD_p_targetLength, params.target_length),
        (lib.ZSTD_p_compressionStrategy, params.compression_strategy),
        (lib.ZSTD_p_contentSizeFlag, params.write_content_size),
        (lib.ZSTD_p_checksumFlag, params.write_checksum),
        (lib.ZSTD_p_dictIDFlag, params.write_dict_id),
        (lib.ZSTD_p_nbWorkers, params.threads),
        (lib.ZSTD_p_jobSize, params.job_size),
        (lib.ZSTD_p_overlapSizeLog, params.overlap_size_log),
        (lib.ZSTD_p_compressLiterals, params.compress_literals),
        (lib.ZSTD_p_forceMaxWindow, params.force_max_window),
        (lib.ZSTD_p_enableLongDistanceMatching, params.enable_ldm),
        (lib.ZSTD_p_ldmHashLog, params.ldm_hash_log),
        (lib.ZSTD_p_ldmMinMatch, params.ldm_min_match),
        (lib.ZSTD_p_ldmBucketSizeLog, params.ldm_bucket_size_log),
        (lib.ZSTD_p_ldmHashEveryLog, params.ldm_hash_every_log),
    )

    for key, setting in settings:
        _set_compression_parameter(handle, key, setting)

    return handle
Gregory Szorc
|
r30895 | |||
Gregory Szorc
|
class ZstdCompressionParameters(object):
    """Container for low-level compression parameters.

    Each constructor argument is stored as a same-named attribute, and a
    native parameters structure built from them is kept in ``self.params``.
    """

    @staticmethod
    def from_level(level, source_size=0, dict_size=0, **kwargs):
        """Create an instance seeded from zstd's defaults for ``level``.

        Values from ``ZSTD_getCParams(level, source_size, dict_size)`` are
        used for every tunable not explicitly passed in ``kwargs``.
        """
        defaults = lib.ZSTD_getCParams(level, source_size, dict_size)

        # (constructor keyword, field on the struct returned by zstd)
        field_map = (
            ('window_log', 'windowLog'),
            ('chain_log', 'chainLog'),
            ('hash_log', 'hashLog'),
            ('search_log', 'searchLog'),
            ('min_match', 'searchLength'),
            ('target_length', 'targetLength'),
            ('compression_strategy', 'strategy'),
        )
        for keyword, field in field_map:
            if keyword not in kwargs:
                kwargs[keyword] = getattr(defaults, field)

        if 'compress_literals' not in kwargs:
            # Literal compression is on for non-negative levels only.
            kwargs['compress_literals'] = int(level >= 0)

        return ZstdCompressionParameters(**kwargs)

    def __init__(self, format=0, compression_level=0, window_log=0, hash_log=0,
                 chain_log=0, search_log=0, min_match=0, target_length=0,
                 compression_strategy=0, write_content_size=1, write_checksum=0,
                 write_dict_id=0, job_size=0, overlap_size_log=0,
                 force_max_window=0, enable_ldm=0, ldm_hash_log=0,
                 ldm_min_match=0, ldm_bucket_size_log=0, ldm_hash_every_log=0,
                 threads=0, compress_literals=None):
        """Store the parameters and build the native structure.

        A negative ``threads`` is replaced by ``_cpu_count()``. When
        ``compress_literals`` is None it defaults to
        ``compression_level >= 0``.
        """
        if threads < 0:
            threads = _cpu_count()

        if compress_literals is None:
            compress_literals = compression_level >= 0

        # Frame format and level.
        self.format = format
        self.compression_level = compression_level

        # Match finder tunables.
        self.window_log = window_log
        self.hash_log = hash_log
        self.chain_log = chain_log
        self.search_log = search_log
        self.min_match = min_match
        self.target_length = target_length
        self.compression_strategy = compression_strategy

        # Frame header flags.
        self.write_content_size = write_content_size
        self.write_checksum = write_checksum
        self.write_dict_id = write_dict_id

        # Multi-threading.
        self.job_size = job_size
        self.overlap_size_log = overlap_size_log
        self.threads = threads

        self.compress_literals = compress_literals
        self.force_max_window = force_max_window

        # Long distance matching.
        self.enable_ldm = enable_ldm
        self.ldm_hash_log = ldm_hash_log
        self.ldm_min_match = ldm_min_match
        self.ldm_bucket_size_log = ldm_bucket_size_log
        self.ldm_hash_every_log = ldm_hash_every_log

        # Must come last: _make_cctx_params() reads the attributes set above.
        self.params = _make_cctx_params(self)

    def estimated_compression_context_size(self):
        """Return zstd's estimate of the context size for these parameters."""
        return lib.ZSTD_estimateCCtxSize_usingCCtxParams(self.params)


# Alias of ZstdCompressionParameters.
CompressionParameters = ZstdCompressionParameters
Gregory Szorc
|
r30895 | |||
def estimate_decompression_context_size():
    """Return the value reported by ``ZSTD_estimateDCtxSize()``."""
    estimate = lib.ZSTD_estimateDCtxSize()
    return estimate
Gregory Szorc
|
def _set_compression_parameter(params, param, value):
    """Set one parameter on a native compression parameters structure.

    ``value`` is cast to a C ``unsigned`` before being handed to zstd.
    Raises ``ZstdError`` if zstd rejects the parameter.
    """
    native_value = ffi.cast('unsigned', value)
    zresult = lib.ZSTD_CCtxParam_setParameter(params, param, native_value)
    if not lib.ZSTD_isError(zresult):
        return

    raise ZstdError('unable to set compression context parameter: %s' %
                    _zstd_error(zresult))
Gregory Szorc
|
class ZstdCompressionWriter(object):
    """Context manager that compresses data and forwards it to a writer.

    Data passed to ``write()`` is compressed with the compression context of
    ``compressor`` and the compressed output is passed to
    ``writer.write()``. Leaving the ``with`` block without an exception ends
    the zstd frame.
    """

    def __init__(self, compressor, writer, source_size, write_size):
        """Remember the collaborators; nothing is compressed yet.

        ``compressor`` must expose a ``_cctx`` compression context,
        ``writer`` a ``write()`` method. ``source_size`` is pledged to zstd
        on ``__enter__``; ``write_size`` is the size of the output buffer
        used for each call into zstd.
        """
        self._compressor = compressor
        self._writer = writer
        self._source_size = source_size
        self._write_size = write_size
        self._entered = False
        # Number of compressed bytes handed to the writer so far.
        self._bytes_compressed = 0

    def __enter__(self):
        """Pledge the source size and activate the writer.

        Raises ``ZstdError`` if already entered or if zstd rejects the
        pledged size.
        """
        if self._entered:
            raise ZstdError('cannot __enter__ multiple times')

        zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._compressor._cctx,
                                                  self._source_size)
        if lib.ZSTD_isError(zresult):
            raise ZstdError('error setting source size: %s' %
                            _zstd_error(zresult))

        self._entered = True
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Finish the frame (only on a clean exit) and drop the compressor.

        Always returns False so exceptions from the ``with`` body propagate.
        """
        self._entered = False

        if not exc_type and not exc_value and not exc_tb:
            dst_buffer = ffi.new('char[]', self._write_size)

            out_buffer = ffi.new('ZSTD_outBuffer *')
            in_buffer = ffi.new('ZSTD_inBuffer *')

            out_buffer.dst = dst_buffer
            out_buffer.size = len(dst_buffer)
            out_buffer.pos = 0

            # No new input: we are only draining and ending the frame.
            in_buffer.src = ffi.NULL
            in_buffer.size = 0
            in_buffer.pos = 0

            while True:
                zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
                                                    out_buffer, in_buffer,
                                                    lib.ZSTD_e_end)

                if lib.ZSTD_isError(zresult):
                    raise ZstdError('error ending compression stream: %s' %
                                    _zstd_error(zresult))

                if out_buffer.pos:
                    self._writer.write(
                        ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
                    # Count the frame epilogue too so tell() matches what
                    # was actually handed to the writer.
                    self._bytes_compressed += out_buffer.pos
                    out_buffer.pos = 0

                # A zero result means the frame is fully flushed.
                if zresult == 0:
                    break

        self._compressor = None

        return False

    def memory_size(self):
        """Return the size of the compression context, per zstd.

        Raises ``ZstdError`` when called outside an active context manager.
        """
        if not self._entered:
            raise ZstdError('cannot determine size of an inactive compressor; '
                            'call when a context manager is active')

        return lib.ZSTD_sizeof_CCtx(self._compressor._cctx)

    def write(self, data):
        """Compress ``data`` and forward any produced output to the writer.

        ``data`` must support the buffer protocol. Returns the number of
        compressed bytes written by this call (possibly 0, since zstd may
        buffer input internally). Raises ``ZstdError`` when called outside
        an active context manager or on a compression error.
        """
        if not self._entered:
            raise ZstdError('write() must be called from an active context '
                            'manager')

        total_write = 0

        data_buffer = ffi.from_buffer(data)

        in_buffer = ffi.new('ZSTD_inBuffer *')
        in_buffer.src = data_buffer
        in_buffer.size = len(data_buffer)
        in_buffer.pos = 0

        out_buffer = ffi.new('ZSTD_outBuffer *')
        # Keep a reference so the destination memory outlives the loop.
        dst_buffer = ffi.new('char[]', self._write_size)
        out_buffer.dst = dst_buffer
        out_buffer.size = self._write_size
        out_buffer.pos = 0

        while in_buffer.pos < in_buffer.size:
            zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
                                                out_buffer, in_buffer,
                                                lib.ZSTD_e_continue)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd compress error: %s' %
                                _zstd_error(zresult))

            if out_buffer.pos:
                self._writer.write(
                    ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
                total_write += out_buffer.pos
                self._bytes_compressed += out_buffer.pos
                out_buffer.pos = 0

        return total_write

    def flush(self):
        """Flush data buffered inside zstd to the writer.

        Returns the number of compressed bytes written by this call. Raises
        ``ZstdError`` when called outside an active context manager or on a
        compression error.
        """
        if not self._entered:
            raise ZstdError('flush must be called from an active context manager')

        total_write = 0

        out_buffer = ffi.new('ZSTD_outBuffer *')
        dst_buffer = ffi.new('char[]', self._write_size)
        out_buffer.dst = dst_buffer
        out_buffer.size = self._write_size
        out_buffer.pos = 0

        in_buffer = ffi.new('ZSTD_inBuffer *')
        in_buffer.src = ffi.NULL
        in_buffer.size = 0
        in_buffer.pos = 0

        while True:
            zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
                                                out_buffer, in_buffer,
                                                lib.ZSTD_e_flush)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd compress error: %s' %
                                _zstd_error(zresult))

            # Nothing produced means everything has been flushed.
            if not out_buffer.pos:
                break

            self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
            total_write += out_buffer.pos
            self._bytes_compressed += out_buffer.pos
            out_buffer.pos = 0

        return total_write

    def tell(self):
        """Return the number of compressed bytes handed to the writer."""
        return self._bytes_compressed
Gregory Szorc
|
r30895 | |||
class ZstdCompressionObj(object):
    """Incremental compressor with ``compress()`` / ``flush()`` methods.

    Instances are created and populated by ``ZstdCompressor.compressobj()``,
    which sets ``_compressor``, ``_out`` (a ``ZSTD_outBuffer``),
    ``_dst_buffer`` (the memory backing ``_out``) and ``_finished``.
    """

    def compress(self, data):
        """Feed ``data`` to the compressor and return any output produced.

        ``data`` must support the buffer protocol. The returned bytes may be
        empty because zstd can buffer input internally. Raises ``ZstdError``
        if the object was already finished or on a compression error.
        """
        if self._finished:
            raise ZstdError('cannot call compress() after compressor finished')

        data_buffer = ffi.from_buffer(data)
        source = ffi.new('ZSTD_inBuffer *')
        source.src = data_buffer
        source.size = len(data_buffer)
        source.pos = 0

        chunks = []

        # Compare against the byte size of the buffer, not len(data): for
        # objects whose items are wider than one byte len(data) is the item
        # count and the loop would stop before all input was consumed.
        while source.pos < source.size:
            zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
                                                self._out,
                                                source,
                                                lib.ZSTD_e_continue)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd compress error: %s' %
                                _zstd_error(zresult))

            if self._out.pos:
                chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:])
                self._out.pos = 0

        return b''.join(chunks)

    def flush(self, flush_mode=COMPRESSOBJ_FLUSH_FINISH):
        """Flush the compressor and return the produced bytes.

        With ``COMPRESSOBJ_FLUSH_BLOCK`` the current block is flushed and the
        object remains usable. With ``COMPRESSOBJ_FLUSH_FINISH`` (default)
        the frame is ended and the object is marked finished.

        Raises ``ValueError`` for an unknown ``flush_mode`` and ``ZstdError``
        if already finished or on a compression error.
        """
        if flush_mode not in (COMPRESSOBJ_FLUSH_FINISH, COMPRESSOBJ_FLUSH_BLOCK):
            raise ValueError('flush mode not recognized')

        if self._finished:
            raise ZstdError('compressor object already finished')

        # compress() always drains the output buffer before returning.
        assert self._out.pos == 0

        in_buffer = ffi.new('ZSTD_inBuffer *')
        in_buffer.src = ffi.NULL
        in_buffer.size = 0
        in_buffer.pos = 0

        if flush_mode == COMPRESSOBJ_FLUSH_BLOCK:
            zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
                                                self._out,
                                                in_buffer,
                                                lib.ZSTD_e_flush)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd compress error: %s' %
                                _zstd_error(zresult))

            # Output buffer is guaranteed to hold full block.
            assert zresult == 0

            if self._out.pos:
                result = ffi.buffer(self._out.dst, self._out.pos)[:]
                self._out.pos = 0
                return result
            else:
                return b''

        assert flush_mode == COMPRESSOBJ_FLUSH_FINISH
        self._finished = True

        chunks = []

        while True:
            zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
                                                self._out,
                                                in_buffer,
                                                lib.ZSTD_e_end)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('error ending compression stream: %s' %
                                _zstd_error(zresult))

            if self._out.pos:
                chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:])
                self._out.pos = 0

            # A zero result means the frame has been completely written.
            if not zresult:
                break

        return b''.join(chunks)
Gregory Szorc
|
r30435 | |||
Gregory Szorc
|
class CompressionReader(object):
    """Read-only, stream-like object that yields compressed data.

    ``source`` is either an object with a ``read()`` method or an object
    supporting the buffer protocol. ``read()`` pulls data from the source,
    compresses it with the compression context of ``compressor`` and returns
    the compressed bytes. The reader must be used as a context manager.
    """

    def __init__(self, compressor, source, size, read_size):
        """Store the collaborators and initialise the streaming state.

        ``size`` is pledged to zstd as the source size on ``__enter__``;
        ``read_size`` is the amount requested from ``source.read()``.
        """
        self._compressor = compressor
        self._source = source
        self._source_size = size
        self._read_size = read_size
        self._entered = False
        self._closed = False
        self._bytes_compressed = 0
        self._finished_input = False
        self._finished_output = False

        self._in_buffer = ffi.new('ZSTD_inBuffer *')
        # Holds a ref so backing bytes in self._in_buffer stay alive.
        self._source_buffer = None

    def __enter__(self):
        """Pledge the source size and activate the reader."""
        if self._entered:
            raise ValueError('cannot __enter__ multiple times')

        zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._compressor._cctx,
                                                  self._source_size)
        if lib.ZSTD_isError(zresult):
            raise ZstdError('error setting source size: %s' %
                            _zstd_error(zresult))

        self._entered = True
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Close the reader and release the source and compressor."""
        self._entered = False
        self._closed = True
        self._source = None
        self._compressor = None

        return False

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return False

    def readline(self):
        raise io.UnsupportedOperation()

    def readlines(self):
        raise io.UnsupportedOperation()

    def write(self, data):
        raise OSError('stream is not writable')

    def writelines(self, ignored):
        raise OSError('stream is not writable')

    def isatty(self):
        return False

    def flush(self):
        return None

    def close(self):
        self._closed = True
        return None

    def closed(self):
        """Return whether the stream is closed (a method, not a property)."""
        return self._closed

    def tell(self):
        """Return the number of compressed bytes produced so far."""
        return self._bytes_compressed

    def readall(self):
        raise NotImplementedError()

    def __iter__(self):
        raise io.UnsupportedOperation()

    def __next__(self):
        raise io.UnsupportedOperation()

    next = __next__

    def read(self, size=-1):
        """Return up to ``size`` bytes of compressed data.

        Returns ``b''`` once the frame has been completely emitted. A short
        result is only returned for the final chunk(s) of the frame.

        Raises ``ZstdError`` when called outside an active context manager
        or on a compression error, and ``ValueError`` if the stream is
        closed or ``size`` is less than 1.
        """
        if not self._entered:
            raise ZstdError('read() must be called from an active context manager')

        if self._closed:
            raise ValueError('stream is closed')

        if self._finished_output:
            return b''

        if size < 1:
            raise ValueError('cannot read negative or size 0 amounts')

        # Need a dedicated ref to dest buffer otherwise it gets collected.
        dst_buffer = ffi.new('char[]', size)
        out_buffer = ffi.new('ZSTD_outBuffer *')
        out_buffer.dst = dst_buffer
        out_buffer.size = size
        out_buffer.pos = 0

        def compress_input():
            # Compress pending input; return bytes only if output is full.
            if self._in_buffer.pos >= self._in_buffer.size:
                return

            old_pos = out_buffer.pos

            zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
                                                out_buffer, self._in_buffer,
                                                lib.ZSTD_e_continue)

            self._bytes_compressed += out_buffer.pos - old_pos

            if self._in_buffer.pos == self._in_buffer.size:
                # Input fully consumed: drop the reference to its memory.
                self._in_buffer.src = ffi.NULL
                self._in_buffer.pos = 0
                self._in_buffer.size = 0
                self._source_buffer = None

                # A buffer source is consumed in one go, so we are done.
                if not hasattr(self._source, 'read'):
                    self._finished_input = True

            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd compress error: %s' %
                                _zstd_error(zresult))

            if out_buffer.pos and out_buffer.pos == out_buffer.size:
                return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]

        def get_input():
            # Point self._in_buffer at the next chunk of source data.
            if self._finished_input:
                return

            if hasattr(self._source, 'read'):
                data = self._source.read(self._read_size)

                if not data:
                    self._finished_input = True
                    return

                self._source_buffer = ffi.from_buffer(data)
                self._in_buffer.src = self._source_buffer
                self._in_buffer.size = len(self._source_buffer)
                self._in_buffer.pos = 0
            else:
                self._source_buffer = ffi.from_buffer(self._source)
                self._in_buffer.src = self._source_buffer
                self._in_buffer.size = len(self._source_buffer)
                self._in_buffer.pos = 0

                # compress_input() returns early for empty input and would
                # never flag completion, leaving the caller's loop spinning
                # forever on an empty buffer source.
                if not self._in_buffer.size:
                    self._finished_input = True

        # First drain input left over from a previous call.
        result = compress_input()
        if result:
            return result

        while not self._finished_input:
            get_input()
            result = compress_input()
            if result:
                return result

        # EOF
        old_pos = out_buffer.pos

        zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
                                            out_buffer, self._in_buffer,
                                            lib.ZSTD_e_end)

        self._bytes_compressed += out_buffer.pos - old_pos

        if lib.ZSTD_isError(zresult):
            raise ZstdError('error ending compression stream: %s' %
                            _zstd_error(zresult))

        # Zero means the frame epilogue has been fully written.
        if zresult == 0:
            self._finished_output = True

        return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
Gregory Szorc
|
r30435 | class ZstdCompressor(object): | ||
Gregory Szorc
|
def __init__(self, level=3, dict_data=None, compression_params=None,
             write_checksum=None, write_content_size=None,
             write_dict_id=None, threads=0):
    """Create a compressor and its underlying zstd compression context.

    Either ``compression_params`` or the individual settings
    (``write_checksum``, ``write_content_size``, ``write_dict_id``,
    ``threads``) may be given, not both. A negative ``threads`` is replaced
    by ``_cpu_count()``.

    Raises ``ValueError`` for an out-of-range level or conflicting
    arguments, ``MemoryError`` if a native allocation fails and
    ``ZstdError`` if zstd rejects a setting.
    """
    max_level = lib.ZSTD_maxCLevel()
    if level > max_level:
        raise ValueError('level must be less than %d' % lib.ZSTD_maxCLevel())

    if threads < 0:
        threads = _cpu_count()

    if compression_params:
        # Individual settings conflict with an explicit parameters object.
        exclusive = (
            ('write_checksum', write_checksum),
            ('write_content_size', write_content_size),
            ('write_dict_id', write_dict_id),
        )
        for arg_name, arg_value in exclusive:
            if arg_value is not None:
                raise ValueError('cannot define compression_params and '
                                 '%s' % arg_name)

        if threads:
            raise ValueError('cannot define compression_params and threads')

        self._params = _make_cctx_params(compression_params)
    else:
        if write_dict_id is None:
            write_dict_id = True

        raw_params = lib.ZSTD_createCCtxParams()
        if raw_params == ffi.NULL:
            raise MemoryError()

        self._params = ffi.gc(raw_params, lib.ZSTD_freeCCtxParams)

        basic_settings = (
            (lib.ZSTD_p_compressionLevel, level),
            (lib.ZSTD_p_contentSizeFlag,
             write_content_size if write_content_size is not None else 1),
            (lib.ZSTD_p_checksumFlag, 1 if write_checksum else 0),
            (lib.ZSTD_p_dictIDFlag, 1 if write_dict_id else 0),
        )
        for key, setting in basic_settings:
            _set_compression_parameter(self._params, key, setting)

        if threads:
            _set_compression_parameter(self._params,
                                       lib.ZSTD_p_nbWorkers,
                                       threads)

    raw_cctx = lib.ZSTD_createCCtx()
    if raw_cctx == ffi.NULL:
        raise MemoryError()

    self._cctx = raw_cctx
    self._dict_data = dict_data

    # We defer setting up garbage collection until after calling
    # _ensure_cctx() to ensure the memory size estimate is more accurate.
    try:
        self._ensure_cctx()
    finally:
        self._cctx = ffi.gc(raw_cctx, lib.ZSTD_freeCCtx,
                            size=lib.ZSTD_sizeof_CCtx(raw_cctx))
Gregory Szorc
|
r30895 | |||
Gregory Szorc
|
def _ensure_cctx(self):
    """Reset the compression context and re-apply parameters and dictionary.

    Raises ``ZstdError`` if zstd rejects the parameters or the dictionary.
    """
    cctx = self._cctx

    lib.ZSTD_CCtx_reset(cctx)

    zresult = lib.ZSTD_CCtx_setParametersUsingCCtxParams(cctx, self._params)
    if lib.ZSTD_isError(zresult):
        raise ZstdError('could not set compression parameters: %s' %
                        _zstd_error(zresult))

    dict_data = self._dict_data
    if not dict_data:
        return

    if dict_data._cdict:
        # A precomputed dictionary exists: just reference it.
        zresult = lib.ZSTD_CCtx_refCDict(cctx, dict_data._cdict)
    else:
        zresult = lib.ZSTD_CCtx_loadDictionary_advanced(
            cctx, dict_data.as_bytes(), len(dict_data),
            lib.ZSTD_dlm_byRef, dict_data._dict_type)

    if lib.ZSTD_isError(zresult):
        raise ZstdError('could not load compression dictionary: %s' %
                        _zstd_error(zresult))
def memory_size(self):
    """Return the size of the compression context as reported by zstd."""
    cctx_size = lib.ZSTD_sizeof_CCtx(self._cctx)
    return cctx_size
def compress(self, data):
    """Compress ``data`` in one operation and return the frame as bytes.

    ``data`` must support the buffer protocol. Raises ``ZstdError`` on any
    compression failure.
    """
    self._ensure_cctx()

    src = ffi.from_buffer(data)
    src_size = len(src)

    # Worst-case output size; the allocation is deliberately not zeroed.
    dst_size = lib.ZSTD_compressBound(src_size)
    dst = new_nonzero('char[]', dst_size)

    zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, src_size)
    if lib.ZSTD_isError(zresult):
        raise ZstdError('error setting source size: %s' %
                        _zstd_error(zresult))

    in_buffer = ffi.new('ZSTD_inBuffer *')
    in_buffer.src = src
    in_buffer.size = src_size
    in_buffer.pos = 0

    out_buffer = ffi.new('ZSTD_outBuffer *')
    out_buffer.dst = dst
    out_buffer.size = dst_size
    out_buffer.pos = 0

    zresult = lib.ZSTD_compress_generic(self._cctx,
                                        out_buffer,
                                        in_buffer,
                                        lib.ZSTD_e_end)

    if lib.ZSTD_isError(zresult):
        raise ZstdError('cannot compress: %s' %
                        _zstd_error(zresult))

    # A non-zero result means the frame was not completely flushed.
    if zresult:
        raise ZstdError('unexpected partial frame flush')

    return ffi.buffer(dst, out_buffer.pos)[:]
Gregory Szorc
|
r30895 | |||
Gregory Szorc
|
def compressobj(self, size=-1):
    """Return a ``ZstdCompressionObj`` bound to this compressor.

    ``size`` is pledged to zstd as the source size; a negative value means
    the size is unknown. Raises ``ZstdError`` if zstd rejects the size.
    """
    self._ensure_cctx()

    pledged_size = lib.ZSTD_CONTENTSIZE_UNKNOWN if size < 0 else size

    zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, pledged_size)
    if lib.ZSTD_isError(zresult):
        raise ZstdError('error setting source size: %s' %
                        _zstd_error(zresult))

    out = ffi.new('ZSTD_outBuffer *')
    dst = ffi.new('char[]', COMPRESSION_RECOMMENDED_OUTPUT_SIZE)
    out.dst = dst
    out.size = COMPRESSION_RECOMMENDED_OUTPUT_SIZE
    out.pos = 0

    cobj = ZstdCompressionObj()
    cobj._out = out
    # Stored on the object so the memory behind ``out.dst`` stays alive.
    cobj._dst_buffer = dst
    cobj._compressor = self
    cobj._finished = False

    return cobj
Gregory Szorc
|
def copy_stream(self, ifh, ofh, size=-1,
                read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE,
                write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE):
    """Compress everything read from ``ifh`` and write it to ``ofh``.

    ``ifh`` must have a ``read()`` method and ``ofh`` a ``write()`` method.
    ``size`` is pledged to zstd as the source size; a negative value means
    unknown. ``read_size`` and ``write_size`` control the chunk sizes used
    for reading input and buffering output.

    Returns a ``(bytes_read, bytes_written)`` tuple.

    Raises ``ValueError`` if either stream lacks the required method and
    ``ZstdError`` on a compression error.
    """
    if not hasattr(ifh, 'read'):
        raise ValueError('first argument must have a read() method')
    if not hasattr(ofh, 'write'):
        raise ValueError('second argument must have a write() method')

    self._ensure_cctx()

    if size < 0:
        size = lib.ZSTD_CONTENTSIZE_UNKNOWN

    zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size)
    if lib.ZSTD_isError(zresult):
        raise ZstdError('error setting source size: %s' %
                        _zstd_error(zresult))

    in_buffer = ffi.new('ZSTD_inBuffer *')
    out_buffer = ffi.new('ZSTD_outBuffer *')

    # Keep a reference so the destination memory stays alive.
    dst_buffer = ffi.new('char[]', write_size)
    out_buffer.dst = dst_buffer
    out_buffer.size = write_size
    out_buffer.pos = 0

    total_read, total_write = 0, 0

    while True:
        data = ifh.read(read_size)
        if not data:
            break

        data_buffer = ffi.from_buffer(data)
        total_read += len(data_buffer)
        in_buffer.src = data_buffer
        in_buffer.size = len(data_buffer)
        in_buffer.pos = 0

        while in_buffer.pos < in_buffer.size:
            zresult = lib.ZSTD_compress_generic(self._cctx,
                                                out_buffer,
                                                in_buffer,
                                                lib.ZSTD_e_continue)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd compress error: %s' %
                                _zstd_error(zresult))

            if out_buffer.pos:
                # Hand the writer an independent bytes copy: the output
                # buffer is reused on the next iteration, so a writer that
                # keeps the object would otherwise see it change.
                ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
                total_write += out_buffer.pos
                out_buffer.pos = 0

    # We've finished reading. Flush the compressor.
    while True:
        zresult = lib.ZSTD_compress_generic(self._cctx,
                                            out_buffer,
                                            in_buffer,
                                            lib.ZSTD_e_end)
        if lib.ZSTD_isError(zresult):
            raise ZstdError('error ending compression stream: %s' %
                            _zstd_error(zresult))

        if out_buffer.pos:
            ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
            total_write += out_buffer.pos
            out_buffer.pos = 0

        # Zero means the frame has been completely written out.
        if zresult == 0:
            break

    return total_read, total_write
Gregory Szorc
|
def stream_reader(self, source, size=-1,
                  read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE):
    """Return a ``CompressionReader`` producing compressed ``source`` data.

    When ``len(source)`` works, it replaces ``size`` as the source size.
    A negative size is treated as unknown.
    """
    self._ensure_cctx()

    source_size = size
    try:
        source_size = len(source)
    except Exception:
        # No usable length: keep the size given by the caller.
        pass

    if source_size < 0:
        source_size = lib.ZSTD_CONTENTSIZE_UNKNOWN

    return CompressionReader(self, source, source_size, read_size)
def stream_writer(self, writer, size=-1,
                  write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE):
    """Create a ``ZstdCompressionWriter`` that writes into ``writer``.

    Raises ``ValueError`` when ``writer`` has no ``write`` attribute.
    A negative ``size`` is replaced with ``lib.ZSTD_CONTENTSIZE_UNKNOWN``
    before being passed on together with ``write_size``.
    """
    if not hasattr(writer, 'write'):
        raise ValueError('must pass an object with a write() method')

    self._ensure_cctx()

    source_size = lib.ZSTD_CONTENTSIZE_UNKNOWN if size < 0 else size
    return ZstdCompressionWriter(self, writer, source_size, write_size)

# Second name bound to the same function.
write_to = stream_writer
def read_to_iter(self, reader, size=-1,
                 read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE,
                 write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE):
    """Generator yielding compressed ``bytes`` chunks made from ``reader``.

    ``reader`` is consumed via ``read(read_size)`` when it has a ``read``
    attribute, otherwise it is sliced in ``read_size`` steps when it has
    ``__getitem__`` (in which case ``size`` is replaced by
    ``len(reader)``). Anything else raises ``ValueError``. A negative
    ``size`` is pledged as ``lib.ZSTD_CONTENTSIZE_UNKNOWN``. Output is
    produced through a scratch buffer of ``write_size`` bytes.

    Raises ``ZstdError`` when the library reports an error.
    """
    from_stream = hasattr(reader, 'read')
    if not from_stream:
        if not hasattr(reader, '__getitem__'):
            raise ValueError('must pass an object with a read() method or '
                             'conforms to buffer protocol')
        cursor = 0
        size = len(reader)

    self._ensure_cctx()

    if size < 0:
        size = lib.ZSTD_CONTENTSIZE_UNKNOWN

    zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size)
    if lib.ZSTD_isError(zresult):
        raise ZstdError('error setting source size: %s' %
                        _zstd_error(zresult))

    in_buffer = ffi.new('ZSTD_inBuffer *')
    in_buffer.src = ffi.NULL
    in_buffer.size = 0
    in_buffer.pos = 0

    # ``scratch`` must stay referenced for as long as out_buffer.dst
    # points at it.
    scratch = ffi.new('char[]', write_size)
    out_buffer = ffi.new('ZSTD_outBuffer *')
    out_buffer.dst = scratch
    out_buffer.size = write_size
    out_buffer.pos = 0

    while True:
        # Output is always drained before more input is fetched.
        assert out_buffer.pos == 0

        if from_stream:
            chunk = reader.read(read_size)
        else:
            step = min(len(reader) - cursor, read_size)
            chunk = reader[cursor:cursor + step]
            cursor += step

        # An empty read/slice means the input is exhausted.
        if not chunk:
            break

        chunk_buffer = ffi.from_buffer(chunk)
        in_buffer.src = chunk_buffer
        in_buffer.size = len(chunk_buffer)
        in_buffer.pos = 0

        # Push the whole chunk through, emitting output as it appears.
        while in_buffer.pos < in_buffer.size:
            zresult = lib.ZSTD_compress_generic(self._cctx, out_buffer,
                                                in_buffer,
                                                lib.ZSTD_e_continue)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd compress error: %s' %
                                _zstd_error(zresult))

            if out_buffer.pos:
                produced = ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
                out_buffer.pos = 0
                yield produced

        assert out_buffer.pos == 0

    # Input is done: end the frame and emit whatever is still buffered.
    while True:
        assert out_buffer.pos == 0
        zresult = lib.ZSTD_compress_generic(self._cctx,
                                            out_buffer,
                                            in_buffer,
                                            lib.ZSTD_e_end)
        if lib.ZSTD_isError(zresult):
            raise ZstdError('error ending compression stream: %s' %
                            _zstd_error(zresult))

        if out_buffer.pos:
            produced = ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
            out_buffer.pos = 0
            yield produced

        if zresult == 0:
            break

# Second name bound to the same function.
read_from = read_to_iter
Gregory Szorc
|
r30895 | |||
Gregory Szorc
|
def frame_progression(self):
    """Return ``(ingested, consumed, produced)`` for this compressor.

    The three values are read from the struct returned by
    ``lib.ZSTD_getFrameProgression`` for ``self._cctx``.
    """
    stats = lib.ZSTD_getFrameProgression(self._cctx)
    ingested = stats.ingested
    consumed = stats.consumed
    produced = stats.produced
    return ingested, consumed, produced
Gregory Szorc
|
r30895 | |||
class FrameParameters(object):
    """Python-side copy of selected fields of a frame header struct.

    Attributes set by the constructor:
    ``content_size``, ``window_size``, ``dict_id`` and ``has_checksum``
    (the latter coerced to ``bool``).
    """

    def __init__(self, fparams):
        # Read the fields first, then publish them under Python names.
        content_size = fparams.frameContentSize
        window_size = fparams.windowSize
        dict_id = fparams.dictID
        checksum_flag = fparams.checksumFlag

        self.content_size = content_size
        self.window_size = window_size
        self.dict_id = dict_id
        self.has_checksum = bool(checksum_flag)
Gregory Szorc
|
def frame_content_size(data):
    """Return the content size recorded in the frame header of ``data``.

    Returns ``-1`` when the library reports
    ``ZSTD_CONTENTSIZE_UNKNOWN`` and raises ``ZstdError`` when it reports
    ``ZSTD_CONTENTSIZE_ERROR``; any other value is returned unchanged.
    """
    view = ffi.from_buffer(data)
    content_size = lib.ZSTD_getFrameContentSize(view, len(view))

    if content_size == lib.ZSTD_CONTENTSIZE_ERROR:
        raise ZstdError('error when determining content size')

    if content_size == lib.ZSTD_CONTENTSIZE_UNKNOWN:
        return -1

    return content_size
Gregory Szorc
|
r30895 | |||
Gregory Szorc
|
def frame_header_size(data):
    """Return the value of ``ZSTD_frameHeaderSize`` for ``data``.

    Raises ``ZstdError`` when the library flags the result as an error.
    """
    view = ffi.from_buffer(data)
    header_size = lib.ZSTD_frameHeaderSize(view, len(view))

    if not lib.ZSTD_isError(header_size):
        return header_size

    raise ZstdError('could not determine frame header size: %s' %
                    _zstd_error(header_size))
def get_frame_parameters(data):
    """Parse the frame header in ``data`` into a ``FrameParameters``.

    Raises ``ZstdError`` when ``ZSTD_getFrameHeader`` reports an error,
    or when it returns a non-zero value (reported to the caller as the
    number of bytes needed).
    """
    header = ffi.new('ZSTD_frameHeader *')
    view = ffi.from_buffer(data)

    status = lib.ZSTD_getFrameHeader(header, view, len(view))
    if lib.ZSTD_isError(status):
        raise ZstdError('cannot get frame parameters: %s' %
                        _zstd_error(status))

    # A non-error, non-zero status is treated as "input too short".
    if status:
        raise ZstdError('not enough data for frame parameters; need %d bytes' %
                        status)

    return FrameParameters(header[0])
class ZstdCompressionDict(object):
    """Holds raw dictionary bytes plus lazily created zstd dict objects.

    ``data`` is the dictionary content, ``dict_type`` must be one of the
    ``DICT_TYPE_*`` constants, and ``k``/``d`` are stored on the instance
    unchanged.
    """

    def __init__(self, data, dict_type=DICT_TYPE_AUTO, k=0, d=0):
        assert isinstance(data, bytes_type)
        self._data = data
        self.k = k
        self.d = d

        if dict_type not in (DICT_TYPE_AUTO, DICT_TYPE_RAWCONTENT,
                             DICT_TYPE_FULLDICT):
            # The offending value is interpolated; %r is used so that a
            # non-integer argument cannot turn this into a TypeError.
            raise ValueError('invalid dictionary load mode: %r; must use '
                             'DICT_TYPE_* constants' % (dict_type,))

        self._dict_type = dict_type
        self._cdict = None

    def __len__(self):
        """Return the length of the dictionary data."""
        return len(self._data)

    def dict_id(self):
        """Return the dictionary ID reported by ``ZDICT_getDictID``."""
        return int_type(lib.ZDICT_getDictID(self._data, len(self._data)))

    def as_bytes(self):
        """Return the dictionary data passed to the constructor."""
        return self._data

    def precompute_compress(self, level=0, compression_params=None):
        """Build a compression dict and store it in ``self._cdict``.

        Exactly one of ``level`` or ``compression_params`` must be given;
        otherwise ``ValueError`` is raised. Raises ``ZstdError`` when the
        library fails to create the dictionary.
        """
        if level and compression_params:
            raise ValueError('must only specify one of level or '
                             'compression_params')

        if not level and not compression_params:
            raise ValueError('must specify one of level or compression_params')

        if level:
            cparams = lib.ZSTD_getCParams(level, 0, len(self._data))
        else:
            # ffi.new() only accepts pointer/array types, so allocate
            # through a pointer and dereference it. The pointer is kept
            # in a local so the struct memory stays owned for the call.
            cparams_ptr = ffi.new('ZSTD_compressionParameters *')
            cparams = cparams_ptr[0]
            cparams.chainLog = compression_params.chain_log
            cparams.hashLog = compression_params.hash_log
            cparams.searchLength = compression_params.min_match
            cparams.searchLog = compression_params.search_log
            cparams.strategy = compression_params.compression_strategy
            cparams.targetLength = compression_params.target_length
            cparams.windowLog = compression_params.window_log

        cdict = lib.ZSTD_createCDict_advanced(self._data, len(self._data),
                                              lib.ZSTD_dlm_byRef,
                                              self._dict_type,
                                              cparams,
                                              lib.ZSTD_defaultCMem)
        if cdict == ffi.NULL:
            raise ZstdError('unable to precompute dictionary')

        self._cdict = ffi.gc(cdict, lib.ZSTD_freeCDict,
                             size=lib.ZSTD_sizeof_CDict(cdict))

    @property
    def _ddict(self):
        """Decompression dict for this data, created on first access."""
        # A property is a data descriptor, so the value stashed in the
        # instance __dict__ is never found by ordinary attribute lookup.
        # Look it up explicitly so the dict is only built once.
        cached = self.__dict__.get('_ddict')
        if cached is not None:
            return cached

        ddict = lib.ZSTD_createDDict_advanced(self._data, len(self._data),
                                              lib.ZSTD_dlm_byRef,
                                              self._dict_type,
                                              lib.ZSTD_defaultCMem)

        if ddict == ffi.NULL:
            raise ZstdError('could not create decompression dict')

        ddict = ffi.gc(ddict, lib.ZSTD_freeDDict,
                       size=lib.ZSTD_sizeof_DDict(ddict))
        self.__dict__['_ddict'] = ddict

        return ddict
def train_dictionary(dict_size, samples, k=0, d=0, notifications=0, dict_id=0,
                     level=0, steps=0, threads=0):
    """Train a dictionary from ``samples`` and return a ZstdCompressionDict.

    ``samples`` must be a ``list`` (``TypeError`` otherwise) whose items
    are all bytes (``ValueError`` otherwise). ``dict_size`` is the size
    of the output buffer handed to the trainer. A negative ``threads``
    is replaced by ``_cpu_count()``.

    Which trainer runs depends on the arguments:
    * all tuning arguments zero -> ``ZDICT_trainFromBuffer``
    * ``steps`` or ``threads`` set -> ``ZDICT_optimizeTrainFromBuffer_cover``
    * otherwise -> ``ZDICT_trainFromBuffer_cover``

    Raises ``ZstdError`` when the trainer reports an error.
    """
    if not isinstance(samples, list):
        raise TypeError('samples must be a list')

    if threads < 0:
        threads = _cpu_count()

    # Validate before sizing/allocating so that a bad sample always
    # surfaces as the intended ValueError rather than whatever len()
    # happens to raise for it.
    for sample in samples:
        if not isinstance(sample, bytes_type):
            raise ValueError('samples must be bytes')

    total_size = sum(map(len, samples))

    samples_buffer = new_nonzero('char[]', total_size)
    sample_sizes = new_nonzero('size_t[]', len(samples))

    # Pack all samples back to back and record each length.
    offset = 0
    for i, sample in enumerate(samples):
        sample_len = len(sample)
        ffi.memmove(samples_buffer + offset, sample, sample_len)
        offset += sample_len
        sample_sizes[i] = sample_len

    dict_data = new_nonzero('char[]', dict_size)

    dparams = ffi.new('ZDICT_cover_params_t *')[0]
    dparams.k = k
    dparams.d = d
    dparams.steps = steps
    dparams.nbThreads = threads
    dparams.zParams.notificationLevel = notifications
    dparams.zParams.dictID = dict_id
    dparams.zParams.compressionLevel = level

    if (not dparams.k and not dparams.d and not dparams.steps
        and not dparams.nbThreads and not dparams.zParams.notificationLevel
        and not dparams.zParams.dictID
        and not dparams.zParams.compressionLevel):
        zresult = lib.ZDICT_trainFromBuffer(
            ffi.addressof(dict_data), dict_size,
            ffi.addressof(samples_buffer),
            ffi.addressof(sample_sizes, 0), len(samples))
    elif dparams.steps or dparams.nbThreads:
        # The optimizing variant receives the parameters by address.
        zresult = lib.ZDICT_optimizeTrainFromBuffer_cover(
            ffi.addressof(dict_data), dict_size,
            ffi.addressof(samples_buffer),
            ffi.addressof(sample_sizes, 0), len(samples),
            ffi.addressof(dparams))
    else:
        zresult = lib.ZDICT_trainFromBuffer_cover(
            ffi.addressof(dict_data), dict_size,
            ffi.addressof(samples_buffer),
            ffi.addressof(sample_sizes, 0), len(samples),
            dparams)

    if lib.ZDICT_isError(zresult):
        msg = ffi.string(lib.ZDICT_getErrorName(zresult)).decode('utf-8')
        raise ZstdError('cannot train dict: %s' % msg)

    # On success zresult is used as the number of dictionary bytes.
    return ZstdCompressionDict(ffi.buffer(dict_data, zresult)[:],
                               dict_type=DICT_TYPE_FULLDICT,
                               k=dparams.k, d=dparams.d)
Gregory Szorc
|
class ZstdDecompressionObj(object):
    """Incremental decompressor returning output from ``decompress()``.

    ``decompressor`` supplies the ``_dctx`` used for decompression and
    ``write_size`` is the size of the scratch output buffer allocated on
    each call.
    """

    def __init__(self, decompressor, write_size):
        self._decompressor = decompressor
        self._write_size = write_size
        self._finished = False

    def decompress(self, data):
        """Feed ``data`` to the decompressor and return the output bytes.

        Raises ``ZstdError`` if a previous call already reached the end
        of a frame, or if the library reports an error.
        """
        if self._finished:
            raise ZstdError('cannot use a decompressobj multiple times')

        # Bind the context before the loop: the reference to the
        # decompressor is dropped as soon as a frame ends, but the loop
        # keeps running while input remains.
        dctx = self._decompressor._dctx

        in_buffer = ffi.new('ZSTD_inBuffer *')
        out_buffer = ffi.new('ZSTD_outBuffer *')

        data_buffer = ffi.from_buffer(data)
        in_buffer.src = data_buffer
        in_buffer.size = len(data_buffer)
        in_buffer.pos = 0

        dst_buffer = ffi.new('char[]', self._write_size)
        out_buffer.dst = dst_buffer
        out_buffer.size = len(dst_buffer)
        out_buffer.pos = 0

        chunks = []

        while in_buffer.pos < in_buffer.size:
            zresult = lib.ZSTD_decompress_generic(dctx, out_buffer, in_buffer)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd decompressor error: %s' %
                                _zstd_error(zresult))

            # A zero result marks this object as used up.
            if zresult == 0:
                self._finished = True
                self._decompressor = None

            if out_buffer.pos:
                chunks.append(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
                out_buffer.pos = 0

        return b''.join(chunks)
Gregory Szorc
|
class DecompressionReader(object):
    """Read-only, forward-only stream of decompressed data.

    ``source`` is either an object with ``read()`` or an object accepted
    by ``ffi.from_buffer``. ``read_size`` is the amount requested from
    ``source.read()`` each time more input is needed. ``read()`` and
    ``seek()`` only work inside a ``with`` block.
    """

    def __init__(self, decompressor, source, read_size):
        self._decompressor = decompressor
        self._source = source
        self._read_size = read_size
        self._entered = False
        self._closed = False
        self._bytes_decompressed = 0
        self._finished_input = False
        self._finished_output = False
        self._in_buffer = ffi.new('ZSTD_inBuffer *')
        # Holds a ref to self._in_buffer.src.
        self._source_buffer = None

    def __enter__(self):
        if self._entered:
            raise ValueError('cannot __enter__ multiple times')

        self._decompressor._ensure_dctx()

        self._entered = True
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self._entered = False
        self._closed = True
        self._source = None
        self._decompressor = None

        return False

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return True

    def readline(self):
        raise NotImplementedError()

    def readlines(self):
        raise NotImplementedError()

    def write(self, data):
        raise io.UnsupportedOperation()

    def writelines(self, lines):
        raise io.UnsupportedOperation()

    def isatty(self):
        return False

    def flush(self):
        return None

    def close(self):
        self._closed = True
        return None

    def closed(self):
        return self._closed

    def tell(self):
        """Return the number of decompressed bytes handed out so far."""
        return self._bytes_decompressed

    def readall(self):
        raise NotImplementedError()

    def __iter__(self):
        raise NotImplementedError()

    def __next__(self):
        raise NotImplementedError()

    next = __next__

    def read(self, size=-1):
        """Return up to ``size`` decompressed bytes.

        Returns ``b''`` once the end of a frame has been seen. Raises
        ``ZstdError`` outside a context manager or on a library error,
        and ``ValueError`` when closed or when ``size`` is below 1.
        """
        if not self._entered:
            raise ZstdError('read() must be called from an active context manager')

        if self._closed:
            raise ValueError('stream is closed')

        if self._finished_output:
            return b''

        if size < 1:
            raise ValueError('cannot read negative or size 0 amounts')

        dst_buffer = ffi.new('char[]', size)
        out_buffer = ffi.new('ZSTD_outBuffer *')
        out_buffer.dst = dst_buffer
        out_buffer.size = size
        out_buffer.pos = 0

        def decompress():
            zresult = lib.ZSTD_decompress_generic(self._decompressor._dctx,
                                                  out_buffer, self._in_buffer)

            # Input fully consumed: release it so more can be fetched.
            if self._in_buffer.pos == self._in_buffer.size:
                self._in_buffer.src = ffi.NULL
                self._in_buffer.pos = 0
                self._in_buffer.size = 0
                self._source_buffer = None

                # A buffer source is supplied in one piece, so once it
                # is consumed there is nothing more to read.
                if not hasattr(self._source, 'read'):
                    self._finished_input = True

            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd decompress error: %s' %
                                _zstd_error(zresult))
            elif zresult == 0:
                self._finished_output = True

            # Only hand data back from here when the output is full;
            # partial output is returned by the caller's final fallthrough.
            if out_buffer.pos and out_buffer.pos == out_buffer.size:
                self._bytes_decompressed += out_buffer.size
                return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]

        def get_input():
            # Unconsumed input is still pending (the output buffer
            # filled up first). Replacing it now would drop or re-feed
            # compressed data, so keep using it.
            if self._in_buffer.pos < self._in_buffer.size:
                return

            if self._finished_input:
                return

            if hasattr(self._source, 'read'):
                data = self._source.read(self._read_size)

                if not data:
                    self._finished_input = True
                    return

                self._source_buffer = ffi.from_buffer(data)
                self._in_buffer.src = self._source_buffer
                self._in_buffer.size = len(self._source_buffer)
                self._in_buffer.pos = 0
            else:
                self._source_buffer = ffi.from_buffer(self._source)
                self._in_buffer.src = self._source_buffer
                self._in_buffer.size = len(self._source_buffer)
                self._in_buffer.pos = 0

        get_input()
        result = decompress()
        if result:
            return result

        while not self._finished_input:
            get_input()
            result = decompress()
            if result:
                return result

        self._bytes_decompressed += out_buffer.pos
        return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]

    def seek(self, pos, whence=os.SEEK_SET):
        """Advance the stream by reading and discarding output.

        Only forward seeks with ``SEEK_SET``/``SEEK_CUR`` are accepted;
        backward seeks and ``SEEK_END`` raise ``ValueError``. Returns the
        number of bytes decompressed so far.
        """
        if not self._entered:
            raise ZstdError('seek() must be called from an active context '
                            'manager')

        if self._closed:
            raise ValueError('stream is closed')

        read_amount = 0

        if whence == os.SEEK_SET:
            if pos < 0:
                raise ValueError('cannot seek to negative position with SEEK_SET')

            if pos < self._bytes_decompressed:
                raise ValueError('cannot seek zstd decompression stream '
                                 'backwards')

            read_amount = pos - self._bytes_decompressed

        elif whence == os.SEEK_CUR:
            if pos < 0:
                raise ValueError('cannot seek zstd decompression stream '
                                 'backwards')

            read_amount = pos
        elif whence == os.SEEK_END:
            raise ValueError('zstd decompression streams cannot be seeked '
                             'with SEEK_END')

        while read_amount:
            result = self.read(min(read_amount,
                                   DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE))

            # Stream ended before the target position was reached.
            if not result:
                break

            read_amount -= len(result)

        return self._bytes_decompressed
Gregory Szorc
|
class ZstdDecompressionWriter(object):
    """Context manager that decompresses written data into ``writer``.

    ``decompressor`` provides the ``_dctx`` used for decompression,
    ``writer`` receives the output through its ``write()`` method and
    ``write_size`` is the size of the scratch output buffer.
    """

    def __init__(self, decompressor, writer, write_size):
        self._decompressor = decompressor
        self._writer = writer
        self._write_size = write_size
        self._entered = False

    def __enter__(self):
        if self._entered:
            raise ZstdError('cannot __enter__ multiple times')

        self._decompressor._ensure_dctx()
        self._entered = True

        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self._entered = False

    def memory_size(self):
        """Return ``ZSTD_sizeof_DCtx`` for the decompressor's context."""
        dctx = self._decompressor._dctx
        if not dctx:
            raise ZstdError('cannot determine size of inactive decompressor '
                            'call when context manager is active')

        return lib.ZSTD_sizeof_DCtx(self._decompressor._dctx)

    def write(self, data):
        """Decompress ``data`` into the writer; return bytes written.

        Raises ``ZstdError`` outside a context manager or when the
        library reports an error.
        """
        if not self._entered:
            raise ZstdError('write must be called from an active context manager')

        written = 0

        source = ffi.new('ZSTD_inBuffer *')
        sink = ffi.new('ZSTD_outBuffer *')

        payload = ffi.from_buffer(data)
        source.src = payload
        source.size = len(payload)
        source.pos = 0

        scratch = ffi.new('char[]', self._write_size)
        sink.dst = scratch
        sink.size = len(scratch)
        sink.pos = 0

        dctx = self._decompressor._dctx

        # Keep going until every input byte has been consumed.
        while source.pos < source.size:
            zresult = lib.ZSTD_decompress_generic(dctx, sink, source)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd decompress error: %s' %
                                _zstd_error(zresult))

            if not sink.pos:
                continue

            self._writer.write(ffi.buffer(sink.dst, sink.pos)[:])
            written += sink.pos
            sink.pos = 0

        return written
class ZstdDecompressor(object): | ||||
Gregory Szorc
|
def __init__(self, dict_data=None, max_window_size=0, format=FORMAT_ZSTD1):
    """Store the settings and create the decompression context.

    Raises ``MemoryError`` when ``ZSTD_createDCtx`` returns NULL.
    """
    self._dict_data = dict_data
    self._max_window_size = max_window_size
    self._format = format

    raw_dctx = lib.ZSTD_createDCtx()
    if raw_dctx == ffi.NULL:
        raise MemoryError()

    # _ensure_dctx() needs the context to be reachable via self._dctx.
    self._dctx = raw_dctx

    # Defer setting up garbage collection until full state is loaded so
    # the memory size is more accurate. The ``finally`` makes sure the
    # context is handed to ffi.gc even when _ensure_dctx() raises.
    try:
        self._ensure_dctx()
    finally:
        dctx_size = lib.ZSTD_sizeof_DCtx(raw_dctx)
        self._dctx = ffi.gc(raw_dctx, lib.ZSTD_freeDCtx, size=dctx_size)
Gregory Szorc
|
r30895 | |||
Gregory Szorc
|
def memory_size(self):
    """Return ``ZSTD_sizeof_DCtx`` for this decompressor's context."""
    dctx = self._dctx
    return lib.ZSTD_sizeof_DCtx(dctx)
Gregory Szorc
|
r30895 | |||
def decompress(self, data, max_output_size=0):
    """Decompress a single complete frame held in ``data``.

    The output size is taken from the frame header. When the header
    does not record it, ``max_output_size`` must be non-zero and is used
    as the output buffer size instead. Returns the decompressed bytes
    (``b''`` for a recorded size of zero).

    Raises ``ZstdError`` when the header cannot be read, when no size is
    available, when the library reports an error, when the frame is not
    fully decompressed in one call, or when the produced size differs
    from the recorded size.
    """
    self._ensure_dctx()

    data_buffer = ffi.from_buffer(data)

    output_size = lib.ZSTD_getFrameContentSize(data_buffer, len(data_buffer))

    if output_size == lib.ZSTD_CONTENTSIZE_ERROR:
        raise ZstdError('error determining content size from frame header')
    elif output_size == 0:
        return b''
    elif output_size == lib.ZSTD_CONTENTSIZE_UNKNOWN:
        if not max_output_size:
            raise ZstdError('could not determine content size in frame header')

        result_buffer = ffi.new('char[]', max_output_size)
        result_size = max_output_size
        # Zero disables the exact-size check further down.
        output_size = 0
    else:
        result_buffer = ffi.new('char[]', output_size)
        result_size = output_size

    out_buffer = ffi.new('ZSTD_outBuffer *')
    out_buffer.dst = result_buffer
    out_buffer.size = result_size
    out_buffer.pos = 0

    in_buffer = ffi.new('ZSTD_inBuffer *')
    in_buffer.src = data_buffer
    in_buffer.size = len(data_buffer)
    in_buffer.pos = 0

    zresult = lib.ZSTD_decompress_generic(self._dctx, out_buffer, in_buffer)
    if lib.ZSTD_isError(zresult):
        raise ZstdError('decompression error: %s' %
                        _zstd_error(zresult))
    elif zresult:
        raise ZstdError('decompression error: did not decompress full frame')
    elif output_size and out_buffer.pos != output_size:
        # zresult is always 0 on this branch; the number of bytes
        # actually produced is out_buffer.pos.
        raise ZstdError('decompression error: decompressed %d bytes; expected %d' %
                        (out_buffer.pos, output_size))

    return ffi.buffer(result_buffer, out_buffer.pos)[:]
def stream_reader(self, source, read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE):
    """Return a ``DecompressionReader`` over ``source``.

    The decompression context is prepared first; ``source`` and
    ``read_size`` are passed through unchanged.
    """
    self._ensure_dctx()
    reader = DecompressionReader(self, source, read_size)
    return reader
Gregory Szorc
|
r30895 | |||
Gregory Szorc
|
def decompressobj(self, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE):
    """Return a ``ZstdDecompressionObj`` bound to this decompressor.

    Raises ``ValueError`` when ``write_size`` is below 1.
    """
    if not write_size >= 1:
        raise ValueError('write_size must be positive')

    self._ensure_dctx()
    return ZstdDecompressionObj(self, write_size=write_size)
def read_to_iter(self, reader, read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
                 write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE,
                 skip_bytes=0):
    """Generator yielding decompressed ``bytes`` chunks from ``reader``.

    ``reader`` is consumed via ``read(read_size)`` when it has a ``read``
    attribute, otherwise it is sliced in ``read_size`` steps when it has
    ``__getitem__``. ``skip_bytes`` leading bytes of input are discarded
    before decompression starts. The generator stops when the input is
    exhausted or when the library signals the end of a frame.

    Raises ``ValueError`` for an unsupported ``reader`` or a bad
    ``skip_bytes``, and ``ZstdError`` when the library reports an error.
    """
    if skip_bytes >= read_size:
        raise ValueError('skip_bytes must be smaller than read_size')

    from_stream = hasattr(reader, 'read')
    if not from_stream:
        if not hasattr(reader, '__getitem__'):
            raise ValueError('must pass an object with a read() method or '
                             'conforms to buffer protocol')
        cursor = 0
        size = len(reader)

    if skip_bytes:
        if from_stream:
            # Read and discard the leading bytes.
            reader.read(skip_bytes)
        else:
            if skip_bytes > size:
                raise ValueError('skip_bytes larger than first input chunk')
            cursor = skip_bytes

    self._ensure_dctx()

    in_buffer = ffi.new('ZSTD_inBuffer *')
    out_buffer = ffi.new('ZSTD_outBuffer *')

    # ``scratch`` must stay referenced while out_buffer.dst points at it.
    scratch = ffi.new('char[]', write_size)
    out_buffer.dst = scratch
    out_buffer.size = len(scratch)
    out_buffer.pos = 0

    while True:
        assert out_buffer.pos == 0

        if from_stream:
            chunk = reader.read(read_size)
        else:
            step = min(size - cursor, read_size)
            chunk = reader[cursor:cursor + step]
            cursor += step

        # An empty read/slice means the input is exhausted.
        if not chunk:
            break

        chunk_buffer = ffi.from_buffer(chunk)
        in_buffer.src = chunk_buffer
        in_buffer.size = len(chunk_buffer)
        in_buffer.pos = 0

        # Push the whole chunk through, emitting output as it appears.
        while in_buffer.pos < in_buffer.size:
            assert out_buffer.pos == 0

            zresult = lib.ZSTD_decompress_generic(self._dctx, out_buffer, in_buffer)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd decompress error: %s' %
                                _zstd_error(zresult))

            if out_buffer.pos:
                produced = ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
                out_buffer.pos = 0
                yield produced

            # A zero result ends the generator even if input remains.
            if zresult == 0:
                return

# Second name bound to the same function.
read_from = read_to_iter
def stream_writer(self, writer, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE):
    """Return a ``ZstdDecompressionWriter`` that writes into ``writer``.

    Raises ``ValueError`` when ``writer`` has no ``write`` attribute.
    """
    if hasattr(writer, 'write'):
        return ZstdDecompressionWriter(self, writer, write_size)

    raise ValueError('must pass an object with a write() method')

# Second name bound to the same function.
write_to = stream_writer
Gregory Szorc
|
def copy_stream(self, ifh, ofh,
                read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
                write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE):
    """Decompress everything read from ``ifh`` and write it to ``ofh``.

    Input is pulled with ``ifh.read(read_size)`` until an empty result;
    output passes through a scratch buffer of ``write_size`` bytes.
    Returns ``(total_read, total_write)`` byte counts.

    Raises ``ValueError`` when ``ifh`` lacks ``read`` or ``ofh`` lacks
    ``write``, and ``ZstdError`` when the library reports an error.
    """
    if not hasattr(ifh, 'read'):
        raise ValueError('first argument must have a read() method')

    if not hasattr(ofh, 'write'):
        raise ValueError('second argument must have a write() method')

    self._ensure_dctx()

    source = ffi.new('ZSTD_inBuffer *')
    sink = ffi.new('ZSTD_outBuffer *')

    # ``scratch`` must stay referenced while sink.dst points at it.
    scratch = ffi.new('char[]', write_size)
    sink.dst = scratch
    sink.size = write_size
    sink.pos = 0

    bytes_in = 0
    bytes_out = 0

    # Read all available input.
    while True:
        chunk = ifh.read(read_size)
        if not chunk:
            break

        chunk_buffer = ffi.from_buffer(chunk)
        bytes_in += len(chunk_buffer)
        source.src = chunk_buffer
        source.size = len(chunk_buffer)
        source.pos = 0

        # Flush all read data to output.
        while source.pos < source.size:
            zresult = lib.ZSTD_decompress_generic(self._dctx, sink, source)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd decompressor error: %s' %
                                _zstd_error(zresult))

            if not sink.pos:
                continue

            ofh.write(ffi.buffer(sink.dst, sink.pos))
            bytes_out += sink.pos
            sink.pos = 0

    return bytes_in, bytes_out
def decompress_content_dict_chain(self, frames):
    """Decompress a chain of zstd frames and return the last frame's data.

    ``frames`` must be a non-empty ``list`` whose items are all
    ``bytes_type`` instances. Each item must be a zstd frame whose header
    records the content size; that size is used to allocate the output
    buffer for the frame. Frames are decompressed in list order with the
    same decompression context, which is prepared once via
    ``_ensure_dctx(load_dict=False)`` before the first frame.

    NOTE(review): the method name suggests each frame is decoded using the
    previous frame's output as a dictionary, but nothing in this method
    hands the previous output to the decompressor -- confirm against the
    C implementation.

    Returns the decompressed content of the final frame as a bytes copy.

    Raises:
        TypeError: ``frames`` is not a list.
        ValueError: ``frames`` is empty, an item is not bytes, or an item
            is not a valid / complete zstd frame header or lacks a
            content size.
        ZstdError: decompression of a frame failed or did not consume a
            full frame.
    """
    if not isinstance(frames, list):
        raise TypeError('argument must be a list')

    if not frames:
        raise ValueError('empty input chain')

    # First chunk should not be using a dictionary. We handle it specially.
    chunk = frames[0]
    if not isinstance(chunk, bytes_type):
        raise ValueError('chunk 0 must be bytes')

    # All chunks should be zstd frames and should have content size set.
    chunk_buffer = ffi.from_buffer(chunk)
    params = ffi.new('ZSTD_frameHeader *')
    zresult = lib.ZSTD_getFrameHeader(params, chunk_buffer, len(chunk_buffer))
    if lib.ZSTD_isError(zresult):
        raise ValueError('chunk 0 is not a valid zstd frame')
    elif zresult:
        raise ValueError('chunk 0 is too small to contain a zstd frame')

    if params.frameContentSize == lib.ZSTD_CONTENTSIZE_UNKNOWN:
        raise ValueError('chunk 0 missing content size in frame')

    self._ensure_dctx(load_dict=False)

    last_buffer = ffi.new('char[]', params.frameContentSize)

    out_buffer = ffi.new('ZSTD_outBuffer *')
    out_buffer.dst = last_buffer
    out_buffer.size = len(last_buffer)
    out_buffer.pos = 0

    in_buffer = ffi.new('ZSTD_inBuffer *')
    in_buffer.src = chunk_buffer
    in_buffer.size = len(chunk_buffer)
    in_buffer.pos = 0

    zresult = lib.ZSTD_decompress_generic(self._dctx, out_buffer, in_buffer)
    if lib.ZSTD_isError(zresult):
        raise ZstdError('could not decompress chunk 0: %s' %
                        _zstd_error(zresult))
    elif zresult:
        raise ZstdError('chunk 0 did not decompress full frame')

    # Remaining frames. A chain of length 1 simply skips this loop and
    # returns the first frame's output below.
    for i, chunk in enumerate(frames[1:], 1):
        if not isinstance(chunk, bytes_type):
            raise ValueError('chunk %d must be bytes' % i)

        chunk_buffer = ffi.from_buffer(chunk)
        zresult = lib.ZSTD_getFrameHeader(params, chunk_buffer,
                                          len(chunk_buffer))
        if lib.ZSTD_isError(zresult):
            raise ValueError('chunk %d is not a valid zstd frame' % i)
        elif zresult:
            raise ValueError('chunk %d is too small to contain a zstd frame' % i)

        if params.frameContentSize == lib.ZSTD_CONTENTSIZE_UNKNOWN:
            raise ValueError('chunk %d missing content size in frame' % i)

        dest_buffer = ffi.new('char[]', params.frameContentSize)

        out_buffer.dst = dest_buffer
        out_buffer.size = len(dest_buffer)
        out_buffer.pos = 0

        in_buffer.src = chunk_buffer
        in_buffer.size = len(chunk_buffer)
        in_buffer.pos = 0

        zresult = lib.ZSTD_decompress_generic(self._dctx, out_buffer, in_buffer)
        if lib.ZSTD_isError(zresult):
            # Both placeholders need a value; supplying only the error
            # text made the formatting itself raise TypeError.
            raise ZstdError('could not decompress chunk %d: %s' %
                            (i, _zstd_error(zresult)))
        elif zresult:
            raise ZstdError('chunk %d did not decompress full frame' % i)

        last_buffer = dest_buffer

    return ffi.buffer(last_buffer, len(last_buffer))[:]
Gregory Szorc
|
def _ensure_dctx(self, load_dict=True):
    """Reset the decompression context and re-apply this object's settings.

    After the reset, in order: the max window size is set when
    ``self._max_window_size`` is truthy, the decoding format is set from
    ``self._format``, and -- when ``self._dict_data`` is truthy and
    ``load_dict`` is true -- the dictionary's ``_ddict`` is referenced on
    the context.

    Raises ``ZstdError`` if any of the setter calls reports an error.
    """
    dctx = self._dctx
    lib.ZSTD_DCtx_reset(dctx)

    window_limit = self._max_window_size
    if window_limit:
        status = lib.ZSTD_DCtx_setMaxWindowSize(dctx, window_limit)
        if lib.ZSTD_isError(status):
            raise ZstdError('unable to set max window size: %s' %
                            _zstd_error(status))

    status = lib.ZSTD_DCtx_setFormat(dctx, self._format)
    if lib.ZSTD_isError(status):
        raise ZstdError('unable to set decoding format: %s' %
                        _zstd_error(status))

    if self._dict_data and load_dict:
        status = lib.ZSTD_DCtx_refDDict(dctx, self._dict_data._ddict)
        if lib.ZSTD_isError(status):
            raise ZstdError('unable to reference prepared dictionary: %s' %
                            _zstd_error(status))