diff --git a/mercurial/bundle2.py b/mercurial/bundle2.py --- a/mercurial/bundle2.py +++ b/mercurial/bundle2.py @@ -1187,6 +1187,32 @@ class interruptoperation(object): def gettransaction(self): raise TransactionUnavailable('no repo access from stream interruption') +def decodepayloadchunks(ui, fh): + """Reads bundle2 part payload data into chunks. + + Part payload data consists of framed chunks. This function takes + a file handle and emits those chunks. + """ + headersize = struct.calcsize(_fpayloadsize) + readexactly = changegroup.readexactly + + chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0] + indebug(ui, 'payload chunk size: %i' % chunksize) + + while chunksize: + if chunksize >= 0: + yield readexactly(fh, chunksize) + elif chunksize == flaginterrupt: + # Interrupt "signal" detected. The regular stream is interrupted + # and a bundle2 part follows. Consume it. + interrupthandler(ui, fh)() + else: + raise error.BundleValueError( + 'negative payload chunk size: %s' % chunksize) + + chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0] + indebug(ui, 'payload chunk size: %i' % chunksize) + class unbundlepart(unpackermixin): """a bundle part read from a bundle""" @@ -1270,6 +1296,10 @@ class unbundlepart(unpackermixin): # we read the data, tell it self._initialized = True + def _payloadchunks(self): + """Generator of decoded chunks in the payload.""" + return decodepayloadchunks(self.ui, self._fp) + def read(self, size=None): """read payload data""" if not self._initialized: @@ -1320,25 +1350,14 @@ class seekableunbundlepart(unbundlepart) self._seekfp(self._chunkindex[chunknum][1]) pos = self._chunkindex[chunknum][0] - payloadsize = self._unpack(_fpayloadsize)[0] - indebug(self.ui, 'payload chunk size: %i' % payloadsize) - while payloadsize: - if payloadsize == flaginterrupt: - # interruption detection, the handler will now read a - # single part and process it. - interrupthandler(self.ui, self._fp)() - elif payloadsize < 0: - msg = 'negative payload chunk size: %i' % payloadsize - raise error.BundleValueError(msg) - else: - result = self._readexact(payloadsize) - chunknum += 1 - pos += payloadsize - if chunknum == len(self._chunkindex): - self._chunkindex.append((pos, self._tellfp())) - yield result - payloadsize = self._unpack(_fpayloadsize)[0] - indebug(self.ui, 'payload chunk size: %i' % payloadsize) + + for chunk in decodepayloadchunks(self.ui, self._fp): + chunknum += 1 + pos += len(chunk) + if chunknum == len(self._chunkindex): + self._chunkindex.append((pos, self._tellfp())) + + yield chunk def _findchunk(self, pos): '''for a given payload position, return a chunk number and offset'''