##// END OF EJS Templates
bundle2: separate bundle10 and bundle2 cases in getbundle()...
Mike Hommey -
r22542:6b180a0c default
parent child Browse files
Show More
@@ -1,1078 +1,1120 b''
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib
11 11 import util, scmutil, changegroup, base85, error
12 12 import discovery, phases, obsolete, bookmarks, bundle2, pushkey
13 13
def readbundle(ui, fh, fname, vfs=None):
    """Sniff the 4-byte bundle header on ``fh`` and return a matching unpacker.

    A nameless stream whose first byte is NUL is assumed to be a raw,
    headerless changegroup and is wrapped so it parses as HG10/UN.
    Raises util.Abort for non-Mercurial data or unknown bundle versions.
    """
    header = changegroup.readexactly(fh, 4)

    compression = None
    if fname:
        if vfs:
            fname = vfs.join(fname)
    else:
        fname = "stream"
        if header.startswith('\0') and not header.startswith('HG'):
            # headerless changegroup: re-prepend the bytes already consumed
            fh = changegroup.headerlessfixup(fh, header)
            header = "HG10"
            compression = 'UN'

    magic = header[0:2]
    version = header[2:4]
    if magic != 'HG':
        raise util.Abort(_('%s: not a Mercurial bundle') % fname)
    if version == '10':
        if compression is None:
            # HG10 carries a 2-byte compression code after the magic
            compression = changegroup.readexactly(fh, 2)
        return changegroup.cg1unpacker(fh, compression)
    if version == '2X':
        return bundle2.unbundle20(ui, fh, header=magic + version)
    raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
39 39
def buildobsmarkerspart(bundler, markers):
    """Append a B2X:OBSMARKERS part carrying ``markers`` to ``bundler``.

    Returns the newly created part, or None when ``markers`` is empty
    (no part is added in that case).  Raises ValueError if no obsmarker
    format is common to us and the bundler's advertised capabilities.
    """
    if not markers:
        return None
    supported = bundle2.obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(supported)
    if version is None:
        raise ValueError('bundler do not support common obsmarker format')
    data = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('B2X:OBSMARKERS', data=data)
54 54
class pushoperation(object):
    """An object representing a single push operation.

    Its purpose is to carry push-related state and very common operations.

    A new one should be created at the beginning of each push and discarded
    afterward.
    """

    def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
        # repo we push from
        self.repo = repo
        self.ui = repo.ui
        # repo we push to
        self.remote = remote
        # force option provided
        self.force = force
        # revs to be pushed (None is "all")
        self.revs = revs
        # allow push of new branch
        self.newbranch = newbranch
        # did a local lock get acquired?
        self.locallocked = None
        # step already performed
        # (used to check what steps have been already performed through bundle2)
        self.stepsdone = set()
        # Integer version of the push result
        # - None means nothing to push
        # - 0 means HTTP error
        # - 1 means we pushed and remote head count is unchanged *or*
        #   we have outgoing changesets but refused to push
        # - other values as described by addchangegroup()
        self.ret = None
        # discover.outgoing object (contains common and outgoing data)
        self.outgoing = None
        # all remote heads before the push
        self.remoteheads = None
        # testable as a boolean indicating if any nodes are missing locally.
        self.incoming = None
        # phases changes that must be pushed along side the changesets
        self.outdatedphases = None
        # phases changes that must be pushed if changeset push fails
        self.fallbackoutdatedphases = None
        # outgoing obsmarkers
        self.outobsmarkers = set()
        # outgoing bookmarks
        self.outbookmarks = []

    @util.propertycache
    def futureheads(self):
        """future remote heads if the changeset push succeeds"""
        return self.outgoing.missingheads

    @util.propertycache
    def fallbackheads(self):
        """future remote heads if the changeset push fails"""
        if self.revs is None:
            # no target to push, all common are relevant
            return self.outgoing.commonheads
        unfi = self.repo.unfiltered()
        # I want cheads = heads(::missingheads and ::commonheads)
        # (missingheads is revs with secret changeset filtered out)
        #
        # This can be expressed as:
        #     cheads = ( (missingheads and ::commonheads)
        #              + (commonheads and ::missingheads))"
        #              )
        #
        # while trying to push we already computed the following:
        #     common = (::commonheads)
        #     missing = ((commonheads::missingheads) - commonheads)
        #
        # We can pick:
        # * missingheads part of common (::commonheads)
        common = set(self.outgoing.common)
        nm = self.repo.changelog.nodemap
        cheads = [node for node in self.revs if nm[node] in common]
        # and
        # * commonheads parents on missing
        revset = unfi.set('%ln and parents(roots(%ln))',
                          self.outgoing.commonheads,
                          self.outgoing.missing)
        cheads.extend(c.node() for c in revset)
        return cheads

    @property
    def commonheads(self):
        """set of all common heads after changeset bundle push"""
        # ret is truthy when the changeset push succeeded (see __init__)
        if self.ret:
            return self.futureheads
        else:
            return self.fallbackheads
