##// END OF EJS Templates
sshpeer: remove frivolous call to _cleanup()...
Gregory Szorc -
r35952:94ba2993 default
parent child Browse files
Show More
@@ -1,386 +1,385 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import re
10 import re
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 error,
14 error,
15 pycompat,
15 pycompat,
16 util,
16 util,
17 wireproto,
17 wireproto,
18 )
18 )
19
19
20 def _serverquote(s):
20 def _serverquote(s):
21 """quote a string for the remote shell ... which we assume is sh"""
21 """quote a string for the remote shell ... which we assume is sh"""
22 if not s:
22 if not s:
23 return s
23 return s
24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
25 return s
25 return s
26 return "'%s'" % s.replace("'", "'\\''")
26 return "'%s'" % s.replace("'", "'\\''")
27
27
28 def _forwardoutput(ui, pipe):
28 def _forwardoutput(ui, pipe):
29 """display all data currently available on pipe as remote output.
29 """display all data currently available on pipe as remote output.
30
30
31 This is non blocking."""
31 This is non blocking."""
32 s = util.readpipe(pipe)
32 s = util.readpipe(pipe)
33 if s:
33 if s:
34 for l in s.splitlines():
34 for l in s.splitlines():
35 ui.status(_("remote: "), l, '\n')
35 ui.status(_("remote: "), l, '\n')
36
36
37 class doublepipe(object):
37 class doublepipe(object):
38 """Operate a side-channel pipe in addition of a main one
38 """Operate a side-channel pipe in addition of a main one
39
39
40 The side-channel pipe contains server output to be forwarded to the user
40 The side-channel pipe contains server output to be forwarded to the user
41 input. The double pipe will behave as the "main" pipe, but will ensure the
41 input. The double pipe will behave as the "main" pipe, but will ensure the
42 content of the "side" pipe is properly processed while we wait for blocking
42 content of the "side" pipe is properly processed while we wait for blocking
43 call on the "main" pipe.
43 call on the "main" pipe.
44
44
45 If large amounts of data are read from "main", the forward will cease after
45 If large amounts of data are read from "main", the forward will cease after
46 the first bytes start to appear. This simplifies the implementation
46 the first bytes start to appear. This simplifies the implementation
47 without affecting actual output of sshpeer too much as we rarely issue
47 without affecting actual output of sshpeer too much as we rarely issue
48 large read for data not yet emitted by the server.
48 large read for data not yet emitted by the server.
49
49
50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
51 that handle all the os specific bits. This class lives in this module
51 that handle all the os specific bits. This class lives in this module
52 because it focus on behavior specific to the ssh protocol."""
52 because it focus on behavior specific to the ssh protocol."""
53
53
54 def __init__(self, ui, main, side):
54 def __init__(self, ui, main, side):
55 self._ui = ui
55 self._ui = ui
56 self._main = main
56 self._main = main
57 self._side = side
57 self._side = side
58
58
59 def _wait(self):
59 def _wait(self):
60 """wait until some data are available on main or side
60 """wait until some data are available on main or side
61
61
62 return a pair of boolean (ismainready, issideready)
62 return a pair of boolean (ismainready, issideready)
63
63
64 (This will only wait for data if the setup is supported by `util.poll`)
64 (This will only wait for data if the setup is supported by `util.poll`)
65 """
65 """
66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
67 return (True, True) # main has data, assume side is worth poking at.
67 return (True, True) # main has data, assume side is worth poking at.
68 fds = [self._main.fileno(), self._side.fileno()]
68 fds = [self._main.fileno(), self._side.fileno()]
69 try:
69 try:
70 act = util.poll(fds)
70 act = util.poll(fds)
71 except NotImplementedError:
71 except NotImplementedError:
72 # non supported yet case, assume all have data.
72 # non supported yet case, assume all have data.
73 act = fds
73 act = fds
74 return (self._main.fileno() in act, self._side.fileno() in act)
74 return (self._main.fileno() in act, self._side.fileno() in act)
75
75
76 def write(self, data):
76 def write(self, data):
77 return self._call('write', data)
77 return self._call('write', data)
78
78
79 def read(self, size):
79 def read(self, size):
80 r = self._call('read', size)
80 r = self._call('read', size)
81 if size != 0 and not r:
81 if size != 0 and not r:
82 # We've observed a condition that indicates the
82 # We've observed a condition that indicates the
83 # stdout closed unexpectedly. Check stderr one
83 # stdout closed unexpectedly. Check stderr one
84 # more time and snag anything that's there before
84 # more time and snag anything that's there before
85 # letting anyone know the main part of the pipe
85 # letting anyone know the main part of the pipe
86 # closed prematurely.
86 # closed prematurely.
87 _forwardoutput(self._ui, self._side)
87 _forwardoutput(self._ui, self._side)
88 return r
88 return r
89
89
90 def readline(self):
90 def readline(self):
91 return self._call('readline')
91 return self._call('readline')
92
92
93 def _call(self, methname, data=None):
93 def _call(self, methname, data=None):
94 """call <methname> on "main", forward output of "side" while blocking
94 """call <methname> on "main", forward output of "side" while blocking
95 """
95 """
96 # data can be '' or 0
96 # data can be '' or 0
97 if (data is not None and not data) or self._main.closed:
97 if (data is not None and not data) or self._main.closed:
98 _forwardoutput(self._ui, self._side)
98 _forwardoutput(self._ui, self._side)
99 return ''
99 return ''
100 while True:
100 while True:
101 mainready, sideready = self._wait()
101 mainready, sideready = self._wait()
102 if sideready:
102 if sideready:
103 _forwardoutput(self._ui, self._side)
103 _forwardoutput(self._ui, self._side)
104 if mainready:
104 if mainready:
105 meth = getattr(self._main, methname)
105 meth = getattr(self._main, methname)
106 if data is None:
106 if data is None:
107 return meth()
107 return meth()
108 else:
108 else:
109 return meth(data)
109 return meth(data)
110
110
111 def close(self):
111 def close(self):
112 return self._main.close()
112 return self._main.close()
113
113
114 def flush(self):
114 def flush(self):
115 return self._main.flush()
115 return self._main.flush()
116
116
117 def _cleanuppipes(ui, pipei, pipeo, pipee):
117 def _cleanuppipes(ui, pipei, pipeo, pipee):
118 """Clean up pipes used by an SSH connection."""
118 """Clean up pipes used by an SSH connection."""
119 if pipeo:
119 if pipeo:
120 pipeo.close()
120 pipeo.close()
121 if pipei:
121 if pipei:
122 pipei.close()
122 pipei.close()
123
123
124 if pipee:
124 if pipee:
125 # Try to read from the err descriptor until EOF.
125 # Try to read from the err descriptor until EOF.
126 try:
126 try:
127 for l in pipee:
127 for l in pipee:
128 ui.status(_('remote: '), l)
128 ui.status(_('remote: '), l)
129 except (IOError, ValueError):
129 except (IOError, ValueError):
130 pass
130 pass
131
131
132 pipee.close()
132 pipee.close()
133
133
134 class sshpeer(wireproto.wirepeer):
134 class sshpeer(wireproto.wirepeer):
135 def __init__(self, ui, path, create=False, sshstate=None):
135 def __init__(self, ui, path, create=False, sshstate=None):
136 self._url = path
136 self._url = path
137 self._ui = ui
137 self._ui = ui
138 self._pipeo = self._pipei = self._pipee = None
138 self._pipeo = self._pipei = self._pipee = None
139
139
140 u = util.url(path, parsequery=False, parsefragment=False)
140 u = util.url(path, parsequery=False, parsefragment=False)
141 self._path = u.path or '.'
141 self._path = u.path or '.'
142
142
143 self._validaterepo(*sshstate)
143 self._validaterepo(*sshstate)
144
144
145 # Begin of _basepeer interface.
145 # Begin of _basepeer interface.
146
146
147 @util.propertycache
147 @util.propertycache
148 def ui(self):
148 def ui(self):
149 return self._ui
149 return self._ui
150
150
151 def url(self):
151 def url(self):
152 return self._url
152 return self._url
153
153
154 def local(self):
154 def local(self):
155 return None
155 return None
156
156
157 def peer(self):
157 def peer(self):
158 return self
158 return self
159
159
160 def canpush(self):
160 def canpush(self):
161 return True
161 return True
162
162
163 def close(self):
163 def close(self):
164 pass
164 pass
165
165
166 # End of _basepeer interface.
166 # End of _basepeer interface.
167
167
168 # Begin of _basewirecommands interface.
168 # Begin of _basewirecommands interface.
169
169
170 def capabilities(self):
170 def capabilities(self):
171 return self._caps
171 return self._caps
172
172
173 # End of _basewirecommands interface.
173 # End of _basewirecommands interface.
174
174
175 def _validaterepo(self, sshcmd, args, remotecmd, sshenv=None):
175 def _validaterepo(self, sshcmd, args, remotecmd, sshenv=None):
176 # cleanup up previous run
176 assert self._pipei is None
177 self._cleanup()
178
177
179 cmd = '%s %s %s' % (sshcmd, args,
178 cmd = '%s %s %s' % (sshcmd, args,
180 util.shellquote("%s -R %s serve --stdio" %
179 util.shellquote("%s -R %s serve --stdio" %
181 (_serverquote(remotecmd), _serverquote(self._path))))
180 (_serverquote(remotecmd), _serverquote(self._path))))
182 self.ui.debug('running %s\n' % cmd)
181 self.ui.debug('running %s\n' % cmd)
183 cmd = util.quotecommand(cmd)
182 cmd = util.quotecommand(cmd)
184
183
185 # while self._subprocess isn't used, having it allows the subprocess to
184 # while self._subprocess isn't used, having it allows the subprocess to
186 # to clean up correctly later
185 # to clean up correctly later
187 #
186 #
188 # no buffer allow the use of 'select'
187 # no buffer allow the use of 'select'
189 # feel free to remove buffering and select usage when we ultimately
188 # feel free to remove buffering and select usage when we ultimately
190 # move to threading.
189 # move to threading.
191 sub = util.popen4(cmd, bufsize=0, env=sshenv)
190 sub = util.popen4(cmd, bufsize=0, env=sshenv)
192 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
191 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
193
192
194 self._pipei = util.bufferedinputpipe(self._pipei)
193 self._pipei = util.bufferedinputpipe(self._pipei)
195 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
194 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
196 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
195 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
197
196
198 def badresponse():
197 def badresponse():
199 msg = _("no suitable response from remote hg")
198 msg = _("no suitable response from remote hg")
200 hint = self.ui.config("ui", "ssherrorhint")
199 hint = self.ui.config("ui", "ssherrorhint")
201 self._abort(error.RepoError(msg, hint=hint))
200 self._abort(error.RepoError(msg, hint=hint))
202
201
203 try:
202 try:
204 # skip any noise generated by remote shell
203 # skip any noise generated by remote shell
205 self._callstream("hello")
204 self._callstream("hello")
206 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
205 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
207 except IOError:
206 except IOError:
208 badresponse()
207 badresponse()
209
208
210 lines = ["", "dummy"]
209 lines = ["", "dummy"]
211 max_noise = 500
210 max_noise = 500
212 while lines[-1] and max_noise:
211 while lines[-1] and max_noise:
213 try:
212 try:
214 l = r.readline()
213 l = r.readline()
215 self._readerr()
214 self._readerr()
216 if lines[-1] == "1\n" and l == "\n":
215 if lines[-1] == "1\n" and l == "\n":
217 break
216 break
218 if l:
217 if l:
219 self.ui.debug("remote: ", l)
218 self.ui.debug("remote: ", l)
220 lines.append(l)
219 lines.append(l)
221 max_noise -= 1
220 max_noise -= 1
222 except IOError:
221 except IOError:
223 badresponse()
222 badresponse()
224 else:
223 else:
225 badresponse()
224 badresponse()
226
225
227 self._caps = set()
226 self._caps = set()
228 for l in reversed(lines):
227 for l in reversed(lines):
229 if l.startswith("capabilities:"):
228 if l.startswith("capabilities:"):
230 self._caps.update(l[:-1].split(":")[1].split())
229 self._caps.update(l[:-1].split(":")[1].split())
231 break
230 break
232
231
233 def _readerr(self):
232 def _readerr(self):
234 _forwardoutput(self.ui, self._pipee)
233 _forwardoutput(self.ui, self._pipee)
235
234
236 def _abort(self, exception):
235 def _abort(self, exception):
237 self._cleanup()
236 self._cleanup()
238 raise exception
237 raise exception
239
238
240 def _cleanup(self):
239 def _cleanup(self):
241 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
240 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
242
241
243 __del__ = _cleanup
242 __del__ = _cleanup
244
243
245 def _submitbatch(self, req):
244 def _submitbatch(self, req):
246 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
245 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
247 available = self._getamount()
246 available = self._getamount()
248 # TODO this response parsing is probably suboptimal for large
247 # TODO this response parsing is probably suboptimal for large
249 # batches with large responses.
248 # batches with large responses.
250 toread = min(available, 1024)
249 toread = min(available, 1024)
251 work = rsp.read(toread)
250 work = rsp.read(toread)
252 available -= toread
251 available -= toread
253 chunk = work
252 chunk = work
254 while chunk:
253 while chunk:
255 while ';' in work:
254 while ';' in work:
256 one, work = work.split(';', 1)
255 one, work = work.split(';', 1)
257 yield wireproto.unescapearg(one)
256 yield wireproto.unescapearg(one)
258 toread = min(available, 1024)
257 toread = min(available, 1024)
259 chunk = rsp.read(toread)
258 chunk = rsp.read(toread)
260 available -= toread
259 available -= toread
261 work += chunk
260 work += chunk
262 yield wireproto.unescapearg(work)
261 yield wireproto.unescapearg(work)
263
262
264 def _callstream(self, cmd, **args):
263 def _callstream(self, cmd, **args):
265 args = pycompat.byteskwargs(args)
264 args = pycompat.byteskwargs(args)
266 if (self.ui.debugflag
265 if (self.ui.debugflag
267 and self.ui.configbool('devel', 'debug.peer-request')):
266 and self.ui.configbool('devel', 'debug.peer-request')):
268 dbg = self.ui.debug
267 dbg = self.ui.debug
269 line = 'devel-peer-request: %s\n'
268 line = 'devel-peer-request: %s\n'
270 dbg(line % cmd)
269 dbg(line % cmd)
271 for key, value in sorted(args.items()):
270 for key, value in sorted(args.items()):
272 if not isinstance(value, dict):
271 if not isinstance(value, dict):
273 dbg(line % ' %s: %d bytes' % (key, len(value)))
272 dbg(line % ' %s: %d bytes' % (key, len(value)))
274 else:
273 else:
275 for dk, dv in sorted(value.items()):
274 for dk, dv in sorted(value.items()):
276 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
275 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
277 self.ui.debug("sending %s command\n" % cmd)
276 self.ui.debug("sending %s command\n" % cmd)
278 self._pipeo.write("%s\n" % cmd)
277 self._pipeo.write("%s\n" % cmd)
279 _func, names = wireproto.commands[cmd]
278 _func, names = wireproto.commands[cmd]
280 keys = names.split()
279 keys = names.split()
281 wireargs = {}
280 wireargs = {}
282 for k in keys:
281 for k in keys:
283 if k == '*':
282 if k == '*':
284 wireargs['*'] = args
283 wireargs['*'] = args
285 break
284 break
286 else:
285 else:
287 wireargs[k] = args[k]
286 wireargs[k] = args[k]
288 del args[k]
287 del args[k]
289 for k, v in sorted(wireargs.iteritems()):
288 for k, v in sorted(wireargs.iteritems()):
290 self._pipeo.write("%s %d\n" % (k, len(v)))
289 self._pipeo.write("%s %d\n" % (k, len(v)))
291 if isinstance(v, dict):
290 if isinstance(v, dict):
292 for dk, dv in v.iteritems():
291 for dk, dv in v.iteritems():
293 self._pipeo.write("%s %d\n" % (dk, len(dv)))
292 self._pipeo.write("%s %d\n" % (dk, len(dv)))
294 self._pipeo.write(dv)
293 self._pipeo.write(dv)
295 else:
294 else:
296 self._pipeo.write(v)
295 self._pipeo.write(v)
297 self._pipeo.flush()
296 self._pipeo.flush()
298
297
299 return self._pipei
298 return self._pipei
300
299
301 def _callcompressable(self, cmd, **args):
300 def _callcompressable(self, cmd, **args):
302 return self._callstream(cmd, **args)
301 return self._callstream(cmd, **args)
303
302
304 def _call(self, cmd, **args):
303 def _call(self, cmd, **args):
305 self._callstream(cmd, **args)
304 self._callstream(cmd, **args)
306 return self._recv()
305 return self._recv()
307
306
308 def _callpush(self, cmd, fp, **args):
307 def _callpush(self, cmd, fp, **args):
309 r = self._call(cmd, **args)
308 r = self._call(cmd, **args)
310 if r:
309 if r:
311 return '', r
310 return '', r
312 for d in iter(lambda: fp.read(4096), ''):
311 for d in iter(lambda: fp.read(4096), ''):
313 self._send(d)
312 self._send(d)
314 self._send("", flush=True)
313 self._send("", flush=True)
315 r = self._recv()
314 r = self._recv()
316 if r:
315 if r:
317 return '', r
316 return '', r
318 return self._recv(), ''
317 return self._recv(), ''
319
318
320 def _calltwowaystream(self, cmd, fp, **args):
319 def _calltwowaystream(self, cmd, fp, **args):
321 r = self._call(cmd, **args)
320 r = self._call(cmd, **args)
322 if r:
321 if r:
323 # XXX needs to be made better
322 # XXX needs to be made better
324 raise error.Abort(_('unexpected remote reply: %s') % r)
323 raise error.Abort(_('unexpected remote reply: %s') % r)
325 for d in iter(lambda: fp.read(4096), ''):
324 for d in iter(lambda: fp.read(4096), ''):
326 self._send(d)
325 self._send(d)
327 self._send("", flush=True)
326 self._send("", flush=True)
328 return self._pipei
327 return self._pipei
329
328
330 def _getamount(self):
329 def _getamount(self):
331 l = self._pipei.readline()
330 l = self._pipei.readline()
332 if l == '\n':
331 if l == '\n':
333 self._readerr()
332 self._readerr()
334 msg = _('check previous remote output')
333 msg = _('check previous remote output')
335 self._abort(error.OutOfBandError(hint=msg))
334 self._abort(error.OutOfBandError(hint=msg))
336 self._readerr()
335 self._readerr()
337 try:
336 try:
338 return int(l)
337 return int(l)
339 except ValueError:
338 except ValueError:
340 self._abort(error.ResponseError(_("unexpected response:"), l))
339 self._abort(error.ResponseError(_("unexpected response:"), l))
341
340
342 def _recv(self):
341 def _recv(self):
343 return self._pipei.read(self._getamount())
342 return self._pipei.read(self._getamount())
344
343
345 def _send(self, data, flush=False):
344 def _send(self, data, flush=False):
346 self._pipeo.write("%d\n" % len(data))
345 self._pipeo.write("%d\n" % len(data))
347 if data:
346 if data:
348 self._pipeo.write(data)
347 self._pipeo.write(data)
349 if flush:
348 if flush:
350 self._pipeo.flush()
349 self._pipeo.flush()
351 self._readerr()
350 self._readerr()
352
351
353 def instance(ui, path, create):
352 def instance(ui, path, create):
354 """Create an SSH peer.
353 """Create an SSH peer.
355
354
356 The returned object conforms to the ``wireproto.wirepeer`` interface.
355 The returned object conforms to the ``wireproto.wirepeer`` interface.
357 """
356 """
358 u = util.url(path, parsequery=False, parsefragment=False)
357 u = util.url(path, parsequery=False, parsefragment=False)
359 if u.scheme != 'ssh' or not u.host or u.path is None:
358 if u.scheme != 'ssh' or not u.host or u.path is None:
360 raise error.RepoError(_("couldn't parse location %s") % path)
359 raise error.RepoError(_("couldn't parse location %s") % path)
361
360
362 util.checksafessh(path)
361 util.checksafessh(path)
363
362
364 if u.passwd is not None:
363 if u.passwd is not None:
365 raise error.RepoError(_('password in URL not supported'))
364 raise error.RepoError(_('password in URL not supported'))
366
365
367 sshcmd = ui.config('ui', 'ssh')
366 sshcmd = ui.config('ui', 'ssh')
368 remotecmd = ui.config('ui', 'remotecmd')
367 remotecmd = ui.config('ui', 'remotecmd')
369 sshaddenv = dict(ui.configitems('sshenv'))
368 sshaddenv = dict(ui.configitems('sshenv'))
370 sshenv = util.shellenviron(sshaddenv)
369 sshenv = util.shellenviron(sshaddenv)
371 remotepath = u.path or '.'
370 remotepath = u.path or '.'
372
371
373 args = util.sshargs(sshcmd, u.host, u.user, u.port)
372 args = util.sshargs(sshcmd, u.host, u.user, u.port)
374
373
375 if create:
374 if create:
376 cmd = '%s %s %s' % (sshcmd, args,
375 cmd = '%s %s %s' % (sshcmd, args,
377 util.shellquote('%s init %s' %
376 util.shellquote('%s init %s' %
378 (_serverquote(remotecmd), _serverquote(remotepath))))
377 (_serverquote(remotecmd), _serverquote(remotepath))))
379 ui.debug('running %s\n' % cmd)
378 ui.debug('running %s\n' % cmd)
380 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
379 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
381 if res != 0:
380 if res != 0:
382 raise error.RepoError(_('could not create remote repo'))
381 raise error.RepoError(_('could not create remote repo'))
383
382
384 sshstate = (sshcmd, args, remotecmd, sshenv)
383 sshstate = (sshcmd, args, remotecmd, sshenv)
385
384
386 return sshpeer(ui, path, create=create, sshstate=sshstate)
385 return sshpeer(ui, path, create=create, sshstate=sshstate)
General Comments 0
You need to be logged in to leave comments. Login now