streamclone: add support for cloning non append-only file...
Boris Feld
r35783:56c30b31 default
@@ -1,2234 +1,2235 @@
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import collections
11 11 import errno
12 12 import hashlib
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20 from . import (
21 21 bookmarks as bookmod,
22 22 bundle2,
23 23 changegroup,
24 24 discovery,
25 25 error,
26 26 lock as lockmod,
27 27 logexchange,
28 28 obsolete,
29 29 phases,
30 30 pushkey,
31 31 pycompat,
32 32 scmutil,
33 33 sslutil,
34 34 streamclone,
35 35 url as urlmod,
36 36 util,
37 37 )
38 38
39 39 urlerr = util.urlerr
40 40 urlreq = util.urlreq
41 41
42 42 # Maps bundle version human names to changegroup versions.
43 43 _bundlespeccgversions = {'v1': '01',
44 44 'v2': '02',
45 45 'packed1': 's1',
46 46 'bundle2': '02', # legacy
47 47 }
48 48
49 49 # Compression engines allowed in version 1. THIS SHOULD NEVER CHANGE.
50 50 _bundlespecv1compengines = {'gzip', 'bzip2', 'none'}
51 51
52 52 def parsebundlespec(repo, spec, strict=True, externalnames=False):
53 53 """Parse a bundle string specification into parts.
54 54
55 55 Bundle specifications denote a well-defined bundle/exchange format.
56 56 The content of a given specification should not change over time in
57 57 order to ensure that bundles produced by a newer version of Mercurial are
58 58 readable from an older version.
59 59
60 60 The string currently has the form:
61 61
62 62 <compression>-<type>[;<parameter0>[;<parameter1>]]
63 63
64 64 Where <compression> is one of the supported compression formats
65 65 and <type> is (currently) a version string. A ";" can follow the type and
66 66 all text afterwards is interpreted as URI encoded, ";" delimited key=value
67 67 pairs.
68 68
69 69 If ``strict`` is True (the default) <compression> is required. Otherwise,
70 70 it is optional.
71 71
72 72 If ``externalnames`` is False (the default), the human-centric names will
73 73 be converted to their internal representation.
74 74
75 75 Returns a 3-tuple of (compression, version, parameters). Compression will
76 76 be ``None`` if not in strict mode and a compression isn't defined.
77 77
78 78 An ``InvalidBundleSpecification`` is raised when the specification is
79 79 not syntactically well formed.
80 80
81 81 An ``UnsupportedBundleSpecification`` is raised when the compression or
82 82 bundle type/version is not recognized.
83 83
84 84 Note: this function will likely eventually return a more complex data
85 85 structure, including bundle2 part information.
86 86 """
87 87 def parseparams(s):
88 88 if ';' not in s:
89 89 return s, {}
90 90
91 91 params = {}
92 92 version, paramstr = s.split(';', 1)
93 93
94 94 for p in paramstr.split(';'):
95 95 if '=' not in p:
96 96 raise error.InvalidBundleSpecification(
97 97 _('invalid bundle specification: '
98 98 'missing "=" in parameter: %s') % p)
99 99
100 100 key, value = p.split('=', 1)
101 101 key = urlreq.unquote(key)
102 102 value = urlreq.unquote(value)
103 103 params[key] = value
104 104
105 105 return version, params
106 106
107 107
108 108 if strict and '-' not in spec:
109 109 raise error.InvalidBundleSpecification(
110 110 _('invalid bundle specification; '
111 111 'must be prefixed with compression: %s') % spec)
112 112
113 113 if '-' in spec:
114 114 compression, version = spec.split('-', 1)
115 115
116 116 if compression not in util.compengines.supportedbundlenames:
117 117 raise error.UnsupportedBundleSpecification(
118 118 _('%s compression is not supported') % compression)
119 119
120 120 version, params = parseparams(version)
121 121
122 122 if version not in _bundlespeccgversions:
123 123 raise error.UnsupportedBundleSpecification(
124 124 _('%s is not a recognized bundle version') % version)
125 125 else:
126 126 # Value could be just the compression or just the version, in which
127 127 # case some defaults are assumed (but only when not in strict mode).
128 128 assert not strict
129 129
130 130 spec, params = parseparams(spec)
131 131
132 132 if spec in util.compengines.supportedbundlenames:
133 133 compression = spec
134 134 version = 'v1'
135 135 # Generaldelta repos require v2.
136 136 if 'generaldelta' in repo.requirements:
137 137 version = 'v2'
138 138 # Modern compression engines require v2.
139 139 if compression not in _bundlespecv1compengines:
140 140 version = 'v2'
141 141 elif spec in _bundlespeccgversions:
142 142 if spec == 'packed1':
143 143 compression = 'none'
144 144 else:
145 145 compression = 'bzip2'
146 146 version = spec
147 147 else:
148 148 raise error.UnsupportedBundleSpecification(
149 149 _('%s is not a recognized bundle specification') % spec)
150 150
151 151 # Bundle version 1 only supports a known set of compression engines.
152 152 if version == 'v1' and compression not in _bundlespecv1compengines:
153 153 raise error.UnsupportedBundleSpecification(
154 154 _('compression engine %s is not supported on v1 bundles') %
155 155 compression)
156 156
157 157 # The specification for packed1 can optionally declare the data formats
158 158 # required to apply it. If we see this metadata, compare against what the
159 159 # repo supports and error if the bundle isn't compatible.
160 160 if version == 'packed1' and 'requirements' in params:
161 161 requirements = set(params['requirements'].split(','))
162 162 missingreqs = requirements - repo.supportedformats
163 163 if missingreqs:
164 164 raise error.UnsupportedBundleSpecification(
165 165 _('missing support for repository features: %s') %
166 166 ', '.join(sorted(missingreqs)))
167 167
168 168 if not externalnames:
169 169 engine = util.compengines.forbundlename(compression)
170 170 compression = engine.bundletype()[1]
171 171 version = _bundlespeccgversions[version]
172 172 return compression, version, params
173 173
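# Illustration (not part of exchange.py): expected parsebundlespec()
# results, following the tables above; `repo` stands for any local
# repository object.
#
#   parsebundlespec(repo, 'gzip-v2')
#       -> ('GZ', '02', {})            # internal names by default
#   parsebundlespec(repo, 'gzip-v2', externalnames=True)
#       -> ('gzip', 'v2', {})
#   parsebundlespec(repo, 'none-packed1;requirements=revlogv1')
#       -> ('UN', 's1', {'requirements': 'revlogv1'})
#   parsebundlespec(repo, 'v2')
#       -> raises InvalidBundleSpecification (no compression prefix
#          while strict=True)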
174 174 def readbundle(ui, fh, fname, vfs=None):
175 175 header = changegroup.readexactly(fh, 4)
176 176
177 177 alg = None
178 178 if not fname:
179 179 fname = "stream"
180 180 if not header.startswith('HG') and header.startswith('\0'):
181 181 fh = changegroup.headerlessfixup(fh, header)
182 182 header = "HG10"
183 183 alg = 'UN'
184 184 elif vfs:
185 185 fname = vfs.join(fname)
186 186
187 187 magic, version = header[0:2], header[2:4]
188 188
189 189 if magic != 'HG':
190 190 raise error.Abort(_('%s: not a Mercurial bundle') % fname)
191 191 if version == '10':
192 192 if alg is None:
193 193 alg = changegroup.readexactly(fh, 2)
194 194 return changegroup.cg1unpacker(fh, alg)
195 195 elif version.startswith('2'):
196 196 return bundle2.getunbundler(ui, fh, magicstring=magic + version)
197 197 elif version == 'S1':
198 198 return streamclone.streamcloneapplier(fh)
199 199 else:
200 200 raise error.Abort(_('%s: unknown bundle version %s') % (fname, version))
201 201
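# Illustration (not part of exchange.py): the header dispatch performed by
# readbundle() above. A bundle file starts with a 4-byte magic/version
# header; v1 bundles follow it with a 2-byte compression name.
#
#   'HG10' + 'UN'/'GZ'/'BZ'  -> changegroup.cg1unpacker
#   'HG2' + minor version    -> bundle2.getunbundler
#   'HGS1'                   -> streamclone.streamcloneapplier
#   anything else            -> error.Abort
#
# A headerless stream (first byte '\0') is wrapped by
# changegroup.headerlessfixup and treated as an uncompressed v1 bundle.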
202 202 def getbundlespec(ui, fh):
203 203 """Infer the bundlespec from a bundle file handle.
204 204
205 205 The input file handle is seeked and the original seek position is not
206 206 restored.
207 207 """
208 208 def speccompression(alg):
209 209 try:
210 210 return util.compengines.forbundletype(alg).bundletype()[0]
211 211 except KeyError:
212 212 return None
213 213
214 214 b = readbundle(ui, fh, None)
215 215 if isinstance(b, changegroup.cg1unpacker):
216 216 alg = b._type
217 217 if alg == '_truncatedBZ':
218 218 alg = 'BZ'
219 219 comp = speccompression(alg)
220 220 if not comp:
221 221 raise error.Abort(_('unknown compression algorithm: %s') % alg)
222 222 return '%s-v1' % comp
223 223 elif isinstance(b, bundle2.unbundle20):
224 224 if 'Compression' in b.params:
225 225 comp = speccompression(b.params['Compression'])
226 226 if not comp:
227 227 raise error.Abort(_('unknown compression algorithm: %s') % comp)
228 228 else:
229 229 comp = 'none'
230 230
231 231 version = None
232 232 for part in b.iterparts():
233 233 if part.type == 'changegroup':
234 234 version = part.params['version']
235 235 if version in ('01', '02'):
236 236 version = 'v2'
237 237 else:
238 238 raise error.Abort(_('changegroup version %s does not have '
239 239 'a known bundlespec') % version,
240 240 hint=_('try upgrading your Mercurial '
241 241 'client'))
242 242
243 243 if not version:
244 244 raise error.Abort(_('could not identify changegroup version in '
245 245 'bundle'))
246 246
247 247 return '%s-%s' % (comp, version)
248 248 elif isinstance(b, streamclone.streamcloneapplier):
249 249 requirements = streamclone.readbundle1header(fh)[2]
250 250 params = 'requirements=%s' % ','.join(sorted(requirements))
251 251 return 'none-packed1;%s' % urlreq.quote(params)
252 252 else:
253 253 raise error.Abort(_('unknown bundle type: %s') % b)
254 254
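# Illustration (not part of exchange.py; the file name is hypothetical):
#
#   with open('dump.hg', 'rb') as fh:
#       spec = getbundlespec(ui, fh)
#   # e.g. 'bzip2-v1' for an 'HG10BZ' bundle, or for a stream bundle
#   # something like 'none-packed1;requirements%3Drevlogv1' (parameters
#   # are URI-encoded, as quoted above).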
255 255 def _computeoutgoing(repo, heads, common):
256 256 """Computes which revs are outgoing given a set of common
257 257 and a set of heads.
258 258
259 259 This is a separate function so extensions can have access to
260 260 the logic.
261 261
262 262 Returns a discovery.outgoing object.
263 263 """
264 264 cl = repo.changelog
265 265 if common:
266 266 hasnode = cl.hasnode
267 267 common = [n for n in common if hasnode(n)]
268 268 else:
269 269 common = [nullid]
270 270 if not heads:
271 271 heads = cl.heads()
272 272 return discovery.outgoing(repo, common, heads)
273 273
274 274 def _forcebundle1(op):
275 275 """return true if a pull/push must use bundle1
276 276
277 277 This function is used to allow testing of the older bundle version"""
278 278 ui = op.repo.ui
279 279 forcebundle1 = False
280 280 # The goal of this config is to allow developers to choose the bundle
281 281 # version used during exchange. This is especially handy during tests.
282 282 # Value is a list of bundle versions to pick from; the highest version
283 283 # should be used.
284 284 #
285 285 # developer config: devel.legacy.exchange
286 286 exchange = ui.configlist('devel', 'legacy.exchange')
287 287 forcebundle1 = 'bundle2' not in exchange and 'bundle1' in exchange
288 288 return forcebundle1 or not op.remote.capable('bundle2')
289 289
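# Illustration (not part of exchange.py): the hgrc snippet matching the
# developer config read above, forcing the legacy exchange path in tests:
#
#   [devel]
#   legacy.exchange = bundle1
#
# Listing 'bundle2' as well (or instead) keeps bundle2 in use, since the
# highest listed version wins.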
290 290 class pushoperation(object):
291 291 """A object that represent a single push operation
292 292
293 293 Its purpose is to carry push related state and very common operations.
294 294
295 295 A new pushoperation should be created at the beginning of each push and
296 296 discarded afterward.
297 297 """
298 298
299 299 def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
300 300 bookmarks=(), pushvars=None):
301 301 # repo we push from
302 302 self.repo = repo
303 303 self.ui = repo.ui
304 304 # repo we push to
305 305 self.remote = remote
306 306 # force option provided
307 307 self.force = force
308 308 # revs to be pushed (None is "all")
309 309 self.revs = revs
310 310 # bookmarks explicitly pushed
311 311 self.bookmarks = bookmarks
312 312 # allow push of new branch
313 313 self.newbranch = newbranch
314 314 # steps already performed
315 315 # (used to check what steps have already been performed through bundle2)
316 316 self.stepsdone = set()
317 317 # Integer version of the changegroup push result
318 318 # - None means nothing to push
319 319 # - 0 means HTTP error
320 320 # - 1 means we pushed and remote head count is unchanged *or*
321 321 # we have outgoing changesets but refused to push
322 322 # - other values as described by addchangegroup()
323 323 self.cgresult = None
324 324 # Boolean value for the bookmark push
325 325 self.bkresult = None
326 326 # discover.outgoing object (contains common and outgoing data)
327 327 self.outgoing = None
328 328 # all remote topological heads before the push
329 329 self.remoteheads = None
330 330 # Details of the remote branch pre and post push
331 331 #
332 332 # mapping: {'branch': ([remoteheads],
333 333 # [newheads],
334 334 # [unsyncedheads],
335 335 # [discardedheads])}
336 336 # - branch: the branch name
337 337 # - remoteheads: the list of remote heads known locally
338 338 # None if the branch is new
339 339 # - newheads: the new remote heads (known locally) with outgoing pushed
340 340 # - unsyncedheads: the list of remote heads unknown locally.
341 341 # - discardedheads: the list of remote heads made obsolete by the push
342 342 self.pushbranchmap = None
343 343 # testable as a boolean indicating if any nodes are missing locally.
344 344 self.incoming = None
345 345 # summary of the remote phase situation
346 346 self.remotephases = None
347 347 # phase changes that must be pushed alongside the changesets
348 348 self.outdatedphases = None
349 349 # phase changes that must be pushed if the changeset push fails
350 350 self.fallbackoutdatedphases = None
351 351 # outgoing obsmarkers
352 352 self.outobsmarkers = set()
353 353 # outgoing bookmarks
354 354 self.outbookmarks = []
355 355 # transaction manager
356 356 self.trmanager = None
357 357 # map { pushkey partid -> callback handling failure}
358 358 # used to handle exception from mandatory pushkey part failure
359 359 self.pkfailcb = {}
360 360 # an iterable of pushvars or None
361 361 self.pushvars = pushvars
362 362
363 363 @util.propertycache
364 364 def futureheads(self):
365 365 """future remote heads if the changeset push succeeds"""
366 366 return self.outgoing.missingheads
367 367
368 368 @util.propertycache
369 369 def fallbackheads(self):
370 370 """future remote heads if the changeset push fails"""
371 371 if self.revs is None:
372 372 # no target to push, all common heads are relevant
373 373 return self.outgoing.commonheads
374 374 unfi = self.repo.unfiltered()
375 375 # I want cheads = heads(::missingheads and ::commonheads)
376 376 # (missingheads is revs with secret changeset filtered out)
377 377 #
378 378 # This can be expressed as:
379 379 # cheads = ( (missingheads and ::commonheads)
380 380 # + (commonheads and ::missingheads))"
381 381 # )
382 382 #
383 383 # while trying to push we already computed the following:
384 384 # common = (::commonheads)
385 385 # missing = ((commonheads::missingheads) - commonheads)
386 386 #
387 387 # We can pick:
388 388 # * missingheads part of common (::commonheads)
389 389 common = self.outgoing.common
390 390 nm = self.repo.changelog.nodemap
391 391 cheads = [node for node in self.revs if nm[node] in common]
392 392 # and
393 393 # * commonheads parents on missing
394 394 revset = unfi.set('%ln and parents(roots(%ln))',
395 395 self.outgoing.commonheads,
396 396 self.outgoing.missing)
397 397 cheads.extend(c.node() for c in revset)
398 398 return cheads
399 399
400 400 @property
401 401 def commonheads(self):
402 402 """set of all common heads after changeset bundle push"""
403 403 if self.cgresult:
404 404 return self.futureheads
405 405 else:
406 406 return self.fallbackheads
407 407
408 408 # mapping of message used when pushing bookmark
409 409 bookmsgmap = {'update': (_("updating bookmark %s\n"),
410 410 _('updating bookmark %s failed!\n')),
411 411 'export': (_("exporting bookmark %s\n"),
412 412 _('exporting bookmark %s failed!\n')),
413 413 'delete': (_("deleting remote bookmark %s\n"),
414 414 _('deleting remote bookmark %s failed!\n')),
415 415 }
416 416
417 417
418 418 def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=(),
419 419 opargs=None):
420 420 '''Push outgoing changesets (limited by revs) from a local
421 421 repository to remote. Return an integer:
422 422 - None means nothing to push
423 423 - 0 means HTTP error
424 424 - 1 means we pushed and remote head count is unchanged *or*
425 425 we have outgoing changesets but refused to push
426 426 - other values as described by addchangegroup()
427 427 '''
428 428 if opargs is None:
429 429 opargs = {}
430 430 pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks,
431 431 **pycompat.strkwargs(opargs))
432 432 if pushop.remote.local():
433 433 missing = (set(pushop.repo.requirements)
434 434 - pushop.remote.local().supported)
435 435 if missing:
436 436 msg = _("required features are not"
437 437 " supported in the destination:"
438 438 " %s") % (', '.join(sorted(missing)))
439 439 raise error.Abort(msg)
440 440
441 441 if not pushop.remote.canpush():
442 442 raise error.Abort(_("destination does not support push"))
443 443
444 444 if not pushop.remote.capable('unbundle'):
445 445 raise error.Abort(_('cannot push: destination does not support the '
446 446 'unbundle wire protocol command'))
447 447
448 448 # get lock as we might write phase data
449 449 wlock = lock = None
450 450 try:
451 451 # bundle2 push may receive a reply bundle touching bookmarks or other
452 452 # things requiring the wlock. Take it now to ensure proper ordering.
453 453 maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
454 454 if (not _forcebundle1(pushop)) and maypushback:
455 455 wlock = pushop.repo.wlock()
456 456 lock = pushop.repo.lock()
457 457 pushop.trmanager = transactionmanager(pushop.repo,
458 458 'push-response',
459 459 pushop.remote.url())
460 460 except IOError as err:
461 461 if err.errno != errno.EACCES:
462 462 raise
463 463 # source repo cannot be locked.
464 464 # We do not abort the push, but just disable the local phase
465 465 # synchronisation.
466 466 msg = 'cannot lock source repository: %s\n' % err
467 467 pushop.ui.debug(msg)
468 468
469 469 with wlock or util.nullcontextmanager(), \
470 470 lock or util.nullcontextmanager(), \
471 471 pushop.trmanager or util.nullcontextmanager():
472 472 pushop.repo.checkpush(pushop)
473 473 _pushdiscovery(pushop)
474 474 if not _forcebundle1(pushop):
475 475 _pushbundle2(pushop)
476 476 _pushchangeset(pushop)
477 477 _pushsyncphase(pushop)
478 478 _pushobsolete(pushop)
479 479 _pushbookmark(pushop)
480 480
481 481 return pushop
482 482
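# Illustration (not part of exchange.py; `repo`, `remote` and `node` are
# assumed to exist): a minimal caller of push() inspecting its result.
#
#   pushop = push(repo, remote, revs=[node], bookmarks=('stable',))
#   if pushop.cgresult is None:
#       repo.ui.status('nothing was pushed\n')
#   elif pushop.cgresult == 0:
#       repo.ui.warn('push failed (HTTP error)\n')
#
# The returned pushoperation exposes the whole push state (outgoing set,
# bookmark results, etc.) for further inspection.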
483 483 # list of steps to perform discovery before push
484 484 pushdiscoveryorder = []
485 485
486 486 # Mapping between step name and function
487 487 #
488 488 # This exists to help extensions wrap steps if necessary
489 489 pushdiscoverymapping = {}
490 490
491 491 def pushdiscovery(stepname):
492 492 """decorator for function performing discovery before push
493 493
494 494 The function is added to the step -> function mapping and appended to the
495 495 list of steps. Beware that decorated functions will be added in order
496 496 (this may matter).
497 497 
498 498 You can only use this decorator for a new step; if you want to wrap a step
499 499 from an extension, change the pushdiscoverymapping dictionary directly."""
500 500 def dec(func):
501 501 assert stepname not in pushdiscoverymapping
502 502 pushdiscoverymapping[stepname] = func
503 503 pushdiscoveryorder.append(stepname)
504 504 return func
505 505 return dec
506 506
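# Illustration (not part of exchange.py; the step name and helper are
# hypothetical): how a new discovery step could be registered with the
# decorator above.
#
#   @pushdiscovery('my-data')
#   def _pushdiscoverymydata(pushop):
#       pushop.mydata = computemydata(pushop.repo)  # assumed helper
#
# The step is appended to pushdiscoveryorder, so it runs after all
# previously registered steps.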
507 507 def _pushdiscovery(pushop):
508 508 """Run all discovery steps"""
509 509 for stepname in pushdiscoveryorder:
510 510 step = pushdiscoverymapping[stepname]
511 511 step(pushop)
512 512
513 513 @pushdiscovery('changeset')
514 514 def _pushdiscoverychangeset(pushop):
515 515 """discover the changesets that need to be pushed"""
516 516 fci = discovery.findcommonincoming
517 517 if pushop.revs:
518 518 commoninc = fci(pushop.repo, pushop.remote, force=pushop.force,
519 519 ancestorsof=pushop.revs)
520 520 else:
521 521 commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
522 522 common, inc, remoteheads = commoninc
523 523 fco = discovery.findcommonoutgoing
524 524 outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
525 525 commoninc=commoninc, force=pushop.force)
526 526 pushop.outgoing = outgoing
527 527 pushop.remoteheads = remoteheads
528 528 pushop.incoming = inc
529 529
530 530 @pushdiscovery('phase')
531 531 def _pushdiscoveryphase(pushop):
532 532 """discover the phases that need to be pushed
533 533 
534 534 (computed for both the success and failure cases of the changeset push)"""
535 535 outgoing = pushop.outgoing
536 536 unfi = pushop.repo.unfiltered()
537 537 remotephases = pushop.remote.listkeys('phases')
538 538 if (pushop.ui.configbool('ui', '_usedassubrepo')
539 539 and remotephases # server supports phases
540 540 and not pushop.outgoing.missing # no changesets to be pushed
541 541 and remotephases.get('publishing', False)):
542 542 # When:
543 543 # - this is a subrepo push
544 544 # - and the remote supports phases
545 545 # - and no changesets are to be pushed
546 546 # - and remote is publishing
547 547 # We may be in issue 3781 case!
548 548 # We skip the courtesy phase synchronisation that would
549 549 # otherwise publish changesets that are locally draft but
550 550 # already present on the remote.
551 551 pushop.outdatedphases = []
552 552 pushop.fallbackoutdatedphases = []
553 553 return
554 554
555 555 pushop.remotephases = phases.remotephasessummary(pushop.repo,
556 556 pushop.fallbackheads,
557 557 remotephases)
558 558 droots = pushop.remotephases.draftroots
559 559
560 560 extracond = ''
561 561 if not pushop.remotephases.publishing:
562 562 extracond = ' and public()'
563 563 revset = 'heads((%%ln::%%ln) %s)' % extracond
564 564 # Get the list of all revs that are draft on the remote but public here.
565 565 # XXX Beware that the revset breaks if droots is not strictly made of
566 566 # XXX roots; we may want to ensure it is, but that is costly.
567 567 fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
568 568 if not outgoing.missing:
569 569 future = fallback
570 570 else:
571 571 # adds changeset we are going to push as draft
572 572 #
573 573 # should not be necessary for a publishing server, but because of an
574 574 # issue fixed in xxxxx we have to do it anyway.
575 575 fdroots = list(unfi.set('roots(%ln + %ln::)',
576 576 outgoing.missing, droots))
577 577 fdroots = [f.node() for f in fdroots]
578 578 future = list(unfi.set(revset, fdroots, pushop.futureheads))
579 579 pushop.outdatedphases = future
580 580 pushop.fallbackoutdatedphases = fallback
581 581
582 582 @pushdiscovery('obsmarker')
583 583 def _pushdiscoveryobsmarkers(pushop):
584 584 if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
585 585 and pushop.repo.obsstore
586 586 and 'obsolete' in pushop.remote.listkeys('namespaces')):
587 587 repo = pushop.repo
588 588 # Very naive computation that can be quite expensive on big repos.
589 589 # However, evolution is currently slow on them anyway.
590 590 nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
591 591 pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)
592 592
593 593 @pushdiscovery('bookmarks')
594 594 def _pushdiscoverybookmarks(pushop):
595 595 ui = pushop.ui
596 596 repo = pushop.repo.unfiltered()
597 597 remote = pushop.remote
598 598 ui.debug("checking for updated bookmarks\n")
599 599 ancestors = ()
600 600 if pushop.revs:
601 601 revnums = map(repo.changelog.rev, pushop.revs)
602 602 ancestors = repo.changelog.ancestors(revnums, inclusive=True)
603 603 remotebookmark = remote.listkeys('bookmarks')
604 604
605 605 explicit = set([repo._bookmarks.expandname(bookmark)
606 606 for bookmark in pushop.bookmarks])
607 607
608 608 remotebookmark = bookmod.unhexlifybookmarks(remotebookmark)
609 609 comp = bookmod.comparebookmarks(repo, repo._bookmarks, remotebookmark)
610 610
611 611 def safehex(x):
612 612 if x is None:
613 613 return x
614 614 return hex(x)
615 615
616 616 def hexifycompbookmarks(bookmarks):
617 617 for b, scid, dcid in bookmarks:
618 618 yield b, safehex(scid), safehex(dcid)
619 619
620 620 comp = [hexifycompbookmarks(marks) for marks in comp]
621 621 addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp
622 622
623 623 for b, scid, dcid in advsrc:
624 624 if b in explicit:
625 625 explicit.remove(b)
626 626 if not ancestors or repo[scid].rev() in ancestors:
627 627 pushop.outbookmarks.append((b, dcid, scid))
628 628 # search added bookmark
629 629 for b, scid, dcid in addsrc:
630 630 if b in explicit:
631 631 explicit.remove(b)
632 632 pushop.outbookmarks.append((b, '', scid))
633 633 # search for overwritten bookmark
634 634 for b, scid, dcid in list(advdst) + list(diverge) + list(differ):
635 635 if b in explicit:
636 636 explicit.remove(b)
637 637 pushop.outbookmarks.append((b, dcid, scid))
638 638 # search for bookmark to delete
639 639 for b, scid, dcid in adddst:
640 640 if b in explicit:
641 641 explicit.remove(b)
642 642 # treat as "deleted locally"
643 643 pushop.outbookmarks.append((b, dcid, ''))
644 644 # identical bookmarks shouldn't get reported
645 645 for b, scid, dcid in same:
646 646 if b in explicit:
647 647 explicit.remove(b)
648 648
649 649 if explicit:
650 650 explicit = sorted(explicit)
651 651 # we should probably list all of them
652 652 ui.warn(_('bookmark %s does not exist on the local '
653 653 'or remote repository!\n') % explicit[0])
654 654 pushop.bkresult = 2
655 655
656 656 pushop.outbookmarks.sort()
657 657
658 658 def _pushcheckoutgoing(pushop):
659 659 outgoing = pushop.outgoing
660 660 unfi = pushop.repo.unfiltered()
661 661 if not outgoing.missing:
662 662 # nothing to push
663 663 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
664 664 return False
665 665 # something to push
666 666 if not pushop.force:
667 667 # if repo.obsstore is empty --> no obsolete changesets,
668 668 # so we can skip the iteration
669 669 if unfi.obsstore:
670 670 # these messages are defined here for 80-char-limit reasons
671 671 mso = _("push includes obsolete changeset: %s!")
672 672 mspd = _("push includes phase-divergent changeset: %s!")
673 673 mscd = _("push includes content-divergent changeset: %s!")
674 674 mst = {"orphan": _("push includes orphan changeset: %s!"),
675 675 "phase-divergent": mspd,
676 676 "content-divergent": mscd}
677 677 # If there is at least one obsolete or unstable
678 678 # changeset in missing, at least one of the missing
679 679 # heads will be obsolete or unstable. So checking
680 680 # heads only is ok.
681 681 for node in outgoing.missingheads:
682 682 ctx = unfi[node]
683 683 if ctx.obsolete():
684 684 raise error.Abort(mso % ctx)
685 685 elif ctx.isunstable():
686 686 # TODO print more than one instability in the abort
687 687 # message
688 688 raise error.Abort(mst[ctx.instabilities()[0]] % ctx)
689 689
690 690 discovery.checkheads(pushop)
691 691 return True
692 692
693 693 # List of names of steps to perform for an outgoing bundle2, order matters.
694 694 b2partsgenorder = []
695 695
696 696 # Mapping between step name and function
697 697 #
698 698 # This exists to help extensions wrap steps if necessary
699 699 b2partsgenmapping = {}
700 700
701 701 def b2partsgenerator(stepname, idx=None):
702 702 """decorator for function generating bundle2 part
703 703
704 704 The function is added to the step -> function mapping and appended to the
705 705 list of steps. Beware that decorated functions will be added in order
706 706 (this may matter).
707 707
708 708 You can only use this decorator for new steps; if you want to wrap a step
709 709 from an extension, modify the b2partsgenmapping dictionary directly."""
710 710 def dec(func):
711 711 assert stepname not in b2partsgenmapping
712 712 b2partsgenmapping[stepname] = func
713 713 if idx is None:
714 714 b2partsgenorder.append(stepname)
715 715 else:
716 716 b2partsgenorder.insert(idx, stepname)
717 717 return func
718 718 return dec
719 719
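# Illustration (not part of exchange.py; the step and part names are
# hypothetical): registering a bundle2 part generator. Passing idx
# controls where the step lands in b2partsgenorder.
#
#   @b2partsgenerator('my-part', idx=0)  # run before the other parts
#   def _pushb2mypart(pushop, bundler):
#       if 'my-part' in pushop.stepsdone:
#           return
#       pushop.stepsdone.add('my-part')
#       bundler.newpart('my-part', data=b'...')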
720 720 def _pushb2ctxcheckheads(pushop, bundler):
721 721 """Generate race condition checking parts
722 722
723 723 Exists as an independent function to aid extensions
724 724 """
725 725 # * 'force' does not check for push races,
726 726 # * if we don't push anything, there is nothing to check.
727 727 if not pushop.force and pushop.outgoing.missingheads:
728 728 allowunrelated = 'related' in bundler.capabilities.get('checkheads', ())
729 729 emptyremote = pushop.pushbranchmap is None
730 730 if not allowunrelated or emptyremote:
731 731 bundler.newpart('check:heads', data=iter(pushop.remoteheads))
732 732 else:
733 733 affected = set()
734 734 for branch, heads in pushop.pushbranchmap.iteritems():
735 735 remoteheads, newheads, unsyncedheads, discardedheads = heads
736 736 if remoteheads is not None:
737 737 remote = set(remoteheads)
738 738 affected |= set(discardedheads) & remote
739 739 affected |= remote - set(newheads)
740 740 if affected:
741 741 data = iter(sorted(affected))
742 742 bundler.newpart('check:updated-heads', data=data)
743 743
744 744 def _pushing(pushop):
745 745 """return True if we are pushing anything"""
746 746 return bool(pushop.outgoing.missing
747 747 or pushop.outdatedphases
748 748 or pushop.outobsmarkers
749 749 or pushop.outbookmarks)
750 750
751 751 @b2partsgenerator('check-bookmarks')
752 752 def _pushb2checkbookmarks(pushop, bundler):
753 753 """insert bookmark move checking"""
754 754 if not _pushing(pushop) or pushop.force:
755 755 return
756 756 b2caps = bundle2.bundle2caps(pushop.remote)
757 757 hasbookmarkcheck = 'bookmarks' in b2caps
758 758 if not (pushop.outbookmarks and hasbookmarkcheck):
759 759 return
760 760 data = []
761 761 for book, old, new in pushop.outbookmarks:
762 762 old = bin(old)
763 763 data.append((book, old))
764 764 checkdata = bookmod.binaryencode(data)
765 765 bundler.newpart('check:bookmarks', data=checkdata)
766 766
767 767 @b2partsgenerator('check-phases')
768 768 def _pushb2checkphases(pushop, bundler):
769 769 """insert phase move checking"""
770 770 if not _pushing(pushop) or pushop.force:
771 771 return
772 772 b2caps = bundle2.bundle2caps(pushop.remote)
773 773 hasphaseheads = 'heads' in b2caps.get('phases', ())
774 774 if pushop.remotephases is not None and hasphaseheads:
775 775 # check that the remote phase has not changed
776 776 checks = [[] for p in phases.allphases]
777 777 checks[phases.public].extend(pushop.remotephases.publicheads)
778 778 checks[phases.draft].extend(pushop.remotephases.draftroots)
779 779 if any(checks):
780 780 for nodes in checks:
781 781 nodes.sort()
782 782 checkdata = phases.binaryencode(checks)
783 783 bundler.newpart('check:phases', data=checkdata)
784 784
785 785 @b2partsgenerator('changeset')
786 786 def _pushb2ctx(pushop, bundler):
787 787 """handle changegroup push through bundle2
788 788
789 789 addchangegroup result is stored in the ``pushop.cgresult`` attribute.
790 790 """
791 791 if 'changesets' in pushop.stepsdone:
792 792 return
793 793 pushop.stepsdone.add('changesets')
794 794 # Send known heads to the server for race detection.
795 795 if not _pushcheckoutgoing(pushop):
796 796 return
797 797 pushop.repo.prepushoutgoinghooks(pushop)
798 798
799 799 _pushb2ctxcheckheads(pushop, bundler)
800 800
801 801 b2caps = bundle2.bundle2caps(pushop.remote)
802 802 version = '01'
803 803 cgversions = b2caps.get('changegroup')
804 804 if cgversions: # 3.1 and 3.2 ship with an empty value
805 805 cgversions = [v for v in cgversions
806 806 if v in changegroup.supportedoutgoingversions(
807 807 pushop.repo)]
808 808 if not cgversions:
809 809 raise ValueError(_('no common changegroup version'))
810 810 version = max(cgversions)
811 811 cgstream = changegroup.makestream(pushop.repo, pushop.outgoing, version,
812 812 'push')
813 813 cgpart = bundler.newpart('changegroup', data=cgstream)
814 814 if cgversions:
815 815 cgpart.addparam('version', version)
816 816 if 'treemanifest' in pushop.repo.requirements:
817 817 cgpart.addparam('treemanifest', '1')
818 818 def handlereply(op):
819 819 """extract addchangegroup returns from server reply"""
820 820 cgreplies = op.records.getreplies(cgpart.id)
821 821 assert len(cgreplies['changegroup']) == 1
822 822 pushop.cgresult = cgreplies['changegroup'][0]['return']
823 823 return handlereply
824 824
825 825 @b2partsgenerator('phase')
826 826 def _pushb2phases(pushop, bundler):
827 827 """handle phase push through bundle2"""
828 828 if 'phases' in pushop.stepsdone:
829 829 return
830 830 b2caps = bundle2.bundle2caps(pushop.remote)
831 831 ui = pushop.repo.ui
832 832
833 833 legacyphase = 'phases' in ui.configlist('devel', 'legacy.exchange')
834 834 haspushkey = 'pushkey' in b2caps
835 835 hasphaseheads = 'heads' in b2caps.get('phases', ())
836 836
837 837 if hasphaseheads and not legacyphase:
838 838 return _pushb2phaseheads(pushop, bundler)
839 839 elif haspushkey:
840 840 return _pushb2phasespushkey(pushop, bundler)
841 841
842 842 def _pushb2phaseheads(pushop, bundler):
843 843 """push phase information through a bundle2 - binary part"""
844 844 pushop.stepsdone.add('phases')
845 845 if pushop.outdatedphases:
846 846 updates = [[] for p in phases.allphases]
847 847 updates[0].extend(h.node() for h in pushop.outdatedphases)
848 848 phasedata = phases.binaryencode(updates)
849 849 bundler.newpart('phase-heads', data=phasedata)
850 850
851 851 def _pushb2phasespushkey(pushop, bundler):
852 852 """push phase information through a bundle2 - pushkey part"""
853 853 pushop.stepsdone.add('phases')
854 854 part2node = []
855 855
856 856 def handlefailure(pushop, exc):
857 857 targetid = int(exc.partid)
858 858 for partid, node in part2node:
859 859 if partid == targetid:
860 860 raise error.Abort(_('updating %s to public failed') % node)
861 861
862 862 enc = pushkey.encode
863 863 for newremotehead in pushop.outdatedphases:
864 864 part = bundler.newpart('pushkey')
865 865 part.addparam('namespace', enc('phases'))
866 866 part.addparam('key', enc(newremotehead.hex()))
867 867 part.addparam('old', enc('%d' % phases.draft))
868 868 part.addparam('new', enc('%d' % phases.public))
869 869 part2node.append((part.id, newremotehead))
870 870 pushop.pkfailcb[part.id] = handlefailure
871 871
872 872 def handlereply(op):
873 873 for partid, node in part2node:
874 874 partrep = op.records.getreplies(partid)
875 875 results = partrep['pushkey']
876 876 assert len(results) <= 1
877 877 msg = None
878 878 if not results:
879 879 msg = _('server ignored update of %s to public!\n') % node
880 880 elif not int(results[0]['return']):
881 881 msg = _('updating %s to public failed!\n') % node
882 882 if msg is not None:
883 883 pushop.ui.warn(msg)
884 884 return handlereply
885 885
886 886 @b2partsgenerator('obsmarkers')
887 887 def _pushb2obsmarkers(pushop, bundler):
888 888 if 'obsmarkers' in pushop.stepsdone:
889 889 return
890 890 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
891 891 if obsolete.commonversion(remoteversions) is None:
892 892 return
893 893 pushop.stepsdone.add('obsmarkers')
894 894 if pushop.outobsmarkers:
895 895 markers = sorted(pushop.outobsmarkers)
896 896 bundle2.buildobsmarkerspart(bundler, markers)
897 897
898 898 @b2partsgenerator('bookmarks')
899 899 def _pushb2bookmarks(pushop, bundler):
900 900 """handle bookmark push through bundle2"""
901 901 if 'bookmarks' in pushop.stepsdone:
902 902 return
903 903 b2caps = bundle2.bundle2caps(pushop.remote)
904 904
905 905 legacy = pushop.repo.ui.configlist('devel', 'legacy.exchange')
906 906 legacybooks = 'bookmarks' in legacy
907 907
908 908 if not legacybooks and 'bookmarks' in b2caps:
909 909 return _pushb2bookmarkspart(pushop, bundler)
910 910 elif 'pushkey' in b2caps:
911 911 return _pushb2bookmarkspushkey(pushop, bundler)
912 912
913 913 def _bmaction(old, new):
914 914 """small utility for bookmark pushing"""
915 915 if not old:
916 916 return 'export'
917 917 elif not new:
918 918 return 'delete'
919 919 return 'update'
920 920
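# Illustration (not part of exchange.py): _bmaction() maps old/new
# bookmark nodes to the keys of bookmsgmap defined earlier.
#
#   _bmaction('', 'abc123')        -> 'export'  (bookmark added)
#   _bmaction('abc123', '')        -> 'delete'  (bookmark removed)
#   _bmaction('abc123', 'def456')  -> 'update'  (bookmark moved)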
921 921 def _pushb2bookmarkspart(pushop, bundler):
922 922 pushop.stepsdone.add('bookmarks')
923 923 if not pushop.outbookmarks:
924 924 return
925 925
926 926 allactions = []
927 927 data = []
928 928 for book, old, new in pushop.outbookmarks:
929 929 new = bin(new)
930 930 data.append((book, new))
931 931 allactions.append((book, _bmaction(old, new)))
932 932 checkdata = bookmod.binaryencode(data)
933 933 bundler.newpart('bookmarks', data=checkdata)
934 934
935 935 def handlereply(op):
936 936 ui = pushop.ui
937 937 # if success
938 938 for book, action in allactions:
939 939 ui.status(bookmsgmap[action][0] % book)
940 940
941 941 return handlereply
942 942
943 943 def _pushb2bookmarkspushkey(pushop, bundler):
944 944 pushop.stepsdone.add('bookmarks')
945 945 part2book = []
946 946 enc = pushkey.encode
947 947
948 948 def handlefailure(pushop, exc):
949 949 targetid = int(exc.partid)
950 950 for partid, book, action in part2book:
951 951 if partid == targetid:
952 952 raise error.Abort(bookmsgmap[action][1].rstrip() % book)
953 953 # we should not be called for parts we did not generate
954 954 assert False
955 955
956 956 for book, old, new in pushop.outbookmarks:
957 957 part = bundler.newpart('pushkey')
958 958 part.addparam('namespace', enc('bookmarks'))
959 959 part.addparam('key', enc(book))
960 960 part.addparam('old', enc(old))
961 961 part.addparam('new', enc(new))
962 962 action = 'update'
963 963 if not old:
964 964 action = 'export'
965 965 elif not new:
966 966 action = 'delete'
967 967 part2book.append((part.id, book, action))
968 968 pushop.pkfailcb[part.id] = handlefailure
969 969
970 970 def handlereply(op):
971 971 ui = pushop.ui
972 972 for partid, book, action in part2book:
973 973 partrep = op.records.getreplies(partid)
974 974 results = partrep['pushkey']
975 975 assert len(results) <= 1
976 976 if not results:
977 977 pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
978 978 else:
979 979 ret = int(results[0]['return'])
980 980 if ret:
981 981 ui.status(bookmsgmap[action][0] % book)
982 982 else:
983 983 ui.warn(bookmsgmap[action][1] % book)
984 984 if pushop.bkresult is not None:
985 985 pushop.bkresult = 1
986 986 return handlereply
987 987
988 988 @b2partsgenerator('pushvars', idx=0)
989 989 def _getbundlesendvars(pushop, bundler):
990 990 '''send shellvars via bundle2'''
991 991 pushvars = pushop.pushvars
992 992 if pushvars:
993 993 shellvars = {}
994 994 for raw in pushvars:
995 995 if '=' not in raw:
996 996 msg = ("unable to parse variable '%s', should follow "
997 997 "'KEY=VALUE' or 'KEY=' format")
998 998 raise error.Abort(msg % raw)
999 999 k, v = raw.split('=', 1)
1000 1000 shellvars[k] = v
1001 1001
1002 1002 part = bundler.newpart('pushvars')
1003 1003
1004 1004 for key, value in shellvars.iteritems():
1005 1005 part.addparam(key, value, mandatory=False)
1006 1006
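# Illustration (not part of exchange.py): pushvars typically come from
# the command line, e.g.
#
#   $ hg push --pushvars "DEBUG=1" --pushvars "REASON=hotfix"
#
# which yields a 'pushvars' part whose advisory (mandatory=False)
# parameters DEBUG=1 and REASON=hotfix the server side may expose to
# hooks.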
1007 1007 def _pushbundle2(pushop):
1008 1008 """push data to the remote using bundle2
1009 1009
1010 1010 The only currently supported type of data is changegroup but this will
1011 1011 evolve in the future."""
1012 1012 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
1013 1013 pushback = (pushop.trmanager
1014 1014 and pushop.ui.configbool('experimental', 'bundle2.pushback'))
1015 1015
1016 1016 # create reply capability
1017 1017 capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo,
1018 1018 allowpushback=pushback))
1019 1019 bundler.newpart('replycaps', data=capsblob)
1020 1020 replyhandlers = []
1021 1021 for partgenname in b2partsgenorder:
1022 1022 partgen = b2partsgenmapping[partgenname]
1023 1023 ret = partgen(pushop, bundler)
1024 1024 if callable(ret):
1025 1025 replyhandlers.append(ret)
1026 1026 # do not push if nothing to push
1027 1027 if bundler.nbparts <= 1:
1028 1028 return
1029 1029 stream = util.chunkbuffer(bundler.getchunks())
1030 1030 try:
1031 1031 try:
1032 1032 reply = pushop.remote.unbundle(
1033 1033 stream, ['force'], pushop.remote.url())
1034 1034 except error.BundleValueError as exc:
1035 1035 raise error.Abort(_('missing support for %s') % exc)
1036 1036 try:
1037 1037 trgetter = None
1038 1038 if pushback:
1039 1039 trgetter = pushop.trmanager.transaction
1040 1040 op = bundle2.processbundle(pushop.repo, reply, trgetter)
1041 1041 except error.BundleValueError as exc:
1042 1042 raise error.Abort(_('missing support for %s') % exc)
1043 1043 except bundle2.AbortFromPart as exc:
1044 1044 pushop.ui.status(_('remote: %s\n') % exc)
1045 1045 if exc.hint is not None:
1046 1046 pushop.ui.status(_('remote: %s\n') % ('(%s)' % exc.hint))
1047 1047 raise error.Abort(_('push failed on remote'))
1048 1048 except error.PushkeyFailed as exc:
1049 1049 partid = int(exc.partid)
1050 1050 if partid not in pushop.pkfailcb:
1051 1051 raise
1052 1052 pushop.pkfailcb[partid](pushop, exc)
1053 1053 for rephand in replyhandlers:
1054 1054 rephand(op)
1055 1055
1056 1056 def _pushchangeset(pushop):
1057 1057 """Make the actual push of changeset bundle to remote repo"""
1058 1058 if 'changesets' in pushop.stepsdone:
1059 1059 return
1060 1060 pushop.stepsdone.add('changesets')
1061 1061 if not _pushcheckoutgoing(pushop):
1062 1062 return
1063 1063
1064 1064 # Should have verified this in push().
1065 1065 assert pushop.remote.capable('unbundle')
1066 1066
1067 1067 pushop.repo.prepushoutgoinghooks(pushop)
1068 1068 outgoing = pushop.outgoing
1069 1069 # TODO: get bundlecaps from remote
1070 1070 bundlecaps = None
1071 1071 # create a changegroup from local
1072 1072 if pushop.revs is None and not (outgoing.excluded
1073 1073 or pushop.repo.changelog.filteredrevs):
1074 1074 # push everything,
1075 1075 # use the fast path, no race possible on push
1076 1076 cg = changegroup.makechangegroup(pushop.repo, outgoing, '01', 'push',
1077 1077 fastpath=True, bundlecaps=bundlecaps)
1078 1078 else:
1079 1079 cg = changegroup.makechangegroup(pushop.repo, outgoing, '01',
1080 1080 'push', bundlecaps=bundlecaps)
1081 1081
1082 1082 # apply changegroup to remote
1083 1083 # local repo finds heads on server, finds out what
1084 1084 # revs it must push. once revs transferred, if server
1085 1085 # finds it has different heads (someone else won
1086 1086 # commit/push race), server aborts.
1087 1087 if pushop.force:
1088 1088 remoteheads = ['force']
1089 1089 else:
1090 1090 remoteheads = pushop.remoteheads
1091 1091 # ssh: return remote's addchangegroup()
1092 1092 # http: return remote's addchangegroup() or 0 for error
1093 1093 pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
1094 1094 pushop.repo.url())
1095 1095
1096 1096 def _pushsyncphase(pushop):
1097 1097 """synchronise phase information locally and remotely"""
1098 1098 cheads = pushop.commonheads
1099 1099 # even when we don't push, exchanging phase data is useful
1100 1100 remotephases = pushop.remote.listkeys('phases')
1101 1101 if (pushop.ui.configbool('ui', '_usedassubrepo')
1102 1102 and remotephases # server supports phases
1103 1103 and pushop.cgresult is None # nothing was pushed
1104 1104 and remotephases.get('publishing', False)):
1105 1105 # When:
1106 1106 # - this is a subrepo push
1107 1107 # - and the remote supports phases
1108 1108 # - and no changeset was pushed
1109 1109 # - and remote is publishing
1110 1110 # We may be in issue 3871 case!
1111 1111 # We skip the courtesy phase synchronisation that would
1112 1112 # otherwise publish changesets that are locally draft but
1113 1113 # already present on the remote.
1114 1114 remotephases = {'publishing': 'True'}
1115 1115 if not remotephases: # old server, or public-only reply from a non-publishing one
1116 1116 _localphasemove(pushop, cheads)
1117 1117 # don't push any phase data as there is nothing to push
1118 1118 else:
1119 1119 ana = phases.analyzeremotephases(pushop.repo, cheads,
1120 1120 remotephases)
1121 1121 pheads, droots = ana
1122 1122 ### Apply remote phase on local
1123 1123 if remotephases.get('publishing', False):
1124 1124 _localphasemove(pushop, cheads)
1125 1125 else: # publish = False
1126 1126 _localphasemove(pushop, pheads)
1127 1127 _localphasemove(pushop, cheads, phases.draft)
1128 1128 ### Apply local phase on remote
1129 1129
1130 1130 if pushop.cgresult:
1131 1131 if 'phases' in pushop.stepsdone:
1132 1132 # phases already pushed through bundle2
1133 1133 return
1134 1134 outdated = pushop.outdatedphases
1135 1135 else:
1136 1136 outdated = pushop.fallbackoutdatedphases
1137 1137
1138 1138 pushop.stepsdone.add('phases')
1139 1139
1140 1140 # filter heads already turned public by the push
1141 1141 outdated = [c for c in outdated if c.node() not in pheads]
1142 1142 # fallback to independent pushkey command
1143 1143 for newremotehead in outdated:
1144 1144 r = pushop.remote.pushkey('phases',
1145 1145 newremotehead.hex(),
1146 1146 str(phases.draft),
1147 1147 str(phases.public))
1148 1148 if not r:
1149 1149 pushop.ui.warn(_('updating %s to public failed!\n')
1150 1150 % newremotehead)
1151 1151
1152 1152 def _localphasemove(pushop, nodes, phase=phases.public):
1153 1153 """move <nodes> to <phase> in the local source repo"""
1154 1154 if pushop.trmanager:
1155 1155 phases.advanceboundary(pushop.repo,
1156 1156 pushop.trmanager.transaction(),
1157 1157 phase,
1158 1158 nodes)
1159 1159 else:
1160 1160 # repo is not locked, do not change any phases!
1161 1161 # Informs the user that phases should have been moved when
1162 1162 # applicable.
1163 1163 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
1164 1164 phasestr = phases.phasenames[phase]
1165 1165 if actualmoves:
1166 1166 pushop.ui.status(_('cannot lock source repo, skipping '
1167 1167 'local %s phase update\n') % phasestr)
1168 1168
1169 1169 def _pushobsolete(pushop):
1170 1170 """utility function to push obsolete markers to a remote"""
1171 1171 if 'obsmarkers' in pushop.stepsdone:
1172 1172 return
1173 1173 repo = pushop.repo
1174 1174 remote = pushop.remote
1175 1175 pushop.stepsdone.add('obsmarkers')
1176 1176 if pushop.outobsmarkers:
1177 1177 pushop.ui.debug('try to push obsolete markers to remote\n')
1178 1178 rslts = []
1179 1179 remotedata = obsolete._pushkeyescape(sorted(pushop.outobsmarkers))
1180 1180 for key in sorted(remotedata, reverse=True):
1181 1181 # reverse sort to ensure we end with dump0
1182 1182 data = remotedata[key]
1183 1183 rslts.append(remote.pushkey('obsolete', key, '', data))
1184 1184 if [r for r in rslts if not r]:
1185 1185 msg = _('failed to push some obsolete markers!\n')
1186 1186 repo.ui.warn(msg)
1187 1187
1188 1188 def _pushbookmark(pushop):
1189 1189 """Update bookmark position on remote"""
1190 1190 if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
1191 1191 return
1192 1192 pushop.stepsdone.add('bookmarks')
1193 1193 ui = pushop.ui
1194 1194 remote = pushop.remote
1195 1195
1196 1196 for b, old, new in pushop.outbookmarks:
1197 1197 action = 'update'
1198 1198 if not old:
1199 1199 action = 'export'
1200 1200 elif not new:
1201 1201 action = 'delete'
1202 1202 if remote.pushkey('bookmarks', b, old, new):
1203 1203 ui.status(bookmsgmap[action][0] % b)
1204 1204 else:
1205 1205 ui.warn(bookmsgmap[action][1] % b)
1206 1206 # discovery can have set the value from an invalid entry
1207 1207 if pushop.bkresult is not None:
1208 1208 pushop.bkresult = 1
1209 1209
1210 1210 class pulloperation(object):
1211 1211 """An object that represents a single pull operation
1212 1212 
1213 1213 Its purpose is to carry pull-related state and very common operations.
1214 1214 
1215 1215 A new one should be created at the beginning of each pull and discarded
1216 1216 afterward.
1217 1217 """
1218 1218
1219 1219 def __init__(self, repo, remote, heads=None, force=False, bookmarks=(),
1220 1220 remotebookmarks=None, streamclonerequested=None):
1221 1221 # repo we pull into
1222 1222 self.repo = repo
1223 1223 # repo we pull from
1224 1224 self.remote = remote
1225 1225 # revisions we try to pull (None is "all")
1226 1226 self.heads = heads
1227 1227 # bookmarks pulled explicitly
1228 1228 self.explicitbookmarks = [repo._bookmarks.expandname(bookmark)
1229 1229 for bookmark in bookmarks]
1230 1230 # do we force pull?
1231 1231 self.force = force
1232 1232 # whether a streaming clone was requested
1233 1233 self.streamclonerequested = streamclonerequested
1234 1234 # transaction manager
1235 1235 self.trmanager = None
1236 1236 # set of common changeset between local and remote before pull
1237 1237 self.common = None
1238 1238 # set of pulled heads
1239 1239 self.rheads = None
1240 1240 # list of missing changesets to fetch remotely
1241 1241 self.fetch = None
1242 1242 # remote bookmarks data
1243 1243 self.remotebookmarks = remotebookmarks
1244 1244 # result of changegroup pulling (used as return code by pull)
1245 1245 self.cgresult = None
1246 1246 # list of step already done
1247 1247 self.stepsdone = set()
1248 1248 # Whether we attempted a clone from pre-generated bundles.
1249 1249 self.clonebundleattempted = False
1250 1250
1251 1251 @util.propertycache
1252 1252 def pulledsubset(self):
1253 1253 """heads of the set of changesets targeted by the pull"""
1254 1254 # compute target subset
1255 1255 if self.heads is None:
1256 1256 # We pulled everything possible
1257 1257 # sync on everything common
1258 1258 c = set(self.common)
1259 1259 ret = list(self.common)
1260 1260 for n in self.rheads:
1261 1261 if n not in c:
1262 1262 ret.append(n)
1263 1263 return ret
1264 1264 else:
1265 1265 # We pulled a specific subset
1266 1266 # sync on this subset
1267 1267 return self.heads
1268 1268
1269 1269 @util.propertycache
1270 1270 def canusebundle2(self):
1271 1271 return not _forcebundle1(self)
1272 1272
1273 1273 @util.propertycache
1274 1274 def remotebundle2caps(self):
1275 1275 return bundle2.bundle2caps(self.remote)
1276 1276
1277 1277 def gettransaction(self):
1278 1278 # deprecated; talk to trmanager directly
1279 1279 return self.trmanager.transaction()
1280 1280
1281 1281 class transactionmanager(util.transactional):
1282 1282 """An object to manage the life cycle of a transaction
1283 1283
1284 1284 It creates the transaction on demand and calls the appropriate hooks when
1285 1285 closing the transaction."""
1286 1286 def __init__(self, repo, source, url):
1287 1287 self.repo = repo
1288 1288 self.source = source
1289 1289 self.url = url
1290 1290 self._tr = None
1291 1291
1292 1292 def transaction(self):
1293 1293 """Return an open transaction object, constructing if necessary"""
1294 1294 if not self._tr:
1295 1295 trname = '%s\n%s' % (self.source, util.hidepassword(self.url))
1296 1296 self._tr = self.repo.transaction(trname)
1297 1297 self._tr.hookargs['source'] = self.source
1298 1298 self._tr.hookargs['url'] = self.url
1299 1299 return self._tr
1300 1300
1301 1301 def close(self):
1302 1302 """close transaction if created"""
1303 1303 if self._tr is not None:
1304 1304 self._tr.close()
1305 1305
1306 1306 def release(self):
1307 1307 """release transaction if created"""
1308 1308 if self._tr is not None:
1309 1309 self._tr.release()
1310 1310
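# Illustration (not part of exchange.py): the transactionmanager life
# cycle, mirroring how pull() uses it below.
#
#   trmanager = transactionmanager(repo, 'pull', remote.url())
#   try:
#       tr = trmanager.transaction()  # created lazily on first use
#       # ... apply incoming data under tr ...
#       trmanager.close()             # commit and fire hooks
#   finally:
#       trmanager.release()           # roll back if not closed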
1311 1311 def pull(repo, remote, heads=None, force=False, bookmarks=(), opargs=None,
1312 1312 streamclonerequested=None):
1313 1313 """Fetch repository data from a remote.
1314 1314
1315 1315 This is the main function used to retrieve data from a remote repository.
1316 1316
1317 1317 ``repo`` is the local repository to clone into.
1318 1318 ``remote`` is a peer instance.
1319 1319 ``heads`` is an iterable of revisions we want to pull. ``None`` (the
1320 1320 default) means to pull everything from the remote.
1321 1321 ``bookmarks`` is an iterable of bookmarks requesting to be pulled. By
1322 1322 default, all remote bookmarks are pulled.
1323 1323 ``opargs`` are additional keyword arguments to pass to ``pulloperation``
1324 1324 initialization.
1325 1325 ``streamclonerequested`` is a boolean indicating whether a "streaming
1326 1326 clone" is requested. A "streaming clone" is essentially a raw file copy
1327 1327 of revlogs from the server. This only works when the local repository is
1328 1328 empty. The default value of ``None`` means to respect the server
1329 1329 configuration for preferring stream clones.
1330 1330
1331 1331 Returns the ``pulloperation`` created for this pull.
1332 1332 """
1333 1333 if opargs is None:
1334 1334 opargs = {}
1335 1335 pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks,
1336 1336 streamclonerequested=streamclonerequested,
1337 1337 **pycompat.strkwargs(opargs))
1338 1338
1339 1339 peerlocal = pullop.remote.local()
1340 1340 if peerlocal:
1341 1341 missing = set(peerlocal.requirements) - pullop.repo.supported
1342 1342 if missing:
1343 1343 msg = _("required features are not"
1344 1344 " supported in the destination:"
1345 1345 " %s") % (', '.join(sorted(missing)))
1346 1346 raise error.Abort(msg)
1347 1347
1348 1348 pullop.trmanager = transactionmanager(repo, 'pull', remote.url())
1349 1349 with repo.wlock(), repo.lock(), pullop.trmanager:
1350 1350 # This should ideally be in _pullbundle2(). However, it needs to run
1351 1351 # before discovery to avoid extra work.
1352 1352 _maybeapplyclonebundle(pullop)
1353 1353 streamclone.maybeperformlegacystreamclone(pullop)
1354 1354 _pulldiscovery(pullop)
1355 1355 if pullop.canusebundle2:
1356 1356 _pullbundle2(pullop)
1357 1357 _pullchangeset(pullop)
1358 1358 _pullphase(pullop)
1359 1359 _pullbookmarks(pullop)
1360 1360 _pullobsolete(pullop)
1361 1361
1362 1362 # storing remotenames
1363 1363 if repo.ui.configbool('experimental', 'remotenames'):
1364 1364 logexchange.pullremotenames(repo, remote)
1365 1365
1366 1366 return pullop
1367 1367
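# Illustration (not part of exchange.py; `repo` and `remote` are assumed
# to exist): a minimal caller of pull().
#
#   pullop = pull(repo, remote, heads=None, bookmarks=('stable',),
#                 streamclonerequested=True)
#   if pullop.cgresult == 0:
#       repo.ui.status('no changes were pulled\n')
#
# Per the docstring above, streamclonerequested=True only has an effect
# when the local repository is empty.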
1368 1368 # list of steps to perform discovery before pull
1369 1369 pulldiscoveryorder = []
1370 1370
1371 1371 # Mapping between step name and function
1372 1372 #
1373 1373 # This exists to help extensions wrap steps if necessary
1374 1374 pulldiscoverymapping = {}
1375 1375
1376 1376 def pulldiscovery(stepname):
1377 1377 """decorator for function performing discovery before pull
1378 1378
1379 1379 The function is added to the step -> function mapping and appended to the
1380 1380 list of steps. Beware that decorated function will be added in order (this
1381 1381 may matter).
1382 1382
1383 1383 You can only use this decorator for a new step, if you want to wrap a step
1384 1384 from an extension, change the pulldiscovery dictionary directly."""
1385 1385 def dec(func):
1386 1386 assert stepname not in pulldiscoverymapping
1387 1387 pulldiscoverymapping[stepname] = func
1388 1388 pulldiscoveryorder.append(stepname)
1389 1389 return func
1390 1390 return dec
1391 1391
1392 1392 def _pulldiscovery(pullop):
1393 1393 """Run all discovery steps"""
1394 1394 for stepname in pulldiscoveryorder:
1395 1395 step = pulldiscoverymapping[stepname]
1396 1396 step(pullop)
1397 1397
1398 1398 @pulldiscovery('b1:bookmarks')
1399 1399 def _pullbookmarkbundle1(pullop):
1400 1400 """fetch bookmark data in bundle1 case
1401 1401
1402 1402 If not using bundle2, we have to fetch bookmarks before changeset
1403 1403 discovery to reduce the chance and impact of race conditions."""
1404 1404 if pullop.remotebookmarks is not None:
1405 1405 return
1406 1406 if pullop.canusebundle2 and 'listkeys' in pullop.remotebundle2caps:
1407 1407 # all known bundle2 servers now support listkeys, but let's be nice
1408 1408 # with new implementations.
1409 1409 return
1410 1410 books = pullop.remote.listkeys('bookmarks')
1411 1411 pullop.remotebookmarks = bookmod.unhexlifybookmarks(books)
1412 1412
1413 1413
1414 1414 @pulldiscovery('changegroup')
1415 1415 def _pulldiscoverychangegroup(pullop):
1416 1416 """discovery phase for the pull
1417 1417
1418 1418 Currently handles changeset discovery only; it will change to handle all
1419 1419 discovery at some point."""
1420 1420 tmp = discovery.findcommonincoming(pullop.repo,
1421 1421 pullop.remote,
1422 1422 heads=pullop.heads,
1423 1423 force=pullop.force)
1424 1424 common, fetch, rheads = tmp
1425 1425 nm = pullop.repo.unfiltered().changelog.nodemap
1426 1426 if fetch and rheads:
1427 1427 # If a remote head is filtered locally, put it back in common.
1428 1428 #
1429 1429 # This is a hackish solution to catch most "common but locally
1430 1430 # hidden" situations. We do not perform discovery on the unfiltered
1431 1431 # repository because it ends up doing a pathological amount of
1432 1432 # round trips for a huge amount of changesets we do not care about.
1433 1433 #
1434 1434 # If a set of such "common but filtered" changesets exists on the server
1435 1435 # but does not include a remote head, we will not be able to detect it,
1436 1436 scommon = set(common)
1437 1437 for n in rheads:
1438 1438 if n in nm:
1439 1439 if n not in scommon:
1440 1440 common.append(n)
1441 1441 if set(rheads).issubset(set(common)):
1442 1442 fetch = []
1443 1443 pullop.common = common
1444 1444 pullop.fetch = fetch
1445 1445 pullop.rheads = rheads
1446 1446
1447 1447 def _pullbundle2(pullop):
1448 1448 """pull data using bundle2
1449 1449
1450 1450 For now, the only supported data is the changegroup."""
1451 1451 kwargs = {'bundlecaps': caps20to10(pullop.repo)}
1452 1452
1453 1453 # make ui easier to access
1454 1454 ui = pullop.repo.ui
1455 1455
1456 1456 # At the moment we don't do stream clones over bundle2. If that is
1457 1457 # implemented then here's where the check for that will go.
1458 1458 streaming = streamclone.canperformstreamclone(pullop, bundle2=True)[0]
1459 1459
1460 1460 # declare pull perimeters
1461 1461 kwargs['common'] = pullop.common
1462 1462 kwargs['heads'] = pullop.heads or pullop.rheads
1463 1463
1464 1464 if streaming:
1465 1465 kwargs['cg'] = False
1466 1466 kwargs['stream'] = True
1467 1467 pullop.stepsdone.add('changegroup')
1468 pullop.stepsdone.add('phases')
1468 1469
1469 1470 else:
1470 1471 # pulling changegroup
1471 1472 pullop.stepsdone.add('changegroup')
1472 1473
1473 1474 kwargs['cg'] = pullop.fetch
1474 1475
1475 1476 legacyphase = 'phases' in ui.configlist('devel', 'legacy.exchange')
1476 1477 hasbinaryphase = 'heads' in pullop.remotebundle2caps.get('phases', ())
1477 1478 if (not legacyphase and hasbinaryphase):
1478 1479 kwargs['phases'] = True
1479 1480 pullop.stepsdone.add('phases')
1480 1481
1481 1482 if 'listkeys' in pullop.remotebundle2caps:
1482 1483 if 'phases' not in pullop.stepsdone:
1483 1484 kwargs['listkeys'] = ['phases']
1484 1485
1485 1486 bookmarksrequested = False
1486 1487 legacybookmark = 'bookmarks' in ui.configlist('devel', 'legacy.exchange')
1487 1488 hasbinarybook = 'bookmarks' in pullop.remotebundle2caps
1488 1489
1489 1490 if pullop.remotebookmarks is not None:
1490 1491 pullop.stepsdone.add('request-bookmarks')
1491 1492
1492 1493 if ('request-bookmarks' not in pullop.stepsdone
1493 1494 and pullop.remotebookmarks is None
1494 1495 and not legacybookmark and hasbinarybook):
1495 1496 kwargs['bookmarks'] = True
1496 1497 bookmarksrequested = True
1497 1498
1498 1499 if 'listkeys' in pullop.remotebundle2caps:
1499 1500 if 'request-bookmarks' not in pullop.stepsdone:
1500 1501 # make sure to always include bookmark data when migrating
1501 1502 # `hg incoming --bundle` to using this function.
1502 1503 pullop.stepsdone.add('request-bookmarks')
1503 1504 kwargs.setdefault('listkeys', []).append('bookmarks')
1504 1505
1505 1506 # If this is a full pull / clone and the server supports the clone bundles
1506 1507 # feature, tell the server whether we attempted a clone bundle. The
1507 1508 # presence of this flag indicates the client supports clone bundles. This
1508 1509 # will enable the server to treat clients that support clone bundles
1509 1510 # differently from those that don't.
1510 1511 if (pullop.remote.capable('clonebundles')
1511 1512 and pullop.heads is None and list(pullop.common) == [nullid]):
1512 1513 kwargs['cbattempted'] = pullop.clonebundleattempted
1513 1514
1514 1515 if streaming:
1515 1516 pullop.repo.ui.status(_('streaming all changes\n'))
1516 1517 elif not pullop.fetch:
1517 1518 pullop.repo.ui.status(_("no changes found\n"))
1518 1519 pullop.cgresult = 0
1519 1520 else:
1520 1521 if pullop.heads is None and list(pullop.common) == [nullid]:
1521 1522 pullop.repo.ui.status(_("requesting all changes\n"))
1522 1523 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1523 1524 remoteversions = bundle2.obsmarkersversion(pullop.remotebundle2caps)
1524 1525 if obsolete.commonversion(remoteversions) is not None:
1525 1526 kwargs['obsmarkers'] = True
1526 1527 pullop.stepsdone.add('obsmarkers')
1527 1528 _pullbundle2extraprepare(pullop, kwargs)
1528 1529 bundle = pullop.remote.getbundle('pull', **pycompat.strkwargs(kwargs))
1529 1530 try:
1530 1531 op = bundle2.bundleoperation(pullop.repo, pullop.gettransaction)
1531 1532 op.modes['bookmarks'] = 'records'
1532 1533 bundle2.processbundle(pullop.repo, bundle, op=op)
1533 1534 except bundle2.AbortFromPart as exc:
1534 1535 pullop.repo.ui.status(_('remote: abort: %s\n') % exc)
1535 1536 raise error.Abort(_('pull failed on remote'), hint=exc.hint)
1536 1537 except error.BundleValueError as exc:
1537 1538 raise error.Abort(_('missing support for %s') % exc)
1538 1539
1539 1540 if pullop.fetch:
1540 1541 pullop.cgresult = bundle2.combinechangegroupresults(op)
1541 1542
1542 1543 # processing phases change
1543 1544 for namespace, value in op.records['listkeys']:
1544 1545 if namespace == 'phases':
1545 1546 _pullapplyphases(pullop, value)
1546 1547
1547 1548 # processing bookmark update
1548 1549 if bookmarksrequested:
1549 1550 books = {}
1550 1551 for record in op.records['bookmarks']:
1551 1552 books[record['bookmark']] = record["node"]
1552 1553 pullop.remotebookmarks = books
1553 1554 else:
1554 1555 for namespace, value in op.records['listkeys']:
1555 1556 if namespace == 'bookmarks':
1556 1557 pullop.remotebookmarks = bookmod.unhexlifybookmarks(value)
1557 1558
1558 1559 # bookmark data were either already there or pulled in the bundle
1559 1560 if pullop.remotebookmarks is not None:
1560 1561 _pullbookmarks(pullop)
1561 1562
1562 1563 def _pullbundle2extraprepare(pullop, kwargs):
1563 1564 """hook function so that extensions can extend the getbundle call"""
1564 1565
1565 1566 def _pullchangeset(pullop):
1566 1567 """pull changeset from unbundle into the local repo"""
1567 1568 # We delay opening the transaction as long as possible so we
1568 1569 # don't open a transaction for nothing and don't break future useful
1569 1570 # rollback calls
1570 1571 if 'changegroup' in pullop.stepsdone:
1571 1572 return
1572 1573 pullop.stepsdone.add('changegroup')
1573 1574 if not pullop.fetch:
1574 1575 pullop.repo.ui.status(_("no changes found\n"))
1575 1576 pullop.cgresult = 0
1576 1577 return
1577 1578 tr = pullop.gettransaction()
1578 1579 if pullop.heads is None and list(pullop.common) == [nullid]:
1579 1580 pullop.repo.ui.status(_("requesting all changes\n"))
1580 1581 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
1581 1582 # issue1320, avoid a race if remote changed after discovery
1582 1583 pullop.heads = pullop.rheads
1583 1584
1584 1585 if pullop.remote.capable('getbundle'):
1585 1586 # TODO: get bundlecaps from remote
1586 1587 cg = pullop.remote.getbundle('pull', common=pullop.common,
1587 1588 heads=pullop.heads or pullop.rheads)
1588 1589 elif pullop.heads is None:
1589 1590 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
1590 1591 elif not pullop.remote.capable('changegroupsubset'):
1591 1592 raise error.Abort(_("partial pull cannot be done because "
1592 1593 "other repository doesn't support "
1593 1594 "changegroupsubset."))
1594 1595 else:
1595 1596 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
1596 1597 bundleop = bundle2.applybundle(pullop.repo, cg, tr, 'pull',
1597 1598 pullop.remote.url())
1598 1599 pullop.cgresult = bundle2.combinechangegroupresults(bundleop)
1599 1600
1600 1601 def _pullphase(pullop):
1601 1602 # Get remote phases data from remote
1602 1603 if 'phases' in pullop.stepsdone:
1603 1604 return
1604 1605 remotephases = pullop.remote.listkeys('phases')
1605 1606 _pullapplyphases(pullop, remotephases)
1606 1607
1607 1608 def _pullapplyphases(pullop, remotephases):
1608 1609 """apply phase movement from observed remote state"""
1609 1610 if 'phases' in pullop.stepsdone:
1610 1611 return
1611 1612 pullop.stepsdone.add('phases')
1612 1613 publishing = bool(remotephases.get('publishing', False))
1613 1614 if remotephases and not publishing:
1614 1615 # remote is new and non-publishing
1615 1616 pheads, _dr = phases.analyzeremotephases(pullop.repo,
1616 1617 pullop.pulledsubset,
1617 1618 remotephases)
1618 1619 dheads = pullop.pulledsubset
1619 1620 else:
1620 1621 # Remote is old or publishing: all common changesets
1621 1622 # should be seen as public
1622 1623 pheads = pullop.pulledsubset
1623 1624 dheads = []
1624 1625 unfi = pullop.repo.unfiltered()
1625 1626 phase = unfi._phasecache.phase
1626 1627 rev = unfi.changelog.nodemap.get
1627 1628 public = phases.public
1628 1629 draft = phases.draft
1629 1630
1630 1631 # exclude changesets already public locally and update the others
1631 1632 pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
1632 1633 if pheads:
1633 1634 tr = pullop.gettransaction()
1634 1635 phases.advanceboundary(pullop.repo, tr, public, pheads)
1635 1636
1636 1637 # exclude changesets already draft locally and update the others
1637 1638 dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
1638 1639 if dheads:
1639 1640 tr = pullop.gettransaction()
1640 1641 phases.advanceboundary(pullop.repo, tr, draft, dheads)
1641 1642
1642 1643 def _pullbookmarks(pullop):
1643 1644 """process the remote bookmark information to update the local one"""
1644 1645 if 'bookmarks' in pullop.stepsdone:
1645 1646 return
1646 1647 pullop.stepsdone.add('bookmarks')
1647 1648 repo = pullop.repo
1648 1649 remotebookmarks = pullop.remotebookmarks
1649 1650 bookmod.updatefromremote(repo.ui, repo, remotebookmarks,
1650 1651 pullop.remote.url(),
1651 1652 pullop.gettransaction,
1652 1653 explicit=pullop.explicitbookmarks)
1653 1654
1654 1655 def _pullobsolete(pullop):
1655 1656 """utility function to pull obsolete markers from a remote
1656 1657
1657 1658 `gettransaction` is a function that returns the pull transaction, creating
1658 1659 one if necessary. We return the transaction to inform the calling code that
1659 1660 a new transaction has been created (when applicable).
1660 1661 
1661 1662 Exists mostly to allow overriding for experimentation purposes"""
1662 1663 if 'obsmarkers' in pullop.stepsdone:
1663 1664 return
1664 1665 pullop.stepsdone.add('obsmarkers')
1665 1666 tr = None
1666 1667 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1667 1668 pullop.repo.ui.debug('fetching remote obsolete markers\n')
1668 1669 remoteobs = pullop.remote.listkeys('obsolete')
1669 1670 if 'dump0' in remoteobs:
1670 1671 tr = pullop.gettransaction()
1671 1672 markers = []
1672 1673 for key in sorted(remoteobs, reverse=True):
1673 1674 if key.startswith('dump'):
1674 1675 data = util.b85decode(remoteobs[key])
1675 1676 version, newmarks = obsolete._readmarkers(data)
1676 1677 markers += newmarks
1677 1678 if markers:
1678 1679 pullop.repo.obsstore.add(tr, markers)
1679 1680 pullop.repo.invalidatevolatilesets()
1680 1681 return tr
1681 1682
1682 1683 def caps20to10(repo):
1683 1684 """return a set with appropriate options to use bundle20 during getbundle"""
1684 1685 caps = {'HG20'}
1685 1686 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
1686 1687 caps.add('bundle2=' + urlreq.quote(capsblob))
1687 1688 return caps
1688 1689
1689 1690 # List of names of steps to perform for a bundle2 for getbundle, order matters.
1690 1691 getbundle2partsorder = []
1691 1692
1692 1693 # Mapping between step name and function
1693 1694 #
1694 1695 # This exists to help extensions wrap steps if necessary
1695 1696 getbundle2partsmapping = {}
1696 1697
1697 1698 def getbundle2partsgenerator(stepname, idx=None):
1698 1699 """decorator for function generating bundle2 part for getbundle
1699 1700
1700 1701 The function is added to the step -> function mapping and appended to the
1701 1702 list of steps. Beware that decorated functions will be added in order
1702 1703 (this may matter).
1703 1704
1704 1705 You can only use this decorator for new steps; if you want to wrap a step
1705 1706 from an extension, change the getbundle2partsmapping dictionary directly."""
1706 1707 def dec(func):
1707 1708 assert stepname not in getbundle2partsmapping
1708 1709 getbundle2partsmapping[stepname] = func
1709 1710 if idx is None:
1710 1711 getbundle2partsorder.append(stepname)
1711 1712 else:
1712 1713 getbundle2partsorder.insert(idx, stepname)
1713 1714 return func
1714 1715 return dec
1715 1716
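Similarly, a sketch of registering a custom bundle2 part generator through
this decorator; the part name 'mypart' and its payload are hypothetical:

    from mercurial import exchange

    @exchange.getbundle2partsgenerator('mypart')
    def _getbundlemypart(bundler, repo, source, bundlecaps=None,
                         b2caps=None, **kwargs):
        # only emit the part when the client advertised support for it
        if 'mypart' not in b2caps:
            return
        bundler.newpart('mypart', data='hello', mandatory=False)
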
1716 1717 def bundle2requested(bundlecaps):
1717 1718 if bundlecaps is not None:
1718 1719 return any(cap.startswith('HG2') for cap in bundlecaps)
1719 1720 return False
1720 1721
1721 1722 def getbundlechunks(repo, source, heads=None, common=None, bundlecaps=None,
1722 1723 **kwargs):
1723 1724 """Return chunks constituting a bundle's raw data.
1724 1725
1725 1726 Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
1726 1727 passed.
1727 1728
1728 1729 Returns an iterator over raw chunks (of varying sizes).
1729 1730 """
1730 1731 kwargs = pycompat.byteskwargs(kwargs)
1731 1732 usebundle2 = bundle2requested(bundlecaps)
1732 1733 # bundle10 case
1733 1734 if not usebundle2:
1734 1735 if bundlecaps and not kwargs.get('cg', True):
1735 1736 raise ValueError(_('request for bundle10 must include changegroup'))
1736 1737
1737 1738 if kwargs:
1738 1739 raise ValueError(_('unsupported getbundle arguments: %s')
1739 1740 % ', '.join(sorted(kwargs.keys())))
1740 1741 outgoing = _computeoutgoing(repo, heads, common)
1741 1742 return changegroup.makestream(repo, outgoing, '01', source,
1742 1743 bundlecaps=bundlecaps)
1743 1744
1744 1745 # bundle20 case
1745 1746 b2caps = {}
1746 1747 for bcaps in bundlecaps:
1747 1748 if bcaps.startswith('bundle2='):
1748 1749 blob = urlreq.unquote(bcaps[len('bundle2='):])
1749 1750 b2caps.update(bundle2.decodecaps(blob))
1750 1751 bundler = bundle2.bundle20(repo.ui, b2caps)
1751 1752
1752 1753 kwargs['heads'] = heads
1753 1754 kwargs['common'] = common
1754 1755
1755 1756 for name in getbundle2partsorder:
1756 1757 func = getbundle2partsmapping[name]
1757 1758 func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
1758 1759 **pycompat.strkwargs(kwargs))
1759 1760
1760 1761 return bundler.getchunks()
1761 1762
1762 1763 @getbundle2partsgenerator('stream')
1763 1764 def _getbundlestream(bundler, repo, source, bundlecaps=None,
1764 1765 b2caps=None, heads=None, common=None, **kwargs):
1765 1766 if not kwargs.get('stream', False):
1766 1767 return
1767 1768 filecount, bytecount, it = streamclone.generatev2(repo)
1768 1769 requirements = ' '.join(repo.requirements)
1769 1770 part = bundler.newpart('stream', data=it)
1770 1771 part.addparam('bytecount', '%d' % bytecount, mandatory=True)
1771 1772 part.addparam('filecount', '%d' % filecount, mandatory=True)
1772 1773 part.addparam('requirements', requirements, mandatory=True)
1773 1774 part.addparam('version', 'v2', mandatory=True)
1774 1775
1775 1776 @getbundle2partsgenerator('changegroup')
1776 1777 def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
1777 1778 b2caps=None, heads=None, common=None, **kwargs):
1778 1779 """add a changegroup part to the requested bundle"""
1779 1780 cgstream = None
1780 1781 if kwargs.get(r'cg', True):
1781 1782 # build changegroup bundle here.
1782 1783 version = '01'
1783 1784 cgversions = b2caps.get('changegroup')
1784 1785 if cgversions: # 3.1 and 3.2 ship with an empty value
1785 1786 cgversions = [v for v in cgversions
1786 1787 if v in changegroup.supportedoutgoingversions(repo)]
1787 1788 if not cgversions:
1788 1789 raise ValueError(_('no common changegroup version'))
1789 1790 version = max(cgversions)
1790 1791 outgoing = _computeoutgoing(repo, heads, common)
1791 1792 if outgoing.missing:
1792 1793 cgstream = changegroup.makestream(repo, outgoing, version, source,
1793 1794 bundlecaps=bundlecaps)
1794 1795
1795 1796 if cgstream:
1796 1797 part = bundler.newpart('changegroup', data=cgstream)
1797 1798 if cgversions:
1798 1799 part.addparam('version', version)
1799 1800 part.addparam('nbchanges', '%d' % len(outgoing.missing),
1800 1801 mandatory=False)
1801 1802 if 'treemanifest' in repo.requirements:
1802 1803 part.addparam('treemanifest', '1')
1803 1804
1804 1805 @getbundle2partsgenerator('bookmarks')
1805 1806 def _getbundlebookmarkpart(bundler, repo, source, bundlecaps=None,
1806 1807 b2caps=None, **kwargs):
1807 1808 """add a bookmark part to the requested bundle"""
1808 1809 if not kwargs.get(r'bookmarks', False):
1809 1810 return
1810 1811 if 'bookmarks' not in b2caps:
1811 1812 raise ValueError(_('no common bookmarks exchange method'))
1812 1813 books = bookmod.listbinbookmarks(repo)
1813 1814 data = bookmod.binaryencode(books)
1814 1815 if data:
1815 1816 bundler.newpart('bookmarks', data=data)
1816 1817
1817 1818 @getbundle2partsgenerator('listkeys')
1818 1819 def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
1819 1820 b2caps=None, **kwargs):
1820 1821 """add parts containing listkeys namespaces to the requested bundle"""
1821 1822 listkeys = kwargs.get(r'listkeys', ())
1822 1823 for namespace in listkeys:
1823 1824 part = bundler.newpart('listkeys')
1824 1825 part.addparam('namespace', namespace)
1825 1826 keys = repo.listkeys(namespace).items()
1826 1827 part.data = pushkey.encodekeys(keys)
1827 1828
1828 1829 @getbundle2partsgenerator('obsmarkers')
1829 1830 def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
1830 1831 b2caps=None, heads=None, **kwargs):
1831 1832 """add an obsolescence markers part to the requested bundle"""
1832 1833 if kwargs.get(r'obsmarkers', False):
1833 1834 if heads is None:
1834 1835 heads = repo.heads()
1835 1836 subset = [c.node() for c in repo.set('::%ln', heads)]
1836 1837 markers = repo.obsstore.relevantmarkers(subset)
1837 1838 markers = sorted(markers)
1838 1839 bundle2.buildobsmarkerspart(bundler, markers)
1839 1840
1840 1841 @getbundle2partsgenerator('phases')
1841 1842 def _getbundlephasespart(bundler, repo, source, bundlecaps=None,
1842 1843 b2caps=None, heads=None, **kwargs):
1843 1844 """add phase heads part to the requested bundle"""
1844 1845 if kwargs.get(r'phases', False):
1845 1846 if 'heads' not in b2caps.get('phases'):
1846 1847 raise ValueError(_('no common phases exchange method'))
1847 1848 if heads is None:
1848 1849 heads = repo.heads()
1849 1850
1850 1851 headsbyphase = collections.defaultdict(set)
1851 1852 if repo.publishing():
1852 1853 headsbyphase[phases.public] = heads
1853 1854 else:
1854 1855 # find the appropriate heads to move
1855 1856
1856 1857 phase = repo._phasecache.phase
1857 1858 node = repo.changelog.node
1858 1859 rev = repo.changelog.rev
1859 1860 for h in heads:
1860 1861 headsbyphase[phase(repo, rev(h))].add(h)
1861 1862 seenphases = list(headsbyphase.keys())
1862 1863
1863 1864 # We do not handle anything but public and draft phases for now
1864 1865 if seenphases:
1865 1866 assert max(seenphases) <= phases.draft
1866 1867
1867 1868 # if client is pulling non-public changesets, we need to find
1868 1869 # intermediate public heads.
1869 1870 draftheads = headsbyphase.get(phases.draft, set())
1870 1871 if draftheads:
1871 1872 publicheads = headsbyphase.get(phases.public, set())
1872 1873
1873 1874 revset = 'heads(only(%ln, %ln) and public())'
1874 1875 extraheads = repo.revs(revset, draftheads, publicheads)
1875 1876 for r in extraheads:
1876 1877 headsbyphase[phases.public].add(node(r))
1877 1878
1878 1879 # transform data in a format used by the encoding function
1879 1880 phasemapping = []
1880 1881 for phase in phases.allphases:
1881 1882 phasemapping.append(sorted(headsbyphase[phase]))
1882 1883
1883 1884 # generate the actual part
1884 1885 phasedata = phases.binaryencode(phasemapping)
1885 1886 bundler.newpart('phase-heads', data=phasedata)
1886 1887
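A standalone sketch of the head bucketing performed above; phase numbers
follow Mercurial's convention (0 public, 1 draft, 2 secret) and the node
values are made up:

    import collections

    headsbyphase = collections.defaultdict(set)
    for node, phase in [('n1', 0), ('n2', 1), ('n3', 1)]:
        headsbyphase[phase].add(node)
    phasemapping = [sorted(headsbyphase[p]) for p in (0, 1, 2)]
    print(phasemapping)  # [['n1'], ['n2', 'n3'], []]
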
1887 1888 @getbundle2partsgenerator('hgtagsfnodes')
1888 1889 def _getbundletagsfnodes(bundler, repo, source, bundlecaps=None,
1889 1890 b2caps=None, heads=None, common=None,
1890 1891 **kwargs):
1891 1892 """Transfer the .hgtags filenodes mapping.
1892 1893
1893 1894 Only values for heads in this bundle will be transferred.
1894 1895
1895 1896 The part data consists of pairs of 20 byte changeset node and .hgtags
1896 1897 filenodes raw values.
1897 1898 """
1898 1899 # Don't send unless:
1899 1900 # - changesets are being exchanged,
1900 1901 # - the client supports it.
1901 1902 if not (kwargs.get(r'cg', True) and 'hgtagsfnodes' in b2caps):
1902 1903 return
1903 1904
1904 1905 outgoing = _computeoutgoing(repo, heads, common)
1905 1906 bundle2.addparttagsfnodescache(repo, bundler, outgoing)
1906 1907
1907 1908 def check_heads(repo, their_heads, context):
1908 1909 """check if the heads of a repo have been modified
1909 1910
1910 1911 Used by peer for unbundling.
1911 1912 """
1912 1913 heads = repo.heads()
1913 1914 heads_hash = hashlib.sha1(''.join(sorted(heads))).digest()
1914 1915 if not (their_heads == ['force'] or their_heads == heads or
1915 1916 their_heads == ['hashed', heads_hash]):
1916 1917 # someone else committed/pushed/unbundled while we
1917 1918 # were transferring data
1918 1919 raise error.PushRaced('repository changed while %s - '
1919 1920 'please try again' % context)
1920 1921
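A client-side sketch of the 'hashed' form accepted above: SHA-1 of the
concatenation of the sorted binary head nodes (the node values below are
made up for illustration):

    import hashlib

    heads = ['\x11' * 20, '\x22' * 20]  # 20-byte binary nodes
    heads_hash = hashlib.sha1(''.join(sorted(heads))).digest()
    their_heads = ['hashed', heads_hash]  # accepted by check_heads()
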
1921 1922 def unbundle(repo, cg, heads, source, url):
1922 1923 """Apply a bundle to a repo.
1923 1924
1924 1925 This function makes sure the repo is locked during the application and has
1925 1926 a mechanism to check that no push race occurred between the creation of the
1926 1927 bundle and its application.
1927 1928 
1928 1929 If the push was raced, a PushRaced exception is raised."""
1929 1930 r = 0
1930 1931 # need a transaction when processing a bundle2 stream
1931 1932 # [wlock, lock, tr] - needs to be an array so nested functions can modify it
1932 1933 lockandtr = [None, None, None]
1933 1934 recordout = None
1934 1935 # quick fix for output mismatch with bundle2 in 3.4
1935 1936 captureoutput = repo.ui.configbool('experimental', 'bundle2-output-capture')
1936 1937 if url.startswith('remote:http:') or url.startswith('remote:https:'):
1937 1938 captureoutput = True
1938 1939 try:
1939 1940 # note: outside bundle1, 'heads' is expected to be empty and this
1940 1941 # 'check_heads' call will be a no-op
1941 1942 check_heads(repo, heads, 'uploading changes')
1942 1943 # push can proceed
1943 1944 if not isinstance(cg, bundle2.unbundle20):
1944 1945 # legacy case: bundle1 (changegroup 01)
1945 1946 txnname = "\n".join([source, util.hidepassword(url)])
1946 1947 with repo.lock(), repo.transaction(txnname) as tr:
1947 1948 op = bundle2.applybundle(repo, cg, tr, source, url)
1948 1949 r = bundle2.combinechangegroupresults(op)
1949 1950 else:
1950 1951 r = None
1951 1952 try:
1952 1953 def gettransaction():
1953 1954 if not lockandtr[2]:
1954 1955 lockandtr[0] = repo.wlock()
1955 1956 lockandtr[1] = repo.lock()
1956 1957 lockandtr[2] = repo.transaction(source)
1957 1958 lockandtr[2].hookargs['source'] = source
1958 1959 lockandtr[2].hookargs['url'] = url
1959 1960 lockandtr[2].hookargs['bundle2'] = '1'
1960 1961 return lockandtr[2]
1961 1962
1962 1963 # Do greedy locking by default until we're satisfied with lazy
1963 1964 # locking.
1964 1965 if not repo.ui.configbool('experimental', 'bundle2lazylocking'):
1965 1966 gettransaction()
1966 1967
1967 1968 op = bundle2.bundleoperation(repo, gettransaction,
1968 1969 captureoutput=captureoutput)
1969 1970 try:
1970 1971 op = bundle2.processbundle(repo, cg, op=op)
1971 1972 finally:
1972 1973 r = op.reply
1973 1974 if captureoutput and r is not None:
1974 1975 repo.ui.pushbuffer(error=True, subproc=True)
1975 1976 def recordout(output):
1976 1977 r.newpart('output', data=output, mandatory=False)
1977 1978 if lockandtr[2] is not None:
1978 1979 lockandtr[2].close()
1979 1980 except BaseException as exc:
1980 1981 exc.duringunbundle2 = True
1981 1982 if captureoutput and r is not None:
1982 1983 parts = exc._bundle2salvagedoutput = r.salvageoutput()
1983 1984 def recordout(output):
1984 1985 part = bundle2.bundlepart('output', data=output,
1985 1986 mandatory=False)
1986 1987 parts.append(part)
1987 1988 raise
1988 1989 finally:
1989 1990 lockmod.release(lockandtr[2], lockandtr[1], lockandtr[0])
1990 1991 if recordout is not None:
1991 1992 recordout(repo.ui.popbuffer())
1992 1993 return r
1993 1994
1994 1995 def _maybeapplyclonebundle(pullop):
1995 1996 """Apply a clone bundle from a remote, if possible."""
1996 1997
1997 1998 repo = pullop.repo
1998 1999 remote = pullop.remote
1999 2000
2000 2001 if not repo.ui.configbool('ui', 'clonebundles'):
2001 2002 return
2002 2003
2003 2004 # Only run if local repo is empty.
2004 2005 if len(repo):
2005 2006 return
2006 2007
2007 2008 if pullop.heads:
2008 2009 return
2009 2010
2010 2011 if not remote.capable('clonebundles'):
2011 2012 return
2012 2013
2013 2014 res = remote._call('clonebundles')
2014 2015
2015 2016 # If we call the wire protocol command, that's good enough to record the
2016 2017 # attempt.
2017 2018 pullop.clonebundleattempted = True
2018 2019
2019 2020 entries = parseclonebundlesmanifest(repo, res)
2020 2021 if not entries:
2021 2022 repo.ui.note(_('no clone bundles available on remote; '
2022 2023 'falling back to regular clone\n'))
2023 2024 return
2024 2025
2025 2026 entries = filterclonebundleentries(
2026 2027 repo, entries, streamclonerequested=pullop.streamclonerequested)
2027 2028
2028 2029 if not entries:
2029 2030 # There is a thundering herd concern here. However, if a server
2030 2031 # operator doesn't advertise bundles appropriate for its clients,
2031 2032 # they deserve what's coming. Furthermore, from a client's
2032 2033 # perspective, no automatic fallback would mean not being able to
2033 2034 # clone!
2034 2035 repo.ui.warn(_('no compatible clone bundles available on server; '
2035 2036 'falling back to regular clone\n'))
2036 2037 repo.ui.warn(_('(you may want to report this to the server '
2037 2038 'operator)\n'))
2038 2039 return
2039 2040
2040 2041 entries = sortclonebundleentries(repo.ui, entries)
2041 2042
2042 2043 url = entries[0]['URL']
2043 2044 repo.ui.status(_('applying clone bundle from %s\n') % url)
2044 2045 if trypullbundlefromurl(repo.ui, repo, url):
2045 2046 repo.ui.status(_('finished applying clone bundle\n'))
2046 2047 # Bundle failed.
2047 2048 #
2048 2049 # We abort by default to avoid the thundering herd of
2049 2050 # clients flooding a server that was expecting expensive
2050 2051 # clone load to be offloaded.
2051 2052 elif repo.ui.configbool('ui', 'clonebundlefallback'):
2052 2053 repo.ui.warn(_('falling back to normal clone\n'))
2053 2054 else:
2054 2055 raise error.Abort(_('error applying bundle'),
2055 2056 hint=_('if this error persists, consider contacting '
2056 2057 'the server operator or disable clone '
2057 2058 'bundles via '
2058 2059 '"--config ui.clonebundles=false"'))
2059 2060
2060 2061 def parseclonebundlesmanifest(repo, s):
2061 2062 """Parses the raw text of a clone bundles manifest.
2062 2063
2063 2064 Returns a list of dicts. The dicts have a ``URL`` key corresponding
2064 2065 to the URL and other keys are the attributes for the entry.
2065 2066 """
2066 2067 m = []
2067 2068 for line in s.splitlines():
2068 2069 fields = line.split()
2069 2070 if not fields:
2070 2071 continue
2071 2072 attrs = {'URL': fields[0]}
2072 2073 for rawattr in fields[1:]:
2073 2074 key, value = rawattr.split('=', 1)
2074 2075 key = urlreq.unquote(key)
2075 2076 value = urlreq.unquote(value)
2076 2077 attrs[key] = value
2077 2078
2078 2079 # Parse BUNDLESPEC into components. This makes client-side
2079 2080 # preferences easier to specify since you can prefer a single
2080 2081 # component of the BUNDLESPEC.
2081 2082 if key == 'BUNDLESPEC':
2082 2083 try:
2083 2084 comp, version, params = parsebundlespec(repo, value,
2084 2085 externalnames=True)
2085 2086 attrs['COMPRESSION'] = comp
2086 2087 attrs['VERSION'] = version
2087 2088 except error.InvalidBundleSpecification:
2088 2089 pass
2089 2090 except error.UnsupportedBundleSpecification:
2090 2091 pass
2091 2092
2092 2093 m.append(attrs)
2093 2094
2094 2095 return m
2095 2096
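For illustration, a manifest in the format parsed above might look like this
(URLs and attributes are hypothetical; the real parser also URL-unquotes
keys and values):

    manifest = (
        'https://example.com/full.hg BUNDLESPEC=gzip-v2\n'
        'https://example.com/stream.hg'
        ' BUNDLESPEC=none-packed1;requirements%3Drevlogv1\n'
    )
    for line in manifest.splitlines():
        fields = line.split()
        attrs = dict(f.split('=', 1) for f in fields[1:])
        print(fields[0], attrs)
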
2096 2097 def filterclonebundleentries(repo, entries, streamclonerequested=False):
2097 2098 """Remove incompatible clone bundle manifest entries.
2098 2099
2099 2100 Accepts a list of entries parsed with ``parseclonebundlesmanifest``
2100 2101 and returns a new list consisting of only the entries that this client
2101 2102 should be able to apply.
2102 2103
2103 2104 There is no guarantee we'll be able to apply all returned entries because
2104 2105 the metadata we use to filter on may be missing or wrong.
2105 2106 """
2106 2107 newentries = []
2107 2108 for entry in entries:
2108 2109 spec = entry.get('BUNDLESPEC')
2109 2110 if spec:
2110 2111 try:
2111 2112 comp, version, params = parsebundlespec(repo, spec, strict=True)
2112 2113
2113 2114 # If a stream clone was requested, filter out non-streamclone
2114 2115 # entries.
2115 2116 if streamclonerequested and (comp != 'UN' or version != 's1'):
2116 2117 repo.ui.debug('filtering %s because not a stream clone\n' %
2117 2118 entry['URL'])
2118 2119 continue
2119 2120
2120 2121 except error.InvalidBundleSpecification as e:
2121 2122 repo.ui.debug(str(e) + '\n')
2122 2123 continue
2123 2124 except error.UnsupportedBundleSpecification as e:
2124 2125 repo.ui.debug('filtering %s because unsupported bundle '
2125 2126 'spec: %s\n' % (entry['URL'], str(e)))
2126 2127 continue
2127 2128 # If we don't have a spec and requested a stream clone, we don't know
2128 2129 # what the entry is so don't attempt to apply it.
2129 2130 elif streamclonerequested:
2130 2131 repo.ui.debug('filtering %s because cannot determine if a stream '
2131 2132 'clone bundle\n' % entry['URL'])
2132 2133 continue
2133 2134
2134 2135 if 'REQUIRESNI' in entry and not sslutil.hassni:
2135 2136 repo.ui.debug('filtering %s because SNI not supported\n' %
2136 2137 entry['URL'])
2137 2138 continue
2138 2139
2139 2140 newentries.append(entry)
2140 2141
2141 2142 return newentries
2142 2143
2143 2144 class clonebundleentry(object):
2144 2145 """Represents an item in a clone bundles manifest.
2145 2146
2146 2147 This rich class is needed to support sorting since sorted() in Python 3
2147 2148 doesn't support ``cmp`` and our comparison is complex enough that ``key=``
2148 2149 won't work.
2149 2150 """
2150 2151
2151 2152 def __init__(self, value, prefers):
2152 2153 self.value = value
2153 2154 self.prefers = prefers
2154 2155
2155 2156 def _cmp(self, other):
2156 2157 for prefkey, prefvalue in self.prefers:
2157 2158 avalue = self.value.get(prefkey)
2158 2159 bvalue = other.value.get(prefkey)
2159 2160
2160 2161 # Special case: b is missing the attribute and a matches exactly.
2161 2162 if avalue is not None and bvalue is None and avalue == prefvalue:
2162 2163 return -1
2163 2164
2164 2165 # Special case: a is missing the attribute and b matches exactly.
2165 2166 if bvalue is not None and avalue is None and bvalue == prefvalue:
2166 2167 return 1
2167 2168
2168 2169 # We can't compare unless the attribute is present on both.
2169 2170 if avalue is None or bvalue is None:
2170 2171 continue
2171 2172
2172 2173 # Same values should fall back to next attribute.
2173 2174 if avalue == bvalue:
2174 2175 continue
2175 2176
2176 2177 # Exact matches come first.
2177 2178 if avalue == prefvalue:
2178 2179 return -1
2179 2180 if bvalue == prefvalue:
2180 2181 return 1
2181 2182
2182 2183 # Fall back to next attribute.
2183 2184 continue
2184 2185
2185 2186 # If we got here we couldn't sort by attributes and prefers. Fall
2186 2187 # back to index order.
2187 2188 return 0
2188 2189
2189 2190 def __lt__(self, other):
2190 2191 return self._cmp(other) < 0
2191 2192
2192 2193 def __gt__(self, other):
2193 2194 return self._cmp(other) > 0
2194 2195
2195 2196 def __eq__(self, other):
2196 2197 return self._cmp(other) == 0
2197 2198
2198 2199 def __le__(self, other):
2199 2200 return self._cmp(other) <= 0
2200 2201
2201 2202 def __ge__(self, other):
2202 2203 return self._cmp(other) >= 0
2203 2204
2204 2205 def __ne__(self, other):
2205 2206 return self._cmp(other) != 0
2206 2207
2207 2208 def sortclonebundleentries(ui, entries):
2208 2209 prefers = ui.configlist('ui', 'clonebundleprefers')
2209 2210 if not prefers:
2210 2211 return list(entries)
2211 2212
2212 2213 prefers = [p.split('=', 1) for p in prefers]
2213 2214
2214 2215 items = sorted(clonebundleentry(v, prefers) for v in entries)
2215 2216 return [i.value for i in items]
2216 2217
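A self-contained sketch of the preference sort, reusing the
clonebundleentry comparator defined above with made-up entries:

    entries = [
        {'URL': 'https://a.example', 'COMPRESSION': 'gzip'},
        {'URL': 'https://b.example', 'COMPRESSION': 'none'},
    ]
    prefers = [('COMPRESSION', 'none')]  # as from ui.clonebundleprefers
    items = sorted(clonebundleentry(v, prefers) for v in entries)
    print([i.value['URL'] for i in items])  # b.example sorts first
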
2217 2218 def trypullbundlefromurl(ui, repo, url):
2218 2219 """Attempt to apply a bundle from a URL."""
2219 2220 with repo.lock(), repo.transaction('bundleurl') as tr:
2220 2221 try:
2221 2222 fh = urlmod.open(ui, url)
2222 2223 cg = readbundle(ui, fh, 'stream')
2223 2224
2224 2225 if isinstance(cg, streamclone.streamcloneapplier):
2225 2226 cg.apply(repo)
2226 2227 else:
2227 2228 bundle2.applybundle(repo, cg, tr, 'clonebundles', url)
2228 2229 return True
2229 2230 except urlerr.httperror as e:
2230 2231 ui.warn(_('HTTP error fetching bundle: %s\n') % str(e))
2231 2232 except urlerr.urlerror as e:
2232 2233 ui.warn(_('error fetching bundle: %s\n') % e.reason)
2233 2234
2234 2235 return False
@@ -1,541 +1,595 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 import contextlib
11 import os
10 12 import struct
13 import tempfile
11 14
12 15 from .i18n import _
13 16 from . import (
14 17 branchmap,
15 18 error,
16 19 phases,
17 20 store,
18 21 util,
19 22 )
20 23
21 24 def canperformstreamclone(pullop, bundle2=False):
22 25 """Whether it is possible to perform a streaming clone as part of pull.
23 26
24 27 ``bundle2`` will cause the function to consider stream clone through
25 28 bundle2 and only through bundle2.
26 29
27 30 Returns a tuple of (supported, requirements). ``supported`` is True if
28 31 streaming clone is supported and False otherwise. ``requirements`` is
29 32 a set of repo requirements from the remote, or ``None`` if stream clone
30 33 isn't supported.
31 34 """
32 35 repo = pullop.repo
33 36 remote = pullop.remote
34 37
35 38 bundle2supported = False
36 39 if pullop.canusebundle2:
37 40 if 'v2' in pullop.remotebundle2caps.get('stream', []):
38 41 bundle2supported = True
39 42 # else
40 43 # Server doesn't support bundle2 stream clone or doesn't support
41 44 # the versions we support. Fall back and possibly allow legacy.
42 45
43 46 # Ensures legacy code path uses available bundle2.
44 47 if bundle2supported and not bundle2:
45 48 return False, None
46 49 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
47 50 elif bundle2 and not bundle2supported:
48 51 return False, None
49 52
50 53 # Streaming clone only works on empty repositories.
51 54 if len(repo):
52 55 return False, None
53 56
54 57 # Streaming clone only works if all data is being requested.
55 58 if pullop.heads:
56 59 return False, None
57 60
58 61 streamrequested = pullop.streamclonerequested
59 62
60 63 # If we don't have a preference, let the server decide for us. This
61 64 # likely only comes into play in LANs.
62 65 if streamrequested is None:
63 66 # The server can advertise whether to prefer streaming clone.
64 67 streamrequested = remote.capable('stream-preferred')
65 68
66 69 if not streamrequested:
67 70 return False, None
68 71
69 72 # In order for stream clone to work, the client has to support all the
70 73 # requirements advertised by the server.
71 74 #
72 75 # The server advertises its requirements via the "stream" and "streamreqs"
73 76 # capability. "stream" (a value-less capability) is advertised if and only
74 77 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
75 78 # is advertised and contains a comma-delimited list of requirements.
76 79 requirements = set()
77 80 if remote.capable('stream'):
78 81 requirements.add('revlogv1')
79 82 else:
80 83 streamreqs = remote.capable('streamreqs')
81 84 # This is weird and shouldn't happen with modern servers.
82 85 if not streamreqs:
83 86 pullop.repo.ui.warn(_(
84 87 'warning: stream clone requested but server has them '
85 88 'disabled\n'))
86 89 return False, None
87 90
88 91 streamreqs = set(streamreqs.split(','))
89 92 # Server requires something we don't support. Bail.
90 93 missingreqs = streamreqs - repo.supportedformats
91 94 if missingreqs:
92 95 pullop.repo.ui.warn(_(
93 96 'warning: stream clone requested but client is missing '
94 97 'requirements: %s\n') % ', '.join(sorted(missingreqs)))
95 98 pullop.repo.ui.warn(
96 99 _('(see https://www.mercurial-scm.org/wiki/MissingRequirement '
97 100 'for more information)\n'))
98 101 return False, None
99 102 requirements = streamreqs
100 103
101 104 return True, requirements
102 105
103 106 def maybeperformlegacystreamclone(pullop):
104 107 """Possibly perform a legacy stream clone operation.
105 108
106 109 Legacy stream clones are performed as part of pull but before all other
107 110 operations.
108 111
109 112 A legacy stream clone will not be performed if a bundle2 stream clone is
110 113 supported.
111 114 """
112 115 supported, requirements = canperformstreamclone(pullop)
113 116
114 117 if not supported:
115 118 return
116 119
117 120 repo = pullop.repo
118 121 remote = pullop.remote
119 122
120 123 # Save remote branchmap. We will use it later to speed up branchcache
121 124 # creation.
122 125 rbranchmap = None
123 126 if remote.capable('branchmap'):
124 127 rbranchmap = remote.branchmap()
125 128
126 129 repo.ui.status(_('streaming all changes\n'))
127 130
128 131 fp = remote.stream_out()
129 132 l = fp.readline()
130 133 try:
131 134 resp = int(l)
132 135 except ValueError:
133 136 raise error.ResponseError(
134 137 _('unexpected response from remote server:'), l)
135 138 if resp == 1:
136 139 raise error.Abort(_('operation forbidden by server'))
137 140 elif resp == 2:
138 141 raise error.Abort(_('locking the remote repository failed'))
139 142 elif resp != 0:
140 143 raise error.Abort(_('the server sent an unknown error code'))
141 144
142 145 l = fp.readline()
143 146 try:
144 147 filecount, bytecount = map(int, l.split(' ', 1))
145 148 except (ValueError, TypeError):
146 149 raise error.ResponseError(
147 150 _('unexpected response from remote server:'), l)
148 151
149 152 with repo.lock():
150 153 consumev1(repo, fp, filecount, bytecount)
151 154
152 155 # new requirements = old non-format requirements +
153 156 # new format-related remote requirements
154 157 # requirements from the streamed-in repository
155 158 repo.requirements = requirements | (
156 159 repo.requirements - repo.supportedformats)
157 160 repo._applyopenerreqs()
158 161 repo._writerequirements()
159 162
160 163 if rbranchmap:
161 164 branchmap.replacecache(repo, rbranchmap)
162 165
163 166 repo.invalidate()
164 167
165 168 def allowservergeneration(repo):
166 169 """Whether streaming clones are allowed from the server."""
167 170 if not repo.ui.configbool('server', 'uncompressed', untrusted=True):
168 171 return False
169 172
170 173 # The way stream clone works makes it impossible to hide secret changesets.
171 174 # So don't allow this by default.
172 175 secret = phases.hassecret(repo)
173 176 if secret:
174 177 return repo.ui.configbool('server', 'uncompressedallowsecret')
175 178
176 179 return True
177 180
178 181 # This is its own function so extensions can override it.
179 182 def _walkstreamfiles(repo):
180 183 return repo.store.walk()
181 184
182 185 def generatev1(repo):
183 186 """Emit content for version 1 of a streaming clone.
184 187
185 188 This returns a 3-tuple of (file count, byte size, data iterator).
186 189
187 190 The data iterator consists of N entries for each file being transferred.
188 191 Each file entry starts as a line with the file name and integer size
189 192 delimited by a null byte.
190 193
191 194 The raw file data follows. Following the raw file data is the next file
192 195 entry, or EOF.
193 196
194 197 When used on the wire protocol, an additional line indicating protocol
195 198 success will be prepended to the stream. This function is not responsible
196 199 for adding it.
197 200
198 201 This function will obtain a repository lock to ensure a consistent view of
199 202 the store is captured. It therefore may raise LockError.
200 203 """
201 204 entries = []
202 205 total_bytes = 0
203 206 # Get consistent snapshot of repo, lock during scan.
204 207 with repo.lock():
205 208 repo.ui.debug('scanning\n')
206 209 for name, ename, size in _walkstreamfiles(repo):
207 210 if size:
208 211 entries.append((name, size))
209 212 total_bytes += size
210 213
211 214 repo.ui.debug('%d files, %d bytes to transfer\n' %
212 215 (len(entries), total_bytes))
213 216
214 217 svfs = repo.svfs
215 218 debugflag = repo.ui.debugflag
216 219
217 220 def emitrevlogdata():
218 221 for name, size in entries:
219 222 if debugflag:
220 223 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
221 224 # partially encode name over the wire for backwards compat
222 225 yield '%s\0%d\n' % (store.encodedir(name), size)
223 226 # auditing at this stage is both pointless (paths are already
224 227 # trusted by the local repo) and expensive
225 228 with svfs(name, 'rb', auditpath=False) as fp:
226 229 if size <= 65536:
227 230 yield fp.read(size)
228 231 else:
229 232 for chunk in util.filechunkiter(fp, limit=size):
230 233 yield chunk
231 234
232 235 return len(entries), total_bytes, emitrevlogdata()
233 236
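A minimal reader for the framing described above (a sketch only; the real
consumer is consumev1() below). Each entry is a '<name>\0<size>\n' line
followed by exactly <size> bytes of file data:

    import io

    def readv1entries(fp, filecount):
        for _ in range(filecount):
            header = fp.readline()  # '<name>\0<size>\n'
            name, size = header.rstrip(b'\n').split(b'\0', 1)
            yield name, fp.read(int(size))

    fp = io.BytesIO(b'data/foo.i\x005\nhello')
    print(list(readv1entries(fp, 1)))  # [('data/foo.i', 'hello')]
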
234 237 def generatev1wireproto(repo):
235 238 """Emit content for version 1 of streaming clone suitable for the wire.
236 239
237 240 This is the data output from ``generatev1()`` with 2 header lines. The
238 241 first line indicates overall success. The 2nd contains the file count and
239 242 byte size of payload.
240 243
241 244 The success line contains "0" for success, "1" for stream generation not
242 245 allowed, and "2" for error locking the repository (possibly indicating
243 246 a permissions error for the server process).
244 247 """
245 248 if not allowservergeneration(repo):
246 249 yield '1\n'
247 250 return
248 251
249 252 try:
250 253 filecount, bytecount, it = generatev1(repo)
251 254 except error.LockError:
252 255 yield '2\n'
253 256 return
254 257
255 258 # Indicates successful response.
256 259 yield '0\n'
257 260 yield '%d %d\n' % (filecount, bytecount)
258 261 for chunk in it:
259 262 yield chunk
260 263
261 264 def generatebundlev1(repo, compression='UN'):
262 265 """Emit content for version 1 of a stream clone bundle.
263 266
264 267 The first 4 bytes of the output ("HGS1") denote this as stream clone
265 268 bundle version 1.
266 269
267 270 The next 2 bytes indicate the compression type. Only "UN" is currently
268 271 supported.
269 272
270 273 The next 16 bytes are two 64-bit big endian unsigned integers indicating
271 274 file count and byte count, respectively.
272 275
273 276 The next 2 bytes are a 16-bit big endian unsigned short declaring the length
274 277 of the requirements string, including a trailing \0. The following N bytes
275 278 are the requirements string, which is ASCII containing a comma-delimited
276 279 list of repo requirements that are needed to support the data.
277 280
278 281 The remaining content is the output of ``generatev1()`` (which may be
279 282 compressed in the future).
280 283
281 284 Returns a tuple of (requirements, data generator).
282 285 """
283 286 if compression != 'UN':
284 287 raise ValueError('we do not support the compression argument yet')
285 288
286 289 requirements = repo.requirements & repo.supportedformats
287 290 requires = ','.join(sorted(requirements))
288 291
289 292 def gen():
290 293 yield 'HGS1'
291 294 yield compression
292 295
293 296 filecount, bytecount, it = generatev1(repo)
294 297 repo.ui.status(_('writing %d bytes for %d files\n') %
295 298 (bytecount, filecount))
296 299
297 300 yield struct.pack('>QQ', filecount, bytecount)
298 301 yield struct.pack('>H', len(requires) + 1)
299 302 yield requires + '\0'
300 303
301 304 # This is where we'll add compression in the future.
302 305 assert compression == 'UN'
303 306
304 307 seen = 0
305 308 repo.ui.progress(_('bundle'), 0, total=bytecount, unit=_('bytes'))
306 309
307 310 for chunk in it:
308 311 seen += len(chunk)
309 312 repo.ui.progress(_('bundle'), seen, total=bytecount,
310 313 unit=_('bytes'))
311 314 yield chunk
312 315
313 316 repo.ui.progress(_('bundle'), None)
314 317
315 318 return requirements, gen()
316 319
317 320 def consumev1(repo, fp, filecount, bytecount):
318 321 """Apply the contents from version 1 of a streaming clone file handle.
319 322
320 323 This takes the output from "stream_out" and applies it to the specified
321 324 repository.
322 325
323 326 Like "stream_out," the status line added by the wire protocol is not
324 327 handled by this function.
325 328 """
326 329 with repo.lock():
327 330 repo.ui.status(_('%d files to transfer, %s of data\n') %
328 331 (filecount, util.bytecount(bytecount)))
329 332 handled_bytes = 0
330 333 repo.ui.progress(_('clone'), 0, total=bytecount, unit=_('bytes'))
331 334 start = util.timer()
332 335
333 336 # TODO: get rid of (potential) inconsistency
334 337 #
335 338 # If transaction is started and any @filecache property is
336 339 # changed at this point, it causes inconsistency between
337 340 # in-memory cached property and streamclone-ed file on the
338 341 # disk. Nested transaction prevents transaction scope "clone"
339 342 # below from writing in-memory changes out at the end of it,
340 343 # even though in-memory changes are discarded at the end of it
341 344 # regardless of transaction nesting.
342 345 #
343 346 # But transaction nesting can't be simply prohibited, because
344 347 # nesting occurs also in ordinary case (e.g. enabling
345 348 # clonebundles).
346 349
347 350 with repo.transaction('clone'):
348 351 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
349 352 for i in xrange(filecount):
350 353 # XXX doesn't support '\n' or '\r' in filenames
351 354 l = fp.readline()
352 355 try:
353 356 name, size = l.split('\0', 1)
354 357 size = int(size)
355 358 except (ValueError, TypeError):
356 359 raise error.ResponseError(
357 360 _('unexpected response from remote server:'), l)
358 361 if repo.ui.debugflag:
359 362 repo.ui.debug('adding %s (%s)\n' %
360 363 (name, util.bytecount(size)))
361 364 # for backwards compat, name was partially encoded
362 365 path = store.decodedir(name)
363 366 with repo.svfs(path, 'w', backgroundclose=True) as ofp:
364 367 for chunk in util.filechunkiter(fp, limit=size):
365 368 handled_bytes += len(chunk)
366 369 repo.ui.progress(_('clone'), handled_bytes,
367 370 total=bytecount, unit=_('bytes'))
368 371 ofp.write(chunk)
369 372
370 373 # force @filecache properties to be reloaded from
371 374 # streamclone-ed file at next access
372 375 repo.invalidate(clearfilecache=True)
373 376
374 377 elapsed = util.timer() - start
375 378 if elapsed <= 0:
376 379 elapsed = 0.001
377 380 repo.ui.progress(_('clone'), None)
378 381 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
379 382 (util.bytecount(bytecount), elapsed,
380 383 util.bytecount(bytecount / elapsed)))
381 384
382 385 def readbundle1header(fp):
383 386 compression = fp.read(2)
384 387 if compression != 'UN':
385 388 raise error.Abort(_('only uncompressed stream clone bundles are '
386 389 'supported; got %s') % compression)
387 390
388 391 filecount, bytecount = struct.unpack('>QQ', fp.read(16))
389 392 requireslen = struct.unpack('>H', fp.read(2))[0]
390 393 requires = fp.read(requireslen)
391 394
392 395 if not requires.endswith('\0'):
393 396 raise error.Abort(_('malformed stream clone bundle: '
394 397 'requirements not properly encoded'))
395 398
396 399 requirements = set(requires.rstrip('\0').split(','))
397 400
398 401 return filecount, bytecount, requirements
399 402
400 403 def applybundlev1(repo, fp):
401 404 """Apply the content from a stream clone bundle version 1.
402 405
403 406 We assume the 4 byte header has been read and validated and the file handle
404 407 is at the 2 byte compression identifier.
405 408 """
406 409 if len(repo):
407 410 raise error.Abort(_('cannot apply stream clone bundle on non-empty '
408 411 'repo'))
409 412
410 413 filecount, bytecount, requirements = readbundle1header(fp)
411 414 missingreqs = requirements - repo.supportedformats
412 415 if missingreqs:
413 416 raise error.Abort(_('unable to apply stream clone: '
414 417 'unsupported format: %s') %
415 418 ', '.join(sorted(missingreqs)))
416 419
417 420 consumev1(repo, fp, filecount, bytecount)
418 421
419 422 class streamcloneapplier(object):
420 423 """Class to manage applying streaming clone bundles.
421 424
422 425 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
423 426 readers to perform bundle type-specific functionality.
424 427 """
425 428 def __init__(self, fh):
426 429 self._fh = fh
427 430
428 431 def apply(self, repo):
429 432 return applybundlev1(repo, self._fh)
430 433
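A sketch of how a bundle reader might dispatch on the 4-byte magic to this
applier (the helper below is hypothetical; the actual dispatch lives in the
bundle reading code):

    def makestreamapplier(fh):
        magic = fh.read(4)
        if magic != 'HGS1':
            raise ValueError('not a stream clone bundle v1: %r' % magic)
        # fh is now positioned at the 2-byte compression identifier,
        # exactly where applybundlev1() expects it
        return streamcloneapplier(fh)
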
434 # type of file to stream
435 _fileappend = 0 # append only file
436 _filefull = 1 # full snapshot file
437
438 # This is its own function so extensions can override it.
439 def _walkstreamfullstorefiles(repo):
440 """list snapshot files from the store"""
441 fnames = []
442 if not repo.publishing():
443 fnames.append('phaseroots')
444 return fnames
445
446 def _filterfull(entry, copy, vfs):
447 """copy the full-snapshot files aside (append-only entries pass through)"""
448 name, ftype, data = entry
449 if ftype != _filefull:
450 return entry
451 return (name, ftype, copy(vfs.join(name)))
452
453 @contextlib.contextmanager
454 def maketempcopies():
455 """return a function to make temporary copies of files"""
456 files = []
457 try:
458 def copy(src):
459 fd, dst = tempfile.mkstemp()
460 os.close(fd)
461 files.append(dst)
462 util.copyfiles(src, dst, hardlink=True)
463 return dst
464 yield copy
465 finally:
466 for tmp in files:
467 util.tryunlink(tmp)
468
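Example use of the helper above; the copy outlives the repository lock for
the duration of the with block and is unlinked on exit:

    import os
    import tempfile

    fd, src = tempfile.mkstemp()
    os.close(fd)
    with maketempcopies() as copy:
        path = copy(src)  # snapshot, hardlinked when possible
        assert os.path.exists(path)
    assert not os.path.exists(path)  # removed by the context manager
    os.unlink(src)
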
431 469 def _emit(repo, entries, totalfilesize):
432 470 """actually emit the stream bundle"""
471 vfs = repo.svfs
433 472 progress = repo.ui.progress
434 473 progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
435 vfs = repo.svfs
474 with maketempcopies() as copy:
436 475 try:
476 # copying is delayed until we are inside the try block
477 entries = [_filterfull(e, copy, vfs) for e in entries]
478 yield None # this releases the lock on the repository
437 479 seen = 0
438 for name, size in entries:
480
481 for name, ftype, data in entries:
439 482 yield util.uvarintencode(len(name))
483 if ftype == _fileappend:
440 484 fp = vfs(name)
485 size = data
486 elif ftype == _filefull:
487 fp = open(data, 'rb')
488 size = util.fstat(fp).st_size
441 489 try:
442 490 yield util.uvarintencode(size)
443 491 yield name
444 492 if size <= 65536:
445 493 chunks = (fp.read(size),)
446 494 else:
447 495 chunks = util.filechunkiter(fp, limit=size)
448 496 for chunk in chunks:
449 497 seen += len(chunk)
450 498 progress(_('bundle'), seen, total=totalfilesize,
451 499 unit=_('bytes'))
452 500 yield chunk
453 501 finally:
454 502 fp.close()
455 503 finally:
456 504 progress(_('bundle'), None)
457 505
458 506 def generatev2(repo):
459 507 """Emit content for version 2 of a streaming clone.
460 508
461 509 The data stream consists of the following entries:
462 510 1) A varint containing the length of the filename
463 511 2) A varint containing the length of file data
464 512 3) N bytes containing the filename (the internal, store-agnostic form)
465 513 4) N bytes containing the file data
466 514
467 515 Returns a 3-tuple of (file count, file size, data iterator).
468 516 """
469 517
470 518 with repo.lock():
471 519
472 520 entries = []
473 521 totalfilesize = 0
474 522
475 523 repo.ui.debug('scanning\n')
476 524 for name, ename, size in _walkstreamfiles(repo):
477 525 if size:
478 entries.append((name, size))
526 entries.append((name, _fileappend, size))
479 527 totalfilesize += size
528 for name in _walkstreamfullstorefiles(repo):
529 if repo.svfs.exists(name):
530 totalfilesize += repo.svfs.lstat(name).st_size
531 entries.append((name, _filefull, None))
480 532
481 533 chunks = _emit(repo, entries, totalfilesize)
534 first = next(chunks)
535 assert first is None
482 536
483 537 return len(entries), totalfilesize, chunks
484 538
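A standalone sketch of the per-file framing described in the docstring
above, assuming util.uvarintencode emits an unsigned LEB128-style varint
(re-implemented here so the example runs on its own):

    def uvarintencode(value):
        # low 7 bits per byte, continuation bit set on all but the last
        bits = value & 0x7f
        value >>= 7
        out = ''
        while value:
            out += chr(0x80 | bits)
            bits = value & 0x7f
            value >>= 7
        return out + chr(bits)

    name, data = 'data/foo.i', 'hello'
    frame = (uvarintencode(len(name)) + uvarintencode(len(data))
             + name + data)
    print(repr(frame))  # '\n\x05data/foo.ihello'
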
485 539 def consumev2(repo, fp, filecount, filesize):
486 540 """Apply the contents from a version 2 streaming clone.
487 541
488 542 Data is read from an object that only needs to provide a ``read(size)``
489 543 method.
490 544 """
491 545 with repo.lock():
492 546 repo.ui.status(_('%d files to transfer, %s of data\n') %
493 547 (filecount, util.bytecount(filesize)))
494 548
495 549 start = util.timer()
496 550 handledbytes = 0
497 551 progress = repo.ui.progress
498 552
499 553 progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))
500 554
501 555 vfs = repo.svfs
502 556
503 557 with repo.transaction('clone'):
504 558 with vfs.backgroundclosing(repo.ui):
505 559 for i in range(filecount):
506 560 namelen = util.uvarintdecodestream(fp)
507 561 datalen = util.uvarintdecodestream(fp)
508 562
509 563 name = fp.read(namelen)
510 564
511 565 if repo.ui.debugflag:
512 566 repo.ui.debug('adding %s (%s)\n' %
513 567 (name, util.bytecount(datalen)))
514 568
515 569 with vfs(name, 'w') as ofp:
516 570 for chunk in util.filechunkiter(fp, limit=datalen):
517 571 handledbytes += len(chunk)
518 572 progress(_('clone'), handledbytes, total=filesize,
519 573 unit=_('bytes'))
520 574 ofp.write(chunk)
521 575
522 576 # force @filecache properties to be reloaded from
523 577 # streamclone-ed file at next access
524 578 repo.invalidate(clearfilecache=True)
525 579
526 580 elapsed = util.timer() - start
527 581 if elapsed <= 0:
528 582 elapsed = 0.001
529 583 progress(_('clone'), None)
530 584 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
531 585 (util.bytecount(handledbytes), elapsed,
532 586 util.bytecount(handledbytes / elapsed)))
533 587
534 588 def applybundlev2(repo, fp, filecount, filesize, requirements):
535 589 missingreqs = [r for r in requirements if r not in repo.supported]
536 590 if missingreqs:
537 591 raise error.Abort(_('unable to apply stream clone: '
538 592 'unsupported format: %s') %
539 593 ', '.join(sorted(missingreqs)))
540 594
541 595 consumev2(repo, fp, filecount, filesize)
@@ -1,332 +1,330 b''
1 1 #require serve
2 2
3 3 #testcases stream-legacy stream-bundle2
4 4
5 5 #if stream-bundle2
6 6 $ cat << EOF >> $HGRCPATH
7 7 > [experimental]
8 8 > bundle2.stream = yes
9 9 > EOF
10 10 #endif
11 11
12 12 Initialize repository
13 13 the status call is to check for issue5130
14 14
15 15 $ hg init server
16 16 $ cd server
17 17 $ touch foo
18 18 $ hg -q commit -A -m initial
19 19 >>> for i in range(1024):
20 20 ... with open(str(i), 'wb') as fh:
21 21 ... fh.write(str(i))
22 22 $ hg -q commit -A -m 'add a lot of files'
23 23 $ hg st
24 24 $ hg serve -p $HGPORT -d --pid-file=hg.pid
25 25 $ cat hg.pid >> $DAEMON_PIDS
26 26 $ cd ..
27 27
28 28 Basic clone
29 29
30 30 #if stream-legacy
31 31 $ hg clone --stream -U http://localhost:$HGPORT clone1
32 32 streaming all changes
33 33 1027 files to transfer, 96.3 KB of data
34 34 transferred 96.3 KB in * seconds (*/sec) (glob)
35 35 searching for changes
36 36 no changes found
37 37 #endif
38 38 #if stream-bundle2
39 39 $ hg clone --stream -U http://localhost:$HGPORT clone1
40 40 streaming all changes
41 41 1027 files to transfer, 96.3 KB of data
42 42 transferred 96.3 KB in * seconds (* */sec) (glob)
43 43 #endif
44 44
45 45 --uncompressed is an alias to --stream
46 46
47 47 #if stream-legacy
48 48 $ hg clone --uncompressed -U http://localhost:$HGPORT clone1-uncompressed
49 49 streaming all changes
50 50 1027 files to transfer, 96.3 KB of data
51 51 transferred 96.3 KB in * seconds (*/sec) (glob)
52 52 searching for changes
53 53 no changes found
54 54 #endif
55 55 #if stream-bundle2
56 56 $ hg clone --uncompressed -U http://localhost:$HGPORT clone1-uncompressed
57 57 streaming all changes
58 58 1027 files to transfer, 96.3 KB of data
59 59 transferred 96.3 KB in * seconds (* */sec) (glob)
60 60 #endif
61 61
62 62 Clone with background file closing enabled
63 63
64 64 #if stream-legacy
65 65 $ hg --debug --config worker.backgroundclose=true --config worker.backgroundcloseminfilecount=1 clone --stream -U http://localhost:$HGPORT clone-background | grep -v adding
66 66 using http://localhost:$HGPORT/
67 67 sending capabilities command
68 68 sending branchmap command
69 69 streaming all changes
70 70 sending stream_out command
71 71 1027 files to transfer, 96.3 KB of data
72 72 starting 4 threads for background file closing
73 73 transferred 96.3 KB in * seconds (*/sec) (glob)
74 74 query 1; heads
75 75 sending batch command
76 76 searching for changes
77 77 all remote heads known locally
78 78 no changes found
79 79 sending getbundle command
80 80 bundle2-input-bundle: with-transaction
81 81 bundle2-input-part: "listkeys" (params: 1 mandatory) supported
82 82 bundle2-input-part: "phase-heads" supported
83 83 bundle2-input-part: total payload size 24
84 84 bundle2-input-bundle: 1 parts total
85 85 checking for updated bookmarks
86 86 #endif
87 87 #if stream-bundle2
88 88 $ hg --debug --config worker.backgroundclose=true --config worker.backgroundcloseminfilecount=1 clone --stream -U http://localhost:$HGPORT clone-background | grep -v adding
89 89 using http://localhost:$HGPORT/
90 90 sending capabilities command
91 91 query 1; heads
92 92 sending batch command
93 93 streaming all changes
94 94 sending getbundle command
95 95 bundle2-input-bundle: with-transaction
96 96 bundle2-input-part: "stream" (params: 4 mandatory) supported
97 97 applying stream bundle
98 98 1027 files to transfer, 96.3 KB of data
99 99 starting 4 threads for background file closing
100 100 transferred 96.3 KB in * seconds (* */sec) (glob)
101 101 bundle2-input-part: total payload size 110887
102 102 bundle2-input-part: "listkeys" (params: 1 mandatory) supported
103 bundle2-input-part: "phase-heads" supported
104 bundle2-input-part: total payload size 24
105 bundle2-input-bundle: 2 parts total
103 bundle2-input-bundle: 1 parts total
106 104 checking for updated bookmarks
107 105 #endif
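Two changes are worth calling out in the bundle2 transcript above. First, the reply loses its separate "phase-heads" part ("1 parts total" instead of "2"): with this series the raw phaseroots file travels inside the stream part itself, so phases no longer need a part of their own. Second, once worker.backgroundcloseminfilecount is crossed, close() calls for freshly written files are handed to a small thread pool ("starting 4 threads for background file closing") so the receiving thread never blocks on the filesystem. A minimal sketch of that idea, with assumed names; Mercurial's actual background closer in vfs.py is more elaborate:

    import queue
    import threading

    class backgroundcloser(object):
        """Close file objects on worker threads (illustrative only)."""

        def __init__(self, threadcount=4):
            self._queue = queue.Queue()
            self._threads = []
            for _ in range(threadcount):
                t = threading.Thread(target=self._worker)
                t.daemon = True
                t.start()
                self._threads.append(t)

        def close(self, fh):
            self._queue.put(fh)  # hand off; the caller keeps writing

        def _worker(self):
            while True:
                fh = self._queue.get()
                if fh is None:
                    return
                fh.close()

        def join(self):
            for _ in self._threads:
                self._queue.put(None)  # one sentinel per worker
            for t in self._threads:
                t.join()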
108 106
109 107 Cannot stream clone when there are secret changesets
110 108
111 109 $ hg -R server phase --force --secret -r tip
112 110 $ hg clone --stream -U http://localhost:$HGPORT secret-denied
113 111 warning: stream clone requested but server has them disabled
114 112 requesting all changes
115 113 adding changesets
116 114 adding manifests
117 115 adding file changes
118 116 added 1 changesets with 1 changes to 1 files
119 117 new changesets 96ee1d7354c4
120 118
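The refusal above is policy rather than protocol: a stream clone ships raw store files and therefore cannot filter out secret changesets, so the server simply stops advertising the capability whenever any exist. A hedged sketch of that decision (the helper name is assumed; the real check lives in Mercurial's server-side capability code):

    def allowstreamclone(repo, ui):
        # Illustrative only.
        if not ui.configbool('server', 'uncompressed', True):
            return False  # operator disabled stream clones outright
        if repo.revs('secret()'):
            # raw file streaming would expose secret changesets, so
            # require the explicit override exercised just below
            return ui.configbool('server', 'uncompressedallowsecret')
        return True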
121 119 $ killdaemons.py
122 120
123 121 Streaming of secrets can be overridden by server config
124 122
125 123 $ cd server
126 124 $ hg serve --config server.uncompressedallowsecret=true -p $HGPORT -d --pid-file=hg.pid
127 125 $ cat hg.pid > $DAEMON_PIDS
128 126 $ cd ..
129 127
130 128 #if stream-legacy
131 129 $ hg clone --stream -U http://localhost:$HGPORT secret-allowed
132 130 streaming all changes
133 131 1027 files to transfer, 96.3 KB of data
134 132 transferred 96.3 KB in * seconds (*/sec) (glob)
135 133 searching for changes
136 134 no changes found
137 135 #endif
138 136 #if stream-bundle2
139 137 $ hg clone --stream -U http://localhost:$HGPORT secret-allowed
140 138 streaming all changes
141 139 1027 files to transfer, 96.3 KB of data
142 140 transferred 96.3 KB in * seconds (* */sec) (glob)
143 141 #endif
144 142
145 143 $ killdaemons.py
146 144
147 145 Verify interaction between preferuncompressed and secret presence
148 146
149 147 $ cd server
150 148 $ hg serve --config server.preferuncompressed=true -p $HGPORT -d --pid-file=hg.pid
151 149 $ cat hg.pid > $DAEMON_PIDS
152 150 $ cd ..
153 151
154 152 $ hg clone -U http://localhost:$HGPORT preferuncompressed-secret
155 153 requesting all changes
156 154 adding changesets
157 155 adding manifests
158 156 adding file changes
159 157 added 1 changesets with 1 changes to 1 files
160 158 new changesets 96ee1d7354c4
161 159
162 160 $ killdaemons.py
163 161
164 162 Clone not allowed when full bundles disabled and can't serve secrets
165 163
166 164 $ cd server
167 165 $ hg serve --config server.disablefullbundle=true -p $HGPORT -d --pid-file=hg.pid
168 166 $ cat hg.pid > $DAEMON_PIDS
169 167 $ cd ..
170 168
171 169 $ hg clone --stream http://localhost:$HGPORT secret-full-disabled
172 170 warning: stream clone requested but server has them disabled
173 171 requesting all changes
174 172 remote: abort: server has pull-based clones disabled
175 173 abort: pull failed on remote
176 174 (remove --pull if specified or upgrade Mercurial)
177 175 [255]
178 176
179 177 Local stream clone with secrets involved
180 178 (This is just a test of behavior: anyone with direct access to the repo's
181 179 files crosses no security boundary, so preventing a clone here isn't important.)
182 180
183 181 $ hg clone -U --stream server local-secret
184 182 warning: stream clone requested but server has them disabled
185 183 requesting all changes
186 184 adding changesets
187 185 adding manifests
188 186 adding file changes
189 187 added 1 changesets with 1 changes to 1 files
190 188 new changesets 96ee1d7354c4
191 189
192 190 Stream clone while repo is changing:
193 191
194 192 $ mkdir changing
195 193 $ cd changing
196 194
197 195 extension for delaying the server process so we can reliably modify the repo
198 196 while cloning
199 197
200 198 $ cat > delayer.py <<EOF
201 199 > import time
202 200 > from mercurial import extensions, vfs
203 201 > def __call__(orig, self, path, *args, **kwargs):
204 202 > if path == 'data/f1.i':
205 203 > time.sleep(2)
206 204 > return orig(self, path, *args, **kwargs)
207 205 > extensions.wrapfunction(vfs.vfs, '__call__', __call__)
208 206 > EOF
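The delayer relies on extensions.wrapfunction, which swaps an attribute for a wrapper that receives the original callable as its first argument; here that intercepts every vfs open and sleeps for two seconds when the f1 revlog is requested, opening a window to commit while the server is mid-stream. A stripped-down version of the mechanism (simplified; the real helper also supports unwrapping):

    import functools

    def wrapfunction(container, funcname, wrapper):
        # Simplified take on mercurial.extensions.wrapfunction.
        orig = getattr(container, funcname)
        def wrapped(*args, **kwargs):
            return wrapper(orig, *args, **kwargs)
        functools.update_wrapper(wrapped, orig)
        setattr(container, funcname, wrapped)
        return orig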
209 207
210 208 prepare repo with a small and a big file to cover both code paths in emitrevlogdata (sketched below, after the setup)
211 209
212 210 $ hg init repo
213 211 $ touch repo/f1
214 212 $ $TESTDIR/seq.py 50000 > repo/f2
215 213 $ hg -R repo ci -Aqm "0"
216 214 $ hg serve -R repo -p $HGPORT1 -d --pid-file=hg.pid --config extensions.delayer=delayer.py
217 215 $ cat hg.pid >> $DAEMON_PIDS
218 216
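As promised, the two files exist to hit both branches of emitrevlogdata: payloads no larger than one chunk are read and yielded whole, while bigger ones are streamed in fixed-size pieces. Sketched with an assumed name and threshold:

    def emit_revlog_data(fp, size, chunksize=65536):
        # Illustrative only: small files in one read, big files chunked.
        if size <= chunksize:
            yield fp.read(size)
            return
        remaining = size
        while remaining > 0:
            data = fp.read(min(remaining, chunksize))
            if not data:
                raise IOError('unexpected end of stream')
            yield data
            remaining -= len(data)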
219 217 clone while modifying the repo between stat'ing a file under the write lock
220 218 and actually serving its content
221 219
222 220 $ hg clone -q --stream -U http://localhost:$HGPORT1 clone &
223 221 $ sleep 1
224 222 $ echo >> repo/f1
225 223 $ echo >> repo/f2
226 224 $ hg -R repo ci -m "1"
227 225 $ wait
228 226 $ hg -R clone id
229 227 000000000000
230 228 $ cd ..
231 229
232 230 Stream repository with bookmarks
233 231 --------------------------------
234 232
235 233 (revert introduction of secret changeset)
236 234
237 235 $ hg -R server phase --draft 'secret()'
238 236
239 237 add a bookmark
240 238
241 239 $ hg -R server bookmark -r tip some-bookmark
242 240
243 241 clone it
244 242
245 243 #if stream-legacy
246 244 $ hg clone --stream http://localhost:$HGPORT with-bookmarks
247 245 streaming all changes
248 246 1027 files to transfer, 96.3 KB of data
249 247 transferred 96.3 KB in * seconds (*) (glob)
250 248 searching for changes
251 249 no changes found
252 250 updating to branch default
253 251 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
254 252 #endif
255 253 #if stream-bundle2
256 254 $ hg clone --stream http://localhost:$HGPORT with-bookmarks
257 255 streaming all changes
258 256 1027 files to transfer, 96.3 KB of data
259 257 transferred 96.3 KB in * seconds (* */sec) (glob)
260 258 updating to branch default
261 259 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
262 260 #endif
263 261 $ hg -R with-bookmarks bookmarks
264 262 some-bookmark 1:c17445101a72
265 263
266 264 Stream repository with phases
267 265 -----------------------------
268 266
269 267 Clone as publishing
270 268
271 269 $ hg -R server phase -r 'all()'
272 270 0: draft
273 271 1: draft
274 272
275 273 #if stream-legacy
276 274 $ hg clone --stream http://localhost:$HGPORT phase-publish
277 275 streaming all changes
278 276 1027 files to transfer, 96.3 KB of data
279 277 transferred 96.3 KB in * seconds (*) (glob)
280 278 searching for changes
281 279 no changes found
282 280 updating to branch default
283 281 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
284 282 #endif
285 283 #if stream-bundle2
286 284 $ hg clone --stream http://localhost:$HGPORT phase-publish
287 285 streaming all changes
288 286 1027 files to transfer, 96.3 KB of data
289 287 transferred 96.3 KB in * seconds (* */sec) (glob)
290 288 updating to branch default
291 289 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
292 290 #endif
293 291 $ hg -R phase-publish phase -r 'all()'
294 292 0: public
295 293 1: public
296 294
297 295 Clone as non publishing
298 296
299 297 $ cat << EOF >> server/.hg/hgrc
300 298 > [phases]
301 299 > publish = False
302 300 > EOF
303 301 $ killdaemons.py
304 302 $ hg -R server serve -p $HGPORT -d --pid-file=hg.pid
305 303 $ cat hg.pid >> $DAEMON_PIDS
306 304
307 305 #if stream-legacy
308 306 $ hg clone --stream http://localhost:$HGPORT phase-no-publish
309 307 streaming all changes
310 308 1027 files to transfer, 96.3 KB of data
311 309 transferred 96.3 KB in * seconds (*) (glob)
312 310 searching for changes
313 311 no changes found
314 312 updating to branch default
315 313 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
316 314 $ hg -R phase-no-publish phase -r 'all()'
317 315 0: public
318 316 1: public
319 317 #endif
320 318 #if stream-bundle2
321 319 $ hg clone --stream http://localhost:$HGPORT phase-no-publish
322 320 streaming all changes
323 1027 files to transfer, 96.3 KB of data
324 transferred 96.3 KB in * seconds (* */sec) (glob)
321 1028 files to transfer, 96.4 KB of data
322 transferred 96.4 KB in * seconds (* */sec) (glob)
325 323 updating to branch default
326 324 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
327 325 $ hg -R phase-no-publish phase -r 'all()'
328 0: public
329 1: public
326 0: draft
327 1: draft
330 328 #endif
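The diff in the bundle2 branch above is the point of this series: the stream now also carries the repository's phaseroots file, a non append-only file, so the clone reports 1028 files (96.4 KB) instead of 1027 and, against a non-publishing server, comes out with both revisions still draft. The legacy protocol cannot ship that file, which is why the stream-legacy branch still degrades both phases to public.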
331 329
332 330 $ killdaemons.py