##// END OF EJS Templates
bundle2: transmit capabilities to getbundle during pull...
Pierre-Yves David -
r21143:5bb5d4ba default
parent child Browse files
Show More
@@ -1,710 +1,717
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib
11 11 import util, scmutil, changegroup, base85
12 12 import discovery, phases, obsolete, bookmarks, bundle2
13 13
def readbundle(ui, fh, fname, vfs=None):
    """Return an unbundler for the bundle readable on file object *fh*.

    The first four bytes select between the HG10 changegroup format and the
    experimental HG20 (bundle2) format.  *fname* is only used for error
    messages; an empty name becomes "stream", otherwise it is resolved
    through *vfs* when one is provided.
    """
    header = changegroup.readexactly(fh, 4)

    alg = None
    if not fname:
        fname = "stream"
        # a raw, headerless changegroup stream starts with a \0 byte;
        # wrap it so it reads like an uncompressed HG10 bundle
        if not header.startswith('HG') and header.startswith('\0'):
            fh = changegroup.headerlessfixup(fh, header)
            header = "HG10"
            alg = 'UN'
    elif vfs:
        fname = vfs.join(fname)

    magic = header[0:2]
    version = header[2:4]

    if magic != 'HG':
        raise util.Abort(_('%s: not a Mercurial bundle') % fname)
    if version == '10':
        # the 2-byte compression marker follows unless already known
        if alg is None:
            alg = changegroup.readexactly(fh, 2)
        return changegroup.unbundle10(fh, alg)
    if version == '20':
        return bundle2.unbundle20(ui, fh, header=magic + version)
    raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
39 39
40 40
class pushoperation(object):
    """State container for a single push operation.

    Carries all push related state so the individual push steps can share
    it.  A fresh instance should be created at the beginning of each push
    and discarded afterward.
    """

    def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
        # local repository we push from (its ui kept handy)
        self.repo = repo
        self.ui = repo.ui
        # remote repository we push to
        self.remote = remote
        # was --force requested?
        self.force = force
        # revisions to push (None means "everything")
        self.revs = revs
        # is creating a new remote branch allowed?
        self.newbranch = newbranch
        # whether the local lock was acquired (None until attempted)
        self.locallocked = None
        # integer outcome of the push:
        #   None  -> nothing to push
        #   0     -> HTTP error
        #   1     -> pushed and remote head count unchanged, *or* outgoing
        #            changesets exist but we refused to push them
        #   other -> as described by addchangegroup()
        self.ret = None
        # discovery.outgoing object (common and outgoing data), filled in
        # by _pushdiscovery
        self.outgoing = None
        # every remote head known before the push started
        self.remoteheads = None
        # truthy when some nodes are missing locally
        self.incoming = None
        # heads common to both sides once the changegroup has been pushed
        self.commonheads = None
79 79
def push(repo, remote, force=False, revs=None, newbranch=False):
    '''Push outgoing changesets (limited by revs) from a local
    repository to remote. Return an integer:
      - None means nothing to push
      - 0 means HTTP error
      - 1 means we pushed and remote head count is unchanged *or*
        we have outgoing changesets but refused to push
      - other values as described by addchangegroup()
    '''
    pushop = pushoperation(repo, remote, force, revs, newbranch)
    # for local-to-local pushes, refuse early when the destination cannot
    # handle a requirement of the source
    if pushop.remote.local():
        missing = (set(pushop.repo.requirements)
                   - pushop.remote.local().supported)
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    # there are two ways to push to remote repo:
    #
    # addchangegroup assumes local user can lock remote
    # repo (local filesystem, old ssh servers).
    #
    # unbundle assumes local user cannot lock remote repo (new ssh
    # servers, http servers).

    if not pushop.remote.canpush():
        raise util.Abort(_("destination does not support push"))
    # get local lock as we might write phase data
    locallock = None
    try:
        locallock = pushop.repo.lock()
        pushop.locallocked = True
    except IOError, err:
        pushop.locallocked = False
        if err.errno != errno.EACCES:
            raise
        # source repo cannot be locked.
        # We do not abort the push, but just disable the local phase
        # synchronisation.
        msg = 'cannot lock source repository: %s\n' % err
        pushop.ui.debug(msg)
    try:
        pushop.repo.checkpush(pushop)
        lock = None
        unbundle = pushop.remote.capable('unbundle')
        # remotes without 'unbundle' must be locked directly (see the
        # addchangegroup vs unbundle comment above)
        if not unbundle:
            lock = pushop.remote.lock()
        try:
            _pushdiscovery(pushop)
            if _pushcheckoutgoing(pushop):
                pushop.repo.prepushoutgoinghooks(pushop.repo,
                                                 pushop.remote,
                                                 pushop.outgoing)
                # experimental bundle2 push path, used when the remote
                # advertises the 'bundle2' capability
                if pushop.remote.capable('bundle2'):
                    _pushbundle2(pushop)
                else:
                    _pushchangeset(pushop)
            # phase/obsmarker sync is useful even when nothing was pushed
            _pushcomputecommonheads(pushop)
            _pushsyncphase(pushop)
            _pushobsolete(pushop)
        finally:
            if lock is not None:
                lock.release()
    finally:
        if locallock is not None:
            locallock.release()

    _pushbookmark(pushop)
    return pushop.ret
