##// END OF EJS Templates
sshpeer: try harder to snag stderr when stdout closes unexpectedly...
Augie Fackler -
r32062:ad6c5497 stable
parent child Browse files
Show More
@@ -1,359 +1,367
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import re
10 import re
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 error,
14 error,
15 util,
15 util,
16 wireproto,
16 wireproto,
17 )
17 )
18
18
19 class remotelock(object):
19 class remotelock(object):
20 def __init__(self, repo):
20 def __init__(self, repo):
21 self.repo = repo
21 self.repo = repo
22 def release(self):
22 def release(self):
23 self.repo.unlock()
23 self.repo.unlock()
24 self.repo = None
24 self.repo = None
25 def __enter__(self):
25 def __enter__(self):
26 return self
26 return self
27 def __exit__(self, exc_type, exc_val, exc_tb):
27 def __exit__(self, exc_type, exc_val, exc_tb):
28 if self.repo:
28 if self.repo:
29 self.release()
29 self.release()
30 def __del__(self):
30 def __del__(self):
31 if self.repo:
31 if self.repo:
32 self.release()
32 self.release()
33
33
34 def _serverquote(s):
34 def _serverquote(s):
35 if not s:
35 if not s:
36 return s
36 return s
37 '''quote a string for the remote shell ... which we assume is sh'''
37 '''quote a string for the remote shell ... which we assume is sh'''
38 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
38 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
39 return s
39 return s
40 return "'%s'" % s.replace("'", "'\\''")
40 return "'%s'" % s.replace("'", "'\\''")
41
41
42 def _forwardoutput(ui, pipe):
42 def _forwardoutput(ui, pipe):
43 """display all data currently available on pipe as remote output.
43 """display all data currently available on pipe as remote output.
44
44
45 This is non blocking."""
45 This is non blocking."""
46 s = util.readpipe(pipe)
46 s = util.readpipe(pipe)
47 if s:
47 if s:
48 for l in s.splitlines():
48 for l in s.splitlines():
49 ui.status(_("remote: "), l, '\n')
49 ui.status(_("remote: "), l, '\n')
50
50
51 class doublepipe(object):
51 class doublepipe(object):
52 """Operate a side-channel pipe in addition of a main one
52 """Operate a side-channel pipe in addition of a main one
53
53
54 The side-channel pipe contains server output to be forwarded to the user
54 The side-channel pipe contains server output to be forwarded to the user
55 input. The double pipe will behave as the "main" pipe, but will ensure the
55 input. The double pipe will behave as the "main" pipe, but will ensure the
56 content of the "side" pipe is properly processed while we wait for blocking
56 content of the "side" pipe is properly processed while we wait for blocking
57 call on the "main" pipe.
57 call on the "main" pipe.
58
58
59 If large amounts of data are read from "main", the forward will cease after
59 If large amounts of data are read from "main", the forward will cease after
60 the first bytes start to appear. This simplifies the implementation
60 the first bytes start to appear. This simplifies the implementation
61 without affecting actual output of sshpeer too much as we rarely issue
61 without affecting actual output of sshpeer too much as we rarely issue
62 large read for data not yet emitted by the server.
62 large read for data not yet emitted by the server.
63
63
64 The main pipe is expected to be a 'bufferedinputpipe' from the util module
64 The main pipe is expected to be a 'bufferedinputpipe' from the util module
65 that handle all the os specific bits. This class lives in this module
65 that handle all the os specific bits. This class lives in this module
66 because it focus on behavior specific to the ssh protocol."""
66 because it focus on behavior specific to the ssh protocol."""
67
67
68 def __init__(self, ui, main, side):
68 def __init__(self, ui, main, side):
69 self._ui = ui
69 self._ui = ui
70 self._main = main
70 self._main = main
71 self._side = side
71 self._side = side
72
72
73 def _wait(self):
73 def _wait(self):
74 """wait until some data are available on main or side
74 """wait until some data are available on main or side
75
75
76 return a pair of boolean (ismainready, issideready)
76 return a pair of boolean (ismainready, issideready)
77
77
78 (This will only wait for data if the setup is supported by `util.poll`)
78 (This will only wait for data if the setup is supported by `util.poll`)
79 """
79 """
80 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
80 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
81 return (True, True) # main has data, assume side is worth poking at.
81 return (True, True) # main has data, assume side is worth poking at.
82 fds = [self._main.fileno(), self._side.fileno()]
82 fds = [self._main.fileno(), self._side.fileno()]
83 try:
83 try:
84 act = util.poll(fds)
84 act = util.poll(fds)
85 except NotImplementedError:
85 except NotImplementedError:
86 # non supported yet case, assume all have data.
86 # non supported yet case, assume all have data.
87 act = fds
87 act = fds
88 return (self._main.fileno() in act, self._side.fileno() in act)
88 return (self._main.fileno() in act, self._side.fileno() in act)
89
89
90 def write(self, data):
90 def write(self, data):
91 return self._call('write', data)
91 return self._call('write', data)
92
92
93 def read(self, size):
93 def read(self, size):
94 return self._call('read', size)
94 r = self._call('read', size)
95 if size != 0 and not r:
96 # We've observed a condition that indicates the
97 # stdout closed unexpectedly. Check stderr one
98 # more time and snag anything that's there before
99 # letting anyone know the main part of the pipe
100 # closed prematurely.
101 _forwardoutput(self._ui, self._side)
102 return r
95
103
96 def readline(self):
104 def readline(self):
97 return self._call('readline')
105 return self._call('readline')
98
106
99 def _call(self, methname, data=None):
107 def _call(self, methname, data=None):
100 """call <methname> on "main", forward output of "side" while blocking
108 """call <methname> on "main", forward output of "side" while blocking
101 """
109 """
102 # data can be '' or 0
110 # data can be '' or 0
103 if (data is not None and not data) or self._main.closed:
111 if (data is not None and not data) or self._main.closed:
104 _forwardoutput(self._ui, self._side)
112 _forwardoutput(self._ui, self._side)
105 return ''
113 return ''
106 while True:
114 while True:
107 mainready, sideready = self._wait()
115 mainready, sideready = self._wait()
108 if sideready:
116 if sideready:
109 _forwardoutput(self._ui, self._side)
117 _forwardoutput(self._ui, self._side)
110 if mainready:
118 if mainready:
111 meth = getattr(self._main, methname)
119 meth = getattr(self._main, methname)
112 if data is None:
120 if data is None:
113 return meth()
121 return meth()
114 else:
122 else:
115 return meth(data)
123 return meth(data)
116
124
117 def close(self):
125 def close(self):
118 return self._main.close()
126 return self._main.close()
119
127
120 def flush(self):
128 def flush(self):
121 return self._main.flush()
129 return self._main.flush()
122
130
123 class sshpeer(wireproto.wirepeer):
131 class sshpeer(wireproto.wirepeer):
124 def __init__(self, ui, path, create=False):
132 def __init__(self, ui, path, create=False):
125 self._url = path
133 self._url = path
126 self.ui = ui
134 self.ui = ui
127 self.pipeo = self.pipei = self.pipee = None
135 self.pipeo = self.pipei = self.pipee = None
128
136
129 u = util.url(path, parsequery=False, parsefragment=False)
137 u = util.url(path, parsequery=False, parsefragment=False)
130 if u.scheme != 'ssh' or not u.host or u.path is None:
138 if u.scheme != 'ssh' or not u.host or u.path is None:
131 self._abort(error.RepoError(_("couldn't parse location %s") % path))
139 self._abort(error.RepoError(_("couldn't parse location %s") % path))
132
140
133 self.user = u.user
141 self.user = u.user
134 if u.passwd is not None:
142 if u.passwd is not None:
135 self._abort(error.RepoError(_("password in URL not supported")))
143 self._abort(error.RepoError(_("password in URL not supported")))
136 self.host = u.host
144 self.host = u.host
137 self.port = u.port
145 self.port = u.port
138 self.path = u.path or "."
146 self.path = u.path or "."
139
147
140 sshcmd = self.ui.config("ui", "ssh", "ssh")
148 sshcmd = self.ui.config("ui", "ssh", "ssh")
141 remotecmd = self.ui.config("ui", "remotecmd", "hg")
149 remotecmd = self.ui.config("ui", "remotecmd", "hg")
142
150
143 args = util.sshargs(sshcmd,
151 args = util.sshargs(sshcmd,
144 _serverquote(self.host),
152 _serverquote(self.host),
145 _serverquote(self.user),
153 _serverquote(self.user),
146 _serverquote(self.port))
154 _serverquote(self.port))
147
155
148 if create:
156 if create:
149 cmd = '%s %s %s' % (sshcmd, args,
157 cmd = '%s %s %s' % (sshcmd, args,
150 util.shellquote("%s init %s" %
158 util.shellquote("%s init %s" %
151 (_serverquote(remotecmd), _serverquote(self.path))))
159 (_serverquote(remotecmd), _serverquote(self.path))))
152 ui.debug('running %s\n' % cmd)
160 ui.debug('running %s\n' % cmd)
153 res = ui.system(cmd, blockedtag='sshpeer')
161 res = ui.system(cmd, blockedtag='sshpeer')
154 if res != 0:
162 if res != 0:
155 self._abort(error.RepoError(_("could not create remote repo")))
163 self._abort(error.RepoError(_("could not create remote repo")))
156
164
157 self._validaterepo(sshcmd, args, remotecmd)
165 self._validaterepo(sshcmd, args, remotecmd)
158
166
159 def url(self):
167 def url(self):
160 return self._url
168 return self._url
161
169
162 def _validaterepo(self, sshcmd, args, remotecmd):
170 def _validaterepo(self, sshcmd, args, remotecmd):
163 # cleanup up previous run
171 # cleanup up previous run
164 self.cleanup()
172 self.cleanup()
165
173
166 cmd = '%s %s %s' % (sshcmd, args,
174 cmd = '%s %s %s' % (sshcmd, args,
167 util.shellquote("%s -R %s serve --stdio" %
175 util.shellquote("%s -R %s serve --stdio" %
168 (_serverquote(remotecmd), _serverquote(self.path))))
176 (_serverquote(remotecmd), _serverquote(self.path))))
169 self.ui.debug('running %s\n' % cmd)
177 self.ui.debug('running %s\n' % cmd)
170 cmd = util.quotecommand(cmd)
178 cmd = util.quotecommand(cmd)
171
179
172 # while self.subprocess isn't used, having it allows the subprocess to
180 # while self.subprocess isn't used, having it allows the subprocess to
173 # to clean up correctly later
181 # to clean up correctly later
174 #
182 #
175 # no buffer allow the use of 'select'
183 # no buffer allow the use of 'select'
176 # feel free to remove buffering and select usage when we ultimately
184 # feel free to remove buffering and select usage when we ultimately
177 # move to threading.
185 # move to threading.
178 sub = util.popen4(cmd, bufsize=0)
186 sub = util.popen4(cmd, bufsize=0)
179 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
187 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
180
188
181 self.pipei = util.bufferedinputpipe(self.pipei)
189 self.pipei = util.bufferedinputpipe(self.pipei)
182 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
190 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
183 self.pipeo = doublepipe(self.ui, self.pipeo, self.pipee)
191 self.pipeo = doublepipe(self.ui, self.pipeo, self.pipee)
184
192
185 # skip any noise generated by remote shell
193 # skip any noise generated by remote shell
186 self._callstream("hello")
194 self._callstream("hello")
187 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
195 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
188 lines = ["", "dummy"]
196 lines = ["", "dummy"]
189 max_noise = 500
197 max_noise = 500
190 while lines[-1] and max_noise:
198 while lines[-1] and max_noise:
191 l = r.readline()
199 l = r.readline()
192 self.readerr()
200 self.readerr()
193 if lines[-1] == "1\n" and l == "\n":
201 if lines[-1] == "1\n" and l == "\n":
194 break
202 break
195 if l:
203 if l:
196 self.ui.debug("remote: ", l)
204 self.ui.debug("remote: ", l)
197 lines.append(l)
205 lines.append(l)
198 max_noise -= 1
206 max_noise -= 1
199 else:
207 else:
200 self._abort(error.RepoError(_('no suitable response from '
208 self._abort(error.RepoError(_('no suitable response from '
201 'remote hg')))
209 'remote hg')))
202
210
203 self._caps = set()
211 self._caps = set()
204 for l in reversed(lines):
212 for l in reversed(lines):
205 if l.startswith("capabilities:"):
213 if l.startswith("capabilities:"):
206 self._caps.update(l[:-1].split(":")[1].split())
214 self._caps.update(l[:-1].split(":")[1].split())
207 break
215 break
208
216
209 def _capabilities(self):
217 def _capabilities(self):
210 return self._caps
218 return self._caps
211
219
212 def readerr(self):
220 def readerr(self):
213 _forwardoutput(self.ui, self.pipee)
221 _forwardoutput(self.ui, self.pipee)
214
222
215 def _abort(self, exception):
223 def _abort(self, exception):
216 self.cleanup()
224 self.cleanup()
217 raise exception
225 raise exception
218
226
219 def cleanup(self):
227 def cleanup(self):
220 if self.pipeo is None:
228 if self.pipeo is None:
221 return
229 return
222 self.pipeo.close()
230 self.pipeo.close()
223 self.pipei.close()
231 self.pipei.close()
224 try:
232 try:
225 # read the error descriptor until EOF
233 # read the error descriptor until EOF
226 for l in self.pipee:
234 for l in self.pipee:
227 self.ui.status(_("remote: "), l)
235 self.ui.status(_("remote: "), l)
228 except (IOError, ValueError):
236 except (IOError, ValueError):
229 pass
237 pass
230 self.pipee.close()
238 self.pipee.close()
231
239
232 __del__ = cleanup
240 __del__ = cleanup
233
241
234 def _submitbatch(self, req):
242 def _submitbatch(self, req):
235 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
243 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
236 available = self._getamount()
244 available = self._getamount()
237 # TODO this response parsing is probably suboptimal for large
245 # TODO this response parsing is probably suboptimal for large
238 # batches with large responses.
246 # batches with large responses.
239 toread = min(available, 1024)
247 toread = min(available, 1024)
240 work = rsp.read(toread)
248 work = rsp.read(toread)
241 available -= toread
249 available -= toread
242 chunk = work
250 chunk = work
243 while chunk:
251 while chunk:
244 while ';' in work:
252 while ';' in work:
245 one, work = work.split(';', 1)
253 one, work = work.split(';', 1)
246 yield wireproto.unescapearg(one)
254 yield wireproto.unescapearg(one)
247 toread = min(available, 1024)
255 toread = min(available, 1024)
248 chunk = rsp.read(toread)
256 chunk = rsp.read(toread)
249 available -= toread
257 available -= toread
250 work += chunk
258 work += chunk
251 yield wireproto.unescapearg(work)
259 yield wireproto.unescapearg(work)
252
260
253 def _callstream(self, cmd, **args):
261 def _callstream(self, cmd, **args):
254 self.ui.debug("sending %s command\n" % cmd)
262 self.ui.debug("sending %s command\n" % cmd)
255 self.pipeo.write("%s\n" % cmd)
263 self.pipeo.write("%s\n" % cmd)
256 _func, names = wireproto.commands[cmd]
264 _func, names = wireproto.commands[cmd]
257 keys = names.split()
265 keys = names.split()
258 wireargs = {}
266 wireargs = {}
259 for k in keys:
267 for k in keys:
260 if k == '*':
268 if k == '*':
261 wireargs['*'] = args
269 wireargs['*'] = args
262 break
270 break
263 else:
271 else:
264 wireargs[k] = args[k]
272 wireargs[k] = args[k]
265 del args[k]
273 del args[k]
266 for k, v in sorted(wireargs.iteritems()):
274 for k, v in sorted(wireargs.iteritems()):
267 self.pipeo.write("%s %d\n" % (k, len(v)))
275 self.pipeo.write("%s %d\n" % (k, len(v)))
268 if isinstance(v, dict):
276 if isinstance(v, dict):
269 for dk, dv in v.iteritems():
277 for dk, dv in v.iteritems():
270 self.pipeo.write("%s %d\n" % (dk, len(dv)))
278 self.pipeo.write("%s %d\n" % (dk, len(dv)))
271 self.pipeo.write(dv)
279 self.pipeo.write(dv)
272 else:
280 else:
273 self.pipeo.write(v)
281 self.pipeo.write(v)
274 self.pipeo.flush()
282 self.pipeo.flush()
275
283
276 return self.pipei
284 return self.pipei
277
285
278 def _callcompressable(self, cmd, **args):
286 def _callcompressable(self, cmd, **args):
279 return self._callstream(cmd, **args)
287 return self._callstream(cmd, **args)
280
288
281 def _call(self, cmd, **args):
289 def _call(self, cmd, **args):
282 self._callstream(cmd, **args)
290 self._callstream(cmd, **args)
283 return self._recv()
291 return self._recv()
284
292
285 def _callpush(self, cmd, fp, **args):
293 def _callpush(self, cmd, fp, **args):
286 r = self._call(cmd, **args)
294 r = self._call(cmd, **args)
287 if r:
295 if r:
288 return '', r
296 return '', r
289 for d in iter(lambda: fp.read(4096), ''):
297 for d in iter(lambda: fp.read(4096), ''):
290 self._send(d)
298 self._send(d)
291 self._send("", flush=True)
299 self._send("", flush=True)
292 r = self._recv()
300 r = self._recv()
293 if r:
301 if r:
294 return '', r
302 return '', r
295 return self._recv(), ''
303 return self._recv(), ''
296
304
297 def _calltwowaystream(self, cmd, fp, **args):
305 def _calltwowaystream(self, cmd, fp, **args):
298 r = self._call(cmd, **args)
306 r = self._call(cmd, **args)
299 if r:
307 if r:
300 # XXX needs to be made better
308 # XXX needs to be made better
301 raise error.Abort(_('unexpected remote reply: %s') % r)
309 raise error.Abort(_('unexpected remote reply: %s') % r)
302 for d in iter(lambda: fp.read(4096), ''):
310 for d in iter(lambda: fp.read(4096), ''):
303 self._send(d)
311 self._send(d)
304 self._send("", flush=True)
312 self._send("", flush=True)
305 return self.pipei
313 return self.pipei
306
314
307 def _getamount(self):
315 def _getamount(self):
308 l = self.pipei.readline()
316 l = self.pipei.readline()
309 if l == '\n':
317 if l == '\n':
310 self.readerr()
318 self.readerr()
311 msg = _('check previous remote output')
319 msg = _('check previous remote output')
312 self._abort(error.OutOfBandError(hint=msg))
320 self._abort(error.OutOfBandError(hint=msg))
313 self.readerr()
321 self.readerr()
314 try:
322 try:
315 return int(l)
323 return int(l)
316 except ValueError:
324 except ValueError:
317 self._abort(error.ResponseError(_("unexpected response:"), l))
325 self._abort(error.ResponseError(_("unexpected response:"), l))
318
326
319 def _recv(self):
327 def _recv(self):
320 return self.pipei.read(self._getamount())
328 return self.pipei.read(self._getamount())
321
329
322 def _send(self, data, flush=False):
330 def _send(self, data, flush=False):
323 self.pipeo.write("%d\n" % len(data))
331 self.pipeo.write("%d\n" % len(data))
324 if data:
332 if data:
325 self.pipeo.write(data)
333 self.pipeo.write(data)
326 if flush:
334 if flush:
327 self.pipeo.flush()
335 self.pipeo.flush()
328 self.readerr()
336 self.readerr()
329
337
330 def lock(self):
338 def lock(self):
331 self._call("lock")
339 self._call("lock")
332 return remotelock(self)
340 return remotelock(self)
333
341
334 def unlock(self):
342 def unlock(self):
335 self._call("unlock")
343 self._call("unlock")
336
344
337 def addchangegroup(self, cg, source, url, lock=None):
345 def addchangegroup(self, cg, source, url, lock=None):
338 '''Send a changegroup to the remote server. Return an integer
346 '''Send a changegroup to the remote server. Return an integer
339 similar to unbundle(). DEPRECATED, since it requires locking the
347 similar to unbundle(). DEPRECATED, since it requires locking the
340 remote.'''
348 remote.'''
341 d = self._call("addchangegroup")
349 d = self._call("addchangegroup")
342 if d:
350 if d:
343 self._abort(error.RepoError(_("push refused: %s") % d))
351 self._abort(error.RepoError(_("push refused: %s") % d))
344 for d in iter(lambda: cg.read(4096), ''):
352 for d in iter(lambda: cg.read(4096), ''):
345 self.pipeo.write(d)
353 self.pipeo.write(d)
346 self.readerr()
354 self.readerr()
347
355
348 self.pipeo.flush()
356 self.pipeo.flush()
349
357
350 self.readerr()
358 self.readerr()
351 r = self._recv()
359 r = self._recv()
352 if not r:
360 if not r:
353 return 1
361 return 1
354 try:
362 try:
355 return int(r)
363 return int(r)
356 except ValueError:
364 except ValueError:
357 self._abort(error.ResponseError(_("unexpected response:"), r))
365 self._abort(error.ResponseError(_("unexpected response:"), r))
358
366
359 instance = sshpeer
367 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now