##// END OF EJS Templates
wireprotov2: remove functions for creating response frames from bytes...
Gregory Szorc -
r40170:966b5f7f default
parent child Browse files
Show More
@@ -1,1827 +1,1738
1 1 # wireprotoframing.py - unified framing protocol for wire protocol
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 # This file contains functionality to support the unified frame-based wire
9 9 # protocol. For details about the protocol, see
10 10 # `hg help internals.wireprotocol`.
11 11
12 12 from __future__ import absolute_import
13 13
14 14 import collections
15 15 import struct
16 16
17 17 from .i18n import _
18 18 from .thirdparty import (
19 19 attr,
20 20 )
21 21 from . import (
22 22 encoding,
23 23 error,
24 24 pycompat,
25 25 util,
26 26 wireprototypes,
27 27 )
28 28 from .utils import (
29 29 cborutil,
30 30 stringutil,
31 31 )
32 32
33 33 FRAME_HEADER_SIZE = 8
34 34 DEFAULT_MAX_FRAME_SIZE = 32768
35 35
36 36 STREAM_FLAG_BEGIN_STREAM = 0x01
37 37 STREAM_FLAG_END_STREAM = 0x02
38 38 STREAM_FLAG_ENCODING_APPLIED = 0x04
39 39
40 40 STREAM_FLAGS = {
41 41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
42 42 b'stream-end': STREAM_FLAG_END_STREAM,
43 43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
44 44 }
45 45
46 46 FRAME_TYPE_COMMAND_REQUEST = 0x01
47 47 FRAME_TYPE_COMMAND_DATA = 0x02
48 48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
49 49 FRAME_TYPE_ERROR_RESPONSE = 0x05
50 50 FRAME_TYPE_TEXT_OUTPUT = 0x06
51 51 FRAME_TYPE_PROGRESS = 0x07
52 52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
53 53 FRAME_TYPE_STREAM_SETTINGS = 0x09
54 54
55 55 FRAME_TYPES = {
56 56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
57 57 b'command-data': FRAME_TYPE_COMMAND_DATA,
58 58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
59 59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
60 60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
61 61 b'progress': FRAME_TYPE_PROGRESS,
62 62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
63 63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
64 64 }
65 65
66 66 FLAG_COMMAND_REQUEST_NEW = 0x01
67 67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
68 68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
69 69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
70 70
71 71 FLAGS_COMMAND_REQUEST = {
72 72 b'new': FLAG_COMMAND_REQUEST_NEW,
73 73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
74 74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
75 75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
76 76 }
77 77
78 78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
79 79 FLAG_COMMAND_DATA_EOS = 0x02
80 80
81 81 FLAGS_COMMAND_DATA = {
82 82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
83 83 b'eos': FLAG_COMMAND_DATA_EOS,
84 84 }
85 85
86 86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
87 87 FLAG_COMMAND_RESPONSE_EOS = 0x02
88 88
89 89 FLAGS_COMMAND_RESPONSE = {
90 90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
91 91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
92 92 }
93 93
94 94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
95 95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
96 96
97 97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
98 98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
99 99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
100 100 }
101 101
102 102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
103 103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
104 104
105 105 FLAGS_STREAM_ENCODING_SETTINGS = {
106 106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
107 107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
108 108 }
109 109
110 110 # Maps frame types to their available flags.
111 111 FRAME_TYPE_FLAGS = {
112 112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
113 113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
114 114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
115 115 FRAME_TYPE_ERROR_RESPONSE: {},
116 116 FRAME_TYPE_TEXT_OUTPUT: {},
117 117 FRAME_TYPE_PROGRESS: {},
118 118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
119 119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
120 120 }
121 121
122 122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
123 123
124 124 def humanflags(mapping, value):
125 125 """Convert a numeric flags value to a human value, using a mapping table."""
126 126 namemap = {v: k for k, v in mapping.iteritems()}
127 127 flags = []
128 128 val = 1
129 129 while value >= val:
130 130 if value & val:
131 131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
132 132 val <<= 1
133 133
134 134 return b'|'.join(flags)
135 135
136 136 @attr.s(slots=True)
137 137 class frameheader(object):
138 138 """Represents the data in a frame header."""
139 139
140 140 length = attr.ib()
141 141 requestid = attr.ib()
142 142 streamid = attr.ib()
143 143 streamflags = attr.ib()
144 144 typeid = attr.ib()
145 145 flags = attr.ib()
146 146
147 147 @attr.s(slots=True, repr=False)
148 148 class frame(object):
149 149 """Represents a parsed frame."""
150 150
151 151 requestid = attr.ib()
152 152 streamid = attr.ib()
153 153 streamflags = attr.ib()
154 154 typeid = attr.ib()
155 155 flags = attr.ib()
156 156 payload = attr.ib()
157 157
158 158 @encoding.strmethod
159 159 def __repr__(self):
160 160 typename = '<unknown 0x%02x>' % self.typeid
161 161 for name, value in FRAME_TYPES.iteritems():
162 162 if value == self.typeid:
163 163 typename = name
164 164 break
165 165
166 166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
167 167 'type=%s; flags=%s)' % (
168 168 len(self.payload), self.requestid, self.streamid,
169 169 humanflags(STREAM_FLAGS, self.streamflags), typename,
170 170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
171 171
172 172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
173 173 """Assemble a frame into a byte array."""
174 174 # TODO assert size of payload.
175 175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
176 176
177 177 # 24 bits length
178 178 # 16 bits request id
179 179 # 8 bits stream id
180 180 # 8 bits stream flags
181 181 # 4 bits type
182 182 # 4 bits flags
183 183
184 184 l = struct.pack(r'<I', len(payload))
185 185 frame[0:3] = l[0:3]
186 186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
187 187 frame[7] = (typeid << 4) | flags
188 188 frame[8:] = payload
189 189
190 190 return frame
191 191
192 192 def makeframefromhumanstring(s):
193 193 """Create a frame from a human readable string
194 194
195 195 Strings have the form:
196 196
197 197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
198 198
199 199 This can be used by user-facing applications and tests for creating
200 200 frames easily without having to type out a bunch of constants.
201 201
202 202 Request ID and stream IDs are integers.
203 203
204 204 Stream flags, frame type, and flags can be specified by integer or
205 205 named constant.
206 206
207 207 Flags can be delimited by `|` to bitwise OR them together.
208 208
209 209 If the payload begins with ``cbor:``, the following string will be
210 210 evaluated as Python literal and the resulting object will be fed into
211 211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
212 212 byte string literal.
213 213 """
214 214 fields = s.split(b' ', 5)
215 215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
216 216
217 217 requestid = int(requestid)
218 218 streamid = int(streamid)
219 219
220 220 finalstreamflags = 0
221 221 for flag in streamflags.split(b'|'):
222 222 if flag in STREAM_FLAGS:
223 223 finalstreamflags |= STREAM_FLAGS[flag]
224 224 else:
225 225 finalstreamflags |= int(flag)
226 226
227 227 if frametype in FRAME_TYPES:
228 228 frametype = FRAME_TYPES[frametype]
229 229 else:
230 230 frametype = int(frametype)
231 231
232 232 finalflags = 0
233 233 validflags = FRAME_TYPE_FLAGS[frametype]
234 234 for flag in frameflags.split(b'|'):
235 235 if flag in validflags:
236 236 finalflags |= validflags[flag]
237 237 else:
238 238 finalflags |= int(flag)
239 239
240 240 if payload.startswith(b'cbor:'):
241 241 payload = b''.join(cborutil.streamencode(
242 242 stringutil.evalpythonliteral(payload[5:])))
243 243
244 244 else:
245 245 payload = stringutil.unescapestr(payload)
246 246
247 247 return makeframe(requestid=requestid, streamid=streamid,
248 248 streamflags=finalstreamflags, typeid=frametype,
249 249 flags=finalflags, payload=payload)
250 250
251 251 def parseheader(data):
252 252 """Parse a unified framing protocol frame header from a buffer.
253 253
254 254 The header is expected to be in the buffer at offset 0 and the
255 255 buffer is expected to be large enough to hold a full header.
256 256 """
257 257 # 24 bits payload length (little endian)
258 258 # 16 bits request ID
259 259 # 8 bits stream ID
260 260 # 8 bits stream flags
261 261 # 4 bits frame type
262 262 # 4 bits frame flags
263 263 # ... payload
264 264 framelength = data[0] + 256 * data[1] + 16384 * data[2]
265 265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
266 266 typeflags = data[7]
267 267
268 268 frametype = (typeflags & 0xf0) >> 4
269 269 frameflags = typeflags & 0x0f
270 270
271 271 return frameheader(framelength, requestid, streamid, streamflags,
272 272 frametype, frameflags)
273 273
274 274 def readframe(fh):
275 275 """Read a unified framing protocol frame from a file object.
276 276
277 277 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
278 278 None if no frame is available. May raise if a malformed frame is
279 279 seen.
280 280 """
281 281 header = bytearray(FRAME_HEADER_SIZE)
282 282
283 283 readcount = fh.readinto(header)
284 284
285 285 if readcount == 0:
286 286 return None
287 287
288 288 if readcount != FRAME_HEADER_SIZE:
289 289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
290 290 (readcount, header))
291 291
292 292 h = parseheader(header)
293 293
294 294 payload = fh.read(h.length)
295 295 if len(payload) != h.length:
296 296 raise error.Abort(_('frame length error: expected %d; got %d') %
297 297 (h.length, len(payload)))
298 298
299 299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
300 300 payload)
301 301
302 302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
303 303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
304 304 redirect=None):
305 305 """Create frames necessary to transmit a request to run a command.
306 306
307 307 This is a generator of bytearrays. Each item represents a frame
308 308 ready to be sent over the wire to a peer.
309 309 """
310 310 data = {b'name': cmd}
311 311 if args:
312 312 data[b'args'] = args
313 313
314 314 if redirect:
315 315 data[b'redirect'] = redirect
316 316
317 317 data = b''.join(cborutil.streamencode(data))
318 318
319 319 offset = 0
320 320
321 321 while True:
322 322 flags = 0
323 323
324 324 # Must set new or continuation flag.
325 325 if not offset:
326 326 flags |= FLAG_COMMAND_REQUEST_NEW
327 327 else:
328 328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
329 329
330 330 # Data frames is set on all frames.
331 331 if datafh:
332 332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
333 333
334 334 payload = data[offset:offset + maxframesize]
335 335 offset += len(payload)
336 336
337 337 if len(payload) == maxframesize and offset < len(data):
338 338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
339 339
340 340 yield stream.makeframe(requestid=requestid,
341 341 typeid=FRAME_TYPE_COMMAND_REQUEST,
342 342 flags=flags,
343 343 payload=payload)
344 344
345 345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
346 346 break
347 347
348 348 if datafh:
349 349 while True:
350 350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
351 351
352 352 done = False
353 353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
354 354 flags = FLAG_COMMAND_DATA_CONTINUATION
355 355 else:
356 356 flags = FLAG_COMMAND_DATA_EOS
357 357 assert datafh.read(1) == b''
358 358 done = True
359 359
360 360 yield stream.makeframe(requestid=requestid,
361 361 typeid=FRAME_TYPE_COMMAND_DATA,
362 362 flags=flags,
363 363 payload=data)
364 364
365 365 if done:
366 366 break
367 367
368 def createcommandresponseframesfrombytes(stream, requestid, data,
369 maxframesize=DEFAULT_MAX_FRAME_SIZE):
370 """Create a raw frame to send a bytes response from static bytes input.
371
372 Returns a generator of bytearrays.
373 """
374 # Automatically send the overall CBOR response map.
375 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
376 if len(overall) > maxframesize:
377 raise error.ProgrammingError('not yet implemented')
378
379 # Simple case where we can fit the full response in a single frame.
380 if len(overall) + len(data) <= maxframesize:
381 flags = FLAG_COMMAND_RESPONSE_EOS
382 yield stream.makeframe(requestid=requestid,
383 typeid=FRAME_TYPE_COMMAND_RESPONSE,
384 flags=flags,
385 payload=overall + data)
386 return
387
388 # It's easier to send the overall CBOR map in its own frame than to track
389 # offsets.
390 yield stream.makeframe(requestid=requestid,
391 typeid=FRAME_TYPE_COMMAND_RESPONSE,
392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
393 payload=overall)
394
395 offset = 0
396 while True:
397 chunk = data[offset:offset + maxframesize]
398 offset += len(chunk)
399 done = offset == len(data)
400
401 if done:
402 flags = FLAG_COMMAND_RESPONSE_EOS
403 else:
404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
405
406 yield stream.makeframe(requestid=requestid,
407 typeid=FRAME_TYPE_COMMAND_RESPONSE,
408 flags=flags,
409 payload=chunk)
410
411 if done:
412 break
413
414 def createbytesresponseframesfromgen(stream, requestid, gen,
415 maxframesize=DEFAULT_MAX_FRAME_SIZE):
416 """Generator of frames from a generator of byte chunks.
417
418 This assumes that another frame will follow whatever this emits. i.e.
419 this always emits the continuation flag and never emits the end-of-stream
420 flag.
421 """
422 cb = util.chunkbuffer(gen)
423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
424
425 while True:
426 chunk = cb.read(maxframesize)
427 if not chunk:
428 break
429
430 yield stream.makeframe(requestid=requestid,
431 typeid=FRAME_TYPE_COMMAND_RESPONSE,
432 flags=flags,
433 payload=chunk)
434
435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
436
437 368 def createcommandresponseokframe(stream, requestid):
438 369 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
439 370
440 371 return stream.makeframe(requestid=requestid,
441 372 typeid=FRAME_TYPE_COMMAND_RESPONSE,
442 373 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
443 374 payload=overall)
444 375
445 376 def createcommandresponseeosframe(stream, requestid):
446 377 """Create an empty payload frame representing command end-of-stream."""
447 378 return stream.makeframe(requestid=requestid,
448 379 typeid=FRAME_TYPE_COMMAND_RESPONSE,
449 380 flags=FLAG_COMMAND_RESPONSE_EOS,
450 381 payload=b'')
451 382
452 383 def createalternatelocationresponseframe(stream, requestid, location):
453 384 data = {
454 385 b'status': b'redirect',
455 386 b'location': {
456 387 b'url': location.url,
457 388 b'mediatype': location.mediatype,
458 389 }
459 390 }
460 391
461 392 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
462 393 r'servercadercerts'):
463 394 value = getattr(location, a)
464 395 if value is not None:
465 396 data[b'location'][pycompat.bytestr(a)] = value
466 397
467 398 return stream.makeframe(requestid=requestid,
468 399 typeid=FRAME_TYPE_COMMAND_RESPONSE,
469 400 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
470 401 payload=b''.join(cborutil.streamencode(data)))
471 402
472 403 def createcommanderrorresponse(stream, requestid, message, args=None):
473 404 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
474 405 # formatting works consistently?
475 406 m = {
476 407 b'status': b'error',
477 408 b'error': {
478 409 b'message': message,
479 410 }
480 411 }
481 412
482 413 if args:
483 414 m[b'error'][b'args'] = args
484 415
485 416 overall = b''.join(cborutil.streamencode(m))
486 417
487 418 yield stream.makeframe(requestid=requestid,
488 419 typeid=FRAME_TYPE_COMMAND_RESPONSE,
489 420 flags=FLAG_COMMAND_RESPONSE_EOS,
490 421 payload=overall)
491 422
492 423 def createerrorframe(stream, requestid, msg, errtype):
493 424 # TODO properly handle frame size limits.
494 425 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
495 426
496 427 payload = b''.join(cborutil.streamencode({
497 428 b'type': errtype,
498 429 b'message': [{b'msg': msg}],
499 430 }))
500 431
501 432 yield stream.makeframe(requestid=requestid,
502 433 typeid=FRAME_TYPE_ERROR_RESPONSE,
503 434 flags=0,
504 435 payload=payload)
505 436
506 437 def createtextoutputframe(stream, requestid, atoms,
507 438 maxframesize=DEFAULT_MAX_FRAME_SIZE):
508 439 """Create a text output frame to render text to people.
509 440
510 441 ``atoms`` is a 3-tuple of (formatting string, args, labels).
511 442
512 443 The formatting string contains ``%s`` tokens to be replaced by the
513 444 corresponding indexed entry in ``args``. ``labels`` is an iterable of
514 445 formatters to be applied at rendering time. In terms of the ``ui``
515 446 class, each atom corresponds to a ``ui.write()``.
516 447 """
517 448 atomdicts = []
518 449
519 450 for (formatting, args, labels) in atoms:
520 451 # TODO look for localstr, other types here?
521 452
522 453 if not isinstance(formatting, bytes):
523 454 raise ValueError('must use bytes formatting strings')
524 455 for arg in args:
525 456 if not isinstance(arg, bytes):
526 457 raise ValueError('must use bytes for arguments')
527 458 for label in labels:
528 459 if not isinstance(label, bytes):
529 460 raise ValueError('must use bytes for labels')
530 461
531 462 # Formatting string must be ASCII.
532 463 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
533 464
534 465 # Arguments must be UTF-8.
535 466 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
536 467
537 468 # Labels must be ASCII.
538 469 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
539 470 for l in labels]
540 471
541 472 atom = {b'msg': formatting}
542 473 if args:
543 474 atom[b'args'] = args
544 475 if labels:
545 476 atom[b'labels'] = labels
546 477
547 478 atomdicts.append(atom)
548 479
549 480 payload = b''.join(cborutil.streamencode(atomdicts))
550 481
551 482 if len(payload) > maxframesize:
552 483 raise ValueError('cannot encode data in a single frame')
553 484
554 485 yield stream.makeframe(requestid=requestid,
555 486 typeid=FRAME_TYPE_TEXT_OUTPUT,
556 487 flags=0,
557 488 payload=payload)
558 489
559 490 class bufferingcommandresponseemitter(object):
560 491 """Helper object to emit command response frames intelligently.
561 492
562 493 Raw command response data is likely emitted in chunks much smaller
563 494 than what can fit in a single frame. This class exists to buffer
564 495 chunks until enough data is available to fit in a single frame.
565 496
566 497 TODO we'll need something like this when compression is supported.
567 498 So it might make sense to implement this functionality at the stream
568 499 level.
569 500 """
570 501 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
571 502 self._stream = stream
572 503 self._requestid = requestid
573 504 self._maxsize = maxframesize
574 505 self._chunks = []
575 506 self._chunkssize = 0
576 507
577 508 def send(self, data):
578 509 """Send new data for emission.
579 510
580 511 Is a generator of new frames that were derived from the new input.
581 512
582 513 If the special input ``None`` is received, flushes all buffered
583 514 data to frames.
584 515 """
585 516
586 517 if data is None:
587 518 for frame in self._flush():
588 519 yield frame
589 520 return
590 521
591 522 # There is a ton of potential to do more complicated things here.
592 523 # Our immediate goal is to coalesce small chunks into big frames,
593 524 # not achieve the fewest number of frames possible. So we go with
594 525 # a simple implementation:
595 526 #
596 527 # * If a chunk is too large for a frame, we flush and emit frames
597 528 # for the new chunk.
598 529 # * If a chunk can be buffered without total buffered size limits
599 530 # being exceeded, we do that.
600 531 # * If a chunk causes us to go over our buffering limit, we flush
601 532 # and then buffer the new chunk.
602 533
603 534 if len(data) > self._maxsize:
604 535 for frame in self._flush():
605 536 yield frame
606 537
607 538 # Now emit frames for the big chunk.
608 539 offset = 0
609 540 while True:
610 541 chunk = data[offset:offset + self._maxsize]
611 542 offset += len(chunk)
612 543
613 544 yield self._stream.makeframe(
614 545 self._requestid,
615 546 typeid=FRAME_TYPE_COMMAND_RESPONSE,
616 547 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
617 548 payload=chunk)
618 549
619 550 if offset == len(data):
620 551 return
621 552
622 553 # If we don't have enough to constitute a full frame, buffer and
623 554 # return.
624 555 if len(data) + self._chunkssize < self._maxsize:
625 556 self._chunks.append(data)
626 557 self._chunkssize += len(data)
627 558 return
628 559
629 560 # Else flush what we have and buffer the new chunk. We could do
630 561 # something more intelligent here, like break the chunk. Let's
631 562 # keep things simple for now.
632 563 for frame in self._flush():
633 564 yield frame
634 565
635 566 self._chunks.append(data)
636 567 self._chunkssize = len(data)
637 568
638 569 def _flush(self):
639 570 payload = b''.join(self._chunks)
640 571 assert len(payload) <= self._maxsize
641 572
642 573 self._chunks[:] = []
643 574 self._chunkssize = 0
644 575
645 576 yield self._stream.makeframe(
646 577 self._requestid,
647 578 typeid=FRAME_TYPE_COMMAND_RESPONSE,
648 579 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
649 580 payload=payload)
650 581
651 582 # TODO consider defining encoders/decoders using the util.compressionengine
652 583 # mechanism.
653 584
654 585 class identityencoder(object):
655 586 """Encoder for the "identity" stream encoding profile."""
656 587 def __init__(self, ui):
657 588 pass
658 589
659 590 def encode(self, data):
660 591 return data
661 592
662 593 def flush(self):
663 594 return b''
664 595
665 596 def finish(self):
666 597 return b''
667 598
668 599 class identitydecoder(object):
669 600 """Decoder for the "identity" stream encoding profile."""
670 601
671 602 def __init__(self, ui, extraobjs):
672 603 if extraobjs:
673 604 raise error.Abort(_('identity decoder received unexpected '
674 605 'additional values'))
675 606
676 607 def decode(self, data):
677 608 return data
678 609
679 610 class zlibencoder(object):
680 611 def __init__(self, ui):
681 612 import zlib
682 613 self._zlib = zlib
683 614 self._compressor = zlib.compressobj()
684 615
685 616 def encode(self, data):
686 617 return self._compressor.compress(data)
687 618
688 619 def flush(self):
689 620 # Z_SYNC_FLUSH doesn't reset compression context, which is
690 621 # what we want.
691 622 return self._compressor.flush(self._zlib.Z_SYNC_FLUSH)
692 623
693 624 def finish(self):
694 625 res = self._compressor.flush(self._zlib.Z_FINISH)
695 626 self._compressor = None
696 627 return res
697 628
698 629 class zlibdecoder(object):
699 630 def __init__(self, ui, extraobjs):
700 631 import zlib
701 632
702 633 if extraobjs:
703 634 raise error.Abort(_('zlib decoder received unexpected '
704 635 'additional values'))
705 636
706 637 self._decompressor = zlib.decompressobj()
707 638
708 639 def decode(self, data):
709 640 # Python 2's zlib module doesn't use the buffer protocol and can't
710 641 # handle all bytes-like types.
711 642 if not pycompat.ispy3 and isinstance(data, bytearray):
712 643 data = bytes(data)
713 644
714 645 return self._decompressor.decompress(data)
715 646
716 647 class zstdbaseencoder(object):
717 648 def __init__(self, level):
718 649 from . import zstd
719 650
720 651 self._zstd = zstd
721 652 cctx = zstd.ZstdCompressor(level=level)
722 653 self._compressor = cctx.compressobj()
723 654
724 655 def encode(self, data):
725 656 return self._compressor.compress(data)
726 657
727 658 def flush(self):
728 659 # COMPRESSOBJ_FLUSH_BLOCK flushes all data previously fed into the
729 660 # compressor and allows a decompressor to access all encoded data
730 661 # up to this point.
731 662 return self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_BLOCK)
732 663
733 664 def finish(self):
734 665 res = self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_FINISH)
735 666 self._compressor = None
736 667 return res
737 668
738 669 class zstd8mbencoder(zstdbaseencoder):
739 670 def __init__(self, ui):
740 671 super(zstd8mbencoder, self).__init__(3)
741 672
742 673 class zstdbasedecoder(object):
743 674 def __init__(self, maxwindowsize):
744 675 from . import zstd
745 676 dctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize)
746 677 self._decompressor = dctx.decompressobj()
747 678
748 679 def decode(self, data):
749 680 return self._decompressor.decompress(data)
750 681
751 682 class zstd8mbdecoder(zstdbasedecoder):
752 683 def __init__(self, ui, extraobjs):
753 684 if extraobjs:
754 685 raise error.Abort(_('zstd8mb decoder received unexpected '
755 686 'additional values'))
756 687
757 688 super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576)
758 689
759 690 # We lazily populate this to avoid excessive module imports when importing
760 691 # this module.
761 692 STREAM_ENCODERS = {}
762 693 STREAM_ENCODERS_ORDER = []
763 694
764 695 def populatestreamencoders():
765 696 if STREAM_ENCODERS:
766 697 return
767 698
768 699 try:
769 700 from . import zstd
770 701 zstd.__version__
771 702 except ImportError:
772 703 zstd = None
773 704
774 705 # zstandard is fastest and is preferred.
775 706 if zstd:
776 707 STREAM_ENCODERS[b'zstd-8mb'] = (zstd8mbencoder, zstd8mbdecoder)
777 708 STREAM_ENCODERS_ORDER.append(b'zstd-8mb')
778 709
779 710 STREAM_ENCODERS[b'zlib'] = (zlibencoder, zlibdecoder)
780 711 STREAM_ENCODERS_ORDER.append(b'zlib')
781 712
782 713 STREAM_ENCODERS[b'identity'] = (identityencoder, identitydecoder)
783 714 STREAM_ENCODERS_ORDER.append(b'identity')
784 715
785 716 class stream(object):
786 717 """Represents a logical unidirectional series of frames."""
787 718
788 719 def __init__(self, streamid, active=False):
789 720 self.streamid = streamid
790 721 self._active = active
791 722
792 723 def makeframe(self, requestid, typeid, flags, payload):
793 724 """Create a frame to be sent out over this stream.
794 725
795 726 Only returns the frame instance. Does not actually send it.
796 727 """
797 728 streamflags = 0
798 729 if not self._active:
799 730 streamflags |= STREAM_FLAG_BEGIN_STREAM
800 731 self._active = True
801 732
802 733 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
803 734 payload)
804 735
805 736 class inputstream(stream):
806 737 """Represents a stream used for receiving data."""
807 738
808 739 def __init__(self, streamid, active=False):
809 740 super(inputstream, self).__init__(streamid, active=active)
810 741 self._decoder = None
811 742
812 743 def setdecoder(self, ui, name, extraobjs):
813 744 """Set the decoder for this stream.
814 745
815 746 Receives the stream profile name and any additional CBOR objects
816 747 decoded from the stream encoding settings frame payloads.
817 748 """
818 749 if name not in STREAM_ENCODERS:
819 750 raise error.Abort(_('unknown stream decoder: %s') % name)
820 751
821 752 self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs)
822 753
823 754 def decode(self, data):
824 755 # Default is identity decoder. We don't bother instantiating one
825 756 # because it is trivial.
826 757 if not self._decoder:
827 758 return data
828 759
829 760 return self._decoder.decode(data)
830 761
831 762 def flush(self):
832 763 if not self._decoder:
833 764 return b''
834 765
835 766 return self._decoder.flush()
836 767
837 768 class outputstream(stream):
838 769 """Represents a stream used for sending data."""
839 770
840 771 def __init__(self, streamid, active=False):
841 772 super(outputstream, self).__init__(streamid, active=active)
842 773 self._encoder = None
843 774
844 775 def setencoder(self, ui, name):
845 776 """Set the encoder for this stream.
846 777
847 778 Receives the stream profile name.
848 779 """
849 780 if name not in STREAM_ENCODERS:
850 781 raise error.Abort(_('unknown stream encoder: %s') % name)
851 782
852 783 self._encoder = STREAM_ENCODERS[name][0](ui)
853 784
854 785 def encode(self, data):
855 786 if not self._encoder:
856 787 return data
857 788
858 789 return self._encoder.encode(data)
859 790
860 791 def flush(self):
861 792 if not self._encoder:
862 793 return b''
863 794
864 795 return self._encoder.flush()
865 796
866 797 def finish(self):
867 798 if not self._encoder:
868 799 return b''
869 800
870 801 self._encoder.finish()
871 802
872 803 def ensureserverstream(stream):
873 804 if stream.streamid % 2:
874 805 raise error.ProgrammingError('server should only write to even '
875 806 'numbered streams; %d is not even' %
876 807 stream.streamid)
877 808
878 809 DEFAULT_PROTOCOL_SETTINGS = {
879 810 'contentencodings': [b'identity'],
880 811 }
881 812
882 813 class serverreactor(object):
883 814 """Holds state of a server handling frame-based protocol requests.
884 815
885 816 This class is the "brain" of the unified frame-based protocol server
886 817 component. While the protocol is stateless from the perspective of
887 818 requests/commands, something needs to track which frames have been
888 819 received, what frames to expect, etc. This class is that thing.
889 820
890 821 Instances are modeled as a state machine of sorts. Instances are also
891 822 reactionary to external events. The point of this class is to encapsulate
892 823 the state of the connection and the exchange of frames, not to perform
893 824 work. Instead, callers tell this class when something occurs, like a
894 825 frame arriving. If that activity is worthy of a follow-up action (say
895 826 *run a command*), the return value of that handler will say so.
896 827
897 828 I/O and CPU intensive operations are purposefully delegated outside of
898 829 this class.
899 830
900 831 Consumers are expected to tell instances when events occur. They do so by
901 832 calling the various ``on*`` methods. These methods return a 2-tuple
902 833 describing any follow-up action(s) to take. The first element is the
903 834 name of an action to perform. The second is a data structure (usually
904 835 a dict) specific to that action that contains more information. e.g.
905 836 if the server wants to send frames back to the client, the data structure
906 837 will contain a reference to those frames.
907 838
908 839 Valid actions that consumers can be instructed to take are:
909 840
910 841 sendframes
911 842 Indicates that frames should be sent to the client. The ``framegen``
912 843 key contains a generator of frames that should be sent. The server
913 844 assumes that all frames are sent to the client.
914 845
915 846 error
916 847 Indicates that an error occurred. Consumer should probably abort.
917 848
918 849 runcommand
919 850 Indicates that the consumer should run a wire protocol command. Details
920 851 of the command to run are given in the data structure.
921 852
922 853 wantframe
923 854 Indicates that nothing of interest happened and the server is waiting on
924 855 more frames from the client before anything interesting can be done.
925 856
926 857 noop
927 858 Indicates no additional action is required.
928 859
929 860 Known Issues
930 861 ------------
931 862
932 863 There are no limits to the number of partially received commands or their
933 864 size. A malicious client could stream command request data and exhaust the
934 865 server's memory.
935 866
936 867 Partially received commands are not acted upon when end of input is
937 868 reached. Should the server error if it receives a partial request?
938 869 Should the client send a message to abort a partially transmitted request
939 870 to facilitate graceful shutdown?
940 871
941 872 Active requests that haven't been responded to aren't tracked. This means
942 873 that if we receive a command and instruct its dispatch, another command
943 874 with its request ID can come in over the wire and there will be a race
944 875 between who responds to what.
945 876 """
946 877
947 878 def __init__(self, ui, deferoutput=False):
948 879 """Construct a new server reactor.
949 880
950 881 ``deferoutput`` can be used to indicate that no output frames should be
951 882 instructed to be sent until input has been exhausted. In this mode,
952 883 events that would normally generate output frames (such as a command
953 884 response being ready) will instead defer instructing the consumer to
954 885 send those frames. This is useful for half-duplex transports where the
955 886 sender cannot receive until all data has been transmitted.
956 887 """
957 888 self._ui = ui
958 889 self._deferoutput = deferoutput
959 890 self._state = 'initial'
960 891 self._nextoutgoingstreamid = 2
961 892 self._bufferedframegens = []
962 893 # stream id -> stream instance for all active streams from the client.
963 894 self._incomingstreams = {}
964 895 self._outgoingstreams = {}
965 896 # request id -> dict of commands that are actively being received.
966 897 self._receivingcommands = {}
967 898 # Request IDs that have been received and are actively being processed.
968 899 # Once all output for a request has been sent, it is removed from this
969 900 # set.
970 901 self._activecommands = set()
971 902
972 903 self._protocolsettingsdecoder = None
973 904
974 905 # Sender protocol settings are optional. Set implied default values.
975 906 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
976 907
977 908 populatestreamencoders()
978 909
979 910 def onframerecv(self, frame):
980 911 """Process a frame that has been received off the wire.
981 912
982 913 Returns a dict with an ``action`` key that details what action,
983 914 if any, the consumer should take next.
984 915 """
985 916 if not frame.streamid % 2:
986 917 self._state = 'errored'
987 918 return self._makeerrorresult(
988 919 _('received frame with even numbered stream ID: %d') %
989 920 frame.streamid)
990 921
991 922 if frame.streamid not in self._incomingstreams:
992 923 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
993 924 self._state = 'errored'
994 925 return self._makeerrorresult(
995 926 _('received frame on unknown inactive stream without '
996 927 'beginning of stream flag set'))
997 928
998 929 self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
999 930
1000 931 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1001 932 # TODO handle decoding frames
1002 933 self._state = 'errored'
1003 934 raise error.ProgrammingError('support for decoding stream payloads '
1004 935 'not yet implemented')
1005 936
1006 937 if frame.streamflags & STREAM_FLAG_END_STREAM:
1007 938 del self._incomingstreams[frame.streamid]
1008 939
1009 940 handlers = {
1010 941 'initial': self._onframeinitial,
1011 942 'protocol-settings-receiving': self._onframeprotocolsettings,
1012 943 'idle': self._onframeidle,
1013 944 'command-receiving': self._onframecommandreceiving,
1014 945 'errored': self._onframeerrored,
1015 946 }
1016 947
1017 948 meth = handlers.get(self._state)
1018 949 if not meth:
1019 950 raise error.ProgrammingError('unhandled state: %s' % self._state)
1020 951
1021 952 return meth(frame)
1022 953
1023 def oncommandresponseready(self, stream, requestid, data):
1024 """Signal that a bytes response is ready to be sent to the client.
1025
1026 The raw bytes response is passed as an argument.
1027 """
1028 ensureserverstream(stream)
1029
1030 def sendframes():
1031 for frame in createcommandresponseframesfrombytes(stream, requestid,
1032 data):
1033 yield frame
1034
1035 self._activecommands.remove(requestid)
1036
1037 result = sendframes()
1038
1039 if self._deferoutput:
1040 self._bufferedframegens.append(result)
1041 return 'noop', {}
1042 else:
1043 return 'sendframes', {
1044 'framegen': result,
1045 }
1046
1047 954 def oncommandresponsereadyobjects(self, stream, requestid, objs):
1048 955 """Signal that objects are ready to be sent to the client.
1049 956
1050 957 ``objs`` is an iterable of objects (typically a generator) that will
1051 958 be encoded via CBOR and added to frames, which will be sent to the
1052 959 client.
1053 960 """
1054 961 ensureserverstream(stream)
1055 962
963 # A more robust solution would be to check for objs.{next,__next__}.
964 if isinstance(objs, list):
965 objs = iter(objs)
966
1056 967 # We need to take care over exception handling. Uncaught exceptions
1057 968 # when generating frames could lead to premature end of the frame
1058 969 # stream and the possibility of the server or client process getting
1059 970 # in a bad state.
1060 971 #
1061 972 # Keep in mind that if ``objs`` is a generator, advancing it could
1062 973 # raise exceptions that originated in e.g. wire protocol command
1063 974 # functions. That is why we differentiate between exceptions raised
1064 975 # when iterating versus other exceptions that occur.
1065 976 #
1066 977 # In all cases, when the function finishes, the request is fully
1067 978 # handled and no new frames for it should be seen.
1068 979
1069 980 def sendframes():
1070 981 emitted = False
1071 982 alternatelocationsent = False
1072 983 emitter = bufferingcommandresponseemitter(stream, requestid)
1073 984 while True:
1074 985 try:
1075 986 o = next(objs)
1076 987 except StopIteration:
1077 988 for frame in emitter.send(None):
1078 989 yield frame
1079 990
1080 991 if emitted:
1081 992 yield createcommandresponseeosframe(stream, requestid)
1082 993 break
1083 994
1084 995 except error.WireprotoCommandError as e:
1085 996 for frame in createcommanderrorresponse(
1086 997 stream, requestid, e.message, e.messageargs):
1087 998 yield frame
1088 999 break
1089 1000
1090 1001 except Exception as e:
1091 1002 for frame in createerrorframe(
1092 1003 stream, requestid, '%s' % stringutil.forcebytestr(e),
1093 1004 errtype='server'):
1094 1005
1095 1006 yield frame
1096 1007
1097 1008 break
1098 1009
1099 1010 try:
1100 1011 # Alternate location responses can only be the first and
1101 1012 # only object in the output stream.
1102 1013 if isinstance(o, wireprototypes.alternatelocationresponse):
1103 1014 if emitted:
1104 1015 raise error.ProgrammingError(
1105 1016 'alternatelocationresponse seen after initial '
1106 1017 'output object')
1107 1018
1108 1019 yield createalternatelocationresponseframe(
1109 1020 stream, requestid, o)
1110 1021
1111 1022 alternatelocationsent = True
1112 1023 emitted = True
1113 1024 continue
1114 1025
1115 1026 if alternatelocationsent:
1116 1027 raise error.ProgrammingError(
1117 1028 'object follows alternatelocationresponse')
1118 1029
1119 1030 if not emitted:
1120 1031 yield createcommandresponseokframe(stream, requestid)
1121 1032 emitted = True
1122 1033
1123 1034 # Objects emitted by command functions can be serializable
1124 1035 # data structures or special types.
1125 1036 # TODO consider extracting the content normalization to a
1126 1037 # standalone function, as it may be useful for e.g. cachers.
1127 1038
1128 1039 # A pre-encoded object is sent directly to the emitter.
1129 1040 if isinstance(o, wireprototypes.encodedresponse):
1130 1041 for frame in emitter.send(o.data):
1131 1042 yield frame
1132 1043
1133 1044 # A regular object is CBOR encoded.
1134 1045 else:
1135 1046 for chunk in cborutil.streamencode(o):
1136 1047 for frame in emitter.send(chunk):
1137 1048 yield frame
1138 1049
1139 1050 except Exception as e:
1140 1051 for frame in createerrorframe(stream, requestid,
1141 1052 '%s' % e,
1142 1053 errtype='server'):
1143 1054 yield frame
1144 1055
1145 1056 break
1146 1057
1147 1058 self._activecommands.remove(requestid)
1148 1059
1149 1060 return self._handlesendframes(sendframes())
1150 1061
1151 1062 def oninputeof(self):
1152 1063 """Signals that end of input has been received.
1153 1064
1154 1065 No more frames will be received. All pending activity should be
1155 1066 completed.
1156 1067 """
1157 1068 # TODO should we do anything about in-flight commands?
1158 1069
1159 1070 if not self._deferoutput or not self._bufferedframegens:
1160 1071 return 'noop', {}
1161 1072
1162 1073 # If we buffered all our responses, emit those.
1163 1074 def makegen():
1164 1075 for gen in self._bufferedframegens:
1165 1076 for frame in gen:
1166 1077 yield frame
1167 1078
1168 1079 return 'sendframes', {
1169 1080 'framegen': makegen(),
1170 1081 }
1171 1082
1172 1083 def _handlesendframes(self, framegen):
1173 1084 if self._deferoutput:
1174 1085 self._bufferedframegens.append(framegen)
1175 1086 return 'noop', {}
1176 1087 else:
1177 1088 return 'sendframes', {
1178 1089 'framegen': framegen,
1179 1090 }
1180 1091
1181 1092 def onservererror(self, stream, requestid, msg):
1182 1093 ensureserverstream(stream)
1183 1094
1184 1095 def sendframes():
1185 1096 for frame in createerrorframe(stream, requestid, msg,
1186 1097 errtype='server'):
1187 1098 yield frame
1188 1099
1189 1100 self._activecommands.remove(requestid)
1190 1101
1191 1102 return self._handlesendframes(sendframes())
1192 1103
1193 1104 def oncommanderror(self, stream, requestid, message, args=None):
1194 1105 """Called when a command encountered an error before sending output."""
1195 1106 ensureserverstream(stream)
1196 1107
1197 1108 def sendframes():
1198 1109 for frame in createcommanderrorresponse(stream, requestid, message,
1199 1110 args):
1200 1111 yield frame
1201 1112
1202 1113 self._activecommands.remove(requestid)
1203 1114
1204 1115 return self._handlesendframes(sendframes())
1205 1116
1206 1117 def makeoutputstream(self):
1207 1118 """Create a stream to be used for sending data to the client."""
1208 1119 streamid = self._nextoutgoingstreamid
1209 1120 self._nextoutgoingstreamid += 2
1210 1121
1211 1122 s = outputstream(streamid)
1212 1123 self._outgoingstreams[streamid] = s
1213 1124
1214 1125 return s
1215 1126
1216 1127 def _makeerrorresult(self, msg):
1217 1128 return 'error', {
1218 1129 'message': msg,
1219 1130 }
1220 1131
1221 1132 def _makeruncommandresult(self, requestid):
1222 1133 entry = self._receivingcommands[requestid]
1223 1134
1224 1135 if not entry['requestdone']:
1225 1136 self._state = 'errored'
1226 1137 raise error.ProgrammingError('should not be called without '
1227 1138 'requestdone set')
1228 1139
1229 1140 del self._receivingcommands[requestid]
1230 1141
1231 1142 if self._receivingcommands:
1232 1143 self._state = 'command-receiving'
1233 1144 else:
1234 1145 self._state = 'idle'
1235 1146
1236 1147 # Decode the payloads as CBOR.
1237 1148 entry['payload'].seek(0)
1238 1149 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1239 1150
1240 1151 if b'name' not in request:
1241 1152 self._state = 'errored'
1242 1153 return self._makeerrorresult(
1243 1154 _('command request missing "name" field'))
1244 1155
1245 1156 if b'args' not in request:
1246 1157 request[b'args'] = {}
1247 1158
1248 1159 assert requestid not in self._activecommands
1249 1160 self._activecommands.add(requestid)
1250 1161
1251 1162 return 'runcommand', {
1252 1163 'requestid': requestid,
1253 1164 'command': request[b'name'],
1254 1165 'args': request[b'args'],
1255 1166 'redirect': request.get(b'redirect'),
1256 1167 'data': entry['data'].getvalue() if entry['data'] else None,
1257 1168 }
1258 1169
1259 1170 def _makewantframeresult(self):
1260 1171 return 'wantframe', {
1261 1172 'state': self._state,
1262 1173 }
1263 1174
1264 1175 def _validatecommandrequestframe(self, frame):
1265 1176 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1266 1177 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1267 1178
1268 1179 if new and continuation:
1269 1180 self._state = 'errored'
1270 1181 return self._makeerrorresult(
1271 1182 _('received command request frame with both new and '
1272 1183 'continuation flags set'))
1273 1184
1274 1185 if not new and not continuation:
1275 1186 self._state = 'errored'
1276 1187 return self._makeerrorresult(
1277 1188 _('received command request frame with neither new nor '
1278 1189 'continuation flags set'))
1279 1190
1280 1191 def _onframeinitial(self, frame):
1281 1192 # Called when we receive a frame when in the "initial" state.
1282 1193 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1283 1194 self._state = 'protocol-settings-receiving'
1284 1195 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1285 1196 return self._onframeprotocolsettings(frame)
1286 1197
1287 1198 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1288 1199 self._state = 'idle'
1289 1200 return self._onframeidle(frame)
1290 1201
1291 1202 else:
1292 1203 self._state = 'errored'
1293 1204 return self._makeerrorresult(
1294 1205 _('expected sender protocol settings or command request '
1295 1206 'frame; got %d') % frame.typeid)
1296 1207
1297 1208 def _onframeprotocolsettings(self, frame):
1298 1209 assert self._state == 'protocol-settings-receiving'
1299 1210 assert self._protocolsettingsdecoder is not None
1300 1211
1301 1212 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1302 1213 self._state = 'errored'
1303 1214 return self._makeerrorresult(
1304 1215 _('expected sender protocol settings frame; got %d') %
1305 1216 frame.typeid)
1306 1217
1307 1218 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1308 1219 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1309 1220
1310 1221 if more and eos:
1311 1222 self._state = 'errored'
1312 1223 return self._makeerrorresult(
1313 1224 _('sender protocol settings frame cannot have both '
1314 1225 'continuation and end of stream flags set'))
1315 1226
1316 1227 if not more and not eos:
1317 1228 self._state = 'errored'
1318 1229 return self._makeerrorresult(
1319 1230 _('sender protocol settings frame must have continuation or '
1320 1231 'end of stream flag set'))
1321 1232
1322 1233 # TODO establish limits for maximum amount of data that can be
1323 1234 # buffered.
1324 1235 try:
1325 1236 self._protocolsettingsdecoder.decode(frame.payload)
1326 1237 except Exception as e:
1327 1238 self._state = 'errored'
1328 1239 return self._makeerrorresult(
1329 1240 _('error decoding CBOR from sender protocol settings frame: %s')
1330 1241 % stringutil.forcebytestr(e))
1331 1242
1332 1243 if more:
1333 1244 return self._makewantframeresult()
1334 1245
1335 1246 assert eos
1336 1247
1337 1248 decoded = self._protocolsettingsdecoder.getavailable()
1338 1249 self._protocolsettingsdecoder = None
1339 1250
1340 1251 if not decoded:
1341 1252 self._state = 'errored'
1342 1253 return self._makeerrorresult(
1343 1254 _('sender protocol settings frame did not contain CBOR data'))
1344 1255 elif len(decoded) > 1:
1345 1256 self._state = 'errored'
1346 1257 return self._makeerrorresult(
1347 1258 _('sender protocol settings frame contained multiple CBOR '
1348 1259 'values'))
1349 1260
1350 1261 d = decoded[0]
1351 1262
1352 1263 if b'contentencodings' in d:
1353 1264 self._sendersettings['contentencodings'] = d[b'contentencodings']
1354 1265
1355 1266 self._state = 'idle'
1356 1267
1357 1268 return self._makewantframeresult()
1358 1269
1359 1270 def _onframeidle(self, frame):
1360 1271 # The only frame type that should be received in this state is a
1361 1272 # command request.
1362 1273 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1363 1274 self._state = 'errored'
1364 1275 return self._makeerrorresult(
1365 1276 _('expected command request frame; got %d') % frame.typeid)
1366 1277
1367 1278 res = self._validatecommandrequestframe(frame)
1368 1279 if res:
1369 1280 return res
1370 1281
1371 1282 if frame.requestid in self._receivingcommands:
1372 1283 self._state = 'errored'
1373 1284 return self._makeerrorresult(
1374 1285 _('request with ID %d already received') % frame.requestid)
1375 1286
1376 1287 if frame.requestid in self._activecommands:
1377 1288 self._state = 'errored'
1378 1289 return self._makeerrorresult(
1379 1290 _('request with ID %d is already active') % frame.requestid)
1380 1291
1381 1292 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1382 1293 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1383 1294 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1384 1295
1385 1296 if not new:
1386 1297 self._state = 'errored'
1387 1298 return self._makeerrorresult(
1388 1299 _('received command request frame without new flag set'))
1389 1300
1390 1301 payload = util.bytesio()
1391 1302 payload.write(frame.payload)
1392 1303
1393 1304 self._receivingcommands[frame.requestid] = {
1394 1305 'payload': payload,
1395 1306 'data': None,
1396 1307 'requestdone': not moreframes,
1397 1308 'expectingdata': bool(expectingdata),
1398 1309 }
1399 1310
1400 1311 # This is the final frame for this request. Dispatch it.
1401 1312 if not moreframes and not expectingdata:
1402 1313 return self._makeruncommandresult(frame.requestid)
1403 1314
1404 1315 assert moreframes or expectingdata
1405 1316 self._state = 'command-receiving'
1406 1317 return self._makewantframeresult()
1407 1318
1408 1319 def _onframecommandreceiving(self, frame):
1409 1320 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1410 1321 # Process new command requests as such.
1411 1322 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1412 1323 return self._onframeidle(frame)
1413 1324
1414 1325 res = self._validatecommandrequestframe(frame)
1415 1326 if res:
1416 1327 return res
1417 1328
1418 1329 # All other frames should be related to a command that is currently
1419 1330 # receiving but is not active.
1420 1331 if frame.requestid in self._activecommands:
1421 1332 self._state = 'errored'
1422 1333 return self._makeerrorresult(
1423 1334 _('received frame for request that is still active: %d') %
1424 1335 frame.requestid)
1425 1336
1426 1337 if frame.requestid not in self._receivingcommands:
1427 1338 self._state = 'errored'
1428 1339 return self._makeerrorresult(
1429 1340 _('received frame for request that is not receiving: %d') %
1430 1341 frame.requestid)
1431 1342
1432 1343 entry = self._receivingcommands[frame.requestid]
1433 1344
1434 1345 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1435 1346 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1436 1347 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1437 1348
1438 1349 if entry['requestdone']:
1439 1350 self._state = 'errored'
1440 1351 return self._makeerrorresult(
1441 1352 _('received command request frame when request frames '
1442 1353 'were supposedly done'))
1443 1354
1444 1355 if expectingdata != entry['expectingdata']:
1445 1356 self._state = 'errored'
1446 1357 return self._makeerrorresult(
1447 1358 _('mismatch between expect data flag and previous frame'))
1448 1359
1449 1360 entry['payload'].write(frame.payload)
1450 1361
1451 1362 if not moreframes:
1452 1363 entry['requestdone'] = True
1453 1364
1454 1365 if not moreframes and not expectingdata:
1455 1366 return self._makeruncommandresult(frame.requestid)
1456 1367
1457 1368 return self._makewantframeresult()
1458 1369
1459 1370 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1460 1371 if not entry['expectingdata']:
1461 1372 self._state = 'errored'
1462 1373 return self._makeerrorresult(_(
1463 1374 'received command data frame for request that is not '
1464 1375 'expecting data: %d') % frame.requestid)
1465 1376
1466 1377 if entry['data'] is None:
1467 1378 entry['data'] = util.bytesio()
1468 1379
1469 1380 return self._handlecommanddataframe(frame, entry)
1470 1381 else:
1471 1382 self._state = 'errored'
1472 1383 return self._makeerrorresult(_(
1473 1384 'received unexpected frame type: %d') % frame.typeid)
1474 1385
1475 1386 def _handlecommanddataframe(self, frame, entry):
1476 1387 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1477 1388
1478 1389 # TODO support streaming data instead of buffering it.
1479 1390 entry['data'].write(frame.payload)
1480 1391
1481 1392 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1482 1393 return self._makewantframeresult()
1483 1394 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1484 1395 entry['data'].seek(0)
1485 1396 return self._makeruncommandresult(frame.requestid)
1486 1397 else:
1487 1398 self._state = 'errored'
1488 1399 return self._makeerrorresult(_('command data frame without '
1489 1400 'flags'))
1490 1401
1491 1402 def _onframeerrored(self, frame):
1492 1403 return self._makeerrorresult(_('server already errored'))
1493 1404
1494 1405 class commandrequest(object):
1495 1406 """Represents a request to run a command."""
1496 1407
1497 1408 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1498 1409 self.requestid = requestid
1499 1410 self.name = name
1500 1411 self.args = args
1501 1412 self.datafh = datafh
1502 1413 self.redirect = redirect
1503 1414 self.state = 'pending'
1504 1415
1505 1416 class clientreactor(object):
1506 1417 """Holds state of a client issuing frame-based protocol requests.
1507 1418
1508 1419 This is like ``serverreactor`` but for client-side state.
1509 1420
1510 1421 Each instance is bound to the lifetime of a connection. For persistent
1511 1422 connection transports using e.g. TCP sockets and speaking the raw
1512 1423 framing protocol, there will be a single instance for the lifetime of
1513 1424 the TCP socket. For transports where there are multiple discrete
1514 1425 interactions (say tunneled within in HTTP request), there will be a
1515 1426 separate instance for each distinct interaction.
1516 1427
1517 1428 Consumers are expected to tell instances when events occur by calling
1518 1429 various methods. These methods return a 2-tuple describing any follow-up
1519 1430 action(s) to take. The first element is the name of an action to
1520 1431 perform. The second is a data structure (usually a dict) specific to
1521 1432 that action that contains more information. e.g. if the reactor wants
1522 1433 to send frames to the server, the data structure will contain a reference
1523 1434 to those frames.
1524 1435
1525 1436 Valid actions that consumers can be instructed to take are:
1526 1437
1527 1438 noop
1528 1439 Indicates no additional action is required.
1529 1440
1530 1441 sendframes
1531 1442 Indicates that frames should be sent to the server. The ``framegen``
1532 1443 key contains a generator of frames that should be sent. The reactor
1533 1444 assumes that all frames in this generator are sent to the server.
1534 1445
1535 1446 error
1536 1447 Indicates that an error occurred. The ``message`` key contains an
1537 1448 error message describing the failure.
1538 1449
1539 1450 responsedata
1540 1451 Indicates a response to a previously-issued command was received.
1541 1452
1542 1453 The ``request`` key contains the ``commandrequest`` instance that
1543 1454 represents the request this data is for.
1544 1455
1545 1456 The ``data`` key contains the decoded data from the server.
1546 1457
1547 1458 ``expectmore`` and ``eos`` evaluate to True when more response data
1548 1459 is expected to follow or we're at the end of the response stream,
1549 1460 respectively.
1550 1461 """
1551 1462 def __init__(self, ui, hasmultiplesend=False, buffersends=True,
1552 1463 clientcontentencoders=None):
1553 1464 """Create a new instance.
1554 1465
1555 1466 ``hasmultiplesend`` indicates whether multiple sends are supported
1556 1467 by the transport. When True, it is possible to send commands immediately
1557 1468 instead of buffering until the caller signals an intent to finish a
1558 1469 send operation.
1559 1470
1560 1471 ``buffercommands`` indicates whether sends should be buffered until the
1561 1472 last request has been issued.
1562 1473
1563 1474 ``clientcontentencoders`` is an iterable of content encoders the client
1564 1475 will advertise to the server and that the server can use for encoding
1565 1476 data. If not defined, the client will not advertise content encoders
1566 1477 to the server.
1567 1478 """
1568 1479 self._ui = ui
1569 1480 self._hasmultiplesend = hasmultiplesend
1570 1481 self._buffersends = buffersends
1571 1482 self._clientcontentencoders = clientcontentencoders
1572 1483
1573 1484 self._canissuecommands = True
1574 1485 self._cansend = True
1575 1486 self._protocolsettingssent = False
1576 1487
1577 1488 self._nextrequestid = 1
1578 1489 # We only support a single outgoing stream for now.
1579 1490 self._outgoingstream = outputstream(1)
1580 1491 self._pendingrequests = collections.deque()
1581 1492 self._activerequests = {}
1582 1493 self._incomingstreams = {}
1583 1494 self._streamsettingsdecoders = {}
1584 1495
1585 1496 populatestreamencoders()
1586 1497
1587 1498 def callcommand(self, name, args, datafh=None, redirect=None):
1588 1499 """Request that a command be executed.
1589 1500
1590 1501 Receives the command name, a dict of arguments to pass to the command,
1591 1502 and an optional file object containing the raw data for the command.
1592 1503
1593 1504 Returns a 3-tuple of (request, action, action data).
1594 1505 """
1595 1506 if not self._canissuecommands:
1596 1507 raise error.ProgrammingError('cannot issue new commands')
1597 1508
1598 1509 requestid = self._nextrequestid
1599 1510 self._nextrequestid += 2
1600 1511
1601 1512 request = commandrequest(requestid, name, args, datafh=datafh,
1602 1513 redirect=redirect)
1603 1514
1604 1515 if self._buffersends:
1605 1516 self._pendingrequests.append(request)
1606 1517 return request, 'noop', {}
1607 1518 else:
1608 1519 if not self._cansend:
1609 1520 raise error.ProgrammingError('sends cannot be performed on '
1610 1521 'this instance')
1611 1522
1612 1523 if not self._hasmultiplesend:
1613 1524 self._cansend = False
1614 1525 self._canissuecommands = False
1615 1526
1616 1527 return request, 'sendframes', {
1617 1528 'framegen': self._makecommandframes(request),
1618 1529 }
1619 1530
1620 1531 def flushcommands(self):
1621 1532 """Request that all queued commands be sent.
1622 1533
1623 1534 If any commands are buffered, this will instruct the caller to send
1624 1535 them over the wire. If no commands are buffered it instructs the client
1625 1536 to no-op.
1626 1537
1627 1538 If instances aren't configured for multiple sends, no new command
1628 1539 requests are allowed after this is called.
1629 1540 """
1630 1541 if not self._pendingrequests:
1631 1542 return 'noop', {}
1632 1543
1633 1544 if not self._cansend:
1634 1545 raise error.ProgrammingError('sends cannot be performed on this '
1635 1546 'instance')
1636 1547
1637 1548 # If the instance only allows sending once, mark that we have fired
1638 1549 # our one shot.
1639 1550 if not self._hasmultiplesend:
1640 1551 self._canissuecommands = False
1641 1552 self._cansend = False
1642 1553
1643 1554 def makeframes():
1644 1555 while self._pendingrequests:
1645 1556 request = self._pendingrequests.popleft()
1646 1557 for frame in self._makecommandframes(request):
1647 1558 yield frame
1648 1559
1649 1560 return 'sendframes', {
1650 1561 'framegen': makeframes(),
1651 1562 }
1652 1563
1653 1564 def _makecommandframes(self, request):
1654 1565 """Emit frames to issue a command request.
1655 1566
1656 1567 As a side-effect, update request accounting to reflect its changed
1657 1568 state.
1658 1569 """
1659 1570 self._activerequests[request.requestid] = request
1660 1571 request.state = 'sending'
1661 1572
1662 1573 if not self._protocolsettingssent and self._clientcontentencoders:
1663 1574 self._protocolsettingssent = True
1664 1575
1665 1576 payload = b''.join(cborutil.streamencode({
1666 1577 b'contentencodings': self._clientcontentencoders,
1667 1578 }))
1668 1579
1669 1580 yield self._outgoingstream.makeframe(
1670 1581 requestid=request.requestid,
1671 1582 typeid=FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
1672 1583 flags=FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
1673 1584 payload=payload)
1674 1585
1675 1586 res = createcommandframes(self._outgoingstream,
1676 1587 request.requestid,
1677 1588 request.name,
1678 1589 request.args,
1679 1590 datafh=request.datafh,
1680 1591 redirect=request.redirect)
1681 1592
1682 1593 for frame in res:
1683 1594 yield frame
1684 1595
1685 1596 request.state = 'sent'
1686 1597
1687 1598 def onframerecv(self, frame):
1688 1599 """Process a frame that has been received off the wire.
1689 1600
1690 1601 Returns a 2-tuple of (action, meta) describing further action the
1691 1602 caller needs to take as a result of receiving this frame.
1692 1603 """
1693 1604 if frame.streamid % 2:
1694 1605 return 'error', {
1695 1606 'message': (
1696 1607 _('received frame with odd numbered stream ID: %d') %
1697 1608 frame.streamid),
1698 1609 }
1699 1610
1700 1611 if frame.streamid not in self._incomingstreams:
1701 1612 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1702 1613 return 'error', {
1703 1614 'message': _('received frame on unknown stream '
1704 1615 'without beginning of stream flag set'),
1705 1616 }
1706 1617
1707 1618 self._incomingstreams[frame.streamid] = inputstream(
1708 1619 frame.streamid)
1709 1620
1710 1621 stream = self._incomingstreams[frame.streamid]
1711 1622
1712 1623 # If the payload is encoded, ask the stream to decode it. We
1713 1624 # merely substitute the decoded result into the frame payload as
1714 1625 # if it had been transferred all along.
1715 1626 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1716 1627 frame.payload = stream.decode(frame.payload)
1717 1628
1718 1629 if frame.streamflags & STREAM_FLAG_END_STREAM:
1719 1630 del self._incomingstreams[frame.streamid]
1720 1631
1721 1632 if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
1722 1633 return self._onstreamsettingsframe(frame)
1723 1634
1724 1635 if frame.requestid not in self._activerequests:
1725 1636 return 'error', {
1726 1637 'message': (_('received frame for inactive request ID: %d') %
1727 1638 frame.requestid),
1728 1639 }
1729 1640
1730 1641 request = self._activerequests[frame.requestid]
1731 1642 request.state = 'receiving'
1732 1643
1733 1644 handlers = {
1734 1645 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1735 1646 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1736 1647 }
1737 1648
1738 1649 meth = handlers.get(frame.typeid)
1739 1650 if not meth:
1740 1651 raise error.ProgrammingError('unhandled frame type: %d' %
1741 1652 frame.typeid)
1742 1653
1743 1654 return meth(request, frame)
1744 1655
1745 1656 def _onstreamsettingsframe(self, frame):
1746 1657 assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS
1747 1658
1748 1659 more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
1749 1660 eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
1750 1661
1751 1662 if more and eos:
1752 1663 return 'error', {
1753 1664 'message': (_('stream encoding settings frame cannot have both '
1754 1665 'continuation and end of stream flags set')),
1755 1666 }
1756 1667
1757 1668 if not more and not eos:
1758 1669 return 'error', {
1759 1670 'message': _('stream encoding settings frame must have '
1760 1671 'continuation or end of stream flag set'),
1761 1672 }
1762 1673
1763 1674 if frame.streamid not in self._streamsettingsdecoders:
1764 1675 decoder = cborutil.bufferingdecoder()
1765 1676 self._streamsettingsdecoders[frame.streamid] = decoder
1766 1677
1767 1678 decoder = self._streamsettingsdecoders[frame.streamid]
1768 1679
1769 1680 try:
1770 1681 decoder.decode(frame.payload)
1771 1682 except Exception as e:
1772 1683 return 'error', {
1773 1684 'message': (_('error decoding CBOR from stream encoding '
1774 1685 'settings frame: %s') %
1775 1686 stringutil.forcebytestr(e)),
1776 1687 }
1777 1688
1778 1689 if more:
1779 1690 return 'noop', {}
1780 1691
1781 1692 assert eos
1782 1693
1783 1694 decoded = decoder.getavailable()
1784 1695 del self._streamsettingsdecoders[frame.streamid]
1785 1696
1786 1697 if not decoded:
1787 1698 return 'error', {
1788 1699 'message': _('stream encoding settings frame did not contain '
1789 1700 'CBOR data'),
1790 1701 }
1791 1702
1792 1703 try:
1793 1704 self._incomingstreams[frame.streamid].setdecoder(self._ui,
1794 1705 decoded[0],
1795 1706 decoded[1:])
1796 1707 except Exception as e:
1797 1708 return 'error', {
1798 1709 'message': (_('error setting stream decoder: %s') %
1799 1710 stringutil.forcebytestr(e)),
1800 1711 }
1801 1712
1802 1713 return 'noop', {}
1803 1714
1804 1715 def _oncommandresponseframe(self, request, frame):
1805 1716 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1806 1717 request.state = 'received'
1807 1718 del self._activerequests[request.requestid]
1808 1719
1809 1720 return 'responsedata', {
1810 1721 'request': request,
1811 1722 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1812 1723 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1813 1724 'data': frame.payload,
1814 1725 }
1815 1726
1816 1727 def _onerrorresponseframe(self, request, frame):
1817 1728 request.state = 'errored'
1818 1729 del self._activerequests[request.requestid]
1819 1730
1820 1731 # The payload should be a CBOR map.
1821 1732 m = cborutil.decodeall(frame.payload)[0]
1822 1733
1823 1734 return 'error', {
1824 1735 'request': request,
1825 1736 'type': m['type'],
1826 1737 'message': m['message'],
1827 1738 }
@@ -1,604 +1,629
1 1 from __future__ import absolute_import, print_function
2 2
3 3 import unittest
4 4
5 5 from mercurial.thirdparty import (
6 6 cbor,
7 7 )
8 8 from mercurial import (
9 9 ui as uimod,
10 10 util,
11 11 wireprotoframing as framing,
12 12 )
13 13 from mercurial.utils import (
14 14 cborutil,
15 15 )
16 16
17 17 ffs = framing.makeframefromhumanstring
18 18
19 19 OK = cbor.dumps({b'status': b'ok'})
20 20
21 21 def makereactor(deferoutput=False):
22 22 ui = uimod.ui()
23 23 return framing.serverreactor(ui, deferoutput=deferoutput)
24 24
25 25 def sendframes(reactor, gen):
26 26 """Send a generator of frame bytearray to a reactor.
27 27
28 28 Emits a generator of results from ``onframerecv()`` calls.
29 29 """
30 30 for frame in gen:
31 31 header = framing.parseheader(frame)
32 32 payload = frame[framing.FRAME_HEADER_SIZE:]
33 33 assert len(payload) == header.length
34 34
35 35 yield reactor.onframerecv(framing.frame(header.requestid,
36 36 header.streamid,
37 37 header.streamflags,
38 38 header.typeid,
39 39 header.flags,
40 40 payload))
41 41
42 42 def sendcommandframes(reactor, stream, rid, cmd, args, datafh=None):
43 43 """Generate frames to run a command and send them to a reactor."""
44 44 return sendframes(reactor,
45 45 framing.createcommandframes(stream, rid, cmd, args,
46 46 datafh))
47 47
48 48
49 49 class ServerReactorTests(unittest.TestCase):
50 50 def _sendsingleframe(self, reactor, f):
51 51 results = list(sendframes(reactor, [f]))
52 52 self.assertEqual(len(results), 1)
53 53
54 54 return results[0]
55 55
56 56 def assertaction(self, res, expected):
57 57 self.assertIsInstance(res, tuple)
58 58 self.assertEqual(len(res), 2)
59 59 self.assertIsInstance(res[1], dict)
60 60 self.assertEqual(res[0], expected)
61 61
62 62 def assertframesequal(self, frames, framestrings):
63 63 expected = [ffs(s) for s in framestrings]
64 64 self.assertEqual(list(frames), expected)
65 65
66 66 def test1framecommand(self):
67 67 """Receiving a command in a single frame yields request to run it."""
68 68 reactor = makereactor()
69 69 stream = framing.stream(1)
70 70 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
71 71 self.assertEqual(len(results), 1)
72 72 self.assertaction(results[0], b'runcommand')
73 73 self.assertEqual(results[0][1], {
74 74 b'requestid': 1,
75 75 b'command': b'mycommand',
76 76 b'args': {},
77 77 b'redirect': None,
78 78 b'data': None,
79 79 })
80 80
81 81 result = reactor.oninputeof()
82 82 self.assertaction(result, b'noop')
83 83
84 84 def test1argument(self):
85 85 reactor = makereactor()
86 86 stream = framing.stream(1)
87 87 results = list(sendcommandframes(reactor, stream, 41, b'mycommand',
88 88 {b'foo': b'bar'}))
89 89 self.assertEqual(len(results), 1)
90 90 self.assertaction(results[0], b'runcommand')
91 91 self.assertEqual(results[0][1], {
92 92 b'requestid': 41,
93 93 b'command': b'mycommand',
94 94 b'args': {b'foo': b'bar'},
95 95 b'redirect': None,
96 96 b'data': None,
97 97 })
98 98
99 99 def testmultiarguments(self):
100 100 reactor = makereactor()
101 101 stream = framing.stream(1)
102 102 results = list(sendcommandframes(reactor, stream, 1, b'mycommand',
103 103 {b'foo': b'bar', b'biz': b'baz'}))
104 104 self.assertEqual(len(results), 1)
105 105 self.assertaction(results[0], b'runcommand')
106 106 self.assertEqual(results[0][1], {
107 107 b'requestid': 1,
108 108 b'command': b'mycommand',
109 109 b'args': {b'foo': b'bar', b'biz': b'baz'},
110 110 b'redirect': None,
111 111 b'data': None,
112 112 })
113 113
114 114 def testsimplecommanddata(self):
115 115 reactor = makereactor()
116 116 stream = framing.stream(1)
117 117 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {},
118 118 util.bytesio(b'data!')))
119 119 self.assertEqual(len(results), 2)
120 120 self.assertaction(results[0], b'wantframe')
121 121 self.assertaction(results[1], b'runcommand')
122 122 self.assertEqual(results[1][1], {
123 123 b'requestid': 1,
124 124 b'command': b'mycommand',
125 125 b'args': {},
126 126 b'redirect': None,
127 127 b'data': b'data!',
128 128 })
129 129
130 130 def testmultipledataframes(self):
131 131 frames = [
132 132 ffs(b'1 1 stream-begin command-request new|have-data '
133 133 b"cbor:{b'name': b'mycommand'}"),
134 134 ffs(b'1 1 0 command-data continuation data1'),
135 135 ffs(b'1 1 0 command-data continuation data2'),
136 136 ffs(b'1 1 0 command-data eos data3'),
137 137 ]
138 138
139 139 reactor = makereactor()
140 140 results = list(sendframes(reactor, frames))
141 141 self.assertEqual(len(results), 4)
142 142 for i in range(3):
143 143 self.assertaction(results[i], b'wantframe')
144 144 self.assertaction(results[3], b'runcommand')
145 145 self.assertEqual(results[3][1], {
146 146 b'requestid': 1,
147 147 b'command': b'mycommand',
148 148 b'args': {},
149 149 b'redirect': None,
150 150 b'data': b'data1data2data3',
151 151 })
152 152
153 153 def testargumentanddata(self):
154 154 frames = [
155 155 ffs(b'1 1 stream-begin command-request new|have-data '
156 156 b"cbor:{b'name': b'command', b'args': {b'key': b'val',"
157 157 b"b'foo': b'bar'}}"),
158 158 ffs(b'1 1 0 command-data continuation value1'),
159 159 ffs(b'1 1 0 command-data eos value2'),
160 160 ]
161 161
162 162 reactor = makereactor()
163 163 results = list(sendframes(reactor, frames))
164 164
165 165 self.assertaction(results[-1], b'runcommand')
166 166 self.assertEqual(results[-1][1], {
167 167 b'requestid': 1,
168 168 b'command': b'command',
169 169 b'args': {
170 170 b'key': b'val',
171 171 b'foo': b'bar',
172 172 },
173 173 b'redirect': None,
174 174 b'data': b'value1value2',
175 175 })
176 176
177 177 def testnewandcontinuation(self):
178 178 result = self._sendsingleframe(makereactor(),
179 179 ffs(b'1 1 stream-begin command-request new|continuation '))
180 180 self.assertaction(result, b'error')
181 181 self.assertEqual(result[1], {
182 182 b'message': b'received command request frame with both new and '
183 183 b'continuation flags set',
184 184 })
185 185
186 186 def testneithernewnorcontinuation(self):
187 187 result = self._sendsingleframe(makereactor(),
188 188 ffs(b'1 1 stream-begin command-request 0 '))
189 189 self.assertaction(result, b'error')
190 190 self.assertEqual(result[1], {
191 191 b'message': b'received command request frame with neither new nor '
192 192 b'continuation flags set',
193 193 })
194 194
195 195 def testunexpectedcommanddata(self):
196 196 """Command data frame when not running a command is an error."""
197 197 result = self._sendsingleframe(makereactor(),
198 198 ffs(b'1 1 stream-begin command-data 0 ignored'))
199 199 self.assertaction(result, b'error')
200 200 self.assertEqual(result[1], {
201 201 b'message': b'expected sender protocol settings or command request '
202 202 b'frame; got 2',
203 203 })
204 204
205 205 def testunexpectedcommanddatareceiving(self):
206 206 """Same as above except the command is receiving."""
207 207 results = list(sendframes(makereactor(), [
208 208 ffs(b'1 1 stream-begin command-request new|more '
209 209 b"cbor:{b'name': b'ignored'}"),
210 210 ffs(b'1 1 0 command-data eos ignored'),
211 211 ]))
212 212
213 213 self.assertaction(results[0], b'wantframe')
214 214 self.assertaction(results[1], b'error')
215 215 self.assertEqual(results[1][1], {
216 216 b'message': b'received command data frame for request that is not '
217 217 b'expecting data: 1',
218 218 })
219 219
220 220 def testconflictingrequestidallowed(self):
221 221 """Multiple fully serviced commands with same request ID is allowed."""
222 222 reactor = makereactor()
223 223 results = []
224 224 outstream = reactor.makeoutputstream()
225 225 results.append(self._sendsingleframe(
226 226 reactor, ffs(b'1 1 stream-begin command-request new '
227 227 b"cbor:{b'name': b'command'}")))
228 result = reactor.oncommandresponseready(outstream, 1, b'response1')
228 result = reactor.oncommandresponsereadyobjects(
229 outstream, 1, [b'response1'])
229 230 self.assertaction(result, b'sendframes')
230 231 list(result[1][b'framegen'])
231 232 results.append(self._sendsingleframe(
232 233 reactor, ffs(b'1 1 stream-begin command-request new '
233 234 b"cbor:{b'name': b'command'}")))
234 result = reactor.oncommandresponseready(outstream, 1, b'response2')
235 result = reactor.oncommandresponsereadyobjects(
236 outstream, 1, [b'response2'])
235 237 self.assertaction(result, b'sendframes')
236 238 list(result[1][b'framegen'])
237 239 results.append(self._sendsingleframe(
238 240 reactor, ffs(b'1 1 stream-begin command-request new '
239 241 b"cbor:{b'name': b'command'}")))
240 result = reactor.oncommandresponseready(outstream, 1, b'response3')
242 result = reactor.oncommandresponsereadyobjects(
243 outstream, 1, [b'response3'])
241 244 self.assertaction(result, b'sendframes')
242 245 list(result[1][b'framegen'])
243 246
244 247 for i in range(3):
245 248 self.assertaction(results[i], b'runcommand')
246 249 self.assertEqual(results[i][1], {
247 250 b'requestid': 1,
248 251 b'command': b'command',
249 252 b'args': {},
250 253 b'redirect': None,
251 254 b'data': None,
252 255 })
253 256
254 257 def testconflictingrequestid(self):
255 258 """Request ID for new command matching in-flight command is illegal."""
256 259 results = list(sendframes(makereactor(), [
257 260 ffs(b'1 1 stream-begin command-request new|more '
258 261 b"cbor:{b'name': b'command'}"),
259 262 ffs(b'1 1 0 command-request new '
260 263 b"cbor:{b'name': b'command1'}"),
261 264 ]))
262 265
263 266 self.assertaction(results[0], b'wantframe')
264 267 self.assertaction(results[1], b'error')
265 268 self.assertEqual(results[1][1], {
266 269 b'message': b'request with ID 1 already received',
267 270 })
268 271
269 272 def testinterleavedcommands(self):
270 273 cbor1 = cbor.dumps({
271 274 b'name': b'command1',
272 275 b'args': {
273 276 b'foo': b'bar',
274 277 b'key1': b'val',
275 278 }
276 279 }, canonical=True)
277 280 cbor3 = cbor.dumps({
278 281 b'name': b'command3',
279 282 b'args': {
280 283 b'biz': b'baz',
281 284 b'key': b'val',
282 285 },
283 286 }, canonical=True)
284 287
285 288 results = list(sendframes(makereactor(), [
286 289 ffs(b'1 1 stream-begin command-request new|more %s' % cbor1[0:6]),
287 290 ffs(b'3 1 0 command-request new|more %s' % cbor3[0:10]),
288 291 ffs(b'1 1 0 command-request continuation|more %s' % cbor1[6:9]),
289 292 ffs(b'3 1 0 command-request continuation|more %s' % cbor3[10:13]),
290 293 ffs(b'3 1 0 command-request continuation %s' % cbor3[13:]),
291 294 ffs(b'1 1 0 command-request continuation %s' % cbor1[9:]),
292 295 ]))
293 296
294 297 self.assertEqual([t[0] for t in results], [
295 298 b'wantframe',
296 299 b'wantframe',
297 300 b'wantframe',
298 301 b'wantframe',
299 302 b'runcommand',
300 303 b'runcommand',
301 304 ])
302 305
303 306 self.assertEqual(results[4][1], {
304 307 b'requestid': 3,
305 308 b'command': b'command3',
306 309 b'args': {b'biz': b'baz', b'key': b'val'},
307 310 b'redirect': None,
308 311 b'data': None,
309 312 })
310 313 self.assertEqual(results[5][1], {
311 314 b'requestid': 1,
312 315 b'command': b'command1',
313 316 b'args': {b'foo': b'bar', b'key1': b'val'},
314 317 b'redirect': None,
315 318 b'data': None,
316 319 })
317 320
318 321 def testmissingcommanddataframe(self):
319 322 # The reactor doesn't currently handle partially received commands.
320 323 # So this test is failing to do anything with request 1.
321 324 frames = [
322 325 ffs(b'1 1 stream-begin command-request new|have-data '
323 326 b"cbor:{b'name': b'command1'}"),
324 327 ffs(b'3 1 0 command-request new '
325 328 b"cbor:{b'name': b'command2'}"),
326 329 ]
327 330 results = list(sendframes(makereactor(), frames))
328 331 self.assertEqual(len(results), 2)
329 332 self.assertaction(results[0], b'wantframe')
330 333 self.assertaction(results[1], b'runcommand')
331 334
332 335 def testmissingcommanddataframeflags(self):
333 336 frames = [
334 337 ffs(b'1 1 stream-begin command-request new|have-data '
335 338 b"cbor:{b'name': b'command1'}"),
336 339 ffs(b'1 1 0 command-data 0 data'),
337 340 ]
338 341 results = list(sendframes(makereactor(), frames))
339 342 self.assertEqual(len(results), 2)
340 343 self.assertaction(results[0], b'wantframe')
341 344 self.assertaction(results[1], b'error')
342 345 self.assertEqual(results[1][1], {
343 346 b'message': b'command data frame without flags',
344 347 })
345 348
346 349 def testframefornonreceivingrequest(self):
347 350 """Receiving a frame for a command that is not receiving is illegal."""
348 351 results = list(sendframes(makereactor(), [
349 352 ffs(b'1 1 stream-begin command-request new '
350 353 b"cbor:{b'name': b'command1'}"),
351 354 ffs(b'3 1 0 command-request new|have-data '
352 355 b"cbor:{b'name': b'command3'}"),
353 356 ffs(b'5 1 0 command-data eos ignored'),
354 357 ]))
355 358 self.assertaction(results[2], b'error')
356 359 self.assertEqual(results[2][1], {
357 360 b'message': b'received frame for request that is not receiving: 5',
358 361 })
359 362
360 363 def testsimpleresponse(self):
361 364 """Bytes response to command sends result frames."""
362 365 reactor = makereactor()
363 366 instream = framing.stream(1)
364 367 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
365 368
366 369 outstream = reactor.makeoutputstream()
367 result = reactor.oncommandresponseready(outstream, 1, b'response')
370 result = reactor.oncommandresponsereadyobjects(
371 outstream, 1, [b'response'])
368 372 self.assertaction(result, b'sendframes')
369 373 self.assertframesequal(result[1][b'framegen'], [
370 b'1 2 stream-begin command-response eos %sresponse' % OK,
374 b'1 2 stream-begin command-response continuation %s' % OK,
375 b'1 2 0 command-response continuation cbor:b"response"',
376 b'1 2 0 command-response eos ',
371 377 ])
372 378
373 379 def testmultiframeresponse(self):
374 380 """Bytes response spanning multiple frames is handled."""
375 381 first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
376 382 second = b'y' * 100
377 383
378 384 reactor = makereactor()
379 385 instream = framing.stream(1)
380 386 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
381 387
382 388 outstream = reactor.makeoutputstream()
383 result = reactor.oncommandresponseready(outstream, 1, first + second)
389 result = reactor.oncommandresponsereadyobjects(
390 outstream, 1, [first + second])
384 391 self.assertaction(result, b'sendframes')
385 392 self.assertframesequal(result[1][b'framegen'], [
386 393 b'1 2 stream-begin command-response continuation %s' % OK,
394 b'1 2 0 command-response continuation Y\x80d',
387 395 b'1 2 0 command-response continuation %s' % first,
388 b'1 2 0 command-response eos %s' % second,
396 b'1 2 0 command-response continuation %s' % second,
397 b'1 2 0 command-response continuation ',
398 b'1 2 0 command-response eos '
389 399 ])
390 400
391 401 def testservererror(self):
392 402 reactor = makereactor()
393 403 instream = framing.stream(1)
394 404 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
395 405
396 406 outstream = reactor.makeoutputstream()
397 407 result = reactor.onservererror(outstream, 1, b'some message')
398 408 self.assertaction(result, b'sendframes')
399 409 self.assertframesequal(result[1][b'framegen'], [
400 410 b"1 2 stream-begin error-response 0 "
401 411 b"cbor:{b'type': b'server', "
402 412 b"b'message': [{b'msg': b'some message'}]}",
403 413 ])
404 414
405 415 def test1commanddeferresponse(self):
406 416 """Responses when in deferred output mode are delayed until EOF."""
407 417 reactor = makereactor(deferoutput=True)
408 418 instream = framing.stream(1)
409 419 results = list(sendcommandframes(reactor, instream, 1, b'mycommand',
410 420 {}))
411 421 self.assertEqual(len(results), 1)
412 422 self.assertaction(results[0], b'runcommand')
413 423
414 424 outstream = reactor.makeoutputstream()
415 result = reactor.oncommandresponseready(outstream, 1, b'response')
425 result = reactor.oncommandresponsereadyobjects(
426 outstream, 1, [b'response'])
416 427 self.assertaction(result, b'noop')
417 428 result = reactor.oninputeof()
418 429 self.assertaction(result, b'sendframes')
419 430 self.assertframesequal(result[1][b'framegen'], [
420 b'1 2 stream-begin command-response eos %sresponse' % OK,
431 b'1 2 stream-begin command-response continuation %s' % OK,
432 b'1 2 0 command-response continuation cbor:b"response"',
433 b'1 2 0 command-response eos ',
421 434 ])
422 435
423 436 def testmultiplecommanddeferresponse(self):
424 437 reactor = makereactor(deferoutput=True)
425 438 instream = framing.stream(1)
426 439 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
427 440 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
428 441
429 442 outstream = reactor.makeoutputstream()
430 result = reactor.oncommandresponseready(outstream, 1, b'response1')
443 result = reactor.oncommandresponsereadyobjects(
444 outstream, 1, [b'response1'])
431 445 self.assertaction(result, b'noop')
432 result = reactor.oncommandresponseready(outstream, 3, b'response2')
446 result = reactor.oncommandresponsereadyobjects(
447 outstream, 3, [b'response2'])
433 448 self.assertaction(result, b'noop')
434 449 result = reactor.oninputeof()
435 450 self.assertaction(result, b'sendframes')
436 451 self.assertframesequal(result[1][b'framegen'], [
437 b'1 2 stream-begin command-response eos %sresponse1' % OK,
438 b'3 2 0 command-response eos %sresponse2' % OK,
452 b'1 2 stream-begin command-response continuation %s' % OK,
453 b'1 2 0 command-response continuation cbor:b"response1"',
454 b'1 2 0 command-response eos ',
455 b'3 2 0 command-response continuation %s' % OK,
456 b'3 2 0 command-response continuation cbor:b"response2"',
457 b'3 2 0 command-response eos ',
439 458 ])
440 459
441 460 def testrequestidtracking(self):
442 461 reactor = makereactor(deferoutput=True)
443 462 instream = framing.stream(1)
444 463 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
445 464 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
446 465 list(sendcommandframes(reactor, instream, 5, b'command3', {}))
447 466
448 467 # Register results for commands out of order.
449 468 outstream = reactor.makeoutputstream()
450 reactor.oncommandresponseready(outstream, 3, b'response3')
451 reactor.oncommandresponseready(outstream, 1, b'response1')
452 reactor.oncommandresponseready(outstream, 5, b'response5')
469 reactor.oncommandresponsereadyobjects(outstream, 3, [b'response3'])
470 reactor.oncommandresponsereadyobjects(outstream, 1, [b'response1'])
471 reactor.oncommandresponsereadyobjects(outstream, 5, [b'response5'])
453 472
454 473 result = reactor.oninputeof()
455 474 self.assertaction(result, b'sendframes')
456 475 self.assertframesequal(result[1][b'framegen'], [
457 b'3 2 stream-begin command-response eos %sresponse3' % OK,
458 b'1 2 0 command-response eos %sresponse1' % OK,
459 b'5 2 0 command-response eos %sresponse5' % OK,
476 b'3 2 stream-begin command-response continuation %s' % OK,
477 b'3 2 0 command-response continuation cbor:b"response3"',
478 b'3 2 0 command-response eos ',
479 b'1 2 0 command-response continuation %s' % OK,
480 b'1 2 0 command-response continuation cbor:b"response1"',
481 b'1 2 0 command-response eos ',
482 b'5 2 0 command-response continuation %s' % OK,
483 b'5 2 0 command-response continuation cbor:b"response5"',
484 b'5 2 0 command-response eos ',
460 485 ])
461 486
462 487 def testduplicaterequestonactivecommand(self):
463 488 """Receiving a request ID that matches a request that isn't finished."""
464 489 reactor = makereactor()
465 490 stream = framing.stream(1)
466 491 list(sendcommandframes(reactor, stream, 1, b'command1', {}))
467 492 results = list(sendcommandframes(reactor, stream, 1, b'command1', {}))
468 493
469 494 self.assertaction(results[0], b'error')
470 495 self.assertEqual(results[0][1], {
471 496 b'message': b'request with ID 1 is already active',
472 497 })
473 498
474 499 def testduplicaterequestonactivecommandnosend(self):
475 500 """Same as above but we've registered a response but haven't sent it."""
476 501 reactor = makereactor()
477 502 instream = framing.stream(1)
478 503 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
479 504 outstream = reactor.makeoutputstream()
480 reactor.oncommandresponseready(outstream, 1, b'response')
505 reactor.oncommandresponsereadyobjects(outstream, 1, [b'response'])
481 506
482 507 # We've registered the response but haven't sent it. From the
483 508 # perspective of the reactor, the command is still active.
484 509
485 510 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
486 511 self.assertaction(results[0], b'error')
487 512 self.assertEqual(results[0][1], {
488 513 b'message': b'request with ID 1 is already active',
489 514 })
490 515
491 516 def testduplicaterequestaftersend(self):
492 517 """We can use a duplicate request ID after we've sent the response."""
493 518 reactor = makereactor()
494 519 instream = framing.stream(1)
495 520 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
496 521 outstream = reactor.makeoutputstream()
497 res = reactor.oncommandresponseready(outstream, 1, b'response')
522 res = reactor.oncommandresponsereadyobjects(outstream, 1, [b'response'])
498 523 list(res[1][b'framegen'])
499 524
500 525 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
501 526 self.assertaction(results[0], b'runcommand')
502 527
503 528 def testprotocolsettingsnoflags(self):
504 529 result = self._sendsingleframe(
505 530 makereactor(),
506 531 ffs(b'0 1 stream-begin sender-protocol-settings 0 '))
507 532 self.assertaction(result, b'error')
508 533 self.assertEqual(result[1], {
509 534 b'message': b'sender protocol settings frame must have '
510 535 b'continuation or end of stream flag set',
511 536 })
512 537
513 538 def testprotocolsettingsconflictflags(self):
514 539 result = self._sendsingleframe(
515 540 makereactor(),
516 541 ffs(b'0 1 stream-begin sender-protocol-settings continuation|eos '))
517 542 self.assertaction(result, b'error')
518 543 self.assertEqual(result[1], {
519 544 b'message': b'sender protocol settings frame cannot have both '
520 545 b'continuation and end of stream flags set',
521 546 })
522 547
523 548 def testprotocolsettingsemptypayload(self):
524 549 result = self._sendsingleframe(
525 550 makereactor(),
526 551 ffs(b'0 1 stream-begin sender-protocol-settings eos '))
527 552 self.assertaction(result, b'error')
528 553 self.assertEqual(result[1], {
529 554 b'message': b'sender protocol settings frame did not contain CBOR '
530 555 b'data',
531 556 })
532 557
533 558 def testprotocolsettingsmultipleobjects(self):
534 559 result = self._sendsingleframe(
535 560 makereactor(),
536 561 ffs(b'0 1 stream-begin sender-protocol-settings eos '
537 562 b'\x46foobar\x43foo'))
538 563 self.assertaction(result, b'error')
539 564 self.assertEqual(result[1], {
540 565 b'message': b'sender protocol settings frame contained multiple '
541 566 b'CBOR values',
542 567 })
543 568
544 569 def testprotocolsettingscontentencodings(self):
545 570 reactor = makereactor()
546 571
547 572 result = self._sendsingleframe(
548 573 reactor,
549 574 ffs(b'0 1 stream-begin sender-protocol-settings eos '
550 575 b'cbor:{b"contentencodings": [b"a", b"b"]}'))
551 576 self.assertaction(result, b'wantframe')
552 577
553 578 self.assertEqual(reactor._state, b'idle')
554 579 self.assertEqual(reactor._sendersettings[b'contentencodings'],
555 580 [b'a', b'b'])
556 581
557 582 def testprotocolsettingsmultipleframes(self):
558 583 reactor = makereactor()
559 584
560 585 data = b''.join(cborutil.streamencode({
561 586 b'contentencodings': [b'value1', b'value2'],
562 587 }))
563 588
564 589 results = list(sendframes(reactor, [
565 590 ffs(b'0 1 stream-begin sender-protocol-settings continuation %s' %
566 591 data[0:5]),
567 592 ffs(b'0 1 0 sender-protocol-settings eos %s' % data[5:]),
568 593 ]))
569 594
570 595 self.assertEqual(len(results), 2)
571 596
572 597 self.assertaction(results[0], b'wantframe')
573 598 self.assertaction(results[1], b'wantframe')
574 599
575 600 self.assertEqual(reactor._state, b'idle')
576 601 self.assertEqual(reactor._sendersettings[b'contentencodings'],
577 602 [b'value1', b'value2'])
578 603
579 604 def testprotocolsettingsbadcbor(self):
580 605 result = self._sendsingleframe(
581 606 makereactor(),
582 607 ffs(b'0 1 stream-begin sender-protocol-settings eos badvalue'))
583 608 self.assertaction(result, b'error')
584 609
585 610 def testprotocolsettingsnoninitial(self):
586 611 # Cannot have protocol settings frames as non-initial frames.
587 612 reactor = makereactor()
588 613
589 614 stream = framing.stream(1)
590 615 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
591 616 self.assertEqual(len(results), 1)
592 617 self.assertaction(results[0], b'runcommand')
593 618
594 619 result = self._sendsingleframe(
595 620 reactor,
596 621 ffs(b'0 1 0 sender-protocol-settings eos '))
597 622 self.assertaction(result, b'error')
598 623 self.assertEqual(result[1], {
599 624 b'message': b'expected command request frame; got 8',
600 625 })
601 626
602 627 if __name__ == '__main__':
603 628 import silenttestrunner
604 629 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now