##// END OF EJS Templates
bundle2: call _pushbundle2extraparts a bit sooner...
Pierre-Yves David -
r21898:10fcfb61 default
parent child Browse files
Show More
@@ -1,792 +1,792 b''
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib
11 11 import util, scmutil, changegroup, base85, error
12 12 import discovery, phases, obsolete, bookmarks, bundle2, pushkey
13 13
def readbundle(ui, fh, fname, vfs=None):
    """Read a bundle header from ``fh`` and return a matching unbundler.

    Understands HG10 changegroup bundles (including headerless raw streams)
    and the experimental HG2X bundle2 format.  ``fname`` is only used in
    error messages; "stream" is substituted when it is empty.  When ``vfs``
    is given, ``fname`` is resolved relative to it for reporting.
    """
    header = changegroup.readexactly(fh, 4)

    compression = None
    if not fname:
        fname = "stream"
        # A stream beginning with '\0' is a headerless changegroup: push the
        # consumed bytes back and treat it as an uncompressed HG10 bundle.
        if header.startswith('\0') and not header.startswith('HG'):
            fh = changegroup.headerlessfixup(fh, header)
            header = "HG10"
            compression = 'UN'
    elif vfs:
        fname = vfs.join(fname)

    magic = header[:2]
    version = header[2:4]

    if magic != 'HG':
        raise util.Abort(_('%s: not a Mercurial bundle') % fname)
    if version == '10':
        if compression is None:
            compression = changegroup.readexactly(fh, 2)
        return changegroup.unbundle10(fh, compression)
    if version == '2X':
        return bundle2.unbundle20(ui, fh, header=magic + version)
    raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
39 39
40 40
class pushoperation(object):
    """An object that represents a single push operation.

    Its purpose is to carry push related state and very common operations.

    A new one should be created at the beginning of each push and discarded
    afterward.
    """

    def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
        # repo we push from
        self.repo = repo
        self.ui = repo.ui
        # repo we push to
        self.remote = remote
        # force option provided
        self.force = force
        # revs to be pushed (None is "all")
        self.revs = revs
        # allow push of new branch
        self.newbranch = newbranch
        # did a local lock get acquired?
        self.locallocked = None
        # Integer version of the push result
        # - None means nothing to push
        # - 0 means HTTP error
        # - 1 means we pushed and remote head count is unchanged *or*
        #   we have outgoing changesets but refused to push
        # - other values as described by addchangegroup()
        self.ret = None
        # discover.outgoing object (contains common and outgoing data)
        self.outgoing = None
        # all remote heads before the push
        self.remoteheads = None
        # testable as a boolean indicating if any nodes are missing locally.
        self.incoming = None
        # set of all heads common after changeset bundle push
        self.commonheads = None
79 79
def push(repo, remote, force=False, revs=None, newbranch=False):
    '''Push outgoing changesets (limited by revs) from a local
    repository to remote. Return an integer:
      - None means nothing to push
      - 0 means HTTP error
      - 1 means we pushed and remote head count is unchanged *or*
        we have outgoing changesets but refused to push
      - other values as described by addchangegroup()
    '''
    pushop = pushoperation(repo, remote, force, revs, newbranch)
    # refuse early when a local destination lacks features this repo requires
    if pushop.remote.local():
        missing = (set(pushop.repo.requirements)
                   - pushop.remote.local().supported)
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    # there are two ways to push to remote repo:
    #
    # addchangegroup assumes local user can lock remote
    # repo (local filesystem, old ssh servers).
    #
    # unbundle assumes local user cannot lock remote repo (new ssh
    # servers, http servers).

    if not pushop.remote.canpush():
        raise util.Abort(_("destination does not support push"))
    # get local lock as we might write phase data
    locallock = None
    try:
        locallock = pushop.repo.lock()
        pushop.locallocked = True
    except IOError, err:
        pushop.locallocked = False
        if err.errno != errno.EACCES:
            raise
        # source repo cannot be locked.
        # We do not abort the push, but just disable the local phase
        # synchronisation.
        msg = 'cannot lock source repository: %s\n' % err
        pushop.ui.debug(msg)
    try:
        pushop.repo.checkpush(pushop)
        lock = None
        unbundle = pushop.remote.capable('unbundle')
        if not unbundle:
            # remote cannot apply a bundle stream; take its lock and use
            # addchangegroup directly
            lock = pushop.remote.lock()
        try:
            _pushdiscovery(pushop)
            if _pushcheckoutgoing(pushop):
                pushop.repo.prepushoutgoinghooks(pushop.repo,
                                                 pushop.remote,
                                                 pushop.outgoing)
                # experimental bundle2 push path, guarded by config + remote
                # capability
                if (pushop.repo.ui.configbool('experimental', 'bundle2-exp',
                                              False)
                    and pushop.remote.capable('bundle2-exp')):
                    _pushbundle2(pushop)
                else:
                    _pushchangeset(pushop)
            _pushcomputecommonheads(pushop)
            _pushsyncphase(pushop)
            _pushobsolete(pushop)
        finally:
            if lock is not None:
                lock.release()
    finally:
        if locallock is not None:
            locallock.release()

    _pushbookmark(pushop)
    return pushop.ret
