##// END OF EJS Templates
unbundle: extract checkheads in its own function...
Pierre-Yves David -
r20967:98485027 default
parent child Browse files
Show More
@@ -1,613 +1,630 b''
1 # exchange.py - utily to exchange data between repo.
1 # exchange.py - utily to exchange data between repo.
2 #
2 #
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from i18n import _
8 from i18n import _
9 from node import hex, nullid
9 from node import hex, nullid
10 import cStringIO
10 import cStringIO
11 import errno
11 import errno
12 import util, scmutil, changegroup, base85
12 import util, scmutil, changegroup, base85
13 import discovery, phases, obsolete, bookmarks, bundle2
13 import discovery, phases, obsolete, bookmarks, bundle2
14
14
15
15
16 class pushoperation(object):
16 class pushoperation(object):
17 """A object that represent a single push operation
17 """A object that represent a single push operation
18
18
19 It purpose is to carry push related state and very common operation.
19 It purpose is to carry push related state and very common operation.
20
20
21 A new should be created at the begining of each push and discarded
21 A new should be created at the begining of each push and discarded
22 afterward.
22 afterward.
23 """
23 """
24
24
25 def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
25 def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
26 # repo we push from
26 # repo we push from
27 self.repo = repo
27 self.repo = repo
28 self.ui = repo.ui
28 self.ui = repo.ui
29 # repo we push to
29 # repo we push to
30 self.remote = remote
30 self.remote = remote
31 # force option provided
31 # force option provided
32 self.force = force
32 self.force = force
33 # revs to be pushed (None is "all")
33 # revs to be pushed (None is "all")
34 self.revs = revs
34 self.revs = revs
35 # allow push of new branch
35 # allow push of new branch
36 self.newbranch = newbranch
36 self.newbranch = newbranch
37 # did a local lock get acquired?
37 # did a local lock get acquired?
38 self.locallocked = None
38 self.locallocked = None
39 # Integer version of the push result
39 # Integer version of the push result
40 # - None means nothing to push
40 # - None means nothing to push
41 # - 0 means HTTP error
41 # - 0 means HTTP error
42 # - 1 means we pushed and remote head count is unchanged *or*
42 # - 1 means we pushed and remote head count is unchanged *or*
43 # we have outgoing changesets but refused to push
43 # we have outgoing changesets but refused to push
44 # - other values as described by addchangegroup()
44 # - other values as described by addchangegroup()
45 self.ret = None
45 self.ret = None
46 # discover.outgoing object (contains common and outgoin data)
46 # discover.outgoing object (contains common and outgoin data)
47 self.outgoing = None
47 self.outgoing = None
48 # all remote heads before the push
48 # all remote heads before the push
49 self.remoteheads = None
49 self.remoteheads = None
50 # testable as a boolean indicating if any nodes are missing locally.
50 # testable as a boolean indicating if any nodes are missing locally.
51 self.incoming = None
51 self.incoming = None
52 # set of all heads common after changeset bundle push
52 # set of all heads common after changeset bundle push
53 self.commonheads = None
53 self.commonheads = None
54
54
55 def push(repo, remote, force=False, revs=None, newbranch=False):
55 def push(repo, remote, force=False, revs=None, newbranch=False):
56 '''Push outgoing changesets (limited by revs) from a local
56 '''Push outgoing changesets (limited by revs) from a local
57 repository to remote. Return an integer:
57 repository to remote. Return an integer:
58 - None means nothing to push
58 - None means nothing to push
59 - 0 means HTTP error
59 - 0 means HTTP error
60 - 1 means we pushed and remote head count is unchanged *or*
60 - 1 means we pushed and remote head count is unchanged *or*
61 we have outgoing changesets but refused to push
61 we have outgoing changesets but refused to push
62 - other values as described by addchangegroup()
62 - other values as described by addchangegroup()
63 '''
63 '''
64 pushop = pushoperation(repo, remote, force, revs, newbranch)
64 pushop = pushoperation(repo, remote, force, revs, newbranch)
65 if pushop.remote.local():
65 if pushop.remote.local():
66 missing = (set(pushop.repo.requirements)
66 missing = (set(pushop.repo.requirements)
67 - pushop.remote.local().supported)
67 - pushop.remote.local().supported)
68 if missing:
68 if missing:
69 msg = _("required features are not"
69 msg = _("required features are not"
70 " supported in the destination:"
70 " supported in the destination:"
71 " %s") % (', '.join(sorted(missing)))
71 " %s") % (', '.join(sorted(missing)))
72 raise util.Abort(msg)
72 raise util.Abort(msg)
73
73
74 # there are two ways to push to remote repo:
74 # there are two ways to push to remote repo:
75 #
75 #
76 # addchangegroup assumes local user can lock remote
76 # addchangegroup assumes local user can lock remote
77 # repo (local filesystem, old ssh servers).
77 # repo (local filesystem, old ssh servers).
78 #
78 #
79 # unbundle assumes local user cannot lock remote repo (new ssh
79 # unbundle assumes local user cannot lock remote repo (new ssh
80 # servers, http servers).
80 # servers, http servers).
81
81
82 if not pushop.remote.canpush():
82 if not pushop.remote.canpush():
83 raise util.Abort(_("destination does not support push"))
83 raise util.Abort(_("destination does not support push"))
84 # get local lock as we might write phase data
84 # get local lock as we might write phase data
85 locallock = None
85 locallock = None
86 try:
86 try:
87 locallock = pushop.repo.lock()
87 locallock = pushop.repo.lock()
88 pushop.locallocked = True
88 pushop.locallocked = True
89 except IOError, err:
89 except IOError, err:
90 pushop.locallocked = False
90 pushop.locallocked = False
91 if err.errno != errno.EACCES:
91 if err.errno != errno.EACCES:
92 raise
92 raise
93 # source repo cannot be locked.
93 # source repo cannot be locked.
94 # We do not abort the push, but just disable the local phase
94 # We do not abort the push, but just disable the local phase
95 # synchronisation.
95 # synchronisation.
96 msg = 'cannot lock source repository: %s\n' % err
96 msg = 'cannot lock source repository: %s\n' % err
97 pushop.ui.debug(msg)
97 pushop.ui.debug(msg)
98 try:
98 try:
99 pushop.repo.checkpush(pushop)
99 pushop.repo.checkpush(pushop)
100 lock = None
100 lock = None
101 unbundle = pushop.remote.capable('unbundle')
101 unbundle = pushop.remote.capable('unbundle')
102 if not unbundle:
102 if not unbundle:
103 lock = pushop.remote.lock()
103 lock = pushop.remote.lock()
104 try:
104 try:
105 _pushdiscovery(pushop)
105 _pushdiscovery(pushop)
106 if _pushcheckoutgoing(pushop):
106 if _pushcheckoutgoing(pushop):
107 _pushchangeset(pushop)
107 _pushchangeset(pushop)
108 _pushcomputecommonheads(pushop)
108 _pushcomputecommonheads(pushop)
109 _pushsyncphase(pushop)
109 _pushsyncphase(pushop)
110 _pushobsolete(pushop)
110 _pushobsolete(pushop)
111 finally:
111 finally:
112 if lock is not None:
112 if lock is not None:
113 lock.release()
113 lock.release()
114 finally:
114 finally:
115 if locallock is not None:
115 if locallock is not None:
116 locallock.release()
116 locallock.release()
117
117
118 _pushbookmark(pushop)
118 _pushbookmark(pushop)
119 return pushop.ret
119 return pushop.ret
120
120
121 def _pushdiscovery(pushop):
121 def _pushdiscovery(pushop):
122 # discovery
122 # discovery
123 unfi = pushop.repo.unfiltered()
123 unfi = pushop.repo.unfiltered()
124 fci = discovery.findcommonincoming
124 fci = discovery.findcommonincoming
125 commoninc = fci(unfi, pushop.remote, force=pushop.force)
125 commoninc = fci(unfi, pushop.remote, force=pushop.force)
126 common, inc, remoteheads = commoninc
126 common, inc, remoteheads = commoninc
127 fco = discovery.findcommonoutgoing
127 fco = discovery.findcommonoutgoing
128 outgoing = fco(unfi, pushop.remote, onlyheads=pushop.revs,
128 outgoing = fco(unfi, pushop.remote, onlyheads=pushop.revs,
129 commoninc=commoninc, force=pushop.force)
129 commoninc=commoninc, force=pushop.force)
130 pushop.outgoing = outgoing
130 pushop.outgoing = outgoing
131 pushop.remoteheads = remoteheads
131 pushop.remoteheads = remoteheads
132 pushop.incoming = inc
132 pushop.incoming = inc
133
133
134 def _pushcheckoutgoing(pushop):
134 def _pushcheckoutgoing(pushop):
135 outgoing = pushop.outgoing
135 outgoing = pushop.outgoing
136 unfi = pushop.repo.unfiltered()
136 unfi = pushop.repo.unfiltered()
137 if not outgoing.missing:
137 if not outgoing.missing:
138 # nothing to push
138 # nothing to push
139 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
139 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
140 return False
140 return False
141 # something to push
141 # something to push
142 if not pushop.force:
142 if not pushop.force:
143 # if repo.obsstore == False --> no obsolete
143 # if repo.obsstore == False --> no obsolete
144 # then, save the iteration
144 # then, save the iteration
145 if unfi.obsstore:
145 if unfi.obsstore:
146 # this message are here for 80 char limit reason
146 # this message are here for 80 char limit reason
147 mso = _("push includes obsolete changeset: %s!")
147 mso = _("push includes obsolete changeset: %s!")
148 mst = "push includes %s changeset: %s!"
148 mst = "push includes %s changeset: %s!"
149 # plain versions for i18n tool to detect them
149 # plain versions for i18n tool to detect them
150 _("push includes unstable changeset: %s!")
150 _("push includes unstable changeset: %s!")
151 _("push includes bumped changeset: %s!")
151 _("push includes bumped changeset: %s!")
152 _("push includes divergent changeset: %s!")
152 _("push includes divergent changeset: %s!")
153 # If we are to push if there is at least one
153 # If we are to push if there is at least one
154 # obsolete or unstable changeset in missing, at
154 # obsolete or unstable changeset in missing, at
155 # least one of the missinghead will be obsolete or
155 # least one of the missinghead will be obsolete or
156 # unstable. So checking heads only is ok
156 # unstable. So checking heads only is ok
157 for node in outgoing.missingheads:
157 for node in outgoing.missingheads:
158 ctx = unfi[node]
158 ctx = unfi[node]
159 if ctx.obsolete():
159 if ctx.obsolete():
160 raise util.Abort(mso % ctx)
160 raise util.Abort(mso % ctx)
161 elif ctx.troubled():
161 elif ctx.troubled():
162 raise util.Abort(_(mst)
162 raise util.Abort(_(mst)
163 % (ctx.troubles()[0],
163 % (ctx.troubles()[0],
164 ctx))
164 ctx))
165 newbm = pushop.ui.configlist('bookmarks', 'pushing')
165 newbm = pushop.ui.configlist('bookmarks', 'pushing')
166 discovery.checkheads(unfi, pushop.remote, outgoing,
166 discovery.checkheads(unfi, pushop.remote, outgoing,
167 pushop.remoteheads,
167 pushop.remoteheads,
168 pushop.newbranch,
168 pushop.newbranch,
169 bool(pushop.incoming),
169 bool(pushop.incoming),
170 newbm)
170 newbm)
171 return True
171 return True
172
172
173 def _pushchangeset(pushop):
173 def _pushchangeset(pushop):
174 """Make the actual push of changeset bundle to remote repo"""
174 """Make the actual push of changeset bundle to remote repo"""
175 outgoing = pushop.outgoing
175 outgoing = pushop.outgoing
176 unbundle = pushop.remote.capable('unbundle')
176 unbundle = pushop.remote.capable('unbundle')
177 # TODO: get bundlecaps from remote
177 # TODO: get bundlecaps from remote
178 bundlecaps = None
178 bundlecaps = None
179 # create a changegroup from local
179 # create a changegroup from local
180 if pushop.revs is None and not (outgoing.excluded
180 if pushop.revs is None and not (outgoing.excluded
181 or pushop.repo.changelog.filteredrevs):
181 or pushop.repo.changelog.filteredrevs):
182 # push everything,
182 # push everything,
183 # use the fast path, no race possible on push
183 # use the fast path, no race possible on push
184 bundler = changegroup.bundle10(pushop.repo, bundlecaps)
184 bundler = changegroup.bundle10(pushop.repo, bundlecaps)
185 cg = changegroup.getsubset(pushop.repo,
185 cg = changegroup.getsubset(pushop.repo,
186 outgoing,
186 outgoing,
187 bundler,
187 bundler,
188 'push',
188 'push',
189 fastpath=True)
189 fastpath=True)
190 else:
190 else:
191 cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
191 cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
192 bundlecaps)
192 bundlecaps)
193
193
194 # apply changegroup to remote
194 # apply changegroup to remote
195 if unbundle:
195 if unbundle:
196 # local repo finds heads on server, finds out what
196 # local repo finds heads on server, finds out what
197 # revs it must push. once revs transferred, if server
197 # revs it must push. once revs transferred, if server
198 # finds it has different heads (someone else won
198 # finds it has different heads (someone else won
199 # commit/push race), server aborts.
199 # commit/push race), server aborts.
200 if pushop.force:
200 if pushop.force:
201 remoteheads = ['force']
201 remoteheads = ['force']
202 else:
202 else:
203 remoteheads = pushop.remoteheads
203 remoteheads = pushop.remoteheads
204 # ssh: return remote's addchangegroup()
204 # ssh: return remote's addchangegroup()
205 # http: return remote's addchangegroup() or 0 for error
205 # http: return remote's addchangegroup() or 0 for error
206 pushop.ret = pushop.remote.unbundle(cg, remoteheads,
206 pushop.ret = pushop.remote.unbundle(cg, remoteheads,
207 'push')
207 'push')
208 else:
208 else:
209 # we return an integer indicating remote head count
209 # we return an integer indicating remote head count
210 # change
210 # change
211 pushop.ret = pushop.remote.addchangegroup(cg, 'push',
211 pushop.ret = pushop.remote.addchangegroup(cg, 'push',
212 pushop.repo.url())
212 pushop.repo.url())
213
213
214 def _pushcomputecommonheads(pushop):
214 def _pushcomputecommonheads(pushop):
215 unfi = pushop.repo.unfiltered()
215 unfi = pushop.repo.unfiltered()
216 if pushop.ret:
216 if pushop.ret:
217 # push succeed, synchronize target of the push
217 # push succeed, synchronize target of the push
218 cheads = pushop.outgoing.missingheads
218 cheads = pushop.outgoing.missingheads
219 elif pushop.revs is None:
219 elif pushop.revs is None:
220 # All out push fails. synchronize all common
220 # All out push fails. synchronize all common
221 cheads = pushop.outgoing.commonheads
221 cheads = pushop.outgoing.commonheads
222 else:
222 else:
223 # I want cheads = heads(::missingheads and ::commonheads)
223 # I want cheads = heads(::missingheads and ::commonheads)
224 # (missingheads is revs with secret changeset filtered out)
224 # (missingheads is revs with secret changeset filtered out)
225 #
225 #
226 # This can be expressed as:
226 # This can be expressed as:
227 # cheads = ( (missingheads and ::commonheads)
227 # cheads = ( (missingheads and ::commonheads)
228 # + (commonheads and ::missingheads))"
228 # + (commonheads and ::missingheads))"
229 # )
229 # )
230 #
230 #
231 # while trying to push we already computed the following:
231 # while trying to push we already computed the following:
232 # common = (::commonheads)
232 # common = (::commonheads)
233 # missing = ((commonheads::missingheads) - commonheads)
233 # missing = ((commonheads::missingheads) - commonheads)
234 #
234 #
235 # We can pick:
235 # We can pick:
236 # * missingheads part of common (::commonheads)
236 # * missingheads part of common (::commonheads)
237 common = set(pushop.outgoing.common)
237 common = set(pushop.outgoing.common)
238 nm = pushop.repo.changelog.nodemap
238 nm = pushop.repo.changelog.nodemap
239 cheads = [node for node in pushop.revs if nm[node] in common]
239 cheads = [node for node in pushop.revs if nm[node] in common]
240 # and
240 # and
241 # * commonheads parents on missing
241 # * commonheads parents on missing
242 revset = unfi.set('%ln and parents(roots(%ln))',
242 revset = unfi.set('%ln and parents(roots(%ln))',
243 pushop.outgoing.commonheads,
243 pushop.outgoing.commonheads,
244 pushop.outgoing.missing)
244 pushop.outgoing.missing)
245 cheads.extend(c.node() for c in revset)
245 cheads.extend(c.node() for c in revset)
246 pushop.commonheads = cheads
246 pushop.commonheads = cheads
247
247
248 def _pushsyncphase(pushop):
248 def _pushsyncphase(pushop):
249 """synchronise phase information locally and remotly"""
249 """synchronise phase information locally and remotly"""
250 unfi = pushop.repo.unfiltered()
250 unfi = pushop.repo.unfiltered()
251 cheads = pushop.commonheads
251 cheads = pushop.commonheads
252 if pushop.ret:
252 if pushop.ret:
253 # push succeed, synchronize target of the push
253 # push succeed, synchronize target of the push
254 cheads = pushop.outgoing.missingheads
254 cheads = pushop.outgoing.missingheads
255 elif pushop.revs is None:
255 elif pushop.revs is None:
256 # All out push fails. synchronize all common
256 # All out push fails. synchronize all common
257 cheads = pushop.outgoing.commonheads
257 cheads = pushop.outgoing.commonheads
258 else:
258 else:
259 # I want cheads = heads(::missingheads and ::commonheads)
259 # I want cheads = heads(::missingheads and ::commonheads)
260 # (missingheads is revs with secret changeset filtered out)
260 # (missingheads is revs with secret changeset filtered out)
261 #
261 #
262 # This can be expressed as:
262 # This can be expressed as:
263 # cheads = ( (missingheads and ::commonheads)
263 # cheads = ( (missingheads and ::commonheads)
264 # + (commonheads and ::missingheads))"
264 # + (commonheads and ::missingheads))"
265 # )
265 # )
266 #
266 #
267 # while trying to push we already computed the following:
267 # while trying to push we already computed the following:
268 # common = (::commonheads)
268 # common = (::commonheads)
269 # missing = ((commonheads::missingheads) - commonheads)
269 # missing = ((commonheads::missingheads) - commonheads)
270 #
270 #
271 # We can pick:
271 # We can pick:
272 # * missingheads part of common (::commonheads)
272 # * missingheads part of common (::commonheads)
273 common = set(pushop.outgoing.common)
273 common = set(pushop.outgoing.common)
274 nm = pushop.repo.changelog.nodemap
274 nm = pushop.repo.changelog.nodemap
275 cheads = [node for node in pushop.revs if nm[node] in common]
275 cheads = [node for node in pushop.revs if nm[node] in common]
276 # and
276 # and
277 # * commonheads parents on missing
277 # * commonheads parents on missing
278 revset = unfi.set('%ln and parents(roots(%ln))',
278 revset = unfi.set('%ln and parents(roots(%ln))',
279 pushop.outgoing.commonheads,
279 pushop.outgoing.commonheads,
280 pushop.outgoing.missing)
280 pushop.outgoing.missing)
281 cheads.extend(c.node() for c in revset)
281 cheads.extend(c.node() for c in revset)
282 pushop.commonheads = cheads
282 pushop.commonheads = cheads
283 # even when we don't push, exchanging phase data is useful
283 # even when we don't push, exchanging phase data is useful
284 remotephases = pushop.remote.listkeys('phases')
284 remotephases = pushop.remote.listkeys('phases')
285 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
285 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
286 and remotephases # server supports phases
286 and remotephases # server supports phases
287 and pushop.ret is None # nothing was pushed
287 and pushop.ret is None # nothing was pushed
288 and remotephases.get('publishing', False)):
288 and remotephases.get('publishing', False)):
289 # When:
289 # When:
290 # - this is a subrepo push
290 # - this is a subrepo push
291 # - and remote support phase
291 # - and remote support phase
292 # - and no changeset was pushed
292 # - and no changeset was pushed
293 # - and remote is publishing
293 # - and remote is publishing
294 # We may be in issue 3871 case!
294 # We may be in issue 3871 case!
295 # We drop the possible phase synchronisation done by
295 # We drop the possible phase synchronisation done by
296 # courtesy to publish changesets possibly locally draft
296 # courtesy to publish changesets possibly locally draft
297 # on the remote.
297 # on the remote.
298 remotephases = {'publishing': 'True'}
298 remotephases = {'publishing': 'True'}
299 if not remotephases: # old server or public only rer
299 if not remotephases: # old server or public only rer
300 _localphasemove(pushop, cheads)
300 _localphasemove(pushop, cheads)
301 # don't push any phase data as there is nothing to push
301 # don't push any phase data as there is nothing to push
302 else:
302 else:
303 ana = phases.analyzeremotephases(pushop.repo, cheads,
303 ana = phases.analyzeremotephases(pushop.repo, cheads,
304 remotephases)
304 remotephases)
305 pheads, droots = ana
305 pheads, droots = ana
306 ### Apply remote phase on local
306 ### Apply remote phase on local
307 if remotephases.get('publishing', False):
307 if remotephases.get('publishing', False):
308 _localphasemove(pushop, cheads)
308 _localphasemove(pushop, cheads)
309 else: # publish = False
309 else: # publish = False
310 _localphasemove(pushop, pheads)
310 _localphasemove(pushop, pheads)
311 _localphasemove(pushop, cheads, phases.draft)
311 _localphasemove(pushop, cheads, phases.draft)
312 ### Apply local phase on remote
312 ### Apply local phase on remote
313
313
314 # Get the list of all revs draft on remote by public here.
314 # Get the list of all revs draft on remote by public here.
315 # XXX Beware that revset break if droots is not strictly
315 # XXX Beware that revset break if droots is not strictly
316 # XXX root we may want to ensure it is but it is costly
316 # XXX root we may want to ensure it is but it is costly
317 outdated = unfi.set('heads((%ln::%ln) and public())',
317 outdated = unfi.set('heads((%ln::%ln) and public())',
318 droots, cheads)
318 droots, cheads)
319 for newremotehead in outdated:
319 for newremotehead in outdated:
320 r = pushop.remote.pushkey('phases',
320 r = pushop.remote.pushkey('phases',
321 newremotehead.hex(),
321 newremotehead.hex(),
322 str(phases.draft),
322 str(phases.draft),
323 str(phases.public))
323 str(phases.public))
324 if not r:
324 if not r:
325 pushop.ui.warn(_('updating %s to public failed!\n')
325 pushop.ui.warn(_('updating %s to public failed!\n')
326 % newremotehead)
326 % newremotehead)
327
327
328 def _localphasemove(pushop, nodes, phase=phases.public):
328 def _localphasemove(pushop, nodes, phase=phases.public):
329 """move <nodes> to <phase> in the local source repo"""
329 """move <nodes> to <phase> in the local source repo"""
330 if pushop.locallocked:
330 if pushop.locallocked:
331 phases.advanceboundary(pushop.repo, phase, nodes)
331 phases.advanceboundary(pushop.repo, phase, nodes)
332 else:
332 else:
333 # repo is not locked, do not change any phases!
333 # repo is not locked, do not change any phases!
334 # Informs the user that phases should have been moved when
334 # Informs the user that phases should have been moved when
335 # applicable.
335 # applicable.
336 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
336 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
337 phasestr = phases.phasenames[phase]
337 phasestr = phases.phasenames[phase]
338 if actualmoves:
338 if actualmoves:
339 pushop.ui.status(_('cannot lock source repo, skipping '
339 pushop.ui.status(_('cannot lock source repo, skipping '
340 'local %s phase update\n') % phasestr)
340 'local %s phase update\n') % phasestr)
341
341
342 def _pushobsolete(pushop):
342 def _pushobsolete(pushop):
343 """utility function to push obsolete markers to a remote"""
343 """utility function to push obsolete markers to a remote"""
344 pushop.ui.debug('try to push obsolete markers to remote\n')
344 pushop.ui.debug('try to push obsolete markers to remote\n')
345 repo = pushop.repo
345 repo = pushop.repo
346 remote = pushop.remote
346 remote = pushop.remote
347 if (obsolete._enabled and repo.obsstore and
347 if (obsolete._enabled and repo.obsstore and
348 'obsolete' in remote.listkeys('namespaces')):
348 'obsolete' in remote.listkeys('namespaces')):
349 rslts = []
349 rslts = []
350 remotedata = repo.listkeys('obsolete')
350 remotedata = repo.listkeys('obsolete')
351 for key in sorted(remotedata, reverse=True):
351 for key in sorted(remotedata, reverse=True):
352 # reverse sort to ensure we end with dump0
352 # reverse sort to ensure we end with dump0
353 data = remotedata[key]
353 data = remotedata[key]
354 rslts.append(remote.pushkey('obsolete', key, '', data))
354 rslts.append(remote.pushkey('obsolete', key, '', data))
355 if [r for r in rslts if not r]:
355 if [r for r in rslts if not r]:
356 msg = _('failed to push some obsolete markers!\n')
356 msg = _('failed to push some obsolete markers!\n')
357 repo.ui.warn(msg)
357 repo.ui.warn(msg)
358
358
359 def _pushbookmark(pushop):
359 def _pushbookmark(pushop):
360 """Update bookmark position on remote"""
360 """Update bookmark position on remote"""
361 ui = pushop.ui
361 ui = pushop.ui
362 repo = pushop.repo.unfiltered()
362 repo = pushop.repo.unfiltered()
363 remote = pushop.remote
363 remote = pushop.remote
364 ui.debug("checking for updated bookmarks\n")
364 ui.debug("checking for updated bookmarks\n")
365 revnums = map(repo.changelog.rev, pushop.revs or [])
365 revnums = map(repo.changelog.rev, pushop.revs or [])
366 ancestors = [a for a in repo.changelog.ancestors(revnums, inclusive=True)]
366 ancestors = [a for a in repo.changelog.ancestors(revnums, inclusive=True)]
367 (addsrc, adddst, advsrc, advdst, diverge, differ, invalid
367 (addsrc, adddst, advsrc, advdst, diverge, differ, invalid
368 ) = bookmarks.compare(repo, repo._bookmarks, remote.listkeys('bookmarks'),
368 ) = bookmarks.compare(repo, repo._bookmarks, remote.listkeys('bookmarks'),
369 srchex=hex)
369 srchex=hex)
370
370
371 for b, scid, dcid in advsrc:
371 for b, scid, dcid in advsrc:
372 if ancestors and repo[scid].rev() not in ancestors:
372 if ancestors and repo[scid].rev() not in ancestors:
373 continue
373 continue
374 if remote.pushkey('bookmarks', b, dcid, scid):
374 if remote.pushkey('bookmarks', b, dcid, scid):
375 ui.status(_("updating bookmark %s\n") % b)
375 ui.status(_("updating bookmark %s\n") % b)
376 else:
376 else:
377 ui.warn(_('updating bookmark %s failed!\n') % b)
377 ui.warn(_('updating bookmark %s failed!\n') % b)
378
378
379 class pulloperation(object):
379 class pulloperation(object):
380 """A object that represent a single pull operation
380 """A object that represent a single pull operation
381
381
382 It purpose is to carry push related state and very common operation.
382 It purpose is to carry push related state and very common operation.
383
383
384 A new should be created at the begining of each pull and discarded
384 A new should be created at the begining of each pull and discarded
385 afterward.
385 afterward.
386 """
386 """
387
387
388 def __init__(self, repo, remote, heads=None, force=False):
388 def __init__(self, repo, remote, heads=None, force=False):
389 # repo we pull into
389 # repo we pull into
390 self.repo = repo
390 self.repo = repo
391 # repo we pull from
391 # repo we pull from
392 self.remote = remote
392 self.remote = remote
393 # revision we try to pull (None is "all")
393 # revision we try to pull (None is "all")
394 self.heads = heads
394 self.heads = heads
395 # do we force pull?
395 # do we force pull?
396 self.force = force
396 self.force = force
397 # the name the pull transaction
397 # the name the pull transaction
398 self._trname = 'pull\n' + util.hidepassword(remote.url())
398 self._trname = 'pull\n' + util.hidepassword(remote.url())
399 # hold the transaction once created
399 # hold the transaction once created
400 self._tr = None
400 self._tr = None
401 # set of common changeset between local and remote before pull
401 # set of common changeset between local and remote before pull
402 self.common = None
402 self.common = None
403 # set of pulled head
403 # set of pulled head
404 self.rheads = None
404 self.rheads = None
405 # list of missing changeset to fetch remotly
405 # list of missing changeset to fetch remotly
406 self.fetch = None
406 self.fetch = None
407 # result of changegroup pulling (used as returng code by pull)
407 # result of changegroup pulling (used as returng code by pull)
408 self.cgresult = None
408 self.cgresult = None
409 # list of step remaining todo (related to future bundle2 usage)
409 # list of step remaining todo (related to future bundle2 usage)
410 self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])
410 self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])
411
411
412 @util.propertycache
412 @util.propertycache
413 def pulledsubset(self):
413 def pulledsubset(self):
414 """heads of the set of changeset target by the pull"""
414 """heads of the set of changeset target by the pull"""
415 # compute target subset
415 # compute target subset
416 if self.heads is None:
416 if self.heads is None:
417 # We pulled every thing possible
417 # We pulled every thing possible
418 # sync on everything common
418 # sync on everything common
419 c = set(self.common)
419 c = set(self.common)
420 ret = list(self.common)
420 ret = list(self.common)
421 for n in self.rheads:
421 for n in self.rheads:
422 if n not in c:
422 if n not in c:
423 ret.append(n)
423 ret.append(n)
424 return ret
424 return ret
425 else:
425 else:
426 # We pulled a specific subset
426 # We pulled a specific subset
427 # sync on this subset
427 # sync on this subset
428 return self.heads
428 return self.heads
429
429
430 def gettransaction(self):
430 def gettransaction(self):
431 """get appropriate pull transaction, creating it if needed"""
431 """get appropriate pull transaction, creating it if needed"""
432 if self._tr is None:
432 if self._tr is None:
433 self._tr = self.repo.transaction(self._trname)
433 self._tr = self.repo.transaction(self._trname)
434 return self._tr
434 return self._tr
435
435
436 def closetransaction(self):
436 def closetransaction(self):
437 """close transaction if created"""
437 """close transaction if created"""
438 if self._tr is not None:
438 if self._tr is not None:
439 self._tr.close()
439 self._tr.close()
440
440
441 def releasetransaction(self):
441 def releasetransaction(self):
442 """release transaction if created"""
442 """release transaction if created"""
443 if self._tr is not None:
443 if self._tr is not None:
444 self._tr.release()
444 self._tr.release()
445
445
def pull(repo, remote, heads=None, force=False):
    """pull changesets and related data from `remote` into `repo`

    `heads` restricts the pull to the given remote heads; `force` allows
    pulling from an unrelated repository.  Returns the changegroup result
    stored on the pull operation (set by the changegroup step).
    """
    pullop = pulloperation(repo, remote, heads, force)
    if pullop.remote.local():
        # local-to-local pull: we can check requirements up front; the
        # destination repo must support every feature the source requires
        missing = set(pullop.remote.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    lock = pullop.repo.lock()
    try:
        _pulldiscovery(pullop)
        # bundle2-capable remotes handle (some) steps in one exchange;
        # any step left in pullop.todosteps falls back to the legacy path
        if pullop.remote.capable('bundle2'):
            _pullbundle2(pullop)
        if 'changegroup' in pullop.todosteps:
            _pullchangeset(pullop)
        if 'phases' in pullop.todosteps:
            _pullphase(pullop)
        if 'obsmarkers' in pullop.todosteps:
            _pullobsolete(pullop)
        pullop.closetransaction()
    finally:
        # release is a no-op after a successful close; otherwise it aborts
        pullop.releasetransaction()
        lock.release()

    return pullop.cgresult
473
473
def _pulldiscovery(pullop):
    """discovery phase for the pull

    Currently only performs changeset discovery; other kinds of discovery
    will be folded in here eventually.  Results are stored on `pullop`.
    """
    common, fetch, rheads = discovery.findcommonincoming(
        pullop.repo.unfiltered(),
        pullop.remote,
        heads=pullop.heads,
        force=pullop.force)
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads
484
484
def _pullbundle2(pullop):
    """pull data using bundle2

    For now, the only supported data are changegroup."""
    # advertise that we understand the HG20 (bundle2) container format
    kwargs = {'bundlecaps': set(['HG20'])}
    # pulling changegroup
    pullop.todosteps.remove('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
    else:
        kwargs['common'] = pullop.common
        kwargs['heads'] = pullop.heads or pullop.rheads
        if pullop.heads is None and list(pullop.common) == [nullid]:
            pullop.repo.ui.status(_("requesting all changes\n"))
    # NOTE(review): kwargs always contains 'bundlecaps' and never a
    # 'format' key, so this early return looks unreachable — confirm the
    # intended key before relying on this short-circuit.
    if kwargs.keys() == ['format']:
        return # nothing to pull
    bundle = pullop.remote.getbundle('pull', **kwargs)
    try:
        op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
    except KeyError, exc:
        # a KeyError from the unbundler means the stream contained a
        # mandatory part type we do not know how to process
        raise util.Abort('missing support for %s' % exc)
    # the changegroup part handler records exactly one result entry
    assert len(op.records['changegroup']) == 1
    pullop.cgresult = op.records['changegroup'][0]['return']
509
509
def _pullchangeset(pullop):
    """pull changeset from unbundle into the local repo"""
    # We delay the open of the transaction as late as possible so we
    # don't open transaction for nothing or you break future useful
    # rollback call
    pullop.todosteps.remove('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    # pick the fetch method by decreasing remote capability:
    # getbundle > plain changegroup (full pull) > changegroupsubset
    if pullop.remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = pullop.remote.getbundle('pull', common=pullop.common,
                                     heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = pullop.remote.changegroup(pullop.fetch, 'pull')
    elif not pullop.remote.capable('changegroupsubset'):
        raise util.Abort(_("partial pull cannot be done because "
                           "other repository doesn't support "
                           "changegroupsubset."))
    else:
        cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
                                                 pullop.remote.url())
541
541
def _pullphase(pullop):
    """synchronize local phase data with the remote's phase root listing"""
    # Get remote phases data from remote
    pullop.todosteps.remove('phases')
    remotephases = pullop.remote.listkeys('phases')
    publishing = bool(remotephases.get('publishing', False))
    if remotephases and not publishing:
        # remote is new and unpublishing
        pheads, _dr = phases.analyzeremotephases(pullop.repo,
                                                 pullop.pulledsubset,
                                                 remotephases)
        # remote public roots advance our public boundary; everything
        # pulled is at least draft
        phases.advanceboundary(pullop.repo, phases.public, pheads)
        phases.advanceboundary(pullop.repo, phases.draft,
                               pullop.pulledsubset)
    else:
        # Remote is old or publishing all common changesets
        # should be seen as public
        phases.advanceboundary(pullop.repo, phases.public,
                               pullop.pulledsubset)
560
560
def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    The `gettransaction` is function that return the pull transaction,
    creating one if necessary. We return the transaction to inform the
    calling code that a new transaction have been created (when applicable).

    Exists mostly to allow overriding for experimentation purpose"""
    pullop.todosteps.remove('obsmarkers')
    tr = None
    if not obsolete._enabled:
        return tr
    pullop.repo.ui.debug('fetching remote obsolete markers\n')
    remoteobs = pullop.remote.listkeys('obsolete')
    if 'dump0' in remoteobs:
        # the presence of dump0 signals there is marker data to merge
        tr = pullop.gettransaction()
        for key in sorted(remoteobs, reverse=True):
            if key.startswith('dump'):
                markerdata = base85.b85decode(remoteobs[key])
                pullop.repo.obsstore.mergemarkers(tr, markerdata)
        pullop.repo.invalidatevolatilesets()
    return tr
582
582
def getbundle(repo, source, heads=None, common=None, bundlecaps=None):
    """return a full bundle (with potentially multiple kind of parts)

    Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
    passed. For now, the bundle can contain only changegroup, but this will
    changes when more part type will be available for bundle2.

    This is different from changegroup.getbundle that only returns an HG10
    changegroup bundle. They may eventually get reunited in the future when we
    have a clearer idea of the API we what to query different data.

    The implementation is at a very early stage and will get massive rework
    when the API of bundle is refined.
    """
    import os
    # build bundle here.
    cg = changegroup.getbundle(repo, source, heads=heads,
                               common=common, bundlecaps=bundlecaps)
    if bundlecaps is None or 'HG20' not in bundlecaps:
        return cg
    # very crude first implementation,
    # the bundle API will change and the generation will be done lazily.
    bundler = bundle2.bundle20(repo.ui)
    tempname = changegroup.writebundle(cg, None, 'HG10UN')
    # FIX: the original leaked both the file object and the temporary
    # file itself, and opened the binary bundle in text mode.  Read it
    # back in binary mode, then close and delete the temp file.
    fp = open(tempname, 'rb')
    try:
        data = fp.read()
    finally:
        fp.close()
        os.unlink(tempname)
    part = bundle2.part('changegroup', data=data)
    bundler.addpart(part)
    temp = cStringIO.StringIO()
    for c in bundler.getchunks():
        temp.write(c)
    temp.seek(0)
    return bundle2.unbundle20(repo.ui, temp)
614
class PushRaced(RuntimeError):
    """Raised during unbundling when a push race is detected.

    A push race means another client committed, pushed, or unbundled to
    the repository while our data was in transit.
    """
617
def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling.  Raises PushRaced if `their_heads` matches
    neither the current heads, their hash, nor the special 'force' marker.
    """
    heads = repo.heads()
    heads_hash = util.sha1(''.join(sorted(heads))).digest()
    if their_heads == ['force']:
        return
    if their_heads == heads:
        return
    if their_heads == ['hashed', heads_hash]:
        return
    # someone else committed/pushed/unbundled while we
    # were transferring data
    raise PushRaced('repository changed while %s - '
                    'please try again' % context)
@@ -1,799 +1,789 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod
11 import changegroup as changegroupmod
12 import peer, error, encoding, util, store
12 import peer, error, encoding, util, store, exchange
13
13
14
14
class abstractserverproto(object):
    """Abstract class summarizing the server protocol API.

    Exists purely as reference and documentation; every method raises
    NotImplementedError.
    """

    def getargs(self, args):
        """Return the values for the arguments named in ``args``.

        The result is a list of values in the same order as ``args``.
        """
        raise NotImplementedError()

    def getfile(self, fp):
        """Write the whole content of a file into a file-like object.

        The file is in the form::

            (<chunk-size>\n<chunk>)+0\n

        where the chunk size is the ascii representation of the int.
        """
        raise NotImplementedError()

    def redirect(self):
        """May set up interception for stdout and stderr.

        See also the (deliberately commented-out) `restore` method below.
        """
        raise NotImplementedError()

    # If `redirect` does install interception, a `restore` method MUST be
    # defined; if interception is not used, it MUST NOT be defined.
    # Left commented here on purpose:
    #
    #def restore(self):
    #    """reinstall previous stdout and stderr and return intercepted
    #    stdout
    #    """
    #    raise NotImplementedError()

    def groupchunks(self, cg):
        """Return 4096-byte chunks from a changegroup object.

        Some protocols may have compressed the contents.
        """
        raise NotImplementedError()
60
60
61 # abstract batching support
61 # abstract batching support
62
62
class future(object):
    """Placeholder for a value that will be produced later."""

    def set(self, value):
        """Store the result; a future may only be resolved once."""
        if util.safehasattr(self, 'value'):
            raise error.RepoError("future is already set")
        self.value = value
69
69
class batcher(object):
    """Base class for batches of commands submittable in a single request.

    Any method invoked on an instance is simply queued and returns a
    future for its result.  Calling submit() performs all queued calls
    and fills in their respective futures.
    """

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        # every unknown attribute becomes a queueing stub
        def queued(*args, **opts):
            result = future()
            self.calls.append((name, args, opts, result,))
            return result
        return queued

    def submit(self):
        # subclasses decide how the queued calls are actually performed
        pass
87
87
class localbatch(batcher):
    """Batcher that performs the queued calls directly, in order."""

    def __init__(self, local):
        batcher.__init__(self)
        self.local = local

    def submit(self):
        """Run every queued call synchronously against the local peer."""
        for name, args, opts, resref in self.calls:
            method = getattr(self.local, name)
            resref.set(method(*args, **opts))
96
96
class remotebatch(batcher):
    '''batches the queued calls; uses as few roundtrips as possible'''
    def __init__(self, remote):
        '''remote must support _submitbatch(encbatch) and
        _submitone(op, encargs)'''
        batcher.__init__(self)
        self.remote = remote
    def submit(self):
        req, rsp = [], []
        for name, args, opts, resref in self.calls:
            mtd = getattr(self.remote, name)
            batchablefn = getattr(mtd, 'batchable', None)
            if batchablefn is not None:
                # batchable method: run its coroutine up to the first yield
                # to obtain either encoded arguments or a local result
                batchable = batchablefn(mtd.im_self, *args, **opts)
                encargsorres, encresref = batchable.next()
                if encresref:
                    # needs a remote roundtrip: queue encoded args and
                    # remember the coroutine so it can decode the reply
                    req.append((name, encargsorres,))
                    rsp.append((batchable, encresref, resref,))
                else:
                    # the coroutine produced the result locally
                    resref.set(encargsorres)
            else:
                # non-batchable call: flush everything queued so far first,
                # to preserve the caller's call ordering, then run directly
                if req:
                    self._submitreq(req, rsp)
                    req, rsp = [], []
                resref.set(mtd(*args, **opts))
        if req:
            self._submitreq(req, rsp)
    def _submitreq(self, req, rsp):
        # one wire roundtrip for the whole queued batch
        encresults = self.remote._submitbatch(req)
        for encres, r in zip(encresults, rsp):
            batchable, encresref, resref = r
            # feed the encoded response back into the coroutine; its next
            # yield is the decoded final value
            encresref.set(encres)
            resref.set(batchable.next())
130
130
def batchable(f):
    '''annotation for batchable methods

    Such methods must implement a coroutine as follows:

    @batchable
    def sample(self, one, two=None):
        # Handle locally computable results first:
        if not one:
            yield "a local result", None
        # Build list of encoded arguments suitable for your wire protocol:
        encargs = [('one', encode(one),), ('two', encode(two),)]
        # Create future for injection of encoded result:
        encresref = future()
        # Return encoded arguments and future:
        yield encargs, encresref
        # Assuming the future to be filled with the result from the batched
        # request now. Decode it:
        yield decode(encresref.value)

    The decorator returns a function which wraps this coroutine as a plain
    method, but adds the original method as an attribute called "batchable",
    which is used by remotebatch to split the call into separate encoding and
    decoding phases.
    '''
    def plain(*args, **opts):
        # drive the coroutine to its first yield: either a finished local
        # result (encresref is None) or encoded args plus a reply future
        batchable = f(*args, **opts)
        encargsorres, encresref = batchable.next()
        if not encresref:
            return encargsorres # a local result in this case
        self = args[0]
        # single-call path: do the roundtrip now and feed the encoded
        # response back into the coroutine, which decodes it
        encresref.set(self._submitone(f.func_name, encargsorres))
        return batchable.next()
    setattr(plain, 'batchable', f)
    return plain
166
166
167 # list of nodes encoding / decoding
167 # list of nodes encoding / decoding
168
168
def decodelist(l, sep=' '):
    """Decode a `sep`-separated string of hex nodes into binary nodes."""
    if not l:
        return []
    return [bin(piece) for piece in l.split(sep)]
173
173
def encodelist(l, sep=' '):
    """Encode a list of binary nodes as a `sep`-joined hex string."""
    return sep.join(hex(n) for n in l)
176
176
177 # batched call argument encoding
177 # batched call argument encoding
178
178
def escapearg(plain):
    """Escape characters that are special in the batch wire encoding.

    ':' must be escaped first so the escape sequences themselves survive
    a later unescapearg() round trip.
    """
    text = plain.replace(':', '::')
    text = text.replace(',', ':,')
    text = text.replace(';', ':;')
    text = text.replace('=', ':=')
    return text
185
185
def unescapearg(escaped):
    """Inverse of escapearg; escapes are undone in the reverse order."""
    text = escaped.replace(':=', '=')
    text = text.replace(':;', ';')
    text = text.replace(':,', ',')
    text = text.replace('::', ':')
    return text
192
192
193 # client side
193 # client side
194
194
195 class wirepeer(peer.peerrepository):
195 class wirepeer(peer.peerrepository):
196
196
def batch(self):
    """return a batcher that queues calls and sends them over the wire"""
    return remotebatch(self)
def _submitbatch(self, req):
    """Encode and send a batch of (op, argsdict) commands.

    Returns the list of raw (still encoded) responses, one per command.
    """
    encoded = []
    for op, argsdict in req:
        argtext = ','.join('%s=%s' % pair for pair in argsdict.iteritems())
        encoded.append('%s %s' % (op, argtext))
    rsp = self._call("batch", cmds=';'.join(encoded))
    return rsp.split(';')
def _submitone(self, op, args):
    """perform a single wire command immediately; return its raw response"""
    return self._call(op, **args)
208
208
@batchable
def lookup(self, key):
    """look up a revision identifier on the remote; yield its binary node

    On failure, hands a RepoError carrying the server's message to
    self._abort.
    """
    self.requirecap('lookup', _('look up remote revision'))
    f = future()
    yield {'key': encoding.fromlocal(key)}, f
    d = f.value
    # response is "<success> <data>\n": data is a hex node on success,
    # an error message otherwise
    success, data = d[:-1].split(" ", 1)
    if int(success):
        yield bin(data)
    self._abort(error.RepoError(data))
219
219
@batchable
def heads(self):
    """yield the list of remote heads as binary nodes"""
    f = future()
    yield {}, f
    d = f.value
    try:
        # response is a space-separated list of hex nodes plus a newline
        yield decodelist(d[:-1])
    except ValueError:
        self._abort(error.ResponseError(_("unexpected response:"), d))
229
229
@batchable
def known(self, nodes):
    """yield a list of booleans, one per node, telling whether the remote
    knows that node"""
    f = future()
    yield {'nodes': encodelist(nodes)}, f
    d = f.value
    try:
        # response is a string of '0'/'1' characters, one per queried node
        # (NOTE: the loop variable deliberately reuses the name `f`,
        # shadowing the future above, which is no longer needed here)
        yield [bool(int(f)) for f in d]
    except ValueError:
        self._abort(error.ResponseError(_("unexpected response:"), d))
239
239
@batchable
def branchmap(self):
    """yield a {branchname: [binary heads]} mapping from the remote"""
    f = future()
    yield {}, f
    d = f.value
    try:
        branchmap = {}
        # one line per branch: "<urlquoted name> <hex heads...>"
        for branchpart in d.splitlines():
            branchname, branchheads = branchpart.split(' ', 1)
            branchname = encoding.tolocal(urllib.unquote(branchname))
            branchheads = decodelist(branchheads)
            branchmap[branchname] = branchheads
        yield branchmap
    except TypeError:
        self._abort(error.ResponseError(_("unexpected response:"), d))
255
255
def branches(self, nodes):
    """Return branch information for the given nodes.

    Each response line decodes to a tuple of binary nodes.
    """
    n = encodelist(nodes)
    d = self._call("branches", nodes=n)
    try:
        return [tuple(decodelist(b)) for b in d.splitlines()]
    except ValueError:
        self._abort(error.ResponseError(_("unexpected response:"), d))
264
264
def between(self, pairs):
    """Query the remote for the nodes between each (top, bottom) pair.

    Requests are chunked to avoid giant requests; results are returned
    in the order of `pairs`.
    """
    batchsize = 8  # avoid giant requests
    results = []
    for start in xrange(0, len(pairs), batchsize):
        chunk = pairs[start:start + batchsize]
        n = " ".join([encodelist(p, '-') for p in chunk])
        d = self._call("between", pairs=n)
        try:
            results.extend(l and decodelist(l) or [] for l in d.splitlines())
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), d))
    return results
