##// END OF EJS Templates
bundle2: include client capabilities in the pushed bundle...
Pierre-Yves David -
r21142:15039ce3 default
parent child Browse files
Show More
@@ -1,708 +1,710 b''
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib
11 11 import util, scmutil, changegroup, base85
12 12 import discovery, phases, obsolete, bookmarks, bundle2
13 13
def readbundle(ui, fh, fname, vfs=None):
    """Read a bundle header from ``fh`` and return the matching unbundler.

    ``fname`` is used for error reporting only; when ``vfs`` is given it is
    used to expand ``fname`` to a full path. A headerless stream (starting
    with '\\0') is wrapped and treated as an uncompressed HG10 bundle.
    """
    magicstring = changegroup.readexactly(fh, 4)

    compression = None
    if not fname:
        fname = "stream"
    if not magicstring.startswith('HG') and magicstring.startswith('\0'):
        # headerless changegroup stream: fix it up and assume uncompressed
        fh = changegroup.headerlessfixup(fh, magicstring)
        magicstring = "HG10"
        compression = 'UN'
    elif vfs:
        fname = vfs.join(fname)

    magic, version = magicstring[0:2], magicstring[2:4]

    if magic != 'HG':
        raise util.Abort(_('%s: not a Mercurial bundle') % fname)
    if version == '10':
        if compression is None:
            # the two-byte compression marker follows the header
            compression = changegroup.readexactly(fh, 2)
        return changegroup.unbundle10(fh, compression)
    elif version == '20':
        return bundle2.unbundle20(ui, fh, header=magic + version)
    raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
39 39
40 40
class pushoperation(object):
    """A container for the state of a single push operation.

    Its purpose is to carry push-related state and very common operations.

    A new one should be created at the beginning of each push and discarded
    afterward.
    """

    def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
        # repo we push from
        self.repo = repo
        self.ui = repo.ui
        # repo we push to
        self.remote = remote
        # force option provided
        self.force = force
        # revs to be pushed (None is "all")
        self.revs = revs
        # allow push of new branch
        self.newbranch = newbranch
        # did a local lock get acquired?
        self.locallocked = None
        # Integer version of the push result
        # - None means nothing to push
        # - 0 means HTTP error
        # - 1 means we pushed and remote head count is unchanged *or*
        #   we have outgoing changesets but refused to push
        # - other values as described by addchangegroup()
        self.ret = None
        # discover.outgoing object (contains common and outgoing data)
        self.outgoing = None
        # all remote heads before the push
        self.remoteheads = None
        # testable as a boolean indicating if any nodes are missing locally.
        self.incoming = None
        # set of all heads common after changeset bundle push
        self.commonheads = None
79 79
def push(repo, remote, force=False, revs=None, newbranch=False):
    '''Push outgoing changesets (limited by revs) from a local
    repository to remote. Return an integer:
      - None means nothing to push
      - 0 means HTTP error
      - 1 means we pushed and remote head count is unchanged *or*
        we have outgoing changesets but refused to push
      - other values as described by addchangegroup()
    '''
    pushop = pushoperation(repo, remote, force, revs, newbranch)
    if pushop.remote.local():
        # pushing over the filesystem: refuse if the destination lacks
        # features this repo requires
        missing = (set(pushop.repo.requirements)
                   - pushop.remote.local().supported)
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    # there are two ways to push to remote repo:
    #
    # addchangegroup assumes local user can lock remote
    # repo (local filesystem, old ssh servers).
    #
    # unbundle assumes local user cannot lock remote repo (new ssh
    # servers, http servers).

    if not pushop.remote.canpush():
        raise util.Abort(_("destination does not support push"))
    # get local lock as we might write phase data
    locallock = None
    try:
        locallock = pushop.repo.lock()
        pushop.locallocked = True
    except IOError, err:
        pushop.locallocked = False
        if err.errno != errno.EACCES:
            raise
        # source repo cannot be locked.
        # We do not abort the push, but just disable the local phase
        # synchronisation.
        msg = 'cannot lock source repository: %s\n' % err
        pushop.ui.debug(msg)
    try:
        pushop.repo.checkpush(pushop)
        lock = None
        unbundle = pushop.remote.capable('unbundle')
        if not unbundle:
            # remote cannot accept a pushed bundle: we must lock it and
            # use addchangegroup directly
            lock = pushop.remote.lock()
        try:
            _pushdiscovery(pushop)
            if _pushcheckoutgoing(pushop):
                pushop.repo.prepushoutgoinghooks(pushop.repo,
                                                 pushop.remote,
                                                 pushop.outgoing)
                # prefer the bundle2 protocol when the server advertises it
                if pushop.remote.capable('bundle2'):
                    _pushbundle2(pushop)
                else:
                    _pushchangeset(pushop)
            _pushcomputecommonheads(pushop)
            _pushsyncphase(pushop)
            _pushobsolete(pushop)
        finally:
            if lock is not None:
                lock.release()
    finally:
        if locallock is not None:
            locallock.release()

    # bookmarks are exchanged outside any lock
    _pushbookmark(pushop)
    return pushop.ret