153 153
def _pushdiscovery(pushop):
    """Discover common and outgoing changesets and stash them on ``pushop``."""
    unfi = pushop.repo.unfiltered()
    commoninc = discovery.findcommonincoming(unfi, pushop.remote,
                                             force=pushop.force)
    pushop.outgoing = discovery.findcommonoutgoing(unfi, pushop.remote,
                                                   onlyheads=pushop.revs,
                                                   commoninc=commoninc,
                                                   force=pushop.force)
    common, inc, remoteheads = commoninc
    pushop.remoteheads = remoteheads
    pushop.incoming = inc
166 166
def _pushcheckoutgoing(pushop):
    """Validate the outgoing set; return True when there is data to push.

    Unless --force was given, aborts if the push would propagate obsolete or
    troubled changesets, or would create unexpected new remote heads.
    """
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    if not outgoing.missing:
        # nothing to push
        scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
        return False
    # something to push
    if not pushop.force:
        # if repo.obsstore == False --> no obsolete
        # then, save the iteration
        if unfi.obsstore:
            # this message are here for 80 char limit reason
            mso = _("push includes obsolete changeset: %s!")
            mst = "push includes %s changeset: %s!"
            # plain versions for i18n tool to detect them
            _("push includes unstable changeset: %s!")
            _("push includes bumped changeset: %s!")
            _("push includes divergent changeset: %s!")
            # If we are to push if there is at least one
            # obsolete or unstable changeset in missing, at
            # least one of the missinghead will be obsolete or
            # unstable. So checking heads only is ok
            for node in outgoing.missingheads:
                ctx = unfi[node]
                if ctx.obsolete():
                    raise util.Abort(mso % ctx)
                elif ctx.troubled():
                    raise util.Abort(_(mst)
                                     % (ctx.troubles()[0],
                                        ctx))
        newbm = pushop.ui.configlist('bookmarks', 'pushing')
        discovery.checkheads(unfi, pushop.remote, outgoing,
                             pushop.remoteheads,
                             pushop.newbranch,
                             bool(pushop.incoming),
                             newbm)
    return True
