##// END OF EJS Templates
ssh: fix flakey ssh errors on BSD systems...
Durham Goode -
r34194:1908dc95 stable
parent child Browse files
Show More
@@ -1,368 +1,378 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import re
10 import re
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 error,
14 error,
15 pycompat,
15 pycompat,
16 util,
16 util,
17 wireproto,
17 wireproto,
18 )
18 )
19
19
20 class remotelock(object):
20 class remotelock(object):
21 def __init__(self, repo):
21 def __init__(self, repo):
22 self.repo = repo
22 self.repo = repo
23 def release(self):
23 def release(self):
24 self.repo.unlock()
24 self.repo.unlock()
25 self.repo = None
25 self.repo = None
26 def __enter__(self):
26 def __enter__(self):
27 return self
27 return self
28 def __exit__(self, exc_type, exc_val, exc_tb):
28 def __exit__(self, exc_type, exc_val, exc_tb):
29 if self.repo:
29 if self.repo:
30 self.release()
30 self.release()
31 def __del__(self):
31 def __del__(self):
32 if self.repo:
32 if self.repo:
33 self.release()
33 self.release()
34
34
35 def _serverquote(s):
35 def _serverquote(s):
36 if not s:
36 if not s:
37 return s
37 return s
38 '''quote a string for the remote shell ... which we assume is sh'''
38 '''quote a string for the remote shell ... which we assume is sh'''
39 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
39 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
40 return s
40 return s
41 return "'%s'" % s.replace("'", "'\\''")
41 return "'%s'" % s.replace("'", "'\\''")
42
42
43 def _forwardoutput(ui, pipe):
43 def _forwardoutput(ui, pipe):
44 """display all data currently available on pipe as remote output.
44 """display all data currently available on pipe as remote output.
45
45
46 This is non blocking."""
46 This is non blocking."""
47 s = util.readpipe(pipe)
47 s = util.readpipe(pipe)
48 if s:
48 if s:
49 for l in s.splitlines():
49 for l in s.splitlines():
50 ui.status(_("remote: "), l, '\n')
50 ui.status(_("remote: "), l, '\n')
51
51
52 class doublepipe(object):
52 class doublepipe(object):
53 """Operate a side-channel pipe in addition of a main one
53 """Operate a side-channel pipe in addition of a main one
54
54
55 The side-channel pipe contains server output to be forwarded to the user
55 The side-channel pipe contains server output to be forwarded to the user
56 input. The double pipe will behave as the "main" pipe, but will ensure the
56 input. The double pipe will behave as the "main" pipe, but will ensure the
57 content of the "side" pipe is properly processed while we wait for blocking
57 content of the "side" pipe is properly processed while we wait for blocking
58 call on the "main" pipe.
58 call on the "main" pipe.
59
59
60 If large amounts of data are read from "main", the forward will cease after
60 If large amounts of data are read from "main", the forward will cease after
61 the first bytes start to appear. This simplifies the implementation
61 the first bytes start to appear. This simplifies the implementation
62 without affecting actual output of sshpeer too much as we rarely issue
62 without affecting actual output of sshpeer too much as we rarely issue
63 large read for data not yet emitted by the server.
63 large read for data not yet emitted by the server.
64
64
65 The main pipe is expected to be a 'bufferedinputpipe' from the util module
65 The main pipe is expected to be a 'bufferedinputpipe' from the util module
66 that handle all the os specific bits. This class lives in this module
66 that handle all the os specific bits. This class lives in this module
67 because it focus on behavior specific to the ssh protocol."""
67 because it focus on behavior specific to the ssh protocol."""
68
68
69 def __init__(self, ui, main, side):
69 def __init__(self, ui, main, side):
70 self._ui = ui
70 self._ui = ui
71 self._main = main
71 self._main = main
72 self._side = side
72 self._side = side
73
73
74 def _wait(self):
74 def _wait(self):
75 """wait until some data are available on main or side
75 """wait until some data are available on main or side
76
76
77 return a pair of boolean (ismainready, issideready)
77 return a pair of boolean (ismainready, issideready)
78
78
79 (This will only wait for data if the setup is supported by `util.poll`)
79 (This will only wait for data if the setup is supported by `util.poll`)
80 """
80 """
81 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
81 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
82 return (True, True) # main has data, assume side is worth poking at.
82 return (True, True) # main has data, assume side is worth poking at.
83 fds = [self._main.fileno(), self._side.fileno()]
83 fds = [self._main.fileno(), self._side.fileno()]
84 try:
84 try:
85 act = util.poll(fds)
85 act = util.poll(fds)
86 except NotImplementedError:
86 except NotImplementedError:
87 # non supported yet case, assume all have data.
87 # non supported yet case, assume all have data.
88 act = fds
88 act = fds
89 return (self._main.fileno() in act, self._side.fileno() in act)
89 return (self._main.fileno() in act, self._side.fileno() in act)
90
90
91 def write(self, data):
91 def write(self, data):
92 return self._call('write', data)
92 return self._call('write', data)
93
93
94 def read(self, size):
94 def read(self, size):
95 r = self._call('read', size)
95 r = self._call('read', size)
96 if size != 0 and not r:
96 if size != 0 and not r:
97 # We've observed a condition that indicates the
97 # We've observed a condition that indicates the
98 # stdout closed unexpectedly. Check stderr one
98 # stdout closed unexpectedly. Check stderr one
99 # more time and snag anything that's there before
99 # more time and snag anything that's there before
100 # letting anyone know the main part of the pipe
100 # letting anyone know the main part of the pipe
101 # closed prematurely.
101 # closed prematurely.
102 _forwardoutput(self._ui, self._side)
102 _forwardoutput(self._ui, self._side)
103 return r
103 return r
104
104
105 def readline(self):
105 def readline(self):
106 return self._call('readline')
106 return self._call('readline')
107
107
108 def _call(self, methname, data=None):
108 def _call(self, methname, data=None):
109 """call <methname> on "main", forward output of "side" while blocking
109 """call <methname> on "main", forward output of "side" while blocking
110 """
110 """
111 # data can be '' or 0
111 # data can be '' or 0
112 if (data is not None and not data) or self._main.closed:
112 if (data is not None and not data) or self._main.closed:
113 _forwardoutput(self._ui, self._side)
113 _forwardoutput(self._ui, self._side)
114 return ''
114 return ''
115 while True:
115 while True:
116 mainready, sideready = self._wait()
116 mainready, sideready = self._wait()
117 if sideready:
117 if sideready:
118 _forwardoutput(self._ui, self._side)
118 _forwardoutput(self._ui, self._side)
119 if mainready:
119 if mainready:
120 meth = getattr(self._main, methname)
120 meth = getattr(self._main, methname)
121 if data is None:
121 if data is None:
122 return meth()
122 return meth()
123 else:
123 else:
124 return meth(data)
124 return meth(data)
125
125
126 def close(self):
126 def close(self):
127 return self._main.close()
127 return self._main.close()
128
128
129 def flush(self):
129 def flush(self):
130 return self._main.flush()
130 return self._main.flush()
131
131
132 class sshpeer(wireproto.wirepeer):
132 class sshpeer(wireproto.wirepeer):
133 def __init__(self, ui, path, create=False):
133 def __init__(self, ui, path, create=False):
134 self._url = path
134 self._url = path
135 self.ui = ui
135 self.ui = ui
136 self.pipeo = self.pipei = self.pipee = None
136 self.pipeo = self.pipei = self.pipee = None
137
137
138 u = util.url(path, parsequery=False, parsefragment=False)
138 u = util.url(path, parsequery=False, parsefragment=False)
139 if u.scheme != 'ssh' or not u.host or u.path is None:
139 if u.scheme != 'ssh' or not u.host or u.path is None:
140 self._abort(error.RepoError(_("couldn't parse location %s") % path))
140 self._abort(error.RepoError(_("couldn't parse location %s") % path))
141
141
142 util.checksafessh(path)
142 util.checksafessh(path)
143
143
144 self.user = u.user
144 self.user = u.user
145 if u.passwd is not None:
145 if u.passwd is not None:
146 self._abort(error.RepoError(_("password in URL not supported")))
146 self._abort(error.RepoError(_("password in URL not supported")))
147 self.host = u.host
147 self.host = u.host
148 self.port = u.port
148 self.port = u.port
149 self.path = u.path or "."
149 self.path = u.path or "."
150
150
151 sshcmd = self.ui.config("ui", "ssh")
151 sshcmd = self.ui.config("ui", "ssh")
152 remotecmd = self.ui.config("ui", "remotecmd")
152 remotecmd = self.ui.config("ui", "remotecmd")
153
153
154 args = util.sshargs(sshcmd, self.host, self.user, self.port)
154 args = util.sshargs(sshcmd, self.host, self.user, self.port)
155
155
156 if create:
156 if create:
157 cmd = '%s %s %s' % (sshcmd, args,
157 cmd = '%s %s %s' % (sshcmd, args,
158 util.shellquote("%s init %s" %
158 util.shellquote("%s init %s" %
159 (_serverquote(remotecmd), _serverquote(self.path))))
159 (_serverquote(remotecmd), _serverquote(self.path))))
160 ui.debug('running %s\n' % cmd)
160 ui.debug('running %s\n' % cmd)
161 res = ui.system(cmd, blockedtag='sshpeer')
161 res = ui.system(cmd, blockedtag='sshpeer')
162 if res != 0:
162 if res != 0:
163 self._abort(error.RepoError(_("could not create remote repo")))
163 self._abort(error.RepoError(_("could not create remote repo")))
164
164
165 self._validaterepo(sshcmd, args, remotecmd)
165 self._validaterepo(sshcmd, args, remotecmd)
166
166
167 def url(self):
167 def url(self):
168 return self._url
168 return self._url
169
169
170 def _validaterepo(self, sshcmd, args, remotecmd):
170 def _validaterepo(self, sshcmd, args, remotecmd):
171 # cleanup up previous run
171 # cleanup up previous run
172 self.cleanup()
172 self.cleanup()
173
173
174 cmd = '%s %s %s' % (sshcmd, args,
174 cmd = '%s %s %s' % (sshcmd, args,
175 util.shellquote("%s -R %s serve --stdio" %
175 util.shellquote("%s -R %s serve --stdio" %
176 (_serverquote(remotecmd), _serverquote(self.path))))
176 (_serverquote(remotecmd), _serverquote(self.path))))
177 self.ui.debug('running %s\n' % cmd)
177 self.ui.debug('running %s\n' % cmd)
178 cmd = util.quotecommand(cmd)
178 cmd = util.quotecommand(cmd)
179
179
180 # while self.subprocess isn't used, having it allows the subprocess to
180 # while self.subprocess isn't used, having it allows the subprocess to
181 # to clean up correctly later
181 # to clean up correctly later
182 #
182 #
183 # no buffer allow the use of 'select'
183 # no buffer allow the use of 'select'
184 # feel free to remove buffering and select usage when we ultimately
184 # feel free to remove buffering and select usage when we ultimately
185 # move to threading.
185 # move to threading.
186 sub = util.popen4(cmd, bufsize=0)
186 sub = util.popen4(cmd, bufsize=0)
187 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
187 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
188
188
189 self.pipei = util.bufferedinputpipe(self.pipei)
189 self.pipei = util.bufferedinputpipe(self.pipei)
190 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
190 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
191 self.pipeo = doublepipe(self.ui, self.pipeo, self.pipee)
191 self.pipeo = doublepipe(self.ui, self.pipeo, self.pipee)
192
192
193 def badresponse():
194 self._abort(error.RepoError(_('no suitable response from '
195 'remote hg')))
196
197 try:
193 # skip any noise generated by remote shell
198 # skip any noise generated by remote shell
194 self._callstream("hello")
199 self._callstream("hello")
195 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
200 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
201 except IOError:
202 badresponse()
203
196 lines = ["", "dummy"]
204 lines = ["", "dummy"]
197 max_noise = 500
205 max_noise = 500
198 while lines[-1] and max_noise:
206 while lines[-1] and max_noise:
207 try:
199 l = r.readline()
208 l = r.readline()
200 self.readerr()
209 self.readerr()
201 if lines[-1] == "1\n" and l == "\n":
210 if lines[-1] == "1\n" and l == "\n":
202 break
211 break
203 if l:
212 if l:
204 self.ui.debug("remote: ", l)
213 self.ui.debug("remote: ", l)
205 lines.append(l)
214 lines.append(l)
206 max_noise -= 1
215 max_noise -= 1
216 except IOError:
217 badresponse()
207 else:
218 else:
208 self._abort(error.RepoError(_('no suitable response from '
219 badresponse()
209 'remote hg')))
210
220
211 self._caps = set()
221 self._caps = set()
212 for l in reversed(lines):
222 for l in reversed(lines):
213 if l.startswith("capabilities:"):
223 if l.startswith("capabilities:"):
214 self._caps.update(l[:-1].split(":")[1].split())
224 self._caps.update(l[:-1].split(":")[1].split())
215 break
225 break
216
226
217 def _capabilities(self):
227 def _capabilities(self):
218 return self._caps
228 return self._caps
219
229
220 def readerr(self):
230 def readerr(self):
221 _forwardoutput(self.ui, self.pipee)
231 _forwardoutput(self.ui, self.pipee)
222
232
223 def _abort(self, exception):
233 def _abort(self, exception):
224 self.cleanup()
234 self.cleanup()
225 raise exception
235 raise exception
226
236
227 def cleanup(self):
237 def cleanup(self):
228 if self.pipeo is None:
238 if self.pipeo is None:
229 return
239 return
230 self.pipeo.close()
240 self.pipeo.close()
231 self.pipei.close()
241 self.pipei.close()
232 try:
242 try:
233 # read the error descriptor until EOF
243 # read the error descriptor until EOF
234 for l in self.pipee:
244 for l in self.pipee:
235 self.ui.status(_("remote: "), l)
245 self.ui.status(_("remote: "), l)
236 except (IOError, ValueError):
246 except (IOError, ValueError):
237 pass
247 pass
238 self.pipee.close()
248 self.pipee.close()
239
249
240 __del__ = cleanup
250 __del__ = cleanup
241
251
242 def _submitbatch(self, req):
252 def _submitbatch(self, req):
243 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
253 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
244 available = self._getamount()
254 available = self._getamount()
245 # TODO this response parsing is probably suboptimal for large
255 # TODO this response parsing is probably suboptimal for large
246 # batches with large responses.
256 # batches with large responses.
247 toread = min(available, 1024)
257 toread = min(available, 1024)
248 work = rsp.read(toread)
258 work = rsp.read(toread)
249 available -= toread
259 available -= toread
250 chunk = work
260 chunk = work
251 while chunk:
261 while chunk:
252 while ';' in work:
262 while ';' in work:
253 one, work = work.split(';', 1)
263 one, work = work.split(';', 1)
254 yield wireproto.unescapearg(one)
264 yield wireproto.unescapearg(one)
255 toread = min(available, 1024)
265 toread = min(available, 1024)
256 chunk = rsp.read(toread)
266 chunk = rsp.read(toread)
257 available -= toread
267 available -= toread
258 work += chunk
268 work += chunk
259 yield wireproto.unescapearg(work)
269 yield wireproto.unescapearg(work)
260
270
261 def _callstream(self, cmd, **args):
271 def _callstream(self, cmd, **args):
262 args = pycompat.byteskwargs(args)
272 args = pycompat.byteskwargs(args)
263 self.ui.debug("sending %s command\n" % cmd)
273 self.ui.debug("sending %s command\n" % cmd)
264 self.pipeo.write("%s\n" % cmd)
274 self.pipeo.write("%s\n" % cmd)
265 _func, names = wireproto.commands[cmd]
275 _func, names = wireproto.commands[cmd]
266 keys = names.split()
276 keys = names.split()
267 wireargs = {}
277 wireargs = {}
268 for k in keys:
278 for k in keys:
269 if k == '*':
279 if k == '*':
270 wireargs['*'] = args
280 wireargs['*'] = args
271 break
281 break
272 else:
282 else:
273 wireargs[k] = args[k]
283 wireargs[k] = args[k]
274 del args[k]
284 del args[k]
275 for k, v in sorted(wireargs.iteritems()):
285 for k, v in sorted(wireargs.iteritems()):
276 self.pipeo.write("%s %d\n" % (k, len(v)))
286 self.pipeo.write("%s %d\n" % (k, len(v)))
277 if isinstance(v, dict):
287 if isinstance(v, dict):
278 for dk, dv in v.iteritems():
288 for dk, dv in v.iteritems():
279 self.pipeo.write("%s %d\n" % (dk, len(dv)))
289 self.pipeo.write("%s %d\n" % (dk, len(dv)))
280 self.pipeo.write(dv)
290 self.pipeo.write(dv)
281 else:
291 else:
282 self.pipeo.write(v)
292 self.pipeo.write(v)
283 self.pipeo.flush()
293 self.pipeo.flush()
284
294
285 return self.pipei
295 return self.pipei
286
296
287 def _callcompressable(self, cmd, **args):
297 def _callcompressable(self, cmd, **args):
288 return self._callstream(cmd, **args)
298 return self._callstream(cmd, **args)
289
299
290 def _call(self, cmd, **args):
300 def _call(self, cmd, **args):
291 self._callstream(cmd, **args)
301 self._callstream(cmd, **args)
292 return self._recv()
302 return self._recv()
293
303
294 def _callpush(self, cmd, fp, **args):
304 def _callpush(self, cmd, fp, **args):
295 r = self._call(cmd, **args)
305 r = self._call(cmd, **args)
296 if r:
306 if r:
297 return '', r
307 return '', r
298 for d in iter(lambda: fp.read(4096), ''):
308 for d in iter(lambda: fp.read(4096), ''):
299 self._send(d)
309 self._send(d)
300 self._send("", flush=True)
310 self._send("", flush=True)
301 r = self._recv()
311 r = self._recv()
302 if r:
312 if r:
303 return '', r
313 return '', r
304 return self._recv(), ''
314 return self._recv(), ''
305
315
306 def _calltwowaystream(self, cmd, fp, **args):
316 def _calltwowaystream(self, cmd, fp, **args):
307 r = self._call(cmd, **args)
317 r = self._call(cmd, **args)
308 if r:
318 if r:
309 # XXX needs to be made better
319 # XXX needs to be made better
310 raise error.Abort(_('unexpected remote reply: %s') % r)
320 raise error.Abort(_('unexpected remote reply: %s') % r)
311 for d in iter(lambda: fp.read(4096), ''):
321 for d in iter(lambda: fp.read(4096), ''):
312 self._send(d)
322 self._send(d)
313 self._send("", flush=True)
323 self._send("", flush=True)
314 return self.pipei
324 return self.pipei
315
325
316 def _getamount(self):
326 def _getamount(self):
317 l = self.pipei.readline()
327 l = self.pipei.readline()
318 if l == '\n':
328 if l == '\n':
319 self.readerr()
329 self.readerr()
320 msg = _('check previous remote output')
330 msg = _('check previous remote output')
321 self._abort(error.OutOfBandError(hint=msg))
331 self._abort(error.OutOfBandError(hint=msg))
322 self.readerr()
332 self.readerr()
323 try:
333 try:
324 return int(l)
334 return int(l)
325 except ValueError:
335 except ValueError:
326 self._abort(error.ResponseError(_("unexpected response:"), l))
336 self._abort(error.ResponseError(_("unexpected response:"), l))
327
337
328 def _recv(self):
338 def _recv(self):
329 return self.pipei.read(self._getamount())
339 return self.pipei.read(self._getamount())
330
340
331 def _send(self, data, flush=False):
341 def _send(self, data, flush=False):
332 self.pipeo.write("%d\n" % len(data))
342 self.pipeo.write("%d\n" % len(data))
333 if data:
343 if data:
334 self.pipeo.write(data)
344 self.pipeo.write(data)
335 if flush:
345 if flush:
336 self.pipeo.flush()
346 self.pipeo.flush()
337 self.readerr()
347 self.readerr()
338
348
339 def lock(self):
349 def lock(self):
340 self._call("lock")
350 self._call("lock")
341 return remotelock(self)
351 return remotelock(self)
342
352
343 def unlock(self):
353 def unlock(self):
344 self._call("unlock")
354 self._call("unlock")
345
355
346 def addchangegroup(self, cg, source, url, lock=None):
356 def addchangegroup(self, cg, source, url, lock=None):
347 '''Send a changegroup to the remote server. Return an integer
357 '''Send a changegroup to the remote server. Return an integer
348 similar to unbundle(). DEPRECATED, since it requires locking the
358 similar to unbundle(). DEPRECATED, since it requires locking the
349 remote.'''
359 remote.'''
350 d = self._call("addchangegroup")
360 d = self._call("addchangegroup")
351 if d:
361 if d:
352 self._abort(error.RepoError(_("push refused: %s") % d))
362 self._abort(error.RepoError(_("push refused: %s") % d))
353 for d in iter(lambda: cg.read(4096), ''):
363 for d in iter(lambda: cg.read(4096), ''):
354 self.pipeo.write(d)
364 self.pipeo.write(d)
355 self.readerr()
365 self.readerr()
356
366
357 self.pipeo.flush()
367 self.pipeo.flush()
358
368
359 self.readerr()
369 self.readerr()
360 r = self._recv()
370 r = self._recv()
361 if not r:
371 if not r:
362 return 1
372 return 1
363 try:
373 try:
364 return int(r)
374 return int(r)
365 except ValueError:
375 except ValueError:
366 self._abort(error.ResponseError(_("unexpected response:"), r))
376 self._abort(error.ResponseError(_("unexpected response:"), r))
367
377
368 instance = sshpeer
378 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now