test-wireproto-framing.py
286 lines
| 8.1 KiB
| text/x-python
|
PythonLexer
/ tests / test-wireproto-framing.py
Gregory Szorc
|
r37560 | from __future__ import absolute_import, print_function | ||
import unittest | ||||
from mercurial import ( | ||||
util, | ||||
wireprotoframing as framing, | ||||
) | ||||
# Short alias: the tests below pass it a human-readable frame description
# (bytes) and compare the result against expected frame data.
ffs = framing.makeframefromhumanstring
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
class FrameHumanStringTests(unittest.TestCase):
    """Compare ``ffs()`` output for human-readable strings to literal bytes."""

    def testbasic(self):
        # Each entry pairs a human-readable frame with the bytes it must
        # encode to; the last one carries a raw (non-CBOR) payload.
        cases = [
            (b'1 1 0 1 0 ', b'\x00\x00\x00\x01\x00\x01\x00\x10'),
            (b'2 4 0 1 0 ', b'\x00\x00\x00\x02\x00\x04\x00\x10'),
            (b'2 4 0 1 0 foo', b'\x03\x00\x00\x02\x00\x04\x00\x10foo'),
        ]
        for human, wire in cases:
            self.assertEqual(ffs(human), wire)

    def testcborint(self):
        # Integer payloads given with the ``cbor:`` prefix, covering zero,
        # positive and negative values of increasing size.
        cases = [
            (
                b'1 1 0 1 0 cbor:15',
                b'\x01\x00\x00\x01\x00\x01\x00\x10\x0f',
            ),
            (
                b'1 1 0 1 0 cbor:42',
                b'\x02\x00\x00\x01\x00\x01\x00\x10\x18*',
            ),
            (
                b'1 1 0 1 0 cbor:1048576',
                b'\x05\x00\x00\x01\x00\x01\x00\x10\x1a' b'\x00\x10\x00\x00',
            ),
            (
                b'1 1 0 1 0 cbor:0',
                b'\x01\x00\x00\x01\x00\x01\x00\x10\x00',
            ),
            (
                b'1 1 0 1 0 cbor:-1',
                b'\x01\x00\x00\x01\x00\x01\x00\x10 ',
            ),
            (
                b'1 1 0 1 0 cbor:-342542',
                b'\x05\x00\x00\x01\x00\x01\x00\x10:\x00\x05:\r',
            ),
        ]
        for human, wire in cases:
            self.assertEqual(ffs(human), wire)

    def testcborstrings(self):
        # A bytes value given with the ``cbor:`` prefix.
        wire = b'\x04\x00\x00\x01\x00\x01\x00\x10Cfoo'
        self.assertEqual(ffs(b"1 1 0 1 0 cbor:b'foo'"), wire)

    def testcborlists(self):
        # A list mixing None, booleans, an integer and a bytes value.
        encoded = ffs(b"1 1 0 1 0 cbor:[None, True, False, 42, b'foo']")
        wire = b'\n\x00\x00\x01\x00\x01\x00\x10\x85\xf6\xf5\xf4' b'\x18*Cfoo'
        self.assertEqual(encoded, wire)

    def testcbordicts(self):
        # Note the expected bytes list the ``bar`` entry before ``foo`` even
        # though the input names ``foo`` first.
        encoded = ffs(
            b"1 1 0 1 0 " b"cbor:{b'foo': b'val1', b'bar': b'val2'}"
        )
        wire = b'\x13\x00\x00\x01\x00\x01\x00\x10\xa2' b'CbarDval2CfooDval1'
        self.assertEqual(encoded, wire)
