##// END OF EJS Templates
unbundle: extract the core logic in another function...
Pierre-Yves David -
r20968:33d5fdd9 default
parent child Browse files
Show More
@@ -1,630 +1,656 b''
1 1 # exchange.py - utily to exchange data between repo.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
import cStringIO
import errno
import os
import sys
from i18n import _
from node import hex, nullid
import util, scmutil, changegroup, base85
import discovery, phases, obsolete, bookmarks, bundle2
14 15
15 16
class pushoperation(object):
    """State container for a single push operation.

    Carries the parameters of the push together with the intermediate
    results accumulated while it runs.  Create a fresh instance for every
    push and discard it afterward.
    """

    def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
        # local repository pushed from
        self.repo = repo
        self.ui = repo.ui
        # remote repository pushed to
        self.remote = remote
        # whether --force was given
        self.force = force
        # revisions selected for the push (None means "everything")
        self.revs = revs
        # whether pushing a new branch is allowed
        self.newbranch = newbranch
        # True/False once we tried to take the local lock, None before
        self.locallocked = None
        # integer outcome of the push:
        #   None  - nothing to push
        #   0     - HTTP error
        #   1     - pushed with remote head count unchanged, *or* there
        #           were outgoing changesets but we refused to push
        #   other - whatever addchangegroup() returned
        self.ret = None
        # discovery.outgoing object (common and outgoing changesets)
        self.outgoing = None
        # remote heads as seen before the push started
        self.remoteheads = None
        # truthy when some remote nodes are missing locally
        self.incoming = None
        # heads common to both sides after the changeset bundle was pushed
        self.commonheads = None
54 55
def push(repo, remote, force=False, revs=None, newbranch=False):
    '''Push outgoing changesets (limited by revs) from a local
    repository to remote. Return an integer:
      - None means nothing to push
      - 0 means HTTP error
      - 1 means we pushed and remote head count is unchanged *or*
        we have outgoing changesets but refused to push
      - other values as described by addchangegroup()
    '''
    pushop = pushoperation(repo, remote, force, revs, newbranch)
    # refuse early if the destination lacks features this repo requires
    if pushop.remote.local():
        missing = (set(pushop.repo.requirements)
                   - pushop.remote.local().supported)
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    # there are two ways to push to remote repo:
    #
    # addchangegroup assumes local user can lock remote
    # repo (local filesystem, old ssh servers).
    #
    # unbundle assumes local user cannot lock remote repo (new ssh
    # servers, http servers).

    if not pushop.remote.canpush():
        raise util.Abort(_("destination does not support push"))
    # get local lock as we might write phase data
    locallock = None
    try:
        locallock = pushop.repo.lock()
        pushop.locallocked = True
    except IOError, err:
        pushop.locallocked = False
        if err.errno != errno.EACCES:
            raise
        # source repo cannot be locked.
        # We do not abort the push, but just disable the local phase
        # synchronisation.
        msg = 'cannot lock source repository: %s\n' % err
        pushop.ui.debug(msg)
    try:
        pushop.repo.checkpush(pushop)
        lock = None
        unbundle = pushop.remote.capable('unbundle')
        # only take the remote lock when we must use addchangegroup
        if not unbundle:
            lock = pushop.remote.lock()
        try:
            _pushdiscovery(pushop)
            if _pushcheckoutgoing(pushop):
                _pushchangeset(pushop)
            # commonheads must be computed before phase synchronisation
            _pushcomputecommonheads(pushop)
            _pushsyncphase(pushop)
            _pushobsolete(pushop)
        finally:
            if lock is not None:
                lock.release()
    finally:
        if locallock is not None:
            locallock.release()

    # bookmarks are exchanged outside of the locks
    _pushbookmark(pushop)
    return pushop.ret
120 121
def _pushdiscovery(pushop):
    """run changeset discovery and record the results on the push operation

    Fills in pushop.outgoing, pushop.remoteheads and pushop.incoming.
    """
    unfiltered = pushop.repo.unfiltered()
    commoninc = discovery.findcommonincoming(unfiltered, pushop.remote,
                                             force=pushop.force)
    common, incoming, remoteheads = commoninc
    pushop.outgoing = discovery.findcommonoutgoing(unfiltered, pushop.remote,
                                                   onlyheads=pushop.revs,
                                                   commoninc=commoninc,
                                                   force=pushop.force)
    pushop.remoteheads = remoteheads
    pushop.incoming = incoming
133 134
def _pushcheckoutgoing(pushop):
    """validate that the outgoing changesets are safe to push

    Returns True when there is something to push, False when there is
    nothing to do.  Aborts when the push would propagate obsolete or
    troubled changesets, or would create unwanted new remote heads.
    """
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    if not outgoing.missing:
        # nothing to push
        scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
        return False
    # something to push
    if not pushop.force:
        # if repo.obsstore == False --> no obsolete
        # then, save the iteration
        if unfi.obsstore:
            # this message are here for 80 char limit reason
            mso = _("push includes obsolete changeset: %s!")
            mst = "push includes %s changeset: %s!"
            # plain versions for i18n tool to detect them
            _("push includes unstable changeset: %s!")
            _("push includes bumped changeset: %s!")
            _("push includes divergent changeset: %s!")
            # If we are to push if there is at least one
            # obsolete or unstable changeset in missing, at
            # least one of the missinghead will be obsolete or
            # unstable. So checking heads only is ok
            for node in outgoing.missingheads:
                ctx = unfi[node]
                if ctx.obsolete():
                    raise util.Abort(mso % ctx)
                elif ctx.troubled():
                    raise util.Abort(_(mst)
                                     % (ctx.troubles()[0],
                                        ctx))
        newbm = pushop.ui.configlist('bookmarks', 'pushing')
        discovery.checkheads(unfi, pushop.remote, outgoing,
                             pushop.remoteheads,
                             pushop.newbranch,
                             bool(pushop.incoming),
                             newbm)
    return True
