##// END OF EJS Templates
sshpeer: check pipe validity before forwarding output from it...
Matt Harbison -
r36851:46f4d71e default
parent child Browse files
Show More
@@ -1,612 +1,613
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import re
10 import re
11 import uuid
11 import uuid
12
12
13 from .i18n import _
13 from .i18n import _
14 from . import (
14 from . import (
15 error,
15 error,
16 pycompat,
16 pycompat,
17 util,
17 util,
18 wireproto,
18 wireproto,
19 wireprotoserver,
19 wireprotoserver,
20 wireprototypes,
20 wireprototypes,
21 )
21 )
22
22
23 def _serverquote(s):
23 def _serverquote(s):
24 """quote a string for the remote shell ... which we assume is sh"""
24 """quote a string for the remote shell ... which we assume is sh"""
25 if not s:
25 if not s:
26 return s
26 return s
27 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
27 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
28 return s
28 return s
29 return "'%s'" % s.replace("'", "'\\''")
29 return "'%s'" % s.replace("'", "'\\''")
30
30
31 def _forwardoutput(ui, pipe):
31 def _forwardoutput(ui, pipe):
32 """display all data currently available on pipe as remote output.
32 """display all data currently available on pipe as remote output.
33
33
34 This is non blocking."""
34 This is non blocking."""
35 if pipe:
35 s = util.readpipe(pipe)
36 s = util.readpipe(pipe)
36 if s:
37 if s:
37 for l in s.splitlines():
38 for l in s.splitlines():
38 ui.status(_("remote: "), l, '\n')
39 ui.status(_("remote: "), l, '\n')
39
40
40 class doublepipe(object):
41 class doublepipe(object):
41 """Operate a side-channel pipe in addition of a main one
42 """Operate a side-channel pipe in addition of a main one
42
43
43 The side-channel pipe contains server output to be forwarded to the user
44 The side-channel pipe contains server output to be forwarded to the user
44 input. The double pipe will behave as the "main" pipe, but will ensure the
45 input. The double pipe will behave as the "main" pipe, but will ensure the
45 content of the "side" pipe is properly processed while we wait for blocking
46 content of the "side" pipe is properly processed while we wait for blocking
46 call on the "main" pipe.
47 call on the "main" pipe.
47
48
48 If large amounts of data are read from "main", the forward will cease after
49 If large amounts of data are read from "main", the forward will cease after
49 the first bytes start to appear. This simplifies the implementation
50 the first bytes start to appear. This simplifies the implementation
50 without affecting actual output of sshpeer too much as we rarely issue
51 without affecting actual output of sshpeer too much as we rarely issue
51 large read for data not yet emitted by the server.
52 large read for data not yet emitted by the server.
52
53
53 The main pipe is expected to be a 'bufferedinputpipe' from the util module
54 The main pipe is expected to be a 'bufferedinputpipe' from the util module
54 that handle all the os specific bits. This class lives in this module
55 that handle all the os specific bits. This class lives in this module
55 because it focus on behavior specific to the ssh protocol."""
56 because it focus on behavior specific to the ssh protocol."""
56
57
57 def __init__(self, ui, main, side):
58 def __init__(self, ui, main, side):
58 self._ui = ui
59 self._ui = ui
59 self._main = main
60 self._main = main
60 self._side = side
61 self._side = side
61
62
62 def _wait(self):
63 def _wait(self):
63 """wait until some data are available on main or side
64 """wait until some data are available on main or side
64
65
65 return a pair of boolean (ismainready, issideready)
66 return a pair of boolean (ismainready, issideready)
66
67
67 (This will only wait for data if the setup is supported by `util.poll`)
68 (This will only wait for data if the setup is supported by `util.poll`)
68 """
69 """
69 if (isinstance(self._main, util.bufferedinputpipe) and
70 if (isinstance(self._main, util.bufferedinputpipe) and
70 self._main.hasbuffer):
71 self._main.hasbuffer):
71 # Main has data. Assume side is worth poking at.
72 # Main has data. Assume side is worth poking at.
72 return True, True
73 return True, True
73
74
74 fds = [self._main.fileno(), self._side.fileno()]
75 fds = [self._main.fileno(), self._side.fileno()]
75 try:
76 try:
76 act = util.poll(fds)
77 act = util.poll(fds)
77 except NotImplementedError:
78 except NotImplementedError:
78 # non supported yet case, assume all have data.
79 # non supported yet case, assume all have data.
79 act = fds
80 act = fds
80 return (self._main.fileno() in act, self._side.fileno() in act)
81 return (self._main.fileno() in act, self._side.fileno() in act)
81
82
82 def write(self, data):
83 def write(self, data):
83 return self._call('write', data)
84 return self._call('write', data)
84
85
85 def read(self, size):
86 def read(self, size):
86 r = self._call('read', size)
87 r = self._call('read', size)
87 if size != 0 and not r:
88 if size != 0 and not r:
88 # We've observed a condition that indicates the
89 # We've observed a condition that indicates the
89 # stdout closed unexpectedly. Check stderr one
90 # stdout closed unexpectedly. Check stderr one
90 # more time and snag anything that's there before
91 # more time and snag anything that's there before
91 # letting anyone know the main part of the pipe
92 # letting anyone know the main part of the pipe
92 # closed prematurely.
93 # closed prematurely.
93 _forwardoutput(self._ui, self._side)
94 _forwardoutput(self._ui, self._side)
94 return r
95 return r
95
96
96 def readline(self):
97 def readline(self):
97 return self._call('readline')
98 return self._call('readline')
98
99
99 def _call(self, methname, data=None):
100 def _call(self, methname, data=None):
100 """call <methname> on "main", forward output of "side" while blocking
101 """call <methname> on "main", forward output of "side" while blocking
101 """
102 """
102 # data can be '' or 0
103 # data can be '' or 0
103 if (data is not None and not data) or self._main.closed:
104 if (data is not None and not data) or self._main.closed:
104 _forwardoutput(self._ui, self._side)
105 _forwardoutput(self._ui, self._side)
105 return ''
106 return ''
106 while True:
107 while True:
107 mainready, sideready = self._wait()
108 mainready, sideready = self._wait()
108 if sideready:
109 if sideready:
109 _forwardoutput(self._ui, self._side)
110 _forwardoutput(self._ui, self._side)
110 if mainready:
111 if mainready:
111 meth = getattr(self._main, methname)
112 meth = getattr(self._main, methname)
112 if data is None:
113 if data is None:
113 return meth()
114 return meth()
114 else:
115 else:
115 return meth(data)
116 return meth(data)
116
117
117 def close(self):
118 def close(self):
118 return self._main.close()
119 return self._main.close()
119
120
120 def flush(self):
121 def flush(self):
121 return self._main.flush()
122 return self._main.flush()
122
123
123 def _cleanuppipes(ui, pipei, pipeo, pipee):
124 def _cleanuppipes(ui, pipei, pipeo, pipee):
124 """Clean up pipes used by an SSH connection."""
125 """Clean up pipes used by an SSH connection."""
125 if pipeo:
126 if pipeo:
126 pipeo.close()
127 pipeo.close()
127 if pipei:
128 if pipei:
128 pipei.close()
129 pipei.close()
129
130
130 if pipee:
131 if pipee:
131 # Try to read from the err descriptor until EOF.
132 # Try to read from the err descriptor until EOF.
132 try:
133 try:
133 for l in pipee:
134 for l in pipee:
134 ui.status(_('remote: '), l)
135 ui.status(_('remote: '), l)
135 except (IOError, ValueError):
136 except (IOError, ValueError):
136 pass
137 pass
137
138
138 pipee.close()
139 pipee.close()
139
140
140 def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
141 def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
141 """Create an SSH connection to a server.
142 """Create an SSH connection to a server.
142
143
143 Returns a tuple of (process, stdin, stdout, stderr) for the
144 Returns a tuple of (process, stdin, stdout, stderr) for the
144 spawned process.
145 spawned process.
145 """
146 """
146 cmd = '%s %s %s' % (
147 cmd = '%s %s %s' % (
147 sshcmd,
148 sshcmd,
148 args,
149 args,
149 util.shellquote('%s -R %s serve --stdio' % (
150 util.shellquote('%s -R %s serve --stdio' % (
150 _serverquote(remotecmd), _serverquote(path))))
151 _serverquote(remotecmd), _serverquote(path))))
151
152
152 ui.debug('running %s\n' % cmd)
153 ui.debug('running %s\n' % cmd)
153 cmd = util.quotecommand(cmd)
154 cmd = util.quotecommand(cmd)
154
155
155 # no buffer allow the use of 'select'
156 # no buffer allow the use of 'select'
156 # feel free to remove buffering and select usage when we ultimately
157 # feel free to remove buffering and select usage when we ultimately
157 # move to threading.
158 # move to threading.
158 stdin, stdout, stderr, proc = util.popen4(cmd, bufsize=0, env=sshenv)
159 stdin, stdout, stderr, proc = util.popen4(cmd, bufsize=0, env=sshenv)
159
160
160 return proc, stdin, stdout, stderr
161 return proc, stdin, stdout, stderr
161
162
162 def _performhandshake(ui, stdin, stdout, stderr):
163 def _performhandshake(ui, stdin, stdout, stderr):
163 def badresponse():
164 def badresponse():
164 # Flush any output on stderr.
165 # Flush any output on stderr.
165 _forwardoutput(ui, stderr)
166 _forwardoutput(ui, stderr)
166
167
167 msg = _('no suitable response from remote hg')
168 msg = _('no suitable response from remote hg')
168 hint = ui.config('ui', 'ssherrorhint')
169 hint = ui.config('ui', 'ssherrorhint')
169 raise error.RepoError(msg, hint=hint)
170 raise error.RepoError(msg, hint=hint)
170
171
171 # The handshake consists of sending wire protocol commands in reverse
172 # The handshake consists of sending wire protocol commands in reverse
172 # order of protocol implementation and then sniffing for a response
173 # order of protocol implementation and then sniffing for a response
173 # to one of them.
174 # to one of them.
174 #
175 #
175 # Those commands (from oldest to newest) are:
176 # Those commands (from oldest to newest) are:
176 #
177 #
177 # ``between``
178 # ``between``
178 # Asks for the set of revisions between a pair of revisions. Command
179 # Asks for the set of revisions between a pair of revisions. Command
179 # present in all Mercurial server implementations.
180 # present in all Mercurial server implementations.
180 #
181 #
181 # ``hello``
182 # ``hello``
182 # Instructs the server to advertise its capabilities. Introduced in
183 # Instructs the server to advertise its capabilities. Introduced in
183 # Mercurial 0.9.1.
184 # Mercurial 0.9.1.
184 #
185 #
185 # ``upgrade``
186 # ``upgrade``
186 # Requests upgrade from default transport protocol version 1 to
187 # Requests upgrade from default transport protocol version 1 to
187 # a newer version. Introduced in Mercurial 4.6 as an experimental
188 # a newer version. Introduced in Mercurial 4.6 as an experimental
188 # feature.
189 # feature.
189 #
190 #
190 # The ``between`` command is issued with a request for the null
191 # The ``between`` command is issued with a request for the null
191 # range. If the remote is a Mercurial server, this request will
192 # range. If the remote is a Mercurial server, this request will
192 # generate a specific response: ``1\n\n``. This represents the
193 # generate a specific response: ``1\n\n``. This represents the
193 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
194 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
194 # in the output stream and know this is the response to ``between``
195 # in the output stream and know this is the response to ``between``
195 # and we're at the end of our handshake reply.
196 # and we're at the end of our handshake reply.
196 #
197 #
197 # The response to the ``hello`` command will be a line with the
198 # The response to the ``hello`` command will be a line with the
198 # length of the value returned by that command followed by that
199 # length of the value returned by that command followed by that
199 # value. If the server doesn't support ``hello`` (which should be
200 # value. If the server doesn't support ``hello`` (which should be
200 # rare), that line will be ``0\n``. Otherwise, the value will contain
201 # rare), that line will be ``0\n``. Otherwise, the value will contain
201 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
202 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
202 # the capabilities of the server.
203 # the capabilities of the server.
203 #
204 #
204 # The ``upgrade`` command isn't really a command in the traditional
205 # The ``upgrade`` command isn't really a command in the traditional
205 # sense of version 1 of the transport because it isn't using the
206 # sense of version 1 of the transport because it isn't using the
206 # proper mechanism for formatting insteads: instead, it just encodes
207 # proper mechanism for formatting insteads: instead, it just encodes
207 # arguments on the line, delimited by spaces.
208 # arguments on the line, delimited by spaces.
208 #
209 #
209 # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
210 # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
210 # If the server doesn't support protocol upgrades, it will reply to
211 # If the server doesn't support protocol upgrades, it will reply to
211 # this line with ``0\n``. Otherwise, it emits an
212 # this line with ``0\n``. Otherwise, it emits an
212 # ``upgraded <token> <protocol>`` line to both stdout and stderr.
213 # ``upgraded <token> <protocol>`` line to both stdout and stderr.
213 # Content immediately following this line describes additional
214 # Content immediately following this line describes additional
214 # protocol and server state.
215 # protocol and server state.
215 #
216 #
216 # In addition to the responses to our command requests, the server
217 # In addition to the responses to our command requests, the server
217 # may emit "banner" output on stdout. SSH servers are allowed to
218 # may emit "banner" output on stdout. SSH servers are allowed to
218 # print messages to stdout on login. Issuing commands on connection
219 # print messages to stdout on login. Issuing commands on connection
219 # allows us to flush this banner output from the server by scanning
220 # allows us to flush this banner output from the server by scanning
220 # for output to our well-known ``between`` command. Of course, if
221 # for output to our well-known ``between`` command. Of course, if
221 # the banner contains ``1\n\n``, this will throw off our detection.
222 # the banner contains ``1\n\n``, this will throw off our detection.
222
223
223 requestlog = ui.configbool('devel', 'debug.peer-request')
224 requestlog = ui.configbool('devel', 'debug.peer-request')
224
225
225 # Generate a random token to help identify responses to version 2
226 # Generate a random token to help identify responses to version 2
226 # upgrade request.
227 # upgrade request.
227 token = pycompat.sysbytes(str(uuid.uuid4()))
228 token = pycompat.sysbytes(str(uuid.uuid4()))
228 upgradecaps = [
229 upgradecaps = [
229 ('proto', wireprotoserver.SSHV2),
230 ('proto', wireprotoserver.SSHV2),
230 ]
231 ]
231 upgradecaps = util.urlreq.urlencode(upgradecaps)
232 upgradecaps = util.urlreq.urlencode(upgradecaps)
232
233
233 try:
234 try:
234 pairsarg = '%s-%s' % ('0' * 40, '0' * 40)
235 pairsarg = '%s-%s' % ('0' * 40, '0' * 40)
235 handshake = [
236 handshake = [
236 'hello\n',
237 'hello\n',
237 'between\n',
238 'between\n',
238 'pairs %d\n' % len(pairsarg),
239 'pairs %d\n' % len(pairsarg),
239 pairsarg,
240 pairsarg,
240 ]
241 ]
241
242
242 # Request upgrade to version 2 if configured.
243 # Request upgrade to version 2 if configured.
243 if ui.configbool('experimental', 'sshpeer.advertise-v2'):
244 if ui.configbool('experimental', 'sshpeer.advertise-v2'):
244 ui.debug('sending upgrade request: %s %s\n' % (token, upgradecaps))
245 ui.debug('sending upgrade request: %s %s\n' % (token, upgradecaps))
245 handshake.insert(0, 'upgrade %s %s\n' % (token, upgradecaps))
246 handshake.insert(0, 'upgrade %s %s\n' % (token, upgradecaps))
246
247
247 if requestlog:
248 if requestlog:
248 ui.debug('devel-peer-request: hello\n')
249 ui.debug('devel-peer-request: hello\n')
249 ui.debug('sending hello command\n')
250 ui.debug('sending hello command\n')
250 if requestlog:
251 if requestlog:
251 ui.debug('devel-peer-request: between\n')
252 ui.debug('devel-peer-request: between\n')
252 ui.debug('devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
253 ui.debug('devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
253 ui.debug('sending between command\n')
254 ui.debug('sending between command\n')
254
255
255 stdin.write(''.join(handshake))
256 stdin.write(''.join(handshake))
256 stdin.flush()
257 stdin.flush()
257 except IOError:
258 except IOError:
258 badresponse()
259 badresponse()
259
260
260 # Assume version 1 of wire protocol by default.
261 # Assume version 1 of wire protocol by default.
261 protoname = wireprototypes.SSHV1
262 protoname = wireprototypes.SSHV1
262 reupgraded = re.compile(b'^upgraded %s (.*)$' % re.escape(token))
263 reupgraded = re.compile(b'^upgraded %s (.*)$' % re.escape(token))
263
264
264 lines = ['', 'dummy']
265 lines = ['', 'dummy']
265 max_noise = 500
266 max_noise = 500
266 while lines[-1] and max_noise:
267 while lines[-1] and max_noise:
267 try:
268 try:
268 l = stdout.readline()
269 l = stdout.readline()
269 _forwardoutput(ui, stderr)
270 _forwardoutput(ui, stderr)
270
271
271 # Look for reply to protocol upgrade request. It has a token
272 # Look for reply to protocol upgrade request. It has a token
272 # in it, so there should be no false positives.
273 # in it, so there should be no false positives.
273 m = reupgraded.match(l)
274 m = reupgraded.match(l)
274 if m:
275 if m:
275 protoname = m.group(1)
276 protoname = m.group(1)
276 ui.debug('protocol upgraded to %s\n' % protoname)
277 ui.debug('protocol upgraded to %s\n' % protoname)
277 # If an upgrade was handled, the ``hello`` and ``between``
278 # If an upgrade was handled, the ``hello`` and ``between``
278 # requests are ignored. The next output belongs to the
279 # requests are ignored. The next output belongs to the
279 # protocol, so stop scanning lines.
280 # protocol, so stop scanning lines.
280 break
281 break
281
282
282 # Otherwise it could be a banner, ``0\n`` response if server
283 # Otherwise it could be a banner, ``0\n`` response if server
283 # doesn't support upgrade.
284 # doesn't support upgrade.
284
285
285 if lines[-1] == '1\n' and l == '\n':
286 if lines[-1] == '1\n' and l == '\n':
286 break
287 break
287 if l:
288 if l:
288 ui.debug('remote: ', l)
289 ui.debug('remote: ', l)
289 lines.append(l)
290 lines.append(l)
290 max_noise -= 1
291 max_noise -= 1
291 except IOError:
292 except IOError:
292 badresponse()
293 badresponse()
293 else:
294 else:
294 badresponse()
295 badresponse()
295
296
296 caps = set()
297 caps = set()
297
298
298 # For version 1, we should see a ``capabilities`` line in response to the
299 # For version 1, we should see a ``capabilities`` line in response to the
299 # ``hello`` command.
300 # ``hello`` command.
300 if protoname == wireprototypes.SSHV1:
301 if protoname == wireprototypes.SSHV1:
301 for l in reversed(lines):
302 for l in reversed(lines):
302 # Look for response to ``hello`` command. Scan from the back so
303 # Look for response to ``hello`` command. Scan from the back so
303 # we don't misinterpret banner output as the command reply.
304 # we don't misinterpret banner output as the command reply.
304 if l.startswith('capabilities:'):
305 if l.startswith('capabilities:'):
305 caps.update(l[:-1].split(':')[1].split())
306 caps.update(l[:-1].split(':')[1].split())
306 break
307 break
307 elif protoname == wireprotoserver.SSHV2:
308 elif protoname == wireprotoserver.SSHV2:
308 # We see a line with number of bytes to follow and then a value
309 # We see a line with number of bytes to follow and then a value
309 # looking like ``capabilities: *``.
310 # looking like ``capabilities: *``.
310 line = stdout.readline()
311 line = stdout.readline()
311 try:
312 try:
312 valuelen = int(line)
313 valuelen = int(line)
313 except ValueError:
314 except ValueError:
314 badresponse()
315 badresponse()
315
316
316 capsline = stdout.read(valuelen)
317 capsline = stdout.read(valuelen)
317 if not capsline.startswith('capabilities: '):
318 if not capsline.startswith('capabilities: '):
318 badresponse()
319 badresponse()
319
320
320 ui.debug('remote: %s\n' % capsline)
321 ui.debug('remote: %s\n' % capsline)
321
322
322 caps.update(capsline.split(':')[1].split())
323 caps.update(capsline.split(':')[1].split())
323 # Trailing newline.
324 # Trailing newline.
324 stdout.read(1)
325 stdout.read(1)
325
326
326 # Error if we couldn't find capabilities, this means:
327 # Error if we couldn't find capabilities, this means:
327 #
328 #
328 # 1. Remote isn't a Mercurial server
329 # 1. Remote isn't a Mercurial server
329 # 2. Remote is a <0.9.1 Mercurial server
330 # 2. Remote is a <0.9.1 Mercurial server
330 # 3. Remote is a future Mercurial server that dropped ``hello``
331 # 3. Remote is a future Mercurial server that dropped ``hello``
331 # and other attempted handshake mechanisms.
332 # and other attempted handshake mechanisms.
332 if not caps:
333 if not caps:
333 badresponse()
334 badresponse()
334
335
335 # Flush any output on stderr before proceeding.
336 # Flush any output on stderr before proceeding.
336 _forwardoutput(ui, stderr)
337 _forwardoutput(ui, stderr)
337
338
338 return protoname, caps
339 return protoname, caps
339
340
340 class sshv1peer(wireproto.wirepeer):
341 class sshv1peer(wireproto.wirepeer):
341 def __init__(self, ui, url, proc, stdin, stdout, stderr, caps,
342 def __init__(self, ui, url, proc, stdin, stdout, stderr, caps,
342 autoreadstderr=True):
343 autoreadstderr=True):
343 """Create a peer from an existing SSH connection.
344 """Create a peer from an existing SSH connection.
344
345
345 ``proc`` is a handle on the underlying SSH process.
346 ``proc`` is a handle on the underlying SSH process.
346 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
347 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
347 pipes for that process.
348 pipes for that process.
348 ``caps`` is a set of capabilities supported by the remote.
349 ``caps`` is a set of capabilities supported by the remote.
349 ``autoreadstderr`` denotes whether to automatically read from
350 ``autoreadstderr`` denotes whether to automatically read from
350 stderr and to forward its output.
351 stderr and to forward its output.
351 """
352 """
352 self._url = url
353 self._url = url
353 self._ui = ui
354 self._ui = ui
354 # self._subprocess is unused. Keeping a handle on the process
355 # self._subprocess is unused. Keeping a handle on the process
355 # holds a reference and prevents it from being garbage collected.
356 # holds a reference and prevents it from being garbage collected.
356 self._subprocess = proc
357 self._subprocess = proc
357
358
358 # And we hook up our "doublepipe" wrapper to allow querying
359 # And we hook up our "doublepipe" wrapper to allow querying
359 # stderr any time we perform I/O.
360 # stderr any time we perform I/O.
360 if autoreadstderr:
361 if autoreadstderr:
361 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
362 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
362 stdin = doublepipe(ui, stdin, stderr)
363 stdin = doublepipe(ui, stdin, stderr)
363
364
364 self._pipeo = stdin
365 self._pipeo = stdin
365 self._pipei = stdout
366 self._pipei = stdout
366 self._pipee = stderr
367 self._pipee = stderr
367 self._caps = caps
368 self._caps = caps
368 self._autoreadstderr = autoreadstderr
369 self._autoreadstderr = autoreadstderr
369
370
370 # Commands that have a "framed" response where the first line of the
371 # Commands that have a "framed" response where the first line of the
371 # response contains the length of that response.
372 # response contains the length of that response.
372 _FRAMED_COMMANDS = {
373 _FRAMED_COMMANDS = {
373 'batch',
374 'batch',
374 }
375 }
375
376
376 # Begin of _basepeer interface.
377 # Begin of _basepeer interface.
377
378
378 @util.propertycache
379 @util.propertycache
379 def ui(self):
380 def ui(self):
380 return self._ui
381 return self._ui
381
382
382 def url(self):
383 def url(self):
383 return self._url
384 return self._url
384
385
385 def local(self):
386 def local(self):
386 return None
387 return None
387
388
388 def peer(self):
389 def peer(self):
389 return self
390 return self
390
391
391 def canpush(self):
392 def canpush(self):
392 return True
393 return True
393
394
394 def close(self):
395 def close(self):
395 pass
396 pass
396
397
397 # End of _basepeer interface.
398 # End of _basepeer interface.
398
399
399 # Begin of _basewirecommands interface.
400 # Begin of _basewirecommands interface.
400
401
401 def capabilities(self):
402 def capabilities(self):
402 return self._caps
403 return self._caps
403
404
404 # End of _basewirecommands interface.
405 # End of _basewirecommands interface.
405
406
406 def _readerr(self):
407 def _readerr(self):
407 _forwardoutput(self.ui, self._pipee)
408 _forwardoutput(self.ui, self._pipee)
408
409
409 def _abort(self, exception):
410 def _abort(self, exception):
410 self._cleanup()
411 self._cleanup()
411 raise exception
412 raise exception
412
413
413 def _cleanup(self):
414 def _cleanup(self):
414 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
415 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
415
416
416 __del__ = _cleanup
417 __del__ = _cleanup
417
418
418 def _sendrequest(self, cmd, args, framed=False):
419 def _sendrequest(self, cmd, args, framed=False):
419 if (self.ui.debugflag
420 if (self.ui.debugflag
420 and self.ui.configbool('devel', 'debug.peer-request')):
421 and self.ui.configbool('devel', 'debug.peer-request')):
421 dbg = self.ui.debug
422 dbg = self.ui.debug
422 line = 'devel-peer-request: %s\n'
423 line = 'devel-peer-request: %s\n'
423 dbg(line % cmd)
424 dbg(line % cmd)
424 for key, value in sorted(args.items()):
425 for key, value in sorted(args.items()):
425 if not isinstance(value, dict):
426 if not isinstance(value, dict):
426 dbg(line % ' %s: %d bytes' % (key, len(value)))
427 dbg(line % ' %s: %d bytes' % (key, len(value)))
427 else:
428 else:
428 for dk, dv in sorted(value.items()):
429 for dk, dv in sorted(value.items()):
429 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
430 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
430 self.ui.debug("sending %s command\n" % cmd)
431 self.ui.debug("sending %s command\n" % cmd)
431 self._pipeo.write("%s\n" % cmd)
432 self._pipeo.write("%s\n" % cmd)
432 _func, names = wireproto.commands[cmd]
433 _func, names = wireproto.commands[cmd]
433 keys = names.split()
434 keys = names.split()
434 wireargs = {}
435 wireargs = {}
435 for k in keys:
436 for k in keys:
436 if k == '*':
437 if k == '*':
437 wireargs['*'] = args
438 wireargs['*'] = args
438 break
439 break
439 else:
440 else:
440 wireargs[k] = args[k]
441 wireargs[k] = args[k]
441 del args[k]
442 del args[k]
442 for k, v in sorted(wireargs.iteritems()):
443 for k, v in sorted(wireargs.iteritems()):
443 self._pipeo.write("%s %d\n" % (k, len(v)))
444 self._pipeo.write("%s %d\n" % (k, len(v)))
444 if isinstance(v, dict):
445 if isinstance(v, dict):
445 for dk, dv in v.iteritems():
446 for dk, dv in v.iteritems():
446 self._pipeo.write("%s %d\n" % (dk, len(dv)))
447 self._pipeo.write("%s %d\n" % (dk, len(dv)))
447 self._pipeo.write(dv)
448 self._pipeo.write(dv)
448 else:
449 else:
449 self._pipeo.write(v)
450 self._pipeo.write(v)
450 self._pipeo.flush()
451 self._pipeo.flush()
451
452
452 # We know exactly how many bytes are in the response. So return a proxy
453 # We know exactly how many bytes are in the response. So return a proxy
453 # around the raw output stream that allows reading exactly this many
454 # around the raw output stream that allows reading exactly this many
454 # bytes. Callers then can read() without fear of overrunning the
455 # bytes. Callers then can read() without fear of overrunning the
455 # response.
456 # response.
456 if framed:
457 if framed:
457 amount = self._getamount()
458 amount = self._getamount()
458 return util.cappedreader(self._pipei, amount)
459 return util.cappedreader(self._pipei, amount)
459
460
460 return self._pipei
461 return self._pipei
461
462
462 def _callstream(self, cmd, **args):
463 def _callstream(self, cmd, **args):
463 args = pycompat.byteskwargs(args)
464 args = pycompat.byteskwargs(args)
464 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
465 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
465
466
466 def _callcompressable(self, cmd, **args):
467 def _callcompressable(self, cmd, **args):
467 args = pycompat.byteskwargs(args)
468 args = pycompat.byteskwargs(args)
468 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
469 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
469
470
470 def _call(self, cmd, **args):
471 def _call(self, cmd, **args):
471 args = pycompat.byteskwargs(args)
472 args = pycompat.byteskwargs(args)
472 return self._sendrequest(cmd, args, framed=True).read()
473 return self._sendrequest(cmd, args, framed=True).read()
473
474
474 def _callpush(self, cmd, fp, **args):
475 def _callpush(self, cmd, fp, **args):
475 # The server responds with an empty frame if the client should
476 # The server responds with an empty frame if the client should
476 # continue submitting the payload.
477 # continue submitting the payload.
477 r = self._call(cmd, **args)
478 r = self._call(cmd, **args)
478 if r:
479 if r:
479 return '', r
480 return '', r
480
481
481 # The payload consists of frames with content followed by an empty
482 # The payload consists of frames with content followed by an empty
482 # frame.
483 # frame.
483 for d in iter(lambda: fp.read(4096), ''):
484 for d in iter(lambda: fp.read(4096), ''):
484 self._writeframed(d)
485 self._writeframed(d)
485 self._writeframed("", flush=True)
486 self._writeframed("", flush=True)
486
487
487 # In case of success, there is an empty frame and a frame containing
488 # In case of success, there is an empty frame and a frame containing
488 # the integer result (as a string).
489 # the integer result (as a string).
489 # In case of error, there is a non-empty frame containing the error.
490 # In case of error, there is a non-empty frame containing the error.
490 r = self._readframed()
491 r = self._readframed()
491 if r:
492 if r:
492 return '', r
493 return '', r
493 return self._readframed(), ''
494 return self._readframed(), ''
494
495
495 def _calltwowaystream(self, cmd, fp, **args):
496 def _calltwowaystream(self, cmd, fp, **args):
496 # The server responds with an empty frame if the client should
497 # The server responds with an empty frame if the client should
497 # continue submitting the payload.
498 # continue submitting the payload.
498 r = self._call(cmd, **args)
499 r = self._call(cmd, **args)
499 if r:
500 if r:
500 # XXX needs to be made better
501 # XXX needs to be made better
501 raise error.Abort(_('unexpected remote reply: %s') % r)
502 raise error.Abort(_('unexpected remote reply: %s') % r)
502
503
503 # The payload consists of frames with content followed by an empty
504 # The payload consists of frames with content followed by an empty
504 # frame.
505 # frame.
505 for d in iter(lambda: fp.read(4096), ''):
506 for d in iter(lambda: fp.read(4096), ''):
506 self._writeframed(d)
507 self._writeframed(d)
507 self._writeframed("", flush=True)
508 self._writeframed("", flush=True)
508
509
509 return self._pipei
510 return self._pipei
510
511
511 def _getamount(self):
512 def _getamount(self):
512 l = self._pipei.readline()
513 l = self._pipei.readline()
513 if l == '\n':
514 if l == '\n':
514 if self._autoreadstderr:
515 if self._autoreadstderr:
515 self._readerr()
516 self._readerr()
516 msg = _('check previous remote output')
517 msg = _('check previous remote output')
517 self._abort(error.OutOfBandError(hint=msg))
518 self._abort(error.OutOfBandError(hint=msg))
518 if self._autoreadstderr:
519 if self._autoreadstderr:
519 self._readerr()
520 self._readerr()
520 try:
521 try:
521 return int(l)
522 return int(l)
522 except ValueError:
523 except ValueError:
523 self._abort(error.ResponseError(_("unexpected response:"), l))
524 self._abort(error.ResponseError(_("unexpected response:"), l))
524
525
525 def _readframed(self):
526 def _readframed(self):
526 size = self._getamount()
527 size = self._getamount()
527 if not size:
528 if not size:
528 return b''
529 return b''
529
530
530 return self._pipei.read(size)
531 return self._pipei.read(size)
531
532
532 def _writeframed(self, data, flush=False):
533 def _writeframed(self, data, flush=False):
533 self._pipeo.write("%d\n" % len(data))
534 self._pipeo.write("%d\n" % len(data))
534 if data:
535 if data:
535 self._pipeo.write(data)
536 self._pipeo.write(data)
536 if flush:
537 if flush:
537 self._pipeo.flush()
538 self._pipeo.flush()
538 if self._autoreadstderr:
539 if self._autoreadstderr:
539 self._readerr()
540 self._readerr()
540
541
541 class sshv2peer(sshv1peer):
542 class sshv2peer(sshv1peer):
542 """A peer that speakers version 2 of the transport protocol."""
543 """A peer that speakers version 2 of the transport protocol."""
543 # Currently version 2 is identical to version 1 post handshake.
544 # Currently version 2 is identical to version 1 post handshake.
544 # And handshake is performed before the peer is instantiated. So
545 # And handshake is performed before the peer is instantiated. So
545 # we need no custom code.
546 # we need no custom code.
546
547
547 def makepeer(ui, path, proc, stdin, stdout, stderr, autoreadstderr=True):
548 def makepeer(ui, path, proc, stdin, stdout, stderr, autoreadstderr=True):
548 """Make a peer instance from existing pipes.
549 """Make a peer instance from existing pipes.
549
550
550 ``path`` and ``proc`` are stored on the eventual peer instance and may
551 ``path`` and ``proc`` are stored on the eventual peer instance and may
551 not be used for anything meaningful.
552 not be used for anything meaningful.
552
553
553 ``stdin``, ``stdout``, and ``stderr`` are the pipes connected to the
554 ``stdin``, ``stdout``, and ``stderr`` are the pipes connected to the
554 SSH server's stdio handles.
555 SSH server's stdio handles.
555
556
556 This function is factored out to allow creating peers that don't
557 This function is factored out to allow creating peers that don't
557 actually spawn a new process. It is useful for starting SSH protocol
558 actually spawn a new process. It is useful for starting SSH protocol
558 servers and clients via non-standard means, which can be useful for
559 servers and clients via non-standard means, which can be useful for
559 testing.
560 testing.
560 """
561 """
561 try:
562 try:
562 protoname, caps = _performhandshake(ui, stdin, stdout, stderr)
563 protoname, caps = _performhandshake(ui, stdin, stdout, stderr)
563 except Exception:
564 except Exception:
564 _cleanuppipes(ui, stdout, stdin, stderr)
565 _cleanuppipes(ui, stdout, stdin, stderr)
565 raise
566 raise
566
567
567 if protoname == wireprototypes.SSHV1:
568 if protoname == wireprototypes.SSHV1:
568 return sshv1peer(ui, path, proc, stdin, stdout, stderr, caps,
569 return sshv1peer(ui, path, proc, stdin, stdout, stderr, caps,
569 autoreadstderr=autoreadstderr)
570 autoreadstderr=autoreadstderr)
570 elif protoname == wireprototypes.SSHV2:
571 elif protoname == wireprototypes.SSHV2:
571 return sshv2peer(ui, path, proc, stdin, stdout, stderr, caps,
572 return sshv2peer(ui, path, proc, stdin, stdout, stderr, caps,
572 autoreadstderr=autoreadstderr)
573 autoreadstderr=autoreadstderr)
573 else:
574 else:
574 _cleanuppipes(ui, stdout, stdin, stderr)
575 _cleanuppipes(ui, stdout, stdin, stderr)
575 raise error.RepoError(_('unknown version of SSH protocol: %s') %
576 raise error.RepoError(_('unknown version of SSH protocol: %s') %
576 protoname)
577 protoname)
577
578
578 def instance(ui, path, create):
579 def instance(ui, path, create):
579 """Create an SSH peer.
580 """Create an SSH peer.
580
581
581 The returned object conforms to the ``wireproto.wirepeer`` interface.
582 The returned object conforms to the ``wireproto.wirepeer`` interface.
582 """
583 """
583 u = util.url(path, parsequery=False, parsefragment=False)
584 u = util.url(path, parsequery=False, parsefragment=False)
584 if u.scheme != 'ssh' or not u.host or u.path is None:
585 if u.scheme != 'ssh' or not u.host or u.path is None:
585 raise error.RepoError(_("couldn't parse location %s") % path)
586 raise error.RepoError(_("couldn't parse location %s") % path)
586
587
587 util.checksafessh(path)
588 util.checksafessh(path)
588
589
589 if u.passwd is not None:
590 if u.passwd is not None:
590 raise error.RepoError(_('password in URL not supported'))
591 raise error.RepoError(_('password in URL not supported'))
591
592
592 sshcmd = ui.config('ui', 'ssh')
593 sshcmd = ui.config('ui', 'ssh')
593 remotecmd = ui.config('ui', 'remotecmd')
594 remotecmd = ui.config('ui', 'remotecmd')
594 sshaddenv = dict(ui.configitems('sshenv'))
595 sshaddenv = dict(ui.configitems('sshenv'))
595 sshenv = util.shellenviron(sshaddenv)
596 sshenv = util.shellenviron(sshaddenv)
596 remotepath = u.path or '.'
597 remotepath = u.path or '.'
597
598
598 args = util.sshargs(sshcmd, u.host, u.user, u.port)
599 args = util.sshargs(sshcmd, u.host, u.user, u.port)
599
600
600 if create:
601 if create:
601 cmd = '%s %s %s' % (sshcmd, args,
602 cmd = '%s %s %s' % (sshcmd, args,
602 util.shellquote('%s init %s' %
603 util.shellquote('%s init %s' %
603 (_serverquote(remotecmd), _serverquote(remotepath))))
604 (_serverquote(remotecmd), _serverquote(remotepath))))
604 ui.debug('running %s\n' % cmd)
605 ui.debug('running %s\n' % cmd)
605 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
606 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
606 if res != 0:
607 if res != 0:
607 raise error.RepoError(_('could not create remote repo'))
608 raise error.RepoError(_('could not create remote repo'))
608
609
609 proc, stdin, stdout, stderr = _makeconnection(ui, sshcmd, args, remotecmd,
610 proc, stdin, stdout, stderr = _makeconnection(ui, sshcmd, args, remotecmd,
610 remotepath, sshenv)
611 remotepath, sshenv)
611
612
612 return makepeer(ui, path, proc, stdin, stdout, stderr)
613 return makepeer(ui, path, proc, stdin, stdout, stderr)
General Comments 0
You need to be logged in to leave comments. Login now