##// END OF EJS Templates
sshpeer: set a blockedtag when starting ssh...
Simon Farnsworth -
r31197:764f4581 default
parent child Browse files
Show More
@@ -1,359 +1,359
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import re
10 import re
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 error,
14 error,
15 util,
15 util,
16 wireproto,
16 wireproto,
17 )
17 )
18
18
19 class remotelock(object):
19 class remotelock(object):
20 def __init__(self, repo):
20 def __init__(self, repo):
21 self.repo = repo
21 self.repo = repo
22 def release(self):
22 def release(self):
23 self.repo.unlock()
23 self.repo.unlock()
24 self.repo = None
24 self.repo = None
25 def __enter__(self):
25 def __enter__(self):
26 return self
26 return self
27 def __exit__(self, exc_type, exc_val, exc_tb):
27 def __exit__(self, exc_type, exc_val, exc_tb):
28 if self.repo:
28 if self.repo:
29 self.release()
29 self.release()
30 def __del__(self):
30 def __del__(self):
31 if self.repo:
31 if self.repo:
32 self.release()
32 self.release()
33
33
34 def _serverquote(s):
34 def _serverquote(s):
35 if not s:
35 if not s:
36 return s
36 return s
37 '''quote a string for the remote shell ... which we assume is sh'''
37 '''quote a string for the remote shell ... which we assume is sh'''
38 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
38 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
39 return s
39 return s
40 return "'%s'" % s.replace("'", "'\\''")
40 return "'%s'" % s.replace("'", "'\\''")
41
41
42 def _forwardoutput(ui, pipe):
42 def _forwardoutput(ui, pipe):
43 """display all data currently available on pipe as remote output.
43 """display all data currently available on pipe as remote output.
44
44
45 This is non blocking."""
45 This is non blocking."""
46 s = util.readpipe(pipe)
46 s = util.readpipe(pipe)
47 if s:
47 if s:
48 for l in s.splitlines():
48 for l in s.splitlines():
49 ui.status(_("remote: "), l, '\n')
49 ui.status(_("remote: "), l, '\n')
50
50
51 class doublepipe(object):
51 class doublepipe(object):
52 """Operate a side-channel pipe in addition of a main one
52 """Operate a side-channel pipe in addition of a main one
53
53
54 The side-channel pipe contains server output to be forwarded to the user
54 The side-channel pipe contains server output to be forwarded to the user
55 input. The double pipe will behave as the "main" pipe, but will ensure the
55 input. The double pipe will behave as the "main" pipe, but will ensure the
56 content of the "side" pipe is properly processed while we wait for blocking
56 content of the "side" pipe is properly processed while we wait for blocking
57 call on the "main" pipe.
57 call on the "main" pipe.
58
58
59 If large amounts of data are read from "main", the forward will cease after
59 If large amounts of data are read from "main", the forward will cease after
60 the first bytes start to appear. This simplifies the implementation
60 the first bytes start to appear. This simplifies the implementation
61 without affecting actual output of sshpeer too much as we rarely issue
61 without affecting actual output of sshpeer too much as we rarely issue
62 large read for data not yet emitted by the server.
62 large read for data not yet emitted by the server.
63
63
64 The main pipe is expected to be a 'bufferedinputpipe' from the util module
64 The main pipe is expected to be a 'bufferedinputpipe' from the util module
65 that handle all the os specific bites. This class lives in this module
65 that handle all the os specific bites. This class lives in this module
66 because it focus on behavior specific to the ssh protocol."""
66 because it focus on behavior specific to the ssh protocol."""
67
67
68 def __init__(self, ui, main, side):
68 def __init__(self, ui, main, side):
69 self._ui = ui
69 self._ui = ui
70 self._main = main
70 self._main = main
71 self._side = side
71 self._side = side
72
72
73 def _wait(self):
73 def _wait(self):
74 """wait until some data are available on main or side
74 """wait until some data are available on main or side
75
75
76 return a pair of boolean (ismainready, issideready)
76 return a pair of boolean (ismainready, issideready)
77
77
78 (This will only wait for data if the setup is supported by `util.poll`)
78 (This will only wait for data if the setup is supported by `util.poll`)
79 """
79 """
80 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
80 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
81 return (True, True) # main has data, assume side is worth poking at.
81 return (True, True) # main has data, assume side is worth poking at.
82 fds = [self._main.fileno(), self._side.fileno()]
82 fds = [self._main.fileno(), self._side.fileno()]
83 try:
83 try:
84 act = util.poll(fds)
84 act = util.poll(fds)
85 except NotImplementedError:
85 except NotImplementedError:
86 # non supported yet case, assume all have data.
86 # non supported yet case, assume all have data.
87 act = fds
87 act = fds
88 return (self._main.fileno() in act, self._side.fileno() in act)
88 return (self._main.fileno() in act, self._side.fileno() in act)
89
89
90 def write(self, data):
90 def write(self, data):
91 return self._call('write', data)
91 return self._call('write', data)
92
92
93 def read(self, size):
93 def read(self, size):
94 return self._call('read', size)
94 return self._call('read', size)
95
95
96 def readline(self):
96 def readline(self):
97 return self._call('readline')
97 return self._call('readline')
98
98
99 def _call(self, methname, data=None):
99 def _call(self, methname, data=None):
100 """call <methname> on "main", forward output of "side" while blocking
100 """call <methname> on "main", forward output of "side" while blocking
101 """
101 """
102 # data can be '' or 0
102 # data can be '' or 0
103 if (data is not None and not data) or self._main.closed:
103 if (data is not None and not data) or self._main.closed:
104 _forwardoutput(self._ui, self._side)
104 _forwardoutput(self._ui, self._side)
105 return ''
105 return ''
106 while True:
106 while True:
107 mainready, sideready = self._wait()
107 mainready, sideready = self._wait()
108 if sideready:
108 if sideready:
109 _forwardoutput(self._ui, self._side)
109 _forwardoutput(self._ui, self._side)
110 if mainready:
110 if mainready:
111 meth = getattr(self._main, methname)
111 meth = getattr(self._main, methname)
112 if data is None:
112 if data is None:
113 return meth()
113 return meth()
114 else:
114 else:
115 return meth(data)
115 return meth(data)
116
116
117 def close(self):
117 def close(self):
118 return self._main.close()
118 return self._main.close()
119
119
120 def flush(self):
120 def flush(self):
121 return self._main.flush()
121 return self._main.flush()
122
122
123 class sshpeer(wireproto.wirepeer):
123 class sshpeer(wireproto.wirepeer):
124 def __init__(self, ui, path, create=False):
124 def __init__(self, ui, path, create=False):
125 self._url = path
125 self._url = path
126 self.ui = ui
126 self.ui = ui
127 self.pipeo = self.pipei = self.pipee = None
127 self.pipeo = self.pipei = self.pipee = None
128
128
129 u = util.url(path, parsequery=False, parsefragment=False)
129 u = util.url(path, parsequery=False, parsefragment=False)
130 if u.scheme != 'ssh' or not u.host or u.path is None:
130 if u.scheme != 'ssh' or not u.host or u.path is None:
131 self._abort(error.RepoError(_("couldn't parse location %s") % path))
131 self._abort(error.RepoError(_("couldn't parse location %s") % path))
132
132
133 self.user = u.user
133 self.user = u.user
134 if u.passwd is not None:
134 if u.passwd is not None:
135 self._abort(error.RepoError(_("password in URL not supported")))
135 self._abort(error.RepoError(_("password in URL not supported")))
136 self.host = u.host
136 self.host = u.host
137 self.port = u.port
137 self.port = u.port
138 self.path = u.path or "."
138 self.path = u.path or "."
139
139
140 sshcmd = self.ui.config("ui", "ssh", "ssh")
140 sshcmd = self.ui.config("ui", "ssh", "ssh")
141 remotecmd = self.ui.config("ui", "remotecmd", "hg")
141 remotecmd = self.ui.config("ui", "remotecmd", "hg")
142
142
143 args = util.sshargs(sshcmd,
143 args = util.sshargs(sshcmd,
144 _serverquote(self.host),
144 _serverquote(self.host),
145 _serverquote(self.user),
145 _serverquote(self.user),
146 _serverquote(self.port))
146 _serverquote(self.port))
147
147
148 if create:
148 if create:
149 cmd = '%s %s %s' % (sshcmd, args,
149 cmd = '%s %s %s' % (sshcmd, args,
150 util.shellquote("%s init %s" %
150 util.shellquote("%s init %s" %
151 (_serverquote(remotecmd), _serverquote(self.path))))
151 (_serverquote(remotecmd), _serverquote(self.path))))
152 ui.debug('running %s\n' % cmd)
152 ui.debug('running %s\n' % cmd)
153 res = ui.system(cmd)
153 res = ui.system(cmd, blockedtag='sshpeer')
154 if res != 0:
154 if res != 0:
155 self._abort(error.RepoError(_("could not create remote repo")))
155 self._abort(error.RepoError(_("could not create remote repo")))
156
156
157 self._validaterepo(sshcmd, args, remotecmd)
157 self._validaterepo(sshcmd, args, remotecmd)
158
158
159 def url(self):
159 def url(self):
160 return self._url
160 return self._url
161
161
162 def _validaterepo(self, sshcmd, args, remotecmd):
162 def _validaterepo(self, sshcmd, args, remotecmd):
163 # cleanup up previous run
163 # cleanup up previous run
164 self.cleanup()
164 self.cleanup()
165
165
166 cmd = '%s %s %s' % (sshcmd, args,
166 cmd = '%s %s %s' % (sshcmd, args,
167 util.shellquote("%s -R %s serve --stdio" %
167 util.shellquote("%s -R %s serve --stdio" %
168 (_serverquote(remotecmd), _serverquote(self.path))))
168 (_serverquote(remotecmd), _serverquote(self.path))))
169 self.ui.debug('running %s\n' % cmd)
169 self.ui.debug('running %s\n' % cmd)
170 cmd = util.quotecommand(cmd)
170 cmd = util.quotecommand(cmd)
171
171
172 # while self.subprocess isn't used, having it allows the subprocess to
172 # while self.subprocess isn't used, having it allows the subprocess to
173 # to clean up correctly later
173 # to clean up correctly later
174 #
174 #
175 # no buffer allow the use of 'select'
175 # no buffer allow the use of 'select'
176 # feel free to remove buffering and select usage when we ultimately
176 # feel free to remove buffering and select usage when we ultimately
177 # move to threading.
177 # move to threading.
178 sub = util.popen4(cmd, bufsize=0)
178 sub = util.popen4(cmd, bufsize=0)
179 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
179 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
180
180
181 self.pipei = util.bufferedinputpipe(self.pipei)
181 self.pipei = util.bufferedinputpipe(self.pipei)
182 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
182 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
183 self.pipeo = doublepipe(self.ui, self.pipeo, self.pipee)
183 self.pipeo = doublepipe(self.ui, self.pipeo, self.pipee)
184
184
185 # skip any noise generated by remote shell
185 # skip any noise generated by remote shell
186 self._callstream("hello")
186 self._callstream("hello")
187 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
187 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
188 lines = ["", "dummy"]
188 lines = ["", "dummy"]
189 max_noise = 500
189 max_noise = 500
190 while lines[-1] and max_noise:
190 while lines[-1] and max_noise:
191 l = r.readline()
191 l = r.readline()
192 self.readerr()
192 self.readerr()
193 if lines[-1] == "1\n" and l == "\n":
193 if lines[-1] == "1\n" and l == "\n":
194 break
194 break
195 if l:
195 if l:
196 self.ui.debug("remote: ", l)
196 self.ui.debug("remote: ", l)
197 lines.append(l)
197 lines.append(l)
198 max_noise -= 1
198 max_noise -= 1
199 else:
199 else:
200 self._abort(error.RepoError(_('no suitable response from '
200 self._abort(error.RepoError(_('no suitable response from '
201 'remote hg')))
201 'remote hg')))
202
202
203 self._caps = set()
203 self._caps = set()
204 for l in reversed(lines):
204 for l in reversed(lines):
205 if l.startswith("capabilities:"):
205 if l.startswith("capabilities:"):
206 self._caps.update(l[:-1].split(":")[1].split())
206 self._caps.update(l[:-1].split(":")[1].split())
207 break
207 break
208
208
209 def _capabilities(self):
209 def _capabilities(self):
210 return self._caps
210 return self._caps
211
211
212 def readerr(self):
212 def readerr(self):
213 _forwardoutput(self.ui, self.pipee)
213 _forwardoutput(self.ui, self.pipee)
214
214
215 def _abort(self, exception):
215 def _abort(self, exception):
216 self.cleanup()
216 self.cleanup()
217 raise exception
217 raise exception
218
218
219 def cleanup(self):
219 def cleanup(self):
220 if self.pipeo is None:
220 if self.pipeo is None:
221 return
221 return
222 self.pipeo.close()
222 self.pipeo.close()
223 self.pipei.close()
223 self.pipei.close()
224 try:
224 try:
225 # read the error descriptor until EOF
225 # read the error descriptor until EOF
226 for l in self.pipee:
226 for l in self.pipee:
227 self.ui.status(_("remote: "), l)
227 self.ui.status(_("remote: "), l)
228 except (IOError, ValueError):
228 except (IOError, ValueError):
229 pass
229 pass
230 self.pipee.close()
230 self.pipee.close()
231
231
232 __del__ = cleanup
232 __del__ = cleanup
233
233
234 def _submitbatch(self, req):
234 def _submitbatch(self, req):
235 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
235 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
236 available = self._getamount()
236 available = self._getamount()
237 # TODO this response parsing is probably suboptimal for large
237 # TODO this response parsing is probably suboptimal for large
238 # batches with large responses.
238 # batches with large responses.
239 toread = min(available, 1024)
239 toread = min(available, 1024)
240 work = rsp.read(toread)
240 work = rsp.read(toread)
241 available -= toread
241 available -= toread
242 chunk = work
242 chunk = work
243 while chunk:
243 while chunk:
244 while ';' in work:
244 while ';' in work:
245 one, work = work.split(';', 1)
245 one, work = work.split(';', 1)
246 yield wireproto.unescapearg(one)
246 yield wireproto.unescapearg(one)
247 toread = min(available, 1024)
247 toread = min(available, 1024)
248 chunk = rsp.read(toread)
248 chunk = rsp.read(toread)
249 available -= toread
249 available -= toread
250 work += chunk
250 work += chunk
251 yield wireproto.unescapearg(work)
251 yield wireproto.unescapearg(work)
252
252
253 def _callstream(self, cmd, **args):
253 def _callstream(self, cmd, **args):
254 self.ui.debug("sending %s command\n" % cmd)
254 self.ui.debug("sending %s command\n" % cmd)
255 self.pipeo.write("%s\n" % cmd)
255 self.pipeo.write("%s\n" % cmd)
256 _func, names = wireproto.commands[cmd]
256 _func, names = wireproto.commands[cmd]
257 keys = names.split()
257 keys = names.split()
258 wireargs = {}
258 wireargs = {}
259 for k in keys:
259 for k in keys:
260 if k == '*':
260 if k == '*':
261 wireargs['*'] = args
261 wireargs['*'] = args
262 break
262 break
263 else:
263 else:
264 wireargs[k] = args[k]
264 wireargs[k] = args[k]
265 del args[k]
265 del args[k]
266 for k, v in sorted(wireargs.iteritems()):
266 for k, v in sorted(wireargs.iteritems()):
267 self.pipeo.write("%s %d\n" % (k, len(v)))
267 self.pipeo.write("%s %d\n" % (k, len(v)))
268 if isinstance(v, dict):
268 if isinstance(v, dict):
269 for dk, dv in v.iteritems():
269 for dk, dv in v.iteritems():
270 self.pipeo.write("%s %d\n" % (dk, len(dv)))
270 self.pipeo.write("%s %d\n" % (dk, len(dv)))
271 self.pipeo.write(dv)
271 self.pipeo.write(dv)
272 else:
272 else:
273 self.pipeo.write(v)
273 self.pipeo.write(v)
274 self.pipeo.flush()
274 self.pipeo.flush()
275
275
276 return self.pipei
276 return self.pipei
277
277
278 def _callcompressable(self, cmd, **args):
278 def _callcompressable(self, cmd, **args):
279 return self._callstream(cmd, **args)
279 return self._callstream(cmd, **args)
280
280
281 def _call(self, cmd, **args):
281 def _call(self, cmd, **args):
282 self._callstream(cmd, **args)
282 self._callstream(cmd, **args)
283 return self._recv()
283 return self._recv()
284
284
285 def _callpush(self, cmd, fp, **args):
285 def _callpush(self, cmd, fp, **args):
286 r = self._call(cmd, **args)
286 r = self._call(cmd, **args)
287 if r:
287 if r:
288 return '', r
288 return '', r
289 for d in iter(lambda: fp.read(4096), ''):
289 for d in iter(lambda: fp.read(4096), ''):
290 self._send(d)
290 self._send(d)
291 self._send("", flush=True)
291 self._send("", flush=True)
292 r = self._recv()
292 r = self._recv()
293 if r:
293 if r:
294 return '', r
294 return '', r
295 return self._recv(), ''
295 return self._recv(), ''
296
296
297 def _calltwowaystream(self, cmd, fp, **args):
297 def _calltwowaystream(self, cmd, fp, **args):
298 r = self._call(cmd, **args)
298 r = self._call(cmd, **args)
299 if r:
299 if r:
300 # XXX needs to be made better
300 # XXX needs to be made better
301 raise error.Abort(_('unexpected remote reply: %s') % r)
301 raise error.Abort(_('unexpected remote reply: %s') % r)
302 for d in iter(lambda: fp.read(4096), ''):
302 for d in iter(lambda: fp.read(4096), ''):
303 self._send(d)
303 self._send(d)
304 self._send("", flush=True)
304 self._send("", flush=True)
305 return self.pipei
305 return self.pipei
306
306
307 def _getamount(self):
307 def _getamount(self):
308 l = self.pipei.readline()
308 l = self.pipei.readline()
309 if l == '\n':
309 if l == '\n':
310 self.readerr()
310 self.readerr()
311 msg = _('check previous remote output')
311 msg = _('check previous remote output')
312 self._abort(error.OutOfBandError(hint=msg))
312 self._abort(error.OutOfBandError(hint=msg))
313 self.readerr()
313 self.readerr()
314 try:
314 try:
315 return int(l)
315 return int(l)
316 except ValueError:
316 except ValueError:
317 self._abort(error.ResponseError(_("unexpected response:"), l))
317 self._abort(error.ResponseError(_("unexpected response:"), l))
318
318
319 def _recv(self):
319 def _recv(self):
320 return self.pipei.read(self._getamount())
320 return self.pipei.read(self._getamount())
321
321
322 def _send(self, data, flush=False):
322 def _send(self, data, flush=False):
323 self.pipeo.write("%d\n" % len(data))
323 self.pipeo.write("%d\n" % len(data))
324 if data:
324 if data:
325 self.pipeo.write(data)
325 self.pipeo.write(data)
326 if flush:
326 if flush:
327 self.pipeo.flush()
327 self.pipeo.flush()
328 self.readerr()
328 self.readerr()
329
329
330 def lock(self):
330 def lock(self):
331 self._call("lock")
331 self._call("lock")
332 return remotelock(self)
332 return remotelock(self)
333
333
334 def unlock(self):
334 def unlock(self):
335 self._call("unlock")
335 self._call("unlock")
336
336
337 def addchangegroup(self, cg, source, url, lock=None):
337 def addchangegroup(self, cg, source, url, lock=None):
338 '''Send a changegroup to the remote server. Return an integer
338 '''Send a changegroup to the remote server. Return an integer
339 similar to unbundle(). DEPRECATED, since it requires locking the
339 similar to unbundle(). DEPRECATED, since it requires locking the
340 remote.'''
340 remote.'''
341 d = self._call("addchangegroup")
341 d = self._call("addchangegroup")
342 if d:
342 if d:
343 self._abort(error.RepoError(_("push refused: %s") % d))
343 self._abort(error.RepoError(_("push refused: %s") % d))
344 for d in iter(lambda: cg.read(4096), ''):
344 for d in iter(lambda: cg.read(4096), ''):
345 self.pipeo.write(d)
345 self.pipeo.write(d)
346 self.readerr()
346 self.readerr()
347
347
348 self.pipeo.flush()
348 self.pipeo.flush()
349
349
350 self.readerr()
350 self.readerr()
351 r = self._recv()
351 r = self._recv()
352 if not r:
352 if not r:
353 return 1
353 return 1
354 try:
354 try:
355 return int(r)
355 return int(r)
356 except ValueError:
356 except ValueError:
357 self._abort(error.ResponseError(_("unexpected response:"), r))
357 self._abort(error.ResponseError(_("unexpected response:"), r))
358
358
359 instance = sshpeer
359 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now