##// END OF EJS Templates
pull: only prefetch bookmarks when using bundle1...
Pierre-Yves David -
r25369:02defdb1 default
parent child Browse files
Show More
@@ -1,1477 +1,1486
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import time
9 9 from i18n import _
10 10 from node import hex, nullid
11 11 import errno, urllib
12 12 import util, scmutil, changegroup, base85, error, store
13 13 import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
14 14 import lock as lockmod
15 15
def readbundle(ui, fh, fname, vfs=None):
    """Sniff the 4-byte header of ``fh`` and return a matching unbundler.

    ``fname`` is only used for error reporting ("stream" when empty); with
    ``vfs`` given, it is joined to the vfs base for those messages. A stream
    starting with '\\0' is treated as headerless, uncompressed HG10 data.
    Aborts on non-Mercurial data or an unknown bundle version.
    """
    header = changegroup.readexactly(fh, 4)

    alg = None
    if not fname:
        fname = "stream"
        if not header.startswith('HG') and header.startswith('\0'):
            # headerless bundle: re-inject the sniffed bytes and assume
            # uncompressed HG10
            fh = changegroup.headerlessfixup(fh, header)
            header = "HG10"
            alg = 'UN'
    elif vfs:
        fname = vfs.join(fname)

    magic = header[0:2]
    version = header[2:4]

    if magic != 'HG':
        raise util.Abort(_('%s: not a Mercurial bundle') % fname)
    if version == '10':
        if alg is None:
            # compression marker follows the header for on-disk bundles
            alg = changegroup.readexactly(fh, 2)
        return changegroup.cg1unpacker(fh, alg)
    if version.startswith('2'):
        return bundle2.getunbundler(ui, fh, header=magic + version)
    raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
41 41
def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        # nothing to transfer, do not emit an empty part
        return None
    remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler do not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)
56 56
57 57 def _canusebundle2(op):
58 58 """return true if a pull/push can use bundle2
59 59
60 60 Feel free to nuke this function when we drop the experimental option"""
61 61 return (op.repo.ui.configbool('experimental', 'bundle2-exp', False)
62 62 and op.remote.capable('bundle2'))
63 63
64 64
class pushoperation(object):
    """An object that represents a single push operation.

    Its purpose is to carry push-related state and very common operations.

    A new one should be created at the beginning of each push and discarded
    afterward.
    """

    def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
                 bookmarks=()):
        # repo we push from
        self.repo = repo
        self.ui = repo.ui
        # repo we push to
        self.remote = remote
        # force option provided
        self.force = force
        # revs to be pushed (None is "all")
        self.revs = revs
        # bookmark explicitly pushed
        self.bookmarks = bookmarks
        # allow push of new branch
        self.newbranch = newbranch
        # did a local lock get acquired?
        self.locallocked = None
        # step already performed
        # (used to check what steps have been already performed through bundle2)
        self.stepsdone = set()
        # Integer version of the changegroup push result
        # - None means nothing to push
        # - 0 means HTTP error
        # - 1 means we pushed and remote head count is unchanged *or*
        #   we have outgoing changesets but refused to push
        # - other values as described by addchangegroup()
        self.cgresult = None
        # Boolean value for the bookmark push
        self.bkresult = None
        # discover.outgoing object (contains common and outgoing data)
        self.outgoing = None
        # all remote heads before the push
        self.remoteheads = None
        # testable as a boolean indicating if any nodes are missing locally.
        self.incoming = None
        # phases changes that must be pushed along side the changesets
        self.outdatedphases = None
        # phases changes that must be pushed if changeset push fails
        self.fallbackoutdatedphases = None
        # outgoing obsmarkers
        self.outobsmarkers = set()
        # outgoing bookmarks
        self.outbookmarks = []
        # transaction manager
        self.trmanager = None

    @util.propertycache
    def futureheads(self):
        """future remote heads if the changeset push succeeds"""
        return self.outgoing.missingheads

    @util.propertycache
    def fallbackheads(self):
        """future remote heads if the changeset push fails"""
        if self.revs is None:
            # no target to push, all common are relevant
            return self.outgoing.commonheads
        unfi = self.repo.unfiltered()
        # I want cheads = heads(::missingheads and ::commonheads)
        # (missingheads is revs with secret changeset filtered out)
        #
        # This can be expressed as:
        #     cheads = ( (missingheads and ::commonheads)
        #              + (commonheads and ::missingheads))"
        #              )
        #
        # while trying to push we already computed the following:
        #     common = (::commonheads)
        #     missing = ((commonheads::missingheads) - commonheads)
        #
        # We can pick:
        # * missingheads part of common (::commonheads)
        common = set(self.outgoing.common)
        nm = self.repo.changelog.nodemap
        cheads = [node for node in self.revs if nm[node] in common]
        # and
        # * commonheads parents on missing
        revset = unfi.set('%ln and parents(roots(%ln))',
                          self.outgoing.commonheads,
                          self.outgoing.missing)
        cheads.extend(c.node() for c in revset)
        return cheads

    @property
    def commonheads(self):
        """set of all common heads after changeset bundle push"""
        # cgresult truthy means the changegroup made it to the remote, so
        # the pushed heads are now common; otherwise fall back.
        if self.cgresult:
            return self.futureheads
        else:
            return self.fallbackheads
164 164
# mapping of message used when pushing bookmark
# action -> (success message, failure message); both take the bookmark name
bookmsgmap = {'update': (_("updating bookmark %s\n"),
                         _('updating bookmark %s failed!\n')),
              'export': (_("exporting bookmark %s\n"),
                         _('exporting bookmark %s failed!\n')),
              'delete': (_("deleting remote bookmark %s\n"),
                         _('deleting remote bookmark %s failed!\n')),
              }
