##// END OF EJS Templates
push: wrap local phase movement in a transaction...
Pierre-Yves David -
r22051:e894de23 default
parent child Browse files
Show More
@@ -1,965 +1,970
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib
11 11 import util, scmutil, changegroup, base85, error
12 12 import discovery, phases, obsolete, bookmarks, bundle2, pushkey
13 13
def readbundle(ui, fh, fname, vfs=None):
    """Identify a bundle stream and return the matching unbundler object.

    Reads the four byte magic/version header from ``fh`` and dispatches to
    the bundle10 or bundle2 reader.  A headerless stream (starting with a
    NUL byte) is fixed up and treated as an uncompressed bundle10.
    ``fname`` is used for error reporting only; when empty, the data is
    reported as coming from "stream".  ``vfs``, when given, is used to
    resolve ``fname`` to a full path.
    """
    header = changegroup.readexactly(fh, 4)

    alg = None
    if fname:
        if vfs:
            fname = vfs.join(fname)
    else:
        fname = "stream"
        if not header.startswith('HG') and header.startswith('\0'):
            # headerless stream: reinject the bytes we consumed and treat
            # the data as an uncompressed bundle10
            fh = changegroup.headerlessfixup(fh, header)
            header = "HG10"
            alg = 'UN'

    magic = header[0:2]
    version = header[2:4]

    if magic != 'HG':
        raise util.Abort(_('%s: not a Mercurial bundle') % fname)
    if version == '10':
        if alg is None:
            alg = changegroup.readexactly(fh, 2)
        return changegroup.unbundle10(fh, alg)
    if version == '2X':
        return bundle2.unbundle20(ui, fh, header=magic + version)
    raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
39 39
40 40
class pushoperation(object):
    """An object that represents a single push operation.

    Its purpose is to carry push-related state and very common operations.

    A new pushoperation should be created at the beginning of each push and
    discarded afterward.
    """

    def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
        # repo we push from
        self.repo = repo
        self.ui = repo.ui
        # repo we push to
        self.remote = remote
        # force option provided
        self.force = force
        # revs to be pushed (None is "all")
        self.revs = revs
        # allow push of new branch
        self.newbranch = newbranch
        # did a local lock get acquired?
        self.locallocked = None
        # step already performed
        # (used to check what steps have been already performed through bundle2)
        self.stepsdone = set()
        # Integer version of the push result
        # - None means nothing to push
        # - 0 means HTTP error
        # - 1 means we pushed and remote head count is unchanged *or*
        #   we have outgoing changesets but refused to push
        # - other values as described by addchangegroup()
        self.ret = None
        # discover.outgoing object (contains common and outgoing data)
        self.outgoing = None
        # all remote heads before the push
        self.remoteheads = None
        # testable as a boolean indicating if any nodes are missing locally.
        self.incoming = None
        # phases changes that must be pushed along side the changesets
        self.outdatedphases = None
        # phases changes that must be pushed if changeset push fails
        self.fallbackoutdatedphases = None
        # outgoing obsmarkers
        self.outobsmarkers = set()

    @util.propertycache
    def futureheads(self):
        """future remote heads if the changeset push succeeds"""
        return self.outgoing.missingheads

    @util.propertycache
    def fallbackheads(self):
        """future remote heads if the changeset push fails"""
        if self.revs is None:
            # no target to push, all common are relevant
            return self.outgoing.commonheads
        unfi = self.repo.unfiltered()
        # I want cheads = heads(::missingheads and ::commonheads)
        # (missingheads is revs with secret changeset filtered out)
        #
        # This can be expressed as:
        #     cheads = ( (missingheads and ::commonheads)
        #              + (commonheads and ::missingheads))"
        #              )
        #
        # while trying to push we already computed the following:
        #     common = (::commonheads)
        #     missing = ((commonheads::missingheads) - commonheads)
        #
        # We can pick:
        # * missingheads part of common (::commonheads)
        common = set(self.outgoing.common)
        nm = self.repo.changelog.nodemap
        cheads = [node for node in self.revs if nm[node] in common]
        # and
        # * commonheads parents on missing
        revset = unfi.set('%ln and parents(roots(%ln))',
                          self.outgoing.commonheads,
                          self.outgoing.missing)
        cheads.extend(c.node() for c in revset)
        return cheads

    @property
    def commonheads(self):
        """set of all common heads after changeset bundle push"""
        # self.ret is truthy when the changegroup push succeeded, in which
        # case the pushed heads are now common; otherwise fall back to the
        # pre-push common heads.
        if self.ret:
            return self.futureheads
        else:
            return self.fallbackheads
