##// END OF EJS Templates
sshpeer: make "instance" a function...
Gregory Szorc -
r35946:b0d2885c default
parent child Browse files
Show More
@@ -1,373 +1,374 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import re
10 import re
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 error,
14 error,
15 pycompat,
15 pycompat,
16 util,
16 util,
17 wireproto,
17 wireproto,
18 )
18 )
19
19
20 def _serverquote(s):
20 def _serverquote(s):
21 """quote a string for the remote shell ... which we assume is sh"""
21 """quote a string for the remote shell ... which we assume is sh"""
22 if not s:
22 if not s:
23 return s
23 return s
24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
25 return s
25 return s
26 return "'%s'" % s.replace("'", "'\\''")
26 return "'%s'" % s.replace("'", "'\\''")
27
27
28 def _forwardoutput(ui, pipe):
28 def _forwardoutput(ui, pipe):
29 """display all data currently available on pipe as remote output.
29 """display all data currently available on pipe as remote output.
30
30
31 This is non blocking."""
31 This is non blocking."""
32 s = util.readpipe(pipe)
32 s = util.readpipe(pipe)
33 if s:
33 if s:
34 for l in s.splitlines():
34 for l in s.splitlines():
35 ui.status(_("remote: "), l, '\n')
35 ui.status(_("remote: "), l, '\n')
36
36
37 class doublepipe(object):
37 class doublepipe(object):
38 """Operate a side-channel pipe in addition of a main one
38 """Operate a side-channel pipe in addition of a main one
39
39
40 The side-channel pipe contains server output to be forwarded to the user
40 The side-channel pipe contains server output to be forwarded to the user
41 input. The double pipe will behave as the "main" pipe, but will ensure the
41 input. The double pipe will behave as the "main" pipe, but will ensure the
42 content of the "side" pipe is properly processed while we wait for blocking
42 content of the "side" pipe is properly processed while we wait for blocking
43 call on the "main" pipe.
43 call on the "main" pipe.
44
44
45 If large amounts of data are read from "main", the forward will cease after
45 If large amounts of data are read from "main", the forward will cease after
46 the first bytes start to appear. This simplifies the implementation
46 the first bytes start to appear. This simplifies the implementation
47 without affecting actual output of sshpeer too much as we rarely issue
47 without affecting actual output of sshpeer too much as we rarely issue
48 large read for data not yet emitted by the server.
48 large read for data not yet emitted by the server.
49
49
50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
51 that handle all the os specific bits. This class lives in this module
51 that handle all the os specific bits. This class lives in this module
52 because it focus on behavior specific to the ssh protocol."""
52 because it focus on behavior specific to the ssh protocol."""
53
53
54 def __init__(self, ui, main, side):
54 def __init__(self, ui, main, side):
55 self._ui = ui
55 self._ui = ui
56 self._main = main
56 self._main = main
57 self._side = side
57 self._side = side
58
58
59 def _wait(self):
59 def _wait(self):
60 """wait until some data are available on main or side
60 """wait until some data are available on main or side
61
61
62 return a pair of boolean (ismainready, issideready)
62 return a pair of boolean (ismainready, issideready)
63
63
64 (This will only wait for data if the setup is supported by `util.poll`)
64 (This will only wait for data if the setup is supported by `util.poll`)
65 """
65 """
66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
67 return (True, True) # main has data, assume side is worth poking at.
67 return (True, True) # main has data, assume side is worth poking at.
68 fds = [self._main.fileno(), self._side.fileno()]
68 fds = [self._main.fileno(), self._side.fileno()]
69 try:
69 try:
70 act = util.poll(fds)
70 act = util.poll(fds)
71 except NotImplementedError:
71 except NotImplementedError:
72 # non supported yet case, assume all have data.
72 # non supported yet case, assume all have data.
73 act = fds
73 act = fds
74 return (self._main.fileno() in act, self._side.fileno() in act)
74 return (self._main.fileno() in act, self._side.fileno() in act)
75
75
76 def write(self, data):
76 def write(self, data):
77 return self._call('write', data)
77 return self._call('write', data)
78
78
79 def read(self, size):
79 def read(self, size):
80 r = self._call('read', size)
80 r = self._call('read', size)
81 if size != 0 and not r:
81 if size != 0 and not r:
82 # We've observed a condition that indicates the
82 # We've observed a condition that indicates the
83 # stdout closed unexpectedly. Check stderr one
83 # stdout closed unexpectedly. Check stderr one
84 # more time and snag anything that's there before
84 # more time and snag anything that's there before
85 # letting anyone know the main part of the pipe
85 # letting anyone know the main part of the pipe
86 # closed prematurely.
86 # closed prematurely.
87 _forwardoutput(self._ui, self._side)
87 _forwardoutput(self._ui, self._side)
88 return r
88 return r
89
89
90 def readline(self):
90 def readline(self):
91 return self._call('readline')
91 return self._call('readline')
92
92
93 def _call(self, methname, data=None):
93 def _call(self, methname, data=None):
94 """call <methname> on "main", forward output of "side" while blocking
94 """call <methname> on "main", forward output of "side" while blocking
95 """
95 """
96 # data can be '' or 0
96 # data can be '' or 0
97 if (data is not None and not data) or self._main.closed:
97 if (data is not None and not data) or self._main.closed:
98 _forwardoutput(self._ui, self._side)
98 _forwardoutput(self._ui, self._side)
99 return ''
99 return ''
100 while True:
100 while True:
101 mainready, sideready = self._wait()
101 mainready, sideready = self._wait()
102 if sideready:
102 if sideready:
103 _forwardoutput(self._ui, self._side)
103 _forwardoutput(self._ui, self._side)
104 if mainready:
104 if mainready:
105 meth = getattr(self._main, methname)
105 meth = getattr(self._main, methname)
106 if data is None:
106 if data is None:
107 return meth()
107 return meth()
108 else:
108 else:
109 return meth(data)
109 return meth(data)
110
110
111 def close(self):
111 def close(self):
112 return self._main.close()
112 return self._main.close()
113
113
114 def flush(self):
114 def flush(self):
115 return self._main.flush()
115 return self._main.flush()
116
116
117 class sshpeer(wireproto.wirepeer):
117 class sshpeer(wireproto.wirepeer):
118 def __init__(self, ui, path, create=False):
118 def __init__(self, ui, path, create=False):
119 self._url = path
119 self._url = path
120 self._ui = ui
120 self._ui = ui
121 self._pipeo = self._pipei = self._pipee = None
121 self._pipeo = self._pipei = self._pipee = None
122
122
123 u = util.url(path, parsequery=False, parsefragment=False)
123 u = util.url(path, parsequery=False, parsefragment=False)
124 if u.scheme != 'ssh' or not u.host or u.path is None:
124 if u.scheme != 'ssh' or not u.host or u.path is None:
125 self._abort(error.RepoError(_("couldn't parse location %s") % path))
125 self._abort(error.RepoError(_("couldn't parse location %s") % path))
126
126
127 util.checksafessh(path)
127 util.checksafessh(path)
128
128
129 if u.passwd is not None:
129 if u.passwd is not None:
130 self._abort(error.RepoError(_("password in URL not supported")))
130 self._abort(error.RepoError(_("password in URL not supported")))
131
131
132 self._user = u.user
132 self._user = u.user
133 self._host = u.host
133 self._host = u.host
134 self._port = u.port
134 self._port = u.port
135 self._path = u.path or '.'
135 self._path = u.path or '.'
136
136
137 sshcmd = self.ui.config("ui", "ssh")
137 sshcmd = self.ui.config("ui", "ssh")
138 remotecmd = self.ui.config("ui", "remotecmd")
138 remotecmd = self.ui.config("ui", "remotecmd")
139 sshaddenv = dict(self.ui.configitems("sshenv"))
139 sshaddenv = dict(self.ui.configitems("sshenv"))
140 sshenv = util.shellenviron(sshaddenv)
140 sshenv = util.shellenviron(sshaddenv)
141
141
142 args = util.sshargs(sshcmd, self._host, self._user, self._port)
142 args = util.sshargs(sshcmd, self._host, self._user, self._port)
143
143
144 if create:
144 if create:
145 cmd = '%s %s %s' % (sshcmd, args,
145 cmd = '%s %s %s' % (sshcmd, args,
146 util.shellquote("%s init %s" %
146 util.shellquote("%s init %s" %
147 (_serverquote(remotecmd), _serverquote(self._path))))
147 (_serverquote(remotecmd), _serverquote(self._path))))
148 ui.debug('running %s\n' % cmd)
148 ui.debug('running %s\n' % cmd)
149 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
149 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
150 if res != 0:
150 if res != 0:
151 self._abort(error.RepoError(_("could not create remote repo")))
151 self._abort(error.RepoError(_("could not create remote repo")))
152
152
153 self._validaterepo(sshcmd, args, remotecmd, sshenv)
153 self._validaterepo(sshcmd, args, remotecmd, sshenv)
154
154
155 # Begin of _basepeer interface.
155 # Begin of _basepeer interface.
156
156
157 @util.propertycache
157 @util.propertycache
158 def ui(self):
158 def ui(self):
159 return self._ui
159 return self._ui
160
160
161 def url(self):
161 def url(self):
162 return self._url
162 return self._url
163
163
164 def local(self):
164 def local(self):
165 return None
165 return None
166
166
167 def peer(self):
167 def peer(self):
168 return self
168 return self
169
169
170 def canpush(self):
170 def canpush(self):
171 return True
171 return True
172
172
173 def close(self):
173 def close(self):
174 pass
174 pass
175
175
176 # End of _basepeer interface.
176 # End of _basepeer interface.
177
177
178 # Begin of _basewirecommands interface.
178 # Begin of _basewirecommands interface.
179
179
180 def capabilities(self):
180 def capabilities(self):
181 return self._caps
181 return self._caps
182
182
183 # End of _basewirecommands interface.
183 # End of _basewirecommands interface.
184
184
185 def _validaterepo(self, sshcmd, args, remotecmd, sshenv=None):
185 def _validaterepo(self, sshcmd, args, remotecmd, sshenv=None):
186 # cleanup up previous run
186 # cleanup up previous run
187 self._cleanup()
187 self._cleanup()
188
188
189 cmd = '%s %s %s' % (sshcmd, args,
189 cmd = '%s %s %s' % (sshcmd, args,
190 util.shellquote("%s -R %s serve --stdio" %
190 util.shellquote("%s -R %s serve --stdio" %
191 (_serverquote(remotecmd), _serverquote(self._path))))
191 (_serverquote(remotecmd), _serverquote(self._path))))
192 self.ui.debug('running %s\n' % cmd)
192 self.ui.debug('running %s\n' % cmd)
193 cmd = util.quotecommand(cmd)
193 cmd = util.quotecommand(cmd)
194
194
195 # while self._subprocess isn't used, having it allows the subprocess to
195 # while self._subprocess isn't used, having it allows the subprocess to
196 # to clean up correctly later
196 # to clean up correctly later
197 #
197 #
198 # no buffer allow the use of 'select'
198 # no buffer allow the use of 'select'
199 # feel free to remove buffering and select usage when we ultimately
199 # feel free to remove buffering and select usage when we ultimately
200 # move to threading.
200 # move to threading.
201 sub = util.popen4(cmd, bufsize=0, env=sshenv)
201 sub = util.popen4(cmd, bufsize=0, env=sshenv)
202 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
202 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
203
203
204 self._pipei = util.bufferedinputpipe(self._pipei)
204 self._pipei = util.bufferedinputpipe(self._pipei)
205 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
205 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
206 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
206 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
207
207
208 def badresponse():
208 def badresponse():
209 msg = _("no suitable response from remote hg")
209 msg = _("no suitable response from remote hg")
210 hint = self.ui.config("ui", "ssherrorhint")
210 hint = self.ui.config("ui", "ssherrorhint")
211 self._abort(error.RepoError(msg, hint=hint))
211 self._abort(error.RepoError(msg, hint=hint))
212
212
213 try:
213 try:
214 # skip any noise generated by remote shell
214 # skip any noise generated by remote shell
215 self._callstream("hello")
215 self._callstream("hello")
216 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
216 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
217 except IOError:
217 except IOError:
218 badresponse()
218 badresponse()
219
219
220 lines = ["", "dummy"]
220 lines = ["", "dummy"]
221 max_noise = 500
221 max_noise = 500
222 while lines[-1] and max_noise:
222 while lines[-1] and max_noise:
223 try:
223 try:
224 l = r.readline()
224 l = r.readline()
225 self._readerr()
225 self._readerr()
226 if lines[-1] == "1\n" and l == "\n":
226 if lines[-1] == "1\n" and l == "\n":
227 break
227 break
228 if l:
228 if l:
229 self.ui.debug("remote: ", l)
229 self.ui.debug("remote: ", l)
230 lines.append(l)
230 lines.append(l)
231 max_noise -= 1
231 max_noise -= 1
232 except IOError:
232 except IOError:
233 badresponse()
233 badresponse()
234 else:
234 else:
235 badresponse()
235 badresponse()
236
236
237 self._caps = set()
237 self._caps = set()
238 for l in reversed(lines):
238 for l in reversed(lines):
239 if l.startswith("capabilities:"):
239 if l.startswith("capabilities:"):
240 self._caps.update(l[:-1].split(":")[1].split())
240 self._caps.update(l[:-1].split(":")[1].split())
241 break
241 break
242
242
243 def _readerr(self):
243 def _readerr(self):
244 _forwardoutput(self.ui, self._pipee)
244 _forwardoutput(self.ui, self._pipee)
245
245
246 def _abort(self, exception):
246 def _abort(self, exception):
247 self._cleanup()
247 self._cleanup()
248 raise exception
248 raise exception
249
249
250 def _cleanup(self):
250 def _cleanup(self):
251 if self._pipeo is None:
251 if self._pipeo is None:
252 return
252 return
253 self._pipeo.close()
253 self._pipeo.close()
254 self._pipei.close()
254 self._pipei.close()
255 try:
255 try:
256 # read the error descriptor until EOF
256 # read the error descriptor until EOF
257 for l in self._pipee:
257 for l in self._pipee:
258 self.ui.status(_("remote: "), l)
258 self.ui.status(_("remote: "), l)
259 except (IOError, ValueError):
259 except (IOError, ValueError):
260 pass
260 pass
261 self._pipee.close()
261 self._pipee.close()
262
262
263 __del__ = _cleanup
263 __del__ = _cleanup
264
264
265 def _submitbatch(self, req):
265 def _submitbatch(self, req):
266 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
266 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
267 available = self._getamount()
267 available = self._getamount()
268 # TODO this response parsing is probably suboptimal for large
268 # TODO this response parsing is probably suboptimal for large
269 # batches with large responses.
269 # batches with large responses.
270 toread = min(available, 1024)
270 toread = min(available, 1024)
271 work = rsp.read(toread)
271 work = rsp.read(toread)
272 available -= toread
272 available -= toread
273 chunk = work
273 chunk = work
274 while chunk:
274 while chunk:
275 while ';' in work:
275 while ';' in work:
276 one, work = work.split(';', 1)
276 one, work = work.split(';', 1)
277 yield wireproto.unescapearg(one)
277 yield wireproto.unescapearg(one)
278 toread = min(available, 1024)
278 toread = min(available, 1024)
279 chunk = rsp.read(toread)
279 chunk = rsp.read(toread)
280 available -= toread
280 available -= toread
281 work += chunk
281 work += chunk
282 yield wireproto.unescapearg(work)
282 yield wireproto.unescapearg(work)
283
283
284 def _callstream(self, cmd, **args):
284 def _callstream(self, cmd, **args):
285 args = pycompat.byteskwargs(args)
285 args = pycompat.byteskwargs(args)
286 if (self.ui.debugflag
286 if (self.ui.debugflag
287 and self.ui.configbool('devel', 'debug.peer-request')):
287 and self.ui.configbool('devel', 'debug.peer-request')):
288 dbg = self.ui.debug
288 dbg = self.ui.debug
289 line = 'devel-peer-request: %s\n'
289 line = 'devel-peer-request: %s\n'
290 dbg(line % cmd)
290 dbg(line % cmd)
291 for key, value in sorted(args.items()):
291 for key, value in sorted(args.items()):
292 if not isinstance(value, dict):
292 if not isinstance(value, dict):
293 dbg(line % ' %s: %d bytes' % (key, len(value)))
293 dbg(line % ' %s: %d bytes' % (key, len(value)))
294 else:
294 else:
295 for dk, dv in sorted(value.items()):
295 for dk, dv in sorted(value.items()):
296 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
296 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
297 self.ui.debug("sending %s command\n" % cmd)
297 self.ui.debug("sending %s command\n" % cmd)
298 self._pipeo.write("%s\n" % cmd)
298 self._pipeo.write("%s\n" % cmd)
299 _func, names = wireproto.commands[cmd]
299 _func, names = wireproto.commands[cmd]
300 keys = names.split()
300 keys = names.split()
301 wireargs = {}
301 wireargs = {}
302 for k in keys:
302 for k in keys:
303 if k == '*':
303 if k == '*':
304 wireargs['*'] = args
304 wireargs['*'] = args
305 break
305 break
306 else:
306 else:
307 wireargs[k] = args[k]
307 wireargs[k] = args[k]
308 del args[k]
308 del args[k]
309 for k, v in sorted(wireargs.iteritems()):
309 for k, v in sorted(wireargs.iteritems()):
310 self._pipeo.write("%s %d\n" % (k, len(v)))
310 self._pipeo.write("%s %d\n" % (k, len(v)))
311 if isinstance(v, dict):
311 if isinstance(v, dict):
312 for dk, dv in v.iteritems():
312 for dk, dv in v.iteritems():
313 self._pipeo.write("%s %d\n" % (dk, len(dv)))
313 self._pipeo.write("%s %d\n" % (dk, len(dv)))
314 self._pipeo.write(dv)
314 self._pipeo.write(dv)
315 else:
315 else:
316 self._pipeo.write(v)
316 self._pipeo.write(v)
317 self._pipeo.flush()
317 self._pipeo.flush()
318
318
319 return self._pipei
319 return self._pipei
320
320
321 def _callcompressable(self, cmd, **args):
321 def _callcompressable(self, cmd, **args):
322 return self._callstream(cmd, **args)
322 return self._callstream(cmd, **args)
323
323
324 def _call(self, cmd, **args):
324 def _call(self, cmd, **args):
325 self._callstream(cmd, **args)
325 self._callstream(cmd, **args)
326 return self._recv()
326 return self._recv()
327
327
328 def _callpush(self, cmd, fp, **args):
328 def _callpush(self, cmd, fp, **args):
329 r = self._call(cmd, **args)
329 r = self._call(cmd, **args)
330 if r:
330 if r:
331 return '', r
331 return '', r
332 for d in iter(lambda: fp.read(4096), ''):
332 for d in iter(lambda: fp.read(4096), ''):
333 self._send(d)
333 self._send(d)
334 self._send("", flush=True)
334 self._send("", flush=True)
335 r = self._recv()
335 r = self._recv()
336 if r:
336 if r:
337 return '', r
337 return '', r
338 return self._recv(), ''
338 return self._recv(), ''
339
339
340 def _calltwowaystream(self, cmd, fp, **args):
340 def _calltwowaystream(self, cmd, fp, **args):
341 r = self._call(cmd, **args)
341 r = self._call(cmd, **args)
342 if r:
342 if r:
343 # XXX needs to be made better
343 # XXX needs to be made better
344 raise error.Abort(_('unexpected remote reply: %s') % r)
344 raise error.Abort(_('unexpected remote reply: %s') % r)
345 for d in iter(lambda: fp.read(4096), ''):
345 for d in iter(lambda: fp.read(4096), ''):
346 self._send(d)
346 self._send(d)
347 self._send("", flush=True)
347 self._send("", flush=True)
348 return self._pipei
348 return self._pipei
349
349
350 def _getamount(self):
350 def _getamount(self):
351 l = self._pipei.readline()
351 l = self._pipei.readline()
352 if l == '\n':
352 if l == '\n':
353 self._readerr()
353 self._readerr()
354 msg = _('check previous remote output')
354 msg = _('check previous remote output')
355 self._abort(error.OutOfBandError(hint=msg))
355 self._abort(error.OutOfBandError(hint=msg))
356 self._readerr()
356 self._readerr()
357 try:
357 try:
358 return int(l)
358 return int(l)
359 except ValueError:
359 except ValueError:
360 self._abort(error.ResponseError(_("unexpected response:"), l))
360 self._abort(error.ResponseError(_("unexpected response:"), l))
361
361
362 def _recv(self):
362 def _recv(self):
363 return self._pipei.read(self._getamount())
363 return self._pipei.read(self._getamount())
364
364
365 def _send(self, data, flush=False):
365 def _send(self, data, flush=False):
366 self._pipeo.write("%d\n" % len(data))
366 self._pipeo.write("%d\n" % len(data))
367 if data:
367 if data:
368 self._pipeo.write(data)
368 self._pipeo.write(data)
369 if flush:
369 if flush:
370 self._pipeo.flush()
370 self._pipeo.flush()
371 self._readerr()
371 self._readerr()
372
372
373 instance = sshpeer
373 def instance(ui, path, create):
374 return sshpeer(ui, path, create=create)
General Comments 0
You need to be logged in to leave comments. Login now