# Source provenance (from the changeset page this file was captured from):
#   exchange: remove duplicated addition to pushop.stepsdone
#   Author: Pierre-Yves David
#   Changeset: r22244:172036d6, branch: default
#   Diff header: @@ -1,1030 +1,1029 @@
# exchange.py - utility to exchange data between repos.
#
# Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from i18n import _
from node import hex, nullid
import errno, urllib
import util, scmutil, changegroup, base85, error
import discovery, phases, obsolete, bookmarks, bundle2, pushkey
def readbundle(ui, fh, fname, vfs=None):
    """Read a bundle header from ``fh`` and return the matching unbundler.

    ``fname`` is used only for error messages (``"stream"`` when absent);
    when ``vfs`` is given, ``fname`` is resolved relative to it.  Supports
    the HG10 changegroup format and the experimental HG2X bundle2 format.
    Raises util.Abort on an unrecognized magic or version.
    """
    header = changegroup.readexactly(fh, 4)

    alg = None
    if not fname:
        fname = "stream"
        # headerless stream: assume an uncompressed HG10 bundle and
        # re-inject the bytes already consumed from the stream
        if not header.startswith('HG') and header.startswith('\0'):
            fh = changegroup.headerlessfixup(fh, header)
            header = "HG10"
            alg = 'UN'
    elif vfs:
        fname = vfs.join(fname)

    magic, version = header[0:2], header[2:4]

    if magic != 'HG':
        raise util.Abort(_('%s: not a Mercurial bundle') % fname)
    if version == '10':
        if alg is None:
            # compression algorithm is the next two bytes of the header
            alg = changegroup.readexactly(fh, 2)
        return changegroup.unbundle10(fh, alg)
    elif version == '2X':
        return bundle2.unbundle20(ui, fh, header=magic + version)
    else:
        raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
39 39
40 40
class pushoperation(object):
    """A object that represent a single push operation

    It purpose is to carry push related state and very common operation.

    A new should be created at the beginning of each push and discarded
    afterward.
    """

    def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
        # repo we push from
        self.repo = repo
        self.ui = repo.ui
        # repo we push to
        self.remote = remote
        # force option provided
        self.force = force
        # revs to be pushed (None is "all")
        self.revs = revs
        # allow push of new branch
        self.newbranch = newbranch
        # did a local lock get acquired?
        self.locallocked = None
        # step already performed
        # (used to check what steps have been already performed through bundle2)
        self.stepsdone = set()
        # Integer version of the push result
        # - None means nothing to push
        # - 0 means HTTP error
        # - 1 means we pushed and remote head count is unchanged *or*
        #   we have outgoing changesets but refused to push
        # - other values as described by addchangegroup()
        self.ret = None
        # discover.outgoing object (contains common and outgoing data)
        self.outgoing = None
        # all remote heads before the push
        self.remoteheads = None
        # testable as a boolean indicating if any nodes are missing locally.
        self.incoming = None
        # phases changes that must be pushed along side the changesets
        self.outdatedphases = None
        # phases changes that must be pushed if changeset push fails
        self.fallbackoutdatedphases = None
        # outgoing obsmarkers
        self.outobsmarkers = set()
        # outgoing bookmarks
        self.outbookmarks = []

    @util.propertycache
    def futureheads(self):
        """future remote heads if the changeset push succeeds"""
        return self.outgoing.missingheads

    @util.propertycache
    def fallbackheads(self):
        """future remote heads if the changeset push fails"""
        if self.revs is None:
            # not target to push, all common are relevant
            return self.outgoing.commonheads
        unfi = self.repo.unfiltered()
        # I want cheads = heads(::missingheads and ::commonheads)
        # (missingheads is revs with secret changeset filtered out)
        #
        # This can be expressed as:
        #     cheads = ( (missingheads and ::commonheads)
        #              + (commonheads and ::missingheads))"
        #              )
        #
        # while trying to push we already computed the following:
        #     common = (::commonheads)
        #     missing = ((commonheads::missingheads) - commonheads)
        #
        # We can pick:
        # * missingheads part of common (::commonheads)
        common = set(self.outgoing.common)
        nm = self.repo.changelog.nodemap
        cheads = [node for node in self.revs if nm[node] in common]
        # and
        # * commonheads parents on missing
        revset = unfi.set('%ln and parents(roots(%ln))',
                          self.outgoing.commonheads,
                          self.outgoing.missing)
        cheads.extend(c.node() for c in revset)
        return cheads

    @property
    def commonheads(self):
        """set of all common heads after changeset bundle push"""
        if self.ret:
            return self.futureheads
        else:
            return self.fallbackheads
