##// END OF EJS Templates
bundle2: catch UnknownPartError during local push...
Pierre-Yves David -
r21182:08c84fd9 stable
parent child Browse files
Show More
@@ -1,757 +1,760 b''
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib
11 11 import util, scmutil, changegroup, base85
12 12 import discovery, phases, obsolete, bookmarks, bundle2
13 13
def readbundle(ui, fh, fname, vfs=None):
    """Sniff the 4-byte header on *fh* and return a matching unbundler.

    ``fname`` is used for error reporting; when empty the data is treated
    as an anonymous stream, and a raw headerless changegroup is fixed up
    into an uncompressed HG10 bundle.  ``vfs``, if given, expands ``fname``
    to a full path for messages.
    """
    hdr = changegroup.readexactly(fh, 4)

    compression = None
    if fname:
        if vfs:
            fname = vfs.join(fname)
    else:
        fname = "stream"
        if hdr.startswith('\0') and not hdr.startswith('HG'):
            # headerless changegroup: rewrap as uncompressed HG10
            fh = changegroup.headerlessfixup(fh, hdr)
            hdr = "HG10"
            compression = 'UN'

    magic = hdr[0:2]
    version = hdr[2:4]
    if magic != 'HG':
        raise util.Abort(_('%s: not a Mercurial bundle') % fname)
    if version == '10':
        if compression is None:
            compression = changegroup.readexactly(fh, 2)
        return changegroup.unbundle10(fh, compression)
    if version == '2X':
        return bundle2.unbundle20(ui, fh, header=magic + version)
    raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
39 39
40 40
class pushoperation(object):
    """State holder for a single push operation.

    Carries push-related state and the operations common to every push.

    A new instance should be created at the beginning of each push and
    discarded afterward.
    """

    def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
        # repo we push from
        self.repo = repo
        self.ui = repo.ui
        # repo we push to
        self.remote = remote
        # force option provided
        self.force = force
        # revs to be pushed (None is "all")
        self.revs = revs
        # allow push of new branch
        self.newbranch = newbranch
        # did a local lock get acquired? (set by push(); False means phase
        # data cannot be written locally)
        self.locallocked = None
        # Integer version of the push result
        # - None means nothing to push
        # - 0 means HTTP error
        # - 1 means we pushed and remote head count is unchanged *or*
        #   we have outgoing changesets but refused to push
        # - other values as described by addchangegroup()
        self.ret = None
        # discover.outgoing object (contains common and outgoing data)
        self.outgoing = None
        # all remote heads before the push
        self.remoteheads = None
        # testable as a boolean indicating if any nodes are missing locally.
        self.incoming = None
        # set of all heads common after changeset bundle push
        # (filled in by _pushcomputecommonheads)
        self.commonheads = None
79 79
def push(repo, remote, force=False, revs=None, newbranch=False):
    '''Push outgoing changesets (limited by revs) from a local
    repository to remote. Return an integer:
      - None means nothing to push
      - 0 means HTTP error
      - 1 means we pushed and remote head count is unchanged *or*
        we have outgoing changesets but refused to push
      - other values as described by addchangegroup()
    '''
    pushop = pushoperation(repo, remote, force, revs, newbranch)
    # refuse early when a local destination lacks required repo features
    if pushop.remote.local():
        missing = (set(pushop.repo.requirements)
                   - pushop.remote.local().supported)
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    # there are two ways to push to remote repo:
    #
    # addchangegroup assumes local user can lock remote
    # repo (local filesystem, old ssh servers).
    #
    # unbundle assumes local user cannot lock remote repo (new ssh
    # servers, http servers).

    if not pushop.remote.canpush():
        raise util.Abort(_("destination does not support push"))
    # get local lock as we might write phase data
    locallock = None
    try:
        locallock = pushop.repo.lock()
        pushop.locallocked = True
    except IOError, err:
        pushop.locallocked = False
        if err.errno != errno.EACCES:
            raise
        # source repo cannot be locked.
        # We do not abort the push, but just disable the local phase
        # synchronisation.
        msg = 'cannot lock source repository: %s\n' % err
        pushop.ui.debug(msg)
    try:
        pushop.repo.checkpush(pushop)
        lock = None
        unbundle = pushop.remote.capable('unbundle')
        if not unbundle:
            # old-style push: we must hold the remote lock ourselves
            lock = pushop.remote.lock()
        try:
            _pushdiscovery(pushop)
            if _pushcheckoutgoing(pushop):
                pushop.repo.prepushoutgoinghooks(pushop.repo,
                                                 pushop.remote,
                                                 pushop.outgoing)
                # bundle2 is experimental: both sides must opt in
                if (pushop.repo.ui.configbool('experimental', 'bundle2-exp',
                                              False)
                    and pushop.remote.capable('bundle2-exp')):
                    _pushbundle2(pushop)
                else:
                    _pushchangeset(pushop)
            # phase/obsmarker sync happens even when no changeset was pushed
            _pushcomputecommonheads(pushop)
            _pushsyncphase(pushop)
            _pushobsolete(pushop)
        finally:
            if lock is not None:
                lock.release()
    finally:
        if locallock is not None:
            locallock.release()

    # bookmark updates run outside the locks, after everything else
    _pushbookmark(pushop)
    return pushop.ret