151 151
def _pushdiscovery(pushop):
    """Run changeset discovery against the remote.

    Fills in pushop.outgoing, pushop.remoteheads and pushop.incoming.
    """
    unfi = pushop.repo.unfiltered()
    commoninc = discovery.findcommonincoming(unfi, pushop.remote,
                                             force=pushop.force)
    common, inc, remoteheads = commoninc
    pushop.outgoing = discovery.findcommonoutgoing(unfi, pushop.remote,
                                                   onlyheads=pushop.revs,
                                                   commoninc=commoninc,
                                                   force=pushop.force)
    pushop.remoteheads = remoteheads
    pushop.incoming = inc
164 164
def _pushcheckoutgoing(pushop):
    """Validate the outgoing set; return True when there is something to push.

    Returns False if nothing is outgoing.  Otherwise, unless --force was
    given, aborts when the push would publish obsolete/troubled changesets
    or create unwanted new remote heads (via discovery.checkheads).
    """
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    if not outgoing.missing:
        # nothing to push
        scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
        return False
    # something to push
    if not pushop.force:
        # if repo.obsstore == False --> no obsolete
        # then, save the iteration
        if unfi.obsstore:
            # this message are here for 80 char limit reason
            mso = _("push includes obsolete changeset: %s!")
            mst = "push includes %s changeset: %s!"
            # plain versions for i18n tool to detect them
            _("push includes unstable changeset: %s!")
            _("push includes bumped changeset: %s!")
            _("push includes divergent changeset: %s!")
            # If we are to push if there is at least one
            # obsolete or unstable changeset in missing, at
            # least one of the missinghead will be obsolete or
            # unstable. So checking heads only is ok
            for node in outgoing.missingheads:
                ctx = unfi[node]
                if ctx.obsolete():
                    raise util.Abort(mso % ctx)
                elif ctx.troubled():
                    raise util.Abort(_(mst)
                                     % (ctx.troubles()[0],
                                        ctx))
        # bookmarks listed in 'bookmarks.pushing' may legitimize new heads
        newbm = pushop.ui.configlist('bookmarks', 'pushing')
        discovery.checkheads(unfi, pushop.remote, outgoing,
                             pushop.remoteheads,
                             pushop.newbranch,
                             bool(pushop.incoming),
                             newbm)
    return True
203 203
def _pushbundle2(pushop):
    """push data to the remote using bundle2

    The only currently supported type of data is changegroup but this will
    evolve in the future."""
    # Send known head to the server for race detection.
    # the remote's 'bundle2' capability value carries its (url-quoted)
    # bundle2 capabilities blob
    capsblob = urllib.unquote(pushop.remote.capable('bundle2'))
    caps = bundle2.decodecaps(capsblob)
    bundler = bundle2.bundle20(pushop.ui, caps)
    # create reply capability: tell the server which bundle2 features we
    # can accept in its reply bundle
    capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
    bundler.addpart(bundle2.bundlepart('replycaps', data=capsblob))
    if not pushop.force:
        # ask the server to verify our view of its heads (push race check)
        part = bundle2.bundlepart('CHECK:HEADS', data=iter(pushop.remoteheads))
        bundler.addpart(part)
    # add the changegroup bundle
    cg = changegroup.getlocalbundle(pushop.repo, 'push', pushop.outgoing)
    cgpart = bundle2.bundlepart('CHANGEGROUP', data=cg.getchunks())
    bundler.addpart(cgpart)
    stream = util.chunkbuffer(bundler.getchunks())
    reply = pushop.remote.unbundle(stream, ['force'], 'push')
    try:
        # process the server's reply bundle locally (it carries the
        # changegroup return value among its parts)
        op = bundle2.processbundle(pushop.repo, reply)
    except KeyError, exc:
        # an unknown mandatory part type surfaces as a KeyError
        raise util.Abort('missing support for %s' % exc)
    cgreplies = op.records.getreplies(cgpart.id)
    assert len(cgreplies['changegroup']) == 1
    pushop.ret = cgreplies['changegroup'][0]['return']
