##// END OF EJS Templates
push: use bundle2 to push phases when available...
Pierre-Yves David -
r21662:09f19e09 default
parent child Browse files
Show More
@@ -1,754 +1,792 b''
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib
11 11 import util, scmutil, changegroup, base85, error
12 12 import discovery, phases, obsolete, bookmarks, bundle2, pushkey
13 13
def readbundle(ui, fh, fname, vfs=None):
    """Sniff the 4-byte header on *fh* and return a matching unbundler.

    When no *fname* is given the stream may be a headerless changegroup
    (first byte '\\0'), which is wrapped up as an uncompressed HG10
    bundle.  When *vfs* is provided, *fname* is resolved through it so
    error messages show the full path.
    """
    header = changegroup.readexactly(fh, 4)

    compression = None
    if fname:
        if vfs:
            fname = vfs.join(fname)
    else:
        fname = "stream"
        if header.startswith('\0') and not header.startswith('HG'):
            # headerless stream: treat as an uncompressed HG10 bundle
            fh = changegroup.headerlessfixup(fh, header)
            header = "HG10"
            compression = 'UN'

    magic = header[0:2]
    version = header[2:4]

    if magic != 'HG':
        raise util.Abort(_('%s: not a Mercurial bundle') % fname)
    if version == '10':
        if compression is None:
            compression = changegroup.readexactly(fh, 2)
        return changegroup.unbundle10(fh, compression)
    if version == '2X':
        return bundle2.unbundle20(ui, fh, header=magic + version)
    raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
39 39
40 40
class pushoperation(object):
    """A object that represent a single push operation

    Its purpose is to carry push related state and very common operations.

    A new instance should be created at the beginning of each push and
    discarded afterward.
    """

    def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
        # repo we push from
        self.repo = repo
        self.ui = repo.ui
        # repo we push to
        self.remote = remote
        # force option provided
        self.force = force
        # revs to be pushed (None is "all")
        self.revs = revs
        # allow push of new branch
        self.newbranch = newbranch
        # did a local lock get acquired?
        self.locallocked = None
        # Integer version of the push result
        # - None means nothing to push
        # - 0 means HTTP error
        # - 1 means we pushed and remote head count is unchanged *or*
        #   we have outgoing changesets but refused to push
        # - other values as described by addchangegroup()
        self.ret = None
        # discover.outgoing object (contains common and outgoing data)
        self.outgoing = None
        # all remote heads before the push
        self.remoteheads = None
        # testable as a boolean indicating if any nodes are missing locally.
        self.incoming = None
        # set of all heads common after changeset bundle push
        self.commonheads = None
79 79
def push(repo, remote, force=False, revs=None, newbranch=False):
    '''Push outgoing changesets (limited by revs) from a local
    repository to remote. Return an integer:
      - None means nothing to push
      - 0 means HTTP error
      - 1 means we pushed and remote head count is unchanged *or*
        we have outgoing changesets but refused to push
      - other values as described by addchangegroup()
    '''
    pushop = pushoperation(repo, remote, force, revs, newbranch)
    # refuse to push to a local peer that lacks requirements we depend on
    if pushop.remote.local():
        missing = (set(pushop.repo.requirements)
                   - pushop.remote.local().supported)
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    # there are two ways to push to remote repo:
    #
    # addchangegroup assumes local user can lock remote
    # repo (local filesystem, old ssh servers).
    #
    # unbundle assumes local user cannot lock remote repo (new ssh
    # servers, http servers).

    if not pushop.remote.canpush():
        raise util.Abort(_("destination does not support push"))
    # get local lock as we might write phase data
    locallock = None
    try:
        locallock = pushop.repo.lock()
        pushop.locallocked = True
    except IOError, err:
        pushop.locallocked = False
        if err.errno != errno.EACCES:
            raise
        # source repo cannot be locked.
        # We do not abort the push, but just disable the local phase
        # synchronisation.
        msg = 'cannot lock source repository: %s\n' % err
        pushop.ui.debug(msg)
    try:
        pushop.repo.checkpush(pushop)
        lock = None
        unbundle = pushop.remote.capable('unbundle')
        if not unbundle:
            # old-style push: we must lock the remote ourselves
            lock = pushop.remote.lock()
        try:
            _pushdiscovery(pushop)
            if _pushcheckoutgoing(pushop):
                pushop.repo.prepushoutgoinghooks(pushop.repo,
                                                 pushop.remote,
                                                 pushop.outgoing)
                # prefer the experimental bundle2 path when both sides opt in
                if (pushop.repo.ui.configbool('experimental', 'bundle2-exp',
                                              False)
                    and pushop.remote.capable('bundle2-exp')):
                    _pushbundle2(pushop)
                else:
                    _pushchangeset(pushop)
            # phase/obsmarker exchange is useful even when nothing was pushed
            _pushcomputecommonheads(pushop)
            _pushsyncphase(pushop)
            _pushobsolete(pushop)
        finally:
            if lock is not None:
                lock.release()
    finally:
        if locallock is not None:
            locallock.release()

    _pushbookmark(pushop)
    return pushop.ret
