##// END OF EJS Templates
wireproto: convert human output frames to CBOR...
wireproto: convert human output frames to CBOR This is easier than rolling our own encoding format. As a bonus, some of our artificial limits around lengths of things went away because we are no longer using fixed length fields to hold sizes. Differential Revision: https://phab.mercurial-scm.org/D3067

File last commit:

r37335:36d17f37 default
r37335:36d17f37 default
Show More
test-wireproto-serverreactor.py
665 lines | 25.3 KiB | text/x-python | PythonLexer
/ tests / test-wireproto-serverreactor.py
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 from __future__ import absolute_import, print_function
import unittest
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 from mercurial.thirdparty import (
cbor,
)
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 from mercurial import (
util,
wireprotoframing as framing,
)
ffs = framing.makeframefromhumanstring
Gregory Szorc
wireproto: buffer output frames when in half duplex mode...
r37074 def makereactor(deferoutput=False):
return framing.serverreactor(deferoutput=deferoutput)
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070
def sendframes(reactor, gen):
"""Send a generator of frame bytearray to a reactor.
Emits a generator of results from ``onframerecv()`` calls.
"""
for frame in gen:
Gregory Szorc
wireproto: define attr-based classes for representing frames...
r37079 header = framing.parseheader(frame)
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 payload = frame[framing.FRAME_HEADER_SIZE:]
Gregory Szorc
wireproto: define attr-based classes for representing frames...
r37079 assert len(payload) == header.length
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070
Gregory Szorc
wireproto: define attr-based classes for representing frames...
r37079 yield reactor.onframerecv(framing.frame(header.requestid,
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 header.streamid,
header.streamflags,
Gregory Szorc
wireproto: define attr-based classes for representing frames...
r37079 header.typeid,
header.flags,
payload))
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 def sendcommandframes(reactor, stream, rid, cmd, args, datafh=None):
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 """Generate frames to run a command and send them to a reactor."""
Gregory Szorc
wireproto: add request IDs to frames...
r37075 return sendframes(reactor,
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 framing.createcommandframes(stream, rid, cmd, args,
datafh))
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070
Gregory Szorc
wireproto: syntax for encoding CBOR into frames...
r37306 class FrameHumanStringTests(unittest.TestCase):
def testbasic(self):
self.assertEqual(ffs(b'1 1 0 1 0 '),
b'\x00\x00\x00\x01\x00\x01\x00\x10')
self.assertEqual(ffs(b'2 4 0 1 0 '),
b'\x00\x00\x00\x02\x00\x04\x00\x10')
self.assertEqual(ffs(b'2 4 0 1 0 foo'),
b'\x03\x00\x00\x02\x00\x04\x00\x10foo')
def testcborint(self):
self.assertEqual(ffs(b'1 1 0 1 0 cbor:15'),
b'\x01\x00\x00\x01\x00\x01\x00\x10\x0f')
self.assertEqual(ffs(b'1 1 0 1 0 cbor:42'),
b'\x02\x00\x00\x01\x00\x01\x00\x10\x18*')
self.assertEqual(ffs(b'1 1 0 1 0 cbor:1048576'),
b'\x05\x00\x00\x01\x00\x01\x00\x10\x1a'
b'\x00\x10\x00\x00')
self.assertEqual(ffs(b'1 1 0 1 0 cbor:0'),
b'\x01\x00\x00\x01\x00\x01\x00\x10\x00')
self.assertEqual(ffs(b'1 1 0 1 0 cbor:-1'),
b'\x01\x00\x00\x01\x00\x01\x00\x10 ')
self.assertEqual(ffs(b'1 1 0 1 0 cbor:-342542'),
b'\x05\x00\x00\x01\x00\x01\x00\x10:\x00\x05:\r')
def testcborstrings(self):
# String literals should be unicode.
self.assertEqual(ffs(b"1 1 0 1 0 cbor:'foo'"),
b'\x04\x00\x00\x01\x00\x01\x00\x10cfoo')
self.assertEqual(ffs(b"1 1 0 1 0 cbor:b'foo'"),
b'\x04\x00\x00\x01\x00\x01\x00\x10Cfoo')
self.assertEqual(ffs(b"1 1 0 1 0 cbor:u'foo'"),
b'\x04\x00\x00\x01\x00\x01\x00\x10cfoo')
def testcborlists(self):
self.assertEqual(ffs(b"1 1 0 1 0 cbor:[None, True, False, 42, b'foo']"),
b'\n\x00\x00\x01\x00\x01\x00\x10\x85\xf6\xf5\xf4'
b'\x18*Cfoo')
def testcbordicts(self):
self.assertEqual(ffs(b"1 1 0 1 0 "
b"cbor:{b'foo': b'val1', b'bar': b'val2'}"),
b'\x13\x00\x00\x01\x00\x01\x00\x10\xa2'
b'CbarDval2CfooDval1')
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 class FrameTests(unittest.TestCase):
def testdataexactframesize(self):
data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 stream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 frames = list(framing.createcommandframes(stream, 1, b'command',
{}, data))
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 self.assertEqual(frames, [
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 ffs(b'1 1 stream-begin command-request new|have-data '
b"cbor:{b'name': b'command'}"),
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 ffs(b'1 1 0 command-data continuation %s' % data.getvalue()),
ffs(b'1 1 0 command-data eos ')
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 ])
def testdatamultipleframes(self):
data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 stream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 frames = list(framing.createcommandframes(stream, 1, b'command', {},
data))
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 self.assertEqual(frames, [
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 ffs(b'1 1 stream-begin command-request new|have-data '
b"cbor:{b'name': b'command'}"),
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 ffs(b'1 1 0 command-data continuation %s' % (
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 b'x' * framing.DEFAULT_MAX_FRAME_SIZE)),
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 ffs(b'1 1 0 command-data eos x'),
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 ])
def testargsanddata(self):
data = util.bytesio(b'x' * 100)
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 stream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 frames = list(framing.createcommandframes(stream, 1, b'command', {
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 b'key1': b'key1value',
b'key2': b'key2value',
b'key3': b'key3value',
}, data))
self.assertEqual(frames, [
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 ffs(b'1 1 stream-begin command-request new|have-data '
b"cbor:{b'name': b'command', b'args': {b'key1': b'key1value', "
b"b'key2': b'key2value', b'key3': b'key3value'}}"),
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 ffs(b'1 1 0 command-data eos %s' % data.getvalue()),
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 ])
Gregory Szorc
wireproto: define human output side channel frame...
r37078 def testtextoutputformattingstringtype(self):
"""Formatting string must be bytes."""
with self.assertRaisesRegexp(ValueError, 'must use bytes formatting '):
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 list(framing.createtextoutputframe(None, 1, [
Gregory Szorc
wireproto: define human output side channel frame...
r37078 (b'foo'.decode('ascii'), [], [])]))
def testtextoutputargumentbytes(self):
with self.assertRaisesRegexp(ValueError, 'must use bytes for argument'):
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 list(framing.createtextoutputframe(None, 1, [
Gregory Szorc
wireproto: define human output side channel frame...
r37078 (b'foo', [b'foo'.decode('ascii')], [])]))
def testtextoutputlabelbytes(self):
with self.assertRaisesRegexp(ValueError, 'must use bytes for labels'):
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 list(framing.createtextoutputframe(None, 1, [
Gregory Szorc
wireproto: define human output side channel frame...
r37078 (b'foo', [], [b'foo'.decode('ascii')])]))
def testtextoutput1simpleatom(self):
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 stream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 val = list(framing.createtextoutputframe(stream, 1, [
Gregory Szorc
wireproto: define human output side channel frame...
r37078 (b'foo', [], [])]))
self.assertEqual(val, [
Gregory Szorc
wireproto: convert human output frames to CBOR...
r37335 ffs(b'1 1 stream-begin text-output 0 '
b"cbor:[{b'msg': b'foo'}]"),
Gregory Szorc
wireproto: define human output side channel frame...
r37078 ])
def testtextoutput2simpleatoms(self):
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 stream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 val = list(framing.createtextoutputframe(stream, 1, [
Gregory Szorc
wireproto: define human output side channel frame...
r37078 (b'foo', [], []),
(b'bar', [], []),
]))
self.assertEqual(val, [
Gregory Szorc
wireproto: convert human output frames to CBOR...
r37335 ffs(b'1 1 stream-begin text-output 0 '
b"cbor:[{b'msg': b'foo'}, {b'msg': b'bar'}]")
Gregory Szorc
wireproto: define human output side channel frame...
r37078 ])
def testtextoutput1arg(self):
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 stream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 val = list(framing.createtextoutputframe(stream, 1, [
Gregory Szorc
wireproto: define human output side channel frame...
r37078 (b'foo %s', [b'val1'], []),
]))
self.assertEqual(val, [
Gregory Szorc
wireproto: convert human output frames to CBOR...
r37335 ffs(b'1 1 stream-begin text-output 0 '
b"cbor:[{b'msg': b'foo %s', b'args': [b'val1']}]")
Gregory Szorc
wireproto: define human output side channel frame...
r37078 ])
def testtextoutput2arg(self):
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 stream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 val = list(framing.createtextoutputframe(stream, 1, [
Gregory Szorc
wireproto: define human output side channel frame...
r37078 (b'foo %s %s', [b'val', b'value'], []),
]))
self.assertEqual(val, [
Gregory Szorc
wireproto: convert human output frames to CBOR...
r37335 ffs(b'1 1 stream-begin text-output 0 '
b"cbor:[{b'msg': b'foo %s %s', b'args': [b'val', b'value']}]")
Gregory Szorc
wireproto: define human output side channel frame...
r37078 ])
def testtextoutput1label(self):
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 stream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 val = list(framing.createtextoutputframe(stream, 1, [
Gregory Szorc
wireproto: define human output side channel frame...
r37078 (b'foo', [], [b'label']),
]))
self.assertEqual(val, [
Gregory Szorc
wireproto: convert human output frames to CBOR...
r37335 ffs(b'1 1 stream-begin text-output 0 '
b"cbor:[{b'msg': b'foo', b'labels': [b'label']}]")
Gregory Szorc
wireproto: define human output side channel frame...
r37078 ])
def testargandlabel(self):
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 stream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 val = list(framing.createtextoutputframe(stream, 1, [
Gregory Szorc
wireproto: define human output side channel frame...
r37078 (b'foo %s', [b'arg'], [b'label']),
]))
self.assertEqual(val, [
Gregory Szorc
wireproto: convert human output frames to CBOR...
r37335 ffs(b'1 1 stream-begin text-output 0 '
b"cbor:[{b'msg': b'foo %s', b'args': [b'arg'], "
b"b'labels': [b'label']}]")
Gregory Szorc
wireproto: define human output side channel frame...
r37078 ])
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 class ServerReactorTests(unittest.TestCase):
Gregory Szorc
tests: fix duplicate and failing test...
r37302 def _sendsingleframe(self, reactor, f):
results = list(sendframes(reactor, [f]))
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 self.assertEqual(len(results), 1)
return results[0]
def assertaction(self, res, expected):
self.assertIsInstance(res, tuple)
self.assertEqual(len(res), 2)
self.assertIsInstance(res[1], dict)
self.assertEqual(res[0], expected)
Gregory Szorc
wireproto: define and implement responses in framing protocol...
r37073 def assertframesequal(self, frames, framestrings):
expected = [ffs(s) for s in framestrings]
self.assertEqual(list(frames), expected)
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 def test1framecommand(self):
"""Receiving a command in a single frame yields request to run it."""
reactor = makereactor()
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 stream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 self.assertEqual(len(results), 1)
self.assertaction(results[0], 'runcommand')
self.assertEqual(results[0][1], {
Gregory Szorc
wireproto: add request IDs to frames...
r37075 'requestid': 1,
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 'command': b'mycommand',
'args': {},
'data': None,
})
Gregory Szorc
wireproto: buffer output frames when in half duplex mode...
r37074 result = reactor.oninputeof()
self.assertaction(result, 'noop')
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 def test1argument(self):
reactor = makereactor()
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 stream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 results = list(sendcommandframes(reactor, stream, 41, b'mycommand',
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 {b'foo': b'bar'}))
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 self.assertEqual(len(results), 1)
self.assertaction(results[0], 'runcommand')
self.assertEqual(results[0][1], {
Gregory Szorc
wireproto: add request IDs to frames...
r37075 'requestid': 41,
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 'command': b'mycommand',
'args': {b'foo': b'bar'},
'data': None,
})
def testmultiarguments(self):
reactor = makereactor()
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 stream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 results = list(sendcommandframes(reactor, stream, 1, b'mycommand',
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 {b'foo': b'bar', b'biz': b'baz'}))
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 self.assertEqual(len(results), 1)
self.assertaction(results[0], 'runcommand')
self.assertEqual(results[0][1], {
Gregory Szorc
wireproto: add request IDs to frames...
r37075 'requestid': 1,
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 'command': b'mycommand',
'args': {b'foo': b'bar', b'biz': b'baz'},
'data': None,
})
def testsimplecommanddata(self):
reactor = makereactor()
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 stream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {},
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 util.bytesio(b'data!')))
self.assertEqual(len(results), 2)
self.assertaction(results[0], 'wantframe')
self.assertaction(results[1], 'runcommand')
self.assertEqual(results[1][1], {
Gregory Szorc
wireproto: add request IDs to frames...
r37075 'requestid': 1,
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 'command': b'mycommand',
'args': {},
'data': b'data!',
})
def testmultipledataframes(self):
frames = [
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 ffs(b'1 1 stream-begin command-request new|have-data '
b"cbor:{b'name': b'mycommand'}"),
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 ffs(b'1 1 0 command-data continuation data1'),
ffs(b'1 1 0 command-data continuation data2'),
ffs(b'1 1 0 command-data eos data3'),
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 ]
reactor = makereactor()
results = list(sendframes(reactor, frames))
self.assertEqual(len(results), 4)
for i in range(3):
self.assertaction(results[i], 'wantframe')
self.assertaction(results[3], 'runcommand')
self.assertEqual(results[3][1], {
Gregory Szorc
wireproto: add request IDs to frames...
r37075 'requestid': 1,
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 'command': b'mycommand',
'args': {},
'data': b'data1data2data3',
})
def testargumentanddata(self):
frames = [
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 ffs(b'1 1 stream-begin command-request new|have-data '
b"cbor:{b'name': b'command', b'args': {b'key': b'val',"
b"b'foo': b'bar'}}"),
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 ffs(b'1 1 0 command-data continuation value1'),
ffs(b'1 1 0 command-data eos value2'),
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 ]
reactor = makereactor()
results = list(sendframes(reactor, frames))
self.assertaction(results[-1], 'runcommand')
self.assertEqual(results[-1][1], {
Gregory Szorc
wireproto: add request IDs to frames...
r37075 'requestid': 1,
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 'command': b'command',
'args': {
b'key': b'val',
b'foo': b'bar',
},
'data': b'value1value2',
})
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 def testnewandcontinuation(self):
result = self._sendsingleframe(makereactor(),
ffs(b'1 1 stream-begin command-request new|continuation '))
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 self.assertaction(result, 'error')
self.assertEqual(result[1], {
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 'message': b'received command request frame with both new and '
b'continuation flags set',
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 })
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 def testneithernewnorcontinuation(self):
result = self._sendsingleframe(makereactor(),
ffs(b'1 1 stream-begin command-request 0 '))
self.assertaction(result, 'error')
self.assertEqual(result[1], {
'message': b'received command request frame with neither new nor '
b'continuation flags set',
Gregory Szorc
wireproto: support for receiving multiple requests...
r37076 })
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 def testunexpectedcommanddata(self):
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 """Command data frame when not running a command is an error."""
result = self._sendsingleframe(makereactor(),
ffs(b'1 1 stream-begin command-data 0 ignored'))
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 self.assertaction(result, 'error')
self.assertEqual(result[1], {
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 'message': b'expected command request frame; got 3',
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 })
Gregory Szorc
wireproto: support for receiving multiple requests...
r37076 def testunexpectedcommanddatareceiving(self):
"""Same as above except the command is receiving."""
results = list(sendframes(makereactor(), [
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 ffs(b'1 1 stream-begin command-request new|more '
b"cbor:{b'name': b'ignored'}"),
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 ffs(b'1 1 0 command-data eos ignored'),
Gregory Szorc
wireproto: support for receiving multiple requests...
r37076 ]))
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 self.assertaction(results[0], 'wantframe')
Gregory Szorc
wireproto: support for receiving multiple requests...
r37076 self.assertaction(results[1], 'error')
self.assertEqual(results[1][1], {
'message': b'received command data frame for request that is not '
b'expecting data: 1',
})
Gregory Szorc
tests: fix duplicate and failing test...
r37302 def testconflictingrequestidallowed(self):
Gregory Szorc
wireproto: support for receiving multiple requests...
r37076 """Multiple fully serviced commands with same request ID is allowed."""
Gregory Szorc
tests: fix duplicate and failing test...
r37302 reactor = makereactor()
results = []
Gregory Szorc
wireproto: explicit API to create outgoing streams...
r37305 outstream = reactor.makeoutputstream()
Gregory Szorc
tests: fix duplicate and failing test...
r37302 results.append(self._sendsingleframe(
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 reactor, ffs(b'1 1 stream-begin command-request new '
b"cbor:{b'name': b'command'}")))
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 result = reactor.onbytesresponseready(outstream, 1, b'response1')
Gregory Szorc
tests: fix duplicate and failing test...
r37302 self.assertaction(result, 'sendframes')
list(result[1]['framegen'])
results.append(self._sendsingleframe(
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 reactor, ffs(b'1 1 stream-begin command-request new '
b"cbor:{b'name': b'command'}")))
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 result = reactor.onbytesresponseready(outstream, 1, b'response2')
Gregory Szorc
tests: fix duplicate and failing test...
r37302 self.assertaction(result, 'sendframes')
list(result[1]['framegen'])
results.append(self._sendsingleframe(
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 reactor, ffs(b'1 1 stream-begin command-request new '
b"cbor:{b'name': b'command'}")))
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 result = reactor.onbytesresponseready(outstream, 1, b'response3')
Gregory Szorc
tests: fix duplicate and failing test...
r37302 self.assertaction(result, 'sendframes')
list(result[1]['framegen'])
Gregory Szorc
wireproto: support for receiving multiple requests...
r37076 for i in range(3):
self.assertaction(results[i], 'runcommand')
self.assertEqual(results[i][1], {
'requestid': 1,
'command': b'command',
'args': {},
'data': None,
})
def testconflictingrequestid(self):
"""Request ID for new command matching in-flight command is illegal."""
results = list(sendframes(makereactor(), [
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 ffs(b'1 1 stream-begin command-request new|more '
b"cbor:{b'name': b'command'}"),
ffs(b'1 1 0 command-request new '
b"cbor:{b'name': b'command1'}"),
Gregory Szorc
wireproto: support for receiving multiple requests...
r37076 ]))
self.assertaction(results[0], 'wantframe')
self.assertaction(results[1], 'error')
self.assertEqual(results[1][1], {
'message': b'request with ID 1 already received',
})
def testinterleavedcommands(self):
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 cbor1 = cbor.dumps({
b'name': b'command1',
b'args': {
b'foo': b'bar',
b'key1': b'val',
}
}, canonical=True)
cbor3 = cbor.dumps({
b'name': b'command3',
b'args': {
b'biz': b'baz',
b'key': b'val',
},
}, canonical=True)
Gregory Szorc
wireproto: support for receiving multiple requests...
r37076 results = list(sendframes(makereactor(), [
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 ffs(b'1 1 stream-begin command-request new|more %s' % cbor1[0:6]),
ffs(b'3 1 0 command-request new|more %s' % cbor3[0:10]),
ffs(b'1 1 0 command-request continuation|more %s' % cbor1[6:9]),
ffs(b'3 1 0 command-request continuation|more %s' % cbor3[10:13]),
ffs(b'3 1 0 command-request continuation %s' % cbor3[13:]),
ffs(b'1 1 0 command-request continuation %s' % cbor1[9:]),
Gregory Szorc
wireproto: support for receiving multiple requests...
r37076 ]))
self.assertEqual([t[0] for t in results], [
'wantframe',
'wantframe',
'wantframe',
'wantframe',
'runcommand',
'runcommand',
])
self.assertEqual(results[4][1], {
'requestid': 3,
'command': 'command3',
'args': {b'biz': b'baz', b'key': b'val'},
'data': None,
})
self.assertEqual(results[5][1], {
'requestid': 1,
'command': 'command1',
'args': {b'foo': b'bar', b'key1': b'val'},
'data': None,
})
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 def testmissingcommanddataframe(self):
Gregory Szorc
wireproto: support for receiving multiple requests...
r37076 # The reactor doesn't currently handle partially received commands.
# So this test is failing to do anything with request 1.
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 frames = [
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 ffs(b'1 1 stream-begin command-request new|have-data '
b"cbor:{b'name': b'command1'}"),
ffs(b'3 1 0 command-request new '
b"cbor:{b'name': b'command2'}"),
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 ]
results = list(sendframes(makereactor(), frames))
self.assertEqual(len(results), 2)
self.assertaction(results[0], 'wantframe')
Gregory Szorc
wireproto: support for receiving multiple requests...
r37076 self.assertaction(results[1], 'runcommand')
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070
def testmissingcommanddataframeflags(self):
frames = [
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 ffs(b'1 1 stream-begin command-request new|have-data '
b"cbor:{b'name': b'command1'}"),
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 ffs(b'1 1 0 command-data 0 data'),
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 ]
results = list(sendframes(makereactor(), frames))
self.assertEqual(len(results), 2)
self.assertaction(results[0], 'wantframe')
self.assertaction(results[1], 'error')
self.assertEqual(results[1][1], {
'message': b'command data frame without flags',
})
Gregory Szorc
wireproto: support for receiving multiple requests...
r37076 def testframefornonreceivingrequest(self):
"""Receiving a frame for a command that is not receiving is illegal."""
results = list(sendframes(makereactor(), [
Gregory Szorc
wireproto: use CBOR for command requests...
r37308 ffs(b'1 1 stream-begin command-request new '
b"cbor:{b'name': b'command1'}"),
ffs(b'3 1 0 command-request new|have-data '
b"cbor:{b'name': b'command3'}"),
ffs(b'5 1 0 command-data eos ignored'),
Gregory Szorc
wireproto: support for receiving multiple requests...
r37076 ]))
self.assertaction(results[2], 'error')
self.assertEqual(results[2][1], {
Gregory Szorc
wireproto: explicitly track which requests are active...
r37081 'message': b'received frame for request that is not receiving: 5',
Gregory Szorc
wireproto: support for receiving multiple requests...
r37076 })
Gregory Szorc
wireproto: define and implement responses in framing protocol...
r37073 def testsimpleresponse(self):
"""Bytes response to command sends result frames."""
reactor = makereactor()
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 instream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
Gregory Szorc
wireproto: define and implement responses in framing protocol...
r37073
Gregory Szorc
wireproto: explicit API to create outgoing streams...
r37305 outstream = reactor.makeoutputstream()
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 result = reactor.onbytesresponseready(outstream, 1, b'response')
Gregory Szorc
wireproto: define and implement responses in framing protocol...
r37073 self.assertaction(result, 'sendframes')
self.assertframesequal(result[1]['framegen'], [
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 b'1 2 stream-begin bytes-response eos response',
Gregory Szorc
wireproto: define and implement responses in framing protocol...
r37073 ])
def testmultiframeresponse(self):
"""Bytes response spanning multiple frames is handled."""
first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
second = b'y' * 100
reactor = makereactor()
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 instream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
Gregory Szorc
wireproto: define and implement responses in framing protocol...
r37073
Gregory Szorc
wireproto: explicit API to create outgoing streams...
r37305 outstream = reactor.makeoutputstream()
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 result = reactor.onbytesresponseready(outstream, 1, first + second)
Gregory Szorc
wireproto: define and implement responses in framing protocol...
r37073 self.assertaction(result, 'sendframes')
self.assertframesequal(result[1]['framegen'], [
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 b'1 2 stream-begin bytes-response continuation %s' % first,
b'1 2 0 bytes-response eos %s' % second,
Gregory Szorc
wireproto: define and implement responses in framing protocol...
r37073 ])
def testapplicationerror(self):
reactor = makereactor()
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 instream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
Gregory Szorc
wireproto: define and implement responses in framing protocol...
r37073
Gregory Szorc
wireproto: explicit API to create outgoing streams...
r37305 outstream = reactor.makeoutputstream()
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 result = reactor.onapplicationerror(outstream, 1, b'some message')
Gregory Szorc
wireproto: define and implement responses in framing protocol...
r37073 self.assertaction(result, 'sendframes')
self.assertframesequal(result[1]['framegen'], [
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 b'1 2 stream-begin error-response application some message',
Gregory Szorc
wireproto: define and implement responses in framing protocol...
r37073 ])
Gregory Szorc
wireproto: buffer output frames when in half duplex mode...
r37074 def test1commanddeferresponse(self):
"""Responses when in deferred output mode are delayed until EOF."""
reactor = makereactor(deferoutput=True)
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 instream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 results = list(sendcommandframes(reactor, instream, 1, b'mycommand',
{}))
Gregory Szorc
wireproto: buffer output frames when in half duplex mode...
r37074 self.assertEqual(len(results), 1)
self.assertaction(results[0], 'runcommand')
Gregory Szorc
wireproto: explicit API to create outgoing streams...
r37305 outstream = reactor.makeoutputstream()
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 result = reactor.onbytesresponseready(outstream, 1, b'response')
Gregory Szorc
wireproto: buffer output frames when in half duplex mode...
r37074 self.assertaction(result, 'noop')
result = reactor.oninputeof()
self.assertaction(result, 'sendframes')
self.assertframesequal(result[1]['framegen'], [
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 b'1 2 stream-begin bytes-response eos response',
Gregory Szorc
wireproto: buffer output frames when in half duplex mode...
r37074 ])
def testmultiplecommanddeferresponse(self):
reactor = makereactor(deferoutput=True)
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 instream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
list(sendcommandframes(reactor, instream, 3, b'command2', {}))
Gregory Szorc
wireproto: buffer output frames when in half duplex mode...
r37074
Gregory Szorc
wireproto: explicit API to create outgoing streams...
r37305 outstream = reactor.makeoutputstream()
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 result = reactor.onbytesresponseready(outstream, 1, b'response1')
Gregory Szorc
wireproto: buffer output frames when in half duplex mode...
r37074 self.assertaction(result, 'noop')
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 result = reactor.onbytesresponseready(outstream, 3, b'response2')
Gregory Szorc
wireproto: buffer output frames when in half duplex mode...
r37074 self.assertaction(result, 'noop')
result = reactor.oninputeof()
self.assertaction(result, 'sendframes')
self.assertframesequal(result[1]['framegen'], [
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 b'1 2 stream-begin bytes-response eos response1',
b'3 2 0 bytes-response eos response2'
Gregory Szorc
wireproto: add request IDs to frames...
r37075 ])
def testrequestidtracking(self):
reactor = makereactor(deferoutput=True)
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 instream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
list(sendcommandframes(reactor, instream, 3, b'command2', {}))
list(sendcommandframes(reactor, instream, 5, b'command3', {}))
Gregory Szorc
wireproto: add request IDs to frames...
r37075
# Register results for commands out of order.
Gregory Szorc
wireproto: explicit API to create outgoing streams...
r37305 outstream = reactor.makeoutputstream()
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 reactor.onbytesresponseready(outstream, 3, b'response3')
reactor.onbytesresponseready(outstream, 1, b'response1')
reactor.onbytesresponseready(outstream, 5, b'response5')
Gregory Szorc
wireproto: add request IDs to frames...
r37075
result = reactor.oninputeof()
self.assertaction(result, 'sendframes')
self.assertframesequal(result[1]['framegen'], [
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 b'3 2 stream-begin bytes-response eos response3',
b'1 2 0 bytes-response eos response1',
b'5 2 0 bytes-response eos response5',
Gregory Szorc
wireproto: buffer output frames when in half duplex mode...
r37074 ])
Gregory Szorc
wireproto: explicitly track which requests are active...
r37081 def testduplicaterequestonactivecommand(self):
"""Receiving a request ID that matches a request that isn't finished."""
reactor = makereactor()
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 stream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 list(sendcommandframes(reactor, stream, 1, b'command1', {}))
results = list(sendcommandframes(reactor, stream, 1, b'command1', {}))
Gregory Szorc
wireproto: explicitly track which requests are active...
r37081
self.assertaction(results[0], 'error')
self.assertEqual(results[0][1], {
'message': b'request with ID 1 is already active',
})
def testduplicaterequestonactivecommandnosend(self):
"""Same as above but we've registered a response but haven't sent it."""
reactor = makereactor()
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 instream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
Gregory Szorc
wireproto: explicit API to create outgoing streams...
r37305 outstream = reactor.makeoutputstream()
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 reactor.onbytesresponseready(outstream, 1, b'response')
Gregory Szorc
wireproto: explicitly track which requests are active...
r37081
# We've registered the response but haven't sent it. From the
# perspective of the reactor, the command is still active.
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
Gregory Szorc
wireproto: explicitly track which requests are active...
r37081 self.assertaction(results[0], 'error')
self.assertEqual(results[0][1], {
'message': b'request with ID 1 is already active',
})
def testduplicaterequestaftersend(self):
"""We can use a duplicate request ID after we've sent the response."""
reactor = makereactor()
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 instream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
Gregory Szorc
wireproto: explicit API to create outgoing streams...
r37305 outstream = reactor.makeoutputstream()
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 res = reactor.onbytesresponseready(outstream, 1, b'response')
Gregory Szorc
wireproto: explicitly track which requests are active...
r37081 list(res[1]['framegen'])
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
Gregory Szorc
wireproto: explicitly track which requests are active...
r37081 self.assertaction(results[0], 'runcommand')
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 if __name__ == '__main__':
import silenttestrunner
silenttestrunner.main(__name__)