133 133
134 134 def push(repo, remote, force=False, revs=None, newbranch=False):
135 135 '''Push outgoing changesets (limited by revs) from a local
136 136 repository to remote. Return an integer:
137 137 - None means nothing to push
138 138 - 0 means HTTP error
139 139 - 1 means we pushed and remote head count is unchanged *or*
140 140 we have outgoing changesets but refused to push
141 141 - other values as described by addchangegroup()
142 142 '''
143 143 pushop = pushoperation(repo, remote, force, revs, newbranch)
144 144 if pushop.remote.local():
145 145 missing = (set(pushop.repo.requirements)
146 146 - pushop.remote.local().supported)
147 147 if missing:
148 148 msg = _("required features are not"
149 149 " supported in the destination:"
150 150 " %s") % (', '.join(sorted(missing)))
151 151 raise util.Abort(msg)
152 152
153 153 # there are two ways to push to remote repo:
154 154 #
155 155 # addchangegroup assumes local user can lock remote
156 156 # repo (local filesystem, old ssh servers).
157 157 #
158 158 # unbundle assumes local user cannot lock remote repo (new ssh
159 159 # servers, http servers).
160 160
161 161 if not pushop.remote.canpush():
162 162 raise util.Abort(_("destination does not support push"))
163 163 # get local lock as we might write phase data
164 164 locallock = None
165 165 try:
166 166 locallock = pushop.repo.lock()
167 167 pushop.locallocked = True
168 168 except IOError, err:
169 169 pushop.locallocked = False
170 170 if err.errno != errno.EACCES:
171 171 raise
172 172 # source repo cannot be locked.
173 173 # We do not abort the push, but just disable the local phase
174 174 # synchronisation.
175 175 msg = 'cannot lock source repository: %s\n' % err
176 176 pushop.ui.debug(msg)
177 177 try:
178 178 pushop.repo.checkpush(pushop)
179 179 lock = None
180 180 unbundle = pushop.remote.capable('unbundle')
181 181 if not unbundle:
182 182 lock = pushop.remote.lock()
183 183 try:
184 184 _pushdiscovery(pushop)
185 185 if (pushop.repo.ui.configbool('experimental', 'bundle2-exp',
186 186 False)
187 187 and pushop.remote.capable('bundle2-exp')):
188 188 _pushbundle2(pushop)
189 189 _pushchangeset(pushop)
190 190 _pushsyncphase(pushop)
191 191 _pushobsolete(pushop)
192 192 _pushbookmark(pushop)
193 193 finally:
194 194 if lock is not None:
195 195 lock.release()
196 196 finally:
197 197 if locallock is not None:
198 198 locallock.release()
199 199
200 200 return pushop.ret
201 201
# list of steps to perform discovery before push
pushdiscoveryorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
pushdiscoverymapping = {}

def pushdiscovery(stepname):
    """decorator for function performing discovery before push

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated function will be added in order (this
    may matter).

    You can only use this decorator for a new step, if you want to wrap a step
    from an extension, change the pushdiscovery dictionary directly."""
    def dec(func):
        assert stepname not in pushdiscoverymapping
        pushdiscoverymapping[stepname] = func
        pushdiscoveryorder.append(stepname)
        return func
    return dec
225 225
def _pushdiscovery(pushop):
    """Run all discovery steps"""
    # steps run in registration order (see the pushdiscovery decorator)
    for stepname in pushdiscoveryorder:
        step = pushdiscoverymapping[stepname]
        step(pushop)
231 231
@pushdiscovery('changeset')
def _pushdiscoverychangeset(pushop):
    """discover the changeset that need to be pushed

    Fills pushop.outgoing, pushop.remoteheads and pushop.incoming."""
    unfi = pushop.repo.unfiltered()
    fci = discovery.findcommonincoming
    commoninc = fci(unfi, pushop.remote, force=pushop.force)
    common, inc, remoteheads = commoninc
    fco = discovery.findcommonoutgoing
    outgoing = fco(unfi, pushop.remote, onlyheads=pushop.revs,
                   commoninc=commoninc, force=pushop.force)
    pushop.outgoing = outgoing
    pushop.remoteheads = remoteheads
    pushop.incoming = inc
245 245
@pushdiscovery('phase')
def _pushdiscoveryphase(pushop):
    """discover the phase that needs to be pushed

    (computed for both success and failure case for changesets push)"""
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    remotephases = pushop.remote.listkeys('phases')
    publishing = remotephases.get('publishing', False)
    ana = phases.analyzeremotephases(pushop.repo,
                                     pushop.fallbackheads,
                                     remotephases)
    pheads, droots = ana
    extracond = ''
    if not publishing:
        extracond = ' and public()'
    revset = 'heads((%%ln::%%ln) %s)' % extracond
    # Get the list of all revs draft on remote by public here.
    # XXX Beware that revset break if droots is not strictly
    # XXX root we may want to ensure it is but it is costly
    fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
    if not outgoing.missing:
        future = fallback
    else:
        # adds changeset we are going to push as draft
        #
        # should not be necessary for pushblishing server, but because of an
        # issue fixed in xxxxx we have to do it anyway.
        fdroots = list(unfi.set('roots(%ln  + %ln::)',
                       outgoing.missing, droots))
        fdroots = [f.node() for f in fdroots]
        future = list(unfi.set(revset, fdroots, pushop.futureheads))
    pushop.outdatedphases = future
    pushop.fallbackoutdatedphases = fallback
