##// END OF EJS Templates
ssh: fix flakey ssh errors on BSD systems...
Durham Goode -
r34107:c037fd65 default
parent child Browse files
Show More
@@ -1,349 +1,359 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import re
10 import re
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 error,
14 error,
15 pycompat,
15 pycompat,
16 util,
16 util,
17 wireproto,
17 wireproto,
18 )
18 )
19
19
20 def _serverquote(s):
20 def _serverquote(s):
21 if not s:
21 if not s:
22 return s
22 return s
23 '''quote a string for the remote shell ... which we assume is sh'''
23 '''quote a string for the remote shell ... which we assume is sh'''
24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
25 return s
25 return s
26 return "'%s'" % s.replace("'", "'\\''")
26 return "'%s'" % s.replace("'", "'\\''")
27
27
28 def _forwardoutput(ui, pipe):
28 def _forwardoutput(ui, pipe):
29 """display all data currently available on pipe as remote output.
29 """display all data currently available on pipe as remote output.
30
30
31 This is non blocking."""
31 This is non blocking."""
32 s = util.readpipe(pipe)
32 s = util.readpipe(pipe)
33 if s:
33 if s:
34 for l in s.splitlines():
34 for l in s.splitlines():
35 ui.status(_("remote: "), l, '\n')
35 ui.status(_("remote: "), l, '\n')
36
36
37 class doublepipe(object):
37 class doublepipe(object):
38 """Operate a side-channel pipe in addition of a main one
38 """Operate a side-channel pipe in addition of a main one
39
39
40 The side-channel pipe contains server output to be forwarded to the user
40 The side-channel pipe contains server output to be forwarded to the user
41 input. The double pipe will behave as the "main" pipe, but will ensure the
41 input. The double pipe will behave as the "main" pipe, but will ensure the
42 content of the "side" pipe is properly processed while we wait for blocking
42 content of the "side" pipe is properly processed while we wait for blocking
43 call on the "main" pipe.
43 call on the "main" pipe.
44
44
45 If large amounts of data are read from "main", the forward will cease after
45 If large amounts of data are read from "main", the forward will cease after
46 the first bytes start to appear. This simplifies the implementation
46 the first bytes start to appear. This simplifies the implementation
47 without affecting actual output of sshpeer too much as we rarely issue
47 without affecting actual output of sshpeer too much as we rarely issue
48 large read for data not yet emitted by the server.
48 large read for data not yet emitted by the server.
49
49
50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
51 that handle all the os specific bits. This class lives in this module
51 that handle all the os specific bits. This class lives in this module
52 because it focus on behavior specific to the ssh protocol."""
52 because it focus on behavior specific to the ssh protocol."""
53
53
54 def __init__(self, ui, main, side):
54 def __init__(self, ui, main, side):
55 self._ui = ui
55 self._ui = ui
56 self._main = main
56 self._main = main
57 self._side = side
57 self._side = side
58
58
59 def _wait(self):
59 def _wait(self):
60 """wait until some data are available on main or side
60 """wait until some data are available on main or side
61
61
62 return a pair of boolean (ismainready, issideready)
62 return a pair of boolean (ismainready, issideready)
63
63
64 (This will only wait for data if the setup is supported by `util.poll`)
64 (This will only wait for data if the setup is supported by `util.poll`)
65 """
65 """
66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
67 return (True, True) # main has data, assume side is worth poking at.
67 return (True, True) # main has data, assume side is worth poking at.
68 fds = [self._main.fileno(), self._side.fileno()]
68 fds = [self._main.fileno(), self._side.fileno()]
69 try:
69 try:
70 act = util.poll(fds)
70 act = util.poll(fds)
71 except NotImplementedError:
71 except NotImplementedError:
72 # non supported yet case, assume all have data.
72 # non supported yet case, assume all have data.
73 act = fds
73 act = fds
74 return (self._main.fileno() in act, self._side.fileno() in act)
74 return (self._main.fileno() in act, self._side.fileno() in act)
75
75
76 def write(self, data):
76 def write(self, data):
77 return self._call('write', data)
77 return self._call('write', data)
78
78
79 def read(self, size):
79 def read(self, size):
80 r = self._call('read', size)
80 r = self._call('read', size)
81 if size != 0 and not r:
81 if size != 0 and not r:
82 # We've observed a condition that indicates the
82 # We've observed a condition that indicates the
83 # stdout closed unexpectedly. Check stderr one
83 # stdout closed unexpectedly. Check stderr one
84 # more time and snag anything that's there before
84 # more time and snag anything that's there before
85 # letting anyone know the main part of the pipe
85 # letting anyone know the main part of the pipe
86 # closed prematurely.
86 # closed prematurely.
87 _forwardoutput(self._ui, self._side)
87 _forwardoutput(self._ui, self._side)
88 return r
88 return r
89
89
90 def readline(self):
90 def readline(self):
91 return self._call('readline')
91 return self._call('readline')
92
92
93 def _call(self, methname, data=None):
93 def _call(self, methname, data=None):
94 """call <methname> on "main", forward output of "side" while blocking
94 """call <methname> on "main", forward output of "side" while blocking
95 """
95 """
96 # data can be '' or 0
96 # data can be '' or 0
97 if (data is not None and not data) or self._main.closed:
97 if (data is not None and not data) or self._main.closed:
98 _forwardoutput(self._ui, self._side)
98 _forwardoutput(self._ui, self._side)
99 return ''
99 return ''
100 while True:
100 while True:
101 mainready, sideready = self._wait()
101 mainready, sideready = self._wait()
102 if sideready:
102 if sideready:
103 _forwardoutput(self._ui, self._side)
103 _forwardoutput(self._ui, self._side)
104 if mainready:
104 if mainready:
105 meth = getattr(self._main, methname)
105 meth = getattr(self._main, methname)
106 if data is None:
106 if data is None:
107 return meth()
107 return meth()
108 else:
108 else:
109 return meth(data)
109 return meth(data)
110
110
111 def close(self):
111 def close(self):
112 return self._main.close()
112 return self._main.close()
113
113
114 def flush(self):
114 def flush(self):
115 return self._main.flush()
115 return self._main.flush()
116
116
117 class sshpeer(wireproto.wirepeer):
117 class sshpeer(wireproto.wirepeer):
118 def __init__(self, ui, path, create=False):
118 def __init__(self, ui, path, create=False):
119 self._url = path
119 self._url = path
120 self._ui = ui
120 self._ui = ui
121 self._pipeo = self._pipei = self._pipee = None
121 self._pipeo = self._pipei = self._pipee = None
122
122
123 u = util.url(path, parsequery=False, parsefragment=False)
123 u = util.url(path, parsequery=False, parsefragment=False)
124 if u.scheme != 'ssh' or not u.host or u.path is None:
124 if u.scheme != 'ssh' or not u.host or u.path is None:
125 self._abort(error.RepoError(_("couldn't parse location %s") % path))
125 self._abort(error.RepoError(_("couldn't parse location %s") % path))
126
126
127 util.checksafessh(path)
127 util.checksafessh(path)
128
128
129 if u.passwd is not None:
129 if u.passwd is not None:
130 self._abort(error.RepoError(_("password in URL not supported")))
130 self._abort(error.RepoError(_("password in URL not supported")))
131
131
132 self._user = u.user
132 self._user = u.user
133 self._host = u.host
133 self._host = u.host
134 self._port = u.port
134 self._port = u.port
135 self._path = u.path or '.'
135 self._path = u.path or '.'
136
136
137 sshcmd = self.ui.config("ui", "ssh")
137 sshcmd = self.ui.config("ui", "ssh")
138 remotecmd = self.ui.config("ui", "remotecmd")
138 remotecmd = self.ui.config("ui", "remotecmd")
139
139
140 args = util.sshargs(sshcmd, self._host, self._user, self._port)
140 args = util.sshargs(sshcmd, self._host, self._user, self._port)
141
141
142 if create:
142 if create:
143 cmd = '%s %s %s' % (sshcmd, args,
143 cmd = '%s %s %s' % (sshcmd, args,
144 util.shellquote("%s init %s" %
144 util.shellquote("%s init %s" %
145 (_serverquote(remotecmd), _serverquote(self._path))))
145 (_serverquote(remotecmd), _serverquote(self._path))))
146 ui.debug('running %s\n' % cmd)
146 ui.debug('running %s\n' % cmd)
147 res = ui.system(cmd, blockedtag='sshpeer')
147 res = ui.system(cmd, blockedtag='sshpeer')
148 if res != 0:
148 if res != 0:
149 self._abort(error.RepoError(_("could not create remote repo")))
149 self._abort(error.RepoError(_("could not create remote repo")))
150
150
151 self._validaterepo(sshcmd, args, remotecmd)
151 self._validaterepo(sshcmd, args, remotecmd)
152
152
153 # Begin of _basepeer interface.
153 # Begin of _basepeer interface.
154
154
155 @util.propertycache
155 @util.propertycache
156 def ui(self):
156 def ui(self):
157 return self._ui
157 return self._ui
158
158
159 def url(self):
159 def url(self):
160 return self._url
160 return self._url
161
161
162 def local(self):
162 def local(self):
163 return None
163 return None
164
164
165 def peer(self):
165 def peer(self):
166 return self
166 return self
167
167
168 def canpush(self):
168 def canpush(self):
169 return True
169 return True
170
170
171 def close(self):
171 def close(self):
172 pass
172 pass
173
173
174 # End of _basepeer interface.
174 # End of _basepeer interface.
175
175
176 # Begin of _basewirecommands interface.
176 # Begin of _basewirecommands interface.
177
177
178 def capabilities(self):
178 def capabilities(self):
179 return self._caps
179 return self._caps
180
180
181 # End of _basewirecommands interface.
181 # End of _basewirecommands interface.
182
182
183 def _validaterepo(self, sshcmd, args, remotecmd):
183 def _validaterepo(self, sshcmd, args, remotecmd):
184 # cleanup up previous run
184 # cleanup up previous run
185 self._cleanup()
185 self._cleanup()
186
186
187 cmd = '%s %s %s' % (sshcmd, args,
187 cmd = '%s %s %s' % (sshcmd, args,
188 util.shellquote("%s -R %s serve --stdio" %
188 util.shellquote("%s -R %s serve --stdio" %
189 (_serverquote(remotecmd), _serverquote(self._path))))
189 (_serverquote(remotecmd), _serverquote(self._path))))
190 self.ui.debug('running %s\n' % cmd)
190 self.ui.debug('running %s\n' % cmd)
191 cmd = util.quotecommand(cmd)
191 cmd = util.quotecommand(cmd)
192
192
193 # while self._subprocess isn't used, having it allows the subprocess to
193 # while self._subprocess isn't used, having it allows the subprocess to
194 # to clean up correctly later
194 # to clean up correctly later
195 #
195 #
196 # no buffer allow the use of 'select'
196 # no buffer allow the use of 'select'
197 # feel free to remove buffering and select usage when we ultimately
197 # feel free to remove buffering and select usage when we ultimately
198 # move to threading.
198 # move to threading.
199 sub = util.popen4(cmd, bufsize=0)
199 sub = util.popen4(cmd, bufsize=0)
200 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
200 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
201
201
202 self._pipei = util.bufferedinputpipe(self._pipei)
202 self._pipei = util.bufferedinputpipe(self._pipei)
203 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
203 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
204 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
204 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
205
205
206 # skip any noise generated by remote shell
206 def badresponse():
207 self._callstream("hello")
207 self._abort(error.RepoError(_('no suitable response from '
208 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
208 'remote hg')))
209
210 try:
211 # skip any noise generated by remote shell
212 self._callstream("hello")
213 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
214 except IOError:
215 badresponse()
216
209 lines = ["", "dummy"]
217 lines = ["", "dummy"]
210 max_noise = 500
218 max_noise = 500
211 while lines[-1] and max_noise:
219 while lines[-1] and max_noise:
212 l = r.readline()
220 try:
213 self._readerr()
221 l = r.readline()
214 if lines[-1] == "1\n" and l == "\n":
222 self._readerr()
215 break
223 if lines[-1] == "1\n" and l == "\n":
216 if l:
224 break
217 self.ui.debug("remote: ", l)
225 if l:
218 lines.append(l)
226 self.ui.debug("remote: ", l)
219 max_noise -= 1
227 lines.append(l)
228 max_noise -= 1
229 except IOError:
230 badresponse()
220 else:
231 else:
221 self._abort(error.RepoError(_('no suitable response from '
232 badresponse()
222 'remote hg')))
223
233
224 self._caps = set()
234 self._caps = set()
225 for l in reversed(lines):
235 for l in reversed(lines):
226 if l.startswith("capabilities:"):
236 if l.startswith("capabilities:"):
227 self._caps.update(l[:-1].split(":")[1].split())
237 self._caps.update(l[:-1].split(":")[1].split())
228 break
238 break
229
239
230 def _readerr(self):
240 def _readerr(self):
231 _forwardoutput(self.ui, self._pipee)
241 _forwardoutput(self.ui, self._pipee)
232
242
233 def _abort(self, exception):
243 def _abort(self, exception):
234 self._cleanup()
244 self._cleanup()
235 raise exception
245 raise exception
236
246
237 def _cleanup(self):
247 def _cleanup(self):
238 if self._pipeo is None:
248 if self._pipeo is None:
239 return
249 return
240 self._pipeo.close()
250 self._pipeo.close()
241 self._pipei.close()
251 self._pipei.close()
242 try:
252 try:
243 # read the error descriptor until EOF
253 # read the error descriptor until EOF
244 for l in self._pipee:
254 for l in self._pipee:
245 self.ui.status(_("remote: "), l)
255 self.ui.status(_("remote: "), l)
246 except (IOError, ValueError):
256 except (IOError, ValueError):
247 pass
257 pass
248 self._pipee.close()
258 self._pipee.close()
249
259
250 __del__ = _cleanup
260 __del__ = _cleanup
251
261
252 def _submitbatch(self, req):
262 def _submitbatch(self, req):
253 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
263 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
254 available = self._getamount()
264 available = self._getamount()
255 # TODO this response parsing is probably suboptimal for large
265 # TODO this response parsing is probably suboptimal for large
256 # batches with large responses.
266 # batches with large responses.
257 toread = min(available, 1024)
267 toread = min(available, 1024)
258 work = rsp.read(toread)
268 work = rsp.read(toread)
259 available -= toread
269 available -= toread
260 chunk = work
270 chunk = work
261 while chunk:
271 while chunk:
262 while ';' in work:
272 while ';' in work:
263 one, work = work.split(';', 1)
273 one, work = work.split(';', 1)
264 yield wireproto.unescapearg(one)
274 yield wireproto.unescapearg(one)
265 toread = min(available, 1024)
275 toread = min(available, 1024)
266 chunk = rsp.read(toread)
276 chunk = rsp.read(toread)
267 available -= toread
277 available -= toread
268 work += chunk
278 work += chunk
269 yield wireproto.unescapearg(work)
279 yield wireproto.unescapearg(work)
270
280
271 def _callstream(self, cmd, **args):
281 def _callstream(self, cmd, **args):
272 args = pycompat.byteskwargs(args)
282 args = pycompat.byteskwargs(args)
273 self.ui.debug("sending %s command\n" % cmd)
283 self.ui.debug("sending %s command\n" % cmd)
274 self._pipeo.write("%s\n" % cmd)
284 self._pipeo.write("%s\n" % cmd)
275 _func, names = wireproto.commands[cmd]
285 _func, names = wireproto.commands[cmd]
276 keys = names.split()
286 keys = names.split()
277 wireargs = {}
287 wireargs = {}
278 for k in keys:
288 for k in keys:
279 if k == '*':
289 if k == '*':
280 wireargs['*'] = args
290 wireargs['*'] = args
281 break
291 break
282 else:
292 else:
283 wireargs[k] = args[k]
293 wireargs[k] = args[k]
284 del args[k]
294 del args[k]
285 for k, v in sorted(wireargs.iteritems()):
295 for k, v in sorted(wireargs.iteritems()):
286 self._pipeo.write("%s %d\n" % (k, len(v)))
296 self._pipeo.write("%s %d\n" % (k, len(v)))
287 if isinstance(v, dict):
297 if isinstance(v, dict):
288 for dk, dv in v.iteritems():
298 for dk, dv in v.iteritems():
289 self._pipeo.write("%s %d\n" % (dk, len(dv)))
299 self._pipeo.write("%s %d\n" % (dk, len(dv)))
290 self._pipeo.write(dv)
300 self._pipeo.write(dv)
291 else:
301 else:
292 self._pipeo.write(v)
302 self._pipeo.write(v)
293 self._pipeo.flush()
303 self._pipeo.flush()
294
304
295 return self._pipei
305 return self._pipei
296
306
297 def _callcompressable(self, cmd, **args):
307 def _callcompressable(self, cmd, **args):
298 return self._callstream(cmd, **args)
308 return self._callstream(cmd, **args)
299
309
300 def _call(self, cmd, **args):
310 def _call(self, cmd, **args):
301 self._callstream(cmd, **args)
311 self._callstream(cmd, **args)
302 return self._recv()
312 return self._recv()
303
313
304 def _callpush(self, cmd, fp, **args):
314 def _callpush(self, cmd, fp, **args):
305 r = self._call(cmd, **args)
315 r = self._call(cmd, **args)
306 if r:
316 if r:
307 return '', r
317 return '', r
308 for d in iter(lambda: fp.read(4096), ''):
318 for d in iter(lambda: fp.read(4096), ''):
309 self._send(d)
319 self._send(d)
310 self._send("", flush=True)
320 self._send("", flush=True)
311 r = self._recv()
321 r = self._recv()
312 if r:
322 if r:
313 return '', r
323 return '', r
314 return self._recv(), ''
324 return self._recv(), ''
315
325
316 def _calltwowaystream(self, cmd, fp, **args):
326 def _calltwowaystream(self, cmd, fp, **args):
317 r = self._call(cmd, **args)
327 r = self._call(cmd, **args)
318 if r:
328 if r:
319 # XXX needs to be made better
329 # XXX needs to be made better
320 raise error.Abort(_('unexpected remote reply: %s') % r)
330 raise error.Abort(_('unexpected remote reply: %s') % r)
321 for d in iter(lambda: fp.read(4096), ''):
331 for d in iter(lambda: fp.read(4096), ''):
322 self._send(d)
332 self._send(d)
323 self._send("", flush=True)
333 self._send("", flush=True)
324 return self._pipei
334 return self._pipei
325
335
326 def _getamount(self):
336 def _getamount(self):
327 l = self._pipei.readline()
337 l = self._pipei.readline()
328 if l == '\n':
338 if l == '\n':
329 self._readerr()
339 self._readerr()
330 msg = _('check previous remote output')
340 msg = _('check previous remote output')
331 self._abort(error.OutOfBandError(hint=msg))
341 self._abort(error.OutOfBandError(hint=msg))
332 self._readerr()
342 self._readerr()
333 try:
343 try:
334 return int(l)
344 return int(l)
335 except ValueError:
345 except ValueError:
336 self._abort(error.ResponseError(_("unexpected response:"), l))
346 self._abort(error.ResponseError(_("unexpected response:"), l))
337
347
338 def _recv(self):
348 def _recv(self):
339 return self._pipei.read(self._getamount())
349 return self._pipei.read(self._getamount())
340
350
341 def _send(self, data, flush=False):
351 def _send(self, data, flush=False):
342 self._pipeo.write("%d\n" % len(data))
352 self._pipeo.write("%d\n" % len(data))
343 if data:
353 if data:
344 self._pipeo.write(data)
354 self._pipeo.write(data)
345 if flush:
355 if flush:
346 self._pipeo.flush()
356 self._pipeo.flush()
347 self._readerr()
357 self._readerr()
348
358
349 instance = sshpeer
359 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now