147 147
def push(repo, remote, force=False, revs=None, newbranch=False):
    '''Push outgoing changesets (limited by revs) from a local
    repository to remote. Return an integer:
      - None means nothing to push
      - 0 means HTTP error
      - 1 means we pushed and remote head count is unchanged *or*
        we have outgoing changesets but refused to push
      - other values as described by addchangegroup()
    '''
    pushop = pushoperation(repo, remote, force, revs, newbranch)
    if pushop.remote.local():
        # local-to-local push: refuse when the destination cannot honor
        # one of our repository requirements
        missing = (set(pushop.repo.requirements)
                   - pushop.remote.local().supported)
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    # there are two ways to push to remote repo:
    #
    # addchangegroup assumes local user can lock remote
    # repo (local filesystem, old ssh servers).
    #
    # unbundle assumes local user cannot lock remote repo (new ssh
    # servers, http servers).

    if not pushop.remote.canpush():
        raise util.Abort(_("destination does not support push"))
    # get local lock as we might write phase data
    locallock = None
    try:
        locallock = pushop.repo.lock()
        pushop.locallocked = True
    except IOError, err:
        pushop.locallocked = False
        if err.errno != errno.EACCES:
            raise
        # source repo cannot be locked.
        # We do not abort the push, but just disable the local phase
        # synchronisation.
        msg = 'cannot lock source repository: %s\n' % err
        pushop.ui.debug(msg)
    try:
        pushop.repo.checkpush(pushop)
        lock = None
        unbundle = pushop.remote.capable('unbundle')
        if not unbundle:
            # remote lacks 'unbundle': old-style push requires us to lock
            # the remote repository ourselves
            lock = pushop.remote.lock()
        try:
            _pushdiscovery(pushop)
            if (pushop.repo.ui.configbool('experimental', 'bundle2-exp',
                                          False)
                and pushop.remote.capable('bundle2-exp')):
                _pushbundle2(pushop)
            # the steps below check pushop.stepsdone and skip whatever
            # the bundle2 push already performed
            _pushchangeset(pushop)
            _pushsyncphase(pushop)
            _pushobsolete(pushop)
            _pushbookmark(pushop)
        finally:
            if lock is not None:
                lock.release()
    finally:
        if locallock is not None:
            locallock.release()

    return pushop.ret
215 215
# Ordered list of discovery step names; steps run in registration order.
pushdiscoveryorder = []

# step name -> implementation
#
# This exists to help extensions wrap steps if necessary
pushdiscoverymapping = {}

def pushdiscovery(stepname):
    """Return a decorator registering a push-discovery step.

    The decorated function is recorded in ``pushdiscoverymapping`` and
    its name appended to ``pushdiscoveryorder``; decorated functions run
    in registration order, so order of decoration may matter.

    Only use this decorator for brand new steps.  To wrap a step from an
    extension, mutate the ``pushdiscoverymapping`` dictionary directly.
    """
    def register(func):
        assert stepname not in pushdiscoverymapping
        pushdiscoverymapping[stepname] = func
        pushdiscoveryorder.append(stepname)
        return func
    return register
239 239
def _pushdiscovery(pushop):
    """Run every registered discovery step, in registration order."""
    for name in pushdiscoveryorder:
        pushdiscoverymapping[name](pushop)
245 245
@pushdiscovery('changeset')
def _pushdiscoverychangeset(pushop):
    """discover the changesets that need to be pushed"""
    unfi = pushop.repo.unfiltered()
    fci = discovery.findcommonincoming
    # what do we have in common with the remote, and does it have
    # anything we are missing?
    commoninc = fci(unfi, pushop.remote, force=pushop.force)
    common, inc, remoteheads = commoninc
    fco = discovery.findcommonoutgoing
    # changesets we would send, limited to pushop.revs when given
    outgoing = fco(unfi, pushop.remote, onlyheads=pushop.revs,
                   commoninc=commoninc, force=pushop.force)
    pushop.outgoing = outgoing
    pushop.remoteheads = remoteheads
    # boolean-testable: does the remote have changesets missing locally?
    pushop.incoming = inc
259 259
@pushdiscovery('phase')
def _pushdiscoveryphase(pushop):
    """discover the phase that needs to be pushed

    (computed for both success and failure case for changesets push)"""
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    remotephases = pushop.remote.listkeys('phases')
    publishing = remotephases.get('publishing', False)
    ana = phases.analyzeremotephases(pushop.repo,
                                     pushop.fallbackheads,
                                     remotephases)
    pheads, droots = ana
    extracond = ''
    if not publishing:
        extracond = ' and public()'
    revset = 'heads((%%ln::%%ln) %s)' % extracond
    # Get the list of all revs draft on remote but public here.
    # XXX Beware that revset break if droots is not strictly
    # XXX root we may want to ensure it is but it is costly
    fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
    if not outgoing.missing:
        future = fallback
    else:
        # adds changeset we are going to push as draft
        #
        # should not be necessary for publishing server, but because of an
        # issue fixed in xxxxx we have to do it anyway.
        fdroots = list(unfi.set('roots(%ln + %ln::)',
                                outgoing.missing, droots))
        fdroots = [f.node() for f in fdroots]
        future = list(unfi.set(revset, fdroots, pushop.futureheads))
    # phases to push when the changeset push succeeds / fails, respectively
    pushop.outdatedphases = future
    pushop.fallbackoutdatedphases = fallback