280 280
@pushdiscovery('obsmarker')
def _pushdiscoveryobsmarkers(pushop):
    """select the obsolete markers to push (currently: all of them)"""
    pushop.outobsmarkers = pushop.repo.obsstore
284 284
@pushdiscovery('bookmarks')
def _pushdiscoverybookmarks(pushop):
    """discover the bookmarks advanced locally that should be pushed

    Only bookmarks whose target is an ancestor of the pushed revs (when
    revs are given) are selected; results go to pushop.outbookmarks."""
    ui = pushop.ui
    repo = pushop.repo.unfiltered()
    remote = pushop.remote
    ui.debug("checking for updated bookmarks\n")
    ancestors = ()
    if pushop.revs:
        revnums = map(repo.changelog.rev, pushop.revs)
        ancestors = repo.changelog.ancestors(revnums, inclusive=True)
    remotebookmark = remote.listkeys('bookmarks')

    comp = bookmarks.compare(repo, repo._bookmarks, remotebookmark, srchex=hex)
    addsrc, adddst, advsrc, advdst, diverge, differ, invalid = comp
    for b, scid, dcid in advsrc:
        # only push bookmarks already known remotely that we advance locally
        if not ancestors or repo[scid].rev() in ancestors:
            pushop.outbookmarks.append((b, dcid, scid))
302 302
def _pushcheckoutgoing(pushop):
    """validate the outgoing changesets before push

    Returns False when there is nothing to push; aborts when the push would
    publish obsolete/troubled changesets or create unwanted new heads."""
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    if not outgoing.missing:
        # nothing to push
        scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
        return False
    # something to push
    if not pushop.force:
        # if repo.obsstore == False --> no obsolete
        # then, save the iteration
        if unfi.obsstore:
            # this message are here for 80 char limit reason
            mso = _("push includes obsolete changeset: %s!")
            mst = "push includes %s changeset: %s!"
            # plain versions for i18n tool to detect them
            _("push includes unstable changeset: %s!")
            _("push includes bumped changeset: %s!")
            _("push includes divergent changeset: %s!")
            # If we are to push if there is at least one
            # obsolete or unstable changeset in missing, at
            # least one of the missinghead will be obsolete or
            # unstable. So checking heads only is ok
            for node in outgoing.missingheads:
                ctx = unfi[node]
                if ctx.obsolete():
                    raise util.Abort(mso % ctx)
                elif ctx.troubled():
                    raise util.Abort(_(mst)
                                     % (ctx.troubles()[0],
                                        ctx))
        newbm = pushop.ui.configlist('bookmarks', 'pushing')
        discovery.checkheads(unfi, pushop.remote, outgoing,
                             pushop.remoteheads,
                             pushop.newbranch,
                             bool(pushop.incoming),
                             newbm)
    return True
341 341
# List of names of steps to perform for an outgoing bundle2, order matters.
b2partsgenorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
b2partsgenmapping = {}

def b2partsgenerator(stepname):
    """decorator for function generating bundle2 part

    The function is added to the step -> function mapping and appended to the
    list of steps.  Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for new steps, if you want to wrap a step
    from an extension, attack the b2partsgenmapping dictionary directly."""
    def dec(func):
        assert stepname not in b2partsgenmapping
        b2partsgenmapping[stepname] = func
        b2partsgenorder.append(stepname)
        return func
    return dec
365 365
@b2partsgenerator('changeset')
def _pushb2ctx(pushop, bundler):
    """handle changegroup push through bundle2

    addchangegroup result is stored in the ``pushop.ret`` attribute.
    """
    if 'changesets' in pushop.stepsdone:
        return
    # mark the step done exactly once (a duplicated add() used to live here)
    pushop.stepsdone.add('changesets')
    # Send known heads to the server for race detection.
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    if not pushop.force:
        bundler.newpart('B2X:CHECK:HEADS', data=iter(pushop.remoteheads))
    cg = changegroup.getlocalbundle(pushop.repo, 'push', pushop.outgoing)
    cgpart = bundler.newpart('B2X:CHANGEGROUP', data=cg.getchunks())
    def handlereply(op):
        """extract addchangroup returns from server reply"""
        cgreplies = op.records.getreplies(cgpart.id)
        assert len(cgreplies['changegroup']) == 1
        pushop.ret = cgreplies['changegroup'][0]['return']
    return handlereply
