diff --git a/mercurial/bundle2.py b/mercurial/bundle2.py --- a/mercurial/bundle2.py +++ b/mercurial/bundle2.py @@ -695,6 +695,61 @@ class bundlepart(object): elif len(self.data): yield self.data + +flaginterrupt = -1 + +class interrupthandler(unpackermixin): + """read one part and process it with restricted capability + + This allows to transmit exception raised on the producer size during part + iteration while the consumer is reading a part. + + Part processed in this manner only have access to a ui object,""" + + def __init__(self, ui, fp): + super(interrupthandler, self).__init__(fp) + self.ui = ui + + def _readpartheader(self): + """reads a part header size and return the bytes blob + + returns None if empty""" + headersize = self._unpack(_fpartheadersize)[0] + if headersize < 0: + raise error.BundleValueError('negative part header size: %i' + % headersize) + self.ui.debug('part header size: %i\n' % headersize) + if headersize: + return self._readexact(headersize) + return None + + def __call__(self): + self.ui.debug('bundle2 stream interruption, looking for a part.\n') + headerblock = self._readpartheader() + if headerblock is None: + self.ui.debug('no part found during iterruption.\n') + return + part = unbundlepart(self.ui, headerblock, self._fp) + op = interruptoperation(self.ui) + _processpart(op, part) + +class interruptoperation(object): + """A limited operation to be use by part handler during interruption + + It only have access to an ui object. + """ + + def __init__(self, ui): + self.ui = ui + self.reply = None + + @property + def repo(self): + raise RuntimeError('no repo access from stream interruption') + + def gettransaction(self): + raise TransactionUnavailable('no repo access from stream interruption') + class unbundlepart(unpackermixin): """a bundle part read from a bundle""" @@ -772,10 +827,15 @@ class unbundlepart(unpackermixin): payloadsize = self._unpack(_fpayloadsize)[0] self.ui.debug('payload chunk size: %i\n' % payloadsize) while payloadsize: - if payloadsize < 0: - msg = 'negative payload chunk size: %i' % payloadsize + if payloadsize == flaginterrupt: + # interruption detection, the handler will now read a + # single part and process it. + interrupthandler(self.ui, self._fp)() + elif payloadsize < 0: + msg = 'negative payload chunk size: %i' % payloadsize raise error.BundleValueError(msg) - yield self._readexact(payloadsize) + else: + yield self._readexact(payloadsize) payloadsize = self._unpack(_fpayloadsize)[0] self.ui.debug('payload chunk size: %i\n' % payloadsize) self._payloadstream = util.chunkbuffer(payloadchunks())