276
276
@batchable
def pushkey(self, namespace, key, old, new):
    """update a pushkey (bookmark, phase, ...) on the remote

    Yields True if the remote accepted the new value, False otherwise
    (including when the remote lacks the pushkey capability)."""
    if not self.capable('pushkey'):
        yield False, None
    f = future()
    self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
    yield {'namespace': encoding.fromlocal(namespace),
           'key': encoding.fromlocal(key),
           'old': encoding.fromlocal(old),
           'new': encoding.fromlocal(new)}, f
    d = f.value
    # response is "<result>\n<server output>"
    d, output = d.split('\n', 1)
    try:
        d = bool(int(d))
    except ValueError:
        raise error.ResponseError(
            _('push failed (unexpected response):'), d)
    # relay the server's textual output to the user, line by line
    for l in output.splitlines(True):
        self.ui.status(_('remote: '), l)
    yield d
297
297
@batchable
def listkeys(self, namespace):
    """yield the {key: value} mapping for a pushkey namespace

    Yields an empty dict when the remote lacks the pushkey capability."""
    if not self.capable('pushkey'):
        yield {}, None
    f = future()
    self.ui.debug('preparing listkeys for "%s"\n' % namespace)
    yield {'namespace': encoding.fromlocal(namespace)}, f
    d = f.value
    r = {}
    # one "key\tvalue" pair per response line
    for l in d.splitlines():
        k, v = l.split('\t')
        r[encoding.tolocal(k)] = encoding.tolocal(v)
    yield r
