##// END OF EJS Templates
rhg: handle null changelog and manifest revisions...
rhg: handle null changelog and manifest revisions Differential Revision: https://phab.mercurial-scm.org/D11650

File last commit:

r46554:89a2afe3 default
r49012:61ce70fd default
Show More
test-wireproto-framing.py
307 lines | 8.4 KiB | text/x-python | PythonLexer
/ tests / test-wireproto-framing.py
Gregory Szorc
tests: extract wire protocol framing tests to own file...
r37560 from __future__ import absolute_import, print_function
import unittest
from mercurial import (
util,
wireprotoframing as framing,
)
ffs = framing.makeframefromhumanstring
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
tests: extract wire protocol framing tests to own file...
class FrameHumanStringTests(unittest.TestCase):
    """Checks the bytes produced by ``ffs`` for human-readable frame strings.

    Each test feeds one or more human-readable frame descriptions to
    ``ffs`` (``framing.makeframefromhumanstring``) and compares the result
    against a hard-coded byte string.
    """

    def testbasic(self):
        # (human-readable frame, expected bytes) pairs: two frames with an
        # empty payload and one carrying the literal payload ``foo``.
        cases = [
            (b'1 1 0 1 0 ', b'\x00\x00\x00\x01\x00\x01\x00\x10'),
            (b'2 4 0 1 0 ', b'\x00\x00\x00\x02\x00\x04\x00\x10'),
            (b'2 4 0 1 0 foo', b'\x03\x00\x00\x02\x00\x04\x00\x10foo'),
        ]
        for human, wire in cases:
            self.assertEqual(ffs(human), wire)

    def testcborint(self):
        # Integer payloads given with the ``cbor:`` prefix, covering small,
        # multi-byte, zero and negative values.
        cases = [
            (
                b'1 1 0 1 0 cbor:15',
                b'\x01\x00\x00\x01\x00\x01\x00\x10\x0f',
            ),
            (
                b'1 1 0 1 0 cbor:42',
                b'\x02\x00\x00\x01\x00\x01\x00\x10\x18*',
            ),
            (
                b'1 1 0 1 0 cbor:1048576',
                b'\x05\x00\x00\x01\x00\x01\x00\x10\x1a' b'\x00\x10\x00\x00',
            ),
            (
                b'1 1 0 1 0 cbor:0',
                b'\x01\x00\x00\x01\x00\x01\x00\x10\x00',
            ),
            (
                b'1 1 0 1 0 cbor:-1',
                b'\x01\x00\x00\x01\x00\x01\x00\x10 ',
            ),
            (
                b'1 1 0 1 0 cbor:-342542',
                b'\x05\x00\x00\x01\x00\x01\x00\x10:\x00\x05:\r',
            ),
        ]
        for human, wire in cases:
            self.assertEqual(ffs(human), wire)

    def testcborstrings(self):
        # A bytes literal given with the ``cbor:`` prefix.
        wire = b'\x04\x00\x00\x01\x00\x01\x00\x10Cfoo'
        encoded = ffs(b"1 1 0 1 0 cbor:b'foo'")
        self.assertEqual(encoded, wire)

    def testcborlists(self):
        # A list mixing None, booleans, an integer and a bytes value.
        wire = b'\n\x00\x00\x01\x00\x01\x00\x10\x85\xf6\xf5\xf4' b'\x18*Cfoo'
        encoded = ffs(b"1 1 0 1 0 cbor:[None, True, False, 42, b'foo']")
        self.assertEqual(encoded, wire)

    def testcbordicts(self):
        # A two-entry dict; the expected bytes list ``bar`` before ``foo``
        # even though the input names ``foo`` first.
        wire = b'\x13\x00\x00\x01\x00\x01\x00\x10\xa2' b'CbarDval2CfooDval1'
        encoded = ffs(b"1 1 0 1 0 " b"cbor:{b'foo': b'val1', b'bar': b'val2'}")
        self.assertEqual(encoded, wire)
