##// END OF EJS Templates
wireproto: show unknown id and flags in repr(frame)...
Yuya Nishihara -
r37491:7c2c7c74 default
parent child Browse files
Show More
@@ -1,867 +1,870
1 # wireprotoframing.py - unified framing protocol for wire protocol
1 # wireprotoframing.py - unified framing protocol for wire protocol
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 # This file contains functionality to support the unified frame-based wire
8 # This file contains functionality to support the unified frame-based wire
9 # protocol. For details about the protocol, see
9 # protocol. For details about the protocol, see
10 # `hg help internals.wireprotocol`.
10 # `hg help internals.wireprotocol`.
11
11
12 from __future__ import absolute_import
12 from __future__ import absolute_import
13
13
14 import struct
14 import struct
15
15
16 from .i18n import _
16 from .i18n import _
17 from .thirdparty import (
17 from .thirdparty import (
18 attr,
18 attr,
19 cbor,
19 cbor,
20 )
20 )
21 from . import (
21 from . import (
22 error,
22 error,
23 util,
23 util,
24 )
24 )
25 from .utils import (
25 from .utils import (
26 stringutil,
26 stringutil,
27 )
27 )
28
28
29 FRAME_HEADER_SIZE = 8
29 FRAME_HEADER_SIZE = 8
30 DEFAULT_MAX_FRAME_SIZE = 32768
30 DEFAULT_MAX_FRAME_SIZE = 32768
31
31
32 STREAM_FLAG_BEGIN_STREAM = 0x01
32 STREAM_FLAG_BEGIN_STREAM = 0x01
33 STREAM_FLAG_END_STREAM = 0x02
33 STREAM_FLAG_END_STREAM = 0x02
34 STREAM_FLAG_ENCODING_APPLIED = 0x04
34 STREAM_FLAG_ENCODING_APPLIED = 0x04
35
35
36 STREAM_FLAGS = {
36 STREAM_FLAGS = {
37 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
37 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
38 b'stream-end': STREAM_FLAG_END_STREAM,
38 b'stream-end': STREAM_FLAG_END_STREAM,
39 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
39 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
40 }
40 }
41
41
42 FRAME_TYPE_COMMAND_REQUEST = 0x01
42 FRAME_TYPE_COMMAND_REQUEST = 0x01
43 FRAME_TYPE_COMMAND_DATA = 0x03
43 FRAME_TYPE_COMMAND_DATA = 0x03
44 FRAME_TYPE_BYTES_RESPONSE = 0x04
44 FRAME_TYPE_BYTES_RESPONSE = 0x04
45 FRAME_TYPE_ERROR_RESPONSE = 0x05
45 FRAME_TYPE_ERROR_RESPONSE = 0x05
46 FRAME_TYPE_TEXT_OUTPUT = 0x06
46 FRAME_TYPE_TEXT_OUTPUT = 0x06
47 FRAME_TYPE_PROGRESS = 0x07
47 FRAME_TYPE_PROGRESS = 0x07
48 FRAME_TYPE_STREAM_SETTINGS = 0x08
48 FRAME_TYPE_STREAM_SETTINGS = 0x08
49
49
50 FRAME_TYPES = {
50 FRAME_TYPES = {
51 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
51 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
52 b'command-data': FRAME_TYPE_COMMAND_DATA,
52 b'command-data': FRAME_TYPE_COMMAND_DATA,
53 b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
53 b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
54 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
54 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
55 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
55 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
56 b'progress': FRAME_TYPE_PROGRESS,
56 b'progress': FRAME_TYPE_PROGRESS,
57 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
57 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
58 }
58 }
59
59
60 FLAG_COMMAND_REQUEST_NEW = 0x01
60 FLAG_COMMAND_REQUEST_NEW = 0x01
61 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
61 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
62 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
62 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
63 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
63 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
64
64
65 FLAGS_COMMAND_REQUEST = {
65 FLAGS_COMMAND_REQUEST = {
66 b'new': FLAG_COMMAND_REQUEST_NEW,
66 b'new': FLAG_COMMAND_REQUEST_NEW,
67 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
67 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
68 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
68 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
69 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
69 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
70 }
70 }
71
71
72 FLAG_COMMAND_DATA_CONTINUATION = 0x01
72 FLAG_COMMAND_DATA_CONTINUATION = 0x01
73 FLAG_COMMAND_DATA_EOS = 0x02
73 FLAG_COMMAND_DATA_EOS = 0x02
74
74
75 FLAGS_COMMAND_DATA = {
75 FLAGS_COMMAND_DATA = {
76 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
76 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
77 b'eos': FLAG_COMMAND_DATA_EOS,
77 b'eos': FLAG_COMMAND_DATA_EOS,
78 }
78 }
79
79
80 FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
80 FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
81 FLAG_BYTES_RESPONSE_EOS = 0x02
81 FLAG_BYTES_RESPONSE_EOS = 0x02
82 FLAG_BYTES_RESPONSE_CBOR = 0x04
82 FLAG_BYTES_RESPONSE_CBOR = 0x04
83
83
84 FLAGS_BYTES_RESPONSE = {
84 FLAGS_BYTES_RESPONSE = {
85 b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
85 b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
86 b'eos': FLAG_BYTES_RESPONSE_EOS,
86 b'eos': FLAG_BYTES_RESPONSE_EOS,
87 b'cbor': FLAG_BYTES_RESPONSE_CBOR,
87 b'cbor': FLAG_BYTES_RESPONSE_CBOR,
88 }
88 }
89
89
90 FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
90 FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
91 FLAG_ERROR_RESPONSE_APPLICATION = 0x02
91 FLAG_ERROR_RESPONSE_APPLICATION = 0x02
92
92
93 FLAGS_ERROR_RESPONSE = {
93 FLAGS_ERROR_RESPONSE = {
94 b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
94 b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
95 b'application': FLAG_ERROR_RESPONSE_APPLICATION,
95 b'application': FLAG_ERROR_RESPONSE_APPLICATION,
96 }
96 }
97
97
98 # Maps frame types to their available flags.
98 # Maps frame types to their available flags.
99 FRAME_TYPE_FLAGS = {
99 FRAME_TYPE_FLAGS = {
100 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
100 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
101 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
101 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
102 FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
102 FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
103 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
103 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
104 FRAME_TYPE_TEXT_OUTPUT: {},
104 FRAME_TYPE_TEXT_OUTPUT: {},
105 FRAME_TYPE_PROGRESS: {},
105 FRAME_TYPE_PROGRESS: {},
106 FRAME_TYPE_STREAM_SETTINGS: {},
106 FRAME_TYPE_STREAM_SETTINGS: {},
107 }
107 }
108
108
109 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
109 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
110
110
111 def humanflags(mapping, value):
111 def humanflags(mapping, value):
112 """Convert a numeric flags value to a human value, using a mapping table."""
112 """Convert a numeric flags value to a human value, using a mapping table."""
113 namemap = {v: k for k, v in mapping.iteritems()}
113 flags = []
114 flags = []
114 for val, name in sorted({v: k for k, v in mapping.iteritems()}.iteritems()):
115 val = 1
116 while value >= val:
115 if value & val:
117 if value & val:
116 flags.append(name)
118 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
119 val <<= 1
117
120
118 return b'|'.join(flags)
121 return b'|'.join(flags)
119
122
120 @attr.s(slots=True)
123 @attr.s(slots=True)
121 class frameheader(object):
124 class frameheader(object):
122 """Represents the data in a frame header."""
125 """Represents the data in a frame header."""
123
126
124 length = attr.ib()
127 length = attr.ib()
125 requestid = attr.ib()
128 requestid = attr.ib()
126 streamid = attr.ib()
129 streamid = attr.ib()
127 streamflags = attr.ib()
130 streamflags = attr.ib()
128 typeid = attr.ib()
131 typeid = attr.ib()
129 flags = attr.ib()
132 flags = attr.ib()
130
133
131 @attr.s(slots=True, repr=False)
134 @attr.s(slots=True, repr=False)
132 class frame(object):
135 class frame(object):
133 """Represents a parsed frame."""
136 """Represents a parsed frame."""
134
137
135 requestid = attr.ib()
138 requestid = attr.ib()
136 streamid = attr.ib()
139 streamid = attr.ib()
137 streamflags = attr.ib()
140 streamflags = attr.ib()
138 typeid = attr.ib()
141 typeid = attr.ib()
139 flags = attr.ib()
142 flags = attr.ib()
140 payload = attr.ib()
143 payload = attr.ib()
141
144
142 def __repr__(self):
145 def __repr__(self):
143 typename = '<unknown>'
146 typename = '<unknown 0x%02x>' % self.typeid
144 for name, value in FRAME_TYPES.iteritems():
147 for name, value in FRAME_TYPES.iteritems():
145 if value == self.typeid:
148 if value == self.typeid:
146 typename = name
149 typename = name
147 break
150 break
148
151
149 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
152 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
150 'type=%s; flags=%s)' % (
153 'type=%s; flags=%s)' % (
151 len(self.payload), self.requestid, self.streamid,
154 len(self.payload), self.requestid, self.streamid,
152 humanflags(STREAM_FLAGS, self.streamflags), typename,
155 humanflags(STREAM_FLAGS, self.streamflags), typename,
153 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
156 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
154
157
155 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
158 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
156 """Assemble a frame into a byte array."""
159 """Assemble a frame into a byte array."""
157 # TODO assert size of payload.
160 # TODO assert size of payload.
158 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
161 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
159
162
160 # 24 bits length
163 # 24 bits length
161 # 16 bits request id
164 # 16 bits request id
162 # 8 bits stream id
165 # 8 bits stream id
163 # 8 bits stream flags
166 # 8 bits stream flags
164 # 4 bits type
167 # 4 bits type
165 # 4 bits flags
168 # 4 bits flags
166
169
167 l = struct.pack(r'<I', len(payload))
170 l = struct.pack(r'<I', len(payload))
168 frame[0:3] = l[0:3]
171 frame[0:3] = l[0:3]
169 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
172 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
170 frame[7] = (typeid << 4) | flags
173 frame[7] = (typeid << 4) | flags
171 frame[8:] = payload
174 frame[8:] = payload
172
175
173 return frame
176 return frame
174
177
175 def makeframefromhumanstring(s):
178 def makeframefromhumanstring(s):
176 """Create a frame from a human readable string
179 """Create a frame from a human readable string
177
180
178 DANGER: NOT SAFE TO USE WITH UNTRUSTED INPUT BECAUSE OF POTENTIAL
181 DANGER: NOT SAFE TO USE WITH UNTRUSTED INPUT BECAUSE OF POTENTIAL
179 eval() USAGE. DO NOT USE IN CORE.
182 eval() USAGE. DO NOT USE IN CORE.
180
183
181 Strings have the form:
184 Strings have the form:
182
185
183 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
186 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
184
187
185 This can be used by user-facing applications and tests for creating
188 This can be used by user-facing applications and tests for creating
186 frames easily without having to type out a bunch of constants.
189 frames easily without having to type out a bunch of constants.
187
190
188 Request ID and stream IDs are integers.
191 Request ID and stream IDs are integers.
189
192
190 Stream flags, frame type, and flags can be specified by integer or
193 Stream flags, frame type, and flags can be specified by integer or
191 named constant.
194 named constant.
192
195
193 Flags can be delimited by `|` to bitwise OR them together.
196 Flags can be delimited by `|` to bitwise OR them together.
194
197
195 If the payload begins with ``cbor:``, the following string will be
198 If the payload begins with ``cbor:``, the following string will be
196 evaluated as Python code and the resulting object will be fed into
199 evaluated as Python code and the resulting object will be fed into
197 a CBOR encoder. Otherwise, the payload is interpreted as a Python
200 a CBOR encoder. Otherwise, the payload is interpreted as a Python
198 byte string literal.
201 byte string literal.
199 """
202 """
200 fields = s.split(b' ', 5)
203 fields = s.split(b' ', 5)
201 requestid, streamid, streamflags, frametype, frameflags, payload = fields
204 requestid, streamid, streamflags, frametype, frameflags, payload = fields
202
205
203 requestid = int(requestid)
206 requestid = int(requestid)
204 streamid = int(streamid)
207 streamid = int(streamid)
205
208
206 finalstreamflags = 0
209 finalstreamflags = 0
207 for flag in streamflags.split(b'|'):
210 for flag in streamflags.split(b'|'):
208 if flag in STREAM_FLAGS:
211 if flag in STREAM_FLAGS:
209 finalstreamflags |= STREAM_FLAGS[flag]
212 finalstreamflags |= STREAM_FLAGS[flag]
210 else:
213 else:
211 finalstreamflags |= int(flag)
214 finalstreamflags |= int(flag)
212
215
213 if frametype in FRAME_TYPES:
216 if frametype in FRAME_TYPES:
214 frametype = FRAME_TYPES[frametype]
217 frametype = FRAME_TYPES[frametype]
215 else:
218 else:
216 frametype = int(frametype)
219 frametype = int(frametype)
217
220
218 finalflags = 0
221 finalflags = 0
219 validflags = FRAME_TYPE_FLAGS[frametype]
222 validflags = FRAME_TYPE_FLAGS[frametype]
220 for flag in frameflags.split(b'|'):
223 for flag in frameflags.split(b'|'):
221 if flag in validflags:
224 if flag in validflags:
222 finalflags |= validflags[flag]
225 finalflags |= validflags[flag]
223 else:
226 else:
224 finalflags |= int(flag)
227 finalflags |= int(flag)
225
228
226 if payload.startswith(b'cbor:'):
229 if payload.startswith(b'cbor:'):
227 payload = cbor.dumps(stringutil.evalpython(payload[5:]), canonical=True)
230 payload = cbor.dumps(stringutil.evalpython(payload[5:]), canonical=True)
228
231
229 else:
232 else:
230 payload = stringutil.unescapestr(payload)
233 payload = stringutil.unescapestr(payload)
231
234
232 return makeframe(requestid=requestid, streamid=streamid,
235 return makeframe(requestid=requestid, streamid=streamid,
233 streamflags=finalstreamflags, typeid=frametype,
236 streamflags=finalstreamflags, typeid=frametype,
234 flags=finalflags, payload=payload)
237 flags=finalflags, payload=payload)
235
238
236 def parseheader(data):
239 def parseheader(data):
237 """Parse a unified framing protocol frame header from a buffer.
240 """Parse a unified framing protocol frame header from a buffer.
238
241
239 The header is expected to be in the buffer at offset 0 and the
242 The header is expected to be in the buffer at offset 0 and the
240 buffer is expected to be large enough to hold a full header.
243 buffer is expected to be large enough to hold a full header.
241 """
244 """
242 # 24 bits payload length (little endian)
245 # 24 bits payload length (little endian)
243 # 16 bits request ID
246 # 16 bits request ID
244 # 8 bits stream ID
247 # 8 bits stream ID
245 # 8 bits stream flags
248 # 8 bits stream flags
246 # 4 bits frame type
249 # 4 bits frame type
247 # 4 bits frame flags
250 # 4 bits frame flags
248 # ... payload
251 # ... payload
249 framelength = data[0] + 256 * data[1] + 16384 * data[2]
252 framelength = data[0] + 256 * data[1] + 16384 * data[2]
250 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
253 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
251 typeflags = data[7]
254 typeflags = data[7]
252
255
253 frametype = (typeflags & 0xf0) >> 4
256 frametype = (typeflags & 0xf0) >> 4
254 frameflags = typeflags & 0x0f
257 frameflags = typeflags & 0x0f
255
258
256 return frameheader(framelength, requestid, streamid, streamflags,
259 return frameheader(framelength, requestid, streamid, streamflags,
257 frametype, frameflags)
260 frametype, frameflags)
258
261
259 def readframe(fh):
262 def readframe(fh):
260 """Read a unified framing protocol frame from a file object.
263 """Read a unified framing protocol frame from a file object.
261
264
262 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
265 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
263 None if no frame is available. May raise if a malformed frame is
266 None if no frame is available. May raise if a malformed frame is
264 seen.
267 seen.
265 """
268 """
266 header = bytearray(FRAME_HEADER_SIZE)
269 header = bytearray(FRAME_HEADER_SIZE)
267
270
268 readcount = fh.readinto(header)
271 readcount = fh.readinto(header)
269
272
270 if readcount == 0:
273 if readcount == 0:
271 return None
274 return None
272
275
273 if readcount != FRAME_HEADER_SIZE:
276 if readcount != FRAME_HEADER_SIZE:
274 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
277 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
275 (readcount, header))
278 (readcount, header))
276
279
277 h = parseheader(header)
280 h = parseheader(header)
278
281
279 payload = fh.read(h.length)
282 payload = fh.read(h.length)
280 if len(payload) != h.length:
283 if len(payload) != h.length:
281 raise error.Abort(_('frame length error: expected %d; got %d') %
284 raise error.Abort(_('frame length error: expected %d; got %d') %
282 (h.length, len(payload)))
285 (h.length, len(payload)))
283
286
284 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
287 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
285 payload)
288 payload)
286
289
287 def createcommandframes(stream, requestid, cmd, args, datafh=None,
290 def createcommandframes(stream, requestid, cmd, args, datafh=None,
288 maxframesize=DEFAULT_MAX_FRAME_SIZE):
291 maxframesize=DEFAULT_MAX_FRAME_SIZE):
289 """Create frames necessary to transmit a request to run a command.
292 """Create frames necessary to transmit a request to run a command.
290
293
291 This is a generator of bytearrays. Each item represents a frame
294 This is a generator of bytearrays. Each item represents a frame
292 ready to be sent over the wire to a peer.
295 ready to be sent over the wire to a peer.
293 """
296 """
294 data = {b'name': cmd}
297 data = {b'name': cmd}
295 if args:
298 if args:
296 data[b'args'] = args
299 data[b'args'] = args
297
300
298 data = cbor.dumps(data, canonical=True)
301 data = cbor.dumps(data, canonical=True)
299
302
300 offset = 0
303 offset = 0
301
304
302 while True:
305 while True:
303 flags = 0
306 flags = 0
304
307
305 # Must set new or continuation flag.
308 # Must set new or continuation flag.
306 if not offset:
309 if not offset:
307 flags |= FLAG_COMMAND_REQUEST_NEW
310 flags |= FLAG_COMMAND_REQUEST_NEW
308 else:
311 else:
309 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
312 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
310
313
311 # Data frames is set on all frames.
314 # Data frames is set on all frames.
312 if datafh:
315 if datafh:
313 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
316 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
314
317
315 payload = data[offset:offset + maxframesize]
318 payload = data[offset:offset + maxframesize]
316 offset += len(payload)
319 offset += len(payload)
317
320
318 if len(payload) == maxframesize and offset < len(data):
321 if len(payload) == maxframesize and offset < len(data):
319 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
322 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
320
323
321 yield stream.makeframe(requestid=requestid,
324 yield stream.makeframe(requestid=requestid,
322 typeid=FRAME_TYPE_COMMAND_REQUEST,
325 typeid=FRAME_TYPE_COMMAND_REQUEST,
323 flags=flags,
326 flags=flags,
324 payload=payload)
327 payload=payload)
325
328
326 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
329 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
327 break
330 break
328
331
329 if datafh:
332 if datafh:
330 while True:
333 while True:
331 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
334 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
332
335
333 done = False
336 done = False
334 if len(data) == DEFAULT_MAX_FRAME_SIZE:
337 if len(data) == DEFAULT_MAX_FRAME_SIZE:
335 flags = FLAG_COMMAND_DATA_CONTINUATION
338 flags = FLAG_COMMAND_DATA_CONTINUATION
336 else:
339 else:
337 flags = FLAG_COMMAND_DATA_EOS
340 flags = FLAG_COMMAND_DATA_EOS
338 assert datafh.read(1) == b''
341 assert datafh.read(1) == b''
339 done = True
342 done = True
340
343
341 yield stream.makeframe(requestid=requestid,
344 yield stream.makeframe(requestid=requestid,
342 typeid=FRAME_TYPE_COMMAND_DATA,
345 typeid=FRAME_TYPE_COMMAND_DATA,
343 flags=flags,
346 flags=flags,
344 payload=data)
347 payload=data)
345
348
346 if done:
349 if done:
347 break
350 break
348
351
349 def createbytesresponseframesfrombytes(stream, requestid, data,
352 def createbytesresponseframesfrombytes(stream, requestid, data,
350 maxframesize=DEFAULT_MAX_FRAME_SIZE):
353 maxframesize=DEFAULT_MAX_FRAME_SIZE):
351 """Create a raw frame to send a bytes response from static bytes input.
354 """Create a raw frame to send a bytes response from static bytes input.
352
355
353 Returns a generator of bytearrays.
356 Returns a generator of bytearrays.
354 """
357 """
355
358
356 # Simple case of a single frame.
359 # Simple case of a single frame.
357 if len(data) <= maxframesize:
360 if len(data) <= maxframesize:
358 yield stream.makeframe(requestid=requestid,
361 yield stream.makeframe(requestid=requestid,
359 typeid=FRAME_TYPE_BYTES_RESPONSE,
362 typeid=FRAME_TYPE_BYTES_RESPONSE,
360 flags=FLAG_BYTES_RESPONSE_EOS,
363 flags=FLAG_BYTES_RESPONSE_EOS,
361 payload=data)
364 payload=data)
362 return
365 return
363
366
364 offset = 0
367 offset = 0
365 while True:
368 while True:
366 chunk = data[offset:offset + maxframesize]
369 chunk = data[offset:offset + maxframesize]
367 offset += len(chunk)
370 offset += len(chunk)
368 done = offset == len(data)
371 done = offset == len(data)
369
372
370 if done:
373 if done:
371 flags = FLAG_BYTES_RESPONSE_EOS
374 flags = FLAG_BYTES_RESPONSE_EOS
372 else:
375 else:
373 flags = FLAG_BYTES_RESPONSE_CONTINUATION
376 flags = FLAG_BYTES_RESPONSE_CONTINUATION
374
377
375 yield stream.makeframe(requestid=requestid,
378 yield stream.makeframe(requestid=requestid,
376 typeid=FRAME_TYPE_BYTES_RESPONSE,
379 typeid=FRAME_TYPE_BYTES_RESPONSE,
377 flags=flags,
380 flags=flags,
378 payload=chunk)
381 payload=chunk)
379
382
380 if done:
383 if done:
381 break
384 break
382
385
383 def createerrorframe(stream, requestid, msg, protocol=False, application=False):
386 def createerrorframe(stream, requestid, msg, protocol=False, application=False):
384 # TODO properly handle frame size limits.
387 # TODO properly handle frame size limits.
385 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
388 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
386
389
387 flags = 0
390 flags = 0
388 if protocol:
391 if protocol:
389 flags |= FLAG_ERROR_RESPONSE_PROTOCOL
392 flags |= FLAG_ERROR_RESPONSE_PROTOCOL
390 if application:
393 if application:
391 flags |= FLAG_ERROR_RESPONSE_APPLICATION
394 flags |= FLAG_ERROR_RESPONSE_APPLICATION
392
395
393 yield stream.makeframe(requestid=requestid,
396 yield stream.makeframe(requestid=requestid,
394 typeid=FRAME_TYPE_ERROR_RESPONSE,
397 typeid=FRAME_TYPE_ERROR_RESPONSE,
395 flags=flags,
398 flags=flags,
396 payload=msg)
399 payload=msg)
397
400
398 def createtextoutputframe(stream, requestid, atoms,
401 def createtextoutputframe(stream, requestid, atoms,
399 maxframesize=DEFAULT_MAX_FRAME_SIZE):
402 maxframesize=DEFAULT_MAX_FRAME_SIZE):
400 """Create a text output frame to render text to people.
403 """Create a text output frame to render text to people.
401
404
402 ``atoms`` is a 3-tuple of (formatting string, args, labels).
405 ``atoms`` is a 3-tuple of (formatting string, args, labels).
403
406
404 The formatting string contains ``%s`` tokens to be replaced by the
407 The formatting string contains ``%s`` tokens to be replaced by the
405 corresponding indexed entry in ``args``. ``labels`` is an iterable of
408 corresponding indexed entry in ``args``. ``labels`` is an iterable of
406 formatters to be applied at rendering time. In terms of the ``ui``
409 formatters to be applied at rendering time. In terms of the ``ui``
407 class, each atom corresponds to a ``ui.write()``.
410 class, each atom corresponds to a ``ui.write()``.
408 """
411 """
409 atomdicts = []
412 atomdicts = []
410
413
411 for (formatting, args, labels) in atoms:
414 for (formatting, args, labels) in atoms:
412 # TODO look for localstr, other types here?
415 # TODO look for localstr, other types here?
413
416
414 if not isinstance(formatting, bytes):
417 if not isinstance(formatting, bytes):
415 raise ValueError('must use bytes formatting strings')
418 raise ValueError('must use bytes formatting strings')
416 for arg in args:
419 for arg in args:
417 if not isinstance(arg, bytes):
420 if not isinstance(arg, bytes):
418 raise ValueError('must use bytes for arguments')
421 raise ValueError('must use bytes for arguments')
419 for label in labels:
422 for label in labels:
420 if not isinstance(label, bytes):
423 if not isinstance(label, bytes):
421 raise ValueError('must use bytes for labels')
424 raise ValueError('must use bytes for labels')
422
425
423 # Formatting string must be ASCII.
426 # Formatting string must be ASCII.
424 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
427 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
425
428
426 # Arguments must be UTF-8.
429 # Arguments must be UTF-8.
427 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
430 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
428
431
429 # Labels must be ASCII.
432 # Labels must be ASCII.
430 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
433 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
431 for l in labels]
434 for l in labels]
432
435
433 atom = {b'msg': formatting}
436 atom = {b'msg': formatting}
434 if args:
437 if args:
435 atom[b'args'] = args
438 atom[b'args'] = args
436 if labels:
439 if labels:
437 atom[b'labels'] = labels
440 atom[b'labels'] = labels
438
441
439 atomdicts.append(atom)
442 atomdicts.append(atom)
440
443
441 payload = cbor.dumps(atomdicts, canonical=True)
444 payload = cbor.dumps(atomdicts, canonical=True)
442
445
443 if len(payload) > maxframesize:
446 if len(payload) > maxframesize:
444 raise ValueError('cannot encode data in a single frame')
447 raise ValueError('cannot encode data in a single frame')
445
448
446 yield stream.makeframe(requestid=requestid,
449 yield stream.makeframe(requestid=requestid,
447 typeid=FRAME_TYPE_TEXT_OUTPUT,
450 typeid=FRAME_TYPE_TEXT_OUTPUT,
448 flags=0,
451 flags=0,
449 payload=payload)
452 payload=payload)
450
453
451 class stream(object):
454 class stream(object):
452 """Represents a logical unidirectional series of frames."""
455 """Represents a logical unidirectional series of frames."""
453
456
454 def __init__(self, streamid, active=False):
457 def __init__(self, streamid, active=False):
455 self.streamid = streamid
458 self.streamid = streamid
456 self._active = False
459 self._active = False
457
460
458 def makeframe(self, requestid, typeid, flags, payload):
461 def makeframe(self, requestid, typeid, flags, payload):
459 """Create a frame to be sent out over this stream.
462 """Create a frame to be sent out over this stream.
460
463
461 Only returns the frame instance. Does not actually send it.
464 Only returns the frame instance. Does not actually send it.
462 """
465 """
463 streamflags = 0
466 streamflags = 0
464 if not self._active:
467 if not self._active:
465 streamflags |= STREAM_FLAG_BEGIN_STREAM
468 streamflags |= STREAM_FLAG_BEGIN_STREAM
466 self._active = True
469 self._active = True
467
470
468 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
471 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
469 payload)
472 payload)
470
473
471 def ensureserverstream(stream):
474 def ensureserverstream(stream):
472 if stream.streamid % 2:
475 if stream.streamid % 2:
473 raise error.ProgrammingError('server should only write to even '
476 raise error.ProgrammingError('server should only write to even '
474 'numbered streams; %d is not even' %
477 'numbered streams; %d is not even' %
475 stream.streamid)
478 stream.streamid)
476
479
477 class serverreactor(object):
480 class serverreactor(object):
478 """Holds state of a server handling frame-based protocol requests.
481 """Holds state of a server handling frame-based protocol requests.
479
482
480 This class is the "brain" of the unified frame-based protocol server
483 This class is the "brain" of the unified frame-based protocol server
481 component. While the protocol is stateless from the perspective of
484 component. While the protocol is stateless from the perspective of
482 requests/commands, something needs to track which frames have been
485 requests/commands, something needs to track which frames have been
483 received, what frames to expect, etc. This class is that thing.
486 received, what frames to expect, etc. This class is that thing.
484
487
485 Instances are modeled as a state machine of sorts. Instances are also
488 Instances are modeled as a state machine of sorts. Instances are also
486 reactionary to external events. The point of this class is to encapsulate
489 reactionary to external events. The point of this class is to encapsulate
487 the state of the connection and the exchange of frames, not to perform
490 the state of the connection and the exchange of frames, not to perform
488 work. Instead, callers tell this class when something occurs, like a
491 work. Instead, callers tell this class when something occurs, like a
489 frame arriving. If that activity is worthy of a follow-up action (say
492 frame arriving. If that activity is worthy of a follow-up action (say
490 *run a command*), the return value of that handler will say so.
493 *run a command*), the return value of that handler will say so.
491
494
492 I/O and CPU intensive operations are purposefully delegated outside of
495 I/O and CPU intensive operations are purposefully delegated outside of
493 this class.
496 this class.
494
497
495 Consumers are expected to tell instances when events occur. They do so by
498 Consumers are expected to tell instances when events occur. They do so by
496 calling the various ``on*`` methods. These methods return a 2-tuple
499 calling the various ``on*`` methods. These methods return a 2-tuple
497 describing any follow-up action(s) to take. The first element is the
500 describing any follow-up action(s) to take. The first element is the
498 name of an action to perform. The second is a data structure (usually
501 name of an action to perform. The second is a data structure (usually
499 a dict) specific to that action that contains more information. e.g.
502 a dict) specific to that action that contains more information. e.g.
500 if the server wants to send frames back to the client, the data structure
503 if the server wants to send frames back to the client, the data structure
501 will contain a reference to those frames.
504 will contain a reference to those frames.
502
505
503 Valid actions that consumers can be instructed to take are:
506 Valid actions that consumers can be instructed to take are:
504
507
505 sendframes
508 sendframes
506 Indicates that frames should be sent to the client. The ``framegen``
509 Indicates that frames should be sent to the client. The ``framegen``
507 key contains a generator of frames that should be sent. The server
510 key contains a generator of frames that should be sent. The server
508 assumes that all frames are sent to the client.
511 assumes that all frames are sent to the client.
509
512
510 error
513 error
511 Indicates that an error occurred. Consumer should probably abort.
514 Indicates that an error occurred. Consumer should probably abort.
512
515
513 runcommand
516 runcommand
514 Indicates that the consumer should run a wire protocol command. Details
517 Indicates that the consumer should run a wire protocol command. Details
515 of the command to run are given in the data structure.
518 of the command to run are given in the data structure.
516
519
517 wantframe
520 wantframe
518 Indicates that nothing of interest happened and the server is waiting on
521 Indicates that nothing of interest happened and the server is waiting on
519 more frames from the client before anything interesting can be done.
522 more frames from the client before anything interesting can be done.
520
523
521 noop
524 noop
522 Indicates no additional action is required.
525 Indicates no additional action is required.
523
526
524 Known Issues
527 Known Issues
525 ------------
528 ------------
526
529
527 There are no limits to the number of partially received commands or their
530 There are no limits to the number of partially received commands or their
528 size. A malicious client could stream command request data and exhaust the
531 size. A malicious client could stream command request data and exhaust the
529 server's memory.
532 server's memory.
530
533
531 Partially received commands are not acted upon when end of input is
534 Partially received commands are not acted upon when end of input is
532 reached. Should the server error if it receives a partial request?
535 reached. Should the server error if it receives a partial request?
533 Should the client send a message to abort a partially transmitted request
536 Should the client send a message to abort a partially transmitted request
534 to facilitate graceful shutdown?
537 to facilitate graceful shutdown?
535
538
536 Active requests that haven't been responded to aren't tracked. This means
539 Active requests that haven't been responded to aren't tracked. This means
537 that if we receive a command and instruct its dispatch, another command
540 that if we receive a command and instruct its dispatch, another command
538 with its request ID can come in over the wire and there will be a race
541 with its request ID can come in over the wire and there will be a race
539 between who responds to what.
542 between who responds to what.
540 """
543 """
541
544
542 def __init__(self, deferoutput=False):
545 def __init__(self, deferoutput=False):
543 """Construct a new server reactor.
546 """Construct a new server reactor.
544
547
545 ``deferoutput`` can be used to indicate that no output frames should be
548 ``deferoutput`` can be used to indicate that no output frames should be
546 instructed to be sent until input has been exhausted. In this mode,
549 instructed to be sent until input has been exhausted. In this mode,
547 events that would normally generate output frames (such as a command
550 events that would normally generate output frames (such as a command
548 response being ready) will instead defer instructing the consumer to
551 response being ready) will instead defer instructing the consumer to
549 send those frames. This is useful for half-duplex transports where the
552 send those frames. This is useful for half-duplex transports where the
550 sender cannot receive until all data has been transmitted.
553 sender cannot receive until all data has been transmitted.
551 """
554 """
552 self._deferoutput = deferoutput
555 self._deferoutput = deferoutput
553 self._state = 'idle'
556 self._state = 'idle'
554 self._nextoutgoingstreamid = 2
557 self._nextoutgoingstreamid = 2
555 self._bufferedframegens = []
558 self._bufferedframegens = []
556 # stream id -> stream instance for all active streams from the client.
559 # stream id -> stream instance for all active streams from the client.
557 self._incomingstreams = {}
560 self._incomingstreams = {}
558 self._outgoingstreams = {}
561 self._outgoingstreams = {}
559 # request id -> dict of commands that are actively being received.
562 # request id -> dict of commands that are actively being received.
560 self._receivingcommands = {}
563 self._receivingcommands = {}
561 # Request IDs that have been received and are actively being processed.
564 # Request IDs that have been received and are actively being processed.
562 # Once all output for a request has been sent, it is removed from this
565 # Once all output for a request has been sent, it is removed from this
563 # set.
566 # set.
564 self._activecommands = set()
567 self._activecommands = set()
565
568
566 def onframerecv(self, frame):
569 def onframerecv(self, frame):
567 """Process a frame that has been received off the wire.
570 """Process a frame that has been received off the wire.
568
571
569 Returns a dict with an ``action`` key that details what action,
572 Returns a dict with an ``action`` key that details what action,
570 if any, the consumer should take next.
573 if any, the consumer should take next.
571 """
574 """
572 if not frame.streamid % 2:
575 if not frame.streamid % 2:
573 self._state = 'errored'
576 self._state = 'errored'
574 return self._makeerrorresult(
577 return self._makeerrorresult(
575 _('received frame with even numbered stream ID: %d') %
578 _('received frame with even numbered stream ID: %d') %
576 frame.streamid)
579 frame.streamid)
577
580
578 if frame.streamid not in self._incomingstreams:
581 if frame.streamid not in self._incomingstreams:
579 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
582 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
580 self._state = 'errored'
583 self._state = 'errored'
581 return self._makeerrorresult(
584 return self._makeerrorresult(
582 _('received frame on unknown inactive stream without '
585 _('received frame on unknown inactive stream without '
583 'beginning of stream flag set'))
586 'beginning of stream flag set'))
584
587
585 self._incomingstreams[frame.streamid] = stream(frame.streamid)
588 self._incomingstreams[frame.streamid] = stream(frame.streamid)
586
589
587 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
590 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
588 # TODO handle decoding frames
591 # TODO handle decoding frames
589 self._state = 'errored'
592 self._state = 'errored'
590 raise error.ProgrammingError('support for decoding stream payloads '
593 raise error.ProgrammingError('support for decoding stream payloads '
591 'not yet implemented')
594 'not yet implemented')
592
595
593 if frame.streamflags & STREAM_FLAG_END_STREAM:
596 if frame.streamflags & STREAM_FLAG_END_STREAM:
594 del self._incomingstreams[frame.streamid]
597 del self._incomingstreams[frame.streamid]
595
598
596 handlers = {
599 handlers = {
597 'idle': self._onframeidle,
600 'idle': self._onframeidle,
598 'command-receiving': self._onframecommandreceiving,
601 'command-receiving': self._onframecommandreceiving,
599 'errored': self._onframeerrored,
602 'errored': self._onframeerrored,
600 }
603 }
601
604
602 meth = handlers.get(self._state)
605 meth = handlers.get(self._state)
603 if not meth:
606 if not meth:
604 raise error.ProgrammingError('unhandled state: %s' % self._state)
607 raise error.ProgrammingError('unhandled state: %s' % self._state)
605
608
606 return meth(frame)
609 return meth(frame)
607
610
608 def onbytesresponseready(self, stream, requestid, data):
611 def onbytesresponseready(self, stream, requestid, data):
609 """Signal that a bytes response is ready to be sent to the client.
612 """Signal that a bytes response is ready to be sent to the client.
610
613
611 The raw bytes response is passed as an argument.
614 The raw bytes response is passed as an argument.
612 """
615 """
613 ensureserverstream(stream)
616 ensureserverstream(stream)
614
617
615 def sendframes():
618 def sendframes():
616 for frame in createbytesresponseframesfrombytes(stream, requestid,
619 for frame in createbytesresponseframesfrombytes(stream, requestid,
617 data):
620 data):
618 yield frame
621 yield frame
619
622
620 self._activecommands.remove(requestid)
623 self._activecommands.remove(requestid)
621
624
622 result = sendframes()
625 result = sendframes()
623
626
624 if self._deferoutput:
627 if self._deferoutput:
625 self._bufferedframegens.append(result)
628 self._bufferedframegens.append(result)
626 return 'noop', {}
629 return 'noop', {}
627 else:
630 else:
628 return 'sendframes', {
631 return 'sendframes', {
629 'framegen': result,
632 'framegen': result,
630 }
633 }
631
634
632 def oninputeof(self):
635 def oninputeof(self):
633 """Signals that end of input has been received.
636 """Signals that end of input has been received.
634
637
635 No more frames will be received. All pending activity should be
638 No more frames will be received. All pending activity should be
636 completed.
639 completed.
637 """
640 """
638 # TODO should we do anything about in-flight commands?
641 # TODO should we do anything about in-flight commands?
639
642
640 if not self._deferoutput or not self._bufferedframegens:
643 if not self._deferoutput or not self._bufferedframegens:
641 return 'noop', {}
644 return 'noop', {}
642
645
643 # If we buffered all our responses, emit those.
646 # If we buffered all our responses, emit those.
644 def makegen():
647 def makegen():
645 for gen in self._bufferedframegens:
648 for gen in self._bufferedframegens:
646 for frame in gen:
649 for frame in gen:
647 yield frame
650 yield frame
648
651
649 return 'sendframes', {
652 return 'sendframes', {
650 'framegen': makegen(),
653 'framegen': makegen(),
651 }
654 }
652
655
653 def onapplicationerror(self, stream, requestid, msg):
656 def onapplicationerror(self, stream, requestid, msg):
654 ensureserverstream(stream)
657 ensureserverstream(stream)
655
658
656 return 'sendframes', {
659 return 'sendframes', {
657 'framegen': createerrorframe(stream, requestid, msg,
660 'framegen': createerrorframe(stream, requestid, msg,
658 application=True),
661 application=True),
659 }
662 }
660
663
661 def makeoutputstream(self):
664 def makeoutputstream(self):
662 """Create a stream to be used for sending data to the client."""
665 """Create a stream to be used for sending data to the client."""
663 streamid = self._nextoutgoingstreamid
666 streamid = self._nextoutgoingstreamid
664 self._nextoutgoingstreamid += 2
667 self._nextoutgoingstreamid += 2
665
668
666 s = stream(streamid)
669 s = stream(streamid)
667 self._outgoingstreams[streamid] = s
670 self._outgoingstreams[streamid] = s
668
671
669 return s
672 return s
670
673
671 def _makeerrorresult(self, msg):
674 def _makeerrorresult(self, msg):
672 return 'error', {
675 return 'error', {
673 'message': msg,
676 'message': msg,
674 }
677 }
675
678
676 def _makeruncommandresult(self, requestid):
679 def _makeruncommandresult(self, requestid):
677 entry = self._receivingcommands[requestid]
680 entry = self._receivingcommands[requestid]
678
681
679 if not entry['requestdone']:
682 if not entry['requestdone']:
680 self._state = 'errored'
683 self._state = 'errored'
681 raise error.ProgrammingError('should not be called without '
684 raise error.ProgrammingError('should not be called without '
682 'requestdone set')
685 'requestdone set')
683
686
684 del self._receivingcommands[requestid]
687 del self._receivingcommands[requestid]
685
688
686 if self._receivingcommands:
689 if self._receivingcommands:
687 self._state = 'command-receiving'
690 self._state = 'command-receiving'
688 else:
691 else:
689 self._state = 'idle'
692 self._state = 'idle'
690
693
691 # Decode the payloads as CBOR.
694 # Decode the payloads as CBOR.
692 entry['payload'].seek(0)
695 entry['payload'].seek(0)
693 request = cbor.load(entry['payload'])
696 request = cbor.load(entry['payload'])
694
697
695 if b'name' not in request:
698 if b'name' not in request:
696 self._state = 'errored'
699 self._state = 'errored'
697 return self._makeerrorresult(
700 return self._makeerrorresult(
698 _('command request missing "name" field'))
701 _('command request missing "name" field'))
699
702
700 if b'args' not in request:
703 if b'args' not in request:
701 request[b'args'] = {}
704 request[b'args'] = {}
702
705
703 assert requestid not in self._activecommands
706 assert requestid not in self._activecommands
704 self._activecommands.add(requestid)
707 self._activecommands.add(requestid)
705
708
706 return 'runcommand', {
709 return 'runcommand', {
707 'requestid': requestid,
710 'requestid': requestid,
708 'command': request[b'name'],
711 'command': request[b'name'],
709 'args': request[b'args'],
712 'args': request[b'args'],
710 'data': entry['data'].getvalue() if entry['data'] else None,
713 'data': entry['data'].getvalue() if entry['data'] else None,
711 }
714 }
712
715
713 def _makewantframeresult(self):
716 def _makewantframeresult(self):
714 return 'wantframe', {
717 return 'wantframe', {
715 'state': self._state,
718 'state': self._state,
716 }
719 }
717
720
718 def _validatecommandrequestframe(self, frame):
721 def _validatecommandrequestframe(self, frame):
719 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
722 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
720 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
723 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
721
724
722 if new and continuation:
725 if new and continuation:
723 self._state = 'errored'
726 self._state = 'errored'
724 return self._makeerrorresult(
727 return self._makeerrorresult(
725 _('received command request frame with both new and '
728 _('received command request frame with both new and '
726 'continuation flags set'))
729 'continuation flags set'))
727
730
728 if not new and not continuation:
731 if not new and not continuation:
729 self._state = 'errored'
732 self._state = 'errored'
730 return self._makeerrorresult(
733 return self._makeerrorresult(
731 _('received command request frame with neither new nor '
734 _('received command request frame with neither new nor '
732 'continuation flags set'))
735 'continuation flags set'))
733
736
734 def _onframeidle(self, frame):
737 def _onframeidle(self, frame):
735 # The only frame type that should be received in this state is a
738 # The only frame type that should be received in this state is a
736 # command request.
739 # command request.
737 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
740 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
738 self._state = 'errored'
741 self._state = 'errored'
739 return self._makeerrorresult(
742 return self._makeerrorresult(
740 _('expected command request frame; got %d') % frame.typeid)
743 _('expected command request frame; got %d') % frame.typeid)
741
744
742 res = self._validatecommandrequestframe(frame)
745 res = self._validatecommandrequestframe(frame)
743 if res:
746 if res:
744 return res
747 return res
745
748
746 if frame.requestid in self._receivingcommands:
749 if frame.requestid in self._receivingcommands:
747 self._state = 'errored'
750 self._state = 'errored'
748 return self._makeerrorresult(
751 return self._makeerrorresult(
749 _('request with ID %d already received') % frame.requestid)
752 _('request with ID %d already received') % frame.requestid)
750
753
751 if frame.requestid in self._activecommands:
754 if frame.requestid in self._activecommands:
752 self._state = 'errored'
755 self._state = 'errored'
753 return self._makeerrorresult(
756 return self._makeerrorresult(
754 _('request with ID %d is already active') % frame.requestid)
757 _('request with ID %d is already active') % frame.requestid)
755
758
756 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
759 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
757 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
760 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
758 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
761 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
759
762
760 if not new:
763 if not new:
761 self._state = 'errored'
764 self._state = 'errored'
762 return self._makeerrorresult(
765 return self._makeerrorresult(
763 _('received command request frame without new flag set'))
766 _('received command request frame without new flag set'))
764
767
765 payload = util.bytesio()
768 payload = util.bytesio()
766 payload.write(frame.payload)
769 payload.write(frame.payload)
767
770
768 self._receivingcommands[frame.requestid] = {
771 self._receivingcommands[frame.requestid] = {
769 'payload': payload,
772 'payload': payload,
770 'data': None,
773 'data': None,
771 'requestdone': not moreframes,
774 'requestdone': not moreframes,
772 'expectingdata': bool(expectingdata),
775 'expectingdata': bool(expectingdata),
773 }
776 }
774
777
775 # This is the final frame for this request. Dispatch it.
778 # This is the final frame for this request. Dispatch it.
776 if not moreframes and not expectingdata:
779 if not moreframes and not expectingdata:
777 return self._makeruncommandresult(frame.requestid)
780 return self._makeruncommandresult(frame.requestid)
778
781
779 assert moreframes or expectingdata
782 assert moreframes or expectingdata
780 self._state = 'command-receiving'
783 self._state = 'command-receiving'
781 return self._makewantframeresult()
784 return self._makewantframeresult()
782
785
783 def _onframecommandreceiving(self, frame):
786 def _onframecommandreceiving(self, frame):
784 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
787 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
785 # Process new command requests as such.
788 # Process new command requests as such.
786 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
789 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
787 return self._onframeidle(frame)
790 return self._onframeidle(frame)
788
791
789 res = self._validatecommandrequestframe(frame)
792 res = self._validatecommandrequestframe(frame)
790 if res:
793 if res:
791 return res
794 return res
792
795
793 # All other frames should be related to a command that is currently
796 # All other frames should be related to a command that is currently
794 # receiving but is not active.
797 # receiving but is not active.
795 if frame.requestid in self._activecommands:
798 if frame.requestid in self._activecommands:
796 self._state = 'errored'
799 self._state = 'errored'
797 return self._makeerrorresult(
800 return self._makeerrorresult(
798 _('received frame for request that is still active: %d') %
801 _('received frame for request that is still active: %d') %
799 frame.requestid)
802 frame.requestid)
800
803
801 if frame.requestid not in self._receivingcommands:
804 if frame.requestid not in self._receivingcommands:
802 self._state = 'errored'
805 self._state = 'errored'
803 return self._makeerrorresult(
806 return self._makeerrorresult(
804 _('received frame for request that is not receiving: %d') %
807 _('received frame for request that is not receiving: %d') %
805 frame.requestid)
808 frame.requestid)
806
809
807 entry = self._receivingcommands[frame.requestid]
810 entry = self._receivingcommands[frame.requestid]
808
811
809 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
812 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
810 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
813 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
811 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
814 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
812
815
813 if entry['requestdone']:
816 if entry['requestdone']:
814 self._state = 'errored'
817 self._state = 'errored'
815 return self._makeerrorresult(
818 return self._makeerrorresult(
816 _('received command request frame when request frames '
819 _('received command request frame when request frames '
817 'were supposedly done'))
820 'were supposedly done'))
818
821
819 if expectingdata != entry['expectingdata']:
822 if expectingdata != entry['expectingdata']:
820 self._state = 'errored'
823 self._state = 'errored'
821 return self._makeerrorresult(
824 return self._makeerrorresult(
822 _('mismatch between expect data flag and previous frame'))
825 _('mismatch between expect data flag and previous frame'))
823
826
824 entry['payload'].write(frame.payload)
827 entry['payload'].write(frame.payload)
825
828
826 if not moreframes:
829 if not moreframes:
827 entry['requestdone'] = True
830 entry['requestdone'] = True
828
831
829 if not moreframes and not expectingdata:
832 if not moreframes and not expectingdata:
830 return self._makeruncommandresult(frame.requestid)
833 return self._makeruncommandresult(frame.requestid)
831
834
832 return self._makewantframeresult()
835 return self._makewantframeresult()
833
836
834 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
837 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
835 if not entry['expectingdata']:
838 if not entry['expectingdata']:
836 self._state = 'errored'
839 self._state = 'errored'
837 return self._makeerrorresult(_(
840 return self._makeerrorresult(_(
838 'received command data frame for request that is not '
841 'received command data frame for request that is not '
839 'expecting data: %d') % frame.requestid)
842 'expecting data: %d') % frame.requestid)
840
843
841 if entry['data'] is None:
844 if entry['data'] is None:
842 entry['data'] = util.bytesio()
845 entry['data'] = util.bytesio()
843
846
844 return self._handlecommanddataframe(frame, entry)
847 return self._handlecommanddataframe(frame, entry)
845 else:
848 else:
846 self._state = 'errored'
849 self._state = 'errored'
847 return self._makeerrorresult(_(
850 return self._makeerrorresult(_(
848 'received unexpected frame type: %d') % frame.typeid)
851 'received unexpected frame type: %d') % frame.typeid)
849
852
850 def _handlecommanddataframe(self, frame, entry):
853 def _handlecommanddataframe(self, frame, entry):
851 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
854 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
852
855
853 # TODO support streaming data instead of buffering it.
856 # TODO support streaming data instead of buffering it.
854 entry['data'].write(frame.payload)
857 entry['data'].write(frame.payload)
855
858
856 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
859 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
857 return self._makewantframeresult()
860 return self._makewantframeresult()
858 elif frame.flags & FLAG_COMMAND_DATA_EOS:
861 elif frame.flags & FLAG_COMMAND_DATA_EOS:
859 entry['data'].seek(0)
862 entry['data'].seek(0)
860 return self._makeruncommandresult(frame.requestid)
863 return self._makeruncommandresult(frame.requestid)
861 else:
864 else:
862 self._state = 'errored'
865 self._state = 'errored'
863 return self._makeerrorresult(_('command data frame without '
866 return self._makeerrorresult(_('command data frame without '
864 'flags'))
867 'flags'))
865
868
866 def _onframeerrored(self, frame):
869 def _onframeerrored(self, frame):
867 return self._makeerrorresult(_('server already errored'))
870 return self._makeerrorresult(_('server already errored'))
General Comments 0
You need to be logged in to leave comments. Login now