##// END OF EJS Templates
wireprotov2: define and use stream encoders...
Gregory Szorc -
r40167:e6752241 default
parent child Browse files
Show More
@@ -648,6 +648,140 b' class bufferingcommandresponseemitter(ob'
648 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
648 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
649 payload=payload)
649 payload=payload)
650
650
651 # TODO consider defining encoders/decoders using the util.compressionengine
652 # mechanism.
653
654 class identityencoder(object):
655 """Encoder for the "identity" stream encoding profile."""
656 def __init__(self, ui):
657 pass
658
659 def encode(self, data):
660 return data
661
662 def flush(self):
663 return b''
664
665 def finish(self):
666 return b''
667
668 class identitydecoder(object):
669 """Decoder for the "identity" stream encoding profile."""
670
671 def __init__(self, ui, extraobjs):
672 if extraobjs:
673 raise error.Abort(_('identity decoder received unexpected '
674 'additional values'))
675
676 def decode(self, data):
677 return data
678
679 class zlibencoder(object):
680 def __init__(self, ui):
681 import zlib
682 self._zlib = zlib
683 self._compressor = zlib.compressobj()
684
685 def encode(self, data):
686 return self._compressor.compress(data)
687
688 def flush(self):
689 # Z_SYNC_FLUSH doesn't reset compression context, which is
690 # what we want.
691 return self._compressor.flush(self._zlib.Z_SYNC_FLUSH)
692
693 def finish(self):
694 res = self._compressor.flush(self._zlib.Z_FINISH)
695 self._compressor = None
696 return res
697
698 class zlibdecoder(object):
699 def __init__(self, ui, extraobjs):
700 import zlib
701
702 if extraobjs:
703 raise error.Abort(_('zlib decoder received unexpected '
704 'additional values'))
705
706 self._decompressor = zlib.decompressobj()
707
708 def decode(self, data):
709 # Python 2's zlib module doesn't use the buffer protocol and can't
710 # handle all bytes-like types.
711 if not pycompat.ispy3 and isinstance(data, bytearray):
712 data = bytes(data)
713
714 return self._decompressor.decompress(data)
715
716 class zstdbaseencoder(object):
717 def __init__(self, level):
718 from . import zstd
719
720 self._zstd = zstd
721 cctx = zstd.ZstdCompressor(level=level)
722 self._compressor = cctx.compressobj()
723
724 def encode(self, data):
725 return self._compressor.compress(data)
726
727 def flush(self):
728 # COMPRESSOBJ_FLUSH_BLOCK flushes all data previously fed into the
729 # compressor and allows a decompressor to access all encoded data
730 # up to this point.
731 return self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_BLOCK)
732
733 def finish(self):
734 res = self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_FINISH)
735 self._compressor = None
736 return res
737
738 class zstd8mbencoder(zstdbaseencoder):
739 def __init__(self, ui):
740 super(zstd8mbencoder, self).__init__(3)
741
742 class zstdbasedecoder(object):
743 def __init__(self, maxwindowsize):
744 from . import zstd
745 dctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize)
746 self._decompressor = dctx.decompressobj()
747
748 def decode(self, data):
749 return self._decompressor.decompress(data)
750
751 class zstd8mbdecoder(zstdbasedecoder):
752 def __init__(self, ui, extraobjs):
753 if extraobjs:
754 raise error.Abort(_('zstd8mb decoder received unexpected '
755 'additional values'))
756
757 super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576)
758
759 # We lazily populate this to avoid excessive module imports when importing
760 # this module.
761 STREAM_ENCODERS = {}
762 STREAM_ENCODERS_ORDER = []
763
764 def populatestreamencoders():
765 if STREAM_ENCODERS:
766 return
767
768 try:
769 from . import zstd
770 zstd.__version__
771 except ImportError:
772 zstd = None
773
774 # zstandard is fastest and is preferred.
775 if zstd:
776 STREAM_ENCODERS[b'zstd-8mb'] = (zstd8mbencoder, zstd8mbdecoder)
777 STREAM_ENCODERS_ORDER.append(b'zstd-8mb')
778
779 STREAM_ENCODERS[b'zlib'] = (zlibencoder, zlibdecoder)
780 STREAM_ENCODERS_ORDER.append(b'zlib')
781
782 STREAM_ENCODERS[b'identity'] = (identityencoder, identitydecoder)
783 STREAM_ENCODERS_ORDER.append(b'identity')
784
651 class stream(object):
785 class stream(object):
652 """Represents a logical unidirectional series of frames."""
786 """Represents a logical unidirectional series of frames."""
653
787
@@ -671,16 +805,70 b' class stream(object):'
671 class inputstream(stream):
805 class inputstream(stream):
672 """Represents a stream used for receiving data."""
806 """Represents a stream used for receiving data."""
673
807
674 def setdecoder(self, name, extraobjs):
808 def __init__(self, streamid, active=False):
809 super(inputstream, self).__init__(streamid, active=active)
810 self._decoder = None
811
812 def setdecoder(self, ui, name, extraobjs):
675 """Set the decoder for this stream.
813 """Set the decoder for this stream.
676
814
677 Receives the stream profile name and any additional CBOR objects
815 Receives the stream profile name and any additional CBOR objects
678 decoded from the stream encoding settings frame payloads.
816 decoded from the stream encoding settings frame payloads.
679 """
817 """
818 if name not in STREAM_ENCODERS:
819 raise error.Abort(_('unknown stream decoder: %s') % name)
820
821 self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs)
822
823 def decode(self, data):
824 # Default is identity decoder. We don't bother instantiating one
825 # because it is trivial.
826 if not self._decoder:
827 return data
828
829 return self._decoder.decode(data)
830
831 def flush(self):
832 if not self._decoder:
833 return b''
834
835 return self._decoder.flush()
680
836
681 class outputstream(stream):
837 class outputstream(stream):
682 """Represents a stream used for sending data."""
838 """Represents a stream used for sending data."""
683
839
840 def __init__(self, streamid, active=False):
841 super(outputstream, self).__init__(streamid, active=active)
842 self._encoder = None
843
844 def setencoder(self, ui, name):
845 """Set the encoder for this stream.
846
847 Receives the stream profile name.
848 """
849 if name not in STREAM_ENCODERS:
850 raise error.Abort(_('unknown stream encoder: %s') % name)
851
852 self._encoder = STREAM_ENCODERS[name][0](ui)
853
854 def encode(self, data):
855 if not self._encoder:
856 return data
857
858 return self._encoder.encode(data)
859
860 def flush(self):
861 if not self._encoder:
862 return b''
863
864 return self._encoder.flush()
865
866 def finish(self):
867 if not self._encoder:
868 return b''
869
870 self._encoder.finish()
871
684 def ensureserverstream(stream):
872 def ensureserverstream(stream):
685 if stream.streamid % 2:
873 if stream.streamid % 2:
686 raise error.ProgrammingError('server should only write to even '
874 raise error.ProgrammingError('server should only write to even '
@@ -786,6 +974,8 b' class serverreactor(object):'
786 # Sender protocol settings are optional. Set implied default values.
974 # Sender protocol settings are optional. Set implied default values.
787 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
975 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
788
976
977 populatestreamencoders()
978
789 def onframerecv(self, frame):
979 def onframerecv(self, frame):
790 """Process a frame that has been received off the wire.
980 """Process a frame that has been received off the wire.
791
981
@@ -1384,6 +1574,8 b' class clientreactor(object):'
1384 self._incomingstreams = {}
1574 self._incomingstreams = {}
1385 self._streamsettingsdecoders = {}
1575 self._streamsettingsdecoders = {}
1386
1576
1577 populatestreamencoders()
1578
1387 def callcommand(self, name, args, datafh=None, redirect=None):
1579 def callcommand(self, name, args, datafh=None, redirect=None):
1388 """Request that a command be executed.
1580 """Request that a command be executed.
1389
1581
@@ -1494,9 +1686,13 b' class clientreactor(object):'
1494 self._incomingstreams[frame.streamid] = inputstream(
1686 self._incomingstreams[frame.streamid] = inputstream(
1495 frame.streamid)
1687 frame.streamid)
1496
1688
1689 stream = self._incomingstreams[frame.streamid]
1690
1691 # If the payload is encoded, ask the stream to decode it. We
1692 # merely substitute the decoded result into the frame payload as
1693 # if it had been transferred all along.
1497 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1694 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1498 raise error.ProgrammingError('support for decoding stream '
1695 frame.payload = stream.decode(frame.payload)
1499 'payloads not yet implemneted')
1500
1696
1501 if frame.streamflags & STREAM_FLAG_END_STREAM:
1697 if frame.streamflags & STREAM_FLAG_END_STREAM:
1502 del self._incomingstreams[frame.streamid]
1698 del self._incomingstreams[frame.streamid]
@@ -1573,7 +1769,8 b' class clientreactor(object):'
1573 }
1769 }
1574
1770
1575 try:
1771 try:
1576 self._incomingstreams[frame.streamid].setdecoder(decoded[0],
1772 self._incomingstreams[frame.streamid].setdecoder(self._ui,
1773 decoded[0],
1577 decoded[1:])
1774 decoded[1:])
1578 except Exception as e:
1775 except Exception as e:
1579 return 'error', {
1776 return 'error', {
@@ -1,6 +1,7 b''
1 from __future__ import absolute_import
1 from __future__ import absolute_import
2
2
3 import unittest
3 import unittest
4 import zlib
4
5
5 from mercurial import (
6 from mercurial import (
6 error,
7 error,
@@ -11,6 +12,12 b' from mercurial.utils import ('
11 cborutil,
12 cborutil,
12 )
13 )
13
14
15 try:
16 from mercurial import zstd
17 zstd.__version__
18 except ImportError:
19 zstd = None
20
14 ffs = framing.makeframefromhumanstring
21 ffs = framing.makeframefromhumanstring
15
22
16 globalui = uimod.ui()
23 globalui = uimod.ui()
@@ -261,8 +268,11 b' class StreamSettingsTests(unittest.TestC'
261 action, meta = sendframe(reactor,
268 action, meta = sendframe(reactor,
262 ffs(b'1 2 stream-begin stream-settings eos %s' % data))
269 ffs(b'1 2 stream-begin stream-settings eos %s' % data))
263
270
264 self.assertEqual(action, b'noop')
271 self.assertEqual(action, b'error')
265 self.assertEqual(meta, {})
272 self.assertEqual(meta, {
273 b'message': b'error setting stream decoder: identity decoder '
274 b'received unexpected additional values',
275 })
266
276
267 def testmultipleframes(self):
277 def testmultipleframes(self):
268 reactor = framing.clientreactor(globalui, buffersends=False)
278 reactor = framing.clientreactor(globalui, buffersends=False)
@@ -286,6 +296,309 b' class StreamSettingsTests(unittest.TestC'
286 self.assertEqual(action, b'noop')
296 self.assertEqual(action, b'noop')
287 self.assertEqual(meta, {})
297 self.assertEqual(meta, {})
288
298
299 def testinvalidencoder(self):
300 reactor = framing.clientreactor(globalui, buffersends=False)
301
302 request, action, meta = reactor.callcommand(b'foo', {})
303 for f in meta[b'framegen']:
304 pass
305
306 action, meta = sendframe(reactor,
307 ffs(b'1 2 stream-begin stream-settings eos cbor:b"badvalue"'))
308
309 self.assertEqual(action, b'error')
310 self.assertEqual(meta, {
311 b'message': b'error setting stream decoder: unknown stream '
312 b'decoder: badvalue',
313 })
314
315 def testzlibencoding(self):
316 reactor = framing.clientreactor(globalui, buffersends=False)
317
318 request, action, meta = reactor.callcommand(b'foo', {})
319 for f in meta[b'framegen']:
320 pass
321
322 action, meta = sendframe(reactor,
323 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
324 request.requestid))
325
326 self.assertEqual(action, b'noop')
327 self.assertEqual(meta, {})
328
329 result = {
330 b'status': b'ok',
331 }
332 encoded = b''.join(cborutil.streamencode(result))
333
334 compressed = zlib.compress(encoded)
335 self.assertEqual(zlib.decompress(compressed), encoded)
336
337 action, meta = sendframe(reactor,
338 ffs(b'%d 2 encoded command-response eos %s' %
339 (request.requestid, compressed)))
340
341 self.assertEqual(action, b'responsedata')
342 self.assertEqual(meta[b'data'], encoded)
343
344 def testzlibencodingsinglebyteframes(self):
345 reactor = framing.clientreactor(globalui, buffersends=False)
346
347 request, action, meta = reactor.callcommand(b'foo', {})
348 for f in meta[b'framegen']:
349 pass
350
351 action, meta = sendframe(reactor,
352 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
353 request.requestid))
354
355 self.assertEqual(action, b'noop')
356 self.assertEqual(meta, {})
357
358 result = {
359 b'status': b'ok',
360 }
361 encoded = b''.join(cborutil.streamencode(result))
362
363 compressed = zlib.compress(encoded)
364 self.assertEqual(zlib.decompress(compressed), encoded)
365
366 chunks = []
367
368 for i in range(len(compressed)):
369 char = compressed[i:i + 1]
370 if char == b'\\':
371 char = b'\\\\'
372 action, meta = sendframe(reactor,
373 ffs(b'%d 2 encoded command-response continuation %s' %
374 (request.requestid, char)))
375
376 self.assertEqual(action, b'responsedata')
377 chunks.append(meta[b'data'])
378 self.assertTrue(meta[b'expectmore'])
379 self.assertFalse(meta[b'eos'])
380
381 # zlib will have the full data decoded at this point, even though
382 # we haven't flushed.
383 self.assertEqual(b''.join(chunks), encoded)
384
385 # End the stream for good measure.
386 action, meta = sendframe(reactor,
387 ffs(b'%d 2 stream-end command-response eos ' % request.requestid))
388
389 self.assertEqual(action, b'responsedata')
390 self.assertEqual(meta[b'data'], b'')
391 self.assertFalse(meta[b'expectmore'])
392 self.assertTrue(meta[b'eos'])
393
394 def testzlibmultipleresponses(self):
395 # We feed in zlib compressed data on the same stream but belonging to
396 # 2 different requests. This tests our flushing behavior.
397 reactor = framing.clientreactor(globalui, buffersends=False,
398 hasmultiplesend=True)
399
400 request1, action, meta = reactor.callcommand(b'foo', {})
401 for f in meta[b'framegen']:
402 pass
403
404 request2, action, meta = reactor.callcommand(b'foo', {})
405 for f in meta[b'framegen']:
406 pass
407
408 outstream = framing.outputstream(2)
409 outstream.setencoder(globalui, b'zlib')
410
411 response1 = b''.join(cborutil.streamencode({
412 b'status': b'ok',
413 b'extra': b'response1' * 10,
414 }))
415
416 response2 = b''.join(cborutil.streamencode({
417 b'status': b'error',
418 b'extra': b'response2' * 10,
419 }))
420
421 action, meta = sendframe(reactor,
422 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
423 request1.requestid))
424
425 self.assertEqual(action, b'noop')
426 self.assertEqual(meta, {})
427
428 # Feeding partial data in won't get anything useful out.
429 action, meta = sendframe(reactor,
430 ffs(b'%d 2 encoded command-response continuation %s' % (
431 request1.requestid, outstream.encode(response1))))
432 self.assertEqual(action, b'responsedata')
433 self.assertEqual(meta[b'data'], b'')
434
435 # But flushing data at both ends will get our original data.
436 action, meta = sendframe(reactor,
437 ffs(b'%d 2 encoded command-response eos %s' % (
438 request1.requestid, outstream.flush())))
439 self.assertEqual(action, b'responsedata')
440 self.assertEqual(meta[b'data'], response1)
441
442 # We should be able to reuse the compressor/decompressor for the
443 # 2nd response.
444 action, meta = sendframe(reactor,
445 ffs(b'%d 2 encoded command-response continuation %s' % (
446 request2.requestid, outstream.encode(response2))))
447 self.assertEqual(action, b'responsedata')
448 self.assertEqual(meta[b'data'], b'')
449
450 action, meta = sendframe(reactor,
451 ffs(b'%d 2 encoded command-response eos %s' % (
452 request2.requestid, outstream.flush())))
453 self.assertEqual(action, b'responsedata')
454 self.assertEqual(meta[b'data'], response2)
455
456 @unittest.skipUnless(zstd, 'zstd not available')
457 def testzstd8mbencoding(self):
458 reactor = framing.clientreactor(globalui, buffersends=False)
459
460 request, action, meta = reactor.callcommand(b'foo', {})
461 for f in meta[b'framegen']:
462 pass
463
464 action, meta = sendframe(reactor,
465 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
466 request.requestid))
467
468 self.assertEqual(action, b'noop')
469 self.assertEqual(meta, {})
470
471 result = {
472 b'status': b'ok',
473 }
474 encoded = b''.join(cborutil.streamencode(result))
475
476 encoder = framing.zstd8mbencoder(globalui)
477 compressed = encoder.encode(encoded) + encoder.finish()
478 self.assertEqual(zstd.ZstdDecompressor().decompress(
479 compressed, max_output_size=len(encoded)), encoded)
480
481 action, meta = sendframe(reactor,
482 ffs(b'%d 2 encoded command-response eos %s' %
483 (request.requestid, compressed)))
484
485 self.assertEqual(action, b'responsedata')
486 self.assertEqual(meta[b'data'], encoded)
487
488 @unittest.skipUnless(zstd, 'zstd not available')
489 def testzstd8mbencodingsinglebyteframes(self):
490 reactor = framing.clientreactor(globalui, buffersends=False)
491
492 request, action, meta = reactor.callcommand(b'foo', {})
493 for f in meta[b'framegen']:
494 pass
495
496 action, meta = sendframe(reactor,
497 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
498 request.requestid))
499
500 self.assertEqual(action, b'noop')
501 self.assertEqual(meta, {})
502
503 result = {
504 b'status': b'ok',
505 }
506 encoded = b''.join(cborutil.streamencode(result))
507
508 compressed = zstd.ZstdCompressor().compress(encoded)
509 self.assertEqual(zstd.ZstdDecompressor().decompress(compressed),
510 encoded)
511
512 chunks = []
513
514 for i in range(len(compressed)):
515 char = compressed[i:i + 1]
516 if char == b'\\':
517 char = b'\\\\'
518 action, meta = sendframe(reactor,
519 ffs(b'%d 2 encoded command-response continuation %s' %
520 (request.requestid, char)))
521
522 self.assertEqual(action, b'responsedata')
523 chunks.append(meta[b'data'])
524 self.assertTrue(meta[b'expectmore'])
525 self.assertFalse(meta[b'eos'])
526
527 # zstd decompressor will flush at frame boundaries.
528 self.assertEqual(b''.join(chunks), encoded)
529
530 # End the stream for good measure.
531 action, meta = sendframe(reactor,
532 ffs(b'%d 2 stream-end command-response eos ' % request.requestid))
533
534 self.assertEqual(action, b'responsedata')
535 self.assertEqual(meta[b'data'], b'')
536 self.assertFalse(meta[b'expectmore'])
537 self.assertTrue(meta[b'eos'])
538
539 @unittest.skipUnless(zstd, 'zstd not available')
540 def testzstd8mbmultipleresponses(self):
541 # We feed in zstd compressed data on the same stream but belonging to
542 # 2 different requests. This tests our flushing behavior.
543 reactor = framing.clientreactor(globalui, buffersends=False,
544 hasmultiplesend=True)
545
546 request1, action, meta = reactor.callcommand(b'foo', {})
547 for f in meta[b'framegen']:
548 pass
549
550 request2, action, meta = reactor.callcommand(b'foo', {})
551 for f in meta[b'framegen']:
552 pass
553
554 outstream = framing.outputstream(2)
555 outstream.setencoder(globalui, b'zstd-8mb')
556
557 response1 = b''.join(cborutil.streamencode({
558 b'status': b'ok',
559 b'extra': b'response1' * 10,
560 }))
561
562 response2 = b''.join(cborutil.streamencode({
563 b'status': b'error',
564 b'extra': b'response2' * 10,
565 }))
566
567 action, meta = sendframe(reactor,
568 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
569 request1.requestid))
570
571 self.assertEqual(action, b'noop')
572 self.assertEqual(meta, {})
573
574 # Feeding partial data in won't get anything useful out.
575 action, meta = sendframe(reactor,
576 ffs(b'%d 2 encoded command-response continuation %s' % (
577 request1.requestid, outstream.encode(response1))))
578 self.assertEqual(action, b'responsedata')
579 self.assertEqual(meta[b'data'], b'')
580
581 # But flushing data at both ends will get our original data.
582 action, meta = sendframe(reactor,
583 ffs(b'%d 2 encoded command-response eos %s' % (
584 request1.requestid, outstream.flush())))
585 self.assertEqual(action, b'responsedata')
586 self.assertEqual(meta[b'data'], response1)
587
588 # We should be able to reuse the compressor/decompressor for the
589 # 2nd response.
590 action, meta = sendframe(reactor,
591 ffs(b'%d 2 encoded command-response continuation %s' % (
592 request2.requestid, outstream.encode(response2))))
593 self.assertEqual(action, b'responsedata')
594 self.assertEqual(meta[b'data'], b'')
595
596 action, meta = sendframe(reactor,
597 ffs(b'%d 2 encoded command-response eos %s' % (
598 request2.requestid, outstream.flush())))
599 self.assertEqual(action, b'responsedata')
600 self.assertEqual(meta[b'data'], response2)
601
289 if __name__ == '__main__':
602 if __name__ == '__main__':
290 import silenttestrunner
603 import silenttestrunner
291 silenttestrunner.main(__name__)
604 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now