##// END OF EJS Templates
sshpeer: use absolute_import
Gregory Szorc -
r25975:de7a3893 default
parent child Browse files
Show More
@@ -1,335 +1,342 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
9
8 import re
10 import re
9 from i18n import _
11
10 import util, error, wireproto
12 from .i18n import _
13 from . import (
14 error,
15 util,
16 wireproto,
17 )
11
18
12 class remotelock(object):
19 class remotelock(object):
13 def __init__(self, repo):
20 def __init__(self, repo):
14 self.repo = repo
21 self.repo = repo
15 def release(self):
22 def release(self):
16 self.repo.unlock()
23 self.repo.unlock()
17 self.repo = None
24 self.repo = None
18 def __del__(self):
25 def __del__(self):
19 if self.repo:
26 if self.repo:
20 self.release()
27 self.release()
21
28
22 def _serverquote(s):
29 def _serverquote(s):
23 if not s:
30 if not s:
24 return s
31 return s
25 '''quote a string for the remote shell ... which we assume is sh'''
32 '''quote a string for the remote shell ... which we assume is sh'''
26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
33 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
27 return s
34 return s
28 return "'%s'" % s.replace("'", "'\\''")
35 return "'%s'" % s.replace("'", "'\\''")
29
36
30 def _forwardoutput(ui, pipe):
37 def _forwardoutput(ui, pipe):
31 """display all data currently available on pipe as remote output.
38 """display all data currently available on pipe as remote output.
32
39
33 This is non blocking."""
40 This is non blocking."""
34 s = util.readpipe(pipe)
41 s = util.readpipe(pipe)
35 if s:
42 if s:
36 for l in s.splitlines():
43 for l in s.splitlines():
37 ui.status(_("remote: "), l, '\n')
44 ui.status(_("remote: "), l, '\n')
38
45
39 class doublepipe(object):
46 class doublepipe(object):
40 """Operate a side-channel pipe in addition of a main one
47 """Operate a side-channel pipe in addition of a main one
41
48
42 The side-channel pipe contains server output to be forwarded to the user
49 The side-channel pipe contains server output to be forwarded to the user
43 input. The double pipe will behave as the "main" pipe, but will ensure the
50 input. The double pipe will behave as the "main" pipe, but will ensure the
44 content of the "side" pipe is properly processed while we wait for blocking
51 content of the "side" pipe is properly processed while we wait for blocking
45 call on the "main" pipe.
52 call on the "main" pipe.
46
53
47 If large amounts of data are read from "main", the forward will cease after
54 If large amounts of data are read from "main", the forward will cease after
48 the first bytes start to appear. This simplifies the implementation
55 the first bytes start to appear. This simplifies the implementation
49 without affecting actual output of sshpeer too much as we rarely issue
56 without affecting actual output of sshpeer too much as we rarely issue
50 large read for data not yet emitted by the server.
57 large read for data not yet emitted by the server.
51
58
52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
59 The main pipe is expected to be a 'bufferedinputpipe' from the util module
53 that handle all the os specific bites. This class lives in this module
60 that handle all the os specific bites. This class lives in this module
54 because it focus on behavior specifig to the ssh protocol."""
61 because it focus on behavior specifig to the ssh protocol."""
55
62
56 def __init__(self, ui, main, side):
63 def __init__(self, ui, main, side):
57 self._ui = ui
64 self._ui = ui
58 self._main = main
65 self._main = main
59 self._side = side
66 self._side = side
60
67
61 def _wait(self):
68 def _wait(self):
62 """wait until some data are available on main or side
69 """wait until some data are available on main or side
63
70
64 return a pair of boolean (ismainready, issideready)
71 return a pair of boolean (ismainready, issideready)
65
72
66 (This will only wait for data if the setup is supported by `util.poll`)
73 (This will only wait for data if the setup is supported by `util.poll`)
67 """
74 """
68 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
75 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
69 return (True, True) # main has data, assume side is worth poking at.
76 return (True, True) # main has data, assume side is worth poking at.
70 fds = [self._main.fileno(), self._side.fileno()]
77 fds = [self._main.fileno(), self._side.fileno()]
71 try:
78 try:
72 act = util.poll(fds)
79 act = util.poll(fds)
73 except NotImplementedError:
80 except NotImplementedError:
74 # non supported yet case, assume all have data.
81 # non supported yet case, assume all have data.
75 act = fds
82 act = fds
76 return (self._main.fileno() in act, self._side.fileno() in act)
83 return (self._main.fileno() in act, self._side.fileno() in act)
77
84
78 def write(self, data):
85 def write(self, data):
79 return self._call('write', data)
86 return self._call('write', data)
80
87
81 def read(self, size):
88 def read(self, size):
82 return self._call('read', size)
89 return self._call('read', size)
83
90
84 def readline(self):
91 def readline(self):
85 return self._call('readline')
92 return self._call('readline')
86
93
87 def _call(self, methname, data=None):
94 def _call(self, methname, data=None):
88 """call <methname> on "main", forward output of "side" while blocking
95 """call <methname> on "main", forward output of "side" while blocking
89 """
96 """
90 # data can be '' or 0
97 # data can be '' or 0
91 if (data is not None and not data) or self._main.closed:
98 if (data is not None and not data) or self._main.closed:
92 _forwardoutput(self._ui, self._side)
99 _forwardoutput(self._ui, self._side)
93 return ''
100 return ''
94 while True:
101 while True:
95 mainready, sideready = self._wait()
102 mainready, sideready = self._wait()
96 if sideready:
103 if sideready:
97 _forwardoutput(self._ui, self._side)
104 _forwardoutput(self._ui, self._side)
98 if mainready:
105 if mainready:
99 meth = getattr(self._main, methname)
106 meth = getattr(self._main, methname)
100 if data is None:
107 if data is None:
101 return meth()
108 return meth()
102 else:
109 else:
103 return meth(data)
110 return meth(data)
104
111
105 def close(self):
112 def close(self):
106 return self._main.close()
113 return self._main.close()
107
114
108 def flush(self):
115 def flush(self):
109 return self._main.flush()
116 return self._main.flush()
110
117
111 class sshpeer(wireproto.wirepeer):
118 class sshpeer(wireproto.wirepeer):
112 def __init__(self, ui, path, create=False):
119 def __init__(self, ui, path, create=False):
113 self._url = path
120 self._url = path
114 self.ui = ui
121 self.ui = ui
115 self.pipeo = self.pipei = self.pipee = None
122 self.pipeo = self.pipei = self.pipee = None
116
123
117 u = util.url(path, parsequery=False, parsefragment=False)
124 u = util.url(path, parsequery=False, parsefragment=False)
118 if u.scheme != 'ssh' or not u.host or u.path is None:
125 if u.scheme != 'ssh' or not u.host or u.path is None:
119 self._abort(error.RepoError(_("couldn't parse location %s") % path))
126 self._abort(error.RepoError(_("couldn't parse location %s") % path))
120
127
121 self.user = u.user
128 self.user = u.user
122 if u.passwd is not None:
129 if u.passwd is not None:
123 self._abort(error.RepoError(_("password in URL not supported")))
130 self._abort(error.RepoError(_("password in URL not supported")))
124 self.host = u.host
131 self.host = u.host
125 self.port = u.port
132 self.port = u.port
126 self.path = u.path or "."
133 self.path = u.path or "."
127
134
128 sshcmd = self.ui.config("ui", "ssh", "ssh")
135 sshcmd = self.ui.config("ui", "ssh", "ssh")
129 remotecmd = self.ui.config("ui", "remotecmd", "hg")
136 remotecmd = self.ui.config("ui", "remotecmd", "hg")
130
137
131 args = util.sshargs(sshcmd,
138 args = util.sshargs(sshcmd,
132 _serverquote(self.host),
139 _serverquote(self.host),
133 _serverquote(self.user),
140 _serverquote(self.user),
134 _serverquote(self.port))
141 _serverquote(self.port))
135
142
136 if create:
143 if create:
137 cmd = '%s %s %s' % (sshcmd, args,
144 cmd = '%s %s %s' % (sshcmd, args,
138 util.shellquote("%s init %s" %
145 util.shellquote("%s init %s" %
139 (_serverquote(remotecmd), _serverquote(self.path))))
146 (_serverquote(remotecmd), _serverquote(self.path))))
140 ui.debug('running %s\n' % cmd)
147 ui.debug('running %s\n' % cmd)
141 res = ui.system(cmd)
148 res = ui.system(cmd)
142 if res != 0:
149 if res != 0:
143 self._abort(error.RepoError(_("could not create remote repo")))
150 self._abort(error.RepoError(_("could not create remote repo")))
144
151
145 self._validaterepo(sshcmd, args, remotecmd)
152 self._validaterepo(sshcmd, args, remotecmd)
146
153
147 def url(self):
154 def url(self):
148 return self._url
155 return self._url
149
156
150 def _validaterepo(self, sshcmd, args, remotecmd):
157 def _validaterepo(self, sshcmd, args, remotecmd):
151 # cleanup up previous run
158 # cleanup up previous run
152 self.cleanup()
159 self.cleanup()
153
160
154 cmd = '%s %s %s' % (sshcmd, args,
161 cmd = '%s %s %s' % (sshcmd, args,
155 util.shellquote("%s -R %s serve --stdio" %
162 util.shellquote("%s -R %s serve --stdio" %
156 (_serverquote(remotecmd), _serverquote(self.path))))
163 (_serverquote(remotecmd), _serverquote(self.path))))
157 self.ui.debug('running %s\n' % cmd)
164 self.ui.debug('running %s\n' % cmd)
158 cmd = util.quotecommand(cmd)
165 cmd = util.quotecommand(cmd)
159
166
160 # while self.subprocess isn't used, having it allows the subprocess to
167 # while self.subprocess isn't used, having it allows the subprocess to
161 # to clean up correctly later
168 # to clean up correctly later
162 #
169 #
163 # no buffer allow the use of 'select'
170 # no buffer allow the use of 'select'
164 # feel free to remove buffering and select usage when we ultimately
171 # feel free to remove buffering and select usage when we ultimately
165 # move to threading.
172 # move to threading.
166 sub = util.popen4(cmd, bufsize=0)
173 sub = util.popen4(cmd, bufsize=0)
167 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
174 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
168
175
169 self.pipei = util.bufferedinputpipe(self.pipei)
176 self.pipei = util.bufferedinputpipe(self.pipei)
170 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
177 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
171 self.pipeo = doublepipe(self.ui, self.pipeo, self.pipee)
178 self.pipeo = doublepipe(self.ui, self.pipeo, self.pipee)
172
179
173 # skip any noise generated by remote shell
180 # skip any noise generated by remote shell
174 self._callstream("hello")
181 self._callstream("hello")
175 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
182 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
176 lines = ["", "dummy"]
183 lines = ["", "dummy"]
177 max_noise = 500
184 max_noise = 500
178 while lines[-1] and max_noise:
185 while lines[-1] and max_noise:
179 l = r.readline()
186 l = r.readline()
180 self.readerr()
187 self.readerr()
181 if lines[-1] == "1\n" and l == "\n":
188 if lines[-1] == "1\n" and l == "\n":
182 break
189 break
183 if l:
190 if l:
184 self.ui.debug("remote: ", l)
191 self.ui.debug("remote: ", l)
185 lines.append(l)
192 lines.append(l)
186 max_noise -= 1
193 max_noise -= 1
187 else:
194 else:
188 self._abort(error.RepoError(_('no suitable response from '
195 self._abort(error.RepoError(_('no suitable response from '
189 'remote hg')))
196 'remote hg')))
190
197
191 self._caps = set()
198 self._caps = set()
192 for l in reversed(lines):
199 for l in reversed(lines):
193 if l.startswith("capabilities:"):
200 if l.startswith("capabilities:"):
194 self._caps.update(l[:-1].split(":")[1].split())
201 self._caps.update(l[:-1].split(":")[1].split())
195 break
202 break
196
203
197 def _capabilities(self):
204 def _capabilities(self):
198 return self._caps
205 return self._caps
199
206
200 def readerr(self):
207 def readerr(self):
201 _forwardoutput(self.ui, self.pipee)
208 _forwardoutput(self.ui, self.pipee)
202
209
203 def _abort(self, exception):
210 def _abort(self, exception):
204 self.cleanup()
211 self.cleanup()
205 raise exception
212 raise exception
206
213
207 def cleanup(self):
214 def cleanup(self):
208 if self.pipeo is None:
215 if self.pipeo is None:
209 return
216 return
210 self.pipeo.close()
217 self.pipeo.close()
211 self.pipei.close()
218 self.pipei.close()
212 try:
219 try:
213 # read the error descriptor until EOF
220 # read the error descriptor until EOF
214 for l in self.pipee:
221 for l in self.pipee:
215 self.ui.status(_("remote: "), l)
222 self.ui.status(_("remote: "), l)
216 except (IOError, ValueError):
223 except (IOError, ValueError):
217 pass
224 pass
218 self.pipee.close()
225 self.pipee.close()
219
226
220 __del__ = cleanup
227 __del__ = cleanup
221
228
222 def _callstream(self, cmd, **args):
229 def _callstream(self, cmd, **args):
223 self.ui.debug("sending %s command\n" % cmd)
230 self.ui.debug("sending %s command\n" % cmd)
224 self.pipeo.write("%s\n" % cmd)
231 self.pipeo.write("%s\n" % cmd)
225 _func, names = wireproto.commands[cmd]
232 _func, names = wireproto.commands[cmd]
226 keys = names.split()
233 keys = names.split()
227 wireargs = {}
234 wireargs = {}
228 for k in keys:
235 for k in keys:
229 if k == '*':
236 if k == '*':
230 wireargs['*'] = args
237 wireargs['*'] = args
231 break
238 break
232 else:
239 else:
233 wireargs[k] = args[k]
240 wireargs[k] = args[k]
234 del args[k]
241 del args[k]
235 for k, v in sorted(wireargs.iteritems()):
242 for k, v in sorted(wireargs.iteritems()):
236 self.pipeo.write("%s %d\n" % (k, len(v)))
243 self.pipeo.write("%s %d\n" % (k, len(v)))
237 if isinstance(v, dict):
244 if isinstance(v, dict):
238 for dk, dv in v.iteritems():
245 for dk, dv in v.iteritems():
239 self.pipeo.write("%s %d\n" % (dk, len(dv)))
246 self.pipeo.write("%s %d\n" % (dk, len(dv)))
240 self.pipeo.write(dv)
247 self.pipeo.write(dv)
241 else:
248 else:
242 self.pipeo.write(v)
249 self.pipeo.write(v)
243 self.pipeo.flush()
250 self.pipeo.flush()
244
251
245 return self.pipei
252 return self.pipei
246
253
247 def _callcompressable(self, cmd, **args):
254 def _callcompressable(self, cmd, **args):
248 return self._callstream(cmd, **args)
255 return self._callstream(cmd, **args)
249
256
250 def _call(self, cmd, **args):
257 def _call(self, cmd, **args):
251 self._callstream(cmd, **args)
258 self._callstream(cmd, **args)
252 return self._recv()
259 return self._recv()
253
260
254 def _callpush(self, cmd, fp, **args):
261 def _callpush(self, cmd, fp, **args):
255 r = self._call(cmd, **args)
262 r = self._call(cmd, **args)
256 if r:
263 if r:
257 return '', r
264 return '', r
258 while True:
265 while True:
259 d = fp.read(4096)
266 d = fp.read(4096)
260 if not d:
267 if not d:
261 break
268 break
262 self._send(d)
269 self._send(d)
263 self._send("", flush=True)
270 self._send("", flush=True)
264 r = self._recv()
271 r = self._recv()
265 if r:
272 if r:
266 return '', r
273 return '', r
267 return self._recv(), ''
274 return self._recv(), ''
268
275
269 def _calltwowaystream(self, cmd, fp, **args):
276 def _calltwowaystream(self, cmd, fp, **args):
270 r = self._call(cmd, **args)
277 r = self._call(cmd, **args)
271 if r:
278 if r:
272 # XXX needs to be made better
279 # XXX needs to be made better
273 raise util.Abort('unexpected remote reply: %s' % r)
280 raise util.Abort('unexpected remote reply: %s' % r)
274 while True:
281 while True:
275 d = fp.read(4096)
282 d = fp.read(4096)
276 if not d:
283 if not d:
277 break
284 break
278 self._send(d)
285 self._send(d)
279 self._send("", flush=True)
286 self._send("", flush=True)
280 return self.pipei
287 return self.pipei
281
288
282 def _recv(self):
289 def _recv(self):
283 l = self.pipei.readline()
290 l = self.pipei.readline()
284 if l == '\n':
291 if l == '\n':
285 self.readerr()
292 self.readerr()
286 msg = _('check previous remote output')
293 msg = _('check previous remote output')
287 self._abort(error.OutOfBandError(hint=msg))
294 self._abort(error.OutOfBandError(hint=msg))
288 self.readerr()
295 self.readerr()
289 try:
296 try:
290 l = int(l)
297 l = int(l)
291 except ValueError:
298 except ValueError:
292 self._abort(error.ResponseError(_("unexpected response:"), l))
299 self._abort(error.ResponseError(_("unexpected response:"), l))
293 return self.pipei.read(l)
300 return self.pipei.read(l)
294
301
295 def _send(self, data, flush=False):
302 def _send(self, data, flush=False):
296 self.pipeo.write("%d\n" % len(data))
303 self.pipeo.write("%d\n" % len(data))
297 if data:
304 if data:
298 self.pipeo.write(data)
305 self.pipeo.write(data)
299 if flush:
306 if flush:
300 self.pipeo.flush()
307 self.pipeo.flush()
301 self.readerr()
308 self.readerr()
302
309
303 def lock(self):
310 def lock(self):
304 self._call("lock")
311 self._call("lock")
305 return remotelock(self)
312 return remotelock(self)
306
313
307 def unlock(self):
314 def unlock(self):
308 self._call("unlock")
315 self._call("unlock")
309
316
310 def addchangegroup(self, cg, source, url, lock=None):
317 def addchangegroup(self, cg, source, url, lock=None):
311 '''Send a changegroup to the remote server. Return an integer
318 '''Send a changegroup to the remote server. Return an integer
312 similar to unbundle(). DEPRECATED, since it requires locking the
319 similar to unbundle(). DEPRECATED, since it requires locking the
313 remote.'''
320 remote.'''
314 d = self._call("addchangegroup")
321 d = self._call("addchangegroup")
315 if d:
322 if d:
316 self._abort(error.RepoError(_("push refused: %s") % d))
323 self._abort(error.RepoError(_("push refused: %s") % d))
317 while True:
324 while True:
318 d = cg.read(4096)
325 d = cg.read(4096)
319 if not d:
326 if not d:
320 break
327 break
321 self.pipeo.write(d)
328 self.pipeo.write(d)
322 self.readerr()
329 self.readerr()
323
330
324 self.pipeo.flush()
331 self.pipeo.flush()
325
332
326 self.readerr()
333 self.readerr()
327 r = self._recv()
334 r = self._recv()
328 if not r:
335 if not r:
329 return 1
336 return 1
330 try:
337 try:
331 return int(r)
338 return int(r)
332 except ValueError:
339 except ValueError:
333 self._abort(error.ResponseError(_("unexpected response:"), r))
340 self._abort(error.ResponseError(_("unexpected response:"), r))
334
341
335 instance = sshpeer
342 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now