##// END OF EJS Templates
bundle2: allow extensions to plug into the push process...
Pierre-Yves David -
r21149:c0d96bce default
parent child Browse files
Show More
@@ -1,721 +1,737
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib
11 11 import util, scmutil, changegroup, base85
12 12 import discovery, phases, obsolete, bookmarks, bundle2
13 13
def readbundle(ui, fh, fname, vfs=None):
    """Sniff the 4-byte bundle header of *fh* and return an unbundler.

    Returns an unbundle10 object for HG10 streams and an unbundle20
    object for HG2X streams; aborts on any other magic/version.
    """
    header = changegroup.readexactly(fh, 4)

    compression = None
    if fname:
        if vfs:
            fname = vfs.join(fname)
    else:
        fname = "stream"
        # a raw, headerless changegroup: re-attach the bytes we already
        # consumed and treat it as an uncompressed HG10 bundle
        if header.startswith('\0') and not header.startswith('HG'):
            fh = changegroup.headerlessfixup(fh, header)
            header = "HG10"
            compression = 'UN'

    magic = header[0:2]
    version = header[2:4]
    if magic != 'HG':
        raise util.Abort(_('%s: not a Mercurial bundle') % fname)
    if version == '10':
        if compression is None:
            compression = changegroup.readexactly(fh, 2)
        return changegroup.unbundle10(fh, compression)
    if version == '2X':
        return bundle2.unbundle20(ui, fh, header=magic + version)
    raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
39 39
40 40
class pushoperation(object):
    """State holder for a single push operation.

    Carries push-related state and the most common push operations.
    A fresh instance should be created at the beginning of each push
    and discarded afterward.
    """

    def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
        # repo we push from
        self.repo = repo
        self.ui = repo.ui
        # repo we push to
        self.remote = remote
        # force option provided
        self.force = force
        # revs to be pushed (None is "all")
        self.revs = revs
        # allow push of new branch
        self.newbranch = newbranch
        # did a local lock get acquired? (None until we tried)
        self.locallocked = None
        # Integer version of the push result:
        # - None  : nothing to push
        # - 0     : HTTP error
        # - 1     : pushed and remote head count unchanged, *or* there
        #           were outgoing changesets but we refused to push
        # - other : as described by addchangegroup()
        self.ret = None
        # discovery.outgoing object (contains common and outgoing data)
        self.outgoing = None
        # all remote heads before the push
        self.remoteheads = None
        # testable as a boolean indicating if any nodes are missing locally.
        self.incoming = None
        # set of all heads common after changeset bundle push
        self.commonheads = None
79 79
def push(repo, remote, force=False, revs=None, newbranch=False):
    '''Push outgoing changesets (limited by revs) from a local
    repository to remote. Return an integer:
      - None means nothing to push
      - 0 means HTTP error
      - 1 means we pushed and remote head count is unchanged *or*
        we have outgoing changesets but refused to push
      - other values as described by addchangegroup()
    '''
    pushop = pushoperation(repo, remote, force, revs, newbranch)
    if pushop.remote.local():
        # pushing over the filesystem: refuse when the destination
        # repository does not support one of our requirements
        missing = (set(pushop.repo.requirements)
                   - pushop.remote.local().supported)
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    # there are two ways to push to remote repo:
    #
    # addchangegroup assumes local user can lock remote
    # repo (local filesystem, old ssh servers).
    #
    # unbundle assumes local user cannot lock remote repo (new ssh
    # servers, http servers).

    if not pushop.remote.canpush():
        raise util.Abort(_("destination does not support push"))
    # get local lock as we might write phase data
    locallock = None
    try:
        locallock = pushop.repo.lock()
        pushop.locallocked = True
    except IOError, err:
        pushop.locallocked = False
        # only a permission problem is tolerated; anything else is fatal
        if err.errno != errno.EACCES:
            raise
        # source repo cannot be locked.
        # We do not abort the push, but just disable the local phase
        # synchronisation.
        msg = 'cannot lock source repository: %s\n' % err
        pushop.ui.debug(msg)
    try:
        pushop.repo.checkpush(pushop)
        lock = None
        unbundle = pushop.remote.capable('unbundle')
        if not unbundle:
            # remote cannot receive a bundle stream: we must lock it and
            # use the direct addchangegroup path instead
            lock = pushop.remote.lock()
        try:
            _pushdiscovery(pushop)
            if _pushcheckoutgoing(pushop):
                pushop.repo.prepushoutgoinghooks(pushop.repo,
                                                 pushop.remote,
                                                 pushop.outgoing)
                # bundle2 is still experimental: only used when both the
                # local config and the remote capability opt in
                if (pushop.repo.ui.configbool('experimental', 'bundle2-exp',
                                              False)
                    and pushop.remote.capable('bundle2-exp')):
                    _pushbundle2(pushop)
                else:
                    _pushchangeset(pushop)
            _pushcomputecommonheads(pushop)
            _pushsyncphase(pushop)
            _pushobsolete(pushop)
        finally:
            if lock is not None:
                lock.release()
    finally:
        if locallock is not None:
            locallock.release()

    # bookmarks are exchanged outside the locks on purpose
    _pushbookmark(pushop)
    return pushop.ret
