##// END OF EJS Templates
sshpeer: move URL validation out of sshpeer.__init__...
Gregory Szorc -
r35949:b202d360 default
parent child Browse files
Show More
@@ -1,374 +1,380 b''
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import re
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 error,
15 15 pycompat,
16 16 util,
17 17 wireproto,
18 18 )
19 19
20 20 def _serverquote(s):
21 21 """quote a string for the remote shell ... which we assume is sh"""
22 22 if not s:
23 23 return s
24 24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
25 25 return s
26 26 return "'%s'" % s.replace("'", "'\\''")
27 27
28 28 def _forwardoutput(ui, pipe):
29 29 """display all data currently available on pipe as remote output.
30 30
31 31 This is non blocking."""
32 32 s = util.readpipe(pipe)
33 33 if s:
34 34 for l in s.splitlines():
35 35 ui.status(_("remote: "), l, '\n')
36 36
37 37 class doublepipe(object):
38 38 """Operate a side-channel pipe in addition of a main one
39 39
40 40 The side-channel pipe contains server output to be forwarded to the user
41 41 input. The double pipe will behave as the "main" pipe, but will ensure the
42 42 content of the "side" pipe is properly processed while we wait for blocking
43 43 call on the "main" pipe.
44 44
45 45 If large amounts of data are read from "main", the forward will cease after
46 46 the first bytes start to appear. This simplifies the implementation
47 47 without affecting actual output of sshpeer too much as we rarely issue
48 48 large read for data not yet emitted by the server.
49 49
50 50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
51 51 that handle all the os specific bits. This class lives in this module
52 52 because it focus on behavior specific to the ssh protocol."""
53 53
54 54 def __init__(self, ui, main, side):
55 55 self._ui = ui
56 56 self._main = main
57 57 self._side = side
58 58
59 59 def _wait(self):
60 60 """wait until some data are available on main or side
61 61
62 62 return a pair of boolean (ismainready, issideready)
63 63
64 64 (This will only wait for data if the setup is supported by `util.poll`)
65 65 """
66 66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
67 67 return (True, True) # main has data, assume side is worth poking at.
68 68 fds = [self._main.fileno(), self._side.fileno()]
69 69 try:
70 70 act = util.poll(fds)
71 71 except NotImplementedError:
72 72 # non supported yet case, assume all have data.
73 73 act = fds
74 74 return (self._main.fileno() in act, self._side.fileno() in act)
75 75
76 76 def write(self, data):
77 77 return self._call('write', data)
78 78
79 79 def read(self, size):
80 80 r = self._call('read', size)
81 81 if size != 0 and not r:
82 82 # We've observed a condition that indicates the
83 83 # stdout closed unexpectedly. Check stderr one
84 84 # more time and snag anything that's there before
85 85 # letting anyone know the main part of the pipe
86 86 # closed prematurely.
87 87 _forwardoutput(self._ui, self._side)
88 88 return r
89 89
90 90 def readline(self):
91 91 return self._call('readline')
92 92
93 93 def _call(self, methname, data=None):
94 94 """call <methname> on "main", forward output of "side" while blocking
95 95 """
96 96 # data can be '' or 0
97 97 if (data is not None and not data) or self._main.closed:
98 98 _forwardoutput(self._ui, self._side)
99 99 return ''
100 100 while True:
101 101 mainready, sideready = self._wait()
102 102 if sideready:
103 103 _forwardoutput(self._ui, self._side)
104 104 if mainready:
105 105 meth = getattr(self._main, methname)
106 106 if data is None:
107 107 return meth()
108 108 else:
109 109 return meth(data)
110 110
111 111 def close(self):
112 112 return self._main.close()
113 113
114 114 def flush(self):
115 115 return self._main.flush()
116 116
117 117 class sshpeer(wireproto.wirepeer):
118 118 def __init__(self, ui, path, create=False):
119 119 self._url = path
120 120 self._ui = ui
121 121 self._pipeo = self._pipei = self._pipee = None
122 122
123 123 u = util.url(path, parsequery=False, parsefragment=False)
124 if u.scheme != 'ssh' or not u.host or u.path is None:
125 self._abort(error.RepoError(_("couldn't parse location %s") % path))
126
127 util.checksafessh(path)
128
129 if u.passwd is not None:
130 self._abort(error.RepoError(_("password in URL not supported")))
131 124
132 125 self._user = u.user
133 126 self._host = u.host
134 127 self._port = u.port
135 128 self._path = u.path or '.'
136 129
137 130 sshcmd = self.ui.config("ui", "ssh")
138 131 remotecmd = self.ui.config("ui", "remotecmd")
139 132 sshaddenv = dict(self.ui.configitems("sshenv"))
140 133 sshenv = util.shellenviron(sshaddenv)
141 134
142 135 args = util.sshargs(sshcmd, self._host, self._user, self._port)
143 136
144 137 if create:
145 138 cmd = '%s %s %s' % (sshcmd, args,
146 139 util.shellquote("%s init %s" %
147 140 (_serverquote(remotecmd), _serverquote(self._path))))
148 141 ui.debug('running %s\n' % cmd)
149 142 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
150 143 if res != 0:
151 144 self._abort(error.RepoError(_("could not create remote repo")))
152 145
153 146 self._validaterepo(sshcmd, args, remotecmd, sshenv)
154 147
155 148 # Begin of _basepeer interface.
156 149
157 150 @util.propertycache
158 151 def ui(self):
159 152 return self._ui
160 153
161 154 def url(self):
162 155 return self._url
163 156
164 157 def local(self):
165 158 return None
166 159
167 160 def peer(self):
168 161 return self
169 162
170 163 def canpush(self):
171 164 return True
172 165
173 166 def close(self):
174 167 pass
175 168
176 169 # End of _basepeer interface.
177 170
178 171 # Begin of _basewirecommands interface.
179 172
180 173 def capabilities(self):
181 174 return self._caps
182 175
183 176 # End of _basewirecommands interface.
184 177
185 178 def _validaterepo(self, sshcmd, args, remotecmd, sshenv=None):
186 179 # cleanup up previous run
187 180 self._cleanup()
188 181
189 182 cmd = '%s %s %s' % (sshcmd, args,
190 183 util.shellquote("%s -R %s serve --stdio" %
191 184 (_serverquote(remotecmd), _serverquote(self._path))))
192 185 self.ui.debug('running %s\n' % cmd)
193 186 cmd = util.quotecommand(cmd)
194 187
195 188 # while self._subprocess isn't used, having it allows the subprocess to
196 189 # to clean up correctly later
197 190 #
198 191 # no buffer allow the use of 'select'
199 192 # feel free to remove buffering and select usage when we ultimately
200 193 # move to threading.
201 194 sub = util.popen4(cmd, bufsize=0, env=sshenv)
202 195 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
203 196
204 197 self._pipei = util.bufferedinputpipe(self._pipei)
205 198 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
206 199 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
207 200
208 201 def badresponse():
209 202 msg = _("no suitable response from remote hg")
210 203 hint = self.ui.config("ui", "ssherrorhint")
211 204 self._abort(error.RepoError(msg, hint=hint))
212 205
213 206 try:
214 207 # skip any noise generated by remote shell
215 208 self._callstream("hello")
216 209 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
217 210 except IOError:
218 211 badresponse()
219 212
220 213 lines = ["", "dummy"]
221 214 max_noise = 500
222 215 while lines[-1] and max_noise:
223 216 try:
224 217 l = r.readline()
225 218 self._readerr()
226 219 if lines[-1] == "1\n" and l == "\n":
227 220 break
228 221 if l:
229 222 self.ui.debug("remote: ", l)
230 223 lines.append(l)
231 224 max_noise -= 1
232 225 except IOError:
233 226 badresponse()
234 227 else:
235 228 badresponse()
236 229
237 230 self._caps = set()
238 231 for l in reversed(lines):
239 232 if l.startswith("capabilities:"):
240 233 self._caps.update(l[:-1].split(":")[1].split())
241 234 break
242 235
243 236 def _readerr(self):
244 237 _forwardoutput(self.ui, self._pipee)
245 238
246 239 def _abort(self, exception):
247 240 self._cleanup()
248 241 raise exception
249 242
250 243 def _cleanup(self):
251 244 if self._pipeo is None:
252 245 return
253 246 self._pipeo.close()
254 247 self._pipei.close()
255 248 try:
256 249 # read the error descriptor until EOF
257 250 for l in self._pipee:
258 251 self.ui.status(_("remote: "), l)
259 252 except (IOError, ValueError):
260 253 pass
261 254 self._pipee.close()
262 255
263 256 __del__ = _cleanup
264 257
265 258 def _submitbatch(self, req):
266 259 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
267 260 available = self._getamount()
268 261 # TODO this response parsing is probably suboptimal for large
269 262 # batches with large responses.
270 263 toread = min(available, 1024)
271 264 work = rsp.read(toread)
272 265 available -= toread
273 266 chunk = work
274 267 while chunk:
275 268 while ';' in work:
276 269 one, work = work.split(';', 1)
277 270 yield wireproto.unescapearg(one)
278 271 toread = min(available, 1024)
279 272 chunk = rsp.read(toread)
280 273 available -= toread
281 274 work += chunk
282 275 yield wireproto.unescapearg(work)
283 276
284 277 def _callstream(self, cmd, **args):
285 278 args = pycompat.byteskwargs(args)
286 279 if (self.ui.debugflag
287 280 and self.ui.configbool('devel', 'debug.peer-request')):
288 281 dbg = self.ui.debug
289 282 line = 'devel-peer-request: %s\n'
290 283 dbg(line % cmd)
291 284 for key, value in sorted(args.items()):
292 285 if not isinstance(value, dict):
293 286 dbg(line % ' %s: %d bytes' % (key, len(value)))
294 287 else:
295 288 for dk, dv in sorted(value.items()):
296 289 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
297 290 self.ui.debug("sending %s command\n" % cmd)
298 291 self._pipeo.write("%s\n" % cmd)
299 292 _func, names = wireproto.commands[cmd]
300 293 keys = names.split()
301 294 wireargs = {}
302 295 for k in keys:
303 296 if k == '*':
304 297 wireargs['*'] = args
305 298 break
306 299 else:
307 300 wireargs[k] = args[k]
308 301 del args[k]
309 302 for k, v in sorted(wireargs.iteritems()):
310 303 self._pipeo.write("%s %d\n" % (k, len(v)))
311 304 if isinstance(v, dict):
312 305 for dk, dv in v.iteritems():
313 306 self._pipeo.write("%s %d\n" % (dk, len(dv)))
314 307 self._pipeo.write(dv)
315 308 else:
316 309 self._pipeo.write(v)
317 310 self._pipeo.flush()
318 311
319 312 return self._pipei
320 313
321 314 def _callcompressable(self, cmd, **args):
322 315 return self._callstream(cmd, **args)
323 316
324 317 def _call(self, cmd, **args):
325 318 self._callstream(cmd, **args)
326 319 return self._recv()
327 320
328 321 def _callpush(self, cmd, fp, **args):
329 322 r = self._call(cmd, **args)
330 323 if r:
331 324 return '', r
332 325 for d in iter(lambda: fp.read(4096), ''):
333 326 self._send(d)
334 327 self._send("", flush=True)
335 328 r = self._recv()
336 329 if r:
337 330 return '', r
338 331 return self._recv(), ''
339 332
340 333 def _calltwowaystream(self, cmd, fp, **args):
341 334 r = self._call(cmd, **args)
342 335 if r:
343 336 # XXX needs to be made better
344 337 raise error.Abort(_('unexpected remote reply: %s') % r)
345 338 for d in iter(lambda: fp.read(4096), ''):
346 339 self._send(d)
347 340 self._send("", flush=True)
348 341 return self._pipei
349 342
350 343 def _getamount(self):
351 344 l = self._pipei.readline()
352 345 if l == '\n':
353 346 self._readerr()
354 347 msg = _('check previous remote output')
355 348 self._abort(error.OutOfBandError(hint=msg))
356 349 self._readerr()
357 350 try:
358 351 return int(l)
359 352 except ValueError:
360 353 self._abort(error.ResponseError(_("unexpected response:"), l))
361 354
362 355 def _recv(self):
363 356 return self._pipei.read(self._getamount())
364 357
365 358 def _send(self, data, flush=False):
366 359 self._pipeo.write("%d\n" % len(data))
367 360 if data:
368 361 self._pipeo.write(data)
369 362 if flush:
370 363 self._pipeo.flush()
371 364 self._readerr()
372 365
373 366 def instance(ui, path, create):
367 """Create an SSH peer.
368
369 The returned object conforms to the ``wireproto.wirepeer`` interface.
370 """
371 u = util.url(path, parsequery=False, parsefragment=False)
372 if u.scheme != 'ssh' or not u.host or u.path is None:
373 raise error.RepoError(_("couldn't parse location %s") % path)
374
375 util.checksafessh(path)
376
377 if u.passwd is not None:
378 raise error.RepoError(_('password in URL not supported'))
379
374 380 return sshpeer(ui, path, create=create)
General Comments 0
You need to be logged in to leave comments. Login now