173 173
174 174
def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=()):
    '''Push outgoing changesets (limited by revs) from a local
    repository to remote.

    Returns the pushoperation object; its ``cgresult`` attribute carries the
    integer changegroup result:
      - None means nothing to push
      - 0 means HTTP error
      - 1 means we pushed and remote head count is unchanged *or*
        we have outgoing changesets but refused to push
      - other values as described by addchangegroup()
    '''
    pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks)
    if pushop.remote.local():
        # local destination: refuse if it does not support our requirements
        missing = (set(pushop.repo.requirements)
                   - pushop.remote.local().supported)
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    # there are two ways to push to remote repo:
    #
    # addchangegroup assumes local user can lock remote
    # repo (local filesystem, old ssh servers).
    #
    # unbundle assumes local user cannot lock remote repo (new ssh
    # servers, http servers).

    if not pushop.remote.canpush():
        raise util.Abort(_("destination does not support push"))
    # get local lock as we might write phase data
    localwlock = locallock = None
    try:
        # bundle2 push may receive a reply bundle touching bookmarks or other
        # things requiring the wlock. Take it now to ensure proper ordering.
        maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
        if _canusebundle2(pushop) and maypushback:
            localwlock = pushop.repo.wlock()
        locallock = pushop.repo.lock()
        pushop.locallocked = True
    except IOError, err:
        pushop.locallocked = False
        if err.errno != errno.EACCES:
            raise
        # source repo cannot be locked.
        # We do not abort the push, but just disable the local phase
        # synchronisation.
        msg = 'cannot lock source repository: %s\n' % err
        pushop.ui.debug(msg)
    try:
        if pushop.locallocked:
            # transaction used to apply any server reply (bundle2 pushback)
            pushop.trmanager = transactionmanager(repo,
                                                  'push-response',
                                                  pushop.remote.url())
        pushop.repo.checkpush(pushop)
        lock = None
        unbundle = pushop.remote.capable('unbundle')
        if not unbundle:
            # old-style push requires locking the remote repository
            lock = pushop.remote.lock()
        try:
            _pushdiscovery(pushop)
            if _canusebundle2(pushop):
                _pushbundle2(pushop)
            # the steps below check pushop.stepsdone and become no-ops for
            # anything bundle2 already handled
            _pushchangeset(pushop)
            _pushsyncphase(pushop)
            _pushobsolete(pushop)
            _pushbookmark(pushop)
        finally:
            if lock is not None:
                lock.release()
            if pushop.trmanager:
                pushop.trmanager.close()
    finally:
        if pushop.trmanager:
            pushop.trmanager.release()
        if locallock is not None:
            locallock.release()
        if localwlock is not None:
            localwlock.release()

    return pushop
255 255
# list of steps to perform discovery before push
pushdiscoveryorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
pushdiscoverymapping = {}

def pushdiscovery(stepname):
    """decorator registering a pre-push discovery step

    The decorated function is recorded in the name -> function mapping and
    its name appended to the ordered step list, so registration order is
    significant.

    Only use this decorator for a brand new step; to wrap an existing step
    from an extension, mutate the pushdiscoverymapping dictionary directly."""
    def register(func):
        # each step name may only be registered once
        assert stepname not in pushdiscoverymapping
        pushdiscoverymapping[stepname] = func
        pushdiscoveryorder.append(stepname)
        return func
    return register
279 279
def _pushdiscovery(pushop):
    """Run all registered discovery steps, in registration order"""
    for name in pushdiscoveryorder:
        pushdiscoverymapping[name](pushop)
285 285
@pushdiscovery('changeset')
def _pushdiscoverychangeset(pushop):
    """discover the changesets that need to be pushed"""
    # incoming discovery also yields the common set and the remote heads,
    # which outgoing discovery and later head checks both reuse
    commoninc = discovery.findcommonincoming(pushop.repo, pushop.remote,
                                             force=pushop.force)
    common, inc, remoteheads = commoninc
    pushop.outgoing = discovery.findcommonoutgoing(pushop.repo, pushop.remote,
                                                   onlyheads=pushop.revs,
                                                   commoninc=commoninc,
                                                   force=pushop.force)
    pushop.remoteheads = remoteheads
    pushop.incoming = inc
298 298
@pushdiscovery('phase')
def _pushdiscoveryphase(pushop):
    """discover the phase that needs to be pushed

    (computed for both success and failure case for changesets push)

    Fills ``pushop.outdatedphases`` (heads to turn public if the changeset
    push succeeds) and ``pushop.fallbackoutdatedphases`` (if it fails)."""
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    remotephases = pushop.remote.listkeys('phases')
    publishing = remotephases.get('publishing', False)
    if (pushop.ui.configbool('ui', '_usedassubrepo', False)
        and remotephases    # server supports phases
        and not pushop.outgoing.missing # no changesets to be pushed
        and publishing):
        # When:
        # - this is a subrepo push
        # - and remote support phase
        # - and no changeset are to be pushed
        # - and remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    ana = phases.analyzeremotephases(pushop.repo,
                                     pushop.fallbackheads,
                                     remotephases)
    pheads, droots = ana
    extracond = ''
    if not publishing:
        # non-publishing server: only public-on-remote heads are outdated
        extracond = ' and public()'
    revset = 'heads((%%ln::%%ln) %s)' % extracond
    # Get the list of all revs draft on remote by public here.
    # XXX Beware that revset break if droots is not strictly
    # XXX root we may want to ensure it is but it is costly
    fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
    if not outgoing.missing:
        future = fallback
    else:
        # adds changeset we are going to push as draft
        #
        # should not be necessary for publishing server, but because of an
        # issue fixed in xxxxx we have to do it anyway.
        fdroots = list(unfi.set('roots(%ln  + %ln::)',
                                outgoing.missing, droots))
        fdroots = [f.node() for f in fdroots]
        future = list(unfi.set(revset, fdroots, pushop.futureheads))
    pushop.outdatedphases = future
    pushop.fallbackoutdatedphases = fallback
347 347
@pushdiscovery('obsmarker')
def _pushdiscoveryobsmarkers(pushop):
    """discover which obsolescence markers are relevant to the push"""
    # markers are exchanged only when the experimental option enables it,
    # the local store is non-empty, and the remote knows the namespace
    if not obsolete.isenabled(pushop.repo, obsolete.exchangeopt):
        return
    if not pushop.repo.obsstore:
        return
    if 'obsolete' not in pushop.remote.listkeys('namespaces'):
        return
    repo = pushop.repo
    # very naive computation, that can be quite expensive on big repo.
    # However: evolution is currently slow on them anyway.
    nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
    pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)
