##// END OF EJS Templates
push: use stepsdone to control bookmark push...
Pierre-Yves David -
r22240:d092f4b6 default
parent child Browse files
Show More
@@ -1,995 +1,996 b''
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib
11 11 import util, scmutil, changegroup, base85, error
12 12 import discovery, phases, obsolete, bookmarks, bundle2, pushkey
13 13
def readbundle(ui, fh, fname, vfs=None):
    """Read a bundle header from ``fh`` and return an unbundler for it.

    ``fname`` is used for error messages only (defaults to "stream" when
    empty).  If ``vfs`` is given, ``fname`` is joined to its root for
    display.  Returns a ``changegroup.unbundle10`` or
    ``bundle2.unbundle20`` object depending on the header version.
    Raises ``util.Abort`` on an unrecognized magic or version.
    """
    header = changegroup.readexactly(fh, 4)

    alg = None
    if not fname:
        fname = "stream"
        # A headerless stream starting with NUL is assumed to be an
        # uncompressed HG10 bundle; re-attach the consumed bytes.
        if not header.startswith('HG') and header.startswith('\0'):
            fh = changegroup.headerlessfixup(fh, header)
            header = "HG10"
            alg = 'UN'
    elif vfs:
        fname = vfs.join(fname)

    magic, version = header[0:2], header[2:4]

    if magic != 'HG':
        raise util.Abort(_('%s: not a Mercurial bundle') % fname)
    if version == '10':
        if alg is None:
            # compression algorithm is the next two bytes of the stream
            alg = changegroup.readexactly(fh, 2)
        return changegroup.unbundle10(fh, alg)
    elif version == '2X':
        return bundle2.unbundle20(ui, fh, header=magic + version)
    else:
        raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
39 39
40 40
class pushoperation(object):
    """An object that represents a single push operation.

    Its purpose is to carry push-related state and very common operations.

    A new one should be created at the beginning of each push and discarded
    afterward.
    """

    def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
        # repo we push from
        self.repo = repo
        self.ui = repo.ui
        # repo we push to
        self.remote = remote
        # force option provided
        self.force = force
        # revs to be pushed (None is "all")
        self.revs = revs
        # allow push of new branch
        self.newbranch = newbranch
        # did a local lock get acquired?
        self.locallocked = None
        # step already performed
        # (used to check what steps have been already performed through bundle2)
        self.stepsdone = set()
        # Integer version of the push result
        # - None means nothing to push
        # - 0 means HTTP error
        # - 1 means we pushed and remote head count is unchanged *or*
        #   we have outgoing changesets but refused to push
        # - other values as described by addchangegroup()
        self.ret = None
        # discover.outgoing object (contains common and outgoing data)
        self.outgoing = None
        # all remote heads before the push
        self.remoteheads = None
        # testable as a boolean indicating if any nodes are missing locally.
        self.incoming = None
        # phases changes that must be pushed along side the changesets
        self.outdatedphases = None
        # phases changes that must be pushed if changeset push fails
        self.fallbackoutdatedphases = None
        # outgoing obsmarkers
        self.outobsmarkers = set()
        # outgoing bookmarks
        self.outbookmarks = []

    @util.propertycache
    def futureheads(self):
        """future remote heads if the changeset push succeeds"""
        return self.outgoing.missingheads

    @util.propertycache
    def fallbackheads(self):
        """future remote heads if the changeset push fails"""
        if self.revs is None:
            # not target to push, all common are relevant
            return self.outgoing.commonheads
        unfi = self.repo.unfiltered()
        # I want cheads = heads(::missingheads and ::commonheads)
        # (missingheads is revs with secret changeset filtered out)
        #
        # This can be expressed as:
        #     cheads = ( (missingheads and ::commonheads)
        #              + (commonheads and ::missingheads))"
        #              )
        #
        # while trying to push we already computed the following:
        #     common = (::commonheads)
        #     missing = ((commonheads::missingheads) - commonheads)
        #
        # We can pick:
        # * missingheads part of common (::commonheads)
        common = set(self.outgoing.common)
        nm = self.repo.changelog.nodemap
        cheads = [node for node in self.revs if nm[node] in common]
        # and
        # * commonheads parents on missing
        revset = unfi.set('%ln and parents(roots(%ln))',
                          self.outgoing.commonheads,
                          self.outgoing.missing)
        cheads.extend(c.node() for c in revset)
        return cheads

    @property
    def commonheads(self):
        """set of all common heads after changeset bundle push"""
        # self.ret is truthy when the changegroup push succeeded
        if self.ret:
            return self.futureheads
        else:
            return self.fallbackheads
