##// END OF EJS Templates
bundle2: use a more specific UnknownPartError when no handler is found...
Pierre-Yves David -
r21179:372f4772 stable
parent child Browse files
Show More
@@ -1,751 +1,755 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: (16 bits integer)
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: (16 bits inter)
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 :payload:
117 117
118 118 payload is a series of `<chunksize><chunkdata>`.
119 119
120 120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
121 121 `chunksize` says)` The payload part is concluded by a zero size chunk.
122 122
123 123 The current implementation always produces either zero or one chunk.
124 124 This is an implementation limitation that will ultimately be lifted.
125 125
126 126 Bundle processing
127 127 ============================
128 128
129 129 Each part is processed in order using a "part handler". Handler are registered
130 130 for a certain part type.
131 131
132 132 The matching of a part to its handler is case insensitive. The case of the
133 133 part type is used to know if a part is mandatory or advisory. If the Part type
134 134 contains any uppercase char it is considered mandatory. When no handler is
135 135 known for a Mandatory part, the process is aborted and an exception is raised.
136 136 If the part is advisory and no handler is known, the part is ignored. When the
137 137 process is aborted, the full bundle is still read from the stream to keep the
138 138 channel usable. But none of the part read from an abort are processed. In the
139 139 future, dropping the stream may become an option for channel we do not care to
140 140 preserve.
141 141 """
142 142
143 143 import util
144 144 import struct
145 145 import urllib
146 146 import string
147 147
148 148 import changegroup
149 149 from i18n import _
150 150
151 151 _pack = struct.pack
152 152 _unpack = struct.unpack
153 153
154 154 _magicstring = 'HG2X'
155 155
156 156 _fstreamparamsize = '>H'
157 157 _fpartheadersize = '>H'
158 158 _fparttypesize = '>B'
159 159 _fpartid = '>I'
160 160 _fpayloadsize = '>I'
161 161 _fpartparamcount = '>BB'
162 162
163 163 preferedchunksize = 4096
164 164
165 165 def _makefpartparamsizes(nbparams):
166 166 """return a struct format to read part parameter sizes
167 167
168 168 The number parameters is variable so we need to build that format
169 169 dynamically.
170 170 """
171 171 return '>'+('BB'*nbparams)
172 172
173 class UnknownPartError(KeyError):
174 """error raised when no handler is found for a Mandatory part"""
175 pass
176
173 177 parthandlermapping = {}
174 178
175 179 def parthandler(parttype):
176 180 """decorator that register a function as a bundle2 part handler
177 181
178 182 eg::
179 183
180 184 @parthandler('myparttype')
181 185 def myparttypehandler(...):
182 186 '''process a part of type "my part".'''
183 187 ...
184 188 """
185 189 def _decorator(func):
186 190 lparttype = parttype.lower() # enforce lower case matching.
187 191 assert lparttype not in parthandlermapping
188 192 parthandlermapping[lparttype] = func
189 193 return func
190 194 return _decorator
191 195
192 196 class unbundlerecords(object):
193 197 """keep record of what happens during and unbundle
194 198
195 199 New records are added using `records.add('cat', obj)`. Where 'cat' is a
196 200 category of record and obj is an arbitrary object.
197 201
198 202 `records['cat']` will return all entries of this category 'cat'.
199 203
200 204 Iterating on the object itself will yield `('category', obj)` tuples
201 205 for all entries.
202 206
203 207 All iterations happens in chronological order.
204 208 """
205 209
206 210 def __init__(self):
207 211 self._categories = {}
208 212 self._sequences = []
209 213 self._replies = {}
210 214
211 215 def add(self, category, entry, inreplyto=None):
212 216 """add a new record of a given category.
213 217
214 218 The entry can then be retrieved in the list returned by
215 219 self['category']."""
216 220 self._categories.setdefault(category, []).append(entry)
217 221 self._sequences.append((category, entry))
218 222 if inreplyto is not None:
219 223 self.getreplies(inreplyto).add(category, entry)
220 224
221 225 def getreplies(self, partid):
222 226 """get the subrecords that replies to a specific part"""
223 227 return self._replies.setdefault(partid, unbundlerecords())
224 228
225 229 def __getitem__(self, cat):
226 230 return tuple(self._categories.get(cat, ()))
227 231
228 232 def __iter__(self):
229 233 return iter(self._sequences)
230 234
231 235 def __len__(self):
232 236 return len(self._sequences)
233 237
234 238 def __nonzero__(self):
235 239 return bool(self._sequences)
236 240
237 241 class bundleoperation(object):
238 242 """an object that represents a single bundling process
239 243
240 244 Its purpose is to carry unbundle-related objects and states.
241 245
242 246 A new object should be created at the beginning of each bundle processing.
243 247 The object is to be returned by the processing function.
244 248
245 249 The object has very little content now it will ultimately contain:
246 250 * an access to the repo the bundle is applied to,
247 251 * a ui object,
248 252 * a way to retrieve a transaction to add changes to the repo,
249 253 * a way to record the result of processing each part,
250 254 * a way to construct a bundle response when applicable.
251 255 """
252 256
253 257 def __init__(self, repo, transactiongetter):
254 258 self.repo = repo
255 259 self.ui = repo.ui
256 260 self.records = unbundlerecords()
257 261 self.gettransaction = transactiongetter
258 262 self.reply = None
259 263
260 264 class TransactionUnavailable(RuntimeError):
261 265 pass
262 266
263 267 def _notransaction():
264 268 """default method to get a transaction while processing a bundle
265 269
266 270 Raise an exception to highlight the fact that no transaction was expected
267 271 to be created"""
268 272 raise TransactionUnavailable()
269 273
270 274 def processbundle(repo, unbundler, transactiongetter=_notransaction):
271 275 """This function process a bundle, apply effect to/from a repo
272 276
273 277 It iterates over each part then searches for and uses the proper handling
274 278 code to process the part. Parts are processed in order.
275 279
276 280 This is very early version of this function that will be strongly reworked
277 281 before final usage.
278 282
279 283 Unknown Mandatory part will abort the process.
280 284 """
281 285 op = bundleoperation(repo, transactiongetter)
282 286 # todo:
283 287 # - replace this is a init function soon.
284 288 # - exception catching
285 289 unbundler.params
286 290 iterparts = unbundler.iterparts()
287 291 part = None
288 292 try:
289 293 for part in iterparts:
290 294 parttype = part.type
291 295 # part key are matched lower case
292 296 key = parttype.lower()
293 297 try:
294 298 handler = parthandlermapping[key]
295 299 op.ui.debug('found a handler for part %r\n' % parttype)
296 300 except KeyError:
297 301 if key != parttype: # mandatory parts
298 302 # todo:
299 303 # - use a more precise exception
300 raise
304 raise UnknownPartError(key)
301 305 op.ui.debug('ignoring unknown advisory part %r\n' % key)
302 306 # consuming the part
303 307 part.read()
304 308 continue
305 309
306 310 # handler is called outside the above try block so that we don't
307 311 # risk catching KeyErrors from anything other than the
308 312 # parthandlermapping lookup (any KeyError raised by handler()
309 313 # itself represents a defect of a different variety).
310 314 output = None
311 315 if op.reply is not None:
312 316 op.ui.pushbuffer(error=True)
313 317 output = ''
314 318 try:
315 319 handler(op, part)
316 320 finally:
317 321 if output is not None:
318 322 output = op.ui.popbuffer()
319 323 if output:
320 324 outpart = bundlepart('b2x:output',
321 325 advisoryparams=[('in-reply-to',
322 326 str(part.id))],
323 327 data=output)
324 328 op.reply.addpart(outpart)
325 329 part.read()
326 330 except Exception, exc:
327 331 if part is not None:
328 332 # consume the bundle content
329 333 part.read()
330 334 for part in iterparts:
331 335 # consume the bundle content
332 336 part.read()
333 337 # Small hack to let caller code distinguish exceptions from bundle2
334 338 # processing fron the ones from bundle1 processing. This is mostly
335 339 # needed to handle different return codes to unbundle according to the
336 340 # type of bundle. We should probably clean up or drop this return code
337 341 # craziness in a future version.
338 342 exc.duringunbundle2 = True
339 343 raise
340 344 return op
341 345
342 346 def decodecaps(blob):
343 347 """decode a bundle2 caps bytes blob into a dictionnary
344 348
345 349 The blob is a list of capabilities (one per line)
346 350 Capabilities may have values using a line of the form::
347 351
348 352 capability=value1,value2,value3
349 353
350 354 The values are always a list."""
351 355 caps = {}
352 356 for line in blob.splitlines():
353 357 if not line:
354 358 continue
355 359 if '=' not in line:
356 360 key, vals = line, ()
357 361 else:
358 362 key, vals = line.split('=', 1)
359 363 vals = vals.split(',')
360 364 key = urllib.unquote(key)
361 365 vals = [urllib.unquote(v) for v in vals]
362 366 caps[key] = vals
363 367 return caps
364 368
365 369 def encodecaps(caps):
366 370 """encode a bundle2 caps dictionary into a bytes blob"""
367 371 chunks = []
368 372 for ca in sorted(caps):
369 373 vals = caps[ca]
370 374 ca = urllib.quote(ca)
371 375 vals = [urllib.quote(v) for v in vals]
372 376 if vals:
373 377 ca = "%s=%s" % (ca, ','.join(vals))
374 378 chunks.append(ca)
375 379 return '\n'.join(chunks)
376 380
377 381 class bundle20(object):
378 382 """represent an outgoing bundle2 container
379 383
380 384 Use the `addparam` method to add stream level parameter. and `addpart` to
381 385 populate it. Then call `getchunks` to retrieve all the binary chunks of
382 386 data that compose the bundle2 container."""
383 387
384 388 def __init__(self, ui, capabilities=()):
385 389 self.ui = ui
386 390 self._params = []
387 391 self._parts = []
388 392 self.capabilities = dict(capabilities)
389 393
390 394 def addparam(self, name, value=None):
391 395 """add a stream level parameter"""
392 396 if not name:
393 397 raise ValueError('empty parameter name')
394 398 if name[0] not in string.letters:
395 399 raise ValueError('non letter first character: %r' % name)
396 400 self._params.append((name, value))
397 401
398 402 def addpart(self, part):
399 403 """add a new part to the bundle2 container
400 404
401 405 Parts contains the actual applicative payload."""
402 406 assert part.id is None
403 407 part.id = len(self._parts) # very cheap counter
404 408 self._parts.append(part)
405 409
406 410 def getchunks(self):
407 411 self.ui.debug('start emission of %s stream\n' % _magicstring)
408 412 yield _magicstring
409 413 param = self._paramchunk()
410 414 self.ui.debug('bundle parameter: %s\n' % param)
411 415 yield _pack(_fstreamparamsize, len(param))
412 416 if param:
413 417 yield param
414 418
415 419 self.ui.debug('start of parts\n')
416 420 for part in self._parts:
417 421 self.ui.debug('bundle part: "%s"\n' % part.type)
418 422 for chunk in part.getchunks():
419 423 yield chunk
420 424 self.ui.debug('end of bundle\n')
421 425 yield '\0\0'
422 426
423 427 def _paramchunk(self):
424 428 """return a encoded version of all stream parameters"""
425 429 blocks = []
426 430 for par, value in self._params:
427 431 par = urllib.quote(par)
428 432 if value is not None:
429 433 value = urllib.quote(value)
430 434 par = '%s=%s' % (par, value)
431 435 blocks.append(par)
432 436 return ' '.join(blocks)
433 437
434 438 class unpackermixin(object):
435 439 """A mixin to extract bytes and struct data from a stream"""
436 440
437 441 def __init__(self, fp):
438 442 self._fp = fp
439 443
440 444 def _unpack(self, format):
441 445 """unpack this struct format from the stream"""
442 446 data = self._readexact(struct.calcsize(format))
443 447 return _unpack(format, data)
444 448
445 449 def _readexact(self, size):
446 450 """read exactly <size> bytes from the stream"""
447 451 return changegroup.readexactly(self._fp, size)
448 452
449 453
450 454 class unbundle20(unpackermixin):
451 455 """interpret a bundle2 stream
452 456
453 457 This class is fed with a binary stream and yields parts through its
454 458 `iterparts` methods."""
455 459
456 460 def __init__(self, ui, fp, header=None):
457 461 """If header is specified, we do not read it out of the stream."""
458 462 self.ui = ui
459 463 super(unbundle20, self).__init__(fp)
460 464 if header is None:
461 465 header = self._readexact(4)
462 466 magic, version = header[0:2], header[2:4]
463 467 if magic != 'HG':
464 468 raise util.Abort(_('not a Mercurial bundle'))
465 469 if version != '2X':
466 470 raise util.Abort(_('unknown bundle version %s') % version)
467 471 self.ui.debug('start processing of %s stream\n' % header)
468 472
469 473 @util.propertycache
470 474 def params(self):
471 475 """dictionary of stream level parameters"""
472 476 self.ui.debug('reading bundle2 stream parameters\n')
473 477 params = {}
474 478 paramssize = self._unpack(_fstreamparamsize)[0]
475 479 if paramssize:
476 480 for p in self._readexact(paramssize).split(' '):
477 481 p = p.split('=', 1)
478 482 p = [urllib.unquote(i) for i in p]
479 483 if len(p) < 2:
480 484 p.append(None)
481 485 self._processparam(*p)
482 486 params[p[0]] = p[1]
483 487 return params
484 488
485 489 def _processparam(self, name, value):
486 490 """process a parameter, applying its effect if needed
487 491
488 492 Parameter starting with a lower case letter are advisory and will be
489 493 ignored when unknown. Those starting with an upper case letter are
490 494 mandatory and will this function will raise a KeyError when unknown.
491 495
492 496 Note: no option are currently supported. Any input will be either
493 497 ignored or failing.
494 498 """
495 499 if not name:
496 500 raise ValueError('empty parameter name')
497 501 if name[0] not in string.letters:
498 502 raise ValueError('non letter first character: %r' % name)
499 503 # Some logic will be later added here to try to process the option for
500 504 # a dict of known parameter.
501 505 if name[0].islower():
502 506 self.ui.debug("ignoring unknown parameter %r\n" % name)
503 507 else:
504 508 raise KeyError(name)
505 509
506 510
507 511 def iterparts(self):
508 512 """yield all parts contained in the stream"""
509 513 # make sure param have been loaded
510 514 self.params
511 515 self.ui.debug('start extraction of bundle2 parts\n')
512 516 headerblock = self._readpartheader()
513 517 while headerblock is not None:
514 518 part = unbundlepart(self.ui, headerblock, self._fp)
515 519 yield part
516 520 headerblock = self._readpartheader()
517 521 self.ui.debug('end of bundle2 stream\n')
518 522
519 523 def _readpartheader(self):
520 524 """reads a part header size and return the bytes blob
521 525
522 526 returns None if empty"""
523 527 headersize = self._unpack(_fpartheadersize)[0]
524 528 self.ui.debug('part header size: %i\n' % headersize)
525 529 if headersize:
526 530 return self._readexact(headersize)
527 531 return None
528 532
529 533
530 534 class bundlepart(object):
531 535 """A bundle2 part contains application level payload
532 536
533 537 The part `type` is used to route the part to the application level
534 538 handler.
535 539 """
536 540
537 541 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
538 542 data=''):
539 543 self.id = None
540 544 self.type = parttype
541 545 self.data = data
542 546 self.mandatoryparams = mandatoryparams
543 547 self.advisoryparams = advisoryparams
544 548
545 549 def getchunks(self):
546 550 #### header
547 551 ## parttype
548 552 header = [_pack(_fparttypesize, len(self.type)),
549 553 self.type, _pack(_fpartid, self.id),
550 554 ]
551 555 ## parameters
552 556 # count
553 557 manpar = self.mandatoryparams
554 558 advpar = self.advisoryparams
555 559 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
556 560 # size
557 561 parsizes = []
558 562 for key, value in manpar:
559 563 parsizes.append(len(key))
560 564 parsizes.append(len(value))
561 565 for key, value in advpar:
562 566 parsizes.append(len(key))
563 567 parsizes.append(len(value))
564 568 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
565 569 header.append(paramsizes)
566 570 # key, value
567 571 for key, value in manpar:
568 572 header.append(key)
569 573 header.append(value)
570 574 for key, value in advpar:
571 575 header.append(key)
572 576 header.append(value)
573 577 ## finalize header
574 578 headerchunk = ''.join(header)
575 579 yield _pack(_fpartheadersize, len(headerchunk))
576 580 yield headerchunk
577 581 ## payload
578 582 for chunk in self._payloadchunks():
579 583 yield _pack(_fpayloadsize, len(chunk))
580 584 yield chunk
581 585 # end of payload
582 586 yield _pack(_fpayloadsize, 0)
583 587
584 588 def _payloadchunks(self):
585 589 """yield chunks of a the part payload
586 590
587 591 Exists to handle the different methods to provide data to a part."""
588 592 # we only support fixed size data now.
589 593 # This will be improved in the future.
590 594 if util.safehasattr(self.data, 'next'):
591 595 buff = util.chunkbuffer(self.data)
592 596 chunk = buff.read(preferedchunksize)
593 597 while chunk:
594 598 yield chunk
595 599 chunk = buff.read(preferedchunksize)
596 600 elif len(self.data):
597 601 yield self.data
598 602
599 603 class unbundlepart(unpackermixin):
600 604 """a bundle part read from a bundle"""
601 605
602 606 def __init__(self, ui, header, fp):
603 607 super(unbundlepart, self).__init__(fp)
604 608 self.ui = ui
605 609 # unbundle state attr
606 610 self._headerdata = header
607 611 self._headeroffset = 0
608 612 self._initialized = False
609 613 self.consumed = False
610 614 # part data
611 615 self.id = None
612 616 self.type = None
613 617 self.mandatoryparams = None
614 618 self.advisoryparams = None
615 619 self._payloadstream = None
616 620 self._readheader()
617 621
618 622 def _fromheader(self, size):
619 623 """return the next <size> byte from the header"""
620 624 offset = self._headeroffset
621 625 data = self._headerdata[offset:(offset + size)]
622 626 self._headeroffset = offset + size
623 627 return data
624 628
625 629 def _unpackheader(self, format):
626 630 """read given format from header
627 631
628 632 This automatically compute the size of the format to read."""
629 633 data = self._fromheader(struct.calcsize(format))
630 634 return _unpack(format, data)
631 635
632 636 def _readheader(self):
633 637 """read the header and setup the object"""
634 638 typesize = self._unpackheader(_fparttypesize)[0]
635 639 self.type = self._fromheader(typesize)
636 640 self.ui.debug('part type: "%s"\n' % self.type)
637 641 self.id = self._unpackheader(_fpartid)[0]
638 642 self.ui.debug('part id: "%s"\n' % self.id)
639 643 ## reading parameters
640 644 # param count
641 645 mancount, advcount = self._unpackheader(_fpartparamcount)
642 646 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
643 647 # param size
644 648 fparamsizes = _makefpartparamsizes(mancount + advcount)
645 649 paramsizes = self._unpackheader(fparamsizes)
646 650 # make it a list of couple again
647 651 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
648 652 # split mandatory from advisory
649 653 mansizes = paramsizes[:mancount]
650 654 advsizes = paramsizes[mancount:]
651 655 # retrive param value
652 656 manparams = []
653 657 for key, value in mansizes:
654 658 manparams.append((self._fromheader(key), self._fromheader(value)))
655 659 advparams = []
656 660 for key, value in advsizes:
657 661 advparams.append((self._fromheader(key), self._fromheader(value)))
658 662 self.mandatoryparams = manparams
659 663 self.advisoryparams = advparams
660 664 ## part payload
661 665 def payloadchunks():
662 666 payloadsize = self._unpack(_fpayloadsize)[0]
663 667 self.ui.debug('payload chunk size: %i\n' % payloadsize)
664 668 while payloadsize:
665 669 yield self._readexact(payloadsize)
666 670 payloadsize = self._unpack(_fpayloadsize)[0]
667 671 self.ui.debug('payload chunk size: %i\n' % payloadsize)
668 672 self._payloadstream = util.chunkbuffer(payloadchunks())
669 673 # we read the data, tell it
670 674 self._initialized = True
671 675
672 676 def read(self, size=None):
673 677 """read payload data"""
674 678 if not self._initialized:
675 679 self._readheader()
676 680 if size is None:
677 681 data = self._payloadstream.read()
678 682 else:
679 683 data = self._payloadstream.read(size)
680 684 if size is None or len(data) < size:
681 685 self.consumed = True
682 686 return data
683 687
684 688
685 689 @parthandler('b2x:changegroup')
686 690 def handlechangegroup(op, inpart):
687 691 """apply a changegroup part on the repo
688 692
689 693 This is a very early implementation that will massive rework before being
690 694 inflicted to any end-user.
691 695 """
692 696 # Make sure we trigger a transaction creation
693 697 #
694 698 # The addchangegroup function will get a transaction object by itself, but
695 699 # we need to make sure we trigger the creation of a transaction object used
696 700 # for the whole processing scope.
697 701 op.gettransaction()
698 702 cg = changegroup.unbundle10(inpart, 'UN')
699 703 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
700 704 op.records.add('changegroup', {'return': ret})
701 705 if op.reply is not None:
702 706 # This is definitly not the final form of this
703 707 # return. But one need to start somewhere.
704 708 part = bundlepart('b2x:reply:changegroup', (),
705 709 [('in-reply-to', str(inpart.id)),
706 710 ('return', '%i' % ret)])
707 711 op.reply.addpart(part)
708 712 assert not inpart.read()
709 713
710 714 @parthandler('b2x:reply:changegroup')
711 715 def handlechangegroup(op, inpart):
712 716 p = dict(inpart.advisoryparams)
713 717 ret = int(p['return'])
714 718 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
715 719
716 720 @parthandler('b2x:check:heads')
717 721 def handlechangegroup(op, inpart):
718 722 """check that head of the repo did not change
719 723
720 724 This is used to detect a push race when using unbundle.
721 725 This replaces the "heads" argument of unbundle."""
722 726 h = inpart.read(20)
723 727 heads = []
724 728 while len(h) == 20:
725 729 heads.append(h)
726 730 h = inpart.read(20)
727 731 assert not h
728 732 if heads != op.repo.heads():
729 733 raise exchange.PushRaced()
730 734
731 735 @parthandler('b2x:output')
732 736 def handleoutput(op, inpart):
733 737 """forward output captured on the server to the client"""
734 738 for line in inpart.read().splitlines():
735 739 op.ui.write(('remote: %s\n' % line))
736 740
737 741 @parthandler('b2x:replycaps')
738 742 def handlereplycaps(op, inpart):
739 743 """Notify that a reply bundle should be created
740 744
741 745 The payload contains the capabilities information for the reply"""
742 746 caps = decodecaps(inpart.read())
743 747 if op.reply is None:
744 748 op.reply = bundle20(op.ui, caps)
745 749
746 750 @parthandler('b2x:error:abort')
747 751 def handlereplycaps(op, inpart):
748 752 """Used to transmit abort error over the wire"""
749 753 manargs = dict(inpart.mandatoryparams)
750 754 advargs = dict(inpart.advisoryparams)
751 755 raise util.Abort(manargs['message'], hint=advargs.get('hint'))
General Comments 0
You need to be logged in to leave comments. Login now