131 131
def push(repo, remote, force=False, revs=None, newbranch=False):
    '''Push outgoing changesets (limited by revs) from a local
    repository to remote. Return an integer:
      - None means nothing to push
      - 0 means HTTP error
      - 1 means we pushed and remote head count is unchanged *or*
        we have outgoing changesets but refused to push
      - other values as described by addchangegroup()
    '''
    pushop = pushoperation(repo, remote, force, revs, newbranch)
    if pushop.remote.local():
        # local-to-local push: refuse if the destination cannot handle
        # one of our repository requirements
        missing = (set(pushop.repo.requirements)
                   - pushop.remote.local().supported)
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    # there are two ways to push to remote repo:
    #
    # addchangegroup assumes local user can lock remote
    # repo (local filesystem, old ssh servers).
    #
    # unbundle assumes local user cannot lock remote repo (new ssh
    # servers, http servers).

    if not pushop.remote.canpush():
        raise util.Abort(_("destination does not support push"))
    # get local lock as we might write phase data
    locallock = None
    try:
        locallock = pushop.repo.lock()
        pushop.locallocked = True
    except IOError, err:
        pushop.locallocked = False
        if err.errno != errno.EACCES:
            raise
        # source repo cannot be locked.
        # We do not abort the push, but just disable the local phase
        # synchronisation.
        msg = 'cannot lock source repository: %s\n' % err
        pushop.ui.debug(msg)
    try:
        pushop.repo.checkpush(pushop)
        lock = None
        unbundle = pushop.remote.capable('unbundle')
        if not unbundle:
            # old-style push: we must hold the remote lock ourselves
            lock = pushop.remote.lock()
        try:
            _pushdiscovery(pushop)
            # experimental bundle2 push path; each _push* step below checks
            # pushop.stepsdone so work done through bundle2 is not redone
            if (pushop.repo.ui.configbool('experimental', 'bundle2-exp',
                                          False)
                and pushop.remote.capable('bundle2-exp')):
                _pushbundle2(pushop)
            _pushchangeset(pushop)
            _pushsyncphase(pushop)
            _pushobsolete(pushop)
        finally:
            if lock is not None:
                lock.release()
    finally:
        if locallock is not None:
            locallock.release()

    # bookmark updates happen after both locks are released
    _pushbookmark(pushop)
    return pushop.ret
199 199
# Ordered list of step names to perform discovery before push.
pushdiscoveryorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
pushdiscoverymapping = {}
207 207
def pushdiscovery(stepname):
    """decorator registering a discovery function to run before push

    The decorated function is recorded in the step -> function mapping and
    its name is appended to the ordered list of steps.  Registration order
    is preserved, and may matter.

    Only use this decorator for brand new steps; to wrap a step from an
    extension, change the pushdiscovery dictionary directly."""
    def dec(func):
        # a step may only be registered once
        assert stepname not in pushdiscoverymapping
        pushdiscoveryorder.append(stepname)
        pushdiscoverymapping[stepname] = func
        return func
    return dec
223 223
224 224
225 225
def _pushdiscovery(pushop):
    """Run every registered discovery step, in registration order."""
    for name in pushdiscoveryorder:
        pushdiscoverymapping[name](pushop)
231 231
@pushdiscovery('changeset')
def _pushdiscoverychangeset(pushop):
    """discover the changeset that need to be pushed

    Fills pushop.outgoing, pushop.remoteheads and pushop.incoming from the
    common/incoming/outgoing computation against the remote."""
    unfi = pushop.repo.unfiltered()
    fci = discovery.findcommonincoming
    commoninc = fci(unfi, pushop.remote, force=pushop.force)
    common, inc, remoteheads = commoninc
    fco = discovery.findcommonoutgoing
    # reuse the incoming result to avoid a second round of discovery
    outgoing = fco(unfi, pushop.remote, onlyheads=pushop.revs,
                   commoninc=commoninc, force=pushop.force)
    pushop.outgoing = outgoing
    pushop.remoteheads = remoteheads
    pushop.incoming = inc