Gregory Szorc
tests: extract wire protocol framing tests to own file...
r37560
class FrameTests(unittest.TestCase):
    """Checks the frames emitted by the ``framing`` frame generators.

    Covers ``framing.createcommandframes`` (command requests with optional
    arguments and data) and ``framing.createtextoutputframe`` (text output
    atoms with optional arguments and labels). Expected frames are built
    from human-readable strings via ``ffs``.
    """

    def testdataexactframesize(self):
        # The payload is exactly DEFAULT_MAX_FRAME_SIZE bytes long; the
        # expectation is one continuation frame holding all of it followed
        # by an empty eos frame.
        payload = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
        data = util.bytesio(payload)
        outstream = framing.stream(1)

        frames = list(
            framing.createcommandframes(outstream, 1, b'command', {}, data)
        )

        expected = [
            ffs(
                b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command'}"
            ),
            ffs(b'1 1 0 command-data continuation %s' % payload),
            ffs(b'1 1 0 command-data eos '),
        ]
        self.assertEqual(frames, expected)

    def testdatamultipleframes(self):
        # One byte more than DEFAULT_MAX_FRAME_SIZE: the extra byte is
        # expected in the trailing eos frame.
        fullframe = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
        data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
        outstream = framing.stream(1)

        frames = list(
            framing.createcommandframes(outstream, 1, b'command', {}, data)
        )

        expected = [
            ffs(
                b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command'}"
            ),
            ffs(b'1 1 0 command-data continuation %s' % fullframe),
            ffs(b'1 1 0 command-data eos x'),
        ]
        self.assertEqual(frames, expected)

    def testargsanddata(self):
        # Command arguments are expected inside the request frame, and the
        # 100-byte payload in a single eos data frame.
        payload = b'x' * 100
        data = util.bytesio(payload)
        outstream = framing.stream(1)
        cmdargs = {
            b'key1': b'key1value',
            b'key2': b'key2value',
            b'key3': b'key3value',
        }

        frames = list(
            framing.createcommandframes(
                outstream, 1, b'command', cmdargs, data
            )
        )

        expected = [
            ffs(
                b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command', b'args': {b'key1': b'key1value', "
                b"b'key2': b'key2value', b'key3': b'key3value'}}"
            ),
            ffs(b'1 1 0 command-data eos %s' % payload),
        ]
        self.assertEqual(frames, expected)

    if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
        # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
        # the regex version.
        assertRaisesRegex = (  # camelcase-required
            unittest.TestCase.assertRaisesRegexp
        )

    def testtextoutputformattingstringtype(self):
        """Formatting string must be bytes."""
        notbytes = b'foo'.decode('ascii')
        with self.assertRaisesRegex(ValueError, 'must use bytes formatting '):
            list(framing.createtextoutputframe(None, 1, [(notbytes, [], [])]))

    def testtextoutputargumentbytes(self):
        # A non-bytes formatting argument must be rejected with ValueError.
        notbytes = b'foo'.decode('ascii')
        atoms = [(b'foo', [notbytes], [])]
        with self.assertRaisesRegex(ValueError, 'must use bytes for argument'):
            list(framing.createtextoutputframe(None, 1, atoms))

    def testtextoutputlabelbytes(self):
        # A non-bytes label must be rejected with ValueError.
        notbytes = b'foo'.decode('ascii')
        atoms = [(b'foo', [], [notbytes])]
        with self.assertRaisesRegex(ValueError, 'must use bytes for labels'):
            list(framing.createtextoutputframe(None, 1, atoms))

    def testtextoutput1simpleatom(self):
        # One atom with neither arguments nor labels.
        outstream = framing.stream(1)
        atoms = [(b'foo', [], [])]

        val = list(framing.createtextoutputframe(outstream, 1, atoms))

        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo'}]"
            ),
        ]
        self.assertEqual(val, expected)

    def testtextoutput2simpleatoms(self):
        # Two plain atoms are expected together in a single frame.
        outstream = framing.stream(1)
        atoms = [
            (b'foo', [], []),
            (b'bar', [], []),
        ]

        val = list(framing.createtextoutputframe(outstream, 1, atoms))

        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo'}, {b'msg': b'bar'}]"
            )
        ]
        self.assertEqual(val, expected)

    def testtextoutput1arg(self):
        # One atom with a single formatting argument.
        outstream = framing.stream(1)
        atoms = [
            (b'foo %s', [b'val1'], []),
        ]

        val = list(framing.createtextoutputframe(outstream, 1, atoms))

        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo %s', b'args': [b'val1']}]"
            )
        ]
        self.assertEqual(val, expected)

    def testtextoutput2arg(self):
        # One atom with two formatting arguments.
        outstream = framing.stream(1)
        atoms = [
            (b'foo %s %s', [b'val', b'value'], []),
        ]

        val = list(framing.createtextoutputframe(outstream, 1, atoms))

        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo %s %s', b'args': [b'val', b'value']}]"
            )
        ]
        self.assertEqual(val, expected)

    def testtextoutput1label(self):
        # One atom with a single label and no arguments.
        outstream = framing.stream(1)
        atoms = [
            (b'foo', [], [b'label']),
        ]

        val = list(framing.createtextoutputframe(outstream, 1, atoms))

        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo', b'labels': [b'label']}]"
            )
        ]
        self.assertEqual(val, expected)

    def testargandlabel(self):
        # One atom carrying both a formatting argument and a label.
        outstream = framing.stream(1)
        atoms = [
            (b'foo %s', [b'arg'], [b'label']),
        ]

        val = list(framing.createtextoutputframe(outstream, 1, atoms))

        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo %s', b'args': [b'arg'], "
                b"b'labels': [b'label']}]"
            )
        ]
        self.assertEqual(val, expected)
Gregory Szorc
tests: extract wire protocol framing tests to own file...
r37560
if __name__ == '__main__':
    # When executed directly, pass this module's name to the
    # silenttestrunner entry point.
    from silenttestrunner import main as runtests

    runtests(__name__)