# HG changeset patch # User Gregory Szorc # Date 2017-11-13 03:46:15 # Node ID 8aa43ff9c12c637e4a9837503675fb2ce1b8c86a # Parent 073eec083e2503a7c0ef9b694bec13cd9754bffa bundle2: implement generic part payload decoder The previous commit extracted _payloadchunks() to a new derived class. There was still a reference to this method in unbundlepart, making unbundlepart unusable on its own. This commit implements a generic version of a bundle2 part payload decoder, without offset tracking. seekableunbundlepart._payloadchunks() has been refactored to consume it, adding offset tracking like before. We also implement unbundlepart._payloadchunks(), which is a thin wrapper for it. Since we never instantiate unbundlepart directly, this new method is not used. This will be changed in subsequent commits. The new implementation also inlines some simple code from unpackermixin and adds some local variable to prevent extra function calls and attribute lookups. `hg perfbundleread` on an uncompressed Firefox bundle seems to show a minor win: ! bundle2 iterparts() ! wall 12.593258 comb 12.250000 user 8.870000 sys 3.380000 (best of 3) ! wall 10.891305 comb 10.820000 user 7.990000 sys 2.830000 (best of 3) ! bundle2 part seek() ! wall 13.173163 comb 11.100000 user 8.390000 sys 2.710000 (best of 3) ! wall 12.991478 comb 10.390000 user 7.720000 sys 2.670000 (best of 3) ! bundle2 part read(8k) ! wall 9.483612 comb 9.480000 user 8.420000 sys 1.060000 (best of 3) ! wall 8.599892 comb 8.580000 user 7.720000 sys 0.860000 (best of 3) ! bundle2 part read(16k) ! wall 9.159815 comb 9.150000 user 8.220000 sys 0.930000 (best of 3) ! wall 8.265361 comb 8.250000 user 7.360000 sys 0.890000 (best of 3) ! bundle2 part read(32k) ! wall 9.141308 comb 9.130000 user 8.220000 sys 0.910000 (best of 3) ! wall 8.290308 comb 8.280000 user 7.330000 sys 0.950000 (best of 3) ! bundle2 part read(128k) ! wall 8.880587 comb 8.850000 user 7.960000 sys 0.890000 (best of 3) ! wall 8.204900 comb 8.150000 user 7.210000 sys 0.940000 (best of 3) Function call overhead in Python strikes again! Of course, bundle2 decoding CPU overhead is likely small compared to decompression and changegroup application. But every little bit helps. Differential Revision: https://phab.mercurial-scm.org/D1387 diff --git a/mercurial/bundle2.py b/mercurial/bundle2.py --- a/mercurial/bundle2.py +++ b/mercurial/bundle2.py @@ -1187,6 +1187,32 @@ class interruptoperation(object): def gettransaction(self): raise TransactionUnavailable('no repo access from stream interruption') +def decodepayloadchunks(ui, fh): + """Reads bundle2 part payload data into chunks. + + Part payload data consists of framed chunks. This function takes + a file handle and emits those chunks. + """ + headersize = struct.calcsize(_fpayloadsize) + readexactly = changegroup.readexactly + + chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0] + indebug(ui, 'payload chunk size: %i' % chunksize) + + while chunksize: + if chunksize >= 0: + yield readexactly(fh, chunksize) + elif chunksize == flaginterrupt: + # Interrupt "signal" detected. The regular stream is interrupted + # and a bundle2 part follows. Consume it. + interrupthandler(ui, fh)() + else: + raise error.BundleValueError( + 'negative payload chunk size: %s' % chunksize) + + chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0] + indebug(ui, 'payload chunk size: %i' % chunksize) + class unbundlepart(unpackermixin): """a bundle part read from a bundle""" @@ -1270,6 +1296,10 @@ class unbundlepart(unpackermixin): # we read the data, tell it self._initialized = True + def _payloadchunks(self): + """Generator of decoded chunks in the payload.""" + return decodepayloadchunks(self.ui, self._fp) + def read(self, size=None): """read payload data""" if not self._initialized: @@ -1320,25 +1350,14 @@ class seekableunbundlepart(unbundlepart) self._seekfp(self._chunkindex[chunknum][1]) pos = self._chunkindex[chunknum][0] - payloadsize = self._unpack(_fpayloadsize)[0] - indebug(self.ui, 'payload chunk size: %i' % payloadsize) - while payloadsize: - if payloadsize == flaginterrupt: - # interruption detection, the handler will now read a - # single part and process it. - interrupthandler(self.ui, self._fp)() - elif payloadsize < 0: - msg = 'negative payload chunk size: %i' % payloadsize - raise error.BundleValueError(msg) - else: - result = self._readexact(payloadsize) - chunknum += 1 - pos += payloadsize - if chunknum == len(self._chunkindex): - self._chunkindex.append((pos, self._tellfp())) - yield result - payloadsize = self._unpack(_fpayloadsize)[0] - indebug(self.ui, 'payload chunk size: %i' % payloadsize) + + for chunk in decodepayloadchunks(self.ui, self._fp): + chunknum += 1 + pos += len(chunk) + if chunknum == len(self._chunkindex): + self._chunkindex.append((pos, self._tellfp())) + + yield chunk def _findchunk(self, pos): '''for a given payload position, return a chunk number and offset'''