358 358
@pushdiscovery('bookmarks')
def _pushdiscoverybookmarks(pushop):
    """discover which bookmarks need updating on the remote

    Fills ``pushop.outbookmarks`` with (name, old-remote-id, new-id) tuples;
    an empty old id means "create", an empty new id means "delete".  Sets
    ``pushop.bkresult`` to 2 when an explicitly requested bookmark exists
    nowhere."""
    ui = pushop.ui
    repo = pushop.repo.unfiltered()
    remote = pushop.remote
    ui.debug("checking for updated bookmarks\n")
    ancestors = ()
    if pushop.revs:
        # only advance bookmarks that sit inside the pushed subset
        revnums = map(repo.changelog.rev, pushop.revs)
        ancestors = repo.changelog.ancestors(revnums, inclusive=True)
    remotebookmark = remote.listkeys('bookmarks')

    explicit = set(pushop.bookmarks)

    comp = bookmod.compare(repo, repo._bookmarks, remotebookmark, srchex=hex)
    addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp
    for b, scid, dcid in advsrc:
        if b in explicit:
            explicit.remove(b)
        if not ancestors or repo[scid].rev() in ancestors:
            pushop.outbookmarks.append((b, dcid, scid))
    # search added bookmark
    for b, scid, dcid in addsrc:
        if b in explicit:
            explicit.remove(b)
        pushop.outbookmarks.append((b, '', scid))
    # search for overwritten bookmark
    for b, scid, dcid in advdst + diverge + differ:
        if b in explicit:
            explicit.remove(b)
            pushop.outbookmarks.append((b, dcid, scid))
    # search for bookmark to delete
    for b, scid, dcid in adddst:
        if b in explicit:
            explicit.remove(b)
            # treat as "deleted locally"
            pushop.outbookmarks.append((b, dcid, ''))
    # identical bookmarks shouldn't get reported
    for b, scid, dcid in same:
        if b in explicit:
            explicit.remove(b)

    if explicit:
        explicit = sorted(explicit)
        # we should probably list all of them
        ui.warn(_('bookmark %s does not exist on the local '
                  'or remote repository!\n') % explicit[0])
        pushop.bkresult = 2

    pushop.outbookmarks.sort()
409 409
def _pushcheckoutgoing(pushop):
    """validate the set of outgoing changesets before pushing

    Returns False when there is nothing to push.  Unless --force was given,
    aborts if any outgoing head is obsolete or troubled, and runs the
    standard new-remote-heads check."""
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    if not outgoing.missing:
        # nothing to push
        scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
        return False
    # something to push
    if not pushop.force:
        # if repo.obsstore == False --> no obsolete
        # then, save the iteration
        if unfi.obsstore:
            # these messages are here for the 80-char-limit reason
            mso = _("push includes obsolete changeset: %s!")
            mst = {"unstable": _("push includes unstable changeset: %s!"),
                   "bumped": _("push includes bumped changeset: %s!"),
                   "divergent": _("push includes divergent changeset: %s!")}
            # If we are to push if there is at least one
            # obsolete or unstable changeset in missing, at
            # least one of the missinghead will be obsolete or
            # unstable. So checking heads only is ok
            for node in outgoing.missingheads:
                ctx = unfi[node]
                if ctx.obsolete():
                    raise util.Abort(mso % ctx)
                elif ctx.troubled():
                    raise util.Abort(mst[ctx.troubles()[0]] % ctx)
        newbm = pushop.ui.configlist('bookmarks', 'pushing')
        discovery.checkheads(unfi, pushop.remote, outgoing,
                             pushop.remoteheads,
                             pushop.newbranch,
                             bool(pushop.incoming),
                             newbm)
    return True
444 444
# List of names of steps to perform for an outgoing bundle2, order matters.
b2partsgenorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
b2partsgenmapping = {}

def b2partsgenerator(stepname, idx=None):
    """decorator registering a bundle2 part generator step

    The decorated function is recorded in the name -> function mapping and
    its name is appended to the ordered step list (or inserted at ``idx``
    when given), so registration order is significant.

    Only use this decorator for a brand new step; to wrap an existing step
    from an extension, mutate the b2partsgenmapping dictionary directly."""
    def register(func):
        # each step name may only be registered once
        assert stepname not in b2partsgenmapping
        b2partsgenmapping[stepname] = func
        if idx is None:
            b2partsgenorder.append(stepname)
        else:
            b2partsgenorder.insert(idx, stepname)
        return func
    return register
471 471
@b2partsgenerator('changeset')
def _pushb2ctx(pushop, bundler):
    """handle changegroup push through bundle2

    addchangegroup result is stored in the ``pushop.cgresult`` attribute.
    """
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    # Send known heads to the server for race detection.
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    if not pushop.force:
        bundler.newpart('check:heads', data=iter(pushop.remoteheads))
    b2caps = bundle2.bundle2caps(pushop.remote)
    version = None
    cgversions = b2caps.get('changegroup')
    if not cgversions:  # 3.1 and 3.2 ship with an empty value
        cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
                                                pushop.outgoing)
    else:
        # negotiate the highest changegroup version both sides support
        cgversions = [v for v in cgversions if v in changegroup.packermap]
        if not cgversions:
            raise ValueError(_('no common changegroup version'))
        version = max(cgversions)
        cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
                                                pushop.outgoing,
                                                version=version)
    cgpart = bundler.newpart('changegroup', data=cg)
    if version is not None:
        cgpart.addparam('version', version)
    def handlereply(op):
        """extract addchangegroup returns from server reply"""
        cgreplies = op.records.getreplies(cgpart.id)
        assert len(cgreplies['changegroup']) == 1
        pushop.cgresult = cgreplies['changegroup'][0]['return']
    return handlereply
512 512
@b2partsgenerator('phase')
def _pushb2phases(pushop, bundler):
    """handle phase push through bundle2

    One pushkey part is generated per outdated remote head.  ``handlereply``
    inspects the server replies and warns for each head that the server
    ignored or refused to turn public.
    """
    if 'phases' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    # use the idiomatic membership test, consistent with _pushb2bookmarks
    if 'pushkey' not in b2caps:
        # remote cannot handle pushkey parts; fall back to plain pushkey
        return
    pushop.stepsdone.add('phases')
    part2node = []
    enc = pushkey.encode
    for newremotehead in pushop.outdatedphases:
        part = bundler.newpart('pushkey')
        part.addparam('namespace', enc('phases'))
        part.addparam('key', enc(newremotehead.hex()))
        part.addparam('old', enc(str(phases.draft)))
        part.addparam('new', enc(str(phases.public)))
        part2node.append((part.id, newremotehead))
    def handlereply(op):
        """check pushkey replies and warn about refused phase moves"""
        for partid, node in part2node:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            msg = None
            if not results:
                msg = _('server ignored update of %s to public!\n') % node
            elif not int(results[0]['return']):
                msg = _('updating %s to public failed!\n') % node
            if msg is not None:
                pushop.ui.warn(msg)
    return handlereply