392 391
@b2partsgenerator('phase')
def _pushb2phases(pushop, bundler):
    """handle phase push through bundle2"""
    if 'phases' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    if not 'b2x:pushkey' in b2caps:
        return
    pushop.stepsdone.add('phases')
    part2node = []
    enc = pushkey.encode
    # one pushkey part per head to turn public on the remote
    for newremotehead in pushop.outdatedphases:
        part = bundler.newpart('b2x:pushkey')
        part.addparam('namespace', enc('phases'))
        part.addparam('key', enc(newremotehead.hex()))
        part.addparam('old', enc(str(phases.draft)))
        part.addparam('new', enc(str(phases.public)))
        part2node.append((part.id, newremotehead))
    def handlereply(op):
        for partid, node in part2node:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            msg = None
            if not results:
                msg = _('server ignored update of %s to public!\n') % node
            elif not int(results[0]['return']):
                msg = _('updating %s to public failed!\n') % node
            if msg is not None:
                pushop.ui.warn(msg)
    return handlereply
424 423
@b2partsgenerator('bookmarks')
def _pushb2bookmarks(pushop, bundler):
    """handle bookmark push through bundle2"""
    # NOTE: the original docstring said "phase push" -- copy/paste slip
    if 'bookmarks' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    if 'b2x:pushkey' not in b2caps:
        return
    pushop.stepsdone.add('bookmarks')
    part2book = []
    enc = pushkey.encode
    for book, old, new in pushop.outbookmarks:
        part = bundler.newpart('b2x:pushkey')
        part.addparam('namespace', enc('bookmarks'))
        part.addparam('key', enc(book))
        part.addparam('old', enc(old))
        part.addparam('new', enc(new))
        part2book.append((part.id, book))
    def handlereply(op):
        for partid, book in part2book:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            if not results:
                pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
            else:
                ret = int(results[0]['return'])
                if ret:
                    pushop.ui.status(_("updating bookmark %s\n") % book)
                else:
                    pushop.ui.warn(_('updating bookmark %s failed!\n') % book)
    return handlereply
457 456
458 457
459 458 def _pushbundle2(pushop):
460 459 """push data to the remote using bundle2
461 460
462 461 The only currently supported type of data is changegroup but this will
463 462 evolve in the future."""
464 463 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
465 464 # create reply capability
466 465 capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
467 466 bundler.newpart('b2x:replycaps', data=capsblob)
468 467 replyhandlers = []
469 468 for partgenname in b2partsgenorder:
470 469 partgen = b2partsgenmapping[partgenname]
471 470 ret = partgen(pushop, bundler)
472 471 if callable(ret):
473 472 replyhandlers.append(ret)
474 473 # do not push if nothing to push
475 474 if bundler.nbparts <= 1:
476 475 return
477 476 stream = util.chunkbuffer(bundler.getchunks())
478 477 try:
479 478 reply = pushop.remote.unbundle(stream, ['force'], 'push')
480 479 except error.BundleValueError, exc:
481 480 raise util.Abort('missing support for %s' % exc)
482 481 try:
483 482 op = bundle2.processbundle(pushop.repo, reply)
484 483 except error.BundleValueError, exc:
485 484 raise util.Abort('missing support for %s' % exc)
486 485 for rephand in replyhandlers:
487 486 rephand(op)
488 487
def _pushchangeset(pushop):
    """Make the actual push of changeset bundle to remote repo"""
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    outgoing = pushop.outgoing
    unbundle = pushop.remote.capable('unbundle')
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # create a changegroup from local
    if pushop.revs is None and not (outgoing.excluded
                            or pushop.repo.changelog.filteredrevs):
        # push everything,
        # use the fast path, no race possible on push
        bundler = changegroup.bundle10(pushop.repo, bundlecaps)
        cg = changegroup.getsubset(pushop.repo,
                                   outgoing,
                                   bundler,
                                   'push',
                                   fastpath=True)
    else:
        cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
                                        bundlecaps)

    # apply changegroup to remote
    if unbundle:
        # local repo finds heads on server, finds out what
        # revs it must push. once revs transferred, if server
        # finds it has different heads (someone else won
        # commit/push race), server aborts.
        if pushop.force:
            remoteheads = ['force']
        else:
            remoteheads = pushop.remoteheads
        # ssh: return remote's addchangegroup()
        # http: return remote's addchangegroup() or 0 for error
        pushop.ret = pushop.remote.unbundle(cg, remoteheads,
                                            pushop.repo.url())
    else:
        # we return an integer indicating remote head count
        # change
        pushop.ret = pushop.remote.addchangegroup(cg, 'push', pushop.repo.url())
