##// END OF EJS Templates
bundle2-push: introduce a list of part generating functions...
Pierre-Yves David -
r21904:5fbccbcc default
parent child Browse files
Show More
@@ -1,821 +1,827
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib
11 11 import util, scmutil, changegroup, base85, error
12 12 import discovery, phases, obsolete, bookmarks, bundle2, pushkey
13 13
def readbundle(ui, fh, fname, vfs=None):
    """Sniff the 4-byte bundle header on *fh* and return an unbundler.

    *fname* is used only for error messages; when empty it defaults to
    "stream". A headerless stream (starts with '\\0') is assumed to be an
    uncompressed HG10 bundle.
    """
    hdr = changegroup.readexactly(fh, 4)

    compression = None
    if fname:
        if vfs:
            fname = vfs.join(fname)
    else:
        fname = "stream"
        if hdr.startswith('\0') and not hdr.startswith('HG'):
            # headerless bundle: re-inject the consumed bytes and treat the
            # payload as uncompressed HG10
            fh = changegroup.headerlessfixup(fh, hdr)
            hdr = "HG10"
            compression = 'UN'

    magic, version = hdr[0:2], hdr[2:4]

    if magic != 'HG':
        raise util.Abort(_('%s: not a Mercurial bundle') % fname)
    if version == '10':
        if compression is None:
            # compression marker follows the header for regular HG10 bundles
            compression = changegroup.readexactly(fh, 2)
        return changegroup.unbundle10(fh, compression)
    if version == '2X':
        return bundle2.unbundle20(ui, fh, header=magic + version)
    raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
39 39
40 40
class pushoperation(object):
    """An object that represents a single push operation.

    Its purpose is to carry push-related state and very common operations.

    A new one should be created at the beginning of each push and discarded
    afterward.
    """

    def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
        # repo we push from
        self.repo = repo
        self.ui = repo.ui
        # repo we push to
        self.remote = remote
        # force option provided
        self.force = force
        # revs to be pushed (None is "all")
        self.revs = revs
        # allow push of new branch
        self.newbranch = newbranch
        # did a local lock get acquired?
        self.locallocked = None
        # step already performed
        # (used to check what steps have been already performed through bundle2)
        self.stepsdone = set()
        # Integer version of the push result
        # - None means nothing to push
        # - 0 means HTTP error
        # - 1 means we pushed and remote head count is unchanged *or*
        #   we have outgoing changesets but refused to push
        # - other values as described by addchangegroup()
        self.ret = None
        # discovery.outgoing object (contains common and outgoing data)
        self.outgoing = None
        # all remote heads before the push
        self.remoteheads = None
        # testable as a boolean indicating if any nodes are missing locally.
        self.incoming = None
        # set of all heads common after changeset bundle push
        self.commonheads = None
82 82
def push(repo, remote, force=False, revs=None, newbranch=False):
    '''Push outgoing changesets (limited by revs) from a local
    repository to remote. Return an integer:
      - None means nothing to push
      - 0 means HTTP error
      - 1 means we pushed and remote head count is unchanged *or*
        we have outgoing changesets but refused to push
      - other values as described by addchangegroup()
    '''
    pushop = pushoperation(repo, remote, force, revs, newbranch)
    if pushop.remote.local():
        # local-to-local push: refuse if destination lacks required features
        missing = (set(pushop.repo.requirements)
                   - pushop.remote.local().supported)
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    # there are two ways to push to remote repo:
    #
    # addchangegroup assumes local user can lock remote
    # repo (local filesystem, old ssh servers).
    #
    # unbundle assumes local user cannot lock remote repo (new ssh
    # servers, http servers).

    if not pushop.remote.canpush():
        raise util.Abort(_("destination does not support push"))
    # get local lock as we might write phase data
    locallock = None
    try:
        locallock = pushop.repo.lock()
        pushop.locallocked = True
    except IOError, err:
        pushop.locallocked = False
        if err.errno != errno.EACCES:
            raise
        # source repo cannot be locked.
        # We do not abort the push, but just disable the local phase
        # synchronisation.
        msg = 'cannot lock source repository: %s\n' % err
        pushop.ui.debug(msg)
    try:
        pushop.repo.checkpush(pushop)
        lock = None
        unbundle = pushop.remote.capable('unbundle')
        if not unbundle:
            # old-style push: we must lock the remote repo ourselves
            lock = pushop.remote.lock()
        try:
            _pushdiscovery(pushop)
            # bundle2 path is experimental and gated behind config + remote cap;
            # steps it completes are recorded in pushop.stepsdone so the
            # fallback calls below become no-ops for them
            if (pushop.repo.ui.configbool('experimental', 'bundle2-exp',
                                          False)
                and pushop.remote.capable('bundle2-exp')):
                _pushbundle2(pushop)
            _pushchangeset(pushop)
            _pushcomputecommonheads(pushop)
            _pushsyncphase(pushop)
            _pushobsolete(pushop)
        finally:
            if lock is not None:
                lock.release()
    finally:
        if locallock is not None:
            locallock.release()

    _pushbookmark(pushop)
    return pushop.ret
