##// END OF EJS Templates
sshpeer: remove frivolous call to _cleanup()...
Gregory Szorc -
r35952:94ba2993 default
parent child Browse files
Show More
@@ -1,386 +1,385 b''
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import re
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 error,
15 15 pycompat,
16 16 util,
17 17 wireproto,
18 18 )
19 19
20 20 def _serverquote(s):
21 21 """quote a string for the remote shell ... which we assume is sh"""
22 22 if not s:
23 23 return s
24 24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
25 25 return s
26 26 return "'%s'" % s.replace("'", "'\\''")
27 27
28 28 def _forwardoutput(ui, pipe):
29 29 """display all data currently available on pipe as remote output.
30 30
31 31 This is non blocking."""
32 32 s = util.readpipe(pipe)
33 33 if s:
34 34 for l in s.splitlines():
35 35 ui.status(_("remote: "), l, '\n')
36 36
37 37 class doublepipe(object):
38 38 """Operate a side-channel pipe in addition of a main one
39 39
40 40 The side-channel pipe contains server output to be forwarded to the user
41 41 input. The double pipe will behave as the "main" pipe, but will ensure the
42 42 content of the "side" pipe is properly processed while we wait for blocking
43 43 call on the "main" pipe.
44 44
45 45 If large amounts of data are read from "main", the forward will cease after
46 46 the first bytes start to appear. This simplifies the implementation
47 47 without affecting actual output of sshpeer too much as we rarely issue
48 48 large read for data not yet emitted by the server.
49 49
50 50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
51 51 that handle all the os specific bits. This class lives in this module
52 52 because it focus on behavior specific to the ssh protocol."""
53 53
54 54 def __init__(self, ui, main, side):
55 55 self._ui = ui
56 56 self._main = main
57 57 self._side = side
58 58
59 59 def _wait(self):
60 60 """wait until some data are available on main or side
61 61
62 62 return a pair of boolean (ismainready, issideready)
63 63
64 64 (This will only wait for data if the setup is supported by `util.poll`)
65 65 """
66 66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
67 67 return (True, True) # main has data, assume side is worth poking at.
68 68 fds = [self._main.fileno(), self._side.fileno()]
69 69 try:
70 70 act = util.poll(fds)
71 71 except NotImplementedError:
72 72 # non supported yet case, assume all have data.
73 73 act = fds
74 74 return (self._main.fileno() in act, self._side.fileno() in act)
75 75
76 76 def write(self, data):
77 77 return self._call('write', data)
78 78
79 79 def read(self, size):
80 80 r = self._call('read', size)
81 81 if size != 0 and not r:
82 82 # We've observed a condition that indicates the
83 83 # stdout closed unexpectedly. Check stderr one
84 84 # more time and snag anything that's there before
85 85 # letting anyone know the main part of the pipe
86 86 # closed prematurely.
87 87 _forwardoutput(self._ui, self._side)
88 88 return r
89 89
90 90 def readline(self):
91 91 return self._call('readline')
92 92
93 93 def _call(self, methname, data=None):
94 94 """call <methname> on "main", forward output of "side" while blocking
95 95 """
96 96 # data can be '' or 0
97 97 if (data is not None and not data) or self._main.closed:
98 98 _forwardoutput(self._ui, self._side)
99 99 return ''
100 100 while True:
101 101 mainready, sideready = self._wait()
102 102 if sideready:
103 103 _forwardoutput(self._ui, self._side)
104 104 if mainready:
105 105 meth = getattr(self._main, methname)
106 106 if data is None:
107 107 return meth()
108 108 else:
109 109 return meth(data)
110 110
111 111 def close(self):
112 112 return self._main.close()
113 113
114 114 def flush(self):
115 115 return self._main.flush()
116 116
117 117 def _cleanuppipes(ui, pipei, pipeo, pipee):
118 118 """Clean up pipes used by an SSH connection."""
119 119 if pipeo:
120 120 pipeo.close()
121 121 if pipei:
122 122 pipei.close()
123 123
124 124 if pipee:
125 125 # Try to read from the err descriptor until EOF.
126 126 try:
127 127 for l in pipee:
128 128 ui.status(_('remote: '), l)
129 129 except (IOError, ValueError):
130 130 pass
131 131
132 132 pipee.close()
133 133
134 134 class sshpeer(wireproto.wirepeer):
135 135 def __init__(self, ui, path, create=False, sshstate=None):
136 136 self._url = path
137 137 self._ui = ui
138 138 self._pipeo = self._pipei = self._pipee = None
139 139
140 140 u = util.url(path, parsequery=False, parsefragment=False)
141 141 self._path = u.path or '.'
142 142
143 143 self._validaterepo(*sshstate)
144 144
145 145 # Begin of _basepeer interface.
146 146
147 147 @util.propertycache
148 148 def ui(self):
149 149 return self._ui
150 150
151 151 def url(self):
152 152 return self._url
153 153
154 154 def local(self):
155 155 return None
156 156
157 157 def peer(self):
158 158 return self
159 159
160 160 def canpush(self):
161 161 return True
162 162
163 163 def close(self):
164 164 pass
165 165
166 166 # End of _basepeer interface.
167 167
168 168 # Begin of _basewirecommands interface.
169 169
170 170 def capabilities(self):
171 171 return self._caps
172 172
173 173 # End of _basewirecommands interface.
174 174
175 175 def _validaterepo(self, sshcmd, args, remotecmd, sshenv=None):
176 # cleanup up previous run
177 self._cleanup()
176 assert self._pipei is None
178 177
179 178 cmd = '%s %s %s' % (sshcmd, args,
180 179 util.shellquote("%s -R %s serve --stdio" %
181 180 (_serverquote(remotecmd), _serverquote(self._path))))
182 181 self.ui.debug('running %s\n' % cmd)
183 182 cmd = util.quotecommand(cmd)
184 183
185 184 # while self._subprocess isn't used, having it allows the subprocess to
186 185 # to clean up correctly later
187 186 #
188 187 # no buffer allow the use of 'select'
189 188 # feel free to remove buffering and select usage when we ultimately
190 189 # move to threading.
191 190 sub = util.popen4(cmd, bufsize=0, env=sshenv)
192 191 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
193 192
194 193 self._pipei = util.bufferedinputpipe(self._pipei)
195 194 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
196 195 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
197 196
198 197 def badresponse():
199 198 msg = _("no suitable response from remote hg")
200 199 hint = self.ui.config("ui", "ssherrorhint")
201 200 self._abort(error.RepoError(msg, hint=hint))
202 201
203 202 try:
204 203 # skip any noise generated by remote shell
205 204 self._callstream("hello")
206 205 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
207 206 except IOError:
208 207 badresponse()
209 208
210 209 lines = ["", "dummy"]
211 210 max_noise = 500
212 211 while lines[-1] and max_noise:
213 212 try:
214 213 l = r.readline()
215 214 self._readerr()
216 215 if lines[-1] == "1\n" and l == "\n":
217 216 break
218 217 if l:
219 218 self.ui.debug("remote: ", l)
220 219 lines.append(l)
221 220 max_noise -= 1
222 221 except IOError:
223 222 badresponse()
224 223 else:
225 224 badresponse()
226 225
227 226 self._caps = set()
228 227 for l in reversed(lines):
229 228 if l.startswith("capabilities:"):
230 229 self._caps.update(l[:-1].split(":")[1].split())
231 230 break
232 231
233 232 def _readerr(self):
234 233 _forwardoutput(self.ui, self._pipee)
235 234
236 235 def _abort(self, exception):
237 236 self._cleanup()
238 237 raise exception
239 238
240 239 def _cleanup(self):
241 240 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
242 241
243 242 __del__ = _cleanup
244 243
245 244 def _submitbatch(self, req):
246 245 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
247 246 available = self._getamount()
248 247 # TODO this response parsing is probably suboptimal for large
249 248 # batches with large responses.
250 249 toread = min(available, 1024)
251 250 work = rsp.read(toread)
252 251 available -= toread
253 252 chunk = work
254 253 while chunk:
255 254 while ';' in work:
256 255 one, work = work.split(';', 1)
257 256 yield wireproto.unescapearg(one)
258 257 toread = min(available, 1024)
259 258 chunk = rsp.read(toread)
260 259 available -= toread
261 260 work += chunk
262 261 yield wireproto.unescapearg(work)
263 262
264 263 def _callstream(self, cmd, **args):
265 264 args = pycompat.byteskwargs(args)
266 265 if (self.ui.debugflag
267 266 and self.ui.configbool('devel', 'debug.peer-request')):
268 267 dbg = self.ui.debug
269 268 line = 'devel-peer-request: %s\n'
270 269 dbg(line % cmd)
271 270 for key, value in sorted(args.items()):
272 271 if not isinstance(value, dict):
273 272 dbg(line % ' %s: %d bytes' % (key, len(value)))
274 273 else:
275 274 for dk, dv in sorted(value.items()):
276 275 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
277 276 self.ui.debug("sending %s command\n" % cmd)
278 277 self._pipeo.write("%s\n" % cmd)
279 278 _func, names = wireproto.commands[cmd]
280 279 keys = names.split()
281 280 wireargs = {}
282 281 for k in keys:
283 282 if k == '*':
284 283 wireargs['*'] = args
285 284 break
286 285 else:
287 286 wireargs[k] = args[k]
288 287 del args[k]
289 288 for k, v in sorted(wireargs.iteritems()):
290 289 self._pipeo.write("%s %d\n" % (k, len(v)))
291 290 if isinstance(v, dict):
292 291 for dk, dv in v.iteritems():
293 292 self._pipeo.write("%s %d\n" % (dk, len(dv)))
294 293 self._pipeo.write(dv)
295 294 else:
296 295 self._pipeo.write(v)
297 296 self._pipeo.flush()
298 297
299 298 return self._pipei
300 299
301 300 def _callcompressable(self, cmd, **args):
302 301 return self._callstream(cmd, **args)
303 302
304 303 def _call(self, cmd, **args):
305 304 self._callstream(cmd, **args)
306 305 return self._recv()
307 306
308 307 def _callpush(self, cmd, fp, **args):
309 308 r = self._call(cmd, **args)
310 309 if r:
311 310 return '', r
312 311 for d in iter(lambda: fp.read(4096), ''):
313 312 self._send(d)
314 313 self._send("", flush=True)
315 314 r = self._recv()
316 315 if r:
317 316 return '', r
318 317 return self._recv(), ''
319 318
320 319 def _calltwowaystream(self, cmd, fp, **args):
321 320 r = self._call(cmd, **args)
322 321 if r:
323 322 # XXX needs to be made better
324 323 raise error.Abort(_('unexpected remote reply: %s') % r)
325 324 for d in iter(lambda: fp.read(4096), ''):
326 325 self._send(d)
327 326 self._send("", flush=True)
328 327 return self._pipei
329 328
330 329 def _getamount(self):
331 330 l = self._pipei.readline()
332 331 if l == '\n':
333 332 self._readerr()
334 333 msg = _('check previous remote output')
335 334 self._abort(error.OutOfBandError(hint=msg))
336 335 self._readerr()
337 336 try:
338 337 return int(l)
339 338 except ValueError:
340 339 self._abort(error.ResponseError(_("unexpected response:"), l))
341 340
342 341 def _recv(self):
343 342 return self._pipei.read(self._getamount())
344 343
345 344 def _send(self, data, flush=False):
346 345 self._pipeo.write("%d\n" % len(data))
347 346 if data:
348 347 self._pipeo.write(data)
349 348 if flush:
350 349 self._pipeo.flush()
351 350 self._readerr()
352 351
353 352 def instance(ui, path, create):
354 353 """Create an SSH peer.
355 354
356 355 The returned object conforms to the ``wireproto.wirepeer`` interface.
357 356 """
358 357 u = util.url(path, parsequery=False, parsefragment=False)
359 358 if u.scheme != 'ssh' or not u.host or u.path is None:
360 359 raise error.RepoError(_("couldn't parse location %s") % path)
361 360
362 361 util.checksafessh(path)
363 362
364 363 if u.passwd is not None:
365 364 raise error.RepoError(_('password in URL not supported'))
366 365
367 366 sshcmd = ui.config('ui', 'ssh')
368 367 remotecmd = ui.config('ui', 'remotecmd')
369 368 sshaddenv = dict(ui.configitems('sshenv'))
370 369 sshenv = util.shellenviron(sshaddenv)
371 370 remotepath = u.path or '.'
372 371
373 372 args = util.sshargs(sshcmd, u.host, u.user, u.port)
374 373
375 374 if create:
376 375 cmd = '%s %s %s' % (sshcmd, args,
377 376 util.shellquote('%s init %s' %
378 377 (_serverquote(remotecmd), _serverquote(remotepath))))
379 378 ui.debug('running %s\n' % cmd)
380 379 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
381 380 if res != 0:
382 381 raise error.RepoError(_('could not create remote repo'))
383 382
384 383 sshstate = (sshcmd, args, remotecmd, sshenv)
385 384
386 385 return sshpeer(ui, path, create=create, sshstate=sshstate)
General Comments 0
You need to be logged in to leave comments. Login now