getbundle: add a ``cg`` boolean argument to control changegroup inclusion...
Pierre-Yves David
r21989:bdb6d97f default
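The change itself is small: ``getbundle`` in exchange.py grows a ``cg`` keyword argument so callers can ask for a bundle2 stream without a changegroup part. A minimal usage sketch, assuming an existing local repository (the repository path and surrounding setup are illustrative, not part of the commit):

```python
from mercurial import hg, ui as uimod, exchange

repo = hg.repository(uimod.ui(), '.')  # assumes a repo in the cwd

# A bundle2 (HG2X) request may now omit the changegroup part entirely
# while still carrying other parts, e.g. listkeys data:
stream = exchange.getbundle(repo, 'pull',
                            bundlecaps=set(['HG2X']),
                            cg=False,             # the new flag
                            listkeys=['phases'])
```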
@@ -1,812 +1,816
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib
11 11 import util, scmutil, changegroup, base85, error
12 12 import discovery, phases, obsolete, bookmarks, bundle2, pushkey
13 13
14 14 def readbundle(ui, fh, fname, vfs=None):
15 15 header = changegroup.readexactly(fh, 4)
16 16
17 17 alg = None
18 18 if not fname:
19 19 fname = "stream"
20 20 if not header.startswith('HG') and header.startswith('\0'):
21 21 fh = changegroup.headerlessfixup(fh, header)
22 22 header = "HG10"
23 23 alg = 'UN'
24 24 elif vfs:
25 25 fname = vfs.join(fname)
26 26
27 27 magic, version = header[0:2], header[2:4]
28 28
29 29 if magic != 'HG':
30 30 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
31 31 if version == '10':
32 32 if alg is None:
33 33 alg = changegroup.readexactly(fh, 2)
34 34 return changegroup.unbundle10(fh, alg)
35 35 elif version == '2X':
36 36 return bundle2.unbundle20(ui, fh, header=magic + version)
37 37 else:
38 38 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
39 39
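As a reading aid, a hedged sketch of how ``readbundle`` above is driven (the file name is hypothetical; any ``hg bundle`` output would do):

```python
from mercurial import ui as uimod, exchange

fh = open('example.hg', 'rb')  # hypothetical file from `hg bundle`
gen = exchange.readbundle(uimod.ui(), fh, 'example.hg')
# HG10xx headers yield a changegroup.unbundle10, HG2X yields a
# bundle2.unbundle20, and a stream starting with NUL is fixed up and
# treated as an uncompressed HG10 bundle.
```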
40 40
41 41 class pushoperation(object):
42 42 """A object that represent a single push operation
43 43
44 44 It purpose is to carry push related state and very common operation.
45 45
46 46 A new should be created at the beginning of each push and discarded
47 47 afterward.
48 48 """
49 49
50 50 def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
51 51 # repo we push from
52 52 self.repo = repo
53 53 self.ui = repo.ui
54 54 # repo we push to
55 55 self.remote = remote
56 56 # force option provided
57 57 self.force = force
58 58 # revs to be pushed (None is "all")
59 59 self.revs = revs
60 60 # allow push of new branch
61 61 self.newbranch = newbranch
62 62 # did a local lock get acquired?
63 63 self.locallocked = None
64 64 # steps already performed
65 65 # (used to check what steps have already been performed through bundle2)
66 66 self.stepsdone = set()
67 67 # Integer version of the push result
68 68 # - None means nothing to push
69 69 # - 0 means HTTP error
70 70 # - 1 means we pushed and remote head count is unchanged *or*
71 71 # we have outgoing changesets but refused to push
72 72 # - other values as described by addchangegroup()
73 73 self.ret = None
74 74 # discover.outgoing object (contains common and outgoing data)
75 75 self.outgoing = None
76 76 # all remote heads before the push
77 77 self.remoteheads = None
78 78 # testable as a boolean indicating if any nodes are missing locally.
79 79 self.incoming = None
80 80 # set of all heads common after changeset bundle push
81 81 self.commonheads = None
82 82
83 83 def push(repo, remote, force=False, revs=None, newbranch=False):
84 84 '''Push outgoing changesets (limited by revs) from a local
85 85 repository to remote. Return an integer:
86 86 - None means nothing to push
87 87 - 0 means HTTP error
88 88 - 1 means we pushed and remote head count is unchanged *or*
89 89 we have outgoing changesets but refused to push
90 90 - other values as described by addchangegroup()
91 91 '''
92 92 pushop = pushoperation(repo, remote, force, revs, newbranch)
93 93 if pushop.remote.local():
94 94 missing = (set(pushop.repo.requirements)
95 95 - pushop.remote.local().supported)
96 96 if missing:
97 97 msg = _("required features are not"
98 98 " supported in the destination:"
99 99 " %s") % (', '.join(sorted(missing)))
100 100 raise util.Abort(msg)
101 101
102 102 # there are two ways to push to remote repo:
103 103 #
104 104 # addchangegroup assumes local user can lock remote
105 105 # repo (local filesystem, old ssh servers).
106 106 #
107 107 # unbundle assumes local user cannot lock remote repo (new ssh
108 108 # servers, http servers).
109 109
110 110 if not pushop.remote.canpush():
111 111 raise util.Abort(_("destination does not support push"))
112 112 # get local lock as we might write phase data
113 113 locallock = None
114 114 try:
115 115 locallock = pushop.repo.lock()
116 116 pushop.locallocked = True
117 117 except IOError, err:
118 118 pushop.locallocked = False
119 119 if err.errno != errno.EACCES:
120 120 raise
121 121 # source repo cannot be locked.
122 122 # We do not abort the push, but just disable the local phase
123 123 # synchronisation.
124 124 msg = 'cannot lock source repository: %s\n' % err
125 125 pushop.ui.debug(msg)
126 126 try:
127 127 pushop.repo.checkpush(pushop)
128 128 lock = None
129 129 unbundle = pushop.remote.capable('unbundle')
130 130 if not unbundle:
131 131 lock = pushop.remote.lock()
132 132 try:
133 133 _pushdiscovery(pushop)
134 134 if (pushop.repo.ui.configbool('experimental', 'bundle2-exp',
135 135 False)
136 136 and pushop.remote.capable('bundle2-exp')):
137 137 _pushbundle2(pushop)
138 138 _pushchangeset(pushop)
139 139 _pushcomputecommonheads(pushop)
140 140 _pushsyncphase(pushop)
141 141 _pushobsolete(pushop)
142 142 finally:
143 143 if lock is not None:
144 144 lock.release()
145 145 finally:
146 146 if locallock is not None:
147 147 locallock.release()
148 148
149 149 _pushbookmark(pushop)
150 150 return pushop.ret
151 151
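A hedged usage sketch of the entry point above (the destination URL is hypothetical; any peer obtained through ``hg.peer`` would do):

```python
from mercurial import hg, ui as uimod, exchange

ui = uimod.ui()
repo = hg.repository(ui, '.')                   # repo we push from
dest = hg.peer(ui, {}, 'ssh://example.com/r')   # hypothetical remote
ret = exchange.push(repo, dest, newbranch=True)
# ret: None = nothing to push, 0 = HTTP error, 1 = remote head count
# unchanged (or push refused), other values per addchangegroup()
```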
152 152 def _pushdiscovery(pushop):
153 153 # discovery
154 154 unfi = pushop.repo.unfiltered()
155 155 fci = discovery.findcommonincoming
156 156 commoninc = fci(unfi, pushop.remote, force=pushop.force)
157 157 common, inc, remoteheads = commoninc
158 158 fco = discovery.findcommonoutgoing
159 159 outgoing = fco(unfi, pushop.remote, onlyheads=pushop.revs,
160 160 commoninc=commoninc, force=pushop.force)
161 161 pushop.outgoing = outgoing
162 162 pushop.remoteheads = remoteheads
163 163 pushop.incoming = inc
164 164
165 165 def _pushcheckoutgoing(pushop):
166 166 outgoing = pushop.outgoing
167 167 unfi = pushop.repo.unfiltered()
168 168 if not outgoing.missing:
169 169 # nothing to push
170 170 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
171 171 return False
172 172 # something to push
173 173 if not pushop.force:
174 174 # if repo.obsstore == False --> no obsolete
175 175 # then, save the iteration
176 176 if unfi.obsstore:
177 177 # these messages live here because of the 80-char line limit
178 178 mso = _("push includes obsolete changeset: %s!")
179 179 mst = "push includes %s changeset: %s!"
180 180 # plain versions for i18n tool to detect them
181 181 _("push includes unstable changeset: %s!")
182 182 _("push includes bumped changeset: %s!")
183 183 _("push includes divergent changeset: %s!")
184 184 # If we are about to push, and there is at least one
185 185 # obsolete or unstable changeset in missing, at
186 186 # least one of the missing heads will be obsolete or
187 187 # unstable. So checking heads only is ok
188 188 for node in outgoing.missingheads:
189 189 ctx = unfi[node]
190 190 if ctx.obsolete():
191 191 raise util.Abort(mso % ctx)
192 192 elif ctx.troubled():
193 193 raise util.Abort(_(mst)
194 194 % (ctx.troubles()[0],
195 195 ctx))
196 196 newbm = pushop.ui.configlist('bookmarks', 'pushing')
197 197 discovery.checkheads(unfi, pushop.remote, outgoing,
198 198 pushop.remoteheads,
199 199 pushop.newbranch,
200 200 bool(pushop.incoming),
201 201 newbm)
202 202 return True
203 203
204 204 def _pushb2ctx(pushop, bundler):
205 205 """handle changegroup push through bundle2
206 206
207 207 addchangegroup result is stored in the ``pushop.ret`` attribute.
208 208 """
209 209 if 'changesets' in pushop.stepsdone:
210 210 return
211 211 pushop.stepsdone.add('changesets')
212 212 # Send known heads to the server for race detection.
214 214 if not _pushcheckoutgoing(pushop):
215 215 return
216 216 pushop.repo.prepushoutgoinghooks(pushop.repo,
217 217 pushop.remote,
218 218 pushop.outgoing)
219 219 if not pushop.force:
220 220 bundler.newpart('B2X:CHECK:HEADS', data=iter(pushop.remoteheads))
221 221 cg = changegroup.getlocalbundle(pushop.repo, 'push', pushop.outgoing)
222 222 cgpart = bundler.newpart('B2X:CHANGEGROUP', data=cg.getchunks())
223 223 def handlereply(op):
224 224 """extract addchangroup returns from server reply"""
225 225 cgreplies = op.records.getreplies(cgpart.id)
226 226 assert len(cgreplies['changegroup']) == 1
227 227 pushop.ret = cgreplies['changegroup'][0]['return']
228 228 return handlereply
229 229
230 230 # list of functions that may decide to add parts to an outgoing bundle2
231 231 bundle2partsgenerators = [_pushb2ctx]
232 232
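The list above is an extension point: every callable appended to it is invoked as ``partgen(pushop, bundler)``, and a returned callable is registered as a reply handler. A minimal sketch of a third-party generator (the part name and payload are hypothetical):

```python
def _pushb2myext(pushop, bundler):
    """add a hypothetical extra part to the outgoing bundle2"""
    if 'myext' in pushop.stepsdone:
        return
    pushop.stepsdone.add('myext')
    bundler.newpart('b2x:myext-data', data='payload')
    def handlereply(op):
        # inspect op.records for the server's answer, if any
        pass
    return handlereply

bundle2partsgenerators.append(_pushb2myext)
```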
233 233 def _pushbundle2(pushop):
234 234 """push data to the remote using bundle2
235 235
236 236 The only currently supported type of data is changegroup but this will
237 237 evolve in the future."""
238 238 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
239 239 # create reply capability
240 240 capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
241 241 bundler.newpart('b2x:replycaps', data=capsblob)
242 242 replyhandlers = []
243 243 for partgen in bundle2partsgenerators:
244 244 ret = partgen(pushop, bundler)
245 245 if callable(ret):
246 246 replyhandlers.append(ret)
247 247 # do not push if nothing to push
248 248 if bundler.nbparts <= 1:
249 249 return
250 250 stream = util.chunkbuffer(bundler.getchunks())
251 251 try:
252 252 reply = pushop.remote.unbundle(stream, ['force'], 'push')
253 253 except error.BundleValueError, exc:
254 254 raise util.Abort('missing support for %s' % exc)
255 255 try:
256 256 op = bundle2.processbundle(pushop.repo, reply)
257 257 except error.BundleValueError, exc:
258 258 raise util.Abort('missing support for %s' % exc)
259 259 for rephand in replyhandlers:
260 260 rephand(op)
261 261
262 262 def _pushchangeset(pushop):
263 263 """Make the actual push of changeset bundle to remote repo"""
264 264 if 'changesets' in pushop.stepsdone:
265 265 return
266 266 pushop.stepsdone.add('changesets')
267 267 if not _pushcheckoutgoing(pushop):
268 268 return
269 269 pushop.repo.prepushoutgoinghooks(pushop.repo,
270 270 pushop.remote,
271 271 pushop.outgoing)
272 272 outgoing = pushop.outgoing
273 273 unbundle = pushop.remote.capable('unbundle')
274 274 # TODO: get bundlecaps from remote
275 275 bundlecaps = None
276 276 # create a changegroup from local
277 277 if pushop.revs is None and not (outgoing.excluded
278 278 or pushop.repo.changelog.filteredrevs):
279 279 # push everything,
280 280 # use the fast path, no race possible on push
281 281 bundler = changegroup.bundle10(pushop.repo, bundlecaps)
282 282 cg = changegroup.getsubset(pushop.repo,
283 283 outgoing,
284 284 bundler,
285 285 'push',
286 286 fastpath=True)
287 287 else:
288 288 cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
289 289 bundlecaps)
290 290
291 291 # apply changegroup to remote
292 292 if unbundle:
293 293 # local repo finds heads on server, finds out what
294 294 # revs it must push. once revs transferred, if server
295 295 # finds it has different heads (someone else won
296 296 # commit/push race), server aborts.
297 297 if pushop.force:
298 298 remoteheads = ['force']
299 299 else:
300 300 remoteheads = pushop.remoteheads
301 301 # ssh: return remote's addchangegroup()
302 302 # http: return remote's addchangegroup() or 0 for error
303 303 pushop.ret = pushop.remote.unbundle(cg, remoteheads,
304 304 pushop.repo.url())
305 305 else:
306 306 # we return an integer indicating remote head count
307 307 # change
308 308 pushop.ret = pushop.remote.addchangegroup(cg, 'push', pushop.repo.url())
309 309
310 310 def _pushcomputecommonheads(pushop):
311 311 unfi = pushop.repo.unfiltered()
312 312 if pushop.ret:
313 313 # push succeeded, synchronize the target of the push
314 314 cheads = pushop.outgoing.missingheads
315 315 elif pushop.revs is None:
316 316 # All-out push failed. synchronize all common
317 317 cheads = pushop.outgoing.commonheads
318 318 else:
319 319 # I want cheads = heads(::missingheads and ::commonheads)
320 320 # (missingheads is revs with secret changeset filtered out)
321 321 #
322 322 # This can be expressed as:
323 323 # cheads = ( (missingheads and ::commonheads)
324 324 # + (commonheads and ::missingheads)
325 325 # )
326 326 #
327 327 # while trying to push we already computed the following:
328 328 # common = (::commonheads)
329 329 # missing = ((commonheads::missingheads) - commonheads)
330 330 #
331 331 # We can pick:
332 332 # * missingheads part of common (::commonheads)
333 333 common = set(pushop.outgoing.common)
334 334 nm = pushop.repo.changelog.nodemap
335 335 cheads = [node for node in pushop.revs if nm[node] in common]
336 336 # and
337 337 # * commonheads parents on missing
338 338 revset = unfi.set('%ln and parents(roots(%ln))',
339 339 pushop.outgoing.commonheads,
340 340 pushop.outgoing.missing)
341 341 cheads.extend(c.node() for c in revset)
342 342 pushop.commonheads = cheads
343 343
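A worked toy example of the partial-push branch above (hedged; letters stand in for real nodes):

```python
# Linear DAG, pushing revs=[D] to a remote that already has A and B:
#
#     A --- B --- C --- D        (B is a commonhead)
#
# If the push did not succeed (pushop.ret is falsy), common covers
# {A, B} and missing is {C, D}.  No pushed rev is itself in common,
# so the first pick is empty; roots(missing) is {C}, whose parent B
# is a commonhead, so cheads ends up as [B] -- exactly
# heads(::missingheads and ::commonheads).
```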
344 344 def _pushsyncphase(pushop):
345 345 """synchronise phase information locally and remotely"""
346 346 unfi = pushop.repo.unfiltered()
347 347 cheads = pushop.commonheads
348 348 # even when we don't push, exchanging phase data is useful
349 349 remotephases = pushop.remote.listkeys('phases')
350 350 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
351 351 and remotephases # server supports phases
352 352 and pushop.ret is None # nothing was pushed
353 353 and remotephases.get('publishing', False)):
354 354 # When:
355 355 # - this is a subrepo push
356 356 # - and remote supports phases
357 357 # - and no changeset was pushed
358 358 # - and remote is publishing
359 359 # We may be in the issue 3871 case!
360 360 # We drop the possible phase synchronisation done by
361 361 # courtesy to publish changesets possibly locally draft
362 362 # on the remote.
363 363 remotephases = {'publishing': 'True'}
364 364 if not remotephases: # old server, or public-only reply from a non-publishing one
365 365 _localphasemove(pushop, cheads)
366 366 # don't push any phase data as there is nothing to push
367 367 else:
368 368 ana = phases.analyzeremotephases(pushop.repo, cheads,
369 369 remotephases)
370 370 pheads, droots = ana
371 371 ### Apply remote phase on local
372 372 if remotephases.get('publishing', False):
373 373 _localphasemove(pushop, cheads)
374 374 else: # publish = False
375 375 _localphasemove(pushop, pheads)
376 376 _localphasemove(pushop, cheads, phases.draft)
377 377 ### Apply local phase on remote
378 378
379 379 # Get the list of all revs draft on remote but public here.
380 380 # XXX Beware that the revset breaks if droots is not strictly
381 381 # XXX roots; we may want to ensure it is, but that is costly
382 382 outdated = unfi.set('heads((%ln::%ln) and public())',
383 383 droots, cheads)
384 384
385 385 b2caps = bundle2.bundle2caps(pushop.remote)
386 386 if 'b2x:pushkey' in b2caps:
387 387 # server supports bundle2, let's do a batched push through it
388 388 #
389 389 # This will eventually be unified with the changesets bundle2 push
390 390 bundler = bundle2.bundle20(pushop.ui, b2caps)
391 391 capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
392 392 bundler.newpart('b2x:replycaps', data=capsblob)
393 393 part2node = []
394 394 enc = pushkey.encode
395 395 for newremotehead in outdated:
396 396 part = bundler.newpart('b2x:pushkey')
397 397 part.addparam('namespace', enc('phases'))
398 398 part.addparam('key', enc(newremotehead.hex()))
399 399 part.addparam('old', enc(str(phases.draft)))
400 400 part.addparam('new', enc(str(phases.public)))
401 401 part2node.append((part.id, newremotehead))
402 402 stream = util.chunkbuffer(bundler.getchunks())
403 403 try:
404 404 reply = pushop.remote.unbundle(stream, ['force'], 'push')
405 405 op = bundle2.processbundle(pushop.repo, reply)
406 406 except error.BundleValueError, exc:
407 407 raise util.Abort('missing support for %s' % exc)
408 408 for partid, node in part2node:
409 409 partrep = op.records.getreplies(partid)
410 410 results = partrep['pushkey']
411 411 assert len(results) <= 1
412 412 msg = None
413 413 if not results:
414 414 msg = _('server ignored update of %s to public!\n') % node
415 415 elif not int(results[0]['return']):
416 416 msg = _('updating %s to public failed!\n') % node
417 417 if msg is not None:
418 418 pushop.ui.warn(msg)
419 419
420 420 else:
421 421 # fall back to independent pushkey command
422 422 for newremotehead in outdated:
423 423 r = pushop.remote.pushkey('phases',
424 424 newremotehead.hex(),
425 425 str(phases.draft),
426 426 str(phases.public))
427 427 if not r:
428 428 pushop.ui.warn(_('updating %s to public failed!\n')
429 429 % newremotehead)
430 430
431 431 def _localphasemove(pushop, nodes, phase=phases.public):
432 432 """move <nodes> to <phase> in the local source repo"""
433 433 if pushop.locallocked:
434 434 phases.advanceboundary(pushop.repo, phase, nodes)
435 435 else:
436 436 # repo is not locked, do not change any phases!
437 437 # Informs the user that phases should have been moved when
438 438 # applicable.
439 439 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
440 440 phasestr = phases.phasenames[phase]
441 441 if actualmoves:
442 442 pushop.ui.status(_('cannot lock source repo, skipping '
443 443 'local %s phase update\n') % phasestr)
444 444
445 445 def _pushobsolete(pushop):
446 446 """utility function to push obsolete markers to a remote"""
447 447 pushop.ui.debug('try to push obsolete markers to remote\n')
448 448 repo = pushop.repo
449 449 remote = pushop.remote
450 450 if (obsolete._enabled and repo.obsstore and
451 451 'obsolete' in remote.listkeys('namespaces')):
452 452 rslts = []
453 453 remotedata = repo.listkeys('obsolete')
454 454 for key in sorted(remotedata, reverse=True):
455 455 # reverse sort to ensure we end with dump0
456 456 data = remotedata[key]
457 457 rslts.append(remote.pushkey('obsolete', key, '', data))
458 458 if [r for r in rslts if not r]:
459 459 msg = _('failed to push some obsolete markers!\n')
460 460 repo.ui.warn(msg)
461 461
462 462 def _pushbookmark(pushop):
463 463 """Update bookmark position on remote"""
464 464 ui = pushop.ui
465 465 repo = pushop.repo.unfiltered()
466 466 remote = pushop.remote
467 467 ui.debug("checking for updated bookmarks\n")
468 468 revnums = map(repo.changelog.rev, pushop.revs or [])
469 469 ancestors = [a for a in repo.changelog.ancestors(revnums, inclusive=True)]
470 470 (addsrc, adddst, advsrc, advdst, diverge, differ, invalid
471 471 ) = bookmarks.compare(repo, repo._bookmarks, remote.listkeys('bookmarks'),
472 472 srchex=hex)
473 473
474 474 for b, scid, dcid in advsrc:
475 475 if ancestors and repo[scid].rev() not in ancestors:
476 476 continue
477 477 if remote.pushkey('bookmarks', b, dcid, scid):
478 478 ui.status(_("updating bookmark %s\n") % b)
479 479 else:
480 480 ui.warn(_('updating bookmark %s failed!\n') % b)
481 481
482 482 class pulloperation(object):
483 483 """A object that represent a single pull operation
484 484
485 485 It purpose is to carry push related state and very common operation.
486 486
487 487 A new should be created at the beginning of each pull and discarded
488 488 afterward.
489 489 """
490 490
491 491 def __init__(self, repo, remote, heads=None, force=False):
492 492 # repo we pull into
493 493 self.repo = repo
494 494 # repo we pull from
495 495 self.remote = remote
496 496 # revision we try to pull (None is "all")
497 497 self.heads = heads
498 498 # do we force pull?
499 499 self.force = force
500 500 # the name of the pull transaction
501 501 self._trname = 'pull\n' + util.hidepassword(remote.url())
502 502 # hold the transaction once created
503 503 self._tr = None
504 504 # set of common changesets between local and remote before pull
505 505 self.common = None
506 506 # set of pulled heads
507 507 self.rheads = None
508 508 # list of missing changesets to fetch remotely
509 509 self.fetch = None
510 510 # result of changegroup pulling (used as return code by pull)
511 511 self.cgresult = None
512 512 # list of steps remaining to do (related to future bundle2 usage)
513 513 self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])
514 514
515 515 @util.propertycache
516 516 def pulledsubset(self):
517 517 """heads of the set of changeset target by the pull"""
518 518 # compute target subset
519 519 if self.heads is None:
520 520 # We pulled everything possible
521 521 # sync on everything common
522 522 c = set(self.common)
523 523 ret = list(self.common)
524 524 for n in self.rheads:
525 525 if n not in c:
526 526 ret.append(n)
527 527 return ret
528 528 else:
529 529 # We pulled a specific subset
530 530 # sync on this subset
531 531 return self.heads
532 532
533 533 def gettransaction(self):
534 534 """get appropriate pull transaction, creating it if needed"""
535 535 if self._tr is None:
536 536 self._tr = self.repo.transaction(self._trname)
537 537 return self._tr
538 538
539 539 def closetransaction(self):
540 540 """close transaction if created"""
541 541 if self._tr is not None:
542 542 self._tr.close()
543 543
544 544 def releasetransaction(self):
545 545 """release transaction if created"""
546 546 if self._tr is not None:
547 547 self._tr.release()
548 548
549 549 def pull(repo, remote, heads=None, force=False):
550 550 pullop = pulloperation(repo, remote, heads, force)
551 551 if pullop.remote.local():
552 552 missing = set(pullop.remote.requirements) - pullop.repo.supported
553 553 if missing:
554 554 msg = _("required features are not"
555 555 " supported in the destination:"
556 556 " %s") % (', '.join(sorted(missing)))
557 557 raise util.Abort(msg)
558 558
559 559 lock = pullop.repo.lock()
560 560 try:
561 561 _pulldiscovery(pullop)
562 562 if (pullop.repo.ui.configbool('experimental', 'bundle2-exp', False)
563 563 and pullop.remote.capable('bundle2-exp')):
564 564 _pullbundle2(pullop)
565 565 if 'changegroup' in pullop.todosteps:
566 566 _pullchangeset(pullop)
567 567 if 'phases' in pullop.todosteps:
568 568 _pullphase(pullop)
569 569 if 'obsmarkers' in pullop.todosteps:
570 570 _pullobsolete(pullop)
571 571 pullop.closetransaction()
572 572 finally:
573 573 pullop.releasetransaction()
574 574 lock.release()
575 575
576 576 return pullop.cgresult
577 577
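A hedged usage sketch mirroring the push example (the source URL is hypothetical):

```python
from mercurial import hg, ui as uimod, exchange

ui = uimod.ui()
repo = hg.repository(ui, '.')                     # repo we pull into
other = hg.peer(ui, {}, 'http://example.com/r')   # hypothetical source
cgresult = exchange.pull(repo, other, heads=None, force=False)
```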
578 578 def _pulldiscovery(pullop):
579 579 """discovery phase for the pull
580 580
581 581 Currently handles changeset discovery only; will change to handle all
582 582 discovery at some point."""
583 583 tmp = discovery.findcommonincoming(pullop.repo.unfiltered(),
584 584 pullop.remote,
585 585 heads=pullop.heads,
586 586 force=pullop.force)
587 587 pullop.common, pullop.fetch, pullop.rheads = tmp
588 588
589 589 def _pullbundle2(pullop):
590 590 """pull data using bundle2
591 591
592 592 For now, the only supported data is the changegroup."""
593 593 remotecaps = bundle2.bundle2caps(pullop.remote)
594 594 kwargs = {'bundlecaps': caps20to10(pullop.repo)}
595 595 # pulling changegroup
596 596 pullop.todosteps.remove('changegroup')
597 597
598 598 kwargs['common'] = pullop.common
599 599 kwargs['heads'] = pullop.heads or pullop.rheads
600 600 if 'b2x:listkeys' in remotecaps:
601 601 kwargs['listkeys'] = ['phase']
602 602 if not pullop.fetch:
603 603 pullop.repo.ui.status(_("no changes found\n"))
604 604 pullop.cgresult = 0
605 605 else:
606 606 if pullop.heads is None and list(pullop.common) == [nullid]:
607 607 pullop.repo.ui.status(_("requesting all changes\n"))
608 608 _pullbundle2extraprepare(pullop, kwargs)
609 609 if kwargs.keys() == ['format']:
610 610 return # nothing to pull
611 611 bundle = pullop.remote.getbundle('pull', **kwargs)
612 612 try:
613 613 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
614 614 except error.BundleValueError, exc:
615 615 raise util.Abort('missing support for %s' % exc)
616 616
617 617 if pullop.fetch:
618 618 assert len(op.records['changegroup']) == 1
619 619 pullop.cgresult = op.records['changegroup'][0]['return']
620 620
621 621 # processing phases change
622 622 for namespace, value in op.records['listkeys']:
623 623 if namespace == 'phases':
624 624 _pullapplyphases(pullop, value)
625 625
626 626 def _pullbundle2extraprepare(pullop, kwargs):
627 627 """hook function so that extensions can extend the getbundle call"""
628 628 pass
629 629
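A sketch of how an extension might use this hook through the standard ``extensions.wrapfunction`` mechanism (the extra argument name is hypothetical):

```python
from mercurial import exchange, extensions

def _myprepare(orig, pullop, kwargs):
    kwargs['myextarg'] = True   # hypothetical extra getbundle argument
    return orig(pullop, kwargs)

def extsetup(ui):
    extensions.wrapfunction(exchange, '_pullbundle2extraprepare',
                            _myprepare)
```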
630 630 def _pullchangeset(pullop):
631 631 """pull changeset from unbundle into the local repo"""
632 632 # We delay the opening of the transaction as late as possible so we
633 633 # don't open a transaction for nothing and don't break future useful
634 634 # rollback calls
635 635 pullop.todosteps.remove('changegroup')
636 636 if not pullop.fetch:
637 637 pullop.repo.ui.status(_("no changes found\n"))
638 638 pullop.cgresult = 0
639 639 return
640 640 pullop.gettransaction()
641 641 if pullop.heads is None and list(pullop.common) == [nullid]:
642 642 pullop.repo.ui.status(_("requesting all changes\n"))
643 643 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
644 644 # issue1320, avoid a race if remote changed after discovery
645 645 pullop.heads = pullop.rheads
646 646
647 647 if pullop.remote.capable('getbundle'):
648 648 # TODO: get bundlecaps from remote
649 649 cg = pullop.remote.getbundle('pull', common=pullop.common,
650 650 heads=pullop.heads or pullop.rheads)
651 651 elif pullop.heads is None:
652 652 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
653 653 elif not pullop.remote.capable('changegroupsubset'):
654 654 raise util.Abort(_("partial pull cannot be done because "
655 655 "other repository doesn't support "
656 656 "changegroupsubset."))
657 657 else:
658 658 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
659 659 pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
660 660 pullop.remote.url())
661 661
662 662 def _pullphase(pullop):
663 663 # Get remote phases data from remote
664 664 remotephases = pullop.remote.listkeys('phases')
665 665 _pullapplyphases(pullop, remotephases)
666 666
667 667 def _pullapplyphases(pullop, remotephases):
668 668 """apply phase movement from observed remote state"""
669 669 pullop.todosteps.remove('phases')
670 670 publishing = bool(remotephases.get('publishing', False))
671 671 if remotephases and not publishing:
672 672 # remote is new and non-publishing
673 673 pheads, _dr = phases.analyzeremotephases(pullop.repo,
674 674 pullop.pulledsubset,
675 675 remotephases)
676 676 phases.advanceboundary(pullop.repo, phases.public, pheads)
677 677 phases.advanceboundary(pullop.repo, phases.draft,
678 678 pullop.pulledsubset)
679 679 else:
680 680 # Remote is old or publishing; all common changesets
681 681 # should be seen as public
682 682 phases.advanceboundary(pullop.repo, phases.public,
683 683 pullop.pulledsubset)
684 684
685 685 def _pullobsolete(pullop):
686 686 """utility function to pull obsolete markers from a remote
687 687
688 688 `gettransaction` is a function that returns the pull transaction, creating
689 689 one if necessary. We return the transaction to inform the calling code that
690 690 a new transaction has been created (when applicable).
691 691
692 692 Exists mostly to allow overriding for experimentation purposes"""
693 693 pullop.todosteps.remove('obsmarkers')
694 694 tr = None
695 695 if obsolete._enabled:
696 696 pullop.repo.ui.debug('fetching remote obsolete markers\n')
697 697 remoteobs = pullop.remote.listkeys('obsolete')
698 698 if 'dump0' in remoteobs:
699 699 tr = pullop.gettransaction()
700 700 for key in sorted(remoteobs, reverse=True):
701 701 if key.startswith('dump'):
702 702 data = base85.b85decode(remoteobs[key])
703 703 pullop.repo.obsstore.mergemarkers(tr, data)
704 704 pullop.repo.invalidatevolatilesets()
705 705 return tr
706 706
707 707 def caps20to10(repo):
708 708 """return a set with appropriate options to use bundle20 during getbundle"""
709 709 caps = set(['HG2X'])
710 710 capsblob = bundle2.encodecaps(repo.bundle2caps)
711 711 caps.add('bundle2=' + urllib.quote(capsblob))
712 712 return caps
713 713
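Roughly, and with the blob contents elided, the returned set looks like this:

```python
# caps20to10(repo) -> set(['HG2X', 'bundle2=<urlquoted caps blob>'])
# i.e. the HG2X magic plus the repo's own bundle2 capabilities, carried
# over the legacy bundlecaps channel that getbundle already understands.
```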
714 714 def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
715 715 **kwargs):
716 716 """return a full bundle (with potentially multiple kind of parts)
717 717
718 718 Could be a bundle HG10 or a bundle HG2X depending on bundlecaps
719 719 passed. For now, the bundle can contain only changegroup, but this will
720 720 changes when more part type will be available for bundle2.
721 721
722 722 This is different from changegroup.getbundle that only returns an HG10
723 723 changegroup bundle. They may eventually get reunited in the future when we
724 724 have a clearer idea of the API we what to query different data.
725 725
726 726 The implementation is at a very early stage and will get massive rework
727 727 when the API of bundle is refined.
728 728 """
729 # build changegroup bundle here.
730 cg = changegroup.getbundle(repo, source, heads=heads,
731 common=common, bundlecaps=bundlecaps)
729 cg = None
730 if kwargs.get('cg', True):
731 # build changegroup bundle here.
732 cg = changegroup.getbundle(repo, source, heads=heads,
733 common=common, bundlecaps=bundlecaps)
734 elif 'HG2X' not in bundlecaps:
735 raise ValueError(_('request for bundle10 must include changegroup'))
732 736 if bundlecaps is None or 'HG2X' not in bundlecaps:
733 737 if kwargs:
734 738 raise ValueError(_('unsupported getbundle arguments: %s')
735 739 % ', '.join(sorted(kwargs.keys())))
736 740 return cg
737 741 # very crude first implementation,
738 742 # the bundle API will change and the generation will be done lazily.
739 743 b2caps = {}
740 744 for bcaps in bundlecaps:
741 745 if bcaps.startswith('bundle2='):
742 746 blob = urllib.unquote(bcaps[len('bundle2='):])
743 747 b2caps.update(bundle2.decodecaps(blob))
744 748 bundler = bundle2.bundle20(repo.ui, b2caps)
745 749 if cg:
746 750 bundler.newpart('b2x:changegroup', data=cg.getchunks())
747 751 listkeys = kwargs.get('listkeys', ())
748 752 for namespace in listkeys:
749 753 part = bundler.newpart('b2x:listkeys')
750 754 part.addparam('namespace', namespace)
751 755 keys = repo.listkeys(namespace).items()
752 756 part.data = pushkey.encodekeys(keys)
753 757 _getbundleextrapart(bundler, repo, source, heads=heads, common=common,
754 758 bundlecaps=bundlecaps, **kwargs)
755 759 return util.chunkbuffer(bundler.getchunks())
756 760
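Two hedged probes of the new ``cg`` handling above. Note that the first probe passes an empty set rather than None for ``bundlecaps``, since the new ``elif`` tests membership directly (``repo`` as in the earlier sketches):

```python
# 1) A bundle10 request (no HG2X) must keep its changegroup:
try:
    exchange.getbundle(repo, 'serve', bundlecaps=set(), cg=False)
except ValueError:
    pass  # 'request for bundle10 must include changegroup'

# 2) A bundle2 request may omit it and still carry listkeys parts:
stream = exchange.getbundle(repo, 'serve',
                            bundlecaps=exchange.caps20to10(repo),
                            cg=False, listkeys=['phases'])
```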
757 761 def _getbundleextrapart(bundler, repo, source, heads=None, common=None,
758 762 bundlecaps=None, **kwargs):
759 763 """hook function to let extensions add parts to the requested bundle"""
760 764 pass
761 765
762 766 def check_heads(repo, their_heads, context):
763 767 """check if the heads of a repo have been modified
764 768
765 769 Used by peer for unbundling.
766 770 """
767 771 heads = repo.heads()
768 772 heads_hash = util.sha1(''.join(sorted(heads))).digest()
769 773 if not (their_heads == ['force'] or their_heads == heads or
770 774 their_heads == ['hashed', heads_hash]):
771 775 # someone else committed/pushed/unbundled while we
772 776 # were transferring data
773 777 raise error.PushRaced('repository changed while %s - '
774 778 'please try again' % context)
775 779
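For reference, a sketch of how the ``['hashed', ...]`` form checked above could be produced on the sending side (``util.sha1`` wraps hashlib's sha1):

```python
from mercurial import util

heads = repo.heads()  # binary nodes, as used throughout this module
their_heads = ['hashed', util.sha1(''.join(sorted(heads))).digest()]
# passing their_heads (or the literal ['force']) satisfies check_heads
```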
776 780 def unbundle(repo, cg, heads, source, url):
777 781 """Apply a bundle to a repo.
778 782
779 783 This function makes sure the repo is locked during the application and has a
780 784 mechanism to check that no push race occurred between the creation of the
781 785 bundle and its application.
782 786
783 787 If the push was raced, a PushRaced exception is raised."""
784 788 r = 0
785 789 # need a transaction when processing a bundle2 stream
786 790 tr = None
787 791 lock = repo.lock()
788 792 try:
789 793 check_heads(repo, heads, 'uploading changes')
790 794 # push can proceed
791 795 if util.safehasattr(cg, 'params'):
792 796 try:
793 797 tr = repo.transaction('unbundle')
794 798 tr.hookargs['bundle2-exp'] = '1'
795 799 r = bundle2.processbundle(repo, cg, lambda: tr).reply
796 800 cl = repo.unfiltered().changelog
797 801 p = cl.writepending() and repo.root or ""
798 802 repo.hook('b2x-pretransactionclose', throw=True, source=source,
799 803 url=url, pending=p, **tr.hookargs)
800 804 tr.close()
801 805 repo.hook('b2x-transactionclose', source=source, url=url,
802 806 **tr.hookargs)
803 807 except Exception, exc:
804 808 exc.duringunbundle2 = True
805 809 raise
806 810 else:
807 811 r = changegroup.addchangegroup(repo, cg, source, url)
808 812 finally:
809 813 if tr is not None:
810 814 tr.release()
811 815 lock.release()
812 816 return r
@@ -1,1781 +1,1783
1 1 # localrepo.py - read/write repository class for mercurial
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 from node import hex, nullid, short
8 8 from i18n import _
9 9 import urllib
10 10 import peer, changegroup, subrepo, pushkey, obsolete, repoview
11 11 import changelog, dirstate, filelog, manifest, context, bookmarks, phases
12 12 import lock as lockmod
13 13 import transaction, store, encoding, exchange, bundle2
14 14 import scmutil, util, extensions, hook, error, revset
15 15 import match as matchmod
16 16 import merge as mergemod
17 17 import tags as tagsmod
18 18 from lock import release
19 19 import weakref, errno, os, time, inspect
20 20 import branchmap, pathutil
21 21 propertycache = util.propertycache
22 22 filecache = scmutil.filecache
23 23
24 24 class repofilecache(filecache):
25 25 """All filecache usage on repo are done for logic that should be unfiltered
26 26 """
27 27
28 28 def __get__(self, repo, type=None):
29 29 return super(repofilecache, self).__get__(repo.unfiltered(), type)
30 30 def __set__(self, repo, value):
31 31 return super(repofilecache, self).__set__(repo.unfiltered(), value)
32 32 def __delete__(self, repo):
33 33 return super(repofilecache, self).__delete__(repo.unfiltered())
34 34
35 35 class storecache(repofilecache):
36 36 """filecache for files in the store"""
37 37 def join(self, obj, fname):
38 38 return obj.sjoin(fname)
39 39
40 40 class unfilteredpropertycache(propertycache):
41 41 """propertycache that apply to unfiltered repo only"""
42 42
43 43 def __get__(self, repo, type=None):
44 44 unfi = repo.unfiltered()
45 45 if unfi is repo:
46 46 return super(unfilteredpropertycache, self).__get__(unfi)
47 47 return getattr(unfi, self.name)
48 48
49 49 class filteredpropertycache(propertycache):
50 50 """propertycache that must take filtering in account"""
51 51
52 52 def cachevalue(self, obj, value):
53 53 object.__setattr__(obj, self.name, value)
54 54
55 55
56 56 def hasunfilteredcache(repo, name):
57 57 """check if a repo has an unfilteredpropertycache value for <name>"""
58 58 return name in vars(repo.unfiltered())
59 59
60 60 def unfilteredmethod(orig):
61 61 """decorate method that always need to be run on unfiltered version"""
62 62 def wrapper(repo, *args, **kwargs):
63 63 return orig(repo.unfiltered(), *args, **kwargs)
64 64 return wrapper
65 65
66 66 moderncaps = set(('lookup', 'branchmap', 'pushkey', 'known', 'getbundle',
67 67 'unbundle'))
68 68 legacycaps = moderncaps.union(set(['changegroupsubset']))
69 69
70 70 class localpeer(peer.peerrepository):
71 71 '''peer for a local repo; reflects only the most recent API'''
72 72
73 73 def __init__(self, repo, caps=moderncaps):
74 74 peer.peerrepository.__init__(self)
75 75 self._repo = repo.filtered('served')
76 76 self.ui = repo.ui
77 77 self._caps = repo._restrictcapabilities(caps)
78 78 self.requirements = repo.requirements
79 79 self.supportedformats = repo.supportedformats
80 80
81 81 def close(self):
82 82 self._repo.close()
83 83
84 84 def _capabilities(self):
85 85 return self._caps
86 86
87 87 def local(self):
88 88 return self._repo
89 89
90 90 def canpush(self):
91 91 return True
92 92
93 93 def url(self):
94 94 return self._repo.url()
95 95
96 96 def lookup(self, key):
97 97 return self._repo.lookup(key)
98 98
99 99 def branchmap(self):
100 100 return self._repo.branchmap()
101 101
102 102 def heads(self):
103 103 return self._repo.heads()
104 104
105 105 def known(self, nodes):
106 106 return self._repo.known(nodes)
107 107
108 108 def getbundle(self, source, heads=None, common=None, bundlecaps=None,
109 109 format='HG10', **kwargs):
110 110 cg = exchange.getbundle(self._repo, source, heads=heads,
111 111 common=common, bundlecaps=bundlecaps, **kwargs)
112 112 if bundlecaps is not None and 'HG2X' in bundlecaps:
113 113 # When requesting a bundle2, getbundle returns a stream to make the
114 114 # wire level function happier. We need to build a proper object
115 115 # from it in local peer.
116 116 cg = bundle2.unbundle20(self.ui, cg)
117 117 return cg
118 118
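A hedged sketch of the pass-through: extra keywords such as the new ``cg`` flag travel through ``**kwargs`` into ``exchange.getbundle``, and an HG2X stream is rewrapped for the local caller:

```python
peer = repo.peer()  # localpeer around an open repo (assumption)
ret = peer.getbundle('pull', bundlecaps=set(['HG2X']),
                     cg=False, listkeys=['phases'])
# ret is a bundle2.unbundle20 here because HG2X was requested; without
# it, getbundle would have returned a plain changegroup object.
```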
119 119 # TODO We might want to move the next two calls into legacypeer and add
120 120 # unbundle instead.
121 121
122 122 def unbundle(self, cg, heads, url):
123 123 """apply a bundle on a repo
124 124
125 125 This function handles the repo locking itself."""
126 126 try:
127 127 cg = exchange.readbundle(self.ui, cg, None)
128 128 ret = exchange.unbundle(self._repo, cg, heads, 'push', url)
129 129 if util.safehasattr(ret, 'getchunks'):
130 130 # This is a bundle20 object, turn it into an unbundler.
131 131 # This little dance should be dropped eventually when the API
132 132 # is finally improved.
133 133 stream = util.chunkbuffer(ret.getchunks())
134 134 ret = bundle2.unbundle20(self.ui, stream)
135 135 return ret
136 136 except error.PushRaced, exc:
137 137 raise error.ResponseError(_('push failed:'), str(exc))
138 138
139 139 def lock(self):
140 140 return self._repo.lock()
141 141
142 142 def addchangegroup(self, cg, source, url):
143 143 return changegroup.addchangegroup(self._repo, cg, source, url)
144 144
145 145 def pushkey(self, namespace, key, old, new):
146 146 return self._repo.pushkey(namespace, key, old, new)
147 147
148 148 def listkeys(self, namespace):
149 149 return self._repo.listkeys(namespace)
150 150
151 151 def debugwireargs(self, one, two, three=None, four=None, five=None):
152 152 '''used to test argument passing over the wire'''
153 153 return "%s %s %s %s %s" % (one, two, three, four, five)
154 154
155 155 class locallegacypeer(localpeer):
156 156 '''peer extension which implements legacy methods too; used for tests with
157 157 restricted capabilities'''
158 158
159 159 def __init__(self, repo):
160 160 localpeer.__init__(self, repo, caps=legacycaps)
161 161
162 162 def branches(self, nodes):
163 163 return self._repo.branches(nodes)
164 164
165 165 def between(self, pairs):
166 166 return self._repo.between(pairs)
167 167
168 168 def changegroup(self, basenodes, source):
169 169 return changegroup.changegroup(self._repo, basenodes, source)
170 170
171 171 def changegroupsubset(self, bases, heads, source):
172 172 return changegroup.changegroupsubset(self._repo, bases, heads, source)
173 173
174 174 class localrepository(object):
175 175
176 176 supportedformats = set(('revlogv1', 'generaldelta'))
177 177 _basesupported = supportedformats | set(('store', 'fncache', 'shared',
178 178 'dotencode'))
179 179 openerreqs = set(('revlogv1', 'generaldelta'))
180 180 requirements = ['revlogv1']
181 181 filtername = None
182 182
183 183 bundle2caps = {'HG2X': (),
184 184 'b2x:listkeys': (),
185 'b2x:pushkey': ()}
185 'b2x:pushkey': (),
186 'b2x:changegroup': (),
187 }
186 188
187 189 # a list of (ui, featureset) functions.
188 190 # only functions defined in module of enabled extensions are invoked
189 191 featuresetupfuncs = set()
190 192
191 193 def _baserequirements(self, create):
192 194 return self.requirements[:]
193 195
194 196 def __init__(self, baseui, path=None, create=False):
195 197 self.wvfs = scmutil.vfs(path, expandpath=True, realpath=True)
196 198 self.wopener = self.wvfs
197 199 self.root = self.wvfs.base
198 200 self.path = self.wvfs.join(".hg")
199 201 self.origroot = path
200 202 self.auditor = pathutil.pathauditor(self.root, self._checknested)
201 203 self.vfs = scmutil.vfs(self.path)
202 204 self.opener = self.vfs
203 205 self.baseui = baseui
204 206 self.ui = baseui.copy()
205 207 self.ui.copy = baseui.copy # prevent copying repo configuration
206 208 # A list of callbacks to shape the phases if no data were found.
207 209 # Callbacks are in the form: func(repo, roots) --> processed roots.
208 210 # This list is to be filled by extensions during repo setup
209 211 self._phasedefaults = []
210 212 try:
211 213 self.ui.readconfig(self.join("hgrc"), self.root)
212 214 extensions.loadall(self.ui)
213 215 except IOError:
214 216 pass
215 217
216 218 if self.featuresetupfuncs:
217 219 self.supported = set(self._basesupported) # use private copy
218 220 extmods = set(m.__name__ for n, m
219 221 in extensions.extensions(self.ui))
220 222 for setupfunc in self.featuresetupfuncs:
221 223 if setupfunc.__module__ in extmods:
222 224 setupfunc(self.ui, self.supported)
223 225 else:
224 226 self.supported = self._basesupported
225 227
226 228 if not self.vfs.isdir():
227 229 if create:
228 230 if not self.wvfs.exists():
229 231 self.wvfs.makedirs()
230 232 self.vfs.makedir(notindexed=True)
231 233 requirements = self._baserequirements(create)
232 234 if self.ui.configbool('format', 'usestore', True):
233 235 self.vfs.mkdir("store")
234 236 requirements.append("store")
235 237 if self.ui.configbool('format', 'usefncache', True):
236 238 requirements.append("fncache")
237 239 if self.ui.configbool('format', 'dotencode', True):
238 240 requirements.append('dotencode')
239 241 # create an invalid changelog
240 242 self.vfs.append(
241 243 "00changelog.i",
242 244 '\0\0\0\2' # represents revlogv2
243 245 ' dummy changelog to prevent using the old repo layout'
244 246 )
245 247 if self.ui.configbool('format', 'generaldelta', False):
246 248 requirements.append("generaldelta")
247 249 requirements = set(requirements)
248 250 else:
249 251 raise error.RepoError(_("repository %s not found") % path)
250 252 elif create:
251 253 raise error.RepoError(_("repository %s already exists") % path)
252 254 else:
253 255 try:
254 256 requirements = scmutil.readrequires(self.vfs, self.supported)
255 257 except IOError, inst:
256 258 if inst.errno != errno.ENOENT:
257 259 raise
258 260 requirements = set()
259 261
260 262 self.sharedpath = self.path
261 263 try:
262 264 vfs = scmutil.vfs(self.vfs.read("sharedpath").rstrip('\n'),
263 265 realpath=True)
264 266 s = vfs.base
265 267 if not vfs.exists():
266 268 raise error.RepoError(
267 269 _('.hg/sharedpath points to nonexistent directory %s') % s)
268 270 self.sharedpath = s
269 271 except IOError, inst:
270 272 if inst.errno != errno.ENOENT:
271 273 raise
272 274
273 275 self.store = store.store(requirements, self.sharedpath, scmutil.vfs)
274 276 self.spath = self.store.path
275 277 self.svfs = self.store.vfs
276 278 self.sopener = self.svfs
277 279 self.sjoin = self.store.join
278 280 self.vfs.createmode = self.store.createmode
279 281 self._applyrequirements(requirements)
280 282 if create:
281 283 self._writerequirements()
282 284
283 285
284 286 self._branchcaches = {}
285 287 self.filterpats = {}
286 288 self._datafilters = {}
287 289 self._transref = self._lockref = self._wlockref = None
288 290
289 291 # A cache for various files under .hg/ that tracks file changes,
290 292 # (used by the filecache decorator)
291 293 #
292 294 # Maps a property name to its util.filecacheentry
293 295 self._filecache = {}
294 296
295 297 # hold sets of revision to be filtered
296 298 # should be cleared when something might have changed the filter value:
297 299 # - new changesets,
298 300 # - phase change,
299 301 # - new obsolescence marker,
300 302 # - working directory parent change,
301 303 # - bookmark changes
302 304 self.filteredrevcache = {}
303 305
304 306 def close(self):
305 307 pass
306 308
307 309 def _restrictcapabilities(self, caps):
308 310 # bundle2 is not ready for prime time, drop it unless explicitly
309 311 # required by the tests (or some brave tester)
310 312 if self.ui.configbool('experimental', 'bundle2-exp', False):
311 313 caps = set(caps)
312 314 capsblob = bundle2.encodecaps(self.bundle2caps)
313 315 caps.add('bundle2-exp=' + urllib.quote(capsblob))
314 316 return caps
315 317
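The gate above is the ``experimental.bundle2-exp`` knob read by ``configbool``; a sketch of enabling it (hgrc syntax shown in comments, assuming a test setup):

```python
# In the repository's hgrc:
#
#   [experimental]
#   bundle2-exp = True
#
# With this set, _restrictcapabilities advertises 'bundle2-exp=<blob>'
# and exchange.push/pull take their bundle2 code paths.
```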
316 318 def _applyrequirements(self, requirements):
317 319 self.requirements = requirements
318 320 self.sopener.options = dict((r, 1) for r in requirements
319 321 if r in self.openerreqs)
320 322 chunkcachesize = self.ui.configint('format', 'chunkcachesize')
321 323 if chunkcachesize is not None:
322 324 self.sopener.options['chunkcachesize'] = chunkcachesize
323 325
324 326 def _writerequirements(self):
325 327 reqfile = self.opener("requires", "w")
326 328 for r in sorted(self.requirements):
327 329 reqfile.write("%s\n" % r)
328 330 reqfile.close()
329 331
330 332 def _checknested(self, path):
331 333 """Determine if path is a legal nested repository."""
332 334 if not path.startswith(self.root):
333 335 return False
334 336 subpath = path[len(self.root) + 1:]
335 337 normsubpath = util.pconvert(subpath)
336 338
337 339 # XXX: Checking against the current working copy is wrong in
338 340 # the sense that it can reject things like
339 341 #
340 342 # $ hg cat -r 10 sub/x.txt
341 343 #
342 344 # if sub/ is no longer a subrepository in the working copy
343 345 # parent revision.
344 346 #
345 347 # However, it can of course also allow things that would have
346 348 # been rejected before, such as the above cat command if sub/
347 349 # is a subrepository now, but was a normal directory before.
348 350 # The old path auditor would have rejected by mistake since it
349 351 # panics when it sees sub/.hg/.
350 352 #
351 353 # All in all, checking against the working copy seems sensible
352 354 # since we want to prevent access to nested repositories on
353 355 # the filesystem *now*.
354 356 ctx = self[None]
355 357 parts = util.splitpath(subpath)
356 358 while parts:
357 359 prefix = '/'.join(parts)
358 360 if prefix in ctx.substate:
359 361 if prefix == normsubpath:
360 362 return True
361 363 else:
362 364 sub = ctx.sub(prefix)
363 365 return sub.checknested(subpath[len(prefix) + 1:])
364 366 else:
365 367 parts.pop()
366 368 return False
367 369
368 370 def peer(self):
369 371 return localpeer(self) # not cached to avoid reference cycle
370 372
371 373 def unfiltered(self):
372 374 """Return unfiltered version of the repository
373 375
374 376 Intended to be overwritten by filtered repo."""
375 377 return self
376 378
377 379 def filtered(self, name):
378 380 """Return a filtered version of a repository"""
379 381 # build a new class with the mixin and the current class
380 382 # (possibly subclass of the repo)
381 383 class proxycls(repoview.repoview, self.unfiltered().__class__):
382 384 pass
383 385 return proxycls(self, name)
384 386
385 387 @repofilecache('bookmarks')
386 388 def _bookmarks(self):
387 389 return bookmarks.bmstore(self)
388 390
389 391 @repofilecache('bookmarks.current')
390 392 def _bookmarkcurrent(self):
391 393 return bookmarks.readcurrent(self)
392 394
393 395 def bookmarkheads(self, bookmark):
394 396 name = bookmark.split('@', 1)[0]
395 397 heads = []
396 398 for mark, n in self._bookmarks.iteritems():
397 399 if mark.split('@', 1)[0] == name:
398 400 heads.append(n)
399 401 return heads
400 402
401 403 @storecache('phaseroots')
402 404 def _phasecache(self):
403 405 return phases.phasecache(self, self._phasedefaults)
404 406
405 407 @storecache('obsstore')
406 408 def obsstore(self):
407 409 store = obsolete.obsstore(self.sopener)
408 410 if store and not obsolete._enabled:
409 411 # message is rare enough to not be translated
410 412 msg = 'obsolete feature not enabled but %i markers found!\n'
411 413 self.ui.warn(msg % len(list(store)))
412 414 return store
413 415
414 416 @storecache('00changelog.i')
415 417 def changelog(self):
416 418 c = changelog.changelog(self.sopener)
417 419 if 'HG_PENDING' in os.environ:
418 420 p = os.environ['HG_PENDING']
419 421 if p.startswith(self.root):
420 422 c.readpending('00changelog.i.a')
421 423 return c
422 424
423 425 @storecache('00manifest.i')
424 426 def manifest(self):
425 427 return manifest.manifest(self.sopener)
426 428
427 429 @repofilecache('dirstate')
428 430 def dirstate(self):
429 431 warned = [0]
430 432 def validate(node):
431 433 try:
432 434 self.changelog.rev(node)
433 435 return node
434 436 except error.LookupError:
435 437 if not warned[0]:
436 438 warned[0] = True
437 439 self.ui.warn(_("warning: ignoring unknown"
438 440 " working parent %s!\n") % short(node))
439 441 return nullid
440 442
441 443 return dirstate.dirstate(self.opener, self.ui, self.root, validate)
442 444
443 445 def __getitem__(self, changeid):
444 446 if changeid is None:
445 447 return context.workingctx(self)
446 448 return context.changectx(self, changeid)
447 449
448 450 def __contains__(self, changeid):
449 451 try:
450 452 return bool(self.lookup(changeid))
451 453 except error.RepoLookupError:
452 454 return False
453 455
454 456 def __nonzero__(self):
455 457 return True
456 458
457 459 def __len__(self):
458 460 return len(self.changelog)
459 461
460 462 def __iter__(self):
461 463 return iter(self.changelog)
462 464
463 465 def revs(self, expr, *args):
464 466 '''Return a list of revisions matching the given revset'''
465 467 expr = revset.formatspec(expr, *args)
466 468 m = revset.match(None, expr)
467 469 return m(self, revset.spanset(self))
468 470
469 471 def set(self, expr, *args):
470 472 '''
471 473 Yield a context for each matching revision, after doing arg
472 474 replacement via revset.formatspec
473 475 '''
474 476 for r in self.revs(expr, *args):
475 477 yield self[r]
476 478
477 479 def url(self):
478 480 return 'file:' + self.root
479 481
480 482 def hook(self, name, throw=False, **args):
481 483 """Call a hook, passing this repo instance.
482 484
483 485 This is a convenience method to aid invoking hooks. Extensions likely
484 486 won't call this unless they have registered a custom hook or are
485 487 replacing code that is expected to call a hook.
486 488 """
487 489 return hook.hook(self.ui, self, name, throw, **args)
488 490
489 491 @unfilteredmethod
490 492 def _tag(self, names, node, message, local, user, date, extra={},
491 493 editor=False):
492 494 if isinstance(names, str):
493 495 names = (names,)
494 496
495 497 branches = self.branchmap()
496 498 for name in names:
497 499 self.hook('pretag', throw=True, node=hex(node), tag=name,
498 500 local=local)
499 501 if name in branches:
500 502 self.ui.warn(_("warning: tag %s conflicts with existing"
501 503 " branch name\n") % name)
502 504
503 505 def writetags(fp, names, munge, prevtags):
504 506 fp.seek(0, 2)
505 507 if prevtags and prevtags[-1] != '\n':
506 508 fp.write('\n')
507 509 for name in names:
508 510 m = munge and munge(name) or name
509 511 if (self._tagscache.tagtypes and
510 512 name in self._tagscache.tagtypes):
511 513 old = self.tags().get(name, nullid)
512 514 fp.write('%s %s\n' % (hex(old), m))
513 515 fp.write('%s %s\n' % (hex(node), m))
514 516 fp.close()
515 517
516 518 prevtags = ''
517 519 if local:
518 520 try:
519 521 fp = self.opener('localtags', 'r+')
520 522 except IOError:
521 523 fp = self.opener('localtags', 'a')
522 524 else:
523 525 prevtags = fp.read()
524 526
525 527 # local tags are stored in the current charset
526 528 writetags(fp, names, None, prevtags)
527 529 for name in names:
528 530 self.hook('tag', node=hex(node), tag=name, local=local)
529 531 return
530 532
531 533 try:
532 534 fp = self.wfile('.hgtags', 'rb+')
533 535 except IOError, e:
534 536 if e.errno != errno.ENOENT:
535 537 raise
536 538 fp = self.wfile('.hgtags', 'ab')
537 539 else:
538 540 prevtags = fp.read()
539 541
540 542 # committed tags are stored in UTF-8
541 543 writetags(fp, names, encoding.fromlocal, prevtags)
542 544
543 545 fp.close()
544 546
545 547 self.invalidatecaches()
546 548
547 549 if '.hgtags' not in self.dirstate:
548 550 self[None].add(['.hgtags'])
549 551
550 552 m = matchmod.exact(self.root, '', ['.hgtags'])
551 553 tagnode = self.commit(message, user, date, extra=extra, match=m,
552 554 editor=editor)
553 555
554 556 for name in names:
555 557 self.hook('tag', node=hex(node), tag=name, local=local)
556 558
557 559 return tagnode
558 560
559 561 def tag(self, names, node, message, local, user, date, editor=False):
560 562 '''tag a revision with one or more symbolic names.
561 563
562 564 names is a list of strings or, when adding a single tag, names may be a
563 565 string.
564 566
565 567 if local is True, the tags are stored in a per-repository file.
566 568 otherwise, they are stored in the .hgtags file, and a new
567 569 changeset is committed with the change.
568 570
569 571 keyword arguments:
570 572
571 573 local: whether to store tags in non-version-controlled file
572 574 (default False)
573 575
574 576 message: commit message to use if committing
575 577
576 578 user: name of user to use if committing
577 579
578 580 date: date tuple to use if committing'''
579 581
580 582 if not local:
581 583 for x in self.status()[:5]:
582 584 if '.hgtags' in x:
583 585 raise util.Abort(_('working copy of .hgtags is changed '
584 586 '(please commit .hgtags manually)'))
585 587
586 588 self.tags() # instantiate the cache
587 589 self._tag(names, node, message, local, user, date, editor=editor)
588 590
589 591 @filteredpropertycache
590 592 def _tagscache(self):
591 593 '''Returns a tagscache object that contains various tags related
592 594 caches.'''
593 595
594 596 # This simplifies its cache management by having one decorated
595 597 # function (this one) and the rest simply fetch things from it.
596 598 class tagscache(object):
597 599 def __init__(self):
598 600 # These two define the set of tags for this repository. tags
599 601 # maps tag name to node; tagtypes maps tag name to 'global' or
600 602 # 'local'. (Global tags are defined by .hgtags across all
601 603 # heads, and local tags are defined in .hg/localtags.)
602 604 # They constitute the in-memory cache of tags.
603 605 self.tags = self.tagtypes = None
604 606
605 607 self.nodetagscache = self.tagslist = None
606 608
607 609 cache = tagscache()
608 610 cache.tags, cache.tagtypes = self._findtags()
609 611
610 612 return cache
611 613
612 614 def tags(self):
613 615 '''return a mapping of tag to node'''
614 616 t = {}
615 617 if self.changelog.filteredrevs:
616 618 tags, tt = self._findtags()
617 619 else:
618 620 tags = self._tagscache.tags
619 621 for k, v in tags.iteritems():
620 622 try:
621 623 # ignore tags to unknown nodes
622 624 self.changelog.rev(v)
623 625 t[k] = v
624 626 except (error.LookupError, ValueError):
625 627 pass
626 628 return t
627 629
628 630 def _findtags(self):
629 631 '''Do the hard work of finding tags. Return a pair of dicts
630 632 (tags, tagtypes) where tags maps tag name to node, and tagtypes
631 633 maps tag name to a string like \'global\' or \'local\'.
632 634 Subclasses or extensions are free to add their own tags, but
633 635 should be aware that the returned dicts will be retained for the
634 636 duration of the localrepo object.'''
635 637
636 638 # XXX what tagtype should subclasses/extensions use? Currently
637 639 # mq and bookmarks add tags, but do not set the tagtype at all.
638 640 # Should each extension invent its own tag type? Should there
639 641 # be one tagtype for all such "virtual" tags? Or is the status
640 642 # quo fine?
641 643
642 644 alltags = {} # map tag name to (node, hist)
643 645 tagtypes = {}
644 646
645 647 tagsmod.findglobaltags(self.ui, self, alltags, tagtypes)
646 648 tagsmod.readlocaltags(self.ui, self, alltags, tagtypes)
647 649
648 650 # Build the return dicts. Have to re-encode tag names because
649 651 # the tags module always uses UTF-8 (in order not to lose info
650 652 # writing to the cache), but the rest of Mercurial wants them in
651 653 # local encoding.
652 654 tags = {}
653 655 for (name, (node, hist)) in alltags.iteritems():
654 656 if node != nullid:
655 657 tags[encoding.tolocal(name)] = node
656 658 tags['tip'] = self.changelog.tip()
657 659 tagtypes = dict([(encoding.tolocal(name), value)
658 660 for (name, value) in tagtypes.iteritems()])
659 661 return (tags, tagtypes)
660 662
661 663 def tagtype(self, tagname):
662 664 '''
663 665 return the type of the given tag. result can be:
664 666
665 667 'local' : a local tag
666 668 'global' : a global tag
667 669 None : tag does not exist
668 670 '''
669 671
670 672 return self._tagscache.tagtypes.get(tagname)
671 673
672 674 def tagslist(self):
673 675 '''return a list of tags ordered by revision'''
674 676 if not self._tagscache.tagslist:
675 677 l = []
676 678 for t, n in self.tags().iteritems():
677 679 r = self.changelog.rev(n)
678 680 l.append((r, t, n))
679 681 self._tagscache.tagslist = [(t, n) for r, t, n in sorted(l)]
680 682
681 683 return self._tagscache.tagslist
682 684
683 685 def nodetags(self, node):
684 686 '''return the tags associated with a node'''
685 687 if not self._tagscache.nodetagscache:
686 688 nodetagscache = {}
687 689 for t, n in self._tagscache.tags.iteritems():
688 690 nodetagscache.setdefault(n, []).append(t)
689 691 for tags in nodetagscache.itervalues():
690 692 tags.sort()
691 693 self._tagscache.nodetagscache = nodetagscache
692 694 return self._tagscache.nodetagscache.get(node, [])
693 695
694 696 def nodebookmarks(self, node):
695 697 marks = []
696 698 for bookmark, n in self._bookmarks.iteritems():
697 699 if n == node:
698 700 marks.append(bookmark)
699 701 return sorted(marks)
700 702
701 703 def branchmap(self):
702 704 '''returns a dictionary {branch: [branchheads]} with branchheads
703 705 ordered by increasing revision number'''
704 706 branchmap.updatecache(self)
705 707 return self._branchcaches[self.filtername]
706 708
707 709 def branchtip(self, branch):
708 710 '''return the tip node for a given branch'''
709 711 try:
710 712 return self.branchmap().branchtip(branch)
711 713 except KeyError:
712 714 raise error.RepoLookupError(_("unknown branch '%s'") % branch)
713 715
714 716 def lookup(self, key):
715 717 return self[key].node()
716 718
717 719 def lookupbranch(self, key, remote=None):
718 720 repo = remote or self
719 721 if key in repo.branchmap():
720 722 return key
721 723
722 724 repo = (remote and remote.local()) and remote or self
723 725 return repo[key].branch()
724 726
725 727 def known(self, nodes):
726 728 nm = self.changelog.nodemap
727 729 pc = self._phasecache
728 730 result = []
729 731 for n in nodes:
730 732 r = nm.get(n)
731 733 resp = not (r is None or pc.phase(self, r) >= phases.secret)
732 734 result.append(resp)
733 735 return result
734 736
735 737 def local(self):
736 738 return self
737 739
738 740 def cancopy(self):
739 741 # so statichttprepo's override of local() works
740 742 if not self.local():
741 743 return False
742 744 if not self.ui.configbool('phases', 'publish', True):
743 745 return True
744 746 # if publishing we can't copy if there is filtered content
745 747 return not self.filtered('visible').changelog.filteredrevs
746 748
747 749 def join(self, f):
748 750 return os.path.join(self.path, f)
749 751
750 752 def wjoin(self, f):
751 753 return os.path.join(self.root, f)
752 754
753 755 def file(self, f):
754 756 if f[0] == '/':
755 757 f = f[1:]
756 758 return filelog.filelog(self.sopener, f)
757 759
758 760 def changectx(self, changeid):
759 761 return self[changeid]
760 762
761 763 def parents(self, changeid=None):
762 764 '''get list of changectxs for parents of changeid'''
763 765 return self[changeid].parents()
764 766
765 767 def setparents(self, p1, p2=nullid):
766 768 copies = self.dirstate.setparents(p1, p2)
767 769 pctx = self[p1]
768 770 if copies:
769 771 # Adjust copy records, the dirstate cannot do it, it
770 772 # requires access to parents manifests. Preserve them
771 773 # only for entries added to first parent.
772 774 for f in copies:
773 775 if f not in pctx and copies[f] in pctx:
774 776 self.dirstate.copy(copies[f], f)
775 777 if p2 == nullid:
776 778 for f, s in sorted(self.dirstate.copies().items()):
777 779 if f not in pctx and s not in pctx:
778 780 self.dirstate.copy(None, f)
779 781
780 782 def filectx(self, path, changeid=None, fileid=None):
781 783 """changeid can be a changeset revision, node, or tag.
782 784 fileid can be a file revision or node."""
783 785 return context.filectx(self, path, changeid, fileid)
784 786
785 787 def getcwd(self):
786 788 return self.dirstate.getcwd()
787 789
788 790 def pathto(self, f, cwd=None):
789 791 return self.dirstate.pathto(f, cwd)
790 792
791 793 def wfile(self, f, mode='r'):
792 794 return self.wopener(f, mode)
793 795
794 796 def _link(self, f):
795 797 return self.wvfs.islink(f)
796 798
797 799 def _loadfilter(self, filter):
798 800 if filter not in self.filterpats:
799 801 l = []
800 802 for pat, cmd in self.ui.configitems(filter):
801 803 if cmd == '!':
802 804 continue
803 805 mf = matchmod.match(self.root, '', [pat])
804 806 fn = None
805 807 params = cmd
806 808 for name, filterfn in self._datafilters.iteritems():
807 809 if cmd.startswith(name):
808 810 fn = filterfn
809 811 params = cmd[len(name):].lstrip()
810 812 break
811 813 if not fn:
812 814 fn = lambda s, c, **kwargs: util.filter(s, c)
813 815 # Wrap old filters not supporting keyword arguments
814 816 if not inspect.getargspec(fn)[2]:
815 817 oldfn = fn
816 818 fn = lambda s, c, **kwargs: oldfn(s, c)
817 819 l.append((mf, fn, params))
818 820 self.filterpats[filter] = l
819 821 return self.filterpats[filter]
820 822
821 823 def _filter(self, filterpats, filename, data):
822 824 for mf, fn, cmd in filterpats:
823 825 if mf(filename):
824 826 self.ui.debug("filtering %s through %s\n" % (filename, cmd))
825 827 data = fn(data, cmd, ui=self.ui, repo=self, filename=filename)
826 828 break
827 829
828 830 return data
829 831
830 832 @unfilteredpropertycache
831 833 def _encodefilterpats(self):
832 834 return self._loadfilter('encode')
833 835
834 836 @unfilteredpropertycache
835 837 def _decodefilterpats(self):
836 838 return self._loadfilter('decode')
837 839
838 840 def adddatafilter(self, name, filter):
839 841 self._datafilters[name] = filter
840 842
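
A hypothetical sketch of the data-filter hookup above: adddatafilter() registers a named filter function, and _loadfilter() matches it against commands configured in the [encode]/[decode] sections. The 'cleanup:' name and the pattern below are made up for illustration:

def stripcr(s, cmd, **kwargs):
    # normalize CRLF line endings on the way into the store
    return s.replace('\r\n', '\n')

repo.adddatafilter('cleanup:', stripcr)
# matching configuration, so _loadfilter('encode') picks the filter up:
#   [encode]
#   **.txt = cleanup:
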
841 843 def wread(self, filename):
842 844 if self._link(filename):
843 845 data = self.wvfs.readlink(filename)
844 846 else:
845 847 data = self.wopener.read(filename)
846 848 return self._filter(self._encodefilterpats, filename, data)
847 849
848 850 def wwrite(self, filename, data, flags):
849 851 data = self._filter(self._decodefilterpats, filename, data)
850 852 if 'l' in flags:
851 853 self.wopener.symlink(data, filename)
852 854 else:
853 855 self.wopener.write(filename, data)
854 856 if 'x' in flags:
855 857 self.wvfs.setflags(filename, False, True)
856 858
857 859 def wwritedata(self, filename, data):
858 860 return self._filter(self._decodefilterpats, filename, data)
859 861
860 862 def transaction(self, desc, report=None):
861 863 tr = self._transref and self._transref() or None
862 864 if tr and tr.running():
863 865 return tr.nest()
864 866
865 867 # abort here if the journal already exists
866 868 if self.svfs.exists("journal"):
867 869 raise error.RepoError(
868 870 _("abandoned transaction found"),
869 871 hint=_("run 'hg recover' to clean up transaction"))
870 872
871 873 def onclose():
872 874 self.store.write(self._transref())
873 875
874 876 self._writejournal(desc)
875 877 renames = [(vfs, x, undoname(x)) for vfs, x in self._journalfiles()]
876 878 rp = report and report or self.ui.warn
877 879 tr = transaction.transaction(rp, self.sopener,
878 880 "journal",
879 881 aftertrans(renames),
880 882 self.store.createmode,
881 883 onclose)
882 884 self._transref = weakref.ref(tr)
883 885 return tr
884 886
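
A minimal usage sketch of the transaction API above, following the pattern used by commitctx() further down (assuming an open localrepository `repo`):

tr = repo.transaction("my-operation")
try:
    # ... write store data through the transaction ...
    tr.close()      # make the changes permanent
finally:
    tr.release()    # rolls back automatically if close() was never reached
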
885 887 def _journalfiles(self):
886 888 return ((self.svfs, 'journal'),
887 889 (self.vfs, 'journal.dirstate'),
888 890 (self.vfs, 'journal.branch'),
889 891 (self.vfs, 'journal.desc'),
890 892 (self.vfs, 'journal.bookmarks'),
891 893 (self.svfs, 'journal.phaseroots'))
892 894
893 895 def undofiles(self):
894 896 return [(vfs, undoname(x)) for vfs, x in self._journalfiles()]
895 897
896 898 def _writejournal(self, desc):
897 899 self.opener.write("journal.dirstate",
898 900 self.opener.tryread("dirstate"))
899 901 self.opener.write("journal.branch",
900 902 encoding.fromlocal(self.dirstate.branch()))
901 903 self.opener.write("journal.desc",
902 904 "%d\n%s\n" % (len(self), desc))
903 905 self.opener.write("journal.bookmarks",
904 906 self.opener.tryread("bookmarks"))
905 907 self.sopener.write("journal.phaseroots",
906 908 self.sopener.tryread("phaseroots"))
907 909
908 910 def recover(self):
909 911 lock = self.lock()
910 912 try:
911 913 if self.svfs.exists("journal"):
912 914 self.ui.status(_("rolling back interrupted transaction\n"))
913 915 transaction.rollback(self.sopener, "journal",
914 916 self.ui.warn)
915 917 self.invalidate()
916 918 return True
917 919 else:
918 920 self.ui.warn(_("no interrupted transaction available\n"))
919 921 return False
920 922 finally:
921 923 lock.release()
922 924
923 925 def rollback(self, dryrun=False, force=False):
924 926 wlock = lock = None
925 927 try:
926 928 wlock = self.wlock()
927 929 lock = self.lock()
928 930 if self.svfs.exists("undo"):
929 931 return self._rollback(dryrun, force)
930 932 else:
931 933 self.ui.warn(_("no rollback information available\n"))
932 934 return 1
933 935 finally:
934 936 release(lock, wlock)
935 937
936 938 @unfilteredmethod # Until we get smarter cache management
937 939 def _rollback(self, dryrun, force):
938 940 ui = self.ui
939 941 try:
940 942 args = self.opener.read('undo.desc').splitlines()
941 943 (oldlen, desc, detail) = (int(args[0]), args[1], None)
942 944 if len(args) >= 3:
943 945 detail = args[2]
944 946 oldtip = oldlen - 1
945 947
946 948 if detail and ui.verbose:
947 949 msg = (_('repository tip rolled back to revision %s'
948 950 ' (undo %s: %s)\n')
949 951 % (oldtip, desc, detail))
950 952 else:
951 953 msg = (_('repository tip rolled back to revision %s'
952 954 ' (undo %s)\n')
953 955 % (oldtip, desc))
954 956 except IOError:
955 957 msg = _('rolling back unknown transaction\n')
956 958 desc = None
957 959
958 960 if not force and self['.'] != self['tip'] and desc == 'commit':
959 961 raise util.Abort(
960 962 _('rollback of last commit while not checked out '
961 963 'may lose data'), hint=_('use -f to force'))
962 964
963 965 ui.status(msg)
964 966 if dryrun:
965 967 return 0
966 968
967 969 parents = self.dirstate.parents()
968 970 self.destroying()
969 971 transaction.rollback(self.sopener, 'undo', ui.warn)
970 972 if self.vfs.exists('undo.bookmarks'):
971 973 self.vfs.rename('undo.bookmarks', 'bookmarks')
972 974 if self.svfs.exists('undo.phaseroots'):
973 975 self.svfs.rename('undo.phaseroots', 'phaseroots')
974 976 self.invalidate()
975 977
976 978 parentgone = (parents[0] not in self.changelog.nodemap or
977 979 parents[1] not in self.changelog.nodemap)
978 980 if parentgone:
979 981 self.vfs.rename('undo.dirstate', 'dirstate')
980 982 try:
981 983 branch = self.opener.read('undo.branch')
982 984 self.dirstate.setbranch(encoding.tolocal(branch))
983 985 except IOError:
984 986 ui.warn(_('named branch could not be reset: '
985 987 'current branch is still \'%s\'\n')
986 988 % self.dirstate.branch())
987 989
988 990 self.dirstate.invalidate()
989 991 parents = tuple([p.rev() for p in self.parents()])
990 992 if len(parents) > 1:
991 993 ui.status(_('working directory now based on '
992 994 'revisions %d and %d\n') % parents)
993 995 else:
994 996 ui.status(_('working directory now based on '
995 997 'revision %d\n') % parents)
996 998 # TODO: if we know which new heads may result from this rollback, pass
997 999 # them to destroy(), which will prevent the branchhead cache from being
998 1000 # invalidated.
999 1001 self.destroyed()
1000 1002 return 0
1001 1003
1002 1004 def invalidatecaches(self):
1003 1005
1004 1006 if '_tagscache' in vars(self):
1005 1007 # can't use delattr on proxy
1006 1008 del self.__dict__['_tagscache']
1007 1009
1008 1010 self.unfiltered()._branchcaches.clear()
1009 1011 self.invalidatevolatilesets()
1010 1012
1011 1013 def invalidatevolatilesets(self):
1012 1014 self.filteredrevcache.clear()
1013 1015 obsolete.clearobscaches(self)
1014 1016
1015 1017 def invalidatedirstate(self):
1016 1018 '''Invalidates the dirstate, causing the next call to dirstate
1017 1019 to check if it was modified since the last time it was read,
1018 1020 rereading it if it has.
1019 1021
1020 1022 This is different from dirstate.invalidate() in that it doesn't
1021 1023 always reread the dirstate. Use dirstate.invalidate() if you want to
1022 1024 explicitly read the dirstate again (i.e. restoring it to a previous
1023 1025 known good state).'''
1024 1026 if hasunfilteredcache(self, 'dirstate'):
1025 1027 for k in self.dirstate._filecache:
1026 1028 try:
1027 1029 delattr(self.dirstate, k)
1028 1030 except AttributeError:
1029 1031 pass
1030 1032 delattr(self.unfiltered(), 'dirstate')
1031 1033
1032 1034 def invalidate(self):
1033 1035 unfiltered = self.unfiltered() # all file caches are stored unfiltered
1034 1036 for k in self._filecache:
1035 1037 # dirstate is invalidated separately in invalidatedirstate()
1036 1038 if k == 'dirstate':
1037 1039 continue
1038 1040
1039 1041 try:
1040 1042 delattr(unfiltered, k)
1041 1043 except AttributeError:
1042 1044 pass
1043 1045 self.invalidatecaches()
1044 1046 self.store.invalidatecaches()
1045 1047
1046 1048 def invalidateall(self):
1047 1049 '''Fully invalidates both store and non-store parts, causing the
1048 1050 subsequent operation to reread any outside changes.'''
1049 1051 # extension should hook this to invalidate its caches
1050 1052 self.invalidate()
1051 1053 self.invalidatedirstate()
1052 1054
1053 1055 def _lock(self, vfs, lockname, wait, releasefn, acquirefn, desc):
1054 1056 try:
1055 1057 l = lockmod.lock(vfs, lockname, 0, releasefn, desc=desc)
1056 1058 except error.LockHeld, inst:
1057 1059 if not wait:
1058 1060 raise
1059 1061 self.ui.warn(_("waiting for lock on %s held by %r\n") %
1060 1062 (desc, inst.locker))
1061 1063 # default to 600 seconds timeout
1062 1064 l = lockmod.lock(vfs, lockname,
1063 1065 int(self.ui.config("ui", "timeout", "600")),
1064 1066 releasefn, desc=desc)
1065 1067 self.ui.warn(_("got lock after %s seconds\n") % l.delay)
1066 1068 if acquirefn:
1067 1069 acquirefn()
1068 1070 return l
1069 1071
1070 1072 def _afterlock(self, callback):
1071 1073 """add a callback to the current repository lock.
1072 1074
1073 1075 The callback will be executed on lock release."""
1074 1076 l = self._lockref and self._lockref()
1075 1077 if l:
1076 1078 l.postrelease.append(callback)
1077 1079 else:
1078 1080 callback()
1079 1081
1080 1082 def lock(self, wait=True):
1081 1083 '''Lock the repository store (.hg/store) and return a weak reference
1082 1084 to the lock. Use this before modifying the store (e.g. committing or
1083 1085 stripping). If you are opening a transaction, get a lock as well.'''
1084 1086 l = self._lockref and self._lockref()
1085 1087 if l is not None and l.held:
1086 1088 l.lock()
1087 1089 return l
1088 1090
1089 1091 def unlock():
1090 1092 if hasunfilteredcache(self, '_phasecache'):
1091 1093 self._phasecache.write()
1092 1094 for k, ce in self._filecache.items():
1093 1095 if k == 'dirstate' or k not in self.__dict__:
1094 1096 continue
1095 1097 ce.refresh()
1096 1098
1097 1099 l = self._lock(self.svfs, "lock", wait, unlock,
1098 1100 self.invalidate, _('repository %s') % self.origroot)
1099 1101 self._lockref = weakref.ref(l)
1100 1102 return l
1101 1103
1102 1104 def wlock(self, wait=True):
1103 1105 '''Lock the non-store parts of the repository (everything under
1104 1106 .hg except .hg/store) and return a weak reference to the lock.
1105 1107 Use this before modifying files in .hg.'''
1106 1108 l = self._wlockref and self._wlockref()
1107 1109 if l is not None and l.held:
1108 1110 l.lock()
1109 1111 return l
1110 1112
1111 1113 def unlock():
1112 1114 self.dirstate.write()
1113 1115 self._filecache['dirstate'].refresh()
1114 1116
1115 1117 l = self._lock(self.vfs, "wlock", wait, unlock,
1116 1118 self.invalidatedirstate, _('working directory of %s') %
1117 1119 self.origroot)
1118 1120 self._wlockref = weakref.ref(l)
1119 1121 return l
1120 1122
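
The lock-ordering convention implied by lock() and wlock(), as followed by rollback() above (sketch only): take wlock before lock and release in reverse order.

wlock = lock = None
try:
    wlock = repo.wlock()    # guards the non-store parts of .hg
    lock = repo.lock()      # guards .hg/store
    # ... modify the repository ...
finally:
    release(lock, wlock)    # the lockmod release helper used in this file
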
1121 1123 def _filecommit(self, fctx, manifest1, manifest2, linkrev, tr, changelist):
1122 1124 """
1123 1125 commit an individual file as part of a larger transaction
1124 1126 """
1125 1127
1126 1128 fname = fctx.path()
1127 1129 text = fctx.data()
1128 1130 flog = self.file(fname)
1129 1131 fparent1 = manifest1.get(fname, nullid)
1130 1132 fparent2 = fparent2o = manifest2.get(fname, nullid)
1131 1133
1132 1134 meta = {}
1133 1135 copy = fctx.renamed()
1134 1136 if copy and copy[0] != fname:
1135 1137 # Mark the new revision of this file as a copy of another
1136 1138 # file. This copy data will effectively act as a parent
1137 1139 # of this new revision. If this is a merge, the first
1138 1140 # parent will be the nullid (meaning "look up the copy data")
1139 1141 # and the second one will be the other parent. For example:
1140 1142 #
1141 1143 # 0 --- 1 --- 3 rev1 changes file foo
1142 1144 # \ / rev2 renames foo to bar and changes it
1143 1145 # \- 2 -/ rev3 should have bar with all changes and
1144 1146 # should record that bar descends from
1145 1147 # bar in rev2 and foo in rev1
1146 1148 #
1147 1149 # this allows this merge to succeed:
1148 1150 #
1149 1151 # 0 --- 1 --- 3 rev4 reverts the content change from rev2
1150 1152 # \ / merging rev3 and rev4 should use bar@rev2
1151 1153 # \- 2 --- 4 as the merge base
1152 1154 #
1153 1155
1154 1156 cfname = copy[0]
1155 1157 crev = manifest1.get(cfname)
1156 1158 newfparent = fparent2
1157 1159
1158 1160 if manifest2: # branch merge
1159 1161 if fparent2 == nullid or crev is None: # copied on remote side
1160 1162 if cfname in manifest2:
1161 1163 crev = manifest2[cfname]
1162 1164 newfparent = fparent1
1163 1165
1164 1166 # find source in nearest ancestor if we've lost track
1165 1167 if not crev:
1166 1168 self.ui.debug(" %s: searching for copy revision for %s\n" %
1167 1169 (fname, cfname))
1168 1170 for ancestor in self[None].ancestors():
1169 1171 if cfname in ancestor:
1170 1172 crev = ancestor[cfname].filenode()
1171 1173 break
1172 1174
1173 1175 if crev:
1174 1176 self.ui.debug(" %s: copy %s:%s\n" % (fname, cfname, hex(crev)))
1175 1177 meta["copy"] = cfname
1176 1178 meta["copyrev"] = hex(crev)
1177 1179 fparent1, fparent2 = nullid, newfparent
1178 1180 else:
1179 1181 self.ui.warn(_("warning: can't find ancestor for '%s' "
1180 1182 "copied from '%s'!\n") % (fname, cfname))
1181 1183
1182 1184 elif fparent1 == nullid:
1183 1185 fparent1, fparent2 = fparent2, nullid
1184 1186 elif fparent2 != nullid:
1185 1187 # is one parent an ancestor of the other?
1186 1188 fparentancestors = flog.commonancestorsheads(fparent1, fparent2)
1187 1189 if fparent1 in fparentancestors:
1188 1190 fparent1, fparent2 = fparent2, nullid
1189 1191 elif fparent2 in fparentancestors:
1190 1192 fparent2 = nullid
1191 1193
1192 1194 # is the file changed?
1193 1195 if fparent2 != nullid or flog.cmp(fparent1, text) or meta:
1194 1196 changelist.append(fname)
1195 1197 return flog.add(text, meta, tr, linkrev, fparent1, fparent2)
1196 1198
1197 1199 # are just the flags changed during merge?
1198 1200 if fparent1 != fparent2o and manifest1.flags(fname) != fctx.flags():
1199 1201 changelist.append(fname)
1200 1202
1201 1203 return fparent1
1202 1204
1203 1205 @unfilteredmethod
1204 1206 def commit(self, text="", user=None, date=None, match=None, force=False,
1205 1207 editor=False, extra={}):
1206 1208 """Add a new revision to current repository.
1207 1209
1208 1210 Revision information is gathered from the working directory,
1209 1211 match can be used to filter the committed files. If editor is
1210 1212 supplied, it is called to get a commit message.
1211 1213 """
1212 1214
1213 1215 def fail(f, msg):
1214 1216 raise util.Abort('%s: %s' % (f, msg))
1215 1217
1216 1218 if not match:
1217 1219 match = matchmod.always(self.root, '')
1218 1220
1219 1221 if not force:
1220 1222 vdirs = []
1221 1223 match.explicitdir = vdirs.append
1222 1224 match.bad = fail
1223 1225
1224 1226 wlock = self.wlock()
1225 1227 try:
1226 1228 wctx = self[None]
1227 1229 merge = len(wctx.parents()) > 1
1228 1230
1229 1231 if (not force and merge and match and
1230 1232 (match.files() or match.anypats())):
1231 1233 raise util.Abort(_('cannot partially commit a merge '
1232 1234 '(do not specify files or patterns)'))
1233 1235
1234 1236 changes = self.status(match=match, clean=force)
1235 1237 if force:
1236 1238 changes[0].extend(changes[6]) # mq may commit unchanged files
1237 1239
1238 1240 # check subrepos
1239 1241 subs = []
1240 1242 commitsubs = set()
1241 1243 newstate = wctx.substate.copy()
1242 1244 # only manage subrepos and .hgsubstate if .hgsub is present
1243 1245 if '.hgsub' in wctx:
1244 1246 # we'll decide whether to track this ourselves, thanks
1245 1247 for c in changes[:3]:
1246 1248 if '.hgsubstate' in c:
1247 1249 c.remove('.hgsubstate')
1248 1250
1249 1251 # compare current state to last committed state
1250 1252 # build new substate based on last committed state
1251 1253 oldstate = wctx.p1().substate
1252 1254 for s in sorted(newstate.keys()):
1253 1255 if not match(s):
1254 1256 # ignore working copy, use old state if present
1255 1257 if s in oldstate:
1256 1258 newstate[s] = oldstate[s]
1257 1259 continue
1258 1260 if not force:
1259 1261 raise util.Abort(
1260 1262 _("commit with new subrepo %s excluded") % s)
1261 1263 if wctx.sub(s).dirty(True):
1262 1264 if not self.ui.configbool('ui', 'commitsubrepos'):
1263 1265 raise util.Abort(
1264 1266 _("uncommitted changes in subrepo %s") % s,
1265 1267 hint=_("use --subrepos for recursive commit"))
1266 1268 subs.append(s)
1267 1269 commitsubs.add(s)
1268 1270 else:
1269 1271 bs = wctx.sub(s).basestate()
1270 1272 newstate[s] = (newstate[s][0], bs, newstate[s][2])
1271 1273 if oldstate.get(s, (None, None, None))[1] != bs:
1272 1274 subs.append(s)
1273 1275
1274 1276 # check for removed subrepos
1275 1277 for p in wctx.parents():
1276 1278 r = [s for s in p.substate if s not in newstate]
1277 1279 subs += [s for s in r if match(s)]
1278 1280 if subs:
1279 1281 if (not match('.hgsub') and
1280 1282 '.hgsub' in (wctx.modified() + wctx.added())):
1281 1283 raise util.Abort(
1282 1284 _("can't commit subrepos without .hgsub"))
1283 1285 changes[0].insert(0, '.hgsubstate')
1284 1286
1285 1287 elif '.hgsub' in changes[2]:
1286 1288 # clean up .hgsubstate when .hgsub is removed
1287 1289 if ('.hgsubstate' in wctx and
1288 1290 '.hgsubstate' not in changes[0] + changes[1] + changes[2]):
1289 1291 changes[2].insert(0, '.hgsubstate')
1290 1292
1291 1293 # make sure all explicit patterns are matched
1292 1294 if not force and match.files():
1293 1295 matched = set(changes[0] + changes[1] + changes[2])
1294 1296
1295 1297 for f in match.files():
1296 1298 f = self.dirstate.normalize(f)
1297 1299 if f == '.' or f in matched or f in wctx.substate:
1298 1300 continue
1299 1301 if f in changes[3]: # missing
1300 1302 fail(f, _('file not found!'))
1301 1303 if f in vdirs: # visited directory
1302 1304 d = f + '/'
1303 1305 for mf in matched:
1304 1306 if mf.startswith(d):
1305 1307 break
1306 1308 else:
1307 1309 fail(f, _("no match under directory!"))
1308 1310 elif f not in self.dirstate:
1309 1311 fail(f, _("file not tracked!"))
1310 1312
1311 1313 cctx = context.workingctx(self, text, user, date, extra, changes)
1312 1314
1313 1315 if (not force and not extra.get("close") and not merge
1314 1316 and not cctx.files()
1315 1317 and wctx.branch() == wctx.p1().branch()):
1316 1318 return None
1317 1319
1318 1320 if merge and cctx.deleted():
1319 1321 raise util.Abort(_("cannot commit merge with missing files"))
1320 1322
1321 1323 ms = mergemod.mergestate(self)
1322 1324 for f in changes[0]:
1323 1325 if f in ms and ms[f] == 'u':
1324 1326 raise util.Abort(_("unresolved merge conflicts "
1325 1327 "(see hg help resolve)"))
1326 1328
1327 1329 if editor:
1328 1330 cctx._text = editor(self, cctx, subs)
1329 1331 edited = (text != cctx._text)
1330 1332
1331 1333 # Save commit message in case this transaction gets rolled back
1332 1334 # (e.g. by a pretxncommit hook). Leave the content alone on
1333 1335 # the assumption that the user will use the same editor again.
1334 1336 msgfn = self.savecommitmessage(cctx._text)
1335 1337
1336 1338 # commit subs and write new state
1337 1339 if subs:
1338 1340 for s in sorted(commitsubs):
1339 1341 sub = wctx.sub(s)
1340 1342 self.ui.status(_('committing subrepository %s\n') %
1341 1343 subrepo.subrelpath(sub))
1342 1344 sr = sub.commit(cctx._text, user, date)
1343 1345 newstate[s] = (newstate[s][0], sr)
1344 1346 subrepo.writestate(self, newstate)
1345 1347
1346 1348 p1, p2 = self.dirstate.parents()
1347 1349 hookp1, hookp2 = hex(p1), (p2 != nullid and hex(p2) or '')
1348 1350 try:
1349 1351 self.hook("precommit", throw=True, parent1=hookp1,
1350 1352 parent2=hookp2)
1351 1353 ret = self.commitctx(cctx, True)
1352 1354 except: # re-raises
1353 1355 if edited:
1354 1356 self.ui.write(
1355 1357 _('note: commit message saved in %s\n') % msgfn)
1356 1358 raise
1357 1359
1358 1360 # update bookmarks, dirstate and mergestate
1359 1361 bookmarks.update(self, [p1, p2], ret)
1360 1362 cctx.markcommitted(ret)
1361 1363 ms.reset()
1362 1364 finally:
1363 1365 wlock.release()
1364 1366
1365 1367 def commithook(node=hex(ret), parent1=hookp1, parent2=hookp2):
1366 1368 self.hook("commit", node=node, parent1=parent1, parent2=parent2)
1367 1369 self._afterlock(commithook)
1368 1370 return ret
1369 1371
1370 1372 @unfilteredmethod
1371 1373 def commitctx(self, ctx, error=False):
1372 1374 """Add a new revision to current repository.
1373 1375 Revision information is passed via the context argument.
1374 1376 """
1375 1377
1376 1378 tr = lock = None
1377 1379 removed = list(ctx.removed())
1378 1380 p1, p2 = ctx.p1(), ctx.p2()
1379 1381 user = ctx.user()
1380 1382
1381 1383 lock = self.lock()
1382 1384 try:
1383 1385 tr = self.transaction("commit")
1384 1386 trp = weakref.proxy(tr)
1385 1387
1386 1388 if ctx.files():
1387 1389 m1 = p1.manifest().copy()
1388 1390 m2 = p2.manifest()
1389 1391
1390 1392 # check in files
1391 1393 new = {}
1392 1394 changed = []
1393 1395 linkrev = len(self)
1394 1396 for f in sorted(ctx.modified() + ctx.added()):
1395 1397 self.ui.note(f + "\n")
1396 1398 try:
1397 1399 fctx = ctx[f]
1398 1400 new[f] = self._filecommit(fctx, m1, m2, linkrev, trp,
1399 1401 changed)
1400 1402 m1.set(f, fctx.flags())
1401 1403 except OSError, inst:
1402 1404 self.ui.warn(_("trouble committing %s!\n") % f)
1403 1405 raise
1404 1406 except IOError, inst:
1405 1407 errcode = getattr(inst, 'errno', errno.ENOENT)
1406 1408 if error or errcode and errcode != errno.ENOENT:
1407 1409 self.ui.warn(_("trouble committing %s!\n") % f)
1408 1410 raise
1409 1411 else:
1410 1412 removed.append(f)
1411 1413
1412 1414 # update manifest
1413 1415 m1.update(new)
1414 1416 removed = [f for f in sorted(removed) if f in m1 or f in m2]
1415 1417 drop = [f for f in removed if f in m1]
1416 1418 for f in drop:
1417 1419 del m1[f]
1418 1420 mn = self.manifest.add(m1, trp, linkrev, p1.manifestnode(),
1419 1421 p2.manifestnode(), (new, drop))
1420 1422 files = changed + removed
1421 1423 else:
1422 1424 mn = p1.manifestnode()
1423 1425 files = []
1424 1426
1425 1427 # update changelog
1426 1428 self.changelog.delayupdate()
1427 1429 n = self.changelog.add(mn, files, ctx.description(),
1428 1430 trp, p1.node(), p2.node(),
1429 1431 user, ctx.date(), ctx.extra().copy())
1430 1432 p = lambda: self.changelog.writepending() and self.root or ""
1431 1433 xp1, xp2 = p1.hex(), p2 and p2.hex() or ''
1432 1434 self.hook('pretxncommit', throw=True, node=hex(n), parent1=xp1,
1433 1435 parent2=xp2, pending=p)
1434 1436 self.changelog.finalize(trp)
1435 1437 # set the new commit in its proper phase
1436 1438 targetphase = subrepo.newcommitphase(self.ui, ctx)
1437 1439 if targetphase:
1438 1440 # retracting the boundary does not alter parent changesets.
1439 1441 # if a parent has a higher phase, the resulting phase will
1440 1442 # be compliant anyway
1441 1443 #
1442 1444 # if minimal phase was 0 we don't need to retract anything
1443 1445 phases.retractboundary(self, targetphase, [n])
1444 1446 tr.close()
1445 1447 branchmap.updatecache(self.filtered('served'))
1446 1448 return n
1447 1449 finally:
1448 1450 if tr:
1449 1451 tr.release()
1450 1452 lock.release()
1451 1453
1452 1454 @unfilteredmethod
1453 1455 def destroying(self):
1454 1456 '''Inform the repository that nodes are about to be destroyed.
1455 1457 Intended for use by strip and rollback, so there's a common
1456 1458 place for anything that has to be done before destroying history.
1457 1459
1458 1460 This is mostly useful for saving state that is in memory and waiting
1459 1461 to be flushed when the current lock is released. Because a call to
1460 1462 destroyed is imminent, the repo will be invalidated causing those
1461 1463 changes to stay in memory (waiting for the next unlock), or vanish
1462 1464 completely.
1463 1465 '''
1464 1466 # When using the same lock to commit and strip, the phasecache is left
1465 1467 # dirty after committing. Then when we strip, the repo is invalidated,
1466 1468 # causing those changes to disappear.
1467 1469 if '_phasecache' in vars(self):
1468 1470 self._phasecache.write()
1469 1471
1470 1472 @unfilteredmethod
1471 1473 def destroyed(self):
1472 1474 '''Inform the repository that nodes have been destroyed.
1473 1475 Intended for use by strip and rollback, so there's a common
1474 1476 place for anything that has to be done after destroying history.
1475 1477 '''
1476 1478 # When one tries to:
1477 1479 # 1) destroy nodes thus calling this method (e.g. strip)
1478 1480 # 2) use phasecache somewhere (e.g. commit)
1479 1481 #
1480 1482 # then 2) will fail because the phasecache contains nodes that were
1481 1483 # removed. We can either remove phasecache from the filecache,
1482 1484 # causing it to reload next time it is accessed, or simply filter
1483 1485 # the removed nodes now and write the updated cache.
1484 1486 self._phasecache.filterunknown(self)
1485 1487 self._phasecache.write()
1486 1488
1487 1489 # update the 'served' branch cache to help read only server process
1488 1490 # Thanks to branchcache collaboration this is done from the nearest
1489 1491 # filtered subset and it is expected to be fast.
1490 1492 branchmap.updatecache(self.filtered('served'))
1491 1493
1492 1494 # Ensure the persistent tag cache is updated. Doing it now
1493 1495 # means that the tag cache only has to worry about destroyed
1494 1496 # heads immediately after a strip/rollback. That in turn
1495 1497 # guarantees that "cachetip == currenttip" (comparing both rev
1496 1498 # and node) always means no nodes have been added or destroyed.
1497 1499
1498 1500 # XXX this is suboptimal when qrefresh'ing: we strip the current
1499 1501 # head, refresh the tag cache, then immediately add a new head.
1500 1502 # But I think doing it this way is necessary for the "instant
1501 1503 # tag cache retrieval" case to work.
1502 1504 self.invalidate()
1503 1505
1504 1506 def walk(self, match, node=None):
1505 1507 '''
1506 1508 walk recursively through the directory tree or a given
1507 1509 changeset, finding all files matched by the match
1508 1510 function
1509 1511 '''
1510 1512 return self[node].walk(match)
1511 1513
1512 1514 def status(self, node1='.', node2=None, match=None,
1513 1515 ignored=False, clean=False, unknown=False,
1514 1516 listsubrepos=False):
1515 1517 '''a convenience method that calls node1.status(node2)'''
1516 1518 return self[node1].status(node2, match, ignored, clean, unknown,
1517 1519 listsubrepos)
1518 1520
1519 1521 def heads(self, start=None):
1520 1522 heads = self.changelog.heads(start)
1521 1523 # sort the output in rev descending order
1522 1524 return sorted(heads, key=self.changelog.rev, reverse=True)
1523 1525
1524 1526 def branchheads(self, branch=None, start=None, closed=False):
1525 1527 '''return a (possibly filtered) list of heads for the given branch
1526 1528
1527 1529 Heads are returned in topological order, from newest to oldest.
1528 1530 If branch is None, use the dirstate branch.
1529 1531 If start is not None, return only heads reachable from start.
1530 1532 If closed is True, return heads that are marked as closed as well.
1531 1533 '''
1532 1534 if branch is None:
1533 1535 branch = self[None].branch()
1534 1536 branches = self.branchmap()
1535 1537 if branch not in branches:
1536 1538 return []
1537 1539 # the cache returns heads ordered lowest to highest
1538 1540 bheads = list(reversed(branches.branchheads(branch, closed=closed)))
1539 1541 if start is not None:
1540 1542 # filter out the heads that cannot be reached from startrev
1541 1543 fbheads = set(self.changelog.nodesbetween([start], bheads)[2])
1542 1544 bheads = [h for h in bheads if h in fbheads]
1543 1545 return bheads
1544 1546
1545 1547 def branches(self, nodes):
1546 1548 if not nodes:
1547 1549 nodes = [self.changelog.tip()]
1548 1550 b = []
1549 1551 for n in nodes:
1550 1552 t = n
1551 1553 while True:
1552 1554 p = self.changelog.parents(n)
1553 1555 if p[1] != nullid or p[0] == nullid:
1554 1556 b.append((t, n, p[0], p[1]))
1555 1557 break
1556 1558 n = p[0]
1557 1559 return b
1558 1560
1559 1561 def between(self, pairs):
1560 1562 r = []
1561 1563
1562 1564 for top, bottom in pairs:
1563 1565 n, l, i = top, [], 0
1564 1566 f = 1
1565 1567
1566 1568 while n != bottom and n != nullid:
1567 1569 p = self.changelog.parents(n)[0]
1568 1570 if i == f:
1569 1571 l.append(n)
1570 1572 f = f * 2
1571 1573 n = p
1572 1574 i += 1
1573 1575
1574 1576 r.append(l)
1575 1577
1576 1578 return r
1577 1579
1578 1580 def pull(self, remote, heads=None, force=False):
1579 1581 return exchange.pull(self, remote, heads, force)
1580 1582
1581 1583 def checkpush(self, pushop):
1582 1584 """Extensions can override this function if additional checks have
1583 1585 to be performed before pushing, or call it if they override push
1584 1586 command.
1585 1587 """
1586 1588 pass
1587 1589
1588 1590 @unfilteredpropertycache
1589 1591 def prepushoutgoinghooks(self):
1590 1592 """Return util.hooks consists of "(repo, remote, outgoing)"
1591 1593 functions, which are called before pushing changesets.
1592 1594 """
1593 1595 return util.hooks()
1594 1596
1595 1597 def push(self, remote, force=False, revs=None, newbranch=False):
1596 1598 return exchange.push(self, remote, force, revs, newbranch)
1597 1599
1598 1600 def stream_in(self, remote, requirements):
1599 1601 lock = self.lock()
1600 1602 try:
1601 1603 # Save remote branchmap. We will use it later
1602 1604 # to speed up branchcache creation
1603 1605 rbranchmap = None
1604 1606 if remote.capable("branchmap"):
1605 1607 rbranchmap = remote.branchmap()
1606 1608
1607 1609 fp = remote.stream_out()
1608 1610 l = fp.readline()
1609 1611 try:
1610 1612 resp = int(l)
1611 1613 except ValueError:
1612 1614 raise error.ResponseError(
1613 1615 _('unexpected response from remote server:'), l)
1614 1616 if resp == 1:
1615 1617 raise util.Abort(_('operation forbidden by server'))
1616 1618 elif resp == 2:
1617 1619 raise util.Abort(_('locking the remote repository failed'))
1618 1620 elif resp != 0:
1619 1621 raise util.Abort(_('the server sent an unknown error code'))
1620 1622 self.ui.status(_('streaming all changes\n'))
1621 1623 l = fp.readline()
1622 1624 try:
1623 1625 total_files, total_bytes = map(int, l.split(' ', 1))
1624 1626 except (ValueError, TypeError):
1625 1627 raise error.ResponseError(
1626 1628 _('unexpected response from remote server:'), l)
1627 1629 self.ui.status(_('%d files to transfer, %s of data\n') %
1628 1630 (total_files, util.bytecount(total_bytes)))
1629 1631 handled_bytes = 0
1630 1632 self.ui.progress(_('clone'), 0, total=total_bytes)
1631 1633 start = time.time()
1632 1634
1633 1635 tr = self.transaction(_('clone'))
1634 1636 try:
1635 1637 for i in xrange(total_files):
1636 1638 # XXX doesn't support '\n' or '\r' in filenames
1637 1639 l = fp.readline()
1638 1640 try:
1639 1641 name, size = l.split('\0', 1)
1640 1642 size = int(size)
1641 1643 except (ValueError, TypeError):
1642 1644 raise error.ResponseError(
1643 1645 _('unexpected response from remote server:'), l)
1644 1646 if self.ui.debugflag:
1645 1647 self.ui.debug('adding %s (%s)\n' %
1646 1648 (name, util.bytecount(size)))
1647 1649 # for backwards compat, name was partially encoded
1648 1650 ofp = self.sopener(store.decodedir(name), 'w')
1649 1651 for chunk in util.filechunkiter(fp, limit=size):
1650 1652 handled_bytes += len(chunk)
1651 1653 self.ui.progress(_('clone'), handled_bytes,
1652 1654 total=total_bytes)
1653 1655 ofp.write(chunk)
1654 1656 ofp.close()
1655 1657 tr.close()
1656 1658 finally:
1657 1659 tr.release()
1658 1660
1659 1661 # Writing straight to files circumvented the inmemory caches
1660 1662 self.invalidate()
1661 1663
1662 1664 elapsed = time.time() - start
1663 1665 if elapsed <= 0:
1664 1666 elapsed = 0.001
1665 1667 self.ui.progress(_('clone'), None)
1666 1668 self.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
1667 1669 (util.bytecount(total_bytes), elapsed,
1668 1670 util.bytecount(total_bytes / elapsed)))
1669 1671
1670 1672 # new requirements = old non-format requirements +
1671 1673 # new format-related requirements
1672 1674 # from the streamed-in repository
1673 1675 requirements.update(set(self.requirements) - self.supportedformats)
1674 1676 self._applyrequirements(requirements)
1675 1677 self._writerequirements()
1676 1678
1677 1679 if rbranchmap:
1678 1680 rbheads = []
1679 1681 for bheads in rbranchmap.itervalues():
1680 1682 rbheads.extend(bheads)
1681 1683
1682 1684 if rbheads:
1683 1685 rtiprev = max((int(self.changelog.rev(node))
1684 1686 for node in rbheads))
1685 1687 cache = branchmap.branchcache(rbranchmap,
1686 1688 self[rtiprev].node(),
1687 1689 rtiprev)
1688 1690 # Try to stick it as low as possible
1689 1691 # filters above served are unlikely to be fetched from a clone
1690 1692 for candidate in ('base', 'immutable', 'served'):
1691 1693 rview = self.filtered(candidate)
1692 1694 if cache.validfor(rview):
1693 1695 self._branchcaches[candidate] = cache
1694 1696 cache.write(rview)
1695 1697 break
1696 1698 self.invalidate()
1697 1699 return len(self.heads()) + 1
1698 1700 finally:
1699 1701 lock.release()
1700 1702
1701 1703 def clone(self, remote, heads=[], stream=False):
1702 1704 '''clone remote repository.
1703 1705
1704 1706 keyword arguments:
1705 1707 heads: list of revs to clone (forces use of pull)
1706 1708 stream: use streaming clone if possible'''
1707 1709
1708 1710 # now, all clients that can request uncompressed clones can
1709 1711 # read repo formats supported by all servers that can serve
1710 1712 # them.
1711 1713
1712 1714 # if revlog format changes, client will have to check version
1713 1715 # and format flags on "stream" capability, and use
1714 1716 # uncompressed only if compatible.
1715 1717
1716 1718 if not stream:
1717 1719 # if the server explicitly prefers to stream (for fast LANs)
1718 1720 stream = remote.capable('stream-preferred')
1719 1721
1720 1722 if stream and not heads:
1721 1723 # 'stream' means remote revlog format is revlogv1 only
1722 1724 if remote.capable('stream'):
1723 1725 return self.stream_in(remote, set(('revlogv1',)))
1724 1726 # otherwise, 'streamreqs' contains the remote revlog format
1725 1727 streamreqs = remote.capable('streamreqs')
1726 1728 if streamreqs:
1727 1729 streamreqs = set(streamreqs.split(','))
1728 1730 # if we support it, stream in and adjust our requirements
1729 1731 if not streamreqs - self.supportedformats:
1730 1732 return self.stream_in(remote, streamreqs)
1731 1733 return self.pull(remote, heads)
1732 1734
1733 1735 def pushkey(self, namespace, key, old, new):
1734 1736 self.hook('prepushkey', throw=True, namespace=namespace, key=key,
1735 1737 old=old, new=new)
1736 1738 self.ui.debug('pushing key for "%s:%s"\n' % (namespace, key))
1737 1739 ret = pushkey.push(self, namespace, key, old, new)
1738 1740 self.hook('pushkey', namespace=namespace, key=key, old=old, new=new,
1739 1741 ret=ret)
1740 1742 return ret
1741 1743
1742 1744 def listkeys(self, namespace):
1743 1745 self.hook('prelistkeys', throw=True, namespace=namespace)
1744 1746 self.ui.debug('listing keys for "%s"\n' % namespace)
1745 1747 values = pushkey.list(self, namespace)
1746 1748 self.hook('listkeys', namespace=namespace, values=values)
1747 1749 return values
1748 1750
1749 1751 def debugwireargs(self, one, two, three=None, four=None, five=None):
1750 1752 '''used to test argument passing over the wire'''
1751 1753 return "%s %s %s %s %s" % (one, two, three, four, five)
1752 1754
1753 1755 def savecommitmessage(self, text):
1754 1756 fp = self.opener('last-message.txt', 'wb')
1755 1757 try:
1756 1758 fp.write(text)
1757 1759 finally:
1758 1760 fp.close()
1759 1761 return self.pathto(fp.name[len(self.root) + 1:])
1760 1762
1761 1763 # used to avoid circular references so destructors work
1762 1764 def aftertrans(files):
1763 1765 renamefiles = [tuple(t) for t in files]
1764 1766 def a():
1765 1767 for vfs, src, dest in renamefiles:
1766 1768 try:
1767 1769 vfs.rename(src, dest)
1768 1770 except OSError: # journal file does not yet exist
1769 1771 pass
1770 1772 return a
1771 1773
1772 1774 def undoname(fn):
1773 1775 base, name = os.path.split(fn)
1774 1776 assert name.startswith('journal')
1775 1777 return os.path.join(base, name.replace('journal', 'undo', 1))
1776 1778
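
Two illustrative mappings produced by undoname():

assert undoname('journal.dirstate') == 'undo.dirstate'
assert undoname('.hg/store/journal') == '.hg/store/undo'
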
1777 1779 def instance(ui, path, create):
1778 1780 return localrepository(ui, util.urllocalpath(path), create)
1779 1781
1780 1782 def islocal(path):
1781 1783 return True
@@ -1,867 +1,868
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
12 12 import peer, error, encoding, util, store, exchange
13 13
14 14
15 15 class abstractserverproto(object):
16 16 """abstract class that summarizes the protocol API
17 17
18 18 Used as reference and documentation.
19 19 """
20 20
21 21 def getargs(self, args):
22 22 """return the value for arguments in <args>
23 23
24 24 returns a list of values (same order as <args>)"""
25 25 raise NotImplementedError()
26 26
27 27 def getfile(self, fp):
28 28 """write the whole content of a file into a file like object
29 29
30 30 The file is in the form::
31 31
32 32 (<chunk-size>\n<chunk>)+0\n
33 33
34 34 The chunk size is the ASCII decimal representation of the integer.
35 35 """
36 36 raise NotImplementedError()
37 37
38 38 def redirect(self):
39 39 """may setup interception for stdout and stderr
40 40
41 41 See also the `restore` method."""
42 42 raise NotImplementedError()
43 43
44 44 # If the `redirect` function does install interception, the `restore`
45 45 # function MUST be defined. If interception is not used, this function
46 46 # MUST NOT be defined.
47 47 #
48 48 # left commented here on purpose
49 49 #
50 50 #def restore(self):
51 51 # """reinstall previous stdout and stderr and return intercepted stdout
52 52 # """
53 53 # raise NotImplementedError()
54 54
55 55 def groupchunks(self, cg):
56 56 """return 4096 chunks from a changegroup object
57 57
58 58 Some protocols may have compressed the contents."""
59 59 raise NotImplementedError()
60 60
61 61 # abstract batching support
62 62
63 63 class future(object):
64 64 '''placeholder for a value to be set later'''
65 65 def set(self, value):
66 66 if util.safehasattr(self, 'value'):
67 67 raise error.RepoError("future is already set")
68 68 self.value = value
69 69
70 70 class batcher(object):
71 71 '''base class for batches of commands submittable in a single request
72 72
73 73 All methods invoked on instances of this class are simply queued and
74 74 return a future for the result. Once you call submit(), all the queued
75 75 calls are performed and the results set in their respective futures.
76 76 '''
77 77 def __init__(self):
78 78 self.calls = []
79 79 def __getattr__(self, name):
80 80 def call(*args, **opts):
81 81 resref = future()
82 82 self.calls.append((name, args, opts, resref,))
83 83 return resref
84 84 return call
85 85 def submit(self):
86 86 pass
87 87
88 88 class localbatch(batcher):
89 89 '''performs the queued calls directly'''
90 90 def __init__(self, local):
91 91 batcher.__init__(self)
92 92 self.local = local
93 93 def submit(self):
94 94 for name, args, opts, resref in self.calls:
95 95 resref.set(getattr(self.local, name)(*args, **opts))
96 96
97 97 class remotebatch(batcher):
98 98 '''batches the queued calls; uses as few roundtrips as possible'''
99 99 def __init__(self, remote):
100 100 '''remote must support _submitbatch(encbatch) and
101 101 _submitone(op, encargs)'''
102 102 batcher.__init__(self)
103 103 self.remote = remote
104 104 def submit(self):
105 105 req, rsp = [], []
106 106 for name, args, opts, resref in self.calls:
107 107 mtd = getattr(self.remote, name)
108 108 batchablefn = getattr(mtd, 'batchable', None)
109 109 if batchablefn is not None:
110 110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 111 encargsorres, encresref = batchable.next()
112 112 if encresref:
113 113 req.append((name, encargsorres,))
114 114 rsp.append((batchable, encresref, resref,))
115 115 else:
116 116 resref.set(encargsorres)
117 117 else:
118 118 if req:
119 119 self._submitreq(req, rsp)
120 120 req, rsp = [], []
121 121 resref.set(mtd(*args, **opts))
122 122 if req:
123 123 self._submitreq(req, rsp)
124 124 def _submitreq(self, req, rsp):
125 125 encresults = self.remote._submitbatch(req)
126 126 for encres, r in zip(encresults, rsp):
127 127 batchable, encresref, resref = r
128 128 encresref.set(encres)
129 129 resref.set(batchable.next())
130 130
131 131 def batchable(f):
132 132 '''annotation for batchable methods
133 133
134 134 Such methods must implement a coroutine as follows:
135 135
136 136 @batchable
137 137 def sample(self, one, two=None):
138 138 # Handle locally computable results first:
139 139 if not one:
140 140 yield "a local result", None
141 141 # Build list of encoded arguments suitable for your wire protocol:
142 142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 143 # Create future for injection of encoded result:
144 144 encresref = future()
145 145 # Return encoded arguments and future:
146 146 yield encargs, encresref
147 147 # Assuming the future to be filled with the result from the batched
148 148 # request now. Decode it:
149 149 yield decode(encresref.value)
150 150
151 151 The decorator returns a function which wraps this coroutine as a plain
152 152 method, but adds the original method as an attribute called "batchable",
153 153 which is used by remotebatch to split the call into separate encoding and
154 154 decoding phases.
155 155 '''
156 156 def plain(*args, **opts):
157 157 batchable = f(*args, **opts)
158 158 encargsorres, encresref = batchable.next()
159 159 if not encresref:
160 160 return encargsorres # a local result in this case
161 161 self = args[0]
162 162 encresref.set(self._submitone(f.func_name, encargsorres))
163 163 return batchable.next()
164 164 setattr(plain, 'batchable', f)
165 165 return plain
166 166
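
How callers typically drive the batching machinery above (sketch; `remote` is an assumed wirepeer and `somenode` a hypothetical 20-byte binary node):

batch = remote.batch()
fheads = batch.heads()             # queued, returns a future
fknown = batch.known([somenode])   # queued as well
batch.submit()                     # both calls go out in one round trip
print fheads.value, fknown.value
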
167 167 # list of nodes encoding / decoding
168 168
169 169 def decodelist(l, sep=' '):
170 170 if l:
171 171 return map(bin, l.split(sep))
172 172 return []
173 173
174 174 def encodelist(l, sep=' '):
175 175 return sep.join(map(hex, l))
176 176
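
A round trip through the helpers above; node lists travel as space-separated hex strings:

nodes = ['\x11' * 20, '\x22' * 20]
wire = encodelist(nodes)           # '1111...1111 2222...2222'
assert decodelist(wire) == nodes
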
177 177 # batched call argument encoding
178 178
179 179 def escapearg(plain):
180 180 return (plain
181 181 .replace(':', '::')
182 182 .replace(',', ':,')
183 183 .replace(';', ':;')
184 184 .replace('=', ':='))
185 185
186 186 def unescapearg(escaped):
187 187 return (escaped
188 188 .replace(':=', '=')
189 189 .replace(':;', ';')
190 190 .replace(':,', ',')
191 191 .replace('::', ':'))
192 192
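
The escaping is order-sensitive: ':' is doubled first when escaping and undone last when unescaping, so arbitrary argument strings survive the trip:

s = 'key=a,b;c:d'
assert escapearg(s) == 'key:=a:,b:;c::d'
assert unescapearg(escapearg(s)) == s
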
193 193 # mapping of options accepted by getbundle and their types
194 194 #
195 195 # Meant to be extended by extensions. It is the extensions' responsibility to ensure
196 196 # such options are properly processed in exchange.getbundle.
197 197 #
198 198 # supported types are:
199 199 #
200 200 # :nodes: list of binary nodes
201 201 # :csv: list of comma-separated values
202 202 # :plain: string with no transformation needed.
203 203 gboptsmap = {'heads': 'nodes',
204 204 'common': 'nodes',
205 205 'bundlecaps': 'csv',
206 'listkeys': 'csv'}
206 'listkeys': 'csv',
207 'cg': 'boolean'}
207 208
208 209 # client side
209 210
210 211 class wirepeer(peer.peerrepository):
211 212
212 213 def batch(self):
213 214 return remotebatch(self)
214 215 def _submitbatch(self, req):
215 216 cmds = []
216 217 for op, argsdict in req:
217 218 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
218 219 cmds.append('%s %s' % (op, args))
219 220 rsp = self._call("batch", cmds=';'.join(cmds))
220 221 return rsp.split(';')
221 222 def _submitone(self, op, args):
222 223 return self._call(op, **args)
223 224
224 225 @batchable
225 226 def lookup(self, key):
226 227 self.requirecap('lookup', _('look up remote revision'))
227 228 f = future()
228 229 yield {'key': encoding.fromlocal(key)}, f
229 230 d = f.value
230 231 success, data = d[:-1].split(" ", 1)
231 232 if int(success):
232 233 yield bin(data)
233 234 self._abort(error.RepoError(data))
234 235
235 236 @batchable
236 237 def heads(self):
237 238 f = future()
238 239 yield {}, f
239 240 d = f.value
240 241 try:
241 242 yield decodelist(d[:-1])
242 243 except ValueError:
243 244 self._abort(error.ResponseError(_("unexpected response:"), d))
244 245
245 246 @batchable
246 247 def known(self, nodes):
247 248 f = future()
248 249 yield {'nodes': encodelist(nodes)}, f
249 250 d = f.value
250 251 try:
251 252 yield [bool(int(f)) for f in d]
252 253 except ValueError:
253 254 self._abort(error.ResponseError(_("unexpected response:"), d))
254 255
255 256 @batchable
256 257 def branchmap(self):
257 258 f = future()
258 259 yield {}, f
259 260 d = f.value
260 261 try:
261 262 branchmap = {}
262 263 for branchpart in d.splitlines():
263 264 branchname, branchheads = branchpart.split(' ', 1)
264 265 branchname = encoding.tolocal(urllib.unquote(branchname))
265 266 branchheads = decodelist(branchheads)
266 267 branchmap[branchname] = branchheads
267 268 yield branchmap
268 269 except TypeError:
269 270 self._abort(error.ResponseError(_("unexpected response:"), d))
270 271
271 272 def branches(self, nodes):
272 273 n = encodelist(nodes)
273 274 d = self._call("branches", nodes=n)
274 275 try:
275 276 br = [tuple(decodelist(b)) for b in d.splitlines()]
276 277 return br
277 278 except ValueError:
278 279 self._abort(error.ResponseError(_("unexpected response:"), d))
279 280
280 281 def between(self, pairs):
281 282 batch = 8 # avoid giant requests
282 283 r = []
283 284 for i in xrange(0, len(pairs), batch):
284 285 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
285 286 d = self._call("between", pairs=n)
286 287 try:
287 288 r.extend(l and decodelist(l) or [] for l in d.splitlines())
288 289 except ValueError:
289 290 self._abort(error.ResponseError(_("unexpected response:"), d))
290 291 return r
291 292
292 293 @batchable
293 294 def pushkey(self, namespace, key, old, new):
294 295 if not self.capable('pushkey'):
295 296 yield False, None
296 297 f = future()
297 298 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
298 299 yield {'namespace': encoding.fromlocal(namespace),
299 300 'key': encoding.fromlocal(key),
300 301 'old': encoding.fromlocal(old),
301 302 'new': encoding.fromlocal(new)}, f
302 303 d = f.value
303 304 d, output = d.split('\n', 1)
304 305 try:
305 306 d = bool(int(d))
306 307 except ValueError:
307 308 raise error.ResponseError(
308 309 _('push failed (unexpected response):'), d)
309 310 for l in output.splitlines(True):
310 311 self.ui.status(_('remote: '), l)
311 312 yield d
312 313
313 314 @batchable
314 315 def listkeys(self, namespace):
315 316 if not self.capable('pushkey'):
316 317 yield {}, None
317 318 f = future()
318 319 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
319 320 yield {'namespace': encoding.fromlocal(namespace)}, f
320 321 d = f.value
321 322 yield pushkeymod.decodekeys(d)
322 323
323 324 def stream_out(self):
324 325 return self._callstream('stream_out')
325 326
326 327 def changegroup(self, nodes, kind):
327 328 n = encodelist(nodes)
328 329 f = self._callcompressable("changegroup", roots=n)
329 330 return changegroupmod.unbundle10(f, 'UN')
330 331
331 332 def changegroupsubset(self, bases, heads, kind):
332 333 self.requirecap('changegroupsubset', _('look up remote changes'))
333 334 bases = encodelist(bases)
334 335 heads = encodelist(heads)
335 336 f = self._callcompressable("changegroupsubset",
336 337 bases=bases, heads=heads)
337 338 return changegroupmod.unbundle10(f, 'UN')
338 339
339 340 def getbundle(self, source, **kwargs):
340 341 self.requirecap('getbundle', _('look up remote changes'))
341 342 opts = {}
342 343 for key, value in kwargs.iteritems():
343 344 if value is None:
344 345 continue
345 346 keytype = gboptsmap.get(key)
346 347 if keytype is None:
347 348 assert False, 'unexpected'
348 349 elif keytype == 'nodes':
349 350 value = encodelist(value)
350 351 elif keytype == 'csv':
351 352 value = ','.join(value)
352 353 elif keytype == 'boolean':
353 354 value = bool(value)
354 355 elif keytype != 'plain':
355 356 raise KeyError('unknown getbundle option type %s'
356 357 % keytype)
357 358 opts[key] = value
358 359 f = self._callcompressable("getbundle", **opts)
359 360 bundlecaps = kwargs.get('bundlecaps')
360 361 if bundlecaps is not None and 'HG2X' in bundlecaps:
361 362 return bundle2.unbundle20(self.ui, f)
362 363 else:
363 364 return changegroupmod.unbundle10(f, 'UN')
364 365
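
A usage sketch for the new 'cg' option handled above: a bundle2-capable client can ask the server to leave the changegroup out, for instance to fetch only listkeys data. The argument values below are illustrative:

caps = ['HG2X']    # advertise bundle2 so a partial bundle makes sense
bundle = remote.getbundle('pull', bundlecaps=caps, cg=False,
                          listkeys=['phases', 'bookmarks'])
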
365 366 def unbundle(self, cg, heads, source):
366 367 '''Send cg (a readable file-like object representing the
367 368 changegroup to push, typically a chunkbuffer object) to the
368 369 remote server as a bundle.
369 370
370 371 When pushing a bundle10 stream, return an integer indicating the
371 372 result of the push (see localrepository.addchangegroup()).
372 373
373 374 When pushing a bundle20 stream, return a bundle20 stream.'''
374 375
375 376 if heads != ['force'] and self.capable('unbundlehash'):
376 377 heads = encodelist(['hashed',
377 378 util.sha1(''.join(sorted(heads))).digest()])
378 379 else:
379 380 heads = encodelist(heads)
380 381
381 382 if util.safehasattr(cg, 'deltaheader'):
382 383 # this is a bundle10, do the old style call sequence
383 384 ret, output = self._callpush("unbundle", cg, heads=heads)
384 385 if ret == "":
385 386 raise error.ResponseError(
386 387 _('push failed:'), output)
387 388 try:
388 389 ret = int(ret)
389 390 except ValueError:
390 391 raise error.ResponseError(
391 392 _('push failed (unexpected response):'), ret)
392 393
393 394 for l in output.splitlines(True):
394 395 self.ui.status(_('remote: '), l)
395 396 else:
396 397 # bundle2 push. Send a stream, fetch a stream.
397 398 stream = self._calltwowaystream('unbundle', cg, heads=heads)
398 399 ret = bundle2.unbundle20(self.ui, stream)
399 400 return ret
400 401
401 402 def debugwireargs(self, one, two, three=None, four=None, five=None):
402 403 # don't pass optional arguments left at their default value
403 404 opts = {}
404 405 if three is not None:
405 406 opts['three'] = three
406 407 if four is not None:
407 408 opts['four'] = four
408 409 return self._call('debugwireargs', one=one, two=two, **opts)
409 410
410 411 def _call(self, cmd, **args):
411 412 """execute <cmd> on the server
412 413
413 414 The command is expected to return a simple string.
414 415
415 416 returns the server reply as a string."""
416 417 raise NotImplementedError()
417 418
418 419 def _callstream(self, cmd, **args):
419 420 """execute <cmd> on the server
420 421
421 422 The command is expected to return a stream.
422 423
423 424 returns the server reply as a file like object."""
424 425 raise NotImplementedError()
425 426
426 427 def _callcompressable(self, cmd, **args):
427 428 """execute <cmd> on the server
428 429
429 430 The command is expected to return a stream.
430 431
431 432 The stream may have been compressed in some implementations. This
432 433 function takes care of the decompression. This is the only difference
433 434 with _callstream.
434 435
435 436 returns the server reply as a file like object.
436 437 """
437 438 raise NotImplementedError()
438 439
439 440 def _callpush(self, cmd, fp, **args):
440 441 """execute a <cmd> on server
441 442
442 443 The command is expected to be related to a push. Push has a special
443 444 return method.
444 445
445 446 returns the server reply as a (ret, output) tuple. ret is either
446 447 empty (error) or a stringified int.
447 448 """
448 449 raise NotImplementedError()
449 450
450 451 def _calltwowaystream(self, cmd, fp, **args):
451 452 """execute <cmd> on server
452 453
453 454 The command will send a stream to the server and get a stream in reply.
454 455 """
455 456 raise NotImplementedError()
456 457
457 458 def _abort(self, exception):
458 459 """clearly abort the wire protocol connection and raise the exception
459 460 """
460 461 raise NotImplementedError()
461 462
462 463 # server side
463 464
# Wire protocol commands can either return a string or one of these classes.
class streamres(object):
    """wireproto reply: binary stream

    The call was successful and the result is a stream.
    Iterate on the `self.gen` attribute to retrieve chunks.
    """
    def __init__(self, gen):
        self.gen = gen

class pushres(object):
    """wireproto reply: success with simple integer return

    The call was successful and returned an integer contained in `self.res`.
    """
    def __init__(self, res):
        self.res = res

class pusherr(object):
    """wireproto reply: failure

    The call failed. The `self.res` attribute contains the error message.
    """
    def __init__(self, res):
        self.res = res

class ooberror(object):
    """wireproto reply: failure during a batch of operations

    Something failed during a batch call. The error message is stored in
    `self.message`.
    """
    def __init__(self, message):
        self.message = message

def dispatch(repo, proto, command):
    repo = repo.filtered("served")
    func, spec = commands[command]
    args = proto.getargs(spec)
    return func(repo, proto, *args)

def options(cmd, keys, others):
    opts = {}
    for k in keys:
        if k in others:
            opts[k] = others[k]
            del others[k]
    if others:
        sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
                         % (cmd, ",".join(others)))
    return opts
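
# For example (values illustrative): options('getbundle', ['heads', 'cg'],
# {'heads': '...', 'bogus': 'x'}) returns {'heads': '...'} and emits a
# warning on stderr that 'bogus' was ignored.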

# list of commands
commands = {}

def wireprotocommand(name, args=''):
    """decorator for wire protocol command"""
    def register(func):
        commands[name] = (func, args)
        return func
    return register
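
# A minimal sketch of registering a command with this decorator (the
# 'echokey' command is hypothetical, not part of Mercurial). The second
# argument is the space-separated argument spec; '*' collects any unknown
# arguments into a trailing dict:
#
#   @wireprotocommand('echokey', 'key')
#   def echokey(repo, proto, key):
#       return key + '\n'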

@wireprotocommand('batch', 'cmds *')
def batch(repo, proto, cmds, others):
    repo = repo.filtered("served")
    res = []
    for pair in cmds.split(';'):
        op, args = pair.split(' ', 1)
        vals = {}
        for a in args.split(','):
            if a:
                n, v = a.split('=')
                vals[n] = unescapearg(v)
        func, spec = commands[op]
        if spec:
            keys = spec.split()
            data = {}
            for k in keys:
                if k == '*':
                    star = {}
                    for key in vals.keys():
                        if key not in keys:
                            star[key] = vals[key]
                    data['*'] = star
                else:
                    data[k] = vals[k]
            result = func(repo, proto, *[data[k] for k in keys])
        else:
            result = func(repo, proto)
        if isinstance(result, ooberror):
            return result
        res.append(escapearg(result))
    return ';'.join(res)
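
# An illustrative 'cmds' payload (values made up): commands are separated
# by ';', each as 'name arg1=val1,arg2=val2' with values escaped by
# escapearg(). The reply joins the escaped per-command results with ';':
#
#   heads ;known nodes=0123abcd...   ->   <heads result>;<known result>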

@wireprotocommand('between', 'pairs')
def between(repo, proto, pairs):
    pairs = [decodelist(p, '-') for p in pairs.split(" ")]
    r = []
    for b in repo.between(pairs):
        r.append(encodelist(b) + "\n")
    return "".join(r)

@wireprotocommand('branchmap')
def branchmap(repo, proto):
    branchmap = repo.branchmap()
    heads = []
    for branch, nodes in branchmap.iteritems():
        branchname = urllib.quote(encoding.fromlocal(branch))
        branchnodes = encodelist(nodes)
        heads.append('%s %s' % (branchname, branchnodes))
    return '\n'.join(heads)
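
# An illustrative branchmap reply (hashes made up): one line per branch,
# the URL-quoted branch name followed by its space-separated head nodes
# in hex:
#
#   default 0123abcd...
#   stable 4567ef01... 89ab2345...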

@wireprotocommand('branches', 'nodes')
def branches(repo, proto, nodes):
    nodes = decodelist(nodes)
    r = []
    for b in repo.branches(nodes):
        r.append(encodelist(b) + "\n")
    return "".join(r)


wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
                 'known', 'getbundle', 'unbundlehash', 'batch']

def _capabilities(repo, proto):
    """return a list of capabilities for a repo

    This function exists to allow extensions to easily wrap capabilities
    computation

    - returns a list: easy to alter
    - changes made here will be propagated to both the `capabilities` and
      `hello` commands without any other action needed.
    """
    # copy to prevent modification of the global list
    caps = list(wireprotocaps)
    if _allowstream(repo.ui):
        if repo.ui.configbool('server', 'preferuncompressed', False):
            caps.append('stream-preferred')
        requiredformats = repo.requirements & repo.supportedformats
        # if our local revlogs are just revlogv1, add 'stream' cap
        if not requiredformats - set(('revlogv1',)):
            caps.append('stream')
        # otherwise, add 'streamreqs' detailing our local revlog format
        else:
            caps.append('streamreqs=%s' % ','.join(requiredformats))
    if repo.ui.configbool('experimental', 'bundle2-exp', False):
        capsblob = bundle2.encodecaps(repo.bundle2caps)
        caps.append('bundle2-exp=' + urllib.quote(capsblob))
    caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
    caps.append('httpheader=1024')
    return caps

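# A minimal sketch of how an extension could advertise an extra capability
# by wrapping _capabilities ('mycap' is a hypothetical token):
#
#   def mycapabilities(orig, repo, proto):
#       caps = orig(repo, proto)
#       caps.append('mycap')
#       return caps
#
#   extensions.wrapfunction(wireproto, '_capabilities', mycapabilities)
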
# If you are writing an extension and considering wrapping this function,
# wrap `_capabilities` instead.
@wireprotocommand('capabilities')
def capabilities(repo, proto):
    return ' '.join(_capabilities(repo, proto))

@wireprotocommand('changegroup', 'roots')
def changegroup(repo, proto, roots):
    nodes = decodelist(roots)
    cg = changegroupmod.changegroup(repo, nodes, 'serve')
    return streamres(proto.groupchunks(cg))

@wireprotocommand('changegroupsubset', 'bases heads')
def changegroupsubset(repo, proto, bases, heads):
    bases = decodelist(bases)
    heads = decodelist(heads)
    cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
    return streamres(proto.groupchunks(cg))

@wireprotocommand('debugwireargs', 'one two *')
def debugwireargs(repo, proto, one, two, others):
    # only accept optional args from the known set
    opts = options('debugwireargs', ['three', 'four'], others)
    return repo.debugwireargs(one, two, **opts)

# Mapping of options accepted by getbundle to the type of their value.
#
# Meant to be extended by extensions. It is the extension's responsibility to
# ensure such options are properly processed in exchange.getbundle.
gboptsmap = {'heads':  'nodes',
             'common': 'nodes',
             'bundlecaps': 'csv',
             'cg': 'boolean'}

@wireprotocommand('getbundle', '*')
def getbundle(repo, proto, others):
    opts = options('getbundle', gboptsmap.keys(), others)
    for k, v in opts.iteritems():
        keytype = gboptsmap[k]
        if keytype == 'nodes':
            opts[k] = decodelist(v)
        elif keytype == 'csv':
            opts[k] = set(v.split(','))
        elif keytype == 'boolean':
            # The client serializes booleans as '0' or '1'; '0' is a
            # non-empty string that bool() would turn back into True, so
            # decode it explicitly instead of re-encoding it.
            opts[k] = v != '0'
        elif keytype != 'plain':
            raise KeyError('unknown getbundle option type %s'
                           % keytype)
    cg = exchange.getbundle(repo, 'serve', **opts)
    return streamres(proto.groupchunks(cg))
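
# With the 'cg' boolean decoded above, a client can ask for a bundle that
# omits the changegroup entirely (by sending cg=0), leaving only other
# bundle2 parts; exchange.getbundle is expected to honour the flag.
# Illustrative request with made-up values:
#
#   getbundle heads=0123abcd... common=4567ef01... bundlecaps=HG2X cg=0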

@wireprotocommand('heads')
def heads(repo, proto):
    h = repo.heads()
    return encodelist(h) + "\n"

@wireprotocommand('hello')
def hello(repo, proto):
    '''the hello command returns a set of lines describing various
    interesting things about the server, in an RFC822-like format.
    Currently the only one defined is "capabilities", which
    consists of a line in the form:

    capabilities: space separated list of tokens
    '''
    return "capabilities: %s\n" % (capabilities(repo, proto))
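
# An illustrative hello reply, a single line wrapped here for readability
# (which tokens appear depends on the repository and its configuration;
# see _capabilities above):
#
#   capabilities: lookup changegroupsubset branchmap pushkey known
#   getbundle unbundlehash batch ...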

@wireprotocommand('listkeys', 'namespace')
def listkeys(repo, proto, namespace):
    d = repo.listkeys(encoding.tolocal(namespace)).items()
    return pushkeymod.encodekeys(d)

@wireprotocommand('lookup', 'key')
def lookup(repo, proto, key):
    try:
        k = encoding.tolocal(key)
        c = repo[k]
        r = c.hex()
        success = 1
    except Exception, inst:
        r = str(inst)
        success = 0
    return "%s %s\n" % (success, r)

@wireprotocommand('known', 'nodes *')
def known(repo, proto, nodes, others):
    return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
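
# An illustrative known exchange (nodes made up): the client sends a space
# separated list of hex nodes and receives one '0' or '1' character per
# node, in the same order:
#
#   known nodes=0123abcd... 4567ef01...   ->   "10"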

@wireprotocommand('pushkey', 'namespace key old new')
def pushkey(repo, proto, namespace, key, old, new):
    # compatibility with pre-1.8 clients which were accidentally
    # sending raw binary nodes rather than utf-8-encoded hex
    if len(new) == 20 and new.encode('string-escape') != new:
        # looks like it could be a binary node
        try:
            new.decode('utf-8')
            new = encoding.tolocal(new) # but cleanly decodes as UTF-8
        except UnicodeDecodeError:
            pass # binary, leave unmodified
    else:
        new = encoding.tolocal(new) # normal path

    if util.safehasattr(proto, 'restore'):
        proto.redirect()

        try:
            r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
                             encoding.tolocal(old), new) or False
        except util.Abort:
            r = False

        output = proto.restore()

        return '%s\n%s' % (int(r), output)

    r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
                     encoding.tolocal(old), new)
    return '%s\n' % int(r)

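# The pushkey reply carries the stringified boolean result ('0' or '1') on
# its first line; in the 'restore' branch above, any server output captured
# while running the pushkey call follows on later lines. Illustrative reply:
#
#   1
#   <captured server output, if any>
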
def _allowstream(ui):
    return ui.configbool('server', 'uncompressed', True, untrusted=True)

def _walkstreamfiles(repo):
    # this is its own function so extensions can override it
    return repo.store.walk()

@wireprotocommand('stream_out')
def stream(repo, proto):
    '''If the server supports streaming clone, it advertises the "stream"
    capability with a value representing the version and flags of the repo
    it is serving. Client checks to see if it understands the format.

    The format is simple: the server writes out a line with the number
    of files, then the total number of bytes to be transferred (separated
    by a space). Then, for each file, the server first writes the filename
    and file size (separated by the null character), then the file contents.
    '''

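    # An illustrative reply (names and sizes made up): '1\n' means streaming
    # is disallowed and '2\n' signals a lock error; on success the stream is:
    #
    #   0\n
    #   2 8342\n
    #   data/foo.i\x004096\n<4096 raw bytes>data/bar.i\x004246\n<4246 raw bytes>
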
    if not _allowstream(repo.ui):
        return '1\n'

    entries = []
    total_bytes = 0
    try:
        # get consistent snapshot of repo, lock during scan
        lock = repo.lock()
        try:
            repo.ui.debug('scanning\n')
            for name, ename, size in _walkstreamfiles(repo):
                if size:
                    entries.append((name, size))
                    total_bytes += size
        finally:
            lock.release()
    except error.LockError:
        return '2\n' # error: 2

    def streamer(repo, entries, total):
        '''stream out all metadata files in repository.'''
        yield '0\n' # success
        repo.ui.debug('%d files, %d bytes to transfer\n' %
                      (len(entries), total_bytes))
        yield '%d %d\n' % (len(entries), total_bytes)

        sopener = repo.sopener
        oldaudit = sopener.mustaudit
        debugflag = repo.ui.debugflag
        sopener.mustaudit = False

        try:
            for name, size in entries:
                if debugflag:
                    repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
                # partially encode name over the wire for backwards compat
                yield '%s\0%d\n' % (store.encodedir(name), size)
                if size <= 65536:
                    fp = sopener(name)
                    try:
                        data = fp.read(size)
                    finally:
                        fp.close()
                    yield data
                else:
                    for chunk in util.filechunkiter(sopener(name), limit=size):
                        yield chunk
        # replace with "finally:" when support for python 2.4 has been dropped
        except Exception:
            sopener.mustaudit = oldaudit
            raise
        sopener.mustaudit = oldaudit

    return streamres(streamer(repo, entries, total_bytes))

@wireprotocommand('unbundle', 'heads')
def unbundle(repo, proto, heads):
    their_heads = decodelist(heads)

    try:
        proto.redirect()

        exchange.check_heads(repo, their_heads, 'preparing changes')

        # write bundle data to temporary file because it can be big
        fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
        fp = os.fdopen(fd, 'wb+')
        r = 0
        try:
            proto.getfile(fp)
            fp.seek(0)
            gen = exchange.readbundle(repo.ui, fp, None)
            r = exchange.unbundle(repo, gen, their_heads, 'serve',
                                  proto._client())
            if util.safehasattr(r, 'addpart'):
                # The return looks streamable: we are in the bundle2 case
                # and should return a stream.
                return streamres(r.getchunks())
            return pushres(r)

        finally:
            fp.close()
            os.unlink(tempname)
    except error.BundleValueError, exc:
        bundler = bundle2.bundle20(repo.ui)
        errpart = bundler.newpart('B2X:ERROR:UNSUPPORTEDCONTENT')
        if exc.parttype is not None:
            errpart.addparam('parttype', exc.parttype)
        if exc.params:
            errpart.addparam('params', '\0'.join(exc.params))
        return streamres(bundler.getchunks())
    except util.Abort, inst:
        # The old code we moved here used sys.stderr directly; we kept that
        # to minimise the change. This needs to be moved to something
        # proper. Feel free to do it.
        if getattr(inst, 'duringunbundle2', False):
            bundler = bundle2.bundle20(repo.ui)
            manargs = [('message', str(inst))]
            advargs = []
            if inst.hint is not None:
                advargs.append(('hint', inst.hint))
            bundler.addpart(bundle2.bundlepart('B2X:ERROR:ABORT',
                                               manargs, advargs))
            return streamres(bundler.getchunks())
        else:
            sys.stderr.write("abort: %s\n" % inst)
            return pushres(0)
    except error.PushRaced, exc:
        if getattr(exc, 'duringunbundle2', False):
            bundler = bundle2.bundle20(repo.ui)
            bundler.newpart('B2X:ERROR:PUSHRACED', [('message', str(exc))])
            return streamres(bundler.getchunks())
        else:
            return pusherr(str(exc))