245 245
@pushdiscovery('phase')
def _pushdiscoveryphase(pushop):
    """discover the phase that needs to be pushed

    (computed for both success and failure case for changesets push)

    Fills pushop.outdatedphases (heads to turn public if the changeset push
    succeeds) and pushop.fallbackoutdatedphases (heads to turn public if it
    fails)."""
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    remotephases = pushop.remote.listkeys('phases')
    publishing = remotephases.get('publishing', False)
    ana = phases.analyzeremotephases(pushop.repo,
                                     pushop.fallbackheads,
                                     remotephases)
    pheads, droots = ana
    extracond = ''
    if not publishing:
        # on a non-publishing remote, only consider heads already public
        # locally
        extracond = ' and public()'
    revset = 'heads((%%ln::%%ln) %s)' % extracond
    # Get the list of all revs draft on remote but public here.
    # XXX Beware that revset break if droots is not strictly
    # XXX root we may want to ensure it is but it is costly
    fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
    if not outgoing.missing:
        future = fallback
    else:
        # adds changeset we are going to push as draft
        #
        # should not be necessary for publishing server, but because of an
        # issue fixed in xxxxx we have to do it anyway.
        fdroots = list(unfi.set('roots(%ln + %ln::)',
                       outgoing.missing, droots))
        fdroots = [f.node() for f in fdroots]
        future = list(unfi.set(revset, fdroots, pushop.futureheads))
    pushop.outdatedphases = future
    pushop.fallbackoutdatedphases = fallback
280 280
@pushdiscovery('obsmarker')
def _pushdiscoveryobsmarkers(pushop):
    # select the obsolescence markers to push: currently the whole local
    # obsstore (filtering may be added later)
    pushop.outobsmarkers = pushop.repo.obsstore
284 284
def _pushcheckoutgoing(pushop):
    """validate that the outgoing changesets may be pushed

    Returns False (after reporting "no changes found") when there is
    nothing outgoing.  Unless --force was given, aborts when an outgoing
    head is obsolete or troubled, and runs the usual remote-head checks.
    Returns True when the push may proceed."""
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    if not outgoing.missing:
        # nothing to push
        scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
        return False
    # something to push
    if not pushop.force:
        # if repo.obsstore == False --> no obsolete
        # then, save the iteration
        if unfi.obsstore:
            # these messages are split here for the 80 char limit
            mso = _("push includes obsolete changeset: %s!")
            mst = "push includes %s changeset: %s!"
            # plain versions for i18n tool to detect them
            _("push includes unstable changeset: %s!")
            _("push includes bumped changeset: %s!")
            _("push includes divergent changeset: %s!")
            # If we are to push if there is at least one
            # obsolete or unstable changeset in missing, at
            # least one of the missinghead will be obsolete or
            # unstable. So checking heads only is ok
            for node in outgoing.missingheads:
                ctx = unfi[node]
                if ctx.obsolete():
                    raise util.Abort(mso % ctx)
                elif ctx.troubled():
                    raise util.Abort(_(mst)
                                     % (ctx.troubles()[0],
                                        ctx))
        newbm = pushop.ui.configlist('bookmarks', 'pushing')
        discovery.checkheads(unfi, pushop.remote, outgoing,
                             pushop.remoteheads,
                             pushop.newbranch,
                             bool(pushop.incoming),
                             newbm)
    return True
323 323
# List of names of steps to perform for an outgoing bundle2, order matters.
b2partsgenorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
b2partsgenmapping = {}
331 331
def b2partsgenerator(stepname):
    """decorator registering a function that generates a bundle2 part

    The decorated function is recorded in the step -> function mapping and
    its name is appended to the ordered list of steps.  Registration order
    is preserved (this may matter).

    Only use this decorator for brand new steps; to wrap a step from an
    extension, change the b2partsgenmapping dictionary directly."""
    def dec(func):
        # a step may only be registered once
        assert stepname not in b2partsgenmapping
        b2partsgenorder.append(stepname)
        b2partsgenmapping[stepname] = func
        return func
    return dec
347 347
@b2partsgenerator('changeset')
def _pushb2ctx(pushop, bundler):
    """handle changegroup push through bundle2

    addchangegroup result is stored in the ``pushop.ret`` attribute.
    Returns a reply handler extracting that result from the server reply,
    or None when there is nothing to push.
    """
    if 'changesets' in pushop.stepsdone:
        return
    # mark the step done once (the original code added it twice; the
    # duplicate was redundant)
    pushop.stepsdone.add('changesets')
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    if not pushop.force:
        # Send known heads to the server for race detection.
        bundler.newpart('B2X:CHECK:HEADS', data=iter(pushop.remoteheads))
    cg = changegroup.getlocalbundle(pushop.repo, 'push', pushop.outgoing)
    cgpart = bundler.newpart('B2X:CHANGEGROUP', data=cg.getchunks())
    def handlereply(op):
        """extract addchangegroup returns from server reply"""
        cgreplies = op.records.getreplies(cgpart.id)
        assert len(cgreplies['changegroup']) == 1
        pushop.ret = cgreplies['changegroup'][0]['return']
    return handlereply
