##// END OF EJS Templates
sshpeer: move docstring to top...
Yuya Nishihara -
r35475:b520c8f9 @8 default
parent child Browse files
Show More
@@ -1,362 +1,362 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import re
10 import re
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 error,
14 error,
15 pycompat,
15 pycompat,
16 util,
16 util,
17 wireproto,
17 wireproto,
18 )
18 )
19
19
20 def _serverquote(s):
20 def _serverquote(s):
21 """quote a string for the remote shell ... which we assume is sh"""
21 if not s:
22 if not s:
22 return s
23 return s
23 '''quote a string for the remote shell ... which we assume is sh'''
24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
25 return s
25 return s
26 return "'%s'" % s.replace("'", "'\\''")
26 return "'%s'" % s.replace("'", "'\\''")
27
27
28 def _forwardoutput(ui, pipe):
28 def _forwardoutput(ui, pipe):
29 """display all data currently available on pipe as remote output.
29 """display all data currently available on pipe as remote output.
30
30
31 This is non blocking."""
31 This is non blocking."""
32 s = util.readpipe(pipe)
32 s = util.readpipe(pipe)
33 if s:
33 if s:
34 for l in s.splitlines():
34 for l in s.splitlines():
35 ui.status(_("remote: "), l, '\n')
35 ui.status(_("remote: "), l, '\n')
36
36
37 class doublepipe(object):
37 class doublepipe(object):
38 """Operate a side-channel pipe in addition of a main one
38 """Operate a side-channel pipe in addition of a main one
39
39
40 The side-channel pipe contains server output to be forwarded to the user
40 The side-channel pipe contains server output to be forwarded to the user
41 input. The double pipe will behave as the "main" pipe, but will ensure the
41 input. The double pipe will behave as the "main" pipe, but will ensure the
42 content of the "side" pipe is properly processed while we wait for blocking
42 content of the "side" pipe is properly processed while we wait for blocking
43 call on the "main" pipe.
43 call on the "main" pipe.
44
44
45 If large amounts of data are read from "main", the forward will cease after
45 If large amounts of data are read from "main", the forward will cease after
46 the first bytes start to appear. This simplifies the implementation
46 the first bytes start to appear. This simplifies the implementation
47 without affecting actual output of sshpeer too much as we rarely issue
47 without affecting actual output of sshpeer too much as we rarely issue
48 large read for data not yet emitted by the server.
48 large read for data not yet emitted by the server.
49
49
50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
51 that handle all the os specific bits. This class lives in this module
51 that handle all the os specific bits. This class lives in this module
52 because it focus on behavior specific to the ssh protocol."""
52 because it focus on behavior specific to the ssh protocol."""
53
53
54 def __init__(self, ui, main, side):
54 def __init__(self, ui, main, side):
55 self._ui = ui
55 self._ui = ui
56 self._main = main
56 self._main = main
57 self._side = side
57 self._side = side
58
58
59 def _wait(self):
59 def _wait(self):
60 """wait until some data are available on main or side
60 """wait until some data are available on main or side
61
61
62 return a pair of boolean (ismainready, issideready)
62 return a pair of boolean (ismainready, issideready)
63
63
64 (This will only wait for data if the setup is supported by `util.poll`)
64 (This will only wait for data if the setup is supported by `util.poll`)
65 """
65 """
66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
67 return (True, True) # main has data, assume side is worth poking at.
67 return (True, True) # main has data, assume side is worth poking at.
68 fds = [self._main.fileno(), self._side.fileno()]
68 fds = [self._main.fileno(), self._side.fileno()]
69 try:
69 try:
70 act = util.poll(fds)
70 act = util.poll(fds)
71 except NotImplementedError:
71 except NotImplementedError:
72 # non supported yet case, assume all have data.
72 # non supported yet case, assume all have data.
73 act = fds
73 act = fds
74 return (self._main.fileno() in act, self._side.fileno() in act)
74 return (self._main.fileno() in act, self._side.fileno() in act)
75
75
76 def write(self, data):
76 def write(self, data):
77 return self._call('write', data)
77 return self._call('write', data)
78
78
79 def read(self, size):
79 def read(self, size):
80 r = self._call('read', size)
80 r = self._call('read', size)
81 if size != 0 and not r:
81 if size != 0 and not r:
82 # We've observed a condition that indicates the
82 # We've observed a condition that indicates the
83 # stdout closed unexpectedly. Check stderr one
83 # stdout closed unexpectedly. Check stderr one
84 # more time and snag anything that's there before
84 # more time and snag anything that's there before
85 # letting anyone know the main part of the pipe
85 # letting anyone know the main part of the pipe
86 # closed prematurely.
86 # closed prematurely.
87 _forwardoutput(self._ui, self._side)
87 _forwardoutput(self._ui, self._side)
88 return r
88 return r
89
89
90 def readline(self):
90 def readline(self):
91 return self._call('readline')
91 return self._call('readline')
92
92
93 def _call(self, methname, data=None):
93 def _call(self, methname, data=None):
94 """call <methname> on "main", forward output of "side" while blocking
94 """call <methname> on "main", forward output of "side" while blocking
95 """
95 """
96 # data can be '' or 0
96 # data can be '' or 0
97 if (data is not None and not data) or self._main.closed:
97 if (data is not None and not data) or self._main.closed:
98 _forwardoutput(self._ui, self._side)
98 _forwardoutput(self._ui, self._side)
99 return ''
99 return ''
100 while True:
100 while True:
101 mainready, sideready = self._wait()
101 mainready, sideready = self._wait()
102 if sideready:
102 if sideready:
103 _forwardoutput(self._ui, self._side)
103 _forwardoutput(self._ui, self._side)
104 if mainready:
104 if mainready:
105 meth = getattr(self._main, methname)
105 meth = getattr(self._main, methname)
106 if data is None:
106 if data is None:
107 return meth()
107 return meth()
108 else:
108 else:
109 return meth(data)
109 return meth(data)
110
110
111 def close(self):
111 def close(self):
112 return self._main.close()
112 return self._main.close()
113
113
114 def flush(self):
114 def flush(self):
115 return self._main.flush()
115 return self._main.flush()
116
116
117 class sshpeer(wireproto.wirepeer):
117 class sshpeer(wireproto.wirepeer):
118 def __init__(self, ui, path, create=False):
118 def __init__(self, ui, path, create=False):
119 self._url = path
119 self._url = path
120 self._ui = ui
120 self._ui = ui
121 self._pipeo = self._pipei = self._pipee = None
121 self._pipeo = self._pipei = self._pipee = None
122
122
123 u = util.url(path, parsequery=False, parsefragment=False)
123 u = util.url(path, parsequery=False, parsefragment=False)
124 if u.scheme != 'ssh' or not u.host or u.path is None:
124 if u.scheme != 'ssh' or not u.host or u.path is None:
125 self._abort(error.RepoError(_("couldn't parse location %s") % path))
125 self._abort(error.RepoError(_("couldn't parse location %s") % path))
126
126
127 util.checksafessh(path)
127 util.checksafessh(path)
128
128
129 if u.passwd is not None:
129 if u.passwd is not None:
130 self._abort(error.RepoError(_("password in URL not supported")))
130 self._abort(error.RepoError(_("password in URL not supported")))
131
131
132 self._user = u.user
132 self._user = u.user
133 self._host = u.host
133 self._host = u.host
134 self._port = u.port
134 self._port = u.port
135 self._path = u.path or '.'
135 self._path = u.path or '.'
136
136
137 sshcmd = self.ui.config("ui", "ssh")
137 sshcmd = self.ui.config("ui", "ssh")
138 remotecmd = self.ui.config("ui", "remotecmd")
138 remotecmd = self.ui.config("ui", "remotecmd")
139 sshaddenv = dict(self.ui.configitems("sshenv"))
139 sshaddenv = dict(self.ui.configitems("sshenv"))
140 sshenv = util.shellenviron(sshaddenv)
140 sshenv = util.shellenviron(sshaddenv)
141
141
142 args = util.sshargs(sshcmd, self._host, self._user, self._port)
142 args = util.sshargs(sshcmd, self._host, self._user, self._port)
143
143
144 if create:
144 if create:
145 cmd = '%s %s %s' % (sshcmd, args,
145 cmd = '%s %s %s' % (sshcmd, args,
146 util.shellquote("%s init %s" %
146 util.shellquote("%s init %s" %
147 (_serverquote(remotecmd), _serverquote(self._path))))
147 (_serverquote(remotecmd), _serverquote(self._path))))
148 ui.debug('running %s\n' % cmd)
148 ui.debug('running %s\n' % cmd)
149 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
149 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
150 if res != 0:
150 if res != 0:
151 self._abort(error.RepoError(_("could not create remote repo")))
151 self._abort(error.RepoError(_("could not create remote repo")))
152
152
153 self._validaterepo(sshcmd, args, remotecmd, sshenv)
153 self._validaterepo(sshcmd, args, remotecmd, sshenv)
154
154
155 # Begin of _basepeer interface.
155 # Begin of _basepeer interface.
156
156
157 @util.propertycache
157 @util.propertycache
158 def ui(self):
158 def ui(self):
159 return self._ui
159 return self._ui
160
160
161 def url(self):
161 def url(self):
162 return self._url
162 return self._url
163
163
164 def local(self):
164 def local(self):
165 return None
165 return None
166
166
167 def peer(self):
167 def peer(self):
168 return self
168 return self
169
169
170 def canpush(self):
170 def canpush(self):
171 return True
171 return True
172
172
173 def close(self):
173 def close(self):
174 pass
174 pass
175
175
176 # End of _basepeer interface.
176 # End of _basepeer interface.
177
177
178 # Begin of _basewirecommands interface.
178 # Begin of _basewirecommands interface.
179
179
180 def capabilities(self):
180 def capabilities(self):
181 return self._caps
181 return self._caps
182
182
183 # End of _basewirecommands interface.
183 # End of _basewirecommands interface.
184
184
185 def _validaterepo(self, sshcmd, args, remotecmd, sshenv=None):
185 def _validaterepo(self, sshcmd, args, remotecmd, sshenv=None):
186 # cleanup up previous run
186 # cleanup up previous run
187 self._cleanup()
187 self._cleanup()
188
188
189 cmd = '%s %s %s' % (sshcmd, args,
189 cmd = '%s %s %s' % (sshcmd, args,
190 util.shellquote("%s -R %s serve --stdio" %
190 util.shellquote("%s -R %s serve --stdio" %
191 (_serverquote(remotecmd), _serverquote(self._path))))
191 (_serverquote(remotecmd), _serverquote(self._path))))
192 self.ui.debug('running %s\n' % cmd)
192 self.ui.debug('running %s\n' % cmd)
193 cmd = util.quotecommand(cmd)
193 cmd = util.quotecommand(cmd)
194
194
195 # while self._subprocess isn't used, having it allows the subprocess to
195 # while self._subprocess isn't used, having it allows the subprocess to
196 # to clean up correctly later
196 # to clean up correctly later
197 #
197 #
198 # no buffer allow the use of 'select'
198 # no buffer allow the use of 'select'
199 # feel free to remove buffering and select usage when we ultimately
199 # feel free to remove buffering and select usage when we ultimately
200 # move to threading.
200 # move to threading.
201 sub = util.popen4(cmd, bufsize=0, env=sshenv)
201 sub = util.popen4(cmd, bufsize=0, env=sshenv)
202 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
202 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
203
203
204 self._pipei = util.bufferedinputpipe(self._pipei)
204 self._pipei = util.bufferedinputpipe(self._pipei)
205 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
205 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
206 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
206 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
207
207
208 def badresponse():
208 def badresponse():
209 msg = _("no suitable response from remote hg")
209 msg = _("no suitable response from remote hg")
210 hint = self.ui.config("ui", "ssherrorhint")
210 hint = self.ui.config("ui", "ssherrorhint")
211 self._abort(error.RepoError(msg, hint=hint))
211 self._abort(error.RepoError(msg, hint=hint))
212
212
213 try:
213 try:
214 # skip any noise generated by remote shell
214 # skip any noise generated by remote shell
215 self._callstream("hello")
215 self._callstream("hello")
216 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
216 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
217 except IOError:
217 except IOError:
218 badresponse()
218 badresponse()
219
219
220 lines = ["", "dummy"]
220 lines = ["", "dummy"]
221 max_noise = 500
221 max_noise = 500
222 while lines[-1] and max_noise:
222 while lines[-1] and max_noise:
223 try:
223 try:
224 l = r.readline()
224 l = r.readline()
225 self._readerr()
225 self._readerr()
226 if lines[-1] == "1\n" and l == "\n":
226 if lines[-1] == "1\n" and l == "\n":
227 break
227 break
228 if l:
228 if l:
229 self.ui.debug("remote: ", l)
229 self.ui.debug("remote: ", l)
230 lines.append(l)
230 lines.append(l)
231 max_noise -= 1
231 max_noise -= 1
232 except IOError:
232 except IOError:
233 badresponse()
233 badresponse()
234 else:
234 else:
235 badresponse()
235 badresponse()
236
236
237 self._caps = set()
237 self._caps = set()
238 for l in reversed(lines):
238 for l in reversed(lines):
239 if l.startswith("capabilities:"):
239 if l.startswith("capabilities:"):
240 self._caps.update(l[:-1].split(":")[1].split())
240 self._caps.update(l[:-1].split(":")[1].split())
241 break
241 break
242
242
243 def _readerr(self):
243 def _readerr(self):
244 _forwardoutput(self.ui, self._pipee)
244 _forwardoutput(self.ui, self._pipee)
245
245
246 def _abort(self, exception):
246 def _abort(self, exception):
247 self._cleanup()
247 self._cleanup()
248 raise exception
248 raise exception
249
249
250 def _cleanup(self):
250 def _cleanup(self):
251 if self._pipeo is None:
251 if self._pipeo is None:
252 return
252 return
253 self._pipeo.close()
253 self._pipeo.close()
254 self._pipei.close()
254 self._pipei.close()
255 try:
255 try:
256 # read the error descriptor until EOF
256 # read the error descriptor until EOF
257 for l in self._pipee:
257 for l in self._pipee:
258 self.ui.status(_("remote: "), l)
258 self.ui.status(_("remote: "), l)
259 except (IOError, ValueError):
259 except (IOError, ValueError):
260 pass
260 pass
261 self._pipee.close()
261 self._pipee.close()
262
262
263 __del__ = _cleanup
263 __del__ = _cleanup
264
264
265 def _submitbatch(self, req):
265 def _submitbatch(self, req):
266 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
266 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
267 available = self._getamount()
267 available = self._getamount()
268 # TODO this response parsing is probably suboptimal for large
268 # TODO this response parsing is probably suboptimal for large
269 # batches with large responses.
269 # batches with large responses.
270 toread = min(available, 1024)
270 toread = min(available, 1024)
271 work = rsp.read(toread)
271 work = rsp.read(toread)
272 available -= toread
272 available -= toread
273 chunk = work
273 chunk = work
274 while chunk:
274 while chunk:
275 while ';' in work:
275 while ';' in work:
276 one, work = work.split(';', 1)
276 one, work = work.split(';', 1)
277 yield wireproto.unescapearg(one)
277 yield wireproto.unescapearg(one)
278 toread = min(available, 1024)
278 toread = min(available, 1024)
279 chunk = rsp.read(toread)
279 chunk = rsp.read(toread)
280 available -= toread
280 available -= toread
281 work += chunk
281 work += chunk
282 yield wireproto.unescapearg(work)
282 yield wireproto.unescapearg(work)
283
283
284 def _callstream(self, cmd, **args):
284 def _callstream(self, cmd, **args):
285 args = pycompat.byteskwargs(args)
285 args = pycompat.byteskwargs(args)
286 self.ui.debug("sending %s command\n" % cmd)
286 self.ui.debug("sending %s command\n" % cmd)
287 self._pipeo.write("%s\n" % cmd)
287 self._pipeo.write("%s\n" % cmd)
288 _func, names = wireproto.commands[cmd]
288 _func, names = wireproto.commands[cmd]
289 keys = names.split()
289 keys = names.split()
290 wireargs = {}
290 wireargs = {}
291 for k in keys:
291 for k in keys:
292 if k == '*':
292 if k == '*':
293 wireargs['*'] = args
293 wireargs['*'] = args
294 break
294 break
295 else:
295 else:
296 wireargs[k] = args[k]
296 wireargs[k] = args[k]
297 del args[k]
297 del args[k]
298 for k, v in sorted(wireargs.iteritems()):
298 for k, v in sorted(wireargs.iteritems()):
299 self._pipeo.write("%s %d\n" % (k, len(v)))
299 self._pipeo.write("%s %d\n" % (k, len(v)))
300 if isinstance(v, dict):
300 if isinstance(v, dict):
301 for dk, dv in v.iteritems():
301 for dk, dv in v.iteritems():
302 self._pipeo.write("%s %d\n" % (dk, len(dv)))
302 self._pipeo.write("%s %d\n" % (dk, len(dv)))
303 self._pipeo.write(dv)
303 self._pipeo.write(dv)
304 else:
304 else:
305 self._pipeo.write(v)
305 self._pipeo.write(v)
306 self._pipeo.flush()
306 self._pipeo.flush()
307
307
308 return self._pipei
308 return self._pipei
309
309
310 def _callcompressable(self, cmd, **args):
310 def _callcompressable(self, cmd, **args):
311 return self._callstream(cmd, **args)
311 return self._callstream(cmd, **args)
312
312
313 def _call(self, cmd, **args):
313 def _call(self, cmd, **args):
314 self._callstream(cmd, **args)
314 self._callstream(cmd, **args)
315 return self._recv()
315 return self._recv()
316
316
317 def _callpush(self, cmd, fp, **args):
317 def _callpush(self, cmd, fp, **args):
318 r = self._call(cmd, **args)
318 r = self._call(cmd, **args)
319 if r:
319 if r:
320 return '', r
320 return '', r
321 for d in iter(lambda: fp.read(4096), ''):
321 for d in iter(lambda: fp.read(4096), ''):
322 self._send(d)
322 self._send(d)
323 self._send("", flush=True)
323 self._send("", flush=True)
324 r = self._recv()
324 r = self._recv()
325 if r:
325 if r:
326 return '', r
326 return '', r
327 return self._recv(), ''
327 return self._recv(), ''
328
328
329 def _calltwowaystream(self, cmd, fp, **args):
329 def _calltwowaystream(self, cmd, fp, **args):
330 r = self._call(cmd, **args)
330 r = self._call(cmd, **args)
331 if r:
331 if r:
332 # XXX needs to be made better
332 # XXX needs to be made better
333 raise error.Abort(_('unexpected remote reply: %s') % r)
333 raise error.Abort(_('unexpected remote reply: %s') % r)
334 for d in iter(lambda: fp.read(4096), ''):
334 for d in iter(lambda: fp.read(4096), ''):
335 self._send(d)
335 self._send(d)
336 self._send("", flush=True)
336 self._send("", flush=True)
337 return self._pipei
337 return self._pipei
338
338
339 def _getamount(self):
339 def _getamount(self):
340 l = self._pipei.readline()
340 l = self._pipei.readline()
341 if l == '\n':
341 if l == '\n':
342 self._readerr()
342 self._readerr()
343 msg = _('check previous remote output')
343 msg = _('check previous remote output')
344 self._abort(error.OutOfBandError(hint=msg))
344 self._abort(error.OutOfBandError(hint=msg))
345 self._readerr()
345 self._readerr()
346 try:
346 try:
347 return int(l)
347 return int(l)
348 except ValueError:
348 except ValueError:
349 self._abort(error.ResponseError(_("unexpected response:"), l))
349 self._abort(error.ResponseError(_("unexpected response:"), l))
350
350
351 def _recv(self):
351 def _recv(self):
352 return self._pipei.read(self._getamount())
352 return self._pipei.read(self._getamount())
353
353
354 def _send(self, data, flush=False):
354 def _send(self, data, flush=False):
355 self._pipeo.write("%d\n" % len(data))
355 self._pipeo.write("%d\n" % len(data))
356 if data:
356 if data:
357 self._pipeo.write(data)
357 self._pipeo.write(data)
358 if flush:
358 if flush:
359 self._pipeo.flush()
359 self._pipeo.flush()
360 self._readerr()
360 self._readerr()
361
361
362 instance = sshpeer
362 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now