# Mercurial exchange.py — changeset r22346:a76660f8 (default branch)
# "exchange: add a `buildobsmarkerspart` function" — Pierre-Yves David
# (page header from the repository browser, preserved as a comment)
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib
11 11 import util, scmutil, changegroup, base85, error
12 12 import discovery, phases, obsolete, bookmarks, bundle2, pushkey
13 13
def readbundle(ui, fh, fname, vfs=None):
    """Read a bundle header from ``fh`` and return a matching unbundler.

    ``fname`` is used for error reporting (and joined against ``vfs`` when
    one is given); a headerless stream is assumed to be an uncompressed
    HG10 bundle.
    """
    hdr = changegroup.readexactly(fh, 4)

    compression = None
    if not fname:
        fname = "stream"
        if not hdr.startswith('HG') and hdr.startswith('\0'):
            # headerless stream: push the bytes back and treat as HG10/UN
            fh = changegroup.headerlessfixup(fh, hdr)
            hdr = "HG10"
            compression = 'UN'
    elif vfs:
        fname = vfs.join(fname)

    magic = hdr[0:2]
    version = hdr[2:4]

    if magic != 'HG':
        raise util.Abort(_('%s: not a Mercurial bundle') % fname)
    if version == '10':
        if compression is None:
            # compression marker follows the 4-byte header
            compression = changegroup.readexactly(fh, 2)
        return changegroup.unbundle10(fh, compression)
    if version == '2X':
        return bundle2.unbundle20(ui, fh, header=magic + version)
    raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
