##// END OF EJS Templates
wireprotov2: don't emit empty frames...
Gregory Szorc -
r40171:3a6d6c54 default
parent child Browse files
Show More
@@ -1,1738 +1,1744 b''
1 1 # wireprotoframing.py - unified framing protocol for wire protocol
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 # This file contains functionality to support the unified frame-based wire
9 9 # protocol. For details about the protocol, see
10 10 # `hg help internals.wireprotocol`.
11 11
12 12 from __future__ import absolute_import
13 13
14 14 import collections
15 15 import struct
16 16
17 17 from .i18n import _
18 18 from .thirdparty import (
19 19 attr,
20 20 )
21 21 from . import (
22 22 encoding,
23 23 error,
24 24 pycompat,
25 25 util,
26 26 wireprototypes,
27 27 )
28 28 from .utils import (
29 29 cborutil,
30 30 stringutil,
31 31 )
32 32
33 33 FRAME_HEADER_SIZE = 8
34 34 DEFAULT_MAX_FRAME_SIZE = 32768
35 35
36 36 STREAM_FLAG_BEGIN_STREAM = 0x01
37 37 STREAM_FLAG_END_STREAM = 0x02
38 38 STREAM_FLAG_ENCODING_APPLIED = 0x04
39 39
40 40 STREAM_FLAGS = {
41 41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
42 42 b'stream-end': STREAM_FLAG_END_STREAM,
43 43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
44 44 }
45 45
46 46 FRAME_TYPE_COMMAND_REQUEST = 0x01
47 47 FRAME_TYPE_COMMAND_DATA = 0x02
48 48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
49 49 FRAME_TYPE_ERROR_RESPONSE = 0x05
50 50 FRAME_TYPE_TEXT_OUTPUT = 0x06
51 51 FRAME_TYPE_PROGRESS = 0x07
52 52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
53 53 FRAME_TYPE_STREAM_SETTINGS = 0x09
54 54
55 55 FRAME_TYPES = {
56 56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
57 57 b'command-data': FRAME_TYPE_COMMAND_DATA,
58 58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
59 59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
60 60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
61 61 b'progress': FRAME_TYPE_PROGRESS,
62 62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
63 63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
64 64 }
65 65
66 66 FLAG_COMMAND_REQUEST_NEW = 0x01
67 67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
68 68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
69 69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
70 70
71 71 FLAGS_COMMAND_REQUEST = {
72 72 b'new': FLAG_COMMAND_REQUEST_NEW,
73 73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
74 74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
75 75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
76 76 }
77 77
78 78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
79 79 FLAG_COMMAND_DATA_EOS = 0x02
80 80
81 81 FLAGS_COMMAND_DATA = {
82 82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
83 83 b'eos': FLAG_COMMAND_DATA_EOS,
84 84 }
85 85
86 86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
87 87 FLAG_COMMAND_RESPONSE_EOS = 0x02
88 88
89 89 FLAGS_COMMAND_RESPONSE = {
90 90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
91 91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
92 92 }
93 93
94 94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
95 95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
96 96
97 97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
98 98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
99 99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
100 100 }
101 101
102 102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
103 103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
104 104
105 105 FLAGS_STREAM_ENCODING_SETTINGS = {
106 106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
107 107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
108 108 }
109 109
110 110 # Maps frame types to their available flags.
111 111 FRAME_TYPE_FLAGS = {
112 112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
113 113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
114 114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
115 115 FRAME_TYPE_ERROR_RESPONSE: {},
116 116 FRAME_TYPE_TEXT_OUTPUT: {},
117 117 FRAME_TYPE_PROGRESS: {},
118 118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
119 119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
120 120 }
121 121
122 122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
123 123
124 124 def humanflags(mapping, value):
125 125 """Convert a numeric flags value to a human value, using a mapping table."""
126 126 namemap = {v: k for k, v in mapping.iteritems()}
127 127 flags = []
128 128 val = 1
129 129 while value >= val:
130 130 if value & val:
131 131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
132 132 val <<= 1
133 133
134 134 return b'|'.join(flags)
135 135
136 136 @attr.s(slots=True)
137 137 class frameheader(object):
138 138 """Represents the data in a frame header."""
139 139
140 140 length = attr.ib()
141 141 requestid = attr.ib()
142 142 streamid = attr.ib()
143 143 streamflags = attr.ib()
144 144 typeid = attr.ib()
145 145 flags = attr.ib()
146 146
147 147 @attr.s(slots=True, repr=False)
148 148 class frame(object):
149 149 """Represents a parsed frame."""
150 150
151 151 requestid = attr.ib()
152 152 streamid = attr.ib()
153 153 streamflags = attr.ib()
154 154 typeid = attr.ib()
155 155 flags = attr.ib()
156 156 payload = attr.ib()
157 157
158 158 @encoding.strmethod
159 159 def __repr__(self):
160 160 typename = '<unknown 0x%02x>' % self.typeid
161 161 for name, value in FRAME_TYPES.iteritems():
162 162 if value == self.typeid:
163 163 typename = name
164 164 break
165 165
166 166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
167 167 'type=%s; flags=%s)' % (
168 168 len(self.payload), self.requestid, self.streamid,
169 169 humanflags(STREAM_FLAGS, self.streamflags), typename,
170 170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
171 171
172 172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
173 173 """Assemble a frame into a byte array."""
174 174 # TODO assert size of payload.
175 175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
176 176
177 177 # 24 bits length
178 178 # 16 bits request id
179 179 # 8 bits stream id
180 180 # 8 bits stream flags
181 181 # 4 bits type
182 182 # 4 bits flags
183 183
184 184 l = struct.pack(r'<I', len(payload))
185 185 frame[0:3] = l[0:3]
186 186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
187 187 frame[7] = (typeid << 4) | flags
188 188 frame[8:] = payload
189 189
190 190 return frame
191 191
192 192 def makeframefromhumanstring(s):
193 193 """Create a frame from a human readable string
194 194
195 195 Strings have the form:
196 196
197 197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
198 198
199 199 This can be used by user-facing applications and tests for creating
200 200 frames easily without having to type out a bunch of constants.
201 201
202 202 Request ID and stream IDs are integers.
203 203
204 204 Stream flags, frame type, and flags can be specified by integer or
205 205 named constant.
206 206
207 207 Flags can be delimited by `|` to bitwise OR them together.
208 208
209 209 If the payload begins with ``cbor:``, the following string will be
210 210 evaluated as Python literal and the resulting object will be fed into
211 211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
212 212 byte string literal.
213 213 """
214 214 fields = s.split(b' ', 5)
215 215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
216 216
217 217 requestid = int(requestid)
218 218 streamid = int(streamid)
219 219
220 220 finalstreamflags = 0
221 221 for flag in streamflags.split(b'|'):
222 222 if flag in STREAM_FLAGS:
223 223 finalstreamflags |= STREAM_FLAGS[flag]
224 224 else:
225 225 finalstreamflags |= int(flag)
226 226
227 227 if frametype in FRAME_TYPES:
228 228 frametype = FRAME_TYPES[frametype]
229 229 else:
230 230 frametype = int(frametype)
231 231
232 232 finalflags = 0
233 233 validflags = FRAME_TYPE_FLAGS[frametype]
234 234 for flag in frameflags.split(b'|'):
235 235 if flag in validflags:
236 236 finalflags |= validflags[flag]
237 237 else:
238 238 finalflags |= int(flag)
239 239
240 240 if payload.startswith(b'cbor:'):
241 241 payload = b''.join(cborutil.streamencode(
242 242 stringutil.evalpythonliteral(payload[5:])))
243 243
244 244 else:
245 245 payload = stringutil.unescapestr(payload)
246 246
247 247 return makeframe(requestid=requestid, streamid=streamid,
248 248 streamflags=finalstreamflags, typeid=frametype,
249 249 flags=finalflags, payload=payload)
250 250
251 251 def parseheader(data):
252 252 """Parse a unified framing protocol frame header from a buffer.
253 253
254 254 The header is expected to be in the buffer at offset 0 and the
255 255 buffer is expected to be large enough to hold a full header.
256 256 """
257 257 # 24 bits payload length (little endian)
258 258 # 16 bits request ID
259 259 # 8 bits stream ID
260 260 # 8 bits stream flags
261 261 # 4 bits frame type
262 262 # 4 bits frame flags
263 263 # ... payload
264 264 framelength = data[0] + 256 * data[1] + 16384 * data[2]
265 265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
266 266 typeflags = data[7]
267 267
268 268 frametype = (typeflags & 0xf0) >> 4
269 269 frameflags = typeflags & 0x0f
270 270
271 271 return frameheader(framelength, requestid, streamid, streamflags,
272 272 frametype, frameflags)
273 273
274 274 def readframe(fh):
275 275 """Read a unified framing protocol frame from a file object.
276 276
277 277 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
278 278 None if no frame is available. May raise if a malformed frame is
279 279 seen.
280 280 """
281 281 header = bytearray(FRAME_HEADER_SIZE)
282 282
283 283 readcount = fh.readinto(header)
284 284
285 285 if readcount == 0:
286 286 return None
287 287
288 288 if readcount != FRAME_HEADER_SIZE:
289 289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
290 290 (readcount, header))
291 291
292 292 h = parseheader(header)
293 293
294 294 payload = fh.read(h.length)
295 295 if len(payload) != h.length:
296 296 raise error.Abort(_('frame length error: expected %d; got %d') %
297 297 (h.length, len(payload)))
298 298
299 299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
300 300 payload)
301 301
302 302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
303 303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
304 304 redirect=None):
305 305 """Create frames necessary to transmit a request to run a command.
306 306
307 307 This is a generator of bytearrays. Each item represents a frame
308 308 ready to be sent over the wire to a peer.
309 309 """
310 310 data = {b'name': cmd}
311 311 if args:
312 312 data[b'args'] = args
313 313
314 314 if redirect:
315 315 data[b'redirect'] = redirect
316 316
317 317 data = b''.join(cborutil.streamencode(data))
318 318
319 319 offset = 0
320 320
321 321 while True:
322 322 flags = 0
323 323
324 324 # Must set new or continuation flag.
325 325 if not offset:
326 326 flags |= FLAG_COMMAND_REQUEST_NEW
327 327 else:
328 328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
329 329
330 330 # Data frames is set on all frames.
331 331 if datafh:
332 332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
333 333
334 334 payload = data[offset:offset + maxframesize]
335 335 offset += len(payload)
336 336
337 337 if len(payload) == maxframesize and offset < len(data):
338 338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
339 339
340 340 yield stream.makeframe(requestid=requestid,
341 341 typeid=FRAME_TYPE_COMMAND_REQUEST,
342 342 flags=flags,
343 343 payload=payload)
344 344
345 345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
346 346 break
347 347
348 348 if datafh:
349 349 while True:
350 350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
351 351
352 352 done = False
353 353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
354 354 flags = FLAG_COMMAND_DATA_CONTINUATION
355 355 else:
356 356 flags = FLAG_COMMAND_DATA_EOS
357 357 assert datafh.read(1) == b''
358 358 done = True
359 359
360 360 yield stream.makeframe(requestid=requestid,
361 361 typeid=FRAME_TYPE_COMMAND_DATA,
362 362 flags=flags,
363 363 payload=data)
364 364
365 365 if done:
366 366 break
367 367
368 368 def createcommandresponseokframe(stream, requestid):
369 369 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
370 370
371 371 return stream.makeframe(requestid=requestid,
372 372 typeid=FRAME_TYPE_COMMAND_RESPONSE,
373 373 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
374 374 payload=overall)
375 375
376 376 def createcommandresponseeosframe(stream, requestid):
377 377 """Create an empty payload frame representing command end-of-stream."""
378 378 return stream.makeframe(requestid=requestid,
379 379 typeid=FRAME_TYPE_COMMAND_RESPONSE,
380 380 flags=FLAG_COMMAND_RESPONSE_EOS,
381 381 payload=b'')
382 382
383 383 def createalternatelocationresponseframe(stream, requestid, location):
384 384 data = {
385 385 b'status': b'redirect',
386 386 b'location': {
387 387 b'url': location.url,
388 388 b'mediatype': location.mediatype,
389 389 }
390 390 }
391 391
392 392 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
393 393 r'servercadercerts'):
394 394 value = getattr(location, a)
395 395 if value is not None:
396 396 data[b'location'][pycompat.bytestr(a)] = value
397 397
398 398 return stream.makeframe(requestid=requestid,
399 399 typeid=FRAME_TYPE_COMMAND_RESPONSE,
400 400 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
401 401 payload=b''.join(cborutil.streamencode(data)))
402 402
403 403 def createcommanderrorresponse(stream, requestid, message, args=None):
404 404 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
405 405 # formatting works consistently?
406 406 m = {
407 407 b'status': b'error',
408 408 b'error': {
409 409 b'message': message,
410 410 }
411 411 }
412 412
413 413 if args:
414 414 m[b'error'][b'args'] = args
415 415
416 416 overall = b''.join(cborutil.streamencode(m))
417 417
418 418 yield stream.makeframe(requestid=requestid,
419 419 typeid=FRAME_TYPE_COMMAND_RESPONSE,
420 420 flags=FLAG_COMMAND_RESPONSE_EOS,
421 421 payload=overall)
422 422
423 423 def createerrorframe(stream, requestid, msg, errtype):
424 424 # TODO properly handle frame size limits.
425 425 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
426 426
427 427 payload = b''.join(cborutil.streamencode({
428 428 b'type': errtype,
429 429 b'message': [{b'msg': msg}],
430 430 }))
431 431
432 432 yield stream.makeframe(requestid=requestid,
433 433 typeid=FRAME_TYPE_ERROR_RESPONSE,
434 434 flags=0,
435 435 payload=payload)
436 436
437 437 def createtextoutputframe(stream, requestid, atoms,
438 438 maxframesize=DEFAULT_MAX_FRAME_SIZE):
439 439 """Create a text output frame to render text to people.
440 440
441 441 ``atoms`` is a 3-tuple of (formatting string, args, labels).
442 442
443 443 The formatting string contains ``%s`` tokens to be replaced by the
444 444 corresponding indexed entry in ``args``. ``labels`` is an iterable of
445 445 formatters to be applied at rendering time. In terms of the ``ui``
446 446 class, each atom corresponds to a ``ui.write()``.
447 447 """
448 448 atomdicts = []
449 449
450 450 for (formatting, args, labels) in atoms:
451 451 # TODO look for localstr, other types here?
452 452
453 453 if not isinstance(formatting, bytes):
454 454 raise ValueError('must use bytes formatting strings')
455 455 for arg in args:
456 456 if not isinstance(arg, bytes):
457 457 raise ValueError('must use bytes for arguments')
458 458 for label in labels:
459 459 if not isinstance(label, bytes):
460 460 raise ValueError('must use bytes for labels')
461 461
462 462 # Formatting string must be ASCII.
463 463 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
464 464
465 465 # Arguments must be UTF-8.
466 466 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
467 467
468 468 # Labels must be ASCII.
469 469 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
470 470 for l in labels]
471 471
472 472 atom = {b'msg': formatting}
473 473 if args:
474 474 atom[b'args'] = args
475 475 if labels:
476 476 atom[b'labels'] = labels
477 477
478 478 atomdicts.append(atom)
479 479
480 480 payload = b''.join(cborutil.streamencode(atomdicts))
481 481
482 482 if len(payload) > maxframesize:
483 483 raise ValueError('cannot encode data in a single frame')
484 484
485 485 yield stream.makeframe(requestid=requestid,
486 486 typeid=FRAME_TYPE_TEXT_OUTPUT,
487 487 flags=0,
488 488 payload=payload)
489 489
490 490 class bufferingcommandresponseemitter(object):
491 491 """Helper object to emit command response frames intelligently.
492 492
493 493 Raw command response data is likely emitted in chunks much smaller
494 494 than what can fit in a single frame. This class exists to buffer
495 495 chunks until enough data is available to fit in a single frame.
496 496
497 497 TODO we'll need something like this when compression is supported.
498 498 So it might make sense to implement this functionality at the stream
499 499 level.
500 500 """
501 501 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
502 502 self._stream = stream
503 503 self._requestid = requestid
504 504 self._maxsize = maxframesize
505 505 self._chunks = []
506 506 self._chunkssize = 0
507 507
508 508 def send(self, data):
509 509 """Send new data for emission.
510 510
511 511 Is a generator of new frames that were derived from the new input.
512 512
513 513 If the special input ``None`` is received, flushes all buffered
514 514 data to frames.
515 515 """
516 516
517 517 if data is None:
518 518 for frame in self._flush():
519 519 yield frame
520 520 return
521 521
522 522 # There is a ton of potential to do more complicated things here.
523 523 # Our immediate goal is to coalesce small chunks into big frames,
524 524 # not achieve the fewest number of frames possible. So we go with
525 525 # a simple implementation:
526 526 #
527 527 # * If a chunk is too large for a frame, we flush and emit frames
528 528 # for the new chunk.
529 529 # * If a chunk can be buffered without total buffered size limits
530 530 # being exceeded, we do that.
531 531 # * If a chunk causes us to go over our buffering limit, we flush
532 532 # and then buffer the new chunk.
533 533
534 if not data:
535 return
536
534 537 if len(data) > self._maxsize:
535 538 for frame in self._flush():
536 539 yield frame
537 540
538 541 # Now emit frames for the big chunk.
539 542 offset = 0
540 543 while True:
541 544 chunk = data[offset:offset + self._maxsize]
542 545 offset += len(chunk)
543 546
544 547 yield self._stream.makeframe(
545 548 self._requestid,
546 549 typeid=FRAME_TYPE_COMMAND_RESPONSE,
547 550 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
548 551 payload=chunk)
549 552
550 553 if offset == len(data):
551 554 return
552 555
553 556 # If we don't have enough to constitute a full frame, buffer and
554 557 # return.
555 558 if len(data) + self._chunkssize < self._maxsize:
556 559 self._chunks.append(data)
557 560 self._chunkssize += len(data)
558 561 return
559 562
560 563 # Else flush what we have and buffer the new chunk. We could do
561 564 # something more intelligent here, like break the chunk. Let's
562 565 # keep things simple for now.
563 566 for frame in self._flush():
564 567 yield frame
565 568
566 569 self._chunks.append(data)
567 570 self._chunkssize = len(data)
568 571
569 572 def _flush(self):
570 573 payload = b''.join(self._chunks)
571 574 assert len(payload) <= self._maxsize
572 575
573 576 self._chunks[:] = []
574 577 self._chunkssize = 0
575 578
579 if not payload:
580 return
581
576 582 yield self._stream.makeframe(
577 583 self._requestid,
578 584 typeid=FRAME_TYPE_COMMAND_RESPONSE,
579 585 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
580 586 payload=payload)
581 587
582 588 # TODO consider defining encoders/decoders using the util.compressionengine
583 589 # mechanism.
584 590
585 591 class identityencoder(object):
586 592 """Encoder for the "identity" stream encoding profile."""
587 593 def __init__(self, ui):
588 594 pass
589 595
590 596 def encode(self, data):
591 597 return data
592 598
593 599 def flush(self):
594 600 return b''
595 601
596 602 def finish(self):
597 603 return b''
598 604
599 605 class identitydecoder(object):
600 606 """Decoder for the "identity" stream encoding profile."""
601 607
602 608 def __init__(self, ui, extraobjs):
603 609 if extraobjs:
604 610 raise error.Abort(_('identity decoder received unexpected '
605 611 'additional values'))
606 612
607 613 def decode(self, data):
608 614 return data
609 615
610 616 class zlibencoder(object):
611 617 def __init__(self, ui):
612 618 import zlib
613 619 self._zlib = zlib
614 620 self._compressor = zlib.compressobj()
615 621
616 622 def encode(self, data):
617 623 return self._compressor.compress(data)
618 624
619 625 def flush(self):
620 626 # Z_SYNC_FLUSH doesn't reset compression context, which is
621 627 # what we want.
622 628 return self._compressor.flush(self._zlib.Z_SYNC_FLUSH)
623 629
624 630 def finish(self):
625 631 res = self._compressor.flush(self._zlib.Z_FINISH)
626 632 self._compressor = None
627 633 return res
628 634
629 635 class zlibdecoder(object):
630 636 def __init__(self, ui, extraobjs):
631 637 import zlib
632 638
633 639 if extraobjs:
634 640 raise error.Abort(_('zlib decoder received unexpected '
635 641 'additional values'))
636 642
637 643 self._decompressor = zlib.decompressobj()
638 644
639 645 def decode(self, data):
640 646 # Python 2's zlib module doesn't use the buffer protocol and can't
641 647 # handle all bytes-like types.
642 648 if not pycompat.ispy3 and isinstance(data, bytearray):
643 649 data = bytes(data)
644 650
645 651 return self._decompressor.decompress(data)
646 652
647 653 class zstdbaseencoder(object):
648 654 def __init__(self, level):
649 655 from . import zstd
650 656
651 657 self._zstd = zstd
652 658 cctx = zstd.ZstdCompressor(level=level)
653 659 self._compressor = cctx.compressobj()
654 660
655 661 def encode(self, data):
656 662 return self._compressor.compress(data)
657 663
658 664 def flush(self):
659 665 # COMPRESSOBJ_FLUSH_BLOCK flushes all data previously fed into the
660 666 # compressor and allows a decompressor to access all encoded data
661 667 # up to this point.
662 668 return self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_BLOCK)
663 669
664 670 def finish(self):
665 671 res = self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_FINISH)
666 672 self._compressor = None
667 673 return res
668 674
669 675 class zstd8mbencoder(zstdbaseencoder):
670 676 def __init__(self, ui):
671 677 super(zstd8mbencoder, self).__init__(3)
672 678
673 679 class zstdbasedecoder(object):
674 680 def __init__(self, maxwindowsize):
675 681 from . import zstd
676 682 dctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize)
677 683 self._decompressor = dctx.decompressobj()
678 684
679 685 def decode(self, data):
680 686 return self._decompressor.decompress(data)
681 687
682 688 class zstd8mbdecoder(zstdbasedecoder):
683 689 def __init__(self, ui, extraobjs):
684 690 if extraobjs:
685 691 raise error.Abort(_('zstd8mb decoder received unexpected '
686 692 'additional values'))
687 693
688 694 super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576)
689 695
690 696 # We lazily populate this to avoid excessive module imports when importing
691 697 # this module.
692 698 STREAM_ENCODERS = {}
693 699 STREAM_ENCODERS_ORDER = []
694 700
695 701 def populatestreamencoders():
696 702 if STREAM_ENCODERS:
697 703 return
698 704
699 705 try:
700 706 from . import zstd
701 707 zstd.__version__
702 708 except ImportError:
703 709 zstd = None
704 710
705 711 # zstandard is fastest and is preferred.
706 712 if zstd:
707 713 STREAM_ENCODERS[b'zstd-8mb'] = (zstd8mbencoder, zstd8mbdecoder)
708 714 STREAM_ENCODERS_ORDER.append(b'zstd-8mb')
709 715
710 716 STREAM_ENCODERS[b'zlib'] = (zlibencoder, zlibdecoder)
711 717 STREAM_ENCODERS_ORDER.append(b'zlib')
712 718
713 719 STREAM_ENCODERS[b'identity'] = (identityencoder, identitydecoder)
714 720 STREAM_ENCODERS_ORDER.append(b'identity')
715 721
716 722 class stream(object):
717 723 """Represents a logical unidirectional series of frames."""
718 724
719 725 def __init__(self, streamid, active=False):
720 726 self.streamid = streamid
721 727 self._active = active
722 728
723 729 def makeframe(self, requestid, typeid, flags, payload):
724 730 """Create a frame to be sent out over this stream.
725 731
726 732 Only returns the frame instance. Does not actually send it.
727 733 """
728 734 streamflags = 0
729 735 if not self._active:
730 736 streamflags |= STREAM_FLAG_BEGIN_STREAM
731 737 self._active = True
732 738
733 739 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
734 740 payload)
735 741
736 742 class inputstream(stream):
737 743 """Represents a stream used for receiving data."""
738 744
739 745 def __init__(self, streamid, active=False):
740 746 super(inputstream, self).__init__(streamid, active=active)
741 747 self._decoder = None
742 748
743 749 def setdecoder(self, ui, name, extraobjs):
744 750 """Set the decoder for this stream.
745 751
746 752 Receives the stream profile name and any additional CBOR objects
747 753 decoded from the stream encoding settings frame payloads.
748 754 """
749 755 if name not in STREAM_ENCODERS:
750 756 raise error.Abort(_('unknown stream decoder: %s') % name)
751 757
752 758 self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs)
753 759
754 760 def decode(self, data):
755 761 # Default is identity decoder. We don't bother instantiating one
756 762 # because it is trivial.
757 763 if not self._decoder:
758 764 return data
759 765
760 766 return self._decoder.decode(data)
761 767
762 768 def flush(self):
763 769 if not self._decoder:
764 770 return b''
765 771
766 772 return self._decoder.flush()
767 773
768 774 class outputstream(stream):
769 775 """Represents a stream used for sending data."""
770 776
771 777 def __init__(self, streamid, active=False):
772 778 super(outputstream, self).__init__(streamid, active=active)
773 779 self._encoder = None
774 780
775 781 def setencoder(self, ui, name):
776 782 """Set the encoder for this stream.
777 783
778 784 Receives the stream profile name.
779 785 """
780 786 if name not in STREAM_ENCODERS:
781 787 raise error.Abort(_('unknown stream encoder: %s') % name)
782 788
783 789 self._encoder = STREAM_ENCODERS[name][0](ui)
784 790
785 791 def encode(self, data):
786 792 if not self._encoder:
787 793 return data
788 794
789 795 return self._encoder.encode(data)
790 796
791 797 def flush(self):
792 798 if not self._encoder:
793 799 return b''
794 800
795 801 return self._encoder.flush()
796 802
797 803 def finish(self):
798 804 if not self._encoder:
799 805 return b''
800 806
801 807 self._encoder.finish()
802 808
803 809 def ensureserverstream(stream):
804 810 if stream.streamid % 2:
805 811 raise error.ProgrammingError('server should only write to even '
806 812 'numbered streams; %d is not even' %
807 813 stream.streamid)
808 814
809 815 DEFAULT_PROTOCOL_SETTINGS = {
810 816 'contentencodings': [b'identity'],
811 817 }
812 818
813 819 class serverreactor(object):
814 820 """Holds state of a server handling frame-based protocol requests.
815 821
816 822 This class is the "brain" of the unified frame-based protocol server
817 823 component. While the protocol is stateless from the perspective of
818 824 requests/commands, something needs to track which frames have been
819 825 received, what frames to expect, etc. This class is that thing.
820 826
821 827 Instances are modeled as a state machine of sorts. Instances are also
822 828 reactionary to external events. The point of this class is to encapsulate
823 829 the state of the connection and the exchange of frames, not to perform
824 830 work. Instead, callers tell this class when something occurs, like a
825 831 frame arriving. If that activity is worthy of a follow-up action (say
826 832 *run a command*), the return value of that handler will say so.
827 833
828 834 I/O and CPU intensive operations are purposefully delegated outside of
829 835 this class.
830 836
831 837 Consumers are expected to tell instances when events occur. They do so by
832 838 calling the various ``on*`` methods. These methods return a 2-tuple
833 839 describing any follow-up action(s) to take. The first element is the
834 840 name of an action to perform. The second is a data structure (usually
835 841 a dict) specific to that action that contains more information. e.g.
836 842 if the server wants to send frames back to the client, the data structure
837 843 will contain a reference to those frames.
838 844
839 845 Valid actions that consumers can be instructed to take are:
840 846
841 847 sendframes
842 848 Indicates that frames should be sent to the client. The ``framegen``
843 849 key contains a generator of frames that should be sent. The server
844 850 assumes that all frames are sent to the client.
845 851
846 852 error
847 853 Indicates that an error occurred. Consumer should probably abort.
848 854
849 855 runcommand
850 856 Indicates that the consumer should run a wire protocol command. Details
851 857 of the command to run are given in the data structure.
852 858
853 859 wantframe
854 860 Indicates that nothing of interest happened and the server is waiting on
855 861 more frames from the client before anything interesting can be done.
856 862
857 863 noop
858 864 Indicates no additional action is required.
859 865
860 866 Known Issues
861 867 ------------
862 868
863 869 There are no limits to the number of partially received commands or their
864 870 size. A malicious client could stream command request data and exhaust the
865 871 server's memory.
866 872
867 873 Partially received commands are not acted upon when end of input is
868 874 reached. Should the server error if it receives a partial request?
869 875 Should the client send a message to abort a partially transmitted request
870 876 to facilitate graceful shutdown?
871 877
872 878 Active requests that haven't been responded to aren't tracked. This means
873 879 that if we receive a command and instruct its dispatch, another command
874 880 with its request ID can come in over the wire and there will be a race
875 881 between who responds to what.
876 882 """
877 883
878 884 def __init__(self, ui, deferoutput=False):
879 885 """Construct a new server reactor.
880 886
881 887 ``deferoutput`` can be used to indicate that no output frames should be
882 888 instructed to be sent until input has been exhausted. In this mode,
883 889 events that would normally generate output frames (such as a command
884 890 response being ready) will instead defer instructing the consumer to
885 891 send those frames. This is useful for half-duplex transports where the
886 892 sender cannot receive until all data has been transmitted.
887 893 """
888 894 self._ui = ui
889 895 self._deferoutput = deferoutput
890 896 self._state = 'initial'
891 897 self._nextoutgoingstreamid = 2
892 898 self._bufferedframegens = []
893 899 # stream id -> stream instance for all active streams from the client.
894 900 self._incomingstreams = {}
895 901 self._outgoingstreams = {}
896 902 # request id -> dict of commands that are actively being received.
897 903 self._receivingcommands = {}
898 904 # Request IDs that have been received and are actively being processed.
899 905 # Once all output for a request has been sent, it is removed from this
900 906 # set.
901 907 self._activecommands = set()
902 908
903 909 self._protocolsettingsdecoder = None
904 910
905 911 # Sender protocol settings are optional. Set implied default values.
906 912 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
907 913
908 914 populatestreamencoders()
909 915
910 916 def onframerecv(self, frame):
911 917 """Process a frame that has been received off the wire.
912 918
913 919 Returns a dict with an ``action`` key that details what action,
914 920 if any, the consumer should take next.
915 921 """
916 922 if not frame.streamid % 2:
917 923 self._state = 'errored'
918 924 return self._makeerrorresult(
919 925 _('received frame with even numbered stream ID: %d') %
920 926 frame.streamid)
921 927
922 928 if frame.streamid not in self._incomingstreams:
923 929 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
924 930 self._state = 'errored'
925 931 return self._makeerrorresult(
926 932 _('received frame on unknown inactive stream without '
927 933 'beginning of stream flag set'))
928 934
929 935 self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
930 936
931 937 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
932 938 # TODO handle decoding frames
933 939 self._state = 'errored'
934 940 raise error.ProgrammingError('support for decoding stream payloads '
935 941 'not yet implemented')
936 942
937 943 if frame.streamflags & STREAM_FLAG_END_STREAM:
938 944 del self._incomingstreams[frame.streamid]
939 945
940 946 handlers = {
941 947 'initial': self._onframeinitial,
942 948 'protocol-settings-receiving': self._onframeprotocolsettings,
943 949 'idle': self._onframeidle,
944 950 'command-receiving': self._onframecommandreceiving,
945 951 'errored': self._onframeerrored,
946 952 }
947 953
948 954 meth = handlers.get(self._state)
949 955 if not meth:
950 956 raise error.ProgrammingError('unhandled state: %s' % self._state)
951 957
952 958 return meth(frame)
953 959
954 960 def oncommandresponsereadyobjects(self, stream, requestid, objs):
955 961 """Signal that objects are ready to be sent to the client.
956 962
957 963 ``objs`` is an iterable of objects (typically a generator) that will
958 964 be encoded via CBOR and added to frames, which will be sent to the
959 965 client.
960 966 """
961 967 ensureserverstream(stream)
962 968
963 969 # A more robust solution would be to check for objs.{next,__next__}.
964 970 if isinstance(objs, list):
965 971 objs = iter(objs)
966 972
967 973 # We need to take care over exception handling. Uncaught exceptions
968 974 # when generating frames could lead to premature end of the frame
969 975 # stream and the possibility of the server or client process getting
970 976 # in a bad state.
971 977 #
972 978 # Keep in mind that if ``objs`` is a generator, advancing it could
973 979 # raise exceptions that originated in e.g. wire protocol command
974 980 # functions. That is why we differentiate between exceptions raised
975 981 # when iterating versus other exceptions that occur.
976 982 #
977 983 # In all cases, when the function finishes, the request is fully
978 984 # handled and no new frames for it should be seen.
979 985
980 986 def sendframes():
981 987 emitted = False
982 988 alternatelocationsent = False
983 989 emitter = bufferingcommandresponseemitter(stream, requestid)
984 990 while True:
985 991 try:
986 992 o = next(objs)
987 993 except StopIteration:
988 994 for frame in emitter.send(None):
989 995 yield frame
990 996
991 997 if emitted:
992 998 yield createcommandresponseeosframe(stream, requestid)
993 999 break
994 1000
995 1001 except error.WireprotoCommandError as e:
996 1002 for frame in createcommanderrorresponse(
997 1003 stream, requestid, e.message, e.messageargs):
998 1004 yield frame
999 1005 break
1000 1006
1001 1007 except Exception as e:
1002 1008 for frame in createerrorframe(
1003 1009 stream, requestid, '%s' % stringutil.forcebytestr(e),
1004 1010 errtype='server'):
1005 1011
1006 1012 yield frame
1007 1013
1008 1014 break
1009 1015
1010 1016 try:
1011 1017 # Alternate location responses can only be the first and
1012 1018 # only object in the output stream.
1013 1019 if isinstance(o, wireprototypes.alternatelocationresponse):
1014 1020 if emitted:
1015 1021 raise error.ProgrammingError(
1016 1022 'alternatelocationresponse seen after initial '
1017 1023 'output object')
1018 1024
1019 1025 yield createalternatelocationresponseframe(
1020 1026 stream, requestid, o)
1021 1027
1022 1028 alternatelocationsent = True
1023 1029 emitted = True
1024 1030 continue
1025 1031
1026 1032 if alternatelocationsent:
1027 1033 raise error.ProgrammingError(
1028 1034 'object follows alternatelocationresponse')
1029 1035
1030 1036 if not emitted:
1031 1037 yield createcommandresponseokframe(stream, requestid)
1032 1038 emitted = True
1033 1039
1034 1040 # Objects emitted by command functions can be serializable
1035 1041 # data structures or special types.
1036 1042 # TODO consider extracting the content normalization to a
1037 1043 # standalone function, as it may be useful for e.g. cachers.
1038 1044
1039 1045 # A pre-encoded object is sent directly to the emitter.
1040 1046 if isinstance(o, wireprototypes.encodedresponse):
1041 1047 for frame in emitter.send(o.data):
1042 1048 yield frame
1043 1049
1044 1050 # A regular object is CBOR encoded.
1045 1051 else:
1046 1052 for chunk in cborutil.streamencode(o):
1047 1053 for frame in emitter.send(chunk):
1048 1054 yield frame
1049 1055
1050 1056 except Exception as e:
1051 1057 for frame in createerrorframe(stream, requestid,
1052 1058 '%s' % e,
1053 1059 errtype='server'):
1054 1060 yield frame
1055 1061
1056 1062 break
1057 1063
1058 1064 self._activecommands.remove(requestid)
1059 1065
1060 1066 return self._handlesendframes(sendframes())
1061 1067
1062 1068 def oninputeof(self):
1063 1069 """Signals that end of input has been received.
1064 1070
1065 1071 No more frames will be received. All pending activity should be
1066 1072 completed.
1067 1073 """
1068 1074 # TODO should we do anything about in-flight commands?
1069 1075
1070 1076 if not self._deferoutput or not self._bufferedframegens:
1071 1077 return 'noop', {}
1072 1078
1073 1079 # If we buffered all our responses, emit those.
1074 1080 def makegen():
1075 1081 for gen in self._bufferedframegens:
1076 1082 for frame in gen:
1077 1083 yield frame
1078 1084
1079 1085 return 'sendframes', {
1080 1086 'framegen': makegen(),
1081 1087 }
1082 1088
1083 1089 def _handlesendframes(self, framegen):
1084 1090 if self._deferoutput:
1085 1091 self._bufferedframegens.append(framegen)
1086 1092 return 'noop', {}
1087 1093 else:
1088 1094 return 'sendframes', {
1089 1095 'framegen': framegen,
1090 1096 }
1091 1097
1092 1098 def onservererror(self, stream, requestid, msg):
1093 1099 ensureserverstream(stream)
1094 1100
1095 1101 def sendframes():
1096 1102 for frame in createerrorframe(stream, requestid, msg,
1097 1103 errtype='server'):
1098 1104 yield frame
1099 1105
1100 1106 self._activecommands.remove(requestid)
1101 1107
1102 1108 return self._handlesendframes(sendframes())
1103 1109
1104 1110 def oncommanderror(self, stream, requestid, message, args=None):
1105 1111 """Called when a command encountered an error before sending output."""
1106 1112 ensureserverstream(stream)
1107 1113
1108 1114 def sendframes():
1109 1115 for frame in createcommanderrorresponse(stream, requestid, message,
1110 1116 args):
1111 1117 yield frame
1112 1118
1113 1119 self._activecommands.remove(requestid)
1114 1120
1115 1121 return self._handlesendframes(sendframes())
1116 1122
1117 1123 def makeoutputstream(self):
1118 1124 """Create a stream to be used for sending data to the client."""
1119 1125 streamid = self._nextoutgoingstreamid
1120 1126 self._nextoutgoingstreamid += 2
1121 1127
1122 1128 s = outputstream(streamid)
1123 1129 self._outgoingstreams[streamid] = s
1124 1130
1125 1131 return s
1126 1132
1127 1133 def _makeerrorresult(self, msg):
1128 1134 return 'error', {
1129 1135 'message': msg,
1130 1136 }
1131 1137
1132 1138 def _makeruncommandresult(self, requestid):
1133 1139 entry = self._receivingcommands[requestid]
1134 1140
1135 1141 if not entry['requestdone']:
1136 1142 self._state = 'errored'
1137 1143 raise error.ProgrammingError('should not be called without '
1138 1144 'requestdone set')
1139 1145
1140 1146 del self._receivingcommands[requestid]
1141 1147
1142 1148 if self._receivingcommands:
1143 1149 self._state = 'command-receiving'
1144 1150 else:
1145 1151 self._state = 'idle'
1146 1152
1147 1153 # Decode the payloads as CBOR.
1148 1154 entry['payload'].seek(0)
1149 1155 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1150 1156
1151 1157 if b'name' not in request:
1152 1158 self._state = 'errored'
1153 1159 return self._makeerrorresult(
1154 1160 _('command request missing "name" field'))
1155 1161
1156 1162 if b'args' not in request:
1157 1163 request[b'args'] = {}
1158 1164
1159 1165 assert requestid not in self._activecommands
1160 1166 self._activecommands.add(requestid)
1161 1167
1162 1168 return 'runcommand', {
1163 1169 'requestid': requestid,
1164 1170 'command': request[b'name'],
1165 1171 'args': request[b'args'],
1166 1172 'redirect': request.get(b'redirect'),
1167 1173 'data': entry['data'].getvalue() if entry['data'] else None,
1168 1174 }
1169 1175
1170 1176 def _makewantframeresult(self):
1171 1177 return 'wantframe', {
1172 1178 'state': self._state,
1173 1179 }
1174 1180
1175 1181 def _validatecommandrequestframe(self, frame):
1176 1182 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1177 1183 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1178 1184
1179 1185 if new and continuation:
1180 1186 self._state = 'errored'
1181 1187 return self._makeerrorresult(
1182 1188 _('received command request frame with both new and '
1183 1189 'continuation flags set'))
1184 1190
1185 1191 if not new and not continuation:
1186 1192 self._state = 'errored'
1187 1193 return self._makeerrorresult(
1188 1194 _('received command request frame with neither new nor '
1189 1195 'continuation flags set'))
1190 1196
1191 1197 def _onframeinitial(self, frame):
1192 1198 # Called when we receive a frame when in the "initial" state.
1193 1199 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1194 1200 self._state = 'protocol-settings-receiving'
1195 1201 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1196 1202 return self._onframeprotocolsettings(frame)
1197 1203
1198 1204 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1199 1205 self._state = 'idle'
1200 1206 return self._onframeidle(frame)
1201 1207
1202 1208 else:
1203 1209 self._state = 'errored'
1204 1210 return self._makeerrorresult(
1205 1211 _('expected sender protocol settings or command request '
1206 1212 'frame; got %d') % frame.typeid)
1207 1213
1208 1214 def _onframeprotocolsettings(self, frame):
1209 1215 assert self._state == 'protocol-settings-receiving'
1210 1216 assert self._protocolsettingsdecoder is not None
1211 1217
1212 1218 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1213 1219 self._state = 'errored'
1214 1220 return self._makeerrorresult(
1215 1221 _('expected sender protocol settings frame; got %d') %
1216 1222 frame.typeid)
1217 1223
1218 1224 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1219 1225 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1220 1226
1221 1227 if more and eos:
1222 1228 self._state = 'errored'
1223 1229 return self._makeerrorresult(
1224 1230 _('sender protocol settings frame cannot have both '
1225 1231 'continuation and end of stream flags set'))
1226 1232
1227 1233 if not more and not eos:
1228 1234 self._state = 'errored'
1229 1235 return self._makeerrorresult(
1230 1236 _('sender protocol settings frame must have continuation or '
1231 1237 'end of stream flag set'))
1232 1238
1233 1239 # TODO establish limits for maximum amount of data that can be
1234 1240 # buffered.
1235 1241 try:
1236 1242 self._protocolsettingsdecoder.decode(frame.payload)
1237 1243 except Exception as e:
1238 1244 self._state = 'errored'
1239 1245 return self._makeerrorresult(
1240 1246 _('error decoding CBOR from sender protocol settings frame: %s')
1241 1247 % stringutil.forcebytestr(e))
1242 1248
1243 1249 if more:
1244 1250 return self._makewantframeresult()
1245 1251
1246 1252 assert eos
1247 1253
1248 1254 decoded = self._protocolsettingsdecoder.getavailable()
1249 1255 self._protocolsettingsdecoder = None
1250 1256
1251 1257 if not decoded:
1252 1258 self._state = 'errored'
1253 1259 return self._makeerrorresult(
1254 1260 _('sender protocol settings frame did not contain CBOR data'))
1255 1261 elif len(decoded) > 1:
1256 1262 self._state = 'errored'
1257 1263 return self._makeerrorresult(
1258 1264 _('sender protocol settings frame contained multiple CBOR '
1259 1265 'values'))
1260 1266
1261 1267 d = decoded[0]
1262 1268
1263 1269 if b'contentencodings' in d:
1264 1270 self._sendersettings['contentencodings'] = d[b'contentencodings']
1265 1271
1266 1272 self._state = 'idle'
1267 1273
1268 1274 return self._makewantframeresult()
1269 1275
1270 1276 def _onframeidle(self, frame):
1271 1277 # The only frame type that should be received in this state is a
1272 1278 # command request.
1273 1279 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1274 1280 self._state = 'errored'
1275 1281 return self._makeerrorresult(
1276 1282 _('expected command request frame; got %d') % frame.typeid)
1277 1283
1278 1284 res = self._validatecommandrequestframe(frame)
1279 1285 if res:
1280 1286 return res
1281 1287
1282 1288 if frame.requestid in self._receivingcommands:
1283 1289 self._state = 'errored'
1284 1290 return self._makeerrorresult(
1285 1291 _('request with ID %d already received') % frame.requestid)
1286 1292
1287 1293 if frame.requestid in self._activecommands:
1288 1294 self._state = 'errored'
1289 1295 return self._makeerrorresult(
1290 1296 _('request with ID %d is already active') % frame.requestid)
1291 1297
1292 1298 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1293 1299 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1294 1300 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1295 1301
1296 1302 if not new:
1297 1303 self._state = 'errored'
1298 1304 return self._makeerrorresult(
1299 1305 _('received command request frame without new flag set'))
1300 1306
1301 1307 payload = util.bytesio()
1302 1308 payload.write(frame.payload)
1303 1309
1304 1310 self._receivingcommands[frame.requestid] = {
1305 1311 'payload': payload,
1306 1312 'data': None,
1307 1313 'requestdone': not moreframes,
1308 1314 'expectingdata': bool(expectingdata),
1309 1315 }
1310 1316
1311 1317 # This is the final frame for this request. Dispatch it.
1312 1318 if not moreframes and not expectingdata:
1313 1319 return self._makeruncommandresult(frame.requestid)
1314 1320
1315 1321 assert moreframes or expectingdata
1316 1322 self._state = 'command-receiving'
1317 1323 return self._makewantframeresult()
1318 1324
1319 1325 def _onframecommandreceiving(self, frame):
1320 1326 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1321 1327 # Process new command requests as such.
1322 1328 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1323 1329 return self._onframeidle(frame)
1324 1330
1325 1331 res = self._validatecommandrequestframe(frame)
1326 1332 if res:
1327 1333 return res
1328 1334
1329 1335 # All other frames should be related to a command that is currently
1330 1336 # receiving but is not active.
1331 1337 if frame.requestid in self._activecommands:
1332 1338 self._state = 'errored'
1333 1339 return self._makeerrorresult(
1334 1340 _('received frame for request that is still active: %d') %
1335 1341 frame.requestid)
1336 1342
1337 1343 if frame.requestid not in self._receivingcommands:
1338 1344 self._state = 'errored'
1339 1345 return self._makeerrorresult(
1340 1346 _('received frame for request that is not receiving: %d') %
1341 1347 frame.requestid)
1342 1348
1343 1349 entry = self._receivingcommands[frame.requestid]
1344 1350
1345 1351 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1346 1352 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1347 1353 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1348 1354
1349 1355 if entry['requestdone']:
1350 1356 self._state = 'errored'
1351 1357 return self._makeerrorresult(
1352 1358 _('received command request frame when request frames '
1353 1359 'were supposedly done'))
1354 1360
1355 1361 if expectingdata != entry['expectingdata']:
1356 1362 self._state = 'errored'
1357 1363 return self._makeerrorresult(
1358 1364 _('mismatch between expect data flag and previous frame'))
1359 1365
1360 1366 entry['payload'].write(frame.payload)
1361 1367
1362 1368 if not moreframes:
1363 1369 entry['requestdone'] = True
1364 1370
1365 1371 if not moreframes and not expectingdata:
1366 1372 return self._makeruncommandresult(frame.requestid)
1367 1373
1368 1374 return self._makewantframeresult()
1369 1375
1370 1376 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1371 1377 if not entry['expectingdata']:
1372 1378 self._state = 'errored'
1373 1379 return self._makeerrorresult(_(
1374 1380 'received command data frame for request that is not '
1375 1381 'expecting data: %d') % frame.requestid)
1376 1382
1377 1383 if entry['data'] is None:
1378 1384 entry['data'] = util.bytesio()
1379 1385
1380 1386 return self._handlecommanddataframe(frame, entry)
1381 1387 else:
1382 1388 self._state = 'errored'
1383 1389 return self._makeerrorresult(_(
1384 1390 'received unexpected frame type: %d') % frame.typeid)
1385 1391
1386 1392 def _handlecommanddataframe(self, frame, entry):
1387 1393 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1388 1394
1389 1395 # TODO support streaming data instead of buffering it.
1390 1396 entry['data'].write(frame.payload)
1391 1397
1392 1398 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1393 1399 return self._makewantframeresult()
1394 1400 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1395 1401 entry['data'].seek(0)
1396 1402 return self._makeruncommandresult(frame.requestid)
1397 1403 else:
1398 1404 self._state = 'errored'
1399 1405 return self._makeerrorresult(_('command data frame without '
1400 1406 'flags'))
1401 1407
1402 1408 def _onframeerrored(self, frame):
1403 1409 return self._makeerrorresult(_('server already errored'))
1404 1410
1405 1411 class commandrequest(object):
1406 1412 """Represents a request to run a command."""
1407 1413
1408 1414 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1409 1415 self.requestid = requestid
1410 1416 self.name = name
1411 1417 self.args = args
1412 1418 self.datafh = datafh
1413 1419 self.redirect = redirect
1414 1420 self.state = 'pending'
1415 1421
1416 1422 class clientreactor(object):
1417 1423 """Holds state of a client issuing frame-based protocol requests.
1418 1424
1419 1425 This is like ``serverreactor`` but for client-side state.
1420 1426
1421 1427 Each instance is bound to the lifetime of a connection. For persistent
1422 1428 connection transports using e.g. TCP sockets and speaking the raw
1423 1429 framing protocol, there will be a single instance for the lifetime of
1424 1430 the TCP socket. For transports where there are multiple discrete
1425 1431 interactions (say tunneled within in HTTP request), there will be a
1426 1432 separate instance for each distinct interaction.
1427 1433
1428 1434 Consumers are expected to tell instances when events occur by calling
1429 1435 various methods. These methods return a 2-tuple describing any follow-up
1430 1436 action(s) to take. The first element is the name of an action to
1431 1437 perform. The second is a data structure (usually a dict) specific to
1432 1438 that action that contains more information. e.g. if the reactor wants
1433 1439 to send frames to the server, the data structure will contain a reference
1434 1440 to those frames.
1435 1441
1436 1442 Valid actions that consumers can be instructed to take are:
1437 1443
1438 1444 noop
1439 1445 Indicates no additional action is required.
1440 1446
1441 1447 sendframes
1442 1448 Indicates that frames should be sent to the server. The ``framegen``
1443 1449 key contains a generator of frames that should be sent. The reactor
1444 1450 assumes that all frames in this generator are sent to the server.
1445 1451
1446 1452 error
1447 1453 Indicates that an error occurred. The ``message`` key contains an
1448 1454 error message describing the failure.
1449 1455
1450 1456 responsedata
1451 1457 Indicates a response to a previously-issued command was received.
1452 1458
1453 1459 The ``request`` key contains the ``commandrequest`` instance that
1454 1460 represents the request this data is for.
1455 1461
1456 1462 The ``data`` key contains the decoded data from the server.
1457 1463
1458 1464 ``expectmore`` and ``eos`` evaluate to True when more response data
1459 1465 is expected to follow or we're at the end of the response stream,
1460 1466 respectively.
1461 1467 """
1462 1468 def __init__(self, ui, hasmultiplesend=False, buffersends=True,
1463 1469 clientcontentencoders=None):
1464 1470 """Create a new instance.
1465 1471
1466 1472 ``hasmultiplesend`` indicates whether multiple sends are supported
1467 1473 by the transport. When True, it is possible to send commands immediately
1468 1474 instead of buffering until the caller signals an intent to finish a
1469 1475 send operation.
1470 1476
1471 1477 ``buffercommands`` indicates whether sends should be buffered until the
1472 1478 last request has been issued.
1473 1479
1474 1480 ``clientcontentencoders`` is an iterable of content encoders the client
1475 1481 will advertise to the server and that the server can use for encoding
1476 1482 data. If not defined, the client will not advertise content encoders
1477 1483 to the server.
1478 1484 """
1479 1485 self._ui = ui
1480 1486 self._hasmultiplesend = hasmultiplesend
1481 1487 self._buffersends = buffersends
1482 1488 self._clientcontentencoders = clientcontentencoders
1483 1489
1484 1490 self._canissuecommands = True
1485 1491 self._cansend = True
1486 1492 self._protocolsettingssent = False
1487 1493
1488 1494 self._nextrequestid = 1
1489 1495 # We only support a single outgoing stream for now.
1490 1496 self._outgoingstream = outputstream(1)
1491 1497 self._pendingrequests = collections.deque()
1492 1498 self._activerequests = {}
1493 1499 self._incomingstreams = {}
1494 1500 self._streamsettingsdecoders = {}
1495 1501
1496 1502 populatestreamencoders()
1497 1503
1498 1504 def callcommand(self, name, args, datafh=None, redirect=None):
1499 1505 """Request that a command be executed.
1500 1506
1501 1507 Receives the command name, a dict of arguments to pass to the command,
1502 1508 and an optional file object containing the raw data for the command.
1503 1509
1504 1510 Returns a 3-tuple of (request, action, action data).
1505 1511 """
1506 1512 if not self._canissuecommands:
1507 1513 raise error.ProgrammingError('cannot issue new commands')
1508 1514
1509 1515 requestid = self._nextrequestid
1510 1516 self._nextrequestid += 2
1511 1517
1512 1518 request = commandrequest(requestid, name, args, datafh=datafh,
1513 1519 redirect=redirect)
1514 1520
1515 1521 if self._buffersends:
1516 1522 self._pendingrequests.append(request)
1517 1523 return request, 'noop', {}
1518 1524 else:
1519 1525 if not self._cansend:
1520 1526 raise error.ProgrammingError('sends cannot be performed on '
1521 1527 'this instance')
1522 1528
1523 1529 if not self._hasmultiplesend:
1524 1530 self._cansend = False
1525 1531 self._canissuecommands = False
1526 1532
1527 1533 return request, 'sendframes', {
1528 1534 'framegen': self._makecommandframes(request),
1529 1535 }
1530 1536
1531 1537 def flushcommands(self):
1532 1538 """Request that all queued commands be sent.
1533 1539
1534 1540 If any commands are buffered, this will instruct the caller to send
1535 1541 them over the wire. If no commands are buffered it instructs the client
1536 1542 to no-op.
1537 1543
1538 1544 If instances aren't configured for multiple sends, no new command
1539 1545 requests are allowed after this is called.
1540 1546 """
1541 1547 if not self._pendingrequests:
1542 1548 return 'noop', {}
1543 1549
1544 1550 if not self._cansend:
1545 1551 raise error.ProgrammingError('sends cannot be performed on this '
1546 1552 'instance')
1547 1553
1548 1554 # If the instance only allows sending once, mark that we have fired
1549 1555 # our one shot.
1550 1556 if not self._hasmultiplesend:
1551 1557 self._canissuecommands = False
1552 1558 self._cansend = False
1553 1559
1554 1560 def makeframes():
1555 1561 while self._pendingrequests:
1556 1562 request = self._pendingrequests.popleft()
1557 1563 for frame in self._makecommandframes(request):
1558 1564 yield frame
1559 1565
1560 1566 return 'sendframes', {
1561 1567 'framegen': makeframes(),
1562 1568 }
1563 1569
1564 1570 def _makecommandframes(self, request):
1565 1571 """Emit frames to issue a command request.
1566 1572
1567 1573 As a side-effect, update request accounting to reflect its changed
1568 1574 state.
1569 1575 """
1570 1576 self._activerequests[request.requestid] = request
1571 1577 request.state = 'sending'
1572 1578
1573 1579 if not self._protocolsettingssent and self._clientcontentencoders:
1574 1580 self._protocolsettingssent = True
1575 1581
1576 1582 payload = b''.join(cborutil.streamencode({
1577 1583 b'contentencodings': self._clientcontentencoders,
1578 1584 }))
1579 1585
1580 1586 yield self._outgoingstream.makeframe(
1581 1587 requestid=request.requestid,
1582 1588 typeid=FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
1583 1589 flags=FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
1584 1590 payload=payload)
1585 1591
1586 1592 res = createcommandframes(self._outgoingstream,
1587 1593 request.requestid,
1588 1594 request.name,
1589 1595 request.args,
1590 1596 datafh=request.datafh,
1591 1597 redirect=request.redirect)
1592 1598
1593 1599 for frame in res:
1594 1600 yield frame
1595 1601
1596 1602 request.state = 'sent'
1597 1603
1598 1604 def onframerecv(self, frame):
1599 1605 """Process a frame that has been received off the wire.
1600 1606
1601 1607 Returns a 2-tuple of (action, meta) describing further action the
1602 1608 caller needs to take as a result of receiving this frame.
1603 1609 """
1604 1610 if frame.streamid % 2:
1605 1611 return 'error', {
1606 1612 'message': (
1607 1613 _('received frame with odd numbered stream ID: %d') %
1608 1614 frame.streamid),
1609 1615 }
1610 1616
1611 1617 if frame.streamid not in self._incomingstreams:
1612 1618 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1613 1619 return 'error', {
1614 1620 'message': _('received frame on unknown stream '
1615 1621 'without beginning of stream flag set'),
1616 1622 }
1617 1623
1618 1624 self._incomingstreams[frame.streamid] = inputstream(
1619 1625 frame.streamid)
1620 1626
1621 1627 stream = self._incomingstreams[frame.streamid]
1622 1628
1623 1629 # If the payload is encoded, ask the stream to decode it. We
1624 1630 # merely substitute the decoded result into the frame payload as
1625 1631 # if it had been transferred all along.
1626 1632 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1627 1633 frame.payload = stream.decode(frame.payload)
1628 1634
1629 1635 if frame.streamflags & STREAM_FLAG_END_STREAM:
1630 1636 del self._incomingstreams[frame.streamid]
1631 1637
1632 1638 if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
1633 1639 return self._onstreamsettingsframe(frame)
1634 1640
1635 1641 if frame.requestid not in self._activerequests:
1636 1642 return 'error', {
1637 1643 'message': (_('received frame for inactive request ID: %d') %
1638 1644 frame.requestid),
1639 1645 }
1640 1646
1641 1647 request = self._activerequests[frame.requestid]
1642 1648 request.state = 'receiving'
1643 1649
1644 1650 handlers = {
1645 1651 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1646 1652 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1647 1653 }
1648 1654
1649 1655 meth = handlers.get(frame.typeid)
1650 1656 if not meth:
1651 1657 raise error.ProgrammingError('unhandled frame type: %d' %
1652 1658 frame.typeid)
1653 1659
1654 1660 return meth(request, frame)
1655 1661
1656 1662 def _onstreamsettingsframe(self, frame):
1657 1663 assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS
1658 1664
1659 1665 more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
1660 1666 eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
1661 1667
1662 1668 if more and eos:
1663 1669 return 'error', {
1664 1670 'message': (_('stream encoding settings frame cannot have both '
1665 1671 'continuation and end of stream flags set')),
1666 1672 }
1667 1673
1668 1674 if not more and not eos:
1669 1675 return 'error', {
1670 1676 'message': _('stream encoding settings frame must have '
1671 1677 'continuation or end of stream flag set'),
1672 1678 }
1673 1679
1674 1680 if frame.streamid not in self._streamsettingsdecoders:
1675 1681 decoder = cborutil.bufferingdecoder()
1676 1682 self._streamsettingsdecoders[frame.streamid] = decoder
1677 1683
1678 1684 decoder = self._streamsettingsdecoders[frame.streamid]
1679 1685
1680 1686 try:
1681 1687 decoder.decode(frame.payload)
1682 1688 except Exception as e:
1683 1689 return 'error', {
1684 1690 'message': (_('error decoding CBOR from stream encoding '
1685 1691 'settings frame: %s') %
1686 1692 stringutil.forcebytestr(e)),
1687 1693 }
1688 1694
1689 1695 if more:
1690 1696 return 'noop', {}
1691 1697
1692 1698 assert eos
1693 1699
1694 1700 decoded = decoder.getavailable()
1695 1701 del self._streamsettingsdecoders[frame.streamid]
1696 1702
1697 1703 if not decoded:
1698 1704 return 'error', {
1699 1705 'message': _('stream encoding settings frame did not contain '
1700 1706 'CBOR data'),
1701 1707 }
1702 1708
1703 1709 try:
1704 1710 self._incomingstreams[frame.streamid].setdecoder(self._ui,
1705 1711 decoded[0],
1706 1712 decoded[1:])
1707 1713 except Exception as e:
1708 1714 return 'error', {
1709 1715 'message': (_('error setting stream decoder: %s') %
1710 1716 stringutil.forcebytestr(e)),
1711 1717 }
1712 1718
1713 1719 return 'noop', {}
1714 1720
1715 1721 def _oncommandresponseframe(self, request, frame):
1716 1722 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1717 1723 request.state = 'received'
1718 1724 del self._activerequests[request.requestid]
1719 1725
1720 1726 return 'responsedata', {
1721 1727 'request': request,
1722 1728 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1723 1729 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1724 1730 'data': frame.payload,
1725 1731 }
1726 1732
1727 1733 def _onerrorresponseframe(self, request, frame):
1728 1734 request.state = 'errored'
1729 1735 del self._activerequests[request.requestid]
1730 1736
1731 1737 # The payload should be a CBOR map.
1732 1738 m = cborutil.decodeall(frame.payload)[0]
1733 1739
1734 1740 return 'error', {
1735 1741 'request': request,
1736 1742 'type': m['type'],
1737 1743 'message': m['message'],
1738 1744 }
@@ -1,629 +1,628 b''
1 1 from __future__ import absolute_import, print_function
2 2
3 3 import unittest
4 4
5 5 from mercurial.thirdparty import (
6 6 cbor,
7 7 )
8 8 from mercurial import (
9 9 ui as uimod,
10 10 util,
11 11 wireprotoframing as framing,
12 12 )
13 13 from mercurial.utils import (
14 14 cborutil,
15 15 )
16 16
17 17 ffs = framing.makeframefromhumanstring
18 18
19 19 OK = cbor.dumps({b'status': b'ok'})
20 20
21 21 def makereactor(deferoutput=False):
22 22 ui = uimod.ui()
23 23 return framing.serverreactor(ui, deferoutput=deferoutput)
24 24
25 25 def sendframes(reactor, gen):
26 26 """Send a generator of frame bytearray to a reactor.
27 27
28 28 Emits a generator of results from ``onframerecv()`` calls.
29 29 """
30 30 for frame in gen:
31 31 header = framing.parseheader(frame)
32 32 payload = frame[framing.FRAME_HEADER_SIZE:]
33 33 assert len(payload) == header.length
34 34
35 35 yield reactor.onframerecv(framing.frame(header.requestid,
36 36 header.streamid,
37 37 header.streamflags,
38 38 header.typeid,
39 39 header.flags,
40 40 payload))
41 41
42 42 def sendcommandframes(reactor, stream, rid, cmd, args, datafh=None):
43 43 """Generate frames to run a command and send them to a reactor."""
44 44 return sendframes(reactor,
45 45 framing.createcommandframes(stream, rid, cmd, args,
46 46 datafh))
47 47
48 48
49 49 class ServerReactorTests(unittest.TestCase):
50 50 def _sendsingleframe(self, reactor, f):
51 51 results = list(sendframes(reactor, [f]))
52 52 self.assertEqual(len(results), 1)
53 53
54 54 return results[0]
55 55
56 56 def assertaction(self, res, expected):
57 57 self.assertIsInstance(res, tuple)
58 58 self.assertEqual(len(res), 2)
59 59 self.assertIsInstance(res[1], dict)
60 60 self.assertEqual(res[0], expected)
61 61
62 62 def assertframesequal(self, frames, framestrings):
63 63 expected = [ffs(s) for s in framestrings]
64 64 self.assertEqual(list(frames), expected)
65 65
66 66 def test1framecommand(self):
67 67 """Receiving a command in a single frame yields request to run it."""
68 68 reactor = makereactor()
69 69 stream = framing.stream(1)
70 70 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
71 71 self.assertEqual(len(results), 1)
72 72 self.assertaction(results[0], b'runcommand')
73 73 self.assertEqual(results[0][1], {
74 74 b'requestid': 1,
75 75 b'command': b'mycommand',
76 76 b'args': {},
77 77 b'redirect': None,
78 78 b'data': None,
79 79 })
80 80
81 81 result = reactor.oninputeof()
82 82 self.assertaction(result, b'noop')
83 83
84 84 def test1argument(self):
85 85 reactor = makereactor()
86 86 stream = framing.stream(1)
87 87 results = list(sendcommandframes(reactor, stream, 41, b'mycommand',
88 88 {b'foo': b'bar'}))
89 89 self.assertEqual(len(results), 1)
90 90 self.assertaction(results[0], b'runcommand')
91 91 self.assertEqual(results[0][1], {
92 92 b'requestid': 41,
93 93 b'command': b'mycommand',
94 94 b'args': {b'foo': b'bar'},
95 95 b'redirect': None,
96 96 b'data': None,
97 97 })
98 98
99 99 def testmultiarguments(self):
100 100 reactor = makereactor()
101 101 stream = framing.stream(1)
102 102 results = list(sendcommandframes(reactor, stream, 1, b'mycommand',
103 103 {b'foo': b'bar', b'biz': b'baz'}))
104 104 self.assertEqual(len(results), 1)
105 105 self.assertaction(results[0], b'runcommand')
106 106 self.assertEqual(results[0][1], {
107 107 b'requestid': 1,
108 108 b'command': b'mycommand',
109 109 b'args': {b'foo': b'bar', b'biz': b'baz'},
110 110 b'redirect': None,
111 111 b'data': None,
112 112 })
113 113
114 114 def testsimplecommanddata(self):
115 115 reactor = makereactor()
116 116 stream = framing.stream(1)
117 117 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {},
118 118 util.bytesio(b'data!')))
119 119 self.assertEqual(len(results), 2)
120 120 self.assertaction(results[0], b'wantframe')
121 121 self.assertaction(results[1], b'runcommand')
122 122 self.assertEqual(results[1][1], {
123 123 b'requestid': 1,
124 124 b'command': b'mycommand',
125 125 b'args': {},
126 126 b'redirect': None,
127 127 b'data': b'data!',
128 128 })
129 129
130 130 def testmultipledataframes(self):
131 131 frames = [
132 132 ffs(b'1 1 stream-begin command-request new|have-data '
133 133 b"cbor:{b'name': b'mycommand'}"),
134 134 ffs(b'1 1 0 command-data continuation data1'),
135 135 ffs(b'1 1 0 command-data continuation data2'),
136 136 ffs(b'1 1 0 command-data eos data3'),
137 137 ]
138 138
139 139 reactor = makereactor()
140 140 results = list(sendframes(reactor, frames))
141 141 self.assertEqual(len(results), 4)
142 142 for i in range(3):
143 143 self.assertaction(results[i], b'wantframe')
144 144 self.assertaction(results[3], b'runcommand')
145 145 self.assertEqual(results[3][1], {
146 146 b'requestid': 1,
147 147 b'command': b'mycommand',
148 148 b'args': {},
149 149 b'redirect': None,
150 150 b'data': b'data1data2data3',
151 151 })
152 152
153 153 def testargumentanddata(self):
154 154 frames = [
155 155 ffs(b'1 1 stream-begin command-request new|have-data '
156 156 b"cbor:{b'name': b'command', b'args': {b'key': b'val',"
157 157 b"b'foo': b'bar'}}"),
158 158 ffs(b'1 1 0 command-data continuation value1'),
159 159 ffs(b'1 1 0 command-data eos value2'),
160 160 ]
161 161
162 162 reactor = makereactor()
163 163 results = list(sendframes(reactor, frames))
164 164
165 165 self.assertaction(results[-1], b'runcommand')
166 166 self.assertEqual(results[-1][1], {
167 167 b'requestid': 1,
168 168 b'command': b'command',
169 169 b'args': {
170 170 b'key': b'val',
171 171 b'foo': b'bar',
172 172 },
173 173 b'redirect': None,
174 174 b'data': b'value1value2',
175 175 })
176 176
177 177 def testnewandcontinuation(self):
178 178 result = self._sendsingleframe(makereactor(),
179 179 ffs(b'1 1 stream-begin command-request new|continuation '))
180 180 self.assertaction(result, b'error')
181 181 self.assertEqual(result[1], {
182 182 b'message': b'received command request frame with both new and '
183 183 b'continuation flags set',
184 184 })
185 185
186 186 def testneithernewnorcontinuation(self):
187 187 result = self._sendsingleframe(makereactor(),
188 188 ffs(b'1 1 stream-begin command-request 0 '))
189 189 self.assertaction(result, b'error')
190 190 self.assertEqual(result[1], {
191 191 b'message': b'received command request frame with neither new nor '
192 192 b'continuation flags set',
193 193 })
194 194
195 195 def testunexpectedcommanddata(self):
196 196 """Command data frame when not running a command is an error."""
197 197 result = self._sendsingleframe(makereactor(),
198 198 ffs(b'1 1 stream-begin command-data 0 ignored'))
199 199 self.assertaction(result, b'error')
200 200 self.assertEqual(result[1], {
201 201 b'message': b'expected sender protocol settings or command request '
202 202 b'frame; got 2',
203 203 })
204 204
205 205 def testunexpectedcommanddatareceiving(self):
206 206 """Same as above except the command is receiving."""
207 207 results = list(sendframes(makereactor(), [
208 208 ffs(b'1 1 stream-begin command-request new|more '
209 209 b"cbor:{b'name': b'ignored'}"),
210 210 ffs(b'1 1 0 command-data eos ignored'),
211 211 ]))
212 212
213 213 self.assertaction(results[0], b'wantframe')
214 214 self.assertaction(results[1], b'error')
215 215 self.assertEqual(results[1][1], {
216 216 b'message': b'received command data frame for request that is not '
217 217 b'expecting data: 1',
218 218 })
219 219
220 220 def testconflictingrequestidallowed(self):
221 221 """Multiple fully serviced commands with same request ID is allowed."""
222 222 reactor = makereactor()
223 223 results = []
224 224 outstream = reactor.makeoutputstream()
225 225 results.append(self._sendsingleframe(
226 226 reactor, ffs(b'1 1 stream-begin command-request new '
227 227 b"cbor:{b'name': b'command'}")))
228 228 result = reactor.oncommandresponsereadyobjects(
229 229 outstream, 1, [b'response1'])
230 230 self.assertaction(result, b'sendframes')
231 231 list(result[1][b'framegen'])
232 232 results.append(self._sendsingleframe(
233 233 reactor, ffs(b'1 1 stream-begin command-request new '
234 234 b"cbor:{b'name': b'command'}")))
235 235 result = reactor.oncommandresponsereadyobjects(
236 236 outstream, 1, [b'response2'])
237 237 self.assertaction(result, b'sendframes')
238 238 list(result[1][b'framegen'])
239 239 results.append(self._sendsingleframe(
240 240 reactor, ffs(b'1 1 stream-begin command-request new '
241 241 b"cbor:{b'name': b'command'}")))
242 242 result = reactor.oncommandresponsereadyobjects(
243 243 outstream, 1, [b'response3'])
244 244 self.assertaction(result, b'sendframes')
245 245 list(result[1][b'framegen'])
246 246
247 247 for i in range(3):
248 248 self.assertaction(results[i], b'runcommand')
249 249 self.assertEqual(results[i][1], {
250 250 b'requestid': 1,
251 251 b'command': b'command',
252 252 b'args': {},
253 253 b'redirect': None,
254 254 b'data': None,
255 255 })
256 256
257 257 def testconflictingrequestid(self):
258 258 """Request ID for new command matching in-flight command is illegal."""
259 259 results = list(sendframes(makereactor(), [
260 260 ffs(b'1 1 stream-begin command-request new|more '
261 261 b"cbor:{b'name': b'command'}"),
262 262 ffs(b'1 1 0 command-request new '
263 263 b"cbor:{b'name': b'command1'}"),
264 264 ]))
265 265
266 266 self.assertaction(results[0], b'wantframe')
267 267 self.assertaction(results[1], b'error')
268 268 self.assertEqual(results[1][1], {
269 269 b'message': b'request with ID 1 already received',
270 270 })
271 271
272 272 def testinterleavedcommands(self):
273 273 cbor1 = cbor.dumps({
274 274 b'name': b'command1',
275 275 b'args': {
276 276 b'foo': b'bar',
277 277 b'key1': b'val',
278 278 }
279 279 }, canonical=True)
280 280 cbor3 = cbor.dumps({
281 281 b'name': b'command3',
282 282 b'args': {
283 283 b'biz': b'baz',
284 284 b'key': b'val',
285 285 },
286 286 }, canonical=True)
287 287
288 288 results = list(sendframes(makereactor(), [
289 289 ffs(b'1 1 stream-begin command-request new|more %s' % cbor1[0:6]),
290 290 ffs(b'3 1 0 command-request new|more %s' % cbor3[0:10]),
291 291 ffs(b'1 1 0 command-request continuation|more %s' % cbor1[6:9]),
292 292 ffs(b'3 1 0 command-request continuation|more %s' % cbor3[10:13]),
293 293 ffs(b'3 1 0 command-request continuation %s' % cbor3[13:]),
294 294 ffs(b'1 1 0 command-request continuation %s' % cbor1[9:]),
295 295 ]))
296 296
297 297 self.assertEqual([t[0] for t in results], [
298 298 b'wantframe',
299 299 b'wantframe',
300 300 b'wantframe',
301 301 b'wantframe',
302 302 b'runcommand',
303 303 b'runcommand',
304 304 ])
305 305
306 306 self.assertEqual(results[4][1], {
307 307 b'requestid': 3,
308 308 b'command': b'command3',
309 309 b'args': {b'biz': b'baz', b'key': b'val'},
310 310 b'redirect': None,
311 311 b'data': None,
312 312 })
313 313 self.assertEqual(results[5][1], {
314 314 b'requestid': 1,
315 315 b'command': b'command1',
316 316 b'args': {b'foo': b'bar', b'key1': b'val'},
317 317 b'redirect': None,
318 318 b'data': None,
319 319 })
320 320
321 321 def testmissingcommanddataframe(self):
322 322 # The reactor doesn't currently handle partially received commands.
323 323 # So this test is failing to do anything with request 1.
324 324 frames = [
325 325 ffs(b'1 1 stream-begin command-request new|have-data '
326 326 b"cbor:{b'name': b'command1'}"),
327 327 ffs(b'3 1 0 command-request new '
328 328 b"cbor:{b'name': b'command2'}"),
329 329 ]
330 330 results = list(sendframes(makereactor(), frames))
331 331 self.assertEqual(len(results), 2)
332 332 self.assertaction(results[0], b'wantframe')
333 333 self.assertaction(results[1], b'runcommand')
334 334
335 335 def testmissingcommanddataframeflags(self):
336 336 frames = [
337 337 ffs(b'1 1 stream-begin command-request new|have-data '
338 338 b"cbor:{b'name': b'command1'}"),
339 339 ffs(b'1 1 0 command-data 0 data'),
340 340 ]
341 341 results = list(sendframes(makereactor(), frames))
342 342 self.assertEqual(len(results), 2)
343 343 self.assertaction(results[0], b'wantframe')
344 344 self.assertaction(results[1], b'error')
345 345 self.assertEqual(results[1][1], {
346 346 b'message': b'command data frame without flags',
347 347 })
348 348
349 349 def testframefornonreceivingrequest(self):
350 350 """Receiving a frame for a command that is not receiving is illegal."""
351 351 results = list(sendframes(makereactor(), [
352 352 ffs(b'1 1 stream-begin command-request new '
353 353 b"cbor:{b'name': b'command1'}"),
354 354 ffs(b'3 1 0 command-request new|have-data '
355 355 b"cbor:{b'name': b'command3'}"),
356 356 ffs(b'5 1 0 command-data eos ignored'),
357 357 ]))
358 358 self.assertaction(results[2], b'error')
359 359 self.assertEqual(results[2][1], {
360 360 b'message': b'received frame for request that is not receiving: 5',
361 361 })
362 362
363 363 def testsimpleresponse(self):
364 364 """Bytes response to command sends result frames."""
365 365 reactor = makereactor()
366 366 instream = framing.stream(1)
367 367 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
368 368
369 369 outstream = reactor.makeoutputstream()
370 370 result = reactor.oncommandresponsereadyobjects(
371 371 outstream, 1, [b'response'])
372 372 self.assertaction(result, b'sendframes')
373 373 self.assertframesequal(result[1][b'framegen'], [
374 374 b'1 2 stream-begin command-response continuation %s' % OK,
375 375 b'1 2 0 command-response continuation cbor:b"response"',
376 376 b'1 2 0 command-response eos ',
377 377 ])
378 378
379 379 def testmultiframeresponse(self):
380 380 """Bytes response spanning multiple frames is handled."""
381 381 first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
382 382 second = b'y' * 100
383 383
384 384 reactor = makereactor()
385 385 instream = framing.stream(1)
386 386 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
387 387
388 388 outstream = reactor.makeoutputstream()
389 389 result = reactor.oncommandresponsereadyobjects(
390 390 outstream, 1, [first + second])
391 391 self.assertaction(result, b'sendframes')
392 392 self.assertframesequal(result[1][b'framegen'], [
393 393 b'1 2 stream-begin command-response continuation %s' % OK,
394 394 b'1 2 0 command-response continuation Y\x80d',
395 395 b'1 2 0 command-response continuation %s' % first,
396 396 b'1 2 0 command-response continuation %s' % second,
397 b'1 2 0 command-response continuation ',
398 397 b'1 2 0 command-response eos '
399 398 ])
400 399
401 400 def testservererror(self):
402 401 reactor = makereactor()
403 402 instream = framing.stream(1)
404 403 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
405 404
406 405 outstream = reactor.makeoutputstream()
407 406 result = reactor.onservererror(outstream, 1, b'some message')
408 407 self.assertaction(result, b'sendframes')
409 408 self.assertframesequal(result[1][b'framegen'], [
410 409 b"1 2 stream-begin error-response 0 "
411 410 b"cbor:{b'type': b'server', "
412 411 b"b'message': [{b'msg': b'some message'}]}",
413 412 ])
414 413
415 414 def test1commanddeferresponse(self):
416 415 """Responses when in deferred output mode are delayed until EOF."""
417 416 reactor = makereactor(deferoutput=True)
418 417 instream = framing.stream(1)
419 418 results = list(sendcommandframes(reactor, instream, 1, b'mycommand',
420 419 {}))
421 420 self.assertEqual(len(results), 1)
422 421 self.assertaction(results[0], b'runcommand')
423 422
424 423 outstream = reactor.makeoutputstream()
425 424 result = reactor.oncommandresponsereadyobjects(
426 425 outstream, 1, [b'response'])
427 426 self.assertaction(result, b'noop')
428 427 result = reactor.oninputeof()
429 428 self.assertaction(result, b'sendframes')
430 429 self.assertframesequal(result[1][b'framegen'], [
431 430 b'1 2 stream-begin command-response continuation %s' % OK,
432 431 b'1 2 0 command-response continuation cbor:b"response"',
433 432 b'1 2 0 command-response eos ',
434 433 ])
435 434
436 435 def testmultiplecommanddeferresponse(self):
437 436 reactor = makereactor(deferoutput=True)
438 437 instream = framing.stream(1)
439 438 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
440 439 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
441 440
442 441 outstream = reactor.makeoutputstream()
443 442 result = reactor.oncommandresponsereadyobjects(
444 443 outstream, 1, [b'response1'])
445 444 self.assertaction(result, b'noop')
446 445 result = reactor.oncommandresponsereadyobjects(
447 446 outstream, 3, [b'response2'])
448 447 self.assertaction(result, b'noop')
449 448 result = reactor.oninputeof()
450 449 self.assertaction(result, b'sendframes')
451 450 self.assertframesequal(result[1][b'framegen'], [
452 451 b'1 2 stream-begin command-response continuation %s' % OK,
453 452 b'1 2 0 command-response continuation cbor:b"response1"',
454 453 b'1 2 0 command-response eos ',
455 454 b'3 2 0 command-response continuation %s' % OK,
456 455 b'3 2 0 command-response continuation cbor:b"response2"',
457 456 b'3 2 0 command-response eos ',
458 457 ])
459 458
460 459 def testrequestidtracking(self):
461 460 reactor = makereactor(deferoutput=True)
462 461 instream = framing.stream(1)
463 462 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
464 463 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
465 464 list(sendcommandframes(reactor, instream, 5, b'command3', {}))
466 465
467 466 # Register results for commands out of order.
468 467 outstream = reactor.makeoutputstream()
469 468 reactor.oncommandresponsereadyobjects(outstream, 3, [b'response3'])
470 469 reactor.oncommandresponsereadyobjects(outstream, 1, [b'response1'])
471 470 reactor.oncommandresponsereadyobjects(outstream, 5, [b'response5'])
472 471
473 472 result = reactor.oninputeof()
474 473 self.assertaction(result, b'sendframes')
475 474 self.assertframesequal(result[1][b'framegen'], [
476 475 b'3 2 stream-begin command-response continuation %s' % OK,
477 476 b'3 2 0 command-response continuation cbor:b"response3"',
478 477 b'3 2 0 command-response eos ',
479 478 b'1 2 0 command-response continuation %s' % OK,
480 479 b'1 2 0 command-response continuation cbor:b"response1"',
481 480 b'1 2 0 command-response eos ',
482 481 b'5 2 0 command-response continuation %s' % OK,
483 482 b'5 2 0 command-response continuation cbor:b"response5"',
484 483 b'5 2 0 command-response eos ',
485 484 ])
486 485
487 486 def testduplicaterequestonactivecommand(self):
488 487 """Receiving a request ID that matches a request that isn't finished."""
489 488 reactor = makereactor()
490 489 stream = framing.stream(1)
491 490 list(sendcommandframes(reactor, stream, 1, b'command1', {}))
492 491 results = list(sendcommandframes(reactor, stream, 1, b'command1', {}))
493 492
494 493 self.assertaction(results[0], b'error')
495 494 self.assertEqual(results[0][1], {
496 495 b'message': b'request with ID 1 is already active',
497 496 })
498 497
499 498 def testduplicaterequestonactivecommandnosend(self):
500 499 """Same as above but we've registered a response but haven't sent it."""
501 500 reactor = makereactor()
502 501 instream = framing.stream(1)
503 502 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
504 503 outstream = reactor.makeoutputstream()
505 504 reactor.oncommandresponsereadyobjects(outstream, 1, [b'response'])
506 505
507 506 # We've registered the response but haven't sent it. From the
508 507 # perspective of the reactor, the command is still active.
509 508
510 509 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
511 510 self.assertaction(results[0], b'error')
512 511 self.assertEqual(results[0][1], {
513 512 b'message': b'request with ID 1 is already active',
514 513 })
515 514
516 515 def testduplicaterequestaftersend(self):
517 516 """We can use a duplicate request ID after we've sent the response."""
518 517 reactor = makereactor()
519 518 instream = framing.stream(1)
520 519 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
521 520 outstream = reactor.makeoutputstream()
522 521 res = reactor.oncommandresponsereadyobjects(outstream, 1, [b'response'])
523 522 list(res[1][b'framegen'])
524 523
525 524 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
526 525 self.assertaction(results[0], b'runcommand')
527 526
528 527 def testprotocolsettingsnoflags(self):
529 528 result = self._sendsingleframe(
530 529 makereactor(),
531 530 ffs(b'0 1 stream-begin sender-protocol-settings 0 '))
532 531 self.assertaction(result, b'error')
533 532 self.assertEqual(result[1], {
534 533 b'message': b'sender protocol settings frame must have '
535 534 b'continuation or end of stream flag set',
536 535 })
537 536
538 537 def testprotocolsettingsconflictflags(self):
539 538 result = self._sendsingleframe(
540 539 makereactor(),
541 540 ffs(b'0 1 stream-begin sender-protocol-settings continuation|eos '))
542 541 self.assertaction(result, b'error')
543 542 self.assertEqual(result[1], {
544 543 b'message': b'sender protocol settings frame cannot have both '
545 544 b'continuation and end of stream flags set',
546 545 })
547 546
548 547 def testprotocolsettingsemptypayload(self):
549 548 result = self._sendsingleframe(
550 549 makereactor(),
551 550 ffs(b'0 1 stream-begin sender-protocol-settings eos '))
552 551 self.assertaction(result, b'error')
553 552 self.assertEqual(result[1], {
554 553 b'message': b'sender protocol settings frame did not contain CBOR '
555 554 b'data',
556 555 })
557 556
558 557 def testprotocolsettingsmultipleobjects(self):
559 558 result = self._sendsingleframe(
560 559 makereactor(),
561 560 ffs(b'0 1 stream-begin sender-protocol-settings eos '
562 561 b'\x46foobar\x43foo'))
563 562 self.assertaction(result, b'error')
564 563 self.assertEqual(result[1], {
565 564 b'message': b'sender protocol settings frame contained multiple '
566 565 b'CBOR values',
567 566 })
568 567
569 568 def testprotocolsettingscontentencodings(self):
570 569 reactor = makereactor()
571 570
572 571 result = self._sendsingleframe(
573 572 reactor,
574 573 ffs(b'0 1 stream-begin sender-protocol-settings eos '
575 574 b'cbor:{b"contentencodings": [b"a", b"b"]}'))
576 575 self.assertaction(result, b'wantframe')
577 576
578 577 self.assertEqual(reactor._state, b'idle')
579 578 self.assertEqual(reactor._sendersettings[b'contentencodings'],
580 579 [b'a', b'b'])
581 580
582 581 def testprotocolsettingsmultipleframes(self):
583 582 reactor = makereactor()
584 583
585 584 data = b''.join(cborutil.streamencode({
586 585 b'contentencodings': [b'value1', b'value2'],
587 586 }))
588 587
589 588 results = list(sendframes(reactor, [
590 589 ffs(b'0 1 stream-begin sender-protocol-settings continuation %s' %
591 590 data[0:5]),
592 591 ffs(b'0 1 0 sender-protocol-settings eos %s' % data[5:]),
593 592 ]))
594 593
595 594 self.assertEqual(len(results), 2)
596 595
597 596 self.assertaction(results[0], b'wantframe')
598 597 self.assertaction(results[1], b'wantframe')
599 598
600 599 self.assertEqual(reactor._state, b'idle')
601 600 self.assertEqual(reactor._sendersettings[b'contentencodings'],
602 601 [b'value1', b'value2'])
603 602
604 603 def testprotocolsettingsbadcbor(self):
605 604 result = self._sendsingleframe(
606 605 makereactor(),
607 606 ffs(b'0 1 stream-begin sender-protocol-settings eos badvalue'))
608 607 self.assertaction(result, b'error')
609 608
610 609 def testprotocolsettingsnoninitial(self):
611 610 # Cannot have protocol settings frames as non-initial frames.
612 611 reactor = makereactor()
613 612
614 613 stream = framing.stream(1)
615 614 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
616 615 self.assertEqual(len(results), 1)
617 616 self.assertaction(results[0], b'runcommand')
618 617
619 618 result = self._sendsingleframe(
620 619 reactor,
621 620 ffs(b'0 1 0 sender-protocol-settings eos '))
622 621 self.assertaction(result, b'error')
623 622 self.assertEqual(result[1], {
624 623 b'message': b'expected command request frame; got 8',
625 624 })
626 625
627 626 if __name__ == '__main__':
628 627 import silenttestrunner
629 628 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now