##// END OF EJS Templates
share: make it possible to control the working copy format variant...
share: make it possible to control the working copy format variant. A share will use the same format as its source for the store, but there is no reason not to let it control the working copy variant at creation time. So we make it so. Differential Revision: https://phab.mercurial-scm.org/D11892

File last commit:

r46554:89a2afe3 default
r49297:bf2738e0 default
Show More
test-wireproto-framing.py
307 lines | 8.4 KiB | text/x-python | PythonLexer
/ tests / test-wireproto-framing.py
from __future__ import absolute_import, print_function
import unittest
from mercurial import (
util,
wireprotoframing as framing,
)
# Shorthand used throughout the tests below. Going by its name, the helper
# builds a frame from a human-readable description string.
ffs = framing.makeframefromhumanstring
class FrameHumanStringTests(unittest.TestCase):
    """Checks the exact output of ``ffs`` for a few human-readable strings."""

    def testbasic(self):
        # Strings whose trailing field is either empty or the plain text
        # ``foo``.
        cases = [
            (b'1 1 0 1 0 ', b'\x00\x00\x00\x01\x00\x01\x00\x10'),
            (b'2 4 0 1 0 ', b'\x00\x00\x00\x02\x00\x04\x00\x10'),
            (b'2 4 0 1 0 foo', b'\x03\x00\x00\x02\x00\x04\x00\x10foo'),
        ]
        for human, expected in cases:
            self.assertEqual(ffs(human), expected)

    def testcborint(self):
        # Non-negative and negative integers written as ``cbor:<int>``.
        cases = [
            (b'1 1 0 1 0 cbor:15', b'\x01\x00\x00\x01\x00\x01\x00\x10\x0f'),
            (b'1 1 0 1 0 cbor:42', b'\x02\x00\x00\x01\x00\x01\x00\x10\x18*'),
            (
                b'1 1 0 1 0 cbor:1048576',
                b'\x05\x00\x00\x01\x00\x01\x00\x10\x1a' b'\x00\x10\x00\x00',
            ),
            (b'1 1 0 1 0 cbor:0', b'\x01\x00\x00\x01\x00\x01\x00\x10\x00'),
            (b'1 1 0 1 0 cbor:-1', b'\x01\x00\x00\x01\x00\x01\x00\x10 '),
            (
                b'1 1 0 1 0 cbor:-342542',
                b'\x05\x00\x00\x01\x00\x01\x00\x10:\x00\x05:\r',
            ),
        ]
        for human, expected in cases:
            self.assertEqual(ffs(human), expected)

    def testcborstrings(self):
        # A bytes literal written as ``cbor:b'...'``.
        actual = ffs(b"1 1 0 1 0 cbor:b'foo'")
        expected = b'\x04\x00\x00\x01\x00\x01\x00\x10Cfoo'
        self.assertEqual(actual, expected)

    def testcborlists(self):
        # A list literal mixing None, booleans, an integer and bytes.
        actual = ffs(b"1 1 0 1 0 cbor:[None, True, False, 42, b'foo']")
        expected = b'\n\x00\x00\x01\x00\x01\x00\x10\x85\xf6\xf5\xf4' b'\x18*Cfoo'
        self.assertEqual(actual, expected)

    def testcbordicts(self):
        # A dict literal; the expected bytes list ``bar`` ahead of ``foo``
        # even though the input names ``foo`` first.
        actual = ffs(b"1 1 0 1 0 " b"cbor:{b'foo': b'val1', b'bar': b'val2'}")
        expected = b'\x13\x00\x00\x01\x00\x01\x00\x10\xa2' b'CbarDval2CfooDval1'
        self.assertEqual(actual, expected)