172 173
def _pushchangeset(pushop):
    """Make the actual push of changeset bundle to remote repo

    Stores the result code in pushop.ret.
    """
    outgoing = pushop.outgoing
    unbundle = pushop.remote.capable('unbundle')
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # create a changegroup from local
    if pushop.revs is None and not (outgoing.excluded
                            or pushop.repo.changelog.filteredrevs):
        # push everything,
        # use the fast path, no race possible on push
        bundler = changegroup.bundle10(pushop.repo, bundlecaps)
        cg = changegroup.getsubset(pushop.repo,
                                   outgoing,
                                   bundler,
                                   'push',
                                   fastpath=True)
    else:
        cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
                                        bundlecaps)

    # apply changegroup to remote
    if unbundle:
        # local repo finds heads on server, finds out what
        # revs it must push. once revs transferred, if server
        # finds it has different heads (someone else won
        # commit/push race), server aborts.
        if pushop.force:
            remoteheads = ['force']
        else:
            remoteheads = pushop.remoteheads
        # ssh: return remote's addchangegroup()
        # http: return remote's addchangegroup() or 0 for error
        pushop.ret = pushop.remote.unbundle(cg, remoteheads,
                                            'push')
    else:
        # we return an integer indicating remote head count
        # change
        pushop.ret = pushop.remote.addchangegroup(cg, 'push',
                                                  pushop.repo.url())
213 214
def _pushcomputecommonheads(pushop):
    """compute pushop.commonheads: heads common to both sides after the push"""
    unfi = pushop.repo.unfiltered()
    if pushop.ret:
        # push succeed, synchronize target of the push
        cheads = pushop.outgoing.missingheads
    elif pushop.revs is None:
        # All out push fails. synchronize all common
        cheads = pushop.outgoing.commonheads
    else:
        # I want cheads = heads(::missingheads and ::commonheads)
        # (missingheads is revs with secret changeset filtered out)
        #
        # This can be expressed as:
        #     cheads = ( (missingheads and ::commonheads)
        #              + (commonheads and ::missingheads))"
        #              )
        #
        # while trying to push we already computed the following:
        #     common = (::commonheads)
        #     missing = ((commonheads::missingheads) - commonheads)
        #
        # We can pick:
        # * missingheads part of common (::commonheads)
        common = set(pushop.outgoing.common)
        nm = pushop.repo.changelog.nodemap
        cheads = [node for node in pushop.revs if nm[node] in common]
        # and
        # * commonheads parents on missing
        revset = unfi.set('%ln and parents(roots(%ln))',
                          pushop.outgoing.commonheads,
                          pushop.outgoing.missing)
        cheads.extend(c.node() for c in revset)
    pushop.commonheads = cheads
247 248
def _pushsyncphase(pushop):
    """synchronise phase information locally and remotely

    Relies on pushop.commonheads, which _pushcomputecommonheads() has
    already filled in (push() always runs it before this function).  The
    previous inline recomputation of those heads was an exact duplicate of
    _pushcomputecommonheads and immediately overwrote the value read here,
    so it has been removed.
    """
    unfi = pushop.repo.unfiltered()
    cheads = pushop.commonheads
    # even when we don't push, exchanging phase data is useful
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo', False)
        and remotephases    # server supports phases
        and pushop.ret is None # nothing was pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and remote support phase
        # - and no changeset was pushed
        # - and remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    if not remotephases: # old server or public only repo
        _localphasemove(pushop, cheads)
        # don't push any phase data as there is nothing to push
    else:
        ana = phases.analyzeremotephases(pushop.repo, cheads,
                                         remotephases)
        pheads, droots = ana
        ### Apply remote phase on local
        if remotephases.get('publishing', False):
            _localphasemove(pushop, cheads)
        else: # publish = False
            _localphasemove(pushop, pheads)
            _localphasemove(pushop, cheads, phases.draft)
        ### Apply local phase on remote

        # Get the list of all revs draft on remote by public here.
        # XXX Beware that revset break if droots is not strictly
        # XXX root we may want to ensure it is but it is costly
        outdated = unfi.set('heads((%ln::%ln) and public())',
                            droots, cheads)
        for newremotehead in outdated:
            r = pushop.remote.pushkey('phases',
                                      newremotehead.hex(),
                                      str(phases.draft),
                                      str(phases.public))
            if not r:
                pushop.ui.warn(_('updating %s to public failed!\n')
                               % newremotehead)
