##// END OF EJS Templates
wireprotov2: establish dedicated classes for input and output streams...
Gregory Szorc -
r40166:5d44c4d1 default
parent child Browse files
Show More
@@ -1,1602 +1,1609 b''
1 1 # wireprotoframing.py - unified framing protocol for wire protocol
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 # This file contains functionality to support the unified frame-based wire
9 9 # protocol. For details about the protocol, see
10 10 # `hg help internals.wireprotocol`.
11 11
12 12 from __future__ import absolute_import
13 13
14 14 import collections
15 15 import struct
16 16
17 17 from .i18n import _
18 18 from .thirdparty import (
19 19 attr,
20 20 )
21 21 from . import (
22 22 encoding,
23 23 error,
24 24 pycompat,
25 25 util,
26 26 wireprototypes,
27 27 )
28 28 from .utils import (
29 29 cborutil,
30 30 stringutil,
31 31 )
32 32
33 33 FRAME_HEADER_SIZE = 8
34 34 DEFAULT_MAX_FRAME_SIZE = 32768
35 35
36 36 STREAM_FLAG_BEGIN_STREAM = 0x01
37 37 STREAM_FLAG_END_STREAM = 0x02
38 38 STREAM_FLAG_ENCODING_APPLIED = 0x04
39 39
40 40 STREAM_FLAGS = {
41 41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
42 42 b'stream-end': STREAM_FLAG_END_STREAM,
43 43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
44 44 }
45 45
46 46 FRAME_TYPE_COMMAND_REQUEST = 0x01
47 47 FRAME_TYPE_COMMAND_DATA = 0x02
48 48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
49 49 FRAME_TYPE_ERROR_RESPONSE = 0x05
50 50 FRAME_TYPE_TEXT_OUTPUT = 0x06
51 51 FRAME_TYPE_PROGRESS = 0x07
52 52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
53 53 FRAME_TYPE_STREAM_SETTINGS = 0x09
54 54
55 55 FRAME_TYPES = {
56 56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
57 57 b'command-data': FRAME_TYPE_COMMAND_DATA,
58 58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
59 59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
60 60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
61 61 b'progress': FRAME_TYPE_PROGRESS,
62 62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
63 63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
64 64 }
65 65
66 66 FLAG_COMMAND_REQUEST_NEW = 0x01
67 67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
68 68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
69 69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
70 70
71 71 FLAGS_COMMAND_REQUEST = {
72 72 b'new': FLAG_COMMAND_REQUEST_NEW,
73 73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
74 74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
75 75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
76 76 }
77 77
78 78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
79 79 FLAG_COMMAND_DATA_EOS = 0x02
80 80
81 81 FLAGS_COMMAND_DATA = {
82 82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
83 83 b'eos': FLAG_COMMAND_DATA_EOS,
84 84 }
85 85
86 86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
87 87 FLAG_COMMAND_RESPONSE_EOS = 0x02
88 88
89 89 FLAGS_COMMAND_RESPONSE = {
90 90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
91 91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
92 92 }
93 93
94 94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
95 95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
96 96
97 97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
98 98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
99 99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
100 100 }
101 101
102 102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
103 103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
104 104
105 105 FLAGS_STREAM_ENCODING_SETTINGS = {
106 106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
107 107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
108 108 }
109 109
110 110 # Maps frame types to their available flags.
111 111 FRAME_TYPE_FLAGS = {
112 112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
113 113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
114 114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
115 115 FRAME_TYPE_ERROR_RESPONSE: {},
116 116 FRAME_TYPE_TEXT_OUTPUT: {},
117 117 FRAME_TYPE_PROGRESS: {},
118 118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
119 119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
120 120 }
121 121
122 122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
123 123
124 124 def humanflags(mapping, value):
125 125 """Convert a numeric flags value to a human value, using a mapping table."""
126 126 namemap = {v: k for k, v in mapping.iteritems()}
127 127 flags = []
128 128 val = 1
129 129 while value >= val:
130 130 if value & val:
131 131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
132 132 val <<= 1
133 133
134 134 return b'|'.join(flags)
135 135
136 136 @attr.s(slots=True)
137 137 class frameheader(object):
138 138 """Represents the data in a frame header."""
139 139
140 140 length = attr.ib()
141 141 requestid = attr.ib()
142 142 streamid = attr.ib()
143 143 streamflags = attr.ib()
144 144 typeid = attr.ib()
145 145 flags = attr.ib()
146 146
147 147 @attr.s(slots=True, repr=False)
148 148 class frame(object):
149 149 """Represents a parsed frame."""
150 150
151 151 requestid = attr.ib()
152 152 streamid = attr.ib()
153 153 streamflags = attr.ib()
154 154 typeid = attr.ib()
155 155 flags = attr.ib()
156 156 payload = attr.ib()
157 157
158 158 @encoding.strmethod
159 159 def __repr__(self):
160 160 typename = '<unknown 0x%02x>' % self.typeid
161 161 for name, value in FRAME_TYPES.iteritems():
162 162 if value == self.typeid:
163 163 typename = name
164 164 break
165 165
166 166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
167 167 'type=%s; flags=%s)' % (
168 168 len(self.payload), self.requestid, self.streamid,
169 169 humanflags(STREAM_FLAGS, self.streamflags), typename,
170 170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
171 171
172 172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
173 173 """Assemble a frame into a byte array."""
174 174 # TODO assert size of payload.
175 175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
176 176
177 177 # 24 bits length
178 178 # 16 bits request id
179 179 # 8 bits stream id
180 180 # 8 bits stream flags
181 181 # 4 bits type
182 182 # 4 bits flags
183 183
184 184 l = struct.pack(r'<I', len(payload))
185 185 frame[0:3] = l[0:3]
186 186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
187 187 frame[7] = (typeid << 4) | flags
188 188 frame[8:] = payload
189 189
190 190 return frame
191 191
192 192 def makeframefromhumanstring(s):
193 193 """Create a frame from a human readable string
194 194
195 195 Strings have the form:
196 196
197 197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
198 198
199 199 This can be used by user-facing applications and tests for creating
200 200 frames easily without having to type out a bunch of constants.
201 201
202 202 Request ID and stream IDs are integers.
203 203
204 204 Stream flags, frame type, and flags can be specified by integer or
205 205 named constant.
206 206
207 207 Flags can be delimited by `|` to bitwise OR them together.
208 208
209 209 If the payload begins with ``cbor:``, the following string will be
210 210 evaluated as Python literal and the resulting object will be fed into
211 211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
212 212 byte string literal.
213 213 """
214 214 fields = s.split(b' ', 5)
215 215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
216 216
217 217 requestid = int(requestid)
218 218 streamid = int(streamid)
219 219
220 220 finalstreamflags = 0
221 221 for flag in streamflags.split(b'|'):
222 222 if flag in STREAM_FLAGS:
223 223 finalstreamflags |= STREAM_FLAGS[flag]
224 224 else:
225 225 finalstreamflags |= int(flag)
226 226
227 227 if frametype in FRAME_TYPES:
228 228 frametype = FRAME_TYPES[frametype]
229 229 else:
230 230 frametype = int(frametype)
231 231
232 232 finalflags = 0
233 233 validflags = FRAME_TYPE_FLAGS[frametype]
234 234 for flag in frameflags.split(b'|'):
235 235 if flag in validflags:
236 236 finalflags |= validflags[flag]
237 237 else:
238 238 finalflags |= int(flag)
239 239
240 240 if payload.startswith(b'cbor:'):
241 241 payload = b''.join(cborutil.streamencode(
242 242 stringutil.evalpythonliteral(payload[5:])))
243 243
244 244 else:
245 245 payload = stringutil.unescapestr(payload)
246 246
247 247 return makeframe(requestid=requestid, streamid=streamid,
248 248 streamflags=finalstreamflags, typeid=frametype,
249 249 flags=finalflags, payload=payload)
250 250
251 251 def parseheader(data):
252 252 """Parse a unified framing protocol frame header from a buffer.
253 253
254 254 The header is expected to be in the buffer at offset 0 and the
255 255 buffer is expected to be large enough to hold a full header.
256 256 """
257 257 # 24 bits payload length (little endian)
258 258 # 16 bits request ID
259 259 # 8 bits stream ID
260 260 # 8 bits stream flags
261 261 # 4 bits frame type
262 262 # 4 bits frame flags
263 263 # ... payload
264 264 framelength = data[0] + 256 * data[1] + 16384 * data[2]
265 265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
266 266 typeflags = data[7]
267 267
268 268 frametype = (typeflags & 0xf0) >> 4
269 269 frameflags = typeflags & 0x0f
270 270
271 271 return frameheader(framelength, requestid, streamid, streamflags,
272 272 frametype, frameflags)
273 273
274 274 def readframe(fh):
275 275 """Read a unified framing protocol frame from a file object.
276 276
277 277 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
278 278 None if no frame is available. May raise if a malformed frame is
279 279 seen.
280 280 """
281 281 header = bytearray(FRAME_HEADER_SIZE)
282 282
283 283 readcount = fh.readinto(header)
284 284
285 285 if readcount == 0:
286 286 return None
287 287
288 288 if readcount != FRAME_HEADER_SIZE:
289 289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
290 290 (readcount, header))
291 291
292 292 h = parseheader(header)
293 293
294 294 payload = fh.read(h.length)
295 295 if len(payload) != h.length:
296 296 raise error.Abort(_('frame length error: expected %d; got %d') %
297 297 (h.length, len(payload)))
298 298
299 299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
300 300 payload)
301 301
302 302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
303 303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
304 304 redirect=None):
305 305 """Create frames necessary to transmit a request to run a command.
306 306
307 307 This is a generator of bytearrays. Each item represents a frame
308 308 ready to be sent over the wire to a peer.
309 309 """
310 310 data = {b'name': cmd}
311 311 if args:
312 312 data[b'args'] = args
313 313
314 314 if redirect:
315 315 data[b'redirect'] = redirect
316 316
317 317 data = b''.join(cborutil.streamencode(data))
318 318
319 319 offset = 0
320 320
321 321 while True:
322 322 flags = 0
323 323
324 324 # Must set new or continuation flag.
325 325 if not offset:
326 326 flags |= FLAG_COMMAND_REQUEST_NEW
327 327 else:
328 328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
329 329
330 330 # Data frames is set on all frames.
331 331 if datafh:
332 332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
333 333
334 334 payload = data[offset:offset + maxframesize]
335 335 offset += len(payload)
336 336
337 337 if len(payload) == maxframesize and offset < len(data):
338 338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
339 339
340 340 yield stream.makeframe(requestid=requestid,
341 341 typeid=FRAME_TYPE_COMMAND_REQUEST,
342 342 flags=flags,
343 343 payload=payload)
344 344
345 345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
346 346 break
347 347
348 348 if datafh:
349 349 while True:
350 350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
351 351
352 352 done = False
353 353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
354 354 flags = FLAG_COMMAND_DATA_CONTINUATION
355 355 else:
356 356 flags = FLAG_COMMAND_DATA_EOS
357 357 assert datafh.read(1) == b''
358 358 done = True
359 359
360 360 yield stream.makeframe(requestid=requestid,
361 361 typeid=FRAME_TYPE_COMMAND_DATA,
362 362 flags=flags,
363 363 payload=data)
364 364
365 365 if done:
366 366 break
367 367
368 368 def createcommandresponseframesfrombytes(stream, requestid, data,
369 369 maxframesize=DEFAULT_MAX_FRAME_SIZE):
370 370 """Create a raw frame to send a bytes response from static bytes input.
371 371
372 372 Returns a generator of bytearrays.
373 373 """
374 374 # Automatically send the overall CBOR response map.
375 375 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
376 376 if len(overall) > maxframesize:
377 377 raise error.ProgrammingError('not yet implemented')
378 378
379 379 # Simple case where we can fit the full response in a single frame.
380 380 if len(overall) + len(data) <= maxframesize:
381 381 flags = FLAG_COMMAND_RESPONSE_EOS
382 382 yield stream.makeframe(requestid=requestid,
383 383 typeid=FRAME_TYPE_COMMAND_RESPONSE,
384 384 flags=flags,
385 385 payload=overall + data)
386 386 return
387 387
388 388 # It's easier to send the overall CBOR map in its own frame than to track
389 389 # offsets.
390 390 yield stream.makeframe(requestid=requestid,
391 391 typeid=FRAME_TYPE_COMMAND_RESPONSE,
392 392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
393 393 payload=overall)
394 394
395 395 offset = 0
396 396 while True:
397 397 chunk = data[offset:offset + maxframesize]
398 398 offset += len(chunk)
399 399 done = offset == len(data)
400 400
401 401 if done:
402 402 flags = FLAG_COMMAND_RESPONSE_EOS
403 403 else:
404 404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
405 405
406 406 yield stream.makeframe(requestid=requestid,
407 407 typeid=FRAME_TYPE_COMMAND_RESPONSE,
408 408 flags=flags,
409 409 payload=chunk)
410 410
411 411 if done:
412 412 break
413 413
414 414 def createbytesresponseframesfromgen(stream, requestid, gen,
415 415 maxframesize=DEFAULT_MAX_FRAME_SIZE):
416 416 """Generator of frames from a generator of byte chunks.
417 417
418 418 This assumes that another frame will follow whatever this emits. i.e.
419 419 this always emits the continuation flag and never emits the end-of-stream
420 420 flag.
421 421 """
422 422 cb = util.chunkbuffer(gen)
423 423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
424 424
425 425 while True:
426 426 chunk = cb.read(maxframesize)
427 427 if not chunk:
428 428 break
429 429
430 430 yield stream.makeframe(requestid=requestid,
431 431 typeid=FRAME_TYPE_COMMAND_RESPONSE,
432 432 flags=flags,
433 433 payload=chunk)
434 434
435 435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
436 436
437 437 def createcommandresponseokframe(stream, requestid):
438 438 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
439 439
440 440 return stream.makeframe(requestid=requestid,
441 441 typeid=FRAME_TYPE_COMMAND_RESPONSE,
442 442 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
443 443 payload=overall)
444 444
445 445 def createcommandresponseeosframe(stream, requestid):
446 446 """Create an empty payload frame representing command end-of-stream."""
447 447 return stream.makeframe(requestid=requestid,
448 448 typeid=FRAME_TYPE_COMMAND_RESPONSE,
449 449 flags=FLAG_COMMAND_RESPONSE_EOS,
450 450 payload=b'')
451 451
452 452 def createalternatelocationresponseframe(stream, requestid, location):
453 453 data = {
454 454 b'status': b'redirect',
455 455 b'location': {
456 456 b'url': location.url,
457 457 b'mediatype': location.mediatype,
458 458 }
459 459 }
460 460
461 461 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
462 462 r'servercadercerts'):
463 463 value = getattr(location, a)
464 464 if value is not None:
465 465 data[b'location'][pycompat.bytestr(a)] = value
466 466
467 467 return stream.makeframe(requestid=requestid,
468 468 typeid=FRAME_TYPE_COMMAND_RESPONSE,
469 469 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
470 470 payload=b''.join(cborutil.streamencode(data)))
471 471
472 472 def createcommanderrorresponse(stream, requestid, message, args=None):
473 473 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
474 474 # formatting works consistently?
475 475 m = {
476 476 b'status': b'error',
477 477 b'error': {
478 478 b'message': message,
479 479 }
480 480 }
481 481
482 482 if args:
483 483 m[b'error'][b'args'] = args
484 484
485 485 overall = b''.join(cborutil.streamencode(m))
486 486
487 487 yield stream.makeframe(requestid=requestid,
488 488 typeid=FRAME_TYPE_COMMAND_RESPONSE,
489 489 flags=FLAG_COMMAND_RESPONSE_EOS,
490 490 payload=overall)
491 491
492 492 def createerrorframe(stream, requestid, msg, errtype):
493 493 # TODO properly handle frame size limits.
494 494 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
495 495
496 496 payload = b''.join(cborutil.streamencode({
497 497 b'type': errtype,
498 498 b'message': [{b'msg': msg}],
499 499 }))
500 500
501 501 yield stream.makeframe(requestid=requestid,
502 502 typeid=FRAME_TYPE_ERROR_RESPONSE,
503 503 flags=0,
504 504 payload=payload)
505 505
506 506 def createtextoutputframe(stream, requestid, atoms,
507 507 maxframesize=DEFAULT_MAX_FRAME_SIZE):
508 508 """Create a text output frame to render text to people.
509 509
510 510 ``atoms`` is a 3-tuple of (formatting string, args, labels).
511 511
512 512 The formatting string contains ``%s`` tokens to be replaced by the
513 513 corresponding indexed entry in ``args``. ``labels`` is an iterable of
514 514 formatters to be applied at rendering time. In terms of the ``ui``
515 515 class, each atom corresponds to a ``ui.write()``.
516 516 """
517 517 atomdicts = []
518 518
519 519 for (formatting, args, labels) in atoms:
520 520 # TODO look for localstr, other types here?
521 521
522 522 if not isinstance(formatting, bytes):
523 523 raise ValueError('must use bytes formatting strings')
524 524 for arg in args:
525 525 if not isinstance(arg, bytes):
526 526 raise ValueError('must use bytes for arguments')
527 527 for label in labels:
528 528 if not isinstance(label, bytes):
529 529 raise ValueError('must use bytes for labels')
530 530
531 531 # Formatting string must be ASCII.
532 532 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
533 533
534 534 # Arguments must be UTF-8.
535 535 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
536 536
537 537 # Labels must be ASCII.
538 538 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
539 539 for l in labels]
540 540
541 541 atom = {b'msg': formatting}
542 542 if args:
543 543 atom[b'args'] = args
544 544 if labels:
545 545 atom[b'labels'] = labels
546 546
547 547 atomdicts.append(atom)
548 548
549 549 payload = b''.join(cborutil.streamencode(atomdicts))
550 550
551 551 if len(payload) > maxframesize:
552 552 raise ValueError('cannot encode data in a single frame')
553 553
554 554 yield stream.makeframe(requestid=requestid,
555 555 typeid=FRAME_TYPE_TEXT_OUTPUT,
556 556 flags=0,
557 557 payload=payload)
558 558
559 559 class bufferingcommandresponseemitter(object):
560 560 """Helper object to emit command response frames intelligently.
561 561
562 562 Raw command response data is likely emitted in chunks much smaller
563 563 than what can fit in a single frame. This class exists to buffer
564 564 chunks until enough data is available to fit in a single frame.
565 565
566 566 TODO we'll need something like this when compression is supported.
567 567 So it might make sense to implement this functionality at the stream
568 568 level.
569 569 """
570 570 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
571 571 self._stream = stream
572 572 self._requestid = requestid
573 573 self._maxsize = maxframesize
574 574 self._chunks = []
575 575 self._chunkssize = 0
576 576
577 577 def send(self, data):
578 578 """Send new data for emission.
579 579
580 580 Is a generator of new frames that were derived from the new input.
581 581
582 582 If the special input ``None`` is received, flushes all buffered
583 583 data to frames.
584 584 """
585 585
586 586 if data is None:
587 587 for frame in self._flush():
588 588 yield frame
589 589 return
590 590
591 591 # There is a ton of potential to do more complicated things here.
592 592 # Our immediate goal is to coalesce small chunks into big frames,
593 593 # not achieve the fewest number of frames possible. So we go with
594 594 # a simple implementation:
595 595 #
596 596 # * If a chunk is too large for a frame, we flush and emit frames
597 597 # for the new chunk.
598 598 # * If a chunk can be buffered without total buffered size limits
599 599 # being exceeded, we do that.
600 600 # * If a chunk causes us to go over our buffering limit, we flush
601 601 # and then buffer the new chunk.
602 602
603 603 if len(data) > self._maxsize:
604 604 for frame in self._flush():
605 605 yield frame
606 606
607 607 # Now emit frames for the big chunk.
608 608 offset = 0
609 609 while True:
610 610 chunk = data[offset:offset + self._maxsize]
611 611 offset += len(chunk)
612 612
613 613 yield self._stream.makeframe(
614 614 self._requestid,
615 615 typeid=FRAME_TYPE_COMMAND_RESPONSE,
616 616 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
617 617 payload=chunk)
618 618
619 619 if offset == len(data):
620 620 return
621 621
622 622 # If we don't have enough to constitute a full frame, buffer and
623 623 # return.
624 624 if len(data) + self._chunkssize < self._maxsize:
625 625 self._chunks.append(data)
626 626 self._chunkssize += len(data)
627 627 return
628 628
629 629 # Else flush what we have and buffer the new chunk. We could do
630 630 # something more intelligent here, like break the chunk. Let's
631 631 # keep things simple for now.
632 632 for frame in self._flush():
633 633 yield frame
634 634
635 635 self._chunks.append(data)
636 636 self._chunkssize = len(data)
637 637
638 638 def _flush(self):
639 639 payload = b''.join(self._chunks)
640 640 assert len(payload) <= self._maxsize
641 641
642 642 self._chunks[:] = []
643 643 self._chunkssize = 0
644 644
645 645 yield self._stream.makeframe(
646 646 self._requestid,
647 647 typeid=FRAME_TYPE_COMMAND_RESPONSE,
648 648 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
649 649 payload=payload)
650 650
651 651 class stream(object):
652 652 """Represents a logical unidirectional series of frames."""
653 653
654 654 def __init__(self, streamid, active=False):
655 655 self.streamid = streamid
656 656 self._active = active
657 657
658 658 def makeframe(self, requestid, typeid, flags, payload):
659 659 """Create a frame to be sent out over this stream.
660 660
661 661 Only returns the frame instance. Does not actually send it.
662 662 """
663 663 streamflags = 0
664 664 if not self._active:
665 665 streamflags |= STREAM_FLAG_BEGIN_STREAM
666 666 self._active = True
667 667
668 668 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
669 669 payload)
670 670
671 class inputstream(stream):
672 """Represents a stream used for receiving data."""
673
671 674 def setdecoder(self, name, extraobjs):
672 675 """Set the decoder for this stream.
673 676
674 677 Receives the stream profile name and any additional CBOR objects
675 678 decoded from the stream encoding settings frame payloads.
676 679 """
677 680
681 class outputstream(stream):
682 """Represents a stream used for sending data."""
683
678 684 def ensureserverstream(stream):
679 685 if stream.streamid % 2:
680 686 raise error.ProgrammingError('server should only write to even '
681 687 'numbered streams; %d is not even' %
682 688 stream.streamid)
683 689
684 690 DEFAULT_PROTOCOL_SETTINGS = {
685 691 'contentencodings': [b'identity'],
686 692 }
687 693
688 694 class serverreactor(object):
689 695 """Holds state of a server handling frame-based protocol requests.
690 696
691 697 This class is the "brain" of the unified frame-based protocol server
692 698 component. While the protocol is stateless from the perspective of
693 699 requests/commands, something needs to track which frames have been
694 700 received, what frames to expect, etc. This class is that thing.
695 701
696 702 Instances are modeled as a state machine of sorts. Instances are also
697 703 reactionary to external events. The point of this class is to encapsulate
698 704 the state of the connection and the exchange of frames, not to perform
699 705 work. Instead, callers tell this class when something occurs, like a
700 706 frame arriving. If that activity is worthy of a follow-up action (say
701 707 *run a command*), the return value of that handler will say so.
702 708
703 709 I/O and CPU intensive operations are purposefully delegated outside of
704 710 this class.
705 711
706 712 Consumers are expected to tell instances when events occur. They do so by
707 713 calling the various ``on*`` methods. These methods return a 2-tuple
708 714 describing any follow-up action(s) to take. The first element is the
709 715 name of an action to perform. The second is a data structure (usually
710 716 a dict) specific to that action that contains more information. e.g.
711 717 if the server wants to send frames back to the client, the data structure
712 718 will contain a reference to those frames.
713 719
714 720 Valid actions that consumers can be instructed to take are:
715 721
716 722 sendframes
717 723 Indicates that frames should be sent to the client. The ``framegen``
718 724 key contains a generator of frames that should be sent. The server
719 725 assumes that all frames are sent to the client.
720 726
721 727 error
722 728 Indicates that an error occurred. Consumer should probably abort.
723 729
724 730 runcommand
725 731 Indicates that the consumer should run a wire protocol command. Details
726 732 of the command to run are given in the data structure.
727 733
728 734 wantframe
729 735 Indicates that nothing of interest happened and the server is waiting on
730 736 more frames from the client before anything interesting can be done.
731 737
732 738 noop
733 739 Indicates no additional action is required.
734 740
735 741 Known Issues
736 742 ------------
737 743
738 744 There are no limits to the number of partially received commands or their
739 745 size. A malicious client could stream command request data and exhaust the
740 746 server's memory.
741 747
742 748 Partially received commands are not acted upon when end of input is
743 749 reached. Should the server error if it receives a partial request?
744 750 Should the client send a message to abort a partially transmitted request
745 751 to facilitate graceful shutdown?
746 752
747 753 Active requests that haven't been responded to aren't tracked. This means
748 754 that if we receive a command and instruct its dispatch, another command
749 755 with its request ID can come in over the wire and there will be a race
750 756 between who responds to what.
751 757 """
752 758
753 759 def __init__(self, ui, deferoutput=False):
754 760 """Construct a new server reactor.
755 761
756 762 ``deferoutput`` can be used to indicate that no output frames should be
757 763 instructed to be sent until input has been exhausted. In this mode,
758 764 events that would normally generate output frames (such as a command
759 765 response being ready) will instead defer instructing the consumer to
760 766 send those frames. This is useful for half-duplex transports where the
761 767 sender cannot receive until all data has been transmitted.
762 768 """
763 769 self._ui = ui
764 770 self._deferoutput = deferoutput
765 771 self._state = 'initial'
766 772 self._nextoutgoingstreamid = 2
767 773 self._bufferedframegens = []
768 774 # stream id -> stream instance for all active streams from the client.
769 775 self._incomingstreams = {}
770 776 self._outgoingstreams = {}
771 777 # request id -> dict of commands that are actively being received.
772 778 self._receivingcommands = {}
773 779 # Request IDs that have been received and are actively being processed.
774 780 # Once all output for a request has been sent, it is removed from this
775 781 # set.
776 782 self._activecommands = set()
777 783
778 784 self._protocolsettingsdecoder = None
779 785
780 786 # Sender protocol settings are optional. Set implied default values.
781 787 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
782 788
783 789 def onframerecv(self, frame):
784 790 """Process a frame that has been received off the wire.
785 791
786 792 Returns a dict with an ``action`` key that details what action,
787 793 if any, the consumer should take next.
788 794 """
789 795 if not frame.streamid % 2:
790 796 self._state = 'errored'
791 797 return self._makeerrorresult(
792 798 _('received frame with even numbered stream ID: %d') %
793 799 frame.streamid)
794 800
795 801 if frame.streamid not in self._incomingstreams:
796 802 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
797 803 self._state = 'errored'
798 804 return self._makeerrorresult(
799 805 _('received frame on unknown inactive stream without '
800 806 'beginning of stream flag set'))
801 807
802 self._incomingstreams[frame.streamid] = stream(frame.streamid)
808 self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
803 809
804 810 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
805 811 # TODO handle decoding frames
806 812 self._state = 'errored'
807 813 raise error.ProgrammingError('support for decoding stream payloads '
808 814 'not yet implemented')
809 815
810 816 if frame.streamflags & STREAM_FLAG_END_STREAM:
811 817 del self._incomingstreams[frame.streamid]
812 818
813 819 handlers = {
814 820 'initial': self._onframeinitial,
815 821 'protocol-settings-receiving': self._onframeprotocolsettings,
816 822 'idle': self._onframeidle,
817 823 'command-receiving': self._onframecommandreceiving,
818 824 'errored': self._onframeerrored,
819 825 }
820 826
821 827 meth = handlers.get(self._state)
822 828 if not meth:
823 829 raise error.ProgrammingError('unhandled state: %s' % self._state)
824 830
825 831 return meth(frame)
826 832
827 833 def oncommandresponseready(self, stream, requestid, data):
828 834 """Signal that a bytes response is ready to be sent to the client.
829 835
830 836 The raw bytes response is passed as an argument.
831 837 """
832 838 ensureserverstream(stream)
833 839
834 840 def sendframes():
835 841 for frame in createcommandresponseframesfrombytes(stream, requestid,
836 842 data):
837 843 yield frame
838 844
839 845 self._activecommands.remove(requestid)
840 846
841 847 result = sendframes()
842 848
843 849 if self._deferoutput:
844 850 self._bufferedframegens.append(result)
845 851 return 'noop', {}
846 852 else:
847 853 return 'sendframes', {
848 854 'framegen': result,
849 855 }
850 856
851 857 def oncommandresponsereadyobjects(self, stream, requestid, objs):
852 858 """Signal that objects are ready to be sent to the client.
853 859
854 860 ``objs`` is an iterable of objects (typically a generator) that will
855 861 be encoded via CBOR and added to frames, which will be sent to the
856 862 client.
857 863 """
858 864 ensureserverstream(stream)
859 865
860 866 # We need to take care over exception handling. Uncaught exceptions
861 867 # when generating frames could lead to premature end of the frame
862 868 # stream and the possibility of the server or client process getting
863 869 # in a bad state.
864 870 #
865 871 # Keep in mind that if ``objs`` is a generator, advancing it could
866 872 # raise exceptions that originated in e.g. wire protocol command
867 873 # functions. That is why we differentiate between exceptions raised
868 874 # when iterating versus other exceptions that occur.
869 875 #
870 876 # In all cases, when the function finishes, the request is fully
871 877 # handled and no new frames for it should be seen.
872 878
873 879 def sendframes():
874 880 emitted = False
875 881 alternatelocationsent = False
876 882 emitter = bufferingcommandresponseemitter(stream, requestid)
877 883 while True:
878 884 try:
879 885 o = next(objs)
880 886 except StopIteration:
881 887 for frame in emitter.send(None):
882 888 yield frame
883 889
884 890 if emitted:
885 891 yield createcommandresponseeosframe(stream, requestid)
886 892 break
887 893
888 894 except error.WireprotoCommandError as e:
889 895 for frame in createcommanderrorresponse(
890 896 stream, requestid, e.message, e.messageargs):
891 897 yield frame
892 898 break
893 899
894 900 except Exception as e:
895 901 for frame in createerrorframe(
896 902 stream, requestid, '%s' % stringutil.forcebytestr(e),
897 903 errtype='server'):
898 904
899 905 yield frame
900 906
901 907 break
902 908
903 909 try:
904 910 # Alternate location responses can only be the first and
905 911 # only object in the output stream.
906 912 if isinstance(o, wireprototypes.alternatelocationresponse):
907 913 if emitted:
908 914 raise error.ProgrammingError(
909 915 'alternatelocationresponse seen after initial '
910 916 'output object')
911 917
912 918 yield createalternatelocationresponseframe(
913 919 stream, requestid, o)
914 920
915 921 alternatelocationsent = True
916 922 emitted = True
917 923 continue
918 924
919 925 if alternatelocationsent:
920 926 raise error.ProgrammingError(
921 927 'object follows alternatelocationresponse')
922 928
923 929 if not emitted:
924 930 yield createcommandresponseokframe(stream, requestid)
925 931 emitted = True
926 932
927 933 # Objects emitted by command functions can be serializable
928 934 # data structures or special types.
929 935 # TODO consider extracting the content normalization to a
930 936 # standalone function, as it may be useful for e.g. cachers.
931 937
932 938 # A pre-encoded object is sent directly to the emitter.
933 939 if isinstance(o, wireprototypes.encodedresponse):
934 940 for frame in emitter.send(o.data):
935 941 yield frame
936 942
937 943 # A regular object is CBOR encoded.
938 944 else:
939 945 for chunk in cborutil.streamencode(o):
940 946 for frame in emitter.send(chunk):
941 947 yield frame
942 948
943 949 except Exception as e:
944 950 for frame in createerrorframe(stream, requestid,
945 951 '%s' % e,
946 952 errtype='server'):
947 953 yield frame
948 954
949 955 break
950 956
951 957 self._activecommands.remove(requestid)
952 958
953 959 return self._handlesendframes(sendframes())
954 960
955 961 def oninputeof(self):
956 962 """Signals that end of input has been received.
957 963
958 964 No more frames will be received. All pending activity should be
959 965 completed.
960 966 """
961 967 # TODO should we do anything about in-flight commands?
962 968
963 969 if not self._deferoutput or not self._bufferedframegens:
964 970 return 'noop', {}
965 971
966 972 # If we buffered all our responses, emit those.
967 973 def makegen():
968 974 for gen in self._bufferedframegens:
969 975 for frame in gen:
970 976 yield frame
971 977
972 978 return 'sendframes', {
973 979 'framegen': makegen(),
974 980 }
975 981
976 982 def _handlesendframes(self, framegen):
977 983 if self._deferoutput:
978 984 self._bufferedframegens.append(framegen)
979 985 return 'noop', {}
980 986 else:
981 987 return 'sendframes', {
982 988 'framegen': framegen,
983 989 }
984 990
985 991 def onservererror(self, stream, requestid, msg):
986 992 ensureserverstream(stream)
987 993
988 994 def sendframes():
989 995 for frame in createerrorframe(stream, requestid, msg,
990 996 errtype='server'):
991 997 yield frame
992 998
993 999 self._activecommands.remove(requestid)
994 1000
995 1001 return self._handlesendframes(sendframes())
996 1002
997 1003 def oncommanderror(self, stream, requestid, message, args=None):
998 1004 """Called when a command encountered an error before sending output."""
999 1005 ensureserverstream(stream)
1000 1006
1001 1007 def sendframes():
1002 1008 for frame in createcommanderrorresponse(stream, requestid, message,
1003 1009 args):
1004 1010 yield frame
1005 1011
1006 1012 self._activecommands.remove(requestid)
1007 1013
1008 1014 return self._handlesendframes(sendframes())
1009 1015
1010 1016 def makeoutputstream(self):
1011 1017 """Create a stream to be used for sending data to the client."""
1012 1018 streamid = self._nextoutgoingstreamid
1013 1019 self._nextoutgoingstreamid += 2
1014 1020
1015 s = stream(streamid)
1021 s = outputstream(streamid)
1016 1022 self._outgoingstreams[streamid] = s
1017 1023
1018 1024 return s
1019 1025
1020 1026 def _makeerrorresult(self, msg):
1021 1027 return 'error', {
1022 1028 'message': msg,
1023 1029 }
1024 1030
1025 1031 def _makeruncommandresult(self, requestid):
1026 1032 entry = self._receivingcommands[requestid]
1027 1033
1028 1034 if not entry['requestdone']:
1029 1035 self._state = 'errored'
1030 1036 raise error.ProgrammingError('should not be called without '
1031 1037 'requestdone set')
1032 1038
1033 1039 del self._receivingcommands[requestid]
1034 1040
1035 1041 if self._receivingcommands:
1036 1042 self._state = 'command-receiving'
1037 1043 else:
1038 1044 self._state = 'idle'
1039 1045
1040 1046 # Decode the payloads as CBOR.
1041 1047 entry['payload'].seek(0)
1042 1048 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1043 1049
1044 1050 if b'name' not in request:
1045 1051 self._state = 'errored'
1046 1052 return self._makeerrorresult(
1047 1053 _('command request missing "name" field'))
1048 1054
1049 1055 if b'args' not in request:
1050 1056 request[b'args'] = {}
1051 1057
1052 1058 assert requestid not in self._activecommands
1053 1059 self._activecommands.add(requestid)
1054 1060
1055 1061 return 'runcommand', {
1056 1062 'requestid': requestid,
1057 1063 'command': request[b'name'],
1058 1064 'args': request[b'args'],
1059 1065 'redirect': request.get(b'redirect'),
1060 1066 'data': entry['data'].getvalue() if entry['data'] else None,
1061 1067 }
1062 1068
1063 1069 def _makewantframeresult(self):
1064 1070 return 'wantframe', {
1065 1071 'state': self._state,
1066 1072 }
1067 1073
1068 1074 def _validatecommandrequestframe(self, frame):
1069 1075 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1070 1076 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1071 1077
1072 1078 if new and continuation:
1073 1079 self._state = 'errored'
1074 1080 return self._makeerrorresult(
1075 1081 _('received command request frame with both new and '
1076 1082 'continuation flags set'))
1077 1083
1078 1084 if not new and not continuation:
1079 1085 self._state = 'errored'
1080 1086 return self._makeerrorresult(
1081 1087 _('received command request frame with neither new nor '
1082 1088 'continuation flags set'))
1083 1089
1084 1090 def _onframeinitial(self, frame):
1085 1091 # Called when we receive a frame when in the "initial" state.
1086 1092 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1087 1093 self._state = 'protocol-settings-receiving'
1088 1094 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1089 1095 return self._onframeprotocolsettings(frame)
1090 1096
1091 1097 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1092 1098 self._state = 'idle'
1093 1099 return self._onframeidle(frame)
1094 1100
1095 1101 else:
1096 1102 self._state = 'errored'
1097 1103 return self._makeerrorresult(
1098 1104 _('expected sender protocol settings or command request '
1099 1105 'frame; got %d') % frame.typeid)
1100 1106
1101 1107 def _onframeprotocolsettings(self, frame):
1102 1108 assert self._state == 'protocol-settings-receiving'
1103 1109 assert self._protocolsettingsdecoder is not None
1104 1110
1105 1111 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1106 1112 self._state = 'errored'
1107 1113 return self._makeerrorresult(
1108 1114 _('expected sender protocol settings frame; got %d') %
1109 1115 frame.typeid)
1110 1116
1111 1117 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1112 1118 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1113 1119
1114 1120 if more and eos:
1115 1121 self._state = 'errored'
1116 1122 return self._makeerrorresult(
1117 1123 _('sender protocol settings frame cannot have both '
1118 1124 'continuation and end of stream flags set'))
1119 1125
1120 1126 if not more and not eos:
1121 1127 self._state = 'errored'
1122 1128 return self._makeerrorresult(
1123 1129 _('sender protocol settings frame must have continuation or '
1124 1130 'end of stream flag set'))
1125 1131
1126 1132 # TODO establish limits for maximum amount of data that can be
1127 1133 # buffered.
1128 1134 try:
1129 1135 self._protocolsettingsdecoder.decode(frame.payload)
1130 1136 except Exception as e:
1131 1137 self._state = 'errored'
1132 1138 return self._makeerrorresult(
1133 1139 _('error decoding CBOR from sender protocol settings frame: %s')
1134 1140 % stringutil.forcebytestr(e))
1135 1141
1136 1142 if more:
1137 1143 return self._makewantframeresult()
1138 1144
1139 1145 assert eos
1140 1146
1141 1147 decoded = self._protocolsettingsdecoder.getavailable()
1142 1148 self._protocolsettingsdecoder = None
1143 1149
1144 1150 if not decoded:
1145 1151 self._state = 'errored'
1146 1152 return self._makeerrorresult(
1147 1153 _('sender protocol settings frame did not contain CBOR data'))
1148 1154 elif len(decoded) > 1:
1149 1155 self._state = 'errored'
1150 1156 return self._makeerrorresult(
1151 1157 _('sender protocol settings frame contained multiple CBOR '
1152 1158 'values'))
1153 1159
1154 1160 d = decoded[0]
1155 1161
1156 1162 if b'contentencodings' in d:
1157 1163 self._sendersettings['contentencodings'] = d[b'contentencodings']
1158 1164
1159 1165 self._state = 'idle'
1160 1166
1161 1167 return self._makewantframeresult()
1162 1168
1163 1169 def _onframeidle(self, frame):
1164 1170 # The only frame type that should be received in this state is a
1165 1171 # command request.
1166 1172 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1167 1173 self._state = 'errored'
1168 1174 return self._makeerrorresult(
1169 1175 _('expected command request frame; got %d') % frame.typeid)
1170 1176
1171 1177 res = self._validatecommandrequestframe(frame)
1172 1178 if res:
1173 1179 return res
1174 1180
1175 1181 if frame.requestid in self._receivingcommands:
1176 1182 self._state = 'errored'
1177 1183 return self._makeerrorresult(
1178 1184 _('request with ID %d already received') % frame.requestid)
1179 1185
1180 1186 if frame.requestid in self._activecommands:
1181 1187 self._state = 'errored'
1182 1188 return self._makeerrorresult(
1183 1189 _('request with ID %d is already active') % frame.requestid)
1184 1190
1185 1191 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1186 1192 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1187 1193 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1188 1194
1189 1195 if not new:
1190 1196 self._state = 'errored'
1191 1197 return self._makeerrorresult(
1192 1198 _('received command request frame without new flag set'))
1193 1199
1194 1200 payload = util.bytesio()
1195 1201 payload.write(frame.payload)
1196 1202
1197 1203 self._receivingcommands[frame.requestid] = {
1198 1204 'payload': payload,
1199 1205 'data': None,
1200 1206 'requestdone': not moreframes,
1201 1207 'expectingdata': bool(expectingdata),
1202 1208 }
1203 1209
1204 1210 # This is the final frame for this request. Dispatch it.
1205 1211 if not moreframes and not expectingdata:
1206 1212 return self._makeruncommandresult(frame.requestid)
1207 1213
1208 1214 assert moreframes or expectingdata
1209 1215 self._state = 'command-receiving'
1210 1216 return self._makewantframeresult()
1211 1217
1212 1218 def _onframecommandreceiving(self, frame):
1213 1219 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1214 1220 # Process new command requests as such.
1215 1221 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1216 1222 return self._onframeidle(frame)
1217 1223
1218 1224 res = self._validatecommandrequestframe(frame)
1219 1225 if res:
1220 1226 return res
1221 1227
1222 1228 # All other frames should be related to a command that is currently
1223 1229 # receiving but is not active.
1224 1230 if frame.requestid in self._activecommands:
1225 1231 self._state = 'errored'
1226 1232 return self._makeerrorresult(
1227 1233 _('received frame for request that is still active: %d') %
1228 1234 frame.requestid)
1229 1235
1230 1236 if frame.requestid not in self._receivingcommands:
1231 1237 self._state = 'errored'
1232 1238 return self._makeerrorresult(
1233 1239 _('received frame for request that is not receiving: %d') %
1234 1240 frame.requestid)
1235 1241
1236 1242 entry = self._receivingcommands[frame.requestid]
1237 1243
1238 1244 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1239 1245 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1240 1246 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1241 1247
1242 1248 if entry['requestdone']:
1243 1249 self._state = 'errored'
1244 1250 return self._makeerrorresult(
1245 1251 _('received command request frame when request frames '
1246 1252 'were supposedly done'))
1247 1253
1248 1254 if expectingdata != entry['expectingdata']:
1249 1255 self._state = 'errored'
1250 1256 return self._makeerrorresult(
1251 1257 _('mismatch between expect data flag and previous frame'))
1252 1258
1253 1259 entry['payload'].write(frame.payload)
1254 1260
1255 1261 if not moreframes:
1256 1262 entry['requestdone'] = True
1257 1263
1258 1264 if not moreframes and not expectingdata:
1259 1265 return self._makeruncommandresult(frame.requestid)
1260 1266
1261 1267 return self._makewantframeresult()
1262 1268
1263 1269 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1264 1270 if not entry['expectingdata']:
1265 1271 self._state = 'errored'
1266 1272 return self._makeerrorresult(_(
1267 1273 'received command data frame for request that is not '
1268 1274 'expecting data: %d') % frame.requestid)
1269 1275
1270 1276 if entry['data'] is None:
1271 1277 entry['data'] = util.bytesio()
1272 1278
1273 1279 return self._handlecommanddataframe(frame, entry)
1274 1280 else:
1275 1281 self._state = 'errored'
1276 1282 return self._makeerrorresult(_(
1277 1283 'received unexpected frame type: %d') % frame.typeid)
1278 1284
1279 1285 def _handlecommanddataframe(self, frame, entry):
1280 1286 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1281 1287
1282 1288 # TODO support streaming data instead of buffering it.
1283 1289 entry['data'].write(frame.payload)
1284 1290
1285 1291 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1286 1292 return self._makewantframeresult()
1287 1293 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1288 1294 entry['data'].seek(0)
1289 1295 return self._makeruncommandresult(frame.requestid)
1290 1296 else:
1291 1297 self._state = 'errored'
1292 1298 return self._makeerrorresult(_('command data frame without '
1293 1299 'flags'))
1294 1300
1295 1301 def _onframeerrored(self, frame):
1296 1302 return self._makeerrorresult(_('server already errored'))
1297 1303
1298 1304 class commandrequest(object):
1299 1305 """Represents a request to run a command."""
1300 1306
1301 1307 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1302 1308 self.requestid = requestid
1303 1309 self.name = name
1304 1310 self.args = args
1305 1311 self.datafh = datafh
1306 1312 self.redirect = redirect
1307 1313 self.state = 'pending'
1308 1314
1309 1315 class clientreactor(object):
1310 1316 """Holds state of a client issuing frame-based protocol requests.
1311 1317
1312 1318 This is like ``serverreactor`` but for client-side state.
1313 1319
1314 1320 Each instance is bound to the lifetime of a connection. For persistent
1315 1321 connection transports using e.g. TCP sockets and speaking the raw
1316 1322 framing protocol, there will be a single instance for the lifetime of
1317 1323 the TCP socket. For transports where there are multiple discrete
1318 1324 interactions (say tunneled within in HTTP request), there will be a
1319 1325 separate instance for each distinct interaction.
1320 1326
1321 1327 Consumers are expected to tell instances when events occur by calling
1322 1328 various methods. These methods return a 2-tuple describing any follow-up
1323 1329 action(s) to take. The first element is the name of an action to
1324 1330 perform. The second is a data structure (usually a dict) specific to
1325 1331 that action that contains more information. e.g. if the reactor wants
1326 1332 to send frames to the server, the data structure will contain a reference
1327 1333 to those frames.
1328 1334
1329 1335 Valid actions that consumers can be instructed to take are:
1330 1336
1331 1337 noop
1332 1338 Indicates no additional action is required.
1333 1339
1334 1340 sendframes
1335 1341 Indicates that frames should be sent to the server. The ``framegen``
1336 1342 key contains a generator of frames that should be sent. The reactor
1337 1343 assumes that all frames in this generator are sent to the server.
1338 1344
1339 1345 error
1340 1346 Indicates that an error occurred. The ``message`` key contains an
1341 1347 error message describing the failure.
1342 1348
1343 1349 responsedata
1344 1350 Indicates a response to a previously-issued command was received.
1345 1351
1346 1352 The ``request`` key contains the ``commandrequest`` instance that
1347 1353 represents the request this data is for.
1348 1354
1349 1355 The ``data`` key contains the decoded data from the server.
1350 1356
1351 1357 ``expectmore`` and ``eos`` evaluate to True when more response data
1352 1358 is expected to follow or we're at the end of the response stream,
1353 1359 respectively.
1354 1360 """
1355 1361 def __init__(self, ui, hasmultiplesend=False, buffersends=True):
1356 1362 """Create a new instance.
1357 1363
1358 1364 ``hasmultiplesend`` indicates whether multiple sends are supported
1359 1365 by the transport. When True, it is possible to send commands immediately
1360 1366 instead of buffering until the caller signals an intent to finish a
1361 1367 send operation.
1362 1368
1363 1369 ``buffercommands`` indicates whether sends should be buffered until the
1364 1370 last request has been issued.
1365 1371 """
1366 1372 self._ui = ui
1367 1373 self._hasmultiplesend = hasmultiplesend
1368 1374 self._buffersends = buffersends
1369 1375
1370 1376 self._canissuecommands = True
1371 1377 self._cansend = True
1372 1378
1373 1379 self._nextrequestid = 1
1374 1380 # We only support a single outgoing stream for now.
1375 self._outgoingstream = stream(1)
1381 self._outgoingstream = outputstream(1)
1376 1382 self._pendingrequests = collections.deque()
1377 1383 self._activerequests = {}
1378 1384 self._incomingstreams = {}
1379 1385 self._streamsettingsdecoders = {}
1380 1386
1381 1387 def callcommand(self, name, args, datafh=None, redirect=None):
1382 1388 """Request that a command be executed.
1383 1389
1384 1390 Receives the command name, a dict of arguments to pass to the command,
1385 1391 and an optional file object containing the raw data for the command.
1386 1392
1387 1393 Returns a 3-tuple of (request, action, action data).
1388 1394 """
1389 1395 if not self._canissuecommands:
1390 1396 raise error.ProgrammingError('cannot issue new commands')
1391 1397
1392 1398 requestid = self._nextrequestid
1393 1399 self._nextrequestid += 2
1394 1400
1395 1401 request = commandrequest(requestid, name, args, datafh=datafh,
1396 1402 redirect=redirect)
1397 1403
1398 1404 if self._buffersends:
1399 1405 self._pendingrequests.append(request)
1400 1406 return request, 'noop', {}
1401 1407 else:
1402 1408 if not self._cansend:
1403 1409 raise error.ProgrammingError('sends cannot be performed on '
1404 1410 'this instance')
1405 1411
1406 1412 if not self._hasmultiplesend:
1407 1413 self._cansend = False
1408 1414 self._canissuecommands = False
1409 1415
1410 1416 return request, 'sendframes', {
1411 1417 'framegen': self._makecommandframes(request),
1412 1418 }
1413 1419
1414 1420 def flushcommands(self):
1415 1421 """Request that all queued commands be sent.
1416 1422
1417 1423 If any commands are buffered, this will instruct the caller to send
1418 1424 them over the wire. If no commands are buffered it instructs the client
1419 1425 to no-op.
1420 1426
1421 1427 If instances aren't configured for multiple sends, no new command
1422 1428 requests are allowed after this is called.
1423 1429 """
1424 1430 if not self._pendingrequests:
1425 1431 return 'noop', {}
1426 1432
1427 1433 if not self._cansend:
1428 1434 raise error.ProgrammingError('sends cannot be performed on this '
1429 1435 'instance')
1430 1436
1431 1437 # If the instance only allows sending once, mark that we have fired
1432 1438 # our one shot.
1433 1439 if not self._hasmultiplesend:
1434 1440 self._canissuecommands = False
1435 1441 self._cansend = False
1436 1442
1437 1443 def makeframes():
1438 1444 while self._pendingrequests:
1439 1445 request = self._pendingrequests.popleft()
1440 1446 for frame in self._makecommandframes(request):
1441 1447 yield frame
1442 1448
1443 1449 return 'sendframes', {
1444 1450 'framegen': makeframes(),
1445 1451 }
1446 1452
1447 1453 def _makecommandframes(self, request):
1448 1454 """Emit frames to issue a command request.
1449 1455
1450 1456 As a side-effect, update request accounting to reflect its changed
1451 1457 state.
1452 1458 """
1453 1459 self._activerequests[request.requestid] = request
1454 1460 request.state = 'sending'
1455 1461
1456 1462 res = createcommandframes(self._outgoingstream,
1457 1463 request.requestid,
1458 1464 request.name,
1459 1465 request.args,
1460 1466 datafh=request.datafh,
1461 1467 redirect=request.redirect)
1462 1468
1463 1469 for frame in res:
1464 1470 yield frame
1465 1471
1466 1472 request.state = 'sent'
1467 1473
1468 1474 def onframerecv(self, frame):
1469 1475 """Process a frame that has been received off the wire.
1470 1476
1471 1477 Returns a 2-tuple of (action, meta) describing further action the
1472 1478 caller needs to take as a result of receiving this frame.
1473 1479 """
1474 1480 if frame.streamid % 2:
1475 1481 return 'error', {
1476 1482 'message': (
1477 1483 _('received frame with odd numbered stream ID: %d') %
1478 1484 frame.streamid),
1479 1485 }
1480 1486
1481 1487 if frame.streamid not in self._incomingstreams:
1482 1488 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1483 1489 return 'error', {
1484 1490 'message': _('received frame on unknown stream '
1485 1491 'without beginning of stream flag set'),
1486 1492 }
1487 1493
1488 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1494 self._incomingstreams[frame.streamid] = inputstream(
1495 frame.streamid)
1489 1496
1490 1497 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1491 1498 raise error.ProgrammingError('support for decoding stream '
1492 1499 'payloads not yet implemneted')
1493 1500
1494 1501 if frame.streamflags & STREAM_FLAG_END_STREAM:
1495 1502 del self._incomingstreams[frame.streamid]
1496 1503
1497 1504 if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
1498 1505 return self._onstreamsettingsframe(frame)
1499 1506
1500 1507 if frame.requestid not in self._activerequests:
1501 1508 return 'error', {
1502 1509 'message': (_('received frame for inactive request ID: %d') %
1503 1510 frame.requestid),
1504 1511 }
1505 1512
1506 1513 request = self._activerequests[frame.requestid]
1507 1514 request.state = 'receiving'
1508 1515
1509 1516 handlers = {
1510 1517 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1511 1518 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1512 1519 }
1513 1520
1514 1521 meth = handlers.get(frame.typeid)
1515 1522 if not meth:
1516 1523 raise error.ProgrammingError('unhandled frame type: %d' %
1517 1524 frame.typeid)
1518 1525
1519 1526 return meth(request, frame)
1520 1527
1521 1528 def _onstreamsettingsframe(self, frame):
1522 1529 assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS
1523 1530
1524 1531 more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
1525 1532 eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
1526 1533
1527 1534 if more and eos:
1528 1535 return 'error', {
1529 1536 'message': (_('stream encoding settings frame cannot have both '
1530 1537 'continuation and end of stream flags set')),
1531 1538 }
1532 1539
1533 1540 if not more and not eos:
1534 1541 return 'error', {
1535 1542 'message': _('stream encoding settings frame must have '
1536 1543 'continuation or end of stream flag set'),
1537 1544 }
1538 1545
1539 1546 if frame.streamid not in self._streamsettingsdecoders:
1540 1547 decoder = cborutil.bufferingdecoder()
1541 1548 self._streamsettingsdecoders[frame.streamid] = decoder
1542 1549
1543 1550 decoder = self._streamsettingsdecoders[frame.streamid]
1544 1551
1545 1552 try:
1546 1553 decoder.decode(frame.payload)
1547 1554 except Exception as e:
1548 1555 return 'error', {
1549 1556 'message': (_('error decoding CBOR from stream encoding '
1550 1557 'settings frame: %s') %
1551 1558 stringutil.forcebytestr(e)),
1552 1559 }
1553 1560
1554 1561 if more:
1555 1562 return 'noop', {}
1556 1563
1557 1564 assert eos
1558 1565
1559 1566 decoded = decoder.getavailable()
1560 1567 del self._streamsettingsdecoders[frame.streamid]
1561 1568
1562 1569 if not decoded:
1563 1570 return 'error', {
1564 1571 'message': _('stream encoding settings frame did not contain '
1565 1572 'CBOR data'),
1566 1573 }
1567 1574
1568 1575 try:
1569 1576 self._incomingstreams[frame.streamid].setdecoder(decoded[0],
1570 1577 decoded[1:])
1571 1578 except Exception as e:
1572 1579 return 'error', {
1573 1580 'message': (_('error setting stream decoder: %s') %
1574 1581 stringutil.forcebytestr(e)),
1575 1582 }
1576 1583
1577 1584 return 'noop', {}
1578 1585
1579 1586 def _oncommandresponseframe(self, request, frame):
1580 1587 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1581 1588 request.state = 'received'
1582 1589 del self._activerequests[request.requestid]
1583 1590
1584 1591 return 'responsedata', {
1585 1592 'request': request,
1586 1593 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1587 1594 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1588 1595 'data': frame.payload,
1589 1596 }
1590 1597
1591 1598 def _onerrorresponseframe(self, request, frame):
1592 1599 request.state = 'errored'
1593 1600 del self._activerequests[request.requestid]
1594 1601
1595 1602 # The payload should be a CBOR map.
1596 1603 m = cborutil.decodeall(frame.payload)[0]
1597 1604
1598 1605 return 'error', {
1599 1606 'request': request,
1600 1607 'type': m['type'],
1601 1608 'message': m['message'],
1602 1609 }
General Comments 0
You need to be logged in to leave comments. Login now