151 151
def _pushdiscovery(pushop):
    """Run changeset discovery and record the results on *pushop*.

    Fills in ``pushop.outgoing``, ``pushop.remoteheads`` and
    ``pushop.incoming``.
    """
    unfi = pushop.repo.unfiltered()
    commoninc = discovery.findcommonincoming(unfi, pushop.remote,
                                             force=pushop.force)
    common, inc, remoteheads = commoninc
    pushop.outgoing = discovery.findcommonoutgoing(unfi, pushop.remote,
                                                   onlyheads=pushop.revs,
                                                   commoninc=commoninc,
                                                   force=pushop.force)
    pushop.remoteheads = remoteheads
    pushop.incoming = inc
164 164
def _pushcheckoutgoing(pushop):
    """Validate the outgoing set before pushing.

    Returns False when there is nothing to push; aborts on obsolete or
    troubled changesets (unless forced) and on new-head violations.
    """
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    if not outgoing.missing:
        # nothing to push
        scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
        return False
    # something to push
    if not pushop.force:
        # if repo.obsstore == False --> no obsolete
        # then, save the iteration
        if unfi.obsstore:
            # this message are here for 80 char limit reason
            mso = _("push includes obsolete changeset: %s!")
            mst = "push includes %s changeset: %s!"
            # plain versions for i18n tool to detect them
            _("push includes unstable changeset: %s!")
            _("push includes bumped changeset: %s!")
            _("push includes divergent changeset: %s!")
            # If we are to push if there is at least one
            # obsolete or unstable changeset in missing, at
            # least one of the missinghead will be obsolete or
            # unstable. So checking heads only is ok
            for node in outgoing.missingheads:
                ctx = unfi[node]
                if ctx.obsolete():
                    raise util.Abort(mso % ctx)
                elif ctx.troubled():
                    raise util.Abort(_(mst)
                                     % (ctx.troubles()[0],
                                        ctx))
        # check that the push would not create new heads on the remote
        newbm = pushop.ui.configlist('bookmarks', 'pushing')
        discovery.checkheads(unfi, pushop.remote, outgoing,
                             pushop.remoteheads,
                             pushop.newbranch,
                             bool(pushop.incoming),
                             newbm)
    return True
203 203
def _pushb2ctx(pushop, bundler):
    """handle changegroup push through bundle2

    addchangegroup result is stored in the ``pushop.ret`` attribute.
    Returns a callable to process the server reply, or None when there is
    nothing to push.
    """
    if 'changesets' in pushop.stepsdone:
        return
    # mark the step done once (the original recorded it twice, a harmless
    # but confusing duplication on a set)
    pushop.stepsdone.add('changesets')
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    if not pushop.force:
        # Send known heads to the server for race detection.
        bundler.newpart('B2X:CHECK:HEADS', data=iter(pushop.remoteheads))
    cg = changegroup.getlocalbundle(pushop.repo, 'push', pushop.outgoing)
    cgpart = bundler.newpart('B2X:CHANGEGROUP', data=cg.getchunks())
    def handlereply(op):
        """extract addchangegroup returns from server reply"""
        cgreplies = op.records.getreplies(cgpart.id)
        assert len(cgreplies['changegroup']) == 1
        pushop.ret = cgreplies['changegroup'][0]['return']
    return handlereply