151 151
def _pushdiscovery(pushop):
    """Run changeset discovery against the remote and record the results
    (outgoing set, remote heads, incoming flag) on ``pushop``."""
    unfi = pushop.repo.unfiltered()
    commoninc = discovery.findcommonincoming(unfi, pushop.remote,
                                             force=pushop.force)
    common, inc, remoteheads = commoninc
    pushop.outgoing = discovery.findcommonoutgoing(unfi, pushop.remote,
                                                   onlyheads=pushop.revs,
                                                   commoninc=commoninc,
                                                   force=pushop.force)
    pushop.remoteheads = remoteheads
    pushop.incoming = inc
164 164
def _pushcheckoutgoing(pushop):
    """Validate the outgoing changesets before pushing.

    Returns False when there is nothing to push. Unless --force is in
    effect, aborts when the push would propagate obsolete or troubled
    changesets, or create unexpected new remote heads."""
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    if not outgoing.missing:
        # nothing to push
        scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
        return False
    # something to push
    if not pushop.force:
        # if repo.obsstore == False --> no obsolete
        # then, save the iteration
        if unfi.obsstore:
            # these messages are here for the 80-char limit reason
            mso = _("push includes obsolete changeset: %s!")
            mst = "push includes %s changeset: %s!"
            # plain versions for the i18n tool to detect them
            _("push includes unstable changeset: %s!")
            _("push includes bumped changeset: %s!")
            _("push includes divergent changeset: %s!")
            # If we are to push if there is at least one
            # obsolete or unstable changeset in missing, at
            # least one of the missinghead will be obsolete or
            # unstable. So checking heads only is ok
            for node in outgoing.missingheads:
                ctx = unfi[node]
                if ctx.obsolete():
                    raise util.Abort(mso % ctx)
                elif ctx.troubled():
                    # mst is formatted with the trouble name at runtime
                    raise util.Abort(_(mst)
                                     % (ctx.troubles()[0],
                                        ctx))
        newbm = pushop.ui.configlist('bookmarks', 'pushing')
        discovery.checkheads(unfi, pushop.remote, outgoing,
                             pushop.remoteheads,
                             pushop.newbranch,
                             bool(pushop.incoming),
                             newbm)
    return True
203 203
def _pushbundle2(pushop):
    """push data to the remote using bundle2

    The only currently supported type of data is changegroup but this will
    evolve in the future."""
    # Send known heads to the server for race detection.
    capsblob = urllib.unquote(pushop.remote.capable('bundle2'))
    caps = bundle2.decodecaps(capsblob)
    bundler = bundle2.bundle20(pushop.ui, caps)
    # create reply capability: tell the server what our bundle2 support
    # is so its reply bundle only uses parts we understand
    capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
    bundler.addpart(bundle2.bundlepart('replycaps', data=capsblob))
    if not pushop.force:
        # server aborts if its heads changed since our discovery
        part = bundle2.bundlepart('CHECK:HEADS', data=iter(pushop.remoteheads))
        bundler.addpart(part)
    # add the changegroup bundle
    cg = changegroup.getlocalbundle(pushop.repo, 'push', pushop.outgoing)
    cgpart = bundle2.bundlepart('CHANGEGROUP', data=cg.getchunks())
    bundler.addpart(cgpart)
    stream = util.chunkbuffer(bundler.getchunks())
    reply = pushop.remote.unbundle(stream, ['force'], 'push')
    try:
        op = bundle2.processbundle(pushop.repo, reply)
    except KeyError, exc:
        # reply contained a mandatory part type we do not know
        raise util.Abort('missing support for %s' % exc)
    # extract the addchangegroup-style return code from the reply part
    cgreplies = op.records.getreplies(cgpart.id)
    assert len(cgreplies['changegroup']) == 1
    pushop.ret = cgreplies['changegroup'][0]['return']
