##// END OF EJS Templates
sshpeer: use peer interface...
Gregory Szorc -
r33803:1f8460b5 default
parent child Browse files
Show More
@@ -1,325 +1,353 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import re
10 import re
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 error,
14 error,
15 pycompat,
15 pycompat,
16 repository,
16 util,
17 util,
17 wireproto,
18 wireproto,
18 )
19 )
19
20
20 def _serverquote(s):
21 def _serverquote(s):
21 if not s:
22 if not s:
22 return s
23 return s
23 '''quote a string for the remote shell ... which we assume is sh'''
24 '''quote a string for the remote shell ... which we assume is sh'''
24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
25 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
25 return s
26 return s
26 return "'%s'" % s.replace("'", "'\\''")
27 return "'%s'" % s.replace("'", "'\\''")
27
28
28 def _forwardoutput(ui, pipe):
29 def _forwardoutput(ui, pipe):
29 """display all data currently available on pipe as remote output.
30 """display all data currently available on pipe as remote output.
30
31
31 This is non blocking."""
32 This is non blocking."""
32 s = util.readpipe(pipe)
33 s = util.readpipe(pipe)
33 if s:
34 if s:
34 for l in s.splitlines():
35 for l in s.splitlines():
35 ui.status(_("remote: "), l, '\n')
36 ui.status(_("remote: "), l, '\n')
36
37
37 class doublepipe(object):
38 class doublepipe(object):
38 """Operate a side-channel pipe in addition of a main one
39 """Operate a side-channel pipe in addition of a main one
39
40
40 The side-channel pipe contains server output to be forwarded to the user
41 The side-channel pipe contains server output to be forwarded to the user
41 input. The double pipe will behave as the "main" pipe, but will ensure the
42 input. The double pipe will behave as the "main" pipe, but will ensure the
42 content of the "side" pipe is properly processed while we wait for blocking
43 content of the "side" pipe is properly processed while we wait for blocking
43 call on the "main" pipe.
44 call on the "main" pipe.
44
45
45 If large amounts of data are read from "main", the forward will cease after
46 If large amounts of data are read from "main", the forward will cease after
46 the first bytes start to appear. This simplifies the implementation
47 the first bytes start to appear. This simplifies the implementation
47 without affecting actual output of sshpeer too much as we rarely issue
48 without affecting actual output of sshpeer too much as we rarely issue
48 large read for data not yet emitted by the server.
49 large read for data not yet emitted by the server.
49
50
50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
51 The main pipe is expected to be a 'bufferedinputpipe' from the util module
51 that handle all the os specific bits. This class lives in this module
52 that handle all the os specific bits. This class lives in this module
52 because it focus on behavior specific to the ssh protocol."""
53 because it focus on behavior specific to the ssh protocol."""
53
54
54 def __init__(self, ui, main, side):
55 def __init__(self, ui, main, side):
55 self._ui = ui
56 self._ui = ui
56 self._main = main
57 self._main = main
57 self._side = side
58 self._side = side
58
59
59 def _wait(self):
60 def _wait(self):
60 """wait until some data are available on main or side
61 """wait until some data are available on main or side
61
62
62 return a pair of boolean (ismainready, issideready)
63 return a pair of boolean (ismainready, issideready)
63
64
64 (This will only wait for data if the setup is supported by `util.poll`)
65 (This will only wait for data if the setup is supported by `util.poll`)
65 """
66 """
66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
67 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
67 return (True, True) # main has data, assume side is worth poking at.
68 return (True, True) # main has data, assume side is worth poking at.
68 fds = [self._main.fileno(), self._side.fileno()]
69 fds = [self._main.fileno(), self._side.fileno()]
69 try:
70 try:
70 act = util.poll(fds)
71 act = util.poll(fds)
71 except NotImplementedError:
72 except NotImplementedError:
72 # non supported yet case, assume all have data.
73 # non supported yet case, assume all have data.
73 act = fds
74 act = fds
74 return (self._main.fileno() in act, self._side.fileno() in act)
75 return (self._main.fileno() in act, self._side.fileno() in act)
75
76
76 def write(self, data):
77 def write(self, data):
77 return self._call('write', data)
78 return self._call('write', data)
78
79
79 def read(self, size):
80 def read(self, size):
80 r = self._call('read', size)
81 r = self._call('read', size)
81 if size != 0 and not r:
82 if size != 0 and not r:
82 # We've observed a condition that indicates the
83 # We've observed a condition that indicates the
83 # stdout closed unexpectedly. Check stderr one
84 # stdout closed unexpectedly. Check stderr one
84 # more time and snag anything that's there before
85 # more time and snag anything that's there before
85 # letting anyone know the main part of the pipe
86 # letting anyone know the main part of the pipe
86 # closed prematurely.
87 # closed prematurely.
87 _forwardoutput(self._ui, self._side)
88 _forwardoutput(self._ui, self._side)
88 return r
89 return r
89
90
90 def readline(self):
91 def readline(self):
91 return self._call('readline')
92 return self._call('readline')
92
93
93 def _call(self, methname, data=None):
94 def _call(self, methname, data=None):
94 """call <methname> on "main", forward output of "side" while blocking
95 """call <methname> on "main", forward output of "side" while blocking
95 """
96 """
96 # data can be '' or 0
97 # data can be '' or 0
97 if (data is not None and not data) or self._main.closed:
98 if (data is not None and not data) or self._main.closed:
98 _forwardoutput(self._ui, self._side)
99 _forwardoutput(self._ui, self._side)
99 return ''
100 return ''
100 while True:
101 while True:
101 mainready, sideready = self._wait()
102 mainready, sideready = self._wait()
102 if sideready:
103 if sideready:
103 _forwardoutput(self._ui, self._side)
104 _forwardoutput(self._ui, self._side)
104 if mainready:
105 if mainready:
105 meth = getattr(self._main, methname)
106 meth = getattr(self._main, methname)
106 if data is None:
107 if data is None:
107 return meth()
108 return meth()
108 else:
109 else:
109 return meth(data)
110 return meth(data)
110
111
111 def close(self):
112 def close(self):
112 return self._main.close()
113 return self._main.close()
113
114
114 def flush(self):
115 def flush(self):
115 return self._main.flush()
116 return self._main.flush()
116
117
117 class sshpeer(wireproto.wirepeer):
118 class sshpeer(wireproto.wirepeer, repository.legacypeer):
118 def __init__(self, ui, path, create=False):
119 def __init__(self, ui, path, create=False):
119 self._url = path
120 self._url = path
120 self.ui = ui
121 self._ui = ui
121 self._pipeo = self._pipei = self._pipee = None
122 self._pipeo = self._pipei = self._pipee = None
122
123
123 u = util.url(path, parsequery=False, parsefragment=False)
124 u = util.url(path, parsequery=False, parsefragment=False)
124 if u.scheme != 'ssh' or not u.host or u.path is None:
125 if u.scheme != 'ssh' or not u.host or u.path is None:
125 self._abort(error.RepoError(_("couldn't parse location %s") % path))
126 self._abort(error.RepoError(_("couldn't parse location %s") % path))
126
127
127 util.checksafessh(path)
128 util.checksafessh(path)
128
129
129 if u.passwd is not None:
130 if u.passwd is not None:
130 self._abort(error.RepoError(_("password in URL not supported")))
131 self._abort(error.RepoError(_("password in URL not supported")))
131
132
132 self._user = u.user
133 self._user = u.user
133 self._host = u.host
134 self._host = u.host
134 self._port = u.port
135 self._port = u.port
135 self._path = u.path or '.'
136 self._path = u.path or '.'
136
137
137 sshcmd = self.ui.config("ui", "ssh")
138 sshcmd = self.ui.config("ui", "ssh")
138 remotecmd = self.ui.config("ui", "remotecmd")
139 remotecmd = self.ui.config("ui", "remotecmd")
139
140
140 args = util.sshargs(sshcmd, self._host, self._user, self._port)
141 args = util.sshargs(sshcmd, self._host, self._user, self._port)
141
142
142 if create:
143 if create:
143 cmd = '%s %s %s' % (sshcmd, args,
144 cmd = '%s %s %s' % (sshcmd, args,
144 util.shellquote("%s init %s" %
145 util.shellquote("%s init %s" %
145 (_serverquote(remotecmd), _serverquote(self._path))))
146 (_serverquote(remotecmd), _serverquote(self._path))))
146 ui.debug('running %s\n' % cmd)
147 ui.debug('running %s\n' % cmd)
147 res = ui.system(cmd, blockedtag='sshpeer')
148 res = ui.system(cmd, blockedtag='sshpeer')
148 if res != 0:
149 if res != 0:
149 self._abort(error.RepoError(_("could not create remote repo")))
150 self._abort(error.RepoError(_("could not create remote repo")))
150
151
151 self._validaterepo(sshcmd, args, remotecmd)
152 self._validaterepo(sshcmd, args, remotecmd)
152
153
154 # TODO remove this alias once peerrepository inheritance is removed.
155 self._capabilities = self.capabilities
156
157 # Begin of _basepeer interface.
158
159 @util.propertycache
160 def ui(self):
161 return self._ui
162
153 def url(self):
163 def url(self):
154 return self._url
164 return self._url
155
165
166 def local(self):
167 return None
168
169 def peer(self):
170 return self
171
172 def canpush(self):
173 return True
174
175 def close(self):
176 pass
177
178 # End of _basepeer interface.
179
180 # Begin of _basewirecommands interface.
181
182 def capabilities(self):
183 return self._caps
184
185 # End of _basewirecommands interface.
186
156 def _validaterepo(self, sshcmd, args, remotecmd):
187 def _validaterepo(self, sshcmd, args, remotecmd):
157 # cleanup up previous run
188 # cleanup up previous run
158 self._cleanup()
189 self._cleanup()
159
190
160 cmd = '%s %s %s' % (sshcmd, args,
191 cmd = '%s %s %s' % (sshcmd, args,
161 util.shellquote("%s -R %s serve --stdio" %
192 util.shellquote("%s -R %s serve --stdio" %
162 (_serverquote(remotecmd), _serverquote(self._path))))
193 (_serverquote(remotecmd), _serverquote(self._path))))
163 self.ui.debug('running %s\n' % cmd)
194 self.ui.debug('running %s\n' % cmd)
164 cmd = util.quotecommand(cmd)
195 cmd = util.quotecommand(cmd)
165
196
166 # while self._subprocess isn't used, having it allows the subprocess to
197 # while self._subprocess isn't used, having it allows the subprocess to
167 # to clean up correctly later
198 # to clean up correctly later
168 #
199 #
169 # no buffer allow the use of 'select'
200 # no buffer allow the use of 'select'
170 # feel free to remove buffering and select usage when we ultimately
201 # feel free to remove buffering and select usage when we ultimately
171 # move to threading.
202 # move to threading.
172 sub = util.popen4(cmd, bufsize=0)
203 sub = util.popen4(cmd, bufsize=0)
173 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
204 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
174
205
175 self._pipei = util.bufferedinputpipe(self._pipei)
206 self._pipei = util.bufferedinputpipe(self._pipei)
176 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
207 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
177 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
208 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
178
209
179 # skip any noise generated by remote shell
210 # skip any noise generated by remote shell
180 self._callstream("hello")
211 self._callstream("hello")
181 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
212 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
182 lines = ["", "dummy"]
213 lines = ["", "dummy"]
183 max_noise = 500
214 max_noise = 500
184 while lines[-1] and max_noise:
215 while lines[-1] and max_noise:
185 l = r.readline()
216 l = r.readline()
186 self._readerr()
217 self._readerr()
187 if lines[-1] == "1\n" and l == "\n":
218 if lines[-1] == "1\n" and l == "\n":
188 break
219 break
189 if l:
220 if l:
190 self.ui.debug("remote: ", l)
221 self.ui.debug("remote: ", l)
191 lines.append(l)
222 lines.append(l)
192 max_noise -= 1
223 max_noise -= 1
193 else:
224 else:
194 self._abort(error.RepoError(_('no suitable response from '
225 self._abort(error.RepoError(_('no suitable response from '
195 'remote hg')))
226 'remote hg')))
196
227
197 self._caps = set()
228 self._caps = set()
198 for l in reversed(lines):
229 for l in reversed(lines):
199 if l.startswith("capabilities:"):
230 if l.startswith("capabilities:"):
200 self._caps.update(l[:-1].split(":")[1].split())
231 self._caps.update(l[:-1].split(":")[1].split())
201 break
232 break
202
233
203 def _capabilities(self):
204 return self._caps
205
206 def _readerr(self):
234 def _readerr(self):
207 _forwardoutput(self.ui, self._pipee)
235 _forwardoutput(self.ui, self._pipee)
208
236
209 def _abort(self, exception):
237 def _abort(self, exception):
210 self._cleanup()
238 self._cleanup()
211 raise exception
239 raise exception
212
240
213 def _cleanup(self):
241 def _cleanup(self):
214 if self._pipeo is None:
242 if self._pipeo is None:
215 return
243 return
216 self._pipeo.close()
244 self._pipeo.close()
217 self._pipei.close()
245 self._pipei.close()
218 try:
246 try:
219 # read the error descriptor until EOF
247 # read the error descriptor until EOF
220 for l in self._pipee:
248 for l in self._pipee:
221 self.ui.status(_("remote: "), l)
249 self.ui.status(_("remote: "), l)
222 except (IOError, ValueError):
250 except (IOError, ValueError):
223 pass
251 pass
224 self._pipee.close()
252 self._pipee.close()
225
253
226 __del__ = _cleanup
254 __del__ = _cleanup
227
255
228 def _submitbatch(self, req):
256 def _submitbatch(self, req):
229 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
257 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
230 available = self._getamount()
258 available = self._getamount()
231 # TODO this response parsing is probably suboptimal for large
259 # TODO this response parsing is probably suboptimal for large
232 # batches with large responses.
260 # batches with large responses.
233 toread = min(available, 1024)
261 toread = min(available, 1024)
234 work = rsp.read(toread)
262 work = rsp.read(toread)
235 available -= toread
263 available -= toread
236 chunk = work
264 chunk = work
237 while chunk:
265 while chunk:
238 while ';' in work:
266 while ';' in work:
239 one, work = work.split(';', 1)
267 one, work = work.split(';', 1)
240 yield wireproto.unescapearg(one)
268 yield wireproto.unescapearg(one)
241 toread = min(available, 1024)
269 toread = min(available, 1024)
242 chunk = rsp.read(toread)
270 chunk = rsp.read(toread)
243 available -= toread
271 available -= toread
244 work += chunk
272 work += chunk
245 yield wireproto.unescapearg(work)
273 yield wireproto.unescapearg(work)
246
274
247 def _callstream(self, cmd, **args):
275 def _callstream(self, cmd, **args):
248 args = pycompat.byteskwargs(args)
276 args = pycompat.byteskwargs(args)
249 self.ui.debug("sending %s command\n" % cmd)
277 self.ui.debug("sending %s command\n" % cmd)
250 self._pipeo.write("%s\n" % cmd)
278 self._pipeo.write("%s\n" % cmd)
251 _func, names = wireproto.commands[cmd]
279 _func, names = wireproto.commands[cmd]
252 keys = names.split()
280 keys = names.split()
253 wireargs = {}
281 wireargs = {}
254 for k in keys:
282 for k in keys:
255 if k == '*':
283 if k == '*':
256 wireargs['*'] = args
284 wireargs['*'] = args
257 break
285 break
258 else:
286 else:
259 wireargs[k] = args[k]
287 wireargs[k] = args[k]
260 del args[k]
288 del args[k]
261 for k, v in sorted(wireargs.iteritems()):
289 for k, v in sorted(wireargs.iteritems()):
262 self._pipeo.write("%s %d\n" % (k, len(v)))
290 self._pipeo.write("%s %d\n" % (k, len(v)))
263 if isinstance(v, dict):
291 if isinstance(v, dict):
264 for dk, dv in v.iteritems():
292 for dk, dv in v.iteritems():
265 self._pipeo.write("%s %d\n" % (dk, len(dv)))
293 self._pipeo.write("%s %d\n" % (dk, len(dv)))
266 self._pipeo.write(dv)
294 self._pipeo.write(dv)
267 else:
295 else:
268 self._pipeo.write(v)
296 self._pipeo.write(v)
269 self._pipeo.flush()
297 self._pipeo.flush()
270
298
271 return self._pipei
299 return self._pipei
272
300
273 def _callcompressable(self, cmd, **args):
301 def _callcompressable(self, cmd, **args):
274 return self._callstream(cmd, **args)
302 return self._callstream(cmd, **args)
275
303
276 def _call(self, cmd, **args):
304 def _call(self, cmd, **args):
277 self._callstream(cmd, **args)
305 self._callstream(cmd, **args)
278 return self._recv()
306 return self._recv()
279
307
280 def _callpush(self, cmd, fp, **args):
308 def _callpush(self, cmd, fp, **args):
281 r = self._call(cmd, **args)
309 r = self._call(cmd, **args)
282 if r:
310 if r:
283 return '', r
311 return '', r
284 for d in iter(lambda: fp.read(4096), ''):
312 for d in iter(lambda: fp.read(4096), ''):
285 self._send(d)
313 self._send(d)
286 self._send("", flush=True)
314 self._send("", flush=True)
287 r = self._recv()
315 r = self._recv()
288 if r:
316 if r:
289 return '', r
317 return '', r
290 return self._recv(), ''
318 return self._recv(), ''
291
319
292 def _calltwowaystream(self, cmd, fp, **args):
320 def _calltwowaystream(self, cmd, fp, **args):
293 r = self._call(cmd, **args)
321 r = self._call(cmd, **args)
294 if r:
322 if r:
295 # XXX needs to be made better
323 # XXX needs to be made better
296 raise error.Abort(_('unexpected remote reply: %s') % r)
324 raise error.Abort(_('unexpected remote reply: %s') % r)
297 for d in iter(lambda: fp.read(4096), ''):
325 for d in iter(lambda: fp.read(4096), ''):
298 self._send(d)
326 self._send(d)
299 self._send("", flush=True)
327 self._send("", flush=True)
300 return self._pipei
328 return self._pipei
301
329
302 def _getamount(self):
330 def _getamount(self):
303 l = self._pipei.readline()
331 l = self._pipei.readline()
304 if l == '\n':
332 if l == '\n':
305 self._readerr()
333 self._readerr()
306 msg = _('check previous remote output')
334 msg = _('check previous remote output')
307 self._abort(error.OutOfBandError(hint=msg))
335 self._abort(error.OutOfBandError(hint=msg))
308 self._readerr()
336 self._readerr()
309 try:
337 try:
310 return int(l)
338 return int(l)
311 except ValueError:
339 except ValueError:
312 self._abort(error.ResponseError(_("unexpected response:"), l))
340 self._abort(error.ResponseError(_("unexpected response:"), l))
313
341
314 def _recv(self):
342 def _recv(self):
315 return self._pipei.read(self._getamount())
343 return self._pipei.read(self._getamount())
316
344
317 def _send(self, data, flush=False):
345 def _send(self, data, flush=False):
318 self._pipeo.write("%d\n" % len(data))
346 self._pipeo.write("%d\n" % len(data))
319 if data:
347 if data:
320 self._pipeo.write(data)
348 self._pipeo.write(data)
321 if flush:
349 if flush:
322 self._pipeo.flush()
350 self._pipeo.flush()
323 self._readerr()
351 self._readerr()
324
352
325 instance = sshpeer
353 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now