311
311
def stream_out(self):
    """request a raw stream of repository data; return the response stream"""
    return self._callstream('stream_out')
314
314
def changegroup(self, nodes, kind):
    """Fetch a changegroup rooted at `nodes`; return an unbundle10 reader."""
    roots = encodelist(nodes)
    stream = self._callcompressable("changegroup", roots=roots)
    return changegroupmod.unbundle10(stream, 'UN')
319
319
def changegroupsubset(self, bases, heads, kind):
    """Fetch the changegroup between `bases` and `heads`.

    Requires the 'changegroupsubset' capability; returns an unbundle10
    reader over the uncompressed stream.
    """
    self.requirecap('changegroupsubset', _('look up remote changes'))
    stream = self._callcompressable("changegroupsubset",
                                    bases=encodelist(bases),
                                    heads=encodelist(heads))
    return changegroupmod.unbundle10(stream, 'UN')
327
327
def getbundle(self, source, heads=None, common=None, bundlecaps=None):
    """Fetch a bundle via the 'getbundle' command; return an unbundle10
    reader.

    Only the arguments that are not None are sent over the wire.
    """
    self.requirecap('getbundle', _('look up remote changes'))
    opts = {}
    for name, value in (('heads', heads), ('common', common)):
        if value is not None:
            opts[name] = encodelist(value)
    if bundlecaps is not None:
        opts['bundlecaps'] = ','.join(bundlecaps)
    f = self._callcompressable("getbundle", **opts)
    return changegroupmod.unbundle10(f, 'UN')
