##// END OF EJS Templates
bundle2-push: move changegroup push validation inside _pushb2ctx...
Pierre-Yves David -
r21903:48f61cfb default
parent child Browse files
Show More
@@ -1,811 +1,821
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib
11 11 import util, scmutil, changegroup, base85, error
12 12 import discovery, phases, obsolete, bookmarks, bundle2, pushkey
13 13
def readbundle(ui, fh, fname, vfs=None):
    """Sniff the 4-byte bundle header on ``fh`` and return an unbundler.

    ``fname`` is used for error reporting; an empty name means an
    anonymous stream, for which a headerless changegroup is tolerated.
    ``vfs``, when given, is used to resolve ``fname`` to a full path.
    """
    header = changegroup.readexactly(fh, 4)

    compression = None
    if fname:
        if vfs:
            fname = vfs.join(fname)
    else:
        fname = "stream"
        if not header.startswith('HG') and header.startswith('\0'):
            # headerless bundle: reinject the bytes already consumed and
            # treat the stream as an uncompressed HG10 bundle
            fh = changegroup.headerlessfixup(fh, header)
            header = "HG10"
            compression = 'UN'

    magic = header[0:2]
    version = header[2:4]

    if magic != 'HG':
        raise util.Abort(_('%s: not a Mercurial bundle') % fname)
    if version == '10':
        if compression is None:
            # compression marker follows the version in HG10 bundles
            compression = changegroup.readexactly(fh, 2)
        return changegroup.unbundle10(fh, compression)
    if version == '2X':
        return bundle2.unbundle20(ui, fh, header=magic + version)
    raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
39 39
40 40
class pushoperation(object):
    """Container for the state of a single push operation.

    It carries push related state and very common operations.  A new
    instance should be created at the beginning of each push and
    discarded afterward.
    """

    def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
        # local repository we push from
        self.repo = repo
        self.ui = repo.ui
        # remote peer we push to
        self.remote = remote
        # was the force option provided?
        self.force = force
        # revisions to be pushed (None means "all")
        self.revs = revs
        # is pushing a new branch allowed?
        self.newbranch = newbranch
        # did a local lock get acquired? (None until attempted)
        self.locallocked = None
        # names of steps already performed, used to check what steps were
        # already handled through bundle2
        self.stepsdone = set()
        # integer version of the push result:
        # - None means nothing to push
        # - 0 means HTTP error
        # - 1 means we pushed and remote head count is unchanged *or*
        #   we have outgoing changesets but refused to push
        # - other values as described by addchangegroup()
        self.ret = None
        # discover.outgoing object (contains common and outgoing data)
        self.outgoing = None
        # all remote heads before the push
        self.remoteheads = None
        # testable as a boolean indicating if any nodes are missing locally
        self.incoming = None
        # set of all heads common after the changeset bundle push
        self.commonheads = None
82 82
def push(repo, remote, force=False, revs=None, newbranch=False):
    '''Push outgoing changesets (limited by revs) from a local
    repository to remote. Return an integer:
      - None means nothing to push
      - 0 means HTTP error
      - 1 means we pushed and remote head count is unchanged *or*
        we have outgoing changesets but refused to push
      - other values as described by addchangegroup()
    '''
    pushop = pushoperation(repo, remote, force, revs, newbranch)
    if pushop.remote.local():
        # local-to-local push: refuse when the destination cannot read
        # this repository's on-disk format
        missing = (set(pushop.repo.requirements)
                   - pushop.remote.local().supported)
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    # there are two ways to push to remote repo:
    #
    # addchangegroup assumes local user can lock remote
    # repo (local filesystem, old ssh servers).
    #
    # unbundle assumes local user cannot lock remote repo (new ssh
    # servers, http servers).

    if not pushop.remote.canpush():
        raise util.Abort(_("destination does not support push"))
    # get local lock as we might write phase data
    locallock = None
    try:
        locallock = pushop.repo.lock()
        pushop.locallocked = True
    except IOError as err:
        # 'as' form is valid on Python 2.6+ and required on Python 3,
        # unlike the old 'except IOError, err' spelling
        pushop.locallocked = False
        if err.errno != errno.EACCES:
            raise
        # source repo cannot be locked.
        # We do not abort the push, but just disable the local phase
        # synchronisation.
        msg = 'cannot lock source repository: %s\n' % err
        pushop.ui.debug(msg)
    try:
        pushop.repo.checkpush(pushop)
        lock = None
        unbundle = pushop.remote.capable('unbundle')
        if not unbundle:
            lock = pushop.remote.lock()
        try:
            _pushdiscovery(pushop)
            # outgoing validation now lives inside the individual push
            # steps (_pushb2ctx / _pushchangeset), which also mark their
            # step in pushop.stepsdone to avoid double pushes
            if (pushop.repo.ui.configbool('experimental', 'bundle2-exp',
                                          False)
                and pushop.remote.capable('bundle2-exp')):
                _pushbundle2(pushop)
            _pushchangeset(pushop)
            _pushcomputecommonheads(pushop)
            _pushsyncphase(pushop)
            _pushobsolete(pushop)
        finally:
            if lock is not None:
                lock.release()
    finally:
        if locallock is not None:
            locallock.release()

    _pushbookmark(pushop)
    return pushop.ret