229 229
# list of functions that may decide to add parts to an outgoing bundle2;
# each is called with (pushop, bundler) and may return a reply handler
bundle2partsgenerators = [_pushb2ctx]
232
230 233 def _pushbundle2(pushop):
231 234 """push data to the remote using bundle2
232 235
233 236 The only currently supported type of data is changegroup but this will
234 237 evolve in the future."""
235 238 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
236 239 # create reply capability
237 240 capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
238 241 bundler.newpart('b2x:replycaps', data=capsblob)
239 242 extrainfo = _pushbundle2extraparts(pushop, bundler)
240 # add the changegroup bundle
241 cgreplyhandler = _pushb2ctx(pushop, bundler)
242 # do not push if no other parts than the capability
243 replyhandlers = []
244 for partgen in bundle2partsgenerators:
245 ret = partgen(pushop, bundler)
246 replyhandlers.append(ret)
247 # do not push if nothing to push
243 248 if bundler.nbparts <= 1:
244 249 return
245 250 stream = util.chunkbuffer(bundler.getchunks())
246 251 try:
247 252 reply = pushop.remote.unbundle(stream, ['force'], 'push')
248 253 except error.BundleValueError, exc:
249 254 raise util.Abort('missing support for %s' % exc)
250 255 try:
251 256 op = bundle2.processbundle(pushop.repo, reply)
252 257 except error.BundleValueError, exc:
253 258 raise util.Abort('missing support for %s' % exc)
254 cgreplyhandler(op)
259 for rephand in replyhandlers:
260 rephand(op)
255 261 _pushbundle2extrareply(pushop, op, extrainfo)
256 262
257 263 def _pushbundle2extraparts(pushop, bundler):
258 264 """hook function to let extensions add parts
259 265
260 266 Return a dict to let extensions pass data to the reply processing.
261 267 """
262 268 return {}
263 269
264 270 def _pushbundle2extrareply(pushop, op, extrainfo):
265 271 """hook function to let extensions react to part replies
266 272
267 273 The dict from _pushbundle2extrareply is fed to this function.
268 274 """
269 275 pass
270 276
def _pushchangeset(pushop):
    """Make the actual push of changeset bundle to remote repo"""
    # skip if the bundle2 path already pushed the changesets
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    outgoing = pushop.outgoing
    unbundle = pushop.remote.capable('unbundle')
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # create a changegroup from local
    if pushop.revs is None and not (outgoing.excluded
                                    or pushop.repo.changelog.filteredrevs):
        # push everything,
        # use the fast path, no race possible on push
        bundler = changegroup.bundle10(pushop.repo, bundlecaps)
        cg = changegroup.getsubset(pushop.repo,
                                   outgoing,
                                   bundler,
                                   'push',
                                   fastpath=True)
    else:
        cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
                                        bundlecaps)

    # apply changegroup to remote
    if unbundle:
        # local repo finds heads on server, finds out what
        # revs it must push. once revs transferred, if server
        # finds it has different heads (someone else won
        # commit/push race), server aborts.
        if pushop.force:
            remoteheads = ['force']
        else:
            remoteheads = pushop.remoteheads
        # ssh: return remote's addchangegroup()
        # http: return remote's addchangegroup() or 0 for error
        pushop.ret = pushop.remote.unbundle(cg, remoteheads,
                                            pushop.repo.url())
    else:
        # we return an integer indicating remote head count
        # change
        pushop.ret = pushop.remote.addchangegroup(cg, 'push', pushop.repo.url())
318 324
def _pushcomputecommonheads(pushop):
    """Compute the set of heads common to both sides after the push.

    The result is stored in ``pushop.commonheads`` and later used for
    phase synchronisation.
    """
    unfi = pushop.repo.unfiltered()
    if pushop.ret:
        # push succeed, synchronize target of the push
        cheads = pushop.outgoing.missingheads
    elif pushop.revs is None:
        # All out push fails. synchronize all common
        cheads = pushop.outgoing.commonheads
    else:
        # I want cheads = heads(::missingheads and ::commonheads)
        # (missingheads is revs with secret changeset filtered out)
        #
        # This can be expressed as:
        #     cheads = ( (missingheads and ::commonheads)
        #              + (commonheads and ::missingheads))"
        #              )
        #
        # while trying to push we already computed the following:
        #     common = (::commonheads)
        #     missing = ((commonheads::missingheads) - commonheads)
        #
        # We can pick:
        # * missingheads part of common (::commonheads)
        common = set(pushop.outgoing.common)
        nm = pushop.repo.changelog.nodemap
        cheads = [node for node in pushop.revs if nm[node] in common]
        # and
        # * commonheads parents on missing
        revset = unfi.set('%ln and parents(roots(%ln))',
                          pushop.outgoing.commonheads,
                          pushop.outgoing.missing)
        cheads.extend(c.node() for c in revset)
    pushop.commonheads = cheads
