##// END OF EJS Templates
bundle2: catch UnknownPartError during push...
Pierre-Yves David -
r21180:b0567772 stable
parent child Browse files
Show More
@@ -1,757 +1,757
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib
11 11 import util, scmutil, changegroup, base85
12 12 import discovery, phases, obsolete, bookmarks, bundle2
13 13
14 14 def readbundle(ui, fh, fname, vfs=None):
15 15 header = changegroup.readexactly(fh, 4)
16 16
17 17 alg = None
18 18 if not fname:
19 19 fname = "stream"
20 20 if not header.startswith('HG') and header.startswith('\0'):
21 21 fh = changegroup.headerlessfixup(fh, header)
22 22 header = "HG10"
23 23 alg = 'UN'
24 24 elif vfs:
25 25 fname = vfs.join(fname)
26 26
27 27 magic, version = header[0:2], header[2:4]
28 28
29 29 if magic != 'HG':
30 30 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
31 31 if version == '10':
32 32 if alg is None:
33 33 alg = changegroup.readexactly(fh, 2)
34 34 return changegroup.unbundle10(fh, alg)
35 35 elif version == '2X':
36 36 return bundle2.unbundle20(ui, fh, header=magic + version)
37 37 else:
38 38 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
39 39
40 40
41 41 class pushoperation(object):
42 42 """A object that represent a single push operation
43 43
44 44 It purpose is to carry push related state and very common operation.
45 45
46 46 A new should be created at the beginning of each push and discarded
47 47 afterward.
48 48 """
49 49
50 50 def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
51 51 # repo we push from
52 52 self.repo = repo
53 53 self.ui = repo.ui
54 54 # repo we push to
55 55 self.remote = remote
56 56 # force option provided
57 57 self.force = force
58 58 # revs to be pushed (None is "all")
59 59 self.revs = revs
60 60 # allow push of new branch
61 61 self.newbranch = newbranch
62 62 # did a local lock get acquired?
63 63 self.locallocked = None
64 64 # Integer version of the push result
65 65 # - None means nothing to push
66 66 # - 0 means HTTP error
67 67 # - 1 means we pushed and remote head count is unchanged *or*
68 68 # we have outgoing changesets but refused to push
69 69 # - other values as described by addchangegroup()
70 70 self.ret = None
71 71 # discover.outgoing object (contains common and outgoing data)
72 72 self.outgoing = None
73 73 # all remote heads before the push
74 74 self.remoteheads = None
75 75 # testable as a boolean indicating if any nodes are missing locally.
76 76 self.incoming = None
77 77 # set of all heads common after changeset bundle push
78 78 self.commonheads = None
79 79
80 80 def push(repo, remote, force=False, revs=None, newbranch=False):
81 81 '''Push outgoing changesets (limited by revs) from a local
82 82 repository to remote. Return an integer:
83 83 - None means nothing to push
84 84 - 0 means HTTP error
85 85 - 1 means we pushed and remote head count is unchanged *or*
86 86 we have outgoing changesets but refused to push
87 87 - other values as described by addchangegroup()
88 88 '''
89 89 pushop = pushoperation(repo, remote, force, revs, newbranch)
90 90 if pushop.remote.local():
91 91 missing = (set(pushop.repo.requirements)
92 92 - pushop.remote.local().supported)
93 93 if missing:
94 94 msg = _("required features are not"
95 95 " supported in the destination:"
96 96 " %s") % (', '.join(sorted(missing)))
97 97 raise util.Abort(msg)
98 98
99 99 # there are two ways to push to remote repo:
100 100 #
101 101 # addchangegroup assumes local user can lock remote
102 102 # repo (local filesystem, old ssh servers).
103 103 #
104 104 # unbundle assumes local user cannot lock remote repo (new ssh
105 105 # servers, http servers).
106 106
107 107 if not pushop.remote.canpush():
108 108 raise util.Abort(_("destination does not support push"))
109 109 # get local lock as we might write phase data
110 110 locallock = None
111 111 try:
112 112 locallock = pushop.repo.lock()
113 113 pushop.locallocked = True
114 114 except IOError, err:
115 115 pushop.locallocked = False
116 116 if err.errno != errno.EACCES:
117 117 raise
118 118 # source repo cannot be locked.
119 119 # We do not abort the push, but just disable the local phase
120 120 # synchronisation.
121 121 msg = 'cannot lock source repository: %s\n' % err
122 122 pushop.ui.debug(msg)
123 123 try:
124 124 pushop.repo.checkpush(pushop)
125 125 lock = None
126 126 unbundle = pushop.remote.capable('unbundle')
127 127 if not unbundle:
128 128 lock = pushop.remote.lock()
129 129 try:
130 130 _pushdiscovery(pushop)
131 131 if _pushcheckoutgoing(pushop):
132 132 pushop.repo.prepushoutgoinghooks(pushop.repo,
133 133 pushop.remote,
134 134 pushop.outgoing)
135 135 if (pushop.repo.ui.configbool('experimental', 'bundle2-exp',
136 136 False)
137 137 and pushop.remote.capable('bundle2-exp')):
138 138 _pushbundle2(pushop)
139 139 else:
140 140 _pushchangeset(pushop)
141 141 _pushcomputecommonheads(pushop)
142 142 _pushsyncphase(pushop)
143 143 _pushobsolete(pushop)
144 144 finally:
145 145 if lock is not None:
146 146 lock.release()
147 147 finally:
148 148 if locallock is not None:
149 149 locallock.release()
150 150
151 151 _pushbookmark(pushop)
152 152 return pushop.ret
153 153
154 154 def _pushdiscovery(pushop):
155 155 # discovery
156 156 unfi = pushop.repo.unfiltered()
157 157 fci = discovery.findcommonincoming
158 158 commoninc = fci(unfi, pushop.remote, force=pushop.force)
159 159 common, inc, remoteheads = commoninc
160 160 fco = discovery.findcommonoutgoing
161 161 outgoing = fco(unfi, pushop.remote, onlyheads=pushop.revs,
162 162 commoninc=commoninc, force=pushop.force)
163 163 pushop.outgoing = outgoing
164 164 pushop.remoteheads = remoteheads
165 165 pushop.incoming = inc
166 166
167 167 def _pushcheckoutgoing(pushop):
168 168 outgoing = pushop.outgoing
169 169 unfi = pushop.repo.unfiltered()
170 170 if not outgoing.missing:
171 171 # nothing to push
172 172 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
173 173 return False
174 174 # something to push
175 175 if not pushop.force:
176 176 # if repo.obsstore == False --> no obsolete
177 177 # then, save the iteration
178 178 if unfi.obsstore:
179 179 # this message are here for 80 char limit reason
180 180 mso = _("push includes obsolete changeset: %s!")
181 181 mst = "push includes %s changeset: %s!"
182 182 # plain versions for i18n tool to detect them
183 183 _("push includes unstable changeset: %s!")
184 184 _("push includes bumped changeset: %s!")
185 185 _("push includes divergent changeset: %s!")
186 186 # If we are to push if there is at least one
187 187 # obsolete or unstable changeset in missing, at
188 188 # least one of the missinghead will be obsolete or
189 189 # unstable. So checking heads only is ok
190 190 for node in outgoing.missingheads:
191 191 ctx = unfi[node]
192 192 if ctx.obsolete():
193 193 raise util.Abort(mso % ctx)
194 194 elif ctx.troubled():
195 195 raise util.Abort(_(mst)
196 196 % (ctx.troubles()[0],
197 197 ctx))
198 198 newbm = pushop.ui.configlist('bookmarks', 'pushing')
199 199 discovery.checkheads(unfi, pushop.remote, outgoing,
200 200 pushop.remoteheads,
201 201 pushop.newbranch,
202 202 bool(pushop.incoming),
203 203 newbm)
204 204 return True
205 205
206 206 def _pushbundle2(pushop):
207 207 """push data to the remote using bundle2
208 208
209 209 The only currently supported type of data is changegroup but this will
210 210 evolve in the future."""
211 211 # Send known head to the server for race detection.
212 212 capsblob = urllib.unquote(pushop.remote.capable('bundle2-exp'))
213 213 caps = bundle2.decodecaps(capsblob)
214 214 bundler = bundle2.bundle20(pushop.ui, caps)
215 215 # create reply capability
216 216 capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
217 217 bundler.addpart(bundle2.bundlepart('b2x:replycaps', data=capsblob))
218 218 if not pushop.force:
219 219 part = bundle2.bundlepart('B2X:CHECK:HEADS',
220 220 data=iter(pushop.remoteheads))
221 221 bundler.addpart(part)
222 222 extrainfo = _pushbundle2extraparts(pushop, bundler)
223 223 # add the changegroup bundle
224 224 cg = changegroup.getlocalbundle(pushop.repo, 'push', pushop.outgoing)
225 225 cgpart = bundle2.bundlepart('B2X:CHANGEGROUP', data=cg.getchunks())
226 226 bundler.addpart(cgpart)
227 227 stream = util.chunkbuffer(bundler.getchunks())
228 228 reply = pushop.remote.unbundle(stream, ['force'], 'push')
229 229 try:
230 230 op = bundle2.processbundle(pushop.repo, reply)
231 except KeyError, exc:
231 except bundle2.UnknownPartError, exc:
232 232 raise util.Abort('missing support for %s' % exc)
233 233 cgreplies = op.records.getreplies(cgpart.id)
234 234 assert len(cgreplies['changegroup']) == 1
235 235 pushop.ret = cgreplies['changegroup'][0]['return']
236 236 _pushbundle2extrareply(pushop, op, extrainfo)
237 237
238 238 def _pushbundle2extraparts(pushop, bundler):
239 239 """hook function to let extensions add parts
240 240
241 241 Return a dict to let extensions pass data to the reply processing.
242 242 """
243 243 return {}
244 244
245 245 def _pushbundle2extrareply(pushop, op, extrainfo):
246 246 """hook function to let extensions react to part replies
247 247
248 248 The dict from _pushbundle2extrareply is fed to this function.
249 249 """
250 250 pass
251 251
252 252 def _pushchangeset(pushop):
253 253 """Make the actual push of changeset bundle to remote repo"""
254 254 outgoing = pushop.outgoing
255 255 unbundle = pushop.remote.capable('unbundle')
256 256 # TODO: get bundlecaps from remote
257 257 bundlecaps = None
258 258 # create a changegroup from local
259 259 if pushop.revs is None and not (outgoing.excluded
260 260 or pushop.repo.changelog.filteredrevs):
261 261 # push everything,
262 262 # use the fast path, no race possible on push
263 263 bundler = changegroup.bundle10(pushop.repo, bundlecaps)
264 264 cg = changegroup.getsubset(pushop.repo,
265 265 outgoing,
266 266 bundler,
267 267 'push',
268 268 fastpath=True)
269 269 else:
270 270 cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
271 271 bundlecaps)
272 272
273 273 # apply changegroup to remote
274 274 if unbundle:
275 275 # local repo finds heads on server, finds out what
276 276 # revs it must push. once revs transferred, if server
277 277 # finds it has different heads (someone else won
278 278 # commit/push race), server aborts.
279 279 if pushop.force:
280 280 remoteheads = ['force']
281 281 else:
282 282 remoteheads = pushop.remoteheads
283 283 # ssh: return remote's addchangegroup()
284 284 # http: return remote's addchangegroup() or 0 for error
285 285 pushop.ret = pushop.remote.unbundle(cg, remoteheads,
286 286 'push')
287 287 else:
288 288 # we return an integer indicating remote head count
289 289 # change
290 290 pushop.ret = pushop.remote.addchangegroup(cg, 'push', pushop.repo.url())
291 291
292 292 def _pushcomputecommonheads(pushop):
293 293 unfi = pushop.repo.unfiltered()
294 294 if pushop.ret:
295 295 # push succeed, synchronize target of the push
296 296 cheads = pushop.outgoing.missingheads
297 297 elif pushop.revs is None:
298 298 # All out push fails. synchronize all common
299 299 cheads = pushop.outgoing.commonheads
300 300 else:
301 301 # I want cheads = heads(::missingheads and ::commonheads)
302 302 # (missingheads is revs with secret changeset filtered out)
303 303 #
304 304 # This can be expressed as:
305 305 # cheads = ( (missingheads and ::commonheads)
306 306 # + (commonheads and ::missingheads))"
307 307 # )
308 308 #
309 309 # while trying to push we already computed the following:
310 310 # common = (::commonheads)
311 311 # missing = ((commonheads::missingheads) - commonheads)
312 312 #
313 313 # We can pick:
314 314 # * missingheads part of common (::commonheads)
315 315 common = set(pushop.outgoing.common)
316 316 nm = pushop.repo.changelog.nodemap
317 317 cheads = [node for node in pushop.revs if nm[node] in common]
318 318 # and
319 319 # * commonheads parents on missing
320 320 revset = unfi.set('%ln and parents(roots(%ln))',
321 321 pushop.outgoing.commonheads,
322 322 pushop.outgoing.missing)
323 323 cheads.extend(c.node() for c in revset)
324 324 pushop.commonheads = cheads
325 325
326 326 def _pushsyncphase(pushop):
327 327 """synchronise phase information locally and remotely"""
328 328 unfi = pushop.repo.unfiltered()
329 329 cheads = pushop.commonheads
330 330 if pushop.ret:
331 331 # push succeed, synchronize target of the push
332 332 cheads = pushop.outgoing.missingheads
333 333 elif pushop.revs is None:
334 334 # All out push fails. synchronize all common
335 335 cheads = pushop.outgoing.commonheads
336 336 else:
337 337 # I want cheads = heads(::missingheads and ::commonheads)
338 338 # (missingheads is revs with secret changeset filtered out)
339 339 #
340 340 # This can be expressed as:
341 341 # cheads = ( (missingheads and ::commonheads)
342 342 # + (commonheads and ::missingheads))"
343 343 # )
344 344 #
345 345 # while trying to push we already computed the following:
346 346 # common = (::commonheads)
347 347 # missing = ((commonheads::missingheads) - commonheads)
348 348 #
349 349 # We can pick:
350 350 # * missingheads part of common (::commonheads)
351 351 common = set(pushop.outgoing.common)
352 352 nm = pushop.repo.changelog.nodemap
353 353 cheads = [node for node in pushop.revs if nm[node] in common]
354 354 # and
355 355 # * commonheads parents on missing
356 356 revset = unfi.set('%ln and parents(roots(%ln))',
357 357 pushop.outgoing.commonheads,
358 358 pushop.outgoing.missing)
359 359 cheads.extend(c.node() for c in revset)
360 360 pushop.commonheads = cheads
361 361 # even when we don't push, exchanging phase data is useful
362 362 remotephases = pushop.remote.listkeys('phases')
363 363 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
364 364 and remotephases # server supports phases
365 365 and pushop.ret is None # nothing was pushed
366 366 and remotephases.get('publishing', False)):
367 367 # When:
368 368 # - this is a subrepo push
369 369 # - and remote support phase
370 370 # - and no changeset was pushed
371 371 # - and remote is publishing
372 372 # We may be in issue 3871 case!
373 373 # We drop the possible phase synchronisation done by
374 374 # courtesy to publish changesets possibly locally draft
375 375 # on the remote.
376 376 remotephases = {'publishing': 'True'}
377 377 if not remotephases: # old server or public only reply from non-publishing
378 378 _localphasemove(pushop, cheads)
379 379 # don't push any phase data as there is nothing to push
380 380 else:
381 381 ana = phases.analyzeremotephases(pushop.repo, cheads,
382 382 remotephases)
383 383 pheads, droots = ana
384 384 ### Apply remote phase on local
385 385 if remotephases.get('publishing', False):
386 386 _localphasemove(pushop, cheads)
387 387 else: # publish = False
388 388 _localphasemove(pushop, pheads)
389 389 _localphasemove(pushop, cheads, phases.draft)
390 390 ### Apply local phase on remote
391 391
392 392 # Get the list of all revs draft on remote by public here.
393 393 # XXX Beware that revset break if droots is not strictly
394 394 # XXX root we may want to ensure it is but it is costly
395 395 outdated = unfi.set('heads((%ln::%ln) and public())',
396 396 droots, cheads)
397 397 for newremotehead in outdated:
398 398 r = pushop.remote.pushkey('phases',
399 399 newremotehead.hex(),
400 400 str(phases.draft),
401 401 str(phases.public))
402 402 if not r:
403 403 pushop.ui.warn(_('updating %s to public failed!\n')
404 404 % newremotehead)
405 405
406 406 def _localphasemove(pushop, nodes, phase=phases.public):
407 407 """move <nodes> to <phase> in the local source repo"""
408 408 if pushop.locallocked:
409 409 phases.advanceboundary(pushop.repo, phase, nodes)
410 410 else:
411 411 # repo is not locked, do not change any phases!
412 412 # Informs the user that phases should have been moved when
413 413 # applicable.
414 414 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
415 415 phasestr = phases.phasenames[phase]
416 416 if actualmoves:
417 417 pushop.ui.status(_('cannot lock source repo, skipping '
418 418 'local %s phase update\n') % phasestr)
419 419
420 420 def _pushobsolete(pushop):
421 421 """utility function to push obsolete markers to a remote"""
422 422 pushop.ui.debug('try to push obsolete markers to remote\n')
423 423 repo = pushop.repo
424 424 remote = pushop.remote
425 425 if (obsolete._enabled and repo.obsstore and
426 426 'obsolete' in remote.listkeys('namespaces')):
427 427 rslts = []
428 428 remotedata = repo.listkeys('obsolete')
429 429 for key in sorted(remotedata, reverse=True):
430 430 # reverse sort to ensure we end with dump0
431 431 data = remotedata[key]
432 432 rslts.append(remote.pushkey('obsolete', key, '', data))
433 433 if [r for r in rslts if not r]:
434 434 msg = _('failed to push some obsolete markers!\n')
435 435 repo.ui.warn(msg)
436 436
437 437 def _pushbookmark(pushop):
438 438 """Update bookmark position on remote"""
439 439 ui = pushop.ui
440 440 repo = pushop.repo.unfiltered()
441 441 remote = pushop.remote
442 442 ui.debug("checking for updated bookmarks\n")
443 443 revnums = map(repo.changelog.rev, pushop.revs or [])
444 444 ancestors = [a for a in repo.changelog.ancestors(revnums, inclusive=True)]
445 445 (addsrc, adddst, advsrc, advdst, diverge, differ, invalid
446 446 ) = bookmarks.compare(repo, repo._bookmarks, remote.listkeys('bookmarks'),
447 447 srchex=hex)
448 448
449 449 for b, scid, dcid in advsrc:
450 450 if ancestors and repo[scid].rev() not in ancestors:
451 451 continue
452 452 if remote.pushkey('bookmarks', b, dcid, scid):
453 453 ui.status(_("updating bookmark %s\n") % b)
454 454 else:
455 455 ui.warn(_('updating bookmark %s failed!\n') % b)
456 456
457 457 class pulloperation(object):
458 458 """A object that represent a single pull operation
459 459
460 460 It purpose is to carry push related state and very common operation.
461 461
462 462 A new should be created at the beginning of each pull and discarded
463 463 afterward.
464 464 """
465 465
466 466 def __init__(self, repo, remote, heads=None, force=False):
467 467 # repo we pull into
468 468 self.repo = repo
469 469 # repo we pull from
470 470 self.remote = remote
471 471 # revision we try to pull (None is "all")
472 472 self.heads = heads
473 473 # do we force pull?
474 474 self.force = force
475 475 # the name the pull transaction
476 476 self._trname = 'pull\n' + util.hidepassword(remote.url())
477 477 # hold the transaction once created
478 478 self._tr = None
479 479 # set of common changeset between local and remote before pull
480 480 self.common = None
481 481 # set of pulled head
482 482 self.rheads = None
483 483 # list of missing changeset to fetch remotely
484 484 self.fetch = None
485 485 # result of changegroup pulling (used as return code by pull)
486 486 self.cgresult = None
487 487 # list of step remaining todo (related to future bundle2 usage)
488 488 self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])
489 489
490 490 @util.propertycache
491 491 def pulledsubset(self):
492 492 """heads of the set of changeset target by the pull"""
493 493 # compute target subset
494 494 if self.heads is None:
495 495 # We pulled every thing possible
496 496 # sync on everything common
497 497 c = set(self.common)
498 498 ret = list(self.common)
499 499 for n in self.rheads:
500 500 if n not in c:
501 501 ret.append(n)
502 502 return ret
503 503 else:
504 504 # We pulled a specific subset
505 505 # sync on this subset
506 506 return self.heads
507 507
508 508 def gettransaction(self):
509 509 """get appropriate pull transaction, creating it if needed"""
510 510 if self._tr is None:
511 511 self._tr = self.repo.transaction(self._trname)
512 512 return self._tr
513 513
514 514 def closetransaction(self):
515 515 """close transaction if created"""
516 516 if self._tr is not None:
517 517 self._tr.close()
518 518
519 519 def releasetransaction(self):
520 520 """release transaction if created"""
521 521 if self._tr is not None:
522 522 self._tr.release()
523 523
524 524 def pull(repo, remote, heads=None, force=False):
525 525 pullop = pulloperation(repo, remote, heads, force)
526 526 if pullop.remote.local():
527 527 missing = set(pullop.remote.requirements) - pullop.repo.supported
528 528 if missing:
529 529 msg = _("required features are not"
530 530 " supported in the destination:"
531 531 " %s") % (', '.join(sorted(missing)))
532 532 raise util.Abort(msg)
533 533
534 534 lock = pullop.repo.lock()
535 535 try:
536 536 _pulldiscovery(pullop)
537 537 if (pullop.repo.ui.configbool('server', 'bundle2', False)
538 538 and pullop.remote.capable('bundle2-exp')):
539 539 _pullbundle2(pullop)
540 540 if 'changegroup' in pullop.todosteps:
541 541 _pullchangeset(pullop)
542 542 if 'phases' in pullop.todosteps:
543 543 _pullphase(pullop)
544 544 if 'obsmarkers' in pullop.todosteps:
545 545 _pullobsolete(pullop)
546 546 pullop.closetransaction()
547 547 finally:
548 548 pullop.releasetransaction()
549 549 lock.release()
550 550
551 551 return pullop.cgresult
552 552
553 553 def _pulldiscovery(pullop):
554 554 """discovery phase for the pull
555 555
556 556 Current handle changeset discovery only, will change handle all discovery
557 557 at some point."""
558 558 tmp = discovery.findcommonincoming(pullop.repo.unfiltered(),
559 559 pullop.remote,
560 560 heads=pullop.heads,
561 561 force=pullop.force)
562 562 pullop.common, pullop.fetch, pullop.rheads = tmp
563 563
564 564 def _pullbundle2(pullop):
565 565 """pull data using bundle2
566 566
567 567 For now, the only supported data are changegroup."""
568 568 kwargs = {'bundlecaps': set(['HG2X'])}
569 569 capsblob = bundle2.encodecaps(pullop.repo.bundle2caps)
570 570 kwargs['bundlecaps'].add('bundle2=' + urllib.quote(capsblob))
571 571 # pulling changegroup
572 572 pullop.todosteps.remove('changegroup')
573 573 if not pullop.fetch:
574 574 pullop.repo.ui.status(_("no changes found\n"))
575 575 pullop.cgresult = 0
576 576 else:
577 577 kwargs['common'] = pullop.common
578 578 kwargs['heads'] = pullop.heads or pullop.rheads
579 579 if pullop.heads is None and list(pullop.common) == [nullid]:
580 580 pullop.repo.ui.status(_("requesting all changes\n"))
581 581 _pullbundle2extraprepare(pullop, kwargs)
582 582 if kwargs.keys() == ['format']:
583 583 return # nothing to pull
584 584 bundle = pullop.remote.getbundle('pull', **kwargs)
585 585 try:
586 586 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
587 587 except KeyError, exc:
588 588 raise util.Abort('missing support for %s' % exc)
589 589 assert len(op.records['changegroup']) == 1
590 590 pullop.cgresult = op.records['changegroup'][0]['return']
591 591
592 592 def _pullbundle2extraprepare(pullop, kwargs):
593 593 """hook function so that extensions can extend the getbundle call"""
594 594 pass
595 595
596 596 def _pullchangeset(pullop):
597 597 """pull changeset from unbundle into the local repo"""
598 598 # We delay the open of the transaction as late as possible so we
599 599 # don't open transaction for nothing or you break future useful
600 600 # rollback call
601 601 pullop.todosteps.remove('changegroup')
602 602 if not pullop.fetch:
603 603 pullop.repo.ui.status(_("no changes found\n"))
604 604 pullop.cgresult = 0
605 605 return
606 606 pullop.gettransaction()
607 607 if pullop.heads is None and list(pullop.common) == [nullid]:
608 608 pullop.repo.ui.status(_("requesting all changes\n"))
609 609 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
610 610 # issue1320, avoid a race if remote changed after discovery
611 611 pullop.heads = pullop.rheads
612 612
613 613 if pullop.remote.capable('getbundle'):
614 614 # TODO: get bundlecaps from remote
615 615 cg = pullop.remote.getbundle('pull', common=pullop.common,
616 616 heads=pullop.heads or pullop.rheads)
617 617 elif pullop.heads is None:
618 618 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
619 619 elif not pullop.remote.capable('changegroupsubset'):
620 620 raise util.Abort(_("partial pull cannot be done because "
621 621 "other repository doesn't support "
622 622 "changegroupsubset."))
623 623 else:
624 624 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
625 625 pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
626 626 pullop.remote.url())
627 627
628 628 def _pullphase(pullop):
629 629 # Get remote phases data from remote
630 630 pullop.todosteps.remove('phases')
631 631 remotephases = pullop.remote.listkeys('phases')
632 632 publishing = bool(remotephases.get('publishing', False))
633 633 if remotephases and not publishing:
634 634 # remote is new and unpublishing
635 635 pheads, _dr = phases.analyzeremotephases(pullop.repo,
636 636 pullop.pulledsubset,
637 637 remotephases)
638 638 phases.advanceboundary(pullop.repo, phases.public, pheads)
639 639 phases.advanceboundary(pullop.repo, phases.draft,
640 640 pullop.pulledsubset)
641 641 else:
642 642 # Remote is old or publishing all common changesets
643 643 # should be seen as public
644 644 phases.advanceboundary(pullop.repo, phases.public,
645 645 pullop.pulledsubset)
646 646
647 647 def _pullobsolete(pullop):
648 648 """utility function to pull obsolete markers from a remote
649 649
650 650 The `gettransaction` is function that return the pull transaction, creating
651 651 one if necessary. We return the transaction to inform the calling code that
652 652 a new transaction have been created (when applicable).
653 653
654 654 Exists mostly to allow overriding for experimentation purpose"""
655 655 pullop.todosteps.remove('obsmarkers')
656 656 tr = None
657 657 if obsolete._enabled:
658 658 pullop.repo.ui.debug('fetching remote obsolete markers\n')
659 659 remoteobs = pullop.remote.listkeys('obsolete')
660 660 if 'dump0' in remoteobs:
661 661 tr = pullop.gettransaction()
662 662 for key in sorted(remoteobs, reverse=True):
663 663 if key.startswith('dump'):
664 664 data = base85.b85decode(remoteobs[key])
665 665 pullop.repo.obsstore.mergemarkers(tr, data)
666 666 pullop.repo.invalidatevolatilesets()
667 667 return tr
668 668
669 669 def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
670 670 **kwargs):
671 671 """return a full bundle (with potentially multiple kind of parts)
672 672
673 673 Could be a bundle HG10 or a bundle HG2X depending on bundlecaps
674 674 passed. For now, the bundle can contain only changegroup, but this will
675 675 changes when more part type will be available for bundle2.
676 676
677 677 This is different from changegroup.getbundle that only returns an HG10
678 678 changegroup bundle. They may eventually get reunited in the future when we
679 679 have a clearer idea of the API we what to query different data.
680 680
681 681 The implementation is at a very early stage and will get massive rework
682 682 when the API of bundle is refined.
683 683 """
684 684 # build bundle here.
685 685 cg = changegroup.getbundle(repo, source, heads=heads,
686 686 common=common, bundlecaps=bundlecaps)
687 687 if bundlecaps is None or 'HG2X' not in bundlecaps:
688 688 return cg
689 689 # very crude first implementation,
690 690 # the bundle API will change and the generation will be done lazily.
691 691 b2caps = {}
692 692 for bcaps in bundlecaps:
693 693 if bcaps.startswith('bundle2='):
694 694 blob = urllib.unquote(bcaps[len('bundle2='):])
695 695 b2caps.update(bundle2.decodecaps(blob))
696 696 bundler = bundle2.bundle20(repo.ui, b2caps)
697 697 part = bundle2.bundlepart('b2x:changegroup', data=cg.getchunks())
698 698 bundler.addpart(part)
699 699 _getbundleextrapart(bundler, repo, source, heads=None, common=None,
700 700 bundlecaps=None, **kwargs)
701 701 return util.chunkbuffer(bundler.getchunks())
702 702
703 703 def _getbundleextrapart(bundler, repo, source, heads=None, common=None,
704 704 bundlecaps=None, **kwargs):
705 705 """hook function to let extensions add parts to the requested bundle"""
706 706 pass
707 707
708 708 class PushRaced(RuntimeError):
709 709 """An exception raised during unbundling that indicate a push race"""
710 710
711 711 def check_heads(repo, their_heads, context):
712 712 """check if the heads of a repo have been modified
713 713
714 714 Used by peer for unbundling.
715 715 """
716 716 heads = repo.heads()
717 717 heads_hash = util.sha1(''.join(sorted(heads))).digest()
718 718 if not (their_heads == ['force'] or their_heads == heads or
719 719 their_heads == ['hashed', heads_hash]):
720 720 # someone else committed/pushed/unbundled while we
721 721 # were transferring data
722 722 raise PushRaced('repository changed while %s - '
723 723 'please try again' % context)
724 724
725 725 def unbundle(repo, cg, heads, source, url):
726 726 """Apply a bundle to a repo.
727 727
728 728 this function makes sure the repo is locked during the application and have
729 729 mechanism to check that no push race occurred between the creation of the
730 730 bundle and its application.
731 731
732 732 If the push was raced as PushRaced exception is raised."""
733 733 r = 0
734 734 # need a transaction when processing a bundle2 stream
735 735 tr = None
736 736 lock = repo.lock()
737 737 try:
738 738 check_heads(repo, heads, 'uploading changes')
739 739 # push can proceed
740 740 if util.safehasattr(cg, 'params'):
741 741 tr = repo.transaction('unbundle')
742 742 tr.hookargs['bundle2-exp'] = '1'
743 743 r = bundle2.processbundle(repo, cg, lambda: tr).reply
744 744 cl = repo.unfiltered().changelog
745 745 p = cl.writepending() and repo.root or ""
746 746 repo.hook('b2x-pretransactionclose', throw=True, source=source,
747 747 url=url, pending=p, **tr.hookargs)
748 748 tr.close()
749 749 repo.hook('b2x-transactionclose', source=source, url=url,
750 750 **tr.hookargs)
751 751 else:
752 752 r = changegroup.addchangegroup(repo, cg, source, url)
753 753 finally:
754 754 if tr is not None:
755 755 tr.release()
756 756 lock.release()
757 757 return r
General Comments 0
You need to be logged in to leave comments. Login now