##// END OF EJS Templates
py3: system-stringify repr(frame)...
Yuya Nishihara -
r37492:d3399712 default
parent child Browse files
Show More
@@ -1,870 +1,872
1 1 # wireprotoframing.py - unified framing protocol for wire protocol
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 # This file contains functionality to support the unified frame-based wire
9 9 # protocol. For details about the protocol, see
10 10 # `hg help internals.wireprotocol`.
11 11
12 12 from __future__ import absolute_import
13 13
14 14 import struct
15 15
16 16 from .i18n import _
17 17 from .thirdparty import (
18 18 attr,
19 19 cbor,
20 20 )
21 21 from . import (
22 encoding,
22 23 error,
23 24 util,
24 25 )
25 26 from .utils import (
26 27 stringutil,
27 28 )
28 29
29 30 FRAME_HEADER_SIZE = 8
30 31 DEFAULT_MAX_FRAME_SIZE = 32768
31 32
32 33 STREAM_FLAG_BEGIN_STREAM = 0x01
33 34 STREAM_FLAG_END_STREAM = 0x02
34 35 STREAM_FLAG_ENCODING_APPLIED = 0x04
35 36
36 37 STREAM_FLAGS = {
37 38 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
38 39 b'stream-end': STREAM_FLAG_END_STREAM,
39 40 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
40 41 }
41 42
42 43 FRAME_TYPE_COMMAND_REQUEST = 0x01
43 44 FRAME_TYPE_COMMAND_DATA = 0x03
44 45 FRAME_TYPE_BYTES_RESPONSE = 0x04
45 46 FRAME_TYPE_ERROR_RESPONSE = 0x05
46 47 FRAME_TYPE_TEXT_OUTPUT = 0x06
47 48 FRAME_TYPE_PROGRESS = 0x07
48 49 FRAME_TYPE_STREAM_SETTINGS = 0x08
49 50
50 51 FRAME_TYPES = {
51 52 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
52 53 b'command-data': FRAME_TYPE_COMMAND_DATA,
53 54 b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
54 55 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
55 56 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
56 57 b'progress': FRAME_TYPE_PROGRESS,
57 58 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
58 59 }
59 60
60 61 FLAG_COMMAND_REQUEST_NEW = 0x01
61 62 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
62 63 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
63 64 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
64 65
65 66 FLAGS_COMMAND_REQUEST = {
66 67 b'new': FLAG_COMMAND_REQUEST_NEW,
67 68 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
68 69 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
69 70 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
70 71 }
71 72
72 73 FLAG_COMMAND_DATA_CONTINUATION = 0x01
73 74 FLAG_COMMAND_DATA_EOS = 0x02
74 75
75 76 FLAGS_COMMAND_DATA = {
76 77 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
77 78 b'eos': FLAG_COMMAND_DATA_EOS,
78 79 }
79 80
80 81 FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
81 82 FLAG_BYTES_RESPONSE_EOS = 0x02
82 83 FLAG_BYTES_RESPONSE_CBOR = 0x04
83 84
84 85 FLAGS_BYTES_RESPONSE = {
85 86 b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
86 87 b'eos': FLAG_BYTES_RESPONSE_EOS,
87 88 b'cbor': FLAG_BYTES_RESPONSE_CBOR,
88 89 }
89 90
90 91 FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
91 92 FLAG_ERROR_RESPONSE_APPLICATION = 0x02
92 93
93 94 FLAGS_ERROR_RESPONSE = {
94 95 b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
95 96 b'application': FLAG_ERROR_RESPONSE_APPLICATION,
96 97 }
97 98
98 99 # Maps frame types to their available flags.
99 100 FRAME_TYPE_FLAGS = {
100 101 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
101 102 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
102 103 FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
103 104 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
104 105 FRAME_TYPE_TEXT_OUTPUT: {},
105 106 FRAME_TYPE_PROGRESS: {},
106 107 FRAME_TYPE_STREAM_SETTINGS: {},
107 108 }
108 109
109 110 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
110 111
111 112 def humanflags(mapping, value):
112 113 """Convert a numeric flags value to a human value, using a mapping table."""
113 114 namemap = {v: k for k, v in mapping.iteritems()}
114 115 flags = []
115 116 val = 1
116 117 while value >= val:
117 118 if value & val:
118 119 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
119 120 val <<= 1
120 121
121 122 return b'|'.join(flags)
122 123
123 124 @attr.s(slots=True)
124 125 class frameheader(object):
125 126 """Represents the data in a frame header."""
126 127
127 128 length = attr.ib()
128 129 requestid = attr.ib()
129 130 streamid = attr.ib()
130 131 streamflags = attr.ib()
131 132 typeid = attr.ib()
132 133 flags = attr.ib()
133 134
134 135 @attr.s(slots=True, repr=False)
135 136 class frame(object):
136 137 """Represents a parsed frame."""
137 138
138 139 requestid = attr.ib()
139 140 streamid = attr.ib()
140 141 streamflags = attr.ib()
141 142 typeid = attr.ib()
142 143 flags = attr.ib()
143 144 payload = attr.ib()
144 145
146 @encoding.strmethod
145 147 def __repr__(self):
146 148 typename = '<unknown 0x%02x>' % self.typeid
147 149 for name, value in FRAME_TYPES.iteritems():
148 150 if value == self.typeid:
149 151 typename = name
150 152 break
151 153
152 154 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
153 155 'type=%s; flags=%s)' % (
154 156 len(self.payload), self.requestid, self.streamid,
155 157 humanflags(STREAM_FLAGS, self.streamflags), typename,
156 158 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
157 159
158 160 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
159 161 """Assemble a frame into a byte array."""
160 162 # TODO assert size of payload.
161 163 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
162 164
163 165 # 24 bits length
164 166 # 16 bits request id
165 167 # 8 bits stream id
166 168 # 8 bits stream flags
167 169 # 4 bits type
168 170 # 4 bits flags
169 171
170 172 l = struct.pack(r'<I', len(payload))
171 173 frame[0:3] = l[0:3]
172 174 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
173 175 frame[7] = (typeid << 4) | flags
174 176 frame[8:] = payload
175 177
176 178 return frame
177 179
178 180 def makeframefromhumanstring(s):
179 181 """Create a frame from a human readable string
180 182
181 183 DANGER: NOT SAFE TO USE WITH UNTRUSTED INPUT BECAUSE OF POTENTIAL
182 184 eval() USAGE. DO NOT USE IN CORE.
183 185
184 186 Strings have the form:
185 187
186 188 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
187 189
188 190 This can be used by user-facing applications and tests for creating
189 191 frames easily without having to type out a bunch of constants.
190 192
191 193 Request ID and stream IDs are integers.
192 194
193 195 Stream flags, frame type, and flags can be specified by integer or
194 196 named constant.
195 197
196 198 Flags can be delimited by `|` to bitwise OR them together.
197 199
198 200 If the payload begins with ``cbor:``, the following string will be
199 201 evaluated as Python code and the resulting object will be fed into
200 202 a CBOR encoder. Otherwise, the payload is interpreted as a Python
201 203 byte string literal.
202 204 """
203 205 fields = s.split(b' ', 5)
204 206 requestid, streamid, streamflags, frametype, frameflags, payload = fields
205 207
206 208 requestid = int(requestid)
207 209 streamid = int(streamid)
208 210
209 211 finalstreamflags = 0
210 212 for flag in streamflags.split(b'|'):
211 213 if flag in STREAM_FLAGS:
212 214 finalstreamflags |= STREAM_FLAGS[flag]
213 215 else:
214 216 finalstreamflags |= int(flag)
215 217
216 218 if frametype in FRAME_TYPES:
217 219 frametype = FRAME_TYPES[frametype]
218 220 else:
219 221 frametype = int(frametype)
220 222
221 223 finalflags = 0
222 224 validflags = FRAME_TYPE_FLAGS[frametype]
223 225 for flag in frameflags.split(b'|'):
224 226 if flag in validflags:
225 227 finalflags |= validflags[flag]
226 228 else:
227 229 finalflags |= int(flag)
228 230
229 231 if payload.startswith(b'cbor:'):
230 232 payload = cbor.dumps(stringutil.evalpython(payload[5:]), canonical=True)
231 233
232 234 else:
233 235 payload = stringutil.unescapestr(payload)
234 236
235 237 return makeframe(requestid=requestid, streamid=streamid,
236 238 streamflags=finalstreamflags, typeid=frametype,
237 239 flags=finalflags, payload=payload)
238 240
239 241 def parseheader(data):
240 242 """Parse a unified framing protocol frame header from a buffer.
241 243
242 244 The header is expected to be in the buffer at offset 0 and the
243 245 buffer is expected to be large enough to hold a full header.
244 246 """
245 247 # 24 bits payload length (little endian)
246 248 # 16 bits request ID
247 249 # 8 bits stream ID
248 250 # 8 bits stream flags
249 251 # 4 bits frame type
250 252 # 4 bits frame flags
251 253 # ... payload
252 254 framelength = data[0] + 256 * data[1] + 16384 * data[2]
253 255 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
254 256 typeflags = data[7]
255 257
256 258 frametype = (typeflags & 0xf0) >> 4
257 259 frameflags = typeflags & 0x0f
258 260
259 261 return frameheader(framelength, requestid, streamid, streamflags,
260 262 frametype, frameflags)
261 263
262 264 def readframe(fh):
263 265 """Read a unified framing protocol frame from a file object.
264 266
265 267 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
266 268 None if no frame is available. May raise if a malformed frame is
267 269 seen.
268 270 """
269 271 header = bytearray(FRAME_HEADER_SIZE)
270 272
271 273 readcount = fh.readinto(header)
272 274
273 275 if readcount == 0:
274 276 return None
275 277
276 278 if readcount != FRAME_HEADER_SIZE:
277 279 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
278 280 (readcount, header))
279 281
280 282 h = parseheader(header)
281 283
282 284 payload = fh.read(h.length)
283 285 if len(payload) != h.length:
284 286 raise error.Abort(_('frame length error: expected %d; got %d') %
285 287 (h.length, len(payload)))
286 288
287 289 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
288 290 payload)
289 291
290 292 def createcommandframes(stream, requestid, cmd, args, datafh=None,
291 293 maxframesize=DEFAULT_MAX_FRAME_SIZE):
292 294 """Create frames necessary to transmit a request to run a command.
293 295
294 296 This is a generator of bytearrays. Each item represents a frame
295 297 ready to be sent over the wire to a peer.
296 298 """
297 299 data = {b'name': cmd}
298 300 if args:
299 301 data[b'args'] = args
300 302
301 303 data = cbor.dumps(data, canonical=True)
302 304
303 305 offset = 0
304 306
305 307 while True:
306 308 flags = 0
307 309
308 310 # Must set new or continuation flag.
309 311 if not offset:
310 312 flags |= FLAG_COMMAND_REQUEST_NEW
311 313 else:
312 314 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
313 315
314 316 # Data frames is set on all frames.
315 317 if datafh:
316 318 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
317 319
318 320 payload = data[offset:offset + maxframesize]
319 321 offset += len(payload)
320 322
321 323 if len(payload) == maxframesize and offset < len(data):
322 324 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
323 325
324 326 yield stream.makeframe(requestid=requestid,
325 327 typeid=FRAME_TYPE_COMMAND_REQUEST,
326 328 flags=flags,
327 329 payload=payload)
328 330
329 331 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
330 332 break
331 333
332 334 if datafh:
333 335 while True:
334 336 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
335 337
336 338 done = False
337 339 if len(data) == DEFAULT_MAX_FRAME_SIZE:
338 340 flags = FLAG_COMMAND_DATA_CONTINUATION
339 341 else:
340 342 flags = FLAG_COMMAND_DATA_EOS
341 343 assert datafh.read(1) == b''
342 344 done = True
343 345
344 346 yield stream.makeframe(requestid=requestid,
345 347 typeid=FRAME_TYPE_COMMAND_DATA,
346 348 flags=flags,
347 349 payload=data)
348 350
349 351 if done:
350 352 break
351 353
352 354 def createbytesresponseframesfrombytes(stream, requestid, data,
353 355 maxframesize=DEFAULT_MAX_FRAME_SIZE):
354 356 """Create a raw frame to send a bytes response from static bytes input.
355 357
356 358 Returns a generator of bytearrays.
357 359 """
358 360
359 361 # Simple case of a single frame.
360 362 if len(data) <= maxframesize:
361 363 yield stream.makeframe(requestid=requestid,
362 364 typeid=FRAME_TYPE_BYTES_RESPONSE,
363 365 flags=FLAG_BYTES_RESPONSE_EOS,
364 366 payload=data)
365 367 return
366 368
367 369 offset = 0
368 370 while True:
369 371 chunk = data[offset:offset + maxframesize]
370 372 offset += len(chunk)
371 373 done = offset == len(data)
372 374
373 375 if done:
374 376 flags = FLAG_BYTES_RESPONSE_EOS
375 377 else:
376 378 flags = FLAG_BYTES_RESPONSE_CONTINUATION
377 379
378 380 yield stream.makeframe(requestid=requestid,
379 381 typeid=FRAME_TYPE_BYTES_RESPONSE,
380 382 flags=flags,
381 383 payload=chunk)
382 384
383 385 if done:
384 386 break
385 387
386 388 def createerrorframe(stream, requestid, msg, protocol=False, application=False):
387 389 # TODO properly handle frame size limits.
388 390 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
389 391
390 392 flags = 0
391 393 if protocol:
392 394 flags |= FLAG_ERROR_RESPONSE_PROTOCOL
393 395 if application:
394 396 flags |= FLAG_ERROR_RESPONSE_APPLICATION
395 397
396 398 yield stream.makeframe(requestid=requestid,
397 399 typeid=FRAME_TYPE_ERROR_RESPONSE,
398 400 flags=flags,
399 401 payload=msg)
400 402
401 403 def createtextoutputframe(stream, requestid, atoms,
402 404 maxframesize=DEFAULT_MAX_FRAME_SIZE):
403 405 """Create a text output frame to render text to people.
404 406
405 407 ``atoms`` is a 3-tuple of (formatting string, args, labels).
406 408
407 409 The formatting string contains ``%s`` tokens to be replaced by the
408 410 corresponding indexed entry in ``args``. ``labels`` is an iterable of
409 411 formatters to be applied at rendering time. In terms of the ``ui``
410 412 class, each atom corresponds to a ``ui.write()``.
411 413 """
412 414 atomdicts = []
413 415
414 416 for (formatting, args, labels) in atoms:
415 417 # TODO look for localstr, other types here?
416 418
417 419 if not isinstance(formatting, bytes):
418 420 raise ValueError('must use bytes formatting strings')
419 421 for arg in args:
420 422 if not isinstance(arg, bytes):
421 423 raise ValueError('must use bytes for arguments')
422 424 for label in labels:
423 425 if not isinstance(label, bytes):
424 426 raise ValueError('must use bytes for labels')
425 427
426 428 # Formatting string must be ASCII.
427 429 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
428 430
429 431 # Arguments must be UTF-8.
430 432 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
431 433
432 434 # Labels must be ASCII.
433 435 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
434 436 for l in labels]
435 437
436 438 atom = {b'msg': formatting}
437 439 if args:
438 440 atom[b'args'] = args
439 441 if labels:
440 442 atom[b'labels'] = labels
441 443
442 444 atomdicts.append(atom)
443 445
444 446 payload = cbor.dumps(atomdicts, canonical=True)
445 447
446 448 if len(payload) > maxframesize:
447 449 raise ValueError('cannot encode data in a single frame')
448 450
449 451 yield stream.makeframe(requestid=requestid,
450 452 typeid=FRAME_TYPE_TEXT_OUTPUT,
451 453 flags=0,
452 454 payload=payload)
453 455
454 456 class stream(object):
455 457 """Represents a logical unidirectional series of frames."""
456 458
457 459 def __init__(self, streamid, active=False):
458 460 self.streamid = streamid
459 461 self._active = False
460 462
461 463 def makeframe(self, requestid, typeid, flags, payload):
462 464 """Create a frame to be sent out over this stream.
463 465
464 466 Only returns the frame instance. Does not actually send it.
465 467 """
466 468 streamflags = 0
467 469 if not self._active:
468 470 streamflags |= STREAM_FLAG_BEGIN_STREAM
469 471 self._active = True
470 472
471 473 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
472 474 payload)
473 475
474 476 def ensureserverstream(stream):
475 477 if stream.streamid % 2:
476 478 raise error.ProgrammingError('server should only write to even '
477 479 'numbered streams; %d is not even' %
478 480 stream.streamid)
479 481
480 482 class serverreactor(object):
481 483 """Holds state of a server handling frame-based protocol requests.
482 484
483 485 This class is the "brain" of the unified frame-based protocol server
484 486 component. While the protocol is stateless from the perspective of
485 487 requests/commands, something needs to track which frames have been
486 488 received, what frames to expect, etc. This class is that thing.
487 489
488 490 Instances are modeled as a state machine of sorts. Instances are also
489 491 reactionary to external events. The point of this class is to encapsulate
490 492 the state of the connection and the exchange of frames, not to perform
491 493 work. Instead, callers tell this class when something occurs, like a
492 494 frame arriving. If that activity is worthy of a follow-up action (say
493 495 *run a command*), the return value of that handler will say so.
494 496
495 497 I/O and CPU intensive operations are purposefully delegated outside of
496 498 this class.
497 499
498 500 Consumers are expected to tell instances when events occur. They do so by
499 501 calling the various ``on*`` methods. These methods return a 2-tuple
500 502 describing any follow-up action(s) to take. The first element is the
501 503 name of an action to perform. The second is a data structure (usually
502 504 a dict) specific to that action that contains more information. e.g.
503 505 if the server wants to send frames back to the client, the data structure
504 506 will contain a reference to those frames.
505 507
506 508 Valid actions that consumers can be instructed to take are:
507 509
508 510 sendframes
509 511 Indicates that frames should be sent to the client. The ``framegen``
510 512 key contains a generator of frames that should be sent. The server
511 513 assumes that all frames are sent to the client.
512 514
513 515 error
514 516 Indicates that an error occurred. Consumer should probably abort.
515 517
516 518 runcommand
517 519 Indicates that the consumer should run a wire protocol command. Details
518 520 of the command to run are given in the data structure.
519 521
520 522 wantframe
521 523 Indicates that nothing of interest happened and the server is waiting on
522 524 more frames from the client before anything interesting can be done.
523 525
524 526 noop
525 527 Indicates no additional action is required.
526 528
527 529 Known Issues
528 530 ------------
529 531
530 532 There are no limits to the number of partially received commands or their
531 533 size. A malicious client could stream command request data and exhaust the
532 534 server's memory.
533 535
534 536 Partially received commands are not acted upon when end of input is
535 537 reached. Should the server error if it receives a partial request?
536 538 Should the client send a message to abort a partially transmitted request
537 539 to facilitate graceful shutdown?
538 540
539 541 Active requests that haven't been responded to aren't tracked. This means
540 542 that if we receive a command and instruct its dispatch, another command
541 543 with its request ID can come in over the wire and there will be a race
542 544 between who responds to what.
543 545 """
544 546
545 547 def __init__(self, deferoutput=False):
546 548 """Construct a new server reactor.
547 549
548 550 ``deferoutput`` can be used to indicate that no output frames should be
549 551 instructed to be sent until input has been exhausted. In this mode,
550 552 events that would normally generate output frames (such as a command
551 553 response being ready) will instead defer instructing the consumer to
552 554 send those frames. This is useful for half-duplex transports where the
553 555 sender cannot receive until all data has been transmitted.
554 556 """
555 557 self._deferoutput = deferoutput
556 558 self._state = 'idle'
557 559 self._nextoutgoingstreamid = 2
558 560 self._bufferedframegens = []
559 561 # stream id -> stream instance for all active streams from the client.
560 562 self._incomingstreams = {}
561 563 self._outgoingstreams = {}
562 564 # request id -> dict of commands that are actively being received.
563 565 self._receivingcommands = {}
564 566 # Request IDs that have been received and are actively being processed.
565 567 # Once all output for a request has been sent, it is removed from this
566 568 # set.
567 569 self._activecommands = set()
568 570
569 571 def onframerecv(self, frame):
570 572 """Process a frame that has been received off the wire.
571 573
572 574 Returns a dict with an ``action`` key that details what action,
573 575 if any, the consumer should take next.
574 576 """
575 577 if not frame.streamid % 2:
576 578 self._state = 'errored'
577 579 return self._makeerrorresult(
578 580 _('received frame with even numbered stream ID: %d') %
579 581 frame.streamid)
580 582
581 583 if frame.streamid not in self._incomingstreams:
582 584 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
583 585 self._state = 'errored'
584 586 return self._makeerrorresult(
585 587 _('received frame on unknown inactive stream without '
586 588 'beginning of stream flag set'))
587 589
588 590 self._incomingstreams[frame.streamid] = stream(frame.streamid)
589 591
590 592 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
591 593 # TODO handle decoding frames
592 594 self._state = 'errored'
593 595 raise error.ProgrammingError('support for decoding stream payloads '
594 596 'not yet implemented')
595 597
596 598 if frame.streamflags & STREAM_FLAG_END_STREAM:
597 599 del self._incomingstreams[frame.streamid]
598 600
599 601 handlers = {
600 602 'idle': self._onframeidle,
601 603 'command-receiving': self._onframecommandreceiving,
602 604 'errored': self._onframeerrored,
603 605 }
604 606
605 607 meth = handlers.get(self._state)
606 608 if not meth:
607 609 raise error.ProgrammingError('unhandled state: %s' % self._state)
608 610
609 611 return meth(frame)
610 612
611 613 def onbytesresponseready(self, stream, requestid, data):
612 614 """Signal that a bytes response is ready to be sent to the client.
613 615
614 616 The raw bytes response is passed as an argument.
615 617 """
616 618 ensureserverstream(stream)
617 619
618 620 def sendframes():
619 621 for frame in createbytesresponseframesfrombytes(stream, requestid,
620 622 data):
621 623 yield frame
622 624
623 625 self._activecommands.remove(requestid)
624 626
625 627 result = sendframes()
626 628
627 629 if self._deferoutput:
628 630 self._bufferedframegens.append(result)
629 631 return 'noop', {}
630 632 else:
631 633 return 'sendframes', {
632 634 'framegen': result,
633 635 }
634 636
635 637 def oninputeof(self):
636 638 """Signals that end of input has been received.
637 639
638 640 No more frames will be received. All pending activity should be
639 641 completed.
640 642 """
641 643 # TODO should we do anything about in-flight commands?
642 644
643 645 if not self._deferoutput or not self._bufferedframegens:
644 646 return 'noop', {}
645 647
646 648 # If we buffered all our responses, emit those.
647 649 def makegen():
648 650 for gen in self._bufferedframegens:
649 651 for frame in gen:
650 652 yield frame
651 653
652 654 return 'sendframes', {
653 655 'framegen': makegen(),
654 656 }
655 657
656 658 def onapplicationerror(self, stream, requestid, msg):
657 659 ensureserverstream(stream)
658 660
659 661 return 'sendframes', {
660 662 'framegen': createerrorframe(stream, requestid, msg,
661 663 application=True),
662 664 }
663 665
664 666 def makeoutputstream(self):
665 667 """Create a stream to be used for sending data to the client."""
666 668 streamid = self._nextoutgoingstreamid
667 669 self._nextoutgoingstreamid += 2
668 670
669 671 s = stream(streamid)
670 672 self._outgoingstreams[streamid] = s
671 673
672 674 return s
673 675
674 676 def _makeerrorresult(self, msg):
675 677 return 'error', {
676 678 'message': msg,
677 679 }
678 680
679 681 def _makeruncommandresult(self, requestid):
680 682 entry = self._receivingcommands[requestid]
681 683
682 684 if not entry['requestdone']:
683 685 self._state = 'errored'
684 686 raise error.ProgrammingError('should not be called without '
685 687 'requestdone set')
686 688
687 689 del self._receivingcommands[requestid]
688 690
689 691 if self._receivingcommands:
690 692 self._state = 'command-receiving'
691 693 else:
692 694 self._state = 'idle'
693 695
694 696 # Decode the payloads as CBOR.
695 697 entry['payload'].seek(0)
696 698 request = cbor.load(entry['payload'])
697 699
698 700 if b'name' not in request:
699 701 self._state = 'errored'
700 702 return self._makeerrorresult(
701 703 _('command request missing "name" field'))
702 704
703 705 if b'args' not in request:
704 706 request[b'args'] = {}
705 707
706 708 assert requestid not in self._activecommands
707 709 self._activecommands.add(requestid)
708 710
709 711 return 'runcommand', {
710 712 'requestid': requestid,
711 713 'command': request[b'name'],
712 714 'args': request[b'args'],
713 715 'data': entry['data'].getvalue() if entry['data'] else None,
714 716 }
715 717
716 718 def _makewantframeresult(self):
717 719 return 'wantframe', {
718 720 'state': self._state,
719 721 }
720 722
721 723 def _validatecommandrequestframe(self, frame):
722 724 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
723 725 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
724 726
725 727 if new and continuation:
726 728 self._state = 'errored'
727 729 return self._makeerrorresult(
728 730 _('received command request frame with both new and '
729 731 'continuation flags set'))
730 732
731 733 if not new and not continuation:
732 734 self._state = 'errored'
733 735 return self._makeerrorresult(
734 736 _('received command request frame with neither new nor '
735 737 'continuation flags set'))
736 738
737 739 def _onframeidle(self, frame):
738 740 # The only frame type that should be received in this state is a
739 741 # command request.
740 742 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
741 743 self._state = 'errored'
742 744 return self._makeerrorresult(
743 745 _('expected command request frame; got %d') % frame.typeid)
744 746
745 747 res = self._validatecommandrequestframe(frame)
746 748 if res:
747 749 return res
748 750
749 751 if frame.requestid in self._receivingcommands:
750 752 self._state = 'errored'
751 753 return self._makeerrorresult(
752 754 _('request with ID %d already received') % frame.requestid)
753 755
754 756 if frame.requestid in self._activecommands:
755 757 self._state = 'errored'
756 758 return self._makeerrorresult(
757 759 _('request with ID %d is already active') % frame.requestid)
758 760
759 761 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
760 762 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
761 763 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
762 764
763 765 if not new:
764 766 self._state = 'errored'
765 767 return self._makeerrorresult(
766 768 _('received command request frame without new flag set'))
767 769
768 770 payload = util.bytesio()
769 771 payload.write(frame.payload)
770 772
771 773 self._receivingcommands[frame.requestid] = {
772 774 'payload': payload,
773 775 'data': None,
774 776 'requestdone': not moreframes,
775 777 'expectingdata': bool(expectingdata),
776 778 }
777 779
778 780 # This is the final frame for this request. Dispatch it.
779 781 if not moreframes and not expectingdata:
780 782 return self._makeruncommandresult(frame.requestid)
781 783
782 784 assert moreframes or expectingdata
783 785 self._state = 'command-receiving'
784 786 return self._makewantframeresult()
785 787
786 788 def _onframecommandreceiving(self, frame):
787 789 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
788 790 # Process new command requests as such.
789 791 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
790 792 return self._onframeidle(frame)
791 793
792 794 res = self._validatecommandrequestframe(frame)
793 795 if res:
794 796 return res
795 797
796 798 # All other frames should be related to a command that is currently
797 799 # receiving but is not active.
798 800 if frame.requestid in self._activecommands:
799 801 self._state = 'errored'
800 802 return self._makeerrorresult(
801 803 _('received frame for request that is still active: %d') %
802 804 frame.requestid)
803 805
804 806 if frame.requestid not in self._receivingcommands:
805 807 self._state = 'errored'
806 808 return self._makeerrorresult(
807 809 _('received frame for request that is not receiving: %d') %
808 810 frame.requestid)
809 811
810 812 entry = self._receivingcommands[frame.requestid]
811 813
812 814 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
813 815 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
814 816 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
815 817
816 818 if entry['requestdone']:
817 819 self._state = 'errored'
818 820 return self._makeerrorresult(
819 821 _('received command request frame when request frames '
820 822 'were supposedly done'))
821 823
822 824 if expectingdata != entry['expectingdata']:
823 825 self._state = 'errored'
824 826 return self._makeerrorresult(
825 827 _('mismatch between expect data flag and previous frame'))
826 828
827 829 entry['payload'].write(frame.payload)
828 830
829 831 if not moreframes:
830 832 entry['requestdone'] = True
831 833
832 834 if not moreframes and not expectingdata:
833 835 return self._makeruncommandresult(frame.requestid)
834 836
835 837 return self._makewantframeresult()
836 838
837 839 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
838 840 if not entry['expectingdata']:
839 841 self._state = 'errored'
840 842 return self._makeerrorresult(_(
841 843 'received command data frame for request that is not '
842 844 'expecting data: %d') % frame.requestid)
843 845
844 846 if entry['data'] is None:
845 847 entry['data'] = util.bytesio()
846 848
847 849 return self._handlecommanddataframe(frame, entry)
848 850 else:
849 851 self._state = 'errored'
850 852 return self._makeerrorresult(_(
851 853 'received unexpected frame type: %d') % frame.typeid)
852 854
853 855 def _handlecommanddataframe(self, frame, entry):
854 856 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
855 857
856 858 # TODO support streaming data instead of buffering it.
857 859 entry['data'].write(frame.payload)
858 860
859 861 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
860 862 return self._makewantframeresult()
861 863 elif frame.flags & FLAG_COMMAND_DATA_EOS:
862 864 entry['data'].seek(0)
863 865 return self._makeruncommandresult(frame.requestid)
864 866 else:
865 867 self._state = 'errored'
866 868 return self._makeerrorresult(_('command data frame without '
867 869 'flags'))
868 870
869 871 def _onframeerrored(self, frame):
870 872 return self._makeerrorresult(_('server already errored'))
General Comments 0
You need to be logged in to leave comments. Login now