##// END OF EJS Templates
wireprotov2: document client reactor actions...
Gregory Szorc -
r40163:080419fa default
parent child Browse files
Show More
@@ -1,1497 +1,1531 b''
1 1 # wireprotoframing.py - unified framing protocol for wire protocol
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 # This file contains functionality to support the unified frame-based wire
9 9 # protocol. For details about the protocol, see
10 10 # `hg help internals.wireprotocol`.
11 11
12 12 from __future__ import absolute_import
13 13
14 14 import collections
15 15 import struct
16 16
17 17 from .i18n import _
18 18 from .thirdparty import (
19 19 attr,
20 20 )
21 21 from . import (
22 22 encoding,
23 23 error,
24 24 pycompat,
25 25 util,
26 26 wireprototypes,
27 27 )
28 28 from .utils import (
29 29 cborutil,
30 30 stringutil,
31 31 )
32 32
33 33 FRAME_HEADER_SIZE = 8
34 34 DEFAULT_MAX_FRAME_SIZE = 32768
35 35
36 36 STREAM_FLAG_BEGIN_STREAM = 0x01
37 37 STREAM_FLAG_END_STREAM = 0x02
38 38 STREAM_FLAG_ENCODING_APPLIED = 0x04
39 39
40 40 STREAM_FLAGS = {
41 41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
42 42 b'stream-end': STREAM_FLAG_END_STREAM,
43 43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
44 44 }
45 45
46 46 FRAME_TYPE_COMMAND_REQUEST = 0x01
47 47 FRAME_TYPE_COMMAND_DATA = 0x02
48 48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
49 49 FRAME_TYPE_ERROR_RESPONSE = 0x05
50 50 FRAME_TYPE_TEXT_OUTPUT = 0x06
51 51 FRAME_TYPE_PROGRESS = 0x07
52 52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
53 53 FRAME_TYPE_STREAM_SETTINGS = 0x09
54 54
55 55 FRAME_TYPES = {
56 56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
57 57 b'command-data': FRAME_TYPE_COMMAND_DATA,
58 58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
59 59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
60 60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
61 61 b'progress': FRAME_TYPE_PROGRESS,
62 62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
63 63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
64 64 }
65 65
66 66 FLAG_COMMAND_REQUEST_NEW = 0x01
67 67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
68 68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
69 69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
70 70
71 71 FLAGS_COMMAND_REQUEST = {
72 72 b'new': FLAG_COMMAND_REQUEST_NEW,
73 73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
74 74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
75 75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
76 76 }
77 77
78 78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
79 79 FLAG_COMMAND_DATA_EOS = 0x02
80 80
81 81 FLAGS_COMMAND_DATA = {
82 82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
83 83 b'eos': FLAG_COMMAND_DATA_EOS,
84 84 }
85 85
86 86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
87 87 FLAG_COMMAND_RESPONSE_EOS = 0x02
88 88
89 89 FLAGS_COMMAND_RESPONSE = {
90 90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
91 91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
92 92 }
93 93
94 94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
95 95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
96 96
97 97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
98 98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
99 99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
100 100 }
101 101
102 102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
103 103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
104 104
105 105 FLAGS_STREAM_ENCODING_SETTINGS = {
106 106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
107 107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
108 108 }
109 109
110 110 # Maps frame types to their available flags.
111 111 FRAME_TYPE_FLAGS = {
112 112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
113 113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
114 114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
115 115 FRAME_TYPE_ERROR_RESPONSE: {},
116 116 FRAME_TYPE_TEXT_OUTPUT: {},
117 117 FRAME_TYPE_PROGRESS: {},
118 118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
119 119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
120 120 }
121 121
122 122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
123 123
124 124 def humanflags(mapping, value):
125 125 """Convert a numeric flags value to a human value, using a mapping table."""
126 126 namemap = {v: k for k, v in mapping.iteritems()}
127 127 flags = []
128 128 val = 1
129 129 while value >= val:
130 130 if value & val:
131 131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
132 132 val <<= 1
133 133
134 134 return b'|'.join(flags)
135 135
136 136 @attr.s(slots=True)
137 137 class frameheader(object):
138 138 """Represents the data in a frame header."""
139 139
140 140 length = attr.ib()
141 141 requestid = attr.ib()
142 142 streamid = attr.ib()
143 143 streamflags = attr.ib()
144 144 typeid = attr.ib()
145 145 flags = attr.ib()
146 146
147 147 @attr.s(slots=True, repr=False)
148 148 class frame(object):
149 149 """Represents a parsed frame."""
150 150
151 151 requestid = attr.ib()
152 152 streamid = attr.ib()
153 153 streamflags = attr.ib()
154 154 typeid = attr.ib()
155 155 flags = attr.ib()
156 156 payload = attr.ib()
157 157
158 158 @encoding.strmethod
159 159 def __repr__(self):
160 160 typename = '<unknown 0x%02x>' % self.typeid
161 161 for name, value in FRAME_TYPES.iteritems():
162 162 if value == self.typeid:
163 163 typename = name
164 164 break
165 165
166 166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
167 167 'type=%s; flags=%s)' % (
168 168 len(self.payload), self.requestid, self.streamid,
169 169 humanflags(STREAM_FLAGS, self.streamflags), typename,
170 170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
171 171
172 172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
173 173 """Assemble a frame into a byte array."""
174 174 # TODO assert size of payload.
175 175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
176 176
177 177 # 24 bits length
178 178 # 16 bits request id
179 179 # 8 bits stream id
180 180 # 8 bits stream flags
181 181 # 4 bits type
182 182 # 4 bits flags
183 183
184 184 l = struct.pack(r'<I', len(payload))
185 185 frame[0:3] = l[0:3]
186 186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
187 187 frame[7] = (typeid << 4) | flags
188 188 frame[8:] = payload
189 189
190 190 return frame
191 191
192 192 def makeframefromhumanstring(s):
193 193 """Create a frame from a human readable string
194 194
195 195 Strings have the form:
196 196
197 197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
198 198
199 199 This can be used by user-facing applications and tests for creating
200 200 frames easily without having to type out a bunch of constants.
201 201
202 202 Request ID and stream IDs are integers.
203 203
204 204 Stream flags, frame type, and flags can be specified by integer or
205 205 named constant.
206 206
207 207 Flags can be delimited by `|` to bitwise OR them together.
208 208
209 209 If the payload begins with ``cbor:``, the following string will be
210 210 evaluated as Python literal and the resulting object will be fed into
211 211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
212 212 byte string literal.
213 213 """
214 214 fields = s.split(b' ', 5)
215 215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
216 216
217 217 requestid = int(requestid)
218 218 streamid = int(streamid)
219 219
220 220 finalstreamflags = 0
221 221 for flag in streamflags.split(b'|'):
222 222 if flag in STREAM_FLAGS:
223 223 finalstreamflags |= STREAM_FLAGS[flag]
224 224 else:
225 225 finalstreamflags |= int(flag)
226 226
227 227 if frametype in FRAME_TYPES:
228 228 frametype = FRAME_TYPES[frametype]
229 229 else:
230 230 frametype = int(frametype)
231 231
232 232 finalflags = 0
233 233 validflags = FRAME_TYPE_FLAGS[frametype]
234 234 for flag in frameflags.split(b'|'):
235 235 if flag in validflags:
236 236 finalflags |= validflags[flag]
237 237 else:
238 238 finalflags |= int(flag)
239 239
240 240 if payload.startswith(b'cbor:'):
241 241 payload = b''.join(cborutil.streamencode(
242 242 stringutil.evalpythonliteral(payload[5:])))
243 243
244 244 else:
245 245 payload = stringutil.unescapestr(payload)
246 246
247 247 return makeframe(requestid=requestid, streamid=streamid,
248 248 streamflags=finalstreamflags, typeid=frametype,
249 249 flags=finalflags, payload=payload)
250 250
251 251 def parseheader(data):
252 252 """Parse a unified framing protocol frame header from a buffer.
253 253
254 254 The header is expected to be in the buffer at offset 0 and the
255 255 buffer is expected to be large enough to hold a full header.
256 256 """
257 257 # 24 bits payload length (little endian)
258 258 # 16 bits request ID
259 259 # 8 bits stream ID
260 260 # 8 bits stream flags
261 261 # 4 bits frame type
262 262 # 4 bits frame flags
263 263 # ... payload
264 264 framelength = data[0] + 256 * data[1] + 16384 * data[2]
265 265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
266 266 typeflags = data[7]
267 267
268 268 frametype = (typeflags & 0xf0) >> 4
269 269 frameflags = typeflags & 0x0f
270 270
271 271 return frameheader(framelength, requestid, streamid, streamflags,
272 272 frametype, frameflags)
273 273
274 274 def readframe(fh):
275 275 """Read a unified framing protocol frame from a file object.
276 276
277 277 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
278 278 None if no frame is available. May raise if a malformed frame is
279 279 seen.
280 280 """
281 281 header = bytearray(FRAME_HEADER_SIZE)
282 282
283 283 readcount = fh.readinto(header)
284 284
285 285 if readcount == 0:
286 286 return None
287 287
288 288 if readcount != FRAME_HEADER_SIZE:
289 289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
290 290 (readcount, header))
291 291
292 292 h = parseheader(header)
293 293
294 294 payload = fh.read(h.length)
295 295 if len(payload) != h.length:
296 296 raise error.Abort(_('frame length error: expected %d; got %d') %
297 297 (h.length, len(payload)))
298 298
299 299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
300 300 payload)
301 301
302 302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
303 303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
304 304 redirect=None):
305 305 """Create frames necessary to transmit a request to run a command.
306 306
307 307 This is a generator of bytearrays. Each item represents a frame
308 308 ready to be sent over the wire to a peer.
309 309 """
310 310 data = {b'name': cmd}
311 311 if args:
312 312 data[b'args'] = args
313 313
314 314 if redirect:
315 315 data[b'redirect'] = redirect
316 316
317 317 data = b''.join(cborutil.streamencode(data))
318 318
319 319 offset = 0
320 320
321 321 while True:
322 322 flags = 0
323 323
324 324 # Must set new or continuation flag.
325 325 if not offset:
326 326 flags |= FLAG_COMMAND_REQUEST_NEW
327 327 else:
328 328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
329 329
330 330 # Data frames is set on all frames.
331 331 if datafh:
332 332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
333 333
334 334 payload = data[offset:offset + maxframesize]
335 335 offset += len(payload)
336 336
337 337 if len(payload) == maxframesize and offset < len(data):
338 338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
339 339
340 340 yield stream.makeframe(requestid=requestid,
341 341 typeid=FRAME_TYPE_COMMAND_REQUEST,
342 342 flags=flags,
343 343 payload=payload)
344 344
345 345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
346 346 break
347 347
348 348 if datafh:
349 349 while True:
350 350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
351 351
352 352 done = False
353 353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
354 354 flags = FLAG_COMMAND_DATA_CONTINUATION
355 355 else:
356 356 flags = FLAG_COMMAND_DATA_EOS
357 357 assert datafh.read(1) == b''
358 358 done = True
359 359
360 360 yield stream.makeframe(requestid=requestid,
361 361 typeid=FRAME_TYPE_COMMAND_DATA,
362 362 flags=flags,
363 363 payload=data)
364 364
365 365 if done:
366 366 break
367 367
368 368 def createcommandresponseframesfrombytes(stream, requestid, data,
369 369 maxframesize=DEFAULT_MAX_FRAME_SIZE):
370 370 """Create a raw frame to send a bytes response from static bytes input.
371 371
372 372 Returns a generator of bytearrays.
373 373 """
374 374 # Automatically send the overall CBOR response map.
375 375 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
376 376 if len(overall) > maxframesize:
377 377 raise error.ProgrammingError('not yet implemented')
378 378
379 379 # Simple case where we can fit the full response in a single frame.
380 380 if len(overall) + len(data) <= maxframesize:
381 381 flags = FLAG_COMMAND_RESPONSE_EOS
382 382 yield stream.makeframe(requestid=requestid,
383 383 typeid=FRAME_TYPE_COMMAND_RESPONSE,
384 384 flags=flags,
385 385 payload=overall + data)
386 386 return
387 387
388 388 # It's easier to send the overall CBOR map in its own frame than to track
389 389 # offsets.
390 390 yield stream.makeframe(requestid=requestid,
391 391 typeid=FRAME_TYPE_COMMAND_RESPONSE,
392 392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
393 393 payload=overall)
394 394
395 395 offset = 0
396 396 while True:
397 397 chunk = data[offset:offset + maxframesize]
398 398 offset += len(chunk)
399 399 done = offset == len(data)
400 400
401 401 if done:
402 402 flags = FLAG_COMMAND_RESPONSE_EOS
403 403 else:
404 404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
405 405
406 406 yield stream.makeframe(requestid=requestid,
407 407 typeid=FRAME_TYPE_COMMAND_RESPONSE,
408 408 flags=flags,
409 409 payload=chunk)
410 410
411 411 if done:
412 412 break
413 413
414 414 def createbytesresponseframesfromgen(stream, requestid, gen,
415 415 maxframesize=DEFAULT_MAX_FRAME_SIZE):
416 416 """Generator of frames from a generator of byte chunks.
417 417
418 418 This assumes that another frame will follow whatever this emits. i.e.
419 419 this always emits the continuation flag and never emits the end-of-stream
420 420 flag.
421 421 """
422 422 cb = util.chunkbuffer(gen)
423 423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
424 424
425 425 while True:
426 426 chunk = cb.read(maxframesize)
427 427 if not chunk:
428 428 break
429 429
430 430 yield stream.makeframe(requestid=requestid,
431 431 typeid=FRAME_TYPE_COMMAND_RESPONSE,
432 432 flags=flags,
433 433 payload=chunk)
434 434
435 435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
436 436
437 437 def createcommandresponseokframe(stream, requestid):
438 438 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
439 439
440 440 return stream.makeframe(requestid=requestid,
441 441 typeid=FRAME_TYPE_COMMAND_RESPONSE,
442 442 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
443 443 payload=overall)
444 444
445 445 def createcommandresponseeosframe(stream, requestid):
446 446 """Create an empty payload frame representing command end-of-stream."""
447 447 return stream.makeframe(requestid=requestid,
448 448 typeid=FRAME_TYPE_COMMAND_RESPONSE,
449 449 flags=FLAG_COMMAND_RESPONSE_EOS,
450 450 payload=b'')
451 451
452 452 def createalternatelocationresponseframe(stream, requestid, location):
453 453 data = {
454 454 b'status': b'redirect',
455 455 b'location': {
456 456 b'url': location.url,
457 457 b'mediatype': location.mediatype,
458 458 }
459 459 }
460 460
461 461 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
462 462 r'servercadercerts'):
463 463 value = getattr(location, a)
464 464 if value is not None:
465 465 data[b'location'][pycompat.bytestr(a)] = value
466 466
467 467 return stream.makeframe(requestid=requestid,
468 468 typeid=FRAME_TYPE_COMMAND_RESPONSE,
469 469 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
470 470 payload=b''.join(cborutil.streamencode(data)))
471 471
472 472 def createcommanderrorresponse(stream, requestid, message, args=None):
473 473 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
474 474 # formatting works consistently?
475 475 m = {
476 476 b'status': b'error',
477 477 b'error': {
478 478 b'message': message,
479 479 }
480 480 }
481 481
482 482 if args:
483 483 m[b'error'][b'args'] = args
484 484
485 485 overall = b''.join(cborutil.streamencode(m))
486 486
487 487 yield stream.makeframe(requestid=requestid,
488 488 typeid=FRAME_TYPE_COMMAND_RESPONSE,
489 489 flags=FLAG_COMMAND_RESPONSE_EOS,
490 490 payload=overall)
491 491
492 492 def createerrorframe(stream, requestid, msg, errtype):
493 493 # TODO properly handle frame size limits.
494 494 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
495 495
496 496 payload = b''.join(cborutil.streamencode({
497 497 b'type': errtype,
498 498 b'message': [{b'msg': msg}],
499 499 }))
500 500
501 501 yield stream.makeframe(requestid=requestid,
502 502 typeid=FRAME_TYPE_ERROR_RESPONSE,
503 503 flags=0,
504 504 payload=payload)
505 505
506 506 def createtextoutputframe(stream, requestid, atoms,
507 507 maxframesize=DEFAULT_MAX_FRAME_SIZE):
508 508 """Create a text output frame to render text to people.
509 509
510 510 ``atoms`` is a 3-tuple of (formatting string, args, labels).
511 511
512 512 The formatting string contains ``%s`` tokens to be replaced by the
513 513 corresponding indexed entry in ``args``. ``labels`` is an iterable of
514 514 formatters to be applied at rendering time. In terms of the ``ui``
515 515 class, each atom corresponds to a ``ui.write()``.
516 516 """
517 517 atomdicts = []
518 518
519 519 for (formatting, args, labels) in atoms:
520 520 # TODO look for localstr, other types here?
521 521
522 522 if not isinstance(formatting, bytes):
523 523 raise ValueError('must use bytes formatting strings')
524 524 for arg in args:
525 525 if not isinstance(arg, bytes):
526 526 raise ValueError('must use bytes for arguments')
527 527 for label in labels:
528 528 if not isinstance(label, bytes):
529 529 raise ValueError('must use bytes for labels')
530 530
531 531 # Formatting string must be ASCII.
532 532 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
533 533
534 534 # Arguments must be UTF-8.
535 535 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
536 536
537 537 # Labels must be ASCII.
538 538 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
539 539 for l in labels]
540 540
541 541 atom = {b'msg': formatting}
542 542 if args:
543 543 atom[b'args'] = args
544 544 if labels:
545 545 atom[b'labels'] = labels
546 546
547 547 atomdicts.append(atom)
548 548
549 549 payload = b''.join(cborutil.streamencode(atomdicts))
550 550
551 551 if len(payload) > maxframesize:
552 552 raise ValueError('cannot encode data in a single frame')
553 553
554 554 yield stream.makeframe(requestid=requestid,
555 555 typeid=FRAME_TYPE_TEXT_OUTPUT,
556 556 flags=0,
557 557 payload=payload)
558 558
559 559 class bufferingcommandresponseemitter(object):
560 560 """Helper object to emit command response frames intelligently.
561 561
562 562 Raw command response data is likely emitted in chunks much smaller
563 563 than what can fit in a single frame. This class exists to buffer
564 564 chunks until enough data is available to fit in a single frame.
565 565
566 566 TODO we'll need something like this when compression is supported.
567 567 So it might make sense to implement this functionality at the stream
568 568 level.
569 569 """
570 570 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
571 571 self._stream = stream
572 572 self._requestid = requestid
573 573 self._maxsize = maxframesize
574 574 self._chunks = []
575 575 self._chunkssize = 0
576 576
577 577 def send(self, data):
578 578 """Send new data for emission.
579 579
580 580 Is a generator of new frames that were derived from the new input.
581 581
582 582 If the special input ``None`` is received, flushes all buffered
583 583 data to frames.
584 584 """
585 585
586 586 if data is None:
587 587 for frame in self._flush():
588 588 yield frame
589 589 return
590 590
591 591 # There is a ton of potential to do more complicated things here.
592 592 # Our immediate goal is to coalesce small chunks into big frames,
593 593 # not achieve the fewest number of frames possible. So we go with
594 594 # a simple implementation:
595 595 #
596 596 # * If a chunk is too large for a frame, we flush and emit frames
597 597 # for the new chunk.
598 598 # * If a chunk can be buffered without total buffered size limits
599 599 # being exceeded, we do that.
600 600 # * If a chunk causes us to go over our buffering limit, we flush
601 601 # and then buffer the new chunk.
602 602
603 603 if len(data) > self._maxsize:
604 604 for frame in self._flush():
605 605 yield frame
606 606
607 607 # Now emit frames for the big chunk.
608 608 offset = 0
609 609 while True:
610 610 chunk = data[offset:offset + self._maxsize]
611 611 offset += len(chunk)
612 612
613 613 yield self._stream.makeframe(
614 614 self._requestid,
615 615 typeid=FRAME_TYPE_COMMAND_RESPONSE,
616 616 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
617 617 payload=chunk)
618 618
619 619 if offset == len(data):
620 620 return
621 621
622 622 # If we don't have enough to constitute a full frame, buffer and
623 623 # return.
624 624 if len(data) + self._chunkssize < self._maxsize:
625 625 self._chunks.append(data)
626 626 self._chunkssize += len(data)
627 627 return
628 628
629 629 # Else flush what we have and buffer the new chunk. We could do
630 630 # something more intelligent here, like break the chunk. Let's
631 631 # keep things simple for now.
632 632 for frame in self._flush():
633 633 yield frame
634 634
635 635 self._chunks.append(data)
636 636 self._chunkssize = len(data)
637 637
638 638 def _flush(self):
639 639 payload = b''.join(self._chunks)
640 640 assert len(payload) <= self._maxsize
641 641
642 642 self._chunks[:] = []
643 643 self._chunkssize = 0
644 644
645 645 yield self._stream.makeframe(
646 646 self._requestid,
647 647 typeid=FRAME_TYPE_COMMAND_RESPONSE,
648 648 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
649 649 payload=payload)
650 650
651 651 class stream(object):
652 652 """Represents a logical unidirectional series of frames."""
653 653
654 654 def __init__(self, streamid, active=False):
655 655 self.streamid = streamid
656 656 self._active = active
657 657
658 658 def makeframe(self, requestid, typeid, flags, payload):
659 659 """Create a frame to be sent out over this stream.
660 660
661 661 Only returns the frame instance. Does not actually send it.
662 662 """
663 663 streamflags = 0
664 664 if not self._active:
665 665 streamflags |= STREAM_FLAG_BEGIN_STREAM
666 666 self._active = True
667 667
668 668 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
669 669 payload)
670 670
671 671 def ensureserverstream(stream):
672 672 if stream.streamid % 2:
673 673 raise error.ProgrammingError('server should only write to even '
674 674 'numbered streams; %d is not even' %
675 675 stream.streamid)
676 676
677 677 DEFAULT_PROTOCOL_SETTINGS = {
678 678 'contentencodings': [b'identity'],
679 679 }
680 680
681 681 class serverreactor(object):
682 682 """Holds state of a server handling frame-based protocol requests.
683 683
684 684 This class is the "brain" of the unified frame-based protocol server
685 685 component. While the protocol is stateless from the perspective of
686 686 requests/commands, something needs to track which frames have been
687 687 received, what frames to expect, etc. This class is that thing.
688 688
689 689 Instances are modeled as a state machine of sorts. Instances are also
690 690 reactionary to external events. The point of this class is to encapsulate
691 691 the state of the connection and the exchange of frames, not to perform
692 692 work. Instead, callers tell this class when something occurs, like a
693 693 frame arriving. If that activity is worthy of a follow-up action (say
694 694 *run a command*), the return value of that handler will say so.
695 695
696 696 I/O and CPU intensive operations are purposefully delegated outside of
697 697 this class.
698 698
699 699 Consumers are expected to tell instances when events occur. They do so by
700 700 calling the various ``on*`` methods. These methods return a 2-tuple
701 701 describing any follow-up action(s) to take. The first element is the
702 702 name of an action to perform. The second is a data structure (usually
703 703 a dict) specific to that action that contains more information. e.g.
704 704 if the server wants to send frames back to the client, the data structure
705 705 will contain a reference to those frames.
706 706
707 707 Valid actions that consumers can be instructed to take are:
708 708
709 709 sendframes
710 710 Indicates that frames should be sent to the client. The ``framegen``
711 711 key contains a generator of frames that should be sent. The server
712 712 assumes that all frames are sent to the client.
713 713
714 714 error
715 715 Indicates that an error occurred. Consumer should probably abort.
716 716
717 717 runcommand
718 718 Indicates that the consumer should run a wire protocol command. Details
719 719 of the command to run are given in the data structure.
720 720
721 721 wantframe
722 722 Indicates that nothing of interest happened and the server is waiting on
723 723 more frames from the client before anything interesting can be done.
724 724
725 725 noop
726 726 Indicates no additional action is required.
727 727
728 728 Known Issues
729 729 ------------
730 730
731 731 There are no limits to the number of partially received commands or their
732 732 size. A malicious client could stream command request data and exhaust the
733 733 server's memory.
734 734
735 735 Partially received commands are not acted upon when end of input is
736 736 reached. Should the server error if it receives a partial request?
737 737 Should the client send a message to abort a partially transmitted request
738 738 to facilitate graceful shutdown?
739 739
740 740 Active requests that haven't been responded to aren't tracked. This means
741 741 that if we receive a command and instruct its dispatch, another command
742 742 with its request ID can come in over the wire and there will be a race
743 743 between who responds to what.
744 744 """
745 745
746 746 def __init__(self, deferoutput=False):
747 747 """Construct a new server reactor.
748 748
749 749 ``deferoutput`` can be used to indicate that no output frames should be
750 750 instructed to be sent until input has been exhausted. In this mode,
751 751 events that would normally generate output frames (such as a command
752 752 response being ready) will instead defer instructing the consumer to
753 753 send those frames. This is useful for half-duplex transports where the
754 754 sender cannot receive until all data has been transmitted.
755 755 """
756 756 self._deferoutput = deferoutput
757 757 self._state = 'initial'
758 758 self._nextoutgoingstreamid = 2
759 759 self._bufferedframegens = []
760 760 # stream id -> stream instance for all active streams from the client.
761 761 self._incomingstreams = {}
762 762 self._outgoingstreams = {}
763 763 # request id -> dict of commands that are actively being received.
764 764 self._receivingcommands = {}
765 765 # Request IDs that have been received and are actively being processed.
766 766 # Once all output for a request has been sent, it is removed from this
767 767 # set.
768 768 self._activecommands = set()
769 769
770 770 self._protocolsettingsdecoder = None
771 771
772 772 # Sender protocol settings are optional. Set implied default values.
773 773 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
774 774
775 775 def onframerecv(self, frame):
776 776 """Process a frame that has been received off the wire.
777 777
778 778 Returns a dict with an ``action`` key that details what action,
779 779 if any, the consumer should take next.
780 780 """
781 781 if not frame.streamid % 2:
782 782 self._state = 'errored'
783 783 return self._makeerrorresult(
784 784 _('received frame with even numbered stream ID: %d') %
785 785 frame.streamid)
786 786
787 787 if frame.streamid not in self._incomingstreams:
788 788 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
789 789 self._state = 'errored'
790 790 return self._makeerrorresult(
791 791 _('received frame on unknown inactive stream without '
792 792 'beginning of stream flag set'))
793 793
794 794 self._incomingstreams[frame.streamid] = stream(frame.streamid)
795 795
796 796 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
797 797 # TODO handle decoding frames
798 798 self._state = 'errored'
799 799 raise error.ProgrammingError('support for decoding stream payloads '
800 800 'not yet implemented')
801 801
802 802 if frame.streamflags & STREAM_FLAG_END_STREAM:
803 803 del self._incomingstreams[frame.streamid]
804 804
805 805 handlers = {
806 806 'initial': self._onframeinitial,
807 807 'protocol-settings-receiving': self._onframeprotocolsettings,
808 808 'idle': self._onframeidle,
809 809 'command-receiving': self._onframecommandreceiving,
810 810 'errored': self._onframeerrored,
811 811 }
812 812
813 813 meth = handlers.get(self._state)
814 814 if not meth:
815 815 raise error.ProgrammingError('unhandled state: %s' % self._state)
816 816
817 817 return meth(frame)
818 818
819 819 def oncommandresponseready(self, stream, requestid, data):
820 820 """Signal that a bytes response is ready to be sent to the client.
821 821
822 822 The raw bytes response is passed as an argument.
823 823 """
824 824 ensureserverstream(stream)
825 825
826 826 def sendframes():
827 827 for frame in createcommandresponseframesfrombytes(stream, requestid,
828 828 data):
829 829 yield frame
830 830
831 831 self._activecommands.remove(requestid)
832 832
833 833 result = sendframes()
834 834
835 835 if self._deferoutput:
836 836 self._bufferedframegens.append(result)
837 837 return 'noop', {}
838 838 else:
839 839 return 'sendframes', {
840 840 'framegen': result,
841 841 }
842 842
843 843 def oncommandresponsereadyobjects(self, stream, requestid, objs):
844 844 """Signal that objects are ready to be sent to the client.
845 845
846 846 ``objs`` is an iterable of objects (typically a generator) that will
847 847 be encoded via CBOR and added to frames, which will be sent to the
848 848 client.
849 849 """
850 850 ensureserverstream(stream)
851 851
852 852 # We need to take care over exception handling. Uncaught exceptions
853 853 # when generating frames could lead to premature end of the frame
854 854 # stream and the possibility of the server or client process getting
855 855 # in a bad state.
856 856 #
857 857 # Keep in mind that if ``objs`` is a generator, advancing it could
858 858 # raise exceptions that originated in e.g. wire protocol command
859 859 # functions. That is why we differentiate between exceptions raised
860 860 # when iterating versus other exceptions that occur.
861 861 #
862 862 # In all cases, when the function finishes, the request is fully
863 863 # handled and no new frames for it should be seen.
864 864
865 865 def sendframes():
866 866 emitted = False
867 867 alternatelocationsent = False
868 868 emitter = bufferingcommandresponseemitter(stream, requestid)
869 869 while True:
870 870 try:
871 871 o = next(objs)
872 872 except StopIteration:
873 873 for frame in emitter.send(None):
874 874 yield frame
875 875
876 876 if emitted:
877 877 yield createcommandresponseeosframe(stream, requestid)
878 878 break
879 879
880 880 except error.WireprotoCommandError as e:
881 881 for frame in createcommanderrorresponse(
882 882 stream, requestid, e.message, e.messageargs):
883 883 yield frame
884 884 break
885 885
886 886 except Exception as e:
887 887 for frame in createerrorframe(
888 888 stream, requestid, '%s' % stringutil.forcebytestr(e),
889 889 errtype='server'):
890 890
891 891 yield frame
892 892
893 893 break
894 894
895 895 try:
896 896 # Alternate location responses can only be the first and
897 897 # only object in the output stream.
898 898 if isinstance(o, wireprototypes.alternatelocationresponse):
899 899 if emitted:
900 900 raise error.ProgrammingError(
901 901 'alternatelocationresponse seen after initial '
902 902 'output object')
903 903
904 904 yield createalternatelocationresponseframe(
905 905 stream, requestid, o)
906 906
907 907 alternatelocationsent = True
908 908 emitted = True
909 909 continue
910 910
911 911 if alternatelocationsent:
912 912 raise error.ProgrammingError(
913 913 'object follows alternatelocationresponse')
914 914
915 915 if not emitted:
916 916 yield createcommandresponseokframe(stream, requestid)
917 917 emitted = True
918 918
919 919 # Objects emitted by command functions can be serializable
920 920 # data structures or special types.
921 921 # TODO consider extracting the content normalization to a
922 922 # standalone function, as it may be useful for e.g. cachers.
923 923
924 924 # A pre-encoded object is sent directly to the emitter.
925 925 if isinstance(o, wireprototypes.encodedresponse):
926 926 for frame in emitter.send(o.data):
927 927 yield frame
928 928
929 929 # A regular object is CBOR encoded.
930 930 else:
931 931 for chunk in cborutil.streamencode(o):
932 932 for frame in emitter.send(chunk):
933 933 yield frame
934 934
935 935 except Exception as e:
936 936 for frame in createerrorframe(stream, requestid,
937 937 '%s' % e,
938 938 errtype='server'):
939 939 yield frame
940 940
941 941 break
942 942
943 943 self._activecommands.remove(requestid)
944 944
945 945 return self._handlesendframes(sendframes())
946 946
947 947 def oninputeof(self):
948 948 """Signals that end of input has been received.
949 949
950 950 No more frames will be received. All pending activity should be
951 951 completed.
952 952 """
953 953 # TODO should we do anything about in-flight commands?
954 954
955 955 if not self._deferoutput or not self._bufferedframegens:
956 956 return 'noop', {}
957 957
958 958 # If we buffered all our responses, emit those.
959 959 def makegen():
960 960 for gen in self._bufferedframegens:
961 961 for frame in gen:
962 962 yield frame
963 963
964 964 return 'sendframes', {
965 965 'framegen': makegen(),
966 966 }
967 967
968 968 def _handlesendframes(self, framegen):
969 969 if self._deferoutput:
970 970 self._bufferedframegens.append(framegen)
971 971 return 'noop', {}
972 972 else:
973 973 return 'sendframes', {
974 974 'framegen': framegen,
975 975 }
976 976
977 977 def onservererror(self, stream, requestid, msg):
978 978 ensureserverstream(stream)
979 979
980 980 def sendframes():
981 981 for frame in createerrorframe(stream, requestid, msg,
982 982 errtype='server'):
983 983 yield frame
984 984
985 985 self._activecommands.remove(requestid)
986 986
987 987 return self._handlesendframes(sendframes())
988 988
989 989 def oncommanderror(self, stream, requestid, message, args=None):
990 990 """Called when a command encountered an error before sending output."""
991 991 ensureserverstream(stream)
992 992
993 993 def sendframes():
994 994 for frame in createcommanderrorresponse(stream, requestid, message,
995 995 args):
996 996 yield frame
997 997
998 998 self._activecommands.remove(requestid)
999 999
1000 1000 return self._handlesendframes(sendframes())
1001 1001
1002 1002 def makeoutputstream(self):
1003 1003 """Create a stream to be used for sending data to the client."""
1004 1004 streamid = self._nextoutgoingstreamid
1005 1005 self._nextoutgoingstreamid += 2
1006 1006
1007 1007 s = stream(streamid)
1008 1008 self._outgoingstreams[streamid] = s
1009 1009
1010 1010 return s
1011 1011
1012 1012 def _makeerrorresult(self, msg):
1013 1013 return 'error', {
1014 1014 'message': msg,
1015 1015 }
1016 1016
1017 1017 def _makeruncommandresult(self, requestid):
1018 1018 entry = self._receivingcommands[requestid]
1019 1019
1020 1020 if not entry['requestdone']:
1021 1021 self._state = 'errored'
1022 1022 raise error.ProgrammingError('should not be called without '
1023 1023 'requestdone set')
1024 1024
1025 1025 del self._receivingcommands[requestid]
1026 1026
1027 1027 if self._receivingcommands:
1028 1028 self._state = 'command-receiving'
1029 1029 else:
1030 1030 self._state = 'idle'
1031 1031
1032 1032 # Decode the payloads as CBOR.
1033 1033 entry['payload'].seek(0)
1034 1034 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1035 1035
1036 1036 if b'name' not in request:
1037 1037 self._state = 'errored'
1038 1038 return self._makeerrorresult(
1039 1039 _('command request missing "name" field'))
1040 1040
1041 1041 if b'args' not in request:
1042 1042 request[b'args'] = {}
1043 1043
1044 1044 assert requestid not in self._activecommands
1045 1045 self._activecommands.add(requestid)
1046 1046
1047 1047 return 'runcommand', {
1048 1048 'requestid': requestid,
1049 1049 'command': request[b'name'],
1050 1050 'args': request[b'args'],
1051 1051 'redirect': request.get(b'redirect'),
1052 1052 'data': entry['data'].getvalue() if entry['data'] else None,
1053 1053 }
1054 1054
1055 1055 def _makewantframeresult(self):
1056 1056 return 'wantframe', {
1057 1057 'state': self._state,
1058 1058 }
1059 1059
1060 1060 def _validatecommandrequestframe(self, frame):
1061 1061 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1062 1062 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1063 1063
1064 1064 if new and continuation:
1065 1065 self._state = 'errored'
1066 1066 return self._makeerrorresult(
1067 1067 _('received command request frame with both new and '
1068 1068 'continuation flags set'))
1069 1069
1070 1070 if not new and not continuation:
1071 1071 self._state = 'errored'
1072 1072 return self._makeerrorresult(
1073 1073 _('received command request frame with neither new nor '
1074 1074 'continuation flags set'))
1075 1075
1076 1076 def _onframeinitial(self, frame):
1077 1077 # Called when we receive a frame when in the "initial" state.
1078 1078 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1079 1079 self._state = 'protocol-settings-receiving'
1080 1080 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1081 1081 return self._onframeprotocolsettings(frame)
1082 1082
1083 1083 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1084 1084 self._state = 'idle'
1085 1085 return self._onframeidle(frame)
1086 1086
1087 1087 else:
1088 1088 self._state = 'errored'
1089 1089 return self._makeerrorresult(
1090 1090 _('expected sender protocol settings or command request '
1091 1091 'frame; got %d') % frame.typeid)
1092 1092
1093 1093 def _onframeprotocolsettings(self, frame):
1094 1094 assert self._state == 'protocol-settings-receiving'
1095 1095 assert self._protocolsettingsdecoder is not None
1096 1096
1097 1097 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1098 1098 self._state = 'errored'
1099 1099 return self._makeerrorresult(
1100 1100 _('expected sender protocol settings frame; got %d') %
1101 1101 frame.typeid)
1102 1102
1103 1103 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1104 1104 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1105 1105
1106 1106 if more and eos:
1107 1107 self._state = 'errored'
1108 1108 return self._makeerrorresult(
1109 1109 _('sender protocol settings frame cannot have both '
1110 1110 'continuation and end of stream flags set'))
1111 1111
1112 1112 if not more and not eos:
1113 1113 self._state = 'errored'
1114 1114 return self._makeerrorresult(
1115 1115 _('sender protocol settings frame must have continuation or '
1116 1116 'end of stream flag set'))
1117 1117
1118 1118 # TODO establish limits for maximum amount of data that can be
1119 1119 # buffered.
1120 1120 try:
1121 1121 self._protocolsettingsdecoder.decode(frame.payload)
1122 1122 except Exception as e:
1123 1123 self._state = 'errored'
1124 1124 return self._makeerrorresult(
1125 1125 _('error decoding CBOR from sender protocol settings frame: %s')
1126 1126 % stringutil.forcebytestr(e))
1127 1127
1128 1128 if more:
1129 1129 return self._makewantframeresult()
1130 1130
1131 1131 assert eos
1132 1132
1133 1133 decoded = self._protocolsettingsdecoder.getavailable()
1134 1134 self._protocolsettingsdecoder = None
1135 1135
1136 1136 if not decoded:
1137 1137 self._state = 'errored'
1138 1138 return self._makeerrorresult(
1139 1139 _('sender protocol settings frame did not contain CBOR data'))
1140 1140 elif len(decoded) > 1:
1141 1141 self._state = 'errored'
1142 1142 return self._makeerrorresult(
1143 1143 _('sender protocol settings frame contained multiple CBOR '
1144 1144 'values'))
1145 1145
1146 1146 d = decoded[0]
1147 1147
1148 1148 if b'contentencodings' in d:
1149 1149 self._sendersettings['contentencodings'] = d[b'contentencodings']
1150 1150
1151 1151 self._state = 'idle'
1152 1152
1153 1153 return self._makewantframeresult()
1154 1154
1155 1155 def _onframeidle(self, frame):
1156 1156 # The only frame type that should be received in this state is a
1157 1157 # command request.
1158 1158 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1159 1159 self._state = 'errored'
1160 1160 return self._makeerrorresult(
1161 1161 _('expected command request frame; got %d') % frame.typeid)
1162 1162
1163 1163 res = self._validatecommandrequestframe(frame)
1164 1164 if res:
1165 1165 return res
1166 1166
1167 1167 if frame.requestid in self._receivingcommands:
1168 1168 self._state = 'errored'
1169 1169 return self._makeerrorresult(
1170 1170 _('request with ID %d already received') % frame.requestid)
1171 1171
1172 1172 if frame.requestid in self._activecommands:
1173 1173 self._state = 'errored'
1174 1174 return self._makeerrorresult(
1175 1175 _('request with ID %d is already active') % frame.requestid)
1176 1176
1177 1177 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1178 1178 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1179 1179 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1180 1180
1181 1181 if not new:
1182 1182 self._state = 'errored'
1183 1183 return self._makeerrorresult(
1184 1184 _('received command request frame without new flag set'))
1185 1185
1186 1186 payload = util.bytesio()
1187 1187 payload.write(frame.payload)
1188 1188
1189 1189 self._receivingcommands[frame.requestid] = {
1190 1190 'payload': payload,
1191 1191 'data': None,
1192 1192 'requestdone': not moreframes,
1193 1193 'expectingdata': bool(expectingdata),
1194 1194 }
1195 1195
1196 1196 # This is the final frame for this request. Dispatch it.
1197 1197 if not moreframes and not expectingdata:
1198 1198 return self._makeruncommandresult(frame.requestid)
1199 1199
1200 1200 assert moreframes or expectingdata
1201 1201 self._state = 'command-receiving'
1202 1202 return self._makewantframeresult()
1203 1203
1204 1204 def _onframecommandreceiving(self, frame):
1205 1205 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1206 1206 # Process new command requests as such.
1207 1207 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1208 1208 return self._onframeidle(frame)
1209 1209
1210 1210 res = self._validatecommandrequestframe(frame)
1211 1211 if res:
1212 1212 return res
1213 1213
1214 1214 # All other frames should be related to a command that is currently
1215 1215 # receiving but is not active.
1216 1216 if frame.requestid in self._activecommands:
1217 1217 self._state = 'errored'
1218 1218 return self._makeerrorresult(
1219 1219 _('received frame for request that is still active: %d') %
1220 1220 frame.requestid)
1221 1221
1222 1222 if frame.requestid not in self._receivingcommands:
1223 1223 self._state = 'errored'
1224 1224 return self._makeerrorresult(
1225 1225 _('received frame for request that is not receiving: %d') %
1226 1226 frame.requestid)
1227 1227
1228 1228 entry = self._receivingcommands[frame.requestid]
1229 1229
1230 1230 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1231 1231 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1232 1232 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1233 1233
1234 1234 if entry['requestdone']:
1235 1235 self._state = 'errored'
1236 1236 return self._makeerrorresult(
1237 1237 _('received command request frame when request frames '
1238 1238 'were supposedly done'))
1239 1239
1240 1240 if expectingdata != entry['expectingdata']:
1241 1241 self._state = 'errored'
1242 1242 return self._makeerrorresult(
1243 1243 _('mismatch between expect data flag and previous frame'))
1244 1244
1245 1245 entry['payload'].write(frame.payload)
1246 1246
1247 1247 if not moreframes:
1248 1248 entry['requestdone'] = True
1249 1249
1250 1250 if not moreframes and not expectingdata:
1251 1251 return self._makeruncommandresult(frame.requestid)
1252 1252
1253 1253 return self._makewantframeresult()
1254 1254
1255 1255 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1256 1256 if not entry['expectingdata']:
1257 1257 self._state = 'errored'
1258 1258 return self._makeerrorresult(_(
1259 1259 'received command data frame for request that is not '
1260 1260 'expecting data: %d') % frame.requestid)
1261 1261
1262 1262 if entry['data'] is None:
1263 1263 entry['data'] = util.bytesio()
1264 1264
1265 1265 return self._handlecommanddataframe(frame, entry)
1266 1266 else:
1267 1267 self._state = 'errored'
1268 1268 return self._makeerrorresult(_(
1269 1269 'received unexpected frame type: %d') % frame.typeid)
1270 1270
1271 1271 def _handlecommanddataframe(self, frame, entry):
1272 1272 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1273 1273
1274 1274 # TODO support streaming data instead of buffering it.
1275 1275 entry['data'].write(frame.payload)
1276 1276
1277 1277 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1278 1278 return self._makewantframeresult()
1279 1279 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1280 1280 entry['data'].seek(0)
1281 1281 return self._makeruncommandresult(frame.requestid)
1282 1282 else:
1283 1283 self._state = 'errored'
1284 1284 return self._makeerrorresult(_('command data frame without '
1285 1285 'flags'))
1286 1286
1287 1287 def _onframeerrored(self, frame):
1288 1288 return self._makeerrorresult(_('server already errored'))
1289 1289
1290 1290 class commandrequest(object):
1291 1291 """Represents a request to run a command."""
1292 1292
1293 1293 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1294 1294 self.requestid = requestid
1295 1295 self.name = name
1296 1296 self.args = args
1297 1297 self.datafh = datafh
1298 1298 self.redirect = redirect
1299 1299 self.state = 'pending'
1300 1300
1301 1301 class clientreactor(object):
1302 1302 """Holds state of a client issuing frame-based protocol requests.
1303 1303
1304 1304 This is like ``serverreactor`` but for client-side state.
1305 1305
1306 1306 Each instance is bound to the lifetime of a connection. For persistent
1307 1307 connection transports using e.g. TCP sockets and speaking the raw
1308 1308 framing protocol, there will be a single instance for the lifetime of
1309 1309 the TCP socket. For transports where there are multiple discrete
1310 1310 interactions (say tunneled within in HTTP request), there will be a
1311 1311 separate instance for each distinct interaction.
1312
1313 Consumers are expected to tell instances when events occur by calling
1314 various methods. These methods return a 2-tuple describing any follow-up
1315 action(s) to take. The first element is the name of an action to
1316 perform. The second is a data structure (usually a dict) specific to
1317 that action that contains more information. e.g. if the reactor wants
1318 to send frames to the server, the data structure will contain a reference
1319 to those frames.
1320
1321 Valid actions that consumers can be instructed to take are:
1322
1323 noop
1324 Indicates no additional action is required.
1325
1326 sendframes
1327 Indicates that frames should be sent to the server. The ``framegen``
1328 key contains a generator of frames that should be sent. The reactor
1329 assumes that all frames in this generator are sent to the server.
1330
1331 error
1332 Indicates that an error occurred. The ``message`` key contains an
1333 error message describing the failure.
1334
1335 responsedata
1336 Indicates a response to a previously-issued command was received.
1337
1338 The ``request`` key contains the ``commandrequest`` instance that
1339 represents the request this data is for.
1340
1341 The ``data`` key contains the decoded data from the server.
1342
1343 ``expectmore`` and ``eos`` evaluate to True when more response data
1344 is expected to follow or we're at the end of the response stream,
1345 respectively.
1312 1346 """
1313 1347 def __init__(self, hasmultiplesend=False, buffersends=True):
1314 1348 """Create a new instance.
1315 1349
1316 1350 ``hasmultiplesend`` indicates whether multiple sends are supported
1317 1351 by the transport. When True, it is possible to send commands immediately
1318 1352 instead of buffering until the caller signals an intent to finish a
1319 1353 send operation.
1320 1354
1321 1355 ``buffercommands`` indicates whether sends should be buffered until the
1322 1356 last request has been issued.
1323 1357 """
1324 1358 self._hasmultiplesend = hasmultiplesend
1325 1359 self._buffersends = buffersends
1326 1360
1327 1361 self._canissuecommands = True
1328 1362 self._cansend = True
1329 1363
1330 1364 self._nextrequestid = 1
1331 1365 # We only support a single outgoing stream for now.
1332 1366 self._outgoingstream = stream(1)
1333 1367 self._pendingrequests = collections.deque()
1334 1368 self._activerequests = {}
1335 1369 self._incomingstreams = {}
1336 1370
1337 1371 def callcommand(self, name, args, datafh=None, redirect=None):
1338 1372 """Request that a command be executed.
1339 1373
1340 1374 Receives the command name, a dict of arguments to pass to the command,
1341 1375 and an optional file object containing the raw data for the command.
1342 1376
1343 1377 Returns a 3-tuple of (request, action, action data).
1344 1378 """
1345 1379 if not self._canissuecommands:
1346 1380 raise error.ProgrammingError('cannot issue new commands')
1347 1381
1348 1382 requestid = self._nextrequestid
1349 1383 self._nextrequestid += 2
1350 1384
1351 1385 request = commandrequest(requestid, name, args, datafh=datafh,
1352 1386 redirect=redirect)
1353 1387
1354 1388 if self._buffersends:
1355 1389 self._pendingrequests.append(request)
1356 1390 return request, 'noop', {}
1357 1391 else:
1358 1392 if not self._cansend:
1359 1393 raise error.ProgrammingError('sends cannot be performed on '
1360 1394 'this instance')
1361 1395
1362 1396 if not self._hasmultiplesend:
1363 1397 self._cansend = False
1364 1398 self._canissuecommands = False
1365 1399
1366 1400 return request, 'sendframes', {
1367 1401 'framegen': self._makecommandframes(request),
1368 1402 }
1369 1403
1370 1404 def flushcommands(self):
1371 1405 """Request that all queued commands be sent.
1372 1406
1373 1407 If any commands are buffered, this will instruct the caller to send
1374 1408 them over the wire. If no commands are buffered it instructs the client
1375 1409 to no-op.
1376 1410
1377 1411 If instances aren't configured for multiple sends, no new command
1378 1412 requests are allowed after this is called.
1379 1413 """
1380 1414 if not self._pendingrequests:
1381 1415 return 'noop', {}
1382 1416
1383 1417 if not self._cansend:
1384 1418 raise error.ProgrammingError('sends cannot be performed on this '
1385 1419 'instance')
1386 1420
1387 1421 # If the instance only allows sending once, mark that we have fired
1388 1422 # our one shot.
1389 1423 if not self._hasmultiplesend:
1390 1424 self._canissuecommands = False
1391 1425 self._cansend = False
1392 1426
1393 1427 def makeframes():
1394 1428 while self._pendingrequests:
1395 1429 request = self._pendingrequests.popleft()
1396 1430 for frame in self._makecommandframes(request):
1397 1431 yield frame
1398 1432
1399 1433 return 'sendframes', {
1400 1434 'framegen': makeframes(),
1401 1435 }
1402 1436
1403 1437 def _makecommandframes(self, request):
1404 1438 """Emit frames to issue a command request.
1405 1439
1406 1440 As a side-effect, update request accounting to reflect its changed
1407 1441 state.
1408 1442 """
1409 1443 self._activerequests[request.requestid] = request
1410 1444 request.state = 'sending'
1411 1445
1412 1446 res = createcommandframes(self._outgoingstream,
1413 1447 request.requestid,
1414 1448 request.name,
1415 1449 request.args,
1416 1450 datafh=request.datafh,
1417 1451 redirect=request.redirect)
1418 1452
1419 1453 for frame in res:
1420 1454 yield frame
1421 1455
1422 1456 request.state = 'sent'
1423 1457
1424 1458 def onframerecv(self, frame):
1425 1459 """Process a frame that has been received off the wire.
1426 1460
1427 1461 Returns a 2-tuple of (action, meta) describing further action the
1428 1462 caller needs to take as a result of receiving this frame.
1429 1463 """
1430 1464 if frame.streamid % 2:
1431 1465 return 'error', {
1432 1466 'message': (
1433 1467 _('received frame with odd numbered stream ID: %d') %
1434 1468 frame.streamid),
1435 1469 }
1436 1470
1437 1471 if frame.streamid not in self._incomingstreams:
1438 1472 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1439 1473 return 'error', {
1440 1474 'message': _('received frame on unknown stream '
1441 1475 'without beginning of stream flag set'),
1442 1476 }
1443 1477
1444 1478 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1445 1479
1446 1480 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1447 1481 raise error.ProgrammingError('support for decoding stream '
1448 1482 'payloads not yet implemneted')
1449 1483
1450 1484 if frame.streamflags & STREAM_FLAG_END_STREAM:
1451 1485 del self._incomingstreams[frame.streamid]
1452 1486
1453 1487 if frame.requestid not in self._activerequests:
1454 1488 return 'error', {
1455 1489 'message': (_('received frame for inactive request ID: %d') %
1456 1490 frame.requestid),
1457 1491 }
1458 1492
1459 1493 request = self._activerequests[frame.requestid]
1460 1494 request.state = 'receiving'
1461 1495
1462 1496 handlers = {
1463 1497 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1464 1498 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1465 1499 }
1466 1500
1467 1501 meth = handlers.get(frame.typeid)
1468 1502 if not meth:
1469 1503 raise error.ProgrammingError('unhandled frame type: %d' %
1470 1504 frame.typeid)
1471 1505
1472 1506 return meth(request, frame)
1473 1507
1474 1508 def _oncommandresponseframe(self, request, frame):
1475 1509 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1476 1510 request.state = 'received'
1477 1511 del self._activerequests[request.requestid]
1478 1512
1479 1513 return 'responsedata', {
1480 1514 'request': request,
1481 1515 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1482 1516 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1483 1517 'data': frame.payload,
1484 1518 }
1485 1519
1486 1520 def _onerrorresponseframe(self, request, frame):
1487 1521 request.state = 'errored'
1488 1522 del self._activerequests[request.requestid]
1489 1523
1490 1524 # The payload should be a CBOR map.
1491 1525 m = cborutil.decodeall(frame.payload)[0]
1492 1526
1493 1527 return 'error', {
1494 1528 'request': request,
1495 1529 'type': m['type'],
1496 1530 'message': m['message'],
1497 1531 }
General Comments 0
You need to be logged in to leave comments. Login now