39 39
def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker
    format.
    """
    if not markers:
        return None
    # negotiate a marker version both sides understand
    remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        # fixed grammar of the original message ("bundler do not support")
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('B2X:OBSMARKERS', data=stream)
40 54
class pushoperation(object):
    """An object that represents a single push operation.

    Its purpose is to carry push-related state and very common operations.

    A new one should be created at the beginning of each push and discarded
    afterward.
    """

    def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
        # repo we push from
        self.repo = repo
        self.ui = repo.ui
        # repo we push to
        self.remote = remote
        # force option provided
        self.force = force
        # revs to be pushed (None is "all")
        self.revs = revs
        # allow push of new branch
        self.newbranch = newbranch
        # did a local lock get acquired?
        self.locallocked = None
        # step already performed
        # (used to check what steps have been already performed through
        # bundle2)
        self.stepsdone = set()
        # Integer version of the push result
        # - None means nothing to push
        # - 0 means HTTP error
        # - 1 means we pushed and remote head count is unchanged *or*
        #   we have outgoing changesets but refused to push
        # - other values as described by addchangegroup()
        self.ret = None
        # discover.outgoing object (contains common and outgoing data)
        self.outgoing = None
        # all remote heads before the push
        self.remoteheads = None
        # testable as a boolean indicating if any nodes are missing locally.
        self.incoming = None
        # phases changes that must be pushed along side the changesets
        self.outdatedphases = None
        # phases changes that must be pushed if changeset push fails
        self.fallbackoutdatedphases = None
        # outgoing obsmarkers
        self.outobsmarkers = set()
        # outgoing bookmarks
        self.outbookmarks = []

    @util.propertycache
    def futureheads(self):
        """future remote heads if the changeset push succeeds"""
        return self.outgoing.missingheads

    @util.propertycache
    def fallbackheads(self):
        """future remote heads if the changeset push fails"""
        if self.revs is None:
            # no target to push, all common are relevant
            return self.outgoing.commonheads
        unfi = self.repo.unfiltered()
        # I want cheads = heads(::missingheads and ::commonheads)
        # (missingheads is revs with secret changeset filtered out)
        #
        # This can be expressed as:
        #     cheads = ( (missingheads and ::commonheads)
        #              + (commonheads and ::missingheads))"
        #              )
        #
        # while trying to push we already computed the following:
        #     common = (::commonheads)
        #     missing = ((commonheads::missingheads) - commonheads)
        #
        # We can pick:
        # * missingheads part of common (::commonheads)
        common = set(self.outgoing.common)
        nm = self.repo.changelog.nodemap
        cheads = [node for node in self.revs if nm[node] in common]
        # and
        # * commonheads parents on missing
        revset = unfi.set('%ln and parents(roots(%ln))',
                          self.outgoing.commonheads,
                          self.outgoing.missing)
        cheads.extend(c.node() for c in revset)
        return cheads

    @property
    def commonheads(self):
        """set of all common heads after changeset bundle push"""
        if self.ret:
            # push succeeded: everything we sent is now common
            return self.futureheads
        else:
            return self.fallbackheads
133 147
def push(repo, remote, force=False, revs=None, newbranch=False):
    '''Push outgoing changesets (limited by revs) from a local
    repository to remote. Return an integer:
      - None means nothing to push
      - 0 means HTTP error
      - 1 means we pushed and remote head count is unchanged *or*
        we have outgoing changesets but refused to push
      - other values as described by addchangegroup()
    '''
    pushop = pushoperation(repo, remote, force, revs, newbranch)
    if pushop.remote.local():
        # pushing over the local filesystem: the destination must support
        # every requirement of the source repo
        missing = (set(pushop.repo.requirements)
                   - pushop.remote.local().supported)
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    # there are two ways to push to remote repo:
    #
    # addchangegroup assumes local user can lock remote
    # repo (local filesystem, old ssh servers).
    #
    # unbundle assumes local user cannot lock remote repo (new ssh
    # servers, http servers).

    if not pushop.remote.canpush():
        raise util.Abort(_("destination does not support push"))
    # get local lock as we might write phase data
    locallock = None
    try:
        locallock = pushop.repo.lock()
        pushop.locallocked = True
    except IOError, err:
        pushop.locallocked = False
        if err.errno != errno.EACCES:
            raise
        # source repo cannot be locked.
        # We do not abort the push, but just disable the local phase
        # synchronisation.
        msg = 'cannot lock source repository: %s\n' % err
        pushop.ui.debug(msg)
    try:
        pushop.repo.checkpush(pushop)
        lock = None
        unbundle = pushop.remote.capable('unbundle')
        if not unbundle:
            # old-style push requires holding the remote lock ourselves
            lock = pushop.remote.lock()
        try:
            _pushdiscovery(pushop)
            if (pushop.repo.ui.configbool('experimental', 'bundle2-exp',
                                          False)
                and pushop.remote.capable('bundle2-exp')):
                _pushbundle2(pushop)
            # the remaining steps check pushop.stepsdone and skip work
            # already performed through bundle2
            _pushchangeset(pushop)
            _pushsyncphase(pushop)
            _pushobsolete(pushop)
            _pushbookmark(pushop)
        finally:
            if lock is not None:
                lock.release()
    finally:
        if locallock is not None:
            locallock.release()

    return pushop.ret
201 215
# Ordered list of step names: discovery steps run before push, in this order
pushdiscoveryorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
pushdiscoverymapping = {}
209 223
def pushdiscovery(stepname):
    """decorator registering a discovery step to run before push

    The decorated function is recorded in the step -> function mapping and
    its name is appended to the ordered list of steps, so registration
    order is significant.

    Only use this decorator for brand new steps; to wrap a step defined by
    an extension, modify the pushdiscoverymapping dictionary directly."""
    def register(func):
        # a step name may only be registered once
        assert stepname not in pushdiscoverymapping
        pushdiscoverymapping[stepname] = func
        pushdiscoveryorder.append(stepname)
        return func
    return register
225 239
def _pushdiscovery(pushop):
    """Run every registered discovery step, in registration order"""
    for stepname in pushdiscoveryorder:
        pushdiscoverymapping[stepname](pushop)
231 245
@pushdiscovery('changeset')
def _pushdiscoverychangeset(pushop):
    """discover the changesets that need to be pushed

    Fills in ``pushop.outgoing``, ``pushop.remoteheads`` and
    ``pushop.incoming`` from common/incoming/outgoing discovery against
    the unfiltered repo.
    """
    unfi = pushop.repo.unfiltered()
    fci = discovery.findcommonincoming
    commoninc = fci(unfi, pushop.remote, force=pushop.force)
    common, inc, remoteheads = commoninc
    fco = discovery.findcommonoutgoing
    outgoing = fco(unfi, pushop.remote, onlyheads=pushop.revs,
                   commoninc=commoninc, force=pushop.force)
    pushop.outgoing = outgoing
    pushop.remoteheads = remoteheads
    pushop.incoming = inc
245 259
@pushdiscovery('phase')
def _pushdiscoveryphase(pushop):
    """discover the phase that needs to be pushed

    (computed for both success and failure case for changesets push)"""
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    remotephases = pushop.remote.listkeys('phases')
    publishing = remotephases.get('publishing', False)
    ana = phases.analyzeremotephases(pushop.repo,
                                     pushop.fallbackheads,
                                     remotephases)
    pheads, droots = ana
    extracond = ''
    if not publishing:
        extracond = ' and public()'
    revset = 'heads((%%ln::%%ln) %s)' % extracond
    # Get the list of all revs draft on remote but public here.
    # XXX Beware that revset break if droots is not strictly
    # XXX root we may want to ensure it is but it is costly
    fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
    if not outgoing.missing:
        future = fallback
    else:
        # adds changesets we are going to push as draft
        #
        # should not be necessary for publishing server, but because of an
        # issue fixed in xxxxx we have to do it anyway.
        fdroots = list(unfi.set('roots(%ln + %ln::)',
                       outgoing.missing, droots))
        fdroots = [f.node() for f in fdroots]
        future = list(unfi.set(revset, fdroots, pushop.futureheads))
    pushop.outdatedphases = future
    pushop.fallbackoutdatedphases = fallback
280 294
@pushdiscovery('obsmarker')
def _pushdiscoveryobsmarkers(pushop):
    """select obsolescence markers to push (the whole local obsstore)"""
    if not obsolete._enabled:
        return
    if not pushop.repo.obsstore:
        return
    if 'obsolete' not in pushop.remote.listkeys('namespaces'):
        return
    pushop.outobsmarkers = pushop.repo.obsstore
287 301
@pushdiscovery('bookmarks')
def _pushdiscoverybookmarks(pushop):
    """discover which bookmarks can be advanced on the remote

    Only bookmarks whose local target is an ancestor of the pushed revs
    (or any bookmark, when pushing everything) are queued in
    ``pushop.outbookmarks`` as (name, remote-node, local-node) tuples.
    """
    ui = pushop.ui
    repo = pushop.repo.unfiltered()
    remote = pushop.remote
    ui.debug("checking for updated bookmarks\n")
    ancestors = ()
    if pushop.revs:
        revnums = map(repo.changelog.rev, pushop.revs)
        ancestors = repo.changelog.ancestors(revnums, inclusive=True)
    remotebookmark = remote.listkeys('bookmarks')

    comp = bookmarks.compare(repo, repo._bookmarks, remotebookmark, srchex=hex)
    addsrc, adddst, advsrc, advdst, diverge, differ, invalid = comp
    for b, scid, dcid in advsrc:
        # only bookmarks that advance within the pushed subset
        if not ancestors or repo[scid].rev() in ancestors:
            pushop.outbookmarks.append((b, dcid, scid))
305 319
def _pushcheckoutgoing(pushop):
    """validate the outgoing set before pushing; return False if empty

    Aborts when pushing obsolete/troubled changesets without --force, and
    runs head checking against the remote.
    """
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    if not outgoing.missing:
        # nothing to push
        scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
        return False
    # something to push
    if not pushop.force:
        # if repo.obsstore == False --> no obsolete
        # then, save the iteration
        if unfi.obsstore:
            # this message are here for 80 char limit reason
            mso = _("push includes obsolete changeset: %s!")
            mst = "push includes %s changeset: %s!"
            # plain versions for i18n tool to detect them
            _("push includes unstable changeset: %s!")
            _("push includes bumped changeset: %s!")
            _("push includes divergent changeset: %s!")
            # If we are to push if there is at least one
            # obsolete or unstable changeset in missing, at
            # least one of the missinghead will be obsolete or
            # unstable. So checking heads only is ok
            for node in outgoing.missingheads:
                ctx = unfi[node]
                if ctx.obsolete():
                    raise util.Abort(mso % ctx)
                elif ctx.troubled():
                    raise util.Abort(_(mst)
                                     % (ctx.troubles()[0],
                                        ctx))
        newbm = pushop.ui.configlist('bookmarks', 'pushing')
        discovery.checkheads(unfi, pushop.remote, outgoing,
                             pushop.remoteheads,
                             pushop.newbranch,
                             bool(pushop.incoming),
                             newbm)
    return True
344 358
# List of names of steps to perform for an outgoing bundle2, order matters.
b2partsgenorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
b2partsgenmapping = {}
352 366
def b2partsgenerator(stepname):
    """decorator registering a bundle2 part generator

    The decorated function is stored in the step -> function mapping and its
    name is appended to the ordered list of steps, so registration order is
    significant.

    Only use this decorator for brand new steps; to wrap a step defined by
    an extension, modify the b2partsgenmapping dictionary directly."""
    def register(func):
        # each step name may only be registered once
        assert stepname not in b2partsgenmapping
        b2partsgenmapping[stepname] = func
        b2partsgenorder.append(stepname)
        return func
    return register
368 382
@b2partsgenerator('changeset')
def _pushb2ctx(pushop, bundler):
    """handle changegroup push through bundle2

    addchangegroup result is stored in the ``pushop.ret`` attribute.
    """
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    # Send known heads to the server for race detection.
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    if not pushop.force:
        bundler.newpart('B2X:CHECK:HEADS', data=iter(pushop.remoteheads))
    cg = changegroup.getlocalbundle(pushop.repo, 'push', pushop.outgoing)
    cgpart = bundler.newpart('B2X:CHANGEGROUP', data=cg.getchunks())
    def handlereply(op):
        """extract addchangegroup returns from server reply"""
        cgreplies = op.records.getreplies(cgpart.id)
        assert len(cgreplies['changegroup']) == 1
        pushop.ret = cgreplies['changegroup'][0]['return']
    return handlereply
394 408
@b2partsgenerator('phase')
def _pushb2phases(pushop, bundler):
    """handle phase push through bundle2

    One pushkey part is added per outdated head; the returned reply handler
    checks the server's answers and warns about ignored or failed updates.
    """
    if 'phases' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    # use `not in` (idiomatic, and consistent with _pushb2bookmarks)
    if 'b2x:pushkey' not in b2caps:
        return
    pushop.stepsdone.add('phases')
    part2node = []
    enc = pushkey.encode
    for newremotehead in pushop.outdatedphases:
        part = bundler.newpart('b2x:pushkey')
        part.addparam('namespace', enc('phases'))
        part.addparam('key', enc(newremotehead.hex()))
        part.addparam('old', enc(str(phases.draft)))
        part.addparam('new', enc(str(phases.public)))
        part2node.append((part.id, newremotehead))
    def handlereply(op):
        for partid, node in part2node:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            # the server answers each pushkey part at most once
            assert len(results) <= 1
            msg = None
            if not results:
                msg = _('server ignored update of %s to public!\n') % node
            elif not int(results[0]['return']):
                msg = _('updating %s to public failed!\n') % node
            if msg is not None:
                pushop.ui.warn(msg)
    return handlereply
426 440
@b2partsgenerator('bookmarks')
def _pushb2bookmarks(pushop, bundler):
    """handle bookmark push through bundle2"""
    if 'bookmarks' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    if 'b2x:pushkey' not in b2caps:
        # server cannot handle pushkey parts
        return
    pushop.stepsdone.add('bookmarks')
    part2book = []
    enc = pushkey.encode
    for book, old, new in pushop.outbookmarks:
        part = bundler.newpart('b2x:pushkey')
        part.addparam('namespace', enc('bookmarks'))
        part.addparam('key', enc(book))
        part.addparam('old', enc(old))
        part.addparam('new', enc(new))
        part2book.append((part.id, book))
    def handlereply(op):
        for partid, book in part2book:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            # the server answers each pushkey part at most once
            assert len(results) <= 1
            if not results:
                pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
            else:
                ret = int(results[0]['return'])
                if ret:
                    pushop.ui.status(_("updating bookmark %s\n") % book)
                else:
                    pushop.ui.warn(_('updating bookmark %s failed!\n') % book)
    return handlereply
459 473
460 474
def _pushbundle2(pushop):
    """push data to the remote using bundle2

    The parts to send are produced by the registered b2partsgen functions
    (changesets, phases, bookmarks, ...); their reply handlers are run
    against the server's reply bundle."""
    bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
    # create reply capability
    capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo))
    bundler.newpart('b2x:replycaps', data=capsblob)
    replyhandlers = []
    for partgenname in b2partsgenorder:
        partgen = b2partsgenmapping[partgenname]
        ret = partgen(pushop, bundler)
        if callable(ret):
            replyhandlers.append(ret)
    # do not push if nothing to push (the replycaps part is always there)
    if bundler.nbparts <= 1:
        return
    stream = util.chunkbuffer(bundler.getchunks())
    try:
        reply = pushop.remote.unbundle(stream, ['force'], 'push')
    except error.BundleValueError, exc:
        raise util.Abort('missing support for %s' % exc)
    try:
        op = bundle2.processbundle(pushop.repo, reply)
    except error.BundleValueError, exc:
        raise util.Abort('missing support for %s' % exc)
    for rephand in replyhandlers:
        rephand(op)