153 153
def _pushdiscovery(pushop):
    """Run changeset discovery against the remote and record the results
    (outgoing set, remote heads, incoming flag) on *pushop*."""
    repo = pushop.repo.unfiltered()
    commoninc = discovery.findcommonincoming(repo, pushop.remote,
                                             force=pushop.force)
    _common, incoming, remoteheads = commoninc
    pushop.outgoing = discovery.findcommonoutgoing(repo, pushop.remote,
                                                   onlyheads=pushop.revs,
                                                   commoninc=commoninc,
                                                   force=pushop.force)
    pushop.remoteheads = remoteheads
    pushop.incoming = incoming
166 166
def _pushcheckoutgoing(pushop):
    """Validate the outgoing changesets before pushing.

    Aborts on obsolete/troubled outgoing heads (unless --force) and runs
    remote-head checks. Returns False when there is nothing to push,
    True otherwise.
    """
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    if not outgoing.missing:
        # nothing to push
        scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
        return False
    # something to push
    if not pushop.force:
        # if repo.obsstore == False --> no obsolete
        # then, save the iteration
        if unfi.obsstore:
            # these messages are defined here to stay within the 80-char limit
            mso = _("push includes obsolete changeset: %s!")
            mst = "push includes %s changeset: %s!"
            # plain versions for i18n tool to detect them
            _("push includes unstable changeset: %s!")
            _("push includes bumped changeset: %s!")
            _("push includes divergent changeset: %s!")
            # If we are to push if there is at least one
            # obsolete or unstable changeset in missing, at
            # least one of the missinghead will be obsolete or
            # unstable. So checking heads only is ok
            for node in outgoing.missingheads:
                ctx = unfi[node]
                if ctx.obsolete():
                    raise util.Abort(mso % ctx)
                elif ctx.troubled():
                    # mst is formatted with the trouble name at runtime
                    raise util.Abort(_(mst)
                                     % (ctx.troubles()[0],
                                        ctx))
        newbm = pushop.ui.configlist('bookmarks', 'pushing')
        discovery.checkheads(unfi, pushop.remote, outgoing,
                             pushop.remoteheads,
                             pushop.newbranch,
                             bool(pushop.incoming),
                             newbm)
    return True
