##// END OF EJS Templates
push: make discovery extensible...
Pierre-Yves David -
r22018:ddb56e7e default
parent child Browse files
Show More
@@ -1,848 +1,881
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib
11 11 import util, scmutil, changegroup, base85, error
12 12 import discovery, phases, obsolete, bookmarks, bundle2, pushkey
13 13
14 14 def readbundle(ui, fh, fname, vfs=None):
15 15 header = changegroup.readexactly(fh, 4)
16 16
17 17 alg = None
18 18 if not fname:
19 19 fname = "stream"
20 20 if not header.startswith('HG') and header.startswith('\0'):
21 21 fh = changegroup.headerlessfixup(fh, header)
22 22 header = "HG10"
23 23 alg = 'UN'
24 24 elif vfs:
25 25 fname = vfs.join(fname)
26 26
27 27 magic, version = header[0:2], header[2:4]
28 28
29 29 if magic != 'HG':
30 30 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
31 31 if version == '10':
32 32 if alg is None:
33 33 alg = changegroup.readexactly(fh, 2)
34 34 return changegroup.unbundle10(fh, alg)
35 35 elif version == '2X':
36 36 return bundle2.unbundle20(ui, fh, header=magic + version)
37 37 else:
38 38 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
39 39
40 40
41 41 class pushoperation(object):
42 42 """A object that represent a single push operation
43 43
44 44 It purpose is to carry push related state and very common operation.
45 45
46 46 A new should be created at the beginning of each push and discarded
47 47 afterward.
48 48 """
49 49
50 50 def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
51 51 # repo we push from
52 52 self.repo = repo
53 53 self.ui = repo.ui
54 54 # repo we push to
55 55 self.remote = remote
56 56 # force option provided
57 57 self.force = force
58 58 # revs to be pushed (None is "all")
59 59 self.revs = revs
60 60 # allow push of new branch
61 61 self.newbranch = newbranch
62 62 # did a local lock get acquired?
63 63 self.locallocked = None
64 64 # step already performed
65 65 # (used to check what steps have been already performed through bundle2)
66 66 self.stepsdone = set()
67 67 # Integer version of the push result
68 68 # - None means nothing to push
69 69 # - 0 means HTTP error
70 70 # - 1 means we pushed and remote head count is unchanged *or*
71 71 # we have outgoing changesets but refused to push
72 72 # - other values as described by addchangegroup()
73 73 self.ret = None
74 74 # discover.outgoing object (contains common and outgoing data)
75 75 self.outgoing = None
76 76 # all remote heads before the push
77 77 self.remoteheads = None
78 78 # testable as a boolean indicating if any nodes are missing locally.
79 79 self.incoming = None
80 80
81 81 @util.propertycache
82 82 def futureheads(self):
83 83 """future remote heads if the changeset push succeeds"""
84 84 return self.outgoing.missingheads
85 85
86 86 @util.propertycache
87 87 def fallbackheads(self):
88 88 """future remote heads if the changeset push fails"""
89 89 if self.revs is None:
90 90 # not target to push, all common are relevant
91 91 return self.outgoing.commonheads
92 92 unfi = self.repo.unfiltered()
93 93 # I want cheads = heads(::missingheads and ::commonheads)
94 94 # (missingheads is revs with secret changeset filtered out)
95 95 #
96 96 # This can be expressed as:
97 97 # cheads = ( (missingheads and ::commonheads)
98 98 # + (commonheads and ::missingheads))"
99 99 # )
100 100 #
101 101 # while trying to push we already computed the following:
102 102 # common = (::commonheads)
103 103 # missing = ((commonheads::missingheads) - commonheads)
104 104 #
105 105 # We can pick:
106 106 # * missingheads part of common (::commonheads)
107 107 common = set(self.outgoing.common)
108 108 nm = self.repo.changelog.nodemap
109 109 cheads = [node for node in self.revs if nm[node] in common]
110 110 # and
111 111 # * commonheads parents on missing
112 112 revset = unfi.set('%ln and parents(roots(%ln))',
113 113 self.outgoing.commonheads,
114 114 self.outgoing.missing)
115 115 cheads.extend(c.node() for c in revset)
116 116 return cheads
117 117
118 118 @property
119 119 def commonheads(self):
120 120 """set of all common heads after changeset bundle push"""
121 121 if self.ret:
122 122 return self.futureheads
123 123 else:
124 124 return self.fallbackheads
125 125
126 126 def push(repo, remote, force=False, revs=None, newbranch=False):
127 127 '''Push outgoing changesets (limited by revs) from a local
128 128 repository to remote. Return an integer:
129 129 - None means nothing to push
130 130 - 0 means HTTP error
131 131 - 1 means we pushed and remote head count is unchanged *or*
132 132 we have outgoing changesets but refused to push
133 133 - other values as described by addchangegroup()
134 134 '''
135 135 pushop = pushoperation(repo, remote, force, revs, newbranch)
136 136 if pushop.remote.local():
137 137 missing = (set(pushop.repo.requirements)
138 138 - pushop.remote.local().supported)
139 139 if missing:
140 140 msg = _("required features are not"
141 141 " supported in the destination:"
142 142 " %s") % (', '.join(sorted(missing)))
143 143 raise util.Abort(msg)
144 144
145 145 # there are two ways to push to remote repo:
146 146 #
147 147 # addchangegroup assumes local user can lock remote
148 148 # repo (local filesystem, old ssh servers).
149 149 #
150 150 # unbundle assumes local user cannot lock remote repo (new ssh
151 151 # servers, http servers).
152 152
153 153 if not pushop.remote.canpush():
154 154 raise util.Abort(_("destination does not support push"))
155 155 # get local lock as we might write phase data
156 156 locallock = None
157 157 try:
158 158 locallock = pushop.repo.lock()
159 159 pushop.locallocked = True
160 160 except IOError, err:
161 161 pushop.locallocked = False
162 162 if err.errno != errno.EACCES:
163 163 raise
164 164 # source repo cannot be locked.
165 165 # We do not abort the push, but just disable the local phase
166 166 # synchronisation.
167 167 msg = 'cannot lock source repository: %s\n' % err
168 168 pushop.ui.debug(msg)
169 169 try:
170 170 pushop.repo.checkpush(pushop)
171 171 lock = None
172 172 unbundle = pushop.remote.capable('unbundle')
173 173 if not unbundle:
174 174 lock = pushop.remote.lock()
175 175 try:
176 176 _pushdiscovery(pushop)
177 177 if (pushop.repo.ui.configbool('experimental', 'bundle2-exp',
178 178 False)
179 179 and pushop.remote.capable('bundle2-exp')):
180 180 _pushbundle2(pushop)
181 181 _pushchangeset(pushop)
182 182 _pushsyncphase(pushop)
183 183 _pushobsolete(pushop)
184 184 finally:
185 185 if lock is not None:
186 186 lock.release()
187 187 finally:
188 188 if locallock is not None:
189 189 locallock.release()
190 190
191 191 _pushbookmark(pushop)
192 192 return pushop.ret
193 193
194 # list of steps to perform discovery before push
195 pushdiscoveryorder = []
196
197 # Mapping between step name and function
198 #
199 # This exists to help extensions wrap steps if necessary
200 pushdiscoverymapping = {}
201
202 def pushdiscovery(stepname):
203 """decorator for function performing discovery before push
204
205 The function is added to the step -> function mapping and appended to the
206 list of steps. Beware that decorated function will be added in order (this
207 may matter).
208
209 You can only use this decorator for a new step, if you want to wrap a step
210 from an extension, change the pushdiscovery dictionary directly."""
211 def dec(func):
212 assert stepname not in pushdiscoverymapping
213 pushdiscoverymapping[stepname] = func
214 pushdiscoveryorder.append(stepname)
215 return func
216 return dec
217
218
219
194 220 def _pushdiscovery(pushop):
195 # discovery
221 """Run all discovery steps"""
222 for stepname in pushdiscoveryorder:
223 step = pushdiscoverymapping[stepname]
224 step(pushop)
225
226 @pushdiscovery('changeset')
227 def _pushdiscoverychangeset(pushop):
228 """discover the changeset that need to be pushed"""
196 229 unfi = pushop.repo.unfiltered()
197 230 fci = discovery.findcommonincoming
198 231 commoninc = fci(unfi, pushop.remote, force=pushop.force)
199 232 common, inc, remoteheads = commoninc
200 233 fco = discovery.findcommonoutgoing
201 234 outgoing = fco(unfi, pushop.remote, onlyheads=pushop.revs,
202 235 commoninc=commoninc, force=pushop.force)
203 236 pushop.outgoing = outgoing
204 237 pushop.remoteheads = remoteheads
205 238 pushop.incoming = inc
206 239
207 240 def _pushcheckoutgoing(pushop):
208 241 outgoing = pushop.outgoing
209 242 unfi = pushop.repo.unfiltered()
210 243 if not outgoing.missing:
211 244 # nothing to push
212 245 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
213 246 return False
214 247 # something to push
215 248 if not pushop.force:
216 249 # if repo.obsstore == False --> no obsolete
217 250 # then, save the iteration
218 251 if unfi.obsstore:
219 252 # this message are here for 80 char limit reason
220 253 mso = _("push includes obsolete changeset: %s!")
221 254 mst = "push includes %s changeset: %s!"
222 255 # plain versions for i18n tool to detect them
223 256 _("push includes unstable changeset: %s!")
224 257 _("push includes bumped changeset: %s!")
225 258 _("push includes divergent changeset: %s!")
226 259 # If we are to push if there is at least one
227 260 # obsolete or unstable changeset in missing, at
228 261 # least one of the missinghead will be obsolete or
229 262 # unstable. So checking heads only is ok
230 263 for node in outgoing.missingheads:
231 264 ctx = unfi[node]
232 265 if ctx.obsolete():
233 266 raise util.Abort(mso % ctx)
234 267 elif ctx.troubled():
235 268 raise util.Abort(_(mst)
236 269 % (ctx.troubles()[0],
237 270 ctx))
238 271 newbm = pushop.ui.configlist('bookmarks', 'pushing')
239 272 discovery.checkheads(unfi, pushop.remote, outgoing,
240 273 pushop.remoteheads,
241 274 pushop.newbranch,
242 275 bool(pushop.incoming),
243 276 newbm)
244 277 return True
245 278
246 279 # List of names of steps to perform for an outgoing bundle2, order matters.
247 280 b2partsgenorder = []
248 281
249 282 # Mapping between step name and function
250 283 #
251 284 # This exists to help extensions wrap steps if necessary
252 285 b2partsgenmapping = {}
253 286
254 287 def b2partsgenerator(stepname):
255 288 """decorator for function generating bundle2 part
256 289
257 290 The function is added to the step -> function mapping and appended to the
258 291 list of steps. Beware that decorated functions will be added in order
259 292 (this may matter).
260 293
261 294 You can only use this decorator for new steps, if you want to wrap a step
262 295 from an extension, attack the b2partsgenmapping dictionary directly."""
263 296 def dec(func):
264 297 assert stepname not in b2partsgenmapping
265 298 b2partsgenmapping[stepname] = func
266 299 b2partsgenorder.append(stepname)
267 300 return func
268 301 return dec
269 302
270 303 @b2partsgenerator('changeset')
271 304 def _pushb2ctx(pushop, bundler):
272 305 """handle changegroup push through bundle2
273 306
274 307 addchangegroup result is stored in the ``pushop.ret`` attribute.
275 308 """
276 309 if 'changesets' in pushop.stepsdone:
277 310 return
278 311 pushop.stepsdone.add('changesets')
279 312 # Send known heads to the server for race detection.
280 313 pushop.stepsdone.add('changesets')
281 314 if not _pushcheckoutgoing(pushop):
282 315 return
283 316 pushop.repo.prepushoutgoinghooks(pushop.repo,
284 317 pushop.remote,
285 318 pushop.outgoing)
286 319 if not pushop.force:
287 320 bundler.newpart('B2X:CHECK:HEADS', data=iter(pushop.remoteheads))
288 321 cg = changegroup.getlocalbundle(pushop.repo, 'push', pushop.outgoing)
289 322 cgpart = bundler.newpart('B2X:CHANGEGROUP', data=cg.getchunks())
290 323 def handlereply(op):
291 324 """extract addchangroup returns from server reply"""
292 325 cgreplies = op.records.getreplies(cgpart.id)
293 326 assert len(cgreplies['changegroup']) == 1
294 327 pushop.ret = cgreplies['changegroup'][0]['return']
295 328 return handlereply
296 329
297 330
298 331 def _pushbundle2(pushop):
299 332 """push data to the remote using bundle2
300 333
301 334 The only currently supported type of data is changegroup but this will
302 335 evolve in the future."""
303 336 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
304 337 # create reply capability
305 338 capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
306 339 bundler.newpart('b2x:replycaps', data=capsblob)
307 340 replyhandlers = []
308 341 for partgenname in b2partsgenorder:
309 342 partgen = b2partsgenmapping[partgenname]
310 343 ret = partgen(pushop, bundler)
311 344 if callable(ret):
312 345 replyhandlers.append(ret)
313 346 # do not push if nothing to push
314 347 if bundler.nbparts <= 1:
315 348 return
316 349 stream = util.chunkbuffer(bundler.getchunks())
317 350 try:
318 351 reply = pushop.remote.unbundle(stream, ['force'], 'push')
319 352 except error.BundleValueError, exc:
320 353 raise util.Abort('missing support for %s' % exc)
321 354 try:
322 355 op = bundle2.processbundle(pushop.repo, reply)
323 356 except error.BundleValueError, exc:
324 357 raise util.Abort('missing support for %s' % exc)
325 358 for rephand in replyhandlers:
326 359 rephand(op)
327 360
328 361 def _pushchangeset(pushop):
329 362 """Make the actual push of changeset bundle to remote repo"""
330 363 if 'changesets' in pushop.stepsdone:
331 364 return
332 365 pushop.stepsdone.add('changesets')
333 366 if not _pushcheckoutgoing(pushop):
334 367 return
335 368 pushop.repo.prepushoutgoinghooks(pushop.repo,
336 369 pushop.remote,
337 370 pushop.outgoing)
338 371 outgoing = pushop.outgoing
339 372 unbundle = pushop.remote.capable('unbundle')
340 373 # TODO: get bundlecaps from remote
341 374 bundlecaps = None
342 375 # create a changegroup from local
343 376 if pushop.revs is None and not (outgoing.excluded
344 377 or pushop.repo.changelog.filteredrevs):
345 378 # push everything,
346 379 # use the fast path, no race possible on push
347 380 bundler = changegroup.bundle10(pushop.repo, bundlecaps)
348 381 cg = changegroup.getsubset(pushop.repo,
349 382 outgoing,
350 383 bundler,
351 384 'push',
352 385 fastpath=True)
353 386 else:
354 387 cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
355 388 bundlecaps)
356 389
357 390 # apply changegroup to remote
358 391 if unbundle:
359 392 # local repo finds heads on server, finds out what
360 393 # revs it must push. once revs transferred, if server
361 394 # finds it has different heads (someone else won
362 395 # commit/push race), server aborts.
363 396 if pushop.force:
364 397 remoteheads = ['force']
365 398 else:
366 399 remoteheads = pushop.remoteheads
367 400 # ssh: return remote's addchangegroup()
368 401 # http: return remote's addchangegroup() or 0 for error
369 402 pushop.ret = pushop.remote.unbundle(cg, remoteheads,
370 403 pushop.repo.url())
371 404 else:
372 405 # we return an integer indicating remote head count
373 406 # change
374 407 pushop.ret = pushop.remote.addchangegroup(cg, 'push', pushop.repo.url())
375 408
376 409 def _pushsyncphase(pushop):
377 410 """synchronise phase information locally and remotely"""
378 411 unfi = pushop.repo.unfiltered()
379 412 cheads = pushop.commonheads
380 413 # even when we don't push, exchanging phase data is useful
381 414 remotephases = pushop.remote.listkeys('phases')
382 415 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
383 416 and remotephases # server supports phases
384 417 and pushop.ret is None # nothing was pushed
385 418 and remotephases.get('publishing', False)):
386 419 # When:
387 420 # - this is a subrepo push
388 421 # - and remote support phase
389 422 # - and no changeset was pushed
390 423 # - and remote is publishing
391 424 # We may be in issue 3871 case!
392 425 # We drop the possible phase synchronisation done by
393 426 # courtesy to publish changesets possibly locally draft
394 427 # on the remote.
395 428 remotephases = {'publishing': 'True'}
396 429 if not remotephases: # old server or public only reply from non-publishing
397 430 _localphasemove(pushop, cheads)
398 431 # don't push any phase data as there is nothing to push
399 432 else:
400 433 ana = phases.analyzeremotephases(pushop.repo, cheads,
401 434 remotephases)
402 435 pheads, droots = ana
403 436 ### Apply remote phase on local
404 437 if remotephases.get('publishing', False):
405 438 _localphasemove(pushop, cheads)
406 439 else: # publish = False
407 440 _localphasemove(pushop, pheads)
408 441 _localphasemove(pushop, cheads, phases.draft)
409 442 ### Apply local phase on remote
410 443
411 444 # Get the list of all revs draft on remote by public here.
412 445 # XXX Beware that revset break if droots is not strictly
413 446 # XXX root we may want to ensure it is but it is costly
414 447 outdated = unfi.set('heads((%ln::%ln) and public())',
415 448 droots, cheads)
416 449
417 450 b2caps = bundle2.bundle2caps(pushop.remote)
418 451 if 'b2x:pushkey' in b2caps:
419 452 # server supports bundle2, let's do a batched push through it
420 453 #
421 454 # This will eventually be unified with the changesets bundle2 push
422 455 bundler = bundle2.bundle20(pushop.ui, b2caps)
423 456 capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
424 457 bundler.newpart('b2x:replycaps', data=capsblob)
425 458 part2node = []
426 459 enc = pushkey.encode
427 460 for newremotehead in outdated:
428 461 part = bundler.newpart('b2x:pushkey')
429 462 part.addparam('namespace', enc('phases'))
430 463 part.addparam('key', enc(newremotehead.hex()))
431 464 part.addparam('old', enc(str(phases.draft)))
432 465 part.addparam('new', enc(str(phases.public)))
433 466 part2node.append((part.id, newremotehead))
434 467 stream = util.chunkbuffer(bundler.getchunks())
435 468 try:
436 469 reply = pushop.remote.unbundle(stream, ['force'], 'push')
437 470 op = bundle2.processbundle(pushop.repo, reply)
438 471 except error.BundleValueError, exc:
439 472 raise util.Abort('missing support for %s' % exc)
440 473 for partid, node in part2node:
441 474 partrep = op.records.getreplies(partid)
442 475 results = partrep['pushkey']
443 476 assert len(results) <= 1
444 477 msg = None
445 478 if not results:
446 479 msg = _('server ignored update of %s to public!\n') % node
447 480 elif not int(results[0]['return']):
448 481 msg = _('updating %s to public failed!\n') % node
449 482 if msg is not None:
450 483 pushop.ui.warn(msg)
451 484
452 485 else:
453 486 # fallback to independant pushkey command
454 487 for newremotehead in outdated:
455 488 r = pushop.remote.pushkey('phases',
456 489 newremotehead.hex(),
457 490 str(phases.draft),
458 491 str(phases.public))
459 492 if not r:
460 493 pushop.ui.warn(_('updating %s to public failed!\n')
461 494 % newremotehead)
462 495
463 496 def _localphasemove(pushop, nodes, phase=phases.public):
464 497 """move <nodes> to <phase> in the local source repo"""
465 498 if pushop.locallocked:
466 499 phases.advanceboundary(pushop.repo, phase, nodes)
467 500 else:
468 501 # repo is not locked, do not change any phases!
469 502 # Informs the user that phases should have been moved when
470 503 # applicable.
471 504 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
472 505 phasestr = phases.phasenames[phase]
473 506 if actualmoves:
474 507 pushop.ui.status(_('cannot lock source repo, skipping '
475 508 'local %s phase update\n') % phasestr)
476 509
477 510 def _pushobsolete(pushop):
478 511 """utility function to push obsolete markers to a remote"""
479 512 pushop.ui.debug('try to push obsolete markers to remote\n')
480 513 repo = pushop.repo
481 514 remote = pushop.remote
482 515 if (obsolete._enabled and repo.obsstore and
483 516 'obsolete' in remote.listkeys('namespaces')):
484 517 rslts = []
485 518 remotedata = repo.listkeys('obsolete')
486 519 for key in sorted(remotedata, reverse=True):
487 520 # reverse sort to ensure we end with dump0
488 521 data = remotedata[key]
489 522 rslts.append(remote.pushkey('obsolete', key, '', data))
490 523 if [r for r in rslts if not r]:
491 524 msg = _('failed to push some obsolete markers!\n')
492 525 repo.ui.warn(msg)
493 526
494 527 def _pushbookmark(pushop):
495 528 """Update bookmark position on remote"""
496 529 ui = pushop.ui
497 530 repo = pushop.repo.unfiltered()
498 531 remote = pushop.remote
499 532 ui.debug("checking for updated bookmarks\n")
500 533 revnums = map(repo.changelog.rev, pushop.revs or [])
501 534 ancestors = [a for a in repo.changelog.ancestors(revnums, inclusive=True)]
502 535 (addsrc, adddst, advsrc, advdst, diverge, differ, invalid
503 536 ) = bookmarks.compare(repo, repo._bookmarks, remote.listkeys('bookmarks'),
504 537 srchex=hex)
505 538
506 539 for b, scid, dcid in advsrc:
507 540 if ancestors and repo[scid].rev() not in ancestors:
508 541 continue
509 542 if remote.pushkey('bookmarks', b, dcid, scid):
510 543 ui.status(_("updating bookmark %s\n") % b)
511 544 else:
512 545 ui.warn(_('updating bookmark %s failed!\n') % b)
513 546
514 547 class pulloperation(object):
515 548 """A object that represent a single pull operation
516 549
517 550 It purpose is to carry push related state and very common operation.
518 551
519 552 A new should be created at the beginning of each pull and discarded
520 553 afterward.
521 554 """
522 555
523 556 def __init__(self, repo, remote, heads=None, force=False):
524 557 # repo we pull into
525 558 self.repo = repo
526 559 # repo we pull from
527 560 self.remote = remote
528 561 # revision we try to pull (None is "all")
529 562 self.heads = heads
530 563 # do we force pull?
531 564 self.force = force
532 565 # the name the pull transaction
533 566 self._trname = 'pull\n' + util.hidepassword(remote.url())
534 567 # hold the transaction once created
535 568 self._tr = None
536 569 # set of common changeset between local and remote before pull
537 570 self.common = None
538 571 # set of pulled head
539 572 self.rheads = None
540 573 # list of missing changeset to fetch remotely
541 574 self.fetch = None
542 575 # result of changegroup pulling (used as return code by pull)
543 576 self.cgresult = None
544 577 # list of step remaining todo (related to future bundle2 usage)
545 578 self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])
546 579
547 580 @util.propertycache
548 581 def pulledsubset(self):
549 582 """heads of the set of changeset target by the pull"""
550 583 # compute target subset
551 584 if self.heads is None:
552 585 # We pulled every thing possible
553 586 # sync on everything common
554 587 c = set(self.common)
555 588 ret = list(self.common)
556 589 for n in self.rheads:
557 590 if n not in c:
558 591 ret.append(n)
559 592 return ret
560 593 else:
561 594 # We pulled a specific subset
562 595 # sync on this subset
563 596 return self.heads
564 597
565 598 def gettransaction(self):
566 599 """get appropriate pull transaction, creating it if needed"""
567 600 if self._tr is None:
568 601 self._tr = self.repo.transaction(self._trname)
569 602 return self._tr
570 603
571 604 def closetransaction(self):
572 605 """close transaction if created"""
573 606 if self._tr is not None:
574 607 self._tr.close()
575 608
576 609 def releasetransaction(self):
577 610 """release transaction if created"""
578 611 if self._tr is not None:
579 612 self._tr.release()
580 613
581 614 def pull(repo, remote, heads=None, force=False):
582 615 pullop = pulloperation(repo, remote, heads, force)
583 616 if pullop.remote.local():
584 617 missing = set(pullop.remote.requirements) - pullop.repo.supported
585 618 if missing:
586 619 msg = _("required features are not"
587 620 " supported in the destination:"
588 621 " %s") % (', '.join(sorted(missing)))
589 622 raise util.Abort(msg)
590 623
591 624 lock = pullop.repo.lock()
592 625 try:
593 626 _pulldiscovery(pullop)
594 627 if (pullop.repo.ui.configbool('experimental', 'bundle2-exp', False)
595 628 and pullop.remote.capable('bundle2-exp')):
596 629 _pullbundle2(pullop)
597 630 if 'changegroup' in pullop.todosteps:
598 631 _pullchangeset(pullop)
599 632 if 'phases' in pullop.todosteps:
600 633 _pullphase(pullop)
601 634 if 'obsmarkers' in pullop.todosteps:
602 635 _pullobsolete(pullop)
603 636 pullop.closetransaction()
604 637 finally:
605 638 pullop.releasetransaction()
606 639 lock.release()
607 640
608 641 return pullop.cgresult
609 642
610 643 def _pulldiscovery(pullop):
611 644 """discovery phase for the pull
612 645
613 646 Current handle changeset discovery only, will change handle all discovery
614 647 at some point."""
615 648 tmp = discovery.findcommonincoming(pullop.repo.unfiltered(),
616 649 pullop.remote,
617 650 heads=pullop.heads,
618 651 force=pullop.force)
619 652 pullop.common, pullop.fetch, pullop.rheads = tmp
620 653
621 654 def _pullbundle2(pullop):
622 655 """pull data using bundle2
623 656
624 657 For now, the only supported data are changegroup."""
625 658 remotecaps = bundle2.bundle2caps(pullop.remote)
626 659 kwargs = {'bundlecaps': caps20to10(pullop.repo)}
627 660 # pulling changegroup
628 661 pullop.todosteps.remove('changegroup')
629 662
630 663 kwargs['common'] = pullop.common
631 664 kwargs['heads'] = pullop.heads or pullop.rheads
632 665 if 'b2x:listkeys' in remotecaps:
633 666 kwargs['listkeys'] = ['phase']
634 667 if not pullop.fetch:
635 668 pullop.repo.ui.status(_("no changes found\n"))
636 669 pullop.cgresult = 0
637 670 else:
638 671 if pullop.heads is None and list(pullop.common) == [nullid]:
639 672 pullop.repo.ui.status(_("requesting all changes\n"))
640 673 _pullbundle2extraprepare(pullop, kwargs)
641 674 if kwargs.keys() == ['format']:
642 675 return # nothing to pull
643 676 bundle = pullop.remote.getbundle('pull', **kwargs)
644 677 try:
645 678 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
646 679 except error.BundleValueError, exc:
647 680 raise util.Abort('missing support for %s' % exc)
648 681
649 682 if pullop.fetch:
650 683 assert len(op.records['changegroup']) == 1
651 684 pullop.cgresult = op.records['changegroup'][0]['return']
652 685
653 686 # processing phases change
654 687 for namespace, value in op.records['listkeys']:
655 688 if namespace == 'phases':
656 689 _pullapplyphases(pullop, value)
657 690
658 691 def _pullbundle2extraprepare(pullop, kwargs):
659 692 """hook function so that extensions can extend the getbundle call"""
660 693 pass
661 694
662 695 def _pullchangeset(pullop):
663 696 """pull changeset from unbundle into the local repo"""
664 697 # We delay the open of the transaction as late as possible so we
665 698 # don't open transaction for nothing or you break future useful
666 699 # rollback call
667 700 pullop.todosteps.remove('changegroup')
668 701 if not pullop.fetch:
669 702 pullop.repo.ui.status(_("no changes found\n"))
670 703 pullop.cgresult = 0
671 704 return
672 705 pullop.gettransaction()
673 706 if pullop.heads is None and list(pullop.common) == [nullid]:
674 707 pullop.repo.ui.status(_("requesting all changes\n"))
675 708 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
676 709 # issue1320, avoid a race if remote changed after discovery
677 710 pullop.heads = pullop.rheads
678 711
679 712 if pullop.remote.capable('getbundle'):
680 713 # TODO: get bundlecaps from remote
681 714 cg = pullop.remote.getbundle('pull', common=pullop.common,
682 715 heads=pullop.heads or pullop.rheads)
683 716 elif pullop.heads is None:
684 717 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
685 718 elif not pullop.remote.capable('changegroupsubset'):
686 719 raise util.Abort(_("partial pull cannot be done because "
687 720 "other repository doesn't support "
688 721 "changegroupsubset."))
689 722 else:
690 723 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
691 724 pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
692 725 pullop.remote.url())
693 726
694 727 def _pullphase(pullop):
695 728 # Get remote phases data from remote
696 729 remotephases = pullop.remote.listkeys('phases')
697 730 _pullapplyphases(pullop, remotephases)
698 731
699 732 def _pullapplyphases(pullop, remotephases):
700 733 """apply phase movement from observed remote state"""
701 734 pullop.todosteps.remove('phases')
702 735 publishing = bool(remotephases.get('publishing', False))
703 736 if remotephases and not publishing:
704 737 # remote is new and unpublishing
705 738 pheads, _dr = phases.analyzeremotephases(pullop.repo,
706 739 pullop.pulledsubset,
707 740 remotephases)
708 741 phases.advanceboundary(pullop.repo, phases.public, pheads)
709 742 phases.advanceboundary(pullop.repo, phases.draft,
710 743 pullop.pulledsubset)
711 744 else:
712 745 # Remote is old or publishing all common changesets
713 746 # should be seen as public
714 747 phases.advanceboundary(pullop.repo, phases.public,
715 748 pullop.pulledsubset)
716 749
717 750 def _pullobsolete(pullop):
718 751 """utility function to pull obsolete markers from a remote
719 752
720 753 The `gettransaction` is function that return the pull transaction, creating
721 754 one if necessary. We return the transaction to inform the calling code that
722 755 a new transaction have been created (when applicable).
723 756
724 757 Exists mostly to allow overriding for experimentation purpose"""
725 758 pullop.todosteps.remove('obsmarkers')
726 759 tr = None
727 760 if obsolete._enabled:
728 761 pullop.repo.ui.debug('fetching remote obsolete markers\n')
729 762 remoteobs = pullop.remote.listkeys('obsolete')
730 763 if 'dump0' in remoteobs:
731 764 tr = pullop.gettransaction()
732 765 for key in sorted(remoteobs, reverse=True):
733 766 if key.startswith('dump'):
734 767 data = base85.b85decode(remoteobs[key])
735 768 pullop.repo.obsstore.mergemarkers(tr, data)
736 769 pullop.repo.invalidatevolatilesets()
737 770 return tr
738 771
739 772 def caps20to10(repo):
740 773 """return a set with appropriate options to use bundle20 during getbundle"""
741 774 caps = set(['HG2X'])
742 775 capsblob = bundle2.encodecaps(repo.bundle2caps)
743 776 caps.add('bundle2=' + urllib.quote(capsblob))
744 777 return caps
745 778
746 779 def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
747 780 **kwargs):
748 781 """return a full bundle (with potentially multiple kind of parts)
749 782
750 783 Could be a bundle HG10 or a bundle HG2X depending on bundlecaps
751 784 passed. For now, the bundle can contain only changegroup, but this will
752 785 changes when more part type will be available for bundle2.
753 786
754 787 This is different from changegroup.getbundle that only returns an HG10
755 788 changegroup bundle. They may eventually get reunited in the future when we
756 789 have a clearer idea of the API we what to query different data.
757 790
758 791 The implementation is at a very early stage and will get massive rework
759 792 when the API of bundle is refined.
760 793 """
761 794 cg = None
762 795 if kwargs.get('cg', True):
763 796 # build changegroup bundle here.
764 797 cg = changegroup.getbundle(repo, source, heads=heads,
765 798 common=common, bundlecaps=bundlecaps)
766 799 elif 'HG2X' not in bundlecaps:
767 800 raise ValueError(_('request for bundle10 must include changegroup'))
768 801 if bundlecaps is None or 'HG2X' not in bundlecaps:
769 802 if kwargs:
770 803 raise ValueError(_('unsupported getbundle arguments: %s')
771 804 % ', '.join(sorted(kwargs.keys())))
772 805 return cg
773 806 # very crude first implementation,
774 807 # the bundle API will change and the generation will be done lazily.
775 808 b2caps = {}
776 809 for bcaps in bundlecaps:
777 810 if bcaps.startswith('bundle2='):
778 811 blob = urllib.unquote(bcaps[len('bundle2='):])
779 812 b2caps.update(bundle2.decodecaps(blob))
780 813 bundler = bundle2.bundle20(repo.ui, b2caps)
781 814 if cg:
782 815 bundler.newpart('b2x:changegroup', data=cg.getchunks())
783 816 listkeys = kwargs.get('listkeys', ())
784 817 for namespace in listkeys:
785 818 part = bundler.newpart('b2x:listkeys')
786 819 part.addparam('namespace', namespace)
787 820 keys = repo.listkeys(namespace).items()
788 821 part.data = pushkey.encodekeys(keys)
789 822 _getbundleextrapart(bundler, repo, source, heads=heads, common=common,
790 823 bundlecaps=bundlecaps, **kwargs)
791 824 return util.chunkbuffer(bundler.getchunks())
792 825
793 826 def _getbundleextrapart(bundler, repo, source, heads=None, common=None,
794 827 bundlecaps=None, **kwargs):
795 828 """hook function to let extensions add parts to the requested bundle"""
796 829 pass
797 830
798 831 def check_heads(repo, their_heads, context):
799 832 """check if the heads of a repo have been modified
800 833
801 834 Used by peer for unbundling.
802 835 """
803 836 heads = repo.heads()
804 837 heads_hash = util.sha1(''.join(sorted(heads))).digest()
805 838 if not (their_heads == ['force'] or their_heads == heads or
806 839 their_heads == ['hashed', heads_hash]):
807 840 # someone else committed/pushed/unbundled while we
808 841 # were transferring data
809 842 raise error.PushRaced('repository changed while %s - '
810 843 'please try again' % context)
811 844
812 845 def unbundle(repo, cg, heads, source, url):
813 846 """Apply a bundle to a repo.
814 847
815 848 this function makes sure the repo is locked during the application and have
816 849 mechanism to check that no push race occurred between the creation of the
817 850 bundle and its application.
818 851
819 852 If the push was raced as PushRaced exception is raised."""
820 853 r = 0
821 854 # need a transaction when processing a bundle2 stream
822 855 tr = None
823 856 lock = repo.lock()
824 857 try:
825 858 check_heads(repo, heads, 'uploading changes')
826 859 # push can proceed
827 860 if util.safehasattr(cg, 'params'):
828 861 try:
829 862 tr = repo.transaction('unbundle')
830 863 tr.hookargs['bundle2-exp'] = '1'
831 864 r = bundle2.processbundle(repo, cg, lambda: tr).reply
832 865 cl = repo.unfiltered().changelog
833 866 p = cl.writepending() and repo.root or ""
834 867 repo.hook('b2x-pretransactionclose', throw=True, source=source,
835 868 url=url, pending=p, **tr.hookargs)
836 869 tr.close()
837 870 repo.hook('b2x-transactionclose', source=source, url=url,
838 871 **tr.hookargs)
839 872 except Exception, exc:
840 873 exc.duringunbundle2 = True
841 874 raise
842 875 else:
843 876 r = changegroup.addchangegroup(repo, cg, source, url)
844 877 finally:
845 878 if tr is not None:
846 879 tr.release()
847 880 lock.release()
848 881 return r
General Comments 0
You need to be logged in to leave comments. Login now