153 153
def _pushdiscovery(pushop):
    """run the discovery phase and record the results on *pushop*

    Fills in ``outgoing``, ``remoteheads`` and ``incoming``.
    """
    unfi = pushop.repo.unfiltered()
    commoninc = discovery.findcommonincoming(unfi, pushop.remote,
                                             force=pushop.force)
    common, inc, remoteheads = commoninc
    pushop.outgoing = discovery.findcommonoutgoing(unfi, pushop.remote,
                                                   onlyheads=pushop.revs,
                                                   commoninc=commoninc,
                                                   force=pushop.force)
    pushop.remoteheads = remoteheads
    pushop.incoming = inc
166 166
def _pushcheckoutgoing(pushop):
    """validate the outgoing changesets before pushing

    Returns False when there is nothing to push.  Unless --force was
    given, aborts when the push would propagate obsolete/troubled
    changesets or create unexpected new remote heads.
    """
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    if not outgoing.missing:
        # nothing to push
        scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
        return False
    # something to push
    if not pushop.force:
        # if repo.obsstore == False --> no obsolete
        # then, save the iteration
        if unfi.obsstore:
            # this message are here for 80 char limit reason
            mso = _("push includes obsolete changeset: %s!")
            mst = "push includes %s changeset: %s!"
            # plain versions for i18n tool to detect them
            _("push includes unstable changeset: %s!")
            _("push includes bumped changeset: %s!")
            _("push includes divergent changeset: %s!")
            # If we are to push if there is at least one
            # obsolete or unstable changeset in missing, at
            # least one of the missinghead will be obsolete or
            # unstable. So checking heads only is ok
            for node in outgoing.missingheads:
                ctx = unfi[node]
                if ctx.obsolete():
                    raise util.Abort(mso % ctx)
                elif ctx.troubled():
                    raise util.Abort(_(mst)
                                     % (ctx.troubles()[0],
                                        ctx))
        newbm = pushop.ui.configlist('bookmarks', 'pushing')
        discovery.checkheads(unfi, pushop.remote, outgoing,
                             pushop.remoteheads,
                             pushop.newbranch,
                             bool(pushop.incoming),
                             newbm)
    return True
205 205
def _pushbundle2(pushop):
    """push data to the remote using bundle2

    The only currently supported type of data is changegroup but this will
    evolve in the future."""
    bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
    # create reply capability
    capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
    bundler.newpart('b2x:replycaps', data=capsblob)
    # Send known heads to the server for race detection.
    if not pushop.force:
        bundler.newpart('B2X:CHECK:HEADS', data=iter(pushop.remoteheads))
    # let extensions add their own parts (before the changegroup part)
    extrainfo = _pushbundle2extraparts(pushop, bundler)
    # add the changegroup bundle
    cg = changegroup.getlocalbundle(pushop.repo, 'push', pushop.outgoing)
    cgpart = bundler.newpart('B2X:CHANGEGROUP', data=cg.getchunks())
    stream = util.chunkbuffer(bundler.getchunks())
    try:
        reply = pushop.remote.unbundle(stream, ['force'], 'push')
    except error.BundleValueError, exc:
        # remote advertised bundle2 but rejected a part we sent
        raise util.Abort('missing support for %s' % exc)
    try:
        # process the server's reply bundle locally
        op = bundle2.processbundle(pushop.repo, reply)
    except error.BundleValueError, exc:
        raise util.Abort('missing support for %s' % exc)
    # exactly one changegroup reply is expected for the part we sent
    cgreplies = op.records.getreplies(cgpart.id)
    assert len(cgreplies['changegroup']) == 1
    pushop.ret = cgreplies['changegroup'][0]['return']
    _pushbundle2extrareply(pushop, op, extrainfo)
