##// END OF EJS Templates
sshpeer: set a blockedtag when starting ssh...
Simon Farnsworth -
r31197:764f4581 default
parent child Browse files
Show More
@@ -1,359 +1,359
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import re
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 error,
15 15 util,
16 16 wireproto,
17 17 )
18 18
19 19 class remotelock(object):
20 20 def __init__(self, repo):
21 21 self.repo = repo
22 22 def release(self):
23 23 self.repo.unlock()
24 24 self.repo = None
25 25 def __enter__(self):
26 26 return self
27 27 def __exit__(self, exc_type, exc_val, exc_tb):
28 28 if self.repo:
29 29 self.release()
30 30 def __del__(self):
31 31 if self.repo:
32 32 self.release()
33 33
34 34 def _serverquote(s):
35 35 if not s:
36 36 return s
37 37 '''quote a string for the remote shell ... which we assume is sh'''
38 38 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
39 39 return s
40 40 return "'%s'" % s.replace("'", "'\\''")
41 41
42 42 def _forwardoutput(ui, pipe):
43 43 """display all data currently available on pipe as remote output.
44 44
45 45 This is non blocking."""
46 46 s = util.readpipe(pipe)
47 47 if s:
48 48 for l in s.splitlines():
49 49 ui.status(_("remote: "), l, '\n')
50 50
51 51 class doublepipe(object):
52 52 """Operate a side-channel pipe in addition of a main one
53 53
54 54 The side-channel pipe contains server output to be forwarded to the user
55 55 input. The double pipe will behave as the "main" pipe, but will ensure the
56 56 content of the "side" pipe is properly processed while we wait for blocking
57 57 call on the "main" pipe.
58 58
59 59 If large amounts of data are read from "main", the forward will cease after
60 60 the first bytes start to appear. This simplifies the implementation
61 61 without affecting actual output of sshpeer too much as we rarely issue
62 62 large read for data not yet emitted by the server.
63 63
64 64 The main pipe is expected to be a 'bufferedinputpipe' from the util module
65 65 that handle all the os specific bites. This class lives in this module
66 66 because it focus on behavior specific to the ssh protocol."""
67 67
68 68 def __init__(self, ui, main, side):
69 69 self._ui = ui
70 70 self._main = main
71 71 self._side = side
72 72
73 73 def _wait(self):
74 74 """wait until some data are available on main or side
75 75
76 76 return a pair of boolean (ismainready, issideready)
77 77
78 78 (This will only wait for data if the setup is supported by `util.poll`)
79 79 """
80 80 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
81 81 return (True, True) # main has data, assume side is worth poking at.
82 82 fds = [self._main.fileno(), self._side.fileno()]
83 83 try:
84 84 act = util.poll(fds)
85 85 except NotImplementedError:
86 86 # non supported yet case, assume all have data.
87 87 act = fds
88 88 return (self._main.fileno() in act, self._side.fileno() in act)
89 89
90 90 def write(self, data):
91 91 return self._call('write', data)
92 92
93 93 def read(self, size):
94 94 return self._call('read', size)
95 95
96 96 def readline(self):
97 97 return self._call('readline')
98 98
99 99 def _call(self, methname, data=None):
100 100 """call <methname> on "main", forward output of "side" while blocking
101 101 """
102 102 # data can be '' or 0
103 103 if (data is not None and not data) or self._main.closed:
104 104 _forwardoutput(self._ui, self._side)
105 105 return ''
106 106 while True:
107 107 mainready, sideready = self._wait()
108 108 if sideready:
109 109 _forwardoutput(self._ui, self._side)
110 110 if mainready:
111 111 meth = getattr(self._main, methname)
112 112 if data is None:
113 113 return meth()
114 114 else:
115 115 return meth(data)
116 116
117 117 def close(self):
118 118 return self._main.close()
119 119
120 120 def flush(self):
121 121 return self._main.flush()
122 122
123 123 class sshpeer(wireproto.wirepeer):
124 124 def __init__(self, ui, path, create=False):
125 125 self._url = path
126 126 self.ui = ui
127 127 self.pipeo = self.pipei = self.pipee = None
128 128
129 129 u = util.url(path, parsequery=False, parsefragment=False)
130 130 if u.scheme != 'ssh' or not u.host or u.path is None:
131 131 self._abort(error.RepoError(_("couldn't parse location %s") % path))
132 132
133 133 self.user = u.user
134 134 if u.passwd is not None:
135 135 self._abort(error.RepoError(_("password in URL not supported")))
136 136 self.host = u.host
137 137 self.port = u.port
138 138 self.path = u.path or "."
139 139
140 140 sshcmd = self.ui.config("ui", "ssh", "ssh")
141 141 remotecmd = self.ui.config("ui", "remotecmd", "hg")
142 142
143 143 args = util.sshargs(sshcmd,
144 144 _serverquote(self.host),
145 145 _serverquote(self.user),
146 146 _serverquote(self.port))
147 147
148 148 if create:
149 149 cmd = '%s %s %s' % (sshcmd, args,
150 150 util.shellquote("%s init %s" %
151 151 (_serverquote(remotecmd), _serverquote(self.path))))
152 152 ui.debug('running %s\n' % cmd)
153 res = ui.system(cmd)
153 res = ui.system(cmd, blockedtag='sshpeer')
154 154 if res != 0:
155 155 self._abort(error.RepoError(_("could not create remote repo")))
156 156
157 157 self._validaterepo(sshcmd, args, remotecmd)
158 158
159 159 def url(self):
160 160 return self._url
161 161
162 162 def _validaterepo(self, sshcmd, args, remotecmd):
163 163 # cleanup up previous run
164 164 self.cleanup()
165 165
166 166 cmd = '%s %s %s' % (sshcmd, args,
167 167 util.shellquote("%s -R %s serve --stdio" %
168 168 (_serverquote(remotecmd), _serverquote(self.path))))
169 169 self.ui.debug('running %s\n' % cmd)
170 170 cmd = util.quotecommand(cmd)
171 171
172 172 # while self.subprocess isn't used, having it allows the subprocess to
173 173 # to clean up correctly later
174 174 #
175 175 # no buffer allow the use of 'select'
176 176 # feel free to remove buffering and select usage when we ultimately
177 177 # move to threading.
178 178 sub = util.popen4(cmd, bufsize=0)
179 179 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
180 180
181 181 self.pipei = util.bufferedinputpipe(self.pipei)
182 182 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
183 183 self.pipeo = doublepipe(self.ui, self.pipeo, self.pipee)
184 184
185 185 # skip any noise generated by remote shell
186 186 self._callstream("hello")
187 187 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
188 188 lines = ["", "dummy"]
189 189 max_noise = 500
190 190 while lines[-1] and max_noise:
191 191 l = r.readline()
192 192 self.readerr()
193 193 if lines[-1] == "1\n" and l == "\n":
194 194 break
195 195 if l:
196 196 self.ui.debug("remote: ", l)
197 197 lines.append(l)
198 198 max_noise -= 1
199 199 else:
200 200 self._abort(error.RepoError(_('no suitable response from '
201 201 'remote hg')))
202 202
203 203 self._caps = set()
204 204 for l in reversed(lines):
205 205 if l.startswith("capabilities:"):
206 206 self._caps.update(l[:-1].split(":")[1].split())
207 207 break
208 208
209 209 def _capabilities(self):
210 210 return self._caps
211 211
212 212 def readerr(self):
213 213 _forwardoutput(self.ui, self.pipee)
214 214
215 215 def _abort(self, exception):
216 216 self.cleanup()
217 217 raise exception
218 218
219 219 def cleanup(self):
220 220 if self.pipeo is None:
221 221 return
222 222 self.pipeo.close()
223 223 self.pipei.close()
224 224 try:
225 225 # read the error descriptor until EOF
226 226 for l in self.pipee:
227 227 self.ui.status(_("remote: "), l)
228 228 except (IOError, ValueError):
229 229 pass
230 230 self.pipee.close()
231 231
232 232 __del__ = cleanup
233 233
234 234 def _submitbatch(self, req):
235 235 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
236 236 available = self._getamount()
237 237 # TODO this response parsing is probably suboptimal for large
238 238 # batches with large responses.
239 239 toread = min(available, 1024)
240 240 work = rsp.read(toread)
241 241 available -= toread
242 242 chunk = work
243 243 while chunk:
244 244 while ';' in work:
245 245 one, work = work.split(';', 1)
246 246 yield wireproto.unescapearg(one)
247 247 toread = min(available, 1024)
248 248 chunk = rsp.read(toread)
249 249 available -= toread
250 250 work += chunk
251 251 yield wireproto.unescapearg(work)
252 252
253 253 def _callstream(self, cmd, **args):
254 254 self.ui.debug("sending %s command\n" % cmd)
255 255 self.pipeo.write("%s\n" % cmd)
256 256 _func, names = wireproto.commands[cmd]
257 257 keys = names.split()
258 258 wireargs = {}
259 259 for k in keys:
260 260 if k == '*':
261 261 wireargs['*'] = args
262 262 break
263 263 else:
264 264 wireargs[k] = args[k]
265 265 del args[k]
266 266 for k, v in sorted(wireargs.iteritems()):
267 267 self.pipeo.write("%s %d\n" % (k, len(v)))
268 268 if isinstance(v, dict):
269 269 for dk, dv in v.iteritems():
270 270 self.pipeo.write("%s %d\n" % (dk, len(dv)))
271 271 self.pipeo.write(dv)
272 272 else:
273 273 self.pipeo.write(v)
274 274 self.pipeo.flush()
275 275
276 276 return self.pipei
277 277
278 278 def _callcompressable(self, cmd, **args):
279 279 return self._callstream(cmd, **args)
280 280
281 281 def _call(self, cmd, **args):
282 282 self._callstream(cmd, **args)
283 283 return self._recv()
284 284
285 285 def _callpush(self, cmd, fp, **args):
286 286 r = self._call(cmd, **args)
287 287 if r:
288 288 return '', r
289 289 for d in iter(lambda: fp.read(4096), ''):
290 290 self._send(d)
291 291 self._send("", flush=True)
292 292 r = self._recv()
293 293 if r:
294 294 return '', r
295 295 return self._recv(), ''
296 296
297 297 def _calltwowaystream(self, cmd, fp, **args):
298 298 r = self._call(cmd, **args)
299 299 if r:
300 300 # XXX needs to be made better
301 301 raise error.Abort(_('unexpected remote reply: %s') % r)
302 302 for d in iter(lambda: fp.read(4096), ''):
303 303 self._send(d)
304 304 self._send("", flush=True)
305 305 return self.pipei
306 306
307 307 def _getamount(self):
308 308 l = self.pipei.readline()
309 309 if l == '\n':
310 310 self.readerr()
311 311 msg = _('check previous remote output')
312 312 self._abort(error.OutOfBandError(hint=msg))
313 313 self.readerr()
314 314 try:
315 315 return int(l)
316 316 except ValueError:
317 317 self._abort(error.ResponseError(_("unexpected response:"), l))
318 318
319 319 def _recv(self):
320 320 return self.pipei.read(self._getamount())
321 321
322 322 def _send(self, data, flush=False):
323 323 self.pipeo.write("%d\n" % len(data))
324 324 if data:
325 325 self.pipeo.write(data)
326 326 if flush:
327 327 self.pipeo.flush()
328 328 self.readerr()
329 329
330 330 def lock(self):
331 331 self._call("lock")
332 332 return remotelock(self)
333 333
334 334 def unlock(self):
335 335 self._call("unlock")
336 336
337 337 def addchangegroup(self, cg, source, url, lock=None):
338 338 '''Send a changegroup to the remote server. Return an integer
339 339 similar to unbundle(). DEPRECATED, since it requires locking the
340 340 remote.'''
341 341 d = self._call("addchangegroup")
342 342 if d:
343 343 self._abort(error.RepoError(_("push refused: %s") % d))
344 344 for d in iter(lambda: cg.read(4096), ''):
345 345 self.pipeo.write(d)
346 346 self.readerr()
347 347
348 348 self.pipeo.flush()
349 349
350 350 self.readerr()
351 351 r = self._recv()
352 352 if not r:
353 353 return 1
354 354 try:
355 355 return int(r)
356 356 except ValueError:
357 357 self._abort(error.ResponseError(_("unexpected response:"), r))
358 358
359 359 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now