352 358
def _pushsyncphase(pushop):
    """synchronise phase information locally and remotely"""
    unfi = pushop.repo.unfiltered()
    # heads common to both sides, computed by _pushcomputecommonheads
    cheads = pushop.commonheads
    # even when we don't push, exchanging phase data is useful
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo', False)
        and remotephases    # server supports phases
        and pushop.ret is None # nothing was pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and remote support phase
        # - and no changeset was pushed
        # - and remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    if not remotephases: # old server or public only reply from non-publishing
        _localphasemove(pushop, cheads)
        # don't push any phase data as there is nothing to push
    else:
        ana = phases.analyzeremotephases(pushop.repo, cheads,
                                         remotephases)
        pheads, droots = ana
        ### Apply remote phase on local
        if remotephases.get('publishing', False):
            _localphasemove(pushop, cheads)
        else: # publish = False
            _localphasemove(pushop, pheads)
            _localphasemove(pushop, cheads, phases.draft)
        ### Apply local phase on remote

        # Get the list of all revs draft on remote by public here.
        # XXX Beware that revset break if droots is not strictly
        # XXX root we may want to ensure it is but it is costly
        outdated = unfi.set('heads((%ln::%ln) and public())',
                            droots, cheads)

        b2caps = bundle2.bundle2caps(pushop.remote)
        if 'b2x:pushkey' in b2caps:
            # server supports bundle2, let's do a batched push through it
            #
            # This will eventually be unified with the changesets bundle2 push
            bundler = bundle2.bundle20(pushop.ui, b2caps)
            capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
            bundler.newpart('b2x:replycaps', data=capsblob)
            # remember which part corresponds to which head so replies can
            # be matched back below
            part2node = []
            enc = pushkey.encode
            for newremotehead in outdated:
                part = bundler.newpart('b2x:pushkey')
                part.addparam('namespace', enc('phases'))
                part.addparam('key', enc(newremotehead.hex()))
                part.addparam('old', enc(str(phases.draft)))
                part.addparam('new', enc(str(phases.public)))
                part2node.append((part.id, newremotehead))
            stream = util.chunkbuffer(bundler.getchunks())
            try:
                reply = pushop.remote.unbundle(stream, ['force'], 'push')
                op = bundle2.processbundle(pushop.repo, reply)
            except error.BundleValueError, exc:
                raise util.Abort('missing support for %s' % exc)
            for partid, node in part2node:
                partrep = op.records.getreplies(partid)
                results = partrep['pushkey']
                assert len(results) <= 1
                msg = None
                if not results:
                    msg = _('server ignored update of %s to public!\n') % node
                elif not int(results[0]['return']):
                    msg = _('updating %s to public failed!\n') % node
                if msg is not None:
                    pushop.ui.warn(msg)

        else:
            # fallback to independant pushkey command
            for newremotehead in outdated:
                r = pushop.remote.pushkey('phases',
                                          newremotehead.hex(),
                                          str(phases.draft),
                                          str(phases.public))
                if not r:
                    pushop.ui.warn(_('updating %s to public failed!\n')
                                   % newremotehead)