230 232
def _pushchangeset(pushop):
    """Make the actual push of changeset bundle to remote repo

    Uses the remote's ``unbundle`` capability when available (no remote
    lock needed), otherwise falls back to ``addchangegroup`` over a
    locked remote. Stores the result code in ``pushop.ret``."""
    outgoing = pushop.outgoing
    unbundle = pushop.remote.capable('unbundle')
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # create a changegroup from local
    if pushop.revs is None and not (outgoing.excluded
                                    or pushop.repo.changelog.filteredrevs):
        # push everything,
        # use the fast path, no race possible on push
        bundler = changegroup.bundle10(pushop.repo, bundlecaps)
        cg = changegroup.getsubset(pushop.repo,
                                   outgoing,
                                   bundler,
                                   'push',
                                   fastpath=True)
    else:
        cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
                                        bundlecaps)

    # apply changegroup to remote
    if unbundle:
        # local repo finds heads on server, finds out what
        # revs it must push. once revs transferred, if server
        # finds it has different heads (someone else won
        # commit/push race), server aborts.
        if pushop.force:
            remoteheads = ['force']
        else:
            remoteheads = pushop.remoteheads
        # ssh: return remote's addchangegroup()
        # http: return remote's addchangegroup() or 0 for error
        pushop.ret = pushop.remote.unbundle(cg, remoteheads,
                                            'push')
    else:
        # we return an integer indicating remote head count
        # change
        pushop.ret = pushop.remote.addchangegroup(cg, 'push', pushop.repo.url())
270 272
def _pushcomputecommonheads(pushop):
    """Compute the set of heads common to both sides after the changeset
    push and store it in ``pushop.commonheads``."""
    unfi = pushop.repo.unfiltered()
    if pushop.ret:
        # push succeed, synchronize target of the push
        cheads = pushop.outgoing.missingheads
    elif pushop.revs is None:
        # All out push fails. synchronize all common
        cheads = pushop.outgoing.commonheads
    else:
        # I want cheads = heads(::missingheads and ::commonheads)
        # (missingheads is revs with secret changeset filtered out)
        #
        # This can be expressed as:
        #     cheads = ( (missingheads and ::commonheads)
        #              + (commonheads and ::missingheads))"
        #              )
        #
        # while trying to push we already computed the following:
        #     common = (::commonheads)
        #     missing = ((commonheads::missingheads) - commonheads)
        #
        # We can pick:
        # * missingheads part of common (::commonheads)
        common = set(pushop.outgoing.common)
        nm = pushop.repo.changelog.nodemap
        cheads = [node for node in pushop.revs if nm[node] in common]
        # and
        # * commonheads parents on missing
        revset = unfi.set('%ln and parents(roots(%ln))',
                          pushop.outgoing.commonheads,
                          pushop.outgoing.missing)
        cheads.extend(c.node() for c in revset)
    pushop.commonheads = cheads
