##// END OF EJS Templates
sshpeer: allow write operations through double pipe...
Pierre-Yves David -
r25456:408b7979 default
parent child Browse files
Show More
@@ -1,328 +1,334 b''
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import re
9 9 from i18n import _
10 10 import util, error, wireproto
11 11
12 12 class remotelock(object):
13 13 def __init__(self, repo):
14 14 self.repo = repo
15 15 def release(self):
16 16 self.repo.unlock()
17 17 self.repo = None
18 18 def __del__(self):
19 19 if self.repo:
20 20 self.release()
21 21
22 22 def _serverquote(s):
23 23 if not s:
24 24 return s
25 25 '''quote a string for the remote shell ... which we assume is sh'''
26 26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
27 27 return s
28 28 return "'%s'" % s.replace("'", "'\\''")
29 29
30 30 def _forwardoutput(ui, pipe):
31 31 """display all data currently available on pipe as remote output.
32 32
33 33 This is non blocking."""
34 34 s = util.readpipe(pipe)
35 35 if s:
36 36 for l in s.splitlines():
37 37 ui.status(_("remote: "), l, '\n')
38 38
39 39 class doublepipe(object):
40 40 """Operate a side-channel pipe in addition of a main one
41 41
42 42 The side-channel pipe contains server output to be forwarded to the user
43 43 input. The double pipe will behave as the "main" pipe, but will ensure the
44 44 content of the "side" pipe is properly processed while we wait for blocking
45 45 call on the "main" pipe.
46 46
47 47 If large amounts of data are read from "main", the forward will cease after
48 48 the first bytes start to appear. This simplifies the implementation
49 49 without affecting actual output of sshpeer too much as we rarely issue
50 50 large read for data not yet emitted by the server.
51 51
52 52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
53 53 that handle all the os specific bites. This class lives in this module
54 54 because it focus on behavior specifig to the ssh protocol."""
55 55
56 56 def __init__(self, ui, main, side):
57 57 self._ui = ui
58 58 self._main = main
59 59 self._side = side
60 60
61 61 def _wait(self):
62 62 """wait until some data are available on main or side
63 63
64 64 return a pair of boolean (ismainready, issideready)
65 65
66 66 (This will only wait for data if the setup is supported by `util.poll`)
67 67 """
68 68 if self._main.hasbuffer:
69 69 return (True, True) # main has data, assume side is worth poking at.
70 70 fds = [self._main.fileno(), self._side.fileno()]
71 71 try:
72 72 act = util.poll(fds)
73 73 except NotImplementedError:
74 74 # non supported yet case, assume all have data.
75 75 act = fds
76 76 return (self._main.fileno() in act, self._side.fileno() in act)
77 77
78 def write(self, data):
79 return self._call('write', data)
80
78 81 def read(self, size):
79 82 return self._call('read', size)
80 83
81 84 def readline(self):
82 85 return self._call('readline')
83 86
84 87 def _call(self, methname, data=None):
85 88 """call <methname> on "main", forward output of "side" while blocking
86 89 """
87 90 # data can be '' or 0
88 91 if (data is not None and not data) or self._main.closed:
89 92 _forwardoutput(self._ui, self._side)
90 93 return ''
91 94 while True:
92 95 mainready, sideready = self._wait()
93 96 if sideready:
94 97 _forwardoutput(self._ui, self._side)
95 98 if mainready:
96 99 meth = getattr(self._main, methname)
97 100 if data is None:
98 101 return meth()
99 102 else:
100 103 return meth(data)
101 104
102 105 def close(self):
103 106 return self._main.close()
104 107
108 def flush(self):
109 return self._main.flush()
110
105 111 class sshpeer(wireproto.wirepeer):
106 112 def __init__(self, ui, path, create=False):
107 113 self._url = path
108 114 self.ui = ui
109 115 self.pipeo = self.pipei = self.pipee = None
110 116
111 117 u = util.url(path, parsequery=False, parsefragment=False)
112 118 if u.scheme != 'ssh' or not u.host or u.path is None:
113 119 self._abort(error.RepoError(_("couldn't parse location %s") % path))
114 120
115 121 self.user = u.user
116 122 if u.passwd is not None:
117 123 self._abort(error.RepoError(_("password in URL not supported")))
118 124 self.host = u.host
119 125 self.port = u.port
120 126 self.path = u.path or "."
121 127
122 128 sshcmd = self.ui.config("ui", "ssh", "ssh")
123 129 remotecmd = self.ui.config("ui", "remotecmd", "hg")
124 130
125 131 args = util.sshargs(sshcmd,
126 132 _serverquote(self.host),
127 133 _serverquote(self.user),
128 134 _serverquote(self.port))
129 135
130 136 if create:
131 137 cmd = '%s %s %s' % (sshcmd, args,
132 138 util.shellquote("%s init %s" %
133 139 (_serverquote(remotecmd), _serverquote(self.path))))
134 140 ui.debug('running %s\n' % cmd)
135 141 res = ui.system(cmd)
136 142 if res != 0:
137 143 self._abort(error.RepoError(_("could not create remote repo")))
138 144
139 145 self._validaterepo(sshcmd, args, remotecmd)
140 146
141 147 def url(self):
142 148 return self._url
143 149
144 150 def _validaterepo(self, sshcmd, args, remotecmd):
145 151 # cleanup up previous run
146 152 self.cleanup()
147 153
148 154 cmd = '%s %s %s' % (sshcmd, args,
149 155 util.shellquote("%s -R %s serve --stdio" %
150 156 (_serverquote(remotecmd), _serverquote(self.path))))
151 157 self.ui.debug('running %s\n' % cmd)
152 158 cmd = util.quotecommand(cmd)
153 159
154 160 # while self.subprocess isn't used, having it allows the subprocess to
155 161 # to clean up correctly later
156 162 #
157 163 # no buffer allow the use of 'select'
158 164 # feel free to remove buffering and select usage when we ultimately
159 165 # move to threading.
160 166 sub = util.popen4(cmd, bufsize=0)
161 167 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
162 168
163 169 self.pipei = util.bufferedinputpipe(self.pipei)
164 170 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
165 171
166 172 # skip any noise generated by remote shell
167 173 self._callstream("hello")
168 174 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
169 175 lines = ["", "dummy"]
170 176 max_noise = 500
171 177 while lines[-1] and max_noise:
172 178 l = r.readline()
173 179 self.readerr()
174 180 if lines[-1] == "1\n" and l == "\n":
175 181 break
176 182 if l:
177 183 self.ui.debug("remote: ", l)
178 184 lines.append(l)
179 185 max_noise -= 1
180 186 else:
181 187 self._abort(error.RepoError(_('no suitable response from '
182 188 'remote hg')))
183 189
184 190 self._caps = set()
185 191 for l in reversed(lines):
186 192 if l.startswith("capabilities:"):
187 193 self._caps.update(l[:-1].split(":")[1].split())
188 194 break
189 195
190 196 def _capabilities(self):
191 197 return self._caps
192 198
193 199 def readerr(self):
194 200 _forwardoutput(self.ui, self.pipee)
195 201
196 202 def _abort(self, exception):
197 203 self.cleanup()
198 204 raise exception
199 205
200 206 def cleanup(self):
201 207 if self.pipeo is None:
202 208 return
203 209 self.pipeo.close()
204 210 self.pipei.close()
205 211 try:
206 212 # read the error descriptor until EOF
207 213 for l in self.pipee:
208 214 self.ui.status(_("remote: "), l)
209 215 except (IOError, ValueError):
210 216 pass
211 217 self.pipee.close()
212 218
213 219 __del__ = cleanup
214 220
215 221 def _callstream(self, cmd, **args):
216 222 self.ui.debug("sending %s command\n" % cmd)
217 223 self.pipeo.write("%s\n" % cmd)
218 224 _func, names = wireproto.commands[cmd]
219 225 keys = names.split()
220 226 wireargs = {}
221 227 for k in keys:
222 228 if k == '*':
223 229 wireargs['*'] = args
224 230 break
225 231 else:
226 232 wireargs[k] = args[k]
227 233 del args[k]
228 234 for k, v in sorted(wireargs.iteritems()):
229 235 self.pipeo.write("%s %d\n" % (k, len(v)))
230 236 if isinstance(v, dict):
231 237 for dk, dv in v.iteritems():
232 238 self.pipeo.write("%s %d\n" % (dk, len(dv)))
233 239 self.pipeo.write(dv)
234 240 else:
235 241 self.pipeo.write(v)
236 242 self.pipeo.flush()
237 243
238 244 return self.pipei
239 245
240 246 def _callcompressable(self, cmd, **args):
241 247 return self._callstream(cmd, **args)
242 248
243 249 def _call(self, cmd, **args):
244 250 self._callstream(cmd, **args)
245 251 return self._recv()
246 252
247 253 def _callpush(self, cmd, fp, **args):
248 254 r = self._call(cmd, **args)
249 255 if r:
250 256 return '', r
251 257 while True:
252 258 d = fp.read(4096)
253 259 if not d:
254 260 break
255 261 self._send(d)
256 262 self._send("", flush=True)
257 263 r = self._recv()
258 264 if r:
259 265 return '', r
260 266 return self._recv(), ''
261 267
262 268 def _calltwowaystream(self, cmd, fp, **args):
263 269 r = self._call(cmd, **args)
264 270 if r:
265 271 # XXX needs to be made better
266 272 raise util.Abort('unexpected remote reply: %s' % r)
267 273 while True:
268 274 d = fp.read(4096)
269 275 if not d:
270 276 break
271 277 self._send(d)
272 278 self._send("", flush=True)
273 279 return self.pipei
274 280
275 281 def _recv(self):
276 282 l = self.pipei.readline()
277 283 if l == '\n':
278 284 self.readerr()
279 285 msg = _('check previous remote output')
280 286 self._abort(error.OutOfBandError(hint=msg))
281 287 self.readerr()
282 288 try:
283 289 l = int(l)
284 290 except ValueError:
285 291 self._abort(error.ResponseError(_("unexpected response:"), l))
286 292 return self.pipei.read(l)
287 293
288 294 def _send(self, data, flush=False):
289 295 self.pipeo.write("%d\n" % len(data))
290 296 if data:
291 297 self.pipeo.write(data)
292 298 if flush:
293 299 self.pipeo.flush()
294 300 self.readerr()
295 301
296 302 def lock(self):
297 303 self._call("lock")
298 304 return remotelock(self)
299 305
300 306 def unlock(self):
301 307 self._call("unlock")
302 308
303 309 def addchangegroup(self, cg, source, url, lock=None):
304 310 '''Send a changegroup to the remote server. Return an integer
305 311 similar to unbundle(). DEPRECATED, since it requires locking the
306 312 remote.'''
307 313 d = self._call("addchangegroup")
308 314 if d:
309 315 self._abort(error.RepoError(_("push refused: %s") % d))
310 316 while True:
311 317 d = cg.read(4096)
312 318 if not d:
313 319 break
314 320 self.pipeo.write(d)
315 321 self.readerr()
316 322
317 323 self.pipeo.flush()
318 324
319 325 self.readerr()
320 326 r = self._recv()
321 327 if not r:
322 328 return 1
323 329 try:
324 330 return int(r)
325 331 except ValueError:
326 332 self._abort(error.ResponseError(_("unexpected response:"), r))
327 333
328 334 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now