153 153
def _pushdiscovery(pushop):
    """Run changeset discovery against the remote and record the
    results (outgoing set, remote heads, incoming flag) on *pushop*."""
    unfi = pushop.repo.unfiltered()
    commoninc = discovery.findcommonincoming(unfi, pushop.remote,
                                             force=pushop.force)
    _common, inc, remoteheads = commoninc
    pushop.outgoing = discovery.findcommonoutgoing(unfi, pushop.remote,
                                                   onlyheads=pushop.revs,
                                                   commoninc=commoninc,
                                                   force=pushop.force)
    pushop.remoteheads = remoteheads
    pushop.incoming = inc
166 166
def _pushcheckoutgoing(pushop):
    """Validate the outgoing set; return True when there is data to push.

    Aborts when a non-forced push would publish obsolete/troubled
    changesets or create unexpected remote heads.
    """
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    if not outgoing.missing:
        # nothing to push
        scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
        return False
    # something to push
    if not pushop.force:
        # skip the whole scan when the repo has no obsstore at all
        if unfi.obsstore:
            # these messages are kept terse for 80-char-limit reasons
            mso = _("push includes obsolete changeset: %s!")
            mst = "push includes %s changeset: %s!"
            # plain versions for i18n tool to detect them
            _("push includes unstable changeset: %s!")
            _("push includes bumped changeset: %s!")
            _("push includes divergent changeset: %s!")
            # If there is at least one obsolete or unstable changeset in
            # missing, at least one of the missingheads will be obsolete
            # or unstable. So checking heads only is ok.
            for nodeid in outgoing.missingheads:
                ctx = unfi[nodeid]
                if ctx.obsolete():
                    raise util.Abort(mso % ctx)
                elif ctx.troubled():
                    raise util.Abort(_(mst) % (ctx.troubles()[0], ctx))
        newbm = pushop.ui.configlist('bookmarks', 'pushing')
        discovery.checkheads(unfi, pushop.remote, outgoing,
                             pushop.remoteheads, pushop.newbranch,
                             bool(pushop.incoming), newbm)
    return True
205 205
def _pushbundle2(pushop):
    """push data to the remote using bundle2

    The only currently supported type of data is changegroup but this will
    evolve in the future."""
    # the server advertises its bundle2 capabilities URL-quoted inside
    # its 'bundle2-exp' capability blob
    capsblob = urllib.unquote(pushop.remote.capable('bundle2-exp'))
    caps = bundle2.decodecaps(capsblob)
    bundler = bundle2.bundle20(pushop.ui, caps)
    # create reply capability: advertise our own bundle2 capabilities so
    # the server can build a reply bundle we understand
    capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
    bundler.addpart(bundle2.bundlepart('b2x:replycaps', data=capsblob))
    if not pushop.force:
        # Send known heads to the server for race detection (skipped
        # with --force, which deliberately bypasses the race check).
        part = bundle2.bundlepart('B2X:CHECK:HEADS',
                                  data=iter(pushop.remoteheads))
        bundler.addpart(part)
    # let extensions contribute their own parts before the changegroup;
    # the returned dict is passed back to the reply hook below
    extrainfo = _pushbundle2extraparts(pushop, bundler)
    # add the changegroup bundle
    cg = changegroup.getlocalbundle(pushop.repo, 'push', pushop.outgoing)
    cgpart = bundle2.bundlepart('B2X:CHANGEGROUP', data=cg.getchunks())
    bundler.addpart(cgpart)
    stream = util.chunkbuffer(bundler.getchunks())
    reply = pushop.remote.unbundle(stream, ['force'], 'push')
    try:
        op = bundle2.processbundle(pushop.repo, reply)
    except KeyError, exc:
        # the reply contains a part type we do not know how to process
        raise util.Abort('missing support for %s' % exc)
    # extract the changegroup return code from the server's reply part
    cgreplies = op.records.getreplies(cgpart.id)
    assert len(cgreplies['changegroup']) == 1
    pushop.ret = cgreplies['changegroup'][0]['return']
    # give extensions a chance to process replies to their own parts
    _pushbundle2extrareply(pushop, op, extrainfo)