235 235
236 236 def _pushbundle2extraparts(pushop, bundler):
237 237 """hook function to let extensions add parts
238 238
239 239 Return a dict to let extensions pass data to the reply processing.
240 240 """
241 241 return {}
242 242
243 243 def _pushbundle2extrareply(pushop, op, extrainfo):
244 244 """hook function to let extensions react to part replies
245 245
246 246 The dict from _pushbundle2extrareply is fed to this function.
247 247 """
248 248 pass
249 249
def _pushchangeset(pushop):
    """Make the actual push of changeset bundle to remote repo

    Stores the integer result in ``pushop.ret``.
    """
    outgoing = pushop.outgoing
    unbundle = pushop.remote.capable('unbundle')
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # create a changegroup from local
    if pushop.revs is None and not (outgoing.excluded
                                    or pushop.repo.changelog.filteredrevs):
        # push everything,
        # use the fast path, no race possible on push
        bundler = changegroup.bundle10(pushop.repo, bundlecaps)
        cg = changegroup.getsubset(pushop.repo,
                                   outgoing,
                                   bundler,
                                   'push',
                                   fastpath=True)
    else:
        cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
                                        bundlecaps)

    # apply changegroup to remote
    if unbundle:
        # local repo finds heads on server, finds out what
        # revs it must push. once revs transferred, if server
        # finds it has different heads (someone else won
        # commit/push race), server aborts.
        if pushop.force:
            remoteheads = ['force']
        else:
            remoteheads = pushop.remoteheads
        # ssh: return remote's addchangegroup()
        # http: return remote's addchangegroup() or 0 for error
        pushop.ret = pushop.remote.unbundle(cg, remoteheads,
                                            'push')
    else:
        # we return an integer indicating remote head count
        # change
        pushop.ret = pushop.remote.addchangegroup(cg, 'push', pushop.repo.url())
289 289
def _pushcomputecommonheads(pushop):
    """compute the set of heads common to both sides after the push

    The result is stored in ``pushop.commonheads`` and later used for
    phase synchronisation.
    """
    unfi = pushop.repo.unfiltered()
    if pushop.ret:
        # push succeed, synchronize target of the push
        cheads = pushop.outgoing.missingheads
    elif pushop.revs is None:
        # All out push fails. synchronize all common
        cheads = pushop.outgoing.commonheads
    else:
        # I want cheads = heads(::missingheads and ::commonheads)
        # (missingheads is revs with secret changeset filtered out)
        #
        # This can be expressed as:
        #     cheads = ( (missingheads and ::commonheads)
        #              + (commonheads and ::missingheads))"
        #              )
        #
        # while trying to push we already computed the following:
        #     common = (::commonheads)
        #     missing = ((commonheads::missingheads) - commonheads)
        #
        # We can pick:
        # * missingheads part of common (::commonheads)
        common = set(pushop.outgoing.common)
        nm = pushop.repo.changelog.nodemap
        cheads = [node for node in pushop.revs if nm[node] in common]
        # and
        # * commonheads parents on missing
        revset = unfi.set('%ln and parents(roots(%ln))',
                          pushop.outgoing.commonheads,
                          pushop.outgoing.missing)
        cheads.extend(c.node() for c in revset)
    pushop.commonheads = cheads
