##// END OF EJS Templates
bundle2: update part creators to ``addparam`` when relevant...
Pierre-Yves David -
r21606:e5588844 default
parent child Browse files
Show More
@@ -1,831 +1,829 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: (16 bits integer)
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: (16 bits inter)
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 :payload:
117 117
118 118 payload is a series of `<chunksize><chunkdata>`.
119 119
120 120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
121 121 `chunksize` says)` The payload part is concluded by a zero size chunk.
122 122
123 123 The current implementation always produces either zero or one chunk.
124 124 This is an implementation limitation that will ultimately be lifted.
125 125
126 126 Bundle processing
127 127 ============================
128 128
129 129 Each part is processed in order using a "part handler". Handler are registered
130 130 for a certain part type.
131 131
132 132 The matching of a part to its handler is case insensitive. The case of the
133 133 part type is used to know if a part is mandatory or advisory. If the Part type
134 134 contains any uppercase char it is considered mandatory. When no handler is
135 135 known for a Mandatory part, the process is aborted and an exception is raised.
136 136 If the part is advisory and no handler is known, the part is ignored. When the
137 137 process is aborted, the full bundle is still read from the stream to keep the
138 138 channel usable. But none of the part read from an abort are processed. In the
139 139 future, dropping the stream may become an option for channel we do not care to
140 140 preserve.
141 141 """
142 142
143 143 import util
144 144 import struct
145 145 import urllib
146 146 import string
147 147
148 148 import changegroup, error
149 149 from i18n import _
150 150
151 151 _pack = struct.pack
152 152 _unpack = struct.unpack
153 153
154 154 _magicstring = 'HG2X'
155 155
156 156 _fstreamparamsize = '>H'
157 157 _fpartheadersize = '>H'
158 158 _fparttypesize = '>B'
159 159 _fpartid = '>I'
160 160 _fpayloadsize = '>I'
161 161 _fpartparamcount = '>BB'
162 162
163 163 preferedchunksize = 4096
164 164
165 165 def _makefpartparamsizes(nbparams):
166 166 """return a struct format to read part parameter sizes
167 167
168 168 The number parameters is variable so we need to build that format
169 169 dynamically.
170 170 """
171 171 return '>'+('BB'*nbparams)
172 172
173 173 class UnknownPartError(KeyError):
174 174 """error raised when no handler is found for a Mandatory part"""
175 175 pass
176 176
177 177 class ReadOnlyPartError(RuntimeError):
178 178 """error raised when code tries to alter a part being generated"""
179 179 pass
180 180
181 181 parthandlermapping = {}
182 182
183 183 def parthandler(parttype):
184 184 """decorator that register a function as a bundle2 part handler
185 185
186 186 eg::
187 187
188 188 @parthandler('myparttype')
189 189 def myparttypehandler(...):
190 190 '''process a part of type "my part".'''
191 191 ...
192 192 """
193 193 def _decorator(func):
194 194 lparttype = parttype.lower() # enforce lower case matching.
195 195 assert lparttype not in parthandlermapping
196 196 parthandlermapping[lparttype] = func
197 197 return func
198 198 return _decorator
199 199
200 200 class unbundlerecords(object):
201 201 """keep record of what happens during and unbundle
202 202
203 203 New records are added using `records.add('cat', obj)`. Where 'cat' is a
204 204 category of record and obj is an arbitrary object.
205 205
206 206 `records['cat']` will return all entries of this category 'cat'.
207 207
208 208 Iterating on the object itself will yield `('category', obj)` tuples
209 209 for all entries.
210 210
211 211 All iterations happens in chronological order.
212 212 """
213 213
214 214 def __init__(self):
215 215 self._categories = {}
216 216 self._sequences = []
217 217 self._replies = {}
218 218
219 219 def add(self, category, entry, inreplyto=None):
220 220 """add a new record of a given category.
221 221
222 222 The entry can then be retrieved in the list returned by
223 223 self['category']."""
224 224 self._categories.setdefault(category, []).append(entry)
225 225 self._sequences.append((category, entry))
226 226 if inreplyto is not None:
227 227 self.getreplies(inreplyto).add(category, entry)
228 228
229 229 def getreplies(self, partid):
230 230 """get the subrecords that replies to a specific part"""
231 231 return self._replies.setdefault(partid, unbundlerecords())
232 232
233 233 def __getitem__(self, cat):
234 234 return tuple(self._categories.get(cat, ()))
235 235
236 236 def __iter__(self):
237 237 return iter(self._sequences)
238 238
239 239 def __len__(self):
240 240 return len(self._sequences)
241 241
242 242 def __nonzero__(self):
243 243 return bool(self._sequences)
244 244
245 245 class bundleoperation(object):
246 246 """an object that represents a single bundling process
247 247
248 248 Its purpose is to carry unbundle-related objects and states.
249 249
250 250 A new object should be created at the beginning of each bundle processing.
251 251 The object is to be returned by the processing function.
252 252
253 253 The object has very little content now it will ultimately contain:
254 254 * an access to the repo the bundle is applied to,
255 255 * a ui object,
256 256 * a way to retrieve a transaction to add changes to the repo,
257 257 * a way to record the result of processing each part,
258 258 * a way to construct a bundle response when applicable.
259 259 """
260 260
261 261 def __init__(self, repo, transactiongetter):
262 262 self.repo = repo
263 263 self.ui = repo.ui
264 264 self.records = unbundlerecords()
265 265 self.gettransaction = transactiongetter
266 266 self.reply = None
267 267
268 268 class TransactionUnavailable(RuntimeError):
269 269 pass
270 270
271 271 def _notransaction():
272 272 """default method to get a transaction while processing a bundle
273 273
274 274 Raise an exception to highlight the fact that no transaction was expected
275 275 to be created"""
276 276 raise TransactionUnavailable()
277 277
278 278 def processbundle(repo, unbundler, transactiongetter=_notransaction):
279 279 """This function process a bundle, apply effect to/from a repo
280 280
281 281 It iterates over each part then searches for and uses the proper handling
282 282 code to process the part. Parts are processed in order.
283 283
284 284 This is very early version of this function that will be strongly reworked
285 285 before final usage.
286 286
287 287 Unknown Mandatory part will abort the process.
288 288 """
289 289 op = bundleoperation(repo, transactiongetter)
290 290 # todo:
291 291 # - replace this is a init function soon.
292 292 # - exception catching
293 293 unbundler.params
294 294 iterparts = unbundler.iterparts()
295 295 part = None
296 296 try:
297 297 for part in iterparts:
298 298 parttype = part.type
299 299 # part key are matched lower case
300 300 key = parttype.lower()
301 301 try:
302 302 handler = parthandlermapping[key]
303 303 op.ui.debug('found a handler for part %r\n' % parttype)
304 304 except KeyError:
305 305 if key != parttype: # mandatory parts
306 306 # todo:
307 307 # - use a more precise exception
308 308 raise UnknownPartError(key)
309 309 op.ui.debug('ignoring unknown advisory part %r\n' % key)
310 310 # consuming the part
311 311 part.read()
312 312 continue
313 313
314 314 # handler is called outside the above try block so that we don't
315 315 # risk catching KeyErrors from anything other than the
316 316 # parthandlermapping lookup (any KeyError raised by handler()
317 317 # itself represents a defect of a different variety).
318 318 output = None
319 319 if op.reply is not None:
320 320 op.ui.pushbuffer(error=True)
321 321 output = ''
322 322 try:
323 323 handler(op, part)
324 324 finally:
325 325 if output is not None:
326 326 output = op.ui.popbuffer()
327 327 if output:
328 op.reply.newpart('b2x:output',
329 advisoryparams=[('in-reply-to',
330 str(part.id))],
331 data=output)
328 outpart = op.reply.newpart('b2x:output', data=output)
329 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
332 330 part.read()
333 331 except Exception, exc:
334 332 if part is not None:
335 333 # consume the bundle content
336 334 part.read()
337 335 for part in iterparts:
338 336 # consume the bundle content
339 337 part.read()
340 338 # Small hack to let caller code distinguish exceptions from bundle2
341 339 # processing fron the ones from bundle1 processing. This is mostly
342 340 # needed to handle different return codes to unbundle according to the
343 341 # type of bundle. We should probably clean up or drop this return code
344 342 # craziness in a future version.
345 343 exc.duringunbundle2 = True
346 344 raise
347 345 return op
348 346
349 347 def decodecaps(blob):
350 348 """decode a bundle2 caps bytes blob into a dictionnary
351 349
352 350 The blob is a list of capabilities (one per line)
353 351 Capabilities may have values using a line of the form::
354 352
355 353 capability=value1,value2,value3
356 354
357 355 The values are always a list."""
358 356 caps = {}
359 357 for line in blob.splitlines():
360 358 if not line:
361 359 continue
362 360 if '=' not in line:
363 361 key, vals = line, ()
364 362 else:
365 363 key, vals = line.split('=', 1)
366 364 vals = vals.split(',')
367 365 key = urllib.unquote(key)
368 366 vals = [urllib.unquote(v) for v in vals]
369 367 caps[key] = vals
370 368 return caps
371 369
372 370 def encodecaps(caps):
373 371 """encode a bundle2 caps dictionary into a bytes blob"""
374 372 chunks = []
375 373 for ca in sorted(caps):
376 374 vals = caps[ca]
377 375 ca = urllib.quote(ca)
378 376 vals = [urllib.quote(v) for v in vals]
379 377 if vals:
380 378 ca = "%s=%s" % (ca, ','.join(vals))
381 379 chunks.append(ca)
382 380 return '\n'.join(chunks)
383 381
384 382 class bundle20(object):
385 383 """represent an outgoing bundle2 container
386 384
387 385 Use the `addparam` method to add stream level parameter. and `newpart` to
388 386 populate it. Then call `getchunks` to retrieve all the binary chunks of
389 387 data that compose the bundle2 container."""
390 388
391 389 def __init__(self, ui, capabilities=()):
392 390 self.ui = ui
393 391 self._params = []
394 392 self._parts = []
395 393 self.capabilities = dict(capabilities)
396 394
397 395 # methods used to defines the bundle2 content
398 396 def addparam(self, name, value=None):
399 397 """add a stream level parameter"""
400 398 if not name:
401 399 raise ValueError('empty parameter name')
402 400 if name[0] not in string.letters:
403 401 raise ValueError('non letter first character: %r' % name)
404 402 self._params.append((name, value))
405 403
406 404 def addpart(self, part):
407 405 """add a new part to the bundle2 container
408 406
409 407 Parts contains the actual applicative payload."""
410 408 assert part.id is None
411 409 part.id = len(self._parts) # very cheap counter
412 410 self._parts.append(part)
413 411
414 412 def newpart(self, typeid, *args, **kwargs):
415 413 """create a new part and add it to the containers
416 414
417 415 As the part is directly added to the containers. For now, this means
418 416 that any failure to properly initialize the part after calling
419 417 ``newpart`` should result in a failure of the whole bundling process.
420 418
421 419 You can still fall back to manually create and add if you need better
422 420 control."""
423 421 part = bundlepart(typeid, *args, **kwargs)
424 422 self.addpart(part)
425 423 return part
426 424
427 425 # methods used to generate the bundle2 stream
428 426 def getchunks(self):
429 427 self.ui.debug('start emission of %s stream\n' % _magicstring)
430 428 yield _magicstring
431 429 param = self._paramchunk()
432 430 self.ui.debug('bundle parameter: %s\n' % param)
433 431 yield _pack(_fstreamparamsize, len(param))
434 432 if param:
435 433 yield param
436 434
437 435 self.ui.debug('start of parts\n')
438 436 for part in self._parts:
439 437 self.ui.debug('bundle part: "%s"\n' % part.type)
440 438 for chunk in part.getchunks():
441 439 yield chunk
442 440 self.ui.debug('end of bundle\n')
443 441 yield '\0\0'
444 442
445 443 def _paramchunk(self):
446 444 """return a encoded version of all stream parameters"""
447 445 blocks = []
448 446 for par, value in self._params:
449 447 par = urllib.quote(par)
450 448 if value is not None:
451 449 value = urllib.quote(value)
452 450 par = '%s=%s' % (par, value)
453 451 blocks.append(par)
454 452 return ' '.join(blocks)
455 453
456 454 class unpackermixin(object):
457 455 """A mixin to extract bytes and struct data from a stream"""
458 456
459 457 def __init__(self, fp):
460 458 self._fp = fp
461 459
462 460 def _unpack(self, format):
463 461 """unpack this struct format from the stream"""
464 462 data = self._readexact(struct.calcsize(format))
465 463 return _unpack(format, data)
466 464
467 465 def _readexact(self, size):
468 466 """read exactly <size> bytes from the stream"""
469 467 return changegroup.readexactly(self._fp, size)
470 468
471 469
472 470 class unbundle20(unpackermixin):
473 471 """interpret a bundle2 stream
474 472
475 473 This class is fed with a binary stream and yields parts through its
476 474 `iterparts` methods."""
477 475
478 476 def __init__(self, ui, fp, header=None):
479 477 """If header is specified, we do not read it out of the stream."""
480 478 self.ui = ui
481 479 super(unbundle20, self).__init__(fp)
482 480 if header is None:
483 481 header = self._readexact(4)
484 482 magic, version = header[0:2], header[2:4]
485 483 if magic != 'HG':
486 484 raise util.Abort(_('not a Mercurial bundle'))
487 485 if version != '2X':
488 486 raise util.Abort(_('unknown bundle version %s') % version)
489 487 self.ui.debug('start processing of %s stream\n' % header)
490 488
491 489 @util.propertycache
492 490 def params(self):
493 491 """dictionary of stream level parameters"""
494 492 self.ui.debug('reading bundle2 stream parameters\n')
495 493 params = {}
496 494 paramssize = self._unpack(_fstreamparamsize)[0]
497 495 if paramssize:
498 496 for p in self._readexact(paramssize).split(' '):
499 497 p = p.split('=', 1)
500 498 p = [urllib.unquote(i) for i in p]
501 499 if len(p) < 2:
502 500 p.append(None)
503 501 self._processparam(*p)
504 502 params[p[0]] = p[1]
505 503 return params
506 504
507 505 def _processparam(self, name, value):
508 506 """process a parameter, applying its effect if needed
509 507
510 508 Parameter starting with a lower case letter are advisory and will be
511 509 ignored when unknown. Those starting with an upper case letter are
512 510 mandatory and will this function will raise a KeyError when unknown.
513 511
514 512 Note: no option are currently supported. Any input will be either
515 513 ignored or failing.
516 514 """
517 515 if not name:
518 516 raise ValueError('empty parameter name')
519 517 if name[0] not in string.letters:
520 518 raise ValueError('non letter first character: %r' % name)
521 519 # Some logic will be later added here to try to process the option for
522 520 # a dict of known parameter.
523 521 if name[0].islower():
524 522 self.ui.debug("ignoring unknown parameter %r\n" % name)
525 523 else:
526 524 raise KeyError(name)
527 525
528 526
529 527 def iterparts(self):
530 528 """yield all parts contained in the stream"""
531 529 # make sure param have been loaded
532 530 self.params
533 531 self.ui.debug('start extraction of bundle2 parts\n')
534 532 headerblock = self._readpartheader()
535 533 while headerblock is not None:
536 534 part = unbundlepart(self.ui, headerblock, self._fp)
537 535 yield part
538 536 headerblock = self._readpartheader()
539 537 self.ui.debug('end of bundle2 stream\n')
540 538
541 539 def _readpartheader(self):
542 540 """reads a part header size and return the bytes blob
543 541
544 542 returns None if empty"""
545 543 headersize = self._unpack(_fpartheadersize)[0]
546 544 self.ui.debug('part header size: %i\n' % headersize)
547 545 if headersize:
548 546 return self._readexact(headersize)
549 547 return None
550 548
551 549
552 550 class bundlepart(object):
553 551 """A bundle2 part contains application level payload
554 552
555 553 The part `type` is used to route the part to the application level
556 554 handler.
557 555
558 556 The part payload is contained in ``part.data``. It could be raw bytes or a
559 557 generator of byte chunks.
560 558
561 559 You can add parameters to the part using the ``addparam`` method.
562 560 Parameters can be either mandatory (default) or advisory. Remote side
563 561 should be able to safely ignore the advisory ones.
564 562
565 563 Both data and parameters cannot be modified after the generation has begun.
566 564 """
567 565
568 566 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
569 567 data=''):
570 568 self.id = None
571 569 self.type = parttype
572 570 self._data = data
573 571 self._mandatoryparams = list(mandatoryparams)
574 572 self._advisoryparams = list(advisoryparams)
575 573 # status of the part's generation:
576 574 # - None: not started,
577 575 # - False: currently generated,
578 576 # - True: generation done.
579 577 self._generated = None
580 578
581 579 # methods used to defines the part content
582 580 def __setdata(self, data):
583 581 if self._generated is not None:
584 582 raise ReadOnlyPartError('part is being generated')
585 583 self._data = data
586 584 def __getdata(self):
587 585 return self._data
588 586 data = property(__getdata, __setdata)
589 587
590 588 @property
591 589 def mandatoryparams(self):
592 590 # make it an immutable tuple to force people through ``addparam``
593 591 return tuple(self._mandatoryparams)
594 592
595 593 @property
596 594 def advisoryparams(self):
597 595 # make it an immutable tuple to force people through ``addparam``
598 596 return tuple(self._advisoryparams)
599 597
600 598 def addparam(self, name, value='', mandatory=True):
601 599 if self._generated is not None:
602 600 raise ReadOnlyPartError('part is being generated')
603 601 params = self._advisoryparams
604 602 if mandatory:
605 603 params = self._mandatoryparams
606 604 params.append((name, value))
607 605
608 606 # methods used to generates the bundle2 stream
609 607 def getchunks(self):
610 608 if self._generated is not None:
611 609 raise RuntimeError('part can only be consumed once')
612 610 self._generated = False
613 611 #### header
614 612 ## parttype
615 613 header = [_pack(_fparttypesize, len(self.type)),
616 614 self.type, _pack(_fpartid, self.id),
617 615 ]
618 616 ## parameters
619 617 # count
620 618 manpar = self.mandatoryparams
621 619 advpar = self.advisoryparams
622 620 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
623 621 # size
624 622 parsizes = []
625 623 for key, value in manpar:
626 624 parsizes.append(len(key))
627 625 parsizes.append(len(value))
628 626 for key, value in advpar:
629 627 parsizes.append(len(key))
630 628 parsizes.append(len(value))
631 629 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
632 630 header.append(paramsizes)
633 631 # key, value
634 632 for key, value in manpar:
635 633 header.append(key)
636 634 header.append(value)
637 635 for key, value in advpar:
638 636 header.append(key)
639 637 header.append(value)
640 638 ## finalize header
641 639 headerchunk = ''.join(header)
642 640 yield _pack(_fpartheadersize, len(headerchunk))
643 641 yield headerchunk
644 642 ## payload
645 643 for chunk in self._payloadchunks():
646 644 yield _pack(_fpayloadsize, len(chunk))
647 645 yield chunk
648 646 # end of payload
649 647 yield _pack(_fpayloadsize, 0)
650 648 self._generated = True
651 649
652 650 def _payloadchunks(self):
653 651 """yield chunks of a the part payload
654 652
655 653 Exists to handle the different methods to provide data to a part."""
656 654 # we only support fixed size data now.
657 655 # This will be improved in the future.
658 656 if util.safehasattr(self.data, 'next'):
659 657 buff = util.chunkbuffer(self.data)
660 658 chunk = buff.read(preferedchunksize)
661 659 while chunk:
662 660 yield chunk
663 661 chunk = buff.read(preferedchunksize)
664 662 elif len(self.data):
665 663 yield self.data
666 664
667 665 class unbundlepart(unpackermixin):
668 666 """a bundle part read from a bundle"""
669 667
670 668 def __init__(self, ui, header, fp):
671 669 super(unbundlepart, self).__init__(fp)
672 670 self.ui = ui
673 671 # unbundle state attr
674 672 self._headerdata = header
675 673 self._headeroffset = 0
676 674 self._initialized = False
677 675 self.consumed = False
678 676 # part data
679 677 self.id = None
680 678 self.type = None
681 679 self.mandatoryparams = None
682 680 self.advisoryparams = None
683 681 self._payloadstream = None
684 682 self._readheader()
685 683
686 684 def _fromheader(self, size):
687 685 """return the next <size> byte from the header"""
688 686 offset = self._headeroffset
689 687 data = self._headerdata[offset:(offset + size)]
690 688 self._headeroffset = offset + size
691 689 return data
692 690
693 691 def _unpackheader(self, format):
694 692 """read given format from header
695 693
696 694 This automatically compute the size of the format to read."""
697 695 data = self._fromheader(struct.calcsize(format))
698 696 return _unpack(format, data)
699 697
700 698 def _readheader(self):
701 699 """read the header and setup the object"""
702 700 typesize = self._unpackheader(_fparttypesize)[0]
703 701 self.type = self._fromheader(typesize)
704 702 self.ui.debug('part type: "%s"\n' % self.type)
705 703 self.id = self._unpackheader(_fpartid)[0]
706 704 self.ui.debug('part id: "%s"\n' % self.id)
707 705 ## reading parameters
708 706 # param count
709 707 mancount, advcount = self._unpackheader(_fpartparamcount)
710 708 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
711 709 # param size
712 710 fparamsizes = _makefpartparamsizes(mancount + advcount)
713 711 paramsizes = self._unpackheader(fparamsizes)
714 712 # make it a list of couple again
715 713 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
716 714 # split mandatory from advisory
717 715 mansizes = paramsizes[:mancount]
718 716 advsizes = paramsizes[mancount:]
719 717 # retrive param value
720 718 manparams = []
721 719 for key, value in mansizes:
722 720 manparams.append((self._fromheader(key), self._fromheader(value)))
723 721 advparams = []
724 722 for key, value in advsizes:
725 723 advparams.append((self._fromheader(key), self._fromheader(value)))
726 724 self.mandatoryparams = manparams
727 725 self.advisoryparams = advparams
728 726 ## part payload
729 727 def payloadchunks():
730 728 payloadsize = self._unpack(_fpayloadsize)[0]
731 729 self.ui.debug('payload chunk size: %i\n' % payloadsize)
732 730 while payloadsize:
733 731 yield self._readexact(payloadsize)
734 732 payloadsize = self._unpack(_fpayloadsize)[0]
735 733 self.ui.debug('payload chunk size: %i\n' % payloadsize)
736 734 self._payloadstream = util.chunkbuffer(payloadchunks())
737 735 # we read the data, tell it
738 736 self._initialized = True
739 737
740 738 def read(self, size=None):
741 739 """read payload data"""
742 740 if not self._initialized:
743 741 self._readheader()
744 742 if size is None:
745 743 data = self._payloadstream.read()
746 744 else:
747 745 data = self._payloadstream.read(size)
748 746 if size is None or len(data) < size:
749 747 self.consumed = True
750 748 return data
751 749
752 750
753 751 @parthandler('b2x:changegroup')
754 752 def handlechangegroup(op, inpart):
755 753 """apply a changegroup part on the repo
756 754
757 755 This is a very early implementation that will massive rework before being
758 756 inflicted to any end-user.
759 757 """
760 758 # Make sure we trigger a transaction creation
761 759 #
762 760 # The addchangegroup function will get a transaction object by itself, but
763 761 # we need to make sure we trigger the creation of a transaction object used
764 762 # for the whole processing scope.
765 763 op.gettransaction()
766 764 cg = changegroup.unbundle10(inpart, 'UN')
767 765 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
768 766 op.records.add('changegroup', {'return': ret})
769 767 if op.reply is not None:
770 768 # This is definitly not the final form of this
771 769 # return. But one need to start somewhere.
772 op.reply.newpart('b2x:reply:changegroup', (),
773 [('in-reply-to', str(inpart.id)),
774 ('return', '%i' % ret)])
770 part = op.reply.newpart('b2x:reply:changegroup')
771 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
772 part.addparam('return', '%i' % ret, mandatory=False)
775 773 assert not inpart.read()
776 774
777 775 @parthandler('b2x:reply:changegroup')
778 776 def handlechangegroup(op, inpart):
779 777 p = dict(inpart.advisoryparams)
780 778 ret = int(p['return'])
781 779 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
782 780
783 781 @parthandler('b2x:check:heads')
784 782 def handlechangegroup(op, inpart):
785 783 """check that head of the repo did not change
786 784
787 785 This is used to detect a push race when using unbundle.
788 786 This replaces the "heads" argument of unbundle."""
789 787 h = inpart.read(20)
790 788 heads = []
791 789 while len(h) == 20:
792 790 heads.append(h)
793 791 h = inpart.read(20)
794 792 assert not h
795 793 if heads != op.repo.heads():
796 794 raise error.PushRaced('repository changed while pushing - '
797 795 'please try again')
798 796
799 797 @parthandler('b2x:output')
800 798 def handleoutput(op, inpart):
801 799 """forward output captured on the server to the client"""
802 800 for line in inpart.read().splitlines():
803 801 op.ui.write(('remote: %s\n' % line))
804 802
805 803 @parthandler('b2x:replycaps')
806 804 def handlereplycaps(op, inpart):
807 805 """Notify that a reply bundle should be created
808 806
809 807 The payload contains the capabilities information for the reply"""
810 808 caps = decodecaps(inpart.read())
811 809 if op.reply is None:
812 810 op.reply = bundle20(op.ui, caps)
813 811
814 812 @parthandler('b2x:error:abort')
815 813 def handlereplycaps(op, inpart):
816 814 """Used to transmit abort error over the wire"""
817 815 manargs = dict(inpart.mandatoryparams)
818 816 advargs = dict(inpart.advisoryparams)
819 817 raise util.Abort(manargs['message'], hint=advargs.get('hint'))
820 818
821 819 @parthandler('b2x:error:unknownpart')
822 820 def handlereplycaps(op, inpart):
823 821 """Used to transmit unknown part error over the wire"""
824 822 manargs = dict(inpart.mandatoryparams)
825 823 raise UnknownPartError(manargs['parttype'])
826 824
827 825 @parthandler('b2x:error:pushraced')
828 826 def handlereplycaps(op, inpart):
829 827 """Used to transmit push race error over the wire"""
830 828 manargs = dict(inpart.mandatoryparams)
831 829 raise error.ResponseError(_('push failed:'), manargs['message'])
General Comments 0
You need to be logged in to leave comments. Login now