304 306
def _pushsyncphase(pushop):
    """synchronise phase information locally and remotely

    Uses the common-head set produced by _pushcomputecommonheads() (which
    push() always runs immediately before this step) to decide which local
    nodes to advance and which remote heads to turn public via pushkey."""
    unfi = pushop.repo.unfiltered()
    # _pushcomputecommonheads() already derived the post-push common heads
    # with exactly the logic that used to be duplicated here, so reuse its
    # result instead of recomputing it.
    cheads = pushop.commonheads
    # even when we don't push, exchanging phase data is useful
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo', False)
        and remotephases    # server supports phases
        and pushop.ret is None # nothing was pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and remote support phase
        # - and no changeset was pushed
        # - and remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    if not remotephases: # old server or public only reply from non-publishing
        _localphasemove(pushop, cheads)
        # don't push any phase data as there is nothing to push
    else:
        ana = phases.analyzeremotephases(pushop.repo, cheads,
                                         remotephases)
        pheads, droots = ana
        ### Apply remote phase on local
        if remotephases.get('publishing', False):
            _localphasemove(pushop, cheads)
        else: # publish = False
            _localphasemove(pushop, pheads)
            _localphasemove(pushop, cheads, phases.draft)
        ### Apply local phase on remote

        # Get the list of all revs draft on remote by public here.
        # XXX Beware that revset break if droots is not strictly
        # XXX root we may want to ensure it is but it is costly
        outdated = unfi.set('heads((%ln::%ln) and public())',
                            droots, cheads)
        for newremotehead in outdated:
            r = pushop.remote.pushkey('phases',
                                      newremotehead.hex(),
                                      str(phases.draft),
                                      str(phases.public))
            if not r:
                pushop.ui.warn(_('updating %s to public failed!\n')
                               % newremotehead)