205 205
def _pushbundle2(pushop):
    """push data to the remote using bundle2

    The only currently supported type of data is changegroup but this will
    evolve in the future."""
    # Send known head to the server for race detection.
    capsblob = urllib.unquote(pushop.remote.capable('bundle2-exp'))
    caps = bundle2.decodecaps(capsblob)
    bundler = bundle2.bundle20(pushop.ui, caps)
    # create reply capability
    capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
    bundler.addpart(bundle2.bundlepart('b2x:replycaps', data=capsblob))
    if not pushop.force:
        # upper-case part name: the server must understand it or abort
        part = bundle2.bundlepart('B2X:CHECK:HEADS',
                                  data=iter(pushop.remoteheads))
        bundler.addpart(part)
    extrainfo = _pushbundle2extraparts(pushop, bundler)
    # add the changegroup bundle
    cg = changegroup.getlocalbundle(pushop.repo, 'push', pushop.outgoing)
    cgpart = bundle2.bundlepart('B2X:CHANGEGROUP', data=cg.getchunks())
    bundler.addpart(cgpart)
    stream = util.chunkbuffer(bundler.getchunks())
    # a local peer may raise UnknownPartError while unbundling directly;
    # translate it into a user-facing abort instead of a traceback
    try:
        reply = pushop.remote.unbundle(stream, ['force'], 'push')
    except bundle2.UnknownPartError, exc:
        raise util.Abort('missing support for %s' % exc)
    # the server reply is itself a bundle2 stream; same translation applies
    try:
        op = bundle2.processbundle(pushop.repo, reply)
    except bundle2.UnknownPartError, exc:
        raise util.Abort('missing support for %s' % exc)
    cgreplies = op.records.getreplies(cgpart.id)
    assert len(cgreplies['changegroup']) == 1
    pushop.ret = cgreplies['changegroup'][0]['return']
    _pushbundle2extrareply(pushop, op, extrainfo)
237 240
238 241 def _pushbundle2extraparts(pushop, bundler):
239 242 """hook function to let extensions add parts
240 243
241 244 Return a dict to let extensions pass data to the reply processing.
242 245 """
243 246 return {}
244 247
245 248 def _pushbundle2extrareply(pushop, op, extrainfo):
246 249 """hook function to let extensions react to part replies
247 250
248 251 The dict from _pushbundle2extrareply is fed to this function.
249 252 """
250 253 pass
251 254
def _pushchangeset(pushop):
    """Make the actual push of changeset bundle to remote repo"""
    outgoing = pushop.outgoing
    unbundle = pushop.remote.capable('unbundle')
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # create a changegroup from local
    if pushop.revs is None and not (outgoing.excluded
                                    or pushop.repo.changelog.filteredrevs):
        # push everything,
        # use the fast path, no race possible on push
        bundler = changegroup.bundle10(pushop.repo, bundlecaps)
        cg = changegroup.getsubset(pushop.repo,
                                   outgoing,
                                   bundler,
                                   'push',
                                   fastpath=True)
    else:
        # partial push (or excluded/filtered revs): compute exact subset
        cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
                                        bundlecaps)

    # apply changegroup to remote
    if unbundle:
        # local repo finds heads on server, finds out what
        # revs it must push. once revs transferred, if server
        # finds it has different heads (someone else won
        # commit/push race), server aborts.
        if pushop.force:
            remoteheads = ['force']
        else:
            remoteheads = pushop.remoteheads
        # ssh: return remote's addchangegroup()
        # http: return remote's addchangegroup() or 0 for error
        pushop.ret = pushop.remote.unbundle(cg, remoteheads,
                                            'push')
    else:
        # old-style peer we locked ourselves in push():
        # we return an integer indicating remote head count
        # change
        pushop.ret = pushop.remote.addchangegroup(cg, 'push', pushop.repo.url())
291 294
def _pushcomputecommonheads(pushop):
    """Compute the set of heads common to both sides after the push
    attempt and store it as ``pushop.commonheads``."""
    unfi = pushop.repo.unfiltered()
    if pushop.ret:
        # push succeed, synchronize target of the push
        cheads = pushop.outgoing.missingheads
    elif pushop.revs is None:
        # All out push fails. synchronize all common
        cheads = pushop.outgoing.commonheads
    else:
        # I want cheads = heads(::missingheads and ::commonheads)
        # (missingheads is revs with secret changeset filtered out)
        #
        # This can be expressed as:
        #     cheads = ( (missingheads and ::commonheads)
        #              + (commonheads and ::missingheads))"
        #              )
        #
        # while trying to push we already computed the following:
        #     common = (::commonheads)
        #     missing = ((commonheads::missingheads) - commonheads)
        #
        # We can pick:
        # * missingheads part of common (::commonheads)
        common = set(pushop.outgoing.common)
        nm = pushop.repo.changelog.nodemap
        cheads = [node for node in pushop.revs if nm[node] in common]
        # and
        # * commonheads parents on missing
        revset = unfi.set('%ln and parents(roots(%ln))',
                          pushop.outgoing.commonheads,
                          pushop.outgoing.missing)
        cheads.extend(c.node() for c in revset)
    pushop.commonheads = cheads