374 374
@b2partsgenerator('phase')
def _pushb2phases(pushop, bundler):
    """handle phase push through bundle2

    Emits one b2x:pushkey part per outdated head (draft -> public) and
    returns a reply handler checking the per-part results."""
    if 'phases' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    if not 'b2x:pushkey' in b2caps:
        # server cannot handle pushkey parts; fall back to the plain
        # pushkey path in _pushsyncphase
        return
    pushop.stepsdone.add('phases')
    part2node = []
    enc = pushkey.encode
    for newremotehead in pushop.outdatedphases:
        part = bundler.newpart('b2x:pushkey')
        part.addparam('namespace', enc('phases'))
        part.addparam('key', enc(newremotehead.hex()))
        part.addparam('old', enc(str(phases.draft)))
        part.addparam('new', enc(str(phases.public)))
        part2node.append((part.id, newremotehead))
    def handlereply(op):
        # warn about every head the server refused or ignored
        for partid, node in part2node:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            msg = None
            if not results:
                msg = _('server ignored update of %s to public!\n') % node
            elif not int(results[0]['return']):
                msg = _('updating %s to public failed!\n') % node
            if msg is not None:
                pushop.ui.warn(msg)
    return handlereply
406 406
def _pushbundle2(pushop):
    """push data to the remote using bundle2

    The only currently supported type of data is changegroup but this will
    evolve in the future."""
    bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
    # create reply capability
    capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
    bundler.newpart('b2x:replycaps', data=capsblob)
    replyhandlers = []
    # run every registered part generator; generators returning a callable
    # get that callable invoked on the server reply afterwards
    for partgenname in b2partsgenorder:
        partgen = b2partsgenmapping[partgenname]
        ret = partgen(pushop, bundler)
        if callable(ret):
            replyhandlers.append(ret)
    # do not push if nothing to push (only the replycaps part is present)
    if bundler.nbparts <= 1:
        return
    stream = util.chunkbuffer(bundler.getchunks())
    try:
        reply = pushop.remote.unbundle(stream, ['force'], 'push')
    except error.BundleValueError, exc:
        raise util.Abort('missing support for %s' % exc)
    try:
        op = bundle2.processbundle(pushop.repo, reply)
    except error.BundleValueError, exc:
        raise util.Abort('missing support for %s' % exc)
    for rephand in replyhandlers:
        rephand(op)
436 436
def _pushchangeset(pushop):
    """Make the actual push of changeset bundle to remote repo

    Stores the push result in ``pushop.ret``.  No-op when the changesets
    step was already performed (e.g. through bundle2)."""
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    outgoing = pushop.outgoing
    unbundle = pushop.remote.capable('unbundle')
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # create a changegroup from local
    if pushop.revs is None and not (outgoing.excluded
                            or pushop.repo.changelog.filteredrevs):
        # push everything,
        # use the fast path, no race possible on push
        bundler = changegroup.bundle10(pushop.repo, bundlecaps)
        cg = changegroup.getsubset(pushop.repo,
                                   outgoing,
                                   bundler,
                                   'push',
                                   fastpath=True)
    else:
        cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
                                        bundlecaps)

    # apply changegroup to remote
    if unbundle:
        # local repo finds heads on server, finds out what
        # revs it must push. once revs transferred, if server
        # finds it has different heads (someone else won
        # commit/push race), server aborts.
        if pushop.force:
            remoteheads = ['force']
        else:
            remoteheads = pushop.remoteheads
        # ssh: return remote's addchangegroup()
        # http: return remote's addchangegroup() or 0 for error
        pushop.ret = pushop.remote.unbundle(cg, remoteheads,
                                            pushop.repo.url())
    else:
        # we return an integer indicating remote head count
        # change
        pushop.ret = pushop.remote.addchangegroup(cg, 'push', pushop.repo.url())
