##// END OF EJS Templates
wireprotov2: define and use stream encoders...
Gregory Szorc -
r40167:e6752241 default
parent child Browse files
Show More
@@ -1,1609 +1,1806 b''
1 1 # wireprotoframing.py - unified framing protocol for wire protocol
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 # This file contains functionality to support the unified frame-based wire
9 9 # protocol. For details about the protocol, see
10 10 # `hg help internals.wireprotocol`.
11 11
12 12 from __future__ import absolute_import
13 13
14 14 import collections
15 15 import struct
16 16
17 17 from .i18n import _
18 18 from .thirdparty import (
19 19 attr,
20 20 )
21 21 from . import (
22 22 encoding,
23 23 error,
24 24 pycompat,
25 25 util,
26 26 wireprototypes,
27 27 )
28 28 from .utils import (
29 29 cborutil,
30 30 stringutil,
31 31 )
32 32
33 33 FRAME_HEADER_SIZE = 8
34 34 DEFAULT_MAX_FRAME_SIZE = 32768
35 35
36 36 STREAM_FLAG_BEGIN_STREAM = 0x01
37 37 STREAM_FLAG_END_STREAM = 0x02
38 38 STREAM_FLAG_ENCODING_APPLIED = 0x04
39 39
40 40 STREAM_FLAGS = {
41 41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
42 42 b'stream-end': STREAM_FLAG_END_STREAM,
43 43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
44 44 }
45 45
46 46 FRAME_TYPE_COMMAND_REQUEST = 0x01
47 47 FRAME_TYPE_COMMAND_DATA = 0x02
48 48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
49 49 FRAME_TYPE_ERROR_RESPONSE = 0x05
50 50 FRAME_TYPE_TEXT_OUTPUT = 0x06
51 51 FRAME_TYPE_PROGRESS = 0x07
52 52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
53 53 FRAME_TYPE_STREAM_SETTINGS = 0x09
54 54
55 55 FRAME_TYPES = {
56 56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
57 57 b'command-data': FRAME_TYPE_COMMAND_DATA,
58 58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
59 59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
60 60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
61 61 b'progress': FRAME_TYPE_PROGRESS,
62 62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
63 63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
64 64 }
65 65
66 66 FLAG_COMMAND_REQUEST_NEW = 0x01
67 67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
68 68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
69 69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
70 70
71 71 FLAGS_COMMAND_REQUEST = {
72 72 b'new': FLAG_COMMAND_REQUEST_NEW,
73 73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
74 74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
75 75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
76 76 }
77 77
78 78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
79 79 FLAG_COMMAND_DATA_EOS = 0x02
80 80
81 81 FLAGS_COMMAND_DATA = {
82 82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
83 83 b'eos': FLAG_COMMAND_DATA_EOS,
84 84 }
85 85
86 86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
87 87 FLAG_COMMAND_RESPONSE_EOS = 0x02
88 88
89 89 FLAGS_COMMAND_RESPONSE = {
90 90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
91 91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
92 92 }
93 93
94 94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
95 95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
96 96
97 97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
98 98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
99 99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
100 100 }
101 101
102 102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
103 103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
104 104
105 105 FLAGS_STREAM_ENCODING_SETTINGS = {
106 106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
107 107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
108 108 }
109 109
110 110 # Maps frame types to their available flags.
111 111 FRAME_TYPE_FLAGS = {
112 112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
113 113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
114 114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
115 115 FRAME_TYPE_ERROR_RESPONSE: {},
116 116 FRAME_TYPE_TEXT_OUTPUT: {},
117 117 FRAME_TYPE_PROGRESS: {},
118 118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
119 119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
120 120 }
121 121
122 122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
123 123
124 124 def humanflags(mapping, value):
125 125 """Convert a numeric flags value to a human value, using a mapping table."""
126 126 namemap = {v: k for k, v in mapping.iteritems()}
127 127 flags = []
128 128 val = 1
129 129 while value >= val:
130 130 if value & val:
131 131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
132 132 val <<= 1
133 133
134 134 return b'|'.join(flags)
135 135
136 136 @attr.s(slots=True)
137 137 class frameheader(object):
138 138 """Represents the data in a frame header."""
139 139
140 140 length = attr.ib()
141 141 requestid = attr.ib()
142 142 streamid = attr.ib()
143 143 streamflags = attr.ib()
144 144 typeid = attr.ib()
145 145 flags = attr.ib()
146 146
147 147 @attr.s(slots=True, repr=False)
148 148 class frame(object):
149 149 """Represents a parsed frame."""
150 150
151 151 requestid = attr.ib()
152 152 streamid = attr.ib()
153 153 streamflags = attr.ib()
154 154 typeid = attr.ib()
155 155 flags = attr.ib()
156 156 payload = attr.ib()
157 157
158 158 @encoding.strmethod
159 159 def __repr__(self):
160 160 typename = '<unknown 0x%02x>' % self.typeid
161 161 for name, value in FRAME_TYPES.iteritems():
162 162 if value == self.typeid:
163 163 typename = name
164 164 break
165 165
166 166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
167 167 'type=%s; flags=%s)' % (
168 168 len(self.payload), self.requestid, self.streamid,
169 169 humanflags(STREAM_FLAGS, self.streamflags), typename,
170 170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
171 171
172 172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
173 173 """Assemble a frame into a byte array."""
174 174 # TODO assert size of payload.
175 175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
176 176
177 177 # 24 bits length
178 178 # 16 bits request id
179 179 # 8 bits stream id
180 180 # 8 bits stream flags
181 181 # 4 bits type
182 182 # 4 bits flags
183 183
184 184 l = struct.pack(r'<I', len(payload))
185 185 frame[0:3] = l[0:3]
186 186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
187 187 frame[7] = (typeid << 4) | flags
188 188 frame[8:] = payload
189 189
190 190 return frame
191 191
192 192 def makeframefromhumanstring(s):
193 193 """Create a frame from a human readable string
194 194
195 195 Strings have the form:
196 196
197 197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
198 198
199 199 This can be used by user-facing applications and tests for creating
200 200 frames easily without having to type out a bunch of constants.
201 201
202 202 Request ID and stream IDs are integers.
203 203
204 204 Stream flags, frame type, and flags can be specified by integer or
205 205 named constant.
206 206
207 207 Flags can be delimited by `|` to bitwise OR them together.
208 208
209 209 If the payload begins with ``cbor:``, the following string will be
210 210 evaluated as Python literal and the resulting object will be fed into
211 211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
212 212 byte string literal.
213 213 """
214 214 fields = s.split(b' ', 5)
215 215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
216 216
217 217 requestid = int(requestid)
218 218 streamid = int(streamid)
219 219
220 220 finalstreamflags = 0
221 221 for flag in streamflags.split(b'|'):
222 222 if flag in STREAM_FLAGS:
223 223 finalstreamflags |= STREAM_FLAGS[flag]
224 224 else:
225 225 finalstreamflags |= int(flag)
226 226
227 227 if frametype in FRAME_TYPES:
228 228 frametype = FRAME_TYPES[frametype]
229 229 else:
230 230 frametype = int(frametype)
231 231
232 232 finalflags = 0
233 233 validflags = FRAME_TYPE_FLAGS[frametype]
234 234 for flag in frameflags.split(b'|'):
235 235 if flag in validflags:
236 236 finalflags |= validflags[flag]
237 237 else:
238 238 finalflags |= int(flag)
239 239
240 240 if payload.startswith(b'cbor:'):
241 241 payload = b''.join(cborutil.streamencode(
242 242 stringutil.evalpythonliteral(payload[5:])))
243 243
244 244 else:
245 245 payload = stringutil.unescapestr(payload)
246 246
247 247 return makeframe(requestid=requestid, streamid=streamid,
248 248 streamflags=finalstreamflags, typeid=frametype,
249 249 flags=finalflags, payload=payload)
250 250
251 251 def parseheader(data):
252 252 """Parse a unified framing protocol frame header from a buffer.
253 253
254 254 The header is expected to be in the buffer at offset 0 and the
255 255 buffer is expected to be large enough to hold a full header.
256 256 """
257 257 # 24 bits payload length (little endian)
258 258 # 16 bits request ID
259 259 # 8 bits stream ID
260 260 # 8 bits stream flags
261 261 # 4 bits frame type
262 262 # 4 bits frame flags
263 263 # ... payload
264 264 framelength = data[0] + 256 * data[1] + 16384 * data[2]
265 265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
266 266 typeflags = data[7]
267 267
268 268 frametype = (typeflags & 0xf0) >> 4
269 269 frameflags = typeflags & 0x0f
270 270
271 271 return frameheader(framelength, requestid, streamid, streamflags,
272 272 frametype, frameflags)
273 273
274 274 def readframe(fh):
275 275 """Read a unified framing protocol frame from a file object.
276 276
277 277 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
278 278 None if no frame is available. May raise if a malformed frame is
279 279 seen.
280 280 """
281 281 header = bytearray(FRAME_HEADER_SIZE)
282 282
283 283 readcount = fh.readinto(header)
284 284
285 285 if readcount == 0:
286 286 return None
287 287
288 288 if readcount != FRAME_HEADER_SIZE:
289 289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
290 290 (readcount, header))
291 291
292 292 h = parseheader(header)
293 293
294 294 payload = fh.read(h.length)
295 295 if len(payload) != h.length:
296 296 raise error.Abort(_('frame length error: expected %d; got %d') %
297 297 (h.length, len(payload)))
298 298
299 299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
300 300 payload)
301 301
302 302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
303 303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
304 304 redirect=None):
305 305 """Create frames necessary to transmit a request to run a command.
306 306
307 307 This is a generator of bytearrays. Each item represents a frame
308 308 ready to be sent over the wire to a peer.
309 309 """
310 310 data = {b'name': cmd}
311 311 if args:
312 312 data[b'args'] = args
313 313
314 314 if redirect:
315 315 data[b'redirect'] = redirect
316 316
317 317 data = b''.join(cborutil.streamencode(data))
318 318
319 319 offset = 0
320 320
321 321 while True:
322 322 flags = 0
323 323
324 324 # Must set new or continuation flag.
325 325 if not offset:
326 326 flags |= FLAG_COMMAND_REQUEST_NEW
327 327 else:
328 328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
329 329
330 330 # Data frames is set on all frames.
331 331 if datafh:
332 332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
333 333
334 334 payload = data[offset:offset + maxframesize]
335 335 offset += len(payload)
336 336
337 337 if len(payload) == maxframesize and offset < len(data):
338 338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
339 339
340 340 yield stream.makeframe(requestid=requestid,
341 341 typeid=FRAME_TYPE_COMMAND_REQUEST,
342 342 flags=flags,
343 343 payload=payload)
344 344
345 345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
346 346 break
347 347
348 348 if datafh:
349 349 while True:
350 350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
351 351
352 352 done = False
353 353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
354 354 flags = FLAG_COMMAND_DATA_CONTINUATION
355 355 else:
356 356 flags = FLAG_COMMAND_DATA_EOS
357 357 assert datafh.read(1) == b''
358 358 done = True
359 359
360 360 yield stream.makeframe(requestid=requestid,
361 361 typeid=FRAME_TYPE_COMMAND_DATA,
362 362 flags=flags,
363 363 payload=data)
364 364
365 365 if done:
366 366 break
367 367
368 368 def createcommandresponseframesfrombytes(stream, requestid, data,
369 369 maxframesize=DEFAULT_MAX_FRAME_SIZE):
370 370 """Create a raw frame to send a bytes response from static bytes input.
371 371
372 372 Returns a generator of bytearrays.
373 373 """
374 374 # Automatically send the overall CBOR response map.
375 375 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
376 376 if len(overall) > maxframesize:
377 377 raise error.ProgrammingError('not yet implemented')
378 378
379 379 # Simple case where we can fit the full response in a single frame.
380 380 if len(overall) + len(data) <= maxframesize:
381 381 flags = FLAG_COMMAND_RESPONSE_EOS
382 382 yield stream.makeframe(requestid=requestid,
383 383 typeid=FRAME_TYPE_COMMAND_RESPONSE,
384 384 flags=flags,
385 385 payload=overall + data)
386 386 return
387 387
388 388 # It's easier to send the overall CBOR map in its own frame than to track
389 389 # offsets.
390 390 yield stream.makeframe(requestid=requestid,
391 391 typeid=FRAME_TYPE_COMMAND_RESPONSE,
392 392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
393 393 payload=overall)
394 394
395 395 offset = 0
396 396 while True:
397 397 chunk = data[offset:offset + maxframesize]
398 398 offset += len(chunk)
399 399 done = offset == len(data)
400 400
401 401 if done:
402 402 flags = FLAG_COMMAND_RESPONSE_EOS
403 403 else:
404 404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
405 405
406 406 yield stream.makeframe(requestid=requestid,
407 407 typeid=FRAME_TYPE_COMMAND_RESPONSE,
408 408 flags=flags,
409 409 payload=chunk)
410 410
411 411 if done:
412 412 break
413 413
414 414 def createbytesresponseframesfromgen(stream, requestid, gen,
415 415 maxframesize=DEFAULT_MAX_FRAME_SIZE):
416 416 """Generator of frames from a generator of byte chunks.
417 417
418 418 This assumes that another frame will follow whatever this emits. i.e.
419 419 this always emits the continuation flag and never emits the end-of-stream
420 420 flag.
421 421 """
422 422 cb = util.chunkbuffer(gen)
423 423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
424 424
425 425 while True:
426 426 chunk = cb.read(maxframesize)
427 427 if not chunk:
428 428 break
429 429
430 430 yield stream.makeframe(requestid=requestid,
431 431 typeid=FRAME_TYPE_COMMAND_RESPONSE,
432 432 flags=flags,
433 433 payload=chunk)
434 434
435 435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
436 436
437 437 def createcommandresponseokframe(stream, requestid):
438 438 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
439 439
440 440 return stream.makeframe(requestid=requestid,
441 441 typeid=FRAME_TYPE_COMMAND_RESPONSE,
442 442 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
443 443 payload=overall)
444 444
445 445 def createcommandresponseeosframe(stream, requestid):
446 446 """Create an empty payload frame representing command end-of-stream."""
447 447 return stream.makeframe(requestid=requestid,
448 448 typeid=FRAME_TYPE_COMMAND_RESPONSE,
449 449 flags=FLAG_COMMAND_RESPONSE_EOS,
450 450 payload=b'')
451 451
452 452 def createalternatelocationresponseframe(stream, requestid, location):
453 453 data = {
454 454 b'status': b'redirect',
455 455 b'location': {
456 456 b'url': location.url,
457 457 b'mediatype': location.mediatype,
458 458 }
459 459 }
460 460
461 461 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
462 462 r'servercadercerts'):
463 463 value = getattr(location, a)
464 464 if value is not None:
465 465 data[b'location'][pycompat.bytestr(a)] = value
466 466
467 467 return stream.makeframe(requestid=requestid,
468 468 typeid=FRAME_TYPE_COMMAND_RESPONSE,
469 469 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
470 470 payload=b''.join(cborutil.streamencode(data)))
471 471
472 472 def createcommanderrorresponse(stream, requestid, message, args=None):
473 473 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
474 474 # formatting works consistently?
475 475 m = {
476 476 b'status': b'error',
477 477 b'error': {
478 478 b'message': message,
479 479 }
480 480 }
481 481
482 482 if args:
483 483 m[b'error'][b'args'] = args
484 484
485 485 overall = b''.join(cborutil.streamencode(m))
486 486
487 487 yield stream.makeframe(requestid=requestid,
488 488 typeid=FRAME_TYPE_COMMAND_RESPONSE,
489 489 flags=FLAG_COMMAND_RESPONSE_EOS,
490 490 payload=overall)
491 491
492 492 def createerrorframe(stream, requestid, msg, errtype):
493 493 # TODO properly handle frame size limits.
494 494 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
495 495
496 496 payload = b''.join(cborutil.streamencode({
497 497 b'type': errtype,
498 498 b'message': [{b'msg': msg}],
499 499 }))
500 500
501 501 yield stream.makeframe(requestid=requestid,
502 502 typeid=FRAME_TYPE_ERROR_RESPONSE,
503 503 flags=0,
504 504 payload=payload)
505 505
506 506 def createtextoutputframe(stream, requestid, atoms,
507 507 maxframesize=DEFAULT_MAX_FRAME_SIZE):
508 508 """Create a text output frame to render text to people.
509 509
510 510 ``atoms`` is a 3-tuple of (formatting string, args, labels).
511 511
512 512 The formatting string contains ``%s`` tokens to be replaced by the
513 513 corresponding indexed entry in ``args``. ``labels`` is an iterable of
514 514 formatters to be applied at rendering time. In terms of the ``ui``
515 515 class, each atom corresponds to a ``ui.write()``.
516 516 """
517 517 atomdicts = []
518 518
519 519 for (formatting, args, labels) in atoms:
520 520 # TODO look for localstr, other types here?
521 521
522 522 if not isinstance(formatting, bytes):
523 523 raise ValueError('must use bytes formatting strings')
524 524 for arg in args:
525 525 if not isinstance(arg, bytes):
526 526 raise ValueError('must use bytes for arguments')
527 527 for label in labels:
528 528 if not isinstance(label, bytes):
529 529 raise ValueError('must use bytes for labels')
530 530
531 531 # Formatting string must be ASCII.
532 532 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
533 533
534 534 # Arguments must be UTF-8.
535 535 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
536 536
537 537 # Labels must be ASCII.
538 538 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
539 539 for l in labels]
540 540
541 541 atom = {b'msg': formatting}
542 542 if args:
543 543 atom[b'args'] = args
544 544 if labels:
545 545 atom[b'labels'] = labels
546 546
547 547 atomdicts.append(atom)
548 548
549 549 payload = b''.join(cborutil.streamencode(atomdicts))
550 550
551 551 if len(payload) > maxframesize:
552 552 raise ValueError('cannot encode data in a single frame')
553 553
554 554 yield stream.makeframe(requestid=requestid,
555 555 typeid=FRAME_TYPE_TEXT_OUTPUT,
556 556 flags=0,
557 557 payload=payload)
558 558
559 559 class bufferingcommandresponseemitter(object):
560 560 """Helper object to emit command response frames intelligently.
561 561
562 562 Raw command response data is likely emitted in chunks much smaller
563 563 than what can fit in a single frame. This class exists to buffer
564 564 chunks until enough data is available to fit in a single frame.
565 565
566 566 TODO we'll need something like this when compression is supported.
567 567 So it might make sense to implement this functionality at the stream
568 568 level.
569 569 """
570 570 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
571 571 self._stream = stream
572 572 self._requestid = requestid
573 573 self._maxsize = maxframesize
574 574 self._chunks = []
575 575 self._chunkssize = 0
576 576
577 577 def send(self, data):
578 578 """Send new data for emission.
579 579
580 580 Is a generator of new frames that were derived from the new input.
581 581
582 582 If the special input ``None`` is received, flushes all buffered
583 583 data to frames.
584 584 """
585 585
586 586 if data is None:
587 587 for frame in self._flush():
588 588 yield frame
589 589 return
590 590
591 591 # There is a ton of potential to do more complicated things here.
592 592 # Our immediate goal is to coalesce small chunks into big frames,
593 593 # not achieve the fewest number of frames possible. So we go with
594 594 # a simple implementation:
595 595 #
596 596 # * If a chunk is too large for a frame, we flush and emit frames
597 597 # for the new chunk.
598 598 # * If a chunk can be buffered without total buffered size limits
599 599 # being exceeded, we do that.
600 600 # * If a chunk causes us to go over our buffering limit, we flush
601 601 # and then buffer the new chunk.
602 602
603 603 if len(data) > self._maxsize:
604 604 for frame in self._flush():
605 605 yield frame
606 606
607 607 # Now emit frames for the big chunk.
608 608 offset = 0
609 609 while True:
610 610 chunk = data[offset:offset + self._maxsize]
611 611 offset += len(chunk)
612 612
613 613 yield self._stream.makeframe(
614 614 self._requestid,
615 615 typeid=FRAME_TYPE_COMMAND_RESPONSE,
616 616 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
617 617 payload=chunk)
618 618
619 619 if offset == len(data):
620 620 return
621 621
622 622 # If we don't have enough to constitute a full frame, buffer and
623 623 # return.
624 624 if len(data) + self._chunkssize < self._maxsize:
625 625 self._chunks.append(data)
626 626 self._chunkssize += len(data)
627 627 return
628 628
629 629 # Else flush what we have and buffer the new chunk. We could do
630 630 # something more intelligent here, like break the chunk. Let's
631 631 # keep things simple for now.
632 632 for frame in self._flush():
633 633 yield frame
634 634
635 635 self._chunks.append(data)
636 636 self._chunkssize = len(data)
637 637
638 638 def _flush(self):
639 639 payload = b''.join(self._chunks)
640 640 assert len(payload) <= self._maxsize
641 641
642 642 self._chunks[:] = []
643 643 self._chunkssize = 0
644 644
645 645 yield self._stream.makeframe(
646 646 self._requestid,
647 647 typeid=FRAME_TYPE_COMMAND_RESPONSE,
648 648 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
649 649 payload=payload)
650 650
651 # TODO consider defining encoders/decoders using the util.compressionengine
652 # mechanism.
653
654 class identityencoder(object):
655 """Encoder for the "identity" stream encoding profile."""
656 def __init__(self, ui):
657 pass
658
659 def encode(self, data):
660 return data
661
662 def flush(self):
663 return b''
664
665 def finish(self):
666 return b''
667
668 class identitydecoder(object):
669 """Decoder for the "identity" stream encoding profile."""
670
671 def __init__(self, ui, extraobjs):
672 if extraobjs:
673 raise error.Abort(_('identity decoder received unexpected '
674 'additional values'))
675
676 def decode(self, data):
677 return data
678
679 class zlibencoder(object):
680 def __init__(self, ui):
681 import zlib
682 self._zlib = zlib
683 self._compressor = zlib.compressobj()
684
685 def encode(self, data):
686 return self._compressor.compress(data)
687
688 def flush(self):
689 # Z_SYNC_FLUSH doesn't reset compression context, which is
690 # what we want.
691 return self._compressor.flush(self._zlib.Z_SYNC_FLUSH)
692
693 def finish(self):
694 res = self._compressor.flush(self._zlib.Z_FINISH)
695 self._compressor = None
696 return res
697
698 class zlibdecoder(object):
699 def __init__(self, ui, extraobjs):
700 import zlib
701
702 if extraobjs:
703 raise error.Abort(_('zlib decoder received unexpected '
704 'additional values'))
705
706 self._decompressor = zlib.decompressobj()
707
708 def decode(self, data):
709 # Python 2's zlib module doesn't use the buffer protocol and can't
710 # handle all bytes-like types.
711 if not pycompat.ispy3 and isinstance(data, bytearray):
712 data = bytes(data)
713
714 return self._decompressor.decompress(data)
715
716 class zstdbaseencoder(object):
717 def __init__(self, level):
718 from . import zstd
719
720 self._zstd = zstd
721 cctx = zstd.ZstdCompressor(level=level)
722 self._compressor = cctx.compressobj()
723
724 def encode(self, data):
725 return self._compressor.compress(data)
726
727 def flush(self):
728 # COMPRESSOBJ_FLUSH_BLOCK flushes all data previously fed into the
729 # compressor and allows a decompressor to access all encoded data
730 # up to this point.
731 return self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_BLOCK)
732
733 def finish(self):
734 res = self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_FINISH)
735 self._compressor = None
736 return res
737
738 class zstd8mbencoder(zstdbaseencoder):
739 def __init__(self, ui):
740 super(zstd8mbencoder, self).__init__(3)
741
742 class zstdbasedecoder(object):
743 def __init__(self, maxwindowsize):
744 from . import zstd
745 dctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize)
746 self._decompressor = dctx.decompressobj()
747
748 def decode(self, data):
749 return self._decompressor.decompress(data)
750
751 class zstd8mbdecoder(zstdbasedecoder):
752 def __init__(self, ui, extraobjs):
753 if extraobjs:
754 raise error.Abort(_('zstd8mb decoder received unexpected '
755 'additional values'))
756
757 super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576)
758
759 # We lazily populate this to avoid excessive module imports when importing
760 # this module.
761 STREAM_ENCODERS = {}
762 STREAM_ENCODERS_ORDER = []
763
764 def populatestreamencoders():
765 if STREAM_ENCODERS:
766 return
767
768 try:
769 from . import zstd
770 zstd.__version__
771 except ImportError:
772 zstd = None
773
774 # zstandard is fastest and is preferred.
775 if zstd:
776 STREAM_ENCODERS[b'zstd-8mb'] = (zstd8mbencoder, zstd8mbdecoder)
777 STREAM_ENCODERS_ORDER.append(b'zstd-8mb')
778
779 STREAM_ENCODERS[b'zlib'] = (zlibencoder, zlibdecoder)
780 STREAM_ENCODERS_ORDER.append(b'zlib')
781
782 STREAM_ENCODERS[b'identity'] = (identityencoder, identitydecoder)
783 STREAM_ENCODERS_ORDER.append(b'identity')
784
651 785 class stream(object):
652 786 """Represents a logical unidirectional series of frames."""
653 787
654 788 def __init__(self, streamid, active=False):
655 789 self.streamid = streamid
656 790 self._active = active
657 791
658 792 def makeframe(self, requestid, typeid, flags, payload):
659 793 """Create a frame to be sent out over this stream.
660 794
661 795 Only returns the frame instance. Does not actually send it.
662 796 """
663 797 streamflags = 0
664 798 if not self._active:
665 799 streamflags |= STREAM_FLAG_BEGIN_STREAM
666 800 self._active = True
667 801
668 802 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
669 803 payload)
670 804
671 805 class inputstream(stream):
672 806 """Represents a stream used for receiving data."""
673 807
674 def setdecoder(self, name, extraobjs):
808 def __init__(self, streamid, active=False):
809 super(inputstream, self).__init__(streamid, active=active)
810 self._decoder = None
811
812 def setdecoder(self, ui, name, extraobjs):
675 813 """Set the decoder for this stream.
676 814
677 815 Receives the stream profile name and any additional CBOR objects
678 816 decoded from the stream encoding settings frame payloads.
679 817 """
818 if name not in STREAM_ENCODERS:
819 raise error.Abort(_('unknown stream decoder: %s') % name)
820
821 self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs)
822
823 def decode(self, data):
824 # Default is identity decoder. We don't bother instantiating one
825 # because it is trivial.
826 if not self._decoder:
827 return data
828
829 return self._decoder.decode(data)
830
831 def flush(self):
832 if not self._decoder:
833 return b''
834
835 return self._decoder.flush()
680 836
681 837 class outputstream(stream):
682 838 """Represents a stream used for sending data."""
683 839
840 def __init__(self, streamid, active=False):
841 super(outputstream, self).__init__(streamid, active=active)
842 self._encoder = None
843
844 def setencoder(self, ui, name):
845 """Set the encoder for this stream.
846
847 Receives the stream profile name.
848 """
849 if name not in STREAM_ENCODERS:
850 raise error.Abort(_('unknown stream encoder: %s') % name)
851
852 self._encoder = STREAM_ENCODERS[name][0](ui)
853
854 def encode(self, data):
855 if not self._encoder:
856 return data
857
858 return self._encoder.encode(data)
859
860 def flush(self):
861 if not self._encoder:
862 return b''
863
864 return self._encoder.flush()
865
866 def finish(self):
867 if not self._encoder:
868 return b''
869
870 self._encoder.finish()
871
684 872 def ensureserverstream(stream):
685 873 if stream.streamid % 2:
686 874 raise error.ProgrammingError('server should only write to even '
687 875 'numbered streams; %d is not even' %
688 876 stream.streamid)
689 877
690 878 DEFAULT_PROTOCOL_SETTINGS = {
691 879 'contentencodings': [b'identity'],
692 880 }
693 881
694 882 class serverreactor(object):
695 883 """Holds state of a server handling frame-based protocol requests.
696 884
697 885 This class is the "brain" of the unified frame-based protocol server
698 886 component. While the protocol is stateless from the perspective of
699 887 requests/commands, something needs to track which frames have been
700 888 received, what frames to expect, etc. This class is that thing.
701 889
702 890 Instances are modeled as a state machine of sorts. Instances are also
703 891 reactionary to external events. The point of this class is to encapsulate
704 892 the state of the connection and the exchange of frames, not to perform
705 893 work. Instead, callers tell this class when something occurs, like a
706 894 frame arriving. If that activity is worthy of a follow-up action (say
707 895 *run a command*), the return value of that handler will say so.
708 896
709 897 I/O and CPU intensive operations are purposefully delegated outside of
710 898 this class.
711 899
712 900 Consumers are expected to tell instances when events occur. They do so by
713 901 calling the various ``on*`` methods. These methods return a 2-tuple
714 902 describing any follow-up action(s) to take. The first element is the
715 903 name of an action to perform. The second is a data structure (usually
716 904 a dict) specific to that action that contains more information. e.g.
717 905 if the server wants to send frames back to the client, the data structure
718 906 will contain a reference to those frames.
719 907
720 908 Valid actions that consumers can be instructed to take are:
721 909
722 910 sendframes
723 911 Indicates that frames should be sent to the client. The ``framegen``
724 912 key contains a generator of frames that should be sent. The server
725 913 assumes that all frames are sent to the client.
726 914
727 915 error
728 916 Indicates that an error occurred. Consumer should probably abort.
729 917
730 918 runcommand
731 919 Indicates that the consumer should run a wire protocol command. Details
732 920 of the command to run are given in the data structure.
733 921
734 922 wantframe
735 923 Indicates that nothing of interest happened and the server is waiting on
736 924 more frames from the client before anything interesting can be done.
737 925
738 926 noop
739 927 Indicates no additional action is required.
740 928
741 929 Known Issues
742 930 ------------
743 931
744 932 There are no limits to the number of partially received commands or their
745 933 size. A malicious client could stream command request data and exhaust the
746 934 server's memory.
747 935
748 936 Partially received commands are not acted upon when end of input is
749 937 reached. Should the server error if it receives a partial request?
750 938 Should the client send a message to abort a partially transmitted request
751 939 to facilitate graceful shutdown?
752 940
753 941 Active requests that haven't been responded to aren't tracked. This means
754 942 that if we receive a command and instruct its dispatch, another command
755 943 with its request ID can come in over the wire and there will be a race
756 944 between who responds to what.
757 945 """
758 946
759 947 def __init__(self, ui, deferoutput=False):
760 948 """Construct a new server reactor.
761 949
762 950 ``deferoutput`` can be used to indicate that no output frames should be
763 951 instructed to be sent until input has been exhausted. In this mode,
764 952 events that would normally generate output frames (such as a command
765 953 response being ready) will instead defer instructing the consumer to
766 954 send those frames. This is useful for half-duplex transports where the
767 955 sender cannot receive until all data has been transmitted.
768 956 """
769 957 self._ui = ui
770 958 self._deferoutput = deferoutput
771 959 self._state = 'initial'
772 960 self._nextoutgoingstreamid = 2
773 961 self._bufferedframegens = []
774 962 # stream id -> stream instance for all active streams from the client.
775 963 self._incomingstreams = {}
776 964 self._outgoingstreams = {}
777 965 # request id -> dict of commands that are actively being received.
778 966 self._receivingcommands = {}
779 967 # Request IDs that have been received and are actively being processed.
780 968 # Once all output for a request has been sent, it is removed from this
781 969 # set.
782 970 self._activecommands = set()
783 971
784 972 self._protocolsettingsdecoder = None
785 973
786 974 # Sender protocol settings are optional. Set implied default values.
787 975 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
788 976
977 populatestreamencoders()
978
789 979 def onframerecv(self, frame):
790 980 """Process a frame that has been received off the wire.
791 981
792 982 Returns a dict with an ``action`` key that details what action,
793 983 if any, the consumer should take next.
794 984 """
795 985 if not frame.streamid % 2:
796 986 self._state = 'errored'
797 987 return self._makeerrorresult(
798 988 _('received frame with even numbered stream ID: %d') %
799 989 frame.streamid)
800 990
801 991 if frame.streamid not in self._incomingstreams:
802 992 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
803 993 self._state = 'errored'
804 994 return self._makeerrorresult(
805 995 _('received frame on unknown inactive stream without '
806 996 'beginning of stream flag set'))
807 997
808 998 self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
809 999
810 1000 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
811 1001 # TODO handle decoding frames
812 1002 self._state = 'errored'
813 1003 raise error.ProgrammingError('support for decoding stream payloads '
814 1004 'not yet implemented')
815 1005
816 1006 if frame.streamflags & STREAM_FLAG_END_STREAM:
817 1007 del self._incomingstreams[frame.streamid]
818 1008
819 1009 handlers = {
820 1010 'initial': self._onframeinitial,
821 1011 'protocol-settings-receiving': self._onframeprotocolsettings,
822 1012 'idle': self._onframeidle,
823 1013 'command-receiving': self._onframecommandreceiving,
824 1014 'errored': self._onframeerrored,
825 1015 }
826 1016
827 1017 meth = handlers.get(self._state)
828 1018 if not meth:
829 1019 raise error.ProgrammingError('unhandled state: %s' % self._state)
830 1020
831 1021 return meth(frame)
832 1022
833 1023 def oncommandresponseready(self, stream, requestid, data):
834 1024 """Signal that a bytes response is ready to be sent to the client.
835 1025
836 1026 The raw bytes response is passed as an argument.
837 1027 """
838 1028 ensureserverstream(stream)
839 1029
840 1030 def sendframes():
841 1031 for frame in createcommandresponseframesfrombytes(stream, requestid,
842 1032 data):
843 1033 yield frame
844 1034
845 1035 self._activecommands.remove(requestid)
846 1036
847 1037 result = sendframes()
848 1038
849 1039 if self._deferoutput:
850 1040 self._bufferedframegens.append(result)
851 1041 return 'noop', {}
852 1042 else:
853 1043 return 'sendframes', {
854 1044 'framegen': result,
855 1045 }
856 1046
857 1047 def oncommandresponsereadyobjects(self, stream, requestid, objs):
858 1048 """Signal that objects are ready to be sent to the client.
859 1049
860 1050 ``objs`` is an iterable of objects (typically a generator) that will
861 1051 be encoded via CBOR and added to frames, which will be sent to the
862 1052 client.
863 1053 """
864 1054 ensureserverstream(stream)
865 1055
866 1056 # We need to take care over exception handling. Uncaught exceptions
867 1057 # when generating frames could lead to premature end of the frame
868 1058 # stream and the possibility of the server or client process getting
869 1059 # in a bad state.
870 1060 #
871 1061 # Keep in mind that if ``objs`` is a generator, advancing it could
872 1062 # raise exceptions that originated in e.g. wire protocol command
873 1063 # functions. That is why we differentiate between exceptions raised
874 1064 # when iterating versus other exceptions that occur.
875 1065 #
876 1066 # In all cases, when the function finishes, the request is fully
877 1067 # handled and no new frames for it should be seen.
878 1068
879 1069 def sendframes():
880 1070 emitted = False
881 1071 alternatelocationsent = False
882 1072 emitter = bufferingcommandresponseemitter(stream, requestid)
883 1073 while True:
884 1074 try:
885 1075 o = next(objs)
886 1076 except StopIteration:
887 1077 for frame in emitter.send(None):
888 1078 yield frame
889 1079
890 1080 if emitted:
891 1081 yield createcommandresponseeosframe(stream, requestid)
892 1082 break
893 1083
894 1084 except error.WireprotoCommandError as e:
895 1085 for frame in createcommanderrorresponse(
896 1086 stream, requestid, e.message, e.messageargs):
897 1087 yield frame
898 1088 break
899 1089
900 1090 except Exception as e:
901 1091 for frame in createerrorframe(
902 1092 stream, requestid, '%s' % stringutil.forcebytestr(e),
903 1093 errtype='server'):
904 1094
905 1095 yield frame
906 1096
907 1097 break
908 1098
909 1099 try:
910 1100 # Alternate location responses can only be the first and
911 1101 # only object in the output stream.
912 1102 if isinstance(o, wireprototypes.alternatelocationresponse):
913 1103 if emitted:
914 1104 raise error.ProgrammingError(
915 1105 'alternatelocationresponse seen after initial '
916 1106 'output object')
917 1107
918 1108 yield createalternatelocationresponseframe(
919 1109 stream, requestid, o)
920 1110
921 1111 alternatelocationsent = True
922 1112 emitted = True
923 1113 continue
924 1114
925 1115 if alternatelocationsent:
926 1116 raise error.ProgrammingError(
927 1117 'object follows alternatelocationresponse')
928 1118
929 1119 if not emitted:
930 1120 yield createcommandresponseokframe(stream, requestid)
931 1121 emitted = True
932 1122
933 1123 # Objects emitted by command functions can be serializable
934 1124 # data structures or special types.
935 1125 # TODO consider extracting the content normalization to a
936 1126 # standalone function, as it may be useful for e.g. cachers.
937 1127
938 1128 # A pre-encoded object is sent directly to the emitter.
939 1129 if isinstance(o, wireprototypes.encodedresponse):
940 1130 for frame in emitter.send(o.data):
941 1131 yield frame
942 1132
943 1133 # A regular object is CBOR encoded.
944 1134 else:
945 1135 for chunk in cborutil.streamencode(o):
946 1136 for frame in emitter.send(chunk):
947 1137 yield frame
948 1138
949 1139 except Exception as e:
950 1140 for frame in createerrorframe(stream, requestid,
951 1141 '%s' % e,
952 1142 errtype='server'):
953 1143 yield frame
954 1144
955 1145 break
956 1146
957 1147 self._activecommands.remove(requestid)
958 1148
959 1149 return self._handlesendframes(sendframes())
960 1150
961 1151 def oninputeof(self):
962 1152 """Signals that end of input has been received.
963 1153
964 1154 No more frames will be received. All pending activity should be
965 1155 completed.
966 1156 """
967 1157 # TODO should we do anything about in-flight commands?
968 1158
969 1159 if not self._deferoutput or not self._bufferedframegens:
970 1160 return 'noop', {}
971 1161
972 1162 # If we buffered all our responses, emit those.
973 1163 def makegen():
974 1164 for gen in self._bufferedframegens:
975 1165 for frame in gen:
976 1166 yield frame
977 1167
978 1168 return 'sendframes', {
979 1169 'framegen': makegen(),
980 1170 }
981 1171
982 1172 def _handlesendframes(self, framegen):
983 1173 if self._deferoutput:
984 1174 self._bufferedframegens.append(framegen)
985 1175 return 'noop', {}
986 1176 else:
987 1177 return 'sendframes', {
988 1178 'framegen': framegen,
989 1179 }
990 1180
991 1181 def onservererror(self, stream, requestid, msg):
992 1182 ensureserverstream(stream)
993 1183
994 1184 def sendframes():
995 1185 for frame in createerrorframe(stream, requestid, msg,
996 1186 errtype='server'):
997 1187 yield frame
998 1188
999 1189 self._activecommands.remove(requestid)
1000 1190
1001 1191 return self._handlesendframes(sendframes())
1002 1192
1003 1193 def oncommanderror(self, stream, requestid, message, args=None):
1004 1194 """Called when a command encountered an error before sending output."""
1005 1195 ensureserverstream(stream)
1006 1196
1007 1197 def sendframes():
1008 1198 for frame in createcommanderrorresponse(stream, requestid, message,
1009 1199 args):
1010 1200 yield frame
1011 1201
1012 1202 self._activecommands.remove(requestid)
1013 1203
1014 1204 return self._handlesendframes(sendframes())
1015 1205
1016 1206 def makeoutputstream(self):
1017 1207 """Create a stream to be used for sending data to the client."""
1018 1208 streamid = self._nextoutgoingstreamid
1019 1209 self._nextoutgoingstreamid += 2
1020 1210
1021 1211 s = outputstream(streamid)
1022 1212 self._outgoingstreams[streamid] = s
1023 1213
1024 1214 return s
1025 1215
1026 1216 def _makeerrorresult(self, msg):
1027 1217 return 'error', {
1028 1218 'message': msg,
1029 1219 }
1030 1220
1031 1221 def _makeruncommandresult(self, requestid):
1032 1222 entry = self._receivingcommands[requestid]
1033 1223
1034 1224 if not entry['requestdone']:
1035 1225 self._state = 'errored'
1036 1226 raise error.ProgrammingError('should not be called without '
1037 1227 'requestdone set')
1038 1228
1039 1229 del self._receivingcommands[requestid]
1040 1230
1041 1231 if self._receivingcommands:
1042 1232 self._state = 'command-receiving'
1043 1233 else:
1044 1234 self._state = 'idle'
1045 1235
1046 1236 # Decode the payloads as CBOR.
1047 1237 entry['payload'].seek(0)
1048 1238 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1049 1239
1050 1240 if b'name' not in request:
1051 1241 self._state = 'errored'
1052 1242 return self._makeerrorresult(
1053 1243 _('command request missing "name" field'))
1054 1244
1055 1245 if b'args' not in request:
1056 1246 request[b'args'] = {}
1057 1247
1058 1248 assert requestid not in self._activecommands
1059 1249 self._activecommands.add(requestid)
1060 1250
1061 1251 return 'runcommand', {
1062 1252 'requestid': requestid,
1063 1253 'command': request[b'name'],
1064 1254 'args': request[b'args'],
1065 1255 'redirect': request.get(b'redirect'),
1066 1256 'data': entry['data'].getvalue() if entry['data'] else None,
1067 1257 }
1068 1258
1069 1259 def _makewantframeresult(self):
1070 1260 return 'wantframe', {
1071 1261 'state': self._state,
1072 1262 }
1073 1263
1074 1264 def _validatecommandrequestframe(self, frame):
1075 1265 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1076 1266 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1077 1267
1078 1268 if new and continuation:
1079 1269 self._state = 'errored'
1080 1270 return self._makeerrorresult(
1081 1271 _('received command request frame with both new and '
1082 1272 'continuation flags set'))
1083 1273
1084 1274 if not new and not continuation:
1085 1275 self._state = 'errored'
1086 1276 return self._makeerrorresult(
1087 1277 _('received command request frame with neither new nor '
1088 1278 'continuation flags set'))
1089 1279
1090 1280 def _onframeinitial(self, frame):
1091 1281 # Called when we receive a frame when in the "initial" state.
1092 1282 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1093 1283 self._state = 'protocol-settings-receiving'
1094 1284 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1095 1285 return self._onframeprotocolsettings(frame)
1096 1286
1097 1287 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1098 1288 self._state = 'idle'
1099 1289 return self._onframeidle(frame)
1100 1290
1101 1291 else:
1102 1292 self._state = 'errored'
1103 1293 return self._makeerrorresult(
1104 1294 _('expected sender protocol settings or command request '
1105 1295 'frame; got %d') % frame.typeid)
1106 1296
1107 1297 def _onframeprotocolsettings(self, frame):
1108 1298 assert self._state == 'protocol-settings-receiving'
1109 1299 assert self._protocolsettingsdecoder is not None
1110 1300
1111 1301 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1112 1302 self._state = 'errored'
1113 1303 return self._makeerrorresult(
1114 1304 _('expected sender protocol settings frame; got %d') %
1115 1305 frame.typeid)
1116 1306
1117 1307 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1118 1308 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1119 1309
1120 1310 if more and eos:
1121 1311 self._state = 'errored'
1122 1312 return self._makeerrorresult(
1123 1313 _('sender protocol settings frame cannot have both '
1124 1314 'continuation and end of stream flags set'))
1125 1315
1126 1316 if not more and not eos:
1127 1317 self._state = 'errored'
1128 1318 return self._makeerrorresult(
1129 1319 _('sender protocol settings frame must have continuation or '
1130 1320 'end of stream flag set'))
1131 1321
1132 1322 # TODO establish limits for maximum amount of data that can be
1133 1323 # buffered.
1134 1324 try:
1135 1325 self._protocolsettingsdecoder.decode(frame.payload)
1136 1326 except Exception as e:
1137 1327 self._state = 'errored'
1138 1328 return self._makeerrorresult(
1139 1329 _('error decoding CBOR from sender protocol settings frame: %s')
1140 1330 % stringutil.forcebytestr(e))
1141 1331
1142 1332 if more:
1143 1333 return self._makewantframeresult()
1144 1334
1145 1335 assert eos
1146 1336
1147 1337 decoded = self._protocolsettingsdecoder.getavailable()
1148 1338 self._protocolsettingsdecoder = None
1149 1339
1150 1340 if not decoded:
1151 1341 self._state = 'errored'
1152 1342 return self._makeerrorresult(
1153 1343 _('sender protocol settings frame did not contain CBOR data'))
1154 1344 elif len(decoded) > 1:
1155 1345 self._state = 'errored'
1156 1346 return self._makeerrorresult(
1157 1347 _('sender protocol settings frame contained multiple CBOR '
1158 1348 'values'))
1159 1349
1160 1350 d = decoded[0]
1161 1351
1162 1352 if b'contentencodings' in d:
1163 1353 self._sendersettings['contentencodings'] = d[b'contentencodings']
1164 1354
1165 1355 self._state = 'idle'
1166 1356
1167 1357 return self._makewantframeresult()
1168 1358
1169 1359 def _onframeidle(self, frame):
1170 1360 # The only frame type that should be received in this state is a
1171 1361 # command request.
1172 1362 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1173 1363 self._state = 'errored'
1174 1364 return self._makeerrorresult(
1175 1365 _('expected command request frame; got %d') % frame.typeid)
1176 1366
1177 1367 res = self._validatecommandrequestframe(frame)
1178 1368 if res:
1179 1369 return res
1180 1370
1181 1371 if frame.requestid in self._receivingcommands:
1182 1372 self._state = 'errored'
1183 1373 return self._makeerrorresult(
1184 1374 _('request with ID %d already received') % frame.requestid)
1185 1375
1186 1376 if frame.requestid in self._activecommands:
1187 1377 self._state = 'errored'
1188 1378 return self._makeerrorresult(
1189 1379 _('request with ID %d is already active') % frame.requestid)
1190 1380
1191 1381 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1192 1382 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1193 1383 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1194 1384
1195 1385 if not new:
1196 1386 self._state = 'errored'
1197 1387 return self._makeerrorresult(
1198 1388 _('received command request frame without new flag set'))
1199 1389
1200 1390 payload = util.bytesio()
1201 1391 payload.write(frame.payload)
1202 1392
1203 1393 self._receivingcommands[frame.requestid] = {
1204 1394 'payload': payload,
1205 1395 'data': None,
1206 1396 'requestdone': not moreframes,
1207 1397 'expectingdata': bool(expectingdata),
1208 1398 }
1209 1399
1210 1400 # This is the final frame for this request. Dispatch it.
1211 1401 if not moreframes and not expectingdata:
1212 1402 return self._makeruncommandresult(frame.requestid)
1213 1403
1214 1404 assert moreframes or expectingdata
1215 1405 self._state = 'command-receiving'
1216 1406 return self._makewantframeresult()
1217 1407
1218 1408 def _onframecommandreceiving(self, frame):
1219 1409 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1220 1410 # Process new command requests as such.
1221 1411 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1222 1412 return self._onframeidle(frame)
1223 1413
1224 1414 res = self._validatecommandrequestframe(frame)
1225 1415 if res:
1226 1416 return res
1227 1417
1228 1418 # All other frames should be related to a command that is currently
1229 1419 # receiving but is not active.
1230 1420 if frame.requestid in self._activecommands:
1231 1421 self._state = 'errored'
1232 1422 return self._makeerrorresult(
1233 1423 _('received frame for request that is still active: %d') %
1234 1424 frame.requestid)
1235 1425
1236 1426 if frame.requestid not in self._receivingcommands:
1237 1427 self._state = 'errored'
1238 1428 return self._makeerrorresult(
1239 1429 _('received frame for request that is not receiving: %d') %
1240 1430 frame.requestid)
1241 1431
1242 1432 entry = self._receivingcommands[frame.requestid]
1243 1433
1244 1434 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1245 1435 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1246 1436 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1247 1437
1248 1438 if entry['requestdone']:
1249 1439 self._state = 'errored'
1250 1440 return self._makeerrorresult(
1251 1441 _('received command request frame when request frames '
1252 1442 'were supposedly done'))
1253 1443
1254 1444 if expectingdata != entry['expectingdata']:
1255 1445 self._state = 'errored'
1256 1446 return self._makeerrorresult(
1257 1447 _('mismatch between expect data flag and previous frame'))
1258 1448
1259 1449 entry['payload'].write(frame.payload)
1260 1450
1261 1451 if not moreframes:
1262 1452 entry['requestdone'] = True
1263 1453
1264 1454 if not moreframes and not expectingdata:
1265 1455 return self._makeruncommandresult(frame.requestid)
1266 1456
1267 1457 return self._makewantframeresult()
1268 1458
1269 1459 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1270 1460 if not entry['expectingdata']:
1271 1461 self._state = 'errored'
1272 1462 return self._makeerrorresult(_(
1273 1463 'received command data frame for request that is not '
1274 1464 'expecting data: %d') % frame.requestid)
1275 1465
1276 1466 if entry['data'] is None:
1277 1467 entry['data'] = util.bytesio()
1278 1468
1279 1469 return self._handlecommanddataframe(frame, entry)
1280 1470 else:
1281 1471 self._state = 'errored'
1282 1472 return self._makeerrorresult(_(
1283 1473 'received unexpected frame type: %d') % frame.typeid)
1284 1474
1285 1475 def _handlecommanddataframe(self, frame, entry):
1286 1476 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1287 1477
1288 1478 # TODO support streaming data instead of buffering it.
1289 1479 entry['data'].write(frame.payload)
1290 1480
1291 1481 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1292 1482 return self._makewantframeresult()
1293 1483 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1294 1484 entry['data'].seek(0)
1295 1485 return self._makeruncommandresult(frame.requestid)
1296 1486 else:
1297 1487 self._state = 'errored'
1298 1488 return self._makeerrorresult(_('command data frame without '
1299 1489 'flags'))
1300 1490
1301 1491 def _onframeerrored(self, frame):
1302 1492 return self._makeerrorresult(_('server already errored'))
1303 1493
1304 1494 class commandrequest(object):
1305 1495 """Represents a request to run a command."""
1306 1496
1307 1497 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1308 1498 self.requestid = requestid
1309 1499 self.name = name
1310 1500 self.args = args
1311 1501 self.datafh = datafh
1312 1502 self.redirect = redirect
1313 1503 self.state = 'pending'
1314 1504
1315 1505 class clientreactor(object):
1316 1506 """Holds state of a client issuing frame-based protocol requests.
1317 1507
1318 1508 This is like ``serverreactor`` but for client-side state.
1319 1509
1320 1510 Each instance is bound to the lifetime of a connection. For persistent
1321 1511 connection transports using e.g. TCP sockets and speaking the raw
1322 1512 framing protocol, there will be a single instance for the lifetime of
1323 1513 the TCP socket. For transports where there are multiple discrete
1324 1514 interactions (say tunneled within in HTTP request), there will be a
1325 1515 separate instance for each distinct interaction.
1326 1516
1327 1517 Consumers are expected to tell instances when events occur by calling
1328 1518 various methods. These methods return a 2-tuple describing any follow-up
1329 1519 action(s) to take. The first element is the name of an action to
1330 1520 perform. The second is a data structure (usually a dict) specific to
1331 1521 that action that contains more information. e.g. if the reactor wants
1332 1522 to send frames to the server, the data structure will contain a reference
1333 1523 to those frames.
1334 1524
1335 1525 Valid actions that consumers can be instructed to take are:
1336 1526
1337 1527 noop
1338 1528 Indicates no additional action is required.
1339 1529
1340 1530 sendframes
1341 1531 Indicates that frames should be sent to the server. The ``framegen``
1342 1532 key contains a generator of frames that should be sent. The reactor
1343 1533 assumes that all frames in this generator are sent to the server.
1344 1534
1345 1535 error
1346 1536 Indicates that an error occurred. The ``message`` key contains an
1347 1537 error message describing the failure.
1348 1538
1349 1539 responsedata
1350 1540 Indicates a response to a previously-issued command was received.
1351 1541
1352 1542 The ``request`` key contains the ``commandrequest`` instance that
1353 1543 represents the request this data is for.
1354 1544
1355 1545 The ``data`` key contains the decoded data from the server.
1356 1546
1357 1547 ``expectmore`` and ``eos`` evaluate to True when more response data
1358 1548 is expected to follow or we're at the end of the response stream,
1359 1549 respectively.
1360 1550 """
1361 1551 def __init__(self, ui, hasmultiplesend=False, buffersends=True):
1362 1552 """Create a new instance.
1363 1553
1364 1554 ``hasmultiplesend`` indicates whether multiple sends are supported
1365 1555 by the transport. When True, it is possible to send commands immediately
1366 1556 instead of buffering until the caller signals an intent to finish a
1367 1557 send operation.
1368 1558
1369 1559 ``buffercommands`` indicates whether sends should be buffered until the
1370 1560 last request has been issued.
1371 1561 """
1372 1562 self._ui = ui
1373 1563 self._hasmultiplesend = hasmultiplesend
1374 1564 self._buffersends = buffersends
1375 1565
1376 1566 self._canissuecommands = True
1377 1567 self._cansend = True
1378 1568
1379 1569 self._nextrequestid = 1
1380 1570 # We only support a single outgoing stream for now.
1381 1571 self._outgoingstream = outputstream(1)
1382 1572 self._pendingrequests = collections.deque()
1383 1573 self._activerequests = {}
1384 1574 self._incomingstreams = {}
1385 1575 self._streamsettingsdecoders = {}
1386 1576
1577 populatestreamencoders()
1578
1387 1579 def callcommand(self, name, args, datafh=None, redirect=None):
1388 1580 """Request that a command be executed.
1389 1581
1390 1582 Receives the command name, a dict of arguments to pass to the command,
1391 1583 and an optional file object containing the raw data for the command.
1392 1584
1393 1585 Returns a 3-tuple of (request, action, action data).
1394 1586 """
1395 1587 if not self._canissuecommands:
1396 1588 raise error.ProgrammingError('cannot issue new commands')
1397 1589
1398 1590 requestid = self._nextrequestid
1399 1591 self._nextrequestid += 2
1400 1592
1401 1593 request = commandrequest(requestid, name, args, datafh=datafh,
1402 1594 redirect=redirect)
1403 1595
1404 1596 if self._buffersends:
1405 1597 self._pendingrequests.append(request)
1406 1598 return request, 'noop', {}
1407 1599 else:
1408 1600 if not self._cansend:
1409 1601 raise error.ProgrammingError('sends cannot be performed on '
1410 1602 'this instance')
1411 1603
1412 1604 if not self._hasmultiplesend:
1413 1605 self._cansend = False
1414 1606 self._canissuecommands = False
1415 1607
1416 1608 return request, 'sendframes', {
1417 1609 'framegen': self._makecommandframes(request),
1418 1610 }
1419 1611
1420 1612 def flushcommands(self):
1421 1613 """Request that all queued commands be sent.
1422 1614
1423 1615 If any commands are buffered, this will instruct the caller to send
1424 1616 them over the wire. If no commands are buffered it instructs the client
1425 1617 to no-op.
1426 1618
1427 1619 If instances aren't configured for multiple sends, no new command
1428 1620 requests are allowed after this is called.
1429 1621 """
1430 1622 if not self._pendingrequests:
1431 1623 return 'noop', {}
1432 1624
1433 1625 if not self._cansend:
1434 1626 raise error.ProgrammingError('sends cannot be performed on this '
1435 1627 'instance')
1436 1628
1437 1629 # If the instance only allows sending once, mark that we have fired
1438 1630 # our one shot.
1439 1631 if not self._hasmultiplesend:
1440 1632 self._canissuecommands = False
1441 1633 self._cansend = False
1442 1634
1443 1635 def makeframes():
1444 1636 while self._pendingrequests:
1445 1637 request = self._pendingrequests.popleft()
1446 1638 for frame in self._makecommandframes(request):
1447 1639 yield frame
1448 1640
1449 1641 return 'sendframes', {
1450 1642 'framegen': makeframes(),
1451 1643 }
1452 1644
1453 1645 def _makecommandframes(self, request):
1454 1646 """Emit frames to issue a command request.
1455 1647
1456 1648 As a side-effect, update request accounting to reflect its changed
1457 1649 state.
1458 1650 """
1459 1651 self._activerequests[request.requestid] = request
1460 1652 request.state = 'sending'
1461 1653
1462 1654 res = createcommandframes(self._outgoingstream,
1463 1655 request.requestid,
1464 1656 request.name,
1465 1657 request.args,
1466 1658 datafh=request.datafh,
1467 1659 redirect=request.redirect)
1468 1660
1469 1661 for frame in res:
1470 1662 yield frame
1471 1663
1472 1664 request.state = 'sent'
1473 1665
1474 1666 def onframerecv(self, frame):
1475 1667 """Process a frame that has been received off the wire.
1476 1668
1477 1669 Returns a 2-tuple of (action, meta) describing further action the
1478 1670 caller needs to take as a result of receiving this frame.
1479 1671 """
1480 1672 if frame.streamid % 2:
1481 1673 return 'error', {
1482 1674 'message': (
1483 1675 _('received frame with odd numbered stream ID: %d') %
1484 1676 frame.streamid),
1485 1677 }
1486 1678
1487 1679 if frame.streamid not in self._incomingstreams:
1488 1680 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1489 1681 return 'error', {
1490 1682 'message': _('received frame on unknown stream '
1491 1683 'without beginning of stream flag set'),
1492 1684 }
1493 1685
1494 1686 self._incomingstreams[frame.streamid] = inputstream(
1495 1687 frame.streamid)
1496 1688
1689 stream = self._incomingstreams[frame.streamid]
1690
1691 # If the payload is encoded, ask the stream to decode it. We
1692 # merely substitute the decoded result into the frame payload as
1693 # if it had been transferred all along.
1497 1694 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1498 raise error.ProgrammingError('support for decoding stream '
1499 'payloads not yet implemneted')
1695 frame.payload = stream.decode(frame.payload)
1500 1696
1501 1697 if frame.streamflags & STREAM_FLAG_END_STREAM:
1502 1698 del self._incomingstreams[frame.streamid]
1503 1699
1504 1700 if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
1505 1701 return self._onstreamsettingsframe(frame)
1506 1702
1507 1703 if frame.requestid not in self._activerequests:
1508 1704 return 'error', {
1509 1705 'message': (_('received frame for inactive request ID: %d') %
1510 1706 frame.requestid),
1511 1707 }
1512 1708
1513 1709 request = self._activerequests[frame.requestid]
1514 1710 request.state = 'receiving'
1515 1711
1516 1712 handlers = {
1517 1713 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1518 1714 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1519 1715 }
1520 1716
1521 1717 meth = handlers.get(frame.typeid)
1522 1718 if not meth:
1523 1719 raise error.ProgrammingError('unhandled frame type: %d' %
1524 1720 frame.typeid)
1525 1721
1526 1722 return meth(request, frame)
1527 1723
1528 1724 def _onstreamsettingsframe(self, frame):
1529 1725 assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS
1530 1726
1531 1727 more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
1532 1728 eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
1533 1729
1534 1730 if more and eos:
1535 1731 return 'error', {
1536 1732 'message': (_('stream encoding settings frame cannot have both '
1537 1733 'continuation and end of stream flags set')),
1538 1734 }
1539 1735
1540 1736 if not more and not eos:
1541 1737 return 'error', {
1542 1738 'message': _('stream encoding settings frame must have '
1543 1739 'continuation or end of stream flag set'),
1544 1740 }
1545 1741
1546 1742 if frame.streamid not in self._streamsettingsdecoders:
1547 1743 decoder = cborutil.bufferingdecoder()
1548 1744 self._streamsettingsdecoders[frame.streamid] = decoder
1549 1745
1550 1746 decoder = self._streamsettingsdecoders[frame.streamid]
1551 1747
1552 1748 try:
1553 1749 decoder.decode(frame.payload)
1554 1750 except Exception as e:
1555 1751 return 'error', {
1556 1752 'message': (_('error decoding CBOR from stream encoding '
1557 1753 'settings frame: %s') %
1558 1754 stringutil.forcebytestr(e)),
1559 1755 }
1560 1756
1561 1757 if more:
1562 1758 return 'noop', {}
1563 1759
1564 1760 assert eos
1565 1761
1566 1762 decoded = decoder.getavailable()
1567 1763 del self._streamsettingsdecoders[frame.streamid]
1568 1764
1569 1765 if not decoded:
1570 1766 return 'error', {
1571 1767 'message': _('stream encoding settings frame did not contain '
1572 1768 'CBOR data'),
1573 1769 }
1574 1770
1575 1771 try:
1576 self._incomingstreams[frame.streamid].setdecoder(decoded[0],
1772 self._incomingstreams[frame.streamid].setdecoder(self._ui,
1773 decoded[0],
1577 1774 decoded[1:])
1578 1775 except Exception as e:
1579 1776 return 'error', {
1580 1777 'message': (_('error setting stream decoder: %s') %
1581 1778 stringutil.forcebytestr(e)),
1582 1779 }
1583 1780
1584 1781 return 'noop', {}
1585 1782
1586 1783 def _oncommandresponseframe(self, request, frame):
1587 1784 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1588 1785 request.state = 'received'
1589 1786 del self._activerequests[request.requestid]
1590 1787
1591 1788 return 'responsedata', {
1592 1789 'request': request,
1593 1790 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1594 1791 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1595 1792 'data': frame.payload,
1596 1793 }
1597 1794
1598 1795 def _onerrorresponseframe(self, request, frame):
1599 1796 request.state = 'errored'
1600 1797 del self._activerequests[request.requestid]
1601 1798
1602 1799 # The payload should be a CBOR map.
1603 1800 m = cborutil.decodeall(frame.payload)[0]
1604 1801
1605 1802 return 'error', {
1606 1803 'request': request,
1607 1804 'type': m['type'],
1608 1805 'message': m['message'],
1609 1806 }
@@ -1,291 +1,604 b''
1 1 from __future__ import absolute_import
2 2
3 3 import unittest
4 import zlib
4 5
5 6 from mercurial import (
6 7 error,
7 8 ui as uimod,
8 9 wireprotoframing as framing,
9 10 )
10 11 from mercurial.utils import (
11 12 cborutil,
12 13 )
13 14
15 try:
16 from mercurial import zstd
17 zstd.__version__
18 except ImportError:
19 zstd = None
20
14 21 ffs = framing.makeframefromhumanstring
15 22
16 23 globalui = uimod.ui()
17 24
18 25 def sendframe(reactor, frame):
19 26 """Send a frame bytearray to a reactor."""
20 27 header = framing.parseheader(frame)
21 28 payload = frame[framing.FRAME_HEADER_SIZE:]
22 29 assert len(payload) == header.length
23 30
24 31 return reactor.onframerecv(framing.frame(header.requestid,
25 32 header.streamid,
26 33 header.streamflags,
27 34 header.typeid,
28 35 header.flags,
29 36 payload))
30 37
31 38 class SingleSendTests(unittest.TestCase):
32 39 """A reactor that can only send once rejects subsequent sends."""
33 40
34 41 if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
35 42 # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
36 43 # the regex version.
37 44 assertRaisesRegex = (# camelcase-required
38 45 unittest.TestCase.assertRaisesRegexp)
39 46
40 47 def testbasic(self):
41 48 reactor = framing.clientreactor(globalui,
42 49 hasmultiplesend=False,
43 50 buffersends=True)
44 51
45 52 request, action, meta = reactor.callcommand(b'foo', {})
46 53 self.assertEqual(request.state, b'pending')
47 54 self.assertEqual(action, b'noop')
48 55
49 56 action, meta = reactor.flushcommands()
50 57 self.assertEqual(action, b'sendframes')
51 58
52 59 for frame in meta[b'framegen']:
53 60 self.assertEqual(request.state, b'sending')
54 61
55 62 self.assertEqual(request.state, b'sent')
56 63
57 64 with self.assertRaisesRegex(error.ProgrammingError,
58 65 'cannot issue new commands'):
59 66 reactor.callcommand(b'foo', {})
60 67
61 68 with self.assertRaisesRegex(error.ProgrammingError,
62 69 'cannot issue new commands'):
63 70 reactor.callcommand(b'foo', {})
64 71
65 72 class NoBufferTests(unittest.TestCase):
66 73 """A reactor without send buffering sends requests immediately."""
67 74 def testbasic(self):
68 75 reactor = framing.clientreactor(globalui,
69 76 hasmultiplesend=True,
70 77 buffersends=False)
71 78
72 79 request, action, meta = reactor.callcommand(b'command1', {})
73 80 self.assertEqual(request.requestid, 1)
74 81 self.assertEqual(action, b'sendframes')
75 82
76 83 self.assertEqual(request.state, b'pending')
77 84
78 85 for frame in meta[b'framegen']:
79 86 self.assertEqual(request.state, b'sending')
80 87
81 88 self.assertEqual(request.state, b'sent')
82 89
83 90 action, meta = reactor.flushcommands()
84 91 self.assertEqual(action, b'noop')
85 92
86 93 # And we can send another command.
87 94 request, action, meta = reactor.callcommand(b'command2', {})
88 95 self.assertEqual(request.requestid, 3)
89 96 self.assertEqual(action, b'sendframes')
90 97
91 98 for frame in meta[b'framegen']:
92 99 self.assertEqual(request.state, b'sending')
93 100
94 101 self.assertEqual(request.state, b'sent')
95 102
96 103 class BadFrameRecvTests(unittest.TestCase):
97 104 if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
98 105 # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
99 106 # the regex version.
100 107 assertRaisesRegex = (# camelcase-required
101 108 unittest.TestCase.assertRaisesRegexp)
102 109
103 110 def testoddstream(self):
104 111 reactor = framing.clientreactor(globalui)
105 112
106 113 action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo'))
107 114 self.assertEqual(action, b'error')
108 115 self.assertEqual(meta[b'message'],
109 116 b'received frame with odd numbered stream ID: 1')
110 117
111 118 def testunknownstream(self):
112 119 reactor = framing.clientreactor(globalui)
113 120
114 121 action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo'))
115 122 self.assertEqual(action, b'error')
116 123 self.assertEqual(meta[b'message'],
117 124 b'received frame on unknown stream without beginning '
118 125 b'of stream flag set')
119 126
120 127 def testunhandledframetype(self):
121 128 reactor = framing.clientreactor(globalui, buffersends=False)
122 129
123 130 request, action, meta = reactor.callcommand(b'foo', {})
124 131 for frame in meta[b'framegen']:
125 132 pass
126 133
127 134 with self.assertRaisesRegex(error.ProgrammingError,
128 135 'unhandled frame type'):
129 136 sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo'))
130 137
131 138 class StreamTests(unittest.TestCase):
132 139 def testmultipleresponseframes(self):
133 140 reactor = framing.clientreactor(globalui, buffersends=False)
134 141
135 142 request, action, meta = reactor.callcommand(b'foo', {})
136 143
137 144 self.assertEqual(action, b'sendframes')
138 145 for f in meta[b'framegen']:
139 146 pass
140 147
141 148 action, meta = sendframe(
142 149 reactor,
143 150 ffs(b'%d 0 stream-begin command-response 0 foo' %
144 151 request.requestid))
145 152 self.assertEqual(action, b'responsedata')
146 153
147 154 action, meta = sendframe(
148 155 reactor,
149 156 ffs(b'%d 0 0 command-response eos bar' % request.requestid))
150 157 self.assertEqual(action, b'responsedata')
151 158
152 159 class RedirectTests(unittest.TestCase):
153 160 def testredirect(self):
154 161 reactor = framing.clientreactor(globalui, buffersends=False)
155 162
156 163 redirect = {
157 164 b'targets': [b'a', b'b'],
158 165 b'hashes': [b'sha256'],
159 166 }
160 167
161 168 request, action, meta = reactor.callcommand(
162 169 b'foo', {}, redirect=redirect)
163 170
164 171 self.assertEqual(action, b'sendframes')
165 172
166 173 frames = list(meta[b'framegen'])
167 174 self.assertEqual(len(frames), 1)
168 175
169 176 self.assertEqual(frames[0],
170 177 ffs(b'1 1 stream-begin command-request new '
171 178 b"cbor:{b'name': b'foo', "
172 179 b"b'redirect': {b'targets': [b'a', b'b'], "
173 180 b"b'hashes': [b'sha256']}}"))
174 181
175 182 class StreamSettingsTests(unittest.TestCase):
176 183 def testnoflags(self):
177 184 reactor = framing.clientreactor(globalui, buffersends=False)
178 185
179 186 request, action, meta = reactor.callcommand(b'foo', {})
180 187 for f in meta[b'framegen']:
181 188 pass
182 189
183 190 action, meta = sendframe(reactor,
184 191 ffs(b'1 2 stream-begin stream-settings 0 '))
185 192
186 193 self.assertEqual(action, b'error')
187 194 self.assertEqual(meta, {
188 195 b'message': b'stream encoding settings frame must have '
189 196 b'continuation or end of stream flag set',
190 197 })
191 198
192 199 def testconflictflags(self):
193 200 reactor = framing.clientreactor(globalui, buffersends=False)
194 201
195 202 request, action, meta = reactor.callcommand(b'foo', {})
196 203 for f in meta[b'framegen']:
197 204 pass
198 205
199 206 action, meta = sendframe(reactor,
200 207 ffs(b'1 2 stream-begin stream-settings continuation|eos '))
201 208
202 209 self.assertEqual(action, b'error')
203 210 self.assertEqual(meta, {
204 211 b'message': b'stream encoding settings frame cannot have both '
205 212 b'continuation and end of stream flags set',
206 213 })
207 214
208 215 def testemptypayload(self):
209 216 reactor = framing.clientreactor(globalui, buffersends=False)
210 217
211 218 request, action, meta = reactor.callcommand(b'foo', {})
212 219 for f in meta[b'framegen']:
213 220 pass
214 221
215 222 action, meta = sendframe(reactor,
216 223 ffs(b'1 2 stream-begin stream-settings eos '))
217 224
218 225 self.assertEqual(action, b'error')
219 226 self.assertEqual(meta, {
220 227 b'message': b'stream encoding settings frame did not contain '
221 228 b'CBOR data'
222 229 })
223 230
224 231 def testbadcbor(self):
225 232 reactor = framing.clientreactor(globalui, buffersends=False)
226 233
227 234 request, action, meta = reactor.callcommand(b'foo', {})
228 235 for f in meta[b'framegen']:
229 236 pass
230 237
231 238 action, meta = sendframe(reactor,
232 239 ffs(b'1 2 stream-begin stream-settings eos badvalue'))
233 240
234 241 self.assertEqual(action, b'error')
235 242
236 243 def testsingleobject(self):
237 244 reactor = framing.clientreactor(globalui, buffersends=False)
238 245
239 246 request, action, meta = reactor.callcommand(b'foo', {})
240 247 for f in meta[b'framegen']:
241 248 pass
242 249
243 250 action, meta = sendframe(reactor,
244 251 ffs(b'1 2 stream-begin stream-settings eos cbor:b"identity"'))
245 252
246 253 self.assertEqual(action, b'noop')
247 254 self.assertEqual(meta, {})
248 255
249 256 def testmultipleobjects(self):
250 257 reactor = framing.clientreactor(globalui, buffersends=False)
251 258
252 259 request, action, meta = reactor.callcommand(b'foo', {})
253 260 for f in meta[b'framegen']:
254 261 pass
255 262
256 263 data = b''.join([
257 264 b''.join(cborutil.streamencode(b'identity')),
258 265 b''.join(cborutil.streamencode({b'foo', b'bar'})),
259 266 ])
260 267
261 268 action, meta = sendframe(reactor,
262 269 ffs(b'1 2 stream-begin stream-settings eos %s' % data))
263 270
264 self.assertEqual(action, b'noop')
265 self.assertEqual(meta, {})
271 self.assertEqual(action, b'error')
272 self.assertEqual(meta, {
273 b'message': b'error setting stream decoder: identity decoder '
274 b'received unexpected additional values',
275 })
266 276
267 277 def testmultipleframes(self):
268 278 reactor = framing.clientreactor(globalui, buffersends=False)
269 279
270 280 request, action, meta = reactor.callcommand(b'foo', {})
271 281 for f in meta[b'framegen']:
272 282 pass
273 283
274 284 data = b''.join(cborutil.streamencode(b'identity'))
275 285
276 286 action, meta = sendframe(reactor,
277 287 ffs(b'1 2 stream-begin stream-settings continuation %s' %
278 288 data[0:3]))
279 289
280 290 self.assertEqual(action, b'noop')
281 291 self.assertEqual(meta, {})
282 292
283 293 action, meta = sendframe(reactor,
284 294 ffs(b'1 2 0 stream-settings eos %s' % data[3:]))
285 295
286 296 self.assertEqual(action, b'noop')
287 297 self.assertEqual(meta, {})
288 298
299 def testinvalidencoder(self):
300 reactor = framing.clientreactor(globalui, buffersends=False)
301
302 request, action, meta = reactor.callcommand(b'foo', {})
303 for f in meta[b'framegen']:
304 pass
305
306 action, meta = sendframe(reactor,
307 ffs(b'1 2 stream-begin stream-settings eos cbor:b"badvalue"'))
308
309 self.assertEqual(action, b'error')
310 self.assertEqual(meta, {
311 b'message': b'error setting stream decoder: unknown stream '
312 b'decoder: badvalue',
313 })
314
315 def testzlibencoding(self):
316 reactor = framing.clientreactor(globalui, buffersends=False)
317
318 request, action, meta = reactor.callcommand(b'foo', {})
319 for f in meta[b'framegen']:
320 pass
321
322 action, meta = sendframe(reactor,
323 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
324 request.requestid))
325
326 self.assertEqual(action, b'noop')
327 self.assertEqual(meta, {})
328
329 result = {
330 b'status': b'ok',
331 }
332 encoded = b''.join(cborutil.streamencode(result))
333
334 compressed = zlib.compress(encoded)
335 self.assertEqual(zlib.decompress(compressed), encoded)
336
337 action, meta = sendframe(reactor,
338 ffs(b'%d 2 encoded command-response eos %s' %
339 (request.requestid, compressed)))
340
341 self.assertEqual(action, b'responsedata')
342 self.assertEqual(meta[b'data'], encoded)
343
344 def testzlibencodingsinglebyteframes(self):
345 reactor = framing.clientreactor(globalui, buffersends=False)
346
347 request, action, meta = reactor.callcommand(b'foo', {})
348 for f in meta[b'framegen']:
349 pass
350
351 action, meta = sendframe(reactor,
352 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
353 request.requestid))
354
355 self.assertEqual(action, b'noop')
356 self.assertEqual(meta, {})
357
358 result = {
359 b'status': b'ok',
360 }
361 encoded = b''.join(cborutil.streamencode(result))
362
363 compressed = zlib.compress(encoded)
364 self.assertEqual(zlib.decompress(compressed), encoded)
365
366 chunks = []
367
368 for i in range(len(compressed)):
369 char = compressed[i:i + 1]
370 if char == b'\\':
371 char = b'\\\\'
372 action, meta = sendframe(reactor,
373 ffs(b'%d 2 encoded command-response continuation %s' %
374 (request.requestid, char)))
375
376 self.assertEqual(action, b'responsedata')
377 chunks.append(meta[b'data'])
378 self.assertTrue(meta[b'expectmore'])
379 self.assertFalse(meta[b'eos'])
380
381 # zlib will have the full data decoded at this point, even though
382 # we haven't flushed.
383 self.assertEqual(b''.join(chunks), encoded)
384
385 # End the stream for good measure.
386 action, meta = sendframe(reactor,
387 ffs(b'%d 2 stream-end command-response eos ' % request.requestid))
388
389 self.assertEqual(action, b'responsedata')
390 self.assertEqual(meta[b'data'], b'')
391 self.assertFalse(meta[b'expectmore'])
392 self.assertTrue(meta[b'eos'])
393
394 def testzlibmultipleresponses(self):
395 # We feed in zlib compressed data on the same stream but belonging to
396 # 2 different requests. This tests our flushing behavior.
397 reactor = framing.clientreactor(globalui, buffersends=False,
398 hasmultiplesend=True)
399
400 request1, action, meta = reactor.callcommand(b'foo', {})
401 for f in meta[b'framegen']:
402 pass
403
404 request2, action, meta = reactor.callcommand(b'foo', {})
405 for f in meta[b'framegen']:
406 pass
407
408 outstream = framing.outputstream(2)
409 outstream.setencoder(globalui, b'zlib')
410
411 response1 = b''.join(cborutil.streamencode({
412 b'status': b'ok',
413 b'extra': b'response1' * 10,
414 }))
415
416 response2 = b''.join(cborutil.streamencode({
417 b'status': b'error',
418 b'extra': b'response2' * 10,
419 }))
420
421 action, meta = sendframe(reactor,
422 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
423 request1.requestid))
424
425 self.assertEqual(action, b'noop')
426 self.assertEqual(meta, {})
427
428 # Feeding partial data in won't get anything useful out.
429 action, meta = sendframe(reactor,
430 ffs(b'%d 2 encoded command-response continuation %s' % (
431 request1.requestid, outstream.encode(response1))))
432 self.assertEqual(action, b'responsedata')
433 self.assertEqual(meta[b'data'], b'')
434
435 # But flushing data at both ends will get our original data.
436 action, meta = sendframe(reactor,
437 ffs(b'%d 2 encoded command-response eos %s' % (
438 request1.requestid, outstream.flush())))
439 self.assertEqual(action, b'responsedata')
440 self.assertEqual(meta[b'data'], response1)
441
442 # We should be able to reuse the compressor/decompressor for the
443 # 2nd response.
444 action, meta = sendframe(reactor,
445 ffs(b'%d 2 encoded command-response continuation %s' % (
446 request2.requestid, outstream.encode(response2))))
447 self.assertEqual(action, b'responsedata')
448 self.assertEqual(meta[b'data'], b'')
449
450 action, meta = sendframe(reactor,
451 ffs(b'%d 2 encoded command-response eos %s' % (
452 request2.requestid, outstream.flush())))
453 self.assertEqual(action, b'responsedata')
454 self.assertEqual(meta[b'data'], response2)
455
456 @unittest.skipUnless(zstd, 'zstd not available')
457 def testzstd8mbencoding(self):
458 reactor = framing.clientreactor(globalui, buffersends=False)
459
460 request, action, meta = reactor.callcommand(b'foo', {})
461 for f in meta[b'framegen']:
462 pass
463
464 action, meta = sendframe(reactor,
465 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
466 request.requestid))
467
468 self.assertEqual(action, b'noop')
469 self.assertEqual(meta, {})
470
471 result = {
472 b'status': b'ok',
473 }
474 encoded = b''.join(cborutil.streamencode(result))
475
476 encoder = framing.zstd8mbencoder(globalui)
477 compressed = encoder.encode(encoded) + encoder.finish()
478 self.assertEqual(zstd.ZstdDecompressor().decompress(
479 compressed, max_output_size=len(encoded)), encoded)
480
481 action, meta = sendframe(reactor,
482 ffs(b'%d 2 encoded command-response eos %s' %
483 (request.requestid, compressed)))
484
485 self.assertEqual(action, b'responsedata')
486 self.assertEqual(meta[b'data'], encoded)
487
488 @unittest.skipUnless(zstd, 'zstd not available')
489 def testzstd8mbencodingsinglebyteframes(self):
490 reactor = framing.clientreactor(globalui, buffersends=False)
491
492 request, action, meta = reactor.callcommand(b'foo', {})
493 for f in meta[b'framegen']:
494 pass
495
496 action, meta = sendframe(reactor,
497 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
498 request.requestid))
499
500 self.assertEqual(action, b'noop')
501 self.assertEqual(meta, {})
502
503 result = {
504 b'status': b'ok',
505 }
506 encoded = b''.join(cborutil.streamencode(result))
507
508 compressed = zstd.ZstdCompressor().compress(encoded)
509 self.assertEqual(zstd.ZstdDecompressor().decompress(compressed),
510 encoded)
511
512 chunks = []
513
514 for i in range(len(compressed)):
515 char = compressed[i:i + 1]
516 if char == b'\\':
517 char = b'\\\\'
518 action, meta = sendframe(reactor,
519 ffs(b'%d 2 encoded command-response continuation %s' %
520 (request.requestid, char)))
521
522 self.assertEqual(action, b'responsedata')
523 chunks.append(meta[b'data'])
524 self.assertTrue(meta[b'expectmore'])
525 self.assertFalse(meta[b'eos'])
526
527 # zstd decompressor will flush at frame boundaries.
528 self.assertEqual(b''.join(chunks), encoded)
529
530 # End the stream for good measure.
531 action, meta = sendframe(reactor,
532 ffs(b'%d 2 stream-end command-response eos ' % request.requestid))
533
534 self.assertEqual(action, b'responsedata')
535 self.assertEqual(meta[b'data'], b'')
536 self.assertFalse(meta[b'expectmore'])
537 self.assertTrue(meta[b'eos'])
538
539 @unittest.skipUnless(zstd, 'zstd not available')
540 def testzstd8mbmultipleresponses(self):
541 # We feed in zstd compressed data on the same stream but belonging to
542 # 2 different requests. This tests our flushing behavior.
543 reactor = framing.clientreactor(globalui, buffersends=False,
544 hasmultiplesend=True)
545
546 request1, action, meta = reactor.callcommand(b'foo', {})
547 for f in meta[b'framegen']:
548 pass
549
550 request2, action, meta = reactor.callcommand(b'foo', {})
551 for f in meta[b'framegen']:
552 pass
553
554 outstream = framing.outputstream(2)
555 outstream.setencoder(globalui, b'zstd-8mb')
556
557 response1 = b''.join(cborutil.streamencode({
558 b'status': b'ok',
559 b'extra': b'response1' * 10,
560 }))
561
562 response2 = b''.join(cborutil.streamencode({
563 b'status': b'error',
564 b'extra': b'response2' * 10,
565 }))
566
567 action, meta = sendframe(reactor,
568 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
569 request1.requestid))
570
571 self.assertEqual(action, b'noop')
572 self.assertEqual(meta, {})
573
574 # Feeding partial data in won't get anything useful out.
575 action, meta = sendframe(reactor,
576 ffs(b'%d 2 encoded command-response continuation %s' % (
577 request1.requestid, outstream.encode(response1))))
578 self.assertEqual(action, b'responsedata')
579 self.assertEqual(meta[b'data'], b'')
580
581 # But flushing data at both ends will get our original data.
582 action, meta = sendframe(reactor,
583 ffs(b'%d 2 encoded command-response eos %s' % (
584 request1.requestid, outstream.flush())))
585 self.assertEqual(action, b'responsedata')
586 self.assertEqual(meta[b'data'], response1)
587
588 # We should be able to reuse the compressor/decompressor for the
589 # 2nd response.
590 action, meta = sendframe(reactor,
591 ffs(b'%d 2 encoded command-response continuation %s' % (
592 request2.requestid, outstream.encode(response2))))
593 self.assertEqual(action, b'responsedata')
594 self.assertEqual(meta[b'data'], b'')
595
596 action, meta = sendframe(reactor,
597 ffs(b'%d 2 encoded command-response eos %s' % (
598 request2.requestid, outstream.flush())))
599 self.assertEqual(action, b'responsedata')
600 self.assertEqual(meta[b'data'], response2)
601
289 602 if __name__ == '__main__':
290 603 import silenttestrunner
291 604 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now