# Source: mercurial/exchange.py at r35777:c24dad55 (default branch)
# Commit: "bundle2: add support for a 'stream' parameter to 'getbundle'"
# Author: Boris Feld
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import collections
11 11 import errno
12 12 import hashlib
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20 from . import (
21 21 bookmarks as bookmod,
22 22 bundle2,
23 23 changegroup,
24 24 discovery,
25 25 error,
26 26 lock as lockmod,
27 27 logexchange,
28 28 obsolete,
29 29 phases,
30 30 pushkey,
31 31 pycompat,
32 32 scmutil,
33 33 sslutil,
34 34 streamclone,
35 35 url as urlmod,
36 36 util,
37 37 )
38 38
39 39 urlerr = util.urlerr
40 40 urlreq = util.urlreq
41 41
42 42 # Maps bundle version human names to changegroup versions.
43 43 _bundlespeccgversions = {'v1': '01',
44 44 'v2': '02',
45 45 'packed1': 's1',
46 46 'bundle2': '02', #legacy
47 47 }
48 48
49 49 # Compression engines allowed in version 1. THIS SHOULD NEVER CHANGE.
50 50 _bundlespecv1compengines = {'gzip', 'bzip2', 'none'}
51 51
def parsebundlespec(repo, spec, strict=True, externalnames=False):
    """Parse a bundle string specification into parts.

    Bundle specifications denote a well-defined bundle/exchange format.
    The content of a given specification should not change over time in
    order to ensure that bundles produced by a newer version of Mercurial are
    readable from an older version.

    The string currently has the form:

       <compression>-<type>[;<parameter0>[;<parameter1>]]

    Where <compression> is one of the supported compression formats
    and <type> is (currently) a version string. A ";" can follow the type and
    all text afterwards is interpreted as URI encoded, ";" delimited key=value
    pairs.

    If ``strict`` is True (the default) <compression> is required. Otherwise,
    it is optional.

    If ``externalnames`` is False (the default), the human-centric names will
    be converted to their internal representation.

    Returns a 3-tuple of (compression, version, parameters). Compression will
    be ``None`` if not in strict mode and a compression isn't defined.

    An ``InvalidBundleSpecification`` is raised when the specification is
    not syntactically well formed.

    An ``UnsupportedBundleSpecification`` is raised when the compression or
    bundle type/version is not recognized.

    Note: this function will likely eventually return a more complex data
    structure, including bundle2 part information.
    """
    def parseparams(s):
        # Split "<version>[;key=value;...]" into the bare version string and
        # a dict of URI-decoded parameters.
        if ';' not in s:
            return s, {}

        params = {}
        version, paramstr = s.split(';', 1)

        for p in paramstr.split(';'):
            if '=' not in p:
                raise error.InvalidBundleSpecification(
                    _('invalid bundle specification: '
                      'missing "=" in parameter: %s') % p)

            key, value = p.split('=', 1)
            key = urlreq.unquote(key)
            value = urlreq.unquote(value)
            params[key] = value

        return version, params


    if strict and '-' not in spec:
        raise error.InvalidBundleSpecification(
                _('invalid bundle specification; '
                  'must be prefixed with compression: %s') % spec)

    if '-' in spec:
        # Fully qualified spec: "<compression>-<version>[;params]".
        compression, version = spec.split('-', 1)

        if compression not in util.compengines.supportedbundlenames:
            raise error.UnsupportedBundleSpecification(
                    _('%s compression is not supported') % compression)

        version, params = parseparams(version)

        if version not in _bundlespeccgversions:
            raise error.UnsupportedBundleSpecification(
                    _('%s is not a recognized bundle version') % version)
    else:
        # Value could be just the compression or just the version, in which
        # case some defaults are assumed (but only when not in strict mode).
        assert not strict

        spec, params = parseparams(spec)

        if spec in util.compengines.supportedbundlenames:
            compression = spec
            version = 'v1'
            # Generaldelta repos require v2.
            if 'generaldelta' in repo.requirements:
                version = 'v2'
            # Modern compression engines require v2.
            if compression not in _bundlespecv1compengines:
                version = 'v2'
        elif spec in _bundlespeccgversions:
            if spec == 'packed1':
                compression = 'none'
            else:
                compression = 'bzip2'
            version = spec
        else:
            raise error.UnsupportedBundleSpecification(
                    _('%s is not a recognized bundle specification') % spec)

    # Bundle version 1 only supports a known set of compression engines.
    if version == 'v1' and compression not in _bundlespecv1compengines:
        raise error.UnsupportedBundleSpecification(
                _('compression engine %s is not supported on v1 bundles') %
                compression)

    # The specification for packed1 can optionally declare the data formats
    # required to apply it. If we see this metadata, compare against what the
    # repo supports and error if the bundle isn't compatible.
    if version == 'packed1' and 'requirements' in params:
        requirements = set(params['requirements'].split(','))
        missingreqs = requirements - repo.supportedformats
        if missingreqs:
            raise error.UnsupportedBundleSpecification(
                    _('missing support for repository features: %s') %
                      ', '.join(sorted(missingreqs)))

    # Translate the human-friendly names (e.g. 'bzip2', 'v2') to the internal
    # identifiers (e.g. 'BZ', '02') unless the caller wants external names.
    if not externalnames:
        engine = util.compengines.forbundlename(compression)
        compression = engine.bundletype()[1]
        version = _bundlespeccgversions[version]
    return compression, version, params
173 173
def readbundle(ui, fh, fname, vfs=None):
    """Sniff the 4-byte magic of ``fh`` and return a matching unbundler.

    Returns a ``changegroup.cg1unpacker``, ``bundle2.getunbundler`` result
    or ``streamclone.streamcloneapplier`` depending on the header. ``fname``
    is only used for error reporting; when empty, "stream" is reported and a
    headerless raw changegroup (leading NUL byte) is tolerated.
    """
    magicheader = changegroup.readexactly(fh, 4)

    compalg = None
    if not fname:
        fname = "stream"
        # A stream may be a raw, uncompressed changegroup with its header
        # stripped; splice a fake HG10UN header back on.
        if not magicheader.startswith('HG') and magicheader.startswith('\0'):
            fh = changegroup.headerlessfixup(fh, magicheader)
            magicheader = "HG10"
            compalg = 'UN'
    elif vfs:
        fname = vfs.join(fname)

    magic = magicheader[0:2]
    version = magicheader[2:4]

    if magic != 'HG':
        raise error.Abort(_('%s: not a Mercurial bundle') % fname)
    if version == '10':
        if compalg is None:
            # v1 bundles carry a 2-byte compression code after the magic.
            compalg = changegroup.readexactly(fh, 2)
        return changegroup.cg1unpacker(fh, compalg)
    if version.startswith('2'):
        return bundle2.getunbundler(ui, fh, magicstring=magic + version)
    if version == 'S1':
        return streamclone.streamcloneapplier(fh)
    raise error.Abort(_('%s: unknown bundle version %s') % (fname, version))
201 201
def getbundlespec(ui, fh):
    """Infer the bundlespec from a bundle file handle.

    The input file handle is seeked and the original seek position is not
    restored.

    Returns a spec string such as ``bzip2-v1``, ``<comp>-v2`` or
    ``none-packed1;requirements=...``. Raises ``error.Abort`` when the
    compression, changegroup version or bundle type cannot be recognized.
    """
    def speccompression(alg):
        # Map an internal compression identifier (e.g. 'BZ') to its
        # human-friendly bundlespec name (e.g. 'bzip2'); None if unknown.
        try:
            return util.compengines.forbundletype(alg).bundletype()[0]
        except KeyError:
            return None

    b = readbundle(ui, fh, None)
    if isinstance(b, changegroup.cg1unpacker):
        alg = b._type
        if alg == '_truncatedBZ':
            # presumably bzip2 data with its stream header stripped;
            # report it as plain 'BZ'
            alg = 'BZ'
        comp = speccompression(alg)
        if not comp:
            raise error.Abort(_('unknown compression algorithm: %s') % alg)
        return '%s-v1' % comp
    elif isinstance(b, bundle2.unbundle20):
        if 'Compression' in b.params:
            comp = speccompression(b.params['Compression'])
            if not comp:
                # Fix: report the unrecognized algorithm name itself;
                # ``comp`` is always None on this path, so formatting it
                # produced the useless message "...: None".
                raise error.Abort(_('unknown compression algorithm: %s')
                                  % b.params['Compression'])
        else:
            comp = 'none'

        version = None
        for part in b.iterparts():
            if part.type == 'changegroup':
                version = part.params['version']
                if version in ('01', '02'):
                    version = 'v2'
                else:
                    raise error.Abort(_('changegroup version %s does not have '
                                        'a known bundlespec') % version,
                                      hint=_('try upgrading your Mercurial '
                                             'client'))

        if not version:
            raise error.Abort(_('could not identify changegroup version in '
                                'bundle'))

        return '%s-%s' % (comp, version)
    elif isinstance(b, streamclone.streamcloneapplier):
        requirements = streamclone.readbundle1header(fh)[2]
        params = 'requirements=%s' % ','.join(sorted(requirements))
        return 'none-packed1;%s' % urlreq.quote(params)
    else:
        raise error.Abort(_('unknown bundle type: %s') % b)
254 254
def _computeoutgoing(repo, heads, common):
    """Computes which revs are outgoing given a set of common
    and a set of heads.

    This is a separate function so extensions can have access to
    the logic.

    Returns a discovery.outgoing object.
    """
    cl = repo.changelog
    if not common:
        # no common nodes known: everything back to the null revision
        common = [nullid]
    else:
        # drop common nodes the local changelog does not actually have
        knows = cl.hasnode
        common = [node for node in common if knows(node)]
    if not heads:
        heads = cl.heads()
    return discovery.outgoing(repo, common, heads)
273 273
274 274 def _forcebundle1(op):
275 275 """return true if a pull/push must use bundle1
276 276
277 277 This function is used to allow testing of the older bundle version"""
278 278 ui = op.repo.ui
279 279 forcebundle1 = False
280 280 # The goal is this config is to allow developer to choose the bundle
281 281 # version used during exchanged. This is especially handy during test.
282 282 # Value is a list of bundle version to be picked from, highest version
283 283 # should be used.
284 284 #
285 285 # developer config: devel.legacy.exchange
286 286 exchange = ui.configlist('devel', 'legacy.exchange')
287 287 forcebundle1 = 'bundle2' not in exchange and 'bundle1' in exchange
288 288 return forcebundle1 or not op.remote.capable('bundle2')
289 289
class pushoperation(object):
    """A object that represent a single push operation

    Its purpose is to carry push related state and very common operations.

    A new pushoperation should be created at the beginning of each push and
    discarded afterward.
    """

    def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
                 bookmarks=(), pushvars=None):
        """Initialize push state; discovery steps fill in the rest later."""
        # repo we push from
        self.repo = repo
        self.ui = repo.ui
        # repo we push to
        self.remote = remote
        # force option provided
        self.force = force
        # revs to be pushed (None is "all")
        self.revs = revs
        # bookmark explicitly pushed
        self.bookmarks = bookmarks
        # allow push of new branch
        self.newbranch = newbranch
        # step already performed
        # (used to check what steps have been already performed through bundle2)
        self.stepsdone = set()
        # Integer version of the changegroup push result
        # - None means nothing to push
        # - 0 means HTTP error
        # - 1 means we pushed and remote head count is unchanged *or*
        #   we have outgoing changesets but refused to push
        # - other values as described by addchangegroup()
        self.cgresult = None
        # Boolean value for the bookmark push
        self.bkresult = None
        # discover.outgoing object (contains common and outgoing data)
        self.outgoing = None
        # all remote topological heads before the push
        self.remoteheads = None
        # Details of the remote branch pre and post push
        #
        # mapping: {'branch': ([remoteheads],
        #                      [newheads],
        #                      [unsyncedheads],
        #                      [discardedheads])}
        # - branch: the branch name
        # - remoteheads: the list of remote heads known locally
        #                None if the branch is new
        # - newheads: the new remote heads (known locally) with outgoing pushed
        # - unsyncedheads: the list of remote heads unknown locally.
        # - discardedheads: the list of remote heads made obsolete by the push
        self.pushbranchmap = None
        # testable as a boolean indicating if any nodes are missing locally.
        self.incoming = None
        # summary of the remote phase situation
        self.remotephases = None
        # phases changes that must be pushed along side the changesets
        self.outdatedphases = None
        # phases changes that must be pushed if changeset push fails
        self.fallbackoutdatedphases = None
        # outgoing obsmarkers
        self.outobsmarkers = set()
        # outgoing bookmarks
        self.outbookmarks = []
        # transaction manager
        self.trmanager = None
        # map { pushkey partid -> callback handling failure}
        # used to handle exception from mandatory pushkey part failure
        self.pkfailcb = {}
        # an iterable of pushvars or None
        self.pushvars = pushvars

    @util.propertycache
    def futureheads(self):
        """future remote heads if the changeset push succeeds"""
        return self.outgoing.missingheads

    @util.propertycache
    def fallbackheads(self):
        """future remote heads if the changeset push fails"""
        if self.revs is None:
            # not target to push, all common are relevant
            return self.outgoing.commonheads
        unfi = self.repo.unfiltered()
        # I want cheads = heads(::missingheads and ::commonheads)
        # (missingheads is revs with secret changeset filtered out)
        #
        # This can be expressed as:
        #     cheads = ( (missingheads and ::commonheads)
        #              + (commonheads and ::missingheads))"
        #              )
        #
        # while trying to push we already computed the following:
        #     common = (::commonheads)
        #     missing = ((commonheads::missingheads) - commonheads)
        #
        # We can pick:
        # * missingheads part of common (::commonheads)
        common = self.outgoing.common
        nm = self.repo.changelog.nodemap
        cheads = [node for node in self.revs if nm[node] in common]
        # and
        # * commonheads parents on missing
        revset = unfi.set('%ln and parents(roots(%ln))',
                          self.outgoing.commonheads,
                          self.outgoing.missing)
        cheads.extend(c.node() for c in revset)
        return cheads

    @property
    def commonheads(self):
        """set of all common heads after changeset bundle push"""
        if self.cgresult:
            return self.futureheads
        else:
            return self.fallbackheads

    # mapping of message used when pushing bookmark
    # action -> (success message template, failure message template)
    bookmsgmap = {'update': (_("updating bookmark %s\n"),
                             _('updating bookmark %s failed!\n')),
                  'export': (_("exporting bookmark %s\n"),
                             _('exporting bookmark %s failed!\n')),
                  'delete': (_("deleting remote bookmark %s\n"),
                             _('deleting remote bookmark %s failed!\n')),
                  }
