##// END OF EJS Templates
bundle2: add a interrupt mechanism...
Pierre-Yves David -
r23066:ad144882 stable
parent child Browse files
Show More
@@ -695,6 +695,61 b' class bundlepart(object):'
695 elif len(self.data):
695 elif len(self.data):
696 yield self.data
696 yield self.data
697
697
698
699 flaginterrupt = -1
700
701 class interrupthandler(unpackermixin):
702 """read one part and process it with restricted capability
703
704 This allows to transmit exception raised on the producer size during part
705 iteration while the consumer is reading a part.
706
707 Part processed in this manner only have access to a ui object,"""
708
709 def __init__(self, ui, fp):
710 super(interrupthandler, self).__init__(fp)
711 self.ui = ui
712
713 def _readpartheader(self):
714 """reads a part header size and return the bytes blob
715
716 returns None if empty"""
717 headersize = self._unpack(_fpartheadersize)[0]
718 if headersize < 0:
719 raise error.BundleValueError('negative part header size: %i'
720 % headersize)
721 self.ui.debug('part header size: %i\n' % headersize)
722 if headersize:
723 return self._readexact(headersize)
724 return None
725
726 def __call__(self):
727 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
728 headerblock = self._readpartheader()
729 if headerblock is None:
730 self.ui.debug('no part found during iterruption.\n')
731 return
732 part = unbundlepart(self.ui, headerblock, self._fp)
733 op = interruptoperation(self.ui)
734 _processpart(op, part)
735
736 class interruptoperation(object):
737 """A limited operation to be use by part handler during interruption
738
739 It only have access to an ui object.
740 """
741
742 def __init__(self, ui):
743 self.ui = ui
744 self.reply = None
745
746 @property
747 def repo(self):
748 raise RuntimeError('no repo access from stream interruption')
749
750 def gettransaction(self):
751 raise TransactionUnavailable('no repo access from stream interruption')
752
698 class unbundlepart(unpackermixin):
753 class unbundlepart(unpackermixin):
699 """a bundle part read from a bundle"""
754 """a bundle part read from a bundle"""
700
755
@@ -772,10 +827,15 b' class unbundlepart(unpackermixin):'
772 payloadsize = self._unpack(_fpayloadsize)[0]
827 payloadsize = self._unpack(_fpayloadsize)[0]
773 self.ui.debug('payload chunk size: %i\n' % payloadsize)
828 self.ui.debug('payload chunk size: %i\n' % payloadsize)
774 while payloadsize:
829 while payloadsize:
775 if payloadsize < 0:
830 if payloadsize == flaginterrupt:
776 msg = 'negative payload chunk size: %i' % payloadsize
831 # interruption detection, the handler will now read a
832 # single part and process it.
833 interrupthandler(self.ui, self._fp)()
834 elif payloadsize < 0:
835 msg = 'negative payload chunk size: %i' % payloadsize
777 raise error.BundleValueError(msg)
836 raise error.BundleValueError(msg)
778 yield self._readexact(payloadsize)
837 else:
838 yield self._readexact(payloadsize)
779 payloadsize = self._unpack(_fpayloadsize)[0]
839 payloadsize = self._unpack(_fpayloadsize)[0]
780 self.ui.debug('payload chunk size: %i\n' % payloadsize)
840 self.ui.debug('payload chunk size: %i\n' % payloadsize)
781 self._payloadstream = util.chunkbuffer(payloadchunks())
841 self._payloadstream = util.chunkbuffer(payloadchunks())
General Comments 0
You need to be logged in to leave comments. Login now