325 328
def _pushsyncphase(pushop):
    """synchronise phase information locally and remotely"""
    unfi = pushop.repo.unfiltered()
    cheads = pushop.commonheads
    # NOTE(review): the block below recomputes what _pushcomputecommonheads
    # already stored in pushop.commonheads (push() calls it just before this
    # function) -- TODO confirm the duplication can be dropped safely.
    if pushop.ret:
        # push succeed, synchronize target of the push
        cheads = pushop.outgoing.missingheads
    elif pushop.revs is None:
        # All out push fails. synchronize all common
        cheads = pushop.outgoing.commonheads
    else:
        # I want cheads = heads(::missingheads and ::commonheads)
        # (missingheads is revs with secret changeset filtered out)
        #
        # This can be expressed as:
        #     cheads = ( (missingheads and ::commonheads)
        #              + (commonheads and ::missingheads))"
        #              )
        #
        # while trying to push we already computed the following:
        #     common = (::commonheads)
        #     missing = ((commonheads::missingheads) - commonheads)
        #
        # We can pick:
        # * missingheads part of common (::commonheads)
        common = set(pushop.outgoing.common)
        nm = pushop.repo.changelog.nodemap
        cheads = [node for node in pushop.revs if nm[node] in common]
        # and
        # * commonheads parents on missing
        revset = unfi.set('%ln and parents(roots(%ln))',
                          pushop.outgoing.commonheads,
                          pushop.outgoing.missing)
        cheads.extend(c.node() for c in revset)
    pushop.commonheads = cheads
    # even when we don't push, exchanging phase data is useful
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo', False)
        and remotephases    # server supports phases
        and pushop.ret is None # nothing was pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and remote support phase
        # - and no changeset was pushed
        # - and remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    if not remotephases: # old server or public only reply from non-publishing
        _localphasemove(pushop, cheads)
        # don't push any phase data as there is nothing to push
    else:
        ana = phases.analyzeremotephases(pushop.repo, cheads,
                                         remotephases)
        pheads, droots = ana
        ### Apply remote phase on local
        if remotephases.get('publishing', False):
            _localphasemove(pushop, cheads)
        else: # publish = False
            _localphasemove(pushop, pheads)
            _localphasemove(pushop, cheads, phases.draft)
        ### Apply local phase on remote

        # Get the list of all revs draft on remote by public here.
        # XXX Beware that revset break if droots is not strictly
        # XXX root we may want to ensure it is but it is costly
        outdated = unfi.set('heads((%ln::%ln) and public())',
                            droots, cheads)
        for newremotehead in outdated:
            # advance each outdated remote head to public via pushkey
            r = pushop.remote.pushkey('phases',
                                      newremotehead.hex(),
                                      str(phases.draft),
                                      str(phases.public))
            if not r:
                pushop.ui.warn(_('updating %s to public failed!\n')
                               % newremotehead)
