##// END OF EJS Templates
sshpeer: allow doublepipe on unbuffered main pipe...
Pierre-Yves David -
r25457:2afa7481 default
parent child Browse files
Show More
@@ -1,334 +1,334 b''
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import re
9 9 from i18n import _
10 10 import util, error, wireproto
11 11
12 12 class remotelock(object):
13 13 def __init__(self, repo):
14 14 self.repo = repo
15 15 def release(self):
16 16 self.repo.unlock()
17 17 self.repo = None
18 18 def __del__(self):
19 19 if self.repo:
20 20 self.release()
21 21
22 22 def _serverquote(s):
23 23 if not s:
24 24 return s
25 25 '''quote a string for the remote shell ... which we assume is sh'''
26 26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
27 27 return s
28 28 return "'%s'" % s.replace("'", "'\\''")
29 29
30 30 def _forwardoutput(ui, pipe):
31 31 """display all data currently available on pipe as remote output.
32 32
33 33 This is non blocking."""
34 34 s = util.readpipe(pipe)
35 35 if s:
36 36 for l in s.splitlines():
37 37 ui.status(_("remote: "), l, '\n')
38 38
39 39 class doublepipe(object):
40 40 """Operate a side-channel pipe in addition of a main one
41 41
42 42 The side-channel pipe contains server output to be forwarded to the user
43 43 input. The double pipe will behave as the "main" pipe, but will ensure the
44 44 content of the "side" pipe is properly processed while we wait for blocking
45 45 call on the "main" pipe.
46 46
47 47 If large amounts of data are read from "main", the forward will cease after
48 48 the first bytes start to appear. This simplifies the implementation
49 49 without affecting actual output of sshpeer too much as we rarely issue
50 50 large read for data not yet emitted by the server.
51 51
52 52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
53 53 that handle all the os specific bites. This class lives in this module
54 54 because it focus on behavior specifig to the ssh protocol."""
55 55
56 56 def __init__(self, ui, main, side):
57 57 self._ui = ui
58 58 self._main = main
59 59 self._side = side
60 60
61 61 def _wait(self):
62 62 """wait until some data are available on main or side
63 63
64 64 return a pair of boolean (ismainready, issideready)
65 65
66 66 (This will only wait for data if the setup is supported by `util.poll`)
67 67 """
68 if self._main.hasbuffer:
68 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
69 69 return (True, True) # main has data, assume side is worth poking at.
70 70 fds = [self._main.fileno(), self._side.fileno()]
71 71 try:
72 72 act = util.poll(fds)
73 73 except NotImplementedError:
74 74 # non supported yet case, assume all have data.
75 75 act = fds
76 76 return (self._main.fileno() in act, self._side.fileno() in act)
77 77
78 78 def write(self, data):
79 79 return self._call('write', data)
80 80
81 81 def read(self, size):
82 82 return self._call('read', size)
83 83
84 84 def readline(self):
85 85 return self._call('readline')
86 86
87 87 def _call(self, methname, data=None):
88 88 """call <methname> on "main", forward output of "side" while blocking
89 89 """
90 90 # data can be '' or 0
91 91 if (data is not None and not data) or self._main.closed:
92 92 _forwardoutput(self._ui, self._side)
93 93 return ''
94 94 while True:
95 95 mainready, sideready = self._wait()
96 96 if sideready:
97 97 _forwardoutput(self._ui, self._side)
98 98 if mainready:
99 99 meth = getattr(self._main, methname)
100 100 if data is None:
101 101 return meth()
102 102 else:
103 103 return meth(data)
104 104
105 105 def close(self):
106 106 return self._main.close()
107 107
108 108 def flush(self):
109 109 return self._main.flush()
110 110
111 111 class sshpeer(wireproto.wirepeer):
112 112 def __init__(self, ui, path, create=False):
113 113 self._url = path
114 114 self.ui = ui
115 115 self.pipeo = self.pipei = self.pipee = None
116 116
117 117 u = util.url(path, parsequery=False, parsefragment=False)
118 118 if u.scheme != 'ssh' or not u.host or u.path is None:
119 119 self._abort(error.RepoError(_("couldn't parse location %s") % path))
120 120
121 121 self.user = u.user
122 122 if u.passwd is not None:
123 123 self._abort(error.RepoError(_("password in URL not supported")))
124 124 self.host = u.host
125 125 self.port = u.port
126 126 self.path = u.path or "."
127 127
128 128 sshcmd = self.ui.config("ui", "ssh", "ssh")
129 129 remotecmd = self.ui.config("ui", "remotecmd", "hg")
130 130
131 131 args = util.sshargs(sshcmd,
132 132 _serverquote(self.host),
133 133 _serverquote(self.user),
134 134 _serverquote(self.port))
135 135
136 136 if create:
137 137 cmd = '%s %s %s' % (sshcmd, args,
138 138 util.shellquote("%s init %s" %
139 139 (_serverquote(remotecmd), _serverquote(self.path))))
140 140 ui.debug('running %s\n' % cmd)
141 141 res = ui.system(cmd)
142 142 if res != 0:
143 143 self._abort(error.RepoError(_("could not create remote repo")))
144 144
145 145 self._validaterepo(sshcmd, args, remotecmd)
146 146
147 147 def url(self):
148 148 return self._url
149 149
150 150 def _validaterepo(self, sshcmd, args, remotecmd):
151 151 # cleanup up previous run
152 152 self.cleanup()
153 153
154 154 cmd = '%s %s %s' % (sshcmd, args,
155 155 util.shellquote("%s -R %s serve --stdio" %
156 156 (_serverquote(remotecmd), _serverquote(self.path))))
157 157 self.ui.debug('running %s\n' % cmd)
158 158 cmd = util.quotecommand(cmd)
159 159
160 160 # while self.subprocess isn't used, having it allows the subprocess to
161 161 # to clean up correctly later
162 162 #
163 163 # no buffer allow the use of 'select'
164 164 # feel free to remove buffering and select usage when we ultimately
165 165 # move to threading.
166 166 sub = util.popen4(cmd, bufsize=0)
167 167 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
168 168
169 169 self.pipei = util.bufferedinputpipe(self.pipei)
170 170 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
171 171
172 172 # skip any noise generated by remote shell
173 173 self._callstream("hello")
174 174 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
175 175 lines = ["", "dummy"]
176 176 max_noise = 500
177 177 while lines[-1] and max_noise:
178 178 l = r.readline()
179 179 self.readerr()
180 180 if lines[-1] == "1\n" and l == "\n":
181 181 break
182 182 if l:
183 183 self.ui.debug("remote: ", l)
184 184 lines.append(l)
185 185 max_noise -= 1
186 186 else:
187 187 self._abort(error.RepoError(_('no suitable response from '
188 188 'remote hg')))
189 189
190 190 self._caps = set()
191 191 for l in reversed(lines):
192 192 if l.startswith("capabilities:"):
193 193 self._caps.update(l[:-1].split(":")[1].split())
194 194 break
195 195
196 196 def _capabilities(self):
197 197 return self._caps
198 198
199 199 def readerr(self):
200 200 _forwardoutput(self.ui, self.pipee)
201 201
202 202 def _abort(self, exception):
203 203 self.cleanup()
204 204 raise exception
205 205
206 206 def cleanup(self):
207 207 if self.pipeo is None:
208 208 return
209 209 self.pipeo.close()
210 210 self.pipei.close()
211 211 try:
212 212 # read the error descriptor until EOF
213 213 for l in self.pipee:
214 214 self.ui.status(_("remote: "), l)
215 215 except (IOError, ValueError):
216 216 pass
217 217 self.pipee.close()
218 218
219 219 __del__ = cleanup
220 220
221 221 def _callstream(self, cmd, **args):
222 222 self.ui.debug("sending %s command\n" % cmd)
223 223 self.pipeo.write("%s\n" % cmd)
224 224 _func, names = wireproto.commands[cmd]
225 225 keys = names.split()
226 226 wireargs = {}
227 227 for k in keys:
228 228 if k == '*':
229 229 wireargs['*'] = args
230 230 break
231 231 else:
232 232 wireargs[k] = args[k]
233 233 del args[k]
234 234 for k, v in sorted(wireargs.iteritems()):
235 235 self.pipeo.write("%s %d\n" % (k, len(v)))
236 236 if isinstance(v, dict):
237 237 for dk, dv in v.iteritems():
238 238 self.pipeo.write("%s %d\n" % (dk, len(dv)))
239 239 self.pipeo.write(dv)
240 240 else:
241 241 self.pipeo.write(v)
242 242 self.pipeo.flush()
243 243
244 244 return self.pipei
245 245
246 246 def _callcompressable(self, cmd, **args):
247 247 return self._callstream(cmd, **args)
248 248
249 249 def _call(self, cmd, **args):
250 250 self._callstream(cmd, **args)
251 251 return self._recv()
252 252
253 253 def _callpush(self, cmd, fp, **args):
254 254 r = self._call(cmd, **args)
255 255 if r:
256 256 return '', r
257 257 while True:
258 258 d = fp.read(4096)
259 259 if not d:
260 260 break
261 261 self._send(d)
262 262 self._send("", flush=True)
263 263 r = self._recv()
264 264 if r:
265 265 return '', r
266 266 return self._recv(), ''
267 267
268 268 def _calltwowaystream(self, cmd, fp, **args):
269 269 r = self._call(cmd, **args)
270 270 if r:
271 271 # XXX needs to be made better
272 272 raise util.Abort('unexpected remote reply: %s' % r)
273 273 while True:
274 274 d = fp.read(4096)
275 275 if not d:
276 276 break
277 277 self._send(d)
278 278 self._send("", flush=True)
279 279 return self.pipei
280 280
281 281 def _recv(self):
282 282 l = self.pipei.readline()
283 283 if l == '\n':
284 284 self.readerr()
285 285 msg = _('check previous remote output')
286 286 self._abort(error.OutOfBandError(hint=msg))
287 287 self.readerr()
288 288 try:
289 289 l = int(l)
290 290 except ValueError:
291 291 self._abort(error.ResponseError(_("unexpected response:"), l))
292 292 return self.pipei.read(l)
293 293
294 294 def _send(self, data, flush=False):
295 295 self.pipeo.write("%d\n" % len(data))
296 296 if data:
297 297 self.pipeo.write(data)
298 298 if flush:
299 299 self.pipeo.flush()
300 300 self.readerr()
301 301
302 302 def lock(self):
303 303 self._call("lock")
304 304 return remotelock(self)
305 305
306 306 def unlock(self):
307 307 self._call("unlock")
308 308
309 309 def addchangegroup(self, cg, source, url, lock=None):
310 310 '''Send a changegroup to the remote server. Return an integer
311 311 similar to unbundle(). DEPRECATED, since it requires locking the
312 312 remote.'''
313 313 d = self._call("addchangegroup")
314 314 if d:
315 315 self._abort(error.RepoError(_("push refused: %s") % d))
316 316 while True:
317 317 d = cg.read(4096)
318 318 if not d:
319 319 break
320 320 self.pipeo.write(d)
321 321 self.readerr()
322 322
323 323 self.pipeo.flush()
324 324
325 325 self.readerr()
326 326 r = self._recv()
327 327 if not r:
328 328 return 1
329 329 try:
330 330 return int(r)
331 331 except ValueError:
332 332 self._abort(error.ResponseError(_("unexpected response:"), r))
333 333
334 334 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now