##// END OF EJS Templates
sshpeer: allow write operations through double pipe...
Pierre-Yves David -
r25456:408b7979 default
parent child Browse files
Show More
@@ -1,328 +1,334 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import re
8 import re
9 from i18n import _
9 from i18n import _
10 import util, error, wireproto
10 import util, error, wireproto
11
11
12 class remotelock(object):
12 class remotelock(object):
13 def __init__(self, repo):
13 def __init__(self, repo):
14 self.repo = repo
14 self.repo = repo
15 def release(self):
15 def release(self):
16 self.repo.unlock()
16 self.repo.unlock()
17 self.repo = None
17 self.repo = None
18 def __del__(self):
18 def __del__(self):
19 if self.repo:
19 if self.repo:
20 self.release()
20 self.release()
21
21
22 def _serverquote(s):
22 def _serverquote(s):
23 if not s:
23 if not s:
24 return s
24 return s
25 '''quote a string for the remote shell ... which we assume is sh'''
25 '''quote a string for the remote shell ... which we assume is sh'''
26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
27 return s
27 return s
28 return "'%s'" % s.replace("'", "'\\''")
28 return "'%s'" % s.replace("'", "'\\''")
29
29
30 def _forwardoutput(ui, pipe):
30 def _forwardoutput(ui, pipe):
31 """display all data currently available on pipe as remote output.
31 """display all data currently available on pipe as remote output.
32
32
33 This is non blocking."""
33 This is non blocking."""
34 s = util.readpipe(pipe)
34 s = util.readpipe(pipe)
35 if s:
35 if s:
36 for l in s.splitlines():
36 for l in s.splitlines():
37 ui.status(_("remote: "), l, '\n')
37 ui.status(_("remote: "), l, '\n')
38
38
39 class doublepipe(object):
39 class doublepipe(object):
40 """Operate a side-channel pipe in addition of a main one
40 """Operate a side-channel pipe in addition of a main one
41
41
42 The side-channel pipe contains server output to be forwarded to the user
42 The side-channel pipe contains server output to be forwarded to the user
43 input. The double pipe will behave as the "main" pipe, but will ensure the
43 input. The double pipe will behave as the "main" pipe, but will ensure the
44 content of the "side" pipe is properly processed while we wait for blocking
44 content of the "side" pipe is properly processed while we wait for blocking
45 call on the "main" pipe.
45 call on the "main" pipe.
46
46
47 If large amounts of data are read from "main", the forward will cease after
47 If large amounts of data are read from "main", the forward will cease after
48 the first bytes start to appear. This simplifies the implementation
48 the first bytes start to appear. This simplifies the implementation
49 without affecting actual output of sshpeer too much as we rarely issue
49 without affecting actual output of sshpeer too much as we rarely issue
50 large read for data not yet emitted by the server.
50 large read for data not yet emitted by the server.
51
51
52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
53 that handle all the os specific bites. This class lives in this module
53 that handle all the os specific bites. This class lives in this module
54 because it focus on behavior specifig to the ssh protocol."""
54 because it focus on behavior specifig to the ssh protocol."""
55
55
56 def __init__(self, ui, main, side):
56 def __init__(self, ui, main, side):
57 self._ui = ui
57 self._ui = ui
58 self._main = main
58 self._main = main
59 self._side = side
59 self._side = side
60
60
61 def _wait(self):
61 def _wait(self):
62 """wait until some data are available on main or side
62 """wait until some data are available on main or side
63
63
64 return a pair of boolean (ismainready, issideready)
64 return a pair of boolean (ismainready, issideready)
65
65
66 (This will only wait for data if the setup is supported by `util.poll`)
66 (This will only wait for data if the setup is supported by `util.poll`)
67 """
67 """
68 if self._main.hasbuffer:
68 if self._main.hasbuffer:
69 return (True, True) # main has data, assume side is worth poking at.
69 return (True, True) # main has data, assume side is worth poking at.
70 fds = [self._main.fileno(), self._side.fileno()]
70 fds = [self._main.fileno(), self._side.fileno()]
71 try:
71 try:
72 act = util.poll(fds)
72 act = util.poll(fds)
73 except NotImplementedError:
73 except NotImplementedError:
74 # non supported yet case, assume all have data.
74 # non supported yet case, assume all have data.
75 act = fds
75 act = fds
76 return (self._main.fileno() in act, self._side.fileno() in act)
76 return (self._main.fileno() in act, self._side.fileno() in act)
77
77
78 def write(self, data):
79 return self._call('write', data)
80
78 def read(self, size):
81 def read(self, size):
79 return self._call('read', size)
82 return self._call('read', size)
80
83
81 def readline(self):
84 def readline(self):
82 return self._call('readline')
85 return self._call('readline')
83
86
84 def _call(self, methname, data=None):
87 def _call(self, methname, data=None):
85 """call <methname> on "main", forward output of "side" while blocking
88 """call <methname> on "main", forward output of "side" while blocking
86 """
89 """
87 # data can be '' or 0
90 # data can be '' or 0
88 if (data is not None and not data) or self._main.closed:
91 if (data is not None and not data) or self._main.closed:
89 _forwardoutput(self._ui, self._side)
92 _forwardoutput(self._ui, self._side)
90 return ''
93 return ''
91 while True:
94 while True:
92 mainready, sideready = self._wait()
95 mainready, sideready = self._wait()
93 if sideready:
96 if sideready:
94 _forwardoutput(self._ui, self._side)
97 _forwardoutput(self._ui, self._side)
95 if mainready:
98 if mainready:
96 meth = getattr(self._main, methname)
99 meth = getattr(self._main, methname)
97 if data is None:
100 if data is None:
98 return meth()
101 return meth()
99 else:
102 else:
100 return meth(data)
103 return meth(data)
101
104
102 def close(self):
105 def close(self):
103 return self._main.close()
106 return self._main.close()
104
107
108 def flush(self):
109 return self._main.flush()
110
105 class sshpeer(wireproto.wirepeer):
111 class sshpeer(wireproto.wirepeer):
106 def __init__(self, ui, path, create=False):
112 def __init__(self, ui, path, create=False):
107 self._url = path
113 self._url = path
108 self.ui = ui
114 self.ui = ui
109 self.pipeo = self.pipei = self.pipee = None
115 self.pipeo = self.pipei = self.pipee = None
110
116
111 u = util.url(path, parsequery=False, parsefragment=False)
117 u = util.url(path, parsequery=False, parsefragment=False)
112 if u.scheme != 'ssh' or not u.host or u.path is None:
118 if u.scheme != 'ssh' or not u.host or u.path is None:
113 self._abort(error.RepoError(_("couldn't parse location %s") % path))
119 self._abort(error.RepoError(_("couldn't parse location %s") % path))
114
120
115 self.user = u.user
121 self.user = u.user
116 if u.passwd is not None:
122 if u.passwd is not None:
117 self._abort(error.RepoError(_("password in URL not supported")))
123 self._abort(error.RepoError(_("password in URL not supported")))
118 self.host = u.host
124 self.host = u.host
119 self.port = u.port
125 self.port = u.port
120 self.path = u.path or "."
126 self.path = u.path or "."
121
127
122 sshcmd = self.ui.config("ui", "ssh", "ssh")
128 sshcmd = self.ui.config("ui", "ssh", "ssh")
123 remotecmd = self.ui.config("ui", "remotecmd", "hg")
129 remotecmd = self.ui.config("ui", "remotecmd", "hg")
124
130
125 args = util.sshargs(sshcmd,
131 args = util.sshargs(sshcmd,
126 _serverquote(self.host),
132 _serverquote(self.host),
127 _serverquote(self.user),
133 _serverquote(self.user),
128 _serverquote(self.port))
134 _serverquote(self.port))
129
135
130 if create:
136 if create:
131 cmd = '%s %s %s' % (sshcmd, args,
137 cmd = '%s %s %s' % (sshcmd, args,
132 util.shellquote("%s init %s" %
138 util.shellquote("%s init %s" %
133 (_serverquote(remotecmd), _serverquote(self.path))))
139 (_serverquote(remotecmd), _serverquote(self.path))))
134 ui.debug('running %s\n' % cmd)
140 ui.debug('running %s\n' % cmd)
135 res = ui.system(cmd)
141 res = ui.system(cmd)
136 if res != 0:
142 if res != 0:
137 self._abort(error.RepoError(_("could not create remote repo")))
143 self._abort(error.RepoError(_("could not create remote repo")))
138
144
139 self._validaterepo(sshcmd, args, remotecmd)
145 self._validaterepo(sshcmd, args, remotecmd)
140
146
141 def url(self):
147 def url(self):
142 return self._url
148 return self._url
143
149
144 def _validaterepo(self, sshcmd, args, remotecmd):
150 def _validaterepo(self, sshcmd, args, remotecmd):
145 # cleanup up previous run
151 # cleanup up previous run
146 self.cleanup()
152 self.cleanup()
147
153
148 cmd = '%s %s %s' % (sshcmd, args,
154 cmd = '%s %s %s' % (sshcmd, args,
149 util.shellquote("%s -R %s serve --stdio" %
155 util.shellquote("%s -R %s serve --stdio" %
150 (_serverquote(remotecmd), _serverquote(self.path))))
156 (_serverquote(remotecmd), _serverquote(self.path))))
151 self.ui.debug('running %s\n' % cmd)
157 self.ui.debug('running %s\n' % cmd)
152 cmd = util.quotecommand(cmd)
158 cmd = util.quotecommand(cmd)
153
159
154 # while self.subprocess isn't used, having it allows the subprocess to
160 # while self.subprocess isn't used, having it allows the subprocess to
155 # to clean up correctly later
161 # to clean up correctly later
156 #
162 #
157 # no buffer allow the use of 'select'
163 # no buffer allow the use of 'select'
158 # feel free to remove buffering and select usage when we ultimately
164 # feel free to remove buffering and select usage when we ultimately
159 # move to threading.
165 # move to threading.
160 sub = util.popen4(cmd, bufsize=0)
166 sub = util.popen4(cmd, bufsize=0)
161 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
167 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
162
168
163 self.pipei = util.bufferedinputpipe(self.pipei)
169 self.pipei = util.bufferedinputpipe(self.pipei)
164 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
170 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
165
171
166 # skip any noise generated by remote shell
172 # skip any noise generated by remote shell
167 self._callstream("hello")
173 self._callstream("hello")
168 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
174 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
169 lines = ["", "dummy"]
175 lines = ["", "dummy"]
170 max_noise = 500
176 max_noise = 500
171 while lines[-1] and max_noise:
177 while lines[-1] and max_noise:
172 l = r.readline()
178 l = r.readline()
173 self.readerr()
179 self.readerr()
174 if lines[-1] == "1\n" and l == "\n":
180 if lines[-1] == "1\n" and l == "\n":
175 break
181 break
176 if l:
182 if l:
177 self.ui.debug("remote: ", l)
183 self.ui.debug("remote: ", l)
178 lines.append(l)
184 lines.append(l)
179 max_noise -= 1
185 max_noise -= 1
180 else:
186 else:
181 self._abort(error.RepoError(_('no suitable response from '
187 self._abort(error.RepoError(_('no suitable response from '
182 'remote hg')))
188 'remote hg')))
183
189
184 self._caps = set()
190 self._caps = set()
185 for l in reversed(lines):
191 for l in reversed(lines):
186 if l.startswith("capabilities:"):
192 if l.startswith("capabilities:"):
187 self._caps.update(l[:-1].split(":")[1].split())
193 self._caps.update(l[:-1].split(":")[1].split())
188 break
194 break
189
195
190 def _capabilities(self):
196 def _capabilities(self):
191 return self._caps
197 return self._caps
192
198
193 def readerr(self):
199 def readerr(self):
194 _forwardoutput(self.ui, self.pipee)
200 _forwardoutput(self.ui, self.pipee)
195
201
196 def _abort(self, exception):
202 def _abort(self, exception):
197 self.cleanup()
203 self.cleanup()
198 raise exception
204 raise exception
199
205
200 def cleanup(self):
206 def cleanup(self):
201 if self.pipeo is None:
207 if self.pipeo is None:
202 return
208 return
203 self.pipeo.close()
209 self.pipeo.close()
204 self.pipei.close()
210 self.pipei.close()
205 try:
211 try:
206 # read the error descriptor until EOF
212 # read the error descriptor until EOF
207 for l in self.pipee:
213 for l in self.pipee:
208 self.ui.status(_("remote: "), l)
214 self.ui.status(_("remote: "), l)
209 except (IOError, ValueError):
215 except (IOError, ValueError):
210 pass
216 pass
211 self.pipee.close()
217 self.pipee.close()
212
218
213 __del__ = cleanup
219 __del__ = cleanup
214
220
215 def _callstream(self, cmd, **args):
221 def _callstream(self, cmd, **args):
216 self.ui.debug("sending %s command\n" % cmd)
222 self.ui.debug("sending %s command\n" % cmd)
217 self.pipeo.write("%s\n" % cmd)
223 self.pipeo.write("%s\n" % cmd)
218 _func, names = wireproto.commands[cmd]
224 _func, names = wireproto.commands[cmd]
219 keys = names.split()
225 keys = names.split()
220 wireargs = {}
226 wireargs = {}
221 for k in keys:
227 for k in keys:
222 if k == '*':
228 if k == '*':
223 wireargs['*'] = args
229 wireargs['*'] = args
224 break
230 break
225 else:
231 else:
226 wireargs[k] = args[k]
232 wireargs[k] = args[k]
227 del args[k]
233 del args[k]
228 for k, v in sorted(wireargs.iteritems()):
234 for k, v in sorted(wireargs.iteritems()):
229 self.pipeo.write("%s %d\n" % (k, len(v)))
235 self.pipeo.write("%s %d\n" % (k, len(v)))
230 if isinstance(v, dict):
236 if isinstance(v, dict):
231 for dk, dv in v.iteritems():
237 for dk, dv in v.iteritems():
232 self.pipeo.write("%s %d\n" % (dk, len(dv)))
238 self.pipeo.write("%s %d\n" % (dk, len(dv)))
233 self.pipeo.write(dv)
239 self.pipeo.write(dv)
234 else:
240 else:
235 self.pipeo.write(v)
241 self.pipeo.write(v)
236 self.pipeo.flush()
242 self.pipeo.flush()
237
243
238 return self.pipei
244 return self.pipei
239
245
240 def _callcompressable(self, cmd, **args):
246 def _callcompressable(self, cmd, **args):
241 return self._callstream(cmd, **args)
247 return self._callstream(cmd, **args)
242
248
243 def _call(self, cmd, **args):
249 def _call(self, cmd, **args):
244 self._callstream(cmd, **args)
250 self._callstream(cmd, **args)
245 return self._recv()
251 return self._recv()
246
252
247 def _callpush(self, cmd, fp, **args):
253 def _callpush(self, cmd, fp, **args):
248 r = self._call(cmd, **args)
254 r = self._call(cmd, **args)
249 if r:
255 if r:
250 return '', r
256 return '', r
251 while True:
257 while True:
252 d = fp.read(4096)
258 d = fp.read(4096)
253 if not d:
259 if not d:
254 break
260 break
255 self._send(d)
261 self._send(d)
256 self._send("", flush=True)
262 self._send("", flush=True)
257 r = self._recv()
263 r = self._recv()
258 if r:
264 if r:
259 return '', r
265 return '', r
260 return self._recv(), ''
266 return self._recv(), ''
261
267
262 def _calltwowaystream(self, cmd, fp, **args):
268 def _calltwowaystream(self, cmd, fp, **args):
263 r = self._call(cmd, **args)
269 r = self._call(cmd, **args)
264 if r:
270 if r:
265 # XXX needs to be made better
271 # XXX needs to be made better
266 raise util.Abort('unexpected remote reply: %s' % r)
272 raise util.Abort('unexpected remote reply: %s' % r)
267 while True:
273 while True:
268 d = fp.read(4096)
274 d = fp.read(4096)
269 if not d:
275 if not d:
270 break
276 break
271 self._send(d)
277 self._send(d)
272 self._send("", flush=True)
278 self._send("", flush=True)
273 return self.pipei
279 return self.pipei
274
280
275 def _recv(self):
281 def _recv(self):
276 l = self.pipei.readline()
282 l = self.pipei.readline()
277 if l == '\n':
283 if l == '\n':
278 self.readerr()
284 self.readerr()
279 msg = _('check previous remote output')
285 msg = _('check previous remote output')
280 self._abort(error.OutOfBandError(hint=msg))
286 self._abort(error.OutOfBandError(hint=msg))
281 self.readerr()
287 self.readerr()
282 try:
288 try:
283 l = int(l)
289 l = int(l)
284 except ValueError:
290 except ValueError:
285 self._abort(error.ResponseError(_("unexpected response:"), l))
291 self._abort(error.ResponseError(_("unexpected response:"), l))
286 return self.pipei.read(l)
292 return self.pipei.read(l)
287
293
288 def _send(self, data, flush=False):
294 def _send(self, data, flush=False):
289 self.pipeo.write("%d\n" % len(data))
295 self.pipeo.write("%d\n" % len(data))
290 if data:
296 if data:
291 self.pipeo.write(data)
297 self.pipeo.write(data)
292 if flush:
298 if flush:
293 self.pipeo.flush()
299 self.pipeo.flush()
294 self.readerr()
300 self.readerr()
295
301
296 def lock(self):
302 def lock(self):
297 self._call("lock")
303 self._call("lock")
298 return remotelock(self)
304 return remotelock(self)
299
305
300 def unlock(self):
306 def unlock(self):
301 self._call("unlock")
307 self._call("unlock")
302
308
303 def addchangegroup(self, cg, source, url, lock=None):
309 def addchangegroup(self, cg, source, url, lock=None):
304 '''Send a changegroup to the remote server. Return an integer
310 '''Send a changegroup to the remote server. Return an integer
305 similar to unbundle(). DEPRECATED, since it requires locking the
311 similar to unbundle(). DEPRECATED, since it requires locking the
306 remote.'''
312 remote.'''
307 d = self._call("addchangegroup")
313 d = self._call("addchangegroup")
308 if d:
314 if d:
309 self._abort(error.RepoError(_("push refused: %s") % d))
315 self._abort(error.RepoError(_("push refused: %s") % d))
310 while True:
316 while True:
311 d = cg.read(4096)
317 d = cg.read(4096)
312 if not d:
318 if not d:
313 break
319 break
314 self.pipeo.write(d)
320 self.pipeo.write(d)
315 self.readerr()
321 self.readerr()
316
322
317 self.pipeo.flush()
323 self.pipeo.flush()
318
324
319 self.readerr()
325 self.readerr()
320 r = self._recv()
326 r = self._recv()
321 if not r:
327 if not r:
322 return 1
328 return 1
323 try:
329 try:
324 return int(r)
330 return int(r)
325 except ValueError:
331 except ValueError:
326 self._abort(error.ResponseError(_("unexpected response:"), r))
332 self._abort(error.ResponseError(_("unexpected response:"), r))
327
333
328 instance = sshpeer
334 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now