232 232
def _pushchangeset(pushop):
    """Make the actual push of changeset bundle to remote repo"""
    outgoing = pushop.outgoing
    unbundle = pushop.remote.capable('unbundle')
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # create a changegroup from local
    if pushop.revs is None and not (outgoing.excluded
                                    or pushop.repo.changelog.filteredrevs):
        # push everything,
        # use the fast path, no race possible on push
        bundler = changegroup.bundle10(pushop.repo, bundlecaps)
        cg = changegroup.getsubset(pushop.repo,
                                   outgoing,
                                   bundler,
                                   'push',
                                   fastpath=True)
    else:
        cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
                                        bundlecaps)

    # apply changegroup to remote
    if unbundle:
        # local repo finds heads on server, finds out what
        # revs it must push. once revs transferred, if server
        # finds it has different heads (someone else won
        # commit/push race), server aborts.
        if pushop.force:
            remoteheads = ['force']
        else:
            remoteheads = pushop.remoteheads
        # ssh: return remote's addchangegroup()
        # http: return remote's addchangegroup() or 0 for error
        pushop.ret = pushop.remote.unbundle(cg, remoteheads,
                                            'push')
    else:
        # we return an integer indicating remote head count
        # change
        pushop.ret = pushop.remote.addchangegroup(cg, 'push', pushop.repo.url())
272 272
def _pushcomputecommonheads(pushop):
    """Compute pushop.commonheads: heads common to both sides after the push.

    Used afterwards by the phase synchronisation step.
    """
    unfi = pushop.repo.unfiltered()
    if pushop.ret:
        # push succeed, synchronize target of the push
        cheads = pushop.outgoing.missingheads
    elif pushop.revs is None:
        # All out push fails. synchronize all common
        cheads = pushop.outgoing.commonheads
    else:
        # I want cheads = heads(::missingheads and ::commonheads)
        # (missingheads is revs with secret changeset filtered out)
        #
        # This can be expressed as:
        #     cheads = ( (missingheads and ::commonheads)
        #              + (commonheads and ::missingheads))"
        #              )
        #
        # while trying to push we already computed the following:
        #     common = (::commonheads)
        #     missing = ((commonheads::missingheads) - commonheads)
        #
        # We can pick:
        # * missingheads part of common (::commonheads)
        common = set(pushop.outgoing.common)
        nm = pushop.repo.changelog.nodemap
        cheads = [node for node in pushop.revs if nm[node] in common]
        # and
        # * commonheads parents on missing
        revset = unfi.set('%ln and parents(roots(%ln))',
                          pushop.outgoing.commonheads,
                          pushop.outgoing.missing)
        cheads.extend(c.node() for c in revset)
    pushop.commonheads = cheads
306 306
def _pushsyncphase(pushop):
    """synchronise phase information locally and remotely

    Uses pushop.commonheads, which push() always computes (via
    _pushcomputecommonheads) before calling this function.  The body
    previously duplicated that entire computation and immediately
    overwrote pushop.commonheads with an identical value; the dead
    duplicate has been removed.
    """
    unfi = pushop.repo.unfiltered()
    cheads = pushop.commonheads
    # even when we don't push, exchanging phase data is useful
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo', False)
        and remotephases    # server supports phases
        and pushop.ret is None # nothing was pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and remote support phase
        # - and no changeset was pushed
        # - and remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    if not remotephases: # old server or public only reply from non-publishing
        _localphasemove(pushop, cheads)
        # don't push any phase data as there is nothing to push
    else:
        ana = phases.analyzeremotephases(pushop.repo, cheads,
                                         remotephases)
        pheads, droots = ana
        ### Apply remote phase on local
        if remotephases.get('publishing', False):
            _localphasemove(pushop, cheads)
        else: # publish = False
            _localphasemove(pushop, pheads)
            _localphasemove(pushop, cheads, phases.draft)
        ### Apply local phase on remote

        # Get the list of all revs draft on remote by public here.
        # XXX Beware that revset break if droots is not strictly
        # XXX root we may want to ensure it is but it is costly
        outdated = unfi.set('heads((%ln::%ln) and public())',
                            droots, cheads)
        for newremotehead in outdated:
            r = pushop.remote.pushkey('phases',
                                      newremotehead.hex(),
                                      str(phases.draft),
                                      str(phases.public))
            if not r:
                pushop.ui.warn(_('updating %s to public failed!\n')
                               % newremotehead)
