##// END OF EJS Templates
sshpeer: use `iter(callable, sentinel)` instead of while True...
Augie Fackler -
r29727:0dbd788a default
parent child Browse files
Show More
@@ -1,374 +1,365
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import re
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 error,
15 15 util,
16 16 wireproto,
17 17 )
18 18
19 19 class remotelock(object):
20 20 def __init__(self, repo):
21 21 self.repo = repo
22 22 def release(self):
23 23 self.repo.unlock()
24 24 self.repo = None
25 25 def __enter__(self):
26 26 return self
27 27 def __exit__(self, exc_type, exc_val, exc_tb):
28 28 if self.repo:
29 29 self.release()
30 30 def __del__(self):
31 31 if self.repo:
32 32 self.release()
33 33
34 34 def _serverquote(s):
35 35 if not s:
36 36 return s
37 37 '''quote a string for the remote shell ... which we assume is sh'''
38 38 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
39 39 return s
40 40 return "'%s'" % s.replace("'", "'\\''")
41 41
42 42 def _forwardoutput(ui, pipe):
43 43 """display all data currently available on pipe as remote output.
44 44
45 45 This is non blocking."""
46 46 s = util.readpipe(pipe)
47 47 if s:
48 48 for l in s.splitlines():
49 49 ui.status(_("remote: "), l, '\n')
50 50
51 51 class doublepipe(object):
52 52 """Operate a side-channel pipe in addition of a main one
53 53
54 54 The side-channel pipe contains server output to be forwarded to the user
55 55 input. The double pipe will behave as the "main" pipe, but will ensure the
56 56 content of the "side" pipe is properly processed while we wait for blocking
57 57 call on the "main" pipe.
58 58
59 59 If large amounts of data are read from "main", the forward will cease after
60 60 the first bytes start to appear. This simplifies the implementation
61 61 without affecting actual output of sshpeer too much as we rarely issue
62 62 large read for data not yet emitted by the server.
63 63
64 64 The main pipe is expected to be a 'bufferedinputpipe' from the util module
65 65 that handle all the os specific bites. This class lives in this module
66 66 because it focus on behavior specific to the ssh protocol."""
67 67
68 68 def __init__(self, ui, main, side):
69 69 self._ui = ui
70 70 self._main = main
71 71 self._side = side
72 72
73 73 def _wait(self):
74 74 """wait until some data are available on main or side
75 75
76 76 return a pair of boolean (ismainready, issideready)
77 77
78 78 (This will only wait for data if the setup is supported by `util.poll`)
79 79 """
80 80 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
81 81 return (True, True) # main has data, assume side is worth poking at.
82 82 fds = [self._main.fileno(), self._side.fileno()]
83 83 try:
84 84 act = util.poll(fds)
85 85 except NotImplementedError:
86 86 # non supported yet case, assume all have data.
87 87 act = fds
88 88 return (self._main.fileno() in act, self._side.fileno() in act)
89 89
90 90 def write(self, data):
91 91 return self._call('write', data)
92 92
93 93 def read(self, size):
94 94 return self._call('read', size)
95 95
96 96 def readline(self):
97 97 return self._call('readline')
98 98
99 99 def _call(self, methname, data=None):
100 100 """call <methname> on "main", forward output of "side" while blocking
101 101 """
102 102 # data can be '' or 0
103 103 if (data is not None and not data) or self._main.closed:
104 104 _forwardoutput(self._ui, self._side)
105 105 return ''
106 106 while True:
107 107 mainready, sideready = self._wait()
108 108 if sideready:
109 109 _forwardoutput(self._ui, self._side)
110 110 if mainready:
111 111 meth = getattr(self._main, methname)
112 112 if data is None:
113 113 return meth()
114 114 else:
115 115 return meth(data)
116 116
117 117 def close(self):
118 118 return self._main.close()
119 119
120 120 def flush(self):
121 121 return self._main.flush()
122 122
123 123 class sshpeer(wireproto.wirepeer):
124 124 def __init__(self, ui, path, create=False):
125 125 self._url = path
126 126 self.ui = ui
127 127 self.pipeo = self.pipei = self.pipee = None
128 128
129 129 u = util.url(path, parsequery=False, parsefragment=False)
130 130 if u.scheme != 'ssh' or not u.host or u.path is None:
131 131 self._abort(error.RepoError(_("couldn't parse location %s") % path))
132 132
133 133 self.user = u.user
134 134 if u.passwd is not None:
135 135 self._abort(error.RepoError(_("password in URL not supported")))
136 136 self.host = u.host
137 137 self.port = u.port
138 138 self.path = u.path or "."
139 139
140 140 sshcmd = self.ui.config("ui", "ssh", "ssh")
141 141 remotecmd = self.ui.config("ui", "remotecmd", "hg")
142 142
143 143 args = util.sshargs(sshcmd,
144 144 _serverquote(self.host),
145 145 _serverquote(self.user),
146 146 _serverquote(self.port))
147 147
148 148 if create:
149 149 cmd = '%s %s %s' % (sshcmd, args,
150 150 util.shellquote("%s init %s" %
151 151 (_serverquote(remotecmd), _serverquote(self.path))))
152 152 ui.debug('running %s\n' % cmd)
153 153 res = ui.system(cmd)
154 154 if res != 0:
155 155 self._abort(error.RepoError(_("could not create remote repo")))
156 156
157 157 self._validaterepo(sshcmd, args, remotecmd)
158 158
159 159 def url(self):
160 160 return self._url
161 161
162 162 def _validaterepo(self, sshcmd, args, remotecmd):
163 163 # cleanup up previous run
164 164 self.cleanup()
165 165
166 166 cmd = '%s %s %s' % (sshcmd, args,
167 167 util.shellquote("%s -R %s serve --stdio" %
168 168 (_serverquote(remotecmd), _serverquote(self.path))))
169 169 self.ui.debug('running %s\n' % cmd)
170 170 cmd = util.quotecommand(cmd)
171 171
172 172 # while self.subprocess isn't used, having it allows the subprocess to
173 173 # to clean up correctly later
174 174 #
175 175 # no buffer allow the use of 'select'
176 176 # feel free to remove buffering and select usage when we ultimately
177 177 # move to threading.
178 178 sub = util.popen4(cmd, bufsize=0)
179 179 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
180 180
181 181 self.pipei = util.bufferedinputpipe(self.pipei)
182 182 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
183 183 self.pipeo = doublepipe(self.ui, self.pipeo, self.pipee)
184 184
185 185 # skip any noise generated by remote shell
186 186 self._callstream("hello")
187 187 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
188 188 lines = ["", "dummy"]
189 189 max_noise = 500
190 190 while lines[-1] and max_noise:
191 191 l = r.readline()
192 192 self.readerr()
193 193 if lines[-1] == "1\n" and l == "\n":
194 194 break
195 195 if l:
196 196 self.ui.debug("remote: ", l)
197 197 lines.append(l)
198 198 max_noise -= 1
199 199 else:
200 200 self._abort(error.RepoError(_('no suitable response from '
201 201 'remote hg')))
202 202
203 203 self._caps = set()
204 204 for l in reversed(lines):
205 205 if l.startswith("capabilities:"):
206 206 self._caps.update(l[:-1].split(":")[1].split())
207 207 break
208 208
209 209 def _capabilities(self):
210 210 return self._caps
211 211
212 212 def readerr(self):
213 213 _forwardoutput(self.ui, self.pipee)
214 214
215 215 def _abort(self, exception):
216 216 self.cleanup()
217 217 raise exception
218 218
219 219 def cleanup(self):
220 220 if self.pipeo is None:
221 221 return
222 222 self.pipeo.close()
223 223 self.pipei.close()
224 224 try:
225 225 # read the error descriptor until EOF
226 226 for l in self.pipee:
227 227 self.ui.status(_("remote: "), l)
228 228 except (IOError, ValueError):
229 229 pass
230 230 self.pipee.close()
231 231
232 232 __del__ = cleanup
233 233
234 234 def _submitbatch(self, req):
235 235 cmds = []
236 236 for op, argsdict in req:
237 237 args = ','.join('%s=%s' % (wireproto.escapearg(k),
238 238 wireproto.escapearg(v))
239 239 for k, v in argsdict.iteritems())
240 240 cmds.append('%s %s' % (op, args))
241 241 rsp = self._callstream("batch", cmds=';'.join(cmds))
242 242 available = self._getamount()
243 243 # TODO this response parsing is probably suboptimal for large
244 244 # batches with large responses.
245 245 toread = min(available, 1024)
246 246 work = rsp.read(toread)
247 247 available -= toread
248 248 chunk = work
249 249 while chunk:
250 250 while ';' in work:
251 251 one, work = work.split(';', 1)
252 252 yield wireproto.unescapearg(one)
253 253 toread = min(available, 1024)
254 254 chunk = rsp.read(toread)
255 255 available -= toread
256 256 work += chunk
257 257 yield wireproto.unescapearg(work)
258 258
259 259 def _callstream(self, cmd, **args):
260 260 self.ui.debug("sending %s command\n" % cmd)
261 261 self.pipeo.write("%s\n" % cmd)
262 262 _func, names = wireproto.commands[cmd]
263 263 keys = names.split()
264 264 wireargs = {}
265 265 for k in keys:
266 266 if k == '*':
267 267 wireargs['*'] = args
268 268 break
269 269 else:
270 270 wireargs[k] = args[k]
271 271 del args[k]
272 272 for k, v in sorted(wireargs.iteritems()):
273 273 self.pipeo.write("%s %d\n" % (k, len(v)))
274 274 if isinstance(v, dict):
275 275 for dk, dv in v.iteritems():
276 276 self.pipeo.write("%s %d\n" % (dk, len(dv)))
277 277 self.pipeo.write(dv)
278 278 else:
279 279 self.pipeo.write(v)
280 280 self.pipeo.flush()
281 281
282 282 return self.pipei
283 283
284 284 def _callcompressable(self, cmd, **args):
285 285 return self._callstream(cmd, **args)
286 286
287 287 def _call(self, cmd, **args):
288 288 self._callstream(cmd, **args)
289 289 return self._recv()
290 290
291 291 def _callpush(self, cmd, fp, **args):
292 292 r = self._call(cmd, **args)
293 293 if r:
294 294 return '', r
295 while True:
296 d = fp.read(4096)
297 if not d:
298 break
295 for d in iter(lambda: fp.read(4096), ''):
299 296 self._send(d)
300 297 self._send("", flush=True)
301 298 r = self._recv()
302 299 if r:
303 300 return '', r
304 301 return self._recv(), ''
305 302
306 303 def _calltwowaystream(self, cmd, fp, **args):
307 304 r = self._call(cmd, **args)
308 305 if r:
309 306 # XXX needs to be made better
310 307 raise error.Abort(_('unexpected remote reply: %s') % r)
311 while True:
312 d = fp.read(4096)
313 if not d:
314 break
308 for d in iter(lambda: fp.read(4096), ''):
315 309 self._send(d)
316 310 self._send("", flush=True)
317 311 return self.pipei
318 312
319 313 def _getamount(self):
320 314 l = self.pipei.readline()
321 315 if l == '\n':
322 316 self.readerr()
323 317 msg = _('check previous remote output')
324 318 self._abort(error.OutOfBandError(hint=msg))
325 319 self.readerr()
326 320 try:
327 321 return int(l)
328 322 except ValueError:
329 323 self._abort(error.ResponseError(_("unexpected response:"), l))
330 324
331 325 def _recv(self):
332 326 return self.pipei.read(self._getamount())
333 327
334 328 def _send(self, data, flush=False):
335 329 self.pipeo.write("%d\n" % len(data))
336 330 if data:
337 331 self.pipeo.write(data)
338 332 if flush:
339 333 self.pipeo.flush()
340 334 self.readerr()
341 335
342 336 def lock(self):
343 337 self._call("lock")
344 338 return remotelock(self)
345 339
346 340 def unlock(self):
347 341 self._call("unlock")
348 342
349 343 def addchangegroup(self, cg, source, url, lock=None):
350 344 '''Send a changegroup to the remote server. Return an integer
351 345 similar to unbundle(). DEPRECATED, since it requires locking the
352 346 remote.'''
353 347 d = self._call("addchangegroup")
354 348 if d:
355 349 self._abort(error.RepoError(_("push refused: %s") % d))
356 while True:
357 d = cg.read(4096)
358 if not d:
359 break
350 for d in iter(lambda: cg.read(4096), ''):
360 351 self.pipeo.write(d)
361 352 self.readerr()
362 353
363 354 self.pipeo.flush()
364 355
365 356 self.readerr()
366 357 r = self._recv()
367 358 if not r:
368 359 return 1
369 360 try:
370 361 return int(r)
371 362 except ValueError:
372 363 self._abort(error.ResponseError(_("unexpected response:"), r))
373 364
374 365 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now