294 294
@pushdiscovery('obsmarker')
def _pushdiscoveryobsmarkers(pushop):
    """Collect the obsolescence markers relevant to the pushed heads."""
    if not obsolete._enabled:
        return
    repo = pushop.repo
    if not repo.obsstore:
        return
    if 'obsolete' not in pushop.remote.listkeys('namespaces'):
        return
    # very naive computation, that can be quite expensive on big repo.
    # However: evolution is currently slow on them anyway.
    nodes = (ctx.node() for ctx in repo.set('::%ln', pushop.futureheads))
    pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes)
305 305
@pushdiscovery('bookmarks')
def _pushdiscoverybookmarks(pushop):
    """Find local bookmarks that can be advanced on the remote."""
    repo = pushop.repo.unfiltered()
    pushop.ui.debug("checking for updated bookmarks\n")

    # when specific revs are pushed, only consider bookmarks whose
    # target is an ancestor of (or one of) those revs
    ancestors = ()
    if pushop.revs:
        revnums = map(repo.changelog.rev, pushop.revs)
        ancestors = repo.changelog.ancestors(revnums, inclusive=True)

    remotebookmark = pushop.remote.listkeys('bookmarks')
    comp = bookmarks.compare(repo, repo._bookmarks, remotebookmark, srchex=hex)
    addsrc, adddst, advsrc, advdst, diverge, differ, invalid = comp
    # keep only the bookmarks advanced locally (fast-forwardable remotely)
    for name, scid, dcid in advsrc:
        if not ancestors or repo[scid].rev() in ancestors:
            pushop.outbookmarks.append((name, dcid, scid))
323 323
def _pushcheckoutgoing(pushop):
    """Validate the outgoing changesets before actually pushing them.

    Returns False when there is nothing to push.  Unless the push is
    forced, aborts when an outgoing head is obsolete or troubled, and
    runs the standard remote-head checks.
    """
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    if not outgoing.missing:
        # nothing to push
        scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
        return False
    # something to push
    if not pushop.force:
        # if repo.obsstore == False --> no obsolete
        # then, save the iteration
        if unfi.obsstore:
            # these messages are here for 80 char limit reason
            mso = _("push includes obsolete changeset: %s!")
            mst = "push includes %s changeset: %s!"
            # plain versions for i18n tool to detect them
            _("push includes unstable changeset: %s!")
            _("push includes bumped changeset: %s!")
            _("push includes divergent changeset: %s!")
            # If we are to push if there is at least one
            # obsolete or unstable changeset in missing, at
            # least one of the missinghead will be obsolete or
            # unstable. So checking heads only is ok
            for node in outgoing.missingheads:
                ctx = unfi[node]
                if ctx.obsolete():
                    raise util.Abort(mso % ctx)
                elif ctx.troubled():
                    raise util.Abort(_(mst)
                                     % (ctx.troubles()[0],
                                        ctx))
        newbm = pushop.ui.configlist('bookmarks', 'pushing')
        discovery.checkheads(unfi, pushop.remote, outgoing,
                             pushop.remoteheads,
                             pushop.newbranch,
                             bool(pushop.incoming),
                             newbm)
    return True
362 362
# Ordered list of bundle2 part-generator step names; order matters.
b2partsgenorder = []

# step name -> implementation
#
# This exists to help extensions wrap steps if necessary
b2partsgenmapping = {}

def b2partsgenerator(stepname):
    """Return a decorator registering a bundle2 part-generator step.

    The decorated function is recorded in ``b2partsgenmapping`` and its
    name appended to ``b2partsgenorder``; generators run in registration
    order, so order of decoration may matter.

    Only use this decorator for brand new steps.  To wrap a step from an
    extension, mutate the ``b2partsgenmapping`` dictionary directly.
    """
    def register(func):
        assert stepname not in b2partsgenmapping
        b2partsgenmapping[stepname] = func
        b2partsgenorder.append(stepname)
        return func
    return register
386 386
@b2partsgenerator('changeset')
def _pushb2ctx(pushop, bundler):
    """handle changegroup push through bundle2

    addchangegroup result is stored in the ``pushop.ret`` attribute.
    """
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    # Send known heads to the server for race detection.
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    if not pushop.force:
        # server aborts if its heads changed since discovery
        bundler.newpart('B2X:CHECK:HEADS', data=iter(pushop.remoteheads))
    cg = changegroup.getlocalchangegroup(pushop.repo, 'push', pushop.outgoing)
    cgpart = bundler.newpart('B2X:CHANGEGROUP', data=cg.getchunks())
    def handlereply(op):
        """extract addchangegroup returns from server reply"""
        cgreplies = op.records.getreplies(cgpart.id)
        assert len(cgreplies['changegroup']) == 1
        pushop.ret = cgreplies['changegroup'][0]['return']
    return handlereply