237
238 def _pushbundle2extraparts(pushop, bundler):
239 """hook function to let extensions add parts
240
241 Return a dict to let extensions pass data to the reply processing.
242 """
243 return {}
244
245 def _pushbundle2extrareply(pushop, op, extrainfo):
246 """hook function to let extensions react to part replies
247
248 The dict from _pushbundle2extrareply is fed to this function.
249 """
250 pass
235 251
def _pushchangeset(pushop):
    """Push the changeset bundle itself to the remote repo and store
    the remote's return code on ``pushop.ret``."""
    outgoing = pushop.outgoing
    unbundle = pushop.remote.capable('unbundle')
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # build a changegroup from the local repo
    fastpathok = (pushop.revs is None
                  and not outgoing.excluded
                  and not pushop.repo.changelog.filteredrevs)
    if fastpathok:
        # pushing everything: no race is possible on push, so the fast
        # path can be used
        bundler = changegroup.bundle10(pushop.repo, bundlecaps)
        cg = changegroup.getsubset(pushop.repo, outgoing, bundler, 'push',
                                   fastpath=True)
    else:
        cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
                                        bundlecaps)

    # apply the changegroup on the remote side
    if not unbundle:
        # we return an integer indicating remote head count change
        pushop.ret = pushop.remote.addchangegroup(cg, 'push',
                                                  pushop.repo.url())
        return
    # local repo finds heads on server, finds out what revs it must
    # push. once revs transferred, if server finds it has different
    # heads (someone else won commit/push race), server aborts.
    if pushop.force:
        remoteheads = ['force']
    else:
        remoteheads = pushop.remoteheads
    # ssh: return remote's addchangegroup()
    # http: return remote's addchangegroup() or 0 for error
    pushop.ret = pushop.remote.unbundle(cg, remoteheads, 'push')
275 291
def _pushcomputecommonheads(pushop):
    """Compute the heads common to both sides after the push attempt
    and store them on ``pushop.commonheads``."""
    unfi = pushop.repo.unfiltered()
    if pushop.ret:
        # push succeeded: synchronize on the target of the push
        cheads = pushop.outgoing.missingheads
    elif pushop.revs is None:
        # an "all-out" push failed: synchronize on everything common
        cheads = pushop.outgoing.commonheads
    else:
        # We want cheads = heads(::missingheads and ::commonheads)
        # (missingheads is revs with secret changeset filtered out)
        #
        # This can be expressed as:
        #     cheads = ( (missingheads and ::commonheads)
        #              + (commonheads and ::missingheads))"
        #              )
        #
        # while trying to push we already computed the following:
        #     common = (::commonheads)
        #     missing = ((commonheads::missingheads) - commonheads)
        #
        # We can pick:
        # * missingheads part of common (::commonheads)
        common = set(pushop.outgoing.common)
        nodemap = pushop.repo.changelog.nodemap
        cheads = [n for n in pushop.revs if nodemap[n] in common]
        # and
        # * commonheads parents on missing
        revset = unfi.set('%ln and parents(roots(%ln))',
                          pushop.outgoing.commonheads,
                          pushop.outgoing.missing)
        cheads.extend(c.node() for c in revset)
    pushop.commonheads = cheads