327 328
def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo"""
    if pushop.locallocked:
        phases.advanceboundary(pushop.repo, phase, nodes)
        return
    # The repo is not locked, so we must not change any phases!
    # Just tell the user which phase moves were skipped, when any
    # would actually have happened.
    skipped = [n for n in nodes if phase < pushop.repo[n].phase()]
    phasename = phases.phasenames[phase]
    if skipped:
        pushop.ui.status(_('cannot lock source repo, skipping '
                           'local %s phase update\n') % phasename)
341 342
def _pushobsolete(pushop):
    """utility function to push obsolete markers to a remote"""
    pushop.ui.debug('try to push obsolete markers to remote\n')
    repo = pushop.repo
    remote = pushop.remote
    # only exchange markers when obsolescence is enabled locally and the
    # remote advertises the 'obsolete' namespace
    if (obsolete._enabled and repo.obsstore and
        'obsolete' in remote.listkeys('namespaces')):
        rslts = []
        remotedata = repo.listkeys('obsolete')
        for key in sorted(remotedata, reverse=True):
            # reverse sort to ensure we end with dump0
            data = remotedata[key]
            rslts.append(remote.pushkey('obsolete', key, '', data))
        if [r for r in rslts if not r]:
            msg = _('failed to push some obsolete markers!\n')
            repo.ui.warn(msg)
358 359
def _pushbookmark(pushop):
    """Update bookmark position on remote"""
    ui = pushop.ui
    repo = pushop.repo.unfiltered()
    remote = pushop.remote
    ui.debug("checking for updated bookmarks\n")
    revnums = map(repo.changelog.rev, pushop.revs or [])
    ancestors = [a for a in repo.changelog.ancestors(revnums, inclusive=True)]
    (addsrc, adddst, advsrc, advdst, diverge, differ, invalid
     ) = bookmarks.compare(repo, repo._bookmarks, remote.listkeys('bookmarks'),
                           srchex=hex)

    # only bookmarks advancing on the local side are pushed
    for b, scid, dcid in advsrc:
        # with a partial push, skip bookmarks not pointing into the
        # pushed subset
        if ancestors and repo[scid].rev() not in ancestors:
            continue
        if remote.pushkey('bookmarks', b, dcid, scid):
            ui.status(_("updating bookmark %s\n") % b)
        else:
            ui.warn(_('updating bookmark %s failed!\n') % b)
378 379
class pulloperation(object):
    """An object that represents a single pull operation

    Its purpose is to carry pull related state and very common operations.

    A new instance should be created at the beginning of each pull and
    discarded afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False):
        # repo we pull into
        self.repo = repo
        # repo we pull from
        self.remote = remote
        # revision we try to pull (None is "all")
        self.heads = heads
        # do we force pull?
        self.force = force
        # the name of the pull transaction
        self._trname = 'pull\n' + util.hidepassword(remote.url())
        # hold the transaction once created
        self._tr = None
        # set of common changesets between local and remote before pull
        self.common = None
        # set of pulled heads
        self.rheads = None
        # list of missing changesets to fetch remotely
        self.fetch = None
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # list of steps remaining todo (related to future bundle2 usage)
        self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changesets targeted by the pull"""
        # compute target subset
        if self.heads is None:
            # We pulled every thing possible
            # sync on everything common
            c = set(self.common)
            ret = list(self.common)
            for n in self.rheads:
                if n not in c:
                    ret.append(n)
            return ret
        else:
            # We pulled a specific subset
            # sync on this subset
            return self.heads

    def gettransaction(self):
        """get appropriate pull transaction, creating it if needed"""
        if self._tr is None:
            self._tr = self.repo.transaction(self._trname)
        return self._tr

    def closetransaction(self):
        """close transaction if created"""
        if self._tr is not None:
            self._tr.close()

    def releasetransaction(self):
        """release transaction if created"""
        if self._tr is not None:
            self._tr.release()
445 446
def pull(repo, remote, heads=None, force=False):
    """pull changesets (limited by heads) from a remote repository into repo

    Returns the changegroup application result (pullop.cgresult)."""
    pullop = pulloperation(repo, remote, heads, force)
    # refuse early if the local repo lacks features the source requires
    if pullop.remote.local():
        missing = set(pullop.remote.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    lock = pullop.repo.lock()
    try:
        _pulldiscovery(pullop)
        if pullop.remote.capable('bundle2'):
            _pullbundle2(pullop)
        # a bundle2 pull may already have handled some steps; only run
        # the ones still listed in todosteps
        if 'changegroup' in pullop.todosteps:
            _pullchangeset(pullop)
        if 'phases' in pullop.todosteps:
            _pullphase(pullop)
        if 'obsmarkers' in pullop.todosteps:
            _pullobsolete(pullop)
        pullop.closetransaction()
    finally:
        pullop.releasetransaction()
        lock.release()

    return pullop.cgresult
473 474
def _pulldiscovery(pullop):
    """run the discovery phase of the pull

    Only handles changeset discovery for now; eventually it will drive
    all discovery steps."""
    common, fetch, rheads = discovery.findcommonincoming(
        pullop.repo.unfiltered(), pullop.remote,
        heads=pullop.heads, force=pullop.force)
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads
484 485
def _pullbundle2(pullop):
    """pull data using bundle2

    For now, the only supported data are changegroup."""
    kwargs = {'bundlecaps': set(['HG20'])}
    # pulling changegroup
    pullop.todosteps.remove('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
    else:
        kwargs['common'] = pullop.common
        kwargs['heads'] = pullop.heads or pullop.rheads
        if pullop.heads is None and list(pullop.common) == [nullid]:
            pullop.repo.ui.status(_("requesting all changes\n"))
    # NOTE(review): kwargs only ever holds 'bundlecaps', 'common' and
    # 'heads' here, so this 'format' early-return looks unreachable —
    # confirm before relying on it
    if kwargs.keys() == ['format']:
        return # nothing to pull
    bundle = pullop.remote.getbundle('pull', **kwargs)
    try:
        op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
    except KeyError, exc:
        raise util.Abort('missing support for %s' % exc)
    # exactly one changegroup part is expected for now
    assert len(op.records['changegroup']) == 1
    pullop.cgresult = op.records['changegroup'][0]['return']
509 510
def _pullchangeset(pullop):
    """pull changeset from unbundle into the local repo"""
    # We delay the open of the transaction as late as possible so we
    # don't open transaction for nothing or you break future useful
    # rollback call
    pullop.todosteps.remove('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    # pick the best available protocol to fetch the changegroup
    if pullop.remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = pullop.remote.getbundle('pull', common=pullop.common,
                                     heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = pullop.remote.changegroup(pullop.fetch, 'pull')
    elif not pullop.remote.capable('changegroupsubset'):
        raise util.Abort(_("partial pull cannot be done because "
                           "other repository doesn't support "
                           "changegroupsubset."))
    else:
        cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
                                                 pullop.remote.url())
541 542
def _pullphase(pullop):
    """synchronise local phase data with the remote after a pull"""
    # Get remote phases data from remote
    pullop.todosteps.remove('phases')
    remotephases = pullop.remote.listkeys('phases')
    publishing = bool(remotephases.get('publishing', False))
    if remotephases and not publishing:
        # remote is new and unpublishing
        pheads, _dr = phases.analyzeremotephases(pullop.repo,
                                                 pullop.pulledsubset,
                                                 remotephases)
        phases.advanceboundary(pullop.repo, phases.public, pheads)
        phases.advanceboundary(pullop.repo, phases.draft,
                               pullop.pulledsubset)
    else:
        # Remote is old or publishing all common changesets
        # should be seen as public
        phases.advanceboundary(pullop.repo, phases.public,
                               pullop.pulledsubset)
560 561
def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    A transaction is created on demand via pullop.gettransaction().  We
    return that transaction to inform the calling code that a new
    transaction has been created (when applicable).

    Exists mostly to allow overriding for experimentation purposes."""
    pullop.todosteps.remove('obsmarkers')
    tr = None
    if obsolete._enabled:
        pullop.repo.ui.debug('fetching remote obsolete markers\n')
        remoteobs = pullop.remote.listkeys('obsolete')
        if 'dump0' in remoteobs:
            tr = pullop.gettransaction()
            for key in sorted(remoteobs, reverse=True):
                if key.startswith('dump'):
                    data = base85.b85decode(remoteobs[key])
                    pullop.repo.obsstore.mergemarkers(tr, data)
            pullop.repo.invalidatevolatilesets()
    return tr
