##// END OF EJS Templates
sshpeer: also use doublepipe for client to server communication...
Pierre-Yves David -
r25458:4642f0b8 default
parent child Browse files
Show More
@@ -1,334 +1,335 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import re
8 import re
9 from i18n import _
9 from i18n import _
10 import util, error, wireproto
10 import util, error, wireproto
11
11
12 class remotelock(object):
12 class remotelock(object):
13 def __init__(self, repo):
13 def __init__(self, repo):
14 self.repo = repo
14 self.repo = repo
15 def release(self):
15 def release(self):
16 self.repo.unlock()
16 self.repo.unlock()
17 self.repo = None
17 self.repo = None
18 def __del__(self):
18 def __del__(self):
19 if self.repo:
19 if self.repo:
20 self.release()
20 self.release()
21
21
22 def _serverquote(s):
22 def _serverquote(s):
23 if not s:
23 if not s:
24 return s
24 return s
25 '''quote a string for the remote shell ... which we assume is sh'''
25 '''quote a string for the remote shell ... which we assume is sh'''
26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
27 return s
27 return s
28 return "'%s'" % s.replace("'", "'\\''")
28 return "'%s'" % s.replace("'", "'\\''")
29
29
30 def _forwardoutput(ui, pipe):
30 def _forwardoutput(ui, pipe):
31 """display all data currently available on pipe as remote output.
31 """display all data currently available on pipe as remote output.
32
32
33 This is non blocking."""
33 This is non blocking."""
34 s = util.readpipe(pipe)
34 s = util.readpipe(pipe)
35 if s:
35 if s:
36 for l in s.splitlines():
36 for l in s.splitlines():
37 ui.status(_("remote: "), l, '\n')
37 ui.status(_("remote: "), l, '\n')
38
38
39 class doublepipe(object):
39 class doublepipe(object):
40 """Operate a side-channel pipe in addition of a main one
40 """Operate a side-channel pipe in addition of a main one
41
41
42 The side-channel pipe contains server output to be forwarded to the user
42 The side-channel pipe contains server output to be forwarded to the user
43 input. The double pipe will behave as the "main" pipe, but will ensure the
43 input. The double pipe will behave as the "main" pipe, but will ensure the
44 content of the "side" pipe is properly processed while we wait for blocking
44 content of the "side" pipe is properly processed while we wait for blocking
45 call on the "main" pipe.
45 call on the "main" pipe.
46
46
47 If large amounts of data are read from "main", the forward will cease after
47 If large amounts of data are read from "main", the forward will cease after
48 the first bytes start to appear. This simplifies the implementation
48 the first bytes start to appear. This simplifies the implementation
49 without affecting actual output of sshpeer too much as we rarely issue
49 without affecting actual output of sshpeer too much as we rarely issue
50 large read for data not yet emitted by the server.
50 large read for data not yet emitted by the server.
51
51
52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
53 that handle all the os specific bites. This class lives in this module
53 that handle all the os specific bites. This class lives in this module
54 because it focus on behavior specifig to the ssh protocol."""
54 because it focus on behavior specifig to the ssh protocol."""
55
55
56 def __init__(self, ui, main, side):
56 def __init__(self, ui, main, side):
57 self._ui = ui
57 self._ui = ui
58 self._main = main
58 self._main = main
59 self._side = side
59 self._side = side
60
60
61 def _wait(self):
61 def _wait(self):
62 """wait until some data are available on main or side
62 """wait until some data are available on main or side
63
63
64 return a pair of boolean (ismainready, issideready)
64 return a pair of boolean (ismainready, issideready)
65
65
66 (This will only wait for data if the setup is supported by `util.poll`)
66 (This will only wait for data if the setup is supported by `util.poll`)
67 """
67 """
68 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
68 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
69 return (True, True) # main has data, assume side is worth poking at.
69 return (True, True) # main has data, assume side is worth poking at.
70 fds = [self._main.fileno(), self._side.fileno()]
70 fds = [self._main.fileno(), self._side.fileno()]
71 try:
71 try:
72 act = util.poll(fds)
72 act = util.poll(fds)
73 except NotImplementedError:
73 except NotImplementedError:
74 # non supported yet case, assume all have data.
74 # non supported yet case, assume all have data.
75 act = fds
75 act = fds
76 return (self._main.fileno() in act, self._side.fileno() in act)
76 return (self._main.fileno() in act, self._side.fileno() in act)
77
77
78 def write(self, data):
78 def write(self, data):
79 return self._call('write', data)
79 return self._call('write', data)
80
80
81 def read(self, size):
81 def read(self, size):
82 return self._call('read', size)
82 return self._call('read', size)
83
83
84 def readline(self):
84 def readline(self):
85 return self._call('readline')
85 return self._call('readline')
86
86
87 def _call(self, methname, data=None):
87 def _call(self, methname, data=None):
88 """call <methname> on "main", forward output of "side" while blocking
88 """call <methname> on "main", forward output of "side" while blocking
89 """
89 """
90 # data can be '' or 0
90 # data can be '' or 0
91 if (data is not None and not data) or self._main.closed:
91 if (data is not None and not data) or self._main.closed:
92 _forwardoutput(self._ui, self._side)
92 _forwardoutput(self._ui, self._side)
93 return ''
93 return ''
94 while True:
94 while True:
95 mainready, sideready = self._wait()
95 mainready, sideready = self._wait()
96 if sideready:
96 if sideready:
97 _forwardoutput(self._ui, self._side)
97 _forwardoutput(self._ui, self._side)
98 if mainready:
98 if mainready:
99 meth = getattr(self._main, methname)
99 meth = getattr(self._main, methname)
100 if data is None:
100 if data is None:
101 return meth()
101 return meth()
102 else:
102 else:
103 return meth(data)
103 return meth(data)
104
104
105 def close(self):
105 def close(self):
106 return self._main.close()
106 return self._main.close()
107
107
108 def flush(self):
108 def flush(self):
109 return self._main.flush()
109 return self._main.flush()
110
110
111 class sshpeer(wireproto.wirepeer):
111 class sshpeer(wireproto.wirepeer):
112 def __init__(self, ui, path, create=False):
112 def __init__(self, ui, path, create=False):
113 self._url = path
113 self._url = path
114 self.ui = ui
114 self.ui = ui
115 self.pipeo = self.pipei = self.pipee = None
115 self.pipeo = self.pipei = self.pipee = None
116
116
117 u = util.url(path, parsequery=False, parsefragment=False)
117 u = util.url(path, parsequery=False, parsefragment=False)
118 if u.scheme != 'ssh' or not u.host or u.path is None:
118 if u.scheme != 'ssh' or not u.host or u.path is None:
119 self._abort(error.RepoError(_("couldn't parse location %s") % path))
119 self._abort(error.RepoError(_("couldn't parse location %s") % path))
120
120
121 self.user = u.user
121 self.user = u.user
122 if u.passwd is not None:
122 if u.passwd is not None:
123 self._abort(error.RepoError(_("password in URL not supported")))
123 self._abort(error.RepoError(_("password in URL not supported")))
124 self.host = u.host
124 self.host = u.host
125 self.port = u.port
125 self.port = u.port
126 self.path = u.path or "."
126 self.path = u.path or "."
127
127
128 sshcmd = self.ui.config("ui", "ssh", "ssh")
128 sshcmd = self.ui.config("ui", "ssh", "ssh")
129 remotecmd = self.ui.config("ui", "remotecmd", "hg")
129 remotecmd = self.ui.config("ui", "remotecmd", "hg")
130
130
131 args = util.sshargs(sshcmd,
131 args = util.sshargs(sshcmd,
132 _serverquote(self.host),
132 _serverquote(self.host),
133 _serverquote(self.user),
133 _serverquote(self.user),
134 _serverquote(self.port))
134 _serverquote(self.port))
135
135
136 if create:
136 if create:
137 cmd = '%s %s %s' % (sshcmd, args,
137 cmd = '%s %s %s' % (sshcmd, args,
138 util.shellquote("%s init %s" %
138 util.shellquote("%s init %s" %
139 (_serverquote(remotecmd), _serverquote(self.path))))
139 (_serverquote(remotecmd), _serverquote(self.path))))
140 ui.debug('running %s\n' % cmd)
140 ui.debug('running %s\n' % cmd)
141 res = ui.system(cmd)
141 res = ui.system(cmd)
142 if res != 0:
142 if res != 0:
143 self._abort(error.RepoError(_("could not create remote repo")))
143 self._abort(error.RepoError(_("could not create remote repo")))
144
144
145 self._validaterepo(sshcmd, args, remotecmd)
145 self._validaterepo(sshcmd, args, remotecmd)
146
146
147 def url(self):
147 def url(self):
148 return self._url
148 return self._url
149
149
150 def _validaterepo(self, sshcmd, args, remotecmd):
150 def _validaterepo(self, sshcmd, args, remotecmd):
151 # cleanup up previous run
151 # cleanup up previous run
152 self.cleanup()
152 self.cleanup()
153
153
154 cmd = '%s %s %s' % (sshcmd, args,
154 cmd = '%s %s %s' % (sshcmd, args,
155 util.shellquote("%s -R %s serve --stdio" %
155 util.shellquote("%s -R %s serve --stdio" %
156 (_serverquote(remotecmd), _serverquote(self.path))))
156 (_serverquote(remotecmd), _serverquote(self.path))))
157 self.ui.debug('running %s\n' % cmd)
157 self.ui.debug('running %s\n' % cmd)
158 cmd = util.quotecommand(cmd)
158 cmd = util.quotecommand(cmd)
159
159
160 # while self.subprocess isn't used, having it allows the subprocess to
160 # while self.subprocess isn't used, having it allows the subprocess to
161 # to clean up correctly later
161 # to clean up correctly later
162 #
162 #
163 # no buffer allow the use of 'select'
163 # no buffer allow the use of 'select'
164 # feel free to remove buffering and select usage when we ultimately
164 # feel free to remove buffering and select usage when we ultimately
165 # move to threading.
165 # move to threading.
166 sub = util.popen4(cmd, bufsize=0)
166 sub = util.popen4(cmd, bufsize=0)
167 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
167 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
168
168
169 self.pipei = util.bufferedinputpipe(self.pipei)
169 self.pipei = util.bufferedinputpipe(self.pipei)
170 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
170 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
171 self.pipeo = doublepipe(self.ui, self.pipeo, self.pipee)
171
172
172 # skip any noise generated by remote shell
173 # skip any noise generated by remote shell
173 self._callstream("hello")
174 self._callstream("hello")
174 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
175 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
175 lines = ["", "dummy"]
176 lines = ["", "dummy"]
176 max_noise = 500
177 max_noise = 500
177 while lines[-1] and max_noise:
178 while lines[-1] and max_noise:
178 l = r.readline()
179 l = r.readline()
179 self.readerr()
180 self.readerr()
180 if lines[-1] == "1\n" and l == "\n":
181 if lines[-1] == "1\n" and l == "\n":
181 break
182 break
182 if l:
183 if l:
183 self.ui.debug("remote: ", l)
184 self.ui.debug("remote: ", l)
184 lines.append(l)
185 lines.append(l)
185 max_noise -= 1
186 max_noise -= 1
186 else:
187 else:
187 self._abort(error.RepoError(_('no suitable response from '
188 self._abort(error.RepoError(_('no suitable response from '
188 'remote hg')))
189 'remote hg')))
189
190
190 self._caps = set()
191 self._caps = set()
191 for l in reversed(lines):
192 for l in reversed(lines):
192 if l.startswith("capabilities:"):
193 if l.startswith("capabilities:"):
193 self._caps.update(l[:-1].split(":")[1].split())
194 self._caps.update(l[:-1].split(":")[1].split())
194 break
195 break
195
196
196 def _capabilities(self):
197 def _capabilities(self):
197 return self._caps
198 return self._caps
198
199
199 def readerr(self):
200 def readerr(self):
200 _forwardoutput(self.ui, self.pipee)
201 _forwardoutput(self.ui, self.pipee)
201
202
202 def _abort(self, exception):
203 def _abort(self, exception):
203 self.cleanup()
204 self.cleanup()
204 raise exception
205 raise exception
205
206
206 def cleanup(self):
207 def cleanup(self):
207 if self.pipeo is None:
208 if self.pipeo is None:
208 return
209 return
209 self.pipeo.close()
210 self.pipeo.close()
210 self.pipei.close()
211 self.pipei.close()
211 try:
212 try:
212 # read the error descriptor until EOF
213 # read the error descriptor until EOF
213 for l in self.pipee:
214 for l in self.pipee:
214 self.ui.status(_("remote: "), l)
215 self.ui.status(_("remote: "), l)
215 except (IOError, ValueError):
216 except (IOError, ValueError):
216 pass
217 pass
217 self.pipee.close()
218 self.pipee.close()
218
219
219 __del__ = cleanup
220 __del__ = cleanup
220
221
221 def _callstream(self, cmd, **args):
222 def _callstream(self, cmd, **args):
222 self.ui.debug("sending %s command\n" % cmd)
223 self.ui.debug("sending %s command\n" % cmd)
223 self.pipeo.write("%s\n" % cmd)
224 self.pipeo.write("%s\n" % cmd)
224 _func, names = wireproto.commands[cmd]
225 _func, names = wireproto.commands[cmd]
225 keys = names.split()
226 keys = names.split()
226 wireargs = {}
227 wireargs = {}
227 for k in keys:
228 for k in keys:
228 if k == '*':
229 if k == '*':
229 wireargs['*'] = args
230 wireargs['*'] = args
230 break
231 break
231 else:
232 else:
232 wireargs[k] = args[k]
233 wireargs[k] = args[k]
233 del args[k]
234 del args[k]
234 for k, v in sorted(wireargs.iteritems()):
235 for k, v in sorted(wireargs.iteritems()):
235 self.pipeo.write("%s %d\n" % (k, len(v)))
236 self.pipeo.write("%s %d\n" % (k, len(v)))
236 if isinstance(v, dict):
237 if isinstance(v, dict):
237 for dk, dv in v.iteritems():
238 for dk, dv in v.iteritems():
238 self.pipeo.write("%s %d\n" % (dk, len(dv)))
239 self.pipeo.write("%s %d\n" % (dk, len(dv)))
239 self.pipeo.write(dv)
240 self.pipeo.write(dv)
240 else:
241 else:
241 self.pipeo.write(v)
242 self.pipeo.write(v)
242 self.pipeo.flush()
243 self.pipeo.flush()
243
244
244 return self.pipei
245 return self.pipei
245
246
246 def _callcompressable(self, cmd, **args):
247 def _callcompressable(self, cmd, **args):
247 return self._callstream(cmd, **args)
248 return self._callstream(cmd, **args)
248
249
249 def _call(self, cmd, **args):
250 def _call(self, cmd, **args):
250 self._callstream(cmd, **args)
251 self._callstream(cmd, **args)
251 return self._recv()
252 return self._recv()
252
253
253 def _callpush(self, cmd, fp, **args):
254 def _callpush(self, cmd, fp, **args):
254 r = self._call(cmd, **args)
255 r = self._call(cmd, **args)
255 if r:
256 if r:
256 return '', r
257 return '', r
257 while True:
258 while True:
258 d = fp.read(4096)
259 d = fp.read(4096)
259 if not d:
260 if not d:
260 break
261 break
261 self._send(d)
262 self._send(d)
262 self._send("", flush=True)
263 self._send("", flush=True)
263 r = self._recv()
264 r = self._recv()
264 if r:
265 if r:
265 return '', r
266 return '', r
266 return self._recv(), ''
267 return self._recv(), ''
267
268
268 def _calltwowaystream(self, cmd, fp, **args):
269 def _calltwowaystream(self, cmd, fp, **args):
269 r = self._call(cmd, **args)
270 r = self._call(cmd, **args)
270 if r:
271 if r:
271 # XXX needs to be made better
272 # XXX needs to be made better
272 raise util.Abort('unexpected remote reply: %s' % r)
273 raise util.Abort('unexpected remote reply: %s' % r)
273 while True:
274 while True:
274 d = fp.read(4096)
275 d = fp.read(4096)
275 if not d:
276 if not d:
276 break
277 break
277 self._send(d)
278 self._send(d)
278 self._send("", flush=True)
279 self._send("", flush=True)
279 return self.pipei
280 return self.pipei
280
281
281 def _recv(self):
282 def _recv(self):
282 l = self.pipei.readline()
283 l = self.pipei.readline()
283 if l == '\n':
284 if l == '\n':
284 self.readerr()
285 self.readerr()
285 msg = _('check previous remote output')
286 msg = _('check previous remote output')
286 self._abort(error.OutOfBandError(hint=msg))
287 self._abort(error.OutOfBandError(hint=msg))
287 self.readerr()
288 self.readerr()
288 try:
289 try:
289 l = int(l)
290 l = int(l)
290 except ValueError:
291 except ValueError:
291 self._abort(error.ResponseError(_("unexpected response:"), l))
292 self._abort(error.ResponseError(_("unexpected response:"), l))
292 return self.pipei.read(l)
293 return self.pipei.read(l)
293
294
294 def _send(self, data, flush=False):
295 def _send(self, data, flush=False):
295 self.pipeo.write("%d\n" % len(data))
296 self.pipeo.write("%d\n" % len(data))
296 if data:
297 if data:
297 self.pipeo.write(data)
298 self.pipeo.write(data)
298 if flush:
299 if flush:
299 self.pipeo.flush()
300 self.pipeo.flush()
300 self.readerr()
301 self.readerr()
301
302
302 def lock(self):
303 def lock(self):
303 self._call("lock")
304 self._call("lock")
304 return remotelock(self)
305 return remotelock(self)
305
306
306 def unlock(self):
307 def unlock(self):
307 self._call("unlock")
308 self._call("unlock")
308
309
309 def addchangegroup(self, cg, source, url, lock=None):
310 def addchangegroup(self, cg, source, url, lock=None):
310 '''Send a changegroup to the remote server. Return an integer
311 '''Send a changegroup to the remote server. Return an integer
311 similar to unbundle(). DEPRECATED, since it requires locking the
312 similar to unbundle(). DEPRECATED, since it requires locking the
312 remote.'''
313 remote.'''
313 d = self._call("addchangegroup")
314 d = self._call("addchangegroup")
314 if d:
315 if d:
315 self._abort(error.RepoError(_("push refused: %s") % d))
316 self._abort(error.RepoError(_("push refused: %s") % d))
316 while True:
317 while True:
317 d = cg.read(4096)
318 d = cg.read(4096)
318 if not d:
319 if not d:
319 break
320 break
320 self.pipeo.write(d)
321 self.pipeo.write(d)
321 self.readerr()
322 self.readerr()
322
323
323 self.pipeo.flush()
324 self.pipeo.flush()
324
325
325 self.readerr()
326 self.readerr()
326 r = self._recv()
327 r = self._recv()
327 if not r:
328 if not r:
328 return 1
329 return 1
329 try:
330 try:
330 return int(r)
331 return int(r)
331 except ValueError:
332 except ValueError:
332 self._abort(error.ResponseError(_("unexpected response:"), r))
333 self._abort(error.ResponseError(_("unexpected response:"), r))
333
334
334 instance = sshpeer
335 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now