133 133
def push(repo, remote, force=False, revs=None, newbranch=False):
    '''Push outgoing changesets (limited by revs) from a local
    repository to remote. Return an integer:
      - None means nothing to push
      - 0 means HTTP error
      - 1 means we pushed and remote head count is unchanged *or*
        we have outgoing changesets but refused to push
      - other values as described by addchangegroup()
    '''
    pushop = pushoperation(repo, remote, force, revs, newbranch)
    if pushop.remote.local():
        # local-to-local push: the destination must understand all of our
        # repository requirements
        missing = (set(pushop.repo.requirements)
                   - pushop.remote.local().supported)
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    # there are two ways to push to remote repo:
    #
    # addchangegroup assumes local user can lock remote
    # repo (local filesystem, old ssh servers).
    #
    # unbundle assumes local user cannot lock remote repo (new ssh
    # servers, http servers).

    if not pushop.remote.canpush():
        raise util.Abort(_("destination does not support push"))
    # get local lock as we might write phase data
    locallock = None
    try:
        locallock = pushop.repo.lock()
        pushop.locallocked = True
    except IOError, err:
        pushop.locallocked = False
        if err.errno != errno.EACCES:
            raise
        # source repo cannot be locked.
        # We do not abort the push, but just disable the local phase
        # synchronisation.
        msg = 'cannot lock source repository: %s\n' % err
        pushop.ui.debug(msg)
    try:
        pushop.repo.checkpush(pushop)
        lock = None
        unbundle = pushop.remote.capable('unbundle')
        if not unbundle:
            # old-style push requires locking the remote repo directly
            lock = pushop.remote.lock()
        try:
            _pushdiscovery(pushop)
            if (pushop.repo.ui.configbool('experimental', 'bundle2-exp',
                                          False)
                and pushop.remote.capable('bundle2-exp')):
                _pushbundle2(pushop)
            # the steps below check pushop.stepsdone and skip work already
            # performed through bundle2
            _pushchangeset(pushop)
            _pushsyncphase(pushop)
            _pushobsolete(pushop)
            _pushbookmark(pushop)
        finally:
            if lock is not None:
                lock.release()
    finally:
        if locallock is not None:
            locallock.release()

    return pushop.ret
201 201
202 202 # list of steps to perform discovery before push
203 203 pushdiscoveryorder = []
204 204
205 205 # Mapping between step name and function
206 206 #
207 207 # This exists to help extensions wrap steps if necessary
208 208 pushdiscoverymapping = {}
209 209
210 210 def pushdiscovery(stepname):
211 211 """decorator for function performing discovery before push
212 212
213 213 The function is added to the step -> function mapping and appended to the
214 214 list of steps. Beware that decorated function will be added in order (this
215 215 may matter).
216 216
217 217 You can only use this decorator for a new step, if you want to wrap a step
218 218 from an extension, change the pushdiscovery dictionary directly."""
219 219 def dec(func):
220 220 assert stepname not in pushdiscoverymapping
221 221 pushdiscoverymapping[stepname] = func
222 222 pushdiscoveryorder.append(stepname)
223 223 return func
224 224 return dec
225 225
226 226 def _pushdiscovery(pushop):
227 227 """Run all discovery steps"""
228 228 for stepname in pushdiscoveryorder:
229 229 step = pushdiscoverymapping[stepname]
230 230 step(pushop)
231 231
@pushdiscovery('changeset')
def _pushdiscoverychangeset(pushop):
    """discover the changesets that need to be pushed

    Fills in ``pushop.outgoing``, ``pushop.remoteheads`` and
    ``pushop.incoming`` from common/outgoing discovery against the remote.
    """
    unfi = pushop.repo.unfiltered()
    fci = discovery.findcommonincoming
    commoninc = fci(unfi, pushop.remote, force=pushop.force)
    common, inc, remoteheads = commoninc
    fco = discovery.findcommonoutgoing
    outgoing = fco(unfi, pushop.remote, onlyheads=pushop.revs,
                   commoninc=commoninc, force=pushop.force)
    pushop.outgoing = outgoing
    pushop.remoteheads = remoteheads
    # truthy when the remote has changesets we do not have locally
    pushop.incoming = inc
