##// END OF EJS Templates
sshpeer: make "instance" a function...
Gregory Szorc -
r35946:b0d2885c default
parent child Browse files
Show More
@@ -1,373 +1,374 b''
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import re
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 error,
15 15 pycompat,
16 16 util,
17 17 wireproto,
18 18 )
19 19
20 20 def _serverquote(s):
21 21 """quote a string for the remote shell ... which we assume is sh"""
22 22 if not s:
23 23 return s
24 24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
25 25 return s
26 26 return "'%s'" % s.replace("'", "'\\''")
27 27
28 28 def _forwardoutput(ui, pipe):
29 29 """display all data currently available on pipe as remote output.
30 30
31 31 This is non blocking."""
32 32 s = util.readpipe(pipe)
33 33 if s:
34 34 for l in s.splitlines():
35 35 ui.status(_("remote: "), l, '\n')
36 36
37 37 class doublepipe(object):
38 38 """Operate a side-channel pipe in addition of a main one
39 39
40 40 The side-channel pipe contains server output to be forwarded to the user
41 41 input. The double pipe will behave as the "main" pipe, but will ensure the
42 42 content of the "side" pipe is properly processed while we wait for blocking
43 43 call on the "main" pipe.
44 44
45 45 If large amounts of data are read from "main", the forward will cease after
46 46 the first bytes start to appear. This simplifies the implementation
47 47 without affecting actual output of sshpeer too much as we rarely issue
48 48 large read for data not yet emitted by the server.
49 49
50 50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
51 51 that handle all the os specific bits. This class lives in this module
52 52 because it focus on behavior specific to the ssh protocol."""
53 53
54 54 def __init__(self, ui, main, side):
55 55 self._ui = ui
56 56 self._main = main
57 57 self._side = side
58 58
59 59 def _wait(self):
60 60 """wait until some data are available on main or side
61 61
62 62 return a pair of boolean (ismainready, issideready)
63 63
64 64 (This will only wait for data if the setup is supported by `util.poll`)
65 65 """
66 66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
67 67 return (True, True) # main has data, assume side is worth poking at.
68 68 fds = [self._main.fileno(), self._side.fileno()]
69 69 try:
70 70 act = util.poll(fds)
71 71 except NotImplementedError:
72 72 # non supported yet case, assume all have data.
73 73 act = fds
74 74 return (self._main.fileno() in act, self._side.fileno() in act)
75 75
76 76 def write(self, data):
77 77 return self._call('write', data)
78 78
79 79 def read(self, size):
80 80 r = self._call('read', size)
81 81 if size != 0 and not r:
82 82 # We've observed a condition that indicates the
83 83 # stdout closed unexpectedly. Check stderr one
84 84 # more time and snag anything that's there before
85 85 # letting anyone know the main part of the pipe
86 86 # closed prematurely.
87 87 _forwardoutput(self._ui, self._side)
88 88 return r
89 89
90 90 def readline(self):
91 91 return self._call('readline')
92 92
93 93 def _call(self, methname, data=None):
94 94 """call <methname> on "main", forward output of "side" while blocking
95 95 """
96 96 # data can be '' or 0
97 97 if (data is not None and not data) or self._main.closed:
98 98 _forwardoutput(self._ui, self._side)
99 99 return ''
100 100 while True:
101 101 mainready, sideready = self._wait()
102 102 if sideready:
103 103 _forwardoutput(self._ui, self._side)
104 104 if mainready:
105 105 meth = getattr(self._main, methname)
106 106 if data is None:
107 107 return meth()
108 108 else:
109 109 return meth(data)
110 110
111 111 def close(self):
112 112 return self._main.close()
113 113
114 114 def flush(self):
115 115 return self._main.flush()
116 116
117 117 class sshpeer(wireproto.wirepeer):
118 118 def __init__(self, ui, path, create=False):
119 119 self._url = path
120 120 self._ui = ui
121 121 self._pipeo = self._pipei = self._pipee = None
122 122
123 123 u = util.url(path, parsequery=False, parsefragment=False)
124 124 if u.scheme != 'ssh' or not u.host or u.path is None:
125 125 self._abort(error.RepoError(_("couldn't parse location %s") % path))
126 126
127 127 util.checksafessh(path)
128 128
129 129 if u.passwd is not None:
130 130 self._abort(error.RepoError(_("password in URL not supported")))
131 131
132 132 self._user = u.user
133 133 self._host = u.host
134 134 self._port = u.port
135 135 self._path = u.path or '.'
136 136
137 137 sshcmd = self.ui.config("ui", "ssh")
138 138 remotecmd = self.ui.config("ui", "remotecmd")
139 139 sshaddenv = dict(self.ui.configitems("sshenv"))
140 140 sshenv = util.shellenviron(sshaddenv)
141 141
142 142 args = util.sshargs(sshcmd, self._host, self._user, self._port)
143 143
144 144 if create:
145 145 cmd = '%s %s %s' % (sshcmd, args,
146 146 util.shellquote("%s init %s" %
147 147 (_serverquote(remotecmd), _serverquote(self._path))))
148 148 ui.debug('running %s\n' % cmd)
149 149 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
150 150 if res != 0:
151 151 self._abort(error.RepoError(_("could not create remote repo")))
152 152
153 153 self._validaterepo(sshcmd, args, remotecmd, sshenv)
154 154
155 155 # Begin of _basepeer interface.
156 156
157 157 @util.propertycache
158 158 def ui(self):
159 159 return self._ui
160 160
161 161 def url(self):
162 162 return self._url
163 163
164 164 def local(self):
165 165 return None
166 166
167 167 def peer(self):
168 168 return self
169 169
170 170 def canpush(self):
171 171 return True
172 172
173 173 def close(self):
174 174 pass
175 175
176 176 # End of _basepeer interface.
177 177
178 178 # Begin of _basewirecommands interface.
179 179
180 180 def capabilities(self):
181 181 return self._caps
182 182
183 183 # End of _basewirecommands interface.
184 184
185 185 def _validaterepo(self, sshcmd, args, remotecmd, sshenv=None):
186 186 # cleanup up previous run
187 187 self._cleanup()
188 188
189 189 cmd = '%s %s %s' % (sshcmd, args,
190 190 util.shellquote("%s -R %s serve --stdio" %
191 191 (_serverquote(remotecmd), _serverquote(self._path))))
192 192 self.ui.debug('running %s\n' % cmd)
193 193 cmd = util.quotecommand(cmd)
194 194
195 195 # while self._subprocess isn't used, having it allows the subprocess to
196 196 # to clean up correctly later
197 197 #
198 198 # no buffer allow the use of 'select'
199 199 # feel free to remove buffering and select usage when we ultimately
200 200 # move to threading.
201 201 sub = util.popen4(cmd, bufsize=0, env=sshenv)
202 202 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
203 203
204 204 self._pipei = util.bufferedinputpipe(self._pipei)
205 205 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
206 206 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
207 207
208 208 def badresponse():
209 209 msg = _("no suitable response from remote hg")
210 210 hint = self.ui.config("ui", "ssherrorhint")
211 211 self._abort(error.RepoError(msg, hint=hint))
212 212
213 213 try:
214 214 # skip any noise generated by remote shell
215 215 self._callstream("hello")
216 216 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
217 217 except IOError:
218 218 badresponse()
219 219
220 220 lines = ["", "dummy"]
221 221 max_noise = 500
222 222 while lines[-1] and max_noise:
223 223 try:
224 224 l = r.readline()
225 225 self._readerr()
226 226 if lines[-1] == "1\n" and l == "\n":
227 227 break
228 228 if l:
229 229 self.ui.debug("remote: ", l)
230 230 lines.append(l)
231 231 max_noise -= 1
232 232 except IOError:
233 233 badresponse()
234 234 else:
235 235 badresponse()
236 236
237 237 self._caps = set()
238 238 for l in reversed(lines):
239 239 if l.startswith("capabilities:"):
240 240 self._caps.update(l[:-1].split(":")[1].split())
241 241 break
242 242
243 243 def _readerr(self):
244 244 _forwardoutput(self.ui, self._pipee)
245 245
246 246 def _abort(self, exception):
247 247 self._cleanup()
248 248 raise exception
249 249
250 250 def _cleanup(self):
251 251 if self._pipeo is None:
252 252 return
253 253 self._pipeo.close()
254 254 self._pipei.close()
255 255 try:
256 256 # read the error descriptor until EOF
257 257 for l in self._pipee:
258 258 self.ui.status(_("remote: "), l)
259 259 except (IOError, ValueError):
260 260 pass
261 261 self._pipee.close()
262 262
263 263 __del__ = _cleanup
264 264
265 265 def _submitbatch(self, req):
266 266 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
267 267 available = self._getamount()
268 268 # TODO this response parsing is probably suboptimal for large
269 269 # batches with large responses.
270 270 toread = min(available, 1024)
271 271 work = rsp.read(toread)
272 272 available -= toread
273 273 chunk = work
274 274 while chunk:
275 275 while ';' in work:
276 276 one, work = work.split(';', 1)
277 277 yield wireproto.unescapearg(one)
278 278 toread = min(available, 1024)
279 279 chunk = rsp.read(toread)
280 280 available -= toread
281 281 work += chunk
282 282 yield wireproto.unescapearg(work)
283 283
284 284 def _callstream(self, cmd, **args):
285 285 args = pycompat.byteskwargs(args)
286 286 if (self.ui.debugflag
287 287 and self.ui.configbool('devel', 'debug.peer-request')):
288 288 dbg = self.ui.debug
289 289 line = 'devel-peer-request: %s\n'
290 290 dbg(line % cmd)
291 291 for key, value in sorted(args.items()):
292 292 if not isinstance(value, dict):
293 293 dbg(line % ' %s: %d bytes' % (key, len(value)))
294 294 else:
295 295 for dk, dv in sorted(value.items()):
296 296 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
297 297 self.ui.debug("sending %s command\n" % cmd)
298 298 self._pipeo.write("%s\n" % cmd)
299 299 _func, names = wireproto.commands[cmd]
300 300 keys = names.split()
301 301 wireargs = {}
302 302 for k in keys:
303 303 if k == '*':
304 304 wireargs['*'] = args
305 305 break
306 306 else:
307 307 wireargs[k] = args[k]
308 308 del args[k]
309 309 for k, v in sorted(wireargs.iteritems()):
310 310 self._pipeo.write("%s %d\n" % (k, len(v)))
311 311 if isinstance(v, dict):
312 312 for dk, dv in v.iteritems():
313 313 self._pipeo.write("%s %d\n" % (dk, len(dv)))
314 314 self._pipeo.write(dv)
315 315 else:
316 316 self._pipeo.write(v)
317 317 self._pipeo.flush()
318 318
319 319 return self._pipei
320 320
321 321 def _callcompressable(self, cmd, **args):
322 322 return self._callstream(cmd, **args)
323 323
324 324 def _call(self, cmd, **args):
325 325 self._callstream(cmd, **args)
326 326 return self._recv()
327 327
328 328 def _callpush(self, cmd, fp, **args):
329 329 r = self._call(cmd, **args)
330 330 if r:
331 331 return '', r
332 332 for d in iter(lambda: fp.read(4096), ''):
333 333 self._send(d)
334 334 self._send("", flush=True)
335 335 r = self._recv()
336 336 if r:
337 337 return '', r
338 338 return self._recv(), ''
339 339
340 340 def _calltwowaystream(self, cmd, fp, **args):
341 341 r = self._call(cmd, **args)
342 342 if r:
343 343 # XXX needs to be made better
344 344 raise error.Abort(_('unexpected remote reply: %s') % r)
345 345 for d in iter(lambda: fp.read(4096), ''):
346 346 self._send(d)
347 347 self._send("", flush=True)
348 348 return self._pipei
349 349
350 350 def _getamount(self):
351 351 l = self._pipei.readline()
352 352 if l == '\n':
353 353 self._readerr()
354 354 msg = _('check previous remote output')
355 355 self._abort(error.OutOfBandError(hint=msg))
356 356 self._readerr()
357 357 try:
358 358 return int(l)
359 359 except ValueError:
360 360 self._abort(error.ResponseError(_("unexpected response:"), l))
361 361
362 362 def _recv(self):
363 363 return self._pipei.read(self._getamount())
364 364
365 365 def _send(self, data, flush=False):
366 366 self._pipeo.write("%d\n" % len(data))
367 367 if data:
368 368 self._pipeo.write(data)
369 369 if flush:
370 370 self._pipeo.flush()
371 371 self._readerr()
372 372
373 instance = sshpeer
373 def instance(ui, path, create):
374 return sshpeer(ui, path, create=create)
General Comments 0
You need to be logged in to leave comments. Login now