bundle2: store the salvaged output on the exception object...
Pierre-Yves David
r24795:f9aa4cb8 default
@@ -1,1260 +1,1264 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with a value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are obviously forbidden.
47 47
48 48 Names MUST start with a letter. If the first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is unable to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
61 61
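For illustration, a hypothetical parameter blob carrying one advisory
parameter with a value and one mandatory parameter without one could look
like (both names are made up for this example)::

    compression=GZ Checkheads
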
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route to an application level handler that can
78 78 interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32 bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size pairs stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
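For instance, a payload made of the eleven bytes "hello world" followed by
the end-of-payload marker would appear on the wire as::

    <int32: 11><"hello world"><int32: 0>
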
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are
135 135 registered for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable, but none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
147 147
148 148 import errno
149 149 import sys
150 150 import util
151 151 import struct
152 152 import urllib
153 153 import string
154 154 import obsolete
155 155 import pushkey
156 156 import url
157 157 import re
158 158
159 159 import changegroup, error
160 160 from i18n import _
161 161
162 162 _pack = struct.pack
163 163 _unpack = struct.unpack
164 164
165 165 _fstreamparamsize = '>i'
166 166 _fpartheadersize = '>i'
167 167 _fparttypesize = '>B'
168 168 _fpartid = '>I'
169 169 _fpayloadsize = '>i'
170 170 _fpartparamcount = '>BB'
171 171
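Taken together, the struct formats above already pin down the smallest possible stream: the magic string, a zero-sized parameter block, and a zero part-header size acting as the end of stream marker. A minimal sketch of packing such an empty bundle by hand (illustrative only, not part of the module):

    import struct

    # 'HG20' magic, empty stream parameters, end-of-stream marker
    emptybundle = 'HG20' + struct.pack('>i', 0) + struct.pack('>i', 0)
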
172 172 preferedchunksize = 4096
173 173
174 174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
175 175
176 176 def validateparttype(parttype):
177 177 """raise ValueError if a parttype contains invalid character"""
178 178 if _parttypeforbidden.search(parttype):
179 179 raise ValueError(parttype)
180 180
181 181 def _makefpartparamsizes(nbparams):
182 182 """return a struct format to read part parameter sizes
183 183
184 184 The number of parameters is variable so we need to build that format
185 185 dynamically.
186 186 """
187 187 return '>'+('BB'*nbparams)
188 188
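For example, a part with two parameters yields the format '>BBBB', one (key size, value size) byte pair per parameter:

    _makefpartparamsizes(2)   # -> '>BBBB'
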
189 189 parthandlermapping = {}
190 190
191 191 def parthandler(parttype, params=()):
192 192 """decorator that register a function as a bundle2 part handler
193 193
194 194 eg::
195 195
196 196 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
197 197 def myparttypehandler(...):
198 198 '''process a part of type "my part".'''
199 199 ...
200 200 """
201 201 validateparttype(parttype)
202 202 def _decorator(func):
203 203 lparttype = parttype.lower() # enforce lower case matching.
204 204 assert lparttype not in parthandlermapping
205 205 parthandlermapping[lparttype] = func
206 206 func.params = frozenset(params)
207 207 return func
208 208 return _decorator
209 209
210 210 class unbundlerecords(object):
211 211 """keep record of what happens during and unbundle
212 212
213 213 New records are added using `records.add('cat', obj)`. Where 'cat' is a
214 214 category of record and obj is an arbitrary object.
215 215
216 216 `records['cat']` will return all entries of this category 'cat'.
217 217
218 218 Iterating on the object itself will yield `('category', obj)` tuples
219 219 for all entries.
220 220
221 221 All iterations happen in chronological order.
222 222 """
223 223
224 224 def __init__(self):
225 225 self._categories = {}
226 226 self._sequences = []
227 227 self._replies = {}
228 228
229 229 def add(self, category, entry, inreplyto=None):
230 230 """add a new record of a given category.
231 231
232 232 The entry can then be retrieved in the list returned by
233 233 self['category']."""
234 234 self._categories.setdefault(category, []).append(entry)
235 235 self._sequences.append((category, entry))
236 236 if inreplyto is not None:
237 237 self.getreplies(inreplyto).add(category, entry)
238 238
239 239 def getreplies(self, partid):
240 240 """get the records that are replies to a specific part"""
241 241 return self._replies.setdefault(partid, unbundlerecords())
242 242
243 243 def __getitem__(self, cat):
244 244 return tuple(self._categories.get(cat, ()))
245 245
246 246 def __iter__(self):
247 247 return iter(self._sequences)
248 248
249 249 def __len__(self):
250 250 return len(self._sequences)
251 251
252 252 def __nonzero__(self):
253 253 return bool(self._sequences)
254 254
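A brief usage sketch of this records object (category names and values are made up):

    records = unbundlerecords()
    records.add('changegroup', {'return': 1})
    records.add('output', 'remote: ok', inreplyto=0)
    records['changegroup']    # -> ({'return': 1},)
    list(records)             # chronological ('category', entry) tuples
    records.getreplies(0)     # nested unbundlerecords for replies to part 0
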
255 255 class bundleoperation(object):
256 256 """an object that represents a single bundling process
257 257
258 258 Its purpose is to carry unbundle-related objects and states.
259 259
260 260 A new object should be created at the beginning of each bundle processing.
261 261 The object is to be returned by the processing function.
262 262
263 263 The object has very little content now; it will ultimately contain:
264 264 * an access to the repo the bundle is applied to,
265 265 * a ui object,
266 266 * a way to retrieve a transaction to add changes to the repo,
267 267 * a way to record the result of processing each part,
268 268 * a way to construct a bundle response when applicable.
269 269 """
270 270
271 271 def __init__(self, repo, transactiongetter):
272 272 self.repo = repo
273 273 self.ui = repo.ui
274 274 self.records = unbundlerecords()
275 275 self.gettransaction = transactiongetter
276 276 self.reply = None
277 277
278 278 class TransactionUnavailable(RuntimeError):
279 279 pass
280 280
281 281 def _notransaction():
282 282 """default method to get a transaction while processing a bundle
283 283
284 284 Raise an exception to highlight the fact that no transaction was expected
285 285 to be created"""
286 286 raise TransactionUnavailable()
287 287
288 288 def processbundle(repo, unbundler, transactiongetter=None):
289 289 """This function process a bundle, apply effect to/from a repo
290 290
291 291 It iterates over each part then searches for and uses the proper handling
292 292 code to process the part. Parts are processed in order.
293 293
294 294 This is a very early version of this function; it will be strongly reworked
295 295 before final usage.
296 296
297 297 An unknown mandatory part will abort the process.
298 298 """
299 299 if transactiongetter is None:
300 300 transactiongetter = _notransaction
301 301 op = bundleoperation(repo, transactiongetter)
302 302 # todo:
303 303 # - replace this with an init function soon.
304 304 # - exception catching
305 305 unbundler.params
306 306 iterparts = unbundler.iterparts()
307 307 part = None
308 308 try:
309 309 for part in iterparts:
310 310 _processpart(op, part)
311 311 except Exception, exc:
312 312 for part in iterparts:
313 313 # consume the bundle content
314 314 part.seek(0, 2)
315 315 # Small hack to let caller code distinguish exceptions raised during
316 316 # bundle2 processing from those raised while processing the old format.
317 317 # This is mostly needed to return different codes from unbundle according
318 318 # to the type of bundle. We should probably clean up or drop this return
319 319 # code craziness in a future version.
320 320 exc.duringunbundle2 = True
321 salvaged = []
322 if op.reply is not None:
323 salvaged = op.reply.salvageoutput()
324 exc._bundle2salvagedoutput = salvaged
321 325 raise
322 326 return op
323 327
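This changeset attaches any salvageable 'output' parts to the exception as `_bundle2salvagedoutput`, alongside the existing `duringunbundle2` flag. A sketch of how calling code might consume both (the `ui`, `fp` and `tr` names are assumed to be in scope; this is illustrative, not code from this change):

    unbundler = getunbundler(ui, fp)
    try:
        op = processbundle(repo, unbundler, transactiongetter=lambda: tr)
    except Exception, exc:
        if getattr(exc, 'duringunbundle2', False):
            # replay server output that was salvaged before the failure
            for part in getattr(exc, '_bundle2salvagedoutput', ()):
                ui.debug('salvaged output part: %s\n' % part.type)
        raise
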
324 328 def _processpart(op, part):
325 329 """process a single part from a bundle
326 330
327 331 The part is guaranteed to have been fully consumed when the function exits
328 332 (even if an exception is raised)."""
329 333 try:
330 334 try:
331 335 handler = parthandlermapping.get(part.type)
332 336 if handler is None:
333 337 raise error.UnsupportedPartError(parttype=part.type)
334 338 op.ui.debug('found a handler for part %r\n' % part.type)
335 339 unknownparams = part.mandatorykeys - handler.params
336 340 if unknownparams:
337 341 unknownparams = list(unknownparams)
338 342 unknownparams.sort()
339 343 raise error.UnsupportedPartError(parttype=part.type,
340 344 params=unknownparams)
341 345 except error.UnsupportedPartError, exc:
342 346 if part.mandatory: # mandatory parts
343 347 raise
344 348 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
345 349 return # skip to part processing
346 350
347 351 # handler is called outside the above try block so that we don't
348 352 # risk catching KeyErrors from anything other than the
349 353 # parthandlermapping lookup (any KeyError raised by handler()
350 354 # itself represents a defect of a different variety).
351 355 output = None
352 356 if op.reply is not None:
353 357 op.ui.pushbuffer(error=True)
354 358 output = ''
355 359 try:
356 360 handler(op, part)
357 361 finally:
358 362 if output is not None:
359 363 output = op.ui.popbuffer()
360 364 if output:
361 365 outpart = op.reply.newpart('output', data=output,
362 366 mandatory=False)
363 367 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
364 368 finally:
365 369 # consume the part content to not corrupt the stream.
366 370 part.seek(0, 2)
367 371
368 372
369 373 def decodecaps(blob):
370 374 """decode a bundle2 caps bytes blob into a dictionary
371 375
372 376 The blob is a list of capabilities (one per line)
373 377 Capabilities may have values using a line of the form::
374 378
375 379 capability=value1,value2,value3
376 380
377 381 The values are always a list."""
378 382 caps = {}
379 383 for line in blob.splitlines():
380 384 if not line:
381 385 continue
382 386 if '=' not in line:
383 387 key, vals = line, ()
384 388 else:
385 389 key, vals = line.split('=', 1)
386 390 vals = vals.split(',')
387 391 key = urllib.unquote(key)
388 392 vals = [urllib.unquote(v) for v in vals]
389 393 caps[key] = vals
390 394 return caps
391 395
392 396 def encodecaps(caps):
393 397 """encode a bundle2 caps dictionary into a bytes blob"""
394 398 chunks = []
395 399 for ca in sorted(caps):
396 400 vals = caps[ca]
397 401 ca = urllib.quote(ca)
398 402 vals = [urllib.quote(v) for v in vals]
399 403 if vals:
400 404 ca = "%s=%s" % (ca, ','.join(vals))
401 405 chunks.append(ca)
402 406 return '\n'.join(chunks)
403 407
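A round-trip sketch for the two helpers above (capability names are illustrative):

    caps = {'HG20': [], 'digests': ['md5', 'sha1']}
    blob = encodecaps(caps)          # 'HG20\ndigests=md5,sha1'
    assert decodecaps(blob) == caps
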
404 408 class bundle20(object):
405 409 """represent an outgoing bundle2 container
406 410
407 411 Use the `addparam` method to add stream level parameters and `newpart` to
408 412 populate it. Then call `getchunks` to retrieve all the binary chunks of
409 413 data that compose the bundle2 container."""
410 414
411 415 _magicstring = 'HG20'
412 416
413 417 def __init__(self, ui, capabilities=()):
414 418 self.ui = ui
415 419 self._params = []
416 420 self._parts = []
417 421 self.capabilities = dict(capabilities)
418 422
419 423 @property
420 424 def nbparts(self):
421 425 """total number of parts added to the bundler"""
422 426 return len(self._parts)
423 427
424 428 # methods used to define the bundle2 content
425 429 def addparam(self, name, value=None):
426 430 """add a stream level parameter"""
427 431 if not name:
428 432 raise ValueError('empty parameter name')
429 433 if name[0] not in string.letters:
430 434 raise ValueError('non letter first character: %r' % name)
431 435 self._params.append((name, value))
432 436
433 437 def addpart(self, part):
434 438 """add a new part to the bundle2 container
435 439
436 440 Parts contain the actual applicative payload."""
437 441 assert part.id is None
438 442 part.id = len(self._parts) # very cheap counter
439 443 self._parts.append(part)
440 444
441 445 def newpart(self, typeid, *args, **kwargs):
442 446 """create a new part and add it to the containers
443 447
444 448 As the part is directly added to the containers. For now, this means
445 449 that any failure to properly initialize the part after calling
446 450 ``newpart`` should result in a failure of the whole bundling process.
447 451
448 452 You can still fall back to manually create and add if you need better
449 453 control."""
450 454 part = bundlepart(typeid, *args, **kwargs)
451 455 self.addpart(part)
452 456 return part
453 457
454 458 # methods used to generate the bundle2 stream
455 459 def getchunks(self):
456 460 self.ui.debug('start emission of %s stream\n' % self._magicstring)
457 461 yield self._magicstring
458 462 param = self._paramchunk()
459 463 self.ui.debug('bundle parameter: %s\n' % param)
460 464 yield _pack(_fstreamparamsize, len(param))
461 465 if param:
462 466 yield param
463 467
464 468 self.ui.debug('start of parts\n')
465 469 for part in self._parts:
466 470 self.ui.debug('bundle part: "%s"\n' % part.type)
467 471 for chunk in part.getchunks():
468 472 yield chunk
469 473 self.ui.debug('end of bundle\n')
470 474 yield _pack(_fpartheadersize, 0)
471 475
472 476 def _paramchunk(self):
473 477 """return a encoded version of all stream parameters"""
474 478 blocks = []
475 479 for par, value in self._params:
476 480 par = urllib.quote(par)
477 481 if value is not None:
478 482 value = urllib.quote(value)
479 483 par = '%s=%s' % (par, value)
480 484 blocks.append(par)
481 485 return ' '.join(blocks)
482 486
483 487 def salvageoutput(self):
484 488 """return a list with a copy of all output parts in the bundle
485 489
486 490 This is meant to be used during error handling to make sure we preserve
487 491 server output"""
488 492 salvaged = []
489 493 for part in self._parts:
490 494 if part.type.startswith('output'):
491 495 salvaged.append(part.copy())
492 496 return salvaged
493 497
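A sketch of assembling a container with this class (`ui` is assumed to be in scope; the advisory parameter name is hypothetical):

    bundler = bundle20(ui)
    bundler.addparam('ephemeral')          # lower case first letter: advisory
    part = bundler.newpart('output', data='hi', mandatory=False)
    salvaged = bundler.salvageoutput()     # copies of the 'output' parts
    stream = ''.join(bundler.getchunks())  # the full binary container
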
494 498
495 499 class unpackermixin(object):
496 500 """A mixin to extract bytes and struct data from a stream"""
497 501
498 502 def __init__(self, fp):
499 503 self._fp = fp
500 504 self._seekable = (util.safehasattr(fp, 'seek') and
501 505 util.safehasattr(fp, 'tell'))
502 506
503 507 def _unpack(self, format):
504 508 """unpack this struct format from the stream"""
505 509 data = self._readexact(struct.calcsize(format))
506 510 return _unpack(format, data)
507 511
508 512 def _readexact(self, size):
509 513 """read exactly <size> bytes from the stream"""
510 514 return changegroup.readexactly(self._fp, size)
511 515
512 516 def seek(self, offset, whence=0):
513 517 """move the underlying file pointer"""
514 518 if self._seekable:
515 519 return self._fp.seek(offset, whence)
516 520 else:
517 521 raise NotImplementedError(_('File pointer is not seekable'))
518 522
519 523 def tell(self):
520 524 """return the file offset, or None if file is not seekable"""
521 525 if self._seekable:
522 526 try:
523 527 return self._fp.tell()
524 528 except IOError, e:
525 529 if e.errno == errno.ESPIPE:
526 530 self._seekable = False
527 531 else:
528 532 raise
529 533 return None
530 534
531 535 def close(self):
532 536 """close underlying file"""
533 537 if util.safehasattr(self._fp, 'close'):
534 538 return self._fp.close()
535 539
536 540 def getunbundler(ui, fp, header=None):
537 541 """return a valid unbundler object for a given header"""
538 542 if header is None:
539 543 header = changegroup.readexactly(fp, 4)
540 544 magic, version = header[0:2], header[2:4]
541 545 if magic != 'HG':
542 546 raise util.Abort(_('not a Mercurial bundle'))
543 547 unbundlerclass = formatmap.get(version)
544 548 if unbundlerclass is None:
545 549 raise util.Abort(_('unknown bundle version %s') % version)
546 550 unbundler = unbundlerclass(ui, fp)
547 551 ui.debug('start processing of %s stream\n' % header)
548 552 return unbundler
549 553
550 554 class unbundle20(unpackermixin):
551 555 """interpret a bundle2 stream
552 556
553 557 This class is fed with a binary stream and yields parts through its
554 558 `iterparts` method."""
555 559
556 560 def __init__(self, ui, fp):
557 561 """If header is specified, we do not read it out of the stream."""
558 562 self.ui = ui
559 563 super(unbundle20, self).__init__(fp)
560 564
561 565 @util.propertycache
562 566 def params(self):
563 567 """dictionary of stream level parameters"""
564 568 self.ui.debug('reading bundle2 stream parameters\n')
565 569 params = {}
566 570 paramssize = self._unpack(_fstreamparamsize)[0]
567 571 if paramssize < 0:
568 572 raise error.BundleValueError('negative bundle param size: %i'
569 573 % paramssize)
570 574 if paramssize:
571 575 for p in self._readexact(paramssize).split(' '):
572 576 p = p.split('=', 1)
573 577 p = [urllib.unquote(i) for i in p]
574 578 if len(p) < 2:
575 579 p.append(None)
576 580 self._processparam(*p)
577 581 params[p[0]] = p[1]
578 582 return params
579 583
580 584 def _processparam(self, name, value):
581 585 """process a parameter, applying its effect if needed
582 586
583 587 Parameters starting with a lower case letter are advisory and will be
584 588 ignored when unknown. Those starting with an upper case letter are
585 589 mandatory and this function will raise an UnsupportedPartError when unknown.
586 590
587 591 Note: no options are currently supported. Any input will either be
588 592 ignored or fail.
589 593 """
590 594 if not name:
591 595 raise ValueError('empty parameter name')
592 596 if name[0] not in string.letters:
593 597 raise ValueError('non letter first character: %r' % name)
594 598 # Some logic will be added here later to try to process the option for
595 599 # a dict of known parameters.
596 600 if name[0].islower():
597 601 self.ui.debug("ignoring unknown parameter %r\n" % name)
598 602 else:
599 603 raise error.UnsupportedPartError(params=(name,))
600 604
601 605
602 606 def iterparts(self):
603 607 """yield all parts contained in the stream"""
604 608 # make sure params have been loaded
605 609 self.params
606 610 self.ui.debug('start extraction of bundle2 parts\n')
607 611 headerblock = self._readpartheader()
608 612 while headerblock is not None:
609 613 part = unbundlepart(self.ui, headerblock, self._fp)
610 614 yield part
611 615 part.seek(0, 2)
612 616 headerblock = self._readpartheader()
613 617 self.ui.debug('end of bundle2 stream\n')
614 618
615 619 def _readpartheader(self):
616 620 """reads a part header size and return the bytes blob
617 621
618 622 returns None if empty"""
619 623 headersize = self._unpack(_fpartheadersize)[0]
620 624 if headersize < 0:
621 625 raise error.BundleValueError('negative part header size: %i'
622 626 % headersize)
623 627 self.ui.debug('part header size: %i\n' % headersize)
624 628 if headersize:
625 629 return self._readexact(headersize)
626 630 return None
627 631
628 632 def compressed(self):
629 633 return False
630 634
631 635 formatmap = {'20': unbundle20}
632 636
633 637 class bundlepart(object):
634 638 """A bundle2 part contains application level payload
635 639
636 640 The part `type` is used to route the part to the application level
637 641 handler.
638 642
639 643 The part payload is contained in ``part.data``. It could be raw bytes or a
640 644 generator of byte chunks.
641 645
642 646 You can add parameters to the part using the ``addparam`` method.
643 647 Parameters can be either mandatory (default) or advisory. Remote side
644 648 should be able to safely ignore the advisory ones.
645 649
646 650 Neither data nor parameters can be modified after the generation has begun.
647 651 """
648 652
649 653 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
650 654 data='', mandatory=True):
651 655 validateparttype(parttype)
652 656 self.id = None
653 657 self.type = parttype
654 658 self._data = data
655 659 self._mandatoryparams = list(mandatoryparams)
656 660 self._advisoryparams = list(advisoryparams)
657 661 # checking for duplicated entries
658 662 self._seenparams = set()
659 663 for pname, __ in self._mandatoryparams + self._advisoryparams:
660 664 if pname in self._seenparams:
661 665 raise RuntimeError('duplicated params: %s' % pname)
662 666 self._seenparams.add(pname)
663 667 # status of the part's generation:
664 668 # - None: not started,
665 669 # - False: currently generated,
666 670 # - True: generation done.
667 671 self._generated = None
668 672 self.mandatory = mandatory
669 673
670 674 def copy(self):
671 675 """return a copy of the part
672 676
673 677 The new part has the very same content but no partid assigned yet.
674 678 Parts with generated data cannot be copied."""
675 679 assert not util.safehasattr(self.data, 'next')
676 680 return self.__class__(self.type, self._mandatoryparams,
677 681 self._advisoryparams, self._data, self.mandatory)
678 682
679 683 # methods used to define the part content
680 684 def __setdata(self, data):
681 685 if self._generated is not None:
682 686 raise error.ReadOnlyPartError('part is being generated')
683 687 self._data = data
684 688 def __getdata(self):
685 689 return self._data
686 690 data = property(__getdata, __setdata)
687 691
688 692 @property
689 693 def mandatoryparams(self):
690 694 # make it an immutable tuple to force people through ``addparam``
691 695 return tuple(self._mandatoryparams)
692 696
693 697 @property
694 698 def advisoryparams(self):
695 699 # make it an immutable tuple to force people through ``addparam``
696 700 return tuple(self._advisoryparams)
697 701
698 702 def addparam(self, name, value='', mandatory=True):
699 703 if self._generated is not None:
700 704 raise error.ReadOnlyPartError('part is being generated')
701 705 if name in self._seenparams:
702 706 raise ValueError('duplicated params: %s' % name)
703 707 self._seenparams.add(name)
704 708 params = self._advisoryparams
705 709 if mandatory:
706 710 params = self._mandatoryparams
707 711 params.append((name, value))
708 712
709 713 # methods used to generate the bundle2 stream
710 714 def getchunks(self):
711 715 if self._generated is not None:
712 716 raise RuntimeError('part can only be consumed once')
713 717 self._generated = False
714 718 #### header
715 719 if self.mandatory:
716 720 parttype = self.type.upper()
717 721 else:
718 722 parttype = self.type.lower()
719 723 ## parttype
720 724 header = [_pack(_fparttypesize, len(parttype)),
721 725 parttype, _pack(_fpartid, self.id),
722 726 ]
723 727 ## parameters
724 728 # count
725 729 manpar = self.mandatoryparams
726 730 advpar = self.advisoryparams
727 731 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
728 732 # size
729 733 parsizes = []
730 734 for key, value in manpar:
731 735 parsizes.append(len(key))
732 736 parsizes.append(len(value))
733 737 for key, value in advpar:
734 738 parsizes.append(len(key))
735 739 parsizes.append(len(value))
736 740 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
737 741 header.append(paramsizes)
738 742 # key, value
739 743 for key, value in manpar:
740 744 header.append(key)
741 745 header.append(value)
742 746 for key, value in advpar:
743 747 header.append(key)
744 748 header.append(value)
745 749 ## finalize header
746 750 headerchunk = ''.join(header)
747 751 yield _pack(_fpartheadersize, len(headerchunk))
748 752 yield headerchunk
749 753 ## payload
750 754 try:
751 755 for chunk in self._payloadchunks():
752 756 yield _pack(_fpayloadsize, len(chunk))
753 757 yield chunk
754 758 except Exception, exc:
755 759 # backup exception data for later
756 760 exc_info = sys.exc_info()
757 761 msg = 'unexpected error: %s' % exc
758 762 interpart = bundlepart('error:abort', [('message', msg)],
759 763 mandatory=False)
760 764 interpart.id = 0
761 765 yield _pack(_fpayloadsize, -1)
762 766 for chunk in interpart.getchunks():
763 767 yield chunk
764 768 # abort current part payload
765 769 yield _pack(_fpayloadsize, 0)
766 770 raise exc_info[0], exc_info[1], exc_info[2]
767 771 # end of payload
768 772 yield _pack(_fpayloadsize, 0)
769 773 self._generated = True
770 774
771 775 def _payloadchunks(self):
772 776 """yield chunks of a the part payload
773 777
774 778 Exists to handle the different ways data can be provided to a part."""
775 779 # we only support fixed size data now.
776 780 # This will be improved in the future.
777 781 if util.safehasattr(self.data, 'next'):
778 782 buff = util.chunkbuffer(self.data)
779 783 chunk = buff.read(preferedchunksize)
780 784 while chunk:
781 785 yield chunk
782 786 chunk = buff.read(preferedchunksize)
783 787 elif len(self.data):
784 788 yield self.data
785 789
786 790
787 791 flaginterrupt = -1
788 792
789 793 class interrupthandler(unpackermixin):
790 794 """read one part and process it with restricted capability
791 795
792 796 This allows transmitting an exception raised on the producer side during
793 797 part iteration while the consumer is reading a part.
794 798
795 799 Parts processed in this manner only have access to a ui object."""
796 800
797 801 def __init__(self, ui, fp):
798 802 super(interrupthandler, self).__init__(fp)
799 803 self.ui = ui
800 804
801 805 def _readpartheader(self):
802 806 """reads a part header size and return the bytes blob
803 807
804 808 returns None if empty"""
805 809 headersize = self._unpack(_fpartheadersize)[0]
806 810 if headersize < 0:
807 811 raise error.BundleValueError('negative part header size: %i'
808 812 % headersize)
809 813 self.ui.debug('part header size: %i\n' % headersize)
810 814 if headersize:
811 815 return self._readexact(headersize)
812 816 return None
813 817
814 818 def __call__(self):
815 819 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
816 820 headerblock = self._readpartheader()
817 821 if headerblock is None:
818 822 self.ui.debug('no part found during interruption.\n')
819 823 return
820 824 part = unbundlepart(self.ui, headerblock, self._fp)
821 825 op = interruptoperation(self.ui)
822 826 _processpart(op, part)
823 827
824 828 class interruptoperation(object):
825 829 """A limited operation to be use by part handler during interruption
826 830
827 831 It only have access to an ui object.
828 832 """
829 833
830 834 def __init__(self, ui):
831 835 self.ui = ui
832 836 self.reply = None
833 837
834 838 @property
835 839 def repo(self):
836 840 raise RuntimeError('no repo access from stream interruption')
837 841
838 842 def gettransaction(self):
839 843 raise TransactionUnavailable('no repo access from stream interruption')
840 844
841 845 class unbundlepart(unpackermixin):
842 846 """a bundle part read from a bundle"""
843 847
844 848 def __init__(self, ui, header, fp):
845 849 super(unbundlepart, self).__init__(fp)
846 850 self.ui = ui
847 851 # unbundle state attr
848 852 self._headerdata = header
849 853 self._headeroffset = 0
850 854 self._initialized = False
851 855 self.consumed = False
852 856 # part data
853 857 self.id = None
854 858 self.type = None
855 859 self.mandatoryparams = None
856 860 self.advisoryparams = None
857 861 self.params = None
858 862 self.mandatorykeys = ()
859 863 self._payloadstream = None
860 864 self._readheader()
861 865 self._mandatory = None
862 866 self._chunkindex = [] # (payload, file) position tuples for chunk starts
863 867 self._pos = 0
864 868
865 869 def _fromheader(self, size):
866 870 """return the next <size> byte from the header"""
867 871 offset = self._headeroffset
868 872 data = self._headerdata[offset:(offset + size)]
869 873 self._headeroffset = offset + size
870 874 return data
871 875
872 876 def _unpackheader(self, format):
873 877 """read given format from header
874 878
875 879 This automatically computes the size of the format to read."""
876 880 data = self._fromheader(struct.calcsize(format))
877 881 return _unpack(format, data)
878 882
879 883 def _initparams(self, mandatoryparams, advisoryparams):
880 884 """internal function to setup all logic related parameters"""
881 885 # make it read only to prevent people touching it by mistake.
882 886 self.mandatoryparams = tuple(mandatoryparams)
883 887 self.advisoryparams = tuple(advisoryparams)
884 888 # user friendly UI
885 889 self.params = dict(self.mandatoryparams)
886 890 self.params.update(dict(self.advisoryparams))
887 891 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
888 892
889 893 def _payloadchunks(self, chunknum=0):
890 894 '''seek to specified chunk and start yielding data'''
891 895 if len(self._chunkindex) == 0:
892 896 assert chunknum == 0, 'Must start with chunk 0'
893 897 self._chunkindex.append((0, super(unbundlepart, self).tell()))
894 898 else:
895 899 assert chunknum < len(self._chunkindex), \
896 900 'Unknown chunk %d' % chunknum
897 901 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
898 902
899 903 pos = self._chunkindex[chunknum][0]
900 904 payloadsize = self._unpack(_fpayloadsize)[0]
901 905 self.ui.debug('payload chunk size: %i\n' % payloadsize)
902 906 while payloadsize:
903 907 if payloadsize == flaginterrupt:
904 908 # interruption detection, the handler will now read a
905 909 # single part and process it.
906 910 interrupthandler(self.ui, self._fp)()
907 911 elif payloadsize < 0:
908 912 msg = 'negative payload chunk size: %i' % payloadsize
909 913 raise error.BundleValueError(msg)
910 914 else:
911 915 result = self._readexact(payloadsize)
912 916 chunknum += 1
913 917 pos += payloadsize
914 918 if chunknum == len(self._chunkindex):
915 919 self._chunkindex.append((pos,
916 920 super(unbundlepart, self).tell()))
917 921 yield result
918 922 payloadsize = self._unpack(_fpayloadsize)[0]
919 923 self.ui.debug('payload chunk size: %i\n' % payloadsize)
920 924
921 925 def _findchunk(self, pos):
922 926 '''for a given payload position, return a chunk number and offset'''
923 927 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
924 928 if ppos == pos:
925 929 return chunk, 0
926 930 elif ppos > pos:
927 931 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
928 932 raise ValueError('Unknown chunk')
929 933
930 934 def _readheader(self):
931 935 """read the header and setup the object"""
932 936 typesize = self._unpackheader(_fparttypesize)[0]
933 937 self.type = self._fromheader(typesize)
934 938 self.ui.debug('part type: "%s"\n' % self.type)
935 939 self.id = self._unpackheader(_fpartid)[0]
936 940 self.ui.debug('part id: "%s"\n' % self.id)
937 941 # extract mandatory bit from type
938 942 self.mandatory = (self.type != self.type.lower())
939 943 self.type = self.type.lower()
940 944 ## reading parameters
941 945 # param count
942 946 mancount, advcount = self._unpackheader(_fpartparamcount)
943 947 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
944 948 # param size
945 949 fparamsizes = _makefpartparamsizes(mancount + advcount)
946 950 paramsizes = self._unpackheader(fparamsizes)
947 951 # make it a list of pairs again
948 952 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
949 953 # split mandatory from advisory
950 954 mansizes = paramsizes[:mancount]
951 955 advsizes = paramsizes[mancount:]
952 956 # retrieve param value
953 957 manparams = []
954 958 for key, value in mansizes:
955 959 manparams.append((self._fromheader(key), self._fromheader(value)))
956 960 advparams = []
957 961 for key, value in advsizes:
958 962 advparams.append((self._fromheader(key), self._fromheader(value)))
959 963 self._initparams(manparams, advparams)
960 964 ## part payload
961 965 self._payloadstream = util.chunkbuffer(self._payloadchunks())
962 966 # the header has been read: mark the part as initialized
963 967 self._initialized = True
964 968
965 969 def read(self, size=None):
966 970 """read payload data"""
967 971 if not self._initialized:
968 972 self._readheader()
969 973 if size is None:
970 974 data = self._payloadstream.read()
971 975 else:
972 976 data = self._payloadstream.read(size)
973 977 if size is None or len(data) < size:
974 978 self.consumed = True
975 979 self._pos += len(data)
976 980 return data
977 981
978 982 def tell(self):
979 983 return self._pos
980 984
981 985 def seek(self, offset, whence=0):
982 986 if whence == 0:
983 987 newpos = offset
984 988 elif whence == 1:
985 989 newpos = self._pos + offset
986 990 elif whence == 2:
987 991 if not self.consumed:
988 992 self.read()
989 993 newpos = self._chunkindex[-1][0] - offset
990 994 else:
991 995 raise ValueError('Unknown whence value: %r' % (whence,))
992 996
993 997 if newpos > self._chunkindex[-1][0] and not self.consumed:
994 998 self.read()
995 999 if not 0 <= newpos <= self._chunkindex[-1][0]:
996 1000 raise ValueError('Offset out of range')
997 1001
998 1002 if self._pos != newpos:
999 1003 chunk, internaloffset = self._findchunk(newpos)
1000 1004 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1001 1005 adjust = self.read(internaloffset)
1002 1006 if len(adjust) != internaloffset:
1003 1007 raise util.Abort(_('Seek failed\n'))
1004 1008 self._pos = newpos
1005 1009
1006 1010 capabilities = {'HG20': (),
1007 1011 'listkeys': (),
1008 1012 'pushkey': (),
1009 1013 'digests': tuple(sorted(util.DIGESTS.keys())),
1010 1014 'remote-changegroup': ('http', 'https'),
1011 1015 }
1012 1016
1013 1017 def getrepocaps(repo, allowpushback=False):
1014 1018 """return the bundle2 capabilities for a given repo
1015 1019
1016 1020 Exists to allow extensions (like evolution) to mutate the capabilities.
1017 1021 """
1018 1022 caps = capabilities.copy()
1019 1023 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1020 1024 if obsolete.isenabled(repo, obsolete.exchangeopt):
1021 1025 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1022 1026 caps['obsmarkers'] = supportedformat
1023 1027 if allowpushback:
1024 1028 caps['pushback'] = ()
1025 1029 return caps
1026 1030
1027 1031 def bundle2caps(remote):
1028 1032 """return the bundle capabilities of a peer as dict"""
1029 1033 raw = remote.capable('bundle2')
1030 1034 if not raw and raw != '':
1031 1035 return {}
1032 1036 capsblob = urllib.unquote(remote.capable('bundle2'))
1033 1037 return decodecaps(capsblob)
1034 1038
1035 1039 def obsmarkersversion(caps):
1036 1040 """extract the list of supported obsmarkers versions from a bundle2caps dict
1037 1041 """
1038 1042 obscaps = caps.get('obsmarkers', ())
1039 1043 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1040 1044
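For instance, with a hypothetical caps dict:

    obsmarkersversion({'obsmarkers': ('V0', 'V1')})   # -> [0, 1]
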
1041 1045 @parthandler('changegroup', ('version',))
1042 1046 def handlechangegroup(op, inpart):
1043 1047 """apply a changegroup part on the repo
1044 1048
1045 1049 This is a very early implementation that will be massively reworked
1046 1050 before being inflicted on any end-user.
1047 1051 """
1048 1052 # Make sure we trigger a transaction creation
1049 1053 #
1050 1054 # The addchangegroup function will get a transaction object by itself, but
1051 1055 # we need to make sure we trigger the creation of a transaction object used
1052 1056 # for the whole processing scope.
1053 1057 op.gettransaction()
1054 1058 unpackerversion = inpart.params.get('version', '01')
1055 1059 # We should raise an appropriate exception here
1056 1060 unpacker = changegroup.packermap[unpackerversion][1]
1057 1061 cg = unpacker(inpart, 'UN')
1058 1062 # the source and url passed here are overwritten by the ones contained in
1059 1063 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1060 1064 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1061 1065 op.records.add('changegroup', {'return': ret})
1062 1066 if op.reply is not None:
1063 1067 # This is definitely not the final form of this
1064 1068 # return. But one needs to start somewhere.
1065 1069 part = op.reply.newpart('reply:changegroup', mandatory=False)
1066 1070 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1067 1071 part.addparam('return', '%i' % ret, mandatory=False)
1068 1072 assert not inpart.read()
1069 1073
1070 1074 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1071 1075 ['digest:%s' % k for k in util.DIGESTS.keys()])
1072 1076 @parthandler('remote-changegroup', _remotechangegroupparams)
1073 1077 def handleremotechangegroup(op, inpart):
1074 1078 """apply a bundle10 on the repo, given an url and validation information
1075 1079
1076 1080 All the information about the remote bundle to import is given as
1077 1081 parameters. The parameters include:
1078 1082 - url: the url to the bundle10.
1079 1083 - size: the bundle10 file size. It is used to validate that what was
1080 1084 retrieved by the client matches the server's knowledge about the bundle.
1081 1085 - digests: a space separated list of the digest types provided as
1082 1086 parameters.
1083 1087 - digest:<digest-type>: the hexadecimal representation of the digest with
1084 1088 that name. Like the size, it is used to validate what was retrieved by
1085 1089 the client matches what the server knows about the bundle.
1086 1090
1087 1091 When multiple digest types are given, all of them are checked.
1088 1092 """
1089 1093 try:
1090 1094 raw_url = inpart.params['url']
1091 1095 except KeyError:
1092 1096 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1093 1097 parsed_url = util.url(raw_url)
1094 1098 if parsed_url.scheme not in capabilities['remote-changegroup']:
1095 1099 raise util.Abort(_('remote-changegroup does not support %s urls') %
1096 1100 parsed_url.scheme)
1097 1101
1098 1102 try:
1099 1103 size = int(inpart.params['size'])
1100 1104 except ValueError:
1101 1105 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1102 1106 % 'size')
1103 1107 except KeyError:
1104 1108 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1105 1109
1106 1110 digests = {}
1107 1111 for typ in inpart.params.get('digests', '').split():
1108 1112 param = 'digest:%s' % typ
1109 1113 try:
1110 1114 value = inpart.params[param]
1111 1115 except KeyError:
1112 1116 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1113 1117 param)
1114 1118 digests[typ] = value
1115 1119
1116 1120 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1117 1121
1118 1122 # Make sure we trigger a transaction creation
1119 1123 #
1120 1124 # The addchangegroup function will get a transaction object by itself, but
1121 1125 # we need to make sure we trigger the creation of a transaction object used
1122 1126 # for the whole processing scope.
1123 1127 op.gettransaction()
1124 1128 import exchange
1125 1129 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1126 1130 if not isinstance(cg, changegroup.cg1unpacker):
1127 1131 raise util.Abort(_('%s: not a bundle version 1.0') %
1128 1132 util.hidepassword(raw_url))
1129 1133 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1130 1134 op.records.add('changegroup', {'return': ret})
1131 1135 if op.reply is not None:
1132 1136 # This is definitely not the final form of this
1133 1137 # return. But one needs to start somewhere.
1134 1138 part = op.reply.newpart('reply:changegroup')
1135 1139 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1136 1140 part.addparam('return', '%i' % ret, mandatory=False)
1137 1141 try:
1138 1142 real_part.validate()
1139 1143 except util.Abort, e:
1140 1144 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1141 1145 (util.hidepassword(raw_url), str(e)))
1142 1146 assert not inpart.read()
1143 1147
1144 1148 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1145 1149 def handlereplychangegroup(op, inpart):
1146 1150 ret = int(inpart.params['return'])
1147 1151 replyto = int(inpart.params['in-reply-to'])
1148 1152 op.records.add('changegroup', {'return': ret}, replyto)
1149 1153
1150 1154 @parthandler('check:heads')
1151 1155 def handlecheckheads(op, inpart):
1152 1156 """check that head of the repo did not change
1153 1157
1154 1158 This is used to detect a push race when using unbundle.
1155 1159 This replaces the "heads" argument of unbundle."""
1156 1160 h = inpart.read(20)
1157 1161 heads = []
1158 1162 while len(h) == 20:
1159 1163 heads.append(h)
1160 1164 h = inpart.read(20)
1161 1165 assert not h
1162 1166 if heads != op.repo.heads():
1163 1167 raise error.PushRaced('repository changed while pushing - '
1164 1168 'please try again')
1165 1169
1166 1170 @parthandler('output')
1167 1171 def handleoutput(op, inpart):
1168 1172 """forward output captured on the server to the client"""
1169 1173 for line in inpart.read().splitlines():
1170 1174 op.ui.write(('remote: %s\n' % line))
1171 1175
1172 1176 @parthandler('replycaps')
1173 1177 def handlereplycaps(op, inpart):
1174 1178 """Notify that a reply bundle should be created
1175 1179
1176 1180 The payload contains the capabilities information for the reply"""
1177 1181 caps = decodecaps(inpart.read())
1178 1182 if op.reply is None:
1179 1183 op.reply = bundle20(op.ui, caps)
1180 1184
1181 1185 @parthandler('error:abort', ('message', 'hint'))
1182 1186 def handleerrorabort(op, inpart):
1183 1187 """Used to transmit abort error over the wire"""
1184 1188 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1185 1189
1186 1190 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1187 1191 def handleerrorunsupportedcontent(op, inpart):
1188 1192 """Used to transmit unknown content error over the wire"""
1189 1193 kwargs = {}
1190 1194 parttype = inpart.params.get('parttype')
1191 1195 if parttype is not None:
1192 1196 kwargs['parttype'] = parttype
1193 1197 params = inpart.params.get('params')
1194 1198 if params is not None:
1195 1199 kwargs['params'] = params.split('\0')
1196 1200
1197 1201 raise error.UnsupportedPartError(**kwargs)
1198 1202
1199 1203 @parthandler('error:pushraced', ('message',))
1200 1204 def handleerrorpushraced(op, inpart):
1201 1205 """Used to transmit push race error over the wire"""
1202 1206 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1203 1207
1204 1208 @parthandler('listkeys', ('namespace',))
1205 1209 def handlelistkeys(op, inpart):
1206 1210 """retrieve pushkey namespace content stored in a bundle2"""
1207 1211 namespace = inpart.params['namespace']
1208 1212 r = pushkey.decodekeys(inpart.read())
1209 1213 op.records.add('listkeys', (namespace, r))
1210 1214
1211 1215 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1212 1216 def handlepushkey(op, inpart):
1213 1217 """process a pushkey request"""
1214 1218 dec = pushkey.decode
1215 1219 namespace = dec(inpart.params['namespace'])
1216 1220 key = dec(inpart.params['key'])
1217 1221 old = dec(inpart.params['old'])
1218 1222 new = dec(inpart.params['new'])
1219 1223 ret = op.repo.pushkey(namespace, key, old, new)
1220 1224 record = {'namespace': namespace,
1221 1225 'key': key,
1222 1226 'old': old,
1223 1227 'new': new}
1224 1228 op.records.add('pushkey', record)
1225 1229 if op.reply is not None:
1226 1230 rpart = op.reply.newpart('reply:pushkey')
1227 1231 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1228 1232 rpart.addparam('return', '%i' % ret, mandatory=False)
1229 1233
1230 1234 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1231 1235 def handlepushkeyreply(op, inpart):
1232 1236 """retrieve the result of a pushkey request"""
1233 1237 ret = int(inpart.params['return'])
1234 1238 partid = int(inpart.params['in-reply-to'])
1235 1239 op.records.add('pushkey', {'return': ret}, partid)
1236 1240
1237 1241 @parthandler('obsmarkers')
1238 1242 def handleobsmarker(op, inpart):
1239 1243 """add a stream of obsmarkers to the repo"""
1240 1244 tr = op.gettransaction()
1241 1245 markerdata = inpart.read()
1242 1246 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1243 1247 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1244 1248 % len(markerdata))
1245 1249 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1246 1250 if new:
1247 1251 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1248 1252 op.records.add('obsmarkers', {'new': new})
1249 1253 if op.reply is not None:
1250 1254 rpart = op.reply.newpart('reply:obsmarkers')
1251 1255 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1252 1256 rpart.addparam('new', '%i' % new, mandatory=False)
1253 1257
1254 1258
1255 1259 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1256 1260 def handleobsmarkerreply(op, inpart):
1257 1261 """retrieve the result of an obsmarkers push"""
1258 1262 ret = int(inpart.params['new'])
1259 1263 partid = int(inpart.params['in-reply-to'])
1260 1264 op.records.add('obsmarkers', {'new': ret}, partid)
@@ -1,1308 +1,1311 b''
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib
11 11 import util, scmutil, changegroup, base85, error
12 12 import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
13 13 import lock as lockmod
14 14
15 15 def readbundle(ui, fh, fname, vfs=None):
16 16 header = changegroup.readexactly(fh, 4)
17 17
18 18 alg = None
19 19 if not fname:
20 20 fname = "stream"
21 21 if not header.startswith('HG') and header.startswith('\0'):
22 22 fh = changegroup.headerlessfixup(fh, header)
23 23 header = "HG10"
24 24 alg = 'UN'
25 25 elif vfs:
26 26 fname = vfs.join(fname)
27 27
28 28 magic, version = header[0:2], header[2:4]
29 29
30 30 if magic != 'HG':
31 31 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
32 32 if version == '10':
33 33 if alg is None:
34 34 alg = changegroup.readexactly(fh, 2)
35 35 return changegroup.cg1unpacker(fh, alg)
36 36 elif version.startswith('2'):
37 37 return bundle2.getunbundler(ui, fh, header=magic + version)
38 38 else:
39 39 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
40 40
41 41 def buildobsmarkerspart(bundler, markers):
42 42 """add an obsmarker part to the bundler with <markers>
43 43
44 44 No part is created if markers is empty.
45 45 Raises ValueError if the bundler doesn't support any known obsmarker format.
46 46 """
47 47 if markers:
48 48 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
49 49 version = obsolete.commonversion(remoteversions)
50 50 if version is None:
51 51 raise ValueError('bundler does not support common obsmarker format')
52 52 stream = obsolete.encodemarkers(markers, True, version=version)
53 53 return bundler.newpart('obsmarkers', data=stream)
54 54 return None
55 55
56 56 def _canusebundle2(op):
57 57 """return true if a pull/push can use bundle2
58 58
59 59 Feel free to nuke this function when we drop the experimental option"""
60 60 return (op.repo.ui.configbool('experimental', 'bundle2-exp', False)
61 61 and op.remote.capable('bundle2'))
62 62
63 63
64 64 class pushoperation(object):
65 65 """A object that represent a single push operation
66 66
67 67 It purpose is to carry push related state and very common operation.
68 68
69 69 A new should be created at the beginning of each push and discarded
70 70 afterward.
71 71 """
72 72
73 73 def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
74 74 bookmarks=()):
75 75 # repo we push from
76 76 self.repo = repo
77 77 self.ui = repo.ui
78 78 # repo we push to
79 79 self.remote = remote
80 80 # force option provided
81 81 self.force = force
82 82 # revs to be pushed (None is "all")
83 83 self.revs = revs
84 84 # bookmarks explicitly pushed
85 85 self.bookmarks = bookmarks
86 86 # allow push of new branch
87 87 self.newbranch = newbranch
88 88 # did a local lock get acquired?
89 89 self.locallocked = None
90 90 # steps already performed
91 91 # (used to check what steps have already been performed through bundle2)
92 92 self.stepsdone = set()
93 93 # Integer version of the changegroup push result
94 94 # - None means nothing to push
95 95 # - 0 means HTTP error
96 96 # - 1 means we pushed and remote head count is unchanged *or*
97 97 # we have outgoing changesets but refused to push
98 98 # - other values as described by addchangegroup()
99 99 self.cgresult = None
100 100 # Boolean value for the bookmark push
101 101 self.bkresult = None
102 102 # discover.outgoing object (contains common and outgoing data)
103 103 self.outgoing = None
104 104 # all remote heads before the push
105 105 self.remoteheads = None
106 106 # testable as a boolean indicating if any nodes are missing locally.
107 107 self.incoming = None
108 108 # phases changes that must be pushed alongside the changesets
109 109 self.outdatedphases = None
110 110 # phases changes that must be pushed if changeset push fails
111 111 self.fallbackoutdatedphases = None
112 112 # outgoing obsmarkers
113 113 self.outobsmarkers = set()
114 114 # outgoing bookmarks
115 115 self.outbookmarks = []
116 116 # transaction manager
117 117 self.trmanager = None
118 118
119 119 @util.propertycache
120 120 def futureheads(self):
121 121 """future remote heads if the changeset push succeeds"""
122 122 return self.outgoing.missingheads
123 123
124 124 @util.propertycache
125 125 def fallbackheads(self):
126 126 """future remote heads if the changeset push fails"""
127 127 if self.revs is None:
128 128 # no target to push, all common heads are relevant
129 129 return self.outgoing.commonheads
130 130 unfi = self.repo.unfiltered()
        # I want cheads = heads(::missingheads and ::commonheads)
        # (missingheads is revs with secret changeset filtered out)
        #
        # This can be expressed as:
        #     cheads = ((missingheads and ::commonheads)
        #               + (commonheads and ::missingheads))
        #
        # while trying to push we already computed the following:
        #     common = (::commonheads)
        #     missing = ((commonheads::missingheads) - commonheads)
        #
        # We can pick:
        # * missingheads part of common (::commonheads)
        common = set(self.outgoing.common)
        nm = self.repo.changelog.nodemap
        cheads = [node for node in self.revs if nm[node] in common]
        # and
        # * commonheads parents on missing
        revset = unfi.set('%ln and parents(roots(%ln))',
                          self.outgoing.commonheads,
                          self.outgoing.missing)
        cheads.extend(c.node() for c in revset)
        return cheads

    @property
    def commonheads(self):
        """set of all common heads after changeset bundle push"""
        if self.cgresult:
            return self.futureheads
        else:
            return self.fallbackheads

# mapping of messages used when pushing bookmarks
bookmsgmap = {'update': (_("updating bookmark %s\n"),
                         _('updating bookmark %s failed!\n')),
              'export': (_("exporting bookmark %s\n"),
                         _('exporting bookmark %s failed!\n')),
              'delete': (_("deleting remote bookmark %s\n"),
                         _('deleting remote bookmark %s failed!\n')),
              }


def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=()):
    '''Push outgoing changesets (limited by revs) from a local
    repository to remote. Return the pushoperation object; its
    ``cgresult`` attribute holds an integer:
      - None means nothing to push
      - 0 means HTTP error
      - 1 means we pushed and remote head count is unchanged *or*
        we have outgoing changesets but refused to push
      - other values as described by addchangegroup()
    '''
    pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks)
    if pushop.remote.local():
        missing = (set(pushop.repo.requirements)
                   - pushop.remote.local().supported)
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    # there are two ways to push to a remote repo:
    #
    # addchangegroup assumes local user can lock remote
    # repo (local filesystem, old ssh servers).
    #
    # unbundle assumes local user cannot lock remote repo (new ssh
    # servers, http servers).

    if not pushop.remote.canpush():
        raise util.Abort(_("destination does not support push"))
    # get local lock as we might write phase data
    localwlock = locallock = None
    try:
        # bundle2 push may receive a reply bundle touching bookmarks or other
        # things requiring the wlock. Take it now to ensure proper ordering.
        maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
        if _canusebundle2(pushop) and maypushback:
            localwlock = pushop.repo.wlock()
        locallock = pushop.repo.lock()
        pushop.locallocked = True
    except IOError, err:
        pushop.locallocked = False
        if err.errno != errno.EACCES:
            raise
        # source repo cannot be locked.
        # We do not abort the push, but just disable the local phase
        # synchronisation.
        msg = 'cannot lock source repository: %s\n' % err
        pushop.ui.debug(msg)
    try:
        if pushop.locallocked:
            pushop.trmanager = transactionmanager(repo,
                                                  'push-response',
                                                  pushop.remote.url())
        pushop.repo.checkpush(pushop)
        lock = None
        unbundle = pushop.remote.capable('unbundle')
        if not unbundle:
            lock = pushop.remote.lock()
        try:
            _pushdiscovery(pushop)
            if _canusebundle2(pushop):
                _pushbundle2(pushop)
            _pushchangeset(pushop)
            _pushsyncphase(pushop)
            _pushobsolete(pushop)
            _pushbookmark(pushop)
        finally:
            if lock is not None:
                lock.release()
        if pushop.trmanager:
            pushop.trmanager.close()
    finally:
        if pushop.trmanager:
            pushop.trmanager.release()
        if locallock is not None:
            locallock.release()
        if localwlock is not None:
            localwlock.release()

    return pushop

# list of steps to perform discovery before push
pushdiscoveryorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
pushdiscoverymapping = {}

def pushdiscovery(stepname):
    """decorator for functions performing discovery before push

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for a new step; if you want to wrap a step
    from an extension, change the pushdiscoverymapping dictionary directly."""
    def dec(func):
        assert stepname not in pushdiscoverymapping
        pushdiscoverymapping[stepname] = func
        pushdiscoveryorder.append(stepname)
        return func
    return dec
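
# A minimal sketch (hypothetical extension code, not part of this module) of
# wrapping an existing discovery step by mutating pushdiscoverymapping
# directly, as the docstring above suggests:
#
#     origstep = pushdiscoverymapping['changeset']
#     def wrappedstep(pushop):
#         pushop.ui.debug('entering wrapped changeset discovery\n')
#         origstep(pushop)
#     pushdiscoverymapping['changeset'] = wrappedstep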

def _pushdiscovery(pushop):
    """Run all discovery steps"""
    for stepname in pushdiscoveryorder:
        step = pushdiscoverymapping[stepname]
        step(pushop)

@pushdiscovery('changeset')
def _pushdiscoverychangeset(pushop):
    """discover the changesets that need to be pushed"""
    fci = discovery.findcommonincoming
    commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
    common, inc, remoteheads = commoninc
    fco = discovery.findcommonoutgoing
    outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
                   commoninc=commoninc, force=pushop.force)
    pushop.outgoing = outgoing
    pushop.remoteheads = remoteheads
    pushop.incoming = inc

@pushdiscovery('phase')
def _pushdiscoveryphase(pushop):
    """discover the phases that need to be pushed

    (computed for both success and failure case for changesets push)"""
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    remotephases = pushop.remote.listkeys('phases')
    publishing = remotephases.get('publishing', False)
    ana = phases.analyzeremotephases(pushop.repo,
                                     pushop.fallbackheads,
                                     remotephases)
    pheads, droots = ana
    extracond = ''
    if not publishing:
        extracond = ' and public()'
    revset = 'heads((%%ln::%%ln) %s)' % extracond
    # Get the list of all revs draft on remote but public here.
    # XXX Beware that the revset breaks if droots is not strictly roots;
    # XXX we may want to ensure it is, but that is costly.
    fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
    if not outgoing.missing:
        future = fallback
    else:
        # adds changesets we are going to push as draft
        #
        # should not be necessary for a publishing server, but because of an
        # issue fixed in xxxxx we have to do it anyway.
        fdroots = list(unfi.set('roots(%ln + %ln::)',
                       outgoing.missing, droots))
        fdroots = [f.node() for f in fdroots]
        future = list(unfi.set(revset, fdroots, pushop.futureheads))
    pushop.outdatedphases = future
    pushop.fallbackoutdatedphases = fallback

@pushdiscovery('obsmarker')
def _pushdiscoveryobsmarkers(pushop):
    if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
        and pushop.repo.obsstore
        and 'obsolete' in pushop.remote.listkeys('namespaces')):
        repo = pushop.repo
        # very naive computation that can be quite expensive on big repos.
        # However, evolution is currently slow on them anyway.
        nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
        pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)

@pushdiscovery('bookmarks')
def _pushdiscoverybookmarks(pushop):
    ui = pushop.ui
    repo = pushop.repo.unfiltered()
    remote = pushop.remote
    ui.debug("checking for updated bookmarks\n")
    ancestors = ()
    if pushop.revs:
        revnums = map(repo.changelog.rev, pushop.revs)
        ancestors = repo.changelog.ancestors(revnums, inclusive=True)
    remotebookmark = remote.listkeys('bookmarks')

    explicit = set(pushop.bookmarks)

    comp = bookmod.compare(repo, repo._bookmarks, remotebookmark, srchex=hex)
    addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp
    for b, scid, dcid in advsrc:
        if b in explicit:
            explicit.remove(b)
        if not ancestors or repo[scid].rev() in ancestors:
            pushop.outbookmarks.append((b, dcid, scid))
    # search for added bookmarks
    for b, scid, dcid in addsrc:
        if b in explicit:
            explicit.remove(b)
        pushop.outbookmarks.append((b, '', scid))
    # search for overwritten bookmarks
    for b, scid, dcid in advdst + diverge + differ:
        if b in explicit:
            explicit.remove(b)
        pushop.outbookmarks.append((b, dcid, scid))
    # search for bookmarks to delete
    for b, scid, dcid in adddst:
        if b in explicit:
            explicit.remove(b)
        # treat as "deleted locally"
        pushop.outbookmarks.append((b, dcid, ''))
    # identical bookmarks shouldn't get reported
    for b, scid, dcid in same:
        if b in explicit:
            explicit.remove(b)

    if explicit:
        explicit = sorted(explicit)
        # we should probably list all of them
        ui.warn(_('bookmark %s does not exist on the local '
                  'or remote repository!\n') % explicit[0])
        pushop.bkresult = 2

    pushop.outbookmarks.sort()

def _pushcheckoutgoing(pushop):
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    if not outgoing.missing:
        # nothing to push
        scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
        return False
    # something to push
    if not pushop.force:
        # if repo.obsstore == False --> no obsolete markers
        # then, save the iteration
        if unfi.obsstore:
            # these messages are here for the 80-char limit reason
            mso = _("push includes obsolete changeset: %s!")
            mst = {"unstable": _("push includes unstable changeset: %s!"),
                   "bumped": _("push includes bumped changeset: %s!"),
                   "divergent": _("push includes divergent changeset: %s!")}
            # If we are pushing and there is at least one obsolete or
            # unstable changeset in missing, then at least one of the
            # missing heads will be obsolete or unstable. So checking
            # heads only is ok.
            for node in outgoing.missingheads:
                ctx = unfi[node]
                if ctx.obsolete():
                    raise util.Abort(mso % ctx)
                elif ctx.troubled():
                    raise util.Abort(mst[ctx.troubles()[0]] % ctx)
        newbm = pushop.ui.configlist('bookmarks', 'pushing')
        discovery.checkheads(unfi, pushop.remote, outgoing,
                             pushop.remoteheads,
                             pushop.newbranch,
                             bool(pushop.incoming),
                             newbm)
    return True

# List of names of steps to perform for an outgoing bundle2, order matters.
b2partsgenorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
b2partsgenmapping = {}

def b2partsgenerator(stepname, idx=None):
    """decorator for functions generating bundle2 parts

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for new steps; if you want to wrap a step
    from an extension, modify the b2partsgenmapping dictionary directly."""
    def dec(func):
        assert stepname not in b2partsgenmapping
        b2partsgenmapping[stepname] = func
        if idx is None:
            b2partsgenorder.append(stepname)
        else:
            b2partsgenorder.insert(idx, stepname)
        return func
    return dec
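
# A minimal sketch (hypothetical extension code) of using the idx parameter
# to register a part generator at a specific position, here just before the
# 'changeset' step; the step name and function below are made up:
#
#     @b2partsgenerator('myext:precheck',
#                       idx=b2partsgenorder.index('changeset'))
#     def _pushb2precheck(pushop, bundler):
#         if 'myext:precheck' in pushop.stepsdone:
#             return
#         pushop.stepsdone.add('myext:precheck')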

@b2partsgenerator('changeset')
def _pushb2ctx(pushop, bundler):
    """handle changegroup push through bundle2

    addchangegroup result is stored in the ``pushop.cgresult`` attribute.
    """
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    # Send known heads to the server for race detection.
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    if not pushop.force:
        bundler.newpart('check:heads', data=iter(pushop.remoteheads))
    b2caps = bundle2.bundle2caps(pushop.remote)
    version = None
    cgversions = b2caps.get('changegroup')
    if not cgversions: # 3.1 and 3.2 ship with an empty value
        cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
                                                pushop.outgoing)
    else:
        cgversions = [v for v in cgversions if v in changegroup.packermap]
        if not cgversions:
            raise ValueError(_('no common changegroup version'))
        version = max(cgversions)
        cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
                                                pushop.outgoing,
                                                version=version)
    cgpart = bundler.newpart('changegroup', data=cg)
    if version is not None:
        cgpart.addparam('version', version)
    def handlereply(op):
        """extract addchangegroup returns from server reply"""
        cgreplies = op.records.getreplies(cgpart.id)
        assert len(cgreplies['changegroup']) == 1
        pushop.cgresult = cgreplies['changegroup'][0]['return']
    return handlereply

@b2partsgenerator('phase')
def _pushb2phases(pushop, bundler):
    """handle phase push through bundle2"""
    if 'phases' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    if 'pushkey' not in b2caps:
        return
    pushop.stepsdone.add('phases')
    part2node = []
    enc = pushkey.encode
    for newremotehead in pushop.outdatedphases:
        part = bundler.newpart('pushkey')
        part.addparam('namespace', enc('phases'))
        part.addparam('key', enc(newremotehead.hex()))
        part.addparam('old', enc(str(phases.draft)))
        part.addparam('new', enc(str(phases.public)))
        part2node.append((part.id, newremotehead))
    def handlereply(op):
        for partid, node in part2node:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            msg = None
            if not results:
                msg = _('server ignored update of %s to public!\n') % node
            elif not int(results[0]['return']):
                msg = _('updating %s to public failed!\n') % node
            if msg is not None:
                pushop.ui.warn(msg)
    return handlereply

@b2partsgenerator('obsmarkers')
def _pushb2obsmarkers(pushop, bundler):
    if 'obsmarkers' in pushop.stepsdone:
        return
    remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
    if obsolete.commonversion(remoteversions) is None:
        return
    pushop.stepsdone.add('obsmarkers')
    if pushop.outobsmarkers:
        buildobsmarkerspart(bundler, pushop.outobsmarkers)

@b2partsgenerator('bookmarks')
def _pushb2bookmarks(pushop, bundler):
    """handle bookmark push through bundle2"""
    if 'bookmarks' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    if 'pushkey' not in b2caps:
        return
    pushop.stepsdone.add('bookmarks')
    part2book = []
    enc = pushkey.encode
    for book, old, new in pushop.outbookmarks:
        part = bundler.newpart('pushkey')
        part.addparam('namespace', enc('bookmarks'))
        part.addparam('key', enc(book))
        part.addparam('old', enc(old))
        part.addparam('new', enc(new))
        action = 'update'
        if not old:
            action = 'export'
        elif not new:
            action = 'delete'
        part2book.append((part.id, book, action))

    def handlereply(op):
        ui = pushop.ui
        for partid, book, action in part2book:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            if not results:
                pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
            else:
                ret = int(results[0]['return'])
                if ret:
                    ui.status(bookmsgmap[action][0] % book)
                else:
                    ui.warn(bookmsgmap[action][1] % book)
                    if pushop.bkresult is not None:
                        pushop.bkresult = 1
    return handlereply

def _pushbundle2(pushop):
    """push data to the remote using bundle2

    The only currently supported type of data is changegroup but this will
    evolve in the future."""
    bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
    pushback = (pushop.trmanager
                and pushop.ui.configbool('experimental', 'bundle2.pushback'))

    # create reply capability
    capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo,
                                                      allowpushback=pushback))
    bundler.newpart('replycaps', data=capsblob)
    replyhandlers = []
    for partgenname in b2partsgenorder:
        partgen = b2partsgenmapping[partgenname]
        ret = partgen(pushop, bundler)
        if callable(ret):
            replyhandlers.append(ret)
    # do not push if nothing to push
    if bundler.nbparts <= 1:
        return
    stream = util.chunkbuffer(bundler.getchunks())
    try:
        reply = pushop.remote.unbundle(stream, ['force'], 'push')
    except error.BundleValueError, exc:
        raise util.Abort('missing support for %s' % exc)
    try:
        trgetter = None
        if pushback:
            trgetter = pushop.trmanager.transaction
        op = bundle2.processbundle(pushop.repo, reply, trgetter)
    except error.BundleValueError, exc:
        raise util.Abort('missing support for %s' % exc)
    for rephand in replyhandlers:
        rephand(op)

def _pushchangeset(pushop):
    """Make the actual push of changeset bundle to remote repo"""
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    outgoing = pushop.outgoing
    unbundle = pushop.remote.capable('unbundle')
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # create a changegroup from local
    if pushop.revs is None and not (outgoing.excluded
                                    or pushop.repo.changelog.filteredrevs):
        # push everything,
        # use the fast path, no race possible on push
        bundler = changegroup.cg1packer(pushop.repo, bundlecaps)
        cg = changegroup.getsubset(pushop.repo,
                                   outgoing,
                                   bundler,
                                   'push',
                                   fastpath=True)
    else:
        cg = changegroup.getlocalchangegroup(pushop.repo, 'push', outgoing,
                                             bundlecaps)

    # apply changegroup to remote
    if unbundle:
        # local repo finds heads on server, finds out what
        # revs it must push. once revs transferred, if server
        # finds it has different heads (someone else won
        # commit/push race), server aborts.
        if pushop.force:
            remoteheads = ['force']
        else:
            remoteheads = pushop.remoteheads
        # ssh: return remote's addchangegroup()
        # http: return remote's addchangegroup() or 0 for error
        pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
                                                 pushop.repo.url())
    else:
        # we return an integer indicating remote head count
        # change
        pushop.cgresult = pushop.remote.addchangegroup(cg, 'push',
                                                       pushop.repo.url())

def _pushsyncphase(pushop):
    """synchronise phase information locally and remotely"""
    cheads = pushop.commonheads
    # even when we don't push, exchanging phase data is useful
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo', False)
        and remotephases    # server supports phases
        and pushop.cgresult is None # nothing was pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and remote supports phases
        # - and no changeset was pushed
        # - and remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    if not remotephases: # old server or public-only reply from non-publishing
        _localphasemove(pushop, cheads)
        # don't push any phase data as there is nothing to push
    else:
        ana = phases.analyzeremotephases(pushop.repo, cheads,
                                         remotephases)
        pheads, droots = ana
        ### Apply remote phase on local
        if remotephases.get('publishing', False):
            _localphasemove(pushop, cheads)
        else: # publish = False
            _localphasemove(pushop, pheads)
            _localphasemove(pushop, cheads, phases.draft)
        ### Apply local phase on remote

        if pushop.cgresult:
            if 'phases' in pushop.stepsdone:
                # phases already pushed through bundle2
                return
            outdated = pushop.outdatedphases
        else:
            outdated = pushop.fallbackoutdatedphases

        pushop.stepsdone.add('phases')

        # filter heads already turned public by the push
        outdated = [c for c in outdated if c.node() not in pheads]
        # fallback to independent pushkey command
        for newremotehead in outdated:
            r = pushop.remote.pushkey('phases',
                                      newremotehead.hex(),
                                      str(phases.draft),
                                      str(phases.public))
            if not r:
                pushop.ui.warn(_('updating %s to public failed!\n')
                               % newremotehead)

def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo"""
    if pushop.trmanager:
        phases.advanceboundary(pushop.repo,
                               pushop.trmanager.transaction(),
                               phase,
                               nodes)
    else:
        # repo is not locked, do not change any phases!
        # Inform the user that phases should have been moved when
        # applicable.
        actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
        phasestr = phases.phasenames[phase]
        if actualmoves:
            pushop.ui.status(_('cannot lock source repo, skipping '
                               'local %s phase update\n') % phasestr)

def _pushobsolete(pushop):
    """utility function to push obsolete markers to a remote"""
    if 'obsmarkers' in pushop.stepsdone:
        return
    pushop.ui.debug('try to push obsolete markers to remote\n')
    repo = pushop.repo
    remote = pushop.remote
    pushop.stepsdone.add('obsmarkers')
    if pushop.outobsmarkers:
        rslts = []
        remotedata = obsolete._pushkeyescape(pushop.outobsmarkers)
        for key in sorted(remotedata, reverse=True):
            # reverse sort to ensure we end with dump0
            data = remotedata[key]
            rslts.append(remote.pushkey('obsolete', key, '', data))
        if [r for r in rslts if not r]:
            msg = _('failed to push some obsolete markers!\n')
            repo.ui.warn(msg)

def _pushbookmark(pushop):
    """Update bookmark position on remote"""
    if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
        return
    pushop.stepsdone.add('bookmarks')
    ui = pushop.ui
    remote = pushop.remote

    for b, old, new in pushop.outbookmarks:
        action = 'update'
        if not old:
            action = 'export'
        elif not new:
            action = 'delete'
        if remote.pushkey('bookmarks', b, old, new):
            ui.status(bookmsgmap[action][0] % b)
        else:
            ui.warn(bookmsgmap[action][1] % b)
            # discovery can have set the value from an invalid entry
            if pushop.bkresult is not None:
                pushop.bkresult = 1

class pulloperation(object):
    """An object that represents a single pull operation

    Its purpose is to carry pull-related state and very common operations.

    A new pulloperation should be created at the beginning of each pull and
    discarded afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False, bookmarks=()):
        # repo we pull into
        self.repo = repo
        # repo we pull from
        self.remote = remote
        # revisions we try to pull (None is "all")
        self.heads = heads
        # bookmarks pulled explicitly
        self.explicitbookmarks = bookmarks
        # do we force pull?
        self.force = force
        # transaction manager
        self.trmanager = None
        # set of common changesets between local and remote before pull
        self.common = None
        # set of pulled heads
        self.rheads = None
        # list of missing changesets to fetch remotely
        self.fetch = None
        # remote bookmarks data
        self.remotebookmarks = None
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # set of steps already done
        self.stepsdone = set()

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changesets targeted by the pull"""
        # compute target subset
        if self.heads is None:
            # We pulled everything possible
            # sync on everything common
            c = set(self.common)
            ret = list(self.common)
            for n in self.rheads:
                if n not in c:
                    ret.append(n)
            return ret
        else:
            # We pulled a specific subset
            # sync on this subset
            return self.heads

    def gettransaction(self):
        # deprecated; talk to trmanager directly
        return self.trmanager.transaction()

class transactionmanager(object):
    """An object to manage the life cycle of a transaction

    It creates the transaction on demand and calls the appropriate hooks when
    closing the transaction."""
    def __init__(self, repo, source, url):
        self.repo = repo
        self.source = source
        self.url = url
        self._tr = None

    def transaction(self):
        """Return an open transaction object, constructing if necessary"""
        if not self._tr:
            trname = '%s\n%s' % (self.source, util.hidepassword(self.url))
            self._tr = self.repo.transaction(trname)
            self._tr.hookargs['source'] = self.source
            self._tr.hookargs['url'] = self.url
        return self._tr

    def close(self):
        """close transaction if created"""
        if self._tr is not None:
            self._tr.close()

    def release(self):
        """release transaction if created"""
        if self._tr is not None:
            self._tr.release()

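# Usage sketch for transactionmanager, mirroring what pull() below does: the
# transaction only materializes on the first transaction() call, close()
# commits it, and release() aborts it unless it was closed first:
#
#     trmanager = transactionmanager(repo, 'pull', remote.url())
#     try:
#         ...                   # code that may call trmanager.transaction()
#         trmanager.close()
#     finally:
#         trmanager.release()
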
def pull(repo, remote, heads=None, force=False, bookmarks=()):
    pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks)
    if pullop.remote.local():
        missing = set(pullop.remote.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    pullop.remotebookmarks = remote.listkeys('bookmarks')
    lock = pullop.repo.lock()
    try:
        pullop.trmanager = transactionmanager(repo, 'pull', remote.url())
        _pulldiscovery(pullop)
        if _canusebundle2(pullop):
            _pullbundle2(pullop)
        _pullchangeset(pullop)
        _pullphase(pullop)
        _pullbookmarks(pullop)
        _pullobsolete(pullop)
        pullop.trmanager.close()
    finally:
        pullop.trmanager.release()
        lock.release()

    return pullop

# list of steps to perform discovery before pull
pulldiscoveryorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
pulldiscoverymapping = {}

def pulldiscovery(stepname):
    """decorator for functions performing discovery before pull

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for a new step; if you want to wrap a step
    from an extension, change the pulldiscoverymapping dictionary directly."""
    def dec(func):
        assert stepname not in pulldiscoverymapping
        pulldiscoverymapping[stepname] = func
        pulldiscoveryorder.append(stepname)
        return func
    return dec

def _pulldiscovery(pullop):
    """Run all discovery steps"""
    for stepname in pulldiscoveryorder:
        step = pulldiscoverymapping[stepname]
        step(pullop)

@pulldiscovery('changegroup')
def _pulldiscoverychangegroup(pullop):
    """discovery phase for the pull

    Currently handles changeset discovery only; will handle all discovery
    at some point."""
    tmp = discovery.findcommonincoming(pullop.repo,
                                       pullop.remote,
                                       heads=pullop.heads,
                                       force=pullop.force)
    common, fetch, rheads = tmp
    nm = pullop.repo.unfiltered().changelog.nodemap
    if fetch and rheads:
        # If a remote head is filtered locally, let's drop it from the
        # unknown remote heads and put it back in common.
        #
        # This is a hackish solution to catch most of the "common but
        # locally hidden" situations. We do not perform discovery on the
        # unfiltered repository because it ends up doing a pathological
        # amount of round trips for a huge amount of changesets we do not
        # care about.
        #
        # If a set of such "common but filtered" changesets exists on the
        # server but does not include a remote head, we'll not be able to
        # detect it.
        scommon = set(common)
        filteredrheads = []
        for n in rheads:
            if n in nm:
                if n not in scommon:
                    common.append(n)
            else:
                filteredrheads.append(n)
        if not filteredrheads:
            fetch = []
        rheads = filteredrheads
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads

def _pullbundle2(pullop):
    """pull data using bundle2

    For now, the only supported data is the changegroup."""
    remotecaps = bundle2.bundle2caps(pullop.remote)
    kwargs = {'bundlecaps': caps20to10(pullop.repo)}
    # pulling changegroup
    pullop.stepsdone.add('changegroup')

    kwargs['common'] = pullop.common
    kwargs['heads'] = pullop.heads or pullop.rheads
    kwargs['cg'] = pullop.fetch
    if 'listkeys' in remotecaps:
        kwargs['listkeys'] = ['phase', 'bookmarks']
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
    else:
        if pullop.heads is None and list(pullop.common) == [nullid]:
            pullop.repo.ui.status(_("requesting all changes\n"))
    if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
        remoteversions = bundle2.obsmarkersversion(remotecaps)
        if obsolete.commonversion(remoteversions) is not None:
            kwargs['obsmarkers'] = True
            pullop.stepsdone.add('obsmarkers')
    _pullbundle2extraprepare(pullop, kwargs)
    bundle = pullop.remote.getbundle('pull', **kwargs)
    try:
        op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
    except error.BundleValueError, exc:
        raise util.Abort('missing support for %s' % exc)

    if pullop.fetch:
        results = [cg['return'] for cg in op.records['changegroup']]
        pullop.cgresult = changegroup.combineresults(results)

    # process phase changes
    for namespace, value in op.records['listkeys']:
        if namespace == 'phases':
            _pullapplyphases(pullop, value)

    # process bookmark updates
    for namespace, value in op.records['listkeys']:
        if namespace == 'bookmarks':
            pullop.remotebookmarks = value
            _pullbookmarks(pullop)

def _pullbundle2extraprepare(pullop, kwargs):
    """hook function so that extensions can extend the getbundle call"""
    pass

def _pullchangeset(pullop):
    """pull changeset from unbundle into the local repo"""
    # We delay the open of the transaction as late as possible so we
    # don't open a transaction for nothing and don't break a future
    # useful rollback call
    if 'changegroup' in pullop.stepsdone:
        return
    pullop.stepsdone.add('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    if pullop.remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = pullop.remote.getbundle('pull', common=pullop.common,
                                     heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = pullop.remote.changegroup(pullop.fetch, 'pull')
    elif not pullop.remote.capable('changegroupsubset'):
        raise util.Abort(_("partial pull cannot be done because "
                           "other repository doesn't support "
                           "changegroupsubset."))
    else:
        cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
                                                 pullop.remote.url())

def _pullphase(pullop):
    # Get remote phases data from remote
    if 'phases' in pullop.stepsdone:
        return
    remotephases = pullop.remote.listkeys('phases')
    _pullapplyphases(pullop, remotephases)

def _pullapplyphases(pullop, remotephases):
    """apply phase movement from observed remote state"""
    if 'phases' in pullop.stepsdone:
        return
    pullop.stepsdone.add('phases')
    publishing = bool(remotephases.get('publishing', False))
    if remotephases and not publishing:
        # remote is new and non-publishing
        pheads, _dr = phases.analyzeremotephases(pullop.repo,
                                                 pullop.pulledsubset,
                                                 remotephases)
        dheads = pullop.pulledsubset
    else:
        # Remote is old or publishing; all common changesets
        # should be seen as public
        pheads = pullop.pulledsubset
        dheads = []
    unfi = pullop.repo.unfiltered()
    phase = unfi._phasecache.phase
    rev = unfi.changelog.nodemap.get
    public = phases.public
    draft = phases.draft

    # exclude changesets already public locally and update the others
    pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
    if pheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, public, pheads)

    # exclude changesets already draft locally and update the others
    dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
    if dheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, draft, dheads)

def _pullbookmarks(pullop):
    """process the remote bookmark information to update the local one"""
    if 'bookmarks' in pullop.stepsdone:
        return
    pullop.stepsdone.add('bookmarks')
    repo = pullop.repo
    remotebookmarks = pullop.remotebookmarks
    bookmod.updatefromremote(repo.ui, repo, remotebookmarks,
                             pullop.remote.url(),
                             pullop.gettransaction,
                             explicit=pullop.explicitbookmarks)

def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    `gettransaction` is a function that returns the pull transaction,
    creating one if necessary. We return the transaction to inform the
    calling code that a new transaction has been created (when applicable).

    Exists mostly to allow overriding for experimentation purposes"""
    if 'obsmarkers' in pullop.stepsdone:
        return
    pullop.stepsdone.add('obsmarkers')
    tr = None
    if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
        pullop.repo.ui.debug('fetching remote obsolete markers\n')
        remoteobs = pullop.remote.listkeys('obsolete')
        if 'dump0' in remoteobs:
            tr = pullop.gettransaction()
            for key in sorted(remoteobs, reverse=True):
                if key.startswith('dump'):
                    data = base85.b85decode(remoteobs[key])
                    pullop.repo.obsstore.mergemarkers(tr, data)
            pullop.repo.invalidatevolatilesets()
    return tr

def caps20to10(repo):
    """return a set with appropriate options to use bundle20 during getbundle"""
    caps = set(['HG20'])
    capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
    caps.add('bundle2=' + urllib.quote(capsblob))
    return caps
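
# For illustration only, the returned set looks roughly like the following
# (the urlquoted capabilities blob is abbreviated and repo-dependent):
#
#     set(['HG20', 'bundle2=HG20%0Achangegroup%3D...'])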

# List of names of steps to perform for a bundle2 for getbundle, order matters.
getbundle2partsorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
getbundle2partsmapping = {}

def getbundle2partsgenerator(stepname, idx=None):
    """decorator for functions generating bundle2 parts for getbundle

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for new steps; if you want to wrap a step
    from an extension, modify the getbundle2partsmapping dictionary directly."""
    def dec(func):
        assert stepname not in getbundle2partsmapping
        getbundle2partsmapping[stepname] = func
        if idx is None:
            getbundle2partsorder.append(stepname)
        else:
            getbundle2partsorder.insert(idx, stepname)
        return func
    return dec
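
# A minimal sketch (hypothetical extension code) of a getbundle part
# generator; it must accept the keyword arguments that the built-in
# generators below receive:
#
#     @getbundle2partsgenerator('myext:extradata')
#     def _getbundleextradatapart(bundler, repo, source, bundlecaps=None,
#                                 b2caps=None, **kwargs):
#         part = bundler.newpart('myext:extradata')
#         part.addparam('source', source)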

def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
              **kwargs):
    """return a full bundle (with potentially multiple kinds of parts)

    Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
    passed. For now, the bundle can contain only changegroup, but this will
    change when more part types become available for bundle2.

    This is different from changegroup.getchangegroup that only returns an
    HG10 changegroup bundle. They may eventually get reunited in the future
    when we have a clearer idea of the API we want to use to query different
    data.

    The implementation is at a very early stage and will get massive rework
    when the API of bundle is refined.
    """
    # bundle10 case
    usebundle2 = False
    if bundlecaps is not None:
        usebundle2 = util.any((cap.startswith('HG2') for cap in bundlecaps))
    if not usebundle2:
        if bundlecaps and not kwargs.get('cg', True):
            raise ValueError(_('request for bundle10 must include changegroup'))

        if kwargs:
            raise ValueError(_('unsupported getbundle arguments: %s')
                             % ', '.join(sorted(kwargs.keys())))
        return changegroup.getchangegroup(repo, source, heads=heads,
                                          common=common, bundlecaps=bundlecaps)

    # bundle20 case
    b2caps = {}
    for bcaps in bundlecaps:
        if bcaps.startswith('bundle2='):
            blob = urllib.unquote(bcaps[len('bundle2='):])
            b2caps.update(bundle2.decodecaps(blob))
    bundler = bundle2.bundle20(repo.ui, b2caps)

    kwargs['heads'] = heads
    kwargs['common'] = common

    for name in getbundle2partsorder:
        func = getbundle2partsmapping[name]
        func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
             **kwargs)

    return util.chunkbuffer(bundler.getchunks())
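
# Sketch of a getbundle call as a bundle2-capable client would trigger it
# (values are illustrative; the bundlecaps mirror what caps20to10()
# advertises):
#
#     stream = getbundle(repo, 'serve', heads=repo.heads(), common=[nullid],
#                        bundlecaps=caps20to10(repo), cg=True,
#                        listkeys=['phase', 'bookmarks'])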

@getbundle2partsgenerator('changegroup')
def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
                              b2caps=None, heads=None, common=None, **kwargs):
    """add a changegroup part to the requested bundle"""
    cg = None
    if kwargs.get('cg', True):
        # build changegroup bundle here.
        version = None
        cgversions = b2caps.get('changegroup')
        if not cgversions: # 3.1 and 3.2 ship with an empty value
            cg = changegroup.getchangegroupraw(repo, source, heads=heads,
                                               common=common,
                                               bundlecaps=bundlecaps)
        else:
            cgversions = [v for v in cgversions if v in changegroup.packermap]
            if not cgversions:
                raise ValueError(_('no common changegroup version'))
            version = max(cgversions)
            cg = changegroup.getchangegroupraw(repo, source, heads=heads,
                                               common=common,
                                               bundlecaps=bundlecaps,
                                               version=version)

    if cg:
        part = bundler.newpart('changegroup', data=cg)
        if version is not None:
            part.addparam('version', version)

@getbundle2partsgenerator('listkeys')
def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
                            b2caps=None, **kwargs):
    """add parts containing listkeys namespaces to the requested bundle"""
    listkeys = kwargs.get('listkeys', ())
    for namespace in listkeys:
        part = bundler.newpart('listkeys')
        part.addparam('namespace', namespace)
        keys = repo.listkeys(namespace).items()
        part.data = pushkey.encodekeys(keys)

@getbundle2partsgenerator('obsmarkers')
def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
                            b2caps=None, heads=None, **kwargs):
    """add an obsolescence markers part to the requested bundle"""
    if kwargs.get('obsmarkers', False):
        if heads is None:
            heads = repo.heads()
        subset = [c.node() for c in repo.set('::%ln', heads)]
        markers = repo.obsstore.relevantmarkers(subset)
        buildobsmarkerspart(bundler, markers)

def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling.
    """
    heads = repo.heads()
    heads_hash = util.sha1(''.join(sorted(heads))).digest()
    if not (their_heads == ['force'] or their_heads == heads or
            their_heads == ['hashed', heads_hash]):
        # someone else committed/pushed/unbundled while we
        # were transferring data
        raise error.PushRaced('repository changed while %s - '
                              'please try again' % context)

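# Sketch of the check_heads contract from a caller's side: a peer that only
# transmitted a hash of the expected remote heads passes the same digest that
# check_heads computes (remoteheads here is a hypothetical list of nodes):
#
#     expected = util.sha1(''.join(sorted(remoteheads))).digest()
#     unbundle(repo, cg, ['hashed', expected], 'push', url)
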
def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    This function makes sure the repo is locked during the application and
    has a mechanism to check that no push race occurred between the creation
    of the bundle and its application.

    If the push was raced, a PushRaced exception is raised."""
    r = 0
    # need a transaction when processing a bundle2 stream
    wlock = lock = tr = None
    try:
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        if util.safehasattr(cg, 'params'):
            r = None
            try:
                wlock = repo.wlock()
                lock = repo.lock()
                tr = repo.transaction(source)
                tr.hookargs['source'] = source
                tr.hookargs['url'] = url
                tr.hookargs['bundle2'] = '1'
                r = bundle2.processbundle(repo, cg, lambda: tr).reply
                tr.close()
            except Exception, exc:
                exc.duringunbundle2 = True
                if r is not None:
                    # processing already produced a reply bundle; keep its
                    # salvageable output on the exception for the caller
                    exc._bundle2salvagedoutput = r.salvageoutput()
                raise
        else:
            lock = repo.lock()
            r = changegroup.addchangegroup(repo, cg, source, url)
    finally:
        lockmod.release(tr, lock, wlock)
    return r