##// END OF EJS Templates
wireproto: fix repr(frame) to not crash by unknown type id...
Yuya Nishihara -
r37490:2f81926c default
parent child Browse files
Show More
@@ -1,867 +1,867 b''
1 1 # wireprotoframing.py - unified framing protocol for wire protocol
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 # This file contains functionality to support the unified frame-based wire
9 9 # protocol. For details about the protocol, see
10 10 # `hg help internals.wireprotocol`.
11 11
12 12 from __future__ import absolute_import
13 13
14 14 import struct
15 15
16 16 from .i18n import _
17 17 from .thirdparty import (
18 18 attr,
19 19 cbor,
20 20 )
21 21 from . import (
22 22 error,
23 23 util,
24 24 )
25 25 from .utils import (
26 26 stringutil,
27 27 )
28 28
29 29 FRAME_HEADER_SIZE = 8
30 30 DEFAULT_MAX_FRAME_SIZE = 32768
31 31
32 32 STREAM_FLAG_BEGIN_STREAM = 0x01
33 33 STREAM_FLAG_END_STREAM = 0x02
34 34 STREAM_FLAG_ENCODING_APPLIED = 0x04
35 35
36 36 STREAM_FLAGS = {
37 37 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
38 38 b'stream-end': STREAM_FLAG_END_STREAM,
39 39 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
40 40 }
41 41
42 42 FRAME_TYPE_COMMAND_REQUEST = 0x01
43 43 FRAME_TYPE_COMMAND_DATA = 0x03
44 44 FRAME_TYPE_BYTES_RESPONSE = 0x04
45 45 FRAME_TYPE_ERROR_RESPONSE = 0x05
46 46 FRAME_TYPE_TEXT_OUTPUT = 0x06
47 47 FRAME_TYPE_PROGRESS = 0x07
48 48 FRAME_TYPE_STREAM_SETTINGS = 0x08
49 49
50 50 FRAME_TYPES = {
51 51 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
52 52 b'command-data': FRAME_TYPE_COMMAND_DATA,
53 53 b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
54 54 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
55 55 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
56 56 b'progress': FRAME_TYPE_PROGRESS,
57 57 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
58 58 }
59 59
60 60 FLAG_COMMAND_REQUEST_NEW = 0x01
61 61 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
62 62 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
63 63 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
64 64
65 65 FLAGS_COMMAND_REQUEST = {
66 66 b'new': FLAG_COMMAND_REQUEST_NEW,
67 67 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
68 68 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
69 69 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
70 70 }
71 71
72 72 FLAG_COMMAND_DATA_CONTINUATION = 0x01
73 73 FLAG_COMMAND_DATA_EOS = 0x02
74 74
75 75 FLAGS_COMMAND_DATA = {
76 76 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
77 77 b'eos': FLAG_COMMAND_DATA_EOS,
78 78 }
79 79
80 80 FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
81 81 FLAG_BYTES_RESPONSE_EOS = 0x02
82 82 FLAG_BYTES_RESPONSE_CBOR = 0x04
83 83
84 84 FLAGS_BYTES_RESPONSE = {
85 85 b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
86 86 b'eos': FLAG_BYTES_RESPONSE_EOS,
87 87 b'cbor': FLAG_BYTES_RESPONSE_CBOR,
88 88 }
89 89
90 90 FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
91 91 FLAG_ERROR_RESPONSE_APPLICATION = 0x02
92 92
93 93 FLAGS_ERROR_RESPONSE = {
94 94 b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
95 95 b'application': FLAG_ERROR_RESPONSE_APPLICATION,
96 96 }
97 97
98 98 # Maps frame types to their available flags.
99 99 FRAME_TYPE_FLAGS = {
100 100 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
101 101 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
102 102 FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
103 103 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
104 104 FRAME_TYPE_TEXT_OUTPUT: {},
105 105 FRAME_TYPE_PROGRESS: {},
106 106 FRAME_TYPE_STREAM_SETTINGS: {},
107 107 }
108 108
109 109 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
110 110
111 111 def humanflags(mapping, value):
112 112 """Convert a numeric flags value to a human value, using a mapping table."""
113 113 flags = []
114 114 for val, name in sorted({v: k for k, v in mapping.iteritems()}.iteritems()):
115 115 if value & val:
116 116 flags.append(name)
117 117
118 118 return b'|'.join(flags)
119 119
120 120 @attr.s(slots=True)
121 121 class frameheader(object):
122 122 """Represents the data in a frame header."""
123 123
124 124 length = attr.ib()
125 125 requestid = attr.ib()
126 126 streamid = attr.ib()
127 127 streamflags = attr.ib()
128 128 typeid = attr.ib()
129 129 flags = attr.ib()
130 130
131 131 @attr.s(slots=True, repr=False)
132 132 class frame(object):
133 133 """Represents a parsed frame."""
134 134
135 135 requestid = attr.ib()
136 136 streamid = attr.ib()
137 137 streamflags = attr.ib()
138 138 typeid = attr.ib()
139 139 flags = attr.ib()
140 140 payload = attr.ib()
141 141
142 142 def __repr__(self):
143 143 typename = '<unknown>'
144 144 for name, value in FRAME_TYPES.iteritems():
145 145 if value == self.typeid:
146 146 typename = name
147 147 break
148 148
149 149 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
150 150 'type=%s; flags=%s)' % (
151 151 len(self.payload), self.requestid, self.streamid,
152 152 humanflags(STREAM_FLAGS, self.streamflags), typename,
153 humanflags(FRAME_TYPE_FLAGS[self.typeid], self.flags)))
153 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
154 154
155 155 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
156 156 """Assemble a frame into a byte array."""
157 157 # TODO assert size of payload.
158 158 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
159 159
160 160 # 24 bits length
161 161 # 16 bits request id
162 162 # 8 bits stream id
163 163 # 8 bits stream flags
164 164 # 4 bits type
165 165 # 4 bits flags
166 166
167 167 l = struct.pack(r'<I', len(payload))
168 168 frame[0:3] = l[0:3]
169 169 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
170 170 frame[7] = (typeid << 4) | flags
171 171 frame[8:] = payload
172 172
173 173 return frame
174 174
175 175 def makeframefromhumanstring(s):
176 176 """Create a frame from a human readable string
177 177
178 178 DANGER: NOT SAFE TO USE WITH UNTRUSTED INPUT BECAUSE OF POTENTIAL
179 179 eval() USAGE. DO NOT USE IN CORE.
180 180
181 181 Strings have the form:
182 182
183 183 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
184 184
185 185 This can be used by user-facing applications and tests for creating
186 186 frames easily without having to type out a bunch of constants.
187 187
188 188 Request ID and stream IDs are integers.
189 189
190 190 Stream flags, frame type, and flags can be specified by integer or
191 191 named constant.
192 192
193 193 Flags can be delimited by `|` to bitwise OR them together.
194 194
195 195 If the payload begins with ``cbor:``, the following string will be
196 196 evaluated as Python code and the resulting object will be fed into
197 197 a CBOR encoder. Otherwise, the payload is interpreted as a Python
198 198 byte string literal.
199 199 """
200 200 fields = s.split(b' ', 5)
201 201 requestid, streamid, streamflags, frametype, frameflags, payload = fields
202 202
203 203 requestid = int(requestid)
204 204 streamid = int(streamid)
205 205
206 206 finalstreamflags = 0
207 207 for flag in streamflags.split(b'|'):
208 208 if flag in STREAM_FLAGS:
209 209 finalstreamflags |= STREAM_FLAGS[flag]
210 210 else:
211 211 finalstreamflags |= int(flag)
212 212
213 213 if frametype in FRAME_TYPES:
214 214 frametype = FRAME_TYPES[frametype]
215 215 else:
216 216 frametype = int(frametype)
217 217
218 218 finalflags = 0
219 219 validflags = FRAME_TYPE_FLAGS[frametype]
220 220 for flag in frameflags.split(b'|'):
221 221 if flag in validflags:
222 222 finalflags |= validflags[flag]
223 223 else:
224 224 finalflags |= int(flag)
225 225
226 226 if payload.startswith(b'cbor:'):
227 227 payload = cbor.dumps(stringutil.evalpython(payload[5:]), canonical=True)
228 228
229 229 else:
230 230 payload = stringutil.unescapestr(payload)
231 231
232 232 return makeframe(requestid=requestid, streamid=streamid,
233 233 streamflags=finalstreamflags, typeid=frametype,
234 234 flags=finalflags, payload=payload)
235 235
236 236 def parseheader(data):
237 237 """Parse a unified framing protocol frame header from a buffer.
238 238
239 239 The header is expected to be in the buffer at offset 0 and the
240 240 buffer is expected to be large enough to hold a full header.
241 241 """
242 242 # 24 bits payload length (little endian)
243 243 # 16 bits request ID
244 244 # 8 bits stream ID
245 245 # 8 bits stream flags
246 246 # 4 bits frame type
247 247 # 4 bits frame flags
248 248 # ... payload
249 249 framelength = data[0] + 256 * data[1] + 16384 * data[2]
250 250 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
251 251 typeflags = data[7]
252 252
253 253 frametype = (typeflags & 0xf0) >> 4
254 254 frameflags = typeflags & 0x0f
255 255
256 256 return frameheader(framelength, requestid, streamid, streamflags,
257 257 frametype, frameflags)
258 258
259 259 def readframe(fh):
260 260 """Read a unified framing protocol frame from a file object.
261 261
262 262 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
263 263 None if no frame is available. May raise if a malformed frame is
264 264 seen.
265 265 """
266 266 header = bytearray(FRAME_HEADER_SIZE)
267 267
268 268 readcount = fh.readinto(header)
269 269
270 270 if readcount == 0:
271 271 return None
272 272
273 273 if readcount != FRAME_HEADER_SIZE:
274 274 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
275 275 (readcount, header))
276 276
277 277 h = parseheader(header)
278 278
279 279 payload = fh.read(h.length)
280 280 if len(payload) != h.length:
281 281 raise error.Abort(_('frame length error: expected %d; got %d') %
282 282 (h.length, len(payload)))
283 283
284 284 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
285 285 payload)
286 286
287 287 def createcommandframes(stream, requestid, cmd, args, datafh=None,
288 288 maxframesize=DEFAULT_MAX_FRAME_SIZE):
289 289 """Create frames necessary to transmit a request to run a command.
290 290
291 291 This is a generator of bytearrays. Each item represents a frame
292 292 ready to be sent over the wire to a peer.
293 293 """
294 294 data = {b'name': cmd}
295 295 if args:
296 296 data[b'args'] = args
297 297
298 298 data = cbor.dumps(data, canonical=True)
299 299
300 300 offset = 0
301 301
302 302 while True:
303 303 flags = 0
304 304
305 305 # Must set new or continuation flag.
306 306 if not offset:
307 307 flags |= FLAG_COMMAND_REQUEST_NEW
308 308 else:
309 309 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
310 310
311 311 # Data frames is set on all frames.
312 312 if datafh:
313 313 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
314 314
315 315 payload = data[offset:offset + maxframesize]
316 316 offset += len(payload)
317 317
318 318 if len(payload) == maxframesize and offset < len(data):
319 319 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
320 320
321 321 yield stream.makeframe(requestid=requestid,
322 322 typeid=FRAME_TYPE_COMMAND_REQUEST,
323 323 flags=flags,
324 324 payload=payload)
325 325
326 326 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
327 327 break
328 328
329 329 if datafh:
330 330 while True:
331 331 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
332 332
333 333 done = False
334 334 if len(data) == DEFAULT_MAX_FRAME_SIZE:
335 335 flags = FLAG_COMMAND_DATA_CONTINUATION
336 336 else:
337 337 flags = FLAG_COMMAND_DATA_EOS
338 338 assert datafh.read(1) == b''
339 339 done = True
340 340
341 341 yield stream.makeframe(requestid=requestid,
342 342 typeid=FRAME_TYPE_COMMAND_DATA,
343 343 flags=flags,
344 344 payload=data)
345 345
346 346 if done:
347 347 break
348 348
349 349 def createbytesresponseframesfrombytes(stream, requestid, data,
350 350 maxframesize=DEFAULT_MAX_FRAME_SIZE):
351 351 """Create a raw frame to send a bytes response from static bytes input.
352 352
353 353 Returns a generator of bytearrays.
354 354 """
355 355
356 356 # Simple case of a single frame.
357 357 if len(data) <= maxframesize:
358 358 yield stream.makeframe(requestid=requestid,
359 359 typeid=FRAME_TYPE_BYTES_RESPONSE,
360 360 flags=FLAG_BYTES_RESPONSE_EOS,
361 361 payload=data)
362 362 return
363 363
364 364 offset = 0
365 365 while True:
366 366 chunk = data[offset:offset + maxframesize]
367 367 offset += len(chunk)
368 368 done = offset == len(data)
369 369
370 370 if done:
371 371 flags = FLAG_BYTES_RESPONSE_EOS
372 372 else:
373 373 flags = FLAG_BYTES_RESPONSE_CONTINUATION
374 374
375 375 yield stream.makeframe(requestid=requestid,
376 376 typeid=FRAME_TYPE_BYTES_RESPONSE,
377 377 flags=flags,
378 378 payload=chunk)
379 379
380 380 if done:
381 381 break
382 382
383 383 def createerrorframe(stream, requestid, msg, protocol=False, application=False):
384 384 # TODO properly handle frame size limits.
385 385 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
386 386
387 387 flags = 0
388 388 if protocol:
389 389 flags |= FLAG_ERROR_RESPONSE_PROTOCOL
390 390 if application:
391 391 flags |= FLAG_ERROR_RESPONSE_APPLICATION
392 392
393 393 yield stream.makeframe(requestid=requestid,
394 394 typeid=FRAME_TYPE_ERROR_RESPONSE,
395 395 flags=flags,
396 396 payload=msg)
397 397
398 398 def createtextoutputframe(stream, requestid, atoms,
399 399 maxframesize=DEFAULT_MAX_FRAME_SIZE):
400 400 """Create a text output frame to render text to people.
401 401
402 402 ``atoms`` is a 3-tuple of (formatting string, args, labels).
403 403
404 404 The formatting string contains ``%s`` tokens to be replaced by the
405 405 corresponding indexed entry in ``args``. ``labels`` is an iterable of
406 406 formatters to be applied at rendering time. In terms of the ``ui``
407 407 class, each atom corresponds to a ``ui.write()``.
408 408 """
409 409 atomdicts = []
410 410
411 411 for (formatting, args, labels) in atoms:
412 412 # TODO look for localstr, other types here?
413 413
414 414 if not isinstance(formatting, bytes):
415 415 raise ValueError('must use bytes formatting strings')
416 416 for arg in args:
417 417 if not isinstance(arg, bytes):
418 418 raise ValueError('must use bytes for arguments')
419 419 for label in labels:
420 420 if not isinstance(label, bytes):
421 421 raise ValueError('must use bytes for labels')
422 422
423 423 # Formatting string must be ASCII.
424 424 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
425 425
426 426 # Arguments must be UTF-8.
427 427 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
428 428
429 429 # Labels must be ASCII.
430 430 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
431 431 for l in labels]
432 432
433 433 atom = {b'msg': formatting}
434 434 if args:
435 435 atom[b'args'] = args
436 436 if labels:
437 437 atom[b'labels'] = labels
438 438
439 439 atomdicts.append(atom)
440 440
441 441 payload = cbor.dumps(atomdicts, canonical=True)
442 442
443 443 if len(payload) > maxframesize:
444 444 raise ValueError('cannot encode data in a single frame')
445 445
446 446 yield stream.makeframe(requestid=requestid,
447 447 typeid=FRAME_TYPE_TEXT_OUTPUT,
448 448 flags=0,
449 449 payload=payload)
450 450
451 451 class stream(object):
452 452 """Represents a logical unidirectional series of frames."""
453 453
454 454 def __init__(self, streamid, active=False):
455 455 self.streamid = streamid
456 456 self._active = False
457 457
458 458 def makeframe(self, requestid, typeid, flags, payload):
459 459 """Create a frame to be sent out over this stream.
460 460
461 461 Only returns the frame instance. Does not actually send it.
462 462 """
463 463 streamflags = 0
464 464 if not self._active:
465 465 streamflags |= STREAM_FLAG_BEGIN_STREAM
466 466 self._active = True
467 467
468 468 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
469 469 payload)
470 470
471 471 def ensureserverstream(stream):
472 472 if stream.streamid % 2:
473 473 raise error.ProgrammingError('server should only write to even '
474 474 'numbered streams; %d is not even' %
475 475 stream.streamid)
476 476
477 477 class serverreactor(object):
478 478 """Holds state of a server handling frame-based protocol requests.
479 479
480 480 This class is the "brain" of the unified frame-based protocol server
481 481 component. While the protocol is stateless from the perspective of
482 482 requests/commands, something needs to track which frames have been
483 483 received, what frames to expect, etc. This class is that thing.
484 484
485 485 Instances are modeled as a state machine of sorts. Instances are also
486 486 reactionary to external events. The point of this class is to encapsulate
487 487 the state of the connection and the exchange of frames, not to perform
488 488 work. Instead, callers tell this class when something occurs, like a
489 489 frame arriving. If that activity is worthy of a follow-up action (say
490 490 *run a command*), the return value of that handler will say so.
491 491
492 492 I/O and CPU intensive operations are purposefully delegated outside of
493 493 this class.
494 494
495 495 Consumers are expected to tell instances when events occur. They do so by
496 496 calling the various ``on*`` methods. These methods return a 2-tuple
497 497 describing any follow-up action(s) to take. The first element is the
498 498 name of an action to perform. The second is a data structure (usually
499 499 a dict) specific to that action that contains more information. e.g.
500 500 if the server wants to send frames back to the client, the data structure
501 501 will contain a reference to those frames.
502 502
503 503 Valid actions that consumers can be instructed to take are:
504 504
505 505 sendframes
506 506 Indicates that frames should be sent to the client. The ``framegen``
507 507 key contains a generator of frames that should be sent. The server
508 508 assumes that all frames are sent to the client.
509 509
510 510 error
511 511 Indicates that an error occurred. Consumer should probably abort.
512 512
513 513 runcommand
514 514 Indicates that the consumer should run a wire protocol command. Details
515 515 of the command to run are given in the data structure.
516 516
517 517 wantframe
518 518 Indicates that nothing of interest happened and the server is waiting on
519 519 more frames from the client before anything interesting can be done.
520 520
521 521 noop
522 522 Indicates no additional action is required.
523 523
524 524 Known Issues
525 525 ------------
526 526
527 527 There are no limits to the number of partially received commands or their
528 528 size. A malicious client could stream command request data and exhaust the
529 529 server's memory.
530 530
531 531 Partially received commands are not acted upon when end of input is
532 532 reached. Should the server error if it receives a partial request?
533 533 Should the client send a message to abort a partially transmitted request
534 534 to facilitate graceful shutdown?
535 535
536 536 Active requests that haven't been responded to aren't tracked. This means
537 537 that if we receive a command and instruct its dispatch, another command
538 538 with its request ID can come in over the wire and there will be a race
539 539 between who responds to what.
540 540 """
541 541
542 542 def __init__(self, deferoutput=False):
543 543 """Construct a new server reactor.
544 544
545 545 ``deferoutput`` can be used to indicate that no output frames should be
546 546 instructed to be sent until input has been exhausted. In this mode,
547 547 events that would normally generate output frames (such as a command
548 548 response being ready) will instead defer instructing the consumer to
549 549 send those frames. This is useful for half-duplex transports where the
550 550 sender cannot receive until all data has been transmitted.
551 551 """
552 552 self._deferoutput = deferoutput
553 553 self._state = 'idle'
554 554 self._nextoutgoingstreamid = 2
555 555 self._bufferedframegens = []
556 556 # stream id -> stream instance for all active streams from the client.
557 557 self._incomingstreams = {}
558 558 self._outgoingstreams = {}
559 559 # request id -> dict of commands that are actively being received.
560 560 self._receivingcommands = {}
561 561 # Request IDs that have been received and are actively being processed.
562 562 # Once all output for a request has been sent, it is removed from this
563 563 # set.
564 564 self._activecommands = set()
565 565
566 566 def onframerecv(self, frame):
567 567 """Process a frame that has been received off the wire.
568 568
569 569 Returns a dict with an ``action`` key that details what action,
570 570 if any, the consumer should take next.
571 571 """
572 572 if not frame.streamid % 2:
573 573 self._state = 'errored'
574 574 return self._makeerrorresult(
575 575 _('received frame with even numbered stream ID: %d') %
576 576 frame.streamid)
577 577
578 578 if frame.streamid not in self._incomingstreams:
579 579 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
580 580 self._state = 'errored'
581 581 return self._makeerrorresult(
582 582 _('received frame on unknown inactive stream without '
583 583 'beginning of stream flag set'))
584 584
585 585 self._incomingstreams[frame.streamid] = stream(frame.streamid)
586 586
587 587 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
588 588 # TODO handle decoding frames
589 589 self._state = 'errored'
590 590 raise error.ProgrammingError('support for decoding stream payloads '
591 591 'not yet implemented')
592 592
593 593 if frame.streamflags & STREAM_FLAG_END_STREAM:
594 594 del self._incomingstreams[frame.streamid]
595 595
596 596 handlers = {
597 597 'idle': self._onframeidle,
598 598 'command-receiving': self._onframecommandreceiving,
599 599 'errored': self._onframeerrored,
600 600 }
601 601
602 602 meth = handlers.get(self._state)
603 603 if not meth:
604 604 raise error.ProgrammingError('unhandled state: %s' % self._state)
605 605
606 606 return meth(frame)
607 607
608 608 def onbytesresponseready(self, stream, requestid, data):
609 609 """Signal that a bytes response is ready to be sent to the client.
610 610
611 611 The raw bytes response is passed as an argument.
612 612 """
613 613 ensureserverstream(stream)
614 614
615 615 def sendframes():
616 616 for frame in createbytesresponseframesfrombytes(stream, requestid,
617 617 data):
618 618 yield frame
619 619
620 620 self._activecommands.remove(requestid)
621 621
622 622 result = sendframes()
623 623
624 624 if self._deferoutput:
625 625 self._bufferedframegens.append(result)
626 626 return 'noop', {}
627 627 else:
628 628 return 'sendframes', {
629 629 'framegen': result,
630 630 }
631 631
632 632 def oninputeof(self):
633 633 """Signals that end of input has been received.
634 634
635 635 No more frames will be received. All pending activity should be
636 636 completed.
637 637 """
638 638 # TODO should we do anything about in-flight commands?
639 639
640 640 if not self._deferoutput or not self._bufferedframegens:
641 641 return 'noop', {}
642 642
643 643 # If we buffered all our responses, emit those.
644 644 def makegen():
645 645 for gen in self._bufferedframegens:
646 646 for frame in gen:
647 647 yield frame
648 648
649 649 return 'sendframes', {
650 650 'framegen': makegen(),
651 651 }
652 652
653 653 def onapplicationerror(self, stream, requestid, msg):
654 654 ensureserverstream(stream)
655 655
656 656 return 'sendframes', {
657 657 'framegen': createerrorframe(stream, requestid, msg,
658 658 application=True),
659 659 }
660 660
661 661 def makeoutputstream(self):
662 662 """Create a stream to be used for sending data to the client."""
663 663 streamid = self._nextoutgoingstreamid
664 664 self._nextoutgoingstreamid += 2
665 665
666 666 s = stream(streamid)
667 667 self._outgoingstreams[streamid] = s
668 668
669 669 return s
670 670
671 671 def _makeerrorresult(self, msg):
672 672 return 'error', {
673 673 'message': msg,
674 674 }
675 675
676 676 def _makeruncommandresult(self, requestid):
677 677 entry = self._receivingcommands[requestid]
678 678
679 679 if not entry['requestdone']:
680 680 self._state = 'errored'
681 681 raise error.ProgrammingError('should not be called without '
682 682 'requestdone set')
683 683
684 684 del self._receivingcommands[requestid]
685 685
686 686 if self._receivingcommands:
687 687 self._state = 'command-receiving'
688 688 else:
689 689 self._state = 'idle'
690 690
691 691 # Decode the payloads as CBOR.
692 692 entry['payload'].seek(0)
693 693 request = cbor.load(entry['payload'])
694 694
695 695 if b'name' not in request:
696 696 self._state = 'errored'
697 697 return self._makeerrorresult(
698 698 _('command request missing "name" field'))
699 699
700 700 if b'args' not in request:
701 701 request[b'args'] = {}
702 702
703 703 assert requestid not in self._activecommands
704 704 self._activecommands.add(requestid)
705 705
706 706 return 'runcommand', {
707 707 'requestid': requestid,
708 708 'command': request[b'name'],
709 709 'args': request[b'args'],
710 710 'data': entry['data'].getvalue() if entry['data'] else None,
711 711 }
712 712
713 713 def _makewantframeresult(self):
714 714 return 'wantframe', {
715 715 'state': self._state,
716 716 }
717 717
718 718 def _validatecommandrequestframe(self, frame):
719 719 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
720 720 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
721 721
722 722 if new and continuation:
723 723 self._state = 'errored'
724 724 return self._makeerrorresult(
725 725 _('received command request frame with both new and '
726 726 'continuation flags set'))
727 727
728 728 if not new and not continuation:
729 729 self._state = 'errored'
730 730 return self._makeerrorresult(
731 731 _('received command request frame with neither new nor '
732 732 'continuation flags set'))
733 733
734 734 def _onframeidle(self, frame):
735 735 # The only frame type that should be received in this state is a
736 736 # command request.
737 737 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
738 738 self._state = 'errored'
739 739 return self._makeerrorresult(
740 740 _('expected command request frame; got %d') % frame.typeid)
741 741
742 742 res = self._validatecommandrequestframe(frame)
743 743 if res:
744 744 return res
745 745
746 746 if frame.requestid in self._receivingcommands:
747 747 self._state = 'errored'
748 748 return self._makeerrorresult(
749 749 _('request with ID %d already received') % frame.requestid)
750 750
751 751 if frame.requestid in self._activecommands:
752 752 self._state = 'errored'
753 753 return self._makeerrorresult(
754 754 _('request with ID %d is already active') % frame.requestid)
755 755
756 756 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
757 757 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
758 758 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
759 759
760 760 if not new:
761 761 self._state = 'errored'
762 762 return self._makeerrorresult(
763 763 _('received command request frame without new flag set'))
764 764
765 765 payload = util.bytesio()
766 766 payload.write(frame.payload)
767 767
768 768 self._receivingcommands[frame.requestid] = {
769 769 'payload': payload,
770 770 'data': None,
771 771 'requestdone': not moreframes,
772 772 'expectingdata': bool(expectingdata),
773 773 }
774 774
775 775 # This is the final frame for this request. Dispatch it.
776 776 if not moreframes and not expectingdata:
777 777 return self._makeruncommandresult(frame.requestid)
778 778
779 779 assert moreframes or expectingdata
780 780 self._state = 'command-receiving'
781 781 return self._makewantframeresult()
782 782
783 783 def _onframecommandreceiving(self, frame):
784 784 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
785 785 # Process new command requests as such.
786 786 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
787 787 return self._onframeidle(frame)
788 788
789 789 res = self._validatecommandrequestframe(frame)
790 790 if res:
791 791 return res
792 792
793 793 # All other frames should be related to a command that is currently
794 794 # receiving but is not active.
795 795 if frame.requestid in self._activecommands:
796 796 self._state = 'errored'
797 797 return self._makeerrorresult(
798 798 _('received frame for request that is still active: %d') %
799 799 frame.requestid)
800 800
801 801 if frame.requestid not in self._receivingcommands:
802 802 self._state = 'errored'
803 803 return self._makeerrorresult(
804 804 _('received frame for request that is not receiving: %d') %
805 805 frame.requestid)
806 806
807 807 entry = self._receivingcommands[frame.requestid]
808 808
809 809 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
810 810 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
811 811 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
812 812
813 813 if entry['requestdone']:
814 814 self._state = 'errored'
815 815 return self._makeerrorresult(
816 816 _('received command request frame when request frames '
817 817 'were supposedly done'))
818 818
819 819 if expectingdata != entry['expectingdata']:
820 820 self._state = 'errored'
821 821 return self._makeerrorresult(
822 822 _('mismatch between expect data flag and previous frame'))
823 823
824 824 entry['payload'].write(frame.payload)
825 825
826 826 if not moreframes:
827 827 entry['requestdone'] = True
828 828
829 829 if not moreframes and not expectingdata:
830 830 return self._makeruncommandresult(frame.requestid)
831 831
832 832 return self._makewantframeresult()
833 833
834 834 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
835 835 if not entry['expectingdata']:
836 836 self._state = 'errored'
837 837 return self._makeerrorresult(_(
838 838 'received command data frame for request that is not '
839 839 'expecting data: %d') % frame.requestid)
840 840
841 841 if entry['data'] is None:
842 842 entry['data'] = util.bytesio()
843 843
844 844 return self._handlecommanddataframe(frame, entry)
845 845 else:
846 846 self._state = 'errored'
847 847 return self._makeerrorresult(_(
848 848 'received unexpected frame type: %d') % frame.typeid)
849 849
850 850 def _handlecommanddataframe(self, frame, entry):
851 851 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
852 852
853 853 # TODO support streaming data instead of buffering it.
854 854 entry['data'].write(frame.payload)
855 855
856 856 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
857 857 return self._makewantframeresult()
858 858 elif frame.flags & FLAG_COMMAND_DATA_EOS:
859 859 entry['data'].seek(0)
860 860 return self._makeruncommandresult(frame.requestid)
861 861 else:
862 862 self._state = 'errored'
863 863 return self._makeerrorresult(_('command data frame without '
864 864 'flags'))
865 865
866 866 def _onframeerrored(self, frame):
867 867 return self._makeerrorresult(_('server already errored'))
General Comments 0
You need to be logged in to leave comments. Login now