Show More
@@ -695,6 +695,61 b' class bundlepart(object):' | |||
|
695 | 695 | elif len(self.data): |
|
696 | 696 | yield self.data |
|
697 | 697 | |
|
698 | ||
|
699 | flaginterrupt = -1 | |
|
700 | ||
|
701 | class interrupthandler(unpackermixin): | |
|
702 | """read one part and process it with restricted capability | |
|
703 | ||
|
704 | This allows to transmit exception raised on the producer size during part | |
|
705 | iteration while the consumer is reading a part. | |
|
706 | ||
|
707 | Part processed in this manner only have access to a ui object,""" | |
|
708 | ||
|
709 | def __init__(self, ui, fp): | |
|
710 | super(interrupthandler, self).__init__(fp) | |
|
711 | self.ui = ui | |
|
712 | ||
|
713 | def _readpartheader(self): | |
|
714 | """reads a part header size and return the bytes blob | |
|
715 | ||
|
716 | returns None if empty""" | |
|
717 | headersize = self._unpack(_fpartheadersize)[0] | |
|
718 | if headersize < 0: | |
|
719 | raise error.BundleValueError('negative part header size: %i' | |
|
720 | % headersize) | |
|
721 | self.ui.debug('part header size: %i\n' % headersize) | |
|
722 | if headersize: | |
|
723 | return self._readexact(headersize) | |
|
724 | return None | |
|
725 | ||
|
726 | def __call__(self): | |
|
727 | self.ui.debug('bundle2 stream interruption, looking for a part.\n') | |
|
728 | headerblock = self._readpartheader() | |
|
729 | if headerblock is None: | |
|
730 | self.ui.debug('no part found during iterruption.\n') | |
|
731 | return | |
|
732 | part = unbundlepart(self.ui, headerblock, self._fp) | |
|
733 | op = interruptoperation(self.ui) | |
|
734 | _processpart(op, part) | |
|
735 | ||
|
736 | class interruptoperation(object): | |
|
737 | """A limited operation to be use by part handler during interruption | |
|
738 | ||
|
739 | It only have access to an ui object. | |
|
740 | """ | |
|
741 | ||
|
742 | def __init__(self, ui): | |
|
743 | self.ui = ui | |
|
744 | self.reply = None | |
|
745 | ||
|
746 | @property | |
|
747 | def repo(self): | |
|
748 | raise RuntimeError('no repo access from stream interruption') | |
|
749 | ||
|
750 | def gettransaction(self): | |
|
751 | raise TransactionUnavailable('no repo access from stream interruption') | |
|
752 | ||
|
698 | 753 | class unbundlepart(unpackermixin): |
|
699 | 754 | """a bundle part read from a bundle""" |
|
700 | 755 | |
@@ -772,10 +827,15 b' class unbundlepart(unpackermixin):' | |||
|
772 | 827 | payloadsize = self._unpack(_fpayloadsize)[0] |
|
773 | 828 | self.ui.debug('payload chunk size: %i\n' % payloadsize) |
|
774 | 829 | while payloadsize: |
|
775 |
if payloadsize |
|
|
776 | msg = 'negative payload chunk size: %i' % payloadsize | |
|
830 | if payloadsize == flaginterrupt: | |
|
831 | # interruption detection, the handler will now read a | |
|
832 | # single part and process it. | |
|
833 | interrupthandler(self.ui, self._fp)() | |
|
834 | elif payloadsize < 0: | |
|
835 | msg = 'negative payload chunk size: %i' % payloadsize | |
|
777 | 836 | raise error.BundleValueError(msg) |
|
778 | yield self._readexact(payloadsize) | |
|
837 | else: | |
|
838 | yield self._readexact(payloadsize) | |
|
779 | 839 | payloadsize = self._unpack(_fpayloadsize)[0] |
|
780 | 840 | self.ui.debug('payload chunk size: %i\n' % payloadsize) |
|
781 | 841 | self._payloadstream = util.chunkbuffer(payloadchunks()) |
General Comments 0
You need to be logged in to leave comments.
Login now