384 386
def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo"""
    if not pushop.locallocked:
        # repo is not locked, do not change any phases!
        # Informs the user that phases should have been moved when
        # applicable.
        skipped = [n for n in nodes if phase < pushop.repo[n].phase()]
        name = phases.phasenames[phase]
        if skipped:
            pushop.ui.status(_('cannot lock source repo, skipping '
                               'local %s phase update\n') % name)
        return
    phases.advanceboundary(pushop.repo, phase, nodes)
398 400
def _pushobsolete(pushop):
    """utility function to push obsolete markers to a remote"""
    pushop.ui.debug('try to push obsolete markers to remote\n')
    repo = pushop.repo
    remote = pushop.remote
    if (obsolete._enabled and repo.obsstore and
        'obsolete' in remote.listkeys('namespaces')):
        results = []
        # note: listkeys() on the *local* repo -- these are the marker
        # dumps we are about to send over pushkey
        localdata = repo.listkeys('obsolete')
        # reverse sort to ensure we end with dump0
        for key in sorted(localdata, reverse=True):
            results.append(remote.pushkey('obsolete', key, '',
                                          localdata[key]))
        if not all(results):
            repo.ui.warn(_('failed to push some obsolete markers!\n'))
415 417
def _pushbookmark(pushop):
    """Update bookmark position on remote

    Only bookmarks that point to changesets included in the push (or to
    their ancestors) are advanced on the remote."""
    ui = pushop.ui
    repo = pushop.repo.unfiltered()
    remote = pushop.remote
    ui.debug("checking for updated bookmarks\n")
    revnums = map(repo.changelog.rev, pushop.revs or [])
    ancestors = [a for a in repo.changelog.ancestors(revnums, inclusive=True)]
    (addsrc, adddst, advsrc, advdst, diverge, differ, invalid
     ) = bookmarks.compare(repo, repo._bookmarks, remote.listkeys('bookmarks'),
                           srchex=hex)

    # only bookmarks that advanced locally (advsrc) are pushed
    for b, scid, dcid in advsrc:
        # when pushing a subset of revs, skip bookmarks outside that subset
        if ancestors and repo[scid].rev() not in ancestors:
            continue
        if remote.pushkey('bookmarks', b, dcid, scid):
            ui.status(_("updating bookmark %s\n") % b)
        else:
            ui.warn(_('updating bookmark %s failed!\n') % b)
435 437
class pulloperation(object):
    """A container for the state of a single pull operation.

    Its purpose is to carry pull-related state and very common operations.

    A new one should be created at the beginning of each pull and discarded
    afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False):
        # repo we pull into
        self.repo = repo
        # repo we pull from
        self.remote = remote
        # revision we try to pull (None is "all")
        self.heads = heads
        # do we force pull?
        self.force = force
        # the name of the pull transaction
        self._trname = 'pull\n' + util.hidepassword(remote.url())
        # hold the transaction once created
        self._tr = None
        # set of common changesets between local and remote before pull
        self.common = None
        # set of pulled heads
        self.rheads = None
        # list of missing changesets to fetch remotely
        self.fetch = None
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # list of steps remaining to do (related to future bundle2 usage)
        self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changesets targeted by the pull"""
        # compute target subset
        if self.heads is None:
            # We pulled every thing possible
            # sync on everything common
            c = set(self.common)
            ret = list(self.common)
            for n in self.rheads:
                if n not in c:
                    ret.append(n)
            return ret
        else:
            # We pulled a specific subset
            # sync on this subset
            return self.heads

    def gettransaction(self):
        """get appropriate pull transaction, creating it if needed"""
        if self._tr is None:
            self._tr = self.repo.transaction(self._trname)
        return self._tr

    def closetransaction(self):
        """close transaction if created"""
        if self._tr is not None:
            self._tr.close()

    def releasetransaction(self):
        """release transaction if created"""
        if self._tr is not None:
            self._tr.release()
502 504
def pull(repo, remote, heads=None, force=False):
    """Pull changesets (limited to ``heads``) from ``remote`` into ``repo``.

    Returns the changegroup result code (see addchangegroup()). Steps
    already handled by a bundle2 exchange are removed from
    ``pullop.todosteps`` and skipped by the legacy path below."""
    pullop = pulloperation(repo, remote, heads, force)
    if pullop.remote.local():
        # pulling over the filesystem: refuse if the source requires
        # features we do not support
        missing = set(pullop.remote.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    lock = pullop.repo.lock()
    try:
        _pulldiscovery(pullop)
        if pullop.remote.capable('bundle2'):
            _pullbundle2(pullop)
        if 'changegroup' in pullop.todosteps:
            _pullchangeset(pullop)
        if 'phases' in pullop.todosteps:
            _pullphase(pullop)
        if 'obsmarkers' in pullop.todosteps:
            _pullobsolete(pullop)
        pullop.closetransaction()
    finally:
        # release is a no-op once the transaction has been closed
        pullop.releasetransaction()
        lock.release()

    return pullop.cgresult
530 532
def _pulldiscovery(pullop):
    """discovery phase for the pull

    Currently handles changeset discovery only; will eventually handle
    all discovery at some point."""
    (pullop.common,
     pullop.fetch,
     pullop.rheads) = discovery.findcommonincoming(pullop.repo.unfiltered(),
                                                   pullop.remote,
                                                   heads=pullop.heads,
                                                   force=pullop.force)
541 543
def _pullbundle2(pullop):
    """pull data using bundle2

    For now, the only supported data are changegroup."""
    kwargs = {'bundlecaps': set(['HG20'])}
    # pulling changegroup
    pullop.todosteps.remove('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
    else:
        kwargs['common'] = pullop.common
        kwargs['heads'] = pullop.heads or pullop.rheads
        if pullop.heads is None and list(pullop.common) == [nullid]:
            pullop.repo.ui.status(_("requesting all changes\n"))
    # NOTE(review): 'bundlecaps' is always present in kwargs, so this
    # condition looks unreachable -- confirm before relying on it.
    if kwargs.keys() == ['format']:
        return # nothing to pull
    bundle = pullop.remote.getbundle('pull', **kwargs)
    try:
        op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
    except KeyError, exc:
        # bundle contained a mandatory part type we do not know
        raise util.Abort('missing support for %s' % exc)
    assert len(op.records['changegroup']) == 1
    pullop.cgresult = op.records['changegroup'][0]['return']
566 568
def _pullchangeset(pullop):
    """pull changeset from unbundle into the local repo

    Picks the most capable wire-protocol command available on the remote
    (getbundle > changegroupsubset > changegroup) and stores the
    addchangegroup() result code in ``pullop.cgresult``."""
    # We delay the open of the transaction as late as possible so we
    # don't open transaction for nothing or you break future useful
    # rollback call
    pullop.todosteps.remove('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    if pullop.remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = pullop.remote.getbundle('pull', common=pullop.common,
                                     heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = pullop.remote.changegroup(pullop.fetch, 'pull')
    elif not pullop.remote.capable('changegroupsubset'):
        raise util.Abort(_("partial pull cannot be done because "
                           "other repository doesn't support "
                           "changegroupsubset."))
    else:
        cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
                                                 pullop.remote.url())
598 600
def _pullphase(pullop):
    """Synchronise local phase information with the remote's.

    Against a publishing (or phase-unaware) server everything pulled
    becomes public; against a non-publishing server phases are advanced
    per the remote's phase roots."""
    # Get remote phases data from remote
    pullop.todosteps.remove('phases')
    remotephases = pullop.remote.listkeys('phases')
    publishing = bool(remotephases.get('publishing', False))
    if remotephases and not publishing:
        # remote is new and unpublishing
        pheads, _dr = phases.analyzeremotephases(pullop.repo,
                                                 pullop.pulledsubset,
                                                 remotephases)
        phases.advanceboundary(pullop.repo, phases.public, pheads)
        phases.advanceboundary(pullop.repo, phases.draft,
                               pullop.pulledsubset)
    else:
        # Remote is old or publishing all common changesets
        # should be seen as public
        phases.advanceboundary(pullop.repo, phases.public,
                               pullop.pulledsubset)
