##// END OF EJS Templates
sshpeer: make instance attributes and methods internal...
Gregory Szorc -
r33763:82d564d5 default
parent child Browse files
Show More
@@ -1,324 +1,325
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import re
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 error,
15 15 pycompat,
16 16 util,
17 17 wireproto,
18 18 )
19 19
20 20 def _serverquote(s):
21 21 if not s:
22 22 return s
23 23 '''quote a string for the remote shell ... which we assume is sh'''
24 24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
25 25 return s
26 26 return "'%s'" % s.replace("'", "'\\''")
27 27
28 28 def _forwardoutput(ui, pipe):
29 29 """display all data currently available on pipe as remote output.
30 30
31 31 This is non blocking."""
32 32 s = util.readpipe(pipe)
33 33 if s:
34 34 for l in s.splitlines():
35 35 ui.status(_("remote: "), l, '\n')
36 36
37 37 class doublepipe(object):
38 38 """Operate a side-channel pipe in addition of a main one
39 39
40 40 The side-channel pipe contains server output to be forwarded to the user
41 41 input. The double pipe will behave as the "main" pipe, but will ensure the
42 42 content of the "side" pipe is properly processed while we wait for blocking
43 43 call on the "main" pipe.
44 44
45 45 If large amounts of data are read from "main", the forward will cease after
46 46 the first bytes start to appear. This simplifies the implementation
47 47 without affecting actual output of sshpeer too much as we rarely issue
48 48 large read for data not yet emitted by the server.
49 49
50 50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
51 51 that handle all the os specific bits. This class lives in this module
52 52 because it focus on behavior specific to the ssh protocol."""
53 53
54 54 def __init__(self, ui, main, side):
55 55 self._ui = ui
56 56 self._main = main
57 57 self._side = side
58 58
59 59 def _wait(self):
60 60 """wait until some data are available on main or side
61 61
62 62 return a pair of boolean (ismainready, issideready)
63 63
64 64 (This will only wait for data if the setup is supported by `util.poll`)
65 65 """
66 66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
67 67 return (True, True) # main has data, assume side is worth poking at.
68 68 fds = [self._main.fileno(), self._side.fileno()]
69 69 try:
70 70 act = util.poll(fds)
71 71 except NotImplementedError:
72 72 # non supported yet case, assume all have data.
73 73 act = fds
74 74 return (self._main.fileno() in act, self._side.fileno() in act)
75 75
76 76 def write(self, data):
77 77 return self._call('write', data)
78 78
79 79 def read(self, size):
80 80 r = self._call('read', size)
81 81 if size != 0 and not r:
82 82 # We've observed a condition that indicates the
83 83 # stdout closed unexpectedly. Check stderr one
84 84 # more time and snag anything that's there before
85 85 # letting anyone know the main part of the pipe
86 86 # closed prematurely.
87 87 _forwardoutput(self._ui, self._side)
88 88 return r
89 89
90 90 def readline(self):
91 91 return self._call('readline')
92 92
93 93 def _call(self, methname, data=None):
94 94 """call <methname> on "main", forward output of "side" while blocking
95 95 """
96 96 # data can be '' or 0
97 97 if (data is not None and not data) or self._main.closed:
98 98 _forwardoutput(self._ui, self._side)
99 99 return ''
100 100 while True:
101 101 mainready, sideready = self._wait()
102 102 if sideready:
103 103 _forwardoutput(self._ui, self._side)
104 104 if mainready:
105 105 meth = getattr(self._main, methname)
106 106 if data is None:
107 107 return meth()
108 108 else:
109 109 return meth(data)
110 110
111 111 def close(self):
112 112 return self._main.close()
113 113
114 114 def flush(self):
115 115 return self._main.flush()
116 116
117 117 class sshpeer(wireproto.wirepeer):
118 118 def __init__(self, ui, path, create=False):
119 119 self._url = path
120 120 self.ui = ui
121 self.pipeo = self.pipei = self.pipee = None
121 self._pipeo = self._pipei = self._pipee = None
122 122
123 123 u = util.url(path, parsequery=False, parsefragment=False)
124 124 if u.scheme != 'ssh' or not u.host or u.path is None:
125 125 self._abort(error.RepoError(_("couldn't parse location %s") % path))
126 126
127 127 util.checksafessh(path)
128 128
129 self.user = u.user
130 129 if u.passwd is not None:
131 130 self._abort(error.RepoError(_("password in URL not supported")))
132 self.host = u.host
133 self.port = u.port
134 self.path = u.path or "."
131
132 self._user = u.user
133 self._host = u.host
134 self._port = u.port
135 self._path = u.path or '.'
135 136
136 137 sshcmd = self.ui.config("ui", "ssh")
137 138 remotecmd = self.ui.config("ui", "remotecmd")
138 139
139 args = util.sshargs(sshcmd, self.host, self.user, self.port)
140 args = util.sshargs(sshcmd, self._host, self._user, self._port)
140 141
141 142 if create:
142 143 cmd = '%s %s %s' % (sshcmd, args,
143 144 util.shellquote("%s init %s" %
144 (_serverquote(remotecmd), _serverquote(self.path))))
145 (_serverquote(remotecmd), _serverquote(self._path))))
145 146 ui.debug('running %s\n' % cmd)
146 147 res = ui.system(cmd, blockedtag='sshpeer')
147 148 if res != 0:
148 149 self._abort(error.RepoError(_("could not create remote repo")))
149 150
150 151 self._validaterepo(sshcmd, args, remotecmd)
151 152
152 153 def url(self):
153 154 return self._url
154 155
155 156 def _validaterepo(self, sshcmd, args, remotecmd):
156 157 # cleanup up previous run
157 self.cleanup()
158 self._cleanup()
158 159
159 160 cmd = '%s %s %s' % (sshcmd, args,
160 161 util.shellquote("%s -R %s serve --stdio" %
161 (_serverquote(remotecmd), _serverquote(self.path))))
162 (_serverquote(remotecmd), _serverquote(self._path))))
162 163 self.ui.debug('running %s\n' % cmd)
163 164 cmd = util.quotecommand(cmd)
164 165
165 # while self.subprocess isn't used, having it allows the subprocess to
166 # while self._subprocess isn't used, having it allows the subprocess to
166 167 # to clean up correctly later
167 168 #
168 169 # no buffer allow the use of 'select'
169 170 # feel free to remove buffering and select usage when we ultimately
170 171 # move to threading.
171 172 sub = util.popen4(cmd, bufsize=0)
172 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
173 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
173 174
174 self.pipei = util.bufferedinputpipe(self.pipei)
175 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
176 self.pipeo = doublepipe(self.ui, self.pipeo, self.pipee)
175 self._pipei = util.bufferedinputpipe(self._pipei)
176 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
177 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
177 178
178 179 # skip any noise generated by remote shell
179 180 self._callstream("hello")
180 181 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
181 182 lines = ["", "dummy"]
182 183 max_noise = 500
183 184 while lines[-1] and max_noise:
184 185 l = r.readline()
185 self.readerr()
186 self._readerr()
186 187 if lines[-1] == "1\n" and l == "\n":
187 188 break
188 189 if l:
189 190 self.ui.debug("remote: ", l)
190 191 lines.append(l)
191 192 max_noise -= 1
192 193 else:
193 194 self._abort(error.RepoError(_('no suitable response from '
194 195 'remote hg')))
195 196
196 197 self._caps = set()
197 198 for l in reversed(lines):
198 199 if l.startswith("capabilities:"):
199 200 self._caps.update(l[:-1].split(":")[1].split())
200 201 break
201 202
202 203 def _capabilities(self):
203 204 return self._caps
204 205
205 def readerr(self):
206 _forwardoutput(self.ui, self.pipee)
206 def _readerr(self):
207 _forwardoutput(self.ui, self._pipee)
207 208
208 209 def _abort(self, exception):
209 self.cleanup()
210 self._cleanup()
210 211 raise exception
211 212
212 def cleanup(self):
213 if self.pipeo is None:
213 def _cleanup(self):
214 if self._pipeo is None:
214 215 return
215 self.pipeo.close()
216 self.pipei.close()
216 self._pipeo.close()
217 self._pipei.close()
217 218 try:
218 219 # read the error descriptor until EOF
219 for l in self.pipee:
220 for l in self._pipee:
220 221 self.ui.status(_("remote: "), l)
221 222 except (IOError, ValueError):
222 223 pass
223 self.pipee.close()
224 self._pipee.close()
224 225
225 __del__ = cleanup
226 __del__ = _cleanup
226 227
227 228 def _submitbatch(self, req):
228 229 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
229 230 available = self._getamount()
230 231 # TODO this response parsing is probably suboptimal for large
231 232 # batches with large responses.
232 233 toread = min(available, 1024)
233 234 work = rsp.read(toread)
234 235 available -= toread
235 236 chunk = work
236 237 while chunk:
237 238 while ';' in work:
238 239 one, work = work.split(';', 1)
239 240 yield wireproto.unescapearg(one)
240 241 toread = min(available, 1024)
241 242 chunk = rsp.read(toread)
242 243 available -= toread
243 244 work += chunk
244 245 yield wireproto.unescapearg(work)
245 246
246 247 def _callstream(self, cmd, **args):
247 248 args = pycompat.byteskwargs(args)
248 249 self.ui.debug("sending %s command\n" % cmd)
249 self.pipeo.write("%s\n" % cmd)
250 self._pipeo.write("%s\n" % cmd)
250 251 _func, names = wireproto.commands[cmd]
251 252 keys = names.split()
252 253 wireargs = {}
253 254 for k in keys:
254 255 if k == '*':
255 256 wireargs['*'] = args
256 257 break
257 258 else:
258 259 wireargs[k] = args[k]
259 260 del args[k]
260 261 for k, v in sorted(wireargs.iteritems()):
261 self.pipeo.write("%s %d\n" % (k, len(v)))
262 self._pipeo.write("%s %d\n" % (k, len(v)))
262 263 if isinstance(v, dict):
263 264 for dk, dv in v.iteritems():
264 self.pipeo.write("%s %d\n" % (dk, len(dv)))
265 self.pipeo.write(dv)
265 self._pipeo.write("%s %d\n" % (dk, len(dv)))
266 self._pipeo.write(dv)
266 267 else:
267 self.pipeo.write(v)
268 self.pipeo.flush()
268 self._pipeo.write(v)
269 self._pipeo.flush()
269 270
270 return self.pipei
271 return self._pipei
271 272
272 273 def _callcompressable(self, cmd, **args):
273 274 return self._callstream(cmd, **args)
274 275
275 276 def _call(self, cmd, **args):
276 277 self._callstream(cmd, **args)
277 278 return self._recv()
278 279
279 280 def _callpush(self, cmd, fp, **args):
280 281 r = self._call(cmd, **args)
281 282 if r:
282 283 return '', r
283 284 for d in iter(lambda: fp.read(4096), ''):
284 285 self._send(d)
285 286 self._send("", flush=True)
286 287 r = self._recv()
287 288 if r:
288 289 return '', r
289 290 return self._recv(), ''
290 291
291 292 def _calltwowaystream(self, cmd, fp, **args):
292 293 r = self._call(cmd, **args)
293 294 if r:
294 295 # XXX needs to be made better
295 296 raise error.Abort(_('unexpected remote reply: %s') % r)
296 297 for d in iter(lambda: fp.read(4096), ''):
297 298 self._send(d)
298 299 self._send("", flush=True)
299 return self.pipei
300 return self._pipei
300 301
301 302 def _getamount(self):
302 l = self.pipei.readline()
303 l = self._pipei.readline()
303 304 if l == '\n':
304 self.readerr()
305 self._readerr()
305 306 msg = _('check previous remote output')
306 307 self._abort(error.OutOfBandError(hint=msg))
307 self.readerr()
308 self._readerr()
308 309 try:
309 310 return int(l)
310 311 except ValueError:
311 312 self._abort(error.ResponseError(_("unexpected response:"), l))
312 313
313 314 def _recv(self):
314 return self.pipei.read(self._getamount())
315 return self._pipei.read(self._getamount())
315 316
316 317 def _send(self, data, flush=False):
317 self.pipeo.write("%d\n" % len(data))
318 self._pipeo.write("%d\n" % len(data))
318 319 if data:
319 self.pipeo.write(data)
320 self._pipeo.write(data)
320 321 if flush:
321 self.pipeo.flush()
322 self.readerr()
322 self._pipeo.flush()
323 self._readerr()
323 324
324 325 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now