439 445
def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo"""
    if pushop.locallocked:
        phases.advanceboundary(pushop.repo, phase, nodes)
        return
    # repo is not locked, do not change any phases!
    # Instead, inform the user that phases would have been moved when
    # applicable.
    repo = pushop.repo
    wouldmove = [n for n in nodes if phase < repo[n].phase()]
    phasestr = phases.phasenames[phase]
    if wouldmove:
        pushop.ui.status(_('cannot lock source repo, skipping '
                           'local %s phase update\n') % phasestr)
453 459
def _pushobsolete(pushop):
    """utility function to push obsolete markers to a remote"""
    pushop.ui.debug('try to push obsolete markers to remote\n')
    repo = pushop.repo
    remote = pushop.remote
    # only push when the feature is enabled, we have markers, and the
    # remote advertises the 'obsolete' namespace
    if not (obsolete._enabled and repo.obsstore and
            'obsolete' in remote.listkeys('namespaces')):
        return
    markers = repo.listkeys('obsolete')
    # reverse sort to ensure we end with dump0
    results = [remote.pushkey('obsolete', key, '', markers[key])
               for key in sorted(markers, reverse=True)]
    if not all(results):
        repo.ui.warn(_('failed to push some obsolete markers!\n'))
470 476
def _pushbookmark(pushop):
    """Update bookmark position on remote"""
    ui = pushop.ui
    repo = pushop.repo.unfiltered()
    remote = pushop.remote
    ui.debug("checking for updated bookmarks\n")
    # limit updates to bookmarks on ancestors of the pushed revs (if any)
    revnums = map(repo.changelog.rev, pushop.revs or [])
    ancestors = [a for a in repo.changelog.ancestors(revnums, inclusive=True)]
    (addsrc, adddst, advsrc, advdst, diverge, differ, invalid
     ) = bookmarks.compare(repo, repo._bookmarks, remote.listkeys('bookmarks'),
                           srchex=hex)

    # only bookmarks that advanced on our side are pushed
    for b, scid, dcid in advsrc:
        if ancestors and repo[scid].rev() not in ancestors:
            continue
        if remote.pushkey('bookmarks', b, dcid, scid):
            ui.status(_("updating bookmark %s\n") % b)
        else:
            ui.warn(_('updating bookmark %s failed!\n') % b)
490 496
class pulloperation(object):
    """An object that represents a single pull operation.

    Its purpose is to carry pull-related state and very common operations.

    A new one should be created at the beginning of each pull and discarded
    afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False):
        # repo we pull into
        self.repo = repo
        # repo we pull from
        self.remote = remote
        # revision we try to pull (None is "all")
        self.heads = heads
        # do we force pull?
        self.force = force
        # the name the pull transaction
        self._trname = 'pull\n' + util.hidepassword(remote.url())
        # hold the transaction once created
        self._tr = None
        # set of common changeset between local and remote before pull
        self.common = None
        # set of pulled head
        self.rheads = None
        # list of missing changeset to fetch remotely
        self.fetch = None
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # list of step remaining todo (related to future bundle2 usage)
        self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changeset target by the pull"""
        # compute target subset
        if self.heads is None:
            # We pulled every thing possible
            # sync on everything common
            c = set(self.common)
            ret = list(self.common)
            for n in self.rheads:
                if n not in c:
                    ret.append(n)
            return ret
        else:
            # We pulled a specific subset
            # sync on this subset
            return self.heads

    def gettransaction(self):
        """get appropriate pull transaction, creating it if needed"""
        if self._tr is None:
            self._tr = self.repo.transaction(self._trname)
        return self._tr

    def closetransaction(self):
        """close transaction if created"""
        if self._tr is not None:
            self._tr.close()

    def releasetransaction(self):
        """release transaction if created"""
        if self._tr is not None:
            self._tr.release()
557 563
def pull(repo, remote, heads=None, force=False):
    """Pull changesets (limited by *heads*) from *remote* into *repo*.

    Returns the changegroup result code (see ``pulloperation.cgresult``).
    """
    pullop = pulloperation(repo, remote, heads, force)
    if pullop.remote.local():
        # local-to-local pull: refuse if the source needs unsupported features
        missing = set(pullop.remote.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    lock = pullop.repo.lock()
    try:
        _pulldiscovery(pullop)
        # the bundle2 path removes the steps it handles from todosteps,
        # so the per-step fallbacks below only run for what remains
        if (pullop.repo.ui.configbool('experimental', 'bundle2-exp', False)
                and pullop.remote.capable('bundle2-exp')):
            _pullbundle2(pullop)
        if 'changegroup' in pullop.todosteps:
            _pullchangeset(pullop)
        if 'phases' in pullop.todosteps:
            _pullphase(pullop)
        if 'obsmarkers' in pullop.todosteps:
            _pullobsolete(pullop)
        pullop.closetransaction()
    finally:
        pullop.releasetransaction()
        lock.release()

    return pullop.cgresult
586 592
def _pulldiscovery(pullop):
    """discovery phase for the pull

    Currently handles changeset discovery only; will eventually handle
    all discovery at some point.
    """
    common, fetch, rheads = discovery.findcommonincoming(
        pullop.repo.unfiltered(),
        pullop.remote,
        heads=pullop.heads,
        force=pullop.force)
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads
597 603
def _pullbundle2(pullop):
    """pull data using bundle2

    For now, the only supported data are changegroup."""
    remotecaps = bundle2.bundle2caps(pullop.remote)
    kwargs = {'bundlecaps': caps20to10(pullop.repo)}
    # pulling changegroup
    pullop.todosteps.remove('changegroup')

    kwargs['common'] = pullop.common
    kwargs['heads'] = pullop.heads or pullop.rheads
    if 'b2x:listkeys' in remotecaps:
        # piggyback phase data on the same round-trip
        kwargs['listkeys'] = ['phase']
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
    else:
        if pullop.heads is None and list(pullop.common) == [nullid]:
            pullop.repo.ui.status(_("requesting all changes\n"))
    _pullbundle2extraprepare(pullop, kwargs)
    if kwargs.keys() == ['format']:
        return # nothing to pull
    bundle = pullop.remote.getbundle('pull', **kwargs)
    try:
        op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
    except error.BundleValueError, exc:
        raise util.Abort('missing support for %s' % exc)

    if pullop.fetch:
        assert len(op.records['changegroup']) == 1
        pullop.cgresult = op.records['changegroup'][0]['return']

    # processing phases change
    for namespace, value in op.records['listkeys']:
        if namespace == 'phases':
            _pullapplyphases(pullop, value)
634 640
635 641 def _pullbundle2extraprepare(pullop, kwargs):
636 642 """hook function so that extensions can extend the getbundle call"""
637 643 pass
638 644
def _pullchangeset(pullop):
    """pull changeset from unbundle into the local repo"""
    # We delay the open of the transaction as late as possible so we
    # don't open transaction for nothing or you break future useful
    # rollback call
    pullop.todosteps.remove('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    # pick the best wire-protocol command the remote supports
    if pullop.remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = pullop.remote.getbundle('pull', common=pullop.common,
                                     heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = pullop.remote.changegroup(pullop.fetch, 'pull')
    elif not pullop.remote.capable('changegroupsubset'):
        raise util.Abort(_("partial pull cannot be done because "
                           "other repository doesn't support "
                           "changegroupsubset."))
    else:
        cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
                                                 pullop.remote.url())
670 676
def _pullphase(pullop):
    """Fetch phase data from the remote and apply it locally."""
    _pullapplyphases(pullop, pullop.remote.listkeys('phases'))
675 681
def _pullapplyphases(pullop, remotephases):
    """apply phase movement from observed remote state"""
    pullop.todosteps.remove('phases')
    publishing = bool(remotephases.get('publishing', False))
    if remotephases and not publishing:
        # remote is new and unpublishing
        pheads, _dr = phases.analyzeremotephases(pullop.repo,
                                                 pullop.pulledsubset,
                                                 remotephases)
        phases.advanceboundary(pullop.repo, phases.public, pheads)
        phases.advanceboundary(pullop.repo, phases.draft,
                               pullop.pulledsubset)
    else:
        # Remote is old or publishing all common changesets
        # should be seen as public
        phases.advanceboundary(pullop.repo, phases.public,
                               pullop.pulledsubset)
693 699
def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    The `gettransaction` is function that return the pull transaction, creating
    one if necessary. We return the transaction to inform the calling code that
    a new transaction have been created (when applicable).

    Exists mostly to allow overriding for experimentation purpose"""
    pullop.todosteps.remove('obsmarkers')
    tr = None
    if obsolete._enabled:
        pullop.repo.ui.debug('fetching remote obsolete markers\n')
        remoteobs = pullop.remote.listkeys('obsolete')
        if 'dump0' in remoteobs:
            # only open a transaction when there is actual marker data
            tr = pullop.gettransaction()
            for key in sorted(remoteobs, reverse=True):
                if key.startswith('dump'):
                    data = base85.b85decode(remoteobs[key])
                    pullop.repo.obsstore.mergemarkers(tr, data)
            pullop.repo.invalidatevolatilesets()
    return tr
