##// END OF EJS Templates
bundle1: fix bundle1-denied reporting for push over ssh...
bundle1: fix bundle1-denied reporting for push over ssh Changeset b288fb2724bf introduced a config option to have the server deny push using bundle1. The original protocol was not really designed to allow this kind of error reporting, so a hack was used. It turned out that the hack only works on HTTP and that the ssh wire peer hangs forever when the same hack is used. After further digging, there is no way to report the error in a unified way. Using 'ooberror' freezes ssh, and raising 'Abort' makes HTTP return an HTTP 500 without further details. So, with sadness, we implement a version that dispatches according to the protocol used. We also add a test for pushing over ssh to make sure we won't regress in the future. That test shows that the hint is missing; this is another bug, fixed in the next changeset.

File last commit:

r30895:c32454d6 default
r30909:d554e624 stable
Show More
zstd_cffi.py
152 lines | 4.8 KiB | text/x-python | PythonLexer
# Copyright (c) 2016-present, Gregory Szorc
# All rights reserved.
#
# This software may be modified and distributed under the terms
# of the BSD license. See the LICENSE file for details.
"""Python interface to the Zstandard (zstd) compression library."""
from __future__ import absolute_import, unicode_literals
import io
from _zstd_cffi import (
ffi,
lib,
)
# Buffer sizes queried once from the zstd library at import time.
# _CSTREAM_IN_SIZE is the chunk size used when reading input in
# ZstdCompressor.copy_stream(); _CSTREAM_OUT_SIZE is the size of every
# output buffer allocated for the streaming compression calls below.
_CSTREAM_IN_SIZE = lib.ZSTD_CStreamInSize()
_CSTREAM_OUT_SIZE = lib.ZSTD_CStreamOutSize()
class _ZstdCompressionWriter(object):
    """Context manager that compresses data and forwards it to a writer.

    Data passed to ``write()`` is fed through the zstd compression stream
    and any compressed output is handed to ``writer.write()``. Leaving the
    ``with`` block without an exception ends the compression stream and
    writes out whatever output remains.

    ``cstream`` is an initialized ``ZSTD_CStream *`` cdata object and
    ``writer`` is any object exposing a ``write()`` method.
    """

    def __init__(self, cstream, writer):
        self._cstream = cstream
        self._writer = writer

    @staticmethod
    def _error_name(code):
        """Return zstd's name for error ``code`` as text.

        ``ZSTD_getErrorName()`` returns a C ``char *``; it has to be
        converted with ``ffi.string()`` or the message would only contain
        the repr of the cdata pointer.
        """
        name = ffi.string(lib.ZSTD_getErrorName(code))
        return name.decode('ascii', 'replace')

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        """End the compression stream unless an exception is propagating.

        Always returns False so that exceptions raised inside the ``with``
        block are never suppressed.

        Raises ``Exception`` if zstd reports an error while ending the
        stream.
        """
        if not exc_type and not exc_value and not exc_tb:
            # The cdata returned by ffi.new() owns its memory. Keep it in a
            # local so the allocation outlives the struct field pointing
            # at it; assigning it directly to ``dst`` would leave a
            # dangling pointer.
            dst = ffi.new('char[]', _CSTREAM_OUT_SIZE)
            out_buffer = ffi.new('ZSTD_outBuffer *')
            out_buffer.dst = dst
            out_buffer.size = _CSTREAM_OUT_SIZE
            out_buffer.pos = 0

            while True:
                res = lib.ZSTD_endStream(self._cstream, out_buffer)
                if lib.ZSTD_isError(res):
                    raise Exception('error ending compression stream: %s' %
                                    self._error_name(res))

                if out_buffer.pos:
                    self._writer.write(
                        ffi.buffer(out_buffer.dst, out_buffer.pos))
                    out_buffer.pos = 0

                # A zero result ends the loop: nothing is left to flush.
                if res == 0:
                    break

        return False

    def write(self, data):
        """Compress ``data`` and pass any produced output to the writer.

        Output that the compressor keeps buffered internally is only
        written once the context manager exits.

        Raises ``Exception`` if zstd reports a compression error.
        """
        # As in __exit__, keep the owning cdata objects referenced for as
        # long as the zstd buffer structs point at them.
        dst = ffi.new('char[]', _CSTREAM_OUT_SIZE)
        out_buffer = ffi.new('ZSTD_outBuffer *')
        out_buffer.dst = dst
        out_buffer.size = _CSTREAM_OUT_SIZE
        out_buffer.pos = 0

        # TODO can we reuse existing memory?
        src = ffi.new('char[]', data)
        in_buffer = ffi.new('ZSTD_inBuffer *')
        in_buffer.src = src
        in_buffer.size = len(data)
        in_buffer.pos = 0

        while in_buffer.pos < in_buffer.size:
            res = lib.ZSTD_compressStream(self._cstream, out_buffer,
                                          in_buffer)
            if lib.ZSTD_isError(res):
                raise Exception('zstd compress error: %s' %
                                self._error_name(res))

            if out_buffer.pos:
                self._writer.write(
                    ffi.buffer(out_buffer.dst, out_buffer.pos))
                out_buffer.pos = 0
