##// END OF EJS Templates
sshpeer: also use doublepipe for client to server communication...
Pierre-Yves David -
r25458:4642f0b8 default
parent child Browse files
Show More
@@ -1,334 +1,335 b''
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import re
9 9 from i18n import _
10 10 import util, error, wireproto
11 11
12 12 class remotelock(object):
13 13 def __init__(self, repo):
14 14 self.repo = repo
15 15 def release(self):
16 16 self.repo.unlock()
17 17 self.repo = None
18 18 def __del__(self):
19 19 if self.repo:
20 20 self.release()
21 21
22 22 def _serverquote(s):
23 23 if not s:
24 24 return s
25 25 '''quote a string for the remote shell ... which we assume is sh'''
26 26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
27 27 return s
28 28 return "'%s'" % s.replace("'", "'\\''")
29 29
30 30 def _forwardoutput(ui, pipe):
31 31 """display all data currently available on pipe as remote output.
32 32
33 33 This is non blocking."""
34 34 s = util.readpipe(pipe)
35 35 if s:
36 36 for l in s.splitlines():
37 37 ui.status(_("remote: "), l, '\n')
38 38
39 39 class doublepipe(object):
40 40 """Operate a side-channel pipe in addition of a main one
41 41
42 42 The side-channel pipe contains server output to be forwarded to the user
43 43 input. The double pipe will behave as the "main" pipe, but will ensure the
44 44 content of the "side" pipe is properly processed while we wait for blocking
45 45 call on the "main" pipe.
46 46
47 47 If large amounts of data are read from "main", the forward will cease after
48 48 the first bytes start to appear. This simplifies the implementation
49 49 without affecting actual output of sshpeer too much as we rarely issue
50 50 large read for data not yet emitted by the server.
51 51
52 52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
53 53 that handle all the os specific bites. This class lives in this module
54 54 because it focus on behavior specifig to the ssh protocol."""
55 55
56 56 def __init__(self, ui, main, side):
57 57 self._ui = ui
58 58 self._main = main
59 59 self._side = side
60 60
61 61 def _wait(self):
62 62 """wait until some data are available on main or side
63 63
64 64 return a pair of boolean (ismainready, issideready)
65 65
66 66 (This will only wait for data if the setup is supported by `util.poll`)
67 67 """
68 68 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
69 69 return (True, True) # main has data, assume side is worth poking at.
70 70 fds = [self._main.fileno(), self._side.fileno()]
71 71 try:
72 72 act = util.poll(fds)
73 73 except NotImplementedError:
74 74 # non supported yet case, assume all have data.
75 75 act = fds
76 76 return (self._main.fileno() in act, self._side.fileno() in act)
77 77
78 78 def write(self, data):
79 79 return self._call('write', data)
80 80
81 81 def read(self, size):
82 82 return self._call('read', size)
83 83
84 84 def readline(self):
85 85 return self._call('readline')
86 86
87 87 def _call(self, methname, data=None):
88 88 """call <methname> on "main", forward output of "side" while blocking
89 89 """
90 90 # data can be '' or 0
91 91 if (data is not None and not data) or self._main.closed:
92 92 _forwardoutput(self._ui, self._side)
93 93 return ''
94 94 while True:
95 95 mainready, sideready = self._wait()
96 96 if sideready:
97 97 _forwardoutput(self._ui, self._side)
98 98 if mainready:
99 99 meth = getattr(self._main, methname)
100 100 if data is None:
101 101 return meth()
102 102 else:
103 103 return meth(data)
104 104
105 105 def close(self):
106 106 return self._main.close()
107 107
108 108 def flush(self):
109 109 return self._main.flush()
110 110
111 111 class sshpeer(wireproto.wirepeer):
112 112 def __init__(self, ui, path, create=False):
113 113 self._url = path
114 114 self.ui = ui
115 115 self.pipeo = self.pipei = self.pipee = None
116 116
117 117 u = util.url(path, parsequery=False, parsefragment=False)
118 118 if u.scheme != 'ssh' or not u.host or u.path is None:
119 119 self._abort(error.RepoError(_("couldn't parse location %s") % path))
120 120
121 121 self.user = u.user
122 122 if u.passwd is not None:
123 123 self._abort(error.RepoError(_("password in URL not supported")))
124 124 self.host = u.host
125 125 self.port = u.port
126 126 self.path = u.path or "."
127 127
128 128 sshcmd = self.ui.config("ui", "ssh", "ssh")
129 129 remotecmd = self.ui.config("ui", "remotecmd", "hg")
130 130
131 131 args = util.sshargs(sshcmd,
132 132 _serverquote(self.host),
133 133 _serverquote(self.user),
134 134 _serverquote(self.port))
135 135
136 136 if create:
137 137 cmd = '%s %s %s' % (sshcmd, args,
138 138 util.shellquote("%s init %s" %
139 139 (_serverquote(remotecmd), _serverquote(self.path))))
140 140 ui.debug('running %s\n' % cmd)
141 141 res = ui.system(cmd)
142 142 if res != 0:
143 143 self._abort(error.RepoError(_("could not create remote repo")))
144 144
145 145 self._validaterepo(sshcmd, args, remotecmd)
146 146
147 147 def url(self):
148 148 return self._url
149 149
150 150 def _validaterepo(self, sshcmd, args, remotecmd):
151 151 # cleanup up previous run
152 152 self.cleanup()
153 153
154 154 cmd = '%s %s %s' % (sshcmd, args,
155 155 util.shellquote("%s -R %s serve --stdio" %
156 156 (_serverquote(remotecmd), _serverquote(self.path))))
157 157 self.ui.debug('running %s\n' % cmd)
158 158 cmd = util.quotecommand(cmd)
159 159
160 160 # while self.subprocess isn't used, having it allows the subprocess to
161 161 # to clean up correctly later
162 162 #
163 163 # no buffer allow the use of 'select'
164 164 # feel free to remove buffering and select usage when we ultimately
165 165 # move to threading.
166 166 sub = util.popen4(cmd, bufsize=0)
167 167 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
168 168
169 169 self.pipei = util.bufferedinputpipe(self.pipei)
170 170 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
171 self.pipeo = doublepipe(self.ui, self.pipeo, self.pipee)
171 172
172 173 # skip any noise generated by remote shell
173 174 self._callstream("hello")
174 175 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
175 176 lines = ["", "dummy"]
176 177 max_noise = 500
177 178 while lines[-1] and max_noise:
178 179 l = r.readline()
179 180 self.readerr()
180 181 if lines[-1] == "1\n" and l == "\n":
181 182 break
182 183 if l:
183 184 self.ui.debug("remote: ", l)
184 185 lines.append(l)
185 186 max_noise -= 1
186 187 else:
187 188 self._abort(error.RepoError(_('no suitable response from '
188 189 'remote hg')))
189 190
190 191 self._caps = set()
191 192 for l in reversed(lines):
192 193 if l.startswith("capabilities:"):
193 194 self._caps.update(l[:-1].split(":")[1].split())
194 195 break
195 196
196 197 def _capabilities(self):
197 198 return self._caps
198 199
199 200 def readerr(self):
200 201 _forwardoutput(self.ui, self.pipee)
201 202
202 203 def _abort(self, exception):
203 204 self.cleanup()
204 205 raise exception
205 206
206 207 def cleanup(self):
207 208 if self.pipeo is None:
208 209 return
209 210 self.pipeo.close()
210 211 self.pipei.close()
211 212 try:
212 213 # read the error descriptor until EOF
213 214 for l in self.pipee:
214 215 self.ui.status(_("remote: "), l)
215 216 except (IOError, ValueError):
216 217 pass
217 218 self.pipee.close()
218 219
219 220 __del__ = cleanup
220 221
221 222 def _callstream(self, cmd, **args):
222 223 self.ui.debug("sending %s command\n" % cmd)
223 224 self.pipeo.write("%s\n" % cmd)
224 225 _func, names = wireproto.commands[cmd]
225 226 keys = names.split()
226 227 wireargs = {}
227 228 for k in keys:
228 229 if k == '*':
229 230 wireargs['*'] = args
230 231 break
231 232 else:
232 233 wireargs[k] = args[k]
233 234 del args[k]
234 235 for k, v in sorted(wireargs.iteritems()):
235 236 self.pipeo.write("%s %d\n" % (k, len(v)))
236 237 if isinstance(v, dict):
237 238 for dk, dv in v.iteritems():
238 239 self.pipeo.write("%s %d\n" % (dk, len(dv)))
239 240 self.pipeo.write(dv)
240 241 else:
241 242 self.pipeo.write(v)
242 243 self.pipeo.flush()
243 244
244 245 return self.pipei
245 246
246 247 def _callcompressable(self, cmd, **args):
247 248 return self._callstream(cmd, **args)
248 249
249 250 def _call(self, cmd, **args):
250 251 self._callstream(cmd, **args)
251 252 return self._recv()
252 253
253 254 def _callpush(self, cmd, fp, **args):
254 255 r = self._call(cmd, **args)
255 256 if r:
256 257 return '', r
257 258 while True:
258 259 d = fp.read(4096)
259 260 if not d:
260 261 break
261 262 self._send(d)
262 263 self._send("", flush=True)
263 264 r = self._recv()
264 265 if r:
265 266 return '', r
266 267 return self._recv(), ''
267 268
268 269 def _calltwowaystream(self, cmd, fp, **args):
269 270 r = self._call(cmd, **args)
270 271 if r:
271 272 # XXX needs to be made better
272 273 raise util.Abort('unexpected remote reply: %s' % r)
273 274 while True:
274 275 d = fp.read(4096)
275 276 if not d:
276 277 break
277 278 self._send(d)
278 279 self._send("", flush=True)
279 280 return self.pipei
280 281
281 282 def _recv(self):
282 283 l = self.pipei.readline()
283 284 if l == '\n':
284 285 self.readerr()
285 286 msg = _('check previous remote output')
286 287 self._abort(error.OutOfBandError(hint=msg))
287 288 self.readerr()
288 289 try:
289 290 l = int(l)
290 291 except ValueError:
291 292 self._abort(error.ResponseError(_("unexpected response:"), l))
292 293 return self.pipei.read(l)
293 294
294 295 def _send(self, data, flush=False):
295 296 self.pipeo.write("%d\n" % len(data))
296 297 if data:
297 298 self.pipeo.write(data)
298 299 if flush:
299 300 self.pipeo.flush()
300 301 self.readerr()
301 302
302 303 def lock(self):
303 304 self._call("lock")
304 305 return remotelock(self)
305 306
306 307 def unlock(self):
307 308 self._call("unlock")
308 309
309 310 def addchangegroup(self, cg, source, url, lock=None):
310 311 '''Send a changegroup to the remote server. Return an integer
311 312 similar to unbundle(). DEPRECATED, since it requires locking the
312 313 remote.'''
313 314 d = self._call("addchangegroup")
314 315 if d:
315 316 self._abort(error.RepoError(_("push refused: %s") % d))
316 317 while True:
317 318 d = cg.read(4096)
318 319 if not d:
319 320 break
320 321 self.pipeo.write(d)
321 322 self.readerr()
322 323
323 324 self.pipeo.flush()
324 325
325 326 self.readerr()
326 327 r = self._recv()
327 328 if not r:
328 329 return 1
329 330 try:
330 331 return int(r)
331 332 except ValueError:
332 333 self._abort(error.ResponseError(_("unexpected response:"), r))
333 334
334 335 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now