##// END OF EJS Templates
sshpeer: extract pipe cleanup logic to own function...
Gregory Szorc -
r35951:805edf16 default
parent child Browse files
Show More
@@ -1,379 +1,386 b''
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import re
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 error,
15 15 pycompat,
16 16 util,
17 17 wireproto,
18 18 )
19 19
20 20 def _serverquote(s):
21 21 """quote a string for the remote shell ... which we assume is sh"""
22 22 if not s:
23 23 return s
24 24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
25 25 return s
26 26 return "'%s'" % s.replace("'", "'\\''")
27 27
28 28 def _forwardoutput(ui, pipe):
29 29 """display all data currently available on pipe as remote output.
30 30
31 31 This is non blocking."""
32 32 s = util.readpipe(pipe)
33 33 if s:
34 34 for l in s.splitlines():
35 35 ui.status(_("remote: "), l, '\n')
36 36
37 37 class doublepipe(object):
38 38 """Operate a side-channel pipe in addition of a main one
39 39
40 40 The side-channel pipe contains server output to be forwarded to the user
41 41 input. The double pipe will behave as the "main" pipe, but will ensure the
42 42 content of the "side" pipe is properly processed while we wait for blocking
43 43 call on the "main" pipe.
44 44
45 45 If large amounts of data are read from "main", the forward will cease after
46 46 the first bytes start to appear. This simplifies the implementation
47 47 without affecting actual output of sshpeer too much as we rarely issue
48 48 large read for data not yet emitted by the server.
49 49
50 50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
51 51 that handle all the os specific bits. This class lives in this module
52 52 because it focus on behavior specific to the ssh protocol."""
53 53
54 54 def __init__(self, ui, main, side):
55 55 self._ui = ui
56 56 self._main = main
57 57 self._side = side
58 58
59 59 def _wait(self):
60 60 """wait until some data are available on main or side
61 61
62 62 return a pair of boolean (ismainready, issideready)
63 63
64 64 (This will only wait for data if the setup is supported by `util.poll`)
65 65 """
66 66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
67 67 return (True, True) # main has data, assume side is worth poking at.
68 68 fds = [self._main.fileno(), self._side.fileno()]
69 69 try:
70 70 act = util.poll(fds)
71 71 except NotImplementedError:
72 72 # non supported yet case, assume all have data.
73 73 act = fds
74 74 return (self._main.fileno() in act, self._side.fileno() in act)
75 75
76 76 def write(self, data):
77 77 return self._call('write', data)
78 78
79 79 def read(self, size):
80 80 r = self._call('read', size)
81 81 if size != 0 and not r:
82 82 # We've observed a condition that indicates the
83 83 # stdout closed unexpectedly. Check stderr one
84 84 # more time and snag anything that's there before
85 85 # letting anyone know the main part of the pipe
86 86 # closed prematurely.
87 87 _forwardoutput(self._ui, self._side)
88 88 return r
89 89
90 90 def readline(self):
91 91 return self._call('readline')
92 92
93 93 def _call(self, methname, data=None):
94 94 """call <methname> on "main", forward output of "side" while blocking
95 95 """
96 96 # data can be '' or 0
97 97 if (data is not None and not data) or self._main.closed:
98 98 _forwardoutput(self._ui, self._side)
99 99 return ''
100 100 while True:
101 101 mainready, sideready = self._wait()
102 102 if sideready:
103 103 _forwardoutput(self._ui, self._side)
104 104 if mainready:
105 105 meth = getattr(self._main, methname)
106 106 if data is None:
107 107 return meth()
108 108 else:
109 109 return meth(data)
110 110
111 111 def close(self):
112 112 return self._main.close()
113 113
114 114 def flush(self):
115 115 return self._main.flush()
116 116
117 def _cleanuppipes(ui, pipei, pipeo, pipee):
118 """Clean up pipes used by an SSH connection."""
119 if pipeo:
120 pipeo.close()
121 if pipei:
122 pipei.close()
123
124 if pipee:
125 # Try to read from the err descriptor until EOF.
126 try:
127 for l in pipee:
128 ui.status(_('remote: '), l)
129 except (IOError, ValueError):
130 pass
131
132 pipee.close()
133
117 134 class sshpeer(wireproto.wirepeer):
118 135 def __init__(self, ui, path, create=False, sshstate=None):
119 136 self._url = path
120 137 self._ui = ui
121 138 self._pipeo = self._pipei = self._pipee = None
122 139
123 140 u = util.url(path, parsequery=False, parsefragment=False)
124 141 self._path = u.path or '.'
125 142
126 143 self._validaterepo(*sshstate)
127 144
128 145 # Begin of _basepeer interface.
129 146
130 147 @util.propertycache
131 148 def ui(self):
132 149 return self._ui
133 150
134 151 def url(self):
135 152 return self._url
136 153
137 154 def local(self):
138 155 return None
139 156
140 157 def peer(self):
141 158 return self
142 159
143 160 def canpush(self):
144 161 return True
145 162
146 163 def close(self):
147 164 pass
148 165
149 166 # End of _basepeer interface.
150 167
151 168 # Begin of _basewirecommands interface.
152 169
153 170 def capabilities(self):
154 171 return self._caps
155 172
156 173 # End of _basewirecommands interface.
157 174
158 175 def _validaterepo(self, sshcmd, args, remotecmd, sshenv=None):
159 176 # cleanup up previous run
160 177 self._cleanup()
161 178
162 179 cmd = '%s %s %s' % (sshcmd, args,
163 180 util.shellquote("%s -R %s serve --stdio" %
164 181 (_serverquote(remotecmd), _serverquote(self._path))))
165 182 self.ui.debug('running %s\n' % cmd)
166 183 cmd = util.quotecommand(cmd)
167 184
168 185 # while self._subprocess isn't used, having it allows the subprocess to
169 186 # to clean up correctly later
170 187 #
171 188 # no buffer allow the use of 'select'
172 189 # feel free to remove buffering and select usage when we ultimately
173 190 # move to threading.
174 191 sub = util.popen4(cmd, bufsize=0, env=sshenv)
175 192 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
176 193
177 194 self._pipei = util.bufferedinputpipe(self._pipei)
178 195 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
179 196 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
180 197
181 198 def badresponse():
182 199 msg = _("no suitable response from remote hg")
183 200 hint = self.ui.config("ui", "ssherrorhint")
184 201 self._abort(error.RepoError(msg, hint=hint))
185 202
186 203 try:
187 204 # skip any noise generated by remote shell
188 205 self._callstream("hello")
189 206 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
190 207 except IOError:
191 208 badresponse()
192 209
193 210 lines = ["", "dummy"]
194 211 max_noise = 500
195 212 while lines[-1] and max_noise:
196 213 try:
197 214 l = r.readline()
198 215 self._readerr()
199 216 if lines[-1] == "1\n" and l == "\n":
200 217 break
201 218 if l:
202 219 self.ui.debug("remote: ", l)
203 220 lines.append(l)
204 221 max_noise -= 1
205 222 except IOError:
206 223 badresponse()
207 224 else:
208 225 badresponse()
209 226
210 227 self._caps = set()
211 228 for l in reversed(lines):
212 229 if l.startswith("capabilities:"):
213 230 self._caps.update(l[:-1].split(":")[1].split())
214 231 break
215 232
216 233 def _readerr(self):
217 234 _forwardoutput(self.ui, self._pipee)
218 235
219 236 def _abort(self, exception):
220 237 self._cleanup()
221 238 raise exception
222 239
223 240 def _cleanup(self):
224 if self._pipeo is None:
225 return
226 self._pipeo.close()
227 self._pipei.close()
228 try:
229 # read the error descriptor until EOF
230 for l in self._pipee:
231 self.ui.status(_("remote: "), l)
232 except (IOError, ValueError):
233 pass
234 self._pipee.close()
241 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
235 242
236 243 __del__ = _cleanup
237 244
238 245 def _submitbatch(self, req):
239 246 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
240 247 available = self._getamount()
241 248 # TODO this response parsing is probably suboptimal for large
242 249 # batches with large responses.
243 250 toread = min(available, 1024)
244 251 work = rsp.read(toread)
245 252 available -= toread
246 253 chunk = work
247 254 while chunk:
248 255 while ';' in work:
249 256 one, work = work.split(';', 1)
250 257 yield wireproto.unescapearg(one)
251 258 toread = min(available, 1024)
252 259 chunk = rsp.read(toread)
253 260 available -= toread
254 261 work += chunk
255 262 yield wireproto.unescapearg(work)
256 263
257 264 def _callstream(self, cmd, **args):
258 265 args = pycompat.byteskwargs(args)
259 266 if (self.ui.debugflag
260 267 and self.ui.configbool('devel', 'debug.peer-request')):
261 268 dbg = self.ui.debug
262 269 line = 'devel-peer-request: %s\n'
263 270 dbg(line % cmd)
264 271 for key, value in sorted(args.items()):
265 272 if not isinstance(value, dict):
266 273 dbg(line % ' %s: %d bytes' % (key, len(value)))
267 274 else:
268 275 for dk, dv in sorted(value.items()):
269 276 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
270 277 self.ui.debug("sending %s command\n" % cmd)
271 278 self._pipeo.write("%s\n" % cmd)
272 279 _func, names = wireproto.commands[cmd]
273 280 keys = names.split()
274 281 wireargs = {}
275 282 for k in keys:
276 283 if k == '*':
277 284 wireargs['*'] = args
278 285 break
279 286 else:
280 287 wireargs[k] = args[k]
281 288 del args[k]
282 289 for k, v in sorted(wireargs.iteritems()):
283 290 self._pipeo.write("%s %d\n" % (k, len(v)))
284 291 if isinstance(v, dict):
285 292 for dk, dv in v.iteritems():
286 293 self._pipeo.write("%s %d\n" % (dk, len(dv)))
287 294 self._pipeo.write(dv)
288 295 else:
289 296 self._pipeo.write(v)
290 297 self._pipeo.flush()
291 298
292 299 return self._pipei
293 300
294 301 def _callcompressable(self, cmd, **args):
295 302 return self._callstream(cmd, **args)
296 303
297 304 def _call(self, cmd, **args):
298 305 self._callstream(cmd, **args)
299 306 return self._recv()
300 307
301 308 def _callpush(self, cmd, fp, **args):
302 309 r = self._call(cmd, **args)
303 310 if r:
304 311 return '', r
305 312 for d in iter(lambda: fp.read(4096), ''):
306 313 self._send(d)
307 314 self._send("", flush=True)
308 315 r = self._recv()
309 316 if r:
310 317 return '', r
311 318 return self._recv(), ''
312 319
313 320 def _calltwowaystream(self, cmd, fp, **args):
314 321 r = self._call(cmd, **args)
315 322 if r:
316 323 # XXX needs to be made better
317 324 raise error.Abort(_('unexpected remote reply: %s') % r)
318 325 for d in iter(lambda: fp.read(4096), ''):
319 326 self._send(d)
320 327 self._send("", flush=True)
321 328 return self._pipei
322 329
323 330 def _getamount(self):
324 331 l = self._pipei.readline()
325 332 if l == '\n':
326 333 self._readerr()
327 334 msg = _('check previous remote output')
328 335 self._abort(error.OutOfBandError(hint=msg))
329 336 self._readerr()
330 337 try:
331 338 return int(l)
332 339 except ValueError:
333 340 self._abort(error.ResponseError(_("unexpected response:"), l))
334 341
335 342 def _recv(self):
336 343 return self._pipei.read(self._getamount())
337 344
338 345 def _send(self, data, flush=False):
339 346 self._pipeo.write("%d\n" % len(data))
340 347 if data:
341 348 self._pipeo.write(data)
342 349 if flush:
343 350 self._pipeo.flush()
344 351 self._readerr()
345 352
346 353 def instance(ui, path, create):
347 354 """Create an SSH peer.
348 355
349 356 The returned object conforms to the ``wireproto.wirepeer`` interface.
350 357 """
351 358 u = util.url(path, parsequery=False, parsefragment=False)
352 359 if u.scheme != 'ssh' or not u.host or u.path is None:
353 360 raise error.RepoError(_("couldn't parse location %s") % path)
354 361
355 362 util.checksafessh(path)
356 363
357 364 if u.passwd is not None:
358 365 raise error.RepoError(_('password in URL not supported'))
359 366
360 367 sshcmd = ui.config('ui', 'ssh')
361 368 remotecmd = ui.config('ui', 'remotecmd')
362 369 sshaddenv = dict(ui.configitems('sshenv'))
363 370 sshenv = util.shellenviron(sshaddenv)
364 371 remotepath = u.path or '.'
365 372
366 373 args = util.sshargs(sshcmd, u.host, u.user, u.port)
367 374
368 375 if create:
369 376 cmd = '%s %s %s' % (sshcmd, args,
370 377 util.shellquote('%s init %s' %
371 378 (_serverquote(remotecmd), _serverquote(remotepath))))
372 379 ui.debug('running %s\n' % cmd)
373 380 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
374 381 if res != 0:
375 382 raise error.RepoError(_('could not create remote repo'))
376 383
377 384 sshstate = (sshcmd, args, remotecmd, sshenv)
378 385
379 386 return sshpeer(ui, path, create=create, sshstate=sshstate)
General Comments 0
You need to be logged in to leave comments. Login now