##// END OF EJS Templates
discovery: run discovery on filtered repository...
Pierre-Yves David -
r23848:c5456b64 default
parent child Browse files
Show More
@@ -1,1294 +1,1318 b''
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib
11 11 import util, scmutil, changegroup, base85, error
12 12 import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
13 13
14 14 def readbundle(ui, fh, fname, vfs=None):
15 15 header = changegroup.readexactly(fh, 4)
16 16
17 17 alg = None
18 18 if not fname:
19 19 fname = "stream"
20 20 if not header.startswith('HG') and header.startswith('\0'):
21 21 fh = changegroup.headerlessfixup(fh, header)
22 22 header = "HG10"
23 23 alg = 'UN'
24 24 elif vfs:
25 25 fname = vfs.join(fname)
26 26
27 27 magic, version = header[0:2], header[2:4]
28 28
29 29 if magic != 'HG':
30 30 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
31 31 if version == '10':
32 32 if alg is None:
33 33 alg = changegroup.readexactly(fh, 2)
34 34 return changegroup.cg1unpacker(fh, alg)
35 35 elif version == '2Y':
36 36 return bundle2.unbundle20(ui, fh, header=magic + version)
37 37 else:
38 38 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
39 39
40 40 def buildobsmarkerspart(bundler, markers):
41 41 """add an obsmarker part to the bundler with <markers>
42 42
43 43 No part is created if markers is empty.
44 44 Raises ValueError if the bundler doesn't support any known obsmarker format.
45 45 """
46 46 if markers:
47 47 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
48 48 version = obsolete.commonversion(remoteversions)
49 49 if version is None:
50 50 raise ValueError('bundler do not support common obsmarker format')
51 51 stream = obsolete.encodemarkers(markers, True, version=version)
52 52 return bundler.newpart('b2x:obsmarkers', data=stream)
53 53 return None
54 54
55 55 class pushoperation(object):
56 56 """A object that represent a single push operation
57 57
58 58 It purpose is to carry push related state and very common operation.
59 59
60 60 A new should be created at the beginning of each push and discarded
61 61 afterward.
62 62 """
63 63
64 64 def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
65 65 bookmarks=()):
66 66 # repo we push from
67 67 self.repo = repo
68 68 self.ui = repo.ui
69 69 # repo we push to
70 70 self.remote = remote
71 71 # force option provided
72 72 self.force = force
73 73 # revs to be pushed (None is "all")
74 74 self.revs = revs
75 75 # bookmark explicitly pushed
76 76 self.bookmarks = bookmarks
77 77 # allow push of new branch
78 78 self.newbranch = newbranch
79 79 # did a local lock get acquired?
80 80 self.locallocked = None
81 81 # step already performed
82 82 # (used to check what steps have been already performed through bundle2)
83 83 self.stepsdone = set()
84 84 # Integer version of the changegroup push result
85 85 # - None means nothing to push
86 86 # - 0 means HTTP error
87 87 # - 1 means we pushed and remote head count is unchanged *or*
88 88 # we have outgoing changesets but refused to push
89 89 # - other values as described by addchangegroup()
90 90 self.cgresult = None
91 91 # Boolean value for the bookmark push
92 92 self.bkresult = None
93 93 # discover.outgoing object (contains common and outgoing data)
94 94 self.outgoing = None
95 95 # all remote heads before the push
96 96 self.remoteheads = None
97 97 # testable as a boolean indicating if any nodes are missing locally.
98 98 self.incoming = None
99 99 # phases changes that must be pushed along side the changesets
100 100 self.outdatedphases = None
101 101 # phases changes that must be pushed if changeset push fails
102 102 self.fallbackoutdatedphases = None
103 103 # outgoing obsmarkers
104 104 self.outobsmarkers = set()
105 105 # outgoing bookmarks
106 106 self.outbookmarks = []
107 107 # transaction manager
108 108 self.trmanager = None
109 109
110 110 @util.propertycache
111 111 def futureheads(self):
112 112 """future remote heads if the changeset push succeeds"""
113 113 return self.outgoing.missingheads
114 114
115 115 @util.propertycache
116 116 def fallbackheads(self):
117 117 """future remote heads if the changeset push fails"""
118 118 if self.revs is None:
119 119 # not target to push, all common are relevant
120 120 return self.outgoing.commonheads
121 121 unfi = self.repo.unfiltered()
122 122 # I want cheads = heads(::missingheads and ::commonheads)
123 123 # (missingheads is revs with secret changeset filtered out)
124 124 #
125 125 # This can be expressed as:
126 126 # cheads = ( (missingheads and ::commonheads)
127 127 # + (commonheads and ::missingheads))"
128 128 # )
129 129 #
130 130 # while trying to push we already computed the following:
131 131 # common = (::commonheads)
132 132 # missing = ((commonheads::missingheads) - commonheads)
133 133 #
134 134 # We can pick:
135 135 # * missingheads part of common (::commonheads)
136 136 common = set(self.outgoing.common)
137 137 nm = self.repo.changelog.nodemap
138 138 cheads = [node for node in self.revs if nm[node] in common]
139 139 # and
140 140 # * commonheads parents on missing
141 141 revset = unfi.set('%ln and parents(roots(%ln))',
142 142 self.outgoing.commonheads,
143 143 self.outgoing.missing)
144 144 cheads.extend(c.node() for c in revset)
145 145 return cheads
146 146
147 147 @property
148 148 def commonheads(self):
149 149 """set of all common heads after changeset bundle push"""
150 150 if self.cgresult:
151 151 return self.futureheads
152 152 else:
153 153 return self.fallbackheads
154 154
155 155 # mapping of message used when pushing bookmark
156 156 bookmsgmap = {'update': (_("updating bookmark %s\n"),
157 157 _('updating bookmark %s failed!\n')),
158 158 'export': (_("exporting bookmark %s\n"),
159 159 _('exporting bookmark %s failed!\n')),
160 160 'delete': (_("deleting remote bookmark %s\n"),
161 161 _('deleting remote bookmark %s failed!\n')),
162 162 }
163 163
164 164
165 165 def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=()):
166 166 '''Push outgoing changesets (limited by revs) from a local
167 167 repository to remote. Return an integer:
168 168 - None means nothing to push
169 169 - 0 means HTTP error
170 170 - 1 means we pushed and remote head count is unchanged *or*
171 171 we have outgoing changesets but refused to push
172 172 - other values as described by addchangegroup()
173 173 '''
174 174 pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks)
175 175 if pushop.remote.local():
176 176 missing = (set(pushop.repo.requirements)
177 177 - pushop.remote.local().supported)
178 178 if missing:
179 179 msg = _("required features are not"
180 180 " supported in the destination:"
181 181 " %s") % (', '.join(sorted(missing)))
182 182 raise util.Abort(msg)
183 183
184 184 # there are two ways to push to remote repo:
185 185 #
186 186 # addchangegroup assumes local user can lock remote
187 187 # repo (local filesystem, old ssh servers).
188 188 #
189 189 # unbundle assumes local user cannot lock remote repo (new ssh
190 190 # servers, http servers).
191 191
192 192 if not pushop.remote.canpush():
193 193 raise util.Abort(_("destination does not support push"))
194 194 # get local lock as we might write phase data
195 195 locallock = None
196 196 try:
197 197 locallock = pushop.repo.lock()
198 198 pushop.locallocked = True
199 199 except IOError, err:
200 200 pushop.locallocked = False
201 201 if err.errno != errno.EACCES:
202 202 raise
203 203 # source repo cannot be locked.
204 204 # We do not abort the push, but just disable the local phase
205 205 # synchronisation.
206 206 msg = 'cannot lock source repository: %s\n' % err
207 207 pushop.ui.debug(msg)
208 208 try:
209 209 if pushop.locallocked:
210 210 pushop.trmanager = transactionmanager(repo,
211 211 'push-response',
212 212 pushop.remote.url())
213 213 pushop.repo.checkpush(pushop)
214 214 lock = None
215 215 unbundle = pushop.remote.capable('unbundle')
216 216 if not unbundle:
217 217 lock = pushop.remote.lock()
218 218 try:
219 219 _pushdiscovery(pushop)
220 220 if (pushop.repo.ui.configbool('experimental', 'bundle2-exp',
221 221 False)
222 222 and pushop.remote.capable('bundle2-exp')):
223 223 _pushbundle2(pushop)
224 224 _pushchangeset(pushop)
225 225 _pushsyncphase(pushop)
226 226 _pushobsolete(pushop)
227 227 _pushbookmark(pushop)
228 228 finally:
229 229 if lock is not None:
230 230 lock.release()
231 231 if pushop.trmanager:
232 232 pushop.trmanager.close()
233 233 finally:
234 234 if pushop.trmanager:
235 235 pushop.trmanager.release()
236 236 if locallock is not None:
237 237 locallock.release()
238 238
239 239 return pushop
240 240
241 241 # list of steps to perform discovery before push
242 242 pushdiscoveryorder = []
243 243
244 244 # Mapping between step name and function
245 245 #
246 246 # This exists to help extensions wrap steps if necessary
247 247 pushdiscoverymapping = {}
248 248
249 249 def pushdiscovery(stepname):
250 250 """decorator for function performing discovery before push
251 251
252 252 The function is added to the step -> function mapping and appended to the
253 253 list of steps. Beware that decorated function will be added in order (this
254 254 may matter).
255 255
256 256 You can only use this decorator for a new step, if you want to wrap a step
257 257 from an extension, change the pushdiscovery dictionary directly."""
258 258 def dec(func):
259 259 assert stepname not in pushdiscoverymapping
260 260 pushdiscoverymapping[stepname] = func
261 261 pushdiscoveryorder.append(stepname)
262 262 return func
263 263 return dec
264 264
265 265 def _pushdiscovery(pushop):
266 266 """Run all discovery steps"""
267 267 for stepname in pushdiscoveryorder:
268 268 step = pushdiscoverymapping[stepname]
269 269 step(pushop)
270 270
271 271 @pushdiscovery('changeset')
272 272 def _pushdiscoverychangeset(pushop):
273 273 """discover the changeset that need to be pushed"""
274 unfi = pushop.repo.unfiltered()
275 274 fci = discovery.findcommonincoming
276 commoninc = fci(unfi, pushop.remote, force=pushop.force)
275 commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
277 276 common, inc, remoteheads = commoninc
278 277 fco = discovery.findcommonoutgoing
279 outgoing = fco(unfi, pushop.remote, onlyheads=pushop.revs,
278 outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
280 279 commoninc=commoninc, force=pushop.force)
281 280 pushop.outgoing = outgoing
282 281 pushop.remoteheads = remoteheads
283 282 pushop.incoming = inc
284 283
285 284 @pushdiscovery('phase')
286 285 def _pushdiscoveryphase(pushop):
287 286 """discover the phase that needs to be pushed
288 287
289 288 (computed for both success and failure case for changesets push)"""
290 289 outgoing = pushop.outgoing
291 290 unfi = pushop.repo.unfiltered()
292 291 remotephases = pushop.remote.listkeys('phases')
293 292 publishing = remotephases.get('publishing', False)
294 293 ana = phases.analyzeremotephases(pushop.repo,
295 294 pushop.fallbackheads,
296 295 remotephases)
297 296 pheads, droots = ana
298 297 extracond = ''
299 298 if not publishing:
300 299 extracond = ' and public()'
301 300 revset = 'heads((%%ln::%%ln) %s)' % extracond
302 301 # Get the list of all revs draft on remote by public here.
303 302 # XXX Beware that revset break if droots is not strictly
304 303 # XXX root we may want to ensure it is but it is costly
305 304 fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
306 305 if not outgoing.missing:
307 306 future = fallback
308 307 else:
309 308 # adds changeset we are going to push as draft
310 309 #
311 310 # should not be necessary for publishing server, but because of an
312 311 # issue fixed in xxxxx we have to do it anyway.
313 312 fdroots = list(unfi.set('roots(%ln + %ln::)',
314 313 outgoing.missing, droots))
315 314 fdroots = [f.node() for f in fdroots]
316 315 future = list(unfi.set(revset, fdroots, pushop.futureheads))
317 316 pushop.outdatedphases = future
318 317 pushop.fallbackoutdatedphases = fallback
319 318
320 319 @pushdiscovery('obsmarker')
321 320 def _pushdiscoveryobsmarkers(pushop):
322 321 if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
323 322 and pushop.repo.obsstore
324 323 and 'obsolete' in pushop.remote.listkeys('namespaces')):
325 324 repo = pushop.repo
326 325 # very naive computation, that can be quite expensive on big repo.
327 326 # However: evolution is currently slow on them anyway.
328 327 nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
329 328 pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)
330 329
331 330 @pushdiscovery('bookmarks')
332 331 def _pushdiscoverybookmarks(pushop):
333 332 ui = pushop.ui
334 333 repo = pushop.repo.unfiltered()
335 334 remote = pushop.remote
336 335 ui.debug("checking for updated bookmarks\n")
337 336 ancestors = ()
338 337 if pushop.revs:
339 338 revnums = map(repo.changelog.rev, pushop.revs)
340 339 ancestors = repo.changelog.ancestors(revnums, inclusive=True)
341 340 remotebookmark = remote.listkeys('bookmarks')
342 341
343 342 explicit = set(pushop.bookmarks)
344 343
345 344 comp = bookmod.compare(repo, repo._bookmarks, remotebookmark, srchex=hex)
346 345 addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp
347 346 for b, scid, dcid in advsrc:
348 347 if b in explicit:
349 348 explicit.remove(b)
350 349 if not ancestors or repo[scid].rev() in ancestors:
351 350 pushop.outbookmarks.append((b, dcid, scid))
352 351 # search added bookmark
353 352 for b, scid, dcid in addsrc:
354 353 if b in explicit:
355 354 explicit.remove(b)
356 355 pushop.outbookmarks.append((b, '', scid))
357 356 # search for overwritten bookmark
358 357 for b, scid, dcid in advdst + diverge + differ:
359 358 if b in explicit:
360 359 explicit.remove(b)
361 360 pushop.outbookmarks.append((b, dcid, scid))
362 361 # search for bookmark to delete
363 362 for b, scid, dcid in adddst:
364 363 if b in explicit:
365 364 explicit.remove(b)
366 365 # treat as "deleted locally"
367 366 pushop.outbookmarks.append((b, dcid, ''))
368 367 # identical bookmarks shouldn't get reported
369 368 for b, scid, dcid in same:
370 369 if b in explicit:
371 370 explicit.remove(b)
372 371
373 372 if explicit:
374 373 explicit = sorted(explicit)
375 374 # we should probably list all of them
376 375 ui.warn(_('bookmark %s does not exist on the local '
377 376 'or remote repository!\n') % explicit[0])
378 377 pushop.bkresult = 2
379 378
380 379 pushop.outbookmarks.sort()
381 380
382 381 def _pushcheckoutgoing(pushop):
383 382 outgoing = pushop.outgoing
384 383 unfi = pushop.repo.unfiltered()
385 384 if not outgoing.missing:
386 385 # nothing to push
387 386 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
388 387 return False
389 388 # something to push
390 389 if not pushop.force:
391 390 # if repo.obsstore == False --> no obsolete
392 391 # then, save the iteration
393 392 if unfi.obsstore:
394 393 # this message are here for 80 char limit reason
395 394 mso = _("push includes obsolete changeset: %s!")
396 395 mst = {"unstable": _("push includes unstable changeset: %s!"),
397 396 "bumped": _("push includes bumped changeset: %s!"),
398 397 "divergent": _("push includes divergent changeset: %s!")}
399 398 # If we are to push if there is at least one
400 399 # obsolete or unstable changeset in missing, at
401 400 # least one of the missinghead will be obsolete or
402 401 # unstable. So checking heads only is ok
403 402 for node in outgoing.missingheads:
404 403 ctx = unfi[node]
405 404 if ctx.obsolete():
406 405 raise util.Abort(mso % ctx)
407 406 elif ctx.troubled():
408 407 raise util.Abort(mst[ctx.troubles()[0]] % ctx)
409 408 newbm = pushop.ui.configlist('bookmarks', 'pushing')
410 409 discovery.checkheads(unfi, pushop.remote, outgoing,
411 410 pushop.remoteheads,
412 411 pushop.newbranch,
413 412 bool(pushop.incoming),
414 413 newbm)
415 414 return True
416 415
417 416 # List of names of steps to perform for an outgoing bundle2, order matters.
418 417 b2partsgenorder = []
419 418
420 419 # Mapping between step name and function
421 420 #
422 421 # This exists to help extensions wrap steps if necessary
423 422 b2partsgenmapping = {}
424 423
425 424 def b2partsgenerator(stepname):
426 425 """decorator for function generating bundle2 part
427 426
428 427 The function is added to the step -> function mapping and appended to the
429 428 list of steps. Beware that decorated functions will be added in order
430 429 (this may matter).
431 430
432 431 You can only use this decorator for new steps, if you want to wrap a step
433 432 from an extension, attack the b2partsgenmapping dictionary directly."""
434 433 def dec(func):
435 434 assert stepname not in b2partsgenmapping
436 435 b2partsgenmapping[stepname] = func
437 436 b2partsgenorder.append(stepname)
438 437 return func
439 438 return dec
440 439
441 440 @b2partsgenerator('changeset')
442 441 def _pushb2ctx(pushop, bundler):
443 442 """handle changegroup push through bundle2
444 443
445 444 addchangegroup result is stored in the ``pushop.cgresult`` attribute.
446 445 """
447 446 if 'changesets' in pushop.stepsdone:
448 447 return
449 448 pushop.stepsdone.add('changesets')
450 449 # Send known heads to the server for race detection.
451 450 if not _pushcheckoutgoing(pushop):
452 451 return
453 452 pushop.repo.prepushoutgoinghooks(pushop.repo,
454 453 pushop.remote,
455 454 pushop.outgoing)
456 455 if not pushop.force:
457 456 bundler.newpart('b2x:check:heads', data=iter(pushop.remoteheads))
458 457 b2caps = bundle2.bundle2caps(pushop.remote)
459 458 version = None
460 459 cgversions = b2caps.get('b2x:changegroup')
461 460 if not cgversions: # 3.1 and 3.2 ship with an empty value
462 461 cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
463 462 pushop.outgoing)
464 463 else:
465 464 cgversions = [v for v in cgversions if v in changegroup.packermap]
466 465 if not cgversions:
467 466 raise ValueError(_('no common changegroup version'))
468 467 version = max(cgversions)
469 468 cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
470 469 pushop.outgoing,
471 470 version=version)
472 471 cgpart = bundler.newpart('b2x:changegroup', data=cg)
473 472 if version is not None:
474 473 cgpart.addparam('version', version)
475 474 def handlereply(op):
476 475 """extract addchangegroup returns from server reply"""
477 476 cgreplies = op.records.getreplies(cgpart.id)
478 477 assert len(cgreplies['changegroup']) == 1
479 478 pushop.cgresult = cgreplies['changegroup'][0]['return']
480 479 return handlereply
481 480
482 481 @b2partsgenerator('phase')
483 482 def _pushb2phases(pushop, bundler):
484 483 """handle phase push through bundle2"""
485 484 if 'phases' in pushop.stepsdone:
486 485 return
487 486 b2caps = bundle2.bundle2caps(pushop.remote)
488 487 if not 'b2x:pushkey' in b2caps:
489 488 return
490 489 pushop.stepsdone.add('phases')
491 490 part2node = []
492 491 enc = pushkey.encode
493 492 for newremotehead in pushop.outdatedphases:
494 493 part = bundler.newpart('b2x:pushkey')
495 494 part.addparam('namespace', enc('phases'))
496 495 part.addparam('key', enc(newremotehead.hex()))
497 496 part.addparam('old', enc(str(phases.draft)))
498 497 part.addparam('new', enc(str(phases.public)))
499 498 part2node.append((part.id, newremotehead))
500 499 def handlereply(op):
501 500 for partid, node in part2node:
502 501 partrep = op.records.getreplies(partid)
503 502 results = partrep['pushkey']
504 503 assert len(results) <= 1
505 504 msg = None
506 505 if not results:
507 506 msg = _('server ignored update of %s to public!\n') % node
508 507 elif not int(results[0]['return']):
509 508 msg = _('updating %s to public failed!\n') % node
510 509 if msg is not None:
511 510 pushop.ui.warn(msg)
512 511 return handlereply
513 512
514 513 @b2partsgenerator('obsmarkers')
515 514 def _pushb2obsmarkers(pushop, bundler):
516 515 if 'obsmarkers' in pushop.stepsdone:
517 516 return
518 517 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
519 518 if obsolete.commonversion(remoteversions) is None:
520 519 return
521 520 pushop.stepsdone.add('obsmarkers')
522 521 if pushop.outobsmarkers:
523 522 buildobsmarkerspart(bundler, pushop.outobsmarkers)
524 523
525 524 @b2partsgenerator('bookmarks')
526 525 def _pushb2bookmarks(pushop, bundler):
527 526 """handle phase push through bundle2"""
528 527 if 'bookmarks' in pushop.stepsdone:
529 528 return
530 529 b2caps = bundle2.bundle2caps(pushop.remote)
531 530 if 'b2x:pushkey' not in b2caps:
532 531 return
533 532 pushop.stepsdone.add('bookmarks')
534 533 part2book = []
535 534 enc = pushkey.encode
536 535 for book, old, new in pushop.outbookmarks:
537 536 part = bundler.newpart('b2x:pushkey')
538 537 part.addparam('namespace', enc('bookmarks'))
539 538 part.addparam('key', enc(book))
540 539 part.addparam('old', enc(old))
541 540 part.addparam('new', enc(new))
542 541 action = 'update'
543 542 if not old:
544 543 action = 'export'
545 544 elif not new:
546 545 action = 'delete'
547 546 part2book.append((part.id, book, action))
548 547
549 548
550 549 def handlereply(op):
551 550 ui = pushop.ui
552 551 for partid, book, action in part2book:
553 552 partrep = op.records.getreplies(partid)
554 553 results = partrep['pushkey']
555 554 assert len(results) <= 1
556 555 if not results:
557 556 pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
558 557 else:
559 558 ret = int(results[0]['return'])
560 559 if ret:
561 560 ui.status(bookmsgmap[action][0] % book)
562 561 else:
563 562 ui.warn(bookmsgmap[action][1] % book)
564 563 if pushop.bkresult is not None:
565 564 pushop.bkresult = 1
566 565 return handlereply
567 566
568 567
569 568 def _pushbundle2(pushop):
570 569 """push data to the remote using bundle2
571 570
572 571 The only currently supported type of data is changegroup but this will
573 572 evolve in the future."""
574 573 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
575 574 pushback = (pushop.trmanager
576 575 and pushop.ui.configbool('experimental', 'bundle2.pushback'))
577 576
578 577 # create reply capability
579 578 capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo,
580 579 allowpushback=pushback))
581 580 bundler.newpart('b2x:replycaps', data=capsblob)
582 581 replyhandlers = []
583 582 for partgenname in b2partsgenorder:
584 583 partgen = b2partsgenmapping[partgenname]
585 584 ret = partgen(pushop, bundler)
586 585 if callable(ret):
587 586 replyhandlers.append(ret)
588 587 # do not push if nothing to push
589 588 if bundler.nbparts <= 1:
590 589 return
591 590 stream = util.chunkbuffer(bundler.getchunks())
592 591 try:
593 592 reply = pushop.remote.unbundle(stream, ['force'], 'push')
594 593 except error.BundleValueError, exc:
595 594 raise util.Abort('missing support for %s' % exc)
596 595 try:
597 596 trgetter = None
598 597 if pushback:
599 598 trgetter = pushop.trmanager.transaction
600 599 op = bundle2.processbundle(pushop.repo, reply, trgetter)
601 600 except error.BundleValueError, exc:
602 601 raise util.Abort('missing support for %s' % exc)
603 602 for rephand in replyhandlers:
604 603 rephand(op)
605 604
606 605 def _pushchangeset(pushop):
607 606 """Make the actual push of changeset bundle to remote repo"""
608 607 if 'changesets' in pushop.stepsdone:
609 608 return
610 609 pushop.stepsdone.add('changesets')
611 610 if not _pushcheckoutgoing(pushop):
612 611 return
613 612 pushop.repo.prepushoutgoinghooks(pushop.repo,
614 613 pushop.remote,
615 614 pushop.outgoing)
616 615 outgoing = pushop.outgoing
617 616 unbundle = pushop.remote.capable('unbundle')
618 617 # TODO: get bundlecaps from remote
619 618 bundlecaps = None
620 619 # create a changegroup from local
621 620 if pushop.revs is None and not (outgoing.excluded
622 621 or pushop.repo.changelog.filteredrevs):
623 622 # push everything,
624 623 # use the fast path, no race possible on push
625 624 bundler = changegroup.cg1packer(pushop.repo, bundlecaps)
626 625 cg = changegroup.getsubset(pushop.repo,
627 626 outgoing,
628 627 bundler,
629 628 'push',
630 629 fastpath=True)
631 630 else:
632 631 cg = changegroup.getlocalchangegroup(pushop.repo, 'push', outgoing,
633 632 bundlecaps)
634 633
635 634 # apply changegroup to remote
636 635 if unbundle:
637 636 # local repo finds heads on server, finds out what
638 637 # revs it must push. once revs transferred, if server
639 638 # finds it has different heads (someone else won
640 639 # commit/push race), server aborts.
641 640 if pushop.force:
642 641 remoteheads = ['force']
643 642 else:
644 643 remoteheads = pushop.remoteheads
645 644 # ssh: return remote's addchangegroup()
646 645 # http: return remote's addchangegroup() or 0 for error
647 646 pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
648 647 pushop.repo.url())
649 648 else:
650 649 # we return an integer indicating remote head count
651 650 # change
652 651 pushop.cgresult = pushop.remote.addchangegroup(cg, 'push',
653 652 pushop.repo.url())
654 653
655 654 def _pushsyncphase(pushop):
656 655 """synchronise phase information locally and remotely"""
657 656 cheads = pushop.commonheads
658 657 # even when we don't push, exchanging phase data is useful
659 658 remotephases = pushop.remote.listkeys('phases')
660 659 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
661 660 and remotephases # server supports phases
662 661 and pushop.cgresult is None # nothing was pushed
663 662 and remotephases.get('publishing', False)):
664 663 # When:
665 664 # - this is a subrepo push
666 665 # - and remote support phase
667 666 # - and no changeset was pushed
668 667 # - and remote is publishing
669 668 # We may be in issue 3871 case!
670 669 # We drop the possible phase synchronisation done by
671 670 # courtesy to publish changesets possibly locally draft
672 671 # on the remote.
673 672 remotephases = {'publishing': 'True'}
674 673 if not remotephases: # old server or public only reply from non-publishing
675 674 _localphasemove(pushop, cheads)
676 675 # don't push any phase data as there is nothing to push
677 676 else:
678 677 ana = phases.analyzeremotephases(pushop.repo, cheads,
679 678 remotephases)
680 679 pheads, droots = ana
681 680 ### Apply remote phase on local
682 681 if remotephases.get('publishing', False):
683 682 _localphasemove(pushop, cheads)
684 683 else: # publish = False
685 684 _localphasemove(pushop, pheads)
686 685 _localphasemove(pushop, cheads, phases.draft)
687 686 ### Apply local phase on remote
688 687
689 688 if pushop.cgresult:
690 689 if 'phases' in pushop.stepsdone:
691 690 # phases already pushed though bundle2
692 691 return
693 692 outdated = pushop.outdatedphases
694 693 else:
695 694 outdated = pushop.fallbackoutdatedphases
696 695
697 696 pushop.stepsdone.add('phases')
698 697
699 698 # filter heads already turned public by the push
700 699 outdated = [c for c in outdated if c.node() not in pheads]
701 700 # fallback to independent pushkey command
702 701 for newremotehead in outdated:
703 702 r = pushop.remote.pushkey('phases',
704 703 newremotehead.hex(),
705 704 str(phases.draft),
706 705 str(phases.public))
707 706 if not r:
708 707 pushop.ui.warn(_('updating %s to public failed!\n')
709 708 % newremotehead)
710 709
711 710 def _localphasemove(pushop, nodes, phase=phases.public):
712 711 """move <nodes> to <phase> in the local source repo"""
713 712 if pushop.trmanager:
714 713 phases.advanceboundary(pushop.repo,
715 714 pushop.trmanager.transaction(),
716 715 phase,
717 716 nodes)
718 717 else:
719 718 # repo is not locked, do not change any phases!
720 719 # Informs the user that phases should have been moved when
721 720 # applicable.
722 721 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
723 722 phasestr = phases.phasenames[phase]
724 723 if actualmoves:
725 724 pushop.ui.status(_('cannot lock source repo, skipping '
726 725 'local %s phase update\n') % phasestr)
727 726
728 727 def _pushobsolete(pushop):
729 728 """utility function to push obsolete markers to a remote"""
730 729 if 'obsmarkers' in pushop.stepsdone:
731 730 return
732 731 pushop.ui.debug('try to push obsolete markers to remote\n')
733 732 repo = pushop.repo
734 733 remote = pushop.remote
735 734 pushop.stepsdone.add('obsmarkers')
736 735 if pushop.outobsmarkers:
737 736 rslts = []
738 737 remotedata = obsolete._pushkeyescape(pushop.outobsmarkers)
739 738 for key in sorted(remotedata, reverse=True):
740 739 # reverse sort to ensure we end with dump0
741 740 data = remotedata[key]
742 741 rslts.append(remote.pushkey('obsolete', key, '', data))
743 742 if [r for r in rslts if not r]:
744 743 msg = _('failed to push some obsolete markers!\n')
745 744 repo.ui.warn(msg)
746 745
747 746 def _pushbookmark(pushop):
748 747 """Update bookmark position on remote"""
749 748 if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
750 749 return
751 750 pushop.stepsdone.add('bookmarks')
752 751 ui = pushop.ui
753 752 remote = pushop.remote
754 753
755 754 for b, old, new in pushop.outbookmarks:
756 755 action = 'update'
757 756 if not old:
758 757 action = 'export'
759 758 elif not new:
760 759 action = 'delete'
761 760 if remote.pushkey('bookmarks', b, old, new):
762 761 ui.status(bookmsgmap[action][0] % b)
763 762 else:
764 763 ui.warn(bookmsgmap[action][1] % b)
765 764 # discovery can have set the value form invalid entry
766 765 if pushop.bkresult is not None:
767 766 pushop.bkresult = 1
768 767
769 768 class pulloperation(object):
770 769 """A object that represent a single pull operation
771 770
772 771 It purpose is to carry pull related state and very common operation.
773 772
774 773 A new should be created at the beginning of each pull and discarded
775 774 afterward.
776 775 """
777 776
778 777 def __init__(self, repo, remote, heads=None, force=False, bookmarks=()):
779 778 # repo we pull into
780 779 self.repo = repo
781 780 # repo we pull from
782 781 self.remote = remote
783 782 # revision we try to pull (None is "all")
784 783 self.heads = heads
785 784 # bookmark pulled explicitly
786 785 self.explicitbookmarks = bookmarks
787 786 # do we force pull?
788 787 self.force = force
789 788 # transaction manager
790 789 self.trmanager = None
791 790 # set of common changeset between local and remote before pull
792 791 self.common = None
793 792 # set of pulled head
794 793 self.rheads = None
795 794 # list of missing changeset to fetch remotely
796 795 self.fetch = None
797 796 # remote bookmarks data
798 797 self.remotebookmarks = None
799 798 # result of changegroup pulling (used as return code by pull)
800 799 self.cgresult = None
801 800 # list of step already done
802 801 self.stepsdone = set()
803 802
804 803 @util.propertycache
805 804 def pulledsubset(self):
806 805 """heads of the set of changeset target by the pull"""
807 806 # compute target subset
808 807 if self.heads is None:
809 808 # We pulled every thing possible
810 809 # sync on everything common
811 810 c = set(self.common)
812 811 ret = list(self.common)
813 812 for n in self.rheads:
814 813 if n not in c:
815 814 ret.append(n)
816 815 return ret
817 816 else:
818 817 # We pulled a specific subset
819 818 # sync on this subset
820 819 return self.heads
821 820
822 821 def gettransaction(self):
823 822 # deprecated; talk to trmanager directly
824 823 return self.trmanager.transaction()
825 824
826 825 class transactionmanager(object):
827 826 """An object to manage the life cycle of a transaction
828 827
829 828 It creates the transaction on demand and calls the appropriate hooks when
830 829 closing the transaction."""
831 830 def __init__(self, repo, source, url):
832 831 self.repo = repo
833 832 self.source = source
834 833 self.url = url
835 834 self._tr = None
836 835
837 836 def transaction(self):
838 837 """Return an open transaction object, constructing if necessary"""
839 838 if not self._tr:
840 839 trname = '%s\n%s' % (self.source, util.hidepassword(self.url))
841 840 self._tr = self.repo.transaction(trname)
842 841 self._tr.hookargs['source'] = self.source
843 842 self._tr.hookargs['url'] = self.url
844 843 return self._tr
845 844
846 845 def close(self):
847 846 """close transaction if created"""
848 847 if self._tr is not None:
849 848 repo = self.repo
850 849 p = lambda: self._tr.writepending() and repo.root or ""
851 850 repo.hook('b2x-pretransactionclose', throw=True, pending=p,
852 851 **self._tr.hookargs)
853 852 hookargs = dict(self._tr.hookargs)
854 853 def runhooks():
855 854 repo.hook('b2x-transactionclose', **hookargs)
856 855 self._tr.addpostclose('b2x-hook-transactionclose',
857 856 lambda tr: repo._afterlock(runhooks))
858 857 self._tr.close()
859 858
860 859 def release(self):
861 860 """release transaction if created"""
862 861 if self._tr is not None:
863 862 self._tr.release()
864 863
865 864 def pull(repo, remote, heads=None, force=False, bookmarks=()):
866 865 pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks)
867 866 if pullop.remote.local():
868 867 missing = set(pullop.remote.requirements) - pullop.repo.supported
869 868 if missing:
870 869 msg = _("required features are not"
871 870 " supported in the destination:"
872 871 " %s") % (', '.join(sorted(missing)))
873 872 raise util.Abort(msg)
874 873
875 874 pullop.remotebookmarks = remote.listkeys('bookmarks')
876 875 lock = pullop.repo.lock()
877 876 try:
878 877 pullop.trmanager = transactionmanager(repo, 'pull', remote.url())
879 878 _pulldiscovery(pullop)
880 879 if (pullop.repo.ui.configbool('experimental', 'bundle2-exp', False)
881 880 and pullop.remote.capable('bundle2-exp')):
882 881 _pullbundle2(pullop)
883 882 _pullchangeset(pullop)
884 883 _pullphase(pullop)
885 884 _pullbookmarks(pullop)
886 885 _pullobsolete(pullop)
887 886 pullop.trmanager.close()
888 887 finally:
889 888 pullop.trmanager.release()
890 889 lock.release()
891 890
892 891 return pullop
893 892
894 893 # list of steps to perform discovery before pull
895 894 pulldiscoveryorder = []
896 895
897 896 # Mapping between step name and function
898 897 #
899 898 # This exists to help extensions wrap steps if necessary
900 899 pulldiscoverymapping = {}
901 900
902 901 def pulldiscovery(stepname):
903 902 """decorator for function performing discovery before pull
904 903
905 904 The function is added to the step -> function mapping and appended to the
906 905 list of steps. Beware that decorated function will be added in order (this
907 906 may matter).
908 907
909 908 You can only use this decorator for a new step, if you want to wrap a step
910 909 from an extension, change the pulldiscovery dictionary directly."""
911 910 def dec(func):
912 911 assert stepname not in pulldiscoverymapping
913 912 pulldiscoverymapping[stepname] = func
914 913 pulldiscoveryorder.append(stepname)
915 914 return func
916 915 return dec
917 916
918 917 def _pulldiscovery(pullop):
919 918 """Run all discovery steps"""
920 919 for stepname in pulldiscoveryorder:
921 920 step = pulldiscoverymapping[stepname]
922 921 step(pullop)
923 922
924 923 @pulldiscovery('changegroup')
925 924 def _pulldiscoverychangegroup(pullop):
926 925 """discovery phase for the pull
927 926
928 927 Current handle changeset discovery only, will change handle all discovery
929 928 at some point."""
930 tmp = discovery.findcommonincoming(pullop.repo.unfiltered(),
929 tmp = discovery.findcommonincoming(pullop.repo,
931 930 pullop.remote,
932 931 heads=pullop.heads,
933 932 force=pullop.force)
934 pullop.common, pullop.fetch, pullop.rheads = tmp
933 common, fetch, rheads = tmp
934 nm = pullop.repo.unfiltered().changelog.nodemap
935 if fetch and rheads:
936 # If a remote heads in filtered locally, lets drop it from the unknown
937 # remote heads and put in back in common.
938 #
939 # This is a hackish solution to catch most of "common but locally
940 # hidden situation". We do not performs discovery on unfiltered
941 # repository because it end up doing a pathological amount of round
942 # trip for w huge amount of changeset we do not care about.
943 #
944 # If a set of such "common but filtered" changeset exist on the server
945 # but are not including a remote heads, we'll not be able to detect it,
946 scommon = set(common)
947 filteredrheads = []
948 for n in rheads:
949 if n in nm and n not in scommon:
950 common.append(n)
951 else:
952 filteredrheads.append(n)
953 if not filteredrheads:
954 fetch = []
955 rheads = filteredrheads
956 pullop.common = common
957 pullop.fetch = fetch
958 pullop.rheads = rheads
935 959
936 960 def _pullbundle2(pullop):
937 961 """pull data using bundle2
938 962
939 963 For now, the only supported data are changegroup."""
940 964 remotecaps = bundle2.bundle2caps(pullop.remote)
941 965 kwargs = {'bundlecaps': caps20to10(pullop.repo)}
942 966 # pulling changegroup
943 967 pullop.stepsdone.add('changegroup')
944 968
945 969 kwargs['common'] = pullop.common
946 970 kwargs['heads'] = pullop.heads or pullop.rheads
947 971 kwargs['cg'] = pullop.fetch
948 972 if 'b2x:listkeys' in remotecaps:
949 973 kwargs['listkeys'] = ['phase', 'bookmarks']
950 974 if not pullop.fetch:
951 975 pullop.repo.ui.status(_("no changes found\n"))
952 976 pullop.cgresult = 0
953 977 else:
954 978 if pullop.heads is None and list(pullop.common) == [nullid]:
955 979 pullop.repo.ui.status(_("requesting all changes\n"))
956 980 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
957 981 remoteversions = bundle2.obsmarkersversion(remotecaps)
958 982 if obsolete.commonversion(remoteversions) is not None:
959 983 kwargs['obsmarkers'] = True
960 984 pullop.stepsdone.add('obsmarkers')
961 985 _pullbundle2extraprepare(pullop, kwargs)
962 986 if kwargs.keys() == ['format']:
963 987 return # nothing to pull
964 988 bundle = pullop.remote.getbundle('pull', **kwargs)
965 989 try:
966 990 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
967 991 except error.BundleValueError, exc:
968 992 raise util.Abort('missing support for %s' % exc)
969 993
970 994 if pullop.fetch:
971 995 changedheads = 0
972 996 pullop.cgresult = 1
973 997 for cg in op.records['changegroup']:
974 998 ret = cg['return']
975 999 # If any changegroup result is 0, return 0
976 1000 if ret == 0:
977 1001 pullop.cgresult = 0
978 1002 break
979 1003 if ret < -1:
980 1004 changedheads += ret + 1
981 1005 elif ret > 1:
982 1006 changedheads += ret - 1
983 1007 if changedheads > 0:
984 1008 pullop.cgresult = 1 + changedheads
985 1009 elif changedheads < 0:
986 1010 pullop.cgresult = -1 + changedheads
987 1011
988 1012 # processing phases change
989 1013 for namespace, value in op.records['listkeys']:
990 1014 if namespace == 'phases':
991 1015 _pullapplyphases(pullop, value)
992 1016
993 1017 # processing bookmark update
994 1018 for namespace, value in op.records['listkeys']:
995 1019 if namespace == 'bookmarks':
996 1020 pullop.remotebookmarks = value
997 1021 _pullbookmarks(pullop)
998 1022
999 1023 def _pullbundle2extraprepare(pullop, kwargs):
1000 1024 """hook function so that extensions can extend the getbundle call"""
1001 1025 pass
1002 1026
1003 1027 def _pullchangeset(pullop):
1004 1028 """pull changeset from unbundle into the local repo"""
1005 1029 # We delay the open of the transaction as late as possible so we
1006 1030 # don't open transaction for nothing or you break future useful
1007 1031 # rollback call
1008 1032 if 'changegroup' in pullop.stepsdone:
1009 1033 return
1010 1034 pullop.stepsdone.add('changegroup')
1011 1035 if not pullop.fetch:
1012 1036 pullop.repo.ui.status(_("no changes found\n"))
1013 1037 pullop.cgresult = 0
1014 1038 return
1015 1039 pullop.gettransaction()
1016 1040 if pullop.heads is None and list(pullop.common) == [nullid]:
1017 1041 pullop.repo.ui.status(_("requesting all changes\n"))
1018 1042 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
1019 1043 # issue1320, avoid a race if remote changed after discovery
1020 1044 pullop.heads = pullop.rheads
1021 1045
1022 1046 if pullop.remote.capable('getbundle'):
1023 1047 # TODO: get bundlecaps from remote
1024 1048 cg = pullop.remote.getbundle('pull', common=pullop.common,
1025 1049 heads=pullop.heads or pullop.rheads)
1026 1050 elif pullop.heads is None:
1027 1051 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
1028 1052 elif not pullop.remote.capable('changegroupsubset'):
1029 1053 raise util.Abort(_("partial pull cannot be done because "
1030 1054 "other repository doesn't support "
1031 1055 "changegroupsubset."))
1032 1056 else:
1033 1057 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
1034 1058 pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
1035 1059 pullop.remote.url())
1036 1060
1037 1061 def _pullphase(pullop):
1038 1062 # Get remote phases data from remote
1039 1063 if 'phases' in pullop.stepsdone:
1040 1064 return
1041 1065 remotephases = pullop.remote.listkeys('phases')
1042 1066 _pullapplyphases(pullop, remotephases)
1043 1067
1044 1068 def _pullapplyphases(pullop, remotephases):
1045 1069 """apply phase movement from observed remote state"""
1046 1070 if 'phases' in pullop.stepsdone:
1047 1071 return
1048 1072 pullop.stepsdone.add('phases')
1049 1073 publishing = bool(remotephases.get('publishing', False))
1050 1074 if remotephases and not publishing:
1051 1075 # remote is new and unpublishing
1052 1076 pheads, _dr = phases.analyzeremotephases(pullop.repo,
1053 1077 pullop.pulledsubset,
1054 1078 remotephases)
1055 1079 dheads = pullop.pulledsubset
1056 1080 else:
1057 1081 # Remote is old or publishing all common changesets
1058 1082 # should be seen as public
1059 1083 pheads = pullop.pulledsubset
1060 1084 dheads = []
1061 1085 unfi = pullop.repo.unfiltered()
1062 1086 phase = unfi._phasecache.phase
1063 1087 rev = unfi.changelog.nodemap.get
1064 1088 public = phases.public
1065 1089 draft = phases.draft
1066 1090
1067 1091 # exclude changesets already public locally and update the others
1068 1092 pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
1069 1093 if pheads:
1070 1094 tr = pullop.gettransaction()
1071 1095 phases.advanceboundary(pullop.repo, tr, public, pheads)
1072 1096
1073 1097 # exclude changesets already draft locally and update the others
1074 1098 dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
1075 1099 if dheads:
1076 1100 tr = pullop.gettransaction()
1077 1101 phases.advanceboundary(pullop.repo, tr, draft, dheads)
1078 1102
1079 1103 def _pullbookmarks(pullop):
1080 1104 """process the remote bookmark information to update the local one"""
1081 1105 if 'bookmarks' in pullop.stepsdone:
1082 1106 return
1083 1107 pullop.stepsdone.add('bookmarks')
1084 1108 repo = pullop.repo
1085 1109 remotebookmarks = pullop.remotebookmarks
1086 1110 bookmod.updatefromremote(repo.ui, repo, remotebookmarks,
1087 1111 pullop.remote.url(),
1088 1112 pullop.gettransaction,
1089 1113 explicit=pullop.explicitbookmarks)
1090 1114
1091 1115 def _pullobsolete(pullop):
1092 1116 """utility function to pull obsolete markers from a remote
1093 1117
1094 1118 The `gettransaction` is function that return the pull transaction, creating
1095 1119 one if necessary. We return the transaction to inform the calling code that
1096 1120 a new transaction have been created (when applicable).
1097 1121
1098 1122 Exists mostly to allow overriding for experimentation purpose"""
1099 1123 if 'obsmarkers' in pullop.stepsdone:
1100 1124 return
1101 1125 pullop.stepsdone.add('obsmarkers')
1102 1126 tr = None
1103 1127 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1104 1128 pullop.repo.ui.debug('fetching remote obsolete markers\n')
1105 1129 remoteobs = pullop.remote.listkeys('obsolete')
1106 1130 if 'dump0' in remoteobs:
1107 1131 tr = pullop.gettransaction()
1108 1132 for key in sorted(remoteobs, reverse=True):
1109 1133 if key.startswith('dump'):
1110 1134 data = base85.b85decode(remoteobs[key])
1111 1135 pullop.repo.obsstore.mergemarkers(tr, data)
1112 1136 pullop.repo.invalidatevolatilesets()
1113 1137 return tr
1114 1138
1115 1139 def caps20to10(repo):
1116 1140 """return a set with appropriate options to use bundle20 during getbundle"""
1117 1141 caps = set(['HG2Y'])
1118 1142 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
1119 1143 caps.add('bundle2=' + urllib.quote(capsblob))
1120 1144 return caps
1121 1145
1122 1146 # List of names of steps to perform for a bundle2 for getbundle, order matters.
1123 1147 getbundle2partsorder = []
1124 1148
1125 1149 # Mapping between step name and function
1126 1150 #
1127 1151 # This exists to help extensions wrap steps if necessary
1128 1152 getbundle2partsmapping = {}
1129 1153
1130 1154 def getbundle2partsgenerator(stepname):
1131 1155 """decorator for function generating bundle2 part for getbundle
1132 1156
1133 1157 The function is added to the step -> function mapping and appended to the
1134 1158 list of steps. Beware that decorated functions will be added in order
1135 1159 (this may matter).
1136 1160
1137 1161 You can only use this decorator for new steps, if you want to wrap a step
1138 1162 from an extension, attack the getbundle2partsmapping dictionary directly."""
1139 1163 def dec(func):
1140 1164 assert stepname not in getbundle2partsmapping
1141 1165 getbundle2partsmapping[stepname] = func
1142 1166 getbundle2partsorder.append(stepname)
1143 1167 return func
1144 1168 return dec
1145 1169
1146 1170 def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
1147 1171 **kwargs):
1148 1172 """return a full bundle (with potentially multiple kind of parts)
1149 1173
1150 1174 Could be a bundle HG10 or a bundle HG2Y depending on bundlecaps
1151 1175 passed. For now, the bundle can contain only changegroup, but this will
1152 1176 changes when more part type will be available for bundle2.
1153 1177
1154 1178 This is different from changegroup.getchangegroup that only returns an HG10
1155 1179 changegroup bundle. They may eventually get reunited in the future when we
1156 1180 have a clearer idea of the API we what to query different data.
1157 1181
1158 1182 The implementation is at a very early stage and will get massive rework
1159 1183 when the API of bundle is refined.
1160 1184 """
1161 1185 # bundle10 case
1162 1186 if bundlecaps is None or 'HG2Y' not in bundlecaps:
1163 1187 if bundlecaps and not kwargs.get('cg', True):
1164 1188 raise ValueError(_('request for bundle10 must include changegroup'))
1165 1189
1166 1190 if kwargs:
1167 1191 raise ValueError(_('unsupported getbundle arguments: %s')
1168 1192 % ', '.join(sorted(kwargs.keys())))
1169 1193 return changegroup.getchangegroup(repo, source, heads=heads,
1170 1194 common=common, bundlecaps=bundlecaps)
1171 1195
1172 1196 # bundle20 case
1173 1197 b2caps = {}
1174 1198 for bcaps in bundlecaps:
1175 1199 if bcaps.startswith('bundle2='):
1176 1200 blob = urllib.unquote(bcaps[len('bundle2='):])
1177 1201 b2caps.update(bundle2.decodecaps(blob))
1178 1202 bundler = bundle2.bundle20(repo.ui, b2caps)
1179 1203
1180 1204 kwargs['heads'] = heads
1181 1205 kwargs['common'] = common
1182 1206
1183 1207 for name in getbundle2partsorder:
1184 1208 func = getbundle2partsmapping[name]
1185 1209 func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
1186 1210 **kwargs)
1187 1211
1188 1212 return util.chunkbuffer(bundler.getchunks())
1189 1213
1190 1214 @getbundle2partsgenerator('changegroup')
1191 1215 def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
1192 1216 b2caps=None, heads=None, common=None, **kwargs):
1193 1217 """add a changegroup part to the requested bundle"""
1194 1218 cg = None
1195 1219 if kwargs.get('cg', True):
1196 1220 # build changegroup bundle here.
1197 1221 version = None
1198 1222 cgversions = b2caps.get('b2x:changegroup')
1199 1223 if not cgversions: # 3.1 and 3.2 ship with an empty value
1200 1224 cg = changegroup.getchangegroupraw(repo, source, heads=heads,
1201 1225 common=common,
1202 1226 bundlecaps=bundlecaps)
1203 1227 else:
1204 1228 cgversions = [v for v in cgversions if v in changegroup.packermap]
1205 1229 if not cgversions:
1206 1230 raise ValueError(_('no common changegroup version'))
1207 1231 version = max(cgversions)
1208 1232 cg = changegroup.getchangegroupraw(repo, source, heads=heads,
1209 1233 common=common,
1210 1234 bundlecaps=bundlecaps,
1211 1235 version=version)
1212 1236
1213 1237 if cg:
1214 1238 part = bundler.newpart('b2x:changegroup', data=cg)
1215 1239 if version is not None:
1216 1240 part.addparam('version', version)
1217 1241
1218 1242 @getbundle2partsgenerator('listkeys')
1219 1243 def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
1220 1244 b2caps=None, **kwargs):
1221 1245 """add parts containing listkeys namespaces to the requested bundle"""
1222 1246 listkeys = kwargs.get('listkeys', ())
1223 1247 for namespace in listkeys:
1224 1248 part = bundler.newpart('b2x:listkeys')
1225 1249 part.addparam('namespace', namespace)
1226 1250 keys = repo.listkeys(namespace).items()
1227 1251 part.data = pushkey.encodekeys(keys)
1228 1252
1229 1253 @getbundle2partsgenerator('obsmarkers')
1230 1254 def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
1231 1255 b2caps=None, heads=None, **kwargs):
1232 1256 """add an obsolescence markers part to the requested bundle"""
1233 1257 if kwargs.get('obsmarkers', False):
1234 1258 if heads is None:
1235 1259 heads = repo.heads()
1236 1260 subset = [c.node() for c in repo.set('::%ln', heads)]
1237 1261 markers = repo.obsstore.relevantmarkers(subset)
1238 1262 buildobsmarkerspart(bundler, markers)
1239 1263
1240 1264 def check_heads(repo, their_heads, context):
1241 1265 """check if the heads of a repo have been modified
1242 1266
1243 1267 Used by peer for unbundling.
1244 1268 """
1245 1269 heads = repo.heads()
1246 1270 heads_hash = util.sha1(''.join(sorted(heads))).digest()
1247 1271 if not (their_heads == ['force'] or their_heads == heads or
1248 1272 their_heads == ['hashed', heads_hash]):
1249 1273 # someone else committed/pushed/unbundled while we
1250 1274 # were transferring data
1251 1275 raise error.PushRaced('repository changed while %s - '
1252 1276 'please try again' % context)
1253 1277
1254 1278 def unbundle(repo, cg, heads, source, url):
1255 1279 """Apply a bundle to a repo.
1256 1280
1257 1281 this function makes sure the repo is locked during the application and have
1258 1282 mechanism to check that no push race occurred between the creation of the
1259 1283 bundle and its application.
1260 1284
1261 1285 If the push was raced as PushRaced exception is raised."""
1262 1286 r = 0
1263 1287 # need a transaction when processing a bundle2 stream
1264 1288 tr = None
1265 1289 lock = repo.lock()
1266 1290 try:
1267 1291 check_heads(repo, heads, 'uploading changes')
1268 1292 # push can proceed
1269 1293 if util.safehasattr(cg, 'params'):
1270 1294 try:
1271 1295 tr = repo.transaction('unbundle')
1272 1296 tr.hookargs['source'] = source
1273 1297 tr.hookargs['url'] = url
1274 1298 tr.hookargs['bundle2-exp'] = '1'
1275 1299 r = bundle2.processbundle(repo, cg, lambda: tr).reply
1276 1300 p = lambda: tr.writepending() and repo.root or ""
1277 1301 repo.hook('b2x-pretransactionclose', throw=True, pending=p,
1278 1302 **tr.hookargs)
1279 1303 hookargs = dict(tr.hookargs)
1280 1304 def runhooks():
1281 1305 repo.hook('b2x-transactionclose', **hookargs)
1282 1306 tr.addpostclose('b2x-hook-transactionclose',
1283 1307 lambda tr: repo._afterlock(runhooks))
1284 1308 tr.close()
1285 1309 except Exception, exc:
1286 1310 exc.duringunbundle2 = True
1287 1311 raise
1288 1312 else:
1289 1313 r = changegroup.addchangegroup(repo, cg, source, url)
1290 1314 finally:
1291 1315 if tr is not None:
1292 1316 tr.release()
1293 1317 lock.release()
1294 1318 return r
@@ -1,869 +1,873 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
12 12 import peer, error, encoding, util, store, exchange
13 13
14 14
15 15 class abstractserverproto(object):
16 16 """abstract class that summarizes the protocol API
17 17
18 18 Used as reference and documentation.
19 19 """
20 20
21 21 def getargs(self, args):
22 22 """return the value for arguments in <args>
23 23
24 24 returns a list of values (same order as <args>)"""
25 25 raise NotImplementedError()
26 26
27 27 def getfile(self, fp):
28 28 """write the whole content of a file into a file like object
29 29
30 30 The file is in the form::
31 31
32 32 (<chunk-size>\n<chunk>)+0\n
33 33
34 34 chunk size is the ascii version of the int.
35 35 """
36 36 raise NotImplementedError()
37 37
38 38 def redirect(self):
39 39 """may setup interception for stdout and stderr
40 40
41 41 See also the `restore` method."""
42 42 raise NotImplementedError()
43 43
44 44 # If the `redirect` function does install interception, the `restore`
45 45 # function MUST be defined. If interception is not used, this function
46 46 # MUST NOT be defined.
47 47 #
48 48 # left commented here on purpose
49 49 #
50 50 #def restore(self):
51 51 # """reinstall previous stdout and stderr and return intercepted stdout
52 52 # """
53 53 # raise NotImplementedError()
54 54
55 55 def groupchunks(self, cg):
56 56 """return 4096 chunks from a changegroup object
57 57
58 58 Some protocols may have compressed the contents."""
59 59 raise NotImplementedError()
60 60
61 61 # abstract batching support
62 62
63 63 class future(object):
64 64 '''placeholder for a value to be set later'''
65 65 def set(self, value):
66 66 if util.safehasattr(self, 'value'):
67 67 raise error.RepoError("future is already set")
68 68 self.value = value
69 69
70 70 class batcher(object):
71 71 '''base class for batches of commands submittable in a single request
72 72
73 73 All methods invoked on instances of this class are simply queued and
74 74 return a a future for the result. Once you call submit(), all the queued
75 75 calls are performed and the results set in their respective futures.
76 76 '''
77 77 def __init__(self):
78 78 self.calls = []
79 79 def __getattr__(self, name):
80 80 def call(*args, **opts):
81 81 resref = future()
82 82 self.calls.append((name, args, opts, resref,))
83 83 return resref
84 84 return call
85 85 def submit(self):
86 86 pass
87 87
88 88 class localbatch(batcher):
89 89 '''performs the queued calls directly'''
90 90 def __init__(self, local):
91 91 batcher.__init__(self)
92 92 self.local = local
93 93 def submit(self):
94 94 for name, args, opts, resref in self.calls:
95 95 resref.set(getattr(self.local, name)(*args, **opts))
96 96
97 97 class remotebatch(batcher):
98 98 '''batches the queued calls; uses as few roundtrips as possible'''
99 99 def __init__(self, remote):
100 100 '''remote must support _submitbatch(encbatch) and
101 101 _submitone(op, encargs)'''
102 102 batcher.__init__(self)
103 103 self.remote = remote
104 104 def submit(self):
105 105 req, rsp = [], []
106 106 for name, args, opts, resref in self.calls:
107 107 mtd = getattr(self.remote, name)
108 108 batchablefn = getattr(mtd, 'batchable', None)
109 109 if batchablefn is not None:
110 110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 111 encargsorres, encresref = batchable.next()
112 112 if encresref:
113 113 req.append((name, encargsorres,))
114 114 rsp.append((batchable, encresref, resref,))
115 115 else:
116 116 resref.set(encargsorres)
117 117 else:
118 118 if req:
119 119 self._submitreq(req, rsp)
120 120 req, rsp = [], []
121 121 resref.set(mtd(*args, **opts))
122 122 if req:
123 123 self._submitreq(req, rsp)
124 124 def _submitreq(self, req, rsp):
125 125 encresults = self.remote._submitbatch(req)
126 126 for encres, r in zip(encresults, rsp):
127 127 batchable, encresref, resref = r
128 128 encresref.set(encres)
129 129 resref.set(batchable.next())
130 130
131 131 def batchable(f):
132 132 '''annotation for batchable methods
133 133
134 134 Such methods must implement a coroutine as follows:
135 135
136 136 @batchable
137 137 def sample(self, one, two=None):
138 138 # Handle locally computable results first:
139 139 if not one:
140 140 yield "a local result", None
141 141 # Build list of encoded arguments suitable for your wire protocol:
142 142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 143 # Create future for injection of encoded result:
144 144 encresref = future()
145 145 # Return encoded arguments and future:
146 146 yield encargs, encresref
147 147 # Assuming the future to be filled with the result from the batched
148 148 # request now. Decode it:
149 149 yield decode(encresref.value)
150 150
151 151 The decorator returns a function which wraps this coroutine as a plain
152 152 method, but adds the original method as an attribute called "batchable",
153 153 which is used by remotebatch to split the call into separate encoding and
154 154 decoding phases.
155 155 '''
156 156 def plain(*args, **opts):
157 157 batchable = f(*args, **opts)
158 158 encargsorres, encresref = batchable.next()
159 159 if not encresref:
160 160 return encargsorres # a local result in this case
161 161 self = args[0]
162 162 encresref.set(self._submitone(f.func_name, encargsorres))
163 163 return batchable.next()
164 164 setattr(plain, 'batchable', f)
165 165 return plain
166 166
167 167 # list of nodes encoding / decoding
168 168
169 169 def decodelist(l, sep=' '):
170 170 if l:
171 171 return map(bin, l.split(sep))
172 172 return []
173 173
174 174 def encodelist(l, sep=' '):
175 return sep.join(map(hex, l))
175 try:
176 return sep.join(map(hex, l))
177 except TypeError:
178 print l
179 raise
176 180
177 181 # batched call argument encoding
178 182
179 183 def escapearg(plain):
180 184 return (plain
181 185 .replace(':', '::')
182 186 .replace(',', ':,')
183 187 .replace(';', ':;')
184 188 .replace('=', ':='))
185 189
186 190 def unescapearg(escaped):
187 191 return (escaped
188 192 .replace(':=', '=')
189 193 .replace(':;', ';')
190 194 .replace(':,', ',')
191 195 .replace('::', ':'))
192 196
193 197 # mapping of options accepted by getbundle and their types
194 198 #
195 199 # Meant to be extended by extensions. It is extensions responsibility to ensure
196 200 # such options are properly processed in exchange.getbundle.
197 201 #
198 202 # supported types are:
199 203 #
200 204 # :nodes: list of binary nodes
201 205 # :csv: list of comma-separated values
202 206 # :plain: string with no transformation needed.
203 207 gboptsmap = {'heads': 'nodes',
204 208 'common': 'nodes',
205 209 'obsmarkers': 'boolean',
206 210 'bundlecaps': 'csv',
207 211 'listkeys': 'csv',
208 212 'cg': 'boolean'}
209 213
210 214 # client side
211 215
212 216 class wirepeer(peer.peerrepository):
213 217
214 218 def batch(self):
215 219 return remotebatch(self)
216 220 def _submitbatch(self, req):
217 221 cmds = []
218 222 for op, argsdict in req:
219 223 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
220 224 cmds.append('%s %s' % (op, args))
221 225 rsp = self._call("batch", cmds=';'.join(cmds))
222 226 return rsp.split(';')
223 227 def _submitone(self, op, args):
224 228 return self._call(op, **args)
225 229
226 230 @batchable
227 231 def lookup(self, key):
228 232 self.requirecap('lookup', _('look up remote revision'))
229 233 f = future()
230 234 yield {'key': encoding.fromlocal(key)}, f
231 235 d = f.value
232 236 success, data = d[:-1].split(" ", 1)
233 237 if int(success):
234 238 yield bin(data)
235 239 self._abort(error.RepoError(data))
236 240
237 241 @batchable
238 242 def heads(self):
239 243 f = future()
240 244 yield {}, f
241 245 d = f.value
242 246 try:
243 247 yield decodelist(d[:-1])
244 248 except ValueError:
245 249 self._abort(error.ResponseError(_("unexpected response:"), d))
246 250
247 251 @batchable
248 252 def known(self, nodes):
249 253 f = future()
250 254 yield {'nodes': encodelist(nodes)}, f
251 255 d = f.value
252 256 try:
253 257 yield [bool(int(b)) for b in d]
254 258 except ValueError:
255 259 self._abort(error.ResponseError(_("unexpected response:"), d))
256 260
257 261 @batchable
258 262 def branchmap(self):
259 263 f = future()
260 264 yield {}, f
261 265 d = f.value
262 266 try:
263 267 branchmap = {}
264 268 for branchpart in d.splitlines():
265 269 branchname, branchheads = branchpart.split(' ', 1)
266 270 branchname = encoding.tolocal(urllib.unquote(branchname))
267 271 branchheads = decodelist(branchheads)
268 272 branchmap[branchname] = branchheads
269 273 yield branchmap
270 274 except TypeError:
271 275 self._abort(error.ResponseError(_("unexpected response:"), d))
272 276
273 277 def branches(self, nodes):
274 278 n = encodelist(nodes)
275 279 d = self._call("branches", nodes=n)
276 280 try:
277 281 br = [tuple(decodelist(b)) for b in d.splitlines()]
278 282 return br
279 283 except ValueError:
280 284 self._abort(error.ResponseError(_("unexpected response:"), d))
281 285
282 286 def between(self, pairs):
283 287 batch = 8 # avoid giant requests
284 288 r = []
285 289 for i in xrange(0, len(pairs), batch):
286 290 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
287 291 d = self._call("between", pairs=n)
288 292 try:
289 293 r.extend(l and decodelist(l) or [] for l in d.splitlines())
290 294 except ValueError:
291 295 self._abort(error.ResponseError(_("unexpected response:"), d))
292 296 return r
293 297
294 298 @batchable
295 299 def pushkey(self, namespace, key, old, new):
296 300 if not self.capable('pushkey'):
297 301 yield False, None
298 302 f = future()
299 303 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
300 304 yield {'namespace': encoding.fromlocal(namespace),
301 305 'key': encoding.fromlocal(key),
302 306 'old': encoding.fromlocal(old),
303 307 'new': encoding.fromlocal(new)}, f
304 308 d = f.value
305 309 d, output = d.split('\n', 1)
306 310 try:
307 311 d = bool(int(d))
308 312 except ValueError:
309 313 raise error.ResponseError(
310 314 _('push failed (unexpected response):'), d)
311 315 for l in output.splitlines(True):
312 316 self.ui.status(_('remote: '), l)
313 317 yield d
314 318
315 319 @batchable
316 320 def listkeys(self, namespace):
317 321 if not self.capable('pushkey'):
318 322 yield {}, None
319 323 f = future()
320 324 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
321 325 yield {'namespace': encoding.fromlocal(namespace)}, f
322 326 d = f.value
323 327 yield pushkeymod.decodekeys(d)
324 328
325 329 def stream_out(self):
326 330 return self._callstream('stream_out')
327 331
328 332 def changegroup(self, nodes, kind):
329 333 n = encodelist(nodes)
330 334 f = self._callcompressable("changegroup", roots=n)
331 335 return changegroupmod.cg1unpacker(f, 'UN')
332 336
333 337 def changegroupsubset(self, bases, heads, kind):
334 338 self.requirecap('changegroupsubset', _('look up remote changes'))
335 339 bases = encodelist(bases)
336 340 heads = encodelist(heads)
337 341 f = self._callcompressable("changegroupsubset",
338 342 bases=bases, heads=heads)
339 343 return changegroupmod.cg1unpacker(f, 'UN')
340 344
341 345 def getbundle(self, source, **kwargs):
342 346 self.requirecap('getbundle', _('look up remote changes'))
343 347 opts = {}
344 348 for key, value in kwargs.iteritems():
345 349 if value is None:
346 350 continue
347 351 keytype = gboptsmap.get(key)
348 352 if keytype is None:
349 353 assert False, 'unexpected'
350 354 elif keytype == 'nodes':
351 355 value = encodelist(value)
352 356 elif keytype == 'csv':
353 357 value = ','.join(value)
354 358 elif keytype == 'boolean':
355 359 value = '%i' % bool(value)
356 360 elif keytype != 'plain':
357 361 raise KeyError('unknown getbundle option type %s'
358 362 % keytype)
359 363 opts[key] = value
360 364 f = self._callcompressable("getbundle", **opts)
361 365 bundlecaps = kwargs.get('bundlecaps')
362 366 if bundlecaps is not None and 'HG2Y' in bundlecaps:
363 367 return bundle2.unbundle20(self.ui, f)
364 368 else:
365 369 return changegroupmod.cg1unpacker(f, 'UN')
366 370
367 371 def unbundle(self, cg, heads, source):
368 372 '''Send cg (a readable file-like object representing the
369 373 changegroup to push, typically a chunkbuffer object) to the
370 374 remote server as a bundle.
371 375
372 376 When pushing a bundle10 stream, return an integer indicating the
373 377 result of the push (see localrepository.addchangegroup()).
374 378
375 379 When pushing a bundle20 stream, return a bundle20 stream.'''
376 380
377 381 if heads != ['force'] and self.capable('unbundlehash'):
378 382 heads = encodelist(['hashed',
379 383 util.sha1(''.join(sorted(heads))).digest()])
380 384 else:
381 385 heads = encodelist(heads)
382 386
383 387 if util.safehasattr(cg, 'deltaheader'):
384 388 # this a bundle10, do the old style call sequence
385 389 ret, output = self._callpush("unbundle", cg, heads=heads)
386 390 if ret == "":
387 391 raise error.ResponseError(
388 392 _('push failed:'), output)
389 393 try:
390 394 ret = int(ret)
391 395 except ValueError:
392 396 raise error.ResponseError(
393 397 _('push failed (unexpected response):'), ret)
394 398
395 399 for l in output.splitlines(True):
396 400 self.ui.status(_('remote: '), l)
397 401 else:
398 402 # bundle2 push. Send a stream, fetch a stream.
399 403 stream = self._calltwowaystream('unbundle', cg, heads=heads)
400 404 ret = bundle2.unbundle20(self.ui, stream)
401 405 return ret
402 406
403 407 def debugwireargs(self, one, two, three=None, four=None, five=None):
404 408 # don't pass optional arguments left at their default value
405 409 opts = {}
406 410 if three is not None:
407 411 opts['three'] = three
408 412 if four is not None:
409 413 opts['four'] = four
410 414 return self._call('debugwireargs', one=one, two=two, **opts)
411 415
412 416 def _call(self, cmd, **args):
413 417 """execute <cmd> on the server
414 418
415 419 The command is expected to return a simple string.
416 420
417 421 returns the server reply as a string."""
418 422 raise NotImplementedError()
419 423
420 424 def _callstream(self, cmd, **args):
421 425 """execute <cmd> on the server
422 426
423 427 The command is expected to return a stream.
424 428
425 429 returns the server reply as a file like object."""
426 430 raise NotImplementedError()
427 431
428 432 def _callcompressable(self, cmd, **args):
429 433 """execute <cmd> on the server
430 434
431 435 The command is expected to return a stream.
432 436
433 437 The stream may have been compressed in some implementations. This
434 438 function takes care of the decompression. This is the only difference
435 439 with _callstream.
436 440
437 441 returns the server reply as a file like object.
438 442 """
439 443 raise NotImplementedError()
440 444
441 445 def _callpush(self, cmd, fp, **args):
442 446 """execute a <cmd> on server
443 447
444 448 The command is expected to be related to a push. Push has a special
445 449 return method.
446 450
447 451 returns the server reply as a (ret, output) tuple. ret is either
448 452 empty (error) or a stringified int.
449 453 """
450 454 raise NotImplementedError()
451 455
452 456 def _calltwowaystream(self, cmd, fp, **args):
453 457 """execute <cmd> on server
454 458
455 459 The command will send a stream to the server and get a stream in reply.
456 460 """
457 461 raise NotImplementedError()
458 462
459 463 def _abort(self, exception):
460 464 """clearly abort the wire protocol connection and raise the exception
461 465 """
462 466 raise NotImplementedError()
463 467
464 468 # server side
465 469
466 470 # wire protocol command can either return a string or one of these classes.
467 471 class streamres(object):
468 472 """wireproto reply: binary stream
469 473
470 474 The call was successful and the result is a stream.
471 475 Iterate on the `self.gen` attribute to retrieve chunks.
472 476 """
473 477 def __init__(self, gen):
474 478 self.gen = gen
475 479
476 480 class pushres(object):
477 481 """wireproto reply: success with simple integer return
478 482
479 483 The call was successful and returned an integer contained in `self.res`.
480 484 """
481 485 def __init__(self, res):
482 486 self.res = res
483 487
484 488 class pusherr(object):
485 489 """wireproto reply: failure
486 490
487 491 The call failed. The `self.res` attribute contains the error message.
488 492 """
489 493 def __init__(self, res):
490 494 self.res = res
491 495
492 496 class ooberror(object):
493 497 """wireproto reply: failure of a batch of operation
494 498
495 499 Something failed during a batch call. The error message is stored in
496 500 `self.message`.
497 501 """
498 502 def __init__(self, message):
499 503 self.message = message
500 504
501 505 def dispatch(repo, proto, command):
502 506 repo = repo.filtered("served")
503 507 func, spec = commands[command]
504 508 args = proto.getargs(spec)
505 509 return func(repo, proto, *args)
506 510
507 511 def options(cmd, keys, others):
508 512 opts = {}
509 513 for k in keys:
510 514 if k in others:
511 515 opts[k] = others[k]
512 516 del others[k]
513 517 if others:
514 518 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
515 519 % (cmd, ",".join(others)))
516 520 return opts
517 521
518 522 # list of commands
519 523 commands = {}
520 524
521 525 def wireprotocommand(name, args=''):
522 526 """decorator for wire protocol command"""
523 527 def register(func):
524 528 commands[name] = (func, args)
525 529 return func
526 530 return register
527 531
528 532 @wireprotocommand('batch', 'cmds *')
529 533 def batch(repo, proto, cmds, others):
530 534 repo = repo.filtered("served")
531 535 res = []
532 536 for pair in cmds.split(';'):
533 537 op, args = pair.split(' ', 1)
534 538 vals = {}
535 539 for a in args.split(','):
536 540 if a:
537 541 n, v = a.split('=')
538 542 vals[n] = unescapearg(v)
539 543 func, spec = commands[op]
540 544 if spec:
541 545 keys = spec.split()
542 546 data = {}
543 547 for k in keys:
544 548 if k == '*':
545 549 star = {}
546 550 for key in vals.keys():
547 551 if key not in keys:
548 552 star[key] = vals[key]
549 553 data['*'] = star
550 554 else:
551 555 data[k] = vals[k]
552 556 result = func(repo, proto, *[data[k] for k in keys])
553 557 else:
554 558 result = func(repo, proto)
555 559 if isinstance(result, ooberror):
556 560 return result
557 561 res.append(escapearg(result))
558 562 return ';'.join(res)
559 563
560 564 @wireprotocommand('between', 'pairs')
561 565 def between(repo, proto, pairs):
562 566 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
563 567 r = []
564 568 for b in repo.between(pairs):
565 569 r.append(encodelist(b) + "\n")
566 570 return "".join(r)
567 571
568 572 @wireprotocommand('branchmap')
569 573 def branchmap(repo, proto):
570 574 branchmap = repo.branchmap()
571 575 heads = []
572 576 for branch, nodes in branchmap.iteritems():
573 577 branchname = urllib.quote(encoding.fromlocal(branch))
574 578 branchnodes = encodelist(nodes)
575 579 heads.append('%s %s' % (branchname, branchnodes))
576 580 return '\n'.join(heads)
577 581
578 582 @wireprotocommand('branches', 'nodes')
579 583 def branches(repo, proto, nodes):
580 584 nodes = decodelist(nodes)
581 585 r = []
582 586 for b in repo.branches(nodes):
583 587 r.append(encodelist(b) + "\n")
584 588 return "".join(r)
585 589
586 590
587 591 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
588 592 'known', 'getbundle', 'unbundlehash', 'batch']
589 593
590 594 def _capabilities(repo, proto):
591 595 """return a list of capabilities for a repo
592 596
593 597 This function exists to allow extensions to easily wrap capabilities
594 598 computation
595 599
596 600 - returns a lists: easy to alter
597 601 - change done here will be propagated to both `capabilities` and `hello`
598 602 command without any other action needed.
599 603 """
600 604 # copy to prevent modification of the global list
601 605 caps = list(wireprotocaps)
602 606 if _allowstream(repo.ui):
603 607 if repo.ui.configbool('server', 'preferuncompressed', False):
604 608 caps.append('stream-preferred')
605 609 requiredformats = repo.requirements & repo.supportedformats
606 610 # if our local revlogs are just revlogv1, add 'stream' cap
607 611 if not requiredformats - set(('revlogv1',)):
608 612 caps.append('stream')
609 613 # otherwise, add 'streamreqs' detailing our local revlog format
610 614 else:
611 615 caps.append('streamreqs=%s' % ','.join(requiredformats))
612 616 if repo.ui.configbool('experimental', 'bundle2-exp', False):
613 617 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
614 618 caps.append('bundle2-exp=' + urllib.quote(capsblob))
615 619 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
616 620 caps.append('httpheader=1024')
617 621 return caps
618 622
619 623 # If you are writing an extension and consider wrapping this function. Wrap
620 624 # `_capabilities` instead.
621 625 @wireprotocommand('capabilities')
622 626 def capabilities(repo, proto):
623 627 return ' '.join(_capabilities(repo, proto))
624 628
625 629 @wireprotocommand('changegroup', 'roots')
626 630 def changegroup(repo, proto, roots):
627 631 nodes = decodelist(roots)
628 632 cg = changegroupmod.changegroup(repo, nodes, 'serve')
629 633 return streamres(proto.groupchunks(cg))
630 634
631 635 @wireprotocommand('changegroupsubset', 'bases heads')
632 636 def changegroupsubset(repo, proto, bases, heads):
633 637 bases = decodelist(bases)
634 638 heads = decodelist(heads)
635 639 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
636 640 return streamres(proto.groupchunks(cg))
637 641
638 642 @wireprotocommand('debugwireargs', 'one two *')
639 643 def debugwireargs(repo, proto, one, two, others):
640 644 # only accept optional args from the known set
641 645 opts = options('debugwireargs', ['three', 'four'], others)
642 646 return repo.debugwireargs(one, two, **opts)
643 647
644 648 # List of options accepted by getbundle.
645 649 #
646 650 # Meant to be extended by extensions. It is the extension's responsibility to
647 651 # ensure such options are properly processed in exchange.getbundle.
648 652 gboptslist = ['heads', 'common', 'bundlecaps']
649 653
650 654 @wireprotocommand('getbundle', '*')
651 655 def getbundle(repo, proto, others):
652 656 opts = options('getbundle', gboptsmap.keys(), others)
653 657 for k, v in opts.iteritems():
654 658 keytype = gboptsmap[k]
655 659 if keytype == 'nodes':
656 660 opts[k] = decodelist(v)
657 661 elif keytype == 'csv':
658 662 opts[k] = set(v.split(','))
659 663 elif keytype == 'boolean':
660 664 opts[k] = bool(v)
661 665 elif keytype != 'plain':
662 666 raise KeyError('unknown getbundle option type %s'
663 667 % keytype)
664 668 cg = exchange.getbundle(repo, 'serve', **opts)
665 669 return streamres(proto.groupchunks(cg))
666 670
667 671 @wireprotocommand('heads')
668 672 def heads(repo, proto):
669 673 h = repo.heads()
670 674 return encodelist(h) + "\n"
671 675
672 676 @wireprotocommand('hello')
673 677 def hello(repo, proto):
674 678 '''the hello command returns a set of lines describing various
675 679 interesting things about the server, in an RFC822-like format.
676 680 Currently the only one defined is "capabilities", which
677 681 consists of a line in the form:
678 682
679 683 capabilities: space separated list of tokens
680 684 '''
681 685 return "capabilities: %s\n" % (capabilities(repo, proto))
682 686
683 687 @wireprotocommand('listkeys', 'namespace')
684 688 def listkeys(repo, proto, namespace):
685 689 d = repo.listkeys(encoding.tolocal(namespace)).items()
686 690 return pushkeymod.encodekeys(d)
687 691
688 692 @wireprotocommand('lookup', 'key')
689 693 def lookup(repo, proto, key):
690 694 try:
691 695 k = encoding.tolocal(key)
692 696 c = repo[k]
693 697 r = c.hex()
694 698 success = 1
695 699 except Exception, inst:
696 700 r = str(inst)
697 701 success = 0
698 702 return "%s %s\n" % (success, r)
699 703
700 704 @wireprotocommand('known', 'nodes *')
701 705 def known(repo, proto, nodes, others):
702 706 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
703 707
704 708 @wireprotocommand('pushkey', 'namespace key old new')
705 709 def pushkey(repo, proto, namespace, key, old, new):
706 710 # compatibility with pre-1.8 clients which were accidentally
707 711 # sending raw binary nodes rather than utf-8-encoded hex
708 712 if len(new) == 20 and new.encode('string-escape') != new:
709 713 # looks like it could be a binary node
710 714 try:
711 715 new.decode('utf-8')
712 716 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
713 717 except UnicodeDecodeError:
714 718 pass # binary, leave unmodified
715 719 else:
716 720 new = encoding.tolocal(new) # normal path
717 721
718 722 if util.safehasattr(proto, 'restore'):
719 723
720 724 proto.redirect()
721 725
722 726 try:
723 727 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
724 728 encoding.tolocal(old), new) or False
725 729 except util.Abort:
726 730 r = False
727 731
728 732 output = proto.restore()
729 733
730 734 return '%s\n%s' % (int(r), output)
731 735
732 736 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
733 737 encoding.tolocal(old), new)
734 738 return '%s\n' % int(r)
735 739
736 740 def _allowstream(ui):
737 741 return ui.configbool('server', 'uncompressed', True, untrusted=True)
738 742
739 743 def _walkstreamfiles(repo):
740 744 # this is it's own function so extensions can override it
741 745 return repo.store.walk()
742 746
743 747 @wireprotocommand('stream_out')
744 748 def stream(repo, proto):
745 749 '''If the server supports streaming clone, it advertises the "stream"
746 750 capability with a value representing the version and flags of the repo
747 751 it is serving. Client checks to see if it understands the format.
748 752
749 753 The format is simple: the server writes out a line with the amount
750 754 of files, then the total amount of bytes to be transferred (separated
751 755 by a space). Then, for each file, the server first writes the filename
752 756 and file size (separated by the null character), then the file contents.
753 757 '''
754 758
755 759 if not _allowstream(repo.ui):
756 760 return '1\n'
757 761
758 762 entries = []
759 763 total_bytes = 0
760 764 try:
761 765 # get consistent snapshot of repo, lock during scan
762 766 lock = repo.lock()
763 767 try:
764 768 repo.ui.debug('scanning\n')
765 769 for name, ename, size in _walkstreamfiles(repo):
766 770 if size:
767 771 entries.append((name, size))
768 772 total_bytes += size
769 773 finally:
770 774 lock.release()
771 775 except error.LockError:
772 776 return '2\n' # error: 2
773 777
774 778 def streamer(repo, entries, total):
775 779 '''stream out all metadata files in repository.'''
776 780 yield '0\n' # success
777 781 repo.ui.debug('%d files, %d bytes to transfer\n' %
778 782 (len(entries), total_bytes))
779 783 yield '%d %d\n' % (len(entries), total_bytes)
780 784
781 785 sopener = repo.sopener
782 786 oldaudit = sopener.mustaudit
783 787 debugflag = repo.ui.debugflag
784 788 sopener.mustaudit = False
785 789
786 790 try:
787 791 for name, size in entries:
788 792 if debugflag:
789 793 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
790 794 # partially encode name over the wire for backwards compat
791 795 yield '%s\0%d\n' % (store.encodedir(name), size)
792 796 if size <= 65536:
793 797 fp = sopener(name)
794 798 try:
795 799 data = fp.read(size)
796 800 finally:
797 801 fp.close()
798 802 yield data
799 803 else:
800 804 for chunk in util.filechunkiter(sopener(name), limit=size):
801 805 yield chunk
802 806 # replace with "finally:" when support for python 2.4 has been dropped
803 807 except Exception:
804 808 sopener.mustaudit = oldaudit
805 809 raise
806 810 sopener.mustaudit = oldaudit
807 811
808 812 return streamres(streamer(repo, entries, total_bytes))
809 813
810 814 @wireprotocommand('unbundle', 'heads')
811 815 def unbundle(repo, proto, heads):
812 816 their_heads = decodelist(heads)
813 817
814 818 try:
815 819 proto.redirect()
816 820
817 821 exchange.check_heads(repo, their_heads, 'preparing changes')
818 822
819 823 # write bundle data to temporary file because it can be big
820 824 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
821 825 fp = os.fdopen(fd, 'wb+')
822 826 r = 0
823 827 try:
824 828 proto.getfile(fp)
825 829 fp.seek(0)
826 830 gen = exchange.readbundle(repo.ui, fp, None)
827 831 r = exchange.unbundle(repo, gen, their_heads, 'serve',
828 832 proto._client())
829 833 if util.safehasattr(r, 'addpart'):
830 834 # The return looks streamable, we are in the bundle2 case and
831 835 # should return a stream.
832 836 return streamres(r.getchunks())
833 837 return pushres(r)
834 838
835 839 finally:
836 840 fp.close()
837 841 os.unlink(tempname)
838 842 except error.BundleValueError, exc:
839 843 bundler = bundle2.bundle20(repo.ui)
840 844 errpart = bundler.newpart('b2x:error:unsupportedcontent')
841 845 if exc.parttype is not None:
842 846 errpart.addparam('parttype', exc.parttype)
843 847 if exc.params:
844 848 errpart.addparam('params', '\0'.join(exc.params))
845 849 return streamres(bundler.getchunks())
846 850 except util.Abort, inst:
847 851 # The old code we moved used sys.stderr directly.
848 852 # We did not change it to minimise code change.
849 853 # This need to be moved to something proper.
850 854 # Feel free to do it.
851 855 if getattr(inst, 'duringunbundle2', False):
852 856 bundler = bundle2.bundle20(repo.ui)
853 857 manargs = [('message', str(inst))]
854 858 advargs = []
855 859 if inst.hint is not None:
856 860 advargs.append(('hint', inst.hint))
857 861 bundler.addpart(bundle2.bundlepart('b2x:error:abort',
858 862 manargs, advargs))
859 863 return streamres(bundler.getchunks())
860 864 else:
861 865 sys.stderr.write("abort: %s\n" % inst)
862 866 return pushres(0)
863 867 except error.PushRaced, exc:
864 868 if getattr(exc, 'duringunbundle2', False):
865 869 bundler = bundle2.bundle20(repo.ui)
866 870 bundler.newpart('b2x:error:pushraced', [('message', str(exc))])
867 871 return streamres(bundler.getchunks())
868 872 else:
869 873 return pusherr(str(exc))
General Comments 0
You need to be logged in to leave comments. Login now