# HG changeset patch # User Pierre-Yves David # Date 2014-10-14 17:47:47 # Node ID ad144882318db801ebdb0c5d2ccd61254a7c9e6b # Parent 963f311e3a81aafff0835dffdcc7d244e70639c6 bundle2: add a interrupt mechanism It is now possible to emit a single part in the middle of a payload production. This part will be processed with limitation (only access to a `ui` object). The goal is to let the server raise exception and output while a part is being processed. The source motivation is to transmit exception that occurs while generating a part. This change is was the motivation to bump the bundle2 format from HG2X to HG2Y. Somehow, the format bump made it into 3.2 without it. So this change go on stable. It is low risk as bundle2 is still disabled by default. diff --git a/mercurial/bundle2.py b/mercurial/bundle2.py --- a/mercurial/bundle2.py +++ b/mercurial/bundle2.py @@ -695,6 +695,61 @@ class bundlepart(object): elif len(self.data): yield self.data + +flaginterrupt = -1 + +class interrupthandler(unpackermixin): + """read one part and process it with restricted capability + + This allows to transmit exception raised on the producer size during part + iteration while the consumer is reading a part. + + Part processed in this manner only have access to a ui object,""" + + def __init__(self, ui, fp): + super(interrupthandler, self).__init__(fp) + self.ui = ui + + def _readpartheader(self): + """reads a part header size and return the bytes blob + + returns None if empty""" + headersize = self._unpack(_fpartheadersize)[0] + if headersize < 0: + raise error.BundleValueError('negative part header size: %i' + % headersize) + self.ui.debug('part header size: %i\n' % headersize) + if headersize: + return self._readexact(headersize) + return None + + def __call__(self): + self.ui.debug('bundle2 stream interruption, looking for a part.\n') + headerblock = self._readpartheader() + if headerblock is None: + self.ui.debug('no part found during iterruption.\n') + return + part = unbundlepart(self.ui, headerblock, self._fp) + op = interruptoperation(self.ui) + _processpart(op, part) + +class interruptoperation(object): + """A limited operation to be use by part handler during interruption + + It only have access to an ui object. + """ + + def __init__(self, ui): + self.ui = ui + self.reply = None + + @property + def repo(self): + raise RuntimeError('no repo access from stream interruption') + + def gettransaction(self): + raise TransactionUnavailable('no repo access from stream interruption') + class unbundlepart(unpackermixin): """a bundle part read from a bundle""" @@ -772,10 +827,15 @@ class unbundlepart(unpackermixin): payloadsize = self._unpack(_fpayloadsize)[0] self.ui.debug('payload chunk size: %i\n' % payloadsize) while payloadsize: - if payloadsize < 0: - msg = 'negative payload chunk size: %i' % payloadsize + if payloadsize == flaginterrupt: + # interruption detection, the handler will now read a + # single part and process it. + interrupthandler(self.ui, self._fp)() + elif payloadsize < 0: + msg = 'negative payload chunk size: %i' % payloadsize raise error.BundleValueError(msg) - yield self._readexact(payloadsize) + else: + yield self._readexact(payloadsize) payloadsize = self._unpack(_fpayloadsize)[0] self.ui.debug('payload chunk size: %i\n' % payloadsize) self._payloadstream = util.chunkbuffer(payloadchunks())