Show More
@@ -551,18 +551,19 class httpv2peer(object): | |||
|
551 | 551 | |
|
552 | 552 | self.ui.note(_('received %r\n') % frame) |
|
553 | 553 | |
|
554 | if frame.typeid == wireprotoframing.FRAME_TYPE_BYTES_RESPONSE: | |
|
555 | if frame.flags & wireprotoframing.FLAG_BYTES_RESPONSE_CBOR: | |
|
556 | payload = util.bytesio(frame.payload) | |
|
554 | action, meta = reactor.onframerecv(frame) | |
|
555 | ||
|
556 | if action == 'responsedata': | |
|
557 | if meta['cbor']: | |
|
558 | payload = util.bytesio(meta['data']) | |
|
557 | 559 | |
|
558 | 560 | decoder = cbor.CBORDecoder(payload) |
|
559 |
while payload.tell() + 1 < len( |
|
|
561 | while payload.tell() + 1 < len(meta['data']): | |
|
560 | 562 | results.append(decoder.decode()) |
|
561 | 563 | else: |
|
562 |
results.append( |
|
|
564 | results.append(meta['data']) | |
|
563 | 565 | else: |
|
564 |
error.ProgrammingError('unhandled |
|
|
565 | frame.typeid) | |
|
566 | error.ProgrammingError('unhandled action: %s' % action) | |
|
566 | 567 | |
|
567 | 568 | return results |
|
568 | 569 |
@@ -922,6 +922,7 class clientreactor(object): | |||
|
922 | 922 | self._outgoingstream = stream(1) |
|
923 | 923 | self._pendingrequests = collections.deque() |
|
924 | 924 | self._activerequests = {} |
|
925 | self._incomingstreams = {} | |
|
925 | 926 | |
|
926 | 927 | def callcommand(self, name, args, datafh=None): |
|
927 | 928 | """Request that a command be executed. |
@@ -1007,3 +1008,63 class clientreactor(object): | |||
|
1007 | 1008 | yield frame |
|
1008 | 1009 | |
|
1009 | 1010 | request.state = 'sent' |
|
1011 | ||
|
1012 | def onframerecv(self, frame): | |
|
1013 | """Process a frame that has been received off the wire. | |
|
1014 | ||
|
1015 | Returns a 2-tuple of (action, meta) describing further action the | |
|
1016 | caller needs to take as a result of receiving this frame. | |
|
1017 | """ | |
|
1018 | if frame.streamid % 2: | |
|
1019 | return 'error', { | |
|
1020 | 'message': ( | |
|
1021 | _('received frame with odd numbered stream ID: %d') % | |
|
1022 | frame.streamid), | |
|
1023 | } | |
|
1024 | ||
|
1025 | if frame.streamid not in self._incomingstreams: | |
|
1026 | if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM: | |
|
1027 | return 'error', { | |
|
1028 | 'message': _('received frame on unknown stream ' | |
|
1029 | 'without beginning of stream flag set'), | |
|
1030 | } | |
|
1031 | ||
|
1032 | if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED: | |
|
1033 | raise error.ProgrammingError('support for decoding stream ' | |
|
1034 | 'payloads not yet implemneted') | |
|
1035 | ||
|
1036 | if frame.streamflags & STREAM_FLAG_END_STREAM: | |
|
1037 | del self._incomingstreams[frame.streamid] | |
|
1038 | ||
|
1039 | if frame.requestid not in self._activerequests: | |
|
1040 | return 'error', { | |
|
1041 | 'message': (_('received frame for inactive request ID: %d') % | |
|
1042 | frame.requestid), | |
|
1043 | } | |
|
1044 | ||
|
1045 | request = self._activerequests[frame.requestid] | |
|
1046 | request.state = 'receiving' | |
|
1047 | ||
|
1048 | handlers = { | |
|
1049 | FRAME_TYPE_BYTES_RESPONSE: self._onbytesresponseframe, | |
|
1050 | } | |
|
1051 | ||
|
1052 | meth = handlers.get(frame.typeid) | |
|
1053 | if not meth: | |
|
1054 | raise error.ProgrammingError('unhandled frame type: %d' % | |
|
1055 | frame.typeid) | |
|
1056 | ||
|
1057 | return meth(request, frame) | |
|
1058 | ||
|
1059 | def _onbytesresponseframe(self, request, frame): | |
|
1060 | if frame.flags & FLAG_BYTES_RESPONSE_EOS: | |
|
1061 | request.state = 'received' | |
|
1062 | del self._activerequests[request.requestid] | |
|
1063 | ||
|
1064 | return 'responsedata', { | |
|
1065 | 'request': request, | |
|
1066 | 'expectmore': frame.flags & FLAG_BYTES_RESPONSE_CONTINUATION, | |
|
1067 | 'eos': frame.flags & FLAG_BYTES_RESPONSE_EOS, | |
|
1068 | 'cbor': frame.flags & FLAG_BYTES_RESPONSE_CBOR, | |
|
1069 | 'data': frame.payload, | |
|
1070 | } |
@@ -7,6 +7,21 from mercurial import ( | |||
|
7 | 7 | wireprotoframing as framing, |
|
8 | 8 | ) |
|
9 | 9 | |
|
10 | ffs = framing.makeframefromhumanstring | |
|
11 | ||
|
12 | def sendframe(reactor, frame): | |
|
13 | """Send a frame bytearray to a reactor.""" | |
|
14 | header = framing.parseheader(frame) | |
|
15 | payload = frame[framing.FRAME_HEADER_SIZE:] | |
|
16 | assert len(payload) == header.length | |
|
17 | ||
|
18 | return reactor.onframerecv(framing.frame(header.requestid, | |
|
19 | header.streamid, | |
|
20 | header.streamflags, | |
|
21 | header.typeid, | |
|
22 | header.flags, | |
|
23 | payload)) | |
|
24 | ||
|
10 | 25 | class SingleSendTests(unittest.TestCase): |
|
11 | 26 | """A reactor that can only send once rejects subsequent sends.""" |
|
12 | 27 | def testbasic(self): |
@@ -61,6 +76,35 class NoBufferTests(unittest.TestCase): | |||
|
61 | 76 | |
|
62 | 77 | self.assertEqual(request.state, 'sent') |
|
63 | 78 | |
|
79 | class BadFrameRecvTests(unittest.TestCase): | |
|
80 | def testoddstream(self): | |
|
81 | reactor = framing.clientreactor() | |
|
82 | ||
|
83 | action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo')) | |
|
84 | self.assertEqual(action, 'error') | |
|
85 | self.assertEqual(meta['message'], | |
|
86 | 'received frame with odd numbered stream ID: 1') | |
|
87 | ||
|
88 | def testunknownstream(self): | |
|
89 | reactor = framing.clientreactor() | |
|
90 | ||
|
91 | action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo')) | |
|
92 | self.assertEqual(action, 'error') | |
|
93 | self.assertEqual(meta['message'], | |
|
94 | 'received frame on unknown stream without beginning ' | |
|
95 | 'of stream flag set') | |
|
96 | ||
|
97 | def testunhandledframetype(self): | |
|
98 | reactor = framing.clientreactor(buffersends=False) | |
|
99 | ||
|
100 | request, action, meta = reactor.callcommand(b'foo', {}) | |
|
101 | for frame in meta['framegen']: | |
|
102 | pass | |
|
103 | ||
|
104 | with self.assertRaisesRegexp(error.ProgrammingError, | |
|
105 | 'unhandled frame type'): | |
|
106 | sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo')) | |
|
107 | ||
|
64 | 108 | if __name__ == '__main__': |
|
65 | 109 | import silenttestrunner |
|
66 | 110 | silenttestrunner.main(__name__) |
General Comments 0
You need to be logged in to leave comments.
Login now