##// END OF EJS Templates
sshpeer: make remotelock a context manager
Bryan O'Sullivan -
r27798:8953e963 default
parent child Browse files
Show More
@@ -1,342 +1,347 b''
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import re
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 error,
15 15 util,
16 16 wireproto,
17 17 )
18 18
19 19 class remotelock(object):
20 20 def __init__(self, repo):
21 21 self.repo = repo
22 22 def release(self):
23 23 self.repo.unlock()
24 24 self.repo = None
25 def __enter__(self):
26 return self
27 def __exit__(self, exc_type, exc_val, exc_tb):
28 if self.repo:
29 self.release()
25 30 def __del__(self):
26 31 if self.repo:
27 32 self.release()
28 33
29 34 def _serverquote(s):
30 35 if not s:
31 36 return s
32 37 '''quote a string for the remote shell ... which we assume is sh'''
33 38 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
34 39 return s
35 40 return "'%s'" % s.replace("'", "'\\''")
36 41
37 42 def _forwardoutput(ui, pipe):
38 43 """display all data currently available on pipe as remote output.
39 44
40 45 This is non blocking."""
41 46 s = util.readpipe(pipe)
42 47 if s:
43 48 for l in s.splitlines():
44 49 ui.status(_("remote: "), l, '\n')
45 50
46 51 class doublepipe(object):
47 52 """Operate a side-channel pipe in addition of a main one
48 53
49 54 The side-channel pipe contains server output to be forwarded to the user
50 55 input. The double pipe will behave as the "main" pipe, but will ensure the
51 56 content of the "side" pipe is properly processed while we wait for blocking
52 57 call on the "main" pipe.
53 58
54 59 If large amounts of data are read from "main", the forward will cease after
55 60 the first bytes start to appear. This simplifies the implementation
56 61 without affecting actual output of sshpeer too much as we rarely issue
57 62 large read for data not yet emitted by the server.
58 63
59 64 The main pipe is expected to be a 'bufferedinputpipe' from the util module
60 65 that handle all the os specific bites. This class lives in this module
61 66 because it focus on behavior specific to the ssh protocol."""
62 67
63 68 def __init__(self, ui, main, side):
64 69 self._ui = ui
65 70 self._main = main
66 71 self._side = side
67 72
68 73 def _wait(self):
69 74 """wait until some data are available on main or side
70 75
71 76 return a pair of boolean (ismainready, issideready)
72 77
73 78 (This will only wait for data if the setup is supported by `util.poll`)
74 79 """
75 80 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
76 81 return (True, True) # main has data, assume side is worth poking at.
77 82 fds = [self._main.fileno(), self._side.fileno()]
78 83 try:
79 84 act = util.poll(fds)
80 85 except NotImplementedError:
81 86 # non supported yet case, assume all have data.
82 87 act = fds
83 88 return (self._main.fileno() in act, self._side.fileno() in act)
84 89
85 90 def write(self, data):
86 91 return self._call('write', data)
87 92
88 93 def read(self, size):
89 94 return self._call('read', size)
90 95
91 96 def readline(self):
92 97 return self._call('readline')
93 98
94 99 def _call(self, methname, data=None):
95 100 """call <methname> on "main", forward output of "side" while blocking
96 101 """
97 102 # data can be '' or 0
98 103 if (data is not None and not data) or self._main.closed:
99 104 _forwardoutput(self._ui, self._side)
100 105 return ''
101 106 while True:
102 107 mainready, sideready = self._wait()
103 108 if sideready:
104 109 _forwardoutput(self._ui, self._side)
105 110 if mainready:
106 111 meth = getattr(self._main, methname)
107 112 if data is None:
108 113 return meth()
109 114 else:
110 115 return meth(data)
111 116
112 117 def close(self):
113 118 return self._main.close()
114 119
115 120 def flush(self):
116 121 return self._main.flush()
117 122
118 123 class sshpeer(wireproto.wirepeer):
119 124 def __init__(self, ui, path, create=False):
120 125 self._url = path
121 126 self.ui = ui
122 127 self.pipeo = self.pipei = self.pipee = None
123 128
124 129 u = util.url(path, parsequery=False, parsefragment=False)
125 130 if u.scheme != 'ssh' or not u.host or u.path is None:
126 131 self._abort(error.RepoError(_("couldn't parse location %s") % path))
127 132
128 133 self.user = u.user
129 134 if u.passwd is not None:
130 135 self._abort(error.RepoError(_("password in URL not supported")))
131 136 self.host = u.host
132 137 self.port = u.port
133 138 self.path = u.path or "."
134 139
135 140 sshcmd = self.ui.config("ui", "ssh", "ssh")
136 141 remotecmd = self.ui.config("ui", "remotecmd", "hg")
137 142
138 143 args = util.sshargs(sshcmd,
139 144 _serverquote(self.host),
140 145 _serverquote(self.user),
141 146 _serverquote(self.port))
142 147
143 148 if create:
144 149 cmd = '%s %s %s' % (sshcmd, args,
145 150 util.shellquote("%s init %s" %
146 151 (_serverquote(remotecmd), _serverquote(self.path))))
147 152 ui.debug('running %s\n' % cmd)
148 153 res = ui.system(cmd)
149 154 if res != 0:
150 155 self._abort(error.RepoError(_("could not create remote repo")))
151 156
152 157 self._validaterepo(sshcmd, args, remotecmd)
153 158
154 159 def url(self):
155 160 return self._url
156 161
157 162 def _validaterepo(self, sshcmd, args, remotecmd):
158 163 # cleanup up previous run
159 164 self.cleanup()
160 165
161 166 cmd = '%s %s %s' % (sshcmd, args,
162 167 util.shellquote("%s -R %s serve --stdio" %
163 168 (_serverquote(remotecmd), _serverquote(self.path))))
164 169 self.ui.debug('running %s\n' % cmd)
165 170 cmd = util.quotecommand(cmd)
166 171
167 172 # while self.subprocess isn't used, having it allows the subprocess to
168 173 # to clean up correctly later
169 174 #
170 175 # no buffer allow the use of 'select'
171 176 # feel free to remove buffering and select usage when we ultimately
172 177 # move to threading.
173 178 sub = util.popen4(cmd, bufsize=0)
174 179 self.pipeo, self.pipei, self.pipee, self.subprocess = sub
175 180
176 181 self.pipei = util.bufferedinputpipe(self.pipei)
177 182 self.pipei = doublepipe(self.ui, self.pipei, self.pipee)
178 183 self.pipeo = doublepipe(self.ui, self.pipeo, self.pipee)
179 184
180 185 # skip any noise generated by remote shell
181 186 self._callstream("hello")
182 187 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
183 188 lines = ["", "dummy"]
184 189 max_noise = 500
185 190 while lines[-1] and max_noise:
186 191 l = r.readline()
187 192 self.readerr()
188 193 if lines[-1] == "1\n" and l == "\n":
189 194 break
190 195 if l:
191 196 self.ui.debug("remote: ", l)
192 197 lines.append(l)
193 198 max_noise -= 1
194 199 else:
195 200 self._abort(error.RepoError(_('no suitable response from '
196 201 'remote hg')))
197 202
198 203 self._caps = set()
199 204 for l in reversed(lines):
200 205 if l.startswith("capabilities:"):
201 206 self._caps.update(l[:-1].split(":")[1].split())
202 207 break
203 208
204 209 def _capabilities(self):
205 210 return self._caps
206 211
207 212 def readerr(self):
208 213 _forwardoutput(self.ui, self.pipee)
209 214
210 215 def _abort(self, exception):
211 216 self.cleanup()
212 217 raise exception
213 218
214 219 def cleanup(self):
215 220 if self.pipeo is None:
216 221 return
217 222 self.pipeo.close()
218 223 self.pipei.close()
219 224 try:
220 225 # read the error descriptor until EOF
221 226 for l in self.pipee:
222 227 self.ui.status(_("remote: "), l)
223 228 except (IOError, ValueError):
224 229 pass
225 230 self.pipee.close()
226 231
227 232 __del__ = cleanup
228 233
229 234 def _callstream(self, cmd, **args):
230 235 self.ui.debug("sending %s command\n" % cmd)
231 236 self.pipeo.write("%s\n" % cmd)
232 237 _func, names = wireproto.commands[cmd]
233 238 keys = names.split()
234 239 wireargs = {}
235 240 for k in keys:
236 241 if k == '*':
237 242 wireargs['*'] = args
238 243 break
239 244 else:
240 245 wireargs[k] = args[k]
241 246 del args[k]
242 247 for k, v in sorted(wireargs.iteritems()):
243 248 self.pipeo.write("%s %d\n" % (k, len(v)))
244 249 if isinstance(v, dict):
245 250 for dk, dv in v.iteritems():
246 251 self.pipeo.write("%s %d\n" % (dk, len(dv)))
247 252 self.pipeo.write(dv)
248 253 else:
249 254 self.pipeo.write(v)
250 255 self.pipeo.flush()
251 256
252 257 return self.pipei
253 258
254 259 def _callcompressable(self, cmd, **args):
255 260 return self._callstream(cmd, **args)
256 261
257 262 def _call(self, cmd, **args):
258 263 self._callstream(cmd, **args)
259 264 return self._recv()
260 265
261 266 def _callpush(self, cmd, fp, **args):
262 267 r = self._call(cmd, **args)
263 268 if r:
264 269 return '', r
265 270 while True:
266 271 d = fp.read(4096)
267 272 if not d:
268 273 break
269 274 self._send(d)
270 275 self._send("", flush=True)
271 276 r = self._recv()
272 277 if r:
273 278 return '', r
274 279 return self._recv(), ''
275 280
276 281 def _calltwowaystream(self, cmd, fp, **args):
277 282 r = self._call(cmd, **args)
278 283 if r:
279 284 # XXX needs to be made better
280 285 raise error.Abort('unexpected remote reply: %s' % r)
281 286 while True:
282 287 d = fp.read(4096)
283 288 if not d:
284 289 break
285 290 self._send(d)
286 291 self._send("", flush=True)
287 292 return self.pipei
288 293
289 294 def _recv(self):
290 295 l = self.pipei.readline()
291 296 if l == '\n':
292 297 self.readerr()
293 298 msg = _('check previous remote output')
294 299 self._abort(error.OutOfBandError(hint=msg))
295 300 self.readerr()
296 301 try:
297 302 l = int(l)
298 303 except ValueError:
299 304 self._abort(error.ResponseError(_("unexpected response:"), l))
300 305 return self.pipei.read(l)
301 306
302 307 def _send(self, data, flush=False):
303 308 self.pipeo.write("%d\n" % len(data))
304 309 if data:
305 310 self.pipeo.write(data)
306 311 if flush:
307 312 self.pipeo.flush()
308 313 self.readerr()
309 314
310 315 def lock(self):
311 316 self._call("lock")
312 317 return remotelock(self)
313 318
314 319 def unlock(self):
315 320 self._call("unlock")
316 321
317 322 def addchangegroup(self, cg, source, url, lock=None):
318 323 '''Send a changegroup to the remote server. Return an integer
319 324 similar to unbundle(). DEPRECATED, since it requires locking the
320 325 remote.'''
321 326 d = self._call("addchangegroup")
322 327 if d:
323 328 self._abort(error.RepoError(_("push refused: %s") % d))
324 329 while True:
325 330 d = cg.read(4096)
326 331 if not d:
327 332 break
328 333 self.pipeo.write(d)
329 334 self.readerr()
330 335
331 336 self.pipeo.flush()
332 337
333 338 self.readerr()
334 339 r = self._recv()
335 340 if not r:
336 341 return 1
337 342 try:
338 343 return int(r)
339 344 except ValueError:
340 345 self._abort(error.ResponseError(_("unexpected response:"), r))
341 346
342 347 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now