Show More
@@ -551,18 +551,19 class httpv2peer(object): | |||||
551 |
|
551 | |||
552 | self.ui.note(_('received %r\n') % frame) |
|
552 | self.ui.note(_('received %r\n') % frame) | |
553 |
|
553 | |||
554 | if frame.typeid == wireprotoframing.FRAME_TYPE_BYTES_RESPONSE: |
|
554 | action, meta = reactor.onframerecv(frame) | |
555 | if frame.flags & wireprotoframing.FLAG_BYTES_RESPONSE_CBOR: |
|
555 | ||
556 | payload = util.bytesio(frame.payload) |
|
556 | if action == 'responsedata': | |
|
557 | if meta['cbor']: | |||
|
558 | payload = util.bytesio(meta['data']) | |||
557 |
|
559 | |||
558 | decoder = cbor.CBORDecoder(payload) |
|
560 | decoder = cbor.CBORDecoder(payload) | |
559 |
while payload.tell() + 1 < len( |
|
561 | while payload.tell() + 1 < len(meta['data']): | |
560 | results.append(decoder.decode()) |
|
562 | results.append(decoder.decode()) | |
561 | else: |
|
563 | else: | |
562 |
results.append( |
|
564 | results.append(meta['data']) | |
563 | else: |
|
565 | else: | |
564 |
error.ProgrammingError('unhandled |
|
566 | error.ProgrammingError('unhandled action: %s' % action) | |
565 | frame.typeid) |
|
|||
566 |
|
567 | |||
567 | return results |
|
568 | return results | |
568 |
|
569 |
@@ -922,6 +922,7 class clientreactor(object): | |||||
922 | self._outgoingstream = stream(1) |
|
922 | self._outgoingstream = stream(1) | |
923 | self._pendingrequests = collections.deque() |
|
923 | self._pendingrequests = collections.deque() | |
924 | self._activerequests = {} |
|
924 | self._activerequests = {} | |
|
925 | self._incomingstreams = {} | |||
925 |
|
926 | |||
926 | def callcommand(self, name, args, datafh=None): |
|
927 | def callcommand(self, name, args, datafh=None): | |
927 | """Request that a command be executed. |
|
928 | """Request that a command be executed. | |
@@ -1007,3 +1008,63 class clientreactor(object): | |||||
1007 | yield frame |
|
1008 | yield frame | |
1008 |
|
1009 | |||
1009 | request.state = 'sent' |
|
1010 | request.state = 'sent' | |
|
1011 | ||||
|
1012 | def onframerecv(self, frame): | |||
|
1013 | """Process a frame that has been received off the wire. | |||
|
1014 | ||||
|
1015 | Returns a 2-tuple of (action, meta) describing further action the | |||
|
1016 | caller needs to take as a result of receiving this frame. | |||
|
1017 | """ | |||
|
1018 | if frame.streamid % 2: | |||
|
1019 | return 'error', { | |||
|
1020 | 'message': ( | |||
|
1021 | _('received frame with odd numbered stream ID: %d') % | |||
|
1022 | frame.streamid), | |||
|
1023 | } | |||
|
1024 | ||||
|
1025 | if frame.streamid not in self._incomingstreams: | |||
|
1026 | if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM: | |||
|
1027 | return 'error', { | |||
|
1028 | 'message': _('received frame on unknown stream ' | |||
|
1029 | 'without beginning of stream flag set'), | |||
|
1030 | } | |||
|
1031 | ||||
|
1032 | if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED: | |||
|
1033 | raise error.ProgrammingError('support for decoding stream ' | |||
|
1034 | 'payloads not yet implemneted') | |||
|
1035 | ||||
|
1036 | if frame.streamflags & STREAM_FLAG_END_STREAM: | |||
|
1037 | del self._incomingstreams[frame.streamid] | |||
|
1038 | ||||
|
1039 | if frame.requestid not in self._activerequests: | |||
|
1040 | return 'error', { | |||
|
1041 | 'message': (_('received frame for inactive request ID: %d') % | |||
|
1042 | frame.requestid), | |||
|
1043 | } | |||
|
1044 | ||||
|
1045 | request = self._activerequests[frame.requestid] | |||
|
1046 | request.state = 'receiving' | |||
|
1047 | ||||
|
1048 | handlers = { | |||
|
1049 | FRAME_TYPE_BYTES_RESPONSE: self._onbytesresponseframe, | |||
|
1050 | } | |||
|
1051 | ||||
|
1052 | meth = handlers.get(frame.typeid) | |||
|
1053 | if not meth: | |||
|
1054 | raise error.ProgrammingError('unhandled frame type: %d' % | |||
|
1055 | frame.typeid) | |||
|
1056 | ||||
|
1057 | return meth(request, frame) | |||
|
1058 | ||||
|
1059 | def _onbytesresponseframe(self, request, frame): | |||
|
1060 | if frame.flags & FLAG_BYTES_RESPONSE_EOS: | |||
|
1061 | request.state = 'received' | |||
|
1062 | del self._activerequests[request.requestid] | |||
|
1063 | ||||
|
1064 | return 'responsedata', { | |||
|
1065 | 'request': request, | |||
|
1066 | 'expectmore': frame.flags & FLAG_BYTES_RESPONSE_CONTINUATION, | |||
|
1067 | 'eos': frame.flags & FLAG_BYTES_RESPONSE_EOS, | |||
|
1068 | 'cbor': frame.flags & FLAG_BYTES_RESPONSE_CBOR, | |||
|
1069 | 'data': frame.payload, | |||
|
1070 | } |
@@ -7,6 +7,21 from mercurial import ( | |||||
7 | wireprotoframing as framing, |
|
7 | wireprotoframing as framing, | |
8 | ) |
|
8 | ) | |
9 |
|
9 | |||
|
10 | ffs = framing.makeframefromhumanstring | |||
|
11 | ||||
|
12 | def sendframe(reactor, frame): | |||
|
13 | """Send a frame bytearray to a reactor.""" | |||
|
14 | header = framing.parseheader(frame) | |||
|
15 | payload = frame[framing.FRAME_HEADER_SIZE:] | |||
|
16 | assert len(payload) == header.length | |||
|
17 | ||||
|
18 | return reactor.onframerecv(framing.frame(header.requestid, | |||
|
19 | header.streamid, | |||
|
20 | header.streamflags, | |||
|
21 | header.typeid, | |||
|
22 | header.flags, | |||
|
23 | payload)) | |||
|
24 | ||||
10 | class SingleSendTests(unittest.TestCase): |
|
25 | class SingleSendTests(unittest.TestCase): | |
11 | """A reactor that can only send once rejects subsequent sends.""" |
|
26 | """A reactor that can only send once rejects subsequent sends.""" | |
12 | def testbasic(self): |
|
27 | def testbasic(self): | |
@@ -61,6 +76,35 class NoBufferTests(unittest.TestCase): | |||||
61 |
|
76 | |||
62 | self.assertEqual(request.state, 'sent') |
|
77 | self.assertEqual(request.state, 'sent') | |
63 |
|
78 | |||
|
79 | class BadFrameRecvTests(unittest.TestCase): | |||
|
80 | def testoddstream(self): | |||
|
81 | reactor = framing.clientreactor() | |||
|
82 | ||||
|
83 | action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo')) | |||
|
84 | self.assertEqual(action, 'error') | |||
|
85 | self.assertEqual(meta['message'], | |||
|
86 | 'received frame with odd numbered stream ID: 1') | |||
|
87 | ||||
|
88 | def testunknownstream(self): | |||
|
89 | reactor = framing.clientreactor() | |||
|
90 | ||||
|
91 | action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo')) | |||
|
92 | self.assertEqual(action, 'error') | |||
|
93 | self.assertEqual(meta['message'], | |||
|
94 | 'received frame on unknown stream without beginning ' | |||
|
95 | 'of stream flag set') | |||
|
96 | ||||
|
97 | def testunhandledframetype(self): | |||
|
98 | reactor = framing.clientreactor(buffersends=False) | |||
|
99 | ||||
|
100 | request, action, meta = reactor.callcommand(b'foo', {}) | |||
|
101 | for frame in meta['framegen']: | |||
|
102 | pass | |||
|
103 | ||||
|
104 | with self.assertRaisesRegexp(error.ProgrammingError, | |||
|
105 | 'unhandled frame type'): | |||
|
106 | sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo')) | |||
|
107 | ||||
64 | if __name__ == '__main__': |
|
108 | if __name__ == '__main__': | |
65 | import silenttestrunner |
|
109 | import silenttestrunner | |
66 | silenttestrunner.main(__name__) |
|
110 | silenttestrunner.main(__name__) |
General Comments 0
You need to be logged in to leave comments.
Login now