715 721
def caps20to10(repo):
    """return a set with appropriate options to use bundle20 during getbundle"""
    # advertise HG2X support plus this repo's encoded bundle2 capabilities
    capsblob = bundle2.encodecaps(repo.bundle2caps)
    return set(['HG2X', 'bundle2=' + urllib.quote(capsblob)])
722 728
def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
              **kwargs):
    """return a full bundle (with potentially multiple kind of parts)

    Could be a bundle HG10 or a bundle HG2X depending on bundlecaps
    passed. For now, the bundle can contain only changegroup, but this will
    changes when more part type will be available for bundle2.

    This is different from changegroup.getbundle that only returns an HG10
    changegroup bundle. They may eventually get reunited in the future when we
    have a clearer idea of the API we what to query different data.

    The implementation is at a very early stage and will get massive rework
    when the API of bundle is refined.
    """
    # build changegroup bundle here.
    cg = changegroup.getbundle(repo, source, heads=heads,
                               common=common, bundlecaps=bundlecaps)
    if bundlecaps is None or 'HG2X' not in bundlecaps:
        # client cannot handle bundle2: return the plain changegroup;
        # extra arguments are only meaningful for bundle2
        if kwargs:
            raise ValueError(_('unsupported getbundle arguments: %s')
                             % ', '.join(sorted(kwargs.keys())))
        return cg
    # very crude first implementation,
    # the bundle API will change and the generation will be done lazily.
    b2caps = {}
    for bcaps in bundlecaps:
        if bcaps.startswith('bundle2='):
            blob = urllib.unquote(bcaps[len('bundle2='):])
            b2caps.update(bundle2.decodecaps(blob))
    bundler = bundle2.bundle20(repo.ui, b2caps)
    if cg:
        bundler.newpart('b2x:changegroup', data=cg.getchunks())
    # attach one listkeys part per requested namespace
    listkeys = kwargs.get('listkeys', ())
    for namespace in listkeys:
        part = bundler.newpart('b2x:listkeys')
        part.addparam('namespace', namespace)
        keys = repo.listkeys(namespace).items()
        part.data = pushkey.encodekeys(keys)
    _getbundleextrapart(bundler, repo, source, heads=heads, common=common,
                        bundlecaps=bundlecaps, **kwargs)
    return util.chunkbuffer(bundler.getchunks())