323 323
324 324 def _pushsyncphase(pushop):
325 325 """synchronise phase information locally and remotely"""
326 326 unfi = pushop.repo.unfiltered()
327 327 cheads = pushop.commonheads
328 328 # even when we don't push, exchanging phase data is useful
329 329 remotephases = pushop.remote.listkeys('phases')
330 330 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
331 331 and remotephases # server supports phases
332 332 and pushop.ret is None # nothing was pushed
333 333 and remotephases.get('publishing', False)):
334 334 # When:
335 335 # - this is a subrepo push
336 336 # - and remote support phase
337 337 # - and no changeset was pushed
338 338 # - and remote is publishing
339 339 # We may be in issue 3871 case!
340 340 # We drop the possible phase synchronisation done by
341 341 # courtesy to publish changesets possibly locally draft
342 342 # on the remote.
343 343 remotephases = {'publishing': 'True'}
344 344 if not remotephases: # old server or public only reply from non-publishing
345 345 _localphasemove(pushop, cheads)
346 346 # don't push any phase data as there is nothing to push
347 347 else:
348 348 ana = phases.analyzeremotephases(pushop.repo, cheads,
349 349 remotephases)
350 350 pheads, droots = ana
351 351 ### Apply remote phase on local
352 352 if remotephases.get('publishing', False):
353 353 _localphasemove(pushop, cheads)
354 354 else: # publish = False
355 355 _localphasemove(pushop, pheads)
356 356 _localphasemove(pushop, cheads, phases.draft)
357 357 ### Apply local phase on remote
358 358
359 359 # Get the list of all revs draft on remote by public here.
360 360 # XXX Beware that revset break if droots is not strictly
361 361 # XXX root we may want to ensure it is but it is costly
362 outdated = unfi.set('heads((%ln::%ln) and public())',
363 droots, cheads)
364 for newremotehead in outdated:
365 r = pushop.remote.pushkey('phases',
366 newremotehead.hex(),
367 str(phases.draft),
368 str(phases.public))
369 if not r:
370 pushop.ui.warn(_('updating %s to public failed!\n')
371 % newremotehead)
362 outdated = unfi.set('heads((%ln::%ln) and public())',
363 droots, cheads)
364
365 b2caps = bundle2.bundle2caps(pushop.remote)
366 if 'b2x:pushkey' in b2caps:
367 # server supports bundle2, let's do a batched push through it
368 #
369 # This will eventually be unified with the changesets bundle2 push
370 bundler = bundle2.bundle20(pushop.ui, b2caps)
371 capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
372 bundler.newpart('b2x:replycaps', data=capsblob)
373 part2node = []
374 enc = pushkey.encode
375 for newremotehead in outdated:
376 part = bundler.newpart('b2x:pushkey')
377 part.addparam('namespace', enc('phases'))
378 part.addparam('key', enc(newremotehead.hex()))
379 part.addparam('old', enc(str(phases.draft)))
380 part.addparam('new', enc(str(phases.public)))
381 part2node.append((part.id, newremotehead))
382 stream = util.chunkbuffer(bundler.getchunks())
383 try:
384 reply = pushop.remote.unbundle(stream, ['force'], 'push')
385 op = bundle2.processbundle(pushop.repo, reply)
386 except error.BundleValueError, exc:
387 raise util.Abort('missing support for %s' % exc)
388 for partid, node in part2node:
389 partrep = op.records.getreplies(partid)
390 results = partrep['pushkey']
391 assert len(results) <= 1
392 msg = None
393 if not results:
394 msg = _('server ignored update of %s to public!\n') % node
395 elif not int(results[0]['return']):
396 msg = _('updating %s to public failed!\n') % node
397 if msg is not None:
398 pushop.ui.warn(msg)
399
400 else:
401 # fallback to independant pushkey command
402 for newremotehead in outdated:
403 r = pushop.remote.pushkey('phases',
404 newremotehead.hex(),
405 str(phases.draft),
406 str(phases.public))
407 if not r:
408 pushop.ui.warn(_('updating %s to public failed!\n')
409 % newremotehead)
372 410
def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo"""
    if pushop.locallocked:
        phases.advanceboundary(pushop.repo, phase, nodes)
        return
    # The source repo could not be locked, so no phases are changed.
    # Inform the user whenever the move would actually have done something.
    phasestr = phases.phasenames[phase]
    if any(phase < pushop.repo[n].phase() for n in nodes):
        pushop.ui.status(_('cannot lock source repo, skipping '
                           'local %s phase update\n') % phasestr)
386 424
def _pushobsolete(pushop):
    """utility function to push obsolete markers to a remote"""
    pushop.ui.debug('try to push obsolete markers to remote\n')
    repo = pushop.repo
    remote = pushop.remote
    if not (obsolete._enabled and repo.obsstore
            and 'obsolete' in remote.listkeys('namespaces')):
        return
    results = []
    markers = repo.listkeys('obsolete')
    # reverse sort to ensure we end with dump0
    for key in sorted(markers, reverse=True):
        results.append(remote.pushkey('obsolete', key, '', markers[key]))
    if not all(results):
        repo.ui.warn(_('failed to push some obsolete markers!\n'))
403 441
def _pushbookmark(pushop):
    """Update bookmark position on remote"""
    ui = pushop.ui
    repo = pushop.repo.unfiltered()
    remote = pushop.remote
    ui.debug("checking for updated bookmarks\n")
    revnums = map(repo.changelog.rev, pushop.revs or [])
    ancestors = [a for a in repo.changelog.ancestors(revnums, inclusive=True)]
    comp = bookmarks.compare(repo, repo._bookmarks,
                             remote.listkeys('bookmarks'), srchex=hex)
    # only bookmarks that advanced on our side are pushed
    advsrc = comp[2]
    for name, localnode, remotenode in advsrc:
        if ancestors and repo[localnode].rev() not in ancestors:
            # the bookmark points outside of the pushed subset; skip it
            continue
        if remote.pushkey('bookmarks', name, remotenode, localnode):
            ui.status(_("updating bookmark %s\n") % name)
        else:
            ui.warn(_('updating bookmark %s failed!\n') % name)
423 461
class pulloperation(object):
    """A object that represent a single pull operation

    Its purpose is to carry pull related state and very common operations.

    A new instance should be created at the beginning of each pull and
    discarded afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False):
        # repo we pull into
        self.repo = repo
        # repo we pull from
        self.remote = remote
        # revision we try to pull (None is "all")
        self.heads = heads
        # do we force pull?
        self.force = force
        # the name the pull transaction
        self._trname = 'pull\n' + util.hidepassword(remote.url())
        # hold the transaction once created
        self._tr = None
        # set of common changeset between local and remote before pull
        self.common = None
        # set of pulled head
        self.rheads = None
        # list of missing changeset to fetch remotely
        self.fetch = None
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # list of step remaining todo (related to future bundle2 usage)
        self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changeset target by the pull"""
        # compute target subset
        if self.heads is None:
            # We pulled every thing possible
            # sync on everything common
            c = set(self.common)
            ret = list(self.common)
            for n in self.rheads:
                if n not in c:
                    ret.append(n)
            return ret
        else:
            # We pulled a specific subset
            # sync on this subset
            return self.heads

    def gettransaction(self):
        """get appropriate pull transaction, creating it if needed"""
        if self._tr is None:
            self._tr = self.repo.transaction(self._trname)
        return self._tr

    def closetransaction(self):
        """close transaction if created"""
        if self._tr is not None:
            self._tr.close()

    def releasetransaction(self):
        """release transaction if created"""
        if self._tr is not None:
            self._tr.release()
490 528
def pull(repo, remote, heads=None, force=False):
    """pull changesets (limited by *heads*) from *remote* into *repo*

    Returns the changegroup result code (``pullop.cgresult``).
    """
    pullop = pulloperation(repo, remote, heads, force)
    # refuse to pull from a local peer whose requirements we do not support
    if pullop.remote.local():
        missing = set(pullop.remote.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    lock = pullop.repo.lock()
    try:
        _pulldiscovery(pullop)
        # bundle2 may handle several steps at once; the remaining ones are
        # tracked in pullop.todosteps and handled individually below
        if (pullop.repo.ui.configbool('experimental', 'bundle2-exp', False)
            and pullop.remote.capable('bundle2-exp')):
            _pullbundle2(pullop)
        if 'changegroup' in pullop.todosteps:
            _pullchangeset(pullop)
        if 'phases' in pullop.todosteps:
            _pullphase(pullop)
        if 'obsmarkers' in pullop.todosteps:
            _pullobsolete(pullop)
        pullop.closetransaction()
    finally:
        pullop.releasetransaction()
        lock.release()

    return pullop.cgresult
519 557
def _pulldiscovery(pullop):
    """discovery phase for the pull

    Currently handles changeset discovery only; it will eventually handle
    all discovery."""
    common, fetch, rheads = discovery.findcommonincoming(
        pullop.repo.unfiltered(), pullop.remote,
        heads=pullop.heads, force=pullop.force)
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads
530 568
531 569 def _pullbundle2(pullop):
532 570 """pull data using bundle2
533 571
534 572 For now, the only supported data are changegroup."""
535 573 remotecaps = bundle2.bundle2caps(pullop.remote)
536 574 kwargs = {'bundlecaps': caps20to10(pullop.repo)}
537 575 # pulling changegroup
538 576 pullop.todosteps.remove('changegroup')
539 577
540 578 kwargs['common'] = pullop.common
541 579 kwargs['heads'] = pullop.heads or pullop.rheads
542 580 if 'b2x:listkeys' in remotecaps:
543 581 kwargs['listkeys'] = ['phase']
544 582 if not pullop.fetch:
545 583 pullop.repo.ui.status(_("no changes found\n"))
546 584 pullop.cgresult = 0
547 585 else:
548 586 if pullop.heads is None and list(pullop.common) == [nullid]:
549 587 pullop.repo.ui.status(_("requesting all changes\n"))
550 588 _pullbundle2extraprepare(pullop, kwargs)
551 589 if kwargs.keys() == ['format']:
552 590 return # nothing to pull
553 591 bundle = pullop.remote.getbundle('pull', **kwargs)
554 592 try:
555 593 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
556 594 except error.BundleValueError, exc:
557 595 raise util.Abort('missing support for %s' % exc)
558 596
559 597 if pullop.fetch:
560 598 assert len(op.records['changegroup']) == 1
561 599 pullop.cgresult = op.records['changegroup'][0]['return']
562 600
563 601 # processing phases change
564 602 for namespace, value in op.records['listkeys']:
565 603 if namespace == 'phases':
566 604 _pullapplyphases(pullop, value)
567 605
568 606 def _pullbundle2extraprepare(pullop, kwargs):
569 607 """hook function so that extensions can extend the getbundle call"""
570 608 pass
571 609
def _pullchangeset(pullop):
    """pull changeset from unbundle into the local repo

    Picks the best protocol the remote offers (getbundle, changegroup or
    changegroupsubset) and stores the result code in ``pullop.cgresult``.
    """
    # We delay the open of the transaction as late as possible so we
    # don't open transaction for nothing or you break future useful
    # rollback call
    pullop.todosteps.remove('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    if pullop.remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = pullop.remote.getbundle('pull', common=pullop.common,
                                     heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = pullop.remote.changegroup(pullop.fetch, 'pull')
    elif not pullop.remote.capable('changegroupsubset'):
        raise util.Abort(_("partial pull cannot be done because "
                           "other repository doesn't support "
                           "changegroupsubset."))
    else:
        cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
                                                 pullop.remote.url())
603 641
def _pullphase(pullop):
    """fetch phase data via the plain listkeys protocol and apply it"""
    _pullapplyphases(pullop, pullop.remote.listkeys('phases'))
608 646
def _pullapplyphases(pullop, remotephases):
    """apply phase movement from observed remote state"""
    pullop.todosteps.remove('phases')
    subset = pullop.pulledsubset
    if not remotephases or remotephases.get('publishing', False):
        # old server, or a publishing one: all common changesets
        # should be seen as public
        phases.advanceboundary(pullop.repo, phases.public, subset)
    else:
        # modern non-publishing server: mirror its public heads locally;
        # the rest of the pulled subset stays (at most) draft
        pheads, _droots = phases.analyzeremotephases(pullop.repo, subset,
                                                     remotephases)
        phases.advanceboundary(pullop.repo, phases.public, pheads)
        phases.advanceboundary(pullop.repo, phases.draft, subset)
626 664
def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    Returns the pull transaction when markers were merged (informing the
    caller that a transaction was created), otherwise None.

    Exists mostly to allow overriding for experimentation purpose"""
    pullop.todosteps.remove('obsmarkers')
    if not obsolete._enabled:
        return None
    pullop.repo.ui.debug('fetching remote obsolete markers\n')
    remoteobs = pullop.remote.listkeys('obsolete')
    if 'dump0' not in remoteobs:
        return None
    tr = pullop.gettransaction()
    # reverse sort so that dump0 is merged last
    for key in sorted(remoteobs, reverse=True):
        if key.startswith('dump'):
            blob = base85.b85decode(remoteobs[key])
            pullop.repo.obsstore.mergemarkers(tr, blob)
    pullop.repo.invalidatevolatilesets()
    return tr