412 412
@b2partsgenerator('phase')
def _pushb2phases(pushop, bundler):
    """handle phase push through bundle2

    Adds one ``b2x:pushkey`` part per head that must be turned public,
    and returns a reply handler reporting per-head results.
    """
    if 'phases' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    # use the idiomatic membership test, matching _pushb2bookmarks
    if 'b2x:pushkey' not in b2caps:
        return
    pushop.stepsdone.add('phases')
    # remember part id -> node so the reply handler can attribute results
    part2node = []
    enc = pushkey.encode
    for newremotehead in pushop.outdatedphases:
        part = bundler.newpart('b2x:pushkey')
        part.addparam('namespace', enc('phases'))
        part.addparam('key', enc(newremotehead.hex()))
        part.addparam('old', enc(str(phases.draft)))
        part.addparam('new', enc(str(phases.public)))
        part2node.append((part.id, newremotehead))
    def handlereply(op):
        """warn about any phase update the server ignored or refused"""
        for partid, node in part2node:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            msg = None
            if not results:
                msg = _('server ignored update of %s to public!\n') % node
            elif not int(results[0]['return']):
                msg = _('updating %s to public failed!\n') % node
            if msg is not None:
                pushop.ui.warn(msg)
    return handlereply
444 444
@b2partsgenerator('obsmarkers')
def _pushb2obsmarkers(pushop, bundler):
    """Add an obsolescence-markers part to the outgoing bundle2."""
    if 'obsmarkers' in pushop.stepsdone:
        return
    supported = bundle2.obsmarkersversion(bundler.capabilities)
    if obsolete.commonversion(supported) is None:
        # no marker format in common with the remote: leave the step
        # unmarked so the non-bundle2 path can still attempt it
        return
    pushop.stepsdone.add('obsmarkers')
    if pushop.outobsmarkers:
        buildobsmarkerspart(bundler, pushop.outobsmarkers)
455 455
@b2partsgenerator('bookmarks')
def _pushb2bookmarks(pushop, bundler):
    """handle bookmark push through bundle2"""
    if 'bookmarks' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    if 'b2x:pushkey' not in b2caps:
        return
    pushop.stepsdone.add('bookmarks')
    # one pushkey part per bookmark; remember part id -> bookmark name so
    # the reply handler can report per-bookmark results
    part2book = []
    enc = pushkey.encode
    for book, old, new in pushop.outbookmarks:
        part = bundler.newpart('b2x:pushkey')
        part.addparam('namespace', enc('bookmarks'))
        part.addparam('key', enc(book))
        part.addparam('old', enc(old))
        part.addparam('new', enc(new))
        part2book.append((part.id, book))
    def handlereply(op):
        """report the server's per-bookmark pushkey results"""
        for partid, book in part2book:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            if not results:
                pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
            else:
                ret = int(results[0]['return'])
                if ret:
                    pushop.ui.status(_("updating bookmark %s\n") % book)
                else:
                    pushop.ui.warn(_('updating bookmark %s failed!\n') % book)
    return handlereply
488 488
489 489
def _pushbundle2(pushop):
    """push data to the remote using bundle2

    The parts to send are produced by the registered ``b2partsgenerator``
    steps; any callable a generator returns is run as a reply handler on
    the processed server reply."""
    bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
    # create reply capability
    capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo))
    bundler.newpart('b2x:replycaps', data=capsblob)
    replyhandlers = []
    for partgenname in b2partsgenorder:
        partgen = b2partsgenmapping[partgenname]
        ret = partgen(pushop, bundler)
        if callable(ret):
            replyhandlers.append(ret)
    # do not push if nothing to push (only the replycaps part is present)
    if bundler.nbparts <= 1:
        return
    stream = util.chunkbuffer(bundler.getchunks())
    try:
        reply = pushop.remote.unbundle(stream, ['force'], 'push')
    except error.BundleValueError, exc:
        raise util.Abort('missing support for %s' % exc)
    try:
        op = bundle2.processbundle(pushop.repo, reply)
    except error.BundleValueError, exc:
        raise util.Abort('missing support for %s' % exc)
    for rephand in replyhandlers:
        rephand(op)
519 519
def _pushchangeset(pushop):
    """Make the actual push of changeset bundle to remote repo"""
    if 'changesets' in pushop.stepsdone:
        # already handled (e.g. by the bundle2 path)
        return
    pushop.stepsdone.add('changesets')
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    outgoing = pushop.outgoing
    unbundle = pushop.remote.capable('unbundle')
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # create a changegroup from local
    if pushop.revs is None and not (outgoing.excluded
                                    or pushop.repo.changelog.filteredrevs):
        # push everything,
        # use the fast path, no race possible on push
        bundler = changegroup.cg1packer(pushop.repo, bundlecaps)
        cg = changegroup.getsubset(pushop.repo,
                                   outgoing,
                                   bundler,
                                   'push',
                                   fastpath=True)
    else:
        cg = changegroup.getlocalchangegroup(pushop.repo, 'push', outgoing,
                                             bundlecaps)

    # apply changegroup to remote
    if unbundle:
        # local repo finds heads on server, finds out what
        # revs it must push. once revs transferred, if server
        # finds it has different heads (someone else won
        # commit/push race), server aborts.
        if pushop.force:
            remoteheads = ['force']
        else:
            remoteheads = pushop.remoteheads
        # ssh: return remote's addchangegroup()
        # http: return remote's addchangegroup() or 0 for error
        pushop.ret = pushop.remote.unbundle(cg, remoteheads,
                                            pushop.repo.url())
    else:
        # we return an integer indicating remote head count
        # change
        pushop.ret = pushop.remote.addchangegroup(cg, 'push', pushop.repo.url())
