##// END OF EJS Templates
wireproto: consolidate code for obtaining "cmds" argument value...
Gregory Szorc -
r29733:bb04f96d default
parent child Browse files
Show More
@@ -1,365 +1,359 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import re
10 import re
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 error,
14 error,
15 util,
15 util,
16 wireproto,
16 wireproto,
17 )
17 )
18
18
19 class remotelock(object):
19 class remotelock(object):
20 def __init__(self, repo):
20 def __init__(self, repo):
21 self.repo = repo
21 self.repo = repo
22 def release(self):
22 def release(self):
23 self.repo.unlock()
23 self.repo.unlock()
24 self.repo = None
24 self.repo = None
25 def __enter__(self):
25 def __enter__(self):
26 return self
26 return self
27 def __exit__(self, exc_type, exc_val, exc_tb):
27 def __exit__(self, exc_type, exc_val, exc_tb):
28 if self.repo:
28 if self.repo:
29 self.release()
29 self.release()
30 def __del__(self):
30 def __del__(self):
31 if self.repo:
31 if self.repo:
32 self.release()
32 self.release()
33
33
34 def _serverquote(s):
34 def _serverquote(s):
35 if not s:
35 if not s:
36 return s
36 return s
37 '''quote a string for the remote shell ... which we assume is sh'''
37 '''quote a string for the remote shell ... which we assume is sh'''
38 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
38 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
39 return s
39 return s
40 return "'%s'" % s.replace("'", "'\\''")
40 return "'%s'" % s.replace("'", "'\\''")
41
41
42 def _forwardoutput(ui, pipe):
42 def _forwardoutput(ui, pipe):
43 """display all data currently available on pipe as remote output.
43 """display all data currently available on pipe as remote output.
44
44
45 This is non blocking."""
45 This is non blocking."""
46 s = util.readpipe(pipe)
46 s = util.readpipe(pipe)
47 if s:
47 if s:
48 for l in s.splitlines():
48 for l in s.splitlines():
49 ui.status(_("remote: "), l, '\n')
49 ui.status(_("remote: "), l, '\n')
50
50
51 class doublepipe(object):
51 class doublepipe(object):
52 """Operate a side-channel pipe in addition of a main one
52 """Operate a side-channel pipe in addition of a main one
53
53
54 The side-channel pipe contains server output to be forwarded to the user
54 The side-channel pipe contains server output to be forwarded to the user
55 input. The double pipe will behave as the "main" pipe, but will ensure the
55 input. The double pipe will behave as the "main" pipe, but will ensure the
56 content of the "side" pipe is properly processed while we wait for blocking
56 content of the "side" pipe is properly processed while we wait for blocking
57 call on the "main" pipe.
57 call on the "main" pipe.
58
58
59 If large amounts of data are read from "main", the forward will cease after
59 If large amounts of data are read from "main", the forward will cease after
60 the first bytes start to appear. This simplifies the implementation
60 the first bytes start to appear. This simplifies the implementation
61 without affecting actual output of sshpeer too much as we rarely issue
61 without affecting actual output of sshpeer too much as we rarely issue
62 large read for data not yet emitted by the server.
62 large read for data not yet emitted by the server.
63
63
64 The main pipe is expected to be a 'bufferedinputpipe' from the util module
64 The main pipe is expected to be a 'bufferedinputpipe' from the util module
65 that handle all the os specific bites. This class lives in this module
65 that handle all the os specific bites. This class lives in this module
66 because it focus on behavior specific to the ssh protocol."""
66 because it focus on behavior specific to the ssh protocol."""
67
67
68 def __init__(self, ui, main, side):
68 def __init__(self, ui, main, side):
69 self._ui = ui
69 self._ui = ui
70 self._main = main
70 self._main = main
71 self._side = side
71 self._side = side
72
72
73 def _wait(self):
73 def _wait(self):
74 """wait until some data are available on main or side
74 """wait until some data are available on main or side
75
75
76 return a pair of boolean (ismainready, issideready)
76 return a pair of boolean (ismainready, issideready)
77
77
78 (This will only wait for data if the setup is supported by `util.poll`)
78 (This will only wait for data if the setup is supported by `util.poll`)
79 """
79 """
80 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
80 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
81 return (True, True) # main has data, assume side is worth poking at.
81 return (True, True) # main has data, assume side is worth poking at.
82 fds = [self._main.fileno(), self._side.fileno()]
82 fds = [self._main.fileno(), self._side.fileno()]
83 try:
83 try:
84 act = util.poll(fds)
84 act = util.poll(fds)
85 except NotImplementedError:
85 except NotImplementedError:
86 # non supported yet case, assume all have data.
86 # non supported yet case, assume all have data.
87 act = fds
87 act = fds
88 return (self._main.fileno() in act, self._side.fileno() in act)
88 return (self._main.fileno() in act, self._side.fileno() in act)
89
89
90 def write(self, data):
90 def write(self, data):
91 return self._call('write', data)
91 return self._call('write', data)
92
92
93 def read(self, size):
93 def read(self, size):
94 return self._call('read', size)
94 return self._call('read', size)
95
95
96 def readline(self):
96 def readline(self):
97 return self._call('readline')
97 return self._call('readline')
98
98
99 def _call(self, methname, data=None):
99 def _call(self, methname, data=None):
100 """call <methname> on "main", forward output of "side" while blocking
100 """call <methname> on "main", forward output of "side" while blocking
101 """
101 """
102 # data can be '' or 0
102 # data can be '' or 0
103 if (data is not None and not data) or self._main.closed:
103 if (data is not None and not data) or self._main.closed:
104 _forwardoutput(self._ui, self._side)
104 _forwardoutput(self._ui, self._side)
105 return ''
105 return ''
106 while True:
106 while True:
107 mainready, sideready = self._wait()
107 mainready, sideready = self._wait()
108 if sideready:
108 if sideready:
109 _forwardoutput(self._ui, self._side)
109 _forwardoutput(self._ui, self._side)
110 if mainready:
110 if mainready:
111 meth = getattr(self._main, methname)
111 meth = getattr(self._main, methname)
112 if data is None:
112 if data is None:
113 return meth()
113 return meth()
114 else:
114 else:
115 return meth(data)
115 return meth(data)
116
116
117 def close(self):
117 def close(self):
118 return self._main.close()
118 return self._main.close()
119
119
120 def flush(self):
120 def flush(self):
121 return self._main.flush()
121 return self._main.flush()
122
122
123 class sshpeer(wireproto.wirepeer):
123 class sshpeer(wireproto.wirepeer):
124 def __init__(self, ui, path, create=False):
124 def __init__(self, ui, path, create=False):
125 self._url = path
125 self._url = path
126 self.ui = ui
126 self.ui = ui
127 self.pipeo = self.pipei = self.pipee = None
127 self.pipeo = self.pipei = self.pipee = None
128
128
129 u = util.url(path, parsequery=False, parsefragment=False)
129 u = util.url(path, parsequery=False, parsefragment=False)
130 if u.scheme != 'ssh' or not u.host or u.path is None:
130 if u.scheme != 'ssh' or not u.host or u.path is None:
131 self._abort(error.RepoError(_("couldn't parse location %s") % path))
131 self._abort(error.RepoError(_("couldn't parse location %s") % path))
132
132
133 self.user = u.user
133 self.user = u.user
134 if u.passwd is not None:
134 if u.passwd is not None:
135 self._abort(error.RepoError(_("password in URL not supported")))
135 self._abort(error.RepoError(_("password in URL not supported")))
136 self.host = u.host
136 self.host = u.host
137 self.port = u.port
137 self.port = u.port
138 self.path = u.path or "."
138 self.path = u.path or "."
139
139
140 sshcmd = self.ui.config("ui", "ssh", "ssh")
140 sshcmd = self.ui.config("ui", "ssh", "ssh")
141 remotecmd = self.ui.config("ui", "remotecmd", "hg")
141 remotecmd = self.ui.config("ui", "remotecmd", "hg")
142
142
143 args = util.sshargs(sshcmd,
143 args = util.sshargs(sshcmd,
144 _serverquote(self.host),
144 _serverquote(self.host),
145 _serverquote(self.user),
145 _serverquote(self.user),
146 _serverquote(self.port))
146 _serverquote(self.port))
147
147
148 if create:
148 if create:
149 cmd = '%s %s %s' % (sshcmd, args,
149 cmd = '%s %s %s' % (sshcmd, args,
150 util.shellquote("%s init %s" %
150 util.shellquote("%s init %s" %
151 (_serverquote(remotecmd), _serverquote(self.path))))
151 (_serverquote(remotecmd), _serverquote(self.path))))
152 ui.debug('running %s\n' % cmd)
152 ui.debug('running %s\n' % cmd)
153 res = ui.system(cmd)
153 res = ui.system(cmd)
154 if res != 0:
154 if res != 0:
155 self._abort(error.RepoError(_("could not create remote repo")))
155 self._abort(error.RepoError(_("could not create remote repo")))
156
156
157 self._validaterepo(sshcmd, args, remotecmd)
157 self._validaterepo(sshcmd, args, remotecmd)
158
158
159 def url(self):
159 def url(self):
160 return self._url
160 return self._url
161
161
162 def _validaterepo(self, sshcmd, args, remotecmd):
162 def _validaterepo(self, sshcmd, args, remotecmd):
163 # cleanup up previous run
163 # cleanup up previous run
164 self.cleanup()
164 self.cleanup()
165
165
166 cmd = '%s %s %s' % (sshcmd, args,
166 cmd = '%s %s %s' % (sshcmd, args,
167 util.shellquote("%s -R %s serve --stdio" %
167 util.shellquote("%s -R %s serve --stdio" %
168 (_serverquote(remotecmd), _serverquote(self.path))))
168 (_serverquote(remotecmd), _serverquote(self.path))))
169 self.ui.debug('running %s\n' % cmd)
169 self.ui.debug('running %s\n' % cmd)
170 cmd = util.quotecommand(cmd)
170 cmd = util.quotecommand(cmd)
171
171
172 # while self.subprocess isn't used, having it allows the subprocess to
172 # while self.subprocess isn't used, having it allows the subprocess to
173 # to clean up correctly later
173 # to clean up correctly later
174 #
174 #
175 # no buffer allow the use of 'select'
175 # no buffer allow the use of 'select'
176 # feel free to remove buffering and select usage when we ultimately
176 # feel free to remove buffering and select usage when we ultimately
177 # move to threading.
177 # move to threading.
178 sub = util.popen4(cmd, bufsize=0)
178 sub = util.popen4(cmd, bufsize=0)
179 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
179 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
180
180
181 self.pipei = util.bufferedinputpipe(self.pipei)
181 self.pipei = util.bufferedinputpipe(self.pipei)
182 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
182 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
183 self.pipeo = doublepipe(self.ui, self.pipeo, self.pipee)
183 self.pipeo = doublepipe(self.ui, self.pipeo, self.pipee)
184
184
185 # skip any noise generated by remote shell
185 # skip any noise generated by remote shell
186 self._callstream("hello")
186 self._callstream("hello")
187 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
187 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
188 lines = ["", "dummy"]
188 lines = ["", "dummy"]
189 max_noise = 500
189 max_noise = 500
190 while lines[-1] and max_noise:
190 while lines[-1] and max_noise:
191 l = r.readline()
191 l = r.readline()
192 self.readerr()
192 self.readerr()
193 if lines[-1] == "1\n" and l == "\n":
193 if lines[-1] == "1\n" and l == "\n":
194 break
194 break
195 if l:
195 if l:
196 self.ui.debug("remote: ", l)
196 self.ui.debug("remote: ", l)
197 lines.append(l)
197 lines.append(l)
198 max_noise -= 1
198 max_noise -= 1
199 else:
199 else:
200 self._abort(error.RepoError(_('no suitable response from '
200 self._abort(error.RepoError(_('no suitable response from '
201 'remote hg')))
201 'remote hg')))
202
202
203 self._caps = set()
203 self._caps = set()
204 for l in reversed(lines):
204 for l in reversed(lines):
205 if l.startswith("capabilities:"):
205 if l.startswith("capabilities:"):
206 self._caps.update(l[:-1].split(":")[1].split())
206 self._caps.update(l[:-1].split(":")[1].split())
207 break
207 break
208
208
209 def _capabilities(self):
209 def _capabilities(self):
210 return self._caps
210 return self._caps
211
211
212 def readerr(self):
212 def readerr(self):
213 _forwardoutput(self.ui, self.pipee)
213 _forwardoutput(self.ui, self.pipee)
214
214
215 def _abort(self, exception):
215 def _abort(self, exception):
216 self.cleanup()
216 self.cleanup()
217 raise exception
217 raise exception
218
218
219 def cleanup(self):
219 def cleanup(self):
220 if self.pipeo is None:
220 if self.pipeo is None:
221 return
221 return
222 self.pipeo.close()
222 self.pipeo.close()
223 self.pipei.close()
223 self.pipei.close()
224 try:
224 try:
225 # read the error descriptor until EOF
225 # read the error descriptor until EOF
226 for l in self.pipee:
226 for l in self.pipee:
227 self.ui.status(_("remote: "), l)
227 self.ui.status(_("remote: "), l)
228 except (IOError, ValueError):
228 except (IOError, ValueError):
229 pass
229 pass
230 self.pipee.close()
230 self.pipee.close()
231
231
232 __del__ = cleanup
232 __del__ = cleanup
233
233
234 def _submitbatch(self, req):
234 def _submitbatch(self, req):
235 cmds = []
235 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
236 for op, argsdict in req:
237 args = ','.join('%s=%s' % (wireproto.escapearg(k),
238 wireproto.escapearg(v))
239 for k, v in argsdict.iteritems())
240 cmds.append('%s %s' % (op, args))
241 rsp = self._callstream("batch", cmds=';'.join(cmds))
242 available = self._getamount()
236 available = self._getamount()
243 # TODO this response parsing is probably suboptimal for large
237 # TODO this response parsing is probably suboptimal for large
244 # batches with large responses.
238 # batches with large responses.
245 toread = min(available, 1024)
239 toread = min(available, 1024)
246 work = rsp.read(toread)
240 work = rsp.read(toread)
247 available -= toread
241 available -= toread
248 chunk = work
242 chunk = work
249 while chunk:
243 while chunk:
250 while ';' in work:
244 while ';' in work:
251 one, work = work.split(';', 1)
245 one, work = work.split(';', 1)
252 yield wireproto.unescapearg(one)
246 yield wireproto.unescapearg(one)
253 toread = min(available, 1024)
247 toread = min(available, 1024)
254 chunk = rsp.read(toread)
248 chunk = rsp.read(toread)
255 available -= toread
249 available -= toread
256 work += chunk
250 work += chunk
257 yield wireproto.unescapearg(work)
251 yield wireproto.unescapearg(work)
258
252
259 def _callstream(self, cmd, **args):
253 def _callstream(self, cmd, **args):
260 self.ui.debug("sending %s command\n" % cmd)
254 self.ui.debug("sending %s command\n" % cmd)
261 self.pipeo.write("%s\n" % cmd)
255 self.pipeo.write("%s\n" % cmd)
262 _func, names = wireproto.commands[cmd]
256 _func, names = wireproto.commands[cmd]
263 keys = names.split()
257 keys = names.split()
264 wireargs = {}
258 wireargs = {}
265 for k in keys:
259 for k in keys:
266 if k == '*':
260 if k == '*':
267 wireargs['*'] = args
261 wireargs['*'] = args
268 break
262 break
269 else:
263 else:
270 wireargs[k] = args[k]
264 wireargs[k] = args[k]
271 del args[k]
265 del args[k]
272 for k, v in sorted(wireargs.iteritems()):
266 for k, v in sorted(wireargs.iteritems()):
273 self.pipeo.write("%s %d\n" % (k, len(v)))
267 self.pipeo.write("%s %d\n" % (k, len(v)))
274 if isinstance(v, dict):
268 if isinstance(v, dict):
275 for dk, dv in v.iteritems():
269 for dk, dv in v.iteritems():
276 self.pipeo.write("%s %d\n" % (dk, len(dv)))
270 self.pipeo.write("%s %d\n" % (dk, len(dv)))
277 self.pipeo.write(dv)
271 self.pipeo.write(dv)
278 else:
272 else:
279 self.pipeo.write(v)
273 self.pipeo.write(v)
280 self.pipeo.flush()
274 self.pipeo.flush()
281
275
282 return self.pipei
276 return self.pipei
283
277
284 def _callcompressable(self, cmd, **args):
278 def _callcompressable(self, cmd, **args):
285 return self._callstream(cmd, **args)
279 return self._callstream(cmd, **args)
286
280
287 def _call(self, cmd, **args):
281 def _call(self, cmd, **args):
288 self._callstream(cmd, **args)
282 self._callstream(cmd, **args)
289 return self._recv()
283 return self._recv()
290
284
291 def _callpush(self, cmd, fp, **args):
285 def _callpush(self, cmd, fp, **args):
292 r = self._call(cmd, **args)
286 r = self._call(cmd, **args)
293 if r:
287 if r:
294 return '', r
288 return '', r
295 for d in iter(lambda: fp.read(4096), ''):
289 for d in iter(lambda: fp.read(4096), ''):
296 self._send(d)
290 self._send(d)
297 self._send("", flush=True)
291 self._send("", flush=True)
298 r = self._recv()
292 r = self._recv()
299 if r:
293 if r:
300 return '', r
294 return '', r
301 return self._recv(), ''
295 return self._recv(), ''
302
296
303 def _calltwowaystream(self, cmd, fp, **args):
297 def _calltwowaystream(self, cmd, fp, **args):
304 r = self._call(cmd, **args)
298 r = self._call(cmd, **args)
305 if r:
299 if r:
306 # XXX needs to be made better
300 # XXX needs to be made better
307 raise error.Abort(_('unexpected remote reply: %s') % r)
301 raise error.Abort(_('unexpected remote reply: %s') % r)
308 for d in iter(lambda: fp.read(4096), ''):
302 for d in iter(lambda: fp.read(4096), ''):
309 self._send(d)
303 self._send(d)
310 self._send("", flush=True)
304 self._send("", flush=True)
311 return self.pipei
305 return self.pipei
312
306
313 def _getamount(self):
307 def _getamount(self):
314 l = self.pipei.readline()
308 l = self.pipei.readline()
315 if l == '\n':
309 if l == '\n':
316 self.readerr()
310 self.readerr()
317 msg = _('check previous remote output')
311 msg = _('check previous remote output')
318 self._abort(error.OutOfBandError(hint=msg))
312 self._abort(error.OutOfBandError(hint=msg))
319 self.readerr()
313 self.readerr()
320 try:
314 try:
321 return int(l)
315 return int(l)
322 except ValueError:
316 except ValueError:
323 self._abort(error.ResponseError(_("unexpected response:"), l))
317 self._abort(error.ResponseError(_("unexpected response:"), l))
324
318
325 def _recv(self):
319 def _recv(self):
326 return self.pipei.read(self._getamount())
320 return self.pipei.read(self._getamount())
327
321
328 def _send(self, data, flush=False):
322 def _send(self, data, flush=False):
329 self.pipeo.write("%d\n" % len(data))
323 self.pipeo.write("%d\n" % len(data))
330 if data:
324 if data:
331 self.pipeo.write(data)
325 self.pipeo.write(data)
332 if flush:
326 if flush:
333 self.pipeo.flush()
327 self.pipeo.flush()
334 self.readerr()
328 self.readerr()
335
329
336 def lock(self):
330 def lock(self):
337 self._call("lock")
331 self._call("lock")
338 return remotelock(self)
332 return remotelock(self)
339
333
340 def unlock(self):
334 def unlock(self):
341 self._call("unlock")
335 self._call("unlock")
342
336
343 def addchangegroup(self, cg, source, url, lock=None):
337 def addchangegroup(self, cg, source, url, lock=None):
344 '''Send a changegroup to the remote server. Return an integer
338 '''Send a changegroup to the remote server. Return an integer
345 similar to unbundle(). DEPRECATED, since it requires locking the
339 similar to unbundle(). DEPRECATED, since it requires locking the
346 remote.'''
340 remote.'''
347 d = self._call("addchangegroup")
341 d = self._call("addchangegroup")
348 if d:
342 if d:
349 self._abort(error.RepoError(_("push refused: %s") % d))
343 self._abort(error.RepoError(_("push refused: %s") % d))
350 for d in iter(lambda: cg.read(4096), ''):
344 for d in iter(lambda: cg.read(4096), ''):
351 self.pipeo.write(d)
345 self.pipeo.write(d)
352 self.readerr()
346 self.readerr()
353
347
354 self.pipeo.flush()
348 self.pipeo.flush()
355
349
356 self.readerr()
350 self.readerr()
357 r = self._recv()
351 r = self._recv()
358 if not r:
352 if not r:
359 return 1
353 return 1
360 try:
354 try:
361 return int(r)
355 return int(r)
362 except ValueError:
356 except ValueError:
363 self._abort(error.ResponseError(_("unexpected response:"), r))
357 self._abort(error.ResponseError(_("unexpected response:"), r))
364
358
365 instance = sshpeer
359 instance = sshpeer
@@ -1,952 +1,957 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import itertools
11 import itertools
12 import os
12 import os
13 import sys
13 import sys
14 import tempfile
14 import tempfile
15
15
16 from .i18n import _
16 from .i18n import _
17 from .node import (
17 from .node import (
18 bin,
18 bin,
19 hex,
19 hex,
20 )
20 )
21
21
22 from . import (
22 from . import (
23 bundle2,
23 bundle2,
24 changegroup as changegroupmod,
24 changegroup as changegroupmod,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 streamclone,
30 streamclone,
31 util,
31 util,
32 )
32 )
33
33
34 urlerr = util.urlerr
34 urlerr = util.urlerr
35 urlreq = util.urlreq
35 urlreq = util.urlreq
36
36
37 bundle2required = _(
37 bundle2required = _(
38 'incompatible Mercurial client; bundle2 required\n'
38 'incompatible Mercurial client; bundle2 required\n'
39 '(see https://www.mercurial-scm.org/wiki/IncompatibleClient)\n')
39 '(see https://www.mercurial-scm.org/wiki/IncompatibleClient)\n')
40
40
41 class abstractserverproto(object):
41 class abstractserverproto(object):
42 """abstract class that summarizes the protocol API
42 """abstract class that summarizes the protocol API
43
43
44 Used as reference and documentation.
44 Used as reference and documentation.
45 """
45 """
46
46
47 def getargs(self, args):
47 def getargs(self, args):
48 """return the value for arguments in <args>
48 """return the value for arguments in <args>
49
49
50 returns a list of values (same order as <args>)"""
50 returns a list of values (same order as <args>)"""
51 raise NotImplementedError()
51 raise NotImplementedError()
52
52
53 def getfile(self, fp):
53 def getfile(self, fp):
54 """write the whole content of a file into a file like object
54 """write the whole content of a file into a file like object
55
55
56 The file is in the form::
56 The file is in the form::
57
57
58 (<chunk-size>\n<chunk>)+0\n
58 (<chunk-size>\n<chunk>)+0\n
59
59
60 chunk size is the ascii version of the int.
60 chunk size is the ascii version of the int.
61 """
61 """
62 raise NotImplementedError()
62 raise NotImplementedError()
63
63
64 def redirect(self):
64 def redirect(self):
65 """may setup interception for stdout and stderr
65 """may setup interception for stdout and stderr
66
66
67 See also the `restore` method."""
67 See also the `restore` method."""
68 raise NotImplementedError()
68 raise NotImplementedError()
69
69
70 # If the `redirect` function does install interception, the `restore`
70 # If the `redirect` function does install interception, the `restore`
71 # function MUST be defined. If interception is not used, this function
71 # function MUST be defined. If interception is not used, this function
72 # MUST NOT be defined.
72 # MUST NOT be defined.
73 #
73 #
74 # left commented here on purpose
74 # left commented here on purpose
75 #
75 #
76 #def restore(self):
76 #def restore(self):
77 # """reinstall previous stdout and stderr and return intercepted stdout
77 # """reinstall previous stdout and stderr and return intercepted stdout
78 # """
78 # """
79 # raise NotImplementedError()
79 # raise NotImplementedError()
80
80
81 def groupchunks(self, cg):
81 def groupchunks(self, cg):
82 """return 4096 chunks from a changegroup object
82 """return 4096 chunks from a changegroup object
83
83
84 Some protocols may have compressed the contents."""
84 Some protocols may have compressed the contents."""
85 raise NotImplementedError()
85 raise NotImplementedError()
86
86
87 class remotebatch(peer.batcher):
87 class remotebatch(peer.batcher):
88 '''batches the queued calls; uses as few roundtrips as possible'''
88 '''batches the queued calls; uses as few roundtrips as possible'''
89 def __init__(self, remote):
89 def __init__(self, remote):
90 '''remote must support _submitbatch(encbatch) and
90 '''remote must support _submitbatch(encbatch) and
91 _submitone(op, encargs)'''
91 _submitone(op, encargs)'''
92 peer.batcher.__init__(self)
92 peer.batcher.__init__(self)
93 self.remote = remote
93 self.remote = remote
94 def submit(self):
94 def submit(self):
95 req, rsp = [], []
95 req, rsp = [], []
96 for name, args, opts, resref in self.calls:
96 for name, args, opts, resref in self.calls:
97 mtd = getattr(self.remote, name)
97 mtd = getattr(self.remote, name)
98 batchablefn = getattr(mtd, 'batchable', None)
98 batchablefn = getattr(mtd, 'batchable', None)
99 if batchablefn is not None:
99 if batchablefn is not None:
100 batchable = batchablefn(mtd.im_self, *args, **opts)
100 batchable = batchablefn(mtd.im_self, *args, **opts)
101 encargsorres, encresref = next(batchable)
101 encargsorres, encresref = next(batchable)
102 if encresref:
102 if encresref:
103 req.append((name, encargsorres,))
103 req.append((name, encargsorres,))
104 rsp.append((batchable, encresref, resref,))
104 rsp.append((batchable, encresref, resref,))
105 else:
105 else:
106 resref.set(encargsorres)
106 resref.set(encargsorres)
107 else:
107 else:
108 if req:
108 if req:
109 self._submitreq(req, rsp)
109 self._submitreq(req, rsp)
110 req, rsp = [], []
110 req, rsp = [], []
111 resref.set(mtd(*args, **opts))
111 resref.set(mtd(*args, **opts))
112 if req:
112 if req:
113 self._submitreq(req, rsp)
113 self._submitreq(req, rsp)
114 def _submitreq(self, req, rsp):
114 def _submitreq(self, req, rsp):
115 encresults = self.remote._submitbatch(req)
115 encresults = self.remote._submitbatch(req)
116 for encres, r in zip(encresults, rsp):
116 for encres, r in zip(encresults, rsp):
117 batchable, encresref, resref = r
117 batchable, encresref, resref = r
118 encresref.set(encres)
118 encresref.set(encres)
119 resref.set(next(batchable))
119 resref.set(next(batchable))
120
120
121 class remoteiterbatcher(peer.iterbatcher):
121 class remoteiterbatcher(peer.iterbatcher):
122 def __init__(self, remote):
122 def __init__(self, remote):
123 super(remoteiterbatcher, self).__init__()
123 super(remoteiterbatcher, self).__init__()
124 self._remote = remote
124 self._remote = remote
125
125
126 def __getattr__(self, name):
126 def __getattr__(self, name):
127 if not getattr(self._remote, name, False):
127 if not getattr(self._remote, name, False):
128 raise AttributeError(
128 raise AttributeError(
129 'Attempted to iterbatch non-batchable call to %r' % name)
129 'Attempted to iterbatch non-batchable call to %r' % name)
130 return super(remoteiterbatcher, self).__getattr__(name)
130 return super(remoteiterbatcher, self).__getattr__(name)
131
131
132 def submit(self):
132 def submit(self):
133 """Break the batch request into many patch calls and pipeline them.
133 """Break the batch request into many patch calls and pipeline them.
134
134
135 This is mostly valuable over http where request sizes can be
135 This is mostly valuable over http where request sizes can be
136 limited, but can be used in other places as well.
136 limited, but can be used in other places as well.
137 """
137 """
138 req, rsp = [], []
138 req, rsp = [], []
139 for name, args, opts, resref in self.calls:
139 for name, args, opts, resref in self.calls:
140 mtd = getattr(self._remote, name)
140 mtd = getattr(self._remote, name)
141 batchable = mtd.batchable(mtd.im_self, *args, **opts)
141 batchable = mtd.batchable(mtd.im_self, *args, **opts)
142 encargsorres, encresref = next(batchable)
142 encargsorres, encresref = next(batchable)
143 assert encresref
143 assert encresref
144 req.append((name, encargsorres))
144 req.append((name, encargsorres))
145 rsp.append((batchable, encresref))
145 rsp.append((batchable, encresref))
146 if req:
146 if req:
147 self._resultiter = self._remote._submitbatch(req)
147 self._resultiter = self._remote._submitbatch(req)
148 self._rsp = rsp
148 self._rsp = rsp
149
149
150 def results(self):
150 def results(self):
151 for (batchable, encresref), encres in itertools.izip(
151 for (batchable, encresref), encres in itertools.izip(
152 self._rsp, self._resultiter):
152 self._rsp, self._resultiter):
153 encresref.set(encres)
153 encresref.set(encres)
154 yield next(batchable)
154 yield next(batchable)
155
155
156 # Forward a couple of names from peer to make wireproto interactions
156 # Forward a couple of names from peer to make wireproto interactions
157 # slightly more sensible.
157 # slightly more sensible.
158 batchable = peer.batchable
158 batchable = peer.batchable
159 future = peer.future
159 future = peer.future
160
160
161 # list of nodes encoding / decoding
161 # list of nodes encoding / decoding
162
162
163 def decodelist(l, sep=' '):
163 def decodelist(l, sep=' '):
164 if l:
164 if l:
165 return map(bin, l.split(sep))
165 return map(bin, l.split(sep))
166 return []
166 return []
167
167
168 def encodelist(l, sep=' '):
168 def encodelist(l, sep=' '):
169 try:
169 try:
170 return sep.join(map(hex, l))
170 return sep.join(map(hex, l))
171 except TypeError:
171 except TypeError:
172 raise
172 raise
173
173
174 # batched call argument encoding
174 # batched call argument encoding
175
175
176 def escapearg(plain):
176 def escapearg(plain):
177 return (plain
177 return (plain
178 .replace(':', ':c')
178 .replace(':', ':c')
179 .replace(',', ':o')
179 .replace(',', ':o')
180 .replace(';', ':s')
180 .replace(';', ':s')
181 .replace('=', ':e'))
181 .replace('=', ':e'))
182
182
183 def unescapearg(escaped):
183 def unescapearg(escaped):
184 return (escaped
184 return (escaped
185 .replace(':e', '=')
185 .replace(':e', '=')
186 .replace(':s', ';')
186 .replace(':s', ';')
187 .replace(':o', ',')
187 .replace(':o', ',')
188 .replace(':c', ':'))
188 .replace(':c', ':'))
189
189
190 def encodebatchcmds(req):
191 """Return a ``cmds`` argument value for the ``batch`` command."""
192 cmds = []
193 for op, argsdict in req:
194 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
195 for k, v in argsdict.iteritems())
196 cmds.append('%s %s' % (op, args))
197
198 return ';'.join(cmds)
199
190 # mapping of options accepted by getbundle and their types
200 # mapping of options accepted by getbundle and their types
191 #
201 #
192 # Meant to be extended by extensions. It is extensions responsibility to ensure
202 # Meant to be extended by extensions. It is extensions responsibility to ensure
193 # such options are properly processed in exchange.getbundle.
203 # such options are properly processed in exchange.getbundle.
194 #
204 #
195 # supported types are:
205 # supported types are:
196 #
206 #
197 # :nodes: list of binary nodes
207 # :nodes: list of binary nodes
198 # :csv: list of comma-separated values
208 # :csv: list of comma-separated values
199 # :scsv: list of comma-separated values return as set
209 # :scsv: list of comma-separated values return as set
200 # :plain: string with no transformation needed.
210 # :plain: string with no transformation needed.
201 gboptsmap = {'heads': 'nodes',
211 gboptsmap = {'heads': 'nodes',
202 'common': 'nodes',
212 'common': 'nodes',
203 'obsmarkers': 'boolean',
213 'obsmarkers': 'boolean',
204 'bundlecaps': 'scsv',
214 'bundlecaps': 'scsv',
205 'listkeys': 'csv',
215 'listkeys': 'csv',
206 'cg': 'boolean',
216 'cg': 'boolean',
207 'cbattempted': 'boolean'}
217 'cbattempted': 'boolean'}
208
218
209 # client side
219 # client side
210
220
211 class wirepeer(peer.peerrepository):
221 class wirepeer(peer.peerrepository):
212 """Client-side interface for communicating with a peer repository.
222 """Client-side interface for communicating with a peer repository.
213
223
214 Methods commonly call wire protocol commands of the same name.
224 Methods commonly call wire protocol commands of the same name.
215
225
216 See also httppeer.py and sshpeer.py for protocol-specific
226 See also httppeer.py and sshpeer.py for protocol-specific
217 implementations of this interface.
227 implementations of this interface.
218 """
228 """
219 def batch(self):
229 def batch(self):
220 if self.capable('batch'):
230 if self.capable('batch'):
221 return remotebatch(self)
231 return remotebatch(self)
222 else:
232 else:
223 return peer.localbatch(self)
233 return peer.localbatch(self)
224 def _submitbatch(self, req):
234 def _submitbatch(self, req):
225 """run batch request <req> on the server
235 """run batch request <req> on the server
226
236
227 Returns an iterator of the raw responses from the server.
237 Returns an iterator of the raw responses from the server.
228 """
238 """
229 cmds = []
239 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
230 for op, argsdict in req:
231 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
232 for k, v in argsdict.iteritems())
233 cmds.append('%s %s' % (op, args))
234 rsp = self._callstream("batch", cmds=';'.join(cmds))
235 chunk = rsp.read(1024)
240 chunk = rsp.read(1024)
236 work = [chunk]
241 work = [chunk]
237 while chunk:
242 while chunk:
238 while ';' not in chunk and chunk:
243 while ';' not in chunk and chunk:
239 chunk = rsp.read(1024)
244 chunk = rsp.read(1024)
240 work.append(chunk)
245 work.append(chunk)
241 merged = ''.join(work)
246 merged = ''.join(work)
242 while ';' in merged:
247 while ';' in merged:
243 one, merged = merged.split(';', 1)
248 one, merged = merged.split(';', 1)
244 yield unescapearg(one)
249 yield unescapearg(one)
245 chunk = rsp.read(1024)
250 chunk = rsp.read(1024)
246 work = [merged, chunk]
251 work = [merged, chunk]
247 yield unescapearg(''.join(work))
252 yield unescapearg(''.join(work))
248
253
249 def _submitone(self, op, args):
254 def _submitone(self, op, args):
250 return self._call(op, **args)
255 return self._call(op, **args)
251
256
252 def iterbatch(self):
257 def iterbatch(self):
253 return remoteiterbatcher(self)
258 return remoteiterbatcher(self)
254
259
255 @batchable
260 @batchable
256 def lookup(self, key):
261 def lookup(self, key):
257 self.requirecap('lookup', _('look up remote revision'))
262 self.requirecap('lookup', _('look up remote revision'))
258 f = future()
263 f = future()
259 yield {'key': encoding.fromlocal(key)}, f
264 yield {'key': encoding.fromlocal(key)}, f
260 d = f.value
265 d = f.value
261 success, data = d[:-1].split(" ", 1)
266 success, data = d[:-1].split(" ", 1)
262 if int(success):
267 if int(success):
263 yield bin(data)
268 yield bin(data)
264 self._abort(error.RepoError(data))
269 self._abort(error.RepoError(data))
265
270
266 @batchable
271 @batchable
267 def heads(self):
272 def heads(self):
268 f = future()
273 f = future()
269 yield {}, f
274 yield {}, f
270 d = f.value
275 d = f.value
271 try:
276 try:
272 yield decodelist(d[:-1])
277 yield decodelist(d[:-1])
273 except ValueError:
278 except ValueError:
274 self._abort(error.ResponseError(_("unexpected response:"), d))
279 self._abort(error.ResponseError(_("unexpected response:"), d))
275
280
276 @batchable
281 @batchable
277 def known(self, nodes):
282 def known(self, nodes):
278 f = future()
283 f = future()
279 yield {'nodes': encodelist(nodes)}, f
284 yield {'nodes': encodelist(nodes)}, f
280 d = f.value
285 d = f.value
281 try:
286 try:
282 yield [bool(int(b)) for b in d]
287 yield [bool(int(b)) for b in d]
283 except ValueError:
288 except ValueError:
284 self._abort(error.ResponseError(_("unexpected response:"), d))
289 self._abort(error.ResponseError(_("unexpected response:"), d))
285
290
286 @batchable
291 @batchable
287 def branchmap(self):
292 def branchmap(self):
288 f = future()
293 f = future()
289 yield {}, f
294 yield {}, f
290 d = f.value
295 d = f.value
291 try:
296 try:
292 branchmap = {}
297 branchmap = {}
293 for branchpart in d.splitlines():
298 for branchpart in d.splitlines():
294 branchname, branchheads = branchpart.split(' ', 1)
299 branchname, branchheads = branchpart.split(' ', 1)
295 branchname = encoding.tolocal(urlreq.unquote(branchname))
300 branchname = encoding.tolocal(urlreq.unquote(branchname))
296 branchheads = decodelist(branchheads)
301 branchheads = decodelist(branchheads)
297 branchmap[branchname] = branchheads
302 branchmap[branchname] = branchheads
298 yield branchmap
303 yield branchmap
299 except TypeError:
304 except TypeError:
300 self._abort(error.ResponseError(_("unexpected response:"), d))
305 self._abort(error.ResponseError(_("unexpected response:"), d))
301
306
302 def branches(self, nodes):
307 def branches(self, nodes):
303 n = encodelist(nodes)
308 n = encodelist(nodes)
304 d = self._call("branches", nodes=n)
309 d = self._call("branches", nodes=n)
305 try:
310 try:
306 br = [tuple(decodelist(b)) for b in d.splitlines()]
311 br = [tuple(decodelist(b)) for b in d.splitlines()]
307 return br
312 return br
308 except ValueError:
313 except ValueError:
309 self._abort(error.ResponseError(_("unexpected response:"), d))
314 self._abort(error.ResponseError(_("unexpected response:"), d))
310
315
311 def between(self, pairs):
316 def between(self, pairs):
312 batch = 8 # avoid giant requests
317 batch = 8 # avoid giant requests
313 r = []
318 r = []
314 for i in xrange(0, len(pairs), batch):
319 for i in xrange(0, len(pairs), batch):
315 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
320 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
316 d = self._call("between", pairs=n)
321 d = self._call("between", pairs=n)
317 try:
322 try:
318 r.extend(l and decodelist(l) or [] for l in d.splitlines())
323 r.extend(l and decodelist(l) or [] for l in d.splitlines())
319 except ValueError:
324 except ValueError:
320 self._abort(error.ResponseError(_("unexpected response:"), d))
325 self._abort(error.ResponseError(_("unexpected response:"), d))
321 return r
326 return r
322
327
323 @batchable
328 @batchable
324 def pushkey(self, namespace, key, old, new):
329 def pushkey(self, namespace, key, old, new):
325 if not self.capable('pushkey'):
330 if not self.capable('pushkey'):
326 yield False, None
331 yield False, None
327 f = future()
332 f = future()
328 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
333 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
329 yield {'namespace': encoding.fromlocal(namespace),
334 yield {'namespace': encoding.fromlocal(namespace),
330 'key': encoding.fromlocal(key),
335 'key': encoding.fromlocal(key),
331 'old': encoding.fromlocal(old),
336 'old': encoding.fromlocal(old),
332 'new': encoding.fromlocal(new)}, f
337 'new': encoding.fromlocal(new)}, f
333 d = f.value
338 d = f.value
334 d, output = d.split('\n', 1)
339 d, output = d.split('\n', 1)
335 try:
340 try:
336 d = bool(int(d))
341 d = bool(int(d))
337 except ValueError:
342 except ValueError:
338 raise error.ResponseError(
343 raise error.ResponseError(
339 _('push failed (unexpected response):'), d)
344 _('push failed (unexpected response):'), d)
340 for l in output.splitlines(True):
345 for l in output.splitlines(True):
341 self.ui.status(_('remote: '), l)
346 self.ui.status(_('remote: '), l)
342 yield d
347 yield d
343
348
344 @batchable
349 @batchable
345 def listkeys(self, namespace):
350 def listkeys(self, namespace):
346 if not self.capable('pushkey'):
351 if not self.capable('pushkey'):
347 yield {}, None
352 yield {}, None
348 f = future()
353 f = future()
349 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
354 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
350 yield {'namespace': encoding.fromlocal(namespace)}, f
355 yield {'namespace': encoding.fromlocal(namespace)}, f
351 d = f.value
356 d = f.value
352 self.ui.debug('received listkey for "%s": %i bytes\n'
357 self.ui.debug('received listkey for "%s": %i bytes\n'
353 % (namespace, len(d)))
358 % (namespace, len(d)))
354 yield pushkeymod.decodekeys(d)
359 yield pushkeymod.decodekeys(d)
355
360
356 def stream_out(self):
361 def stream_out(self):
357 return self._callstream('stream_out')
362 return self._callstream('stream_out')
358
363
359 def changegroup(self, nodes, kind):
364 def changegroup(self, nodes, kind):
360 n = encodelist(nodes)
365 n = encodelist(nodes)
361 f = self._callcompressable("changegroup", roots=n)
366 f = self._callcompressable("changegroup", roots=n)
362 return changegroupmod.cg1unpacker(f, 'UN')
367 return changegroupmod.cg1unpacker(f, 'UN')
363
368
364 def changegroupsubset(self, bases, heads, kind):
369 def changegroupsubset(self, bases, heads, kind):
365 self.requirecap('changegroupsubset', _('look up remote changes'))
370 self.requirecap('changegroupsubset', _('look up remote changes'))
366 bases = encodelist(bases)
371 bases = encodelist(bases)
367 heads = encodelist(heads)
372 heads = encodelist(heads)
368 f = self._callcompressable("changegroupsubset",
373 f = self._callcompressable("changegroupsubset",
369 bases=bases, heads=heads)
374 bases=bases, heads=heads)
370 return changegroupmod.cg1unpacker(f, 'UN')
375 return changegroupmod.cg1unpacker(f, 'UN')
371
376
372 def getbundle(self, source, **kwargs):
377 def getbundle(self, source, **kwargs):
373 self.requirecap('getbundle', _('look up remote changes'))
378 self.requirecap('getbundle', _('look up remote changes'))
374 opts = {}
379 opts = {}
375 bundlecaps = kwargs.get('bundlecaps')
380 bundlecaps = kwargs.get('bundlecaps')
376 if bundlecaps is not None:
381 if bundlecaps is not None:
377 kwargs['bundlecaps'] = sorted(bundlecaps)
382 kwargs['bundlecaps'] = sorted(bundlecaps)
378 else:
383 else:
379 bundlecaps = () # kwargs could have it to None
384 bundlecaps = () # kwargs could have it to None
380 for key, value in kwargs.iteritems():
385 for key, value in kwargs.iteritems():
381 if value is None:
386 if value is None:
382 continue
387 continue
383 keytype = gboptsmap.get(key)
388 keytype = gboptsmap.get(key)
384 if keytype is None:
389 if keytype is None:
385 assert False, 'unexpected'
390 assert False, 'unexpected'
386 elif keytype == 'nodes':
391 elif keytype == 'nodes':
387 value = encodelist(value)
392 value = encodelist(value)
388 elif keytype in ('csv', 'scsv'):
393 elif keytype in ('csv', 'scsv'):
389 value = ','.join(value)
394 value = ','.join(value)
390 elif keytype == 'boolean':
395 elif keytype == 'boolean':
391 value = '%i' % bool(value)
396 value = '%i' % bool(value)
392 elif keytype != 'plain':
397 elif keytype != 'plain':
393 raise KeyError('unknown getbundle option type %s'
398 raise KeyError('unknown getbundle option type %s'
394 % keytype)
399 % keytype)
395 opts[key] = value
400 opts[key] = value
396 f = self._callcompressable("getbundle", **opts)
401 f = self._callcompressable("getbundle", **opts)
397 if any((cap.startswith('HG2') for cap in bundlecaps)):
402 if any((cap.startswith('HG2') for cap in bundlecaps)):
398 return bundle2.getunbundler(self.ui, f)
403 return bundle2.getunbundler(self.ui, f)
399 else:
404 else:
400 return changegroupmod.cg1unpacker(f, 'UN')
405 return changegroupmod.cg1unpacker(f, 'UN')
401
406
402 def unbundle(self, cg, heads, url):
407 def unbundle(self, cg, heads, url):
403 '''Send cg (a readable file-like object representing the
408 '''Send cg (a readable file-like object representing the
404 changegroup to push, typically a chunkbuffer object) to the
409 changegroup to push, typically a chunkbuffer object) to the
405 remote server as a bundle.
410 remote server as a bundle.
406
411
407 When pushing a bundle10 stream, return an integer indicating the
412 When pushing a bundle10 stream, return an integer indicating the
408 result of the push (see localrepository.addchangegroup()).
413 result of the push (see localrepository.addchangegroup()).
409
414
410 When pushing a bundle20 stream, return a bundle20 stream.
415 When pushing a bundle20 stream, return a bundle20 stream.
411
416
412 `url` is the url the client thinks it's pushing to, which is
417 `url` is the url the client thinks it's pushing to, which is
413 visible to hooks.
418 visible to hooks.
414 '''
419 '''
415
420
416 if heads != ['force'] and self.capable('unbundlehash'):
421 if heads != ['force'] and self.capable('unbundlehash'):
417 heads = encodelist(['hashed',
422 heads = encodelist(['hashed',
418 hashlib.sha1(''.join(sorted(heads))).digest()])
423 hashlib.sha1(''.join(sorted(heads))).digest()])
419 else:
424 else:
420 heads = encodelist(heads)
425 heads = encodelist(heads)
421
426
422 if util.safehasattr(cg, 'deltaheader'):
427 if util.safehasattr(cg, 'deltaheader'):
423 # this a bundle10, do the old style call sequence
428 # this a bundle10, do the old style call sequence
424 ret, output = self._callpush("unbundle", cg, heads=heads)
429 ret, output = self._callpush("unbundle", cg, heads=heads)
425 if ret == "":
430 if ret == "":
426 raise error.ResponseError(
431 raise error.ResponseError(
427 _('push failed:'), output)
432 _('push failed:'), output)
428 try:
433 try:
429 ret = int(ret)
434 ret = int(ret)
430 except ValueError:
435 except ValueError:
431 raise error.ResponseError(
436 raise error.ResponseError(
432 _('push failed (unexpected response):'), ret)
437 _('push failed (unexpected response):'), ret)
433
438
434 for l in output.splitlines(True):
439 for l in output.splitlines(True):
435 self.ui.status(_('remote: '), l)
440 self.ui.status(_('remote: '), l)
436 else:
441 else:
437 # bundle2 push. Send a stream, fetch a stream.
442 # bundle2 push. Send a stream, fetch a stream.
438 stream = self._calltwowaystream('unbundle', cg, heads=heads)
443 stream = self._calltwowaystream('unbundle', cg, heads=heads)
439 ret = bundle2.getunbundler(self.ui, stream)
444 ret = bundle2.getunbundler(self.ui, stream)
440 return ret
445 return ret
441
446
442 def debugwireargs(self, one, two, three=None, four=None, five=None):
447 def debugwireargs(self, one, two, three=None, four=None, five=None):
443 # don't pass optional arguments left at their default value
448 # don't pass optional arguments left at their default value
444 opts = {}
449 opts = {}
445 if three is not None:
450 if three is not None:
446 opts['three'] = three
451 opts['three'] = three
447 if four is not None:
452 if four is not None:
448 opts['four'] = four
453 opts['four'] = four
449 return self._call('debugwireargs', one=one, two=two, **opts)
454 return self._call('debugwireargs', one=one, two=two, **opts)
450
455
451 def _call(self, cmd, **args):
456 def _call(self, cmd, **args):
452 """execute <cmd> on the server
457 """execute <cmd> on the server
453
458
454 The command is expected to return a simple string.
459 The command is expected to return a simple string.
455
460
456 returns the server reply as a string."""
461 returns the server reply as a string."""
457 raise NotImplementedError()
462 raise NotImplementedError()
458
463
459 def _callstream(self, cmd, **args):
464 def _callstream(self, cmd, **args):
460 """execute <cmd> on the server
465 """execute <cmd> on the server
461
466
462 The command is expected to return a stream. Note that if the
467 The command is expected to return a stream. Note that if the
463 command doesn't return a stream, _callstream behaves
468 command doesn't return a stream, _callstream behaves
464 differently for ssh and http peers.
469 differently for ssh and http peers.
465
470
466 returns the server reply as a file like object.
471 returns the server reply as a file like object.
467 """
472 """
468 raise NotImplementedError()
473 raise NotImplementedError()
469
474
470 def _callcompressable(self, cmd, **args):
475 def _callcompressable(self, cmd, **args):
471 """execute <cmd> on the server
476 """execute <cmd> on the server
472
477
473 The command is expected to return a stream.
478 The command is expected to return a stream.
474
479
475 The stream may have been compressed in some implementations. This
480 The stream may have been compressed in some implementations. This
476 function takes care of the decompression. This is the only difference
481 function takes care of the decompression. This is the only difference
477 with _callstream.
482 with _callstream.
478
483
479 returns the server reply as a file like object.
484 returns the server reply as a file like object.
480 """
485 """
481 raise NotImplementedError()
486 raise NotImplementedError()
482
487
483 def _callpush(self, cmd, fp, **args):
488 def _callpush(self, cmd, fp, **args):
484 """execute a <cmd> on server
489 """execute a <cmd> on server
485
490
486 The command is expected to be related to a push. Push has a special
491 The command is expected to be related to a push. Push has a special
487 return method.
492 return method.
488
493
489 returns the server reply as a (ret, output) tuple. ret is either
494 returns the server reply as a (ret, output) tuple. ret is either
490 empty (error) or a stringified int.
495 empty (error) or a stringified int.
491 """
496 """
492 raise NotImplementedError()
497 raise NotImplementedError()
493
498
494 def _calltwowaystream(self, cmd, fp, **args):
499 def _calltwowaystream(self, cmd, fp, **args):
495 """execute <cmd> on server
500 """execute <cmd> on server
496
501
497 The command will send a stream to the server and get a stream in reply.
502 The command will send a stream to the server and get a stream in reply.
498 """
503 """
499 raise NotImplementedError()
504 raise NotImplementedError()
500
505
501 def _abort(self, exception):
506 def _abort(self, exception):
502 """clearly abort the wire protocol connection and raise the exception
507 """clearly abort the wire protocol connection and raise the exception
503 """
508 """
504 raise NotImplementedError()
509 raise NotImplementedError()
505
510
506 # server side
511 # server side
507
512
508 # wire protocol command can either return a string or one of these classes.
513 # wire protocol command can either return a string or one of these classes.
509 class streamres(object):
514 class streamres(object):
510 """wireproto reply: binary stream
515 """wireproto reply: binary stream
511
516
512 The call was successful and the result is a stream.
517 The call was successful and the result is a stream.
513 Iterate on the `self.gen` attribute to retrieve chunks.
518 Iterate on the `self.gen` attribute to retrieve chunks.
514 """
519 """
515 def __init__(self, gen):
520 def __init__(self, gen):
516 self.gen = gen
521 self.gen = gen
517
522
518 class pushres(object):
523 class pushres(object):
519 """wireproto reply: success with simple integer return
524 """wireproto reply: success with simple integer return
520
525
521 The call was successful and returned an integer contained in `self.res`.
526 The call was successful and returned an integer contained in `self.res`.
522 """
527 """
523 def __init__(self, res):
528 def __init__(self, res):
524 self.res = res
529 self.res = res
525
530
526 class pusherr(object):
531 class pusherr(object):
527 """wireproto reply: failure
532 """wireproto reply: failure
528
533
529 The call failed. The `self.res` attribute contains the error message.
534 The call failed. The `self.res` attribute contains the error message.
530 """
535 """
531 def __init__(self, res):
536 def __init__(self, res):
532 self.res = res
537 self.res = res
533
538
534 class ooberror(object):
539 class ooberror(object):
535 """wireproto reply: failure of a batch of operation
540 """wireproto reply: failure of a batch of operation
536
541
537 Something failed during a batch call. The error message is stored in
542 Something failed during a batch call. The error message is stored in
538 `self.message`.
543 `self.message`.
539 """
544 """
540 def __init__(self, message):
545 def __init__(self, message):
541 self.message = message
546 self.message = message
542
547
543 def getdispatchrepo(repo, proto, command):
548 def getdispatchrepo(repo, proto, command):
544 """Obtain the repo used for processing wire protocol commands.
549 """Obtain the repo used for processing wire protocol commands.
545
550
546 The intent of this function is to serve as a monkeypatch point for
551 The intent of this function is to serve as a monkeypatch point for
547 extensions that need commands to operate on different repo views under
552 extensions that need commands to operate on different repo views under
548 specialized circumstances.
553 specialized circumstances.
549 """
554 """
550 return repo.filtered('served')
555 return repo.filtered('served')
551
556
552 def dispatch(repo, proto, command):
557 def dispatch(repo, proto, command):
553 repo = getdispatchrepo(repo, proto, command)
558 repo = getdispatchrepo(repo, proto, command)
554 func, spec = commands[command]
559 func, spec = commands[command]
555 args = proto.getargs(spec)
560 args = proto.getargs(spec)
556 return func(repo, proto, *args)
561 return func(repo, proto, *args)
557
562
558 def options(cmd, keys, others):
563 def options(cmd, keys, others):
559 opts = {}
564 opts = {}
560 for k in keys:
565 for k in keys:
561 if k in others:
566 if k in others:
562 opts[k] = others[k]
567 opts[k] = others[k]
563 del others[k]
568 del others[k]
564 if others:
569 if others:
565 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
570 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
566 % (cmd, ",".join(others)))
571 % (cmd, ",".join(others)))
567 return opts
572 return opts
568
573
569 def bundle1allowed(repo, action):
574 def bundle1allowed(repo, action):
570 """Whether a bundle1 operation is allowed from the server.
575 """Whether a bundle1 operation is allowed from the server.
571
576
572 Priority is:
577 Priority is:
573
578
574 1. server.bundle1gd.<action> (if generaldelta active)
579 1. server.bundle1gd.<action> (if generaldelta active)
575 2. server.bundle1.<action>
580 2. server.bundle1.<action>
576 3. server.bundle1gd (if generaldelta active)
581 3. server.bundle1gd (if generaldelta active)
577 4. server.bundle1
582 4. server.bundle1
578 """
583 """
579 ui = repo.ui
584 ui = repo.ui
580 gd = 'generaldelta' in repo.requirements
585 gd = 'generaldelta' in repo.requirements
581
586
582 if gd:
587 if gd:
583 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
588 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
584 if v is not None:
589 if v is not None:
585 return v
590 return v
586
591
587 v = ui.configbool('server', 'bundle1.%s' % action, None)
592 v = ui.configbool('server', 'bundle1.%s' % action, None)
588 if v is not None:
593 if v is not None:
589 return v
594 return v
590
595
591 if gd:
596 if gd:
592 v = ui.configbool('server', 'bundle1gd', None)
597 v = ui.configbool('server', 'bundle1gd', None)
593 if v is not None:
598 if v is not None:
594 return v
599 return v
595
600
596 return ui.configbool('server', 'bundle1', True)
601 return ui.configbool('server', 'bundle1', True)
597
602
598 # list of commands
603 # list of commands
599 commands = {}
604 commands = {}
600
605
601 def wireprotocommand(name, args=''):
606 def wireprotocommand(name, args=''):
602 """decorator for wire protocol command"""
607 """decorator for wire protocol command"""
603 def register(func):
608 def register(func):
604 commands[name] = (func, args)
609 commands[name] = (func, args)
605 return func
610 return func
606 return register
611 return register
607
612
608 @wireprotocommand('batch', 'cmds *')
613 @wireprotocommand('batch', 'cmds *')
609 def batch(repo, proto, cmds, others):
614 def batch(repo, proto, cmds, others):
610 repo = repo.filtered("served")
615 repo = repo.filtered("served")
611 res = []
616 res = []
612 for pair in cmds.split(';'):
617 for pair in cmds.split(';'):
613 op, args = pair.split(' ', 1)
618 op, args = pair.split(' ', 1)
614 vals = {}
619 vals = {}
615 for a in args.split(','):
620 for a in args.split(','):
616 if a:
621 if a:
617 n, v = a.split('=')
622 n, v = a.split('=')
618 vals[n] = unescapearg(v)
623 vals[n] = unescapearg(v)
619 func, spec = commands[op]
624 func, spec = commands[op]
620 if spec:
625 if spec:
621 keys = spec.split()
626 keys = spec.split()
622 data = {}
627 data = {}
623 for k in keys:
628 for k in keys:
624 if k == '*':
629 if k == '*':
625 star = {}
630 star = {}
626 for key in vals.keys():
631 for key in vals.keys():
627 if key not in keys:
632 if key not in keys:
628 star[key] = vals[key]
633 star[key] = vals[key]
629 data['*'] = star
634 data['*'] = star
630 else:
635 else:
631 data[k] = vals[k]
636 data[k] = vals[k]
632 result = func(repo, proto, *[data[k] for k in keys])
637 result = func(repo, proto, *[data[k] for k in keys])
633 else:
638 else:
634 result = func(repo, proto)
639 result = func(repo, proto)
635 if isinstance(result, ooberror):
640 if isinstance(result, ooberror):
636 return result
641 return result
637 res.append(escapearg(result))
642 res.append(escapearg(result))
638 return ';'.join(res)
643 return ';'.join(res)
639
644
640 @wireprotocommand('between', 'pairs')
645 @wireprotocommand('between', 'pairs')
641 def between(repo, proto, pairs):
646 def between(repo, proto, pairs):
642 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
647 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
643 r = []
648 r = []
644 for b in repo.between(pairs):
649 for b in repo.between(pairs):
645 r.append(encodelist(b) + "\n")
650 r.append(encodelist(b) + "\n")
646 return "".join(r)
651 return "".join(r)
647
652
648 @wireprotocommand('branchmap')
653 @wireprotocommand('branchmap')
649 def branchmap(repo, proto):
654 def branchmap(repo, proto):
650 branchmap = repo.branchmap()
655 branchmap = repo.branchmap()
651 heads = []
656 heads = []
652 for branch, nodes in branchmap.iteritems():
657 for branch, nodes in branchmap.iteritems():
653 branchname = urlreq.quote(encoding.fromlocal(branch))
658 branchname = urlreq.quote(encoding.fromlocal(branch))
654 branchnodes = encodelist(nodes)
659 branchnodes = encodelist(nodes)
655 heads.append('%s %s' % (branchname, branchnodes))
660 heads.append('%s %s' % (branchname, branchnodes))
656 return '\n'.join(heads)
661 return '\n'.join(heads)
657
662
658 @wireprotocommand('branches', 'nodes')
663 @wireprotocommand('branches', 'nodes')
659 def branches(repo, proto, nodes):
664 def branches(repo, proto, nodes):
660 nodes = decodelist(nodes)
665 nodes = decodelist(nodes)
661 r = []
666 r = []
662 for b in repo.branches(nodes):
667 for b in repo.branches(nodes):
663 r.append(encodelist(b) + "\n")
668 r.append(encodelist(b) + "\n")
664 return "".join(r)
669 return "".join(r)
665
670
666 @wireprotocommand('clonebundles', '')
671 @wireprotocommand('clonebundles', '')
667 def clonebundles(repo, proto):
672 def clonebundles(repo, proto):
668 """Server command for returning info for available bundles to seed clones.
673 """Server command for returning info for available bundles to seed clones.
669
674
670 Clients will parse this response and determine what bundle to fetch.
675 Clients will parse this response and determine what bundle to fetch.
671
676
672 Extensions may wrap this command to filter or dynamically emit data
677 Extensions may wrap this command to filter or dynamically emit data
673 depending on the request. e.g. you could advertise URLs for the closest
678 depending on the request. e.g. you could advertise URLs for the closest
674 data center given the client's IP address.
679 data center given the client's IP address.
675 """
680 """
676 return repo.opener.tryread('clonebundles.manifest')
681 return repo.opener.tryread('clonebundles.manifest')
677
682
678 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
683 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
679 'known', 'getbundle', 'unbundlehash', 'batch']
684 'known', 'getbundle', 'unbundlehash', 'batch']
680
685
681 def _capabilities(repo, proto):
686 def _capabilities(repo, proto):
682 """return a list of capabilities for a repo
687 """return a list of capabilities for a repo
683
688
684 This function exists to allow extensions to easily wrap capabilities
689 This function exists to allow extensions to easily wrap capabilities
685 computation
690 computation
686
691
687 - returns a lists: easy to alter
692 - returns a lists: easy to alter
688 - change done here will be propagated to both `capabilities` and `hello`
693 - change done here will be propagated to both `capabilities` and `hello`
689 command without any other action needed.
694 command without any other action needed.
690 """
695 """
691 # copy to prevent modification of the global list
696 # copy to prevent modification of the global list
692 caps = list(wireprotocaps)
697 caps = list(wireprotocaps)
693 if streamclone.allowservergeneration(repo.ui):
698 if streamclone.allowservergeneration(repo.ui):
694 if repo.ui.configbool('server', 'preferuncompressed', False):
699 if repo.ui.configbool('server', 'preferuncompressed', False):
695 caps.append('stream-preferred')
700 caps.append('stream-preferred')
696 requiredformats = repo.requirements & repo.supportedformats
701 requiredformats = repo.requirements & repo.supportedformats
697 # if our local revlogs are just revlogv1, add 'stream' cap
702 # if our local revlogs are just revlogv1, add 'stream' cap
698 if not requiredformats - set(('revlogv1',)):
703 if not requiredformats - set(('revlogv1',)):
699 caps.append('stream')
704 caps.append('stream')
700 # otherwise, add 'streamreqs' detailing our local revlog format
705 # otherwise, add 'streamreqs' detailing our local revlog format
701 else:
706 else:
702 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
707 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
703 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
708 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
704 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
709 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
705 caps.append('bundle2=' + urlreq.quote(capsblob))
710 caps.append('bundle2=' + urlreq.quote(capsblob))
706 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
711 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
707 caps.append(
712 caps.append(
708 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
713 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
709 if repo.ui.configbool('experimental', 'httppostargs', False):
714 if repo.ui.configbool('experimental', 'httppostargs', False):
710 caps.append('httppostargs')
715 caps.append('httppostargs')
711 return caps
716 return caps
712
717
713 # If you are writing an extension and consider wrapping this function. Wrap
718 # If you are writing an extension and consider wrapping this function. Wrap
714 # `_capabilities` instead.
719 # `_capabilities` instead.
715 @wireprotocommand('capabilities')
720 @wireprotocommand('capabilities')
716 def capabilities(repo, proto):
721 def capabilities(repo, proto):
717 return ' '.join(_capabilities(repo, proto))
722 return ' '.join(_capabilities(repo, proto))
718
723
719 @wireprotocommand('changegroup', 'roots')
724 @wireprotocommand('changegroup', 'roots')
720 def changegroup(repo, proto, roots):
725 def changegroup(repo, proto, roots):
721 nodes = decodelist(roots)
726 nodes = decodelist(roots)
722 cg = changegroupmod.changegroup(repo, nodes, 'serve')
727 cg = changegroupmod.changegroup(repo, nodes, 'serve')
723 return streamres(proto.groupchunks(cg))
728 return streamres(proto.groupchunks(cg))
724
729
725 @wireprotocommand('changegroupsubset', 'bases heads')
730 @wireprotocommand('changegroupsubset', 'bases heads')
726 def changegroupsubset(repo, proto, bases, heads):
731 def changegroupsubset(repo, proto, bases, heads):
727 bases = decodelist(bases)
732 bases = decodelist(bases)
728 heads = decodelist(heads)
733 heads = decodelist(heads)
729 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
734 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
730 return streamres(proto.groupchunks(cg))
735 return streamres(proto.groupchunks(cg))
731
736
732 @wireprotocommand('debugwireargs', 'one two *')
737 @wireprotocommand('debugwireargs', 'one two *')
733 def debugwireargs(repo, proto, one, two, others):
738 def debugwireargs(repo, proto, one, two, others):
734 # only accept optional args from the known set
739 # only accept optional args from the known set
735 opts = options('debugwireargs', ['three', 'four'], others)
740 opts = options('debugwireargs', ['three', 'four'], others)
736 return repo.debugwireargs(one, two, **opts)
741 return repo.debugwireargs(one, two, **opts)
737
742
738 # List of options accepted by getbundle.
743 # List of options accepted by getbundle.
739 #
744 #
740 # Meant to be extended by extensions. It is the extension's responsibility to
745 # Meant to be extended by extensions. It is the extension's responsibility to
741 # ensure such options are properly processed in exchange.getbundle.
746 # ensure such options are properly processed in exchange.getbundle.
742 gboptslist = ['heads', 'common', 'bundlecaps']
747 gboptslist = ['heads', 'common', 'bundlecaps']
743
748
744 @wireprotocommand('getbundle', '*')
749 @wireprotocommand('getbundle', '*')
745 def getbundle(repo, proto, others):
750 def getbundle(repo, proto, others):
746 opts = options('getbundle', gboptsmap.keys(), others)
751 opts = options('getbundle', gboptsmap.keys(), others)
747 for k, v in opts.iteritems():
752 for k, v in opts.iteritems():
748 keytype = gboptsmap[k]
753 keytype = gboptsmap[k]
749 if keytype == 'nodes':
754 if keytype == 'nodes':
750 opts[k] = decodelist(v)
755 opts[k] = decodelist(v)
751 elif keytype == 'csv':
756 elif keytype == 'csv':
752 opts[k] = list(v.split(','))
757 opts[k] = list(v.split(','))
753 elif keytype == 'scsv':
758 elif keytype == 'scsv':
754 opts[k] = set(v.split(','))
759 opts[k] = set(v.split(','))
755 elif keytype == 'boolean':
760 elif keytype == 'boolean':
756 # Client should serialize False as '0', which is a non-empty string
761 # Client should serialize False as '0', which is a non-empty string
757 # so it evaluates as a True bool.
762 # so it evaluates as a True bool.
758 if v == '0':
763 if v == '0':
759 opts[k] = False
764 opts[k] = False
760 else:
765 else:
761 opts[k] = bool(v)
766 opts[k] = bool(v)
762 elif keytype != 'plain':
767 elif keytype != 'plain':
763 raise KeyError('unknown getbundle option type %s'
768 raise KeyError('unknown getbundle option type %s'
764 % keytype)
769 % keytype)
765
770
766 if not bundle1allowed(repo, 'pull'):
771 if not bundle1allowed(repo, 'pull'):
767 if not exchange.bundle2requested(opts.get('bundlecaps')):
772 if not exchange.bundle2requested(opts.get('bundlecaps')):
768 return ooberror(bundle2required)
773 return ooberror(bundle2required)
769
774
770 cg = exchange.getbundle(repo, 'serve', **opts)
775 cg = exchange.getbundle(repo, 'serve', **opts)
771 return streamres(proto.groupchunks(cg))
776 return streamres(proto.groupchunks(cg))
772
777
773 @wireprotocommand('heads')
778 @wireprotocommand('heads')
774 def heads(repo, proto):
779 def heads(repo, proto):
775 h = repo.heads()
780 h = repo.heads()
776 return encodelist(h) + "\n"
781 return encodelist(h) + "\n"
777
782
778 @wireprotocommand('hello')
783 @wireprotocommand('hello')
779 def hello(repo, proto):
784 def hello(repo, proto):
780 '''the hello command returns a set of lines describing various
785 '''the hello command returns a set of lines describing various
781 interesting things about the server, in an RFC822-like format.
786 interesting things about the server, in an RFC822-like format.
782 Currently the only one defined is "capabilities", which
787 Currently the only one defined is "capabilities", which
783 consists of a line in the form:
788 consists of a line in the form:
784
789
785 capabilities: space separated list of tokens
790 capabilities: space separated list of tokens
786 '''
791 '''
787 return "capabilities: %s\n" % (capabilities(repo, proto))
792 return "capabilities: %s\n" % (capabilities(repo, proto))
788
793
789 @wireprotocommand('listkeys', 'namespace')
794 @wireprotocommand('listkeys', 'namespace')
790 def listkeys(repo, proto, namespace):
795 def listkeys(repo, proto, namespace):
791 d = repo.listkeys(encoding.tolocal(namespace)).items()
796 d = repo.listkeys(encoding.tolocal(namespace)).items()
792 return pushkeymod.encodekeys(d)
797 return pushkeymod.encodekeys(d)
793
798
794 @wireprotocommand('lookup', 'key')
799 @wireprotocommand('lookup', 'key')
795 def lookup(repo, proto, key):
800 def lookup(repo, proto, key):
796 try:
801 try:
797 k = encoding.tolocal(key)
802 k = encoding.tolocal(key)
798 c = repo[k]
803 c = repo[k]
799 r = c.hex()
804 r = c.hex()
800 success = 1
805 success = 1
801 except Exception as inst:
806 except Exception as inst:
802 r = str(inst)
807 r = str(inst)
803 success = 0
808 success = 0
804 return "%s %s\n" % (success, r)
809 return "%s %s\n" % (success, r)
805
810
806 @wireprotocommand('known', 'nodes *')
811 @wireprotocommand('known', 'nodes *')
807 def known(repo, proto, nodes, others):
812 def known(repo, proto, nodes, others):
808 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
813 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
809
814
810 @wireprotocommand('pushkey', 'namespace key old new')
815 @wireprotocommand('pushkey', 'namespace key old new')
811 def pushkey(repo, proto, namespace, key, old, new):
816 def pushkey(repo, proto, namespace, key, old, new):
812 # compatibility with pre-1.8 clients which were accidentally
817 # compatibility with pre-1.8 clients which were accidentally
813 # sending raw binary nodes rather than utf-8-encoded hex
818 # sending raw binary nodes rather than utf-8-encoded hex
814 if len(new) == 20 and new.encode('string-escape') != new:
819 if len(new) == 20 and new.encode('string-escape') != new:
815 # looks like it could be a binary node
820 # looks like it could be a binary node
816 try:
821 try:
817 new.decode('utf-8')
822 new.decode('utf-8')
818 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
823 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
819 except UnicodeDecodeError:
824 except UnicodeDecodeError:
820 pass # binary, leave unmodified
825 pass # binary, leave unmodified
821 else:
826 else:
822 new = encoding.tolocal(new) # normal path
827 new = encoding.tolocal(new) # normal path
823
828
824 if util.safehasattr(proto, 'restore'):
829 if util.safehasattr(proto, 'restore'):
825
830
826 proto.redirect()
831 proto.redirect()
827
832
828 try:
833 try:
829 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
834 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
830 encoding.tolocal(old), new) or False
835 encoding.tolocal(old), new) or False
831 except error.Abort:
836 except error.Abort:
832 r = False
837 r = False
833
838
834 output = proto.restore()
839 output = proto.restore()
835
840
836 return '%s\n%s' % (int(r), output)
841 return '%s\n%s' % (int(r), output)
837
842
838 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
843 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
839 encoding.tolocal(old), new)
844 encoding.tolocal(old), new)
840 return '%s\n' % int(r)
845 return '%s\n' % int(r)
841
846
842 @wireprotocommand('stream_out')
847 @wireprotocommand('stream_out')
843 def stream(repo, proto):
848 def stream(repo, proto):
844 '''If the server supports streaming clone, it advertises the "stream"
849 '''If the server supports streaming clone, it advertises the "stream"
845 capability with a value representing the version and flags of the repo
850 capability with a value representing the version and flags of the repo
846 it is serving. Client checks to see if it understands the format.
851 it is serving. Client checks to see if it understands the format.
847 '''
852 '''
848 if not streamclone.allowservergeneration(repo.ui):
853 if not streamclone.allowservergeneration(repo.ui):
849 return '1\n'
854 return '1\n'
850
855
851 def getstream(it):
856 def getstream(it):
852 yield '0\n'
857 yield '0\n'
853 for chunk in it:
858 for chunk in it:
854 yield chunk
859 yield chunk
855
860
856 try:
861 try:
857 # LockError may be raised before the first result is yielded. Don't
862 # LockError may be raised before the first result is yielded. Don't
858 # emit output until we're sure we got the lock successfully.
863 # emit output until we're sure we got the lock successfully.
859 it = streamclone.generatev1wireproto(repo)
864 it = streamclone.generatev1wireproto(repo)
860 return streamres(getstream(it))
865 return streamres(getstream(it))
861 except error.LockError:
866 except error.LockError:
862 return '2\n'
867 return '2\n'
863
868
864 @wireprotocommand('unbundle', 'heads')
869 @wireprotocommand('unbundle', 'heads')
865 def unbundle(repo, proto, heads):
870 def unbundle(repo, proto, heads):
866 their_heads = decodelist(heads)
871 their_heads = decodelist(heads)
867
872
868 try:
873 try:
869 proto.redirect()
874 proto.redirect()
870
875
871 exchange.check_heads(repo, their_heads, 'preparing changes')
876 exchange.check_heads(repo, their_heads, 'preparing changes')
872
877
873 # write bundle data to temporary file because it can be big
878 # write bundle data to temporary file because it can be big
874 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
879 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
875 fp = os.fdopen(fd, 'wb+')
880 fp = os.fdopen(fd, 'wb+')
876 r = 0
881 r = 0
877 try:
882 try:
878 proto.getfile(fp)
883 proto.getfile(fp)
879 fp.seek(0)
884 fp.seek(0)
880 gen = exchange.readbundle(repo.ui, fp, None)
885 gen = exchange.readbundle(repo.ui, fp, None)
881 if (isinstance(gen, changegroupmod.cg1unpacker)
886 if (isinstance(gen, changegroupmod.cg1unpacker)
882 and not bundle1allowed(repo, 'push')):
887 and not bundle1allowed(repo, 'push')):
883 return ooberror(bundle2required)
888 return ooberror(bundle2required)
884
889
885 r = exchange.unbundle(repo, gen, their_heads, 'serve',
890 r = exchange.unbundle(repo, gen, their_heads, 'serve',
886 proto._client())
891 proto._client())
887 if util.safehasattr(r, 'addpart'):
892 if util.safehasattr(r, 'addpart'):
888 # The return looks streamable, we are in the bundle2 case and
893 # The return looks streamable, we are in the bundle2 case and
889 # should return a stream.
894 # should return a stream.
890 return streamres(r.getchunks())
895 return streamres(r.getchunks())
891 return pushres(r)
896 return pushres(r)
892
897
893 finally:
898 finally:
894 fp.close()
899 fp.close()
895 os.unlink(tempname)
900 os.unlink(tempname)
896
901
897 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
902 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
898 # handle non-bundle2 case first
903 # handle non-bundle2 case first
899 if not getattr(exc, 'duringunbundle2', False):
904 if not getattr(exc, 'duringunbundle2', False):
900 try:
905 try:
901 raise
906 raise
902 except error.Abort:
907 except error.Abort:
903 # The old code we moved used sys.stderr directly.
908 # The old code we moved used sys.stderr directly.
904 # We did not change it to minimise code change.
909 # We did not change it to minimise code change.
905 # This need to be moved to something proper.
910 # This need to be moved to something proper.
906 # Feel free to do it.
911 # Feel free to do it.
907 sys.stderr.write("abort: %s\n" % exc)
912 sys.stderr.write("abort: %s\n" % exc)
908 return pushres(0)
913 return pushres(0)
909 except error.PushRaced:
914 except error.PushRaced:
910 return pusherr(str(exc))
915 return pusherr(str(exc))
911
916
912 bundler = bundle2.bundle20(repo.ui)
917 bundler = bundle2.bundle20(repo.ui)
913 for out in getattr(exc, '_bundle2salvagedoutput', ()):
918 for out in getattr(exc, '_bundle2salvagedoutput', ()):
914 bundler.addpart(out)
919 bundler.addpart(out)
915 try:
920 try:
916 try:
921 try:
917 raise
922 raise
918 except error.PushkeyFailed as exc:
923 except error.PushkeyFailed as exc:
919 # check client caps
924 # check client caps
920 remotecaps = getattr(exc, '_replycaps', None)
925 remotecaps = getattr(exc, '_replycaps', None)
921 if (remotecaps is not None
926 if (remotecaps is not None
922 and 'pushkey' not in remotecaps.get('error', ())):
927 and 'pushkey' not in remotecaps.get('error', ())):
923 # no support remote side, fallback to Abort handler.
928 # no support remote side, fallback to Abort handler.
924 raise
929 raise
925 part = bundler.newpart('error:pushkey')
930 part = bundler.newpart('error:pushkey')
926 part.addparam('in-reply-to', exc.partid)
931 part.addparam('in-reply-to', exc.partid)
927 if exc.namespace is not None:
932 if exc.namespace is not None:
928 part.addparam('namespace', exc.namespace, mandatory=False)
933 part.addparam('namespace', exc.namespace, mandatory=False)
929 if exc.key is not None:
934 if exc.key is not None:
930 part.addparam('key', exc.key, mandatory=False)
935 part.addparam('key', exc.key, mandatory=False)
931 if exc.new is not None:
936 if exc.new is not None:
932 part.addparam('new', exc.new, mandatory=False)
937 part.addparam('new', exc.new, mandatory=False)
933 if exc.old is not None:
938 if exc.old is not None:
934 part.addparam('old', exc.old, mandatory=False)
939 part.addparam('old', exc.old, mandatory=False)
935 if exc.ret is not None:
940 if exc.ret is not None:
936 part.addparam('ret', exc.ret, mandatory=False)
941 part.addparam('ret', exc.ret, mandatory=False)
937 except error.BundleValueError as exc:
942 except error.BundleValueError as exc:
938 errpart = bundler.newpart('error:unsupportedcontent')
943 errpart = bundler.newpart('error:unsupportedcontent')
939 if exc.parttype is not None:
944 if exc.parttype is not None:
940 errpart.addparam('parttype', exc.parttype)
945 errpart.addparam('parttype', exc.parttype)
941 if exc.params:
946 if exc.params:
942 errpart.addparam('params', '\0'.join(exc.params))
947 errpart.addparam('params', '\0'.join(exc.params))
943 except error.Abort as exc:
948 except error.Abort as exc:
944 manargs = [('message', str(exc))]
949 manargs = [('message', str(exc))]
945 advargs = []
950 advargs = []
946 if exc.hint is not None:
951 if exc.hint is not None:
947 advargs.append(('hint', exc.hint))
952 advargs.append(('hint', exc.hint))
948 bundler.addpart(bundle2.bundlepart('error:abort',
953 bundler.addpart(bundle2.bundlepart('error:abort',
949 manargs, advargs))
954 manargs, advargs))
950 except error.PushRaced as exc:
955 except error.PushRaced as exc:
951 bundler.newpart('error:pushraced', [('message', str(exc))])
956 bundler.newpart('error:pushraced', [('message', str(exc))])
952 return streamres(bundler.getchunks())
957 return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now