405 408
def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo"""
    if pushop.locallocked:
        phases.advanceboundary(pushop.repo, phase, nodes)
        return
    # The source repo could not be locked, so no phase may be touched.
    # Just tell the user when an update would actually have happened.
    if any(phase < pushop.repo[n].phase() for n in nodes):
        pushop.ui.status(_('cannot lock source repo, skipping '
                           'local %s phase update\n')
                         % phases.phasenames[phase])
419 422
def _pushobsolete(pushop):
    """utility function to push obsolete markers to a remote"""
    pushop.ui.debug('try to push obsolete markers to remote\n')
    repo = pushop.repo
    remote = pushop.remote
    # only push markers when the feature is enabled on both sides
    if (obsolete._enabled and repo.obsstore and
        'obsolete' in remote.listkeys('namespaces')):
        rslts = []
        # NOTE(review): despite the name, these keys are read from the
        # *local* repo (the markers we are sending) -- verify intent
        remotedata = repo.listkeys('obsolete')
        for key in sorted(remotedata, reverse=True):
            # reverse sort to ensure we end with dump0
            data = remotedata[key]
            rslts.append(remote.pushkey('obsolete', key, '', data))
        if [r for r in rslts if not r]:
            msg = _('failed to push some obsolete markers!\n')
            repo.ui.warn(msg)
436 439
def _pushbookmark(pushop):
    """Update bookmark position on remote"""
    ui = pushop.ui
    repo = pushop.repo.unfiltered()
    remote = pushop.remote
    ui.debug("checking for updated bookmarks\n")
    # restrict updates to bookmarks on ancestors of the pushed revs
    # (empty revs means no restriction)
    revnums = map(repo.changelog.rev, pushop.revs or [])
    ancestors = [a for a in repo.changelog.ancestors(revnums, inclusive=True)]
    (addsrc, adddst, advsrc, advdst, diverge, differ, invalid
     ) = bookmarks.compare(repo, repo._bookmarks, remote.listkeys('bookmarks'),
                           srchex=hex)

    # only bookmarks that fast-forward on the remote (advsrc) are pushed
    for b, scid, dcid in advsrc:
        if ancestors and repo[scid].rev() not in ancestors:
            continue
        if remote.pushkey('bookmarks', b, dcid, scid):
            ui.status(_("updating bookmark %s\n") % b)
        else:
            ui.warn(_('updating bookmark %s failed!\n') % b)
456 459
class pulloperation(object):
    """State holder for a single pull operation.

    Carries pull-related state and the operations common to every pull.

    A new instance should be created at the beginning of each pull and
    discarded afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False):
        # repo we pull into
        self.repo = repo
        # repo we pull from
        self.remote = remote
        # revision we try to pull (None is "all")
        self.heads = heads
        # do we force pull?
        self.force = force
        # name of the pull transaction
        self._trname = 'pull\n' + util.hidepassword(remote.url())
        # hold the transaction once created
        self._tr = None
        # set of common changeset between local and remote before pull
        self.common = None
        # set of pulled head
        self.rheads = None
        # list of missing changeset to fetch remotely
        self.fetch = None
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # list of step remaining todo (related to future bundle2 usage)
        self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changeset target by the pull"""
        # compute target subset
        if self.heads is None:
            # We pulled every thing possible
            # sync on everything common
            c = set(self.common)
            ret = list(self.common)
            for n in self.rheads:
                if n not in c:
                    ret.append(n)
            return ret
        else:
            # We pulled a specific subset
            # sync on this subset
            return self.heads

    def gettransaction(self):
        """get appropriate pull transaction, creating it if needed"""
        if self._tr is None:
            self._tr = self.repo.transaction(self._trname)
        return self._tr

    def closetransaction(self):
        """close transaction if created"""
        if self._tr is not None:
            self._tr.close()

    def releasetransaction(self):
        """release transaction if created"""
        if self._tr is not None:
            self._tr.release()
523 526
def pull(repo, remote, heads=None, force=False):
    """Pull changesets (limited to *heads* if given) from *remote* into
    *repo*. Returns the changegroup result code (see addchangegroup)."""
    pullop = pulloperation(repo, remote, heads, force)
    # refuse early when a local source requires unsupported repo features
    if pullop.remote.local():
        missing = set(pullop.remote.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    lock = pullop.repo.lock()
    try:
        _pulldiscovery(pullop)
        # NOTE(review): pull gates bundle2 on 'server.bundle2' while push
        # uses 'experimental.bundle2-exp' -- confirm the asymmetry is wanted
        if (pullop.repo.ui.configbool('server', 'bundle2', False)
            and pullop.remote.capable('bundle2-exp')):
            _pullbundle2(pullop)
        # each step removes itself from todosteps; the bundle2 path may
        # have already covered some of them
        if 'changegroup' in pullop.todosteps:
            _pullchangeset(pullop)
        if 'phases' in pullop.todosteps:
            _pullphase(pullop)
        if 'obsmarkers' in pullop.todosteps:
            _pullobsolete(pullop)
        pullop.closetransaction()
    finally:
        pullop.releasetransaction()
        lock.release()

    return pullop.cgresult
552 555
def _pulldiscovery(pullop):
    """Discovery phase for the pull.

    Only changeset discovery is handled here for now; eventually every
    kind of discovery will be run from this step."""
    common, fetch, rheads = discovery.findcommonincoming(
        pullop.repo.unfiltered(), pullop.remote,
        heads=pullop.heads, force=pullop.force)
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads
563 566
564 567 def _pullbundle2(pullop):
565 568 """pull data using bundle2
566 569
567 570 For now, the only supported data are changegroup."""
568 571 kwargs = {'bundlecaps': set(['HG2X'])}
569 572 capsblob = bundle2.encodecaps(pullop.repo.bundle2caps)
570 573 kwargs['bundlecaps'].add('bundle2=' + urllib.quote(capsblob))
571 574 # pulling changegroup
572 575 pullop.todosteps.remove('changegroup')
573 576 if not pullop.fetch:
574 577 pullop.repo.ui.status(_("no changes found\n"))
575 578 pullop.cgresult = 0
576 579 else:
577 580 kwargs['common'] = pullop.common
578 581 kwargs['heads'] = pullop.heads or pullop.rheads
579 582 if pullop.heads is None and list(pullop.common) == [nullid]:
580 583 pullop.repo.ui.status(_("requesting all changes\n"))
581 584 _pullbundle2extraprepare(pullop, kwargs)
582 585 if kwargs.keys() == ['format']:
583 586 return # nothing to pull
584 587 bundle = pullop.remote.getbundle('pull', **kwargs)
585 588 try:
586 589 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
587 590 except UnknownPartError, exc:
588 591 raise util.Abort('missing support for %s' % exc)
589 592 assert len(op.records['changegroup']) == 1
590 593 pullop.cgresult = op.records['changegroup'][0]['return']
591 594
592 595 def _pullbundle2extraprepare(pullop, kwargs):
593 596 """hook function so that extensions can extend the getbundle call"""
594 597 pass
595 598
def _pullchangeset(pullop):
    """pull changeset from unbundle into the local repo"""
    # We delay the open of the transaction as late as possible so we
    # don't open transaction for nothing or you break future useful
    # rollback call
    pullop.todosteps.remove('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    # pick the richest protocol the remote supports
    if pullop.remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = pullop.remote.getbundle('pull', common=pullop.common,
                                     heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = pullop.remote.changegroup(pullop.fetch, 'pull')
    elif not pullop.remote.capable('changegroupsubset'):
        raise util.Abort(_("partial pull cannot be done because "
                           "other repository doesn't support "
                           "changegroupsubset."))
    else:
        cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
                                                 pullop.remote.url())
627 630
def _pullphase(pullop):
    """Synchronise local phases with the remote's phase data."""
    # Get remote phases data from remote
    pullop.todosteps.remove('phases')
    remotephases = pullop.remote.listkeys('phases')
    publishing = bool(remotephases.get('publishing', False))
    if remotephases and not publishing:
        # remote is new and unpublishing
        pheads, _dr = phases.analyzeremotephases(pullop.repo,
                                                 pullop.pulledsubset,
                                                 remotephases)
        phases.advanceboundary(pullop.repo, phases.public, pheads)
        phases.advanceboundary(pullop.repo, phases.draft,
                               pullop.pulledsubset)
    else:
        # Remote is old or publishing all common changesets
        # should be seen as public
        phases.advanceboundary(pullop.repo, phases.public,
                               pullop.pulledsubset)
