exchange: advertise if a clone bundle was attempted...
Gregory Szorc
r26690:704818fb default
@@ -1,1791 +1,1801
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib, urllib2
11 11 import util, scmutil, changegroup, base85, error
12 12 import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
13 13 import lock as lockmod
14 14 import streamclone
15 15 import sslutil
16 16 import tags
17 17 import url as urlmod
18 18
19 19 # Maps bundle compression human names to internal representation.
20 20 _bundlespeccompressions = {'none': None,
21 21 'bzip2': 'BZ',
22 22 'gzip': 'GZ',
23 23 }
24 24
25 25 # Maps bundle version human names to changegroup versions.
26 26 _bundlespeccgversions = {'v1': '01',
27 27 'v2': '02',
28 28 'bundle2': '02', #legacy
29 29 }
30 30
31 31 def parsebundlespec(repo, spec, strict=True, externalnames=False):
32 32 """Parse a bundle string specification into parts.
33 33
34 34 Bundle specifications denote a well-defined bundle/exchange format.
35 35 The content of a given specification should not change over time in
36 36 order to ensure that bundles produced by a newer version of Mercurial are
37 37 readable from an older version.
38 38
39 39 The string currently has the form:
40 40
41 41 <compression>-<type>
42 42
43 43 Where <compression> is one of the supported compression formats
44 44 and <type> is (currently) a version string.
45 45
46 46 If ``strict`` is True (the default) <compression> is required. Otherwise,
47 47 it is optional.
48 48
49 49 If ``externalnames`` is False (the default), the human-centric names will
50 50 be converted to their internal representation.
51 51
52 52 Returns a 2-tuple of (compression, version). Compression will be ``None``
53 53 if not in strict mode and a compression isn't defined.
54 54
55 55 An ``InvalidBundleSpecification`` is raised when the specification is
56 56 not syntactically well formed.
57 57
58 58 An ``UnsupportedBundleSpecification`` is raised when the compression or
59 59 bundle type/version is not recognized.
60 60
61 61 Note: this function will likely eventually return a more complex data
62 62 structure, including bundle2 part information.
63 63 """
64 64 if strict and '-' not in spec:
65 65 raise error.InvalidBundleSpecification(
66 66 _('invalid bundle specification; '
67 67 'must be prefixed with compression: %s') % spec)
68 68
69 69 if '-' in spec:
70 70 compression, version = spec.split('-', 1)
71 71
72 72 if compression not in _bundlespeccompressions:
73 73 raise error.UnsupportedBundleSpecification(
74 74 _('%s compression is not supported') % compression)
75 75
76 76 if version not in _bundlespeccgversions:
77 77 raise error.UnsupportedBundleSpecification(
78 78 _('%s is not a recognized bundle version') % version)
79 79 else:
80 80 # Value could be just the compression or just the version, in which
81 81 # case some defaults are assumed (but only when not in strict mode).
82 82 assert not strict
83 83
84 84 if spec in _bundlespeccompressions:
85 85 compression = spec
86 86 version = 'v1'
87 87 if 'generaldelta' in repo.requirements:
88 88 version = 'v2'
89 89 elif spec in _bundlespeccgversions:
90 90 compression = 'bzip2'
91 91 version = spec
92 92 else:
93 93 raise error.UnsupportedBundleSpecification(
94 94 _('%s is not a recognized bundle specification') % spec)
95 95
96 96 if not externalnames:
97 97 compression = _bundlespeccompressions[compression]
98 98 version = _bundlespeccgversions[version]
99 99 return compression, version
100 100
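# For illustration, with the mappings above a hypothetical session would
# parse specs as follows (generaldelta repos default to 'v2' instead of 'v1'):
#
#   >>> parsebundlespec(repo, 'gzip-v1')
#   ('GZ', '01')
#   >>> parsebundlespec(repo, 'bzip2', strict=False)
#   ('BZ', '01')
#   >>> parsebundlespec(repo, 'none-v2', externalnames=True)
#   ('none', 'v2')
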
101 101 def readbundle(ui, fh, fname, vfs=None):
102 102 header = changegroup.readexactly(fh, 4)
103 103
104 104 alg = None
105 105 if not fname:
106 106 fname = "stream"
107 107 if not header.startswith('HG') and header.startswith('\0'):
108 108 fh = changegroup.headerlessfixup(fh, header)
109 109 header = "HG10"
110 110 alg = 'UN'
111 111 elif vfs:
112 112 fname = vfs.join(fname)
113 113
114 114 magic, version = header[0:2], header[2:4]
115 115
116 116 if magic != 'HG':
117 117 raise error.Abort(_('%s: not a Mercurial bundle') % fname)
118 118 if version == '10':
119 119 if alg is None:
120 120 alg = changegroup.readexactly(fh, 2)
121 121 return changegroup.cg1unpacker(fh, alg)
122 122 elif version.startswith('2'):
123 123 return bundle2.getunbundler(ui, fh, magicstring=magic + version)
124 124 else:
125 125 raise error.Abort(_('%s: unknown bundle version %s') % (fname, version))
126 126
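# A sketch of the header dispatch above: the first four bytes select the
# unpacker, e.g.
#
#   'HG10' + 2-byte compression ('BZ', 'GZ' or 'UN') -> cg1unpacker
#   'HG2x' (currently 'HG20')                        -> bundle2 unbundler
#   leading '\0'                                     -> headerless cg1, 'UN'
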
127 127 def buildobsmarkerspart(bundler, markers):
128 128 """add an obsmarker part to the bundler with <markers>
129 129
130 130 No part is created if markers is empty.
131 131 Raises ValueError if the bundler doesn't support any known obsmarker format.
132 132 """
133 133 if markers:
134 134 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
135 135 version = obsolete.commonversion(remoteversions)
136 136 if version is None:
137 137 raise ValueError('bundler does not support any common obsmarker format')
138 138 stream = obsolete.encodemarkers(markers, True, version=version)
139 139 return bundler.newpart('obsmarkers', data=stream)
140 140 return None
141 141
142 142 def _canusebundle2(op):
143 143 """return true if a pull/push can use bundle2
144 144
145 145 Feel free to nuke this function when we drop the experimental option"""
146 146 return (op.repo.ui.configbool('experimental', 'bundle2-exp', True)
147 147 and op.remote.capable('bundle2'))
148 148
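# For instance, an operator could opt out while the option remains
# experimental with a hypothetical hgrc snippet:
#
#   [experimental]
#   bundle2-exp = False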
149 149
150 150 class pushoperation(object):
151 151 """A object that represent a single push operation
152 152
153 153 It purpose is to carry push related state and very common operation.
154 154
155 155 A new should be created at the beginning of each push and discarded
156 156 afterward.
157 157 """
158 158
159 159 def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
160 160 bookmarks=()):
161 161 # repo we push from
162 162 self.repo = repo
163 163 self.ui = repo.ui
164 164 # repo we push to
165 165 self.remote = remote
166 166 # force option provided
167 167 self.force = force
168 168 # revs to be pushed (None is "all")
169 169 self.revs = revs
170 170 # bookmark explicitly pushed
171 171 self.bookmarks = bookmarks
172 172 # allow push of new branch
173 173 self.newbranch = newbranch
174 174 # did a local lock get acquired?
175 175 self.locallocked = None
176 176 # step already performed
177 177 # (used to check what steps have been already performed through bundle2)
178 178 self.stepsdone = set()
179 179 # Integer version of the changegroup push result
180 180 # - None means nothing to push
181 181 # - 0 means HTTP error
182 182 # - 1 means we pushed and remote head count is unchanged *or*
183 183 # we have outgoing changesets but refused to push
184 184 # - other values as described by addchangegroup()
185 185 self.cgresult = None
186 186 # Boolean value for the bookmark push
187 187 self.bkresult = None
188 188 # discover.outgoing object (contains common and outgoing data)
189 189 self.outgoing = None
190 190 # all remote heads before the push
191 191 self.remoteheads = None
192 192 # testable as a boolean indicating if any nodes are missing locally.
193 193 self.incoming = None
194 194 # phase changes that must be pushed alongside the changesets
195 195 self.outdatedphases = None
196 196 # phase changes that must be pushed if the changeset push fails
197 197 self.fallbackoutdatedphases = None
198 198 # outgoing obsmarkers
199 199 self.outobsmarkers = set()
200 200 # outgoing bookmarks
201 201 self.outbookmarks = []
202 202 # transaction manager
203 203 self.trmanager = None
204 204 # map { pushkey partid -> callback handling failure}
205 205 # used to handle exception from mandatory pushkey part failure
206 206 self.pkfailcb = {}
207 207
208 208 @util.propertycache
209 209 def futureheads(self):
210 210 """future remote heads if the changeset push succeeds"""
211 211 return self.outgoing.missingheads
212 212
213 213 @util.propertycache
214 214 def fallbackheads(self):
215 215 """future remote heads if the changeset push fails"""
216 216 if self.revs is None:
217 217 # no targets to push, all common heads are relevant
218 218 return self.outgoing.commonheads
219 219 unfi = self.repo.unfiltered()
220 220 # I want cheads = heads(::missingheads and ::commonheads)
221 221 # (missingheads is revs with secret changeset filtered out)
222 222 #
223 223 # This can be expressed as:
224 224 # cheads = ( (missingheads and ::commonheads)
225 225 # + (commonheads and ::missingheads)
226 226 # )
227 227 #
228 228 # while trying to push we already computed the following:
229 229 # common = (::commonheads)
230 230 # missing = ((commonheads::missingheads) - commonheads)
231 231 #
232 232 # We can pick:
233 233 # * missingheads part of common (::commonheads)
234 234 common = self.outgoing.common
235 235 nm = self.repo.changelog.nodemap
236 236 cheads = [node for node in self.revs if nm[node] in common]
237 237 # and
238 238 # * commonheads parents on missing
239 239 revset = unfi.set('%ln and parents(roots(%ln))',
240 240 self.outgoing.commonheads,
241 241 self.outgoing.missing)
242 242 cheads.extend(c.node() for c in revset)
243 243 return cheads
244 244
245 245 @property
246 246 def commonheads(self):
247 247 """set of all common heads after changeset bundle push"""
248 248 if self.cgresult:
249 249 return self.futureheads
250 250 else:
251 251 return self.fallbackheads
252 252
253 253 # mapping of message used when pushing bookmark
254 254 bookmsgmap = {'update': (_("updating bookmark %s\n"),
255 255 _('updating bookmark %s failed!\n')),
256 256 'export': (_("exporting bookmark %s\n"),
257 257 _('exporting bookmark %s failed!\n')),
258 258 'delete': (_("deleting remote bookmark %s\n"),
259 259 _('deleting remote bookmark %s failed!\n')),
260 260 }
261 261
262 262
263 263 def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=()):
264 264 '''Push outgoing changesets (limited by revs) from a local
265 265 repository to remote. Return an integer:
266 266 - None means nothing to push
267 267 - 0 means HTTP error
268 268 - 1 means we pushed and remote head count is unchanged *or*
269 269 we have outgoing changesets but refused to push
270 270 - other values as described by addchangegroup()
271 271 '''
272 272 pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks)
273 273 if pushop.remote.local():
274 274 missing = (set(pushop.repo.requirements)
275 275 - pushop.remote.local().supported)
276 276 if missing:
277 277 msg = _("required features are not"
278 278 " supported in the destination:"
279 279 " %s") % (', '.join(sorted(missing)))
280 280 raise error.Abort(msg)
281 281
282 282 # there are two ways to push to remote repo:
283 283 #
284 284 # addchangegroup assumes local user can lock remote
285 285 # repo (local filesystem, old ssh servers).
286 286 #
287 287 # unbundle assumes local user cannot lock remote repo (new ssh
288 288 # servers, http servers).
289 289
290 290 if not pushop.remote.canpush():
291 291 raise error.Abort(_("destination does not support push"))
292 292 # get local lock as we might write phase data
293 293 localwlock = locallock = None
294 294 try:
295 295 # bundle2 push may receive a reply bundle touching bookmarks or other
296 296 # things requiring the wlock. Take it now to ensure proper ordering.
297 297 maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
298 298 if _canusebundle2(pushop) and maypushback:
299 299 localwlock = pushop.repo.wlock()
300 300 locallock = pushop.repo.lock()
301 301 pushop.locallocked = True
302 302 except IOError as err:
303 303 pushop.locallocked = False
304 304 if err.errno != errno.EACCES:
305 305 raise
306 306 # source repo cannot be locked.
307 307 # We do not abort the push, but just disable the local phase
308 308 # synchronisation.
309 309 msg = 'cannot lock source repository: %s\n' % err
310 310 pushop.ui.debug(msg)
311 311 try:
312 312 if pushop.locallocked:
313 313 pushop.trmanager = transactionmanager(pushop.repo,
314 314 'push-response',
315 315 pushop.remote.url())
316 316 pushop.repo.checkpush(pushop)
317 317 lock = None
318 318 unbundle = pushop.remote.capable('unbundle')
319 319 if not unbundle:
320 320 lock = pushop.remote.lock()
321 321 try:
322 322 _pushdiscovery(pushop)
323 323 if _canusebundle2(pushop):
324 324 _pushbundle2(pushop)
325 325 _pushchangeset(pushop)
326 326 _pushsyncphase(pushop)
327 327 _pushobsolete(pushop)
328 328 _pushbookmark(pushop)
329 329 finally:
330 330 if lock is not None:
331 331 lock.release()
332 332 if pushop.trmanager:
333 333 pushop.trmanager.close()
334 334 finally:
335 335 if pushop.trmanager:
336 336 pushop.trmanager.release()
337 337 if locallock is not None:
338 338 locallock.release()
339 339 if localwlock is not None:
340 340 localwlock.release()
341 341
342 342 return pushop
343 343
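# Minimal usage sketch (the repo and remote peers are assumed to come from
# the caller, e.g. via hg.repository and hg.peer):
#
#   pushop = push(repo, remote, revs=[repo['tip'].node()])
#   if pushop.cgresult == 0:   # 0 == HTTP error, see above
#       repo.ui.warn('push failed\n')
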
344 344 # list of steps to perform discovery before push
345 345 pushdiscoveryorder = []
346 346
347 347 # Mapping between step name and function
348 348 #
349 349 # This exists to help extensions wrap steps if necessary
350 350 pushdiscoverymapping = {}
351 351
352 352 def pushdiscovery(stepname):
353 353 """decorator for function performing discovery before push
354 354
355 355 The function is added to the step -> function mapping and appended to the
356 356 list of steps. Beware that decorated function will be added in order (this
357 357 may matter).
358 358
359 359 You can only use this decorator for a new step, if you want to wrap a step
360 360 from an extension, change the pushdiscovery dictionary directly."""
361 361 def dec(func):
362 362 assert stepname not in pushdiscoverymapping
363 363 pushdiscoverymapping[stepname] = func
364 364 pushdiscoveryorder.append(stepname)
365 365 return func
366 366 return dec
367 367
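# A hypothetical extension would register a new discovery step like so:
#
#   @pushdiscovery('mystep')
#   def _pushdiscoverymystep(pushop):
#       pushop.ui.debug('running my discovery step\n')
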
368 368 def _pushdiscovery(pushop):
369 369 """Run all discovery steps"""
370 370 for stepname in pushdiscoveryorder:
371 371 step = pushdiscoverymapping[stepname]
372 372 step(pushop)
373 373
374 374 @pushdiscovery('changeset')
375 375 def _pushdiscoverychangeset(pushop):
376 376 """discover the changeset that need to be pushed"""
377 377 fci = discovery.findcommonincoming
378 378 commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
379 379 common, inc, remoteheads = commoninc
380 380 fco = discovery.findcommonoutgoing
381 381 outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
382 382 commoninc=commoninc, force=pushop.force)
383 383 pushop.outgoing = outgoing
384 384 pushop.remoteheads = remoteheads
385 385 pushop.incoming = inc
386 386
387 387 @pushdiscovery('phase')
388 388 def _pushdiscoveryphase(pushop):
389 389 """discover the phase that needs to be pushed
390 390
391 391 (computed for both success and failure case for changesets push)"""
392 392 outgoing = pushop.outgoing
393 393 unfi = pushop.repo.unfiltered()
394 394 remotephases = pushop.remote.listkeys('phases')
395 395 publishing = remotephases.get('publishing', False)
396 396 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
397 397 and remotephases # server supports phases
398 398 and not pushop.outgoing.missing # no changesets to be pushed
399 399 and publishing):
400 400 # When:
401 401 # - this is a subrepo push
402 402 # - and the remote supports phases
403 403 # - and no changesets are to be pushed
404 404 # - and the remote is publishing
405 405 # We may be in the issue 3871 case!
406 406 # We drop the phase synchronisation that would otherwise be done
407 407 # as a courtesy, as it could publish changesets that are still
408 408 # draft locally.
409 409 remotephases = {'publishing': 'True'}
410 410 ana = phases.analyzeremotephases(pushop.repo,
411 411 pushop.fallbackheads,
412 412 remotephases)
413 413 pheads, droots = ana
414 414 extracond = ''
415 415 if not publishing:
416 416 extracond = ' and public()'
417 417 revset = 'heads((%%ln::%%ln) %s)' % extracond
418 418 # Get the list of all revs draft on the remote but public here.
419 419 # XXX Beware that the revset breaks if droots is not strictly
420 420 # XXX roots; we may want to ensure it is, but that is costly
421 421 fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
422 422 if not outgoing.missing:
423 423 future = fallback
424 424 else:
425 425 # add changesets we are going to push as draft
426 426 #
427 427 # should not be necessary for a publishing server, but because of an
428 428 # issue fixed in xxxxx we have to do it anyway.
429 429 fdroots = list(unfi.set('roots(%ln + %ln::)',
430 430 outgoing.missing, droots))
431 431 fdroots = [f.node() for f in fdroots]
432 432 future = list(unfi.set(revset, fdroots, pushop.futureheads))
433 433 pushop.outdatedphases = future
434 434 pushop.fallbackoutdatedphases = fallback
435 435
436 436 @pushdiscovery('obsmarker')
437 437 def _pushdiscoveryobsmarkers(pushop):
438 438 if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
439 439 and pushop.repo.obsstore
440 440 and 'obsolete' in pushop.remote.listkeys('namespaces')):
441 441 repo = pushop.repo
442 442 # very naive computation that can be quite expensive on big repos.
443 443 # However, evolution is currently slow on them anyway.
444 444 nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
445 445 pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)
446 446
447 447 @pushdiscovery('bookmarks')
448 448 def _pushdiscoverybookmarks(pushop):
449 449 ui = pushop.ui
450 450 repo = pushop.repo.unfiltered()
451 451 remote = pushop.remote
452 452 ui.debug("checking for updated bookmarks\n")
453 453 ancestors = ()
454 454 if pushop.revs:
455 455 revnums = map(repo.changelog.rev, pushop.revs)
456 456 ancestors = repo.changelog.ancestors(revnums, inclusive=True)
457 457 remotebookmark = remote.listkeys('bookmarks')
458 458
459 459 explicit = set(pushop.bookmarks)
460 460
461 461 comp = bookmod.compare(repo, repo._bookmarks, remotebookmark, srchex=hex)
462 462 addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp
463 463 for b, scid, dcid in advsrc:
464 464 if b in explicit:
465 465 explicit.remove(b)
466 466 if not ancestors or repo[scid].rev() in ancestors:
467 467 pushop.outbookmarks.append((b, dcid, scid))
468 468 # search added bookmark
469 469 for b, scid, dcid in addsrc:
470 470 if b in explicit:
471 471 explicit.remove(b)
472 472 pushop.outbookmarks.append((b, '', scid))
473 473 # search for overwritten bookmark
474 474 for b, scid, dcid in advdst + diverge + differ:
475 475 if b in explicit:
476 476 explicit.remove(b)
477 477 pushop.outbookmarks.append((b, dcid, scid))
478 478 # search for bookmark to delete
479 479 for b, scid, dcid in adddst:
480 480 if b in explicit:
481 481 explicit.remove(b)
482 482 # treat as "deleted locally"
483 483 pushop.outbookmarks.append((b, dcid, ''))
484 484 # identical bookmarks shouldn't get reported
485 485 for b, scid, dcid in same:
486 486 if b in explicit:
487 487 explicit.remove(b)
488 488
489 489 if explicit:
490 490 explicit = sorted(explicit)
491 491 # we should probably list all of them
492 492 ui.warn(_('bookmark %s does not exist on the local '
493 493 'or remote repository!\n') % explicit[0])
494 494 pushop.bkresult = 2
495 495
496 496 pushop.outbookmarks.sort()
497 497
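# After this step, pushop.outbookmarks holds (name, old, new) triples whose
# empty sides encode the action, e.g. (hypothetical bookmark names):
#
#   ('stable', old_remote_id, new_id)  # advance/overwrite remote bookmark
#   ('feature', '', new_id)            # export bookmark unknown remotely
#   ('gone', old_remote_id, '')        # delete the remote bookmark
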
498 498 def _pushcheckoutgoing(pushop):
499 499 outgoing = pushop.outgoing
500 500 unfi = pushop.repo.unfiltered()
501 501 if not outgoing.missing:
502 502 # nothing to push
503 503 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
504 504 return False
505 505 # something to push
506 506 if not pushop.force:
507 507 # if repo.obsstore is empty --> no obsolete markers,
508 508 # so we can skip the iteration
509 509 if unfi.obsstore:
510 510 # these messages are here for 80 char limit reasons
511 511 mso = _("push includes obsolete changeset: %s!")
512 512 mst = {"unstable": _("push includes unstable changeset: %s!"),
513 513 "bumped": _("push includes bumped changeset: %s!"),
514 514 "divergent": _("push includes divergent changeset: %s!")}
515 515 # If there is at least one obsolete or unstable
516 516 # changeset in missing, at least one of the
517 517 # missingheads will be obsolete or unstable. So
518 518 # checking heads only is ok.
519 519 for node in outgoing.missingheads:
520 520 ctx = unfi[node]
521 521 if ctx.obsolete():
522 522 raise error.Abort(mso % ctx)
523 523 elif ctx.troubled():
524 524 raise error.Abort(mst[ctx.troubles()[0]] % ctx)
525 525
526 526 # internal config: bookmarks.pushing
527 527 newbm = pushop.ui.configlist('bookmarks', 'pushing')
528 528 discovery.checkheads(unfi, pushop.remote, outgoing,
529 529 pushop.remoteheads,
530 530 pushop.newbranch,
531 531 bool(pushop.incoming),
532 532 newbm)
533 533 return True
534 534
535 535 # List of names of steps to perform for an outgoing bundle2, order matters.
536 536 b2partsgenorder = []
537 537
538 538 # Mapping between step name and function
539 539 #
540 540 # This exists to help extensions wrap steps if necessary
541 541 b2partsgenmapping = {}
542 542
543 543 def b2partsgenerator(stepname, idx=None):
544 544 """decorator for function generating bundle2 part
545 545
546 546 The function is added to the step -> function mapping and appended to the
547 547 list of steps. Beware that decorated functions will be added in order
548 548 (this may matter).
549 549
550 550 You can only use this decorator for new steps; if you want to wrap a step
551 551 from an extension, change the b2partsgenmapping dictionary directly."""
552 552 def dec(func):
553 553 assert stepname not in b2partsgenmapping
554 554 b2partsgenmapping[stepname] = func
555 555 if idx is None:
556 556 b2partsgenorder.append(stepname)
557 557 else:
558 558 b2partsgenorder.insert(idx, stepname)
559 559 return func
560 560 return dec
561 561
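# The optional ``idx`` lets a part generator cut in line, e.g. a
# hypothetical part registered to run before all others:
#
#   @b2partsgenerator('mypart', idx=0)
#   def _pushb2mypart(pushop, bundler):
#       bundler.newpart('mypart', data='')
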
562 562 def _pushb2ctxcheckheads(pushop, bundler):
563 563 """Generate race condition checking parts
564 564
565 565 Exists as an independent function to aid extensions
566 566 """
567 567 if not pushop.force:
568 568 bundler.newpart('check:heads', data=iter(pushop.remoteheads))
569 569
570 570 @b2partsgenerator('changeset')
571 571 def _pushb2ctx(pushop, bundler):
572 572 """handle changegroup push through bundle2
573 573
574 574 addchangegroup result is stored in the ``pushop.cgresult`` attribute.
575 575 """
576 576 if 'changesets' in pushop.stepsdone:
577 577 return
578 578 pushop.stepsdone.add('changesets')
579 579 # Send known heads to the server for race detection.
580 580 if not _pushcheckoutgoing(pushop):
581 581 return
582 582 pushop.repo.prepushoutgoinghooks(pushop.repo,
583 583 pushop.remote,
584 584 pushop.outgoing)
585 585
586 586 _pushb2ctxcheckheads(pushop, bundler)
587 587
588 588 b2caps = bundle2.bundle2caps(pushop.remote)
589 589 version = None
590 590 cgversions = b2caps.get('changegroup')
591 591 if not cgversions: # 3.1 and 3.2 ship with an empty value
592 592 cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
593 593 pushop.outgoing)
594 594 else:
595 595 cgversions = [v for v in cgversions if v in changegroup.packermap]
596 596 if not cgversions:
597 597 raise ValueError(_('no common changegroup version'))
598 598 version = max(cgversions)
599 599 cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
600 600 pushop.outgoing,
601 601 version=version)
602 602 cgpart = bundler.newpart('changegroup', data=cg)
603 603 if version is not None:
604 604 cgpart.addparam('version', version)
605 605 def handlereply(op):
606 606 """extract addchangegroup returns from server reply"""
607 607 cgreplies = op.records.getreplies(cgpart.id)
608 608 assert len(cgreplies['changegroup']) == 1
609 609 pushop.cgresult = cgreplies['changegroup'][0]['return']
610 610 return handlereply
611 611
612 612 @b2partsgenerator('phase')
613 613 def _pushb2phases(pushop, bundler):
614 614 """handle phase push through bundle2"""
615 615 if 'phases' in pushop.stepsdone:
616 616 return
617 617 b2caps = bundle2.bundle2caps(pushop.remote)
618 618 if not 'pushkey' in b2caps:
619 619 return
620 620 pushop.stepsdone.add('phases')
621 621 part2node = []
622 622
623 623 def handlefailure(pushop, exc):
624 624 targetid = int(exc.partid)
625 625 for partid, node in part2node:
626 626 if partid == targetid:
627 627 raise error.Abort(_('updating %s to public failed') % node)
628 628
629 629 enc = pushkey.encode
630 630 for newremotehead in pushop.outdatedphases:
631 631 part = bundler.newpart('pushkey')
632 632 part.addparam('namespace', enc('phases'))
633 633 part.addparam('key', enc(newremotehead.hex()))
634 634 part.addparam('old', enc(str(phases.draft)))
635 635 part.addparam('new', enc(str(phases.public)))
636 636 part2node.append((part.id, newremotehead))
637 637 pushop.pkfailcb[part.id] = handlefailure
638 638
639 639 def handlereply(op):
640 640 for partid, node in part2node:
641 641 partrep = op.records.getreplies(partid)
642 642 results = partrep['pushkey']
643 643 assert len(results) <= 1
644 644 msg = None
645 645 if not results:
646 646 msg = _('server ignored update of %s to public!\n') % node
647 647 elif not int(results[0]['return']):
648 648 msg = _('updating %s to public failed!\n') % node
649 649 if msg is not None:
650 650 pushop.ui.warn(msg)
651 651 return handlereply
652 652
653 653 @b2partsgenerator('obsmarkers')
654 654 def _pushb2obsmarkers(pushop, bundler):
655 655 if 'obsmarkers' in pushop.stepsdone:
656 656 return
657 657 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
658 658 if obsolete.commonversion(remoteversions) is None:
659 659 return
660 660 pushop.stepsdone.add('obsmarkers')
661 661 if pushop.outobsmarkers:
662 662 markers = sorted(pushop.outobsmarkers)
663 663 buildobsmarkerspart(bundler, markers)
664 664
665 665 @b2partsgenerator('bookmarks')
666 666 def _pushb2bookmarks(pushop, bundler):
667 667 """handle bookmark push through bundle2"""
668 668 if 'bookmarks' in pushop.stepsdone:
669 669 return
670 670 b2caps = bundle2.bundle2caps(pushop.remote)
671 671 if 'pushkey' not in b2caps:
672 672 return
673 673 pushop.stepsdone.add('bookmarks')
674 674 part2book = []
675 675 enc = pushkey.encode
676 676
677 677 def handlefailure(pushop, exc):
678 678 targetid = int(exc.partid)
679 679 for partid, book, action in part2book:
680 680 if partid == targetid:
681 681 raise error.Abort(bookmsgmap[action][1].rstrip() % book)
682 682 # we should not be called for a part we did not generate
683 683 assert False
684 684
685 685 for book, old, new in pushop.outbookmarks:
686 686 part = bundler.newpart('pushkey')
687 687 part.addparam('namespace', enc('bookmarks'))
688 688 part.addparam('key', enc(book))
689 689 part.addparam('old', enc(old))
690 690 part.addparam('new', enc(new))
691 691 action = 'update'
692 692 if not old:
693 693 action = 'export'
694 694 elif not new:
695 695 action = 'delete'
696 696 part2book.append((part.id, book, action))
697 697 pushop.pkfailcb[part.id] = handlefailure
698 698
699 699 def handlereply(op):
700 700 ui = pushop.ui
701 701 for partid, book, action in part2book:
702 702 partrep = op.records.getreplies(partid)
703 703 results = partrep['pushkey']
704 704 assert len(results) <= 1
705 705 if not results:
706 706 pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
707 707 else:
708 708 ret = int(results[0]['return'])
709 709 if ret:
710 710 ui.status(bookmsgmap[action][0] % book)
711 711 else:
712 712 ui.warn(bookmsgmap[action][1] % book)
713 713 if pushop.bkresult is not None:
714 714 pushop.bkresult = 1
715 715 return handlereply
716 716
717 717
718 718 def _pushbundle2(pushop):
719 719 """push data to the remote using bundle2
720 720
721 721 The only currently supported type of data is changegroup but this will
722 722 evolve in the future."""
723 723 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
724 724 pushback = (pushop.trmanager
725 725 and pushop.ui.configbool('experimental', 'bundle2.pushback'))
726 726
727 727 # create reply capability
728 728 capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo,
729 729 allowpushback=pushback))
730 730 bundler.newpart('replycaps', data=capsblob)
731 731 replyhandlers = []
732 732 for partgenname in b2partsgenorder:
733 733 partgen = b2partsgenmapping[partgenname]
734 734 ret = partgen(pushop, bundler)
735 735 if callable(ret):
736 736 replyhandlers.append(ret)
737 737 # do not push if nothing to push
738 738 if bundler.nbparts <= 1:
739 739 return
740 740 stream = util.chunkbuffer(bundler.getchunks())
741 741 try:
742 742 try:
743 743 reply = pushop.remote.unbundle(stream, ['force'], 'push')
744 744 except error.BundleValueError as exc:
745 745 raise error.Abort('missing support for %s' % exc)
746 746 try:
747 747 trgetter = None
748 748 if pushback:
749 749 trgetter = pushop.trmanager.transaction
750 750 op = bundle2.processbundle(pushop.repo, reply, trgetter)
751 751 except error.BundleValueError as exc:
752 752 raise error.Abort('missing support for %s' % exc)
753 753 except error.PushkeyFailed as exc:
754 754 partid = int(exc.partid)
755 755 if partid not in pushop.pkfailcb:
756 756 raise
757 757 pushop.pkfailcb[partid](pushop, exc)
758 758 for rephand in replyhandlers:
759 759 rephand(op)
760 760
761 761 def _pushchangeset(pushop):
762 762 """Make the actual push of changeset bundle to remote repo"""
763 763 if 'changesets' in pushop.stepsdone:
764 764 return
765 765 pushop.stepsdone.add('changesets')
766 766 if not _pushcheckoutgoing(pushop):
767 767 return
768 768 pushop.repo.prepushoutgoinghooks(pushop.repo,
769 769 pushop.remote,
770 770 pushop.outgoing)
771 771 outgoing = pushop.outgoing
772 772 unbundle = pushop.remote.capable('unbundle')
773 773 # TODO: get bundlecaps from remote
774 774 bundlecaps = None
775 775 # create a changegroup from local
776 776 if pushop.revs is None and not (outgoing.excluded
777 777 or pushop.repo.changelog.filteredrevs):
778 778 # push everything,
779 779 # use the fast path, no race possible on push
780 780 bundler = changegroup.cg1packer(pushop.repo, bundlecaps)
781 781 cg = changegroup.getsubset(pushop.repo,
782 782 outgoing,
783 783 bundler,
784 784 'push',
785 785 fastpath=True)
786 786 else:
787 787 cg = changegroup.getlocalchangegroup(pushop.repo, 'push', outgoing,
788 788 bundlecaps)
789 789
790 790 # apply changegroup to remote
791 791 if unbundle:
792 792 # local repo finds heads on server, finds out what
793 793 # revs it must push. once revs transferred, if server
794 794 # finds it has different heads (someone else won
795 795 # commit/push race), server aborts.
796 796 if pushop.force:
797 797 remoteheads = ['force']
798 798 else:
799 799 remoteheads = pushop.remoteheads
800 800 # ssh: return remote's addchangegroup()
801 801 # http: return remote's addchangegroup() or 0 for error
802 802 pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
803 803 pushop.repo.url())
804 804 else:
805 805 # we return an integer indicating remote head count
806 806 # change
807 807 pushop.cgresult = pushop.remote.addchangegroup(cg, 'push',
808 808 pushop.repo.url())
809 809
810 810 def _pushsyncphase(pushop):
811 811 """synchronise phase information locally and remotely"""
812 812 cheads = pushop.commonheads
813 813 # even when we don't push, exchanging phase data is useful
814 814 remotephases = pushop.remote.listkeys('phases')
815 815 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
816 816 and remotephases # server supports phases
817 817 and pushop.cgresult is None # nothing was pushed
818 818 and remotephases.get('publishing', False)):
819 819 # When:
820 820 # - this is a subrepo push
821 821 # - and the remote supports phases
822 822 # - and no changesets were pushed
823 823 # - and the remote is publishing
824 824 # We may be in the issue 3871 case!
825 825 # We drop the phase synchronisation that would otherwise be done
826 826 # as a courtesy, as it could publish changesets that are still
827 827 # draft locally.
828 828 remotephases = {'publishing': 'True'}
829 829 if not remotephases: # old server or public only reply from non-publishing
830 830 _localphasemove(pushop, cheads)
831 831 # don't push any phase data as there is nothing to push
832 832 else:
833 833 ana = phases.analyzeremotephases(pushop.repo, cheads,
834 834 remotephases)
835 835 pheads, droots = ana
836 836 ### Apply remote phase on local
837 837 if remotephases.get('publishing', False):
838 838 _localphasemove(pushop, cheads)
839 839 else: # publish = False
840 840 _localphasemove(pushop, pheads)
841 841 _localphasemove(pushop, cheads, phases.draft)
842 842 ### Apply local phase on remote
843 843
844 844 if pushop.cgresult:
845 845 if 'phases' in pushop.stepsdone:
846 846 # phases were already pushed through bundle2
847 847 return
848 848 outdated = pushop.outdatedphases
849 849 else:
850 850 outdated = pushop.fallbackoutdatedphases
851 851
852 852 pushop.stepsdone.add('phases')
853 853
854 854 # filter heads already turned public by the push
855 855 outdated = [c for c in outdated if c.node() not in pheads]
856 856 # fallback to independent pushkey command
857 857 for newremotehead in outdated:
858 858 r = pushop.remote.pushkey('phases',
859 859 newremotehead.hex(),
860 860 str(phases.draft),
861 861 str(phases.public))
862 862 if not r:
863 863 pushop.ui.warn(_('updating %s to public failed!\n')
864 864 % newremotehead)
865 865
866 866 def _localphasemove(pushop, nodes, phase=phases.public):
867 867 """move <nodes> to <phase> in the local source repo"""
868 868 if pushop.trmanager:
869 869 phases.advanceboundary(pushop.repo,
870 870 pushop.trmanager.transaction(),
871 871 phase,
872 872 nodes)
873 873 else:
874 874 # repo is not locked, do not change any phases!
875 875 # Inform the user that phases should have been moved when
876 876 # applicable.
877 877 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
878 878 phasestr = phases.phasenames[phase]
879 879 if actualmoves:
880 880 pushop.ui.status(_('cannot lock source repo, skipping '
881 881 'local %s phase update\n') % phasestr)
882 882
883 883 def _pushobsolete(pushop):
884 884 """utility function to push obsolete markers to a remote"""
885 885 if 'obsmarkers' in pushop.stepsdone:
886 886 return
887 887 repo = pushop.repo
888 888 remote = pushop.remote
889 889 pushop.stepsdone.add('obsmarkers')
890 890 if pushop.outobsmarkers:
891 891 pushop.ui.debug('try to push obsolete markers to remote\n')
892 892 rslts = []
893 893 remotedata = obsolete._pushkeyescape(sorted(pushop.outobsmarkers))
894 894 for key in sorted(remotedata, reverse=True):
895 895 # reverse sort to ensure we end with dump0
896 896 data = remotedata[key]
897 897 rslts.append(remote.pushkey('obsolete', key, '', data))
898 898 if [r for r in rslts if not r]:
899 899 msg = _('failed to push some obsolete markers!\n')
900 900 repo.ui.warn(msg)
901 901
902 902 def _pushbookmark(pushop):
903 903 """Update bookmark position on remote"""
904 904 if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
905 905 return
906 906 pushop.stepsdone.add('bookmarks')
907 907 ui = pushop.ui
908 908 remote = pushop.remote
909 909
910 910 for b, old, new in pushop.outbookmarks:
911 911 action = 'update'
912 912 if not old:
913 913 action = 'export'
914 914 elif not new:
915 915 action = 'delete'
916 916 if remote.pushkey('bookmarks', b, old, new):
917 917 ui.status(bookmsgmap[action][0] % b)
918 918 else:
919 919 ui.warn(bookmsgmap[action][1] % b)
920 920 # discovery can have set the value from an invalid entry
921 921 if pushop.bkresult is not None:
922 922 pushop.bkresult = 1
923 923
924 924 class pulloperation(object):
925 925 """A object that represent a single pull operation
926 926
927 927 It purpose is to carry pull related state and very common operation.
928 928
929 929 A new should be created at the beginning of each pull and discarded
930 930 afterward.
931 931 """
932 932
933 933 def __init__(self, repo, remote, heads=None, force=False, bookmarks=(),
934 934 remotebookmarks=None, streamclonerequested=None):
935 935 # repo we pull into
936 936 self.repo = repo
937 937 # repo we pull from
938 938 self.remote = remote
939 939 # revisions we try to pull (None is "all")
940 940 self.heads = heads
941 941 # bookmarks pulled explicitly
942 942 self.explicitbookmarks = bookmarks
943 943 # do we force pull?
944 944 self.force = force
945 945 # whether a streaming clone was requested
946 946 self.streamclonerequested = streamclonerequested
947 947 # transaction manager
948 948 self.trmanager = None
949 949 # set of common changesets between local and remote before pull
950 950 self.common = None
951 951 # set of pulled heads
952 952 self.rheads = None
953 953 # list of missing changesets to fetch remotely
954 954 self.fetch = None
955 955 # remote bookmarks data
956 956 self.remotebookmarks = remotebookmarks
957 957 # result of changegroup pulling (used as return code by pull)
958 958 self.cgresult = None
959 959 # set of steps already performed
960 960 self.stepsdone = set()
961 961 # Whether we attempted a clone from pre-generated bundles.
962 962 self.clonebundleattempted = False
963 963
964 964 @util.propertycache
965 965 def pulledsubset(self):
966 966 """heads of the set of changeset target by the pull"""
967 967 # compute target subset
968 968 if self.heads is None:
969 969 # We pulled everything possible
970 970 # sync on everything common
971 971 c = set(self.common)
972 972 ret = list(self.common)
973 973 for n in self.rheads:
974 974 if n not in c:
975 975 ret.append(n)
976 976 return ret
977 977 else:
978 978 # We pulled a specific subset
979 979 # sync on this subset
980 980 return self.heads
981 981
982 982 @util.propertycache
983 983 def canusebundle2(self):
984 984 return _canusebundle2(self)
985 985
986 986 @util.propertycache
987 987 def remotebundle2caps(self):
988 988 return bundle2.bundle2caps(self.remote)
989 989
990 990 def gettransaction(self):
991 991 # deprecated; talk to trmanager directly
992 992 return self.trmanager.transaction()
993 993
994 994 class transactionmanager(object):
995 995 """An object to manage the life cycle of a transaction
996 996
997 997 It creates the transaction on demand and calls the appropriate hooks when
998 998 closing the transaction."""
999 999 def __init__(self, repo, source, url):
1000 1000 self.repo = repo
1001 1001 self.source = source
1002 1002 self.url = url
1003 1003 self._tr = None
1004 1004
1005 1005 def transaction(self):
1006 1006 """Return an open transaction object, constructing if necessary"""
1007 1007 if not self._tr:
1008 1008 trname = '%s\n%s' % (self.source, util.hidepassword(self.url))
1009 1009 self._tr = self.repo.transaction(trname)
1010 1010 self._tr.hookargs['source'] = self.source
1011 1011 self._tr.hookargs['url'] = self.url
1012 1012 return self._tr
1013 1013
1014 1014 def close(self):
1015 1015 """close transaction if created"""
1016 1016 if self._tr is not None:
1017 1017 self._tr.close()
1018 1018
1019 1019 def release(self):
1020 1020 """release transaction if created"""
1021 1021 if self._tr is not None:
1022 1022 self._tr.release()
1023 1023
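# Typical life cycle, mirroring its use in push() and pull():
#
#   trmanager = transactionmanager(repo, 'pull', remote.url())
#   try:
#       tr = trmanager.transaction()   # created lazily on first use
#       # ... apply incoming data within tr ...
#       trmanager.close()              # commit and run hooks
#   finally:
#       trmanager.release()            # abort if not closed
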
1024 1024 def pull(repo, remote, heads=None, force=False, bookmarks=(), opargs=None,
1025 1025 streamclonerequested=None):
1026 1026 """Fetch repository data from a remote.
1027 1027
1028 1028 This is the main function used to retrieve data from a remote repository.
1029 1029
1030 1030 ``repo`` is the local repository to clone into.
1031 1031 ``remote`` is a peer instance.
1032 1032 ``heads`` is an iterable of revisions we want to pull. ``None`` (the
1033 1033 default) means to pull everything from the remote.
1034 1034 ``bookmarks`` is an iterable of bookmarks requested to be pulled. By
1035 1035 default, all remote bookmarks are pulled.
1036 1036 ``opargs`` are additional keyword arguments to pass to ``pulloperation``
1037 1037 initialization.
1038 1038 ``streamclonerequested`` is a boolean indicating whether a "streaming
1039 1039 clone" is requested. A "streaming clone" is essentially a raw file copy
1040 1040 of revlogs from the server. This only works when the local repository is
1041 1041 empty. The default value of ``None`` means to respect the server
1042 1042 configuration for preferring stream clones.
1043 1043
1044 1044 Returns the ``pulloperation`` created for this pull.
1045 1045 """
1046 1046 if opargs is None:
1047 1047 opargs = {}
1048 1048 pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks,
1049 1049 streamclonerequested=streamclonerequested, **opargs)
1050 1050 if pullop.remote.local():
1051 1051 missing = set(pullop.remote.requirements) - pullop.repo.supported
1052 1052 if missing:
1053 1053 msg = _("required features are not"
1054 1054 " supported in the destination:"
1055 1055 " %s") % (', '.join(sorted(missing)))
1056 1056 raise error.Abort(msg)
1057 1057
1058 1058 lock = pullop.repo.lock()
1059 1059 try:
1060 1060 pullop.trmanager = transactionmanager(repo, 'pull', remote.url())
1061 1061 streamclone.maybeperformlegacystreamclone(pullop)
1062 1062 # This should ideally be in _pullbundle2(). However, it needs to run
1063 1063 # before discovery to avoid extra work.
1064 1064 _maybeapplyclonebundle(pullop)
1065 1065 _pulldiscovery(pullop)
1066 1066 if pullop.canusebundle2:
1067 1067 _pullbundle2(pullop)
1068 1068 _pullchangeset(pullop)
1069 1069 _pullphase(pullop)
1070 1070 _pullbookmarks(pullop)
1071 1071 _pullobsolete(pullop)
1072 1072 pullop.trmanager.close()
1073 1073 finally:
1074 1074 pullop.trmanager.release()
1075 1075 lock.release()
1076 1076
1077 1077 return pullop
1078 1078
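# Minimal usage sketch (peers assumed to come from the caller):
#
#   pullop = pull(repo, remote)          # heads=None pulls everything
#   if pullop.cgresult == 0:             # no changes were found
#       repo.ui.note('already up to date\n')
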
1079 1079 # list of steps to perform discovery before pull
1080 1080 pulldiscoveryorder = []
1081 1081
1082 1082 # Mapping between step name and function
1083 1083 #
1084 1084 # This exists to help extensions wrap steps if necessary
1085 1085 pulldiscoverymapping = {}
1086 1086
1087 1087 def pulldiscovery(stepname):
1088 1088 """decorator for function performing discovery before pull
1089 1089
1090 1090 The function is added to the step -> function mapping and appended to the
1091 1091 list of steps. Beware that decorated functions will be added in order (this
1092 1092 may matter).
1093 1093
1094 1094 You can only use this decorator for a new step; if you want to wrap a step
1095 1095 from an extension, change the pulldiscoverymapping dictionary directly."""
1096 1096 def dec(func):
1097 1097 assert stepname not in pulldiscoverymapping
1098 1098 pulldiscoverymapping[stepname] = func
1099 1099 pulldiscoveryorder.append(stepname)
1100 1100 return func
1101 1101 return dec
1102 1102
1103 1103 def _pulldiscovery(pullop):
1104 1104 """Run all discovery steps"""
1105 1105 for stepname in pulldiscoveryorder:
1106 1106 step = pulldiscoverymapping[stepname]
1107 1107 step(pullop)
1108 1108
1109 1109 @pulldiscovery('b1:bookmarks')
1110 1110 def _pullbookmarkbundle1(pullop):
1111 1111 """fetch bookmark data in bundle1 case
1112 1112
1113 1113 If not using bundle2, we have to fetch bookmarks before changeset
1114 1114 discovery to reduce the chance and impact of race conditions."""
1115 1115 if pullop.remotebookmarks is not None:
1116 1116 return
1117 1117 if pullop.canusebundle2 and 'listkeys' in pullop.remotebundle2caps:
1118 1118 # all known bundle2 servers now support listkeys, but let's be nice to
1119 1119 # new implementations.
1120 1120 return
1121 1121 pullop.remotebookmarks = pullop.remote.listkeys('bookmarks')
1122 1122
1123 1123
1124 1124 @pulldiscovery('changegroup')
1125 1125 def _pulldiscoverychangegroup(pullop):
1126 1126 """discovery phase for the pull
1127 1127
1128 1128 Currently handles changeset discovery only; it will handle all discovery
1129 1129 at some point."""
1130 1130 tmp = discovery.findcommonincoming(pullop.repo,
1131 1131 pullop.remote,
1132 1132 heads=pullop.heads,
1133 1133 force=pullop.force)
1134 1134 common, fetch, rheads = tmp
1135 1135 nm = pullop.repo.unfiltered().changelog.nodemap
1136 1136 if fetch and rheads:
1137 1137 # If a remote head is filtered locally, let's drop it from the unknown
1138 1138 # remote heads and put it back in common.
1139 1139 #
1140 1140 # This is a hackish solution to catch most of the "common but locally
1141 1141 # hidden" situations. We do not perform discovery on the unfiltered
1142 1142 # repository because it ends up doing a pathological amount of round
1143 1143 # trips for a huge amount of changesets we do not care about.
1144 1144 #
1145 1145 # If a set of such "common but filtered" changesets exists on the server
1146 1146 # but does not include a remote head, we'll not be able to detect it.
1147 1147 scommon = set(common)
1148 1148 filteredrheads = []
1149 1149 for n in rheads:
1150 1150 if n in nm:
1151 1151 if n not in scommon:
1152 1152 common.append(n)
1153 1153 else:
1154 1154 filteredrheads.append(n)
1155 1155 if not filteredrheads:
1156 1156 fetch = []
1157 1157 rheads = filteredrheads
1158 1158 pullop.common = common
1159 1159 pullop.fetch = fetch
1160 1160 pullop.rheads = rheads
1161 1161
1162 1162 def _pullbundle2(pullop):
1163 1163 """pull data using bundle2
1164 1164
1165 1165 For now, the only supported data is the changegroup."""
1166 1166 kwargs = {'bundlecaps': caps20to10(pullop.repo)}
1167 1167
1168 1168 streaming, streamreqs = streamclone.canperformstreamclone(pullop)
1169 1169
1170 1170 # pulling changegroup
1171 1171 pullop.stepsdone.add('changegroup')
1172 1172
1173 1173 kwargs['common'] = pullop.common
1174 1174 kwargs['heads'] = pullop.heads or pullop.rheads
1175 1175 kwargs['cg'] = pullop.fetch
1176 1176 if 'listkeys' in pullop.remotebundle2caps:
1177 1177 kwargs['listkeys'] = ['phase']
1178 1178 if pullop.remotebookmarks is None:
1179 1179 # make sure to always include bookmark data when migrating
1180 1180 # `hg incoming --bundle` to using this function.
1181 1181 kwargs['listkeys'].append('bookmarks')
1182
1183 # If this is a full pull / clone and the server supports the clone bundles
1184 # feature, tell the server whether we attempted a clone bundle. The
1185 # presence of this flag indicates the client supports clone bundles. This
1186 # will enable the server to treat clients that support clone bundles
1187 # differently from those that don't.
1188 if (pullop.remote.capable('clonebundles')
1189 and pullop.heads is None and list(pullop.common) == [nullid]):
1190 kwargs['cbattempted'] = pullop.clonebundleattempted
1191
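# On the wire this adds a ``cbattempted`` boolean argument to the
# ``getbundle`` command; a hypothetical server-side consumer could then
# distinguish the two client populations with something like:
#
#   clientsupportsclonebundles = bool(kwargs.get('cbattempted'))
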
1182 1192 if streaming:
1183 1193 pullop.repo.ui.status(_('streaming all changes\n'))
1184 1194 elif not pullop.fetch:
1185 1195 pullop.repo.ui.status(_("no changes found\n"))
1186 1196 pullop.cgresult = 0
1187 1197 else:
1188 1198 if pullop.heads is None and list(pullop.common) == [nullid]:
1189 1199 pullop.repo.ui.status(_("requesting all changes\n"))
1190 1200 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1191 1201 remoteversions = bundle2.obsmarkersversion(pullop.remotebundle2caps)
1192 1202 if obsolete.commonversion(remoteversions) is not None:
1193 1203 kwargs['obsmarkers'] = True
1194 1204 pullop.stepsdone.add('obsmarkers')
1195 1205 _pullbundle2extraprepare(pullop, kwargs)
1196 1206 bundle = pullop.remote.getbundle('pull', **kwargs)
1197 1207 try:
1198 1208 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
1199 1209 except error.BundleValueError as exc:
1200 1210 raise error.Abort('missing support for %s' % exc)
1201 1211
1202 1212 if pullop.fetch:
1203 1213 results = [cg['return'] for cg in op.records['changegroup']]
1204 1214 pullop.cgresult = changegroup.combineresults(results)
1205 1215
1206 1216 # processing phases change
1207 1217 for namespace, value in op.records['listkeys']:
1208 1218 if namespace == 'phases':
1209 1219 _pullapplyphases(pullop, value)
1210 1220
1211 1221 # processing bookmark update
1212 1222 for namespace, value in op.records['listkeys']:
1213 1223 if namespace == 'bookmarks':
1214 1224 pullop.remotebookmarks = value
1215 1225
1216 1226 # bookmark data were either already there or pulled in the bundle
1217 1227 if pullop.remotebookmarks is not None:
1218 1228 _pullbookmarks(pullop)
1219 1229
1220 1230 def _pullbundle2extraprepare(pullop, kwargs):
1221 1231 """hook function so that extensions can extend the getbundle call"""
1222 1232 pass
1223 1233
1224 1234 def _pullchangeset(pullop):
1225 1235 """pull changeset from unbundle into the local repo"""
1226 1236 # We delay opening the transaction as late as possible so we
1227 1237 # don't open a transaction for nothing and don't break future useful
1228 1238 # rollback calls
1229 1239 if 'changegroup' in pullop.stepsdone:
1230 1240 return
1231 1241 pullop.stepsdone.add('changegroup')
1232 1242 if not pullop.fetch:
1233 1243 pullop.repo.ui.status(_("no changes found\n"))
1234 1244 pullop.cgresult = 0
1235 1245 return
1236 1246 pullop.gettransaction()
1237 1247 if pullop.heads is None and list(pullop.common) == [nullid]:
1238 1248 pullop.repo.ui.status(_("requesting all changes\n"))
1239 1249 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
1240 1250 # issue1320, avoid a race if remote changed after discovery
1241 1251 pullop.heads = pullop.rheads
1242 1252
1243 1253 if pullop.remote.capable('getbundle'):
1244 1254 # TODO: get bundlecaps from remote
1245 1255 cg = pullop.remote.getbundle('pull', common=pullop.common,
1246 1256 heads=pullop.heads or pullop.rheads)
1247 1257 elif pullop.heads is None:
1248 1258 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
1249 1259 elif not pullop.remote.capable('changegroupsubset'):
1250 1260 raise error.Abort(_("partial pull cannot be done because "
1251 1261 "other repository doesn't support "
1252 1262 "changegroupsubset."))
1253 1263 else:
1254 1264 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
1255 1265 pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
1256 1266 pullop.remote.url())
1257 1267
1258 1268 def _pullphase(pullop):
1259 1269 # Get remote phases data from remote
1260 1270 if 'phases' in pullop.stepsdone:
1261 1271 return
1262 1272 remotephases = pullop.remote.listkeys('phases')
1263 1273 _pullapplyphases(pullop, remotephases)
1264 1274
1265 1275 def _pullapplyphases(pullop, remotephases):
1266 1276 """apply phase movement from observed remote state"""
1267 1277 if 'phases' in pullop.stepsdone:
1268 1278 return
1269 1279 pullop.stepsdone.add('phases')
1270 1280 publishing = bool(remotephases.get('publishing', False))
1271 1281 if remotephases and not publishing:
1272 1282 # remote is new and non-publishing
1273 1283 pheads, _dr = phases.analyzeremotephases(pullop.repo,
1274 1284 pullop.pulledsubset,
1275 1285 remotephases)
1276 1286 dheads = pullop.pulledsubset
1277 1287 else:
1278 1288 # Remote is old or publishing all common changesets
1279 1289 # should be seen as public
1280 1290 pheads = pullop.pulledsubset
1281 1291 dheads = []
1282 1292 unfi = pullop.repo.unfiltered()
1283 1293 phase = unfi._phasecache.phase
1284 1294 rev = unfi.changelog.nodemap.get
1285 1295 public = phases.public
1286 1296 draft = phases.draft
1287 1297
1288 1298 # exclude changesets already public locally and update the others
1289 1299 pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
1290 1300 if pheads:
1291 1301 tr = pullop.gettransaction()
1292 1302 phases.advanceboundary(pullop.repo, tr, public, pheads)
1293 1303
1294 1304 # exclude changesets already draft locally and update the others
1295 1305 dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
1296 1306 if dheads:
1297 1307 tr = pullop.gettransaction()
1298 1308 phases.advanceboundary(pullop.repo, tr, draft, dheads)
1299 1309
1300 1310 def _pullbookmarks(pullop):
1301 1311 """process the remote bookmark information to update the local one"""
1302 1312 if 'bookmarks' in pullop.stepsdone:
1303 1313 return
1304 1314 pullop.stepsdone.add('bookmarks')
1305 1315 repo = pullop.repo
1306 1316 remotebookmarks = pullop.remotebookmarks
1307 1317 bookmod.updatefromremote(repo.ui, repo, remotebookmarks,
1308 1318 pullop.remote.url(),
1309 1319 pullop.gettransaction,
1310 1320 explicit=pullop.explicitbookmarks)
1311 1321
1312 1322 def _pullobsolete(pullop):
1313 1323 """utility function to pull obsolete markers from a remote
1314 1324
1315 1325 `gettransaction` is a function that returns the pull transaction, creating
1316 1326 one if necessary. We return the transaction to inform the calling code that
1317 1327 a new transaction has been created (when applicable).
1318 1328
1319 1329 Exists mostly to allow overriding for experimentation purposes."""
1320 1330 if 'obsmarkers' in pullop.stepsdone:
1321 1331 return
1322 1332 pullop.stepsdone.add('obsmarkers')
1323 1333 tr = None
1324 1334 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1325 1335 pullop.repo.ui.debug('fetching remote obsolete markers\n')
1326 1336 remoteobs = pullop.remote.listkeys('obsolete')
1327 1337 if 'dump0' in remoteobs:
1328 1338 tr = pullop.gettransaction()
1329 1339 for key in sorted(remoteobs, reverse=True):
1330 1340 if key.startswith('dump'):
1331 1341 data = base85.b85decode(remoteobs[key])
1332 1342 pullop.repo.obsstore.mergemarkers(tr, data)
1333 1343 pullop.repo.invalidatevolatilesets()
1334 1344 return tr
1335 1345
1336 1346 def caps20to10(repo):
1337 1347 """return a set with appropriate options to use bundle20 during getbundle"""
1338 1348 caps = set(['HG20'])
1339 1349 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
1340 1350 caps.add('bundle2=' + urllib.quote(capsblob))
1341 1351 return caps
1342 1352
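# The resulting set looks like (a sketch; the quoted blob varies per repo):
#
#   set(['HG20', 'bundle2=HG20%0Achangegroup%3D01%2C02...'])
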
1343 1353 # List of names of steps to perform for a bundle2 for getbundle, order matters.
1344 1354 getbundle2partsorder = []
1345 1355
1346 1356 # Mapping between step name and function
1347 1357 #
1348 1358 # This exists to help extensions wrap steps if necessary
1349 1359 getbundle2partsmapping = {}
1350 1360
1351 1361 def getbundle2partsgenerator(stepname, idx=None):
1352 1362 """decorator for function generating bundle2 part for getbundle
1353 1363
1354 1364 The function is added to the step -> function mapping and appended to the
1355 1365 list of steps. Beware that decorated functions will be added in order
1356 1366 (this may matter).
1357 1367
1358 1368 You can only use this decorator for new steps; if you want to wrap a step
1359 1369 from an extension, change the getbundle2partsmapping dictionary directly."""
1360 1370 def dec(func):
1361 1371 assert stepname not in getbundle2partsmapping
1362 1372 getbundle2partsmapping[stepname] = func
1363 1373 if idx is None:
1364 1374 getbundle2partsorder.append(stepname)
1365 1375 else:
1366 1376 getbundle2partsorder.insert(idx, stepname)
1367 1377 return func
1368 1378 return dec
1369 1379
1370 1380 def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
1371 1381 **kwargs):
1372 1382 """return a full bundle (with potentially multiple kind of parts)
1373 1383
1374 1384 Could be a bundle HG10 or a bundle HG20 depending on the bundlecaps
1375 1385 passed. For now, the bundle can contain only a changegroup, but this will
1376 1386 change when more part types become available for bundle2.
1377 1387
1378 1388 This is different from changegroup.getchangegroup, which only returns an
1379 1389 HG10 changegroup bundle. They may eventually get reunited in the future
1380 1390 when we have a clearer idea of the API we want to use to query different data.
1381 1391
1382 1392 The implementation is at a very early stage and will get massive rework
1383 1393 when the API of bundle is refined.
1384 1394 """
1385 1395 # bundle10 case
1386 1396 usebundle2 = False
1387 1397 if bundlecaps is not None:
1388 1398 usebundle2 = any((cap.startswith('HG2') for cap in bundlecaps))
1389 1399 if not usebundle2:
1390 1400 if bundlecaps and not kwargs.get('cg', True):
1391 1401 raise ValueError(_('request for bundle10 must include changegroup'))
1392 1402
1393 1403 if kwargs:
1394 1404 raise ValueError(_('unsupported getbundle arguments: %s')
1395 1405 % ', '.join(sorted(kwargs.keys())))
1396 1406 return changegroup.getchangegroup(repo, source, heads=heads,
1397 1407 common=common, bundlecaps=bundlecaps)
1398 1408
1399 1409 # bundle20 case
1400 1410 b2caps = {}
1401 1411 for bcaps in bundlecaps:
1402 1412 if bcaps.startswith('bundle2='):
1403 1413 blob = urllib.unquote(bcaps[len('bundle2='):])
1404 1414 b2caps.update(bundle2.decodecaps(blob))
1405 1415 bundler = bundle2.bundle20(repo.ui, b2caps)
1406 1416
1407 1417 kwargs['heads'] = heads
1408 1418 kwargs['common'] = common
1409 1419
1410 1420 for name in getbundle2partsorder:
1411 1421 func = getbundle2partsmapping[name]
1412 1422 func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
1413 1423 **kwargs)
1414 1424
1415 1425 return util.chunkbuffer(bundler.getchunks())
1416 1426
1417 1427 @getbundle2partsgenerator('changegroup')
1418 1428 def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
1419 1429 b2caps=None, heads=None, common=None, **kwargs):
1420 1430 """add a changegroup part to the requested bundle"""
1421 1431 cg = None
1422 1432 if kwargs.get('cg', True):
1423 1433 # build changegroup bundle here.
1424 1434 version = None
1425 1435 cgversions = b2caps.get('changegroup')
1426 1436 getcgkwargs = {}
1427 1437 if cgversions: # 3.1 and 3.2 ship with an empty value
1428 1438 cgversions = [v for v in cgversions if v in changegroup.packermap]
1429 1439 if not cgversions:
1430 1440 raise ValueError(_('no common changegroup version'))
1431 1441 version = getcgkwargs['version'] = max(cgversions)
1432 1442 outgoing = changegroup.computeoutgoing(repo, heads, common)
1433 1443 cg = changegroup.getlocalchangegroupraw(repo, source, outgoing,
1434 1444 bundlecaps=bundlecaps,
1435 1445 **getcgkwargs)
1436 1446
1437 1447 if cg:
1438 1448 part = bundler.newpart('changegroup', data=cg)
1439 1449 if version is not None:
1440 1450 part.addparam('version', version)
1441 1451 part.addparam('nbchanges', str(len(outgoing.missing)), mandatory=False)
1442 1452
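The version negotiation above boils down to intersecting the client's advertised changegroup versions with the versions the server can pack, then taking the maximum. In isolation (sample values; serverversions stands in for changegroup.packermap keys):

clientversions = ['01', '02']        # from b2caps['changegroup']
serverversions = ['01', '02']        # stand-in for changegroup.packermap
common = [v for v in clientversions if v in serverversions]
if not common:
    raise ValueError('no common changegroup version')
version = max(common)                # -> '02'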
1443 1453 @getbundle2partsgenerator('listkeys')
1444 1454 def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
1445 1455 b2caps=None, **kwargs):
1446 1456 """add parts containing listkeys namespaces to the requested bundle"""
1447 1457 listkeys = kwargs.get('listkeys', ())
1448 1458 for namespace in listkeys:
1449 1459 part = bundler.newpart('listkeys')
1450 1460 part.addparam('namespace', namespace)
1451 1461 keys = repo.listkeys(namespace).items()
1452 1462 part.data = pushkey.encodekeys(keys)
1453 1463
1454 1464 @getbundle2partsgenerator('obsmarkers')
1455 1465 def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
1456 1466 b2caps=None, heads=None, **kwargs):
1457 1467 """add an obsolescence markers part to the requested bundle"""
1458 1468 if kwargs.get('obsmarkers', False):
1459 1469 if heads is None:
1460 1470 heads = repo.heads()
1461 1471 subset = [c.node() for c in repo.set('::%ln', heads)]
1462 1472 markers = repo.obsstore.relevantmarkers(subset)
1463 1473 markers = sorted(markers)
1464 1474 buildobsmarkerspart(bundler, markers)
1465 1475
1466 1476 @getbundle2partsgenerator('hgtagsfnodes')
1467 1477 def _getbundletagsfnodes(bundler, repo, source, bundlecaps=None,
1468 1478 b2caps=None, heads=None, common=None,
1469 1479 **kwargs):
1470 1480 """Transfer the .hgtags filenodes mapping.
1471 1481
1472 1482 Only values for heads in this bundle will be transferred.
1473 1483
1474 1484 The part data consists of pairs of 20 byte changeset node and .hgtags
1475 1485 filenodes raw values.
1476 1486 """
1477 1487 # Don't send unless:
1478 1488 # - changesets are being exchanged,
1479 1489 # - the client supports it.
1480 1490 if not (kwargs.get('cg', True) and 'hgtagsfnodes' in b2caps):
1481 1491 return
1482 1492
1483 1493 outgoing = changegroup.computeoutgoing(repo, heads, common)
1484 1494
1485 1495 if not outgoing.missingheads:
1486 1496 return
1487 1497
1488 1498 cache = tags.hgtagsfnodescache(repo.unfiltered())
1489 1499 chunks = []
1490 1500
1491 1501 # .hgtags fnodes are only relevant for head changesets. While we could
1492 1502 # transfer values for all known nodes, there will likely be little to
1493 1503 # no benefit.
1494 1504 #
1495 1505 # We don't bother using a generator to produce output data because
1496 1506 # a) we only have 40 bytes per head and even esoteric numbers of heads
1497 1507 # consume little memory (1M heads is 40MB) b) we don't want to send the
1498 1508 # part if we don't have entries and knowing if we have entries requires
1499 1509 # cache lookups.
1500 1510 for node in outgoing.missingheads:
1501 1511 # Don't compute missing, as this may slow down serving.
1502 1512 fnode = cache.getfnode(node, computemissing=False)
1503 1513 if fnode is not None:
1504 1514 chunks.extend([node, fnode])
1505 1515
1506 1516 if chunks:
1507 1517 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1508 1518
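Since the payload is a flat concatenation of fixed-width pairs, a receiver can walk it in 40-byte strides. A hypothetical decode-side sketch (not the actual consumer code):

def iterfnodepairs(data):
    # data is the part payload: (20-byte changeset node,
    # 20-byte .hgtags filenode) pairs, as assembled in chunks above.
    assert len(data) % 40 == 0
    for offset in range(0, len(data), 40):
        yield data[offset:offset + 20], data[offset + 20:offset + 40]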
1509 1519 def check_heads(repo, their_heads, context):
1510 1520 """check if the heads of a repo have been modified
1511 1521
1512 1522 Used by peer for unbundling.
1513 1523 """
1514 1524 heads = repo.heads()
1515 1525 heads_hash = util.sha1(''.join(sorted(heads))).digest()
1516 1526 if not (their_heads == ['force'] or their_heads == heads or
1517 1527 their_heads == ['hashed', heads_hash]):
1518 1528 # someone else committed/pushed/unbundled while we
1519 1529 # were transferring data
1520 1530 raise error.PushRaced('repository changed while %s - '
1521 1531 'please try again' % context)
1522 1532
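Both ends must derive the same race-detection token: the client (see wirepeer.unbundle in the wireproto hunk below) and check_heads() hash the sorted binary heads. A standalone sketch using hashlib in place of util.sha1:

import hashlib

def hashheads(heads):
    # heads is a list of 20-byte binary node ids; both sides hash the
    # sorted concatenation so ordering differences don't matter.
    return hashlib.sha1(b''.join(sorted(heads))).digest()

server_heads = [b'\x11' * 20, b'\x22' * 20]
token = ['hashed', hashheads(server_heads)]
assert token == ['hashed', hashheads(list(reversed(server_heads)))]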
1523 1533 def unbundle(repo, cg, heads, source, url):
1524 1534 """Apply a bundle to a repo.
1525 1535
1526 1536 This function makes sure the repo is locked during the application and has
1527 1537 a mechanism to check that no push race occurred between the creation of the
1528 1538 bundle and its application.
1529 1539
1530 1540 If the push was raced, a PushRaced exception is raised."""
1531 1541 r = 0
1532 1542 # need a transaction when processing a bundle2 stream
1533 1543 # [wlock, lock, tr] - needs to be an array so nested functions can modify it
1534 1544 lockandtr = [None, None, None]
1535 1545 recordout = None
1536 1546 # quick fix for output mismatch with bundle2 in 3.4
1537 1547 captureoutput = repo.ui.configbool('experimental', 'bundle2-output-capture',
1538 1548 False)
1539 1549 if url.startswith('remote:http:') or url.startswith('remote:https:'):
1540 1550 captureoutput = True
1541 1551 try:
1542 1552 check_heads(repo, heads, 'uploading changes')
1543 1553 # push can proceed
1544 1554 if util.safehasattr(cg, 'params'):
1545 1555 r = None
1546 1556 try:
1547 1557 def gettransaction():
1548 1558 if not lockandtr[2]:
1549 1559 lockandtr[0] = repo.wlock()
1550 1560 lockandtr[1] = repo.lock()
1551 1561 lockandtr[2] = repo.transaction(source)
1552 1562 lockandtr[2].hookargs['source'] = source
1553 1563 lockandtr[2].hookargs['url'] = url
1554 1564 lockandtr[2].hookargs['bundle2'] = '1'
1555 1565 return lockandtr[2]
1556 1566
1557 1567 # Do greedy locking by default until we're satisfied with lazy
1558 1568 # locking.
1559 1569 if not repo.ui.configbool('experimental', 'bundle2lazylocking'):
1560 1570 gettransaction()
1561 1571
1562 1572 op = bundle2.bundleoperation(repo, gettransaction,
1563 1573 captureoutput=captureoutput)
1564 1574 try:
1565 1575 op = bundle2.processbundle(repo, cg, op=op)
1566 1576 finally:
1567 1577 r = op.reply
1568 1578 if captureoutput and r is not None:
1569 1579 repo.ui.pushbuffer(error=True, subproc=True)
1570 1580 def recordout(output):
1571 1581 r.newpart('output', data=output, mandatory=False)
1572 1582 if lockandtr[2] is not None:
1573 1583 lockandtr[2].close()
1574 1584 except BaseException as exc:
1575 1585 exc.duringunbundle2 = True
1576 1586 if captureoutput and r is not None:
1577 1587 parts = exc._bundle2salvagedoutput = r.salvageoutput()
1578 1588 def recordout(output):
1579 1589 part = bundle2.bundlepart('output', data=output,
1580 1590 mandatory=False)
1581 1591 parts.append(part)
1582 1592 raise
1583 1593 else:
1584 1594 lockandtr[1] = repo.lock()
1585 1595 r = changegroup.addchangegroup(repo, cg, source, url)
1586 1596 finally:
1587 1597 lockmod.release(lockandtr[2], lockandtr[1], lockandtr[0])
1588 1598 if recordout is not None:
1589 1599 recordout(repo.ui.popbuffer())
1590 1600 return r
1591 1601
1592 1602 def _maybeapplyclonebundle(pullop):
1593 1603 """Apply a clone bundle from a remote, if possible."""
1594 1604
1595 1605 repo = pullop.repo
1596 1606 remote = pullop.remote
1597 1607
1598 1608 if not repo.ui.configbool('experimental', 'clonebundles', False):
1599 1609 return
1600 1610
1601 1611 if pullop.heads:
1602 1612 return
1603 1613
1604 1614 if not remote.capable('clonebundles'):
1605 1615 return
1606 1616
1607 1617 res = remote._call('clonebundles')
1608 1618
1609 1619 # If we call the wire protocol command, that's good enough to record the
1610 1620 # attempt.
1611 1621 pullop.clonebundleattempted = True
1612 1622
1613 1623 entries = parseclonebundlesmanifest(repo, res)
1614 1624 if not entries:
1615 1625 repo.ui.note(_('no clone bundles available on remote; '
1616 1626 'falling back to regular clone\n'))
1617 1627 return
1618 1628
1619 1629 entries = filterclonebundleentries(repo, entries)
1620 1630 if not entries:
1621 1631 # There is a thundering herd concern here. However, if a server
1622 1632 # operator doesn't advertise bundles appropriate for its clients,
1623 1633 # they deserve what's coming. Furthermore, from a client's
1624 1634 # perspective, no automatic fallback would mean not being able to
1625 1635 # clone!
1626 1636 repo.ui.warn(_('no compatible clone bundles available on server; '
1627 1637 'falling back to regular clone\n'))
1628 1638 repo.ui.warn(_('(you may want to report this to the server '
1629 1639 'operator)\n'))
1630 1640 return
1631 1641
1632 1642 entries = sortclonebundleentries(repo.ui, entries)
1633 1643
1634 1644 url = entries[0]['URL']
1635 1645 repo.ui.status(_('applying clone bundle from %s\n') % url)
1636 1646 if trypullbundlefromurl(repo.ui, repo, url):
1637 1647 repo.ui.status(_('finished applying clone bundle\n'))
1638 1648 # Bundle failed.
1639 1649 #
1640 1650 # We abort by default to avoid the thundering herd of
1641 1651 # clients flooding a server that was expecting expensive
1642 1652 # clone load to be offloaded.
1643 1653 elif repo.ui.configbool('ui', 'clonebundlefallback', False):
1644 1654 repo.ui.warn(_('falling back to normal clone\n'))
1645 1655 else:
1646 1656 raise error.Abort(_('error applying bundle'),
1647 1657 hint=_('if this error persists, consider contacting '
1648 1658 'the server operator or disable clone '
1649 1659 'bundles via '
1650 1660 '"--config experimental.clonebundles=false"'))
1651 1661
1652 1662 def parseclonebundlesmanifest(repo, s):
1653 1663 """Parses the raw text of a clone bundles manifest.
1654 1664
1655 1665 Returns a list of dicts. The dicts have a ``URL`` key corresponding
1656 1666 to the URL; the other keys are the attributes for the entry.
1657 1667 """
1658 1668 m = []
1659 1669 for line in s.splitlines():
1660 1670 fields = line.split()
1661 1671 if not fields:
1662 1672 continue
1663 1673 attrs = {'URL': fields[0]}
1664 1674 for rawattr in fields[1:]:
1665 1675 key, value = rawattr.split('=', 1)
1666 1676 key = urllib.unquote(key)
1667 1677 value = urllib.unquote(value)
1668 1678 attrs[key] = value
1669 1679
1670 1680 # Parse BUNDLESPEC into components. This makes client-side
1671 1681 # preferences easier to specify since you can prefer a single
1672 1682 # component of the BUNDLESPEC.
1673 1683 if key == 'BUNDLESPEC':
1674 1684 try:
1675 1685 comp, version = parsebundlespec(repo, value,
1676 1686 externalnames=True)
1677 1687 attrs['COMPRESSION'] = comp
1678 1688 attrs['VERSION'] = version
1679 1689 except error.InvalidBundleSpecification:
1680 1690 pass
1681 1691 except error.UnsupportedBundleSpecification:
1682 1692 pass
1683 1693
1684 1694 m.append(attrs)
1685 1695
1686 1696 return m
1687 1697
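For reference, the manifest format the parser above consumes is one entry per line: a URL followed by optional percent-encoded KEY=VALUE attributes. A hypothetical two-entry manifest and its parsed shape:

manifest = (
    'https://hg.example.com/full.hg BUNDLESPEC=gzip-v2\n'
    'https://hg.example.com/old.hg BUNDLESPEC=bzip2-v1 REQUIRESNI=true\n'
)
# parseclonebundlesmanifest(repo, manifest) would yield dicts like:
#   {'URL': 'https://hg.example.com/full.hg',
#    'BUNDLESPEC': 'gzip-v2', 'COMPRESSION': 'gzip', 'VERSION': 'v2'}
# with REQUIRESNI entries filtered out later when SNI is unavailable.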
1688 1698 def filterclonebundleentries(repo, entries):
1689 1699 """Remove incompatible clone bundle manifest entries.
1690 1700
1691 1701 Accepts a list of entries parsed with ``parseclonebundlesmanifest``
1692 1702 and returns a new list consisting of only the entries that this client
1693 1703 should be able to apply.
1694 1704
1695 1705 There is no guarantee we'll be able to apply all returned entries because
1696 1706 the metadata we use to filter on may be missing or wrong.
1697 1707 """
1698 1708 newentries = []
1699 1709 for entry in entries:
1700 1710 spec = entry.get('BUNDLESPEC')
1701 1711 if spec:
1702 1712 try:
1703 1713 parsebundlespec(repo, spec, strict=True)
1704 1714 except error.InvalidBundleSpecification as e:
1705 1715 repo.ui.debug(str(e) + '\n')
1706 1716 continue
1707 1717 except error.UnsupportedBundleSpecification as e:
1708 1718 repo.ui.debug('filtering %s because unsupported bundle '
1709 1719 'spec: %s\n' % (entry['URL'], str(e)))
1710 1720 continue
1711 1721
1712 1722 if 'REQUIRESNI' in entry and not sslutil.hassni:
1713 1723 repo.ui.debug('filtering %s because SNI not supported\n' %
1714 1724 entry['URL'])
1715 1725 continue
1716 1726
1717 1727 newentries.append(entry)
1718 1728
1719 1729 return newentries
1720 1730
1721 1731 def sortclonebundleentries(ui, entries):
1722 1732 # experimental config: experimental.clonebundleprefers
1723 1733 prefers = ui.configlist('experimental', 'clonebundleprefers', default=[])
1724 1734 if not prefers:
1725 1735 return list(entries)
1726 1736
1727 1737 prefers = [p.split('=', 1) for p in prefers]
1728 1738
1729 1739 # Our sort function.
1730 1740 def compareentry(a, b):
1731 1741 for prefkey, prefvalue in prefers:
1732 1742 avalue = a.get(prefkey)
1733 1743 bvalue = b.get(prefkey)
1734 1744
1735 1745 # Special case for b missing attribute and a matches exactly.
1736 1746 if avalue is not None and bvalue is None and avalue == prefvalue:
1737 1747 return -1
1738 1748
1739 1749 # Special case for a missing attribute and b matches exactly.
1740 1750 if bvalue is not None and avalue is None and bvalue == prefvalue:
1741 1751 return 1
1742 1752
1743 1753 # We can't compare unless attribute present on both.
1744 1754 if avalue is None or bvalue is None:
1745 1755 continue
1746 1756
1747 1757 # Same values should fall back to next attribute.
1748 1758 if avalue == bvalue:
1749 1759 continue
1750 1760
1751 1761 # Exact matches come first.
1752 1762 if avalue == prefvalue:
1753 1763 return -1
1754 1764 if bvalue == prefvalue:
1755 1765 return 1
1756 1766
1757 1767 # Fall back to next attribute.
1758 1768 continue
1759 1769
1760 1770 # If we got here we couldn't sort by attributes and prefers. Fall
1761 1771 # back to index order.
1762 1772 return 0
1763 1773
1764 1774 return sorted(entries, cmp=compareentry)
1765 1775
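To make the comparator's behaviour concrete, assume experimental.clonebundleprefers=COMPRESSION=gzip (the sample entries below are invented; note that the Python 2 style cmp= keyword used above would need functools.cmp_to_key on Python 3):

entries = [
    {'URL': 'https://hg.example.com/a.hg', 'COMPRESSION': 'bzip2'},
    {'URL': 'https://hg.example.com/b.hg', 'COMPRESSION': 'gzip'},
]
prefers = [('COMPRESSION', 'gzip')]
# compareentry() returns -1 for the entry whose attribute matches the
# preferred value exactly, so b.hg sorts ahead of a.hg; entries with
# equal or missing attributes keep their original manifest order.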
1766 1776 def trypullbundlefromurl(ui, repo, url):
1767 1777 """Attempt to apply a bundle from a URL."""
1768 1778 lock = repo.lock()
1769 1779 try:
1770 1780 tr = repo.transaction('bundleurl')
1771 1781 try:
1772 1782 try:
1773 1783 fh = urlmod.open(ui, url)
1774 1784 cg = readbundle(ui, fh, 'stream')
1775 1785
1776 1786 if isinstance(cg, bundle2.unbundle20):
1777 1787 bundle2.processbundle(repo, cg, lambda: tr)
1778 1788 else:
1779 1789 changegroup.addchangegroup(repo, cg, 'clonebundles', url)
1780 1790 tr.close()
1781 1791 return True
1782 1792 except urllib2.HTTPError as e:
1783 1793 ui.warn(_('HTTP error fetching bundle: %s\n') % str(e))
1784 1794 except urllib2.URLError as e:
1785 1795 ui.warn(_('error fetching bundle: %s\n') % e.reason)
1786 1796
1787 1797 return False
1788 1798 finally:
1789 1799 tr.release()
1790 1800 finally:
1791 1801 lock.release()
@@ -1,815 +1,816
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import sys
12 12 import tempfile
13 13 import urllib
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 bin,
18 18 hex,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 encoding,
25 25 error,
26 26 exchange,
27 27 peer,
28 28 pushkey as pushkeymod,
29 29 streamclone,
30 30 util,
31 31 )
32 32
33 33 class abstractserverproto(object):
34 34 """abstract class that summarizes the protocol API
35 35
36 36 Used as reference and documentation.
37 37 """
38 38
39 39 def getargs(self, args):
40 40 """return the value for arguments in <args>
41 41
42 42 returns a list of values (same order as <args>)"""
43 43 raise NotImplementedError()
44 44
45 45 def getfile(self, fp):
46 46 """write the whole content of a file into a file like object
47 47
48 48 The file is in the form::
49 49
50 50 (<chunk-size>\n<chunk>)+0\n
51 51
52 52 The chunk size is the ASCII representation of the integer.
53 53 """
54 54 raise NotImplementedError()
55 55
56 56 def redirect(self):
57 57 """may setup interception for stdout and stderr
58 58
59 59 See also the `restore` method."""
60 60 raise NotImplementedError()
61 61
62 62 # If the `redirect` function does install interception, the `restore`
63 63 # function MUST be defined. If interception is not used, this function
64 64 # MUST NOT be defined.
65 65 #
66 66 # left commented here on purpose
67 67 #
68 68 #def restore(self):
69 69 # """reinstall previous stdout and stderr and return intercepted stdout
70 70 # """
71 71 # raise NotImplementedError()
72 72
73 73 def groupchunks(self, cg):
74 74 """return 4096 chunks from a changegroup object
75 75
76 76 Some protocols may have compressed the contents."""
77 77 raise NotImplementedError()
78 78
79 79 class remotebatch(peer.batcher):
80 80 '''batches the queued calls; uses as few roundtrips as possible'''
81 81 def __init__(self, remote):
82 82 '''remote must support _submitbatch(encbatch) and
83 83 _submitone(op, encargs)'''
84 84 peer.batcher.__init__(self)
85 85 self.remote = remote
86 86 def submit(self):
87 87 req, rsp = [], []
88 88 for name, args, opts, resref in self.calls:
89 89 mtd = getattr(self.remote, name)
90 90 batchablefn = getattr(mtd, 'batchable', None)
91 91 if batchablefn is not None:
92 92 batchable = batchablefn(mtd.im_self, *args, **opts)
93 93 encargsorres, encresref = batchable.next()
94 94 if encresref:
95 95 req.append((name, encargsorres,))
96 96 rsp.append((batchable, encresref, resref,))
97 97 else:
98 98 resref.set(encargsorres)
99 99 else:
100 100 if req:
101 101 self._submitreq(req, rsp)
102 102 req, rsp = [], []
103 103 resref.set(mtd(*args, **opts))
104 104 if req:
105 105 self._submitreq(req, rsp)
106 106 def _submitreq(self, req, rsp):
107 107 encresults = self.remote._submitbatch(req)
108 108 for encres, r in zip(encresults, rsp):
109 109 batchable, encresref, resref = r
110 110 encresref.set(encres)
111 111 resref.set(batchable.next())
112 112
113 113 # Forward a couple of names from peer to make wireproto interactions
114 114 # slightly more sensible.
115 115 batchable = peer.batchable
116 116 future = peer.future
117 117
118 118 # list of nodes encoding / decoding
119 119
120 120 def decodelist(l, sep=' '):
121 121 if l:
122 122 return map(bin, l.split(sep))
123 123 return []
124 124
125 125 def encodelist(l, sep=' '):
126 126 try:
127 127 return sep.join(map(hex, l))
128 128 except TypeError:
129 129 raise
130 130
131 131 # batched call argument encoding
132 132
133 133 def escapearg(plain):
134 134 return (plain
135 135 .replace(':', ':c')
136 136 .replace(',', ':o')
137 137 .replace(';', ':s')
138 138 .replace('=', ':e'))
139 139
140 140 def unescapearg(escaped):
141 141 return (escaped
142 142 .replace(':e', '=')
143 143 .replace(':s', ';')
144 144 .replace(':o', ',')
145 145 .replace(':c', ':'))
146 146
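The escaping is a plain reversible substitution so that ':', ',', ';' and '=' can serve as structural separators in batched requests. A quick round-trip check against the two helpers above:

plain = 'key=value;more,stuff:here'
escaped = escapearg(plain)
assert escaped == 'key:evalue:smore:ostuff:chere'
assert unescapearg(escaped) == plain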
147 147 # mapping of options accepted by getbundle and their types
148 148 #
149 149 # Meant to be extended by extensions. It is the extension's responsibility to ensure
150 150 # such options are properly processed in exchange.getbundle.
151 151 #
152 152 # supported types are:
153 153 #
154 154 # :nodes: list of binary nodes
155 155 # :csv: list of comma-separated values
156 156 # :scsv: list of comma-separated values returned as a set
157 157 # :plain: string with no transformation needed.
158 158 gboptsmap = {'heads': 'nodes',
159 159 'common': 'nodes',
160 160 'obsmarkers': 'boolean',
161 161 'bundlecaps': 'scsv',
162 162 'listkeys': 'csv',
163 'cg': 'boolean'}
163 'cg': 'boolean',
164 'cbattempted': 'boolean'}
164 165
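Each option type in this map has a fixed textual encoding on the wire: 'nodes' travel as space-joined hex via encodelist, 'csv'/'scsv' as comma-joined strings, and booleans as '0'/'1' so that False still arrives as a non-empty string. A sketch of the boolean case, mirroring the client and server code below:

# client side (wirepeer.getbundle): serialize
wirevalue = '%i' % bool(False)            # -> '0'

# server side (the getbundle wire command): deserialize
decoded = False if wirevalue == '0' else bool(wirevalue)
assert decoded is False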
165 166 # client side
166 167
167 168 class wirepeer(peer.peerrepository):
168 169
169 170 def batch(self):
170 171 if self.capable('batch'):
171 172 return remotebatch(self)
172 173 else:
173 174 return peer.localbatch(self)
174 175 def _submitbatch(self, req):
175 176 cmds = []
176 177 for op, argsdict in req:
177 178 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
178 179 for k, v in argsdict.iteritems())
179 180 cmds.append('%s %s' % (op, args))
180 181 rsp = self._call("batch", cmds=';'.join(cmds))
181 182 return [unescapearg(r) for r in rsp.split(';')]
182 183 def _submitone(self, op, args):
183 184 return self._call(op, **args)
184 185
185 186 @batchable
186 187 def lookup(self, key):
187 188 self.requirecap('lookup', _('look up remote revision'))
188 189 f = future()
189 190 yield {'key': encoding.fromlocal(key)}, f
190 191 d = f.value
191 192 success, data = d[:-1].split(" ", 1)
192 193 if int(success):
193 194 yield bin(data)
194 195 self._abort(error.RepoError(data))
195 196
196 197 @batchable
197 198 def heads(self):
198 199 f = future()
199 200 yield {}, f
200 201 d = f.value
201 202 try:
202 203 yield decodelist(d[:-1])
203 204 except ValueError:
204 205 self._abort(error.ResponseError(_("unexpected response:"), d))
205 206
206 207 @batchable
207 208 def known(self, nodes):
208 209 f = future()
209 210 yield {'nodes': encodelist(nodes)}, f
210 211 d = f.value
211 212 try:
212 213 yield [bool(int(b)) for b in d]
213 214 except ValueError:
214 215 self._abort(error.ResponseError(_("unexpected response:"), d))
215 216
216 217 @batchable
217 218 def branchmap(self):
218 219 f = future()
219 220 yield {}, f
220 221 d = f.value
221 222 try:
222 223 branchmap = {}
223 224 for branchpart in d.splitlines():
224 225 branchname, branchheads = branchpart.split(' ', 1)
225 226 branchname = encoding.tolocal(urllib.unquote(branchname))
226 227 branchheads = decodelist(branchheads)
227 228 branchmap[branchname] = branchheads
228 229 yield branchmap
229 230 except TypeError:
230 231 self._abort(error.ResponseError(_("unexpected response:"), d))
231 232
232 233 def branches(self, nodes):
233 234 n = encodelist(nodes)
234 235 d = self._call("branches", nodes=n)
235 236 try:
236 237 br = [tuple(decodelist(b)) for b in d.splitlines()]
237 238 return br
238 239 except ValueError:
239 240 self._abort(error.ResponseError(_("unexpected response:"), d))
240 241
241 242 def between(self, pairs):
242 243 batch = 8 # avoid giant requests
243 244 r = []
244 245 for i in xrange(0, len(pairs), batch):
245 246 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
246 247 d = self._call("between", pairs=n)
247 248 try:
248 249 r.extend(l and decodelist(l) or [] for l in d.splitlines())
249 250 except ValueError:
250 251 self._abort(error.ResponseError(_("unexpected response:"), d))
251 252 return r
252 253
253 254 @batchable
254 255 def pushkey(self, namespace, key, old, new):
255 256 if not self.capable('pushkey'):
256 257 yield False, None
257 258 f = future()
258 259 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
259 260 yield {'namespace': encoding.fromlocal(namespace),
260 261 'key': encoding.fromlocal(key),
261 262 'old': encoding.fromlocal(old),
262 263 'new': encoding.fromlocal(new)}, f
263 264 d = f.value
264 265 d, output = d.split('\n', 1)
265 266 try:
266 267 d = bool(int(d))
267 268 except ValueError:
268 269 raise error.ResponseError(
269 270 _('push failed (unexpected response):'), d)
270 271 for l in output.splitlines(True):
271 272 self.ui.status(_('remote: '), l)
272 273 yield d
273 274
274 275 @batchable
275 276 def listkeys(self, namespace):
276 277 if not self.capable('pushkey'):
277 278 yield {}, None
278 279 f = future()
279 280 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
280 281 yield {'namespace': encoding.fromlocal(namespace)}, f
281 282 d = f.value
282 283 self.ui.debug('received listkey for "%s": %i bytes\n'
283 284 % (namespace, len(d)))
284 285 yield pushkeymod.decodekeys(d)
285 286
286 287 def stream_out(self):
287 288 return self._callstream('stream_out')
288 289
289 290 def changegroup(self, nodes, kind):
290 291 n = encodelist(nodes)
291 292 f = self._callcompressable("changegroup", roots=n)
292 293 return changegroupmod.cg1unpacker(f, 'UN')
293 294
294 295 def changegroupsubset(self, bases, heads, kind):
295 296 self.requirecap('changegroupsubset', _('look up remote changes'))
296 297 bases = encodelist(bases)
297 298 heads = encodelist(heads)
298 299 f = self._callcompressable("changegroupsubset",
299 300 bases=bases, heads=heads)
300 301 return changegroupmod.cg1unpacker(f, 'UN')
301 302
302 303 def getbundle(self, source, **kwargs):
303 304 self.requirecap('getbundle', _('look up remote changes'))
304 305 opts = {}
305 306 bundlecaps = kwargs.get('bundlecaps')
306 307 if bundlecaps is not None:
307 308 kwargs['bundlecaps'] = sorted(bundlecaps)
308 309 else:
309 310 bundlecaps = () # kwargs could have it to None
310 311 for key, value in kwargs.iteritems():
311 312 if value is None:
312 313 continue
313 314 keytype = gboptsmap.get(key)
314 315 if keytype is None:
315 316 assert False, 'unexpected'
316 317 elif keytype == 'nodes':
317 318 value = encodelist(value)
318 319 elif keytype in ('csv', 'scsv'):
319 320 value = ','.join(value)
320 321 elif keytype == 'boolean':
321 322 value = '%i' % bool(value)
322 323 elif keytype != 'plain':
323 324 raise KeyError('unknown getbundle option type %s'
324 325 % keytype)
325 326 opts[key] = value
326 327 f = self._callcompressable("getbundle", **opts)
327 328 if any((cap.startswith('HG2') for cap in bundlecaps)):
328 329 return bundle2.getunbundler(self.ui, f)
329 330 else:
330 331 return changegroupmod.cg1unpacker(f, 'UN')
331 332
332 333 def unbundle(self, cg, heads, source):
333 334 '''Send cg (a readable file-like object representing the
334 335 changegroup to push, typically a chunkbuffer object) to the
335 336 remote server as a bundle.
336 337
337 338 When pushing a bundle10 stream, return an integer indicating the
338 339 result of the push (see localrepository.addchangegroup()).
339 340
340 341 When pushing a bundle20 stream, return a bundle20 stream.'''
341 342
342 343 if heads != ['force'] and self.capable('unbundlehash'):
343 344 heads = encodelist(['hashed',
344 345 util.sha1(''.join(sorted(heads))).digest()])
345 346 else:
346 347 heads = encodelist(heads)
347 348
348 349 if util.safehasattr(cg, 'deltaheader'):
349 350 # this is a bundle10, do the old style call sequence
350 351 ret, output = self._callpush("unbundle", cg, heads=heads)
351 352 if ret == "":
352 353 raise error.ResponseError(
353 354 _('push failed:'), output)
354 355 try:
355 356 ret = int(ret)
356 357 except ValueError:
357 358 raise error.ResponseError(
358 359 _('push failed (unexpected response):'), ret)
359 360
360 361 for l in output.splitlines(True):
361 362 self.ui.status(_('remote: '), l)
362 363 else:
363 364 # bundle2 push. Send a stream, fetch a stream.
364 365 stream = self._calltwowaystream('unbundle', cg, heads=heads)
365 366 ret = bundle2.getunbundler(self.ui, stream)
366 367 return ret
367 368
368 369 def debugwireargs(self, one, two, three=None, four=None, five=None):
369 370 # don't pass optional arguments left at their default value
370 371 opts = {}
371 372 if three is not None:
372 373 opts['three'] = three
373 374 if four is not None:
374 375 opts['four'] = four
375 376 return self._call('debugwireargs', one=one, two=two, **opts)
376 377
377 378 def _call(self, cmd, **args):
378 379 """execute <cmd> on the server
379 380
380 381 The command is expected to return a simple string.
381 382
382 383 returns the server reply as a string."""
383 384 raise NotImplementedError()
384 385
385 386 def _callstream(self, cmd, **args):
386 387 """execute <cmd> on the server
387 388
388 389 The command is expected to return a stream.
389 390
390 391 returns the server reply as a file-like object."""
391 392 raise NotImplementedError()
392 393
393 394 def _callcompressable(self, cmd, **args):
394 395 """execute <cmd> on the server
395 396
396 397 The command is expected to return a stream.
397 398
398 399 The stream may have been compressed in some implementations. This
399 400 function takes care of the decompression. This is the only difference
400 401 with _callstream.
401 402
402 403 returns the server reply as a file-like object.
403 404 """
404 405 raise NotImplementedError()
405 406
406 407 def _callpush(self, cmd, fp, **args):
407 408 """execute a <cmd> on server
408 409
409 410 The command is expected to be related to a push. Push has a special
410 411 return method.
411 412
412 413 returns the server reply as a (ret, output) tuple. ret is either
413 414 empty (error) or a stringified int.
414 415 """
415 416 raise NotImplementedError()
416 417
417 418 def _calltwowaystream(self, cmd, fp, **args):
418 419 """execute <cmd> on server
419 420
420 421 The command will send a stream to the server and get a stream in reply.
421 422 """
422 423 raise NotImplementedError()
423 424
424 425 def _abort(self, exception):
425 426 """clearly abort the wire protocol connection and raise the exception
426 427 """
427 428 raise NotImplementedError()
428 429
429 430 # server side
430 431
431 432 # wire protocol command can either return a string or one of these classes.
432 433 class streamres(object):
433 434 """wireproto reply: binary stream
434 435
435 436 The call was successful and the result is a stream.
436 437 Iterate on the `self.gen` attribute to retrieve chunks.
437 438 """
438 439 def __init__(self, gen):
439 440 self.gen = gen
440 441
441 442 class pushres(object):
442 443 """wireproto reply: success with simple integer return
443 444
444 445 The call was successful and returned an integer contained in `self.res`.
445 446 """
446 447 def __init__(self, res):
447 448 self.res = res
448 449
449 450 class pusherr(object):
450 451 """wireproto reply: failure
451 452
452 453 The call failed. The `self.res` attribute contains the error message.
453 454 """
454 455 def __init__(self, res):
455 456 self.res = res
456 457
457 458 class ooberror(object):
458 459 """wireproto reply: failure of a batch of operation
459 460
460 461 Something failed during a batch call. The error message is stored in
461 462 `self.message`.
462 463 """
463 464 def __init__(self, message):
464 465 self.message = message
465 466
466 467 def dispatch(repo, proto, command):
467 468 repo = repo.filtered("served")
468 469 func, spec = commands[command]
469 470 args = proto.getargs(spec)
470 471 return func(repo, proto, *args)
471 472
472 473 def options(cmd, keys, others):
473 474 opts = {}
474 475 for k in keys:
475 476 if k in others:
476 477 opts[k] = others[k]
477 478 del others[k]
478 479 if others:
479 480 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
480 481 % (cmd, ",".join(others)))
481 482 return opts
482 483
483 484 # list of commands
484 485 commands = {}
485 486
486 487 def wireprotocommand(name, args=''):
487 488 """decorator for wire protocol command"""
488 489 def register(func):
489 490 commands[name] = (func, args)
490 491 return func
491 492 return register
492 493
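A hypothetical new command would be registered exactly like the built-ins that follow; the 'echo' name and its argument spec here are invented for illustration:

@wireprotocommand('echo', 'value')
def echo(repo, proto, value):
    # Names in the spec string are fetched by proto.getargs() in
    # dispatch() and passed positionally, in spec order.
    return value + '\n'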
493 494 @wireprotocommand('batch', 'cmds *')
494 495 def batch(repo, proto, cmds, others):
495 496 repo = repo.filtered("served")
496 497 res = []
497 498 for pair in cmds.split(';'):
498 499 op, args = pair.split(' ', 1)
499 500 vals = {}
500 501 for a in args.split(','):
501 502 if a:
502 503 n, v = a.split('=')
503 504 vals[n] = unescapearg(v)
504 505 func, spec = commands[op]
505 506 if spec:
506 507 keys = spec.split()
507 508 data = {}
508 509 for k in keys:
509 510 if k == '*':
510 511 star = {}
511 512 for key in vals.keys():
512 513 if key not in keys:
513 514 star[key] = vals[key]
514 515 data['*'] = star
515 516 else:
516 517 data[k] = vals[k]
517 518 result = func(repo, proto, *[data[k] for k in keys])
518 519 else:
519 520 result = func(repo, proto)
520 521 if isinstance(result, ooberror):
521 522 return result
522 523 res.append(escapearg(result))
523 524 return ';'.join(res)
524 525
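Putting the escaping helpers and this command together: a batched request travels as a single 'cmds' argument of ';'-separated '<op> <key>=<value>,...' pairs, e.g. (node value invented):

cmds = ';'.join([
    'heads ',    # no arguments: empty args string after the op name
    'known nodes=' + escapearg('1111222233334444555566667777888899990000'),
])
# -> 'heads ;known nodes=1111222233334444555566667777888899990000'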
525 526 @wireprotocommand('between', 'pairs')
526 527 def between(repo, proto, pairs):
527 528 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
528 529 r = []
529 530 for b in repo.between(pairs):
530 531 r.append(encodelist(b) + "\n")
531 532 return "".join(r)
532 533
533 534 @wireprotocommand('branchmap')
534 535 def branchmap(repo, proto):
535 536 branchmap = repo.branchmap()
536 537 heads = []
537 538 for branch, nodes in branchmap.iteritems():
538 539 branchname = urllib.quote(encoding.fromlocal(branch))
539 540 branchnodes = encodelist(nodes)
540 541 heads.append('%s %s' % (branchname, branchnodes))
541 542 return '\n'.join(heads)
542 543
543 544 @wireprotocommand('branches', 'nodes')
544 545 def branches(repo, proto, nodes):
545 546 nodes = decodelist(nodes)
546 547 r = []
547 548 for b in repo.branches(nodes):
548 549 r.append(encodelist(b) + "\n")
549 550 return "".join(r)
550 551
551 552
552 553 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
553 554 'known', 'getbundle', 'unbundlehash', 'batch']
554 555
555 556 def _capabilities(repo, proto):
556 557 """return a list of capabilities for a repo
557 558
558 559 This function exists to allow extensions to easily wrap capabilities
559 560 computation
560 561
561 562 - returns a list: easy to alter
562 563 - changes done here will be propagated to both the `capabilities` and
563 564 `hello` commands without any other action needed.
564 565 """
565 566 # copy to prevent modification of the global list
566 567 caps = list(wireprotocaps)
567 568 if streamclone.allowservergeneration(repo.ui):
568 569 if repo.ui.configbool('server', 'preferuncompressed', False):
569 570 caps.append('stream-preferred')
570 571 requiredformats = repo.requirements & repo.supportedformats
571 572 # if our local revlogs are just revlogv1, add 'stream' cap
572 573 if not requiredformats - set(('revlogv1',)):
573 574 caps.append('stream')
574 575 # otherwise, add 'streamreqs' detailing our local revlog format
575 576 else:
576 577 caps.append('streamreqs=%s' % ','.join(requiredformats))
577 578 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
578 579 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
579 580 caps.append('bundle2=' + urllib.quote(capsblob))
580 581 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
581 582 caps.append(
582 583 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
583 584 return caps
584 585
585 586 # If you are writing an extension and considering wrapping this
586 587 # function, wrap `_capabilities` instead.
587 588 @wireprotocommand('capabilities')
588 589 def capabilities(repo, proto):
589 590 return ' '.join(_capabilities(repo, proto))
590 591
591 592 @wireprotocommand('changegroup', 'roots')
592 593 def changegroup(repo, proto, roots):
593 594 nodes = decodelist(roots)
594 595 cg = changegroupmod.changegroup(repo, nodes, 'serve')
595 596 return streamres(proto.groupchunks(cg))
596 597
597 598 @wireprotocommand('changegroupsubset', 'bases heads')
598 599 def changegroupsubset(repo, proto, bases, heads):
599 600 bases = decodelist(bases)
600 601 heads = decodelist(heads)
601 602 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
602 603 return streamres(proto.groupchunks(cg))
603 604
604 605 @wireprotocommand('debugwireargs', 'one two *')
605 606 def debugwireargs(repo, proto, one, two, others):
606 607 # only accept optional args from the known set
607 608 opts = options('debugwireargs', ['three', 'four'], others)
608 609 return repo.debugwireargs(one, two, **opts)
609 610
610 611 # List of options accepted by getbundle.
611 612 #
612 613 # Meant to be extended by extensions. It is the extension's responsibility to
613 614 # ensure such options are properly processed in exchange.getbundle.
614 615 gboptslist = ['heads', 'common', 'bundlecaps']
615 616
616 617 @wireprotocommand('getbundle', '*')
617 618 def getbundle(repo, proto, others):
618 619 opts = options('getbundle', gboptsmap.keys(), others)
619 620 for k, v in opts.iteritems():
620 621 keytype = gboptsmap[k]
621 622 if keytype == 'nodes':
622 623 opts[k] = decodelist(v)
623 624 elif keytype == 'csv':
624 625 opts[k] = list(v.split(','))
625 626 elif keytype == 'scsv':
626 627 opts[k] = set(v.split(','))
627 628 elif keytype == 'boolean':
628 629 # Client should serialize False as '0', which is a non-empty string
629 630 # so it evaluates as a True bool.
630 631 if v == '0':
631 632 opts[k] = False
632 633 else:
633 634 opts[k] = bool(v)
634 635 elif keytype != 'plain':
635 636 raise KeyError('unknown getbundle option type %s'
636 637 % keytype)
637 638 cg = exchange.getbundle(repo, 'serve', **opts)
638 639 return streamres(proto.groupchunks(cg))
639 640
640 641 @wireprotocommand('heads')
641 642 def heads(repo, proto):
642 643 h = repo.heads()
643 644 return encodelist(h) + "\n"
644 645
645 646 @wireprotocommand('hello')
646 647 def hello(repo, proto):
647 648 '''the hello command returns a set of lines describing various
648 649 interesting things about the server, in an RFC822-like format.
649 650 Currently the only one defined is "capabilities", which
650 651 consists of a line in the form:
651 652
652 653 capabilities: space separated list of tokens
653 654 '''
654 655 return "capabilities: %s\n" % (capabilities(repo, proto))
655 656
656 657 @wireprotocommand('listkeys', 'namespace')
657 658 def listkeys(repo, proto, namespace):
658 659 d = repo.listkeys(encoding.tolocal(namespace)).items()
659 660 return pushkeymod.encodekeys(d)
660 661
661 662 @wireprotocommand('lookup', 'key')
662 663 def lookup(repo, proto, key):
663 664 try:
664 665 k = encoding.tolocal(key)
665 666 c = repo[k]
666 667 r = c.hex()
667 668 success = 1
668 669 except Exception as inst:
669 670 r = str(inst)
670 671 success = 0
671 672 return "%s %s\n" % (success, r)
672 673
673 674 @wireprotocommand('known', 'nodes *')
674 675 def known(repo, proto, nodes, others):
675 676 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
676 677
677 678 @wireprotocommand('pushkey', 'namespace key old new')
678 679 def pushkey(repo, proto, namespace, key, old, new):
679 680 # compatibility with pre-1.8 clients which were accidentally
680 681 # sending raw binary nodes rather than utf-8-encoded hex
681 682 if len(new) == 20 and new.encode('string-escape') != new:
682 683 # looks like it could be a binary node
683 684 try:
684 685 new.decode('utf-8')
685 686 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
686 687 except UnicodeDecodeError:
687 688 pass # binary, leave unmodified
688 689 else:
689 690 new = encoding.tolocal(new) # normal path
690 691
691 692 if util.safehasattr(proto, 'restore'):
692 693
693 694 proto.redirect()
694 695
695 696 try:
696 697 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
697 698 encoding.tolocal(old), new) or False
698 699 except error.Abort:
699 700 r = False
700 701
701 702 output = proto.restore()
702 703
703 704 return '%s\n%s' % (int(r), output)
704 705
705 706 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
706 707 encoding.tolocal(old), new)
707 708 return '%s\n' % int(r)
708 709
709 710 @wireprotocommand('stream_out')
710 711 def stream(repo, proto):
711 712 '''If the server supports streaming clone, it advertises the "stream"
712 713 capability with a value representing the version and flags of the repo
713 714 it is serving. Client checks to see if it understands the format.
714 715 '''
715 716 if not streamclone.allowservergeneration(repo.ui):
716 717 return '1\n'
717 718
718 719 def getstream(it):
719 720 yield '0\n'
720 721 for chunk in it:
721 722 yield chunk
722 723
723 724 try:
724 725 # LockError may be raised before the first result is yielded. Don't
725 726 # emit output until we're sure we got the lock successfully.
726 727 it = streamclone.generatev1wireproto(repo)
727 728 return streamres(getstream(it))
728 729 except error.LockError:
729 730 return '2\n'
730 731
731 732 @wireprotocommand('unbundle', 'heads')
732 733 def unbundle(repo, proto, heads):
733 734 their_heads = decodelist(heads)
734 735
735 736 try:
736 737 proto.redirect()
737 738
738 739 exchange.check_heads(repo, their_heads, 'preparing changes')
739 740
740 741 # write bundle data to temporary file because it can be big
741 742 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
742 743 fp = os.fdopen(fd, 'wb+')
743 744 r = 0
744 745 try:
745 746 proto.getfile(fp)
746 747 fp.seek(0)
747 748 gen = exchange.readbundle(repo.ui, fp, None)
748 749 r = exchange.unbundle(repo, gen, their_heads, 'serve',
749 750 proto._client())
750 751 if util.safehasattr(r, 'addpart'):
751 752 # The return looks streamable, we are in the bundle2 case and
752 753 # should return a stream.
753 754 return streamres(r.getchunks())
754 755 return pushres(r)
755 756
756 757 finally:
757 758 fp.close()
758 759 os.unlink(tempname)
759 760
760 761 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
761 762 # handle non-bundle2 case first
762 763 if not getattr(exc, 'duringunbundle2', False):
763 764 try:
764 765 raise
765 766 except error.Abort:
766 767 # The old code we moved used sys.stderr directly.
767 768 # We did not change it to minimise code change.
768 769 # This needs to be moved to something proper.
769 770 # Feel free to do it.
770 771 sys.stderr.write("abort: %s\n" % exc)
771 772 return pushres(0)
772 773 except error.PushRaced:
773 774 return pusherr(str(exc))
774 775
775 776 bundler = bundle2.bundle20(repo.ui)
776 777 for out in getattr(exc, '_bundle2salvagedoutput', ()):
777 778 bundler.addpart(out)
778 779 try:
779 780 try:
780 781 raise
781 782 except error.PushkeyFailed as exc:
782 783 # check client caps
783 784 remotecaps = getattr(exc, '_replycaps', None)
784 785 if (remotecaps is not None
785 786 and 'pushkey' not in remotecaps.get('error', ())):
786 787 # not supported on the remote side, fall back to the Abort handler.
787 788 raise
788 789 part = bundler.newpart('error:pushkey')
789 790 part.addparam('in-reply-to', exc.partid)
790 791 if exc.namespace is not None:
791 792 part.addparam('namespace', exc.namespace, mandatory=False)
792 793 if exc.key is not None:
793 794 part.addparam('key', exc.key, mandatory=False)
794 795 if exc.new is not None:
795 796 part.addparam('new', exc.new, mandatory=False)
796 797 if exc.old is not None:
797 798 part.addparam('old', exc.old, mandatory=False)
798 799 if exc.ret is not None:
799 800 part.addparam('ret', exc.ret, mandatory=False)
800 801 except error.BundleValueError as exc:
801 802 errpart = bundler.newpart('error:unsupportedcontent')
802 803 if exc.parttype is not None:
803 804 errpart.addparam('parttype', exc.parttype)
804 805 if exc.params:
805 806 errpart.addparam('params', '\0'.join(exc.params))
806 807 except error.Abort as exc:
807 808 manargs = [('message', str(exc))]
808 809 advargs = []
809 810 if exc.hint is not None:
810 811 advargs.append(('hint', exc.hint))
811 812 bundler.addpart(bundle2.bundlepart('error:abort',
812 813 manargs, advargs))
813 814 except error.PushRaced as exc:
814 815 bundler.newpart('error:pushraced', [('message', str(exc))])
815 816 return streamres(bundler.getchunks())