# HG changeset patch # User Pierre-Yves David # Date 2014-04-11 20:05:22 # Node ID 3dc09f831a2ed18de94b97c2fd568cf1994e6230 # Parent c848bfd02366b38add0dc264759d2817c85baab2 bundle2: lazy unbundle of part payload The `unbundle` part gains a `read` method to retrieve payload content. This method behaves as a python file-like read method. The bundle-processing code is updated to make sure a part is fully consumed before another one is extracted. Test output changes because the debug output is even more interleaved now. diff --git a/mercurial/bundle2.py b/mercurial/bundle2.py --- a/mercurial/bundle2.py +++ b/mercurial/bundle2.py @@ -288,6 +288,7 @@ def processbundle(repo, unbundler, trans # - exception catching unbundler.params iterparts = iter(unbundler) + part = None try: for part in iterparts: parttype = part.type @@ -302,8 +303,8 @@ def processbundle(repo, unbundler, trans # - use a more precise exception raise op.ui.debug('ignoring unknown advisory part %r\n' % key) - # todo: - # - consume the part once we use streaming + # consuming the part + part.read() continue # handler is called outside the above try block so that we don't @@ -311,9 +312,14 @@ def processbundle(repo, unbundler, trans # parthandlermapping lookup (any KeyError raised by handler() # itself represents a defect of a different variety). handler(op, part) + part.read() except Exception: + if part is not None: + # consume the bundle content + part.read() for part in iterparts: - pass # consume the bundle content + # consume the bundle content + part.read() raise return op @@ -544,19 +550,21 @@ class unbundlepart(unpackermixin): # unbundle state attr self._headerdata = header self._headeroffset = 0 + self._initialized = False + self.consumed = False # part data self.id = None self.type = None self.mandatoryparams = None self.advisoryparams = None - self.data = None - self._readdata() + self._payloadstream = None + self._readheader() def _fromheader(self, size): """return the next byte from the header""" offset = self._headeroffset data = self._headerdata[offset:(offset + size)] - self._headeroffset += size + self._headeroffset = offset + size return data def _unpackheader(self, format): @@ -566,10 +574,8 @@ class unbundlepart(unpackermixin): data = self._fromheader(struct.calcsize(format)) return _unpack(format, data) - def _readdata(self): + def _readheader(self): """read the header and setup the object""" - # some utility to help reading from the header block - typesize = self._unpackheader(_fparttypesize)[0] self.type = self._fromheader(typesize) self.ui.debug('part type: "%s"\n' % self.type) @@ -597,14 +603,29 @@ class unbundlepart(unpackermixin): self.mandatoryparams = manparams self.advisoryparams = advparams ## part payload - payload = [] - payloadsize = self._unpack(_fpayloadsize)[0] - self.ui.debug('payload chunk size: %i\n' % payloadsize) - while payloadsize: - payload.append(self._readexact(payloadsize)) + def payloadchunks(): payloadsize = self._unpack(_fpayloadsize)[0] self.ui.debug('payload chunk size: %i\n' % payloadsize) - self.data = ''.join(payload) + while payloadsize: + yield self._readexact(payloadsize) + payloadsize = self._unpack(_fpayloadsize)[0] + self.ui.debug('payload chunk size: %i\n' % payloadsize) + self._payloadstream = util.chunkbuffer(payloadchunks()) + # we read the data, tell it + self._initialized = True + + def read(self, size=None): + """read payload data""" + if not self._initialized: + self._readheader() + if size is None: + data = self._payloadstream.read() + else: + data = self._payloadstream.read(size) + if size is None or len(data) < size: + self.consumed = True + return data + @parthandler('changegroup') def handlechangegroup(op, inpart): @@ -619,7 +640,7 @@ def handlechangegroup(op, inpart): # we need to make sure we trigger the creation of a transaction object used # for the whole processing scope. op.gettransaction() - data = StringIO.StringIO(inpart.data) + data = StringIO.StringIO(inpart.read()) data.seek(0) cg = changegroup.readbundle(data, 'bundle2part') ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2') @@ -631,6 +652,7 @@ def handlechangegroup(op, inpart): [('in-reply-to', str(inpart.id)), ('return', '%i' % ret)]) op.reply.addpart(part) + assert not inpart.read() @parthandler('reply:changegroup') def handlechangegroup(op, inpart): diff --git a/tests/test-bundle2.t b/tests/test-bundle2.t --- a/tests/test-bundle2.t +++ b/tests/test-bundle2.t @@ -28,7 +28,7 @@ Create an extension to test bundle2 API > """handle a "test:song" bundle2 part, printing the lyrics on stdin""" > op.ui.write('The choir starts singing:\n') > verses = 0 - > for line in part.data.split('\n'): + > for line in part.read().split('\n'): > op.ui.write(' %s\n' % line) > verses += 1 > op.records.add('song', {'verses': verses}) @@ -152,7 +152,7 @@ Create an extension to test bundle2 API > ui.write(' :%s:\n' % p.type) > ui.write(' mandatory: %i\n' % len(p.mandatoryparams)) > ui.write(' advisory: %i\n' % len(p.advisoryparams)) - > ui.write(' payload: %i bytes\n' % len(p.data)) + > ui.write(' payload: %i bytes\n' % len(p.read())) > ui.write('parts count: %i\n' % count) > EOF $ cat >> $HGRCPATH << EOF @@ -378,48 +378,48 @@ Test part part type: "test:empty" part id: "0" part parameters: 0 - payload chunk size: 0 :test:empty: mandatory: 0 advisory: 0 + payload chunk size: 0 payload: 0 bytes part header size: 17 part type: "test:empty" part id: "1" part parameters: 0 - payload chunk size: 0 :test:empty: mandatory: 0 advisory: 0 + payload chunk size: 0 payload: 0 bytes part header size: 16 part type: "test:song" part id: "2" part parameters: 0 - payload chunk size: 178 - payload chunk size: 0 :test:song: mandatory: 0 advisory: 0 + payload chunk size: 178 + payload chunk size: 0 payload: 178 bytes part header size: 43 part type: "test:math" part id: "3" part parameters: 3 - payload chunk size: 2 - payload chunk size: 0 :test:math: mandatory: 2 advisory: 1 + payload chunk size: 2 + payload chunk size: 0 payload: 2 bytes part header size: 16 part type: "test:ping" part id: "4" part parameters: 0 - payload chunk size: 0 :test:ping: mandatory: 0 advisory: 0 + payload chunk size: 0 payload: 0 bytes part header size: 0 end of bundle2 stream @@ -438,22 +438,22 @@ Process the bundle part type: "test:empty" part id: "0" part parameters: 0 + ignoring unknown advisory part 'test:empty' payload chunk size: 0 - ignoring unknown advisory part 'test:empty' part header size: 17 part type: "test:empty" part id: "1" part parameters: 0 + ignoring unknown advisory part 'test:empty' payload chunk size: 0 - ignoring unknown advisory part 'test:empty' part header size: 16 part type: "test:song" part id: "2" part parameters: 0 + found a handler for part 'test:song' + The choir starts singing: payload chunk size: 178 payload chunk size: 0 - found a handler for part 'test:song' - The choir starts singing: Patali Dirapata, Cromda Cromda Ripalo, Pata Pata, Ko Ko Ko Bokoro Dipoulito, Rondi Rondi Pepino, Pata Pata, Ko Ko Ko Emana Karassoli, Loucra Loucra Ponponto, Pata Pata, Ko Ko Ko. @@ -461,16 +461,16 @@ Process the bundle part type: "test:math" part id: "3" part parameters: 3 + ignoring unknown advisory part 'test:math' payload chunk size: 2 payload chunk size: 0 - ignoring unknown advisory part 'test:math' part header size: 16 part type: "test:ping" part id: "4" part parameters: 0 - payload chunk size: 0 found a handler for part 'test:ping' received ping request (id 4) + payload chunk size: 0 part header size: 0 end of bundle2 stream 0 unread bytes