339
339
    def unbundle(self, cg, heads, source):
        '''Send cg (a readable file-like object representing the
        changegroup to push, typically a chunkbuffer object) to the
        remote server as a bundle. Return an integer indicating the
        result of the push (see localrepository.addchangegroup()).'''

        # When the server understands 'unbundlehash', send a hash of the
        # sorted expected heads instead of the full list (smaller, and
        # lets the server detect races).  'force' bypasses the check.
        if heads != ['force'] and self.capable('unbundlehash'):
            heads = encodelist(['hashed',
                                util.sha1(''.join(sorted(heads))).digest()])
        else:
            heads = encodelist(heads)

        ret, output = self._callpush("unbundle", cg, heads=heads)
        # An empty return value signals push failure; the server's own
        # message travels in 'output'.
        if ret == "":
            raise error.ResponseError(
                _('push failed:'), output)
        try:
            ret = int(ret)
        except ValueError:
            raise error.ResponseError(
                _('push failed (unexpected response):'), ret)

        # Relay the server's textual output to the user, line by line.
        for l in output.splitlines(True):
            self.ui.status(_('remote: '), l)
        return ret
365
365
366 def debugwireargs(self, one, two, three=None, four=None, five=None):
366 def debugwireargs(self, one, two, three=None, four=None, five=None):
367 # don't pass optional arguments left at their default value
367 # don't pass optional arguments left at their default value
368 opts = {}
368 opts = {}
369 if three is not None:
369 if three is not None:
370 opts['three'] = three
370 opts['three'] = three
371 if four is not None:
371 if four is not None:
372 opts['four'] = four
372 opts['four'] = four
373 return self._call('debugwireargs', one=one, two=two, **opts)
373 return self._call('debugwireargs', one=one, two=two, **opts)
374
374
375 def _call(self, cmd, **args):
375 def _call(self, cmd, **args):
376 """execute <cmd> on the server
376 """execute <cmd> on the server
377
377
378 The command is expected to return a simple string.
378 The command is expected to return a simple string.
379
379
380 returns the server reply as a string."""
380 returns the server reply as a string."""
381 raise NotImplementedError()
381 raise NotImplementedError()
382
382
383 def _callstream(self, cmd, **args):
383 def _callstream(self, cmd, **args):
384 """execute <cmd> on the server
384 """execute <cmd> on the server
385
385
386 The command is expected to return a stream.
386 The command is expected to return a stream.
387
387
388 returns the server reply as a file like object."""
388 returns the server reply as a file like object."""
389 raise NotImplementedError()
389 raise NotImplementedError()
390
390
391 def _callcompressable(self, cmd, **args):
391 def _callcompressable(self, cmd, **args):
392 """execute <cmd> on the server
392 """execute <cmd> on the server
393
393
394 The command is expected to return a stream.
394 The command is expected to return a stream.
395
395
396 The stream may have been compressed in some implementaitons. This
396 The stream may have been compressed in some implementaitons. This
397 function takes care of the decompression. This is the only difference
397 function takes care of the decompression. This is the only difference
398 with _callstream.
398 with _callstream.
399
399
400 returns the server reply as a file like object.
400 returns the server reply as a file like object.
401 """
401 """
402 raise NotImplementedError()
402 raise NotImplementedError()
403
403
404 def _callpush(self, cmd, fp, **args):
404 def _callpush(self, cmd, fp, **args):
405 """execute a <cmd> on server
405 """execute a <cmd> on server
406
406
407 The command is expected to be related to a push. Push has a special
407 The command is expected to be related to a push. Push has a special
408 return method.
408 return method.
409
409
410 returns the server reply as a (ret, output) tuple. ret is either
410 returns the server reply as a (ret, output) tuple. ret is either
411 empty (error) or a stringified int.
411 empty (error) or a stringified int.
412 """
412 """
413 raise NotImplementedError()
413 raise NotImplementedError()
414
414
415 def _abort(self, exception):
415 def _abort(self, exception):
416 """clearly abort the wire protocol connection and raise the exception
416 """clearly abort the wire protocol connection and raise the exception
417 """
417 """
418 raise NotImplementedError()
418 raise NotImplementedError()
419
419
# server side

