##// END OF EJS Templates
sshpeer: use peer interface...
Gregory Szorc -
r33803:1f8460b5 default
parent child Browse files
Show More
@@ -1,325 +1,353 b''
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import re
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 error,
15 15 pycompat,
16 repository,
16 17 util,
17 18 wireproto,
18 19 )
19 20
20 21 def _serverquote(s):
21 22 if not s:
22 23 return s
23 24 '''quote a string for the remote shell ... which we assume is sh'''
24 25 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
25 26 return s
26 27 return "'%s'" % s.replace("'", "'\\''")
27 28
28 29 def _forwardoutput(ui, pipe):
29 30 """display all data currently available on pipe as remote output.
30 31
31 32 This is non blocking."""
32 33 s = util.readpipe(pipe)
33 34 if s:
34 35 for l in s.splitlines():
35 36 ui.status(_("remote: "), l, '\n')
36 37
37 38 class doublepipe(object):
38 39 """Operate a side-channel pipe in addition of a main one
39 40
40 41 The side-channel pipe contains server output to be forwarded to the user
41 42 input. The double pipe will behave as the "main" pipe, but will ensure the
42 43 content of the "side" pipe is properly processed while we wait for blocking
43 44 call on the "main" pipe.
44 45
45 46 If large amounts of data are read from "main", the forward will cease after
46 47 the first bytes start to appear. This simplifies the implementation
47 48 without affecting actual output of sshpeer too much as we rarely issue
48 49 large read for data not yet emitted by the server.
49 50
50 51 The main pipe is expected to be a 'bufferedinputpipe' from the util module
51 52 that handle all the os specific bits. This class lives in this module
52 53 because it focus on behavior specific to the ssh protocol."""
53 54
54 55 def __init__(self, ui, main, side):
55 56 self._ui = ui
56 57 self._main = main
57 58 self._side = side
58 59
59 60 def _wait(self):
60 61 """wait until some data are available on main or side
61 62
62 63 return a pair of boolean (ismainready, issideready)
63 64
64 65 (This will only wait for data if the setup is supported by `util.poll`)
65 66 """
66 67 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
67 68 return (True, True) # main has data, assume side is worth poking at.
68 69 fds = [self._main.fileno(), self._side.fileno()]
69 70 try:
70 71 act = util.poll(fds)
71 72 except NotImplementedError:
72 73 # non supported yet case, assume all have data.
73 74 act = fds
74 75 return (self._main.fileno() in act, self._side.fileno() in act)
75 76
76 77 def write(self, data):
77 78 return self._call('write', data)
78 79
79 80 def read(self, size):
80 81 r = self._call('read', size)
81 82 if size != 0 and not r:
82 83 # We've observed a condition that indicates the
83 84 # stdout closed unexpectedly. Check stderr one
84 85 # more time and snag anything that's there before
85 86 # letting anyone know the main part of the pipe
86 87 # closed prematurely.
87 88 _forwardoutput(self._ui, self._side)
88 89 return r
89 90
90 91 def readline(self):
91 92 return self._call('readline')
92 93
93 94 def _call(self, methname, data=None):
94 95 """call <methname> on "main", forward output of "side" while blocking
95 96 """
96 97 # data can be '' or 0
97 98 if (data is not None and not data) or self._main.closed:
98 99 _forwardoutput(self._ui, self._side)
99 100 return ''
100 101 while True:
101 102 mainready, sideready = self._wait()
102 103 if sideready:
103 104 _forwardoutput(self._ui, self._side)
104 105 if mainready:
105 106 meth = getattr(self._main, methname)
106 107 if data is None:
107 108 return meth()
108 109 else:
109 110 return meth(data)
110 111
111 112 def close(self):
112 113 return self._main.close()
113 114
114 115 def flush(self):
115 116 return self._main.flush()
116 117
117 class sshpeer(wireproto.wirepeer):
118 class sshpeer(wireproto.wirepeer, repository.legacypeer):
118 119 def __init__(self, ui, path, create=False):
119 120 self._url = path
120 self.ui = ui
121 self._ui = ui
121 122 self._pipeo = self._pipei = self._pipee = None
122 123
123 124 u = util.url(path, parsequery=False, parsefragment=False)
124 125 if u.scheme != 'ssh' or not u.host or u.path is None:
125 126 self._abort(error.RepoError(_("couldn't parse location %s") % path))
126 127
127 128 util.checksafessh(path)
128 129
129 130 if u.passwd is not None:
130 131 self._abort(error.RepoError(_("password in URL not supported")))
131 132
132 133 self._user = u.user
133 134 self._host = u.host
134 135 self._port = u.port
135 136 self._path = u.path or '.'
136 137
137 138 sshcmd = self.ui.config("ui", "ssh")
138 139 remotecmd = self.ui.config("ui", "remotecmd")
139 140
140 141 args = util.sshargs(sshcmd, self._host, self._user, self._port)
141 142
142 143 if create:
143 144 cmd = '%s %s %s' % (sshcmd, args,
144 145 util.shellquote("%s init %s" %
145 146 (_serverquote(remotecmd), _serverquote(self._path))))
146 147 ui.debug('running %s\n' % cmd)
147 148 res = ui.system(cmd, blockedtag='sshpeer')
148 149 if res != 0:
149 150 self._abort(error.RepoError(_("could not create remote repo")))
150 151
151 152 self._validaterepo(sshcmd, args, remotecmd)
152 153
154 # TODO remove this alias once peerrepository inheritance is removed.
155 self._capabilities = self.capabilities
156
157 # Begin of _basepeer interface.
158
159 @util.propertycache
160 def ui(self):
161 return self._ui
162
153 163 def url(self):
154 164 return self._url
155 165
166 def local(self):
167 return None
168
169 def peer(self):
170 return self
171
172 def canpush(self):
173 return True
174
175 def close(self):
176 pass
177
178 # End of _basepeer interface.
179
180 # Begin of _basewirecommands interface.
181
182 def capabilities(self):
183 return self._caps
184
185 # End of _basewirecommands interface.
186
156 187 def _validaterepo(self, sshcmd, args, remotecmd):
157 188 # cleanup up previous run
158 189 self._cleanup()
159 190
160 191 cmd = '%s %s %s' % (sshcmd, args,
161 192 util.shellquote("%s -R %s serve --stdio" %
162 193 (_serverquote(remotecmd), _serverquote(self._path))))
163 194 self.ui.debug('running %s\n' % cmd)
164 195 cmd = util.quotecommand(cmd)
165 196
166 197 # while self._subprocess isn't used, having it allows the subprocess to
167 198 # to clean up correctly later
168 199 #
169 200 # no buffer allow the use of 'select'
170 201 # feel free to remove buffering and select usage when we ultimately
171 202 # move to threading.
172 203 sub = util.popen4(cmd, bufsize=0)
173 204 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
174 205
175 206 self._pipei = util.bufferedinputpipe(self._pipei)
176 207 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
177 208 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
178 209
179 210 # skip any noise generated by remote shell
180 211 self._callstream("hello")
181 212 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
182 213 lines = ["", "dummy"]
183 214 max_noise = 500
184 215 while lines[-1] and max_noise:
185 216 l = r.readline()
186 217 self._readerr()
187 218 if lines[-1] == "1\n" and l == "\n":
188 219 break
189 220 if l:
190 221 self.ui.debug("remote: ", l)
191 222 lines.append(l)
192 223 max_noise -= 1
193 224 else:
194 225 self._abort(error.RepoError(_('no suitable response from '
195 226 'remote hg')))
196 227
197 228 self._caps = set()
198 229 for l in reversed(lines):
199 230 if l.startswith("capabilities:"):
200 231 self._caps.update(l[:-1].split(":")[1].split())
201 232 break
202 233
203 def _capabilities(self):
204 return self._caps
205
206 234 def _readerr(self):
207 235 _forwardoutput(self.ui, self._pipee)
208 236
209 237 def _abort(self, exception):
210 238 self._cleanup()
211 239 raise exception
212 240
213 241 def _cleanup(self):
214 242 if self._pipeo is None:
215 243 return
216 244 self._pipeo.close()
217 245 self._pipei.close()
218 246 try:
219 247 # read the error descriptor until EOF
220 248 for l in self._pipee:
221 249 self.ui.status(_("remote: "), l)
222 250 except (IOError, ValueError):
223 251 pass
224 252 self._pipee.close()
225 253
226 254 __del__ = _cleanup
227 255
228 256 def _submitbatch(self, req):
229 257 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
230 258 available = self._getamount()
231 259 # TODO this response parsing is probably suboptimal for large
232 260 # batches with large responses.
233 261 toread = min(available, 1024)
234 262 work = rsp.read(toread)
235 263 available -= toread
236 264 chunk = work
237 265 while chunk:
238 266 while ';' in work:
239 267 one, work = work.split(';', 1)
240 268 yield wireproto.unescapearg(one)
241 269 toread = min(available, 1024)
242 270 chunk = rsp.read(toread)
243 271 available -= toread
244 272 work += chunk
245 273 yield wireproto.unescapearg(work)
246 274
247 275 def _callstream(self, cmd, **args):
248 276 args = pycompat.byteskwargs(args)
249 277 self.ui.debug("sending %s command\n" % cmd)
250 278 self._pipeo.write("%s\n" % cmd)
251 279 _func, names = wireproto.commands[cmd]
252 280 keys = names.split()
253 281 wireargs = {}
254 282 for k in keys:
255 283 if k == '*':
256 284 wireargs['*'] = args
257 285 break
258 286 else:
259 287 wireargs[k] = args[k]
260 288 del args[k]
261 289 for k, v in sorted(wireargs.iteritems()):
262 290 self._pipeo.write("%s %d\n" % (k, len(v)))
263 291 if isinstance(v, dict):
264 292 for dk, dv in v.iteritems():
265 293 self._pipeo.write("%s %d\n" % (dk, len(dv)))
266 294 self._pipeo.write(dv)
267 295 else:
268 296 self._pipeo.write(v)
269 297 self._pipeo.flush()
270 298
271 299 return self._pipei
272 300
273 301 def _callcompressable(self, cmd, **args):
274 302 return self._callstream(cmd, **args)
275 303
276 304 def _call(self, cmd, **args):
277 305 self._callstream(cmd, **args)
278 306 return self._recv()
279 307
280 308 def _callpush(self, cmd, fp, **args):
281 309 r = self._call(cmd, **args)
282 310 if r:
283 311 return '', r
284 312 for d in iter(lambda: fp.read(4096), ''):
285 313 self._send(d)
286 314 self._send("", flush=True)
287 315 r = self._recv()
288 316 if r:
289 317 return '', r
290 318 return self._recv(), ''
291 319
292 320 def _calltwowaystream(self, cmd, fp, **args):
293 321 r = self._call(cmd, **args)
294 322 if r:
295 323 # XXX needs to be made better
296 324 raise error.Abort(_('unexpected remote reply: %s') % r)
297 325 for d in iter(lambda: fp.read(4096), ''):
298 326 self._send(d)
299 327 self._send("", flush=True)
300 328 return self._pipei
301 329
302 330 def _getamount(self):
303 331 l = self._pipei.readline()
304 332 if l == '\n':
305 333 self._readerr()
306 334 msg = _('check previous remote output')
307 335 self._abort(error.OutOfBandError(hint=msg))
308 336 self._readerr()
309 337 try:
310 338 return int(l)
311 339 except ValueError:
312 340 self._abort(error.ResponseError(_("unexpected response:"), l))
313 341
314 342 def _recv(self):
315 343 return self._pipei.read(self._getamount())
316 344
317 345 def _send(self, data, flush=False):
318 346 self._pipeo.write("%d\n" % len(data))
319 347 if data:
320 348 self._pipeo.write(data)
321 349 if flush:
322 350 self._pipeo.flush()
323 351 self._readerr()
324 352
325 353 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now