490 504
def _pushchangeset(pushop):
    """Make the actual push of changeset bundle to remote repo"""
    if 'changesets' in pushop.stepsdone:
        # already handled (e.g. through bundle2)
        return
    pushop.stepsdone.add('changesets')
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    outgoing = pushop.outgoing
    unbundle = pushop.remote.capable('unbundle')
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # create a changegroup from local
    if pushop.revs is None and not (outgoing.excluded
                                    or pushop.repo.changelog.filteredrevs):
        # push everything,
        # use the fast path, no race possible on push
        bundler = changegroup.bundle10(pushop.repo, bundlecaps)
        cg = changegroup.getsubset(pushop.repo,
                                   outgoing,
                                   bundler,
                                   'push',
                                   fastpath=True)
    else:
        cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
                                        bundlecaps)

    # apply changegroup to remote
    if unbundle:
        # local repo finds heads on server, finds out what
        # revs it must push. once revs transferred, if server
        # finds it has different heads (someone else won
        # commit/push race), server aborts.
        if pushop.force:
            remoteheads = ['force']
        else:
            remoteheads = pushop.remoteheads
        # ssh: return remote's addchangegroup()
        # http: return remote's addchangegroup() or 0 for error
        pushop.ret = pushop.remote.unbundle(cg, remoteheads,
                                            pushop.repo.url())
    else:
        # we return an integer indicating remote head count
        # change
        pushop.ret = pushop.remote.addchangegroup(cg, 'push', pushop.repo.url())
