##// END OF EJS Templates
wireproto: client reactor support for receiving frames...
Gregory Szorc -
r37562:55b5ba8d default
parent child Browse files
Show More
@@ -551,18 +551,19 class httpv2peer(object):
551 551
552 552 self.ui.note(_('received %r\n') % frame)
553 553
554 if frame.typeid == wireprotoframing.FRAME_TYPE_BYTES_RESPONSE:
555 if frame.flags & wireprotoframing.FLAG_BYTES_RESPONSE_CBOR:
556 payload = util.bytesio(frame.payload)
554 action, meta = reactor.onframerecv(frame)
555
556 if action == 'responsedata':
557 if meta['cbor']:
558 payload = util.bytesio(meta['data'])
557 559
558 560 decoder = cbor.CBORDecoder(payload)
559 while payload.tell() + 1 < len(frame.payload):
561 while payload.tell() + 1 < len(meta['data']):
560 562 results.append(decoder.decode())
561 563 else:
562 results.append(frame.payload)
564 results.append(meta['data'])
563 565 else:
564 error.ProgrammingError('unhandled frame type: %d' %
565 frame.typeid)
566 error.ProgrammingError('unhandled action: %s' % action)
566 567
567 568 return results
568 569
@@ -922,6 +922,7 class clientreactor(object):
922 922 self._outgoingstream = stream(1)
923 923 self._pendingrequests = collections.deque()
924 924 self._activerequests = {}
925 self._incomingstreams = {}
925 926
926 927 def callcommand(self, name, args, datafh=None):
927 928 """Request that a command be executed.
@@ -1007,3 +1008,63 class clientreactor(object):
1007 1008 yield frame
1008 1009
1009 1010 request.state = 'sent'
1011
1012 def onframerecv(self, frame):
1013 """Process a frame that has been received off the wire.
1014
1015 Returns a 2-tuple of (action, meta) describing further action the
1016 caller needs to take as a result of receiving this frame.
1017 """
1018 if frame.streamid % 2:
1019 return 'error', {
1020 'message': (
1021 _('received frame with odd numbered stream ID: %d') %
1022 frame.streamid),
1023 }
1024
1025 if frame.streamid not in self._incomingstreams:
1026 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1027 return 'error', {
1028 'message': _('received frame on unknown stream '
1029 'without beginning of stream flag set'),
1030 }
1031
1032 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1033 raise error.ProgrammingError('support for decoding stream '
1034 'payloads not yet implemneted')
1035
1036 if frame.streamflags & STREAM_FLAG_END_STREAM:
1037 del self._incomingstreams[frame.streamid]
1038
1039 if frame.requestid not in self._activerequests:
1040 return 'error', {
1041 'message': (_('received frame for inactive request ID: %d') %
1042 frame.requestid),
1043 }
1044
1045 request = self._activerequests[frame.requestid]
1046 request.state = 'receiving'
1047
1048 handlers = {
1049 FRAME_TYPE_BYTES_RESPONSE: self._onbytesresponseframe,
1050 }
1051
1052 meth = handlers.get(frame.typeid)
1053 if not meth:
1054 raise error.ProgrammingError('unhandled frame type: %d' %
1055 frame.typeid)
1056
1057 return meth(request, frame)
1058
1059 def _onbytesresponseframe(self, request, frame):
1060 if frame.flags & FLAG_BYTES_RESPONSE_EOS:
1061 request.state = 'received'
1062 del self._activerequests[request.requestid]
1063
1064 return 'responsedata', {
1065 'request': request,
1066 'expectmore': frame.flags & FLAG_BYTES_RESPONSE_CONTINUATION,
1067 'eos': frame.flags & FLAG_BYTES_RESPONSE_EOS,
1068 'cbor': frame.flags & FLAG_BYTES_RESPONSE_CBOR,
1069 'data': frame.payload,
1070 }
@@ -7,6 +7,21 from mercurial import (
7 7 wireprotoframing as framing,
8 8 )
9 9
10 ffs = framing.makeframefromhumanstring
11
12 def sendframe(reactor, frame):
13 """Send a frame bytearray to a reactor."""
14 header = framing.parseheader(frame)
15 payload = frame[framing.FRAME_HEADER_SIZE:]
16 assert len(payload) == header.length
17
18 return reactor.onframerecv(framing.frame(header.requestid,
19 header.streamid,
20 header.streamflags,
21 header.typeid,
22 header.flags,
23 payload))
24
10 25 class SingleSendTests(unittest.TestCase):
11 26 """A reactor that can only send once rejects subsequent sends."""
12 27 def testbasic(self):
@@ -61,6 +76,35 class NoBufferTests(unittest.TestCase):
61 76
62 77 self.assertEqual(request.state, 'sent')
63 78
79 class BadFrameRecvTests(unittest.TestCase):
80 def testoddstream(self):
81 reactor = framing.clientreactor()
82
83 action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo'))
84 self.assertEqual(action, 'error')
85 self.assertEqual(meta['message'],
86 'received frame with odd numbered stream ID: 1')
87
88 def testunknownstream(self):
89 reactor = framing.clientreactor()
90
91 action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo'))
92 self.assertEqual(action, 'error')
93 self.assertEqual(meta['message'],
94 'received frame on unknown stream without beginning '
95 'of stream flag set')
96
97 def testunhandledframetype(self):
98 reactor = framing.clientreactor(buffersends=False)
99
100 request, action, meta = reactor.callcommand(b'foo', {})
101 for frame in meta['framegen']:
102 pass
103
104 with self.assertRaisesRegexp(error.ProgrammingError,
105 'unhandled frame type'):
106 sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo'))
107
64 108 if __name__ == '__main__':
65 109 import silenttestrunner
66 110 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now