536 535
537 536 def _pushsyncphase(pushop):
538 537 """synchronise phase information locally and remotely"""
539 538 cheads = pushop.commonheads
540 539 # even when we don't push, exchanging phase data is useful
541 540 remotephases = pushop.remote.listkeys('phases')
542 541 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
543 542 and remotephases # server supports phases
544 543 and pushop.ret is None # nothing was pushed
545 544 and remotephases.get('publishing', False)):
546 545 # When:
547 546 # - this is a subrepo push
548 547 # - and remote support phase
549 548 # - and no changeset was pushed
550 549 # - and remote is publishing
551 550 # We may be in issue 3871 case!
552 551 # We drop the possible phase synchronisation done by
553 552 # courtesy to publish changesets possibly locally draft
554 553 # on the remote.
555 554 remotephases = {'publishing': 'True'}
556 555 if not remotephases: # old server or public only reply from non-publishing
557 556 _localphasemove(pushop, cheads)
558 557 # don't push any phase data as there is nothing to push
559 558 else:
560 559 ana = phases.analyzeremotephases(pushop.repo, cheads,
561 560 remotephases)
562 561 pheads, droots = ana
563 562 ### Apply remote phase on local
564 563 if remotephases.get('publishing', False):
565 564 _localphasemove(pushop, cheads)
566 565 else: # publish = False
567 566 _localphasemove(pushop, pheads)
568 567 _localphasemove(pushop, cheads, phases.draft)
569 568 ### Apply local phase on remote
570 569
571 570 if pushop.ret:
572 571 if 'phases' in pushop.stepsdone:
573 572 # phases already pushed though bundle2
574 573 return
575 574 outdated = pushop.outdatedphases
576 575 else:
577 576 outdated = pushop.fallbackoutdatedphases
578 577
579 578 pushop.stepsdone.add('phases')
580 579
581 580 # filter heads already turned public by the push
582 581 outdated = [c for c in outdated if c.node() not in pheads]
583 582 b2caps = bundle2.bundle2caps(pushop.remote)
584 583 if 'b2x:pushkey' in b2caps:
585 584 # server supports bundle2, let's do a batched push through it
586 585 #
587 586 # This will eventually be unified with the changesets bundle2 push
588 587 bundler = bundle2.bundle20(pushop.ui, b2caps)
589 588 capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
590 589 bundler.newpart('b2x:replycaps', data=capsblob)
591 590 part2node = []
592 591 enc = pushkey.encode
593 592 for newremotehead in outdated:
594 593 part = bundler.newpart('b2x:pushkey')
595 594 part.addparam('namespace', enc('phases'))
596 595 part.addparam('key', enc(newremotehead.hex()))
597 596 part.addparam('old', enc(str(phases.draft)))
598 597 part.addparam('new', enc(str(phases.public)))
599 598 part2node.append((part.id, newremotehead))
600 599 stream = util.chunkbuffer(bundler.getchunks())
601 600 try:
602 601 reply = pushop.remote.unbundle(stream, ['force'], 'push')
603 602 op = bundle2.processbundle(pushop.repo, reply)
604 603 except error.BundleValueError, exc:
605 604 raise util.Abort('missing support for %s' % exc)
606 605 for partid, node in part2node:
607 606 partrep = op.records.getreplies(partid)
608 607 results = partrep['pushkey']
609 608 assert len(results) <= 1
610 609 msg = None
611 610 if not results:
612 611 msg = _('server ignored update of %s to public!\n') % node
613 612 elif not int(results[0]['return']):
614 613 msg = _('updating %s to public failed!\n') % node
615 614 if msg is not None:
616 615 pushop.ui.warn(msg)
617 616
618 617 else:
619 618 # fallback to independant pushkey command
620 619 for newremotehead in outdated:
621 620 r = pushop.remote.pushkey('phases',
622 621 newremotehead.hex(),
623 622 str(phases.draft),
624 623 str(phases.public))
625 624 if not r:
626 625 pushop.ui.warn(_('updating %s to public failed!\n')
627 626 % newremotehead)
628 627
def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo"""
    if pushop.locallocked:
        tr = pushop.repo.transaction('push-phase-sync')
        try:
            phases.advanceboundary(pushop.repo, tr, phase, nodes)
            tr.close()
        finally:
            tr.release()
    else:
        # repo is not locked, do not change any phases!
        # Informs the user that phases should have been moved when
        # applicable.
        actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
        phasestr = phases.phasenames[phase]
        if actualmoves:
            pushop.ui.status(_('cannot lock source repo, skipping '
                               'local %s phase update\n') % phasestr)
647 646
def _pushobsolete(pushop):
    """utility function to push obsolete markers to a remote"""
    if 'obsmarkers' in pushop.stepsdone:
        return
    pushop.ui.debug('try to push obsolete markers to remote\n')
    repo = pushop.repo
    remote = pushop.remote
    pushop.stepsdone.add('obsmarkers')
    if (obsolete._enabled and repo.obsstore and
        'obsolete' in remote.listkeys('namespaces')):
        rslts = []
        remotedata = obsolete._pushkeyescape(pushop.outobsmarkers)
        for key in sorted(remotedata, reverse=True):
            # reverse sort to ensure we end with dump0
            data = remotedata[key]
            rslts.append(remote.pushkey('obsolete', key, '', data))
        if [r for r in rslts if not r]:
            msg = _('failed to push some obsolete markers!\n')
            repo.ui.warn(msg)
667 666
def _pushbookmark(pushop):
    """Update bookmark position on remote"""
    # skip on HTTP error (ret == 0) or when bundle2 already handled bookmarks
    if pushop.ret == 0 or 'bookmarks' in pushop.stepsdone:
        return
    pushop.stepsdone.add('bookmarks')
    ui = pushop.ui
    remote = pushop.remote
    for b, old, new in pushop.outbookmarks:
        if remote.pushkey('bookmarks', b, old, new):
            ui.status(_("updating bookmark %s\n") % b)
        else:
            ui.warn(_('updating bookmark %s failed!\n') % b)
680 679
class pulloperation(object):
    """A object that represent a single pull operation

    It purpose is to carry push related state and very common operation.

    A new should be created at the beginning of each pull and discarded
    afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False):
        # repo we pull into
        self.repo = repo
        # repo we pull from
        self.remote = remote
        # revision we try to pull (None is "all")
        self.heads = heads
        # do we force pull?
        self.force = force
        # the name the pull transaction
        self._trname = 'pull\n' + util.hidepassword(remote.url())
        # hold the transaction once created
        self._tr = None
        # set of common changeset between local and remote before pull
        self.common = None
        # set of pulled head
        self.rheads = None
        # list of missing changeset to fetch remotely
        self.fetch = None
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # list of step remaining todo (related to future bundle2 usage)
        self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changeset target by the pull"""
        # compute target subset
        if self.heads is None:
            # We pulled every thing possible
            # sync on everything common
            c = set(self.common)
            ret = list(self.common)
            for n in self.rheads:
                if n not in c:
                    ret.append(n)
            return ret
        else:
            # We pulled a specific subset
            # sync on this subset
            return self.heads

    def gettransaction(self):
        """get appropriate pull transaction, creating it if needed"""
        if self._tr is None:
            self._tr = self.repo.transaction(self._trname)
        return self._tr

    def closetransaction(self):
        """close transaction if created"""
        if self._tr is not None:
            self._tr.close()

    def releasetransaction(self):
        """release transaction if created"""
        if self._tr is not None:
            self._tr.release()
747 746
def pull(repo, remote, heads=None, force=False):
    """Pull changesets (limited by ``heads``) from ``remote`` into ``repo``.

    Returns the changegroup result code (see pulloperation.cgresult)."""
    pullop = pulloperation(repo, remote, heads, force)
    if pullop.remote.local():
        missing = set(pullop.remote.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    lock = pullop.repo.lock()
    try:
        _pulldiscovery(pullop)
        if (pullop.repo.ui.configbool('experimental', 'bundle2-exp', False)
            and pullop.remote.capable('bundle2-exp')):
            _pullbundle2(pullop)
        # remaining steps run only if bundle2 did not already cover them
        if 'changegroup' in pullop.todosteps:
            _pullchangeset(pullop)
        if 'phases' in pullop.todosteps:
            _pullphase(pullop)
        if 'obsmarkers' in pullop.todosteps:
            _pullobsolete(pullop)
        pullop.closetransaction()
    finally:
        pullop.releasetransaction()
        lock.release()

    return pullop.cgresult
776 775
def _pulldiscovery(pullop):
    """discovery phase for the pull

    Current handle changeset discovery only, will change handle all discovery
    at some point."""
    tmp = discovery.findcommonincoming(pullop.repo.unfiltered(),
                                       pullop.remote,
                                       heads=pullop.heads,
                                       force=pullop.force)
    pullop.common, pullop.fetch, pullop.rheads = tmp
787 786
788 787 def _pullbundle2(pullop):
789 788 """pull data using bundle2
790 789
791 790 For now, the only supported data are changegroup."""
792 791 remotecaps = bundle2.bundle2caps(pullop.remote)
793 792 kwargs = {'bundlecaps': caps20to10(pullop.repo)}
794 793 # pulling changegroup
795 794 pullop.todosteps.remove('changegroup')
796 795
797 796 kwargs['common'] = pullop.common
798 797 kwargs['heads'] = pullop.heads or pullop.rheads
799 798 if 'b2x:listkeys' in remotecaps:
800 799 kwargs['listkeys'] = ['phase']
801 800 if not pullop.fetch:
802 801 pullop.repo.ui.status(_("no changes found\n"))
803 802 pullop.cgresult = 0
804 803 else:
805 804 if pullop.heads is None and list(pullop.common) == [nullid]:
806 805 pullop.repo.ui.status(_("requesting all changes\n"))
807 806 _pullbundle2extraprepare(pullop, kwargs)
808 807 if kwargs.keys() == ['format']:
809 808 return # nothing to pull
810 809 bundle = pullop.remote.getbundle('pull', **kwargs)
811 810 try:
812 811 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
813 812 except error.BundleValueError, exc:
814 813 raise util.Abort('missing support for %s' % exc)
815 814
816 815 if pullop.fetch:
817 816 assert len(op.records['changegroup']) == 1
818 817 pullop.cgresult = op.records['changegroup'][0]['return']
819 818
820 819 # processing phases change
821 820 for namespace, value in op.records['listkeys']:
822 821 if namespace == 'phases':
823 822 _pullapplyphases(pullop, value)
824 823
825 824 def _pullbundle2extraprepare(pullop, kwargs):
826 825 """hook function so that extensions can extend the getbundle call"""
827 826 pass
828 827
def _pullchangeset(pullop):
    """pull changeset from unbundle into the local repo"""
    # We delay the open of the transaction as late as possible so we
    # don't open transaction for nothing or you break future useful
    # rollback call
    pullop.todosteps.remove('changegroup')
    if not pullop.fetch:
            pullop.repo.ui.status(_("no changes found\n"))
            pullop.cgresult = 0
            return
    pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    if pullop.remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = pullop.remote.getbundle('pull', common=pullop.common,
                                     heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = pullop.remote.changegroup(pullop.fetch, 'pull')
    elif not pullop.remote.capable('changegroupsubset'):
        raise util.Abort(_("partial pull cannot be done because "
                           "other repository doesn't support "
                           "changegroupsubset."))
    else:
        cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
                                                 pullop.remote.url())
860 859
def _pullphase(pullop):
    """fetch the remote 'phases' pushkey namespace and apply it locally"""
    _pullapplyphases(pullop, pullop.remote.listkeys('phases'))
865 864
def _pullapplyphases(pullop, remotephases):
    """apply phase movement from observed remote state

    *remotephases* is the decoded 'phases' pushkey namespace from the
    remote.  Advances local public/draft boundaries accordingly, opening
    the pull transaction only if something actually moves.
    """
    pullop.todosteps.remove('phases')
    serverpublishing = bool(remotephases.get('publishing', False))
    if remotephases and not serverpublishing:
        # remote is new and unpublishing: trust its advertised phase roots
        publicheads, _droots = phases.analyzeremotephases(pullop.repo,
                                                          pullop.pulledsubset,
                                                          remotephases)
        draftheads = pullop.pulledsubset
    else:
        # Remote is old or publishing: all common changesets
        # should be seen as public
        publicheads = pullop.pulledsubset
        draftheads = []

    unfi = pullop.repo.unfiltered()
    getphase = unfi._phasecache.phase
    getrev = unfi.changelog.nodemap.get

    # exclude changesets already public locally and update the others
    publicheads = [node for node in publicheads
                   if getphase(unfi, getrev(node)) > phases.public]
    if publicheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, phases.public, publicheads)

    # exclude changesets already draft locally and update the others
    draftheads = [node for node in draftheads
                  if getphase(unfi, getrev(node)) > phases.draft]
    if draftheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, phases.draft, draftheads)
898 897
def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    The pull transaction is opened lazily, only when markers are actually
    received; the transaction (or None) is returned so the calling code
    knows whether a new one was created.

    Exists mostly to allow overriding for experimentation purpose.
    """
    pullop.todosteps.remove('obsmarkers')
    if not obsolete._enabled:
        return None
    pullop.repo.ui.debug('fetching remote obsolete markers\n')
    remoteobs = pullop.remote.listkeys('obsolete')
    if 'dump0' not in remoteobs:
        # no marker payload advertised by the remote
        return None
    tr = pullop.gettransaction()
    for key in sorted(remoteobs, reverse=True):
        if key.startswith('dump'):
            markerdata = base85.b85decode(remoteobs[key])
            pullop.repo.obsstore.mergemarkers(tr, markerdata)
    pullop.repo.invalidatevolatilesets()
    return tr