567 567
def _pushsyncphase(pushop):
    """synchronise phase information locally and remotely"""
    cheads = pushop.commonheads
    # even when we don't push, exchanging phase data is useful
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo', False)
        and remotephases    # server supports phases
        and pushop.ret is None # nothing was pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and remote support phase
        # - and no changeset was pushed
        # - and remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    if not remotephases: # old server or public only reply from non-publishing
        _localphasemove(pushop, cheads)
        # don't push any phase data as there is nothing to push
    else:
        ana = phases.analyzeremotephases(pushop.repo, cheads,
                                         remotephases)
        pheads, droots = ana
        ### Apply remote phase on local
        if remotephases.get('publishing', False):
            _localphasemove(pushop, cheads)
        else: # publish = False
            _localphasemove(pushop, pheads)
            _localphasemove(pushop, cheads, phases.draft)
        ### Apply local phase on remote

        if pushop.ret:
            if 'phases' in pushop.stepsdone:
                # phases already pushed through bundle2
                return
            outdated = pushop.outdatedphases
        else:
            outdated = pushop.fallbackoutdatedphases

        pushop.stepsdone.add('phases')

        # filter heads already turned public by the push
        outdated = [c for c in outdated if c.node() not in pheads]
        b2caps = bundle2.bundle2caps(pushop.remote)
        if 'b2x:pushkey' in b2caps:
            # server supports bundle2, let's do a batched push through it
            #
            # This will eventually be unified with the changesets bundle2 push
            bundler = bundle2.bundle20(pushop.ui, b2caps)
            capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo))
            bundler.newpart('b2x:replycaps', data=capsblob)
            part2node = []
            enc = pushkey.encode
            for newremotehead in outdated:
                part = bundler.newpart('b2x:pushkey')
                part.addparam('namespace', enc('phases'))
                part.addparam('key', enc(newremotehead.hex()))
                part.addparam('old', enc(str(phases.draft)))
                part.addparam('new', enc(str(phases.public)))
                part2node.append((part.id, newremotehead))
            stream = util.chunkbuffer(bundler.getchunks())
            try:
                reply = pushop.remote.unbundle(stream, ['force'], 'push')
                op = bundle2.processbundle(pushop.repo, reply)
            except error.BundleValueError, exc:
                raise util.Abort('missing support for %s' % exc)
            for partid, node in part2node:
                partrep = op.records.getreplies(partid)
                results = partrep['pushkey']
                assert len(results) <= 1
                msg = None
                if not results:
                    msg = _('server ignored update of %s to public!\n') % node
                elif not int(results[0]['return']):
                    msg = _('updating %s to public failed!\n') % node
                if msg is not None:
                    pushop.ui.warn(msg)

        else:
            # fallback to independent pushkey command
            for newremotehead in outdated:
                r = pushop.remote.pushkey('phases',
                                          newremotehead.hex(),
                                          str(phases.draft),
                                          str(phases.public))
                if not r:
                    pushop.ui.warn(_('updating %s to public failed!\n')
                                   % newremotehead)
659 659
def _localphasemove(pushop, nodes, phase=phases.public):
    """Advance <nodes> to <phase> in the local (source) repository."""
    if not pushop.locallocked:
        # repo is not locked, do not change any phases!
        # Inform the user that phases should have been moved when
        # applicable.
        wouldmove = [n for n in nodes if phase < pushop.repo[n].phase()]
        if wouldmove:
            pushop.ui.status(_('cannot lock source repo, skipping '
                               'local %s phase update\n')
                             % phases.phasenames[phase])
        return
    tr = pushop.repo.transaction('push-phase-sync')
    try:
        phases.advanceboundary(pushop.repo, tr, phase, nodes)
        tr.close()
    finally:
        tr.release()
678 678
def _pushobsolete(pushop):
    """Push obsolescence markers to the remote over the pushkey protocol."""
    if 'obsmarkers' in pushop.stepsdone:
        return
    pushop.ui.debug('try to push obsolete markers to remote\n')
    repo = pushop.repo
    remote = pushop.remote
    pushop.stepsdone.add('obsmarkers')
    if not pushop.outobsmarkers:
        return
    remotedata = obsolete._pushkeyescape(pushop.outobsmarkers)
    # reverse sort to ensure we end with dump0
    results = [remote.pushkey('obsolete', key, '', remotedata[key])
               for key in sorted(remotedata, reverse=True)]
    if not all(results):
        repo.ui.warn(_('failed to push some obsolete markers!\n'))