484 484
def _pushsyncphase(pushop):
    """synchronise phase information locally and remotely

    First applies the remote phase information to the local repo, then
    pushes local phase updates (draft -> public) to the remote, either
    through bundle2 pushkey parts or through plain pushkey calls."""
    cheads = pushop.commonheads
    # even when we don't push, exchanging phase data is useful
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo', False)
        and remotephases    # server supports phases
        and pushop.ret is None # nothing was pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and remote support phase
        # - and no changeset was pushed
        # - and remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    if not remotephases: # old server or public only reply from non-publishing
        _localphasemove(pushop, cheads)
        # don't push any phase data as there is nothing to push
    else:
        ana = phases.analyzeremotephases(pushop.repo, cheads,
                                         remotephases)
        pheads, droots = ana
        ### Apply remote phase on local
        if remotephases.get('publishing', False):
            _localphasemove(pushop, cheads)
        else: # publish = False
            # public heads advance to public, everything else to draft
            _localphasemove(pushop, pheads)
            _localphasemove(pushop, cheads, phases.draft)
        ### Apply local phase on remote

        if pushop.ret:
            if 'phases' in pushop.stepsdone:
                # phases already pushed through bundle2
                return
            outdated = pushop.outdatedphases
        else:
            outdated = pushop.fallbackoutdatedphases

        pushop.stepsdone.add('phases')

        # filter heads already turned public by the push
        outdated = [c for c in outdated if c.node() not in pheads]
        b2caps = bundle2.bundle2caps(pushop.remote)
        if 'b2x:pushkey' in b2caps:
            # server supports bundle2, let's do a batched push through it
            #
            # This will eventually be unified with the changesets bundle2 push
            bundler = bundle2.bundle20(pushop.ui, b2caps)
            capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
            bundler.newpart('b2x:replycaps', data=capsblob)
            part2node = []
            enc = pushkey.encode
            for newremotehead in outdated:
                part = bundler.newpart('b2x:pushkey')
                part.addparam('namespace', enc('phases'))
                part.addparam('key', enc(newremotehead.hex()))
                part.addparam('old', enc(str(phases.draft)))
                part.addparam('new', enc(str(phases.public)))
                part2node.append((part.id, newremotehead))
            stream = util.chunkbuffer(bundler.getchunks())
            try:
                reply = pushop.remote.unbundle(stream, ['force'], 'push')
                op = bundle2.processbundle(pushop.repo, reply)
            except error.BundleValueError, exc:
                raise util.Abort('missing support for %s' % exc)
            # warn about every head the server refused or ignored
            for partid, node in part2node:
                partrep = op.records.getreplies(partid)
                results = partrep['pushkey']
                assert len(results) <= 1
                msg = None
                if not results:
                    msg = _('server ignored update of %s to public!\n') % node
                elif not int(results[0]['return']):
                    msg = _('updating %s to public failed!\n') % node
                if msg is not None:
                    pushop.ui.warn(msg)

        else:
            # fallback to independent pushkey command
            for newremotehead in outdated:
                r = pushop.remote.pushkey('phases',
                                          newremotehead.hex(),
                                          str(phases.draft),
                                          str(phases.public))
                if not r:
                    pushop.ui.warn(_('updating %s to public failed!\n')
                                   % newremotehead)