205 205
206 206 def _pushbundle2(pushop):
207 207 """push data to the remote using bundle2
208 208
209 209 The only currently supported type of data is changegroup but this will
210 210 evolve in the future."""
211 211 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
212 212 # create reply capability
213 213 capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
214 214 bundler.newpart('b2x:replycaps', data=capsblob)
215 extrainfo = _pushbundle2extraparts(pushop, bundler)
215 216 # Send known heads to the server for race detection.
216 217 if not pushop.force:
217 218 bundler.newpart('B2X:CHECK:HEADS', data=iter(pushop.remoteheads))
218 extrainfo = _pushbundle2extraparts(pushop, bundler)
219 219 # add the changegroup bundle
220 220 cg = changegroup.getlocalbundle(pushop.repo, 'push', pushop.outgoing)
221 221 cgpart = bundler.newpart('B2X:CHANGEGROUP', data=cg.getchunks())
222 222 stream = util.chunkbuffer(bundler.getchunks())
223 223 try:
224 224 reply = pushop.remote.unbundle(stream, ['force'], 'push')
225 225 except error.BundleValueError, exc:
226 226 raise util.Abort('missing support for %s' % exc)
227 227 try:
228 228 op = bundle2.processbundle(pushop.repo, reply)
229 229 except error.BundleValueError, exc:
230 230 raise util.Abort('missing support for %s' % exc)
231 231 cgreplies = op.records.getreplies(cgpart.id)
232 232 assert len(cgreplies['changegroup']) == 1
233 233 pushop.ret = cgreplies['changegroup'][0]['return']
234 234 _pushbundle2extrareply(pushop, op, extrainfo)
235 235
236 236 def _pushbundle2extraparts(pushop, bundler):
237 237 """hook function to let extensions add parts
238 238
239 239 Return a dict to let extensions pass data to the reply processing.
240 240 """
241 241 return {}
242 242
243 243 def _pushbundle2extrareply(pushop, op, extrainfo):
244 244 """hook function to let extensions react to part replies
245 245
246 246 The dict from _pushbundle2extrareply is fed to this function.
247 247 """
248 248 pass
249 249
def _pushchangeset(pushop):
    """Make the actual push of changeset bundle to remote repo"""
    outgoing = pushop.outgoing
    unbundle = pushop.remote.capable('unbundle')
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # create a changegroup from local
    if pushop.revs is None and not (outgoing.excluded
                                    or pushop.repo.changelog.filteredrevs):
        # push everything,
        # use the fast path, no race possible on push
        bundler = changegroup.bundle10(pushop.repo, bundlecaps)
        cg = changegroup.getsubset(pushop.repo,
                                   outgoing,
                                   bundler,
                                   'push',
                                   fastpath=True)
    else:
        cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
                                        bundlecaps)

    # apply changegroup to remote
    if unbundle:
        # local repo finds heads on server, finds out what
        # revs it must push. once revs transferred, if server
        # finds it has different heads (someone else won
        # commit/push race), server aborts.
        if pushop.force:
            remoteheads = ['force']
        else:
            remoteheads = pushop.remoteheads
        # ssh: return remote's addchangegroup()
        # http: return remote's addchangegroup() or 0 for error
        pushop.ret = pushop.remote.unbundle(cg, remoteheads,
                                            pushop.repo.url())
    else:
        # we return an integer indicating remote head count
        # change
        pushop.ret = pushop.remote.addchangegroup(cg, 'push', pushop.repo.url())
289 289
def _pushcomputecommonheads(pushop):
    """Compute the heads common to both sides after the push attempt.

    The result is stored on ``pushop.commonheads`` and is later used by
    the phase synchronisation step.
    """
    unfi = pushop.repo.unfiltered()
    if pushop.ret:
        # push succeed, synchronize target of the push
        cheads = pushop.outgoing.missingheads
    elif pushop.revs is None:
        # All out push fails. synchronize all common
        cheads = pushop.outgoing.commonheads
    else:
        # I want cheads = heads(::missingheads and ::commonheads)
        # (missingheads is revs with secret changeset filtered out)
        #
        # This can be expressed as:
        #     cheads = ( (missingheads and ::commonheads)
        #              + (commonheads and ::missingheads))"
        #              )
        #
        # while trying to push we already computed the following:
        #     common = (::commonheads)
        #     missing = ((commonheads::missingheads) - commonheads)
        #
        # We can pick:
        # * missingheads part of common (::commonheads)
        common = set(pushop.outgoing.common)
        nm = pushop.repo.changelog.nodemap
        cheads = [node for node in pushop.revs if nm[node] in common]
        # and
        # * commonheads parents on missing
        revset = unfi.set('%ln and parents(roots(%ln))',
                          pushop.outgoing.commonheads,
                          pushop.outgoing.missing)
        cheads.extend(c.node() for c in revset)
    pushop.commonheads = cheads