309 325
def _pushsyncphase(pushop):
    """synchronise phase information locally and remotely

    Relies on ``pushop.commonheads`` being already populated;
    ``push()`` always runs ``_pushcomputecommonheads`` right before
    calling this function.
    """
    unfi = pushop.repo.unfiltered()
    # The common-heads computation was previously duplicated verbatim
    # from _pushcomputecommonheads here (and it even re-stored
    # pushop.commonheads with the same value); reuse the stored result
    # instead of recomputing it.
    cheads = pushop.commonheads
    # even when we don't push, exchanging phase data is useful
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo', False)
        and remotephases    # server supports phases
        and pushop.ret is None # nothing was pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and remote support phase
        # - and no changeset was pushed
        # - and remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    if not remotephases: # old server or public only reply from non-publishing
        _localphasemove(pushop, cheads)
        # don't push any phase data as there is nothing to push
    else:
        ana = phases.analyzeremotephases(pushop.repo, cheads,
                                         remotephases)
        pheads, droots = ana
        ### Apply remote phase on local
        if remotephases.get('publishing', False):
            _localphasemove(pushop, cheads)
        else: # publish = False
            _localphasemove(pushop, pheads)
            _localphasemove(pushop, cheads, phases.draft)
        ### Apply local phase on remote

        # Get the list of all revs draft on remote by public here.
        # XXX Beware that revset break if droots is not strictly
        # XXX root we may want to ensure it is but it is costly
        outdated = unfi.set('heads((%ln::%ln) and public())',
                            droots, cheads)
        for newremotehead in outdated:
            r = pushop.remote.pushkey('phases',
                                      newremotehead.hex(),
                                      str(phases.draft),
                                      str(phases.public))
            if not r:
                pushop.ui.warn(_('updating %s to public failed!\n')
                               % newremotehead)
