Show More
@@ -648,6 +648,140 b' class bufferingcommandresponseemitter(ob' | |||
|
648 | 648 | flags=FLAG_COMMAND_RESPONSE_CONTINUATION, |
|
649 | 649 | payload=payload) |
|
650 | 650 | |
|
651 | # TODO consider defining encoders/decoders using the util.compressionengine | |
|
652 | # mechanism. | |
|
653 | ||
|
654 | class identityencoder(object): | |
|
655 | """Encoder for the "identity" stream encoding profile.""" | |
|
656 | def __init__(self, ui): | |
|
657 | pass | |
|
658 | ||
|
659 | def encode(self, data): | |
|
660 | return data | |
|
661 | ||
|
662 | def flush(self): | |
|
663 | return b'' | |
|
664 | ||
|
665 | def finish(self): | |
|
666 | return b'' | |
|
667 | ||
|
668 | class identitydecoder(object): | |
|
669 | """Decoder for the "identity" stream encoding profile.""" | |
|
670 | ||
|
671 | def __init__(self, ui, extraobjs): | |
|
672 | if extraobjs: | |
|
673 | raise error.Abort(_('identity decoder received unexpected ' | |
|
674 | 'additional values')) | |
|
675 | ||
|
676 | def decode(self, data): | |
|
677 | return data | |
|
678 | ||
|
679 | class zlibencoder(object): | |
|
680 | def __init__(self, ui): | |
|
681 | import zlib | |
|
682 | self._zlib = zlib | |
|
683 | self._compressor = zlib.compressobj() | |
|
684 | ||
|
685 | def encode(self, data): | |
|
686 | return self._compressor.compress(data) | |
|
687 | ||
|
688 | def flush(self): | |
|
689 | # Z_SYNC_FLUSH doesn't reset compression context, which is | |
|
690 | # what we want. | |
|
691 | return self._compressor.flush(self._zlib.Z_SYNC_FLUSH) | |
|
692 | ||
|
693 | def finish(self): | |
|
694 | res = self._compressor.flush(self._zlib.Z_FINISH) | |
|
695 | self._compressor = None | |
|
696 | return res | |
|
697 | ||
|
698 | class zlibdecoder(object): | |
|
699 | def __init__(self, ui, extraobjs): | |
|
700 | import zlib | |
|
701 | ||
|
702 | if extraobjs: | |
|
703 | raise error.Abort(_('zlib decoder received unexpected ' | |
|
704 | 'additional values')) | |
|
705 | ||
|
706 | self._decompressor = zlib.decompressobj() | |
|
707 | ||
|
708 | def decode(self, data): | |
|
709 | # Python 2's zlib module doesn't use the buffer protocol and can't | |
|
710 | # handle all bytes-like types. | |
|
711 | if not pycompat.ispy3 and isinstance(data, bytearray): | |
|
712 | data = bytes(data) | |
|
713 | ||
|
714 | return self._decompressor.decompress(data) | |
|
715 | ||
|
716 | class zstdbaseencoder(object): | |
|
717 | def __init__(self, level): | |
|
718 | from . import zstd | |
|
719 | ||
|
720 | self._zstd = zstd | |
|
721 | cctx = zstd.ZstdCompressor(level=level) | |
|
722 | self._compressor = cctx.compressobj() | |
|
723 | ||
|
724 | def encode(self, data): | |
|
725 | return self._compressor.compress(data) | |
|
726 | ||
|
727 | def flush(self): | |
|
728 | # COMPRESSOBJ_FLUSH_BLOCK flushes all data previously fed into the | |
|
729 | # compressor and allows a decompressor to access all encoded data | |
|
730 | # up to this point. | |
|
731 | return self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_BLOCK) | |
|
732 | ||
|
733 | def finish(self): | |
|
734 | res = self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_FINISH) | |
|
735 | self._compressor = None | |
|
736 | return res | |
|
737 | ||
|
738 | class zstd8mbencoder(zstdbaseencoder): | |
|
739 | def __init__(self, ui): | |
|
740 | super(zstd8mbencoder, self).__init__(3) | |
|
741 | ||
|
742 | class zstdbasedecoder(object): | |
|
743 | def __init__(self, maxwindowsize): | |
|
744 | from . import zstd | |
|
745 | dctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize) | |
|
746 | self._decompressor = dctx.decompressobj() | |
|
747 | ||
|
748 | def decode(self, data): | |
|
749 | return self._decompressor.decompress(data) | |
|
750 | ||
|
751 | class zstd8mbdecoder(zstdbasedecoder): | |
|
752 | def __init__(self, ui, extraobjs): | |
|
753 | if extraobjs: | |
|
754 | raise error.Abort(_('zstd8mb decoder received unexpected ' | |
|
755 | 'additional values')) | |
|
756 | ||
|
757 | super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576) | |
|
758 | ||
|
759 | # We lazily populate this to avoid excessive module imports when importing | |
|
760 | # this module. | |
|
761 | STREAM_ENCODERS = {} | |
|
762 | STREAM_ENCODERS_ORDER = [] | |
|
763 | ||
|
764 | def populatestreamencoders(): | |
|
765 | if STREAM_ENCODERS: | |
|
766 | return | |
|
767 | ||
|
768 | try: | |
|
769 | from . import zstd | |
|
770 | zstd.__version__ | |
|
771 | except ImportError: | |
|
772 | zstd = None | |
|
773 | ||
|
774 | # zstandard is fastest and is preferred. | |
|
775 | if zstd: | |
|
776 | STREAM_ENCODERS[b'zstd-8mb'] = (zstd8mbencoder, zstd8mbdecoder) | |
|
777 | STREAM_ENCODERS_ORDER.append(b'zstd-8mb') | |
|
778 | ||
|
779 | STREAM_ENCODERS[b'zlib'] = (zlibencoder, zlibdecoder) | |
|
780 | STREAM_ENCODERS_ORDER.append(b'zlib') | |
|
781 | ||
|
782 | STREAM_ENCODERS[b'identity'] = (identityencoder, identitydecoder) | |
|
783 | STREAM_ENCODERS_ORDER.append(b'identity') | |
|
784 | ||
|
651 | 785 | class stream(object): |
|
652 | 786 | """Represents a logical unidirectional series of frames.""" |
|
653 | 787 | |
@@ -671,16 +805,70 b' class stream(object):' | |||
|
671 | 805 | class inputstream(stream): |
|
672 | 806 | """Represents a stream used for receiving data.""" |
|
673 | 807 | |
|
674 | def setdecoder(self, name, extraobjs): | |
|
808 | def __init__(self, streamid, active=False): | |
|
809 | super(inputstream, self).__init__(streamid, active=active) | |
|
810 | self._decoder = None | |
|
811 | ||
|
812 | def setdecoder(self, ui, name, extraobjs): | |
|
675 | 813 | """Set the decoder for this stream. |
|
676 | 814 | |
|
677 | 815 | Receives the stream profile name and any additional CBOR objects |
|
678 | 816 | decoded from the stream encoding settings frame payloads. |
|
679 | 817 | """ |
|
818 | if name not in STREAM_ENCODERS: | |
|
819 | raise error.Abort(_('unknown stream decoder: %s') % name) | |
|
820 | ||
|
821 | self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs) | |
|
822 | ||
|
823 | def decode(self, data): | |
|
824 | # Default is identity decoder. We don't bother instantiating one | |
|
825 | # because it is trivial. | |
|
826 | if not self._decoder: | |
|
827 | return data | |
|
828 | ||
|
829 | return self._decoder.decode(data) | |
|
830 | ||
|
831 | def flush(self): | |
|
832 | if not self._decoder: | |
|
833 | return b'' | |
|
834 | ||
|
835 | return self._decoder.flush() | |
|
680 | 836 | |
|
681 | 837 | class outputstream(stream): |
|
682 | 838 | """Represents a stream used for sending data.""" |
|
683 | 839 | |
|
840 | def __init__(self, streamid, active=False): | |
|
841 | super(outputstream, self).__init__(streamid, active=active) | |
|
842 | self._encoder = None | |
|
843 | ||
|
844 | def setencoder(self, ui, name): | |
|
845 | """Set the encoder for this stream. | |
|
846 | ||
|
847 | Receives the stream profile name. | |
|
848 | """ | |
|
849 | if name not in STREAM_ENCODERS: | |
|
850 | raise error.Abort(_('unknown stream encoder: %s') % name) | |
|
851 | ||
|
852 | self._encoder = STREAM_ENCODERS[name][0](ui) | |
|
853 | ||
|
854 | def encode(self, data): | |
|
855 | if not self._encoder: | |
|
856 | return data | |
|
857 | ||
|
858 | return self._encoder.encode(data) | |
|
859 | ||
|
860 | def flush(self): | |
|
861 | if not self._encoder: | |
|
862 | return b'' | |
|
863 | ||
|
864 | return self._encoder.flush() | |
|
865 | ||
|
866 | def finish(self): | |
|
867 | if not self._encoder: | |
|
868 | return b'' | |
|
869 | ||
|
870 | self._encoder.finish() | |
|
871 | ||
|
684 | 872 | def ensureserverstream(stream): |
|
685 | 873 | if stream.streamid % 2: |
|
686 | 874 | raise error.ProgrammingError('server should only write to even ' |
@@ -786,6 +974,8 b' class serverreactor(object):' | |||
|
786 | 974 | # Sender protocol settings are optional. Set implied default values. |
|
787 | 975 | self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS) |
|
788 | 976 | |
|
977 | populatestreamencoders() | |
|
978 | ||
|
789 | 979 | def onframerecv(self, frame): |
|
790 | 980 | """Process a frame that has been received off the wire. |
|
791 | 981 | |
@@ -1384,6 +1574,8 b' class clientreactor(object):' | |||
|
1384 | 1574 | self._incomingstreams = {} |
|
1385 | 1575 | self._streamsettingsdecoders = {} |
|
1386 | 1576 | |
|
1577 | populatestreamencoders() | |
|
1578 | ||
|
1387 | 1579 | def callcommand(self, name, args, datafh=None, redirect=None): |
|
1388 | 1580 | """Request that a command be executed. |
|
1389 | 1581 | |
@@ -1494,9 +1686,13 b' class clientreactor(object):' | |||
|
1494 | 1686 | self._incomingstreams[frame.streamid] = inputstream( |
|
1495 | 1687 | frame.streamid) |
|
1496 | 1688 | |
|
1689 | stream = self._incomingstreams[frame.streamid] | |
|
1690 | ||
|
1691 | # If the payload is encoded, ask the stream to decode it. We | |
|
1692 | # merely substitute the decoded result into the frame payload as | |
|
1693 | # if it had been transferred all along. | |
|
1497 | 1694 | if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED: |
|
1498 | raise error.ProgrammingError('support for decoding stream ' | |
|
1499 | 'payloads not yet implemneted') | |
|
1695 | frame.payload = stream.decode(frame.payload) | |
|
1500 | 1696 | |
|
1501 | 1697 | if frame.streamflags & STREAM_FLAG_END_STREAM: |
|
1502 | 1698 | del self._incomingstreams[frame.streamid] |
@@ -1573,7 +1769,8 b' class clientreactor(object):' | |||
|
1573 | 1769 | } |
|
1574 | 1770 | |
|
1575 | 1771 | try: |
|
1576 |
self._incomingstreams[frame.streamid].setdecoder( |
|
|
1772 | self._incomingstreams[frame.streamid].setdecoder(self._ui, | |
|
1773 | decoded[0], | |
|
1577 | 1774 | decoded[1:]) |
|
1578 | 1775 | except Exception as e: |
|
1579 | 1776 | return 'error', { |
@@ -1,6 +1,7 b'' | |||
|
1 | 1 | from __future__ import absolute_import |
|
2 | 2 | |
|
3 | 3 | import unittest |
|
4 | import zlib | |
|
4 | 5 | |
|
5 | 6 | from mercurial import ( |
|
6 | 7 | error, |
@@ -11,6 +12,12 b' from mercurial.utils import (' | |||
|
11 | 12 | cborutil, |
|
12 | 13 | ) |
|
13 | 14 | |
|
15 | try: | |
|
16 | from mercurial import zstd | |
|
17 | zstd.__version__ | |
|
18 | except ImportError: | |
|
19 | zstd = None | |
|
20 | ||
|
14 | 21 | ffs = framing.makeframefromhumanstring |
|
15 | 22 | |
|
16 | 23 | globalui = uimod.ui() |
@@ -261,8 +268,11 b' class StreamSettingsTests(unittest.TestC' | |||
|
261 | 268 | action, meta = sendframe(reactor, |
|
262 | 269 | ffs(b'1 2 stream-begin stream-settings eos %s' % data)) |
|
263 | 270 | |
|
264 |
self.assertEqual(action, b' |
|
|
265 |
self.assertEqual(meta, { |
|
|
271 | self.assertEqual(action, b'error') | |
|
272 | self.assertEqual(meta, { | |
|
273 | b'message': b'error setting stream decoder: identity decoder ' | |
|
274 | b'received unexpected additional values', | |
|
275 | }) | |
|
266 | 276 | |
|
267 | 277 | def testmultipleframes(self): |
|
268 | 278 | reactor = framing.clientreactor(globalui, buffersends=False) |
@@ -286,6 +296,309 b' class StreamSettingsTests(unittest.TestC' | |||
|
286 | 296 | self.assertEqual(action, b'noop') |
|
287 | 297 | self.assertEqual(meta, {}) |
|
288 | 298 | |
|
299 | def testinvalidencoder(self): | |
|
300 | reactor = framing.clientreactor(globalui, buffersends=False) | |
|
301 | ||
|
302 | request, action, meta = reactor.callcommand(b'foo', {}) | |
|
303 | for f in meta[b'framegen']: | |
|
304 | pass | |
|
305 | ||
|
306 | action, meta = sendframe(reactor, | |
|
307 | ffs(b'1 2 stream-begin stream-settings eos cbor:b"badvalue"')) | |
|
308 | ||
|
309 | self.assertEqual(action, b'error') | |
|
310 | self.assertEqual(meta, { | |
|
311 | b'message': b'error setting stream decoder: unknown stream ' | |
|
312 | b'decoder: badvalue', | |
|
313 | }) | |
|
314 | ||
|
315 | def testzlibencoding(self): | |
|
316 | reactor = framing.clientreactor(globalui, buffersends=False) | |
|
317 | ||
|
318 | request, action, meta = reactor.callcommand(b'foo', {}) | |
|
319 | for f in meta[b'framegen']: | |
|
320 | pass | |
|
321 | ||
|
322 | action, meta = sendframe(reactor, | |
|
323 | ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' % | |
|
324 | request.requestid)) | |
|
325 | ||
|
326 | self.assertEqual(action, b'noop') | |
|
327 | self.assertEqual(meta, {}) | |
|
328 | ||
|
329 | result = { | |
|
330 | b'status': b'ok', | |
|
331 | } | |
|
332 | encoded = b''.join(cborutil.streamencode(result)) | |
|
333 | ||
|
334 | compressed = zlib.compress(encoded) | |
|
335 | self.assertEqual(zlib.decompress(compressed), encoded) | |
|
336 | ||
|
337 | action, meta = sendframe(reactor, | |
|
338 | ffs(b'%d 2 encoded command-response eos %s' % | |
|
339 | (request.requestid, compressed))) | |
|
340 | ||
|
341 | self.assertEqual(action, b'responsedata') | |
|
342 | self.assertEqual(meta[b'data'], encoded) | |
|
343 | ||
|
344 | def testzlibencodingsinglebyteframes(self): | |
|
345 | reactor = framing.clientreactor(globalui, buffersends=False) | |
|
346 | ||
|
347 | request, action, meta = reactor.callcommand(b'foo', {}) | |
|
348 | for f in meta[b'framegen']: | |
|
349 | pass | |
|
350 | ||
|
351 | action, meta = sendframe(reactor, | |
|
352 | ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' % | |
|
353 | request.requestid)) | |
|
354 | ||
|
355 | self.assertEqual(action, b'noop') | |
|
356 | self.assertEqual(meta, {}) | |
|
357 | ||
|
358 | result = { | |
|
359 | b'status': b'ok', | |
|
360 | } | |
|
361 | encoded = b''.join(cborutil.streamencode(result)) | |
|
362 | ||
|
363 | compressed = zlib.compress(encoded) | |
|
364 | self.assertEqual(zlib.decompress(compressed), encoded) | |
|
365 | ||
|
366 | chunks = [] | |
|
367 | ||
|
368 | for i in range(len(compressed)): | |
|
369 | char = compressed[i:i + 1] | |
|
370 | if char == b'\\': | |
|
371 | char = b'\\\\' | |
|
372 | action, meta = sendframe(reactor, | |
|
373 | ffs(b'%d 2 encoded command-response continuation %s' % | |
|
374 | (request.requestid, char))) | |
|
375 | ||
|
376 | self.assertEqual(action, b'responsedata') | |
|
377 | chunks.append(meta[b'data']) | |
|
378 | self.assertTrue(meta[b'expectmore']) | |
|
379 | self.assertFalse(meta[b'eos']) | |
|
380 | ||
|
381 | # zlib will have the full data decoded at this point, even though | |
|
382 | # we haven't flushed. | |
|
383 | self.assertEqual(b''.join(chunks), encoded) | |
|
384 | ||
|
385 | # End the stream for good measure. | |
|
386 | action, meta = sendframe(reactor, | |
|
387 | ffs(b'%d 2 stream-end command-response eos ' % request.requestid)) | |
|
388 | ||
|
389 | self.assertEqual(action, b'responsedata') | |
|
390 | self.assertEqual(meta[b'data'], b'') | |
|
391 | self.assertFalse(meta[b'expectmore']) | |
|
392 | self.assertTrue(meta[b'eos']) | |
|
393 | ||
|
394 | def testzlibmultipleresponses(self): | |
|
395 | # We feed in zlib compressed data on the same stream but belonging to | |
|
396 | # 2 different requests. This tests our flushing behavior. | |
|
397 | reactor = framing.clientreactor(globalui, buffersends=False, | |
|
398 | hasmultiplesend=True) | |
|
399 | ||
|
400 | request1, action, meta = reactor.callcommand(b'foo', {}) | |
|
401 | for f in meta[b'framegen']: | |
|
402 | pass | |
|
403 | ||
|
404 | request2, action, meta = reactor.callcommand(b'foo', {}) | |
|
405 | for f in meta[b'framegen']: | |
|
406 | pass | |
|
407 | ||
|
408 | outstream = framing.outputstream(2) | |
|
409 | outstream.setencoder(globalui, b'zlib') | |
|
410 | ||
|
411 | response1 = b''.join(cborutil.streamencode({ | |
|
412 | b'status': b'ok', | |
|
413 | b'extra': b'response1' * 10, | |
|
414 | })) | |
|
415 | ||
|
416 | response2 = b''.join(cborutil.streamencode({ | |
|
417 | b'status': b'error', | |
|
418 | b'extra': b'response2' * 10, | |
|
419 | })) | |
|
420 | ||
|
421 | action, meta = sendframe(reactor, | |
|
422 | ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' % | |
|
423 | request1.requestid)) | |
|
424 | ||
|
425 | self.assertEqual(action, b'noop') | |
|
426 | self.assertEqual(meta, {}) | |
|
427 | ||
|
428 | # Feeding partial data in won't get anything useful out. | |
|
429 | action, meta = sendframe(reactor, | |
|
430 | ffs(b'%d 2 encoded command-response continuation %s' % ( | |
|
431 | request1.requestid, outstream.encode(response1)))) | |
|
432 | self.assertEqual(action, b'responsedata') | |
|
433 | self.assertEqual(meta[b'data'], b'') | |
|
434 | ||
|
435 | # But flushing data at both ends will get our original data. | |
|
436 | action, meta = sendframe(reactor, | |
|
437 | ffs(b'%d 2 encoded command-response eos %s' % ( | |
|
438 | request1.requestid, outstream.flush()))) | |
|
439 | self.assertEqual(action, b'responsedata') | |
|
440 | self.assertEqual(meta[b'data'], response1) | |
|
441 | ||
|
442 | # We should be able to reuse the compressor/decompressor for the | |
|
443 | # 2nd response. | |
|
444 | action, meta = sendframe(reactor, | |
|
445 | ffs(b'%d 2 encoded command-response continuation %s' % ( | |
|
446 | request2.requestid, outstream.encode(response2)))) | |
|
447 | self.assertEqual(action, b'responsedata') | |
|
448 | self.assertEqual(meta[b'data'], b'') | |
|
449 | ||
|
450 | action, meta = sendframe(reactor, | |
|
451 | ffs(b'%d 2 encoded command-response eos %s' % ( | |
|
452 | request2.requestid, outstream.flush()))) | |
|
453 | self.assertEqual(action, b'responsedata') | |
|
454 | self.assertEqual(meta[b'data'], response2) | |
|
455 | ||
|
456 | @unittest.skipUnless(zstd, 'zstd not available') | |
|
457 | def testzstd8mbencoding(self): | |
|
458 | reactor = framing.clientreactor(globalui, buffersends=False) | |
|
459 | ||
|
460 | request, action, meta = reactor.callcommand(b'foo', {}) | |
|
461 | for f in meta[b'framegen']: | |
|
462 | pass | |
|
463 | ||
|
464 | action, meta = sendframe(reactor, | |
|
465 | ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' % | |
|
466 | request.requestid)) | |
|
467 | ||
|
468 | self.assertEqual(action, b'noop') | |
|
469 | self.assertEqual(meta, {}) | |
|
470 | ||
|
471 | result = { | |
|
472 | b'status': b'ok', | |
|
473 | } | |
|
474 | encoded = b''.join(cborutil.streamencode(result)) | |
|
475 | ||
|
476 | encoder = framing.zstd8mbencoder(globalui) | |
|
477 | compressed = encoder.encode(encoded) + encoder.finish() | |
|
478 | self.assertEqual(zstd.ZstdDecompressor().decompress( | |
|
479 | compressed, max_output_size=len(encoded)), encoded) | |
|
480 | ||
|
481 | action, meta = sendframe(reactor, | |
|
482 | ffs(b'%d 2 encoded command-response eos %s' % | |
|
483 | (request.requestid, compressed))) | |
|
484 | ||
|
485 | self.assertEqual(action, b'responsedata') | |
|
486 | self.assertEqual(meta[b'data'], encoded) | |
|
487 | ||
|
488 | @unittest.skipUnless(zstd, 'zstd not available') | |
|
489 | def testzstd8mbencodingsinglebyteframes(self): | |
|
490 | reactor = framing.clientreactor(globalui, buffersends=False) | |
|
491 | ||
|
492 | request, action, meta = reactor.callcommand(b'foo', {}) | |
|
493 | for f in meta[b'framegen']: | |
|
494 | pass | |
|
495 | ||
|
496 | action, meta = sendframe(reactor, | |
|
497 | ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' % | |
|
498 | request.requestid)) | |
|
499 | ||
|
500 | self.assertEqual(action, b'noop') | |
|
501 | self.assertEqual(meta, {}) | |
|
502 | ||
|
503 | result = { | |
|
504 | b'status': b'ok', | |
|
505 | } | |
|
506 | encoded = b''.join(cborutil.streamencode(result)) | |
|
507 | ||
|
508 | compressed = zstd.ZstdCompressor().compress(encoded) | |
|
509 | self.assertEqual(zstd.ZstdDecompressor().decompress(compressed), | |
|
510 | encoded) | |
|
511 | ||
|
512 | chunks = [] | |
|
513 | ||
|
514 | for i in range(len(compressed)): | |
|
515 | char = compressed[i:i + 1] | |
|
516 | if char == b'\\': | |
|
517 | char = b'\\\\' | |
|
518 | action, meta = sendframe(reactor, | |
|
519 | ffs(b'%d 2 encoded command-response continuation %s' % | |
|
520 | (request.requestid, char))) | |
|
521 | ||
|
522 | self.assertEqual(action, b'responsedata') | |
|
523 | chunks.append(meta[b'data']) | |
|
524 | self.assertTrue(meta[b'expectmore']) | |
|
525 | self.assertFalse(meta[b'eos']) | |
|
526 | ||
|
527 | # zstd decompressor will flush at frame boundaries. | |
|
528 | self.assertEqual(b''.join(chunks), encoded) | |
|
529 | ||
|
530 | # End the stream for good measure. | |
|
531 | action, meta = sendframe(reactor, | |
|
532 | ffs(b'%d 2 stream-end command-response eos ' % request.requestid)) | |
|
533 | ||
|
534 | self.assertEqual(action, b'responsedata') | |
|
535 | self.assertEqual(meta[b'data'], b'') | |
|
536 | self.assertFalse(meta[b'expectmore']) | |
|
537 | self.assertTrue(meta[b'eos']) | |
|
538 | ||
|
539 | @unittest.skipUnless(zstd, 'zstd not available') | |
|
540 | def testzstd8mbmultipleresponses(self): | |
|
541 | # We feed in zstd compressed data on the same stream but belonging to | |
|
542 | # 2 different requests. This tests our flushing behavior. | |
|
543 | reactor = framing.clientreactor(globalui, buffersends=False, | |
|
544 | hasmultiplesend=True) | |
|
545 | ||
|
546 | request1, action, meta = reactor.callcommand(b'foo', {}) | |
|
547 | for f in meta[b'framegen']: | |
|
548 | pass | |
|
549 | ||
|
550 | request2, action, meta = reactor.callcommand(b'foo', {}) | |
|
551 | for f in meta[b'framegen']: | |
|
552 | pass | |
|
553 | ||
|
554 | outstream = framing.outputstream(2) | |
|
555 | outstream.setencoder(globalui, b'zstd-8mb') | |
|
556 | ||
|
557 | response1 = b''.join(cborutil.streamencode({ | |
|
558 | b'status': b'ok', | |
|
559 | b'extra': b'response1' * 10, | |
|
560 | })) | |
|
561 | ||
|
562 | response2 = b''.join(cborutil.streamencode({ | |
|
563 | b'status': b'error', | |
|
564 | b'extra': b'response2' * 10, | |
|
565 | })) | |
|
566 | ||
|
567 | action, meta = sendframe(reactor, | |
|
568 | ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' % | |
|
569 | request1.requestid)) | |
|
570 | ||
|
571 | self.assertEqual(action, b'noop') | |
|
572 | self.assertEqual(meta, {}) | |
|
573 | ||
|
574 | # Feeding partial data in won't get anything useful out. | |
|
575 | action, meta = sendframe(reactor, | |
|
576 | ffs(b'%d 2 encoded command-response continuation %s' % ( | |
|
577 | request1.requestid, outstream.encode(response1)))) | |
|
578 | self.assertEqual(action, b'responsedata') | |
|
579 | self.assertEqual(meta[b'data'], b'') | |
|
580 | ||
|
581 | # But flushing data at both ends will get our original data. | |
|
582 | action, meta = sendframe(reactor, | |
|
583 | ffs(b'%d 2 encoded command-response eos %s' % ( | |
|
584 | request1.requestid, outstream.flush()))) | |
|
585 | self.assertEqual(action, b'responsedata') | |
|
586 | self.assertEqual(meta[b'data'], response1) | |
|
587 | ||
|
588 | # We should be able to reuse the compressor/decompressor for the | |
|
589 | # 2nd response. | |
|
590 | action, meta = sendframe(reactor, | |
|
591 | ffs(b'%d 2 encoded command-response continuation %s' % ( | |
|
592 | request2.requestid, outstream.encode(response2)))) | |
|
593 | self.assertEqual(action, b'responsedata') | |
|
594 | self.assertEqual(meta[b'data'], b'') | |
|
595 | ||
|
596 | action, meta = sendframe(reactor, | |
|
597 | ffs(b'%d 2 encoded command-response eos %s' % ( | |
|
598 | request2.requestid, outstream.flush()))) | |
|
599 | self.assertEqual(action, b'responsedata') | |
|
600 | self.assertEqual(meta[b'data'], response2) | |
|
601 | ||
|
289 | 602 | if __name__ == '__main__': |
|
290 | 603 | import silenttestrunner |
|
291 | 604 | silenttestrunner.main(__name__) |
General Comments 0
You need to be logged in to leave comments.
Login now