538 552
def _pushsyncphase(pushop):
    """synchronise phase information locally and remotely"""
    cheads = pushop.commonheads
    # even when we don't push, exchanging phase data is useful
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo', False)
        and remotephases    # server supports phases
        and pushop.ret is None # nothing was pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and remote support phase
        # - and no changeset was pushed
        # - and remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    if not remotephases: # old server or public only reply from non-publishing
        _localphasemove(pushop, cheads)
        # don't push any phase data as there is nothing to push
    else:
        ana = phases.analyzeremotephases(pushop.repo, cheads,
                                         remotephases)
        pheads, droots = ana
        ### Apply remote phase on local
        if remotephases.get('publishing', False):
            _localphasemove(pushop, cheads)
        else: # publish = False
            _localphasemove(pushop, pheads)
            _localphasemove(pushop, cheads, phases.draft)
        ### Apply local phase on remote

        if pushop.ret:
            if 'phases' in pushop.stepsdone:
                # phases already pushed through bundle2
                return
            outdated = pushop.outdatedphases
        else:
            outdated = pushop.fallbackoutdatedphases

        pushop.stepsdone.add('phases')

        # filter heads already turned public by the push
        outdated = [c for c in outdated if c.node() not in pheads]
        b2caps = bundle2.bundle2caps(pushop.remote)
        if 'b2x:pushkey' in b2caps:
            # server supports bundle2, let's do a batched push through it
            #
            # This will eventually be unified with the changesets bundle2 push
            bundler = bundle2.bundle20(pushop.ui, b2caps)
            capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo))
            bundler.newpart('b2x:replycaps', data=capsblob)
            part2node = []
            enc = pushkey.encode
            for newremotehead in outdated:
                part = bundler.newpart('b2x:pushkey')
                part.addparam('namespace', enc('phases'))
                part.addparam('key', enc(newremotehead.hex()))
                part.addparam('old', enc(str(phases.draft)))
                part.addparam('new', enc(str(phases.public)))
                part2node.append((part.id, newremotehead))
            stream = util.chunkbuffer(bundler.getchunks())
            try:
                reply = pushop.remote.unbundle(stream, ['force'], 'push')
                op = bundle2.processbundle(pushop.repo, reply)
            except error.BundleValueError, exc:
                raise util.Abort('missing support for %s' % exc)
            for partid, node in part2node:
                partrep = op.records.getreplies(partid)
                results = partrep['pushkey']
                # the server answers each pushkey part at most once
                assert len(results) <= 1
                msg = None
                if not results:
                    msg = _('server ignored update of %s to public!\n') % node
                elif not int(results[0]['return']):
                    msg = _('updating %s to public failed!\n') % node
                if msg is not None:
                    pushop.ui.warn(msg)

        else:
            # fallback to independent pushkey command
            for newremotehead in outdated:
                r = pushop.remote.pushkey('phases',
                                          newremotehead.hex(),
                                          str(phases.draft),
                                          str(phases.public))
                if not r:
                    pushop.ui.warn(_('updating %s to public failed!\n')
                                   % newremotehead)