155 151
def _pushdiscovery(pushop):
    """Run changeset discovery against the remote and record the results.

    Fills in ``pushop.outgoing``, ``pushop.remoteheads`` and
    ``pushop.incoming``.
    """
    unfi = pushop.repo.unfiltered()
    commoninc = discovery.findcommonincoming(unfi, pushop.remote,
                                             force=pushop.force)
    common, inc, remoteheads = commoninc
    pushop.outgoing = discovery.findcommonoutgoing(unfi, pushop.remote,
                                                   onlyheads=pushop.revs,
                                                   commoninc=commoninc,
                                                   force=pushop.force)
    pushop.remoteheads = remoteheads
    pushop.incoming = inc
168 164
def _pushcheckoutgoing(pushop):
    """Validate the outgoing changesets before sending them.

    Returns False when there is nothing to push.  Aborts when the push
    would propagate obsolete/troubled changesets or create problematic
    remote heads.  Returns True when the push may proceed.
    """
    unfi = pushop.repo.unfiltered()
    outgoing = pushop.outgoing
    if not outgoing.missing:
        # nothing to push at all
        scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
        return False
    # there is something to push
    if not pushop.force:
        # skip the obsstore iteration entirely when the store is empty
        # (repo.obsstore is falsy in that case)
        if unfi.obsstore:
            # these messages are kept terse for the 80 char limit
            mso = _("push includes obsolete changeset: %s!")
            mst = "push includes %s changeset: %s!"
            # plain versions for i18n tool to detect them
            _("push includes unstable changeset: %s!")
            _("push includes bumped changeset: %s!")
            _("push includes divergent changeset: %s!")
            # If there is at least one obsolete or unstable changeset in
            # missing, at least one of the missing heads will be obsolete
            # or unstable, so checking heads only is sufficient.
            for node in outgoing.missingheads:
                ctx = unfi[node]
                if ctx.obsolete():
                    raise util.Abort(mso % ctx)
                elif ctx.troubled():
                    raise util.Abort(_(mst)
                                     % (ctx.troubles()[0],
                                        ctx))
        newbm = pushop.ui.configlist('bookmarks', 'pushing')
        discovery.checkheads(unfi, pushop.remote, outgoing,
                             pushop.remoteheads,
                             pushop.newbranch,
                             bool(pushop.incoming),
                             newbm)
    return True
207 203
def _pushb2ctx(pushop, bundler):
    """handle changegroup push through bundle2

    addchangegroup result is stored in the ``pushop.ret`` attribute.
    Returns a callback to extract that result from the server reply, or
    None when there is nothing to push.
    """
    if 'changesets' in pushop.stepsdone:
        return
    # mark the step done first so the plain _pushchangeset fallback does
    # not push the same data again (the diff introduced a duplicated
    # stepsdone.add call here; one is enough — set.add is idempotent
    # anyway, the second call was dead code)
    pushop.stepsdone.add('changesets')
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    if not pushop.force:
        # Send known heads to the server for race detection.
        bundler.newpart('B2X:CHECK:HEADS', data=iter(pushop.remoteheads))
    cg = changegroup.getlocalbundle(pushop.repo, 'push', pushop.outgoing)
    cgpart = bundler.newpart('B2X:CHANGEGROUP', data=cg.getchunks())
    def handlereply(op):
        """extract addchangegroup returns from server reply"""
        cgreplies = op.records.getreplies(cgpart.id)
        assert len(cgreplies['changegroup']) == 1
        pushop.ret = cgreplies['changegroup'][0]['return']
    return handlereply
