zstd_cffi.py
1042 lines
| 35.8 KiB
| text/x-python
|
PythonLexer
Gregory Szorc
|
r30435 | # Copyright (c) 2016-present, Gregory Szorc | ||
# All rights reserved. | ||||
# | ||||
# This software may be modified and distributed under the terms | ||||
# of the BSD license. See the LICENSE file for details. | ||||
"""Python interface to the Zstandard (zstd) compression library.""" | ||||
from __future__ import absolute_import, unicode_literals | ||||
Gregory Szorc
|
r30895 | import sys | ||
Gregory Szorc
|
r30435 | |||
from _zstd_cffi import ( | ||||
ffi, | ||||
lib, | ||||
) | ||||
Gregory Szorc
|
r30895 | if sys.version_info[0] == 2: | ||
bytes_type = str | ||||
int_type = long | ||||
else: | ||||
bytes_type = bytes | ||||
int_type = int | ||||
Gregory Szorc
|
r30435 | |||
Gregory Szorc
|
r30895 | |||
COMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_CStreamInSize() | ||||
COMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_CStreamOutSize() | ||||
DECOMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_DStreamInSize() | ||||
DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_DStreamOutSize() | ||||
new_nonzero = ffi.new_allocator(should_clear_after_alloc=False) | ||||
MAX_COMPRESSION_LEVEL = lib.ZSTD_maxCLevel() | ||||
MAGIC_NUMBER = lib.ZSTD_MAGICNUMBER | ||||
FRAME_HEADER = b'\x28\xb5\x2f\xfd' | ||||
ZSTD_VERSION = (lib.ZSTD_VERSION_MAJOR, lib.ZSTD_VERSION_MINOR, lib.ZSTD_VERSION_RELEASE) | ||||
WINDOWLOG_MIN = lib.ZSTD_WINDOWLOG_MIN | ||||
WINDOWLOG_MAX = lib.ZSTD_WINDOWLOG_MAX | ||||
CHAINLOG_MIN = lib.ZSTD_CHAINLOG_MIN | ||||
CHAINLOG_MAX = lib.ZSTD_CHAINLOG_MAX | ||||
HASHLOG_MIN = lib.ZSTD_HASHLOG_MIN | ||||
HASHLOG_MAX = lib.ZSTD_HASHLOG_MAX | ||||
HASHLOG3_MAX = lib.ZSTD_HASHLOG3_MAX | ||||
SEARCHLOG_MIN = lib.ZSTD_SEARCHLOG_MIN | ||||
SEARCHLOG_MAX = lib.ZSTD_SEARCHLOG_MAX | ||||
SEARCHLENGTH_MIN = lib.ZSTD_SEARCHLENGTH_MIN | ||||
SEARCHLENGTH_MAX = lib.ZSTD_SEARCHLENGTH_MAX | ||||
TARGETLENGTH_MIN = lib.ZSTD_TARGETLENGTH_MIN | ||||
TARGETLENGTH_MAX = lib.ZSTD_TARGETLENGTH_MAX | ||||
STRATEGY_FAST = lib.ZSTD_fast | ||||
STRATEGY_DFAST = lib.ZSTD_dfast | ||||
STRATEGY_GREEDY = lib.ZSTD_greedy | ||||
STRATEGY_LAZY = lib.ZSTD_lazy | ||||
STRATEGY_LAZY2 = lib.ZSTD_lazy2 | ||||
STRATEGY_BTLAZY2 = lib.ZSTD_btlazy2 | ||||
STRATEGY_BTOPT = lib.ZSTD_btopt | ||||
COMPRESSOBJ_FLUSH_FINISH = 0 | ||||
COMPRESSOBJ_FLUSH_BLOCK = 1 | ||||
class ZstdError(Exception): | ||||
pass | ||||
Gregory Szorc
|
r30435 | |||
Gregory Szorc
|
r30895 | class CompressionParameters(object): | ||
def __init__(self, window_log, chain_log, hash_log, search_log, | ||||
search_length, target_length, strategy): | ||||
if window_log < WINDOWLOG_MIN or window_log > WINDOWLOG_MAX: | ||||
raise ValueError('invalid window log value') | ||||
if chain_log < CHAINLOG_MIN or chain_log > CHAINLOG_MAX: | ||||
raise ValueError('invalid chain log value') | ||||
if hash_log < HASHLOG_MIN or hash_log > HASHLOG_MAX: | ||||
raise ValueError('invalid hash log value') | ||||
if search_log < SEARCHLOG_MIN or search_log > SEARCHLOG_MAX: | ||||
raise ValueError('invalid search log value') | ||||
if search_length < SEARCHLENGTH_MIN or search_length > SEARCHLENGTH_MAX: | ||||
raise ValueError('invalid search length value') | ||||
if target_length < TARGETLENGTH_MIN or target_length > TARGETLENGTH_MAX: | ||||
raise ValueError('invalid target length value') | ||||
if strategy < STRATEGY_FAST or strategy > STRATEGY_BTOPT: | ||||
raise ValueError('invalid strategy value') | ||||
self.window_log = window_log | ||||
self.chain_log = chain_log | ||||
self.hash_log = hash_log | ||||
self.search_log = search_log | ||||
self.search_length = search_length | ||||
self.target_length = target_length | ||||
self.strategy = strategy | ||||
def as_compression_parameters(self): | ||||
p = ffi.new('ZSTD_compressionParameters *')[0] | ||||
p.windowLog = self.window_log | ||||
p.chainLog = self.chain_log | ||||
p.hashLog = self.hash_log | ||||
p.searchLog = self.search_log | ||||
p.searchLength = self.search_length | ||||
p.targetLength = self.target_length | ||||
p.strategy = self.strategy | ||||
return p | ||||
def get_compression_parameters(level, source_size=0, dict_size=0): | ||||
params = lib.ZSTD_getCParams(level, source_size, dict_size) | ||||
return CompressionParameters(window_log=params.windowLog, | ||||
chain_log=params.chainLog, | ||||
hash_log=params.hashLog, | ||||
search_log=params.searchLog, | ||||
search_length=params.searchLength, | ||||
target_length=params.targetLength, | ||||
strategy=params.strategy) | ||||
def estimate_compression_context_size(params): | ||||
if not isinstance(params, CompressionParameters): | ||||
raise ValueError('argument must be a CompressionParameters') | ||||
cparams = params.as_compression_parameters() | ||||
return lib.ZSTD_estimateCCtxSize(cparams) | ||||
def estimate_decompression_context_size(): | ||||
return lib.ZSTD_estimateDCtxSize() | ||||
class ZstdCompressionWriter(object): | ||||
def __init__(self, compressor, writer, source_size, write_size): | ||||
self._compressor = compressor | ||||
Gregory Szorc
|
r30435 | self._writer = writer | ||
Gregory Szorc
|
r30895 | self._source_size = source_size | ||
self._write_size = write_size | ||||
self._entered = False | ||||
Gregory Szorc
|
r30435 | |||
def __enter__(self): | ||||
Gregory Szorc
|
r30895 | if self._entered: | ||
raise ZstdError('cannot __enter__ multiple times') | ||||
self._cstream = self._compressor._get_cstream(self._source_size) | ||||
self._entered = True | ||||
Gregory Szorc
|
r30435 | return self | ||
def __exit__(self, exc_type, exc_value, exc_tb): | ||||
Gregory Szorc
|
r30895 | self._entered = False | ||
Gregory Szorc
|
r30435 | if not exc_type and not exc_value and not exc_tb: | ||
out_buffer = ffi.new('ZSTD_outBuffer *') | ||||
Gregory Szorc
|
r30895 | dst_buffer = ffi.new('char[]', self._write_size) | ||
out_buffer.dst = dst_buffer | ||||
out_buffer.size = self._write_size | ||||
Gregory Szorc
|
r30435 | out_buffer.pos = 0 | ||
while True: | ||||
Gregory Szorc
|
r30895 | zresult = lib.ZSTD_endStream(self._cstream, out_buffer) | ||
if lib.ZSTD_isError(zresult): | ||||
raise ZstdError('error ending compression stream: %s' % | ||||
ffi.string(lib.ZSTD_getErrorName(zresult))) | ||||
Gregory Szorc
|
r30435 | |||
if out_buffer.pos: | ||||
Gregory Szorc
|
r30895 | self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:]) | ||
Gregory Szorc
|
r30435 | out_buffer.pos = 0 | ||
Gregory Szorc
|
r30895 | if zresult == 0: | ||
Gregory Szorc
|
r30435 | break | ||
Gregory Szorc
|
r30895 | self._cstream = None | ||
self._compressor = None | ||||
Gregory Szorc
|
r30435 | return False | ||
Gregory Szorc
|
r30895 | def memory_size(self): | ||
if not self._entered: | ||||
raise ZstdError('cannot determine size of an inactive compressor; ' | ||||
'call when a context manager is active') | ||||
return lib.ZSTD_sizeof_CStream(self._cstream) | ||||
Gregory Szorc
|
r30435 | def write(self, data): | ||
Gregory Szorc
|
r30895 | if not self._entered: | ||
raise ZstdError('write() must be called from an active context ' | ||||
'manager') | ||||
total_write = 0 | ||||
data_buffer = ffi.from_buffer(data) | ||||
in_buffer = ffi.new('ZSTD_inBuffer *') | ||||
in_buffer.src = data_buffer | ||||
in_buffer.size = len(data_buffer) | ||||
in_buffer.pos = 0 | ||||
Gregory Szorc
|
r30435 | out_buffer = ffi.new('ZSTD_outBuffer *') | ||
Gregory Szorc
|
r30895 | dst_buffer = ffi.new('char[]', self._write_size) | ||
out_buffer.dst = dst_buffer | ||||
out_buffer.size = self._write_size | ||||
out_buffer.pos = 0 | ||||
while in_buffer.pos < in_buffer.size: | ||||
zresult = lib.ZSTD_compressStream(self._cstream, out_buffer, in_buffer) | ||||
if lib.ZSTD_isError(zresult): | ||||
raise ZstdError('zstd compress error: %s' % | ||||
ffi.string(lib.ZSTD_getErrorName(zresult))) | ||||
if out_buffer.pos: | ||||
self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:]) | ||||
total_write += out_buffer.pos | ||||
out_buffer.pos = 0 | ||||
return total_write | ||||
def flush(self): | ||||
if not self._entered: | ||||
raise ZstdError('flush must be called from an active context manager') | ||||
total_write = 0 | ||||
out_buffer = ffi.new('ZSTD_outBuffer *') | ||||
dst_buffer = ffi.new('char[]', self._write_size) | ||||
out_buffer.dst = dst_buffer | ||||
out_buffer.size = self._write_size | ||||
Gregory Szorc
|
r30435 | out_buffer.pos = 0 | ||
Gregory Szorc
|
r30895 | while True: | ||
zresult = lib.ZSTD_flushStream(self._cstream, out_buffer) | ||||
if lib.ZSTD_isError(zresult): | ||||
raise ZstdError('zstd compress error: %s' % | ||||
ffi.string(lib.ZSTD_getErrorName(zresult))) | ||||
if not out_buffer.pos: | ||||
break | ||||
self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:]) | ||||
total_write += out_buffer.pos | ||||
out_buffer.pos = 0 | ||||
return total_write | ||||
class ZstdCompressionObj(object): | ||||
def compress(self, data): | ||||
if self._finished: | ||||
raise ZstdError('cannot call compress() after compressor finished') | ||||
data_buffer = ffi.from_buffer(data) | ||||
source = ffi.new('ZSTD_inBuffer *') | ||||
source.src = data_buffer | ||||
source.size = len(data_buffer) | ||||
source.pos = 0 | ||||
chunks = [] | ||||
while source.pos < len(data): | ||||
zresult = lib.ZSTD_compressStream(self._cstream, self._out, source) | ||||
if lib.ZSTD_isError(zresult): | ||||
raise ZstdError('zstd compress error: %s' % | ||||
ffi.string(lib.ZSTD_getErrorName(zresult))) | ||||
if self._out.pos: | ||||
chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:]) | ||||
self._out.pos = 0 | ||||
return b''.join(chunks) | ||||
Gregory Szorc
|
r30435 | |||
Gregory Szorc
|
r30895 | def flush(self, flush_mode=COMPRESSOBJ_FLUSH_FINISH): | ||
if flush_mode not in (COMPRESSOBJ_FLUSH_FINISH, COMPRESSOBJ_FLUSH_BLOCK): | ||||
raise ValueError('flush mode not recognized') | ||||
if self._finished: | ||||
raise ZstdError('compressor object already finished') | ||||
assert self._out.pos == 0 | ||||
if flush_mode == COMPRESSOBJ_FLUSH_BLOCK: | ||||
zresult = lib.ZSTD_flushStream(self._cstream, self._out) | ||||
if lib.ZSTD_isError(zresult): | ||||
raise ZstdError('zstd compress error: %s' % | ||||
ffi.string(lib.ZSTD_getErrorName(zresult))) | ||||
# Output buffer is guaranteed to hold full block. | ||||
assert zresult == 0 | ||||
if self._out.pos: | ||||
result = ffi.buffer(self._out.dst, self._out.pos)[:] | ||||
self._out.pos = 0 | ||||
return result | ||||
else: | ||||
return b'' | ||||
assert flush_mode == COMPRESSOBJ_FLUSH_FINISH | ||||
self._finished = True | ||||
chunks = [] | ||||
while True: | ||||
zresult = lib.ZSTD_endStream(self._cstream, self._out) | ||||
if lib.ZSTD_isError(zresult): | ||||
raise ZstdError('error ending compression stream: %s' % | ||||
ffi.string(lib.ZSTD_getErroName(zresult))) | ||||
if self._out.pos: | ||||
chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:]) | ||||
self._out.pos = 0 | ||||
if not zresult: | ||||
break | ||||
# GC compression stream immediately. | ||||
self._cstream = None | ||||
return b''.join(chunks) | ||||
Gregory Szorc
|
r30435 | |||
class ZstdCompressor(object): | ||||
Gregory Szorc
|
r30895 | def __init__(self, level=3, dict_data=None, compression_params=None, | ||
write_checksum=False, write_content_size=False, | ||||
write_dict_id=True): | ||||
if level < 1: | ||||
raise ValueError('level must be greater than 0') | ||||
elif level > lib.ZSTD_maxCLevel(): | ||||
raise ValueError('level must be less than %d' % lib.ZSTD_maxCLevel()) | ||||
Gregory Szorc
|
r30435 | |||
self._compression_level = level | ||||
Gregory Szorc
|
r30895 | self._dict_data = dict_data | ||
self._cparams = compression_params | ||||
self._fparams = ffi.new('ZSTD_frameParameters *')[0] | ||||
self._fparams.checksumFlag = write_checksum | ||||
self._fparams.contentSizeFlag = write_content_size | ||||
self._fparams.noDictIDFlag = not write_dict_id | ||||
Gregory Szorc
|
r30435 | |||
Gregory Szorc
|
r30895 | cctx = lib.ZSTD_createCCtx() | ||
if cctx == ffi.NULL: | ||||
raise MemoryError() | ||||
self._cctx = ffi.gc(cctx, lib.ZSTD_freeCCtx) | ||||
def compress(self, data, allow_empty=False): | ||||
if len(data) == 0 and self._fparams.contentSizeFlag and not allow_empty: | ||||
raise ValueError('cannot write empty inputs when writing content sizes') | ||||
# TODO use a CDict for performance. | ||||
dict_data = ffi.NULL | ||||
dict_size = 0 | ||||
if self._dict_data: | ||||
dict_data = self._dict_data.as_bytes() | ||||
dict_size = len(self._dict_data) | ||||
params = ffi.new('ZSTD_parameters *')[0] | ||||
if self._cparams: | ||||
params.cParams = self._cparams.as_compression_parameters() | ||||
else: | ||||
params.cParams = lib.ZSTD_getCParams(self._compression_level, len(data), | ||||
dict_size) | ||||
params.fParams = self._fparams | ||||
dest_size = lib.ZSTD_compressBound(len(data)) | ||||
out = new_nonzero('char[]', dest_size) | ||||
Gregory Szorc
|
r30435 | |||
Gregory Szorc
|
r30895 | zresult = lib.ZSTD_compress_advanced(self._cctx, | ||
ffi.addressof(out), dest_size, | ||||
data, len(data), | ||||
dict_data, dict_size, | ||||
params) | ||||
if lib.ZSTD_isError(zresult): | ||||
raise ZstdError('cannot compress: %s' % | ||||
ffi.string(lib.ZSTD_getErrorName(zresult))) | ||||
return ffi.buffer(out, zresult)[:] | ||||
def compressobj(self, size=0): | ||||
cstream = self._get_cstream(size) | ||||
cobj = ZstdCompressionObj() | ||||
cobj._cstream = cstream | ||||
cobj._out = ffi.new('ZSTD_outBuffer *') | ||||
cobj._dst_buffer = ffi.new('char[]', COMPRESSION_RECOMMENDED_OUTPUT_SIZE) | ||||
cobj._out.dst = cobj._dst_buffer | ||||
cobj._out.size = COMPRESSION_RECOMMENDED_OUTPUT_SIZE | ||||
cobj._out.pos = 0 | ||||
cobj._compressor = self | ||||
cobj._finished = False | ||||
return cobj | ||||
def copy_stream(self, ifh, ofh, size=0, | ||||
read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE, | ||||
write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE): | ||||
if not hasattr(ifh, 'read'): | ||||
raise ValueError('first argument must have a read() method') | ||||
if not hasattr(ofh, 'write'): | ||||
raise ValueError('second argument must have a write() method') | ||||
cstream = self._get_cstream(size) | ||||
Gregory Szorc
|
r30435 | |||
in_buffer = ffi.new('ZSTD_inBuffer *') | ||||
out_buffer = ffi.new('ZSTD_outBuffer *') | ||||
Gregory Szorc
|
r30895 | dst_buffer = ffi.new('char[]', write_size) | ||
out_buffer.dst = dst_buffer | ||||
out_buffer.size = write_size | ||||
Gregory Szorc
|
r30435 | out_buffer.pos = 0 | ||
total_read, total_write = 0, 0 | ||||
while True: | ||||
Gregory Szorc
|
r30895 | data = ifh.read(read_size) | ||
Gregory Szorc
|
r30435 | if not data: | ||
break | ||||
Gregory Szorc
|
r30895 | data_buffer = ffi.from_buffer(data) | ||
total_read += len(data_buffer) | ||||
in_buffer.src = data_buffer | ||||
in_buffer.size = len(data_buffer) | ||||
Gregory Szorc
|
r30435 | in_buffer.pos = 0 | ||
while in_buffer.pos < in_buffer.size: | ||||
Gregory Szorc
|
r30895 | zresult = lib.ZSTD_compressStream(cstream, out_buffer, in_buffer) | ||
if lib.ZSTD_isError(zresult): | ||||
raise ZstdError('zstd compress error: %s' % | ||||
ffi.string(lib.ZSTD_getErrorName(zresult))) | ||||
Gregory Szorc
|
r30435 | |||
if out_buffer.pos: | ||||
ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) | ||||
Gregory Szorc
|
r30895 | total_write += out_buffer.pos | ||
Gregory Szorc
|
r30435 | out_buffer.pos = 0 | ||
# We've finished reading. Flush the compressor. | ||||
while True: | ||||
Gregory Szorc
|
r30895 | zresult = lib.ZSTD_endStream(cstream, out_buffer) | ||
if lib.ZSTD_isError(zresult): | ||||
raise ZstdError('error ending compression stream: %s' % | ||||
ffi.string(lib.ZSTD_getErrorName(zresult))) | ||||
Gregory Szorc
|
r30435 | |||
if out_buffer.pos: | ||||
ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) | ||||
total_write += out_buffer.pos | ||||
out_buffer.pos = 0 | ||||
Gregory Szorc
|
r30895 | if zresult == 0: | ||
Gregory Szorc
|
r30435 | break | ||
return total_read, total_write | ||||
Gregory Szorc
|
r30895 | def write_to(self, writer, size=0, | ||
write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE): | ||||
if not hasattr(writer, 'write'): | ||||
raise ValueError('must pass an object with a write() method') | ||||
return ZstdCompressionWriter(self, writer, size, write_size) | ||||
def read_from(self, reader, size=0, | ||||
read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE, | ||||
write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE): | ||||
if hasattr(reader, 'read'): | ||||
have_read = True | ||||
elif hasattr(reader, '__getitem__'): | ||||
have_read = False | ||||
buffer_offset = 0 | ||||
size = len(reader) | ||||
else: | ||||
raise ValueError('must pass an object with a read() method or ' | ||||
'conforms to buffer protocol') | ||||
cstream = self._get_cstream(size) | ||||
in_buffer = ffi.new('ZSTD_inBuffer *') | ||||
out_buffer = ffi.new('ZSTD_outBuffer *') | ||||
in_buffer.src = ffi.NULL | ||||
in_buffer.size = 0 | ||||
in_buffer.pos = 0 | ||||
dst_buffer = ffi.new('char[]', write_size) | ||||
out_buffer.dst = dst_buffer | ||||
out_buffer.size = write_size | ||||
out_buffer.pos = 0 | ||||
while True: | ||||
# We should never have output data sitting around after a previous | ||||
# iteration. | ||||
assert out_buffer.pos == 0 | ||||
# Collect input data. | ||||
if have_read: | ||||
read_result = reader.read(read_size) | ||||
else: | ||||
remaining = len(reader) - buffer_offset | ||||
slice_size = min(remaining, read_size) | ||||
read_result = reader[buffer_offset:buffer_offset + slice_size] | ||||
buffer_offset += slice_size | ||||
Gregory Szorc
|
r30435 | |||
Gregory Szorc
|
r30895 | # No new input data. Break out of the read loop. | ||
if not read_result: | ||||
break | ||||
# Feed all read data into the compressor and emit output until | ||||
# exhausted. | ||||
read_buffer = ffi.from_buffer(read_result) | ||||
in_buffer.src = read_buffer | ||||
in_buffer.size = len(read_buffer) | ||||
in_buffer.pos = 0 | ||||
while in_buffer.pos < in_buffer.size: | ||||
zresult = lib.ZSTD_compressStream(cstream, out_buffer, in_buffer) | ||||
if lib.ZSTD_isError(zresult): | ||||
raise ZstdError('zstd compress error: %s' % | ||||
ffi.string(lib.ZSTD_getErrorName(zresult))) | ||||
if out_buffer.pos: | ||||
data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:] | ||||
out_buffer.pos = 0 | ||||
yield data | ||||
assert out_buffer.pos == 0 | ||||
# And repeat the loop to collect more data. | ||||
continue | ||||
# If we get here, input is exhausted. End the stream and emit what | ||||
# remains. | ||||
while True: | ||||
assert out_buffer.pos == 0 | ||||
zresult = lib.ZSTD_endStream(cstream, out_buffer) | ||||
if lib.ZSTD_isError(zresult): | ||||
raise ZstdError('error ending compression stream: %s' % | ||||
ffi.string(lib.ZSTD_getErrorName(zresult))) | ||||
if out_buffer.pos: | ||||
data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:] | ||||
out_buffer.pos = 0 | ||||
yield data | ||||
if zresult == 0: | ||||
break | ||||
def _get_cstream(self, size): | ||||
Gregory Szorc
|
r30435 | cstream = lib.ZSTD_createCStream() | ||
Gregory Szorc
|
r30895 | if cstream == ffi.NULL: | ||
raise MemoryError() | ||||
Gregory Szorc
|
r30435 | cstream = ffi.gc(cstream, lib.ZSTD_freeCStream) | ||
Gregory Szorc
|
r30895 | dict_data = ffi.NULL | ||
dict_size = 0 | ||||
if self._dict_data: | ||||
dict_data = self._dict_data.as_bytes() | ||||
dict_size = len(self._dict_data) | ||||
zparams = ffi.new('ZSTD_parameters *')[0] | ||||
if self._cparams: | ||||
zparams.cParams = self._cparams.as_compression_parameters() | ||||
else: | ||||
zparams.cParams = lib.ZSTD_getCParams(self._compression_level, | ||||
size, dict_size) | ||||
zparams.fParams = self._fparams | ||||
zresult = lib.ZSTD_initCStream_advanced(cstream, dict_data, dict_size, | ||||
zparams, size) | ||||
if lib.ZSTD_isError(zresult): | ||||
Gregory Szorc
|
r30435 | raise Exception('cannot init CStream: %s' % | ||
Gregory Szorc
|
r30895 | ffi.string(lib.ZSTD_getErrorName(zresult))) | ||
Gregory Szorc
|
r30435 | |||
return cstream | ||||
Gregory Szorc
|
r30895 | |||
class FrameParameters(object): | ||||
def __init__(self, fparams): | ||||
self.content_size = fparams.frameContentSize | ||||
self.window_size = fparams.windowSize | ||||
self.dict_id = fparams.dictID | ||||
self.has_checksum = bool(fparams.checksumFlag) | ||||
def get_frame_parameters(data): | ||||
if not isinstance(data, bytes_type): | ||||
raise TypeError('argument must be bytes') | ||||
params = ffi.new('ZSTD_frameParams *') | ||||
zresult = lib.ZSTD_getFrameParams(params, data, len(data)) | ||||
if lib.ZSTD_isError(zresult): | ||||
raise ZstdError('cannot get frame parameters: %s' % | ||||
ffi.string(lib.ZSTD_getErrorName(zresult))) | ||||
if zresult: | ||||
raise ZstdError('not enough data for frame parameters; need %d bytes' % | ||||
zresult) | ||||
return FrameParameters(params[0]) | ||||
class ZstdCompressionDict(object): | ||||
def __init__(self, data): | ||||
assert isinstance(data, bytes_type) | ||||
self._data = data | ||||
def __len__(self): | ||||
return len(self._data) | ||||
def dict_id(self): | ||||
return int_type(lib.ZDICT_getDictID(self._data, len(self._data))) | ||||
def as_bytes(self): | ||||
return self._data | ||||
def train_dictionary(dict_size, samples, parameters=None): | ||||
if not isinstance(samples, list): | ||||
raise TypeError('samples must be a list') | ||||
total_size = sum(map(len, samples)) | ||||
samples_buffer = new_nonzero('char[]', total_size) | ||||
sample_sizes = new_nonzero('size_t[]', len(samples)) | ||||
offset = 0 | ||||
for i, sample in enumerate(samples): | ||||
if not isinstance(sample, bytes_type): | ||||
raise ValueError('samples must be bytes') | ||||
l = len(sample) | ||||
ffi.memmove(samples_buffer + offset, sample, l) | ||||
offset += l | ||||
sample_sizes[i] = l | ||||
dict_data = new_nonzero('char[]', dict_size) | ||||
zresult = lib.ZDICT_trainFromBuffer(ffi.addressof(dict_data), dict_size, | ||||
ffi.addressof(samples_buffer), | ||||
ffi.addressof(sample_sizes, 0), | ||||
len(samples)) | ||||
if lib.ZDICT_isError(zresult): | ||||
raise ZstdError('Cannot train dict: %s' % | ||||
ffi.string(lib.ZDICT_getErrorName(zresult))) | ||||
return ZstdCompressionDict(ffi.buffer(dict_data, zresult)[:]) | ||||
class ZstdDecompressionObj(object): | ||||
def __init__(self, decompressor): | ||||
self._decompressor = decompressor | ||||
self._dstream = self._decompressor._get_dstream() | ||||
self._finished = False | ||||
def decompress(self, data): | ||||
if self._finished: | ||||
raise ZstdError('cannot use a decompressobj multiple times') | ||||
in_buffer = ffi.new('ZSTD_inBuffer *') | ||||
out_buffer = ffi.new('ZSTD_outBuffer *') | ||||
data_buffer = ffi.from_buffer(data) | ||||
in_buffer.src = data_buffer | ||||
in_buffer.size = len(data_buffer) | ||||
in_buffer.pos = 0 | ||||
dst_buffer = ffi.new('char[]', DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE) | ||||
out_buffer.dst = dst_buffer | ||||
out_buffer.size = len(dst_buffer) | ||||
out_buffer.pos = 0 | ||||
chunks = [] | ||||
while in_buffer.pos < in_buffer.size: | ||||
zresult = lib.ZSTD_decompressStream(self._dstream, out_buffer, in_buffer) | ||||
if lib.ZSTD_isError(zresult): | ||||
raise ZstdError('zstd decompressor error: %s' % | ||||
ffi.string(lib.ZSTD_getErrorName(zresult))) | ||||
if zresult == 0: | ||||
self._finished = True | ||||
self._dstream = None | ||||
self._decompressor = None | ||||
if out_buffer.pos: | ||||
chunks.append(ffi.buffer(out_buffer.dst, out_buffer.pos)[:]) | ||||
out_buffer.pos = 0 | ||||
return b''.join(chunks) | ||||
class ZstdDecompressionWriter(object): | ||||
def __init__(self, decompressor, writer, write_size): | ||||
self._decompressor = decompressor | ||||
self._writer = writer | ||||
self._write_size = write_size | ||||
self._dstream = None | ||||
self._entered = False | ||||
def __enter__(self): | ||||
if self._entered: | ||||
raise ZstdError('cannot __enter__ multiple times') | ||||
self._dstream = self._decompressor._get_dstream() | ||||
self._entered = True | ||||
return self | ||||
def __exit__(self, exc_type, exc_value, exc_tb): | ||||
self._entered = False | ||||
self._dstream = None | ||||
def memory_size(self): | ||||
if not self._dstream: | ||||
raise ZstdError('cannot determine size of inactive decompressor ' | ||||
'call when context manager is active') | ||||
return lib.ZSTD_sizeof_DStream(self._dstream) | ||||
def write(self, data): | ||||
if not self._entered: | ||||
raise ZstdError('write must be called from an active context manager') | ||||
total_write = 0 | ||||
in_buffer = ffi.new('ZSTD_inBuffer *') | ||||
out_buffer = ffi.new('ZSTD_outBuffer *') | ||||
data_buffer = ffi.from_buffer(data) | ||||
in_buffer.src = data_buffer | ||||
in_buffer.size = len(data_buffer) | ||||
in_buffer.pos = 0 | ||||
dst_buffer = ffi.new('char[]', self._write_size) | ||||
out_buffer.dst = dst_buffer | ||||
out_buffer.size = len(dst_buffer) | ||||
out_buffer.pos = 0 | ||||
while in_buffer.pos < in_buffer.size: | ||||
zresult = lib.ZSTD_decompressStream(self._dstream, out_buffer, in_buffer) | ||||
if lib.ZSTD_isError(zresult): | ||||
raise ZstdError('zstd decompress error: %s' % | ||||
ffi.string(lib.ZSTD_getErrorName(zresult))) | ||||
if out_buffer.pos: | ||||
self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:]) | ||||
total_write += out_buffer.pos | ||||
out_buffer.pos = 0 | ||||
return total_write | ||||
class ZstdDecompressor(object): | ||||
def __init__(self, dict_data=None): | ||||
self._dict_data = dict_data | ||||
dctx = lib.ZSTD_createDCtx() | ||||
if dctx == ffi.NULL: | ||||
raise MemoryError() | ||||
self._refdctx = ffi.gc(dctx, lib.ZSTD_freeDCtx) | ||||
@property | ||||
def _ddict(self): | ||||
if self._dict_data: | ||||
dict_data = self._dict_data.as_bytes() | ||||
dict_size = len(self._dict_data) | ||||
ddict = lib.ZSTD_createDDict(dict_data, dict_size) | ||||
if ddict == ffi.NULL: | ||||
raise ZstdError('could not create decompression dict') | ||||
else: | ||||
ddict = None | ||||
self.__dict__['_ddict'] = ddict | ||||
return ddict | ||||
def decompress(self, data, max_output_size=0): | ||||
data_buffer = ffi.from_buffer(data) | ||||
orig_dctx = new_nonzero('char[]', lib.ZSTD_sizeof_DCtx(self._refdctx)) | ||||
dctx = ffi.cast('ZSTD_DCtx *', orig_dctx) | ||||
lib.ZSTD_copyDCtx(dctx, self._refdctx) | ||||
ddict = self._ddict | ||||
output_size = lib.ZSTD_getDecompressedSize(data_buffer, len(data_buffer)) | ||||
if output_size: | ||||
result_buffer = ffi.new('char[]', output_size) | ||||
result_size = output_size | ||||
else: | ||||
if not max_output_size: | ||||
raise ZstdError('input data invalid or missing content size ' | ||||
'in frame header') | ||||
result_buffer = ffi.new('char[]', max_output_size) | ||||
result_size = max_output_size | ||||
if ddict: | ||||
zresult = lib.ZSTD_decompress_usingDDict(dctx, | ||||
result_buffer, result_size, | ||||
data_buffer, len(data_buffer), | ||||
ddict) | ||||
else: | ||||
zresult = lib.ZSTD_decompressDCtx(dctx, | ||||
result_buffer, result_size, | ||||
data_buffer, len(data_buffer)) | ||||
if lib.ZSTD_isError(zresult): | ||||
raise ZstdError('decompression error: %s' % | ||||
ffi.string(lib.ZSTD_getErrorName(zresult))) | ||||
elif output_size and zresult != output_size: | ||||
raise ZstdError('decompression error: decompressed %d bytes; expected %d' % | ||||
(zresult, output_size)) | ||||
return ffi.buffer(result_buffer, zresult)[:] | ||||
def decompressobj(self): | ||||
return ZstdDecompressionObj(self) | ||||
def read_from(self, reader, read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE, | ||||
write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE, | ||||
skip_bytes=0): | ||||
if skip_bytes >= read_size: | ||||
raise ValueError('skip_bytes must be smaller than read_size') | ||||
if hasattr(reader, 'read'): | ||||
have_read = True | ||||
elif hasattr(reader, '__getitem__'): | ||||
have_read = False | ||||
buffer_offset = 0 | ||||
size = len(reader) | ||||
else: | ||||
raise ValueError('must pass an object with a read() method or ' | ||||
'conforms to buffer protocol') | ||||
if skip_bytes: | ||||
if have_read: | ||||
reader.read(skip_bytes) | ||||
else: | ||||
if skip_bytes > size: | ||||
raise ValueError('skip_bytes larger than first input chunk') | ||||
buffer_offset = skip_bytes | ||||
dstream = self._get_dstream() | ||||
in_buffer = ffi.new('ZSTD_inBuffer *') | ||||
out_buffer = ffi.new('ZSTD_outBuffer *') | ||||
dst_buffer = ffi.new('char[]', write_size) | ||||
out_buffer.dst = dst_buffer | ||||
out_buffer.size = len(dst_buffer) | ||||
out_buffer.pos = 0 | ||||
while True: | ||||
assert out_buffer.pos == 0 | ||||
if have_read: | ||||
read_result = reader.read(read_size) | ||||
else: | ||||
remaining = size - buffer_offset | ||||
slice_size = min(remaining, read_size) | ||||
read_result = reader[buffer_offset:buffer_offset + slice_size] | ||||
buffer_offset += slice_size | ||||
# No new input. Break out of read loop. | ||||
if not read_result: | ||||
break | ||||
# Feed all read data into decompressor and emit output until | ||||
# exhausted. | ||||
read_buffer = ffi.from_buffer(read_result) | ||||
in_buffer.src = read_buffer | ||||
in_buffer.size = len(read_buffer) | ||||
in_buffer.pos = 0 | ||||
while in_buffer.pos < in_buffer.size: | ||||
assert out_buffer.pos == 0 | ||||
zresult = lib.ZSTD_decompressStream(dstream, out_buffer, in_buffer) | ||||
if lib.ZSTD_isError(zresult): | ||||
raise ZstdError('zstd decompress error: %s' % | ||||
ffi.string(lib.ZSTD_getErrorName(zresult))) | ||||
if out_buffer.pos: | ||||
data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:] | ||||
out_buffer.pos = 0 | ||||
yield data | ||||
if zresult == 0: | ||||
return | ||||
# Repeat loop to collect more input data. | ||||
continue | ||||
# If we get here, input is exhausted. | ||||
def write_to(self, writer, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE): | ||||
if not hasattr(writer, 'write'): | ||||
raise ValueError('must pass an object with a write() method') | ||||
return ZstdDecompressionWriter(self, writer, write_size) | ||||
def copy_stream(self, ifh, ofh, | ||||
read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE, | ||||
write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE): | ||||
if not hasattr(ifh, 'read'): | ||||
raise ValueError('first argument must have a read() method') | ||||
if not hasattr(ofh, 'write'): | ||||
raise ValueError('second argument must have a write() method') | ||||
dstream = self._get_dstream() | ||||
in_buffer = ffi.new('ZSTD_inBuffer *') | ||||
out_buffer = ffi.new('ZSTD_outBuffer *') | ||||
dst_buffer = ffi.new('char[]', write_size) | ||||
out_buffer.dst = dst_buffer | ||||
out_buffer.size = write_size | ||||
out_buffer.pos = 0 | ||||
total_read, total_write = 0, 0 | ||||
# Read all available input. | ||||
while True: | ||||
data = ifh.read(read_size) | ||||
if not data: | ||||
break | ||||
data_buffer = ffi.from_buffer(data) | ||||
total_read += len(data_buffer) | ||||
in_buffer.src = data_buffer | ||||
in_buffer.size = len(data_buffer) | ||||
in_buffer.pos = 0 | ||||
# Flush all read data to output. | ||||
while in_buffer.pos < in_buffer.size: | ||||
zresult = lib.ZSTD_decompressStream(dstream, out_buffer, in_buffer) | ||||
if lib.ZSTD_isError(zresult): | ||||
raise ZstdError('zstd decompressor error: %s' % | ||||
ffi.string(lib.ZSTD_getErrorName(zresult))) | ||||
if out_buffer.pos: | ||||
ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) | ||||
total_write += out_buffer.pos | ||||
out_buffer.pos = 0 | ||||
# Continue loop to keep reading. | ||||
return total_read, total_write | ||||
def decompress_content_dict_chain(self, frames): | ||||
if not isinstance(frames, list): | ||||
raise TypeError('argument must be a list') | ||||
if not frames: | ||||
raise ValueError('empty input chain') | ||||
# First chunk should not be using a dictionary. We handle it specially. | ||||
chunk = frames[0] | ||||
if not isinstance(chunk, bytes_type): | ||||
raise ValueError('chunk 0 must be bytes') | ||||
# All chunks should be zstd frames and should have content size set. | ||||
chunk_buffer = ffi.from_buffer(chunk) | ||||
params = ffi.new('ZSTD_frameParams *') | ||||
zresult = lib.ZSTD_getFrameParams(params, chunk_buffer, len(chunk_buffer)) | ||||
if lib.ZSTD_isError(zresult): | ||||
raise ValueError('chunk 0 is not a valid zstd frame') | ||||
elif zresult: | ||||
raise ValueError('chunk 0 is too small to contain a zstd frame') | ||||
if not params.frameContentSize: | ||||
raise ValueError('chunk 0 missing content size in frame') | ||||
dctx = lib.ZSTD_createDCtx() | ||||
if dctx == ffi.NULL: | ||||
raise MemoryError() | ||||
dctx = ffi.gc(dctx, lib.ZSTD_freeDCtx) | ||||
last_buffer = ffi.new('char[]', params.frameContentSize) | ||||
zresult = lib.ZSTD_decompressDCtx(dctx, last_buffer, len(last_buffer), | ||||
chunk_buffer, len(chunk_buffer)) | ||||
if lib.ZSTD_isError(zresult): | ||||
raise ZstdError('could not decompress chunk 0: %s' % | ||||
ffi.string(lib.ZSTD_getErrorName(zresult))) | ||||
# Special case of chain length of 1 | ||||
if len(frames) == 1: | ||||
return ffi.buffer(last_buffer, len(last_buffer))[:] | ||||
i = 1 | ||||
while i < len(frames): | ||||
chunk = frames[i] | ||||
if not isinstance(chunk, bytes_type): | ||||
raise ValueError('chunk %d must be bytes' % i) | ||||
chunk_buffer = ffi.from_buffer(chunk) | ||||
zresult = lib.ZSTD_getFrameParams(params, chunk_buffer, len(chunk_buffer)) | ||||
if lib.ZSTD_isError(zresult): | ||||
raise ValueError('chunk %d is not a valid zstd frame' % i) | ||||
elif zresult: | ||||
raise ValueError('chunk %d is too small to contain a zstd frame' % i) | ||||
if not params.frameContentSize: | ||||
raise ValueError('chunk %d missing content size in frame' % i) | ||||
dest_buffer = ffi.new('char[]', params.frameContentSize) | ||||
zresult = lib.ZSTD_decompress_usingDict(dctx, dest_buffer, len(dest_buffer), | ||||
chunk_buffer, len(chunk_buffer), | ||||
last_buffer, len(last_buffer)) | ||||
if lib.ZSTD_isError(zresult): | ||||
raise ZstdError('could not decompress chunk %d' % i) | ||||
last_buffer = dest_buffer | ||||
i += 1 | ||||
return ffi.buffer(last_buffer, len(last_buffer))[:] | ||||
def _get_dstream(self): | ||||
dstream = lib.ZSTD_createDStream() | ||||
if dstream == ffi.NULL: | ||||
raise MemoryError() | ||||
dstream = ffi.gc(dstream, lib.ZSTD_freeDStream) | ||||
if self._dict_data: | ||||
zresult = lib.ZSTD_initDStream_usingDict(dstream, | ||||
self._dict_data.as_bytes(), | ||||
len(self._dict_data)) | ||||
else: | ||||
zresult = lib.ZSTD_initDStream(dstream) | ||||
if lib.ZSTD_isError(zresult): | ||||
raise ZstdError('could not initialize DStream: %s' % | ||||
ffi.string(lib.ZSTD_getErrorName(zresult))) | ||||
return dstream | ||||