630 644
def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo"""
    if not pushop.locallocked:
        # without the local lock we must not touch phases; just tell the
        # user when an update would actually have happened.
        skipped = [n for n in nodes if phase < pushop.repo[n].phase()]
        if skipped:
            phasestr = phases.phasenames[phase]
            pushop.ui.status(_('cannot lock source repo, skipping '
                               'local %s phase update\n') % phasestr)
        return
    tr = pushop.repo.transaction('push-phase-sync')
    try:
        phases.advanceboundary(pushop.repo, tr, phase, nodes)
        tr.close()
    finally:
        tr.release()
649 663
def _pushobsolete(pushop):
    """utility function to push obsolete markers to a remote"""
    if 'obsmarkers' in pushop.stepsdone:
        return
    pushop.ui.debug('try to push obsolete markers to remote\n')
    repo = pushop.repo
    remote = pushop.remote
    pushop.stepsdone.add('obsmarkers')
    if not pushop.outobsmarkers:
        return
    remotedata = obsolete._pushkeyescape(pushop.outobsmarkers)
    # reverse sort to ensure we end with dump0
    results = [remote.pushkey('obsolete', key, '', remotedata[key])
               for key in sorted(remotedata, reverse=True)]
    if not all(results):
        repo.ui.warn(_('failed to push some obsolete markers!\n'))
668 682
def _pushbookmark(pushop):
    """Update bookmark position on remote"""
    if pushop.ret == 0:
        # the changeset push failed outright; leave remote bookmarks alone
        return
    if 'bookmarks' in pushop.stepsdone:
        return
    pushop.stepsdone.add('bookmarks')
    ui = pushop.ui
    remote = pushop.remote
    for name, oldnode, newnode in pushop.outbookmarks:
        ok = remote.pushkey('bookmarks', name, oldnode, newnode)
        if ok:
            ui.status(_("updating bookmark %s\n") % name)
        else:
            ui.warn(_('updating bookmark %s failed!\n') % name)
681 695
class pulloperation(object):
    """An object that represents a single pull operation.

    Its purpose is to carry pull-related state and very common operations.

    A new one should be created at the beginning of each pull and discarded
    afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False):
        # repo we pull into
        self.repo = repo
        # repo we pull from
        self.remote = remote
        # revision we try to pull (None is "all")
        self.heads = heads
        # do we force pull?
        self.force = force
        # the name the pull transaction
        self._trname = 'pull\n' + util.hidepassword(remote.url())
        # hold the transaction once created
        self._tr = None
        # set of common changeset between local and remote before pull
        self.common = None
        # set of pulled head
        self.rheads = None
        # list of missing changeset to fetch remotely
        self.fetch = None
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # list of step remaining todo (related to future bundle2 usage)
        self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changeset target by the pull"""
        # compute target subset
        if self.heads is None:
            # We pulled every thing possible
            # sync on everything common
            c = set(self.common)
            ret = list(self.common)
            for n in self.rheads:
                if n not in c:
                    ret.append(n)
            return ret
        else:
            # We pulled a specific subset
            # sync on this subset
            return self.heads

    def gettransaction(self):
        """get appropriate pull transaction, creating it if needed"""
        if self._tr is None:
            self._tr = self.repo.transaction(self._trname)
        return self._tr

    def closetransaction(self):
        """close transaction if created"""
        if self._tr is not None:
            self._tr.close()

    def releasetransaction(self):
        """release transaction if created"""
        if self._tr is not None:
            self._tr.release()
748 762
def pull(repo, remote, heads=None, force=False):
    """pull changesets (limited by heads) from remote into repo

    Returns the changegroup result code (see pulloperation.cgresult).
    """
    pullop = pulloperation(repo, remote, heads, force)
    if pullop.remote.local():
        # pulling over the local filesystem: we must support every
        # requirement of the source repo
        missing = set(pullop.remote.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    lock = pullop.repo.lock()
    try:
        _pulldiscovery(pullop)
        if (pullop.repo.ui.configbool('experimental', 'bundle2-exp', False)
            and pullop.remote.capable('bundle2-exp')):
            _pullbundle2(pullop)
        # steps already performed through bundle2 are removed from todosteps
        if 'changegroup' in pullop.todosteps:
            _pullchangeset(pullop)
        if 'phases' in pullop.todosteps:
            _pullphase(pullop)
        if 'obsmarkers' in pullop.todosteps:
            _pullobsolete(pullop)
        pullop.closetransaction()
    finally:
        pullop.releasetransaction()
        lock.release()

    return pullop.cgresult
777 791
def _pulldiscovery(pullop):
    """discovery phase for the pull

    Currently handles changeset discovery only; it will eventually drive
    every discovery step."""
    unfi = pullop.repo.unfiltered()
    common, fetch, rheads = discovery.findcommonincoming(
        unfi, pullop.remote, heads=pullop.heads, force=pullop.force)
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads
788 802
def _pullbundle2(pullop):
    """pull data using bundle2

    Fetches the changegroup (and phase listkeys when the server supports
    them) in a single getbundle call and processes the reply bundle."""
    remotecaps = bundle2.bundle2caps(pullop.remote)
    kwargs = {'bundlecaps': caps20to10(pullop.repo)}
    # pulling changegroup
    pullop.todosteps.remove('changegroup')

    kwargs['common'] = pullop.common
    kwargs['heads'] = pullop.heads or pullop.rheads
    if 'b2x:listkeys' in remotecaps:
        kwargs['listkeys'] = ['phase']
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
    else:
        if pullop.heads is None and list(pullop.common) == [nullid]:
            pullop.repo.ui.status(_("requesting all changes\n"))
    _pullbundle2extraprepare(pullop, kwargs)
    if kwargs.keys() == ['format']:
        return # nothing to pull
    bundle = pullop.remote.getbundle('pull', **kwargs)
    try:
        op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
    except error.BundleValueError, exc:
        raise util.Abort('missing support for %s' % exc)

    if pullop.fetch:
        # exactly one changegroup record is expected from the reply
        assert len(op.records['changegroup']) == 1
        pullop.cgresult = op.records['changegroup'][0]['return']

    # processing phases change
    for namespace, value in op.records['listkeys']:
        if namespace == 'phases':
            _pullapplyphases(pullop, value)
825 839
826 840 def _pullbundle2extraprepare(pullop, kwargs):
827 841 """hook function so that extensions can extend the getbundle call"""
828 842 pass
829 843
def _pullchangeset(pullop):
    """fetch a changegroup from the remote and add it to the local repo"""
    pullop.todosteps.remove('changegroup')
    repo = pullop.repo
    remote = pullop.remote
    if not pullop.fetch:
        # nothing to fetch: report and bail out before any transaction
        repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    # Open the transaction as late as possible so that an empty pull does
    # not create one and future rollback calls stay meaningful.
    pullop.gettransaction()
    if pullop.heads is None:
        if list(pullop.common) == [nullid]:
            repo.ui.status(_("requesting all changes\n"))
        elif remote.capable('changegroupsubset'):
            # issue1320, avoid a race if remote changed after discovery
            pullop.heads = pullop.rheads

    if remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = remote.getbundle('pull', common=pullop.common,
                              heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = remote.changegroup(pullop.fetch, 'pull')
    elif not remote.capable('changegroupsubset'):
        raise util.Abort(_("partial pull cannot be done because "
                           "other repository doesn't support "
                           "changegroupsubset."))
    else:
        cg = remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    pullop.cgresult = changegroup.addchangegroup(repo, cg, 'pull',
                                                 remote.url())
861 875
def _pullphase(pullop):
    # fetch the phase information advertised by the remote and apply it
    _pullapplyphases(pullop, pullop.remote.listkeys('phases'))
866 880
def _pullapplyphases(pullop, remotephases):
    """update local phases to reflect the observed remote state"""
    pullop.todosteps.remove('phases')
    publishing = bool(remotephases.get('publishing', False))
    if publishing or not remotephases:
        # Remote is old, or publishes everything: all common changesets
        # should be seen as public.
        publicheads = pullop.pulledsubset
        draftheads = []
    else:
        # remote is new and unpublishing
        publicheads, _dr = phases.analyzeremotephases(pullop.repo,
                                                      pullop.pulledsubset,
                                                      remotephases)
        draftheads = pullop.pulledsubset
    unfi = pullop.repo.unfiltered()
    phase = unfi._phasecache.phase
    rev = unfi.changelog.nodemap.get
    public = phases.public
    draft = phases.draft

    # only move nodes that are not already at (or past) the public boundary
    publicheads = [pn for pn in publicheads if phase(unfi, rev(pn)) > public]
    if publicheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, public, publicheads)

    # likewise for the draft boundary
    draftheads = [pn for pn in draftheads if phase(unfi, rev(pn)) > draft]
    if draftheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, draft, draftheads)
899 913
def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    Returns the pull transaction when one had to be opened to store the
    markers, and None otherwise, so the calling code knows whether a new
    transaction was created.

    Exists mostly to allow overriding for experimentation purpose"""
    pullop.todosteps.remove('obsmarkers')
    if not obsolete._enabled:
        return None
    pullop.repo.ui.debug('fetching remote obsolete markers\n')
    remoteobs = pullop.remote.listkeys('obsolete')
    if 'dump0' not in remoteobs:
        # remote advertises no marker payload: no transaction needed
        return None
    tr = pullop.gettransaction()
    # merge every 'dump*' key, iterating in reverse-sorted key order
    for key in sorted(remoteobs, reverse=True):
        if key.startswith('dump'):
            blob = base85.b85decode(remoteobs[key])
            pullop.repo.obsstore.mergemarkers(tr, blob)
    pullop.repo.invalidatevolatilesets()
    return tr