389 405
def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo"""
    if not pushop.locallocked:
        # the source repo is not locked: we must not touch any phase.
        # Just inform the user that phases should have been moved when
        # applicable.
        actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
        if actualmoves:
            phasestr = phases.phasenames[phase]
            pushop.ui.status(_('cannot lock source repo, skipping '
                               'local %s phase update\n') % phasestr)
        return
    phases.advanceboundary(pushop.repo, phase, nodes)
403 419
def _pushobsolete(pushop):
    """utility function to push obsolete markers to a remote"""
    pushop.ui.debug('try to push obsolete markers to remote\n')
    repo = pushop.repo
    remote = pushop.remote
    if (obsolete._enabled and repo.obsstore and
        'obsolete' in remote.listkeys('namespaces')):
        results = []
        # note: these are the *local* obsolete pushkey entries being
        # sent to the remote
        markers = repo.listkeys('obsolete')
        # reverse sort to ensure we end with dump0
        for key in sorted(markers, reverse=True):
            results.append(remote.pushkey('obsolete', key, '',
                                          markers[key]))
        if [ok for ok in results if not ok]:
            repo.ui.warn(_('failed to push some obsolete markers!\n'))
420 436
def _pushbookmark(pushop):
    """Update bookmark position on remote"""
    ui = pushop.ui
    repo = pushop.repo.unfiltered()
    remote = pushop.remote
    ui.debug("checking for updated bookmarks\n")
    revnums = map(repo.changelog.rev, pushop.revs or [])
    ancestors = [a for a in repo.changelog.ancestors(revnums, inclusive=True)]
    comparison = bookmarks.compare(repo, repo._bookmarks,
                                   remote.listkeys('bookmarks'),
                                   srchex=hex)
    (addsrc, adddst, advsrc, advdst, diverge, differ, invalid) = comparison

    # only bookmarks that advanced on our side and whose target was
    # actually pushed (is an ancestor of the pushed revs) are updated
    for b, scid, dcid in advsrc:
        if ancestors and repo[scid].rev() not in ancestors:
            continue
        if remote.pushkey('bookmarks', b, dcid, scid):
            ui.status(_("updating bookmark %s\n") % b)
        else:
            ui.warn(_('updating bookmark %s failed!\n') % b)
440 456
class pulloperation(object):
    """State holder for a single pull operation.

    Carries pull-related state and the most common pull operations.
    A fresh instance should be created at the beginning of each pull
    and discarded afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False):
        # repo we pull into
        self.repo = repo
        # repo we pull from
        self.remote = remote
        # revision we try to pull (None is "all")
        self.heads = heads
        # do we force pull?
        self.force = force
        # the name the pull transaction
        self._trname = 'pull\n' + util.hidepassword(remote.url())
        # hold the transaction once created
        self._tr = None
        # set of common changeset between local and remote before pull
        self.common = None
        # set of pulled head
        self.rheads = None
        # list of missing changeset to fetch remotely
        self.fetch = None
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # list of step remaining todo (related to future bundle2 usage)
        self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changeset target by the pull"""
        if self.heads is not None:
            # a specific subset was pulled: sync on that subset
            return self.heads
        # everything possible was pulled: sync on everything common,
        # preserving order while skipping heads already in common
        subset = list(self.common)
        seen = set(self.common)
        for head in self.rheads:
            if head not in seen:
                subset.append(head)
        return subset

    def gettransaction(self):
        """return the pull transaction, creating it when needed"""
        if self._tr is None:
            self._tr = self.repo.transaction(self._trname)
        return self._tr

    def closetransaction(self):
        """commit the transaction when one was created"""
        if self._tr is not None:
            self._tr.close()

    def releasetransaction(self):
        """release the transaction when one was created"""
        if self._tr is not None:
            self._tr.release()
507 523
def pull(repo, remote, heads=None, force=False):
    """Pull changesets (limited by *heads*) from *remote* into *repo*.

    Returns the changegroup result code (as set by addchangegroup) or
    None/0 when no changegroup was pulled.
    """
    pullop = pulloperation(repo, remote, heads, force)
    if pullop.remote.local():
        # pulling over the filesystem: refuse when we cannot handle one
        # of the source repository's requirements
        missing = set(pullop.remote.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    lock = pullop.repo.lock()
    try:
        _pulldiscovery(pullop)
        # gate bundle2 on the same experimental config knob as the push
        # path so that enabling it affects both directions consistently
        # (the old 'server.bundle2' key was a leftover from an earlier
        # rename of the capability to 'bundle2-exp')
        if (pullop.repo.ui.configbool('experimental', 'bundle2-exp', False)
            and pullop.remote.capable('bundle2-exp')):
            _pullbundle2(pullop)
        # each helper removes its step from todosteps once handled, so
        # a bundle2 pull skips the legacy paths it already covered
        if 'changegroup' in pullop.todosteps:
            _pullchangeset(pullop)
        if 'phases' in pullop.todosteps:
            _pullphase(pullop)
        if 'obsmarkers' in pullop.todosteps:
            _pullobsolete(pullop)
        pullop.closetransaction()
    finally:
        pullop.releasetransaction()
        lock.release()

    return pullop.cgresult
536 552
def _pulldiscovery(pullop):
    """discovery phase for the pull

    Currently handles changeset discovery only; it will eventually
    handle all discovery at some point."""
    common, fetch, rheads = discovery.findcommonincoming(
        pullop.repo.unfiltered(), pullop.remote,
        heads=pullop.heads, force=pullop.force)
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads
547 563
def _pullbundle2(pullop):
    """pull data using bundle2

    For now, the only supported data are changegroup."""
    # advertise HG2X support plus our own bundle2 capabilities
    # (URL-quoted) to the server
    kwargs = {'bundlecaps': set(['HG2X'])}
    capsblob = bundle2.encodecaps(pullop.repo.bundle2caps)
    kwargs['bundlecaps'].add('bundle2=' + urllib.quote(capsblob))
    # pulling changegroup
    pullop.todosteps.remove('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
    else:
        kwargs['common'] = pullop.common
        kwargs['heads'] = pullop.heads or pullop.rheads
        if pullop.heads is None and list(pullop.common) == [nullid]:
            pullop.repo.ui.status(_("requesting all changes\n"))
    # NOTE(review): kwargs always contains 'bundlecaps' at this point,
    # so this early-return guard looks unreachable -- confirm before
    # relying on it
    if kwargs.keys() == ['format']:
        return # nothing to pull
    bundle = pullop.remote.getbundle('pull', **kwargs)
    try:
        op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
    except KeyError, exc:
        # the bundle contains a part type we do not know how to process
        raise util.Abort('missing support for %s' % exc)
    # extract the changegroup return code recorded during processing
    assert len(op.records['changegroup']) == 1
    pullop.cgresult = op.records['changegroup'][0]['return']
574 590
def _pullchangeset(pullop):
    """pull changeset from unbundle into the local repo"""
    # We delay opening the transaction as long as possible so we don't
    # open one for nothing and don't break future useful rollback calls
    pullop.todosteps.remove('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    remote = pullop.remote
    if remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = remote.getbundle('pull', common=pullop.common,
                              heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = remote.changegroup(pullop.fetch, 'pull')
    elif not remote.capable('changegroupsubset'):
        raise util.Abort(_("partial pull cannot be done because "
                           "other repository doesn't support "
                           "changegroupsubset."))
    else:
        cg = remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
                                                 remote.url())
606 622
def _pullphase(pullop):
    """synchronize local phase information with the remote's"""
    pullop.todosteps.remove('phases')
    remotephases = pullop.remote.listkeys('phases')
    publishing = bool(remotephases.get('publishing', False))
    if not remotephases or publishing:
        # the remote is either an old server (no phase data) or a
        # publishing one: all common changesets should be seen as public
        phases.advanceboundary(pullop.repo, phases.public,
                               pullop.pulledsubset)
    else:
        # the remote is new and non-publishing
        pheads, _dr = phases.analyzeremotephases(pullop.repo,
                                                 pullop.pulledsubset,
                                                 remotephases)
        phases.advanceboundary(pullop.repo, phases.public, pheads)
        phases.advanceboundary(pullop.repo, phases.draft,
                               pullop.pulledsubset)
