##// END OF EJS Templates
wireprotov2: document client reactor actions...
Gregory Szorc -
r40163:080419fa default
parent child Browse files
Show More
@@ -1,1497 +1,1531 b''
1 # wireprotoframing.py - unified framing protocol for wire protocol
1 # wireprotoframing.py - unified framing protocol for wire protocol
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 # This file contains functionality to support the unified frame-based wire
8 # This file contains functionality to support the unified frame-based wire
9 # protocol. For details about the protocol, see
9 # protocol. For details about the protocol, see
10 # `hg help internals.wireprotocol`.
10 # `hg help internals.wireprotocol`.
11
11
12 from __future__ import absolute_import
12 from __future__ import absolute_import
13
13
14 import collections
14 import collections
15 import struct
15 import struct
16
16
17 from .i18n import _
17 from .i18n import _
18 from .thirdparty import (
18 from .thirdparty import (
19 attr,
19 attr,
20 )
20 )
21 from . import (
21 from . import (
22 encoding,
22 encoding,
23 error,
23 error,
24 pycompat,
24 pycompat,
25 util,
25 util,
26 wireprototypes,
26 wireprototypes,
27 )
27 )
28 from .utils import (
28 from .utils import (
29 cborutil,
29 cborutil,
30 stringutil,
30 stringutil,
31 )
31 )
32
32
33 FRAME_HEADER_SIZE = 8
33 FRAME_HEADER_SIZE = 8
34 DEFAULT_MAX_FRAME_SIZE = 32768
34 DEFAULT_MAX_FRAME_SIZE = 32768
35
35
36 STREAM_FLAG_BEGIN_STREAM = 0x01
36 STREAM_FLAG_BEGIN_STREAM = 0x01
37 STREAM_FLAG_END_STREAM = 0x02
37 STREAM_FLAG_END_STREAM = 0x02
38 STREAM_FLAG_ENCODING_APPLIED = 0x04
38 STREAM_FLAG_ENCODING_APPLIED = 0x04
39
39
40 STREAM_FLAGS = {
40 STREAM_FLAGS = {
41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
42 b'stream-end': STREAM_FLAG_END_STREAM,
42 b'stream-end': STREAM_FLAG_END_STREAM,
43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
44 }
44 }
45
45
46 FRAME_TYPE_COMMAND_REQUEST = 0x01
46 FRAME_TYPE_COMMAND_REQUEST = 0x01
47 FRAME_TYPE_COMMAND_DATA = 0x02
47 FRAME_TYPE_COMMAND_DATA = 0x02
48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
49 FRAME_TYPE_ERROR_RESPONSE = 0x05
49 FRAME_TYPE_ERROR_RESPONSE = 0x05
50 FRAME_TYPE_TEXT_OUTPUT = 0x06
50 FRAME_TYPE_TEXT_OUTPUT = 0x06
51 FRAME_TYPE_PROGRESS = 0x07
51 FRAME_TYPE_PROGRESS = 0x07
52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
53 FRAME_TYPE_STREAM_SETTINGS = 0x09
53 FRAME_TYPE_STREAM_SETTINGS = 0x09
54
54
55 FRAME_TYPES = {
55 FRAME_TYPES = {
56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
57 b'command-data': FRAME_TYPE_COMMAND_DATA,
57 b'command-data': FRAME_TYPE_COMMAND_DATA,
58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
61 b'progress': FRAME_TYPE_PROGRESS,
61 b'progress': FRAME_TYPE_PROGRESS,
62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
64 }
64 }
65
65
66 FLAG_COMMAND_REQUEST_NEW = 0x01
66 FLAG_COMMAND_REQUEST_NEW = 0x01
67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
70
70
71 FLAGS_COMMAND_REQUEST = {
71 FLAGS_COMMAND_REQUEST = {
72 b'new': FLAG_COMMAND_REQUEST_NEW,
72 b'new': FLAG_COMMAND_REQUEST_NEW,
73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
76 }
76 }
77
77
78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
79 FLAG_COMMAND_DATA_EOS = 0x02
79 FLAG_COMMAND_DATA_EOS = 0x02
80
80
81 FLAGS_COMMAND_DATA = {
81 FLAGS_COMMAND_DATA = {
82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
83 b'eos': FLAG_COMMAND_DATA_EOS,
83 b'eos': FLAG_COMMAND_DATA_EOS,
84 }
84 }
85
85
86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
87 FLAG_COMMAND_RESPONSE_EOS = 0x02
87 FLAG_COMMAND_RESPONSE_EOS = 0x02
88
88
89 FLAGS_COMMAND_RESPONSE = {
89 FLAGS_COMMAND_RESPONSE = {
90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
92 }
92 }
93
93
94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
96
96
97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
100 }
100 }
101
101
102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
104
104
105 FLAGS_STREAM_ENCODING_SETTINGS = {
105 FLAGS_STREAM_ENCODING_SETTINGS = {
106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
108 }
108 }
109
109
110 # Maps frame types to their available flags.
110 # Maps frame types to their available flags.
111 FRAME_TYPE_FLAGS = {
111 FRAME_TYPE_FLAGS = {
112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
115 FRAME_TYPE_ERROR_RESPONSE: {},
115 FRAME_TYPE_ERROR_RESPONSE: {},
116 FRAME_TYPE_TEXT_OUTPUT: {},
116 FRAME_TYPE_TEXT_OUTPUT: {},
117 FRAME_TYPE_PROGRESS: {},
117 FRAME_TYPE_PROGRESS: {},
118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
120 }
120 }
121
121
122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
123
123
124 def humanflags(mapping, value):
124 def humanflags(mapping, value):
125 """Convert a numeric flags value to a human value, using a mapping table."""
125 """Convert a numeric flags value to a human value, using a mapping table."""
126 namemap = {v: k for k, v in mapping.iteritems()}
126 namemap = {v: k for k, v in mapping.iteritems()}
127 flags = []
127 flags = []
128 val = 1
128 val = 1
129 while value >= val:
129 while value >= val:
130 if value & val:
130 if value & val:
131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
132 val <<= 1
132 val <<= 1
133
133
134 return b'|'.join(flags)
134 return b'|'.join(flags)
135
135
136 @attr.s(slots=True)
136 @attr.s(slots=True)
137 class frameheader(object):
137 class frameheader(object):
138 """Represents the data in a frame header."""
138 """Represents the data in a frame header."""
139
139
140 length = attr.ib()
140 length = attr.ib()
141 requestid = attr.ib()
141 requestid = attr.ib()
142 streamid = attr.ib()
142 streamid = attr.ib()
143 streamflags = attr.ib()
143 streamflags = attr.ib()
144 typeid = attr.ib()
144 typeid = attr.ib()
145 flags = attr.ib()
145 flags = attr.ib()
146
146
147 @attr.s(slots=True, repr=False)
147 @attr.s(slots=True, repr=False)
148 class frame(object):
148 class frame(object):
149 """Represents a parsed frame."""
149 """Represents a parsed frame."""
150
150
151 requestid = attr.ib()
151 requestid = attr.ib()
152 streamid = attr.ib()
152 streamid = attr.ib()
153 streamflags = attr.ib()
153 streamflags = attr.ib()
154 typeid = attr.ib()
154 typeid = attr.ib()
155 flags = attr.ib()
155 flags = attr.ib()
156 payload = attr.ib()
156 payload = attr.ib()
157
157
158 @encoding.strmethod
158 @encoding.strmethod
159 def __repr__(self):
159 def __repr__(self):
160 typename = '<unknown 0x%02x>' % self.typeid
160 typename = '<unknown 0x%02x>' % self.typeid
161 for name, value in FRAME_TYPES.iteritems():
161 for name, value in FRAME_TYPES.iteritems():
162 if value == self.typeid:
162 if value == self.typeid:
163 typename = name
163 typename = name
164 break
164 break
165
165
166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
167 'type=%s; flags=%s)' % (
167 'type=%s; flags=%s)' % (
168 len(self.payload), self.requestid, self.streamid,
168 len(self.payload), self.requestid, self.streamid,
169 humanflags(STREAM_FLAGS, self.streamflags), typename,
169 humanflags(STREAM_FLAGS, self.streamflags), typename,
170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
171
171
172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
173 """Assemble a frame into a byte array."""
173 """Assemble a frame into a byte array."""
174 # TODO assert size of payload.
174 # TODO assert size of payload.
175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
176
176
177 # 24 bits length
177 # 24 bits length
178 # 16 bits request id
178 # 16 bits request id
179 # 8 bits stream id
179 # 8 bits stream id
180 # 8 bits stream flags
180 # 8 bits stream flags
181 # 4 bits type
181 # 4 bits type
182 # 4 bits flags
182 # 4 bits flags
183
183
184 l = struct.pack(r'<I', len(payload))
184 l = struct.pack(r'<I', len(payload))
185 frame[0:3] = l[0:3]
185 frame[0:3] = l[0:3]
186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
187 frame[7] = (typeid << 4) | flags
187 frame[7] = (typeid << 4) | flags
188 frame[8:] = payload
188 frame[8:] = payload
189
189
190 return frame
190 return frame
191
191
192 def makeframefromhumanstring(s):
192 def makeframefromhumanstring(s):
193 """Create a frame from a human readable string
193 """Create a frame from a human readable string
194
194
195 Strings have the form:
195 Strings have the form:
196
196
197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
198
198
199 This can be used by user-facing applications and tests for creating
199 This can be used by user-facing applications and tests for creating
200 frames easily without having to type out a bunch of constants.
200 frames easily without having to type out a bunch of constants.
201
201
202 Request ID and stream IDs are integers.
202 Request ID and stream IDs are integers.
203
203
204 Stream flags, frame type, and flags can be specified by integer or
204 Stream flags, frame type, and flags can be specified by integer or
205 named constant.
205 named constant.
206
206
207 Flags can be delimited by `|` to bitwise OR them together.
207 Flags can be delimited by `|` to bitwise OR them together.
208
208
209 If the payload begins with ``cbor:``, the following string will be
209 If the payload begins with ``cbor:``, the following string will be
210 evaluated as Python literal and the resulting object will be fed into
210 evaluated as Python literal and the resulting object will be fed into
211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
212 byte string literal.
212 byte string literal.
213 """
213 """
214 fields = s.split(b' ', 5)
214 fields = s.split(b' ', 5)
215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
216
216
217 requestid = int(requestid)
217 requestid = int(requestid)
218 streamid = int(streamid)
218 streamid = int(streamid)
219
219
220 finalstreamflags = 0
220 finalstreamflags = 0
221 for flag in streamflags.split(b'|'):
221 for flag in streamflags.split(b'|'):
222 if flag in STREAM_FLAGS:
222 if flag in STREAM_FLAGS:
223 finalstreamflags |= STREAM_FLAGS[flag]
223 finalstreamflags |= STREAM_FLAGS[flag]
224 else:
224 else:
225 finalstreamflags |= int(flag)
225 finalstreamflags |= int(flag)
226
226
227 if frametype in FRAME_TYPES:
227 if frametype in FRAME_TYPES:
228 frametype = FRAME_TYPES[frametype]
228 frametype = FRAME_TYPES[frametype]
229 else:
229 else:
230 frametype = int(frametype)
230 frametype = int(frametype)
231
231
232 finalflags = 0
232 finalflags = 0
233 validflags = FRAME_TYPE_FLAGS[frametype]
233 validflags = FRAME_TYPE_FLAGS[frametype]
234 for flag in frameflags.split(b'|'):
234 for flag in frameflags.split(b'|'):
235 if flag in validflags:
235 if flag in validflags:
236 finalflags |= validflags[flag]
236 finalflags |= validflags[flag]
237 else:
237 else:
238 finalflags |= int(flag)
238 finalflags |= int(flag)
239
239
240 if payload.startswith(b'cbor:'):
240 if payload.startswith(b'cbor:'):
241 payload = b''.join(cborutil.streamencode(
241 payload = b''.join(cborutil.streamencode(
242 stringutil.evalpythonliteral(payload[5:])))
242 stringutil.evalpythonliteral(payload[5:])))
243
243
244 else:
244 else:
245 payload = stringutil.unescapestr(payload)
245 payload = stringutil.unescapestr(payload)
246
246
247 return makeframe(requestid=requestid, streamid=streamid,
247 return makeframe(requestid=requestid, streamid=streamid,
248 streamflags=finalstreamflags, typeid=frametype,
248 streamflags=finalstreamflags, typeid=frametype,
249 flags=finalflags, payload=payload)
249 flags=finalflags, payload=payload)
250
250
251 def parseheader(data):
251 def parseheader(data):
252 """Parse a unified framing protocol frame header from a buffer.
252 """Parse a unified framing protocol frame header from a buffer.
253
253
254 The header is expected to be in the buffer at offset 0 and the
254 The header is expected to be in the buffer at offset 0 and the
255 buffer is expected to be large enough to hold a full header.
255 buffer is expected to be large enough to hold a full header.
256 """
256 """
257 # 24 bits payload length (little endian)
257 # 24 bits payload length (little endian)
258 # 16 bits request ID
258 # 16 bits request ID
259 # 8 bits stream ID
259 # 8 bits stream ID
260 # 8 bits stream flags
260 # 8 bits stream flags
261 # 4 bits frame type
261 # 4 bits frame type
262 # 4 bits frame flags
262 # 4 bits frame flags
263 # ... payload
263 # ... payload
264 framelength = data[0] + 256 * data[1] + 16384 * data[2]
264 framelength = data[0] + 256 * data[1] + 16384 * data[2]
265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
266 typeflags = data[7]
266 typeflags = data[7]
267
267
268 frametype = (typeflags & 0xf0) >> 4
268 frametype = (typeflags & 0xf0) >> 4
269 frameflags = typeflags & 0x0f
269 frameflags = typeflags & 0x0f
270
270
271 return frameheader(framelength, requestid, streamid, streamflags,
271 return frameheader(framelength, requestid, streamid, streamflags,
272 frametype, frameflags)
272 frametype, frameflags)
273
273
274 def readframe(fh):
274 def readframe(fh):
275 """Read a unified framing protocol frame from a file object.
275 """Read a unified framing protocol frame from a file object.
276
276
277 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
277 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
278 None if no frame is available. May raise if a malformed frame is
278 None if no frame is available. May raise if a malformed frame is
279 seen.
279 seen.
280 """
280 """
281 header = bytearray(FRAME_HEADER_SIZE)
281 header = bytearray(FRAME_HEADER_SIZE)
282
282
283 readcount = fh.readinto(header)
283 readcount = fh.readinto(header)
284
284
285 if readcount == 0:
285 if readcount == 0:
286 return None
286 return None
287
287
288 if readcount != FRAME_HEADER_SIZE:
288 if readcount != FRAME_HEADER_SIZE:
289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
290 (readcount, header))
290 (readcount, header))
291
291
292 h = parseheader(header)
292 h = parseheader(header)
293
293
294 payload = fh.read(h.length)
294 payload = fh.read(h.length)
295 if len(payload) != h.length:
295 if len(payload) != h.length:
296 raise error.Abort(_('frame length error: expected %d; got %d') %
296 raise error.Abort(_('frame length error: expected %d; got %d') %
297 (h.length, len(payload)))
297 (h.length, len(payload)))
298
298
299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
300 payload)
300 payload)
301
301
302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
304 redirect=None):
304 redirect=None):
305 """Create frames necessary to transmit a request to run a command.
305 """Create frames necessary to transmit a request to run a command.
306
306
307 This is a generator of bytearrays. Each item represents a frame
307 This is a generator of bytearrays. Each item represents a frame
308 ready to be sent over the wire to a peer.
308 ready to be sent over the wire to a peer.
309 """
309 """
310 data = {b'name': cmd}
310 data = {b'name': cmd}
311 if args:
311 if args:
312 data[b'args'] = args
312 data[b'args'] = args
313
313
314 if redirect:
314 if redirect:
315 data[b'redirect'] = redirect
315 data[b'redirect'] = redirect
316
316
317 data = b''.join(cborutil.streamencode(data))
317 data = b''.join(cborutil.streamencode(data))
318
318
319 offset = 0
319 offset = 0
320
320
321 while True:
321 while True:
322 flags = 0
322 flags = 0
323
323
324 # Must set new or continuation flag.
324 # Must set new or continuation flag.
325 if not offset:
325 if not offset:
326 flags |= FLAG_COMMAND_REQUEST_NEW
326 flags |= FLAG_COMMAND_REQUEST_NEW
327 else:
327 else:
328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
329
329
330 # Data frames is set on all frames.
330 # Data frames is set on all frames.
331 if datafh:
331 if datafh:
332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
333
333
334 payload = data[offset:offset + maxframesize]
334 payload = data[offset:offset + maxframesize]
335 offset += len(payload)
335 offset += len(payload)
336
336
337 if len(payload) == maxframesize and offset < len(data):
337 if len(payload) == maxframesize and offset < len(data):
338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
339
339
340 yield stream.makeframe(requestid=requestid,
340 yield stream.makeframe(requestid=requestid,
341 typeid=FRAME_TYPE_COMMAND_REQUEST,
341 typeid=FRAME_TYPE_COMMAND_REQUEST,
342 flags=flags,
342 flags=flags,
343 payload=payload)
343 payload=payload)
344
344
345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
346 break
346 break
347
347
348 if datafh:
348 if datafh:
349 while True:
349 while True:
350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
351
351
352 done = False
352 done = False
353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
354 flags = FLAG_COMMAND_DATA_CONTINUATION
354 flags = FLAG_COMMAND_DATA_CONTINUATION
355 else:
355 else:
356 flags = FLAG_COMMAND_DATA_EOS
356 flags = FLAG_COMMAND_DATA_EOS
357 assert datafh.read(1) == b''
357 assert datafh.read(1) == b''
358 done = True
358 done = True
359
359
360 yield stream.makeframe(requestid=requestid,
360 yield stream.makeframe(requestid=requestid,
361 typeid=FRAME_TYPE_COMMAND_DATA,
361 typeid=FRAME_TYPE_COMMAND_DATA,
362 flags=flags,
362 flags=flags,
363 payload=data)
363 payload=data)
364
364
365 if done:
365 if done:
366 break
366 break
367
367
368 def createcommandresponseframesfrombytes(stream, requestid, data,
368 def createcommandresponseframesfrombytes(stream, requestid, data,
369 maxframesize=DEFAULT_MAX_FRAME_SIZE):
369 maxframesize=DEFAULT_MAX_FRAME_SIZE):
370 """Create a raw frame to send a bytes response from static bytes input.
370 """Create a raw frame to send a bytes response from static bytes input.
371
371
372 Returns a generator of bytearrays.
372 Returns a generator of bytearrays.
373 """
373 """
374 # Automatically send the overall CBOR response map.
374 # Automatically send the overall CBOR response map.
375 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
375 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
376 if len(overall) > maxframesize:
376 if len(overall) > maxframesize:
377 raise error.ProgrammingError('not yet implemented')
377 raise error.ProgrammingError('not yet implemented')
378
378
379 # Simple case where we can fit the full response in a single frame.
379 # Simple case where we can fit the full response in a single frame.
380 if len(overall) + len(data) <= maxframesize:
380 if len(overall) + len(data) <= maxframesize:
381 flags = FLAG_COMMAND_RESPONSE_EOS
381 flags = FLAG_COMMAND_RESPONSE_EOS
382 yield stream.makeframe(requestid=requestid,
382 yield stream.makeframe(requestid=requestid,
383 typeid=FRAME_TYPE_COMMAND_RESPONSE,
383 typeid=FRAME_TYPE_COMMAND_RESPONSE,
384 flags=flags,
384 flags=flags,
385 payload=overall + data)
385 payload=overall + data)
386 return
386 return
387
387
388 # It's easier to send the overall CBOR map in its own frame than to track
388 # It's easier to send the overall CBOR map in its own frame than to track
389 # offsets.
389 # offsets.
390 yield stream.makeframe(requestid=requestid,
390 yield stream.makeframe(requestid=requestid,
391 typeid=FRAME_TYPE_COMMAND_RESPONSE,
391 typeid=FRAME_TYPE_COMMAND_RESPONSE,
392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
393 payload=overall)
393 payload=overall)
394
394
395 offset = 0
395 offset = 0
396 while True:
396 while True:
397 chunk = data[offset:offset + maxframesize]
397 chunk = data[offset:offset + maxframesize]
398 offset += len(chunk)
398 offset += len(chunk)
399 done = offset == len(data)
399 done = offset == len(data)
400
400
401 if done:
401 if done:
402 flags = FLAG_COMMAND_RESPONSE_EOS
402 flags = FLAG_COMMAND_RESPONSE_EOS
403 else:
403 else:
404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
405
405
406 yield stream.makeframe(requestid=requestid,
406 yield stream.makeframe(requestid=requestid,
407 typeid=FRAME_TYPE_COMMAND_RESPONSE,
407 typeid=FRAME_TYPE_COMMAND_RESPONSE,
408 flags=flags,
408 flags=flags,
409 payload=chunk)
409 payload=chunk)
410
410
411 if done:
411 if done:
412 break
412 break
413
413
414 def createbytesresponseframesfromgen(stream, requestid, gen,
414 def createbytesresponseframesfromgen(stream, requestid, gen,
415 maxframesize=DEFAULT_MAX_FRAME_SIZE):
415 maxframesize=DEFAULT_MAX_FRAME_SIZE):
416 """Generator of frames from a generator of byte chunks.
416 """Generator of frames from a generator of byte chunks.
417
417
418 This assumes that another frame will follow whatever this emits. i.e.
418 This assumes that another frame will follow whatever this emits. i.e.
419 this always emits the continuation flag and never emits the end-of-stream
419 this always emits the continuation flag and never emits the end-of-stream
420 flag.
420 flag.
421 """
421 """
422 cb = util.chunkbuffer(gen)
422 cb = util.chunkbuffer(gen)
423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
424
424
425 while True:
425 while True:
426 chunk = cb.read(maxframesize)
426 chunk = cb.read(maxframesize)
427 if not chunk:
427 if not chunk:
428 break
428 break
429
429
430 yield stream.makeframe(requestid=requestid,
430 yield stream.makeframe(requestid=requestid,
431 typeid=FRAME_TYPE_COMMAND_RESPONSE,
431 typeid=FRAME_TYPE_COMMAND_RESPONSE,
432 flags=flags,
432 flags=flags,
433 payload=chunk)
433 payload=chunk)
434
434
435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
436
436
437 def createcommandresponseokframe(stream, requestid):
437 def createcommandresponseokframe(stream, requestid):
438 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
438 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
439
439
440 return stream.makeframe(requestid=requestid,
440 return stream.makeframe(requestid=requestid,
441 typeid=FRAME_TYPE_COMMAND_RESPONSE,
441 typeid=FRAME_TYPE_COMMAND_RESPONSE,
442 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
442 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
443 payload=overall)
443 payload=overall)
444
444
445 def createcommandresponseeosframe(stream, requestid):
445 def createcommandresponseeosframe(stream, requestid):
446 """Create an empty payload frame representing command end-of-stream."""
446 """Create an empty payload frame representing command end-of-stream."""
447 return stream.makeframe(requestid=requestid,
447 return stream.makeframe(requestid=requestid,
448 typeid=FRAME_TYPE_COMMAND_RESPONSE,
448 typeid=FRAME_TYPE_COMMAND_RESPONSE,
449 flags=FLAG_COMMAND_RESPONSE_EOS,
449 flags=FLAG_COMMAND_RESPONSE_EOS,
450 payload=b'')
450 payload=b'')
451
451
452 def createalternatelocationresponseframe(stream, requestid, location):
452 def createalternatelocationresponseframe(stream, requestid, location):
453 data = {
453 data = {
454 b'status': b'redirect',
454 b'status': b'redirect',
455 b'location': {
455 b'location': {
456 b'url': location.url,
456 b'url': location.url,
457 b'mediatype': location.mediatype,
457 b'mediatype': location.mediatype,
458 }
458 }
459 }
459 }
460
460
461 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
461 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
462 r'servercadercerts'):
462 r'servercadercerts'):
463 value = getattr(location, a)
463 value = getattr(location, a)
464 if value is not None:
464 if value is not None:
465 data[b'location'][pycompat.bytestr(a)] = value
465 data[b'location'][pycompat.bytestr(a)] = value
466
466
467 return stream.makeframe(requestid=requestid,
467 return stream.makeframe(requestid=requestid,
468 typeid=FRAME_TYPE_COMMAND_RESPONSE,
468 typeid=FRAME_TYPE_COMMAND_RESPONSE,
469 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
469 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
470 payload=b''.join(cborutil.streamencode(data)))
470 payload=b''.join(cborutil.streamencode(data)))
471
471
472 def createcommanderrorresponse(stream, requestid, message, args=None):
472 def createcommanderrorresponse(stream, requestid, message, args=None):
473 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
473 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
474 # formatting works consistently?
474 # formatting works consistently?
475 m = {
475 m = {
476 b'status': b'error',
476 b'status': b'error',
477 b'error': {
477 b'error': {
478 b'message': message,
478 b'message': message,
479 }
479 }
480 }
480 }
481
481
482 if args:
482 if args:
483 m[b'error'][b'args'] = args
483 m[b'error'][b'args'] = args
484
484
485 overall = b''.join(cborutil.streamencode(m))
485 overall = b''.join(cborutil.streamencode(m))
486
486
487 yield stream.makeframe(requestid=requestid,
487 yield stream.makeframe(requestid=requestid,
488 typeid=FRAME_TYPE_COMMAND_RESPONSE,
488 typeid=FRAME_TYPE_COMMAND_RESPONSE,
489 flags=FLAG_COMMAND_RESPONSE_EOS,
489 flags=FLAG_COMMAND_RESPONSE_EOS,
490 payload=overall)
490 payload=overall)
491
491
492 def createerrorframe(stream, requestid, msg, errtype):
492 def createerrorframe(stream, requestid, msg, errtype):
493 # TODO properly handle frame size limits.
493 # TODO properly handle frame size limits.
494 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
494 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
495
495
496 payload = b''.join(cborutil.streamencode({
496 payload = b''.join(cborutil.streamencode({
497 b'type': errtype,
497 b'type': errtype,
498 b'message': [{b'msg': msg}],
498 b'message': [{b'msg': msg}],
499 }))
499 }))
500
500
501 yield stream.makeframe(requestid=requestid,
501 yield stream.makeframe(requestid=requestid,
502 typeid=FRAME_TYPE_ERROR_RESPONSE,
502 typeid=FRAME_TYPE_ERROR_RESPONSE,
503 flags=0,
503 flags=0,
504 payload=payload)
504 payload=payload)
505
505
506 def createtextoutputframe(stream, requestid, atoms,
506 def createtextoutputframe(stream, requestid, atoms,
507 maxframesize=DEFAULT_MAX_FRAME_SIZE):
507 maxframesize=DEFAULT_MAX_FRAME_SIZE):
508 """Create a text output frame to render text to people.
508 """Create a text output frame to render text to people.
509
509
510 ``atoms`` is a 3-tuple of (formatting string, args, labels).
510 ``atoms`` is a 3-tuple of (formatting string, args, labels).
511
511
512 The formatting string contains ``%s`` tokens to be replaced by the
512 The formatting string contains ``%s`` tokens to be replaced by the
513 corresponding indexed entry in ``args``. ``labels`` is an iterable of
513 corresponding indexed entry in ``args``. ``labels`` is an iterable of
514 formatters to be applied at rendering time. In terms of the ``ui``
514 formatters to be applied at rendering time. In terms of the ``ui``
515 class, each atom corresponds to a ``ui.write()``.
515 class, each atom corresponds to a ``ui.write()``.
516 """
516 """
517 atomdicts = []
517 atomdicts = []
518
518
519 for (formatting, args, labels) in atoms:
519 for (formatting, args, labels) in atoms:
520 # TODO look for localstr, other types here?
520 # TODO look for localstr, other types here?
521
521
522 if not isinstance(formatting, bytes):
522 if not isinstance(formatting, bytes):
523 raise ValueError('must use bytes formatting strings')
523 raise ValueError('must use bytes formatting strings')
524 for arg in args:
524 for arg in args:
525 if not isinstance(arg, bytes):
525 if not isinstance(arg, bytes):
526 raise ValueError('must use bytes for arguments')
526 raise ValueError('must use bytes for arguments')
527 for label in labels:
527 for label in labels:
528 if not isinstance(label, bytes):
528 if not isinstance(label, bytes):
529 raise ValueError('must use bytes for labels')
529 raise ValueError('must use bytes for labels')
530
530
531 # Formatting string must be ASCII.
531 # Formatting string must be ASCII.
532 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
532 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
533
533
534 # Arguments must be UTF-8.
534 # Arguments must be UTF-8.
535 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
535 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
536
536
537 # Labels must be ASCII.
537 # Labels must be ASCII.
538 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
538 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
539 for l in labels]
539 for l in labels]
540
540
541 atom = {b'msg': formatting}
541 atom = {b'msg': formatting}
542 if args:
542 if args:
543 atom[b'args'] = args
543 atom[b'args'] = args
544 if labels:
544 if labels:
545 atom[b'labels'] = labels
545 atom[b'labels'] = labels
546
546
547 atomdicts.append(atom)
547 atomdicts.append(atom)
548
548
549 payload = b''.join(cborutil.streamencode(atomdicts))
549 payload = b''.join(cborutil.streamencode(atomdicts))
550
550
551 if len(payload) > maxframesize:
551 if len(payload) > maxframesize:
552 raise ValueError('cannot encode data in a single frame')
552 raise ValueError('cannot encode data in a single frame')
553
553
554 yield stream.makeframe(requestid=requestid,
554 yield stream.makeframe(requestid=requestid,
555 typeid=FRAME_TYPE_TEXT_OUTPUT,
555 typeid=FRAME_TYPE_TEXT_OUTPUT,
556 flags=0,
556 flags=0,
557 payload=payload)
557 payload=payload)
558
558
559 class bufferingcommandresponseemitter(object):
559 class bufferingcommandresponseemitter(object):
560 """Helper object to emit command response frames intelligently.
560 """Helper object to emit command response frames intelligently.
561
561
562 Raw command response data is likely emitted in chunks much smaller
562 Raw command response data is likely emitted in chunks much smaller
563 than what can fit in a single frame. This class exists to buffer
563 than what can fit in a single frame. This class exists to buffer
564 chunks until enough data is available to fit in a single frame.
564 chunks until enough data is available to fit in a single frame.
565
565
566 TODO we'll need something like this when compression is supported.
566 TODO we'll need something like this when compression is supported.
567 So it might make sense to implement this functionality at the stream
567 So it might make sense to implement this functionality at the stream
568 level.
568 level.
569 """
569 """
570 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
570 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
571 self._stream = stream
571 self._stream = stream
572 self._requestid = requestid
572 self._requestid = requestid
573 self._maxsize = maxframesize
573 self._maxsize = maxframesize
574 self._chunks = []
574 self._chunks = []
575 self._chunkssize = 0
575 self._chunkssize = 0
576
576
577 def send(self, data):
577 def send(self, data):
578 """Send new data for emission.
578 """Send new data for emission.
579
579
580 Is a generator of new frames that were derived from the new input.
580 Is a generator of new frames that were derived from the new input.
581
581
582 If the special input ``None`` is received, flushes all buffered
582 If the special input ``None`` is received, flushes all buffered
583 data to frames.
583 data to frames.
584 """
584 """
585
585
586 if data is None:
586 if data is None:
587 for frame in self._flush():
587 for frame in self._flush():
588 yield frame
588 yield frame
589 return
589 return
590
590
591 # There is a ton of potential to do more complicated things here.
591 # There is a ton of potential to do more complicated things here.
592 # Our immediate goal is to coalesce small chunks into big frames,
592 # Our immediate goal is to coalesce small chunks into big frames,
593 # not achieve the fewest number of frames possible. So we go with
593 # not achieve the fewest number of frames possible. So we go with
594 # a simple implementation:
594 # a simple implementation:
595 #
595 #
596 # * If a chunk is too large for a frame, we flush and emit frames
596 # * If a chunk is too large for a frame, we flush and emit frames
597 # for the new chunk.
597 # for the new chunk.
598 # * If a chunk can be buffered without total buffered size limits
598 # * If a chunk can be buffered without total buffered size limits
599 # being exceeded, we do that.
599 # being exceeded, we do that.
600 # * If a chunk causes us to go over our buffering limit, we flush
600 # * If a chunk causes us to go over our buffering limit, we flush
601 # and then buffer the new chunk.
601 # and then buffer the new chunk.
602
602
603 if len(data) > self._maxsize:
603 if len(data) > self._maxsize:
604 for frame in self._flush():
604 for frame in self._flush():
605 yield frame
605 yield frame
606
606
607 # Now emit frames for the big chunk.
607 # Now emit frames for the big chunk.
608 offset = 0
608 offset = 0
609 while True:
609 while True:
610 chunk = data[offset:offset + self._maxsize]
610 chunk = data[offset:offset + self._maxsize]
611 offset += len(chunk)
611 offset += len(chunk)
612
612
613 yield self._stream.makeframe(
613 yield self._stream.makeframe(
614 self._requestid,
614 self._requestid,
615 typeid=FRAME_TYPE_COMMAND_RESPONSE,
615 typeid=FRAME_TYPE_COMMAND_RESPONSE,
616 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
616 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
617 payload=chunk)
617 payload=chunk)
618
618
619 if offset == len(data):
619 if offset == len(data):
620 return
620 return
621
621
622 # If we don't have enough to constitute a full frame, buffer and
622 # If we don't have enough to constitute a full frame, buffer and
623 # return.
623 # return.
624 if len(data) + self._chunkssize < self._maxsize:
624 if len(data) + self._chunkssize < self._maxsize:
625 self._chunks.append(data)
625 self._chunks.append(data)
626 self._chunkssize += len(data)
626 self._chunkssize += len(data)
627 return
627 return
628
628
629 # Else flush what we have and buffer the new chunk. We could do
629 # Else flush what we have and buffer the new chunk. We could do
630 # something more intelligent here, like break the chunk. Let's
630 # something more intelligent here, like break the chunk. Let's
631 # keep things simple for now.
631 # keep things simple for now.
632 for frame in self._flush():
632 for frame in self._flush():
633 yield frame
633 yield frame
634
634
635 self._chunks.append(data)
635 self._chunks.append(data)
636 self._chunkssize = len(data)
636 self._chunkssize = len(data)
637
637
638 def _flush(self):
638 def _flush(self):
639 payload = b''.join(self._chunks)
639 payload = b''.join(self._chunks)
640 assert len(payload) <= self._maxsize
640 assert len(payload) <= self._maxsize
641
641
642 self._chunks[:] = []
642 self._chunks[:] = []
643 self._chunkssize = 0
643 self._chunkssize = 0
644
644
645 yield self._stream.makeframe(
645 yield self._stream.makeframe(
646 self._requestid,
646 self._requestid,
647 typeid=FRAME_TYPE_COMMAND_RESPONSE,
647 typeid=FRAME_TYPE_COMMAND_RESPONSE,
648 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
648 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
649 payload=payload)
649 payload=payload)
650
650
651 class stream(object):
651 class stream(object):
652 """Represents a logical unidirectional series of frames."""
652 """Represents a logical unidirectional series of frames."""
653
653
654 def __init__(self, streamid, active=False):
654 def __init__(self, streamid, active=False):
655 self.streamid = streamid
655 self.streamid = streamid
656 self._active = active
656 self._active = active
657
657
658 def makeframe(self, requestid, typeid, flags, payload):
658 def makeframe(self, requestid, typeid, flags, payload):
659 """Create a frame to be sent out over this stream.
659 """Create a frame to be sent out over this stream.
660
660
661 Only returns the frame instance. Does not actually send it.
661 Only returns the frame instance. Does not actually send it.
662 """
662 """
663 streamflags = 0
663 streamflags = 0
664 if not self._active:
664 if not self._active:
665 streamflags |= STREAM_FLAG_BEGIN_STREAM
665 streamflags |= STREAM_FLAG_BEGIN_STREAM
666 self._active = True
666 self._active = True
667
667
668 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
668 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
669 payload)
669 payload)
670
670
671 def ensureserverstream(stream):
671 def ensureserverstream(stream):
672 if stream.streamid % 2:
672 if stream.streamid % 2:
673 raise error.ProgrammingError('server should only write to even '
673 raise error.ProgrammingError('server should only write to even '
674 'numbered streams; %d is not even' %
674 'numbered streams; %d is not even' %
675 stream.streamid)
675 stream.streamid)
676
676
677 DEFAULT_PROTOCOL_SETTINGS = {
677 DEFAULT_PROTOCOL_SETTINGS = {
678 'contentencodings': [b'identity'],
678 'contentencodings': [b'identity'],
679 }
679 }
680
680
681 class serverreactor(object):
681 class serverreactor(object):
682 """Holds state of a server handling frame-based protocol requests.
682 """Holds state of a server handling frame-based protocol requests.
683
683
684 This class is the "brain" of the unified frame-based protocol server
684 This class is the "brain" of the unified frame-based protocol server
685 component. While the protocol is stateless from the perspective of
685 component. While the protocol is stateless from the perspective of
686 requests/commands, something needs to track which frames have been
686 requests/commands, something needs to track which frames have been
687 received, what frames to expect, etc. This class is that thing.
687 received, what frames to expect, etc. This class is that thing.
688
688
689 Instances are modeled as a state machine of sorts. Instances are also
689 Instances are modeled as a state machine of sorts. Instances are also
690 reactionary to external events. The point of this class is to encapsulate
690 reactionary to external events. The point of this class is to encapsulate
691 the state of the connection and the exchange of frames, not to perform
691 the state of the connection and the exchange of frames, not to perform
692 work. Instead, callers tell this class when something occurs, like a
692 work. Instead, callers tell this class when something occurs, like a
693 frame arriving. If that activity is worthy of a follow-up action (say
693 frame arriving. If that activity is worthy of a follow-up action (say
694 *run a command*), the return value of that handler will say so.
694 *run a command*), the return value of that handler will say so.
695
695
696 I/O and CPU intensive operations are purposefully delegated outside of
696 I/O and CPU intensive operations are purposefully delegated outside of
697 this class.
697 this class.
698
698
699 Consumers are expected to tell instances when events occur. They do so by
699 Consumers are expected to tell instances when events occur. They do so by
700 calling the various ``on*`` methods. These methods return a 2-tuple
700 calling the various ``on*`` methods. These methods return a 2-tuple
701 describing any follow-up action(s) to take. The first element is the
701 describing any follow-up action(s) to take. The first element is the
702 name of an action to perform. The second is a data structure (usually
702 name of an action to perform. The second is a data structure (usually
703 a dict) specific to that action that contains more information. e.g.
703 a dict) specific to that action that contains more information. e.g.
704 if the server wants to send frames back to the client, the data structure
704 if the server wants to send frames back to the client, the data structure
705 will contain a reference to those frames.
705 will contain a reference to those frames.
706
706
707 Valid actions that consumers can be instructed to take are:
707 Valid actions that consumers can be instructed to take are:
708
708
709 sendframes
709 sendframes
710 Indicates that frames should be sent to the client. The ``framegen``
710 Indicates that frames should be sent to the client. The ``framegen``
711 key contains a generator of frames that should be sent. The server
711 key contains a generator of frames that should be sent. The server
712 assumes that all frames are sent to the client.
712 assumes that all frames are sent to the client.
713
713
714 error
714 error
715 Indicates that an error occurred. Consumer should probably abort.
715 Indicates that an error occurred. Consumer should probably abort.
716
716
717 runcommand
717 runcommand
718 Indicates that the consumer should run a wire protocol command. Details
718 Indicates that the consumer should run a wire protocol command. Details
719 of the command to run are given in the data structure.
719 of the command to run are given in the data structure.
720
720
721 wantframe
721 wantframe
722 Indicates that nothing of interest happened and the server is waiting on
722 Indicates that nothing of interest happened and the server is waiting on
723 more frames from the client before anything interesting can be done.
723 more frames from the client before anything interesting can be done.
724
724
725 noop
725 noop
726 Indicates no additional action is required.
726 Indicates no additional action is required.
727
727
728 Known Issues
728 Known Issues
729 ------------
729 ------------
730
730
731 There are no limits to the number of partially received commands or their
731 There are no limits to the number of partially received commands or their
732 size. A malicious client could stream command request data and exhaust the
732 size. A malicious client could stream command request data and exhaust the
733 server's memory.
733 server's memory.
734
734
735 Partially received commands are not acted upon when end of input is
735 Partially received commands are not acted upon when end of input is
736 reached. Should the server error if it receives a partial request?
736 reached. Should the server error if it receives a partial request?
737 Should the client send a message to abort a partially transmitted request
737 Should the client send a message to abort a partially transmitted request
738 to facilitate graceful shutdown?
738 to facilitate graceful shutdown?
739
739
740 Active requests that haven't been responded to aren't tracked. This means
740 Active requests that haven't been responded to aren't tracked. This means
741 that if we receive a command and instruct its dispatch, another command
741 that if we receive a command and instruct its dispatch, another command
742 with its request ID can come in over the wire and there will be a race
742 with its request ID can come in over the wire and there will be a race
743 between who responds to what.
743 between who responds to what.
744 """
744 """
745
745
746 def __init__(self, deferoutput=False):
746 def __init__(self, deferoutput=False):
747 """Construct a new server reactor.
747 """Construct a new server reactor.
748
748
749 ``deferoutput`` can be used to indicate that no output frames should be
749 ``deferoutput`` can be used to indicate that no output frames should be
750 instructed to be sent until input has been exhausted. In this mode,
750 instructed to be sent until input has been exhausted. In this mode,
751 events that would normally generate output frames (such as a command
751 events that would normally generate output frames (such as a command
752 response being ready) will instead defer instructing the consumer to
752 response being ready) will instead defer instructing the consumer to
753 send those frames. This is useful for half-duplex transports where the
753 send those frames. This is useful for half-duplex transports where the
754 sender cannot receive until all data has been transmitted.
754 sender cannot receive until all data has been transmitted.
755 """
755 """
756 self._deferoutput = deferoutput
756 self._deferoutput = deferoutput
757 self._state = 'initial'
757 self._state = 'initial'
758 self._nextoutgoingstreamid = 2
758 self._nextoutgoingstreamid = 2
759 self._bufferedframegens = []
759 self._bufferedframegens = []
760 # stream id -> stream instance for all active streams from the client.
760 # stream id -> stream instance for all active streams from the client.
761 self._incomingstreams = {}
761 self._incomingstreams = {}
762 self._outgoingstreams = {}
762 self._outgoingstreams = {}
763 # request id -> dict of commands that are actively being received.
763 # request id -> dict of commands that are actively being received.
764 self._receivingcommands = {}
764 self._receivingcommands = {}
765 # Request IDs that have been received and are actively being processed.
765 # Request IDs that have been received and are actively being processed.
766 # Once all output for a request has been sent, it is removed from this
766 # Once all output for a request has been sent, it is removed from this
767 # set.
767 # set.
768 self._activecommands = set()
768 self._activecommands = set()
769
769
770 self._protocolsettingsdecoder = None
770 self._protocolsettingsdecoder = None
771
771
772 # Sender protocol settings are optional. Set implied default values.
772 # Sender protocol settings are optional. Set implied default values.
773 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
773 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
774
774
775 def onframerecv(self, frame):
775 def onframerecv(self, frame):
776 """Process a frame that has been received off the wire.
776 """Process a frame that has been received off the wire.
777
777
778 Returns a dict with an ``action`` key that details what action,
778 Returns a dict with an ``action`` key that details what action,
779 if any, the consumer should take next.
779 if any, the consumer should take next.
780 """
780 """
781 if not frame.streamid % 2:
781 if not frame.streamid % 2:
782 self._state = 'errored'
782 self._state = 'errored'
783 return self._makeerrorresult(
783 return self._makeerrorresult(
784 _('received frame with even numbered stream ID: %d') %
784 _('received frame with even numbered stream ID: %d') %
785 frame.streamid)
785 frame.streamid)
786
786
787 if frame.streamid not in self._incomingstreams:
787 if frame.streamid not in self._incomingstreams:
788 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
788 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
789 self._state = 'errored'
789 self._state = 'errored'
790 return self._makeerrorresult(
790 return self._makeerrorresult(
791 _('received frame on unknown inactive stream without '
791 _('received frame on unknown inactive stream without '
792 'beginning of stream flag set'))
792 'beginning of stream flag set'))
793
793
794 self._incomingstreams[frame.streamid] = stream(frame.streamid)
794 self._incomingstreams[frame.streamid] = stream(frame.streamid)
795
795
796 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
796 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
797 # TODO handle decoding frames
797 # TODO handle decoding frames
798 self._state = 'errored'
798 self._state = 'errored'
799 raise error.ProgrammingError('support for decoding stream payloads '
799 raise error.ProgrammingError('support for decoding stream payloads '
800 'not yet implemented')
800 'not yet implemented')
801
801
802 if frame.streamflags & STREAM_FLAG_END_STREAM:
802 if frame.streamflags & STREAM_FLAG_END_STREAM:
803 del self._incomingstreams[frame.streamid]
803 del self._incomingstreams[frame.streamid]
804
804
805 handlers = {
805 handlers = {
806 'initial': self._onframeinitial,
806 'initial': self._onframeinitial,
807 'protocol-settings-receiving': self._onframeprotocolsettings,
807 'protocol-settings-receiving': self._onframeprotocolsettings,
808 'idle': self._onframeidle,
808 'idle': self._onframeidle,
809 'command-receiving': self._onframecommandreceiving,
809 'command-receiving': self._onframecommandreceiving,
810 'errored': self._onframeerrored,
810 'errored': self._onframeerrored,
811 }
811 }
812
812
813 meth = handlers.get(self._state)
813 meth = handlers.get(self._state)
814 if not meth:
814 if not meth:
815 raise error.ProgrammingError('unhandled state: %s' % self._state)
815 raise error.ProgrammingError('unhandled state: %s' % self._state)
816
816
817 return meth(frame)
817 return meth(frame)
818
818
819 def oncommandresponseready(self, stream, requestid, data):
819 def oncommandresponseready(self, stream, requestid, data):
820 """Signal that a bytes response is ready to be sent to the client.
820 """Signal that a bytes response is ready to be sent to the client.
821
821
822 The raw bytes response is passed as an argument.
822 The raw bytes response is passed as an argument.
823 """
823 """
824 ensureserverstream(stream)
824 ensureserverstream(stream)
825
825
826 def sendframes():
826 def sendframes():
827 for frame in createcommandresponseframesfrombytes(stream, requestid,
827 for frame in createcommandresponseframesfrombytes(stream, requestid,
828 data):
828 data):
829 yield frame
829 yield frame
830
830
831 self._activecommands.remove(requestid)
831 self._activecommands.remove(requestid)
832
832
833 result = sendframes()
833 result = sendframes()
834
834
835 if self._deferoutput:
835 if self._deferoutput:
836 self._bufferedframegens.append(result)
836 self._bufferedframegens.append(result)
837 return 'noop', {}
837 return 'noop', {}
838 else:
838 else:
839 return 'sendframes', {
839 return 'sendframes', {
840 'framegen': result,
840 'framegen': result,
841 }
841 }
842
842
843 def oncommandresponsereadyobjects(self, stream, requestid, objs):
843 def oncommandresponsereadyobjects(self, stream, requestid, objs):
844 """Signal that objects are ready to be sent to the client.
844 """Signal that objects are ready to be sent to the client.
845
845
846 ``objs`` is an iterable of objects (typically a generator) that will
846 ``objs`` is an iterable of objects (typically a generator) that will
847 be encoded via CBOR and added to frames, which will be sent to the
847 be encoded via CBOR and added to frames, which will be sent to the
848 client.
848 client.
849 """
849 """
850 ensureserverstream(stream)
850 ensureserverstream(stream)
851
851
852 # We need to take care over exception handling. Uncaught exceptions
852 # We need to take care over exception handling. Uncaught exceptions
853 # when generating frames could lead to premature end of the frame
853 # when generating frames could lead to premature end of the frame
854 # stream and the possibility of the server or client process getting
854 # stream and the possibility of the server or client process getting
855 # in a bad state.
855 # in a bad state.
856 #
856 #
857 # Keep in mind that if ``objs`` is a generator, advancing it could
857 # Keep in mind that if ``objs`` is a generator, advancing it could
858 # raise exceptions that originated in e.g. wire protocol command
858 # raise exceptions that originated in e.g. wire protocol command
859 # functions. That is why we differentiate between exceptions raised
859 # functions. That is why we differentiate between exceptions raised
860 # when iterating versus other exceptions that occur.
860 # when iterating versus other exceptions that occur.
861 #
861 #
862 # In all cases, when the function finishes, the request is fully
862 # In all cases, when the function finishes, the request is fully
863 # handled and no new frames for it should be seen.
863 # handled and no new frames for it should be seen.
864
864
865 def sendframes():
865 def sendframes():
866 emitted = False
866 emitted = False
867 alternatelocationsent = False
867 alternatelocationsent = False
868 emitter = bufferingcommandresponseemitter(stream, requestid)
868 emitter = bufferingcommandresponseemitter(stream, requestid)
869 while True:
869 while True:
870 try:
870 try:
871 o = next(objs)
871 o = next(objs)
872 except StopIteration:
872 except StopIteration:
873 for frame in emitter.send(None):
873 for frame in emitter.send(None):
874 yield frame
874 yield frame
875
875
876 if emitted:
876 if emitted:
877 yield createcommandresponseeosframe(stream, requestid)
877 yield createcommandresponseeosframe(stream, requestid)
878 break
878 break
879
879
880 except error.WireprotoCommandError as e:
880 except error.WireprotoCommandError as e:
881 for frame in createcommanderrorresponse(
881 for frame in createcommanderrorresponse(
882 stream, requestid, e.message, e.messageargs):
882 stream, requestid, e.message, e.messageargs):
883 yield frame
883 yield frame
884 break
884 break
885
885
886 except Exception as e:
886 except Exception as e:
887 for frame in createerrorframe(
887 for frame in createerrorframe(
888 stream, requestid, '%s' % stringutil.forcebytestr(e),
888 stream, requestid, '%s' % stringutil.forcebytestr(e),
889 errtype='server'):
889 errtype='server'):
890
890
891 yield frame
891 yield frame
892
892
893 break
893 break
894
894
895 try:
895 try:
896 # Alternate location responses can only be the first and
896 # Alternate location responses can only be the first and
897 # only object in the output stream.
897 # only object in the output stream.
898 if isinstance(o, wireprototypes.alternatelocationresponse):
898 if isinstance(o, wireprototypes.alternatelocationresponse):
899 if emitted:
899 if emitted:
900 raise error.ProgrammingError(
900 raise error.ProgrammingError(
901 'alternatelocationresponse seen after initial '
901 'alternatelocationresponse seen after initial '
902 'output object')
902 'output object')
903
903
904 yield createalternatelocationresponseframe(
904 yield createalternatelocationresponseframe(
905 stream, requestid, o)
905 stream, requestid, o)
906
906
907 alternatelocationsent = True
907 alternatelocationsent = True
908 emitted = True
908 emitted = True
909 continue
909 continue
910
910
911 if alternatelocationsent:
911 if alternatelocationsent:
912 raise error.ProgrammingError(
912 raise error.ProgrammingError(
913 'object follows alternatelocationresponse')
913 'object follows alternatelocationresponse')
914
914
915 if not emitted:
915 if not emitted:
916 yield createcommandresponseokframe(stream, requestid)
916 yield createcommandresponseokframe(stream, requestid)
917 emitted = True
917 emitted = True
918
918
919 # Objects emitted by command functions can be serializable
919 # Objects emitted by command functions can be serializable
920 # data structures or special types.
920 # data structures or special types.
921 # TODO consider extracting the content normalization to a
921 # TODO consider extracting the content normalization to a
922 # standalone function, as it may be useful for e.g. cachers.
922 # standalone function, as it may be useful for e.g. cachers.
923
923
924 # A pre-encoded object is sent directly to the emitter.
924 # A pre-encoded object is sent directly to the emitter.
925 if isinstance(o, wireprototypes.encodedresponse):
925 if isinstance(o, wireprototypes.encodedresponse):
926 for frame in emitter.send(o.data):
926 for frame in emitter.send(o.data):
927 yield frame
927 yield frame
928
928
929 # A regular object is CBOR encoded.
929 # A regular object is CBOR encoded.
930 else:
930 else:
931 for chunk in cborutil.streamencode(o):
931 for chunk in cborutil.streamencode(o):
932 for frame in emitter.send(chunk):
932 for frame in emitter.send(chunk):
933 yield frame
933 yield frame
934
934
935 except Exception as e:
935 except Exception as e:
936 for frame in createerrorframe(stream, requestid,
936 for frame in createerrorframe(stream, requestid,
937 '%s' % e,
937 '%s' % e,
938 errtype='server'):
938 errtype='server'):
939 yield frame
939 yield frame
940
940
941 break
941 break
942
942
943 self._activecommands.remove(requestid)
943 self._activecommands.remove(requestid)
944
944
945 return self._handlesendframes(sendframes())
945 return self._handlesendframes(sendframes())
946
946
947 def oninputeof(self):
947 def oninputeof(self):
948 """Signals that end of input has been received.
948 """Signals that end of input has been received.
949
949
950 No more frames will be received. All pending activity should be
950 No more frames will be received. All pending activity should be
951 completed.
951 completed.
952 """
952 """
953 # TODO should we do anything about in-flight commands?
953 # TODO should we do anything about in-flight commands?
954
954
955 if not self._deferoutput or not self._bufferedframegens:
955 if not self._deferoutput or not self._bufferedframegens:
956 return 'noop', {}
956 return 'noop', {}
957
957
958 # If we buffered all our responses, emit those.
958 # If we buffered all our responses, emit those.
959 def makegen():
959 def makegen():
960 for gen in self._bufferedframegens:
960 for gen in self._bufferedframegens:
961 for frame in gen:
961 for frame in gen:
962 yield frame
962 yield frame
963
963
964 return 'sendframes', {
964 return 'sendframes', {
965 'framegen': makegen(),
965 'framegen': makegen(),
966 }
966 }
967
967
968 def _handlesendframes(self, framegen):
968 def _handlesendframes(self, framegen):
969 if self._deferoutput:
969 if self._deferoutput:
970 self._bufferedframegens.append(framegen)
970 self._bufferedframegens.append(framegen)
971 return 'noop', {}
971 return 'noop', {}
972 else:
972 else:
973 return 'sendframes', {
973 return 'sendframes', {
974 'framegen': framegen,
974 'framegen': framegen,
975 }
975 }
976
976
977 def onservererror(self, stream, requestid, msg):
977 def onservererror(self, stream, requestid, msg):
978 ensureserverstream(stream)
978 ensureserverstream(stream)
979
979
980 def sendframes():
980 def sendframes():
981 for frame in createerrorframe(stream, requestid, msg,
981 for frame in createerrorframe(stream, requestid, msg,
982 errtype='server'):
982 errtype='server'):
983 yield frame
983 yield frame
984
984
985 self._activecommands.remove(requestid)
985 self._activecommands.remove(requestid)
986
986
987 return self._handlesendframes(sendframes())
987 return self._handlesendframes(sendframes())
988
988
989 def oncommanderror(self, stream, requestid, message, args=None):
989 def oncommanderror(self, stream, requestid, message, args=None):
990 """Called when a command encountered an error before sending output."""
990 """Called when a command encountered an error before sending output."""
991 ensureserverstream(stream)
991 ensureserverstream(stream)
992
992
993 def sendframes():
993 def sendframes():
994 for frame in createcommanderrorresponse(stream, requestid, message,
994 for frame in createcommanderrorresponse(stream, requestid, message,
995 args):
995 args):
996 yield frame
996 yield frame
997
997
998 self._activecommands.remove(requestid)
998 self._activecommands.remove(requestid)
999
999
1000 return self._handlesendframes(sendframes())
1000 return self._handlesendframes(sendframes())
1001
1001
1002 def makeoutputstream(self):
1002 def makeoutputstream(self):
1003 """Create a stream to be used for sending data to the client."""
1003 """Create a stream to be used for sending data to the client."""
1004 streamid = self._nextoutgoingstreamid
1004 streamid = self._nextoutgoingstreamid
1005 self._nextoutgoingstreamid += 2
1005 self._nextoutgoingstreamid += 2
1006
1006
1007 s = stream(streamid)
1007 s = stream(streamid)
1008 self._outgoingstreams[streamid] = s
1008 self._outgoingstreams[streamid] = s
1009
1009
1010 return s
1010 return s
1011
1011
1012 def _makeerrorresult(self, msg):
1012 def _makeerrorresult(self, msg):
1013 return 'error', {
1013 return 'error', {
1014 'message': msg,
1014 'message': msg,
1015 }
1015 }
1016
1016
1017 def _makeruncommandresult(self, requestid):
1017 def _makeruncommandresult(self, requestid):
1018 entry = self._receivingcommands[requestid]
1018 entry = self._receivingcommands[requestid]
1019
1019
1020 if not entry['requestdone']:
1020 if not entry['requestdone']:
1021 self._state = 'errored'
1021 self._state = 'errored'
1022 raise error.ProgrammingError('should not be called without '
1022 raise error.ProgrammingError('should not be called without '
1023 'requestdone set')
1023 'requestdone set')
1024
1024
1025 del self._receivingcommands[requestid]
1025 del self._receivingcommands[requestid]
1026
1026
1027 if self._receivingcommands:
1027 if self._receivingcommands:
1028 self._state = 'command-receiving'
1028 self._state = 'command-receiving'
1029 else:
1029 else:
1030 self._state = 'idle'
1030 self._state = 'idle'
1031
1031
1032 # Decode the payloads as CBOR.
1032 # Decode the payloads as CBOR.
1033 entry['payload'].seek(0)
1033 entry['payload'].seek(0)
1034 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1034 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1035
1035
1036 if b'name' not in request:
1036 if b'name' not in request:
1037 self._state = 'errored'
1037 self._state = 'errored'
1038 return self._makeerrorresult(
1038 return self._makeerrorresult(
1039 _('command request missing "name" field'))
1039 _('command request missing "name" field'))
1040
1040
1041 if b'args' not in request:
1041 if b'args' not in request:
1042 request[b'args'] = {}
1042 request[b'args'] = {}
1043
1043
1044 assert requestid not in self._activecommands
1044 assert requestid not in self._activecommands
1045 self._activecommands.add(requestid)
1045 self._activecommands.add(requestid)
1046
1046
1047 return 'runcommand', {
1047 return 'runcommand', {
1048 'requestid': requestid,
1048 'requestid': requestid,
1049 'command': request[b'name'],
1049 'command': request[b'name'],
1050 'args': request[b'args'],
1050 'args': request[b'args'],
1051 'redirect': request.get(b'redirect'),
1051 'redirect': request.get(b'redirect'),
1052 'data': entry['data'].getvalue() if entry['data'] else None,
1052 'data': entry['data'].getvalue() if entry['data'] else None,
1053 }
1053 }
1054
1054
1055 def _makewantframeresult(self):
1055 def _makewantframeresult(self):
1056 return 'wantframe', {
1056 return 'wantframe', {
1057 'state': self._state,
1057 'state': self._state,
1058 }
1058 }
1059
1059
1060 def _validatecommandrequestframe(self, frame):
1060 def _validatecommandrequestframe(self, frame):
1061 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1061 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1062 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1062 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1063
1063
1064 if new and continuation:
1064 if new and continuation:
1065 self._state = 'errored'
1065 self._state = 'errored'
1066 return self._makeerrorresult(
1066 return self._makeerrorresult(
1067 _('received command request frame with both new and '
1067 _('received command request frame with both new and '
1068 'continuation flags set'))
1068 'continuation flags set'))
1069
1069
1070 if not new and not continuation:
1070 if not new and not continuation:
1071 self._state = 'errored'
1071 self._state = 'errored'
1072 return self._makeerrorresult(
1072 return self._makeerrorresult(
1073 _('received command request frame with neither new nor '
1073 _('received command request frame with neither new nor '
1074 'continuation flags set'))
1074 'continuation flags set'))
1075
1075
1076 def _onframeinitial(self, frame):
1076 def _onframeinitial(self, frame):
1077 # Called when we receive a frame when in the "initial" state.
1077 # Called when we receive a frame when in the "initial" state.
1078 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1078 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1079 self._state = 'protocol-settings-receiving'
1079 self._state = 'protocol-settings-receiving'
1080 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1080 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1081 return self._onframeprotocolsettings(frame)
1081 return self._onframeprotocolsettings(frame)
1082
1082
1083 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1083 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1084 self._state = 'idle'
1084 self._state = 'idle'
1085 return self._onframeidle(frame)
1085 return self._onframeidle(frame)
1086
1086
1087 else:
1087 else:
1088 self._state = 'errored'
1088 self._state = 'errored'
1089 return self._makeerrorresult(
1089 return self._makeerrorresult(
1090 _('expected sender protocol settings or command request '
1090 _('expected sender protocol settings or command request '
1091 'frame; got %d') % frame.typeid)
1091 'frame; got %d') % frame.typeid)
1092
1092
1093 def _onframeprotocolsettings(self, frame):
1093 def _onframeprotocolsettings(self, frame):
1094 assert self._state == 'protocol-settings-receiving'
1094 assert self._state == 'protocol-settings-receiving'
1095 assert self._protocolsettingsdecoder is not None
1095 assert self._protocolsettingsdecoder is not None
1096
1096
1097 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1097 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1098 self._state = 'errored'
1098 self._state = 'errored'
1099 return self._makeerrorresult(
1099 return self._makeerrorresult(
1100 _('expected sender protocol settings frame; got %d') %
1100 _('expected sender protocol settings frame; got %d') %
1101 frame.typeid)
1101 frame.typeid)
1102
1102
1103 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1103 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1104 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1104 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1105
1105
1106 if more and eos:
1106 if more and eos:
1107 self._state = 'errored'
1107 self._state = 'errored'
1108 return self._makeerrorresult(
1108 return self._makeerrorresult(
1109 _('sender protocol settings frame cannot have both '
1109 _('sender protocol settings frame cannot have both '
1110 'continuation and end of stream flags set'))
1110 'continuation and end of stream flags set'))
1111
1111
1112 if not more and not eos:
1112 if not more and not eos:
1113 self._state = 'errored'
1113 self._state = 'errored'
1114 return self._makeerrorresult(
1114 return self._makeerrorresult(
1115 _('sender protocol settings frame must have continuation or '
1115 _('sender protocol settings frame must have continuation or '
1116 'end of stream flag set'))
1116 'end of stream flag set'))
1117
1117
1118 # TODO establish limits for maximum amount of data that can be
1118 # TODO establish limits for maximum amount of data that can be
1119 # buffered.
1119 # buffered.
1120 try:
1120 try:
1121 self._protocolsettingsdecoder.decode(frame.payload)
1121 self._protocolsettingsdecoder.decode(frame.payload)
1122 except Exception as e:
1122 except Exception as e:
1123 self._state = 'errored'
1123 self._state = 'errored'
1124 return self._makeerrorresult(
1124 return self._makeerrorresult(
1125 _('error decoding CBOR from sender protocol settings frame: %s')
1125 _('error decoding CBOR from sender protocol settings frame: %s')
1126 % stringutil.forcebytestr(e))
1126 % stringutil.forcebytestr(e))
1127
1127
1128 if more:
1128 if more:
1129 return self._makewantframeresult()
1129 return self._makewantframeresult()
1130
1130
1131 assert eos
1131 assert eos
1132
1132
1133 decoded = self._protocolsettingsdecoder.getavailable()
1133 decoded = self._protocolsettingsdecoder.getavailable()
1134 self._protocolsettingsdecoder = None
1134 self._protocolsettingsdecoder = None
1135
1135
1136 if not decoded:
1136 if not decoded:
1137 self._state = 'errored'
1137 self._state = 'errored'
1138 return self._makeerrorresult(
1138 return self._makeerrorresult(
1139 _('sender protocol settings frame did not contain CBOR data'))
1139 _('sender protocol settings frame did not contain CBOR data'))
1140 elif len(decoded) > 1:
1140 elif len(decoded) > 1:
1141 self._state = 'errored'
1141 self._state = 'errored'
1142 return self._makeerrorresult(
1142 return self._makeerrorresult(
1143 _('sender protocol settings frame contained multiple CBOR '
1143 _('sender protocol settings frame contained multiple CBOR '
1144 'values'))
1144 'values'))
1145
1145
1146 d = decoded[0]
1146 d = decoded[0]
1147
1147
1148 if b'contentencodings' in d:
1148 if b'contentencodings' in d:
1149 self._sendersettings['contentencodings'] = d[b'contentencodings']
1149 self._sendersettings['contentencodings'] = d[b'contentencodings']
1150
1150
1151 self._state = 'idle'
1151 self._state = 'idle'
1152
1152
1153 return self._makewantframeresult()
1153 return self._makewantframeresult()
1154
1154
1155 def _onframeidle(self, frame):
1155 def _onframeidle(self, frame):
1156 # The only frame type that should be received in this state is a
1156 # The only frame type that should be received in this state is a
1157 # command request.
1157 # command request.
1158 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1158 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1159 self._state = 'errored'
1159 self._state = 'errored'
1160 return self._makeerrorresult(
1160 return self._makeerrorresult(
1161 _('expected command request frame; got %d') % frame.typeid)
1161 _('expected command request frame; got %d') % frame.typeid)
1162
1162
1163 res = self._validatecommandrequestframe(frame)
1163 res = self._validatecommandrequestframe(frame)
1164 if res:
1164 if res:
1165 return res
1165 return res
1166
1166
1167 if frame.requestid in self._receivingcommands:
1167 if frame.requestid in self._receivingcommands:
1168 self._state = 'errored'
1168 self._state = 'errored'
1169 return self._makeerrorresult(
1169 return self._makeerrorresult(
1170 _('request with ID %d already received') % frame.requestid)
1170 _('request with ID %d already received') % frame.requestid)
1171
1171
1172 if frame.requestid in self._activecommands:
1172 if frame.requestid in self._activecommands:
1173 self._state = 'errored'
1173 self._state = 'errored'
1174 return self._makeerrorresult(
1174 return self._makeerrorresult(
1175 _('request with ID %d is already active') % frame.requestid)
1175 _('request with ID %d is already active') % frame.requestid)
1176
1176
1177 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1177 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1178 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1178 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1179 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1179 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1180
1180
1181 if not new:
1181 if not new:
1182 self._state = 'errored'
1182 self._state = 'errored'
1183 return self._makeerrorresult(
1183 return self._makeerrorresult(
1184 _('received command request frame without new flag set'))
1184 _('received command request frame without new flag set'))
1185
1185
1186 payload = util.bytesio()
1186 payload = util.bytesio()
1187 payload.write(frame.payload)
1187 payload.write(frame.payload)
1188
1188
1189 self._receivingcommands[frame.requestid] = {
1189 self._receivingcommands[frame.requestid] = {
1190 'payload': payload,
1190 'payload': payload,
1191 'data': None,
1191 'data': None,
1192 'requestdone': not moreframes,
1192 'requestdone': not moreframes,
1193 'expectingdata': bool(expectingdata),
1193 'expectingdata': bool(expectingdata),
1194 }
1194 }
1195
1195
1196 # This is the final frame for this request. Dispatch it.
1196 # This is the final frame for this request. Dispatch it.
1197 if not moreframes and not expectingdata:
1197 if not moreframes and not expectingdata:
1198 return self._makeruncommandresult(frame.requestid)
1198 return self._makeruncommandresult(frame.requestid)
1199
1199
1200 assert moreframes or expectingdata
1200 assert moreframes or expectingdata
1201 self._state = 'command-receiving'
1201 self._state = 'command-receiving'
1202 return self._makewantframeresult()
1202 return self._makewantframeresult()
1203
1203
1204 def _onframecommandreceiving(self, frame):
1204 def _onframecommandreceiving(self, frame):
1205 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1205 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1206 # Process new command requests as such.
1206 # Process new command requests as such.
1207 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1207 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1208 return self._onframeidle(frame)
1208 return self._onframeidle(frame)
1209
1209
1210 res = self._validatecommandrequestframe(frame)
1210 res = self._validatecommandrequestframe(frame)
1211 if res:
1211 if res:
1212 return res
1212 return res
1213
1213
1214 # All other frames should be related to a command that is currently
1214 # All other frames should be related to a command that is currently
1215 # receiving but is not active.
1215 # receiving but is not active.
1216 if frame.requestid in self._activecommands:
1216 if frame.requestid in self._activecommands:
1217 self._state = 'errored'
1217 self._state = 'errored'
1218 return self._makeerrorresult(
1218 return self._makeerrorresult(
1219 _('received frame for request that is still active: %d') %
1219 _('received frame for request that is still active: %d') %
1220 frame.requestid)
1220 frame.requestid)
1221
1221
1222 if frame.requestid not in self._receivingcommands:
1222 if frame.requestid not in self._receivingcommands:
1223 self._state = 'errored'
1223 self._state = 'errored'
1224 return self._makeerrorresult(
1224 return self._makeerrorresult(
1225 _('received frame for request that is not receiving: %d') %
1225 _('received frame for request that is not receiving: %d') %
1226 frame.requestid)
1226 frame.requestid)
1227
1227
1228 entry = self._receivingcommands[frame.requestid]
1228 entry = self._receivingcommands[frame.requestid]
1229
1229
1230 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1230 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1231 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1231 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1232 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1232 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1233
1233
1234 if entry['requestdone']:
1234 if entry['requestdone']:
1235 self._state = 'errored'
1235 self._state = 'errored'
1236 return self._makeerrorresult(
1236 return self._makeerrorresult(
1237 _('received command request frame when request frames '
1237 _('received command request frame when request frames '
1238 'were supposedly done'))
1238 'were supposedly done'))
1239
1239
1240 if expectingdata != entry['expectingdata']:
1240 if expectingdata != entry['expectingdata']:
1241 self._state = 'errored'
1241 self._state = 'errored'
1242 return self._makeerrorresult(
1242 return self._makeerrorresult(
1243 _('mismatch between expect data flag and previous frame'))
1243 _('mismatch between expect data flag and previous frame'))
1244
1244
1245 entry['payload'].write(frame.payload)
1245 entry['payload'].write(frame.payload)
1246
1246
1247 if not moreframes:
1247 if not moreframes:
1248 entry['requestdone'] = True
1248 entry['requestdone'] = True
1249
1249
1250 if not moreframes and not expectingdata:
1250 if not moreframes and not expectingdata:
1251 return self._makeruncommandresult(frame.requestid)
1251 return self._makeruncommandresult(frame.requestid)
1252
1252
1253 return self._makewantframeresult()
1253 return self._makewantframeresult()
1254
1254
1255 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1255 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1256 if not entry['expectingdata']:
1256 if not entry['expectingdata']:
1257 self._state = 'errored'
1257 self._state = 'errored'
1258 return self._makeerrorresult(_(
1258 return self._makeerrorresult(_(
1259 'received command data frame for request that is not '
1259 'received command data frame for request that is not '
1260 'expecting data: %d') % frame.requestid)
1260 'expecting data: %d') % frame.requestid)
1261
1261
1262 if entry['data'] is None:
1262 if entry['data'] is None:
1263 entry['data'] = util.bytesio()
1263 entry['data'] = util.bytesio()
1264
1264
1265 return self._handlecommanddataframe(frame, entry)
1265 return self._handlecommanddataframe(frame, entry)
1266 else:
1266 else:
1267 self._state = 'errored'
1267 self._state = 'errored'
1268 return self._makeerrorresult(_(
1268 return self._makeerrorresult(_(
1269 'received unexpected frame type: %d') % frame.typeid)
1269 'received unexpected frame type: %d') % frame.typeid)
1270
1270
1271 def _handlecommanddataframe(self, frame, entry):
1271 def _handlecommanddataframe(self, frame, entry):
1272 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1272 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1273
1273
1274 # TODO support streaming data instead of buffering it.
1274 # TODO support streaming data instead of buffering it.
1275 entry['data'].write(frame.payload)
1275 entry['data'].write(frame.payload)
1276
1276
1277 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1277 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1278 return self._makewantframeresult()
1278 return self._makewantframeresult()
1279 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1279 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1280 entry['data'].seek(0)
1280 entry['data'].seek(0)
1281 return self._makeruncommandresult(frame.requestid)
1281 return self._makeruncommandresult(frame.requestid)
1282 else:
1282 else:
1283 self._state = 'errored'
1283 self._state = 'errored'
1284 return self._makeerrorresult(_('command data frame without '
1284 return self._makeerrorresult(_('command data frame without '
1285 'flags'))
1285 'flags'))
1286
1286
1287 def _onframeerrored(self, frame):
1287 def _onframeerrored(self, frame):
1288 return self._makeerrorresult(_('server already errored'))
1288 return self._makeerrorresult(_('server already errored'))
1289
1289
1290 class commandrequest(object):
1290 class commandrequest(object):
1291 """Represents a request to run a command."""
1291 """Represents a request to run a command."""
1292
1292
1293 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1293 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1294 self.requestid = requestid
1294 self.requestid = requestid
1295 self.name = name
1295 self.name = name
1296 self.args = args
1296 self.args = args
1297 self.datafh = datafh
1297 self.datafh = datafh
1298 self.redirect = redirect
1298 self.redirect = redirect
1299 self.state = 'pending'
1299 self.state = 'pending'
1300
1300
1301 class clientreactor(object):
1301 class clientreactor(object):
1302 """Holds state of a client issuing frame-based protocol requests.
1302 """Holds state of a client issuing frame-based protocol requests.
1303
1303
1304 This is like ``serverreactor`` but for client-side state.
1304 This is like ``serverreactor`` but for client-side state.
1305
1305
1306 Each instance is bound to the lifetime of a connection. For persistent
1306 Each instance is bound to the lifetime of a connection. For persistent
1307 connection transports using e.g. TCP sockets and speaking the raw
1307 connection transports using e.g. TCP sockets and speaking the raw
1308 framing protocol, there will be a single instance for the lifetime of
1308 framing protocol, there will be a single instance for the lifetime of
1309 the TCP socket. For transports where there are multiple discrete
1309 the TCP socket. For transports where there are multiple discrete
1310 interactions (say tunneled within in HTTP request), there will be a
1310 interactions (say tunneled within in HTTP request), there will be a
1311 separate instance for each distinct interaction.
1311 separate instance for each distinct interaction.
1312
1313 Consumers are expected to tell instances when events occur by calling
1314 various methods. These methods return a 2-tuple describing any follow-up
1315 action(s) to take. The first element is the name of an action to
1316 perform. The second is a data structure (usually a dict) specific to
1317 that action that contains more information. e.g. if the reactor wants
1318 to send frames to the server, the data structure will contain a reference
1319 to those frames.
1320
1321 Valid actions that consumers can be instructed to take are:
1322
1323 noop
1324 Indicates no additional action is required.
1325
1326 sendframes
1327 Indicates that frames should be sent to the server. The ``framegen``
1328 key contains a generator of frames that should be sent. The reactor
1329 assumes that all frames in this generator are sent to the server.
1330
1331 error
1332 Indicates that an error occurred. The ``message`` key contains an
1333 error message describing the failure.
1334
1335 responsedata
1336 Indicates a response to a previously-issued command was received.
1337
1338 The ``request`` key contains the ``commandrequest`` instance that
1339 represents the request this data is for.
1340
1341 The ``data`` key contains the decoded data from the server.
1342
1343 ``expectmore`` and ``eos`` evaluate to True when more response data
1344 is expected to follow or we're at the end of the response stream,
1345 respectively.
1312 """
1346 """
1313 def __init__(self, hasmultiplesend=False, buffersends=True):
1347 def __init__(self, hasmultiplesend=False, buffersends=True):
1314 """Create a new instance.
1348 """Create a new instance.
1315
1349
1316 ``hasmultiplesend`` indicates whether multiple sends are supported
1350 ``hasmultiplesend`` indicates whether multiple sends are supported
1317 by the transport. When True, it is possible to send commands immediately
1351 by the transport. When True, it is possible to send commands immediately
1318 instead of buffering until the caller signals an intent to finish a
1352 instead of buffering until the caller signals an intent to finish a
1319 send operation.
1353 send operation.
1320
1354
1321 ``buffercommands`` indicates whether sends should be buffered until the
1355 ``buffercommands`` indicates whether sends should be buffered until the
1322 last request has been issued.
1356 last request has been issued.
1323 """
1357 """
1324 self._hasmultiplesend = hasmultiplesend
1358 self._hasmultiplesend = hasmultiplesend
1325 self._buffersends = buffersends
1359 self._buffersends = buffersends
1326
1360
1327 self._canissuecommands = True
1361 self._canissuecommands = True
1328 self._cansend = True
1362 self._cansend = True
1329
1363
1330 self._nextrequestid = 1
1364 self._nextrequestid = 1
1331 # We only support a single outgoing stream for now.
1365 # We only support a single outgoing stream for now.
1332 self._outgoingstream = stream(1)
1366 self._outgoingstream = stream(1)
1333 self._pendingrequests = collections.deque()
1367 self._pendingrequests = collections.deque()
1334 self._activerequests = {}
1368 self._activerequests = {}
1335 self._incomingstreams = {}
1369 self._incomingstreams = {}
1336
1370
1337 def callcommand(self, name, args, datafh=None, redirect=None):
1371 def callcommand(self, name, args, datafh=None, redirect=None):
1338 """Request that a command be executed.
1372 """Request that a command be executed.
1339
1373
1340 Receives the command name, a dict of arguments to pass to the command,
1374 Receives the command name, a dict of arguments to pass to the command,
1341 and an optional file object containing the raw data for the command.
1375 and an optional file object containing the raw data for the command.
1342
1376
1343 Returns a 3-tuple of (request, action, action data).
1377 Returns a 3-tuple of (request, action, action data).
1344 """
1378 """
1345 if not self._canissuecommands:
1379 if not self._canissuecommands:
1346 raise error.ProgrammingError('cannot issue new commands')
1380 raise error.ProgrammingError('cannot issue new commands')
1347
1381
1348 requestid = self._nextrequestid
1382 requestid = self._nextrequestid
1349 self._nextrequestid += 2
1383 self._nextrequestid += 2
1350
1384
1351 request = commandrequest(requestid, name, args, datafh=datafh,
1385 request = commandrequest(requestid, name, args, datafh=datafh,
1352 redirect=redirect)
1386 redirect=redirect)
1353
1387
1354 if self._buffersends:
1388 if self._buffersends:
1355 self._pendingrequests.append(request)
1389 self._pendingrequests.append(request)
1356 return request, 'noop', {}
1390 return request, 'noop', {}
1357 else:
1391 else:
1358 if not self._cansend:
1392 if not self._cansend:
1359 raise error.ProgrammingError('sends cannot be performed on '
1393 raise error.ProgrammingError('sends cannot be performed on '
1360 'this instance')
1394 'this instance')
1361
1395
1362 if not self._hasmultiplesend:
1396 if not self._hasmultiplesend:
1363 self._cansend = False
1397 self._cansend = False
1364 self._canissuecommands = False
1398 self._canissuecommands = False
1365
1399
1366 return request, 'sendframes', {
1400 return request, 'sendframes', {
1367 'framegen': self._makecommandframes(request),
1401 'framegen': self._makecommandframes(request),
1368 }
1402 }
1369
1403
1370 def flushcommands(self):
1404 def flushcommands(self):
1371 """Request that all queued commands be sent.
1405 """Request that all queued commands be sent.
1372
1406
1373 If any commands are buffered, this will instruct the caller to send
1407 If any commands are buffered, this will instruct the caller to send
1374 them over the wire. If no commands are buffered it instructs the client
1408 them over the wire. If no commands are buffered it instructs the client
1375 to no-op.
1409 to no-op.
1376
1410
1377 If instances aren't configured for multiple sends, no new command
1411 If instances aren't configured for multiple sends, no new command
1378 requests are allowed after this is called.
1412 requests are allowed after this is called.
1379 """
1413 """
1380 if not self._pendingrequests:
1414 if not self._pendingrequests:
1381 return 'noop', {}
1415 return 'noop', {}
1382
1416
1383 if not self._cansend:
1417 if not self._cansend:
1384 raise error.ProgrammingError('sends cannot be performed on this '
1418 raise error.ProgrammingError('sends cannot be performed on this '
1385 'instance')
1419 'instance')
1386
1420
1387 # If the instance only allows sending once, mark that we have fired
1421 # If the instance only allows sending once, mark that we have fired
1388 # our one shot.
1422 # our one shot.
1389 if not self._hasmultiplesend:
1423 if not self._hasmultiplesend:
1390 self._canissuecommands = False
1424 self._canissuecommands = False
1391 self._cansend = False
1425 self._cansend = False
1392
1426
1393 def makeframes():
1427 def makeframes():
1394 while self._pendingrequests:
1428 while self._pendingrequests:
1395 request = self._pendingrequests.popleft()
1429 request = self._pendingrequests.popleft()
1396 for frame in self._makecommandframes(request):
1430 for frame in self._makecommandframes(request):
1397 yield frame
1431 yield frame
1398
1432
1399 return 'sendframes', {
1433 return 'sendframes', {
1400 'framegen': makeframes(),
1434 'framegen': makeframes(),
1401 }
1435 }
1402
1436
1403 def _makecommandframes(self, request):
1437 def _makecommandframes(self, request):
1404 """Emit frames to issue a command request.
1438 """Emit frames to issue a command request.
1405
1439
1406 As a side-effect, update request accounting to reflect its changed
1440 As a side-effect, update request accounting to reflect its changed
1407 state.
1441 state.
1408 """
1442 """
1409 self._activerequests[request.requestid] = request
1443 self._activerequests[request.requestid] = request
1410 request.state = 'sending'
1444 request.state = 'sending'
1411
1445
1412 res = createcommandframes(self._outgoingstream,
1446 res = createcommandframes(self._outgoingstream,
1413 request.requestid,
1447 request.requestid,
1414 request.name,
1448 request.name,
1415 request.args,
1449 request.args,
1416 datafh=request.datafh,
1450 datafh=request.datafh,
1417 redirect=request.redirect)
1451 redirect=request.redirect)
1418
1452
1419 for frame in res:
1453 for frame in res:
1420 yield frame
1454 yield frame
1421
1455
1422 request.state = 'sent'
1456 request.state = 'sent'
1423
1457
1424 def onframerecv(self, frame):
1458 def onframerecv(self, frame):
1425 """Process a frame that has been received off the wire.
1459 """Process a frame that has been received off the wire.
1426
1460
1427 Returns a 2-tuple of (action, meta) describing further action the
1461 Returns a 2-tuple of (action, meta) describing further action the
1428 caller needs to take as a result of receiving this frame.
1462 caller needs to take as a result of receiving this frame.
1429 """
1463 """
1430 if frame.streamid % 2:
1464 if frame.streamid % 2:
1431 return 'error', {
1465 return 'error', {
1432 'message': (
1466 'message': (
1433 _('received frame with odd numbered stream ID: %d') %
1467 _('received frame with odd numbered stream ID: %d') %
1434 frame.streamid),
1468 frame.streamid),
1435 }
1469 }
1436
1470
1437 if frame.streamid not in self._incomingstreams:
1471 if frame.streamid not in self._incomingstreams:
1438 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1472 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1439 return 'error', {
1473 return 'error', {
1440 'message': _('received frame on unknown stream '
1474 'message': _('received frame on unknown stream '
1441 'without beginning of stream flag set'),
1475 'without beginning of stream flag set'),
1442 }
1476 }
1443
1477
1444 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1478 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1445
1479
1446 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1480 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1447 raise error.ProgrammingError('support for decoding stream '
1481 raise error.ProgrammingError('support for decoding stream '
1448 'payloads not yet implemneted')
1482 'payloads not yet implemneted')
1449
1483
1450 if frame.streamflags & STREAM_FLAG_END_STREAM:
1484 if frame.streamflags & STREAM_FLAG_END_STREAM:
1451 del self._incomingstreams[frame.streamid]
1485 del self._incomingstreams[frame.streamid]
1452
1486
1453 if frame.requestid not in self._activerequests:
1487 if frame.requestid not in self._activerequests:
1454 return 'error', {
1488 return 'error', {
1455 'message': (_('received frame for inactive request ID: %d') %
1489 'message': (_('received frame for inactive request ID: %d') %
1456 frame.requestid),
1490 frame.requestid),
1457 }
1491 }
1458
1492
1459 request = self._activerequests[frame.requestid]
1493 request = self._activerequests[frame.requestid]
1460 request.state = 'receiving'
1494 request.state = 'receiving'
1461
1495
1462 handlers = {
1496 handlers = {
1463 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1497 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1464 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1498 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1465 }
1499 }
1466
1500
1467 meth = handlers.get(frame.typeid)
1501 meth = handlers.get(frame.typeid)
1468 if not meth:
1502 if not meth:
1469 raise error.ProgrammingError('unhandled frame type: %d' %
1503 raise error.ProgrammingError('unhandled frame type: %d' %
1470 frame.typeid)
1504 frame.typeid)
1471
1505
1472 return meth(request, frame)
1506 return meth(request, frame)
1473
1507
1474 def _oncommandresponseframe(self, request, frame):
1508 def _oncommandresponseframe(self, request, frame):
1475 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1509 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1476 request.state = 'received'
1510 request.state = 'received'
1477 del self._activerequests[request.requestid]
1511 del self._activerequests[request.requestid]
1478
1512
1479 return 'responsedata', {
1513 return 'responsedata', {
1480 'request': request,
1514 'request': request,
1481 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1515 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1482 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1516 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1483 'data': frame.payload,
1517 'data': frame.payload,
1484 }
1518 }
1485
1519
1486 def _onerrorresponseframe(self, request, frame):
1520 def _onerrorresponseframe(self, request, frame):
1487 request.state = 'errored'
1521 request.state = 'errored'
1488 del self._activerequests[request.requestid]
1522 del self._activerequests[request.requestid]
1489
1523
1490 # The payload should be a CBOR map.
1524 # The payload should be a CBOR map.
1491 m = cborutil.decodeall(frame.payload)[0]
1525 m = cborutil.decodeall(frame.payload)[0]
1492
1526
1493 return 'error', {
1527 return 'error', {
1494 'request': request,
1528 'request': request,
1495 'type': m['type'],
1529 'type': m['type'],
1496 'message': m['message'],
1530 'message': m['message'],
1497 }
1531 }
General Comments 0
You need to be logged in to leave comments. Login now