##// END OF EJS Templates
sshpeer: rename 'size' to 'data' in doublepipe...
Pierre-Yves David -
r25455:dc02a284 default
parent child Browse files
Show More
@@ -1,327 +1,328 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import re
8 import re
9 from i18n import _
9 from i18n import _
10 import util, error, wireproto
10 import util, error, wireproto
11
11
12 class remotelock(object):
12 class remotelock(object):
13 def __init__(self, repo):
13 def __init__(self, repo):
14 self.repo = repo
14 self.repo = repo
15 def release(self):
15 def release(self):
16 self.repo.unlock()
16 self.repo.unlock()
17 self.repo = None
17 self.repo = None
18 def __del__(self):
18 def __del__(self):
19 if self.repo:
19 if self.repo:
20 self.release()
20 self.release()
21
21
22 def _serverquote(s):
22 def _serverquote(s):
23 if not s:
23 if not s:
24 return s
24 return s
25 '''quote a string for the remote shell ... which we assume is sh'''
25 '''quote a string for the remote shell ... which we assume is sh'''
26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
27 return s
27 return s
28 return "'%s'" % s.replace("'", "'\\''")
28 return "'%s'" % s.replace("'", "'\\''")
29
29
30 def _forwardoutput(ui, pipe):
30 def _forwardoutput(ui, pipe):
31 """display all data currently available on pipe as remote output.
31 """display all data currently available on pipe as remote output.
32
32
33 This is non blocking."""
33 This is non blocking."""
34 s = util.readpipe(pipe)
34 s = util.readpipe(pipe)
35 if s:
35 if s:
36 for l in s.splitlines():
36 for l in s.splitlines():
37 ui.status(_("remote: "), l, '\n')
37 ui.status(_("remote: "), l, '\n')
38
38
39 class doublepipe(object):
39 class doublepipe(object):
40 """Operate a side-channel pipe in addition of a main one
40 """Operate a side-channel pipe in addition of a main one
41
41
42 The side-channel pipe contains server output to be forwarded to the user
42 The side-channel pipe contains server output to be forwarded to the user
43 input. The double pipe will behave as the "main" pipe, but will ensure the
43 input. The double pipe will behave as the "main" pipe, but will ensure the
44 content of the "side" pipe is properly processed while we wait for blocking
44 content of the "side" pipe is properly processed while we wait for blocking
45 call on the "main" pipe.
45 call on the "main" pipe.
46
46
47 If large amounts of data are read from "main", the forward will cease after
47 If large amounts of data are read from "main", the forward will cease after
48 the first bytes start to appear. This simplifies the implementation
48 the first bytes start to appear. This simplifies the implementation
49 without affecting actual output of sshpeer too much as we rarely issue
49 without affecting actual output of sshpeer too much as we rarely issue
50 large read for data not yet emitted by the server.
50 large read for data not yet emitted by the server.
51
51
52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
53 that handle all the os specific bites. This class lives in this module
53 that handle all the os specific bites. This class lives in this module
54 because it focus on behavior specifig to the ssh protocol."""
54 because it focus on behavior specifig to the ssh protocol."""
55
55
56 def __init__(self, ui, main, side):
56 def __init__(self, ui, main, side):
57 self._ui = ui
57 self._ui = ui
58 self._main = main
58 self._main = main
59 self._side = side
59 self._side = side
60
60
61 def _wait(self):
61 def _wait(self):
62 """wait until some data are available on main or side
62 """wait until some data are available on main or side
63
63
64 return a pair of boolean (ismainready, issideready)
64 return a pair of boolean (ismainready, issideready)
65
65
66 (This will only wait for data if the setup is supported by `util.poll`)
66 (This will only wait for data if the setup is supported by `util.poll`)
67 """
67 """
68 if self._main.hasbuffer:
68 if self._main.hasbuffer:
69 return (True, True) # main has data, assume side is worth poking at.
69 return (True, True) # main has data, assume side is worth poking at.
70 fds = [self._main.fileno(), self._side.fileno()]
70 fds = [self._main.fileno(), self._side.fileno()]
71 try:
71 try:
72 act = util.poll(fds)
72 act = util.poll(fds)
73 except NotImplementedError:
73 except NotImplementedError:
74 # non supported yet case, assume all have data.
74 # non supported yet case, assume all have data.
75 act = fds
75 act = fds
76 return (self._main.fileno() in act, self._side.fileno() in act)
76 return (self._main.fileno() in act, self._side.fileno() in act)
77
77
78 def read(self, size):
78 def read(self, size):
79 return self._call('read', size)
79 return self._call('read', size)
80
80
81 def readline(self):
81 def readline(self):
82 return self._call('readline')
82 return self._call('readline')
83
83
84 def _call(self, methname, size=None):
84 def _call(self, methname, data=None):
85 """call <methname> on "main", forward output of "side" while blocking
85 """call <methname> on "main", forward output of "side" while blocking
86 """
86 """
87 if size == 0 or self._main.closed:
87 # data can be '' or 0
88 if (data is not None and not data) or self._main.closed:
88 _forwardoutput(self._ui, self._side)
89 _forwardoutput(self._ui, self._side)
89 return ''
90 return ''
90 while True:
91 while True:
91 mainready, sideready = self._wait()
92 mainready, sideready = self._wait()
92 if sideready:
93 if sideready:
93 _forwardoutput(self._ui, self._side)
94 _forwardoutput(self._ui, self._side)
94 if mainready:
95 if mainready:
95 meth = getattr(self._main, methname)
96 meth = getattr(self._main, methname)
96 if size is None:
97 if data is None:
97 return meth()
98 return meth()
98 else:
99 else:
99 return meth(size)
100 return meth(data)
100
101
101 def close(self):
102 def close(self):
102 return self._main.close()
103 return self._main.close()
103
104
104 class sshpeer(wireproto.wirepeer):
105 class sshpeer(wireproto.wirepeer):
105 def __init__(self, ui, path, create=False):
106 def __init__(self, ui, path, create=False):
106 self._url = path
107 self._url = path
107 self.ui = ui
108 self.ui = ui
108 self.pipeo = self.pipei = self.pipee = None
109 self.pipeo = self.pipei = self.pipee = None
109
110
110 u = util.url(path, parsequery=False, parsefragment=False)
111 u = util.url(path, parsequery=False, parsefragment=False)
111 if u.scheme != 'ssh' or not u.host or u.path is None:
112 if u.scheme != 'ssh' or not u.host or u.path is None:
112 self._abort(error.RepoError(_("couldn't parse location %s") % path))
113 self._abort(error.RepoError(_("couldn't parse location %s") % path))
113
114
114 self.user = u.user
115 self.user = u.user
115 if u.passwd is not None:
116 if u.passwd is not None:
116 self._abort(error.RepoError(_("password in URL not supported")))
117 self._abort(error.RepoError(_("password in URL not supported")))
117 self.host = u.host
118 self.host = u.host
118 self.port = u.port
119 self.port = u.port
119 self.path = u.path or "."
120 self.path = u.path or "."
120
121
121 sshcmd = self.ui.config("ui", "ssh", "ssh")
122 sshcmd = self.ui.config("ui", "ssh", "ssh")
122 remotecmd = self.ui.config("ui", "remotecmd", "hg")
123 remotecmd = self.ui.config("ui", "remotecmd", "hg")
123
124
124 args = util.sshargs(sshcmd,
125 args = util.sshargs(sshcmd,
125 _serverquote(self.host),
126 _serverquote(self.host),
126 _serverquote(self.user),
127 _serverquote(self.user),
127 _serverquote(self.port))
128 _serverquote(self.port))
128
129
129 if create:
130 if create:
130 cmd = '%s %s %s' % (sshcmd, args,
131 cmd = '%s %s %s' % (sshcmd, args,
131 util.shellquote("%s init %s" %
132 util.shellquote("%s init %s" %
132 (_serverquote(remotecmd), _serverquote(self.path))))
133 (_serverquote(remotecmd), _serverquote(self.path))))
133 ui.debug('running %s\n' % cmd)
134 ui.debug('running %s\n' % cmd)
134 res = ui.system(cmd)
135 res = ui.system(cmd)
135 if res != 0:
136 if res != 0:
136 self._abort(error.RepoError(_("could not create remote repo")))
137 self._abort(error.RepoError(_("could not create remote repo")))
137
138
138 self._validaterepo(sshcmd, args, remotecmd)
139 self._validaterepo(sshcmd, args, remotecmd)
139
140
140 def url(self):
141 def url(self):
141 return self._url
142 return self._url
142
143
143 def _validaterepo(self, sshcmd, args, remotecmd):
144 def _validaterepo(self, sshcmd, args, remotecmd):
144 # cleanup up previous run
145 # cleanup up previous run
145 self.cleanup()
146 self.cleanup()
146
147
147 cmd = '%s %s %s' % (sshcmd, args,
148 cmd = '%s %s %s' % (sshcmd, args,
148 util.shellquote("%s -R %s serve --stdio" %
149 util.shellquote("%s -R %s serve --stdio" %
149 (_serverquote(remotecmd), _serverquote(self.path))))
150 (_serverquote(remotecmd), _serverquote(self.path))))
150 self.ui.debug('running %s\n' % cmd)
151 self.ui.debug('running %s\n' % cmd)
151 cmd = util.quotecommand(cmd)
152 cmd = util.quotecommand(cmd)
152
153
153 # while self.subprocess isn't used, having it allows the subprocess to
154 # while self.subprocess isn't used, having it allows the subprocess to
154 # to clean up correctly later
155 # to clean up correctly later
155 #
156 #
156 # no buffer allow the use of 'select'
157 # no buffer allow the use of 'select'
157 # feel free to remove buffering and select usage when we ultimately
158 # feel free to remove buffering and select usage when we ultimately
158 # move to threading.
159 # move to threading.
159 sub = util.popen4(cmd, bufsize=0)
160 sub = util.popen4(cmd, bufsize=0)
160 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
161 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
161
162
162 self.pipei = util.bufferedinputpipe(self.pipei)
163 self.pipei = util.bufferedinputpipe(self.pipei)
163 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
164 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
164
165
165 # skip any noise generated by remote shell
166 # skip any noise generated by remote shell
166 self._callstream("hello")
167 self._callstream("hello")
167 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
168 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
168 lines = ["", "dummy"]
169 lines = ["", "dummy"]
169 max_noise = 500
170 max_noise = 500
170 while lines[-1] and max_noise:
171 while lines[-1] and max_noise:
171 l = r.readline()
172 l = r.readline()
172 self.readerr()
173 self.readerr()
173 if lines[-1] == "1\n" and l == "\n":
174 if lines[-1] == "1\n" and l == "\n":
174 break
175 break
175 if l:
176 if l:
176 self.ui.debug("remote: ", l)
177 self.ui.debug("remote: ", l)
177 lines.append(l)
178 lines.append(l)
178 max_noise -= 1
179 max_noise -= 1
179 else:
180 else:
180 self._abort(error.RepoError(_('no suitable response from '
181 self._abort(error.RepoError(_('no suitable response from '
181 'remote hg')))
182 'remote hg')))
182
183
183 self._caps = set()
184 self._caps = set()
184 for l in reversed(lines):
185 for l in reversed(lines):
185 if l.startswith("capabilities:"):
186 if l.startswith("capabilities:"):
186 self._caps.update(l[:-1].split(":")[1].split())
187 self._caps.update(l[:-1].split(":")[1].split())
187 break
188 break
188
189
189 def _capabilities(self):
190 def _capabilities(self):
190 return self._caps
191 return self._caps
191
192
192 def readerr(self):
193 def readerr(self):
193 _forwardoutput(self.ui, self.pipee)
194 _forwardoutput(self.ui, self.pipee)
194
195
195 def _abort(self, exception):
196 def _abort(self, exception):
196 self.cleanup()
197 self.cleanup()
197 raise exception
198 raise exception
198
199
199 def cleanup(self):
200 def cleanup(self):
200 if self.pipeo is None:
201 if self.pipeo is None:
201 return
202 return
202 self.pipeo.close()
203 self.pipeo.close()
203 self.pipei.close()
204 self.pipei.close()
204 try:
205 try:
205 # read the error descriptor until EOF
206 # read the error descriptor until EOF
206 for l in self.pipee:
207 for l in self.pipee:
207 self.ui.status(_("remote: "), l)
208 self.ui.status(_("remote: "), l)
208 except (IOError, ValueError):
209 except (IOError, ValueError):
209 pass
210 pass
210 self.pipee.close()
211 self.pipee.close()
211
212
212 __del__ = cleanup
213 __del__ = cleanup
213
214
214 def _callstream(self, cmd, **args):
215 def _callstream(self, cmd, **args):
215 self.ui.debug("sending %s command\n" % cmd)
216 self.ui.debug("sending %s command\n" % cmd)
216 self.pipeo.write("%s\n" % cmd)
217 self.pipeo.write("%s\n" % cmd)
217 _func, names = wireproto.commands[cmd]
218 _func, names = wireproto.commands[cmd]
218 keys = names.split()
219 keys = names.split()
219 wireargs = {}
220 wireargs = {}
220 for k in keys:
221 for k in keys:
221 if k == '*':
222 if k == '*':
222 wireargs['*'] = args
223 wireargs['*'] = args
223 break
224 break
224 else:
225 else:
225 wireargs[k] = args[k]
226 wireargs[k] = args[k]
226 del args[k]
227 del args[k]
227 for k, v in sorted(wireargs.iteritems()):
228 for k, v in sorted(wireargs.iteritems()):
228 self.pipeo.write("%s %d\n" % (k, len(v)))
229 self.pipeo.write("%s %d\n" % (k, len(v)))
229 if isinstance(v, dict):
230 if isinstance(v, dict):
230 for dk, dv in v.iteritems():
231 for dk, dv in v.iteritems():
231 self.pipeo.write("%s %d\n" % (dk, len(dv)))
232 self.pipeo.write("%s %d\n" % (dk, len(dv)))
232 self.pipeo.write(dv)
233 self.pipeo.write(dv)
233 else:
234 else:
234 self.pipeo.write(v)
235 self.pipeo.write(v)
235 self.pipeo.flush()
236 self.pipeo.flush()
236
237
237 return self.pipei
238 return self.pipei
238
239
239 def _callcompressable(self, cmd, **args):
240 def _callcompressable(self, cmd, **args):
240 return self._callstream(cmd, **args)
241 return self._callstream(cmd, **args)
241
242
242 def _call(self, cmd, **args):
243 def _call(self, cmd, **args):
243 self._callstream(cmd, **args)
244 self._callstream(cmd, **args)
244 return self._recv()
245 return self._recv()
245
246
246 def _callpush(self, cmd, fp, **args):
247 def _callpush(self, cmd, fp, **args):
247 r = self._call(cmd, **args)
248 r = self._call(cmd, **args)
248 if r:
249 if r:
249 return '', r
250 return '', r
250 while True:
251 while True:
251 d = fp.read(4096)
252 d = fp.read(4096)
252 if not d:
253 if not d:
253 break
254 break
254 self._send(d)
255 self._send(d)
255 self._send("", flush=True)
256 self._send("", flush=True)
256 r = self._recv()
257 r = self._recv()
257 if r:
258 if r:
258 return '', r
259 return '', r
259 return self._recv(), ''
260 return self._recv(), ''
260
261
261 def _calltwowaystream(self, cmd, fp, **args):
262 def _calltwowaystream(self, cmd, fp, **args):
262 r = self._call(cmd, **args)
263 r = self._call(cmd, **args)
263 if r:
264 if r:
264 # XXX needs to be made better
265 # XXX needs to be made better
265 raise util.Abort('unexpected remote reply: %s' % r)
266 raise util.Abort('unexpected remote reply: %s' % r)
266 while True:
267 while True:
267 d = fp.read(4096)
268 d = fp.read(4096)
268 if not d:
269 if not d:
269 break
270 break
270 self._send(d)
271 self._send(d)
271 self._send("", flush=True)
272 self._send("", flush=True)
272 return self.pipei
273 return self.pipei
273
274
274 def _recv(self):
275 def _recv(self):
275 l = self.pipei.readline()
276 l = self.pipei.readline()
276 if l == '\n':
277 if l == '\n':
277 self.readerr()
278 self.readerr()
278 msg = _('check previous remote output')
279 msg = _('check previous remote output')
279 self._abort(error.OutOfBandError(hint=msg))
280 self._abort(error.OutOfBandError(hint=msg))
280 self.readerr()
281 self.readerr()
281 try:
282 try:
282 l = int(l)
283 l = int(l)
283 except ValueError:
284 except ValueError:
284 self._abort(error.ResponseError(_("unexpected response:"), l))
285 self._abort(error.ResponseError(_("unexpected response:"), l))
285 return self.pipei.read(l)
286 return self.pipei.read(l)
286
287
287 def _send(self, data, flush=False):
288 def _send(self, data, flush=False):
288 self.pipeo.write("%d\n" % len(data))
289 self.pipeo.write("%d\n" % len(data))
289 if data:
290 if data:
290 self.pipeo.write(data)
291 self.pipeo.write(data)
291 if flush:
292 if flush:
292 self.pipeo.flush()
293 self.pipeo.flush()
293 self.readerr()
294 self.readerr()
294
295
295 def lock(self):
296 def lock(self):
296 self._call("lock")
297 self._call("lock")
297 return remotelock(self)
298 return remotelock(self)
298
299
299 def unlock(self):
300 def unlock(self):
300 self._call("unlock")
301 self._call("unlock")
301
302
302 def addchangegroup(self, cg, source, url, lock=None):
303 def addchangegroup(self, cg, source, url, lock=None):
303 '''Send a changegroup to the remote server. Return an integer
304 '''Send a changegroup to the remote server. Return an integer
304 similar to unbundle(). DEPRECATED, since it requires locking the
305 similar to unbundle(). DEPRECATED, since it requires locking the
305 remote.'''
306 remote.'''
306 d = self._call("addchangegroup")
307 d = self._call("addchangegroup")
307 if d:
308 if d:
308 self._abort(error.RepoError(_("push refused: %s") % d))
309 self._abort(error.RepoError(_("push refused: %s") % d))
309 while True:
310 while True:
310 d = cg.read(4096)
311 d = cg.read(4096)
311 if not d:
312 if not d:
312 break
313 break
313 self.pipeo.write(d)
314 self.pipeo.write(d)
314 self.readerr()
315 self.readerr()
315
316
316 self.pipeo.flush()
317 self.pipeo.flush()
317
318
318 self.readerr()
319 self.readerr()
319 r = self._recv()
320 r = self._recv()
320 if not r:
321 if not r:
321 return 1
322 return 1
322 try:
323 try:
323 return int(r)
324 return int(r)
324 except ValueError:
325 except ValueError:
325 self._abort(error.ResponseError(_("unexpected response:"), r))
326 self._abort(error.ResponseError(_("unexpected response:"), r))
326
327
327 instance = sshpeer
328 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now