Show More
@@ -695,6 +695,61 b' class bundlepart(object):' | |||||
695 | elif len(self.data): |
|
695 | elif len(self.data): | |
696 | yield self.data |
|
696 | yield self.data | |
697 |
|
697 | |||
|
698 | ||||
|
699 | flaginterrupt = -1 | |||
|
700 | ||||
|
701 | class interrupthandler(unpackermixin): | |||
|
702 | """read one part and process it with restricted capability | |||
|
703 | ||||
|
704 | This allows to transmit exception raised on the producer size during part | |||
|
705 | iteration while the consumer is reading a part. | |||
|
706 | ||||
|
707 | Part processed in this manner only have access to a ui object,""" | |||
|
708 | ||||
|
709 | def __init__(self, ui, fp): | |||
|
710 | super(interrupthandler, self).__init__(fp) | |||
|
711 | self.ui = ui | |||
|
712 | ||||
|
713 | def _readpartheader(self): | |||
|
714 | """reads a part header size and return the bytes blob | |||
|
715 | ||||
|
716 | returns None if empty""" | |||
|
717 | headersize = self._unpack(_fpartheadersize)[0] | |||
|
718 | if headersize < 0: | |||
|
719 | raise error.BundleValueError('negative part header size: %i' | |||
|
720 | % headersize) | |||
|
721 | self.ui.debug('part header size: %i\n' % headersize) | |||
|
722 | if headersize: | |||
|
723 | return self._readexact(headersize) | |||
|
724 | return None | |||
|
725 | ||||
|
726 | def __call__(self): | |||
|
727 | self.ui.debug('bundle2 stream interruption, looking for a part.\n') | |||
|
728 | headerblock = self._readpartheader() | |||
|
729 | if headerblock is None: | |||
|
730 | self.ui.debug('no part found during iterruption.\n') | |||
|
731 | return | |||
|
732 | part = unbundlepart(self.ui, headerblock, self._fp) | |||
|
733 | op = interruptoperation(self.ui) | |||
|
734 | _processpart(op, part) | |||
|
735 | ||||
|
736 | class interruptoperation(object): | |||
|
737 | """A limited operation to be use by part handler during interruption | |||
|
738 | ||||
|
739 | It only have access to an ui object. | |||
|
740 | """ | |||
|
741 | ||||
|
742 | def __init__(self, ui): | |||
|
743 | self.ui = ui | |||
|
744 | self.reply = None | |||
|
745 | ||||
|
746 | @property | |||
|
747 | def repo(self): | |||
|
748 | raise RuntimeError('no repo access from stream interruption') | |||
|
749 | ||||
|
750 | def gettransaction(self): | |||
|
751 | raise TransactionUnavailable('no repo access from stream interruption') | |||
|
752 | ||||
698 | class unbundlepart(unpackermixin): |
|
753 | class unbundlepart(unpackermixin): | |
699 | """a bundle part read from a bundle""" |
|
754 | """a bundle part read from a bundle""" | |
700 |
|
755 | |||
@@ -772,10 +827,15 b' class unbundlepart(unpackermixin):' | |||||
772 | payloadsize = self._unpack(_fpayloadsize)[0] |
|
827 | payloadsize = self._unpack(_fpayloadsize)[0] | |
773 | self.ui.debug('payload chunk size: %i\n' % payloadsize) |
|
828 | self.ui.debug('payload chunk size: %i\n' % payloadsize) | |
774 | while payloadsize: |
|
829 | while payloadsize: | |
775 |
if payloadsize |
|
830 | if payloadsize == flaginterrupt: | |
776 | msg = 'negative payload chunk size: %i' % payloadsize |
|
831 | # interruption detection, the handler will now read a | |
|
832 | # single part and process it. | |||
|
833 | interrupthandler(self.ui, self._fp)() | |||
|
834 | elif payloadsize < 0: | |||
|
835 | msg = 'negative payload chunk size: %i' % payloadsize | |||
777 | raise error.BundleValueError(msg) |
|
836 | raise error.BundleValueError(msg) | |
778 | yield self._readexact(payloadsize) |
|
837 | else: | |
|
838 | yield self._readexact(payloadsize) | |||
779 | payloadsize = self._unpack(_fpayloadsize)[0] |
|
839 | payloadsize = self._unpack(_fpayloadsize)[0] | |
780 | self.ui.debug('payload chunk size: %i\n' % payloadsize) |
|
840 | self.ui.debug('payload chunk size: %i\n' % payloadsize) | |
781 | self._payloadstream = util.chunkbuffer(payloadchunks()) |
|
841 | self._payloadstream = util.chunkbuffer(payloadchunks()) |
General Comments 0
You need to be logged in to leave comments.
Login now