##// END OF EJS Templates
sshpeer: allow doublepipe on unbuffered main pipe...
Pierre-Yves David -
r25457:2afa7481 default
parent child Browse files
Show More
@@ -1,334 +1,334 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import re
8 import re
9 from i18n import _
9 from i18n import _
10 import util, error, wireproto
10 import util, error, wireproto
11
11
12 class remotelock(object):
12 class remotelock(object):
13 def __init__(self, repo):
13 def __init__(self, repo):
14 self.repo = repo
14 self.repo = repo
15 def release(self):
15 def release(self):
16 self.repo.unlock()
16 self.repo.unlock()
17 self.repo = None
17 self.repo = None
18 def __del__(self):
18 def __del__(self):
19 if self.repo:
19 if self.repo:
20 self.release()
20 self.release()
21
21
22 def _serverquote(s):
22 def _serverquote(s):
23 if not s:
23 if not s:
24 return s
24 return s
25 '''quote a string for the remote shell ... which we assume is sh'''
25 '''quote a string for the remote shell ... which we assume is sh'''
26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
27 return s
27 return s
28 return "'%s'" % s.replace("'", "'\\''")
28 return "'%s'" % s.replace("'", "'\\''")
29
29
30 def _forwardoutput(ui, pipe):
30 def _forwardoutput(ui, pipe):
31 """display all data currently available on pipe as remote output.
31 """display all data currently available on pipe as remote output.
32
32
33 This is non blocking."""
33 This is non blocking."""
34 s = util.readpipe(pipe)
34 s = util.readpipe(pipe)
35 if s:
35 if s:
36 for l in s.splitlines():
36 for l in s.splitlines():
37 ui.status(_("remote: "), l, '\n')
37 ui.status(_("remote: "), l, '\n')
38
38
39 class doublepipe(object):
39 class doublepipe(object):
40 """Operate a side-channel pipe in addition of a main one
40 """Operate a side-channel pipe in addition of a main one
41
41
42 The side-channel pipe contains server output to be forwarded to the user
42 The side-channel pipe contains server output to be forwarded to the user
43 input. The double pipe will behave as the "main" pipe, but will ensure the
43 input. The double pipe will behave as the "main" pipe, but will ensure the
44 content of the "side" pipe is properly processed while we wait for blocking
44 content of the "side" pipe is properly processed while we wait for blocking
45 call on the "main" pipe.
45 call on the "main" pipe.
46
46
47 If large amounts of data are read from "main", the forward will cease after
47 If large amounts of data are read from "main", the forward will cease after
48 the first bytes start to appear. This simplifies the implementation
48 the first bytes start to appear. This simplifies the implementation
49 without affecting actual output of sshpeer too much as we rarely issue
49 without affecting actual output of sshpeer too much as we rarely issue
50 large read for data not yet emitted by the server.
50 large read for data not yet emitted by the server.
51
51
52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
53 that handle all the os specific bites. This class lives in this module
53 that handle all the os specific bites. This class lives in this module
54 because it focus on behavior specifig to the ssh protocol."""
54 because it focus on behavior specifig to the ssh protocol."""
55
55
56 def __init__(self, ui, main, side):
56 def __init__(self, ui, main, side):
57 self._ui = ui
57 self._ui = ui
58 self._main = main
58 self._main = main
59 self._side = side
59 self._side = side
60
60
61 def _wait(self):
61 def _wait(self):
62 """wait until some data are available on main or side
62 """wait until some data are available on main or side
63
63
64 return a pair of boolean (ismainready, issideready)
64 return a pair of boolean (ismainready, issideready)
65
65
66 (This will only wait for data if the setup is supported by `util.poll`)
66 (This will only wait for data if the setup is supported by `util.poll`)
67 """
67 """
68 if self._main.hasbuffer:
68 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
69 return (True, True) # main has data, assume side is worth poking at.
69 return (True, True) # main has data, assume side is worth poking at.
70 fds = [self._main.fileno(), self._side.fileno()]
70 fds = [self._main.fileno(), self._side.fileno()]
71 try:
71 try:
72 act = util.poll(fds)
72 act = util.poll(fds)
73 except NotImplementedError:
73 except NotImplementedError:
74 # non supported yet case, assume all have data.
74 # non supported yet case, assume all have data.
75 act = fds
75 act = fds
76 return (self._main.fileno() in act, self._side.fileno() in act)
76 return (self._main.fileno() in act, self._side.fileno() in act)
77
77
78 def write(self, data):
78 def write(self, data):
79 return self._call('write', data)
79 return self._call('write', data)
80
80
81 def read(self, size):
81 def read(self, size):
82 return self._call('read', size)
82 return self._call('read', size)
83
83
84 def readline(self):
84 def readline(self):
85 return self._call('readline')
85 return self._call('readline')
86
86
87 def _call(self, methname, data=None):
87 def _call(self, methname, data=None):
88 """call <methname> on "main", forward output of "side" while blocking
88 """call <methname> on "main", forward output of "side" while blocking
89 """
89 """
90 # data can be '' or 0
90 # data can be '' or 0
91 if (data is not None and not data) or self._main.closed:
91 if (data is not None and not data) or self._main.closed:
92 _forwardoutput(self._ui, self._side)
92 _forwardoutput(self._ui, self._side)
93 return ''
93 return ''
94 while True:
94 while True:
95 mainready, sideready = self._wait()
95 mainready, sideready = self._wait()
96 if sideready:
96 if sideready:
97 _forwardoutput(self._ui, self._side)
97 _forwardoutput(self._ui, self._side)
98 if mainready:
98 if mainready:
99 meth = getattr(self._main, methname)
99 meth = getattr(self._main, methname)
100 if data is None:
100 if data is None:
101 return meth()
101 return meth()
102 else:
102 else:
103 return meth(data)
103 return meth(data)
104
104
105 def close(self):
105 def close(self):
106 return self._main.close()
106 return self._main.close()
107
107
108 def flush(self):
108 def flush(self):
109 return self._main.flush()
109 return self._main.flush()
110
110
111 class sshpeer(wireproto.wirepeer):
111 class sshpeer(wireproto.wirepeer):
112 def __init__(self, ui, path, create=False):
112 def __init__(self, ui, path, create=False):
113 self._url = path
113 self._url = path
114 self.ui = ui
114 self.ui = ui
115 self.pipeo = self.pipei = self.pipee = None
115 self.pipeo = self.pipei = self.pipee = None
116
116
117 u = util.url(path, parsequery=False, parsefragment=False)
117 u = util.url(path, parsequery=False, parsefragment=False)
118 if u.scheme != 'ssh' or not u.host or u.path is None:
118 if u.scheme != 'ssh' or not u.host or u.path is None:
119 self._abort(error.RepoError(_("couldn't parse location %s") % path))
119 self._abort(error.RepoError(_("couldn't parse location %s") % path))
120
120
121 self.user = u.user
121 self.user = u.user
122 if u.passwd is not None:
122 if u.passwd is not None:
123 self._abort(error.RepoError(_("password in URL not supported")))
123 self._abort(error.RepoError(_("password in URL not supported")))
124 self.host = u.host
124 self.host = u.host
125 self.port = u.port
125 self.port = u.port
126 self.path = u.path or "."
126 self.path = u.path or "."
127
127
128 sshcmd = self.ui.config("ui", "ssh", "ssh")
128 sshcmd = self.ui.config("ui", "ssh", "ssh")
129 remotecmd = self.ui.config("ui", "remotecmd", "hg")
129 remotecmd = self.ui.config("ui", "remotecmd", "hg")
130
130
131 args = util.sshargs(sshcmd,
131 args = util.sshargs(sshcmd,
132 _serverquote(self.host),
132 _serverquote(self.host),
133 _serverquote(self.user),
133 _serverquote(self.user),
134 _serverquote(self.port))
134 _serverquote(self.port))
135
135
136 if create:
136 if create:
137 cmd = '%s %s %s' % (sshcmd, args,
137 cmd = '%s %s %s' % (sshcmd, args,
138 util.shellquote("%s init %s" %
138 util.shellquote("%s init %s" %
139 (_serverquote(remotecmd), _serverquote(self.path))))
139 (_serverquote(remotecmd), _serverquote(self.path))))
140 ui.debug('running %s\n' % cmd)
140 ui.debug('running %s\n' % cmd)
141 res = ui.system(cmd)
141 res = ui.system(cmd)
142 if res != 0:
142 if res != 0:
143 self._abort(error.RepoError(_("could not create remote repo")))
143 self._abort(error.RepoError(_("could not create remote repo")))
144
144
145 self._validaterepo(sshcmd, args, remotecmd)
145 self._validaterepo(sshcmd, args, remotecmd)
146
146
147 def url(self):
147 def url(self):
148 return self._url
148 return self._url
149
149
150 def _validaterepo(self, sshcmd, args, remotecmd):
150 def _validaterepo(self, sshcmd, args, remotecmd):
151 # cleanup up previous run
151 # cleanup up previous run
152 self.cleanup()
152 self.cleanup()
153
153
154 cmd = '%s %s %s' % (sshcmd, args,
154 cmd = '%s %s %s' % (sshcmd, args,
155 util.shellquote("%s -R %s serve --stdio" %
155 util.shellquote("%s -R %s serve --stdio" %
156 (_serverquote(remotecmd), _serverquote(self.path))))
156 (_serverquote(remotecmd), _serverquote(self.path))))
157 self.ui.debug('running %s\n' % cmd)
157 self.ui.debug('running %s\n' % cmd)
158 cmd = util.quotecommand(cmd)
158 cmd = util.quotecommand(cmd)
159
159
160 # while self.subprocess isn't used, having it allows the subprocess to
160 # while self.subprocess isn't used, having it allows the subprocess to
161 # to clean up correctly later
161 # to clean up correctly later
162 #
162 #
163 # no buffer allow the use of 'select'
163 # no buffer allow the use of 'select'
164 # feel free to remove buffering and select usage when we ultimately
164 # feel free to remove buffering and select usage when we ultimately
165 # move to threading.
165 # move to threading.
166 sub = util.popen4(cmd, bufsize=0)
166 sub = util.popen4(cmd, bufsize=0)
167 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
167 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
168
168
169 self.pipei = util.bufferedinputpipe(self.pipei)
169 self.pipei = util.bufferedinputpipe(self.pipei)
170 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
170 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
171
171
172 # skip any noise generated by remote shell
172 # skip any noise generated by remote shell
173 self._callstream("hello")
173 self._callstream("hello")
174 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
174 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
175 lines = ["", "dummy"]
175 lines = ["", "dummy"]
176 max_noise = 500
176 max_noise = 500
177 while lines[-1] and max_noise:
177 while lines[-1] and max_noise:
178 l = r.readline()
178 l = r.readline()
179 self.readerr()
179 self.readerr()
180 if lines[-1] == "1\n" and l == "\n":
180 if lines[-1] == "1\n" and l == "\n":
181 break
181 break
182 if l:
182 if l:
183 self.ui.debug("remote: ", l)
183 self.ui.debug("remote: ", l)
184 lines.append(l)
184 lines.append(l)
185 max_noise -= 1
185 max_noise -= 1
186 else:
186 else:
187 self._abort(error.RepoError(_('no suitable response from '
187 self._abort(error.RepoError(_('no suitable response from '
188 'remote hg')))
188 'remote hg')))
189
189
190 self._caps = set()
190 self._caps = set()
191 for l in reversed(lines):
191 for l in reversed(lines):
192 if l.startswith("capabilities:"):
192 if l.startswith("capabilities:"):
193 self._caps.update(l[:-1].split(":")[1].split())
193 self._caps.update(l[:-1].split(":")[1].split())
194 break
194 break
195
195
196 def _capabilities(self):
196 def _capabilities(self):
197 return self._caps
197 return self._caps
198
198
199 def readerr(self):
199 def readerr(self):
200 _forwardoutput(self.ui, self.pipee)
200 _forwardoutput(self.ui, self.pipee)
201
201
202 def _abort(self, exception):
202 def _abort(self, exception):
203 self.cleanup()
203 self.cleanup()
204 raise exception
204 raise exception
205
205
206 def cleanup(self):
206 def cleanup(self):
207 if self.pipeo is None:
207 if self.pipeo is None:
208 return
208 return
209 self.pipeo.close()
209 self.pipeo.close()
210 self.pipei.close()
210 self.pipei.close()
211 try:
211 try:
212 # read the error descriptor until EOF
212 # read the error descriptor until EOF
213 for l in self.pipee:
213 for l in self.pipee:
214 self.ui.status(_("remote: "), l)
214 self.ui.status(_("remote: "), l)
215 except (IOError, ValueError):
215 except (IOError, ValueError):
216 pass
216 pass
217 self.pipee.close()
217 self.pipee.close()
218
218
219 __del__ = cleanup
219 __del__ = cleanup
220
220
221 def _callstream(self, cmd, **args):
221 def _callstream(self, cmd, **args):
222 self.ui.debug("sending %s command\n" % cmd)
222 self.ui.debug("sending %s command\n" % cmd)
223 self.pipeo.write("%s\n" % cmd)
223 self.pipeo.write("%s\n" % cmd)
224 _func, names = wireproto.commands[cmd]
224 _func, names = wireproto.commands[cmd]
225 keys = names.split()
225 keys = names.split()
226 wireargs = {}
226 wireargs = {}
227 for k in keys:
227 for k in keys:
228 if k == '*':
228 if k == '*':
229 wireargs['*'] = args
229 wireargs['*'] = args
230 break
230 break
231 else:
231 else:
232 wireargs[k] = args[k]
232 wireargs[k] = args[k]
233 del args[k]
233 del args[k]
234 for k, v in sorted(wireargs.iteritems()):
234 for k, v in sorted(wireargs.iteritems()):
235 self.pipeo.write("%s %d\n" % (k, len(v)))
235 self.pipeo.write("%s %d\n" % (k, len(v)))
236 if isinstance(v, dict):
236 if isinstance(v, dict):
237 for dk, dv in v.iteritems():
237 for dk, dv in v.iteritems():
238 self.pipeo.write("%s %d\n" % (dk, len(dv)))
238 self.pipeo.write("%s %d\n" % (dk, len(dv)))
239 self.pipeo.write(dv)
239 self.pipeo.write(dv)
240 else:
240 else:
241 self.pipeo.write(v)
241 self.pipeo.write(v)
242 self.pipeo.flush()
242 self.pipeo.flush()
243
243
244 return self.pipei
244 return self.pipei
245
245
246 def _callcompressable(self, cmd, **args):
246 def _callcompressable(self, cmd, **args):
247 return self._callstream(cmd, **args)
247 return self._callstream(cmd, **args)
248
248
249 def _call(self, cmd, **args):
249 def _call(self, cmd, **args):
250 self._callstream(cmd, **args)
250 self._callstream(cmd, **args)
251 return self._recv()
251 return self._recv()
252
252
253 def _callpush(self, cmd, fp, **args):
253 def _callpush(self, cmd, fp, **args):
254 r = self._call(cmd, **args)
254 r = self._call(cmd, **args)
255 if r:
255 if r:
256 return '', r
256 return '', r
257 while True:
257 while True:
258 d = fp.read(4096)
258 d = fp.read(4096)
259 if not d:
259 if not d:
260 break
260 break
261 self._send(d)
261 self._send(d)
262 self._send("", flush=True)
262 self._send("", flush=True)
263 r = self._recv()
263 r = self._recv()
264 if r:
264 if r:
265 return '', r
265 return '', r
266 return self._recv(), ''
266 return self._recv(), ''
267
267
268 def _calltwowaystream(self, cmd, fp, **args):
268 def _calltwowaystream(self, cmd, fp, **args):
269 r = self._call(cmd, **args)
269 r = self._call(cmd, **args)
270 if r:
270 if r:
271 # XXX needs to be made better
271 # XXX needs to be made better
272 raise util.Abort('unexpected remote reply: %s' % r)
272 raise util.Abort('unexpected remote reply: %s' % r)
273 while True:
273 while True:
274 d = fp.read(4096)
274 d = fp.read(4096)
275 if not d:
275 if not d:
276 break
276 break
277 self._send(d)
277 self._send(d)
278 self._send("", flush=True)
278 self._send("", flush=True)
279 return self.pipei
279 return self.pipei
280
280
281 def _recv(self):
281 def _recv(self):
282 l = self.pipei.readline()
282 l = self.pipei.readline()
283 if l == '\n':
283 if l == '\n':
284 self.readerr()
284 self.readerr()
285 msg = _('check previous remote output')
285 msg = _('check previous remote output')
286 self._abort(error.OutOfBandError(hint=msg))
286 self._abort(error.OutOfBandError(hint=msg))
287 self.readerr()
287 self.readerr()
288 try:
288 try:
289 l = int(l)
289 l = int(l)
290 except ValueError:
290 except ValueError:
291 self._abort(error.ResponseError(_("unexpected response:"), l))
291 self._abort(error.ResponseError(_("unexpected response:"), l))
292 return self.pipei.read(l)
292 return self.pipei.read(l)
293
293
294 def _send(self, data, flush=False):
294 def _send(self, data, flush=False):
295 self.pipeo.write("%d\n" % len(data))
295 self.pipeo.write("%d\n" % len(data))
296 if data:
296 if data:
297 self.pipeo.write(data)
297 self.pipeo.write(data)
298 if flush:
298 if flush:
299 self.pipeo.flush()
299 self.pipeo.flush()
300 self.readerr()
300 self.readerr()
301
301
302 def lock(self):
302 def lock(self):
303 self._call("lock")
303 self._call("lock")
304 return remotelock(self)
304 return remotelock(self)
305
305
306 def unlock(self):
306 def unlock(self):
307 self._call("unlock")
307 self._call("unlock")
308
308
309 def addchangegroup(self, cg, source, url, lock=None):
309 def addchangegroup(self, cg, source, url, lock=None):
310 '''Send a changegroup to the remote server. Return an integer
310 '''Send a changegroup to the remote server. Return an integer
311 similar to unbundle(). DEPRECATED, since it requires locking the
311 similar to unbundle(). DEPRECATED, since it requires locking the
312 remote.'''
312 remote.'''
313 d = self._call("addchangegroup")
313 d = self._call("addchangegroup")
314 if d:
314 if d:
315 self._abort(error.RepoError(_("push refused: %s") % d))
315 self._abort(error.RepoError(_("push refused: %s") % d))
316 while True:
316 while True:
317 d = cg.read(4096)
317 d = cg.read(4096)
318 if not d:
318 if not d:
319 break
319 break
320 self.pipeo.write(d)
320 self.pipeo.write(d)
321 self.readerr()
321 self.readerr()
322
322
323 self.pipeo.flush()
323 self.pipeo.flush()
324
324
325 self.readerr()
325 self.readerr()
326 r = self._recv()
326 r = self._recv()
327 if not r:
327 if not r:
328 return 1
328 return 1
329 try:
329 try:
330 return int(r)
330 return int(r)
331 except ValueError:
331 except ValueError:
332 self._abort(error.ResponseError(_("unexpected response:"), r))
332 self._abort(error.ResponseError(_("unexpected response:"), r))
333
333
334 instance = sshpeer
334 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now