##// END OF EJS Templates
sshpeer: rename 'size' to 'data' in doublepipe...
Pierre-Yves David -
r25455:dc02a284 default
parent child Browse files
Show More
@@ -1,327 +1,328 b''
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import re
9 9 from i18n import _
10 10 import util, error, wireproto
11 11
12 12 class remotelock(object):
13 13 def __init__(self, repo):
14 14 self.repo = repo
15 15 def release(self):
16 16 self.repo.unlock()
17 17 self.repo = None
18 18 def __del__(self):
19 19 if self.repo:
20 20 self.release()
21 21
22 22 def _serverquote(s):
23 23 if not s:
24 24 return s
25 25 '''quote a string for the remote shell ... which we assume is sh'''
26 26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
27 27 return s
28 28 return "'%s'" % s.replace("'", "'\\''")
29 29
30 30 def _forwardoutput(ui, pipe):
31 31 """display all data currently available on pipe as remote output.
32 32
33 33 This is non blocking."""
34 34 s = util.readpipe(pipe)
35 35 if s:
36 36 for l in s.splitlines():
37 37 ui.status(_("remote: "), l, '\n')
38 38
39 39 class doublepipe(object):
40 40 """Operate a side-channel pipe in addition of a main one
41 41
42 42 The side-channel pipe contains server output to be forwarded to the user
43 43 input. The double pipe will behave as the "main" pipe, but will ensure the
44 44 content of the "side" pipe is properly processed while we wait for blocking
45 45 call on the "main" pipe.
46 46
47 47 If large amounts of data are read from "main", the forward will cease after
48 48 the first bytes start to appear. This simplifies the implementation
49 49 without affecting actual output of sshpeer too much as we rarely issue
50 50 large read for data not yet emitted by the server.
51 51
52 52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
53 53 that handle all the os specific bites. This class lives in this module
54 54 because it focus on behavior specifig to the ssh protocol."""
55 55
56 56 def __init__(self, ui, main, side):
57 57 self._ui = ui
58 58 self._main = main
59 59 self._side = side
60 60
61 61 def _wait(self):
62 62 """wait until some data are available on main or side
63 63
64 64 return a pair of boolean (ismainready, issideready)
65 65
66 66 (This will only wait for data if the setup is supported by `util.poll`)
67 67 """
68 68 if self._main.hasbuffer:
69 69 return (True, True) # main has data, assume side is worth poking at.
70 70 fds = [self._main.fileno(), self._side.fileno()]
71 71 try:
72 72 act = util.poll(fds)
73 73 except NotImplementedError:
74 74 # non supported yet case, assume all have data.
75 75 act = fds
76 76 return (self._main.fileno() in act, self._side.fileno() in act)
77 77
78 78 def read(self, size):
79 79 return self._call('read', size)
80 80
81 81 def readline(self):
82 82 return self._call('readline')
83 83
84 def _call(self, methname, size=None):
84 def _call(self, methname, data=None):
85 85 """call <methname> on "main", forward output of "side" while blocking
86 86 """
87 if size == 0 or self._main.closed:
87 # data can be '' or 0
88 if (data is not None and not data) or self._main.closed:
88 89 _forwardoutput(self._ui, self._side)
89 90 return ''
90 91 while True:
91 92 mainready, sideready = self._wait()
92 93 if sideready:
93 94 _forwardoutput(self._ui, self._side)
94 95 if mainready:
95 96 meth = getattr(self._main, methname)
96 if size is None:
97 if data is None:
97 98 return meth()
98 99 else:
99 return meth(size)
100 return meth(data)
100 101
101 102 def close(self):
102 103 return self._main.close()
103 104
104 105 class sshpeer(wireproto.wirepeer):
105 106 def __init__(self, ui, path, create=False):
106 107 self._url = path
107 108 self.ui = ui
108 109 self.pipeo = self.pipei = self.pipee = None
109 110
110 111 u = util.url(path, parsequery=False, parsefragment=False)
111 112 if u.scheme != 'ssh' or not u.host or u.path is None:
112 113 self._abort(error.RepoError(_("couldn't parse location %s") % path))
113 114
114 115 self.user = u.user
115 116 if u.passwd is not None:
116 117 self._abort(error.RepoError(_("password in URL not supported")))
117 118 self.host = u.host
118 119 self.port = u.port
119 120 self.path = u.path or "."
120 121
121 122 sshcmd = self.ui.config("ui", "ssh", "ssh")
122 123 remotecmd = self.ui.config("ui", "remotecmd", "hg")
123 124
124 125 args = util.sshargs(sshcmd,
125 126 _serverquote(self.host),
126 127 _serverquote(self.user),
127 128 _serverquote(self.port))
128 129
129 130 if create:
130 131 cmd = '%s %s %s' % (sshcmd, args,
131 132 util.shellquote("%s init %s" %
132 133 (_serverquote(remotecmd), _serverquote(self.path))))
133 134 ui.debug('running %s\n' % cmd)
134 135 res = ui.system(cmd)
135 136 if res != 0:
136 137 self._abort(error.RepoError(_("could not create remote repo")))
137 138
138 139 self._validaterepo(sshcmd, args, remotecmd)
139 140
140 141 def url(self):
141 142 return self._url
142 143
143 144 def _validaterepo(self, sshcmd, args, remotecmd):
144 145 # cleanup up previous run
145 146 self.cleanup()
146 147
147 148 cmd = '%s %s %s' % (sshcmd, args,
148 149 util.shellquote("%s -R %s serve --stdio" %
149 150 (_serverquote(remotecmd), _serverquote(self.path))))
150 151 self.ui.debug('running %s\n' % cmd)
151 152 cmd = util.quotecommand(cmd)
152 153
153 154 # while self.subprocess isn't used, having it allows the subprocess to
154 155 # to clean up correctly later
155 156 #
156 157 # no buffer allow the use of 'select'
157 158 # feel free to remove buffering and select usage when we ultimately
158 159 # move to threading.
159 160 sub = util.popen4(cmd, bufsize=0)
160 161 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
161 162
162 163 self.pipei = util.bufferedinputpipe(self.pipei)
163 164 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
164 165
165 166 # skip any noise generated by remote shell
166 167 self._callstream("hello")
167 168 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
168 169 lines = ["", "dummy"]
169 170 max_noise = 500
170 171 while lines[-1] and max_noise:
171 172 l = r.readline()
172 173 self.readerr()
173 174 if lines[-1] == "1\n" and l == "\n":
174 175 break
175 176 if l:
176 177 self.ui.debug("remote: ", l)
177 178 lines.append(l)
178 179 max_noise -= 1
179 180 else:
180 181 self._abort(error.RepoError(_('no suitable response from '
181 182 'remote hg')))
182 183
183 184 self._caps = set()
184 185 for l in reversed(lines):
185 186 if l.startswith("capabilities:"):
186 187 self._caps.update(l[:-1].split(":")[1].split())
187 188 break
188 189
189 190 def _capabilities(self):
190 191 return self._caps
191 192
192 193 def readerr(self):
193 194 _forwardoutput(self.ui, self.pipee)
194 195
195 196 def _abort(self, exception):
196 197 self.cleanup()
197 198 raise exception
198 199
199 200 def cleanup(self):
200 201 if self.pipeo is None:
201 202 return
202 203 self.pipeo.close()
203 204 self.pipei.close()
204 205 try:
205 206 # read the error descriptor until EOF
206 207 for l in self.pipee:
207 208 self.ui.status(_("remote: "), l)
208 209 except (IOError, ValueError):
209 210 pass
210 211 self.pipee.close()
211 212
212 213 __del__ = cleanup
213 214
214 215 def _callstream(self, cmd, **args):
215 216 self.ui.debug("sending %s command\n" % cmd)
216 217 self.pipeo.write("%s\n" % cmd)
217 218 _func, names = wireproto.commands[cmd]
218 219 keys = names.split()
219 220 wireargs = {}
220 221 for k in keys:
221 222 if k == '*':
222 223 wireargs['*'] = args
223 224 break
224 225 else:
225 226 wireargs[k] = args[k]
226 227 del args[k]
227 228 for k, v in sorted(wireargs.iteritems()):
228 229 self.pipeo.write("%s %d\n" % (k, len(v)))
229 230 if isinstance(v, dict):
230 231 for dk, dv in v.iteritems():
231 232 self.pipeo.write("%s %d\n" % (dk, len(dv)))
232 233 self.pipeo.write(dv)
233 234 else:
234 235 self.pipeo.write(v)
235 236 self.pipeo.flush()
236 237
237 238 return self.pipei
238 239
239 240 def _callcompressable(self, cmd, **args):
240 241 return self._callstream(cmd, **args)
241 242
242 243 def _call(self, cmd, **args):
243 244 self._callstream(cmd, **args)
244 245 return self._recv()
245 246
246 247 def _callpush(self, cmd, fp, **args):
247 248 r = self._call(cmd, **args)
248 249 if r:
249 250 return '', r
250 251 while True:
251 252 d = fp.read(4096)
252 253 if not d:
253 254 break
254 255 self._send(d)
255 256 self._send("", flush=True)
256 257 r = self._recv()
257 258 if r:
258 259 return '', r
259 260 return self._recv(), ''
260 261
261 262 def _calltwowaystream(self, cmd, fp, **args):
262 263 r = self._call(cmd, **args)
263 264 if r:
264 265 # XXX needs to be made better
265 266 raise util.Abort('unexpected remote reply: %s' % r)
266 267 while True:
267 268 d = fp.read(4096)
268 269 if not d:
269 270 break
270 271 self._send(d)
271 272 self._send("", flush=True)
272 273 return self.pipei
273 274
274 275 def _recv(self):
275 276 l = self.pipei.readline()
276 277 if l == '\n':
277 278 self.readerr()
278 279 msg = _('check previous remote output')
279 280 self._abort(error.OutOfBandError(hint=msg))
280 281 self.readerr()
281 282 try:
282 283 l = int(l)
283 284 except ValueError:
284 285 self._abort(error.ResponseError(_("unexpected response:"), l))
285 286 return self.pipei.read(l)
286 287
287 288 def _send(self, data, flush=False):
288 289 self.pipeo.write("%d\n" % len(data))
289 290 if data:
290 291 self.pipeo.write(data)
291 292 if flush:
292 293 self.pipeo.flush()
293 294 self.readerr()
294 295
295 296 def lock(self):
296 297 self._call("lock")
297 298 return remotelock(self)
298 299
299 300 def unlock(self):
300 301 self._call("unlock")
301 302
302 303 def addchangegroup(self, cg, source, url, lock=None):
303 304 '''Send a changegroup to the remote server. Return an integer
304 305 similar to unbundle(). DEPRECATED, since it requires locking the
305 306 remote.'''
306 307 d = self._call("addchangegroup")
307 308 if d:
308 309 self._abort(error.RepoError(_("push refused: %s") % d))
309 310 while True:
310 311 d = cg.read(4096)
311 312 if not d:
312 313 break
313 314 self.pipeo.write(d)
314 315 self.readerr()
315 316
316 317 self.pipeo.flush()
317 318
318 319 self.readerr()
319 320 r = self._recv()
320 321 if not r:
321 322 return 1
322 323 try:
323 324 return int(r)
324 325 except ValueError:
325 326 self._abort(error.ResponseError(_("unexpected response:"), r))
326 327
327 328 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now