##// END OF EJS Templates
sshpeer: extract the forward output logic...
Pierre-Yves David -
r25244:cf90764f default
parent child Browse files
Show More
@@ -1,248 +1,254 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import re
8 import re
9 from i18n import _
9 from i18n import _
10 import util, error, wireproto
10 import util, error, wireproto
11
11
12 class remotelock(object):
12 class remotelock(object):
13 def __init__(self, repo):
13 def __init__(self, repo):
14 self.repo = repo
14 self.repo = repo
15 def release(self):
15 def release(self):
16 self.repo.unlock()
16 self.repo.unlock()
17 self.repo = None
17 self.repo = None
18 def __del__(self):
18 def __del__(self):
19 if self.repo:
19 if self.repo:
20 self.release()
20 self.release()
21
21
22 def _serverquote(s):
22 def _serverquote(s):
23 if not s:
23 if not s:
24 return s
24 return s
25 '''quote a string for the remote shell ... which we assume is sh'''
25 '''quote a string for the remote shell ... which we assume is sh'''
26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
27 return s
27 return s
28 return "'%s'" % s.replace("'", "'\\''")
28 return "'%s'" % s.replace("'", "'\\''")
29
29
30 def _forwardoutput(ui, pipe):
31 """display all data currently available on pipe as remote output.
32
33 This is non blocking."""
34 s = util.readpipe(pipe)
35 if s:
36 for l in s.splitlines():
37 ui.status(_("remote: "), l, '\n')
38
30 class sshpeer(wireproto.wirepeer):
39 class sshpeer(wireproto.wirepeer):
31 def __init__(self, ui, path, create=False):
40 def __init__(self, ui, path, create=False):
32 self._url = path
41 self._url = path
33 self.ui = ui
42 self.ui = ui
34 self.pipeo = self.pipei = self.pipee = None
43 self.pipeo = self.pipei = self.pipee = None
35
44
36 u = util.url(path, parsequery=False, parsefragment=False)
45 u = util.url(path, parsequery=False, parsefragment=False)
37 if u.scheme != 'ssh' or not u.host or u.path is None:
46 if u.scheme != 'ssh' or not u.host or u.path is None:
38 self._abort(error.RepoError(_("couldn't parse location %s") % path))
47 self._abort(error.RepoError(_("couldn't parse location %s") % path))
39
48
40 self.user = u.user
49 self.user = u.user
41 if u.passwd is not None:
50 if u.passwd is not None:
42 self._abort(error.RepoError(_("password in URL not supported")))
51 self._abort(error.RepoError(_("password in URL not supported")))
43 self.host = u.host
52 self.host = u.host
44 self.port = u.port
53 self.port = u.port
45 self.path = u.path or "."
54 self.path = u.path or "."
46
55
47 sshcmd = self.ui.config("ui", "ssh", "ssh")
56 sshcmd = self.ui.config("ui", "ssh", "ssh")
48 remotecmd = self.ui.config("ui", "remotecmd", "hg")
57 remotecmd = self.ui.config("ui", "remotecmd", "hg")
49
58
50 args = util.sshargs(sshcmd,
59 args = util.sshargs(sshcmd,
51 _serverquote(self.host),
60 _serverquote(self.host),
52 _serverquote(self.user),
61 _serverquote(self.user),
53 _serverquote(self.port))
62 _serverquote(self.port))
54
63
55 if create:
64 if create:
56 cmd = '%s %s %s' % (sshcmd, args,
65 cmd = '%s %s %s' % (sshcmd, args,
57 util.shellquote("%s init %s" %
66 util.shellquote("%s init %s" %
58 (_serverquote(remotecmd), _serverquote(self.path))))
67 (_serverquote(remotecmd), _serverquote(self.path))))
59 ui.debug('running %s\n' % cmd)
68 ui.debug('running %s\n' % cmd)
60 res = ui.system(cmd)
69 res = ui.system(cmd)
61 if res != 0:
70 if res != 0:
62 self._abort(error.RepoError(_("could not create remote repo")))
71 self._abort(error.RepoError(_("could not create remote repo")))
63
72
64 self._validaterepo(sshcmd, args, remotecmd)
73 self._validaterepo(sshcmd, args, remotecmd)
65
74
66 def url(self):
75 def url(self):
67 return self._url
76 return self._url
68
77
69 def _validaterepo(self, sshcmd, args, remotecmd):
78 def _validaterepo(self, sshcmd, args, remotecmd):
70 # cleanup up previous run
79 # cleanup up previous run
71 self.cleanup()
80 self.cleanup()
72
81
73 cmd = '%s %s %s' % (sshcmd, args,
82 cmd = '%s %s %s' % (sshcmd, args,
74 util.shellquote("%s -R %s serve --stdio" %
83 util.shellquote("%s -R %s serve --stdio" %
75 (_serverquote(remotecmd), _serverquote(self.path))))
84 (_serverquote(remotecmd), _serverquote(self.path))))
76 self.ui.debug('running %s\n' % cmd)
85 self.ui.debug('running %s\n' % cmd)
77 cmd = util.quotecommand(cmd)
86 cmd = util.quotecommand(cmd)
78
87
79 # while self.subprocess isn't used, having it allows the subprocess to
88 # while self.subprocess isn't used, having it allows the subprocess to
80 # to clean up correctly later
89 # to clean up correctly later
81 self.pipeo, self.pipei, self.pipee, self.subprocess = util.popen4(cmd)
90 self.pipeo, self.pipei, self.pipee, self.subprocess = util.popen4(cmd)
82
91
83 # skip any noise generated by remote shell
92 # skip any noise generated by remote shell
84 self._callstream("hello")
93 self._callstream("hello")
85 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
94 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
86 lines = ["", "dummy"]
95 lines = ["", "dummy"]
87 max_noise = 500
96 max_noise = 500
88 while lines[-1] and max_noise:
97 while lines[-1] and max_noise:
89 l = r.readline()
98 l = r.readline()
90 self.readerr()
99 self.readerr()
91 if lines[-1] == "1\n" and l == "\n":
100 if lines[-1] == "1\n" and l == "\n":
92 break
101 break
93 if l:
102 if l:
94 self.ui.debug("remote: ", l)
103 self.ui.debug("remote: ", l)
95 lines.append(l)
104 lines.append(l)
96 max_noise -= 1
105 max_noise -= 1
97 else:
106 else:
98 self._abort(error.RepoError(_('no suitable response from '
107 self._abort(error.RepoError(_('no suitable response from '
99 'remote hg')))
108 'remote hg')))
100
109
101 self._caps = set()
110 self._caps = set()
102 for l in reversed(lines):
111 for l in reversed(lines):
103 if l.startswith("capabilities:"):
112 if l.startswith("capabilities:"):
104 self._caps.update(l[:-1].split(":")[1].split())
113 self._caps.update(l[:-1].split(":")[1].split())
105 break
114 break
106
115
107 def _capabilities(self):
116 def _capabilities(self):
108 return self._caps
117 return self._caps
109
118
110 def readerr(self):
119 def readerr(self):
111 s = util.readpipe(self.pipee)
120 _forwardoutput(self.ui, self.pipee)
112 if s:
113 for l in s.splitlines():
114 self.ui.status(_("remote: "), l, '\n')
115
121
116 def _abort(self, exception):
122 def _abort(self, exception):
117 self.cleanup()
123 self.cleanup()
118 raise exception
124 raise exception
119
125
120 def cleanup(self):
126 def cleanup(self):
121 if self.pipeo is None:
127 if self.pipeo is None:
122 return
128 return
123 self.pipeo.close()
129 self.pipeo.close()
124 self.pipei.close()
130 self.pipei.close()
125 try:
131 try:
126 # read the error descriptor until EOF
132 # read the error descriptor until EOF
127 for l in self.pipee:
133 for l in self.pipee:
128 self.ui.status(_("remote: "), l)
134 self.ui.status(_("remote: "), l)
129 except (IOError, ValueError):
135 except (IOError, ValueError):
130 pass
136 pass
131 self.pipee.close()
137 self.pipee.close()
132
138
133 __del__ = cleanup
139 __del__ = cleanup
134
140
135 def _callstream(self, cmd, **args):
141 def _callstream(self, cmd, **args):
136 self.ui.debug("sending %s command\n" % cmd)
142 self.ui.debug("sending %s command\n" % cmd)
137 self.pipeo.write("%s\n" % cmd)
143 self.pipeo.write("%s\n" % cmd)
138 _func, names = wireproto.commands[cmd]
144 _func, names = wireproto.commands[cmd]
139 keys = names.split()
145 keys = names.split()
140 wireargs = {}
146 wireargs = {}
141 for k in keys:
147 for k in keys:
142 if k == '*':
148 if k == '*':
143 wireargs['*'] = args
149 wireargs['*'] = args
144 break
150 break
145 else:
151 else:
146 wireargs[k] = args[k]
152 wireargs[k] = args[k]
147 del args[k]
153 del args[k]
148 for k, v in sorted(wireargs.iteritems()):
154 for k, v in sorted(wireargs.iteritems()):
149 self.pipeo.write("%s %d\n" % (k, len(v)))
155 self.pipeo.write("%s %d\n" % (k, len(v)))
150 if isinstance(v, dict):
156 if isinstance(v, dict):
151 for dk, dv in v.iteritems():
157 for dk, dv in v.iteritems():
152 self.pipeo.write("%s %d\n" % (dk, len(dv)))
158 self.pipeo.write("%s %d\n" % (dk, len(dv)))
153 self.pipeo.write(dv)
159 self.pipeo.write(dv)
154 else:
160 else:
155 self.pipeo.write(v)
161 self.pipeo.write(v)
156 self.pipeo.flush()
162 self.pipeo.flush()
157
163
158 return self.pipei
164 return self.pipei
159
165
160 def _callcompressable(self, cmd, **args):
166 def _callcompressable(self, cmd, **args):
161 return self._callstream(cmd, **args)
167 return self._callstream(cmd, **args)
162
168
163 def _call(self, cmd, **args):
169 def _call(self, cmd, **args):
164 self._callstream(cmd, **args)
170 self._callstream(cmd, **args)
165 return self._recv()
171 return self._recv()
166
172
167 def _callpush(self, cmd, fp, **args):
173 def _callpush(self, cmd, fp, **args):
168 r = self._call(cmd, **args)
174 r = self._call(cmd, **args)
169 if r:
175 if r:
170 return '', r
176 return '', r
171 while True:
177 while True:
172 d = fp.read(4096)
178 d = fp.read(4096)
173 if not d:
179 if not d:
174 break
180 break
175 self._send(d)
181 self._send(d)
176 self._send("", flush=True)
182 self._send("", flush=True)
177 r = self._recv()
183 r = self._recv()
178 if r:
184 if r:
179 return '', r
185 return '', r
180 return self._recv(), ''
186 return self._recv(), ''
181
187
182 def _calltwowaystream(self, cmd, fp, **args):
188 def _calltwowaystream(self, cmd, fp, **args):
183 r = self._call(cmd, **args)
189 r = self._call(cmd, **args)
184 if r:
190 if r:
185 # XXX needs to be made better
191 # XXX needs to be made better
186 raise util.Abort('unexpected remote reply: %s' % r)
192 raise util.Abort('unexpected remote reply: %s' % r)
187 while True:
193 while True:
188 d = fp.read(4096)
194 d = fp.read(4096)
189 if not d:
195 if not d:
190 break
196 break
191 self._send(d)
197 self._send(d)
192 self._send("", flush=True)
198 self._send("", flush=True)
193 return self.pipei
199 return self.pipei
194
200
195 def _recv(self):
201 def _recv(self):
196 l = self.pipei.readline()
202 l = self.pipei.readline()
197 if l == '\n':
203 if l == '\n':
198 self.readerr()
204 self.readerr()
199 msg = _('check previous remote output')
205 msg = _('check previous remote output')
200 self._abort(error.OutOfBandError(hint=msg))
206 self._abort(error.OutOfBandError(hint=msg))
201 self.readerr()
207 self.readerr()
202 try:
208 try:
203 l = int(l)
209 l = int(l)
204 except ValueError:
210 except ValueError:
205 self._abort(error.ResponseError(_("unexpected response:"), l))
211 self._abort(error.ResponseError(_("unexpected response:"), l))
206 return self.pipei.read(l)
212 return self.pipei.read(l)
207
213
208 def _send(self, data, flush=False):
214 def _send(self, data, flush=False):
209 self.pipeo.write("%d\n" % len(data))
215 self.pipeo.write("%d\n" % len(data))
210 if data:
216 if data:
211 self.pipeo.write(data)
217 self.pipeo.write(data)
212 if flush:
218 if flush:
213 self.pipeo.flush()
219 self.pipeo.flush()
214 self.readerr()
220 self.readerr()
215
221
216 def lock(self):
222 def lock(self):
217 self._call("lock")
223 self._call("lock")
218 return remotelock(self)
224 return remotelock(self)
219
225
220 def unlock(self):
226 def unlock(self):
221 self._call("unlock")
227 self._call("unlock")
222
228
223 def addchangegroup(self, cg, source, url, lock=None):
229 def addchangegroup(self, cg, source, url, lock=None):
224 '''Send a changegroup to the remote server. Return an integer
230 '''Send a changegroup to the remote server. Return an integer
225 similar to unbundle(). DEPRECATED, since it requires locking the
231 similar to unbundle(). DEPRECATED, since it requires locking the
226 remote.'''
232 remote.'''
227 d = self._call("addchangegroup")
233 d = self._call("addchangegroup")
228 if d:
234 if d:
229 self._abort(error.RepoError(_("push refused: %s") % d))
235 self._abort(error.RepoError(_("push refused: %s") % d))
230 while True:
236 while True:
231 d = cg.read(4096)
237 d = cg.read(4096)
232 if not d:
238 if not d:
233 break
239 break
234 self.pipeo.write(d)
240 self.pipeo.write(d)
235 self.readerr()
241 self.readerr()
236
242
237 self.pipeo.flush()
243 self.pipeo.flush()
238
244
239 self.readerr()
245 self.readerr()
240 r = self._recv()
246 r = self._recv()
241 if not r:
247 if not r:
242 return 1
248 return 1
243 try:
249 try:
244 return int(r)
250 return int(r)
245 except ValueError:
251 except ValueError:
246 self._abort(error.ResponseError(_("unexpected response:"), r))
252 self._abort(error.ResponseError(_("unexpected response:"), r))
247
253
248 instance = sshpeer
254 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now