544 544
@b2partsgenerator('obsmarkers')
def _pushb2obsmarkers(pushop, bundler):
    """add an obsmarkers part when the remote knows a common format"""
    if 'obsmarkers' in pushop.stepsdone:
        return
    remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
    if obsolete.commonversion(remoteversions) is None:
        # no shared obsmarker format; leave the step to the pushkey fallback
        return
    pushop.stepsdone.add('obsmarkers')
    if pushop.outobsmarkers:
        buildobsmarkerspart(bundler, sorted(pushop.outobsmarkers))
556 556
@b2partsgenerator('bookmarks')
def _pushb2bookmarks(pushop, bundler):
    """handle bookmark push through bundle2

    One pushkey part is generated per outgoing bookmark.  ``handlereply``
    reports each update's outcome and records overall success in
    ``pushop.bkresult``."""
    if 'bookmarks' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    if 'pushkey' not in b2caps:
        return
    pushop.stepsdone.add('bookmarks')
    part2book = []
    enc = pushkey.encode
    for book, old, new in pushop.outbookmarks:
        part = bundler.newpart('pushkey')
        part.addparam('namespace', enc('bookmarks'))
        part.addparam('key', enc(book))
        part.addparam('old', enc(old))
        part.addparam('new', enc(new))
        action = 'update'
        if not old:
            action = 'export'
        elif not new:
            action = 'delete'
        part2book.append((part.id, book, action))


    def handlereply(op):
        ui = pushop.ui
        for partid, book, action in part2book:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            if not results:
                pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
            else:
                ret = int(results[0]['return'])
                if ret:
                    ui.status(bookmsgmap[action][0] % book)
                else:
                    ui.warn(bookmsgmap[action][1] % book)
        if pushop.bkresult is not None:
            pushop.bkresult = 1
    return handlereply
599 599
600 600
601 601 def _pushbundle2(pushop):
602 602 """push data to the remote using bundle2
603 603
604 604 The only currently supported type of data is changegroup but this will
605 605 evolve in the future."""
606 606 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
607 607 pushback = (pushop.trmanager
608 608 and pushop.ui.configbool('experimental', 'bundle2.pushback'))
609 609
610 610 # create reply capability
611 611 capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo,
612 612 allowpushback=pushback))
613 613 bundler.newpart('replycaps', data=capsblob)
614 614 replyhandlers = []
615 615 for partgenname in b2partsgenorder:
616 616 partgen = b2partsgenmapping[partgenname]
617 617 ret = partgen(pushop, bundler)
618 618 if callable(ret):
619 619 replyhandlers.append(ret)
620 620 # do not push if nothing to push
621 621 if bundler.nbparts <= 1:
622 622 return
623 623 stream = util.chunkbuffer(bundler.getchunks())
624 624 try:
625 625 reply = pushop.remote.unbundle(stream, ['force'], 'push')
626 626 except error.BundleValueError, exc:
627 627 raise util.Abort('missing support for %s' % exc)
628 628 try:
629 629 trgetter = None
630 630 if pushback:
631 631 trgetter = pushop.trmanager.transaction
632 632 op = bundle2.processbundle(pushop.repo, reply, trgetter)
633 633 except error.BundleValueError, exc:
634 634 raise util.Abort('missing support for %s' % exc)
635 635 for rephand in replyhandlers:
636 636 rephand(op)
637 637
def _pushchangeset(pushop):
    """Make the actual push of changeset bundle to remote repo

    No-op when bundle2 already pushed the changesets.  Stores the result in
    ``pushop.cgresult``."""
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    outgoing = pushop.outgoing
    unbundle = pushop.remote.capable('unbundle')
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # create a changegroup from local
    if pushop.revs is None and not (outgoing.excluded
                                    or pushop.repo.changelog.filteredrevs):
        # push everything,
        # use the fast path, no race possible on push
        bundler = changegroup.cg1packer(pushop.repo, bundlecaps)
        cg = changegroup.getsubset(pushop.repo,
                                   outgoing,
                                   bundler,
                                   'push',
                                   fastpath=True)
    else:
        cg = changegroup.getlocalchangegroup(pushop.repo, 'push', outgoing,
                                             bundlecaps)

    # apply changegroup to remote
    if unbundle:
        # local repo finds heads on server, finds out what
        # revs it must push. once revs transferred, if server
        # finds it has different heads (someone else won
        # commit/push race), server aborts.
        if pushop.force:
            remoteheads = ['force']
        else:
            remoteheads = pushop.remoteheads
        # ssh: return remote's addchangegroup()
        # http: return remote's addchangegroup() or 0 for error
        pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
                                                 pushop.repo.url())
    else:
        # we return an integer indicating remote head count
        # change
        pushop.cgresult = pushop.remote.addchangegroup(cg, 'push',
                                                       pushop.repo.url())
686 686
def _pushsyncphase(pushop):
    """synchronise phase information locally and remotely

    Applies remote phase data locally, then (unless bundle2 already did it)
    advances outdated remote heads to public via individual pushkey calls."""
    cheads = pushop.commonheads
    # even when we don't push, exchanging phase data is useful
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo', False)
        and remotephases    # server supports phases
        and pushop.cgresult is None # nothing was pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and remote support phase
        # - and no changeset was pushed
        # - and remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    if not remotephases: # old server or public only reply from non-publishing
        _localphasemove(pushop, cheads)
        # don't push any phase data as there is nothing to push
    else:
        ana = phases.analyzeremotephases(pushop.repo, cheads,
                                         remotephases)
        pheads, droots = ana
        ### Apply remote phase on local
        if remotephases.get('publishing', False):
            _localphasemove(pushop, cheads)
        else: # publish = False
            _localphasemove(pushop, pheads)
            _localphasemove(pushop, cheads, phases.draft)
        ### Apply local phase on remote

        if pushop.cgresult:
            if 'phases' in pushop.stepsdone:
                # phases already pushed though bundle2
                return
            outdated = pushop.outdatedphases
        else:
            outdated = pushop.fallbackoutdatedphases

        pushop.stepsdone.add('phases')

        # filter heads already turned public by the push
        outdated = [c for c in outdated if c.node() not in pheads]
        # fallback to independent pushkey command
        for newremotehead in outdated:
            r = pushop.remote.pushkey('phases',
                                      newremotehead.hex(),
                                      str(phases.draft),
                                      str(phases.public))
            if not r:
                pushop.ui.warn(_('updating %s to public failed!\n')
                               % newremotehead)