646 649
def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    The `gettransaction` is function that return the pull transaction, creating
    one if necessary. We return the transaction to inform the calling code that
    a new transaction have been created (when applicable).

    Exists mostly to allow overriding for experimentation purpose"""
    pullop.todosteps.remove('obsmarkers')
    tr = None
    if obsolete._enabled:
        pullop.repo.ui.debug('fetching remote obsolete markers\n')
        remoteobs = pullop.remote.listkeys('obsolete')
        # 'dump0' is always present when the remote has any markers
        if 'dump0' in remoteobs:
            tr = pullop.gettransaction()
            for key in sorted(remoteobs, reverse=True):
                if key.startswith('dump'):
                    data = base85.b85decode(remoteobs[key])
                    pullop.repo.obsstore.mergemarkers(tr, data)
            pullop.repo.invalidatevolatilesets()
    return tr
668 671
def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
              **kwargs):
    """return a full bundle (with potentially multiple kind of parts)

    Could be a bundle HG10 or a bundle HG2X depending on bundlecaps
    passed. For now, the bundle can contain only changegroup, but this will
    changes when more part type will be available for bundle2.

    This is different from changegroup.getbundle that only returns an HG10
    changegroup bundle. They may eventually get reunited in the future when we
    have a clearer idea of the API we what to query different data.

    The implementation is at a very early stage and will get massive rework
    when the API of bundle is refined.
    """
    # build changegroup bundle here.
    cg = changegroup.getbundle(repo, source, heads=heads,
                               common=common, bundlecaps=bundlecaps)
    if bundlecaps is None or 'HG2X' not in bundlecaps:
        # client did not request bundle2: return a plain HG10 changegroup
        return cg
    # very crude first implementation,
    # the bundle API will change and the generation will be done lazily.
    b2caps = {}
    for bcaps in bundlecaps:
        if bcaps.startswith('bundle2='):
            blob = urllib.unquote(bcaps[len('bundle2='):])
            b2caps.update(bundle2.decodecaps(blob))
    bundler = bundle2.bundle20(repo.ui, b2caps)
    part = bundle2.bundlepart('b2x:changegroup', data=cg.getchunks())
    bundler.addpart(part)
    # forward the actual request arguments to the extension hook (they were
    # previously hardcoded to None, hiding the request from extensions)
    _getbundleextrapart(bundler, repo, source, heads=heads, common=common,
                        bundlecaps=bundlecaps, **kwargs)
    return util.chunkbuffer(bundler.getchunks())
702 705
703 706 def _getbundleextrapart(bundler, repo, source, heads=None, common=None,
704 707 bundlecaps=None, **kwargs):
705 708 """hook function to let extensions add parts to the requested bundle"""
706 709 pass
707 710
class PushRaced(RuntimeError):
    """Raised during unbundling when a concurrent push changed the repo."""
710 713
def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling.
    """
    current = repo.heads()
    digest = util.sha1(''.join(sorted(current))).digest()
    unchanged = (their_heads == ['force']
                 or their_heads == current
                 or their_heads == ['hashed', digest])
    if unchanged:
        return
    # someone else committed/pushed/unbundled while we
    # were transferring data
    raise PushRaced('repository changed while %s - '
                    'please try again' % context)
724 727
def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    This function makes sure the repo is locked during the application and
    has a mechanism to check that no push race occurred between the creation
    of the bundle and its application.

    If the push was raced, a PushRaced exception is raised."""
    r = 0
    # need a transaction when processing a bundle2 stream
    tr = None
    lock = repo.lock()
    try:
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        if util.safehasattr(cg, 'params'):
            # a 'params' attribute marks a bundle2 stream
            tr = repo.transaction('unbundle')
            tr.hookargs['bundle2-exp'] = '1'
            r = bundle2.processbundle(repo, cg, lambda: tr).reply
            cl = repo.unfiltered().changelog
            # expose pending changes to pre-close hooks
            p = cl.writepending() and repo.root or ""
            repo.hook('b2x-pretransactionclose', throw=True, source=source,
                      url=url, pending=p, **tr.hookargs)
            tr.close()
            repo.hook('b2x-transactionclose', source=source, url=url,
                      **tr.hookargs)
        else:
            r = changegroup.addchangegroup(repo, cg, source, url)
    finally:
        if tr is not None:
            # no-op if tr.close() succeeded above
            tr.release()
        lock.release()
    return r
General Comments 0
You need to be logged in to leave comments. Login now