323 323
def _pushsyncphase(pushop):
    """synchronise phase information locally and remotely

    Applies the remote's phase information to the local repo, then advances
    remote draft heads that should now be public — through batched bundle2
    pushkey parts when the server supports them, or one pushkey call per
    head otherwise.
    """
    unfi = pushop.repo.unfiltered()
    cheads = pushop.commonheads
    # even when we don't push, exchanging phase data is useful
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo', False)
        and remotephases    # server supports phases
        and pushop.ret is None # nothing was pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and remote support phase
        # - and no changeset was pushed
        # - and remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    if not remotephases: # old server or public only reply from non-publishing
        _localphasemove(pushop, cheads)
        # don't push any phase data as there is nothing to push
    else:
        ana = phases.analyzeremotephases(pushop.repo, cheads,
                                         remotephases)
        pheads, droots = ana
        ### Apply remote phase on local
        if remotephases.get('publishing', False):
            _localphasemove(pushop, cheads)
        else: # publish = False
            _localphasemove(pushop, pheads)
            _localphasemove(pushop, cheads, phases.draft)
        ### Apply local phase on remote

        # Get the list of all revs draft on remote by public here.
        # XXX Beware that revset break if droots is not strictly
        # XXX root we may want to ensure it is but it is costly
        outdated = unfi.set('heads((%ln::%ln) and public())',
                            droots, cheads)

        b2caps = bundle2.bundle2caps(pushop.remote)
        if 'b2x:pushkey' in b2caps:
            # server supports bundle2, let's do a batched push through it
            #
            # This will eventually be unified with the changesets bundle2 push
            bundler = bundle2.bundle20(pushop.ui, b2caps)
            capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
            bundler.newpart('b2x:replycaps', data=capsblob)
            # remember (part id, head) so replies can be matched below
            part2node = []
            enc = pushkey.encode
            for newremotehead in outdated:
                part = bundler.newpart('b2x:pushkey')
                part.addparam('namespace', enc('phases'))
                part.addparam('key', enc(newremotehead.hex()))
                part.addparam('old', enc(str(phases.draft)))
                part.addparam('new', enc(str(phases.public)))
                part2node.append((part.id, newremotehead))
            stream = util.chunkbuffer(bundler.getchunks())
            try:
                reply = pushop.remote.unbundle(stream, ['force'], 'push')
                op = bundle2.processbundle(pushop.repo, reply)
            except error.BundleValueError, exc:
                raise util.Abort('missing support for %s' % exc)
            for partid, node in part2node:
                partrep = op.records.getreplies(partid)
                results = partrep['pushkey']
                assert len(results) <= 1
                msg = None
                if not results:
                    msg = _('server ignored update of %s to public!\n') % node
                elif not int(results[0]['return']):
                    msg = _('updating %s to public failed!\n') % node
                if msg is not None:
                    pushop.ui.warn(msg)

        else:
            # fallback to independant pushkey command
            for newremotehead in outdated:
                r = pushop.remote.pushkey('phases',
                                          newremotehead.hex(),
                                          str(phases.draft),
                                          str(phases.public))
                if not r:
                    pushop.ui.warn(_('updating %s to public failed!\n')
                                   % newremotehead)