582 583
def getbundle(repo, source, heads=None, common=None, bundlecaps=None):
    """return a full bundle (with potentially multiple kind of parts)

    Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
    passed. For now, the bundle can contain only changegroup, but this will
    changes when more part type will be available for bundle2.

    This is different from changegroup.getbundle that only returns an HG10
    changegroup bundle. They may eventually get reunited in the future when we
    have a clearer idea of the API we what to query different data.

    The implementation is at a very early stage and will get massive rework
    when the API of bundle is refined.
    """
    # build bundle here.
    cg = changegroup.getbundle(repo, source, heads=heads,
                               common=common, bundlecaps=bundlecaps)
    if bundlecaps is None or 'HG20' not in bundlecaps:
        return cg
    # very crude first implementation,
    # the bundle API will change and the generation will be done lazily.
    bundler = bundle2.bundle20(repo.ui)
    # writebundle(cg, None, ...) writes into a temporary file; read it back
    # in binary mode (bundles are binary data), close the handle instead of
    # leaking it, and remove the temporary file once consumed.
    tempname = changegroup.writebundle(cg, None, 'HG10UN')
    try:
        fp = open(tempname, 'rb')
        try:
            data = fp.read()
        finally:
            fp.close()
    finally:
        os.unlink(tempname)
    part = bundle2.part('changegroup', data=data)
    bundler.addpart(part)
    temp = cStringIO.StringIO()
    for c in bundler.getchunks():
        temp.write(c)
    temp.seek(0)
    return bundle2.unbundle20(repo.ui, temp)
614 615
class PushRaced(RuntimeError):
    """Raised while unbundling when a push race is detected."""
617 618
def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling.  their_heads may also be the special
    values ['force'] (skip the check) or ['hashed', <sha1 digest>] (compare
    against a hash of the sorted local heads).  Raises PushRaced when the
    repository changed since the client computed their_heads.
    """
    heads = repo.heads()
    heads_hash = util.sha1(''.join(sorted(heads))).digest()
    if not (their_heads == ['force'] or their_heads == heads or
            their_heads == ['hashed', heads_hash]):
        # someone else committed/pushed/unbundled while we
        # were transferring data
        raise PushRaced('repository changed while %s - '
                        'please try again' % context)
632
def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    This function makes sure the repo is locked during the application and
    has a mechanism to check that no push race occurred between the creation
    of the bundle and its application.

    If the push was raced, a PushRaced exception is raised."""
    r = 0
    lock = repo.lock()
    try:
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        try:
            r = changegroup.addchangegroup(repo, cg, source, url)
        except util.Abort, inst:
            # The old code we moved used sys.stderr directly.
            # We did not change it, to minimise code change.
            # This needs to be moved to something proper.
            # Feel free to do it.
            sys.stderr.write("abort: %s\n" % inst)
    finally:
        lock.release()
    return r
@@ -1,789 +1,778 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod
12 12 import peer, error, encoding, util, store, exchange
13 13
14 14
class abstractserverproto(object):
    """Abstract summary of the server-side protocol API.

    Serves as reference and documentation only.
    """

    def getargs(self, args):
        """fetch the values of the arguments listed in <args>

        Returns a list of values, in the same order as <args>."""
        raise NotImplementedError()

    def getfile(self, fp):
        """copy the whole content of a file into a file-like object

        The wire format is::

            (<chunk-size>\n<chunk>)+0\n

        where the chunk size is the ascii form of the integer.
        """
        raise NotImplementedError()

    def redirect(self):
        """optionally install interception of stdout and stderr

        See also the `restore` method."""
        raise NotImplementedError()

    # If the `redirect` function does install interception, the `restore`
    # function MUST be defined. If interception is not used, this function
    # MUST NOT be defined.
    #
    # left commented here on purpose
    #
    #def restore(self):
    #    """reinstall previous stdout and stderr and return intercepted stdout
    #    """
    #    raise NotImplementedError()

    def groupchunks(self, cg):
        """return 4096-byte chunks from a changegroup object

        Some protocols may have compressed the contents."""
        raise NotImplementedError()
60 60
61 61 # abstract batching support
62 62
class future(object):
    '''placeholder for a value to be set later'''
    def set(self, value):
        # a future may only be resolved once
        if util.safehasattr(self, 'value'):
            raise error.RepoError("future is already set")
        self.value = value
69 69
class batcher(object):
    '''base class for a batch of commands submittable in a single request

    Any method looked up on an instance is queued rather than executed,
    and hands back a future for its eventual result.  Calling submit()
    performs every queued call and fills in the corresponding futures.
    '''
    def __init__(self):
        self.calls = []
    def __getattr__(self, name):
        def record(*args, **opts):
            result = future()
            self.calls.append((name, args, opts, result,))
            return result
        return record
    def submit(self):
        pass
87 87
class localbatch(batcher):
    '''runs the queued calls immediately against a local peer'''
    def __init__(self, local):
        batcher.__init__(self)
        self.local = local
    def submit(self):
        # execute each queued call in order and resolve its future
        for name, args, opts, resref in self.calls:
            method = getattr(self.local, name)
            resref.set(method(*args, **opts))
96 96
class remotebatch(batcher):
    '''batches the queued calls; uses as few roundtrips as possible'''
    def __init__(self, remote):
        '''remote must support _submitbatch(encbatch) and
           _submitone(op, encargs)'''
        batcher.__init__(self)
        self.remote = remote
    def submit(self):
        req, rsp = [], []
        for name, args, opts, resref in self.calls:
            mtd = getattr(self.remote, name)
            batchablefn = getattr(mtd, 'batchable', None)
            if batchablefn is not None:
                # drive the batchable coroutine to its first yield
                batchable = batchablefn(mtd.im_self, *args, **opts)
                encargsorres, encresref = batchable.next()
                if encresref:
                    # a real remote call: queue it for the batched request
                    req.append((name, encargsorres,))
                    rsp.append((batchable, encresref, resref,))
                else:
                    # the coroutine produced a local result directly
                    resref.set(encargsorres)
            else:
                # non-batchable method: flush any pending batch first so
                # call order is preserved, then call it directly
                if req:
                    self._submitreq(req, rsp)
                    req, rsp = [], []
                resref.set(mtd(*args, **opts))
        if req:
            self._submitreq(req, rsp)
    def _submitreq(self, req, rsp):
        # send the accumulated batch and feed each encoded result back into
        # its coroutine to produce the decoded value
        encresults = self.remote._submitbatch(req)
        for encres, r in zip(encresults, rsp):
            batchable, encresref, resref = r
            encresref.set(encres)
            resref.set(batchable.next())