227 229
def _pushbundle2(pushop):
    """push data to the remote using bundle2

    The only currently supported type of data is changegroup but this will
    evolve in the future."""
    bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
    # create reply capability
    capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
    bundler.newpart('b2x:replycaps', data=capsblob)
    extrainfo = _pushbundle2extraparts(pushop, bundler)
    # add the changegroup bundle
    cgreplyhandler = _pushb2ctx(pushop, bundler)
    # do not push if no other parts than the capability
    if bundler.nbparts <= 1:
        return
    stream = util.chunkbuffer(bundler.getchunks())
    try:
        reply = pushop.remote.unbundle(stream, ['force'], 'push')
    except error.BundleValueError as exc:
        # 'as' form works on Python 2.6+ and Python 3, unlike the old
        # comma spelling
        raise util.Abort('missing support for %s' % exc)
    try:
        op = bundle2.processbundle(pushop.repo, reply)
    except error.BundleValueError as exc:
        raise util.Abort('missing support for %s' % exc)
    cgreplyhandler(op)
    _pushbundle2extrareply(pushop, op, extrainfo)
251 256
252 257 def _pushbundle2extraparts(pushop, bundler):
253 258 """hook function to let extensions add parts
254 259
255 260 Return a dict to let extensions pass data to the reply processing.
256 261 """
257 262 return {}
258 263
259 264 def _pushbundle2extrareply(pushop, op, extrainfo):
260 265 """hook function to let extensions react to part replies
261 266
262 267 The dict from _pushbundle2extrareply is fed to this function.
263 268 """
264 269 pass
265 270
def _pushchangeset(pushop):
    """Make the actual push of changeset bundle to remote repo"""
    if 'changesets' in pushop.stepsdone:
        # already pushed (e.g. through bundle2)
        return
    pushop.stepsdone.add('changesets')
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    outgoing = pushop.outgoing
    unbundle = pushop.remote.capable('unbundle')
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # build the changegroup locally
    fastpathok = (pushop.revs is None
                  and not outgoing.excluded
                  and not pushop.repo.changelog.filteredrevs)
    if fastpathok:
        # pushing everything: use the fast path, no race possible on push
        cgbundler = changegroup.bundle10(pushop.repo, bundlecaps)
        cg = changegroup.getsubset(pushop.repo,
                                   outgoing,
                                   cgbundler,
                                   'push',
                                   fastpath=True)
    else:
        cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
                                        bundlecaps)

    # apply changegroup to remote
    if unbundle:
        # local repo finds heads on server, finds out what
        # revs it must push. once revs transferred, if server
        # finds it has different heads (someone else won
        # commit/push race), server aborts.
        if pushop.force:
            remoteheads = ['force']
        else:
            remoteheads = pushop.remoteheads
        # ssh: return remote's addchangegroup()
        # http: return remote's addchangegroup() or 0 for error
        pushop.ret = pushop.remote.unbundle(cg, remoteheads,
                                            pushop.repo.url())
    else:
        # we return an integer indicating remote head count change
        pushop.ret = pushop.remote.addchangegroup(cg, 'push',
                                                  pushop.repo.url())
308 318
def _pushcomputecommonheads(pushop):
    """Compute ``pushop.commonheads`` after the changeset push."""
    unfi = pushop.repo.unfiltered()
    if pushop.ret:
        # push succeeded: synchronize on the target of the push
        cheads = pushop.outgoing.missingheads
    elif pushop.revs is None:
        # push of everything failed: synchronize on everything common
        cheads = pushop.outgoing.commonheads
    else:
        # I want cheads = heads(::missingheads and ::commonheads)
        # (missingheads is revs with secret changeset filtered out)
        #
        # This can be expressed as:
        #     cheads = ( (missingheads and ::commonheads)
        #              + (commonheads and ::missingheads))"
        #              )
        #
        # while trying to push we already computed the following:
        #     common = (::commonheads)
        #     missing = ((commonheads::missingheads) - commonheads)
        #
        # We can pick:
        # * missingheads part of common (::commonheads)
        common = set(pushop.outgoing.common)
        nm = pushop.repo.changelog.nodemap
        cheads = [node for node in pushop.revs if nm[node] in common]
        # and
        # * commonheads parents on missing
        revset = unfi.set('%ln and parents(roots(%ln))',
                          pushop.outgoing.commonheads,
                          pushop.outgoing.missing)
        cheads.extend(c.node() for c in revset)
    pushop.commonheads = cheads