386 386
def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo"""
    if pushop.locallocked:
        phases.advanceboundary(pushop.repo, phase, nodes)
        return
    # The local repo could not be locked for this push, so no phase may be
    # changed.  Still tell the user whenever a move would have happened.
    wouldmove = [n for n in nodes if phase < pushop.repo[n].phase()]
    phasestr = phases.phasenames[phase]
    if wouldmove:
        pushop.ui.status(_('cannot lock source repo, skipping '
                           'local %s phase update\n') % phasestr)
400 400
def _pushobsolete(pushop):
    """utility function to push obsolete markers to a remote"""
    pushop.ui.debug('try to push obsolete markers to remote\n')
    repo = pushop.repo
    remote = pushop.remote
    if not (obsolete._enabled and repo.obsstore and
            'obsolete' in remote.listkeys('namespaces')):
        return
    remotedata = repo.listkeys('obsolete')
    # reverse sort to ensure we end with dump0
    results = [remote.pushkey('obsolete', key, '', remotedata[key])
               for key in sorted(remotedata, reverse=True)]
    if not all(results):
        repo.ui.warn(_('failed to push some obsolete markers!\n'))
417 417
def _pushbookmark(pushop):
    """Update bookmark position on remote"""
    ui = pushop.ui
    repo = pushop.repo.unfiltered()
    remote = pushop.remote
    ui.debug("checking for updated bookmarks\n")
    # ancestors of the pushed revs: used below to restrict which bookmarks
    # are updated when a subset was pushed (empty when pushing everything)
    revnums = map(repo.changelog.rev, pushop.revs or [])
    ancestors = [a for a in repo.changelog.ancestors(revnums, inclusive=True)]
    # only 'advsrc' (bookmarks that advanced locally) is acted on here;
    # the other comparison buckets are ignored
    (addsrc, adddst, advsrc, advdst, diverge, differ, invalid
     ) = bookmarks.compare(repo, repo._bookmarks, remote.listkeys('bookmarks'),
                           srchex=hex)

    for b, scid, dcid in advsrc:
        # skip bookmarks pointing outside the pushed subset
        if ancestors and repo[scid].rev() not in ancestors:
            continue
        if remote.pushkey('bookmarks', b, dcid, scid):
            ui.status(_("updating bookmark %s\n") % b)
        else:
            ui.warn(_('updating bookmark %s failed!\n') % b)
437 437
class pulloperation(object):
    """State container for a single pull operation.

    Carries all pull related state so the individual pull steps can share
    it.  A fresh instance should be created at the beginning of each pull
    and discarded afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False):
        # local repository we pull into
        self.repo = repo
        # remote repository we pull from
        self.remote = remote
        # revisions we try to pull (None means "everything")
        self.heads = heads
        # was --force requested?
        self.force = force
        # name used for the pull transaction
        self._trname = 'pull\n' + util.hidepassword(remote.url())
        # lazily-created transaction (see gettransaction)
        self._tr = None
        # changesets common to both sides before the pull started
        self.common = None
        # heads reported by the remote during discovery
        self.rheads = None
        # missing changesets that must be fetched remotely
        self.fetch = None
        # changegroup result code (used as the return value of pull)
        self.cgresult = None
        # pull steps still to perform (related to future bundle2 usage)
        self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changeset target by the pull"""
        if self.heads is not None:
            # a specific subset was requested: sync on exactly that subset
            return self.heads
        # everything possible was pulled: sync on all common changesets
        # plus any pulled head not already part of the common set
        known = set(self.common)
        return list(self.common) + [n for n in self.rheads if n not in known]

    def gettransaction(self):
        """get appropriate pull transaction, creating it if needed"""
        if self._tr is None:
            self._tr = self.repo.transaction(self._trname)
        return self._tr

    def closetransaction(self):
        """close the transaction, if one was ever created"""
        tr = self._tr
        if tr is not None:
            tr.close()

    def releasetransaction(self):
        """release the transaction, if one was ever created"""
        tr = self._tr
        if tr is not None:
            tr.release()
504 504
def pull(repo, remote, heads=None, force=False):
    """Pull changesets (limited to *heads*) from *remote* into *repo*.

    Returns the changegroup result code (pullop.cgresult).
    """
    pullop = pulloperation(repo, remote, heads, force)
    # for local-to-local pulls, refuse early when this repo cannot handle a
    # requirement of the source
    if pullop.remote.local():
        missing = set(pullop.remote.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    lock = pullop.repo.lock()
    try:
        _pulldiscovery(pullop)
        # the bundle2 path may complete some steps itself; whatever remains
        # is tracked in pullop.todosteps and handled by the legacy path
        if pullop.remote.capable('bundle2'):
            _pullbundle2(pullop)
        if 'changegroup' in pullop.todosteps:
            _pullchangeset(pullop)
        if 'phases' in pullop.todosteps:
            _pullphase(pullop)
        if 'obsmarkers' in pullop.todosteps:
            _pullobsolete(pullop)
        pullop.closetransaction()
    finally:
        # release is harmless after a successful close; it aborts the
        # transaction when an error prevented closetransaction()
        pullop.releasetransaction()
        lock.release()

    return pullop.cgresult
532 532
def _pulldiscovery(pullop):
    """discovery phase for the pull

    Currently handles changeset discovery only; eventually it will drive
    every kind of discovery."""
    common, fetch, rheads = discovery.findcommonincoming(
        pullop.repo.unfiltered(),
        pullop.remote,
        heads=pullop.heads,
        force=pullop.force)
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads
543 543
def _pullbundle2(pullop):
    """pull data using bundle2

    For now, the only supported data are changegroup."""
    kwargs = {'bundlecaps': set(['HG20'])}
    # advertise our own bundle2 capabilities to the server (transmitted
    # inside the 'bundlecaps' argument of getbundle) so it can tailor the
    # bundle it generates for us
    capsblob = bundle2.encodecaps(pullop.repo.bundle2caps)
    kwargs['bundlecaps'].add('bundle2=' + urllib.quote(capsblob))
    # pulling changegroup
    pullop.todosteps.remove('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
    else:
        kwargs['common'] = pullop.common
        kwargs['heads'] = pullop.heads or pullop.rheads
        if pullop.heads is None and list(pullop.common) == [nullid]:
            pullop.repo.ui.status(_("requesting all changes\n"))
    # NOTE(review): 'bundlecaps' is always present in kwargs, so this early
    # return looks unreachable -- confirm before relying on it
    if kwargs.keys() == ['format']:
        return # nothing to pull
    bundle = pullop.remote.getbundle('pull', **kwargs)
    try:
        op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
    except KeyError, exc:
        # an unknown mandatory part type surfaces as a KeyError
        raise util.Abort('missing support for %s' % exc)
    assert len(op.records['changegroup']) == 1
    pullop.cgresult = op.records['changegroup'][0]['return']
568 570
def _pullchangeset(pullop):
    """pull changeset from unbundle into the local repo"""
    # We delay the open of the transaction as late as possible so we
    # don't open transaction for nothing or you break future useful
    # rollback call
    pullop.todosteps.remove('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    # pick the best transfer protocol the remote supports, most capable
    # first: getbundle > full changegroup > changegroupsubset
    if pullop.remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = pullop.remote.getbundle('pull', common=pullop.common,
                                     heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = pullop.remote.changegroup(pullop.fetch, 'pull')
    elif not pullop.remote.capable('changegroupsubset'):
        raise util.Abort(_("partial pull cannot be done because "
                           "other repository doesn't support "
                           "changegroupsubset."))
    else:
        cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
                                                 pullop.remote.url())
600 602
def _pullphase(pullop):
    """Synchronize local phases with the phase data published by the remote."""
    # Get remote phases data from remote
    pullop.todosteps.remove('phases')
    remotephases = pullop.remote.listkeys('phases')
    publishing = bool(remotephases.get('publishing', False))
    if remotephases and not publishing:
        # remote is new and unpublishing
        pheads, _dr = phases.analyzeremotephases(pullop.repo,
                                                 pullop.pulledsubset,
                                                 remotephases)
        phases.advanceboundary(pullop.repo, phases.public, pheads)
        phases.advanceboundary(pullop.repo, phases.draft,
                               pullop.pulledsubset)
    else:
        # Remote is old or publishing all common changesets
        # should be seen as public
        phases.advanceboundary(pullop.repo, phases.public,
                               pullop.pulledsubset)
619 621
def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    A pull transaction is obtained through pullop.gettransaction() only when
    there are markers to apply.  We return the transaction to inform the
    calling code that a new transaction may have been created (when
    applicable).

    Exists mostly to allow overriding for experimentation purpose"""
    pullop.todosteps.remove('obsmarkers')
    tr = None
    if obsolete._enabled:
        pullop.repo.ui.debug('fetching remote obsolete markers\n')
        remoteobs = pullop.remote.listkeys('obsolete')
        if 'dump0' in remoteobs:
            tr = pullop.gettransaction()
            # reverse sort so that dump0 is processed last
            for key in sorted(remoteobs, reverse=True):
                if key.startswith('dump'):
                    data = base85.b85decode(remoteobs[key])
                    pullop.repo.obsstore.mergemarkers(tr, data)
            pullop.repo.invalidatevolatilesets()
    return tr