410 410
def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo

    When the local lock could not be taken during push setup, no phase is
    changed; the user is only informed when a move would have happened.
    """
    if pushop.locallocked:
        phases.advanceboundary(pushop.repo, phase, nodes)
    else:
        # repo is not locked, do not change any phases!
        # Informs the user that phases should have been moved when
        # applicable.
        actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
        phasestr = phases.phasenames[phase]
        if actualmoves:
            pushop.ui.status(_('cannot lock source repo, skipping '
                               'local %s phase update\n') % phasestr)
424 424
def _pushobsolete(pushop):
    """utility function to push obsolete markers to a remote"""
    pushop.ui.debug('try to push obsolete markers to remote\n')
    repo = pushop.repo
    remote = pushop.remote
    if (obsolete._enabled and repo.obsstore and
        'obsolete' in remote.listkeys('namespaces')):
        rslts = []
        # despite the name, this reads the *local* repo's marker dumps,
        # which are then sent to the remote via pushkey
        remotedata = repo.listkeys('obsolete')
        for key in sorted(remotedata, reverse=True):
            # reverse sort to ensure we end with dump0
            data = remotedata[key]
            rslts.append(remote.pushkey('obsolete', key, '', data))
        if [r for r in rslts if not r]:
            msg = _('failed to push some obsolete markers!\n')
            repo.ui.warn(msg)
441 441
def _pushbookmark(pushop):
    """Update bookmark position on remote"""
    ui = pushop.ui
    repo = pushop.repo.unfiltered()
    remote = pushop.remote
    ui.debug("checking for updated bookmarks\n")
    revnums = map(repo.changelog.rev, pushop.revs or [])
    ancestors = [a for a in repo.changelog.ancestors(revnums, inclusive=True)]
    (addsrc, adddst, advsrc, advdst, diverge, differ, invalid
     ) = bookmarks.compare(repo, repo._bookmarks, remote.listkeys('bookmarks'),
                           srchex=hex)

    # only bookmarks that advanced locally are propagated; when pushing a
    # subset of revs, only those pointing into the pushed subset
    for b, scid, dcid in advsrc:
        if ancestors and repo[scid].rev() not in ancestors:
            continue
        if remote.pushkey('bookmarks', b, dcid, scid):
            ui.status(_("updating bookmark %s\n") % b)
        else:
            ui.warn(_('updating bookmark %s failed!\n') % b)
461 461
class pulloperation(object):
    """An object that represents a single pull operation.

    Its purpose is to carry pull related state and very common operations.

    A new one should be created at the beginning of each pull and discarded
    afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False):
        # repo we pull into
        self.repo = repo
        # repo we pull from
        self.remote = remote
        # revision we try to pull (None is "all")
        self.heads = heads
        # do we force pull?
        self.force = force
        # the name of the pull transaction
        self._trname = 'pull\n' + util.hidepassword(remote.url())
        # hold the transaction once created
        self._tr = None
        # set of common changeset between local and remote before pull
        self.common = None
        # set of pulled head
        self.rheads = None
        # list of missing changeset to fetch remotely
        self.fetch = None
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # list of step remaining todo (related to future bundle2 usage)
        self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changeset targeted by the pull"""
        # compute target subset
        if self.heads is None:
            # We pulled every thing possible
            # sync on everything common
            c = set(self.common)
            ret = list(self.common)
            for n in self.rheads:
                if n not in c:
                    ret.append(n)
            return ret
        else:
            # We pulled a specific subset
            # sync on this subset
            return self.heads

    def gettransaction(self):
        """get appropriate pull transaction, creating it if needed"""
        if self._tr is None:
            self._tr = self.repo.transaction(self._trname)
        return self._tr

    def closetransaction(self):
        """close transaction if created"""
        if self._tr is not None:
            self._tr.close()

    def releasetransaction(self):
        """release transaction if created"""
        if self._tr is not None:
            self._tr.release()
528 528
def pull(repo, remote, heads=None, force=False):
    """Pull changesets (optionally limited to ``heads``) from remote.

    Returns the changegroup result code (see ``changegroup.addchangegroup``).
    """
    pullop = pulloperation(repo, remote, heads, force)
    # refuse early when a local remote requires features we lack
    if pullop.remote.local():
        missing = set(pullop.remote.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    lock = pullop.repo.lock()
    try:
        _pulldiscovery(pullop)
        # experimental bundle2 pull path; any step bundle2 did not handle
        # is still listed in pullop.todosteps and handled below
        if (pullop.repo.ui.configbool('experimental', 'bundle2-exp', False)
            and pullop.remote.capable('bundle2-exp')):
            _pullbundle2(pullop)
        if 'changegroup' in pullop.todosteps:
            _pullchangeset(pullop)
        if 'phases' in pullop.todosteps:
            _pullphase(pullop)
        if 'obsmarkers' in pullop.todosteps:
            _pullobsolete(pullop)
        pullop.closetransaction()
    finally:
        pullop.releasetransaction()
        lock.release()

    return pullop.cgresult
557 557
def _pulldiscovery(pullop):
    """discovery phase for the pull

    Currently handles changeset discovery only; will eventually handle all
    discovery at some point."""
    common, fetch, rheads = discovery.findcommonincoming(
        pullop.repo.unfiltered(), pullop.remote,
        heads=pullop.heads, force=pullop.force)
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads
568 568
569 569 def _pullbundle2(pullop):
570 570 """pull data using bundle2
571 571
572 572 For now, the only supported data are changegroup."""
573 573 remotecaps = bundle2.bundle2caps(pullop.remote)
574 574 kwargs = {'bundlecaps': caps20to10(pullop.repo)}
575 575 # pulling changegroup
576 576 pullop.todosteps.remove('changegroup')
577 577
578 578 kwargs['common'] = pullop.common
579 579 kwargs['heads'] = pullop.heads or pullop.rheads
580 580 if 'b2x:listkeys' in remotecaps:
581 581 kwargs['listkeys'] = ['phase']
582 582 if not pullop.fetch:
583 583 pullop.repo.ui.status(_("no changes found\n"))
584 584 pullop.cgresult = 0
585 585 else:
586 586 if pullop.heads is None and list(pullop.common) == [nullid]:
587 587 pullop.repo.ui.status(_("requesting all changes\n"))
588 588 _pullbundle2extraprepare(pullop, kwargs)
589 589 if kwargs.keys() == ['format']:
590 590 return # nothing to pull
591 591 bundle = pullop.remote.getbundle('pull', **kwargs)
592 592 try:
593 593 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
594 594 except error.BundleValueError, exc:
595 595 raise util.Abort('missing support for %s' % exc)
596 596
597 597 if pullop.fetch:
598 598 assert len(op.records['changegroup']) == 1
599 599 pullop.cgresult = op.records['changegroup'][0]['return']
600 600
601 601 # processing phases change
602 602 for namespace, value in op.records['listkeys']:
603 603 if namespace == 'phases':
604 604 _pullapplyphases(pullop, value)
605 605
606 606 def _pullbundle2extraprepare(pullop, kwargs):
607 607 """hook function so that extensions can extend the getbundle call"""
608 608 pass
609 609
def _pullchangeset(pullop):
    """pull changeset from unbundle into the local repo"""
    # We delay the open of the transaction as late as possible so we
    # don't open transaction for nothing or you break future useful
    # rollback call
    pullop.todosteps.remove('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    # pick the most capable retrieval method the remote offers
    if pullop.remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = pullop.remote.getbundle('pull', common=pullop.common,
                                     heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = pullop.remote.changegroup(pullop.fetch, 'pull')
    elif not pullop.remote.capable('changegroupsubset'):
        raise util.Abort(_("partial pull cannot be done because "
                           "other repository doesn't support "
                           "changegroupsubset."))
    else:
        cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
                                                 pullop.remote.url())
641 641
def _pullphase(pullop):
    """Fetch phase data from the remote and apply it locally."""
    _pullapplyphases(pullop, pullop.remote.listkeys('phases'))
646 646
def _pullapplyphases(pullop, remotephases):
    """apply phase movement from observed remote state"""
    pullop.todosteps.remove('phases')
    publishing = bool(remotephases.get('publishing', False))
    if remotephases and not publishing:
        # remote is new and unpublishing
        pheads, _dr = phases.analyzeremotephases(pullop.repo,
                                                 pullop.pulledsubset,
                                                 remotephases)
        phases.advanceboundary(pullop.repo, phases.public, pheads)
        phases.advanceboundary(pullop.repo, phases.draft,
                               pullop.pulledsubset)
    else:
        # Remote is old or publishing all common changesets
        # should be seen as public
        phases.advanceboundary(pullop.repo, phases.public,
                               pullop.pulledsubset)
664 664
def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    ``pullop.gettransaction`` is a function that returns the pull
    transaction, creating one if necessary. We return the transaction to
    inform the calling code that a new transaction has been created (when
    applicable).

    Exists mostly to allow overriding for experimentation purpose"""
    pullop.todosteps.remove('obsmarkers')
    tr = None
    if obsolete._enabled:
        pullop.repo.ui.debug('fetching remote obsolete markers\n')
        remoteobs = pullop.remote.listkeys('obsolete')
        if 'dump0' in remoteobs:
            tr = pullop.gettransaction()
            for key in sorted(remoteobs, reverse=True):
                if key.startswith('dump'):
                    data = base85.b85decode(remoteobs[key])
                    pullop.repo.obsstore.mergemarkers(tr, data)
            pullop.repo.invalidatevolatilesets()
    return tr
