zstd_cffi.py
152 lines
| 4.8 KiB
| text/x-python
|
PythonLexer
Gregory Szorc
|
r30435 | # Copyright (c) 2016-present, Gregory Szorc | ||
# All rights reserved. | ||||
# | ||||
# This software may be modified and distributed under the terms | ||||
# of the BSD license. See the LICENSE file for details. | ||||
"""Python interface to the Zstandard (zstd) compression library.""" | ||||
from __future__ import absolute_import, unicode_literals | ||||
import io | ||||
from _zstd_cffi import ( | ||||
ffi, | ||||
lib, | ||||
) | ||||
_CSTREAM_IN_SIZE = lib.ZSTD_CStreamInSize() | ||||
_CSTREAM_OUT_SIZE = lib.ZSTD_CStreamOutSize() | ||||
class _ZstdCompressionWriter(object): | ||||
def __init__(self, cstream, writer): | ||||
self._cstream = cstream | ||||
self._writer = writer | ||||
def __enter__(self): | ||||
return self | ||||
def __exit__(self, exc_type, exc_value, exc_tb): | ||||
if not exc_type and not exc_value and not exc_tb: | ||||
out_buffer = ffi.new('ZSTD_outBuffer *') | ||||
out_buffer.dst = ffi.new('char[]', _CSTREAM_OUT_SIZE) | ||||
out_buffer.size = _CSTREAM_OUT_SIZE | ||||
out_buffer.pos = 0 | ||||
while True: | ||||
res = lib.ZSTD_endStream(self._cstream, out_buffer) | ||||
if lib.ZSTD_isError(res): | ||||
raise Exception('error ending compression stream: %s' % lib.ZSTD_getErrorName) | ||||
if out_buffer.pos: | ||||
self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) | ||||
out_buffer.pos = 0 | ||||
if res == 0: | ||||
break | ||||
return False | ||||
def write(self, data): | ||||
out_buffer = ffi.new('ZSTD_outBuffer *') | ||||
out_buffer.dst = ffi.new('char[]', _CSTREAM_OUT_SIZE) | ||||
out_buffer.size = _CSTREAM_OUT_SIZE | ||||
out_buffer.pos = 0 | ||||
# TODO can we reuse existing memory? | ||||
in_buffer = ffi.new('ZSTD_inBuffer *') | ||||
in_buffer.src = ffi.new('char[]', data) | ||||
in_buffer.size = len(data) | ||||
in_buffer.pos = 0 | ||||
while in_buffer.pos < in_buffer.size: | ||||
res = lib.ZSTD_compressStream(self._cstream, out_buffer, in_buffer) | ||||
if lib.ZSTD_isError(res): | ||||
raise Exception('zstd compress error: %s' % lib.ZSTD_getErrorName(res)) | ||||
if out_buffer.pos: | ||||
self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) | ||||
out_buffer.pos = 0 | ||||
class ZstdCompressor(object): | ||||
def __init__(self, level=3, dict_data=None, compression_params=None): | ||||
if dict_data: | ||||
raise Exception('dict_data not yet supported') | ||||
if compression_params: | ||||
raise Exception('compression_params not yet supported') | ||||
self._compression_level = level | ||||
def compress(self, data): | ||||
# Just use the stream API for now. | ||||
output = io.BytesIO() | ||||
with self.write_to(output) as compressor: | ||||
compressor.write(data) | ||||
return output.getvalue() | ||||
def copy_stream(self, ifh, ofh): | ||||
cstream = self._get_cstream() | ||||
in_buffer = ffi.new('ZSTD_inBuffer *') | ||||
out_buffer = ffi.new('ZSTD_outBuffer *') | ||||
out_buffer.dst = ffi.new('char[]', _CSTREAM_OUT_SIZE) | ||||
out_buffer.size = _CSTREAM_OUT_SIZE | ||||
out_buffer.pos = 0 | ||||
total_read, total_write = 0, 0 | ||||
while True: | ||||
data = ifh.read(_CSTREAM_IN_SIZE) | ||||
if not data: | ||||
break | ||||
total_read += len(data) | ||||
in_buffer.src = ffi.new('char[]', data) | ||||
in_buffer.size = len(data) | ||||
in_buffer.pos = 0 | ||||
while in_buffer.pos < in_buffer.size: | ||||
res = lib.ZSTD_compressStream(cstream, out_buffer, in_buffer) | ||||
if lib.ZSTD_isError(res): | ||||
raise Exception('zstd compress error: %s' % | ||||
lib.ZSTD_getErrorName(res)) | ||||
if out_buffer.pos: | ||||
ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) | ||||
total_write = out_buffer.pos | ||||
out_buffer.pos = 0 | ||||
# We've finished reading. Flush the compressor. | ||||
while True: | ||||
res = lib.ZSTD_endStream(cstream, out_buffer) | ||||
if lib.ZSTD_isError(res): | ||||
raise Exception('error ending compression stream: %s' % | ||||
lib.ZSTD_getErrorName(res)) | ||||
if out_buffer.pos: | ||||
ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) | ||||
total_write += out_buffer.pos | ||||
out_buffer.pos = 0 | ||||
if res == 0: | ||||
break | ||||
return total_read, total_write | ||||
def write_to(self, writer): | ||||
return _ZstdCompressionWriter(self._get_cstream(), writer) | ||||
def _get_cstream(self): | ||||
cstream = lib.ZSTD_createCStream() | ||||
cstream = ffi.gc(cstream, lib.ZSTD_freeCStream) | ||||
res = lib.ZSTD_initCStream(cstream, self._compression_level) | ||||
if lib.ZSTD_isError(res): | ||||
raise Exception('cannot init CStream: %s' % | ||||
lib.ZSTD_getErrorName(res)) | ||||
return cstream | ||||