245 245
@pushdiscovery('phase')
def _pushdiscoveryphase(pushop):
    """discover the phase that needs to be pushed

    (computed for both success and failure case for changesets push)

    Fills in ``pushop.outdatedphases`` (heads to turn public if the
    changeset push succeeds) and ``pushop.fallbackoutdatedphases`` (heads
    to turn public even if it fails)."""
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    remotephases = pushop.remote.listkeys('phases')
    publishing = remotephases.get('publishing', False)
    ana = phases.analyzeremotephases(pushop.repo,
                                     pushop.fallbackheads,
                                     remotephases)
    pheads, droots = ana
    extracond = ''
    if not publishing:
        # on a non-publishing remote, only locally-public changesets are
        # candidates for a remote phase update
        extracond = ' and public()'
    revset = 'heads((%%ln::%%ln) %s)' % extracond
    # Get the list of all revs draft on remote but public here.
    # XXX Beware that revset break if droots is not strictly
    # XXX root we may want to ensure it is but it is costly
    fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
    if not outgoing.missing:
        future = fallback
    else:
        # adds changeset we are going to push as draft
        #
        # should not be necessary for publishing server, but because of an
        # issue fixed in xxxxx we have to do it anyway.
        fdroots = list(unfi.set('roots(%ln + %ln::)',
                       outgoing.missing, droots))
        fdroots = [f.node() for f in fdroots]
        future = list(unfi.set(revset, fdroots, pushop.futureheads))
    pushop.outdatedphases = future
    pushop.fallbackoutdatedphases = fallback
280 280
@pushdiscovery('obsmarker')
def _pushdiscoveryobsmarkers(pushop):
    """select obsolescence markers to push (currently: the whole obsstore)"""
    pushop.outobsmarkers = pushop.repo.obsstore
284 284
@pushdiscovery('bookmarks')
def _pushdiscoverybookmarks(pushop):
    """discover bookmarks that can be fast-forwarded on the remote

    Appends ``(name, remote_id, local_id)`` tuples to
    ``pushop.outbookmarks`` for every bookmark where the local position is
    a descendant of the remote one (the ``advsrc`` category)."""
    ui = pushop.ui
    repo = pushop.repo.unfiltered()
    remote = pushop.remote
    ui.debug("checking for updated bookmarks\n")
    ancestors = ()
    if pushop.revs:
        # restrict updates to bookmarks pointing into the pushed subset
        revnums = map(repo.changelog.rev, pushop.revs)
        ancestors = repo.changelog.ancestors(revnums, inclusive=True)
    remotebookmark = remote.listkeys('bookmarks')

    comp = bookmarks.compare(repo, repo._bookmarks, remotebookmark, srchex=hex)
    (addsrc, adddst, advsrc, advdst, diverge, differ, invalid) = comp
    for b, scid, dcid in advsrc:
        if not ancestors or repo[scid].rev() in ancestors:
            pushop.outbookmarks.append((b, dcid, scid))
302 302
def _pushcheckoutgoing(pushop):
    """validate the outgoing changesets before pushing them

    Returns False when there is nothing to push.  Aborts when the push
    would publish obsolete/troubled changesets (unless forced) or would
    create unwanted new remote heads."""
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    if not outgoing.missing:
        # nothing to push
        scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
        return False
    # something to push
    if not pushop.force:
        # if repo.obsstore == False --> no obsolete
        # then, save the iteration
        if unfi.obsstore:
            # this message are here for 80 char limit reason
            mso = _("push includes obsolete changeset: %s!")
            mst = "push includes %s changeset: %s!"
            # plain versions for i18n tool to detect them
            _("push includes unstable changeset: %s!")
            _("push includes bumped changeset: %s!")
            _("push includes divergent changeset: %s!")
            # If we are to push if there is at least one
            # obsolete or unstable changeset in missing, at
            # least one of the missinghead will be obsolete or
            # unstable. So checking heads only is ok
            for node in outgoing.missingheads:
                ctx = unfi[node]
                if ctx.obsolete():
                    raise util.Abort(mso % ctx)
                elif ctx.troubled():
                    raise util.Abort(_(mst)
                                     % (ctx.troubles()[0],
                                        ctx))
        # bookmarks configured to be pushed may legitimately create heads
        newbm = pushop.ui.configlist('bookmarks', 'pushing')
        discovery.checkheads(unfi, pushop.remote, outgoing,
                             pushop.remoteheads,
                             pushop.newbranch,
                             bool(pushop.incoming),
                             newbm)
    return True
341 341
# Names of part-generating steps for an outgoing bundle2; order matters.
b2partsgenorder = []

# step name -> function generating the corresponding bundle2 part(s)
#
# Extensions may replace entries here to wrap individual steps.
b2partsgenmapping = {}

def b2partsgenerator(stepname):
    """decorator registering a bundle2 part generator

    The decorated function is recorded in the name -> function mapping and
    its name appended to the ordered step list, so registration order is
    generation order (this may matter).

    Only use this decorator for brand new steps; to wrap an existing step
    from an extension, modify the b2partsgenmapping dictionary directly."""
    def dec(func):
        assert stepname not in b2partsgenmapping
        b2partsgenorder.append(stepname)
        b2partsgenmapping[stepname] = func
        return func
    return dec