576 576
def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo

    The phase movement is wrapped in a transaction so it is either applied
    completely or rolled back.  When the local repo could not be locked,
    no phase is changed; the user is informed if any node would have moved.
    """
    if pushop.locallocked:
        tr = pushop.repo.transaction('push-phase-sync')
        try:
            phases.advanceboundary(pushop.repo, phase, nodes)
            tr.close()
        finally:
            tr.release()
    else:
        # repo is not locked, do not change any phases!
        # Informs the user that phases should have been moved when
        # applicable.
        actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
        phasestr = phases.phasenames[phase]
        if actualmoves:
            pushop.ui.status(_('cannot lock source repo, skipping '
                               'local %s phase update\n') % phasestr)
590 595
def _pushobsolete(pushop):
    """utility function to push obsolete markers to a remote

    No-op when the obsmarkers step was already performed, when the
    obsolete feature is disabled, or when the remote does not expose the
    'obsolete' pushkey namespace."""
    if 'obsmarkers' in pushop.stepsdone:
        return
    pushop.ui.debug('try to push obsolete markers to remote\n')
    repo = pushop.repo
    remote = pushop.remote
    pushop.stepsdone.add('obsmarkers')
    if (obsolete._enabled and repo.obsstore and
        'obsolete' in remote.listkeys('namespaces')):
        rslts = []
        remotedata = obsolete._pushkeyescape(pushop.outobsmarkers)
        for key in sorted(remotedata, reverse=True):
            # reverse sort to ensure we end with dump0
            data = remotedata[key]
            rslts.append(remote.pushkey('obsolete', key, '', data))
        if [r for r in rslts if not r]:
            # at least one pushkey call returned a falsy result
            msg = _('failed to push some obsolete markers!\n')
            repo.ui.warn(msg)
610 615
def _pushbookmark(pushop):
    """Update bookmark position on remote

    Only advances bookmarks whose new position is an ancestor of (or equal
    to) the pushed revisions; each update goes through a pushkey call."""
    ui = pushop.ui
    repo = pushop.repo.unfiltered()
    remote = pushop.remote
    ui.debug("checking for updated bookmarks\n")
    revnums = map(repo.changelog.rev, pushop.revs or [])
    ancestors = [a for a in repo.changelog.ancestors(revnums, inclusive=True)]
    (addsrc, adddst, advsrc, advdst, diverge, differ, invalid
     ) = bookmarks.compare(repo, repo._bookmarks, remote.listkeys('bookmarks'),
                           srchex=hex)

    # only bookmarks that advanced on our side are pushed
    for b, scid, dcid in advsrc:
        if ancestors and repo[scid].rev() not in ancestors:
            # the new bookmark target was not pushed; skip
            continue
        if remote.pushkey('bookmarks', b, dcid, scid):
            ui.status(_("updating bookmark %s\n") % b)
        else:
            ui.warn(_('updating bookmark %s failed!\n') % b)
630 635
class pulloperation(object):
    """An object that represents a single pull operation.

    Its purpose is to carry pull-related state and very common operations.

    A new pulloperation should be created at the beginning of each pull and
    discarded afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False):
        # repo we pull into
        self.repo = repo
        # repo we pull from
        self.remote = remote
        # revision we try to pull (None is "all")
        self.heads = heads
        # do we force pull?
        self.force = force
        # the name of the pull transaction
        self._trname = 'pull\n' + util.hidepassword(remote.url())
        # hold the transaction once created
        self._tr = None
        # set of common changeset between local and remote before pull
        self.common = None
        # set of pulled head
        self.rheads = None
        # list of missing changeset to fetch remotely
        self.fetch = None
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # list of step remaining todo (related to future bundle2 usage)
        self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changeset target by the pull"""
        # compute target subset
        if self.heads is None:
            # We pulled every thing possible
            # sync on everything common
            c = set(self.common)
            ret = list(self.common)
            for n in self.rheads:
                if n not in c:
                    ret.append(n)
            return ret
        else:
            # We pulled a specific subset
            # sync on this subset
            return self.heads

    def gettransaction(self):
        """get appropriate pull transaction, creating it if needed"""
        if self._tr is None:
            self._tr = self.repo.transaction(self._trname)
        return self._tr

    def closetransaction(self):
        """close transaction if created"""
        if self._tr is not None:
            self._tr.close()

    def releasetransaction(self):
        """release transaction if created"""
        if self._tr is not None:
            self._tr.release()
697 702
def pull(repo, remote, heads=None, force=False):
    """Pull changesets (limited to ``heads``) from ``remote`` into ``repo``.

    Returns the changegroup result code (``pullop.cgresult``)."""
    pullop = pulloperation(repo, remote, heads, force)
    if pullop.remote.local():
        # local-to-local pull: refuse if the source has requirements we
        # cannot handle
        missing = set(pullop.remote.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    lock = pullop.repo.lock()
    try:
        _pulldiscovery(pullop)
        # experimental bundle2 pull path; remaining steps are tracked in
        # pullop.todosteps so work done through bundle2 is not redone
        if (pullop.repo.ui.configbool('experimental', 'bundle2-exp', False)
            and pullop.remote.capable('bundle2-exp')):
            _pullbundle2(pullop)
        if 'changegroup' in pullop.todosteps:
            _pullchangeset(pullop)
        if 'phases' in pullop.todosteps:
            _pullphase(pullop)
        if 'obsmarkers' in pullop.todosteps:
            _pullobsolete(pullop)
        pullop.closetransaction()
    finally:
        pullop.releasetransaction()
        lock.release()

    return pullop.cgresult
726 731
def _pulldiscovery(pullop):
    """discovery phase for the pull

    Currently handles changeset discovery only; will eventually handle all
    discovery at some point.  Fills pullop.common, pullop.fetch and
    pullop.rheads."""
    unfi = pullop.repo.unfiltered()
    common, fetch, rheads = discovery.findcommonincoming(
        unfi, pullop.remote, heads=pullop.heads, force=pullop.force)
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads
737 742
def _pullbundle2(pullop):
    """pull data using bundle2

    For now, the only supported data are changegroup."""
    remotecaps = bundle2.bundle2caps(pullop.remote)
    kwargs = {'bundlecaps': caps20to10(pullop.repo)}
    # pulling changegroup
    pullop.todosteps.remove('changegroup')

    kwargs['common'] = pullop.common
    kwargs['heads'] = pullop.heads or pullop.rheads
    if 'b2x:listkeys' in remotecaps:
        # piggy-back phase data retrieval on the same bundle2 exchange
        kwargs['listkeys'] = ['phase']
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
    else:
        if pullop.heads is None and list(pullop.common) == [nullid]:
            pullop.repo.ui.status(_("requesting all changes\n"))
    _pullbundle2extraprepare(pullop, kwargs)
    if kwargs.keys() == ['format']:
        return # nothing to pull
    bundle = pullop.remote.getbundle('pull', **kwargs)
    try:
        op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
    except error.BundleValueError, exc:
        raise util.Abort('missing support for %s' % exc)

    if pullop.fetch:
        assert len(op.records['changegroup']) == 1
        pullop.cgresult = op.records['changegroup'][0]['return']

    # processing phases change
    for namespace, value in op.records['listkeys']:
        if namespace == 'phases':
            _pullapplyphases(pullop, value)
774 779
775 780 def _pullbundle2extraprepare(pullop, kwargs):
776 781 """hook function so that extensions can extend the getbundle call"""
777 782 pass
778 783
def _pullchangeset(pullop):
    """pull changeset from unbundle into the local repo

    Chooses the most capable wire command available (getbundle,
    changegroupsubset, changegroup) and stores the addchangegroup result
    in ``pullop.cgresult``."""
    # We delay the open of the transaction as late as possible so we
    # don't open transaction for nothing or you break future useful
    # rollback call
    pullop.todosteps.remove('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    if pullop.remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = pullop.remote.getbundle('pull', common=pullop.common,
                                     heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = pullop.remote.changegroup(pullop.fetch, 'pull')
    elif not pullop.remote.capable('changegroupsubset'):
        raise util.Abort(_("partial pull cannot be done because "
                           "other repository doesn't support "
                           "changegroupsubset."))
    else:
        cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
                                                 pullop.remote.url())
810 815
def _pullphase(pullop):
    """pull phase information from the remote and apply it locally"""
    # Get remote phases data from remote
    remotephases = pullop.remote.listkeys('phases')
    _pullapplyphases(pullop, remotephases)
815 820
def _pullapplyphases(pullop, remotephases):
    """apply phase movement from observed remote state

    ``remotephases`` is the mapping returned by the remote's 'phases'
    listkeys namespace.  An empty mapping means the remote is an old
    (pre-phase) server and everything common is treated as public."""
    pullop.todosteps.remove('phases')
    publishing = bool(remotephases.get('publishing', False))
    if remotephases and not publishing:
        # remote is new and unpublishing
        pheads, _dr = phases.analyzeremotephases(pullop.repo,
                                                 pullop.pulledsubset,
                                                 remotephases)
        # advance public first, then draft, on the pulled subset
        phases.advanceboundary(pullop.repo, phases.public, pheads)
        phases.advanceboundary(pullop.repo, phases.draft,
                               pullop.pulledsubset)
    else:
        # Remote is old or publishing all common changesets
        # should be seen as public
        phases.advanceboundary(pullop.repo, phases.public,
                               pullop.pulledsubset)
833 838
def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    `pullop.gettransaction` is a function that returns the pull transaction,
    creating one if necessary. We return the transaction to inform the
    calling code that a new transaction has been created (when applicable).

    Exists mostly to allow overriding for experimentation purposes."""
    pullop.todosteps.remove('obsmarkers')
    tr = None
    if obsolete._enabled:
        pullop.repo.ui.debug('fetching remote obsolete markers\n')
        remoteobs = pullop.remote.listkeys('obsolete')
        if 'dump0' in remoteobs:
            # markers are present: we need a transaction to write them
            tr = pullop.gettransaction()
            for key in sorted(remoteobs, reverse=True):
                if key.startswith('dump'):
                    data = base85.b85decode(remoteobs[key])
                    pullop.repo.obsstore.mergemarkers(tr, data)
            # cached sets derived from obsolescence data are now stale
            pullop.repo.invalidatevolatilesets()
    return tr