920 919
def caps20to10(repo):
    """return a set with appropriate options to use bundle20 during getbundle

    The set contains the 'HG2X' marker plus a URL-quoted encoding of the
    local repo's bundle2 capabilities.
    """
    capsblob = bundle2.encodecaps(repo.bundle2caps)
    return set(['HG2X', 'bundle2=' + urllib.quote(capsblob)])
927 926
def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
              **kwargs):
    """return a full bundle (with potentially multiple kind of parts)

    Could be a bundle HG10 or a bundle HG2X depending on bundlecaps
    passed. For now, the bundle can contain only changegroup, but this will
    changes when more part type will be available for bundle2.

    This is different from changegroup.getbundle that only returns an HG10
    changegroup bundle. They may eventually get reunited in the future when we
    have a clearer idea of the API we what to query different data.

    The implementation is at a very early stage and will get massive rework
    when the API of bundle is refined.
    """
    cg = None
    if kwargs.get('cg', True):
        # build changegroup bundle here.
        cg = changegroup.getbundle(repo, source, heads=heads,
                                   common=common, bundlecaps=bundlecaps)
    elif 'HG2X' not in bundlecaps:
        # NOTE(review): reached with bundlecaps=None this raises TypeError
        # instead of ValueError — presumably callers passing cg=False always
        # supply bundlecaps; confirm before relying on it.
        raise ValueError(_('request for bundle10 must include changegroup'))
    if bundlecaps is None or 'HG2X' not in bundlecaps:
        # plain bundle10 requested: no extra arguments are representable
        if kwargs:
            raise ValueError(_('unsupported getbundle arguments: %s')
                             % ', '.join(sorted(kwargs.keys())))
        return cg
    # very crude first implementation,
    # the bundle API will change and the generation will be done lazily.
    b2caps = {}
    for bcaps in bundlecaps:
        if bcaps.startswith('bundle2='):
            # decode the client-advertised bundle2 capabilities
            blob = urllib.unquote(bcaps[len('bundle2='):])
            b2caps.update(bundle2.decodecaps(blob))
    bundler = bundle2.bundle20(repo.ui, b2caps)
    if cg:
        bundler.newpart('b2x:changegroup', data=cg.getchunks())
    # one b2x:listkeys part per requested pushkey namespace
    listkeys = kwargs.get('listkeys', ())
    for namespace in listkeys:
        part = bundler.newpart('b2x:listkeys')
        part.addparam('namespace', namespace)
        keys = repo.listkeys(namespace).items()
        part.data = pushkey.encodekeys(keys)
    # let extensions append their own parts
    _getbundleextrapart(bundler, repo, source, heads=heads, common=common,
                        bundlecaps=bundlecaps, **kwargs)
    return util.chunkbuffer(bundler.getchunks())