765 771
766 772 def _getbundleextrapart(bundler, repo, source, heads=None, common=None,
767 773 bundlecaps=None, **kwargs):
768 774 """hook function to let extensions add parts to the requested bundle"""
769 775 pass
770 776
def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling. Raises PushRaced when the current heads
    match neither *their_heads*, its hashed form, nor the 'force' marker.
    """
    current = repo.heads()
    digest = util.sha1(''.join(sorted(current))).digest()
    acceptable = (their_heads == ['force']
                  or their_heads == current
                  or their_heads == ['hashed', digest])
    if not acceptable:
        # someone else committed/pushed/unbundled while we
        # were transferring data
        raise error.PushRaced('repository changed while %s - '
                              'please try again' % context)
784 790
def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    this function makes sure the repo is locked during the application and have
    mechanism to check that no push race occurred between the creation of the
    bundle and its application.

    If the push was raced as PushRaced exception is raised."""
    r = 0
    # need a transaction when processing a bundle2 stream
    tr = None
    lock = repo.lock()
    try:
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        if util.safehasattr(cg, 'params'):
            # a 'params' attribute marks a bundle2 unbundler; plain
            # changegroups take the addchangegroup path below
            try:
                tr = repo.transaction('unbundle')
                tr.hookargs['bundle2-exp'] = '1'
                r = bundle2.processbundle(repo, cg, lambda: tr).reply
                cl = repo.unfiltered().changelog
                p = cl.writepending() and repo.root or ""
                repo.hook('b2x-pretransactionclose', throw=True, source=source,
                          url=url, pending=p, **tr.hookargs)
                tr.close()
                repo.hook('b2x-transactionclose', source=source, url=url,
                          **tr.hookargs)
            except Exception, exc:
                # tag the exception so callers know bundle2 was in progress
                exc.duringunbundle2 = True
                raise
        else:
            r = changegroup.addchangegroup(repo, cg, source, url)
    finally:
        if tr is not None:
            tr.release()
        lock.release()
    return r
General Comments 0
You need to be logged in to leave comments. Login now