130 130
def batchable(f):
    '''annotation for batchable methods

    Such methods must implement a coroutine as follows:

    @batchable
    def sample(self, one, two=None):
        # Handle locally computable results first:
        if not one:
            yield "a local result", None
        # Build list of encoded arguments suitable for your wire protocol:
        encargs = [('one', encode(one),), ('two', encode(two),)]
        # Create future for injection of encoded result:
        encresref = future()
        # Return encoded arguments and future:
        yield encargs, encresref
        # Assuming the future to be filled with the result from the batched
        # request now. Decode it:
        yield decode(encresref.value)

    The decorator returns a function which wraps this coroutine as a plain
    method, but adds the original method as an attribute called "batchable",
    which is used by remotebatch to split the call into separate encoding and
    decoding phases.
    '''
    def plain(*args, **opts):
        # drive the coroutine manually when the method is called outside
        # of a batch
        batchable = f(*args, **opts)
        encargsorres, encresref = batchable.next()
        if not encresref:
            return encargsorres # a local result in this case
        self = args[0]
        encresref.set(self._submitone(f.func_name, encargsorres))
        return batchable.next()
    setattr(plain, 'batchable', f)
    return plain
166 166
167 167 # list of nodes encoding / decoding
168 168
def decodelist(l, sep=' '):
    """decode a <sep>-separated string of hex nodes into binary nodes"""
    # an empty or None string decodes to an empty list
    if l:
        return map(bin, l.split(sep))
    return []
173 173
def encodelist(l, sep=' '):
    """encode a list of binary nodes as a <sep>-separated hex string"""
    return sep.join(hex(n) for n in l)
176 176
177 177 # batched call argument encoding
178 178
def escapearg(plain):
    """escape a batch-call argument string (':' is the escape character)"""
    # ':' must be escaped first so the escape markers added below survive
    for ch, esc in ((':', '::'), (',', ':,'), (';', ':;'), ('=', ':=')):
        plain = plain.replace(ch, esc)
    return plain
185 185
def unescapearg(escaped):
    """Undo escapearg(): restore '=', ';', ',' and ':' in that order."""
    plain = escaped
    for esc, raw in ((':=', '='), (':;', ';'), (':,', ','), ('::', ':')):
        plain = plain.replace(esc, raw)
    return plain