class FrameTests(unittest.TestCase):
    """Compares frames produced by ``framing`` generators with ``ffs`` output.

    Covers ``framing.createcommandframes`` and
    ``framing.createtextoutputframe``.
    """

    if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
        # Python 3.7 deprecates the regex*p* spelling, whereas 2.7 only
        # offers that spelling, so alias the newer name when it is absent.
        assertRaisesRegex = (  # camelcase-required
            unittest.TestCase.assertRaisesRegexp
        )

    def _commandframes(self, args, data):
        """Return the frames generated for a ``b'command'`` request.

        A new ``framing.stream(1)`` is created for every call and ``1`` is
        passed as the second argument of ``createcommandframes``.
        """
        stream = framing.stream(1)
        return list(
            framing.createcommandframes(stream, 1, b'command', args, data)
        )

    def _textoutputframes(self, atoms):
        """Return the frames generated for the text output ``atoms``.

        A new ``framing.stream(1)`` is created for every call and ``1`` is
        passed as the second argument of ``createtextoutputframe``.
        """
        stream = framing.stream(1)
        return list(framing.createtextoutputframe(stream, 1, atoms))

    def testdataexactframesize(self):
        # The data is exactly DEFAULT_MAX_FRAME_SIZE bytes long: all of it
        # is expected in one continuation frame, followed by an empty eos
        # frame.
        payload = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
        produced = self._commandframes({}, payload)

        expected = [
            ffs(
                b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command'}"
            ),
            ffs(b'1 1 0 command-data continuation %s' % payload.getvalue()),
            ffs(b'1 1 0 command-data eos '),
        ]
        self.assertEqual(produced, expected)

    def testdatamultipleframes(self):
        # One byte more than DEFAULT_MAX_FRAME_SIZE: the extra byte is
        # expected in the eos frame.
        payload = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
        produced = self._commandframes({}, payload)

        fullchunk = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
        expected = [
            ffs(
                b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command'}"
            ),
            ffs(b'1 1 0 command-data continuation %s' % fullchunk),
            ffs(b'1 1 0 command-data eos x'),
        ]
        self.assertEqual(produced, expected)

    def testargsanddata(self):
        # Command arguments together with 100 bytes of data.
        payload = util.bytesio(b'x' * 100)
        arguments = {
            b'key1': b'key1value',
            b'key2': b'key2value',
            b'key3': b'key3value',
        }
        produced = self._commandframes(arguments, payload)

        expected = [
            ffs(
                b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command', b'args': {b'key1': b'key1value', "
                b"b'key2': b'key2value', b'key3': b'key3value'}}"
            ),
            ffs(b'1 1 0 command-data eos %s' % payload.getvalue()),
        ]
        self.assertEqual(produced, expected)

    def testtextoutputformattingstringtype(self):
        """Formatting string must be bytes."""
        notbytes = b'foo'.decode('ascii')
        atoms = [(notbytes, [], [])]
        with self.assertRaisesRegex(ValueError, 'must use bytes formatting '):
            list(framing.createtextoutputframe(None, 1, atoms))

    def testtextoutputargumentbytes(self):
        # A non-bytes formatting argument must be rejected.
        notbytes = b'foo'.decode('ascii')
        atoms = [(b'foo', [notbytes], [])]
        with self.assertRaisesRegex(ValueError, 'must use bytes for argument'):
            list(framing.createtextoutputframe(None, 1, atoms))

    def testtextoutputlabelbytes(self):
        # A non-bytes label must be rejected.
        notbytes = b'foo'.decode('ascii')
        atoms = [(b'foo', [], [notbytes])]
        with self.assertRaisesRegex(ValueError, 'must use bytes for labels'):
            list(framing.createtextoutputframe(None, 1, atoms))

    def testtextoutput1simpleatom(self):
        # One atom with neither arguments nor labels.
        produced = self._textoutputframes([(b'foo', [], [])])

        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo'}]"
            ),
        ]
        self.assertEqual(produced, expected)

    def testtextoutput2simpleatoms(self):
        # Two atoms are expected in a single frame.
        atoms = [
            (b'foo', [], []),
            (b'bar', [], []),
        ]
        produced = self._textoutputframes(atoms)

        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo'}, {b'msg': b'bar'}]"
            )
        ]
        self.assertEqual(produced, expected)

    def testtextoutput1arg(self):
        # One atom carrying a single formatting argument.
        atoms = [
            (b'foo %s', [b'val1'], []),
        ]
        produced = self._textoutputframes(atoms)

        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo %s', b'args': [b'val1']}]"
            )
        ]
        self.assertEqual(produced, expected)

    def testtextoutput2arg(self):
        # One atom carrying two formatting arguments.
        atoms = [
            (b'foo %s %s', [b'val', b'value'], []),
        ]
        produced = self._textoutputframes(atoms)

        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo %s %s', b'args': [b'val', b'value']}]"
            )
        ]
        self.assertEqual(produced, expected)

    def testtextoutput1label(self):
        # One atom carrying a single label.
        atoms = [
            (b'foo', [], [b'label']),
        ]
        produced = self._textoutputframes(atoms)

        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo', b'labels': [b'label']}]"
            )
        ]
        self.assertEqual(produced, expected)

    def testargandlabel(self):
        # One atom carrying both an argument and a label.
        atoms = [
            (b'foo %s', [b'arg'], [b'label']),
        ]
        produced = self._textoutputframes(atoms)

        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo %s', b'args': [b'arg'], "
                b"b'labels': [b'label']}]"
            )
        ]
        self.assertEqual(produced, expected)
if __name__ == '__main__':
    # The runner module is imported only when this file is executed as a
    # script, so importing the tests as a module never needs it.
    import silenttestrunner

    silenttestrunner.main(__name__)