Show More
@@ -648,6 +648,140 class bufferingcommandresponseemitter(ob | |||||
648 | flags=FLAG_COMMAND_RESPONSE_CONTINUATION, |
|
648 | flags=FLAG_COMMAND_RESPONSE_CONTINUATION, | |
649 | payload=payload) |
|
649 | payload=payload) | |
650 |
|
650 | |||
|
651 | # TODO consider defining encoders/decoders using the util.compressionengine | |||
|
652 | # mechanism. | |||
|
653 | ||||
|
654 | class identityencoder(object): | |||
|
655 | """Encoder for the "identity" stream encoding profile.""" | |||
|
656 | def __init__(self, ui): | |||
|
657 | pass | |||
|
658 | ||||
|
659 | def encode(self, data): | |||
|
660 | return data | |||
|
661 | ||||
|
662 | def flush(self): | |||
|
663 | return b'' | |||
|
664 | ||||
|
665 | def finish(self): | |||
|
666 | return b'' | |||
|
667 | ||||
|
668 | class identitydecoder(object): | |||
|
669 | """Decoder for the "identity" stream encoding profile.""" | |||
|
670 | ||||
|
671 | def __init__(self, ui, extraobjs): | |||
|
672 | if extraobjs: | |||
|
673 | raise error.Abort(_('identity decoder received unexpected ' | |||
|
674 | 'additional values')) | |||
|
675 | ||||
|
676 | def decode(self, data): | |||
|
677 | return data | |||
|
678 | ||||
|
679 | class zlibencoder(object): | |||
|
680 | def __init__(self, ui): | |||
|
681 | import zlib | |||
|
682 | self._zlib = zlib | |||
|
683 | self._compressor = zlib.compressobj() | |||
|
684 | ||||
|
685 | def encode(self, data): | |||
|
686 | return self._compressor.compress(data) | |||
|
687 | ||||
|
688 | def flush(self): | |||
|
689 | # Z_SYNC_FLUSH doesn't reset compression context, which is | |||
|
690 | # what we want. | |||
|
691 | return self._compressor.flush(self._zlib.Z_SYNC_FLUSH) | |||
|
692 | ||||
|
693 | def finish(self): | |||
|
694 | res = self._compressor.flush(self._zlib.Z_FINISH) | |||
|
695 | self._compressor = None | |||
|
696 | return res | |||
|
697 | ||||
|
698 | class zlibdecoder(object): | |||
|
699 | def __init__(self, ui, extraobjs): | |||
|
700 | import zlib | |||
|
701 | ||||
|
702 | if extraobjs: | |||
|
703 | raise error.Abort(_('zlib decoder received unexpected ' | |||
|
704 | 'additional values')) | |||
|
705 | ||||
|
706 | self._decompressor = zlib.decompressobj() | |||
|
707 | ||||
|
708 | def decode(self, data): | |||
|
709 | # Python 2's zlib module doesn't use the buffer protocol and can't | |||
|
710 | # handle all bytes-like types. | |||
|
711 | if not pycompat.ispy3 and isinstance(data, bytearray): | |||
|
712 | data = bytes(data) | |||
|
713 | ||||
|
714 | return self._decompressor.decompress(data) | |||
|
715 | ||||
|
716 | class zstdbaseencoder(object): | |||
|
717 | def __init__(self, level): | |||
|
718 | from . import zstd | |||
|
719 | ||||
|
720 | self._zstd = zstd | |||
|
721 | cctx = zstd.ZstdCompressor(level=level) | |||
|
722 | self._compressor = cctx.compressobj() | |||
|
723 | ||||
|
724 | def encode(self, data): | |||
|
725 | return self._compressor.compress(data) | |||
|
726 | ||||
|
727 | def flush(self): | |||
|
728 | # COMPRESSOBJ_FLUSH_BLOCK flushes all data previously fed into the | |||
|
729 | # compressor and allows a decompressor to access all encoded data | |||
|
730 | # up to this point. | |||
|
731 | return self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_BLOCK) | |||
|
732 | ||||
|
733 | def finish(self): | |||
|
734 | res = self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_FINISH) | |||
|
735 | self._compressor = None | |||
|
736 | return res | |||
|
737 | ||||
|
738 | class zstd8mbencoder(zstdbaseencoder): | |||
|
739 | def __init__(self, ui): | |||
|
740 | super(zstd8mbencoder, self).__init__(3) | |||
|
741 | ||||
|
742 | class zstdbasedecoder(object): | |||
|
743 | def __init__(self, maxwindowsize): | |||
|
744 | from . import zstd | |||
|
745 | dctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize) | |||
|
746 | self._decompressor = dctx.decompressobj() | |||
|
747 | ||||
|
748 | def decode(self, data): | |||
|
749 | return self._decompressor.decompress(data) | |||
|
750 | ||||
|
751 | class zstd8mbdecoder(zstdbasedecoder): | |||
|
752 | def __init__(self, ui, extraobjs): | |||
|
753 | if extraobjs: | |||
|
754 | raise error.Abort(_('zstd8mb decoder received unexpected ' | |||
|
755 | 'additional values')) | |||
|
756 | ||||
|
757 | super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576) | |||
|
758 | ||||
|
759 | # We lazily populate this to avoid excessive module imports when importing | |||
|
760 | # this module. | |||
|
761 | STREAM_ENCODERS = {} | |||
|
762 | STREAM_ENCODERS_ORDER = [] | |||
|
763 | ||||
|
764 | def populatestreamencoders(): | |||
|
765 | if STREAM_ENCODERS: | |||
|
766 | return | |||
|
767 | ||||
|
768 | try: | |||
|
769 | from . import zstd | |||
|
770 | zstd.__version__ | |||
|
771 | except ImportError: | |||
|
772 | zstd = None | |||
|
773 | ||||
|
774 | # zstandard is fastest and is preferred. | |||
|
775 | if zstd: | |||
|
776 | STREAM_ENCODERS[b'zstd-8mb'] = (zstd8mbencoder, zstd8mbdecoder) | |||
|
777 | STREAM_ENCODERS_ORDER.append(b'zstd-8mb') | |||
|
778 | ||||
|
779 | STREAM_ENCODERS[b'zlib'] = (zlibencoder, zlibdecoder) | |||
|
780 | STREAM_ENCODERS_ORDER.append(b'zlib') | |||
|
781 | ||||
|
782 | STREAM_ENCODERS[b'identity'] = (identityencoder, identitydecoder) | |||
|
783 | STREAM_ENCODERS_ORDER.append(b'identity') | |||
|
784 | ||||
651 | class stream(object): |
|
785 | class stream(object): | |
652 | """Represents a logical unidirectional series of frames.""" |
|
786 | """Represents a logical unidirectional series of frames.""" | |
653 |
|
787 | |||
@@ -671,16 +805,70 class stream(object): | |||||
671 | class inputstream(stream): |
|
805 | class inputstream(stream): | |
672 | """Represents a stream used for receiving data.""" |
|
806 | """Represents a stream used for receiving data.""" | |
673 |
|
807 | |||
674 | def setdecoder(self, name, extraobjs): |
|
808 | def __init__(self, streamid, active=False): | |
|
809 | super(inputstream, self).__init__(streamid, active=active) | |||
|
810 | self._decoder = None | |||
|
811 | ||||
|
812 | def setdecoder(self, ui, name, extraobjs): | |||
675 | """Set the decoder for this stream. |
|
813 | """Set the decoder for this stream. | |
676 |
|
814 | |||
677 | Receives the stream profile name and any additional CBOR objects |
|
815 | Receives the stream profile name and any additional CBOR objects | |
678 | decoded from the stream encoding settings frame payloads. |
|
816 | decoded from the stream encoding settings frame payloads. | |
679 | """ |
|
817 | """ | |
|
818 | if name not in STREAM_ENCODERS: | |||
|
819 | raise error.Abort(_('unknown stream decoder: %s') % name) | |||
|
820 | ||||
|
821 | self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs) | |||
|
822 | ||||
|
823 | def decode(self, data): | |||
|
824 | # Default is identity decoder. We don't bother instantiating one | |||
|
825 | # because it is trivial. | |||
|
826 | if not self._decoder: | |||
|
827 | return data | |||
|
828 | ||||
|
829 | return self._decoder.decode(data) | |||
|
830 | ||||
|
831 | def flush(self): | |||
|
832 | if not self._decoder: | |||
|
833 | return b'' | |||
|
834 | ||||
|
835 | return self._decoder.flush() | |||
680 |
|
836 | |||
681 | class outputstream(stream): |
|
837 | class outputstream(stream): | |
682 | """Represents a stream used for sending data.""" |
|
838 | """Represents a stream used for sending data.""" | |
683 |
|
839 | |||
|
840 | def __init__(self, streamid, active=False): | |||
|
841 | super(outputstream, self).__init__(streamid, active=active) | |||
|
842 | self._encoder = None | |||
|
843 | ||||
|
844 | def setencoder(self, ui, name): | |||
|
845 | """Set the encoder for this stream. | |||
|
846 | ||||
|
847 | Receives the stream profile name. | |||
|
848 | """ | |||
|
849 | if name not in STREAM_ENCODERS: | |||
|
850 | raise error.Abort(_('unknown stream encoder: %s') % name) | |||
|
851 | ||||
|
852 | self._encoder = STREAM_ENCODERS[name][0](ui) | |||
|
853 | ||||
|
854 | def encode(self, data): | |||
|
855 | if not self._encoder: | |||
|
856 | return data | |||
|
857 | ||||
|
858 | return self._encoder.encode(data) | |||
|
859 | ||||
|
860 | def flush(self): | |||
|
861 | if not self._encoder: | |||
|
862 | return b'' | |||
|
863 | ||||
|
864 | return self._encoder.flush() | |||
|
865 | ||||
|
866 | def finish(self): | |||
|
867 | if not self._encoder: | |||
|
868 | return b'' | |||
|
869 | ||||
|
870 | self._encoder.finish() | |||
|
871 | ||||
684 | def ensureserverstream(stream): |
|
872 | def ensureserverstream(stream): | |
685 | if stream.streamid % 2: |
|
873 | if stream.streamid % 2: | |
686 | raise error.ProgrammingError('server should only write to even ' |
|
874 | raise error.ProgrammingError('server should only write to even ' | |
@@ -786,6 +974,8 class serverreactor(object): | |||||
786 | # Sender protocol settings are optional. Set implied default values. |
|
974 | # Sender protocol settings are optional. Set implied default values. | |
787 | self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS) |
|
975 | self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS) | |
788 |
|
976 | |||
|
977 | populatestreamencoders() | |||
|
978 | ||||
789 | def onframerecv(self, frame): |
|
979 | def onframerecv(self, frame): | |
790 | """Process a frame that has been received off the wire. |
|
980 | """Process a frame that has been received off the wire. | |
791 |
|
981 | |||
@@ -1384,6 +1574,8 class clientreactor(object): | |||||
1384 | self._incomingstreams = {} |
|
1574 | self._incomingstreams = {} | |
1385 | self._streamsettingsdecoders = {} |
|
1575 | self._streamsettingsdecoders = {} | |
1386 |
|
1576 | |||
|
1577 | populatestreamencoders() | |||
|
1578 | ||||
1387 | def callcommand(self, name, args, datafh=None, redirect=None): |
|
1579 | def callcommand(self, name, args, datafh=None, redirect=None): | |
1388 | """Request that a command be executed. |
|
1580 | """Request that a command be executed. | |
1389 |
|
1581 | |||
@@ -1494,9 +1686,13 class clientreactor(object): | |||||
1494 | self._incomingstreams[frame.streamid] = inputstream( |
|
1686 | self._incomingstreams[frame.streamid] = inputstream( | |
1495 | frame.streamid) |
|
1687 | frame.streamid) | |
1496 |
|
1688 | |||
|
1689 | stream = self._incomingstreams[frame.streamid] | |||
|
1690 | ||||
|
1691 | # If the payload is encoded, ask the stream to decode it. We | |||
|
1692 | # merely substitute the decoded result into the frame payload as | |||
|
1693 | # if it had been transferred all along. | |||
1497 | if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED: |
|
1694 | if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED: | |
1498 | raise error.ProgrammingError('support for decoding stream ' |
|
1695 | frame.payload = stream.decode(frame.payload) | |
1499 | 'payloads not yet implemneted') |
|
|||
1500 |
|
1696 | |||
1501 | if frame.streamflags & STREAM_FLAG_END_STREAM: |
|
1697 | if frame.streamflags & STREAM_FLAG_END_STREAM: | |
1502 | del self._incomingstreams[frame.streamid] |
|
1698 | del self._incomingstreams[frame.streamid] | |
@@ -1573,7 +1769,8 class clientreactor(object): | |||||
1573 | } |
|
1769 | } | |
1574 |
|
1770 | |||
1575 | try: |
|
1771 | try: | |
1576 |
self._incomingstreams[frame.streamid].setdecoder( |
|
1772 | self._incomingstreams[frame.streamid].setdecoder(self._ui, | |
|
1773 | decoded[0], | |||
1577 | decoded[1:]) |
|
1774 | decoded[1:]) | |
1578 | except Exception as e: |
|
1775 | except Exception as e: | |
1579 | return 'error', { |
|
1776 | return 'error', { |
@@ -1,6 +1,7 | |||||
1 | from __future__ import absolute_import |
|
1 | from __future__ import absolute_import | |
2 |
|
2 | |||
3 | import unittest |
|
3 | import unittest | |
|
4 | import zlib | |||
4 |
|
5 | |||
5 | from mercurial import ( |
|
6 | from mercurial import ( | |
6 | error, |
|
7 | error, | |
@@ -11,6 +12,12 from mercurial.utils import ( | |||||
11 | cborutil, |
|
12 | cborutil, | |
12 | ) |
|
13 | ) | |
13 |
|
14 | |||
|
15 | try: | |||
|
16 | from mercurial import zstd | |||
|
17 | zstd.__version__ | |||
|
18 | except ImportError: | |||
|
19 | zstd = None | |||
|
20 | ||||
14 | ffs = framing.makeframefromhumanstring |
|
21 | ffs = framing.makeframefromhumanstring | |
15 |
|
22 | |||
16 | globalui = uimod.ui() |
|
23 | globalui = uimod.ui() | |
@@ -261,8 +268,11 class StreamSettingsTests(unittest.TestC | |||||
261 | action, meta = sendframe(reactor, |
|
268 | action, meta = sendframe(reactor, | |
262 | ffs(b'1 2 stream-begin stream-settings eos %s' % data)) |
|
269 | ffs(b'1 2 stream-begin stream-settings eos %s' % data)) | |
263 |
|
270 | |||
264 |
self.assertEqual(action, b' |
|
271 | self.assertEqual(action, b'error') | |
265 |
self.assertEqual(meta, { |
|
272 | self.assertEqual(meta, { | |
|
273 | b'message': b'error setting stream decoder: identity decoder ' | |||
|
274 | b'received unexpected additional values', | |||
|
275 | }) | |||
266 |
|
276 | |||
267 | def testmultipleframes(self): |
|
277 | def testmultipleframes(self): | |
268 | reactor = framing.clientreactor(globalui, buffersends=False) |
|
278 | reactor = framing.clientreactor(globalui, buffersends=False) | |
@@ -286,6 +296,309 class StreamSettingsTests(unittest.TestC | |||||
286 | self.assertEqual(action, b'noop') |
|
296 | self.assertEqual(action, b'noop') | |
287 | self.assertEqual(meta, {}) |
|
297 | self.assertEqual(meta, {}) | |
288 |
|
298 | |||
|
299 | def testinvalidencoder(self): | |||
|
300 | reactor = framing.clientreactor(globalui, buffersends=False) | |||
|
301 | ||||
|
302 | request, action, meta = reactor.callcommand(b'foo', {}) | |||
|
303 | for f in meta[b'framegen']: | |||
|
304 | pass | |||
|
305 | ||||
|
306 | action, meta = sendframe(reactor, | |||
|
307 | ffs(b'1 2 stream-begin stream-settings eos cbor:b"badvalue"')) | |||
|
308 | ||||
|
309 | self.assertEqual(action, b'error') | |||
|
310 | self.assertEqual(meta, { | |||
|
311 | b'message': b'error setting stream decoder: unknown stream ' | |||
|
312 | b'decoder: badvalue', | |||
|
313 | }) | |||
|
314 | ||||
|
315 | def testzlibencoding(self): | |||
|
316 | reactor = framing.clientreactor(globalui, buffersends=False) | |||
|
317 | ||||
|
318 | request, action, meta = reactor.callcommand(b'foo', {}) | |||
|
319 | for f in meta[b'framegen']: | |||
|
320 | pass | |||
|
321 | ||||
|
322 | action, meta = sendframe(reactor, | |||
|
323 | ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' % | |||
|
324 | request.requestid)) | |||
|
325 | ||||
|
326 | self.assertEqual(action, b'noop') | |||
|
327 | self.assertEqual(meta, {}) | |||
|
328 | ||||
|
329 | result = { | |||
|
330 | b'status': b'ok', | |||
|
331 | } | |||
|
332 | encoded = b''.join(cborutil.streamencode(result)) | |||
|
333 | ||||
|
334 | compressed = zlib.compress(encoded) | |||
|
335 | self.assertEqual(zlib.decompress(compressed), encoded) | |||
|
336 | ||||
|
337 | action, meta = sendframe(reactor, | |||
|
338 | ffs(b'%d 2 encoded command-response eos %s' % | |||
|
339 | (request.requestid, compressed))) | |||
|
340 | ||||
|
341 | self.assertEqual(action, b'responsedata') | |||
|
342 | self.assertEqual(meta[b'data'], encoded) | |||
|
343 | ||||
|
344 | def testzlibencodingsinglebyteframes(self): | |||
|
345 | reactor = framing.clientreactor(globalui, buffersends=False) | |||
|
346 | ||||
|
347 | request, action, meta = reactor.callcommand(b'foo', {}) | |||
|
348 | for f in meta[b'framegen']: | |||
|
349 | pass | |||
|
350 | ||||
|
351 | action, meta = sendframe(reactor, | |||
|
352 | ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' % | |||
|
353 | request.requestid)) | |||
|
354 | ||||
|
355 | self.assertEqual(action, b'noop') | |||
|
356 | self.assertEqual(meta, {}) | |||
|
357 | ||||
|
358 | result = { | |||
|
359 | b'status': b'ok', | |||
|
360 | } | |||
|
361 | encoded = b''.join(cborutil.streamencode(result)) | |||
|
362 | ||||
|
363 | compressed = zlib.compress(encoded) | |||
|
364 | self.assertEqual(zlib.decompress(compressed), encoded) | |||
|
365 | ||||
|
366 | chunks = [] | |||
|
367 | ||||
|
368 | for i in range(len(compressed)): | |||
|
369 | char = compressed[i:i + 1] | |||
|
370 | if char == b'\\': | |||
|
371 | char = b'\\\\' | |||
|
372 | action, meta = sendframe(reactor, | |||
|
373 | ffs(b'%d 2 encoded command-response continuation %s' % | |||
|
374 | (request.requestid, char))) | |||
|
375 | ||||
|
376 | self.assertEqual(action, b'responsedata') | |||
|
377 | chunks.append(meta[b'data']) | |||
|
378 | self.assertTrue(meta[b'expectmore']) | |||
|
379 | self.assertFalse(meta[b'eos']) | |||
|
380 | ||||
|
381 | # zlib will have the full data decoded at this point, even though | |||
|
382 | # we haven't flushed. | |||
|
383 | self.assertEqual(b''.join(chunks), encoded) | |||
|
384 | ||||
|
385 | # End the stream for good measure. | |||
|
386 | action, meta = sendframe(reactor, | |||
|
387 | ffs(b'%d 2 stream-end command-response eos ' % request.requestid)) | |||
|
388 | ||||
|
389 | self.assertEqual(action, b'responsedata') | |||
|
390 | self.assertEqual(meta[b'data'], b'') | |||
|
391 | self.assertFalse(meta[b'expectmore']) | |||
|
392 | self.assertTrue(meta[b'eos']) | |||
|
393 | ||||
|
394 | def testzlibmultipleresponses(self): | |||
|
395 | # We feed in zlib compressed data on the same stream but belonging to | |||
|
396 | # 2 different requests. This tests our flushing behavior. | |||
|
397 | reactor = framing.clientreactor(globalui, buffersends=False, | |||
|
398 | hasmultiplesend=True) | |||
|
399 | ||||
|
400 | request1, action, meta = reactor.callcommand(b'foo', {}) | |||
|
401 | for f in meta[b'framegen']: | |||
|
402 | pass | |||
|
403 | ||||
|
404 | request2, action, meta = reactor.callcommand(b'foo', {}) | |||
|
405 | for f in meta[b'framegen']: | |||
|
406 | pass | |||
|
407 | ||||
|
408 | outstream = framing.outputstream(2) | |||
|
409 | outstream.setencoder(globalui, b'zlib') | |||
|
410 | ||||
|
411 | response1 = b''.join(cborutil.streamencode({ | |||
|
412 | b'status': b'ok', | |||
|
413 | b'extra': b'response1' * 10, | |||
|
414 | })) | |||
|
415 | ||||
|
416 | response2 = b''.join(cborutil.streamencode({ | |||
|
417 | b'status': b'error', | |||
|
418 | b'extra': b'response2' * 10, | |||
|
419 | })) | |||
|
420 | ||||
|
421 | action, meta = sendframe(reactor, | |||
|
422 | ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' % | |||
|
423 | request1.requestid)) | |||
|
424 | ||||
|
425 | self.assertEqual(action, b'noop') | |||
|
426 | self.assertEqual(meta, {}) | |||
|
427 | ||||
|
428 | # Feeding partial data in won't get anything useful out. | |||
|
429 | action, meta = sendframe(reactor, | |||
|
430 | ffs(b'%d 2 encoded command-response continuation %s' % ( | |||
|
431 | request1.requestid, outstream.encode(response1)))) | |||
|
432 | self.assertEqual(action, b'responsedata') | |||
|
433 | self.assertEqual(meta[b'data'], b'') | |||
|
434 | ||||
|
435 | # But flushing data at both ends will get our original data. | |||
|
436 | action, meta = sendframe(reactor, | |||
|
437 | ffs(b'%d 2 encoded command-response eos %s' % ( | |||
|
438 | request1.requestid, outstream.flush()))) | |||
|
439 | self.assertEqual(action, b'responsedata') | |||
|
440 | self.assertEqual(meta[b'data'], response1) | |||
|
441 | ||||
|
442 | # We should be able to reuse the compressor/decompressor for the | |||
|
443 | # 2nd response. | |||
|
444 | action, meta = sendframe(reactor, | |||
|
445 | ffs(b'%d 2 encoded command-response continuation %s' % ( | |||
|
446 | request2.requestid, outstream.encode(response2)))) | |||
|
447 | self.assertEqual(action, b'responsedata') | |||
|
448 | self.assertEqual(meta[b'data'], b'') | |||
|
449 | ||||
|
450 | action, meta = sendframe(reactor, | |||
|
451 | ffs(b'%d 2 encoded command-response eos %s' % ( | |||
|
452 | request2.requestid, outstream.flush()))) | |||
|
453 | self.assertEqual(action, b'responsedata') | |||
|
454 | self.assertEqual(meta[b'data'], response2) | |||
|
455 | ||||
|
456 | @unittest.skipUnless(zstd, 'zstd not available') | |||
|
457 | def testzstd8mbencoding(self): | |||
|
458 | reactor = framing.clientreactor(globalui, buffersends=False) | |||
|
459 | ||||
|
460 | request, action, meta = reactor.callcommand(b'foo', {}) | |||
|
461 | for f in meta[b'framegen']: | |||
|
462 | pass | |||
|
463 | ||||
|
464 | action, meta = sendframe(reactor, | |||
|
465 | ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' % | |||
|
466 | request.requestid)) | |||
|
467 | ||||
|
468 | self.assertEqual(action, b'noop') | |||
|
469 | self.assertEqual(meta, {}) | |||
|
470 | ||||
|
471 | result = { | |||
|
472 | b'status': b'ok', | |||
|
473 | } | |||
|
474 | encoded = b''.join(cborutil.streamencode(result)) | |||
|
475 | ||||
|
476 | encoder = framing.zstd8mbencoder(globalui) | |||
|
477 | compressed = encoder.encode(encoded) + encoder.finish() | |||
|
478 | self.assertEqual(zstd.ZstdDecompressor().decompress( | |||
|
479 | compressed, max_output_size=len(encoded)), encoded) | |||
|
480 | ||||
|
481 | action, meta = sendframe(reactor, | |||
|
482 | ffs(b'%d 2 encoded command-response eos %s' % | |||
|
483 | (request.requestid, compressed))) | |||
|
484 | ||||
|
485 | self.assertEqual(action, b'responsedata') | |||
|
486 | self.assertEqual(meta[b'data'], encoded) | |||
|
487 | ||||
|
488 | @unittest.skipUnless(zstd, 'zstd not available') | |||
|
489 | def testzstd8mbencodingsinglebyteframes(self): | |||
|
490 | reactor = framing.clientreactor(globalui, buffersends=False) | |||
|
491 | ||||
|
492 | request, action, meta = reactor.callcommand(b'foo', {}) | |||
|
493 | for f in meta[b'framegen']: | |||
|
494 | pass | |||
|
495 | ||||
|
496 | action, meta = sendframe(reactor, | |||
|
497 | ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' % | |||
|
498 | request.requestid)) | |||
|
499 | ||||
|
500 | self.assertEqual(action, b'noop') | |||
|
501 | self.assertEqual(meta, {}) | |||
|
502 | ||||
|
503 | result = { | |||
|
504 | b'status': b'ok', | |||
|
505 | } | |||
|
506 | encoded = b''.join(cborutil.streamencode(result)) | |||
|
507 | ||||
|
508 | compressed = zstd.ZstdCompressor().compress(encoded) | |||
|
509 | self.assertEqual(zstd.ZstdDecompressor().decompress(compressed), | |||
|
510 | encoded) | |||
|
511 | ||||
|
512 | chunks = [] | |||
|
513 | ||||
|
514 | for i in range(len(compressed)): | |||
|
515 | char = compressed[i:i + 1] | |||
|
516 | if char == b'\\': | |||
|
517 | char = b'\\\\' | |||
|
518 | action, meta = sendframe(reactor, | |||
|
519 | ffs(b'%d 2 encoded command-response continuation %s' % | |||
|
520 | (request.requestid, char))) | |||
|
521 | ||||
|
522 | self.assertEqual(action, b'responsedata') | |||
|
523 | chunks.append(meta[b'data']) | |||
|
524 | self.assertTrue(meta[b'expectmore']) | |||
|
525 | self.assertFalse(meta[b'eos']) | |||
|
526 | ||||
|
527 | # zstd decompressor will flush at frame boundaries. | |||
|
528 | self.assertEqual(b''.join(chunks), encoded) | |||
|
529 | ||||
|
530 | # End the stream for good measure. | |||
|
531 | action, meta = sendframe(reactor, | |||
|
532 | ffs(b'%d 2 stream-end command-response eos ' % request.requestid)) | |||
|
533 | ||||
|
534 | self.assertEqual(action, b'responsedata') | |||
|
535 | self.assertEqual(meta[b'data'], b'') | |||
|
536 | self.assertFalse(meta[b'expectmore']) | |||
|
537 | self.assertTrue(meta[b'eos']) | |||
|
538 | ||||
|
539 | @unittest.skipUnless(zstd, 'zstd not available') | |||
|
540 | def testzstd8mbmultipleresponses(self): | |||
|
541 | # We feed in zstd compressed data on the same stream but belonging to | |||
|
542 | # 2 different requests. This tests our flushing behavior. | |||
|
543 | reactor = framing.clientreactor(globalui, buffersends=False, | |||
|
544 | hasmultiplesend=True) | |||
|
545 | ||||
|
546 | request1, action, meta = reactor.callcommand(b'foo', {}) | |||
|
547 | for f in meta[b'framegen']: | |||
|
548 | pass | |||
|
549 | ||||
|
550 | request2, action, meta = reactor.callcommand(b'foo', {}) | |||
|
551 | for f in meta[b'framegen']: | |||
|
552 | pass | |||
|
553 | ||||
|
554 | outstream = framing.outputstream(2) | |||
|
555 | outstream.setencoder(globalui, b'zstd-8mb') | |||
|
556 | ||||
|
557 | response1 = b''.join(cborutil.streamencode({ | |||
|
558 | b'status': b'ok', | |||
|
559 | b'extra': b'response1' * 10, | |||
|
560 | })) | |||
|
561 | ||||
|
562 | response2 = b''.join(cborutil.streamencode({ | |||
|
563 | b'status': b'error', | |||
|
564 | b'extra': b'response2' * 10, | |||
|
565 | })) | |||
|
566 | ||||
|
567 | action, meta = sendframe(reactor, | |||
|
568 | ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' % | |||
|
569 | request1.requestid)) | |||
|
570 | ||||
|
571 | self.assertEqual(action, b'noop') | |||
|
572 | self.assertEqual(meta, {}) | |||
|
573 | ||||
|
574 | # Feeding partial data in won't get anything useful out. | |||
|
575 | action, meta = sendframe(reactor, | |||
|
576 | ffs(b'%d 2 encoded command-response continuation %s' % ( | |||
|
577 | request1.requestid, outstream.encode(response1)))) | |||
|
578 | self.assertEqual(action, b'responsedata') | |||
|
579 | self.assertEqual(meta[b'data'], b'') | |||
|
580 | ||||
|
581 | # But flushing data at both ends will get our original data. | |||
|
582 | action, meta = sendframe(reactor, | |||
|
583 | ffs(b'%d 2 encoded command-response eos %s' % ( | |||
|
584 | request1.requestid, outstream.flush()))) | |||
|
585 | self.assertEqual(action, b'responsedata') | |||
|
586 | self.assertEqual(meta[b'data'], response1) | |||
|
587 | ||||
|
588 | # We should be able to reuse the compressor/decompressor for the | |||
|
589 | # 2nd response. | |||
|
590 | action, meta = sendframe(reactor, | |||
|
591 | ffs(b'%d 2 encoded command-response continuation %s' % ( | |||
|
592 | request2.requestid, outstream.encode(response2)))) | |||
|
593 | self.assertEqual(action, b'responsedata') | |||
|
594 | self.assertEqual(meta[b'data'], b'') | |||
|
595 | ||||
|
596 | action, meta = sendframe(reactor, | |||
|
597 | ffs(b'%d 2 encoded command-response eos %s' % ( | |||
|
598 | request2.requestid, outstream.flush()))) | |||
|
599 | self.assertEqual(action, b'responsedata') | |||
|
600 | self.assertEqual(meta[b'data'], response2) | |||
|
601 | ||||
289 | if __name__ == '__main__': |
|
602 | if __name__ == '__main__': | |
290 | import silenttestrunner |
|
603 | import silenttestrunner | |
291 | silenttestrunner.main(__name__) |
|
604 | silenttestrunner.main(__name__) |
General Comments 0
You need to be logged in to leave comments.
Login now