streamclone: move code out of exchange.py...
Gregory Szorc
r26443:d947086d default
@@ -1,1598 +1,1469 @@
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 import time
9 8 from i18n import _
10 9 from node import hex, nullid
11 10 import errno, urllib
12 import util, scmutil, changegroup, base85, error, store
11 import util, scmutil, changegroup, base85, error
13 12 import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
14 13 import lock as lockmod
15 14 import tags
16 15
17 16 def readbundle(ui, fh, fname, vfs=None):
18 17 header = changegroup.readexactly(fh, 4)
19 18
20 19 alg = None
21 20 if not fname:
22 21 fname = "stream"
23 22 if not header.startswith('HG') and header.startswith('\0'):
24 23 fh = changegroup.headerlessfixup(fh, header)
25 24 header = "HG10"
26 25 alg = 'UN'
27 26 elif vfs:
28 27 fname = vfs.join(fname)
29 28
30 29 magic, version = header[0:2], header[2:4]
31 30
32 31 if magic != 'HG':
33 32 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
34 33 if version == '10':
35 34 if alg is None:
36 35 alg = changegroup.readexactly(fh, 2)
37 36 return changegroup.cg1unpacker(fh, alg)
38 37 elif version.startswith('2'):
39 38 return bundle2.getunbundler(ui, fh, magicstring=magic + version)
40 39 else:
41 40 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
42 41
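# Illustrative note (not part of the original file): readbundle() above
# dispatches on a 6-byte header. Assuming the standard 'UN'/'BZ'/'GZ'
# compression codes, a stream starting with 'HG10UN' yields a cg1unpacker
# reading uncompressed data, 'HG10BZ'/'HG10GZ' select the matching
# decompressor, and anything starting with 'HG2' (e.g. 'HG20') is handed
# to bundle2.getunbundler. A headerless stream beginning with '\0' is
# fixed up and treated as 'HG10UN'.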
43 42 def buildobsmarkerspart(bundler, markers):
44 43 """add an obsmarker part to the bundler with <markers>
45 44
46 45 No part is created if markers is empty.
47 46 Raises ValueError if the bundler doesn't support any known obsmarker format.
48 47 """
49 48 if markers:
50 49 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
51 50 version = obsolete.commonversion(remoteversions)
52 51 if version is None:
53 52 raise ValueError('bundler does not support common obsmarker format')
54 53 stream = obsolete.encodemarkers(markers, True, version=version)
55 54 return bundler.newpart('obsmarkers', data=stream)
56 55 return None
57 56
58 57 def _canusebundle2(op):
59 58 """return true if a pull/push can use bundle2
60 59
61 60 Feel free to nuke this function when we drop the experimental option"""
62 61 return (op.repo.ui.configbool('experimental', 'bundle2-exp', True)
63 62 and op.remote.capable('bundle2'))
64 63
65 64
66 65 class pushoperation(object):
67 66 """A object that represent a single push operation
68 67
69 68 It purpose is to carry push related state and very common operation.
70 69
71 70 A new should be created at the beginning of each push and discarded
72 71 afterward.
73 72 """
74 73
75 74 def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
76 75 bookmarks=()):
77 76 # repo we push from
78 77 self.repo = repo
79 78 self.ui = repo.ui
80 79 # repo we push to
81 80 self.remote = remote
82 81 # force option provided
83 82 self.force = force
84 83 # revs to be pushed (None is "all")
85 84 self.revs = revs
87 86 # bookmarks explicitly pushed
87 86 self.bookmarks = bookmarks
88 87 # allow push of new branch
89 88 self.newbranch = newbranch
90 89 # did a local lock get acquired?
91 90 self.locallocked = None
92 91 # steps already performed
93 92 # (used to check what steps have already been performed through bundle2)
94 93 self.stepsdone = set()
95 94 # Integer version of the changegroup push result
96 95 # - None means nothing to push
97 96 # - 0 means HTTP error
98 97 # - 1 means we pushed and remote head count is unchanged *or*
99 98 # we have outgoing changesets but refused to push
100 99 # - other values as described by addchangegroup()
101 100 self.cgresult = None
102 101 # Boolean value for the bookmark push
103 102 self.bkresult = None
104 103 # discovery.outgoing object (contains common and outgoing data)
105 104 self.outgoing = None
106 105 # all remote heads before the push
107 106 self.remoteheads = None
108 107 # testable as a boolean indicating if any nodes are missing locally.
109 108 self.incoming = None
110 109 # phase changes that must be pushed alongside the changesets
111 110 self.outdatedphases = None
112 111 # phase changes that must be pushed if the changeset push fails
113 112 self.fallbackoutdatedphases = None
114 113 # outgoing obsmarkers
115 114 self.outobsmarkers = set()
116 115 # outgoing bookmarks
117 116 self.outbookmarks = []
118 117 # transaction manager
119 118 self.trmanager = None
120 119 # map { pushkey partid -> callback handling failure}
121 120 # used to handle exception from mandatory pushkey part failure
122 121 self.pkfailcb = {}
123 122
124 123 @util.propertycache
125 124 def futureheads(self):
126 125 """future remote heads if the changeset push succeeds"""
127 126 return self.outgoing.missingheads
128 127
129 128 @util.propertycache
130 129 def fallbackheads(self):
131 130 """future remote heads if the changeset push fails"""
132 131 if self.revs is None:
133 132 # no target to push, all common heads are relevant
134 133 return self.outgoing.commonheads
135 134 unfi = self.repo.unfiltered()
136 135 # I want cheads = heads(::missingheads and ::commonheads)
137 136 # (missingheads is revs with secret changeset filtered out)
138 137 #
139 138 # This can be expressed as:
140 139 # cheads = ( (missingheads and ::commonheads)
141 140 # + (commonheads and ::missingheads)
142 141 # )
143 142 #
144 143 # while trying to push we already computed the following:
145 144 # common = (::commonheads)
146 145 # missing = ((commonheads::missingheads) - commonheads)
147 146 #
148 147 # We can pick:
149 148 # * missingheads part of common (::commonheads)
150 149 common = self.outgoing.common
151 150 nm = self.repo.changelog.nodemap
152 151 cheads = [node for node in self.revs if nm[node] in common]
153 152 # and
154 153 # * commonheads parents on missing
155 154 revset = unfi.set('%ln and parents(roots(%ln))',
156 155 self.outgoing.commonheads,
157 156 self.outgoing.missing)
158 157 cheads.extend(c.node() for c in revset)
159 158 return cheads
160 159
161 160 @property
162 161 def commonheads(self):
163 162 """set of all common heads after changeset bundle push"""
164 163 if self.cgresult:
165 164 return self.futureheads
166 165 else:
167 166 return self.fallbackheads
168 167
169 168 # mapping of messages used when pushing bookmarks
170 169 bookmsgmap = {'update': (_("updating bookmark %s\n"),
171 170 _('updating bookmark %s failed!\n')),
172 171 'export': (_("exporting bookmark %s\n"),
173 172 _('exporting bookmark %s failed!\n')),
174 173 'delete': (_("deleting remote bookmark %s\n"),
175 174 _('deleting remote bookmark %s failed!\n')),
176 175 }
177 176
178 177
179 178 def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=()):
180 179 '''Push outgoing changesets (limited by revs) from a local
181 180 repository to remote. Return an integer:
182 181 - None means nothing to push
183 182 - 0 means HTTP error
184 183 - 1 means we pushed and remote head count is unchanged *or*
185 184 we have outgoing changesets but refused to push
186 185 - other values as described by addchangegroup()
187 186 '''
188 187 pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks)
189 188 if pushop.remote.local():
190 189 missing = (set(pushop.repo.requirements)
191 190 - pushop.remote.local().supported)
192 191 if missing:
193 192 msg = _("required features are not"
194 193 " supported in the destination:"
195 194 " %s") % (', '.join(sorted(missing)))
196 195 raise util.Abort(msg)
197 196
198 197 # there are two ways to push to remote repo:
199 198 #
200 199 # addchangegroup assumes local user can lock remote
201 200 # repo (local filesystem, old ssh servers).
202 201 #
203 202 # unbundle assumes local user cannot lock remote repo (new ssh
204 203 # servers, http servers).
205 204
206 205 if not pushop.remote.canpush():
207 206 raise util.Abort(_("destination does not support push"))
208 207 # get local lock as we might write phase data
209 208 localwlock = locallock = None
210 209 try:
211 210 # bundle2 push may receive a reply bundle touching bookmarks or other
212 211 # things requiring the wlock. Take it now to ensure proper ordering.
213 212 maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
214 213 if _canusebundle2(pushop) and maypushback:
215 214 localwlock = pushop.repo.wlock()
216 215 locallock = pushop.repo.lock()
217 216 pushop.locallocked = True
218 217 except IOError as err:
219 218 pushop.locallocked = False
220 219 if err.errno != errno.EACCES:
221 220 raise
222 221 # source repo cannot be locked.
223 222 # We do not abort the push, but just disable the local phase
224 223 # synchronisation.
225 224 msg = 'cannot lock source repository: %s\n' % err
226 225 pushop.ui.debug(msg)
227 226 try:
228 227 if pushop.locallocked:
229 228 pushop.trmanager = transactionmanager(repo,
230 229 'push-response',
231 230 pushop.remote.url())
232 231 pushop.repo.checkpush(pushop)
233 232 lock = None
234 233 unbundle = pushop.remote.capable('unbundle')
235 234 if not unbundle:
236 235 lock = pushop.remote.lock()
237 236 try:
238 237 _pushdiscovery(pushop)
239 238 if _canusebundle2(pushop):
240 239 _pushbundle2(pushop)
241 240 _pushchangeset(pushop)
242 241 _pushsyncphase(pushop)
243 242 _pushobsolete(pushop)
244 243 _pushbookmark(pushop)
245 244 finally:
246 245 if lock is not None:
247 246 lock.release()
248 247 if pushop.trmanager:
249 248 pushop.trmanager.close()
250 249 finally:
251 250 if pushop.trmanager:
252 251 pushop.trmanager.release()
253 252 if locallock is not None:
254 253 locallock.release()
255 254 if localwlock is not None:
256 255 localwlock.release()
257 256
258 257 return pushop
259 258
260 259 # list of steps to perform discovery before push
261 260 pushdiscoveryorder = []
262 261
263 262 # Mapping between step name and function
264 263 #
265 264 # This exists to help extensions wrap steps if necessary
266 265 pushdiscoverymapping = {}
267 266
268 267 def pushdiscovery(stepname):
269 268 """decorator for function performing discovery before push
270 269
271 270 The function is added to the step -> function mapping and appended to the
272 271 list of steps. Beware that decorated functions will be added in order (this
273 272 may matter).
274 273
275 274 You can only use this decorator for a new step; if you want to wrap a step
276 275 from an extension, change the pushdiscoverymapping dictionary directly."""
277 276 def dec(func):
278 277 assert stepname not in pushdiscoverymapping
279 278 pushdiscoverymapping[stepname] = func
280 279 pushdiscoveryorder.append(stepname)
281 280 return func
282 281 return dec
283 282
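# Illustrative sketch (an assumption, not part of the original change): how
# an extension might register a new discovery step with this decorator. The
# step name 'mystep' and the attribute it records are hypothetical.
#
#   @pushdiscovery('mystep')
#   def _pushdiscoverymystep(pushop):
#       # stash data on the operation for a later step or part to use
#       pushop.mystepdata = pushop.repo.ui.config('myext', 'data')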
284 283 def _pushdiscovery(pushop):
285 284 """Run all discovery steps"""
286 285 for stepname in pushdiscoveryorder:
287 286 step = pushdiscoverymapping[stepname]
288 287 step(pushop)
289 288
290 289 @pushdiscovery('changeset')
291 290 def _pushdiscoverychangeset(pushop):
292 291 """discover the changeset that need to be pushed"""
293 292 fci = discovery.findcommonincoming
294 293 commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
295 294 common, inc, remoteheads = commoninc
296 295 fco = discovery.findcommonoutgoing
297 296 outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
298 297 commoninc=commoninc, force=pushop.force)
299 298 pushop.outgoing = outgoing
300 299 pushop.remoteheads = remoteheads
301 300 pushop.incoming = inc
302 301
303 302 @pushdiscovery('phase')
304 303 def _pushdiscoveryphase(pushop):
305 304 """discover the phase that needs to be pushed
306 305
307 306 (computed for both success and failure case for changesets push)"""
308 307 outgoing = pushop.outgoing
309 308 unfi = pushop.repo.unfiltered()
310 309 remotephases = pushop.remote.listkeys('phases')
311 310 publishing = remotephases.get('publishing', False)
312 311 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
313 312 and remotephases # server supports phases
314 313 and not pushop.outgoing.missing # no changesets to be pushed
315 314 and publishing):
316 315 # When:
317 316 # - this is a subrepo push
318 317 # - and the remote supports phases
319 318 # - and no changesets are to be pushed
320 319 # - and remote is publishing
321 320 # We may be in issue 3871 case!
322 321 # We drop the possible phase synchronisation done by
323 322 # courtesy to publish changesets possibly locally draft
324 323 # on the remote.
325 324 remotephases = {'publishing': 'True'}
326 325 ana = phases.analyzeremotephases(pushop.repo,
327 326 pushop.fallbackheads,
328 327 remotephases)
329 328 pheads, droots = ana
330 329 extracond = ''
331 330 if not publishing:
332 331 extracond = ' and public()'
333 332 revset = 'heads((%%ln::%%ln) %s)' % extracond
334 333 # Get the list of all revs that are draft on the remote but public here.
335 334 # XXX Beware that the revset breaks if droots is not strictly
336 335 # XXX roots; we may want to ensure it is, but that is costly
337 336 fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
338 337 if not outgoing.missing:
339 338 future = fallback
340 339 else:
341 340 # add the changesets we are going to push as draft
342 341 #
343 342 # should not be necessary for a publishing server, but because of an
344 343 # issue fixed in xxxxx we have to do it anyway.
345 344 fdroots = list(unfi.set('roots(%ln + %ln::)',
346 345 outgoing.missing, droots))
347 346 fdroots = [f.node() for f in fdroots]
348 347 future = list(unfi.set(revset, fdroots, pushop.futureheads))
349 348 pushop.outdatedphases = future
350 349 pushop.fallbackoutdatedphases = fallback
351 350
352 351 @pushdiscovery('obsmarker')
353 352 def _pushdiscoveryobsmarkers(pushop):
354 353 if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
355 354 and pushop.repo.obsstore
356 355 and 'obsolete' in pushop.remote.listkeys('namespaces')):
357 356 repo = pushop.repo
358 357 # very naive computation, which can be quite expensive on big repos.
359 358 # However: evolution is currently slow on them anyway.
360 359 nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
361 360 pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)
362 361
363 362 @pushdiscovery('bookmarks')
364 363 def _pushdiscoverybookmarks(pushop):
365 364 ui = pushop.ui
366 365 repo = pushop.repo.unfiltered()
367 366 remote = pushop.remote
368 367 ui.debug("checking for updated bookmarks\n")
369 368 ancestors = ()
370 369 if pushop.revs:
371 370 revnums = map(repo.changelog.rev, pushop.revs)
372 371 ancestors = repo.changelog.ancestors(revnums, inclusive=True)
373 372 remotebookmark = remote.listkeys('bookmarks')
374 373
375 374 explicit = set(pushop.bookmarks)
376 375
377 376 comp = bookmod.compare(repo, repo._bookmarks, remotebookmark, srchex=hex)
378 377 addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp
379 378 for b, scid, dcid in advsrc:
380 379 if b in explicit:
381 380 explicit.remove(b)
382 381 if not ancestors or repo[scid].rev() in ancestors:
383 382 pushop.outbookmarks.append((b, dcid, scid))
385 384 # search for added bookmarks
385 384 for b, scid, dcid in addsrc:
386 385 if b in explicit:
387 386 explicit.remove(b)
388 387 pushop.outbookmarks.append((b, '', scid))
390 389 # search for overwritten bookmarks
390 389 for b, scid, dcid in advdst + diverge + differ:
391 390 if b in explicit:
392 391 explicit.remove(b)
393 392 pushop.outbookmarks.append((b, dcid, scid))
395 394 # search for bookmarks to delete
395 394 for b, scid, dcid in adddst:
396 395 if b in explicit:
397 396 explicit.remove(b)
398 397 # treat as "deleted locally"
399 398 pushop.outbookmarks.append((b, dcid, ''))
400 399 # identical bookmarks shouldn't get reported
401 400 for b, scid, dcid in same:
402 401 if b in explicit:
403 402 explicit.remove(b)
404 403
405 404 if explicit:
406 405 explicit = sorted(explicit)
407 406 # we should probably list all of them
408 407 ui.warn(_('bookmark %s does not exist on the local '
409 408 'or remote repository!\n') % explicit[0])
410 409 pushop.bkresult = 2
411 410
412 411 pushop.outbookmarks.sort()
413 412
414 413 def _pushcheckoutgoing(pushop):
415 414 outgoing = pushop.outgoing
416 415 unfi = pushop.repo.unfiltered()
417 416 if not outgoing.missing:
418 417 # nothing to push
419 418 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
420 419 return False
421 420 # something to push
422 421 if not pushop.force:
423 422 # if repo.obsstore == False --> no obsolete
424 423 # then, save the iteration
425 424 if unfi.obsstore:
426 425 # these messages are here for 80-char-limit reasons
427 426 mso = _("push includes obsolete changeset: %s!")
428 427 mst = {"unstable": _("push includes unstable changeset: %s!"),
429 428 "bumped": _("push includes bumped changeset: %s!"),
430 429 "divergent": _("push includes divergent changeset: %s!")}
431 430 # If we are about to push and there is at least one
432 431 # obsolete or unstable changeset in missing, at
433 432 # least one of the missingheads will be obsolete or
434 433 # unstable. So checking heads only is ok.
435 434 for node in outgoing.missingheads:
436 435 ctx = unfi[node]
437 436 if ctx.obsolete():
438 437 raise util.Abort(mso % ctx)
439 438 elif ctx.troubled():
440 439 raise util.Abort(mst[ctx.troubles()[0]] % ctx)
441 440
442 441 # internal config: bookmarks.pushing
443 442 newbm = pushop.ui.configlist('bookmarks', 'pushing')
444 443 discovery.checkheads(unfi, pushop.remote, outgoing,
445 444 pushop.remoteheads,
446 445 pushop.newbranch,
447 446 bool(pushop.incoming),
448 447 newbm)
449 448 return True
450 449
451 450 # List of names of steps to perform for an outgoing bundle2, order matters.
452 451 b2partsgenorder = []
453 452
454 453 # Mapping between step name and function
455 454 #
456 455 # This exists to help extensions wrap steps if necessary
457 456 b2partsgenmapping = {}
458 457
459 458 def b2partsgenerator(stepname, idx=None):
460 459 """decorator for function generating bundle2 part
461 460
462 461 The function is added to the step -> function mapping and appended to the
463 462 list of steps. Beware that decorated functions will be added in order
464 463 (this may matter).
465 464
466 465 You can only use this decorator for new steps; if you want to wrap a step
467 466 from an extension, change the b2partsgenmapping dictionary directly."""
468 467 def dec(func):
469 468 assert stepname not in b2partsgenmapping
470 469 b2partsgenmapping[stepname] = func
471 470 if idx is None:
472 471 b2partsgenorder.append(stepname)
473 472 else:
474 473 b2partsgenorder.insert(idx, stepname)
475 474 return func
476 475 return dec
477 476
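# Illustrative sketch (an assumption, not part of the original change):
# wrapping an existing part generator from an extension by changing the
# mapping directly, as the docstring above suggests:
#
#   origphase = b2partsgenmapping['phase']
#   def _tracedphase(pushop, bundler):
#       pushop.ui.debug('generating phase parts\n')
#       return origphase(pushop, bundler)
#   b2partsgenmapping['phase'] = _tracedphase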
478 477 def _pushb2ctxcheckheads(pushop, bundler):
479 478 """Generate race condition checking parts
480 479
481 480 Exists as an independent function to aid extensions
482 481 """
483 482 if not pushop.force:
484 483 bundler.newpart('check:heads', data=iter(pushop.remoteheads))
485 484
486 485 @b2partsgenerator('changeset')
487 486 def _pushb2ctx(pushop, bundler):
488 487 """handle changegroup push through bundle2
489 488
490 489 addchangegroup result is stored in the ``pushop.cgresult`` attribute.
491 490 """
492 491 if 'changesets' in pushop.stepsdone:
493 492 return
494 493 pushop.stepsdone.add('changesets')
495 494 # Send known heads to the server for race detection.
496 495 if not _pushcheckoutgoing(pushop):
497 496 return
498 497 pushop.repo.prepushoutgoinghooks(pushop.repo,
499 498 pushop.remote,
500 499 pushop.outgoing)
501 500
502 501 _pushb2ctxcheckheads(pushop, bundler)
503 502
504 503 b2caps = bundle2.bundle2caps(pushop.remote)
505 504 version = None
506 505 cgversions = b2caps.get('changegroup')
507 506 if not cgversions: # 3.1 and 3.2 ship with an empty value
508 507 cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
509 508 pushop.outgoing)
510 509 else:
511 510 cgversions = [v for v in cgversions if v in changegroup.packermap]
512 511 if not cgversions:
513 512 raise ValueError(_('no common changegroup version'))
514 513 version = max(cgversions)
515 514 cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
516 515 pushop.outgoing,
517 516 version=version)
518 517 cgpart = bundler.newpart('changegroup', data=cg)
519 518 if version is not None:
520 519 cgpart.addparam('version', version)
521 520 def handlereply(op):
522 521 """extract addchangegroup returns from server reply"""
523 522 cgreplies = op.records.getreplies(cgpart.id)
524 523 assert len(cgreplies['changegroup']) == 1
525 524 pushop.cgresult = cgreplies['changegroup'][0]['return']
526 525 return handlereply
527 526
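# Illustrative negotiation (assumed capability values): if the remote
# advertises changegroup versions such as ['01', '02'] and the local
# changegroup.packermap knows both, max() above picks '02' and attaches it
# to the part as the 'version' parameter; an empty advertisement (3.1 and
# 3.2 servers) falls back to the version-less cg1 data instead.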
528 527 @b2partsgenerator('phase')
529 528 def _pushb2phases(pushop, bundler):
530 529 """handle phase push through bundle2"""
531 530 if 'phases' in pushop.stepsdone:
532 531 return
533 532 b2caps = bundle2.bundle2caps(pushop.remote)
534 533 if 'pushkey' not in b2caps:
535 534 return
536 535 pushop.stepsdone.add('phases')
537 536 part2node = []
538 537
539 538 def handlefailure(pushop, exc):
540 539 targetid = int(exc.partid)
541 540 for partid, node in part2node:
542 541 if partid == targetid:
543 542 raise error.Abort(_('updating %s to public failed') % node)
544 543
545 544 enc = pushkey.encode
546 545 for newremotehead in pushop.outdatedphases:
547 546 part = bundler.newpart('pushkey')
548 547 part.addparam('namespace', enc('phases'))
549 548 part.addparam('key', enc(newremotehead.hex()))
550 549 part.addparam('old', enc(str(phases.draft)))
551 550 part.addparam('new', enc(str(phases.public)))
552 551 part2node.append((part.id, newremotehead))
553 552 pushop.pkfailcb[part.id] = handlefailure
554 553
555 554 def handlereply(op):
556 555 for partid, node in part2node:
557 556 partrep = op.records.getreplies(partid)
558 557 results = partrep['pushkey']
559 558 assert len(results) <= 1
560 559 msg = None
561 560 if not results:
562 561 msg = _('server ignored update of %s to public!\n') % node
563 562 elif not int(results[0]['return']):
564 563 msg = _('updating %s to public failed!\n') % node
565 564 if msg is not None:
566 565 pushop.ui.warn(msg)
567 566 return handlereply
568 567
569 568 @b2partsgenerator('obsmarkers')
570 569 def _pushb2obsmarkers(pushop, bundler):
571 570 if 'obsmarkers' in pushop.stepsdone:
572 571 return
573 572 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
574 573 if obsolete.commonversion(remoteversions) is None:
575 574 return
576 575 pushop.stepsdone.add('obsmarkers')
577 576 if pushop.outobsmarkers:
578 577 markers = sorted(pushop.outobsmarkers)
579 578 buildobsmarkerspart(bundler, markers)
580 579
581 580 @b2partsgenerator('bookmarks')
582 581 def _pushb2bookmarks(pushop, bundler):
583 582 """handle bookmark push through bundle2"""
584 583 if 'bookmarks' in pushop.stepsdone:
585 584 return
586 585 b2caps = bundle2.bundle2caps(pushop.remote)
587 586 if 'pushkey' not in b2caps:
588 587 return
589 588 pushop.stepsdone.add('bookmarks')
590 589 part2book = []
591 590 enc = pushkey.encode
592 591
593 592 def handlefailure(pushop, exc):
594 593 targetid = int(exc.partid)
595 594 for partid, book, action in part2book:
596 595 if partid == targetid:
597 596 raise error.Abort(bookmsgmap[action][1].rstrip() % book)
598 597 # we should not be called for parts we did not generate
599 598 assert False
600 599
601 600 for book, old, new in pushop.outbookmarks:
602 601 part = bundler.newpart('pushkey')
603 602 part.addparam('namespace', enc('bookmarks'))
604 603 part.addparam('key', enc(book))
605 604 part.addparam('old', enc(old))
606 605 part.addparam('new', enc(new))
607 606 action = 'update'
608 607 if not old:
609 608 action = 'export'
610 609 elif not new:
611 610 action = 'delete'
612 611 part2book.append((part.id, book, action))
613 612 pushop.pkfailcb[part.id] = handlefailure
614 613
615 614 def handlereply(op):
616 615 ui = pushop.ui
617 616 for partid, book, action in part2book:
618 617 partrep = op.records.getreplies(partid)
619 618 results = partrep['pushkey']
620 619 assert len(results) <= 1
621 620 if not results:
622 621 pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
623 622 else:
624 623 ret = int(results[0]['return'])
625 624 if ret:
626 625 ui.status(bookmsgmap[action][0] % book)
627 626 else:
628 627 ui.warn(bookmsgmap[action][1] % book)
629 628 if pushop.bkresult is not None:
630 629 pushop.bkresult = 1
631 630 return handlereply
632 631
633 632
634 633 def _pushbundle2(pushop):
635 634 """push data to the remote using bundle2
636 635
637 636 The only currently supported type of data is changegroup but this will
638 637 evolve in the future."""
639 638 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
640 639 pushback = (pushop.trmanager
641 640 and pushop.ui.configbool('experimental', 'bundle2.pushback'))
642 641
643 642 # create reply capability
644 643 capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo,
645 644 allowpushback=pushback))
646 645 bundler.newpart('replycaps', data=capsblob)
647 646 replyhandlers = []
648 647 for partgenname in b2partsgenorder:
649 648 partgen = b2partsgenmapping[partgenname]
650 649 ret = partgen(pushop, bundler)
651 650 if callable(ret):
652 651 replyhandlers.append(ret)
653 652 # do not push if nothing to push
654 653 if bundler.nbparts <= 1:
655 654 return
656 655 stream = util.chunkbuffer(bundler.getchunks())
657 656 try:
658 657 try:
659 658 reply = pushop.remote.unbundle(stream, ['force'], 'push')
660 659 except error.BundleValueError as exc:
661 660 raise util.Abort('missing support for %s' % exc)
662 661 try:
663 662 trgetter = None
664 663 if pushback:
665 664 trgetter = pushop.trmanager.transaction
666 665 op = bundle2.processbundle(pushop.repo, reply, trgetter)
667 666 except error.BundleValueError as exc:
668 667 raise util.Abort('missing support for %s' % exc)
669 668 except error.PushkeyFailed as exc:
670 669 partid = int(exc.partid)
671 670 if partid not in pushop.pkfailcb:
672 671 raise
673 672 pushop.pkfailcb[partid](pushop, exc)
674 673 for rephand in replyhandlers:
675 674 rephand(op)
676 675
677 676 def _pushchangeset(pushop):
678 677 """Make the actual push of changeset bundle to remote repo"""
679 678 if 'changesets' in pushop.stepsdone:
680 679 return
681 680 pushop.stepsdone.add('changesets')
682 681 if not _pushcheckoutgoing(pushop):
683 682 return
684 683 pushop.repo.prepushoutgoinghooks(pushop.repo,
685 684 pushop.remote,
686 685 pushop.outgoing)
687 686 outgoing = pushop.outgoing
688 687 unbundle = pushop.remote.capable('unbundle')
689 688 # TODO: get bundlecaps from remote
690 689 bundlecaps = None
691 690 # create a changegroup from local
692 691 if pushop.revs is None and not (outgoing.excluded
693 692 or pushop.repo.changelog.filteredrevs):
694 693 # push everything,
695 694 # use the fast path, no race possible on push
696 695 bundler = changegroup.cg1packer(pushop.repo, bundlecaps)
697 696 cg = changegroup.getsubset(pushop.repo,
698 697 outgoing,
699 698 bundler,
700 699 'push',
701 700 fastpath=True)
702 701 else:
703 702 cg = changegroup.getlocalchangegroup(pushop.repo, 'push', outgoing,
704 703 bundlecaps)
705 704
706 705 # apply changegroup to remote
707 706 if unbundle:
708 707 # local repo finds heads on server, finds out what
709 708 # revs it must push. once revs transferred, if server
710 709 # finds it has different heads (someone else won
711 710 # commit/push race), server aborts.
712 711 if pushop.force:
713 712 remoteheads = ['force']
714 713 else:
715 714 remoteheads = pushop.remoteheads
716 715 # ssh: return remote's addchangegroup()
717 716 # http: return remote's addchangegroup() or 0 for error
718 717 pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
719 718 pushop.repo.url())
720 719 else:
721 720 # we return an integer indicating remote head count
722 721 # change
723 722 pushop.cgresult = pushop.remote.addchangegroup(cg, 'push',
724 723 pushop.repo.url())
725 724
726 725 def _pushsyncphase(pushop):
727 726 """synchronise phase information locally and remotely"""
728 727 cheads = pushop.commonheads
729 728 # even when we don't push, exchanging phase data is useful
730 729 remotephases = pushop.remote.listkeys('phases')
731 730 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
732 731 and remotephases # server supports phases
733 732 and pushop.cgresult is None # nothing was pushed
734 733 and remotephases.get('publishing', False)):
735 734 # When:
736 735 # - this is a subrepo push
737 736 # - and the remote supports phases
738 737 # - and no changesets were pushed
739 738 # - and remote is publishing
740 739 # We may be in issue 3871 case!
741 740 # We drop the possible phase synchronisation done by
742 741 # courtesy to publish changesets possibly locally draft
743 742 # on the remote.
744 743 remotephases = {'publishing': 'True'}
745 744 if not remotephases: # old server or public-only reply from non-publishing
746 745 _localphasemove(pushop, cheads)
747 746 # don't push any phase data as there is nothing to push
748 747 else:
749 748 ana = phases.analyzeremotephases(pushop.repo, cheads,
750 749 remotephases)
751 750 pheads, droots = ana
752 751 ### Apply remote phase on local
753 752 if remotephases.get('publishing', False):
754 753 _localphasemove(pushop, cheads)
755 754 else: # publish = False
756 755 _localphasemove(pushop, pheads)
757 756 _localphasemove(pushop, cheads, phases.draft)
758 757 ### Apply local phase on remote
759 758
760 759 if pushop.cgresult:
761 760 if 'phases' in pushop.stepsdone:
762 761 # phases already pushed through bundle2
763 762 return
764 763 outdated = pushop.outdatedphases
765 764 else:
766 765 outdated = pushop.fallbackoutdatedphases
767 766
768 767 pushop.stepsdone.add('phases')
769 768
770 769 # filter heads already turned public by the push
771 770 outdated = [c for c in outdated if c.node() not in pheads]
772 771 # fallback to independent pushkey command
773 772 for newremotehead in outdated:
774 773 r = pushop.remote.pushkey('phases',
775 774 newremotehead.hex(),
776 775 str(phases.draft),
777 776 str(phases.public))
778 777 if not r:
779 778 pushop.ui.warn(_('updating %s to public failed!\n')
780 779 % newremotehead)
781 780
782 781 def _localphasemove(pushop, nodes, phase=phases.public):
783 782 """move <nodes> to <phase> in the local source repo"""
784 783 if pushop.trmanager:
785 784 phases.advanceboundary(pushop.repo,
786 785 pushop.trmanager.transaction(),
787 786 phase,
788 787 nodes)
789 788 else:
790 789 # repo is not locked, do not change any phases!
791 790 # Inform the user that phases should have been moved when
792 791 # applicable.
793 792 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
794 793 phasestr = phases.phasenames[phase]
795 794 if actualmoves:
796 795 pushop.ui.status(_('cannot lock source repo, skipping '
797 796 'local %s phase update\n') % phasestr)
798 797
799 798 def _pushobsolete(pushop):
800 799 """utility function to push obsolete markers to a remote"""
801 800 if 'obsmarkers' in pushop.stepsdone:
802 801 return
803 802 repo = pushop.repo
804 803 remote = pushop.remote
805 804 pushop.stepsdone.add('obsmarkers')
806 805 if pushop.outobsmarkers:
807 806 pushop.ui.debug('try to push obsolete markers to remote\n')
808 807 rslts = []
809 808 remotedata = obsolete._pushkeyescape(sorted(pushop.outobsmarkers))
810 809 for key in sorted(remotedata, reverse=True):
811 810 # reverse sort to ensure we end with dump0
812 811 data = remotedata[key]
813 812 rslts.append(remote.pushkey('obsolete', key, '', data))
814 813 if [r for r in rslts if not r]:
815 814 msg = _('failed to push some obsolete markers!\n')
816 815 repo.ui.warn(msg)
817 816
818 817 def _pushbookmark(pushop):
819 818 """Update bookmark position on remote"""
820 819 if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
821 820 return
822 821 pushop.stepsdone.add('bookmarks')
823 822 ui = pushop.ui
824 823 remote = pushop.remote
825 824
826 825 for b, old, new in pushop.outbookmarks:
827 826 action = 'update'
828 827 if not old:
829 828 action = 'export'
830 829 elif not new:
831 830 action = 'delete'
832 831 if remote.pushkey('bookmarks', b, old, new):
833 832 ui.status(bookmsgmap[action][0] % b)
834 833 else:
835 834 ui.warn(bookmsgmap[action][1] % b)
836 835 # discovery can have set the value from an invalid entry
837 836 if pushop.bkresult is not None:
838 837 pushop.bkresult = 1
839 838
840 839 class pulloperation(object):
841 840 """A object that represent a single pull operation
842 841
843 842 It purpose is to carry pull related state and very common operation.
844 843
845 844 A new should be created at the beginning of each pull and discarded
846 845 afterward.
847 846 """
848 847
849 848 def __init__(self, repo, remote, heads=None, force=False, bookmarks=(),
850 849 remotebookmarks=None):
851 850 # repo we pull into
852 851 self.repo = repo
853 852 # repo we pull from
854 853 self.remote = remote
855 854 # revisions we try to pull (None is "all")
856 855 self.heads = heads
857 856 # bookmarks pulled explicitly
858 857 self.explicitbookmarks = bookmarks
859 858 # do we force pull?
860 859 self.force = force
861 860 # transaction manager
862 861 self.trmanager = None
863 862 # set of common changesets between local and remote before pull
864 863 self.common = None
865 864 # set of pulled heads
866 865 self.rheads = None
867 866 # list of missing changesets to fetch remotely
868 867 self.fetch = None
869 868 # remote bookmarks data
870 869 self.remotebookmarks = remotebookmarks
871 870 # result of changegroup pulling (used as return code by pull)
872 871 self.cgresult = None
873 872 # list of steps already done
874 873 self.stepsdone = set()
875 874
876 875 @util.propertycache
877 876 def pulledsubset(self):
878 877 """heads of the set of changeset target by the pull"""
879 878 # compute target subset
880 879 if self.heads is None:
881 880 # We pulled everything possible
882 881 # sync on everything common
883 882 c = set(self.common)
884 883 ret = list(self.common)
885 884 for n in self.rheads:
886 885 if n not in c:
887 886 ret.append(n)
888 887 return ret
889 888 else:
890 889 # We pulled a specific subset
891 890 # sync on this subset
892 891 return self.heads
893 892
894 893 def gettransaction(self):
895 894 # deprecated; talk to trmanager directly
896 895 return self.trmanager.transaction()
897 896
898 897 class transactionmanager(object):
899 898 """An object to manage the life cycle of a transaction
900 899
901 900 It creates the transaction on demand and calls the appropriate hooks when
902 901 closing the transaction."""
903 902 def __init__(self, repo, source, url):
904 903 self.repo = repo
905 904 self.source = source
906 905 self.url = url
907 906 self._tr = None
908 907
909 908 def transaction(self):
910 909 """Return an open transaction object, constructing if necessary"""
911 910 if not self._tr:
912 911 trname = '%s\n%s' % (self.source, util.hidepassword(self.url))
913 912 self._tr = self.repo.transaction(trname)
914 913 self._tr.hookargs['source'] = self.source
915 914 self._tr.hookargs['url'] = self.url
916 915 return self._tr
917 916
918 917 def close(self):
919 918 """close transaction if created"""
920 919 if self._tr is not None:
921 920 self._tr.close()
922 921
923 922 def release(self):
924 923 """release transaction if created"""
925 924 if self._tr is not None:
926 925 self._tr.release()
927 926
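# Typical lifecycle (illustrative, mirroring how pull() below drives it):
#
#   trmanager = transactionmanager(repo, 'pull', remote.url())
#   try:
#       ...  # steps call trmanager.transaction() only when they have work
#       trmanager.close()    # commits the transaction, if one was opened
#   finally:
#       trmanager.release()  # aborts it if still open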
928 927 def pull(repo, remote, heads=None, force=False, bookmarks=(), opargs=None):
929 928 """Fetch repository data from a remote.
930 929
931 930 This is the main function used to retrieve data from a remote repository.
932 931
933 932 ``repo`` is the local repository to clone into.
934 933 ``remote`` is a peer instance.
935 934 ``heads`` is an iterable of revisions we want to pull. ``None`` (the
936 935 default) means to pull everything from the remote.
937 936 ``bookmarks`` is an iterable of bookmarks requested to be pulled. By
938 937 default, all remote bookmarks are pulled.
939 938 ``opargs`` are additional keyword arguments to pass to ``pulloperation``
940 939 initialization.
941 940
942 941 Returns the ``pulloperation`` created for this pull.
943 942 """
944 943 if opargs is None:
945 944 opargs = {}
946 945 pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks,
947 946 **opargs)
948 947 if pullop.remote.local():
949 948 missing = set(pullop.remote.requirements) - pullop.repo.supported
950 949 if missing:
951 950 msg = _("required features are not"
952 951 " supported in the destination:"
953 952 " %s") % (', '.join(sorted(missing)))
954 953 raise util.Abort(msg)
955 954
956 955 lock = pullop.repo.lock()
957 956 try:
958 957 pullop.trmanager = transactionmanager(repo, 'pull', remote.url())
959 958 _pulldiscovery(pullop)
960 959 if _canusebundle2(pullop):
961 960 _pullbundle2(pullop)
962 961 _pullchangeset(pullop)
963 962 _pullphase(pullop)
964 963 _pullbookmarks(pullop)
965 964 _pullobsolete(pullop)
966 965 pullop.trmanager.close()
967 966 finally:
968 967 pullop.trmanager.release()
969 968 lock.release()
970 969
971 970 return pullop
972 971
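# Illustrative call (assuming `repo` and a peer `remote` obtained elsewhere,
# e.g. via hg.peer()):
#
#   pullop = pull(repo, remote)     # heads=None pulls everything
#   if pullop.cgresult == 0:
#       repo.ui.status('no changesets were added\n')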
973 972 # list of steps to perform discovery before pull
974 973 pulldiscoveryorder = []
975 974
976 975 # Mapping between step name and function
977 976 #
978 977 # This exists to help extensions wrap steps if necessary
979 978 pulldiscoverymapping = {}
980 979
981 980 def pulldiscovery(stepname):
982 981 """decorator for function performing discovery before pull
983 982
984 983 The function is added to the step -> function mapping and appended to the
985 984 list of steps. Beware that decorated functions will be added in order (this
986 985 may matter).
987 986
988 987 You can only use this decorator for a new step; if you want to wrap a step
989 988 from an extension, change the pulldiscoverymapping dictionary directly."""
990 989 def dec(func):
991 990 assert stepname not in pulldiscoverymapping
992 991 pulldiscoverymapping[stepname] = func
993 992 pulldiscoveryorder.append(stepname)
994 993 return func
995 994 return dec
996 995
997 996 def _pulldiscovery(pullop):
998 997 """Run all discovery steps"""
999 998 for stepname in pulldiscoveryorder:
1000 999 step = pulldiscoverymapping[stepname]
1001 1000 step(pullop)
1002 1001
1003 1002 @pulldiscovery('b1:bookmarks')
1004 1003 def _pullbookmarkbundle1(pullop):
1005 1004 """fetch bookmark data in bundle1 case
1006 1005
1007 1006 If not using bundle2, we have to fetch bookmarks before changeset
1008 1007 discovery to reduce the chance and impact of race conditions."""
1009 1008 if pullop.remotebookmarks is not None:
1010 1009 return
1011 1010 if (_canusebundle2(pullop)
1012 1011 and 'listkeys' in bundle2.bundle2caps(pullop.remote)):
1013 1012 # all known bundle2 servers now support listkeys, but let's be nice with
1014 1013 # new implementations.
1015 1014 return
1016 1015 pullop.remotebookmarks = pullop.remote.listkeys('bookmarks')
1017 1016
1018 1017
1019 1018 @pulldiscovery('changegroup')
1020 1019 def _pulldiscoverychangegroup(pullop):
1021 1020 """discovery phase for the pull
1022 1021
1023 1022 Currently handles changeset discovery only; will change to handle all
1024 1023 discovery at some point."""
1025 1024 tmp = discovery.findcommonincoming(pullop.repo,
1026 1025 pullop.remote,
1027 1026 heads=pullop.heads,
1028 1027 force=pullop.force)
1029 1028 common, fetch, rheads = tmp
1030 1029 nm = pullop.repo.unfiltered().changelog.nodemap
1031 1030 if fetch and rheads:
1032 1031 # If a remote head is filtered locally, let's drop it from the unknown
1033 1032 # remote heads and put it back in common.
1034 1033 #
1035 1034 # This is a hackish solution to catch most "common but locally
1036 1035 # hidden" situations. We do not perform discovery on the unfiltered
1037 1036 # repository because it ends up doing a pathological amount of round
1038 1037 # trips for a huge amount of changesets we do not care about.
1039 1038 #
1040 1039 # If a set of such "common but filtered" changesets exists on the server
1041 1040 # but does not include a remote head, we'll not be able to detect it.
1042 1041 scommon = set(common)
1043 1042 filteredrheads = []
1044 1043 for n in rheads:
1045 1044 if n in nm:
1046 1045 if n not in scommon:
1047 1046 common.append(n)
1048 1047 else:
1049 1048 filteredrheads.append(n)
1050 1049 if not filteredrheads:
1051 1050 fetch = []
1052 1051 rheads = filteredrheads
1053 1052 pullop.common = common
1054 1053 pullop.fetch = fetch
1055 1054 pullop.rheads = rheads
1056 1055
1057 1056 def _pullbundle2(pullop):
1058 1057 """pull data using bundle2
1059 1058
1060 1059 For now, the only supported data is changegroup."""
1061 1060 remotecaps = bundle2.bundle2caps(pullop.remote)
1062 1061 kwargs = {'bundlecaps': caps20to10(pullop.repo)}
1063 1062 # pulling changegroup
1064 1063 pullop.stepsdone.add('changegroup')
1065 1064
1066 1065 kwargs['common'] = pullop.common
1067 1066 kwargs['heads'] = pullop.heads or pullop.rheads
1068 1067 kwargs['cg'] = pullop.fetch
1069 1068 if 'listkeys' in remotecaps:
1070 1069 kwargs['listkeys'] = ['phase']
1071 1070 if pullop.remotebookmarks is None:
1072 1071 # make sure to always include bookmark data when migrating
1073 1072 # `hg incoming --bundle` to using this function.
1074 1073 kwargs['listkeys'].append('bookmarks')
1075 1074 if not pullop.fetch:
1076 1075 pullop.repo.ui.status(_("no changes found\n"))
1077 1076 pullop.cgresult = 0
1078 1077 else:
1079 1078 if pullop.heads is None and list(pullop.common) == [nullid]:
1080 1079 pullop.repo.ui.status(_("requesting all changes\n"))
1081 1080 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1082 1081 remoteversions = bundle2.obsmarkersversion(remotecaps)
1083 1082 if obsolete.commonversion(remoteversions) is not None:
1084 1083 kwargs['obsmarkers'] = True
1085 1084 pullop.stepsdone.add('obsmarkers')
1086 1085 _pullbundle2extraprepare(pullop, kwargs)
1087 1086 bundle = pullop.remote.getbundle('pull', **kwargs)
1088 1087 try:
1089 1088 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
1090 1089 except error.BundleValueError as exc:
1091 1090 raise util.Abort('missing support for %s' % exc)
1092 1091
1093 1092 if pullop.fetch:
1094 1093 results = [cg['return'] for cg in op.records['changegroup']]
1095 1094 pullop.cgresult = changegroup.combineresults(results)
1096 1095
1097 1096 # processing phases change
1098 1097 for namespace, value in op.records['listkeys']:
1099 1098 if namespace == 'phases':
1100 1099 _pullapplyphases(pullop, value)
1101 1100
1102 1101 # processing bookmark update
1103 1102 for namespace, value in op.records['listkeys']:
1104 1103 if namespace == 'bookmarks':
1105 1104 pullop.remotebookmarks = value
1106 1105
1107 1106 # bookmark data were either already there or pulled in the bundle
1108 1107 if pullop.remotebookmarks is not None:
1109 1108 _pullbookmarks(pullop)
1110 1109
1111 1110 def _pullbundle2extraprepare(pullop, kwargs):
1112 1111 """hook function so that extensions can extend the getbundle call"""
1113 1112 pass
1114 1113
1115 1114 def _pullchangeset(pullop):
1116 1115 """pull changeset from unbundle into the local repo"""
1117 1116 # We delay opening the transaction as late as possible so we
1118 1117 # don't open a transaction for nothing and don't break future useful
1119 1118 # rollback calls
1120 1119 if 'changegroup' in pullop.stepsdone:
1121 1120 return
1122 1121 pullop.stepsdone.add('changegroup')
1123 1122 if not pullop.fetch:
1124 1123 pullop.repo.ui.status(_("no changes found\n"))
1125 1124 pullop.cgresult = 0
1126 1125 return
1127 1126 pullop.gettransaction()
1128 1127 if pullop.heads is None and list(pullop.common) == [nullid]:
1129 1128 pullop.repo.ui.status(_("requesting all changes\n"))
1130 1129 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
1131 1130 # issue1320, avoid a race if remote changed after discovery
1132 1131 pullop.heads = pullop.rheads
1133 1132
1134 1133 if pullop.remote.capable('getbundle'):
1135 1134 # TODO: get bundlecaps from remote
1136 1135 cg = pullop.remote.getbundle('pull', common=pullop.common,
1137 1136 heads=pullop.heads or pullop.rheads)
1138 1137 elif pullop.heads is None:
1139 1138 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
1140 1139 elif not pullop.remote.capable('changegroupsubset'):
1141 1140 raise util.Abort(_("partial pull cannot be done because "
1142 1141 "other repository doesn't support "
1143 1142 "changegroupsubset."))
1144 1143 else:
1145 1144 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
1146 1145 pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
1147 1146 pullop.remote.url())
1148 1147
1149 1148 def _pullphase(pullop):
1150 1149 # Get remote phases data from remote
1151 1150 if 'phases' in pullop.stepsdone:
1152 1151 return
1153 1152 remotephases = pullop.remote.listkeys('phases')
1154 1153 _pullapplyphases(pullop, remotephases)
1155 1154
1156 1155 def _pullapplyphases(pullop, remotephases):
1157 1156 """apply phase movement from observed remote state"""
1158 1157 if 'phases' in pullop.stepsdone:
1159 1158 return
1160 1159 pullop.stepsdone.add('phases')
1161 1160 publishing = bool(remotephases.get('publishing', False))
1162 1161 if remotephases and not publishing:
1163 1162 # remote is new and non-publishing
1164 1163 pheads, _dr = phases.analyzeremotephases(pullop.repo,
1165 1164 pullop.pulledsubset,
1166 1165 remotephases)
1167 1166 dheads = pullop.pulledsubset
1168 1167 else:
1169 1168 # Remote is old or publishing; all common changesets
1170 1169 # should be seen as public
1171 1170 pheads = pullop.pulledsubset
1172 1171 dheads = []
1173 1172 unfi = pullop.repo.unfiltered()
1174 1173 phase = unfi._phasecache.phase
1175 1174 rev = unfi.changelog.nodemap.get
1176 1175 public = phases.public
1177 1176 draft = phases.draft
1178 1177
1179 1178 # exclude changesets already public locally and update the others
1180 1179 pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
1181 1180 if pheads:
1182 1181 tr = pullop.gettransaction()
1183 1182 phases.advanceboundary(pullop.repo, tr, public, pheads)
1184 1183
1185 1184 # exclude changesets already draft locally and update the others
1186 1185 dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
1187 1186 if dheads:
1188 1187 tr = pullop.gettransaction()
1189 1188 phases.advanceboundary(pullop.repo, tr, draft, dheads)
1190 1189
1191 1190 def _pullbookmarks(pullop):
1192 1191 """process the remote bookmark information to update the local one"""
1193 1192 if 'bookmarks' in pullop.stepsdone:
1194 1193 return
1195 1194 pullop.stepsdone.add('bookmarks')
1196 1195 repo = pullop.repo
1197 1196 remotebookmarks = pullop.remotebookmarks
1198 1197 bookmod.updatefromremote(repo.ui, repo, remotebookmarks,
1199 1198 pullop.remote.url(),
1200 1199 pullop.gettransaction,
1201 1200 explicit=pullop.explicitbookmarks)
1202 1201
1203 1202 def _pullobsolete(pullop):
1204 1203 """utility function to pull obsolete markers from a remote
1205 1204
1206 1205 `gettransaction` is a function that returns the pull transaction, creating
1207 1206 one if necessary. We return the transaction to inform the calling code that
1208 1207 a new transaction has been created (when applicable).
1209 1208
1210 1209 Exists mostly to allow overriding for experimentation purposes"""
1211 1210 if 'obsmarkers' in pullop.stepsdone:
1212 1211 return
1213 1212 pullop.stepsdone.add('obsmarkers')
1214 1213 tr = None
1215 1214 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1216 1215 pullop.repo.ui.debug('fetching remote obsolete markers\n')
1217 1216 remoteobs = pullop.remote.listkeys('obsolete')
1218 1217 if 'dump0' in remoteobs:
1219 1218 tr = pullop.gettransaction()
1220 1219 for key in sorted(remoteobs, reverse=True):
1221 1220 if key.startswith('dump'):
1222 1221 data = base85.b85decode(remoteobs[key])
1223 1222 pullop.repo.obsstore.mergemarkers(tr, data)
1224 1223 pullop.repo.invalidatevolatilesets()
1225 1224 return tr
1226 1225
1227 1226 def caps20to10(repo):
1228 1227 """return a set with appropriate options to use bundle20 during getbundle"""
1229 1228 caps = set(['HG20'])
1230 1229 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
1231 1230 caps.add('bundle2=' + urllib.quote(capsblob))
1232 1231 return caps
1233 1232
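# For example (illustrative blob contents), the returned set looks like
#   set(['HG20', 'bundle2=HG20%0Achangegroup%3D01%2C02...'])
# i.e. the local bundle2 capabilities blob is percent-encoded into a single
# 'bundle2=' token suitable for a getbundle `bundlecaps` argument.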
1234 1233 # List of names of steps to perform for a bundle2 for getbundle, order matters.
1235 1234 getbundle2partsorder = []
1236 1235
1237 1236 # Mapping between step name and function
1238 1237 #
1239 1238 # This exists to help extensions wrap steps if necessary
1240 1239 getbundle2partsmapping = {}
1241 1240
1242 1241 def getbundle2partsgenerator(stepname, idx=None):
1243 1242 """decorator for function generating bundle2 part for getbundle
1244 1243
1245 1244 The function is added to the step -> function mapping and appended to the
1246 1245 list of steps. Beware that decorated functions will be added in order
1247 1246 (this may matter).
1248 1247
1249 1248 You can only use this decorator for new steps; if you want to wrap a step
1250 1249 from an extension, change the getbundle2partsmapping dictionary directly."""
1251 1250 def dec(func):
1252 1251 assert stepname not in getbundle2partsmapping
1253 1252 getbundle2partsmapping[stepname] = func
1254 1253 if idx is None:
1255 1254 getbundle2partsorder.append(stepname)
1256 1255 else:
1257 1256 getbundle2partsorder.insert(idx, stepname)
1258 1257 return func
1259 1258 return dec
1260 1259
1261 1260 def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
1262 1261 **kwargs):
1263 1262 """return a full bundle (with potentially multiple kind of parts)
1264 1263
1265 1264 Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
1266 1265 passed. For now, the bundle can contain only changegroup, but this will
1267 1266 change when more part types become available for bundle2.
1268 1267
1269 1268 This is different from changegroup.getchangegroup, which only returns an HG10
1270 1269 changegroup bundle. They may eventually get reunited in the future when we
1271 1270 have a clearer idea of the API we want to use to query different data.
1272 1271
1273 1272 The implementation is at a very early stage and will get massive rework
1274 1273 when the API of bundle is refined.
1275 1274 """
1276 1275 # bundle10 case
1277 1276 usebundle2 = False
1278 1277 if bundlecaps is not None:
1279 1278 usebundle2 = any((cap.startswith('HG2') for cap in bundlecaps))
1280 1279 if not usebundle2:
1281 1280 if bundlecaps and not kwargs.get('cg', True):
1282 1281 raise ValueError(_('request for bundle10 must include changegroup'))
1283 1282
1284 1283 if kwargs:
1285 1284 raise ValueError(_('unsupported getbundle arguments: %s')
1286 1285 % ', '.join(sorted(kwargs.keys())))
1287 1286 return changegroup.getchangegroup(repo, source, heads=heads,
1288 1287 common=common, bundlecaps=bundlecaps)
1289 1288
1290 1289 # bundle20 case
1291 1290 b2caps = {}
1292 1291 for bcaps in bundlecaps:
1293 1292 if bcaps.startswith('bundle2='):
1294 1293 blob = urllib.unquote(bcaps[len('bundle2='):])
1295 1294 b2caps.update(bundle2.decodecaps(blob))
1296 1295 bundler = bundle2.bundle20(repo.ui, b2caps)
1297 1296
1298 1297 kwargs['heads'] = heads
1299 1298 kwargs['common'] = common
1300 1299
1301 1300 for name in getbundle2partsorder:
1302 1301 func = getbundle2partsmapping[name]
1303 1302 func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
1304 1303 **kwargs)
1305 1304
1306 1305 return util.chunkbuffer(bundler.getchunks())
1307 1306
1308 1307 @getbundle2partsgenerator('changegroup')
1309 1308 def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
1310 1309 b2caps=None, heads=None, common=None, **kwargs):
1311 1310 """add a changegroup part to the requested bundle"""
1312 1311 cg = None
1313 1312 if kwargs.get('cg', True):
1314 1313 # build changegroup bundle here.
1315 1314 version = None
1316 1315 cgversions = b2caps.get('changegroup')
1317 1316 getcgkwargs = {}
1318 1317 if cgversions: # 3.1 and 3.2 ship with an empty value
1319 1318 cgversions = [v for v in cgversions if v in changegroup.packermap]
1320 1319 if not cgversions:
1321 1320 raise ValueError(_('no common changegroup version'))
1322 1321 version = getcgkwargs['version'] = max(cgversions)
1323 1322 outgoing = changegroup.computeoutgoing(repo, heads, common)
1324 1323 cg = changegroup.getlocalchangegroupraw(repo, source, outgoing,
1325 1324 bundlecaps=bundlecaps,
1326 1325 **getcgkwargs)
1327 1326
1328 1327 if cg:
1329 1328 part = bundler.newpart('changegroup', data=cg)
1330 1329 if version is not None:
1331 1330 part.addparam('version', version)
1332 1331 part.addparam('nbchanges', str(len(outgoing.missing)), mandatory=False)
1333 1332
1334 1333 @getbundle2partsgenerator('listkeys')
1335 1334 def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
1336 1335 b2caps=None, **kwargs):
1337 1336 """add parts containing listkeys namespaces to the requested bundle"""
1338 1337 listkeys = kwargs.get('listkeys', ())
1339 1338 for namespace in listkeys:
1340 1339 part = bundler.newpart('listkeys')
1341 1340 part.addparam('namespace', namespace)
1342 1341 keys = repo.listkeys(namespace).items()
1343 1342 part.data = pushkey.encodekeys(keys)
1344 1343
1345 1344 @getbundle2partsgenerator('obsmarkers')
1346 1345 def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
1347 1346 b2caps=None, heads=None, **kwargs):
1348 1347 """add an obsolescence markers part to the requested bundle"""
1349 1348 if kwargs.get('obsmarkers', False):
1350 1349 if heads is None:
1351 1350 heads = repo.heads()
1352 1351 subset = [c.node() for c in repo.set('::%ln', heads)]
1353 1352 markers = repo.obsstore.relevantmarkers(subset)
1354 1353 markers = sorted(markers)
1355 1354 buildobsmarkerspart(bundler, markers)
1356 1355
1357 1356 @getbundle2partsgenerator('hgtagsfnodes')
1358 1357 def _getbundletagsfnodes(bundler, repo, source, bundlecaps=None,
1359 1358 b2caps=None, heads=None, common=None,
1360 1359 **kwargs):
1361 1360 """Transfer the .hgtags filenodes mapping.
1362 1361
1363 1362 Only values for heads in this bundle will be transferred.
1364 1363
1365 1364 The part data consists of pairs of 20 byte changeset node and .hgtags
1366 1365 filenodes raw values.
1367 1366 """
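    # Illustrative layout (an assumption spelling out the docstring): the
    # part body is '<20-byte node><20-byte fnode>' repeated once per missing
    # head, with no framing between pairs, so its length is always a
    # multiple of 40 bytes.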
1368 1367 # Don't send unless:
1369 1368 # - changesets are being exchanged,
1370 1369 # - the client supports it.
1371 1370 if not (kwargs.get('cg', True) and 'hgtagsfnodes' in b2caps):
1372 1371 return
1373 1372
1374 1373 outgoing = changegroup.computeoutgoing(repo, heads, common)
1375 1374
1376 1375 if not outgoing.missingheads:
1377 1376 return
1378 1377
1379 1378 cache = tags.hgtagsfnodescache(repo.unfiltered())
1380 1379 chunks = []
1381 1380
1382 1381 # .hgtags fnodes are only relevant for head changesets. While we could
1383 1382 # transfer values for all known nodes, there will likely be little to
1384 1383 # no benefit.
1385 1384 #
1386 1385 # We don't bother using a generator to produce output data because
1387 1386 # a) we only have 40 bytes per head and even esoteric numbers of heads
1388 1387 # consume little memory (1M heads is 40MB); b) we don't want to send the
1389 1388 # part if we don't have entries, and knowing if we have entries requires
1390 1389 # cache lookups.
1391 1390 for node in outgoing.missingheads:
1392 1391 # Don't compute missing, as this may slow down serving.
1393 1392 fnode = cache.getfnode(node, computemissing=False)
1394 1393 if fnode is not None:
1395 1394 chunks.extend([node, fnode])
1396 1395
1397 1396 if chunks:
1398 1397 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1399 1398
1400 1399 def check_heads(repo, their_heads, context):
1401 1400 """check if the heads of a repo have been modified
1402 1401
1403 1402 Used by peer for unbundling.
1404 1403 """
1405 1404 heads = repo.heads()
1406 1405 heads_hash = util.sha1(''.join(sorted(heads))).digest()
1407 1406 if not (their_heads == ['force'] or their_heads == heads or
1408 1407 their_heads == ['hashed', heads_hash]):
1409 1408 # someone else committed/pushed/unbundled while we
1410 1409 # were transferring data
1411 1410 raise error.PushRaced('repository changed while %s - '
1412 1411 'please try again' % context)
1413 1412
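# Illustrative note: a client can assert race-freedom without shipping every
# head node by sending ['hashed', util.sha1(''.join(sorted(heads))).digest()],
# matching the hash computed above; ['force'] bypasses the check entirely.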
1414 1413 def unbundle(repo, cg, heads, source, url):
1415 1414 """Apply a bundle to a repo.
1416 1415
1417 1416 This function makes sure the repo is locked during the application and has
1418 1417 a mechanism to check that no push race occurred between the creation of the
1419 1418 bundle and its application.
1420 1419
1421 1420 If the push was raced, a PushRaced exception is raised."""
1422 1421 r = 0
1423 1422 # need a transaction when processing a bundle2 stream
1424 1423 wlock = lock = tr = None
1425 1424 recordout = None
1426 1425 # quick fix for output mismatch with bundle2 in 3.4
1427 1426 captureoutput = repo.ui.configbool('experimental', 'bundle2-output-capture',
1428 1427 False)
1429 1428 if url.startswith('remote:http:') or url.startswith('remote:https:'):
1430 1429 captureoutput = True
1431 1430 try:
1432 1431 check_heads(repo, heads, 'uploading changes')
1433 1432 # push can proceed
1434 1433 if util.safehasattr(cg, 'params'):
1435 1434 r = None
1436 1435 try:
1437 1436 wlock = repo.wlock()
1438 1437 lock = repo.lock()
1439 1438 tr = repo.transaction(source)
1440 1439 tr.hookargs['source'] = source
1441 1440 tr.hookargs['url'] = url
1442 1441 tr.hookargs['bundle2'] = '1'
1443 1442 op = bundle2.bundleoperation(repo, lambda: tr,
1444 1443 captureoutput=captureoutput)
1445 1444 try:
1446 1445 op = bundle2.processbundle(repo, cg, op=op)
1447 1446 finally:
1448 1447 r = op.reply
1449 1448 if captureoutput and r is not None:
1450 1449 repo.ui.pushbuffer(error=True, subproc=True)
1451 1450 def recordout(output):
1452 1451 r.newpart('output', data=output, mandatory=False)
1453 1452 tr.close()
1454 1453 except BaseException as exc:
1455 1454 exc.duringunbundle2 = True
1456 1455 if captureoutput and r is not None:
1457 1456 parts = exc._bundle2salvagedoutput = r.salvageoutput()
1458 1457 def recordout(output):
1459 1458 part = bundle2.bundlepart('output', data=output,
1460 1459 mandatory=False)
1461 1460 parts.append(part)
1462 1461 raise
1463 1462 else:
1464 1463 lock = repo.lock()
1465 1464 r = changegroup.addchangegroup(repo, cg, source, url)
1466 1465 finally:
1467 1466 lockmod.release(tr, lock, wlock)
1468 1467 if recordout is not None:
1469 1468 recordout(repo.ui.popbuffer())
1470 1469 return r
1471
1472 # This is its own function so extensions can override it.
1473 def _walkstreamfiles(repo):
1474 return repo.store.walk()
1475
1476 def generatestreamclone(repo):
1477 """Emit content for a streaming clone.
1478
1479 This is a generator of raw chunks that constitute a streaming clone.
1480
1481 The stream begins with a line of 2 space-delimited integers containing the
1482 number of entries and total bytes size.
1483
1484 Next are N entries, one per file being transferred. Each file entry starts
1485 as a line with the file name and integer size delimited by a null byte.
1486 The raw file data follows. Following the raw file data is the next file
1487 entry, or EOF.
1488
1489 When used on the wire protocol, an additional line indicating protocol
1490 success will be prepended to the stream. This function is not responsible
1491 for adding it.
1492
1493 This function will obtain a repository lock to ensure a consistent view of
1494 the store is captured. It therefore may raise LockError.
1495 """
1496 entries = []
1497 total_bytes = 0
1498 # Get consistent snapshot of repo, lock during scan.
1499 lock = repo.lock()
1500 try:
1501 repo.ui.debug('scanning\n')
1502 for name, ename, size in _walkstreamfiles(repo):
1503 if size:
1504 entries.append((name, size))
1505 total_bytes += size
1506 finally:
1507 lock.release()
1508
1509 repo.ui.debug('%d files, %d bytes to transfer\n' %
1510 (len(entries), total_bytes))
1511 yield '%d %d\n' % (len(entries), total_bytes)
1512
1513 svfs = repo.svfs
1514 oldaudit = svfs.mustaudit
1515 debugflag = repo.ui.debugflag
1516 svfs.mustaudit = False
1517
1518 try:
1519 for name, size in entries:
1520 if debugflag:
1521 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
1522 # partially encode name over the wire for backwards compat
1523 yield '%s\0%d\n' % (store.encodedir(name), size)
1524 if size <= 65536:
1525 fp = svfs(name)
1526 try:
1527 data = fp.read(size)
1528 finally:
1529 fp.close()
1530 yield data
1531 else:
1532 for chunk in util.filechunkiter(svfs(name), limit=size):
1533 yield chunk
1534 finally:
1535 svfs.mustaudit = oldaudit
1536
1537 def consumestreamclone(repo, fp):
1538 """Apply the contents from a streaming clone file.
1539
1540 This takes the output from generatestreamclone() and applies it to the
1541 specified repository.
1542
1543 Like generatestreamclone(), the status line added by the wire protocol is not
1544 handled by this function.
1545 """
1546 lock = repo.lock()
1547 try:
1548 repo.ui.status(_('streaming all changes\n'))
1549 l = fp.readline()
1550 try:
1551 total_files, total_bytes = map(int, l.split(' ', 1))
1552 except (ValueError, TypeError):
1553 raise error.ResponseError(
1554 _('unexpected response from remote server:'), l)
1555 repo.ui.status(_('%d files to transfer, %s of data\n') %
1556 (total_files, util.bytecount(total_bytes)))
1557 handled_bytes = 0
1558 repo.ui.progress(_('clone'), 0, total=total_bytes)
1559 start = time.time()
1560
1561 tr = repo.transaction(_('clone'))
1562 try:
1563 for i in xrange(total_files):
1564 # XXX doesn't support '\n' or '\r' in filenames
1565 l = fp.readline()
1566 try:
1567 name, size = l.split('\0', 1)
1568 size = int(size)
1569 except (ValueError, TypeError):
1570 raise error.ResponseError(
1571 _('unexpected response from remote server:'), l)
1572 if repo.ui.debugflag:
1573 repo.ui.debug('adding %s (%s)\n' %
1574 (name, util.bytecount(size)))
1575 # for backwards compat, name was partially encoded
1576 ofp = repo.svfs(store.decodedir(name), 'w')
1577 for chunk in util.filechunkiter(fp, limit=size):
1578 handled_bytes += len(chunk)
1579 repo.ui.progress(_('clone'), handled_bytes,
1580 total=total_bytes)
1581 ofp.write(chunk)
1582 ofp.close()
1583 tr.close()
1584 finally:
1585 tr.release()
1586
1587 # Writing straight to files circumvented the in-memory caches
1588 repo.invalidate()
1589
1590 elapsed = time.time() - start
1591 if elapsed <= 0:
1592 elapsed = 0.001
1593 repo.ui.progress(_('clone'), None)
1594 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
1595 (util.bytecount(total_bytes), elapsed,
1596 util.bytecount(total_bytes / elapsed)))
1597 finally:
1598 lock.release()
@@ -1,91 +1,221 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 import time
11
10 12 from .i18n import _
11 13 from . import (
12 14 branchmap,
13 15 error,
14 exchange,
16 store,
15 17 util,
16 18 )
17 19
20 # This is its own function so extensions can override it.
21 def _walkstreamfiles(repo):
22 return repo.store.walk()
23
24 def generatev1(repo):
25 """Emit content for version 1 of a streaming clone.
26
27 This is a generator of raw chunks that constitute a streaming clone.
28
29 The stream begins with a line of 2 space-delimited integers containing the
30 number of entries and total bytes size.
31
32 Next are N entries, one per file being transferred. Each file entry starts
33 as a line with the file name and integer size delimited by a null byte.
34 The raw file data follows. Following the raw file data is the next file
35 entry, or EOF.
36
37 When used on the wire protocol, an additional line indicating protocol
38 success will be prepended to the stream. This function is not responsible
39 for adding it.
40
41 This function will obtain a repository lock to ensure a consistent view of
42 the store is captured. It therefore may raise LockError.
43 """
44 entries = []
45 total_bytes = 0
46 # Get consistent snapshot of repo, lock during scan.
47 lock = repo.lock()
48 try:
49 repo.ui.debug('scanning\n')
50 for name, ename, size in _walkstreamfiles(repo):
51 if size:
52 entries.append((name, size))
53 total_bytes += size
54 finally:
55 lock.release()
56
57 repo.ui.debug('%d files, %d bytes to transfer\n' %
58 (len(entries), total_bytes))
59 yield '%d %d\n' % (len(entries), total_bytes)
60
61 svfs = repo.svfs
62 oldaudit = svfs.mustaudit
63 debugflag = repo.ui.debugflag
64 svfs.mustaudit = False
65
66 try:
67 for name, size in entries:
68 if debugflag:
69 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
70 # partially encode name over the wire for backwards compat
71 yield '%s\0%d\n' % (store.encodedir(name), size)
72 if size <= 65536:
73 fp = svfs(name)
74 try:
75 data = fp.read(size)
76 finally:
77 fp.close()
78 yield data
79 else:
80 for chunk in util.filechunkiter(svfs(name), limit=size):
81 yield chunk
82 finally:
83 svfs.mustaudit = oldaudit
84
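To make the framing documented in generatev1() concrete, here is a minimal standalone consumer sketch; parsestreamv1 and the sample stream are made up, and the store.encodedir() name encoding is ignored for brevity:

from StringIO import StringIO

def parsestreamv1(fp):
    # header line: "<number of entries> <total bytes>\n"
    count, total = map(int, fp.readline().split(' ', 1))
    for _ in xrange(count):
        # entry line: "<name>\0<size>\n", followed by <size> raw bytes
        name, size = fp.readline().split('\0', 1)
        yield name, fp.read(int(size))

sample = StringIO('2 11\na\x003\nfoob\x008\nbarbazqu')
assert [name for name, data in parsestreamv1(sample)] == ['a', 'b']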
85 def consumev1(repo, fp):
86 """Apply the contents from version 1 of a streaming clone file handle.
87
88 This takes the output from generatev1() and applies it to the specified
89 repository.
90
91 Like generatev1(), the status line added by the wire protocol is not handled
92 by this function.
93 """
94 lock = repo.lock()
95 try:
96 repo.ui.status(_('streaming all changes\n'))
97 l = fp.readline()
98 try:
99 total_files, total_bytes = map(int, l.split(' ', 1))
100 except (ValueError, TypeError):
101 raise error.ResponseError(
102 _('unexpected response from remote server:'), l)
103 repo.ui.status(_('%d files to transfer, %s of data\n') %
104 (total_files, util.bytecount(total_bytes)))
105 handled_bytes = 0
106 repo.ui.progress(_('clone'), 0, total=total_bytes)
107 start = time.time()
108
109 tr = repo.transaction(_('clone'))
110 try:
111 for i in xrange(total_files):
112 # XXX doesn't support '\n' or '\r' in filenames
113 l = fp.readline()
114 try:
115 name, size = l.split('\0', 1)
116 size = int(size)
117 except (ValueError, TypeError):
118 raise error.ResponseError(
119 _('unexpected response from remote server:'), l)
120 if repo.ui.debugflag:
121 repo.ui.debug('adding %s (%s)\n' %
122 (name, util.bytecount(size)))
123 # for backwards compat, name was partially encoded
124 ofp = repo.svfs(store.decodedir(name), 'w')
125 for chunk in util.filechunkiter(fp, limit=size):
126 handled_bytes += len(chunk)
127 repo.ui.progress(_('clone'), handled_bytes,
128 total=total_bytes)
129 ofp.write(chunk)
130 ofp.close()
131 tr.close()
132 finally:
133 tr.release()
134
135 # Writing straight to files circumvented the in-memory caches
136 repo.invalidate()
137
138 elapsed = time.time() - start
139 if elapsed <= 0:
140 elapsed = 0.001
141 repo.ui.progress(_('clone'), None)
142 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
143 (util.bytecount(total_bytes), elapsed,
144 util.bytecount(total_bytes / elapsed)))
145 finally:
146 lock.release()
147
18 148 def streamin(repo, remote, remotereqs):
19 149 # Save remote branchmap. We will use it later
20 150 # to speed up branchcache creation
21 151 rbranchmap = None
22 152 if remote.capable("branchmap"):
23 153 rbranchmap = remote.branchmap()
24 154
25 155 fp = remote.stream_out()
26 156 l = fp.readline()
27 157 try:
28 158 resp = int(l)
29 159 except ValueError:
30 160 raise error.ResponseError(
31 161 _('unexpected response from remote server:'), l)
32 162 if resp == 1:
33 163 raise util.Abort(_('operation forbidden by server'))
34 164 elif resp == 2:
35 165 raise util.Abort(_('locking the remote repository failed'))
36 166 elif resp != 0:
37 167 raise util.Abort(_('the server sent an unknown error code'))
38 168
39 169 applyremotedata(repo, remotereqs, rbranchmap, fp)
40 170 return len(repo.heads()) + 1
41 171
42 172 def applyremotedata(repo, remotereqs, remotebranchmap, fp):
43 173 """Apply stream clone data to a repository.
44 174
45 175 "remotereqs" is a set of requirements to handle the incoming data.
46 176 "remotebranchmap" is the result of a branchmap lookup on the remote. It
47 177 can be None.
48 178 "fp" is a file object containing the raw stream data, suitable for
49 feeding into exchange.consumestreamclone.
179 feeding into consumev1().
50 180 """
51 181 lock = repo.lock()
52 182 try:
53 exchange.consumestreamclone(repo, fp)
183 consumev1(repo, fp)
54 184
55 185 # new requirements = old non-format requirements +
56 186 # new format-related remote requirements
57 187 # requirements from the streamed-in repository
58 188 repo.requirements = remotereqs | (
59 189 repo.requirements - repo.supportedformats)
60 190 repo._applyopenerreqs()
61 191 repo._writerequirements()
62 192
63 193 if remotebranchmap:
64 194 rbheads = []
65 195 closed = []
66 196 for bheads in remotebranchmap.itervalues():
67 197 rbheads.extend(bheads)
68 198 for h in bheads:
69 199 r = repo.changelog.rev(h)
70 200 b, c = repo.changelog.branchinfo(r)
71 201 if c:
72 202 closed.append(h)
73 203
74 204 if rbheads:
75 205 rtiprev = max((int(repo.changelog.rev(node))
76 206 for node in rbheads))
77 207 cache = branchmap.branchcache(remotebranchmap,
78 208 repo[rtiprev].node(),
79 209 rtiprev,
80 210 closednodes=closed)
81 211 # Try to stick it as low as possible
82 212 # filters above "served" are unlikely to be fetched from a clone
83 213 for candidate in ('base', 'immutable', 'served'):
84 214 rview = repo.filtered(candidate)
85 215 if cache.validfor(rview):
86 216 repo._branchcaches[candidate] = cache
87 217 cache.write(rview)
88 218 break
89 219 repo.invalidate()
90 220 finally:
91 221 lock.release()
@@ -1,812 +1,813 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import sys
12 12 import tempfile
13 13 import urllib
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 bin,
18 18 hex,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 encoding,
25 25 error,
26 26 exchange,
27 27 peer,
28 28 pushkey as pushkeymod,
29 streamclone,
29 30 util,
30 31 )
31 32
32 33 class abstractserverproto(object):
33 34 """abstract class that summarizes the protocol API
34 35
35 36 Used as reference and documentation.
36 37 """
37 38
38 39 def getargs(self, args):
39 40 """return the value for arguments in <args>
40 41
41 42 returns a list of values (same order as <args>)"""
42 43 raise NotImplementedError()
43 44
44 45 def getfile(self, fp):
45 46 """write the whole content of a file into a file like object
46 47
47 48 The file is in the form::
48 49
49 50 (<chunk-size>\n<chunk>)+0\n
50 51
51 52 The chunk size is the ASCII representation of the integer.
52 53 """
53 54 raise NotImplementedError()
54 55
55 56 def redirect(self):
56 57 """may setup interception for stdout and stderr
57 58
58 59 See also the `restore` method."""
59 60 raise NotImplementedError()
60 61
61 62 # If the `redirect` function does install interception, the `restore`
62 63 # function MUST be defined. If interception is not used, this function
63 64 # MUST NOT be defined.
64 65 #
65 66 # left commented here on purpose
66 67 #
67 68 #def restore(self):
68 69 # """reinstall previous stdout and stderr and return intercepted stdout
69 70 # """
70 71 # raise NotImplementedError()
71 72
72 73 def groupchunks(self, cg):
73 74 """return 4096 chunks from a changegroup object
74 75
75 76 Some protocols may have compressed the contents."""
76 77 raise NotImplementedError()
77 78
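The chunked framing documented in getfile() above can be consumed with a short loop. A hypothetical sketch (readchunked is not part of the API):

def readchunked(src, dst):
    # repeated "<chunk-size>\n<chunk>" records, terminated by "0\n"
    while True:
        size = int(src.readline())
        if not size:
            break
        dst.write(src.read(size))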
78 79 class remotebatch(peer.batcher):
79 80 '''batches the queued calls; uses as few roundtrips as possible'''
80 81 def __init__(self, remote):
81 82 '''remote must support _submitbatch(encbatch) and
82 83 _submitone(op, encargs)'''
83 84 peer.batcher.__init__(self)
84 85 self.remote = remote
85 86 def submit(self):
86 87 req, rsp = [], []
87 88 for name, args, opts, resref in self.calls:
88 89 mtd = getattr(self.remote, name)
89 90 batchablefn = getattr(mtd, 'batchable', None)
90 91 if batchablefn is not None:
91 92 batchable = batchablefn(mtd.im_self, *args, **opts)
92 93 encargsorres, encresref = batchable.next()
93 94 if encresref:
94 95 req.append((name, encargsorres,))
95 96 rsp.append((batchable, encresref, resref,))
96 97 else:
97 98 resref.set(encargsorres)
98 99 else:
99 100 if req:
100 101 self._submitreq(req, rsp)
101 102 req, rsp = [], []
102 103 resref.set(mtd(*args, **opts))
103 104 if req:
104 105 self._submitreq(req, rsp)
105 106 def _submitreq(self, req, rsp):
106 107 encresults = self.remote._submitbatch(req)
107 108 for encres, r in zip(encresults, rsp):
108 109 batchable, encresref, resref = r
109 110 encresref.set(encres)
110 111 resref.set(batchable.next())
111 112
112 113 # Forward a couple of names from peer to make wireproto interactions
113 114 # slightly more sensible.
114 115 batchable = peer.batchable
115 116 future = peer.future
116 117
117 118 # list of nodes encoding / decoding
118 119
119 120 def decodelist(l, sep=' '):
120 121 if l:
121 122 return map(bin, l.split(sep))
122 123 return []
123 124
124 125 def encodelist(l, sep=' '):
125 126 try:
126 127 return sep.join(map(hex, l))
127 128 except TypeError:
128 129 raise
129 130
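A round-trip sketch of the node-list encoding above, assuming the bin/hex helpers imported at the top of this module; 20-byte binary nodes travel as space-separated 40-character hex strings:

nodes = ['\x00' * 20, '\xff' * 20]
assert decodelist(encodelist(nodes)) == nodes
assert encodelist(nodes).split(' ')[0] == '00' * 20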
130 131 # batched call argument encoding
131 132
132 133 def escapearg(plain):
133 134 return (plain
134 135 .replace(':', ':c')
135 136 .replace(',', ':o')
136 137 .replace(';', ':s')
137 138 .replace('=', ':e'))
138 139
139 140 def unescapearg(escaped):
140 141 return (escaped
141 142 .replace(':e', '=')
142 143 .replace(':s', ';')
143 144 .replace(':o', ',')
144 145 .replace(':c', ':'))
145 146
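A round-trip sketch for the argument escaping above; escapearg() rewrites ':' first (and unescapearg() restores it last) so the ':c'/':o'/':s'/':e' sequences stay unambiguous:

assert escapearg('a=b') == 'a:eb'
assert unescapearg(escapearg('key=x,y;z:w')) == 'key=x,y;z:w'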
146 147 # mapping of options accepted by getbundle and their types
147 148 #
148 149 # Meant to be extended by extensions. It is the extension's responsibility to ensure
149 150 # such options are properly processed in exchange.getbundle.
150 151 #
151 152 # supported types are:
152 153 #
153 154 # :nodes: list of binary nodes
154 155 # :csv: list of comma-separated values
155 156 # :scsv: list of comma-separated values returned as a set
156 157 # :plain: string with no transformation needed.
157 158 gboptsmap = {'heads': 'nodes',
158 159 'common': 'nodes',
159 160 'obsmarkers': 'boolean',
160 161 'bundlecaps': 'scsv',
161 162 'listkeys': 'csv',
162 163 'cg': 'boolean'}
163 164
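For illustration, a sketch encoding one option of each common type per the table above, mirroring what wirepeer.getbundle() below produces (the values are made up):

from binascii import hexlify

nodes = ['\x11' * 20, '\x22' * 20]
opts = {
    'heads': ' '.join(hexlify(n) for n in nodes),   # type 'nodes'
    'listkeys': ','.join(['phases', 'bookmarks']),  # type 'csv'
    'cg': '%i' % bool(True),                        # type 'boolean'
}
assert opts['cg'] == '1'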
164 165 # client side
165 166
166 167 class wirepeer(peer.peerrepository):
167 168
168 169 def batch(self):
169 170 if self.capable('batch'):
170 171 return remotebatch(self)
171 172 else:
172 173 return peer.localbatch(self)
173 174 def _submitbatch(self, req):
174 175 cmds = []
175 176 for op, argsdict in req:
176 177 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
177 178 for k, v in argsdict.iteritems())
178 179 cmds.append('%s %s' % (op, args))
179 180 rsp = self._call("batch", cmds=';'.join(cmds))
180 181 return [unescapearg(r) for r in rsp.split(';')]
181 182 def _submitone(self, op, args):
182 183 return self._call(op, **args)
183 184
184 185 @batchable
185 186 def lookup(self, key):
186 187 self.requirecap('lookup', _('look up remote revision'))
187 188 f = future()
188 189 yield {'key': encoding.fromlocal(key)}, f
189 190 d = f.value
190 191 success, data = d[:-1].split(" ", 1)
191 192 if int(success):
192 193 yield bin(data)
193 194 self._abort(error.RepoError(data))
194 195
195 196 @batchable
196 197 def heads(self):
197 198 f = future()
198 199 yield {}, f
199 200 d = f.value
200 201 try:
201 202 yield decodelist(d[:-1])
202 203 except ValueError:
203 204 self._abort(error.ResponseError(_("unexpected response:"), d))
204 205
205 206 @batchable
206 207 def known(self, nodes):
207 208 f = future()
208 209 yield {'nodes': encodelist(nodes)}, f
209 210 d = f.value
210 211 try:
211 212 yield [bool(int(b)) for b in d]
212 213 except ValueError:
213 214 self._abort(error.ResponseError(_("unexpected response:"), d))
214 215
215 216 @batchable
216 217 def branchmap(self):
217 218 f = future()
218 219 yield {}, f
219 220 d = f.value
220 221 try:
221 222 branchmap = {}
222 223 for branchpart in d.splitlines():
223 224 branchname, branchheads = branchpart.split(' ', 1)
224 225 branchname = encoding.tolocal(urllib.unquote(branchname))
225 226 branchheads = decodelist(branchheads)
226 227 branchmap[branchname] = branchheads
227 228 yield branchmap
228 229 except TypeError:
229 230 self._abort(error.ResponseError(_("unexpected response:"), d))
230 231
231 232 def branches(self, nodes):
232 233 n = encodelist(nodes)
233 234 d = self._call("branches", nodes=n)
234 235 try:
235 236 br = [tuple(decodelist(b)) for b in d.splitlines()]
236 237 return br
237 238 except ValueError:
238 239 self._abort(error.ResponseError(_("unexpected response:"), d))
239 240
240 241 def between(self, pairs):
241 242 batch = 8 # avoid giant requests
242 243 r = []
243 244 for i in xrange(0, len(pairs), batch):
244 245 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
245 246 d = self._call("between", pairs=n)
246 247 try:
247 248 r.extend(l and decodelist(l) or [] for l in d.splitlines())
248 249 except ValueError:
249 250 self._abort(error.ResponseError(_("unexpected response:"), d))
250 251 return r
251 252
252 253 @batchable
253 254 def pushkey(self, namespace, key, old, new):
254 255 if not self.capable('pushkey'):
255 256 yield False, None
256 257 f = future()
257 258 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
258 259 yield {'namespace': encoding.fromlocal(namespace),
259 260 'key': encoding.fromlocal(key),
260 261 'old': encoding.fromlocal(old),
261 262 'new': encoding.fromlocal(new)}, f
262 263 d = f.value
263 264 d, output = d.split('\n', 1)
264 265 try:
265 266 d = bool(int(d))
266 267 except ValueError:
267 268 raise error.ResponseError(
268 269 _('push failed (unexpected response):'), d)
269 270 for l in output.splitlines(True):
270 271 self.ui.status(_('remote: '), l)
271 272 yield d
272 273
273 274 @batchable
274 275 def listkeys(self, namespace):
275 276 if not self.capable('pushkey'):
276 277 yield {}, None
277 278 f = future()
278 279 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
279 280 yield {'namespace': encoding.fromlocal(namespace)}, f
280 281 d = f.value
281 282 self.ui.debug('received listkey for "%s": %i bytes\n'
282 283 % (namespace, len(d)))
283 284 yield pushkeymod.decodekeys(d)
284 285
285 286 def stream_out(self):
286 287 return self._callstream('stream_out')
287 288
288 289 def changegroup(self, nodes, kind):
289 290 n = encodelist(nodes)
290 291 f = self._callcompressable("changegroup", roots=n)
291 292 return changegroupmod.cg1unpacker(f, 'UN')
292 293
293 294 def changegroupsubset(self, bases, heads, kind):
294 295 self.requirecap('changegroupsubset', _('look up remote changes'))
295 296 bases = encodelist(bases)
296 297 heads = encodelist(heads)
297 298 f = self._callcompressable("changegroupsubset",
298 299 bases=bases, heads=heads)
299 300 return changegroupmod.cg1unpacker(f, 'UN')
300 301
301 302 def getbundle(self, source, **kwargs):
302 303 self.requirecap('getbundle', _('look up remote changes'))
303 304 opts = {}
304 305 bundlecaps = kwargs.get('bundlecaps')
305 306 if bundlecaps is not None:
306 307 kwargs['bundlecaps'] = sorted(bundlecaps)
307 308 else:
308 309 bundlecaps = () # kwargs could have it to None
309 310 for key, value in kwargs.iteritems():
310 311 if value is None:
311 312 continue
312 313 keytype = gboptsmap.get(key)
313 314 if keytype is None:
314 315 assert False, 'unexpected'
315 316 elif keytype == 'nodes':
316 317 value = encodelist(value)
317 318 elif keytype in ('csv', 'scsv'):
318 319 value = ','.join(value)
319 320 elif keytype == 'boolean':
320 321 value = '%i' % bool(value)
321 322 elif keytype != 'plain':
322 323 raise KeyError('unknown getbundle option type %s'
323 324 % keytype)
324 325 opts[key] = value
325 326 f = self._callcompressable("getbundle", **opts)
326 327 if any((cap.startswith('HG2') for cap in bundlecaps)):
327 328 return bundle2.getunbundler(self.ui, f)
328 329 else:
329 330 return changegroupmod.cg1unpacker(f, 'UN')
330 331
331 332 def unbundle(self, cg, heads, source):
332 333 '''Send cg (a readable file-like object representing the
333 334 changegroup to push, typically a chunkbuffer object) to the
334 335 remote server as a bundle.
335 336
336 337 When pushing a bundle10 stream, return an integer indicating the
337 338 result of the push (see localrepository.addchangegroup()).
338 339
339 340 When pushing a bundle20 stream, return a bundle20 stream.'''
340 341
341 342 if heads != ['force'] and self.capable('unbundlehash'):
342 343 heads = encodelist(['hashed',
343 344 util.sha1(''.join(sorted(heads))).digest()])
344 345 else:
345 346 heads = encodelist(heads)
346 347
347 348 if util.safehasattr(cg, 'deltaheader'):
348 349 # this is a bundle10, do the old style call sequence
349 350 ret, output = self._callpush("unbundle", cg, heads=heads)
350 351 if ret == "":
351 352 raise error.ResponseError(
352 353 _('push failed:'), output)
353 354 try:
354 355 ret = int(ret)
355 356 except ValueError:
356 357 raise error.ResponseError(
357 358 _('push failed (unexpected response):'), ret)
358 359
359 360 for l in output.splitlines(True):
360 361 self.ui.status(_('remote: '), l)
361 362 else:
362 363 # bundle2 push. Send a stream, fetch a stream.
363 364 stream = self._calltwowaystream('unbundle', cg, heads=heads)
364 365 ret = bundle2.getunbundler(self.ui, stream)
365 366 return ret
366 367
367 368 def debugwireargs(self, one, two, three=None, four=None, five=None):
368 369 # don't pass optional arguments left at their default value
369 370 opts = {}
370 371 if three is not None:
371 372 opts['three'] = three
372 373 if four is not None:
373 374 opts['four'] = four
374 375 return self._call('debugwireargs', one=one, two=two, **opts)
375 376
376 377 def _call(self, cmd, **args):
377 378 """execute <cmd> on the server
378 379
379 380 The command is expected to return a simple string.
380 381
381 382 returns the server reply as a string."""
382 383 raise NotImplementedError()
383 384
384 385 def _callstream(self, cmd, **args):
385 386 """execute <cmd> on the server
386 387
387 388 The command is expected to return a stream.
388 389
389 390 returns the server reply as a file like object."""
390 391 raise NotImplementedError()
391 392
392 393 def _callcompressable(self, cmd, **args):
393 394 """execute <cmd> on the server
394 395
395 396 The command is expected to return a stream.
396 397
397 398 The stream may have been compressed in some implementations. This
398 399 function takes care of the decompression. This is the only difference
399 400 with _callstream.
400 401
401 402 returns the server reply as a file like object.
402 403 """
403 404 raise NotImplementedError()
404 405
405 406 def _callpush(self, cmd, fp, **args):
406 407 """execute a <cmd> on server
407 408
408 409 The command is expected to be related to a push. Push has a special
409 410 return method.
410 411
411 412 returns the server reply as a (ret, output) tuple. ret is either
412 413 empty (error) or a stringified int.
413 414 """
414 415 raise NotImplementedError()
415 416
416 417 def _calltwowaystream(self, cmd, fp, **args):
417 418 """execute <cmd> on server
418 419
419 420 The command will send a stream to the server and get a stream in reply.
420 421 """
421 422 raise NotImplementedError()
422 423
423 424 def _abort(self, exception):
424 425 """clearly abort the wire protocol connection and raise the exception
425 426 """
426 427 raise NotImplementedError()
427 428
428 429 # server side
429 430
430 431 # wire protocol command can either return a string or one of these classes.
431 432 class streamres(object):
432 433 """wireproto reply: binary stream
433 434
434 435 The call was successful and the result is a stream.
435 436 Iterate on the `self.gen` attribute to retrieve chunks.
436 437 """
437 438 def __init__(self, gen):
438 439 self.gen = gen
439 440
440 441 class pushres(object):
441 442 """wireproto reply: success with simple integer return
442 443
443 444 The call was successful and returned an integer contained in `self.res`.
444 445 """
445 446 def __init__(self, res):
446 447 self.res = res
447 448
448 449 class pusherr(object):
449 450 """wireproto reply: failure
450 451
451 452 The call failed. The `self.res` attribute contains the error message.
452 453 """
453 454 def __init__(self, res):
454 455 self.res = res
455 456
456 457 class ooberror(object):
457 458 """wireproto reply: failure of a batch of operation
458 459
459 460 Something failed during a batch call. The error message is stored in
460 461 `self.message`.
461 462 """
462 463 def __init__(self, message):
463 464 self.message = message
464 465
465 466 def dispatch(repo, proto, command):
466 467 repo = repo.filtered("served")
467 468 func, spec = commands[command]
468 469 args = proto.getargs(spec)
469 470 return func(repo, proto, *args)
470 471
471 472 def options(cmd, keys, others):
472 473 opts = {}
473 474 for k in keys:
474 475 if k in others:
475 476 opts[k] = others[k]
476 477 del others[k]
477 478 if others:
478 479 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
479 480 % (cmd, ",".join(others)))
480 481 return opts
481 482
482 483 # list of commands
483 484 commands = {}
484 485
485 486 def wireprotocommand(name, args=''):
486 487 """decorator for wire protocol command"""
487 488 def register(func):
488 489 commands[name] = (func, args)
489 490 return func
490 491 return register
491 492
492 493 @wireprotocommand('batch', 'cmds *')
493 494 def batch(repo, proto, cmds, others):
494 495 repo = repo.filtered("served")
495 496 res = []
496 497 for pair in cmds.split(';'):
497 498 op, args = pair.split(' ', 1)
498 499 vals = {}
499 500 for a in args.split(','):
500 501 if a:
501 502 n, v = a.split('=')
502 503 vals[n] = unescapearg(v)
503 504 func, spec = commands[op]
504 505 if spec:
505 506 keys = spec.split()
506 507 data = {}
507 508 for k in keys:
508 509 if k == '*':
509 510 star = {}
510 511 for key in vals.keys():
511 512 if key not in keys:
512 513 star[key] = vals[key]
513 514 data['*'] = star
514 515 else:
515 516 data[k] = vals[k]
516 517 result = func(repo, proto, *[data[k] for k in keys])
517 518 else:
518 519 result = func(repo, proto)
519 520 if isinstance(result, ooberror):
520 521 return result
521 522 res.append(escapearg(result))
522 523 return ';'.join(res)
523 524
524 525 @wireprotocommand('between', 'pairs')
525 526 def between(repo, proto, pairs):
526 527 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
527 528 r = []
528 529 for b in repo.between(pairs):
529 530 r.append(encodelist(b) + "\n")
530 531 return "".join(r)
531 532
532 533 @wireprotocommand('branchmap')
533 534 def branchmap(repo, proto):
534 535 branchmap = repo.branchmap()
535 536 heads = []
536 537 for branch, nodes in branchmap.iteritems():
537 538 branchname = urllib.quote(encoding.fromlocal(branch))
538 539 branchnodes = encodelist(nodes)
539 540 heads.append('%s %s' % (branchname, branchnodes))
540 541 return '\n'.join(heads)
541 542
542 543 @wireprotocommand('branches', 'nodes')
543 544 def branches(repo, proto, nodes):
544 545 nodes = decodelist(nodes)
545 546 r = []
546 547 for b in repo.branches(nodes):
547 548 r.append(encodelist(b) + "\n")
548 549 return "".join(r)
549 550
550 551
551 552 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
552 553 'known', 'getbundle', 'unbundlehash', 'batch']
553 554
554 555 def _capabilities(repo, proto):
555 556 """return a list of capabilities for a repo
556 557
557 558 This function exists to allow extensions to easily wrap capabilities
558 559 computation
559 560
560 561 - returns a list: easy to alter
561 562 - changes made here will be propagated to both the `capabilities` and `hello`
562 563 commands without any other action needed.
563 564 """
564 565 # copy to prevent modification of the global list
565 566 caps = list(wireprotocaps)
566 567 if _allowstream(repo.ui):
567 568 if repo.ui.configbool('server', 'preferuncompressed', False):
568 569 caps.append('stream-preferred')
569 570 requiredformats = repo.requirements & repo.supportedformats
570 571 # if our local revlogs are just revlogv1, add 'stream' cap
571 572 if not requiredformats - set(('revlogv1',)):
572 573 caps.append('stream')
573 574 # otherwise, add 'streamreqs' detailing our local revlog format
574 575 else:
575 576 caps.append('streamreqs=%s' % ','.join(requiredformats))
576 577 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
577 578 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
578 579 caps.append('bundle2=' + urllib.quote(capsblob))
579 580 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
580 581 caps.append(
581 582 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
582 583 return caps
583 584
584 585 # If you are writing an extension and considering wrapping this function, wrap
585 586 # `_capabilities` instead.
586 587 @wireprotocommand('capabilities')
587 588 def capabilities(repo, proto):
588 589 return ' '.join(_capabilities(repo, proto))
589 590
590 591 @wireprotocommand('changegroup', 'roots')
591 592 def changegroup(repo, proto, roots):
592 593 nodes = decodelist(roots)
593 594 cg = changegroupmod.changegroup(repo, nodes, 'serve')
594 595 return streamres(proto.groupchunks(cg))
595 596
596 597 @wireprotocommand('changegroupsubset', 'bases heads')
597 598 def changegroupsubset(repo, proto, bases, heads):
598 599 bases = decodelist(bases)
599 600 heads = decodelist(heads)
600 601 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
601 602 return streamres(proto.groupchunks(cg))
602 603
603 604 @wireprotocommand('debugwireargs', 'one two *')
604 605 def debugwireargs(repo, proto, one, two, others):
605 606 # only accept optional args from the known set
606 607 opts = options('debugwireargs', ['three', 'four'], others)
607 608 return repo.debugwireargs(one, two, **opts)
608 609
609 610 # List of options accepted by getbundle.
610 611 #
611 612 # Meant to be extended by extensions. It is the extension's responsibility to
612 613 # ensure such options are properly processed in exchange.getbundle.
613 614 gboptslist = ['heads', 'common', 'bundlecaps']
614 615
615 616 @wireprotocommand('getbundle', '*')
616 617 def getbundle(repo, proto, others):
617 618 opts = options('getbundle', gboptsmap.keys(), others)
618 619 for k, v in opts.iteritems():
619 620 keytype = gboptsmap[k]
620 621 if keytype == 'nodes':
621 622 opts[k] = decodelist(v)
622 623 elif keytype == 'csv':
623 624 opts[k] = list(v.split(','))
624 625 elif keytype == 'scsv':
625 626 opts[k] = set(v.split(','))
626 627 elif keytype == 'boolean':
627 628 opts[k] = bool(v)
628 629 elif keytype != 'plain':
629 630 raise KeyError('unknown getbundle option type %s'
630 631 % keytype)
631 632 cg = exchange.getbundle(repo, 'serve', **opts)
632 633 return streamres(proto.groupchunks(cg))
633 634
634 635 @wireprotocommand('heads')
635 636 def heads(repo, proto):
636 637 h = repo.heads()
637 638 return encodelist(h) + "\n"
638 639
639 640 @wireprotocommand('hello')
640 641 def hello(repo, proto):
641 642 '''the hello command returns a set of lines describing various
642 643 interesting things about the server, in an RFC822-like format.
643 644 Currently the only one defined is "capabilities", which
644 645 consists of a line in the form:
645 646
646 647 capabilities: space separated list of tokens
647 648 '''
648 649 return "capabilities: %s\n" % (capabilities(repo, proto))
649 650
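A client-side sketch of parsing a hypothetical 'hello' reply in the RFC822-like format described above (the capability list is illustrative):

sample = 'capabilities: lookup branchmap getbundle unbundlehash batch\n'
caps = set(sample.split(':', 1)[1].split())
assert 'unbundlehash' in caps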
650 651 @wireprotocommand('listkeys', 'namespace')
651 652 def listkeys(repo, proto, namespace):
652 653 d = repo.listkeys(encoding.tolocal(namespace)).items()
653 654 return pushkeymod.encodekeys(d)
654 655
655 656 @wireprotocommand('lookup', 'key')
656 657 def lookup(repo, proto, key):
657 658 try:
658 659 k = encoding.tolocal(key)
659 660 c = repo[k]
660 661 r = c.hex()
661 662 success = 1
662 663 except Exception as inst:
663 664 r = str(inst)
664 665 success = 0
665 666 return "%s %s\n" % (success, r)
666 667
667 668 @wireprotocommand('known', 'nodes *')
668 669 def known(repo, proto, nodes, others):
669 670 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
670 671
671 672 @wireprotocommand('pushkey', 'namespace key old new')
672 673 def pushkey(repo, proto, namespace, key, old, new):
673 674 # compatibility with pre-1.8 clients which were accidentally
674 675 # sending raw binary nodes rather than utf-8-encoded hex
675 676 if len(new) == 20 and new.encode('string-escape') != new:
676 677 # looks like it could be a binary node
677 678 try:
678 679 new.decode('utf-8')
679 680 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
680 681 except UnicodeDecodeError:
681 682 pass # binary, leave unmodified
682 683 else:
683 684 new = encoding.tolocal(new) # normal path
684 685
685 686 if util.safehasattr(proto, 'restore'):
686 687
687 688 proto.redirect()
688 689
689 690 try:
690 691 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
691 692 encoding.tolocal(old), new) or False
692 693 except util.Abort:
693 694 r = False
694 695
695 696 output = proto.restore()
696 697
697 698 return '%s\n%s' % (int(r), output)
698 699
699 700 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
700 701 encoding.tolocal(old), new)
701 702 return '%s\n' % int(r)
702 703
703 704 def _allowstream(ui):
704 705 return ui.configbool('server', 'uncompressed', True, untrusted=True)
705 706
706 707 @wireprotocommand('stream_out')
707 708 def stream(repo, proto):
708 709 '''If the server supports streaming clone, it advertises the "stream"
709 710 capability with a value representing the version and flags of the repo
710 711 it is serving. Client checks to see if it understands the format.
711 712 '''
712 713 if not _allowstream(repo.ui):
713 714 return '1\n'
714 715
715 716 def getstream(it):
716 717 yield '0\n'
717 718 for chunk in it:
718 719 yield chunk
719 720
720 721 try:
721 722 # LockError may be raised before the first result is yielded. Don't
722 723 # emit output until we're sure we got the lock successfully.
723 it = exchange.generatestreamclone(repo)
724 it = streamclone.generatev1(repo)
724 725 return streamres(getstream(it))
725 726 except error.LockError:
726 727 return '2\n'
727 728
728 729 @wireprotocommand('unbundle', 'heads')
729 730 def unbundle(repo, proto, heads):
730 731 their_heads = decodelist(heads)
731 732
732 733 try:
733 734 proto.redirect()
734 735
735 736 exchange.check_heads(repo, their_heads, 'preparing changes')
736 737
737 738 # write bundle data to temporary file because it can be big
738 739 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
739 740 fp = os.fdopen(fd, 'wb+')
740 741 r = 0
741 742 try:
742 743 proto.getfile(fp)
743 744 fp.seek(0)
744 745 gen = exchange.readbundle(repo.ui, fp, None)
745 746 r = exchange.unbundle(repo, gen, their_heads, 'serve',
746 747 proto._client())
747 748 if util.safehasattr(r, 'addpart'):
748 749 # The return looks streamable, we are in the bundle2 case and
749 750 # should return a stream.
750 751 return streamres(r.getchunks())
751 752 return pushres(r)
752 753
753 754 finally:
754 755 fp.close()
755 756 os.unlink(tempname)
756 757
757 758 except (error.BundleValueError, util.Abort, error.PushRaced) as exc:
758 759 # handle non-bundle2 case first
759 760 if not getattr(exc, 'duringunbundle2', False):
760 761 try:
761 762 raise
762 763 except util.Abort:
763 764 # The old code we moved used sys.stderr directly.
764 765 # We did not change it, to minimise code churn.
765 766 # This needs to be moved to something proper.
766 767 # Feel free to do it.
767 768 sys.stderr.write("abort: %s\n" % exc)
768 769 return pushres(0)
769 770 except error.PushRaced:
770 771 return pusherr(str(exc))
771 772
772 773 bundler = bundle2.bundle20(repo.ui)
773 774 for out in getattr(exc, '_bundle2salvagedoutput', ()):
774 775 bundler.addpart(out)
775 776 try:
776 777 try:
777 778 raise
778 779 except error.PushkeyFailed as exc:
779 780 # check client caps
780 781 remotecaps = getattr(exc, '_replycaps', None)
781 782 if (remotecaps is not None
782 783 and 'pushkey' not in remotecaps.get('error', ())):
783 784 # no support on the remote side, fall back to the Abort handler.
784 785 raise
785 786 part = bundler.newpart('error:pushkey')
786 787 part.addparam('in-reply-to', exc.partid)
787 788 if exc.namespace is not None:
788 789 part.addparam('namespace', exc.namespace, mandatory=False)
789 790 if exc.key is not None:
790 791 part.addparam('key', exc.key, mandatory=False)
791 792 if exc.new is not None:
792 793 part.addparam('new', exc.new, mandatory=False)
793 794 if exc.old is not None:
794 795 part.addparam('old', exc.old, mandatory=False)
795 796 if exc.ret is not None:
796 797 part.addparam('ret', exc.ret, mandatory=False)
797 798 except error.BundleValueError as exc:
798 799 errpart = bundler.newpart('error:unsupportedcontent')
799 800 if exc.parttype is not None:
800 801 errpart.addparam('parttype', exc.parttype)
801 802 if exc.params:
802 803 errpart.addparam('params', '\0'.join(exc.params))
803 804 except util.Abort as exc:
804 805 manargs = [('message', str(exc))]
805 806 advargs = []
806 807 if exc.hint is not None:
807 808 advargs.append(('hint', exc.hint))
808 809 bundler.addpart(bundle2.bundlepart('error:abort',
809 810 manargs, advargs))
810 811 except error.PushRaced as exc:
811 812 bundler.newpart('error:pushraced', [('message', str(exc))])
812 813 return streamres(bundler.getchunks())