365 365
@b2partsgenerator('changeset')
def _pushb2ctx(pushop, bundler):
    """handle changegroup push through bundle2

    addchangegroup result is stored in the ``pushop.ret`` attribute.

    Returns a reply handler extracting that result from the server reply,
    or None when the step was already done or there is nothing to push.
    """
    if 'changesets' in pushop.stepsdone:
        return
    # mark the step done up front so the non-bundle2 fallback in
    # _pushchangeset does not redo it (was duplicated; only one add needed)
    pushop.stepsdone.add('changesets')
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    if not pushop.force:
        # Send known heads to the server for race detection.
        bundler.newpart('B2X:CHECK:HEADS', data=iter(pushop.remoteheads))
    cg = changegroup.getlocalbundle(pushop.repo, 'push', pushop.outgoing)
    cgpart = bundler.newpart('B2X:CHANGEGROUP', data=cg.getchunks())
    def handlereply(op):
        """extract addchangegroup returns from server reply"""
        cgreplies = op.records.getreplies(cgpart.id)
        assert len(cgreplies['changegroup']) == 1
        pushop.ret = cgreplies['changegroup'][0]['return']
    return handlereply
392 392
@b2partsgenerator('phase')
def _pushb2phases(pushop, bundler):
    """handle phase push through bundle2

    Adds one ``b2x:pushkey`` part per outdated head and returns a reply
    handler that reports per-head failures; returns None when the remote
    lacks pushkey support or the step was already done."""
    if 'phases' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    if not 'b2x:pushkey' in b2caps:
        return
    pushop.stepsdone.add('phases')
    part2node = []
    enc = pushkey.encode
    for newremotehead in pushop.outdatedphases:
        part = bundler.newpart('b2x:pushkey')
        part.addparam('namespace', enc('phases'))
        part.addparam('key', enc(newremotehead.hex()))
        part.addparam('old', enc(str(phases.draft)))
        part.addparam('new', enc(str(phases.public)))
        part2node.append((part.id, newremotehead))
    def handlereply(op):
        for partid, node in part2node:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            # at most one reply per pushkey part
            assert len(results) <= 1
            msg = None
            if not results:
                msg = _('server ignored update of %s to public!\n') % node
            elif not int(results[0]['return']):
                msg = _('updating %s to public failed!\n') % node
            if msg is not None:
                pushop.ui.warn(msg)
    return handlereply
424 424
def _pushbundle2(pushop):
    """push data to the remote using bundle2

    Builds a bundle from every registered part generator
    (``b2partsgenorder``), sends it with ``unbundle``, then dispatches the
    server reply to each generator's reply handler."""
    bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
    # create reply capability
    capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
    bundler.newpart('b2x:replycaps', data=capsblob)
    replyhandlers = []
    for partgenname in b2partsgenorder:
        partgen = b2partsgenmapping[partgenname]
        ret = partgen(pushop, bundler)
        if callable(ret):
            replyhandlers.append(ret)
    # do not push if nothing to push
    # (the replycaps part is always present, hence <= 1)
    if bundler.nbparts <= 1:
        return
    stream = util.chunkbuffer(bundler.getchunks())
    try:
        reply = pushop.remote.unbundle(stream, ['force'], 'push')
    except error.BundleValueError, exc:
        raise util.Abort('missing support for %s' % exc)
    try:
        op = bundle2.processbundle(pushop.repo, reply)
    except error.BundleValueError, exc:
        raise util.Abort('missing support for %s' % exc)
    for rephand in replyhandlers:
        rephand(op)
454 454
def _pushchangeset(pushop):
    """Make the actual push of changeset bundle to remote repo

    Skipped when the 'changesets' step was already performed (e.g. through
    bundle2).  Stores the remote's result in ``pushop.ret``."""
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    outgoing = pushop.outgoing
    unbundle = pushop.remote.capable('unbundle')
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # create a changegroup from local
    if pushop.revs is None and not (outgoing.excluded
                            or pushop.repo.changelog.filteredrevs):
        # push everything,
        # use the fast path, no race possible on push
        bundler = changegroup.bundle10(pushop.repo, bundlecaps)
        cg = changegroup.getsubset(pushop.repo,
                                   outgoing,
                                   bundler,
                                   'push',
                                   fastpath=True)
    else:
        cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
                                        bundlecaps)

    # apply changegroup to remote
    if unbundle:
        # local repo finds heads on server, finds out what
        # revs it must push. once revs transferred, if server
        # finds it has different heads (someone else won
        # commit/push race), server aborts.
        if pushop.force:
            remoteheads = ['force']
        else:
            remoteheads = pushop.remoteheads
        # ssh: return remote's addchangegroup()
        # http: return remote's addchangegroup() or 0 for error
        pushop.ret = pushop.remote.unbundle(cg, remoteheads,
                                            pushop.repo.url())
    else:
        # we return an integer indicating remote head count
        # change
        pushop.ret = pushop.remote.addchangegroup(cg, 'push', pushop.repo.url())