192 192
193 193 # client side
194 194
class wirepeer(peer.peerrepository):
    """Client-side peer speaking the Mercurial wire protocol.

    Transport-specific subclasses (e.g. ssh, http) implement the low-level
    ``_call*`` and ``_abort`` primitives declared at the end of this class;
    every wire command below is expressed in terms of them.
    """

    def batch(self):
        # Return a batcher: calls made through it are accumulated and
        # submitted in a single round trip via _submitbatch().
        return remotebatch(self)
    def _submitbatch(self, req):
        # req: list of (op, argsdict) pairs. Encode them into one 'batch'
        # command ("op1 k=v,k=v;op2 ...") and split the reply back into
        # one result string per command.
        cmds = []
        for op, argsdict in req:
            args = ','.join('%s=%s' % p for p in argsdict.iteritems())
            cmds.append('%s %s' % (op, args))
        rsp = self._call("batch", cmds=';'.join(cmds))
        return rsp.split(';')
    def _submitone(self, op, args):
        # Unbatched fallback: issue a single wire command.
        return self._call(op, **args)

    @batchable
    def lookup(self, key):
        """Resolve *key* to a binary node on the remote side."""
        self.requirecap('lookup', _('look up remote revision'))
        f = future()
        yield {'key': encoding.fromlocal(key)}, f
        d = f.value
        # reply format: "<success> <data>\n"
        success, data = d[:-1].split(" ", 1)
        if int(success):
            yield bin(data)
        self._abort(error.RepoError(data))

    @batchable
    def heads(self):
        """Return the remote heads as a list of binary nodes."""
        f = future()
        yield {}, f
        d = f.value
        try:
            yield decodelist(d[:-1])
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), d))

    @batchable
    def known(self, nodes):
        """Return a list of booleans: which of *nodes* the remote knows."""
        f = future()
        yield {'nodes': encodelist(nodes)}, f
        d = f.value
        try:
            # reply is one "0"/"1" character per queried node
            yield [bool(int(f)) for f in d]
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), d))

    @batchable
    def branchmap(self):
        """Return {branchname: [binary heads]} as known by the remote."""
        f = future()
        yield {}, f
        d = f.value
        try:
            branchmap = {}
            for branchpart in d.splitlines():
                # each line: "<urlquoted utf-8 name> <hex heads...>"
                branchname, branchheads = branchpart.split(' ', 1)
                branchname = encoding.tolocal(urllib.unquote(branchname))
                branchheads = decodelist(branchheads)
                branchmap[branchname] = branchheads
            yield branchmap
        except TypeError:
            self._abort(error.ResponseError(_("unexpected response:"), d))

    def branches(self, nodes):
        """Return branch information for *nodes*, one tuple per line."""
        n = encodelist(nodes)
        d = self._call("branches", nodes=n)
        try:
            br = [tuple(decodelist(b)) for b in d.splitlines()]
            return br
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), d))

    def between(self, pairs):
        """Return the nodes between each (top, bottom) pair of *pairs*."""
        batch = 8 # avoid giant requests
        r = []
        for i in xrange(0, len(pairs), batch):
            n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
            d = self._call("between", pairs=n)
            try:
                r.extend(l and decodelist(l) or [] for l in d.splitlines())
            except ValueError:
                self._abort(error.ResponseError(_("unexpected response:"), d))
        return r

    @batchable
    def pushkey(self, namespace, key, old, new):
        """Set *key* in *namespace* from *old* to *new*; yield success."""
        if not self.capable('pushkey'):
            # local result: the server cannot do pushkey
            yield False, None
        f = future()
        self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
        yield {'namespace': encoding.fromlocal(namespace),
               'key': encoding.fromlocal(key),
               'old': encoding.fromlocal(old),
               'new': encoding.fromlocal(new)}, f
        d = f.value
        # reply format: "<0-or-1>\n<server output>"
        d, output = d.split('\n', 1)
        try:
            d = bool(int(d))
        except ValueError:
            raise error.ResponseError(
                _('push failed (unexpected response):'), d)
        # relay server output to the user, line by line
        for l in output.splitlines(True):
            self.ui.status(_('remote: '), l)
        yield d

    @batchable
    def listkeys(self, namespace):
        """Return the {key: value} mapping of *namespace* on the remote."""
        if not self.capable('pushkey'):
            # local result: the server cannot do pushkey/listkeys
            yield {}, None
        f = future()
        self.ui.debug('preparing listkeys for "%s"\n' % namespace)
        yield {'namespace': encoding.fromlocal(namespace)}, f
        d = f.value
        r = {}
        # reply: one "key\tvalue" pair per line
        for l in d.splitlines():
            k, v = l.split('\t')
            r[encoding.tolocal(k)] = encoding.tolocal(v)
        yield r

    def stream_out(self):
        """Start a streaming clone; return the raw server stream."""
        return self._callstream('stream_out')

    def changegroup(self, nodes, kind):
        """Return a changegroup of everything descending from *nodes*."""
        n = encodelist(nodes)
        f = self._callcompressable("changegroup", roots=n)
        return changegroupmod.unbundle10(f, 'UN')

    def changegroupsubset(self, bases, heads, kind):
        """Return a changegroup between *bases* and *heads*."""
        self.requirecap('changegroupsubset', _('look up remote changes'))
        bases = encodelist(bases)
        heads = encodelist(heads)
        f = self._callcompressable("changegroupsubset",
                                   bases=bases, heads=heads)
        return changegroupmod.unbundle10(f, 'UN')

    def getbundle(self, source, heads=None, common=None, bundlecaps=None):
        """Return a bundle of the changes missing from *common* up to
        *heads*; only arguments actually provided are sent."""
        self.requirecap('getbundle', _('look up remote changes'))
        opts = {}
        if heads is not None:
            opts['heads'] = encodelist(heads)
        if common is not None:
            opts['common'] = encodelist(common)
        if bundlecaps is not None:
            opts['bundlecaps'] = ','.join(bundlecaps)
        f = self._callcompressable("getbundle", **opts)
        return changegroupmod.unbundle10(f, 'UN')

    def unbundle(self, cg, heads, source):
        '''Send cg (a readable file-like object representing the
        changegroup to push, typically a chunkbuffer object) to the
        remote server as a bundle. Return an integer indicating the
        result of the push (see localrepository.addchangegroup()).'''

        if heads != ['force'] and self.capable('unbundlehash'):
            # send a single hash of the sorted heads instead of the full
            # head list, keeping the request small
            heads = encodelist(['hashed',
                                util.sha1(''.join(sorted(heads))).digest()])
        else:
            heads = encodelist(heads)

        ret, output = self._callpush("unbundle", cg, heads=heads)
        # an empty ret signals a push failure; output holds the reason
        if ret == "":
            raise error.ResponseError(
                _('push failed:'), output)
        try:
            ret = int(ret)
        except ValueError:
            raise error.ResponseError(
                _('push failed (unexpected response):'), ret)

        # relay server output to the user, line by line
        for l in output.splitlines(True):
            self.ui.status(_('remote: '), l)
        return ret

    def debugwireargs(self, one, two, three=None, four=None, five=None):
        # don't pass optional arguments left at their default value
        # note: 'five' is accepted but never sent over the wire
        opts = {}
        if three is not None:
            opts['three'] = three
        if four is not None:
            opts['four'] = four
        return self._call('debugwireargs', one=one, two=two, **opts)

    def _call(self, cmd, **args):
        """execute <cmd> on the server

        The command is expected to return a simple string.

        returns the server reply as a string."""
        raise NotImplementedError()

    def _callstream(self, cmd, **args):
        """execute <cmd> on the server

        The command is expected to return a stream.

        returns the server reply as a file like object."""
        raise NotImplementedError()

    def _callcompressable(self, cmd, **args):
        """execute <cmd> on the server

        The command is expected to return a stream.

        The stream may have been compressed in some implementations. This
        function takes care of the decompression. This is the only difference
        with _callstream.

        returns the server reply as a file like object.
        """
        raise NotImplementedError()

    def _callpush(self, cmd, fp, **args):
        """execute a <cmd> on server

        The command is expected to be related to a push. Push has a special
        return method.

        returns the server reply as a (ret, output) tuple. ret is either
        empty (error) or a stringified int.
        """
        raise NotImplementedError()

    def _abort(self, exception):
        """clearly abort the wire protocol connection and raise the exception
        """
        raise NotImplementedError()
419 419
420 420 # server side
421 421
422 422 # wire protocol command can either return a string or one of these classes.
class streamres(object):
    """wireproto reply: binary stream

    The call was successful and the result is a stream.
    Iterate on the `self.gen` attribute to retrieve chunks.
    """
    def __init__(self, gen):
        # gen: iterable of byte chunks to send to the client
        self.gen = gen
431 431
class pushres(object):
    """wireproto reply: success with simple integer return

    The call was successful and returned an integer contained in `self.res`.
    """
    def __init__(self, res):
        # res: integer result of the push
        self.res = res
439 439
class pusherr(object):
    """wireproto reply: failure

    The call failed. The `self.res` attribute contains the error message.
    """
    def __init__(self, res):
        # res: error message string sent back to the client
        self.res = res
447 447
class ooberror(object):
    """wireproto reply: failure of a batch of operation

    Something failed during a batch call. The error message is stored in
    `self.message`.
    """
    def __init__(self, message):
        # message: out-of-band error text; aborts the whole batch
        self.message = message
456 456
def dispatch(repo, proto, command):
    """Look up *command* in the registry and run it on the served view."""
    repo = repo.filtered("served")
    handler, argspec = commands[command]
    return handler(repo, proto, *proto.getargs(argspec))