625 641
def _pullobsolete(pullop):
    """pull obsolete markers from the remote when the feature is enabled

    Returns the pull transaction when one had to be created, so the
    calling code knows a transaction now exists; returns None otherwise.
    Exists mostly as an override point for experimentation.
    """
    pullop.todosteps.remove('obsmarkers')
    tr = None
    if not obsolete._enabled:
        return tr
    pullop.repo.ui.debug('fetching remote obsolete markers\n')
    remoteobs = pullop.remote.listkeys('obsolete')
    if 'dump0' in remoteobs:
        tr = pullop.gettransaction()
        # reverse sort so that we end with dump0
        for key in sorted(remoteobs, reverse=True):
            if key.startswith('dump'):
                data = base85.b85decode(remoteobs[key])
                pullop.repo.obsstore.mergemarkers(tr, data)
        pullop.repo.invalidatevolatilesets()
    return tr
647 663
def getbundle(repo, source, heads=None, common=None, bundlecaps=None):
    """return a full bundle (with potentially multiple kind of parts)

    Returns a plain HG10 changegroup when the caller did not advertise
    HG2X support through *bundlecaps*, and an HG2X stream wrapping the
    changegroup otherwise.

    This is different from changegroup.getbundle, which only returns an
    HG10 changegroup bundle; the two may eventually be reunited once the
    query API is clearer. The implementation is at a very early stage
    and will get massive rework when the bundle API is refined.
    """
    # build bundle here.
    cg = changegroup.getbundle(repo, source, heads=heads,
                               common=common, bundlecaps=bundlecaps)
    if bundlecaps is None or 'HG2X' not in bundlecaps:
        return cg
    # very crude first implementation,
    # the bundle API will change and the generation will be done lazily.
    b2caps = {}
    prefix = 'bundle2='
    for cap in bundlecaps:
        if cap.startswith(prefix):
            blob = urllib.unquote(cap[len(prefix):])
            b2caps.update(bundle2.decodecaps(blob))
    bundler = bundle2.bundle20(repo.ui, b2caps)
    bundler.addpart(bundle2.bundlepart('b2x:changegroup',
                                       data=cg.getchunks()))
    return util.chunkbuffer(bundler.getchunks())
678 694
class PushRaced(RuntimeError):
    """Raised during unbundling when a push race has been detected."""
681 697
def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling.
    """
    heads = repo.heads()
    heads_hash = util.sha1(''.join(sorted(heads))).digest()
    if (their_heads == ['force'] or their_heads == heads or
        their_heads == ['hashed', heads_hash]):
        return
    # someone else committed/pushed/unbundled while we were
    # transferring data
    raise PushRaced('repository changed while %s - '
                    'please try again' % context)
695 711
def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    This function makes sure the repo is locked during the application
    and has a mechanism to check that no push race occurred between the
    creation of the bundle and its application.

    If the push was raced, a PushRaced exception is raised."""
    r = 0
    # need a transaction when processing a bundle2 stream
    tr = None
    lock = repo.lock()
    try:
        # raises PushRaced when the repository changed since the caller
        # computed `heads`
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        if util.safehasattr(cg, 'params'):
            # a 'params' attribute distinguishes a bundle2 stream from a
            # plain changegroup -- presumably only unbundle20 has it;
            # TODO confirm
            tr = repo.transaction('unbundle')
            r = bundle2.processbundle(repo, cg, lambda: tr).reply
            tr.close()
        else:
            r = changegroup.addchangegroup(repo, cg, source, url)
    finally:
        if tr is not None:
            # no-op after a successful tr.close(); rolls back on error
            tr.release()
        lock.release()
    return r
General Comments 0
You need to be logged in to leave comments. Login now