class ZstdCompressor(object):
    """Compress data with zstd at a fixed compression level.

    ``level`` is the compression level handed to ``ZSTD_initCStream()``.
    ``dict_data`` and ``compression_params`` are accepted for API
    compatibility but are not supported yet: passing a truthy value for
    either raises ``Exception``.
    """

    def __init__(self, level=3, dict_data=None, compression_params=None):
        if dict_data:
            raise Exception('dict_data not yet supported')
        if compression_params:
            raise Exception('compression_params not yet supported')

        self._compression_level = level

    @staticmethod
    def _error_name(code):
        """Return zstd's name for error ``code`` as text.

        ``ZSTD_getErrorName()`` returns a C ``char *``; it has to be
        converted with ``ffi.string()`` or the message would only contain
        the repr of the cdata pointer.
        """
        name = ffi.string(lib.ZSTD_getErrorName(code))
        return name.decode('ascii', 'replace')

    def compress(self, data):
        """Compress ``data`` in one call and return the compressed bytes."""
        # Just use the stream API for now.
        output = io.BytesIO()
        with self.write_to(output) as compressor:
            compressor.write(data)
        return output.getvalue()

    def copy_stream(self, ifh, ofh):
        """Compress everything readable from ``ifh`` into ``ofh``.

        ``ifh`` must provide ``read(size)`` and ``ofh`` must provide
        ``write(data)``.

        Returns a ``(total_read, total_write)`` tuple: the number of
        bytes read from ``ifh`` and the number of compressed bytes
        written to ``ofh``.

        Raises ``Exception`` if zstd reports an error.
        """
        cstream = self._get_cstream()

        in_buffer = ffi.new('ZSTD_inBuffer *')
        out_buffer = ffi.new('ZSTD_outBuffer *')

        # The cdata returned by ffi.new() owns its memory. Keep it in a
        # local so the allocation stays alive while ``out_buffer.dst``
        # points at it.
        dst = ffi.new('char[]', _CSTREAM_OUT_SIZE)
        out_buffer.dst = dst
        out_buffer.size = _CSTREAM_OUT_SIZE
        out_buffer.pos = 0

        total_read, total_write = 0, 0

        while True:
            data = ifh.read(_CSTREAM_IN_SIZE)
            if not data:
                break

            total_read += len(data)

            # ``src`` keeps the input copy alive until this chunk has been
            # fully consumed by the inner loop.
            src = ffi.new('char[]', data)
            in_buffer.src = src
            in_buffer.size = len(data)
            in_buffer.pos = 0

            while in_buffer.pos < in_buffer.size:
                res = lib.ZSTD_compressStream(cstream, out_buffer, in_buffer)
                if lib.ZSTD_isError(res):
                    raise Exception('zstd compress error: %s' %
                                    self._error_name(res))

                if out_buffer.pos:
                    ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
                    # Accumulate; assigning here would discard the bytes
                    # counted for earlier chunks.
                    total_write += out_buffer.pos
                    out_buffer.pos = 0

        # We've finished reading. Flush the compressor.
        while True:
            res = lib.ZSTD_endStream(cstream, out_buffer)
            if lib.ZSTD_isError(res):
                raise Exception('error ending compression stream: %s' %
                                self._error_name(res))

            if out_buffer.pos:
                ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
                total_write += out_buffer.pos
                out_buffer.pos = 0

            # A zero result ends the loop: nothing is left to flush.
            if res == 0:
                break

        return total_read, total_write

    def write_to(self, writer):
        """Return a context manager that compresses writes into ``writer``.

        Each call creates a fresh compression stream.
        """
        return _ZstdCompressionWriter(self._get_cstream(), writer)

    def _get_cstream(self):
        """Create and initialize a new ``ZSTD_CStream``.

        The stream is registered with ``ffi.gc()`` so that
        ``ZSTD_freeCStream()`` runs when the returned object is garbage
        collected.

        Raises ``MemoryError`` if the stream cannot be allocated and
        ``Exception`` if zstd fails to initialize it.
        """
        cstream = lib.ZSTD_createCStream()
        if cstream == ffi.NULL:
            raise MemoryError()

        cstream = ffi.gc(cstream, lib.ZSTD_freeCStream)

        res = lib.ZSTD_initCStream(cstream, self._compression_level)
        if lib.ZSTD_isError(res):
            raise Exception('cannot init CStream: %s' %
                            self._error_name(res))

        return cstream