416 416
417 417
def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=(),
         opargs=None):
    '''Push outgoing changesets (limited by revs) from a local
    repository to remote. Return an integer:
      - None means nothing to push
      - 0 means HTTP error
      - 1 means we pushed and remote head count is unchanged *or*
        we have outgoing changesets but refused to push
      - other values as described by addchangegroup()
    '''
    if opargs is None:
        opargs = {}
    pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks,
                           **pycompat.strkwargs(opargs))
    # For a local destination, make sure it supports every feature the
    # source repository requires before attempting anything.
    if pushop.remote.local():
        missing = (set(pushop.repo.requirements)
                   - pushop.remote.local().supported)
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise error.Abort(msg)

    if not pushop.remote.canpush():
        raise error.Abort(_("destination does not support push"))

    if not pushop.remote.capable('unbundle'):
        raise error.Abort(_('cannot push: destination does not support the '
                            'unbundle wire protocol command'))

    # get lock as we might write phase data
    wlock = lock = None
    try:
        # bundle2 push may receive a reply bundle touching bookmarks or other
        # things requiring the wlock. Take it now to ensure proper ordering.
        maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
        if (not _forcebundle1(pushop)) and maypushback:
            wlock = pushop.repo.wlock()
        lock = pushop.repo.lock()
        pushop.trmanager = transactionmanager(pushop.repo,
                                              'push-response',
                                              pushop.remote.url())
    except IOError as err:
        if err.errno != errno.EACCES:
            raise
        # source repo cannot be locked.
        # We do not abort the push, but just disable the local phase
        # synchronisation.
        msg = 'cannot lock source repository: %s\n' % err
        pushop.ui.debug(msg)

    # Any of wlock/lock/trmanager may still be None (locking was skipped or
    # failed with EACCES); nullcontextmanager keeps the with-statement valid.
    with wlock or util.nullcontextmanager(), \
            lock or util.nullcontextmanager(), \
            pushop.trmanager or util.nullcontextmanager():
        pushop.repo.checkpush(pushop)
        _pushdiscovery(pushop)
        if not _forcebundle1(pushop):
            _pushbundle2(pushop)
        _pushchangeset(pushop)
        _pushsyncphase(pushop)
        _pushobsolete(pushop)
        _pushbookmark(pushop)

    return pushop
482 482
# list of steps to perform discovery before push
pushdiscoveryorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
pushdiscoverymapping = {}

def pushdiscovery(stepname):
    """decorator for function performing discovery before push

    The decorated function is recorded in ``pushdiscoverymapping`` under
    ``stepname`` and the step name is appended to ``pushdiscoveryorder``, so
    decoration order determines execution order (this may matter).

    You can only use this decorator for a new step, if you want to wrap a step
    from an extension, change the pushdiscovery dictionary directly."""
    def register(func):
        assert stepname not in pushdiscoverymapping
        pushdiscoverymapping[stepname] = func
        pushdiscoveryorder.append(stepname)
        return func
    return register
506 506
def _pushdiscovery(pushop):
    """Run all discovery steps"""
    # run the registered steps in registration order
    for name in pushdiscoveryorder:
        pushdiscoverymapping[name](pushop)
512 512
@pushdiscovery('changeset')
def _pushdiscoverychangeset(pushop):
    """discover the changeset that need to be pushed"""
    repo = pushop.repo
    remote = pushop.remote
    if pushop.revs:
        # limit the incoming discovery to ancestors of the requested revs
        commoninc = discovery.findcommonincoming(repo, remote,
                                                 force=pushop.force,
                                                 ancestorsof=pushop.revs)
    else:
        commoninc = discovery.findcommonincoming(repo, remote,
                                                 force=pushop.force)
    common, inc, remoteheads = commoninc
    pushop.outgoing = discovery.findcommonoutgoing(repo, remote,
                                                   onlyheads=pushop.revs,
                                                   commoninc=commoninc,
                                                   force=pushop.force)
    pushop.remoteheads = remoteheads
    pushop.incoming = inc
529 529
@pushdiscovery('phase')
def _pushdiscoveryphase(pushop):
    """discover the phase that needs to be pushed

    (computed for both success and failure case for changesets push)"""
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo')
        and remotephases    # server supports phases
        and not pushop.outgoing.missing # no changesets to be pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and remote support phase
        # - and no changeset are to be pushed
        # - and remote is publishing
        # We may be in issue 3781 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        pushop.outdatedphases = []
        pushop.fallbackoutdatedphases = []
        return

    pushop.remotephases = phases.remotephasessummary(pushop.repo,
                                                     pushop.fallbackheads,
                                                     remotephases)
    droots = pushop.remotephases.draftroots

    extracond = ''
    if not pushop.remotephases.publishing:
        # non-publishing server: only public local changesets need pushing
        extracond = ' and public()'
    revset = 'heads((%%ln::%%ln) %s)' % extracond
    # Get the list of all revs draft on remote but public here.
    # XXX Beware that revset break if droots is not strictly
    # XXX root we may want to ensure it is but it is costly
    fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
    if not outgoing.missing:
        future = fallback
    else:
        # adds changeset we are going to push as draft
        #
        # should not be necessary for publishing server, but because of an
        # issue fixed in xxxxx we have to do it anyway.
        fdroots = list(unfi.set('roots(%ln + %ln::)',
                                outgoing.missing, droots))
        fdroots = [f.node() for f in fdroots]
        future = list(unfi.set(revset, fdroots, pushop.futureheads))
    # phases to push alongside the changesets / if the changeset push fails
    pushop.outdatedphases = future
    pushop.fallbackoutdatedphases = fallback
581 581
@pushdiscovery('obsmarker')
def _pushdiscoveryobsmarkers(pushop):
    """collect the obsolescence markers relevant to the pushed changesets"""
    repo = pushop.repo
    # bail out unless marker exchange is enabled, we have markers, and the
    # remote advertises the 'obsolete' namespace
    if not obsolete.isenabled(repo, obsolete.exchangeopt):
        return
    if not repo.obsstore:
        return
    if 'obsolete' not in pushop.remote.listkeys('namespaces'):
        return
    # very naive computation, that can be quite expensive on big repo.
    # However: evolution is currently slow on them anyway.
    nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
    pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes)
592 592
@pushdiscovery('bookmarks')
def _pushdiscoverybookmarks(pushop):
    """discover bookmark moves to forward to the remote

    Fills ``pushop.outbookmarks`` with ``(name, old-remote-id, new-id)``
    hex tuples (an empty string stands for add/delete) and sets
    ``pushop.bkresult = 2`` when an explicitly requested bookmark exists
    neither locally nor remotely.
    """
    ui = pushop.ui
    repo = pushop.repo.unfiltered()
    remote = pushop.remote
    ui.debug("checking for updated bookmarks\n")
    ancestors = ()
    if pushop.revs:
        # only forward bookmark moves that stay within the pushed revs
        revnums = map(repo.changelog.rev, pushop.revs)
        ancestors = repo.changelog.ancestors(revnums, inclusive=True)
    remotebookmark = remote.listkeys('bookmarks')

    explicit = set([repo._bookmarks.expandname(bookmark)
                    for bookmark in pushop.bookmarks])

    remotebookmark = bookmod.unhexlifybookmarks(remotebookmark)
    comp = bookmod.comparebookmarks(repo, repo._bookmarks, remotebookmark)

    def safehex(x):
        # hex() a node, passing None through unchanged
        if x is None:
            return x
        return hex(x)

    def hexifycompbookmarks(bookmarks):
        # convert (name, srcnode, dstnode) triples to hex form lazily
        for b, scid, dcid in bookmarks:
            yield b, safehex(scid), safehex(dcid)

    comp = [hexifycompbookmarks(marks) for marks in comp]
    addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp

    # bookmarks that advanced locally
    for b, scid, dcid in advsrc:
        if b in explicit:
            explicit.remove(b)
        if not ancestors or repo[scid].rev() in ancestors:
            pushop.outbookmarks.append((b, dcid, scid))
    # search added bookmark
    for b, scid, dcid in addsrc:
        if b in explicit:
            explicit.remove(b)
        pushop.outbookmarks.append((b, '', scid))
    # search for overwritten bookmark
    for b, scid, dcid in list(advdst) + list(diverge) + list(differ):
        if b in explicit:
            explicit.remove(b)
        pushop.outbookmarks.append((b, dcid, scid))
    # search for bookmark to delete
    for b, scid, dcid in adddst:
        if b in explicit:
            explicit.remove(b)
        # treat as "deleted locally"
        pushop.outbookmarks.append((b, dcid, ''))
    # identical bookmarks shouldn't get reported
    for b, scid, dcid in same:
        if b in explicit:
            explicit.remove(b)

    if explicit:
        explicit = sorted(explicit)
        # we should probably list all of them
        ui.warn(_('bookmark %s does not exist on the local '
                  'or remote repository!\n') % explicit[0])
        pushop.bkresult = 2

    pushop.outbookmarks.sort()
657 657
def _pushcheckoutgoing(pushop):
    """Validate the outgoing changesets before pushing.

    Returns False when there is nothing to push. Otherwise, unless the push
    is forced, aborts when a to-be-pushed head is obsolete or unstable, runs
    discovery.checkheads and returns True.
    """
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    if not outgoing.missing:
        # nothing to push
        scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
        return False
    # something to push
    if not pushop.force:
        # if repo.obsstore == False --> no obsolete
        # then, save the iteration
        if unfi.obsstore:
            # these messages are assigned to locals here to keep the lines
            # below within the 80-character limit
            mso = _("push includes obsolete changeset: %s!")
            mspd = _("push includes phase-divergent changeset: %s!")
            mscd = _("push includes content-divergent changeset: %s!")
            mst = {"orphan": _("push includes orphan changeset: %s!"),
                   "phase-divergent": mspd,
                   "content-divergent": mscd}
            # If we are to push if there is at least one
            # obsolete or unstable changeset in missing, at
            # least one of the missinghead will be obsolete or
            # unstable. So checking heads only is ok
            for node in outgoing.missingheads:
                ctx = unfi[node]
                if ctx.obsolete():
                    raise error.Abort(mso % ctx)
                elif ctx.isunstable():
                    # TODO print more than one instability in the abort
                    # message
                    raise error.Abort(mst[ctx.instabilities()[0]] % ctx)

    discovery.checkheads(pushop)
    return True
692 692
# List of names of steps to perform for an outgoing bundle2, order matters.
b2partsgenorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
b2partsgenmapping = {}

def b2partsgenerator(stepname, idx=None):
    """decorator for function generating bundle2 part

    The decorated function is recorded in ``b2partsgenmapping`` under
    ``stepname``; the step name is appended to ``b2partsgenorder``, or
    inserted at position ``idx`` when given, so decoration order matters.

    You can only use this decorator for new steps, if you want to wrap a step
    from an extension, attack the b2partsgenmapping dictionary directly."""
    def register(func):
        assert stepname not in b2partsgenmapping
        b2partsgenmapping[stepname] = func
        if idx is None:
            b2partsgenorder.append(stepname)
        else:
            b2partsgenorder.insert(idx, stepname)
        return func
    return register
719 719
def _pushb2ctxcheckheads(pushop, bundler):
    """Generate race condition checking parts

    Exists as an independent function to aid extensions
    """
    # * 'force' do not check for push race,
    # * if we don't push anything, there are nothing to check.
    if not pushop.force and pushop.outgoing.missingheads:
        allowunrelated = 'related' in bundler.capabilities.get('checkheads', ())
        emptyremote = pushop.pushbranchmap is None
        if not allowunrelated or emptyremote:
            # coarse check: send every remote head we know about
            bundler.newpart('check:heads', data=iter(pushop.remoteheads))
        else:
            # finer check: only send the heads we expect this push to
            # remove or replace on each branch
            affected = set()
            for branch, heads in pushop.pushbranchmap.iteritems():
                remoteheads, newheads, unsyncedheads, discardedheads = heads
                if remoteheads is not None:
                    remote = set(remoteheads)
                    # heads obsoleted by the push, plus heads no longer
                    # present after the push
                    affected |= set(discardedheads) & remote
                    affected |= remote - set(newheads)
            if affected:
                data = iter(sorted(affected))
                bundler.newpart('check:updated-heads', data=data)
743 743
744 744 def _pushing(pushop):
745 745 """return True if we are pushing anything"""
746 746 return bool(pushop.outgoing.missing
747 747 or pushop.outdatedphases
748 748 or pushop.outobsmarkers
749 749 or pushop.outbookmarks)
750 750
@b2partsgenerator('check-bookmarks')
def _pushb2checkbookmarks(pushop, bundler):
    """insert bookmark move checking"""
    if not _pushing(pushop) or pushop.force:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    # only emit the part when there are moves to check and the remote
    # advertises the 'bookmarks' bundle2 capability
    if not pushop.outbookmarks or 'bookmarks' not in b2caps:
        return
    # record the expected current remote node for each moved bookmark
    data = [(book, bin(old)) for book, old, new in pushop.outbookmarks]
    checkdata = bookmod.binaryencode(data)
    bundler.newpart('check:bookmarks', data=checkdata)
766 766
@b2partsgenerator('check-phases')
def _pushb2checkphases(pushop, bundler):
    """insert phase move checking"""
    if not _pushing(pushop) or pushop.force:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    hasphaseheads = 'heads' in b2caps.get('phases', ())
    if pushop.remotephases is not None and hasphaseheads:
        # check that the remote phase has not changed
        # one node list per phase, indexed by phase number
        checks = [[] for p in phases.allphases]
        checks[phases.public].extend(pushop.remotephases.publicheads)
        checks[phases.draft].extend(pushop.remotephases.draftroots)
        if any(checks):
            for nodes in checks:
                # sort for a deterministic part payload
                nodes.sort()
            checkdata = phases.binaryencode(checks)
            bundler.newpart('check:phases', data=checkdata)
784 784
@b2partsgenerator('changeset')
def _pushb2ctx(pushop, bundler):
    """handle changegroup push through bundle2

    addchangegroup result is stored in the ``pushop.cgresult`` attribute.
    """
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    # Send known heads to the server for race detection.
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop)

    _pushb2ctxcheckheads(pushop, bundler)

    # negotiate the highest changegroup version both sides support
    b2caps = bundle2.bundle2caps(pushop.remote)
    version = '01'
    cgversions = b2caps.get('changegroup')
    if cgversions:  # 3.1 and 3.2 ship with an empty value
        cgversions = [v for v in cgversions
                      if v in changegroup.supportedoutgoingversions(
                          pushop.repo)]
        if not cgversions:
            raise ValueError(_('no common changegroup version'))
        version = max(cgversions)
    cgstream = changegroup.makestream(pushop.repo, pushop.outgoing, version,
                                      'push')
    cgpart = bundler.newpart('changegroup', data=cgstream)
    if cgversions:
        cgpart.addparam('version', version)
    if 'treemanifest' in pushop.repo.requirements:
        cgpart.addparam('treemanifest', '1')
    def handlereply(op):
        """extract addchangegroup returns from server reply"""
        cgreplies = op.records.getreplies(cgpart.id)
        assert len(cgreplies['changegroup']) == 1
        pushop.cgresult = cgreplies['changegroup'][0]['return']
    return handlereply