342 352
def _pushsyncphase(pushop):
    """synchronise phase information locally and remotely"""
    unfi = pushop.repo.unfiltered()
    cheads = pushop.commonheads
    # even when we don't push, exchanging phase data is useful
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo', False)
        and remotephases    # server supports phases
        and pushop.ret is None # nothing was pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and remote support phase
        # - and no changeset was pushed
        # - and remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    if not remotephases: # old server or public only reply from non-publishing
        _localphasemove(pushop, cheads)
        # don't push any phase data as there is nothing to push
    else:
        ana = phases.analyzeremotephases(pushop.repo, cheads,
                                         remotephases)
        pheads, droots = ana
        ### Apply remote phase on local
        if remotephases.get('publishing', False):
            _localphasemove(pushop, cheads)
        else: # publish = False
            _localphasemove(pushop, pheads)
            _localphasemove(pushop, cheads, phases.draft)
        ### Apply local phase on remote

        # Get the list of all revs draft on remote by public here.
        # XXX Beware that revset break if droots is not strictly
        # XXX root we may want to ensure it is but it is costly
        outdated = unfi.set('heads((%ln::%ln) and public())',
                            droots, cheads)

        b2caps = bundle2.bundle2caps(pushop.remote)
        if 'b2x:pushkey' in b2caps:
            # server supports bundle2, let's do a batched push through it
            #
            # This will eventually be unified with the changesets bundle2
            # push
            bundler = bundle2.bundle20(pushop.ui, b2caps)
            capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
            bundler.newpart('b2x:replycaps', data=capsblob)
            part2node = []
            enc = pushkey.encode
            for newremotehead in outdated:
                part = bundler.newpart('b2x:pushkey')
                part.addparam('namespace', enc('phases'))
                part.addparam('key', enc(newremotehead.hex()))
                part.addparam('old', enc(str(phases.draft)))
                part.addparam('new', enc(str(phases.public)))
                part2node.append((part.id, newremotehead))
            stream = util.chunkbuffer(bundler.getchunks())
            try:
                reply = pushop.remote.unbundle(stream, ['force'], 'push')
                op = bundle2.processbundle(pushop.repo, reply)
            except error.BundleValueError as exc:
                # 'as' form: valid on Python 2.6+ and Python 3
                raise util.Abort('missing support for %s' % exc)
            for partid, node in part2node:
                partrep = op.records.getreplies(partid)
                results = partrep['pushkey']
                assert len(results) <= 1
                msg = None
                if not results:
                    msg = _('server ignored update of %s to public!\n') % node
                elif not int(results[0]['return']):
                    msg = _('updating %s to public failed!\n') % node
                if msg is not None:
                    pushop.ui.warn(msg)

        else:
            # fallback to independent pushkey command
            for newremotehead in outdated:
                r = pushop.remote.pushkey('phases',
                                          newremotehead.hex(),
                                          str(phases.draft),
                                          str(phases.public))
                if not r:
                    pushop.ui.warn(_('updating %s to public failed!\n')
                                   % newremotehead)