# wire protocol commands can either return a string or one of these classes.
class streamres(object):
    """wireproto reply: binary stream

    The call succeeded and produced a stream; iterate over the
    `gen` attribute to retrieve the chunks.
    """
    def __init__(self, gen):
        self.gen = gen
431
431
class pushres(object):
    """wireproto reply: success with simple integer return

    The call succeeded; the integer result lives in `res`.
    """
    def __init__(self, res):
        self.res = res
439
439
class pusherr(object):
    """wireproto reply: failure

    The call failed; `res` carries the error message.
    """
    def __init__(self, res):
        self.res = res
447
447
class ooberror(object):
    """wireproto reply: failure of a batch of operations

    Something went wrong during a batch call; the error text is kept
    in `message`.
    """
    def __init__(self, message):
        self.message = message
456
456
def dispatch(repo, proto, command):
    """Look up *command* in the wireproto table and run its handler
    against the 'served' view of *repo*."""
    repo = repo.filtered("served")
    handler, argspec = commands[command]
    return handler(repo, proto, *proto.getargs(argspec))
462
462
def options(cmd, keys, others):
    """Split the optional wire arguments for *cmd* out of *others*.

    Every key listed in *keys* is moved from *others* into the returned
    dict; anything left over in *others* is reported on stderr.
    """
    opts = {}
    for key in keys:
        if key in others:
            opts[key] = others.pop(key)
    if others:
        sys.stderr.write("abort: %s got unexpected arguments %s\n"
                         % (cmd, ",".join(others)))
    return opts