824 824
@b2partsgenerator('phase')
def _pushb2phases(pushop, bundler):
    """handle phase push through bundle2

    Prefers the binary 'phase-heads' part when the remote supports it (and
    the legacy exchange is not forced via config); otherwise falls back to
    pushkey parts.
    """
    if 'phases' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    ui = pushop.repo.ui

    # developer config: devel.legacy.exchange forces the pushkey path
    legacyphase = 'phases' in ui.configlist('devel', 'legacy.exchange')
    haspushkey = 'pushkey' in b2caps
    hasphaseheads = 'heads' in b2caps.get('phases', ())

    if hasphaseheads and not legacyphase:
        return _pushb2phaseheads(pushop, bundler)
    elif haspushkey:
        return _pushb2phasespushkey(pushop, bundler)
841 841
def _pushb2phaseheads(pushop, bundler):
    """push phase information through a bundle2 - binary part"""
    pushop.stepsdone.add('phases')
    if pushop.outdatedphases:
        # one list of heads per phase; only the first (public) list is
        # filled since we only push heads that must turn public
        updates = [[] for p in phases.allphases]
        updates[0].extend(h.node() for h in pushop.outdatedphases)
        phasedata = phases.binaryencode(updates)
        bundler.newpart('phase-heads', data=phasedata)
850 850
def _pushb2phasespushkey(pushop, bundler):
    """push phase information through a bundle2 - pushkey part"""
    pushop.stepsdone.add('phases')
    # (part id, node) pairs so replies/failures can be mapped back to heads
    part2node = []

    def handlefailure(pushop, exc):
        # translate a failed pushkey part back into the head it was updating
        targetid = int(exc.partid)
        for partid, node in part2node:
            if partid == targetid:
                raise error.Abort(_('updating %s to public failed') % node)

    enc = pushkey.encode
    for newremotehead in pushop.outdatedphases:
        # one pushkey part per head to move from draft to public
        part = bundler.newpart('pushkey')
        part.addparam('namespace', enc('phases'))
        part.addparam('key', enc(newremotehead.hex()))
        part.addparam('old', enc('%d' % phases.draft))
        part.addparam('new', enc('%d' % phases.public))
        part2node.append((part.id, newremotehead))
        pushop.pkfailcb[part.id] = handlefailure

    def handlereply(op):
        """check each pushkey reply and warn about ignored/failed updates"""
        for partid, node in part2node:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            msg = None
            if not results:
                msg = _('server ignored update of %s to public!\n') % node
            elif not int(results[0]['return']):
                msg = _('updating %s to public failed!\n') % node
            if msg is not None:
                pushop.ui.warn(msg)
    return handlereply
885 885
@b2partsgenerator('obsmarkers')
def _pushb2obsmarkers(pushop, bundler):
    """handle obsolescence marker push through bundle2"""
    if 'obsmarkers' in pushop.stepsdone:
        return
    remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
    if obsolete.commonversion(remoteversions) is None:
        # no obsmarker format in common with the remote: skip the step
        # without marking it done, so a fallback method may still run
        return
    pushop.stepsdone.add('obsmarkers')
    if pushop.outobsmarkers:
        markers = sorted(pushop.outobsmarkers)
        bundle2.buildobsmarkerspart(bundler, markers)
897 897
@b2partsgenerator('bookmarks')
def _pushb2bookmarks(pushop, bundler):
    """handle bookmark push through bundle2

    Prefers the binary 'bookmarks' part when the remote supports it (and
    legacy exchange is not forced via config), falling back to pushkey.
    """
    if 'bookmarks' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)

    legacy = pushop.repo.ui.configlist('devel', 'legacy.exchange')
    legacybooks = 'bookmarks' in legacy

    if not legacybooks and 'bookmarks' in b2caps:
        return _pushb2bookmarkspart(pushop, bundler)
    elif 'pushkey' in b2caps:
        return _pushb2bookmarkspushkey(pushop, bundler)
912 912
913 913 def _bmaction(old, new):
914 914 """small utility for bookmark pushing"""
915 915 if not old:
916 916 return 'export'
917 917 elif not new:
918 918 return 'delete'
919 919 return 'update'
920 920
def _pushb2bookmarkspart(pushop, bundler):
    """push bookmark information through a bundle2 - binary part"""
    pushop.stepsdone.add('bookmarks')
    if not pushop.outbookmarks:
        return

    allactions = []
    data = []
    for book, old, new in pushop.outbookmarks:
        # the part only carries the new location; 'old' is only used to
        # pick the status message shown on success
        new = bin(new)
        data.append((book, new))
        allactions.append((book, _bmaction(old, new)))
    checkdata = bookmod.binaryencode(data)
    bundler.newpart('bookmarks', data=checkdata)

    def handlereply(op):
        ui = pushop.ui
        # if success
        for book, action in allactions:
            ui.status(bookmsgmap[action][0] % book)

    return handlereply
942 942
def _pushb2bookmarkspushkey(pushop, bundler):
    """push bookmark information through a bundle2 - pushkey parts"""
    pushop.stepsdone.add('bookmarks')
    # (part id, bookmark, action) so replies/failures map back to bookmarks
    part2book = []
    enc = pushkey.encode

    def handlefailure(pushop, exc):
        # translate a failed pushkey part back into the bookmark it updated
        targetid = int(exc.partid)
        for partid, book, action in part2book:
            if partid == targetid:
                raise error.Abort(bookmsgmap[action][1].rstrip() % book)
        # we should not be called for a part we did not generate
        assert False

    for book, old, new in pushop.outbookmarks:
        # one pushkey part per bookmark change
        part = bundler.newpart('pushkey')
        part.addparam('namespace', enc('bookmarks'))
        part.addparam('key', enc(book))
        part.addparam('old', enc(old))
        part.addparam('new', enc(new))
        action = 'update'
        if not old:
            action = 'export'
        elif not new:
            action = 'delete'
        part2book.append((part.id, book, action))
        pushop.pkfailcb[part.id] = handlefailure

    def handlereply(op):
        """report the outcome of each bookmark pushkey part"""
        ui = pushop.ui
        for partid, book, action in part2book:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            if not results:
                pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
            else:
                ret = int(results[0]['return'])
                if ret:
                    ui.status(bookmsgmap[action][0] % book)
                else:
                    ui.warn(bookmsgmap[action][1] % book)
                    if pushop.bkresult is not None:
                        pushop.bkresult = 1
    return handlereply
987 987
@b2partsgenerator('pushvars', idx=0)
def _getbundlesendvars(pushop, bundler):
    '''send shellvars via bundle2'''
    pushvars = pushop.pushvars
    if pushvars:
        shellvars = {}
        for raw in pushvars:
            if '=' not in raw:
                msg = ("unable to parse variable '%s', should follow "
                       "'KEY=VALUE' or 'KEY=' format")
                raise error.Abort(msg % raw)
            # split on the first '=' only, so values may contain '='
            k, v = raw.split('=', 1)
            shellvars[k] = v

        part = bundler.newpart('pushvars')

        # mandatory=False: an older server simply ignores unknown params
        for key, value in shellvars.iteritems():
            part.addparam(key, value, mandatory=False)
1006 1006
def _pushbundle2(pushop):
    """push data to the remote using bundle2

    The only currently supported type of data is changegroup but this will
    evolve in the future."""
    bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
    pushback = (pushop.trmanager
                and pushop.ui.configbool('experimental', 'bundle2.pushback'))

    # create reply capability
    capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo,
                                                      allowpushback=pushback))
    bundler.newpart('replycaps', data=capsblob)
    replyhandlers = []
    # run every registered part generator in order; generators that need to
    # inspect the server reply return a handler callable
    for partgenname in b2partsgenorder:
        partgen = b2partsgenmapping[partgenname]
        ret = partgen(pushop, bundler)
        if callable(ret):
            replyhandlers.append(ret)
    # do not push if nothing to push (the 'replycaps' part added above is
    # always present, hence the <= 1 test)
    if bundler.nbparts <= 1:
        return
    stream = util.chunkbuffer(bundler.getchunks())
    try:
        try:
            reply = pushop.remote.unbundle(
                stream, ['force'], pushop.remote.url())
        except error.BundleValueError as exc:
            raise error.Abort(_('missing support for %s') % exc)
        try:
            trgetter = None
            if pushback:
                trgetter = pushop.trmanager.transaction
            op = bundle2.processbundle(pushop.repo, reply, trgetter)
        except error.BundleValueError as exc:
            raise error.Abort(_('missing support for %s') % exc)
        except bundle2.AbortFromPart as exc:
            pushop.ui.status(_('remote: %s\n') % exc)
            if exc.hint is not None:
                pushop.ui.status(_('remote: %s\n') % ('(%s)' % exc.hint))
            raise error.Abort(_('push failed on remote'))
    except error.PushkeyFailed as exc:
        # give the per-part failure callback (registered by part
        # generators) a chance to produce a more precise error
        partid = int(exc.partid)
        if partid not in pushop.pkfailcb:
            raise
        pushop.pkfailcb[partid](pushop, exc)
    for rephand in replyhandlers:
        rephand(op)
1055 1055
def _pushchangeset(pushop):
    """Make the actual push of changeset bundle to remote repo

    Legacy (non-bundle2) path: builds a version '01' changegroup and sends
    it through the peer's ``unbundle`` command.
    """
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    if not _pushcheckoutgoing(pushop):
        return

    # Should have verified this in push().
    assert pushop.remote.capable('unbundle')

    pushop.repo.prepushoutgoinghooks(pushop)
    outgoing = pushop.outgoing
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # create a changegroup from local
    if pushop.revs is None and not (outgoing.excluded
                                    or pushop.repo.changelog.filteredrevs):
        # push everything,
        # use the fast path, no race possible on push
        cg = changegroup.makechangegroup(pushop.repo, outgoing, '01', 'push',
                                         fastpath=True, bundlecaps=bundlecaps)
    else:
        cg = changegroup.makechangegroup(pushop.repo, outgoing, '01',
                                         'push', bundlecaps=bundlecaps)

    # apply changegroup to remote
    # local repo finds heads on server, finds out what
    # revs it must push. once revs transferred, if server
    # finds it has different heads (someone else won
    # commit/push race), server aborts.
    if pushop.force:
        remoteheads = ['force']
    else:
        remoteheads = pushop.remoteheads
    # ssh: return remote's addchangegroup()
    # http: return remote's addchangegroup() or 0 for error
    pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
                                             pushop.repo.url())
1095 1095
def _pushsyncphase(pushop):
    """synchronise phase information locally and remotely"""
    cheads = pushop.commonheads
    # even when we don't push, exchanging phase data is useful
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo')
        and remotephases    # server supports phases
        and pushop.cgresult is None # nothing was pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and remote support phase
        # - and no changeset was pushed
        # - and remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    if not remotephases: # old server or public only reply from non-publishing
        _localphasemove(pushop, cheads)
        # don't push any phase data as there is nothing to push
    else:
        ana = phases.analyzeremotephases(pushop.repo, cheads,
                                         remotephases)
        pheads, droots = ana
        ### Apply remote phase on local
        if remotephases.get('publishing', False):
            # remote publishes: everything common turns public locally
            _localphasemove(pushop, cheads)
        else: # publish = False
            _localphasemove(pushop, pheads)
            _localphasemove(pushop, cheads, phases.draft)
        ### Apply local phase on remote

        if pushop.cgresult:
            if 'phases' in pushop.stepsdone:
                # phases already pushed though bundle2
                return
            outdated = pushop.outdatedphases
        else:
            outdated = pushop.fallbackoutdatedphases

        pushop.stepsdone.add('phases')

        # filter heads already turned public by the push
        outdated = [c for c in outdated if c.node() not in pheads]
        # fallback to independent pushkey command
        for newremotehead in outdated:
            r = pushop.remote.pushkey('phases',
                                      newremotehead.hex(),
                                      str(phases.draft),
                                      str(phases.public))
            if not r:
                pushop.ui.warn(_('updating %s to public failed!\n')
                               % newremotehead)