697 697
def _pushbookmark(pushop):
    """Update bookmark positions on the remote via pushkey."""
    # skip on push error (ret == 0) or when bundle2 already handled it
    if pushop.ret == 0 or 'bookmarks' in pushop.stepsdone:
        return
    pushop.stepsdone.add('bookmarks')
    ui = pushop.ui
    remote = pushop.remote
    for name, oldnode, newnode in pushop.outbookmarks:
        ok = remote.pushkey('bookmarks', name, oldnode, newnode)
        if ok:
            ui.status(_("updating bookmark %s\n") % name)
        else:
            ui.warn(_('updating bookmark %s failed!\n') % name)
710 710
class pulloperation(object):
    """An object that represent a single pull operation

    Its purpose is to carry pull-related state and very common operations.

    A new one should be created at the beginning of each pull and discarded
    afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False):
        # repo we pull into
        self.repo = repo
        # repo we pull from
        self.remote = remote
        # revision we try to pull (None is "all")
        self.heads = heads
        # do we force pull?
        self.force = force
        # the name of the pull transaction
        self._trname = 'pull\n' + util.hidepassword(remote.url())
        # hold the transaction once created
        self._tr = None
        # set of common changeset between local and remote before pull
        self.common = None
        # set of pulled head
        self.rheads = None
        # list of missing changeset to fetch remotely
        self.fetch = None
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # list of step remaining todo (related to future bundle2 usage)
        self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changeset target by the pull"""
        # compute target subset
        if self.heads is None:
            # We pulled every thing possible
            # sync on everything common
            c = set(self.common)
            ret = list(self.common)
            for n in self.rheads:
                if n not in c:
                    ret.append(n)
            return ret
        else:
            # We pulled a specific subset
            # sync on this subset
            return self.heads

    def gettransaction(self):
        """get appropriate pull transaction, creating it if needed"""
        if self._tr is None:
            self._tr = self.repo.transaction(self._trname)
        return self._tr

    def closetransaction(self):
        """close transaction if created"""
        if self._tr is not None:
            self._tr.close()

    def releasetransaction(self):
        """release transaction if created"""
        if self._tr is not None:
            self._tr.release()
777 777
def pull(repo, remote, heads=None, force=False):
    """Pull changesets (limited to *heads*) from *remote* into *repo*.

    Returns the changegroup result (see ``pulloperation.cgresult``).
    """
    pullop = pulloperation(repo, remote, heads, force)
    if pullop.remote.local():
        # local-to-local pull: refuse when the source has requirements
        # we do not support
        missing = set(pullop.remote.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    lock = pullop.repo.lock()
    try:
        _pulldiscovery(pullop)
        if (pullop.repo.ui.configbool('experimental', 'bundle2-exp', False)
            and pullop.remote.capable('bundle2-exp')):
            _pullbundle2(pullop)
        # bundle2 may have handled some steps already; only run the
        # legacy paths for what remains in todosteps
        if 'changegroup' in pullop.todosteps:
            _pullchangeset(pullop)
        if 'phases' in pullop.todosteps:
            _pullphase(pullop)
        if 'obsmarkers' in pullop.todosteps:
            _pullobsolete(pullop)
        pullop.closetransaction()
    finally:
        pullop.releasetransaction()
        lock.release()

    return pullop.cgresult
806 806
def _pulldiscovery(pullop):
    """Discovery phase for the pull.

    Currently handles changeset discovery only; it will grow to handle
    all discovery at some point.
    """
    common, fetch, rheads = discovery.findcommonincoming(
        pullop.repo.unfiltered(), pullop.remote,
        heads=pullop.heads, force=pullop.force)
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads
817 817
def _pullbundle2(pullop):
    """pull data using bundle2

    Fetches the changegroup (and, when supported, phases and obsolescence
    markers) in a single getbundle call and marks the corresponding
    ``todosteps`` as done."""
    remotecaps = bundle2.bundle2caps(pullop.remote)
    kwargs = {'bundlecaps': caps20to10(pullop.repo)}
    # pulling changegroup
    pullop.todosteps.remove('changegroup')

    kwargs['common'] = pullop.common
    kwargs['heads'] = pullop.heads or pullop.rheads
    kwargs['cg'] = pullop.fetch
    if 'b2x:listkeys' in remotecaps:
        kwargs['listkeys'] = ['phase']
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
    else:
        if pullop.heads is None and list(pullop.common) == [nullid]:
            pullop.repo.ui.status(_("requesting all changes\n"))
    if obsolete._enabled:
        remoteversions = bundle2.obsmarkersversion(remotecaps)
        if obsolete.commonversion(remoteversions) is not None:
            # remote understands a marker format we know: piggyback the
            # markers on this getbundle call
            kwargs['obsmarkers'] = True
            pullop.todosteps.remove('obsmarkers')
    # let extensions add their own arguments
    _pullbundle2extraprepare(pullop, kwargs)
    if kwargs.keys() == ['format']:
        return # nothing to pull
    bundle = pullop.remote.getbundle('pull', **kwargs)
    try:
        op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
    except error.BundleValueError, exc:
        raise util.Abort('missing support for %s' % exc)

    if pullop.fetch:
        assert len(op.records['changegroup']) == 1
        pullop.cgresult = op.records['changegroup'][0]['return']

    # processing phases change
    for namespace, value in op.records['listkeys']:
        if namespace == 'phases':
            _pullapplyphases(pullop, value)
860 860
861 861 def _pullbundle2extraprepare(pullop, kwargs):
862 862 """hook function so that extensions can extend the getbundle call"""
863 863 pass
864 864
def _pullchangeset(pullop):
    """pull changeset from unbundle into the local repo"""
    # We delay the open of the transaction as late as possible so we
    # don't open transaction for nothing or you break future useful
    # rollback call
    pullop.todosteps.remove('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    # pick the best protocol the remote supports, newest first
    if pullop.remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = pullop.remote.getbundle('pull', common=pullop.common,
                                     heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = pullop.remote.changegroup(pullop.fetch, 'pull')
    elif not pullop.remote.capable('changegroupsubset'):
        raise util.Abort(_("partial pull cannot be done because "
                           "other repository doesn't support "
                           "changegroupsubset."))
    else:
        cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
                                                 pullop.remote.url())
896 896
def _pullphase(pullop):
    """fetch phase information from the remote and apply it locally"""
    # Get remote phases data from remote
    remotephases = pullop.remote.listkeys('phases')
    _pullapplyphases(pullop, remotephases)
901 901
def _pullapplyphases(pullop, remotephases):
    """apply phase movement from observed remote state

    ``remotephases`` is the raw pushkey 'phases' namespace mapping as
    returned by the remote.
    """
    pullop.todosteps.remove('phases')
    publishing = bool(remotephases.get('publishing', False))
    if remotephases and not publishing:
        # remote is new and unpublishing
        pheads, _dr = phases.analyzeremotephases(pullop.repo,
                                                 pullop.pulledsubset,
                                                 remotephases)
        dheads = pullop.pulledsubset
    else:
        # Remote is old or publishing all common changesets
        # should be seen as public
        pheads = pullop.pulledsubset
        dheads = []
    unfi = pullop.repo.unfiltered()
    # bind hot lookups to locals before the list comprehensions below
    phase = unfi._phasecache.phase
    rev = unfi.changelog.nodemap.get
    public = phases.public
    draft = phases.draft

    # exclude changesets already public locally and update the others
    pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
    if pheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, public, pheads)

    # exclude changesets already draft locally and update the others
    dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
    if dheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, draft, dheads)