429 439
def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo"""
    if pushop.locallocked:
        phases.advanceboundary(pushop.repo, phase, nodes)
        return
    # repo is not locked, do not change any phases!
    # Inform the user that phases should have been moved when applicable.
    actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
    phasestr = phases.phasenames[phase]
    if actualmoves:
        pushop.ui.status(_('cannot lock source repo, skipping '
                           'local %s phase update\n') % phasestr)
443 453
def _pushobsolete(pushop):
    """Push local obsolete markers to the remote (best effort)."""
    pushop.ui.debug('try to push obsolete markers to remote\n')
    repo = pushop.repo
    remote = pushop.remote
    if not (obsolete._enabled and repo.obsstore and
            'obsolete' in remote.listkeys('namespaces')):
        return
    results = []
    markerdata = repo.listkeys('obsolete')
    # reverse sort to ensure we end with dump0
    for key in sorted(markerdata, reverse=True):
        results.append(remote.pushkey('obsolete', key, '', markerdata[key]))
    if not all(results):
        repo.ui.warn(_('failed to push some obsolete markers!\n'))
460 470
def _pushbookmark(pushop):
    """Update bookmark position on remote"""
    ui = pushop.ui
    repo = pushop.repo.unfiltered()
    remote = pushop.remote
    ui.debug("checking for updated bookmarks\n")
    revnums = map(repo.changelog.rev, pushop.revs or [])
    ancestors = [a for a in repo.changelog.ancestors(revnums, inclusive=True)]
    comp = bookmarks.compare(repo, repo._bookmarks,
                             remote.listkeys('bookmarks'), srchex=hex)
    (addsrc, adddst, advsrc, advdst, diverge, differ, invalid) = comp

    # only bookmarks that advance on our side are pushed
    for b, scid, dcid in advsrc:
        if ancestors and repo[scid].rev() not in ancestors:
            # bookmark points outside of the pushed set, skip it
            continue
        if remote.pushkey('bookmarks', b, dcid, scid):
            ui.status(_("updating bookmark %s\n") % b)
        else:
            ui.warn(_('updating bookmark %s failed!\n') % b)
480 490
class pulloperation(object):
    """Container for the state of a single pull operation.

    It carries pull related state and very common operations.  A new
    instance should be created at the beginning of each pull and
    discarded afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False):
        # local repository we pull into
        self.repo = repo
        # remote peer we pull from
        self.remote = remote
        # revisions we try to pull (None means "all")
        self.heads = heads
        # do we force the pull?
        self.force = force
        # name of the pull transaction
        self._trname = 'pull\n' + util.hidepassword(remote.url())
        # holds the transaction once created
        self._tr = None
        # set of changesets common to local and remote before pull
        self.common = None
        # set of pulled heads
        self.rheads = None
        # list of missing changesets to fetch remotely
        self.fetch = None
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # steps remaining to do (related to future bundle2 usage)
        self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changeset target by the pull"""
        if self.heads is not None:
            # We pulled a specific subset: sync on that subset
            return self.heads
        # We pulled everything possible: sync on everything common
        seen = set(self.common)
        subset = list(self.common)
        subset.extend(n for n in self.rheads if n not in seen)
        return subset

    def gettransaction(self):
        """get appropriate pull transaction, creating it if needed"""
        if self._tr is None:
            self._tr = self.repo.transaction(self._trname)
        return self._tr

    def closetransaction(self):
        """close transaction if created"""
        if self._tr is not None:
            self._tr.close()

    def releasetransaction(self):
        """release transaction if created"""
        if self._tr is not None:
            self._tr.release()
547 557
def pull(repo, remote, heads=None, force=False):
    """Pull changesets from ``remote`` into ``repo``.

    ``heads`` limits the pull to those revisions (None means everything
    available).  Returns the changegroup result code recorded in
    ``pullop.cgresult``.
    """
    pullop = pulloperation(repo, remote, heads, force)
    if pullop.remote.local():
        # local-to-local pull: refuse when this repo cannot read the
        # source repository's on-disk format
        missing = set(pullop.remote.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    lock = pullop.repo.lock()
    try:
        _pulldiscovery(pullop)
        # bundle2 handles several steps at once; any step it performed is
        # removed from pullop.todosteps so the fallbacks below skip it
        if (pullop.repo.ui.configbool('experimental', 'bundle2-exp', False)
            and pullop.remote.capable('bundle2-exp')):
            _pullbundle2(pullop)
        if 'changegroup' in pullop.todosteps:
            _pullchangeset(pullop)
        if 'phases' in pullop.todosteps:
            _pullphase(pullop)
        if 'obsmarkers' in pullop.todosteps:
            _pullobsolete(pullop)
        pullop.closetransaction()
    finally:
        # release is a no-op on a closed transaction
        pullop.releasetransaction()
        lock.release()

    return pullop.cgresult
576 586
def _pulldiscovery(pullop):
    """discovery phase for the pull

    Current handle changeset discovery only, will change handle all discovery
    at some point."""
    common, fetch, rheads = discovery.findcommonincoming(
        pullop.repo.unfiltered(),
        pullop.remote,
        heads=pullop.heads,
        force=pullop.force)
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads
587 597
def _pullbundle2(pullop):
    """pull data using bundle2

    For now, the only supported data are changegroup."""
    remotecaps = bundle2.bundle2caps(pullop.remote)
    kwargs = {'bundlecaps': caps20to10(pullop.repo)}
    # pulling changegroup
    pullop.todosteps.remove('changegroup')

    kwargs['common'] = pullop.common
    kwargs['heads'] = pullop.heads or pullop.rheads
    if 'b2x:listkeys' in remotecaps:
        kwargs['listkeys'] = ['phase']
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
    else:
        if pullop.heads is None and list(pullop.common) == [nullid]:
            pullop.repo.ui.status(_("requesting all changes\n"))
    _pullbundle2extraprepare(pullop, kwargs)
    if kwargs.keys() == ['format']:
        return # nothing to pull
    bundle = pullop.remote.getbundle('pull', **kwargs)
    try:
        op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
    except error.BundleValueError as exc:
        # 'as' form: valid on Python 2.6+ and Python 3, unlike the old
        # comma spelling
        raise util.Abort('missing support for %s' % exc)

    if pullop.fetch:
        assert len(op.records['changegroup']) == 1
        pullop.cgresult = op.records['changegroup'][0]['return']

    # processing phases change
    for namespace, value in op.records['listkeys']:
        if namespace == 'phases':
            _pullapplyphases(pullop, value)
624 634
625 635 def _pullbundle2extraprepare(pullop, kwargs):
626 636 """hook function so that extensions can extend the getbundle call"""
627 637 pass
628 638
def _pullchangeset(pullop):
    """pull changeset from unbundle into the local repo"""
    # We delay the open of the transaction as late as possible so we
    # don't open transaction for nothing or you break future useful
    # rollback call
    pullop.todosteps.remove('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    remote = pullop.remote
    if remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = remote.getbundle('pull', common=pullop.common,
                              heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = remote.changegroup(pullop.fetch, 'pull')
    elif not remote.capable('changegroupsubset'):
        raise util.Abort(_("partial pull cannot be done because "
                           "other repository doesn't support "
                           "changegroupsubset."))
    else:
        cg = remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
                                                 pullop.remote.url())
660 670
def _pullphase(pullop):
    """Fetch remote phase data and apply it locally."""
    remotephases = pullop.remote.listkeys('phases')
    _pullapplyphases(pullop, remotephases)
665 675
def _pullapplyphases(pullop, remotephases):
    """apply phase movement from observed remote state"""
    pullop.todosteps.remove('phases')
    publishing = bool(remotephases.get('publishing', False))
    if not remotephases or publishing:
        # Remote is old or publishing: all common changesets should be
        # seen as public
        phases.advanceboundary(pullop.repo, phases.public,
                               pullop.pulledsubset)
        return
    # remote is new and unpublishing
    pheads, _dr = phases.analyzeremotephases(pullop.repo,
                                             pullop.pulledsubset,
                                             remotephases)
    phases.advanceboundary(pullop.repo, phases.public, pheads)
    phases.advanceboundary(pullop.repo, phases.draft,
                           pullop.pulledsubset)
683 693
def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    The `gettransaction` is function that return the pull transaction, creating
    one if necessary. We return the transaction to inform the calling code that
    a new transaction have been created (when applicable).

    Exists mostly to allow overriding for experimentation purpose"""
    pullop.todosteps.remove('obsmarkers')
    tr = None
    if not obsolete._enabled:
        return tr
    pullop.repo.ui.debug('fetching remote obsolete markers\n')
    remoteobs = pullop.remote.listkeys('obsolete')
    if 'dump0' in remoteobs:
        tr = pullop.gettransaction()
        # reverse sort to ensure we end with dump0
        for key in sorted(remoteobs, reverse=True):
            if key.startswith('dump'):
                data = base85.b85decode(remoteobs[key])
                pullop.repo.obsstore.mergemarkers(tr, data)
        pullop.repo.invalidatevolatilesets()
    return tr