1151 1151
def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo"""
    if pushop.trmanager:
        phases.advanceboundary(pushop.repo,
                               pushop.trmanager.transaction(),
                               phase,
                               nodes)
    else:
        # repo is not locked, do not change any phases!
        # Informs the user that phases should have been moved when
        # applicable.
        actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
        phasestr = phases.phasenames[phase]
        if actualmoves:
            pushop.ui.status(_('cannot lock source repo, skipping '
                               'local %s phase update\n') % phasestr)
1168 1168
def _pushobsolete(pushop):
    """utility function to push obsolete markers to a remote

    Legacy (pushkey-based) path; a no-op when the bundle2 'obsmarkers'
    part generator already handled the step.
    """
    if 'obsmarkers' in pushop.stepsdone:
        return
    repo = pushop.repo
    remote = pushop.remote
    pushop.stepsdone.add('obsmarkers')
    if pushop.outobsmarkers:
        pushop.ui.debug('try to push obsolete markers to remote\n')
        rslts = []
        remotedata = obsolete._pushkeyescape(sorted(pushop.outobsmarkers))
        for key in sorted(remotedata, reverse=True):
            # reverse sort to ensure we end with dump0
            data = remotedata[key]
            rslts.append(remote.pushkey('obsolete', key, '', data))
        if [r for r in rslts if not r]:
            msg = _('failed to push some obsolete markers!\n')
            repo.ui.warn(msg)
1187 1187
def _pushbookmark(pushop):
    """Update bookmark position on remote

    Legacy (pushkey-based) path; skipped when the changegroup push failed
    or the bundle2 'bookmarks' part generator already handled the step.
    """
    if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
        return
    pushop.stepsdone.add('bookmarks')
    ui = pushop.ui
    remote = pushop.remote

    for b, old, new in pushop.outbookmarks:
        action = 'update'
        if not old:
            action = 'export'
        elif not new:
            action = 'delete'
        if remote.pushkey('bookmarks', b, old, new):
            ui.status(bookmsgmap[action][0] % b)
        else:
            ui.warn(bookmsgmap[action][1] % b)
            # discovery can have set the value from an invalid entry
            if pushop.bkresult is not None:
                pushop.bkresult = 1
1209 1209
class pulloperation(object):
    """An object that represents a single pull operation

    Its purpose is to carry pull-related state and very common operations.

    A new one should be created at the beginning of each pull and discarded
    afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False, bookmarks=(),
                 remotebookmarks=None, streamclonerequested=None):
        # repo we pull into
        self.repo = repo
        # repo we pull from
        self.remote = remote
        # revision we try to pull (None is "all")
        self.heads = heads
        # bookmark pulled explicitly
        self.explicitbookmarks = [repo._bookmarks.expandname(bookmark)
                                  for bookmark in bookmarks]
        # do we force pull?
        self.force = force
        # whether a streaming clone was requested
        self.streamclonerequested = streamclonerequested
        # transaction manager
        self.trmanager = None
        # set of common changeset between local and remote before pull
        self.common = None
        # set of pulled head
        self.rheads = None
        # list of missing changeset to fetch remotely
        self.fetch = None
        # remote bookmarks data
        self.remotebookmarks = remotebookmarks
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # list of step already done
        self.stepsdone = set()
        # Whether we attempted a clone from pre-generated bundles.
        self.clonebundleattempted = False

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changeset target by the pull"""
        # compute target subset
        if self.heads is None:
            # We pulled every thing possible
            # sync on everything common
            c = set(self.common)
            ret = list(self.common)
            for n in self.rheads:
                if n not in c:
                    ret.append(n)
            return ret
        else:
            # We pulled a specific subset
            # sync on this subset
            return self.heads

    @util.propertycache
    def canusebundle2(self):
        # False when bundle1 is forced by config or capability
        return not _forcebundle1(self)

    @util.propertycache
    def remotebundle2caps(self):
        return bundle2.bundle2caps(self.remote)

    def gettransaction(self):
        # deprecated; talk to trmanager directly
        return self.trmanager.transaction()
1280 1280
class transactionmanager(util.transactional):
    """Manage the life cycle of a pull/unbundle transaction.

    The underlying transaction is created lazily on first access, and the
    appropriate hooks fire when it is closed."""
    def __init__(self, repo, source, url):
        self.repo = repo
        self.source = source
        self.url = url
        self._tr = None

    def transaction(self):
        """Return an open transaction object, constructing if necessary"""
        if not self._tr:
            trname = '%s\n%s' % (self.source, util.hidepassword(self.url))
            tr = self.repo.transaction(trname)
            tr.hookargs['source'] = self.source
            tr.hookargs['url'] = self.url
            self._tr = tr
        return self._tr

    def close(self):
        """close the transaction, if one was created"""
        tr = self._tr
        if tr is not None:
            tr.close()

    def release(self):
        """release the transaction, if one was created"""
        tr = self._tr
        if tr is not None:
            tr.release()
1310 1310
def pull(repo, remote, heads=None, force=False, bookmarks=(), opargs=None,
         streamclonerequested=None):
    """Fetch repository data from a remote.

    This is the main function used to retrieve data from a remote repository.

    ``repo`` is the local repository to clone into.
    ``remote`` is a peer instance.
    ``heads`` is an iterable of revisions we want to pull. ``None`` (the
    default) means to pull everything from the remote.
    ``bookmarks`` is an iterable of bookmarks requesting to be pulled. By
    default, all remote bookmarks are pulled.
    ``opargs`` are additional keyword arguments to pass to ``pulloperation``
    initialization.
    ``streamclonerequested`` is a boolean indicating whether a "streaming
    clone" is requested. A "streaming clone" is essentially a raw file copy
    of revlogs from the server. This only works when the local repository is
    empty. The default value of ``None`` means to respect the server
    configuration for preferring stream clones.

    Returns the ``pulloperation`` created for this pull.

    Raises ``error.Abort`` when the remote requires features the local
    repository does not support.
    """
    if opargs is None:
        opargs = {}
    pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks,
                           streamclonerequested=streamclonerequested,
                           **pycompat.strkwargs(opargs))

    peerlocal = pullop.remote.local()
    if peerlocal:
        # local peer: refuse to pull from a repo with unsupported requirements
        missing = set(peerlocal.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise error.Abort(msg)

    pullop.trmanager = transactionmanager(repo, 'pull', remote.url())
    with repo.wlock(), repo.lock(), pullop.trmanager:
        # This should ideally be in _pullbundle2(). However, it needs to run
        # before discovery to avoid extra work.
        _maybeapplyclonebundle(pullop)
        streamclone.maybeperformlegacystreamclone(pullop)
        _pulldiscovery(pullop)
        if pullop.canusebundle2:
            _pullbundle2(pullop)
        # the legacy steps below check pullop.stepsdone, so anything already
        # handled via bundle2 becomes a no-op here
        _pullchangeset(pullop)
        _pullphase(pullop)
        _pullbookmarks(pullop)
        _pullobsolete(pullop)

        # storing remotenames
        if repo.ui.configbool('experimental', 'remotenames'):
            logexchange.pullremotenames(repo, remote)

    return pullop
1367 1367
# ordered list of step names to perform discovery before pull
pulldiscoveryorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
pulldiscoverymapping = {}
1375 1375
def pulldiscovery(stepname):
    """Decorator registering a discovery function to run before pull.

    The decorated function is recorded in the step -> function mapping and
    its step name appended to the ordered step list; registration order is
    therefore execution order.

    Only use this decorator for a new step. To wrap a step defined by an
    extension, change the pulldiscovery dictionary directly.
    """
    def register(func):
        assert stepname not in pulldiscoverymapping
        pulldiscoveryorder.append(stepname)
        pulldiscoverymapping[stepname] = func
        return func
    return register
1391 1391
def _pulldiscovery(pullop):
    """Run every registered discovery step, in registration order"""
    for name in pulldiscoveryorder:
        pulldiscoverymapping[name](pullop)
1397 1397
@pulldiscovery('b1:bookmarks')
def _pullbookmarkbundle1(pullop):
    """fetch bookmark data in bundle1 case

    If not using bundle2, we have to fetch bookmarks before changeset
    discovery to reduce the chance and impact of race conditions."""
    if pullop.remotebookmarks is not None:
        # caller already provided the remote bookmark data
        return
    if pullop.canusebundle2 and 'listkeys' in pullop.remotebundle2caps:
        # all known bundle2 servers now support listkeys, but lets be nice with
        # new implementation.
        return
    books = pullop.remote.listkeys('bookmarks')
    pullop.remotebookmarks = bookmod.unhexlifybookmarks(books)
1412 1412
1413 1413
@pulldiscovery('changegroup')
def _pulldiscoverychangegroup(pullop):
    """discovery phase for the pull

    Currently handles changeset discovery only; will change to handle all
    discovery at some point.

    Stores the results in ``pullop.common``, ``pullop.fetch`` and
    ``pullop.rheads``.
    """
    tmp = discovery.findcommonincoming(pullop.repo,
                                       pullop.remote,
                                       heads=pullop.heads,
                                       force=pullop.force)
    common, fetch, rheads = tmp
    nm = pullop.repo.unfiltered().changelog.nodemap
    if fetch and rheads:
        # If a remote head is filtered locally, put it back in common.
        #
        # This is a hackish solution to catch most of the "common but locally
        # hidden" situation. We do not perform discovery on the unfiltered
        # repository because it ends up doing a pathological amount of round
        # trips for a huge amount of changesets we do not care about.
        #
        # If a set of such "common but filtered" changesets exists on the
        # server but is not included in the remote heads, we'll not be able to
        # detect it.
        scommon = set(common)
        for n in rheads:
            if n in nm:
                if n not in scommon:
                    common.append(n)
        if set(rheads).issubset(set(common)):
            # every remote head is already known: nothing to fetch
            fetch = []
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads
1446 1446
def _pullbundle2(pullop):
    """pull data using bundle2

    For now, the only supported data are changegroup.

    Negotiates the getbundle arguments from the remote's bundle2
    capabilities, fetches a single bundle and applies it, then records which
    pull steps (changegroup, phases, bookmarks, obsmarkers) were covered so
    the legacy fallbacks become no-ops.
    """
    kwargs = {'bundlecaps': caps20to10(pullop.repo)}

    # At the moment we don't do stream clones over bundle2. If that is
    # implemented then here's where the check for that will go.
    streaming = False

    # pulling changegroup
    pullop.stepsdone.add('changegroup')

    kwargs['common'] = pullop.common
    kwargs['heads'] = pullop.heads or pullop.rheads
    kwargs['cg'] = pullop.fetch

    ui = pullop.repo.ui
    legacyphase = 'phases' in ui.configlist('devel', 'legacy.exchange')
    hasbinaryphase = 'heads' in pullop.remotebundle2caps.get('phases', ())
    if (not legacyphase and hasbinaryphase):
        # binary phase-heads part available: request it instead of listkeys
        kwargs['phases'] = True
        pullop.stepsdone.add('phases')

    bookmarksrequested = False
    legacybookmark = 'bookmarks' in ui.configlist('devel', 'legacy.exchange')
    hasbinarybook = 'bookmarks' in pullop.remotebundle2caps

    if pullop.remotebookmarks is not None:
        # bookmark data already fetched (e.g. by the bundle1 discovery step)
        pullop.stepsdone.add('request-bookmarks')

    if ('request-bookmarks' not in pullop.stepsdone
        and pullop.remotebookmarks is None
        and not legacybookmark and hasbinarybook):
        kwargs['bookmarks'] = True
        bookmarksrequested = True

    if 'listkeys' in pullop.remotebundle2caps:
        if 'phases' not in pullop.stepsdone:
            kwargs['listkeys'] = ['phases']
        if 'request-bookmarks' not in pullop.stepsdone:
            # make sure to always includes bookmark data when migrating
            # `hg incoming --bundle` to using this function.
            pullop.stepsdone.add('request-bookmarks')
            kwargs.setdefault('listkeys', []).append('bookmarks')

    # If this is a full pull / clone and the server supports the clone bundles
    # feature, tell the server whether we attempted a clone bundle. The
    # presence of this flag indicates the client supports clone bundles. This
    # will enable the server to treat clients that support clone bundles
    # differently from those that don't.
    if (pullop.remote.capable('clonebundles')
        and pullop.heads is None and list(pullop.common) == [nullid]):
        kwargs['cbattempted'] = pullop.clonebundleattempted

    if streaming:
        pullop.repo.ui.status(_('streaming all changes\n'))
    elif not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
    else:
        if pullop.heads is None and list(pullop.common) == [nullid]:
            pullop.repo.ui.status(_("requesting all changes\n"))
    if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
        remoteversions = bundle2.obsmarkersversion(pullop.remotebundle2caps)
        if obsolete.commonversion(remoteversions) is not None:
            kwargs['obsmarkers'] = True
            pullop.stepsdone.add('obsmarkers')
    # let extensions tweak the getbundle arguments before the call
    _pullbundle2extraprepare(pullop, kwargs)
    bundle = pullop.remote.getbundle('pull', **pycompat.strkwargs(kwargs))
    try:
        op = bundle2.bundleoperation(pullop.repo, pullop.gettransaction)
        op.modes['bookmarks'] = 'records'
        bundle2.processbundle(pullop.repo, bundle, op=op)
    except bundle2.AbortFromPart as exc:
        pullop.repo.ui.status(_('remote: abort: %s\n') % exc)
        raise error.Abort(_('pull failed on remote'), hint=exc.hint)
    except error.BundleValueError as exc:
        raise error.Abort(_('missing support for %s') % exc)

    if pullop.fetch:
        pullop.cgresult = bundle2.combinechangegroupresults(op)

    # processing phases change
    for namespace, value in op.records['listkeys']:
        if namespace == 'phases':
            _pullapplyphases(pullop, value)

    # processing bookmark update
    if bookmarksrequested:
        books = {}
        for record in op.records['bookmarks']:
            books[record['bookmark']] = record["node"]
        pullop.remotebookmarks = books
    else:
        for namespace, value in op.records['listkeys']:
            if namespace == 'bookmarks':
                pullop.remotebookmarks = bookmod.unhexlifybookmarks(value)

    # bookmark data were either already there or pulled in the bundle
    if pullop.remotebookmarks is not None:
        _pullbookmarks(pullop)
1549 1549
1550 1550 def _pullbundle2extraprepare(pullop, kwargs):
1551 1551 """hook function so that extensions can extend the getbundle call"""
1552 1552
def _pullchangeset(pullop):
    """pull changeset from unbundle into the local repo

    Legacy path used when bundle2 did not (or could not) handle the
    changegroup step. Picks the best wire command the remote supports:
    getbundle, changegroup, or changegroupsubset.
    """
    # We delay the open of the transaction as late as possible so we
    # don't open transaction for nothing or you break future useful
    # rollback call
    if 'changegroup' in pullop.stepsdone:
        return
    pullop.stepsdone.add('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    tr = pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    if pullop.remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = pullop.remote.getbundle('pull', common=pullop.common,
                                     heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = pullop.remote.changegroup(pullop.fetch, 'pull')
    elif not pullop.remote.capable('changegroupsubset'):
        raise error.Abort(_("partial pull cannot be done because "
                            "other repository doesn't support "
                            "changegroupsubset."))
    else:
        cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    bundleop = bundle2.applybundle(pullop.repo, cg, tr, 'pull',
                                   pullop.remote.url())
    pullop.cgresult = bundle2.combinechangegroupresults(bundleop)
1587 1587
def _pullphase(pullop):
    """pull phase information via the legacy 'phases' listkeys namespace"""
    # Get remote phases data from remote
    if 'phases' in pullop.stepsdone:
        return
    remotephases = pullop.remote.listkeys('phases')
    _pullapplyphases(pullop, remotephases)
1594 1594
def _pullapplyphases(pullop, remotephases):
    """apply phase movement from observed remote state

    ``remotephases`` is the dictionary returned by the remote's 'phases'
    listkeys namespace; an empty dict is treated as an old or publishing
    server.
    """
    if 'phases' in pullop.stepsdone:
        return
    pullop.stepsdone.add('phases')
    publishing = bool(remotephases.get('publishing', False))
    if remotephases and not publishing:
        # remote is new and non-publishing
        pheads, _dr = phases.analyzeremotephases(pullop.repo,
                                                 pullop.pulledsubset,
                                                 remotephases)
        dheads = pullop.pulledsubset
    else:
        # Remote is old or publishing all common changesets
        # should be seen as public
        pheads = pullop.pulledsubset
        dheads = []
    unfi = pullop.repo.unfiltered()
    phase = unfi._phasecache.phase
    rev = unfi.changelog.nodemap.get
    public = phases.public
    draft = phases.draft

    # exclude changesets already public locally and update the others
    pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
    if pheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, public, pheads)

    # exclude changesets already draft locally and update the others
    dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
    if dheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, draft, dheads)
1629 1629
def _pullbookmarks(pullop):
    """update the local bookmarks from the remote bookmark information"""
    if 'bookmarks' in pullop.stepsdone:
        return
    pullop.stepsdone.add('bookmarks')
    localrepo = pullop.repo
    bookmod.updatefromremote(localrepo.ui,
                             localrepo,
                             pullop.remotebookmarks,
                             pullop.remote.url(),
                             pullop.gettransaction,
                             explicit=pullop.explicitbookmarks)
1641 1641
def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    The `gettransaction` is function that return the pull transaction, creating
    one if necessary. We return the transaction to inform the calling code that
    a new transaction have been created (when applicable).

    Exists mostly to allow overriding for experimentation purpose"""
    if 'obsmarkers' in pullop.stepsdone:
        return
    pullop.stepsdone.add('obsmarkers')
    tr = None
    if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
        pullop.repo.ui.debug('fetching remote obsolete markers\n')
        remoteobs = pullop.remote.listkeys('obsolete')
        # 'dump0' is the first chunk of marker data; its presence means the
        # remote actually has markers to send, so only then open a transaction
        if 'dump0' in remoteobs:
            tr = pullop.gettransaction()
            markers = []
            # chunks are named dump0, dump1, ...; decode and concatenate them
            for key in sorted(remoteobs, reverse=True):
                if key.startswith('dump'):
                    data = util.b85decode(remoteobs[key])
                    version, newmarks = obsolete._readmarkers(data)
                    markers += newmarks
            if markers:
                pullop.repo.obsstore.add(tr, markers)
        pullop.repo.invalidatevolatilesets()
    return tr