462 462
def options(cmd, keys, others):
    """Pop the entries named in *keys* out of the *others* dict.

    Returns the popped entries as a new dict; complains on stderr about
    any arguments left over in *others*.
    """
    opts = {}
    for key in keys:
        if key in others:
            opts[key] = others.pop(key)
    if others:
        sys.stderr.write("abort: %s got unexpected arguments %s\n"
                         % (cmd, ",".join(others)))
    return opts
473 473
# list of commands
# maps wire command name -> (handler function, argument spec string)
commands = {}

def wireprotocommand(name, args=''):
    """decorator for wireprotocol command

    *name* is the wire command name; *args* is a space-separated list of
    argument names to extract for the handler ('*' collects any remaining
    arguments into a dict — see `batch` below).
    """
    def register(func):
        commands[name] = (func, args)
        return func
    return register
483 483
@wireprotocommand('batch', 'cmds *')
def batch(repo, proto, cmds, others):
    """Run several wire commands in a single request.

    *cmds* is a ';'-separated list of "op key=value,key=value" entries as
    produced by wirepeer._submitbatch(); the reply joins the escaped
    per-command results with ';'.
    """
    repo = repo.filtered("served")
    res = []
    for pair in cmds.split(';'):
        op, args = pair.split(' ', 1)
        vals = {}
        for a in args.split(','):
            if a:
                n, v = a.split('=')
                vals[n] = unescapearg(v)
        func, spec = commands[op]
        if spec:
            keys = spec.split()
            data = {}
            for k in keys:
                if k == '*':
                    # collect every argument not named in the spec
                    star = {}
                    for key in vals.keys():
                        if key not in keys:
                            star[key] = vals[key]
                    data['*'] = star
                else:
                    data[k] = vals[k]
            result = func(repo, proto, *[data[k] for k in keys])
        else:
            result = func(repo, proto)
        if isinstance(result, ooberror):
            # abort the whole batch on the first out-of-band error
            return result
        res.append(escapearg(result))
    return ';'.join(res)
515 515
@wireprotocommand('between', 'pairs')
def between(repo, proto, pairs):
    """Return the nodes between each '-'-separated pair, one line per pair."""
    decoded = [decodelist(p, '-') for p in pairs.split(" ")]
    return "".join(encodelist(b) + "\n" for b in repo.between(decoded))
523 523
@wireprotocommand('branchmap')
def branchmap(repo, proto):
    """Encode the repository branchmap, one 'name heads...' line per
    branch, with the branch name urlquoted."""
    lines = []
    for branch, nodes in repo.branchmap().iteritems():
        name = urllib.quote(encoding.fromlocal(branch))
        lines.append('%s %s' % (name, encodelist(nodes)))
    return '\n'.join(lines)
533 533
@wireprotocommand('branches', 'nodes')
def branches(repo, proto, nodes):
    """Return branch information for the requested nodes, one line each."""
    return "".join(encodelist(b) + "\n"
                   for b in repo.branches(decodelist(nodes)))
541 541
542 542
# capabilities every wire server offers unconditionally
wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
                 'known', 'getbundle', 'unbundlehash', 'batch']

def _capabilities(repo, proto):
    """return a list of capabilities for a repo

    This function exists to allow extensions to easily wrap capabilities
    computation

    - returns a list: easy to alter
    - changes done here will be propagated to both `capabilities` and `hello`
      commands without any other action needed.
    """
    # copy to prevent modification of the global list
    caps = list(wireprotocaps)
    if _allowstream(repo.ui):
        if repo.ui.configbool('server', 'preferuncompressed', False):
            caps.append('stream-preferred')
        requiredformats = repo.requirements & repo.supportedformats
        # if our local revlogs are just revlogv1, add 'stream' cap
        if not requiredformats - set(('revlogv1',)):
            caps.append('stream')
        # otherwise, add 'streamreqs' detailing our local revlog format
        else:
            caps.append('streamreqs=%s' % ','.join(requiredformats))
    caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
    caps.append('httpheader=1024')
    return caps
571 571
# If you are writing an extension and considering wrapping this function,
# wrap `_capabilities` instead.
@wireprotocommand('capabilities')
def capabilities(repo, proto):
    """Return the server capability list as one space-joined string."""
    return ' '.join(_capabilities(repo, proto))
577 577
@wireprotocommand('changegroup', 'roots')
def changegroup(repo, proto, roots):
    """Stream a changegroup of everything descending from the given roots."""
    cg = changegroupmod.changegroup(repo, decodelist(roots), 'serve')
    return streamres(proto.groupchunks(cg))
583 583
@wireprotocommand('changegroupsubset', 'bases heads')
def changegroupsubset(repo, proto, bases, heads):
    """Stream a changegroup limited to the given bases and heads."""
    cg = changegroupmod.changegroupsubset(repo, decodelist(bases),
                                          decodelist(heads), 'serve')
    return streamres(proto.groupchunks(cg))
590 590
@wireprotocommand('debugwireargs', 'one two *')
def debugwireargs(repo, proto, one, two, others):
    """Echo command used to exercise argument passing over the wire."""
    # only accept optional args from the known set
    accepted = ['three', 'four']
    opts = options('debugwireargs', accepted, others)
    return repo.debugwireargs(one, two, **opts)
596 596
@wireprotocommand('getbundle', '*')
def getbundle(repo, proto, others):
    """Build and stream a bundle according to the optional 'heads',
    'common' and 'bundlecaps' arguments."""
    opts = options('getbundle', ['heads', 'common', 'bundlecaps'], others)
    for key, value in opts.iteritems():
        if key in ('heads', 'common'):
            opts[key] = decodelist(value)
        elif key == 'bundlecaps':
            opts[key] = set(value.split(','))
    chunks = changegroupmod.getbundle(repo, 'serve', **opts)
    return streamres(proto.groupchunks(chunks))
607 607
@wireprotocommand('heads')
def heads(repo, proto):
    """Return the repository heads, hex-encoded and newline-terminated."""
    return encodelist(repo.heads()) + "\n"
612 612
@wireprotocommand('hello')
def hello(repo, proto):
    '''Return a set of RFC822-like "field: value" lines describing the
    server. The only field currently emitted is "capabilities", whose
    value is a space-separated token list, e.g.:

    capabilities: space separated list of tokens
    '''
    caps = capabilities(repo, proto)
    return "capabilities: %s\n" % caps
