##// END OF EJS Templates
bundle2: add a interrupt mechanism...
Pierre-Yves David -
r23066:ad144882 stable
parent child Browse files
Show More
@@ -695,6 +695,61 b' class bundlepart(object):'
695 695 elif len(self.data):
696 696 yield self.data
697 697
698
699 flaginterrupt = -1
700
701 class interrupthandler(unpackermixin):
702 """read one part and process it with restricted capability
703
704 This allows to transmit exception raised on the producer size during part
705 iteration while the consumer is reading a part.
706
707 Part processed in this manner only have access to a ui object,"""
708
709 def __init__(self, ui, fp):
710 super(interrupthandler, self).__init__(fp)
711 self.ui = ui
712
713 def _readpartheader(self):
714 """reads a part header size and return the bytes blob
715
716 returns None if empty"""
717 headersize = self._unpack(_fpartheadersize)[0]
718 if headersize < 0:
719 raise error.BundleValueError('negative part header size: %i'
720 % headersize)
721 self.ui.debug('part header size: %i\n' % headersize)
722 if headersize:
723 return self._readexact(headersize)
724 return None
725
726 def __call__(self):
727 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
728 headerblock = self._readpartheader()
729 if headerblock is None:
730 self.ui.debug('no part found during iterruption.\n')
731 return
732 part = unbundlepart(self.ui, headerblock, self._fp)
733 op = interruptoperation(self.ui)
734 _processpart(op, part)
735
736 class interruptoperation(object):
737 """A limited operation to be use by part handler during interruption
738
739 It only have access to an ui object.
740 """
741
742 def __init__(self, ui):
743 self.ui = ui
744 self.reply = None
745
746 @property
747 def repo(self):
748 raise RuntimeError('no repo access from stream interruption')
749
750 def gettransaction(self):
751 raise TransactionUnavailable('no repo access from stream interruption')
752
698 753 class unbundlepart(unpackermixin):
699 754 """a bundle part read from a bundle"""
700 755
@@ -772,10 +827,15 b' class unbundlepart(unpackermixin):'
772 827 payloadsize = self._unpack(_fpayloadsize)[0]
773 828 self.ui.debug('payload chunk size: %i\n' % payloadsize)
774 829 while payloadsize:
775 if payloadsize < 0:
776 msg = 'negative payload chunk size: %i' % payloadsize
830 if payloadsize == flaginterrupt:
831 # interruption detection, the handler will now read a
832 # single part and process it.
833 interrupthandler(self.ui, self._fp)()
834 elif payloadsize < 0:
835 msg = 'negative payload chunk size: %i' % payloadsize
777 836 raise error.BundleValueError(msg)
778 yield self._readexact(payloadsize)
837 else:
838 yield self._readexact(payloadsize)
779 839 payloadsize = self._unpack(_fpayloadsize)[0]
780 840 self.ui.debug('payload chunk size: %i\n' % payloadsize)
781 841 self._payloadstream = util.chunkbuffer(payloadchunks())
General Comments 0
You need to be logged in to leave comments. Login now