1669 1669
def caps20to10(repo):
    """return a set with appropriate options to use bundle20 during getbundle"""
    capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
    return {'HG20', 'bundle2=' + urlreq.quote(capsblob)}
1676 1676
# List of names of steps to perform for a bundle2 for getbundle, order matters.
# Populated by the @getbundle2partsgenerator decorator below.
getbundle2partsorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
getbundle2partsmapping = {}
1684 1684
def getbundle2partsgenerator(stepname, idx=None):
    """decorator for function generating bundle2 part for getbundle

    The decorated function is recorded in the step -> function mapping and the
    step name is added to the ordered list of steps. Beware that decorated
    functions will be added in order (this may matter). When ``idx`` is given
    the step is inserted at that position instead of appended.

    You can only use this decorator for new steps, if you want to wrap a step
    from an extension, attack the getbundle2partsmapping dictionary directly."""
    def register(func):
        assert stepname not in getbundle2partsmapping
        getbundle2partsmapping[stepname] = func
        if idx is not None:
            getbundle2partsorder.insert(idx, stepname)
        else:
            getbundle2partsorder.append(stepname)
        return func
    return register
1703 1703
def bundle2requested(bundlecaps):
    """tell whether the client advertised any bundle2 ('HG2x') capability"""
    if bundlecaps is None:
        return False
    return any(cap.startswith('HG2') for cap in bundlecaps)
1708 1708
def getbundlechunks(repo, source, heads=None, common=None, bundlecaps=None,
                    **kwargs):
    """Return chunks constituting a bundle's raw data.

    Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
    passed.

    Returns an iterator over raw chunks (of varying sizes).
    """
    # normalize kwargs keys to bytes (python 3 callers pass str keys)
    kwargs = pycompat.byteskwargs(kwargs)
    usebundle2 = bundle2requested(bundlecaps)
    # bundle10 case
    if not usebundle2:
        if bundlecaps and not kwargs.get('cg', True):
            raise ValueError(_('request for bundle10 must include changegroup'))

        # bundle10 cannot carry any extra arguments
        if kwargs:
            raise ValueError(_('unsupported getbundle arguments: %s')
                             % ', '.join(sorted(kwargs.keys())))
        outgoing = _computeoutgoing(repo, heads, common)
        return changegroup.makestream(repo, outgoing, '01', source,
                                      bundlecaps=bundlecaps)

    # bundle20 case
    b2caps = {}
    # decode the client's bundle2 capabilities advertised via bundlecaps
    for bcaps in bundlecaps:
        if bcaps.startswith('bundle2='):
            blob = urlreq.unquote(bcaps[len('bundle2='):])
            b2caps.update(bundle2.decodecaps(blob))
    bundler = bundle2.bundle20(repo.ui, b2caps)

    kwargs['heads'] = heads
    kwargs['common'] = common

    # run each registered part generator, in registration order
    for name in getbundle2partsorder:
        func = getbundle2partsmapping[name]
        func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
             **pycompat.strkwargs(kwargs))

    return bundler.getchunks()
1749 1749
@getbundle2partsgenerator('stream')
def _getbundlestream(bundler, repo, source, bundlecaps=None,
                     b2caps=None, heads=None, common=None, **kwargs):
    """add a 'stream' part carrying a raw streaming clone, when requested"""
    # only emit the part when the client explicitly asked for it
    if not kwargs.get('stream', False):
        return
    filecount, bytecount, datastream = streamclone.generatev2(repo)
    part = bundler.newpart('stream', data=datastream)
    # keep parameter order stable: it is part of the emitted byte stream
    part.addparam('bytecount', '%d' % bytecount, mandatory=True)
    part.addparam('filecount', '%d' % filecount, mandatory=True)
    part.addparam('requirements', ' '.join(repo.requirements), mandatory=True)
    part.addparam('version', 'v2', mandatory=True)
1762
@getbundle2partsgenerator('changegroup')
def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
                              b2caps=None, heads=None, common=None, **kwargs):
    """add a changegroup part to the requested bundle"""
    cgstream = None
    # 'cg' defaults to True: changegroups are sent unless explicitly disabled
    if kwargs.get(r'cg', True):
        # build changegroup bundle here.
        version = '01'
        cgversions = b2caps.get('changegroup')
        if cgversions:  # 3.1 and 3.2 ship with an empty value
            # keep only versions both sides support, pick the highest
            cgversions = [v for v in cgversions
                          if v in changegroup.supportedoutgoingversions(repo)]
            if not cgversions:
                raise ValueError(_('no common changegroup version'))
            version = max(cgversions)
        outgoing = _computeoutgoing(repo, heads, common)
        if outgoing.missing:
            cgstream = changegroup.makestream(repo, outgoing, version, source,
                                              bundlecaps=bundlecaps)

    if cgstream:
        part = bundler.newpart('changegroup', data=cgstream)
        if cgversions:
            part.addparam('version', version)
        # advisory changeset count, used for progress display on the client
        part.addparam('nbchanges', '%d' % len(outgoing.missing),
                      mandatory=False)
        if 'treemanifest' in repo.requirements:
            part.addparam('treemanifest', '1')
1778 1791
@getbundle2partsgenerator('bookmarks')
def _getbundlebookmarkpart(bundler, repo, source, bundlecaps=None,
                           b2caps=None, **kwargs):
    """add a bookmark part to the requested bundle"""
    if not kwargs.get(r'bookmarks', False):
        return
    if 'bookmarks' not in b2caps:
        raise ValueError(_('no common bookmarks exchange method'))
    # binary-encode all bookmarks; an empty encoding means no part is added
    data = bookmod.binaryencode(bookmod.listbinbookmarks(repo))
    if data:
        bundler.newpart('bookmarks', data=data)
1791 1804
@getbundle2partsgenerator('listkeys')
def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
                            b2caps=None, **kwargs):
    """add parts containing listkeys namespaces to the requested bundle"""
    # one 'listkeys' part per requested namespace
    for namespace in kwargs.get(r'listkeys', ()):
        part = bundler.newpart('listkeys')
        part.addparam('namespace', namespace)
        part.data = pushkey.encodekeys(repo.listkeys(namespace).items())
1802 1815
@getbundle2partsgenerator('obsmarkers')
def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
                            b2caps=None, heads=None, **kwargs):
    """add an obsolescence markers part to the requested bundle"""
    if not kwargs.get(r'obsmarkers', False):
        return
    if heads is None:
        heads = repo.heads()
    # markers relevant to anything reachable from the exchanged heads
    subset = [c.node() for c in repo.set('::%ln', heads)]
    bundle2.buildobsmarkerspart(
        bundler, sorted(repo.obsstore.relevantmarkers(subset)))
1814 1827
@getbundle2partsgenerator('phases')
def _getbundlephasespart(bundler, repo, source, bundlecaps=None,
                         b2caps=None, heads=None, **kwargs):
    """add phase heads part to the requested bundle"""
    if kwargs.get(r'phases', False):
        if not 'heads' in b2caps.get('phases'):
            raise ValueError(_('no common phases exchange method'))
        if heads is None:
            heads = repo.heads()

        headsbyphase = collections.defaultdict(set)
        if repo.publishing():
            # publishing repo: everything exchanged is public
            headsbyphase[phases.public] = heads
        else:
            # find the appropriate heads to move

            phase = repo._phasecache.phase
            node = repo.changelog.node
            rev = repo.changelog.rev
            for h in heads:
                headsbyphase[phase(repo, rev(h))].add(h)
            seenphases = list(headsbyphase.keys())

            # We do not handle anything but public and draft phase for now)
            if seenphases:
                assert max(seenphases) <= phases.draft

            # if client is pulling non-public changesets, we need to find
            # intermediate public heads.
            draftheads = headsbyphase.get(phases.draft, set())
            if draftheads:
                publicheads = headsbyphase.get(phases.public, set())

                revset = 'heads(only(%ln, %ln) and public())'
                extraheads = repo.revs(revset, draftheads, publicheads)
                for r in extraheads:
                    headsbyphase[phases.public].add(node(r))

        # transform data in a format used by the encoding function
        phasemapping = []
        for phase in phases.allphases:
            phasemapping.append(sorted(headsbyphase[phase]))

        # generate the actual part
        phasedata = phases.binaryencode(phasemapping)
        bundler.newpart('phase-heads', data=phasedata)
1861 1874
@getbundle2partsgenerator('hgtagsfnodes')
def _getbundletagsfnodes(bundler, repo, source, bundlecaps=None,
                         b2caps=None, heads=None, common=None,
                         **kwargs):
    """Transfer the .hgtags filenodes mapping.

    Only values for heads in this bundle will be transferred.

    The part data consists of pairs of 20 byte changeset node and .hgtags
    filenodes raw values.
    """
    # Don't send unless changesets are being exchanged...
    if not kwargs.get(r'cg', True):
        return
    # ...and the client supports it.
    if 'hgtagsfnodes' not in b2caps:
        return

    outgoing = _computeoutgoing(repo, heads, common)
    bundle2.addparttagsfnodescache(repo, bundler, outgoing)
1881 1894
def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling.
    """
    currentheads = repo.heads()
    headshash = hashlib.sha1(''.join(sorted(currentheads))).digest()
    # accepted: explicit force, the exact current heads, or their hash
    if their_heads == ['force']:
        return
    if their_heads == currentheads:
        return
    if their_heads == ['hashed', headshash]:
        return
    # someone else committed/pushed/unbundled while we
    # were transferring data
    raise error.PushRaced('repository changed while %s - '
                          'please try again' % context)
1895 1908
def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    this function makes sure the repo is locked during the application and have
    mechanism to check that no push race occurred between the creation of the
    bundle and its application.

    If the push was raced as PushRaced exception is raised."""
    r = 0
    # need a transaction when processing a bundle2 stream
    # [wlock, lock, tr] - needs to be an array so nested functions can modify it
    lockandtr = [None, None, None]
    recordout = None
    # quick fix for output mismatch with bundle2 in 3.4
    captureoutput = repo.ui.configbool('experimental', 'bundle2-output-capture')
    if url.startswith('remote:http:') or url.startswith('remote:https:'):
        captureoutput = True
    try:
        # note: outside bundle1, 'heads' is expected to be empty and this
        # 'check_heads' call wil be a no-op
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        if not isinstance(cg, bundle2.unbundle20):
            # legacy case: bundle1 (changegroup 01)
            txnname = "\n".join([source, util.hidepassword(url)])
            with repo.lock(), repo.transaction(txnname) as tr:
                op = bundle2.applybundle(repo, cg, tr, source, url)
                r = bundle2.combinechangegroupresults(op)
        else:
            r = None
            try:
                def gettransaction():
                    # lazily take both locks and open the transaction on
                    # first use; cached in lockandtr for reuse and cleanup
                    if not lockandtr[2]:
                        lockandtr[0] = repo.wlock()
                        lockandtr[1] = repo.lock()
                        lockandtr[2] = repo.transaction(source)
                        lockandtr[2].hookargs['source'] = source
                        lockandtr[2].hookargs['url'] = url
                        lockandtr[2].hookargs['bundle2'] = '1'
                    return lockandtr[2]

                # Do greedy locking by default until we're satisfied with lazy
                # locking.
                if not repo.ui.configbool('experimental', 'bundle2lazylocking'):
                    gettransaction()

                op = bundle2.bundleoperation(repo, gettransaction,
                                             captureoutput=captureoutput)
                try:
                    op = bundle2.processbundle(repo, cg, op=op)
                finally:
                    r = op.reply
                    if captureoutput and r is not None:
                        repo.ui.pushbuffer(error=True, subproc=True)
                        def recordout(output):
                            r.newpart('output', data=output, mandatory=False)
                if lockandtr[2] is not None:
                    lockandtr[2].close()
            except BaseException as exc:
                # flag the exception so upper layers know it happened while
                # processing a bundle2 payload
                exc.duringunbundle2 = True
                if captureoutput and r is not None:
                    parts = exc._bundle2salvagedoutput = r.salvageoutput()
                    def recordout(output):
                        part = bundle2.bundlepart('output', data=output,
                                                  mandatory=False)
                        parts.append(part)
                raise
    finally:
        # release in reverse acquisition order; None entries are ignored
        lockmod.release(lockandtr[2], lockandtr[1], lockandtr[0])
        if recordout is not None:
            recordout(repo.ui.popbuffer())
    return r
1968 1981
def _maybeapplyclonebundle(pullop):
    """Apply a clone bundle from a remote, if possible."""

    repo = pullop.repo
    remote = pullop.remote

    if not repo.ui.configbool('ui', 'clonebundles'):
        return

    # Only run if local repo is empty.
    if len(repo):
        return

    # a partial pull (explicit heads) is not a clone; skip
    if pullop.heads:
        return

    if not remote.capable('clonebundles'):
        return

    res = remote._call('clonebundles')

    # If we call the wire protocol command, that's good enough to record the
    # attempt.
    pullop.clonebundleattempted = True

    entries = parseclonebundlesmanifest(repo, res)
    if not entries:
        repo.ui.note(_('no clone bundles available on remote; '
                       'falling back to regular clone\n'))
        return

    entries = filterclonebundleentries(
        repo, entries, streamclonerequested=pullop.streamclonerequested)

    if not entries:
        # There is a thundering herd concern here. However, if a server
        # operator doesn't advertise bundles appropriate for its clients,
        # they deserve what's coming. Furthermore, from a client's
        # perspective, no automatic fallback would mean not being able to
        # clone!
        repo.ui.warn(_('no compatible clone bundles available on server; '
                       'falling back to regular clone\n'))
        repo.ui.warn(_('(you may want to report this to the server '
                       'operator)\n'))
        return

    entries = sortclonebundleentries(repo.ui, entries)

    url = entries[0]['URL']
    repo.ui.status(_('applying clone bundle from %s\n') % url)
    if trypullbundlefromurl(repo.ui, repo, url):
        repo.ui.status(_('finished applying clone bundle\n'))
    # Bundle failed.
    #
    # We abort by default to avoid the thundering herd of
    # clients flooding a server that was expecting expensive
    # clone load to be offloaded.
    elif repo.ui.configbool('ui', 'clonebundlefallback'):
        repo.ui.warn(_('falling back to normal clone\n'))
    else:
        raise error.Abort(_('error applying bundle'),
                          hint=_('if this error persists, consider contacting '
                                 'the server operator or disable clone '
                                 'bundles via '
                                 '"--config ui.clonebundles=false"'))