473
473
# list of commands
# maps wire command name -> (handler function, argument-spec string)
commands = {}
476
476
def wireprotocommand(name, args=''):
    """decorator registering a function as wireproto command *name*

    *args* is the whitespace-separated argument spec recorded next to
    the handler in the global `commands` table.
    """
    def register(func):
        commands[name] = (func, args)
        return func
    return register
483
483
@wireprotocommand('batch', 'cmds *')
def batch(repo, proto, cmds, others):
    # *cmds* is a ';'-separated list of "op arg1=val1,arg2=val2"
    # sub-commands, each executed in turn against the 'served' view.
    repo = repo.filtered("served")
    res = []
    for pair in cmds.split(';'):
        op, args = pair.split(' ', 1)
        vals = {}
        for a in args.split(','):
            if a:
                n, v = a.split('=')
                vals[n] = unescapearg(v)
        func, spec = commands[op]
        if spec:
            keys = spec.split()
            data = {}
            for k in keys:
                if k == '*':
                    # '*' collects every argument not explicitly named
                    # in the spec into a catch-all dict
                    star = {}
                    for key in vals.keys():
                        if key not in keys:
                            star[key] = vals[key]
                    data['*'] = star
                else:
                    data[k] = vals[k]
            result = func(repo, proto, *[data[k] for k in keys])
        else:
            result = func(repo, proto)
        if isinstance(result, ooberror):
            # an out-of-band error aborts the whole batch immediately
            return result
        res.append(escapearg(result))
    return ';'.join(res)