502 502
def _pushsyncphase(pushop):
    """synchronise phase information locally and remotely

    First applies the remote's phase data locally, then pushes local phase
    changes to the remote (batched through bundle2 pushkey when available,
    falling back to individual pushkey calls)."""
    cheads = pushop.commonheads
    # even when we don't push, exchanging phase data is useful
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo', False)
        and remotephases    # server supports phases
        and pushop.ret is None # nothing was pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and remote support phase
        # - and no changeset was pushed
        # - and remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    if not remotephases: # old server or public only reply from non-publishing
        _localphasemove(pushop, cheads)
        # don't push any phase data as there is nothing to push
    else:
        ana = phases.analyzeremotephases(pushop.repo, cheads,
                                         remotephases)
        pheads, droots = ana
        ### Apply remote phase on local
        if remotephases.get('publishing', False):
            _localphasemove(pushop, cheads)
        else: # publish = False
            _localphasemove(pushop, pheads)
            _localphasemove(pushop, cheads, phases.draft)
        ### Apply local phase on remote

        if pushop.ret:
            if 'phases' in pushop.stepsdone:
                # phases already pushed though bundle2
                return
            outdated = pushop.outdatedphases
        else:
            outdated = pushop.fallbackoutdatedphases

        pushop.stepsdone.add('phases')

        # filter heads already turned public by the push
        outdated = [c for c in outdated if c.node() not in pheads]
        b2caps = bundle2.bundle2caps(pushop.remote)
        if 'b2x:pushkey' in b2caps:
            # server supports bundle2, let's do a batched push through it
            #
            # This will eventually be unified with the changesets bundle2 push
            bundler = bundle2.bundle20(pushop.ui, b2caps)
            capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
            bundler.newpart('b2x:replycaps', data=capsblob)
            part2node = []
            enc = pushkey.encode
            for newremotehead in outdated:
                part = bundler.newpart('b2x:pushkey')
                part.addparam('namespace', enc('phases'))
                part.addparam('key', enc(newremotehead.hex()))
                part.addparam('old', enc(str(phases.draft)))
                part.addparam('new', enc(str(phases.public)))
                part2node.append((part.id, newremotehead))
            stream = util.chunkbuffer(bundler.getchunks())
            try:
                reply = pushop.remote.unbundle(stream, ['force'], 'push')
                op = bundle2.processbundle(pushop.repo, reply)
            except error.BundleValueError, exc:
                raise util.Abort('missing support for %s' % exc)
            for partid, node in part2node:
                partrep = op.records.getreplies(partid)
                results = partrep['pushkey']
                # at most one reply per pushkey part
                assert len(results) <= 1
                msg = None
                if not results:
                    msg = _('server ignored update of %s to public!\n') % node
                elif not int(results[0]['return']):
                    msg = _('updating %s to public failed!\n') % node
                if msg is not None:
                    pushop.ui.warn(msg)

        else:
            # fallback to independant pushkey command
            for newremotehead in outdated:
                r = pushop.remote.pushkey('phases',
                                          newremotehead.hex(),
                                          str(phases.draft),
                                          str(phases.public))
                if not r:
                    pushop.ui.warn(_('updating %s to public failed!\n')
                                   % newremotehead)
