##// END OF EJS Templates
dispatch: protect against malicious 'hg serve --stdio' invocations (sec)...
dispatch: protect against malicious 'hg serve --stdio' invocations (sec) Some shared-ssh installations assume that 'hg serve --stdio' is a safe command to run for minimally trusted users. Unfortunately, the messy implementation of argument parsing here meant that trying to access a repo named '--debugger' would give the user a pdb prompt, thereby sidestepping any hoped-for sandboxing. Serving repositories over HTTP(S) is unaffected. We're not currently hardening any subcommands other than 'serve'. If your service exposes other commands to users with arbitrary repository names, it is imperative that you defend against repository names of '--debugger' and anything starting with '--config'. The read-only mode of hg-ssh stopped working because it provided its hook configuration to "hg serve --stdio" via --config parameter. This is banned for security reasons now. This patch switches it to directly call ui.setconfig(). If your custom hosting infrastructure relies on passing --config to "hg serve --stdio", you'll need to find a different way to get that configuration into Mercurial, either by using ui.setconfig() as hg-ssh does in this patch, or by placing an hgrc file someplace where Mercurial will read it. mitrandir@fb.com provided some extra fixes for the dispatch code and for hg-ssh in places that I overlooked.

File last commit:

r31796:e0dc4053 default
r32050:77eaf953 4.1.3 stable
Show More
test_compressor.py
536 lines | 17.8 KiB | text/x-python | PythonLexer
import hashlib
import io
import struct
import sys
try:
import unittest2 as unittest
except ImportError:
import unittest
import zstd
from .common import OpCountingBytesIO
if sys.version_info[0] >= 3:
next = lambda it: it.__next__()
else:
next = lambda it: it.next()
class TestCompressor(unittest.TestCase):
def test_level_bounds(self):
with self.assertRaises(ValueError):
zstd.ZstdCompressor(level=0)
with self.assertRaises(ValueError):
zstd.ZstdCompressor(level=23)
class TestCompressor_compress(unittest.TestCase):
def test_compress_empty(self):
cctx = zstd.ZstdCompressor(level=1)
cctx.compress(b'')
cctx = zstd.ZstdCompressor(level=22)
cctx.compress(b'')
def test_compress_empty(self):
cctx = zstd.ZstdCompressor(level=1)
self.assertEqual(cctx.compress(b''),
b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00')
# TODO should be temporary until https://github.com/facebook/zstd/issues/506
# is fixed.
cctx = zstd.ZstdCompressor(write_content_size=True)
with self.assertRaises(ValueError):
cctx.compress(b'')
cctx.compress(b'', allow_empty=True)
def test_compress_large(self):
chunks = []
for i in range(255):
chunks.append(struct.Struct('>B').pack(i) * 16384)
cctx = zstd.ZstdCompressor(level=3)
result = cctx.compress(b''.join(chunks))
self.assertEqual(len(result), 999)
self.assertEqual(result[0:4], b'\x28\xb5\x2f\xfd')
def test_write_checksum(self):
cctx = zstd.ZstdCompressor(level=1)
no_checksum = cctx.compress(b'foobar')
cctx = zstd.ZstdCompressor(level=1, write_checksum=True)
with_checksum = cctx.compress(b'foobar')
self.assertEqual(len(with_checksum), len(no_checksum) + 4)
def test_write_content_size(self):
cctx = zstd.ZstdCompressor(level=1)
no_size = cctx.compress(b'foobar' * 256)
cctx = zstd.ZstdCompressor(level=1, write_content_size=True)
with_size = cctx.compress(b'foobar' * 256)
self.assertEqual(len(with_size), len(no_size) + 1)
def test_no_dict_id(self):
samples = []
for i in range(128):
samples.append(b'foo' * 64)
samples.append(b'bar' * 64)
samples.append(b'foobar' * 64)
d = zstd.train_dictionary(1024, samples)
cctx = zstd.ZstdCompressor(level=1, dict_data=d)
with_dict_id = cctx.compress(b'foobarfoobar')
cctx = zstd.ZstdCompressor(level=1, dict_data=d, write_dict_id=False)
no_dict_id = cctx.compress(b'foobarfoobar')
self.assertEqual(len(with_dict_id), len(no_dict_id) + 4)
def test_compress_dict_multiple(self):
samples = []
for i in range(128):
samples.append(b'foo' * 64)
samples.append(b'bar' * 64)
samples.append(b'foobar' * 64)
d = zstd.train_dictionary(8192, samples)
cctx = zstd.ZstdCompressor(level=1, dict_data=d)
for i in range(32):
cctx.compress(b'foo bar foobar foo bar foobar')
class TestCompressor_compressobj(unittest.TestCase):
def test_compressobj_empty(self):
cctx = zstd.ZstdCompressor(level=1)
cobj = cctx.compressobj()
self.assertEqual(cobj.compress(b''), b'')
self.assertEqual(cobj.flush(),
b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00')
def test_compressobj_large(self):
chunks = []
for i in range(255):
chunks.append(struct.Struct('>B').pack(i) * 16384)
cctx = zstd.ZstdCompressor(level=3)
cobj = cctx.compressobj()
result = cobj.compress(b''.join(chunks)) + cobj.flush()
self.assertEqual(len(result), 999)
self.assertEqual(result[0:4], b'\x28\xb5\x2f\xfd')
def test_write_checksum(self):
cctx = zstd.ZstdCompressor(level=1)
cobj = cctx.compressobj()
no_checksum = cobj.compress(b'foobar') + cobj.flush()
cctx = zstd.ZstdCompressor(level=1, write_checksum=True)
cobj = cctx.compressobj()
with_checksum = cobj.compress(b'foobar') + cobj.flush()
self.assertEqual(len(with_checksum), len(no_checksum) + 4)
def test_write_content_size(self):
cctx = zstd.ZstdCompressor(level=1)
cobj = cctx.compressobj(size=len(b'foobar' * 256))
no_size = cobj.compress(b'foobar' * 256) + cobj.flush()
cctx = zstd.ZstdCompressor(level=1, write_content_size=True)
cobj = cctx.compressobj(size=len(b'foobar' * 256))
with_size = cobj.compress(b'foobar' * 256) + cobj.flush()
self.assertEqual(len(with_size), len(no_size) + 1)
def test_compress_after_finished(self):
cctx = zstd.ZstdCompressor()
cobj = cctx.compressobj()
cobj.compress(b'foo')
cobj.flush()
with self.assertRaisesRegexp(zstd.ZstdError, 'cannot call compress\(\) after compressor'):
cobj.compress(b'foo')
with self.assertRaisesRegexp(zstd.ZstdError, 'compressor object already finished'):
cobj.flush()
def test_flush_block_repeated(self):
cctx = zstd.ZstdCompressor(level=1)
cobj = cctx.compressobj()
self.assertEqual(cobj.compress(b'foo'), b'')
self.assertEqual(cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK),
b'\x28\xb5\x2f\xfd\x00\x48\x18\x00\x00foo')
self.assertEqual(cobj.compress(b'bar'), b'')
# 3 byte header plus content.
self.assertEqual(cobj.flush(), b'\x19\x00\x00bar')
def test_flush_empty_block(self):
cctx = zstd.ZstdCompressor(write_checksum=True)
cobj = cctx.compressobj()
cobj.compress(b'foobar')
cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK)
# No-op if no block is active (this is internal to zstd).
self.assertEqual(cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), b'')
trailing = cobj.flush()
# 3 bytes block header + 4 bytes frame checksum
self.assertEqual(len(trailing), 7)
header = trailing[0:3]
self.assertEqual(header, b'\x01\x00\x00')
class TestCompressor_copy_stream(unittest.TestCase):
def test_no_read(self):
source = object()
dest = io.BytesIO()
cctx = zstd.ZstdCompressor()
with self.assertRaises(ValueError):
cctx.copy_stream(source, dest)
def test_no_write(self):
source = io.BytesIO()
dest = object()
cctx = zstd.ZstdCompressor()
with self.assertRaises(ValueError):
cctx.copy_stream(source, dest)
def test_empty(self):
source = io.BytesIO()
dest = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1)
r, w = cctx.copy_stream(source, dest)
self.assertEqual(int(r), 0)
self.assertEqual(w, 9)
self.assertEqual(dest.getvalue(),
b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00')
def test_large_data(self):
source = io.BytesIO()
for i in range(255):
source.write(struct.Struct('>B').pack(i) * 16384)
source.seek(0)
dest = io.BytesIO()
cctx = zstd.ZstdCompressor()
r, w = cctx.copy_stream(source, dest)
self.assertEqual(r, 255 * 16384)
self.assertEqual(w, 999)
def test_write_checksum(self):
source = io.BytesIO(b'foobar')
no_checksum = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1)
cctx.copy_stream(source, no_checksum)
source.seek(0)
with_checksum = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1, write_checksum=True)
cctx.copy_stream(source, with_checksum)
self.assertEqual(len(with_checksum.getvalue()),
len(no_checksum.getvalue()) + 4)
def test_write_content_size(self):
source = io.BytesIO(b'foobar' * 256)
no_size = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1)
cctx.copy_stream(source, no_size)
source.seek(0)
with_size = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1, write_content_size=True)
cctx.copy_stream(source, with_size)
# Source content size is unknown, so no content size written.
self.assertEqual(len(with_size.getvalue()),
len(no_size.getvalue()))
source.seek(0)
with_size = io.BytesIO()
cctx.copy_stream(source, with_size, size=len(source.getvalue()))
# We specified source size, so content size header is present.
self.assertEqual(len(with_size.getvalue()),
len(no_size.getvalue()) + 1)
def test_read_write_size(self):
source = OpCountingBytesIO(b'foobarfoobar')
dest = OpCountingBytesIO()
cctx = zstd.ZstdCompressor()
r, w = cctx.copy_stream(source, dest, read_size=1, write_size=1)
self.assertEqual(r, len(source.getvalue()))
self.assertEqual(w, 21)
self.assertEqual(source._read_count, len(source.getvalue()) + 1)
self.assertEqual(dest._write_count, len(dest.getvalue()))
def compress(data, level):
buffer = io.BytesIO()
cctx = zstd.ZstdCompressor(level=level)
with cctx.write_to(buffer) as compressor:
compressor.write(data)
return buffer.getvalue()
class TestCompressor_write_to(unittest.TestCase):
def test_empty(self):
self.assertEqual(compress(b'', 1),
b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00')
def test_multiple_compress(self):
buffer = io.BytesIO()
cctx = zstd.ZstdCompressor(level=5)
with cctx.write_to(buffer) as compressor:
compressor.write(b'foo')
compressor.write(b'bar')
compressor.write(b'x' * 8192)
result = buffer.getvalue()
self.assertEqual(result,
b'\x28\xb5\x2f\xfd\x00\x50\x75\x00\x00\x38\x66\x6f'
b'\x6f\x62\x61\x72\x78\x01\x00\xfc\xdf\x03\x23')
def test_dictionary(self):
samples = []
for i in range(128):
samples.append(b'foo' * 64)
samples.append(b'bar' * 64)
samples.append(b'foobar' * 64)
d = zstd.train_dictionary(8192, samples)
buffer = io.BytesIO()
cctx = zstd.ZstdCompressor(level=9, dict_data=d)
with cctx.write_to(buffer) as compressor:
compressor.write(b'foo')
compressor.write(b'bar')
compressor.write(b'foo' * 16384)
compressed = buffer.getvalue()
h = hashlib.sha1(compressed).hexdigest()
self.assertEqual(h, '1c5bcd25181bcd8c1a73ea8773323e0056129f92')
def test_compression_params(self):
params = zstd.CompressionParameters(20, 6, 12, 5, 4, 10, zstd.STRATEGY_FAST)
buffer = io.BytesIO()
cctx = zstd.ZstdCompressor(compression_params=params)
with cctx.write_to(buffer) as compressor:
compressor.write(b'foo')
compressor.write(b'bar')
compressor.write(b'foobar' * 16384)
compressed = buffer.getvalue()
h = hashlib.sha1(compressed).hexdigest()
self.assertEqual(h, '1ae31f270ed7de14235221a604b31ecd517ebd99')
def test_write_checksum(self):
no_checksum = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1)
with cctx.write_to(no_checksum) as compressor:
compressor.write(b'foobar')
with_checksum = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1, write_checksum=True)
with cctx.write_to(with_checksum) as compressor:
compressor.write(b'foobar')
self.assertEqual(len(with_checksum.getvalue()),
len(no_checksum.getvalue()) + 4)
def test_write_content_size(self):
no_size = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1)
with cctx.write_to(no_size) as compressor:
compressor.write(b'foobar' * 256)
with_size = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1, write_content_size=True)
with cctx.write_to(with_size) as compressor:
compressor.write(b'foobar' * 256)
# Source size is not known in streaming mode, so header not
# written.
self.assertEqual(len(with_size.getvalue()),
len(no_size.getvalue()))
# Declaring size will write the header.
with_size = io.BytesIO()
with cctx.write_to(with_size, size=len(b'foobar' * 256)) as compressor:
compressor.write(b'foobar' * 256)
self.assertEqual(len(with_size.getvalue()),
len(no_size.getvalue()) + 1)
def test_no_dict_id(self):
samples = []
for i in range(128):
samples.append(b'foo' * 64)
samples.append(b'bar' * 64)
samples.append(b'foobar' * 64)
d = zstd.train_dictionary(1024, samples)
with_dict_id = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1, dict_data=d)
with cctx.write_to(with_dict_id) as compressor:
compressor.write(b'foobarfoobar')
cctx = zstd.ZstdCompressor(level=1, dict_data=d, write_dict_id=False)
no_dict_id = io.BytesIO()
with cctx.write_to(no_dict_id) as compressor:
compressor.write(b'foobarfoobar')
self.assertEqual(len(with_dict_id.getvalue()),
len(no_dict_id.getvalue()) + 4)
def test_memory_size(self):
cctx = zstd.ZstdCompressor(level=3)
buffer = io.BytesIO()
with cctx.write_to(buffer) as compressor:
size = compressor.memory_size()
self.assertGreater(size, 100000)
def test_write_size(self):
cctx = zstd.ZstdCompressor(level=3)
dest = OpCountingBytesIO()
with cctx.write_to(dest, write_size=1) as compressor:
compressor.write(b'foo')
compressor.write(b'bar')
compressor.write(b'foobar')
self.assertEqual(len(dest.getvalue()), dest._write_count)
def test_flush_repeated(self):
cctx = zstd.ZstdCompressor(level=3)
dest = OpCountingBytesIO()
with cctx.write_to(dest) as compressor:
compressor.write(b'foo')
self.assertEqual(dest._write_count, 0)
compressor.flush()
self.assertEqual(dest._write_count, 1)
compressor.write(b'bar')
self.assertEqual(dest._write_count, 1)
compressor.flush()
self.assertEqual(dest._write_count, 2)
compressor.write(b'baz')
self.assertEqual(dest._write_count, 3)
def test_flush_empty_block(self):
cctx = zstd.ZstdCompressor(level=3, write_checksum=True)
dest = OpCountingBytesIO()
with cctx.write_to(dest) as compressor:
compressor.write(b'foobar' * 8192)
count = dest._write_count
offset = dest.tell()
compressor.flush()
self.assertGreater(dest._write_count, count)
self.assertGreater(dest.tell(), offset)
offset = dest.tell()
# Ending the write here should cause an empty block to be written
# to denote end of frame.
trailing = dest.getvalue()[offset:]
# 3 bytes block header + 4 bytes frame checksum
self.assertEqual(len(trailing), 7)
header = trailing[0:3]
self.assertEqual(header, b'\x01\x00\x00')
class TestCompressor_read_from(unittest.TestCase):
def test_type_validation(self):
cctx = zstd.ZstdCompressor()
# Object with read() works.
cctx.read_from(io.BytesIO())
# Buffer protocol works.
cctx.read_from(b'foobar')
with self.assertRaisesRegexp(ValueError, 'must pass an object with a read'):
cctx.read_from(True)
def test_read_empty(self):
cctx = zstd.ZstdCompressor(level=1)
source = io.BytesIO()
it = cctx.read_from(source)
chunks = list(it)
self.assertEqual(len(chunks), 1)
compressed = b''.join(chunks)
self.assertEqual(compressed, b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00')
# And again with the buffer protocol.
it = cctx.read_from(b'')
chunks = list(it)
self.assertEqual(len(chunks), 1)
compressed2 = b''.join(chunks)
self.assertEqual(compressed2, compressed)
def test_read_large(self):
cctx = zstd.ZstdCompressor(level=1)
source = io.BytesIO()
source.write(b'f' * zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE)
source.write(b'o')
source.seek(0)
# Creating an iterator should not perform any compression until
# first read.
it = cctx.read_from(source, size=len(source.getvalue()))
self.assertEqual(source.tell(), 0)
# We should have exactly 2 output chunks.
chunks = []
chunk = next(it)
self.assertIsNotNone(chunk)
self.assertEqual(source.tell(), zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE)
chunks.append(chunk)
chunk = next(it)
self.assertIsNotNone(chunk)
chunks.append(chunk)
self.assertEqual(source.tell(), len(source.getvalue()))
with self.assertRaises(StopIteration):
next(it)
# And again for good measure.
with self.assertRaises(StopIteration):
next(it)
# We should get the same output as the one-shot compression mechanism.
self.assertEqual(b''.join(chunks), cctx.compress(source.getvalue()))
# Now check the buffer protocol.
it = cctx.read_from(source.getvalue())
chunks = list(it)
self.assertEqual(len(chunks), 2)
self.assertEqual(b''.join(chunks), cctx.compress(source.getvalue()))
def test_read_write_size(self):
source = OpCountingBytesIO(b'foobarfoobar')
cctx = zstd.ZstdCompressor(level=3)
for chunk in cctx.read_from(source, read_size=1, write_size=1):
self.assertEqual(len(chunk), 1)
self.assertEqual(source._read_count, len(source.getvalue()) + 1)