##// END OF EJS Templates
bundle2: track life cycle of parts...
Pierre-Yves David -
r21601:7ff01bef default
parent child Browse files
Show More
@@ -1,774 +1,784 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: (16 bits integer)
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: (16 bits inter)
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 :payload:
117 117
118 118 payload is a series of `<chunksize><chunkdata>`.
119 119
120 120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
121 121 `chunksize` says)` The payload part is concluded by a zero size chunk.
122 122
123 123 The current implementation always produces either zero or one chunk.
124 124 This is an implementation limitation that will ultimately be lifted.
125 125
126 126 Bundle processing
127 127 ============================
128 128
129 129 Each part is processed in order using a "part handler". Handler are registered
130 130 for a certain part type.
131 131
132 132 The matching of a part to its handler is case insensitive. The case of the
133 133 part type is used to know if a part is mandatory or advisory. If the Part type
134 134 contains any uppercase char it is considered mandatory. When no handler is
135 135 known for a Mandatory part, the process is aborted and an exception is raised.
136 136 If the part is advisory and no handler is known, the part is ignored. When the
137 137 process is aborted, the full bundle is still read from the stream to keep the
138 138 channel usable. But none of the part read from an abort are processed. In the
139 139 future, dropping the stream may become an option for channel we do not care to
140 140 preserve.
141 141 """
142 142
143 143 import util
144 144 import struct
145 145 import urllib
146 146 import string
147 147
148 148 import changegroup, error
149 149 from i18n import _
150 150
151 151 _pack = struct.pack
152 152 _unpack = struct.unpack
153 153
154 154 _magicstring = 'HG2X'
155 155
156 156 _fstreamparamsize = '>H'
157 157 _fpartheadersize = '>H'
158 158 _fparttypesize = '>B'
159 159 _fpartid = '>I'
160 160 _fpayloadsize = '>I'
161 161 _fpartparamcount = '>BB'
162 162
163 163 preferedchunksize = 4096
164 164
165 165 def _makefpartparamsizes(nbparams):
166 166 """return a struct format to read part parameter sizes
167 167
168 168 The number parameters is variable so we need to build that format
169 169 dynamically.
170 170 """
171 171 return '>'+('BB'*nbparams)
172 172
173 173 class UnknownPartError(KeyError):
174 174 """error raised when no handler is found for a Mandatory part"""
175 175 pass
176 176
177 177 parthandlermapping = {}
178 178
179 179 def parthandler(parttype):
180 180 """decorator that register a function as a bundle2 part handler
181 181
182 182 eg::
183 183
184 184 @parthandler('myparttype')
185 185 def myparttypehandler(...):
186 186 '''process a part of type "my part".'''
187 187 ...
188 188 """
189 189 def _decorator(func):
190 190 lparttype = parttype.lower() # enforce lower case matching.
191 191 assert lparttype not in parthandlermapping
192 192 parthandlermapping[lparttype] = func
193 193 return func
194 194 return _decorator
195 195
196 196 class unbundlerecords(object):
197 197 """keep record of what happens during and unbundle
198 198
199 199 New records are added using `records.add('cat', obj)`. Where 'cat' is a
200 200 category of record and obj is an arbitrary object.
201 201
202 202 `records['cat']` will return all entries of this category 'cat'.
203 203
204 204 Iterating on the object itself will yield `('category', obj)` tuples
205 205 for all entries.
206 206
207 207 All iterations happens in chronological order.
208 208 """
209 209
210 210 def __init__(self):
211 211 self._categories = {}
212 212 self._sequences = []
213 213 self._replies = {}
214 214
215 215 def add(self, category, entry, inreplyto=None):
216 216 """add a new record of a given category.
217 217
218 218 The entry can then be retrieved in the list returned by
219 219 self['category']."""
220 220 self._categories.setdefault(category, []).append(entry)
221 221 self._sequences.append((category, entry))
222 222 if inreplyto is not None:
223 223 self.getreplies(inreplyto).add(category, entry)
224 224
225 225 def getreplies(self, partid):
226 226 """get the subrecords that replies to a specific part"""
227 227 return self._replies.setdefault(partid, unbundlerecords())
228 228
229 229 def __getitem__(self, cat):
230 230 return tuple(self._categories.get(cat, ()))
231 231
232 232 def __iter__(self):
233 233 return iter(self._sequences)
234 234
235 235 def __len__(self):
236 236 return len(self._sequences)
237 237
238 238 def __nonzero__(self):
239 239 return bool(self._sequences)
240 240
241 241 class bundleoperation(object):
242 242 """an object that represents a single bundling process
243 243
244 244 Its purpose is to carry unbundle-related objects and states.
245 245
246 246 A new object should be created at the beginning of each bundle processing.
247 247 The object is to be returned by the processing function.
248 248
249 249 The object has very little content now it will ultimately contain:
250 250 * an access to the repo the bundle is applied to,
251 251 * a ui object,
252 252 * a way to retrieve a transaction to add changes to the repo,
253 253 * a way to record the result of processing each part,
254 254 * a way to construct a bundle response when applicable.
255 255 """
256 256
257 257 def __init__(self, repo, transactiongetter):
258 258 self.repo = repo
259 259 self.ui = repo.ui
260 260 self.records = unbundlerecords()
261 261 self.gettransaction = transactiongetter
262 262 self.reply = None
263 263
264 264 class TransactionUnavailable(RuntimeError):
265 265 pass
266 266
267 267 def _notransaction():
268 268 """default method to get a transaction while processing a bundle
269 269
270 270 Raise an exception to highlight the fact that no transaction was expected
271 271 to be created"""
272 272 raise TransactionUnavailable()
273 273
274 274 def processbundle(repo, unbundler, transactiongetter=_notransaction):
275 275 """This function process a bundle, apply effect to/from a repo
276 276
277 277 It iterates over each part then searches for and uses the proper handling
278 278 code to process the part. Parts are processed in order.
279 279
280 280 This is very early version of this function that will be strongly reworked
281 281 before final usage.
282 282
283 283 Unknown Mandatory part will abort the process.
284 284 """
285 285 op = bundleoperation(repo, transactiongetter)
286 286 # todo:
287 287 # - replace this is a init function soon.
288 288 # - exception catching
289 289 unbundler.params
290 290 iterparts = unbundler.iterparts()
291 291 part = None
292 292 try:
293 293 for part in iterparts:
294 294 parttype = part.type
295 295 # part key are matched lower case
296 296 key = parttype.lower()
297 297 try:
298 298 handler = parthandlermapping[key]
299 299 op.ui.debug('found a handler for part %r\n' % parttype)
300 300 except KeyError:
301 301 if key != parttype: # mandatory parts
302 302 # todo:
303 303 # - use a more precise exception
304 304 raise UnknownPartError(key)
305 305 op.ui.debug('ignoring unknown advisory part %r\n' % key)
306 306 # consuming the part
307 307 part.read()
308 308 continue
309 309
310 310 # handler is called outside the above try block so that we don't
311 311 # risk catching KeyErrors from anything other than the
312 312 # parthandlermapping lookup (any KeyError raised by handler()
313 313 # itself represents a defect of a different variety).
314 314 output = None
315 315 if op.reply is not None:
316 316 op.ui.pushbuffer(error=True)
317 317 output = ''
318 318 try:
319 319 handler(op, part)
320 320 finally:
321 321 if output is not None:
322 322 output = op.ui.popbuffer()
323 323 if output:
324 324 op.reply.newpart('b2x:output',
325 325 advisoryparams=[('in-reply-to',
326 326 str(part.id))],
327 327 data=output)
328 328 part.read()
329 329 except Exception, exc:
330 330 if part is not None:
331 331 # consume the bundle content
332 332 part.read()
333 333 for part in iterparts:
334 334 # consume the bundle content
335 335 part.read()
336 336 # Small hack to let caller code distinguish exceptions from bundle2
337 337 # processing fron the ones from bundle1 processing. This is mostly
338 338 # needed to handle different return codes to unbundle according to the
339 339 # type of bundle. We should probably clean up or drop this return code
340 340 # craziness in a future version.
341 341 exc.duringunbundle2 = True
342 342 raise
343 343 return op
344 344
345 345 def decodecaps(blob):
346 346 """decode a bundle2 caps bytes blob into a dictionnary
347 347
348 348 The blob is a list of capabilities (one per line)
349 349 Capabilities may have values using a line of the form::
350 350
351 351 capability=value1,value2,value3
352 352
353 353 The values are always a list."""
354 354 caps = {}
355 355 for line in blob.splitlines():
356 356 if not line:
357 357 continue
358 358 if '=' not in line:
359 359 key, vals = line, ()
360 360 else:
361 361 key, vals = line.split('=', 1)
362 362 vals = vals.split(',')
363 363 key = urllib.unquote(key)
364 364 vals = [urllib.unquote(v) for v in vals]
365 365 caps[key] = vals
366 366 return caps
367 367
368 368 def encodecaps(caps):
369 369 """encode a bundle2 caps dictionary into a bytes blob"""
370 370 chunks = []
371 371 for ca in sorted(caps):
372 372 vals = caps[ca]
373 373 ca = urllib.quote(ca)
374 374 vals = [urllib.quote(v) for v in vals]
375 375 if vals:
376 376 ca = "%s=%s" % (ca, ','.join(vals))
377 377 chunks.append(ca)
378 378 return '\n'.join(chunks)
379 379
380 380 class bundle20(object):
381 381 """represent an outgoing bundle2 container
382 382
383 383 Use the `addparam` method to add stream level parameter. and `newpart` to
384 384 populate it. Then call `getchunks` to retrieve all the binary chunks of
385 385 data that compose the bundle2 container."""
386 386
387 387 def __init__(self, ui, capabilities=()):
388 388 self.ui = ui
389 389 self._params = []
390 390 self._parts = []
391 391 self.capabilities = dict(capabilities)
392 392
393 393 # methods used to defines the bundle2 content
394 394 def addparam(self, name, value=None):
395 395 """add a stream level parameter"""
396 396 if not name:
397 397 raise ValueError('empty parameter name')
398 398 if name[0] not in string.letters:
399 399 raise ValueError('non letter first character: %r' % name)
400 400 self._params.append((name, value))
401 401
402 402 def addpart(self, part):
403 403 """add a new part to the bundle2 container
404 404
405 405 Parts contains the actual applicative payload."""
406 406 assert part.id is None
407 407 part.id = len(self._parts) # very cheap counter
408 408 self._parts.append(part)
409 409
410 410 def newpart(self, typeid, *args, **kwargs):
411 411 """create a new part and add it to the containers"""
412 412 part = bundlepart(typeid, *args, **kwargs)
413 413 self.addpart(part)
414 414 return part
415 415
416 416 # methods used to generate the bundle2 stream
417 417 def getchunks(self):
418 418 self.ui.debug('start emission of %s stream\n' % _magicstring)
419 419 yield _magicstring
420 420 param = self._paramchunk()
421 421 self.ui.debug('bundle parameter: %s\n' % param)
422 422 yield _pack(_fstreamparamsize, len(param))
423 423 if param:
424 424 yield param
425 425
426 426 self.ui.debug('start of parts\n')
427 427 for part in self._parts:
428 428 self.ui.debug('bundle part: "%s"\n' % part.type)
429 429 for chunk in part.getchunks():
430 430 yield chunk
431 431 self.ui.debug('end of bundle\n')
432 432 yield '\0\0'
433 433
434 434 def _paramchunk(self):
435 435 """return a encoded version of all stream parameters"""
436 436 blocks = []
437 437 for par, value in self._params:
438 438 par = urllib.quote(par)
439 439 if value is not None:
440 440 value = urllib.quote(value)
441 441 par = '%s=%s' % (par, value)
442 442 blocks.append(par)
443 443 return ' '.join(blocks)
444 444
445 445 class unpackermixin(object):
446 446 """A mixin to extract bytes and struct data from a stream"""
447 447
448 448 def __init__(self, fp):
449 449 self._fp = fp
450 450
451 451 def _unpack(self, format):
452 452 """unpack this struct format from the stream"""
453 453 data = self._readexact(struct.calcsize(format))
454 454 return _unpack(format, data)
455 455
456 456 def _readexact(self, size):
457 457 """read exactly <size> bytes from the stream"""
458 458 return changegroup.readexactly(self._fp, size)
459 459
460 460
461 461 class unbundle20(unpackermixin):
462 462 """interpret a bundle2 stream
463 463
464 464 This class is fed with a binary stream and yields parts through its
465 465 `iterparts` methods."""
466 466
467 467 def __init__(self, ui, fp, header=None):
468 468 """If header is specified, we do not read it out of the stream."""
469 469 self.ui = ui
470 470 super(unbundle20, self).__init__(fp)
471 471 if header is None:
472 472 header = self._readexact(4)
473 473 magic, version = header[0:2], header[2:4]
474 474 if magic != 'HG':
475 475 raise util.Abort(_('not a Mercurial bundle'))
476 476 if version != '2X':
477 477 raise util.Abort(_('unknown bundle version %s') % version)
478 478 self.ui.debug('start processing of %s stream\n' % header)
479 479
480 480 @util.propertycache
481 481 def params(self):
482 482 """dictionary of stream level parameters"""
483 483 self.ui.debug('reading bundle2 stream parameters\n')
484 484 params = {}
485 485 paramssize = self._unpack(_fstreamparamsize)[0]
486 486 if paramssize:
487 487 for p in self._readexact(paramssize).split(' '):
488 488 p = p.split('=', 1)
489 489 p = [urllib.unquote(i) for i in p]
490 490 if len(p) < 2:
491 491 p.append(None)
492 492 self._processparam(*p)
493 493 params[p[0]] = p[1]
494 494 return params
495 495
496 496 def _processparam(self, name, value):
497 497 """process a parameter, applying its effect if needed
498 498
499 499 Parameter starting with a lower case letter are advisory and will be
500 500 ignored when unknown. Those starting with an upper case letter are
501 501 mandatory and will this function will raise a KeyError when unknown.
502 502
503 503 Note: no option are currently supported. Any input will be either
504 504 ignored or failing.
505 505 """
506 506 if not name:
507 507 raise ValueError('empty parameter name')
508 508 if name[0] not in string.letters:
509 509 raise ValueError('non letter first character: %r' % name)
510 510 # Some logic will be later added here to try to process the option for
511 511 # a dict of known parameter.
512 512 if name[0].islower():
513 513 self.ui.debug("ignoring unknown parameter %r\n" % name)
514 514 else:
515 515 raise KeyError(name)
516 516
517 517
518 518 def iterparts(self):
519 519 """yield all parts contained in the stream"""
520 520 # make sure param have been loaded
521 521 self.params
522 522 self.ui.debug('start extraction of bundle2 parts\n')
523 523 headerblock = self._readpartheader()
524 524 while headerblock is not None:
525 525 part = unbundlepart(self.ui, headerblock, self._fp)
526 526 yield part
527 527 headerblock = self._readpartheader()
528 528 self.ui.debug('end of bundle2 stream\n')
529 529
530 530 def _readpartheader(self):
531 531 """reads a part header size and return the bytes blob
532 532
533 533 returns None if empty"""
534 534 headersize = self._unpack(_fpartheadersize)[0]
535 535 self.ui.debug('part header size: %i\n' % headersize)
536 536 if headersize:
537 537 return self._readexact(headersize)
538 538 return None
539 539
540 540
541 541 class bundlepart(object):
542 542 """A bundle2 part contains application level payload
543 543
544 544 The part `type` is used to route the part to the application level
545 545 handler.
546 546 """
547 547
548 548 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
549 549 data=''):
550 550 self.id = None
551 551 self.type = parttype
552 552 self.data = data
553 553 self.mandatoryparams = mandatoryparams
554 554 self.advisoryparams = advisoryparams
555 # status of the part's generation:
556 # - None: not started,
557 # - False: currently generated,
558 # - True: generation done.
559 self._generated = None
555 560
561 # methods used to generates the bundle2 stream
556 562 def getchunks(self):
563 if self._generated is not None:
564 raise RuntimeError('part can only be consumed once')
565 self._generated = False
557 566 #### header
558 567 ## parttype
559 568 header = [_pack(_fparttypesize, len(self.type)),
560 569 self.type, _pack(_fpartid, self.id),
561 570 ]
562 571 ## parameters
563 572 # count
564 573 manpar = self.mandatoryparams
565 574 advpar = self.advisoryparams
566 575 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
567 576 # size
568 577 parsizes = []
569 578 for key, value in manpar:
570 579 parsizes.append(len(key))
571 580 parsizes.append(len(value))
572 581 for key, value in advpar:
573 582 parsizes.append(len(key))
574 583 parsizes.append(len(value))
575 584 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
576 585 header.append(paramsizes)
577 586 # key, value
578 587 for key, value in manpar:
579 588 header.append(key)
580 589 header.append(value)
581 590 for key, value in advpar:
582 591 header.append(key)
583 592 header.append(value)
584 593 ## finalize header
585 594 headerchunk = ''.join(header)
586 595 yield _pack(_fpartheadersize, len(headerchunk))
587 596 yield headerchunk
588 597 ## payload
589 598 for chunk in self._payloadchunks():
590 599 yield _pack(_fpayloadsize, len(chunk))
591 600 yield chunk
592 601 # end of payload
593 602 yield _pack(_fpayloadsize, 0)
603 self._generated = True
594 604
595 605 def _payloadchunks(self):
596 606 """yield chunks of a the part payload
597 607
598 608 Exists to handle the different methods to provide data to a part."""
599 609 # we only support fixed size data now.
600 610 # This will be improved in the future.
601 611 if util.safehasattr(self.data, 'next'):
602 612 buff = util.chunkbuffer(self.data)
603 613 chunk = buff.read(preferedchunksize)
604 614 while chunk:
605 615 yield chunk
606 616 chunk = buff.read(preferedchunksize)
607 617 elif len(self.data):
608 618 yield self.data
609 619
610 620 class unbundlepart(unpackermixin):
611 621 """a bundle part read from a bundle"""
612 622
613 623 def __init__(self, ui, header, fp):
614 624 super(unbundlepart, self).__init__(fp)
615 625 self.ui = ui
616 626 # unbundle state attr
617 627 self._headerdata = header
618 628 self._headeroffset = 0
619 629 self._initialized = False
620 630 self.consumed = False
621 631 # part data
622 632 self.id = None
623 633 self.type = None
624 634 self.mandatoryparams = None
625 635 self.advisoryparams = None
626 636 self._payloadstream = None
627 637 self._readheader()
628 638
629 639 def _fromheader(self, size):
630 640 """return the next <size> byte from the header"""
631 641 offset = self._headeroffset
632 642 data = self._headerdata[offset:(offset + size)]
633 643 self._headeroffset = offset + size
634 644 return data
635 645
636 646 def _unpackheader(self, format):
637 647 """read given format from header
638 648
639 649 This automatically compute the size of the format to read."""
640 650 data = self._fromheader(struct.calcsize(format))
641 651 return _unpack(format, data)
642 652
643 653 def _readheader(self):
644 654 """read the header and setup the object"""
645 655 typesize = self._unpackheader(_fparttypesize)[0]
646 656 self.type = self._fromheader(typesize)
647 657 self.ui.debug('part type: "%s"\n' % self.type)
648 658 self.id = self._unpackheader(_fpartid)[0]
649 659 self.ui.debug('part id: "%s"\n' % self.id)
650 660 ## reading parameters
651 661 # param count
652 662 mancount, advcount = self._unpackheader(_fpartparamcount)
653 663 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
654 664 # param size
655 665 fparamsizes = _makefpartparamsizes(mancount + advcount)
656 666 paramsizes = self._unpackheader(fparamsizes)
657 667 # make it a list of couple again
658 668 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
659 669 # split mandatory from advisory
660 670 mansizes = paramsizes[:mancount]
661 671 advsizes = paramsizes[mancount:]
662 672 # retrive param value
663 673 manparams = []
664 674 for key, value in mansizes:
665 675 manparams.append((self._fromheader(key), self._fromheader(value)))
666 676 advparams = []
667 677 for key, value in advsizes:
668 678 advparams.append((self._fromheader(key), self._fromheader(value)))
669 679 self.mandatoryparams = manparams
670 680 self.advisoryparams = advparams
671 681 ## part payload
672 682 def payloadchunks():
673 683 payloadsize = self._unpack(_fpayloadsize)[0]
674 684 self.ui.debug('payload chunk size: %i\n' % payloadsize)
675 685 while payloadsize:
676 686 yield self._readexact(payloadsize)
677 687 payloadsize = self._unpack(_fpayloadsize)[0]
678 688 self.ui.debug('payload chunk size: %i\n' % payloadsize)
679 689 self._payloadstream = util.chunkbuffer(payloadchunks())
680 690 # we read the data, tell it
681 691 self._initialized = True
682 692
683 693 def read(self, size=None):
684 694 """read payload data"""
685 695 if not self._initialized:
686 696 self._readheader()
687 697 if size is None:
688 698 data = self._payloadstream.read()
689 699 else:
690 700 data = self._payloadstream.read(size)
691 701 if size is None or len(data) < size:
692 702 self.consumed = True
693 703 return data
694 704
695 705
696 706 @parthandler('b2x:changegroup')
697 707 def handlechangegroup(op, inpart):
698 708 """apply a changegroup part on the repo
699 709
700 710 This is a very early implementation that will massive rework before being
701 711 inflicted to any end-user.
702 712 """
703 713 # Make sure we trigger a transaction creation
704 714 #
705 715 # The addchangegroup function will get a transaction object by itself, but
706 716 # we need to make sure we trigger the creation of a transaction object used
707 717 # for the whole processing scope.
708 718 op.gettransaction()
709 719 cg = changegroup.unbundle10(inpart, 'UN')
710 720 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
711 721 op.records.add('changegroup', {'return': ret})
712 722 if op.reply is not None:
713 723 # This is definitly not the final form of this
714 724 # return. But one need to start somewhere.
715 725 op.reply.newpart('b2x:reply:changegroup', (),
716 726 [('in-reply-to', str(inpart.id)),
717 727 ('return', '%i' % ret)])
718 728 assert not inpart.read()
719 729
720 730 @parthandler('b2x:reply:changegroup')
721 731 def handlechangegroup(op, inpart):
722 732 p = dict(inpart.advisoryparams)
723 733 ret = int(p['return'])
724 734 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
725 735
726 736 @parthandler('b2x:check:heads')
727 737 def handlechangegroup(op, inpart):
728 738 """check that head of the repo did not change
729 739
730 740 This is used to detect a push race when using unbundle.
731 741 This replaces the "heads" argument of unbundle."""
732 742 h = inpart.read(20)
733 743 heads = []
734 744 while len(h) == 20:
735 745 heads.append(h)
736 746 h = inpart.read(20)
737 747 assert not h
738 748 if heads != op.repo.heads():
739 749 raise error.PushRaced('repository changed while pushing - '
740 750 'please try again')
741 751
742 752 @parthandler('b2x:output')
743 753 def handleoutput(op, inpart):
744 754 """forward output captured on the server to the client"""
745 755 for line in inpart.read().splitlines():
746 756 op.ui.write(('remote: %s\n' % line))
747 757
748 758 @parthandler('b2x:replycaps')
749 759 def handlereplycaps(op, inpart):
750 760 """Notify that a reply bundle should be created
751 761
752 762 The payload contains the capabilities information for the reply"""
753 763 caps = decodecaps(inpart.read())
754 764 if op.reply is None:
755 765 op.reply = bundle20(op.ui, caps)
756 766
757 767 @parthandler('b2x:error:abort')
758 768 def handlereplycaps(op, inpart):
759 769 """Used to transmit abort error over the wire"""
760 770 manargs = dict(inpart.mandatoryparams)
761 771 advargs = dict(inpart.advisoryparams)
762 772 raise util.Abort(manargs['message'], hint=advargs.get('hint'))
763 773
764 774 @parthandler('b2x:error:unknownpart')
765 775 def handlereplycaps(op, inpart):
766 776 """Used to transmit unknown part error over the wire"""
767 777 manargs = dict(inpart.mandatoryparams)
768 778 raise UnknownPartError(manargs['parttype'])
769 779
770 780 @parthandler('b2x:error:pushraced')
771 781 def handlereplycaps(op, inpart):
772 782 """Used to transmit push race error over the wire"""
773 783 manargs = dict(inpart.mandatoryparams)
774 784 raise error.ResponseError(_('push failed:'), manargs['message'])
General Comments 0
You need to be logged in to leave comments. Login now