594 594
def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo

    If the local repo could not be locked, no phase is changed; the user
    is informed when a move would have happened."""
    if pushop.locallocked:
        tr = pushop.repo.transaction('push-phase-sync')
        try:
            phases.advanceboundary(pushop.repo, tr, phase, nodes)
            tr.close()
        finally:
            tr.release()
    else:
        # repo is not locked, do not change any phases!
        # Informs the user that phases should have been moved when
        # applicable.
        actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
        phasestr = phases.phasenames[phase]
        if actualmoves:
            pushop.ui.status(_('cannot lock source repo, skipping '
                               'local %s phase update\n') % phasestr)
613 613
def _pushobsolete(pushop):
    """utility function to push obsolete markers to a remote

    Markers are sent through the 'obsolete' pushkey namespace, split into
    chunks by obsolete._pushkeyescape.  No-op when the step was already
    done, obsolescence is disabled, or the remote lacks the namespace."""
    if 'obsmarkers' in pushop.stepsdone:
        return
    pushop.ui.debug('try to push obsolete markers to remote\n')
    repo = pushop.repo
    remote = pushop.remote
    pushop.stepsdone.add('obsmarkers')
    if (obsolete._enabled and repo.obsstore and
        'obsolete' in remote.listkeys('namespaces')):
        rslts = []
        remotedata = obsolete._pushkeyescape(pushop.outobsmarkers)
        for key in sorted(remotedata, reverse=True):
            # reverse sort to ensure we end with dump0
            data = remotedata[key]
            rslts.append(remote.pushkey('obsolete', key, '', data))
        if [r for r in rslts if not r]:
            msg = _('failed to push some obsolete markers!\n')
            repo.ui.warn(msg)
633 633
def _pushbookmark(pushop):
    """Update bookmark position on remote

    Skipped when the push errored out (ret == 0) or when the 'bookmarks'
    step was already performed (e.g. through bundle2)."""
    if pushop.ret == 0 or 'bookmarks' in pushop.stepsdone:
        return
    pushop.stepsdone.add('bookmarks')
    ui = pushop.ui
    remote = pushop.remote
    # entries are (name, remote id, local id) from the discovery step
    for b, old, new in pushop.outbookmarks:
        if remote.pushkey('bookmarks', b, old, new):
            ui.status(_("updating bookmark %s\n") % b)
        else:
            ui.warn(_('updating bookmark %s failed!\n') % b)
645 646
class pulloperation(object):
    """An object that represents a single pull operation.

    Its purpose is to carry pull-related state and very common operations.

    A new one should be created at the beginning of each pull and discarded
    afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False):
        # repo we pull into
        self.repo = repo
        # repo we pull from
        self.remote = remote
        # revision we try to pull (None is "all")
        self.heads = heads
        # do we force pull?
        self.force = force
        # the name the pull transaction
        self._trname = 'pull\n' + util.hidepassword(remote.url())
        # hold the transaction once created
        self._tr = None
        # set of common changeset between local and remote before pull
        self.common = None
        # set of pulled head
        self.rheads = None
        # list of missing changeset to fetch remotely
        self.fetch = None
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # list of step remaining todo (related to future bundle2 usage)
        self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changeset target by the pull"""
        # compute target subset
        if self.heads is None:
            # We pulled every thing possible
            # sync on everything common
            c = set(self.common)
            ret = list(self.common)
            for n in self.rheads:
                if n not in c:
                    ret.append(n)
            return ret
        else:
            # We pulled a specific subset
            # sync on this subset
            return self.heads

    def gettransaction(self):
        """get appropriate pull transaction, creating it if needed"""
        if self._tr is None:
            self._tr = self.repo.transaction(self._trname)
        return self._tr

    def closetransaction(self):
        """close transaction if created"""
        if self._tr is not None:
            self._tr.close()

    def releasetransaction(self):
        """release transaction if created"""
        if self._tr is not None:
            self._tr.release()
712 713
def pull(repo, remote, heads=None, force=False):
    """Pull changesets (limited by heads) from remote into the local repo.

    Tries a bundle2 pull first when both sides advertise the experimental
    capability; any step bundle2 did not handle (tracked in
    ``pullop.todosteps``) is performed by the legacy code paths.  Returns
    the changegroup result code (``pullop.cgresult``)."""
    pullop = pulloperation(repo, remote, heads, force)
    if pullop.remote.local():
        # local-to-local pull: we must understand all of the source's
        # repository requirements
        missing = set(pullop.remote.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    lock = pullop.repo.lock()
    try:
        _pulldiscovery(pullop)
        if (pullop.repo.ui.configbool('experimental', 'bundle2-exp', False)
            and pullop.remote.capable('bundle2-exp')):
            _pullbundle2(pullop)
        if 'changegroup' in pullop.todosteps:
            _pullchangeset(pullop)
        if 'phases' in pullop.todosteps:
            _pullphase(pullop)
        if 'obsmarkers' in pullop.todosteps:
            _pullobsolete(pullop)
        pullop.closetransaction()
    finally:
        pullop.releasetransaction()
        lock.release()

    return pullop.cgresult
741 742
def _pulldiscovery(pullop):
    """discovery phase for the pull

    Currently handles changeset discovery only; will eventually handle all
    discovery steps."""
    common, fetch, rheads = discovery.findcommonincoming(
        pullop.repo.unfiltered(), pullop.remote,
        heads=pullop.heads, force=pullop.force)
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads
752 753
753 754 def _pullbundle2(pullop):
754 755 """pull data using bundle2
755 756
756 757 For now, the only supported data are changegroup."""
757 758 remotecaps = bundle2.bundle2caps(pullop.remote)
758 759 kwargs = {'bundlecaps': caps20to10(pullop.repo)}
759 760 # pulling changegroup
760 761 pullop.todosteps.remove('changegroup')
761 762
762 763 kwargs['common'] = pullop.common
763 764 kwargs['heads'] = pullop.heads or pullop.rheads
764 765 if 'b2x:listkeys' in remotecaps:
765 766 kwargs['listkeys'] = ['phase']
766 767 if not pullop.fetch:
767 768 pullop.repo.ui.status(_("no changes found\n"))
768 769 pullop.cgresult = 0
769 770 else:
770 771 if pullop.heads is None and list(pullop.common) == [nullid]:
771 772 pullop.repo.ui.status(_("requesting all changes\n"))
772 773 _pullbundle2extraprepare(pullop, kwargs)
773 774 if kwargs.keys() == ['format']:
774 775 return # nothing to pull
775 776 bundle = pullop.remote.getbundle('pull', **kwargs)
776 777 try:
777 778 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
778 779 except error.BundleValueError, exc:
779 780 raise util.Abort('missing support for %s' % exc)
780 781
781 782 if pullop.fetch:
782 783 assert len(op.records['changegroup']) == 1
783 784 pullop.cgresult = op.records['changegroup'][0]['return']
784 785
785 786 # processing phases change
786 787 for namespace, value in op.records['listkeys']:
787 788 if namespace == 'phases':
788 789 _pullapplyphases(pullop, value)
789 790
790 791 def _pullbundle2extraprepare(pullop, kwargs):
791 792 """hook function so that extensions can extend the getbundle call"""
792 793 pass
793 794
def _pullchangeset(pullop):
    """pull changeset from unbundle into the local repo

    Picks the best available wire command (getbundle, changegroup or
    changegroupsubset) and stores the result code in ``pullop.cgresult``."""
    # We delay the open of the transaction as late as possible so we
    # don't open transaction for nothing or you break future useful
    # rollback call
    pullop.todosteps.remove('changegroup')
    if not pullop.fetch:
            pullop.repo.ui.status(_("no changes found\n"))
            pullop.cgresult = 0
            return
    pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    if pullop.remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = pullop.remote.getbundle('pull', common=pullop.common,
                                     heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = pullop.remote.changegroup(pullop.fetch, 'pull')
    elif not pullop.remote.capable('changegroupsubset'):
        raise util.Abort(_("partial pull cannot be done because "
                           "other repository doesn't support "
                           "changegroupsubset."))
    else:
        cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
                                                 pullop.remote.url())
825 826
def _pullphase(pullop):
    """fetch the phase information advertised by the remote and apply it"""
    remotephases = pullop.remote.listkeys('phases')
    _pullapplyphases(pullop, remotephases)
830 831
def _pullapplyphases(pullop, remotephases):
    """apply phase movement from observed remote state

    ``remotephases`` is the raw 'phases' pushkey namespace as returned by
    the remote. Advances local public/draft boundaries accordingly, opening
    the pull transaction only if a boundary actually moves.
    """
    pullop.todosteps.remove('phases')
    publishing = bool(remotephases.get('publishing', False))
    if remotephases and not publishing:
        # remote is new and unpublishing
        pheads, _dr = phases.analyzeremotephases(pullop.repo,
                                                 pullop.pulledsubset,
                                                 remotephases)
        dheads = pullop.pulledsubset
    else:
        # Remote is old or publishing all common changesets
        # should be seen as public
        pheads = pullop.pulledsubset
        dheads = []
    unfi = pullop.repo.unfiltered()
    getphase = unfi._phasecache.phase
    getrev = unfi.changelog.nodemap.get

    def _above(nodes, boundary):
        # keep only nodes whose current local phase is higher than `boundary`
        return [n for n in nodes if getphase(unfi, getrev(n)) > boundary]

    # exclude changesets already public locally and update the others
    pheads = _above(pheads, phases.public)
    if pheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, phases.public, pheads)

    # exclude changesets already draft locally and update the others
    dheads = _above(dheads, phases.draft)
    if dheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, phases.draft, dheads)
863 864
def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    The `gettransaction` is function that return the pull transaction, creating
    one if necessary. We return the transaction to inform the calling code that
    a new transaction have been created (when applicable).

    Exists mostly to allow overriding for experimentation purpose"""
    pullop.todosteps.remove('obsmarkers')
    tr = None
    if not obsolete._enabled:
        # obsolescence feature is off: nothing to fetch
        return tr
    pullop.repo.ui.debug('fetching remote obsolete markers\n')
    remoteobs = pullop.remote.listkeys('obsolete')
    if 'dump0' in remoteobs:
        # at least one marker dump advertised; merge them newest-key first
        tr = pullop.gettransaction()
        for key in sorted(remoteobs, reverse=True):
            if key.startswith('dump'):
                data = base85.b85decode(remoteobs[key])
                pullop.repo.obsstore.mergemarkers(tr, data)
        pullop.repo.invalidatevolatilesets()
    return tr
