##// END OF EJS Templates
py3: cast exception to bytes...
Gregory Szorc -
r39870:bce1c1af default
parent child Browse files
Show More
@@ -1,1325 +1,1326 b''
1 1 # wireprotoframing.py - unified framing protocol for wire protocol
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 # This file contains functionality to support the unified frame-based wire
9 9 # protocol. For details about the protocol, see
10 10 # `hg help internals.wireprotocol`.
11 11
12 12 from __future__ import absolute_import
13 13
14 14 import collections
15 15 import struct
16 16
17 17 from .i18n import _
18 18 from .thirdparty import (
19 19 attr,
20 20 )
21 21 from . import (
22 22 encoding,
23 23 error,
24 24 util,
25 25 )
26 26 from .utils import (
27 27 cborutil,
28 28 stringutil,
29 29 )
30 30
31 31 FRAME_HEADER_SIZE = 8
32 32 DEFAULT_MAX_FRAME_SIZE = 32768
33 33
34 34 STREAM_FLAG_BEGIN_STREAM = 0x01
35 35 STREAM_FLAG_END_STREAM = 0x02
36 36 STREAM_FLAG_ENCODING_APPLIED = 0x04
37 37
38 38 STREAM_FLAGS = {
39 39 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
40 40 b'stream-end': STREAM_FLAG_END_STREAM,
41 41 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
42 42 }
43 43
44 44 FRAME_TYPE_COMMAND_REQUEST = 0x01
45 45 FRAME_TYPE_COMMAND_DATA = 0x02
46 46 FRAME_TYPE_COMMAND_RESPONSE = 0x03
47 47 FRAME_TYPE_ERROR_RESPONSE = 0x05
48 48 FRAME_TYPE_TEXT_OUTPUT = 0x06
49 49 FRAME_TYPE_PROGRESS = 0x07
50 50 FRAME_TYPE_STREAM_SETTINGS = 0x08
51 51
52 52 FRAME_TYPES = {
53 53 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
54 54 b'command-data': FRAME_TYPE_COMMAND_DATA,
55 55 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
56 56 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
57 57 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
58 58 b'progress': FRAME_TYPE_PROGRESS,
59 59 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
60 60 }
61 61
62 62 FLAG_COMMAND_REQUEST_NEW = 0x01
63 63 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
64 64 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
65 65 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
66 66
67 67 FLAGS_COMMAND_REQUEST = {
68 68 b'new': FLAG_COMMAND_REQUEST_NEW,
69 69 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
70 70 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
71 71 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
72 72 }
73 73
74 74 FLAG_COMMAND_DATA_CONTINUATION = 0x01
75 75 FLAG_COMMAND_DATA_EOS = 0x02
76 76
77 77 FLAGS_COMMAND_DATA = {
78 78 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
79 79 b'eos': FLAG_COMMAND_DATA_EOS,
80 80 }
81 81
82 82 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
83 83 FLAG_COMMAND_RESPONSE_EOS = 0x02
84 84
85 85 FLAGS_COMMAND_RESPONSE = {
86 86 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
87 87 b'eos': FLAG_COMMAND_RESPONSE_EOS,
88 88 }
89 89
90 90 # Maps frame types to their available flags.
91 91 FRAME_TYPE_FLAGS = {
92 92 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
93 93 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
94 94 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
95 95 FRAME_TYPE_ERROR_RESPONSE: {},
96 96 FRAME_TYPE_TEXT_OUTPUT: {},
97 97 FRAME_TYPE_PROGRESS: {},
98 98 FRAME_TYPE_STREAM_SETTINGS: {},
99 99 }
100 100
101 101 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
102 102
103 103 def humanflags(mapping, value):
104 104 """Convert a numeric flags value to a human value, using a mapping table."""
105 105 namemap = {v: k for k, v in mapping.iteritems()}
106 106 flags = []
107 107 val = 1
108 108 while value >= val:
109 109 if value & val:
110 110 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
111 111 val <<= 1
112 112
113 113 return b'|'.join(flags)
114 114
115 115 @attr.s(slots=True)
116 116 class frameheader(object):
117 117 """Represents the data in a frame header."""
118 118
119 119 length = attr.ib()
120 120 requestid = attr.ib()
121 121 streamid = attr.ib()
122 122 streamflags = attr.ib()
123 123 typeid = attr.ib()
124 124 flags = attr.ib()
125 125
126 126 @attr.s(slots=True, repr=False)
127 127 class frame(object):
128 128 """Represents a parsed frame."""
129 129
130 130 requestid = attr.ib()
131 131 streamid = attr.ib()
132 132 streamflags = attr.ib()
133 133 typeid = attr.ib()
134 134 flags = attr.ib()
135 135 payload = attr.ib()
136 136
137 137 @encoding.strmethod
138 138 def __repr__(self):
139 139 typename = '<unknown 0x%02x>' % self.typeid
140 140 for name, value in FRAME_TYPES.iteritems():
141 141 if value == self.typeid:
142 142 typename = name
143 143 break
144 144
145 145 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
146 146 'type=%s; flags=%s)' % (
147 147 len(self.payload), self.requestid, self.streamid,
148 148 humanflags(STREAM_FLAGS, self.streamflags), typename,
149 149 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
150 150
151 151 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
152 152 """Assemble a frame into a byte array."""
153 153 # TODO assert size of payload.
154 154 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
155 155
156 156 # 24 bits length
157 157 # 16 bits request id
158 158 # 8 bits stream id
159 159 # 8 bits stream flags
160 160 # 4 bits type
161 161 # 4 bits flags
162 162
163 163 l = struct.pack(r'<I', len(payload))
164 164 frame[0:3] = l[0:3]
165 165 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
166 166 frame[7] = (typeid << 4) | flags
167 167 frame[8:] = payload
168 168
169 169 return frame
170 170
171 171 def makeframefromhumanstring(s):
172 172 """Create a frame from a human readable string
173 173
174 174 Strings have the form:
175 175
176 176 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
177 177
178 178 This can be used by user-facing applications and tests for creating
179 179 frames easily without having to type out a bunch of constants.
180 180
181 181 Request ID and stream IDs are integers.
182 182
183 183 Stream flags, frame type, and flags can be specified by integer or
184 184 named constant.
185 185
186 186 Flags can be delimited by `|` to bitwise OR them together.
187 187
188 188 If the payload begins with ``cbor:``, the following string will be
189 189 evaluated as Python literal and the resulting object will be fed into
190 190 a CBOR encoder. Otherwise, the payload is interpreted as a Python
191 191 byte string literal.
192 192 """
193 193 fields = s.split(b' ', 5)
194 194 requestid, streamid, streamflags, frametype, frameflags, payload = fields
195 195
196 196 requestid = int(requestid)
197 197 streamid = int(streamid)
198 198
199 199 finalstreamflags = 0
200 200 for flag in streamflags.split(b'|'):
201 201 if flag in STREAM_FLAGS:
202 202 finalstreamflags |= STREAM_FLAGS[flag]
203 203 else:
204 204 finalstreamflags |= int(flag)
205 205
206 206 if frametype in FRAME_TYPES:
207 207 frametype = FRAME_TYPES[frametype]
208 208 else:
209 209 frametype = int(frametype)
210 210
211 211 finalflags = 0
212 212 validflags = FRAME_TYPE_FLAGS[frametype]
213 213 for flag in frameflags.split(b'|'):
214 214 if flag in validflags:
215 215 finalflags |= validflags[flag]
216 216 else:
217 217 finalflags |= int(flag)
218 218
219 219 if payload.startswith(b'cbor:'):
220 220 payload = b''.join(cborutil.streamencode(
221 221 stringutil.evalpythonliteral(payload[5:])))
222 222
223 223 else:
224 224 payload = stringutil.unescapestr(payload)
225 225
226 226 return makeframe(requestid=requestid, streamid=streamid,
227 227 streamflags=finalstreamflags, typeid=frametype,
228 228 flags=finalflags, payload=payload)
229 229
230 230 def parseheader(data):
231 231 """Parse a unified framing protocol frame header from a buffer.
232 232
233 233 The header is expected to be in the buffer at offset 0 and the
234 234 buffer is expected to be large enough to hold a full header.
235 235 """
236 236 # 24 bits payload length (little endian)
237 237 # 16 bits request ID
238 238 # 8 bits stream ID
239 239 # 8 bits stream flags
240 240 # 4 bits frame type
241 241 # 4 bits frame flags
242 242 # ... payload
243 243 framelength = data[0] + 256 * data[1] + 16384 * data[2]
244 244 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
245 245 typeflags = data[7]
246 246
247 247 frametype = (typeflags & 0xf0) >> 4
248 248 frameflags = typeflags & 0x0f
249 249
250 250 return frameheader(framelength, requestid, streamid, streamflags,
251 251 frametype, frameflags)
252 252
253 253 def readframe(fh):
254 254 """Read a unified framing protocol frame from a file object.
255 255
256 256 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
257 257 None if no frame is available. May raise if a malformed frame is
258 258 seen.
259 259 """
260 260 header = bytearray(FRAME_HEADER_SIZE)
261 261
262 262 readcount = fh.readinto(header)
263 263
264 264 if readcount == 0:
265 265 return None
266 266
267 267 if readcount != FRAME_HEADER_SIZE:
268 268 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
269 269 (readcount, header))
270 270
271 271 h = parseheader(header)
272 272
273 273 payload = fh.read(h.length)
274 274 if len(payload) != h.length:
275 275 raise error.Abort(_('frame length error: expected %d; got %d') %
276 276 (h.length, len(payload)))
277 277
278 278 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
279 279 payload)
280 280
281 281 def createcommandframes(stream, requestid, cmd, args, datafh=None,
282 282 maxframesize=DEFAULT_MAX_FRAME_SIZE):
283 283 """Create frames necessary to transmit a request to run a command.
284 284
285 285 This is a generator of bytearrays. Each item represents a frame
286 286 ready to be sent over the wire to a peer.
287 287 """
288 288 data = {b'name': cmd}
289 289 if args:
290 290 data[b'args'] = args
291 291
292 292 data = b''.join(cborutil.streamencode(data))
293 293
294 294 offset = 0
295 295
296 296 while True:
297 297 flags = 0
298 298
299 299 # Must set new or continuation flag.
300 300 if not offset:
301 301 flags |= FLAG_COMMAND_REQUEST_NEW
302 302 else:
303 303 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
304 304
305 305 # Data frames is set on all frames.
306 306 if datafh:
307 307 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
308 308
309 309 payload = data[offset:offset + maxframesize]
310 310 offset += len(payload)
311 311
312 312 if len(payload) == maxframesize and offset < len(data):
313 313 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
314 314
315 315 yield stream.makeframe(requestid=requestid,
316 316 typeid=FRAME_TYPE_COMMAND_REQUEST,
317 317 flags=flags,
318 318 payload=payload)
319 319
320 320 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
321 321 break
322 322
323 323 if datafh:
324 324 while True:
325 325 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
326 326
327 327 done = False
328 328 if len(data) == DEFAULT_MAX_FRAME_SIZE:
329 329 flags = FLAG_COMMAND_DATA_CONTINUATION
330 330 else:
331 331 flags = FLAG_COMMAND_DATA_EOS
332 332 assert datafh.read(1) == b''
333 333 done = True
334 334
335 335 yield stream.makeframe(requestid=requestid,
336 336 typeid=FRAME_TYPE_COMMAND_DATA,
337 337 flags=flags,
338 338 payload=data)
339 339
340 340 if done:
341 341 break
342 342
343 343 def createcommandresponseframesfrombytes(stream, requestid, data,
344 344 maxframesize=DEFAULT_MAX_FRAME_SIZE):
345 345 """Create a raw frame to send a bytes response from static bytes input.
346 346
347 347 Returns a generator of bytearrays.
348 348 """
349 349 # Automatically send the overall CBOR response map.
350 350 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
351 351 if len(overall) > maxframesize:
352 352 raise error.ProgrammingError('not yet implemented')
353 353
354 354 # Simple case where we can fit the full response in a single frame.
355 355 if len(overall) + len(data) <= maxframesize:
356 356 flags = FLAG_COMMAND_RESPONSE_EOS
357 357 yield stream.makeframe(requestid=requestid,
358 358 typeid=FRAME_TYPE_COMMAND_RESPONSE,
359 359 flags=flags,
360 360 payload=overall + data)
361 361 return
362 362
363 363 # It's easier to send the overall CBOR map in its own frame than to track
364 364 # offsets.
365 365 yield stream.makeframe(requestid=requestid,
366 366 typeid=FRAME_TYPE_COMMAND_RESPONSE,
367 367 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
368 368 payload=overall)
369 369
370 370 offset = 0
371 371 while True:
372 372 chunk = data[offset:offset + maxframesize]
373 373 offset += len(chunk)
374 374 done = offset == len(data)
375 375
376 376 if done:
377 377 flags = FLAG_COMMAND_RESPONSE_EOS
378 378 else:
379 379 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
380 380
381 381 yield stream.makeframe(requestid=requestid,
382 382 typeid=FRAME_TYPE_COMMAND_RESPONSE,
383 383 flags=flags,
384 384 payload=chunk)
385 385
386 386 if done:
387 387 break
388 388
389 389 def createbytesresponseframesfromgen(stream, requestid, gen,
390 390 maxframesize=DEFAULT_MAX_FRAME_SIZE):
391 391 """Generator of frames from a generator of byte chunks.
392 392
393 393 This assumes that another frame will follow whatever this emits. i.e.
394 394 this always emits the continuation flag and never emits the end-of-stream
395 395 flag.
396 396 """
397 397 cb = util.chunkbuffer(gen)
398 398 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
399 399
400 400 while True:
401 401 chunk = cb.read(maxframesize)
402 402 if not chunk:
403 403 break
404 404
405 405 yield stream.makeframe(requestid=requestid,
406 406 typeid=FRAME_TYPE_COMMAND_RESPONSE,
407 407 flags=flags,
408 408 payload=chunk)
409 409
410 410 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
411 411
412 412 def createcommandresponseokframe(stream, requestid):
413 413 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
414 414
415 415 return stream.makeframe(requestid=requestid,
416 416 typeid=FRAME_TYPE_COMMAND_RESPONSE,
417 417 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
418 418 payload=overall)
419 419
420 420 def createcommandresponseeosframe(stream, requestid):
421 421 """Create an empty payload frame representing command end-of-stream."""
422 422 return stream.makeframe(requestid=requestid,
423 423 typeid=FRAME_TYPE_COMMAND_RESPONSE,
424 424 flags=FLAG_COMMAND_RESPONSE_EOS,
425 425 payload=b'')
426 426
427 427 def createcommanderrorresponse(stream, requestid, message, args=None):
428 428 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
429 429 # formatting works consistently?
430 430 m = {
431 431 b'status': b'error',
432 432 b'error': {
433 433 b'message': message,
434 434 }
435 435 }
436 436
437 437 if args:
438 438 m[b'error'][b'args'] = args
439 439
440 440 overall = b''.join(cborutil.streamencode(m))
441 441
442 442 yield stream.makeframe(requestid=requestid,
443 443 typeid=FRAME_TYPE_COMMAND_RESPONSE,
444 444 flags=FLAG_COMMAND_RESPONSE_EOS,
445 445 payload=overall)
446 446
447 447 def createerrorframe(stream, requestid, msg, errtype):
448 448 # TODO properly handle frame size limits.
449 449 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
450 450
451 451 payload = b''.join(cborutil.streamencode({
452 452 b'type': errtype,
453 453 b'message': [{b'msg': msg}],
454 454 }))
455 455
456 456 yield stream.makeframe(requestid=requestid,
457 457 typeid=FRAME_TYPE_ERROR_RESPONSE,
458 458 flags=0,
459 459 payload=payload)
460 460
461 461 def createtextoutputframe(stream, requestid, atoms,
462 462 maxframesize=DEFAULT_MAX_FRAME_SIZE):
463 463 """Create a text output frame to render text to people.
464 464
465 465 ``atoms`` is a 3-tuple of (formatting string, args, labels).
466 466
467 467 The formatting string contains ``%s`` tokens to be replaced by the
468 468 corresponding indexed entry in ``args``. ``labels`` is an iterable of
469 469 formatters to be applied at rendering time. In terms of the ``ui``
470 470 class, each atom corresponds to a ``ui.write()``.
471 471 """
472 472 atomdicts = []
473 473
474 474 for (formatting, args, labels) in atoms:
475 475 # TODO look for localstr, other types here?
476 476
477 477 if not isinstance(formatting, bytes):
478 478 raise ValueError('must use bytes formatting strings')
479 479 for arg in args:
480 480 if not isinstance(arg, bytes):
481 481 raise ValueError('must use bytes for arguments')
482 482 for label in labels:
483 483 if not isinstance(label, bytes):
484 484 raise ValueError('must use bytes for labels')
485 485
486 486 # Formatting string must be ASCII.
487 487 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
488 488
489 489 # Arguments must be UTF-8.
490 490 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
491 491
492 492 # Labels must be ASCII.
493 493 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
494 494 for l in labels]
495 495
496 496 atom = {b'msg': formatting}
497 497 if args:
498 498 atom[b'args'] = args
499 499 if labels:
500 500 atom[b'labels'] = labels
501 501
502 502 atomdicts.append(atom)
503 503
504 504 payload = b''.join(cborutil.streamencode(atomdicts))
505 505
506 506 if len(payload) > maxframesize:
507 507 raise ValueError('cannot encode data in a single frame')
508 508
509 509 yield stream.makeframe(requestid=requestid,
510 510 typeid=FRAME_TYPE_TEXT_OUTPUT,
511 511 flags=0,
512 512 payload=payload)
513 513
514 514 class bufferingcommandresponseemitter(object):
515 515 """Helper object to emit command response frames intelligently.
516 516
517 517 Raw command response data is likely emitted in chunks much smaller
518 518 than what can fit in a single frame. This class exists to buffer
519 519 chunks until enough data is available to fit in a single frame.
520 520
521 521 TODO we'll need something like this when compression is supported.
522 522 So it might make sense to implement this functionality at the stream
523 523 level.
524 524 """
525 525 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
526 526 self._stream = stream
527 527 self._requestid = requestid
528 528 self._maxsize = maxframesize
529 529 self._chunks = []
530 530 self._chunkssize = 0
531 531
532 532 def send(self, data):
533 533 """Send new data for emission.
534 534
535 535 Is a generator of new frames that were derived from the new input.
536 536
537 537 If the special input ``None`` is received, flushes all buffered
538 538 data to frames.
539 539 """
540 540
541 541 if data is None:
542 542 for frame in self._flush():
543 543 yield frame
544 544 return
545 545
546 546 # There is a ton of potential to do more complicated things here.
547 547 # Our immediate goal is to coalesce small chunks into big frames,
548 548 # not achieve the fewest number of frames possible. So we go with
549 549 # a simple implementation:
550 550 #
551 551 # * If a chunk is too large for a frame, we flush and emit frames
552 552 # for the new chunk.
553 553 # * If a chunk can be buffered without total buffered size limits
554 554 # being exceeded, we do that.
555 555 # * If a chunk causes us to go over our buffering limit, we flush
556 556 # and then buffer the new chunk.
557 557
558 558 if len(data) > self._maxsize:
559 559 for frame in self._flush():
560 560 yield frame
561 561
562 562 # Now emit frames for the big chunk.
563 563 offset = 0
564 564 while True:
565 565 chunk = data[offset:offset + self._maxsize]
566 566 offset += len(chunk)
567 567
568 568 yield self._stream.makeframe(
569 569 self._requestid,
570 570 typeid=FRAME_TYPE_COMMAND_RESPONSE,
571 571 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
572 572 payload=chunk)
573 573
574 574 if offset == len(data):
575 575 return
576 576
577 577 # If we don't have enough to constitute a full frame, buffer and
578 578 # return.
579 579 if len(data) + self._chunkssize < self._maxsize:
580 580 self._chunks.append(data)
581 581 self._chunkssize += len(data)
582 582 return
583 583
584 584 # Else flush what we have and buffer the new chunk. We could do
585 585 # something more intelligent here, like break the chunk. Let's
586 586 # keep things simple for now.
587 587 for frame in self._flush():
588 588 yield frame
589 589
590 590 self._chunks.append(data)
591 591 self._chunkssize = len(data)
592 592
593 593 def _flush(self):
594 594 payload = b''.join(self._chunks)
595 595 assert len(payload) <= self._maxsize
596 596
597 597 self._chunks[:] = []
598 598 self._chunkssize = 0
599 599
600 600 yield self._stream.makeframe(
601 601 self._requestid,
602 602 typeid=FRAME_TYPE_COMMAND_RESPONSE,
603 603 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
604 604 payload=payload)
605 605
606 606 class stream(object):
607 607 """Represents a logical unidirectional series of frames."""
608 608
609 609 def __init__(self, streamid, active=False):
610 610 self.streamid = streamid
611 611 self._active = active
612 612
613 613 def makeframe(self, requestid, typeid, flags, payload):
614 614 """Create a frame to be sent out over this stream.
615 615
616 616 Only returns the frame instance. Does not actually send it.
617 617 """
618 618 streamflags = 0
619 619 if not self._active:
620 620 streamflags |= STREAM_FLAG_BEGIN_STREAM
621 621 self._active = True
622 622
623 623 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
624 624 payload)
625 625
626 626 def ensureserverstream(stream):
627 627 if stream.streamid % 2:
628 628 raise error.ProgrammingError('server should only write to even '
629 629 'numbered streams; %d is not even' %
630 630 stream.streamid)
631 631
632 632 class serverreactor(object):
633 633 """Holds state of a server handling frame-based protocol requests.
634 634
635 635 This class is the "brain" of the unified frame-based protocol server
636 636 component. While the protocol is stateless from the perspective of
637 637 requests/commands, something needs to track which frames have been
638 638 received, what frames to expect, etc. This class is that thing.
639 639
640 640 Instances are modeled as a state machine of sorts. Instances are also
641 641 reactionary to external events. The point of this class is to encapsulate
642 642 the state of the connection and the exchange of frames, not to perform
643 643 work. Instead, callers tell this class when something occurs, like a
644 644 frame arriving. If that activity is worthy of a follow-up action (say
645 645 *run a command*), the return value of that handler will say so.
646 646
647 647 I/O and CPU intensive operations are purposefully delegated outside of
648 648 this class.
649 649
650 650 Consumers are expected to tell instances when events occur. They do so by
651 651 calling the various ``on*`` methods. These methods return a 2-tuple
652 652 describing any follow-up action(s) to take. The first element is the
653 653 name of an action to perform. The second is a data structure (usually
654 654 a dict) specific to that action that contains more information. e.g.
655 655 if the server wants to send frames back to the client, the data structure
656 656 will contain a reference to those frames.
657 657
658 658 Valid actions that consumers can be instructed to take are:
659 659
660 660 sendframes
661 661 Indicates that frames should be sent to the client. The ``framegen``
662 662 key contains a generator of frames that should be sent. The server
663 663 assumes that all frames are sent to the client.
664 664
665 665 error
666 666 Indicates that an error occurred. Consumer should probably abort.
667 667
668 668 runcommand
669 669 Indicates that the consumer should run a wire protocol command. Details
670 670 of the command to run are given in the data structure.
671 671
672 672 wantframe
673 673 Indicates that nothing of interest happened and the server is waiting on
674 674 more frames from the client before anything interesting can be done.
675 675
676 676 noop
677 677 Indicates no additional action is required.
678 678
679 679 Known Issues
680 680 ------------
681 681
682 682 There are no limits to the number of partially received commands or their
683 683 size. A malicious client could stream command request data and exhaust the
684 684 server's memory.
685 685
686 686 Partially received commands are not acted upon when end of input is
687 687 reached. Should the server error if it receives a partial request?
688 688 Should the client send a message to abort a partially transmitted request
689 689 to facilitate graceful shutdown?
690 690
691 691 Active requests that haven't been responded to aren't tracked. This means
692 692 that if we receive a command and instruct its dispatch, another command
693 693 with its request ID can come in over the wire and there will be a race
694 694 between who responds to what.
695 695 """
696 696
697 697 def __init__(self, deferoutput=False):
698 698 """Construct a new server reactor.
699 699
700 700 ``deferoutput`` can be used to indicate that no output frames should be
701 701 instructed to be sent until input has been exhausted. In this mode,
702 702 events that would normally generate output frames (such as a command
703 703 response being ready) will instead defer instructing the consumer to
704 704 send those frames. This is useful for half-duplex transports where the
705 705 sender cannot receive until all data has been transmitted.
706 706 """
707 707 self._deferoutput = deferoutput
708 708 self._state = 'idle'
709 709 self._nextoutgoingstreamid = 2
710 710 self._bufferedframegens = []
711 711 # stream id -> stream instance for all active streams from the client.
712 712 self._incomingstreams = {}
713 713 self._outgoingstreams = {}
714 714 # request id -> dict of commands that are actively being received.
715 715 self._receivingcommands = {}
716 716 # Request IDs that have been received and are actively being processed.
717 717 # Once all output for a request has been sent, it is removed from this
718 718 # set.
719 719 self._activecommands = set()
720 720
721 721 def onframerecv(self, frame):
722 722 """Process a frame that has been received off the wire.
723 723
724 724 Returns a dict with an ``action`` key that details what action,
725 725 if any, the consumer should take next.
726 726 """
727 727 if not frame.streamid % 2:
728 728 self._state = 'errored'
729 729 return self._makeerrorresult(
730 730 _('received frame with even numbered stream ID: %d') %
731 731 frame.streamid)
732 732
733 733 if frame.streamid not in self._incomingstreams:
734 734 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
735 735 self._state = 'errored'
736 736 return self._makeerrorresult(
737 737 _('received frame on unknown inactive stream without '
738 738 'beginning of stream flag set'))
739 739
740 740 self._incomingstreams[frame.streamid] = stream(frame.streamid)
741 741
742 742 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
743 743 # TODO handle decoding frames
744 744 self._state = 'errored'
745 745 raise error.ProgrammingError('support for decoding stream payloads '
746 746 'not yet implemented')
747 747
748 748 if frame.streamflags & STREAM_FLAG_END_STREAM:
749 749 del self._incomingstreams[frame.streamid]
750 750
751 751 handlers = {
752 752 'idle': self._onframeidle,
753 753 'command-receiving': self._onframecommandreceiving,
754 754 'errored': self._onframeerrored,
755 755 }
756 756
757 757 meth = handlers.get(self._state)
758 758 if not meth:
759 759 raise error.ProgrammingError('unhandled state: %s' % self._state)
760 760
761 761 return meth(frame)
762 762
763 763 def oncommandresponseready(self, stream, requestid, data):
764 764 """Signal that a bytes response is ready to be sent to the client.
765 765
766 766 The raw bytes response is passed as an argument.
767 767 """
768 768 ensureserverstream(stream)
769 769
770 770 def sendframes():
771 771 for frame in createcommandresponseframesfrombytes(stream, requestid,
772 772 data):
773 773 yield frame
774 774
775 775 self._activecommands.remove(requestid)
776 776
777 777 result = sendframes()
778 778
779 779 if self._deferoutput:
780 780 self._bufferedframegens.append(result)
781 781 return 'noop', {}
782 782 else:
783 783 return 'sendframes', {
784 784 'framegen': result,
785 785 }
786 786
787 787 def oncommandresponsereadyobjects(self, stream, requestid, objs):
788 788 """Signal that objects are ready to be sent to the client.
789 789
790 790 ``objs`` is an iterable of objects (typically a generator) that will
791 791 be encoded via CBOR and added to frames, which will be sent to the
792 792 client.
793 793 """
794 794 ensureserverstream(stream)
795 795
796 796 # We need to take care over exception handling. Uncaught exceptions
797 797 # when generating frames could lead to premature end of the frame
798 798 # stream and the possibility of the server or client process getting
799 799 # in a bad state.
800 800 #
801 801 # Keep in mind that if ``objs`` is a generator, advancing it could
802 802 # raise exceptions that originated in e.g. wire protocol command
803 803 # functions. That is why we differentiate between exceptions raised
804 804 # when iterating versus other exceptions that occur.
805 805 #
806 806 # In all cases, when the function finishes, the request is fully
807 807 # handled and no new frames for it should be seen.
808 808
809 809 def sendframes():
810 810 emitted = False
811 811 emitter = bufferingcommandresponseemitter(stream, requestid)
812 812 while True:
813 813 try:
814 814 o = next(objs)
815 815 except StopIteration:
816 816 for frame in emitter.send(None):
817 817 yield frame
818 818
819 819 if emitted:
820 820 yield createcommandresponseeosframe(stream, requestid)
821 821 break
822 822
823 823 except error.WireprotoCommandError as e:
824 824 for frame in createcommanderrorresponse(
825 825 stream, requestid, e.message, e.messageargs):
826 826 yield frame
827 827 break
828 828
829 829 except Exception as e:
830 for frame in createerrorframe(stream, requestid,
831 '%s' % e,
830 for frame in createerrorframe(
831 stream, requestid, '%s' % stringutil.forcebytestr(e),
832 832 errtype='server'):
833
833 834 yield frame
834 835
835 836 break
836 837
837 838 try:
838 839 if not emitted:
839 840 yield createcommandresponseokframe(stream, requestid)
840 841 emitted = True
841 842
842 843 for chunk in cborutil.streamencode(o):
843 844 for frame in emitter.send(chunk):
844 845 yield frame
845 846
846 847 except Exception as e:
847 848 for frame in createerrorframe(stream, requestid,
848 849 '%s' % e,
849 850 errtype='server'):
850 851 yield frame
851 852
852 853 break
853 854
854 855 self._activecommands.remove(requestid)
855 856
856 857 return self._handlesendframes(sendframes())
857 858
858 859 def oninputeof(self):
859 860 """Signals that end of input has been received.
860 861
861 862 No more frames will be received. All pending activity should be
862 863 completed.
863 864 """
864 865 # TODO should we do anything about in-flight commands?
865 866
866 867 if not self._deferoutput or not self._bufferedframegens:
867 868 return 'noop', {}
868 869
869 870 # If we buffered all our responses, emit those.
870 871 def makegen():
871 872 for gen in self._bufferedframegens:
872 873 for frame in gen:
873 874 yield frame
874 875
875 876 return 'sendframes', {
876 877 'framegen': makegen(),
877 878 }
878 879
879 880 def _handlesendframes(self, framegen):
880 881 if self._deferoutput:
881 882 self._bufferedframegens.append(framegen)
882 883 return 'noop', {}
883 884 else:
884 885 return 'sendframes', {
885 886 'framegen': framegen,
886 887 }
887 888
888 889 def onservererror(self, stream, requestid, msg):
889 890 ensureserverstream(stream)
890 891
891 892 def sendframes():
892 893 for frame in createerrorframe(stream, requestid, msg,
893 894 errtype='server'):
894 895 yield frame
895 896
896 897 self._activecommands.remove(requestid)
897 898
898 899 return self._handlesendframes(sendframes())
899 900
900 901 def oncommanderror(self, stream, requestid, message, args=None):
901 902 """Called when a command encountered an error before sending output."""
902 903 ensureserverstream(stream)
903 904
904 905 def sendframes():
905 906 for frame in createcommanderrorresponse(stream, requestid, message,
906 907 args):
907 908 yield frame
908 909
909 910 self._activecommands.remove(requestid)
910 911
911 912 return self._handlesendframes(sendframes())
912 913
913 914 def makeoutputstream(self):
914 915 """Create a stream to be used for sending data to the client."""
915 916 streamid = self._nextoutgoingstreamid
916 917 self._nextoutgoingstreamid += 2
917 918
918 919 s = stream(streamid)
919 920 self._outgoingstreams[streamid] = s
920 921
921 922 return s
922 923
923 924 def _makeerrorresult(self, msg):
924 925 return 'error', {
925 926 'message': msg,
926 927 }
927 928
928 929 def _makeruncommandresult(self, requestid):
929 930 entry = self._receivingcommands[requestid]
930 931
931 932 if not entry['requestdone']:
932 933 self._state = 'errored'
933 934 raise error.ProgrammingError('should not be called without '
934 935 'requestdone set')
935 936
936 937 del self._receivingcommands[requestid]
937 938
938 939 if self._receivingcommands:
939 940 self._state = 'command-receiving'
940 941 else:
941 942 self._state = 'idle'
942 943
943 944 # Decode the payloads as CBOR.
944 945 entry['payload'].seek(0)
945 946 request = cborutil.decodeall(entry['payload'].getvalue())[0]
946 947
947 948 if b'name' not in request:
948 949 self._state = 'errored'
949 950 return self._makeerrorresult(
950 951 _('command request missing "name" field'))
951 952
952 953 if b'args' not in request:
953 954 request[b'args'] = {}
954 955
955 956 assert requestid not in self._activecommands
956 957 self._activecommands.add(requestid)
957 958
958 959 return 'runcommand', {
959 960 'requestid': requestid,
960 961 'command': request[b'name'],
961 962 'args': request[b'args'],
962 963 'data': entry['data'].getvalue() if entry['data'] else None,
963 964 }
964 965
965 966 def _makewantframeresult(self):
966 967 return 'wantframe', {
967 968 'state': self._state,
968 969 }
969 970
970 971 def _validatecommandrequestframe(self, frame):
971 972 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
972 973 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
973 974
974 975 if new and continuation:
975 976 self._state = 'errored'
976 977 return self._makeerrorresult(
977 978 _('received command request frame with both new and '
978 979 'continuation flags set'))
979 980
980 981 if not new and not continuation:
981 982 self._state = 'errored'
982 983 return self._makeerrorresult(
983 984 _('received command request frame with neither new nor '
984 985 'continuation flags set'))
985 986
986 987 def _onframeidle(self, frame):
987 988 # The only frame type that should be received in this state is a
988 989 # command request.
989 990 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
990 991 self._state = 'errored'
991 992 return self._makeerrorresult(
992 993 _('expected command request frame; got %d') % frame.typeid)
993 994
994 995 res = self._validatecommandrequestframe(frame)
995 996 if res:
996 997 return res
997 998
998 999 if frame.requestid in self._receivingcommands:
999 1000 self._state = 'errored'
1000 1001 return self._makeerrorresult(
1001 1002 _('request with ID %d already received') % frame.requestid)
1002 1003
1003 1004 if frame.requestid in self._activecommands:
1004 1005 self._state = 'errored'
1005 1006 return self._makeerrorresult(
1006 1007 _('request with ID %d is already active') % frame.requestid)
1007 1008
1008 1009 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1009 1010 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1010 1011 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1011 1012
1012 1013 if not new:
1013 1014 self._state = 'errored'
1014 1015 return self._makeerrorresult(
1015 1016 _('received command request frame without new flag set'))
1016 1017
1017 1018 payload = util.bytesio()
1018 1019 payload.write(frame.payload)
1019 1020
1020 1021 self._receivingcommands[frame.requestid] = {
1021 1022 'payload': payload,
1022 1023 'data': None,
1023 1024 'requestdone': not moreframes,
1024 1025 'expectingdata': bool(expectingdata),
1025 1026 }
1026 1027
1027 1028 # This is the final frame for this request. Dispatch it.
1028 1029 if not moreframes and not expectingdata:
1029 1030 return self._makeruncommandresult(frame.requestid)
1030 1031
1031 1032 assert moreframes or expectingdata
1032 1033 self._state = 'command-receiving'
1033 1034 return self._makewantframeresult()
1034 1035
1035 1036 def _onframecommandreceiving(self, frame):
1036 1037 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1037 1038 # Process new command requests as such.
1038 1039 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1039 1040 return self._onframeidle(frame)
1040 1041
1041 1042 res = self._validatecommandrequestframe(frame)
1042 1043 if res:
1043 1044 return res
1044 1045
1045 1046 # All other frames should be related to a command that is currently
1046 1047 # receiving but is not active.
1047 1048 if frame.requestid in self._activecommands:
1048 1049 self._state = 'errored'
1049 1050 return self._makeerrorresult(
1050 1051 _('received frame for request that is still active: %d') %
1051 1052 frame.requestid)
1052 1053
1053 1054 if frame.requestid not in self._receivingcommands:
1054 1055 self._state = 'errored'
1055 1056 return self._makeerrorresult(
1056 1057 _('received frame for request that is not receiving: %d') %
1057 1058 frame.requestid)
1058 1059
1059 1060 entry = self._receivingcommands[frame.requestid]
1060 1061
1061 1062 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1062 1063 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1063 1064 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1064 1065
1065 1066 if entry['requestdone']:
1066 1067 self._state = 'errored'
1067 1068 return self._makeerrorresult(
1068 1069 _('received command request frame when request frames '
1069 1070 'were supposedly done'))
1070 1071
1071 1072 if expectingdata != entry['expectingdata']:
1072 1073 self._state = 'errored'
1073 1074 return self._makeerrorresult(
1074 1075 _('mismatch between expect data flag and previous frame'))
1075 1076
1076 1077 entry['payload'].write(frame.payload)
1077 1078
1078 1079 if not moreframes:
1079 1080 entry['requestdone'] = True
1080 1081
1081 1082 if not moreframes and not expectingdata:
1082 1083 return self._makeruncommandresult(frame.requestid)
1083 1084
1084 1085 return self._makewantframeresult()
1085 1086
1086 1087 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1087 1088 if not entry['expectingdata']:
1088 1089 self._state = 'errored'
1089 1090 return self._makeerrorresult(_(
1090 1091 'received command data frame for request that is not '
1091 1092 'expecting data: %d') % frame.requestid)
1092 1093
1093 1094 if entry['data'] is None:
1094 1095 entry['data'] = util.bytesio()
1095 1096
1096 1097 return self._handlecommanddataframe(frame, entry)
1097 1098 else:
1098 1099 self._state = 'errored'
1099 1100 return self._makeerrorresult(_(
1100 1101 'received unexpected frame type: %d') % frame.typeid)
1101 1102
1102 1103 def _handlecommanddataframe(self, frame, entry):
1103 1104 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1104 1105
1105 1106 # TODO support streaming data instead of buffering it.
1106 1107 entry['data'].write(frame.payload)
1107 1108
1108 1109 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1109 1110 return self._makewantframeresult()
1110 1111 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1111 1112 entry['data'].seek(0)
1112 1113 return self._makeruncommandresult(frame.requestid)
1113 1114 else:
1114 1115 self._state = 'errored'
1115 1116 return self._makeerrorresult(_('command data frame without '
1116 1117 'flags'))
1117 1118
1118 1119 def _onframeerrored(self, frame):
1119 1120 return self._makeerrorresult(_('server already errored'))
1120 1121
1121 1122 class commandrequest(object):
1122 1123 """Represents a request to run a command."""
1123 1124
1124 1125 def __init__(self, requestid, name, args, datafh=None):
1125 1126 self.requestid = requestid
1126 1127 self.name = name
1127 1128 self.args = args
1128 1129 self.datafh = datafh
1129 1130 self.state = 'pending'
1130 1131
1131 1132 class clientreactor(object):
1132 1133 """Holds state of a client issuing frame-based protocol requests.
1133 1134
1134 1135 This is like ``serverreactor`` but for client-side state.
1135 1136
1136 1137 Each instance is bound to the lifetime of a connection. For persistent
1137 1138 connection transports using e.g. TCP sockets and speaking the raw
1138 1139 framing protocol, there will be a single instance for the lifetime of
1139 1140 the TCP socket. For transports where there are multiple discrete
1140 1141 interactions (say tunneled within in HTTP request), there will be a
1141 1142 separate instance for each distinct interaction.
1142 1143 """
1143 1144 def __init__(self, hasmultiplesend=False, buffersends=True):
1144 1145 """Create a new instance.
1145 1146
1146 1147 ``hasmultiplesend`` indicates whether multiple sends are supported
1147 1148 by the transport. When True, it is possible to send commands immediately
1148 1149 instead of buffering until the caller signals an intent to finish a
1149 1150 send operation.
1150 1151
1151 1152 ``buffercommands`` indicates whether sends should be buffered until the
1152 1153 last request has been issued.
1153 1154 """
1154 1155 self._hasmultiplesend = hasmultiplesend
1155 1156 self._buffersends = buffersends
1156 1157
1157 1158 self._canissuecommands = True
1158 1159 self._cansend = True
1159 1160
1160 1161 self._nextrequestid = 1
1161 1162 # We only support a single outgoing stream for now.
1162 1163 self._outgoingstream = stream(1)
1163 1164 self._pendingrequests = collections.deque()
1164 1165 self._activerequests = {}
1165 1166 self._incomingstreams = {}
1166 1167
1167 1168 def callcommand(self, name, args, datafh=None):
1168 1169 """Request that a command be executed.
1169 1170
1170 1171 Receives the command name, a dict of arguments to pass to the command,
1171 1172 and an optional file object containing the raw data for the command.
1172 1173
1173 1174 Returns a 3-tuple of (request, action, action data).
1174 1175 """
1175 1176 if not self._canissuecommands:
1176 1177 raise error.ProgrammingError('cannot issue new commands')
1177 1178
1178 1179 requestid = self._nextrequestid
1179 1180 self._nextrequestid += 2
1180 1181
1181 1182 request = commandrequest(requestid, name, args, datafh=datafh)
1182 1183
1183 1184 if self._buffersends:
1184 1185 self._pendingrequests.append(request)
1185 1186 return request, 'noop', {}
1186 1187 else:
1187 1188 if not self._cansend:
1188 1189 raise error.ProgrammingError('sends cannot be performed on '
1189 1190 'this instance')
1190 1191
1191 1192 if not self._hasmultiplesend:
1192 1193 self._cansend = False
1193 1194 self._canissuecommands = False
1194 1195
1195 1196 return request, 'sendframes', {
1196 1197 'framegen': self._makecommandframes(request),
1197 1198 }
1198 1199
1199 1200 def flushcommands(self):
1200 1201 """Request that all queued commands be sent.
1201 1202
1202 1203 If any commands are buffered, this will instruct the caller to send
1203 1204 them over the wire. If no commands are buffered it instructs the client
1204 1205 to no-op.
1205 1206
1206 1207 If instances aren't configured for multiple sends, no new command
1207 1208 requests are allowed after this is called.
1208 1209 """
1209 1210 if not self._pendingrequests:
1210 1211 return 'noop', {}
1211 1212
1212 1213 if not self._cansend:
1213 1214 raise error.ProgrammingError('sends cannot be performed on this '
1214 1215 'instance')
1215 1216
1216 1217 # If the instance only allows sending once, mark that we have fired
1217 1218 # our one shot.
1218 1219 if not self._hasmultiplesend:
1219 1220 self._canissuecommands = False
1220 1221 self._cansend = False
1221 1222
1222 1223 def makeframes():
1223 1224 while self._pendingrequests:
1224 1225 request = self._pendingrequests.popleft()
1225 1226 for frame in self._makecommandframes(request):
1226 1227 yield frame
1227 1228
1228 1229 return 'sendframes', {
1229 1230 'framegen': makeframes(),
1230 1231 }
1231 1232
1232 1233 def _makecommandframes(self, request):
1233 1234 """Emit frames to issue a command request.
1234 1235
1235 1236 As a side-effect, update request accounting to reflect its changed
1236 1237 state.
1237 1238 """
1238 1239 self._activerequests[request.requestid] = request
1239 1240 request.state = 'sending'
1240 1241
1241 1242 res = createcommandframes(self._outgoingstream,
1242 1243 request.requestid,
1243 1244 request.name,
1244 1245 request.args,
1245 1246 request.datafh)
1246 1247
1247 1248 for frame in res:
1248 1249 yield frame
1249 1250
1250 1251 request.state = 'sent'
1251 1252
1252 1253 def onframerecv(self, frame):
1253 1254 """Process a frame that has been received off the wire.
1254 1255
1255 1256 Returns a 2-tuple of (action, meta) describing further action the
1256 1257 caller needs to take as a result of receiving this frame.
1257 1258 """
1258 1259 if frame.streamid % 2:
1259 1260 return 'error', {
1260 1261 'message': (
1261 1262 _('received frame with odd numbered stream ID: %d') %
1262 1263 frame.streamid),
1263 1264 }
1264 1265
1265 1266 if frame.streamid not in self._incomingstreams:
1266 1267 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1267 1268 return 'error', {
1268 1269 'message': _('received frame on unknown stream '
1269 1270 'without beginning of stream flag set'),
1270 1271 }
1271 1272
1272 1273 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1273 1274
1274 1275 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1275 1276 raise error.ProgrammingError('support for decoding stream '
1276 1277 'payloads not yet implemneted')
1277 1278
1278 1279 if frame.streamflags & STREAM_FLAG_END_STREAM:
1279 1280 del self._incomingstreams[frame.streamid]
1280 1281
1281 1282 if frame.requestid not in self._activerequests:
1282 1283 return 'error', {
1283 1284 'message': (_('received frame for inactive request ID: %d') %
1284 1285 frame.requestid),
1285 1286 }
1286 1287
1287 1288 request = self._activerequests[frame.requestid]
1288 1289 request.state = 'receiving'
1289 1290
1290 1291 handlers = {
1291 1292 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1292 1293 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1293 1294 }
1294 1295
1295 1296 meth = handlers.get(frame.typeid)
1296 1297 if not meth:
1297 1298 raise error.ProgrammingError('unhandled frame type: %d' %
1298 1299 frame.typeid)
1299 1300
1300 1301 return meth(request, frame)
1301 1302
1302 1303 def _oncommandresponseframe(self, request, frame):
1303 1304 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1304 1305 request.state = 'received'
1305 1306 del self._activerequests[request.requestid]
1306 1307
1307 1308 return 'responsedata', {
1308 1309 'request': request,
1309 1310 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1310 1311 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1311 1312 'data': frame.payload,
1312 1313 }
1313 1314
1314 1315 def _onerrorresponseframe(self, request, frame):
1315 1316 request.state = 'errored'
1316 1317 del self._activerequests[request.requestid]
1317 1318
1318 1319 # The payload should be a CBOR map.
1319 1320 m = cborutil.decodeall(frame.payload)[0]
1320 1321
1321 1322 return 'error', {
1322 1323 'request': request,
1323 1324 'type': m['type'],
1324 1325 'message': m['message'],
1325 1326 }
General Comments 0
You need to be logged in to leave comments. Login now