##// END OF EJS Templates
sshpeer: extract the forward output logic...
Pierre-Yves David -
r25244:cf90764f default
parent child Browse files
Show More
@@ -1,248 +1,254 b''
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import re
9 9 from i18n import _
10 10 import util, error, wireproto
11 11
12 12 class remotelock(object):
13 13 def __init__(self, repo):
14 14 self.repo = repo
15 15 def release(self):
16 16 self.repo.unlock()
17 17 self.repo = None
18 18 def __del__(self):
19 19 if self.repo:
20 20 self.release()
21 21
22 22 def _serverquote(s):
23 23 if not s:
24 24 return s
25 25 '''quote a string for the remote shell ... which we assume is sh'''
26 26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
27 27 return s
28 28 return "'%s'" % s.replace("'", "'\\''")
29 29
30 def _forwardoutput(ui, pipe):
31 """display all data currently available on pipe as remote output.
32
33 This is non blocking."""
34 s = util.readpipe(pipe)
35 if s:
36 for l in s.splitlines():
37 ui.status(_("remote: "), l, '\n')
38
30 39 class sshpeer(wireproto.wirepeer):
31 40 def __init__(self, ui, path, create=False):
32 41 self._url = path
33 42 self.ui = ui
34 43 self.pipeo = self.pipei = self.pipee = None
35 44
36 45 u = util.url(path, parsequery=False, parsefragment=False)
37 46 if u.scheme != 'ssh' or not u.host or u.path is None:
38 47 self._abort(error.RepoError(_("couldn't parse location %s") % path))
39 48
40 49 self.user = u.user
41 50 if u.passwd is not None:
42 51 self._abort(error.RepoError(_("password in URL not supported")))
43 52 self.host = u.host
44 53 self.port = u.port
45 54 self.path = u.path or "."
46 55
47 56 sshcmd = self.ui.config("ui", "ssh", "ssh")
48 57 remotecmd = self.ui.config("ui", "remotecmd", "hg")
49 58
50 59 args = util.sshargs(sshcmd,
51 60 _serverquote(self.host),
52 61 _serverquote(self.user),
53 62 _serverquote(self.port))
54 63
55 64 if create:
56 65 cmd = '%s %s %s' % (sshcmd, args,
57 66 util.shellquote("%s init %s" %
58 67 (_serverquote(remotecmd), _serverquote(self.path))))
59 68 ui.debug('running %s\n' % cmd)
60 69 res = ui.system(cmd)
61 70 if res != 0:
62 71 self._abort(error.RepoError(_("could not create remote repo")))
63 72
64 73 self._validaterepo(sshcmd, args, remotecmd)
65 74
66 75 def url(self):
67 76 return self._url
68 77
69 78 def _validaterepo(self, sshcmd, args, remotecmd):
70 79 # cleanup up previous run
71 80 self.cleanup()
72 81
73 82 cmd = '%s %s %s' % (sshcmd, args,
74 83 util.shellquote("%s -R %s serve --stdio" %
75 84 (_serverquote(remotecmd), _serverquote(self.path))))
76 85 self.ui.debug('running %s\n' % cmd)
77 86 cmd = util.quotecommand(cmd)
78 87
79 88 # while self.subprocess isn't used, having it allows the subprocess to
80 89 # to clean up correctly later
81 90 self.pipeo, self.pipei, self.pipee, self.subprocess = util.popen4(cmd)
82 91
83 92 # skip any noise generated by remote shell
84 93 self._callstream("hello")
85 94 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
86 95 lines = ["", "dummy"]
87 96 max_noise = 500
88 97 while lines[-1] and max_noise:
89 98 l = r.readline()
90 99 self.readerr()
91 100 if lines[-1] == "1\n" and l == "\n":
92 101 break
93 102 if l:
94 103 self.ui.debug("remote: ", l)
95 104 lines.append(l)
96 105 max_noise -= 1
97 106 else:
98 107 self._abort(error.RepoError(_('no suitable response from '
99 108 'remote hg')))
100 109
101 110 self._caps = set()
102 111 for l in reversed(lines):
103 112 if l.startswith("capabilities:"):
104 113 self._caps.update(l[:-1].split(":")[1].split())
105 114 break
106 115
107 116 def _capabilities(self):
108 117 return self._caps
109 118
110 119 def readerr(self):
111 s = util.readpipe(self.pipee)
112 if s:
113 for l in s.splitlines():
114 self.ui.status(_("remote: "), l, '\n')
120 _forwardoutput(self.ui, self.pipee)
115 121
116 122 def _abort(self, exception):
117 123 self.cleanup()
118 124 raise exception
119 125
120 126 def cleanup(self):
121 127 if self.pipeo is None:
122 128 return
123 129 self.pipeo.close()
124 130 self.pipei.close()
125 131 try:
126 132 # read the error descriptor until EOF
127 133 for l in self.pipee:
128 134 self.ui.status(_("remote: "), l)
129 135 except (IOError, ValueError):
130 136 pass
131 137 self.pipee.close()
132 138
133 139 __del__ = cleanup
134 140
135 141 def _callstream(self, cmd, **args):
136 142 self.ui.debug("sending %s command\n" % cmd)
137 143 self.pipeo.write("%s\n" % cmd)
138 144 _func, names = wireproto.commands[cmd]
139 145 keys = names.split()
140 146 wireargs = {}
141 147 for k in keys:
142 148 if k == '*':
143 149 wireargs['*'] = args
144 150 break
145 151 else:
146 152 wireargs[k] = args[k]
147 153 del args[k]
148 154 for k, v in sorted(wireargs.iteritems()):
149 155 self.pipeo.write("%s %d\n" % (k, len(v)))
150 156 if isinstance(v, dict):
151 157 for dk, dv in v.iteritems():
152 158 self.pipeo.write("%s %d\n" % (dk, len(dv)))
153 159 self.pipeo.write(dv)
154 160 else:
155 161 self.pipeo.write(v)
156 162 self.pipeo.flush()
157 163
158 164 return self.pipei
159 165
160 166 def _callcompressable(self, cmd, **args):
161 167 return self._callstream(cmd, **args)
162 168
163 169 def _call(self, cmd, **args):
164 170 self._callstream(cmd, **args)
165 171 return self._recv()
166 172
167 173 def _callpush(self, cmd, fp, **args):
168 174 r = self._call(cmd, **args)
169 175 if r:
170 176 return '', r
171 177 while True:
172 178 d = fp.read(4096)
173 179 if not d:
174 180 break
175 181 self._send(d)
176 182 self._send("", flush=True)
177 183 r = self._recv()
178 184 if r:
179 185 return '', r
180 186 return self._recv(), ''
181 187
182 188 def _calltwowaystream(self, cmd, fp, **args):
183 189 r = self._call(cmd, **args)
184 190 if r:
185 191 # XXX needs to be made better
186 192 raise util.Abort('unexpected remote reply: %s' % r)
187 193 while True:
188 194 d = fp.read(4096)
189 195 if not d:
190 196 break
191 197 self._send(d)
192 198 self._send("", flush=True)
193 199 return self.pipei
194 200
195 201 def _recv(self):
196 202 l = self.pipei.readline()
197 203 if l == '\n':
198 204 self.readerr()
199 205 msg = _('check previous remote output')
200 206 self._abort(error.OutOfBandError(hint=msg))
201 207 self.readerr()
202 208 try:
203 209 l = int(l)
204 210 except ValueError:
205 211 self._abort(error.ResponseError(_("unexpected response:"), l))
206 212 return self.pipei.read(l)
207 213
208 214 def _send(self, data, flush=False):
209 215 self.pipeo.write("%d\n" % len(data))
210 216 if data:
211 217 self.pipeo.write(data)
212 218 if flush:
213 219 self.pipeo.flush()
214 220 self.readerr()
215 221
216 222 def lock(self):
217 223 self._call("lock")
218 224 return remotelock(self)
219 225
220 226 def unlock(self):
221 227 self._call("unlock")
222 228
223 229 def addchangegroup(self, cg, source, url, lock=None):
224 230 '''Send a changegroup to the remote server. Return an integer
225 231 similar to unbundle(). DEPRECATED, since it requires locking the
226 232 remote.'''
227 233 d = self._call("addchangegroup")
228 234 if d:
229 235 self._abort(error.RepoError(_("push refused: %s") % d))
230 236 while True:
231 237 d = cg.read(4096)
232 238 if not d:
233 239 break
234 240 self.pipeo.write(d)
235 241 self.readerr()
236 242
237 243 self.pipeo.flush()
238 244
239 245 self.readerr()
240 246 r = self._recv()
241 247 if not r:
242 248 return 1
243 249 try:
244 250 return int(r)
245 251 except ValueError:
246 252 self._abort(error.ResponseError(_("unexpected response:"), r))
247 253
248 254 instance = sshpeer
General Comments 0
You need to be logged in to leave comments. Login now