885 886
def caps20to10(repo):
    """return a set with appropriate options to use bundle20 during getbundle"""
    # advertise the bundle2 protocol plus this repo's url-quoted capability blob
    capsblob = bundle2.encodecaps(repo.bundle2caps)
    return set(['HG2X', 'bundle2=' + urllib.quote(capsblob)])
892 893
def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
              **kwargs):
    """return a full bundle (with potentially multiple kind of parts)

    Could be a bundle HG10 or a bundle HG2X depending on bundlecaps
    passed. For now, the bundle can contain only changegroup, but this will
    changes when more part type will be available for bundle2.

    This is different from changegroup.getbundle that only returns an HG10
    changegroup bundle. They may eventually get reunited in the future when we
    have a clearer idea of the API we what to query different data.

    The implementation is at a very early stage and will get massive rework
    when the API of bundle is refined.
    """
    cg = None
    if kwargs.get('cg', True):
        # build changegroup bundle here.
        cg = changegroup.getbundle(repo, source, heads=heads,
                                   common=common, bundlecaps=bundlecaps)
    elif bundlecaps is None or 'HG2X' not in bundlecaps:
        # Omitting the changegroup only makes sense for a bundle2 request.
        # The explicit None guard avoids a TypeError from `in None` so the
        # intended ValueError is raised instead (matching the guard below).
        raise ValueError(_('request for bundle10 must include changegroup'))
    if bundlecaps is None or 'HG2X' not in bundlecaps:
        # plain HG10 path: no extra arguments are representable
        if kwargs:
            raise ValueError(_('unsupported getbundle arguments: %s')
                             % ', '.join(sorted(kwargs.keys())))
        return cg
    # very crude first implementation,
    # the bundle API will change and the generation will be done lazily.
    b2caps = {}
    for bcaps in bundlecaps:
        if bcaps.startswith('bundle2='):
            # client capability blob is url-quoted on the wire
            blob = urllib.unquote(bcaps[len('bundle2='):])
            b2caps.update(bundle2.decodecaps(blob))
    bundler = bundle2.bundle20(repo.ui, b2caps)
    if cg:
        bundler.newpart('b2x:changegroup', data=cg.getchunks())
    # emit one listkeys part per requested pushkey namespace
    listkeys = kwargs.get('listkeys', ())
    for namespace in listkeys:
        part = bundler.newpart('b2x:listkeys')
        part.addparam('namespace', namespace)
        keys = repo.listkeys(namespace).items()
        part.data = pushkey.encodekeys(keys)
    _getbundleextrapart(bundler, repo, source, heads=heads, common=common,
                        bundlecaps=bundlecaps, **kwargs)
    return util.chunkbuffer(bundler.getchunks())