648 686
def caps20to10(repo):
    """return a set with appropriate options to use bundle20 during getbundle"""
    capsblob = bundle2.encodecaps(repo.bundle2caps)
    return set(['HG2X', 'bundle2=' + urllib.quote(capsblob)])
655 693
def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
              **kwargs):
    """return a full bundle (with potentially multiple kind of parts)

    Could be a bundle HG10 or a bundle HG2X depending on bundlecaps
    passed. For now, the bundle can contain only changegroup, but this will
    changes when more part type will be available for bundle2.

    This is different from changegroup.getbundle that only returns an HG10
    changegroup bundle. They may eventually get reunited in the future when we
    have a clearer idea of the API we what to query different data.

    The implementation is at a very early stage and will get massive rework
    when the API of bundle is refined.
    """
    # build changegroup bundle here.
    cg = changegroup.getbundle(repo, source, heads=heads,
                               common=common, bundlecaps=bundlecaps)
    if bundlecaps is None or 'HG2X' not in bundlecaps:
        # plain HG10 requested: extra bundle2-only arguments are an error
        if kwargs:
            raise ValueError(_('unsupported getbundle arguments: %s')
                             % ', '.join(sorted(kwargs.keys())))
        return cg
    # very crude first implementation,
    # the bundle API will change and the generation will be done lazily.
    b2caps = {}
    for bcaps in bundlecaps:
        if bcaps.startswith('bundle2='):
            # decode the client's advertised bundle2 capabilities
            blob = urllib.unquote(bcaps[len('bundle2='):])
            b2caps.update(bundle2.decodecaps(blob))
    bundler = bundle2.bundle20(repo.ui, b2caps)
    if cg:
        bundler.newpart('b2x:changegroup', data=cg.getchunks())
    # one listkeys part per requested pushkey namespace
    listkeys = kwargs.get('listkeys', ())
    for namespace in listkeys:
        part = bundler.newpart('b2x:listkeys')
        part.addparam('namespace', namespace)
        keys = repo.listkeys(namespace).items()
        part.data = pushkey.encodekeys(keys)
    _getbundleextrapart(bundler, repo, source, heads=heads, common=common,
                        bundlecaps=bundlecaps, **kwargs)
    return util.chunkbuffer(bundler.getchunks())
