##// END OF EJS Templates
py3: system-stringify repr(frame)...
Yuya Nishihara -
r37492:d3399712 default
parent child Browse files
Show More
@@ -1,870 +1,872
1 # wireprotoframing.py - unified framing protocol for wire protocol
1 # wireprotoframing.py - unified framing protocol for wire protocol
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 # This file contains functionality to support the unified frame-based wire
8 # This file contains functionality to support the unified frame-based wire
9 # protocol. For details about the protocol, see
9 # protocol. For details about the protocol, see
10 # `hg help internals.wireprotocol`.
10 # `hg help internals.wireprotocol`.
11
11
12 from __future__ import absolute_import
12 from __future__ import absolute_import
13
13
14 import struct
14 import struct
15
15
16 from .i18n import _
16 from .i18n import _
17 from .thirdparty import (
17 from .thirdparty import (
18 attr,
18 attr,
19 cbor,
19 cbor,
20 )
20 )
21 from . import (
21 from . import (
22 encoding,
22 error,
23 error,
23 util,
24 util,
24 )
25 )
25 from .utils import (
26 from .utils import (
26 stringutil,
27 stringutil,
27 )
28 )
28
29
29 FRAME_HEADER_SIZE = 8
30 FRAME_HEADER_SIZE = 8
30 DEFAULT_MAX_FRAME_SIZE = 32768
31 DEFAULT_MAX_FRAME_SIZE = 32768
31
32
32 STREAM_FLAG_BEGIN_STREAM = 0x01
33 STREAM_FLAG_BEGIN_STREAM = 0x01
33 STREAM_FLAG_END_STREAM = 0x02
34 STREAM_FLAG_END_STREAM = 0x02
34 STREAM_FLAG_ENCODING_APPLIED = 0x04
35 STREAM_FLAG_ENCODING_APPLIED = 0x04
35
36
36 STREAM_FLAGS = {
37 STREAM_FLAGS = {
37 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
38 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
38 b'stream-end': STREAM_FLAG_END_STREAM,
39 b'stream-end': STREAM_FLAG_END_STREAM,
39 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
40 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
40 }
41 }
41
42
42 FRAME_TYPE_COMMAND_REQUEST = 0x01
43 FRAME_TYPE_COMMAND_REQUEST = 0x01
43 FRAME_TYPE_COMMAND_DATA = 0x03
44 FRAME_TYPE_COMMAND_DATA = 0x03
44 FRAME_TYPE_BYTES_RESPONSE = 0x04
45 FRAME_TYPE_BYTES_RESPONSE = 0x04
45 FRAME_TYPE_ERROR_RESPONSE = 0x05
46 FRAME_TYPE_ERROR_RESPONSE = 0x05
46 FRAME_TYPE_TEXT_OUTPUT = 0x06
47 FRAME_TYPE_TEXT_OUTPUT = 0x06
47 FRAME_TYPE_PROGRESS = 0x07
48 FRAME_TYPE_PROGRESS = 0x07
48 FRAME_TYPE_STREAM_SETTINGS = 0x08
49 FRAME_TYPE_STREAM_SETTINGS = 0x08
49
50
50 FRAME_TYPES = {
51 FRAME_TYPES = {
51 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
52 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
52 b'command-data': FRAME_TYPE_COMMAND_DATA,
53 b'command-data': FRAME_TYPE_COMMAND_DATA,
53 b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
54 b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
54 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
55 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
55 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
56 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
56 b'progress': FRAME_TYPE_PROGRESS,
57 b'progress': FRAME_TYPE_PROGRESS,
57 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
58 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
58 }
59 }
59
60
60 FLAG_COMMAND_REQUEST_NEW = 0x01
61 FLAG_COMMAND_REQUEST_NEW = 0x01
61 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
62 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
62 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
63 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
63 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
64 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
64
65
65 FLAGS_COMMAND_REQUEST = {
66 FLAGS_COMMAND_REQUEST = {
66 b'new': FLAG_COMMAND_REQUEST_NEW,
67 b'new': FLAG_COMMAND_REQUEST_NEW,
67 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
68 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
68 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
69 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
69 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
70 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
70 }
71 }
71
72
72 FLAG_COMMAND_DATA_CONTINUATION = 0x01
73 FLAG_COMMAND_DATA_CONTINUATION = 0x01
73 FLAG_COMMAND_DATA_EOS = 0x02
74 FLAG_COMMAND_DATA_EOS = 0x02
74
75
75 FLAGS_COMMAND_DATA = {
76 FLAGS_COMMAND_DATA = {
76 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
77 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
77 b'eos': FLAG_COMMAND_DATA_EOS,
78 b'eos': FLAG_COMMAND_DATA_EOS,
78 }
79 }
79
80
80 FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
81 FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
81 FLAG_BYTES_RESPONSE_EOS = 0x02
82 FLAG_BYTES_RESPONSE_EOS = 0x02
82 FLAG_BYTES_RESPONSE_CBOR = 0x04
83 FLAG_BYTES_RESPONSE_CBOR = 0x04
83
84
84 FLAGS_BYTES_RESPONSE = {
85 FLAGS_BYTES_RESPONSE = {
85 b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
86 b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
86 b'eos': FLAG_BYTES_RESPONSE_EOS,
87 b'eos': FLAG_BYTES_RESPONSE_EOS,
87 b'cbor': FLAG_BYTES_RESPONSE_CBOR,
88 b'cbor': FLAG_BYTES_RESPONSE_CBOR,
88 }
89 }
89
90
90 FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
91 FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
91 FLAG_ERROR_RESPONSE_APPLICATION = 0x02
92 FLAG_ERROR_RESPONSE_APPLICATION = 0x02
92
93
93 FLAGS_ERROR_RESPONSE = {
94 FLAGS_ERROR_RESPONSE = {
94 b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
95 b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
95 b'application': FLAG_ERROR_RESPONSE_APPLICATION,
96 b'application': FLAG_ERROR_RESPONSE_APPLICATION,
96 }
97 }
97
98
98 # Maps frame types to their available flags.
99 # Maps frame types to their available flags.
99 FRAME_TYPE_FLAGS = {
100 FRAME_TYPE_FLAGS = {
100 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
101 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
101 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
102 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
102 FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
103 FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
103 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
104 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
104 FRAME_TYPE_TEXT_OUTPUT: {},
105 FRAME_TYPE_TEXT_OUTPUT: {},
105 FRAME_TYPE_PROGRESS: {},
106 FRAME_TYPE_PROGRESS: {},
106 FRAME_TYPE_STREAM_SETTINGS: {},
107 FRAME_TYPE_STREAM_SETTINGS: {},
107 }
108 }
108
109
109 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
110 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
110
111
111 def humanflags(mapping, value):
112 def humanflags(mapping, value):
112 """Convert a numeric flags value to a human value, using a mapping table."""
113 """Convert a numeric flags value to a human value, using a mapping table."""
113 namemap = {v: k for k, v in mapping.iteritems()}
114 namemap = {v: k for k, v in mapping.iteritems()}
114 flags = []
115 flags = []
115 val = 1
116 val = 1
116 while value >= val:
117 while value >= val:
117 if value & val:
118 if value & val:
118 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
119 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
119 val <<= 1
120 val <<= 1
120
121
121 return b'|'.join(flags)
122 return b'|'.join(flags)
122
123
123 @attr.s(slots=True)
124 @attr.s(slots=True)
124 class frameheader(object):
125 class frameheader(object):
125 """Represents the data in a frame header."""
126 """Represents the data in a frame header."""
126
127
127 length = attr.ib()
128 length = attr.ib()
128 requestid = attr.ib()
129 requestid = attr.ib()
129 streamid = attr.ib()
130 streamid = attr.ib()
130 streamflags = attr.ib()
131 streamflags = attr.ib()
131 typeid = attr.ib()
132 typeid = attr.ib()
132 flags = attr.ib()
133 flags = attr.ib()
133
134
134 @attr.s(slots=True, repr=False)
135 @attr.s(slots=True, repr=False)
135 class frame(object):
136 class frame(object):
136 """Represents a parsed frame."""
137 """Represents a parsed frame."""
137
138
138 requestid = attr.ib()
139 requestid = attr.ib()
139 streamid = attr.ib()
140 streamid = attr.ib()
140 streamflags = attr.ib()
141 streamflags = attr.ib()
141 typeid = attr.ib()
142 typeid = attr.ib()
142 flags = attr.ib()
143 flags = attr.ib()
143 payload = attr.ib()
144 payload = attr.ib()
144
145
146 @encoding.strmethod
145 def __repr__(self):
147 def __repr__(self):
146 typename = '<unknown 0x%02x>' % self.typeid
148 typename = '<unknown 0x%02x>' % self.typeid
147 for name, value in FRAME_TYPES.iteritems():
149 for name, value in FRAME_TYPES.iteritems():
148 if value == self.typeid:
150 if value == self.typeid:
149 typename = name
151 typename = name
150 break
152 break
151
153
152 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
154 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
153 'type=%s; flags=%s)' % (
155 'type=%s; flags=%s)' % (
154 len(self.payload), self.requestid, self.streamid,
156 len(self.payload), self.requestid, self.streamid,
155 humanflags(STREAM_FLAGS, self.streamflags), typename,
157 humanflags(STREAM_FLAGS, self.streamflags), typename,
156 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
158 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
157
159
158 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
160 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
159 """Assemble a frame into a byte array."""
161 """Assemble a frame into a byte array."""
160 # TODO assert size of payload.
162 # TODO assert size of payload.
161 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
163 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
162
164
163 # 24 bits length
165 # 24 bits length
164 # 16 bits request id
166 # 16 bits request id
165 # 8 bits stream id
167 # 8 bits stream id
166 # 8 bits stream flags
168 # 8 bits stream flags
167 # 4 bits type
169 # 4 bits type
168 # 4 bits flags
170 # 4 bits flags
169
171
170 l = struct.pack(r'<I', len(payload))
172 l = struct.pack(r'<I', len(payload))
171 frame[0:3] = l[0:3]
173 frame[0:3] = l[0:3]
172 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
174 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
173 frame[7] = (typeid << 4) | flags
175 frame[7] = (typeid << 4) | flags
174 frame[8:] = payload
176 frame[8:] = payload
175
177
176 return frame
178 return frame
177
179
178 def makeframefromhumanstring(s):
180 def makeframefromhumanstring(s):
179 """Create a frame from a human readable string
181 """Create a frame from a human readable string
180
182
181 DANGER: NOT SAFE TO USE WITH UNTRUSTED INPUT BECAUSE OF POTENTIAL
183 DANGER: NOT SAFE TO USE WITH UNTRUSTED INPUT BECAUSE OF POTENTIAL
182 eval() USAGE. DO NOT USE IN CORE.
184 eval() USAGE. DO NOT USE IN CORE.
183
185
184 Strings have the form:
186 Strings have the form:
185
187
186 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
188 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
187
189
188 This can be used by user-facing applications and tests for creating
190 This can be used by user-facing applications and tests for creating
189 frames easily without having to type out a bunch of constants.
191 frames easily without having to type out a bunch of constants.
190
192
191 Request ID and stream IDs are integers.
193 Request ID and stream IDs are integers.
192
194
193 Stream flags, frame type, and flags can be specified by integer or
195 Stream flags, frame type, and flags can be specified by integer or
194 named constant.
196 named constant.
195
197
196 Flags can be delimited by `|` to bitwise OR them together.
198 Flags can be delimited by `|` to bitwise OR them together.
197
199
198 If the payload begins with ``cbor:``, the following string will be
200 If the payload begins with ``cbor:``, the following string will be
199 evaluated as Python code and the resulting object will be fed into
201 evaluated as Python code and the resulting object will be fed into
200 a CBOR encoder. Otherwise, the payload is interpreted as a Python
202 a CBOR encoder. Otherwise, the payload is interpreted as a Python
201 byte string literal.
203 byte string literal.
202 """
204 """
203 fields = s.split(b' ', 5)
205 fields = s.split(b' ', 5)
204 requestid, streamid, streamflags, frametype, frameflags, payload = fields
206 requestid, streamid, streamflags, frametype, frameflags, payload = fields
205
207
206 requestid = int(requestid)
208 requestid = int(requestid)
207 streamid = int(streamid)
209 streamid = int(streamid)
208
210
209 finalstreamflags = 0
211 finalstreamflags = 0
210 for flag in streamflags.split(b'|'):
212 for flag in streamflags.split(b'|'):
211 if flag in STREAM_FLAGS:
213 if flag in STREAM_FLAGS:
212 finalstreamflags |= STREAM_FLAGS[flag]
214 finalstreamflags |= STREAM_FLAGS[flag]
213 else:
215 else:
214 finalstreamflags |= int(flag)
216 finalstreamflags |= int(flag)
215
217
216 if frametype in FRAME_TYPES:
218 if frametype in FRAME_TYPES:
217 frametype = FRAME_TYPES[frametype]
219 frametype = FRAME_TYPES[frametype]
218 else:
220 else:
219 frametype = int(frametype)
221 frametype = int(frametype)
220
222
221 finalflags = 0
223 finalflags = 0
222 validflags = FRAME_TYPE_FLAGS[frametype]
224 validflags = FRAME_TYPE_FLAGS[frametype]
223 for flag in frameflags.split(b'|'):
225 for flag in frameflags.split(b'|'):
224 if flag in validflags:
226 if flag in validflags:
225 finalflags |= validflags[flag]
227 finalflags |= validflags[flag]
226 else:
228 else:
227 finalflags |= int(flag)
229 finalflags |= int(flag)
228
230
229 if payload.startswith(b'cbor:'):
231 if payload.startswith(b'cbor:'):
230 payload = cbor.dumps(stringutil.evalpython(payload[5:]), canonical=True)
232 payload = cbor.dumps(stringutil.evalpython(payload[5:]), canonical=True)
231
233
232 else:
234 else:
233 payload = stringutil.unescapestr(payload)
235 payload = stringutil.unescapestr(payload)
234
236
235 return makeframe(requestid=requestid, streamid=streamid,
237 return makeframe(requestid=requestid, streamid=streamid,
236 streamflags=finalstreamflags, typeid=frametype,
238 streamflags=finalstreamflags, typeid=frametype,
237 flags=finalflags, payload=payload)
239 flags=finalflags, payload=payload)
238
240
239 def parseheader(data):
241 def parseheader(data):
240 """Parse a unified framing protocol frame header from a buffer.
242 """Parse a unified framing protocol frame header from a buffer.
241
243
242 The header is expected to be in the buffer at offset 0 and the
244 The header is expected to be in the buffer at offset 0 and the
243 buffer is expected to be large enough to hold a full header.
245 buffer is expected to be large enough to hold a full header.
244 """
246 """
245 # 24 bits payload length (little endian)
247 # 24 bits payload length (little endian)
246 # 16 bits request ID
248 # 16 bits request ID
247 # 8 bits stream ID
249 # 8 bits stream ID
248 # 8 bits stream flags
250 # 8 bits stream flags
249 # 4 bits frame type
251 # 4 bits frame type
250 # 4 bits frame flags
252 # 4 bits frame flags
251 # ... payload
253 # ... payload
252 framelength = data[0] + 256 * data[1] + 16384 * data[2]
254 framelength = data[0] + 256 * data[1] + 16384 * data[2]
253 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
255 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
254 typeflags = data[7]
256 typeflags = data[7]
255
257
256 frametype = (typeflags & 0xf0) >> 4
258 frametype = (typeflags & 0xf0) >> 4
257 frameflags = typeflags & 0x0f
259 frameflags = typeflags & 0x0f
258
260
259 return frameheader(framelength, requestid, streamid, streamflags,
261 return frameheader(framelength, requestid, streamid, streamflags,
260 frametype, frameflags)
262 frametype, frameflags)
261
263
262 def readframe(fh):
264 def readframe(fh):
263 """Read a unified framing protocol frame from a file object.
265 """Read a unified framing protocol frame from a file object.
264
266
265 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
267 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
266 None if no frame is available. May raise if a malformed frame is
268 None if no frame is available. May raise if a malformed frame is
267 seen.
269 seen.
268 """
270 """
269 header = bytearray(FRAME_HEADER_SIZE)
271 header = bytearray(FRAME_HEADER_SIZE)
270
272
271 readcount = fh.readinto(header)
273 readcount = fh.readinto(header)
272
274
273 if readcount == 0:
275 if readcount == 0:
274 return None
276 return None
275
277
276 if readcount != FRAME_HEADER_SIZE:
278 if readcount != FRAME_HEADER_SIZE:
277 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
279 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
278 (readcount, header))
280 (readcount, header))
279
281
280 h = parseheader(header)
282 h = parseheader(header)
281
283
282 payload = fh.read(h.length)
284 payload = fh.read(h.length)
283 if len(payload) != h.length:
285 if len(payload) != h.length:
284 raise error.Abort(_('frame length error: expected %d; got %d') %
286 raise error.Abort(_('frame length error: expected %d; got %d') %
285 (h.length, len(payload)))
287 (h.length, len(payload)))
286
288
287 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
289 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
288 payload)
290 payload)
289
291
290 def createcommandframes(stream, requestid, cmd, args, datafh=None,
292 def createcommandframes(stream, requestid, cmd, args, datafh=None,
291 maxframesize=DEFAULT_MAX_FRAME_SIZE):
293 maxframesize=DEFAULT_MAX_FRAME_SIZE):
292 """Create frames necessary to transmit a request to run a command.
294 """Create frames necessary to transmit a request to run a command.
293
295
294 This is a generator of bytearrays. Each item represents a frame
296 This is a generator of bytearrays. Each item represents a frame
295 ready to be sent over the wire to a peer.
297 ready to be sent over the wire to a peer.
296 """
298 """
297 data = {b'name': cmd}
299 data = {b'name': cmd}
298 if args:
300 if args:
299 data[b'args'] = args
301 data[b'args'] = args
300
302
301 data = cbor.dumps(data, canonical=True)
303 data = cbor.dumps(data, canonical=True)
302
304
303 offset = 0
305 offset = 0
304
306
305 while True:
307 while True:
306 flags = 0
308 flags = 0
307
309
308 # Must set new or continuation flag.
310 # Must set new or continuation flag.
309 if not offset:
311 if not offset:
310 flags |= FLAG_COMMAND_REQUEST_NEW
312 flags |= FLAG_COMMAND_REQUEST_NEW
311 else:
313 else:
312 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
314 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
313
315
314 # Data frames is set on all frames.
316 # Data frames is set on all frames.
315 if datafh:
317 if datafh:
316 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
318 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
317
319
318 payload = data[offset:offset + maxframesize]
320 payload = data[offset:offset + maxframesize]
319 offset += len(payload)
321 offset += len(payload)
320
322
321 if len(payload) == maxframesize and offset < len(data):
323 if len(payload) == maxframesize and offset < len(data):
322 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
324 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
323
325
324 yield stream.makeframe(requestid=requestid,
326 yield stream.makeframe(requestid=requestid,
325 typeid=FRAME_TYPE_COMMAND_REQUEST,
327 typeid=FRAME_TYPE_COMMAND_REQUEST,
326 flags=flags,
328 flags=flags,
327 payload=payload)
329 payload=payload)
328
330
329 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
331 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
330 break
332 break
331
333
332 if datafh:
334 if datafh:
333 while True:
335 while True:
334 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
336 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
335
337
336 done = False
338 done = False
337 if len(data) == DEFAULT_MAX_FRAME_SIZE:
339 if len(data) == DEFAULT_MAX_FRAME_SIZE:
338 flags = FLAG_COMMAND_DATA_CONTINUATION
340 flags = FLAG_COMMAND_DATA_CONTINUATION
339 else:
341 else:
340 flags = FLAG_COMMAND_DATA_EOS
342 flags = FLAG_COMMAND_DATA_EOS
341 assert datafh.read(1) == b''
343 assert datafh.read(1) == b''
342 done = True
344 done = True
343
345
344 yield stream.makeframe(requestid=requestid,
346 yield stream.makeframe(requestid=requestid,
345 typeid=FRAME_TYPE_COMMAND_DATA,
347 typeid=FRAME_TYPE_COMMAND_DATA,
346 flags=flags,
348 flags=flags,
347 payload=data)
349 payload=data)
348
350
349 if done:
351 if done:
350 break
352 break
351
353
352 def createbytesresponseframesfrombytes(stream, requestid, data,
354 def createbytesresponseframesfrombytes(stream, requestid, data,
353 maxframesize=DEFAULT_MAX_FRAME_SIZE):
355 maxframesize=DEFAULT_MAX_FRAME_SIZE):
354 """Create a raw frame to send a bytes response from static bytes input.
356 """Create a raw frame to send a bytes response from static bytes input.
355
357
356 Returns a generator of bytearrays.
358 Returns a generator of bytearrays.
357 """
359 """
358
360
359 # Simple case of a single frame.
361 # Simple case of a single frame.
360 if len(data) <= maxframesize:
362 if len(data) <= maxframesize:
361 yield stream.makeframe(requestid=requestid,
363 yield stream.makeframe(requestid=requestid,
362 typeid=FRAME_TYPE_BYTES_RESPONSE,
364 typeid=FRAME_TYPE_BYTES_RESPONSE,
363 flags=FLAG_BYTES_RESPONSE_EOS,
365 flags=FLAG_BYTES_RESPONSE_EOS,
364 payload=data)
366 payload=data)
365 return
367 return
366
368
367 offset = 0
369 offset = 0
368 while True:
370 while True:
369 chunk = data[offset:offset + maxframesize]
371 chunk = data[offset:offset + maxframesize]
370 offset += len(chunk)
372 offset += len(chunk)
371 done = offset == len(data)
373 done = offset == len(data)
372
374
373 if done:
375 if done:
374 flags = FLAG_BYTES_RESPONSE_EOS
376 flags = FLAG_BYTES_RESPONSE_EOS
375 else:
377 else:
376 flags = FLAG_BYTES_RESPONSE_CONTINUATION
378 flags = FLAG_BYTES_RESPONSE_CONTINUATION
377
379
378 yield stream.makeframe(requestid=requestid,
380 yield stream.makeframe(requestid=requestid,
379 typeid=FRAME_TYPE_BYTES_RESPONSE,
381 typeid=FRAME_TYPE_BYTES_RESPONSE,
380 flags=flags,
382 flags=flags,
381 payload=chunk)
383 payload=chunk)
382
384
383 if done:
385 if done:
384 break
386 break
385
387
386 def createerrorframe(stream, requestid, msg, protocol=False, application=False):
388 def createerrorframe(stream, requestid, msg, protocol=False, application=False):
387 # TODO properly handle frame size limits.
389 # TODO properly handle frame size limits.
388 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
390 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
389
391
390 flags = 0
392 flags = 0
391 if protocol:
393 if protocol:
392 flags |= FLAG_ERROR_RESPONSE_PROTOCOL
394 flags |= FLAG_ERROR_RESPONSE_PROTOCOL
393 if application:
395 if application:
394 flags |= FLAG_ERROR_RESPONSE_APPLICATION
396 flags |= FLAG_ERROR_RESPONSE_APPLICATION
395
397
396 yield stream.makeframe(requestid=requestid,
398 yield stream.makeframe(requestid=requestid,
397 typeid=FRAME_TYPE_ERROR_RESPONSE,
399 typeid=FRAME_TYPE_ERROR_RESPONSE,
398 flags=flags,
400 flags=flags,
399 payload=msg)
401 payload=msg)
400
402
401 def createtextoutputframe(stream, requestid, atoms,
403 def createtextoutputframe(stream, requestid, atoms,
402 maxframesize=DEFAULT_MAX_FRAME_SIZE):
404 maxframesize=DEFAULT_MAX_FRAME_SIZE):
403 """Create a text output frame to render text to people.
405 """Create a text output frame to render text to people.
404
406
405 ``atoms`` is a 3-tuple of (formatting string, args, labels).
407 ``atoms`` is a 3-tuple of (formatting string, args, labels).
406
408
407 The formatting string contains ``%s`` tokens to be replaced by the
409 The formatting string contains ``%s`` tokens to be replaced by the
408 corresponding indexed entry in ``args``. ``labels`` is an iterable of
410 corresponding indexed entry in ``args``. ``labels`` is an iterable of
409 formatters to be applied at rendering time. In terms of the ``ui``
411 formatters to be applied at rendering time. In terms of the ``ui``
410 class, each atom corresponds to a ``ui.write()``.
412 class, each atom corresponds to a ``ui.write()``.
411 """
413 """
412 atomdicts = []
414 atomdicts = []
413
415
414 for (formatting, args, labels) in atoms:
416 for (formatting, args, labels) in atoms:
415 # TODO look for localstr, other types here?
417 # TODO look for localstr, other types here?
416
418
417 if not isinstance(formatting, bytes):
419 if not isinstance(formatting, bytes):
418 raise ValueError('must use bytes formatting strings')
420 raise ValueError('must use bytes formatting strings')
419 for arg in args:
421 for arg in args:
420 if not isinstance(arg, bytes):
422 if not isinstance(arg, bytes):
421 raise ValueError('must use bytes for arguments')
423 raise ValueError('must use bytes for arguments')
422 for label in labels:
424 for label in labels:
423 if not isinstance(label, bytes):
425 if not isinstance(label, bytes):
424 raise ValueError('must use bytes for labels')
426 raise ValueError('must use bytes for labels')
425
427
426 # Formatting string must be ASCII.
428 # Formatting string must be ASCII.
427 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
429 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
428
430
429 # Arguments must be UTF-8.
431 # Arguments must be UTF-8.
430 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
432 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
431
433
432 # Labels must be ASCII.
434 # Labels must be ASCII.
433 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
435 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
434 for l in labels]
436 for l in labels]
435
437
436 atom = {b'msg': formatting}
438 atom = {b'msg': formatting}
437 if args:
439 if args:
438 atom[b'args'] = args
440 atom[b'args'] = args
439 if labels:
441 if labels:
440 atom[b'labels'] = labels
442 atom[b'labels'] = labels
441
443
442 atomdicts.append(atom)
444 atomdicts.append(atom)
443
445
444 payload = cbor.dumps(atomdicts, canonical=True)
446 payload = cbor.dumps(atomdicts, canonical=True)
445
447
446 if len(payload) > maxframesize:
448 if len(payload) > maxframesize:
447 raise ValueError('cannot encode data in a single frame')
449 raise ValueError('cannot encode data in a single frame')
448
450
449 yield stream.makeframe(requestid=requestid,
451 yield stream.makeframe(requestid=requestid,
450 typeid=FRAME_TYPE_TEXT_OUTPUT,
452 typeid=FRAME_TYPE_TEXT_OUTPUT,
451 flags=0,
453 flags=0,
452 payload=payload)
454 payload=payload)
453
455
454 class stream(object):
456 class stream(object):
455 """Represents a logical unidirectional series of frames."""
457 """Represents a logical unidirectional series of frames."""
456
458
457 def __init__(self, streamid, active=False):
459 def __init__(self, streamid, active=False):
458 self.streamid = streamid
460 self.streamid = streamid
459 self._active = False
461 self._active = False
460
462
461 def makeframe(self, requestid, typeid, flags, payload):
463 def makeframe(self, requestid, typeid, flags, payload):
462 """Create a frame to be sent out over this stream.
464 """Create a frame to be sent out over this stream.
463
465
464 Only returns the frame instance. Does not actually send it.
466 Only returns the frame instance. Does not actually send it.
465 """
467 """
466 streamflags = 0
468 streamflags = 0
467 if not self._active:
469 if not self._active:
468 streamflags |= STREAM_FLAG_BEGIN_STREAM
470 streamflags |= STREAM_FLAG_BEGIN_STREAM
469 self._active = True
471 self._active = True
470
472
471 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
473 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
472 payload)
474 payload)
473
475
474 def ensureserverstream(stream):
476 def ensureserverstream(stream):
475 if stream.streamid % 2:
477 if stream.streamid % 2:
476 raise error.ProgrammingError('server should only write to even '
478 raise error.ProgrammingError('server should only write to even '
477 'numbered streams; %d is not even' %
479 'numbered streams; %d is not even' %
478 stream.streamid)
480 stream.streamid)
479
481
480 class serverreactor(object):
482 class serverreactor(object):
481 """Holds state of a server handling frame-based protocol requests.
483 """Holds state of a server handling frame-based protocol requests.
482
484
483 This class is the "brain" of the unified frame-based protocol server
485 This class is the "brain" of the unified frame-based protocol server
484 component. While the protocol is stateless from the perspective of
486 component. While the protocol is stateless from the perspective of
485 requests/commands, something needs to track which frames have been
487 requests/commands, something needs to track which frames have been
486 received, what frames to expect, etc. This class is that thing.
488 received, what frames to expect, etc. This class is that thing.
487
489
488 Instances are modeled as a state machine of sorts. Instances are also
490 Instances are modeled as a state machine of sorts. Instances are also
489 reactionary to external events. The point of this class is to encapsulate
491 reactionary to external events. The point of this class is to encapsulate
490 the state of the connection and the exchange of frames, not to perform
492 the state of the connection and the exchange of frames, not to perform
491 work. Instead, callers tell this class when something occurs, like a
493 work. Instead, callers tell this class when something occurs, like a
492 frame arriving. If that activity is worthy of a follow-up action (say
494 frame arriving. If that activity is worthy of a follow-up action (say
493 *run a command*), the return value of that handler will say so.
495 *run a command*), the return value of that handler will say so.
494
496
495 I/O and CPU intensive operations are purposefully delegated outside of
497 I/O and CPU intensive operations are purposefully delegated outside of
496 this class.
498 this class.
497
499
498 Consumers are expected to tell instances when events occur. They do so by
500 Consumers are expected to tell instances when events occur. They do so by
499 calling the various ``on*`` methods. These methods return a 2-tuple
501 calling the various ``on*`` methods. These methods return a 2-tuple
500 describing any follow-up action(s) to take. The first element is the
502 describing any follow-up action(s) to take. The first element is the
501 name of an action to perform. The second is a data structure (usually
503 name of an action to perform. The second is a data structure (usually
502 a dict) specific to that action that contains more information. e.g.
504 a dict) specific to that action that contains more information. e.g.
503 if the server wants to send frames back to the client, the data structure
505 if the server wants to send frames back to the client, the data structure
504 will contain a reference to those frames.
506 will contain a reference to those frames.
505
507
506 Valid actions that consumers can be instructed to take are:
508 Valid actions that consumers can be instructed to take are:
507
509
508 sendframes
510 sendframes
509 Indicates that frames should be sent to the client. The ``framegen``
511 Indicates that frames should be sent to the client. The ``framegen``
510 key contains a generator of frames that should be sent. The server
512 key contains a generator of frames that should be sent. The server
511 assumes that all frames are sent to the client.
513 assumes that all frames are sent to the client.
512
514
513 error
515 error
514 Indicates that an error occurred. Consumer should probably abort.
516 Indicates that an error occurred. Consumer should probably abort.
515
517
516 runcommand
518 runcommand
517 Indicates that the consumer should run a wire protocol command. Details
519 Indicates that the consumer should run a wire protocol command. Details
518 of the command to run are given in the data structure.
520 of the command to run are given in the data structure.
519
521
520 wantframe
522 wantframe
521 Indicates that nothing of interest happened and the server is waiting on
523 Indicates that nothing of interest happened and the server is waiting on
522 more frames from the client before anything interesting can be done.
524 more frames from the client before anything interesting can be done.
523
525
524 noop
526 noop
525 Indicates no additional action is required.
527 Indicates no additional action is required.
526
528
527 Known Issues
529 Known Issues
528 ------------
530 ------------
529
531
530 There are no limits to the number of partially received commands or their
532 There are no limits to the number of partially received commands or their
531 size. A malicious client could stream command request data and exhaust the
533 size. A malicious client could stream command request data and exhaust the
532 server's memory.
534 server's memory.
533
535
534 Partially received commands are not acted upon when end of input is
536 Partially received commands are not acted upon when end of input is
535 reached. Should the server error if it receives a partial request?
537 reached. Should the server error if it receives a partial request?
536 Should the client send a message to abort a partially transmitted request
538 Should the client send a message to abort a partially transmitted request
537 to facilitate graceful shutdown?
539 to facilitate graceful shutdown?
538
540
539 Active requests that haven't been responded to aren't tracked. This means
541 Active requests that haven't been responded to aren't tracked. This means
540 that if we receive a command and instruct its dispatch, another command
542 that if we receive a command and instruct its dispatch, another command
541 with its request ID can come in over the wire and there will be a race
543 with its request ID can come in over the wire and there will be a race
542 between who responds to what.
544 between who responds to what.
543 """
545 """
544
546
545 def __init__(self, deferoutput=False):
547 def __init__(self, deferoutput=False):
546 """Construct a new server reactor.
548 """Construct a new server reactor.
547
549
548 ``deferoutput`` can be used to indicate that no output frames should be
550 ``deferoutput`` can be used to indicate that no output frames should be
549 instructed to be sent until input has been exhausted. In this mode,
551 instructed to be sent until input has been exhausted. In this mode,
550 events that would normally generate output frames (such as a command
552 events that would normally generate output frames (such as a command
551 response being ready) will instead defer instructing the consumer to
553 response being ready) will instead defer instructing the consumer to
552 send those frames. This is useful for half-duplex transports where the
554 send those frames. This is useful for half-duplex transports where the
553 sender cannot receive until all data has been transmitted.
555 sender cannot receive until all data has been transmitted.
554 """
556 """
555 self._deferoutput = deferoutput
557 self._deferoutput = deferoutput
556 self._state = 'idle'
558 self._state = 'idle'
557 self._nextoutgoingstreamid = 2
559 self._nextoutgoingstreamid = 2
558 self._bufferedframegens = []
560 self._bufferedframegens = []
559 # stream id -> stream instance for all active streams from the client.
561 # stream id -> stream instance for all active streams from the client.
560 self._incomingstreams = {}
562 self._incomingstreams = {}
561 self._outgoingstreams = {}
563 self._outgoingstreams = {}
562 # request id -> dict of commands that are actively being received.
564 # request id -> dict of commands that are actively being received.
563 self._receivingcommands = {}
565 self._receivingcommands = {}
564 # Request IDs that have been received and are actively being processed.
566 # Request IDs that have been received and are actively being processed.
565 # Once all output for a request has been sent, it is removed from this
567 # Once all output for a request has been sent, it is removed from this
566 # set.
568 # set.
567 self._activecommands = set()
569 self._activecommands = set()
568
570
569 def onframerecv(self, frame):
571 def onframerecv(self, frame):
570 """Process a frame that has been received off the wire.
572 """Process a frame that has been received off the wire.
571
573
572 Returns a dict with an ``action`` key that details what action,
574 Returns a dict with an ``action`` key that details what action,
573 if any, the consumer should take next.
575 if any, the consumer should take next.
574 """
576 """
575 if not frame.streamid % 2:
577 if not frame.streamid % 2:
576 self._state = 'errored'
578 self._state = 'errored'
577 return self._makeerrorresult(
579 return self._makeerrorresult(
578 _('received frame with even numbered stream ID: %d') %
580 _('received frame with even numbered stream ID: %d') %
579 frame.streamid)
581 frame.streamid)
580
582
581 if frame.streamid not in self._incomingstreams:
583 if frame.streamid not in self._incomingstreams:
582 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
584 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
583 self._state = 'errored'
585 self._state = 'errored'
584 return self._makeerrorresult(
586 return self._makeerrorresult(
585 _('received frame on unknown inactive stream without '
587 _('received frame on unknown inactive stream without '
586 'beginning of stream flag set'))
588 'beginning of stream flag set'))
587
589
588 self._incomingstreams[frame.streamid] = stream(frame.streamid)
590 self._incomingstreams[frame.streamid] = stream(frame.streamid)
589
591
590 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
592 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
591 # TODO handle decoding frames
593 # TODO handle decoding frames
592 self._state = 'errored'
594 self._state = 'errored'
593 raise error.ProgrammingError('support for decoding stream payloads '
595 raise error.ProgrammingError('support for decoding stream payloads '
594 'not yet implemented')
596 'not yet implemented')
595
597
596 if frame.streamflags & STREAM_FLAG_END_STREAM:
598 if frame.streamflags & STREAM_FLAG_END_STREAM:
597 del self._incomingstreams[frame.streamid]
599 del self._incomingstreams[frame.streamid]
598
600
599 handlers = {
601 handlers = {
600 'idle': self._onframeidle,
602 'idle': self._onframeidle,
601 'command-receiving': self._onframecommandreceiving,
603 'command-receiving': self._onframecommandreceiving,
602 'errored': self._onframeerrored,
604 'errored': self._onframeerrored,
603 }
605 }
604
606
605 meth = handlers.get(self._state)
607 meth = handlers.get(self._state)
606 if not meth:
608 if not meth:
607 raise error.ProgrammingError('unhandled state: %s' % self._state)
609 raise error.ProgrammingError('unhandled state: %s' % self._state)
608
610
609 return meth(frame)
611 return meth(frame)
610
612
611 def onbytesresponseready(self, stream, requestid, data):
613 def onbytesresponseready(self, stream, requestid, data):
612 """Signal that a bytes response is ready to be sent to the client.
614 """Signal that a bytes response is ready to be sent to the client.
613
615
614 The raw bytes response is passed as an argument.
616 The raw bytes response is passed as an argument.
615 """
617 """
616 ensureserverstream(stream)
618 ensureserverstream(stream)
617
619
618 def sendframes():
620 def sendframes():
619 for frame in createbytesresponseframesfrombytes(stream, requestid,
621 for frame in createbytesresponseframesfrombytes(stream, requestid,
620 data):
622 data):
621 yield frame
623 yield frame
622
624
623 self._activecommands.remove(requestid)
625 self._activecommands.remove(requestid)
624
626
625 result = sendframes()
627 result = sendframes()
626
628
627 if self._deferoutput:
629 if self._deferoutput:
628 self._bufferedframegens.append(result)
630 self._bufferedframegens.append(result)
629 return 'noop', {}
631 return 'noop', {}
630 else:
632 else:
631 return 'sendframes', {
633 return 'sendframes', {
632 'framegen': result,
634 'framegen': result,
633 }
635 }
634
636
635 def oninputeof(self):
637 def oninputeof(self):
636 """Signals that end of input has been received.
638 """Signals that end of input has been received.
637
639
638 No more frames will be received. All pending activity should be
640 No more frames will be received. All pending activity should be
639 completed.
641 completed.
640 """
642 """
641 # TODO should we do anything about in-flight commands?
643 # TODO should we do anything about in-flight commands?
642
644
643 if not self._deferoutput or not self._bufferedframegens:
645 if not self._deferoutput or not self._bufferedframegens:
644 return 'noop', {}
646 return 'noop', {}
645
647
646 # If we buffered all our responses, emit those.
648 # If we buffered all our responses, emit those.
647 def makegen():
649 def makegen():
648 for gen in self._bufferedframegens:
650 for gen in self._bufferedframegens:
649 for frame in gen:
651 for frame in gen:
650 yield frame
652 yield frame
651
653
652 return 'sendframes', {
654 return 'sendframes', {
653 'framegen': makegen(),
655 'framegen': makegen(),
654 }
656 }
655
657
656 def onapplicationerror(self, stream, requestid, msg):
658 def onapplicationerror(self, stream, requestid, msg):
657 ensureserverstream(stream)
659 ensureserverstream(stream)
658
660
659 return 'sendframes', {
661 return 'sendframes', {
660 'framegen': createerrorframe(stream, requestid, msg,
662 'framegen': createerrorframe(stream, requestid, msg,
661 application=True),
663 application=True),
662 }
664 }
663
665
664 def makeoutputstream(self):
666 def makeoutputstream(self):
665 """Create a stream to be used for sending data to the client."""
667 """Create a stream to be used for sending data to the client."""
666 streamid = self._nextoutgoingstreamid
668 streamid = self._nextoutgoingstreamid
667 self._nextoutgoingstreamid += 2
669 self._nextoutgoingstreamid += 2
668
670
669 s = stream(streamid)
671 s = stream(streamid)
670 self._outgoingstreams[streamid] = s
672 self._outgoingstreams[streamid] = s
671
673
672 return s
674 return s
673
675
674 def _makeerrorresult(self, msg):
676 def _makeerrorresult(self, msg):
675 return 'error', {
677 return 'error', {
676 'message': msg,
678 'message': msg,
677 }
679 }
678
680
679 def _makeruncommandresult(self, requestid):
681 def _makeruncommandresult(self, requestid):
680 entry = self._receivingcommands[requestid]
682 entry = self._receivingcommands[requestid]
681
683
682 if not entry['requestdone']:
684 if not entry['requestdone']:
683 self._state = 'errored'
685 self._state = 'errored'
684 raise error.ProgrammingError('should not be called without '
686 raise error.ProgrammingError('should not be called without '
685 'requestdone set')
687 'requestdone set')
686
688
687 del self._receivingcommands[requestid]
689 del self._receivingcommands[requestid]
688
690
689 if self._receivingcommands:
691 if self._receivingcommands:
690 self._state = 'command-receiving'
692 self._state = 'command-receiving'
691 else:
693 else:
692 self._state = 'idle'
694 self._state = 'idle'
693
695
694 # Decode the payloads as CBOR.
696 # Decode the payloads as CBOR.
695 entry['payload'].seek(0)
697 entry['payload'].seek(0)
696 request = cbor.load(entry['payload'])
698 request = cbor.load(entry['payload'])
697
699
698 if b'name' not in request:
700 if b'name' not in request:
699 self._state = 'errored'
701 self._state = 'errored'
700 return self._makeerrorresult(
702 return self._makeerrorresult(
701 _('command request missing "name" field'))
703 _('command request missing "name" field'))
702
704
703 if b'args' not in request:
705 if b'args' not in request:
704 request[b'args'] = {}
706 request[b'args'] = {}
705
707
706 assert requestid not in self._activecommands
708 assert requestid not in self._activecommands
707 self._activecommands.add(requestid)
709 self._activecommands.add(requestid)
708
710
709 return 'runcommand', {
711 return 'runcommand', {
710 'requestid': requestid,
712 'requestid': requestid,
711 'command': request[b'name'],
713 'command': request[b'name'],
712 'args': request[b'args'],
714 'args': request[b'args'],
713 'data': entry['data'].getvalue() if entry['data'] else None,
715 'data': entry['data'].getvalue() if entry['data'] else None,
714 }
716 }
715
717
716 def _makewantframeresult(self):
718 def _makewantframeresult(self):
717 return 'wantframe', {
719 return 'wantframe', {
718 'state': self._state,
720 'state': self._state,
719 }
721 }
720
722
721 def _validatecommandrequestframe(self, frame):
723 def _validatecommandrequestframe(self, frame):
722 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
724 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
723 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
725 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
724
726
725 if new and continuation:
727 if new and continuation:
726 self._state = 'errored'
728 self._state = 'errored'
727 return self._makeerrorresult(
729 return self._makeerrorresult(
728 _('received command request frame with both new and '
730 _('received command request frame with both new and '
729 'continuation flags set'))
731 'continuation flags set'))
730
732
731 if not new and not continuation:
733 if not new and not continuation:
732 self._state = 'errored'
734 self._state = 'errored'
733 return self._makeerrorresult(
735 return self._makeerrorresult(
734 _('received command request frame with neither new nor '
736 _('received command request frame with neither new nor '
735 'continuation flags set'))
737 'continuation flags set'))
736
738
737 def _onframeidle(self, frame):
739 def _onframeidle(self, frame):
738 # The only frame type that should be received in this state is a
740 # The only frame type that should be received in this state is a
739 # command request.
741 # command request.
740 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
742 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
741 self._state = 'errored'
743 self._state = 'errored'
742 return self._makeerrorresult(
744 return self._makeerrorresult(
743 _('expected command request frame; got %d') % frame.typeid)
745 _('expected command request frame; got %d') % frame.typeid)
744
746
745 res = self._validatecommandrequestframe(frame)
747 res = self._validatecommandrequestframe(frame)
746 if res:
748 if res:
747 return res
749 return res
748
750
749 if frame.requestid in self._receivingcommands:
751 if frame.requestid in self._receivingcommands:
750 self._state = 'errored'
752 self._state = 'errored'
751 return self._makeerrorresult(
753 return self._makeerrorresult(
752 _('request with ID %d already received') % frame.requestid)
754 _('request with ID %d already received') % frame.requestid)
753
755
754 if frame.requestid in self._activecommands:
756 if frame.requestid in self._activecommands:
755 self._state = 'errored'
757 self._state = 'errored'
756 return self._makeerrorresult(
758 return self._makeerrorresult(
757 _('request with ID %d is already active') % frame.requestid)
759 _('request with ID %d is already active') % frame.requestid)
758
760
759 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
761 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
760 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
762 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
761 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
763 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
762
764
763 if not new:
765 if not new:
764 self._state = 'errored'
766 self._state = 'errored'
765 return self._makeerrorresult(
767 return self._makeerrorresult(
766 _('received command request frame without new flag set'))
768 _('received command request frame without new flag set'))
767
769
768 payload = util.bytesio()
770 payload = util.bytesio()
769 payload.write(frame.payload)
771 payload.write(frame.payload)
770
772
771 self._receivingcommands[frame.requestid] = {
773 self._receivingcommands[frame.requestid] = {
772 'payload': payload,
774 'payload': payload,
773 'data': None,
775 'data': None,
774 'requestdone': not moreframes,
776 'requestdone': not moreframes,
775 'expectingdata': bool(expectingdata),
777 'expectingdata': bool(expectingdata),
776 }
778 }
777
779
778 # This is the final frame for this request. Dispatch it.
780 # This is the final frame for this request. Dispatch it.
779 if not moreframes and not expectingdata:
781 if not moreframes and not expectingdata:
780 return self._makeruncommandresult(frame.requestid)
782 return self._makeruncommandresult(frame.requestid)
781
783
782 assert moreframes or expectingdata
784 assert moreframes or expectingdata
783 self._state = 'command-receiving'
785 self._state = 'command-receiving'
784 return self._makewantframeresult()
786 return self._makewantframeresult()
785
787
786 def _onframecommandreceiving(self, frame):
788 def _onframecommandreceiving(self, frame):
787 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
789 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
788 # Process new command requests as such.
790 # Process new command requests as such.
789 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
791 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
790 return self._onframeidle(frame)
792 return self._onframeidle(frame)
791
793
792 res = self._validatecommandrequestframe(frame)
794 res = self._validatecommandrequestframe(frame)
793 if res:
795 if res:
794 return res
796 return res
795
797
796 # All other frames should be related to a command that is currently
798 # All other frames should be related to a command that is currently
797 # receiving but is not active.
799 # receiving but is not active.
798 if frame.requestid in self._activecommands:
800 if frame.requestid in self._activecommands:
799 self._state = 'errored'
801 self._state = 'errored'
800 return self._makeerrorresult(
802 return self._makeerrorresult(
801 _('received frame for request that is still active: %d') %
803 _('received frame for request that is still active: %d') %
802 frame.requestid)
804 frame.requestid)
803
805
804 if frame.requestid not in self._receivingcommands:
806 if frame.requestid not in self._receivingcommands:
805 self._state = 'errored'
807 self._state = 'errored'
806 return self._makeerrorresult(
808 return self._makeerrorresult(
807 _('received frame for request that is not receiving: %d') %
809 _('received frame for request that is not receiving: %d') %
808 frame.requestid)
810 frame.requestid)
809
811
810 entry = self._receivingcommands[frame.requestid]
812 entry = self._receivingcommands[frame.requestid]
811
813
812 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
814 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
813 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
815 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
814 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
816 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
815
817
816 if entry['requestdone']:
818 if entry['requestdone']:
817 self._state = 'errored'
819 self._state = 'errored'
818 return self._makeerrorresult(
820 return self._makeerrorresult(
819 _('received command request frame when request frames '
821 _('received command request frame when request frames '
820 'were supposedly done'))
822 'were supposedly done'))
821
823
822 if expectingdata != entry['expectingdata']:
824 if expectingdata != entry['expectingdata']:
823 self._state = 'errored'
825 self._state = 'errored'
824 return self._makeerrorresult(
826 return self._makeerrorresult(
825 _('mismatch between expect data flag and previous frame'))
827 _('mismatch between expect data flag and previous frame'))
826
828
827 entry['payload'].write(frame.payload)
829 entry['payload'].write(frame.payload)
828
830
829 if not moreframes:
831 if not moreframes:
830 entry['requestdone'] = True
832 entry['requestdone'] = True
831
833
832 if not moreframes and not expectingdata:
834 if not moreframes and not expectingdata:
833 return self._makeruncommandresult(frame.requestid)
835 return self._makeruncommandresult(frame.requestid)
834
836
835 return self._makewantframeresult()
837 return self._makewantframeresult()
836
838
837 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
839 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
838 if not entry['expectingdata']:
840 if not entry['expectingdata']:
839 self._state = 'errored'
841 self._state = 'errored'
840 return self._makeerrorresult(_(
842 return self._makeerrorresult(_(
841 'received command data frame for request that is not '
843 'received command data frame for request that is not '
842 'expecting data: %d') % frame.requestid)
844 'expecting data: %d') % frame.requestid)
843
845
844 if entry['data'] is None:
846 if entry['data'] is None:
845 entry['data'] = util.bytesio()
847 entry['data'] = util.bytesio()
846
848
847 return self._handlecommanddataframe(frame, entry)
849 return self._handlecommanddataframe(frame, entry)
848 else:
850 else:
849 self._state = 'errored'
851 self._state = 'errored'
850 return self._makeerrorresult(_(
852 return self._makeerrorresult(_(
851 'received unexpected frame type: %d') % frame.typeid)
853 'received unexpected frame type: %d') % frame.typeid)
852
854
853 def _handlecommanddataframe(self, frame, entry):
855 def _handlecommanddataframe(self, frame, entry):
854 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
856 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
855
857
856 # TODO support streaming data instead of buffering it.
858 # TODO support streaming data instead of buffering it.
857 entry['data'].write(frame.payload)
859 entry['data'].write(frame.payload)
858
860
859 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
861 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
860 return self._makewantframeresult()
862 return self._makewantframeresult()
861 elif frame.flags & FLAG_COMMAND_DATA_EOS:
863 elif frame.flags & FLAG_COMMAND_DATA_EOS:
862 entry['data'].seek(0)
864 entry['data'].seek(0)
863 return self._makeruncommandresult(frame.requestid)
865 return self._makeruncommandresult(frame.requestid)
864 else:
866 else:
865 self._state = 'errored'
867 self._state = 'errored'
866 return self._makeerrorresult(_('command data frame without '
868 return self._makeerrorresult(_('command data frame without '
867 'flags'))
869 'flags'))
868
870
869 def _onframeerrored(self, frame):
871 def _onframeerrored(self, frame):
870 return self._makeerrorresult(_('server already errored'))
872 return self._makeerrorresult(_('server already errored'))
General Comments 0
You need to be logged in to leave comments. Login now