##// END OF EJS Templates
bundle2: lazy unbundle of part payload...
Pierre-Yves David -
r21019:3dc09f83 default
parent child Browse files
Show More
@@ -288,6 +288,7 b' def processbundle(repo, unbundler, trans'
288 # - exception catching
288 # - exception catching
289 unbundler.params
289 unbundler.params
290 iterparts = iter(unbundler)
290 iterparts = iter(unbundler)
291 part = None
291 try:
292 try:
292 for part in iterparts:
293 for part in iterparts:
293 parttype = part.type
294 parttype = part.type
@@ -302,8 +303,8 b' def processbundle(repo, unbundler, trans'
302 # - use a more precise exception
303 # - use a more precise exception
303 raise
304 raise
304 op.ui.debug('ignoring unknown advisory part %r\n' % key)
305 op.ui.debug('ignoring unknown advisory part %r\n' % key)
305 # todo:
306 # consuming the part
306 # - consume the part once we use streaming
307 part.read()
307 continue
308 continue
308
309
309 # handler is called outside the above try block so that we don't
310 # handler is called outside the above try block so that we don't
@@ -311,9 +312,14 b' def processbundle(repo, unbundler, trans'
311 # parthandlermapping lookup (any KeyError raised by handler()
312 # parthandlermapping lookup (any KeyError raised by handler()
312 # itself represents a defect of a different variety).
313 # itself represents a defect of a different variety).
313 handler(op, part)
314 handler(op, part)
315 part.read()
314 except Exception:
316 except Exception:
317 if part is not None:
318 # consume the bundle content
319 part.read()
315 for part in iterparts:
320 for part in iterparts:
316 pass # consume the bundle content
321 # consume the bundle content
322 part.read()
317 raise
323 raise
318 return op
324 return op
319
325
@@ -544,19 +550,21 b' class unbundlepart(unpackermixin):'
544 # unbundle state attr
550 # unbundle state attr
545 self._headerdata = header
551 self._headerdata = header
546 self._headeroffset = 0
552 self._headeroffset = 0
553 self._initialized = False
554 self.consumed = False
547 # part data
555 # part data
548 self.id = None
556 self.id = None
549 self.type = None
557 self.type = None
550 self.mandatoryparams = None
558 self.mandatoryparams = None
551 self.advisoryparams = None
559 self.advisoryparams = None
552 self.data = None
560 self._payloadstream = None
553 self._readdata()
561 self._readheader()
554
562
555 def _fromheader(self, size):
563 def _fromheader(self, size):
556 """return the next <size> byte from the header"""
564 """return the next <size> byte from the header"""
557 offset = self._headeroffset
565 offset = self._headeroffset
558 data = self._headerdata[offset:(offset + size)]
566 data = self._headerdata[offset:(offset + size)]
559 self._headeroffset += size
567 self._headeroffset = offset + size
560 return data
568 return data
561
569
562 def _unpackheader(self, format):
570 def _unpackheader(self, format):
@@ -566,10 +574,8 b' class unbundlepart(unpackermixin):'
566 data = self._fromheader(struct.calcsize(format))
574 data = self._fromheader(struct.calcsize(format))
567 return _unpack(format, data)
575 return _unpack(format, data)
568
576
569 def _readdata(self):
577 def _readheader(self):
570 """read the header and setup the object"""
578 """read the header and setup the object"""
571 # some utility to help reading from the header block
572
573 typesize = self._unpackheader(_fparttypesize)[0]
579 typesize = self._unpackheader(_fparttypesize)[0]
574 self.type = self._fromheader(typesize)
580 self.type = self._fromheader(typesize)
575 self.ui.debug('part type: "%s"\n' % self.type)
581 self.ui.debug('part type: "%s"\n' % self.type)
@@ -597,14 +603,29 b' class unbundlepart(unpackermixin):'
597 self.mandatoryparams = manparams
603 self.mandatoryparams = manparams
598 self.advisoryparams = advparams
604 self.advisoryparams = advparams
599 ## part payload
605 ## part payload
600 payload = []
606 def payloadchunks():
601 payloadsize = self._unpack(_fpayloadsize)[0]
602 self.ui.debug('payload chunk size: %i\n' % payloadsize)
603 while payloadsize:
604 payload.append(self._readexact(payloadsize))
605 payloadsize = self._unpack(_fpayloadsize)[0]
607 payloadsize = self._unpack(_fpayloadsize)[0]
606 self.ui.debug('payload chunk size: %i\n' % payloadsize)
608 self.ui.debug('payload chunk size: %i\n' % payloadsize)
607 self.data = ''.join(payload)
609 while payloadsize:
610 yield self._readexact(payloadsize)
611 payloadsize = self._unpack(_fpayloadsize)[0]
612 self.ui.debug('payload chunk size: %i\n' % payloadsize)
613 self._payloadstream = util.chunkbuffer(payloadchunks())
614 # we read the data, tell it
615 self._initialized = True
616
617 def read(self, size=None):
618 """read payload data"""
619 if not self._initialized:
620 self._readheader()
621 if size is None:
622 data = self._payloadstream.read()
623 else:
624 data = self._payloadstream.read(size)
625 if size is None or len(data) < size:
626 self.consumed = True
627 return data
628
608
629
609 @parthandler('changegroup')
630 @parthandler('changegroup')
610 def handlechangegroup(op, inpart):
631 def handlechangegroup(op, inpart):
@@ -619,7 +640,7 b' def handlechangegroup(op, inpart):'
619 # we need to make sure we trigger the creation of a transaction object used
640 # we need to make sure we trigger the creation of a transaction object used
620 # for the whole processing scope.
641 # for the whole processing scope.
621 op.gettransaction()
642 op.gettransaction()
622 data = StringIO.StringIO(inpart.data)
643 data = StringIO.StringIO(inpart.read())
623 data.seek(0)
644 data.seek(0)
624 cg = changegroup.readbundle(data, 'bundle2part')
645 cg = changegroup.readbundle(data, 'bundle2part')
625 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
646 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
@@ -631,6 +652,7 b' def handlechangegroup(op, inpart):'
631 [('in-reply-to', str(inpart.id)),
652 [('in-reply-to', str(inpart.id)),
632 ('return', '%i' % ret)])
653 ('return', '%i' % ret)])
633 op.reply.addpart(part)
654 op.reply.addpart(part)
655 assert not inpart.read()
634
656
635 @parthandler('reply:changegroup')
657 @parthandler('reply:changegroup')
636 def handlechangegroup(op, inpart):
658 def handlechangegroup(op, inpart):
@@ -28,7 +28,7 b' Create an extension to test bundle2 API'
28 > """handle a "test:song" bundle2 part, printing the lyrics on stdin"""
28 > """handle a "test:song" bundle2 part, printing the lyrics on stdin"""
29 > op.ui.write('The choir starts singing:\n')
29 > op.ui.write('The choir starts singing:\n')
30 > verses = 0
30 > verses = 0
31 > for line in part.data.split('\n'):
31 > for line in part.read().split('\n'):
32 > op.ui.write(' %s\n' % line)
32 > op.ui.write(' %s\n' % line)
33 > verses += 1
33 > verses += 1
34 > op.records.add('song', {'verses': verses})
34 > op.records.add('song', {'verses': verses})
@@ -152,7 +152,7 b' Create an extension to test bundle2 API'
152 > ui.write(' :%s:\n' % p.type)
152 > ui.write(' :%s:\n' % p.type)
153 > ui.write(' mandatory: %i\n' % len(p.mandatoryparams))
153 > ui.write(' mandatory: %i\n' % len(p.mandatoryparams))
154 > ui.write(' advisory: %i\n' % len(p.advisoryparams))
154 > ui.write(' advisory: %i\n' % len(p.advisoryparams))
155 > ui.write(' payload: %i bytes\n' % len(p.data))
155 > ui.write(' payload: %i bytes\n' % len(p.read()))
156 > ui.write('parts count: %i\n' % count)
156 > ui.write('parts count: %i\n' % count)
157 > EOF
157 > EOF
158 $ cat >> $HGRCPATH << EOF
158 $ cat >> $HGRCPATH << EOF
@@ -378,48 +378,48 b' Test part'
378 part type: "test:empty"
378 part type: "test:empty"
379 part id: "0"
379 part id: "0"
380 part parameters: 0
380 part parameters: 0
381 payload chunk size: 0
382 :test:empty:
381 :test:empty:
383 mandatory: 0
382 mandatory: 0
384 advisory: 0
383 advisory: 0
384 payload chunk size: 0
385 payload: 0 bytes
385 payload: 0 bytes
386 part header size: 17
386 part header size: 17
387 part type: "test:empty"
387 part type: "test:empty"
388 part id: "1"
388 part id: "1"
389 part parameters: 0
389 part parameters: 0
390 payload chunk size: 0
391 :test:empty:
390 :test:empty:
392 mandatory: 0
391 mandatory: 0
393 advisory: 0
392 advisory: 0
393 payload chunk size: 0
394 payload: 0 bytes
394 payload: 0 bytes
395 part header size: 16
395 part header size: 16
396 part type: "test:song"
396 part type: "test:song"
397 part id: "2"
397 part id: "2"
398 part parameters: 0
398 part parameters: 0
399 payload chunk size: 178
400 payload chunk size: 0
401 :test:song:
399 :test:song:
402 mandatory: 0
400 mandatory: 0
403 advisory: 0
401 advisory: 0
402 payload chunk size: 178
403 payload chunk size: 0
404 payload: 178 bytes
404 payload: 178 bytes
405 part header size: 43
405 part header size: 43
406 part type: "test:math"
406 part type: "test:math"
407 part id: "3"
407 part id: "3"
408 part parameters: 3
408 part parameters: 3
409 payload chunk size: 2
410 payload chunk size: 0
411 :test:math:
409 :test:math:
412 mandatory: 2
410 mandatory: 2
413 advisory: 1
411 advisory: 1
412 payload chunk size: 2
413 payload chunk size: 0
414 payload: 2 bytes
414 payload: 2 bytes
415 part header size: 16
415 part header size: 16
416 part type: "test:ping"
416 part type: "test:ping"
417 part id: "4"
417 part id: "4"
418 part parameters: 0
418 part parameters: 0
419 payload chunk size: 0
420 :test:ping:
419 :test:ping:
421 mandatory: 0
420 mandatory: 0
422 advisory: 0
421 advisory: 0
422 payload chunk size: 0
423 payload: 0 bytes
423 payload: 0 bytes
424 part header size: 0
424 part header size: 0
425 end of bundle2 stream
425 end of bundle2 stream
@@ -438,22 +438,22 b' Process the bundle'
438 part type: "test:empty"
438 part type: "test:empty"
439 part id: "0"
439 part id: "0"
440 part parameters: 0
440 part parameters: 0
441 ignoring unknown advisory part 'test:empty'
441 payload chunk size: 0
442 payload chunk size: 0
442 ignoring unknown advisory part 'test:empty'
443 part header size: 17
443 part header size: 17
444 part type: "test:empty"
444 part type: "test:empty"
445 part id: "1"
445 part id: "1"
446 part parameters: 0
446 part parameters: 0
447 ignoring unknown advisory part 'test:empty'
447 payload chunk size: 0
448 payload chunk size: 0
448 ignoring unknown advisory part 'test:empty'
449 part header size: 16
449 part header size: 16
450 part type: "test:song"
450 part type: "test:song"
451 part id: "2"
451 part id: "2"
452 part parameters: 0
452 part parameters: 0
453 found a handler for part 'test:song'
454 The choir starts singing:
453 payload chunk size: 178
455 payload chunk size: 178
454 payload chunk size: 0
456 payload chunk size: 0
455 found a handler for part 'test:song'
456 The choir starts singing:
457 Patali Dirapata, Cromda Cromda Ripalo, Pata Pata, Ko Ko Ko
457 Patali Dirapata, Cromda Cromda Ripalo, Pata Pata, Ko Ko Ko
458 Bokoro Dipoulito, Rondi Rondi Pepino, Pata Pata, Ko Ko Ko
458 Bokoro Dipoulito, Rondi Rondi Pepino, Pata Pata, Ko Ko Ko
459 Emana Karassoli, Loucra Loucra Ponponto, Pata Pata, Ko Ko Ko.
459 Emana Karassoli, Loucra Loucra Ponponto, Pata Pata, Ko Ko Ko.
@@ -461,16 +461,16 b' Process the bundle'
461 part type: "test:math"
461 part type: "test:math"
462 part id: "3"
462 part id: "3"
463 part parameters: 3
463 part parameters: 3
464 ignoring unknown advisory part 'test:math'
464 payload chunk size: 2
465 payload chunk size: 2
465 payload chunk size: 0
466 payload chunk size: 0
466 ignoring unknown advisory part 'test:math'
467 part header size: 16
467 part header size: 16
468 part type: "test:ping"
468 part type: "test:ping"
469 part id: "4"
469 part id: "4"
470 part parameters: 0
470 part parameters: 0
471 payload chunk size: 0
472 found a handler for part 'test:ping'
471 found a handler for part 'test:ping'
473 received ping request (id 4)
472 received ping request (id 4)
473 payload chunk size: 0
474 part header size: 0
474 part header size: 0
475 end of bundle2 stream
475 end of bundle2 stream
476 0 unread bytes
476 0 unread bytes
General Comments 0
You need to be logged in to leave comments. Login now