742 742
def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo"""
    if pushop.trmanager:
        phases.advanceboundary(pushop.repo,
                               pushop.trmanager.transaction(),
                               phase,
                               nodes)
        return
    # repo is not locked, do not change any phases!
    # Informs the user that phases should have been moved when
    # applicable.
    actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
    phasestr = phases.phasenames[phase]
    if actualmoves:
        pushop.ui.status(_('cannot lock source repo, skipping '
                           'local %s phase update\n') % phasestr)
759 759
def _pushobsolete(pushop):
    """utility function to push obsolete markers to a remote"""
    if 'obsmarkers' in pushop.stepsdone:
        return
    pushop.ui.debug('try to push obsolete markers to remote\n')
    repo = pushop.repo
    remote = pushop.remote
    pushop.stepsdone.add('obsmarkers')
    if pushop.outobsmarkers:
        results = []
        remotedata = obsolete._pushkeyescape(sorted(pushop.outobsmarkers))
        # reverse sort to ensure we end with dump0
        for key in sorted(remotedata, reverse=True):
            results.append(remote.pushkey('obsolete', key, '',
                                          remotedata[key]))
        if not all(results):
            repo.ui.warn(_('failed to push some obsolete markers!\n'))
778 778
def _pushbookmark(pushop):
    """Update bookmark position on remote"""
    if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
        # changegroup push failed, or bundle2 already synced bookmarks
        return
    pushop.stepsdone.add('bookmarks')
    ui = pushop.ui
    remote = pushop.remote

    for b, old, new in pushop.outbookmarks:
        if not old:
            action = 'export'
        elif not new:
            action = 'delete'
        else:
            action = 'update'
        if remote.pushkey('bookmarks', b, old, new):
            ui.status(bookmsgmap[action][0] % b)
        else:
            ui.warn(bookmsgmap[action][1] % b)
        # discovery can have set the value form invalid entry
        if pushop.bkresult is not None:
            pushop.bkresult = 1
800 800
class pulloperation(object):
    """An object that represents a single pull operation.

    Its purpose is to carry pull-related state and very common operations.

    A new one should be created at the beginning of each pull and discarded
    afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False, bookmarks=()):
        # repo we pull into
        self.repo = repo
        # repo we pull from
        self.remote = remote
        # revision we try to pull (None is "all")
        self.heads = heads
        # bookmark pulled explicitly
        self.explicitbookmarks = bookmarks
        # do we force pull?
        self.force = force
        # transaction manager
        self.trmanager = None
        # set of common changeset between local and remote before pull
        self.common = None
        # set of pulled head
        self.rheads = None
        # list of missing changeset to fetch remotely
        self.fetch = None
        # remote bookmarks data
        self.remotebookmarks = None
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # list of step already done
        self.stepsdone = set()

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changeset target by the pull"""
        # compute target subset
        if self.heads is None:
            # We pulled every thing possible
            # sync on everything common
            c = set(self.common)
            ret = list(self.common)
            for n in self.rheads:
                if n not in c:
                    ret.append(n)
            return ret
        else:
            # We pulled a specific subset
            # sync on this subset
            return self.heads

    def gettransaction(self):
        # deprecated; talk to trmanager directly
        return self.trmanager.transaction()
857 857
class transactionmanager(object):
    """Drive the life cycle of a pull transaction.

    The transaction is created lazily on first use; close() and release()
    are safe to call whether or not one was ever opened."""
    def __init__(self, repo, source, url):
        self.repo = repo
        self.source = source
        self.url = url
        self._tr = None

    def transaction(self):
        """Return an open transaction object, constructing if necessary"""
        if self._tr:
            return self._tr
        name = '%s\n%s' % (self.source, util.hidepassword(self.url))
        tr = self.repo.transaction(name)
        tr.hookargs['source'] = self.source
        tr.hookargs['url'] = self.url
        self._tr = tr
        return tr

    def close(self):
        """close transaction if created"""
        tr = self._tr
        if tr is not None:
            tr.close()

    def release(self):
        """release transaction if created"""
        tr = self._tr
        if tr is not None:
            tr.release()
887 887
def pull(repo, remote, heads=None, force=False, bookmarks=()):
    """Pull changesets (and related data) from ``remote`` into ``repo``.

    Returns the pulloperation object carrying the outcome of the pull."""
    pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks)
    if pullop.remote.local():
        unsupported = set(pullop.remote.requirements) - pullop.repo.supported
        if unsupported:
            raise util.Abort(_("required features are not"
                               " supported in the destination:"
                               " %s") % (', '.join(sorted(unsupported))))

    lock = pullop.repo.lock()
    try:
        pullop.trmanager = transactionmanager(repo, 'pull', remote.url())
        _pulldiscovery(pullop)
        if _canusebundle2(pullop):
            # bundle2 grabs as much as possible in one round trip; the
            # steps below skip whatever it marked done in stepsdone
            _pullbundle2(pullop)
        _pullchangeset(pullop)
        _pullphase(pullop)
        _pullbookmarks(pullop)
        _pullobsolete(pullop)
        pullop.trmanager.close()
    finally:
        pullop.trmanager.release()
        lock.release()

    return pullop
915 914
# Ordered list of discovery step names; steps run in registration order.
pulldiscoveryorder = []

# step name -> implementing function
#
# Kept public so extensions can wrap individual steps if necessary.
pulldiscoverymapping = {}

def pulldiscovery(stepname):
    """decorator registering a discovery step to run before pull

    The decorated function is recorded in the name -> function mapping and
    its name is appended to the ordered step list, so registration order
    is execution order.

    Only use this decorator for brand new steps; to wrap an existing step
    from an extension, mutate the pulldiscovery dictionary directly."""
    def register(func):
        assert stepname not in pulldiscoverymapping
        pulldiscoverymapping[stepname] = func
        pulldiscoveryorder.append(stepname)
        return func
    return register
939 938
def _pulldiscovery(pullop):
    """Run every registered discovery step, in registration order."""
    for name in pulldiscoveryorder:
        pulldiscoverymapping[name](pullop)
945 944
@pulldiscovery('b1:bookmarks')
def _pullbookmarkbundle1(pullop):
    """prefetch bookmark data when the pull will use bundle1

    With bundle2 nothing is fetched here (the bookmarks are requested
    through a listkeys part of the bundle instead). Otherwise bookmarks
    are fetched before changeset discovery to reduce the chance and
    impact of race conditions."""
    if _canusebundle2(pullop):
        # all bundle2 servers support listkeys; bookmarks come later
        return
    pullop.remotebookmarks = pullop.remote.listkeys('bookmarks')


@pulldiscovery('changegroup')
def _pulldiscoverychangegroup(pullop):
    """discovery phase for the pull

    Currently handles changeset discovery only; it will change to handle
    all discovery at some point.

    Fills in pullop.common, pullop.fetch and pullop.rheads."""
    tmp = discovery.findcommonincoming(pullop.repo,
                                       pullop.remote,
                                       heads=pullop.heads,
                                       force=pullop.force)
    common, fetch, rheads = tmp
    nm = pullop.repo.unfiltered().changelog.nodemap
    if fetch and rheads:
        # If a remote head is filtered locally, drop it from the unknown
        # remote heads and put it back in common.
        #
        # This is a hackish solution to catch most "common but locally
        # hidden" situations. We do not perform discovery on the unfiltered
        # repository because it ends up doing a pathological amount of
        # round trips for a huge number of changesets we do not care about.
        #
        # If a set of such "common but filtered" changesets exists on the
        # server but does not include a remote head, we'll not be able to
        # detect it.
        scommon = set(common)
        filteredrheads = []
        for n in rheads:
            if n in nm:
                # node known locally: it is common, not missing
                if n not in scommon:
                    common.append(n)
            else:
                filteredrheads.append(n)
        if not filteredrheads:
            # every remote head turned out to be known locally
            fetch = []
        rheads = filteredrheads
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads
983 992
def _pullbundle2(pullop):
    """pull data using bundle2

    For now, the only supported data are changegroup (plus listkeys and
    obsmarkers parts when the remote advertises support)."""
    remotecaps = bundle2.bundle2caps(pullop.remote)
    kwargs = {'bundlecaps': caps20to10(pullop.repo)}
    # pulling changegroup
    pullop.stepsdone.add('changegroup')

    kwargs['common'] = pullop.common
    kwargs['heads'] = pullop.heads or pullop.rheads
    kwargs['cg'] = pullop.fetch
    if 'listkeys' in remotecaps:
        # ask the server to ship phases and bookmarks inside the bundle
        kwargs['listkeys'] = ['phase', 'bookmarks']
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
    else:
        if pullop.heads is None and list(pullop.common) == [nullid]:
            pullop.repo.ui.status(_("requesting all changes\n"))
    if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
        remoteversions = bundle2.obsmarkersversion(remotecaps)
        if obsolete.commonversion(remoteversions) is not None:
            # both sides share an obsmarkers format: fetch markers too
            kwargs['obsmarkers'] = True
            pullop.stepsdone.add('obsmarkers')
    # let extensions tweak the getbundle arguments
    _pullbundle2extraprepare(pullop, kwargs)
    bundle = pullop.remote.getbundle('pull', **kwargs)
    try:
        op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
    except error.BundleValueError, exc:
        raise util.Abort('missing support for %s' % exc)

    if pullop.fetch:
        results = [cg['return'] for cg in op.records['changegroup']]
        pullop.cgresult = changegroup.combineresults(results)

    # processing phases change
    for namespace, value in op.records['listkeys']:
        if namespace == 'phases':
            _pullapplyphases(pullop, value)

    # processing bookmark update
    for namespace, value in op.records['listkeys']:
        if namespace == 'bookmarks':
            pullop.remotebookmarks = value
            _pullbookmarks(pullop)
1030 1039
1031 1040 def _pullbundle2extraprepare(pullop, kwargs):
1032 1041 """hook function so that extensions can extend the getbundle call"""
1033 1042 pass
1034 1043
def _pullchangeset(pullop):
    """pull changesets from the remote into the local repo

    Negotiates the best available wire-protocol method (getbundle,
    changegroup or changegroupsubset) and stores the result in
    pullop.cgresult."""
    # We delay the open of the transaction as late as possible so we
    # don't open transaction for nothing or you break future useful
    # rollback call
    if 'changegroup' in pullop.stepsdone:
        return
    pullop.stepsdone.add('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    if pullop.remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = pullop.remote.getbundle('pull', common=pullop.common,
                                     heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        # legacy server without getbundle: pull the full changegroup
        cg = pullop.remote.changegroup(pullop.fetch, 'pull')
    elif not pullop.remote.capable('changegroupsubset'):
        raise util.Abort(_("partial pull cannot be done because "
                           "other repository doesn't support "
                           "changegroupsubset."))
    else:
        cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
                                                 pullop.remote.url())
1068 1077
def _pullphase(pullop):
    """Fetch phase data from the remote and apply it locally."""
    if 'phases' in pullop.stepsdone:
        # already handled (e.g. through a bundle2 listkeys part)
        return
    _pullapplyphases(pullop, pullop.remote.listkeys('phases'))
1075 1084
def _pullapplyphases(pullop, remotephases):
    """apply phase movement from observed remote state

    ``remotephases`` is the pushkey-style mapping obtained from the
    remote's 'phases' namespace. Phases only ever advance (draft ->
    public); already-advanced local changesets are left untouched."""
    if 'phases' in pullop.stepsdone:
        return
    pullop.stepsdone.add('phases')
    publishing = bool(remotephases.get('publishing', False))
    if remotephases and not publishing:
        # remote is new and unpublishing
        pheads, _dr = phases.analyzeremotephases(pullop.repo,
                                                 pullop.pulledsubset,
                                                 remotephases)
        dheads = pullop.pulledsubset
    else:
        # Remote is old or publishing: all common changesets
        # should be seen as public
        pheads = pullop.pulledsubset
        dheads = []
    unfi = pullop.repo.unfiltered()
    # hoist lookups out of the filtering loops below
    phase = unfi._phasecache.phase
    rev = unfi.changelog.nodemap.get
    public = phases.public
    draft = phases.draft

    # exclude changesets already public locally and update the others
    pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
    if pheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, public, pheads)

    # exclude changesets already draft locally and update the others
    dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
    if dheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, draft, dheads)
1110 1119
def _pullbookmarks(pullop):
    """update local bookmarks from the remote bookmark data already fetched"""
    if 'bookmarks' in pullop.stepsdone:
        return
    pullop.stepsdone.add('bookmarks')
    repo = pullop.repo
    bookmod.updatefromremote(repo.ui, repo, pullop.remotebookmarks,
                             pullop.remote.url(),
                             pullop.gettransaction,
                             explicit=pullop.explicitbookmarks)
1122 1131
def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    The `gettransaction` is a function that returns the pull transaction,
    creating one if necessary. We return the transaction to inform the
    calling code that a new transaction may have been created (when
    applicable).

    Exists mostly to allow overriding for experimentation purposes"""
    if 'obsmarkers' in pullop.stepsdone:
        return
    pullop.stepsdone.add('obsmarkers')
    tr = None
    if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
        pullop.repo.ui.debug('fetching remote obsolete markers\n')
        remoteobs = pullop.remote.listkeys('obsolete')
        if 'dump0' in remoteobs:
            tr = pullop.gettransaction()
            for key in sorted(remoteobs, reverse=True):
                # only the 'dump*' keys carry base85-encoded marker data
                if key.startswith('dump'):
                    data = base85.b85decode(remoteobs[key])
                    pullop.repo.obsstore.mergemarkers(tr, data)
            pullop.repo.invalidatevolatilesets()
    return tr
