##// END OF EJS Templates
typing: suppress bogus pytype errors in `mercurial/wireprotoframing.py`...
Matt Harbison -
r53021:0c260e71 default
parent child Browse files
Show More
@@ -1,2094 +1,2096
1 1 # wireprotoframing.py - unified framing protocol for wire protocol
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 # This file contains functionality to support the unified frame-based wire
9 9 # protocol. For details about the protocol, see
10 10 # `hg help internals.wireprotocol`.
11 11
12 12 from __future__ import annotations
13 13
14 14 import collections
15 15 import struct
16 16 import typing
17 17
18 18 from .i18n import _
19 19 from .thirdparty import attr
20 20
21 21 # Force pytype to use the non-vendored package
22 22 if typing.TYPE_CHECKING:
23 23 # noinspection PyPackageRequirements
24 24 import attr
25 25
26 26 from . import (
27 27 encoding,
28 28 error,
29 29 pycompat,
30 30 util,
31 31 wireprototypes,
32 32 )
33 33 from .utils import (
34 34 cborutil,
35 35 stringutil,
36 36 )
37 37
38 38 FRAME_HEADER_SIZE = 8
39 39 DEFAULT_MAX_FRAME_SIZE = 32768
40 40
41 41 STREAM_FLAG_BEGIN_STREAM = 0x01
42 42 STREAM_FLAG_END_STREAM = 0x02
43 43 STREAM_FLAG_ENCODING_APPLIED = 0x04
44 44
45 45 STREAM_FLAGS = {
46 46 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
47 47 b'stream-end': STREAM_FLAG_END_STREAM,
48 48 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
49 49 }
50 50
51 51 FRAME_TYPE_COMMAND_REQUEST = 0x01
52 52 FRAME_TYPE_COMMAND_DATA = 0x02
53 53 FRAME_TYPE_COMMAND_RESPONSE = 0x03
54 54 FRAME_TYPE_ERROR_RESPONSE = 0x05
55 55 FRAME_TYPE_TEXT_OUTPUT = 0x06
56 56 FRAME_TYPE_PROGRESS = 0x07
57 57 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
58 58 FRAME_TYPE_STREAM_SETTINGS = 0x09
59 59
60 60 FRAME_TYPES = {
61 61 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
62 62 b'command-data': FRAME_TYPE_COMMAND_DATA,
63 63 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
64 64 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
65 65 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
66 66 b'progress': FRAME_TYPE_PROGRESS,
67 67 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
68 68 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
69 69 }
70 70
71 71 FLAG_COMMAND_REQUEST_NEW = 0x01
72 72 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
73 73 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
74 74 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
75 75
76 76 FLAGS_COMMAND_REQUEST = {
77 77 b'new': FLAG_COMMAND_REQUEST_NEW,
78 78 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
79 79 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
80 80 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
81 81 }
82 82
83 83 FLAG_COMMAND_DATA_CONTINUATION = 0x01
84 84 FLAG_COMMAND_DATA_EOS = 0x02
85 85
86 86 FLAGS_COMMAND_DATA = {
87 87 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
88 88 b'eos': FLAG_COMMAND_DATA_EOS,
89 89 }
90 90
91 91 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
92 92 FLAG_COMMAND_RESPONSE_EOS = 0x02
93 93
94 94 FLAGS_COMMAND_RESPONSE = {
95 95 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
96 96 b'eos': FLAG_COMMAND_RESPONSE_EOS,
97 97 }
98 98
99 99 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
100 100 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
101 101
102 102 FLAGS_SENDER_PROTOCOL_SETTINGS = {
103 103 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
104 104 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
105 105 }
106 106
107 107 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
108 108 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
109 109
110 110 FLAGS_STREAM_ENCODING_SETTINGS = {
111 111 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
112 112 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
113 113 }
114 114
115 115 # Maps frame types to their available flags.
116 116 FRAME_TYPE_FLAGS = {
117 117 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
118 118 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
119 119 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
120 120 FRAME_TYPE_ERROR_RESPONSE: {},
121 121 FRAME_TYPE_TEXT_OUTPUT: {},
122 122 FRAME_TYPE_PROGRESS: {},
123 123 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
124 124 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
125 125 }
126 126
127 127 ARGUMENT_RECORD_HEADER = struct.Struct('<HH')
128 128
129 129
130 130 def humanflags(mapping, value):
131 131 """Convert a numeric flags value to a human value, using a mapping table."""
132 132 namemap = {v: k for k, v in mapping.items()}
133 133 flags = []
134 134 val = 1
135 135 while value >= val:
136 136 if value & val:
137 137 flags.append(namemap.get(val, b'<unknown 0x%02x>' % val))
138 138 val <<= 1
139 139
140 140 return b'|'.join(flags)
141 141
142 142
143 143 @attr.s(slots=True)
144 144 class frameheader:
145 145 """Represents the data in a frame header."""
146 146
147 147 length = attr.ib()
148 148 requestid = attr.ib()
149 149 streamid = attr.ib()
150 150 streamflags = attr.ib()
151 151 typeid = attr.ib()
152 152 flags = attr.ib()
153 153
154 154
155 155 @attr.s(slots=True, repr=False)
156 156 class frame:
157 157 """Represents a parsed frame."""
158 158
159 159 requestid = attr.ib()
160 160 streamid = attr.ib()
161 161 streamflags = attr.ib()
162 162 typeid = attr.ib()
163 163 flags = attr.ib()
164 164 payload = attr.ib()
165 165
166 166 @encoding.strmethod
167 167 def __repr__(self):
168 168 typename = b'<unknown 0x%02x>' % self.typeid
169 169 for name, value in FRAME_TYPES.items():
170 170 if value == self.typeid:
171 171 typename = name
172 172 break
173 173
174 174 return (
175 175 b'frame(size=%d; request=%d; stream=%d; streamflags=%s; '
176 176 b'type=%s; flags=%s)'
177 177 % (
178 178 len(self.payload),
179 179 self.requestid,
180 180 self.streamid,
181 181 humanflags(STREAM_FLAGS, self.streamflags),
182 182 typename,
183 183 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags),
184 184 )
185 185 )
186 186
187 187
188 188 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
189 189 """Assemble a frame into a byte array."""
190 190 # TODO assert size of payload.
191 191 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
192 192
193 193 # 24 bits length
194 194 # 16 bits request id
195 195 # 8 bits stream id
196 196 # 8 bits stream flags
197 197 # 4 bits type
198 198 # 4 bits flags
199 199
200 200 l = struct.pack('<I', len(payload))
201 201 frame[0:3] = l[0:3]
202 202 struct.pack_into('<HBB', frame, 3, requestid, streamid, streamflags)
203 203 frame[7] = (typeid << 4) | flags
204 204 frame[8:] = payload
205 205
206 206 return frame
207 207
208 208
209 209 def makeframefromhumanstring(s):
210 210 """Create a frame from a human readable string
211 211
212 212 Strings have the form:
213 213
214 214 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
215 215
216 216 This can be used by user-facing applications and tests for creating
217 217 frames easily without having to type out a bunch of constants.
218 218
219 219 Request ID and stream IDs are integers.
220 220
221 221 Stream flags, frame type, and flags can be specified by integer or
222 222 named constant.
223 223
224 224 Flags can be delimited by `|` to bitwise OR them together.
225 225
226 226 If the payload begins with ``cbor:``, the following string will be
227 227 evaluated as Python literal and the resulting object will be fed into
228 228 a CBOR encoder. Otherwise, the payload is interpreted as a Python
229 229 byte string literal.
230 230 """
231 231 fields = s.split(b' ', 5)
232 232 requestid, streamid, streamflags, frametype, frameflags, payload = fields
233 233
234 234 requestid = int(requestid)
235 235 streamid = int(streamid)
236 236
237 237 finalstreamflags = 0
238 238 for flag in streamflags.split(b'|'):
239 239 if flag in STREAM_FLAGS:
240 240 finalstreamflags |= STREAM_FLAGS[flag]
241 241 else:
242 242 finalstreamflags |= int(flag)
243 243
244 244 if frametype in FRAME_TYPES:
245 245 frametype = FRAME_TYPES[frametype]
246 246 else:
247 247 frametype = int(frametype)
248 248
249 249 finalflags = 0
250 250 validflags = FRAME_TYPE_FLAGS[frametype]
251 251 for flag in frameflags.split(b'|'):
252 252 if flag in validflags:
253 253 finalflags |= validflags[flag]
254 254 else:
255 255 finalflags |= int(flag)
256 256
257 257 if payload.startswith(b'cbor:'):
258 258 payload = b''.join(
259 259 cborutil.streamencode(stringutil.evalpythonliteral(payload[5:]))
260 260 )
261 261
262 262 else:
263 263 payload = stringutil.unescapestr(payload)
264 264
265 265 return makeframe(
266 266 requestid=requestid,
267 267 streamid=streamid,
268 268 streamflags=finalstreamflags,
269 269 typeid=frametype,
270 270 flags=finalflags,
271 271 payload=payload,
272 272 )
273 273
274 274
275 275 def parseheader(data):
276 276 """Parse a unified framing protocol frame header from a buffer.
277 277
278 278 The header is expected to be in the buffer at offset 0 and the
279 279 buffer is expected to be large enough to hold a full header.
280 280 """
281 281 # 24 bits payload length (little endian)
282 282 # 16 bits request ID
283 283 # 8 bits stream ID
284 284 # 8 bits stream flags
285 285 # 4 bits frame type
286 286 # 4 bits frame flags
287 287 # ... payload
288 288 framelength = data[0] + 256 * data[1] + 16384 * data[2]
289 289 requestid, streamid, streamflags = struct.unpack_from('<HBB', data, 3)
290 290 typeflags = data[7]
291 291
292 292 frametype = (typeflags & 0xF0) >> 4
293 293 frameflags = typeflags & 0x0F
294 294
295 295 return frameheader(
296 296 framelength, requestid, streamid, streamflags, frametype, frameflags
297 297 )
298 298
299 299
300 300 def readframe(fh):
301 301 """Read a unified framing protocol frame from a file object.
302 302
303 303 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
304 304 None if no frame is available. May raise if a malformed frame is
305 305 seen.
306 306 """
307 307 header = bytearray(FRAME_HEADER_SIZE)
308 308
309 309 readcount = fh.readinto(header)
310 310
311 311 if readcount == 0:
312 312 return None
313 313
314 314 if readcount != FRAME_HEADER_SIZE:
315 315 raise error.Abort(
316 316 _(b'received incomplete frame: got %d bytes: %s')
317 317 % (readcount, header)
318 318 )
319 319
320 320 h = parseheader(header)
321 321
322 322 payload = fh.read(h.length)
323 323 if len(payload) != h.length:
324 324 raise error.Abort(
325 325 _(b'frame length error: expected %d; got %d')
326 326 % (h.length, len(payload))
327 327 )
328 328
329 329 return frame(
330 330 h.requestid, h.streamid, h.streamflags, h.typeid, h.flags, payload
331 331 )
332 332
333 333
334 334 def createcommandframes(
335 335 stream,
336 336 requestid,
337 337 cmd,
338 338 args,
339 339 datafh=None,
340 340 maxframesize=DEFAULT_MAX_FRAME_SIZE,
341 341 redirect=None,
342 342 ):
343 343 """Create frames necessary to transmit a request to run a command.
344 344
345 345 This is a generator of bytearrays. Each item represents a frame
346 346 ready to be sent over the wire to a peer.
347 347 """
348 348 data = {b'name': cmd}
349 349 if args:
350 350 data[b'args'] = args
351 351
352 352 if redirect:
353 353 data[b'redirect'] = redirect
354 354
355 355 data = b''.join(cborutil.streamencode(data))
356 356
357 357 offset = 0
358 358
359 359 while True:
360 360 flags = 0
361 361
362 362 # Must set new or continuation flag.
363 363 if not offset:
364 364 flags |= FLAG_COMMAND_REQUEST_NEW
365 365 else:
366 366 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
367 367
368 368 # Data frames is set on all frames.
369 369 if datafh:
370 370 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
371 371
372 372 payload = data[offset : offset + maxframesize]
373 373 offset += len(payload)
374 374
375 375 if len(payload) == maxframesize and offset < len(data):
376 376 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
377 377
378 378 yield stream.makeframe(
379 379 requestid=requestid,
380 380 typeid=FRAME_TYPE_COMMAND_REQUEST,
381 381 flags=flags,
382 382 payload=payload,
383 383 )
384 384
385 385 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
386 386 break
387 387
388 388 if datafh:
389 389 while True:
390 390 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
391 391
392 392 done = False
393 393 if len(data) == DEFAULT_MAX_FRAME_SIZE:
394 394 flags = FLAG_COMMAND_DATA_CONTINUATION
395 395 else:
396 396 flags = FLAG_COMMAND_DATA_EOS
397 397 assert datafh.read(1) == b''
398 398 done = True
399 399
400 400 yield stream.makeframe(
401 401 requestid=requestid,
402 402 typeid=FRAME_TYPE_COMMAND_DATA,
403 403 flags=flags,
404 404 payload=data,
405 405 )
406 406
407 407 if done:
408 408 break
409 409
410 410
411 411 def createcommandresponseokframe(stream, requestid):
412 412 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
413 413
414 414 if stream.streamsettingssent:
415 415 overall = stream.encode(overall)
416 416 encoded = True
417 417
418 418 if not overall:
419 419 return None
420 420 else:
421 421 encoded = False
422 422
423 423 return stream.makeframe(
424 424 requestid=requestid,
425 425 typeid=FRAME_TYPE_COMMAND_RESPONSE,
426 426 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
427 427 payload=overall,
428 428 encoded=encoded,
429 429 )
430 430
431 431
432 432 def createcommandresponseeosframes(
433 433 stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE
434 434 ):
435 435 """Create an empty payload frame representing command end-of-stream."""
436 436 payload = stream.flush()
437 437
438 438 offset = 0
439 439 while True:
440 440 chunk = payload[offset : offset + maxframesize]
441 441 offset += len(chunk)
442 442
443 443 done = offset == len(payload)
444 444
445 445 if done:
446 446 flags = FLAG_COMMAND_RESPONSE_EOS
447 447 else:
448 448 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
449 449
450 450 yield stream.makeframe(
451 451 requestid=requestid,
452 452 typeid=FRAME_TYPE_COMMAND_RESPONSE,
453 453 flags=flags,
454 454 payload=chunk,
455 455 encoded=payload != b'',
456 456 )
457 457
458 458 if done:
459 459 break
460 460
461 461
462 462 def createalternatelocationresponseframe(stream, requestid, location):
463 463 data = {
464 464 b'status': b'redirect',
465 465 b'location': {
466 466 b'url': location.url,
467 467 b'mediatype': location.mediatype,
468 468 },
469 469 }
470 470
471 471 for a in (
472 472 'size',
473 473 'fullhashes',
474 474 'fullhashseed',
475 475 'serverdercerts',
476 476 'servercadercerts',
477 477 ):
478 478 value = getattr(location, a)
479 479 if value is not None:
480 # pytype: disable=unsupported-operands
480 481 data[b'location'][pycompat.bytestr(a)] = value
482 # pytype: enable=unsupported-operands
481 483
482 484 payload = b''.join(cborutil.streamencode(data))
483 485
484 486 if stream.streamsettingssent:
485 487 payload = stream.encode(payload)
486 488 encoded = True
487 489 else:
488 490 encoded = False
489 491
490 492 return stream.makeframe(
491 493 requestid=requestid,
492 494 typeid=FRAME_TYPE_COMMAND_RESPONSE,
493 495 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
494 496 payload=payload,
495 497 encoded=encoded,
496 498 )
497 499
498 500
499 501 def createcommanderrorresponse(stream, requestid, message, args=None):
500 502 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
501 503 # formatting works consistently?
502 504 m = {
503 505 b'status': b'error',
504 506 b'error': {
505 507 b'message': message,
506 508 },
507 509 }
508 510
509 511 if args:
510 m[b'error'][b'args'] = args
512 m[b'error'][b'args'] = args # pytype: disable=unsupported-operands
511 513
512 514 overall = b''.join(cborutil.streamencode(m))
513 515
514 516 yield stream.makeframe(
515 517 requestid=requestid,
516 518 typeid=FRAME_TYPE_COMMAND_RESPONSE,
517 519 flags=FLAG_COMMAND_RESPONSE_EOS,
518 520 payload=overall,
519 521 )
520 522
521 523
522 524 def createerrorframe(stream, requestid, msg, errtype):
523 525 # TODO properly handle frame size limits.
524 526 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
525 527
526 528 payload = b''.join(
527 529 cborutil.streamencode(
528 530 {
529 531 b'type': errtype,
530 532 b'message': [{b'msg': msg}],
531 533 }
532 534 )
533 535 )
534 536
535 537 yield stream.makeframe(
536 538 requestid=requestid,
537 539 typeid=FRAME_TYPE_ERROR_RESPONSE,
538 540 flags=0,
539 541 payload=payload,
540 542 )
541 543
542 544
543 545 def createtextoutputframe(
544 546 stream, requestid, atoms, maxframesize=DEFAULT_MAX_FRAME_SIZE
545 547 ):
546 548 """Create a text output frame to render text to people.
547 549
548 550 ``atoms`` is a 3-tuple of (formatting string, args, labels).
549 551
550 552 The formatting string contains ``%s`` tokens to be replaced by the
551 553 corresponding indexed entry in ``args``. ``labels`` is an iterable of
552 554 formatters to be applied at rendering time. In terms of the ``ui``
553 555 class, each atom corresponds to a ``ui.write()``.
554 556 """
555 557 atomdicts = []
556 558
557 559 for formatting, args, labels in atoms:
558 560 # TODO look for localstr, other types here?
559 561
560 562 if not isinstance(formatting, bytes):
561 563 raise ValueError(b'must use bytes formatting strings')
562 564 for arg in args:
563 565 if not isinstance(arg, bytes):
564 566 raise ValueError(b'must use bytes for arguments')
565 567 for label in labels:
566 568 if not isinstance(label, bytes):
567 569 raise ValueError(b'must use bytes for labels')
568 570
569 571 # Formatting string must be ASCII.
570 572 formatting = formatting.decode('ascii', 'replace').encode('ascii')
571 573
572 574 # Arguments must be UTF-8.
573 575 args = [a.decode('utf-8', 'replace').encode('utf-8') for a in args]
574 576
575 577 # Labels must be ASCII.
576 578 labels = [l.decode('ascii', 'strict').encode('ascii') for l in labels]
577 579
578 580 atom = {b'msg': formatting}
579 581 if args:
580 582 atom[b'args'] = args
581 583 if labels:
582 584 atom[b'labels'] = labels
583 585
584 586 atomdicts.append(atom)
585 587
586 588 payload = b''.join(cborutil.streamencode(atomdicts))
587 589
588 590 if len(payload) > maxframesize:
589 591 raise ValueError(b'cannot encode data in a single frame')
590 592
591 593 yield stream.makeframe(
592 594 requestid=requestid,
593 595 typeid=FRAME_TYPE_TEXT_OUTPUT,
594 596 flags=0,
595 597 payload=payload,
596 598 )
597 599
598 600
599 601 class bufferingcommandresponseemitter:
600 602 """Helper object to emit command response frames intelligently.
601 603
602 604 Raw command response data is likely emitted in chunks much smaller
603 605 than what can fit in a single frame. This class exists to buffer
604 606 chunks until enough data is available to fit in a single frame.
605 607
606 608 TODO we'll need something like this when compression is supported.
607 609 So it might make sense to implement this functionality at the stream
608 610 level.
609 611 """
610 612
611 613 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
612 614 self._stream = stream
613 615 self._requestid = requestid
614 616 self._maxsize = maxframesize
615 617 self._chunks = []
616 618 self._chunkssize = 0
617 619
618 620 def send(self, data):
619 621 """Send new data for emission.
620 622
621 623 Is a generator of new frames that were derived from the new input.
622 624
623 625 If the special input ``None`` is received, flushes all buffered
624 626 data to frames.
625 627 """
626 628
627 629 if data is None:
628 630 for frame in self._flush():
629 631 yield frame
630 632 return
631 633
632 634 data = self._stream.encode(data)
633 635
634 636 # There is a ton of potential to do more complicated things here.
635 637 # Our immediate goal is to coalesce small chunks into big frames,
636 638 # not achieve the fewest number of frames possible. So we go with
637 639 # a simple implementation:
638 640 #
639 641 # * If a chunk is too large for a frame, we flush and emit frames
640 642 # for the new chunk.
641 643 # * If a chunk can be buffered without total buffered size limits
642 644 # being exceeded, we do that.
643 645 # * If a chunk causes us to go over our buffering limit, we flush
644 646 # and then buffer the new chunk.
645 647
646 648 if not data:
647 649 return
648 650
649 651 if len(data) > self._maxsize:
650 652 for frame in self._flush():
651 653 yield frame
652 654
653 655 # Now emit frames for the big chunk.
654 656 offset = 0
655 657 while True:
656 658 chunk = data[offset : offset + self._maxsize]
657 659 offset += len(chunk)
658 660
659 661 yield self._stream.makeframe(
660 662 self._requestid,
661 663 typeid=FRAME_TYPE_COMMAND_RESPONSE,
662 664 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
663 665 payload=chunk,
664 666 encoded=True,
665 667 )
666 668
667 669 if offset == len(data):
668 670 return
669 671
670 672 # If we don't have enough to constitute a full frame, buffer and
671 673 # return.
672 674 if len(data) + self._chunkssize < self._maxsize:
673 675 self._chunks.append(data)
674 676 self._chunkssize += len(data)
675 677 return
676 678
677 679 # Else flush what we have and buffer the new chunk. We could do
678 680 # something more intelligent here, like break the chunk. Let's
679 681 # keep things simple for now.
680 682 for frame in self._flush():
681 683 yield frame
682 684
683 685 self._chunks.append(data)
684 686 self._chunkssize = len(data)
685 687
686 688 def _flush(self):
687 689 payload = b''.join(self._chunks)
688 690 assert len(payload) <= self._maxsize
689 691
690 692 self._chunks[:] = []
691 693 self._chunkssize = 0
692 694
693 695 if not payload:
694 696 return
695 697
696 698 yield self._stream.makeframe(
697 699 self._requestid,
698 700 typeid=FRAME_TYPE_COMMAND_RESPONSE,
699 701 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
700 702 payload=payload,
701 703 encoded=True,
702 704 )
703 705
704 706
705 707 # TODO consider defining encoders/decoders using the util.compressionengine
706 708 # mechanism.
707 709
708 710
709 711 class identityencoder:
710 712 """Encoder for the "identity" stream encoding profile."""
711 713
712 714 def __init__(self, ui):
713 715 pass
714 716
715 717 def encode(self, data):
716 718 return data
717 719
718 720 def flush(self):
719 721 return b''
720 722
721 723 def finish(self):
722 724 return b''
723 725
724 726
725 727 class identitydecoder:
726 728 """Decoder for the "identity" stream encoding profile."""
727 729
728 730 def __init__(self, ui, extraobjs):
729 731 if extraobjs:
730 732 raise error.Abort(
731 733 _(b'identity decoder received unexpected additional values')
732 734 )
733 735
734 736 def decode(self, data):
735 737 return data
736 738
737 739
738 740 class zlibencoder:
739 741 def __init__(self, ui):
740 742 import zlib
741 743
742 744 self._zlib = zlib
743 745 self._compressor = zlib.compressobj()
744 746
745 747 def encode(self, data):
746 748 return self._compressor.compress(data)
747 749
748 750 def flush(self):
749 751 # Z_SYNC_FLUSH doesn't reset compression context, which is
750 752 # what we want.
751 753 return self._compressor.flush(self._zlib.Z_SYNC_FLUSH)
752 754
753 755 def finish(self):
754 756 res = self._compressor.flush(self._zlib.Z_FINISH)
755 757 self._compressor = None
756 758 return res
757 759
758 760
759 761 class zlibdecoder:
760 762 def __init__(self, ui, extraobjs):
761 763 import zlib
762 764
763 765 if extraobjs:
764 766 raise error.Abort(
765 767 _(b'zlib decoder received unexpected additional values')
766 768 )
767 769
768 770 self._decompressor = zlib.decompressobj()
769 771
770 772 def decode(self, data):
771 773 return self._decompressor.decompress(data)
772 774
773 775
774 776 class zstdbaseencoder:
775 777 def __init__(self, level):
776 from . import zstd
778 from . import zstd # pytype: disable=import-error
777 779
778 780 self._zstd = zstd
779 781 cctx = zstd.ZstdCompressor(level=level)
780 782 self._compressor = cctx.compressobj()
781 783
782 784 def encode(self, data):
783 785 return self._compressor.compress(data)
784 786
785 787 def flush(self):
786 788 # COMPRESSOBJ_FLUSH_BLOCK flushes all data previously fed into the
787 789 # compressor and allows a decompressor to access all encoded data
788 790 # up to this point.
789 791 return self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_BLOCK)
790 792
791 793 def finish(self):
792 794 res = self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_FINISH)
793 795 self._compressor = None
794 796 return res
795 797
796 798
797 799 class zstd8mbencoder(zstdbaseencoder):
798 800 def __init__(self, ui):
799 801 super(zstd8mbencoder, self).__init__(3)
800 802
801 803
802 804 class zstdbasedecoder:
803 805 def __init__(self, maxwindowsize):
804 from . import zstd
806 from . import zstd # pytype: disable=import-error
805 807
806 808 dctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize)
807 809 self._decompressor = dctx.decompressobj()
808 810
809 811 def decode(self, data):
810 812 return self._decompressor.decompress(data)
811 813
812 814
813 815 class zstd8mbdecoder(zstdbasedecoder):
814 816 def __init__(self, ui, extraobjs):
815 817 if extraobjs:
816 818 raise error.Abort(
817 819 _(b'zstd8mb decoder received unexpected additional values')
818 820 )
819 821
820 822 super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576)
821 823
822 824
823 825 # We lazily populate this to avoid excessive module imports when importing
824 826 # this module.
825 827 STREAM_ENCODERS = {}
826 828 STREAM_ENCODERS_ORDER = []
827 829
828 830
829 831 def populatestreamencoders():
830 832 if STREAM_ENCODERS:
831 833 return
832 834
833 835 try:
834 from . import zstd
836 from . import zstd # pytype: disable=import-error
835 837
836 838 zstd.__version__
837 839 except ImportError:
838 840 zstd = None
839 841
840 842 # zstandard is fastest and is preferred.
841 843 if zstd:
842 844 STREAM_ENCODERS[b'zstd-8mb'] = (zstd8mbencoder, zstd8mbdecoder)
843 845 STREAM_ENCODERS_ORDER.append(b'zstd-8mb')
844 846
845 847 STREAM_ENCODERS[b'zlib'] = (zlibencoder, zlibdecoder)
846 848 STREAM_ENCODERS_ORDER.append(b'zlib')
847 849
848 850 STREAM_ENCODERS[b'identity'] = (identityencoder, identitydecoder)
849 851 STREAM_ENCODERS_ORDER.append(b'identity')
850 852
851 853
852 854 class stream:
853 855 """Represents a logical unidirectional series of frames."""
854 856
855 857 def __init__(self, streamid, active=False):
856 858 self.streamid = streamid
857 859 self._active = active
858 860
859 861 def makeframe(self, requestid, typeid, flags, payload):
860 862 """Create a frame to be sent out over this stream.
861 863
862 864 Only returns the frame instance. Does not actually send it.
863 865 """
864 866 streamflags = 0
865 867 if not self._active:
866 868 streamflags |= STREAM_FLAG_BEGIN_STREAM
867 869 self._active = True
868 870
869 871 return makeframe(
870 872 requestid, self.streamid, streamflags, typeid, flags, payload
871 873 )
872 874
873 875
874 876 class inputstream(stream):
875 877 """Represents a stream used for receiving data."""
876 878
877 879 def __init__(self, streamid, active=False):
878 880 super(inputstream, self).__init__(streamid, active=active)
879 881 self._decoder = None
880 882
881 883 def setdecoder(self, ui, name, extraobjs):
882 884 """Set the decoder for this stream.
883 885
884 886 Receives the stream profile name and any additional CBOR objects
885 887 decoded from the stream encoding settings frame payloads.
886 888 """
887 889 if name not in STREAM_ENCODERS:
888 890 raise error.Abort(_(b'unknown stream decoder: %s') % name)
889 891
890 892 self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs)
891 893
892 894 def decode(self, data):
893 895 # Default is identity decoder. We don't bother instantiating one
894 896 # because it is trivial.
895 897 if not self._decoder:
896 898 return data
897 899
898 900 return self._decoder.decode(data)
899 901
900 902 def flush(self):
901 903 if not self._decoder:
902 904 return b''
903 905
904 906 return self._decoder.flush()
905 907
906 908
907 909 class outputstream(stream):
908 910 """Represents a stream used for sending data."""
909 911
910 912 def __init__(self, streamid, active=False):
911 913 super(outputstream, self).__init__(streamid, active=active)
912 914 self.streamsettingssent = False
913 915 self._encoder = None
914 916 self._encodername = None
915 917
916 918 def setencoder(self, ui, name):
917 919 """Set the encoder for this stream.
918 920
919 921 Receives the stream profile name.
920 922 """
921 923 if name not in STREAM_ENCODERS:
922 924 raise error.Abort(_(b'unknown stream encoder: %s') % name)
923 925
924 926 self._encoder = STREAM_ENCODERS[name][0](ui)
925 927 self._encodername = name
926 928
927 929 def encode(self, data):
928 930 if not self._encoder:
929 931 return data
930 932
931 933 return self._encoder.encode(data)
932 934
933 935 def flush(self):
934 936 if not self._encoder:
935 937 return b''
936 938
937 939 return self._encoder.flush()
938 940
939 941 def finish(self):
940 942 if not self._encoder:
941 943 return b''
942 944
943 945 self._encoder.finish()
944 946
945 947 def makeframe(self, requestid, typeid, flags, payload, encoded=False):
946 948 """Create a frame to be sent out over this stream.
947 949
948 950 Only returns the frame instance. Does not actually send it.
949 951 """
950 952 streamflags = 0
951 953 if not self._active:
952 954 streamflags |= STREAM_FLAG_BEGIN_STREAM
953 955 self._active = True
954 956
955 957 if encoded:
956 958 if not self.streamsettingssent:
957 959 raise error.ProgrammingError(
958 960 b'attempting to send encoded frame without sending stream '
959 961 b'settings'
960 962 )
961 963
962 964 streamflags |= STREAM_FLAG_ENCODING_APPLIED
963 965
964 966 if (
965 967 typeid == FRAME_TYPE_STREAM_SETTINGS
966 968 and flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
967 969 ):
968 970 self.streamsettingssent = True
969 971
970 972 return makeframe(
971 973 requestid, self.streamid, streamflags, typeid, flags, payload
972 974 )
973 975
974 976 def makestreamsettingsframe(self, requestid):
975 977 """Create a stream settings frame for this stream.
976 978
977 979 Returns frame data or None if no stream settings frame is needed or has
978 980 already been sent.
979 981 """
980 982 if not self._encoder or self.streamsettingssent:
981 983 return None
982 984
983 985 payload = b''.join(cborutil.streamencode(self._encodername))
984 986 return self.makeframe(
985 987 requestid,
986 988 FRAME_TYPE_STREAM_SETTINGS,
987 989 FLAG_STREAM_ENCODING_SETTINGS_EOS,
988 990 payload,
989 991 )
990 992
991 993
992 994 def ensureserverstream(stream):
993 995 if stream.streamid % 2:
994 996 raise error.ProgrammingError(
995 997 b'server should only write to even '
996 998 b'numbered streams; %d is not even' % stream.streamid
997 999 )
998 1000
999 1001
1000 1002 DEFAULT_PROTOCOL_SETTINGS = {
1001 1003 b'contentencodings': [b'identity'],
1002 1004 }
1003 1005
1004 1006
1005 1007 class serverreactor:
1006 1008 """Holds state of a server handling frame-based protocol requests.
1007 1009
1008 1010 This class is the "brain" of the unified frame-based protocol server
1009 1011 component. While the protocol is stateless from the perspective of
1010 1012 requests/commands, something needs to track which frames have been
1011 1013 received, what frames to expect, etc. This class is that thing.
1012 1014
1013 1015 Instances are modeled as a state machine of sorts. Instances are also
1014 1016 reactionary to external events. The point of this class is to encapsulate
1015 1017 the state of the connection and the exchange of frames, not to perform
1016 1018 work. Instead, callers tell this class when something occurs, like a
1017 1019 frame arriving. If that activity is worthy of a follow-up action (say
1018 1020 *run a command*), the return value of that handler will say so.
1019 1021
1020 1022 I/O and CPU intensive operations are purposefully delegated outside of
1021 1023 this class.
1022 1024
1023 1025 Consumers are expected to tell instances when events occur. They do so by
1024 1026 calling the various ``on*`` methods. These methods return a 2-tuple
1025 1027 describing any follow-up action(s) to take. The first element is the
1026 1028 name of an action to perform. The second is a data structure (usually
1027 1029 a dict) specific to that action that contains more information. e.g.
1028 1030 if the server wants to send frames back to the client, the data structure
1029 1031 will contain a reference to those frames.
1030 1032
1031 1033 Valid actions that consumers can be instructed to take are:
1032 1034
1033 1035 sendframes
1034 1036 Indicates that frames should be sent to the client. The ``framegen``
1035 1037 key contains a generator of frames that should be sent. The server
1036 1038 assumes that all frames are sent to the client.
1037 1039
1038 1040 error
1039 1041 Indicates that an error occurred. Consumer should probably abort.
1040 1042
1041 1043 runcommand
1042 1044 Indicates that the consumer should run a wire protocol command. Details
1043 1045 of the command to run are given in the data structure.
1044 1046
1045 1047 wantframe
1046 1048 Indicates that nothing of interest happened and the server is waiting on
1047 1049 more frames from the client before anything interesting can be done.
1048 1050
1049 1051 noop
1050 1052 Indicates no additional action is required.
1051 1053
1052 1054 Known Issues
1053 1055 ------------
1054 1056
1055 1057 There are no limits to the number of partially received commands or their
1056 1058 size. A malicious client could stream command request data and exhaust the
1057 1059 server's memory.
1058 1060
1059 1061 Partially received commands are not acted upon when end of input is
1060 1062 reached. Should the server error if it receives a partial request?
1061 1063 Should the client send a message to abort a partially transmitted request
1062 1064 to facilitate graceful shutdown?
1063 1065
1064 1066 Active requests that haven't been responded to aren't tracked. This means
1065 1067 that if we receive a command and instruct its dispatch, another command
1066 1068 with its request ID can come in over the wire and there will be a race
1067 1069 between who responds to what.
1068 1070 """
1069 1071
1070 1072 def __init__(self, ui, deferoutput=False):
1071 1073 """Construct a new server reactor.
1072 1074
1073 1075 ``deferoutput`` can be used to indicate that no output frames should be
1074 1076 instructed to be sent until input has been exhausted. In this mode,
1075 1077 events that would normally generate output frames (such as a command
1076 1078 response being ready) will instead defer instructing the consumer to
1077 1079 send those frames. This is useful for half-duplex transports where the
1078 1080 sender cannot receive until all data has been transmitted.
1079 1081 """
1080 1082 self._ui = ui
1081 1083 self._deferoutput = deferoutput
1082 1084 self._state = b'initial'
1083 1085 self._nextoutgoingstreamid = 2
1084 1086 self._bufferedframegens = []
1085 1087 # stream id -> stream instance for all active streams from the client.
1086 1088 self._incomingstreams = {}
1087 1089 self._outgoingstreams = {}
1088 1090 # request id -> dict of commands that are actively being received.
1089 1091 self._receivingcommands = {}
1090 1092 # Request IDs that have been received and are actively being processed.
1091 1093 # Once all output for a request has been sent, it is removed from this
1092 1094 # set.
1093 1095 self._activecommands = set()
1094 1096
1095 1097 self._protocolsettingsdecoder = None
1096 1098
1097 1099 # Sender protocol settings are optional. Set implied default values.
1098 1100 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
1099 1101
1100 1102 populatestreamencoders()
1101 1103
1102 1104 def onframerecv(self, frame):
1103 1105 """Process a frame that has been received off the wire.
1104 1106
1105 1107 Returns a dict with an ``action`` key that details what action,
1106 1108 if any, the consumer should take next.
1107 1109 """
1108 1110 if not frame.streamid % 2:
1109 1111 self._state = b'errored'
1110 1112 return self._makeerrorresult(
1111 1113 _(b'received frame with even numbered stream ID: %d')
1112 1114 % frame.streamid
1113 1115 )
1114 1116
1115 1117 if frame.streamid not in self._incomingstreams:
1116 1118 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1117 1119 self._state = b'errored'
1118 1120 return self._makeerrorresult(
1119 1121 _(
1120 1122 b'received frame on unknown inactive stream without '
1121 1123 b'beginning of stream flag set'
1122 1124 )
1123 1125 )
1124 1126
1125 1127 self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
1126 1128
1127 1129 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1128 1130 # TODO handle decoding frames
1129 1131 self._state = b'errored'
1130 1132 raise error.ProgrammingError(
1131 1133 b'support for decoding stream payloads not yet implemented'
1132 1134 )
1133 1135
1134 1136 if frame.streamflags & STREAM_FLAG_END_STREAM:
1135 1137 del self._incomingstreams[frame.streamid]
1136 1138
1137 1139 handlers = {
1138 1140 b'initial': self._onframeinitial,
1139 1141 b'protocol-settings-receiving': self._onframeprotocolsettings,
1140 1142 b'idle': self._onframeidle,
1141 1143 b'command-receiving': self._onframecommandreceiving,
1142 1144 b'errored': self._onframeerrored,
1143 1145 }
1144 1146
1145 1147 meth = handlers.get(self._state)
1146 1148 if not meth:
1147 1149 raise error.ProgrammingError(b'unhandled state: %s' % self._state)
1148 1150
1149 1151 return meth(frame)
1150 1152
1151 1153 def oncommandresponsereadyobjects(self, stream, requestid, objs):
1152 1154 """Signal that objects are ready to be sent to the client.
1153 1155
1154 1156 ``objs`` is an iterable of objects (typically a generator) that will
1155 1157 be encoded via CBOR and added to frames, which will be sent to the
1156 1158 client.
1157 1159 """
1158 1160 ensureserverstream(stream)
1159 1161
1160 1162 # A more robust solution would be to check for objs.{next,__next__}.
1161 1163 if isinstance(objs, list):
1162 1164 objs = iter(objs)
1163 1165
1164 1166 # We need to take care over exception handling. Uncaught exceptions
1165 1167 # when generating frames could lead to premature end of the frame
1166 1168 # stream and the possibility of the server or client process getting
1167 1169 # in a bad state.
1168 1170 #
1169 1171 # Keep in mind that if ``objs`` is a generator, advancing it could
1170 1172 # raise exceptions that originated in e.g. wire protocol command
1171 1173 # functions. That is why we differentiate between exceptions raised
1172 1174 # when iterating versus other exceptions that occur.
1173 1175 #
1174 1176 # In all cases, when the function finishes, the request is fully
1175 1177 # handled and no new frames for it should be seen.
1176 1178
1177 1179 def sendframes():
1178 1180 emitted = False
1179 1181 alternatelocationsent = False
1180 1182 emitter = bufferingcommandresponseemitter(stream, requestid)
1181 1183 while True:
1182 1184 try:
1183 1185 o = next(objs)
1184 1186 except StopIteration:
1185 1187 for frame in emitter.send(None):
1186 1188 yield frame
1187 1189
1188 1190 if emitted:
1189 1191 for frame in createcommandresponseeosframes(
1190 1192 stream, requestid
1191 1193 ):
1192 1194 yield frame
1193 1195 break
1194 1196
1195 1197 except error.WireprotoCommandError as e:
1196 1198 for frame in createcommanderrorresponse(
1197 1199 stream, requestid, e.message, e.messageargs
1198 1200 ):
1199 1201 yield frame
1200 1202 break
1201 1203
1202 1204 except Exception as e:
1203 1205 for frame in createerrorframe(
1204 1206 stream,
1205 1207 requestid,
1206 1208 b'%s' % stringutil.forcebytestr(e),
1207 1209 errtype=b'server',
1208 1210 ):
1209 1211 yield frame
1210 1212
1211 1213 break
1212 1214
1213 1215 try:
1214 1216 # Alternate location responses can only be the first and
1215 1217 # only object in the output stream.
1216 1218 if isinstance(o, wireprototypes.alternatelocationresponse):
1217 1219 if emitted:
1218 1220 raise error.ProgrammingError(
1219 1221 b'alternatelocationresponse seen after initial '
1220 1222 b'output object'
1221 1223 )
1222 1224
1223 1225 frame = stream.makestreamsettingsframe(requestid)
1224 1226 if frame:
1225 1227 yield frame
1226 1228
1227 1229 yield createalternatelocationresponseframe(
1228 1230 stream, requestid, o
1229 1231 )
1230 1232
1231 1233 alternatelocationsent = True
1232 1234 emitted = True
1233 1235 continue
1234 1236
1235 1237 if alternatelocationsent:
1236 1238 raise error.ProgrammingError(
1237 1239 b'object follows alternatelocationresponse'
1238 1240 )
1239 1241
1240 1242 if not emitted:
1241 1243 # Frame is optional.
1242 1244 frame = stream.makestreamsettingsframe(requestid)
1243 1245 if frame:
1244 1246 yield frame
1245 1247
1246 1248 # May be None if empty frame (due to encoding).
1247 1249 frame = createcommandresponseokframe(stream, requestid)
1248 1250 if frame:
1249 1251 yield frame
1250 1252
1251 1253 emitted = True
1252 1254
1253 1255 # Objects emitted by command functions can be serializable
1254 1256 # data structures or special types.
1255 1257 # TODO consider extracting the content normalization to a
1256 1258 # standalone function, as it may be useful for e.g. cachers.
1257 1259
1258 1260 # A pre-encoded object is sent directly to the emitter.
1259 1261 if isinstance(o, wireprototypes.encodedresponse):
1260 1262 for frame in emitter.send(o.data):
1261 1263 yield frame
1262 1264
1263 1265 elif isinstance(
1264 1266 o, wireprototypes.indefinitebytestringresponse
1265 1267 ):
1266 1268 for chunk in cborutil.streamencodebytestringfromiter(
1267 1269 o.chunks
1268 1270 ):
1269 1271 for frame in emitter.send(chunk):
1270 1272 yield frame
1271 1273
1272 1274 # A regular object is CBOR encoded.
1273 1275 else:
1274 1276 for chunk in cborutil.streamencode(o):
1275 1277 for frame in emitter.send(chunk):
1276 1278 yield frame
1277 1279
1278 1280 except Exception as e:
1279 1281 for frame in createerrorframe(
1280 1282 stream, requestid, b'%s' % e, errtype=b'server'
1281 1283 ):
1282 1284 yield frame
1283 1285
1284 1286 break
1285 1287
1286 1288 self._activecommands.remove(requestid)
1287 1289
1288 1290 return self._handlesendframes(sendframes())
1289 1291
1290 1292 def oninputeof(self):
1291 1293 """Signals that end of input has been received.
1292 1294
1293 1295 No more frames will be received. All pending activity should be
1294 1296 completed.
1295 1297 """
1296 1298 # TODO should we do anything about in-flight commands?
1297 1299
1298 1300 if not self._deferoutput or not self._bufferedframegens:
1299 1301 return b'noop', {}
1300 1302
1301 1303 # If we buffered all our responses, emit those.
1302 1304 def makegen():
1303 1305 for gen in self._bufferedframegens:
1304 1306 for frame in gen:
1305 1307 yield frame
1306 1308
1307 1309 return b'sendframes', {
1308 1310 b'framegen': makegen(),
1309 1311 }
1310 1312
1311 1313 def _handlesendframes(self, framegen):
1312 1314 if self._deferoutput:
1313 1315 self._bufferedframegens.append(framegen)
1314 1316 return b'noop', {}
1315 1317 else:
1316 1318 return b'sendframes', {
1317 1319 b'framegen': framegen,
1318 1320 }
1319 1321
1320 1322 def onservererror(self, stream, requestid, msg):
1321 1323 ensureserverstream(stream)
1322 1324
1323 1325 def sendframes():
1324 1326 for frame in createerrorframe(
1325 1327 stream, requestid, msg, errtype=b'server'
1326 1328 ):
1327 1329 yield frame
1328 1330
1329 1331 self._activecommands.remove(requestid)
1330 1332
1331 1333 return self._handlesendframes(sendframes())
1332 1334
1333 1335 def oncommanderror(self, stream, requestid, message, args=None):
1334 1336 """Called when a command encountered an error before sending output."""
1335 1337 ensureserverstream(stream)
1336 1338
1337 1339 def sendframes():
1338 1340 for frame in createcommanderrorresponse(
1339 1341 stream, requestid, message, args
1340 1342 ):
1341 1343 yield frame
1342 1344
1343 1345 self._activecommands.remove(requestid)
1344 1346
1345 1347 return self._handlesendframes(sendframes())
1346 1348
1347 1349 def makeoutputstream(self):
1348 1350 """Create a stream to be used for sending data to the client.
1349 1351
1350 1352 If this is called before protocol settings frames are received, we
1351 1353 don't know what stream encodings are supported by the client and
1352 1354 we will default to identity.
1353 1355 """
1354 1356 streamid = self._nextoutgoingstreamid
1355 1357 self._nextoutgoingstreamid += 2
1356 1358
1357 1359 s = outputstream(streamid)
1358 1360 self._outgoingstreams[streamid] = s
1359 1361
1360 1362 # Always use the *server's* preferred encoder over the client's,
1361 1363 # as servers have more to lose from sub-optimal encoders being used.
1362 1364 for name in STREAM_ENCODERS_ORDER:
1363 1365 if name in self._sendersettings[b'contentencodings']:
1364 1366 s.setencoder(self._ui, name)
1365 1367 break
1366 1368
1367 1369 return s
1368 1370
1369 1371 def _makeerrorresult(self, msg):
1370 1372 return b'error', {
1371 1373 b'message': msg,
1372 1374 }
1373 1375
1374 1376 def _makeruncommandresult(self, requestid):
1375 1377 entry = self._receivingcommands[requestid]
1376 1378
1377 1379 if not entry[b'requestdone']:
1378 1380 self._state = b'errored'
1379 1381 raise error.ProgrammingError(
1380 1382 b'should not be called without requestdone set'
1381 1383 )
1382 1384
1383 1385 del self._receivingcommands[requestid]
1384 1386
1385 1387 if self._receivingcommands:
1386 1388 self._state = b'command-receiving'
1387 1389 else:
1388 1390 self._state = b'idle'
1389 1391
1390 1392 # Decode the payloads as CBOR.
1391 1393 entry[b'payload'].seek(0)
1392 1394 request = cborutil.decodeall(entry[b'payload'].getvalue())[0]
1393 1395
1394 1396 if b'name' not in request:
1395 1397 self._state = b'errored'
1396 1398 return self._makeerrorresult(
1397 1399 _(b'command request missing "name" field')
1398 1400 )
1399 1401
1400 1402 if b'args' not in request:
1401 1403 request[b'args'] = {}
1402 1404
1403 1405 assert requestid not in self._activecommands
1404 1406 self._activecommands.add(requestid)
1405 1407
1406 1408 return (
1407 1409 b'runcommand',
1408 1410 {
1409 1411 b'requestid': requestid,
1410 1412 b'command': request[b'name'],
1411 1413 b'args': request[b'args'],
1412 1414 b'redirect': request.get(b'redirect'),
1413 1415 b'data': entry[b'data'].getvalue() if entry[b'data'] else None,
1414 1416 },
1415 1417 )
1416 1418
1417 1419 def _makewantframeresult(self):
1418 1420 return b'wantframe', {
1419 1421 b'state': self._state,
1420 1422 }
1421 1423
1422 1424 def _validatecommandrequestframe(self, frame):
1423 1425 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1424 1426 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1425 1427
1426 1428 if new and continuation:
1427 1429 self._state = b'errored'
1428 1430 return self._makeerrorresult(
1429 1431 _(
1430 1432 b'received command request frame with both new and '
1431 1433 b'continuation flags set'
1432 1434 )
1433 1435 )
1434 1436
1435 1437 if not new and not continuation:
1436 1438 self._state = b'errored'
1437 1439 return self._makeerrorresult(
1438 1440 _(
1439 1441 b'received command request frame with neither new nor '
1440 1442 b'continuation flags set'
1441 1443 )
1442 1444 )
1443 1445
1444 1446 def _onframeinitial(self, frame):
1445 1447 # Called when we receive a frame when in the "initial" state.
1446 1448 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1447 1449 self._state = b'protocol-settings-receiving'
1448 1450 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1449 1451 return self._onframeprotocolsettings(frame)
1450 1452
1451 1453 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1452 1454 self._state = b'idle'
1453 1455 return self._onframeidle(frame)
1454 1456
1455 1457 else:
1456 1458 self._state = b'errored'
1457 1459 return self._makeerrorresult(
1458 1460 _(
1459 1461 b'expected sender protocol settings or command request '
1460 1462 b'frame; got %d'
1461 1463 )
1462 1464 % frame.typeid
1463 1465 )
1464 1466
1465 1467 def _onframeprotocolsettings(self, frame):
1466 1468 assert self._state == b'protocol-settings-receiving'
1467 1469 assert self._protocolsettingsdecoder is not None
1468 1470
1469 1471 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1470 1472 self._state = b'errored'
1471 1473 return self._makeerrorresult(
1472 1474 _(b'expected sender protocol settings frame; got %d')
1473 1475 % frame.typeid
1474 1476 )
1475 1477
1476 1478 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1477 1479 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1478 1480
1479 1481 if more and eos:
1480 1482 self._state = b'errored'
1481 1483 return self._makeerrorresult(
1482 1484 _(
1483 1485 b'sender protocol settings frame cannot have both '
1484 1486 b'continuation and end of stream flags set'
1485 1487 )
1486 1488 )
1487 1489
1488 1490 if not more and not eos:
1489 1491 self._state = b'errored'
1490 1492 return self._makeerrorresult(
1491 1493 _(
1492 1494 b'sender protocol settings frame must have continuation or '
1493 1495 b'end of stream flag set'
1494 1496 )
1495 1497 )
1496 1498
1497 1499 # TODO establish limits for maximum amount of data that can be
1498 1500 # buffered.
1499 1501 try:
1500 1502 self._protocolsettingsdecoder.decode(frame.payload)
1501 1503 except Exception as e:
1502 1504 self._state = b'errored'
1503 1505 return self._makeerrorresult(
1504 1506 _(
1505 1507 b'error decoding CBOR from sender protocol settings frame: %s'
1506 1508 )
1507 1509 % stringutil.forcebytestr(e)
1508 1510 )
1509 1511
1510 1512 if more:
1511 1513 return self._makewantframeresult()
1512 1514
1513 1515 assert eos
1514 1516
1515 1517 decoded = self._protocolsettingsdecoder.getavailable()
1516 1518 self._protocolsettingsdecoder = None
1517 1519
1518 1520 if not decoded:
1519 1521 self._state = b'errored'
1520 1522 return self._makeerrorresult(
1521 1523 _(b'sender protocol settings frame did not contain CBOR data')
1522 1524 )
1523 1525 elif len(decoded) > 1:
1524 1526 self._state = b'errored'
1525 1527 return self._makeerrorresult(
1526 1528 _(
1527 1529 b'sender protocol settings frame contained multiple CBOR '
1528 1530 b'values'
1529 1531 )
1530 1532 )
1531 1533
1532 1534 d = decoded[0]
1533 1535
1534 1536 if b'contentencodings' in d:
1535 1537 self._sendersettings[b'contentencodings'] = d[b'contentencodings']
1536 1538
1537 1539 self._state = b'idle'
1538 1540
1539 1541 return self._makewantframeresult()
1540 1542
1541 1543 def _onframeidle(self, frame):
1542 1544 # The only frame type that should be received in this state is a
1543 1545 # command request.
1544 1546 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1545 1547 self._state = b'errored'
1546 1548 return self._makeerrorresult(
1547 1549 _(b'expected command request frame; got %d') % frame.typeid
1548 1550 )
1549 1551
1550 1552 res = self._validatecommandrequestframe(frame)
1551 1553 if res:
1552 1554 return res
1553 1555
1554 1556 if frame.requestid in self._receivingcommands:
1555 1557 self._state = b'errored'
1556 1558 return self._makeerrorresult(
1557 1559 _(b'request with ID %d already received') % frame.requestid
1558 1560 )
1559 1561
1560 1562 if frame.requestid in self._activecommands:
1561 1563 self._state = b'errored'
1562 1564 return self._makeerrorresult(
1563 1565 _(b'request with ID %d is already active') % frame.requestid
1564 1566 )
1565 1567
1566 1568 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1567 1569 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1568 1570 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1569 1571
1570 1572 if not new:
1571 1573 self._state = b'errored'
1572 1574 return self._makeerrorresult(
1573 1575 _(b'received command request frame without new flag set')
1574 1576 )
1575 1577
1576 1578 payload = util.bytesio()
1577 1579 payload.write(frame.payload)
1578 1580
1579 1581 self._receivingcommands[frame.requestid] = {
1580 1582 b'payload': payload,
1581 1583 b'data': None,
1582 1584 b'requestdone': not moreframes,
1583 1585 b'expectingdata': bool(expectingdata),
1584 1586 }
1585 1587
1586 1588 # This is the final frame for this request. Dispatch it.
1587 1589 if not moreframes and not expectingdata:
1588 1590 return self._makeruncommandresult(frame.requestid)
1589 1591
1590 1592 assert moreframes or expectingdata
1591 1593 self._state = b'command-receiving'
1592 1594 return self._makewantframeresult()
1593 1595
1594 1596 def _onframecommandreceiving(self, frame):
1595 1597 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1596 1598 # Process new command requests as such.
1597 1599 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1598 1600 return self._onframeidle(frame)
1599 1601
1600 1602 res = self._validatecommandrequestframe(frame)
1601 1603 if res:
1602 1604 return res
1603 1605
1604 1606 # All other frames should be related to a command that is currently
1605 1607 # receiving but is not active.
1606 1608 if frame.requestid in self._activecommands:
1607 1609 self._state = b'errored'
1608 1610 return self._makeerrorresult(
1609 1611 _(b'received frame for request that is still active: %d')
1610 1612 % frame.requestid
1611 1613 )
1612 1614
1613 1615 if frame.requestid not in self._receivingcommands:
1614 1616 self._state = b'errored'
1615 1617 return self._makeerrorresult(
1616 1618 _(b'received frame for request that is not receiving: %d')
1617 1619 % frame.requestid
1618 1620 )
1619 1621
1620 1622 entry = self._receivingcommands[frame.requestid]
1621 1623
1622 1624 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1623 1625 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1624 1626 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1625 1627
1626 1628 if entry[b'requestdone']:
1627 1629 self._state = b'errored'
1628 1630 return self._makeerrorresult(
1629 1631 _(
1630 1632 b'received command request frame when request frames '
1631 1633 b'were supposedly done'
1632 1634 )
1633 1635 )
1634 1636
1635 1637 if expectingdata != entry[b'expectingdata']:
1636 1638 self._state = b'errored'
1637 1639 return self._makeerrorresult(
1638 1640 _(b'mismatch between expect data flag and previous frame')
1639 1641 )
1640 1642
1641 1643 entry[b'payload'].write(frame.payload)
1642 1644
1643 1645 if not moreframes:
1644 1646 entry[b'requestdone'] = True
1645 1647
1646 1648 if not moreframes and not expectingdata:
1647 1649 return self._makeruncommandresult(frame.requestid)
1648 1650
1649 1651 return self._makewantframeresult()
1650 1652
1651 1653 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1652 1654 if not entry[b'expectingdata']:
1653 1655 self._state = b'errored'
1654 1656 return self._makeerrorresult(
1655 1657 _(
1656 1658 b'received command data frame for request that is not '
1657 1659 b'expecting data: %d'
1658 1660 )
1659 1661 % frame.requestid
1660 1662 )
1661 1663
1662 1664 if entry[b'data'] is None:
1663 1665 entry[b'data'] = util.bytesio()
1664 1666
1665 1667 return self._handlecommanddataframe(frame, entry)
1666 1668 else:
1667 1669 self._state = b'errored'
1668 1670 return self._makeerrorresult(
1669 1671 _(b'received unexpected frame type: %d') % frame.typeid
1670 1672 )
1671 1673
1672 1674 def _handlecommanddataframe(self, frame, entry):
1673 1675 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1674 1676
1675 1677 # TODO support streaming data instead of buffering it.
1676 1678 entry[b'data'].write(frame.payload)
1677 1679
1678 1680 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1679 1681 return self._makewantframeresult()
1680 1682 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1681 1683 entry[b'data'].seek(0)
1682 1684 return self._makeruncommandresult(frame.requestid)
1683 1685 else:
1684 1686 self._state = b'errored'
1685 1687 return self._makeerrorresult(_(b'command data frame without flags'))
1686 1688
1687 1689 def _onframeerrored(self, frame):
1688 1690 return self._makeerrorresult(_(b'server already errored'))
1689 1691
1690 1692
1691 1693 class commandrequest:
1692 1694 """Represents a request to run a command."""
1693 1695
1694 1696 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1695 1697 self.requestid = requestid
1696 1698 self.name = name
1697 1699 self.args = args
1698 1700 self.datafh = datafh
1699 1701 self.redirect = redirect
1700 1702 self.state = b'pending'
1701 1703
1702 1704
1703 1705 class clientreactor:
1704 1706 """Holds state of a client issuing frame-based protocol requests.
1705 1707
1706 1708 This is like ``serverreactor`` but for client-side state.
1707 1709
1708 1710 Each instance is bound to the lifetime of a connection. For persistent
1709 1711 connection transports using e.g. TCP sockets and speaking the raw
1710 1712 framing protocol, there will be a single instance for the lifetime of
1711 1713 the TCP socket. For transports where there are multiple discrete
1712 1714 interactions (say tunneled within in HTTP request), there will be a
1713 1715 separate instance for each distinct interaction.
1714 1716
1715 1717 Consumers are expected to tell instances when events occur by calling
1716 1718 various methods. These methods return a 2-tuple describing any follow-up
1717 1719 action(s) to take. The first element is the name of an action to
1718 1720 perform. The second is a data structure (usually a dict) specific to
1719 1721 that action that contains more information. e.g. if the reactor wants
1720 1722 to send frames to the server, the data structure will contain a reference
1721 1723 to those frames.
1722 1724
1723 1725 Valid actions that consumers can be instructed to take are:
1724 1726
1725 1727 noop
1726 1728 Indicates no additional action is required.
1727 1729
1728 1730 sendframes
1729 1731 Indicates that frames should be sent to the server. The ``framegen``
1730 1732 key contains a generator of frames that should be sent. The reactor
1731 1733 assumes that all frames in this generator are sent to the server.
1732 1734
1733 1735 error
1734 1736 Indicates that an error occurred. The ``message`` key contains an
1735 1737 error message describing the failure.
1736 1738
1737 1739 responsedata
1738 1740 Indicates a response to a previously-issued command was received.
1739 1741
1740 1742 The ``request`` key contains the ``commandrequest`` instance that
1741 1743 represents the request this data is for.
1742 1744
1743 1745 The ``data`` key contains the decoded data from the server.
1744 1746
1745 1747 ``expectmore`` and ``eos`` evaluate to True when more response data
1746 1748 is expected to follow or we're at the end of the response stream,
1747 1749 respectively.
1748 1750 """
1749 1751
1750 1752 def __init__(
1751 1753 self,
1752 1754 ui,
1753 1755 hasmultiplesend=False,
1754 1756 buffersends=True,
1755 1757 clientcontentencoders=None,
1756 1758 ):
1757 1759 """Create a new instance.
1758 1760
1759 1761 ``hasmultiplesend`` indicates whether multiple sends are supported
1760 1762 by the transport. When True, it is possible to send commands immediately
1761 1763 instead of buffering until the caller signals an intent to finish a
1762 1764 send operation.
1763 1765
1764 1766 ``buffercommands`` indicates whether sends should be buffered until the
1765 1767 last request has been issued.
1766 1768
1767 1769 ``clientcontentencoders`` is an iterable of content encoders the client
1768 1770 will advertise to the server and that the server can use for encoding
1769 1771 data. If not defined, the client will not advertise content encoders
1770 1772 to the server.
1771 1773 """
1772 1774 self._ui = ui
1773 1775 self._hasmultiplesend = hasmultiplesend
1774 1776 self._buffersends = buffersends
1775 1777 self._clientcontentencoders = clientcontentencoders
1776 1778
1777 1779 self._canissuecommands = True
1778 1780 self._cansend = True
1779 1781 self._protocolsettingssent = False
1780 1782
1781 1783 self._nextrequestid = 1
1782 1784 # We only support a single outgoing stream for now.
1783 1785 self._outgoingstream = outputstream(1)
1784 1786 self._pendingrequests = collections.deque()
1785 1787 self._activerequests = {}
1786 1788 self._incomingstreams = {}
1787 1789 self._streamsettingsdecoders = {}
1788 1790
1789 1791 populatestreamencoders()
1790 1792
1791 1793 def callcommand(self, name, args, datafh=None, redirect=None):
1792 1794 """Request that a command be executed.
1793 1795
1794 1796 Receives the command name, a dict of arguments to pass to the command,
1795 1797 and an optional file object containing the raw data for the command.
1796 1798
1797 1799 Returns a 3-tuple of (request, action, action data).
1798 1800 """
1799 1801 if not self._canissuecommands:
1800 1802 raise error.ProgrammingError(b'cannot issue new commands')
1801 1803
1802 1804 requestid = self._nextrequestid
1803 1805 self._nextrequestid += 2
1804 1806
1805 1807 request = commandrequest(
1806 1808 requestid, name, args, datafh=datafh, redirect=redirect
1807 1809 )
1808 1810
1809 1811 if self._buffersends:
1810 1812 self._pendingrequests.append(request)
1811 1813 return request, b'noop', {}
1812 1814 else:
1813 1815 if not self._cansend:
1814 1816 raise error.ProgrammingError(
1815 1817 b'sends cannot be performed on this instance'
1816 1818 )
1817 1819
1818 1820 if not self._hasmultiplesend:
1819 1821 self._cansend = False
1820 1822 self._canissuecommands = False
1821 1823
1822 1824 return (
1823 1825 request,
1824 1826 b'sendframes',
1825 1827 {
1826 1828 b'framegen': self._makecommandframes(request),
1827 1829 },
1828 1830 )
1829 1831
1830 1832 def flushcommands(self):
1831 1833 """Request that all queued commands be sent.
1832 1834
1833 1835 If any commands are buffered, this will instruct the caller to send
1834 1836 them over the wire. If no commands are buffered it instructs the client
1835 1837 to no-op.
1836 1838
1837 1839 If instances aren't configured for multiple sends, no new command
1838 1840 requests are allowed after this is called.
1839 1841 """
1840 1842 if not self._pendingrequests:
1841 1843 return b'noop', {}
1842 1844
1843 1845 if not self._cansend:
1844 1846 raise error.ProgrammingError(
1845 1847 b'sends cannot be performed on this instance'
1846 1848 )
1847 1849
1848 1850 # If the instance only allows sending once, mark that we have fired
1849 1851 # our one shot.
1850 1852 if not self._hasmultiplesend:
1851 1853 self._canissuecommands = False
1852 1854 self._cansend = False
1853 1855
1854 1856 def makeframes():
1855 1857 while self._pendingrequests:
1856 1858 request = self._pendingrequests.popleft()
1857 1859 for frame in self._makecommandframes(request):
1858 1860 yield frame
1859 1861
1860 1862 return b'sendframes', {
1861 1863 b'framegen': makeframes(),
1862 1864 }
1863 1865
1864 1866 def _makecommandframes(self, request):
1865 1867 """Emit frames to issue a command request.
1866 1868
1867 1869 As a side-effect, update request accounting to reflect its changed
1868 1870 state.
1869 1871 """
1870 1872 self._activerequests[request.requestid] = request
1871 1873 request.state = b'sending'
1872 1874
1873 1875 if not self._protocolsettingssent and self._clientcontentencoders:
1874 1876 self._protocolsettingssent = True
1875 1877
1876 1878 payload = b''.join(
1877 1879 cborutil.streamencode(
1878 1880 {
1879 1881 b'contentencodings': self._clientcontentencoders,
1880 1882 }
1881 1883 )
1882 1884 )
1883 1885
1884 1886 yield self._outgoingstream.makeframe(
1885 1887 requestid=request.requestid,
1886 1888 typeid=FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
1887 1889 flags=FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
1888 1890 payload=payload,
1889 1891 )
1890 1892
1891 1893 res = createcommandframes(
1892 1894 self._outgoingstream,
1893 1895 request.requestid,
1894 1896 request.name,
1895 1897 request.args,
1896 1898 datafh=request.datafh,
1897 1899 redirect=request.redirect,
1898 1900 )
1899 1901
1900 1902 for frame in res:
1901 1903 yield frame
1902 1904
1903 1905 request.state = b'sent'
1904 1906
1905 1907 def onframerecv(self, frame):
1906 1908 """Process a frame that has been received off the wire.
1907 1909
1908 1910 Returns a 2-tuple of (action, meta) describing further action the
1909 1911 caller needs to take as a result of receiving this frame.
1910 1912 """
1911 1913 if frame.streamid % 2:
1912 1914 return (
1913 1915 b'error',
1914 1916 {
1915 1917 b'message': (
1916 1918 _(b'received frame with odd numbered stream ID: %d')
1917 1919 % frame.streamid
1918 1920 ),
1919 1921 },
1920 1922 )
1921 1923
1922 1924 if frame.streamid not in self._incomingstreams:
1923 1925 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1924 1926 return (
1925 1927 b'error',
1926 1928 {
1927 1929 b'message': _(
1928 1930 b'received frame on unknown stream '
1929 1931 b'without beginning of stream flag set'
1930 1932 ),
1931 1933 },
1932 1934 )
1933 1935
1934 1936 self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
1935 1937
1936 1938 stream = self._incomingstreams[frame.streamid]
1937 1939
1938 1940 # If the payload is encoded, ask the stream to decode it. We
1939 1941 # merely substitute the decoded result into the frame payload as
1940 1942 # if it had been transferred all along.
1941 1943 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1942 1944 frame.payload = stream.decode(frame.payload)
1943 1945
1944 1946 if frame.streamflags & STREAM_FLAG_END_STREAM:
1945 1947 del self._incomingstreams[frame.streamid]
1946 1948
1947 1949 if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
1948 1950 return self._onstreamsettingsframe(frame)
1949 1951
1950 1952 if frame.requestid not in self._activerequests:
1951 1953 return (
1952 1954 b'error',
1953 1955 {
1954 1956 b'message': (
1955 1957 _(b'received frame for inactive request ID: %d')
1956 1958 % frame.requestid
1957 1959 ),
1958 1960 },
1959 1961 )
1960 1962
1961 1963 request = self._activerequests[frame.requestid]
1962 1964 request.state = b'receiving'
1963 1965
1964 1966 handlers = {
1965 1967 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1966 1968 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1967 1969 }
1968 1970
1969 1971 meth = handlers.get(frame.typeid)
1970 1972 if not meth:
1971 1973 raise error.ProgrammingError(
1972 1974 b'unhandled frame type: %d' % frame.typeid
1973 1975 )
1974 1976
1975 1977 return meth(request, frame)
1976 1978
1977 1979 def _onstreamsettingsframe(self, frame):
1978 1980 assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS
1979 1981
1980 1982 more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
1981 1983 eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
1982 1984
1983 1985 if more and eos:
1984 1986 return (
1985 1987 b'error',
1986 1988 {
1987 1989 b'message': (
1988 1990 _(
1989 1991 b'stream encoding settings frame cannot have both '
1990 1992 b'continuation and end of stream flags set'
1991 1993 )
1992 1994 ),
1993 1995 },
1994 1996 )
1995 1997
1996 1998 if not more and not eos:
1997 1999 return (
1998 2000 b'error',
1999 2001 {
2000 2002 b'message': _(
2001 2003 b'stream encoding settings frame must have '
2002 2004 b'continuation or end of stream flag set'
2003 2005 ),
2004 2006 },
2005 2007 )
2006 2008
2007 2009 if frame.streamid not in self._streamsettingsdecoders:
2008 2010 decoder = cborutil.bufferingdecoder()
2009 2011 self._streamsettingsdecoders[frame.streamid] = decoder
2010 2012
2011 2013 decoder = self._streamsettingsdecoders[frame.streamid]
2012 2014
2013 2015 try:
2014 2016 decoder.decode(frame.payload)
2015 2017 except Exception as e:
2016 2018 return (
2017 2019 b'error',
2018 2020 {
2019 2021 b'message': (
2020 2022 _(
2021 2023 b'error decoding CBOR from stream encoding '
2022 2024 b'settings frame: %s'
2023 2025 )
2024 2026 % stringutil.forcebytestr(e)
2025 2027 ),
2026 2028 },
2027 2029 )
2028 2030
2029 2031 if more:
2030 2032 return b'noop', {}
2031 2033
2032 2034 assert eos
2033 2035
2034 2036 decoded = decoder.getavailable()
2035 2037 del self._streamsettingsdecoders[frame.streamid]
2036 2038
2037 2039 if not decoded:
2038 2040 return (
2039 2041 b'error',
2040 2042 {
2041 2043 b'message': _(
2042 2044 b'stream encoding settings frame did not contain '
2043 2045 b'CBOR data'
2044 2046 ),
2045 2047 },
2046 2048 )
2047 2049
2048 2050 try:
2049 2051 self._incomingstreams[frame.streamid].setdecoder(
2050 2052 self._ui, decoded[0], decoded[1:]
2051 2053 )
2052 2054 except Exception as e:
2053 2055 return (
2054 2056 b'error',
2055 2057 {
2056 2058 b'message': (
2057 2059 _(b'error setting stream decoder: %s')
2058 2060 % stringutil.forcebytestr(e)
2059 2061 ),
2060 2062 },
2061 2063 )
2062 2064
2063 2065 return b'noop', {}
2064 2066
2065 2067 def _oncommandresponseframe(self, request, frame):
2066 2068 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
2067 2069 request.state = b'received'
2068 2070 del self._activerequests[request.requestid]
2069 2071
2070 2072 return (
2071 2073 b'responsedata',
2072 2074 {
2073 2075 b'request': request,
2074 2076 b'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
2075 2077 b'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
2076 2078 b'data': frame.payload,
2077 2079 },
2078 2080 )
2079 2081
2080 2082 def _onerrorresponseframe(self, request, frame):
2081 2083 request.state = b'errored'
2082 2084 del self._activerequests[request.requestid]
2083 2085
2084 2086 # The payload should be a CBOR map.
2085 2087 m = cborutil.decodeall(frame.payload)[0]
2086 2088
2087 2089 return (
2088 2090 b'error',
2089 2091 {
2090 2092 b'request': request,
2091 2093 b'type': m[b'type'],
2092 2094 b'message': m[b'message'],
2093 2095 },
2094 2096 )
General Comments 0
You need to be logged in to leave comments. Login now