686 686
def caps20to10(repo):
    """return a set with appropriate options to use bundle20 during getbundle"""
    capsblob = bundle2.encodecaps(repo.bundle2caps)
    return set(['HG2X', 'bundle2=' + urllib.quote(capsblob)])
693 693
def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
              **kwargs):
    """return a full bundle (with potentially multiple kind of parts)

    Could be a bundle HG10 or a bundle HG2X depending on bundlecaps
    passed. For now, the bundle can contain only changegroup, but this will
    change when more part types will be available for bundle2.

    This is different from changegroup.getbundle that only returns an HG10
    changegroup bundle. They may eventually get reunited in the future when we
    have a clearer idea of the API we want to query different data.

    The implementation is at a very early stage and will get massive rework
    when the API of bundle is refined.
    """
    # build changegroup bundle here.
    cg = changegroup.getbundle(repo, source, heads=heads,
                               common=common, bundlecaps=bundlecaps)
    if bundlecaps is None or 'HG2X' not in bundlecaps:
        # caller wants a plain bundle10; any bundle2-only extra arguments
        # cannot be honoured in that case
        if kwargs:
            raise ValueError(_('unsupported getbundle arguments: %s')
                             % ', '.join(sorted(kwargs.keys())))
        return cg
    # very crude first implementation,
    # the bundle API will change and the generation will be done lazily.
    b2caps = {}
    for bcaps in bundlecaps:
        if bcaps.startswith('bundle2='):
            blob = urllib.unquote(bcaps[len('bundle2='):])
            b2caps.update(bundle2.decodecaps(blob))
    bundler = bundle2.bundle20(repo.ui, b2caps)
    if cg:
        bundler.newpart('b2x:changegroup', data=cg.getchunks())
    listkeys = kwargs.get('listkeys', ())
    for namespace in listkeys:
        part = bundler.newpart('b2x:listkeys')
        part.addparam('namespace', namespace)
        keys = repo.listkeys(namespace).items()
        part.data = pushkey.encodekeys(keys)
    _getbundleextrapart(bundler, repo, source, heads=heads, common=common,
                        bundlecaps=bundlecaps, **kwargs)
    return util.chunkbuffer(bundler.getchunks())