855 860
def caps20to10(repo):
    """Return the bundlecaps set advertising bundle2 support for getbundle."""
    blob = bundle2.encodecaps(repo.bundle2caps)
    return set(['HG2X', 'bundle2=' + urllib.quote(blob)])
862 867
def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
              **kwargs):
    """return a full bundle (with potentially multiple kind of parts)

    Could be a bundle HG10 or a bundle HG2X depending on bundlecaps
    passed. For now, the bundle can contain only changegroup, but this will
    change when more part types become available for bundle2.

    This is different from changegroup.getbundle that only returns an HG10
    changegroup bundle. They may eventually get reunited in the future when we
    have a clearer idea of the API we want to use to query different data.

    The implementation is at a very early stage and will get massive rework
    when the API of bundle is refined.

    :repo: local repository to read the bundle data from
    :source: operation name passed down to the changegroup code
    :heads:/:common: the usual discovery boundaries
    :bundlecaps: client capability strings, or None for a legacy client
    :kwargs: extra bundle2-only arguments (e.g. ``cg``, ``listkeys``)

    Raises ValueError when a bundle10-only request is inconsistent.
    """
    cg = None
    if kwargs.get('cg', True):
        # build changegroup bundle here.
        cg = changegroup.getbundle(repo, source, heads=heads,
                                   common=common, bundlecaps=bundlecaps)
    elif bundlecaps is None or 'HG2X' not in bundlecaps:
        # Guard bundlecaps is None explicitly: only a bundle2-aware client
        # may legitimately skip the changegroup part. (Without the guard,
        # 'in None' would raise TypeError instead of the intended error.)
        raise ValueError(_('request for bundle10 must include changegroup'))
    if bundlecaps is None or 'HG2X' not in bundlecaps:
        # legacy bundle10 client: no extra arguments are representable
        if kwargs:
            raise ValueError(_('unsupported getbundle arguments: %s')
                             % ', '.join(sorted(kwargs.keys())))
        return cg
    # very crude first implementation,
    # the bundle API will change and the generation will be done lazily.
    b2caps = {}
    for bcaps in bundlecaps:
        if bcaps.startswith('bundle2='):
            # capabilities blob is url-quoted inside the cap string
            blob = urllib.unquote(bcaps[len('bundle2='):])
            b2caps.update(bundle2.decodecaps(blob))
    bundler = bundle2.bundle20(repo.ui, b2caps)
    if cg:
        bundler.newpart('b2x:changegroup', data=cg.getchunks())
    listkeys = kwargs.get('listkeys', ())
    for namespace in listkeys:
        # one listkeys part per requested pushkey namespace
        part = bundler.newpart('b2x:listkeys')
        part.addparam('namespace', namespace)
        keys = repo.listkeys(namespace).items()
        part.data = pushkey.encodekeys(keys)
    # let extensions append their own parts
    _getbundleextrapart(bundler, repo, source, heads=heads, common=common,
                        bundlecaps=bundlecaps, **kwargs)
    return util.chunkbuffer(bundler.getchunks())
