exchange: obtain compression engines from the registrar...
Gregory Szorc
r30440:c3944ab1 default
@@ -1,1956 +1,1951 @@
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import hashlib
12 12
13 13 from .i18n import _
14 14 from .node import (
15 15 hex,
16 16 nullid,
17 17 )
18 18 from . import (
19 19 base85,
20 20 bookmarks as bookmod,
21 21 bundle2,
22 22 changegroup,
23 23 discovery,
24 24 error,
25 25 lock as lockmod,
26 26 obsolete,
27 27 phases,
28 28 pushkey,
29 29 scmutil,
30 30 sslutil,
31 31 streamclone,
32 32 tags,
33 33 url as urlmod,
34 34 util,
35 35 )
36 36
37 37 urlerr = util.urlerr
38 38 urlreq = util.urlreq
39 39
40 # Maps bundle compression human names to internal representation.
41 _bundlespeccompressions = {'none': None,
42 'bzip2': 'BZ',
43 'gzip': 'GZ',
44 }
45
46 40 # Maps bundle version human names to changegroup versions.
47 41 _bundlespeccgversions = {'v1': '01',
48 42 'v2': '02',
49 43 'packed1': 's1',
50 44 'bundle2': '02', #legacy
51 45 }
52 46
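
Reviewer note: this change replaces the hard-coded `_bundlespeccompressions` map with lookups on the compression engine registrar. A minimal sketch of the registrar surface this file now relies on — `supportedbundlenames`, `forbundlename()`, `forbundletype()`, and `engine.bundletype()` returning a `(bundlename, internaltype)` pair — follows; the class and attribute shapes are assumptions for illustration, not the actual util.compengines implementation.

class _sketchengine(object):
    """Hypothetical engine exposing only what exchange.py uses."""
    def __init__(self, bundlename, internaltype):
        self._b = bundlename       # e.g. 'gzip'
        self._t = internaltype     # e.g. 'GZ'
    def bundletype(self):
        # (human bundle spec name, internal bundle identifier)
        return self._b, self._t

class _sketchregistrar(object):
    """Hypothetical registrar mirroring the util.compengines calls below."""
    def __init__(self):
        self._bynames = {}   # 'gzip' -> engine
        self._bytypes = {}   # 'GZ' -> engine
    def register(self, engine):
        bundlename, bundletype = engine.bundletype()
        self._bynames[bundlename] = engine
        self._bytypes[bundletype] = engine
    @property
    def supportedbundlenames(self):
        return set(self._bynames)
    def forbundlename(self, bundlename):
        return self._bynames[bundlename]   # KeyError if unregistered
    def forbundletype(self, bundletype):
        return self._bytypes[bundletype]   # KeyError if unregistered
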
53 47 def parsebundlespec(repo, spec, strict=True, externalnames=False):
54 48 """Parse a bundle string specification into parts.
55 49
56 50 Bundle specifications denote a well-defined bundle/exchange format.
57 51 The content of a given specification should not change over time in
58 52 order to ensure that bundles produced by a newer version of Mercurial are
59 53 readable from an older version.
60 54
61 55 The string currently has the form:
62 56
63 57 <compression>-<type>[;<parameter0>[;<parameter1>]]
64 58
65 59 Where <compression> is one of the supported compression formats
66 60 and <type> is (currently) a version string. A ";" can follow the type and
67 61 all text afterwards is interpreted as URI encoded, ";" delimited key=value
68 62 pairs.
69 63
70 64 If ``strict`` is True (the default) <compression> is required. Otherwise,
71 65 it is optional.
72 66
73 67 If ``externalnames`` is False (the default), the human-centric names will
74 68 be converted to their internal representation.
75 69
76 70 Returns a 3-tuple of (compression, version, parameters). Compression will
77 71 be ``None`` if not in strict mode and a compression isn't defined.
78 72
79 73 An ``InvalidBundleSpecification`` is raised when the specification is
80 74 not syntactically well formed.
81 75
82 76 An ``UnsupportedBundleSpecification`` is raised when the compression or
83 77 bundle type/version is not recognized.
84 78
85 79 Note: this function will likely eventually return a more complex data
86 80 structure, including bundle2 part information.
87 81 """
88 82 def parseparams(s):
89 83 if ';' not in s:
90 84 return s, {}
91 85
92 86 params = {}
93 87 version, paramstr = s.split(';', 1)
94 88
95 89 for p in paramstr.split(';'):
96 90 if '=' not in p:
97 91 raise error.InvalidBundleSpecification(
98 92 _('invalid bundle specification: '
99 93 'missing "=" in parameter: %s') % p)
100 94
101 95 key, value = p.split('=', 1)
102 96 key = urlreq.unquote(key)
103 97 value = urlreq.unquote(value)
104 98 params[key] = value
105 99
106 100 return version, params
107 101
108 102
109 103 if strict and '-' not in spec:
110 104 raise error.InvalidBundleSpecification(
111 105 _('invalid bundle specification; '
112 106 'must be prefixed with compression: %s') % spec)
113 107
114 108 if '-' in spec:
115 109 compression, version = spec.split('-', 1)
116 110
117 if compression not in _bundlespeccompressions:
111 if compression not in util.compengines.supportedbundlenames:
118 112 raise error.UnsupportedBundleSpecification(
119 113 _('%s compression is not supported') % compression)
120 114
121 115 version, params = parseparams(version)
122 116
123 117 if version not in _bundlespeccgversions:
124 118 raise error.UnsupportedBundleSpecification(
125 119 _('%s is not a recognized bundle version') % version)
126 120 else:
127 121 # Value could be just the compression or just the version, in which
128 122 # case some defaults are assumed (but only when not in strict mode).
129 123 assert not strict
130 124
131 125 spec, params = parseparams(spec)
132 126
133 if spec in _bundlespeccompressions:
127 if spec in util.compengines.supportedbundlenames:
134 128 compression = spec
135 129 version = 'v1'
136 130 if 'generaldelta' in repo.requirements:
137 131 version = 'v2'
138 132 elif spec in _bundlespeccgversions:
139 133 if spec == 'packed1':
140 134 compression = 'none'
141 135 else:
142 136 compression = 'bzip2'
143 137 version = spec
144 138 else:
145 139 raise error.UnsupportedBundleSpecification(
146 140 _('%s is not a recognized bundle specification') % spec)
147 141
148 142 # The specification for packed1 can optionally declare the data formats
149 143 # required to apply it. If we see this metadata, compare against what the
150 144 # repo supports and error if the bundle isn't compatible.
151 145 if version == 'packed1' and 'requirements' in params:
152 146 requirements = set(params['requirements'].split(','))
153 147 missingreqs = requirements - repo.supportedformats
154 148 if missingreqs:
155 149 raise error.UnsupportedBundleSpecification(
156 150 _('missing support for repository features: %s') %
157 151 ', '.join(sorted(missingreqs)))
158 152
159 153 if not externalnames:
160 compression = _bundlespeccompressions[compression]
154 engine = util.compengines.forbundlename(compression)
155 compression = engine.bundletype()[1]
161 156 version = _bundlespeccgversions[version]
162 157 return compression, version, params
163 158
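
Reviewer note: the external contract of parsebundlespec() is unchanged by this patch. A sketch of expected results, assuming the stock 'gzip'/'GZ' engine mapping and a repo whose supportedformats include generaldelta:

# parsebundlespec(repo, 'gzip-v2')
#   -> ('GZ', '02', {})    # internal names by default
# parsebundlespec(repo, 'none-packed1;requirements=generaldelta',
#                 externalnames=True)
#   -> ('none', 'packed1', {'requirements': 'generaldelta'})
# parsebundlespec(repo, 'v2')  # strict=True (default)
#   -> raises InvalidBundleSpecification (no compression prefix)
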
164 159 def readbundle(ui, fh, fname, vfs=None):
165 160 header = changegroup.readexactly(fh, 4)
166 161
167 162 alg = None
168 163 if not fname:
169 164 fname = "stream"
170 165 if not header.startswith('HG') and header.startswith('\0'):
171 166 fh = changegroup.headerlessfixup(fh, header)
172 167 header = "HG10"
173 168 alg = 'UN'
174 169 elif vfs:
175 170 fname = vfs.join(fname)
176 171
177 172 magic, version = header[0:2], header[2:4]
178 173
179 174 if magic != 'HG':
180 175 raise error.Abort(_('%s: not a Mercurial bundle') % fname)
181 176 if version == '10':
182 177 if alg is None:
183 178 alg = changegroup.readexactly(fh, 2)
184 179 return changegroup.cg1unpacker(fh, alg)
185 180 elif version.startswith('2'):
186 181 return bundle2.getunbundler(ui, fh, magicstring=magic + version)
187 182 elif version == 'S1':
188 183 return streamclone.streamcloneapplier(fh)
189 184 else:
190 185 raise error.Abort(_('%s: unknown bundle version %s') % (fname, version))
191 186
192 187 def getbundlespec(ui, fh):
193 188 """Infer the bundlespec from a bundle file handle.
194 189
195 190 The input file handle is seeked and the original seek position is not
196 191 restored.
197 192 """
198 193 def speccompression(alg):
199 for k, v in _bundlespeccompressions.items():
200 if v == alg:
201 return k
202 return None
194 try:
195 return util.compengines.forbundletype(alg).bundletype()[0]
196 except KeyError:
197 return None
203 198
204 199 b = readbundle(ui, fh, None)
205 200 if isinstance(b, changegroup.cg1unpacker):
206 201 alg = b._type
207 202 if alg == '_truncatedBZ':
208 203 alg = 'BZ'
209 204 comp = speccompression(alg)
210 205 if not comp:
211 206 raise error.Abort(_('unknown compression algorithm: %s') % alg)
212 207 return '%s-v1' % comp
213 208 elif isinstance(b, bundle2.unbundle20):
214 209 if 'Compression' in b.params:
215 210 comp = speccompression(b.params['Compression'])
216 211 if not comp:
217 212 raise error.Abort(_('unknown compression algorithm: %s') % comp)
218 213 else:
219 214 comp = 'none'
220 215
221 216 version = None
222 217 for part in b.iterparts():
223 218 if part.type == 'changegroup':
224 219 version = part.params['version']
225 220 if version in ('01', '02'):
226 221 version = 'v2'
227 222 else:
228 223 raise error.Abort(_('changegroup version %s does not have '
229 224 'a known bundlespec') % version,
230 225 hint=_('try upgrading your Mercurial '
231 226 'client'))
232 227
233 228 if not version:
234 229 raise error.Abort(_('could not identify changegroup version in '
235 230 'bundle'))
236 231
237 232 return '%s-%s' % (comp, version)
238 233 elif isinstance(b, streamclone.streamcloneapplier):
239 234 requirements = streamclone.readbundle1header(fh)[2]
240 235 params = 'requirements=%s' % ','.join(sorted(requirements))
241 236 return 'none-packed1;%s' % urlreq.quote(params)
242 237 else:
243 238 raise error.Abort(_('unknown bundle type: %s') % b)
244 239
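
Reviewer note: speccompression() formerly scanned `_bundlespeccompressions.items()` for a reverse match; it now asks the registrar for the engine owning the internal type. A sketch of the equivalence for the stock gzip engine (values assumed):

# old: next(k for k, v in {'gzip': 'GZ'}.items() if v == 'GZ')  -> 'gzip'
# new: util.compengines.forbundletype('GZ').bundletype()[0]     -> 'gzip'
# unknown types now raise KeyError, which is translated to None above.
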
245 240 def buildobsmarkerspart(bundler, markers):
246 241 """add an obsmarker part to the bundler with <markers>
247 242
248 243 No part is created if markers is empty.
249 244 Raises ValueError if the bundler doesn't support any known obsmarker format.
250 245 """
251 246 if markers:
252 247 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
253 248 version = obsolete.commonversion(remoteversions)
254 249 if version is None:
255 250 raise ValueError('bundler does not support common obsmarker format')
256 251 stream = obsolete.encodemarkers(markers, True, version=version)
257 252 return bundler.newpart('obsmarkers', data=stream)
258 253 return None
259 254
260 255 def _computeoutgoing(repo, heads, common):
261 256 """Computes which revs are outgoing given a set of common
262 257 and a set of heads.
263 258
264 259 This is a separate function so extensions can have access to
265 260 the logic.
266 261
267 262 Returns a discovery.outgoing object.
268 263 """
269 264 cl = repo.changelog
270 265 if common:
271 266 hasnode = cl.hasnode
272 267 common = [n for n in common if hasnode(n)]
273 268 else:
274 269 common = [nullid]
275 270 if not heads:
276 271 heads = cl.heads()
277 272 return discovery.outgoing(repo, common, heads)
278 273
279 274 def _forcebundle1(op):
280 275 """return true if a pull/push must use bundle1
281 276
282 277 This function is used to allow testing of the older bundle version"""
283 278 ui = op.repo.ui
284 279 forcebundle1 = False
285 280 # The goal of this config is to allow developers to choose the bundle
286 281 # version used during exchange. This is especially handy during tests.
287 282 # Value is a list of bundle versions to pick from; the highest version
288 283 # should be used.
289 284 #
290 285 # developer config: devel.legacy.exchange
291 286 exchange = ui.configlist('devel', 'legacy.exchange')
292 287 forcebundle1 = 'bundle2' not in exchange and 'bundle1' in exchange
293 288 return forcebundle1 or not op.remote.capable('bundle2')
294 289
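
Reviewer note: a worked example of the config logic above (values hypothetical):

# devel.legacy.exchange = bundle1          -> forcebundle1 is True; bundle1 used
# devel.legacy.exchange = bundle1 bundle2  -> 'bundle2' wins; capability decides
# devel.legacy.exchange = (unset)          -> falls back to remote.capable('bundle2')
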
295 290 class pushoperation(object):
296 291 """A object that represent a single push operation
297 292
298 293 Its purpose is to carry push related state and very common operations.
299 294
300 295 A new pushoperation should be created at the beginning of each push and
301 296 discarded afterward.
302 297 """
303 298
304 299 def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
305 300 bookmarks=()):
306 301 # repo we push from
307 302 self.repo = repo
308 303 self.ui = repo.ui
309 304 # repo we push to
310 305 self.remote = remote
311 306 # force option provided
312 307 self.force = force
313 308 # revs to be pushed (None is "all")
314 309 self.revs = revs
315 310 # bookmark explicitly pushed
316 311 self.bookmarks = bookmarks
317 312 # allow push of new branch
318 313 self.newbranch = newbranch
319 314 # did a local lock get acquired?
320 315 self.locallocked = None
321 316 # steps already performed
322 317 # (used to check what steps have already been performed through bundle2)
323 318 self.stepsdone = set()
324 319 # Integer version of the changegroup push result
325 320 # - None means nothing to push
326 321 # - 0 means HTTP error
327 322 # - 1 means we pushed and remote head count is unchanged *or*
328 323 # we have outgoing changesets but refused to push
329 324 # - other values as described by addchangegroup()
330 325 self.cgresult = None
331 326 # Boolean value for the bookmark push
332 327 self.bkresult = None
333 328 # discover.outgoing object (contains common and outgoing data)
334 329 self.outgoing = None
335 330 # all remote heads before the push
336 331 self.remoteheads = None
337 332 # testable as a boolean indicating if any nodes are missing locally.
338 333 self.incoming = None
339 334 # phase changes that must be pushed alongside the changesets
340 335 self.outdatedphases = None
341 336 # phase changes that must be pushed if the changeset push fails
342 337 self.fallbackoutdatedphases = None
343 338 # outgoing obsmarkers
344 339 self.outobsmarkers = set()
345 340 # outgoing bookmarks
346 341 self.outbookmarks = []
347 342 # transaction manager
348 343 self.trmanager = None
349 344 # map { pushkey partid -> callback handling failure}
350 345 # used to handle exception from mandatory pushkey part failure
351 346 self.pkfailcb = {}
352 347
353 348 @util.propertycache
354 349 def futureheads(self):
355 350 """future remote heads if the changeset push succeeds"""
356 351 return self.outgoing.missingheads
357 352
358 353 @util.propertycache
359 354 def fallbackheads(self):
360 355 """future remote heads if the changeset push fails"""
361 356 if self.revs is None:
362 357 # no target to push, all common heads are relevant
363 358 return self.outgoing.commonheads
364 359 unfi = self.repo.unfiltered()
365 360 # I want cheads = heads(::missingheads and ::commonheads)
366 361 # (missingheads is revs with secret changeset filtered out)
367 362 #
368 363 # This can be expressed as:
369 364 # cheads = ( (missingheads and ::commonheads)
370 365 # + (commonheads and ::missingheads)
371 366 # )
372 367 #
373 368 # while trying to push we already computed the following:
374 369 # common = (::commonheads)
375 370 # missing = ((commonheads::missingheads) - commonheads)
376 371 #
377 372 # We can pick:
378 373 # * missingheads part of common (::commonheads)
379 374 common = self.outgoing.common
380 375 nm = self.repo.changelog.nodemap
381 376 cheads = [node for node in self.revs if nm[node] in common]
382 377 # and
383 378 # * commonheads parents on missing
384 379 revset = unfi.set('%ln and parents(roots(%ln))',
385 380 self.outgoing.commonheads,
386 381 self.outgoing.missing)
387 382 cheads.extend(c.node() for c in revset)
388 383 return cheads
389 384
390 385 @property
391 386 def commonheads(self):
392 387 """set of all common heads after changeset bundle push"""
393 388 if self.cgresult:
394 389 return self.futureheads
395 390 else:
396 391 return self.fallbackheads
397 392
398 393 # mapping of message used when pushing bookmark
399 394 bookmsgmap = {'update': (_("updating bookmark %s\n"),
400 395 _('updating bookmark %s failed!\n')),
401 396 'export': (_("exporting bookmark %s\n"),
402 397 _('exporting bookmark %s failed!\n')),
403 398 'delete': (_("deleting remote bookmark %s\n"),
404 399 _('deleting remote bookmark %s failed!\n')),
405 400 }
406 401
407 402
408 403 def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=(),
409 404 opargs=None):
410 405 '''Push outgoing changesets (limited by revs) from a local
411 406 repository to remote. Return an integer:
412 407 - None means nothing to push
413 408 - 0 means HTTP error
414 409 - 1 means we pushed and remote head count is unchanged *or*
415 410 we have outgoing changesets but refused to push
416 411 - other values as described by addchangegroup()
417 412 '''
418 413 if opargs is None:
419 414 opargs = {}
420 415 pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks,
421 416 **opargs)
422 417 if pushop.remote.local():
423 418 missing = (set(pushop.repo.requirements)
424 419 - pushop.remote.local().supported)
425 420 if missing:
426 421 msg = _("required features are not"
427 422 " supported in the destination:"
428 423 " %s") % (', '.join(sorted(missing)))
429 424 raise error.Abort(msg)
430 425
431 426 # there are two ways to push to remote repo:
432 427 #
433 428 # addchangegroup assumes local user can lock remote
434 429 # repo (local filesystem, old ssh servers).
435 430 #
436 431 # unbundle assumes local user cannot lock remote repo (new ssh
437 432 # servers, http servers).
438 433
439 434 if not pushop.remote.canpush():
440 435 raise error.Abort(_("destination does not support push"))
441 436 # get local lock as we might write phase data
442 437 localwlock = locallock = None
443 438 try:
444 439 # bundle2 push may receive a reply bundle touching bookmarks or other
445 440 # things requiring the wlock. Take it now to ensure proper ordering.
446 441 maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
447 442 if (not _forcebundle1(pushop)) and maypushback:
448 443 localwlock = pushop.repo.wlock()
449 444 locallock = pushop.repo.lock()
450 445 pushop.locallocked = True
451 446 except IOError as err:
452 447 pushop.locallocked = False
453 448 if err.errno != errno.EACCES:
454 449 raise
455 450 # source repo cannot be locked.
456 451 # We do not abort the push, but just disable the local phase
457 452 # synchronisation.
458 453 msg = 'cannot lock source repository: %s\n' % err
459 454 pushop.ui.debug(msg)
460 455 try:
461 456 if pushop.locallocked:
462 457 pushop.trmanager = transactionmanager(pushop.repo,
463 458 'push-response',
464 459 pushop.remote.url())
465 460 pushop.repo.checkpush(pushop)
466 461 lock = None
467 462 unbundle = pushop.remote.capable('unbundle')
468 463 if not unbundle:
469 464 lock = pushop.remote.lock()
470 465 try:
471 466 _pushdiscovery(pushop)
472 467 if not _forcebundle1(pushop):
473 468 _pushbundle2(pushop)
474 469 _pushchangeset(pushop)
475 470 _pushsyncphase(pushop)
476 471 _pushobsolete(pushop)
477 472 _pushbookmark(pushop)
478 473 finally:
479 474 if lock is not None:
480 475 lock.release()
481 476 if pushop.trmanager:
482 477 pushop.trmanager.close()
483 478 finally:
484 479 if pushop.trmanager:
485 480 pushop.trmanager.release()
486 481 if locallock is not None:
487 482 locallock.release()
488 483 if localwlock is not None:
489 484 localwlock.release()
490 485
491 486 return pushop
492 487
493 488 # list of steps to perform discovery before push
494 489 pushdiscoveryorder = []
495 490
496 491 # Mapping between step name and function
497 492 #
498 493 # This exists to help extensions wrap steps if necessary
499 494 pushdiscoverymapping = {}
500 495
501 496 def pushdiscovery(stepname):
502 497 """decorator for function performing discovery before push
503 498
504 499 The function is added to the step -> function mapping and appended to the
505 500 list of steps. Beware that decorated function will be added in order (this
506 501 may matter).
507 502
508 503 You can only use this decorator for a new step; if you want to wrap a step
509 504 from an extension, change the pushdiscoverymapping dictionary directly."""
510 505 def dec(func):
511 506 assert stepname not in pushdiscoverymapping
512 507 pushdiscoverymapping[stepname] = func
513 508 pushdiscoveryorder.append(stepname)
514 509 return func
515 510 return dec
516 511
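
Reviewer note: a sketch of how an extension registers an extra discovery step with this decorator (step name and body hypothetical):

@pushdiscovery('mystep')
def _pushdiscoverymystep(pushop):
    # invoked by _pushdiscovery() after the steps registered before it
    pushop.ui.debug('running mystep discovery\n')
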
517 512 def _pushdiscovery(pushop):
518 513 """Run all discovery steps"""
519 514 for stepname in pushdiscoveryorder:
520 515 step = pushdiscoverymapping[stepname]
521 516 step(pushop)
522 517
523 518 @pushdiscovery('changeset')
524 519 def _pushdiscoverychangeset(pushop):
525 520 """discover the changeset that need to be pushed"""
526 521 fci = discovery.findcommonincoming
527 522 commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
528 523 common, inc, remoteheads = commoninc
529 524 fco = discovery.findcommonoutgoing
530 525 outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
531 526 commoninc=commoninc, force=pushop.force)
532 527 pushop.outgoing = outgoing
533 528 pushop.remoteheads = remoteheads
534 529 pushop.incoming = inc
535 530
536 531 @pushdiscovery('phase')
537 532 def _pushdiscoveryphase(pushop):
538 533 """discover the phase that needs to be pushed
539 534
540 535 (computed for both success and failure case for changesets push)"""
541 536 outgoing = pushop.outgoing
542 537 unfi = pushop.repo.unfiltered()
543 538 remotephases = pushop.remote.listkeys('phases')
544 539 publishing = remotephases.get('publishing', False)
545 540 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
546 541 and remotephases # server supports phases
547 542 and not pushop.outgoing.missing # no changesets to be pushed
548 543 and publishing):
549 544 # When:
550 545 # - this is a subrepo push
551 546 # - and remote supports phases
552 547 # - and no changesets are to be pushed
553 548 # - and remote is publishing
554 549 # We may be in issue 3871 case!
555 550 # We drop the possible phase synchronisation done by
556 551 # courtesy to publish changesets possibly locally draft
557 552 # on the remote.
558 553 remotephases = {'publishing': 'True'}
559 554 ana = phases.analyzeremotephases(pushop.repo,
560 555 pushop.fallbackheads,
561 556 remotephases)
562 557 pheads, droots = ana
563 558 extracond = ''
564 559 if not publishing:
565 560 extracond = ' and public()'
566 561 revset = 'heads((%%ln::%%ln) %s)' % extracond
567 562 # Get the list of all revs that are draft on the remote but public here.
568 563 # XXX Beware that the revset breaks if droots is not strictly
569 564 # XXX roots; we may want to ensure it is, but that is costly
570 565 fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
571 566 if not outgoing.missing:
572 567 future = fallback
573 568 else:
574 569 # add the changesets we are going to push as draft
575 570 #
576 571 # should not be necessary for a publishing server, but because of an
577 572 # issue fixed in xxxxx we have to do it anyway.
578 573 fdroots = list(unfi.set('roots(%ln + %ln::)',
579 574 outgoing.missing, droots))
580 575 fdroots = [f.node() for f in fdroots]
581 576 future = list(unfi.set(revset, fdroots, pushop.futureheads))
582 577 pushop.outdatedphases = future
583 578 pushop.fallbackoutdatedphases = fallback
584 579
585 580 @pushdiscovery('obsmarker')
586 581 def _pushdiscoveryobsmarkers(pushop):
587 582 if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
588 583 and pushop.repo.obsstore
589 584 and 'obsolete' in pushop.remote.listkeys('namespaces')):
590 585 repo = pushop.repo
591 586 # very naive computation that can be quite expensive on a big repo.
592 587 # However, evolution is currently slow on them anyway.
593 588 nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
594 589 pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)
595 590
596 591 @pushdiscovery('bookmarks')
597 592 def _pushdiscoverybookmarks(pushop):
598 593 ui = pushop.ui
599 594 repo = pushop.repo.unfiltered()
600 595 remote = pushop.remote
601 596 ui.debug("checking for updated bookmarks\n")
602 597 ancestors = ()
603 598 if pushop.revs:
604 599 revnums = map(repo.changelog.rev, pushop.revs)
605 600 ancestors = repo.changelog.ancestors(revnums, inclusive=True)
606 601 remotebookmark = remote.listkeys('bookmarks')
607 602
608 603 explicit = set([repo._bookmarks.expandname(bookmark)
609 604 for bookmark in pushop.bookmarks])
610 605
611 606 comp = bookmod.compare(repo, repo._bookmarks, remotebookmark, srchex=hex)
612 607 addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp
613 608 for b, scid, dcid in advsrc:
614 609 if b in explicit:
615 610 explicit.remove(b)
616 611 if not ancestors or repo[scid].rev() in ancestors:
617 612 pushop.outbookmarks.append((b, dcid, scid))
618 613 # search added bookmark
619 614 for b, scid, dcid in addsrc:
620 615 if b in explicit:
621 616 explicit.remove(b)
622 617 pushop.outbookmarks.append((b, '', scid))
623 618 # search for overwritten bookmark
624 619 for b, scid, dcid in advdst + diverge + differ:
625 620 if b in explicit:
626 621 explicit.remove(b)
627 622 pushop.outbookmarks.append((b, dcid, scid))
628 623 # search for bookmark to delete
629 624 for b, scid, dcid in adddst:
630 625 if b in explicit:
631 626 explicit.remove(b)
632 627 # treat as "deleted locally"
633 628 pushop.outbookmarks.append((b, dcid, ''))
634 629 # identical bookmarks shouldn't get reported
635 630 for b, scid, dcid in same:
636 631 if b in explicit:
637 632 explicit.remove(b)
638 633
639 634 if explicit:
640 635 explicit = sorted(explicit)
641 636 # we should probably list all of them
642 637 ui.warn(_('bookmark %s does not exist on the local '
643 638 'or remote repository!\n') % explicit[0])
644 639 pushop.bkresult = 2
645 640
646 641 pushop.outbookmarks.sort()
647 642
648 643 def _pushcheckoutgoing(pushop):
649 644 outgoing = pushop.outgoing
650 645 unfi = pushop.repo.unfiltered()
651 646 if not outgoing.missing:
652 647 # nothing to push
653 648 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
654 649 return False
655 650 # something to push
656 651 if not pushop.force:
657 652 # if repo.obsstore is empty --> no obsolete changesets,
658 653 # so we can skip the iteration
659 654 if unfi.obsstore:
660 655 # these messages are defined here because of the 80-char line limit
661 656 mso = _("push includes obsolete changeset: %s!")
662 657 mst = {"unstable": _("push includes unstable changeset: %s!"),
663 658 "bumped": _("push includes bumped changeset: %s!"),
664 659 "divergent": _("push includes divergent changeset: %s!")}
665 660 # If we are pushing and there is at least one
666 661 # obsolete or unstable changeset in missing, at
667 662 # least one of the missing heads will be obsolete or
668 663 # unstable. So checking only the heads is ok
669 664 for node in outgoing.missingheads:
670 665 ctx = unfi[node]
671 666 if ctx.obsolete():
672 667 raise error.Abort(mso % ctx)
673 668 elif ctx.troubled():
674 669 raise error.Abort(mst[ctx.troubles()[0]] % ctx)
675 670
676 671 discovery.checkheads(pushop)
677 672 return True
678 673
679 674 # List of names of steps to perform for an outgoing bundle2, order matters.
680 675 b2partsgenorder = []
681 676
682 677 # Mapping between step name and function
683 678 #
684 679 # This exists to help extensions wrap steps if necessary
685 680 b2partsgenmapping = {}
686 681
687 682 def b2partsgenerator(stepname, idx=None):
688 683 """decorator for function generating bundle2 part
689 684
690 685 The function is added to the step -> function mapping and appended to the
691 686 list of steps. Beware that decorated functions will be added in order
692 687 (this may matter).
693 688
694 689 You can only use this decorator for new steps; if you want to wrap a step
695 690 from an extension, modify the b2partsgenmapping dictionary directly."""
696 691 def dec(func):
697 692 assert stepname not in b2partsgenmapping
698 693 b2partsgenmapping[stepname] = func
699 694 if idx is None:
700 695 b2partsgenorder.append(stepname)
701 696 else:
702 697 b2partsgenorder.insert(idx, stepname)
703 698 return func
704 699 return dec
705 700
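
Reviewer note: unlike pushdiscovery(), this decorator takes an optional `idx` so an extension can splice its generator ahead of existing steps instead of appending (part name hypothetical):

@b2partsgenerator('mypart', idx=0)
def _pushb2mypart(pushop, bundler):
    # runs before 'changeset' since it was inserted at index 0; return a
    # callable to handle the server reply, or None for fire-and-forget.
    return None
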
706 701 def _pushb2ctxcheckheads(pushop, bundler):
707 702 """Generate race condition checking parts
708 703
709 704 Exists as an independent function to aid extensions
710 705 """
711 706 if not pushop.force:
712 707 bundler.newpart('check:heads', data=iter(pushop.remoteheads))
713 708
714 709 @b2partsgenerator('changeset')
715 710 def _pushb2ctx(pushop, bundler):
716 711 """handle changegroup push through bundle2
717 712
718 713 addchangegroup result is stored in the ``pushop.cgresult`` attribute.
719 714 """
720 715 if 'changesets' in pushop.stepsdone:
721 716 return
722 717 pushop.stepsdone.add('changesets')
723 718 # Send known heads to the server for race detection.
724 719 if not _pushcheckoutgoing(pushop):
725 720 return
726 721 pushop.repo.prepushoutgoinghooks(pushop)
727 722
728 723 _pushb2ctxcheckheads(pushop, bundler)
729 724
730 725 b2caps = bundle2.bundle2caps(pushop.remote)
731 726 version = '01'
732 727 cgversions = b2caps.get('changegroup')
733 728 if cgversions: # 3.1 and 3.2 ship with an empty value
734 729 cgversions = [v for v in cgversions
735 730 if v in changegroup.supportedoutgoingversions(
736 731 pushop.repo)]
737 732 if not cgversions:
738 733 raise ValueError(_('no common changegroup version'))
739 734 version = max(cgversions)
740 735 cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
741 736 pushop.outgoing,
742 737 version=version)
743 738 cgpart = bundler.newpart('changegroup', data=cg)
744 739 if cgversions:
745 740 cgpart.addparam('version', version)
746 741 if 'treemanifest' in pushop.repo.requirements:
747 742 cgpart.addparam('treemanifest', '1')
748 743 def handlereply(op):
749 744 """extract addchangegroup returns from server reply"""
750 745 cgreplies = op.records.getreplies(cgpart.id)
751 746 assert len(cgreplies['changegroup']) == 1
752 747 pushop.cgresult = cgreplies['changegroup'][0]['return']
753 748 return handlereply
754 749
755 750 @b2partsgenerator('phase')
756 751 def _pushb2phases(pushop, bundler):
757 752 """handle phase push through bundle2"""
758 753 if 'phases' in pushop.stepsdone:
759 754 return
760 755 b2caps = bundle2.bundle2caps(pushop.remote)
761 756 if not 'pushkey' in b2caps:
762 757 return
763 758 pushop.stepsdone.add('phases')
764 759 part2node = []
765 760
766 761 def handlefailure(pushop, exc):
767 762 targetid = int(exc.partid)
768 763 for partid, node in part2node:
769 764 if partid == targetid:
770 765 raise error.Abort(_('updating %s to public failed') % node)
771 766
772 767 enc = pushkey.encode
773 768 for newremotehead in pushop.outdatedphases:
774 769 part = bundler.newpart('pushkey')
775 770 part.addparam('namespace', enc('phases'))
776 771 part.addparam('key', enc(newremotehead.hex()))
777 772 part.addparam('old', enc(str(phases.draft)))
778 773 part.addparam('new', enc(str(phases.public)))
779 774 part2node.append((part.id, newremotehead))
780 775 pushop.pkfailcb[part.id] = handlefailure
781 776
782 777 def handlereply(op):
783 778 for partid, node in part2node:
784 779 partrep = op.records.getreplies(partid)
785 780 results = partrep['pushkey']
786 781 assert len(results) <= 1
787 782 msg = None
788 783 if not results:
789 784 msg = _('server ignored update of %s to public!\n') % node
790 785 elif not int(results[0]['return']):
791 786 msg = _('updating %s to public failed!\n') % node
792 787 if msg is not None:
793 788 pushop.ui.warn(msg)
794 789 return handlereply
795 790
796 791 @b2partsgenerator('obsmarkers')
797 792 def _pushb2obsmarkers(pushop, bundler):
798 793 if 'obsmarkers' in pushop.stepsdone:
799 794 return
800 795 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
801 796 if obsolete.commonversion(remoteversions) is None:
802 797 return
803 798 pushop.stepsdone.add('obsmarkers')
804 799 if pushop.outobsmarkers:
805 800 markers = sorted(pushop.outobsmarkers)
806 801 buildobsmarkerspart(bundler, markers)
807 802
808 803 @b2partsgenerator('bookmarks')
809 804 def _pushb2bookmarks(pushop, bundler):
810 805 """handle bookmark push through bundle2"""
811 806 if 'bookmarks' in pushop.stepsdone:
812 807 return
813 808 b2caps = bundle2.bundle2caps(pushop.remote)
814 809 if 'pushkey' not in b2caps:
815 810 return
816 811 pushop.stepsdone.add('bookmarks')
817 812 part2book = []
818 813 enc = pushkey.encode
819 814
820 815 def handlefailure(pushop, exc):
821 816 targetid = int(exc.partid)
822 817 for partid, book, action in part2book:
823 818 if partid == targetid:
824 819 raise error.Abort(bookmsgmap[action][1].rstrip() % book)
825 820 # we should not be called for parts we did not generate
826 821 assert False
827 822
828 823 for book, old, new in pushop.outbookmarks:
829 824 part = bundler.newpart('pushkey')
830 825 part.addparam('namespace', enc('bookmarks'))
831 826 part.addparam('key', enc(book))
832 827 part.addparam('old', enc(old))
833 828 part.addparam('new', enc(new))
834 829 action = 'update'
835 830 if not old:
836 831 action = 'export'
837 832 elif not new:
838 833 action = 'delete'
839 834 part2book.append((part.id, book, action))
840 835 pushop.pkfailcb[part.id] = handlefailure
841 836
842 837 def handlereply(op):
843 838 ui = pushop.ui
844 839 for partid, book, action in part2book:
845 840 partrep = op.records.getreplies(partid)
846 841 results = partrep['pushkey']
847 842 assert len(results) <= 1
848 843 if not results:
849 844 pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
850 845 else:
851 846 ret = int(results[0]['return'])
852 847 if ret:
853 848 ui.status(bookmsgmap[action][0] % book)
854 849 else:
855 850 ui.warn(bookmsgmap[action][1] % book)
856 851 if pushop.bkresult is not None:
857 852 pushop.bkresult = 1
858 853 return handlereply
859 854
860 855
861 856 def _pushbundle2(pushop):
862 857 """push data to the remote using bundle2
863 858
864 859 The only currently supported type of data is changegroup but this will
865 860 evolve in the future."""
866 861 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
867 862 pushback = (pushop.trmanager
868 863 and pushop.ui.configbool('experimental', 'bundle2.pushback'))
869 864
870 865 # create reply capability
871 866 capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo,
872 867 allowpushback=pushback))
873 868 bundler.newpart('replycaps', data=capsblob)
874 869 replyhandlers = []
875 870 for partgenname in b2partsgenorder:
876 871 partgen = b2partsgenmapping[partgenname]
877 872 ret = partgen(pushop, bundler)
878 873 if callable(ret):
879 874 replyhandlers.append(ret)
880 875 # do not push if nothing to push
881 876 if bundler.nbparts <= 1:
882 877 return
883 878 stream = util.chunkbuffer(bundler.getchunks())
884 879 try:
885 880 try:
886 881 reply = pushop.remote.unbundle(
887 882 stream, ['force'], pushop.remote.url())
888 883 except error.BundleValueError as exc:
889 884 raise error.Abort(_('missing support for %s') % exc)
890 885 try:
891 886 trgetter = None
892 887 if pushback:
893 888 trgetter = pushop.trmanager.transaction
894 889 op = bundle2.processbundle(pushop.repo, reply, trgetter)
895 890 except error.BundleValueError as exc:
896 891 raise error.Abort(_('missing support for %s') % exc)
897 892 except bundle2.AbortFromPart as exc:
898 893 pushop.ui.status(_('remote: %s\n') % exc)
899 894 raise error.Abort(_('push failed on remote'), hint=exc.hint)
900 895 except error.PushkeyFailed as exc:
901 896 partid = int(exc.partid)
902 897 if partid not in pushop.pkfailcb:
903 898 raise
904 899 pushop.pkfailcb[partid](pushop, exc)
905 900 for rephand in replyhandlers:
906 901 rephand(op)
907 902
908 903 def _pushchangeset(pushop):
909 904 """Make the actual push of changeset bundle to remote repo"""
910 905 if 'changesets' in pushop.stepsdone:
911 906 return
912 907 pushop.stepsdone.add('changesets')
913 908 if not _pushcheckoutgoing(pushop):
914 909 return
915 910 pushop.repo.prepushoutgoinghooks(pushop)
916 911 outgoing = pushop.outgoing
917 912 unbundle = pushop.remote.capable('unbundle')
918 913 # TODO: get bundlecaps from remote
919 914 bundlecaps = None
920 915 # create a changegroup from local
921 916 if pushop.revs is None and not (outgoing.excluded
922 917 or pushop.repo.changelog.filteredrevs):
923 918 # push everything,
924 919 # use the fast path, no race possible on push
925 920 bundler = changegroup.cg1packer(pushop.repo, bundlecaps)
926 921 cg = changegroup.getsubset(pushop.repo,
927 922 outgoing,
928 923 bundler,
929 924 'push',
930 925 fastpath=True)
931 926 else:
932 927 cg = changegroup.getlocalchangegroup(pushop.repo, 'push', outgoing,
933 928 bundlecaps)
934 929
935 930 # apply changegroup to remote
936 931 if unbundle:
937 932 # local repo finds heads on server, finds out what
938 933 # revs it must push. once revs transferred, if server
939 934 # finds it has different heads (someone else won
940 935 # commit/push race), server aborts.
941 936 if pushop.force:
942 937 remoteheads = ['force']
943 938 else:
944 939 remoteheads = pushop.remoteheads
945 940 # ssh: return remote's addchangegroup()
946 941 # http: return remote's addchangegroup() or 0 for error
947 942 pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
948 943 pushop.repo.url())
949 944 else:
950 945 # we return an integer indicating remote head count
951 946 # change
952 947 pushop.cgresult = pushop.remote.addchangegroup(cg, 'push',
953 948 pushop.repo.url())
954 949
955 950 def _pushsyncphase(pushop):
956 951 """synchronise phase information locally and remotely"""
957 952 cheads = pushop.commonheads
958 953 # even when we don't push, exchanging phase data is useful
959 954 remotephases = pushop.remote.listkeys('phases')
960 955 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
961 956 and remotephases # server supports phases
962 957 and pushop.cgresult is None # nothing was pushed
963 958 and remotephases.get('publishing', False)):
964 959 # When:
965 960 # - this is a subrepo push
966 961 # - and remote supports phases
967 962 # - and no changesets were pushed
968 963 # - and remote is publishing
969 964 # We may be in issue 3871 case!
970 965 # We drop the possible phase synchronisation done by
971 966 # courtesy to publish changesets possibly locally draft
972 967 # on the remote.
973 968 remotephases = {'publishing': 'True'}
974 969 if not remotephases: # old server, or public-only reply from a non-publishing one
975 970 _localphasemove(pushop, cheads)
976 971 # don't push any phase data as there is nothing to push
977 972 else:
978 973 ana = phases.analyzeremotephases(pushop.repo, cheads,
979 974 remotephases)
980 975 pheads, droots = ana
981 976 ### Apply remote phase on local
982 977 if remotephases.get('publishing', False):
983 978 _localphasemove(pushop, cheads)
984 979 else: # publish = False
985 980 _localphasemove(pushop, pheads)
986 981 _localphasemove(pushop, cheads, phases.draft)
987 982 ### Apply local phase on remote
988 983
989 984 if pushop.cgresult:
990 985 if 'phases' in pushop.stepsdone:
991 986 # phases already pushed though bundle2
992 987 return
993 988 outdated = pushop.outdatedphases
994 989 else:
995 990 outdated = pushop.fallbackoutdatedphases
996 991
997 992 pushop.stepsdone.add('phases')
998 993
999 994 # filter heads already turned public by the push
1000 995 outdated = [c for c in outdated if c.node() not in pheads]
1001 996 # fallback to independent pushkey command
1002 997 for newremotehead in outdated:
1003 998 r = pushop.remote.pushkey('phases',
1004 999 newremotehead.hex(),
1005 1000 str(phases.draft),
1006 1001 str(phases.public))
1007 1002 if not r:
1008 1003 pushop.ui.warn(_('updating %s to public failed!\n')
1009 1004 % newremotehead)
1010 1005
1011 1006 def _localphasemove(pushop, nodes, phase=phases.public):
1012 1007 """move <nodes> to <phase> in the local source repo"""
1013 1008 if pushop.trmanager:
1014 1009 phases.advanceboundary(pushop.repo,
1015 1010 pushop.trmanager.transaction(),
1016 1011 phase,
1017 1012 nodes)
1018 1013 else:
1019 1014 # repo is not locked, do not change any phases!
1020 1015 # Informs the user that phases should have been moved when
1021 1016 # applicable.
1022 1017 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
1023 1018 phasestr = phases.phasenames[phase]
1024 1019 if actualmoves:
1025 1020 pushop.ui.status(_('cannot lock source repo, skipping '
1026 1021 'local %s phase update\n') % phasestr)
1027 1022
1028 1023 def _pushobsolete(pushop):
1029 1024 """utility function to push obsolete markers to a remote"""
1030 1025 if 'obsmarkers' in pushop.stepsdone:
1031 1026 return
1032 1027 repo = pushop.repo
1033 1028 remote = pushop.remote
1034 1029 pushop.stepsdone.add('obsmarkers')
1035 1030 if pushop.outobsmarkers:
1036 1031 pushop.ui.debug('try to push obsolete markers to remote\n')
1037 1032 rslts = []
1038 1033 remotedata = obsolete._pushkeyescape(sorted(pushop.outobsmarkers))
1039 1034 for key in sorted(remotedata, reverse=True):
1040 1035 # reverse sort to ensure we end with dump0
1041 1036 data = remotedata[key]
1042 1037 rslts.append(remote.pushkey('obsolete', key, '', data))
1043 1038 if [r for r in rslts if not r]:
1044 1039 msg = _('failed to push some obsolete markers!\n')
1045 1040 repo.ui.warn(msg)
1046 1041
1047 1042 def _pushbookmark(pushop):
1048 1043 """Update bookmark position on remote"""
1049 1044 if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
1050 1045 return
1051 1046 pushop.stepsdone.add('bookmarks')
1052 1047 ui = pushop.ui
1053 1048 remote = pushop.remote
1054 1049
1055 1050 for b, old, new in pushop.outbookmarks:
1056 1051 action = 'update'
1057 1052 if not old:
1058 1053 action = 'export'
1059 1054 elif not new:
1060 1055 action = 'delete'
1061 1056 if remote.pushkey('bookmarks', b, old, new):
1062 1057 ui.status(bookmsgmap[action][0] % b)
1063 1058 else:
1064 1059 ui.warn(bookmsgmap[action][1] % b)
1065 1060 # discovery can have set the value from an invalid entry
1066 1061 if pushop.bkresult is not None:
1067 1062 pushop.bkresult = 1
1068 1063
1069 1064 class pulloperation(object):
1070 1065 """A object that represent a single pull operation
1071 1066
1072 1067 It purpose is to carry pull related state and very common operation.
1073 1068
1074 1069 A new should be created at the beginning of each pull and discarded
1075 1070 afterward.
1076 1071 """
1077 1072
1078 1073 def __init__(self, repo, remote, heads=None, force=False, bookmarks=(),
1079 1074 remotebookmarks=None, streamclonerequested=None):
1080 1075 # repo we pull into
1081 1076 self.repo = repo
1082 1077 # repo we pull from
1083 1078 self.remote = remote
1084 1079 # revision we try to pull (None is "all")
1085 1080 self.heads = heads
1086 1081 # bookmark pulled explicitly
1087 1082 self.explicitbookmarks = [repo._bookmarks.expandname(bookmark)
1088 1083 for bookmark in bookmarks]
1089 1084 # do we force pull?
1090 1085 self.force = force
1091 1086 # whether a streaming clone was requested
1092 1087 self.streamclonerequested = streamclonerequested
1093 1088 # transaction manager
1094 1089 self.trmanager = None
1095 1090 # set of common changesets between local and remote before pull
1096 1091 self.common = None
1097 1092 # set of pulled heads
1098 1093 self.rheads = None
1099 1094 # list of missing changesets to fetch remotely
1100 1095 self.fetch = None
1101 1096 # remote bookmarks data
1102 1097 self.remotebookmarks = remotebookmarks
1103 1098 # result of changegroup pulling (used as return code by pull)
1104 1099 self.cgresult = None
1105 1100 # list of steps already done
1106 1101 self.stepsdone = set()
1107 1102 # Whether we attempted a clone from pre-generated bundles.
1108 1103 self.clonebundleattempted = False
1109 1104
1110 1105 @util.propertycache
1111 1106 def pulledsubset(self):
1112 1107 """heads of the set of changeset target by the pull"""
1113 1108 # compute target subset
1114 1109 if self.heads is None:
1115 1110 # We pulled everything possible
1116 1111 # sync on everything common
1117 1112 c = set(self.common)
1118 1113 ret = list(self.common)
1119 1114 for n in self.rheads:
1120 1115 if n not in c:
1121 1116 ret.append(n)
1122 1117 return ret
1123 1118 else:
1124 1119 # We pulled a specific subset
1125 1120 # sync on this subset
1126 1121 return self.heads
1127 1122
1128 1123 @util.propertycache
1129 1124 def canusebundle2(self):
1130 1125 return not _forcebundle1(self)
1131 1126
1132 1127 @util.propertycache
1133 1128 def remotebundle2caps(self):
1134 1129 return bundle2.bundle2caps(self.remote)
1135 1130
1136 1131 def gettransaction(self):
1137 1132 # deprecated; talk to trmanager directly
1138 1133 return self.trmanager.transaction()
1139 1134
1140 1135 class transactionmanager(object):
1141 1136 """An object to manage the life cycle of a transaction
1142 1137
1143 1138 It creates the transaction on demand and calls the appropriate hooks when
1144 1139 closing the transaction."""
1145 1140 def __init__(self, repo, source, url):
1146 1141 self.repo = repo
1147 1142 self.source = source
1148 1143 self.url = url
1149 1144 self._tr = None
1150 1145
1151 1146 def transaction(self):
1152 1147 """Return an open transaction object, constructing if necessary"""
1153 1148 if not self._tr:
1154 1149 trname = '%s\n%s' % (self.source, util.hidepassword(self.url))
1155 1150 self._tr = self.repo.transaction(trname)
1156 1151 self._tr.hookargs['source'] = self.source
1157 1152 self._tr.hookargs['url'] = self.url
1158 1153 return self._tr
1159 1154
1160 1155 def close(self):
1161 1156 """close transaction if created"""
1162 1157 if self._tr is not None:
1163 1158 self._tr.close()
1164 1159
1165 1160 def release(self):
1166 1161 """release transaction if created"""
1167 1162 if self._tr is not None:
1168 1163 self._tr.release()
1169 1164
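
Reviewer note: the intended lifecycle of this class, as exercised by push() above and pull() below (sketch):

# trmanager = transactionmanager(repo, 'pull', remote.url())
# try:
#     ...                    # steps call trmanager.transaction() on demand
#     trmanager.close()      # commits, only if a transaction was opened
# finally:
#     trmanager.release()    # aborts anything close() did not commit
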
1170 1165 def pull(repo, remote, heads=None, force=False, bookmarks=(), opargs=None,
1171 1166 streamclonerequested=None):
1172 1167 """Fetch repository data from a remote.
1173 1168
1174 1169 This is the main function used to retrieve data from a remote repository.
1175 1170
1176 1171 ``repo`` is the local repository to clone into.
1177 1172 ``remote`` is a peer instance.
1178 1173 ``heads`` is an iterable of revisions we want to pull. ``None`` (the
1179 1174 default) means to pull everything from the remote.
1180 1175 ``bookmarks`` is an iterable of bookmarks requesting to be pulled. By
1181 1176 default, all remote bookmarks are pulled.
1182 1177 ``opargs`` are additional keyword arguments to pass to ``pulloperation``
1183 1178 initialization.
1184 1179 ``streamclonerequested`` is a boolean indicating whether a "streaming
1185 1180 clone" is requested. A "streaming clone" is essentially a raw file copy
1186 1181 of revlogs from the server. This only works when the local repository is
1187 1182 empty. The default value of ``None`` means to respect the server
1188 1183 configuration for preferring stream clones.
1189 1184
1190 1185 Returns the ``pulloperation`` created for this pull.
1191 1186 """
1192 1187 if opargs is None:
1193 1188 opargs = {}
1194 1189 pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks,
1195 1190 streamclonerequested=streamclonerequested, **opargs)
1196 1191 if pullop.remote.local():
1197 1192 missing = set(pullop.remote.requirements) - pullop.repo.supported
1198 1193 if missing:
1199 1194 msg = _("required features are not"
1200 1195 " supported in the destination:"
1201 1196 " %s") % (', '.join(sorted(missing)))
1202 1197 raise error.Abort(msg)
1203 1198
1204 1199 wlock = lock = None
1205 1200 try:
1206 1201 wlock = pullop.repo.wlock()
1207 1202 lock = pullop.repo.lock()
1208 1203 pullop.trmanager = transactionmanager(repo, 'pull', remote.url())
1209 1204 streamclone.maybeperformlegacystreamclone(pullop)
1210 1205 # This should ideally be in _pullbundle2(). However, it needs to run
1211 1206 # before discovery to avoid extra work.
1212 1207 _maybeapplyclonebundle(pullop)
1213 1208 _pulldiscovery(pullop)
1214 1209 if pullop.canusebundle2:
1215 1210 _pullbundle2(pullop)
1216 1211 _pullchangeset(pullop)
1217 1212 _pullphase(pullop)
1218 1213 _pullbookmarks(pullop)
1219 1214 _pullobsolete(pullop)
1220 1215 pullop.trmanager.close()
1221 1216 finally:
1222 1217 lockmod.release(pullop.trmanager, lock, wlock)
1223 1218
1224 1219 return pullop
1225 1220
1226 1221 # list of steps to perform discovery before pull
1227 1222 pulldiscoveryorder = []
1228 1223
1229 1224 # Mapping between step name and function
1230 1225 #
1231 1226 # This exists to help extensions wrap steps if necessary
1232 1227 pulldiscoverymapping = {}
1233 1228
1234 1229 def pulldiscovery(stepname):
1235 1230 """decorator for function performing discovery before pull
1236 1231
1237 1232 The function is added to the step -> function mapping and appended to the
1238 1233 list of steps. Beware that decorated function will be added in order (this
1239 1234 may matter).
1240 1235
1241 1236 You can only use this decorator for a new step; if you want to wrap a step
1242 1237 from an extension, change the pulldiscoverymapping dictionary directly."""
1243 1238 def dec(func):
1244 1239 assert stepname not in pulldiscoverymapping
1245 1240 pulldiscoverymapping[stepname] = func
1246 1241 pulldiscoveryorder.append(stepname)
1247 1242 return func
1248 1243 return dec
1249 1244
1250 1245 def _pulldiscovery(pullop):
1251 1246 """Run all discovery steps"""
1252 1247 for stepname in pulldiscoveryorder:
1253 1248 step = pulldiscoverymapping[stepname]
1254 1249 step(pullop)
1255 1250
1256 1251 @pulldiscovery('b1:bookmarks')
1257 1252 def _pullbookmarkbundle1(pullop):
1258 1253 """fetch bookmark data in bundle1 case
1259 1254
1260 1255 If not using bundle2, we have to fetch bookmarks before changeset
1261 1256 discovery to reduce the chance and impact of race conditions."""
1262 1257 if pullop.remotebookmarks is not None:
1263 1258 return
1264 1259 if pullop.canusebundle2 and 'listkeys' in pullop.remotebundle2caps:
1265 1260 # all known bundle2 servers now support listkeys, but let's be nice with
1266 1261 # new implementations.
1267 1262 return
1268 1263 pullop.remotebookmarks = pullop.remote.listkeys('bookmarks')
1269 1264
1270 1265
1271 1266 @pulldiscovery('changegroup')
1272 1267 def _pulldiscoverychangegroup(pullop):
1273 1268 """discovery phase for the pull
1274 1269
1275 1270 Currently handles changeset discovery only; will change to handle all
1276 1271 discovery at some point."""
1277 1272 tmp = discovery.findcommonincoming(pullop.repo,
1278 1273 pullop.remote,
1279 1274 heads=pullop.heads,
1280 1275 force=pullop.force)
1281 1276 common, fetch, rheads = tmp
1282 1277 nm = pullop.repo.unfiltered().changelog.nodemap
1283 1278 if fetch and rheads:
1284 1279 # If a remote head is filtered locally, let's drop it from the unknown
1285 1280 # remote heads and put it back in common.
1286 1281 #
1287 1282 # This is a hackish solution to catch most of the "common but locally
1288 1283 # hidden" situations. We do not perform discovery on the unfiltered
1289 1284 # repository because it ends up doing a pathological amount of round
1290 1285 # trips for a huge amount of changesets we do not care about.
1291 1286 #
1292 1287 # If a set of such "common but filtered" changesets exists on the server
1293 1288 # but does not include a remote head, we'll not be able to detect it.
1294 1289 scommon = set(common)
1295 1290 filteredrheads = []
1296 1291 for n in rheads:
1297 1292 if n in nm:
1298 1293 if n not in scommon:
1299 1294 common.append(n)
1300 1295 else:
1301 1296 filteredrheads.append(n)
1302 1297 if not filteredrheads:
1303 1298 fetch = []
1304 1299 rheads = filteredrheads
1305 1300 pullop.common = common
1306 1301 pullop.fetch = fetch
1307 1302 pullop.rheads = rheads
1308 1303
1309 1304 def _pullbundle2(pullop):
1310 1305 """pull data using bundle2
1311 1306
1312 1307 For now, the only supported data is the changegroup."""
1313 1308 kwargs = {'bundlecaps': caps20to10(pullop.repo)}
1314 1309
1315 1310 streaming, streamreqs = streamclone.canperformstreamclone(pullop)
1316 1311
1317 1312 # pulling changegroup
1318 1313 pullop.stepsdone.add('changegroup')
1319 1314
1320 1315 kwargs['common'] = pullop.common
1321 1316 kwargs['heads'] = pullop.heads or pullop.rheads
1322 1317 kwargs['cg'] = pullop.fetch
1323 1318 if 'listkeys' in pullop.remotebundle2caps:
1324 1319 kwargs['listkeys'] = ['phases']
1325 1320 if pullop.remotebookmarks is None:
1326 1321 # make sure to always include bookmark data when migrating
1327 1322 # `hg incoming --bundle` to using this function.
1328 1323 kwargs['listkeys'].append('bookmarks')
1329 1324
1330 1325 # If this is a full pull / clone and the server supports the clone bundles
1331 1326 # feature, tell the server whether we attempted a clone bundle. The
1332 1327 # presence of this flag indicates the client supports clone bundles. This
1333 1328 # will enable the server to treat clients that support clone bundles
1334 1329 # differently from those that don't.
1335 1330 if (pullop.remote.capable('clonebundles')
1336 1331 and pullop.heads is None and list(pullop.common) == [nullid]):
1337 1332 kwargs['cbattempted'] = pullop.clonebundleattempted
1338 1333
1339 1334 if streaming:
1340 1335 pullop.repo.ui.status(_('streaming all changes\n'))
1341 1336 elif not pullop.fetch:
1342 1337 pullop.repo.ui.status(_("no changes found\n"))
1343 1338 pullop.cgresult = 0
1344 1339 else:
1345 1340 if pullop.heads is None and list(pullop.common) == [nullid]:
1346 1341 pullop.repo.ui.status(_("requesting all changes\n"))
1347 1342 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1348 1343 remoteversions = bundle2.obsmarkersversion(pullop.remotebundle2caps)
1349 1344 if obsolete.commonversion(remoteversions) is not None:
1350 1345 kwargs['obsmarkers'] = True
1351 1346 pullop.stepsdone.add('obsmarkers')
1352 1347 _pullbundle2extraprepare(pullop, kwargs)
1353 1348 bundle = pullop.remote.getbundle('pull', **kwargs)
1354 1349 try:
1355 1350 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
1356 1351 except error.BundleValueError as exc:
1357 1352 raise error.Abort(_('missing support for %s') % exc)
1358 1353
1359 1354 if pullop.fetch:
1360 1355 results = [cg['return'] for cg in op.records['changegroup']]
1361 1356 pullop.cgresult = changegroup.combineresults(results)
1362 1357
1363 1358 # processing phases change
1364 1359 for namespace, value in op.records['listkeys']:
1365 1360 if namespace == 'phases':
1366 1361 _pullapplyphases(pullop, value)
1367 1362
1368 1363 # processing bookmark update
1369 1364 for namespace, value in op.records['listkeys']:
1370 1365 if namespace == 'bookmarks':
1371 1366 pullop.remotebookmarks = value
1372 1367
1373 1368 # bookmark data were either already there or pulled in the bundle
1374 1369 if pullop.remotebookmarks is not None:
1375 1370 _pullbookmarks(pullop)
1376 1371
1377 1372 def _pullbundle2extraprepare(pullop, kwargs):
1378 1373 """hook function so that extensions can extend the getbundle call"""
1379 1374 pass
1380 1375
1381 1376 def _pullchangeset(pullop):
1382 1377 """pull changeset from unbundle into the local repo"""
1383 1378 # We delay opening the transaction as late as possible so we
1384 1379 # don't open a transaction for nothing, or break a future useful
1385 1380 # rollback call
1386 1381 if 'changegroup' in pullop.stepsdone:
1387 1382 return
1388 1383 pullop.stepsdone.add('changegroup')
1389 1384 if not pullop.fetch:
1390 1385 pullop.repo.ui.status(_("no changes found\n"))
1391 1386 pullop.cgresult = 0
1392 1387 return
1393 1388 pullop.gettransaction()
1394 1389 if pullop.heads is None and list(pullop.common) == [nullid]:
1395 1390 pullop.repo.ui.status(_("requesting all changes\n"))
1396 1391 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
1397 1392 # issue1320, avoid a race if remote changed after discovery
1398 1393 pullop.heads = pullop.rheads
1399 1394
1400 1395 if pullop.remote.capable('getbundle'):
1401 1396 # TODO: get bundlecaps from remote
1402 1397 cg = pullop.remote.getbundle('pull', common=pullop.common,
1403 1398 heads=pullop.heads or pullop.rheads)
1404 1399 elif pullop.heads is None:
1405 1400 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
1406 1401 elif not pullop.remote.capable('changegroupsubset'):
1407 1402 raise error.Abort(_("partial pull cannot be done because "
1408 1403 "other repository doesn't support "
1409 1404 "changegroupsubset."))
1410 1405 else:
1411 1406 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
1412 1407 pullop.cgresult = cg.apply(pullop.repo, 'pull', pullop.remote.url())
1413 1408
1414 1409 def _pullphase(pullop):
1415 1410 # Get remote phases data from remote
1416 1411 if 'phases' in pullop.stepsdone:
1417 1412 return
1418 1413 remotephases = pullop.remote.listkeys('phases')
1419 1414 _pullapplyphases(pullop, remotephases)
1420 1415
1421 1416 def _pullapplyphases(pullop, remotephases):
1422 1417 """apply phase movement from observed remote state"""
1423 1418 if 'phases' in pullop.stepsdone:
1424 1419 return
1425 1420 pullop.stepsdone.add('phases')
1426 1421 publishing = bool(remotephases.get('publishing', False))
1427 1422 if remotephases and not publishing:
1428 1423 # remote is new and non-publishing
1429 1424 pheads, _dr = phases.analyzeremotephases(pullop.repo,
1430 1425 pullop.pulledsubset,
1431 1426 remotephases)
1432 1427 dheads = pullop.pulledsubset
1433 1428 else:
1434 1429 # Remote is old or publishing; all common changesets
1435 1430 # should be seen as public
1436 1431 pheads = pullop.pulledsubset
1437 1432 dheads = []
1438 1433 unfi = pullop.repo.unfiltered()
1439 1434 phase = unfi._phasecache.phase
1440 1435 rev = unfi.changelog.nodemap.get
1441 1436 public = phases.public
1442 1437 draft = phases.draft
1443 1438
1444 1439 # exclude changesets already public locally and update the others
1445 1440 pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
1446 1441 if pheads:
1447 1442 tr = pullop.gettransaction()
1448 1443 phases.advanceboundary(pullop.repo, tr, public, pheads)
1449 1444
1450 1445 # exclude changesets already draft locally and update the others
1451 1446 dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
1452 1447 if dheads:
1453 1448 tr = pullop.gettransaction()
1454 1449 phases.advanceboundary(pullop.repo, tr, draft, dheads)
1455 1450
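# Illustration, a hedged sketch of the remote phase data handled above
# (the payload shape is inferred from the branches in _pullapplyphases,
# not quoted from a spec). A publishing server typically answers the
# 'phases' listkey with something like:
#
#     remotephases = {'publishing': 'True'}
#
# so every pulled changeset is advanced to public locally, while a
# non-publishing server instead lists the hex nodes of its non-public
# roots with phase numbers as string values, e.g.:
#
#     remotephases = {'d5e6f7...': '1'}
#
# which analyzeremotephases() turns into the public heads used above.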
1456 1451 def _pullbookmarks(pullop):
1457 1452 """process the remote bookmark information to update the local one"""
1458 1453 if 'bookmarks' in pullop.stepsdone:
1459 1454 return
1460 1455 pullop.stepsdone.add('bookmarks')
1461 1456 repo = pullop.repo
1462 1457 remotebookmarks = pullop.remotebookmarks
1463 1458 bookmod.updatefromremote(repo.ui, repo, remotebookmarks,
1464 1459 pullop.remote.url(),
1465 1460 pullop.gettransaction,
1466 1461 explicit=pullop.explicitbookmarks)
1467 1462
1468 1463 def _pullobsolete(pullop):
1469 1464 """utility function to pull obsolete markers from a remote
1470 1465
1471 1466 The `gettransaction` argument is a function that returns the pull
1472 1467 transaction, creating one if necessary. We return the transaction to
1473 1468 inform the calling code that a new transaction has been created (when applicable).
1474 1469 
1475 1470 Exists mostly to allow overriding for experimentation purposes"""
1476 1471 if 'obsmarkers' in pullop.stepsdone:
1477 1472 return
1478 1473 pullop.stepsdone.add('obsmarkers')
1479 1474 tr = None
1480 1475 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1481 1476 pullop.repo.ui.debug('fetching remote obsolete markers\n')
1482 1477 remoteobs = pullop.remote.listkeys('obsolete')
1483 1478 if 'dump0' in remoteobs:
1484 1479 tr = pullop.gettransaction()
1485 1480 markers = []
1486 1481 for key in sorted(remoteobs, reverse=True):
1487 1482 if key.startswith('dump'):
1488 1483 data = base85.b85decode(remoteobs[key])
1489 1484 version, newmarks = obsolete._readmarkers(data)
1490 1485 markers += newmarks
1491 1486 if markers:
1492 1487 pullop.repo.obsstore.add(tr, markers)
1493 1488 pullop.repo.invalidatevolatilesets()
1494 1489 return tr
1495 1490
1496 1491 def caps20to10(repo):
1497 1492 """return a set with appropriate options to use bundle20 during getbundle"""
1498 1493 caps = set(['HG20'])
1499 1494 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
1500 1495 caps.add('bundle2=' + urlreq.quote(capsblob))
1501 1496 return caps
1502 1497
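# Illustration, a hedged sketch of the returned set (the quoted blob
# below is an invented example; the real contents vary per repo):
#
#     set(['HG20', 'bundle2=HG20%0Achangegroup%3D01%2C02'])
#
# i.e. the repo's bundle2 capabilities are URL-quoted and advertised
# through a bundle10-era capability string so old-style getbundle
# negotiation can carry them.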
1503 1498 # List of names of steps to perform for a bundle2 for getbundle, order matters.
1504 1499 getbundle2partsorder = []
1505 1500
1506 1501 # Mapping between step name and function
1507 1502 #
1508 1503 # This exists to help extensions wrap steps if necessary
1509 1504 getbundle2partsmapping = {}
1510 1505
1511 1506 def getbundle2partsgenerator(stepname, idx=None):
1512 1507 """decorator for function generating bundle2 part for getbundle
1513 1508
1514 1509 The function is added to the step -> function mapping and appended to the
1515 1510 list of steps. Beware that decorated functions will be added in order
1516 1511 (this may matter).
1517 1512
1518 1513 You can only use this decorator for new steps; if you want to wrap a step
1519 1514 from an extension, modify the getbundle2partsmapping dictionary directly."""
1520 1515 def dec(func):
1521 1516 assert stepname not in getbundle2partsmapping
1522 1517 getbundle2partsmapping[stepname] = func
1523 1518 if idx is None:
1524 1519 getbundle2partsorder.append(stepname)
1525 1520 else:
1526 1521 getbundle2partsorder.insert(idx, stepname)
1527 1522 return func
1528 1523 return dec
1529 1524
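# Illustration, a hedged sketch of registering a new step with the
# decorator above; the part name 'mydata' and its trigger argument are
# hypothetical, not part of Mercurial:
#
#     @getbundle2partsgenerator('mydata')
#     def _getbundlemydatapart(bundler, repo, source, bundlecaps=None,
#                              b2caps=None, **kwargs):
#         """add a hypothetical extra part when the client asks for it"""
#         if kwargs.get('mydata', False):
#             bundler.newpart('mydata', data='payload', mandatory=False)
#
# Because the decorator appends to getbundle2partsorder, this step would
# run after the built-in ones below.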
1530 1525 def bundle2requested(bundlecaps):
1531 1526 if bundlecaps is not None:
1532 1527 return any(cap.startswith('HG2') for cap in bundlecaps)
1533 1528 return False
1534 1529
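# For illustration: bundle2requested(['HG20', 'bundle2=...']) is True,
# while bundle2requested([]) and bundle2requested(None) are False, which
# is what routes legacy clients into the bundle10 branch below.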
1535 1530 def getbundlechunks(repo, source, heads=None, common=None, bundlecaps=None,
1536 1531 **kwargs):
1537 1532 """Return chunks constituting a bundle's raw data.
1538 1533
1539 1534 The result can be a bundle HG10 or a bundle HG20, depending on the
1540 1535 bundlecaps passed.
1541 1536
1542 1537 Returns an iterator over raw chunks (of varying sizes).
1543 1538 """
1544 1539 usebundle2 = bundle2requested(bundlecaps)
1545 1540 # bundle10 case
1546 1541 if not usebundle2:
1547 1542 if bundlecaps and not kwargs.get('cg', True):
1548 1543 raise ValueError(_('request for bundle10 must include changegroup'))
1549 1544
1550 1545 if kwargs:
1551 1546 raise ValueError(_('unsupported getbundle arguments: %s')
1552 1547 % ', '.join(sorted(kwargs.keys())))
1553 1548 outgoing = _computeoutgoing(repo, heads, common)
1554 1549 bundler = changegroup.getbundler('01', repo, bundlecaps)
1555 1550 return changegroup.getsubsetraw(repo, outgoing, bundler, source)
1556 1551
1557 1552 # bundle20 case
1558 1553 b2caps = {}
1559 1554 for bcaps in bundlecaps:
1560 1555 if bcaps.startswith('bundle2='):
1561 1556 blob = urlreq.unquote(bcaps[len('bundle2='):])
1562 1557 b2caps.update(bundle2.decodecaps(blob))
1563 1558 bundler = bundle2.bundle20(repo.ui, b2caps)
1564 1559
1565 1560 kwargs['heads'] = heads
1566 1561 kwargs['common'] = common
1567 1562
1568 1563 for name in getbundle2partsorder:
1569 1564 func = getbundle2partsmapping[name]
1570 1565 func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
1571 1566 **kwargs)
1572 1567
1573 1568 return bundler.getchunks()
1574 1569
1575 1570 @getbundle2partsgenerator('changegroup')
1576 1571 def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
1577 1572 b2caps=None, heads=None, common=None, **kwargs):
1578 1573 """add a changegroup part to the requested bundle"""
1579 1574 cg = None
1580 1575 if kwargs.get('cg', True):
1581 1576 # build changegroup bundle here.
1582 1577 version = '01'
1583 1578 cgversions = b2caps.get('changegroup')
1584 1579 if cgversions: # 3.1 and 3.2 ship with an empty value
1585 1580 cgversions = [v for v in cgversions
1586 1581 if v in changegroup.supportedoutgoingversions(repo)]
1587 1582 if not cgversions:
1588 1583 raise ValueError(_('no common changegroup version'))
1589 1584 version = max(cgversions)
1590 1585 outgoing = _computeoutgoing(repo, heads, common)
1591 1586 cg = changegroup.getlocalchangegroupraw(repo, source, outgoing,
1592 1587 bundlecaps=bundlecaps,
1593 1588 version=version)
1594 1589
1595 1590 if cg:
1596 1591 part = bundler.newpart('changegroup', data=cg)
1597 1592 if cgversions:
1598 1593 part.addparam('version', version)
1599 1594 part.addparam('nbchanges', str(len(outgoing.missing)), mandatory=False)
1600 1595 if 'treemanifest' in repo.requirements:
1601 1596 part.addparam('treemanifest', '1')
1602 1597
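# Illustration, a hedged sketch of the version negotiation above: if a
# client advertised changegroup versions ['01', '02'] in its bundle2
# caps and supportedoutgoingversions(repo) is set(['01', '02']), the
# intersection is ['01', '02'] and max() selects '02'; a client that
# shipped an empty value (3.1/3.2, per the comment above) keeps the
# '01' default.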
1603 1598 @getbundle2partsgenerator('listkeys')
1604 1599 def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
1605 1600 b2caps=None, **kwargs):
1606 1601 """add parts containing listkeys namespaces to the requested bundle"""
1607 1602 listkeys = kwargs.get('listkeys', ())
1608 1603 for namespace in listkeys:
1609 1604 part = bundler.newpart('listkeys')
1610 1605 part.addparam('namespace', namespace)
1611 1606 keys = repo.listkeys(namespace).items()
1612 1607 part.data = pushkey.encodekeys(keys)
1613 1608
1614 1609 @getbundle2partsgenerator('obsmarkers')
1615 1610 def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
1616 1611 b2caps=None, heads=None, **kwargs):
1617 1612 """add an obsolescence markers part to the requested bundle"""
1618 1613 if kwargs.get('obsmarkers', False):
1619 1614 if heads is None:
1620 1615 heads = repo.heads()
1621 1616 subset = [c.node() for c in repo.set('::%ln', heads)]
1622 1617 markers = repo.obsstore.relevantmarkers(subset)
1623 1618 markers = sorted(markers)
1624 1619 buildobsmarkerspart(bundler, markers)
1625 1620
1626 1621 @getbundle2partsgenerator('hgtagsfnodes')
1627 1622 def _getbundletagsfnodes(bundler, repo, source, bundlecaps=None,
1628 1623 b2caps=None, heads=None, common=None,
1629 1624 **kwargs):
1630 1625 """Transfer the .hgtags filenodes mapping.
1631 1626
1632 1627 Only values for heads in this bundle will be transferred.
1633 1628
1634 1629 The part data consists of pairs of 20 byte changeset node and .hgtags
1635 1630 filenodes raw values.
1636 1631 """
1637 1632 # Don't send unless:
1638 1633 # - changesets are being exchanged,
1639 1634 # - the client supports it.
1640 1635 if not (kwargs.get('cg', True) and 'hgtagsfnodes' in b2caps):
1641 1636 return
1642 1637
1643 1638 outgoing = _computeoutgoing(repo, heads, common)
1644 1639
1645 1640 if not outgoing.missingheads:
1646 1641 return
1647 1642
1648 1643 cache = tags.hgtagsfnodescache(repo.unfiltered())
1649 1644 chunks = []
1650 1645
1651 1646 # .hgtags fnodes are only relevant for head changesets. While we could
1652 1647 # transfer values for all known nodes, there will likely be little to
1653 1648 # no benefit.
1654 1649 #
1655 1650 # We don't bother using a generator to produce output data because
1656 1651 # a) we only have 40 bytes per head and even esoteric numbers of heads
1657 1652 # consume little memory (1M heads is 40MB) b) we don't want to send the
1658 1653 # part if we don't have entries and knowing if we have entries requires
1659 1654 # cache lookups.
1660 1655 for node in outgoing.missingheads:
1661 1656 # Don't compute missing, as this may slow down serving.
1662 1657 fnode = cache.getfnode(node, computemissing=False)
1663 1658 if fnode is not None:
1664 1659 chunks.extend([node, fnode])
1665 1660
1666 1661 if chunks:
1667 1662 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1668 1663
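# Illustration, a hedged sketch (the helper below is hypothetical, not
# part of this module): since the part payload is a flat sequence of
# 20-byte changeset node + 20-byte .hgtags filenode records, a receiver
# can walk it in 40-byte steps:
#
#     def _iterhgtagsfnodes(data):
#         for offset in range(0, len(data), 40):
#             record = data[offset:offset + 40]
#             yield record[:20], record[20:40]  # (node, fnode)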
1669 1664 def check_heads(repo, their_heads, context):
1670 1665 """check if the heads of a repo have been modified
1671 1666
1672 1667 Used by peer for unbundling.
1673 1668 """
1674 1669 heads = repo.heads()
1675 1670 heads_hash = hashlib.sha1(''.join(sorted(heads))).digest()
1676 1671 if not (their_heads == ['force'] or their_heads == heads or
1677 1672 their_heads == ['hashed', heads_hash]):
1678 1673 # someone else committed/pushed/unbundled while we
1679 1674 # were transferring data
1680 1675 raise error.PushRaced('repository changed while %s - '
1681 1676 'please try again' % context)
1682 1677
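# Illustration, a hedged sketch of the 'hashed' form accepted above: a
# client that observed heads [n1, n2] during discovery can guard its
# push with
#
#     their_heads = ['hashed',
#                    hashlib.sha1(''.join(sorted([n1, n2]))).digest()]
#
# so that any commit/push/unbundle landing on the server in between
# makes check_heads() raise PushRaced.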
1683 1678 def unbundle(repo, cg, heads, source, url):
1684 1679 """Apply a bundle to a repo.
1685 1680
1686 1681 This function makes sure the repo is locked during the application and
1687 1682 has a mechanism to check that no push race occurred between the creation
1688 1683 of the bundle and its application.
1689 1684 
1690 1685 If the push was raced, a PushRaced exception is raised."""
1691 1686 r = 0
1692 1687 # need a transaction when processing a bundle2 stream
1693 1688 # [wlock, lock, tr] - needs to be an array so nested functions can modify it
1694 1689 lockandtr = [None, None, None]
1695 1690 recordout = None
1696 1691 # quick fix for output mismatch with bundle2 in 3.4
1697 1692 captureoutput = repo.ui.configbool('experimental', 'bundle2-output-capture',
1698 1693 False)
1699 1694 if url.startswith('remote:http:') or url.startswith('remote:https:'):
1700 1695 captureoutput = True
1701 1696 try:
1702 1697 check_heads(repo, heads, 'uploading changes')
1703 1698 # push can proceed
1704 1699 if util.safehasattr(cg, 'params'):
1705 1700 r = None
1706 1701 try:
1707 1702 def gettransaction():
1708 1703 if not lockandtr[2]:
1709 1704 lockandtr[0] = repo.wlock()
1710 1705 lockandtr[1] = repo.lock()
1711 1706 lockandtr[2] = repo.transaction(source)
1712 1707 lockandtr[2].hookargs['source'] = source
1713 1708 lockandtr[2].hookargs['url'] = url
1714 1709 lockandtr[2].hookargs['bundle2'] = '1'
1715 1710 return lockandtr[2]
1716 1711
1717 1712 # Do greedy locking by default until we're satisfied with lazy
1718 1713 # locking.
1719 1714 if not repo.ui.configbool('experimental', 'bundle2lazylocking'):
1720 1715 gettransaction()
1721 1716
1722 1717 op = bundle2.bundleoperation(repo, gettransaction,
1723 1718 captureoutput=captureoutput)
1724 1719 try:
1725 1720 op = bundle2.processbundle(repo, cg, op=op)
1726 1721 finally:
1727 1722 r = op.reply
1728 1723 if captureoutput and r is not None:
1729 1724 repo.ui.pushbuffer(error=True, subproc=True)
1730 1725 def recordout(output):
1731 1726 r.newpart('output', data=output, mandatory=False)
1732 1727 if lockandtr[2] is not None:
1733 1728 lockandtr[2].close()
1734 1729 except BaseException as exc:
1735 1730 exc.duringunbundle2 = True
1736 1731 if captureoutput and r is not None:
1737 1732 parts = exc._bundle2salvagedoutput = r.salvageoutput()
1738 1733 def recordout(output):
1739 1734 part = bundle2.bundlepart('output', data=output,
1740 1735 mandatory=False)
1741 1736 parts.append(part)
1742 1737 raise
1743 1738 else:
1744 1739 lockandtr[1] = repo.lock()
1745 1740 r = cg.apply(repo, source, url)
1746 1741 finally:
1747 1742 lockmod.release(lockandtr[2], lockandtr[1], lockandtr[0])
1748 1743 if recordout is not None:
1749 1744 recordout(repo.ui.popbuffer())
1750 1745 return r
1751 1746
1752 1747 def _maybeapplyclonebundle(pullop):
1753 1748 """Apply a clone bundle from a remote, if possible."""
1754 1749
1755 1750 repo = pullop.repo
1756 1751 remote = pullop.remote
1757 1752
1758 1753 if not repo.ui.configbool('ui', 'clonebundles', True):
1759 1754 return
1760 1755
1761 1756 # Only run if local repo is empty.
1762 1757 if len(repo):
1763 1758 return
1764 1759
1765 1760 if pullop.heads:
1766 1761 return
1767 1762
1768 1763 if not remote.capable('clonebundles'):
1769 1764 return
1770 1765
1771 1766 res = remote._call('clonebundles')
1772 1767
1773 1768 # If we call the wire protocol command, that's good enough to record the
1774 1769 # attempt.
1775 1770 pullop.clonebundleattempted = True
1776 1771
1777 1772 entries = parseclonebundlesmanifest(repo, res)
1778 1773 if not entries:
1779 1774 repo.ui.note(_('no clone bundles available on remote; '
1780 1775 'falling back to regular clone\n'))
1781 1776 return
1782 1777
1783 1778 entries = filterclonebundleentries(repo, entries)
1784 1779 if not entries:
1785 1780 # There is a thundering herd concern here. However, if a server
1786 1781 # operator doesn't advertise bundles appropriate for its clients,
1787 1782 # they deserve what's coming. Furthermore, from a client's
1788 1783 # perspective, no automatic fallback would mean not being able to
1789 1784 # clone!
1790 1785 repo.ui.warn(_('no compatible clone bundles available on server; '
1791 1786 'falling back to regular clone\n'))
1792 1787 repo.ui.warn(_('(you may want to report this to the server '
1793 1788 'operator)\n'))
1794 1789 return
1795 1790
1796 1791 entries = sortclonebundleentries(repo.ui, entries)
1797 1792
1798 1793 url = entries[0]['URL']
1799 1794 repo.ui.status(_('applying clone bundle from %s\n') % url)
1800 1795 if trypullbundlefromurl(repo.ui, repo, url):
1801 1796 repo.ui.status(_('finished applying clone bundle\n'))
1802 1797 # Bundle failed.
1803 1798 #
1804 1799 # We abort by default to avoid the thundering herd of
1805 1800 # clients flooding a server that was expecting expensive
1806 1801 # clone load to be offloaded.
1807 1802 elif repo.ui.configbool('ui', 'clonebundlefallback', False):
1808 1803 repo.ui.warn(_('falling back to normal clone\n'))
1809 1804 else:
1810 1805 raise error.Abort(_('error applying bundle'),
1811 1806 hint=_('if this error persists, consider contacting '
1812 1807 'the server operator or disable clone '
1813 1808 'bundles via '
1814 1809 '"--config ui.clonebundles=false"'))
1815 1810
1816 1811 def parseclonebundlesmanifest(repo, s):
1817 1812 """Parses the raw text of a clone bundles manifest.
1818 1813
1819 1814 Returns a list of dicts. Each dict has a ``URL`` key corresponding
1820 1815 to the entry's URL; the remaining keys are the entry's attributes.
1821 1816 """
1822 1817 m = []
1823 1818 for line in s.splitlines():
1824 1819 fields = line.split()
1825 1820 if not fields:
1826 1821 continue
1827 1822 attrs = {'URL': fields[0]}
1828 1823 for rawattr in fields[1:]:
1829 1824 key, value = rawattr.split('=', 1)
1830 1825 key = urlreq.unquote(key)
1831 1826 value = urlreq.unquote(value)
1832 1827 attrs[key] = value
1833 1828
1834 1829 # Parse BUNDLESPEC into components. This makes client-side
1835 1830 # preferences easier to specify since you can prefer a single
1836 1831 # component of the BUNDLESPEC.
1837 1832 if key == 'BUNDLESPEC':
1838 1833 try:
1839 1834 comp, version, params = parsebundlespec(repo, value,
1840 1835 externalnames=True)
1841 1836 attrs['COMPRESSION'] = comp
1842 1837 attrs['VERSION'] = version
1843 1838 except error.InvalidBundleSpecification:
1844 1839 pass
1845 1840 except error.UnsupportedBundleSpecification:
1846 1841 pass
1847 1842
1848 1843 m.append(attrs)
1849 1844
1850 1845 return m
1851 1846
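# Illustration, a hedged sketch (URL invented): a manifest line such as
#
#     https://hg.example.com/bundle.hg BUNDLESPEC=gzip-v2 REQUIRESNI=true
#
# parses to roughly
#
#     {'URL': 'https://hg.example.com/bundle.hg',
#      'BUNDLESPEC': 'gzip-v2', 'COMPRESSION': 'gzip', 'VERSION': 'v2',
#      'REQUIRESNI': 'true'}
#
# with COMPRESSION and VERSION split out of BUNDLESPEC as shown above.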
1852 1847 def filterclonebundleentries(repo, entries):
1853 1848 """Remove incompatible clone bundle manifest entries.
1854 1849
1855 1850 Accepts a list of entries parsed with ``parseclonebundlesmanifest``
1856 1851 and returns a new list consisting of only the entries that this client
1857 1852 should be able to apply.
1858 1853
1859 1854 There is no guarantee we'll be able to apply all returned entries because
1860 1855 the metadata we use to filter on may be missing or wrong.
1861 1856 """
1862 1857 newentries = []
1863 1858 for entry in entries:
1864 1859 spec = entry.get('BUNDLESPEC')
1865 1860 if spec:
1866 1861 try:
1867 1862 parsebundlespec(repo, spec, strict=True)
1868 1863 except error.InvalidBundleSpecification as e:
1869 1864 repo.ui.debug(str(e) + '\n')
1870 1865 continue
1871 1866 except error.UnsupportedBundleSpecification as e:
1872 1867 repo.ui.debug('filtering %s because unsupported bundle '
1873 1868 'spec: %s\n' % (entry['URL'], str(e)))
1874 1869 continue
1875 1870
1876 1871 if 'REQUIRESNI' in entry and not sslutil.hassni:
1877 1872 repo.ui.debug('filtering %s because SNI not supported\n' %
1878 1873 entry['URL'])
1879 1874 continue
1880 1875
1881 1876 newentries.append(entry)
1882 1877
1883 1878 return newentries
1884 1879
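# For illustration: an entry advertising a BUNDLESPEC whose compression
# this client does not recognize (say, a hypothetical 'zstd-v2' on an
# old client) is dropped via UnsupportedBundleSpecification, and an
# entry carrying REQUIRESNI is dropped whenever the local Python build
# lacks SNI support.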
1885 1880 def sortclonebundleentries(ui, entries):
1886 1881 prefers = ui.configlist('ui', 'clonebundleprefers', default=[])
1887 1882 if not prefers:
1888 1883 return list(entries)
1889 1884
1890 1885 prefers = [p.split('=', 1) for p in prefers]
1891 1886
1892 1887 # Our sort function.
1893 1888 def compareentry(a, b):
1894 1889 for prefkey, prefvalue in prefers:
1895 1890 avalue = a.get(prefkey)
1896 1891 bvalue = b.get(prefkey)
1897 1892
1898 1893 # Special case for b missing attribute and a matches exactly.
1899 1894 if avalue is not None and bvalue is None and avalue == prefvalue:
1900 1895 return -1
1901 1896
1902 1897 # Special case for a missing attribute and b matches exactly.
1903 1898 if bvalue is not None and avalue is None and bvalue == prefvalue:
1904 1899 return 1
1905 1900
1906 1901 # We can't compare unless attribute present on both.
1907 1902 if avalue is None or bvalue is None:
1908 1903 continue
1909 1904
1910 1905 # Same values should fall back to next attribute.
1911 1906 if avalue == bvalue:
1912 1907 continue
1913 1908
1914 1909 # Exact matches come first.
1915 1910 if avalue == prefvalue:
1916 1911 return -1
1917 1912 if bvalue == prefvalue:
1918 1913 return 1
1919 1914
1920 1915 # Fall back to next attribute.
1921 1916 continue
1922 1917
1923 1918 # If we got here we couldn't sort by attributes and prefers. Fall
1924 1919 # back to index order.
1925 1920 return 0
1926 1921
1927 1922 return sorted(entries, cmp=compareentry)
1928 1923
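# Illustration, a hedged sketch: with a client config of
#
#     [ui]
#     clonebundleprefers = COMPRESSION=gzip, VERSION=v2
#
# gzip entries sort ahead of everything else, ties on compression are
# broken in favor of v2, and entries indistinguishable under every
# preference keep their manifest order (compareentry() returns 0 and
# Python's sort is stable).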
1929 1924 def trypullbundlefromurl(ui, repo, url):
1930 1925 """Attempt to apply a bundle from a URL."""
1931 1926 lock = repo.lock()
1932 1927 try:
1933 1928 tr = repo.transaction('bundleurl')
1934 1929 try:
1935 1930 try:
1936 1931 fh = urlmod.open(ui, url)
1937 1932 cg = readbundle(ui, fh, 'stream')
1938 1933
1939 1934 if isinstance(cg, bundle2.unbundle20):
1940 1935 bundle2.processbundle(repo, cg, lambda: tr)
1941 1936 elif isinstance(cg, streamclone.streamcloneapplier):
1942 1937 cg.apply(repo)
1943 1938 else:
1944 1939 cg.apply(repo, 'clonebundles', url)
1945 1940 tr.close()
1946 1941 return True
1947 1942 except urlerr.httperror as e:
1948 1943 ui.warn(_('HTTP error fetching bundle: %s\n') % str(e))
1949 1944 except urlerr.urlerror as e:
1950 1945 ui.warn(_('error fetching bundle: %s\n') % e.reason[1])
1951 1946
1952 1947 return False
1953 1948 finally:
1954 1949 tr.release()
1955 1950 finally:
1956 1951 lock.release()