bundle2: pull bookmark the old way if no bundle2 listkeys support (issue4701)...
Pierre-Yves David
r25479:f00a63a4 default
@@ -1,1543 +1,1547 b''
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import time
9 9 from i18n import _
10 10 from node import hex, nullid
11 11 import errno, urllib
12 12 import util, scmutil, changegroup, base85, error, store
13 13 import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
14 14 import lock as lockmod
15 15 import tags
16 16
17 17 def readbundle(ui, fh, fname, vfs=None):
18 18 header = changegroup.readexactly(fh, 4)
19 19
20 20 alg = None
21 21 if not fname:
22 22 fname = "stream"
23 23 if not header.startswith('HG') and header.startswith('\0'):
24 24 fh = changegroup.headerlessfixup(fh, header)
25 25 header = "HG10"
26 26 alg = 'UN'
27 27 elif vfs:
28 28 fname = vfs.join(fname)
29 29
30 30 magic, version = header[0:2], header[2:4]
31 31
32 32 if magic != 'HG':
33 33 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
34 34 if version == '10':
35 35 if alg is None:
36 36 alg = changegroup.readexactly(fh, 2)
37 37 return changegroup.cg1unpacker(fh, alg)
38 38 elif version.startswith('2'):
39 39 return bundle2.getunbundler(ui, fh, header=magic + version)
40 40 else:
41 41 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
42 42
43 43 def buildobsmarkerspart(bundler, markers):
44 44 """add an obsmarker part to the bundler with <markers>
45 45
46 46 No part is created if markers is empty.
47 47 Raises ValueError if the bundler doesn't support any known obsmarker format.
48 48 """
49 49 if markers:
50 50 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
51 51 version = obsolete.commonversion(remoteversions)
52 52 if version is None:
53 53 raise ValueError('bundler does not support common obsmarker format')
54 54 stream = obsolete.encodemarkers(markers, True, version=version)
55 55 return bundler.newpart('obsmarkers', data=stream)
56 56 return None
57 57
58 58 def _canusebundle2(op):
59 59 """return true if a pull/push can use bundle2
60 60
61 61 Feel free to nuke this function when we drop the experimental option"""
62 62 return (op.repo.ui.configbool('experimental', 'bundle2-exp', True)
63 63 and op.remote.capable('bundle2'))
64 64
65 65
66 66 class pushoperation(object):
67 67 """A object that represent a single push operation
68 68
69 69 It purpose is to carry push related state and very common operation.
70 70
71 71 A new should be created at the beginning of each push and discarded
72 72 afterward.
73 73 """
74 74
75 75 def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
76 76 bookmarks=()):
77 77 # repo we push from
78 78 self.repo = repo
79 79 self.ui = repo.ui
80 80 # repo we push to
81 81 self.remote = remote
82 82 # force option provided
83 83 self.force = force
84 84 # revs to be pushed (None is "all")
85 85 self.revs = revs
86 86 # bookmark explicitly pushed
87 87 self.bookmarks = bookmarks
88 88 # allow push of new branch
89 89 self.newbranch = newbranch
90 90 # did a local lock get acquired?
91 91 self.locallocked = None
92 92 # steps already performed
93 93 # (used to check what steps have already been performed through bundle2)
94 94 self.stepsdone = set()
95 95 # Integer version of the changegroup push result
96 96 # - None means nothing to push
97 97 # - 0 means HTTP error
98 98 # - 1 means we pushed and remote head count is unchanged *or*
99 99 # we have outgoing changesets but refused to push
100 100 # - other values as described by addchangegroup()
101 101 self.cgresult = None
102 102 # Boolean value for the bookmark push
103 103 self.bkresult = None
104 104 # discover.outgoing object (contains common and outgoing data)
105 105 self.outgoing = None
106 106 # all remote heads before the push
107 107 self.remoteheads = None
108 108 # testable as a boolean indicating if any nodes are missing locally.
109 109 self.incoming = None
110 110 # phase changes that must be pushed alongside the changesets
111 111 self.outdatedphases = None
112 112 # phase changes that must be pushed if the changeset push fails
113 113 self.fallbackoutdatedphases = None
114 114 # outgoing obsmarkers
115 115 self.outobsmarkers = set()
116 116 # outgoing bookmarks
117 117 self.outbookmarks = []
118 118 # transaction manager
119 119 self.trmanager = None
120 120
121 121 @util.propertycache
122 122 def futureheads(self):
123 123 """future remote heads if the changeset push succeeds"""
124 124 return self.outgoing.missingheads
125 125
126 126 @util.propertycache
127 127 def fallbackheads(self):
128 128 """future remote heads if the changeset push fails"""
129 129 if self.revs is None:
130 130 # no target to push, all common heads are relevant
131 131 return self.outgoing.commonheads
132 132 unfi = self.repo.unfiltered()
133 133 # I want cheads = heads(::missingheads and ::commonheads)
134 134 # (missingheads is revs with secret changeset filtered out)
135 135 #
136 136 # This can be expressed as:
137 137 # cheads = ( (missingheads and ::commonheads)
138 138 #          + (commonheads and ::missingheads)
139 139 #          )
140 140 #
141 141 # while trying to push we already computed the following:
142 142 # common = (::commonheads)
143 143 # missing = ((commonheads::missingheads) - commonheads)
144 144 #
145 145 # We can pick:
146 146 # * missingheads part of common (::commonheads)
147 147 common = set(self.outgoing.common)
148 148 nm = self.repo.changelog.nodemap
149 149 cheads = [node for node in self.revs if nm[node] in common]
150 150 # and
151 151 # * commonheads parents on missing
152 152 revset = unfi.set('%ln and parents(roots(%ln))',
153 153 self.outgoing.commonheads,
154 154 self.outgoing.missing)
155 155 cheads.extend(c.node() for c in revset)
156 156 return cheads
157 157
158 158 @property
159 159 def commonheads(self):
160 160 """set of all common heads after changeset bundle push"""
161 161 if self.cgresult:
162 162 return self.futureheads
163 163 else:
164 164 return self.fallbackheads
165 165
166 166 # mapping of messages used when pushing bookmarks
167 167 bookmsgmap = {'update': (_("updating bookmark %s\n"),
168 168 _('updating bookmark %s failed!\n')),
169 169 'export': (_("exporting bookmark %s\n"),
170 170 _('exporting bookmark %s failed!\n')),
171 171 'delete': (_("deleting remote bookmark %s\n"),
172 172 _('deleting remote bookmark %s failed!\n')),
173 173 }
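Each value pairs a success message with a failure message; the push code below selects one by action key and index. For example (with a hypothetical bookmark name and a ui object from the surrounding API):

    ui.status(bookmsgmap['update'][0] % 'feature-bookmark')   # success line
    ui.warn(bookmsgmap['update'][1] % 'feature-bookmark')     # failure line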
174 174
175 175
176 176 def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=()):
177 177 '''Push outgoing changesets (limited by revs) from a local
178 178 repository to remote. Return an integer:
179 179 - None means nothing to push
180 180 - 0 means HTTP error
181 181 - 1 means we pushed and remote head count is unchanged *or*
182 182 we have outgoing changesets but refused to push
183 183 - other values as described by addchangegroup()
184 184 '''
185 185 pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks)
186 186 if pushop.remote.local():
187 187 missing = (set(pushop.repo.requirements)
188 188 - pushop.remote.local().supported)
189 189 if missing:
190 190 msg = _("required features are not"
191 191 " supported in the destination:"
192 192 " %s") % (', '.join(sorted(missing)))
193 193 raise util.Abort(msg)
194 194
195 195 # there are two ways to push to remote repo:
196 196 #
197 197 # addchangegroup assumes local user can lock remote
198 198 # repo (local filesystem, old ssh servers).
199 199 #
200 200 # unbundle assumes local user cannot lock remote repo (new ssh
201 201 # servers, http servers).
202 202
203 203 if not pushop.remote.canpush():
204 204 raise util.Abort(_("destination does not support push"))
205 205 # get local lock as we might write phase data
206 206 localwlock = locallock = None
207 207 try:
208 208 # bundle2 push may receive a reply bundle touching bookmarks or other
209 209 # things requiring the wlock. Take it now to ensure proper ordering.
210 210 maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
211 211 if _canusebundle2(pushop) and maypushback:
212 212 localwlock = pushop.repo.wlock()
213 213 locallock = pushop.repo.lock()
214 214 pushop.locallocked = True
215 215 except IOError, err:
216 216 pushop.locallocked = False
217 217 if err.errno != errno.EACCES:
218 218 raise
219 219 # source repo cannot be locked.
220 220 # We do not abort the push, but just disable the local phase
221 221 # synchronisation.
222 222 msg = 'cannot lock source repository: %s\n' % err
223 223 pushop.ui.debug(msg)
224 224 try:
225 225 if pushop.locallocked:
226 226 pushop.trmanager = transactionmanager(repo,
227 227 'push-response',
228 228 pushop.remote.url())
229 229 pushop.repo.checkpush(pushop)
230 230 lock = None
231 231 unbundle = pushop.remote.capable('unbundle')
232 232 if not unbundle:
233 233 lock = pushop.remote.lock()
234 234 try:
235 235 _pushdiscovery(pushop)
236 236 if _canusebundle2(pushop):
237 237 _pushbundle2(pushop)
238 238 _pushchangeset(pushop)
239 239 _pushsyncphase(pushop)
240 240 _pushobsolete(pushop)
241 241 _pushbookmark(pushop)
242 242 finally:
243 243 if lock is not None:
244 244 lock.release()
245 245 if pushop.trmanager:
246 246 pushop.trmanager.close()
247 247 finally:
248 248 if pushop.trmanager:
249 249 pushop.trmanager.release()
250 250 if locallock is not None:
251 251 locallock.release()
252 252 if localwlock is not None:
253 253 localwlock.release()
254 254
255 255 return pushop
256 256
257 257 # list of steps to perform discovery before push
258 258 pushdiscoveryorder = []
259 259
260 260 # Mapping between step name and function
261 261 #
262 262 # This exists to help extensions wrap steps if necessary
263 263 pushdiscoverymapping = {}
264 264
265 265 def pushdiscovery(stepname):
266 266 """decorator for function performing discovery before push
267 267
268 268 The function is added to the step -> function mapping and appended to the
269 269 list of steps. Beware that decorated functions will be added in order (this
270 270 may matter).
271 271
272 272 You can only use this decorator for a new step; if you want to wrap a step
273 273 from an extension, change the pushdiscoverymapping dictionary directly."""
274 274 def dec(func):
275 275 assert stepname not in pushdiscoverymapping
276 276 pushdiscoverymapping[stepname] = func
277 277 pushdiscoveryorder.append(stepname)
278 278 return func
279 279 return dec
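The decorator above is how a brand new discovery step gets plugged into the push path. A minimal sketch of registering one from an extension; the step name 'mystep' and its debug message are made up for illustration, and the step is appended after the built-in ones (so pushop.outbookmarks is already populated):

    from mercurial import exchange

    @exchange.pushdiscovery('mystep')
    def _pushdiscoverymystep(pushop):
        # runs once per push, with the shared pushoperation state
        pushop.ui.debug('mystep: %d bookmarks queued\n'
                        % len(pushop.outbookmarks))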
280 280
281 281 def _pushdiscovery(pushop):
282 282 """Run all discovery steps"""
283 283 for stepname in pushdiscoveryorder:
284 284 step = pushdiscoverymapping[stepname]
285 285 step(pushop)
286 286
287 287 @pushdiscovery('changeset')
288 288 def _pushdiscoverychangeset(pushop):
289 289 """discover the changeset that need to be pushed"""
290 290 fci = discovery.findcommonincoming
291 291 commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
292 292 common, inc, remoteheads = commoninc
293 293 fco = discovery.findcommonoutgoing
294 294 outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
295 295 commoninc=commoninc, force=pushop.force)
296 296 pushop.outgoing = outgoing
297 297 pushop.remoteheads = remoteheads
298 298 pushop.incoming = inc
299 299
300 300 @pushdiscovery('phase')
301 301 def _pushdiscoveryphase(pushop):
302 302 """discover the phase that needs to be pushed
303 303
304 304 (computed for both success and failure case for changesets push)"""
305 305 outgoing = pushop.outgoing
306 306 unfi = pushop.repo.unfiltered()
307 307 remotephases = pushop.remote.listkeys('phases')
308 308 publishing = remotephases.get('publishing', False)
309 309 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
310 310 and remotephases # server supports phases
311 311 and not pushop.outgoing.missing # no changesets to be pushed
312 312 and publishing):
313 313 # When:
314 314 # - this is a subrepo push
315 315 # - and the remote supports phases
316 316 # - and no changesets are to be pushed
317 317 # - and the remote is publishing
318 318 # We may be in the issue 3871 case!
319 319 # We skip the phase synchronisation otherwise done as a
320 320 # courtesy, as it could publish changesets that are still
321 321 # draft on the remote.
322 322 remotephases = {'publishing': 'True'}
323 323 ana = phases.analyzeremotephases(pushop.repo,
324 324 pushop.fallbackheads,
325 325 remotephases)
326 326 pheads, droots = ana
327 327 extracond = ''
328 328 if not publishing:
329 329 extracond = ' and public()'
330 330 revset = 'heads((%%ln::%%ln) %s)' % extracond
331 331 # Get the list of all revs that are draft on the remote but public here.
332 332 # XXX Beware that the revset breaks if droots is not strictly
333 333 # XXX roots; we may want to ensure it is, but that is costly
334 334 fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
335 335 if not outgoing.missing:
336 336 future = fallback
337 337 else:
338 338 # add the changesets we are going to push as draft
339 339 #
340 340 # This should not be necessary for a publishing server, but because of
341 341 # an issue fixed in xxxxx we have to do it anyway.
342 342 fdroots = list(unfi.set('roots(%ln + %ln::)',
343 343 outgoing.missing, droots))
344 344 fdroots = [f.node() for f in fdroots]
345 345 future = list(unfi.set(revset, fdroots, pushop.futureheads))
346 346 pushop.outdatedphases = future
347 347 pushop.fallbackoutdatedphases = fallback
348 348
349 349 @pushdiscovery('obsmarker')
350 350 def _pushdiscoveryobsmarkers(pushop):
351 351 if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
352 352 and pushop.repo.obsstore
353 353 and 'obsolete' in pushop.remote.listkeys('namespaces')):
354 354 repo = pushop.repo
355 355 # very naive computation that can be quite expensive on a big repo;
356 356 # however, evolution is currently slow on them anyway.
357 357 nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
358 358 pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)
359 359
360 360 @pushdiscovery('bookmarks')
361 361 def _pushdiscoverybookmarks(pushop):
362 362 ui = pushop.ui
363 363 repo = pushop.repo.unfiltered()
364 364 remote = pushop.remote
365 365 ui.debug("checking for updated bookmarks\n")
366 366 ancestors = ()
367 367 if pushop.revs:
368 368 revnums = map(repo.changelog.rev, pushop.revs)
369 369 ancestors = repo.changelog.ancestors(revnums, inclusive=True)
370 370 remotebookmark = remote.listkeys('bookmarks')
371 371
372 372 explicit = set(pushop.bookmarks)
373 373
374 374 comp = bookmod.compare(repo, repo._bookmarks, remotebookmark, srchex=hex)
375 375 addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp
376 376 for b, scid, dcid in advsrc:
377 377 if b in explicit:
378 378 explicit.remove(b)
379 379 if not ancestors or repo[scid].rev() in ancestors:
380 380 pushop.outbookmarks.append((b, dcid, scid))
381 381 # search added bookmark
382 382 for b, scid, dcid in addsrc:
383 383 if b in explicit:
384 384 explicit.remove(b)
385 385 pushop.outbookmarks.append((b, '', scid))
386 386 # search for overwritten bookmark
387 387 for b, scid, dcid in advdst + diverge + differ:
388 388 if b in explicit:
389 389 explicit.remove(b)
390 390 pushop.outbookmarks.append((b, dcid, scid))
391 391 # search for bookmark to delete
392 392 for b, scid, dcid in adddst:
393 393 if b in explicit:
394 394 explicit.remove(b)
395 395 # treat as "deleted locally"
396 396 pushop.outbookmarks.append((b, dcid, ''))
397 397 # identical bookmarks shouldn't get reported
398 398 for b, scid, dcid in same:
399 399 if b in explicit:
400 400 explicit.remove(b)
401 401
402 402 if explicit:
403 403 explicit = sorted(explicit)
404 404 # we should probably list all of them
405 405 ui.warn(_('bookmark %s does not exist on the local '
406 406 'or remote repository!\n') % explicit[0])
407 407 pushop.bkresult = 2
408 408
409 409 pushop.outbookmarks.sort()
410 410
411 411 def _pushcheckoutgoing(pushop):
412 412 outgoing = pushop.outgoing
413 413 unfi = pushop.repo.unfiltered()
414 414 if not outgoing.missing:
415 415 # nothing to push
416 416 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
417 417 return False
418 418 # something to push
419 419 if not pushop.force:
420 420 # if repo.obsstore == False --> no obsolete
421 421 # then, save the iteration
422 422 if unfi.obsstore:
423 423 # these messages are here for 80-char-limit reasons
424 424 mso = _("push includes obsolete changeset: %s!")
425 425 mst = {"unstable": _("push includes unstable changeset: %s!"),
426 426 "bumped": _("push includes bumped changeset: %s!"),
427 427 "divergent": _("push includes divergent changeset: %s!")}
428 428 # If we are to push and there is at least one
429 429 # obsolete or unstable changeset in missing, then at
430 430 # least one of the missingheads will be obsolete or
431 431 # unstable. So checking heads only is ok.
432 432 for node in outgoing.missingheads:
433 433 ctx = unfi[node]
434 434 if ctx.obsolete():
435 435 raise util.Abort(mso % ctx)
436 436 elif ctx.troubled():
437 437 raise util.Abort(mst[ctx.troubles()[0]] % ctx)
438 438 newbm = pushop.ui.configlist('bookmarks', 'pushing')
439 439 discovery.checkheads(unfi, pushop.remote, outgoing,
440 440 pushop.remoteheads,
441 441 pushop.newbranch,
442 442 bool(pushop.incoming),
443 443 newbm)
444 444 return True
445 445
446 446 # List of names of steps to perform for an outgoing bundle2, order matters.
447 447 b2partsgenorder = []
448 448
449 449 # Mapping between step name and function
450 450 #
451 451 # This exists to help extensions wrap steps if necessary
452 452 b2partsgenmapping = {}
453 453
454 454 def b2partsgenerator(stepname, idx=None):
455 455 """decorator for function generating bundle2 part
456 456
457 457 The function is added to the step -> function mapping and appended to the
458 458 list of steps. Beware that decorated functions will be added in order
459 459 (this may matter).
460 460
461 461 You can only use this decorator for new steps; if you want to wrap a step
462 462 from an extension, change the b2partsgenmapping dictionary directly."""
463 463 def dec(func):
464 464 assert stepname not in b2partsgenmapping
465 465 b2partsgenmapping[stepname] = func
466 466 if idx is None:
467 467 b2partsgenorder.append(stepname)
468 468 else:
469 469 b2partsgenorder.insert(idx, stepname)
470 470 return func
471 471 return dec
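For wrapping an existing part generator, the docstring above points at the mapping itself (the decorator asserts on duplicate step names). A sketch using the 'phase' step; the debug line is illustrative only:

    from mercurial import exchange

    origphase = exchange.b2partsgenmapping['phase']
    def wrappedphase(pushop, bundler):
        pushop.ui.debug('about to generate the phase part\n')
        return origphase(pushop, bundler)   # keep the original reply handler
    exchange.b2partsgenmapping['phase'] = wrappedphase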
472 472
473 473 @b2partsgenerator('changeset')
474 474 def _pushb2ctx(pushop, bundler):
475 475 """handle changegroup push through bundle2
476 476
477 477 addchangegroup result is stored in the ``pushop.cgresult`` attribute.
478 478 """
479 479 if 'changesets' in pushop.stepsdone:
480 480 return
481 481 pushop.stepsdone.add('changesets')
482 482 # Send known heads to the server for race detection.
483 483 if not _pushcheckoutgoing(pushop):
484 484 return
485 485 pushop.repo.prepushoutgoinghooks(pushop.repo,
486 486 pushop.remote,
487 487 pushop.outgoing)
488 488 if not pushop.force:
489 489 bundler.newpart('check:heads', data=iter(pushop.remoteheads))
490 490 b2caps = bundle2.bundle2caps(pushop.remote)
491 491 version = None
492 492 cgversions = b2caps.get('changegroup')
493 493 if not cgversions: # 3.1 and 3.2 ship with an empty value
494 494 cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
495 495 pushop.outgoing)
496 496 else:
497 497 cgversions = [v for v in cgversions if v in changegroup.packermap]
498 498 if not cgversions:
499 499 raise ValueError(_('no common changegroup version'))
500 500 version = max(cgversions)
501 501 cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
502 502 pushop.outgoing,
503 503 version=version)
504 504 cgpart = bundler.newpart('changegroup', data=cg)
505 505 if version is not None:
506 506 cgpart.addparam('version', version)
507 507 def handlereply(op):
508 508 """extract addchangegroup returns from server reply"""
509 509 cgreplies = op.records.getreplies(cgpart.id)
510 510 assert len(cgreplies['changegroup']) == 1
511 511 pushop.cgresult = cgreplies['changegroup'][0]['return']
512 512 return handlereply
513 513
514 514 @b2partsgenerator('phase')
515 515 def _pushb2phases(pushop, bundler):
516 516 """handle phase push through bundle2"""
517 517 if 'phases' in pushop.stepsdone:
518 518 return
519 519 b2caps = bundle2.bundle2caps(pushop.remote)
520 520 if 'pushkey' not in b2caps:
521 521 return
522 522 pushop.stepsdone.add('phases')
523 523 part2node = []
524 524 enc = pushkey.encode
525 525 for newremotehead in pushop.outdatedphases:
526 526 part = bundler.newpart('pushkey')
527 527 part.addparam('namespace', enc('phases'))
528 528 part.addparam('key', enc(newremotehead.hex()))
529 529 part.addparam('old', enc(str(phases.draft)))
530 530 part.addparam('new', enc(str(phases.public)))
531 531 part2node.append((part.id, newremotehead))
532 532 def handlereply(op):
533 533 for partid, node in part2node:
534 534 partrep = op.records.getreplies(partid)
535 535 results = partrep['pushkey']
536 536 assert len(results) <= 1
537 537 msg = None
538 538 if not results:
539 539 msg = _('server ignored update of %s to public!\n') % node
540 540 elif not int(results[0]['return']):
541 541 msg = _('updating %s to public failed!\n') % node
542 542 if msg is not None:
543 543 pushop.ui.warn(msg)
544 544 return handlereply
545 545
546 546 @b2partsgenerator('obsmarkers')
547 547 def _pushb2obsmarkers(pushop, bundler):
548 548 if 'obsmarkers' in pushop.stepsdone:
549 549 return
550 550 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
551 551 if obsolete.commonversion(remoteversions) is None:
552 552 return
553 553 pushop.stepsdone.add('obsmarkers')
554 554 if pushop.outobsmarkers:
555 555 markers = sorted(pushop.outobsmarkers)
556 556 buildobsmarkerspart(bundler, markers)
557 557
558 558 @b2partsgenerator('bookmarks')
559 559 def _pushb2bookmarks(pushop, bundler):
560 560 """handle phase push through bundle2"""
561 561 if 'bookmarks' in pushop.stepsdone:
562 562 return
563 563 b2caps = bundle2.bundle2caps(pushop.remote)
564 564 if 'pushkey' not in b2caps:
565 565 return
566 566 pushop.stepsdone.add('bookmarks')
567 567 part2book = []
568 568 enc = pushkey.encode
569 569 for book, old, new in pushop.outbookmarks:
570 570 part = bundler.newpart('pushkey')
571 571 part.addparam('namespace', enc('bookmarks'))
572 572 part.addparam('key', enc(book))
573 573 part.addparam('old', enc(old))
574 574 part.addparam('new', enc(new))
575 575 action = 'update'
576 576 if not old:
577 577 action = 'export'
578 578 elif not new:
579 579 action = 'delete'
580 580 part2book.append((part.id, book, action))
581 581
582 582
583 583 def handlereply(op):
584 584 ui = pushop.ui
585 585 for partid, book, action in part2book:
586 586 partrep = op.records.getreplies(partid)
587 587 results = partrep['pushkey']
588 588 assert len(results) <= 1
589 589 if not results:
590 590 pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
591 591 else:
592 592 ret = int(results[0]['return'])
593 593 if ret:
594 594 ui.status(bookmsgmap[action][0] % book)
595 595 else:
596 596 ui.warn(bookmsgmap[action][1] % book)
597 597 if pushop.bkresult is not None:
598 598 pushop.bkresult = 1
599 599 return handlereply
600 600
601 601
602 602 def _pushbundle2(pushop):
603 603 """push data to the remote using bundle2
604 604
605 605 The only currently supported type of data is changegroup but this will
606 606 evolve in the future."""
607 607 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
608 608 pushback = (pushop.trmanager
609 609 and pushop.ui.configbool('experimental', 'bundle2.pushback'))
610 610
611 611 # create reply capability
612 612 capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo,
613 613 allowpushback=pushback))
614 614 bundler.newpart('replycaps', data=capsblob)
615 615 replyhandlers = []
616 616 for partgenname in b2partsgenorder:
617 617 partgen = b2partsgenmapping[partgenname]
618 618 ret = partgen(pushop, bundler)
619 619 if callable(ret):
620 620 replyhandlers.append(ret)
621 621 # do not push if nothing to push
622 622 if bundler.nbparts <= 1:
623 623 return
624 624 stream = util.chunkbuffer(bundler.getchunks())
625 625 try:
626 626 reply = pushop.remote.unbundle(stream, ['force'], 'push')
627 627 except error.BundleValueError, exc:
628 628 raise util.Abort('missing support for %s' % exc)
629 629 try:
630 630 trgetter = None
631 631 if pushback:
632 632 trgetter = pushop.trmanager.transaction
633 633 op = bundle2.processbundle(pushop.repo, reply, trgetter)
634 634 except error.BundleValueError, exc:
635 635 raise util.Abort('missing support for %s' % exc)
636 636 for rephand in replyhandlers:
637 637 rephand(op)
638 638
639 639 def _pushchangeset(pushop):
640 640 """Make the actual push of changeset bundle to remote repo"""
641 641 if 'changesets' in pushop.stepsdone:
642 642 return
643 643 pushop.stepsdone.add('changesets')
644 644 if not _pushcheckoutgoing(pushop):
645 645 return
646 646 pushop.repo.prepushoutgoinghooks(pushop.repo,
647 647 pushop.remote,
648 648 pushop.outgoing)
649 649 outgoing = pushop.outgoing
650 650 unbundle = pushop.remote.capable('unbundle')
651 651 # TODO: get bundlecaps from remote
652 652 bundlecaps = None
653 653 # create a changegroup from local
654 654 if pushop.revs is None and not (outgoing.excluded
655 655 or pushop.repo.changelog.filteredrevs):
656 656 # push everything,
657 657 # use the fast path, no race possible on push
658 658 bundler = changegroup.cg1packer(pushop.repo, bundlecaps)
659 659 cg = changegroup.getsubset(pushop.repo,
660 660 outgoing,
661 661 bundler,
662 662 'push',
663 663 fastpath=True)
664 664 else:
665 665 cg = changegroup.getlocalchangegroup(pushop.repo, 'push', outgoing,
666 666 bundlecaps)
667 667
668 668 # apply changegroup to remote
669 669 if unbundle:
670 670 # local repo finds heads on server, finds out what
671 671 # revs it must push. once revs transferred, if server
672 672 # finds it has different heads (someone else won
673 673 # commit/push race), server aborts.
674 674 if pushop.force:
675 675 remoteheads = ['force']
676 676 else:
677 677 remoteheads = pushop.remoteheads
678 678 # ssh: return remote's addchangegroup()
679 679 # http: return remote's addchangegroup() or 0 for error
680 680 pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
681 681 pushop.repo.url())
682 682 else:
683 683 # we return an integer indicating remote head count
684 684 # change
685 685 pushop.cgresult = pushop.remote.addchangegroup(cg, 'push',
686 686 pushop.repo.url())
687 687
688 688 def _pushsyncphase(pushop):
689 689 """synchronise phase information locally and remotely"""
690 690 cheads = pushop.commonheads
691 691 # even when we don't push, exchanging phase data is useful
692 692 remotephases = pushop.remote.listkeys('phases')
693 693 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
694 694 and remotephases # server supports phases
695 695 and pushop.cgresult is None # nothing was pushed
696 696 and remotephases.get('publishing', False)):
697 697 # When:
698 698 # - this is a subrepo push
699 699 # - and the remote supports phases
700 700 # - and no changesets were pushed
701 701 # - and the remote is publishing
702 702 # We may be in the issue 3871 case!
703 703 # We skip the phase synchronisation otherwise done as a
704 704 # courtesy, as it could publish changesets that are still
705 705 # draft on the remote.
706 706 remotephases = {'publishing': 'True'}
707 707 if not remotephases: # old server or public-only reply from non-publishing
708 708 _localphasemove(pushop, cheads)
709 709 # don't push any phase data as there is nothing to push
710 710 else:
711 711 ana = phases.analyzeremotephases(pushop.repo, cheads,
712 712 remotephases)
713 713 pheads, droots = ana
714 714 ### Apply remote phase on local
715 715 if remotephases.get('publishing', False):
716 716 _localphasemove(pushop, cheads)
717 717 else: # publish = False
718 718 _localphasemove(pushop, pheads)
719 719 _localphasemove(pushop, cheads, phases.draft)
720 720 ### Apply local phase on remote
721 721
722 722 if pushop.cgresult:
723 723 if 'phases' in pushop.stepsdone:
724 724 # phases already pushed through bundle2
725 725 return
726 726 outdated = pushop.outdatedphases
727 727 else:
728 728 outdated = pushop.fallbackoutdatedphases
729 729
730 730 pushop.stepsdone.add('phases')
731 731
732 732 # filter heads already turned public by the push
733 733 outdated = [c for c in outdated if c.node() not in pheads]
734 734 # fallback to independent pushkey command
735 735 for newremotehead in outdated:
736 736 r = pushop.remote.pushkey('phases',
737 737 newremotehead.hex(),
738 738 str(phases.draft),
739 739 str(phases.public))
740 740 if not r:
741 741 pushop.ui.warn(_('updating %s to public failed!\n')
742 742 % newremotehead)
743 743
744 744 def _localphasemove(pushop, nodes, phase=phases.public):
745 745 """move <nodes> to <phase> in the local source repo"""
746 746 if pushop.trmanager:
747 747 phases.advanceboundary(pushop.repo,
748 748 pushop.trmanager.transaction(),
749 749 phase,
750 750 nodes)
751 751 else:
752 752 # repo is not locked, do not change any phases!
753 753 # Inform the user that phases should have been moved when
754 754 # applicable.
755 755 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
756 756 phasestr = phases.phasenames[phase]
757 757 if actualmoves:
758 758 pushop.ui.status(_('cannot lock source repo, skipping '
759 759 'local %s phase update\n') % phasestr)
760 760
761 761 def _pushobsolete(pushop):
762 762 """utility function to push obsolete markers to a remote"""
763 763 if 'obsmarkers' in pushop.stepsdone:
764 764 return
765 765 pushop.ui.debug('try to push obsolete markers to remote\n')
766 766 repo = pushop.repo
767 767 remote = pushop.remote
768 768 pushop.stepsdone.add('obsmarkers')
769 769 if pushop.outobsmarkers:
770 770 rslts = []
771 771 remotedata = obsolete._pushkeyescape(sorted(pushop.outobsmarkers))
772 772 for key in sorted(remotedata, reverse=True):
773 773 # reverse sort to ensure we end with dump0
774 774 data = remotedata[key]
775 775 rslts.append(remote.pushkey('obsolete', key, '', data))
776 776 if [r for r in rslts if not r]:
777 777 msg = _('failed to push some obsolete markers!\n')
778 778 repo.ui.warn(msg)
779 779
780 780 def _pushbookmark(pushop):
781 781 """Update bookmark position on remote"""
782 782 if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
783 783 return
784 784 pushop.stepsdone.add('bookmarks')
785 785 ui = pushop.ui
786 786 remote = pushop.remote
787 787
788 788 for b, old, new in pushop.outbookmarks:
789 789 action = 'update'
790 790 if not old:
791 791 action = 'export'
792 792 elif not new:
793 793 action = 'delete'
794 794 if remote.pushkey('bookmarks', b, old, new):
795 795 ui.status(bookmsgmap[action][0] % b)
796 796 else:
797 797 ui.warn(bookmsgmap[action][1] % b)
798 798 # discovery may have set the value from an invalid entry
799 799 if pushop.bkresult is not None:
800 800 pushop.bkresult = 1
801 801
802 802 class pulloperation(object):
803 803 """A object that represent a single pull operation
804 804
805 805 It purpose is to carry pull related state and very common operation.
806 806
807 807 A new should be created at the beginning of each pull and discarded
808 808 afterward.
809 809 """
810 810
811 811 def __init__(self, repo, remote, heads=None, force=False, bookmarks=(),
812 812 remotebookmarks=None):
813 813 # repo we pull into
814 814 self.repo = repo
815 815 # repo we pull from
816 816 self.remote = remote
817 817 # revision we try to pull (None is "all")
818 818 self.heads = heads
819 819 # bookmark pulled explicitly
820 820 self.explicitbookmarks = bookmarks
821 821 # do we force pull?
822 822 self.force = force
823 823 # transaction manager
824 824 self.trmanager = None
825 825 # set of common changesets between local and remote before pull
826 826 self.common = None
827 827 # set of pulled heads
828 828 self.rheads = None
829 829 # list of missing changesets to fetch remotely
830 830 self.fetch = None
831 831 # remote bookmarks data
832 832 self.remotebookmarks = remotebookmarks
833 833 # result of changegroup pulling (used as return code by pull)
834 834 self.cgresult = None
835 835 # list of steps already done
836 836 self.stepsdone = set()
837 837
838 838 @util.propertycache
839 839 def pulledsubset(self):
840 840 """heads of the set of changeset target by the pull"""
841 841 # compute target subset
842 842 if self.heads is None:
843 843 # We pulled everything possible
844 844 # sync on everything common
845 845 c = set(self.common)
846 846 ret = list(self.common)
847 847 for n in self.rheads:
848 848 if n not in c:
849 849 ret.append(n)
850 850 return ret
851 851 else:
852 852 # We pulled a specific subset
853 853 # sync on this subset
854 854 return self.heads
855 855
856 856 def gettransaction(self):
857 857 # deprecated; talk to trmanager directly
858 858 return self.trmanager.transaction()
859 859
860 860 class transactionmanager(object):
861 861 """An object to manage the life cycle of a transaction
862 862
863 863 It creates the transaction on demand and calls the appropriate hooks when
864 864 closing the transaction."""
865 865 def __init__(self, repo, source, url):
866 866 self.repo = repo
867 867 self.source = source
868 868 self.url = url
869 869 self._tr = None
870 870
871 871 def transaction(self):
872 872 """Return an open transaction object, constructing if necessary"""
873 873 if not self._tr:
874 874 trname = '%s\n%s' % (self.source, util.hidepassword(self.url))
875 875 self._tr = self.repo.transaction(trname)
876 876 self._tr.hookargs['source'] = self.source
877 877 self._tr.hookargs['url'] = self.url
878 878 return self._tr
879 879
880 880 def close(self):
881 881 """close transaction if created"""
882 882 if self._tr is not None:
883 883 self._tr.close()
884 884
885 885 def release(self):
886 886 """release transaction if created"""
887 887 if self._tr is not None:
888 888 self._tr.release()
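The class is driven the same way by both push and pull (see pull() below): the transaction is opened lazily on first use, close() commits only if one was opened, and release() in a finally block rolls back anything left open. A condensed sketch, assuming repo and remote objects from the surrounding API:

    trmanager = transactionmanager(repo, 'pull', remote.url())
    try:
        tr = trmanager.transaction()   # opened on demand
        # ... apply incoming data under tr ...
        trmanager.close()              # commit; no-op if never opened
    finally:
        trmanager.release()            # rollback if close() was not reached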
889 889
890 890 def pull(repo, remote, heads=None, force=False, bookmarks=(), opargs=None):
891 891 if opargs is None:
892 892 opargs = {}
893 893 pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks,
894 894 **opargs)
895 895 if pullop.remote.local():
896 896 missing = set(pullop.remote.requirements) - pullop.repo.supported
897 897 if missing:
898 898 msg = _("required features are not"
899 899 " supported in the destination:"
900 900 " %s") % (', '.join(sorted(missing)))
901 901 raise util.Abort(msg)
902 902
903 903 lock = pullop.repo.lock()
904 904 try:
905 905 pullop.trmanager = transactionmanager(repo, 'pull', remote.url())
906 906 _pulldiscovery(pullop)
907 907 if _canusebundle2(pullop):
908 908 _pullbundle2(pullop)
909 909 _pullchangeset(pullop)
910 910 _pullphase(pullop)
911 911 _pullbookmarks(pullop)
912 912 _pullobsolete(pullop)
913 913 pullop.trmanager.close()
914 914 finally:
915 915 pullop.trmanager.release()
916 916 lock.release()
917 917
918 918 return pullop
919 919
920 920 # list of steps to perform discovery before pull
921 921 pulldiscoveryorder = []
922 922
923 923 # Mapping between step name and function
924 924 #
925 925 # This exists to help extensions wrap steps if necessary
926 926 pulldiscoverymapping = {}
927 927
928 928 def pulldiscovery(stepname):
929 929 """decorator for function performing discovery before pull
930 930
931 931 The function is added to the step -> function mapping and appended to the
932 932 list of steps. Beware that decorated function will be added in order (this
933 933 may matter).
934 934
935 935 You can only use this decorator for a new step, if you want to wrap a step
936 936 from an extension, change the pulldiscovery dictionary directly."""
937 937 def dec(func):
938 938 assert stepname not in pulldiscoverymapping
939 939 pulldiscoverymapping[stepname] = func
940 940 pulldiscoveryorder.append(stepname)
941 941 return func
942 942 return dec
943 943
944 944 def _pulldiscovery(pullop):
945 945 """Run all discovery steps"""
946 946 for stepname in pulldiscoveryorder:
947 947 step = pulldiscoverymapping[stepname]
948 948 step(pullop)
949 949
950 950 @pulldiscovery('b1:bookmarks')
951 951 def _pullbookmarkbundle1(pullop):
952 952 """fetch bookmark data in bundle1 case
953 953
954 954 If not using bundle2, we have to fetch bookmarks before changeset
955 955 discovery to reduce the chance and impact of race conditions."""
956 956 if pullop.remotebookmarks is not None:
957 957 return
958 if not _canusebundle2(pullop): # all bundle2 server now support listkeys
958 if (_canusebundle2(pullop)
959 and 'listkeys' in bundle2.bundle2caps(pullop.remote)):
960 # all known bundle2 servers now support listkeys, but let's be nice with
961 # new implementations.
962 return
959 963 pullop.remotebookmarks = pullop.remote.listkeys('bookmarks')
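The hunk above is the issue4701 fix named in the commit message at the top: previously the legacy listkeys call was skipped whenever bundle2 was usable, which lost bookmark data against bundle2 servers lacking the listkeys part. The new condition, condensed into an equivalent sketch of a predicate (the helper name is made up for illustration):

    def needlegacybookmarkfetch(pullop):
        # fetch via the old wire command unless bundle2 *and* its
        # listkeys support are both available on the remote
        return not (_canusebundle2(pullop)
                    and 'listkeys' in bundle2.bundle2caps(pullop.remote))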
960 964
961 965
962 966 @pulldiscovery('changegroup')
963 967 def _pulldiscoverychangegroup(pullop):
964 968 """discovery phase for the pull
965 969
966 970 Currently handles changeset discovery only; will change to handle all
967 971 discovery at some point."""
968 972 tmp = discovery.findcommonincoming(pullop.repo,
969 973 pullop.remote,
970 974 heads=pullop.heads,
971 975 force=pullop.force)
972 976 common, fetch, rheads = tmp
973 977 nm = pullop.repo.unfiltered().changelog.nodemap
974 978 if fetch and rheads:
975 979 # If a remote head is filtered locally, let's drop it from the unknown
976 980 # remote heads and put it back in common.
977 981 #
978 982 # This is a hackish solution to catch most "common but locally
979 983 # hidden" situations. We do not perform discovery on the unfiltered
980 984 # repository because it ends up doing a pathological amount of round
981 985 # trips for a huge amount of changesets we do not care about.
982 986 #
983 987 # If a set of such "common but filtered" changesets exists on the server
984 988 # but does not include a remote head, we'll not be able to detect it.
985 989 scommon = set(common)
986 990 filteredrheads = []
987 991 for n in rheads:
988 992 if n in nm:
989 993 if n not in scommon:
990 994 common.append(n)
991 995 else:
992 996 filteredrheads.append(n)
993 997 if not filteredrheads:
994 998 fetch = []
995 999 rheads = filteredrheads
996 1000 pullop.common = common
997 1001 pullop.fetch = fetch
998 1002 pullop.rheads = rheads
999 1003
1000 1004 def _pullbundle2(pullop):
1001 1005 """pull data using bundle2
1002 1006
1003 1007 For now, the only supported data is the changegroup."""
1004 1008 remotecaps = bundle2.bundle2caps(pullop.remote)
1005 1009 kwargs = {'bundlecaps': caps20to10(pullop.repo)}
1006 1010 # pulling changegroup
1007 1011 pullop.stepsdone.add('changegroup')
1008 1012
1009 1013 kwargs['common'] = pullop.common
1010 1014 kwargs['heads'] = pullop.heads or pullop.rheads
1011 1015 kwargs['cg'] = pullop.fetch
1012 1016 if 'listkeys' in remotecaps:
1013 1017 kwargs['listkeys'] = ['phase']
1014 1018 if pullop.remotebookmarks is None:
1015 1019 # make sure to always include bookmark data when migrating
1016 1020 # `hg incoming --bundle` to using this function.
1017 1021 kwargs['listkeys'].append('bookmarks')
1018 1022 if not pullop.fetch:
1019 1023 pullop.repo.ui.status(_("no changes found\n"))
1020 1024 pullop.cgresult = 0
1021 1025 else:
1022 1026 if pullop.heads is None and list(pullop.common) == [nullid]:
1023 1027 pullop.repo.ui.status(_("requesting all changes\n"))
1024 1028 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1025 1029 remoteversions = bundle2.obsmarkersversion(remotecaps)
1026 1030 if obsolete.commonversion(remoteversions) is not None:
1027 1031 kwargs['obsmarkers'] = True
1028 1032 pullop.stepsdone.add('obsmarkers')
1029 1033 _pullbundle2extraprepare(pullop, kwargs)
1030 1034 bundle = pullop.remote.getbundle('pull', **kwargs)
1031 1035 try:
1032 1036 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
1033 1037 except error.BundleValueError, exc:
1034 1038 raise util.Abort('missing support for %s' % exc)
1035 1039
1036 1040 if pullop.fetch:
1037 1041 results = [cg['return'] for cg in op.records['changegroup']]
1038 1042 pullop.cgresult = changegroup.combineresults(results)
1039 1043
1040 1044 # processing phases change
1041 1045 for namespace, value in op.records['listkeys']:
1042 1046 if namespace == 'phases':
1043 1047 _pullapplyphases(pullop, value)
1044 1048
1045 1049 # processing bookmark update
1046 1050 for namespace, value in op.records['listkeys']:
1047 1051 if namespace == 'bookmarks':
1048 1052 pullop.remotebookmarks = value
1049 1053
1050 1054 # bookmark data were either already there or pulled in the bundle
1051 1055 if pullop.remotebookmarks is not None:
1052 1056 _pullbookmarks(pullop)
1053 1057
1054 1058 def _pullbundle2extraprepare(pullop, kwargs):
1055 1059 """hook function so that extensions can extend the getbundle call"""
1056 1060 pass
1057 1061
1058 1062 def _pullchangeset(pullop):
1059 1063 """pull changeset from unbundle into the local repo"""
1060 1064 # We delay opening the transaction as late as possible so we
1061 1065 # don't open a transaction for nothing and don't break a future
1062 1066 # useful rollback call
1063 1067 if 'changegroup' in pullop.stepsdone:
1064 1068 return
1065 1069 pullop.stepsdone.add('changegroup')
1066 1070 if not pullop.fetch:
1067 1071 pullop.repo.ui.status(_("no changes found\n"))
1068 1072 pullop.cgresult = 0
1069 1073 return
1070 1074 pullop.gettransaction()
1071 1075 if pullop.heads is None and list(pullop.common) == [nullid]:
1072 1076 pullop.repo.ui.status(_("requesting all changes\n"))
1073 1077 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
1074 1078 # issue1320, avoid a race if remote changed after discovery
1075 1079 pullop.heads = pullop.rheads
1076 1080
1077 1081 if pullop.remote.capable('getbundle'):
1078 1082 # TODO: get bundlecaps from remote
1079 1083 cg = pullop.remote.getbundle('pull', common=pullop.common,
1080 1084 heads=pullop.heads or pullop.rheads)
1081 1085 elif pullop.heads is None:
1082 1086 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
1083 1087 elif not pullop.remote.capable('changegroupsubset'):
1084 1088 raise util.Abort(_("partial pull cannot be done because "
1085 1089 "other repository doesn't support "
1086 1090 "changegroupsubset."))
1087 1091 else:
1088 1092 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
1089 1093 pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
1090 1094 pullop.remote.url())
1091 1095
1092 1096 def _pullphase(pullop):
1093 1097 # Get remote phases data from remote
1094 1098 if 'phases' in pullop.stepsdone:
1095 1099 return
1096 1100 remotephases = pullop.remote.listkeys('phases')
1097 1101 _pullapplyphases(pullop, remotephases)
1098 1102
1099 1103 def _pullapplyphases(pullop, remotephases):
1100 1104 """apply phase movement from observed remote state"""
1101 1105 if 'phases' in pullop.stepsdone:
1102 1106 return
1103 1107 pullop.stepsdone.add('phases')
1104 1108 publishing = bool(remotephases.get('publishing', False))
1105 1109 if remotephases and not publishing:
1106 1110 # remote is new and non-publishing
1107 1111 pheads, _dr = phases.analyzeremotephases(pullop.repo,
1108 1112 pullop.pulledsubset,
1109 1113 remotephases)
1110 1114 dheads = pullop.pulledsubset
1111 1115 else:
1112 1116 # Remote is old or publishing; all common changesets
1113 1117 # should be seen as public
1114 1118 pheads = pullop.pulledsubset
1115 1119 dheads = []
1116 1120 unfi = pullop.repo.unfiltered()
1117 1121 phase = unfi._phasecache.phase
1118 1122 rev = unfi.changelog.nodemap.get
1119 1123 public = phases.public
1120 1124 draft = phases.draft
1121 1125
1122 1126 # exclude changesets already public locally and update the others
1123 1127 pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
1124 1128 if pheads:
1125 1129 tr = pullop.gettransaction()
1126 1130 phases.advanceboundary(pullop.repo, tr, public, pheads)
1127 1131
1128 1132 # exclude changesets already draft locally and update the others
1129 1133 dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
1130 1134 if dheads:
1131 1135 tr = pullop.gettransaction()
1132 1136 phases.advanceboundary(pullop.repo, tr, draft, dheads)
1133 1137
1134 1138 def _pullbookmarks(pullop):
1135 1139 """process the remote bookmark information to update the local one"""
1136 1140 if 'bookmarks' in pullop.stepsdone:
1137 1141 return
1138 1142 pullop.stepsdone.add('bookmarks')
1139 1143 repo = pullop.repo
1140 1144 remotebookmarks = pullop.remotebookmarks
1141 1145 bookmod.updatefromremote(repo.ui, repo, remotebookmarks,
1142 1146 pullop.remote.url(),
1143 1147 pullop.gettransaction,
1144 1148 explicit=pullop.explicitbookmarks)
1145 1149
1146 1150 def _pullobsolete(pullop):
1147 1151 """utility function to pull obsolete markers from a remote
1148 1152
1149 1153 The `gettransaction` function returns the pull transaction, creating
1150 1154 one if necessary. We return the transaction to inform the calling code that
1151 1155 a new transaction has been created (when applicable).
1152 1156
1153 1157 Exists mostly to allow overriding for experimentation purposes"""
1154 1158 if 'obsmarkers' in pullop.stepsdone:
1155 1159 return
1156 1160 pullop.stepsdone.add('obsmarkers')
1157 1161 tr = None
1158 1162 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1159 1163 pullop.repo.ui.debug('fetching remote obsolete markers\n')
1160 1164 remoteobs = pullop.remote.listkeys('obsolete')
1161 1165 if 'dump0' in remoteobs:
1162 1166 tr = pullop.gettransaction()
1163 1167 for key in sorted(remoteobs, reverse=True):
1164 1168 if key.startswith('dump'):
1165 1169 data = base85.b85decode(remoteobs[key])
1166 1170 pullop.repo.obsstore.mergemarkers(tr, data)
1167 1171 pullop.repo.invalidatevolatilesets()
1168 1172 return tr
1169 1173
1170 1174 def caps20to10(repo):
1171 1175 """return a set with appropriate options to use bundle20 during getbundle"""
1172 1176 caps = set(['HG20'])
1173 1177 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
1174 1178 caps.add('bundle2=' + urllib.quote(capsblob))
1175 1179 return caps
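The resulting set rides along in the 'bundlecaps' argument of getbundle below, so a bundle2-capable client is recognized even over the bundle10-era interface. Roughly (the quoted blob varies with the repo's capabilities):

    caps = caps20to10(repo)
    # -> set(['HG20', 'bundle2=<urlquoted capability blob>'])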
1176 1180
1177 1181 # List of names of steps to perform for a bundle2 for getbundle, order matters.
1178 1182 getbundle2partsorder = []
1179 1183
1180 1184 # Mapping between step name and function
1181 1185 #
1182 1186 # This exists to help extensions wrap steps if necessary
1183 1187 getbundle2partsmapping = {}
1184 1188
1185 1189 def getbundle2partsgenerator(stepname, idx=None):
1186 1190 """decorator for function generating bundle2 part for getbundle
1187 1191
1188 1192 The function is added to the step -> function mapping and appended to the
1189 1193 list of steps. Beware that decorated functions will be added in order
1190 1194 (this may matter).
1191 1195
1192 1196 You can only use this decorator for new steps; if you want to wrap a step
1193 1197 from an extension, change the getbundle2partsmapping dictionary directly."""
1194 1198 def dec(func):
1195 1199 assert stepname not in getbundle2partsmapping
1196 1200 getbundle2partsmapping[stepname] = func
1197 1201 if idx is None:
1198 1202 getbundle2partsorder.append(stepname)
1199 1203 else:
1200 1204 getbundle2partsorder.insert(idx, stepname)
1201 1205 return func
1202 1206 return dec
1203 1207
1204 1208 def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
1205 1209 **kwargs):
1206 1210 """return a full bundle (with potentially multiple kind of parts)
1207 1211
1208 1212 Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
1209 1213 passed. For now, the bundle can contain only changegroup, but this will
1210 1214 change when more part types become available for bundle2.
1211 1215
1212 1216 This is different from changegroup.getchangegroup, which only returns an HG10
1213 1217 changegroup bundle. They may eventually get reunited in the future when we
1214 1218 have a clearer idea of the API we want for querying different data.
1215 1219
1216 1220 The implementation is at a very early stage and will get massive rework
1217 1221 when the API of bundle is refined.
1218 1222 """
1219 1223 # bundle10 case
1220 1224 usebundle2 = False
1221 1225 if bundlecaps is not None:
1222 1226 usebundle2 = any((cap.startswith('HG2') for cap in bundlecaps))
1223 1227 if not usebundle2:
1224 1228 if bundlecaps and not kwargs.get('cg', True):
1225 1229 raise ValueError(_('request for bundle10 must include changegroup'))
1226 1230
1227 1231 if kwargs:
1228 1232 raise ValueError(_('unsupported getbundle arguments: %s')
1229 1233 % ', '.join(sorted(kwargs.keys())))
1230 1234 return changegroup.getchangegroup(repo, source, heads=heads,
1231 1235 common=common, bundlecaps=bundlecaps)
1232 1236
1233 1237 # bundle20 case
1234 1238 b2caps = {}
1235 1239 for bcaps in bundlecaps:
1236 1240 if bcaps.startswith('bundle2='):
1237 1241 blob = urllib.unquote(bcaps[len('bundle2='):])
1238 1242 b2caps.update(bundle2.decodecaps(blob))
1239 1243 bundler = bundle2.bundle20(repo.ui, b2caps)
1240 1244
1241 1245 kwargs['heads'] = heads
1242 1246 kwargs['common'] = common
1243 1247
1244 1248 for name in getbundle2partsorder:
1245 1249 func = getbundle2partsmapping[name]
1246 1250 func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
1247 1251 **kwargs)
1248 1252
1249 1253 return util.chunkbuffer(bundler.getchunks())
1250 1254
1251 1255 @getbundle2partsgenerator('changegroup')
1252 1256 def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
1253 1257 b2caps=None, heads=None, common=None, **kwargs):
1254 1258 """add a changegroup part to the requested bundle"""
1255 1259 cg = None
1256 1260 if kwargs.get('cg', True):
1257 1261 # build changegroup bundle here.
1258 1262 version = None
1259 1263 cgversions = b2caps.get('changegroup')
1260 1264 if not cgversions: # 3.1 and 3.2 ship with an empty value
1261 1265 cg = changegroup.getchangegroupraw(repo, source, heads=heads,
1262 1266 common=common,
1263 1267 bundlecaps=bundlecaps)
1264 1268 else:
1265 1269 cgversions = [v for v in cgversions if v in changegroup.packermap]
1266 1270 if not cgversions:
1267 1271 raise ValueError(_('no common changegroup version'))
1268 1272 version = max(cgversions)
1269 1273 cg = changegroup.getchangegroupraw(repo, source, heads=heads,
1270 1274 common=common,
1271 1275 bundlecaps=bundlecaps,
1272 1276 version=version)
1273 1277
1274 1278 if cg:
1275 1279 part = bundler.newpart('changegroup', data=cg)
1276 1280 if version is not None:
1277 1281 part.addparam('version', version)
1278 1282
1279 1283 @getbundle2partsgenerator('listkeys')
1280 1284 def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
1281 1285 b2caps=None, **kwargs):
1282 1286 """add parts containing listkeys namespaces to the requested bundle"""
1283 1287 listkeys = kwargs.get('listkeys', ())
1284 1288 for namespace in listkeys:
1285 1289 part = bundler.newpart('listkeys')
1286 1290 part.addparam('namespace', namespace)
1287 1291 keys = repo.listkeys(namespace).items()
1288 1292 part.data = pushkey.encodekeys(keys)
1289 1293
1290 1294 @getbundle2partsgenerator('obsmarkers')
1291 1295 def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
1292 1296 b2caps=None, heads=None, **kwargs):
1293 1297 """add an obsolescence markers part to the requested bundle"""
1294 1298 if kwargs.get('obsmarkers', False):
1295 1299 if heads is None:
1296 1300 heads = repo.heads()
1297 1301 subset = [c.node() for c in repo.set('::%ln', heads)]
1298 1302 markers = repo.obsstore.relevantmarkers(subset)
1299 1303 markers = sorted(markers)
1300 1304 buildobsmarkerspart(bundler, markers)
1301 1305
1302 1306 @getbundle2partsgenerator('hgtagsfnodes')
1303 1307 def _getbundletagsfnodes(bundler, repo, source, bundlecaps=None,
1304 1308 b2caps=None, heads=None, common=None,
1305 1309 **kwargs):
1306 1310 """Transfer the .hgtags filenodes mapping.
1307 1311
1308 1312 Only values for heads in this bundle will be transferred.
1309 1313
1310 1314 The part data consists of pairs of 20 byte changeset node and .hgtags
1311 1315 filenodes raw values.
1312 1316 """
1313 1317 # Don't send unless:
1314 1318 # - changesets are being exchanged,
1315 1319 # - the client supports it.
1316 1320 if not (kwargs.get('cg', True) and 'hgtagsfnodes' in b2caps):
1317 1321 return
1318 1322
1319 1323 outgoing = changegroup.computeoutgoing(repo, heads, common)
1320 1324
1321 1325 if not outgoing.missingheads:
1322 1326 return
1323 1327
1324 1328 cache = tags.hgtagsfnodescache(repo.unfiltered())
1325 1329 chunks = []
1326 1330
1327 1331 # .hgtags fnodes are only relevant for head changesets. While we could
1328 1332 # transfer values for all known nodes, there will likely be little to
1329 1333 # no benefit.
1330 1334 #
1331 1335 # We don't bother using a generator to produce output data because
1332 1336 # a) we only have 40 bytes per head and even esoteric numbers of heads
1333 1337 # consume little memory (1M heads is 40MB) b) we don't want to send the
1334 1338 # part if we don't have entries and knowing if we have entries requires
1335 1339 # cache lookups.
1336 1340 for node in outgoing.missingheads:
1337 1341 # Don't compute missing, as this may slow down serving.
1338 1342 fnode = cache.getfnode(node, computemissing=False)
1339 1343 if fnode is not None:
1340 1344 chunks.extend([node, fnode])
1341 1345
1342 1346 if chunks:
1343 1347 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
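The part payload built above is a flat byte string of fixed-width records. A sketch of the matching consumer side, assuming only the documented 20-byte node / 20-byte fnode pairing:

    def iterhgtagsfnodes(data):
        # yield (changeset node, .hgtags filenode) pairs from part data
        assert len(data) % 40 == 0
        for offset in xrange(0, len(data), 40):
            yield data[offset:offset + 20], data[offset + 20:offset + 40]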
1344 1348
1345 1349 def check_heads(repo, their_heads, context):
1346 1350 """check if the heads of a repo have been modified
1347 1351
1348 1352 Used by peer for unbundling.
1349 1353 """
1350 1354 heads = repo.heads()
1351 1355 heads_hash = util.sha1(''.join(sorted(heads))).digest()
1352 1356 if not (their_heads == ['force'] or their_heads == heads or
1353 1357 their_heads == ['hashed', heads_hash]):
1354 1358 # someone else committed/pushed/unbundled while we
1355 1359 # were transferring data
1356 1360 raise error.PushRaced('repository changed while %s - '
1357 1361 'please try again' % context)
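The ['hashed', ...] form accepted above lets a client assert the remote state it saw without shipping every head. A sketch of how the sending side would build it, mirroring the digest computed in check_heads; observedheads stands in for the binary heads gathered during discovery:

    their_heads = ['hashed',
                   util.sha1(''.join(sorted(observedheads))).digest()]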
1358 1362
1359 1363 def unbundle(repo, cg, heads, source, url):
1360 1364 """Apply a bundle to a repo.
1361 1365
1362 1366 this function makes sure the repo is locked during the application and has
1363 1367 a mechanism to check that no push race occurred between the creation of the
1364 1368 bundle and its application.
1365 1369
1366 1370 If the push was raced, a PushRaced exception is raised."""
1367 1371 r = 0
1368 1372 # need a transaction when processing a bundle2 stream
1369 1373 wlock = lock = tr = None
1370 1374 recordout = None
1371 1375 # quick fix for output mismatch with bundle2 in 3.4
1372 1376 captureoutput = repo.ui.configbool('experimental', 'bundle2-output-capture',
1373 1377 False)
1374 1378 if url.startswith('remote:http:') or url.startswith('remote:https:'):
1375 1379 captureoutput = True
1376 1380 try:
1377 1381 check_heads(repo, heads, 'uploading changes')
1378 1382 # push can proceed
1379 1383 if util.safehasattr(cg, 'params'):
1380 1384 r = None
1381 1385 try:
1382 1386 wlock = repo.wlock()
1383 1387 lock = repo.lock()
1384 1388 tr = repo.transaction(source)
1385 1389 tr.hookargs['source'] = source
1386 1390 tr.hookargs['url'] = url
1387 1391 tr.hookargs['bundle2'] = '1'
1388 1392 op = bundle2.bundleoperation(repo, lambda: tr,
1389 1393 captureoutput=captureoutput)
1390 1394 try:
1391 1395 r = bundle2.processbundle(repo, cg, op=op)
1392 1396 finally:
1393 1397 r = op.reply
1394 1398 if captureoutput and r is not None:
1395 1399 repo.ui.pushbuffer(error=True, subproc=True)
1396 1400 def recordout(output):
1397 1401 r.newpart('output', data=output, mandatory=False)
1398 1402 tr.close()
1399 1403 except BaseException, exc:
1400 1404 exc.duringunbundle2 = True
1401 1405 if captureoutput and r is not None:
1402 1406 parts = exc._bundle2salvagedoutput = r.salvageoutput()
1403 1407 def recordout(output):
1404 1408 part = bundle2.bundlepart('output', data=output,
1405 1409 mandatory=False)
1406 1410 parts.append(part)
1407 1411 raise
1408 1412 else:
1409 1413 lock = repo.lock()
1410 1414 r = changegroup.addchangegroup(repo, cg, source, url)
1411 1415 finally:
1412 1416 lockmod.release(tr, lock, wlock)
1413 1417 if recordout is not None:
1414 1418 recordout(repo.ui.popbuffer())
1415 1419 return r
1416 1420
1417 1421 # This is its own function so extensions can override it.
1418 1422 def _walkstreamfiles(repo):
1419 1423 return repo.store.walk()
1420 1424
1421 1425 def generatestreamclone(repo):
1422 1426 """Emit content for a streaming clone.
1423 1427
1424 1428 This is a generator of raw chunks that constitute a streaming clone.
1425 1429
1426 1430 The stream begins with a line of 2 space-delimited integers containing the
1427 1431 number of entries and total bytes size.
1428 1432
1429 1433 Next, are N entries for each file being transferred. Each file entry starts
1430 1434 as a line with the file name and integer size delimited by a null byte.
1431 1435 The raw file data follows. Following the raw file data is the next file
1432 1436 entry, or EOF.
1433 1437
1434 1438 When used on the wire protocol, an additional line indicating protocol
1435 1439 success will be prepended to the stream. This function is not responsible
1436 1440 for adding it.
1437 1441
1438 1442 This function will obtain a repository lock to ensure a consistent view of
1439 1443 the store is captured. It therefore may raise LockError.
1440 1444 """
1441 1445 entries = []
1442 1446 total_bytes = 0
1443 1447 # Get consistent snapshot of repo, lock during scan.
1444 1448 lock = repo.lock()
1445 1449 try:
1446 1450 repo.ui.debug('scanning\n')
1447 1451 for name, ename, size in _walkstreamfiles(repo):
1448 1452 if size:
1449 1453 entries.append((name, size))
1450 1454 total_bytes += size
1451 1455 finally:
1452 1456 lock.release()
1453 1457
1454 1458 repo.ui.debug('%d files, %d bytes to transfer\n' %
1455 1459 (len(entries), total_bytes))
1456 1460 yield '%d %d\n' % (len(entries), total_bytes)
1457 1461
1458 1462 sopener = repo.svfs
1459 1463 oldaudit = sopener.mustaudit
1460 1464 debugflag = repo.ui.debugflag
1461 1465 sopener.mustaudit = False
1462 1466
1463 1467 try:
1464 1468 for name, size in entries:
1465 1469 if debugflag:
1466 1470 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
1467 1471 # partially encode name over the wire for backwards compat
1468 1472 yield '%s\0%d\n' % (store.encodedir(name), size)
1469 1473 if size <= 65536:
1470 1474 fp = sopener(name)
1471 1475 try:
1472 1476 data = fp.read(size)
1473 1477 finally:
1474 1478 fp.close()
1475 1479 yield data
1476 1480 else:
1477 1481 for chunk in util.filechunkiter(sopener(name), limit=size):
1478 1482 yield chunk
1479 1483 finally:
1480 1484 sopener.mustaudit = oldaudit
1481 1485
1482 1486 def consumestreamclone(repo, fp):
1483 1487 """Apply the contents from a streaming clone file.
1484 1488
1485 1489 This takes the output from "streamout" and applies it to the specified
1486 1490 repository.
1487 1491
1488 1492 Like "streamout," the status line added by the wire protocol is not handled
1489 1493 by this function.
1490 1494 """
1491 1495 lock = repo.lock()
1492 1496 try:
1493 1497 repo.ui.status(_('streaming all changes\n'))
1494 1498 l = fp.readline()
1495 1499 try:
1496 1500 total_files, total_bytes = map(int, l.split(' ', 1))
1497 1501 except (ValueError, TypeError):
1498 1502 raise error.ResponseError(
1499 1503 _('unexpected response from remote server:'), l)
1500 1504 repo.ui.status(_('%d files to transfer, %s of data\n') %
1501 1505 (total_files, util.bytecount(total_bytes)))
1502 1506 handled_bytes = 0
1503 1507 repo.ui.progress(_('clone'), 0, total=total_bytes)
1504 1508 start = time.time()
1505 1509
1506 1510 tr = repo.transaction(_('clone'))
1507 1511 try:
1508 1512 for i in xrange(total_files):
1509 1513 # XXX doesn't support '\n' or '\r' in filenames
1510 1514 l = fp.readline()
1511 1515 try:
1512 1516 name, size = l.split('\0', 1)
1513 1517 size = int(size)
1514 1518 except (ValueError, TypeError):
1515 1519 raise error.ResponseError(
1516 1520 _('unexpected response from remote server:'), l)
1517 1521 if repo.ui.debugflag:
1518 1522 repo.ui.debug('adding %s (%s)\n' %
1519 1523 (name, util.bytecount(size)))
1520 1524 # for backwards compat, name was partially encoded
1521 1525 ofp = repo.svfs(store.decodedir(name), 'w')
1522 1526 for chunk in util.filechunkiter(fp, limit=size):
1523 1527 handled_bytes += len(chunk)
1524 1528 repo.ui.progress(_('clone'), handled_bytes,
1525 1529 total=total_bytes)
1526 1530 ofp.write(chunk)
1527 1531 ofp.close()
1528 1532 tr.close()
1529 1533 finally:
1530 1534 tr.release()
1531 1535
1532 1536 # Writing straight to files circumvented the in-memory caches
1533 1537 repo.invalidate()
1534 1538
1535 1539 elapsed = time.time() - start
1536 1540 if elapsed <= 0:
1537 1541 elapsed = 0.001
1538 1542 repo.ui.progress(_('clone'), None)
1539 1543 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
1540 1544 (util.bytecount(total_bytes), elapsed,
1541 1545 util.bytecount(total_bytes / elapsed)))
1542 1546 finally:
1543 1547 lock.release()