##// END OF EJS Templates
wireprotov2: handle sender protocol settings frames...
Gregory Szorc -
r40162:327d40b9 default
parent child Browse files
Show More
@@ -1,1407 +1,1497 b''
1 1 # wireprotoframing.py - unified framing protocol for wire protocol
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 # This file contains functionality to support the unified frame-based wire
9 9 # protocol. For details about the protocol, see
10 10 # `hg help internals.wireprotocol`.
11 11
12 12 from __future__ import absolute_import
13 13
14 14 import collections
15 15 import struct
16 16
17 17 from .i18n import _
18 18 from .thirdparty import (
19 19 attr,
20 20 )
21 21 from . import (
22 22 encoding,
23 23 error,
24 24 pycompat,
25 25 util,
26 26 wireprototypes,
27 27 )
28 28 from .utils import (
29 29 cborutil,
30 30 stringutil,
31 31 )
32 32
33 33 FRAME_HEADER_SIZE = 8
34 34 DEFAULT_MAX_FRAME_SIZE = 32768
35 35
36 36 STREAM_FLAG_BEGIN_STREAM = 0x01
37 37 STREAM_FLAG_END_STREAM = 0x02
38 38 STREAM_FLAG_ENCODING_APPLIED = 0x04
39 39
40 40 STREAM_FLAGS = {
41 41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
42 42 b'stream-end': STREAM_FLAG_END_STREAM,
43 43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
44 44 }
45 45
46 46 FRAME_TYPE_COMMAND_REQUEST = 0x01
47 47 FRAME_TYPE_COMMAND_DATA = 0x02
48 48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
49 49 FRAME_TYPE_ERROR_RESPONSE = 0x05
50 50 FRAME_TYPE_TEXT_OUTPUT = 0x06
51 51 FRAME_TYPE_PROGRESS = 0x07
52 52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
53 53 FRAME_TYPE_STREAM_SETTINGS = 0x09
54 54
55 55 FRAME_TYPES = {
56 56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
57 57 b'command-data': FRAME_TYPE_COMMAND_DATA,
58 58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
59 59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
60 60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
61 61 b'progress': FRAME_TYPE_PROGRESS,
62 62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
63 63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
64 64 }
65 65
66 66 FLAG_COMMAND_REQUEST_NEW = 0x01
67 67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
68 68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
69 69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
70 70
71 71 FLAGS_COMMAND_REQUEST = {
72 72 b'new': FLAG_COMMAND_REQUEST_NEW,
73 73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
74 74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
75 75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
76 76 }
77 77
78 78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
79 79 FLAG_COMMAND_DATA_EOS = 0x02
80 80
81 81 FLAGS_COMMAND_DATA = {
82 82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
83 83 b'eos': FLAG_COMMAND_DATA_EOS,
84 84 }
85 85
86 86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
87 87 FLAG_COMMAND_RESPONSE_EOS = 0x02
88 88
89 89 FLAGS_COMMAND_RESPONSE = {
90 90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
91 91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
92 92 }
93 93
94 94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
95 95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
96 96
97 97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
98 98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
99 99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
100 100 }
101 101
102 102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
103 103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
104 104
105 105 FLAGS_STREAM_ENCODING_SETTINGS = {
106 106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
107 107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
108 108 }
109 109
110 110 # Maps frame types to their available flags.
111 111 FRAME_TYPE_FLAGS = {
112 112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
113 113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
114 114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
115 115 FRAME_TYPE_ERROR_RESPONSE: {},
116 116 FRAME_TYPE_TEXT_OUTPUT: {},
117 117 FRAME_TYPE_PROGRESS: {},
118 118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
119 119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
120 120 }
121 121
122 122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
123 123
124 124 def humanflags(mapping, value):
125 125 """Convert a numeric flags value to a human value, using a mapping table."""
126 126 namemap = {v: k for k, v in mapping.iteritems()}
127 127 flags = []
128 128 val = 1
129 129 while value >= val:
130 130 if value & val:
131 131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
132 132 val <<= 1
133 133
134 134 return b'|'.join(flags)
135 135
136 136 @attr.s(slots=True)
137 137 class frameheader(object):
138 138 """Represents the data in a frame header."""
139 139
140 140 length = attr.ib()
141 141 requestid = attr.ib()
142 142 streamid = attr.ib()
143 143 streamflags = attr.ib()
144 144 typeid = attr.ib()
145 145 flags = attr.ib()
146 146
147 147 @attr.s(slots=True, repr=False)
148 148 class frame(object):
149 149 """Represents a parsed frame."""
150 150
151 151 requestid = attr.ib()
152 152 streamid = attr.ib()
153 153 streamflags = attr.ib()
154 154 typeid = attr.ib()
155 155 flags = attr.ib()
156 156 payload = attr.ib()
157 157
158 158 @encoding.strmethod
159 159 def __repr__(self):
160 160 typename = '<unknown 0x%02x>' % self.typeid
161 161 for name, value in FRAME_TYPES.iteritems():
162 162 if value == self.typeid:
163 163 typename = name
164 164 break
165 165
166 166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
167 167 'type=%s; flags=%s)' % (
168 168 len(self.payload), self.requestid, self.streamid,
169 169 humanflags(STREAM_FLAGS, self.streamflags), typename,
170 170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
171 171
172 172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
173 173 """Assemble a frame into a byte array."""
174 174 # TODO assert size of payload.
175 175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
176 176
177 177 # 24 bits length
178 178 # 16 bits request id
179 179 # 8 bits stream id
180 180 # 8 bits stream flags
181 181 # 4 bits type
182 182 # 4 bits flags
183 183
184 184 l = struct.pack(r'<I', len(payload))
185 185 frame[0:3] = l[0:3]
186 186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
187 187 frame[7] = (typeid << 4) | flags
188 188 frame[8:] = payload
189 189
190 190 return frame
191 191
192 192 def makeframefromhumanstring(s):
193 193 """Create a frame from a human readable string
194 194
195 195 Strings have the form:
196 196
197 197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
198 198
199 199 This can be used by user-facing applications and tests for creating
200 200 frames easily without having to type out a bunch of constants.
201 201
202 202 Request ID and stream IDs are integers.
203 203
204 204 Stream flags, frame type, and flags can be specified by integer or
205 205 named constant.
206 206
207 207 Flags can be delimited by `|` to bitwise OR them together.
208 208
209 209 If the payload begins with ``cbor:``, the following string will be
210 210 evaluated as Python literal and the resulting object will be fed into
211 211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
212 212 byte string literal.
213 213 """
214 214 fields = s.split(b' ', 5)
215 215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
216 216
217 217 requestid = int(requestid)
218 218 streamid = int(streamid)
219 219
220 220 finalstreamflags = 0
221 221 for flag in streamflags.split(b'|'):
222 222 if flag in STREAM_FLAGS:
223 223 finalstreamflags |= STREAM_FLAGS[flag]
224 224 else:
225 225 finalstreamflags |= int(flag)
226 226
227 227 if frametype in FRAME_TYPES:
228 228 frametype = FRAME_TYPES[frametype]
229 229 else:
230 230 frametype = int(frametype)
231 231
232 232 finalflags = 0
233 233 validflags = FRAME_TYPE_FLAGS[frametype]
234 234 for flag in frameflags.split(b'|'):
235 235 if flag in validflags:
236 236 finalflags |= validflags[flag]
237 237 else:
238 238 finalflags |= int(flag)
239 239
240 240 if payload.startswith(b'cbor:'):
241 241 payload = b''.join(cborutil.streamencode(
242 242 stringutil.evalpythonliteral(payload[5:])))
243 243
244 244 else:
245 245 payload = stringutil.unescapestr(payload)
246 246
247 247 return makeframe(requestid=requestid, streamid=streamid,
248 248 streamflags=finalstreamflags, typeid=frametype,
249 249 flags=finalflags, payload=payload)
250 250
251 251 def parseheader(data):
252 252 """Parse a unified framing protocol frame header from a buffer.
253 253
254 254 The header is expected to be in the buffer at offset 0 and the
255 255 buffer is expected to be large enough to hold a full header.
256 256 """
257 257 # 24 bits payload length (little endian)
258 258 # 16 bits request ID
259 259 # 8 bits stream ID
260 260 # 8 bits stream flags
261 261 # 4 bits frame type
262 262 # 4 bits frame flags
263 263 # ... payload
264 264 framelength = data[0] + 256 * data[1] + 16384 * data[2]
265 265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
266 266 typeflags = data[7]
267 267
268 268 frametype = (typeflags & 0xf0) >> 4
269 269 frameflags = typeflags & 0x0f
270 270
271 271 return frameheader(framelength, requestid, streamid, streamflags,
272 272 frametype, frameflags)
273 273
274 274 def readframe(fh):
275 275 """Read a unified framing protocol frame from a file object.
276 276
277 277 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
278 278 None if no frame is available. May raise if a malformed frame is
279 279 seen.
280 280 """
281 281 header = bytearray(FRAME_HEADER_SIZE)
282 282
283 283 readcount = fh.readinto(header)
284 284
285 285 if readcount == 0:
286 286 return None
287 287
288 288 if readcount != FRAME_HEADER_SIZE:
289 289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
290 290 (readcount, header))
291 291
292 292 h = parseheader(header)
293 293
294 294 payload = fh.read(h.length)
295 295 if len(payload) != h.length:
296 296 raise error.Abort(_('frame length error: expected %d; got %d') %
297 297 (h.length, len(payload)))
298 298
299 299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
300 300 payload)
301 301
302 302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
303 303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
304 304 redirect=None):
305 305 """Create frames necessary to transmit a request to run a command.
306 306
307 307 This is a generator of bytearrays. Each item represents a frame
308 308 ready to be sent over the wire to a peer.
309 309 """
310 310 data = {b'name': cmd}
311 311 if args:
312 312 data[b'args'] = args
313 313
314 314 if redirect:
315 315 data[b'redirect'] = redirect
316 316
317 317 data = b''.join(cborutil.streamencode(data))
318 318
319 319 offset = 0
320 320
321 321 while True:
322 322 flags = 0
323 323
324 324 # Must set new or continuation flag.
325 325 if not offset:
326 326 flags |= FLAG_COMMAND_REQUEST_NEW
327 327 else:
328 328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
329 329
330 330 # Data frames is set on all frames.
331 331 if datafh:
332 332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
333 333
334 334 payload = data[offset:offset + maxframesize]
335 335 offset += len(payload)
336 336
337 337 if len(payload) == maxframesize and offset < len(data):
338 338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
339 339
340 340 yield stream.makeframe(requestid=requestid,
341 341 typeid=FRAME_TYPE_COMMAND_REQUEST,
342 342 flags=flags,
343 343 payload=payload)
344 344
345 345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
346 346 break
347 347
348 348 if datafh:
349 349 while True:
350 350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
351 351
352 352 done = False
353 353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
354 354 flags = FLAG_COMMAND_DATA_CONTINUATION
355 355 else:
356 356 flags = FLAG_COMMAND_DATA_EOS
357 357 assert datafh.read(1) == b''
358 358 done = True
359 359
360 360 yield stream.makeframe(requestid=requestid,
361 361 typeid=FRAME_TYPE_COMMAND_DATA,
362 362 flags=flags,
363 363 payload=data)
364 364
365 365 if done:
366 366 break
367 367
368 368 def createcommandresponseframesfrombytes(stream, requestid, data,
369 369 maxframesize=DEFAULT_MAX_FRAME_SIZE):
370 370 """Create a raw frame to send a bytes response from static bytes input.
371 371
372 372 Returns a generator of bytearrays.
373 373 """
374 374 # Automatically send the overall CBOR response map.
375 375 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
376 376 if len(overall) > maxframesize:
377 377 raise error.ProgrammingError('not yet implemented')
378 378
379 379 # Simple case where we can fit the full response in a single frame.
380 380 if len(overall) + len(data) <= maxframesize:
381 381 flags = FLAG_COMMAND_RESPONSE_EOS
382 382 yield stream.makeframe(requestid=requestid,
383 383 typeid=FRAME_TYPE_COMMAND_RESPONSE,
384 384 flags=flags,
385 385 payload=overall + data)
386 386 return
387 387
388 388 # It's easier to send the overall CBOR map in its own frame than to track
389 389 # offsets.
390 390 yield stream.makeframe(requestid=requestid,
391 391 typeid=FRAME_TYPE_COMMAND_RESPONSE,
392 392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
393 393 payload=overall)
394 394
395 395 offset = 0
396 396 while True:
397 397 chunk = data[offset:offset + maxframesize]
398 398 offset += len(chunk)
399 399 done = offset == len(data)
400 400
401 401 if done:
402 402 flags = FLAG_COMMAND_RESPONSE_EOS
403 403 else:
404 404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
405 405
406 406 yield stream.makeframe(requestid=requestid,
407 407 typeid=FRAME_TYPE_COMMAND_RESPONSE,
408 408 flags=flags,
409 409 payload=chunk)
410 410
411 411 if done:
412 412 break
413 413
414 414 def createbytesresponseframesfromgen(stream, requestid, gen,
415 415 maxframesize=DEFAULT_MAX_FRAME_SIZE):
416 416 """Generator of frames from a generator of byte chunks.
417 417
418 418 This assumes that another frame will follow whatever this emits. i.e.
419 419 this always emits the continuation flag and never emits the end-of-stream
420 420 flag.
421 421 """
422 422 cb = util.chunkbuffer(gen)
423 423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
424 424
425 425 while True:
426 426 chunk = cb.read(maxframesize)
427 427 if not chunk:
428 428 break
429 429
430 430 yield stream.makeframe(requestid=requestid,
431 431 typeid=FRAME_TYPE_COMMAND_RESPONSE,
432 432 flags=flags,
433 433 payload=chunk)
434 434
435 435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
436 436
437 437 def createcommandresponseokframe(stream, requestid):
438 438 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
439 439
440 440 return stream.makeframe(requestid=requestid,
441 441 typeid=FRAME_TYPE_COMMAND_RESPONSE,
442 442 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
443 443 payload=overall)
444 444
445 445 def createcommandresponseeosframe(stream, requestid):
446 446 """Create an empty payload frame representing command end-of-stream."""
447 447 return stream.makeframe(requestid=requestid,
448 448 typeid=FRAME_TYPE_COMMAND_RESPONSE,
449 449 flags=FLAG_COMMAND_RESPONSE_EOS,
450 450 payload=b'')
451 451
452 452 def createalternatelocationresponseframe(stream, requestid, location):
453 453 data = {
454 454 b'status': b'redirect',
455 455 b'location': {
456 456 b'url': location.url,
457 457 b'mediatype': location.mediatype,
458 458 }
459 459 }
460 460
461 461 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
462 462 r'servercadercerts'):
463 463 value = getattr(location, a)
464 464 if value is not None:
465 465 data[b'location'][pycompat.bytestr(a)] = value
466 466
467 467 return stream.makeframe(requestid=requestid,
468 468 typeid=FRAME_TYPE_COMMAND_RESPONSE,
469 469 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
470 470 payload=b''.join(cborutil.streamencode(data)))
471 471
472 472 def createcommanderrorresponse(stream, requestid, message, args=None):
473 473 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
474 474 # formatting works consistently?
475 475 m = {
476 476 b'status': b'error',
477 477 b'error': {
478 478 b'message': message,
479 479 }
480 480 }
481 481
482 482 if args:
483 483 m[b'error'][b'args'] = args
484 484
485 485 overall = b''.join(cborutil.streamencode(m))
486 486
487 487 yield stream.makeframe(requestid=requestid,
488 488 typeid=FRAME_TYPE_COMMAND_RESPONSE,
489 489 flags=FLAG_COMMAND_RESPONSE_EOS,
490 490 payload=overall)
491 491
492 492 def createerrorframe(stream, requestid, msg, errtype):
493 493 # TODO properly handle frame size limits.
494 494 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
495 495
496 496 payload = b''.join(cborutil.streamencode({
497 497 b'type': errtype,
498 498 b'message': [{b'msg': msg}],
499 499 }))
500 500
501 501 yield stream.makeframe(requestid=requestid,
502 502 typeid=FRAME_TYPE_ERROR_RESPONSE,
503 503 flags=0,
504 504 payload=payload)
505 505
506 506 def createtextoutputframe(stream, requestid, atoms,
507 507 maxframesize=DEFAULT_MAX_FRAME_SIZE):
508 508 """Create a text output frame to render text to people.
509 509
510 510 ``atoms`` is a 3-tuple of (formatting string, args, labels).
511 511
512 512 The formatting string contains ``%s`` tokens to be replaced by the
513 513 corresponding indexed entry in ``args``. ``labels`` is an iterable of
514 514 formatters to be applied at rendering time. In terms of the ``ui``
515 515 class, each atom corresponds to a ``ui.write()``.
516 516 """
517 517 atomdicts = []
518 518
519 519 for (formatting, args, labels) in atoms:
520 520 # TODO look for localstr, other types here?
521 521
522 522 if not isinstance(formatting, bytes):
523 523 raise ValueError('must use bytes formatting strings')
524 524 for arg in args:
525 525 if not isinstance(arg, bytes):
526 526 raise ValueError('must use bytes for arguments')
527 527 for label in labels:
528 528 if not isinstance(label, bytes):
529 529 raise ValueError('must use bytes for labels')
530 530
531 531 # Formatting string must be ASCII.
532 532 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
533 533
534 534 # Arguments must be UTF-8.
535 535 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
536 536
537 537 # Labels must be ASCII.
538 538 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
539 539 for l in labels]
540 540
541 541 atom = {b'msg': formatting}
542 542 if args:
543 543 atom[b'args'] = args
544 544 if labels:
545 545 atom[b'labels'] = labels
546 546
547 547 atomdicts.append(atom)
548 548
549 549 payload = b''.join(cborutil.streamencode(atomdicts))
550 550
551 551 if len(payload) > maxframesize:
552 552 raise ValueError('cannot encode data in a single frame')
553 553
554 554 yield stream.makeframe(requestid=requestid,
555 555 typeid=FRAME_TYPE_TEXT_OUTPUT,
556 556 flags=0,
557 557 payload=payload)
558 558
559 559 class bufferingcommandresponseemitter(object):
560 560 """Helper object to emit command response frames intelligently.
561 561
562 562 Raw command response data is likely emitted in chunks much smaller
563 563 than what can fit in a single frame. This class exists to buffer
564 564 chunks until enough data is available to fit in a single frame.
565 565
566 566 TODO we'll need something like this when compression is supported.
567 567 So it might make sense to implement this functionality at the stream
568 568 level.
569 569 """
570 570 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
571 571 self._stream = stream
572 572 self._requestid = requestid
573 573 self._maxsize = maxframesize
574 574 self._chunks = []
575 575 self._chunkssize = 0
576 576
577 577 def send(self, data):
578 578 """Send new data for emission.
579 579
580 580 Is a generator of new frames that were derived from the new input.
581 581
582 582 If the special input ``None`` is received, flushes all buffered
583 583 data to frames.
584 584 """
585 585
586 586 if data is None:
587 587 for frame in self._flush():
588 588 yield frame
589 589 return
590 590
591 591 # There is a ton of potential to do more complicated things here.
592 592 # Our immediate goal is to coalesce small chunks into big frames,
593 593 # not achieve the fewest number of frames possible. So we go with
594 594 # a simple implementation:
595 595 #
596 596 # * If a chunk is too large for a frame, we flush and emit frames
597 597 # for the new chunk.
598 598 # * If a chunk can be buffered without total buffered size limits
599 599 # being exceeded, we do that.
600 600 # * If a chunk causes us to go over our buffering limit, we flush
601 601 # and then buffer the new chunk.
602 602
603 603 if len(data) > self._maxsize:
604 604 for frame in self._flush():
605 605 yield frame
606 606
607 607 # Now emit frames for the big chunk.
608 608 offset = 0
609 609 while True:
610 610 chunk = data[offset:offset + self._maxsize]
611 611 offset += len(chunk)
612 612
613 613 yield self._stream.makeframe(
614 614 self._requestid,
615 615 typeid=FRAME_TYPE_COMMAND_RESPONSE,
616 616 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
617 617 payload=chunk)
618 618
619 619 if offset == len(data):
620 620 return
621 621
622 622 # If we don't have enough to constitute a full frame, buffer and
623 623 # return.
624 624 if len(data) + self._chunkssize < self._maxsize:
625 625 self._chunks.append(data)
626 626 self._chunkssize += len(data)
627 627 return
628 628
629 629 # Else flush what we have and buffer the new chunk. We could do
630 630 # something more intelligent here, like break the chunk. Let's
631 631 # keep things simple for now.
632 632 for frame in self._flush():
633 633 yield frame
634 634
635 635 self._chunks.append(data)
636 636 self._chunkssize = len(data)
637 637
638 638 def _flush(self):
639 639 payload = b''.join(self._chunks)
640 640 assert len(payload) <= self._maxsize
641 641
642 642 self._chunks[:] = []
643 643 self._chunkssize = 0
644 644
645 645 yield self._stream.makeframe(
646 646 self._requestid,
647 647 typeid=FRAME_TYPE_COMMAND_RESPONSE,
648 648 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
649 649 payload=payload)
650 650
651 651 class stream(object):
652 652 """Represents a logical unidirectional series of frames."""
653 653
654 654 def __init__(self, streamid, active=False):
655 655 self.streamid = streamid
656 656 self._active = active
657 657
658 658 def makeframe(self, requestid, typeid, flags, payload):
659 659 """Create a frame to be sent out over this stream.
660 660
661 661 Only returns the frame instance. Does not actually send it.
662 662 """
663 663 streamflags = 0
664 664 if not self._active:
665 665 streamflags |= STREAM_FLAG_BEGIN_STREAM
666 666 self._active = True
667 667
668 668 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
669 669 payload)
670 670
671 671 def ensureserverstream(stream):
672 672 if stream.streamid % 2:
673 673 raise error.ProgrammingError('server should only write to even '
674 674 'numbered streams; %d is not even' %
675 675 stream.streamid)
676 676
677 DEFAULT_PROTOCOL_SETTINGS = {
678 'contentencodings': [b'identity'],
679 }
680
677 681 class serverreactor(object):
678 682 """Holds state of a server handling frame-based protocol requests.
679 683
680 684 This class is the "brain" of the unified frame-based protocol server
681 685 component. While the protocol is stateless from the perspective of
682 686 requests/commands, something needs to track which frames have been
683 687 received, what frames to expect, etc. This class is that thing.
684 688
685 689 Instances are modeled as a state machine of sorts. Instances are also
686 690 reactionary to external events. The point of this class is to encapsulate
687 691 the state of the connection and the exchange of frames, not to perform
688 692 work. Instead, callers tell this class when something occurs, like a
689 693 frame arriving. If that activity is worthy of a follow-up action (say
690 694 *run a command*), the return value of that handler will say so.
691 695
692 696 I/O and CPU intensive operations are purposefully delegated outside of
693 697 this class.
694 698
695 699 Consumers are expected to tell instances when events occur. They do so by
696 700 calling the various ``on*`` methods. These methods return a 2-tuple
697 701 describing any follow-up action(s) to take. The first element is the
698 702 name of an action to perform. The second is a data structure (usually
699 703 a dict) specific to that action that contains more information. e.g.
700 704 if the server wants to send frames back to the client, the data structure
701 705 will contain a reference to those frames.
702 706
703 707 Valid actions that consumers can be instructed to take are:
704 708
705 709 sendframes
706 710 Indicates that frames should be sent to the client. The ``framegen``
707 711 key contains a generator of frames that should be sent. The server
708 712 assumes that all frames are sent to the client.
709 713
710 714 error
711 715 Indicates that an error occurred. Consumer should probably abort.
712 716
713 717 runcommand
714 718 Indicates that the consumer should run a wire protocol command. Details
715 719 of the command to run are given in the data structure.
716 720
717 721 wantframe
718 722 Indicates that nothing of interest happened and the server is waiting on
719 723 more frames from the client before anything interesting can be done.
720 724
721 725 noop
722 726 Indicates no additional action is required.
723 727
724 728 Known Issues
725 729 ------------
726 730
727 731 There are no limits to the number of partially received commands or their
728 732 size. A malicious client could stream command request data and exhaust the
729 733 server's memory.
730 734
731 735 Partially received commands are not acted upon when end of input is
732 736 reached. Should the server error if it receives a partial request?
733 737 Should the client send a message to abort a partially transmitted request
734 738 to facilitate graceful shutdown?
735 739
736 740 Active requests that haven't been responded to aren't tracked. This means
737 741 that if we receive a command and instruct its dispatch, another command
738 742 with its request ID can come in over the wire and there will be a race
739 743 between who responds to what.
740 744 """
741 745
742 746 def __init__(self, deferoutput=False):
743 747 """Construct a new server reactor.
744 748
745 749 ``deferoutput`` can be used to indicate that no output frames should be
746 750 instructed to be sent until input has been exhausted. In this mode,
747 751 events that would normally generate output frames (such as a command
748 752 response being ready) will instead defer instructing the consumer to
749 753 send those frames. This is useful for half-duplex transports where the
750 754 sender cannot receive until all data has been transmitted.
751 755 """
752 756 self._deferoutput = deferoutput
753 self._state = 'idle'
757 self._state = 'initial'
754 758 self._nextoutgoingstreamid = 2
755 759 self._bufferedframegens = []
756 760 # stream id -> stream instance for all active streams from the client.
757 761 self._incomingstreams = {}
758 762 self._outgoingstreams = {}
759 763 # request id -> dict of commands that are actively being received.
760 764 self._receivingcommands = {}
761 765 # Request IDs that have been received and are actively being processed.
762 766 # Once all output for a request has been sent, it is removed from this
763 767 # set.
764 768 self._activecommands = set()
765 769
770 self._protocolsettingsdecoder = None
771
772 # Sender protocol settings are optional. Set implied default values.
773 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
774
766 775 def onframerecv(self, frame):
767 776 """Process a frame that has been received off the wire.
768 777
769 778 Returns a dict with an ``action`` key that details what action,
770 779 if any, the consumer should take next.
771 780 """
772 781 if not frame.streamid % 2:
773 782 self._state = 'errored'
774 783 return self._makeerrorresult(
775 784 _('received frame with even numbered stream ID: %d') %
776 785 frame.streamid)
777 786
778 787 if frame.streamid not in self._incomingstreams:
779 788 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
780 789 self._state = 'errored'
781 790 return self._makeerrorresult(
782 791 _('received frame on unknown inactive stream without '
783 792 'beginning of stream flag set'))
784 793
785 794 self._incomingstreams[frame.streamid] = stream(frame.streamid)
786 795
787 796 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
788 797 # TODO handle decoding frames
789 798 self._state = 'errored'
790 799 raise error.ProgrammingError('support for decoding stream payloads '
791 800 'not yet implemented')
792 801
793 802 if frame.streamflags & STREAM_FLAG_END_STREAM:
794 803 del self._incomingstreams[frame.streamid]
795 804
796 805 handlers = {
806 'initial': self._onframeinitial,
807 'protocol-settings-receiving': self._onframeprotocolsettings,
797 808 'idle': self._onframeidle,
798 809 'command-receiving': self._onframecommandreceiving,
799 810 'errored': self._onframeerrored,
800 811 }
801 812
802 813 meth = handlers.get(self._state)
803 814 if not meth:
804 815 raise error.ProgrammingError('unhandled state: %s' % self._state)
805 816
806 817 return meth(frame)
807 818
808 819 def oncommandresponseready(self, stream, requestid, data):
809 820 """Signal that a bytes response is ready to be sent to the client.
810 821
811 822 The raw bytes response is passed as an argument.
812 823 """
813 824 ensureserverstream(stream)
814 825
815 826 def sendframes():
816 827 for frame in createcommandresponseframesfrombytes(stream, requestid,
817 828 data):
818 829 yield frame
819 830
820 831 self._activecommands.remove(requestid)
821 832
822 833 result = sendframes()
823 834
824 835 if self._deferoutput:
825 836 self._bufferedframegens.append(result)
826 837 return 'noop', {}
827 838 else:
828 839 return 'sendframes', {
829 840 'framegen': result,
830 841 }
831 842
832 843 def oncommandresponsereadyobjects(self, stream, requestid, objs):
833 844 """Signal that objects are ready to be sent to the client.
834 845
835 846 ``objs`` is an iterable of objects (typically a generator) that will
836 847 be encoded via CBOR and added to frames, which will be sent to the
837 848 client.
838 849 """
839 850 ensureserverstream(stream)
840 851
841 852 # We need to take care over exception handling. Uncaught exceptions
842 853 # when generating frames could lead to premature end of the frame
843 854 # stream and the possibility of the server or client process getting
844 855 # in a bad state.
845 856 #
846 857 # Keep in mind that if ``objs`` is a generator, advancing it could
847 858 # raise exceptions that originated in e.g. wire protocol command
848 859 # functions. That is why we differentiate between exceptions raised
849 860 # when iterating versus other exceptions that occur.
850 861 #
851 862 # In all cases, when the function finishes, the request is fully
852 863 # handled and no new frames for it should be seen.
853 864
854 865 def sendframes():
855 866 emitted = False
856 867 alternatelocationsent = False
857 868 emitter = bufferingcommandresponseemitter(stream, requestid)
858 869 while True:
859 870 try:
860 871 o = next(objs)
861 872 except StopIteration:
862 873 for frame in emitter.send(None):
863 874 yield frame
864 875
865 876 if emitted:
866 877 yield createcommandresponseeosframe(stream, requestid)
867 878 break
868 879
869 880 except error.WireprotoCommandError as e:
870 881 for frame in createcommanderrorresponse(
871 882 stream, requestid, e.message, e.messageargs):
872 883 yield frame
873 884 break
874 885
875 886 except Exception as e:
876 887 for frame in createerrorframe(
877 888 stream, requestid, '%s' % stringutil.forcebytestr(e),
878 889 errtype='server'):
879 890
880 891 yield frame
881 892
882 893 break
883 894
884 895 try:
885 896 # Alternate location responses can only be the first and
886 897 # only object in the output stream.
887 898 if isinstance(o, wireprototypes.alternatelocationresponse):
888 899 if emitted:
889 900 raise error.ProgrammingError(
890 901 'alternatelocationresponse seen after initial '
891 902 'output object')
892 903
893 904 yield createalternatelocationresponseframe(
894 905 stream, requestid, o)
895 906
896 907 alternatelocationsent = True
897 908 emitted = True
898 909 continue
899 910
900 911 if alternatelocationsent:
901 912 raise error.ProgrammingError(
902 913 'object follows alternatelocationresponse')
903 914
904 915 if not emitted:
905 916 yield createcommandresponseokframe(stream, requestid)
906 917 emitted = True
907 918
908 919 # Objects emitted by command functions can be serializable
909 920 # data structures or special types.
910 921 # TODO consider extracting the content normalization to a
911 922 # standalone function, as it may be useful for e.g. cachers.
912 923
913 924 # A pre-encoded object is sent directly to the emitter.
914 925 if isinstance(o, wireprototypes.encodedresponse):
915 926 for frame in emitter.send(o.data):
916 927 yield frame
917 928
918 929 # A regular object is CBOR encoded.
919 930 else:
920 931 for chunk in cborutil.streamencode(o):
921 932 for frame in emitter.send(chunk):
922 933 yield frame
923 934
924 935 except Exception as e:
925 936 for frame in createerrorframe(stream, requestid,
926 937 '%s' % e,
927 938 errtype='server'):
928 939 yield frame
929 940
930 941 break
931 942
932 943 self._activecommands.remove(requestid)
933 944
934 945 return self._handlesendframes(sendframes())
935 946
936 947 def oninputeof(self):
937 948 """Signals that end of input has been received.
938 949
939 950 No more frames will be received. All pending activity should be
940 951 completed.
941 952 """
942 953 # TODO should we do anything about in-flight commands?
943 954
944 955 if not self._deferoutput or not self._bufferedframegens:
945 956 return 'noop', {}
946 957
947 958 # If we buffered all our responses, emit those.
948 959 def makegen():
949 960 for gen in self._bufferedframegens:
950 961 for frame in gen:
951 962 yield frame
952 963
953 964 return 'sendframes', {
954 965 'framegen': makegen(),
955 966 }
956 967
957 968 def _handlesendframes(self, framegen):
958 969 if self._deferoutput:
959 970 self._bufferedframegens.append(framegen)
960 971 return 'noop', {}
961 972 else:
962 973 return 'sendframes', {
963 974 'framegen': framegen,
964 975 }
965 976
966 977 def onservererror(self, stream, requestid, msg):
967 978 ensureserverstream(stream)
968 979
969 980 def sendframes():
970 981 for frame in createerrorframe(stream, requestid, msg,
971 982 errtype='server'):
972 983 yield frame
973 984
974 985 self._activecommands.remove(requestid)
975 986
976 987 return self._handlesendframes(sendframes())
977 988
978 989 def oncommanderror(self, stream, requestid, message, args=None):
979 990 """Called when a command encountered an error before sending output."""
980 991 ensureserverstream(stream)
981 992
982 993 def sendframes():
983 994 for frame in createcommanderrorresponse(stream, requestid, message,
984 995 args):
985 996 yield frame
986 997
987 998 self._activecommands.remove(requestid)
988 999
989 1000 return self._handlesendframes(sendframes())
990 1001
991 1002 def makeoutputstream(self):
992 1003 """Create a stream to be used for sending data to the client."""
993 1004 streamid = self._nextoutgoingstreamid
994 1005 self._nextoutgoingstreamid += 2
995 1006
996 1007 s = stream(streamid)
997 1008 self._outgoingstreams[streamid] = s
998 1009
999 1010 return s
1000 1011
1001 1012 def _makeerrorresult(self, msg):
1002 1013 return 'error', {
1003 1014 'message': msg,
1004 1015 }
1005 1016
1006 1017 def _makeruncommandresult(self, requestid):
1007 1018 entry = self._receivingcommands[requestid]
1008 1019
1009 1020 if not entry['requestdone']:
1010 1021 self._state = 'errored'
1011 1022 raise error.ProgrammingError('should not be called without '
1012 1023 'requestdone set')
1013 1024
1014 1025 del self._receivingcommands[requestid]
1015 1026
1016 1027 if self._receivingcommands:
1017 1028 self._state = 'command-receiving'
1018 1029 else:
1019 1030 self._state = 'idle'
1020 1031
1021 1032 # Decode the payloads as CBOR.
1022 1033 entry['payload'].seek(0)
1023 1034 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1024 1035
1025 1036 if b'name' not in request:
1026 1037 self._state = 'errored'
1027 1038 return self._makeerrorresult(
1028 1039 _('command request missing "name" field'))
1029 1040
1030 1041 if b'args' not in request:
1031 1042 request[b'args'] = {}
1032 1043
1033 1044 assert requestid not in self._activecommands
1034 1045 self._activecommands.add(requestid)
1035 1046
1036 1047 return 'runcommand', {
1037 1048 'requestid': requestid,
1038 1049 'command': request[b'name'],
1039 1050 'args': request[b'args'],
1040 1051 'redirect': request.get(b'redirect'),
1041 1052 'data': entry['data'].getvalue() if entry['data'] else None,
1042 1053 }
1043 1054
1044 1055 def _makewantframeresult(self):
1045 1056 return 'wantframe', {
1046 1057 'state': self._state,
1047 1058 }
1048 1059
1049 1060 def _validatecommandrequestframe(self, frame):
1050 1061 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1051 1062 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1052 1063
1053 1064 if new and continuation:
1054 1065 self._state = 'errored'
1055 1066 return self._makeerrorresult(
1056 1067 _('received command request frame with both new and '
1057 1068 'continuation flags set'))
1058 1069
1059 1070 if not new and not continuation:
1060 1071 self._state = 'errored'
1061 1072 return self._makeerrorresult(
1062 1073 _('received command request frame with neither new nor '
1063 1074 'continuation flags set'))
1064 1075
1076 def _onframeinitial(self, frame):
1077 # Called when we receive a frame when in the "initial" state.
1078 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1079 self._state = 'protocol-settings-receiving'
1080 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1081 return self._onframeprotocolsettings(frame)
1082
1083 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1084 self._state = 'idle'
1085 return self._onframeidle(frame)
1086
1087 else:
1088 self._state = 'errored'
1089 return self._makeerrorresult(
1090 _('expected sender protocol settings or command request '
1091 'frame; got %d') % frame.typeid)
1092
1093 def _onframeprotocolsettings(self, frame):
1094 assert self._state == 'protocol-settings-receiving'
1095 assert self._protocolsettingsdecoder is not None
1096
1097 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1098 self._state = 'errored'
1099 return self._makeerrorresult(
1100 _('expected sender protocol settings frame; got %d') %
1101 frame.typeid)
1102
1103 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1104 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1105
1106 if more and eos:
1107 self._state = 'errored'
1108 return self._makeerrorresult(
1109 _('sender protocol settings frame cannot have both '
1110 'continuation and end of stream flags set'))
1111
1112 if not more and not eos:
1113 self._state = 'errored'
1114 return self._makeerrorresult(
1115 _('sender protocol settings frame must have continuation or '
1116 'end of stream flag set'))
1117
1118 # TODO establish limits for maximum amount of data that can be
1119 # buffered.
1120 try:
1121 self._protocolsettingsdecoder.decode(frame.payload)
1122 except Exception as e:
1123 self._state = 'errored'
1124 return self._makeerrorresult(
1125 _('error decoding CBOR from sender protocol settings frame: %s')
1126 % stringutil.forcebytestr(e))
1127
1128 if more:
1129 return self._makewantframeresult()
1130
1131 assert eos
1132
1133 decoded = self._protocolsettingsdecoder.getavailable()
1134 self._protocolsettingsdecoder = None
1135
1136 if not decoded:
1137 self._state = 'errored'
1138 return self._makeerrorresult(
1139 _('sender protocol settings frame did not contain CBOR data'))
1140 elif len(decoded) > 1:
1141 self._state = 'errored'
1142 return self._makeerrorresult(
1143 _('sender protocol settings frame contained multiple CBOR '
1144 'values'))
1145
1146 d = decoded[0]
1147
1148 if b'contentencodings' in d:
1149 self._sendersettings['contentencodings'] = d[b'contentencodings']
1150
1151 self._state = 'idle'
1152
1153 return self._makewantframeresult()
1154
1065 1155 def _onframeidle(self, frame):
1066 1156 # The only frame type that should be received in this state is a
1067 1157 # command request.
1068 1158 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1069 1159 self._state = 'errored'
1070 1160 return self._makeerrorresult(
1071 1161 _('expected command request frame; got %d') % frame.typeid)
1072 1162
1073 1163 res = self._validatecommandrequestframe(frame)
1074 1164 if res:
1075 1165 return res
1076 1166
1077 1167 if frame.requestid in self._receivingcommands:
1078 1168 self._state = 'errored'
1079 1169 return self._makeerrorresult(
1080 1170 _('request with ID %d already received') % frame.requestid)
1081 1171
1082 1172 if frame.requestid in self._activecommands:
1083 1173 self._state = 'errored'
1084 1174 return self._makeerrorresult(
1085 1175 _('request with ID %d is already active') % frame.requestid)
1086 1176
1087 1177 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1088 1178 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1089 1179 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1090 1180
1091 1181 if not new:
1092 1182 self._state = 'errored'
1093 1183 return self._makeerrorresult(
1094 1184 _('received command request frame without new flag set'))
1095 1185
1096 1186 payload = util.bytesio()
1097 1187 payload.write(frame.payload)
1098 1188
1099 1189 self._receivingcommands[frame.requestid] = {
1100 1190 'payload': payload,
1101 1191 'data': None,
1102 1192 'requestdone': not moreframes,
1103 1193 'expectingdata': bool(expectingdata),
1104 1194 }
1105 1195
1106 1196 # This is the final frame for this request. Dispatch it.
1107 1197 if not moreframes and not expectingdata:
1108 1198 return self._makeruncommandresult(frame.requestid)
1109 1199
1110 1200 assert moreframes or expectingdata
1111 1201 self._state = 'command-receiving'
1112 1202 return self._makewantframeresult()
1113 1203
1114 1204 def _onframecommandreceiving(self, frame):
1115 1205 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1116 1206 # Process new command requests as such.
1117 1207 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1118 1208 return self._onframeidle(frame)
1119 1209
1120 1210 res = self._validatecommandrequestframe(frame)
1121 1211 if res:
1122 1212 return res
1123 1213
1124 1214 # All other frames should be related to a command that is currently
1125 1215 # receiving but is not active.
1126 1216 if frame.requestid in self._activecommands:
1127 1217 self._state = 'errored'
1128 1218 return self._makeerrorresult(
1129 1219 _('received frame for request that is still active: %d') %
1130 1220 frame.requestid)
1131 1221
1132 1222 if frame.requestid not in self._receivingcommands:
1133 1223 self._state = 'errored'
1134 1224 return self._makeerrorresult(
1135 1225 _('received frame for request that is not receiving: %d') %
1136 1226 frame.requestid)
1137 1227
1138 1228 entry = self._receivingcommands[frame.requestid]
1139 1229
1140 1230 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1141 1231 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1142 1232 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1143 1233
1144 1234 if entry['requestdone']:
1145 1235 self._state = 'errored'
1146 1236 return self._makeerrorresult(
1147 1237 _('received command request frame when request frames '
1148 1238 'were supposedly done'))
1149 1239
1150 1240 if expectingdata != entry['expectingdata']:
1151 1241 self._state = 'errored'
1152 1242 return self._makeerrorresult(
1153 1243 _('mismatch between expect data flag and previous frame'))
1154 1244
1155 1245 entry['payload'].write(frame.payload)
1156 1246
1157 1247 if not moreframes:
1158 1248 entry['requestdone'] = True
1159 1249
1160 1250 if not moreframes and not expectingdata:
1161 1251 return self._makeruncommandresult(frame.requestid)
1162 1252
1163 1253 return self._makewantframeresult()
1164 1254
1165 1255 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1166 1256 if not entry['expectingdata']:
1167 1257 self._state = 'errored'
1168 1258 return self._makeerrorresult(_(
1169 1259 'received command data frame for request that is not '
1170 1260 'expecting data: %d') % frame.requestid)
1171 1261
1172 1262 if entry['data'] is None:
1173 1263 entry['data'] = util.bytesio()
1174 1264
1175 1265 return self._handlecommanddataframe(frame, entry)
1176 1266 else:
1177 1267 self._state = 'errored'
1178 1268 return self._makeerrorresult(_(
1179 1269 'received unexpected frame type: %d') % frame.typeid)
1180 1270
1181 1271 def _handlecommanddataframe(self, frame, entry):
1182 1272 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1183 1273
1184 1274 # TODO support streaming data instead of buffering it.
1185 1275 entry['data'].write(frame.payload)
1186 1276
1187 1277 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1188 1278 return self._makewantframeresult()
1189 1279 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1190 1280 entry['data'].seek(0)
1191 1281 return self._makeruncommandresult(frame.requestid)
1192 1282 else:
1193 1283 self._state = 'errored'
1194 1284 return self._makeerrorresult(_('command data frame without '
1195 1285 'flags'))
1196 1286
1197 1287 def _onframeerrored(self, frame):
1198 1288 return self._makeerrorresult(_('server already errored'))
1199 1289
1200 1290 class commandrequest(object):
1201 1291 """Represents a request to run a command."""
1202 1292
1203 1293 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1204 1294 self.requestid = requestid
1205 1295 self.name = name
1206 1296 self.args = args
1207 1297 self.datafh = datafh
1208 1298 self.redirect = redirect
1209 1299 self.state = 'pending'
1210 1300
1211 1301 class clientreactor(object):
1212 1302 """Holds state of a client issuing frame-based protocol requests.
1213 1303
1214 1304 This is like ``serverreactor`` but for client-side state.
1215 1305
1216 1306 Each instance is bound to the lifetime of a connection. For persistent
1217 1307 connection transports using e.g. TCP sockets and speaking the raw
1218 1308 framing protocol, there will be a single instance for the lifetime of
1219 1309 the TCP socket. For transports where there are multiple discrete
1220 1310 interactions (say tunneled within in HTTP request), there will be a
1221 1311 separate instance for each distinct interaction.
1222 1312 """
1223 1313 def __init__(self, hasmultiplesend=False, buffersends=True):
1224 1314 """Create a new instance.
1225 1315
1226 1316 ``hasmultiplesend`` indicates whether multiple sends are supported
1227 1317 by the transport. When True, it is possible to send commands immediately
1228 1318 instead of buffering until the caller signals an intent to finish a
1229 1319 send operation.
1230 1320
1231 1321 ``buffercommands`` indicates whether sends should be buffered until the
1232 1322 last request has been issued.
1233 1323 """
1234 1324 self._hasmultiplesend = hasmultiplesend
1235 1325 self._buffersends = buffersends
1236 1326
1237 1327 self._canissuecommands = True
1238 1328 self._cansend = True
1239 1329
1240 1330 self._nextrequestid = 1
1241 1331 # We only support a single outgoing stream for now.
1242 1332 self._outgoingstream = stream(1)
1243 1333 self._pendingrequests = collections.deque()
1244 1334 self._activerequests = {}
1245 1335 self._incomingstreams = {}
1246 1336
1247 1337 def callcommand(self, name, args, datafh=None, redirect=None):
1248 1338 """Request that a command be executed.
1249 1339
1250 1340 Receives the command name, a dict of arguments to pass to the command,
1251 1341 and an optional file object containing the raw data for the command.
1252 1342
1253 1343 Returns a 3-tuple of (request, action, action data).
1254 1344 """
1255 1345 if not self._canissuecommands:
1256 1346 raise error.ProgrammingError('cannot issue new commands')
1257 1347
1258 1348 requestid = self._nextrequestid
1259 1349 self._nextrequestid += 2
1260 1350
1261 1351 request = commandrequest(requestid, name, args, datafh=datafh,
1262 1352 redirect=redirect)
1263 1353
1264 1354 if self._buffersends:
1265 1355 self._pendingrequests.append(request)
1266 1356 return request, 'noop', {}
1267 1357 else:
1268 1358 if not self._cansend:
1269 1359 raise error.ProgrammingError('sends cannot be performed on '
1270 1360 'this instance')
1271 1361
1272 1362 if not self._hasmultiplesend:
1273 1363 self._cansend = False
1274 1364 self._canissuecommands = False
1275 1365
1276 1366 return request, 'sendframes', {
1277 1367 'framegen': self._makecommandframes(request),
1278 1368 }
1279 1369
1280 1370 def flushcommands(self):
1281 1371 """Request that all queued commands be sent.
1282 1372
1283 1373 If any commands are buffered, this will instruct the caller to send
1284 1374 them over the wire. If no commands are buffered it instructs the client
1285 1375 to no-op.
1286 1376
1287 1377 If instances aren't configured for multiple sends, no new command
1288 1378 requests are allowed after this is called.
1289 1379 """
1290 1380 if not self._pendingrequests:
1291 1381 return 'noop', {}
1292 1382
1293 1383 if not self._cansend:
1294 1384 raise error.ProgrammingError('sends cannot be performed on this '
1295 1385 'instance')
1296 1386
1297 1387 # If the instance only allows sending once, mark that we have fired
1298 1388 # our one shot.
1299 1389 if not self._hasmultiplesend:
1300 1390 self._canissuecommands = False
1301 1391 self._cansend = False
1302 1392
1303 1393 def makeframes():
1304 1394 while self._pendingrequests:
1305 1395 request = self._pendingrequests.popleft()
1306 1396 for frame in self._makecommandframes(request):
1307 1397 yield frame
1308 1398
1309 1399 return 'sendframes', {
1310 1400 'framegen': makeframes(),
1311 1401 }
1312 1402
1313 1403 def _makecommandframes(self, request):
1314 1404 """Emit frames to issue a command request.
1315 1405
1316 1406 As a side-effect, update request accounting to reflect its changed
1317 1407 state.
1318 1408 """
1319 1409 self._activerequests[request.requestid] = request
1320 1410 request.state = 'sending'
1321 1411
1322 1412 res = createcommandframes(self._outgoingstream,
1323 1413 request.requestid,
1324 1414 request.name,
1325 1415 request.args,
1326 1416 datafh=request.datafh,
1327 1417 redirect=request.redirect)
1328 1418
1329 1419 for frame in res:
1330 1420 yield frame
1331 1421
1332 1422 request.state = 'sent'
1333 1423
1334 1424 def onframerecv(self, frame):
1335 1425 """Process a frame that has been received off the wire.
1336 1426
1337 1427 Returns a 2-tuple of (action, meta) describing further action the
1338 1428 caller needs to take as a result of receiving this frame.
1339 1429 """
1340 1430 if frame.streamid % 2:
1341 1431 return 'error', {
1342 1432 'message': (
1343 1433 _('received frame with odd numbered stream ID: %d') %
1344 1434 frame.streamid),
1345 1435 }
1346 1436
1347 1437 if frame.streamid not in self._incomingstreams:
1348 1438 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1349 1439 return 'error', {
1350 1440 'message': _('received frame on unknown stream '
1351 1441 'without beginning of stream flag set'),
1352 1442 }
1353 1443
1354 1444 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1355 1445
1356 1446 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1357 1447 raise error.ProgrammingError('support for decoding stream '
1358 1448 'payloads not yet implemneted')
1359 1449
1360 1450 if frame.streamflags & STREAM_FLAG_END_STREAM:
1361 1451 del self._incomingstreams[frame.streamid]
1362 1452
1363 1453 if frame.requestid not in self._activerequests:
1364 1454 return 'error', {
1365 1455 'message': (_('received frame for inactive request ID: %d') %
1366 1456 frame.requestid),
1367 1457 }
1368 1458
1369 1459 request = self._activerequests[frame.requestid]
1370 1460 request.state = 'receiving'
1371 1461
1372 1462 handlers = {
1373 1463 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1374 1464 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1375 1465 }
1376 1466
1377 1467 meth = handlers.get(frame.typeid)
1378 1468 if not meth:
1379 1469 raise error.ProgrammingError('unhandled frame type: %d' %
1380 1470 frame.typeid)
1381 1471
1382 1472 return meth(request, frame)
1383 1473
1384 1474 def _oncommandresponseframe(self, request, frame):
1385 1475 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1386 1476 request.state = 'received'
1387 1477 del self._activerequests[request.requestid]
1388 1478
1389 1479 return 'responsedata', {
1390 1480 'request': request,
1391 1481 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1392 1482 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1393 1483 'data': frame.payload,
1394 1484 }
1395 1485
1396 1486 def _onerrorresponseframe(self, request, frame):
1397 1487 request.state = 'errored'
1398 1488 del self._activerequests[request.requestid]
1399 1489
1400 1490 # The payload should be a CBOR map.
1401 1491 m = cborutil.decodeall(frame.payload)[0]
1402 1492
1403 1493 return 'error', {
1404 1494 'request': request,
1405 1495 'type': m['type'],
1406 1496 'message': m['message'],
1407 1497 }
@@ -1,499 +1,602 b''
1 1 from __future__ import absolute_import, print_function
2 2
3 3 import unittest
4 4
5 5 from mercurial.thirdparty import (
6 6 cbor,
7 7 )
8 8 from mercurial import (
9 9 util,
10 10 wireprotoframing as framing,
11 11 )
12 from mercurial.utils import (
13 cborutil,
14 )
12 15
13 16 ffs = framing.makeframefromhumanstring
14 17
15 18 OK = cbor.dumps({b'status': b'ok'})
16 19
17 20 def makereactor(deferoutput=False):
18 21 return framing.serverreactor(deferoutput=deferoutput)
19 22
20 23 def sendframes(reactor, gen):
21 24 """Send a generator of frame bytearray to a reactor.
22 25
23 26 Emits a generator of results from ``onframerecv()`` calls.
24 27 """
25 28 for frame in gen:
26 29 header = framing.parseheader(frame)
27 30 payload = frame[framing.FRAME_HEADER_SIZE:]
28 31 assert len(payload) == header.length
29 32
30 33 yield reactor.onframerecv(framing.frame(header.requestid,
31 34 header.streamid,
32 35 header.streamflags,
33 36 header.typeid,
34 37 header.flags,
35 38 payload))
36 39
37 40 def sendcommandframes(reactor, stream, rid, cmd, args, datafh=None):
38 41 """Generate frames to run a command and send them to a reactor."""
39 42 return sendframes(reactor,
40 43 framing.createcommandframes(stream, rid, cmd, args,
41 44 datafh))
42 45
43 46
44 47 class ServerReactorTests(unittest.TestCase):
45 48 def _sendsingleframe(self, reactor, f):
46 49 results = list(sendframes(reactor, [f]))
47 50 self.assertEqual(len(results), 1)
48 51
49 52 return results[0]
50 53
51 54 def assertaction(self, res, expected):
52 55 self.assertIsInstance(res, tuple)
53 56 self.assertEqual(len(res), 2)
54 57 self.assertIsInstance(res[1], dict)
55 58 self.assertEqual(res[0], expected)
56 59
57 60 def assertframesequal(self, frames, framestrings):
58 61 expected = [ffs(s) for s in framestrings]
59 62 self.assertEqual(list(frames), expected)
60 63
61 64 def test1framecommand(self):
62 65 """Receiving a command in a single frame yields request to run it."""
63 66 reactor = makereactor()
64 67 stream = framing.stream(1)
65 68 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
66 69 self.assertEqual(len(results), 1)
67 70 self.assertaction(results[0], b'runcommand')
68 71 self.assertEqual(results[0][1], {
69 72 b'requestid': 1,
70 73 b'command': b'mycommand',
71 74 b'args': {},
72 75 b'redirect': None,
73 76 b'data': None,
74 77 })
75 78
76 79 result = reactor.oninputeof()
77 80 self.assertaction(result, b'noop')
78 81
79 82 def test1argument(self):
80 83 reactor = makereactor()
81 84 stream = framing.stream(1)
82 85 results = list(sendcommandframes(reactor, stream, 41, b'mycommand',
83 86 {b'foo': b'bar'}))
84 87 self.assertEqual(len(results), 1)
85 88 self.assertaction(results[0], b'runcommand')
86 89 self.assertEqual(results[0][1], {
87 90 b'requestid': 41,
88 91 b'command': b'mycommand',
89 92 b'args': {b'foo': b'bar'},
90 93 b'redirect': None,
91 94 b'data': None,
92 95 })
93 96
94 97 def testmultiarguments(self):
95 98 reactor = makereactor()
96 99 stream = framing.stream(1)
97 100 results = list(sendcommandframes(reactor, stream, 1, b'mycommand',
98 101 {b'foo': b'bar', b'biz': b'baz'}))
99 102 self.assertEqual(len(results), 1)
100 103 self.assertaction(results[0], b'runcommand')
101 104 self.assertEqual(results[0][1], {
102 105 b'requestid': 1,
103 106 b'command': b'mycommand',
104 107 b'args': {b'foo': b'bar', b'biz': b'baz'},
105 108 b'redirect': None,
106 109 b'data': None,
107 110 })
108 111
109 112 def testsimplecommanddata(self):
110 113 reactor = makereactor()
111 114 stream = framing.stream(1)
112 115 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {},
113 116 util.bytesio(b'data!')))
114 117 self.assertEqual(len(results), 2)
115 118 self.assertaction(results[0], b'wantframe')
116 119 self.assertaction(results[1], b'runcommand')
117 120 self.assertEqual(results[1][1], {
118 121 b'requestid': 1,
119 122 b'command': b'mycommand',
120 123 b'args': {},
121 124 b'redirect': None,
122 125 b'data': b'data!',
123 126 })
124 127
125 128 def testmultipledataframes(self):
126 129 frames = [
127 130 ffs(b'1 1 stream-begin command-request new|have-data '
128 131 b"cbor:{b'name': b'mycommand'}"),
129 132 ffs(b'1 1 0 command-data continuation data1'),
130 133 ffs(b'1 1 0 command-data continuation data2'),
131 134 ffs(b'1 1 0 command-data eos data3'),
132 135 ]
133 136
134 137 reactor = makereactor()
135 138 results = list(sendframes(reactor, frames))
136 139 self.assertEqual(len(results), 4)
137 140 for i in range(3):
138 141 self.assertaction(results[i], b'wantframe')
139 142 self.assertaction(results[3], b'runcommand')
140 143 self.assertEqual(results[3][1], {
141 144 b'requestid': 1,
142 145 b'command': b'mycommand',
143 146 b'args': {},
144 147 b'redirect': None,
145 148 b'data': b'data1data2data3',
146 149 })
147 150
148 151 def testargumentanddata(self):
149 152 frames = [
150 153 ffs(b'1 1 stream-begin command-request new|have-data '
151 154 b"cbor:{b'name': b'command', b'args': {b'key': b'val',"
152 155 b"b'foo': b'bar'}}"),
153 156 ffs(b'1 1 0 command-data continuation value1'),
154 157 ffs(b'1 1 0 command-data eos value2'),
155 158 ]
156 159
157 160 reactor = makereactor()
158 161 results = list(sendframes(reactor, frames))
159 162
160 163 self.assertaction(results[-1], b'runcommand')
161 164 self.assertEqual(results[-1][1], {
162 165 b'requestid': 1,
163 166 b'command': b'command',
164 167 b'args': {
165 168 b'key': b'val',
166 169 b'foo': b'bar',
167 170 },
168 171 b'redirect': None,
169 172 b'data': b'value1value2',
170 173 })
171 174
172 175 def testnewandcontinuation(self):
173 176 result = self._sendsingleframe(makereactor(),
174 177 ffs(b'1 1 stream-begin command-request new|continuation '))
175 178 self.assertaction(result, b'error')
176 179 self.assertEqual(result[1], {
177 180 b'message': b'received command request frame with both new and '
178 181 b'continuation flags set',
179 182 })
180 183
181 184 def testneithernewnorcontinuation(self):
182 185 result = self._sendsingleframe(makereactor(),
183 186 ffs(b'1 1 stream-begin command-request 0 '))
184 187 self.assertaction(result, b'error')
185 188 self.assertEqual(result[1], {
186 189 b'message': b'received command request frame with neither new nor '
187 190 b'continuation flags set',
188 191 })
189 192
190 193 def testunexpectedcommanddata(self):
191 194 """Command data frame when not running a command is an error."""
192 195 result = self._sendsingleframe(makereactor(),
193 196 ffs(b'1 1 stream-begin command-data 0 ignored'))
194 197 self.assertaction(result, b'error')
195 198 self.assertEqual(result[1], {
196 b'message': b'expected command request frame; got 2',
199 b'message': b'expected sender protocol settings or command request '
200 b'frame; got 2',
197 201 })
198 202
199 203 def testunexpectedcommanddatareceiving(self):
200 204 """Same as above except the command is receiving."""
201 205 results = list(sendframes(makereactor(), [
202 206 ffs(b'1 1 stream-begin command-request new|more '
203 207 b"cbor:{b'name': b'ignored'}"),
204 208 ffs(b'1 1 0 command-data eos ignored'),
205 209 ]))
206 210
207 211 self.assertaction(results[0], b'wantframe')
208 212 self.assertaction(results[1], b'error')
209 213 self.assertEqual(results[1][1], {
210 214 b'message': b'received command data frame for request that is not '
211 215 b'expecting data: 1',
212 216 })
213 217
214 218 def testconflictingrequestidallowed(self):
215 219 """Multiple fully serviced commands with same request ID is allowed."""
216 220 reactor = makereactor()
217 221 results = []
218 222 outstream = reactor.makeoutputstream()
219 223 results.append(self._sendsingleframe(
220 224 reactor, ffs(b'1 1 stream-begin command-request new '
221 225 b"cbor:{b'name': b'command'}")))
222 226 result = reactor.oncommandresponseready(outstream, 1, b'response1')
223 227 self.assertaction(result, b'sendframes')
224 228 list(result[1][b'framegen'])
225 229 results.append(self._sendsingleframe(
226 230 reactor, ffs(b'1 1 stream-begin command-request new '
227 231 b"cbor:{b'name': b'command'}")))
228 232 result = reactor.oncommandresponseready(outstream, 1, b'response2')
229 233 self.assertaction(result, b'sendframes')
230 234 list(result[1][b'framegen'])
231 235 results.append(self._sendsingleframe(
232 236 reactor, ffs(b'1 1 stream-begin command-request new '
233 237 b"cbor:{b'name': b'command'}")))
234 238 result = reactor.oncommandresponseready(outstream, 1, b'response3')
235 239 self.assertaction(result, b'sendframes')
236 240 list(result[1][b'framegen'])
237 241
238 242 for i in range(3):
239 243 self.assertaction(results[i], b'runcommand')
240 244 self.assertEqual(results[i][1], {
241 245 b'requestid': 1,
242 246 b'command': b'command',
243 247 b'args': {},
244 248 b'redirect': None,
245 249 b'data': None,
246 250 })
247 251
248 252 def testconflictingrequestid(self):
249 253 """Request ID for new command matching in-flight command is illegal."""
250 254 results = list(sendframes(makereactor(), [
251 255 ffs(b'1 1 stream-begin command-request new|more '
252 256 b"cbor:{b'name': b'command'}"),
253 257 ffs(b'1 1 0 command-request new '
254 258 b"cbor:{b'name': b'command1'}"),
255 259 ]))
256 260
257 261 self.assertaction(results[0], b'wantframe')
258 262 self.assertaction(results[1], b'error')
259 263 self.assertEqual(results[1][1], {
260 264 b'message': b'request with ID 1 already received',
261 265 })
262 266
263 267 def testinterleavedcommands(self):
264 268 cbor1 = cbor.dumps({
265 269 b'name': b'command1',
266 270 b'args': {
267 271 b'foo': b'bar',
268 272 b'key1': b'val',
269 273 }
270 274 }, canonical=True)
271 275 cbor3 = cbor.dumps({
272 276 b'name': b'command3',
273 277 b'args': {
274 278 b'biz': b'baz',
275 279 b'key': b'val',
276 280 },
277 281 }, canonical=True)
278 282
279 283 results = list(sendframes(makereactor(), [
280 284 ffs(b'1 1 stream-begin command-request new|more %s' % cbor1[0:6]),
281 285 ffs(b'3 1 0 command-request new|more %s' % cbor3[0:10]),
282 286 ffs(b'1 1 0 command-request continuation|more %s' % cbor1[6:9]),
283 287 ffs(b'3 1 0 command-request continuation|more %s' % cbor3[10:13]),
284 288 ffs(b'3 1 0 command-request continuation %s' % cbor3[13:]),
285 289 ffs(b'1 1 0 command-request continuation %s' % cbor1[9:]),
286 290 ]))
287 291
288 292 self.assertEqual([t[0] for t in results], [
289 293 b'wantframe',
290 294 b'wantframe',
291 295 b'wantframe',
292 296 b'wantframe',
293 297 b'runcommand',
294 298 b'runcommand',
295 299 ])
296 300
297 301 self.assertEqual(results[4][1], {
298 302 b'requestid': 3,
299 303 b'command': b'command3',
300 304 b'args': {b'biz': b'baz', b'key': b'val'},
301 305 b'redirect': None,
302 306 b'data': None,
303 307 })
304 308 self.assertEqual(results[5][1], {
305 309 b'requestid': 1,
306 310 b'command': b'command1',
307 311 b'args': {b'foo': b'bar', b'key1': b'val'},
308 312 b'redirect': None,
309 313 b'data': None,
310 314 })
311 315
312 316 def testmissingcommanddataframe(self):
313 317 # The reactor doesn't currently handle partially received commands.
314 318 # So this test is failing to do anything with request 1.
315 319 frames = [
316 320 ffs(b'1 1 stream-begin command-request new|have-data '
317 321 b"cbor:{b'name': b'command1'}"),
318 322 ffs(b'3 1 0 command-request new '
319 323 b"cbor:{b'name': b'command2'}"),
320 324 ]
321 325 results = list(sendframes(makereactor(), frames))
322 326 self.assertEqual(len(results), 2)
323 327 self.assertaction(results[0], b'wantframe')
324 328 self.assertaction(results[1], b'runcommand')
325 329
326 330 def testmissingcommanddataframeflags(self):
327 331 frames = [
328 332 ffs(b'1 1 stream-begin command-request new|have-data '
329 333 b"cbor:{b'name': b'command1'}"),
330 334 ffs(b'1 1 0 command-data 0 data'),
331 335 ]
332 336 results = list(sendframes(makereactor(), frames))
333 337 self.assertEqual(len(results), 2)
334 338 self.assertaction(results[0], b'wantframe')
335 339 self.assertaction(results[1], b'error')
336 340 self.assertEqual(results[1][1], {
337 341 b'message': b'command data frame without flags',
338 342 })
339 343
340 344 def testframefornonreceivingrequest(self):
341 345 """Receiving a frame for a command that is not receiving is illegal."""
342 346 results = list(sendframes(makereactor(), [
343 347 ffs(b'1 1 stream-begin command-request new '
344 348 b"cbor:{b'name': b'command1'}"),
345 349 ffs(b'3 1 0 command-request new|have-data '
346 350 b"cbor:{b'name': b'command3'}"),
347 351 ffs(b'5 1 0 command-data eos ignored'),
348 352 ]))
349 353 self.assertaction(results[2], b'error')
350 354 self.assertEqual(results[2][1], {
351 355 b'message': b'received frame for request that is not receiving: 5',
352 356 })
353 357
354 358 def testsimpleresponse(self):
355 359 """Bytes response to command sends result frames."""
356 360 reactor = makereactor()
357 361 instream = framing.stream(1)
358 362 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
359 363
360 364 outstream = reactor.makeoutputstream()
361 365 result = reactor.oncommandresponseready(outstream, 1, b'response')
362 366 self.assertaction(result, b'sendframes')
363 367 self.assertframesequal(result[1][b'framegen'], [
364 368 b'1 2 stream-begin command-response eos %sresponse' % OK,
365 369 ])
366 370
367 371 def testmultiframeresponse(self):
368 372 """Bytes response spanning multiple frames is handled."""
369 373 first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
370 374 second = b'y' * 100
371 375
372 376 reactor = makereactor()
373 377 instream = framing.stream(1)
374 378 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
375 379
376 380 outstream = reactor.makeoutputstream()
377 381 result = reactor.oncommandresponseready(outstream, 1, first + second)
378 382 self.assertaction(result, b'sendframes')
379 383 self.assertframesequal(result[1][b'framegen'], [
380 384 b'1 2 stream-begin command-response continuation %s' % OK,
381 385 b'1 2 0 command-response continuation %s' % first,
382 386 b'1 2 0 command-response eos %s' % second,
383 387 ])
384 388
385 389 def testservererror(self):
386 390 reactor = makereactor()
387 391 instream = framing.stream(1)
388 392 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
389 393
390 394 outstream = reactor.makeoutputstream()
391 395 result = reactor.onservererror(outstream, 1, b'some message')
392 396 self.assertaction(result, b'sendframes')
393 397 self.assertframesequal(result[1][b'framegen'], [
394 398 b"1 2 stream-begin error-response 0 "
395 399 b"cbor:{b'type': b'server', "
396 400 b"b'message': [{b'msg': b'some message'}]}",
397 401 ])
398 402
399 403 def test1commanddeferresponse(self):
400 404 """Responses when in deferred output mode are delayed until EOF."""
401 405 reactor = makereactor(deferoutput=True)
402 406 instream = framing.stream(1)
403 407 results = list(sendcommandframes(reactor, instream, 1, b'mycommand',
404 408 {}))
405 409 self.assertEqual(len(results), 1)
406 410 self.assertaction(results[0], b'runcommand')
407 411
408 412 outstream = reactor.makeoutputstream()
409 413 result = reactor.oncommandresponseready(outstream, 1, b'response')
410 414 self.assertaction(result, b'noop')
411 415 result = reactor.oninputeof()
412 416 self.assertaction(result, b'sendframes')
413 417 self.assertframesequal(result[1][b'framegen'], [
414 418 b'1 2 stream-begin command-response eos %sresponse' % OK,
415 419 ])
416 420
417 421 def testmultiplecommanddeferresponse(self):
418 422 reactor = makereactor(deferoutput=True)
419 423 instream = framing.stream(1)
420 424 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
421 425 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
422 426
423 427 outstream = reactor.makeoutputstream()
424 428 result = reactor.oncommandresponseready(outstream, 1, b'response1')
425 429 self.assertaction(result, b'noop')
426 430 result = reactor.oncommandresponseready(outstream, 3, b'response2')
427 431 self.assertaction(result, b'noop')
428 432 result = reactor.oninputeof()
429 433 self.assertaction(result, b'sendframes')
430 434 self.assertframesequal(result[1][b'framegen'], [
431 435 b'1 2 stream-begin command-response eos %sresponse1' % OK,
432 436 b'3 2 0 command-response eos %sresponse2' % OK,
433 437 ])
434 438
435 439 def testrequestidtracking(self):
436 440 reactor = makereactor(deferoutput=True)
437 441 instream = framing.stream(1)
438 442 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
439 443 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
440 444 list(sendcommandframes(reactor, instream, 5, b'command3', {}))
441 445
442 446 # Register results for commands out of order.
443 447 outstream = reactor.makeoutputstream()
444 448 reactor.oncommandresponseready(outstream, 3, b'response3')
445 449 reactor.oncommandresponseready(outstream, 1, b'response1')
446 450 reactor.oncommandresponseready(outstream, 5, b'response5')
447 451
448 452 result = reactor.oninputeof()
449 453 self.assertaction(result, b'sendframes')
450 454 self.assertframesequal(result[1][b'framegen'], [
451 455 b'3 2 stream-begin command-response eos %sresponse3' % OK,
452 456 b'1 2 0 command-response eos %sresponse1' % OK,
453 457 b'5 2 0 command-response eos %sresponse5' % OK,
454 458 ])
455 459
456 460 def testduplicaterequestonactivecommand(self):
457 461 """Receiving a request ID that matches a request that isn't finished."""
458 462 reactor = makereactor()
459 463 stream = framing.stream(1)
460 464 list(sendcommandframes(reactor, stream, 1, b'command1', {}))
461 465 results = list(sendcommandframes(reactor, stream, 1, b'command1', {}))
462 466
463 467 self.assertaction(results[0], b'error')
464 468 self.assertEqual(results[0][1], {
465 469 b'message': b'request with ID 1 is already active',
466 470 })
467 471
468 472 def testduplicaterequestonactivecommandnosend(self):
469 473 """Same as above but we've registered a response but haven't sent it."""
470 474 reactor = makereactor()
471 475 instream = framing.stream(1)
472 476 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
473 477 outstream = reactor.makeoutputstream()
474 478 reactor.oncommandresponseready(outstream, 1, b'response')
475 479
476 480 # We've registered the response but haven't sent it. From the
477 481 # perspective of the reactor, the command is still active.
478 482
479 483 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
480 484 self.assertaction(results[0], b'error')
481 485 self.assertEqual(results[0][1], {
482 486 b'message': b'request with ID 1 is already active',
483 487 })
484 488
485 489 def testduplicaterequestaftersend(self):
486 490 """We can use a duplicate request ID after we've sent the response."""
487 491 reactor = makereactor()
488 492 instream = framing.stream(1)
489 493 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
490 494 outstream = reactor.makeoutputstream()
491 495 res = reactor.oncommandresponseready(outstream, 1, b'response')
492 496 list(res[1][b'framegen'])
493 497
494 498 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
495 499 self.assertaction(results[0], b'runcommand')
496 500
501 def testprotocolsettingsnoflags(self):
502 result = self._sendsingleframe(
503 makereactor(),
504 ffs(b'0 1 stream-begin sender-protocol-settings 0 '))
505 self.assertaction(result, b'error')
506 self.assertEqual(result[1], {
507 b'message': b'sender protocol settings frame must have '
508 b'continuation or end of stream flag set',
509 })
510
511 def testprotocolsettingsconflictflags(self):
512 result = self._sendsingleframe(
513 makereactor(),
514 ffs(b'0 1 stream-begin sender-protocol-settings continuation|eos '))
515 self.assertaction(result, b'error')
516 self.assertEqual(result[1], {
517 b'message': b'sender protocol settings frame cannot have both '
518 b'continuation and end of stream flags set',
519 })
520
521 def testprotocolsettingsemptypayload(self):
522 result = self._sendsingleframe(
523 makereactor(),
524 ffs(b'0 1 stream-begin sender-protocol-settings eos '))
525 self.assertaction(result, b'error')
526 self.assertEqual(result[1], {
527 b'message': b'sender protocol settings frame did not contain CBOR '
528 b'data',
529 })
530
531 def testprotocolsettingsmultipleobjects(self):
532 result = self._sendsingleframe(
533 makereactor(),
534 ffs(b'0 1 stream-begin sender-protocol-settings eos '
535 b'\x46foobar\x43foo'))
536 self.assertaction(result, b'error')
537 self.assertEqual(result[1], {
538 b'message': b'sender protocol settings frame contained multiple '
539 b'CBOR values',
540 })
541
542 def testprotocolsettingscontentencodings(self):
543 reactor = makereactor()
544
545 result = self._sendsingleframe(
546 reactor,
547 ffs(b'0 1 stream-begin sender-protocol-settings eos '
548 b'cbor:{b"contentencodings": [b"a", b"b"]}'))
549 self.assertaction(result, b'wantframe')
550
551 self.assertEqual(reactor._state, b'idle')
552 self.assertEqual(reactor._sendersettings[b'contentencodings'],
553 [b'a', b'b'])
554
555 def testprotocolsettingsmultipleframes(self):
556 reactor = makereactor()
557
558 data = b''.join(cborutil.streamencode({
559 b'contentencodings': [b'value1', b'value2'],
560 }))
561
562 results = list(sendframes(reactor, [
563 ffs(b'0 1 stream-begin sender-protocol-settings continuation %s' %
564 data[0:5]),
565 ffs(b'0 1 0 sender-protocol-settings eos %s' % data[5:]),
566 ]))
567
568 self.assertEqual(len(results), 2)
569
570 self.assertaction(results[0], b'wantframe')
571 self.assertaction(results[1], b'wantframe')
572
573 self.assertEqual(reactor._state, b'idle')
574 self.assertEqual(reactor._sendersettings[b'contentencodings'],
575 [b'value1', b'value2'])
576
577 def testprotocolsettingsbadcbor(self):
578 result = self._sendsingleframe(
579 makereactor(),
580 ffs(b'0 1 stream-begin sender-protocol-settings eos badvalue'))
581 self.assertaction(result, b'error')
582
583 def testprotocolsettingsnoninitial(self):
584 # Cannot have protocol settings frames as non-initial frames.
585 reactor = makereactor()
586
587 stream = framing.stream(1)
588 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
589 self.assertEqual(len(results), 1)
590 self.assertaction(results[0], b'runcommand')
591
592 result = self._sendsingleframe(
593 reactor,
594 ffs(b'0 1 0 sender-protocol-settings eos '))
595 self.assertaction(result, b'error')
596 self.assertEqual(result[1], {
597 b'message': b'expected command request frame; got 8',
598 })
599
497 600 if __name__ == '__main__':
498 601 import silenttestrunner
499 602 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now