##// END OF EJS Templates
bundle2: add a ``newpart`` method to ``bundle20``...
Pierre-Yves David -
r21598:1b0dbb91 default
parent child Browse files
Show More
@@ -1,770 +1,775
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: (16 bits integer)
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: (16 bits inter)
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 :payload:
117 117
118 118 payload is a series of `<chunksize><chunkdata>`.
119 119
120 120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
121 121 `chunksize` says)` The payload part is concluded by a zero size chunk.
122 122
123 123 The current implementation always produces either zero or one chunk.
124 124 This is an implementation limitation that will ultimately be lifted.
125 125
126 126 Bundle processing
127 127 ============================
128 128
129 129 Each part is processed in order using a "part handler". Handler are registered
130 130 for a certain part type.
131 131
132 132 The matching of a part to its handler is case insensitive. The case of the
133 133 part type is used to know if a part is mandatory or advisory. If the Part type
134 134 contains any uppercase char it is considered mandatory. When no handler is
135 135 known for a Mandatory part, the process is aborted and an exception is raised.
136 136 If the part is advisory and no handler is known, the part is ignored. When the
137 137 process is aborted, the full bundle is still read from the stream to keep the
138 138 channel usable. But none of the part read from an abort are processed. In the
139 139 future, dropping the stream may become an option for channel we do not care to
140 140 preserve.
141 141 """
142 142
143 143 import util
144 144 import struct
145 145 import urllib
146 146 import string
147 147
148 148 import changegroup, error
149 149 from i18n import _
150 150
151 151 _pack = struct.pack
152 152 _unpack = struct.unpack
153 153
154 154 _magicstring = 'HG2X'
155 155
156 156 _fstreamparamsize = '>H'
157 157 _fpartheadersize = '>H'
158 158 _fparttypesize = '>B'
159 159 _fpartid = '>I'
160 160 _fpayloadsize = '>I'
161 161 _fpartparamcount = '>BB'
162 162
163 163 preferedchunksize = 4096
164 164
165 165 def _makefpartparamsizes(nbparams):
166 166 """return a struct format to read part parameter sizes
167 167
168 168 The number parameters is variable so we need to build that format
169 169 dynamically.
170 170 """
171 171 return '>'+('BB'*nbparams)
172 172
173 173 class UnknownPartError(KeyError):
174 174 """error raised when no handler is found for a Mandatory part"""
175 175 pass
176 176
177 177 parthandlermapping = {}
178 178
179 179 def parthandler(parttype):
180 180 """decorator that register a function as a bundle2 part handler
181 181
182 182 eg::
183 183
184 184 @parthandler('myparttype')
185 185 def myparttypehandler(...):
186 186 '''process a part of type "my part".'''
187 187 ...
188 188 """
189 189 def _decorator(func):
190 190 lparttype = parttype.lower() # enforce lower case matching.
191 191 assert lparttype not in parthandlermapping
192 192 parthandlermapping[lparttype] = func
193 193 return func
194 194 return _decorator
195 195
196 196 class unbundlerecords(object):
197 197 """keep record of what happens during and unbundle
198 198
199 199 New records are added using `records.add('cat', obj)`. Where 'cat' is a
200 200 category of record and obj is an arbitrary object.
201 201
202 202 `records['cat']` will return all entries of this category 'cat'.
203 203
204 204 Iterating on the object itself will yield `('category', obj)` tuples
205 205 for all entries.
206 206
207 207 All iterations happens in chronological order.
208 208 """
209 209
210 210 def __init__(self):
211 211 self._categories = {}
212 212 self._sequences = []
213 213 self._replies = {}
214 214
215 215 def add(self, category, entry, inreplyto=None):
216 216 """add a new record of a given category.
217 217
218 218 The entry can then be retrieved in the list returned by
219 219 self['category']."""
220 220 self._categories.setdefault(category, []).append(entry)
221 221 self._sequences.append((category, entry))
222 222 if inreplyto is not None:
223 223 self.getreplies(inreplyto).add(category, entry)
224 224
225 225 def getreplies(self, partid):
226 226 """get the subrecords that replies to a specific part"""
227 227 return self._replies.setdefault(partid, unbundlerecords())
228 228
229 229 def __getitem__(self, cat):
230 230 return tuple(self._categories.get(cat, ()))
231 231
232 232 def __iter__(self):
233 233 return iter(self._sequences)
234 234
235 235 def __len__(self):
236 236 return len(self._sequences)
237 237
238 238 def __nonzero__(self):
239 239 return bool(self._sequences)
240 240
241 241 class bundleoperation(object):
242 242 """an object that represents a single bundling process
243 243
244 244 Its purpose is to carry unbundle-related objects and states.
245 245
246 246 A new object should be created at the beginning of each bundle processing.
247 247 The object is to be returned by the processing function.
248 248
249 249 The object has very little content now it will ultimately contain:
250 250 * an access to the repo the bundle is applied to,
251 251 * a ui object,
252 252 * a way to retrieve a transaction to add changes to the repo,
253 253 * a way to record the result of processing each part,
254 254 * a way to construct a bundle response when applicable.
255 255 """
256 256
257 257 def __init__(self, repo, transactiongetter):
258 258 self.repo = repo
259 259 self.ui = repo.ui
260 260 self.records = unbundlerecords()
261 261 self.gettransaction = transactiongetter
262 262 self.reply = None
263 263
264 264 class TransactionUnavailable(RuntimeError):
265 265 pass
266 266
267 267 def _notransaction():
268 268 """default method to get a transaction while processing a bundle
269 269
270 270 Raise an exception to highlight the fact that no transaction was expected
271 271 to be created"""
272 272 raise TransactionUnavailable()
273 273
274 274 def processbundle(repo, unbundler, transactiongetter=_notransaction):
275 275 """This function process a bundle, apply effect to/from a repo
276 276
277 277 It iterates over each part then searches for and uses the proper handling
278 278 code to process the part. Parts are processed in order.
279 279
280 280 This is very early version of this function that will be strongly reworked
281 281 before final usage.
282 282
283 283 Unknown Mandatory part will abort the process.
284 284 """
285 285 op = bundleoperation(repo, transactiongetter)
286 286 # todo:
287 287 # - replace this is a init function soon.
288 288 # - exception catching
289 289 unbundler.params
290 290 iterparts = unbundler.iterparts()
291 291 part = None
292 292 try:
293 293 for part in iterparts:
294 294 parttype = part.type
295 295 # part key are matched lower case
296 296 key = parttype.lower()
297 297 try:
298 298 handler = parthandlermapping[key]
299 299 op.ui.debug('found a handler for part %r\n' % parttype)
300 300 except KeyError:
301 301 if key != parttype: # mandatory parts
302 302 # todo:
303 303 # - use a more precise exception
304 304 raise UnknownPartError(key)
305 305 op.ui.debug('ignoring unknown advisory part %r\n' % key)
306 306 # consuming the part
307 307 part.read()
308 308 continue
309 309
310 310 # handler is called outside the above try block so that we don't
311 311 # risk catching KeyErrors from anything other than the
312 312 # parthandlermapping lookup (any KeyError raised by handler()
313 313 # itself represents a defect of a different variety).
314 314 output = None
315 315 if op.reply is not None:
316 316 op.ui.pushbuffer(error=True)
317 317 output = ''
318 318 try:
319 319 handler(op, part)
320 320 finally:
321 321 if output is not None:
322 322 output = op.ui.popbuffer()
323 323 if output:
324 324 outpart = bundlepart('b2x:output',
325 325 advisoryparams=[('in-reply-to',
326 326 str(part.id))],
327 327 data=output)
328 328 op.reply.addpart(outpart)
329 329 part.read()
330 330 except Exception, exc:
331 331 if part is not None:
332 332 # consume the bundle content
333 333 part.read()
334 334 for part in iterparts:
335 335 # consume the bundle content
336 336 part.read()
337 337 # Small hack to let caller code distinguish exceptions from bundle2
338 338 # processing fron the ones from bundle1 processing. This is mostly
339 339 # needed to handle different return codes to unbundle according to the
340 340 # type of bundle. We should probably clean up or drop this return code
341 341 # craziness in a future version.
342 342 exc.duringunbundle2 = True
343 343 raise
344 344 return op
345 345
346 346 def decodecaps(blob):
347 347 """decode a bundle2 caps bytes blob into a dictionnary
348 348
349 349 The blob is a list of capabilities (one per line)
350 350 Capabilities may have values using a line of the form::
351 351
352 352 capability=value1,value2,value3
353 353
354 354 The values are always a list."""
355 355 caps = {}
356 356 for line in blob.splitlines():
357 357 if not line:
358 358 continue
359 359 if '=' not in line:
360 360 key, vals = line, ()
361 361 else:
362 362 key, vals = line.split('=', 1)
363 363 vals = vals.split(',')
364 364 key = urllib.unquote(key)
365 365 vals = [urllib.unquote(v) for v in vals]
366 366 caps[key] = vals
367 367 return caps
368 368
369 369 def encodecaps(caps):
370 370 """encode a bundle2 caps dictionary into a bytes blob"""
371 371 chunks = []
372 372 for ca in sorted(caps):
373 373 vals = caps[ca]
374 374 ca = urllib.quote(ca)
375 375 vals = [urllib.quote(v) for v in vals]
376 376 if vals:
377 377 ca = "%s=%s" % (ca, ','.join(vals))
378 378 chunks.append(ca)
379 379 return '\n'.join(chunks)
380 380
381 381 class bundle20(object):
382 382 """represent an outgoing bundle2 container
383 383
384 384 Use the `addparam` method to add stream level parameter. and `addpart` to
385 385 populate it. Then call `getchunks` to retrieve all the binary chunks of
386 386 data that compose the bundle2 container."""
387 387
388 388 def __init__(self, ui, capabilities=()):
389 389 self.ui = ui
390 390 self._params = []
391 391 self._parts = []
392 392 self.capabilities = dict(capabilities)
393 393
394 394 # methods used to defines the bundle2 content
395 395 def addparam(self, name, value=None):
396 396 """add a stream level parameter"""
397 397 if not name:
398 398 raise ValueError('empty parameter name')
399 399 if name[0] not in string.letters:
400 400 raise ValueError('non letter first character: %r' % name)
401 401 self._params.append((name, value))
402 402
403 403 def addpart(self, part):
404 404 """add a new part to the bundle2 container
405 405
406 406 Parts contains the actual applicative payload."""
407 407 assert part.id is None
408 408 part.id = len(self._parts) # very cheap counter
409 409 self._parts.append(part)
410 410
411 def newpart(self, typeid, *args, **kwargs):
412 """create a new part for the containers"""
413 part = bundlepart(typeid, *args, **kwargs)
414 return part
415
411 416 # methods used to generate the bundle2 stream
412 417 def getchunks(self):
413 418 self.ui.debug('start emission of %s stream\n' % _magicstring)
414 419 yield _magicstring
415 420 param = self._paramchunk()
416 421 self.ui.debug('bundle parameter: %s\n' % param)
417 422 yield _pack(_fstreamparamsize, len(param))
418 423 if param:
419 424 yield param
420 425
421 426 self.ui.debug('start of parts\n')
422 427 for part in self._parts:
423 428 self.ui.debug('bundle part: "%s"\n' % part.type)
424 429 for chunk in part.getchunks():
425 430 yield chunk
426 431 self.ui.debug('end of bundle\n')
427 432 yield '\0\0'
428 433
429 434 def _paramchunk(self):
430 435 """return a encoded version of all stream parameters"""
431 436 blocks = []
432 437 for par, value in self._params:
433 438 par = urllib.quote(par)
434 439 if value is not None:
435 440 value = urllib.quote(value)
436 441 par = '%s=%s' % (par, value)
437 442 blocks.append(par)
438 443 return ' '.join(blocks)
439 444
440 445 class unpackermixin(object):
441 446 """A mixin to extract bytes and struct data from a stream"""
442 447
443 448 def __init__(self, fp):
444 449 self._fp = fp
445 450
446 451 def _unpack(self, format):
447 452 """unpack this struct format from the stream"""
448 453 data = self._readexact(struct.calcsize(format))
449 454 return _unpack(format, data)
450 455
451 456 def _readexact(self, size):
452 457 """read exactly <size> bytes from the stream"""
453 458 return changegroup.readexactly(self._fp, size)
454 459
455 460
456 461 class unbundle20(unpackermixin):
457 462 """interpret a bundle2 stream
458 463
459 464 This class is fed with a binary stream and yields parts through its
460 465 `iterparts` methods."""
461 466
462 467 def __init__(self, ui, fp, header=None):
463 468 """If header is specified, we do not read it out of the stream."""
464 469 self.ui = ui
465 470 super(unbundle20, self).__init__(fp)
466 471 if header is None:
467 472 header = self._readexact(4)
468 473 magic, version = header[0:2], header[2:4]
469 474 if magic != 'HG':
470 475 raise util.Abort(_('not a Mercurial bundle'))
471 476 if version != '2X':
472 477 raise util.Abort(_('unknown bundle version %s') % version)
473 478 self.ui.debug('start processing of %s stream\n' % header)
474 479
475 480 @util.propertycache
476 481 def params(self):
477 482 """dictionary of stream level parameters"""
478 483 self.ui.debug('reading bundle2 stream parameters\n')
479 484 params = {}
480 485 paramssize = self._unpack(_fstreamparamsize)[0]
481 486 if paramssize:
482 487 for p in self._readexact(paramssize).split(' '):
483 488 p = p.split('=', 1)
484 489 p = [urllib.unquote(i) for i in p]
485 490 if len(p) < 2:
486 491 p.append(None)
487 492 self._processparam(*p)
488 493 params[p[0]] = p[1]
489 494 return params
490 495
491 496 def _processparam(self, name, value):
492 497 """process a parameter, applying its effect if needed
493 498
494 499 Parameter starting with a lower case letter are advisory and will be
495 500 ignored when unknown. Those starting with an upper case letter are
496 501 mandatory and will this function will raise a KeyError when unknown.
497 502
498 503 Note: no option are currently supported. Any input will be either
499 504 ignored or failing.
500 505 """
501 506 if not name:
502 507 raise ValueError('empty parameter name')
503 508 if name[0] not in string.letters:
504 509 raise ValueError('non letter first character: %r' % name)
505 510 # Some logic will be later added here to try to process the option for
506 511 # a dict of known parameter.
507 512 if name[0].islower():
508 513 self.ui.debug("ignoring unknown parameter %r\n" % name)
509 514 else:
510 515 raise KeyError(name)
511 516
512 517
513 518 def iterparts(self):
514 519 """yield all parts contained in the stream"""
515 520 # make sure param have been loaded
516 521 self.params
517 522 self.ui.debug('start extraction of bundle2 parts\n')
518 523 headerblock = self._readpartheader()
519 524 while headerblock is not None:
520 525 part = unbundlepart(self.ui, headerblock, self._fp)
521 526 yield part
522 527 headerblock = self._readpartheader()
523 528 self.ui.debug('end of bundle2 stream\n')
524 529
525 530 def _readpartheader(self):
526 531 """reads a part header size and return the bytes blob
527 532
528 533 returns None if empty"""
529 534 headersize = self._unpack(_fpartheadersize)[0]
530 535 self.ui.debug('part header size: %i\n' % headersize)
531 536 if headersize:
532 537 return self._readexact(headersize)
533 538 return None
534 539
535 540
536 541 class bundlepart(object):
537 542 """A bundle2 part contains application level payload
538 543
539 544 The part `type` is used to route the part to the application level
540 545 handler.
541 546 """
542 547
543 548 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
544 549 data=''):
545 550 self.id = None
546 551 self.type = parttype
547 552 self.data = data
548 553 self.mandatoryparams = mandatoryparams
549 554 self.advisoryparams = advisoryparams
550 555
551 556 def getchunks(self):
552 557 #### header
553 558 ## parttype
554 559 header = [_pack(_fparttypesize, len(self.type)),
555 560 self.type, _pack(_fpartid, self.id),
556 561 ]
557 562 ## parameters
558 563 # count
559 564 manpar = self.mandatoryparams
560 565 advpar = self.advisoryparams
561 566 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
562 567 # size
563 568 parsizes = []
564 569 for key, value in manpar:
565 570 parsizes.append(len(key))
566 571 parsizes.append(len(value))
567 572 for key, value in advpar:
568 573 parsizes.append(len(key))
569 574 parsizes.append(len(value))
570 575 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
571 576 header.append(paramsizes)
572 577 # key, value
573 578 for key, value in manpar:
574 579 header.append(key)
575 580 header.append(value)
576 581 for key, value in advpar:
577 582 header.append(key)
578 583 header.append(value)
579 584 ## finalize header
580 585 headerchunk = ''.join(header)
581 586 yield _pack(_fpartheadersize, len(headerchunk))
582 587 yield headerchunk
583 588 ## payload
584 589 for chunk in self._payloadchunks():
585 590 yield _pack(_fpayloadsize, len(chunk))
586 591 yield chunk
587 592 # end of payload
588 593 yield _pack(_fpayloadsize, 0)
589 594
590 595 def _payloadchunks(self):
591 596 """yield chunks of a the part payload
592 597
593 598 Exists to handle the different methods to provide data to a part."""
594 599 # we only support fixed size data now.
595 600 # This will be improved in the future.
596 601 if util.safehasattr(self.data, 'next'):
597 602 buff = util.chunkbuffer(self.data)
598 603 chunk = buff.read(preferedchunksize)
599 604 while chunk:
600 605 yield chunk
601 606 chunk = buff.read(preferedchunksize)
602 607 elif len(self.data):
603 608 yield self.data
604 609
605 610 class unbundlepart(unpackermixin):
606 611 """a bundle part read from a bundle"""
607 612
608 613 def __init__(self, ui, header, fp):
609 614 super(unbundlepart, self).__init__(fp)
610 615 self.ui = ui
611 616 # unbundle state attr
612 617 self._headerdata = header
613 618 self._headeroffset = 0
614 619 self._initialized = False
615 620 self.consumed = False
616 621 # part data
617 622 self.id = None
618 623 self.type = None
619 624 self.mandatoryparams = None
620 625 self.advisoryparams = None
621 626 self._payloadstream = None
622 627 self._readheader()
623 628
624 629 def _fromheader(self, size):
625 630 """return the next <size> byte from the header"""
626 631 offset = self._headeroffset
627 632 data = self._headerdata[offset:(offset + size)]
628 633 self._headeroffset = offset + size
629 634 return data
630 635
631 636 def _unpackheader(self, format):
632 637 """read given format from header
633 638
634 639 This automatically compute the size of the format to read."""
635 640 data = self._fromheader(struct.calcsize(format))
636 641 return _unpack(format, data)
637 642
638 643 def _readheader(self):
639 644 """read the header and setup the object"""
640 645 typesize = self._unpackheader(_fparttypesize)[0]
641 646 self.type = self._fromheader(typesize)
642 647 self.ui.debug('part type: "%s"\n' % self.type)
643 648 self.id = self._unpackheader(_fpartid)[0]
644 649 self.ui.debug('part id: "%s"\n' % self.id)
645 650 ## reading parameters
646 651 # param count
647 652 mancount, advcount = self._unpackheader(_fpartparamcount)
648 653 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
649 654 # param size
650 655 fparamsizes = _makefpartparamsizes(mancount + advcount)
651 656 paramsizes = self._unpackheader(fparamsizes)
652 657 # make it a list of couple again
653 658 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
654 659 # split mandatory from advisory
655 660 mansizes = paramsizes[:mancount]
656 661 advsizes = paramsizes[mancount:]
657 662 # retrive param value
658 663 manparams = []
659 664 for key, value in mansizes:
660 665 manparams.append((self._fromheader(key), self._fromheader(value)))
661 666 advparams = []
662 667 for key, value in advsizes:
663 668 advparams.append((self._fromheader(key), self._fromheader(value)))
664 669 self.mandatoryparams = manparams
665 670 self.advisoryparams = advparams
666 671 ## part payload
667 672 def payloadchunks():
668 673 payloadsize = self._unpack(_fpayloadsize)[0]
669 674 self.ui.debug('payload chunk size: %i\n' % payloadsize)
670 675 while payloadsize:
671 676 yield self._readexact(payloadsize)
672 677 payloadsize = self._unpack(_fpayloadsize)[0]
673 678 self.ui.debug('payload chunk size: %i\n' % payloadsize)
674 679 self._payloadstream = util.chunkbuffer(payloadchunks())
675 680 # we read the data, tell it
676 681 self._initialized = True
677 682
678 683 def read(self, size=None):
679 684 """read payload data"""
680 685 if not self._initialized:
681 686 self._readheader()
682 687 if size is None:
683 688 data = self._payloadstream.read()
684 689 else:
685 690 data = self._payloadstream.read(size)
686 691 if size is None or len(data) < size:
687 692 self.consumed = True
688 693 return data
689 694
690 695
691 696 @parthandler('b2x:changegroup')
692 697 def handlechangegroup(op, inpart):
693 698 """apply a changegroup part on the repo
694 699
695 700 This is a very early implementation that will massive rework before being
696 701 inflicted to any end-user.
697 702 """
698 703 # Make sure we trigger a transaction creation
699 704 #
700 705 # The addchangegroup function will get a transaction object by itself, but
701 706 # we need to make sure we trigger the creation of a transaction object used
702 707 # for the whole processing scope.
703 708 op.gettransaction()
704 709 cg = changegroup.unbundle10(inpart, 'UN')
705 710 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
706 711 op.records.add('changegroup', {'return': ret})
707 712 if op.reply is not None:
708 713 # This is definitly not the final form of this
709 714 # return. But one need to start somewhere.
710 part = bundlepart('b2x:reply:changegroup', (),
715 part = op.reply.newpart('b2x:reply:changegroup', (),
711 716 [('in-reply-to', str(inpart.id)),
712 717 ('return', '%i' % ret)])
713 718 op.reply.addpart(part)
714 719 assert not inpart.read()
715 720
716 721 @parthandler('b2x:reply:changegroup')
717 722 def handlechangegroup(op, inpart):
718 723 p = dict(inpart.advisoryparams)
719 724 ret = int(p['return'])
720 725 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
721 726
722 727 @parthandler('b2x:check:heads')
723 728 def handlechangegroup(op, inpart):
724 729 """check that head of the repo did not change
725 730
726 731 This is used to detect a push race when using unbundle.
727 732 This replaces the "heads" argument of unbundle."""
728 733 h = inpart.read(20)
729 734 heads = []
730 735 while len(h) == 20:
731 736 heads.append(h)
732 737 h = inpart.read(20)
733 738 assert not h
734 739 if heads != op.repo.heads():
735 740 raise error.PushRaced('repository changed while pushing - '
736 741 'please try again')
737 742
738 743 @parthandler('b2x:output')
739 744 def handleoutput(op, inpart):
740 745 """forward output captured on the server to the client"""
741 746 for line in inpart.read().splitlines():
742 747 op.ui.write(('remote: %s\n' % line))
743 748
744 749 @parthandler('b2x:replycaps')
745 750 def handlereplycaps(op, inpart):
746 751 """Notify that a reply bundle should be created
747 752
748 753 The payload contains the capabilities information for the reply"""
749 754 caps = decodecaps(inpart.read())
750 755 if op.reply is None:
751 756 op.reply = bundle20(op.ui, caps)
752 757
753 758 @parthandler('b2x:error:abort')
754 759 def handlereplycaps(op, inpart):
755 760 """Used to transmit abort error over the wire"""
756 761 manargs = dict(inpart.mandatoryparams)
757 762 advargs = dict(inpart.advisoryparams)
758 763 raise util.Abort(manargs['message'], hint=advargs.get('hint'))
759 764
760 765 @parthandler('b2x:error:unknownpart')
761 766 def handlereplycaps(op, inpart):
762 767 """Used to transmit unknown part error over the wire"""
763 768 manargs = dict(inpart.mandatoryparams)
764 769 raise UnknownPartError(manargs['parttype'])
765 770
766 771 @parthandler('b2x:error:pushraced')
767 772 def handlereplycaps(op, inpart):
768 773 """Used to transmit push race error over the wire"""
769 774 manargs = dict(inpart.mandatoryparams)
770 775 raise error.ResponseError(_('push failed:'), manargs['message'])
General Comments 0
You need to be logged in to leave comments. Login now