2034 2047
def parseclonebundlesmanifest(repo, s):
    """Parses the raw text of a clone bundles manifest.

    Returns a list of dicts. The dicts have a ``URL`` key corresponding
    to the URL and other keys are the attributes for the entry.
    """
    m = []
    for line in s.splitlines():
        # manifest line: URL followed by whitespace-separated KEY=VALUE pairs
        fields = line.split()
        if not fields:
            continue
        attrs = {'URL': fields[0]}
        for rawattr in fields[1:]:
            key, value = rawattr.split('=', 1)
            key = urlreq.unquote(key)
            value = urlreq.unquote(value)
            attrs[key] = value

            # Parse BUNDLESPEC into components. This makes client-side
            # preferences easier to specify since you can prefer a single
            # component of the BUNDLESPEC.
            if key == 'BUNDLESPEC':
                try:
                    comp, version, params = parsebundlespec(repo, value,
                                                            externalnames=True)
                    attrs['COMPRESSION'] = comp
                    attrs['VERSION'] = version
                # malformed/unknown specs are tolerated: the raw BUNDLESPEC
                # value is kept, only the derived attributes are skipped
                except error.InvalidBundleSpecification:
                    pass
                except error.UnsupportedBundleSpecification:
                    pass

        m.append(attrs)

    return m
2070 2083
def filterclonebundleentries(repo, entries, streamclonerequested=False):
    """Remove incompatible clone bundle manifest entries.

    Accepts a list of entries parsed with ``parseclonebundlesmanifest``
    and returns a new list consisting of only the entries that this client
    should be able to apply.

    There is no guarantee we'll be able to apply all returned entries because
    the metadata we use to filter on may be missing or wrong.
    """
    newentries = []
    for entry in entries:
        spec = entry.get('BUNDLESPEC')
        if spec:
            try:
                comp, version, params = parsebundlespec(repo, spec, strict=True)

                # If a stream clone was requested, filter out non-streamclone
                # entries.
                if streamclonerequested and (comp != 'UN' or version != 's1'):
                    repo.ui.debug('filtering %s because not a stream clone\n' %
                                  entry['URL'])
                    continue

            except error.InvalidBundleSpecification as e:
                repo.ui.debug(str(e) + '\n')
                continue
            except error.UnsupportedBundleSpecification as e:
                repo.ui.debug('filtering %s because unsupported bundle '
                              'spec: %s\n' % (entry['URL'], str(e)))
                continue
        # If we don't have a spec and requested a stream clone, we don't know
        # what the entry is so don't attempt to apply it.
        elif streamclonerequested:
            repo.ui.debug('filtering %s because cannot determine if a stream '
                          'clone bundle\n' % entry['URL'])
            continue

        # entries requiring TLS SNI are unusable when our ssl lacks SNI
        if 'REQUIRESNI' in entry and not sslutil.hassni:
            repo.ui.debug('filtering %s because SNI not supported\n' %
                          entry['URL'])
            continue

        newentries.append(entry)

    return newentries
2117 2130
class clonebundleentry(object):
    """Represents an item in a clone bundles manifest.

    This rich class is needed to support sorting since sorted() in Python 3
    doesn't support ``cmp`` and our comparison is complex enough that ``key=``
    won't work.
    """

    def __init__(self, value, prefers):
        self.value = value
        self.prefers = prefers

    def _cmp(self, other):
        # Walk the preference list; the first attribute that discriminates
        # the two entries decides the ordering.
        for attr, wanted in self.prefers:
            ours = self.value.get(attr)
            theirs = other.value.get(attr)

            # Special case: other side lacks the attribute and we match the
            # preferred value exactly -> we sort first.
            if ours is not None and theirs is None and ours == wanted:
                return -1

            # Mirror case: we lack the attribute and the other side matches
            # the preferred value exactly -> it sorts first.
            if theirs is not None and ours is None and theirs == wanted:
                return 1

            # Can't compare unless the attribute is present on both.
            if ours is None or theirs is None:
                continue

            # Identical values don't discriminate; try the next preference.
            if ours == theirs:
                continue

            # Exact matches of the preferred value come first.
            if ours == wanted:
                return -1
            if theirs == wanted:
                return 1

            # Neither side matches the preference; try the next attribute.
            continue

        # No attribute/preference discriminated the entries. Fall back to
        # original (index) order.
        return 0

    def __lt__(self, other):
        return self._cmp(other) < 0

    def __gt__(self, other):
        return self._cmp(other) > 0

    def __eq__(self, other):
        return self._cmp(other) == 0

    def __le__(self, other):
        return self._cmp(other) <= 0

    def __ge__(self, other):
        return self._cmp(other) >= 0

    def __ne__(self, other):
        return self._cmp(other) != 0
2181 2194
def sortclonebundleentries(ui, entries):
    """sort clone bundle manifest entries by the user's preferences"""
    prefers = ui.configlist('ui', 'clonebundleprefers')
    # no preferences configured: keep manifest order
    if not prefers:
        return list(entries)

    preferpairs = [p.split('=', 1) for p in prefers]
    ranked = sorted(clonebundleentry(e, preferpairs) for e in entries)
    return [entry.value for entry in ranked]
2191 2204
def trypullbundlefromurl(ui, repo, url):
    """Attempt to apply a bundle from a URL.

    Returns True on success, False on any HTTP/URL error (after warning).
    """
    with repo.lock(), repo.transaction('bundleurl') as tr:
        try:
            fh = urlmod.open(ui, url)
            cg = readbundle(ui, fh, 'stream')

            # stream clone bundles bypass bundle2 application entirely
            if isinstance(cg, streamclone.streamcloneapplier):
                cg.apply(repo)
            else:
                bundle2.applybundle(repo, cg, tr, 'clonebundles', url)
            return True
        except urlerr.httperror as e:
            ui.warn(_('HTTP error fetching bundle: %s\n') % str(e))
        except urlerr.urlerror as e:
            ui.warn(_('error fetching bundle: %s\n') % e.reason)

        return False
@@ -1,1068 +1,1070 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import os
12 12 import tempfile
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 discovery,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 pycompat,
31 31 repository,
32 32 streamclone,
33 33 util,
34 34 )
35 35
36 36 urlerr = util.urlerr
37 37 urlreq = util.urlreq
38 38
39 39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 41 'IncompatibleClient')
42 42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43 43
class abstractserverproto(object):
    """abstract class that summarizes the protocol API

    Used as reference and documentation.
    """

    def getargs(self, args):
        """return the value for arguments in <args>

        returns a list of values (same order as <args>)"""
        raise NotImplementedError()

    def getfile(self, fp):
        """write the whole content of a file into a file like object

        The file is in the form::

            (<chunk-size>\n<chunk>)+0\n

        chunk size is the ascii version of the int.
        """
        raise NotImplementedError()

    def redirect(self):
        """may setup interception for stdout and stderr

        See also the `restore` method."""
        raise NotImplementedError()

    # If the `redirect` function does install interception, the `restore`
    # function MUST be defined. If interception is not used, this function
    # MUST NOT be defined.
    #
    # left commented here on purpose
    #
    #def restore(self):
    #    """reinstall previous stdout and stderr and return intercepted stdout
    #    """
    #    raise NotImplementedError()
83 83
class remoteiterbatcher(peer.iterbatcher):
    """batch several wire protocol calls into pipelined requests

    Collected calls are sent together via the peer's ``_submitbatch`` and
    results are yielded in submission order.
    """
    def __init__(self, remote):
        super(remoteiterbatcher, self).__init__()
        self._remote = remote

    def __getattr__(self, name):
        # Validate this method is batchable, since submit() only supports
        # batchable methods.
        fn = getattr(self._remote, name)
        if not getattr(fn, 'batchable', None):
            raise error.ProgrammingError('Attempted to batch a non-batchable '
                                         'call to %r' % name)

        return super(remoteiterbatcher, self).__getattr__(name)

    def submit(self):
        """Break the batch request into many patch calls and pipeline them.

        This is mostly valuable over http where request sizes can be
        limited, but can be used in other places as well.
        """
        # 2-tuple of (command, arguments) that represents what will be
        # sent over the wire.
        requests = []

        # 4-tuple of (command, final future, @batchable generator, remote
        # future).
        results = []

        for command, args, opts, finalfuture in self.calls:
            mtd = getattr(self._remote, command)
            # prime the @batchable generator: first yield produces the wire
            # arguments and the future the reply will be stored into
            batchable = mtd.batchable(mtd.__self__, *args, **opts)

            commandargs, fremote = next(batchable)
            assert fremote
            requests.append((command, commandargs))
            results.append((command, finalfuture, batchable, fremote))

        if requests:
            self._resultiter = self._remote._submitbatch(requests)

        self._results = results

    def results(self):
        for command, finalfuture, batchable, remotefuture in self._results:
            # Get the raw result, set it in the remote future, feed it
            # back into the @batchable generator so it can be decoded, and
            # set the result on the final future to this value.
            remoteresult = next(self._resultiter)
            remotefuture.set(remoteresult)
            finalfuture.set(next(batchable))

            # Verify our @batchable generators only emit 2 values.
            try:
                next(batchable)
            except StopIteration:
                pass
            else:
                raise error.ProgrammingError('%s @batchable generator emitted '
                                             'unexpected value count' % command)

            yield finalfuture.value
146 146
# Forward a couple of names from peer to make wireproto interactions
# slightly more sensible.
batchable = peer.batchable
future = peer.future
151 151
152 152 # list of nodes encoding / decoding
153 153
def decodelist(l, sep=' '):
    """decode a ``sep``-separated list of hex nodes into binary nodes"""
    if not l:
        return []
    return [bin(v) for v in l.split(sep)]
158 158
def encodelist(l, sep=' '):
    """encode a list of binary nodes as hex strings joined by ``sep``

    A TypeError from ``hex`` (e.g. a non-node value in ``l``) propagates
    unchanged to the caller.
    """
    # The previous ``try: ... except TypeError: raise`` wrapper was a no-op
    # (it re-raised the exception unchanged), so it has been removed.
    return sep.join(map(hex, l))
164 164
165 165 # batched call argument encoding
166 166
def escapearg(plain):
    """escape the characters that are special in batch command encoding"""
    # ':' must be escaped first so escape markers introduced by the later
    # replacements are not themselves re-escaped.
    for char, escape in ((':', ':c'), (',', ':o'), (';', ':s'), ('=', ':e')):
        plain = plain.replace(char, escape)
    return plain
173 173
def unescapearg(escaped):
    """reverse the transformation performed by escapearg"""
    # ':c' must be restored last so freshly restored ':' characters cannot
    # combine with following text into bogus escape sequences.
    for escape, char in ((':e', '='), (':s', ';'), (':o', ','), (':c', ':')):
        escaped = escaped.replace(escape, char)
    return escaped
180 180
def encodebatchcmds(req):
    """Return a ``cmds`` argument value for the ``batch`` command."""
    def encodeone(op, argsdict):
        # Old servers didn't properly unescape argument names. So prevent
        # the sending of argument names that may not be decoded properly by
        # servers.
        assert all(escapearg(k) == k for k in argsdict)

        args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
                        for k, v in argsdict.iteritems())
        return '%s %s' % (op, args)

    return ';'.join(encodeone(op, argsdict) for op, argsdict in req)
195 195
# mapping of options accepted by getbundle and their types
#
# Meant to be extended by extensions. It is extensions responsibility to ensure
# such options are properly processed in exchange.getbundle.
#
# supported types are:
#
# :nodes: list of binary nodes
# :csv: list of comma-separated values
# :scsv: list of comma-separated values return as set
# :plain: string with no transformation needed.
# :boolean: a boolean value
gboptsmap = {'heads': 'nodes',
             'bookmarks': 'boolean',
             'common': 'nodes',
             'obsmarkers': 'boolean',
             'phases': 'boolean',
             'bundlecaps': 'scsv',
             'listkeys': 'csv',
             'cg': 'boolean',
             'cbattempted': 'boolean',
             'stream': 'boolean',
}
216 218
217 219 # client side
218 220
219 221 class wirepeer(repository.legacypeer):
220 222 """Client-side interface for communicating with a peer repository.
221 223
222 224 Methods commonly call wire protocol commands of the same name.
223 225
224 226 See also httppeer.py and sshpeer.py for protocol-specific
225 227 implementations of this interface.
226 228 """
227 229 # Begin of basewirepeer interface.
228 230
    def iterbatch(self):
        # Batch several commands so they travel in pipelined requests.
        return remoteiterbatcher(self)
231 233
    @batchable
    def lookup(self, key):
        """look up a remote revision by key; yields its binary node"""
        self.requirecap('lookup', _('look up remote revision'))
        f = future()
        # first yield: wire arguments plus the future the reply fills in
        yield {'key': encoding.fromlocal(key)}, f
        d = f.value
        # reply format: "<success> <data>\n"
        success, data = d[:-1].split(" ", 1)
        if int(success):
            yield bin(data)
        else:
            self._abort(error.RepoError(data))