934 934
def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    The `gettransaction` function returns the pull transaction, creating
    one if necessary. We return the transaction to inform the calling code
    that a new transaction has been created (when applicable).

    Exists mostly to allow overriding for experimentation purposes."""
    pullop.todosteps.remove('obsmarkers')
    tr = None
    if obsolete._enabled:
        pullop.repo.ui.debug('fetching remote obsolete markers\n')
        remoteobs = pullop.remote.listkeys('obsolete')
        if 'dump0' in remoteobs:
            tr = pullop.gettransaction()
            # iterate newest dump keys first; only 'dump*' keys carry markers
            for key in sorted(remoteobs, reverse=True):
                if key.startswith('dump'):
                    data = base85.b85decode(remoteobs[key])
                    pullop.repo.obsstore.mergemarkers(tr, data)
            pullop.repo.invalidatevolatilesets()
    return tr
956 956
def caps20to10(repo):
    """return a set with appropriate options to use bundle20 during getbundle"""
    capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
    return set(['HG2X', 'bundle2=' + urllib.quote(capsblob)])
963 963
# Ordered list of step names performed when building a bundle2 for getbundle;
# registration order matters.
getbundle2partsorder = []

# Maps a step name to the function generating the corresponding bundle2 part.
#
# Kept public so extensions can wrap individual steps when necessary.
getbundle2partsmapping = {}

def getbundle2partsgenerator(stepname):
    """decorator for function generating bundle2 part for getbundle

    Registers the decorated function under ``stepname`` in the step ->
    function mapping and appends the step to the ordered list. Beware that
    decorated functions are added in registration order (this may matter).

    Only use this decorator for new steps; to wrap a step from an extension,
    touch the getbundle2partsmapping dictionary directly."""
    def register(func):
        # a step name may only be registered once
        assert stepname not in getbundle2partsmapping
        getbundle2partsmapping[stepname] = func
        getbundle2partsorder.append(stepname)
        return func
    return register
987
def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
              **kwargs):
    """return a full bundle (with potentially multiple kind of parts)

    Could be a bundle HG10 or a bundle HG2X depending on bundlecaps
    passed. For now, the bundle can contain only changegroup, but this will
    change when more part types become available for bundle2.

    This is different from changegroup.getchangegroup that only returns an HG10
    changegroup bundle. They may eventually get reunited in the future when we
    have a clearer idea of the API we want to query different data.

    The implementation is at a very early stage and will get massive rework
    when the API of bundle is refined.
    """
    # bundle10 case
    if bundlecaps is None or 'HG2X' not in bundlecaps:
        # bundle10 can only carry a changegroup, so any getbundle request
        # that disables it or passes extra arguments is unsupported here
        if bundlecaps and not kwargs.get('cg', True):
            raise ValueError(_('request for bundle10 must include changegroup'))

        if kwargs:
            raise ValueError(_('unsupported getbundle arguments: %s')
                             % ', '.join(sorted(kwargs.keys())))
        return changegroup.getchangegroup(repo, source, heads=heads,
                                          common=common, bundlecaps=bundlecaps)

    # bundle20 case
    # decode the client's advertised bundle2 capabilities from bundlecaps
    b2caps = {}
    for bcaps in bundlecaps:
        if bcaps.startswith('bundle2='):
            blob = urllib.unquote(bcaps[len('bundle2='):])
            b2caps.update(bundle2.decodecaps(blob))
    bundler = bundle2.bundle20(repo.ui, b2caps)

    # run every registered part generator, in registration order
    for name in getbundle2partsorder:
        func = getbundle2partsmapping[name]
        func(bundler, repo, source, heads=heads, common=common,
             bundlecaps=bundlecaps, b2caps=b2caps, **kwargs)

    return util.chunkbuffer(bundler.getchunks())
1028
@getbundle2partsgenerator('changegroup')
def _getbundlechangegrouppart(bundler, repo, source, heads=None, common=None,
                              bundlecaps=None, b2caps=None, **kwargs):
    """add a changegroup part to the requested bundle"""
    if not kwargs.get('cg', True):
        # caller explicitly opted out of a changegroup
        return
    # build changegroup bundle here.
    cg = changegroup.getchangegroup(repo, source, heads=heads,
                                    common=common, bundlecaps=bundlecaps)
    if cg:
        bundler.newpart('b2x:changegroup', data=cg.getchunks())
1041
@getbundle2partsgenerator('listkeys')
def _getbundlelistkeysparts(bundler, repo, source, heads=None, common=None,
                            bundlecaps=None, b2caps=None, **kwargs):
    """add parts containing listkeys namespaces to the requested bundle"""
    # emit one b2x:listkeys part per namespace the caller asked for
    listkeys = kwargs.get('listkeys', ())
    for namespace in listkeys:
        part = bundler.newpart('b2x:listkeys')
        part.addparam('namespace', namespace)
        keys = repo.listkeys(namespace).items()
        part.data = pushkey.encodekeys(keys)
@getbundle2partsgenerator('obsmarkers')
def _getbundleobsmarkerpart(bundler, repo, source, heads=None, common=None,
                            bundlecaps=None, b2caps=None, **kwargs):
    """add an obsolescence markers part to the requested bundle"""
    if kwargs.get('obsmarkers', False):
        if heads is None:
            heads = repo.heads()
        # only markers relevant to the ancestors of the requested heads
        subset = [c.node() for c in repo.set('::%ln', heads)]
        markers = repo.obsstore.relevantmarkers(subset)
        buildobsmarkerspart(bundler, markers)
@getbundle2partsgenerator('extra')
def _getbundleextrapart(bundler, repo, source, heads=None, common=None,
                        bundlecaps=None, b2caps=None, **kwargs):
    """hook function to let extensions add parts to the requested bundle

    The default implementation does nothing; extensions wrap this step to
    append extra parts to ``bundler``."""
1027 1069
def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling.
    """
    current_heads = repo.heads()
    heads_hash = util.sha1(''.join(sorted(current_heads))).digest()
    if their_heads == ['force']:
        return
    if their_heads == current_heads:
        return
    if their_heads == ['hashed', heads_hash]:
        return
    # someone else committed/pushed/unbundled while we
    # were transferring data
    raise error.PushRaced('repository changed while %s - '
                          'please try again' % context)
