##// END OF EJS Templates
ssh: fix flakey ssh errors on BSD systems...
Durham Goode -
r34107:c037fd65 default
parent child Browse files
Show More
@@ -1,349 +1,359 b''
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import re
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 error,
15 15 pycompat,
16 16 util,
17 17 wireproto,
18 18 )
19 19
20 20 def _serverquote(s):
21 21 if not s:
22 22 return s
23 23 '''quote a string for the remote shell ... which we assume is sh'''
24 24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
25 25 return s
26 26 return "'%s'" % s.replace("'", "'\\''")
27 27
28 28 def _forwardoutput(ui, pipe):
29 29 """display all data currently available on pipe as remote output.
30 30
31 31 This is non blocking."""
32 32 s = util.readpipe(pipe)
33 33 if s:
34 34 for l in s.splitlines():
35 35 ui.status(_("remote: "), l, '\n')
36 36
37 37 class doublepipe(object):
38 38 """Operate a side-channel pipe in addition of a main one
39 39
40 40 The side-channel pipe contains server output to be forwarded to the user
41 41 input. The double pipe will behave as the "main" pipe, but will ensure the
42 42 content of the "side" pipe is properly processed while we wait for blocking
43 43 call on the "main" pipe.
44 44
45 45 If large amounts of data are read from "main", the forward will cease after
46 46 the first bytes start to appear. This simplifies the implementation
47 47 without affecting actual output of sshpeer too much as we rarely issue
48 48 large read for data not yet emitted by the server.
49 49
50 50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
51 51 that handle all the os specific bits. This class lives in this module
52 52 because it focus on behavior specific to the ssh protocol."""
53 53
54 54 def __init__(self, ui, main, side):
55 55 self._ui = ui
56 56 self._main = main
57 57 self._side = side
58 58
59 59 def _wait(self):
60 60 """wait until some data are available on main or side
61 61
62 62 return a pair of boolean (ismainready, issideready)
63 63
64 64 (This will only wait for data if the setup is supported by `util.poll`)
65 65 """
66 66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
67 67 return (True, True) # main has data, assume side is worth poking at.
68 68 fds = [self._main.fileno(), self._side.fileno()]
69 69 try:
70 70 act = util.poll(fds)
71 71 except NotImplementedError:
72 72 # non supported yet case, assume all have data.
73 73 act = fds
74 74 return (self._main.fileno() in act, self._side.fileno() in act)
75 75
76 76 def write(self, data):
77 77 return self._call('write', data)
78 78
79 79 def read(self, size):
80 80 r = self._call('read', size)
81 81 if size != 0 and not r:
82 82 # We've observed a condition that indicates the
83 83 # stdout closed unexpectedly. Check stderr one
84 84 # more time and snag anything that's there before
85 85 # letting anyone know the main part of the pipe
86 86 # closed prematurely.
87 87 _forwardoutput(self._ui, self._side)
88 88 return r
89 89
90 90 def readline(self):
91 91 return self._call('readline')
92 92
93 93 def _call(self, methname, data=None):
94 94 """call <methname> on "main", forward output of "side" while blocking
95 95 """
96 96 # data can be '' or 0
97 97 if (data is not None and not data) or self._main.closed:
98 98 _forwardoutput(self._ui, self._side)
99 99 return ''
100 100 while True:
101 101 mainready, sideready = self._wait()
102 102 if sideready:
103 103 _forwardoutput(self._ui, self._side)
104 104 if mainready:
105 105 meth = getattr(self._main, methname)
106 106 if data is None:
107 107 return meth()
108 108 else:
109 109 return meth(data)
110 110
111 111 def close(self):
112 112 return self._main.close()
113 113
114 114 def flush(self):
115 115 return self._main.flush()
116 116
117 117 class sshpeer(wireproto.wirepeer):
118 118 def __init__(self, ui, path, create=False):
119 119 self._url = path
120 120 self._ui = ui
121 121 self._pipeo = self._pipei = self._pipee = None
122 122
123 123 u = util.url(path, parsequery=False, parsefragment=False)
124 124 if u.scheme != 'ssh' or not u.host or u.path is None:
125 125 self._abort(error.RepoError(_("couldn't parse location %s") % path))
126 126
127 127 util.checksafessh(path)
128 128
129 129 if u.passwd is not None:
130 130 self._abort(error.RepoError(_("password in URL not supported")))
131 131
132 132 self._user = u.user
133 133 self._host = u.host
134 134 self._port = u.port
135 135 self._path = u.path or '.'
136 136
137 137 sshcmd = self.ui.config("ui", "ssh")
138 138 remotecmd = self.ui.config("ui", "remotecmd")
139 139
140 140 args = util.sshargs(sshcmd, self._host, self._user, self._port)
141 141
142 142 if create:
143 143 cmd = '%s %s %s' % (sshcmd, args,
144 144 util.shellquote("%s init %s" %
145 145 (_serverquote(remotecmd), _serverquote(self._path))))
146 146 ui.debug('running %s\n' % cmd)
147 147 res = ui.system(cmd, blockedtag='sshpeer')
148 148 if res != 0:
149 149 self._abort(error.RepoError(_("could not create remote repo")))
150 150
151 151 self._validaterepo(sshcmd, args, remotecmd)
152 152
153 153 # Begin of _basepeer interface.
154 154
155 155 @util.propertycache
156 156 def ui(self):
157 157 return self._ui
158 158
159 159 def url(self):
160 160 return self._url
161 161
162 162 def local(self):
163 163 return None
164 164
165 165 def peer(self):
166 166 return self
167 167
168 168 def canpush(self):
169 169 return True
170 170
171 171 def close(self):
172 172 pass
173 173
174 174 # End of _basepeer interface.
175 175
176 176 # Begin of _basewirecommands interface.
177 177
178 178 def capabilities(self):
179 179 return self._caps
180 180
181 181 # End of _basewirecommands interface.
182 182
183 183 def _validaterepo(self, sshcmd, args, remotecmd):
184 184 # cleanup up previous run
185 185 self._cleanup()
186 186
187 187 cmd = '%s %s %s' % (sshcmd, args,
188 188 util.shellquote("%s -R %s serve --stdio" %
189 189 (_serverquote(remotecmd), _serverquote(self._path))))
190 190 self.ui.debug('running %s\n' % cmd)
191 191 cmd = util.quotecommand(cmd)
192 192
193 193 # while self._subprocess isn't used, having it allows the subprocess to
194 194 # to clean up correctly later
195 195 #
196 196 # no buffer allow the use of 'select'
197 197 # feel free to remove buffering and select usage when we ultimately
198 198 # move to threading.
199 199 sub = util.popen4(cmd, bufsize=0)
200 200 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
201 201
202 202 self._pipei = util.bufferedinputpipe(self._pipei)
203 203 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
204 204 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
205 205
206 # skip any noise generated by remote shell
207 self._callstream("hello")
208 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
206 def badresponse():
207 self._abort(error.RepoError(_('no suitable response from '
208 'remote hg')))
209
210 try:
211 # skip any noise generated by remote shell
212 self._callstream("hello")
213 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
214 except IOError:
215 badresponse()
216
209 217 lines = ["", "dummy"]
210 218 max_noise = 500
211 219 while lines[-1] and max_noise:
212 l = r.readline()
213 self._readerr()
214 if lines[-1] == "1\n" and l == "\n":
215 break
216 if l:
217 self.ui.debug("remote: ", l)
218 lines.append(l)
219 max_noise -= 1
220 try:
221 l = r.readline()
222 self._readerr()
223 if lines[-1] == "1\n" and l == "\n":
224 break
225 if l:
226 self.ui.debug("remote: ", l)
227 lines.append(l)
228 max_noise -= 1
229 except IOError:
230 badresponse()
220 231 else:
221 self._abort(error.RepoError(_('no suitable response from '
222 'remote hg')))
232 badresponse()
223 233
224 234 self._caps = set()
225 235 for l in reversed(lines):
226 236 if l.startswith("capabilities:"):
227 237 self._caps.update(l[:-1].split(":")[1].split())
228 238 break
229 239
230 240 def _readerr(self):
231 241 _forwardoutput(self.ui, self._pipee)
232 242
233 243 def _abort(self, exception):
234 244 self._cleanup()
235 245 raise exception
236 246
237 247 def _cleanup(self):
238 248 if self._pipeo is None:
239 249 return
240 250 self._pipeo.close()
241 251 self._pipei.close()
242 252 try:
243 253 # read the error descriptor until EOF
244 254 for l in self._pipee:
245 255 self.ui.status(_("remote: "), l)
246 256 except (IOError, ValueError):
247 257 pass
248 258 self._pipee.close()
249 259
250 260 __del__ = _cleanup
251 261
252 262 def _submitbatch(self, req):
253 263 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
254 264 available = self._getamount()
255 265 # TODO this response parsing is probably suboptimal for large
256 266 # batches with large responses.
257 267 toread = min(available, 1024)
258 268 work = rsp.read(toread)
259 269 available -= toread
260 270 chunk = work
261 271 while chunk:
262 272 while ';' in work:
263 273 one, work = work.split(';', 1)
264 274 yield wireproto.unescapearg(one)
265 275 toread = min(available, 1024)
266 276 chunk = rsp.read(toread)
267 277 available -= toread
268 278 work += chunk
269 279 yield wireproto.unescapearg(work)
270 280
271 281 def _callstream(self, cmd, **args):
272 282 args = pycompat.byteskwargs(args)
273 283 self.ui.debug("sending %s command\n" % cmd)
274 284 self._pipeo.write("%s\n" % cmd)
275 285 _func, names = wireproto.commands[cmd]
276 286 keys = names.split()
277 287 wireargs = {}
278 288 for k in keys:
279 289 if k == '*':
280 290 wireargs['*'] = args
281 291 break
282 292 else:
283 293 wireargs[k] = args[k]
284 294 del args[k]
285 295 for k, v in sorted(wireargs.iteritems()):
286 296 self._pipeo.write("%s %d\n" % (k, len(v)))
287 297 if isinstance(v, dict):
288 298 for dk, dv in v.iteritems():
289 299 self._pipeo.write("%s %d\n" % (dk, len(dv)))
290 300 self._pipeo.write(dv)
291 301 else:
292 302 self._pipeo.write(v)
293 303 self._pipeo.flush()
294 304
295 305 return self._pipei
296 306
297 307 def _callcompressable(self, cmd, **args):
298 308 return self._callstream(cmd, **args)
299 309
300 310 def _call(self, cmd, **args):
301 311 self._callstream(cmd, **args)
302 312 return self._recv()
303 313
304 314 def _callpush(self, cmd, fp, **args):
305 315 r = self._call(cmd, **args)
306 316 if r:
307 317 return '', r
308 318 for d in iter(lambda: fp.read(4096), ''):
309 319 self._send(d)
310 320 self._send("", flush=True)
311 321 r = self._recv()
312 322 if r:
313 323 return '', r
314 324 return self._recv(), ''
315 325
316 326 def _calltwowaystream(self, cmd, fp, **args):
317 327 r = self._call(cmd, **args)
318 328 if r:
319 329 # XXX needs to be made better
320 330 raise error.Abort(_('unexpected remote reply: %s') % r)
321 331 for d in iter(lambda: fp.read(4096), ''):
322 332 self._send(d)
323 333 self._send("", flush=True)
324 334 return self._pipei
325 335
326 336 def _getamount(self):
327 337 l = self._pipei.readline()
328 338 if l == '\n':
329 339 self._readerr()
330 340 msg = _('check previous remote output')
331 341 self._abort(error.OutOfBandError(hint=msg))
332 342 self._readerr()
333 343 try:
334 344 return int(l)
335 345 except ValueError:
336 346 self._abort(error.ResponseError(_("unexpected response:"), l))
337 347
338 348 def _recv(self):
339 349 return self._pipei.read(self._getamount())
340 350
341 351 def _send(self, data, flush=False):
342 352 self._pipeo.write("%d\n" % len(data))
343 353 if data:
344 354 self._pipeo.write(data)
345 355 if flush:
346 356 self._pipeo.flush()
347 357 self._readerr()
348 358
349 359 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now