939 940
def _getbundleextrapart(bundler, repo, source, heads=None, common=None,
                        bundlecaps=None, **kwargs):
    """hook function to let extensions add parts to the requested bundle

    Called by `getbundle` after the built-in changegroup/listkeys parts have
    been added; extensions may append further parts to `bundler`.
    Default: no-op.
    """
    pass
944 945
def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling.
    """
    heads = repo.heads()
    heads_hash = util.sha1(''.join(sorted(heads))).digest()
    # accepted when the client forced the push, or when its view of our
    # heads (raw or hashed) still matches reality
    if (their_heads == ['force'] or their_heads == heads or
            their_heads == ['hashed', heads_hash]):
        return
    # someone else committed/pushed/unbundled while we
    # were transferring data
    raise error.PushRaced('repository changed while %s - '
                          'please try again' % context)
958 959
def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    this function makes sure the repo is locked during the application and have
    mechanism to check that no push race occurred between the creation of the
    bundle and its application.

    If the push was raced as PushRaced exception is raised."""
    r = 0
    # need a transaction when processing a bundle2 stream
    tr = None
    lock = repo.lock()
    try:
        # abort with PushRaced if the repo heads changed since the client
        # computed `heads` (unless it passed 'force')
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        if util.safehasattr(cg, 'params'):
            # bundle2 stream (only bundle2 unbundlers carry `params`)
            try:
                tr = repo.transaction('unbundle')
                tr.hookargs['bundle2-exp'] = '1'
                r = bundle2.processbundle(repo, cg, lambda: tr).reply
                cl = repo.unfiltered().changelog
                # expose pending (not-yet-committed) changes to the pre-close
                # hook via the `pending` argument; "" if nothing was written
                p = cl.writepending() and repo.root or ""
                # pre-close hook may still abort the whole transaction
                repo.hook('b2x-pretransactionclose', throw=True, source=source,
                          url=url, pending=p, **tr.hookargs)
                tr.close()
                repo.hook('b2x-transactionclose', source=source, url=url,
                          **tr.hookargs)
            except Exception, exc:
                # tag the exception so callers report it as a bundle2 failure
                exc.duringunbundle2 = True
                raise
        else:
            # legacy HG10 changegroup stream
            r = changegroup.addchangegroup(repo, cg, source, url)
    finally:
        # release() is a no-op after a successful tr.close(); otherwise it
        # rolls the transaction back. Always release tr before the lock.
        if tr is not None:
            tr.release()
        lock.release()
    return r
General Comments 0
You need to be logged in to leave comments. Login now