623 623
@wireprotocommand('listkeys', 'namespace')
def listkeys(repo, proto, namespace):
    """Return the pushkey namespace contents, one "key\\tvalue" per line."""
    entries = repo.listkeys(encoding.tolocal(namespace)).items()
    return '\n'.join('%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
                     for k, v in entries)
630 630
631 631 @wireprotocommand('lookup', 'key')
632 632 def lookup(repo, proto, key):
633 633 try:
634 634 k = encoding.tolocal(key)
635 635 c = repo[k]
636 636 r = c.hex()
637 637 success = 1
638 638 except Exception, inst:
639 639 r = str(inst)
640 640 success = 0
641 641 return "%s %s\n" % (success, r)
642 642
@wireprotocommand('known', 'nodes *')
def known(repo, proto, nodes, others):
    """Return one '1'/'0' character per queried node."""
    flags = repo.known(decodelist(nodes))
    return ''.join("1" if b else "0" for b in flags)
646 646
@wireprotocommand('pushkey', 'namespace key old new')
def pushkey(repo, proto, namespace, key, old, new):
    """Set *key* to *new* (expecting current value *old*) in the given
    pushkey namespace; reply with the stringified boolean result,
    optionally followed by captured server output."""
    # compatibility with pre-1.8 clients which were accidentally
    # sending raw binary nodes rather than utf-8-encoded hex
    if len(new) == 20 and new.encode('string-escape') != new:
        # looks like it could be a binary node
        try:
            new.decode('utf-8')
            new = encoding.tolocal(new) # but cleanly decodes as UTF-8
        except UnicodeDecodeError:
            pass # binary, leave unmodified
    else:
        new = encoding.tolocal(new) # normal path

    if util.safehasattr(proto, 'restore'):
        # NOTE(review): protocols exposing restore() can capture output;
        # redirect it, run the pushkey, and ship the captured text back
        # with the result so the client can display it

        proto.redirect()

        try:
            r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
                             encoding.tolocal(old), new) or False
        except util.Abort:
            r = False

        output = proto.restore()

        return '%s\n%s' % (int(r), output)

    r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
                     encoding.tolocal(old), new)
    return '%s\n' % int(r)
678 678
def _allowstream(ui):
    # streaming clone is permitted unless 'server.uncompressed' is set to
    # False; the option is honored even from untrusted configuration
    return ui.configbool('server', 'uncompressed', True, untrusted=True)
681 681
def _walkstreamfiles(repo):
    # this is its own function so extensions can override it
    return repo.store.walk()
685 685
@wireprotocommand('stream_out')
def stream(repo, proto):
    '''If the server supports streaming clone, it advertises the "stream"
    capability with a value representing the version and flags of the repo
    it is serving. Client checks to see if it understands the format.

    The format is simple: the server writes out a line with the amount
    of files, then the total amount of bytes to be transferred (separated
    by a space). Then, for each file, the server first writes the filename
    and filesize (separated by the null character), then the file contents.
    '''

    # streaming clone disabled by configuration: error code 1
    if not _allowstream(repo.ui):
        return '1\n'

    entries = []
    total_bytes = 0
    try:
        # get consistent snapshot of repo, lock during scan
        lock = repo.lock()
        try:
            repo.ui.debug('scanning\n')
            for name, ename, size in _walkstreamfiles(repo):
                if size:
                    entries.append((name, size))
                    total_bytes += size
        finally:
            lock.release()
    except error.LockError:
        return '2\n' # error: 2

    def streamer(repo, entries, total):
        '''stream out all metadata files in repository.'''
        yield '0\n' # success
        repo.ui.debug('%d files, %d bytes to transfer\n' %
                      (len(entries), total_bytes))
        yield '%d %d\n' % (len(entries), total_bytes)

        sopener = repo.sopener
        oldaudit = sopener.mustaudit
        debugflag = repo.ui.debugflag
        # NOTE(review): path auditing is disabled during the transfer and
        # restored below -- presumably a performance measure; confirm
        sopener.mustaudit = False

        try:
            for name, size in entries:
                if debugflag:
                    repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
                # partially encode name over the wire for backwards compat
                yield '%s\0%d\n' % (store.encodedir(name), size)
                if size <= 65536:
                    # small file: read it in one go
                    fp = sopener(name)
                    try:
                        data = fp.read(size)
                    finally:
                        fp.close()
                    yield data
                else:
                    # large file: stream it chunk by chunk
                    for chunk in util.filechunkiter(sopener(name), limit=size):
                        yield chunk
        # replace with "finally:" when support for python 2.4 has been dropped
        except Exception:
            sopener.mustaudit = oldaudit
            raise
        sopener.mustaudit = oldaudit

    return streamres(streamer(repo, entries, total_bytes))
752 752
753 753 @wireprotocommand('unbundle', 'heads')
754 754 def unbundle(repo, proto, heads):
755 755 their_heads = decodelist(heads)
756 756
757 757 try:
758 758 proto.redirect()
759 759
760 760 exchange.check_heads(repo, their_heads, 'preparing changes')
761 761
762 762 # write bundle data to temporary file because it can be big
763 763 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
764 764 fp = os.fdopen(fd, 'wb+')
765 765 r = 0
766 766 try:
767 767 proto.getfile(fp)
768 lock = repo.lock()
769 try:
770 exchange.check_heads(repo, their_heads, 'uploading changes')
771
772 # push can proceed
773 768 fp.seek(0)
774 769 gen = changegroupmod.readbundle(fp, None)
775
776 try:
777 r = changegroupmod.addchangegroup(repo, gen, 'serve',
770 r = exchange.unbundle(repo, gen, their_heads, 'serve',
778 771 proto._client())
779 except util.Abort, inst:
780 sys.stderr.write("abort: %s\n" % inst)
781 finally:
782 lock.release()
783 772 return pushres(r)
784 773
785 774 finally:
786 775 fp.close()
787 776 os.unlink(tempname)
788 777 except exchange.PushRaced, exc:
789 778 return pusherr(str(exc))
General Comments 0
You need to be logged in to leave comments. Login now