705 715
def caps20to10(repo):
    """return a set with appropriate options to use bundle20 during getbundle"""
    capsblob = bundle2.encodecaps(repo.bundle2caps)
    return set(['HG2X', 'bundle2=' + urllib.quote(capsblob)])
712 722
def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
              **kwargs):
    """return a full bundle (with potentially multiple kind of parts)

    Could be a bundle HG10 or a bundle HG2X depending on bundlecaps
    passed. For now, the bundle can contain only changegroup, but this will
    changes when more part type will be available for bundle2.

    This is different from changegroup.getbundle that only returns an HG10
    changegroup bundle. They may eventually get reunited in the future when we
    have a clearer idea of the API we what to query different data.

    The implementation is at a very early stage and will get massive rework
    when the API of bundle is refined.
    """
    # build changegroup bundle here.
    cg = changegroup.getbundle(repo, source, heads=heads,
                               common=common, bundlecaps=bundlecaps)
    if bundlecaps is None or 'HG2X' not in bundlecaps:
        # plain HG10 path: no extra arguments are understood
        if kwargs:
            raise ValueError(_('unsupported getbundle arguments: %s')
                             % ', '.join(sorted(kwargs.keys())))
        return cg
    # very crude first implementation,
    # the bundle API will change and the generation will be done lazily.
    b2caps = {}
    for cap in bundlecaps:
        if cap.startswith('bundle2='):
            blob = urllib.unquote(cap[len('bundle2='):])
            b2caps.update(bundle2.decodecaps(blob))
    bundler = bundle2.bundle20(repo.ui, b2caps)
    if cg:
        bundler.newpart('b2x:changegroup', data=cg.getchunks())
    for namespace in kwargs.get('listkeys', ()):
        part = bundler.newpart('b2x:listkeys')
        part.addparam('namespace', namespace)
        part.data = pushkey.encodekeys(repo.listkeys(namespace).items())
    _getbundleextrapart(bundler, repo, source, heads=heads, common=common,
                        bundlecaps=bundlecaps, **kwargs)
    return util.chunkbuffer(bundler.getchunks())