243 245
244 246 @batchable
245 247 def heads(self):
246 248 f = future()
247 249 yield {}, f
248 250 d = f.value
249 251 try:
250 252 yield decodelist(d[:-1])
251 253 except ValueError:
252 254 self._abort(error.ResponseError(_("unexpected response:"), d))
253 255
254 256 @batchable
255 257 def known(self, nodes):
256 258 f = future()
257 259 yield {'nodes': encodelist(nodes)}, f
258 260 d = f.value
259 261 try:
260 262 yield [bool(int(b)) for b in d]
261 263 except ValueError:
262 264 self._abort(error.ResponseError(_("unexpected response:"), d))
263 265
264 266 @batchable
265 267 def branchmap(self):
266 268 f = future()
267 269 yield {}, f
268 270 d = f.value
269 271 try:
270 272 branchmap = {}
271 273 for branchpart in d.splitlines():
272 274 branchname, branchheads = branchpart.split(' ', 1)
273 275 branchname = encoding.tolocal(urlreq.unquote(branchname))
274 276 branchheads = decodelist(branchheads)
275 277 branchmap[branchname] = branchheads
276 278 yield branchmap
277 279 except TypeError:
278 280 self._abort(error.ResponseError(_("unexpected response:"), d))
279 281
280 282 @batchable
281 283 def listkeys(self, namespace):
282 284 if not self.capable('pushkey'):
283 285 yield {}, None
284 286 f = future()
285 287 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
286 288 yield {'namespace': encoding.fromlocal(namespace)}, f
287 289 d = f.value
288 290 self.ui.debug('received listkey for "%s": %i bytes\n'
289 291 % (namespace, len(d)))
290 292 yield pushkeymod.decodekeys(d)
291 293
292 294 @batchable
293 295 def pushkey(self, namespace, key, old, new):
294 296 if not self.capable('pushkey'):
295 297 yield False, None
296 298 f = future()
297 299 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
298 300 yield {'namespace': encoding.fromlocal(namespace),
299 301 'key': encoding.fromlocal(key),
300 302 'old': encoding.fromlocal(old),
301 303 'new': encoding.fromlocal(new)}, f
302 304 d = f.value
303 305 d, output = d.split('\n', 1)
304 306 try:
305 307 d = bool(int(d))
306 308 except ValueError:
307 309 raise error.ResponseError(
308 310 _('push failed (unexpected response):'), d)
309 311 for l in output.splitlines(True):
310 312 self.ui.status(_('remote: '), l)
311 313 yield d
312 314
313 315 def stream_out(self):
314 316 return self._callstream('stream_out')
315 317
316 318 def getbundle(self, source, **kwargs):
317 319 kwargs = pycompat.byteskwargs(kwargs)
318 320 self.requirecap('getbundle', _('look up remote changes'))
319 321 opts = {}
320 322 bundlecaps = kwargs.get('bundlecaps')
321 323 if bundlecaps is not None:
322 324 kwargs['bundlecaps'] = sorted(bundlecaps)
323 325 else:
324 326 bundlecaps = () # kwargs could have it to None
325 327 for key, value in kwargs.iteritems():
326 328 if value is None:
327 329 continue
328 330 keytype = gboptsmap.get(key)
329 331 if keytype is None:
330 332 raise error.ProgrammingError(
331 333 'Unexpectedly None keytype for key %s' % key)
332 334 elif keytype == 'nodes':
333 335 value = encodelist(value)
334 336 elif keytype in ('csv', 'scsv'):
335 337 value = ','.join(value)
336 338 elif keytype == 'boolean':
337 339 value = '%i' % bool(value)
338 340 elif keytype != 'plain':
339 341 raise KeyError('unknown getbundle option type %s'
340 342 % keytype)
341 343 opts[key] = value
342 344 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
343 345 if any((cap.startswith('HG2') for cap in bundlecaps)):
344 346 return bundle2.getunbundler(self.ui, f)
345 347 else:
346 348 return changegroupmod.cg1unpacker(f, 'UN')
347 349
348 350 def unbundle(self, cg, heads, url):
349 351 '''Send cg (a readable file-like object representing the
350 352 changegroup to push, typically a chunkbuffer object) to the
351 353 remote server as a bundle.
352 354
353 355 When pushing a bundle10 stream, return an integer indicating the
354 356 result of the push (see changegroup.apply()).
355 357
356 358 When pushing a bundle20 stream, return a bundle20 stream.
357 359
358 360 `url` is the url the client thinks it's pushing to, which is
359 361 visible to hooks.
360 362 '''
361 363
362 364 if heads != ['force'] and self.capable('unbundlehash'):
363 365 heads = encodelist(['hashed',
364 366 hashlib.sha1(''.join(sorted(heads))).digest()])
365 367 else:
366 368 heads = encodelist(heads)
367 369
368 370 if util.safehasattr(cg, 'deltaheader'):
369 371 # this a bundle10, do the old style call sequence
370 372 ret, output = self._callpush("unbundle", cg, heads=heads)
371 373 if ret == "":
372 374 raise error.ResponseError(
373 375 _('push failed:'), output)
374 376 try:
375 377 ret = int(ret)
376 378 except ValueError:
377 379 raise error.ResponseError(
378 380 _('push failed (unexpected response):'), ret)
379 381
380 382 for l in output.splitlines(True):
381 383 self.ui.status(_('remote: '), l)
382 384 else:
383 385 # bundle2 push. Send a stream, fetch a stream.
384 386 stream = self._calltwowaystream('unbundle', cg, heads=heads)
385 387 ret = bundle2.getunbundler(self.ui, stream)
386 388 return ret
387 389
388 390 # End of basewirepeer interface.
389 391
390 392 # Begin of baselegacywirepeer interface.
391 393
392 394 def branches(self, nodes):
393 395 n = encodelist(nodes)
394 396 d = self._call("branches", nodes=n)
395 397 try:
396 398 br = [tuple(decodelist(b)) for b in d.splitlines()]
397 399 return br
398 400 except ValueError:
399 401 self._abort(error.ResponseError(_("unexpected response:"), d))
400 402
401 403 def between(self, pairs):
402 404 batch = 8 # avoid giant requests
403 405 r = []
404 406 for i in xrange(0, len(pairs), batch):
405 407 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
406 408 d = self._call("between", pairs=n)
407 409 try:
408 410 r.extend(l and decodelist(l) or [] for l in d.splitlines())
409 411 except ValueError:
410 412 self._abort(error.ResponseError(_("unexpected response:"), d))
411 413 return r
412 414
413 415 def changegroup(self, nodes, kind):
414 416 n = encodelist(nodes)
415 417 f = self._callcompressable("changegroup", roots=n)
416 418 return changegroupmod.cg1unpacker(f, 'UN')
417 419
418 420 def changegroupsubset(self, bases, heads, kind):
419 421 self.requirecap('changegroupsubset', _('look up remote changes'))
420 422 bases = encodelist(bases)
421 423 heads = encodelist(heads)
422 424 f = self._callcompressable("changegroupsubset",
423 425 bases=bases, heads=heads)
424 426 return changegroupmod.cg1unpacker(f, 'UN')
425 427
426 428 # End of baselegacywirepeer interface.
427 429
428 430 def _submitbatch(self, req):
429 431 """run batch request <req> on the server
430 432
431 433 Returns an iterator of the raw responses from the server.
432 434 """
433 435 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
434 436 chunk = rsp.read(1024)
435 437 work = [chunk]
436 438 while chunk:
437 439 while ';' not in chunk and chunk:
438 440 chunk = rsp.read(1024)
439 441 work.append(chunk)
440 442 merged = ''.join(work)
441 443 while ';' in merged:
442 444 one, merged = merged.split(';', 1)
443 445 yield unescapearg(one)
444 446 chunk = rsp.read(1024)
445 447 work = [merged, chunk]
446 448 yield unescapearg(''.join(work))
447 449
448 450 def _submitone(self, op, args):
449 451 return self._call(op, **pycompat.strkwargs(args))
450 452
451 453 def debugwireargs(self, one, two, three=None, four=None, five=None):
452 454 # don't pass optional arguments left at their default value
453 455 opts = {}
454 456 if three is not None:
455 457 opts[r'three'] = three
456 458 if four is not None:
457 459 opts[r'four'] = four
458 460 return self._call('debugwireargs', one=one, two=two, **opts)
459 461
460 462 def _call(self, cmd, **args):
461 463 """execute <cmd> on the server
462 464
463 465 The command is expected to return a simple string.
464 466
465 467 returns the server reply as a string."""
466 468 raise NotImplementedError()
467 469
468 470 def _callstream(self, cmd, **args):
469 471 """execute <cmd> on the server
470 472
471 473 The command is expected to return a stream. Note that if the
472 474 command doesn't return a stream, _callstream behaves
473 475 differently for ssh and http peers.
474 476
475 477 returns the server reply as a file like object.
476 478 """
477 479 raise NotImplementedError()
478 480
479 481 def _callcompressable(self, cmd, **args):
480 482 """execute <cmd> on the server
481 483
482 484 The command is expected to return a stream.
483 485
484 486 The stream may have been compressed in some implementations. This
485 487 function takes care of the decompression. This is the only difference
486 488 with _callstream.
487 489
488 490 returns the server reply as a file like object.
489 491 """
490 492 raise NotImplementedError()
491 493
492 494 def _callpush(self, cmd, fp, **args):
493 495 """execute a <cmd> on server
494 496
495 497 The command is expected to be related to a push. Push has a special
496 498 return method.
497 499
498 500 returns the server reply as a (ret, output) tuple. ret is either
499 501 empty (error) or a stringified int.
500 502 """
501 503 raise NotImplementedError()
502 504
503 505 def _calltwowaystream(self, cmd, fp, **args):
504 506 """execute <cmd> on server
505 507
506 508 The command will send a stream to the server and get a stream in reply.
507 509 """
508 510 raise NotImplementedError()
509 511
510 512 def _abort(self, exception):
511 513 """clearly abort the wire protocol connection and raise the exception
512 514 """
513 515 raise NotImplementedError()
514 516
515 517 # server side
516 518
517 519 # wire protocol command can either return a string or one of these classes.
class streamres(object):
    """wireproto reply: binary stream

    Indicates a successful call whose result is a stream of data chunks.

    ``gen`` is a generator (or iterable) of chunks to send to the client.

    ``prefer_uncompressed`` indicates that the data is expected to be
    uncompressable and that the stream should therefore use the ``none``
    engine.
    """
    def __init__(self, gen=None, prefer_uncompressed=False):
        self.prefer_uncompressed = prefer_uncompressed
        self.gen = gen
532 534
class streamres_legacy(object):
    """wireproto reply: uncompressed binary stream

    Indicates a successful call whose result is a stream of data chunks.

    Like ``streamres``, but sends an uncompressed data for "version 1" clients
    using the application/mercurial-0.1 media type.
    """
    def __init__(self, gen=None):
        # Generator (or iterable) of chunks sent verbatim to the client.
        self.gen = gen
545 547
class pushres(object):
    """wireproto reply: success with simple integer return

    The call succeeded; the integer result is stored in ``self.res``.
    """
    def __init__(self, res):
        self.res = res
553 555
class pusherr(object):
    """wireproto reply: failure

    The call failed; ``self.res`` holds the error message.
    """
    def __init__(self, res):
        self.res = res
561 563
class ooberror(object):
    """wireproto reply: failure of a batch of operation

    Something failed during a batch call. The error message is stored in
    ``self.message``.
    """
    def __init__(self, message):
        self.message = message
570 572
571 573 def getdispatchrepo(repo, proto, command):
572 574 """Obtain the repo used for processing wire protocol commands.
573 575
574 576 The intent of this function is to serve as a monkeypatch point for
575 577 extensions that need commands to operate on different repo views under
576 578 specialized circumstances.
577 579 """
578 580 return repo.filtered('served')
579 581
def dispatch(repo, proto, command):
    """Look up ``command`` in the registry and invoke its handler."""
    # Commands always run against the filtered (monkeypatchable) repo view.
    repo = getdispatchrepo(repo, proto, command)
    handler, argspec = commands[command]
    callargs = proto.getargs(argspec)
    return handler(repo, proto, *callargs)
585 587
def options(cmd, keys, others):
    """Extract the entries named in ``keys`` from the ``others`` dict.

    Matched entries are removed from ``others`` and returned in a new dict.
    Anything left over is reported as an unexpected argument on stderr.
    """
    opts = {}
    for key in keys:
        if key in others:
            opts[key] = others.pop(key)
    if others:
        util.stderr.write("warning: %s ignored unexpected arguments %s\n"
                          % (cmd, ",".join(others)))
    return opts
596 598
def bundle1allowed(repo, action):
    """Whether a bundle1 operation is allowed from the server.

    Priority is:

    1. server.bundle1gd.<action> (if generaldelta active)
    2. server.bundle1.<action>
    3. server.bundle1gd (if generaldelta active)
    4. server.bundle1
    """
    ui = repo.ui
    gd = 'generaldelta' in repo.requirements

    # Build the ordered list of config options to consult; the generaldelta
    # specific ones only apply when the repo uses generaldelta.
    candidates = []
    if gd:
        candidates.append('bundle1gd.%s' % action)
    candidates.append('bundle1.%s' % action)
    if gd:
        candidates.append('bundle1gd')

    for name in candidates:
        v = ui.configbool('server', name)
        if v is not None:
            return v

    return ui.configbool('server', 'bundle1')
625 627
def supportedcompengines(ui, proto, role):
    """Obtain the list of supported compression engines for a request.

    ``role`` is ``util.CLIENTROLE`` or ``util.SERVERROLE`` and selects which
    side's configuration and wire protocol priorities are consulted.

    Returns a list of compression engine objects in preference order.
    Raises ``error.Abort`` when the configuration names unknown engines or
    filters out every usable engine.
    """
    assert role in (util.CLIENTROLE, util.SERVERROLE)

    compengines = util.compengines.supportedwireengines(role)

    # Allow config to override default list and ordering.
    if role == util.SERVERROLE:
        configengines = ui.configlist('server', 'compressionengines')
        config = 'server.compressionengines'
    else:
        # This is currently implemented mainly to facilitate testing. In most
        # cases, the server should be in charge of choosing a compression engine
        # because a server has the most to lose from a sub-optimal choice. (e.g.
        # CPU DoS due to an expensive engine or a network DoS due to poor
        # compression ratio).
        configengines = ui.configlist('experimental',
                                      'clientcompressionengines')
        config = 'experimental.clientcompressionengines'

    # No explicit config. Filter out the ones that aren't supposed to be
    # advertised and return default ordering.
    if not configengines:
        attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
        return [e for e in compengines
                if getattr(e.wireprotosupport(), attr) > 0]

    # If compression engines are listed in the config, assume there is a good
    # reason for it (like server operators wanting to achieve specific
    # performance characteristics). So fail fast if the config references
    # unusable compression engines.
    validnames = set(e.name() for e in compengines)
    invalidnames = set(e for e in configengines if e not in validnames)
    if invalidnames:
        raise error.Abort(_('invalid compression engine defined in %s: %s') %
                          (config, ', '.join(sorted(invalidnames))))

    compengines = [e for e in compengines if e.name() in configengines]
    compengines = sorted(compengines,
                         key=lambda e: configengines.index(e.name()))

    if not compengines:
        # BUG FIX: the original called ', '.sorted(validnames), which raises
        # AttributeError (str has no 'sorted' method) instead of producing
        # the intended hint text. Use ', '.join(sorted(...)).
        raise error.Abort(_('%s config option does not specify any known '
                            'compression engines') % config,
                          hint=_('usable compression engines: %s') %
                          ', '.join(sorted(validnames)))

    return compengines
674 676
675 677 # list of commands
commands = {}

def wireprotocommand(name, args=''):
    """decorator for wire protocol command

    Registers the decorated function in ``commands`` under ``name`` with
    its space-separated argument specification ``args``.
    """
    def register(func):
        commands[name] = (func, args)
        return func

    return register
