##// END OF EJS Templates
wireproto: client reactor support for receiving frames...
Gregory Szorc -
r37562:55b5ba8d default
parent child Browse files
Show More
@@ -551,18 +551,19 class httpv2peer(object):
551
551
552 self.ui.note(_('received %r\n') % frame)
552 self.ui.note(_('received %r\n') % frame)
553
553
554 if frame.typeid == wireprotoframing.FRAME_TYPE_BYTES_RESPONSE:
554 action, meta = reactor.onframerecv(frame)
555 if frame.flags & wireprotoframing.FLAG_BYTES_RESPONSE_CBOR:
555
556 payload = util.bytesio(frame.payload)
556 if action == 'responsedata':
557 if meta['cbor']:
558 payload = util.bytesio(meta['data'])
557
559
558 decoder = cbor.CBORDecoder(payload)
560 decoder = cbor.CBORDecoder(payload)
559 while payload.tell() + 1 < len(frame.payload):
561 while payload.tell() + 1 < len(meta['data']):
560 results.append(decoder.decode())
562 results.append(decoder.decode())
561 else:
563 else:
562 results.append(frame.payload)
564 results.append(meta['data'])
563 else:
565 else:
564 error.ProgrammingError('unhandled frame type: %d' %
566 error.ProgrammingError('unhandled action: %s' % action)
565 frame.typeid)
566
567
567 return results
568 return results
568
569
@@ -922,6 +922,7 class clientreactor(object):
922 self._outgoingstream = stream(1)
922 self._outgoingstream = stream(1)
923 self._pendingrequests = collections.deque()
923 self._pendingrequests = collections.deque()
924 self._activerequests = {}
924 self._activerequests = {}
925 self._incomingstreams = {}
925
926
926 def callcommand(self, name, args, datafh=None):
927 def callcommand(self, name, args, datafh=None):
927 """Request that a command be executed.
928 """Request that a command be executed.
@@ -1007,3 +1008,63 class clientreactor(object):
1007 yield frame
1008 yield frame
1008
1009
1009 request.state = 'sent'
1010 request.state = 'sent'
1011
1012 def onframerecv(self, frame):
1013 """Process a frame that has been received off the wire.
1014
1015 Returns a 2-tuple of (action, meta) describing further action the
1016 caller needs to take as a result of receiving this frame.
1017 """
1018 if frame.streamid % 2:
1019 return 'error', {
1020 'message': (
1021 _('received frame with odd numbered stream ID: %d') %
1022 frame.streamid),
1023 }
1024
1025 if frame.streamid not in self._incomingstreams:
1026 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1027 return 'error', {
1028 'message': _('received frame on unknown stream '
1029 'without beginning of stream flag set'),
1030 }
1031
1032 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1033 raise error.ProgrammingError('support for decoding stream '
1034 'payloads not yet implemneted')
1035
1036 if frame.streamflags & STREAM_FLAG_END_STREAM:
1037 del self._incomingstreams[frame.streamid]
1038
1039 if frame.requestid not in self._activerequests:
1040 return 'error', {
1041 'message': (_('received frame for inactive request ID: %d') %
1042 frame.requestid),
1043 }
1044
1045 request = self._activerequests[frame.requestid]
1046 request.state = 'receiving'
1047
1048 handlers = {
1049 FRAME_TYPE_BYTES_RESPONSE: self._onbytesresponseframe,
1050 }
1051
1052 meth = handlers.get(frame.typeid)
1053 if not meth:
1054 raise error.ProgrammingError('unhandled frame type: %d' %
1055 frame.typeid)
1056
1057 return meth(request, frame)
1058
1059 def _onbytesresponseframe(self, request, frame):
1060 if frame.flags & FLAG_BYTES_RESPONSE_EOS:
1061 request.state = 'received'
1062 del self._activerequests[request.requestid]
1063
1064 return 'responsedata', {
1065 'request': request,
1066 'expectmore': frame.flags & FLAG_BYTES_RESPONSE_CONTINUATION,
1067 'eos': frame.flags & FLAG_BYTES_RESPONSE_EOS,
1068 'cbor': frame.flags & FLAG_BYTES_RESPONSE_CBOR,
1069 'data': frame.payload,
1070 }
@@ -7,6 +7,21 from mercurial import (
7 wireprotoframing as framing,
7 wireprotoframing as framing,
8 )
8 )
9
9
10 ffs = framing.makeframefromhumanstring
11
12 def sendframe(reactor, frame):
13 """Send a frame bytearray to a reactor."""
14 header = framing.parseheader(frame)
15 payload = frame[framing.FRAME_HEADER_SIZE:]
16 assert len(payload) == header.length
17
18 return reactor.onframerecv(framing.frame(header.requestid,
19 header.streamid,
20 header.streamflags,
21 header.typeid,
22 header.flags,
23 payload))
24
10 class SingleSendTests(unittest.TestCase):
25 class SingleSendTests(unittest.TestCase):
11 """A reactor that can only send once rejects subsequent sends."""
26 """A reactor that can only send once rejects subsequent sends."""
12 def testbasic(self):
27 def testbasic(self):
@@ -61,6 +76,35 class NoBufferTests(unittest.TestCase):
61
76
62 self.assertEqual(request.state, 'sent')
77 self.assertEqual(request.state, 'sent')
63
78
79 class BadFrameRecvTests(unittest.TestCase):
80 def testoddstream(self):
81 reactor = framing.clientreactor()
82
83 action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo'))
84 self.assertEqual(action, 'error')
85 self.assertEqual(meta['message'],
86 'received frame with odd numbered stream ID: 1')
87
88 def testunknownstream(self):
89 reactor = framing.clientreactor()
90
91 action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo'))
92 self.assertEqual(action, 'error')
93 self.assertEqual(meta['message'],
94 'received frame on unknown stream without beginning '
95 'of stream flag set')
96
97 def testunhandledframetype(self):
98 reactor = framing.clientreactor(buffersends=False)
99
100 request, action, meta = reactor.callcommand(b'foo', {})
101 for frame in meta['framegen']:
102 pass
103
104 with self.assertRaisesRegexp(error.ProgrammingError,
105 'unhandled frame type'):
106 sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo'))
107
64 if __name__ == '__main__':
108 if __name__ == '__main__':
65 import silenttestrunner
109 import silenttestrunner
66 silenttestrunner.main(__name__)
110 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now