1146 1155
def caps20to10(repo):
    """return a set with appropriate options to use bundle20 during getbundle"""
    blob = bundle2.encodecaps(bundle2.getrepocaps(repo))
    return set(['HG20', 'bundle2=' + urllib.quote(blob)])
1153 1162
# Ordered list of part-generating steps for a bundle2 getbundle; order matters.
getbundle2partsorder = []

# step name -> part-generating function
#
# Kept public so extensions can wrap individual steps if necessary.
getbundle2partsmapping = {}

def getbundle2partsgenerator(stepname, idx=None):
    """decorator registering a bundle2 part generator for getbundle

    The decorated function is recorded in the name -> function mapping and
    its name inserted into the ordered step list: appended by default, or
    at position ``idx`` when given.

    Only use this decorator for brand new steps; to wrap an existing step
    from an extension, attack the getbundle2partsmapping dictionary
    directly."""
    def register(func):
        assert stepname not in getbundle2partsmapping
        getbundle2partsmapping[stepname] = func
        if idx is None:
            getbundle2partsorder.append(stepname)
        else:
            getbundle2partsorder.insert(idx, stepname)
        return func
    return register
1180 1189
def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
              **kwargs):
    """return a full bundle (with potentially multiple kinds of parts)

    Could be a bundle HG10 or a bundle HG20 depending on the bundlecaps
    passed. For now, the bundle can contain only changegroups, but this
    will change when more part types become available for bundle2.

    This is different from changegroup.getchangegroup, which only returns
    an HG10 changegroup bundle. They may eventually get reunited in the
    future when we have a clearer idea of the API we want to use to query
    different data.

    The implementation is at a very early stage and will get massive
    rework when the API of bundle is refined.
    """
    # bundle10 case
    usebundle2 = False
    if bundlecaps is not None:
        usebundle2 = any((cap.startswith('HG2') for cap in bundlecaps))
    if not usebundle2:
        if bundlecaps and not kwargs.get('cg', True):
            raise ValueError(_('request for bundle10 must include changegroup'))

        if kwargs:
            raise ValueError(_('unsupported getbundle arguments: %s')
                             % ', '.join(sorted(kwargs.keys())))
        return changegroup.getchangegroup(repo, source, heads=heads,
                                          common=common, bundlecaps=bundlecaps)

    # bundle20 case
    b2caps = {}
    for bcaps in bundlecaps:
        if bcaps.startswith('bundle2='):
            # capability blob is URL-quoted after the 'bundle2=' prefix
            blob = urllib.unquote(bcaps[len('bundle2='):])
            b2caps.update(bundle2.decodecaps(blob))
    bundler = bundle2.bundle20(repo.ui, b2caps)

    kwargs['heads'] = heads
    kwargs['common'] = common

    # each registered step contributes its part(s) to the bundle, in order
    for name in getbundle2partsorder:
        func = getbundle2partsmapping[name]
        func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
             **kwargs)

    return util.chunkbuffer(bundler.getchunks())
