wireprotov2: define and implement "manifestdata" command

The added command can be used for obtaining manifest data. Given a manifest path and set of manifest nodes, data about manifests can be retrieved.

Unlike changeset data, we wish to emit deltas to describe manifest revisions. So the command uses the relatively new API for building delta requests and emitting them.

The code calls into deltaparent(), which I'm not very keen on. There's still work to be done in delta generation land so implementation details of storage (e.g. exactly one delta is stored/available) don't creep into higher levels. But we can worry about this later (there is already a TODO on imanifestorage tracking this).

On the subject of parent deltas, the server assumes parent revisions exist on the receiving end. This is obviously wrong for shallow clone. I've added TODOs to add a mechanism to the command to allow clients to specify desired behavior. This shouldn't be too difficult to implement.

Another big change is that the client must explicitly request manifest nodes to retrieve. This is a major departure from "getbundle," where the server derives relevant manifests as it iterates changesets and sends them automatically. As implemented, the client must transmit each requested node to the server. At 20 bytes per node, we're looking at 2 MB per 100,000 nodes, plus wire encoding overhead. This isn't ideal for clients with limited upload bandwidth. I plan to address this in the future by allowing alternate mechanisms for defining the revisions to retrieve. One idea is to define a range of changeset revisions whose manifest revisions to retrieve (similar to how "changesetdata" works). We almost certainly want an API to look up an individual manifest by node. And that's where I've chosen to start with the implementation.

Again, a theme of this early exchangev2 work is I want to start by building primitives for accessing raw repository data first and see how far we can get with those before we need more complexity.

Differential Revision: https://phab.mercurial-scm.org/D4488
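The upload estimate in the message above can be checked with a few lines of arithmetic. This is a back-of-the-envelope sketch, not code from the commit: the function names are made up for illustration, and the CBOR figure assumes the nodes are sent as definite-length byte strings inside a single definite-length array, which may not match how the command actually encodes its arguments.

```python
NODE_SIZE = 20  # bytes in a binary node, per the commit message


def raw_upload_bytes(node_count):
    """Bytes needed to transmit the nodes themselves, with no encoding."""
    return node_count * NODE_SIZE


def cbor_array_upload_bytes(node_count):
    """Estimate for one CBOR array of 20-byte byte strings (an assumption).

    CBOR stores lengths below 24 in the initial byte, so each node costs
    one extra header byte. The array header is 1 byte plus 0, 1, 2, 4 or
    8 bytes for the element count, depending on its magnitude.
    """
    if node_count < 24:
        array_header = 1
    elif node_count < 2 ** 8:
        array_header = 2
    elif node_count < 2 ** 16:
        array_header = 3
    elif node_count < 2 ** 32:
        array_header = 5
    else:
        array_header = 9
    return array_header + node_count * (1 + NODE_SIZE)


if __name__ == '__main__':
    for count in (1000, 100000):
        print(count, raw_upload_bytes(count), cbor_array_upload_bytes(count))
```

For 100,000 nodes this gives 2,000,000 raw bytes, the 2 MB quoted above. Under the assumed CBOR layout the encoding adds roughly 5% on top of that, before any frame headers.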

File last commit:

r37744:0c184ca5 default
r39673:c7a7c7e8 default
test-wireproto-serverreactor.py
490 lines | 19.0 KiB | text/x-python | PythonLexer
/ tests / test-wireproto-serverreactor.py
from __future__ import absolute_import, print_function

import unittest

from mercurial.thirdparty import (
    cbor,
)
from mercurial import (
    util,
    wireprotoframing as framing,
)

ffs = framing.makeframefromhumanstring

OK = cbor.dumps({b'status': b'ok'})

def makereactor(deferoutput=False):
    return framing.serverreactor(deferoutput=deferoutput)

def sendframes(reactor, gen):
    """Send a generator of frame bytearray to a reactor.

    Emits a generator of results from ``onframerecv()`` calls.
    """
    for frame in gen:
        header = framing.parseheader(frame)
        payload = frame[framing.FRAME_HEADER_SIZE:]
        assert len(payload) == header.length

        yield reactor.onframerecv(framing.frame(header.requestid,
                                                header.streamid,
                                                header.streamflags,
                                                header.typeid,
                                                header.flags,
                                                payload))

def sendcommandframes(reactor, stream, rid, cmd, args, datafh=None):
    """Generate frames to run a command and send them to a reactor."""
    return sendframes(reactor,
                      framing.createcommandframes(stream, rid, cmd, args,
                                                  datafh))


class ServerReactorTests(unittest.TestCase):
    def _sendsingleframe(self, reactor, f):
        results = list(sendframes(reactor, [f]))
        self.assertEqual(len(results), 1)

        return results[0]

    def assertaction(self, res, expected):
        self.assertIsInstance(res, tuple)
        self.assertEqual(len(res), 2)
        self.assertIsInstance(res[1], dict)
        self.assertEqual(res[0], expected)

    def assertframesequal(self, frames, framestrings):
        expected = [ffs(s) for s in framestrings]
        self.assertEqual(list(frames), expected)

    def test1framecommand(self):
        """Receiving a command in a single frame yields request to run it."""
        reactor = makereactor()
        stream = framing.stream(1)
        results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
        self.assertEqual(len(results), 1)
        self.assertaction(results[0], b'runcommand')
        self.assertEqual(results[0][1], {
            b'requestid': 1,
            b'command': b'mycommand',
            b'args': {},
            b'data': None,
        })

        result = reactor.oninputeof()
        self.assertaction(result, b'noop')

    def test1argument(self):
        reactor = makereactor()
        stream = framing.stream(1)
        results = list(sendcommandframes(reactor, stream, 41, b'mycommand',
                                         {b'foo': b'bar'}))
        self.assertEqual(len(results), 1)
        self.assertaction(results[0], b'runcommand')
        self.assertEqual(results[0][1], {
            b'requestid': 41,
            b'command': b'mycommand',
            b'args': {b'foo': b'bar'},
            b'data': None,
        })

    def testmultiarguments(self):
        reactor = makereactor()
        stream = framing.stream(1)
        results = list(sendcommandframes(reactor, stream, 1, b'mycommand',
                                         {b'foo': b'bar', b'biz': b'baz'}))
        self.assertEqual(len(results), 1)
        self.assertaction(results[0], b'runcommand')
        self.assertEqual(results[0][1], {
            b'requestid': 1,
            b'command': b'mycommand',
            b'args': {b'foo': b'bar', b'biz': b'baz'},
            b'data': None,
        })

    def testsimplecommanddata(self):
        reactor = makereactor()
        stream = framing.stream(1)
        results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {},
                                         util.bytesio(b'data!')))
        self.assertEqual(len(results), 2)
        self.assertaction(results[0], b'wantframe')
        self.assertaction(results[1], b'runcommand')
        self.assertEqual(results[1][1], {
            b'requestid': 1,
            b'command': b'mycommand',
            b'args': {},
            b'data': b'data!',
        })

    def testmultipledataframes(self):
        frames = [
            ffs(b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'mycommand'}"),
            ffs(b'1 1 0 command-data continuation data1'),
            ffs(b'1 1 0 command-data continuation data2'),
            ffs(b'1 1 0 command-data eos data3'),
        ]

        reactor = makereactor()
        results = list(sendframes(reactor, frames))
        self.assertEqual(len(results), 4)
        for i in range(3):
            self.assertaction(results[i], b'wantframe')
        self.assertaction(results[3], b'runcommand')
        self.assertEqual(results[3][1], {
            b'requestid': 1,
            b'command': b'mycommand',
            b'args': {},
            b'data': b'data1data2data3',
        })

    def testargumentanddata(self):
        frames = [
            ffs(b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command', b'args': {b'key': b'val',"
                b"b'foo': b'bar'}}"),
            ffs(b'1 1 0 command-data continuation value1'),
            ffs(b'1 1 0 command-data eos value2'),
        ]

        reactor = makereactor()
        results = list(sendframes(reactor, frames))

        self.assertaction(results[-1], b'runcommand')
        self.assertEqual(results[-1][1], {
            b'requestid': 1,
            b'command': b'command',
            b'args': {
                b'key': b'val',
                b'foo': b'bar',
            },
            b'data': b'value1value2',
        })

    def testnewandcontinuation(self):
        result = self._sendsingleframe(makereactor(),
            ffs(b'1 1 stream-begin command-request new|continuation '))
        self.assertaction(result, b'error')
        self.assertEqual(result[1], {
            b'message': b'received command request frame with both new and '
                        b'continuation flags set',
        })

    def testneithernewnorcontinuation(self):
        result = self._sendsingleframe(makereactor(),
            ffs(b'1 1 stream-begin command-request 0 '))
        self.assertaction(result, b'error')
        self.assertEqual(result[1], {
            b'message': b'received command request frame with neither new nor '
                        b'continuation flags set',
        })

    def testunexpectedcommanddata(self):
        """Command data frame when not running a command is an error."""
        result = self._sendsingleframe(makereactor(),
            ffs(b'1 1 stream-begin command-data 0 ignored'))
        self.assertaction(result, b'error')
        self.assertEqual(result[1], {
            b'message': b'expected command request frame; got 2',
        })

    def testunexpectedcommanddatareceiving(self):
        """Same as above except the command is receiving."""
        results = list(sendframes(makereactor(), [
            ffs(b'1 1 stream-begin command-request new|more '
                b"cbor:{b'name': b'ignored'}"),
            ffs(b'1 1 0 command-data eos ignored'),
        ]))

        self.assertaction(results[0], b'wantframe')
        self.assertaction(results[1], b'error')
        self.assertEqual(results[1][1], {
            b'message': b'received command data frame for request that is not '
                        b'expecting data: 1',
        })

    def testconflictingrequestidallowed(self):
        """Multiple fully serviced commands with same request ID is allowed."""
        reactor = makereactor()
        results = []
        outstream = reactor.makeoutputstream()
        results.append(self._sendsingleframe(
            reactor, ffs(b'1 1 stream-begin command-request new '
                         b"cbor:{b'name': b'command'}")))
        result = reactor.oncommandresponseready(outstream, 1, b'response1')
        self.assertaction(result, b'sendframes')
        list(result[1][b'framegen'])
        results.append(self._sendsingleframe(
            reactor, ffs(b'1 1 stream-begin command-request new '
                         b"cbor:{b'name': b'command'}")))
        result = reactor.oncommandresponseready(outstream, 1, b'response2')
        self.assertaction(result, b'sendframes')
        list(result[1][b'framegen'])
        results.append(self._sendsingleframe(
            reactor, ffs(b'1 1 stream-begin command-request new '
                         b"cbor:{b'name': b'command'}")))
        result = reactor.oncommandresponseready(outstream, 1, b'response3')
        self.assertaction(result, b'sendframes')
        list(result[1][b'framegen'])

        for i in range(3):
            self.assertaction(results[i], b'runcommand')
            self.assertEqual(results[i][1], {
                b'requestid': 1,
                b'command': b'command',
                b'args': {},
                b'data': None,
            })

    def testconflictingrequestid(self):
        """Request ID for new command matching in-flight command is illegal."""
        results = list(sendframes(makereactor(), [
            ffs(b'1 1 stream-begin command-request new|more '
                b"cbor:{b'name': b'command'}"),
            ffs(b'1 1 0 command-request new '
                b"cbor:{b'name': b'command1'}"),
        ]))

        self.assertaction(results[0], b'wantframe')
        self.assertaction(results[1], b'error')
        self.assertEqual(results[1][1], {
            b'message': b'request with ID 1 already received',
        })

    def testinterleavedcommands(self):
        cbor1 = cbor.dumps({
            b'name': b'command1',
            b'args': {
                b'foo': b'bar',
                b'key1': b'val',
            }
        }, canonical=True)
        cbor3 = cbor.dumps({
            b'name': b'command3',
            b'args': {
                b'biz': b'baz',
                b'key': b'val',
            },
        }, canonical=True)

        results = list(sendframes(makereactor(), [
            ffs(b'1 1 stream-begin command-request new|more %s' % cbor1[0:6]),
            ffs(b'3 1 0 command-request new|more %s' % cbor3[0:10]),
            ffs(b'1 1 0 command-request continuation|more %s' % cbor1[6:9]),
            ffs(b'3 1 0 command-request continuation|more %s' % cbor3[10:13]),
            ffs(b'3 1 0 command-request continuation %s' % cbor3[13:]),
            ffs(b'1 1 0 command-request continuation %s' % cbor1[9:]),
        ]))

        self.assertEqual([t[0] for t in results], [
            b'wantframe',
            b'wantframe',
            b'wantframe',
            b'wantframe',
            b'runcommand',
            b'runcommand',
        ])

        self.assertEqual(results[4][1], {
            b'requestid': 3,
            b'command': b'command3',
            b'args': {b'biz': b'baz', b'key': b'val'},
            b'data': None,
        })
        self.assertEqual(results[5][1], {
            b'requestid': 1,
            b'command': b'command1',
            b'args': {b'foo': b'bar', b'key1': b'val'},
            b'data': None,
        })

    def testmissingcommanddataframe(self):
        # The reactor doesn't currently handle partially received commands.
        # So this test is failing to do anything with request 1.
        frames = [
            ffs(b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command1'}"),
            ffs(b'3 1 0 command-request new '
                b"cbor:{b'name': b'command2'}"),
        ]
        results = list(sendframes(makereactor(), frames))
        self.assertEqual(len(results), 2)
        self.assertaction(results[0], b'wantframe')
        self.assertaction(results[1], b'runcommand')

    def testmissingcommanddataframeflags(self):
        frames = [
            ffs(b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command1'}"),
            ffs(b'1 1 0 command-data 0 data'),
        ]
        results = list(sendframes(makereactor(), frames))
        self.assertEqual(len(results), 2)
        self.assertaction(results[0], b'wantframe')
        self.assertaction(results[1], b'error')
        self.assertEqual(results[1][1], {
            b'message': b'command data frame without flags',
        })

    def testframefornonreceivingrequest(self):
        """Receiving a frame for a command that is not receiving is illegal."""
        results = list(sendframes(makereactor(), [
            ffs(b'1 1 stream-begin command-request new '
                b"cbor:{b'name': b'command1'}"),
            ffs(b'3 1 0 command-request new|have-data '
                b"cbor:{b'name': b'command3'}"),
            ffs(b'5 1 0 command-data eos ignored'),
        ]))
        self.assertaction(results[2], b'error')
        self.assertEqual(results[2][1], {
            b'message': b'received frame for request that is not receiving: 5',
        })

    def testsimpleresponse(self):
        """Bytes response to command sends result frames."""
        reactor = makereactor()
        instream = framing.stream(1)
        list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))

        outstream = reactor.makeoutputstream()
        result = reactor.oncommandresponseready(outstream, 1, b'response')
        self.assertaction(result, b'sendframes')
        self.assertframesequal(result[1][b'framegen'], [
            b'1 2 stream-begin command-response eos %sresponse' % OK,
        ])

    def testmultiframeresponse(self):
        """Bytes response spanning multiple frames is handled."""
        first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
        second = b'y' * 100

        reactor = makereactor()
        instream = framing.stream(1)
        list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))

        outstream = reactor.makeoutputstream()
        result = reactor.oncommandresponseready(outstream, 1, first + second)
        self.assertaction(result, b'sendframes')
        self.assertframesequal(result[1][b'framegen'], [
            b'1 2 stream-begin command-response continuation %s' % OK,
            b'1 2 0 command-response continuation %s' % first,
            b'1 2 0 command-response eos %s' % second,
        ])

    def testservererror(self):
        reactor = makereactor()
        instream = framing.stream(1)
        list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))

        outstream = reactor.makeoutputstream()
        result = reactor.onservererror(outstream, 1, b'some message')
        self.assertaction(result, b'sendframes')
        self.assertframesequal(result[1][b'framegen'], [
            b"1 2 stream-begin error-response 0 "
            b"cbor:{b'type': b'server', "
            b"b'message': [{b'msg': b'some message'}]}",
        ])

    def test1commanddeferresponse(self):
        """Responses when in deferred output mode are delayed until EOF."""
        reactor = makereactor(deferoutput=True)
        instream = framing.stream(1)
        results = list(sendcommandframes(reactor, instream, 1, b'mycommand',
                                         {}))
        self.assertEqual(len(results), 1)
        self.assertaction(results[0], b'runcommand')

        outstream = reactor.makeoutputstream()
        result = reactor.oncommandresponseready(outstream, 1, b'response')
        self.assertaction(result, b'noop')
        result = reactor.oninputeof()
        self.assertaction(result, b'sendframes')
        self.assertframesequal(result[1][b'framegen'], [
            b'1 2 stream-begin command-response eos %sresponse' % OK,
        ])

    def testmultiplecommanddeferresponse(self):
        reactor = makereactor(deferoutput=True)
        instream = framing.stream(1)
        list(sendcommandframes(reactor, instream, 1, b'command1', {}))
        list(sendcommandframes(reactor, instream, 3, b'command2', {}))

        outstream = reactor.makeoutputstream()
        result = reactor.oncommandresponseready(outstream, 1, b'response1')
        self.assertaction(result, b'noop')
        result = reactor.oncommandresponseready(outstream, 3, b'response2')
        self.assertaction(result, b'noop')
        result = reactor.oninputeof()
        self.assertaction(result, b'sendframes')
        self.assertframesequal(result[1][b'framegen'], [
            b'1 2 stream-begin command-response eos %sresponse1' % OK,
            b'3 2 0 command-response eos %sresponse2' % OK,
        ])
def testrequestidtracking(self):
reactor = makereactor(deferoutput=True)
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 instream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
list(sendcommandframes(reactor, instream, 3, b'command2', {}))
list(sendcommandframes(reactor, instream, 5, b'command3', {}))
Gregory Szorc
wireproto: add request IDs to frames...
r37075
# Register results for commands out of order.
Gregory Szorc
wireproto: explicit API to create outgoing streams...
r37305 outstream = reactor.makeoutputstream()
Gregory Szorc
wireprotov2: change frame type and name for command response...
r37742 reactor.oncommandresponseready(outstream, 3, b'response3')
reactor.oncommandresponseready(outstream, 1, b'response1')
reactor.oncommandresponseready(outstream, 5, b'response5')
Gregory Szorc
wireproto: add request IDs to frames...
r37075
result = reactor.oninputeof()
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 self.assertaction(result, b'sendframes')
self.assertframesequal(result[1][b'framegen'], [
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 b'3 2 stream-begin command-response eos %sresponse3' % OK,
b'1 2 0 command-response eos %sresponse1' % OK,
b'5 2 0 command-response eos %sresponse5' % OK,
Gregory Szorc
wireproto: buffer output frames when in half duplex mode...
r37074 ])
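The ordering that testrequestidtracking checks can be illustrated with a small toy model. This is only a sketch, not Mercurial's reactor: the class `ToyDeferredReactor` and its tuple-shaped frames are hypothetical, and it assumes only what the test above shows, namely that with deferred output each registered response yields a no-op, and that end of input flushes the buffered responses in registration order with the first frame marked `stream-begin`.

```python
class ToyDeferredReactor:
    """Hypothetical stand-in for a reactor created with deferoutput=True."""

    def __init__(self):
        # (requestid, payload) pairs, kept in the order they were registered.
        self._buffered = []

    def oncommandresponseready(self, requestid, payload):
        # Output is deferred: remember the response and emit nothing yet.
        self._buffered.append((requestid, payload))
        return 'noop', {}

    def oninputeof(self):
        # Input is finished, so hand back everything buffered so far.
        pending, self._buffered = self._buffered, []

        def framegen():
            for index, (requestid, payload) in enumerate(pending):
                # Only the first frame on the stream carries stream-begin.
                streamflag = 'stream-begin' if index == 0 else '0'
                yield (requestid, streamflag, payload)

        return 'sendframes', {'framegen': framegen()}


reactor = ToyDeferredReactor()
# Register results out of request order, as the test does.
actions = [reactor.oncommandresponseready(rid, b'response%d' % rid)[0]
           for rid in (3, 1, 5)]
action, meta = reactor.oninputeof()
frames = list(meta['framegen'])
```

The frames come out in the order the responses were registered (3, 1, 5), not in request ID order, which is the property the assertion on the frame list pins down.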
Gregory Szorc
wireproto: explicitly track which requests are active...
r37081 def testduplicaterequestonactivecommand(self):
"""Receiving a request ID that matches a request that isn't finished."""
reactor = makereactor()
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 stream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 list(sendcommandframes(reactor, stream, 1, b'command1', {}))
results = list(sendcommandframes(reactor, stream, 1, b'command1', {}))
Gregory Szorc
wireproto: explicitly track which requests are active...
r37081
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 self.assertaction(results[0], b'error')
Gregory Szorc
wireproto: explicitly track which requests are active...
r37081 self.assertEqual(results[0][1], {
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 b'message': b'request with ID 1 is already active',
Gregory Szorc
wireproto: explicitly track which requests are active...
r37081 })
def testduplicaterequestonactivecommandnosend(self):
"""Same as above but we've registered a response but haven't sent it."""
reactor = makereactor()
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 instream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
Gregory Szorc
wireproto: explicit API to create outgoing streams...
r37305 outstream = reactor.makeoutputstream()
Gregory Szorc
wireprotov2: change frame type and name for command response...
r37742 reactor.oncommandresponseready(outstream, 1, b'response')
Gregory Szorc
wireproto: explicitly track which requests are active...
r37081
# We've registered the response but haven't sent it. From the
# perspective of the reactor, the command is still active.
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 self.assertaction(results[0], b'error')
Gregory Szorc
wireproto: explicitly track which requests are active...
r37081 self.assertEqual(results[0][1], {
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 b'message': b'request with ID 1 is already active',
Gregory Szorc
wireproto: explicitly track which requests are active...
r37081 })
def testduplicaterequestaftersend(self):
"""We can use a duplicate request ID after we've sent the response."""
reactor = makereactor()
Gregory Szorc
wireproto: add streams to frame-based protocol...
r37304 instream = framing.stream(1)
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
Gregory Szorc
wireproto: explicit API to create outgoing streams...
r37305 outstream = reactor.makeoutputstream()
Gregory Szorc
wireprotov2: change frame type and name for command response...
r37742 res = reactor.oncommandresponseready(outstream, 1, b'response')
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 list(res[1][b'framegen'])
Gregory Szorc
wireproto: explicitly track which requests are active...
r37081
Gregory Szorc
wireproto: start to associate frame generation with a stream...
r37303 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
Augie Fackler
tests: add all missing b prefixes in reactor tests...
r37700 self.assertaction(results[0], b'runcommand')
Gregory Szorc
wireproto: explicitly track which requests are active...
r37081
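The three duplicate-request tests above describe one rule: a request ID stays active until its response frames have actually been generated, and only then may it be reused. A minimal sketch of that rule follows, assuming nothing beyond what the tests assert. `ToyRequestTracker`, `oncommandrequest`, and the tuple frames are hypothetical names for illustration and are not the real reactor API.

```python
class ToyRequestTracker:
    """Hypothetical model of active request ID tracking."""

    def __init__(self):
        self._active = set()

    def oncommandrequest(self, requestid):
        # A request ID that is still active cannot be issued again.
        if requestid in self._active:
            return 'error', {
                'message': 'request with ID %d is already active' % requestid,
            }
        self._active.add(requestid)
        return 'runcommand', {'requestid': requestid}

    def oncommandresponseready(self, requestid, payload):
        def framegen():
            yield (requestid, payload)
            # The ID is released only once the frames have been consumed.
            self._active.discard(requestid)

        return 'sendframes', {'framegen': framegen()}


tracker = ToyRequestTracker()
first = tracker.oncommandrequest(1)
duplicate = tracker.oncommandrequest(1)

# Registering a response is not enough; the frames are not sent yet.
_, meta = tracker.oncommandresponseready(1, b'response')
still_active = tracker.oncommandrequest(1)

# Consuming the frame generator finishes the request and frees the ID.
sent = list(meta['framegen'])
reused = tracker.oncommandrequest(1)
```

Releasing the ID inside the generator rather than in `oncommandresponseready` is the design choice that makes the "registered but not sent" case still count as active.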
Gregory Szorc
wireproto: implement basic frame reading and processing...
r37070 if __name__ == '__main__':
import silenttestrunner
silenttestrunner.main(__name__)