##// END OF EJS Templates
sshpeer: make instance attributes and methods internal...
Gregory Szorc -
r33763:82d564d5 default
parent child Browse files
Show More
@@ -1,324 +1,325
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import re
10 import re
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 error,
14 error,
15 pycompat,
15 pycompat,
16 util,
16 util,
17 wireproto,
17 wireproto,
18 )
18 )
19
19
20 def _serverquote(s):
20 def _serverquote(s):
21 if not s:
21 if not s:
22 return s
22 return s
23 '''quote a string for the remote shell ... which we assume is sh'''
23 '''quote a string for the remote shell ... which we assume is sh'''
24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
25 return s
25 return s
26 return "'%s'" % s.replace("'", "'\\''")
26 return "'%s'" % s.replace("'", "'\\''")
27
27
28 def _forwardoutput(ui, pipe):
28 def _forwardoutput(ui, pipe):
29 """display all data currently available on pipe as remote output.
29 """display all data currently available on pipe as remote output.
30
30
31 This is non blocking."""
31 This is non blocking."""
32 s = util.readpipe(pipe)
32 s = util.readpipe(pipe)
33 if s:
33 if s:
34 for l in s.splitlines():
34 for l in s.splitlines():
35 ui.status(_("remote: "), l, '\n')
35 ui.status(_("remote: "), l, '\n')
36
36
37 class doublepipe(object):
37 class doublepipe(object):
38 """Operate a side-channel pipe in addition of a main one
38 """Operate a side-channel pipe in addition of a main one
39
39
40 The side-channel pipe contains server output to be forwarded to the user
40 The side-channel pipe contains server output to be forwarded to the user
41 input. The double pipe will behave as the "main" pipe, but will ensure the
41 input. The double pipe will behave as the "main" pipe, but will ensure the
42 content of the "side" pipe is properly processed while we wait for blocking
42 content of the "side" pipe is properly processed while we wait for blocking
43 call on the "main" pipe.
43 call on the "main" pipe.
44
44
45 If large amounts of data are read from "main", the forward will cease after
45 If large amounts of data are read from "main", the forward will cease after
46 the first bytes start to appear. This simplifies the implementation
46 the first bytes start to appear. This simplifies the implementation
47 without affecting actual output of sshpeer too much as we rarely issue
47 without affecting actual output of sshpeer too much as we rarely issue
48 large read for data not yet emitted by the server.
48 large read for data not yet emitted by the server.
49
49
50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
51 that handle all the os specific bits. This class lives in this module
51 that handle all the os specific bits. This class lives in this module
52 because it focus on behavior specific to the ssh protocol."""
52 because it focus on behavior specific to the ssh protocol."""
53
53
54 def __init__(self, ui, main, side):
54 def __init__(self, ui, main, side):
55 self._ui = ui
55 self._ui = ui
56 self._main = main
56 self._main = main
57 self._side = side
57 self._side = side
58
58
59 def _wait(self):
59 def _wait(self):
60 """wait until some data are available on main or side
60 """wait until some data are available on main or side
61
61
62 return a pair of boolean (ismainready, issideready)
62 return a pair of boolean (ismainready, issideready)
63
63
64 (This will only wait for data if the setup is supported by `util.poll`)
64 (This will only wait for data if the setup is supported by `util.poll`)
65 """
65 """
66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
67 return (True, True) # main has data, assume side is worth poking at.
67 return (True, True) # main has data, assume side is worth poking at.
68 fds = [self._main.fileno(), self._side.fileno()]
68 fds = [self._main.fileno(), self._side.fileno()]
69 try:
69 try:
70 act = util.poll(fds)
70 act = util.poll(fds)
71 except NotImplementedError:
71 except NotImplementedError:
72 # non supported yet case, assume all have data.
72 # non supported yet case, assume all have data.
73 act = fds
73 act = fds
74 return (self._main.fileno() in act, self._side.fileno() in act)
74 return (self._main.fileno() in act, self._side.fileno() in act)
75
75
76 def write(self, data):
76 def write(self, data):
77 return self._call('write', data)
77 return self._call('write', data)
78
78
79 def read(self, size):
79 def read(self, size):
80 r = self._call('read', size)
80 r = self._call('read', size)
81 if size != 0 and not r:
81 if size != 0 and not r:
82 # We've observed a condition that indicates the
82 # We've observed a condition that indicates the
83 # stdout closed unexpectedly. Check stderr one
83 # stdout closed unexpectedly. Check stderr one
84 # more time and snag anything that's there before
84 # more time and snag anything that's there before
85 # letting anyone know the main part of the pipe
85 # letting anyone know the main part of the pipe
86 # closed prematurely.
86 # closed prematurely.
87 _forwardoutput(self._ui, self._side)
87 _forwardoutput(self._ui, self._side)
88 return r
88 return r
89
89
90 def readline(self):
90 def readline(self):
91 return self._call('readline')
91 return self._call('readline')
92
92
93 def _call(self, methname, data=None):
93 def _call(self, methname, data=None):
94 """call <methname> on "main", forward output of "side" while blocking
94 """call <methname> on "main", forward output of "side" while blocking
95 """
95 """
96 # data can be '' or 0
96 # data can be '' or 0
97 if (data is not None and not data) or self._main.closed:
97 if (data is not None and not data) or self._main.closed:
98 _forwardoutput(self._ui, self._side)
98 _forwardoutput(self._ui, self._side)
99 return ''
99 return ''
100 while True:
100 while True:
101 mainready, sideready = self._wait()
101 mainready, sideready = self._wait()
102 if sideready:
102 if sideready:
103 _forwardoutput(self._ui, self._side)
103 _forwardoutput(self._ui, self._side)
104 if mainready:
104 if mainready:
105 meth = getattr(self._main, methname)
105 meth = getattr(self._main, methname)
106 if data is None:
106 if data is None:
107 return meth()
107 return meth()
108 else:
108 else:
109 return meth(data)
109 return meth(data)
110
110
111 def close(self):
111 def close(self):
112 return self._main.close()
112 return self._main.close()
113
113
114 def flush(self):
114 def flush(self):
115 return self._main.flush()
115 return self._main.flush()
116
116
117 class sshpeer(wireproto.wirepeer):
117 class sshpeer(wireproto.wirepeer):
118 def __init__(self, ui, path, create=False):
118 def __init__(self, ui, path, create=False):
119 self._url = path
119 self._url = path
120 self.ui = ui
120 self.ui = ui
121 self.pipeo = self.pipei = self.pipee = None
121 self._pipeo = self._pipei = self._pipee = None
122
122
123 u = util.url(path, parsequery=False, parsefragment=False)
123 u = util.url(path, parsequery=False, parsefragment=False)
124 if u.scheme != 'ssh' or not u.host or u.path is None:
124 if u.scheme != 'ssh' or not u.host or u.path is None:
125 self._abort(error.RepoError(_("couldn't parse location %s") % path))
125 self._abort(error.RepoError(_("couldn't parse location %s") % path))
126
126
127 util.checksafessh(path)
127 util.checksafessh(path)
128
128
129 self.user = u.user
130 if u.passwd is not None:
129 if u.passwd is not None:
131 self._abort(error.RepoError(_("password in URL not supported")))
130 self._abort(error.RepoError(_("password in URL not supported")))
132 self.host = u.host
131
133 self.port = u.port
132 self._user = u.user
134 self.path = u.path or "."
133 self._host = u.host
134 self._port = u.port
135 self._path = u.path or '.'
135
136
136 sshcmd = self.ui.config("ui", "ssh")
137 sshcmd = self.ui.config("ui", "ssh")
137 remotecmd = self.ui.config("ui", "remotecmd")
138 remotecmd = self.ui.config("ui", "remotecmd")
138
139
139 args = util.sshargs(sshcmd, self.host, self.user, self.port)
140 args = util.sshargs(sshcmd, self._host, self._user, self._port)
140
141
141 if create:
142 if create:
142 cmd = '%s %s %s' % (sshcmd, args,
143 cmd = '%s %s %s' % (sshcmd, args,
143 util.shellquote("%s init %s" %
144 util.shellquote("%s init %s" %
144 (_serverquote(remotecmd), _serverquote(self.path))))
145 (_serverquote(remotecmd), _serverquote(self._path))))
145 ui.debug('running %s\n' % cmd)
146 ui.debug('running %s\n' % cmd)
146 res = ui.system(cmd, blockedtag='sshpeer')
147 res = ui.system(cmd, blockedtag='sshpeer')
147 if res != 0:
148 if res != 0:
148 self._abort(error.RepoError(_("could not create remote repo")))
149 self._abort(error.RepoError(_("could not create remote repo")))
149
150
150 self._validaterepo(sshcmd, args, remotecmd)
151 self._validaterepo(sshcmd, args, remotecmd)
151
152
152 def url(self):
153 def url(self):
153 return self._url
154 return self._url
154
155
155 def _validaterepo(self, sshcmd, args, remotecmd):
156 def _validaterepo(self, sshcmd, args, remotecmd):
156 # cleanup up previous run
157 # cleanup up previous run
157 self.cleanup()
158 self._cleanup()
158
159
159 cmd = '%s %s %s' % (sshcmd, args,
160 cmd = '%s %s %s' % (sshcmd, args,
160 util.shellquote("%s -R %s serve --stdio" %
161 util.shellquote("%s -R %s serve --stdio" %
161 (_serverquote(remotecmd), _serverquote(self.path))))
162 (_serverquote(remotecmd), _serverquote(self._path))))
162 self.ui.debug('running %s\n' % cmd)
163 self.ui.debug('running %s\n' % cmd)
163 cmd = util.quotecommand(cmd)
164 cmd = util.quotecommand(cmd)
164
165
165 # while self.subprocess isn't used, having it allows the subprocess to
166 # while self._subprocess isn't used, having it allows the subprocess to
166 # to clean up correctly later
167 # to clean up correctly later
167 #
168 #
168 # no buffer allow the use of 'select'
169 # no buffer allow the use of 'select'
169 # feel free to remove buffering and select usage when we ultimately
170 # feel free to remove buffering and select usage when we ultimately
170 # move to threading.
171 # move to threading.
171 sub = util.popen4(cmd, bufsize=0)
172 sub = util.popen4(cmd, bufsize=0)
172 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
173 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
173
174
174 self.pipei = util.bufferedinputpipe(self.pipei)
175 self._pipei = util.bufferedinputpipe(self._pipei)
175 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
176 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
176 self.pipeo = doublepipe(self.ui, self.pipeo, self.pipee)
177 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
177
178
178 # skip any noise generated by remote shell
179 # skip any noise generated by remote shell
179 self._callstream("hello")
180 self._callstream("hello")
180 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
181 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
181 lines = ["", "dummy"]
182 lines = ["", "dummy"]
182 max_noise = 500
183 max_noise = 500
183 while lines[-1] and max_noise:
184 while lines[-1] and max_noise:
184 l = r.readline()
185 l = r.readline()
185 self.readerr()
186 self._readerr()
186 if lines[-1] == "1\n" and l == "\n":
187 if lines[-1] == "1\n" and l == "\n":
187 break
188 break
188 if l:
189 if l:
189 self.ui.debug("remote: ", l)
190 self.ui.debug("remote: ", l)
190 lines.append(l)
191 lines.append(l)
191 max_noise -= 1
192 max_noise -= 1
192 else:
193 else:
193 self._abort(error.RepoError(_('no suitable response from '
194 self._abort(error.RepoError(_('no suitable response from '
194 'remote hg')))
195 'remote hg')))
195
196
196 self._caps = set()
197 self._caps = set()
197 for l in reversed(lines):
198 for l in reversed(lines):
198 if l.startswith("capabilities:"):
199 if l.startswith("capabilities:"):
199 self._caps.update(l[:-1].split(":")[1].split())
200 self._caps.update(l[:-1].split(":")[1].split())
200 break
201 break
201
202
202 def _capabilities(self):
203 def _capabilities(self):
203 return self._caps
204 return self._caps
204
205
205 def readerr(self):
206 def _readerr(self):
206 _forwardoutput(self.ui, self.pipee)
207 _forwardoutput(self.ui, self._pipee)
207
208
208 def _abort(self, exception):
209 def _abort(self, exception):
209 self.cleanup()
210 self._cleanup()
210 raise exception
211 raise exception
211
212
212 def cleanup(self):
213 def _cleanup(self):
213 if self.pipeo is None:
214 if self._pipeo is None:
214 return
215 return
215 self.pipeo.close()
216 self._pipeo.close()
216 self.pipei.close()
217 self._pipei.close()
217 try:
218 try:
218 # read the error descriptor until EOF
219 # read the error descriptor until EOF
219 for l in self.pipee:
220 for l in self._pipee:
220 self.ui.status(_("remote: "), l)
221 self.ui.status(_("remote: "), l)
221 except (IOError, ValueError):
222 except (IOError, ValueError):
222 pass
223 pass
223 self.pipee.close()
224 self._pipee.close()
224
225
225 __del__ = cleanup
226 __del__ = _cleanup
226
227
227 def _submitbatch(self, req):
228 def _submitbatch(self, req):
228 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
229 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
229 available = self._getamount()
230 available = self._getamount()
230 # TODO this response parsing is probably suboptimal for large
231 # TODO this response parsing is probably suboptimal for large
231 # batches with large responses.
232 # batches with large responses.
232 toread = min(available, 1024)
233 toread = min(available, 1024)
233 work = rsp.read(toread)
234 work = rsp.read(toread)
234 available -= toread
235 available -= toread
235 chunk = work
236 chunk = work
236 while chunk:
237 while chunk:
237 while ';' in work:
238 while ';' in work:
238 one, work = work.split(';', 1)
239 one, work = work.split(';', 1)
239 yield wireproto.unescapearg(one)
240 yield wireproto.unescapearg(one)
240 toread = min(available, 1024)
241 toread = min(available, 1024)
241 chunk = rsp.read(toread)
242 chunk = rsp.read(toread)
242 available -= toread
243 available -= toread
243 work += chunk
244 work += chunk
244 yield wireproto.unescapearg(work)
245 yield wireproto.unescapearg(work)
245
246
246 def _callstream(self, cmd, **args):
247 def _callstream(self, cmd, **args):
247 args = pycompat.byteskwargs(args)
248 args = pycompat.byteskwargs(args)
248 self.ui.debug("sending %s command\n" % cmd)
249 self.ui.debug("sending %s command\n" % cmd)
249 self.pipeo.write("%s\n" % cmd)
250 self._pipeo.write("%s\n" % cmd)
250 _func, names = wireproto.commands[cmd]
251 _func, names = wireproto.commands[cmd]
251 keys = names.split()
252 keys = names.split()
252 wireargs = {}
253 wireargs = {}
253 for k in keys:
254 for k in keys:
254 if k == '*':
255 if k == '*':
255 wireargs['*'] = args
256 wireargs['*'] = args
256 break
257 break
257 else:
258 else:
258 wireargs[k] = args[k]
259 wireargs[k] = args[k]
259 del args[k]
260 del args[k]
260 for k, v in sorted(wireargs.iteritems()):
261 for k, v in sorted(wireargs.iteritems()):
261 self.pipeo.write("%s %d\n" % (k, len(v)))
262 self._pipeo.write("%s %d\n" % (k, len(v)))
262 if isinstance(v, dict):
263 if isinstance(v, dict):
263 for dk, dv in v.iteritems():
264 for dk, dv in v.iteritems():
264 self.pipeo.write("%s %d\n" % (dk, len(dv)))
265 self._pipeo.write("%s %d\n" % (dk, len(dv)))
265 self.pipeo.write(dv)
266 self._pipeo.write(dv)
266 else:
267 else:
267 self.pipeo.write(v)
268 self._pipeo.write(v)
268 self.pipeo.flush()
269 self._pipeo.flush()
269
270
270 return self.pipei
271 return self._pipei
271
272
272 def _callcompressable(self, cmd, **args):
273 def _callcompressable(self, cmd, **args):
273 return self._callstream(cmd, **args)
274 return self._callstream(cmd, **args)
274
275
275 def _call(self, cmd, **args):
276 def _call(self, cmd, **args):
276 self._callstream(cmd, **args)
277 self._callstream(cmd, **args)
277 return self._recv()
278 return self._recv()
278
279
279 def _callpush(self, cmd, fp, **args):
280 def _callpush(self, cmd, fp, **args):
280 r = self._call(cmd, **args)
281 r = self._call(cmd, **args)
281 if r:
282 if r:
282 return '', r
283 return '', r
283 for d in iter(lambda: fp.read(4096), ''):
284 for d in iter(lambda: fp.read(4096), ''):
284 self._send(d)
285 self._send(d)
285 self._send("", flush=True)
286 self._send("", flush=True)
286 r = self._recv()
287 r = self._recv()
287 if r:
288 if r:
288 return '', r
289 return '', r
289 return self._recv(), ''
290 return self._recv(), ''
290
291
291 def _calltwowaystream(self, cmd, fp, **args):
292 def _calltwowaystream(self, cmd, fp, **args):
292 r = self._call(cmd, **args)
293 r = self._call(cmd, **args)
293 if r:
294 if r:
294 # XXX needs to be made better
295 # XXX needs to be made better
295 raise error.Abort(_('unexpected remote reply: %s') % r)
296 raise error.Abort(_('unexpected remote reply: %s') % r)
296 for d in iter(lambda: fp.read(4096), ''):
297 for d in iter(lambda: fp.read(4096), ''):
297 self._send(d)
298 self._send(d)
298 self._send("", flush=True)
299 self._send("", flush=True)
299 return self.pipei
300 return self._pipei
300
301
301 def _getamount(self):
302 def _getamount(self):
302 l = self.pipei.readline()
303 l = self._pipei.readline()
303 if l == '\n':
304 if l == '\n':
304 self.readerr()
305 self._readerr()
305 msg = _('check previous remote output')
306 msg = _('check previous remote output')
306 self._abort(error.OutOfBandError(hint=msg))
307 self._abort(error.OutOfBandError(hint=msg))
307 self.readerr()
308 self._readerr()
308 try:
309 try:
309 return int(l)
310 return int(l)
310 except ValueError:
311 except ValueError:
311 self._abort(error.ResponseError(_("unexpected response:"), l))
312 self._abort(error.ResponseError(_("unexpected response:"), l))
312
313
313 def _recv(self):
314 def _recv(self):
314 return self.pipei.read(self._getamount())
315 return self._pipei.read(self._getamount())
315
316
316 def _send(self, data, flush=False):
317 def _send(self, data, flush=False):
317 self.pipeo.write("%d\n" % len(data))
318 self._pipeo.write("%d\n" % len(data))
318 if data:
319 if data:
319 self.pipeo.write(data)
320 self._pipeo.write(data)
320 if flush:
321 if flush:
321 self.pipeo.flush()
322 self._pipeo.flush()
322 self.readerr()
323 self._readerr()
323
324
324 instance = sshpeer
325 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now