1227 1236
@getbundle2partsgenerator('changegroup')
def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
                              b2caps=None, heads=None, common=None, **kwargs):
    """add a changegroup part to the requested bundle"""
    cg = None
    if kwargs.get('cg', True):
        # build changegroup bundle here.
        version = None
        cgversions = b2caps.get('changegroup')
        if not cgversions: # 3.1 and 3.2 ship with an empty value
            cg = changegroup.getchangegroupraw(repo, source, heads=heads,
                                               common=common,
                                               bundlecaps=bundlecaps)
        else:
            # negotiate the highest changegroup version both sides know
            cgversions = [v for v in cgversions if v in changegroup.packermap]
            if not cgversions:
                raise ValueError(_('no common changegroup version'))
            version = max(cgversions)
            cg = changegroup.getchangegroupraw(repo, source, heads=heads,
                                               common=common,
                                               bundlecaps=bundlecaps,
                                               version=version)

    if cg:
        part = bundler.newpart('changegroup', data=cg)
        if version is not None:
            part.addparam('version', version)
1255 1264
@getbundle2partsgenerator('listkeys')
def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
                            b2caps=None, **kwargs):
    """add one part per requested pushkey (listkeys) namespace"""
    for namespace in kwargs.get('listkeys', ()):
        part = bundler.newpart('listkeys')
        part.addparam('namespace', namespace)
        part.data = pushkey.encodekeys(repo.listkeys(namespace).items())
