exchange: allow fallbackheads to use lazy set behavior...
Durham Goode
r26184:327d09f0 default
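
The change below removes an eager copy: fallbackheads previously forced
self.outgoing.common into a fresh set before testing membership, which
defeats lazy set implementations that can answer "in" queries without
materializing every element. A minimal sketch of the idea (illustrative
only; the class and the compute callable are hypothetical):

    class lazyset(object):
        """Materialize the underlying set only on first membership test."""
        def __init__(self, compute):
            self._compute = compute  # callable returning an iterable of nodes
            self._set = None
        def __contains__(self, item):
            if self._set is None:
                self._set = set(self._compute())
            return item in self._set

    # common = lazyset(expensivediscovery)
    # "node in common" triggers the computation at most once, while the
    # old set(common) call forced it up front on every push.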
@@ -1,1574 +1,1574 @@
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import time
9 9 from i18n import _
10 10 from node import hex, nullid
11 11 import errno, urllib
12 12 import util, scmutil, changegroup, base85, error, store
13 13 import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
14 14 import lock as lockmod
15 15 import tags
16 16
17 17 def readbundle(ui, fh, fname, vfs=None):
18 18 header = changegroup.readexactly(fh, 4)
19 19
20 20 alg = None
21 21 if not fname:
22 22 fname = "stream"
23 23 if not header.startswith('HG') and header.startswith('\0'):
24 24 fh = changegroup.headerlessfixup(fh, header)
25 25 header = "HG10"
26 26 alg = 'UN'
27 27 elif vfs:
28 28 fname = vfs.join(fname)
29 29
30 30 magic, version = header[0:2], header[2:4]
31 31
32 32 if magic != 'HG':
33 33 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
34 34 if version == '10':
35 35 if alg is None:
36 36 alg = changegroup.readexactly(fh, 2)
37 37 return changegroup.cg1unpacker(fh, alg)
38 38 elif version.startswith('2'):
39 39 return bundle2.getunbundler(ui, fh, magicstring=magic + version)
40 40 else:
41 41 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
42 42
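# Illustrative sketch (not part of the original module): the 4-byte header
# dispatch performed by readbundle above. HG10 bundles carry a two-byte
# compression code next ('UN', 'GZ' or 'BZ'); HG2x streams are handed to
# the bundle2 unbundler whole.
def _examplebundletype(header):
    magic, version = header[0:2], header[2:4]
    if magic != 'HG':
        raise ValueError('not a Mercurial bundle')
    if version == '10':
        return 'bundle1'
    elif version.startswith('2'):
        return 'bundle2'
    raise ValueError('unknown bundle version %s' % version)
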
43 43 def buildobsmarkerspart(bundler, markers):
44 44 """add an obsmarker part to the bundler with <markers>
45 45
46 46 No part is created if markers is empty.
47 47 Raises ValueError if the bundler doesn't support any known obsmarker format.
48 48 """
49 49 if markers:
50 50 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
51 51 version = obsolete.commonversion(remoteversions)
52 52 if version is None:
53 53 raise ValueError('bundler does not support common obsmarker format')
54 54 stream = obsolete.encodemarkers(markers, True, version=version)
55 55 return bundler.newpart('obsmarkers', data=stream)
56 56 return None
57 57
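# Illustrative sketch (not part of the original module): the version
# negotiation done by obsolete.commonversion above amounts to picking the
# newest marker format both sides understand, assuming plain sequences of
# integer version numbers:
def _examplecommonversion(localversions, remoteversions):
    common = set(localversions) & set(remoteversions)
    if common:
        return max(common)
    return None  # mirrors the None that buildobsmarkerspart treats as fatal
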
58 58 def _canusebundle2(op):
59 59 """return true if a pull/push can use bundle2
60 60
61 61 Feel free to nuke this function when we drop the experimental option"""
62 62 return (op.repo.ui.configbool('experimental', 'bundle2-exp', True)
63 63 and op.remote.capable('bundle2'))
64 64
65 65
66 66 class pushoperation(object):
67 67 """A object that represent a single push operation
68 68
69 69 It purpose is to carry push related state and very common operation.
70 70
71 71 A new should be created at the beginning of each push and discarded
72 72 afterward.
73 73 """
74 74
75 75 def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
76 76 bookmarks=()):
77 77 # repo we push from
78 78 self.repo = repo
79 79 self.ui = repo.ui
80 80 # repo we push to
81 81 self.remote = remote
82 82 # force option provided
83 83 self.force = force
84 84 # revs to be pushed (None is "all")
85 85 self.revs = revs
86 86 # bookmark explicitly pushed
87 87 self.bookmarks = bookmarks
88 88 # allow push of new branch
89 89 self.newbranch = newbranch
90 90 # did a local lock get acquired?
91 91 self.locallocked = None
92 92 # steps already performed
93 93 # (used to check which steps have already been performed through bundle2)
94 94 self.stepsdone = set()
95 95 # Integer version of the changegroup push result
96 96 # - None means nothing to push
97 97 # - 0 means HTTP error
98 98 # - 1 means we pushed and remote head count is unchanged *or*
99 99 # we have outgoing changesets but refused to push
100 100 # - other values as described by addchangegroup()
101 101 self.cgresult = None
102 102 # Boolean value for the bookmark push
103 103 self.bkresult = None
104 104 # discovery.outgoing object (contains common and outgoing data)
105 105 self.outgoing = None
106 106 # all remote heads before the push
107 107 self.remoteheads = None
108 108 # testable as a boolean indicating if any nodes are missing locally.
109 109 self.incoming = None
110 110 # phase changes that must be pushed alongside the changesets
111 111 self.outdatedphases = None
112 112 # phase changes that must be pushed if the changeset push fails
113 113 self.fallbackoutdatedphases = None
114 114 # outgoing obsmarkers
115 115 self.outobsmarkers = set()
116 116 # outgoing bookmarks
117 117 self.outbookmarks = []
118 118 # transaction manager
119 119 self.trmanager = None
120 120 # map {pushkey partid -> callback handling failure}
121 121 # used to handle exceptions from mandatory pushkey part failures
122 122 self.pkfailcb = {}
123 123
124 124 @util.propertycache
125 125 def futureheads(self):
126 126 """future remote heads if the changeset push succeeds"""
127 127 return self.outgoing.missingheads
128 128
129 129 @util.propertycache
130 130 def fallbackheads(self):
131 131 """future remote heads if the changeset push fails"""
132 132 if self.revs is None:
133 133 # no target to push, all common heads are relevant
134 134 return self.outgoing.commonheads
135 135 unfi = self.repo.unfiltered()
136 136 # I want cheads = heads(::missingheads and ::commonheads)
137 137 # (missingheads is revs with secret changesets filtered out)
138 138 #
139 139 # This can be expressed as:
140 140 # cheads = ( (missingheads and ::commonheads)
141 141 # + (commonheads and ::missingheads)
142 142 # )
143 143 #
144 144 # while trying to push we already computed the following:
145 145 # common = (::commonheads)
146 146 # missing = ((commonheads::missingheads) - commonheads)
147 147 #
148 148 # We can pick:
149 149 # * missingheads part of common (::commonheads)
150 common = set(self.outgoing.common)
150 common = self.outgoing.common
151 151 nm = self.repo.changelog.nodemap
152 152 cheads = [node for node in self.revs if nm[node] in common]
153 153 # and
154 154 # * commonheads parents on missing
155 155 revset = unfi.set('%ln and parents(roots(%ln))',
156 156 self.outgoing.commonheads,
157 157 self.outgoing.missing)
158 158 cheads.extend(c.node() for c in revset)
159 159 return cheads
160 160
161 161 @property
162 162 def commonheads(self):
163 163 """set of all common heads after changeset bundle push"""
164 164 if self.cgresult:
165 165 return self.futureheads
166 166 else:
167 167 return self.fallbackheads
168 168
169 169 # mapping of messages used when pushing bookmarks
170 170 bookmsgmap = {'update': (_("updating bookmark %s\n"),
171 171 _('updating bookmark %s failed!\n')),
172 172 'export': (_("exporting bookmark %s\n"),
173 173 _('exporting bookmark %s failed!\n')),
174 174 'delete': (_("deleting remote bookmark %s\n"),
175 175 _('deleting remote bookmark %s failed!\n')),
176 176 }
177 177
178 178
179 179 def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=()):
180 180 '''Push outgoing changesets (limited by revs) from a local
181 181 repository to remote. Return an integer:
182 182 - None means nothing to push
183 183 - 0 means HTTP error
184 184 - 1 means we pushed and remote head count is unchanged *or*
185 185 we have outgoing changesets but refused to push
186 186 - other values as described by addchangegroup()
187 187 '''
188 188 pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks)
189 189 if pushop.remote.local():
190 190 missing = (set(pushop.repo.requirements)
191 191 - pushop.remote.local().supported)
192 192 if missing:
193 193 msg = _("required features are not"
194 194 " supported in the destination:"
195 195 " %s") % (', '.join(sorted(missing)))
196 196 raise util.Abort(msg)
197 197
198 198 # there are two ways to push to remote repo:
199 199 #
200 200 # addchangegroup assumes local user can lock remote
201 201 # repo (local filesystem, old ssh servers).
202 202 #
203 203 # unbundle assumes local user cannot lock remote repo (new ssh
204 204 # servers, http servers).
205 205
206 206 if not pushop.remote.canpush():
207 207 raise util.Abort(_("destination does not support push"))
208 208 # get local lock as we might write phase data
209 209 localwlock = locallock = None
210 210 try:
211 211 # bundle2 push may receive a reply bundle touching bookmarks or other
212 212 # things requiring the wlock. Take it now to ensure proper ordering.
213 213 maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
214 214 if _canusebundle2(pushop) and maypushback:
215 215 localwlock = pushop.repo.wlock()
216 216 locallock = pushop.repo.lock()
217 217 pushop.locallocked = True
218 218 except IOError as err:
219 219 pushop.locallocked = False
220 220 if err.errno != errno.EACCES:
221 221 raise
222 222 # source repo cannot be locked.
223 223 # We do not abort the push, but just disable the local phase
224 224 # synchronisation.
225 225 msg = 'cannot lock source repository: %s\n' % err
226 226 pushop.ui.debug(msg)
227 227 try:
228 228 if pushop.locallocked:
229 229 pushop.trmanager = transactionmanager(repo,
230 230 'push-response',
231 231 pushop.remote.url())
232 232 pushop.repo.checkpush(pushop)
233 233 lock = None
234 234 unbundle = pushop.remote.capable('unbundle')
235 235 if not unbundle:
236 236 lock = pushop.remote.lock()
237 237 try:
238 238 _pushdiscovery(pushop)
239 239 if _canusebundle2(pushop):
240 240 _pushbundle2(pushop)
241 241 _pushchangeset(pushop)
242 242 _pushsyncphase(pushop)
243 243 _pushobsolete(pushop)
244 244 _pushbookmark(pushop)
245 245 finally:
246 246 if lock is not None:
247 247 lock.release()
248 248 if pushop.trmanager:
249 249 pushop.trmanager.close()
250 250 finally:
251 251 if pushop.trmanager:
252 252 pushop.trmanager.release()
253 253 if locallock is not None:
254 254 locallock.release()
255 255 if localwlock is not None:
256 256 localwlock.release()
257 257
258 258 return pushop
259 259
260 260 # list of steps to perform discovery before push
261 261 pushdiscoveryorder = []
262 262
263 263 # Mapping between step name and function
264 264 #
265 265 # This exists to help extensions wrap steps if necessary
266 266 pushdiscoverymapping = {}
267 267
268 268 def pushdiscovery(stepname):
269 269 """decorator for function performing discovery before push
270 270
271 271 The function is added to the step -> function mapping and appended to the
272 272 list of steps. Beware that decorated functions will be added in order (this
273 273 may matter).
274 274
275 275 You can only use this decorator for a new step; if you want to wrap a step
276 276 from an extension, change the pushdiscoverymapping dictionary directly."""
277 277 def dec(func):
278 278 assert stepname not in pushdiscoverymapping
279 279 pushdiscoverymapping[stepname] = func
280 280 pushdiscoveryorder.append(stepname)
281 281 return func
282 282 return dec
283 283
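# Illustrative sketch (not part of the original module, names hypothetical):
# the registration pattern used by pushdiscovery above, reduced to its core.
# Steps run in registration order, which is why the docstring warns that
# ordering may matter.
_exampleorder = []
_examplemapping = {}

def _exampleregister(stepname):
    def dec(func):
        assert stepname not in _examplemapping
        _examplemapping[stepname] = func
        _exampleorder.append(stepname)
        return func
    return dec

@_exampleregister('example')
def _examplestep(state):
    state.append('example step ran')

def _examplerunall(state):
    for stepname in _exampleorder:
        _examplemapping[stepname](state)
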
284 284 def _pushdiscovery(pushop):
285 285 """Run all discovery steps"""
286 286 for stepname in pushdiscoveryorder:
287 287 step = pushdiscoverymapping[stepname]
288 288 step(pushop)
289 289
290 290 @pushdiscovery('changeset')
291 291 def _pushdiscoverychangeset(pushop):
292 292 """discover the changeset that need to be pushed"""
293 293 fci = discovery.findcommonincoming
294 294 commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
295 295 common, inc, remoteheads = commoninc
296 296 fco = discovery.findcommonoutgoing
297 297 outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
298 298 commoninc=commoninc, force=pushop.force)
299 299 pushop.outgoing = outgoing
300 300 pushop.remoteheads = remoteheads
301 301 pushop.incoming = inc
302 302
303 303 @pushdiscovery('phase')
304 304 def _pushdiscoveryphase(pushop):
305 305 """discover the phase that needs to be pushed
306 306
307 307 (computed for both success and failure case for changesets push)"""
308 308 outgoing = pushop.outgoing
309 309 unfi = pushop.repo.unfiltered()
310 310 remotephases = pushop.remote.listkeys('phases')
311 311 publishing = remotephases.get('publishing', False)
312 312 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
313 313 and remotephases # server supports phases
314 314 and not pushop.outgoing.missing # no changesets to be pushed
315 315 and publishing):
316 316 # When:
317 317 # - this is a subrepo push
318 318 # - and the remote supports phases
319 319 # - and no changesets are to be pushed
320 320 # - and the remote is publishing
321 321 # we may be in the issue 3871 case!
322 322 # We drop the phase data possibly provided by the server as a
323 323 # courtesy, so that changesets present on the publishing remote
324 324 # but still draft locally get published.
325 325 remotephases = {'publishing': 'True'}
326 326 ana = phases.analyzeremotephases(pushop.repo,
327 327 pushop.fallbackheads,
328 328 remotephases)
329 329 pheads, droots = ana
330 330 extracond = ''
331 331 if not publishing:
332 332 extracond = ' and public()'
333 333 revset = 'heads((%%ln::%%ln) %s)' % extracond
334 334 # Get the list of all revs draft on the remote but public here.
335 335 # XXX Beware that the revset breaks if droots is not strictly
336 336 # XXX roots; we may want to ensure it is, but that is costly
337 337 fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
338 338 if not outgoing.missing:
339 339 future = fallback
340 340 else:
341 341 # add the changesets we are going to push as draft
342 342 #
343 343 # This should not be necessary for a publishing server, but because of
344 344 # an issue fixed in xxxxx we have to do it anyway.
345 345 fdroots = list(unfi.set('roots(%ln + %ln::)',
346 346 outgoing.missing, droots))
347 347 fdroots = [f.node() for f in fdroots]
348 348 future = list(unfi.set(revset, fdroots, pushop.futureheads))
349 349 pushop.outdatedphases = future
350 350 pushop.fallbackoutdatedphases = fallback
351 351
352 352 @pushdiscovery('obsmarker')
353 353 def _pushdiscoveryobsmarkers(pushop):
354 354 if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
355 355 and pushop.repo.obsstore
356 356 and 'obsolete' in pushop.remote.listkeys('namespaces')):
357 357 repo = pushop.repo
358 358 # very naive computation that can be quite expensive on a big repo;
359 359 # however, evolution is currently slow on them anyway.
360 360 nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
361 361 pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)
362 362
363 363 @pushdiscovery('bookmarks')
364 364 def _pushdiscoverybookmarks(pushop):
365 365 ui = pushop.ui
366 366 repo = pushop.repo.unfiltered()
367 367 remote = pushop.remote
368 368 ui.debug("checking for updated bookmarks\n")
369 369 ancestors = ()
370 370 if pushop.revs:
371 371 revnums = map(repo.changelog.rev, pushop.revs)
372 372 ancestors = repo.changelog.ancestors(revnums, inclusive=True)
373 373 remotebookmark = remote.listkeys('bookmarks')
374 374
375 375 explicit = set(pushop.bookmarks)
376 376
377 377 comp = bookmod.compare(repo, repo._bookmarks, remotebookmark, srchex=hex)
378 378 addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp
379 379 for b, scid, dcid in advsrc:
380 380 if b in explicit:
381 381 explicit.remove(b)
382 382 if not ancestors or repo[scid].rev() in ancestors:
383 383 pushop.outbookmarks.append((b, dcid, scid))
384 384 # search for added bookmarks
385 385 for b, scid, dcid in addsrc:
386 386 if b in explicit:
387 387 explicit.remove(b)
388 388 pushop.outbookmarks.append((b, '', scid))
389 389 # search for overwritten bookmark
390 390 for b, scid, dcid in advdst + diverge + differ:
391 391 if b in explicit:
392 392 explicit.remove(b)
393 393 pushop.outbookmarks.append((b, dcid, scid))
394 394 # search for bookmark to delete
395 395 for b, scid, dcid in adddst:
396 396 if b in explicit:
397 397 explicit.remove(b)
398 398 # treat as "deleted locally"
399 399 pushop.outbookmarks.append((b, dcid, ''))
400 400 # identical bookmarks shouldn't get reported
401 401 for b, scid, dcid in same:
402 402 if b in explicit:
403 403 explicit.remove(b)
404 404
405 405 if explicit:
406 406 explicit = sorted(explicit)
407 407 # we should probably list all of them
408 408 ui.warn(_('bookmark %s does not exist on the local '
409 409 'or remote repository!\n') % explicit[0])
410 410 pushop.bkresult = 2
411 411
412 412 pushop.outbookmarks.sort()
413 413
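# Illustrative sketch (not part of the original module): each outbookmarks
# entry above is a (name, old, new) triple of hex nodes, with an empty
# string for a missing side. The push side later classifies it like this
# (see _pushb2bookmarks and _pushbookmark below):
def _examplebookmarkaction(old, new):
    if not old:
        return 'export'  # unknown remotely: create it
    elif not new:
        return 'delete'  # deleted locally: remove it remotely
    return 'update'      # known on both sides: move it
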
414 414 def _pushcheckoutgoing(pushop):
415 415 outgoing = pushop.outgoing
416 416 unfi = pushop.repo.unfiltered()
417 417 if not outgoing.missing:
418 418 # nothing to push
419 419 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
420 420 return False
421 421 # something to push
422 422 if not pushop.force:
423 423 # if repo.obsstore == False --> no obsolete
424 424 # then, save the iteration
425 425 if unfi.obsstore:
426 426 # these messages are here for the 80-char limit reason
427 427 mso = _("push includes obsolete changeset: %s!")
428 428 mst = {"unstable": _("push includes unstable changeset: %s!"),
429 429 "bumped": _("push includes bumped changeset: %s!"),
430 430 "divergent": _("push includes divergent changeset: %s!")}
431 431 # If we are to push and there is at least one
432 432 # obsolete or unstable changeset in missing, then at
433 433 # least one of the missing heads will be obsolete or
434 434 # unstable. So checking heads only is ok.
435 435 for node in outgoing.missingheads:
436 436 ctx = unfi[node]
437 437 if ctx.obsolete():
438 438 raise util.Abort(mso % ctx)
439 439 elif ctx.troubled():
440 440 raise util.Abort(mst[ctx.troubles()[0]] % ctx)
441 441
442 442 # internal config: bookmarks.pushing
443 443 newbm = pushop.ui.configlist('bookmarks', 'pushing')
444 444 discovery.checkheads(unfi, pushop.remote, outgoing,
445 445 pushop.remoteheads,
446 446 pushop.newbranch,
447 447 bool(pushop.incoming),
448 448 newbm)
449 449 return True
450 450
451 451 # List of names of steps to perform for an outgoing bundle2, order matters.
452 452 b2partsgenorder = []
453 453
454 454 # Mapping between step name and function
455 455 #
456 456 # This exists to help extensions wrap steps if necessary
457 457 b2partsgenmapping = {}
458 458
459 459 def b2partsgenerator(stepname, idx=None):
460 460 """decorator for function generating bundle2 part
461 461
462 462 The function is added to the step -> function mapping and appended to the
463 463 list of steps. Beware that decorated functions will be added in order
464 464 (this may matter).
465 465
466 466 You can only use this decorator for new steps; if you want to wrap a step
467 467 from an extension, change the b2partsgenmapping dictionary directly."""
468 468 def dec(func):
469 469 assert stepname not in b2partsgenmapping
470 470 b2partsgenmapping[stepname] = func
471 471 if idx is None:
472 472 b2partsgenorder.append(stepname)
473 473 else:
474 474 b2partsgenorder.insert(idx, stepname)
475 475 return func
476 476 return dec
477 477
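# Illustrative sketch (hypothetical extension code, shown commented out so
# nothing is registered here): the idx parameter above lets a part generator
# run at a chosen position instead of being appended:
#
#     @b2partsgenerator('my-part', idx=b2partsgenorder.index('changeset'))
#     def _mypartgen(pushop, bundler):
#         bundler.newpart('my-part', data='payload')
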
478 478 @b2partsgenerator('changeset')
479 479 def _pushb2ctx(pushop, bundler):
480 480 """handle changegroup push through bundle2
481 481
482 482 addchangegroup result is stored in the ``pushop.cgresult`` attribute.
483 483 """
484 484 if 'changesets' in pushop.stepsdone:
485 485 return
486 486 pushop.stepsdone.add('changesets')
487 487 # Send known heads to the server for race detection.
488 488 if not _pushcheckoutgoing(pushop):
489 489 return
490 490 pushop.repo.prepushoutgoinghooks(pushop.repo,
491 491 pushop.remote,
492 492 pushop.outgoing)
493 493 if not pushop.force:
494 494 bundler.newpart('check:heads', data=iter(pushop.remoteheads))
495 495 b2caps = bundle2.bundle2caps(pushop.remote)
496 496 version = None
497 497 cgversions = b2caps.get('changegroup')
498 498 if not cgversions: # 3.1 and 3.2 ship with an empty value
499 499 cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
500 500 pushop.outgoing)
501 501 else:
502 502 cgversions = [v for v in cgversions if v in changegroup.packermap]
503 503 if not cgversions:
504 504 raise ValueError(_('no common changegroup version'))
505 505 version = max(cgversions)
506 506 cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
507 507 pushop.outgoing,
508 508 version=version)
509 509 cgpart = bundler.newpart('changegroup', data=cg)
510 510 if version is not None:
511 511 cgpart.addparam('version', version)
512 512 def handlereply(op):
513 513 """extract addchangegroup returns from server reply"""
514 514 cgreplies = op.records.getreplies(cgpart.id)
515 515 assert len(cgreplies['changegroup']) == 1
516 516 pushop.cgresult = cgreplies['changegroup'][0]['return']
517 517 return handlereply
518 518
519 519 @b2partsgenerator('phase')
520 520 def _pushb2phases(pushop, bundler):
521 521 """handle phase push through bundle2"""
522 522 if 'phases' in pushop.stepsdone:
523 523 return
524 524 b2caps = bundle2.bundle2caps(pushop.remote)
525 525 if 'pushkey' not in b2caps:
526 526 return
527 527 pushop.stepsdone.add('phases')
528 528 part2node = []
529 529
530 530 def handlefailure(pushop, exc):
531 531 targetid = int(exc.partid)
532 532 for partid, node in part2node:
533 533 if partid == targetid:
534 534 raise error.Abort(_('updating %s to public failed') % node)
535 535
536 536 enc = pushkey.encode
537 537 for newremotehead in pushop.outdatedphases:
538 538 part = bundler.newpart('pushkey')
539 539 part.addparam('namespace', enc('phases'))
540 540 part.addparam('key', enc(newremotehead.hex()))
541 541 part.addparam('old', enc(str(phases.draft)))
542 542 part.addparam('new', enc(str(phases.public)))
543 543 part2node.append((part.id, newremotehead))
544 544 pushop.pkfailcb[part.id] = handlefailure
545 545
546 546 def handlereply(op):
547 547 for partid, node in part2node:
548 548 partrep = op.records.getreplies(partid)
549 549 results = partrep['pushkey']
550 550 assert len(results) <= 1
551 551 msg = None
552 552 if not results:
553 553 msg = _('server ignored update of %s to public!\n') % node
554 554 elif not int(results[0]['return']):
555 555 msg = _('updating %s to public failed!\n') % node
556 556 if msg is not None:
557 557 pushop.ui.warn(msg)
558 558 return handlereply
559 559
560 560 @b2partsgenerator('obsmarkers')
561 561 def _pushb2obsmarkers(pushop, bundler):
562 562 if 'obsmarkers' in pushop.stepsdone:
563 563 return
564 564 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
565 565 if obsolete.commonversion(remoteversions) is None:
566 566 return
567 567 pushop.stepsdone.add('obsmarkers')
568 568 if pushop.outobsmarkers:
569 569 markers = sorted(pushop.outobsmarkers)
570 570 buildobsmarkerspart(bundler, markers)
571 571
572 572 @b2partsgenerator('bookmarks')
573 573 def _pushb2bookmarks(pushop, bundler):
574 574 """handle bookmark push through bundle2"""
575 575 if 'bookmarks' in pushop.stepsdone:
576 576 return
577 577 b2caps = bundle2.bundle2caps(pushop.remote)
578 578 if 'pushkey' not in b2caps:
579 579 return
580 580 pushop.stepsdone.add('bookmarks')
581 581 part2book = []
582 582 enc = pushkey.encode
583 583
584 584 def handlefailure(pushop, exc):
585 585 targetid = int(exc.partid)
586 586 for partid, book, action in part2book:
587 587 if partid == targetid:
588 588 raise error.Abort(bookmsgmap[action][1].rstrip() % book)
589 589 # we should not be called for parts we did not generate
590 590 assert False
591 591
592 592 for book, old, new in pushop.outbookmarks:
593 593 part = bundler.newpart('pushkey')
594 594 part.addparam('namespace', enc('bookmarks'))
595 595 part.addparam('key', enc(book))
596 596 part.addparam('old', enc(old))
597 597 part.addparam('new', enc(new))
598 598 action = 'update'
599 599 if not old:
600 600 action = 'export'
601 601 elif not new:
602 602 action = 'delete'
603 603 part2book.append((part.id, book, action))
604 604 pushop.pkfailcb[part.id] = handlefailure
605 605
606 606 def handlereply(op):
607 607 ui = pushop.ui
608 608 for partid, book, action in part2book:
609 609 partrep = op.records.getreplies(partid)
610 610 results = partrep['pushkey']
611 611 assert len(results) <= 1
612 612 if not results:
613 613 pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
614 614 else:
615 615 ret = int(results[0]['return'])
616 616 if ret:
617 617 ui.status(bookmsgmap[action][0] % book)
618 618 else:
619 619 ui.warn(bookmsgmap[action][1] % book)
620 620 if pushop.bkresult is not None:
621 621 pushop.bkresult = 1
622 622 return handlereply
623 623
624 624
625 625 def _pushbundle2(pushop):
626 626 """push data to the remote using bundle2
627 627
628 628 The only currently supported type of data is changegroup but this will
629 629 evolve in the future."""
630 630 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
631 631 pushback = (pushop.trmanager
632 632 and pushop.ui.configbool('experimental', 'bundle2.pushback'))
633 633
634 634 # create reply capability
635 635 capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo,
636 636 allowpushback=pushback))
637 637 bundler.newpart('replycaps', data=capsblob)
638 638 replyhandlers = []
639 639 for partgenname in b2partsgenorder:
640 640 partgen = b2partsgenmapping[partgenname]
641 641 ret = partgen(pushop, bundler)
642 642 if callable(ret):
643 643 replyhandlers.append(ret)
644 644 # do not push if nothing to push
645 645 if bundler.nbparts <= 1:
646 646 return
647 647 stream = util.chunkbuffer(bundler.getchunks())
648 648 try:
649 649 try:
650 650 reply = pushop.remote.unbundle(stream, ['force'], 'push')
651 651 except error.BundleValueError as exc:
652 652 raise util.Abort('missing support for %s' % exc)
653 653 try:
654 654 trgetter = None
655 655 if pushback:
656 656 trgetter = pushop.trmanager.transaction
657 657 op = bundle2.processbundle(pushop.repo, reply, trgetter)
658 658 except error.BundleValueError as exc:
659 659 raise util.Abort('missing support for %s' % exc)
660 660 except error.PushkeyFailed as exc:
661 661 partid = int(exc.partid)
662 662 if partid not in pushop.pkfailcb:
663 663 raise
664 664 pushop.pkfailcb[partid](pushop, exc)
665 665 for rephand in replyhandlers:
666 666 rephand(op)
667 667
668 668 def _pushchangeset(pushop):
669 669 """Make the actual push of changeset bundle to remote repo"""
670 670 if 'changesets' in pushop.stepsdone:
671 671 return
672 672 pushop.stepsdone.add('changesets')
673 673 if not _pushcheckoutgoing(pushop):
674 674 return
675 675 pushop.repo.prepushoutgoinghooks(pushop.repo,
676 676 pushop.remote,
677 677 pushop.outgoing)
678 678 outgoing = pushop.outgoing
679 679 unbundle = pushop.remote.capable('unbundle')
680 680 # TODO: get bundlecaps from remote
681 681 bundlecaps = None
682 682 # create a changegroup from local
683 683 if pushop.revs is None and not (outgoing.excluded
684 684 or pushop.repo.changelog.filteredrevs):
685 685 # push everything,
686 686 # use the fast path, no race possible on push
687 687 bundler = changegroup.cg1packer(pushop.repo, bundlecaps)
688 688 cg = changegroup.getsubset(pushop.repo,
689 689 outgoing,
690 690 bundler,
691 691 'push',
692 692 fastpath=True)
693 693 else:
694 694 cg = changegroup.getlocalchangegroup(pushop.repo, 'push', outgoing,
695 695 bundlecaps)
696 696
697 697 # apply changegroup to remote
698 698 if unbundle:
699 699 # local repo finds heads on server, finds out what
700 700 # revs it must push. once revs transferred, if server
701 701 # finds it has different heads (someone else won
702 702 # commit/push race), server aborts.
703 703 if pushop.force:
704 704 remoteheads = ['force']
705 705 else:
706 706 remoteheads = pushop.remoteheads
707 707 # ssh: return remote's addchangegroup()
708 708 # http: return remote's addchangegroup() or 0 for error
709 709 pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
710 710 pushop.repo.url())
711 711 else:
712 712 # we return an integer indicating remote head count
713 713 # change
714 714 pushop.cgresult = pushop.remote.addchangegroup(cg, 'push',
715 715 pushop.repo.url())
716 716
717 717 def _pushsyncphase(pushop):
718 718 """synchronise phase information locally and remotely"""
719 719 cheads = pushop.commonheads
720 720 # even when we don't push, exchanging phase data is useful
721 721 remotephases = pushop.remote.listkeys('phases')
722 722 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
723 723 and remotephases # server supports phases
724 724 and pushop.cgresult is None # nothing was pushed
725 725 and remotephases.get('publishing', False)):
726 726 # When:
727 727 # - this is a subrepo push
728 728 # - and the remote supports phases
729 729 # - and no changesets were pushed
730 730 # - and the remote is publishing
731 731 # we may be in the issue 3871 case!
732 732 # We drop the phase data possibly provided by the server as a
733 733 # courtesy, so that changesets present on the publishing remote
734 734 # but still draft locally get published.
735 735 remotephases = {'publishing': 'True'}
736 736 if not remotephases: # old server, or public-only reply from non-publishing
737 737 _localphasemove(pushop, cheads)
738 738 # don't push any phase data as there is nothing to push
739 739 else:
740 740 ana = phases.analyzeremotephases(pushop.repo, cheads,
741 741 remotephases)
742 742 pheads, droots = ana
743 743 ### Apply remote phase on local
744 744 if remotephases.get('publishing', False):
745 745 _localphasemove(pushop, cheads)
746 746 else: # publish = False
747 747 _localphasemove(pushop, pheads)
748 748 _localphasemove(pushop, cheads, phases.draft)
749 749 ### Apply local phase on remote
750 750
751 751 if pushop.cgresult:
752 752 if 'phases' in pushop.stepsdone:
753 753 # phases already pushed though bundle2
754 754 return
755 755 outdated = pushop.outdatedphases
756 756 else:
757 757 outdated = pushop.fallbackoutdatedphases
758 758
759 759 pushop.stepsdone.add('phases')
760 760
761 761 # filter heads already turned public by the push
762 762 outdated = [c for c in outdated if c.node() not in pheads]
763 763 # fallback to independent pushkey command
764 764 for newremotehead in outdated:
765 765 r = pushop.remote.pushkey('phases',
766 766 newremotehead.hex(),
767 767 str(phases.draft),
768 768 str(phases.public))
769 769 if not r:
770 770 pushop.ui.warn(_('updating %s to public failed!\n')
771 771 % newremotehead)
772 772
773 773 def _localphasemove(pushop, nodes, phase=phases.public):
774 774 """move <nodes> to <phase> in the local source repo"""
775 775 if pushop.trmanager:
776 776 phases.advanceboundary(pushop.repo,
777 777 pushop.trmanager.transaction(),
778 778 phase,
779 779 nodes)
780 780 else:
781 781 # repo is not locked, do not change any phases!
782 782 # Inform the user that phases should have been moved when
783 783 # applicable.
784 784 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
785 785 phasestr = phases.phasenames[phase]
786 786 if actualmoves:
787 787 pushop.ui.status(_('cannot lock source repo, skipping '
788 788 'local %s phase update\n') % phasestr)
789 789
790 790 def _pushobsolete(pushop):
791 791 """utility function to push obsolete markers to a remote"""
792 792 if 'obsmarkers' in pushop.stepsdone:
793 793 return
794 794 repo = pushop.repo
795 795 remote = pushop.remote
796 796 pushop.stepsdone.add('obsmarkers')
797 797 if pushop.outobsmarkers:
798 798 pushop.ui.debug('try to push obsolete markers to remote\n')
799 799 rslts = []
800 800 remotedata = obsolete._pushkeyescape(sorted(pushop.outobsmarkers))
801 801 for key in sorted(remotedata, reverse=True):
802 802 # reverse sort to ensure we end with dump0
803 803 data = remotedata[key]
804 804 rslts.append(remote.pushkey('obsolete', key, '', data))
805 805 if [r for r in rslts if not r]:
806 806 msg = _('failed to push some obsolete markers!\n')
807 807 repo.ui.warn(msg)
808 808
809 809 def _pushbookmark(pushop):
810 810 """Update bookmark position on remote"""
811 811 if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
812 812 return
813 813 pushop.stepsdone.add('bookmarks')
814 814 ui = pushop.ui
815 815 remote = pushop.remote
816 816
817 817 for b, old, new in pushop.outbookmarks:
818 818 action = 'update'
819 819 if not old:
820 820 action = 'export'
821 821 elif not new:
822 822 action = 'delete'
823 823 if remote.pushkey('bookmarks', b, old, new):
824 824 ui.status(bookmsgmap[action][0] % b)
825 825 else:
826 826 ui.warn(bookmsgmap[action][1] % b)
827 827 # discovery may have set the value from an invalid entry
828 828 if pushop.bkresult is not None:
829 829 pushop.bkresult = 1
830 830
831 831 class pulloperation(object):
832 832 """A object that represent a single pull operation
833 833
834 834 It purpose is to carry pull related state and very common operation.
835 835
836 836 A new should be created at the beginning of each pull and discarded
837 837 afterward.
838 838 """
839 839
840 840 def __init__(self, repo, remote, heads=None, force=False, bookmarks=(),
841 841 remotebookmarks=None):
842 842 # repo we pull into
843 843 self.repo = repo
844 844 # repo we pull from
845 845 self.remote = remote
847 847 # revisions we try to pull (None is "all")
848 848 self.heads = heads
849 849 # bookmarks pulled explicitly
849 849 self.explicitbookmarks = bookmarks
850 850 # do we force pull?
851 851 self.force = force
852 852 # transaction manager
853 853 self.trmanager = None
855 855 # set of common changesets between local and remote before the pull
856 856 self.common = None
857 857 # set of pulled heads
858 858 self.rheads = None
859 859 # list of missing changesets to fetch remotely
859 859 self.fetch = None
860 860 # remote bookmarks data
861 861 self.remotebookmarks = remotebookmarks
862 862 # result of changegroup pulling (used as return code by pull)
863 863 self.cgresult = None
865 865 # list of steps already done
865 865 self.stepsdone = set()
866 866
867 867 @util.propertycache
868 868 def pulledsubset(self):
869 869 """heads of the set of changeset target by the pull"""
870 870 # compute target subset
871 871 if self.heads is None:
872 872 # We pulled everything possible
873 873 # sync on everything common
874 874 c = set(self.common)
875 875 ret = list(self.common)
876 876 for n in self.rheads:
877 877 if n not in c:
878 878 ret.append(n)
879 879 return ret
880 880 else:
881 881 # We pulled a specific subset
882 882 # sync on this subset
883 883 return self.heads
884 884
885 885 def gettransaction(self):
886 886 # deprecated; talk to trmanager directly
887 887 return self.trmanager.transaction()
888 888
889 889 class transactionmanager(object):
890 890 """An object to manage the life cycle of a transaction
891 891
892 892 It creates the transaction on demand and calls the appropriate hooks when
893 893 closing the transaction."""
894 894 def __init__(self, repo, source, url):
895 895 self.repo = repo
896 896 self.source = source
897 897 self.url = url
898 898 self._tr = None
899 899
900 900 def transaction(self):
901 901 """Return an open transaction object, constructing if necessary"""
902 902 if not self._tr:
903 903 trname = '%s\n%s' % (self.source, util.hidepassword(self.url))
904 904 self._tr = self.repo.transaction(trname)
905 905 self._tr.hookargs['source'] = self.source
906 906 self._tr.hookargs['url'] = self.url
907 907 return self._tr
908 908
909 909 def close(self):
910 910 """close transaction if created"""
911 911 if self._tr is not None:
912 912 self._tr.close()
913 913
914 914 def release(self):
915 915 """release transaction if created"""
916 916 if self._tr is not None:
917 917 self._tr.release()
918 918
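# Illustrative sketch (not part of the original module) of the
# transactionmanager life cycle, mirroring the pull() function below;
# the URL is a placeholder:
#
#     trmanager = transactionmanager(repo, 'pull', 'http://example.com/repo')
#     try:
#         tr = trmanager.transaction()  # opened lazily, on first request
#         ...                           # apply pulled data within it
#         trmanager.close()             # commit, if one was ever opened
#     finally:
#         trmanager.release()           # abort if close() was not reached
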
919 919 def pull(repo, remote, heads=None, force=False, bookmarks=(), opargs=None):
920 920 if opargs is None:
921 921 opargs = {}
922 922 pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks,
923 923 **opargs)
924 924 if pullop.remote.local():
925 925 missing = set(pullop.remote.requirements) - pullop.repo.supported
926 926 if missing:
927 927 msg = _("required features are not"
928 928 " supported in the destination:"
929 929 " %s") % (', '.join(sorted(missing)))
930 930 raise util.Abort(msg)
931 931
932 932 lock = pullop.repo.lock()
933 933 try:
934 934 pullop.trmanager = transactionmanager(repo, 'pull', remote.url())
935 935 _pulldiscovery(pullop)
936 936 if _canusebundle2(pullop):
937 937 _pullbundle2(pullop)
938 938 _pullchangeset(pullop)
939 939 _pullphase(pullop)
940 940 _pullbookmarks(pullop)
941 941 _pullobsolete(pullop)
942 942 pullop.trmanager.close()
943 943 finally:
944 944 pullop.trmanager.release()
945 945 lock.release()
946 946
947 947 return pullop
948 948
949 949 # list of steps to perform discovery before pull
950 950 pulldiscoveryorder = []
951 951
952 952 # Mapping between step name and function
953 953 #
954 954 # This exists to help extensions wrap steps if necessary
955 955 pulldiscoverymapping = {}
956 956
957 957 def pulldiscovery(stepname):
958 958 """decorator for function performing discovery before pull
959 959
960 960 The function is added to the step -> function mapping and appended to the
961 961 list of steps. Beware that decorated functions will be added in order (this
962 962 may matter).
963 963
964 964 You can only use this decorator for a new step; if you want to wrap a step
965 965 from an extension, change the pulldiscoverymapping dictionary directly."""
966 966 def dec(func):
967 967 assert stepname not in pulldiscoverymapping
968 968 pulldiscoverymapping[stepname] = func
969 969 pulldiscoveryorder.append(stepname)
970 970 return func
971 971 return dec
972 972
973 973 def _pulldiscovery(pullop):
974 974 """Run all discovery steps"""
975 975 for stepname in pulldiscoveryorder:
976 976 step = pulldiscoverymapping[stepname]
977 977 step(pullop)
978 978
979 979 @pulldiscovery('b1:bookmarks')
980 980 def _pullbookmarkbundle1(pullop):
981 981 """fetch bookmark data in bundle1 case
982 982
983 983 If not using bundle2, we have to fetch bookmarks before changeset
984 984 discovery to reduce the chance and impact of race conditions."""
985 985 if pullop.remotebookmarks is not None:
986 986 return
987 987 if (_canusebundle2(pullop)
988 988 and 'listkeys' in bundle2.bundle2caps(pullop.remote)):
989 989 # all known bundle2 servers now support listkeys, but let's be nice with
990 990 # new implementations.
991 991 return
992 992 pullop.remotebookmarks = pullop.remote.listkeys('bookmarks')
993 993
994 994
995 995 @pulldiscovery('changegroup')
996 996 def _pulldiscoverychangegroup(pullop):
997 997 """discovery phase for the pull
998 998
999 999 Currently handles changeset discovery only; it will handle all discovery
1000 1000 at some point."""
1001 1001 tmp = discovery.findcommonincoming(pullop.repo,
1002 1002 pullop.remote,
1003 1003 heads=pullop.heads,
1004 1004 force=pullop.force)
1005 1005 common, fetch, rheads = tmp
1006 1006 nm = pullop.repo.unfiltered().changelog.nodemap
1007 1007 if fetch and rheads:
1008 1008 # If a remote head is filtered locally, let's drop it from the unknown
1009 1009 # remote heads and put it back in common.
1010 1010 #
1011 1011 # This is a hackish solution to catch most of the "common but locally
1012 1012 # hidden" situations. We do not perform discovery on the unfiltered
1013 1013 # repository because it ends up doing a pathological amount of round
1014 1014 # trips for a huge amount of changesets we do not care about.
1015 1015 #
1016 1016 # If a set of such "common but filtered" changesets exists on the server
1017 1017 # but does not include a remote head, we'll not be able to detect it.
1018 1018 scommon = set(common)
1019 1019 filteredrheads = []
1020 1020 for n in rheads:
1021 1021 if n in nm:
1022 1022 if n not in scommon:
1023 1023 common.append(n)
1024 1024 else:
1025 1025 filteredrheads.append(n)
1026 1026 if not filteredrheads:
1027 1027 fetch = []
1028 1028 rheads = filteredrheads
1029 1029 pullop.common = common
1030 1030 pullop.fetch = fetch
1031 1031 pullop.rheads = rheads
1032 1032
1033 1033 def _pullbundle2(pullop):
1034 1034 """pull data using bundle2
1035 1035
1036 1036 For now, the only supported data is the changegroup."""
1037 1037 remotecaps = bundle2.bundle2caps(pullop.remote)
1038 1038 kwargs = {'bundlecaps': caps20to10(pullop.repo)}
1039 1039 # pulling changegroup
1040 1040 pullop.stepsdone.add('changegroup')
1041 1041
1042 1042 kwargs['common'] = pullop.common
1043 1043 kwargs['heads'] = pullop.heads or pullop.rheads
1044 1044 kwargs['cg'] = pullop.fetch
1045 1045 if 'listkeys' in remotecaps:
1046 1046 kwargs['listkeys'] = ['phase']
1047 1047 if pullop.remotebookmarks is None:
1048 1048 # make sure to always include bookmark data when migrating
1049 1049 # `hg incoming --bundle` to using this function.
1050 1050 kwargs['listkeys'].append('bookmarks')
1051 1051 if not pullop.fetch:
1052 1052 pullop.repo.ui.status(_("no changes found\n"))
1053 1053 pullop.cgresult = 0
1054 1054 else:
1055 1055 if pullop.heads is None and list(pullop.common) == [nullid]:
1056 1056 pullop.repo.ui.status(_("requesting all changes\n"))
1057 1057 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1058 1058 remoteversions = bundle2.obsmarkersversion(remotecaps)
1059 1059 if obsolete.commonversion(remoteversions) is not None:
1060 1060 kwargs['obsmarkers'] = True
1061 1061 pullop.stepsdone.add('obsmarkers')
1062 1062 _pullbundle2extraprepare(pullop, kwargs)
1063 1063 bundle = pullop.remote.getbundle('pull', **kwargs)
1064 1064 try:
1065 1065 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
1066 1066 except error.BundleValueError as exc:
1067 1067 raise util.Abort('missing support for %s' % exc)
1068 1068
1069 1069 if pullop.fetch:
1070 1070 results = [cg['return'] for cg in op.records['changegroup']]
1071 1071 pullop.cgresult = changegroup.combineresults(results)
1072 1072
1073 1073 # processing phases change
1074 1074 for namespace, value in op.records['listkeys']:
1075 1075 if namespace == 'phases':
1076 1076 _pullapplyphases(pullop, value)
1077 1077
1078 1078 # processing bookmark update
1079 1079 for namespace, value in op.records['listkeys']:
1080 1080 if namespace == 'bookmarks':
1081 1081 pullop.remotebookmarks = value
1082 1082
1083 1083 # bookmark data were either already there or pulled in the bundle
1084 1084 if pullop.remotebookmarks is not None:
1085 1085 _pullbookmarks(pullop)
1086 1086
1087 1087 def _pullbundle2extraprepare(pullop, kwargs):
1088 1088 """hook function so that extensions can extend the getbundle call"""
1089 1089 pass
1090 1090
1091 1091 def _pullchangeset(pullop):
1092 1092 """pull changeset from unbundle into the local repo"""
1093 1093 # We delay opening the transaction as long as possible so we
1094 1094 # don't open a transaction for nothing, and don't break future useful
1095 1095 # rollback calls
1096 1096 if 'changegroup' in pullop.stepsdone:
1097 1097 return
1098 1098 pullop.stepsdone.add('changegroup')
1099 1099 if not pullop.fetch:
1100 1100 pullop.repo.ui.status(_("no changes found\n"))
1101 1101 pullop.cgresult = 0
1102 1102 return
1103 1103 pullop.gettransaction()
1104 1104 if pullop.heads is None and list(pullop.common) == [nullid]:
1105 1105 pullop.repo.ui.status(_("requesting all changes\n"))
1106 1106 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
1107 1107 # issue1320, avoid a race if remote changed after discovery
1108 1108 pullop.heads = pullop.rheads
1109 1109
1110 1110 if pullop.remote.capable('getbundle'):
1111 1111 # TODO: get bundlecaps from remote
1112 1112 cg = pullop.remote.getbundle('pull', common=pullop.common,
1113 1113 heads=pullop.heads or pullop.rheads)
1114 1114 elif pullop.heads is None:
1115 1115 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
1116 1116 elif not pullop.remote.capable('changegroupsubset'):
1117 1117 raise util.Abort(_("partial pull cannot be done because "
1118 1118 "other repository doesn't support "
1119 1119 "changegroupsubset."))
1120 1120 else:
1121 1121 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
1122 1122 pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
1123 1123 pullop.remote.url())
1124 1124
1125 1125 def _pullphase(pullop):
1126 1126 # Get remote phases data from remote
1127 1127 if 'phases' in pullop.stepsdone:
1128 1128 return
1129 1129 remotephases = pullop.remote.listkeys('phases')
1130 1130 _pullapplyphases(pullop, remotephases)
1131 1131
1132 1132 def _pullapplyphases(pullop, remotephases):
1133 1133 """apply phase movement from observed remote state"""
1134 1134 if 'phases' in pullop.stepsdone:
1135 1135 return
1136 1136 pullop.stepsdone.add('phases')
1137 1137 publishing = bool(remotephases.get('publishing', False))
1138 1138 if remotephases and not publishing:
1139 1139 # remote is new and non-publishing
1140 1140 pheads, _dr = phases.analyzeremotephases(pullop.repo,
1141 1141 pullop.pulledsubset,
1142 1142 remotephases)
1143 1143 dheads = pullop.pulledsubset
1144 1144 else:
1145 1145 # Remote is old or publishing; all common changesets
1146 1146 # should be seen as public
1147 1147 pheads = pullop.pulledsubset
1148 1148 dheads = []
1149 1149 unfi = pullop.repo.unfiltered()
1150 1150 phase = unfi._phasecache.phase
1151 1151 rev = unfi.changelog.nodemap.get
1152 1152 public = phases.public
1153 1153 draft = phases.draft
1154 1154
1155 1155 # exclude changesets already public locally and update the others
1156 1156 pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
1157 1157 if pheads:
1158 1158 tr = pullop.gettransaction()
1159 1159 phases.advanceboundary(pullop.repo, tr, public, pheads)
1160 1160
1161 1161 # exclude changesets already draft locally and update the others
1162 1162 dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
1163 1163 if dheads:
1164 1164 tr = pullop.gettransaction()
1165 1165 phases.advanceboundary(pullop.repo, tr, draft, dheads)
1166 1166
1167 1167 def _pullbookmarks(pullop):
1168 1168 """process the remote bookmark information to update the local one"""
1169 1169 if 'bookmarks' in pullop.stepsdone:
1170 1170 return
1171 1171 pullop.stepsdone.add('bookmarks')
1172 1172 repo = pullop.repo
1173 1173 remotebookmarks = pullop.remotebookmarks
1174 1174 bookmod.updatefromremote(repo.ui, repo, remotebookmarks,
1175 1175 pullop.remote.url(),
1176 1176 pullop.gettransaction,
1177 1177 explicit=pullop.explicitbookmarks)
1178 1178
1179 1179 def _pullobsolete(pullop):
1180 1180 """utility function to pull obsolete markers from a remote
1181 1181
1182 1182 The `gettransaction` is a function that returns the pull transaction,
1183 1183 creating one if necessary. We return the transaction to inform the calling
1184 1184 code that a new transaction has been created (when applicable).
1185 1185
1186 1186 Exists mostly to allow overriding for experimentation purposes."""
1187 1187 if 'obsmarkers' in pullop.stepsdone:
1188 1188 return
1189 1189 pullop.stepsdone.add('obsmarkers')
1190 1190 tr = None
1191 1191 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1192 1192 pullop.repo.ui.debug('fetching remote obsolete markers\n')
1193 1193 remoteobs = pullop.remote.listkeys('obsolete')
1194 1194 if 'dump0' in remoteobs:
1195 1195 tr = pullop.gettransaction()
1196 1196 for key in sorted(remoteobs, reverse=True):
1197 1197 if key.startswith('dump'):
1198 1198 data = base85.b85decode(remoteobs[key])
1199 1199 pullop.repo.obsstore.mergemarkers(tr, data)
1200 1200 pullop.repo.invalidatevolatilesets()
1201 1201 return tr
1202 1202
1203 1203 def caps20to10(repo):
1204 1204 """return a set with appropriate options to use bundle20 during getbundle"""
1205 1205 caps = set(['HG20'])
1206 1206 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
1207 1207 caps.add('bundle2=' + urllib.quote(capsblob))
1208 1208 return caps
1209 1209
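# Illustrative sketch (not part of the original module): getbundle() below
# reverses this encoding; the round trip looks like this, assuming a repo
# object:
#
#     caps = caps20to10(repo)            # e.g. set(['HG20', 'bundle2=...'])
#     for cap in caps:
#         if cap.startswith('bundle2='):
#             blob = urllib.unquote(cap[len('bundle2='):])
#             b2caps = bundle2.decodecaps(blob)
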
1210 1210 # List of names of steps to perform for a bundle2 for getbundle, order matters.
1211 1211 getbundle2partsorder = []
1212 1212
1213 1213 # Mapping between step name and function
1214 1214 #
1215 1215 # This exists to help extensions wrap steps if necessary
1216 1216 getbundle2partsmapping = {}
1217 1217
1218 1218 def getbundle2partsgenerator(stepname, idx=None):
1219 1219 """decorator for function generating bundle2 part for getbundle
1220 1220
1221 1221 The function is added to the step -> function mapping and appended to the
1222 1222 list of steps. Beware that decorated functions will be added in order
1223 1223 (this may matter).
1224 1224
1225 1225 You can only use this decorator for new steps; if you want to wrap a step
1226 1226 from an extension, change the getbundle2partsmapping dictionary directly."""
1227 1227 def dec(func):
1228 1228 assert stepname not in getbundle2partsmapping
1229 1229 getbundle2partsmapping[stepname] = func
1230 1230 if idx is None:
1231 1231 getbundle2partsorder.append(stepname)
1232 1232 else:
1233 1233 getbundle2partsorder.insert(idx, stepname)
1234 1234 return func
1235 1235 return dec
1236 1236
1237 1237 def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
1238 1238 **kwargs):
1239 1239 """return a full bundle (with potentially multiple kind of parts)
1240 1240
1241 1241 Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
1242 1242 passed. For now, the bundle can contain only changegroup, but this will
1243 1243 changes when more part type will be available for bundle2.
1244 1244
1245 1245 This is different from changegroup.getchangegroup that only returns an HG10
1246 1246 changegroup bundle. They may eventually get reunited in the future when we
1247 1247 have a clearer idea of the API we what to query different data.
1248 1248
1249 1249 The implementation is at a very early stage and will get massive rework
1250 1250 when the API of bundle is refined.
1251 1251 """
1252 1252 # bundle10 case
1253 1253 usebundle2 = False
1254 1254 if bundlecaps is not None:
1255 1255 usebundle2 = any((cap.startswith('HG2') for cap in bundlecaps))
1256 1256 if not usebundle2:
1257 1257 if bundlecaps and not kwargs.get('cg', True):
1258 1258 raise ValueError(_('request for bundle10 must include changegroup'))
1259 1259
1260 1260 if kwargs:
1261 1261 raise ValueError(_('unsupported getbundle arguments: %s')
1262 1262 % ', '.join(sorted(kwargs.keys())))
1263 1263 return changegroup.getchangegroup(repo, source, heads=heads,
1264 1264 common=common, bundlecaps=bundlecaps)
1265 1265
1266 1266 # bundle20 case
1267 1267 b2caps = {}
1268 1268 for bcaps in bundlecaps:
1269 1269 if bcaps.startswith('bundle2='):
1270 1270 blob = urllib.unquote(bcaps[len('bundle2='):])
1271 1271 b2caps.update(bundle2.decodecaps(blob))
1272 1272 bundler = bundle2.bundle20(repo.ui, b2caps)
1273 1273
1274 1274 kwargs['heads'] = heads
1275 1275 kwargs['common'] = common
1276 1276
1277 1277 for name in getbundle2partsorder:
1278 1278 func = getbundle2partsmapping[name]
1279 1279 func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
1280 1280 **kwargs)
1281 1281
1282 1282 return util.chunkbuffer(bundler.getchunks())
1283 1283
1284 1284 @getbundle2partsgenerator('changegroup')
1285 1285 def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
1286 1286 b2caps=None, heads=None, common=None, **kwargs):
1287 1287 """add a changegroup part to the requested bundle"""
1288 1288 cg = None
1289 1289 if kwargs.get('cg', True):
1290 1290 # build changegroup bundle here.
1291 1291 version = None
1292 1292 cgversions = b2caps.get('changegroup')
1293 1293 getcgkwargs = {}
1294 1294 if cgversions: # 3.1 and 3.2 ship with an empty value
1295 1295 cgversions = [v for v in cgversions if v in changegroup.packermap]
1296 1296 if not cgversions:
1297 1297 raise ValueError(_('no common changegroup version'))
1298 1298 version = getcgkwargs['version'] = max(cgversions)
1299 1299 outgoing = changegroup.computeoutgoing(repo, heads, common)
1300 1300 cg = changegroup.getlocalchangegroupraw(repo, source, outgoing,
1301 1301 bundlecaps=bundlecaps,
1302 1302 **getcgkwargs)
1303 1303
1304 1304 if cg:
1305 1305 part = bundler.newpart('changegroup', data=cg)
1306 1306 if version is not None:
1307 1307 part.addparam('version', version)
1308 1308 part.addparam('nbchanges', str(len(outgoing.missing)), mandatory=False)
1309 1309
1310 1310 @getbundle2partsgenerator('listkeys')
1311 1311 def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
1312 1312 b2caps=None, **kwargs):
1313 1313 """add parts containing listkeys namespaces to the requested bundle"""
1314 1314 listkeys = kwargs.get('listkeys', ())
1315 1315 for namespace in listkeys:
1316 1316 part = bundler.newpart('listkeys')
1317 1317 part.addparam('namespace', namespace)
1318 1318 keys = repo.listkeys(namespace).items()
1319 1319 part.data = pushkey.encodekeys(keys)
1320 1320
1321 1321 @getbundle2partsgenerator('obsmarkers')
1322 1322 def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
1323 1323 b2caps=None, heads=None, **kwargs):
1324 1324 """add an obsolescence markers part to the requested bundle"""
1325 1325 if kwargs.get('obsmarkers', False):
1326 1326 if heads is None:
1327 1327 heads = repo.heads()
1328 1328 subset = [c.node() for c in repo.set('::%ln', heads)]
1329 1329 markers = repo.obsstore.relevantmarkers(subset)
1330 1330 markers = sorted(markers)
1331 1331 buildobsmarkerspart(bundler, markers)
1332 1332
1333 1333 @getbundle2partsgenerator('hgtagsfnodes')
1334 1334 def _getbundletagsfnodes(bundler, repo, source, bundlecaps=None,
1335 1335 b2caps=None, heads=None, common=None,
1336 1336 **kwargs):
1337 1337 """Transfer the .hgtags filenodes mapping.
1338 1338
1339 1339 Only values for heads in this bundle will be transferred.
1340 1340
1341 1341 The part data consists of pairs of 20 byte changeset node and .hgtags
1342 1342 filenodes raw values.
1343 1343 """
1344 1344 # Don't send unless:
1345 1345 # - changesets are being exchanged,
1346 1346 # - the client supports it.
1347 1347 if not (kwargs.get('cg', True) and 'hgtagsfnodes' in b2caps):
1348 1348 return
1349 1349
1350 1350 outgoing = changegroup.computeoutgoing(repo, heads, common)
1351 1351
1352 1352 if not outgoing.missingheads:
1353 1353 return
1354 1354
1355 1355 cache = tags.hgtagsfnodescache(repo.unfiltered())
1356 1356 chunks = []
1357 1357
1358 1358 # .hgtags fnodes are only relevant for head changesets. While we could
1359 1359 # transfer values for all known nodes, there will likely be little to
1360 1360 # no benefit.
1361 1361 #
1362 1362 # We don't bother using a generator to produce output data because
1363 1363 # a) we only have 40 bytes per head and even esoteric numbers of heads
1364 1364 # consume little memory (1M heads is 40MB) b) we don't want to send the
1365 1365 # part if we don't have entries and knowing if we have entries requires
1366 1366 # cache lookups.
1367 1367 for node in outgoing.missingheads:
1368 1368 # Don't compute missing, as this may slow down serving.
1369 1369 fnode = cache.getfnode(node, computemissing=False)
1370 1370 if fnode is not None:
1371 1371 chunks.extend([node, fnode])
1372 1372
1373 1373 if chunks:
1374 1374 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1375 1375
1376 1376 def check_heads(repo, their_heads, context):
1377 1377 """check if the heads of a repo have been modified
1378 1378
1379 1379 Used by peer for unbundling.
1380 1380 """
1381 1381 heads = repo.heads()
1382 1382 heads_hash = util.sha1(''.join(sorted(heads))).digest()
1383 1383 if not (their_heads == ['force'] or their_heads == heads or
1384 1384 their_heads == ['hashed', heads_hash]):
1385 1385 # someone else committed/pushed/unbundled while we
1386 1386 # were transferring data
1387 1387 raise error.PushRaced('repository changed while %s - '
1388 1388 'please try again' % context)
1389 1389
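# Illustrative sketch (not part of the original module): the
# ['hashed', heads_hash] form accepted above relies on the client computing
# the same fingerprint of the remote heads it saw:
def _examplehashheads(heads):
    # heads is a list of 20-byte binary nodes; sorting makes order irrelevant
    return util.sha1(''.join(sorted(heads))).digest()
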
1390 1390 def unbundle(repo, cg, heads, source, url):
1391 1391 """Apply a bundle to a repo.
1392 1392
1393 1393 This function makes sure the repo is locked during the application and
1394 1394 has a mechanism to check that no push race occurred between the creation
1395 1395 of the bundle and its application.
1396 1396
1397 1397 If the push was raced, a PushRaced exception is raised."""
1398 1398 r = 0
1399 1399 # need a transaction when processing a bundle2 stream
1400 1400 wlock = lock = tr = None
1401 1401 recordout = None
1402 1402 # quick fix for output mismatch with bundle2 in 3.4
1403 1403 captureoutput = repo.ui.configbool('experimental', 'bundle2-output-capture',
1404 1404 False)
1405 1405 if url.startswith('remote:http:') or url.startswith('remote:https:'):
1406 1406 captureoutput = True
1407 1407 try:
1408 1408 check_heads(repo, heads, 'uploading changes')
1409 1409 # push can proceed
1410 1410 if util.safehasattr(cg, 'params'):
1411 1411 r = None
1412 1412 try:
1413 1413 wlock = repo.wlock()
1414 1414 lock = repo.lock()
1415 1415 tr = repo.transaction(source)
1416 1416 tr.hookargs['source'] = source
1417 1417 tr.hookargs['url'] = url
1418 1418 tr.hookargs['bundle2'] = '1'
1419 1419 op = bundle2.bundleoperation(repo, lambda: tr,
1420 1420 captureoutput=captureoutput)
1421 1421 try:
1422 1422 op = bundle2.processbundle(repo, cg, op=op)
1423 1423 finally:
1424 1424 r = op.reply
1425 1425 if captureoutput and r is not None:
1426 1426 repo.ui.pushbuffer(error=True, subproc=True)
1427 1427 def recordout(output):
1428 1428 r.newpart('output', data=output, mandatory=False)
1429 1429 tr.close()
1430 1430 except BaseException as exc:
1431 1431 exc.duringunbundle2 = True
1432 1432 if captureoutput and r is not None:
1433 1433 parts = exc._bundle2salvagedoutput = r.salvageoutput()
1434 1434 def recordout(output):
1435 1435 part = bundle2.bundlepart('output', data=output,
1436 1436 mandatory=False)
1437 1437 parts.append(part)
1438 1438 raise
1439 1439 else:
1440 1440 lock = repo.lock()
1441 1441 r = changegroup.addchangegroup(repo, cg, source, url)
1442 1442 finally:
1443 1443 lockmod.release(tr, lock, wlock)
1444 1444 if recordout is not None:
1445 1445 recordout(repo.ui.popbuffer())
1446 1446 return r
1447 1447
1448 1448 # This is its own function so extensions can override it.
1449 1449 def _walkstreamfiles(repo):
1450 1450 return repo.store.walk()
1451 1451
1452 1452 def generatestreamclone(repo):
1453 1453 """Emit content for a streaming clone.
1454 1454
1455 1455 This is a generator of raw chunks that constitute a streaming clone.
1456 1456
1457 1457 The stream begins with a line of 2 space-delimited integers containing the
1458 1458 number of entries and total bytes size.
1459 1459
1460 1460 Next are N entries, one per file being transferred. Each file entry starts
1461 1461 as a line with the file name and integer size delimited by a null byte.
1462 1462 The raw file data follows. Following the raw file data is the next file
1463 1463 entry, or EOF.
1464 1464
1465 1465 When used on the wire protocol, an additional line indicating protocol
1466 1466 success will be prepended to the stream. This function is not responsible
1467 1467 for adding it.
1468 1468
1469 1469 This function will obtain a repository lock to ensure a consistent view of
1470 1470 the store is captured. It therefore may raise LockError.
1471 1471 """
1472 1472 entries = []
1473 1473 total_bytes = 0
1474 1474 # Get consistent snapshot of repo, lock during scan.
1475 1475 lock = repo.lock()
1476 1476 try:
1477 1477 repo.ui.debug('scanning\n')
1478 1478 for name, ename, size in _walkstreamfiles(repo):
1479 1479 if size:
1480 1480 entries.append((name, size))
1481 1481 total_bytes += size
1482 1482 finally:
1483 1483 lock.release()
1484 1484
1485 1485 repo.ui.debug('%d files, %d bytes to transfer\n' %
1486 1486 (len(entries), total_bytes))
1487 1487 yield '%d %d\n' % (len(entries), total_bytes)
1488 1488
1489 1489 svfs = repo.svfs
1490 1490 oldaudit = svfs.mustaudit
1491 1491 debugflag = repo.ui.debugflag
1492 1492 svfs.mustaudit = False
1493 1493
1494 1494 try:
1495 1495 for name, size in entries:
1496 1496 if debugflag:
1497 1497 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
1498 1498 # partially encode name over the wire for backwards compat
1499 1499 yield '%s\0%d\n' % (store.encodedir(name), size)
1500 1500 if size <= 65536:
1501 1501 fp = svfs(name)
1502 1502 try:
1503 1503 data = fp.read(size)
1504 1504 finally:
1505 1505 fp.close()
1506 1506 yield data
1507 1507 else:
1508 1508 for chunk in util.filechunkiter(svfs(name), limit=size):
1509 1509 yield chunk
1510 1510 finally:
1511 1511 svfs.mustaudit = oldaudit
1512 1512
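# Illustrative sketch (not part of the original module): a minimal reader
# for the stream emitted above, mirroring consumestreamclone below. fp is
# any file-like object; error handling and chunked reads are omitted, and
# names are still store-encoded (real code applies store.decodedir).
def _examplereadstream(fp):
    count, totalbytes = map(int, fp.readline().split(' ', 1))
    for _i in xrange(count):
        name, size = fp.readline().split('\0', 1)
        yield name, fp.read(int(size))
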
1513 1513 def consumestreamclone(repo, fp):
1514 1514 """Apply the contents from a streaming clone file.
1515 1515
1516 1516 This takes the output from "generatestreamclone" and applies it to the
1517 1517 specified repository.
1518 1518
1519 1519 Like "generatestreamclone," the status line added by the wire protocol is
1520 1520 not handled by this function.
1521 1521 """
1522 1522 lock = repo.lock()
1523 1523 try:
1524 1524 repo.ui.status(_('streaming all changes\n'))
1525 1525 l = fp.readline()
1526 1526 try:
1527 1527 total_files, total_bytes = map(int, l.split(' ', 1))
1528 1528 except (ValueError, TypeError):
1529 1529 raise error.ResponseError(
1530 1530 _('unexpected response from remote server:'), l)
1531 1531 repo.ui.status(_('%d files to transfer, %s of data\n') %
1532 1532 (total_files, util.bytecount(total_bytes)))
1533 1533 handled_bytes = 0
1534 1534 repo.ui.progress(_('clone'), 0, total=total_bytes)
1535 1535 start = time.time()
1536 1536
1537 1537 tr = repo.transaction(_('clone'))
1538 1538 try:
1539 1539 for i in xrange(total_files):
1540 1540 # XXX doesn't support '\n' or '\r' in filenames
1541 1541 l = fp.readline()
1542 1542 try:
1543 1543 name, size = l.split('\0', 1)
1544 1544 size = int(size)
1545 1545 except (ValueError, TypeError):
1546 1546 raise error.ResponseError(
1547 1547 _('unexpected response from remote server:'), l)
1548 1548 if repo.ui.debugflag:
1549 1549 repo.ui.debug('adding %s (%s)\n' %
1550 1550 (name, util.bytecount(size)))
1551 1551 # for backwards compat, name was partially encoded
1552 1552 ofp = repo.svfs(store.decodedir(name), 'w')
1553 1553 for chunk in util.filechunkiter(fp, limit=size):
1554 1554 handled_bytes += len(chunk)
1555 1555 repo.ui.progress(_('clone'), handled_bytes,
1556 1556 total=total_bytes)
1557 1557 ofp.write(chunk)
1558 1558 ofp.close()
1559 1559 tr.close()
1560 1560 finally:
1561 1561 tr.release()
1562 1562
1563 1563 # Writing straight to files circumvented the in-memory caches
1564 1564 repo.invalidate()
1565 1565
1566 1566 elapsed = time.time() - start
1567 1567 if elapsed <= 0:
1568 1568 elapsed = 0.001
1569 1569 repo.ui.progress(_('clone'), None)
1570 1570 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
1571 1571 (util.bytecount(total_bytes), elapsed,
1572 1572 util.bytecount(total_bytes / elapsed)))
1573 1573 finally:
1574 1574 lock.release()