##// END OF EJS Templates
bundle2: have ``newpart`` automatically add the part to the bundle...
Pierre-Yves David -
r21599:57cd844d default
parent child Browse files
Show More
@@ -1,775 +1,775
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: (16 bits integer)
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: (16 bits inter)
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 :payload:
117 117
118 118 payload is a series of `<chunksize><chunkdata>`.
119 119
120 120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
121 121 `chunksize` says)` The payload part is concluded by a zero size chunk.
122 122
123 123 The current implementation always produces either zero or one chunk.
124 124 This is an implementation limitation that will ultimately be lifted.
125 125
126 126 Bundle processing
127 127 ============================
128 128
129 129 Each part is processed in order using a "part handler". Handler are registered
130 130 for a certain part type.
131 131
132 132 The matching of a part to its handler is case insensitive. The case of the
133 133 part type is used to know if a part is mandatory or advisory. If the Part type
134 134 contains any uppercase char it is considered mandatory. When no handler is
135 135 known for a Mandatory part, the process is aborted and an exception is raised.
136 136 If the part is advisory and no handler is known, the part is ignored. When the
137 137 process is aborted, the full bundle is still read from the stream to keep the
138 138 channel usable. But none of the part read from an abort are processed. In the
139 139 future, dropping the stream may become an option for channel we do not care to
140 140 preserve.
141 141 """
142 142
143 143 import util
144 144 import struct
145 145 import urllib
146 146 import string
147 147
148 148 import changegroup, error
149 149 from i18n import _
150 150
151 151 _pack = struct.pack
152 152 _unpack = struct.unpack
153 153
154 154 _magicstring = 'HG2X'
155 155
156 156 _fstreamparamsize = '>H'
157 157 _fpartheadersize = '>H'
158 158 _fparttypesize = '>B'
159 159 _fpartid = '>I'
160 160 _fpayloadsize = '>I'
161 161 _fpartparamcount = '>BB'
162 162
163 163 preferedchunksize = 4096
164 164
165 165 def _makefpartparamsizes(nbparams):
166 166 """return a struct format to read part parameter sizes
167 167
168 168 The number parameters is variable so we need to build that format
169 169 dynamically.
170 170 """
171 171 return '>'+('BB'*nbparams)
172 172
173 173 class UnknownPartError(KeyError):
174 174 """error raised when no handler is found for a Mandatory part"""
175 175 pass
176 176
177 177 parthandlermapping = {}
178 178
179 179 def parthandler(parttype):
180 180 """decorator that register a function as a bundle2 part handler
181 181
182 182 eg::
183 183
184 184 @parthandler('myparttype')
185 185 def myparttypehandler(...):
186 186 '''process a part of type "my part".'''
187 187 ...
188 188 """
189 189 def _decorator(func):
190 190 lparttype = parttype.lower() # enforce lower case matching.
191 191 assert lparttype not in parthandlermapping
192 192 parthandlermapping[lparttype] = func
193 193 return func
194 194 return _decorator
195 195
196 196 class unbundlerecords(object):
197 197 """keep record of what happens during and unbundle
198 198
199 199 New records are added using `records.add('cat', obj)`. Where 'cat' is a
200 200 category of record and obj is an arbitrary object.
201 201
202 202 `records['cat']` will return all entries of this category 'cat'.
203 203
204 204 Iterating on the object itself will yield `('category', obj)` tuples
205 205 for all entries.
206 206
207 207 All iterations happens in chronological order.
208 208 """
209 209
210 210 def __init__(self):
211 211 self._categories = {}
212 212 self._sequences = []
213 213 self._replies = {}
214 214
215 215 def add(self, category, entry, inreplyto=None):
216 216 """add a new record of a given category.
217 217
218 218 The entry can then be retrieved in the list returned by
219 219 self['category']."""
220 220 self._categories.setdefault(category, []).append(entry)
221 221 self._sequences.append((category, entry))
222 222 if inreplyto is not None:
223 223 self.getreplies(inreplyto).add(category, entry)
224 224
225 225 def getreplies(self, partid):
226 226 """get the subrecords that replies to a specific part"""
227 227 return self._replies.setdefault(partid, unbundlerecords())
228 228
229 229 def __getitem__(self, cat):
230 230 return tuple(self._categories.get(cat, ()))
231 231
232 232 def __iter__(self):
233 233 return iter(self._sequences)
234 234
235 235 def __len__(self):
236 236 return len(self._sequences)
237 237
238 238 def __nonzero__(self):
239 239 return bool(self._sequences)
240 240
241 241 class bundleoperation(object):
242 242 """an object that represents a single bundling process
243 243
244 244 Its purpose is to carry unbundle-related objects and states.
245 245
246 246 A new object should be created at the beginning of each bundle processing.
247 247 The object is to be returned by the processing function.
248 248
249 249 The object has very little content now it will ultimately contain:
250 250 * an access to the repo the bundle is applied to,
251 251 * a ui object,
252 252 * a way to retrieve a transaction to add changes to the repo,
253 253 * a way to record the result of processing each part,
254 254 * a way to construct a bundle response when applicable.
255 255 """
256 256
257 257 def __init__(self, repo, transactiongetter):
258 258 self.repo = repo
259 259 self.ui = repo.ui
260 260 self.records = unbundlerecords()
261 261 self.gettransaction = transactiongetter
262 262 self.reply = None
263 263
264 264 class TransactionUnavailable(RuntimeError):
265 265 pass
266 266
267 267 def _notransaction():
268 268 """default method to get a transaction while processing a bundle
269 269
270 270 Raise an exception to highlight the fact that no transaction was expected
271 271 to be created"""
272 272 raise TransactionUnavailable()
273 273
274 274 def processbundle(repo, unbundler, transactiongetter=_notransaction):
275 275 """This function process a bundle, apply effect to/from a repo
276 276
277 277 It iterates over each part then searches for and uses the proper handling
278 278 code to process the part. Parts are processed in order.
279 279
280 280 This is very early version of this function that will be strongly reworked
281 281 before final usage.
282 282
283 283 Unknown Mandatory part will abort the process.
284 284 """
285 285 op = bundleoperation(repo, transactiongetter)
286 286 # todo:
287 287 # - replace this is a init function soon.
288 288 # - exception catching
289 289 unbundler.params
290 290 iterparts = unbundler.iterparts()
291 291 part = None
292 292 try:
293 293 for part in iterparts:
294 294 parttype = part.type
295 295 # part key are matched lower case
296 296 key = parttype.lower()
297 297 try:
298 298 handler = parthandlermapping[key]
299 299 op.ui.debug('found a handler for part %r\n' % parttype)
300 300 except KeyError:
301 301 if key != parttype: # mandatory parts
302 302 # todo:
303 303 # - use a more precise exception
304 304 raise UnknownPartError(key)
305 305 op.ui.debug('ignoring unknown advisory part %r\n' % key)
306 306 # consuming the part
307 307 part.read()
308 308 continue
309 309
310 310 # handler is called outside the above try block so that we don't
311 311 # risk catching KeyErrors from anything other than the
312 312 # parthandlermapping lookup (any KeyError raised by handler()
313 313 # itself represents a defect of a different variety).
314 314 output = None
315 315 if op.reply is not None:
316 316 op.ui.pushbuffer(error=True)
317 317 output = ''
318 318 try:
319 319 handler(op, part)
320 320 finally:
321 321 if output is not None:
322 322 output = op.ui.popbuffer()
323 323 if output:
324 324 outpart = bundlepart('b2x:output',
325 325 advisoryparams=[('in-reply-to',
326 326 str(part.id))],
327 327 data=output)
328 328 op.reply.addpart(outpart)
329 329 part.read()
330 330 except Exception, exc:
331 331 if part is not None:
332 332 # consume the bundle content
333 333 part.read()
334 334 for part in iterparts:
335 335 # consume the bundle content
336 336 part.read()
337 337 # Small hack to let caller code distinguish exceptions from bundle2
338 338 # processing fron the ones from bundle1 processing. This is mostly
339 339 # needed to handle different return codes to unbundle according to the
340 340 # type of bundle. We should probably clean up or drop this return code
341 341 # craziness in a future version.
342 342 exc.duringunbundle2 = True
343 343 raise
344 344 return op
345 345
346 346 def decodecaps(blob):
347 347 """decode a bundle2 caps bytes blob into a dictionnary
348 348
349 349 The blob is a list of capabilities (one per line)
350 350 Capabilities may have values using a line of the form::
351 351
352 352 capability=value1,value2,value3
353 353
354 354 The values are always a list."""
355 355 caps = {}
356 356 for line in blob.splitlines():
357 357 if not line:
358 358 continue
359 359 if '=' not in line:
360 360 key, vals = line, ()
361 361 else:
362 362 key, vals = line.split('=', 1)
363 363 vals = vals.split(',')
364 364 key = urllib.unquote(key)
365 365 vals = [urllib.unquote(v) for v in vals]
366 366 caps[key] = vals
367 367 return caps
368 368
369 369 def encodecaps(caps):
370 370 """encode a bundle2 caps dictionary into a bytes blob"""
371 371 chunks = []
372 372 for ca in sorted(caps):
373 373 vals = caps[ca]
374 374 ca = urllib.quote(ca)
375 375 vals = [urllib.quote(v) for v in vals]
376 376 if vals:
377 377 ca = "%s=%s" % (ca, ','.join(vals))
378 378 chunks.append(ca)
379 379 return '\n'.join(chunks)
380 380
381 381 class bundle20(object):
382 382 """represent an outgoing bundle2 container
383 383
384 Use the `addparam` method to add stream level parameter. and `addpart` to
384 Use the `addparam` method to add stream level parameter. and `newpart` to
385 385 populate it. Then call `getchunks` to retrieve all the binary chunks of
386 386 data that compose the bundle2 container."""
387 387
388 388 def __init__(self, ui, capabilities=()):
389 389 self.ui = ui
390 390 self._params = []
391 391 self._parts = []
392 392 self.capabilities = dict(capabilities)
393 393
394 394 # methods used to defines the bundle2 content
395 395 def addparam(self, name, value=None):
396 396 """add a stream level parameter"""
397 397 if not name:
398 398 raise ValueError('empty parameter name')
399 399 if name[0] not in string.letters:
400 400 raise ValueError('non letter first character: %r' % name)
401 401 self._params.append((name, value))
402 402
403 403 def addpart(self, part):
404 404 """add a new part to the bundle2 container
405 405
406 406 Parts contains the actual applicative payload."""
407 407 assert part.id is None
408 408 part.id = len(self._parts) # very cheap counter
409 409 self._parts.append(part)
410 410
411 411 def newpart(self, typeid, *args, **kwargs):
412 412 """create a new part for the containers"""
413 413 part = bundlepart(typeid, *args, **kwargs)
414 self.addpart(part)
414 415 return part
415 416
416 417 # methods used to generate the bundle2 stream
417 418 def getchunks(self):
418 419 self.ui.debug('start emission of %s stream\n' % _magicstring)
419 420 yield _magicstring
420 421 param = self._paramchunk()
421 422 self.ui.debug('bundle parameter: %s\n' % param)
422 423 yield _pack(_fstreamparamsize, len(param))
423 424 if param:
424 425 yield param
425 426
426 427 self.ui.debug('start of parts\n')
427 428 for part in self._parts:
428 429 self.ui.debug('bundle part: "%s"\n' % part.type)
429 430 for chunk in part.getchunks():
430 431 yield chunk
431 432 self.ui.debug('end of bundle\n')
432 433 yield '\0\0'
433 434
434 435 def _paramchunk(self):
435 436 """return a encoded version of all stream parameters"""
436 437 blocks = []
437 438 for par, value in self._params:
438 439 par = urllib.quote(par)
439 440 if value is not None:
440 441 value = urllib.quote(value)
441 442 par = '%s=%s' % (par, value)
442 443 blocks.append(par)
443 444 return ' '.join(blocks)
444 445
445 446 class unpackermixin(object):
446 447 """A mixin to extract bytes and struct data from a stream"""
447 448
448 449 def __init__(self, fp):
449 450 self._fp = fp
450 451
451 452 def _unpack(self, format):
452 453 """unpack this struct format from the stream"""
453 454 data = self._readexact(struct.calcsize(format))
454 455 return _unpack(format, data)
455 456
456 457 def _readexact(self, size):
457 458 """read exactly <size> bytes from the stream"""
458 459 return changegroup.readexactly(self._fp, size)
459 460
460 461
461 462 class unbundle20(unpackermixin):
462 463 """interpret a bundle2 stream
463 464
464 465 This class is fed with a binary stream and yields parts through its
465 466 `iterparts` methods."""
466 467
467 468 def __init__(self, ui, fp, header=None):
468 469 """If header is specified, we do not read it out of the stream."""
469 470 self.ui = ui
470 471 super(unbundle20, self).__init__(fp)
471 472 if header is None:
472 473 header = self._readexact(4)
473 474 magic, version = header[0:2], header[2:4]
474 475 if magic != 'HG':
475 476 raise util.Abort(_('not a Mercurial bundle'))
476 477 if version != '2X':
477 478 raise util.Abort(_('unknown bundle version %s') % version)
478 479 self.ui.debug('start processing of %s stream\n' % header)
479 480
480 481 @util.propertycache
481 482 def params(self):
482 483 """dictionary of stream level parameters"""
483 484 self.ui.debug('reading bundle2 stream parameters\n')
484 485 params = {}
485 486 paramssize = self._unpack(_fstreamparamsize)[0]
486 487 if paramssize:
487 488 for p in self._readexact(paramssize).split(' '):
488 489 p = p.split('=', 1)
489 490 p = [urllib.unquote(i) for i in p]
490 491 if len(p) < 2:
491 492 p.append(None)
492 493 self._processparam(*p)
493 494 params[p[0]] = p[1]
494 495 return params
495 496
496 497 def _processparam(self, name, value):
497 498 """process a parameter, applying its effect if needed
498 499
499 500 Parameter starting with a lower case letter are advisory and will be
500 501 ignored when unknown. Those starting with an upper case letter are
501 502 mandatory and will this function will raise a KeyError when unknown.
502 503
503 504 Note: no option are currently supported. Any input will be either
504 505 ignored or failing.
505 506 """
506 507 if not name:
507 508 raise ValueError('empty parameter name')
508 509 if name[0] not in string.letters:
509 510 raise ValueError('non letter first character: %r' % name)
510 511 # Some logic will be later added here to try to process the option for
511 512 # a dict of known parameter.
512 513 if name[0].islower():
513 514 self.ui.debug("ignoring unknown parameter %r\n" % name)
514 515 else:
515 516 raise KeyError(name)
516 517
517 518
518 519 def iterparts(self):
519 520 """yield all parts contained in the stream"""
520 521 # make sure param have been loaded
521 522 self.params
522 523 self.ui.debug('start extraction of bundle2 parts\n')
523 524 headerblock = self._readpartheader()
524 525 while headerblock is not None:
525 526 part = unbundlepart(self.ui, headerblock, self._fp)
526 527 yield part
527 528 headerblock = self._readpartheader()
528 529 self.ui.debug('end of bundle2 stream\n')
529 530
530 531 def _readpartheader(self):
531 532 """reads a part header size and return the bytes blob
532 533
533 534 returns None if empty"""
534 535 headersize = self._unpack(_fpartheadersize)[0]
535 536 self.ui.debug('part header size: %i\n' % headersize)
536 537 if headersize:
537 538 return self._readexact(headersize)
538 539 return None
539 540
540 541
541 542 class bundlepart(object):
542 543 """A bundle2 part contains application level payload
543 544
544 545 The part `type` is used to route the part to the application level
545 546 handler.
546 547 """
547 548
548 549 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
549 550 data=''):
550 551 self.id = None
551 552 self.type = parttype
552 553 self.data = data
553 554 self.mandatoryparams = mandatoryparams
554 555 self.advisoryparams = advisoryparams
555 556
556 557 def getchunks(self):
557 558 #### header
558 559 ## parttype
559 560 header = [_pack(_fparttypesize, len(self.type)),
560 561 self.type, _pack(_fpartid, self.id),
561 562 ]
562 563 ## parameters
563 564 # count
564 565 manpar = self.mandatoryparams
565 566 advpar = self.advisoryparams
566 567 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
567 568 # size
568 569 parsizes = []
569 570 for key, value in manpar:
570 571 parsizes.append(len(key))
571 572 parsizes.append(len(value))
572 573 for key, value in advpar:
573 574 parsizes.append(len(key))
574 575 parsizes.append(len(value))
575 576 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
576 577 header.append(paramsizes)
577 578 # key, value
578 579 for key, value in manpar:
579 580 header.append(key)
580 581 header.append(value)
581 582 for key, value in advpar:
582 583 header.append(key)
583 584 header.append(value)
584 585 ## finalize header
585 586 headerchunk = ''.join(header)
586 587 yield _pack(_fpartheadersize, len(headerchunk))
587 588 yield headerchunk
588 589 ## payload
589 590 for chunk in self._payloadchunks():
590 591 yield _pack(_fpayloadsize, len(chunk))
591 592 yield chunk
592 593 # end of payload
593 594 yield _pack(_fpayloadsize, 0)
594 595
595 596 def _payloadchunks(self):
596 597 """yield chunks of a the part payload
597 598
598 599 Exists to handle the different methods to provide data to a part."""
599 600 # we only support fixed size data now.
600 601 # This will be improved in the future.
601 602 if util.safehasattr(self.data, 'next'):
602 603 buff = util.chunkbuffer(self.data)
603 604 chunk = buff.read(preferedchunksize)
604 605 while chunk:
605 606 yield chunk
606 607 chunk = buff.read(preferedchunksize)
607 608 elif len(self.data):
608 609 yield self.data
609 610
610 611 class unbundlepart(unpackermixin):
611 612 """a bundle part read from a bundle"""
612 613
613 614 def __init__(self, ui, header, fp):
614 615 super(unbundlepart, self).__init__(fp)
615 616 self.ui = ui
616 617 # unbundle state attr
617 618 self._headerdata = header
618 619 self._headeroffset = 0
619 620 self._initialized = False
620 621 self.consumed = False
621 622 # part data
622 623 self.id = None
623 624 self.type = None
624 625 self.mandatoryparams = None
625 626 self.advisoryparams = None
626 627 self._payloadstream = None
627 628 self._readheader()
628 629
629 630 def _fromheader(self, size):
630 631 """return the next <size> byte from the header"""
631 632 offset = self._headeroffset
632 633 data = self._headerdata[offset:(offset + size)]
633 634 self._headeroffset = offset + size
634 635 return data
635 636
636 637 def _unpackheader(self, format):
637 638 """read given format from header
638 639
639 640 This automatically compute the size of the format to read."""
640 641 data = self._fromheader(struct.calcsize(format))
641 642 return _unpack(format, data)
642 643
643 644 def _readheader(self):
644 645 """read the header and setup the object"""
645 646 typesize = self._unpackheader(_fparttypesize)[0]
646 647 self.type = self._fromheader(typesize)
647 648 self.ui.debug('part type: "%s"\n' % self.type)
648 649 self.id = self._unpackheader(_fpartid)[0]
649 650 self.ui.debug('part id: "%s"\n' % self.id)
650 651 ## reading parameters
651 652 # param count
652 653 mancount, advcount = self._unpackheader(_fpartparamcount)
653 654 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
654 655 # param size
655 656 fparamsizes = _makefpartparamsizes(mancount + advcount)
656 657 paramsizes = self._unpackheader(fparamsizes)
657 658 # make it a list of couple again
658 659 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
659 660 # split mandatory from advisory
660 661 mansizes = paramsizes[:mancount]
661 662 advsizes = paramsizes[mancount:]
662 663 # retrive param value
663 664 manparams = []
664 665 for key, value in mansizes:
665 666 manparams.append((self._fromheader(key), self._fromheader(value)))
666 667 advparams = []
667 668 for key, value in advsizes:
668 669 advparams.append((self._fromheader(key), self._fromheader(value)))
669 670 self.mandatoryparams = manparams
670 671 self.advisoryparams = advparams
671 672 ## part payload
672 673 def payloadchunks():
673 674 payloadsize = self._unpack(_fpayloadsize)[0]
674 675 self.ui.debug('payload chunk size: %i\n' % payloadsize)
675 676 while payloadsize:
676 677 yield self._readexact(payloadsize)
677 678 payloadsize = self._unpack(_fpayloadsize)[0]
678 679 self.ui.debug('payload chunk size: %i\n' % payloadsize)
679 680 self._payloadstream = util.chunkbuffer(payloadchunks())
680 681 # we read the data, tell it
681 682 self._initialized = True
682 683
683 684 def read(self, size=None):
684 685 """read payload data"""
685 686 if not self._initialized:
686 687 self._readheader()
687 688 if size is None:
688 689 data = self._payloadstream.read()
689 690 else:
690 691 data = self._payloadstream.read(size)
691 692 if size is None or len(data) < size:
692 693 self.consumed = True
693 694 return data
694 695
695 696
696 697 @parthandler('b2x:changegroup')
697 698 def handlechangegroup(op, inpart):
698 699 """apply a changegroup part on the repo
699 700
700 701 This is a very early implementation that will massive rework before being
701 702 inflicted to any end-user.
702 703 """
703 704 # Make sure we trigger a transaction creation
704 705 #
705 706 # The addchangegroup function will get a transaction object by itself, but
706 707 # we need to make sure we trigger the creation of a transaction object used
707 708 # for the whole processing scope.
708 709 op.gettransaction()
709 710 cg = changegroup.unbundle10(inpart, 'UN')
710 711 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
711 712 op.records.add('changegroup', {'return': ret})
712 713 if op.reply is not None:
713 714 # This is definitly not the final form of this
714 715 # return. But one need to start somewhere.
715 part = op.reply.newpart('b2x:reply:changegroup', (),
716 op.reply.newpart('b2x:reply:changegroup', (),
716 717 [('in-reply-to', str(inpart.id)),
717 718 ('return', '%i' % ret)])
718 op.reply.addpart(part)
719 719 assert not inpart.read()
720 720
721 721 @parthandler('b2x:reply:changegroup')
722 722 def handlechangegroup(op, inpart):
723 723 p = dict(inpart.advisoryparams)
724 724 ret = int(p['return'])
725 725 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
726 726
727 727 @parthandler('b2x:check:heads')
728 728 def handlechangegroup(op, inpart):
729 729 """check that head of the repo did not change
730 730
731 731 This is used to detect a push race when using unbundle.
732 732 This replaces the "heads" argument of unbundle."""
733 733 h = inpart.read(20)
734 734 heads = []
735 735 while len(h) == 20:
736 736 heads.append(h)
737 737 h = inpart.read(20)
738 738 assert not h
739 739 if heads != op.repo.heads():
740 740 raise error.PushRaced('repository changed while pushing - '
741 741 'please try again')
742 742
743 743 @parthandler('b2x:output')
744 744 def handleoutput(op, inpart):
745 745 """forward output captured on the server to the client"""
746 746 for line in inpart.read().splitlines():
747 747 op.ui.write(('remote: %s\n' % line))
748 748
749 749 @parthandler('b2x:replycaps')
750 750 def handlereplycaps(op, inpart):
751 751 """Notify that a reply bundle should be created
752 752
753 753 The payload contains the capabilities information for the reply"""
754 754 caps = decodecaps(inpart.read())
755 755 if op.reply is None:
756 756 op.reply = bundle20(op.ui, caps)
757 757
758 758 @parthandler('b2x:error:abort')
759 759 def handlereplycaps(op, inpart):
760 760 """Used to transmit abort error over the wire"""
761 761 manargs = dict(inpart.mandatoryparams)
762 762 advargs = dict(inpart.advisoryparams)
763 763 raise util.Abort(manargs['message'], hint=advargs.get('hint'))
764 764
765 765 @parthandler('b2x:error:unknownpart')
766 766 def handlereplycaps(op, inpart):
767 767 """Used to transmit unknown part error over the wire"""
768 768 manargs = dict(inpart.mandatoryparams)
769 769 raise UnknownPartError(manargs['parttype'])
770 770
771 771 @parthandler('b2x:error:pushraced')
772 772 def handlereplycaps(op, inpart):
773 773 """Used to transmit push race error over the wire"""
774 774 manargs = dict(inpart.mandatoryparams)
775 775 raise error.ResponseError(_('push failed:'), manargs['message'])
General Comments 0
You need to be logged in to leave comments. Login now