##// END OF EJS Templates
unbundle: extract the core logic in another function...
Pierre-Yves David -
r20968:33d5fdd9 default
parent child Browse files
Show More
@@ -1,630 +1,656 b''
1 # exchange.py - utily to exchange data between repo.
1 # exchange.py - utily to exchange data between repo.
2 #
2 #
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import sys
8 from i18n import _
9 from i18n import _
9 from node import hex, nullid
10 from node import hex, nullid
10 import cStringIO
11 import cStringIO
11 import errno
12 import errno
12 import util, scmutil, changegroup, base85
13 import util, scmutil, changegroup, base85
13 import discovery, phases, obsolete, bookmarks, bundle2
14 import discovery, phases, obsolete, bookmarks, bundle2
14
15
15
16
16 class pushoperation(object):
17 class pushoperation(object):
17 """A object that represent a single push operation
18 """A object that represent a single push operation
18
19
19 It purpose is to carry push related state and very common operation.
20 It purpose is to carry push related state and very common operation.
20
21
21 A new should be created at the begining of each push and discarded
22 A new should be created at the begining of each push and discarded
22 afterward.
23 afterward.
23 """
24 """
24
25
25 def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
26 def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
26 # repo we push from
27 # repo we push from
27 self.repo = repo
28 self.repo = repo
28 self.ui = repo.ui
29 self.ui = repo.ui
29 # repo we push to
30 # repo we push to
30 self.remote = remote
31 self.remote = remote
31 # force option provided
32 # force option provided
32 self.force = force
33 self.force = force
33 # revs to be pushed (None is "all")
34 # revs to be pushed (None is "all")
34 self.revs = revs
35 self.revs = revs
35 # allow push of new branch
36 # allow push of new branch
36 self.newbranch = newbranch
37 self.newbranch = newbranch
37 # did a local lock get acquired?
38 # did a local lock get acquired?
38 self.locallocked = None
39 self.locallocked = None
39 # Integer version of the push result
40 # Integer version of the push result
40 # - None means nothing to push
41 # - None means nothing to push
41 # - 0 means HTTP error
42 # - 0 means HTTP error
42 # - 1 means we pushed and remote head count is unchanged *or*
43 # - 1 means we pushed and remote head count is unchanged *or*
43 # we have outgoing changesets but refused to push
44 # we have outgoing changesets but refused to push
44 # - other values as described by addchangegroup()
45 # - other values as described by addchangegroup()
45 self.ret = None
46 self.ret = None
46 # discover.outgoing object (contains common and outgoin data)
47 # discover.outgoing object (contains common and outgoin data)
47 self.outgoing = None
48 self.outgoing = None
48 # all remote heads before the push
49 # all remote heads before the push
49 self.remoteheads = None
50 self.remoteheads = None
50 # testable as a boolean indicating if any nodes are missing locally.
51 # testable as a boolean indicating if any nodes are missing locally.
51 self.incoming = None
52 self.incoming = None
52 # set of all heads common after changeset bundle push
53 # set of all heads common after changeset bundle push
53 self.commonheads = None
54 self.commonheads = None
54
55
55 def push(repo, remote, force=False, revs=None, newbranch=False):
56 def push(repo, remote, force=False, revs=None, newbranch=False):
56 '''Push outgoing changesets (limited by revs) from a local
57 '''Push outgoing changesets (limited by revs) from a local
57 repository to remote. Return an integer:
58 repository to remote. Return an integer:
58 - None means nothing to push
59 - None means nothing to push
59 - 0 means HTTP error
60 - 0 means HTTP error
60 - 1 means we pushed and remote head count is unchanged *or*
61 - 1 means we pushed and remote head count is unchanged *or*
61 we have outgoing changesets but refused to push
62 we have outgoing changesets but refused to push
62 - other values as described by addchangegroup()
63 - other values as described by addchangegroup()
63 '''
64 '''
64 pushop = pushoperation(repo, remote, force, revs, newbranch)
65 pushop = pushoperation(repo, remote, force, revs, newbranch)
65 if pushop.remote.local():
66 if pushop.remote.local():
66 missing = (set(pushop.repo.requirements)
67 missing = (set(pushop.repo.requirements)
67 - pushop.remote.local().supported)
68 - pushop.remote.local().supported)
68 if missing:
69 if missing:
69 msg = _("required features are not"
70 msg = _("required features are not"
70 " supported in the destination:"
71 " supported in the destination:"
71 " %s") % (', '.join(sorted(missing)))
72 " %s") % (', '.join(sorted(missing)))
72 raise util.Abort(msg)
73 raise util.Abort(msg)
73
74
74 # there are two ways to push to remote repo:
75 # there are two ways to push to remote repo:
75 #
76 #
76 # addchangegroup assumes local user can lock remote
77 # addchangegroup assumes local user can lock remote
77 # repo (local filesystem, old ssh servers).
78 # repo (local filesystem, old ssh servers).
78 #
79 #
79 # unbundle assumes local user cannot lock remote repo (new ssh
80 # unbundle assumes local user cannot lock remote repo (new ssh
80 # servers, http servers).
81 # servers, http servers).
81
82
82 if not pushop.remote.canpush():
83 if not pushop.remote.canpush():
83 raise util.Abort(_("destination does not support push"))
84 raise util.Abort(_("destination does not support push"))
84 # get local lock as we might write phase data
85 # get local lock as we might write phase data
85 locallock = None
86 locallock = None
86 try:
87 try:
87 locallock = pushop.repo.lock()
88 locallock = pushop.repo.lock()
88 pushop.locallocked = True
89 pushop.locallocked = True
89 except IOError, err:
90 except IOError, err:
90 pushop.locallocked = False
91 pushop.locallocked = False
91 if err.errno != errno.EACCES:
92 if err.errno != errno.EACCES:
92 raise
93 raise
93 # source repo cannot be locked.
94 # source repo cannot be locked.
94 # We do not abort the push, but just disable the local phase
95 # We do not abort the push, but just disable the local phase
95 # synchronisation.
96 # synchronisation.
96 msg = 'cannot lock source repository: %s\n' % err
97 msg = 'cannot lock source repository: %s\n' % err
97 pushop.ui.debug(msg)
98 pushop.ui.debug(msg)
98 try:
99 try:
99 pushop.repo.checkpush(pushop)
100 pushop.repo.checkpush(pushop)
100 lock = None
101 lock = None
101 unbundle = pushop.remote.capable('unbundle')
102 unbundle = pushop.remote.capable('unbundle')
102 if not unbundle:
103 if not unbundle:
103 lock = pushop.remote.lock()
104 lock = pushop.remote.lock()
104 try:
105 try:
105 _pushdiscovery(pushop)
106 _pushdiscovery(pushop)
106 if _pushcheckoutgoing(pushop):
107 if _pushcheckoutgoing(pushop):
107 _pushchangeset(pushop)
108 _pushchangeset(pushop)
108 _pushcomputecommonheads(pushop)
109 _pushcomputecommonheads(pushop)
109 _pushsyncphase(pushop)
110 _pushsyncphase(pushop)
110 _pushobsolete(pushop)
111 _pushobsolete(pushop)
111 finally:
112 finally:
112 if lock is not None:
113 if lock is not None:
113 lock.release()
114 lock.release()
114 finally:
115 finally:
115 if locallock is not None:
116 if locallock is not None:
116 locallock.release()
117 locallock.release()
117
118
118 _pushbookmark(pushop)
119 _pushbookmark(pushop)
119 return pushop.ret
120 return pushop.ret
120
121
121 def _pushdiscovery(pushop):
122 def _pushdiscovery(pushop):
122 # discovery
123 # discovery
123 unfi = pushop.repo.unfiltered()
124 unfi = pushop.repo.unfiltered()
124 fci = discovery.findcommonincoming
125 fci = discovery.findcommonincoming
125 commoninc = fci(unfi, pushop.remote, force=pushop.force)
126 commoninc = fci(unfi, pushop.remote, force=pushop.force)
126 common, inc, remoteheads = commoninc
127 common, inc, remoteheads = commoninc
127 fco = discovery.findcommonoutgoing
128 fco = discovery.findcommonoutgoing
128 outgoing = fco(unfi, pushop.remote, onlyheads=pushop.revs,
129 outgoing = fco(unfi, pushop.remote, onlyheads=pushop.revs,
129 commoninc=commoninc, force=pushop.force)
130 commoninc=commoninc, force=pushop.force)
130 pushop.outgoing = outgoing
131 pushop.outgoing = outgoing
131 pushop.remoteheads = remoteheads
132 pushop.remoteheads = remoteheads
132 pushop.incoming = inc
133 pushop.incoming = inc
133
134
134 def _pushcheckoutgoing(pushop):
135 def _pushcheckoutgoing(pushop):
135 outgoing = pushop.outgoing
136 outgoing = pushop.outgoing
136 unfi = pushop.repo.unfiltered()
137 unfi = pushop.repo.unfiltered()
137 if not outgoing.missing:
138 if not outgoing.missing:
138 # nothing to push
139 # nothing to push
139 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
140 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
140 return False
141 return False
141 # something to push
142 # something to push
142 if not pushop.force:
143 if not pushop.force:
143 # if repo.obsstore == False --> no obsolete
144 # if repo.obsstore == False --> no obsolete
144 # then, save the iteration
145 # then, save the iteration
145 if unfi.obsstore:
146 if unfi.obsstore:
146 # this message are here for 80 char limit reason
147 # this message are here for 80 char limit reason
147 mso = _("push includes obsolete changeset: %s!")
148 mso = _("push includes obsolete changeset: %s!")
148 mst = "push includes %s changeset: %s!"
149 mst = "push includes %s changeset: %s!"
149 # plain versions for i18n tool to detect them
150 # plain versions for i18n tool to detect them
150 _("push includes unstable changeset: %s!")
151 _("push includes unstable changeset: %s!")
151 _("push includes bumped changeset: %s!")
152 _("push includes bumped changeset: %s!")
152 _("push includes divergent changeset: %s!")
153 _("push includes divergent changeset: %s!")
153 # If we are to push if there is at least one
154 # If we are to push if there is at least one
154 # obsolete or unstable changeset in missing, at
155 # obsolete or unstable changeset in missing, at
155 # least one of the missinghead will be obsolete or
156 # least one of the missinghead will be obsolete or
156 # unstable. So checking heads only is ok
157 # unstable. So checking heads only is ok
157 for node in outgoing.missingheads:
158 for node in outgoing.missingheads:
158 ctx = unfi[node]
159 ctx = unfi[node]
159 if ctx.obsolete():
160 if ctx.obsolete():
160 raise util.Abort(mso % ctx)
161 raise util.Abort(mso % ctx)
161 elif ctx.troubled():
162 elif ctx.troubled():
162 raise util.Abort(_(mst)
163 raise util.Abort(_(mst)
163 % (ctx.troubles()[0],
164 % (ctx.troubles()[0],
164 ctx))
165 ctx))
165 newbm = pushop.ui.configlist('bookmarks', 'pushing')
166 newbm = pushop.ui.configlist('bookmarks', 'pushing')
166 discovery.checkheads(unfi, pushop.remote, outgoing,
167 discovery.checkheads(unfi, pushop.remote, outgoing,
167 pushop.remoteheads,
168 pushop.remoteheads,
168 pushop.newbranch,
169 pushop.newbranch,
169 bool(pushop.incoming),
170 bool(pushop.incoming),
170 newbm)
171 newbm)
171 return True
172 return True
172
173
173 def _pushchangeset(pushop):
174 def _pushchangeset(pushop):
174 """Make the actual push of changeset bundle to remote repo"""
175 """Make the actual push of changeset bundle to remote repo"""
175 outgoing = pushop.outgoing
176 outgoing = pushop.outgoing
176 unbundle = pushop.remote.capable('unbundle')
177 unbundle = pushop.remote.capable('unbundle')
177 # TODO: get bundlecaps from remote
178 # TODO: get bundlecaps from remote
178 bundlecaps = None
179 bundlecaps = None
179 # create a changegroup from local
180 # create a changegroup from local
180 if pushop.revs is None and not (outgoing.excluded
181 if pushop.revs is None and not (outgoing.excluded
181 or pushop.repo.changelog.filteredrevs):
182 or pushop.repo.changelog.filteredrevs):
182 # push everything,
183 # push everything,
183 # use the fast path, no race possible on push
184 # use the fast path, no race possible on push
184 bundler = changegroup.bundle10(pushop.repo, bundlecaps)
185 bundler = changegroup.bundle10(pushop.repo, bundlecaps)
185 cg = changegroup.getsubset(pushop.repo,
186 cg = changegroup.getsubset(pushop.repo,
186 outgoing,
187 outgoing,
187 bundler,
188 bundler,
188 'push',
189 'push',
189 fastpath=True)
190 fastpath=True)
190 else:
191 else:
191 cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
192 cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
192 bundlecaps)
193 bundlecaps)
193
194
194 # apply changegroup to remote
195 # apply changegroup to remote
195 if unbundle:
196 if unbundle:
196 # local repo finds heads on server, finds out what
197 # local repo finds heads on server, finds out what
197 # revs it must push. once revs transferred, if server
198 # revs it must push. once revs transferred, if server
198 # finds it has different heads (someone else won
199 # finds it has different heads (someone else won
199 # commit/push race), server aborts.
200 # commit/push race), server aborts.
200 if pushop.force:
201 if pushop.force:
201 remoteheads = ['force']
202 remoteheads = ['force']
202 else:
203 else:
203 remoteheads = pushop.remoteheads
204 remoteheads = pushop.remoteheads
204 # ssh: return remote's addchangegroup()
205 # ssh: return remote's addchangegroup()
205 # http: return remote's addchangegroup() or 0 for error
206 # http: return remote's addchangegroup() or 0 for error
206 pushop.ret = pushop.remote.unbundle(cg, remoteheads,
207 pushop.ret = pushop.remote.unbundle(cg, remoteheads,
207 'push')
208 'push')
208 else:
209 else:
209 # we return an integer indicating remote head count
210 # we return an integer indicating remote head count
210 # change
211 # change
211 pushop.ret = pushop.remote.addchangegroup(cg, 'push',
212 pushop.ret = pushop.remote.addchangegroup(cg, 'push',
212 pushop.repo.url())
213 pushop.repo.url())
213
214
214 def _pushcomputecommonheads(pushop):
215 def _pushcomputecommonheads(pushop):
215 unfi = pushop.repo.unfiltered()
216 unfi = pushop.repo.unfiltered()
216 if pushop.ret:
217 if pushop.ret:
217 # push succeed, synchronize target of the push
218 # push succeed, synchronize target of the push
218 cheads = pushop.outgoing.missingheads
219 cheads = pushop.outgoing.missingheads
219 elif pushop.revs is None:
220 elif pushop.revs is None:
220 # All out push fails. synchronize all common
221 # All out push fails. synchronize all common
221 cheads = pushop.outgoing.commonheads
222 cheads = pushop.outgoing.commonheads
222 else:
223 else:
223 # I want cheads = heads(::missingheads and ::commonheads)
224 # I want cheads = heads(::missingheads and ::commonheads)
224 # (missingheads is revs with secret changeset filtered out)
225 # (missingheads is revs with secret changeset filtered out)
225 #
226 #
226 # This can be expressed as:
227 # This can be expressed as:
227 # cheads = ( (missingheads and ::commonheads)
228 # cheads = ( (missingheads and ::commonheads)
228 # + (commonheads and ::missingheads))"
229 # + (commonheads and ::missingheads))"
229 # )
230 # )
230 #
231 #
231 # while trying to push we already computed the following:
232 # while trying to push we already computed the following:
232 # common = (::commonheads)
233 # common = (::commonheads)
233 # missing = ((commonheads::missingheads) - commonheads)
234 # missing = ((commonheads::missingheads) - commonheads)
234 #
235 #
235 # We can pick:
236 # We can pick:
236 # * missingheads part of common (::commonheads)
237 # * missingheads part of common (::commonheads)
237 common = set(pushop.outgoing.common)
238 common = set(pushop.outgoing.common)
238 nm = pushop.repo.changelog.nodemap
239 nm = pushop.repo.changelog.nodemap
239 cheads = [node for node in pushop.revs if nm[node] in common]
240 cheads = [node for node in pushop.revs if nm[node] in common]
240 # and
241 # and
241 # * commonheads parents on missing
242 # * commonheads parents on missing
242 revset = unfi.set('%ln and parents(roots(%ln))',
243 revset = unfi.set('%ln and parents(roots(%ln))',
243 pushop.outgoing.commonheads,
244 pushop.outgoing.commonheads,
244 pushop.outgoing.missing)
245 pushop.outgoing.missing)
245 cheads.extend(c.node() for c in revset)
246 cheads.extend(c.node() for c in revset)
246 pushop.commonheads = cheads
247 pushop.commonheads = cheads
247
248
248 def _pushsyncphase(pushop):
249 def _pushsyncphase(pushop):
249 """synchronise phase information locally and remotly"""
250 """synchronise phase information locally and remotly"""
250 unfi = pushop.repo.unfiltered()
251 unfi = pushop.repo.unfiltered()
251 cheads = pushop.commonheads
252 cheads = pushop.commonheads
252 if pushop.ret:
253 if pushop.ret:
253 # push succeed, synchronize target of the push
254 # push succeed, synchronize target of the push
254 cheads = pushop.outgoing.missingheads
255 cheads = pushop.outgoing.missingheads
255 elif pushop.revs is None:
256 elif pushop.revs is None:
256 # All out push fails. synchronize all common
257 # All out push fails. synchronize all common
257 cheads = pushop.outgoing.commonheads
258 cheads = pushop.outgoing.commonheads
258 else:
259 else:
259 # I want cheads = heads(::missingheads and ::commonheads)
260 # I want cheads = heads(::missingheads and ::commonheads)
260 # (missingheads is revs with secret changeset filtered out)
261 # (missingheads is revs with secret changeset filtered out)
261 #
262 #
262 # This can be expressed as:
263 # This can be expressed as:
263 # cheads = ( (missingheads and ::commonheads)
264 # cheads = ( (missingheads and ::commonheads)
264 # + (commonheads and ::missingheads))"
265 # + (commonheads and ::missingheads))"
265 # )
266 # )
266 #
267 #
267 # while trying to push we already computed the following:
268 # while trying to push we already computed the following:
268 # common = (::commonheads)
269 # common = (::commonheads)
269 # missing = ((commonheads::missingheads) - commonheads)
270 # missing = ((commonheads::missingheads) - commonheads)
270 #
271 #
271 # We can pick:
272 # We can pick:
272 # * missingheads part of common (::commonheads)
273 # * missingheads part of common (::commonheads)
273 common = set(pushop.outgoing.common)
274 common = set(pushop.outgoing.common)
274 nm = pushop.repo.changelog.nodemap
275 nm = pushop.repo.changelog.nodemap
275 cheads = [node for node in pushop.revs if nm[node] in common]
276 cheads = [node for node in pushop.revs if nm[node] in common]
276 # and
277 # and
277 # * commonheads parents on missing
278 # * commonheads parents on missing
278 revset = unfi.set('%ln and parents(roots(%ln))',
279 revset = unfi.set('%ln and parents(roots(%ln))',
279 pushop.outgoing.commonheads,
280 pushop.outgoing.commonheads,
280 pushop.outgoing.missing)
281 pushop.outgoing.missing)
281 cheads.extend(c.node() for c in revset)
282 cheads.extend(c.node() for c in revset)
282 pushop.commonheads = cheads
283 pushop.commonheads = cheads
283 # even when we don't push, exchanging phase data is useful
284 # even when we don't push, exchanging phase data is useful
284 remotephases = pushop.remote.listkeys('phases')
285 remotephases = pushop.remote.listkeys('phases')
285 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
286 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
286 and remotephases # server supports phases
287 and remotephases # server supports phases
287 and pushop.ret is None # nothing was pushed
288 and pushop.ret is None # nothing was pushed
288 and remotephases.get('publishing', False)):
289 and remotephases.get('publishing', False)):
289 # When:
290 # When:
290 # - this is a subrepo push
291 # - this is a subrepo push
291 # - and remote support phase
292 # - and remote support phase
292 # - and no changeset was pushed
293 # - and no changeset was pushed
293 # - and remote is publishing
294 # - and remote is publishing
294 # We may be in issue 3871 case!
295 # We may be in issue 3871 case!
295 # We drop the possible phase synchronisation done by
296 # We drop the possible phase synchronisation done by
296 # courtesy to publish changesets possibly locally draft
297 # courtesy to publish changesets possibly locally draft
297 # on the remote.
298 # on the remote.
298 remotephases = {'publishing': 'True'}
299 remotephases = {'publishing': 'True'}
299 if not remotephases: # old server or public only rer
300 if not remotephases: # old server or public only rer
300 _localphasemove(pushop, cheads)
301 _localphasemove(pushop, cheads)
301 # don't push any phase data as there is nothing to push
302 # don't push any phase data as there is nothing to push
302 else:
303 else:
303 ana = phases.analyzeremotephases(pushop.repo, cheads,
304 ana = phases.analyzeremotephases(pushop.repo, cheads,
304 remotephases)
305 remotephases)
305 pheads, droots = ana
306 pheads, droots = ana
306 ### Apply remote phase on local
307 ### Apply remote phase on local
307 if remotephases.get('publishing', False):
308 if remotephases.get('publishing', False):
308 _localphasemove(pushop, cheads)
309 _localphasemove(pushop, cheads)
309 else: # publish = False
310 else: # publish = False
310 _localphasemove(pushop, pheads)
311 _localphasemove(pushop, pheads)
311 _localphasemove(pushop, cheads, phases.draft)
312 _localphasemove(pushop, cheads, phases.draft)
312 ### Apply local phase on remote
313 ### Apply local phase on remote
313
314
314 # Get the list of all revs draft on remote by public here.
315 # Get the list of all revs draft on remote by public here.
315 # XXX Beware that revset break if droots is not strictly
316 # XXX Beware that revset break if droots is not strictly
316 # XXX root we may want to ensure it is but it is costly
317 # XXX root we may want to ensure it is but it is costly
317 outdated = unfi.set('heads((%ln::%ln) and public())',
318 outdated = unfi.set('heads((%ln::%ln) and public())',
318 droots, cheads)
319 droots, cheads)
319 for newremotehead in outdated:
320 for newremotehead in outdated:
320 r = pushop.remote.pushkey('phases',
321 r = pushop.remote.pushkey('phases',
321 newremotehead.hex(),
322 newremotehead.hex(),
322 str(phases.draft),
323 str(phases.draft),
323 str(phases.public))
324 str(phases.public))
324 if not r:
325 if not r:
325 pushop.ui.warn(_('updating %s to public failed!\n')
326 pushop.ui.warn(_('updating %s to public failed!\n')
326 % newremotehead)
327 % newremotehead)
327
328
328 def _localphasemove(pushop, nodes, phase=phases.public):
329 def _localphasemove(pushop, nodes, phase=phases.public):
329 """move <nodes> to <phase> in the local source repo"""
330 """move <nodes> to <phase> in the local source repo"""
330 if pushop.locallocked:
331 if pushop.locallocked:
331 phases.advanceboundary(pushop.repo, phase, nodes)
332 phases.advanceboundary(pushop.repo, phase, nodes)
332 else:
333 else:
333 # repo is not locked, do not change any phases!
334 # repo is not locked, do not change any phases!
334 # Informs the user that phases should have been moved when
335 # Informs the user that phases should have been moved when
335 # applicable.
336 # applicable.
336 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
337 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
337 phasestr = phases.phasenames[phase]
338 phasestr = phases.phasenames[phase]
338 if actualmoves:
339 if actualmoves:
339 pushop.ui.status(_('cannot lock source repo, skipping '
340 pushop.ui.status(_('cannot lock source repo, skipping '
340 'local %s phase update\n') % phasestr)
341 'local %s phase update\n') % phasestr)
341
342
342 def _pushobsolete(pushop):
343 def _pushobsolete(pushop):
343 """utility function to push obsolete markers to a remote"""
344 """utility function to push obsolete markers to a remote"""
344 pushop.ui.debug('try to push obsolete markers to remote\n')
345 pushop.ui.debug('try to push obsolete markers to remote\n')
345 repo = pushop.repo
346 repo = pushop.repo
346 remote = pushop.remote
347 remote = pushop.remote
347 if (obsolete._enabled and repo.obsstore and
348 if (obsolete._enabled and repo.obsstore and
348 'obsolete' in remote.listkeys('namespaces')):
349 'obsolete' in remote.listkeys('namespaces')):
349 rslts = []
350 rslts = []
350 remotedata = repo.listkeys('obsolete')
351 remotedata = repo.listkeys('obsolete')
351 for key in sorted(remotedata, reverse=True):
352 for key in sorted(remotedata, reverse=True):
352 # reverse sort to ensure we end with dump0
353 # reverse sort to ensure we end with dump0
353 data = remotedata[key]
354 data = remotedata[key]
354 rslts.append(remote.pushkey('obsolete', key, '', data))
355 rslts.append(remote.pushkey('obsolete', key, '', data))
355 if [r for r in rslts if not r]:
356 if [r for r in rslts if not r]:
356 msg = _('failed to push some obsolete markers!\n')
357 msg = _('failed to push some obsolete markers!\n')
357 repo.ui.warn(msg)
358 repo.ui.warn(msg)
358
359
359 def _pushbookmark(pushop):
360 def _pushbookmark(pushop):
360 """Update bookmark position on remote"""
361 """Update bookmark position on remote"""
361 ui = pushop.ui
362 ui = pushop.ui
362 repo = pushop.repo.unfiltered()
363 repo = pushop.repo.unfiltered()
363 remote = pushop.remote
364 remote = pushop.remote
364 ui.debug("checking for updated bookmarks\n")
365 ui.debug("checking for updated bookmarks\n")
365 revnums = map(repo.changelog.rev, pushop.revs or [])
366 revnums = map(repo.changelog.rev, pushop.revs or [])
366 ancestors = [a for a in repo.changelog.ancestors(revnums, inclusive=True)]
367 ancestors = [a for a in repo.changelog.ancestors(revnums, inclusive=True)]
367 (addsrc, adddst, advsrc, advdst, diverge, differ, invalid
368 (addsrc, adddst, advsrc, advdst, diverge, differ, invalid
368 ) = bookmarks.compare(repo, repo._bookmarks, remote.listkeys('bookmarks'),
369 ) = bookmarks.compare(repo, repo._bookmarks, remote.listkeys('bookmarks'),
369 srchex=hex)
370 srchex=hex)
370
371
371 for b, scid, dcid in advsrc:
372 for b, scid, dcid in advsrc:
372 if ancestors and repo[scid].rev() not in ancestors:
373 if ancestors and repo[scid].rev() not in ancestors:
373 continue
374 continue
374 if remote.pushkey('bookmarks', b, dcid, scid):
375 if remote.pushkey('bookmarks', b, dcid, scid):
375 ui.status(_("updating bookmark %s\n") % b)
376 ui.status(_("updating bookmark %s\n") % b)
376 else:
377 else:
377 ui.warn(_('updating bookmark %s failed!\n') % b)
378 ui.warn(_('updating bookmark %s failed!\n') % b)
378
379
379 class pulloperation(object):
380 class pulloperation(object):
380 """A object that represent a single pull operation
381 """A object that represent a single pull operation
381
382
382 It purpose is to carry push related state and very common operation.
383 It purpose is to carry push related state and very common operation.
383
384
384 A new should be created at the begining of each pull and discarded
385 A new should be created at the begining of each pull and discarded
385 afterward.
386 afterward.
386 """
387 """
387
388
388 def __init__(self, repo, remote, heads=None, force=False):
389 def __init__(self, repo, remote, heads=None, force=False):
389 # repo we pull into
390 # repo we pull into
390 self.repo = repo
391 self.repo = repo
391 # repo we pull from
392 # repo we pull from
392 self.remote = remote
393 self.remote = remote
393 # revision we try to pull (None is "all")
394 # revision we try to pull (None is "all")
394 self.heads = heads
395 self.heads = heads
395 # do we force pull?
396 # do we force pull?
396 self.force = force
397 self.force = force
397 # the name the pull transaction
398 # the name the pull transaction
398 self._trname = 'pull\n' + util.hidepassword(remote.url())
399 self._trname = 'pull\n' + util.hidepassword(remote.url())
399 # hold the transaction once created
400 # hold the transaction once created
400 self._tr = None
401 self._tr = None
401 # set of common changeset between local and remote before pull
402 # set of common changeset between local and remote before pull
402 self.common = None
403 self.common = None
403 # set of pulled head
404 # set of pulled head
404 self.rheads = None
405 self.rheads = None
405 # list of missing changeset to fetch remotly
406 # list of missing changeset to fetch remotly
406 self.fetch = None
407 self.fetch = None
407 # result of changegroup pulling (used as returng code by pull)
408 # result of changegroup pulling (used as returng code by pull)
408 self.cgresult = None
409 self.cgresult = None
409 # list of step remaining todo (related to future bundle2 usage)
410 # list of step remaining todo (related to future bundle2 usage)
410 self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])
411 self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])
411
412
412 @util.propertycache
413 @util.propertycache
413 def pulledsubset(self):
414 def pulledsubset(self):
414 """heads of the set of changeset target by the pull"""
415 """heads of the set of changeset target by the pull"""
415 # compute target subset
416 # compute target subset
416 if self.heads is None:
417 if self.heads is None:
417 # We pulled every thing possible
418 # We pulled every thing possible
418 # sync on everything common
419 # sync on everything common
419 c = set(self.common)
420 c = set(self.common)
420 ret = list(self.common)
421 ret = list(self.common)
421 for n in self.rheads:
422 for n in self.rheads:
422 if n not in c:
423 if n not in c:
423 ret.append(n)
424 ret.append(n)
424 return ret
425 return ret
425 else:
426 else:
426 # We pulled a specific subset
427 # We pulled a specific subset
427 # sync on this subset
428 # sync on this subset
428 return self.heads
429 return self.heads
429
430
430 def gettransaction(self):
431 def gettransaction(self):
431 """get appropriate pull transaction, creating it if needed"""
432 """get appropriate pull transaction, creating it if needed"""
432 if self._tr is None:
433 if self._tr is None:
433 self._tr = self.repo.transaction(self._trname)
434 self._tr = self.repo.transaction(self._trname)
434 return self._tr
435 return self._tr
435
436
436 def closetransaction(self):
437 def closetransaction(self):
437 """close transaction if created"""
438 """close transaction if created"""
438 if self._tr is not None:
439 if self._tr is not None:
439 self._tr.close()
440 self._tr.close()
440
441
441 def releasetransaction(self):
442 def releasetransaction(self):
442 """release transaction if created"""
443 """release transaction if created"""
443 if self._tr is not None:
444 if self._tr is not None:
444 self._tr.release()
445 self._tr.release()
445
446
446 def pull(repo, remote, heads=None, force=False):
447 def pull(repo, remote, heads=None, force=False):
447 pullop = pulloperation(repo, remote, heads, force)
448 pullop = pulloperation(repo, remote, heads, force)
448 if pullop.remote.local():
449 if pullop.remote.local():
449 missing = set(pullop.remote.requirements) - pullop.repo.supported
450 missing = set(pullop.remote.requirements) - pullop.repo.supported
450 if missing:
451 if missing:
451 msg = _("required features are not"
452 msg = _("required features are not"
452 " supported in the destination:"
453 " supported in the destination:"
453 " %s") % (', '.join(sorted(missing)))
454 " %s") % (', '.join(sorted(missing)))
454 raise util.Abort(msg)
455 raise util.Abort(msg)
455
456
456 lock = pullop.repo.lock()
457 lock = pullop.repo.lock()
457 try:
458 try:
458 _pulldiscovery(pullop)
459 _pulldiscovery(pullop)
459 if pullop.remote.capable('bundle2'):
460 if pullop.remote.capable('bundle2'):
460 _pullbundle2(pullop)
461 _pullbundle2(pullop)
461 if 'changegroup' in pullop.todosteps:
462 if 'changegroup' in pullop.todosteps:
462 _pullchangeset(pullop)
463 _pullchangeset(pullop)
463 if 'phases' in pullop.todosteps:
464 if 'phases' in pullop.todosteps:
464 _pullphase(pullop)
465 _pullphase(pullop)
465 if 'obsmarkers' in pullop.todosteps:
466 if 'obsmarkers' in pullop.todosteps:
466 _pullobsolete(pullop)
467 _pullobsolete(pullop)
467 pullop.closetransaction()
468 pullop.closetransaction()
468 finally:
469 finally:
469 pullop.releasetransaction()
470 pullop.releasetransaction()
470 lock.release()
471 lock.release()
471
472
472 return pullop.cgresult
473 return pullop.cgresult
473
474
474 def _pulldiscovery(pullop):
475 def _pulldiscovery(pullop):
475 """discovery phase for the pull
476 """discovery phase for the pull
476
477
477 Current handle changeset discovery only, will change handle all discovery
478 Current handle changeset discovery only, will change handle all discovery
478 at some point."""
479 at some point."""
479 tmp = discovery.findcommonincoming(pullop.repo.unfiltered(),
480 tmp = discovery.findcommonincoming(pullop.repo.unfiltered(),
480 pullop.remote,
481 pullop.remote,
481 heads=pullop.heads,
482 heads=pullop.heads,
482 force=pullop.force)
483 force=pullop.force)
483 pullop.common, pullop.fetch, pullop.rheads = tmp
484 pullop.common, pullop.fetch, pullop.rheads = tmp
484
485
485 def _pullbundle2(pullop):
486 def _pullbundle2(pullop):
486 """pull data using bundle2
487 """pull data using bundle2
487
488
488 For now, the only supported data are changegroup."""
489 For now, the only supported data are changegroup."""
489 kwargs = {'bundlecaps': set(['HG20'])}
490 kwargs = {'bundlecaps': set(['HG20'])}
490 # pulling changegroup
491 # pulling changegroup
491 pullop.todosteps.remove('changegroup')
492 pullop.todosteps.remove('changegroup')
492 if not pullop.fetch:
493 if not pullop.fetch:
493 pullop.repo.ui.status(_("no changes found\n"))
494 pullop.repo.ui.status(_("no changes found\n"))
494 pullop.cgresult = 0
495 pullop.cgresult = 0
495 else:
496 else:
496 kwargs['common'] = pullop.common
497 kwargs['common'] = pullop.common
497 kwargs['heads'] = pullop.heads or pullop.rheads
498 kwargs['heads'] = pullop.heads or pullop.rheads
498 if pullop.heads is None and list(pullop.common) == [nullid]:
499 if pullop.heads is None and list(pullop.common) == [nullid]:
499 pullop.repo.ui.status(_("requesting all changes\n"))
500 pullop.repo.ui.status(_("requesting all changes\n"))
500 if kwargs.keys() == ['format']:
501 if kwargs.keys() == ['format']:
501 return # nothing to pull
502 return # nothing to pull
502 bundle = pullop.remote.getbundle('pull', **kwargs)
503 bundle = pullop.remote.getbundle('pull', **kwargs)
503 try:
504 try:
504 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
505 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
505 except KeyError, exc:
506 except KeyError, exc:
506 raise util.Abort('missing support for %s' % exc)
507 raise util.Abort('missing support for %s' % exc)
507 assert len(op.records['changegroup']) == 1
508 assert len(op.records['changegroup']) == 1
508 pullop.cgresult = op.records['changegroup'][0]['return']
509 pullop.cgresult = op.records['changegroup'][0]['return']
509
510
510 def _pullchangeset(pullop):
511 def _pullchangeset(pullop):
511 """pull changeset from unbundle into the local repo"""
512 """pull changeset from unbundle into the local repo"""
512 # We delay the open of the transaction as late as possible so we
513 # We delay the open of the transaction as late as possible so we
513 # don't open transaction for nothing or you break future useful
514 # don't open transaction for nothing or you break future useful
514 # rollback call
515 # rollback call
515 pullop.todosteps.remove('changegroup')
516 pullop.todosteps.remove('changegroup')
516 if not pullop.fetch:
517 if not pullop.fetch:
517 pullop.repo.ui.status(_("no changes found\n"))
518 pullop.repo.ui.status(_("no changes found\n"))
518 pullop.cgresult = 0
519 pullop.cgresult = 0
519 return
520 return
520 pullop.gettransaction()
521 pullop.gettransaction()
521 if pullop.heads is None and list(pullop.common) == [nullid]:
522 if pullop.heads is None and list(pullop.common) == [nullid]:
522 pullop.repo.ui.status(_("requesting all changes\n"))
523 pullop.repo.ui.status(_("requesting all changes\n"))
523 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
524 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
524 # issue1320, avoid a race if remote changed after discovery
525 # issue1320, avoid a race if remote changed after discovery
525 pullop.heads = pullop.rheads
526 pullop.heads = pullop.rheads
526
527
527 if pullop.remote.capable('getbundle'):
528 if pullop.remote.capable('getbundle'):
528 # TODO: get bundlecaps from remote
529 # TODO: get bundlecaps from remote
529 cg = pullop.remote.getbundle('pull', common=pullop.common,
530 cg = pullop.remote.getbundle('pull', common=pullop.common,
530 heads=pullop.heads or pullop.rheads)
531 heads=pullop.heads or pullop.rheads)
531 elif pullop.heads is None:
532 elif pullop.heads is None:
532 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
533 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
533 elif not pullop.remote.capable('changegroupsubset'):
534 elif not pullop.remote.capable('changegroupsubset'):
534 raise util.Abort(_("partial pull cannot be done because "
535 raise util.Abort(_("partial pull cannot be done because "
535 "other repository doesn't support "
536 "other repository doesn't support "
536 "changegroupsubset."))
537 "changegroupsubset."))
537 else:
538 else:
538 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
539 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
539 pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
540 pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
540 pullop.remote.url())
541 pullop.remote.url())
541
542
542 def _pullphase(pullop):
543 def _pullphase(pullop):
543 # Get remote phases data from remote
544 # Get remote phases data from remote
544 pullop.todosteps.remove('phases')
545 pullop.todosteps.remove('phases')
545 remotephases = pullop.remote.listkeys('phases')
546 remotephases = pullop.remote.listkeys('phases')
546 publishing = bool(remotephases.get('publishing', False))
547 publishing = bool(remotephases.get('publishing', False))
547 if remotephases and not publishing:
548 if remotephases and not publishing:
548 # remote is new and unpublishing
549 # remote is new and unpublishing
549 pheads, _dr = phases.analyzeremotephases(pullop.repo,
550 pheads, _dr = phases.analyzeremotephases(pullop.repo,
550 pullop.pulledsubset,
551 pullop.pulledsubset,
551 remotephases)
552 remotephases)
552 phases.advanceboundary(pullop.repo, phases.public, pheads)
553 phases.advanceboundary(pullop.repo, phases.public, pheads)
553 phases.advanceboundary(pullop.repo, phases.draft,
554 phases.advanceboundary(pullop.repo, phases.draft,
554 pullop.pulledsubset)
555 pullop.pulledsubset)
555 else:
556 else:
556 # Remote is old or publishing all common changesets
557 # Remote is old or publishing all common changesets
557 # should be seen as public
558 # should be seen as public
558 phases.advanceboundary(pullop.repo, phases.public,
559 phases.advanceboundary(pullop.repo, phases.public,
559 pullop.pulledsubset)
560 pullop.pulledsubset)
560
561
561 def _pullobsolete(pullop):
562 def _pullobsolete(pullop):
562 """utility function to pull obsolete markers from a remote
563 """utility function to pull obsolete markers from a remote
563
564
564 The `gettransaction` is function that return the pull transaction, creating
565 The `gettransaction` is function that return the pull transaction, creating
565 one if necessary. We return the transaction to inform the calling code that
566 one if necessary. We return the transaction to inform the calling code that
566 a new transaction have been created (when applicable).
567 a new transaction have been created (when applicable).
567
568
568 Exists mostly to allow overriding for experimentation purpose"""
569 Exists mostly to allow overriding for experimentation purpose"""
569 pullop.todosteps.remove('obsmarkers')
570 pullop.todosteps.remove('obsmarkers')
570 tr = None
571 tr = None
571 if obsolete._enabled:
572 if obsolete._enabled:
572 pullop.repo.ui.debug('fetching remote obsolete markers\n')
573 pullop.repo.ui.debug('fetching remote obsolete markers\n')
573 remoteobs = pullop.remote.listkeys('obsolete')
574 remoteobs = pullop.remote.listkeys('obsolete')
574 if 'dump0' in remoteobs:
575 if 'dump0' in remoteobs:
575 tr = pullop.gettransaction()
576 tr = pullop.gettransaction()
576 for key in sorted(remoteobs, reverse=True):
577 for key in sorted(remoteobs, reverse=True):
577 if key.startswith('dump'):
578 if key.startswith('dump'):
578 data = base85.b85decode(remoteobs[key])
579 data = base85.b85decode(remoteobs[key])
579 pullop.repo.obsstore.mergemarkers(tr, data)
580 pullop.repo.obsstore.mergemarkers(tr, data)
580 pullop.repo.invalidatevolatilesets()
581 pullop.repo.invalidatevolatilesets()
581 return tr
582 return tr
582
583
583 def getbundle(repo, source, heads=None, common=None, bundlecaps=None):
584 def getbundle(repo, source, heads=None, common=None, bundlecaps=None):
584 """return a full bundle (with potentially multiple kind of parts)
585 """return a full bundle (with potentially multiple kind of parts)
585
586
586 Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
587 Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
587 passed. For now, the bundle can contain only changegroup, but this will
588 passed. For now, the bundle can contain only changegroup, but this will
588 changes when more part type will be available for bundle2.
589 changes when more part type will be available for bundle2.
589
590
590 This is different from changegroup.getbundle that only returns an HG10
591 This is different from changegroup.getbundle that only returns an HG10
591 changegroup bundle. They may eventually get reunited in the future when we
592 changegroup bundle. They may eventually get reunited in the future when we
592 have a clearer idea of the API we what to query different data.
593 have a clearer idea of the API we what to query different data.
593
594
594 The implementation is at a very early stage and will get massive rework
595 The implementation is at a very early stage and will get massive rework
595 when the API of bundle is refined.
596 when the API of bundle is refined.
596 """
597 """
597 # build bundle here.
598 # build bundle here.
598 cg = changegroup.getbundle(repo, source, heads=heads,
599 cg = changegroup.getbundle(repo, source, heads=heads,
599 common=common, bundlecaps=bundlecaps)
600 common=common, bundlecaps=bundlecaps)
600 if bundlecaps is None or 'HG20' not in bundlecaps:
601 if bundlecaps is None or 'HG20' not in bundlecaps:
601 return cg
602 return cg
602 # very crude first implementation,
603 # very crude first implementation,
603 # the bundle API will change and the generation will be done lazily.
604 # the bundle API will change and the generation will be done lazily.
604 bundler = bundle2.bundle20(repo.ui)
605 bundler = bundle2.bundle20(repo.ui)
605 tempname = changegroup.writebundle(cg, None, 'HG10UN')
606 tempname = changegroup.writebundle(cg, None, 'HG10UN')
606 data = open(tempname).read()
607 data = open(tempname).read()
607 part = bundle2.part('changegroup', data=data)
608 part = bundle2.part('changegroup', data=data)
608 bundler.addpart(part)
609 bundler.addpart(part)
609 temp = cStringIO.StringIO()
610 temp = cStringIO.StringIO()
610 for c in bundler.getchunks():
611 for c in bundler.getchunks():
611 temp.write(c)
612 temp.write(c)
612 temp.seek(0)
613 temp.seek(0)
613 return bundle2.unbundle20(repo.ui, temp)
614 return bundle2.unbundle20(repo.ui, temp)
614
615
615 class PushRaced(RuntimeError):
616 class PushRaced(RuntimeError):
616 """An exception raised during unbunding that indicate a push race"""
617 """An exception raised during unbunding that indicate a push race"""
617
618
618 def check_heads(repo, their_heads, context):
619 def check_heads(repo, their_heads, context):
619 """check if the heads of a repo have been modified
620 """check if the heads of a repo have been modified
620
621
621 Used by peer for unbundling.
622 Used by peer for unbundling.
622 """
623 """
623 heads = repo.heads()
624 heads = repo.heads()
624 heads_hash = util.sha1(''.join(sorted(heads))).digest()
625 heads_hash = util.sha1(''.join(sorted(heads))).digest()
625 if not (their_heads == ['force'] or their_heads == heads or
626 if not (their_heads == ['force'] or their_heads == heads or
626 their_heads == ['hashed', heads_hash]):
627 their_heads == ['hashed', heads_hash]):
627 # someone else committed/pushed/unbundled while we
628 # someone else committed/pushed/unbundled while we
628 # were transferring data
629 # were transferring data
629 raise PushRaced('repository changed while %s - '
630 raise PushRaced('repository changed while %s - '
630 'please try again' % context)
631 'please try again' % context)
632
633 def unbundle(repo, cg, heads, source, url):
634 """Apply a bundle to a repo.
635
636 this function makes sure the repo is locked during the application and have
637 mechanism to check that no push race occured between the creation of the
638 bundle and its application.
639
640 If the push was raced as PushRaced exception is raised."""
641 r = 0
642 lock = repo.lock()
643 try:
644 check_heads(repo, heads, 'uploading changes')
645 # push can proceed
646 try:
647 r = changegroup.addchangegroup(repo, cg, source, url)
648 except util.Abort, inst:
649 # The old code we moved used sys.stderr directly.
650 # We did not changed it to minise code change.
651 # This need to be moved to something proper.
652 # Feel free to do it.
653 sys.stderr.write("abort: %s\n" % inst)
654 finally:
655 lock.release()
656 return r
@@ -1,789 +1,778 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod
11 import changegroup as changegroupmod
12 import peer, error, encoding, util, store, exchange
12 import peer, error, encoding, util, store, exchange
13
13
14
14
15 class abstractserverproto(object):
15 class abstractserverproto(object):
16 """abstract class that summarizes the protocol API
16 """abstract class that summarizes the protocol API
17
17
18 Used as reference and documentation.
18 Used as reference and documentation.
19 """
19 """
20
20
21 def getargs(self, args):
21 def getargs(self, args):
22 """return the value for arguments in <args>
22 """return the value for arguments in <args>
23
23
24 returns a list of values (same order as <args>)"""
24 returns a list of values (same order as <args>)"""
25 raise NotImplementedError()
25 raise NotImplementedError()
26
26
27 def getfile(self, fp):
27 def getfile(self, fp):
28 """write the whole content of a file into a file like object
28 """write the whole content of a file into a file like object
29
29
30 The file is in the form::
30 The file is in the form::
31
31
32 (<chunk-size>\n<chunk>)+0\n
32 (<chunk-size>\n<chunk>)+0\n
33
33
34 chunk size is the ascii version of the int.
34 chunk size is the ascii version of the int.
35 """
35 """
36 raise NotImplementedError()
36 raise NotImplementedError()
37
37
38 def redirect(self):
38 def redirect(self):
39 """may setup interception for stdout and stderr
39 """may setup interception for stdout and stderr
40
40
41 See also the `restore` method."""
41 See also the `restore` method."""
42 raise NotImplementedError()
42 raise NotImplementedError()
43
43
44 # If the `redirect` function does install interception, the `restore`
44 # If the `redirect` function does install interception, the `restore`
45 # function MUST be defined. If interception is not used, this function
45 # function MUST be defined. If interception is not used, this function
46 # MUST NOT be defined.
46 # MUST NOT be defined.
47 #
47 #
48 # left commented here on purpose
48 # left commented here on purpose
49 #
49 #
50 #def restore(self):
50 #def restore(self):
51 # """reinstall previous stdout and stderr and return intercepted stdout
51 # """reinstall previous stdout and stderr and return intercepted stdout
52 # """
52 # """
53 # raise NotImplementedError()
53 # raise NotImplementedError()
54
54
55 def groupchunks(self, cg):
55 def groupchunks(self, cg):
56 """return 4096 chunks from a changegroup object
56 """return 4096 chunks from a changegroup object
57
57
58 Some protocols may have compressed the contents."""
58 Some protocols may have compressed the contents."""
59 raise NotImplementedError()
59 raise NotImplementedError()
60
60
61 # abstract batching support
61 # abstract batching support
62
62
63 class future(object):
63 class future(object):
64 '''placeholder for a value to be set later'''
64 '''placeholder for a value to be set later'''
65 def set(self, value):
65 def set(self, value):
66 if util.safehasattr(self, 'value'):
66 if util.safehasattr(self, 'value'):
67 raise error.RepoError("future is already set")
67 raise error.RepoError("future is already set")
68 self.value = value
68 self.value = value
69
69
70 class batcher(object):
70 class batcher(object):
71 '''base class for batches of commands submittable in a single request
71 '''base class for batches of commands submittable in a single request
72
72
73 All methods invoked on instances of this class are simply queued and
73 All methods invoked on instances of this class are simply queued and
74 return a a future for the result. Once you call submit(), all the queued
74 return a a future for the result. Once you call submit(), all the queued
75 calls are performed and the results set in their respective futures.
75 calls are performed and the results set in their respective futures.
76 '''
76 '''
77 def __init__(self):
77 def __init__(self):
78 self.calls = []
78 self.calls = []
79 def __getattr__(self, name):
79 def __getattr__(self, name):
80 def call(*args, **opts):
80 def call(*args, **opts):
81 resref = future()
81 resref = future()
82 self.calls.append((name, args, opts, resref,))
82 self.calls.append((name, args, opts, resref,))
83 return resref
83 return resref
84 return call
84 return call
85 def submit(self):
85 def submit(self):
86 pass
86 pass
87
87
88 class localbatch(batcher):
88 class localbatch(batcher):
89 '''performs the queued calls directly'''
89 '''performs the queued calls directly'''
90 def __init__(self, local):
90 def __init__(self, local):
91 batcher.__init__(self)
91 batcher.__init__(self)
92 self.local = local
92 self.local = local
93 def submit(self):
93 def submit(self):
94 for name, args, opts, resref in self.calls:
94 for name, args, opts, resref in self.calls:
95 resref.set(getattr(self.local, name)(*args, **opts))
95 resref.set(getattr(self.local, name)(*args, **opts))
96
96
97 class remotebatch(batcher):
97 class remotebatch(batcher):
98 '''batches the queued calls; uses as few roundtrips as possible'''
98 '''batches the queued calls; uses as few roundtrips as possible'''
99 def __init__(self, remote):
99 def __init__(self, remote):
100 '''remote must support _submitbatch(encbatch) and
100 '''remote must support _submitbatch(encbatch) and
101 _submitone(op, encargs)'''
101 _submitone(op, encargs)'''
102 batcher.__init__(self)
102 batcher.__init__(self)
103 self.remote = remote
103 self.remote = remote
104 def submit(self):
104 def submit(self):
105 req, rsp = [], []
105 req, rsp = [], []
106 for name, args, opts, resref in self.calls:
106 for name, args, opts, resref in self.calls:
107 mtd = getattr(self.remote, name)
107 mtd = getattr(self.remote, name)
108 batchablefn = getattr(mtd, 'batchable', None)
108 batchablefn = getattr(mtd, 'batchable', None)
109 if batchablefn is not None:
109 if batchablefn is not None:
110 batchable = batchablefn(mtd.im_self, *args, **opts)
110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 encargsorres, encresref = batchable.next()
111 encargsorres, encresref = batchable.next()
112 if encresref:
112 if encresref:
113 req.append((name, encargsorres,))
113 req.append((name, encargsorres,))
114 rsp.append((batchable, encresref, resref,))
114 rsp.append((batchable, encresref, resref,))
115 else:
115 else:
116 resref.set(encargsorres)
116 resref.set(encargsorres)
117 else:
117 else:
118 if req:
118 if req:
119 self._submitreq(req, rsp)
119 self._submitreq(req, rsp)
120 req, rsp = [], []
120 req, rsp = [], []
121 resref.set(mtd(*args, **opts))
121 resref.set(mtd(*args, **opts))
122 if req:
122 if req:
123 self._submitreq(req, rsp)
123 self._submitreq(req, rsp)
124 def _submitreq(self, req, rsp):
124 def _submitreq(self, req, rsp):
125 encresults = self.remote._submitbatch(req)
125 encresults = self.remote._submitbatch(req)
126 for encres, r in zip(encresults, rsp):
126 for encres, r in zip(encresults, rsp):
127 batchable, encresref, resref = r
127 batchable, encresref, resref = r
128 encresref.set(encres)
128 encresref.set(encres)
129 resref.set(batchable.next())
129 resref.set(batchable.next())
130
130
131 def batchable(f):
131 def batchable(f):
132 '''annotation for batchable methods
132 '''annotation for batchable methods
133
133
134 Such methods must implement a coroutine as follows:
134 Such methods must implement a coroutine as follows:
135
135
136 @batchable
136 @batchable
137 def sample(self, one, two=None):
137 def sample(self, one, two=None):
138 # Handle locally computable results first:
138 # Handle locally computable results first:
139 if not one:
139 if not one:
140 yield "a local result", None
140 yield "a local result", None
141 # Build list of encoded arguments suitable for your wire protocol:
141 # Build list of encoded arguments suitable for your wire protocol:
142 encargs = [('one', encode(one),), ('two', encode(two),)]
142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 # Create future for injection of encoded result:
143 # Create future for injection of encoded result:
144 encresref = future()
144 encresref = future()
145 # Return encoded arguments and future:
145 # Return encoded arguments and future:
146 yield encargs, encresref
146 yield encargs, encresref
147 # Assuming the future to be filled with the result from the batched
147 # Assuming the future to be filled with the result from the batched
148 # request now. Decode it:
148 # request now. Decode it:
149 yield decode(encresref.value)
149 yield decode(encresref.value)
150
150
151 The decorator returns a function which wraps this coroutine as a plain
151 The decorator returns a function which wraps this coroutine as a plain
152 method, but adds the original method as an attribute called "batchable",
152 method, but adds the original method as an attribute called "batchable",
153 which is used by remotebatch to split the call into separate encoding and
153 which is used by remotebatch to split the call into separate encoding and
154 decoding phases.
154 decoding phases.
155 '''
155 '''
156 def plain(*args, **opts):
156 def plain(*args, **opts):
157 batchable = f(*args, **opts)
157 batchable = f(*args, **opts)
158 encargsorres, encresref = batchable.next()
158 encargsorres, encresref = batchable.next()
159 if not encresref:
159 if not encresref:
160 return encargsorres # a local result in this case
160 return encargsorres # a local result in this case
161 self = args[0]
161 self = args[0]
162 encresref.set(self._submitone(f.func_name, encargsorres))
162 encresref.set(self._submitone(f.func_name, encargsorres))
163 return batchable.next()
163 return batchable.next()
164 setattr(plain, 'batchable', f)
164 setattr(plain, 'batchable', f)
165 return plain
165 return plain
166
166
167 # list of nodes encoding / decoding
167 # list of nodes encoding / decoding
168
168
169 def decodelist(l, sep=' '):
169 def decodelist(l, sep=' '):
170 if l:
170 if l:
171 return map(bin, l.split(sep))
171 return map(bin, l.split(sep))
172 return []
172 return []
173
173
174 def encodelist(l, sep=' '):
174 def encodelist(l, sep=' '):
175 return sep.join(map(hex, l))
175 return sep.join(map(hex, l))
176
176
177 # batched call argument encoding
177 # batched call argument encoding
178
178
179 def escapearg(plain):
179 def escapearg(plain):
180 return (plain
180 return (plain
181 .replace(':', '::')
181 .replace(':', '::')
182 .replace(',', ':,')
182 .replace(',', ':,')
183 .replace(';', ':;')
183 .replace(';', ':;')
184 .replace('=', ':='))
184 .replace('=', ':='))
185
185
186 def unescapearg(escaped):
186 def unescapearg(escaped):
187 return (escaped
187 return (escaped
188 .replace(':=', '=')
188 .replace(':=', '=')
189 .replace(':;', ';')
189 .replace(':;', ';')
190 .replace(':,', ',')
190 .replace(':,', ',')
191 .replace('::', ':'))
191 .replace('::', ':'))
192
192
193 # client side
193 # client side
194
194
195 class wirepeer(peer.peerrepository):
195 class wirepeer(peer.peerrepository):
196
196
197 def batch(self):
197 def batch(self):
198 return remotebatch(self)
198 return remotebatch(self)
199 def _submitbatch(self, req):
199 def _submitbatch(self, req):
200 cmds = []
200 cmds = []
201 for op, argsdict in req:
201 for op, argsdict in req:
202 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
202 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
203 cmds.append('%s %s' % (op, args))
203 cmds.append('%s %s' % (op, args))
204 rsp = self._call("batch", cmds=';'.join(cmds))
204 rsp = self._call("batch", cmds=';'.join(cmds))
205 return rsp.split(';')
205 return rsp.split(';')
206 def _submitone(self, op, args):
206 def _submitone(self, op, args):
207 return self._call(op, **args)
207 return self._call(op, **args)
208
208
209 @batchable
209 @batchable
210 def lookup(self, key):
210 def lookup(self, key):
211 self.requirecap('lookup', _('look up remote revision'))
211 self.requirecap('lookup', _('look up remote revision'))
212 f = future()
212 f = future()
213 yield {'key': encoding.fromlocal(key)}, f
213 yield {'key': encoding.fromlocal(key)}, f
214 d = f.value
214 d = f.value
215 success, data = d[:-1].split(" ", 1)
215 success, data = d[:-1].split(" ", 1)
216 if int(success):
216 if int(success):
217 yield bin(data)
217 yield bin(data)
218 self._abort(error.RepoError(data))
218 self._abort(error.RepoError(data))
219
219
220 @batchable
220 @batchable
221 def heads(self):
221 def heads(self):
222 f = future()
222 f = future()
223 yield {}, f
223 yield {}, f
224 d = f.value
224 d = f.value
225 try:
225 try:
226 yield decodelist(d[:-1])
226 yield decodelist(d[:-1])
227 except ValueError:
227 except ValueError:
228 self._abort(error.ResponseError(_("unexpected response:"), d))
228 self._abort(error.ResponseError(_("unexpected response:"), d))
229
229
230 @batchable
230 @batchable
231 def known(self, nodes):
231 def known(self, nodes):
232 f = future()
232 f = future()
233 yield {'nodes': encodelist(nodes)}, f
233 yield {'nodes': encodelist(nodes)}, f
234 d = f.value
234 d = f.value
235 try:
235 try:
236 yield [bool(int(f)) for f in d]
236 yield [bool(int(f)) for f in d]
237 except ValueError:
237 except ValueError:
238 self._abort(error.ResponseError(_("unexpected response:"), d))
238 self._abort(error.ResponseError(_("unexpected response:"), d))
239
239
240 @batchable
240 @batchable
241 def branchmap(self):
241 def branchmap(self):
242 f = future()
242 f = future()
243 yield {}, f
243 yield {}, f
244 d = f.value
244 d = f.value
245 try:
245 try:
246 branchmap = {}
246 branchmap = {}
247 for branchpart in d.splitlines():
247 for branchpart in d.splitlines():
248 branchname, branchheads = branchpart.split(' ', 1)
248 branchname, branchheads = branchpart.split(' ', 1)
249 branchname = encoding.tolocal(urllib.unquote(branchname))
249 branchname = encoding.tolocal(urllib.unquote(branchname))
250 branchheads = decodelist(branchheads)
250 branchheads = decodelist(branchheads)
251 branchmap[branchname] = branchheads
251 branchmap[branchname] = branchheads
252 yield branchmap
252 yield branchmap
253 except TypeError:
253 except TypeError:
254 self._abort(error.ResponseError(_("unexpected response:"), d))
254 self._abort(error.ResponseError(_("unexpected response:"), d))
255
255
256 def branches(self, nodes):
256 def branches(self, nodes):
257 n = encodelist(nodes)
257 n = encodelist(nodes)
258 d = self._call("branches", nodes=n)
258 d = self._call("branches", nodes=n)
259 try:
259 try:
260 br = [tuple(decodelist(b)) for b in d.splitlines()]
260 br = [tuple(decodelist(b)) for b in d.splitlines()]
261 return br
261 return br
262 except ValueError:
262 except ValueError:
263 self._abort(error.ResponseError(_("unexpected response:"), d))
263 self._abort(error.ResponseError(_("unexpected response:"), d))
264
264
265 def between(self, pairs):
265 def between(self, pairs):
266 batch = 8 # avoid giant requests
266 batch = 8 # avoid giant requests
267 r = []
267 r = []
268 for i in xrange(0, len(pairs), batch):
268 for i in xrange(0, len(pairs), batch):
269 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
269 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
270 d = self._call("between", pairs=n)
270 d = self._call("between", pairs=n)
271 try:
271 try:
272 r.extend(l and decodelist(l) or [] for l in d.splitlines())
272 r.extend(l and decodelist(l) or [] for l in d.splitlines())
273 except ValueError:
273 except ValueError:
274 self._abort(error.ResponseError(_("unexpected response:"), d))
274 self._abort(error.ResponseError(_("unexpected response:"), d))
275 return r
275 return r
276
276
277 @batchable
277 @batchable
278 def pushkey(self, namespace, key, old, new):
278 def pushkey(self, namespace, key, old, new):
279 if not self.capable('pushkey'):
279 if not self.capable('pushkey'):
280 yield False, None
280 yield False, None
281 f = future()
281 f = future()
282 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
282 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
283 yield {'namespace': encoding.fromlocal(namespace),
283 yield {'namespace': encoding.fromlocal(namespace),
284 'key': encoding.fromlocal(key),
284 'key': encoding.fromlocal(key),
285 'old': encoding.fromlocal(old),
285 'old': encoding.fromlocal(old),
286 'new': encoding.fromlocal(new)}, f
286 'new': encoding.fromlocal(new)}, f
287 d = f.value
287 d = f.value
288 d, output = d.split('\n', 1)
288 d, output = d.split('\n', 1)
289 try:
289 try:
290 d = bool(int(d))
290 d = bool(int(d))
291 except ValueError:
291 except ValueError:
292 raise error.ResponseError(
292 raise error.ResponseError(
293 _('push failed (unexpected response):'), d)
293 _('push failed (unexpected response):'), d)
294 for l in output.splitlines(True):
294 for l in output.splitlines(True):
295 self.ui.status(_('remote: '), l)
295 self.ui.status(_('remote: '), l)
296 yield d
296 yield d
297
297
298 @batchable
298 @batchable
299 def listkeys(self, namespace):
299 def listkeys(self, namespace):
300 if not self.capable('pushkey'):
300 if not self.capable('pushkey'):
301 yield {}, None
301 yield {}, None
302 f = future()
302 f = future()
303 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
303 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
304 yield {'namespace': encoding.fromlocal(namespace)}, f
304 yield {'namespace': encoding.fromlocal(namespace)}, f
305 d = f.value
305 d = f.value
306 r = {}
306 r = {}
307 for l in d.splitlines():
307 for l in d.splitlines():
308 k, v = l.split('\t')
308 k, v = l.split('\t')
309 r[encoding.tolocal(k)] = encoding.tolocal(v)
309 r[encoding.tolocal(k)] = encoding.tolocal(v)
310 yield r
310 yield r
311
311
312 def stream_out(self):
312 def stream_out(self):
313 return self._callstream('stream_out')
313 return self._callstream('stream_out')
314
314
315 def changegroup(self, nodes, kind):
315 def changegroup(self, nodes, kind):
316 n = encodelist(nodes)
316 n = encodelist(nodes)
317 f = self._callcompressable("changegroup", roots=n)
317 f = self._callcompressable("changegroup", roots=n)
318 return changegroupmod.unbundle10(f, 'UN')
318 return changegroupmod.unbundle10(f, 'UN')
319
319
320 def changegroupsubset(self, bases, heads, kind):
320 def changegroupsubset(self, bases, heads, kind):
321 self.requirecap('changegroupsubset', _('look up remote changes'))
321 self.requirecap('changegroupsubset', _('look up remote changes'))
322 bases = encodelist(bases)
322 bases = encodelist(bases)
323 heads = encodelist(heads)
323 heads = encodelist(heads)
324 f = self._callcompressable("changegroupsubset",
324 f = self._callcompressable("changegroupsubset",
325 bases=bases, heads=heads)
325 bases=bases, heads=heads)
326 return changegroupmod.unbundle10(f, 'UN')
326 return changegroupmod.unbundle10(f, 'UN')
327
327
328 def getbundle(self, source, heads=None, common=None, bundlecaps=None):
328 def getbundle(self, source, heads=None, common=None, bundlecaps=None):
329 self.requirecap('getbundle', _('look up remote changes'))
329 self.requirecap('getbundle', _('look up remote changes'))
330 opts = {}
330 opts = {}
331 if heads is not None:
331 if heads is not None:
332 opts['heads'] = encodelist(heads)
332 opts['heads'] = encodelist(heads)
333 if common is not None:
333 if common is not None:
334 opts['common'] = encodelist(common)
334 opts['common'] = encodelist(common)
335 if bundlecaps is not None:
335 if bundlecaps is not None:
336 opts['bundlecaps'] = ','.join(bundlecaps)
336 opts['bundlecaps'] = ','.join(bundlecaps)
337 f = self._callcompressable("getbundle", **opts)
337 f = self._callcompressable("getbundle", **opts)
338 return changegroupmod.unbundle10(f, 'UN')
338 return changegroupmod.unbundle10(f, 'UN')
339
339
340 def unbundle(self, cg, heads, source):
340 def unbundle(self, cg, heads, source):
341 '''Send cg (a readable file-like object representing the
341 '''Send cg (a readable file-like object representing the
342 changegroup to push, typically a chunkbuffer object) to the
342 changegroup to push, typically a chunkbuffer object) to the
343 remote server as a bundle. Return an integer indicating the
343 remote server as a bundle. Return an integer indicating the
344 result of the push (see localrepository.addchangegroup()).'''
344 result of the push (see localrepository.addchangegroup()).'''
345
345
346 if heads != ['force'] and self.capable('unbundlehash'):
346 if heads != ['force'] and self.capable('unbundlehash'):
347 heads = encodelist(['hashed',
347 heads = encodelist(['hashed',
348 util.sha1(''.join(sorted(heads))).digest()])
348 util.sha1(''.join(sorted(heads))).digest()])
349 else:
349 else:
350 heads = encodelist(heads)
350 heads = encodelist(heads)
351
351
352 ret, output = self._callpush("unbundle", cg, heads=heads)
352 ret, output = self._callpush("unbundle", cg, heads=heads)
353 if ret == "":
353 if ret == "":
354 raise error.ResponseError(
354 raise error.ResponseError(
355 _('push failed:'), output)
355 _('push failed:'), output)
356 try:
356 try:
357 ret = int(ret)
357 ret = int(ret)
358 except ValueError:
358 except ValueError:
359 raise error.ResponseError(
359 raise error.ResponseError(
360 _('push failed (unexpected response):'), ret)
360 _('push failed (unexpected response):'), ret)
361
361
362 for l in output.splitlines(True):
362 for l in output.splitlines(True):
363 self.ui.status(_('remote: '), l)
363 self.ui.status(_('remote: '), l)
364 return ret
364 return ret
365
365
366 def debugwireargs(self, one, two, three=None, four=None, five=None):
366 def debugwireargs(self, one, two, three=None, four=None, five=None):
367 # don't pass optional arguments left at their default value
367 # don't pass optional arguments left at their default value
368 opts = {}
368 opts = {}
369 if three is not None:
369 if three is not None:
370 opts['three'] = three
370 opts['three'] = three
371 if four is not None:
371 if four is not None:
372 opts['four'] = four
372 opts['four'] = four
373 return self._call('debugwireargs', one=one, two=two, **opts)
373 return self._call('debugwireargs', one=one, two=two, **opts)
374
374
375 def _call(self, cmd, **args):
375 def _call(self, cmd, **args):
376 """execute <cmd> on the server
376 """execute <cmd> on the server
377
377
378 The command is expected to return a simple string.
378 The command is expected to return a simple string.
379
379
380 returns the server reply as a string."""
380 returns the server reply as a string."""
381 raise NotImplementedError()
381 raise NotImplementedError()
382
382
383 def _callstream(self, cmd, **args):
383 def _callstream(self, cmd, **args):
384 """execute <cmd> on the server
384 """execute <cmd> on the server
385
385
386 The command is expected to return a stream.
386 The command is expected to return a stream.
387
387
388 returns the server reply as a file like object."""
388 returns the server reply as a file like object."""
389 raise NotImplementedError()
389 raise NotImplementedError()
390
390
391 def _callcompressable(self, cmd, **args):
391 def _callcompressable(self, cmd, **args):
392 """execute <cmd> on the server
392 """execute <cmd> on the server
393
393
394 The command is expected to return a stream.
394 The command is expected to return a stream.
395
395
396 The stream may have been compressed in some implementaitons. This
396 The stream may have been compressed in some implementaitons. This
397 function takes care of the decompression. This is the only difference
397 function takes care of the decompression. This is the only difference
398 with _callstream.
398 with _callstream.
399
399
400 returns the server reply as a file like object.
400 returns the server reply as a file like object.
401 """
401 """
402 raise NotImplementedError()
402 raise NotImplementedError()
403
403
404 def _callpush(self, cmd, fp, **args):
404 def _callpush(self, cmd, fp, **args):
405 """execute a <cmd> on server
405 """execute a <cmd> on server
406
406
407 The command is expected to be related to a push. Push has a special
407 The command is expected to be related to a push. Push has a special
408 return method.
408 return method.
409
409
410 returns the server reply as a (ret, output) tuple. ret is either
410 returns the server reply as a (ret, output) tuple. ret is either
411 empty (error) or a stringified int.
411 empty (error) or a stringified int.
412 """
412 """
413 raise NotImplementedError()
413 raise NotImplementedError()
414
414
415 def _abort(self, exception):
415 def _abort(self, exception):
416 """clearly abort the wire protocol connection and raise the exception
416 """clearly abort the wire protocol connection and raise the exception
417 """
417 """
418 raise NotImplementedError()
418 raise NotImplementedError()
419
419
420 # server side
420 # server side
421
421
422 # wire protocol command can either return a string or one of these classes.
422 # wire protocol command can either return a string or one of these classes.
423 class streamres(object):
423 class streamres(object):
424 """wireproto reply: binary stream
424 """wireproto reply: binary stream
425
425
426 The call was successful and the result is a stream.
426 The call was successful and the result is a stream.
427 Iterate on the `self.gen` attribute to retrieve chunks.
427 Iterate on the `self.gen` attribute to retrieve chunks.
428 """
428 """
429 def __init__(self, gen):
429 def __init__(self, gen):
430 self.gen = gen
430 self.gen = gen
431
431
432 class pushres(object):
432 class pushres(object):
433 """wireproto reply: success with simple integer return
433 """wireproto reply: success with simple integer return
434
434
435 The call was successful and returned an integer contained in `self.res`.
435 The call was successful and returned an integer contained in `self.res`.
436 """
436 """
437 def __init__(self, res):
437 def __init__(self, res):
438 self.res = res
438 self.res = res
439
439
440 class pusherr(object):
440 class pusherr(object):
441 """wireproto reply: failure
441 """wireproto reply: failure
442
442
443 The call failed. The `self.res` attribute contains the error message.
443 The call failed. The `self.res` attribute contains the error message.
444 """
444 """
445 def __init__(self, res):
445 def __init__(self, res):
446 self.res = res
446 self.res = res
447
447
448 class ooberror(object):
448 class ooberror(object):
449 """wireproto reply: failure of a batch of operation
449 """wireproto reply: failure of a batch of operation
450
450
451 Something failed during a batch call. The error message is stored in
451 Something failed during a batch call. The error message is stored in
452 `self.message`.
452 `self.message`.
453 """
453 """
454 def __init__(self, message):
454 def __init__(self, message):
455 self.message = message
455 self.message = message
456
456
457 def dispatch(repo, proto, command):
457 def dispatch(repo, proto, command):
458 repo = repo.filtered("served")
458 repo = repo.filtered("served")
459 func, spec = commands[command]
459 func, spec = commands[command]
460 args = proto.getargs(spec)
460 args = proto.getargs(spec)
461 return func(repo, proto, *args)
461 return func(repo, proto, *args)
462
462
463 def options(cmd, keys, others):
463 def options(cmd, keys, others):
464 opts = {}
464 opts = {}
465 for k in keys:
465 for k in keys:
466 if k in others:
466 if k in others:
467 opts[k] = others[k]
467 opts[k] = others[k]
468 del others[k]
468 del others[k]
469 if others:
469 if others:
470 sys.stderr.write("abort: %s got unexpected arguments %s\n"
470 sys.stderr.write("abort: %s got unexpected arguments %s\n"
471 % (cmd, ",".join(others)))
471 % (cmd, ",".join(others)))
472 return opts
472 return opts
473
473
474 # list of commands
474 # list of commands
475 commands = {}
475 commands = {}
476
476
477 def wireprotocommand(name, args=''):
477 def wireprotocommand(name, args=''):
478 """decorator for wireprotocol command"""
478 """decorator for wireprotocol command"""
479 def register(func):
479 def register(func):
480 commands[name] = (func, args)
480 commands[name] = (func, args)
481 return func
481 return func
482 return register
482 return register
483
483
484 @wireprotocommand('batch', 'cmds *')
484 @wireprotocommand('batch', 'cmds *')
485 def batch(repo, proto, cmds, others):
485 def batch(repo, proto, cmds, others):
486 repo = repo.filtered("served")
486 repo = repo.filtered("served")
487 res = []
487 res = []
488 for pair in cmds.split(';'):
488 for pair in cmds.split(';'):
489 op, args = pair.split(' ', 1)
489 op, args = pair.split(' ', 1)
490 vals = {}
490 vals = {}
491 for a in args.split(','):
491 for a in args.split(','):
492 if a:
492 if a:
493 n, v = a.split('=')
493 n, v = a.split('=')
494 vals[n] = unescapearg(v)
494 vals[n] = unescapearg(v)
495 func, spec = commands[op]
495 func, spec = commands[op]
496 if spec:
496 if spec:
497 keys = spec.split()
497 keys = spec.split()
498 data = {}
498 data = {}
499 for k in keys:
499 for k in keys:
500 if k == '*':
500 if k == '*':
501 star = {}
501 star = {}
502 for key in vals.keys():
502 for key in vals.keys():
503 if key not in keys:
503 if key not in keys:
504 star[key] = vals[key]
504 star[key] = vals[key]
505 data['*'] = star
505 data['*'] = star
506 else:
506 else:
507 data[k] = vals[k]
507 data[k] = vals[k]
508 result = func(repo, proto, *[data[k] for k in keys])
508 result = func(repo, proto, *[data[k] for k in keys])
509 else:
509 else:
510 result = func(repo, proto)
510 result = func(repo, proto)
511 if isinstance(result, ooberror):
511 if isinstance(result, ooberror):
512 return result
512 return result
513 res.append(escapearg(result))
513 res.append(escapearg(result))
514 return ';'.join(res)
514 return ';'.join(res)
515
515
516 @wireprotocommand('between', 'pairs')
516 @wireprotocommand('between', 'pairs')
517 def between(repo, proto, pairs):
517 def between(repo, proto, pairs):
518 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
518 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
519 r = []
519 r = []
520 for b in repo.between(pairs):
520 for b in repo.between(pairs):
521 r.append(encodelist(b) + "\n")
521 r.append(encodelist(b) + "\n")
522 return "".join(r)
522 return "".join(r)
523
523
524 @wireprotocommand('branchmap')
524 @wireprotocommand('branchmap')
525 def branchmap(repo, proto):
525 def branchmap(repo, proto):
526 branchmap = repo.branchmap()
526 branchmap = repo.branchmap()
527 heads = []
527 heads = []
528 for branch, nodes in branchmap.iteritems():
528 for branch, nodes in branchmap.iteritems():
529 branchname = urllib.quote(encoding.fromlocal(branch))
529 branchname = urllib.quote(encoding.fromlocal(branch))
530 branchnodes = encodelist(nodes)
530 branchnodes = encodelist(nodes)
531 heads.append('%s %s' % (branchname, branchnodes))
531 heads.append('%s %s' % (branchname, branchnodes))
532 return '\n'.join(heads)
532 return '\n'.join(heads)
533
533
534 @wireprotocommand('branches', 'nodes')
534 @wireprotocommand('branches', 'nodes')
535 def branches(repo, proto, nodes):
535 def branches(repo, proto, nodes):
536 nodes = decodelist(nodes)
536 nodes = decodelist(nodes)
537 r = []
537 r = []
538 for b in repo.branches(nodes):
538 for b in repo.branches(nodes):
539 r.append(encodelist(b) + "\n")
539 r.append(encodelist(b) + "\n")
540 return "".join(r)
540 return "".join(r)
541
541
542
542
543 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
543 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
544 'known', 'getbundle', 'unbundlehash', 'batch']
544 'known', 'getbundle', 'unbundlehash', 'batch']
545
545
546 def _capabilities(repo, proto):
546 def _capabilities(repo, proto):
547 """return a list of capabilities for a repo
547 """return a list of capabilities for a repo
548
548
549 This function exists to allow extensions to easily wrap capabilities
549 This function exists to allow extensions to easily wrap capabilities
550 computation
550 computation
551
551
552 - returns a lists: easy to alter
552 - returns a lists: easy to alter
553 - change done here will be propagated to both `capabilities` and `hello`
553 - change done here will be propagated to both `capabilities` and `hello`
554 command without any other effort. without any other action needed.
554 command without any other effort. without any other action needed.
555 """
555 """
556 # copy to prevent modification of the global list
556 # copy to prevent modification of the global list
557 caps = list(wireprotocaps)
557 caps = list(wireprotocaps)
558 if _allowstream(repo.ui):
558 if _allowstream(repo.ui):
559 if repo.ui.configbool('server', 'preferuncompressed', False):
559 if repo.ui.configbool('server', 'preferuncompressed', False):
560 caps.append('stream-preferred')
560 caps.append('stream-preferred')
561 requiredformats = repo.requirements & repo.supportedformats
561 requiredformats = repo.requirements & repo.supportedformats
562 # if our local revlogs are just revlogv1, add 'stream' cap
562 # if our local revlogs are just revlogv1, add 'stream' cap
563 if not requiredformats - set(('revlogv1',)):
563 if not requiredformats - set(('revlogv1',)):
564 caps.append('stream')
564 caps.append('stream')
565 # otherwise, add 'streamreqs' detailing our local revlog format
565 # otherwise, add 'streamreqs' detailing our local revlog format
566 else:
566 else:
567 caps.append('streamreqs=%s' % ','.join(requiredformats))
567 caps.append('streamreqs=%s' % ','.join(requiredformats))
568 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
568 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
569 caps.append('httpheader=1024')
569 caps.append('httpheader=1024')
570 return caps
570 return caps
571
571
572 # If you are writting and extension and consider wrapping this function. Wrap
572 # If you are writting and extension and consider wrapping this function. Wrap
573 # `_capabilities` instead.
573 # `_capabilities` instead.
574 @wireprotocommand('capabilities')
574 @wireprotocommand('capabilities')
575 def capabilities(repo, proto):
575 def capabilities(repo, proto):
576 return ' '.join(_capabilities(repo, proto))
576 return ' '.join(_capabilities(repo, proto))
577
577
578 @wireprotocommand('changegroup', 'roots')
578 @wireprotocommand('changegroup', 'roots')
579 def changegroup(repo, proto, roots):
579 def changegroup(repo, proto, roots):
580 nodes = decodelist(roots)
580 nodes = decodelist(roots)
581 cg = changegroupmod.changegroup(repo, nodes, 'serve')
581 cg = changegroupmod.changegroup(repo, nodes, 'serve')
582 return streamres(proto.groupchunks(cg))
582 return streamres(proto.groupchunks(cg))
583
583
584 @wireprotocommand('changegroupsubset', 'bases heads')
584 @wireprotocommand('changegroupsubset', 'bases heads')
585 def changegroupsubset(repo, proto, bases, heads):
585 def changegroupsubset(repo, proto, bases, heads):
586 bases = decodelist(bases)
586 bases = decodelist(bases)
587 heads = decodelist(heads)
587 heads = decodelist(heads)
588 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
588 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
589 return streamres(proto.groupchunks(cg))
589 return streamres(proto.groupchunks(cg))
590
590
591 @wireprotocommand('debugwireargs', 'one two *')
591 @wireprotocommand('debugwireargs', 'one two *')
592 def debugwireargs(repo, proto, one, two, others):
592 def debugwireargs(repo, proto, one, two, others):
593 # only accept optional args from the known set
593 # only accept optional args from the known set
594 opts = options('debugwireargs', ['three', 'four'], others)
594 opts = options('debugwireargs', ['three', 'four'], others)
595 return repo.debugwireargs(one, two, **opts)
595 return repo.debugwireargs(one, two, **opts)
596
596
597 @wireprotocommand('getbundle', '*')
597 @wireprotocommand('getbundle', '*')
598 def getbundle(repo, proto, others):
598 def getbundle(repo, proto, others):
599 opts = options('getbundle', ['heads', 'common', 'bundlecaps'], others)
599 opts = options('getbundle', ['heads', 'common', 'bundlecaps'], others)
600 for k, v in opts.iteritems():
600 for k, v in opts.iteritems():
601 if k in ('heads', 'common'):
601 if k in ('heads', 'common'):
602 opts[k] = decodelist(v)
602 opts[k] = decodelist(v)
603 elif k == 'bundlecaps':
603 elif k == 'bundlecaps':
604 opts[k] = set(v.split(','))
604 opts[k] = set(v.split(','))
605 cg = changegroupmod.getbundle(repo, 'serve', **opts)
605 cg = changegroupmod.getbundle(repo, 'serve', **opts)
606 return streamres(proto.groupchunks(cg))
606 return streamres(proto.groupchunks(cg))
607
607
608 @wireprotocommand('heads')
608 @wireprotocommand('heads')
609 def heads(repo, proto):
609 def heads(repo, proto):
610 h = repo.heads()
610 h = repo.heads()
611 return encodelist(h) + "\n"
611 return encodelist(h) + "\n"
612
612
613 @wireprotocommand('hello')
613 @wireprotocommand('hello')
614 def hello(repo, proto):
614 def hello(repo, proto):
615 '''the hello command returns a set of lines describing various
615 '''the hello command returns a set of lines describing various
616 interesting things about the server, in an RFC822-like format.
616 interesting things about the server, in an RFC822-like format.
617 Currently the only one defined is "capabilities", which
617 Currently the only one defined is "capabilities", which
618 consists of a line in the form:
618 consists of a line in the form:
619
619
620 capabilities: space separated list of tokens
620 capabilities: space separated list of tokens
621 '''
621 '''
622 return "capabilities: %s\n" % (capabilities(repo, proto))
622 return "capabilities: %s\n" % (capabilities(repo, proto))
623
623
624 @wireprotocommand('listkeys', 'namespace')
624 @wireprotocommand('listkeys', 'namespace')
625 def listkeys(repo, proto, namespace):
625 def listkeys(repo, proto, namespace):
626 d = repo.listkeys(encoding.tolocal(namespace)).items()
626 d = repo.listkeys(encoding.tolocal(namespace)).items()
627 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
627 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
628 for k, v in d])
628 for k, v in d])
629 return t
629 return t
630
630
631 @wireprotocommand('lookup', 'key')
631 @wireprotocommand('lookup', 'key')
632 def lookup(repo, proto, key):
632 def lookup(repo, proto, key):
633 try:
633 try:
634 k = encoding.tolocal(key)
634 k = encoding.tolocal(key)
635 c = repo[k]
635 c = repo[k]
636 r = c.hex()
636 r = c.hex()
637 success = 1
637 success = 1
638 except Exception, inst:
638 except Exception, inst:
639 r = str(inst)
639 r = str(inst)
640 success = 0
640 success = 0
641 return "%s %s\n" % (success, r)
641 return "%s %s\n" % (success, r)
642
642
643 @wireprotocommand('known', 'nodes *')
643 @wireprotocommand('known', 'nodes *')
644 def known(repo, proto, nodes, others):
644 def known(repo, proto, nodes, others):
645 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
645 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
646
646
647 @wireprotocommand('pushkey', 'namespace key old new')
647 @wireprotocommand('pushkey', 'namespace key old new')
648 def pushkey(repo, proto, namespace, key, old, new):
648 def pushkey(repo, proto, namespace, key, old, new):
649 # compatibility with pre-1.8 clients which were accidentally
649 # compatibility with pre-1.8 clients which were accidentally
650 # sending raw binary nodes rather than utf-8-encoded hex
650 # sending raw binary nodes rather than utf-8-encoded hex
651 if len(new) == 20 and new.encode('string-escape') != new:
651 if len(new) == 20 and new.encode('string-escape') != new:
652 # looks like it could be a binary node
652 # looks like it could be a binary node
653 try:
653 try:
654 new.decode('utf-8')
654 new.decode('utf-8')
655 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
655 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
656 except UnicodeDecodeError:
656 except UnicodeDecodeError:
657 pass # binary, leave unmodified
657 pass # binary, leave unmodified
658 else:
658 else:
659 new = encoding.tolocal(new) # normal path
659 new = encoding.tolocal(new) # normal path
660
660
661 if util.safehasattr(proto, 'restore'):
661 if util.safehasattr(proto, 'restore'):
662
662
663 proto.redirect()
663 proto.redirect()
664
664
665 try:
665 try:
666 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
666 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
667 encoding.tolocal(old), new) or False
667 encoding.tolocal(old), new) or False
668 except util.Abort:
668 except util.Abort:
669 r = False
669 r = False
670
670
671 output = proto.restore()
671 output = proto.restore()
672
672
673 return '%s\n%s' % (int(r), output)
673 return '%s\n%s' % (int(r), output)
674
674
675 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
675 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
676 encoding.tolocal(old), new)
676 encoding.tolocal(old), new)
677 return '%s\n' % int(r)
677 return '%s\n' % int(r)
678
678
679 def _allowstream(ui):
679 def _allowstream(ui):
680 return ui.configbool('server', 'uncompressed', True, untrusted=True)
680 return ui.configbool('server', 'uncompressed', True, untrusted=True)
681
681
682 def _walkstreamfiles(repo):
682 def _walkstreamfiles(repo):
683 # this is it's own function so extensions can override it
683 # this is it's own function so extensions can override it
684 return repo.store.walk()
684 return repo.store.walk()
685
685
686 @wireprotocommand('stream_out')
686 @wireprotocommand('stream_out')
687 def stream(repo, proto):
687 def stream(repo, proto):
688 '''If the server supports streaming clone, it advertises the "stream"
688 '''If the server supports streaming clone, it advertises the "stream"
689 capability with a value representing the version and flags of the repo
689 capability with a value representing the version and flags of the repo
690 it is serving. Client checks to see if it understands the format.
690 it is serving. Client checks to see if it understands the format.
691
691
692 The format is simple: the server writes out a line with the amount
692 The format is simple: the server writes out a line with the amount
693 of files, then the total amount of bytes to be transferred (separated
693 of files, then the total amount of bytes to be transferred (separated
694 by a space). Then, for each file, the server first writes the filename
694 by a space). Then, for each file, the server first writes the filename
695 and filesize (separated by the null character), then the file contents.
695 and filesize (separated by the null character), then the file contents.
696 '''
696 '''
697
697
698 if not _allowstream(repo.ui):
698 if not _allowstream(repo.ui):
699 return '1\n'
699 return '1\n'
700
700
701 entries = []
701 entries = []
702 total_bytes = 0
702 total_bytes = 0
703 try:
703 try:
704 # get consistent snapshot of repo, lock during scan
704 # get consistent snapshot of repo, lock during scan
705 lock = repo.lock()
705 lock = repo.lock()
706 try:
706 try:
707 repo.ui.debug('scanning\n')
707 repo.ui.debug('scanning\n')
708 for name, ename, size in _walkstreamfiles(repo):
708 for name, ename, size in _walkstreamfiles(repo):
709 if size:
709 if size:
710 entries.append((name, size))
710 entries.append((name, size))
711 total_bytes += size
711 total_bytes += size
712 finally:
712 finally:
713 lock.release()
713 lock.release()
714 except error.LockError:
714 except error.LockError:
715 return '2\n' # error: 2
715 return '2\n' # error: 2
716
716
717 def streamer(repo, entries, total):
717 def streamer(repo, entries, total):
718 '''stream out all metadata files in repository.'''
718 '''stream out all metadata files in repository.'''
719 yield '0\n' # success
719 yield '0\n' # success
720 repo.ui.debug('%d files, %d bytes to transfer\n' %
720 repo.ui.debug('%d files, %d bytes to transfer\n' %
721 (len(entries), total_bytes))
721 (len(entries), total_bytes))
722 yield '%d %d\n' % (len(entries), total_bytes)
722 yield '%d %d\n' % (len(entries), total_bytes)
723
723
724 sopener = repo.sopener
724 sopener = repo.sopener
725 oldaudit = sopener.mustaudit
725 oldaudit = sopener.mustaudit
726 debugflag = repo.ui.debugflag
726 debugflag = repo.ui.debugflag
727 sopener.mustaudit = False
727 sopener.mustaudit = False
728
728
729 try:
729 try:
730 for name, size in entries:
730 for name, size in entries:
731 if debugflag:
731 if debugflag:
732 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
732 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
733 # partially encode name over the wire for backwards compat
733 # partially encode name over the wire for backwards compat
734 yield '%s\0%d\n' % (store.encodedir(name), size)
734 yield '%s\0%d\n' % (store.encodedir(name), size)
735 if size <= 65536:
735 if size <= 65536:
736 fp = sopener(name)
736 fp = sopener(name)
737 try:
737 try:
738 data = fp.read(size)
738 data = fp.read(size)
739 finally:
739 finally:
740 fp.close()
740 fp.close()
741 yield data
741 yield data
742 else:
742 else:
743 for chunk in util.filechunkiter(sopener(name), limit=size):
743 for chunk in util.filechunkiter(sopener(name), limit=size):
744 yield chunk
744 yield chunk
745 # replace with "finally:" when support for python 2.4 has been dropped
745 # replace with "finally:" when support for python 2.4 has been dropped
746 except Exception:
746 except Exception:
747 sopener.mustaudit = oldaudit
747 sopener.mustaudit = oldaudit
748 raise
748 raise
749 sopener.mustaudit = oldaudit
749 sopener.mustaudit = oldaudit
750
750
751 return streamres(streamer(repo, entries, total_bytes))
751 return streamres(streamer(repo, entries, total_bytes))
752
752
753 @wireprotocommand('unbundle', 'heads')
753 @wireprotocommand('unbundle', 'heads')
754 def unbundle(repo, proto, heads):
754 def unbundle(repo, proto, heads):
755 their_heads = decodelist(heads)
755 their_heads = decodelist(heads)
756
756
757 try:
757 try:
758 proto.redirect()
758 proto.redirect()
759
759
760 exchange.check_heads(repo, their_heads, 'preparing changes')
760 exchange.check_heads(repo, their_heads, 'preparing changes')
761
761
762 # write bundle data to temporary file because it can be big
762 # write bundle data to temporary file because it can be big
763 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
763 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
764 fp = os.fdopen(fd, 'wb+')
764 fp = os.fdopen(fd, 'wb+')
765 r = 0
765 r = 0
766 try:
766 try:
767 proto.getfile(fp)
767 proto.getfile(fp)
768 lock = repo.lock()
768 fp.seek(0)
769 try:
769 gen = changegroupmod.readbundle(fp, None)
770 exchange.check_heads(repo, their_heads, 'uploading changes')
770 r = exchange.unbundle(repo, gen, their_heads, 'serve',
771
771 proto._client())
772 # push can proceed
773 fp.seek(0)
774 gen = changegroupmod.readbundle(fp, None)
775
776 try:
777 r = changegroupmod.addchangegroup(repo, gen, 'serve',
778 proto._client())
779 except util.Abort, inst:
780 sys.stderr.write("abort: %s\n" % inst)
781 finally:
782 lock.release()
783 return pushres(r)
772 return pushres(r)
784
773
785 finally:
774 finally:
786 fp.close()
775 fp.close()
787 os.unlink(tempname)
776 os.unlink(tempname)
788 except exchange.PushRaced, exc:
777 except exchange.PushRaced, exc:
789 return pusherr(str(exc))
778 return pusherr(str(exc))
General Comments 0
You need to be logged in to leave comments. Login now