617 619
def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    ``pullop.gettransaction`` is a function that returns the pull
    transaction, creating one if necessary. We return the transaction to
    inform the calling code that a new transaction has been created (when
    applicable).

    Exists mostly to allow overriding for experimentation purposes."""
    pullop.todosteps.remove('obsmarkers')
    tr = None
    if obsolete._enabled:
        pullop.repo.ui.debug('fetching remote obsolete markers\n')
        remoteobs = pullop.remote.listkeys('obsolete')
        # 'dump0' is the first marker dump; its absence means no markers
        if 'dump0' in remoteobs:
            tr = pullop.gettransaction()
            for key in sorted(remoteobs, reverse=True):
                if key.startswith('dump'):
                    data = base85.b85decode(remoteobs[key])
                    pullop.repo.obsstore.mergemarkers(tr, data)
            pullop.repo.invalidatevolatilesets()
    return tr
639 641
def getbundle(repo, source, heads=None, common=None, bundlecaps=None):
    """return a full bundle (with potentially multiple kinds of parts)

    Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
    passed. For now, the bundle can contain only changegroup, but this will
    change when more part types become available for bundle2.

    This is different from changegroup.getbundle, which only returns an HG10
    changegroup bundle. They may eventually get reunited in the future when
    we have a clearer idea of the API we want to use to query different data.

    The implementation is at a very early stage and will get massive rework
    when the API of bundle is refined.
    """
    # build bundle here.
    cg = changegroup.getbundle(repo, source, heads=heads,
                               common=common, bundlecaps=bundlecaps)
    if bundlecaps is None or 'HG20' not in bundlecaps:
        # caller cannot handle bundle2: ship the plain changegroup
        return cg
    # very crude first implementation,
    # the bundle API will change and the generation will be done lazily.
    bundler = bundle2.bundle20(repo.ui)
    part = bundle2.bundlepart('changegroup', data=cg.getchunks())
    bundler.addpart(part)
    return util.chunkbuffer(bundler.getchunks())
665 667
class PushRaced(RuntimeError):
    """An exception raised during unbundling that indicates a push race
    (the remote repository changed while the bundle was in transit)."""
668 670
def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling. Raises PushRaced when the client's view
    of the heads (exact list, or a 'hashed' digest, or the literal
    ['force']) no longer matches the repository."""
    currentheads = repo.heads()
    digest = util.sha1(''.join(sorted(currentheads))).digest()
    if (their_heads == ['force'] or their_heads == currentheads or
            their_heads == ['hashed', digest]):
        return
    # someone else committed/pushed/unbundled while we
    # were transferring data
    raise PushRaced('repository changed while %s - '
                    'please try again' % context)
682 684
def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    This function makes sure the repo is locked during the application and
    has a mechanism to check that no push race occurred between the creation
    of the bundle and its application.

    If the push was raced, a PushRaced exception is raised."""
    r = 0
    # need a transaction when processing a bundle2 stream
    tr = None
    lock = repo.lock()
    try:
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        if util.safehasattr(cg, 'params'):
            # only bundle2 unbundlers carry a 'params' attribute
            tr = repo.transaction('unbundle')
            r = bundle2.processbundle(repo, cg, lambda: tr).reply
            tr.close()
        else:
            r = changegroup.addchangegroup(repo, cg, source, url)
    finally:
        # release is a no-op if the transaction was closed successfully
        if tr is not None:
            tr.release()
        lock.release()
    return r
General Comments 0
You need to be logged in to leave comments. Login now