##// END OF EJS Templates
sshpeer: establish SSH connection before class instantiation...
Gregory Szorc -
r35953:00b9e26d default
parent child Browse files
Show More
@@ -1,385 +1,391 b''
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import re
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 error,
15 15 pycompat,
16 16 util,
17 17 wireproto,
18 18 )
19 19
20 20 def _serverquote(s):
21 21 """quote a string for the remote shell ... which we assume is sh"""
22 22 if not s:
23 23 return s
24 24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
25 25 return s
26 26 return "'%s'" % s.replace("'", "'\\''")
27 27
28 28 def _forwardoutput(ui, pipe):
29 29 """display all data currently available on pipe as remote output.
30 30
31 31 This is non blocking."""
32 32 s = util.readpipe(pipe)
33 33 if s:
34 34 for l in s.splitlines():
35 35 ui.status(_("remote: "), l, '\n')
36 36
37 37 class doublepipe(object):
38 38 """Operate a side-channel pipe in addition of a main one
39 39
40 40 The side-channel pipe contains server output to be forwarded to the user
41 41 input. The double pipe will behave as the "main" pipe, but will ensure the
42 42 content of the "side" pipe is properly processed while we wait for blocking
43 43 call on the "main" pipe.
44 44
45 45 If large amounts of data are read from "main", the forward will cease after
46 46 the first bytes start to appear. This simplifies the implementation
47 47 without affecting actual output of sshpeer too much as we rarely issue
48 48 large read for data not yet emitted by the server.
49 49
50 50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
51 51 that handle all the os specific bits. This class lives in this module
52 52 because it focus on behavior specific to the ssh protocol."""
53 53
54 54 def __init__(self, ui, main, side):
55 55 self._ui = ui
56 56 self._main = main
57 57 self._side = side
58 58
59 59 def _wait(self):
60 60 """wait until some data are available on main or side
61 61
62 62 return a pair of boolean (ismainready, issideready)
63 63
64 64 (This will only wait for data if the setup is supported by `util.poll`)
65 65 """
66 66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
67 67 return (True, True) # main has data, assume side is worth poking at.
68 68 fds = [self._main.fileno(), self._side.fileno()]
69 69 try:
70 70 act = util.poll(fds)
71 71 except NotImplementedError:
72 72 # non supported yet case, assume all have data.
73 73 act = fds
74 74 return (self._main.fileno() in act, self._side.fileno() in act)
75 75
76 76 def write(self, data):
77 77 return self._call('write', data)
78 78
79 79 def read(self, size):
80 80 r = self._call('read', size)
81 81 if size != 0 and not r:
82 82 # We've observed a condition that indicates the
83 83 # stdout closed unexpectedly. Check stderr one
84 84 # more time and snag anything that's there before
85 85 # letting anyone know the main part of the pipe
86 86 # closed prematurely.
87 87 _forwardoutput(self._ui, self._side)
88 88 return r
89 89
90 90 def readline(self):
91 91 return self._call('readline')
92 92
93 93 def _call(self, methname, data=None):
94 94 """call <methname> on "main", forward output of "side" while blocking
95 95 """
96 96 # data can be '' or 0
97 97 if (data is not None and not data) or self._main.closed:
98 98 _forwardoutput(self._ui, self._side)
99 99 return ''
100 100 while True:
101 101 mainready, sideready = self._wait()
102 102 if sideready:
103 103 _forwardoutput(self._ui, self._side)
104 104 if mainready:
105 105 meth = getattr(self._main, methname)
106 106 if data is None:
107 107 return meth()
108 108 else:
109 109 return meth(data)
110 110
111 111 def close(self):
112 112 return self._main.close()
113 113
114 114 def flush(self):
115 115 return self._main.flush()
116 116
117 117 def _cleanuppipes(ui, pipei, pipeo, pipee):
118 118 """Clean up pipes used by an SSH connection."""
119 119 if pipeo:
120 120 pipeo.close()
121 121 if pipei:
122 122 pipei.close()
123 123
124 124 if pipee:
125 125 # Try to read from the err descriptor until EOF.
126 126 try:
127 127 for l in pipee:
128 128 ui.status(_('remote: '), l)
129 129 except (IOError, ValueError):
130 130 pass
131 131
132 132 pipee.close()
133 133
134 def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
135 """Create an SSH connection to a server.
136
137 Returns a tuple of (process, stdin, stdout, stderr) for the
138 spawned process.
139 """
140 cmd = '%s %s %s' % (
141 sshcmd,
142 args,
143 util.shellquote('%s -R %s serve --stdio' % (
144 _serverquote(remotecmd), _serverquote(path))))
145
146 ui.debug('running %s\n' % cmd)
147 cmd = util.quotecommand(cmd)
148
149 # no buffer allow the use of 'select'
150 # feel free to remove buffering and select usage when we ultimately
151 # move to threading.
152 stdin, stdout, stderr, proc = util.popen4(cmd, bufsize=0, env=sshenv)
153
154 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
155 stdin = doublepipe(ui, stdin, stderr)
156
157 return proc, stdin, stdout, stderr
158
134 159 class sshpeer(wireproto.wirepeer):
135 160 def __init__(self, ui, path, create=False, sshstate=None):
136 161 self._url = path
137 162 self._ui = ui
138 self._pipeo = self._pipei = self._pipee = None
163 # self._subprocess is unused. Keeping a handle on the process
164 # holds a reference and prevents it from being garbage collected.
165 self._subprocess, self._pipei, self._pipeo, self._pipee = sshstate
139 166
140 u = util.url(path, parsequery=False, parsefragment=False)
141 self._path = u.path or '.'
142
143 self._validaterepo(*sshstate)
167 self._validaterepo()
144 168
145 169 # Begin of _basepeer interface.
146 170
147 171 @util.propertycache
148 172 def ui(self):
149 173 return self._ui
150 174
151 175 def url(self):
152 176 return self._url
153 177
154 178 def local(self):
155 179 return None
156 180
157 181 def peer(self):
158 182 return self
159 183
160 184 def canpush(self):
161 185 return True
162 186
163 187 def close(self):
164 188 pass
165 189
166 190 # End of _basepeer interface.
167 191
168 192 # Begin of _basewirecommands interface.
169 193
170 194 def capabilities(self):
171 195 return self._caps
172 196
173 197 # End of _basewirecommands interface.
174 198
175 def _validaterepo(self, sshcmd, args, remotecmd, sshenv=None):
176 assert self._pipei is None
177
178 cmd = '%s %s %s' % (sshcmd, args,
179 util.shellquote("%s -R %s serve --stdio" %
180 (_serverquote(remotecmd), _serverquote(self._path))))
181 self.ui.debug('running %s\n' % cmd)
182 cmd = util.quotecommand(cmd)
183
184 # while self._subprocess isn't used, having it allows the subprocess to
185 # to clean up correctly later
186 #
187 # no buffer allow the use of 'select'
188 # feel free to remove buffering and select usage when we ultimately
189 # move to threading.
190 sub = util.popen4(cmd, bufsize=0, env=sshenv)
191 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
192
193 self._pipei = util.bufferedinputpipe(self._pipei)
194 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
195 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
196
199 def _validaterepo(self):
197 200 def badresponse():
198 201 msg = _("no suitable response from remote hg")
199 202 hint = self.ui.config("ui", "ssherrorhint")
200 203 self._abort(error.RepoError(msg, hint=hint))
201 204
202 205 try:
203 206 # skip any noise generated by remote shell
204 207 self._callstream("hello")
205 208 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
206 209 except IOError:
207 210 badresponse()
208 211
209 212 lines = ["", "dummy"]
210 213 max_noise = 500
211 214 while lines[-1] and max_noise:
212 215 try:
213 216 l = r.readline()
214 217 self._readerr()
215 218 if lines[-1] == "1\n" and l == "\n":
216 219 break
217 220 if l:
218 221 self.ui.debug("remote: ", l)
219 222 lines.append(l)
220 223 max_noise -= 1
221 224 except IOError:
222 225 badresponse()
223 226 else:
224 227 badresponse()
225 228
226 229 self._caps = set()
227 230 for l in reversed(lines):
228 231 if l.startswith("capabilities:"):
229 232 self._caps.update(l[:-1].split(":")[1].split())
230 233 break
231 234
232 235 def _readerr(self):
233 236 _forwardoutput(self.ui, self._pipee)
234 237
235 238 def _abort(self, exception):
236 239 self._cleanup()
237 240 raise exception
238 241
239 242 def _cleanup(self):
240 243 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
241 244
242 245 __del__ = _cleanup
243 246
244 247 def _submitbatch(self, req):
245 248 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
246 249 available = self._getamount()
247 250 # TODO this response parsing is probably suboptimal for large
248 251 # batches with large responses.
249 252 toread = min(available, 1024)
250 253 work = rsp.read(toread)
251 254 available -= toread
252 255 chunk = work
253 256 while chunk:
254 257 while ';' in work:
255 258 one, work = work.split(';', 1)
256 259 yield wireproto.unescapearg(one)
257 260 toread = min(available, 1024)
258 261 chunk = rsp.read(toread)
259 262 available -= toread
260 263 work += chunk
261 264 yield wireproto.unescapearg(work)
262 265
263 266 def _callstream(self, cmd, **args):
264 267 args = pycompat.byteskwargs(args)
265 268 if (self.ui.debugflag
266 269 and self.ui.configbool('devel', 'debug.peer-request')):
267 270 dbg = self.ui.debug
268 271 line = 'devel-peer-request: %s\n'
269 272 dbg(line % cmd)
270 273 for key, value in sorted(args.items()):
271 274 if not isinstance(value, dict):
272 275 dbg(line % ' %s: %d bytes' % (key, len(value)))
273 276 else:
274 277 for dk, dv in sorted(value.items()):
275 278 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
276 279 self.ui.debug("sending %s command\n" % cmd)
277 280 self._pipeo.write("%s\n" % cmd)
278 281 _func, names = wireproto.commands[cmd]
279 282 keys = names.split()
280 283 wireargs = {}
281 284 for k in keys:
282 285 if k == '*':
283 286 wireargs['*'] = args
284 287 break
285 288 else:
286 289 wireargs[k] = args[k]
287 290 del args[k]
288 291 for k, v in sorted(wireargs.iteritems()):
289 292 self._pipeo.write("%s %d\n" % (k, len(v)))
290 293 if isinstance(v, dict):
291 294 for dk, dv in v.iteritems():
292 295 self._pipeo.write("%s %d\n" % (dk, len(dv)))
293 296 self._pipeo.write(dv)
294 297 else:
295 298 self._pipeo.write(v)
296 299 self._pipeo.flush()
297 300
298 301 return self._pipei
299 302
300 303 def _callcompressable(self, cmd, **args):
301 304 return self._callstream(cmd, **args)
302 305
303 306 def _call(self, cmd, **args):
304 307 self._callstream(cmd, **args)
305 308 return self._recv()
306 309
307 310 def _callpush(self, cmd, fp, **args):
308 311 r = self._call(cmd, **args)
309 312 if r:
310 313 return '', r
311 314 for d in iter(lambda: fp.read(4096), ''):
312 315 self._send(d)
313 316 self._send("", flush=True)
314 317 r = self._recv()
315 318 if r:
316 319 return '', r
317 320 return self._recv(), ''
318 321
319 322 def _calltwowaystream(self, cmd, fp, **args):
320 323 r = self._call(cmd, **args)
321 324 if r:
322 325 # XXX needs to be made better
323 326 raise error.Abort(_('unexpected remote reply: %s') % r)
324 327 for d in iter(lambda: fp.read(4096), ''):
325 328 self._send(d)
326 329 self._send("", flush=True)
327 330 return self._pipei
328 331
329 332 def _getamount(self):
330 333 l = self._pipei.readline()
331 334 if l == '\n':
332 335 self._readerr()
333 336 msg = _('check previous remote output')
334 337 self._abort(error.OutOfBandError(hint=msg))
335 338 self._readerr()
336 339 try:
337 340 return int(l)
338 341 except ValueError:
339 342 self._abort(error.ResponseError(_("unexpected response:"), l))
340 343
341 344 def _recv(self):
342 345 return self._pipei.read(self._getamount())
343 346
344 347 def _send(self, data, flush=False):
345 348 self._pipeo.write("%d\n" % len(data))
346 349 if data:
347 350 self._pipeo.write(data)
348 351 if flush:
349 352 self._pipeo.flush()
350 353 self._readerr()
351 354
352 355 def instance(ui, path, create):
353 356 """Create an SSH peer.
354 357
355 358 The returned object conforms to the ``wireproto.wirepeer`` interface.
356 359 """
357 360 u = util.url(path, parsequery=False, parsefragment=False)
358 361 if u.scheme != 'ssh' or not u.host or u.path is None:
359 362 raise error.RepoError(_("couldn't parse location %s") % path)
360 363
361 364 util.checksafessh(path)
362 365
363 366 if u.passwd is not None:
364 367 raise error.RepoError(_('password in URL not supported'))
365 368
366 369 sshcmd = ui.config('ui', 'ssh')
367 370 remotecmd = ui.config('ui', 'remotecmd')
368 371 sshaddenv = dict(ui.configitems('sshenv'))
369 372 sshenv = util.shellenviron(sshaddenv)
370 373 remotepath = u.path or '.'
371 374
372 375 args = util.sshargs(sshcmd, u.host, u.user, u.port)
373 376
374 377 if create:
375 378 cmd = '%s %s %s' % (sshcmd, args,
376 379 util.shellquote('%s init %s' %
377 380 (_serverquote(remotecmd), _serverquote(remotepath))))
378 381 ui.debug('running %s\n' % cmd)
379 382 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
380 383 if res != 0:
381 384 raise error.RepoError(_('could not create remote repo'))
382 385
383 sshstate = (sshcmd, args, remotecmd, sshenv)
386 proc, stdin, stdout, stderr = _makeconnection(ui, sshcmd, args, remotecmd,
387 remotepath, sshenv)
388
389 sshstate = (proc, stdout, stdin, stderr)
384 390
385 391 return sshpeer(ui, path, create=create, sshstate=sshstate)
@@ -1,77 +1,78 b''
1 1 # Test that certain objects conform to well-defined interfaces.
2 2
3 3 from __future__ import absolute_import, print_function
4 4
5 5 from mercurial import (
6 6 bundlerepo,
7 7 httppeer,
8 8 localrepo,
9 9 sshpeer,
10 10 statichttprepo,
11 11 ui as uimod,
12 12 unionrepo,
13 13 )
14 14
15 15 def checkobject(o):
16 16 """Verify a constructed object conforms to interface rules.
17 17
18 18 An object must have __abstractmethods__ defined.
19 19
20 20 All "public" attributes of the object (attributes not prefixed with
21 21 an underscore) must be in __abstractmethods__ or appear on a base class
22 22 with __abstractmethods__.
23 23 """
24 24 name = o.__class__.__name__
25 25
26 26 allowed = set()
27 27 for cls in o.__class__.__mro__:
28 28 if not getattr(cls, '__abstractmethods__', set()):
29 29 continue
30 30
31 31 allowed |= cls.__abstractmethods__
32 32 allowed |= {a for a in dir(cls) if not a.startswith('_')}
33 33
34 34 if not allowed:
35 35 print('%s does not have abstract methods' % name)
36 36 return
37 37
38 38 public = {a for a in dir(o) if not a.startswith('_')}
39 39
40 40 for attr in sorted(public - allowed):
41 41 print('public attributes not in abstract interface: %s.%s' % (
42 42 name, attr))
43 43
44 44 # Facilitates testing localpeer.
45 45 class dummyrepo(object):
46 46 def __init__(self):
47 47 self.ui = uimod.ui()
48 48 def filtered(self, name):
49 49 pass
50 50 def _restrictcapabilities(self, caps):
51 51 pass
52 52
53 53 # Facilitates testing sshpeer without requiring an SSH server.
54 54 class testingsshpeer(sshpeer.sshpeer):
55 55 def _validaterepo(self, *args, **kwargs):
56 56 pass
57 57
58 58 class badpeer(httppeer.httppeer):
59 59 def __init__(self):
60 60 super(badpeer, self).__init__(uimod.ui(), 'http://localhost')
61 61 self.badattribute = True
62 62
63 63 def badmethod(self):
64 64 pass
65 65
66 66 def main():
67 67 ui = uimod.ui()
68 68
69 69 checkobject(badpeer())
70 70 checkobject(httppeer.httppeer(ui, 'http://localhost'))
71 71 checkobject(localrepo.localpeer(dummyrepo()))
72 checkobject(testingsshpeer(ui, 'ssh://localhost/foo', False, ()))
72 checkobject(testingsshpeer(ui, 'ssh://localhost/foo', False,
73 (None, None, None, None)))
73 74 checkobject(bundlerepo.bundlepeer(dummyrepo()))
74 75 checkobject(statichttprepo.statichttppeer(dummyrepo()))
75 76 checkobject(unionrepo.unionpeer(dummyrepo()))
76 77
77 78 main()
General Comments 0
You need to be logged in to leave comments. Login now