##// END OF EJS Templates
sshpeer: extract pipe cleanup logic to own function...
Gregory Szorc -
r35951:805edf16 default
parent child Browse files
Show More
@@ -1,379 +1,386 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import re
10 import re
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 error,
14 error,
15 pycompat,
15 pycompat,
16 util,
16 util,
17 wireproto,
17 wireproto,
18 )
18 )
19
19
20 def _serverquote(s):
20 def _serverquote(s):
21 """quote a string for the remote shell ... which we assume is sh"""
21 """quote a string for the remote shell ... which we assume is sh"""
22 if not s:
22 if not s:
23 return s
23 return s
24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
25 return s
25 return s
26 return "'%s'" % s.replace("'", "'\\''")
26 return "'%s'" % s.replace("'", "'\\''")
27
27
28 def _forwardoutput(ui, pipe):
28 def _forwardoutput(ui, pipe):
29 """display all data currently available on pipe as remote output.
29 """display all data currently available on pipe as remote output.
30
30
31 This is non blocking."""
31 This is non blocking."""
32 s = util.readpipe(pipe)
32 s = util.readpipe(pipe)
33 if s:
33 if s:
34 for l in s.splitlines():
34 for l in s.splitlines():
35 ui.status(_("remote: "), l, '\n')
35 ui.status(_("remote: "), l, '\n')
36
36
37 class doublepipe(object):
37 class doublepipe(object):
38 """Operate a side-channel pipe in addition of a main one
38 """Operate a side-channel pipe in addition of a main one
39
39
40 The side-channel pipe contains server output to be forwarded to the user
40 The side-channel pipe contains server output to be forwarded to the user
41 input. The double pipe will behave as the "main" pipe, but will ensure the
41 input. The double pipe will behave as the "main" pipe, but will ensure the
42 content of the "side" pipe is properly processed while we wait for blocking
42 content of the "side" pipe is properly processed while we wait for blocking
43 call on the "main" pipe.
43 call on the "main" pipe.
44
44
45 If large amounts of data are read from "main", the forward will cease after
45 If large amounts of data are read from "main", the forward will cease after
46 the first bytes start to appear. This simplifies the implementation
46 the first bytes start to appear. This simplifies the implementation
47 without affecting actual output of sshpeer too much as we rarely issue
47 without affecting actual output of sshpeer too much as we rarely issue
48 large read for data not yet emitted by the server.
48 large read for data not yet emitted by the server.
49
49
50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
51 that handle all the os specific bits. This class lives in this module
51 that handle all the os specific bits. This class lives in this module
52 because it focus on behavior specific to the ssh protocol."""
52 because it focus on behavior specific to the ssh protocol."""
53
53
54 def __init__(self, ui, main, side):
54 def __init__(self, ui, main, side):
55 self._ui = ui
55 self._ui = ui
56 self._main = main
56 self._main = main
57 self._side = side
57 self._side = side
58
58
59 def _wait(self):
59 def _wait(self):
60 """wait until some data are available on main or side
60 """wait until some data are available on main or side
61
61
62 return a pair of boolean (ismainready, issideready)
62 return a pair of boolean (ismainready, issideready)
63
63
64 (This will only wait for data if the setup is supported by `util.poll`)
64 (This will only wait for data if the setup is supported by `util.poll`)
65 """
65 """
66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
67 return (True, True) # main has data, assume side is worth poking at.
67 return (True, True) # main has data, assume side is worth poking at.
68 fds = [self._main.fileno(), self._side.fileno()]
68 fds = [self._main.fileno(), self._side.fileno()]
69 try:
69 try:
70 act = util.poll(fds)
70 act = util.poll(fds)
71 except NotImplementedError:
71 except NotImplementedError:
72 # non supported yet case, assume all have data.
72 # non supported yet case, assume all have data.
73 act = fds
73 act = fds
74 return (self._main.fileno() in act, self._side.fileno() in act)
74 return (self._main.fileno() in act, self._side.fileno() in act)
75
75
76 def write(self, data):
76 def write(self, data):
77 return self._call('write', data)
77 return self._call('write', data)
78
78
79 def read(self, size):
79 def read(self, size):
80 r = self._call('read', size)
80 r = self._call('read', size)
81 if size != 0 and not r:
81 if size != 0 and not r:
82 # We've observed a condition that indicates the
82 # We've observed a condition that indicates the
83 # stdout closed unexpectedly. Check stderr one
83 # stdout closed unexpectedly. Check stderr one
84 # more time and snag anything that's there before
84 # more time and snag anything that's there before
85 # letting anyone know the main part of the pipe
85 # letting anyone know the main part of the pipe
86 # closed prematurely.
86 # closed prematurely.
87 _forwardoutput(self._ui, self._side)
87 _forwardoutput(self._ui, self._side)
88 return r
88 return r
89
89
90 def readline(self):
90 def readline(self):
91 return self._call('readline')
91 return self._call('readline')
92
92
93 def _call(self, methname, data=None):
93 def _call(self, methname, data=None):
94 """call <methname> on "main", forward output of "side" while blocking
94 """call <methname> on "main", forward output of "side" while blocking
95 """
95 """
96 # data can be '' or 0
96 # data can be '' or 0
97 if (data is not None and not data) or self._main.closed:
97 if (data is not None and not data) or self._main.closed:
98 _forwardoutput(self._ui, self._side)
98 _forwardoutput(self._ui, self._side)
99 return ''
99 return ''
100 while True:
100 while True:
101 mainready, sideready = self._wait()
101 mainready, sideready = self._wait()
102 if sideready:
102 if sideready:
103 _forwardoutput(self._ui, self._side)
103 _forwardoutput(self._ui, self._side)
104 if mainready:
104 if mainready:
105 meth = getattr(self._main, methname)
105 meth = getattr(self._main, methname)
106 if data is None:
106 if data is None:
107 return meth()
107 return meth()
108 else:
108 else:
109 return meth(data)
109 return meth(data)
110
110
111 def close(self):
111 def close(self):
112 return self._main.close()
112 return self._main.close()
113
113
114 def flush(self):
114 def flush(self):
115 return self._main.flush()
115 return self._main.flush()
116
116
117 def _cleanuppipes(ui, pipei, pipeo, pipee):
118 """Clean up pipes used by an SSH connection."""
119 if pipeo:
120 pipeo.close()
121 if pipei:
122 pipei.close()
123
124 if pipee:
125 # Try to read from the err descriptor until EOF.
126 try:
127 for l in pipee:
128 ui.status(_('remote: '), l)
129 except (IOError, ValueError):
130 pass
131
132 pipee.close()
133
117 class sshpeer(wireproto.wirepeer):
134 class sshpeer(wireproto.wirepeer):
118 def __init__(self, ui, path, create=False, sshstate=None):
135 def __init__(self, ui, path, create=False, sshstate=None):
119 self._url = path
136 self._url = path
120 self._ui = ui
137 self._ui = ui
121 self._pipeo = self._pipei = self._pipee = None
138 self._pipeo = self._pipei = self._pipee = None
122
139
123 u = util.url(path, parsequery=False, parsefragment=False)
140 u = util.url(path, parsequery=False, parsefragment=False)
124 self._path = u.path or '.'
141 self._path = u.path or '.'
125
142
126 self._validaterepo(*sshstate)
143 self._validaterepo(*sshstate)
127
144
128 # Begin of _basepeer interface.
145 # Begin of _basepeer interface.
129
146
130 @util.propertycache
147 @util.propertycache
131 def ui(self):
148 def ui(self):
132 return self._ui
149 return self._ui
133
150
134 def url(self):
151 def url(self):
135 return self._url
152 return self._url
136
153
137 def local(self):
154 def local(self):
138 return None
155 return None
139
156
140 def peer(self):
157 def peer(self):
141 return self
158 return self
142
159
143 def canpush(self):
160 def canpush(self):
144 return True
161 return True
145
162
146 def close(self):
163 def close(self):
147 pass
164 pass
148
165
149 # End of _basepeer interface.
166 # End of _basepeer interface.
150
167
151 # Begin of _basewirecommands interface.
168 # Begin of _basewirecommands interface.
152
169
153 def capabilities(self):
170 def capabilities(self):
154 return self._caps
171 return self._caps
155
172
156 # End of _basewirecommands interface.
173 # End of _basewirecommands interface.
157
174
158 def _validaterepo(self, sshcmd, args, remotecmd, sshenv=None):
175 def _validaterepo(self, sshcmd, args, remotecmd, sshenv=None):
159 # cleanup up previous run
176 # cleanup up previous run
160 self._cleanup()
177 self._cleanup()
161
178
162 cmd = '%s %s %s' % (sshcmd, args,
179 cmd = '%s %s %s' % (sshcmd, args,
163 util.shellquote("%s -R %s serve --stdio" %
180 util.shellquote("%s -R %s serve --stdio" %
164 (_serverquote(remotecmd), _serverquote(self._path))))
181 (_serverquote(remotecmd), _serverquote(self._path))))
165 self.ui.debug('running %s\n' % cmd)
182 self.ui.debug('running %s\n' % cmd)
166 cmd = util.quotecommand(cmd)
183 cmd = util.quotecommand(cmd)
167
184
168 # while self._subprocess isn't used, having it allows the subprocess to
185 # while self._subprocess isn't used, having it allows the subprocess to
169 # to clean up correctly later
186 # to clean up correctly later
170 #
187 #
171 # no buffer allow the use of 'select'
188 # no buffer allow the use of 'select'
172 # feel free to remove buffering and select usage when we ultimately
189 # feel free to remove buffering and select usage when we ultimately
173 # move to threading.
190 # move to threading.
174 sub = util.popen4(cmd, bufsize=0, env=sshenv)
191 sub = util.popen4(cmd, bufsize=0, env=sshenv)
175 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
192 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
176
193
177 self._pipei = util.bufferedinputpipe(self._pipei)
194 self._pipei = util.bufferedinputpipe(self._pipei)
178 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
195 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
179 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
196 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
180
197
181 def badresponse():
198 def badresponse():
182 msg = _("no suitable response from remote hg")
199 msg = _("no suitable response from remote hg")
183 hint = self.ui.config("ui", "ssherrorhint")
200 hint = self.ui.config("ui", "ssherrorhint")
184 self._abort(error.RepoError(msg, hint=hint))
201 self._abort(error.RepoError(msg, hint=hint))
185
202
186 try:
203 try:
187 # skip any noise generated by remote shell
204 # skip any noise generated by remote shell
188 self._callstream("hello")
205 self._callstream("hello")
189 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
206 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
190 except IOError:
207 except IOError:
191 badresponse()
208 badresponse()
192
209
193 lines = ["", "dummy"]
210 lines = ["", "dummy"]
194 max_noise = 500
211 max_noise = 500
195 while lines[-1] and max_noise:
212 while lines[-1] and max_noise:
196 try:
213 try:
197 l = r.readline()
214 l = r.readline()
198 self._readerr()
215 self._readerr()
199 if lines[-1] == "1\n" and l == "\n":
216 if lines[-1] == "1\n" and l == "\n":
200 break
217 break
201 if l:
218 if l:
202 self.ui.debug("remote: ", l)
219 self.ui.debug("remote: ", l)
203 lines.append(l)
220 lines.append(l)
204 max_noise -= 1
221 max_noise -= 1
205 except IOError:
222 except IOError:
206 badresponse()
223 badresponse()
207 else:
224 else:
208 badresponse()
225 badresponse()
209
226
210 self._caps = set()
227 self._caps = set()
211 for l in reversed(lines):
228 for l in reversed(lines):
212 if l.startswith("capabilities:"):
229 if l.startswith("capabilities:"):
213 self._caps.update(l[:-1].split(":")[1].split())
230 self._caps.update(l[:-1].split(":")[1].split())
214 break
231 break
215
232
216 def _readerr(self):
233 def _readerr(self):
217 _forwardoutput(self.ui, self._pipee)
234 _forwardoutput(self.ui, self._pipee)
218
235
219 def _abort(self, exception):
236 def _abort(self, exception):
220 self._cleanup()
237 self._cleanup()
221 raise exception
238 raise exception
222
239
223 def _cleanup(self):
240 def _cleanup(self):
224 if self._pipeo is None:
241 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
225 return
226 self._pipeo.close()
227 self._pipei.close()
228 try:
229 # read the error descriptor until EOF
230 for l in self._pipee:
231 self.ui.status(_("remote: "), l)
232 except (IOError, ValueError):
233 pass
234 self._pipee.close()
235
242
236 __del__ = _cleanup
243 __del__ = _cleanup
237
244
238 def _submitbatch(self, req):
245 def _submitbatch(self, req):
239 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
246 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
240 available = self._getamount()
247 available = self._getamount()
241 # TODO this response parsing is probably suboptimal for large
248 # TODO this response parsing is probably suboptimal for large
242 # batches with large responses.
249 # batches with large responses.
243 toread = min(available, 1024)
250 toread = min(available, 1024)
244 work = rsp.read(toread)
251 work = rsp.read(toread)
245 available -= toread
252 available -= toread
246 chunk = work
253 chunk = work
247 while chunk:
254 while chunk:
248 while ';' in work:
255 while ';' in work:
249 one, work = work.split(';', 1)
256 one, work = work.split(';', 1)
250 yield wireproto.unescapearg(one)
257 yield wireproto.unescapearg(one)
251 toread = min(available, 1024)
258 toread = min(available, 1024)
252 chunk = rsp.read(toread)
259 chunk = rsp.read(toread)
253 available -= toread
260 available -= toread
254 work += chunk
261 work += chunk
255 yield wireproto.unescapearg(work)
262 yield wireproto.unescapearg(work)
256
263
257 def _callstream(self, cmd, **args):
264 def _callstream(self, cmd, **args):
258 args = pycompat.byteskwargs(args)
265 args = pycompat.byteskwargs(args)
259 if (self.ui.debugflag
266 if (self.ui.debugflag
260 and self.ui.configbool('devel', 'debug.peer-request')):
267 and self.ui.configbool('devel', 'debug.peer-request')):
261 dbg = self.ui.debug
268 dbg = self.ui.debug
262 line = 'devel-peer-request: %s\n'
269 line = 'devel-peer-request: %s\n'
263 dbg(line % cmd)
270 dbg(line % cmd)
264 for key, value in sorted(args.items()):
271 for key, value in sorted(args.items()):
265 if not isinstance(value, dict):
272 if not isinstance(value, dict):
266 dbg(line % ' %s: %d bytes' % (key, len(value)))
273 dbg(line % ' %s: %d bytes' % (key, len(value)))
267 else:
274 else:
268 for dk, dv in sorted(value.items()):
275 for dk, dv in sorted(value.items()):
269 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
276 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
270 self.ui.debug("sending %s command\n" % cmd)
277 self.ui.debug("sending %s command\n" % cmd)
271 self._pipeo.write("%s\n" % cmd)
278 self._pipeo.write("%s\n" % cmd)
272 _func, names = wireproto.commands[cmd]
279 _func, names = wireproto.commands[cmd]
273 keys = names.split()
280 keys = names.split()
274 wireargs = {}
281 wireargs = {}
275 for k in keys:
282 for k in keys:
276 if k == '*':
283 if k == '*':
277 wireargs['*'] = args
284 wireargs['*'] = args
278 break
285 break
279 else:
286 else:
280 wireargs[k] = args[k]
287 wireargs[k] = args[k]
281 del args[k]
288 del args[k]
282 for k, v in sorted(wireargs.iteritems()):
289 for k, v in sorted(wireargs.iteritems()):
283 self._pipeo.write("%s %d\n" % (k, len(v)))
290 self._pipeo.write("%s %d\n" % (k, len(v)))
284 if isinstance(v, dict):
291 if isinstance(v, dict):
285 for dk, dv in v.iteritems():
292 for dk, dv in v.iteritems():
286 self._pipeo.write("%s %d\n" % (dk, len(dv)))
293 self._pipeo.write("%s %d\n" % (dk, len(dv)))
287 self._pipeo.write(dv)
294 self._pipeo.write(dv)
288 else:
295 else:
289 self._pipeo.write(v)
296 self._pipeo.write(v)
290 self._pipeo.flush()
297 self._pipeo.flush()
291
298
292 return self._pipei
299 return self._pipei
293
300
294 def _callcompressable(self, cmd, **args):
301 def _callcompressable(self, cmd, **args):
295 return self._callstream(cmd, **args)
302 return self._callstream(cmd, **args)
296
303
297 def _call(self, cmd, **args):
304 def _call(self, cmd, **args):
298 self._callstream(cmd, **args)
305 self._callstream(cmd, **args)
299 return self._recv()
306 return self._recv()
300
307
301 def _callpush(self, cmd, fp, **args):
308 def _callpush(self, cmd, fp, **args):
302 r = self._call(cmd, **args)
309 r = self._call(cmd, **args)
303 if r:
310 if r:
304 return '', r
311 return '', r
305 for d in iter(lambda: fp.read(4096), ''):
312 for d in iter(lambda: fp.read(4096), ''):
306 self._send(d)
313 self._send(d)
307 self._send("", flush=True)
314 self._send("", flush=True)
308 r = self._recv()
315 r = self._recv()
309 if r:
316 if r:
310 return '', r
317 return '', r
311 return self._recv(), ''
318 return self._recv(), ''
312
319
313 def _calltwowaystream(self, cmd, fp, **args):
320 def _calltwowaystream(self, cmd, fp, **args):
314 r = self._call(cmd, **args)
321 r = self._call(cmd, **args)
315 if r:
322 if r:
316 # XXX needs to be made better
323 # XXX needs to be made better
317 raise error.Abort(_('unexpected remote reply: %s') % r)
324 raise error.Abort(_('unexpected remote reply: %s') % r)
318 for d in iter(lambda: fp.read(4096), ''):
325 for d in iter(lambda: fp.read(4096), ''):
319 self._send(d)
326 self._send(d)
320 self._send("", flush=True)
327 self._send("", flush=True)
321 return self._pipei
328 return self._pipei
322
329
323 def _getamount(self):
330 def _getamount(self):
324 l = self._pipei.readline()
331 l = self._pipei.readline()
325 if l == '\n':
332 if l == '\n':
326 self._readerr()
333 self._readerr()
327 msg = _('check previous remote output')
334 msg = _('check previous remote output')
328 self._abort(error.OutOfBandError(hint=msg))
335 self._abort(error.OutOfBandError(hint=msg))
329 self._readerr()
336 self._readerr()
330 try:
337 try:
331 return int(l)
338 return int(l)
332 except ValueError:
339 except ValueError:
333 self._abort(error.ResponseError(_("unexpected response:"), l))
340 self._abort(error.ResponseError(_("unexpected response:"), l))
334
341
335 def _recv(self):
342 def _recv(self):
336 return self._pipei.read(self._getamount())
343 return self._pipei.read(self._getamount())
337
344
338 def _send(self, data, flush=False):
345 def _send(self, data, flush=False):
339 self._pipeo.write("%d\n" % len(data))
346 self._pipeo.write("%d\n" % len(data))
340 if data:
347 if data:
341 self._pipeo.write(data)
348 self._pipeo.write(data)
342 if flush:
349 if flush:
343 self._pipeo.flush()
350 self._pipeo.flush()
344 self._readerr()
351 self._readerr()
345
352
346 def instance(ui, path, create):
353 def instance(ui, path, create):
347 """Create an SSH peer.
354 """Create an SSH peer.
348
355
349 The returned object conforms to the ``wireproto.wirepeer`` interface.
356 The returned object conforms to the ``wireproto.wirepeer`` interface.
350 """
357 """
351 u = util.url(path, parsequery=False, parsefragment=False)
358 u = util.url(path, parsequery=False, parsefragment=False)
352 if u.scheme != 'ssh' or not u.host or u.path is None:
359 if u.scheme != 'ssh' or not u.host or u.path is None:
353 raise error.RepoError(_("couldn't parse location %s") % path)
360 raise error.RepoError(_("couldn't parse location %s") % path)
354
361
355 util.checksafessh(path)
362 util.checksafessh(path)
356
363
357 if u.passwd is not None:
364 if u.passwd is not None:
358 raise error.RepoError(_('password in URL not supported'))
365 raise error.RepoError(_('password in URL not supported'))
359
366
360 sshcmd = ui.config('ui', 'ssh')
367 sshcmd = ui.config('ui', 'ssh')
361 remotecmd = ui.config('ui', 'remotecmd')
368 remotecmd = ui.config('ui', 'remotecmd')
362 sshaddenv = dict(ui.configitems('sshenv'))
369 sshaddenv = dict(ui.configitems('sshenv'))
363 sshenv = util.shellenviron(sshaddenv)
370 sshenv = util.shellenviron(sshaddenv)
364 remotepath = u.path or '.'
371 remotepath = u.path or '.'
365
372
366 args = util.sshargs(sshcmd, u.host, u.user, u.port)
373 args = util.sshargs(sshcmd, u.host, u.user, u.port)
367
374
368 if create:
375 if create:
369 cmd = '%s %s %s' % (sshcmd, args,
376 cmd = '%s %s %s' % (sshcmd, args,
370 util.shellquote('%s init %s' %
377 util.shellquote('%s init %s' %
371 (_serverquote(remotecmd), _serverquote(remotepath))))
378 (_serverquote(remotecmd), _serverquote(remotepath))))
372 ui.debug('running %s\n' % cmd)
379 ui.debug('running %s\n' % cmd)
373 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
380 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
374 if res != 0:
381 if res != 0:
375 raise error.RepoError(_('could not create remote repo'))
382 raise error.RepoError(_('could not create remote repo'))
376
383
377 sshstate = (sshcmd, args, remotecmd, sshenv)
384 sshstate = (sshcmd, args, remotecmd, sshenv)
378
385
379 return sshpeer(ui, path, create=create, sshstate=sshstate)
386 return sshpeer(ui, path, create=create, sshstate=sshstate)
General Comments 0
You need to be logged in to leave comments. Login now