##// END OF EJS Templates
unbundle: extract checkheads in its own function...
Pierre-Yves David -
r20967:98485027 default
parent child Browse files
Show More
@@ -1,613 +1,630 b''
1 1 # exchange.py - utily to exchange data between repo.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import cStringIO
11 11 import errno
12 12 import util, scmutil, changegroup, base85
13 13 import discovery, phases, obsolete, bookmarks, bundle2
14 14
15 15
16 16 class pushoperation(object):
17 17 """A object that represent a single push operation
18 18
19 19 It purpose is to carry push related state and very common operation.
20 20
21 21 A new should be created at the begining of each push and discarded
22 22 afterward.
23 23 """
24 24
25 25 def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
26 26 # repo we push from
27 27 self.repo = repo
28 28 self.ui = repo.ui
29 29 # repo we push to
30 30 self.remote = remote
31 31 # force option provided
32 32 self.force = force
33 33 # revs to be pushed (None is "all")
34 34 self.revs = revs
35 35 # allow push of new branch
36 36 self.newbranch = newbranch
37 37 # did a local lock get acquired?
38 38 self.locallocked = None
39 39 # Integer version of the push result
40 40 # - None means nothing to push
41 41 # - 0 means HTTP error
42 42 # - 1 means we pushed and remote head count is unchanged *or*
43 43 # we have outgoing changesets but refused to push
44 44 # - other values as described by addchangegroup()
45 45 self.ret = None
46 46 # discover.outgoing object (contains common and outgoin data)
47 47 self.outgoing = None
48 48 # all remote heads before the push
49 49 self.remoteheads = None
50 50 # testable as a boolean indicating if any nodes are missing locally.
51 51 self.incoming = None
52 52 # set of all heads common after changeset bundle push
53 53 self.commonheads = None
54 54
55 55 def push(repo, remote, force=False, revs=None, newbranch=False):
56 56 '''Push outgoing changesets (limited by revs) from a local
57 57 repository to remote. Return an integer:
58 58 - None means nothing to push
59 59 - 0 means HTTP error
60 60 - 1 means we pushed and remote head count is unchanged *or*
61 61 we have outgoing changesets but refused to push
62 62 - other values as described by addchangegroup()
63 63 '''
64 64 pushop = pushoperation(repo, remote, force, revs, newbranch)
65 65 if pushop.remote.local():
66 66 missing = (set(pushop.repo.requirements)
67 67 - pushop.remote.local().supported)
68 68 if missing:
69 69 msg = _("required features are not"
70 70 " supported in the destination:"
71 71 " %s") % (', '.join(sorted(missing)))
72 72 raise util.Abort(msg)
73 73
74 74 # there are two ways to push to remote repo:
75 75 #
76 76 # addchangegroup assumes local user can lock remote
77 77 # repo (local filesystem, old ssh servers).
78 78 #
79 79 # unbundle assumes local user cannot lock remote repo (new ssh
80 80 # servers, http servers).
81 81
82 82 if not pushop.remote.canpush():
83 83 raise util.Abort(_("destination does not support push"))
84 84 # get local lock as we might write phase data
85 85 locallock = None
86 86 try:
87 87 locallock = pushop.repo.lock()
88 88 pushop.locallocked = True
89 89 except IOError, err:
90 90 pushop.locallocked = False
91 91 if err.errno != errno.EACCES:
92 92 raise
93 93 # source repo cannot be locked.
94 94 # We do not abort the push, but just disable the local phase
95 95 # synchronisation.
96 96 msg = 'cannot lock source repository: %s\n' % err
97 97 pushop.ui.debug(msg)
98 98 try:
99 99 pushop.repo.checkpush(pushop)
100 100 lock = None
101 101 unbundle = pushop.remote.capable('unbundle')
102 102 if not unbundle:
103 103 lock = pushop.remote.lock()
104 104 try:
105 105 _pushdiscovery(pushop)
106 106 if _pushcheckoutgoing(pushop):
107 107 _pushchangeset(pushop)
108 108 _pushcomputecommonheads(pushop)
109 109 _pushsyncphase(pushop)
110 110 _pushobsolete(pushop)
111 111 finally:
112 112 if lock is not None:
113 113 lock.release()
114 114 finally:
115 115 if locallock is not None:
116 116 locallock.release()
117 117
118 118 _pushbookmark(pushop)
119 119 return pushop.ret
120 120
121 121 def _pushdiscovery(pushop):
122 122 # discovery
123 123 unfi = pushop.repo.unfiltered()
124 124 fci = discovery.findcommonincoming
125 125 commoninc = fci(unfi, pushop.remote, force=pushop.force)
126 126 common, inc, remoteheads = commoninc
127 127 fco = discovery.findcommonoutgoing
128 128 outgoing = fco(unfi, pushop.remote, onlyheads=pushop.revs,
129 129 commoninc=commoninc, force=pushop.force)
130 130 pushop.outgoing = outgoing
131 131 pushop.remoteheads = remoteheads
132 132 pushop.incoming = inc
133 133
134 134 def _pushcheckoutgoing(pushop):
135 135 outgoing = pushop.outgoing
136 136 unfi = pushop.repo.unfiltered()
137 137 if not outgoing.missing:
138 138 # nothing to push
139 139 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
140 140 return False
141 141 # something to push
142 142 if not pushop.force:
143 143 # if repo.obsstore == False --> no obsolete
144 144 # then, save the iteration
145 145 if unfi.obsstore:
146 146 # this message are here for 80 char limit reason
147 147 mso = _("push includes obsolete changeset: %s!")
148 148 mst = "push includes %s changeset: %s!"
149 149 # plain versions for i18n tool to detect them
150 150 _("push includes unstable changeset: %s!")
151 151 _("push includes bumped changeset: %s!")
152 152 _("push includes divergent changeset: %s!")
153 153 # If we are to push if there is at least one
154 154 # obsolete or unstable changeset in missing, at
155 155 # least one of the missinghead will be obsolete or
156 156 # unstable. So checking heads only is ok
157 157 for node in outgoing.missingheads:
158 158 ctx = unfi[node]
159 159 if ctx.obsolete():
160 160 raise util.Abort(mso % ctx)
161 161 elif ctx.troubled():
162 162 raise util.Abort(_(mst)
163 163 % (ctx.troubles()[0],
164 164 ctx))
165 165 newbm = pushop.ui.configlist('bookmarks', 'pushing')
166 166 discovery.checkheads(unfi, pushop.remote, outgoing,
167 167 pushop.remoteheads,
168 168 pushop.newbranch,
169 169 bool(pushop.incoming),
170 170 newbm)
171 171 return True
172 172
173 173 def _pushchangeset(pushop):
174 174 """Make the actual push of changeset bundle to remote repo"""
175 175 outgoing = pushop.outgoing
176 176 unbundle = pushop.remote.capable('unbundle')
177 177 # TODO: get bundlecaps from remote
178 178 bundlecaps = None
179 179 # create a changegroup from local
180 180 if pushop.revs is None and not (outgoing.excluded
181 181 or pushop.repo.changelog.filteredrevs):
182 182 # push everything,
183 183 # use the fast path, no race possible on push
184 184 bundler = changegroup.bundle10(pushop.repo, bundlecaps)
185 185 cg = changegroup.getsubset(pushop.repo,
186 186 outgoing,
187 187 bundler,
188 188 'push',
189 189 fastpath=True)
190 190 else:
191 191 cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
192 192 bundlecaps)
193 193
194 194 # apply changegroup to remote
195 195 if unbundle:
196 196 # local repo finds heads on server, finds out what
197 197 # revs it must push. once revs transferred, if server
198 198 # finds it has different heads (someone else won
199 199 # commit/push race), server aborts.
200 200 if pushop.force:
201 201 remoteheads = ['force']
202 202 else:
203 203 remoteheads = pushop.remoteheads
204 204 # ssh: return remote's addchangegroup()
205 205 # http: return remote's addchangegroup() or 0 for error
206 206 pushop.ret = pushop.remote.unbundle(cg, remoteheads,
207 207 'push')
208 208 else:
209 209 # we return an integer indicating remote head count
210 210 # change
211 211 pushop.ret = pushop.remote.addchangegroup(cg, 'push',
212 212 pushop.repo.url())
213 213
214 214 def _pushcomputecommonheads(pushop):
215 215 unfi = pushop.repo.unfiltered()
216 216 if pushop.ret:
217 217 # push succeed, synchronize target of the push
218 218 cheads = pushop.outgoing.missingheads
219 219 elif pushop.revs is None:
220 220 # All out push fails. synchronize all common
221 221 cheads = pushop.outgoing.commonheads
222 222 else:
223 223 # I want cheads = heads(::missingheads and ::commonheads)
224 224 # (missingheads is revs with secret changeset filtered out)
225 225 #
226 226 # This can be expressed as:
227 227 # cheads = ( (missingheads and ::commonheads)
228 228 # + (commonheads and ::missingheads))"
229 229 # )
230 230 #
231 231 # while trying to push we already computed the following:
232 232 # common = (::commonheads)
233 233 # missing = ((commonheads::missingheads) - commonheads)
234 234 #
235 235 # We can pick:
236 236 # * missingheads part of common (::commonheads)
237 237 common = set(pushop.outgoing.common)
238 238 nm = pushop.repo.changelog.nodemap
239 239 cheads = [node for node in pushop.revs if nm[node] in common]
240 240 # and
241 241 # * commonheads parents on missing
242 242 revset = unfi.set('%ln and parents(roots(%ln))',
243 243 pushop.outgoing.commonheads,
244 244 pushop.outgoing.missing)
245 245 cheads.extend(c.node() for c in revset)
246 246 pushop.commonheads = cheads
247 247
248 248 def _pushsyncphase(pushop):
249 249 """synchronise phase information locally and remotly"""
250 250 unfi = pushop.repo.unfiltered()
251 251 cheads = pushop.commonheads
252 252 if pushop.ret:
253 253 # push succeed, synchronize target of the push
254 254 cheads = pushop.outgoing.missingheads
255 255 elif pushop.revs is None:
256 256 # All out push fails. synchronize all common
257 257 cheads = pushop.outgoing.commonheads
258 258 else:
259 259 # I want cheads = heads(::missingheads and ::commonheads)
260 260 # (missingheads is revs with secret changeset filtered out)
261 261 #
262 262 # This can be expressed as:
263 263 # cheads = ( (missingheads and ::commonheads)
264 264 # + (commonheads and ::missingheads))"
265 265 # )
266 266 #
267 267 # while trying to push we already computed the following:
268 268 # common = (::commonheads)
269 269 # missing = ((commonheads::missingheads) - commonheads)
270 270 #
271 271 # We can pick:
272 272 # * missingheads part of common (::commonheads)
273 273 common = set(pushop.outgoing.common)
274 274 nm = pushop.repo.changelog.nodemap
275 275 cheads = [node for node in pushop.revs if nm[node] in common]
276 276 # and
277 277 # * commonheads parents on missing
278 278 revset = unfi.set('%ln and parents(roots(%ln))',
279 279 pushop.outgoing.commonheads,
280 280 pushop.outgoing.missing)
281 281 cheads.extend(c.node() for c in revset)
282 282 pushop.commonheads = cheads
283 283 # even when we don't push, exchanging phase data is useful
284 284 remotephases = pushop.remote.listkeys('phases')
285 285 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
286 286 and remotephases # server supports phases
287 287 and pushop.ret is None # nothing was pushed
288 288 and remotephases.get('publishing', False)):
289 289 # When:
290 290 # - this is a subrepo push
291 291 # - and remote support phase
292 292 # - and no changeset was pushed
293 293 # - and remote is publishing
294 294 # We may be in issue 3871 case!
295 295 # We drop the possible phase synchronisation done by
296 296 # courtesy to publish changesets possibly locally draft
297 297 # on the remote.
298 298 remotephases = {'publishing': 'True'}
299 299 if not remotephases: # old server or public only rer
300 300 _localphasemove(pushop, cheads)
301 301 # don't push any phase data as there is nothing to push
302 302 else:
303 303 ana = phases.analyzeremotephases(pushop.repo, cheads,
304 304 remotephases)
305 305 pheads, droots = ana
306 306 ### Apply remote phase on local
307 307 if remotephases.get('publishing', False):
308 308 _localphasemove(pushop, cheads)
309 309 else: # publish = False
310 310 _localphasemove(pushop, pheads)
311 311 _localphasemove(pushop, cheads, phases.draft)
312 312 ### Apply local phase on remote
313 313
314 314 # Get the list of all revs draft on remote by public here.
315 315 # XXX Beware that revset break if droots is not strictly
316 316 # XXX root we may want to ensure it is but it is costly
317 317 outdated = unfi.set('heads((%ln::%ln) and public())',
318 318 droots, cheads)
319 319 for newremotehead in outdated:
320 320 r = pushop.remote.pushkey('phases',
321 321 newremotehead.hex(),
322 322 str(phases.draft),
323 323 str(phases.public))
324 324 if not r:
325 325 pushop.ui.warn(_('updating %s to public failed!\n')
326 326 % newremotehead)
327 327
328 328 def _localphasemove(pushop, nodes, phase=phases.public):
329 329 """move <nodes> to <phase> in the local source repo"""
330 330 if pushop.locallocked:
331 331 phases.advanceboundary(pushop.repo, phase, nodes)
332 332 else:
333 333 # repo is not locked, do not change any phases!
334 334 # Informs the user that phases should have been moved when
335 335 # applicable.
336 336 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
337 337 phasestr = phases.phasenames[phase]
338 338 if actualmoves:
339 339 pushop.ui.status(_('cannot lock source repo, skipping '
340 340 'local %s phase update\n') % phasestr)
341 341
342 342 def _pushobsolete(pushop):
343 343 """utility function to push obsolete markers to a remote"""
344 344 pushop.ui.debug('try to push obsolete markers to remote\n')
345 345 repo = pushop.repo
346 346 remote = pushop.remote
347 347 if (obsolete._enabled and repo.obsstore and
348 348 'obsolete' in remote.listkeys('namespaces')):
349 349 rslts = []
350 350 remotedata = repo.listkeys('obsolete')
351 351 for key in sorted(remotedata, reverse=True):
352 352 # reverse sort to ensure we end with dump0
353 353 data = remotedata[key]
354 354 rslts.append(remote.pushkey('obsolete', key, '', data))
355 355 if [r for r in rslts if not r]:
356 356 msg = _('failed to push some obsolete markers!\n')
357 357 repo.ui.warn(msg)
358 358
359 359 def _pushbookmark(pushop):
360 360 """Update bookmark position on remote"""
361 361 ui = pushop.ui
362 362 repo = pushop.repo.unfiltered()
363 363 remote = pushop.remote
364 364 ui.debug("checking for updated bookmarks\n")
365 365 revnums = map(repo.changelog.rev, pushop.revs or [])
366 366 ancestors = [a for a in repo.changelog.ancestors(revnums, inclusive=True)]
367 367 (addsrc, adddst, advsrc, advdst, diverge, differ, invalid
368 368 ) = bookmarks.compare(repo, repo._bookmarks, remote.listkeys('bookmarks'),
369 369 srchex=hex)
370 370
371 371 for b, scid, dcid in advsrc:
372 372 if ancestors and repo[scid].rev() not in ancestors:
373 373 continue
374 374 if remote.pushkey('bookmarks', b, dcid, scid):
375 375 ui.status(_("updating bookmark %s\n") % b)
376 376 else:
377 377 ui.warn(_('updating bookmark %s failed!\n') % b)
378 378
379 379 class pulloperation(object):
380 380 """A object that represent a single pull operation
381 381
382 382 It purpose is to carry push related state and very common operation.
383 383
384 384 A new should be created at the begining of each pull and discarded
385 385 afterward.
386 386 """
387 387
388 388 def __init__(self, repo, remote, heads=None, force=False):
389 389 # repo we pull into
390 390 self.repo = repo
391 391 # repo we pull from
392 392 self.remote = remote
393 393 # revision we try to pull (None is "all")
394 394 self.heads = heads
395 395 # do we force pull?
396 396 self.force = force
397 397 # the name the pull transaction
398 398 self._trname = 'pull\n' + util.hidepassword(remote.url())
399 399 # hold the transaction once created
400 400 self._tr = None
401 401 # set of common changeset between local and remote before pull
402 402 self.common = None
403 403 # set of pulled head
404 404 self.rheads = None
405 405 # list of missing changeset to fetch remotly
406 406 self.fetch = None
407 407 # result of changegroup pulling (used as returng code by pull)
408 408 self.cgresult = None
409 409 # list of step remaining todo (related to future bundle2 usage)
410 410 self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])
411 411
412 412 @util.propertycache
413 413 def pulledsubset(self):
414 414 """heads of the set of changeset target by the pull"""
415 415 # compute target subset
416 416 if self.heads is None:
417 417 # We pulled every thing possible
418 418 # sync on everything common
419 419 c = set(self.common)
420 420 ret = list(self.common)
421 421 for n in self.rheads:
422 422 if n not in c:
423 423 ret.append(n)
424 424 return ret
425 425 else:
426 426 # We pulled a specific subset
427 427 # sync on this subset
428 428 return self.heads
429 429
430 430 def gettransaction(self):
431 431 """get appropriate pull transaction, creating it if needed"""
432 432 if self._tr is None:
433 433 self._tr = self.repo.transaction(self._trname)
434 434 return self._tr
435 435
436 436 def closetransaction(self):
437 437 """close transaction if created"""
438 438 if self._tr is not None:
439 439 self._tr.close()
440 440
441 441 def releasetransaction(self):
442 442 """release transaction if created"""
443 443 if self._tr is not None:
444 444 self._tr.release()
445 445
446 446 def pull(repo, remote, heads=None, force=False):
447 447 pullop = pulloperation(repo, remote, heads, force)
448 448 if pullop.remote.local():
449 449 missing = set(pullop.remote.requirements) - pullop.repo.supported
450 450 if missing:
451 451 msg = _("required features are not"
452 452 " supported in the destination:"
453 453 " %s") % (', '.join(sorted(missing)))
454 454 raise util.Abort(msg)
455 455
456 456 lock = pullop.repo.lock()
457 457 try:
458 458 _pulldiscovery(pullop)
459 459 if pullop.remote.capable('bundle2'):
460 460 _pullbundle2(pullop)
461 461 if 'changegroup' in pullop.todosteps:
462 462 _pullchangeset(pullop)
463 463 if 'phases' in pullop.todosteps:
464 464 _pullphase(pullop)
465 465 if 'obsmarkers' in pullop.todosteps:
466 466 _pullobsolete(pullop)
467 467 pullop.closetransaction()
468 468 finally:
469 469 pullop.releasetransaction()
470 470 lock.release()
471 471
472 472 return pullop.cgresult
473 473
474 474 def _pulldiscovery(pullop):
475 475 """discovery phase for the pull
476 476
477 477 Current handle changeset discovery only, will change handle all discovery
478 478 at some point."""
479 479 tmp = discovery.findcommonincoming(pullop.repo.unfiltered(),
480 480 pullop.remote,
481 481 heads=pullop.heads,
482 482 force=pullop.force)
483 483 pullop.common, pullop.fetch, pullop.rheads = tmp
484 484
485 485 def _pullbundle2(pullop):
486 486 """pull data using bundle2
487 487
488 488 For now, the only supported data are changegroup."""
489 489 kwargs = {'bundlecaps': set(['HG20'])}
490 490 # pulling changegroup
491 491 pullop.todosteps.remove('changegroup')
492 492 if not pullop.fetch:
493 493 pullop.repo.ui.status(_("no changes found\n"))
494 494 pullop.cgresult = 0
495 495 else:
496 496 kwargs['common'] = pullop.common
497 497 kwargs['heads'] = pullop.heads or pullop.rheads
498 498 if pullop.heads is None and list(pullop.common) == [nullid]:
499 499 pullop.repo.ui.status(_("requesting all changes\n"))
500 500 if kwargs.keys() == ['format']:
501 501 return # nothing to pull
502 502 bundle = pullop.remote.getbundle('pull', **kwargs)
503 503 try:
504 504 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
505 505 except KeyError, exc:
506 506 raise util.Abort('missing support for %s' % exc)
507 507 assert len(op.records['changegroup']) == 1
508 508 pullop.cgresult = op.records['changegroup'][0]['return']
509 509
510 510 def _pullchangeset(pullop):
511 511 """pull changeset from unbundle into the local repo"""
512 512 # We delay the open of the transaction as late as possible so we
513 513 # don't open transaction for nothing or you break future useful
514 514 # rollback call
515 515 pullop.todosteps.remove('changegroup')
516 516 if not pullop.fetch:
517 517 pullop.repo.ui.status(_("no changes found\n"))
518 518 pullop.cgresult = 0
519 519 return
520 520 pullop.gettransaction()
521 521 if pullop.heads is None and list(pullop.common) == [nullid]:
522 522 pullop.repo.ui.status(_("requesting all changes\n"))
523 523 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
524 524 # issue1320, avoid a race if remote changed after discovery
525 525 pullop.heads = pullop.rheads
526 526
527 527 if pullop.remote.capable('getbundle'):
528 528 # TODO: get bundlecaps from remote
529 529 cg = pullop.remote.getbundle('pull', common=pullop.common,
530 530 heads=pullop.heads or pullop.rheads)
531 531 elif pullop.heads is None:
532 532 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
533 533 elif not pullop.remote.capable('changegroupsubset'):
534 534 raise util.Abort(_("partial pull cannot be done because "
535 535 "other repository doesn't support "
536 536 "changegroupsubset."))
537 537 else:
538 538 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
539 539 pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
540 540 pullop.remote.url())
541 541
542 542 def _pullphase(pullop):
543 543 # Get remote phases data from remote
544 544 pullop.todosteps.remove('phases')
545 545 remotephases = pullop.remote.listkeys('phases')
546 546 publishing = bool(remotephases.get('publishing', False))
547 547 if remotephases and not publishing:
548 548 # remote is new and unpublishing
549 549 pheads, _dr = phases.analyzeremotephases(pullop.repo,
550 550 pullop.pulledsubset,
551 551 remotephases)
552 552 phases.advanceboundary(pullop.repo, phases.public, pheads)
553 553 phases.advanceboundary(pullop.repo, phases.draft,
554 554 pullop.pulledsubset)
555 555 else:
556 556 # Remote is old or publishing all common changesets
557 557 # should be seen as public
558 558 phases.advanceboundary(pullop.repo, phases.public,
559 559 pullop.pulledsubset)
560 560
561 561 def _pullobsolete(pullop):
562 562 """utility function to pull obsolete markers from a remote
563 563
564 564 The `gettransaction` is function that return the pull transaction, creating
565 565 one if necessary. We return the transaction to inform the calling code that
566 566 a new transaction have been created (when applicable).
567 567
568 568 Exists mostly to allow overriding for experimentation purpose"""
569 569 pullop.todosteps.remove('obsmarkers')
570 570 tr = None
571 571 if obsolete._enabled:
572 572 pullop.repo.ui.debug('fetching remote obsolete markers\n')
573 573 remoteobs = pullop.remote.listkeys('obsolete')
574 574 if 'dump0' in remoteobs:
575 575 tr = pullop.gettransaction()
576 576 for key in sorted(remoteobs, reverse=True):
577 577 if key.startswith('dump'):
578 578 data = base85.b85decode(remoteobs[key])
579 579 pullop.repo.obsstore.mergemarkers(tr, data)
580 580 pullop.repo.invalidatevolatilesets()
581 581 return tr
582 582
583 583 def getbundle(repo, source, heads=None, common=None, bundlecaps=None):
584 584 """return a full bundle (with potentially multiple kind of parts)
585 585
586 586 Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
587 587 passed. For now, the bundle can contain only changegroup, but this will
588 588 changes when more part type will be available for bundle2.
589 589
590 590 This is different from changegroup.getbundle that only returns an HG10
591 591 changegroup bundle. They may eventually get reunited in the future when we
592 592 have a clearer idea of the API we what to query different data.
593 593
594 594 The implementation is at a very early stage and will get massive rework
595 595 when the API of bundle is refined.
596 596 """
597 597 # build bundle here.
598 598 cg = changegroup.getbundle(repo, source, heads=heads,
599 599 common=common, bundlecaps=bundlecaps)
600 600 if bundlecaps is None or 'HG20' not in bundlecaps:
601 601 return cg
602 602 # very crude first implementation,
603 603 # the bundle API will change and the generation will be done lazily.
604 604 bundler = bundle2.bundle20(repo.ui)
605 605 tempname = changegroup.writebundle(cg, None, 'HG10UN')
606 606 data = open(tempname).read()
607 607 part = bundle2.part('changegroup', data=data)
608 608 bundler.addpart(part)
609 609 temp = cStringIO.StringIO()
610 610 for c in bundler.getchunks():
611 611 temp.write(c)
612 612 temp.seek(0)
613 613 return bundle2.unbundle20(repo.ui, temp)
614
615 class PushRaced(RuntimeError):
616 """An exception raised during unbunding that indicate a push race"""
617
618 def check_heads(repo, their_heads, context):
619 """check if the heads of a repo have been modified
620
621 Used by peer for unbundling.
622 """
623 heads = repo.heads()
624 heads_hash = util.sha1(''.join(sorted(heads))).digest()
625 if not (their_heads == ['force'] or their_heads == heads or
626 their_heads == ['hashed', heads_hash]):
627 # someone else committed/pushed/unbundled while we
628 # were transferring data
629 raise PushRaced('repository changed while %s - '
630 'please try again' % context)
@@ -1,799 +1,789 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod
12 import peer, error, encoding, util, store
12 import peer, error, encoding, util, store, exchange
13 13
14 14
15 15 class abstractserverproto(object):
16 16 """abstract class that summarizes the protocol API
17 17
18 18 Used as reference and documentation.
19 19 """
20 20
21 21 def getargs(self, args):
22 22 """return the value for arguments in <args>
23 23
24 24 returns a list of values (same order as <args>)"""
25 25 raise NotImplementedError()
26 26
27 27 def getfile(self, fp):
28 28 """write the whole content of a file into a file like object
29 29
30 30 The file is in the form::
31 31
32 32 (<chunk-size>\n<chunk>)+0\n
33 33
34 34 chunk size is the ascii version of the int.
35 35 """
36 36 raise NotImplementedError()
37 37
38 38 def redirect(self):
39 39 """may setup interception for stdout and stderr
40 40
41 41 See also the `restore` method."""
42 42 raise NotImplementedError()
43 43
44 44 # If the `redirect` function does install interception, the `restore`
45 45 # function MUST be defined. If interception is not used, this function
46 46 # MUST NOT be defined.
47 47 #
48 48 # left commented here on purpose
49 49 #
50 50 #def restore(self):
51 51 # """reinstall previous stdout and stderr and return intercepted stdout
52 52 # """
53 53 # raise NotImplementedError()
54 54
55 55 def groupchunks(self, cg):
56 56 """return 4096 chunks from a changegroup object
57 57
58 58 Some protocols may have compressed the contents."""
59 59 raise NotImplementedError()
60 60
61 61 # abstract batching support
62 62
63 63 class future(object):
64 64 '''placeholder for a value to be set later'''
65 65 def set(self, value):
66 66 if util.safehasattr(self, 'value'):
67 67 raise error.RepoError("future is already set")
68 68 self.value = value
69 69
70 70 class batcher(object):
71 71 '''base class for batches of commands submittable in a single request
72 72
73 73 All methods invoked on instances of this class are simply queued and
74 74 return a a future for the result. Once you call submit(), all the queued
75 75 calls are performed and the results set in their respective futures.
76 76 '''
77 77 def __init__(self):
78 78 self.calls = []
79 79 def __getattr__(self, name):
80 80 def call(*args, **opts):
81 81 resref = future()
82 82 self.calls.append((name, args, opts, resref,))
83 83 return resref
84 84 return call
85 85 def submit(self):
86 86 pass
87 87
88 88 class localbatch(batcher):
89 89 '''performs the queued calls directly'''
90 90 def __init__(self, local):
91 91 batcher.__init__(self)
92 92 self.local = local
93 93 def submit(self):
94 94 for name, args, opts, resref in self.calls:
95 95 resref.set(getattr(self.local, name)(*args, **opts))
96 96
97 97 class remotebatch(batcher):
98 98 '''batches the queued calls; uses as few roundtrips as possible'''
99 99 def __init__(self, remote):
100 100 '''remote must support _submitbatch(encbatch) and
101 101 _submitone(op, encargs)'''
102 102 batcher.__init__(self)
103 103 self.remote = remote
104 104 def submit(self):
105 105 req, rsp = [], []
106 106 for name, args, opts, resref in self.calls:
107 107 mtd = getattr(self.remote, name)
108 108 batchablefn = getattr(mtd, 'batchable', None)
109 109 if batchablefn is not None:
110 110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 111 encargsorres, encresref = batchable.next()
112 112 if encresref:
113 113 req.append((name, encargsorres,))
114 114 rsp.append((batchable, encresref, resref,))
115 115 else:
116 116 resref.set(encargsorres)
117 117 else:
118 118 if req:
119 119 self._submitreq(req, rsp)
120 120 req, rsp = [], []
121 121 resref.set(mtd(*args, **opts))
122 122 if req:
123 123 self._submitreq(req, rsp)
124 124 def _submitreq(self, req, rsp):
125 125 encresults = self.remote._submitbatch(req)
126 126 for encres, r in zip(encresults, rsp):
127 127 batchable, encresref, resref = r
128 128 encresref.set(encres)
129 129 resref.set(batchable.next())
130 130
131 131 def batchable(f):
132 132 '''annotation for batchable methods
133 133
134 134 Such methods must implement a coroutine as follows:
135 135
136 136 @batchable
137 137 def sample(self, one, two=None):
138 138 # Handle locally computable results first:
139 139 if not one:
140 140 yield "a local result", None
141 141 # Build list of encoded arguments suitable for your wire protocol:
142 142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 143 # Create future for injection of encoded result:
144 144 encresref = future()
145 145 # Return encoded arguments and future:
146 146 yield encargs, encresref
147 147 # Assuming the future to be filled with the result from the batched
148 148 # request now. Decode it:
149 149 yield decode(encresref.value)
150 150
151 151 The decorator returns a function which wraps this coroutine as a plain
152 152 method, but adds the original method as an attribute called "batchable",
153 153 which is used by remotebatch to split the call into separate encoding and
154 154 decoding phases.
155 155 '''
156 156 def plain(*args, **opts):
157 157 batchable = f(*args, **opts)
158 158 encargsorres, encresref = batchable.next()
159 159 if not encresref:
160 160 return encargsorres # a local result in this case
161 161 self = args[0]
162 162 encresref.set(self._submitone(f.func_name, encargsorres))
163 163 return batchable.next()
164 164 setattr(plain, 'batchable', f)
165 165 return plain
166 166
167 167 # list of nodes encoding / decoding
168 168
169 169 def decodelist(l, sep=' '):
170 170 if l:
171 171 return map(bin, l.split(sep))
172 172 return []
173 173
174 174 def encodelist(l, sep=' '):
175 175 return sep.join(map(hex, l))
176 176
177 177 # batched call argument encoding
178 178
179 179 def escapearg(plain):
180 180 return (plain
181 181 .replace(':', '::')
182 182 .replace(',', ':,')
183 183 .replace(';', ':;')
184 184 .replace('=', ':='))
185 185
186 186 def unescapearg(escaped):
187 187 return (escaped
188 188 .replace(':=', '=')
189 189 .replace(':;', ';')
190 190 .replace(':,', ',')
191 191 .replace('::', ':'))
192 192
193 193 # client side
194 194
195 195 class wirepeer(peer.peerrepository):
196 196
197 197 def batch(self):
198 198 return remotebatch(self)
199 199 def _submitbatch(self, req):
200 200 cmds = []
201 201 for op, argsdict in req:
202 202 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
203 203 cmds.append('%s %s' % (op, args))
204 204 rsp = self._call("batch", cmds=';'.join(cmds))
205 205 return rsp.split(';')
206 206 def _submitone(self, op, args):
207 207 return self._call(op, **args)
208 208
209 209 @batchable
210 210 def lookup(self, key):
211 211 self.requirecap('lookup', _('look up remote revision'))
212 212 f = future()
213 213 yield {'key': encoding.fromlocal(key)}, f
214 214 d = f.value
215 215 success, data = d[:-1].split(" ", 1)
216 216 if int(success):
217 217 yield bin(data)
218 218 self._abort(error.RepoError(data))
219 219
220 220 @batchable
221 221 def heads(self):
222 222 f = future()
223 223 yield {}, f
224 224 d = f.value
225 225 try:
226 226 yield decodelist(d[:-1])
227 227 except ValueError:
228 228 self._abort(error.ResponseError(_("unexpected response:"), d))
229 229
230 230 @batchable
231 231 def known(self, nodes):
232 232 f = future()
233 233 yield {'nodes': encodelist(nodes)}, f
234 234 d = f.value
235 235 try:
236 236 yield [bool(int(f)) for f in d]
237 237 except ValueError:
238 238 self._abort(error.ResponseError(_("unexpected response:"), d))
239 239
240 240 @batchable
241 241 def branchmap(self):
242 242 f = future()
243 243 yield {}, f
244 244 d = f.value
245 245 try:
246 246 branchmap = {}
247 247 for branchpart in d.splitlines():
248 248 branchname, branchheads = branchpart.split(' ', 1)
249 249 branchname = encoding.tolocal(urllib.unquote(branchname))
250 250 branchheads = decodelist(branchheads)
251 251 branchmap[branchname] = branchheads
252 252 yield branchmap
253 253 except TypeError:
254 254 self._abort(error.ResponseError(_("unexpected response:"), d))
255 255
256 256 def branches(self, nodes):
257 257 n = encodelist(nodes)
258 258 d = self._call("branches", nodes=n)
259 259 try:
260 260 br = [tuple(decodelist(b)) for b in d.splitlines()]
261 261 return br
262 262 except ValueError:
263 263 self._abort(error.ResponseError(_("unexpected response:"), d))
264 264
265 265 def between(self, pairs):
266 266 batch = 8 # avoid giant requests
267 267 r = []
268 268 for i in xrange(0, len(pairs), batch):
269 269 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
270 270 d = self._call("between", pairs=n)
271 271 try:
272 272 r.extend(l and decodelist(l) or [] for l in d.splitlines())
273 273 except ValueError:
274 274 self._abort(error.ResponseError(_("unexpected response:"), d))
275 275 return r
276 276
277 277 @batchable
278 278 def pushkey(self, namespace, key, old, new):
279 279 if not self.capable('pushkey'):
280 280 yield False, None
281 281 f = future()
282 282 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
283 283 yield {'namespace': encoding.fromlocal(namespace),
284 284 'key': encoding.fromlocal(key),
285 285 'old': encoding.fromlocal(old),
286 286 'new': encoding.fromlocal(new)}, f
287 287 d = f.value
288 288 d, output = d.split('\n', 1)
289 289 try:
290 290 d = bool(int(d))
291 291 except ValueError:
292 292 raise error.ResponseError(
293 293 _('push failed (unexpected response):'), d)
294 294 for l in output.splitlines(True):
295 295 self.ui.status(_('remote: '), l)
296 296 yield d
297 297
298 298 @batchable
299 299 def listkeys(self, namespace):
300 300 if not self.capable('pushkey'):
301 301 yield {}, None
302 302 f = future()
303 303 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
304 304 yield {'namespace': encoding.fromlocal(namespace)}, f
305 305 d = f.value
306 306 r = {}
307 307 for l in d.splitlines():
308 308 k, v = l.split('\t')
309 309 r[encoding.tolocal(k)] = encoding.tolocal(v)
310 310 yield r
311 311
312 312 def stream_out(self):
313 313 return self._callstream('stream_out')
314 314
315 315 def changegroup(self, nodes, kind):
316 316 n = encodelist(nodes)
317 317 f = self._callcompressable("changegroup", roots=n)
318 318 return changegroupmod.unbundle10(f, 'UN')
319 319
320 320 def changegroupsubset(self, bases, heads, kind):
321 321 self.requirecap('changegroupsubset', _('look up remote changes'))
322 322 bases = encodelist(bases)
323 323 heads = encodelist(heads)
324 324 f = self._callcompressable("changegroupsubset",
325 325 bases=bases, heads=heads)
326 326 return changegroupmod.unbundle10(f, 'UN')
327 327
328 328 def getbundle(self, source, heads=None, common=None, bundlecaps=None):
329 329 self.requirecap('getbundle', _('look up remote changes'))
330 330 opts = {}
331 331 if heads is not None:
332 332 opts['heads'] = encodelist(heads)
333 333 if common is not None:
334 334 opts['common'] = encodelist(common)
335 335 if bundlecaps is not None:
336 336 opts['bundlecaps'] = ','.join(bundlecaps)
337 337 f = self._callcompressable("getbundle", **opts)
338 338 return changegroupmod.unbundle10(f, 'UN')
339 339
340 340 def unbundle(self, cg, heads, source):
341 341 '''Send cg (a readable file-like object representing the
342 342 changegroup to push, typically a chunkbuffer object) to the
343 343 remote server as a bundle. Return an integer indicating the
344 344 result of the push (see localrepository.addchangegroup()).'''
345 345
346 346 if heads != ['force'] and self.capable('unbundlehash'):
347 347 heads = encodelist(['hashed',
348 348 util.sha1(''.join(sorted(heads))).digest()])
349 349 else:
350 350 heads = encodelist(heads)
351 351
352 352 ret, output = self._callpush("unbundle", cg, heads=heads)
353 353 if ret == "":
354 354 raise error.ResponseError(
355 355 _('push failed:'), output)
356 356 try:
357 357 ret = int(ret)
358 358 except ValueError:
359 359 raise error.ResponseError(
360 360 _('push failed (unexpected response):'), ret)
361 361
362 362 for l in output.splitlines(True):
363 363 self.ui.status(_('remote: '), l)
364 364 return ret
365 365
366 366 def debugwireargs(self, one, two, three=None, four=None, five=None):
367 367 # don't pass optional arguments left at their default value
368 368 opts = {}
369 369 if three is not None:
370 370 opts['three'] = three
371 371 if four is not None:
372 372 opts['four'] = four
373 373 return self._call('debugwireargs', one=one, two=two, **opts)
374 374
375 375 def _call(self, cmd, **args):
376 376 """execute <cmd> on the server
377 377
378 378 The command is expected to return a simple string.
379 379
380 380 returns the server reply as a string."""
381 381 raise NotImplementedError()
382 382
383 383 def _callstream(self, cmd, **args):
384 384 """execute <cmd> on the server
385 385
386 386 The command is expected to return a stream.
387 387
388 388 returns the server reply as a file like object."""
389 389 raise NotImplementedError()
390 390
391 391 def _callcompressable(self, cmd, **args):
392 392 """execute <cmd> on the server
393 393
394 394 The command is expected to return a stream.
395 395
396 396 The stream may have been compressed in some implementaitons. This
397 397 function takes care of the decompression. This is the only difference
398 398 with _callstream.
399 399
400 400 returns the server reply as a file like object.
401 401 """
402 402 raise NotImplementedError()
403 403
404 404 def _callpush(self, cmd, fp, **args):
405 405 """execute a <cmd> on server
406 406
407 407 The command is expected to be related to a push. Push has a special
408 408 return method.
409 409
410 410 returns the server reply as a (ret, output) tuple. ret is either
411 411 empty (error) or a stringified int.
412 412 """
413 413 raise NotImplementedError()
414 414
415 415 def _abort(self, exception):
416 416 """clearly abort the wire protocol connection and raise the exception
417 417 """
418 418 raise NotImplementedError()
419 419
420 420 # server side
421 421
422 422 # wire protocol command can either return a string or one of these classes.
423 423 class streamres(object):
424 424 """wireproto reply: binary stream
425 425
426 426 The call was successful and the result is a stream.
427 427 Iterate on the `self.gen` attribute to retrieve chunks.
428 428 """
429 429 def __init__(self, gen):
430 430 self.gen = gen
431 431
432 432 class pushres(object):
433 433 """wireproto reply: success with simple integer return
434 434
435 435 The call was successful and returned an integer contained in `self.res`.
436 436 """
437 437 def __init__(self, res):
438 438 self.res = res
439 439
440 440 class pusherr(object):
441 441 """wireproto reply: failure
442 442
443 443 The call failed. The `self.res` attribute contains the error message.
444 444 """
445 445 def __init__(self, res):
446 446 self.res = res
447 447
448 448 class ooberror(object):
449 449 """wireproto reply: failure of a batch of operation
450 450
451 451 Something failed during a batch call. The error message is stored in
452 452 `self.message`.
453 453 """
454 454 def __init__(self, message):
455 455 self.message = message
456 456
457 457 def dispatch(repo, proto, command):
458 458 repo = repo.filtered("served")
459 459 func, spec = commands[command]
460 460 args = proto.getargs(spec)
461 461 return func(repo, proto, *args)
462 462
463 463 def options(cmd, keys, others):
464 464 opts = {}
465 465 for k in keys:
466 466 if k in others:
467 467 opts[k] = others[k]
468 468 del others[k]
469 469 if others:
470 470 sys.stderr.write("abort: %s got unexpected arguments %s\n"
471 471 % (cmd, ",".join(others)))
472 472 return opts
473 473
474 474 # list of commands
475 475 commands = {}
476 476
477 477 def wireprotocommand(name, args=''):
478 478 """decorator for wireprotocol command"""
479 479 def register(func):
480 480 commands[name] = (func, args)
481 481 return func
482 482 return register
483 483
484 484 @wireprotocommand('batch', 'cmds *')
485 485 def batch(repo, proto, cmds, others):
486 486 repo = repo.filtered("served")
487 487 res = []
488 488 for pair in cmds.split(';'):
489 489 op, args = pair.split(' ', 1)
490 490 vals = {}
491 491 for a in args.split(','):
492 492 if a:
493 493 n, v = a.split('=')
494 494 vals[n] = unescapearg(v)
495 495 func, spec = commands[op]
496 496 if spec:
497 497 keys = spec.split()
498 498 data = {}
499 499 for k in keys:
500 500 if k == '*':
501 501 star = {}
502 502 for key in vals.keys():
503 503 if key not in keys:
504 504 star[key] = vals[key]
505 505 data['*'] = star
506 506 else:
507 507 data[k] = vals[k]
508 508 result = func(repo, proto, *[data[k] for k in keys])
509 509 else:
510 510 result = func(repo, proto)
511 511 if isinstance(result, ooberror):
512 512 return result
513 513 res.append(escapearg(result))
514 514 return ';'.join(res)
515 515
516 516 @wireprotocommand('between', 'pairs')
517 517 def between(repo, proto, pairs):
518 518 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
519 519 r = []
520 520 for b in repo.between(pairs):
521 521 r.append(encodelist(b) + "\n")
522 522 return "".join(r)
523 523
524 524 @wireprotocommand('branchmap')
525 525 def branchmap(repo, proto):
526 526 branchmap = repo.branchmap()
527 527 heads = []
528 528 for branch, nodes in branchmap.iteritems():
529 529 branchname = urllib.quote(encoding.fromlocal(branch))
530 530 branchnodes = encodelist(nodes)
531 531 heads.append('%s %s' % (branchname, branchnodes))
532 532 return '\n'.join(heads)
533 533
534 534 @wireprotocommand('branches', 'nodes')
535 535 def branches(repo, proto, nodes):
536 536 nodes = decodelist(nodes)
537 537 r = []
538 538 for b in repo.branches(nodes):
539 539 r.append(encodelist(b) + "\n")
540 540 return "".join(r)
541 541
542 542
543 543 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
544 544 'known', 'getbundle', 'unbundlehash', 'batch']
545 545
546 546 def _capabilities(repo, proto):
547 547 """return a list of capabilities for a repo
548 548
549 549 This function exists to allow extensions to easily wrap capabilities
550 550 computation
551 551
552 552 - returns a lists: easy to alter
553 553 - change done here will be propagated to both `capabilities` and `hello`
554 554 command without any other effort. without any other action needed.
555 555 """
556 556 # copy to prevent modification of the global list
557 557 caps = list(wireprotocaps)
558 558 if _allowstream(repo.ui):
559 559 if repo.ui.configbool('server', 'preferuncompressed', False):
560 560 caps.append('stream-preferred')
561 561 requiredformats = repo.requirements & repo.supportedformats
562 562 # if our local revlogs are just revlogv1, add 'stream' cap
563 563 if not requiredformats - set(('revlogv1',)):
564 564 caps.append('stream')
565 565 # otherwise, add 'streamreqs' detailing our local revlog format
566 566 else:
567 567 caps.append('streamreqs=%s' % ','.join(requiredformats))
568 568 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
569 569 caps.append('httpheader=1024')
570 570 return caps
571 571
572 572 # If you are writting and extension and consider wrapping this function. Wrap
573 573 # `_capabilities` instead.
574 574 @wireprotocommand('capabilities')
575 575 def capabilities(repo, proto):
576 576 return ' '.join(_capabilities(repo, proto))
577 577
578 578 @wireprotocommand('changegroup', 'roots')
579 579 def changegroup(repo, proto, roots):
580 580 nodes = decodelist(roots)
581 581 cg = changegroupmod.changegroup(repo, nodes, 'serve')
582 582 return streamres(proto.groupchunks(cg))
583 583
584 584 @wireprotocommand('changegroupsubset', 'bases heads')
585 585 def changegroupsubset(repo, proto, bases, heads):
586 586 bases = decodelist(bases)
587 587 heads = decodelist(heads)
588 588 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
589 589 return streamres(proto.groupchunks(cg))
590 590
591 591 @wireprotocommand('debugwireargs', 'one two *')
592 592 def debugwireargs(repo, proto, one, two, others):
593 593 # only accept optional args from the known set
594 594 opts = options('debugwireargs', ['three', 'four'], others)
595 595 return repo.debugwireargs(one, two, **opts)
596 596
597 597 @wireprotocommand('getbundle', '*')
598 598 def getbundle(repo, proto, others):
599 599 opts = options('getbundle', ['heads', 'common', 'bundlecaps'], others)
600 600 for k, v in opts.iteritems():
601 601 if k in ('heads', 'common'):
602 602 opts[k] = decodelist(v)
603 603 elif k == 'bundlecaps':
604 604 opts[k] = set(v.split(','))
605 605 cg = changegroupmod.getbundle(repo, 'serve', **opts)
606 606 return streamres(proto.groupchunks(cg))
607 607
608 608 @wireprotocommand('heads')
609 609 def heads(repo, proto):
610 610 h = repo.heads()
611 611 return encodelist(h) + "\n"
612 612
613 613 @wireprotocommand('hello')
614 614 def hello(repo, proto):
615 615 '''the hello command returns a set of lines describing various
616 616 interesting things about the server, in an RFC822-like format.
617 617 Currently the only one defined is "capabilities", which
618 618 consists of a line in the form:
619 619
620 620 capabilities: space separated list of tokens
621 621 '''
622 622 return "capabilities: %s\n" % (capabilities(repo, proto))
623 623
624 624 @wireprotocommand('listkeys', 'namespace')
625 625 def listkeys(repo, proto, namespace):
626 626 d = repo.listkeys(encoding.tolocal(namespace)).items()
627 627 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
628 628 for k, v in d])
629 629 return t
630 630
631 631 @wireprotocommand('lookup', 'key')
632 632 def lookup(repo, proto, key):
633 633 try:
634 634 k = encoding.tolocal(key)
635 635 c = repo[k]
636 636 r = c.hex()
637 637 success = 1
638 638 except Exception, inst:
639 639 r = str(inst)
640 640 success = 0
641 641 return "%s %s\n" % (success, r)
642 642
643 643 @wireprotocommand('known', 'nodes *')
644 644 def known(repo, proto, nodes, others):
645 645 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
646 646
647 647 @wireprotocommand('pushkey', 'namespace key old new')
648 648 def pushkey(repo, proto, namespace, key, old, new):
649 649 # compatibility with pre-1.8 clients which were accidentally
650 650 # sending raw binary nodes rather than utf-8-encoded hex
651 651 if len(new) == 20 and new.encode('string-escape') != new:
652 652 # looks like it could be a binary node
653 653 try:
654 654 new.decode('utf-8')
655 655 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
656 656 except UnicodeDecodeError:
657 657 pass # binary, leave unmodified
658 658 else:
659 659 new = encoding.tolocal(new) # normal path
660 660
661 661 if util.safehasattr(proto, 'restore'):
662 662
663 663 proto.redirect()
664 664
665 665 try:
666 666 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
667 667 encoding.tolocal(old), new) or False
668 668 except util.Abort:
669 669 r = False
670 670
671 671 output = proto.restore()
672 672
673 673 return '%s\n%s' % (int(r), output)
674 674
675 675 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
676 676 encoding.tolocal(old), new)
677 677 return '%s\n' % int(r)
678 678
679 679 def _allowstream(ui):
680 680 return ui.configbool('server', 'uncompressed', True, untrusted=True)
681 681
682 682 def _walkstreamfiles(repo):
683 683 # this is it's own function so extensions can override it
684 684 return repo.store.walk()
685 685
686 686 @wireprotocommand('stream_out')
687 687 def stream(repo, proto):
688 688 '''If the server supports streaming clone, it advertises the "stream"
689 689 capability with a value representing the version and flags of the repo
690 690 it is serving. Client checks to see if it understands the format.
691 691
692 692 The format is simple: the server writes out a line with the amount
693 693 of files, then the total amount of bytes to be transferred (separated
694 694 by a space). Then, for each file, the server first writes the filename
695 695 and filesize (separated by the null character), then the file contents.
696 696 '''
697 697
698 698 if not _allowstream(repo.ui):
699 699 return '1\n'
700 700
701 701 entries = []
702 702 total_bytes = 0
703 703 try:
704 704 # get consistent snapshot of repo, lock during scan
705 705 lock = repo.lock()
706 706 try:
707 707 repo.ui.debug('scanning\n')
708 708 for name, ename, size in _walkstreamfiles(repo):
709 709 if size:
710 710 entries.append((name, size))
711 711 total_bytes += size
712 712 finally:
713 713 lock.release()
714 714 except error.LockError:
715 715 return '2\n' # error: 2
716 716
717 717 def streamer(repo, entries, total):
718 718 '''stream out all metadata files in repository.'''
719 719 yield '0\n' # success
720 720 repo.ui.debug('%d files, %d bytes to transfer\n' %
721 721 (len(entries), total_bytes))
722 722 yield '%d %d\n' % (len(entries), total_bytes)
723 723
724 724 sopener = repo.sopener
725 725 oldaudit = sopener.mustaudit
726 726 debugflag = repo.ui.debugflag
727 727 sopener.mustaudit = False
728 728
729 729 try:
730 730 for name, size in entries:
731 731 if debugflag:
732 732 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
733 733 # partially encode name over the wire for backwards compat
734 734 yield '%s\0%d\n' % (store.encodedir(name), size)
735 735 if size <= 65536:
736 736 fp = sopener(name)
737 737 try:
738 738 data = fp.read(size)
739 739 finally:
740 740 fp.close()
741 741 yield data
742 742 else:
743 743 for chunk in util.filechunkiter(sopener(name), limit=size):
744 744 yield chunk
745 745 # replace with "finally:" when support for python 2.4 has been dropped
746 746 except Exception:
747 747 sopener.mustaudit = oldaudit
748 748 raise
749 749 sopener.mustaudit = oldaudit
750 750
751 751 return streamres(streamer(repo, entries, total_bytes))
752 752
753 753 @wireprotocommand('unbundle', 'heads')
754 754 def unbundle(repo, proto, heads):
755 755 their_heads = decodelist(heads)
756 756
757 def check_heads():
758 heads = repo.heads()
759 heads_hash = util.sha1(''.join(sorted(heads))).digest()
760 return (their_heads == ['force'] or their_heads == heads or
761 their_heads == ['hashed', heads_hash])
757 try:
758 proto.redirect()
762 759
763 proto.redirect()
760 exchange.check_heads(repo, their_heads, 'preparing changes')
764 761
765 # fail early if possible
766 if not check_heads():
767 return pusherr('repository changed while preparing changes - '
768 'please try again')
769
770 # write bundle data to temporary file because it can be big
771 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
772 fp = os.fdopen(fd, 'wb+')
773 r = 0
774 try:
775 proto.getfile(fp)
776 lock = repo.lock()
762 # write bundle data to temporary file because it can be big
763 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
764 fp = os.fdopen(fd, 'wb+')
765 r = 0
777 766 try:
778 if not check_heads():
779 # someone else committed/pushed/unbundled while we
780 # were transferring data
781 return pusherr('repository changed while uploading changes - '
782 'please try again')
783
784 # push can proceed
785 fp.seek(0)
786 gen = changegroupmod.readbundle(fp, None)
787
767 proto.getfile(fp)
768 lock = repo.lock()
788 769 try:
789 r = changegroupmod.addchangegroup(repo, gen, 'serve',
790 proto._client())
791 except util.Abort, inst:
792 sys.stderr.write("abort: %s\n" % inst)
770 exchange.check_heads(repo, their_heads, 'uploading changes')
771
772 # push can proceed
773 fp.seek(0)
774 gen = changegroupmod.readbundle(fp, None)
775
776 try:
777 r = changegroupmod.addchangegroup(repo, gen, 'serve',
778 proto._client())
779 except util.Abort, inst:
780 sys.stderr.write("abort: %s\n" % inst)
781 finally:
782 lock.release()
783 return pushres(r)
784
793 785 finally:
794 lock.release()
795 return pushres(r)
796
797 finally:
798 fp.close()
799 os.unlink(tempname)
786 fp.close()
787 os.unlink(tempname)
788 except exchange.PushRaced, exc:
789 return pusherr(str(exc))
General Comments 0
You need to be logged in to leave comments. Login now