515
515
@wireprotocommand('between', 'pairs')
def between(repo, proto, pairs):
    """Answer the 'between' query for each '-'-separated node pair."""
    decoded = [decodelist(p, '-') for p in pairs.split(" ")]
    lines = [encodelist(b) + "\n" for b in repo.between(decoded)]
    return "".join(lines)
523
523
@wireprotocommand('branchmap')
def branchmap(repo, proto):
    """Serialize the branchmap, one 'branchname heads' entry per line."""
    entries = []
    for branch, nodes in repo.branchmap().iteritems():
        quoted = urllib.quote(encoding.fromlocal(branch))
        entries.append('%s %s' % (quoted, encodelist(nodes)))
    return '\n'.join(entries)
533
533
@wireprotocommand('branches', 'nodes')
def branches(repo, proto, nodes):
    """Answer the legacy 'branches' query for the requested nodes."""
    out = [encodelist(b) + "\n" for b in repo.branches(decodelist(nodes))]
    return "".join(out)
541
541
542
542
# capabilities every wire server advertises, regardless of configuration
wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
                 'known', 'getbundle', 'unbundlehash', 'batch']
545
545
def _capabilities(repo, proto):
    """return a list of capabilities for a repo

    This function exists to allow extensions to easily wrap capabilities
    computation

    - returns a list: easy to alter
    - changes done here will be propagated to both the `capabilities`
      and `hello` commands without any other action needed.
    """
    # copy to prevent modification of the global list
    caps = list(wireprotocaps)
    if _allowstream(repo.ui):
        if repo.ui.configbool('server', 'preferuncompressed', False):
            caps.append('stream-preferred')
        requiredformats = repo.requirements & repo.supportedformats
        # if our local revlogs are just revlogv1, add 'stream' cap
        if not requiredformats - set(('revlogv1',)):
            caps.append('stream')
        # otherwise, add 'streamreqs' detailing our local revlog format
        else:
            caps.append('streamreqs=%s' % ','.join(requiredformats))
    caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
    caps.append('httpheader=1024')
    return caps
