bundle2: introduce a bundle2caps function...
Pierre-Yves David -
r21644:17755dd8 default
@@ -1,855 +1,862 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic container for transmitting a set of
10 10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: (16 bits integer)
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with a
44 44 value are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If the first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is upper case, the parameter is mandatory and the process reading
51 51 the bundle MUST abort if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
61 61
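As a standalone, non-normative sketch (Python 3; the helper names are invented for illustration, this is not Mercurial's own code), the space-separated, urlquoted parameter blob described above can be produced and parsed like this:

```python
import urllib.parse

def encodestreamparams(params):
    """Encode (name, value-or-None) pairs into the space-separated blob."""
    blocks = []
    for name, value in params:
        name = urllib.parse.quote(name, safe='')
        if value is not None:
            name = '%s=%s' % (name, urllib.parse.quote(value, safe=''))
        blocks.append(name)
    return ' '.join(blocks)

def decodestreamparams(blob):
    """Decode the blob back into a name -> value dict (None for bare names)."""
    params = {}
    for entry in blob.split(' '):
        if not entry:
            continue
        name, sep, value = entry.partition('=')
        params[urllib.parse.unquote(name)] = (
            urllib.parse.unquote(value) if sep else None)
    return params
```

Values containing spaces survive the round trip because both key and value are urlquoted before joining on spaces.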
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: (16 bits integer)
68 68
69 69 The total number of bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size pairs stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
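A minimal standalone sketch of this parameter structure (Python 3; the helper names are hypothetical — the real module builds its struct formats via `_makefpartparamsizes` below):

```python
import struct

def packpartparams(mandatory, advisory):
    """Pack parameter lists into
    <mandatory-count><advisory-count><param-sizes><param-data>.

    mandatory/advisory are lists of (key, value) byte-string pairs."""
    allparams = list(mandatory) + list(advisory)
    header = struct.pack('>BB', len(mandatory), len(advisory))
    sizes = []
    for key, value in allparams:
        sizes.extend((len(key), len(value)))
    header += struct.pack('>' + 'BB' * len(allparams), *sizes)
    data = b''.join(k + v for k, v in allparams)
    return header + data

def unpackpartparams(blob):
    """Inverse of packpartparams; returns (mandatory, advisory) lists."""
    mancount, advcount = struct.unpack_from('>BB', blob, 0)
    total = mancount + advcount
    sizes = struct.unpack_from('>' + 'BB' * total, blob, 2)
    offset = 2 + 2 * total
    params = []
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = blob[offset:offset + ksize]
        offset += ksize
        value = blob[offset:offset + vsize]
        offset += vsize
        params.append((key, value))
    return params[:mancount], params[mancount:]
```

Note the one-byte size fields: each key or value is limited to 255 bytes in this framing.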
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is a 32 bits integer and `chunkdata` is a blob of `chunksize`
123 123 plain bytes. The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
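The `<chunksize><chunkdata>` framing can be sketched as follows (Python 3; `writepayload`/`readpayload` are illustrative names, not part of the module):

```python
import io
import struct

def writepayload(out, chunks):
    """Write chunks as <chunksize><chunkdata>, ending with a zero-size chunk."""
    for chunk in chunks:
        out.write(struct.pack('>I', len(chunk)))  # 32-bit big-endian size
        out.write(chunk)
    out.write(struct.pack('>I', 0))  # end-of-payload marker

def readpayload(fp):
    """Yield <chunkdata> blobs until the zero-size chunk is reached."""
    while True:
        size = struct.unpack('>I', fp.read(4))[0]
        if size == 0:
            return
        yield fp.read(size)
```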
128 128 Bundle processing
129 129 ============================
130 130
131 131 Each part is processed in order using a "part handler". Handlers are
132 132 registered for each part type.
133 133
134 134 The matching of a part to its handler is case insensitive. The case of the
135 135 part type is used to know whether a part is mandatory or advisory. If the part
136 136 type contains any upper case character it is considered mandatory. When no
137 137 handler is known for a mandatory part, the process is aborted and an exception
138 138 is raised. If the part is advisory and no handler is known, the part is
139 139 ignored. When the process is aborted, the full bundle is still read from the
140 140 stream to keep the channel usable. But none of the parts read after an abort
141 141 are processed. In the future, dropping the stream may become an option for
142 142 channels we do not care to preserve.
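A condensed sketch of the case-based mandatory/advisory dispatch described above (Python 3; `dispatchpart` is a made-up name — the real logic lives in `processbundle` below):

```python
def dispatchpart(parttype, handlers):
    """Return the handler for a part, honoring the case convention.

    `handlers` maps lower-cased part types to callables. A part type
    containing any upper case character is mandatory: a missing handler
    is then an error. Otherwise the part is advisory and may be skipped.
    """
    key = parttype.lower()
    handler = handlers.get(key)
    if handler is None and key != parttype:  # has upper case => mandatory
        raise KeyError('missing handler for mandatory part: %s' % parttype)
    return handler  # None means "advisory part, safe to ignore"
```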
143 143 """
144 144
145 145 import util
146 146 import struct
147 147 import urllib
148 148 import string
149 149
150 150 import changegroup, error
151 151 from i18n import _
152 152
153 153 _pack = struct.pack
154 154 _unpack = struct.unpack
155 155
156 156 _magicstring = 'HG2X'
157 157
158 158 _fstreamparamsize = '>H'
159 159 _fpartheadersize = '>H'
160 160 _fparttypesize = '>B'
161 161 _fpartid = '>I'
162 162 _fpayloadsize = '>I'
163 163 _fpartparamcount = '>BB'
164 164
165 165 preferedchunksize = 4096
166 166
167 167 def _makefpartparamsizes(nbparams):
168 168 """return a struct format to read part parameter sizes
169 169
170 170 The number of parameters is variable, so we need to build that format
171 171 dynamically.
172 172 """
173 173 return '>'+('BB'*nbparams)
174 174
175 175 parthandlermapping = {}
176 176
177 177 def parthandler(parttype, params=()):
178 178 """decorator that register a function as a bundle2 part handler
179 179
180 180 eg::
181 181
182 182 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
183 183 def myparttypehandler(...):
184 184 '''process a part of type "my part".'''
185 185 ...
186 186 """
187 187 def _decorator(func):
188 188 lparttype = parttype.lower() # enforce lower case matching.
189 189 assert lparttype not in parthandlermapping
190 190 parthandlermapping[lparttype] = func
191 191 func.params = frozenset(params)
192 192 return func
193 193 return _decorator
194 194
195 195 class unbundlerecords(object):
196 196 """keep record of what happens during and unbundle
197 197
198 198 New records are added using `records.add('cat', obj)`, where 'cat' is a
199 199 category of record and obj is an arbitrary object.
200 200
201 201 `records['cat']` will return all entries of this category 'cat'.
202 202
203 203 Iterating on the object itself will yield `('category', obj)` tuples
204 204 for all entries.
205 205
206 206 All iterations happen in chronological order.
207 207 """
208 208
209 209 def __init__(self):
210 210 self._categories = {}
211 211 self._sequences = []
212 212 self._replies = {}
213 213
214 214 def add(self, category, entry, inreplyto=None):
215 215 """add a new record of a given category.
216 216
217 217 The entry can then be retrieved in the list returned by
218 218 self['category']."""
219 219 self._categories.setdefault(category, []).append(entry)
220 220 self._sequences.append((category, entry))
221 221 if inreplyto is not None:
222 222 self.getreplies(inreplyto).add(category, entry)
223 223
224 224 def getreplies(self, partid):
225 225 """get the subrecords that replies to a specific part"""
226 226 return self._replies.setdefault(partid, unbundlerecords())
227 227
228 228 def __getitem__(self, cat):
229 229 return tuple(self._categories.get(cat, ()))
230 230
231 231 def __iter__(self):
232 232 return iter(self._sequences)
233 233
234 234 def __len__(self):
235 235 return len(self._sequences)
236 236
237 237 def __nonzero__(self):
238 238 return bool(self._sequences)
239 239
240 240 class bundleoperation(object):
241 241 """an object that represents a single bundling process
242 242
243 243 Its purpose is to carry unbundle-related objects and states.
244 244
245 245 A new object should be created at the beginning of each bundle processing.
246 246 The object is to be returned by the processing function.
247 247
248 248 The object has very little content now; it will ultimately contain:
249 249 * an access to the repo the bundle is applied to,
250 250 * a ui object,
251 251 * a way to retrieve a transaction to add changes to the repo,
252 252 * a way to record the result of processing each part,
253 253 * a way to construct a bundle response when applicable.
254 254 """
255 255
256 256 def __init__(self, repo, transactiongetter):
257 257 self.repo = repo
258 258 self.ui = repo.ui
259 259 self.records = unbundlerecords()
260 260 self.gettransaction = transactiongetter
261 261 self.reply = None
262 262
263 263 class TransactionUnavailable(RuntimeError):
264 264 pass
265 265
266 266 def _notransaction():
267 267 """default method to get a transaction while processing a bundle
268 268
269 269 Raise an exception to highlight the fact that no transaction was expected
270 270 to be created"""
271 271 raise TransactionUnavailable()
272 272
273 273 def processbundle(repo, unbundler, transactiongetter=_notransaction):
274 274 """This function process a bundle, apply effect to/from a repo
275 275
276 276 It iterates over each part then searches for and uses the proper handling
277 277 code to process the part. Parts are processed in order.
278 278
279 279 This is a very early version of this function that will be strongly reworked
280 280 before final usage.
281 281
282 282 An unknown mandatory part will abort the process.
283 283 """
284 284 op = bundleoperation(repo, transactiongetter)
285 285 # todo:
286 286 # - replace this with an init function soon.
287 287 # - exception catching
288 288 unbundler.params
289 289 iterparts = unbundler.iterparts()
290 290 part = None
291 291 try:
292 292 for part in iterparts:
293 293 parttype = part.type
294 294 # part keys are matched lower case
295 295 key = parttype.lower()
296 296 try:
297 297 handler = parthandlermapping.get(key)
298 298 if handler is None:
299 299 raise error.BundleValueError(parttype=key)
300 300 op.ui.debug('found a handler for part %r\n' % parttype)
301 301 unknownparams = part.mandatorykeys - handler.params
302 302 if unknownparams:
303 303 unknownparams = list(unknownparams)
304 304 unknownparams.sort()
305 305 raise error.BundleValueError(parttype=key,
306 306 params=unknownparams)
307 307 except error.BundleValueError, exc:
308 308 if key != parttype: # mandatory parts
309 309 raise
310 310 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
311 311 # consuming the part
312 312 part.read()
313 313 continue
314 314
315 315
316 316 # handler is called outside the above try block so that we don't
317 317 # risk catching KeyErrors from anything other than the
318 318 # parthandlermapping lookup (any KeyError raised by handler()
319 319 # itself represents a defect of a different variety).
320 320 output = None
321 321 if op.reply is not None:
322 322 op.ui.pushbuffer(error=True)
323 323 output = ''
324 324 try:
325 325 handler(op, part)
326 326 finally:
327 327 if output is not None:
328 328 output = op.ui.popbuffer()
329 329 if output:
330 330 outpart = op.reply.newpart('b2x:output', data=output)
331 331 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
332 332 part.read()
333 333 except Exception, exc:
334 334 if part is not None:
335 335 # consume the bundle content
336 336 part.read()
337 337 for part in iterparts:
338 338 # consume the bundle content
339 339 part.read()
340 340 # Small hack to let caller code distinguish exceptions from bundle2
341 341 # processing from the ones from bundle1 processing. This is mostly
342 342 # needed to handle different return codes to unbundle according to the
343 343 # type of bundle. We should probably clean up or drop this return code
344 344 # craziness in a future version.
345 345 exc.duringunbundle2 = True
346 346 raise
347 347 return op
348 348
349 349 def decodecaps(blob):
350 350 """decode a bundle2 caps bytes blob into a dictionnary
351 351
352 352 The blob is a list of capabilities (one per line)
353 353 Capabilities may have values using a line of the form::
354 354
355 355 capability=value1,value2,value3
356 356
357 357 The values are always a list."""
358 358 caps = {}
359 359 for line in blob.splitlines():
360 360 if not line:
361 361 continue
362 362 if '=' not in line:
363 363 key, vals = line, ()
364 364 else:
365 365 key, vals = line.split('=', 1)
366 366 vals = vals.split(',')
367 367 key = urllib.unquote(key)
368 368 vals = [urllib.unquote(v) for v in vals]
369 369 caps[key] = vals
370 370 return caps
371 371
372 372 def encodecaps(caps):
373 373 """encode a bundle2 caps dictionary into a bytes blob"""
374 374 chunks = []
375 375 for ca in sorted(caps):
376 376 vals = caps[ca]
377 377 ca = urllib.quote(ca)
378 378 vals = [urllib.quote(v) for v in vals]
379 379 if vals:
380 380 ca = "%s=%s" % (ca, ','.join(vals))
381 381 chunks.append(ca)
382 382 return '\n'.join(chunks)
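For a quick standalone check of the round trip between these two helpers, here is a Python 3 re-sketch of them under the same names (the module itself targets Python 2's `urllib.quote`):

```python
import urllib.parse

def encodecaps(caps):
    """Encode a caps dict: one capability per line, values joined by
    commas, keys and values urlquoted (mirrors the module's encoder)."""
    chunks = []
    for ca in sorted(caps):
        vals = [urllib.parse.quote(v) for v in caps[ca]]
        ca = urllib.parse.quote(ca)
        if vals:
            ca = '%s=%s' % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    """Decode the blob back into a dict of capability -> list of values."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[urllib.parse.unquote(key)] = [urllib.parse.unquote(v)
                                           for v in vals]
    return caps
```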
383 383
384 384 class bundle20(object):
385 385 """represent an outgoing bundle2 container
386 386
387 387 Use the `addparam` method to add a stream level parameter and `newpart` to
388 388 add parts to it. Then call `getchunks` to retrieve all the binary chunks of
389 389 data that compose the bundle2 container."""
390 390
391 391 def __init__(self, ui, capabilities=()):
392 392 self.ui = ui
393 393 self._params = []
394 394 self._parts = []
395 395 self.capabilities = dict(capabilities)
396 396
397 397 # methods used to define the bundle2 content
398 398 def addparam(self, name, value=None):
399 399 """add a stream level parameter"""
400 400 if not name:
401 401 raise ValueError('empty parameter name')
402 402 if name[0] not in string.letters:
403 403 raise ValueError('non letter first character: %r' % name)
404 404 self._params.append((name, value))
405 405
406 406 def addpart(self, part):
407 407 """add a new part to the bundle2 container
408 408
409 409 Parts contain the actual application level payload."""
410 410 assert part.id is None
411 411 part.id = len(self._parts) # very cheap counter
412 412 self._parts.append(part)
413 413
414 414 def newpart(self, typeid, *args, **kwargs):
415 415 """create a new part and add it to the containers
416 416
417 417 As the part is directly added to the containers. For now, this means
418 418 that any failure to properly initialize the part after calling
419 419 ``newpart`` should result in a failure of the whole bundling process.
420 420
421 421 You can still fall back to manually creating and adding the part if you
422 422 need better control."""
423 423 part = bundlepart(typeid, *args, **kwargs)
424 424 self.addpart(part)
425 425 return part
426 426
427 427 # methods used to generate the bundle2 stream
428 428 def getchunks(self):
429 429 self.ui.debug('start emission of %s stream\n' % _magicstring)
430 430 yield _magicstring
431 431 param = self._paramchunk()
432 432 self.ui.debug('bundle parameter: %s\n' % param)
433 433 yield _pack(_fstreamparamsize, len(param))
434 434 if param:
435 435 yield param
436 436
437 437 self.ui.debug('start of parts\n')
438 438 for part in self._parts:
439 439 self.ui.debug('bundle part: "%s"\n' % part.type)
440 440 for chunk in part.getchunks():
441 441 yield chunk
442 442 self.ui.debug('end of bundle\n')
443 443 yield '\0\0'
444 444
445 445 def _paramchunk(self):
446 446 """return a encoded version of all stream parameters"""
447 447 blocks = []
448 448 for par, value in self._params:
449 449 par = urllib.quote(par)
450 450 if value is not None:
451 451 value = urllib.quote(value)
452 452 par = '%s=%s' % (par, value)
453 453 blocks.append(par)
454 454 return ' '.join(blocks)
455 455
456 456 class unpackermixin(object):
457 457 """A mixin to extract bytes and struct data from a stream"""
458 458
459 459 def __init__(self, fp):
460 460 self._fp = fp
461 461
462 462 def _unpack(self, format):
463 463 """unpack this struct format from the stream"""
464 464 data = self._readexact(struct.calcsize(format))
465 465 return _unpack(format, data)
466 466
467 467 def _readexact(self, size):
468 468 """read exactly <size> bytes from the stream"""
469 469 return changegroup.readexactly(self._fp, size)
470 470
471 471
472 472 class unbundle20(unpackermixin):
473 473 """interpret a bundle2 stream
474 474
475 475 This class is fed with a binary stream and yields parts through its
476 476 `iterparts` method."""
477 477
478 478 def __init__(self, ui, fp, header=None):
479 479 """If header is specified, we do not read it out of the stream."""
480 480 self.ui = ui
481 481 super(unbundle20, self).__init__(fp)
482 482 if header is None:
483 483 header = self._readexact(4)
484 484 magic, version = header[0:2], header[2:4]
485 485 if magic != 'HG':
486 486 raise util.Abort(_('not a Mercurial bundle'))
487 487 if version != '2X':
488 488 raise util.Abort(_('unknown bundle version %s') % version)
489 489 self.ui.debug('start processing of %s stream\n' % header)
490 490
491 491 @util.propertycache
492 492 def params(self):
493 493 """dictionary of stream level parameters"""
494 494 self.ui.debug('reading bundle2 stream parameters\n')
495 495 params = {}
496 496 paramssize = self._unpack(_fstreamparamsize)[0]
497 497 if paramssize:
498 498 for p in self._readexact(paramssize).split(' '):
499 499 p = p.split('=', 1)
500 500 p = [urllib.unquote(i) for i in p]
501 501 if len(p) < 2:
502 502 p.append(None)
503 503 self._processparam(*p)
504 504 params[p[0]] = p[1]
505 505 return params
506 506
507 507 def _processparam(self, name, value):
508 508 """process a parameter, applying its effect if needed
509 509
510 510 Parameters starting with a lower case letter are advisory and will be
511 511 ignored when unknown. Those starting with an upper case letter are
512 512 mandatory; this function will raise for unknown mandatory parameters.
513 513
514 514 Note: no options are currently supported. Any input will be either
515 515 ignored or rejected.
516 516 """
517 517 if not name:
518 518 raise ValueError('empty parameter name')
519 519 if name[0] not in string.letters:
520 520 raise ValueError('non letter first character: %r' % name)
521 521 # Some logic will be later added here to try to process the option for
522 522 # a dict of known parameter.
523 523 if name[0].islower():
524 524 self.ui.debug("ignoring unknown parameter %r\n" % name)
525 525 else:
526 526 raise error.BundleValueError(params=(name,))
527 527
528 528
529 529 def iterparts(self):
530 530 """yield all parts contained in the stream"""
531 531 # make sure params have been loaded
532 532 self.params
533 533 self.ui.debug('start extraction of bundle2 parts\n')
534 534 headerblock = self._readpartheader()
535 535 while headerblock is not None:
536 536 part = unbundlepart(self.ui, headerblock, self._fp)
537 537 yield part
538 538 headerblock = self._readpartheader()
539 539 self.ui.debug('end of bundle2 stream\n')
540 540
541 541 def _readpartheader(self):
542 542 """reads a part header size and return the bytes blob
543 543
544 544 returns None if empty"""
545 545 headersize = self._unpack(_fpartheadersize)[0]
546 546 self.ui.debug('part header size: %i\n' % headersize)
547 547 if headersize:
548 548 return self._readexact(headersize)
549 549 return None
550 550
551 551
552 552 class bundlepart(object):
553 553 """A bundle2 part contains application level payload
554 554
555 555 The part `type` is used to route the part to the application level
556 556 handler.
557 557
558 558 The part payload is contained in ``part.data``. It could be raw bytes or a
559 559 generator of byte chunks.
560 560
561 561 You can add parameters to the part using the ``addparam`` method.
562 562 Parameters can be either mandatory (default) or advisory. The remote
563 563 side should be able to safely ignore the advisory ones.
564 564
565 565 Neither data nor parameters can be modified after generation has begun.
566 566 """
567 567
568 568 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
569 569 data=''):
570 570 self.id = None
571 571 self.type = parttype
572 572 self._data = data
573 573 self._mandatoryparams = list(mandatoryparams)
574 574 self._advisoryparams = list(advisoryparams)
575 575 # checking for duplicated entries
576 576 self._seenparams = set()
577 577 for pname, __ in self._mandatoryparams + self._advisoryparams:
578 578 if pname in self._seenparams:
579 579 raise RuntimeError('duplicated params: %s' % pname)
580 580 self._seenparams.add(pname)
581 581 # status of the part's generation:
582 582 # - None: not started,
583 583 # - False: currently generated,
584 584 # - True: generation done.
585 585 self._generated = None
586 586
587 587 # methods used to define the part content
588 588 def __setdata(self, data):
589 589 if self._generated is not None:
590 590 raise error.ReadOnlyPartError('part is being generated')
591 591 self._data = data
592 592 def __getdata(self):
593 593 return self._data
594 594 data = property(__getdata, __setdata)
595 595
596 596 @property
597 597 def mandatoryparams(self):
598 598 # make it an immutable tuple to force people through ``addparam``
599 599 return tuple(self._mandatoryparams)
600 600
601 601 @property
602 602 def advisoryparams(self):
603 603 # make it an immutable tuple to force people through ``addparam``
604 604 return tuple(self._advisoryparams)
605 605
606 606 def addparam(self, name, value='', mandatory=True):
607 607 if self._generated is not None:
608 608 raise error.ReadOnlyPartError('part is being generated')
609 609 if name in self._seenparams:
610 610 raise ValueError('duplicated params: %s' % name)
611 611 self._seenparams.add(name)
612 612 params = self._advisoryparams
613 613 if mandatory:
614 614 params = self._mandatoryparams
615 615 params.append((name, value))
616 616
617 617 # methods used to generate the bundle2 stream
618 618 def getchunks(self):
619 619 if self._generated is not None:
620 620 raise RuntimeError('part can only be consumed once')
621 621 self._generated = False
622 622 #### header
623 623 ## parttype
624 624 header = [_pack(_fparttypesize, len(self.type)),
625 625 self.type, _pack(_fpartid, self.id),
626 626 ]
627 627 ## parameters
628 628 # count
629 629 manpar = self.mandatoryparams
630 630 advpar = self.advisoryparams
631 631 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
632 632 # size
633 633 parsizes = []
634 634 for key, value in manpar:
635 635 parsizes.append(len(key))
636 636 parsizes.append(len(value))
637 637 for key, value in advpar:
638 638 parsizes.append(len(key))
639 639 parsizes.append(len(value))
640 640 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
641 641 header.append(paramsizes)
642 642 # key, value
643 643 for key, value in manpar:
644 644 header.append(key)
645 645 header.append(value)
646 646 for key, value in advpar:
647 647 header.append(key)
648 648 header.append(value)
649 649 ## finalize header
650 650 headerchunk = ''.join(header)
651 651 yield _pack(_fpartheadersize, len(headerchunk))
652 652 yield headerchunk
653 653 ## payload
654 654 for chunk in self._payloadchunks():
655 655 yield _pack(_fpayloadsize, len(chunk))
656 656 yield chunk
657 657 # end of payload
658 658 yield _pack(_fpayloadsize, 0)
659 659 self._generated = True
660 660
661 661 def _payloadchunks(self):
662 662 """yield chunks of a the part payload
663 663
664 664 Exists to handle the different methods to provide data to a part."""
665 665 # we only support fixed size data now.
666 666 # This will be improved in the future.
667 667 if util.safehasattr(self.data, 'next'):
668 668 buff = util.chunkbuffer(self.data)
669 669 chunk = buff.read(preferedchunksize)
670 670 while chunk:
671 671 yield chunk
672 672 chunk = buff.read(preferedchunksize)
673 673 elif len(self.data):
674 674 yield self.data
675 675
676 676 class unbundlepart(unpackermixin):
677 677 """a bundle part read from a bundle"""
678 678
679 679 def __init__(self, ui, header, fp):
680 680 super(unbundlepart, self).__init__(fp)
681 681 self.ui = ui
682 682 # unbundle state attr
683 683 self._headerdata = header
684 684 self._headeroffset = 0
685 685 self._initialized = False
686 686 self.consumed = False
687 687 # part data
688 688 self.id = None
689 689 self.type = None
690 690 self.mandatoryparams = None
691 691 self.advisoryparams = None
692 692 self.params = None
693 693 self.mandatorykeys = ()
694 694 self._payloadstream = None
695 695 self._readheader()
696 696
697 697 def _fromheader(self, size):
698 698 """return the next <size> byte from the header"""
699 699 offset = self._headeroffset
700 700 data = self._headerdata[offset:(offset + size)]
701 701 self._headeroffset = offset + size
702 702 return data
703 703
704 704 def _unpackheader(self, format):
705 705 """read given format from header
706 706
707 707 This automatically computes the size of the data to read.
708 708 data = self._fromheader(struct.calcsize(format))
709 709 return _unpack(format, data)
710 710
711 711 def _initparams(self, mandatoryparams, advisoryparams):
712 712 """internal function to setup all logic related parameters"""
713 713 # make it read only to prevent people touching it by mistake.
714 714 self.mandatoryparams = tuple(mandatoryparams)
715 715 self.advisoryparams = tuple(advisoryparams)
716 716 # user friendly UI
717 717 self.params = dict(self.mandatoryparams)
718 718 self.params.update(dict(self.advisoryparams))
719 719 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
720 720
721 721 def _readheader(self):
722 722 """read the header and setup the object"""
723 723 typesize = self._unpackheader(_fparttypesize)[0]
724 724 self.type = self._fromheader(typesize)
725 725 self.ui.debug('part type: "%s"\n' % self.type)
726 726 self.id = self._unpackheader(_fpartid)[0]
727 727 self.ui.debug('part id: "%s"\n' % self.id)
728 728 ## reading parameters
729 729 # param count
730 730 mancount, advcount = self._unpackheader(_fpartparamcount)
731 731 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
732 732 # param size
733 733 fparamsizes = _makefpartparamsizes(mancount + advcount)
734 734 paramsizes = self._unpackheader(fparamsizes)
735 735 # make it a list of pairs again
736 736 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
737 737 # split mandatory from advisory
738 738 mansizes = paramsizes[:mancount]
739 739 advsizes = paramsizes[mancount:]
740 740 # retrieve param values
741 741 manparams = []
742 742 for key, value in mansizes:
743 743 manparams.append((self._fromheader(key), self._fromheader(value)))
744 744 advparams = []
745 745 for key, value in advsizes:
746 746 advparams.append((self._fromheader(key), self._fromheader(value)))
747 747 self._initparams(manparams, advparams)
748 748 ## part payload
749 749 def payloadchunks():
750 750 payloadsize = self._unpack(_fpayloadsize)[0]
751 751 self.ui.debug('payload chunk size: %i\n' % payloadsize)
752 752 while payloadsize:
753 753 yield self._readexact(payloadsize)
754 754 payloadsize = self._unpack(_fpayloadsize)[0]
755 755 self.ui.debug('payload chunk size: %i\n' % payloadsize)
756 756 self._payloadstream = util.chunkbuffer(payloadchunks())
757 757 # the header is fully read; mark the part as initialized
758 758 self._initialized = True
759 759
760 760 def read(self, size=None):
761 761 """read payload data"""
762 762 if not self._initialized:
763 763 self._readheader()
764 764 if size is None:
765 765 data = self._payloadstream.read()
766 766 else:
767 767 data = self._payloadstream.read(size)
768 768 if size is None or len(data) < size:
769 769 self.consumed = True
770 770 return data
771 771
772 def bundle2caps(remote):
773 """return the bundlecapabilities of a peer as dict"""
774 raw = remote.capable('bundle2-exp')
775 if not raw and raw != '':
776 return {}
777 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
778 return decodecaps(capsblob)
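A standalone Python 3 sketch of how the new `bundle2caps` helper behaves against a peer; `fakeremote` is a hypothetical stub, and the caps decoding is inlined so the snippet runs on its own:

```python
import urllib.parse

def bundle2caps(remote):
    """Python 3 sketch of the new helper: return the peer's bundle2
    capabilities as a dict, or {} when the capability is absent."""
    raw = remote.capable('bundle2-exp')
    if not raw and raw != '':
        return {}
    blob = urllib.parse.unquote(raw)
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        key, sep, vals = line.partition('=')
        caps[key] = vals.split(',') if sep else []
    return caps

class fakeremote:
    """Hypothetical peer stub for illustration only."""
    def __init__(self, caps):
        self._caps = caps
    def capable(self, name):
        # real peers return False, '' or the capability value
        return self._caps.get(name, False)
```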
772 779
773 780 @parthandler('b2x:changegroup')
774 781 def handlechangegroup(op, inpart):
775 782 """apply a changegroup part on the repo
776 783
777 784 This is a very early implementation that will see massive rework before
778 785 being inflicted on any end-user.
779 786 """
780 787 # Make sure we trigger a transaction creation
781 788 #
782 789 # The addchangegroup function will get a transaction object by itself, but
783 790 # we need to make sure we trigger the creation of a transaction object used
784 791 # for the whole processing scope.
785 792 op.gettransaction()
786 793 cg = changegroup.unbundle10(inpart, 'UN')
787 794 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
788 795 op.records.add('changegroup', {'return': ret})
789 796 if op.reply is not None:
790 797 # This is definitely not the final form of this
791 798 # return. But one needs to start somewhere.
792 799 part = op.reply.newpart('b2x:reply:changegroup')
793 800 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
794 801 part.addparam('return', '%i' % ret, mandatory=False)
795 802 assert not inpart.read()
796 803
797 804 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
798 805 def handlechangegroup(op, inpart):
799 806 ret = int(inpart.params['return'])
800 807 replyto = int(inpart.params['in-reply-to'])
801 808 op.records.add('changegroup', {'return': ret}, replyto)
802 809
803 810 @parthandler('b2x:check:heads')
804 811 def handlechangegroup(op, inpart):
805 812 """check that head of the repo did not change
806 813
807 814 This is used to detect a push race when using unbundle.
808 815 This replaces the "heads" argument of unbundle."""
809 816 h = inpart.read(20)
810 817 heads = []
811 818 while len(h) == 20:
812 819 heads.append(h)
813 820 h = inpart.read(20)
814 821 assert not h
815 822 if heads != op.repo.heads():
816 823 raise error.PushRaced('repository changed while pushing - '
817 824 'please try again')
818 825
819 826 @parthandler('b2x:output')
820 827 def handleoutput(op, inpart):
821 828 """forward output captured on the server to the client"""
822 829 for line in inpart.read().splitlines():
823 830 op.ui.write(('remote: %s\n' % line))
824 831
825 832 @parthandler('b2x:replycaps')
826 833 def handlereplycaps(op, inpart):
827 834 """Notify that a reply bundle should be created
828 835
829 836 The payload contains the capabilities information for the reply"""
830 837 caps = decodecaps(inpart.read())
831 838 if op.reply is None:
832 839 op.reply = bundle20(op.ui, caps)
833 840
834 841 @parthandler('b2x:error:abort', ('message', 'hint'))
835 842 def handlereplycaps(op, inpart):
836 843 """Used to transmit abort error over the wire"""
837 844 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
838 845
839 846 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
840 847 def handlereplycaps(op, inpart):
841 848 """Used to transmit unknown content error over the wire"""
842 849 kwargs = {}
843 850 parttype = inpart.params.get('parttype')
844 851 if parttype is not None:
845 852 kwargs['parttype'] = parttype
846 853 params = inpart.params.get('params')
847 854 if params is not None:
848 855 kwargs['params'] = params.split('\0')
849 856
850 857 raise error.BundleValueError(**kwargs)
851 858
852 859 @parthandler('b2x:error:pushraced', ('message',))
853 860 def handlereplycaps(op, inpart):
854 861 """Used to transmit push race error over the wire"""
855 862 raise error.ResponseError(_('push failed:'), inpart.params['message'])
@@ -1,730 +1,728 b''
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib
11 11 import util, scmutil, changegroup, base85, error
12 12 import discovery, phases, obsolete, bookmarks, bundle2
13 13
14 14 def readbundle(ui, fh, fname, vfs=None):
15 15 header = changegroup.readexactly(fh, 4)
16 16
17 17 alg = None
18 18 if not fname:
19 19 fname = "stream"
20 20 if not header.startswith('HG') and header.startswith('\0'):
21 21 fh = changegroup.headerlessfixup(fh, header)
22 22 header = "HG10"
23 23 alg = 'UN'
24 24 elif vfs:
25 25 fname = vfs.join(fname)
26 26
27 27 magic, version = header[0:2], header[2:4]
28 28
29 29 if magic != 'HG':
30 30 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
31 31 if version == '10':
32 32 if alg is None:
33 33 alg = changegroup.readexactly(fh, 2)
34 34 return changegroup.unbundle10(fh, alg)
35 35 elif version == '2X':
36 36 return bundle2.unbundle20(ui, fh, header=magic + version)
37 37 else:
38 38 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
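The dispatch in readbundle can be summarized: the first four bytes are a magic plus version string, and only legacy HG10 bundles carry a two-byte compression tag right after them. A standalone sketch of just that header logic (Python 3; `parseheader` is a hypothetical helper, not part of Mercurial):

```python
import io

def parseheader(fh):
    header = fh.read(4)
    magic, version = header[:2], header[2:4]
    if magic != b'HG':
        raise ValueError('not a Mercurial bundle')
    if version == b'10':
        # legacy bundle: a 2-byte compression tag ('UN', 'GZ', 'BZ') follows
        return version, fh.read(2)
    elif version == b'2X':
        # bundle2: stream-level parameters follow instead of a compression tag
        return version, None
    raise ValueError('unknown bundle version %r' % version)

assert parseheader(io.BytesIO(b'HG10UN')) == (b'10', b'UN')
assert parseheader(io.BytesIO(b'HG2X')) == (b'2X', None)
```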
39 39
40 40
41 41 class pushoperation(object):
42 42 """A object that represent a single push operation
43 43
44 44 It purpose is to carry push related state and very common operation.
45 45
46 46 A new should be created at the beginning of each push and discarded
47 47 afterward.
48 48 """
49 49
50 50 def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
51 51 # repo we push from
52 52 self.repo = repo
53 53 self.ui = repo.ui
54 54 # repo we push to
55 55 self.remote = remote
56 56 # force option provided
57 57 self.force = force
58 58 # revs to be pushed (None is "all")
59 59 self.revs = revs
60 60 # allow push of new branch
61 61 self.newbranch = newbranch
62 62 # did a local lock get acquired?
63 63 self.locallocked = None
64 64 # Integer version of the push result
65 65 # - None means nothing to push
66 66 # - 0 means HTTP error
67 67 # - 1 means we pushed and remote head count is unchanged *or*
68 68 # we have outgoing changesets but refused to push
69 69 # - other values as described by addchangegroup()
70 70 self.ret = None
71 71 # discover.outgoing object (contains common and outgoing data)
72 72 self.outgoing = None
73 73 # all remote heads before the push
74 74 self.remoteheads = None
75 75 # testable as a boolean indicating if any nodes are missing locally.
76 76 self.incoming = None
77 77 # set of all heads common after changeset bundle push
78 78 self.commonheads = None
79 79
80 80 def push(repo, remote, force=False, revs=None, newbranch=False):
81 81 '''Push outgoing changesets (limited by revs) from a local
82 82 repository to remote. Return an integer:
83 83 - None means nothing to push
84 84 - 0 means HTTP error
85 85 - 1 means we pushed and remote head count is unchanged *or*
86 86 we have outgoing changesets but refused to push
87 87 - other values as described by addchangegroup()
88 88 '''
89 89 pushop = pushoperation(repo, remote, force, revs, newbranch)
90 90 if pushop.remote.local():
91 91 missing = (set(pushop.repo.requirements)
92 92 - pushop.remote.local().supported)
93 93 if missing:
94 94 msg = _("required features are not"
95 95 " supported in the destination:"
96 96 " %s") % (', '.join(sorted(missing)))
97 97 raise util.Abort(msg)
98 98
99 99 # there are two ways to push to remote repo:
100 100 #
101 101 # addchangegroup assumes local user can lock remote
102 102 # repo (local filesystem, old ssh servers).
103 103 #
104 104 # unbundle assumes local user cannot lock remote repo (new ssh
105 105 # servers, http servers).
106 106
107 107 if not pushop.remote.canpush():
108 108 raise util.Abort(_("destination does not support push"))
109 109 # get local lock as we might write phase data
110 110 locallock = None
111 111 try:
112 112 locallock = pushop.repo.lock()
113 113 pushop.locallocked = True
114 114 except IOError, err:
115 115 pushop.locallocked = False
116 116 if err.errno != errno.EACCES:
117 117 raise
118 118 # source repo cannot be locked.
119 119 # We do not abort the push, but just disable the local phase
120 120 # synchronisation.
121 121 msg = 'cannot lock source repository: %s\n' % err
122 122 pushop.ui.debug(msg)
123 123 try:
124 124 pushop.repo.checkpush(pushop)
125 125 lock = None
126 126 unbundle = pushop.remote.capable('unbundle')
127 127 if not unbundle:
128 128 lock = pushop.remote.lock()
129 129 try:
130 130 _pushdiscovery(pushop)
131 131 if _pushcheckoutgoing(pushop):
132 132 pushop.repo.prepushoutgoinghooks(pushop.repo,
133 133 pushop.remote,
134 134 pushop.outgoing)
135 135 if (pushop.repo.ui.configbool('experimental', 'bundle2-exp',
136 136 False)
137 137 and pushop.remote.capable('bundle2-exp')):
138 138 _pushbundle2(pushop)
139 139 else:
140 140 _pushchangeset(pushop)
141 141 _pushcomputecommonheads(pushop)
142 142 _pushsyncphase(pushop)
143 143 _pushobsolete(pushop)
144 144 finally:
145 145 if lock is not None:
146 146 lock.release()
147 147 finally:
148 148 if locallock is not None:
149 149 locallock.release()
150 150
151 151 _pushbookmark(pushop)
152 152 return pushop.ret
153 153
154 154 def _pushdiscovery(pushop):
155 155 # discovery
156 156 unfi = pushop.repo.unfiltered()
157 157 fci = discovery.findcommonincoming
158 158 commoninc = fci(unfi, pushop.remote, force=pushop.force)
159 159 common, inc, remoteheads = commoninc
160 160 fco = discovery.findcommonoutgoing
161 161 outgoing = fco(unfi, pushop.remote, onlyheads=pushop.revs,
162 162 commoninc=commoninc, force=pushop.force)
163 163 pushop.outgoing = outgoing
164 164 pushop.remoteheads = remoteheads
165 165 pushop.incoming = inc
166 166
167 167 def _pushcheckoutgoing(pushop):
168 168 outgoing = pushop.outgoing
169 169 unfi = pushop.repo.unfiltered()
170 170 if not outgoing.missing:
171 171 # nothing to push
172 172 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
173 173 return False
174 174 # something to push
175 175 if not pushop.force:
176 176 # if repo.obsstore == False --> no obsolete
177 177 # then, save the iteration
178 178 if unfi.obsstore:
179 179 # these messages are here for 80-char-limit reasons
180 180 mso = _("push includes obsolete changeset: %s!")
181 181 mst = "push includes %s changeset: %s!"
182 182 # plain versions for i18n tool to detect them
183 183 _("push includes unstable changeset: %s!")
184 184 _("push includes bumped changeset: %s!")
185 185 _("push includes divergent changeset: %s!")
186 186 # If there is at least one obsolete or unstable
187 187 # changeset in missing, at least one of the missing
188 188 # heads will be obsolete or unstable. So checking
189 189 # heads only is ok
190 190 for node in outgoing.missingheads:
191 191 ctx = unfi[node]
192 192 if ctx.obsolete():
193 193 raise util.Abort(mso % ctx)
194 194 elif ctx.troubled():
195 195 raise util.Abort(_(mst)
196 196 % (ctx.troubles()[0],
197 197 ctx))
198 198 newbm = pushop.ui.configlist('bookmarks', 'pushing')
199 199 discovery.checkheads(unfi, pushop.remote, outgoing,
200 200 pushop.remoteheads,
201 201 pushop.newbranch,
202 202 bool(pushop.incoming),
203 203 newbm)
204 204 return True
205 205
206 206 def _pushbundle2(pushop):
207 207 """push data to the remote using bundle2
208 208
209 209 The only currently supported type of data is changegroup but this will
210 210 evolve in the future."""
211 capsblob = urllib.unquote(pushop.remote.capable('bundle2-exp'))
212 caps = bundle2.decodecaps(capsblob)
213 bundler = bundle2.bundle20(pushop.ui, caps)
211 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
214 212 # create reply capability
215 213 capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
216 214 bundler.newpart('b2x:replycaps', data=capsblob)
217 215 # Send known heads to the server for race detection.
218 216 if not pushop.force:
219 217 bundler.newpart('B2X:CHECK:HEADS', data=iter(pushop.remoteheads))
220 218 extrainfo = _pushbundle2extraparts(pushop, bundler)
221 219 # add the changegroup bundle
222 220 cg = changegroup.getlocalbundle(pushop.repo, 'push', pushop.outgoing)
223 221 cgpart = bundler.newpart('B2X:CHANGEGROUP', data=cg.getchunks())
224 222 stream = util.chunkbuffer(bundler.getchunks())
225 223 try:
226 224 reply = pushop.remote.unbundle(stream, ['force'], 'push')
227 225 except error.BundleValueError, exc:
228 226 raise util.Abort('missing support for %s' % exc)
229 227 try:
230 228 op = bundle2.processbundle(pushop.repo, reply)
231 229 except error.BundleValueError, exc:
232 230 raise util.Abort('missing support for %s' % exc)
233 231 cgreplies = op.records.getreplies(cgpart.id)
234 232 assert len(cgreplies['changegroup']) == 1
235 233 pushop.ret = cgreplies['changegroup'][0]['return']
236 234 _pushbundle2extrareply(pushop, op, extrainfo)
237 235
238 236 def _pushbundle2extraparts(pushop, bundler):
239 237 """hook function to let extensions add parts
240 238
241 239 Return a dict to let extensions pass data to the reply processing.
242 240 """
243 241 return {}
244 242
245 243 def _pushbundle2extrareply(pushop, op, extrainfo):
246 244 """hook function to let extensions react to part replies
247 245
248 246 The dict from _pushbundle2extrareply is fed to this function.
249 247 """
250 248 pass
251 249
252 250 def _pushchangeset(pushop):
253 251 """Make the actual push of changeset bundle to remote repo"""
254 252 outgoing = pushop.outgoing
255 253 unbundle = pushop.remote.capable('unbundle')
256 254 # TODO: get bundlecaps from remote
257 255 bundlecaps = None
258 256 # create a changegroup from local
259 257 if pushop.revs is None and not (outgoing.excluded
260 258 or pushop.repo.changelog.filteredrevs):
261 259 # push everything,
262 260 # use the fast path, no race possible on push
263 261 bundler = changegroup.bundle10(pushop.repo, bundlecaps)
264 262 cg = changegroup.getsubset(pushop.repo,
265 263 outgoing,
266 264 bundler,
267 265 'push',
268 266 fastpath=True)
269 267 else:
270 268 cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
271 269 bundlecaps)
272 270
273 271 # apply changegroup to remote
274 272 if unbundle:
275 273 # local repo finds heads on server, finds out what
276 274 # revs it must push. once revs transferred, if server
277 275 # finds it has different heads (someone else won
278 276 # commit/push race), server aborts.
279 277 if pushop.force:
280 278 remoteheads = ['force']
281 279 else:
282 280 remoteheads = pushop.remoteheads
283 281 # ssh: return remote's addchangegroup()
284 282 # http: return remote's addchangegroup() or 0 for error
285 283 pushop.ret = pushop.remote.unbundle(cg, remoteheads,
286 284 'push')
287 285 else:
288 286 # we return an integer indicating remote head count
289 287 # change
290 288 pushop.ret = pushop.remote.addchangegroup(cg, 'push', pushop.repo.url())
291 289
292 290 def _pushcomputecommonheads(pushop):
293 291 unfi = pushop.repo.unfiltered()
294 292 if pushop.ret:
295 293 # push succeeded, synchronize target of the push
296 294 cheads = pushop.outgoing.missingheads
297 295 elif pushop.revs is None:
298 296 # all-out push failed, synchronize all common
299 297 cheads = pushop.outgoing.commonheads
300 298 else:
301 299 # I want cheads = heads(::missingheads and ::commonheads)
302 300 # (missingheads is revs with secret changeset filtered out)
303 301 #
304 302 # This can be expressed as:
305 303 # cheads = ( (missingheads and ::commonheads)
306 304 # + (commonheads and ::missingheads))"
307 305 # )
308 306 #
309 307 # while trying to push we already computed the following:
310 308 # common = (::commonheads)
311 309 # missing = ((commonheads::missingheads) - commonheads)
312 310 #
313 311 # We can pick:
314 312 # * missingheads part of common (::commonheads)
315 313 common = set(pushop.outgoing.common)
316 314 nm = pushop.repo.changelog.nodemap
317 315 cheads = [node for node in pushop.revs if nm[node] in common]
318 316 # and
319 317 # * commonheads parents on missing
320 318 revset = unfi.set('%ln and parents(roots(%ln))',
321 319 pushop.outgoing.commonheads,
322 320 pushop.outgoing.missing)
323 321 cheads.extend(c.node() for c in revset)
324 322 pushop.commonheads = cheads
325 323
326 324 def _pushsyncphase(pushop):
327 325 """synchronise phase information locally and remotely"""
328 326 unfi = pushop.repo.unfiltered()
329 327 cheads = pushop.commonheads
330 328 # even when we don't push, exchanging phase data is useful
331 329 remotephases = pushop.remote.listkeys('phases')
332 330 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
333 331 and remotephases # server supports phases
334 332 and pushop.ret is None # nothing was pushed
335 333 and remotephases.get('publishing', False)):
336 334 # When:
337 335 # - this is a subrepo push
338 336 # - and remote supports phases
339 337 # - and no changeset was pushed
340 338 # - and remote is publishing
341 339 # We may be in issue 3871 case!
342 340 # We drop the possible phase synchronisation done by
343 341 # courtesy to publish changesets possibly locally draft
344 342 # on the remote.
345 343 remotephases = {'publishing': 'True'}
346 344 if not remotephases: # old server or public only reply from non-publishing
347 345 _localphasemove(pushop, cheads)
348 346 # don't push any phase data as there is nothing to push
349 347 else:
350 348 ana = phases.analyzeremotephases(pushop.repo, cheads,
351 349 remotephases)
352 350 pheads, droots = ana
353 351 ### Apply remote phase on local
354 352 if remotephases.get('publishing', False):
355 353 _localphasemove(pushop, cheads)
356 354 else: # publish = False
357 355 _localphasemove(pushop, pheads)
358 356 _localphasemove(pushop, cheads, phases.draft)
359 357 ### Apply local phase on remote
360 358
361 359 # Get the list of all revs draft on remote by public here.
362 360 # XXX Beware that revset break if droots is not strictly
363 361 # XXX root we may want to ensure it is but it is costly
364 362 outdated = unfi.set('heads((%ln::%ln) and public())',
365 363 droots, cheads)
366 364 for newremotehead in outdated:
367 365 r = pushop.remote.pushkey('phases',
368 366 newremotehead.hex(),
369 367 str(phases.draft),
370 368 str(phases.public))
371 369 if not r:
372 370 pushop.ui.warn(_('updating %s to public failed!\n')
373 371 % newremotehead)
374 372
375 373 def _localphasemove(pushop, nodes, phase=phases.public):
376 374 """move <nodes> to <phase> in the local source repo"""
377 375 if pushop.locallocked:
378 376 phases.advanceboundary(pushop.repo, phase, nodes)
379 377 else:
380 378 # repo is not locked, do not change any phases!
381 379 # Informs the user that phases should have been moved when
382 380 # applicable.
383 381 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
384 382 phasestr = phases.phasenames[phase]
385 383 if actualmoves:
386 384 pushop.ui.status(_('cannot lock source repo, skipping '
387 385 'local %s phase update\n') % phasestr)
388 386
389 387 def _pushobsolete(pushop):
390 388 """utility function to push obsolete markers to a remote"""
391 389 pushop.ui.debug('try to push obsolete markers to remote\n')
392 390 repo = pushop.repo
393 391 remote = pushop.remote
394 392 if (obsolete._enabled and repo.obsstore and
395 393 'obsolete' in remote.listkeys('namespaces')):
396 394 rslts = []
397 395 remotedata = repo.listkeys('obsolete')
398 396 for key in sorted(remotedata, reverse=True):
399 397 # reverse sort to ensure we end with dump0
400 398 data = remotedata[key]
401 399 rslts.append(remote.pushkey('obsolete', key, '', data))
402 400 if [r for r in rslts if not r]:
403 401 msg = _('failed to push some obsolete markers!\n')
404 402 repo.ui.warn(msg)
405 403
406 404 def _pushbookmark(pushop):
407 405 """Update bookmark position on remote"""
408 406 ui = pushop.ui
409 407 repo = pushop.repo.unfiltered()
410 408 remote = pushop.remote
411 409 ui.debug("checking for updated bookmarks\n")
412 410 revnums = map(repo.changelog.rev, pushop.revs or [])
413 411 ancestors = [a for a in repo.changelog.ancestors(revnums, inclusive=True)]
414 412 (addsrc, adddst, advsrc, advdst, diverge, differ, invalid
415 413 ) = bookmarks.compare(repo, repo._bookmarks, remote.listkeys('bookmarks'),
416 414 srchex=hex)
417 415
418 416 for b, scid, dcid in advsrc:
419 417 if ancestors and repo[scid].rev() not in ancestors:
420 418 continue
421 419 if remote.pushkey('bookmarks', b, dcid, scid):
422 420 ui.status(_("updating bookmark %s\n") % b)
423 421 else:
424 422 ui.warn(_('updating bookmark %s failed!\n') % b)
425 423
426 424 class pulloperation(object):
427 425 """A object that represent a single pull operation
428 426
429 427 It purpose is to carry push related state and very common operation.
430 428
431 429 A new should be created at the beginning of each pull and discarded
432 430 afterward.
433 431 """
434 432
435 433 def __init__(self, repo, remote, heads=None, force=False):
436 434 # repo we pull into
437 435 self.repo = repo
438 436 # repo we pull from
439 437 self.remote = remote
440 438 # revision we try to pull (None is "all")
441 439 self.heads = heads
442 440 # do we force pull?
443 441 self.force = force
444 442 # the name the pull transaction
445 443 self._trname = 'pull\n' + util.hidepassword(remote.url())
446 444 # hold the transaction once created
447 445 self._tr = None
448 446 # set of common changeset between local and remote before pull
449 447 self.common = None
450 448 # set of pulled head
451 449 self.rheads = None
452 450 # list of missing changeset to fetch remotely
453 451 self.fetch = None
454 452 # result of changegroup pulling (used as return code by pull)
455 453 self.cgresult = None
456 454 # list of step remaining todo (related to future bundle2 usage)
457 455 self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])
458 456
459 457 @util.propertycache
460 458 def pulledsubset(self):
461 459 """heads of the set of changeset target by the pull"""
462 460 # compute target subset
463 461 if self.heads is None:
464 462 # We pulled everything possible
465 463 # sync on everything common
466 464 c = set(self.common)
467 465 ret = list(self.common)
468 466 for n in self.rheads:
469 467 if n not in c:
470 468 ret.append(n)
471 469 return ret
472 470 else:
473 471 # We pulled a specific subset
474 472 # sync on this subset
475 473 return self.heads
476 474
477 475 def gettransaction(self):
478 476 """get appropriate pull transaction, creating it if needed"""
479 477 if self._tr is None:
480 478 self._tr = self.repo.transaction(self._trname)
481 479 return self._tr
482 480
483 481 def closetransaction(self):
484 482 """close transaction if created"""
485 483 if self._tr is not None:
486 484 self._tr.close()
487 485
488 486 def releasetransaction(self):
489 487 """release transaction if created"""
490 488 if self._tr is not None:
491 489 self._tr.release()
492 490
493 491 def pull(repo, remote, heads=None, force=False):
494 492 pullop = pulloperation(repo, remote, heads, force)
495 493 if pullop.remote.local():
496 494 missing = set(pullop.remote.requirements) - pullop.repo.supported
497 495 if missing:
498 496 msg = _("required features are not"
499 497 " supported in the destination:"
500 498 " %s") % (', '.join(sorted(missing)))
501 499 raise util.Abort(msg)
502 500
503 501 lock = pullop.repo.lock()
504 502 try:
505 503 _pulldiscovery(pullop)
506 504 if (pullop.repo.ui.configbool('experimental', 'bundle2-exp', False)
507 505 and pullop.remote.capable('bundle2-exp')):
508 506 _pullbundle2(pullop)
509 507 if 'changegroup' in pullop.todosteps:
510 508 _pullchangeset(pullop)
511 509 if 'phases' in pullop.todosteps:
512 510 _pullphase(pullop)
513 511 if 'obsmarkers' in pullop.todosteps:
514 512 _pullobsolete(pullop)
515 513 pullop.closetransaction()
516 514 finally:
517 515 pullop.releasetransaction()
518 516 lock.release()
519 517
520 518 return pullop.cgresult
521 519
522 520 def _pulldiscovery(pullop):
523 521 """discovery phase for the pull
524 522
525 523 Currently handles changeset discovery only; will change to handle all
526 524 discovery at some point."""
527 525 tmp = discovery.findcommonincoming(pullop.repo.unfiltered(),
528 526 pullop.remote,
529 527 heads=pullop.heads,
530 528 force=pullop.force)
531 529 pullop.common, pullop.fetch, pullop.rheads = tmp
532 530
533 531 def _pullbundle2(pullop):
534 532 """pull data using bundle2
535 533
536 534 For now, the only supported data are changegroup."""
537 535 kwargs = {'bundlecaps': set(['HG2X'])}
538 536 capsblob = bundle2.encodecaps(pullop.repo.bundle2caps)
539 537 kwargs['bundlecaps'].add('bundle2=' + urllib.quote(capsblob))
540 538 # pulling changegroup
541 539 pullop.todosteps.remove('changegroup')
542 540
543 541 kwargs['common'] = pullop.common
544 542 kwargs['heads'] = pullop.heads or pullop.rheads
545 543 if not pullop.fetch:
546 544 pullop.repo.ui.status(_("no changes found\n"))
547 545 pullop.cgresult = 0
548 546 else:
549 547 if pullop.heads is None and list(pullop.common) == [nullid]:
550 548 pullop.repo.ui.status(_("requesting all changes\n"))
551 549 _pullbundle2extraprepare(pullop, kwargs)
552 550 if kwargs.keys() == ['format']:
553 551 return # nothing to pull
554 552 bundle = pullop.remote.getbundle('pull', **kwargs)
555 553 try:
556 554 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
557 555 except error.BundleValueError, exc:
558 556 raise util.Abort('missing support for %s' % exc)
559 557
560 558 if pullop.fetch:
561 559 assert len(op.records['changegroup']) == 1
562 560 pullop.cgresult = op.records['changegroup'][0]['return']
563 561
564 562 def _pullbundle2extraprepare(pullop, kwargs):
565 563 """hook function so that extensions can extend the getbundle call"""
566 564 pass
567 565
568 566 def _pullchangeset(pullop):
569 567 """pull changeset from unbundle into the local repo"""
570 568 # We delay opening the transaction as long as possible so we
571 569 # don't open a transaction for nothing, or break a future useful
572 570 # rollback call
573 571 pullop.todosteps.remove('changegroup')
574 572 if not pullop.fetch:
575 573 pullop.repo.ui.status(_("no changes found\n"))
576 574 pullop.cgresult = 0
577 575 return
578 576 pullop.gettransaction()
579 577 if pullop.heads is None and list(pullop.common) == [nullid]:
580 578 pullop.repo.ui.status(_("requesting all changes\n"))
581 579 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
582 580 # issue1320, avoid a race if remote changed after discovery
583 581 pullop.heads = pullop.rheads
584 582
585 583 if pullop.remote.capable('getbundle'):
586 584 # TODO: get bundlecaps from remote
587 585 cg = pullop.remote.getbundle('pull', common=pullop.common,
588 586 heads=pullop.heads or pullop.rheads)
589 587 elif pullop.heads is None:
590 588 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
591 589 elif not pullop.remote.capable('changegroupsubset'):
592 590 raise util.Abort(_("partial pull cannot be done because "
593 591 "other repository doesn't support "
594 592 "changegroupsubset."))
595 593 else:
596 594 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
597 595 pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
598 596 pullop.remote.url())
599 597
600 598 def _pullphase(pullop):
601 599 # Get remote phases data from remote
602 600 pullop.todosteps.remove('phases')
603 601 remotephases = pullop.remote.listkeys('phases')
604 602 publishing = bool(remotephases.get('publishing', False))
605 603 if remotephases and not publishing:
606 604 # remote is new and unpublishing
607 605 pheads, _dr = phases.analyzeremotephases(pullop.repo,
608 606 pullop.pulledsubset,
609 607 remotephases)
610 608 phases.advanceboundary(pullop.repo, phases.public, pheads)
611 609 phases.advanceboundary(pullop.repo, phases.draft,
612 610 pullop.pulledsubset)
613 611 else:
614 612 # Remote is old or publishing all common changesets
615 613 # should be seen as public
616 614 phases.advanceboundary(pullop.repo, phases.public,
617 615 pullop.pulledsubset)
618 616
619 617 def _pullobsolete(pullop):
620 618 """utility function to pull obsolete markers from a remote
621 619
622 620 `gettransaction` is a function that returns the pull transaction, creating
623 621 one if necessary. We return the transaction to inform the calling code that
624 622 a new transaction has been created (when applicable).
625 623
626 624 Exists mostly to allow overriding for experimentation purposes"""
627 625 pullop.todosteps.remove('obsmarkers')
628 626 tr = None
629 627 if obsolete._enabled:
630 628 pullop.repo.ui.debug('fetching remote obsolete markers\n')
631 629 remoteobs = pullop.remote.listkeys('obsolete')
632 630 if 'dump0' in remoteobs:
633 631 tr = pullop.gettransaction()
634 632 for key in sorted(remoteobs, reverse=True):
635 633 if key.startswith('dump'):
636 634 data = base85.b85decode(remoteobs[key])
637 635 pullop.repo.obsstore.mergemarkers(tr, data)
638 636 pullop.repo.invalidatevolatilesets()
639 637 return tr
640 638
641 639 def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
642 640 **kwargs):
643 641 """return a full bundle (with potentially multiple kind of parts)
644 642
645 643 Could be a bundle HG10 or a bundle HG2X depending on bundlecaps
646 644 passed. For now, the bundle can contain only changegroup, but this will
647 645 changes when more part type will be available for bundle2.
648 646
649 647 This is different from changegroup.getbundle that only returns an HG10
650 648 changegroup bundle. They may eventually get reunited in the future when we
651 649 have a clearer idea of the API we what to query different data.
652 650
653 651 The implementation is at a very early stage and will get massive rework
654 652 when the API of bundle is refined.
655 653 """
656 654 # build changegroup bundle here.
657 655 cg = changegroup.getbundle(repo, source, heads=heads,
658 656 common=common, bundlecaps=bundlecaps)
659 657 if bundlecaps is None or 'HG2X' not in bundlecaps:
660 658 return cg
661 659 # very crude first implementation,
662 660 # the bundle API will change and the generation will be done lazily.
663 661 b2caps = {}
664 662 for bcaps in bundlecaps:
665 663 if bcaps.startswith('bundle2='):
666 664 blob = urllib.unquote(bcaps[len('bundle2='):])
667 665 b2caps.update(bundle2.decodecaps(blob))
668 666 bundler = bundle2.bundle20(repo.ui, b2caps)
669 667 if cg:
670 668 bundler.newpart('b2x:changegroup', data=cg.getchunks())
671 669 _getbundleextrapart(bundler, repo, source, heads=heads, common=common,
672 670 bundlecaps=bundlecaps, **kwargs)
673 671 return util.chunkbuffer(bundler.getchunks())
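Both the loop in getbundle above and _pullbundle2 earlier move the bundle2 capabilities blob inside the `bundlecaps` set as a urlquoted `bundle2=` entry. A sketch of that roundtrip with a stand-in blob (Python 3 `urllib.parse` in place of Python 2's `urllib`; the real blob comes from `bundle2.encodecaps`, a Mercurial internal):

```python
from urllib.parse import quote, unquote

# 'capsblob' is a stand-in string for the encoded capabilities
capsblob = 'HG2X\nb2x:changegroup\nb2x:replycaps'
bundlecaps = set(['HG2X', 'bundle2=' + quote(capsblob)])

# receiving side, mirroring the loop in getbundle above
recovered = None
for bcaps in bundlecaps:
    if bcaps.startswith('bundle2='):
        recovered = unquote(bcaps[len('bundle2='):])
assert recovered == capsblob
```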
674 672
675 673 def _getbundleextrapart(bundler, repo, source, heads=None, common=None,
676 674 bundlecaps=None, **kwargs):
677 675 """hook function to let extensions add parts to the requested bundle"""
678 676 pass
679 677
680 678 def check_heads(repo, their_heads, context):
681 679 """check if the heads of a repo have been modified
682 680
683 681 Used by peer for unbundling.
684 682 """
685 683 heads = repo.heads()
686 684 heads_hash = util.sha1(''.join(sorted(heads))).digest()
687 685 if not (their_heads == ['force'] or their_heads == heads or
688 686 their_heads == ['hashed', heads_hash]):
689 687 # someone else committed/pushed/unbundled while we
690 688 # were transferring data
691 689 raise error.PushRaced('repository changed while %s - '
692 690 'please try again' % context)
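check_heads accepts `['hashed', heads_hash]` as well as the literal head list; that hash is sha1 over the sorted concatenation of the binary head node ids. A standalone sketch (Python 3 `hashlib` standing in for Mercurial's `util.sha1`; `hashheads` is an illustrative name):

```python
import hashlib

def hashheads(heads):
    # sha1 over the sorted, concatenated 20-byte binary node ids,
    # mirroring util.sha1(''.join(sorted(heads))).digest() above
    return hashlib.sha1(b''.join(sorted(heads))).digest()

a = b'\x01' * 20
b = b'\x02' * 20
# sorting makes the digest order-insensitive, so client and server
# need not agree on head ordering
assert hashheads([a, b]) == hashheads([b, a])
assert len(hashheads([a])) == 20  # sha1 digests are 20 bytes
```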
693 691
694 692 def unbundle(repo, cg, heads, source, url):
695 693 """Apply a bundle to a repo.
696 694
697 695 This function makes sure the repo is locked during the application and has
698 696 a mechanism to check that no push race occurred between the creation of the
699 697 bundle and its application.
700 698
701 699 If the push was raced, a PushRaced exception is raised."""
702 700 r = 0
703 701 # need a transaction when processing a bundle2 stream
704 702 tr = None
705 703 lock = repo.lock()
706 704 try:
707 705 check_heads(repo, heads, 'uploading changes')
708 706 # push can proceed
709 707 if util.safehasattr(cg, 'params'):
710 708 try:
711 709 tr = repo.transaction('unbundle')
712 710 tr.hookargs['bundle2-exp'] = '1'
713 711 r = bundle2.processbundle(repo, cg, lambda: tr).reply
714 712 cl = repo.unfiltered().changelog
715 713 p = cl.writepending() and repo.root or ""
716 714 repo.hook('b2x-pretransactionclose', throw=True, source=source,
717 715 url=url, pending=p, **tr.hookargs)
718 716 tr.close()
719 717 repo.hook('b2x-transactionclose', source=source, url=url,
720 718 **tr.hookargs)
721 719 except Exception, exc:
722 720 exc.duringunbundle2 = True
723 721 raise
724 722 else:
725 723 r = changegroup.addchangegroup(repo, cg, source, url)
726 724 finally:
727 725 if tr is not None:
728 726 tr.release()
729 727 lock.release()
730 728 return r