##// END OF EJS Templates
sshpeer: move URL validation out of sshpeer.__init__...
Gregory Szorc -
r35949:b202d360 default
parent child Browse files
Show More
@@ -1,374 +1,380 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import re
10 import re
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 error,
14 error,
15 pycompat,
15 pycompat,
16 util,
16 util,
17 wireproto,
17 wireproto,
18 )
18 )
19
19
20 def _serverquote(s):
20 def _serverquote(s):
21 """quote a string for the remote shell ... which we assume is sh"""
21 """quote a string for the remote shell ... which we assume is sh"""
22 if not s:
22 if not s:
23 return s
23 return s
24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
25 return s
25 return s
26 return "'%s'" % s.replace("'", "'\\''")
26 return "'%s'" % s.replace("'", "'\\''")
27
27
28 def _forwardoutput(ui, pipe):
28 def _forwardoutput(ui, pipe):
29 """display all data currently available on pipe as remote output.
29 """display all data currently available on pipe as remote output.
30
30
31 This is non blocking."""
31 This is non blocking."""
32 s = util.readpipe(pipe)
32 s = util.readpipe(pipe)
33 if s:
33 if s:
34 for l in s.splitlines():
34 for l in s.splitlines():
35 ui.status(_("remote: "), l, '\n')
35 ui.status(_("remote: "), l, '\n')
36
36
37 class doublepipe(object):
37 class doublepipe(object):
38 """Operate a side-channel pipe in addition of a main one
38 """Operate a side-channel pipe in addition of a main one
39
39
40 The side-channel pipe contains server output to be forwarded to the user
40 The side-channel pipe contains server output to be forwarded to the user
41 input. The double pipe will behave as the "main" pipe, but will ensure the
41 input. The double pipe will behave as the "main" pipe, but will ensure the
42 content of the "side" pipe is properly processed while we wait for blocking
42 content of the "side" pipe is properly processed while we wait for blocking
43 call on the "main" pipe.
43 call on the "main" pipe.
44
44
45 If large amounts of data are read from "main", the forward will cease after
45 If large amounts of data are read from "main", the forward will cease after
46 the first bytes start to appear. This simplifies the implementation
46 the first bytes start to appear. This simplifies the implementation
47 without affecting actual output of sshpeer too much as we rarely issue
47 without affecting actual output of sshpeer too much as we rarely issue
48 large read for data not yet emitted by the server.
48 large read for data not yet emitted by the server.
49
49
50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
51 that handle all the os specific bits. This class lives in this module
51 that handle all the os specific bits. This class lives in this module
52 because it focus on behavior specific to the ssh protocol."""
52 because it focus on behavior specific to the ssh protocol."""
53
53
54 def __init__(self, ui, main, side):
54 def __init__(self, ui, main, side):
55 self._ui = ui
55 self._ui = ui
56 self._main = main
56 self._main = main
57 self._side = side
57 self._side = side
58
58
59 def _wait(self):
59 def _wait(self):
60 """wait until some data are available on main or side
60 """wait until some data are available on main or side
61
61
62 return a pair of boolean (ismainready, issideready)
62 return a pair of boolean (ismainready, issideready)
63
63
64 (This will only wait for data if the setup is supported by `util.poll`)
64 (This will only wait for data if the setup is supported by `util.poll`)
65 """
65 """
66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
67 return (True, True) # main has data, assume side is worth poking at.
67 return (True, True) # main has data, assume side is worth poking at.
68 fds = [self._main.fileno(), self._side.fileno()]
68 fds = [self._main.fileno(), self._side.fileno()]
69 try:
69 try:
70 act = util.poll(fds)
70 act = util.poll(fds)
71 except NotImplementedError:
71 except NotImplementedError:
72 # non supported yet case, assume all have data.
72 # non supported yet case, assume all have data.
73 act = fds
73 act = fds
74 return (self._main.fileno() in act, self._side.fileno() in act)
74 return (self._main.fileno() in act, self._side.fileno() in act)
75
75
76 def write(self, data):
76 def write(self, data):
77 return self._call('write', data)
77 return self._call('write', data)
78
78
79 def read(self, size):
79 def read(self, size):
80 r = self._call('read', size)
80 r = self._call('read', size)
81 if size != 0 and not r:
81 if size != 0 and not r:
82 # We've observed a condition that indicates the
82 # We've observed a condition that indicates the
83 # stdout closed unexpectedly. Check stderr one
83 # stdout closed unexpectedly. Check stderr one
84 # more time and snag anything that's there before
84 # more time and snag anything that's there before
85 # letting anyone know the main part of the pipe
85 # letting anyone know the main part of the pipe
86 # closed prematurely.
86 # closed prematurely.
87 _forwardoutput(self._ui, self._side)
87 _forwardoutput(self._ui, self._side)
88 return r
88 return r
89
89
90 def readline(self):
90 def readline(self):
91 return self._call('readline')
91 return self._call('readline')
92
92
93 def _call(self, methname, data=None):
93 def _call(self, methname, data=None):
94 """call <methname> on "main", forward output of "side" while blocking
94 """call <methname> on "main", forward output of "side" while blocking
95 """
95 """
96 # data can be '' or 0
96 # data can be '' or 0
97 if (data is not None and not data) or self._main.closed:
97 if (data is not None and not data) or self._main.closed:
98 _forwardoutput(self._ui, self._side)
98 _forwardoutput(self._ui, self._side)
99 return ''
99 return ''
100 while True:
100 while True:
101 mainready, sideready = self._wait()
101 mainready, sideready = self._wait()
102 if sideready:
102 if sideready:
103 _forwardoutput(self._ui, self._side)
103 _forwardoutput(self._ui, self._side)
104 if mainready:
104 if mainready:
105 meth = getattr(self._main, methname)
105 meth = getattr(self._main, methname)
106 if data is None:
106 if data is None:
107 return meth()
107 return meth()
108 else:
108 else:
109 return meth(data)
109 return meth(data)
110
110
111 def close(self):
111 def close(self):
112 return self._main.close()
112 return self._main.close()
113
113
114 def flush(self):
114 def flush(self):
115 return self._main.flush()
115 return self._main.flush()
116
116
117 class sshpeer(wireproto.wirepeer):
117 class sshpeer(wireproto.wirepeer):
118 def __init__(self, ui, path, create=False):
118 def __init__(self, ui, path, create=False):
119 self._url = path
119 self._url = path
120 self._ui = ui
120 self._ui = ui
121 self._pipeo = self._pipei = self._pipee = None
121 self._pipeo = self._pipei = self._pipee = None
122
122
123 u = util.url(path, parsequery=False, parsefragment=False)
123 u = util.url(path, parsequery=False, parsefragment=False)
124 if u.scheme != 'ssh' or not u.host or u.path is None:
125 self._abort(error.RepoError(_("couldn't parse location %s") % path))
126
127 util.checksafessh(path)
128
129 if u.passwd is not None:
130 self._abort(error.RepoError(_("password in URL not supported")))
131
124
132 self._user = u.user
125 self._user = u.user
133 self._host = u.host
126 self._host = u.host
134 self._port = u.port
127 self._port = u.port
135 self._path = u.path or '.'
128 self._path = u.path or '.'
136
129
137 sshcmd = self.ui.config("ui", "ssh")
130 sshcmd = self.ui.config("ui", "ssh")
138 remotecmd = self.ui.config("ui", "remotecmd")
131 remotecmd = self.ui.config("ui", "remotecmd")
139 sshaddenv = dict(self.ui.configitems("sshenv"))
132 sshaddenv = dict(self.ui.configitems("sshenv"))
140 sshenv = util.shellenviron(sshaddenv)
133 sshenv = util.shellenviron(sshaddenv)
141
134
142 args = util.sshargs(sshcmd, self._host, self._user, self._port)
135 args = util.sshargs(sshcmd, self._host, self._user, self._port)
143
136
144 if create:
137 if create:
145 cmd = '%s %s %s' % (sshcmd, args,
138 cmd = '%s %s %s' % (sshcmd, args,
146 util.shellquote("%s init %s" %
139 util.shellquote("%s init %s" %
147 (_serverquote(remotecmd), _serverquote(self._path))))
140 (_serverquote(remotecmd), _serverquote(self._path))))
148 ui.debug('running %s\n' % cmd)
141 ui.debug('running %s\n' % cmd)
149 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
142 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
150 if res != 0:
143 if res != 0:
151 self._abort(error.RepoError(_("could not create remote repo")))
144 self._abort(error.RepoError(_("could not create remote repo")))
152
145
153 self._validaterepo(sshcmd, args, remotecmd, sshenv)
146 self._validaterepo(sshcmd, args, remotecmd, sshenv)
154
147
155 # Begin of _basepeer interface.
148 # Begin of _basepeer interface.
156
149
157 @util.propertycache
150 @util.propertycache
158 def ui(self):
151 def ui(self):
159 return self._ui
152 return self._ui
160
153
161 def url(self):
154 def url(self):
162 return self._url
155 return self._url
163
156
164 def local(self):
157 def local(self):
165 return None
158 return None
166
159
167 def peer(self):
160 def peer(self):
168 return self
161 return self
169
162
170 def canpush(self):
163 def canpush(self):
171 return True
164 return True
172
165
173 def close(self):
166 def close(self):
174 pass
167 pass
175
168
176 # End of _basepeer interface.
169 # End of _basepeer interface.
177
170
178 # Begin of _basewirecommands interface.
171 # Begin of _basewirecommands interface.
179
172
180 def capabilities(self):
173 def capabilities(self):
181 return self._caps
174 return self._caps
182
175
183 # End of _basewirecommands interface.
176 # End of _basewirecommands interface.
184
177
185 def _validaterepo(self, sshcmd, args, remotecmd, sshenv=None):
178 def _validaterepo(self, sshcmd, args, remotecmd, sshenv=None):
186 # cleanup up previous run
179 # cleanup up previous run
187 self._cleanup()
180 self._cleanup()
188
181
189 cmd = '%s %s %s' % (sshcmd, args,
182 cmd = '%s %s %s' % (sshcmd, args,
190 util.shellquote("%s -R %s serve --stdio" %
183 util.shellquote("%s -R %s serve --stdio" %
191 (_serverquote(remotecmd), _serverquote(self._path))))
184 (_serverquote(remotecmd), _serverquote(self._path))))
192 self.ui.debug('running %s\n' % cmd)
185 self.ui.debug('running %s\n' % cmd)
193 cmd = util.quotecommand(cmd)
186 cmd = util.quotecommand(cmd)
194
187
195 # while self._subprocess isn't used, having it allows the subprocess to
188 # while self._subprocess isn't used, having it allows the subprocess to
196 # to clean up correctly later
189 # to clean up correctly later
197 #
190 #
198 # no buffer allow the use of 'select'
191 # no buffer allow the use of 'select'
199 # feel free to remove buffering and select usage when we ultimately
192 # feel free to remove buffering and select usage when we ultimately
200 # move to threading.
193 # move to threading.
201 sub = util.popen4(cmd, bufsize=0, env=sshenv)
194 sub = util.popen4(cmd, bufsize=0, env=sshenv)
202 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
195 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
203
196
204 self._pipei = util.bufferedinputpipe(self._pipei)
197 self._pipei = util.bufferedinputpipe(self._pipei)
205 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
198 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
206 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
199 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
207
200
208 def badresponse():
201 def badresponse():
209 msg = _("no suitable response from remote hg")
202 msg = _("no suitable response from remote hg")
210 hint = self.ui.config("ui", "ssherrorhint")
203 hint = self.ui.config("ui", "ssherrorhint")
211 self._abort(error.RepoError(msg, hint=hint))
204 self._abort(error.RepoError(msg, hint=hint))
212
205
213 try:
206 try:
214 # skip any noise generated by remote shell
207 # skip any noise generated by remote shell
215 self._callstream("hello")
208 self._callstream("hello")
216 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
209 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
217 except IOError:
210 except IOError:
218 badresponse()
211 badresponse()
219
212
220 lines = ["", "dummy"]
213 lines = ["", "dummy"]
221 max_noise = 500
214 max_noise = 500
222 while lines[-1] and max_noise:
215 while lines[-1] and max_noise:
223 try:
216 try:
224 l = r.readline()
217 l = r.readline()
225 self._readerr()
218 self._readerr()
226 if lines[-1] == "1\n" and l == "\n":
219 if lines[-1] == "1\n" and l == "\n":
227 break
220 break
228 if l:
221 if l:
229 self.ui.debug("remote: ", l)
222 self.ui.debug("remote: ", l)
230 lines.append(l)
223 lines.append(l)
231 max_noise -= 1
224 max_noise -= 1
232 except IOError:
225 except IOError:
233 badresponse()
226 badresponse()
234 else:
227 else:
235 badresponse()
228 badresponse()
236
229
237 self._caps = set()
230 self._caps = set()
238 for l in reversed(lines):
231 for l in reversed(lines):
239 if l.startswith("capabilities:"):
232 if l.startswith("capabilities:"):
240 self._caps.update(l[:-1].split(":")[1].split())
233 self._caps.update(l[:-1].split(":")[1].split())
241 break
234 break
242
235
243 def _readerr(self):
236 def _readerr(self):
244 _forwardoutput(self.ui, self._pipee)
237 _forwardoutput(self.ui, self._pipee)
245
238
246 def _abort(self, exception):
239 def _abort(self, exception):
247 self._cleanup()
240 self._cleanup()
248 raise exception
241 raise exception
249
242
250 def _cleanup(self):
243 def _cleanup(self):
251 if self._pipeo is None:
244 if self._pipeo is None:
252 return
245 return
253 self._pipeo.close()
246 self._pipeo.close()
254 self._pipei.close()
247 self._pipei.close()
255 try:
248 try:
256 # read the error descriptor until EOF
249 # read the error descriptor until EOF
257 for l in self._pipee:
250 for l in self._pipee:
258 self.ui.status(_("remote: "), l)
251 self.ui.status(_("remote: "), l)
259 except (IOError, ValueError):
252 except (IOError, ValueError):
260 pass
253 pass
261 self._pipee.close()
254 self._pipee.close()
262
255
263 __del__ = _cleanup
256 __del__ = _cleanup
264
257
265 def _submitbatch(self, req):
258 def _submitbatch(self, req):
266 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
259 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
267 available = self._getamount()
260 available = self._getamount()
268 # TODO this response parsing is probably suboptimal for large
261 # TODO this response parsing is probably suboptimal for large
269 # batches with large responses.
262 # batches with large responses.
270 toread = min(available, 1024)
263 toread = min(available, 1024)
271 work = rsp.read(toread)
264 work = rsp.read(toread)
272 available -= toread
265 available -= toread
273 chunk = work
266 chunk = work
274 while chunk:
267 while chunk:
275 while ';' in work:
268 while ';' in work:
276 one, work = work.split(';', 1)
269 one, work = work.split(';', 1)
277 yield wireproto.unescapearg(one)
270 yield wireproto.unescapearg(one)
278 toread = min(available, 1024)
271 toread = min(available, 1024)
279 chunk = rsp.read(toread)
272 chunk = rsp.read(toread)
280 available -= toread
273 available -= toread
281 work += chunk
274 work += chunk
282 yield wireproto.unescapearg(work)
275 yield wireproto.unescapearg(work)
283
276
284 def _callstream(self, cmd, **args):
277 def _callstream(self, cmd, **args):
285 args = pycompat.byteskwargs(args)
278 args = pycompat.byteskwargs(args)
286 if (self.ui.debugflag
279 if (self.ui.debugflag
287 and self.ui.configbool('devel', 'debug.peer-request')):
280 and self.ui.configbool('devel', 'debug.peer-request')):
288 dbg = self.ui.debug
281 dbg = self.ui.debug
289 line = 'devel-peer-request: %s\n'
282 line = 'devel-peer-request: %s\n'
290 dbg(line % cmd)
283 dbg(line % cmd)
291 for key, value in sorted(args.items()):
284 for key, value in sorted(args.items()):
292 if not isinstance(value, dict):
285 if not isinstance(value, dict):
293 dbg(line % ' %s: %d bytes' % (key, len(value)))
286 dbg(line % ' %s: %d bytes' % (key, len(value)))
294 else:
287 else:
295 for dk, dv in sorted(value.items()):
288 for dk, dv in sorted(value.items()):
296 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
289 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
297 self.ui.debug("sending %s command\n" % cmd)
290 self.ui.debug("sending %s command\n" % cmd)
298 self._pipeo.write("%s\n" % cmd)
291 self._pipeo.write("%s\n" % cmd)
299 _func, names = wireproto.commands[cmd]
292 _func, names = wireproto.commands[cmd]
300 keys = names.split()
293 keys = names.split()
301 wireargs = {}
294 wireargs = {}
302 for k in keys:
295 for k in keys:
303 if k == '*':
296 if k == '*':
304 wireargs['*'] = args
297 wireargs['*'] = args
305 break
298 break
306 else:
299 else:
307 wireargs[k] = args[k]
300 wireargs[k] = args[k]
308 del args[k]
301 del args[k]
309 for k, v in sorted(wireargs.iteritems()):
302 for k, v in sorted(wireargs.iteritems()):
310 self._pipeo.write("%s %d\n" % (k, len(v)))
303 self._pipeo.write("%s %d\n" % (k, len(v)))
311 if isinstance(v, dict):
304 if isinstance(v, dict):
312 for dk, dv in v.iteritems():
305 for dk, dv in v.iteritems():
313 self._pipeo.write("%s %d\n" % (dk, len(dv)))
306 self._pipeo.write("%s %d\n" % (dk, len(dv)))
314 self._pipeo.write(dv)
307 self._pipeo.write(dv)
315 else:
308 else:
316 self._pipeo.write(v)
309 self._pipeo.write(v)
317 self._pipeo.flush()
310 self._pipeo.flush()
318
311
319 return self._pipei
312 return self._pipei
320
313
321 def _callcompressable(self, cmd, **args):
314 def _callcompressable(self, cmd, **args):
322 return self._callstream(cmd, **args)
315 return self._callstream(cmd, **args)
323
316
324 def _call(self, cmd, **args):
317 def _call(self, cmd, **args):
325 self._callstream(cmd, **args)
318 self._callstream(cmd, **args)
326 return self._recv()
319 return self._recv()
327
320
328 def _callpush(self, cmd, fp, **args):
321 def _callpush(self, cmd, fp, **args):
329 r = self._call(cmd, **args)
322 r = self._call(cmd, **args)
330 if r:
323 if r:
331 return '', r
324 return '', r
332 for d in iter(lambda: fp.read(4096), ''):
325 for d in iter(lambda: fp.read(4096), ''):
333 self._send(d)
326 self._send(d)
334 self._send("", flush=True)
327 self._send("", flush=True)
335 r = self._recv()
328 r = self._recv()
336 if r:
329 if r:
337 return '', r
330 return '', r
338 return self._recv(), ''
331 return self._recv(), ''
339
332
340 def _calltwowaystream(self, cmd, fp, **args):
333 def _calltwowaystream(self, cmd, fp, **args):
341 r = self._call(cmd, **args)
334 r = self._call(cmd, **args)
342 if r:
335 if r:
343 # XXX needs to be made better
336 # XXX needs to be made better
344 raise error.Abort(_('unexpected remote reply: %s') % r)
337 raise error.Abort(_('unexpected remote reply: %s') % r)
345 for d in iter(lambda: fp.read(4096), ''):
338 for d in iter(lambda: fp.read(4096), ''):
346 self._send(d)
339 self._send(d)
347 self._send("", flush=True)
340 self._send("", flush=True)
348 return self._pipei
341 return self._pipei
349
342
350 def _getamount(self):
343 def _getamount(self):
351 l = self._pipei.readline()
344 l = self._pipei.readline()
352 if l == '\n':
345 if l == '\n':
353 self._readerr()
346 self._readerr()
354 msg = _('check previous remote output')
347 msg = _('check previous remote output')
355 self._abort(error.OutOfBandError(hint=msg))
348 self._abort(error.OutOfBandError(hint=msg))
356 self._readerr()
349 self._readerr()
357 try:
350 try:
358 return int(l)
351 return int(l)
359 except ValueError:
352 except ValueError:
360 self._abort(error.ResponseError(_("unexpected response:"), l))
353 self._abort(error.ResponseError(_("unexpected response:"), l))
361
354
362 def _recv(self):
355 def _recv(self):
363 return self._pipei.read(self._getamount())
356 return self._pipei.read(self._getamount())
364
357
365 def _send(self, data, flush=False):
358 def _send(self, data, flush=False):
366 self._pipeo.write("%d\n" % len(data))
359 self._pipeo.write("%d\n" % len(data))
367 if data:
360 if data:
368 self._pipeo.write(data)
361 self._pipeo.write(data)
369 if flush:
362 if flush:
370 self._pipeo.flush()
363 self._pipeo.flush()
371 self._readerr()
364 self._readerr()
372
365
373 def instance(ui, path, create):
366 def instance(ui, path, create):
367 """Create an SSH peer.
368
369 The returned object conforms to the ``wireproto.wirepeer`` interface.
370 """
371 u = util.url(path, parsequery=False, parsefragment=False)
372 if u.scheme != 'ssh' or not u.host or u.path is None:
373 raise error.RepoError(_("couldn't parse location %s") % path)
374
375 util.checksafessh(path)
376
377 if u.passwd is not None:
378 raise error.RepoError(_('password in URL not supported'))
379
374 return sshpeer(ui, path, create=create)
380 return sshpeer(ui, path, create=create)
General Comments 0
You need to be logged in to leave comments. Login now