571
571
# If you are writing an extension and consider wrapping this function,
# wrap `_capabilities` instead.
@wireprotocommand('capabilities')
def capabilities(repo, proto):
    """Advertise the server capabilities as one space-separated line."""
    caps = _capabilities(repo, proto)
    return ' '.join(caps)
577
577
@wireprotocommand('changegroup', 'roots')
def changegroup(repo, proto, roots):
    """Stream the changegroup descending from the decoded *roots*."""
    cg = changegroupmod.changegroup(repo, decodelist(roots), 'serve')
    return streamres(proto.groupchunks(cg))
583
583
@wireprotocommand('changegroupsubset', 'bases heads')
def changegroupsubset(repo, proto, bases, heads):
    """Stream the changegroup between decoded *bases* and *heads*."""
    cg = changegroupmod.changegroupsubset(repo, decodelist(bases),
                                          decodelist(heads), 'serve')
    return streamres(proto.groupchunks(cg))
590
590
@wireprotocommand('debugwireargs', 'one two *')
def debugwireargs(repo, proto, one, two, others):
    """Echo command used to exercise argument passing over the wire."""
    # only accept optional args from the known set
    opts = options('debugwireargs', ['three', 'four'], others)
    return repo.debugwireargs(one, two, **opts)
596
596
@wireprotocommand('getbundle', '*')
def getbundle(repo, proto, others):
    """Serve a bundle; node lists and bundle caps arrive wire-encoded."""
    opts = options('getbundle', ['heads', 'common', 'bundlecaps'], others)
    for key in list(opts):
        if key in ('heads', 'common'):
            opts[key] = decodelist(opts[key])
        elif key == 'bundlecaps':
            opts[key] = set(opts[key].split(','))
    cg = changegroupmod.getbundle(repo, 'serve', **opts)
    return streamres(proto.groupchunks(cg))
607
607
@wireprotocommand('heads')
def heads(repo, proto):
    """Return the encoded list of repository heads, newline-terminated."""
    return encodelist(repo.heads()) + "\n"
612
612
@wireprotocommand('hello')
def hello(repo, proto):
    '''the hello command returns a set of lines describing various
    interesting things about the server, in an RFC822-like format.
    Currently the only one defined is "capabilities", which
    consists of a line in the form:

    capabilities: space separated list of tokens
    '''
    caps = capabilities(repo, proto)
    return "capabilities: %s\n" % caps
623
623
@wireprotocommand('listkeys', 'namespace')
def listkeys(repo, proto, namespace):
    """List pushkeys in *namespace* as tab-separated key/value lines."""
    entries = repo.listkeys(encoding.tolocal(namespace)).items()
    lines = ['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
             for k, v in entries]
    return '\n'.join(lines)
630
630
631 @wireprotocommand('lookup', 'key')
631 @wireprotocommand('lookup', 'key')
632 def lookup(repo, proto, key):
632 def lookup(repo, proto, key):
633 try:
633 try:
634 k = encoding.tolocal(key)
634 k = encoding.tolocal(key)
635 c = repo[k]
635 c = repo[k]
636 r = c.hex()
636 r = c.hex()
637 success = 1
637 success = 1
638 except Exception, inst:
638 except Exception, inst:
639 r = str(inst)
639 r = str(inst)
640 success = 0
640 success = 0
641 return "%s %s\n" % (success, r)
641 return "%s %s\n" % (success, r)
642
642
@wireprotocommand('known', 'nodes *')
def known(repo, proto, nodes, others):
    """Answer with one '1'/'0' flag per queried node."""
    flags = repo.known(decodelist(nodes))
    return ''.join(b and "1" or "0" for b in flags)
646
646
@wireprotocommand('pushkey', 'namespace key old new')
def pushkey(repo, proto, namespace, key, old, new):
    # compatibility with pre-1.8 clients which were accidentally
    # sending raw binary nodes rather than utf-8-encoded hex
    if len(new) == 20 and new.encode('string-escape') != new:
        # looks like it could be a binary node
        try:
            new.decode('utf-8')
            new = encoding.tolocal(new) # but cleanly decodes as UTF-8
        except UnicodeDecodeError:
            pass # binary, leave unmodified
    else:
        new = encoding.tolocal(new) # normal path

    # protocols with a 'restore' hook (e.g. ssh) capture server-side
    # output and send it back alongside the result
    if util.safehasattr(proto, 'restore'):

        proto.redirect()

        try:
            r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
                             encoding.tolocal(old), new) or False
        except util.Abort:
            r = False

        output = proto.restore()

        return '%s\n%s' % (int(r), output)

    r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
                     encoding.tolocal(old), new)
    return '%s\n' % int(r)
678
678
679 def _allowstream(ui):
679 def _allowstream(ui):
680 return ui.configbool('server', 'uncompressed', True, untrusted=True)
680 return ui.configbool('server', 'uncompressed', True, untrusted=True)
681
681
682 def _walkstreamfiles(repo):
682 def _walkstreamfiles(repo):
683 # this is it's own function so extensions can override it
683 # this is it's own function so extensions can override it
684 return repo.store.walk()
684 return repo.store.walk()
685
685
@wireprotocommand('stream_out')
def stream(repo, proto):
    '''If the server supports streaming clone, it advertises the "stream"
    capability with a value representing the version and flags of the repo
    it is serving. Client checks to see if it understands the format.

    The format is simple: the server writes out a line with the amount
    of files, then the total amount of bytes to be transferred (separated
    by a space). Then, for each file, the server first writes the filename
    and filesize (separated by the null character), then the file contents.

    Replies '1\\n' when streaming is disabled, '2\\n' when the repo lock
    cannot be taken, otherwise a streamres starting with '0\\n'.
    '''

    if not _allowstream(repo.ui):
        return '1\n'

    entries = []
    total_bytes = 0
    try:
        # get consistent snapshot of repo, lock during scan
        lock = repo.lock()
        try:
            repo.ui.debug('scanning\n')
            for name, ename, size in _walkstreamfiles(repo):
                if size:
                    entries.append((name, size))
                    total_bytes += size
        finally:
            lock.release()
    except error.LockError:
        return '2\n' # error: 2

    def streamer(repo, entries, total):
        '''stream out all metadata files in repository.'''
        yield '0\n' # success
        # fix: 'total' was previously ignored in favor of the enclosing
        # 'total_bytes' closure variable; use the parameter consistently
        # (identical value at the single call site below).
        repo.ui.debug('%d files, %d bytes to transfer\n' %
                      (len(entries), total))
        yield '%d %d\n' % (len(entries), total)

        sopener = repo.sopener
        oldaudit = sopener.mustaudit
        debugflag = repo.ui.debugflag
        # path auditing is expensive and the names were just scanned under
        # lock, so stream the files unaudited
        sopener.mustaudit = False

        try:
            for name, size in entries:
                if debugflag:
                    repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
                # partially encode name over the wire for backwards compat
                yield '%s\0%d\n' % (store.encodedir(name), size)
                if size <= 65536:
                    # small file: one read
                    fp = sopener(name)
                    try:
                        data = fp.read(size)
                    finally:
                        fp.close()
                    yield data
                else:
                    # large file: chunked reads bounded by the scanned size
                    for chunk in util.filechunkiter(sopener(name), limit=size):
                        yield chunk
        # replace with "finally:" when support for python 2.4 has been dropped
        except Exception:
            sopener.mustaudit = oldaudit
            raise
        sopener.mustaudit = oldaudit

    return streamres(streamer(repo, entries, total_bytes))
752
752
@wireprotocommand('unbundle', 'heads')
def unbundle(repo, proto, heads):
    # decoded heads the client expects the repo to have; the special
    # values ['force'] and ['hashed', <sha1>] are understood downstream
    their_heads = decodelist(heads)

    try:
        proto.redirect()

        # fail early if the repo already changed since the client looked
        exchange.check_heads(repo, their_heads, 'preparing changes')

        # write bundle data to temporary file because it can be big
        fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
        fp = os.fdopen(fd, 'wb+')
        r = 0
        try:
            proto.getfile(fp)
            lock = repo.lock()
            try:
                # re-check under lock: someone else may have
                # committed/pushed/unbundled while we were transferring
                exchange.check_heads(repo, their_heads, 'uploading changes')

                # push can proceed
                fp.seek(0)
                gen = changegroupmod.readbundle(fp, None)

                try:
                    r = changegroupmod.addchangegroup(repo, gen, 'serve',
                                                      proto._client())
                except util.Abort, inst:
                    sys.stderr.write("abort: %s\n" % inst)
            finally:
                lock.release()
            return pushres(r)

        finally:
            fp.close()
            os.unlink(tempname)
    except exchange.PushRaced, exc:
        # a head check failed: report the race to the client
        return pusherr(str(exc))
General Comments 0
You need to be logged in to leave comments. Login now