921 935
def caps20to10(repo):
    """return the bundlecaps set that requests bundle20 during getbundle"""
    blob = bundle2.encodecaps(bundle2.getrepocaps(repo))
    return set(['HG2X', 'bundle2=' + urllib.quote(blob)])
928 942
def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
              **kwargs):
    """return a full bundle (with potentially multiple kind of parts)

    Could be a bundle HG10 or a bundle HG2X depending on bundlecaps
    passed. For now, the bundle can contain only changegroup, but this will
    changes when more part type will be available for bundle2.

    This is different from changegroup.getbundle that only returns an HG10
    changegroup bundle. They may eventually get reunited in the future when we
    have a clearer idea of the API we what to query different data.

    The implementation is at a very early stage and will get massive rework
    when the API of bundle is refined.

    Raises ValueError when the caller requests an HG10 bundle without a
    changegroup, or passes HG10-incompatible extra arguments.
    """
    cg = None
    if kwargs.get('cg', True):
        # build changegroup bundle here.
        cg = changegroup.getbundle(repo, source, heads=heads,
                                   common=common, bundlecaps=bundlecaps)
    elif bundlecaps is None or 'HG2X' not in bundlecaps:
        # Guard against bundlecaps=None before the membership test: the
        # original code raised TypeError ("'in' on NoneType") instead of
        # the intended ValueError when the changegroup was suppressed
        # without bundle2 support being advertised.
        raise ValueError(_('request for bundle10 must include changegroup'))
    if bundlecaps is None or 'HG2X' not in bundlecaps:
        # HG10 path: no part machinery, only the raw changegroup
        if kwargs:
            raise ValueError(_('unsupported getbundle arguments: %s')
                             % ', '.join(sorted(kwargs.keys())))
        return cg
    # very crude first implementation,
    # the bundle API will change and the generation will be done lazily.
    b2caps = {}
    for bcaps in bundlecaps:
        if bcaps.startswith('bundle2='):
            blob = urllib.unquote(bcaps[len('bundle2='):])
            b2caps.update(bundle2.decodecaps(blob))
    bundler = bundle2.bundle20(repo.ui, b2caps)
    if cg:
        bundler.newpart('b2x:changegroup', data=cg.getchunks())
    # one listkeys part per requested namespace
    listkeys = kwargs.get('listkeys', ())
    for namespace in listkeys:
        part = bundler.newpart('b2x:listkeys')
        part.addparam('namespace', namespace)
        keys = repo.listkeys(namespace).items()
        part.data = pushkey.encodekeys(keys)
    # let extensions contribute additional parts
    _getbundleextrapart(bundler, repo, source, heads=heads, common=common,
                        bundlecaps=bundlecaps, **kwargs)
    return util.chunkbuffer(bundler.getchunks())