641 643
def getbundle(repo, source, heads=None, common=None, bundlecaps=None):
    """return a full bundle (with potentially multiple kind of parts)

    Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
    passed. For now, the bundle can contain only changegroup, but this will
    changes when more part type will be available for bundle2.

    This is different from changegroup.getbundle that only returns an HG10
    changegroup bundle. They may eventually get reunited in the future when we
    have a clearer idea of the API we what to query different data.

    The implementation is at a very early stage and will get massive rework
    when the API of bundle is refined.
    """
    # build bundle here.
    cg = changegroup.getbundle(repo, source, heads=heads,
                               common=common, bundlecaps=bundlecaps)
    if bundlecaps is None or 'HG20' not in bundlecaps:
        return cg
    # very crude first implementation,
    # the bundle API will change and the generation will be done lazily.
    # decode the client's bundle2 capabilities from the 'bundle2=' entry of
    # bundlecaps (url-quoted blob, see _pullbundle2) so the bundler can
    # honour them
    b2caps = {}
    for bcaps in bundlecaps:
        if bcaps.startswith('bundle2='):
            blob = urllib.unquote(bcaps[len('bundle2='):])
            b2caps.update(bundle2.decodecaps(blob))
    bundler = bundle2.bundle20(repo.ui, b2caps)
    part = bundle2.bundlepart('changegroup', data=cg.getchunks())
    bundler.addpart(part)
    return util.chunkbuffer(bundler.getchunks())