698 736
699 737 def _getbundleextrapart(bundler, repo, source, heads=None, common=None,
700 738 bundlecaps=None, **kwargs):
701 739 """hook function to let extensions add parts to the requested bundle"""
702 740 pass
703 741
def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling.
    """
    current = repo.heads()
    current_hash = util.sha1(''.join(sorted(current))).digest()
    if their_heads in (['force'], current, ['hashed', current_hash]):
        return
    # someone else committed/pushed/unbundled while we
    # were transferring data
    raise error.PushRaced('repository changed while %s - '
                          'please try again' % context)
717 755
def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    This function makes sure the repo is locked during the application and
    has a mechanism to check that no push race occurred between the creation
    of the bundle and its application.

    If the push was raced, a PushRaced exception is raised."""
    r = 0
    # need a transaction when processing a bundle2 stream
    tr = None
    lock = repo.lock()
    try:
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        if util.safehasattr(cg, 'params'):
            # a 'params' attribute marks a bundle2 stream
            try:
                tr = repo.transaction('unbundle')
                tr.hookargs['bundle2-exp'] = '1'
                r = bundle2.processbundle(repo, cg, lambda: tr).reply
                cl = repo.unfiltered().changelog
                p = cl.writepending() and repo.root or ""
                repo.hook('b2x-pretransactionclose', throw=True, source=source,
                          url=url, pending=p, **tr.hookargs)
                tr.close()
                repo.hook('b2x-transactionclose', source=source, url=url,
                          **tr.hookargs)
            except Exception, exc:
                # tag the failure so callers can tell it happened during
                # bundle2 processing
                exc.duringunbundle2 = True
                raise
        else:
            # plain changegroup stream
            r = changegroup.addchangegroup(repo, cg, source, url)
    finally:
        if tr is not None:
            tr.release()
        lock.release()
    return r
General Comments 0
You need to be logged in to leave comments. Login now