974 973
975 974 def _getbundleextrapart(bundler, repo, source, heads=None, common=None,
976 975 bundlecaps=None, **kwargs):
977 976 """hook function to let extensions add parts to the requested bundle"""
978 977 pass
979 978
def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling.  *their_heads* is accepted when it is the
    literal ['force'], the exact current head list, or the pair
    ['hashed', sha1-digest-of-sorted-heads].
    """
    currentheads = repo.heads()
    digest = util.sha1(''.join(sorted(currentheads))).digest()
    unchanged = (their_heads == ['force']
                 or their_heads == currentheads
                 or their_heads == ['hashed', digest])
    if not unchanged:
        # someone else committed/pushed/unbundled while we
        # were transferring data
        raise error.PushRaced('repository changed while %s - '
                              'please try again' % context)
993 992
def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    this function makes sure the repo is locked during the application and have
    mechanism to check that no push race occurred between the creation of the
    bundle and its application.

    If the push was raced as PushRaced exception is raised."""
    r = 0
    # need a transaction when processing a bundle2 stream
    tr = None
    lock = repo.lock()
    try:
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        if util.safehasattr(cg, 'params'):
            # bundle2 stream (has a .params attribute): process inside a
            # transaction so hooks can veto before anything is committed
            try:
                tr = repo.transaction('unbundle')
                tr.hookargs['bundle2-exp'] = '1'
                r = bundle2.processbundle(repo, cg, lambda: tr).reply
                cl = repo.unfiltered().changelog
                # expose pending changes to pre-close hooks via the
                # 'pending' argument (empty when nothing is pending)
                p = cl.writepending() and repo.root or ""
                repo.hook('b2x-pretransactionclose', throw=True, source=source,
                          url=url, pending=p, **tr.hookargs)
                tr.close()
                repo.hook('b2x-transactionclose', source=source, url=url,
                          **tr.hookargs)
            except Exception, exc:
                # tag the exception so callers know it happened during
                # bundle2 processing (affects error reporting to the client)
                exc.duringunbundle2 = True
                raise
        else:
            # legacy bundle10 changegroup
            r = changegroup.addchangegroup(repo, cg, source, url)
    finally:
        # tr.release() is a no-op if the transaction was closed; it rolls
        # back when an exception escaped before tr.close()
        if tr is not None:
            tr.release()
        lock.release()
    return r
General Comments 0
You need to be logged in to leave comments. Login now