667 674
class PushRaced(RuntimeError):
    """An exception raised during unbundling that indicates a push race

    (the repository changed between bundle creation and its application)."""
670 677
def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling.
    """
    heads = repo.heads()
    heads_hash = util.sha1(''.join(sorted(heads))).digest()
    # the client may send 'force', the exact head list, or a hash of it
    if their_heads == ['force']:
        return
    if their_heads == heads:
        return
    if their_heads == ['hashed', heads_hash]:
        return
    # someone else committed/pushed/unbundled while we
    # were transferring data
    raise PushRaced('repository changed while %s - '
                    'please try again' % context)
684 691
def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    this function makes sure the repo is locked during the application and have
    mechanism to check that no push race occurred between the creation of the
    bundle and its application.

    If the push was raced, a PushRaced exception is raised."""
    r = 0
    # need a transaction when processing a bundle2 stream
    tr = None
    lock = repo.lock()
    try:
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        if util.safehasattr(cg, 'params'):
            # a 'params' attribute marks a bundle2 stream; plain
            # changegroups have none
            tr = repo.transaction('unbundle')
            r = bundle2.processbundle(repo, cg, lambda: tr).reply
            tr.close()
        else:
            r = changegroup.addchangegroup(repo, cg, source, url)
    finally:
        if tr is not None:
            # abort the transaction when an error prevented tr.close()
            tr.release()
        lock.release()
    return r
General Comments 0
You need to be logged in to leave comments. Login now