909 914
910 915 def _getbundleextrapart(bundler, repo, source, heads=None, common=None,
911 916 bundlecaps=None, **kwargs):
912 917 """hook function to let extensions add parts to the requested bundle"""
913 918 pass
914 919
def check_heads(repo, their_heads, context):
    """Raise PushRaced if the repo's heads no longer match *their_heads*.

    Used by peer for unbundling. Acceptable values for *their_heads* are
    the literal ['force'], the current head list, or ['hashed', <sha1 of
    the sorted heads>].
    """
    current = repo.heads()
    hashed = util.sha1(''.join(sorted(current))).digest()
    if their_heads not in (['force'], current, ['hashed', hashed]):
        # someone else committed/pushed/unbundled while we
        # were transferring data
        raise error.PushRaced('repository changed while %s - '
                              'please try again' % context)
928 933
def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    This function makes sure the repo is locked during the application and
    has a mechanism to check that no push race occurred between the creation
    of the bundle and its application.

    If the push was raced, a PushRaced exception is raised."""
    r = 0
    # need a transaction when processing a bundle2 stream
    tr = None
    lock = repo.lock()
    try:
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        if util.safehasattr(cg, 'params'):
            # bundle2 stream: process all parts under one transaction so
            # the pre-close hook can see (and veto) pending changes
            try:
                tr = repo.transaction('unbundle')
                tr.hookargs['bundle2-exp'] = '1'
                r = bundle2.processbundle(repo, cg, lambda: tr).reply
                cl = repo.unfiltered().changelog
                # expose pending changelog writes to the hook via 'pending'
                p = cl.writepending() and repo.root or ""
                repo.hook('b2x-pretransactionclose', throw=True, source=source,
                          url=url, pending=p, **tr.hookargs)
                tr.close()
                repo.hook('b2x-transactionclose', source=source, url=url,
                          **tr.hookargs)
            except Exception, exc:
                # tag the exception so callers can report bundle2 failures
                exc.duringunbundle2 = True
                raise
        else:
            # plain changegroup (bundle10) application
            r = changegroup.addchangegroup(repo, cg, source, url)
    finally:
        # release the transaction first (no-op if already closed), then
        # the repo lock
        if tr is not None:
            tr.release()
        lock.release()
    return r
General Comments 0
You need to be logged in to leave comments. Login now