1041 1083
def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    This function makes sure the repo is locked during the application and
    has a mechanism to check that no push race occurred between the creation
    of the bundle and its application.

    If the push was raced, a PushRaced exception is raised."""
    r = 0
    # need a transaction when processing a bundle2 stream
    tr = None
    lock = repo.lock()
    try:
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        if util.safehasattr(cg, 'params'):
            # a 'params' attribute identifies a bundle2 unbundler
            try:
                tr = repo.transaction('unbundle')
                tr.hookargs['bundle2-exp'] = '1'
                r = bundle2.processbundle(repo, cg, lambda: tr).reply
                cl = repo.unfiltered().changelog
                # expose pending changes to the pretransactionclose hook
                p = cl.writepending() and repo.root or ""
                repo.hook('b2x-pretransactionclose', throw=True, source=source,
                          url=url, pending=p, **tr.hookargs)
                tr.close()
                repo.hook('b2x-transactionclose', source=source, url=url,
                          **tr.hookargs)
            except Exception, exc:
                # tag the failure so callers know it happened during
                # bundle2 processing
                exc.duringunbundle2 = True
                raise
        else:
            r = changegroup.addchangegroup(repo, cg, source, url)
    finally:
        # release is a no-op if the transaction was closed successfully
        if tr is not None:
            tr.release()
        lock.release()
    return r
General Comments 0
You need to be logged in to leave comments. Login now