755 765
756 766 def _getbundleextrapart(bundler, repo, source, heads=None, common=None,
757 767 bundlecaps=None, **kwargs):
758 768 """hook function to let extensions add parts to the requested bundle"""
759 769 pass
760 770
def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling.
    """
    heads = repo.heads()
    heads_hash = util.sha1(''.join(sorted(heads))).digest()
    # accepted forms: the literal 'force', the exact current head list,
    # or a hash of the head list
    if their_heads not in (['force'], heads, ['hashed', heads_hash]):
        # someone else committed/pushed/unbundled while we
        # were transferring data
        raise error.PushRaced('repository changed while %s - '
                              'please try again' % context)
774 784
def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    this function makes sure the repo is locked during the application and have
    mechanism to check that no push race occurred between the creation of the
    bundle and its application.

    If the push was raced a PushRaced exception is raised."""
    r = 0
    # need a transaction when processing a bundle2 stream
    tr = None
    lock = repo.lock()
    try:
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        if util.safehasattr(cg, 'params'):
            # bundle2 stream: process under a transaction and fire the
            # b2x transaction hooks
            try:
                tr = repo.transaction('unbundle')
                tr.hookargs['bundle2-exp'] = '1'
                r = bundle2.processbundle(repo, cg, lambda: tr).reply
                cl = repo.unfiltered().changelog
                p = cl.writepending() and repo.root or ""
                repo.hook('b2x-pretransactionclose', throw=True, source=source,
                          url=url, pending=p, **tr.hookargs)
                tr.close()
                repo.hook('b2x-transactionclose', source=source, url=url,
                          **tr.hookargs)
            except Exception as exc:
                # 'as' form: valid on Python 2.6+ and Python 3.
                # tag the failure so callers can tell a bundle2 error apart
                exc.duringunbundle2 = True
                raise
        else:
            r = changegroup.addchangegroup(repo, cg, source, url)
    finally:
        if tr is not None:
            tr.release()
        lock.release()
    return r
General Comments 0
You need to be logged in to leave comments. Login now