Gregory Szorc
|
r37560 | |||
class FrameTests(unittest.TestCase):
    """Tests for ``createcommandframes`` and ``createtextoutputframe``."""

    def testdataexactframesize(self):
        # Payload is exactly DEFAULT_MAX_FRAME_SIZE bytes: expect one full
        # continuation frame followed by an empty eos frame.
        payload = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
        outstream = framing.stream(1)
        actual = list(
            framing.createcommandframes(outstream, 1, b'command', {}, payload)
        )
        expected = [
            ffs(
                b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command'}"
            ),
            ffs(b'1 1 0 command-data continuation %s' % payload.getvalue()),
            ffs(b'1 1 0 command-data eos '),
        ]
        self.assertEqual(actual, expected)

    def testdatamultipleframes(self):
        # One byte more than DEFAULT_MAX_FRAME_SIZE: the extra byte is
        # expected in the eos frame.
        payload = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
        outstream = framing.stream(1)
        actual = list(
            framing.createcommandframes(outstream, 1, b'command', {}, payload)
        )
        fullchunk = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
        expected = [
            ffs(
                b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command'}"
            ),
            ffs(b'1 1 0 command-data continuation %s' % fullchunk),
            ffs(b'1 1 0 command-data eos x'),
        ]
        self.assertEqual(actual, expected)

    def testargsanddata(self):
        # Command arguments are expected under the ``args`` key of the
        # request frame; the small payload fits in a single eos data frame.
        payload = util.bytesio(b'x' * 100)
        outstream = framing.stream(1)
        cmdargs = {
            b'key1': b'key1value',
            b'key2': b'key2value',
            b'key3': b'key3value',
        }
        actual = list(
            framing.createcommandframes(
                outstream, 1, b'command', cmdargs, payload
            )
        )
        expected = [
            ffs(
                b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command', b'args': {b'key1': b'key1value', "
                b"b'key2': b'key2value', b'key3': b'key3value'}}"
            ),
            ffs(b'1 1 0 command-data eos %s' % payload.getvalue()),
        ]
        self.assertEqual(actual, expected)

    if not hasattr(unittest.TestCase, 'assertRaisesRegex'):
        # Python 2.7 only offers the older ``assertRaisesRegexp`` spelling,
        # which Python 3.7 deprecates; alias it so the tests below can use
        # the newer name everywhere.
        assertRaisesRegex = (  # camelcase-required
            unittest.TestCase.assertRaisesRegexp
        )

    def testtextoutputformattingstringtype(self):
        """Formatting string must be bytes."""
        # Decoding yields a text (non-bytes) string on Python 2 and 3 alike.
        atoms = [(b'foo'.decode('ascii'), [], [])]
        with self.assertRaisesRegex(ValueError, 'must use bytes formatting '):
            list(framing.createtextoutputframe(None, 1, atoms))

    def testtextoutputargumentbytes(self):
        # A text-string formatting argument must be rejected.
        atoms = [(b'foo', [b'foo'.decode('ascii')], [])]
        with self.assertRaisesRegex(ValueError, 'must use bytes for argument'):
            list(framing.createtextoutputframe(None, 1, atoms))

    def testtextoutputlabelbytes(self):
        # A text-string label must be rejected.
        atoms = [(b'foo', [], [b'foo'.decode('ascii')])]
        with self.assertRaisesRegex(ValueError, 'must use bytes for labels'):
            list(framing.createtextoutputframe(None, 1, atoms))

    def testtextoutput1simpleatom(self):
        # One atom with neither arguments nor labels.
        outstream = framing.stream(1)
        atoms = [(b'foo', [], [])]
        actual = list(framing.createtextoutputframe(outstream, 1, atoms))
        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo'}]"
            ),
        ]
        self.assertEqual(actual, expected)

    def testtextoutput2simpleatoms(self):
        # Two plain atoms are expected in a single frame.
        outstream = framing.stream(1)
        atoms = [(b'foo', [], []), (b'bar', [], [])]
        actual = list(framing.createtextoutputframe(outstream, 1, atoms))
        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo'}, {b'msg': b'bar'}]"
            ),
        ]
        self.assertEqual(actual, expected)

    def testtextoutput1arg(self):
        # One formatting argument appears under ``args``.
        outstream = framing.stream(1)
        atoms = [(b'foo %s', [b'val1'], [])]
        actual = list(framing.createtextoutputframe(outstream, 1, atoms))
        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo %s', b'args': [b'val1']}]"
            ),
        ]
        self.assertEqual(actual, expected)

    def testtextoutput2arg(self):
        # Two formatting arguments appear under ``args`` in the given order.
        outstream = framing.stream(1)
        atoms = [(b'foo %s %s', [b'val', b'value'], [])]
        actual = list(framing.createtextoutputframe(outstream, 1, atoms))
        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo %s %s', b'args': [b'val', b'value']}]"
            ),
        ]
        self.assertEqual(actual, expected)

    def testtextoutput1label(self):
        # One label appears under ``labels``.
        outstream = framing.stream(1)
        atoms = [(b'foo', [], [b'label'])]
        actual = list(framing.createtextoutputframe(outstream, 1, atoms))
        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo', b'labels': [b'label']}]"
            ),
        ]
        self.assertEqual(actual, expected)

    def testargandlabel(self):
        # An atom carrying both an argument and a label.
        outstream = framing.stream(1)
        atoms = [(b'foo %s', [b'arg'], [b'label'])]
        actual = list(framing.createtextoutputframe(outstream, 1, atoms))
        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo %s', b'args': [b'arg'], "
                b"b'labels': [b'label']}]"
            ),
        ]
        self.assertEqual(actual, expected)
Gregory Szorc
|
r37560 | |||
if __name__ == '__main__':
    # Imported here rather than at the top of the file, so the runner is
    # only needed when this file is executed directly.
    import silenttestrunner

    silenttestrunner.main(__name__)