736 736
737 737 def _getbundleextrapart(bundler, repo, source, heads=None, common=None,
738 738 bundlecaps=None, **kwargs):
739 739 """hook function to let extensions add parts to the requested bundle"""
740 740 pass
741 741
def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling.
    """
    current = repo.heads()
    hashed = util.sha1(''.join(sorted(current))).digest()
    if (their_heads == ['force']
        or their_heads == current
        or their_heads == ['hashed', hashed]):
        return
    # someone else committed/pushed/unbundled while we
    # were transferring data
    raise error.PushRaced('repository changed while %s - '
                          'please try again' % context)
755 755
def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    this function makes sure the repo is locked during the application and
    has a mechanism to check that no push race occurred between the creation
    of the bundle and its application.

    If the push was raced, a PushRaced exception is raised."""
    r = 0
    # need a transaction when processing a bundle2 stream
    tr = None
    lock = repo.lock()
    try:
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        if util.safehasattr(cg, 'params'):
            # only bundle2 unbundlers expose 'params': take the bundle2 path
            try:
                tr = repo.transaction('unbundle')
                tr.hookargs['bundle2-exp'] = '1'
                r = bundle2.processbundle(repo, cg, lambda: tr).reply
                cl = repo.unfiltered().changelog
                # expose pending changesets to the pre-close hook
                p = cl.writepending() and repo.root or ""
                repo.hook('b2x-pretransactionclose', throw=True, source=source,
                          url=url, pending=p, **tr.hookargs)
                tr.close()
                repo.hook('b2x-transactionclose', source=source, url=url,
                          **tr.hookargs)
            except Exception, exc:
                # flag the failure so callers know bundle2 was in progress
                exc.duringunbundle2 = True
                raise
        else:
            r = changegroup.addchangegroup(repo, cg, source, url)
    finally:
        if tr is not None:
            tr.release()
        lock.release()
    return r
General Comments 0
You need to be logged in to leave comments. Login now