1266 1275
@getbundle2partsgenerator('obsmarkers')
def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
                            b2caps=None, heads=None, **kwargs):
    """add an obsolescence markers part to the requested bundle"""
    if not kwargs.get('obsmarkers', False):
        return
    if heads is None:
        heads = repo.heads()
    # markers relevant to the ancestors of the requested heads
    subset = [c.node() for c in repo.set('::%ln', heads)]
    markers = sorted(repo.obsstore.relevantmarkers(subset))
    buildobsmarkerspart(bundler, markers)
1278 1287
def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling.
    """
    heads = repo.heads()
    heads_hash = util.sha1(''.join(sorted(heads))).digest()
    if their_heads == ['force']:
        return
    if their_heads == heads:
        return
    if their_heads == ['hashed', heads_hash]:
        return
    # someone else committed/pushed/unbundled while we
    # were transferring data
    raise error.PushRaced('repository changed while %s - '
                          'please try again' % context)
1292 1301
def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    This function makes sure the repo is locked during the application and
    has a mechanism to check that no push race occurred between the
    creation of the bundle and its application.

    If the push was raced, a PushRaced exception is raised."""
    r = 0
    # need a transaction when processing a bundle2 stream
    wlock = lock = tr = None
    recordout = None
    # quick fix for output mismatch with bundle2 in 3.4
    captureoutput = repo.ui.configbool('experimental', 'bundle2-output-capture',
                                       False)
    if url.startswith('remote:'):
        captureoutput = True
    try:
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        if util.safehasattr(cg, 'params'):
            # a 'params' attribute marks a bundle2 stream
            r = None
            try:
                wlock = repo.wlock()
                lock = repo.lock()
                tr = repo.transaction(source)
                tr.hookargs['source'] = source
                tr.hookargs['url'] = url
                tr.hookargs['bundle2'] = '1'
                op = bundle2.bundleoperation(repo, lambda: tr,
                                             captureoutput=captureoutput)
                try:
                    r = bundle2.processbundle(repo, cg, op=op)
                finally:
                    r = op.reply
                    if captureoutput and r is not None:
                        repo.ui.pushbuffer(error=True, subproc=True)
                        def recordout(output):
                            r.newpart('output', data=output, mandatory=False)
                tr.close()
            except BaseException, exc:
                exc.duringunbundle2 = True
                if captureoutput and r is not None:
                    # salvage already-produced output so the client sees it
                    parts = exc._bundle2salvagedoutput = r.salvageoutput()
                    def recordout(output):
                        part = bundle2.bundlepart('output', data=output,
                                                  mandatory=False)
                        parts.append(part)
                raise
        else:
            # plain bundle1 changegroup
            lock = repo.lock()
            r = changegroup.addchangegroup(repo, cg, source, url)
    finally:
        lockmod.release(tr, lock, wlock)
        if recordout is not None:
            recordout(repo.ui.popbuffer())
    return r
1350 1359
1351 1360 # This is it's own function so extensions can override it.
1352 1361 def _walkstreamfiles(repo):
1353 1362 return repo.store.walk()
1354 1363
def generatestreamclone(repo):
    """Emit content for a streaming clone.

    This is a generator of raw chunks that constitute a streaming clone.

    The stream begins with a line of 2 space-delimited integers containing the
    number of entries and total bytes size.

    Next, are N entries for each file being transferred. Each file entry starts
    as a line with the file name and integer size delimited by a null byte.
    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    lock = repo.lock()
    try:
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((name, size))
                total_bytes += size
    finally:
        lock.release()

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(entries), total_bytes))
    # header line: "<file count> <total bytes>"
    yield '%d %d\n' % (len(entries), total_bytes)

    sopener = repo.svfs
    oldaudit = sopener.mustaudit
    debugflag = repo.ui.debugflag
    # disable path auditing while streaming raw store files; restored below
    sopener.mustaudit = False

    try:
        for name, size in entries:
            if debugflag:
                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield '%s\0%d\n' % (store.encodedir(name), size)
            if size <= 65536:
                # small file: read and emit in a single chunk
                fp = sopener(name)
                try:
                    data = fp.read(size)
                finally:
                    fp.close()
                yield data
            else:
                for chunk in util.filechunkiter(sopener(name), limit=size):
                    yield chunk
    finally:
        sopener.mustaudit = oldaudit
1415 1424
def consumestreamclone(repo, fp):
    """Apply the contents from a streaming clone file.

    This takes the output from "streamout" and applies it to the specified
    repository.

    Like "streamout," the status line added by the wire protocol is not handled
    by this function.
    """
    lock = repo.lock()
    try:
        repo.ui.status(_('streaming all changes\n'))
        # first line: "<file count> <total bytes>"
        l = fp.readline()
        try:
            total_files, total_bytes = map(int, l.split(' ', 1))
        except (ValueError, TypeError):
            raise error.ResponseError(
                _('unexpected response from remote server:'), l)
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (total_files, util.bytecount(total_bytes)))
        handled_bytes = 0
        repo.ui.progress(_('clone'), 0, total=total_bytes)
        start = time.time()

        tr = repo.transaction(_('clone'))
        try:
            for i in xrange(total_files):
                # XXX doesn't support '\n' or '\r' in filenames
                # per-file header: "<name>\0<size>"
                l = fp.readline()
                try:
                    name, size = l.split('\0', 1)
                    size = int(size)
                except (ValueError, TypeError):
                    raise error.ResponseError(
                        _('unexpected response from remote server:'), l)
                if repo.ui.debugflag:
                    repo.ui.debug('adding %s (%s)\n' %
                                  (name, util.bytecount(size)))
                # for backwards compat, name was partially encoded
                ofp = repo.svfs(store.decodedir(name), 'w')
                for chunk in util.filechunkiter(fp, limit=size):
                    handled_bytes += len(chunk)
                    repo.ui.progress(_('clone'), handled_bytes,
                                     total=total_bytes)
                    ofp.write(chunk)
                ofp.close()
            tr.close()
        finally:
            tr.release()

        # Writing straight to files circumvented the inmemory caches
        repo.invalidate()

        elapsed = time.time() - start
        if elapsed <= 0:
            # guard against zero/negative clock deltas in the rate math
            elapsed = 0.001
        repo.ui.progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(total_bytes), elapsed,
                        util.bytecount(total_bytes / elapsed)))
    finally:
        lock.release()
General Comments 0
You need to be logged in to leave comments. Login now