##// END OF EJS Templates
bundle2: lazy unbundle of part payload...
Pierre-Yves David -
r21019:3dc09f83 default
parent child Browse files
Show More
@@ -288,6 +288,7 b' def processbundle(repo, unbundler, trans'
288 288 # - exception catching
289 289 unbundler.params
290 290 iterparts = iter(unbundler)
291 part = None
291 292 try:
292 293 for part in iterparts:
293 294 parttype = part.type
@@ -302,8 +303,8 b' def processbundle(repo, unbundler, trans'
302 303 # - use a more precise exception
303 304 raise
304 305 op.ui.debug('ignoring unknown advisory part %r\n' % key)
305 # todo:
306 # - consume the part once we use streaming
306 # consuming the part
307 part.read()
307 308 continue
308 309
309 310 # handler is called outside the above try block so that we don't
@@ -311,9 +312,14 b' def processbundle(repo, unbundler, trans'
311 312 # parthandlermapping lookup (any KeyError raised by handler()
312 313 # itself represents a defect of a different variety).
313 314 handler(op, part)
315 part.read()
314 316 except Exception:
317 if part is not None:
318 # consume the bundle content
319 part.read()
315 320 for part in iterparts:
316 pass # consume the bundle content
321 # consume the bundle content
322 part.read()
317 323 raise
318 324 return op
319 325
@@ -544,19 +550,21 b' class unbundlepart(unpackermixin):'
544 550 # unbundle state attr
545 551 self._headerdata = header
546 552 self._headeroffset = 0
553 self._initialized = False
554 self.consumed = False
547 555 # part data
548 556 self.id = None
549 557 self.type = None
550 558 self.mandatoryparams = None
551 559 self.advisoryparams = None
552 self.data = None
553 self._readdata()
560 self._payloadstream = None
561 self._readheader()
554 562
555 563 def _fromheader(self, size):
556 564 """return the next <size> byte from the header"""
557 565 offset = self._headeroffset
558 566 data = self._headerdata[offset:(offset + size)]
559 self._headeroffset += size
567 self._headeroffset = offset + size
560 568 return data
561 569
562 570 def _unpackheader(self, format):
@@ -566,10 +574,8 b' class unbundlepart(unpackermixin):'
566 574 data = self._fromheader(struct.calcsize(format))
567 575 return _unpack(format, data)
568 576
569 def _readdata(self):
577 def _readheader(self):
570 578 """read the header and setup the object"""
571 # some utility to help reading from the header block
572
573 579 typesize = self._unpackheader(_fparttypesize)[0]
574 580 self.type = self._fromheader(typesize)
575 581 self.ui.debug('part type: "%s"\n' % self.type)
@@ -597,14 +603,29 b' class unbundlepart(unpackermixin):'
597 603 self.mandatoryparams = manparams
598 604 self.advisoryparams = advparams
599 605 ## part payload
600 payload = []
601 payloadsize = self._unpack(_fpayloadsize)[0]
602 self.ui.debug('payload chunk size: %i\n' % payloadsize)
603 while payloadsize:
604 payload.append(self._readexact(payloadsize))
606 def payloadchunks():
605 607 payloadsize = self._unpack(_fpayloadsize)[0]
606 608 self.ui.debug('payload chunk size: %i\n' % payloadsize)
607 self.data = ''.join(payload)
609 while payloadsize:
610 yield self._readexact(payloadsize)
611 payloadsize = self._unpack(_fpayloadsize)[0]
612 self.ui.debug('payload chunk size: %i\n' % payloadsize)
613 self._payloadstream = util.chunkbuffer(payloadchunks())
614 # we read the data, tell it
615 self._initialized = True
616
617 def read(self, size=None):
618 """read payload data"""
619 if not self._initialized:
620 self._readheader()
621 if size is None:
622 data = self._payloadstream.read()
623 else:
624 data = self._payloadstream.read(size)
625 if size is None or len(data) < size:
626 self.consumed = True
627 return data
628
608 629
609 630 @parthandler('changegroup')
610 631 def handlechangegroup(op, inpart):
@@ -619,7 +640,7 b' def handlechangegroup(op, inpart):'
619 640 # we need to make sure we trigger the creation of a transaction object used
620 641 # for the whole processing scope.
621 642 op.gettransaction()
622 data = StringIO.StringIO(inpart.data)
643 data = StringIO.StringIO(inpart.read())
623 644 data.seek(0)
624 645 cg = changegroup.readbundle(data, 'bundle2part')
625 646 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
@@ -631,6 +652,7 b' def handlechangegroup(op, inpart):'
631 652 [('in-reply-to', str(inpart.id)),
632 653 ('return', '%i' % ret)])
633 654 op.reply.addpart(part)
655 assert not inpart.read()
634 656
635 657 @parthandler('reply:changegroup')
636 658 def handlechangegroup(op, inpart):
@@ -28,7 +28,7 b' Create an extension to test bundle2 API'
28 28 > """handle a "test:song" bundle2 part, printing the lyrics on stdin"""
29 29 > op.ui.write('The choir starts singing:\n')
30 30 > verses = 0
31 > for line in part.data.split('\n'):
31 > for line in part.read().split('\n'):
32 32 > op.ui.write(' %s\n' % line)
33 33 > verses += 1
34 34 > op.records.add('song', {'verses': verses})
@@ -152,7 +152,7 b' Create an extension to test bundle2 API'
152 152 > ui.write(' :%s:\n' % p.type)
153 153 > ui.write(' mandatory: %i\n' % len(p.mandatoryparams))
154 154 > ui.write(' advisory: %i\n' % len(p.advisoryparams))
155 > ui.write(' payload: %i bytes\n' % len(p.data))
155 > ui.write(' payload: %i bytes\n' % len(p.read()))
156 156 > ui.write('parts count: %i\n' % count)
157 157 > EOF
158 158 $ cat >> $HGRCPATH << EOF
@@ -378,48 +378,48 b' Test part'
378 378 part type: "test:empty"
379 379 part id: "0"
380 380 part parameters: 0
381 payload chunk size: 0
382 381 :test:empty:
383 382 mandatory: 0
384 383 advisory: 0
384 payload chunk size: 0
385 385 payload: 0 bytes
386 386 part header size: 17
387 387 part type: "test:empty"
388 388 part id: "1"
389 389 part parameters: 0
390 payload chunk size: 0
391 390 :test:empty:
392 391 mandatory: 0
393 392 advisory: 0
393 payload chunk size: 0
394 394 payload: 0 bytes
395 395 part header size: 16
396 396 part type: "test:song"
397 397 part id: "2"
398 398 part parameters: 0
399 payload chunk size: 178
400 payload chunk size: 0
401 399 :test:song:
402 400 mandatory: 0
403 401 advisory: 0
402 payload chunk size: 178
403 payload chunk size: 0
404 404 payload: 178 bytes
405 405 part header size: 43
406 406 part type: "test:math"
407 407 part id: "3"
408 408 part parameters: 3
409 payload chunk size: 2
410 payload chunk size: 0
411 409 :test:math:
412 410 mandatory: 2
413 411 advisory: 1
412 payload chunk size: 2
413 payload chunk size: 0
414 414 payload: 2 bytes
415 415 part header size: 16
416 416 part type: "test:ping"
417 417 part id: "4"
418 418 part parameters: 0
419 payload chunk size: 0
420 419 :test:ping:
421 420 mandatory: 0
422 421 advisory: 0
422 payload chunk size: 0
423 423 payload: 0 bytes
424 424 part header size: 0
425 425 end of bundle2 stream
@@ -438,22 +438,22 b' Process the bundle'
438 438 part type: "test:empty"
439 439 part id: "0"
440 440 part parameters: 0
441 ignoring unknown advisory part 'test:empty'
441 442 payload chunk size: 0
442 ignoring unknown advisory part 'test:empty'
443 443 part header size: 17
444 444 part type: "test:empty"
445 445 part id: "1"
446 446 part parameters: 0
447 ignoring unknown advisory part 'test:empty'
447 448 payload chunk size: 0
448 ignoring unknown advisory part 'test:empty'
449 449 part header size: 16
450 450 part type: "test:song"
451 451 part id: "2"
452 452 part parameters: 0
453 found a handler for part 'test:song'
454 The choir starts singing:
453 455 payload chunk size: 178
454 456 payload chunk size: 0
455 found a handler for part 'test:song'
456 The choir starts singing:
457 457 Patali Dirapata, Cromda Cromda Ripalo, Pata Pata, Ko Ko Ko
458 458 Bokoro Dipoulito, Rondi Rondi Pepino, Pata Pata, Ko Ko Ko
459 459 Emana Karassoli, Loucra Loucra Ponponto, Pata Pata, Ko Ko Ko.
@@ -461,16 +461,16 b' Process the bundle'
461 461 part type: "test:math"
462 462 part id: "3"
463 463 part parameters: 3
464 ignoring unknown advisory part 'test:math'
464 465 payload chunk size: 2
465 466 payload chunk size: 0
466 ignoring unknown advisory part 'test:math'
467 467 part header size: 16
468 468 part type: "test:ping"
469 469 part id: "4"
470 470 part parameters: 0
471 payload chunk size: 0
472 471 found a handler for part 'test:ping'
473 472 received ping request (id 4)
473 payload chunk size: 0
474 474 part header size: 0
475 475 end of bundle2 stream
476 476 0 unread bytes
General Comments 0
You need to be logged in to leave comments. Login now