684 686
@wireprotocommand('batch', 'cmds *')
def batch(repo, proto, cmds, others):
    """Run several wire commands from a single request.

    ``cmds`` is a ';'-separated list of "op escaped-args" entries; each
    sub-command's escaped result is joined back with ';'. An out-of-band
    error from any sub-command aborts the whole batch.
    """
    repo = repo.filtered("served")
    res = []
    for pair in cmds.split(';'):
        op, args = pair.split(' ', 1)
        vals = {}
        # Decode the "name=value,..." argument payload.
        for a in args.split(','):
            if a:
                n, v = a.split('=')
                vals[unescapearg(n)] = unescapearg(v)
        func, spec = commands[op]
        if spec:
            keys = spec.split()
            data = {}
            for k in keys:
                if k == '*':
                    # '*' collects every argument not named in the spec.
                    star = {}
                    for key in vals.keys():
                        if key not in keys:
                            star[key] = vals[key]
                    data['*'] = star
                else:
                    data[k] = vals[k]
            result = func(repo, proto, *[data[k] for k in keys])
        else:
            result = func(repo, proto)
        if isinstance(result, ooberror):
            # Out-of-band errors short-circuit the remaining sub-commands.
            return result
        res.append(escapearg(result))
    return ';'.join(res)
716 718
@wireprotocommand('between', 'pairs')
def between(repo, proto, pairs):
    # ``pairs`` is a space-separated list of '-'-joined encoded node pairs;
    # reply with one newline-terminated encoded node list per pair.
    decoded = [decodelist(p, '-') for p in pairs.split(" ")]
    lines = [encodelist(b) + "\n" for b in repo.between(decoded)]
    return "".join(lines)
724 726
@wireprotocommand('branchmap')
def branchmap(repo, proto):
    # Reply with one "<quoted branch name> <encoded head nodes>" line per
    # branch, joined by newlines.
    lines = []
    for branch, nodes in repo.branchmap().iteritems():
        quoted = urlreq.quote(encoding.fromlocal(branch))
        lines.append('%s %s' % (quoted, encodelist(nodes)))
    return '\n'.join(lines)
734 736
@wireprotocommand('branches', 'nodes')
def branches(repo, proto, nodes):
    # Reply with one newline-terminated encoded node list per branch tuple.
    found = repo.branches(decodelist(nodes))
    return "".join(encodelist(b) + "\n" for b in found)
742 744
@wireprotocommand('clonebundles', '')
def clonebundles(repo, proto):
    """Server command for returning info for available bundles to seed clones.

    Clients will parse this response and determine what bundle to fetch.

    Extensions may wrap this command to filter or dynamically emit data
    depending on the request. e.g. you could advertise URLs for the closest
    data center given the client's IP address.
    """
    # Empty string when no manifest exists (tryread swallows the miss).
    manifest = repo.vfs.tryread('clonebundles.manifest')
    return manifest
754 756
# Wire protocol capabilities advertised by every repository; extended
# conditionally by _capabilities() below.
wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
                 'known', 'getbundle', 'unbundlehash', 'batch']
757 759
def _capabilities(repo, proto):
    """return a list of capabilities for a repo

    This function exists to allow extensions to easily wrap capabilities
    computation

    - returns a lists: easy to alter
    - change done here will be propagated to both `capabilities` and `hello`
      command without any other action needed.
    """
    # copy to prevent modification of the global list
    caps = list(wireprotocaps)
    if streamclone.allowservergeneration(repo):
        if repo.ui.configbool('server', 'preferuncompressed'):
            caps.append('stream-preferred')
        requiredformats = repo.requirements & repo.supportedformats
        # if our local revlogs are just revlogv1, add 'stream' cap
        if not requiredformats - {'revlogv1'}:
            caps.append('stream')
        # otherwise, add 'streamreqs' detailing our local revlog format
        else:
            caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
    if repo.ui.configbool('experimental', 'bundle2-advertise'):
        # Advertise bundle2 capabilities as a URL-quoted blob.
        capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
        caps.append('bundle2=' + urlreq.quote(capsblob))
    caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))

    # HTTP-only capabilities (header limits, POST args, media types,
    # compression negotiation).
    if proto.name == 'http':
        caps.append('httpheader=%d' %
                    repo.ui.configint('server', 'maxhttpheaderlen'))
        if repo.ui.configbool('experimental', 'httppostargs'):
            caps.append('httppostargs')

        # FUTURE advertise 0.2rx once support is implemented
        # FUTURE advertise minrx and mintx after consulting config option
        caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')

        compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
        if compengines:
            comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
                                 for e in compengines)
            caps.append('compression=%s' % comptypes)

    return caps
802 804
# If you are writing an extension and consider wrapping this function. Wrap
# `_capabilities` instead.
@wireprotocommand('capabilities')
def capabilities(repo, proto):
    # Wire format: a single space-separated string of capability tokens.
    return ' '.join(_capabilities(repo, proto))
808 810
@wireprotocommand('changegroup', 'roots')
def changegroup(repo, proto, roots):
    # Stream a version '01' changegroup for everything missing from the
    # given roots up to the repository heads.
    outgoing = discovery.outgoing(repo, missingroots=decodelist(roots),
                                  missingheads=repo.heads())
    cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
    chunks = iter(lambda: cg.read(32768), '')
    return streamres(gen=chunks)
817 819
@wireprotocommand('changegroupsubset', 'bases heads')
def changegroupsubset(repo, proto, bases, heads):
    # Stream a version '01' changegroup for the revisions between the
    # requested bases and heads.
    outgoing = discovery.outgoing(repo, missingroots=decodelist(bases),
                                  missingheads=decodelist(heads))
    cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
    chunks = iter(lambda: cg.read(32768), '')
    return streamres(gen=chunks)
827 829
@wireprotocommand('debugwireargs', 'one two *')
def debugwireargs(repo, proto, one, two, others):
    # only accept optional args from the known set
    accepted = options('debugwireargs', ['three', 'four'], others)
    return repo.debugwireargs(one, two, **pycompat.strkwargs(accepted))
833 835
@wireprotocommand('getbundle', '*')
def getbundle(repo, proto, others):
    """Serve a bundle to the client.

    Options are decoded per their declared type in ``gboptsmap``. Abort
    errors are forwarded to bundle2 clients as an 'error:abort' part; for
    bundle1, HTTP gets an out-of-band error and ssh re-raises.
    """
    opts = options('getbundle', gboptsmap.keys(), others)
    for k, v in opts.iteritems():
        keytype = gboptsmap[k]
        if keytype == 'nodes':
            opts[k] = decodelist(v)
        elif keytype == 'csv':
            opts[k] = list(v.split(','))
        elif keytype == 'scsv':
            opts[k] = set(v.split(','))
        elif keytype == 'boolean':
            # Client should serialize False as '0', which is a non-empty string
            # so it evaluates as a True bool.
            if v == '0':
                opts[k] = False
            else:
                opts[k] = bool(v)
        elif keytype != 'plain':
            raise KeyError('unknown getbundle option type %s'
                           % keytype)

    if not bundle1allowed(repo, 'pull'):
        if not exchange.bundle2requested(opts.get('bundlecaps')):
            if proto.name == 'http':
                # HTTP clients only see stderr via out-of-band errors.
                return ooberror(bundle2required)
            raise error.Abort(bundle2requiredmain,
                              hint=bundle2requiredhint)

    try:
        if repo.ui.configbool('server', 'disablefullbundle'):
            # Check to see if this is a full clone.
            clheads = set(repo.changelog.heads())
            heads = set(opts.get('heads', set()))
            common = set(opts.get('common', set()))
            common.discard(nullid)
            if not common and clheads == heads:
                raise error.Abort(
                    _('server has pull-based clones disabled'),
                    hint=_('remove --pull if specified or upgrade Mercurial'))

        chunks = exchange.getbundlechunks(repo, 'serve',
                                          **pycompat.strkwargs(opts))
    except error.Abort as exc:
        # cleanly forward Abort error to the client
        if not exchange.bundle2requested(opts.get('bundlecaps')):
            if proto.name == 'http':
                return ooberror(str(exc) + '\n')
            raise # cannot do better for bundle1 + ssh
        # bundle2 request expect a bundle2 reply
        bundler = bundle2.bundle20(repo.ui)
        manargs = [('message', str(exc))]
        advargs = []
        if exc.hint is not None:
            advargs.append(('hint', exc.hint))
        bundler.addpart(bundle2.bundlepart('error:abort',
                                           manargs, advargs))
        return streamres(gen=bundler.getchunks())
    return streamres(gen=chunks)
893 895
@wireprotocommand('heads')
def heads(repo, proto):
    # Newline-terminated encoded list of repository head nodes.
    return encodelist(repo.heads()) + "\n"
898 900
@wireprotocommand('hello')
def hello(repo, proto):
    '''the hello command returns a set of lines describing various
    interesting things about the server, in an RFC822-like format.
    Currently the only one defined is "capabilities", which
    consists of a line in the form:

    capabilities: space separated list of tokens
    '''
    caps = capabilities(repo, proto)
    return "capabilities: %s\n" % caps
909 911
@wireprotocommand('listkeys', 'namespace')
def listkeys(repo, proto, namespace):
    # Serialize the namespace's key/value pairs in pushkey wire format.
    keys = repo.listkeys(encoding.tolocal(namespace)).items()
    return pushkeymod.encodekeys(keys)
914 916
@wireprotocommand('lookup', 'key')
def lookup(repo, proto, key):
    # Reply "1 <hex node>\n" on success, "0 <error text>\n" on failure;
    # any lookup error is reported to the client rather than raised.
    try:
        r = repo[encoding.tolocal(key)].hex()
        success = 1
    except Exception as inst:
        r = str(inst)
        success = 0
    return "%d %s\n" % (success, r)
926 928
@wireprotocommand('known', 'nodes *')
def known(repo, proto, nodes, others):
    # One '1'/'0' character per queried node.
    return ''.join("1" if b else "0" for b in repo.known(decodelist(nodes)))
930 932
@wireprotocommand('pushkey', 'namespace key old new')
def pushkey(repo, proto, namespace, key, old, new):
    """Set a key in a pushkey namespace; reply "<int result>\\n<output>"."""
    # compatibility with pre-1.8 clients which were accidentally
    # sending raw binary nodes rather than utf-8-encoded hex
    if len(new) == 20 and util.escapestr(new) != new:
        # looks like it could be a binary node
        try:
            new.decode('utf-8')
            new = encoding.tolocal(new) # but cleanly decodes as UTF-8
        except UnicodeDecodeError:
            pass # binary, leave unmodified
    else:
        new = encoding.tolocal(new) # normal path

    # Protocols that can capture output (have 'restore') include hook/server
    # output in the reply instead of letting it leak onto stderr.
    if util.safehasattr(proto, 'restore'):

        proto.redirect()

        try:
            r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
                             encoding.tolocal(old), new) or False
        except error.Abort:
            r = False

        output = proto.restore()

        return '%s\n%s' % (int(r), output)

    r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
                     encoding.tolocal(old), new)
    return '%s\n' % int(r)
962 964
@wireprotocommand('stream_out')
def stream(repo, proto):
    '''If the server supports streaming clone, it advertises the "stream"
    capability with a value representing the version and flags of the repo
    it is serving. Client checks to see if it understands the format.
    '''
    # Version 1 stream clone data is always sent uncompressed.
    return streamres_legacy(streamclone.generatev1wireproto(repo))
970 972
@wireprotocommand('unbundle', 'heads')
def unbundle(repo, proto, heads):
    """Apply a bundle pushed by the client.

    Returns a ``pushres``/``pusherr`` for bundle1 pushes, or a stream reply
    for bundle2. Errors raised during a bundle2 unbundle are converted into
    bundle2 error parts so the client can decode them.
    """
    their_heads = decodelist(heads)

    try:
        proto.redirect()

        exchange.check_heads(repo, their_heads, 'preparing changes')

        # write bundle data to temporary file because it can be big
        fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
        fp = os.fdopen(fd, pycompat.sysstr('wb+'))
        r = 0
        try:
            proto.getfile(fp)
            fp.seek(0)
            gen = exchange.readbundle(repo.ui, fp, None)
            if (isinstance(gen, changegroupmod.cg1unpacker)
                and not bundle1allowed(repo, 'push')):
                if proto.name == 'http':
                    # need to special case http because stderr do not get to
                    # the http client on failed push so we need to abuse some
                    # other error type to make sure the message get to the
                    # user.
                    return ooberror(bundle2required)
                raise error.Abort(bundle2requiredmain,
                                  hint=bundle2requiredhint)

            r = exchange.unbundle(repo, gen, their_heads, 'serve',
                                  proto._client())
            if util.safehasattr(r, 'addpart'):
                # The return looks streamable, we are in the bundle2 case and
                # should return a stream.
                return streamres_legacy(gen=r.getchunks())
            return pushres(r)

        finally:
            fp.close()
            os.unlink(tempname)

    except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
        # handle non-bundle2 case first
        if not getattr(exc, 'duringunbundle2', False):
            try:
                # Re-raise to select the matching handler below.
                raise
            except error.Abort:
                # The old code we moved used util.stderr directly.
                # We did not change it to minimise code change.
                # This need to be moved to something proper.
                # Feel free to do it.
                util.stderr.write("abort: %s\n" % exc)
                if exc.hint is not None:
                    util.stderr.write("(%s)\n" % exc.hint)
                return pushres(0)
            except error.PushRaced:
                return pusherr(str(exc))

        # bundle2 case: encode the failure as bundle2 error parts, keeping
        # any server output salvaged from the partially-processed bundle.
        bundler = bundle2.bundle20(repo.ui)
        for out in getattr(exc, '_bundle2salvagedoutput', ()):
            bundler.addpart(out)
        try:
            try:
                # Re-raise to select the matching handler below.
                raise
            except error.PushkeyFailed as exc:
                # check client caps
                remotecaps = getattr(exc, '_replycaps', None)
                if (remotecaps is not None
                    and 'pushkey' not in remotecaps.get('error', ())):
                    # no support remote side, fallback to Abort handler.
                    raise
                part = bundler.newpart('error:pushkey')
                part.addparam('in-reply-to', exc.partid)
                if exc.namespace is not None:
                    part.addparam('namespace', exc.namespace, mandatory=False)
                if exc.key is not None:
                    part.addparam('key', exc.key, mandatory=False)
                if exc.new is not None:
                    part.addparam('new', exc.new, mandatory=False)
                if exc.old is not None:
                    part.addparam('old', exc.old, mandatory=False)
                if exc.ret is not None:
                    part.addparam('ret', exc.ret, mandatory=False)
        except error.BundleValueError as exc:
            errpart = bundler.newpart('error:unsupportedcontent')
            if exc.parttype is not None:
                errpart.addparam('parttype', exc.parttype)
            if exc.params:
                errpart.addparam('params', '\0'.join(exc.params))
        except error.Abort as exc:
            manargs = [('message', str(exc))]
            advargs = []
            if exc.hint is not None:
                advargs.append(('hint', exc.hint))
            bundler.addpart(bundle2.bundlepart('error:abort',
                                               manargs, advargs))
        except error.PushRaced as exc:
            bundler.newpart('error:pushraced', [('message', str(exc))])
        return streamres_legacy(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now