##// END OF EJS Templates
sshpeer: make remotelock a context manager
Bryan O'Sullivan -
r27798:8953e963 default
parent child Browse files
Show More
@@ -1,342 +1,347 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import re
10 import re
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 error,
14 error,
15 util,
15 util,
16 wireproto,
16 wireproto,
17 )
17 )
18
18
19 class remotelock(object):
19 class remotelock(object):
20 def __init__(self, repo):
20 def __init__(self, repo):
21 self.repo = repo
21 self.repo = repo
22 def release(self):
22 def release(self):
23 self.repo.unlock()
23 self.repo.unlock()
24 self.repo = None
24 self.repo = None
25 def __enter__(self):
26 return self
27 def __exit__(self, exc_type, exc_val, exc_tb):
28 if self.repo:
29 self.release()
25 def __del__(self):
30 def __del__(self):
26 if self.repo:
31 if self.repo:
27 self.release()
32 self.release()
28
33
29 def _serverquote(s):
34 def _serverquote(s):
30 if not s:
35 if not s:
31 return s
36 return s
32 '''quote a string for the remote shell ... which we assume is sh'''
37 '''quote a string for the remote shell ... which we assume is sh'''
33 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
38 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
34 return s
39 return s
35 return "'%s'" % s.replace("'", "'\\''")
40 return "'%s'" % s.replace("'", "'\\''")
36
41
37 def _forwardoutput(ui, pipe):
42 def _forwardoutput(ui, pipe):
38 """display all data currently available on pipe as remote output.
43 """display all data currently available on pipe as remote output.
39
44
40 This is non blocking."""
45 This is non blocking."""
41 s = util.readpipe(pipe)
46 s = util.readpipe(pipe)
42 if s:
47 if s:
43 for l in s.splitlines():
48 for l in s.splitlines():
44 ui.status(_("remote: "), l, '\n')
49 ui.status(_("remote: "), l, '\n')
45
50
46 class doublepipe(object):
51 class doublepipe(object):
47 """Operate a side-channel pipe in addition of a main one
52 """Operate a side-channel pipe in addition of a main one
48
53
49 The side-channel pipe contains server output to be forwarded to the user
54 The side-channel pipe contains server output to be forwarded to the user
50 input. The double pipe will behave as the "main" pipe, but will ensure the
55 input. The double pipe will behave as the "main" pipe, but will ensure the
51 content of the "side" pipe is properly processed while we wait for blocking
56 content of the "side" pipe is properly processed while we wait for blocking
52 call on the "main" pipe.
57 call on the "main" pipe.
53
58
54 If large amounts of data are read from "main", the forward will cease after
59 If large amounts of data are read from "main", the forward will cease after
55 the first bytes start to appear. This simplifies the implementation
60 the first bytes start to appear. This simplifies the implementation
56 without affecting actual output of sshpeer too much as we rarely issue
61 without affecting actual output of sshpeer too much as we rarely issue
57 large read for data not yet emitted by the server.
62 large read for data not yet emitted by the server.
58
63
59 The main pipe is expected to be a 'bufferedinputpipe' from the util module
64 The main pipe is expected to be a 'bufferedinputpipe' from the util module
60 that handle all the os specific bites. This class lives in this module
65 that handle all the os specific bites. This class lives in this module
61 because it focus on behavior specific to the ssh protocol."""
66 because it focus on behavior specific to the ssh protocol."""
62
67
63 def __init__(self, ui, main, side):
68 def __init__(self, ui, main, side):
64 self._ui = ui
69 self._ui = ui
65 self._main = main
70 self._main = main
66 self._side = side
71 self._side = side
67
72
68 def _wait(self):
73 def _wait(self):
69 """wait until some data are available on main or side
74 """wait until some data are available on main or side
70
75
71 return a pair of boolean (ismainready, issideready)
76 return a pair of boolean (ismainready, issideready)
72
77
73 (This will only wait for data if the setup is supported by `util.poll`)
78 (This will only wait for data if the setup is supported by `util.poll`)
74 """
79 """
75 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
80 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
76 return (True, True) # main has data, assume side is worth poking at.
81 return (True, True) # main has data, assume side is worth poking at.
77 fds = [self._main.fileno(), self._side.fileno()]
82 fds = [self._main.fileno(), self._side.fileno()]
78 try:
83 try:
79 act = util.poll(fds)
84 act = util.poll(fds)
80 except NotImplementedError:
85 except NotImplementedError:
81 # non supported yet case, assume all have data.
86 # non supported yet case, assume all have data.
82 act = fds
87 act = fds
83 return (self._main.fileno() in act, self._side.fileno() in act)
88 return (self._main.fileno() in act, self._side.fileno() in act)
84
89
85 def write(self, data):
90 def write(self, data):
86 return self._call('write', data)
91 return self._call('write', data)
87
92
88 def read(self, size):
93 def read(self, size):
89 return self._call('read', size)
94 return self._call('read', size)
90
95
91 def readline(self):
96 def readline(self):
92 return self._call('readline')
97 return self._call('readline')
93
98
94 def _call(self, methname, data=None):
99 def _call(self, methname, data=None):
95 """call <methname> on "main", forward output of "side" while blocking
100 """call <methname> on "main", forward output of "side" while blocking
96 """
101 """
97 # data can be '' or 0
102 # data can be '' or 0
98 if (data is not None and not data) or self._main.closed:
103 if (data is not None and not data) or self._main.closed:
99 _forwardoutput(self._ui, self._side)
104 _forwardoutput(self._ui, self._side)
100 return ''
105 return ''
101 while True:
106 while True:
102 mainready, sideready = self._wait()
107 mainready, sideready = self._wait()
103 if sideready:
108 if sideready:
104 _forwardoutput(self._ui, self._side)
109 _forwardoutput(self._ui, self._side)
105 if mainready:
110 if mainready:
106 meth = getattr(self._main, methname)
111 meth = getattr(self._main, methname)
107 if data is None:
112 if data is None:
108 return meth()
113 return meth()
109 else:
114 else:
110 return meth(data)
115 return meth(data)
111
116
112 def close(self):
117 def close(self):
113 return self._main.close()
118 return self._main.close()
114
119
115 def flush(self):
120 def flush(self):
116 return self._main.flush()
121 return self._main.flush()
117
122
118 class sshpeer(wireproto.wirepeer):
123 class sshpeer(wireproto.wirepeer):
119 def __init__(self, ui, path, create=False):
124 def __init__(self, ui, path, create=False):
120 self._url = path
125 self._url = path
121 self.ui = ui
126 self.ui = ui
122 self.pipeo = self.pipei = self.pipee = None
127 self.pipeo = self.pipei = self.pipee = None
123
128
124 u = util.url(path, parsequery=False, parsefragment=False)
129 u = util.url(path, parsequery=False, parsefragment=False)
125 if u.scheme != 'ssh' or not u.host or u.path is None:
130 if u.scheme != 'ssh' or not u.host or u.path is None:
126 self._abort(error.RepoError(_("couldn't parse location %s") % path))
131 self._abort(error.RepoError(_("couldn't parse location %s") % path))
127
132
128 self.user = u.user
133 self.user = u.user
129 if u.passwd is not None:
134 if u.passwd is not None:
130 self._abort(error.RepoError(_("password in URL not supported")))
135 self._abort(error.RepoError(_("password in URL not supported")))
131 self.host = u.host
136 self.host = u.host
132 self.port = u.port
137 self.port = u.port
133 self.path = u.path or "."
138 self.path = u.path or "."
134
139
135 sshcmd = self.ui.config("ui", "ssh", "ssh")
140 sshcmd = self.ui.config("ui", "ssh", "ssh")
136 remotecmd = self.ui.config("ui", "remotecmd", "hg")
141 remotecmd = self.ui.config("ui", "remotecmd", "hg")
137
142
138 args = util.sshargs(sshcmd,
143 args = util.sshargs(sshcmd,
139 _serverquote(self.host),
144 _serverquote(self.host),
140 _serverquote(self.user),
145 _serverquote(self.user),
141 _serverquote(self.port))
146 _serverquote(self.port))
142
147
143 if create:
148 if create:
144 cmd = '%s %s %s' % (sshcmd, args,
149 cmd = '%s %s %s' % (sshcmd, args,
145 util.shellquote("%s init %s" %
150 util.shellquote("%s init %s" %
146 (_serverquote(remotecmd), _serverquote(self.path))))
151 (_serverquote(remotecmd), _serverquote(self.path))))
147 ui.debug('running %s\n' % cmd)
152 ui.debug('running %s\n' % cmd)
148 res = ui.system(cmd)
153 res = ui.system(cmd)
149 if res != 0:
154 if res != 0:
150 self._abort(error.RepoError(_("could not create remote repo")))
155 self._abort(error.RepoError(_("could not create remote repo")))
151
156
152 self._validaterepo(sshcmd, args, remotecmd)
157 self._validaterepo(sshcmd, args, remotecmd)
153
158
154 def url(self):
159 def url(self):
155 return self._url
160 return self._url
156
161
157 def _validaterepo(self, sshcmd, args, remotecmd):
162 def _validaterepo(self, sshcmd, args, remotecmd):
158 # cleanup up previous run
163 # cleanup up previous run
159 self.cleanup()
164 self.cleanup()
160
165
161 cmd = '%s %s %s' % (sshcmd, args,
166 cmd = '%s %s %s' % (sshcmd, args,
162 util.shellquote("%s -R %s serve --stdio" %
167 util.shellquote("%s -R %s serve --stdio" %
163 (_serverquote(remotecmd), _serverquote(self.path))))
168 (_serverquote(remotecmd), _serverquote(self.path))))
164 self.ui.debug('running %s\n' % cmd)
169 self.ui.debug('running %s\n' % cmd)
165 cmd = util.quotecommand(cmd)
170 cmd = util.quotecommand(cmd)
166
171
167 # while self.subprocess isn't used, having it allows the subprocess to
172 # while self.subprocess isn't used, having it allows the subprocess to
168 # to clean up correctly later
173 # to clean up correctly later
169 #
174 #
170 # no buffer allow the use of 'select'
175 # no buffer allow the use of 'select'
171 # feel free to remove buffering and select usage when we ultimately
176 # feel free to remove buffering and select usage when we ultimately
172 # move to threading.
177 # move to threading.
173 sub = util.popen4(cmd, bufsize=0)
178 sub = util.popen4(cmd, bufsize=0)
174 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
179 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
175
180
176 self.pipei = util.bufferedinputpipe(self.pipei)
181 self.pipei = util.bufferedinputpipe(self.pipei)
177 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
182 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
178 self.pipeo = doublepipe(self.ui, self.pipeo, self.pipee)
183 self.pipeo = doublepipe(self.ui, self.pipeo, self.pipee)
179
184
180 # skip any noise generated by remote shell
185 # skip any noise generated by remote shell
181 self._callstream("hello")
186 self._callstream("hello")
182 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
187 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
183 lines = ["", "dummy"]
188 lines = ["", "dummy"]
184 max_noise = 500
189 max_noise = 500
185 while lines[-1] and max_noise:
190 while lines[-1] and max_noise:
186 l = r.readline()
191 l = r.readline()
187 self.readerr()
192 self.readerr()
188 if lines[-1] == "1\n" and l == "\n":
193 if lines[-1] == "1\n" and l == "\n":
189 break
194 break
190 if l:
195 if l:
191 self.ui.debug("remote: ", l)
196 self.ui.debug("remote: ", l)
192 lines.append(l)
197 lines.append(l)
193 max_noise -= 1
198 max_noise -= 1
194 else:
199 else:
195 self._abort(error.RepoError(_('no suitable response from '
200 self._abort(error.RepoError(_('no suitable response from '
196 'remote hg')))
201 'remote hg')))
197
202
198 self._caps = set()
203 self._caps = set()
199 for l in reversed(lines):
204 for l in reversed(lines):
200 if l.startswith("capabilities:"):
205 if l.startswith("capabilities:"):
201 self._caps.update(l[:-1].split(":")[1].split())
206 self._caps.update(l[:-1].split(":")[1].split())
202 break
207 break
203
208
204 def _capabilities(self):
209 def _capabilities(self):
205 return self._caps
210 return self._caps
206
211
207 def readerr(self):
212 def readerr(self):
208 _forwardoutput(self.ui, self.pipee)
213 _forwardoutput(self.ui, self.pipee)
209
214
210 def _abort(self, exception):
215 def _abort(self, exception):
211 self.cleanup()
216 self.cleanup()
212 raise exception
217 raise exception
213
218
214 def cleanup(self):
219 def cleanup(self):
215 if self.pipeo is None:
220 if self.pipeo is None:
216 return
221 return
217 self.pipeo.close()
222 self.pipeo.close()
218 self.pipei.close()
223 self.pipei.close()
219 try:
224 try:
220 # read the error descriptor until EOF
225 # read the error descriptor until EOF
221 for l in self.pipee:
226 for l in self.pipee:
222 self.ui.status(_("remote: "), l)
227 self.ui.status(_("remote: "), l)
223 except (IOError, ValueError):
228 except (IOError, ValueError):
224 pass
229 pass
225 self.pipee.close()
230 self.pipee.close()
226
231
227 __del__ = cleanup
232 __del__ = cleanup
228
233
229 def _callstream(self, cmd, **args):
234 def _callstream(self, cmd, **args):
230 self.ui.debug("sending %s command\n" % cmd)
235 self.ui.debug("sending %s command\n" % cmd)
231 self.pipeo.write("%s\n" % cmd)
236 self.pipeo.write("%s\n" % cmd)
232 _func, names = wireproto.commands[cmd]
237 _func, names = wireproto.commands[cmd]
233 keys = names.split()
238 keys = names.split()
234 wireargs = {}
239 wireargs = {}
235 for k in keys:
240 for k in keys:
236 if k == '*':
241 if k == '*':
237 wireargs['*'] = args
242 wireargs['*'] = args
238 break
243 break
239 else:
244 else:
240 wireargs[k] = args[k]
245 wireargs[k] = args[k]
241 del args[k]
246 del args[k]
242 for k, v in sorted(wireargs.iteritems()):
247 for k, v in sorted(wireargs.iteritems()):
243 self.pipeo.write("%s %d\n" % (k, len(v)))
248 self.pipeo.write("%s %d\n" % (k, len(v)))
244 if isinstance(v, dict):
249 if isinstance(v, dict):
245 for dk, dv in v.iteritems():
250 for dk, dv in v.iteritems():
246 self.pipeo.write("%s %d\n" % (dk, len(dv)))
251 self.pipeo.write("%s %d\n" % (dk, len(dv)))
247 self.pipeo.write(dv)
252 self.pipeo.write(dv)
248 else:
253 else:
249 self.pipeo.write(v)
254 self.pipeo.write(v)
250 self.pipeo.flush()
255 self.pipeo.flush()
251
256
252 return self.pipei
257 return self.pipei
253
258
254 def _callcompressable(self, cmd, **args):
259 def _callcompressable(self, cmd, **args):
255 return self._callstream(cmd, **args)
260 return self._callstream(cmd, **args)
256
261
257 def _call(self, cmd, **args):
262 def _call(self, cmd, **args):
258 self._callstream(cmd, **args)
263 self._callstream(cmd, **args)
259 return self._recv()
264 return self._recv()
260
265
261 def _callpush(self, cmd, fp, **args):
266 def _callpush(self, cmd, fp, **args):
262 r = self._call(cmd, **args)
267 r = self._call(cmd, **args)
263 if r:
268 if r:
264 return '', r
269 return '', r
265 while True:
270 while True:
266 d = fp.read(4096)
271 d = fp.read(4096)
267 if not d:
272 if not d:
268 break
273 break
269 self._send(d)
274 self._send(d)
270 self._send("", flush=True)
275 self._send("", flush=True)
271 r = self._recv()
276 r = self._recv()
272 if r:
277 if r:
273 return '', r
278 return '', r
274 return self._recv(), ''
279 return self._recv(), ''
275
280
276 def _calltwowaystream(self, cmd, fp, **args):
281 def _calltwowaystream(self, cmd, fp, **args):
277 r = self._call(cmd, **args)
282 r = self._call(cmd, **args)
278 if r:
283 if r:
279 # XXX needs to be made better
284 # XXX needs to be made better
280 raise error.Abort('unexpected remote reply: %s' % r)
285 raise error.Abort('unexpected remote reply: %s' % r)
281 while True:
286 while True:
282 d = fp.read(4096)
287 d = fp.read(4096)
283 if not d:
288 if not d:
284 break
289 break
285 self._send(d)
290 self._send(d)
286 self._send("", flush=True)
291 self._send("", flush=True)
287 return self.pipei
292 return self.pipei
288
293
289 def _recv(self):
294 def _recv(self):
290 l = self.pipei.readline()
295 l = self.pipei.readline()
291 if l == '\n':
296 if l == '\n':
292 self.readerr()
297 self.readerr()
293 msg = _('check previous remote output')
298 msg = _('check previous remote output')
294 self._abort(error.OutOfBandError(hint=msg))
299 self._abort(error.OutOfBandError(hint=msg))
295 self.readerr()
300 self.readerr()
296 try:
301 try:
297 l = int(l)
302 l = int(l)
298 except ValueError:
303 except ValueError:
299 self._abort(error.ResponseError(_("unexpected response:"), l))
304 self._abort(error.ResponseError(_("unexpected response:"), l))
300 return self.pipei.read(l)
305 return self.pipei.read(l)
301
306
302 def _send(self, data, flush=False):
307 def _send(self, data, flush=False):
303 self.pipeo.write("%d\n" % len(data))
308 self.pipeo.write("%d\n" % len(data))
304 if data:
309 if data:
305 self.pipeo.write(data)
310 self.pipeo.write(data)
306 if flush:
311 if flush:
307 self.pipeo.flush()
312 self.pipeo.flush()
308 self.readerr()
313 self.readerr()
309
314
310 def lock(self):
315 def lock(self):
311 self._call("lock")
316 self._call("lock")
312 return remotelock(self)
317 return remotelock(self)
313
318
314 def unlock(self):
319 def unlock(self):
315 self._call("unlock")
320 self._call("unlock")
316
321
317 def addchangegroup(self, cg, source, url, lock=None):
322 def addchangegroup(self, cg, source, url, lock=None):
318 '''Send a changegroup to the remote server. Return an integer
323 '''Send a changegroup to the remote server. Return an integer
319 similar to unbundle(). DEPRECATED, since it requires locking the
324 similar to unbundle(). DEPRECATED, since it requires locking the
320 remote.'''
325 remote.'''
321 d = self._call("addchangegroup")
326 d = self._call("addchangegroup")
322 if d:
327 if d:
323 self._abort(error.RepoError(_("push refused: %s") % d))
328 self._abort(error.RepoError(_("push refused: %s") % d))
324 while True:
329 while True:
325 d = cg.read(4096)
330 d = cg.read(4096)
326 if not d:
331 if not d:
327 break
332 break
328 self.pipeo.write(d)
333 self.pipeo.write(d)
329 self.readerr()
334 self.readerr()
330
335
331 self.pipeo.flush()
336 self.pipeo.flush()
332
337
333 self.readerr()
338 self.readerr()
334 r = self._recv()
339 r = self._recv()
335 if not r:
340 if not r:
336 return 1
341 return 1
337 try:
342 try:
338 return int(r)
343 return int(r)
339 except ValueError:
344 except ValueError:
340 self._abort(error.ResponseError(_("unexpected response:"), r))
345 self._abort(error.ResponseError(_("unexpected response:"), r))
341
346
342 instance = sshpeer
347 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now