975 989
976 990 def _getbundleextrapart(bundler, repo, source, heads=None, common=None,
977 991 bundlecaps=None, **kwargs):
978 992 """hook function to let extensions add parts to the requested bundle"""
979 993 pass
980 994
def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling.
    """
    current = repo.heads()
    digest = util.sha1(''.join(sorted(current))).digest()
    # the caller may identify the heads literally, by their hash, or
    # force its way past the check entirely
    unchanged = (their_heads == ['force']
                 or their_heads == current
                 or their_heads == ['hashed', digest])
    if not unchanged:
        # someone else committed/pushed/unbundled while we
        # were transferring data
        raise error.PushRaced('repository changed while %s - '
                              'please try again' % context)
994 1008
def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    this function makes sure the repo is locked during the application and have
    mechanism to check that no push race occurred between the creation of the
    bundle and its application.

    If the push was raced as PushRaced exception is raised."""
    r = 0
    # need a transaction when processing a bundle2 stream
    tr = None
    lock = repo.lock()
    try:
        # abort with PushRaced if the repo heads moved since the client
        # built its bundle
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        if util.safehasattr(cg, 'params'):
            # a 'params' attribute distinguishes a bundle2 stream from a
            # plain changegroup; route it through the bundle2 machinery
            try:
                tr = repo.transaction('unbundle')
                tr.hookargs['bundle2-exp'] = '1'
                r = bundle2.processbundle(repo, cg, lambda: tr).reply
                cl = repo.unfiltered().changelog
                # expose pending (not yet committed) changelog data to the
                # pre-close hook; empty string when nothing is pending
                p = cl.writepending() and repo.root or ""
                repo.hook('b2x-pretransactionclose', throw=True, source=source,
                          url=url, pending=p, **tr.hookargs)
                tr.close()
                repo.hook('b2x-transactionclose', source=source, url=url,
                          **tr.hookargs)
            except Exception, exc:
                # tag the exception so upper layers know it happened while
                # processing a bundle2 payload
                exc.duringunbundle2 = True
                raise
        else:
            # legacy HG10 changegroup: no transaction handling needed here
            r = changegroup.addchangegroup(repo, cg, source, url)
    finally:
        # release the transaction (rollback if not closed) before the lock
        if tr is not None:
            tr.release()
        lock.release()
    return r
General Comments 0
You need to be logged in to leave comments. Login now