##// END OF EJS Templates
sshpeer: don't fail forwarding output from closed connections...
Valentin Gatien-Baron -
r47429:fa30292b default
parent child Browse files
Show More
@@ -1,725 +1,725 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import re
10 import re
11 import uuid
11 import uuid
12
12
13 from .i18n import _
13 from .i18n import _
14 from .pycompat import getattr
14 from .pycompat import getattr
15 from . import (
15 from . import (
16 error,
16 error,
17 pycompat,
17 pycompat,
18 util,
18 util,
19 wireprotoserver,
19 wireprotoserver,
20 wireprototypes,
20 wireprototypes,
21 wireprotov1peer,
21 wireprotov1peer,
22 wireprotov1server,
22 wireprotov1server,
23 )
23 )
24 from .utils import (
24 from .utils import (
25 procutil,
25 procutil,
26 stringutil,
26 stringutil,
27 )
27 )
28
28
29
29
30 def _serverquote(s):
30 def _serverquote(s):
31 """quote a string for the remote shell ... which we assume is sh"""
31 """quote a string for the remote shell ... which we assume is sh"""
32 if not s:
32 if not s:
33 return s
33 return s
34 if re.match(b'[a-zA-Z0-9@%_+=:,./-]*$', s):
34 if re.match(b'[a-zA-Z0-9@%_+=:,./-]*$', s):
35 return s
35 return s
36 return b"'%s'" % s.replace(b"'", b"'\\''")
36 return b"'%s'" % s.replace(b"'", b"'\\''")
37
37
38
38
39 def _forwardoutput(ui, pipe, warn=False):
39 def _forwardoutput(ui, pipe, warn=False):
40 """display all data currently available on pipe as remote output.
40 """display all data currently available on pipe as remote output.
41
41
42 This is non blocking."""
42 This is non blocking."""
43 if pipe:
43 if pipe and not pipe.closed:
44 s = procutil.readpipe(pipe)
44 s = procutil.readpipe(pipe)
45 if s:
45 if s:
46 display = ui.warn if warn else ui.status
46 display = ui.warn if warn else ui.status
47 for l in s.splitlines():
47 for l in s.splitlines():
48 display(_(b"remote: "), l, b'\n')
48 display(_(b"remote: "), l, b'\n')
49
49
50
50
51 class doublepipe(object):
51 class doublepipe(object):
52 """Operate a side-channel pipe in addition of a main one
52 """Operate a side-channel pipe in addition of a main one
53
53
54 The side-channel pipe contains server output to be forwarded to the user
54 The side-channel pipe contains server output to be forwarded to the user
55 input. The double pipe will behave as the "main" pipe, but will ensure the
55 input. The double pipe will behave as the "main" pipe, but will ensure the
56 content of the "side" pipe is properly processed while we wait for blocking
56 content of the "side" pipe is properly processed while we wait for blocking
57 call on the "main" pipe.
57 call on the "main" pipe.
58
58
59 If large amounts of data are read from "main", the forward will cease after
59 If large amounts of data are read from "main", the forward will cease after
60 the first bytes start to appear. This simplifies the implementation
60 the first bytes start to appear. This simplifies the implementation
61 without affecting actual output of sshpeer too much as we rarely issue
61 without affecting actual output of sshpeer too much as we rarely issue
62 large read for data not yet emitted by the server.
62 large read for data not yet emitted by the server.
63
63
64 The main pipe is expected to be a 'bufferedinputpipe' from the util module
64 The main pipe is expected to be a 'bufferedinputpipe' from the util module
65 that handle all the os specific bits. This class lives in this module
65 that handle all the os specific bits. This class lives in this module
66 because it focus on behavior specific to the ssh protocol."""
66 because it focus on behavior specific to the ssh protocol."""
67
67
68 def __init__(self, ui, main, side):
68 def __init__(self, ui, main, side):
69 self._ui = ui
69 self._ui = ui
70 self._main = main
70 self._main = main
71 self._side = side
71 self._side = side
72
72
73 def _wait(self):
73 def _wait(self):
74 """wait until some data are available on main or side
74 """wait until some data are available on main or side
75
75
76 return a pair of boolean (ismainready, issideready)
76 return a pair of boolean (ismainready, issideready)
77
77
78 (This will only wait for data if the setup is supported by `util.poll`)
78 (This will only wait for data if the setup is supported by `util.poll`)
79 """
79 """
80 if (
80 if (
81 isinstance(self._main, util.bufferedinputpipe)
81 isinstance(self._main, util.bufferedinputpipe)
82 and self._main.hasbuffer
82 and self._main.hasbuffer
83 ):
83 ):
84 # Main has data. Assume side is worth poking at.
84 # Main has data. Assume side is worth poking at.
85 return True, True
85 return True, True
86
86
87 fds = [self._main.fileno(), self._side.fileno()]
87 fds = [self._main.fileno(), self._side.fileno()]
88 try:
88 try:
89 act = util.poll(fds)
89 act = util.poll(fds)
90 except NotImplementedError:
90 except NotImplementedError:
91 # non supported yet case, assume all have data.
91 # non supported yet case, assume all have data.
92 act = fds
92 act = fds
93 return (self._main.fileno() in act, self._side.fileno() in act)
93 return (self._main.fileno() in act, self._side.fileno() in act)
94
94
95 def write(self, data):
95 def write(self, data):
96 return self._call(b'write', data)
96 return self._call(b'write', data)
97
97
98 def read(self, size):
98 def read(self, size):
99 r = self._call(b'read', size)
99 r = self._call(b'read', size)
100 if size != 0 and not r:
100 if size != 0 and not r:
101 # We've observed a condition that indicates the
101 # We've observed a condition that indicates the
102 # stdout closed unexpectedly. Check stderr one
102 # stdout closed unexpectedly. Check stderr one
103 # more time and snag anything that's there before
103 # more time and snag anything that's there before
104 # letting anyone know the main part of the pipe
104 # letting anyone know the main part of the pipe
105 # closed prematurely.
105 # closed prematurely.
106 _forwardoutput(self._ui, self._side)
106 _forwardoutput(self._ui, self._side)
107 return r
107 return r
108
108
109 def unbufferedread(self, size):
109 def unbufferedread(self, size):
110 r = self._call(b'unbufferedread', size)
110 r = self._call(b'unbufferedread', size)
111 if size != 0 and not r:
111 if size != 0 and not r:
112 # We've observed a condition that indicates the
112 # We've observed a condition that indicates the
113 # stdout closed unexpectedly. Check stderr one
113 # stdout closed unexpectedly. Check stderr one
114 # more time and snag anything that's there before
114 # more time and snag anything that's there before
115 # letting anyone know the main part of the pipe
115 # letting anyone know the main part of the pipe
116 # closed prematurely.
116 # closed prematurely.
117 _forwardoutput(self._ui, self._side)
117 _forwardoutput(self._ui, self._side)
118 return r
118 return r
119
119
120 def readline(self):
120 def readline(self):
121 return self._call(b'readline')
121 return self._call(b'readline')
122
122
123 def _call(self, methname, data=None):
123 def _call(self, methname, data=None):
124 """call <methname> on "main", forward output of "side" while blocking"""
124 """call <methname> on "main", forward output of "side" while blocking"""
125 # data can be '' or 0
125 # data can be '' or 0
126 if (data is not None and not data) or self._main.closed:
126 if (data is not None and not data) or self._main.closed:
127 _forwardoutput(self._ui, self._side)
127 _forwardoutput(self._ui, self._side)
128 return b''
128 return b''
129 while True:
129 while True:
130 mainready, sideready = self._wait()
130 mainready, sideready = self._wait()
131 if sideready:
131 if sideready:
132 _forwardoutput(self._ui, self._side)
132 _forwardoutput(self._ui, self._side)
133 if mainready:
133 if mainready:
134 meth = getattr(self._main, methname)
134 meth = getattr(self._main, methname)
135 if data is None:
135 if data is None:
136 return meth()
136 return meth()
137 else:
137 else:
138 return meth(data)
138 return meth(data)
139
139
140 def close(self):
140 def close(self):
141 return self._main.close()
141 return self._main.close()
142
142
143 @property
143 @property
144 def closed(self):
144 def closed(self):
145 return self._main.closed
145 return self._main.closed
146
146
147 def flush(self):
147 def flush(self):
148 return self._main.flush()
148 return self._main.flush()
149
149
150
150
151 def _cleanuppipes(ui, pipei, pipeo, pipee, warn):
151 def _cleanuppipes(ui, pipei, pipeo, pipee, warn):
152 """Clean up pipes used by an SSH connection."""
152 """Clean up pipes used by an SSH connection."""
153 didsomething = False
153 didsomething = False
154 if pipeo and not pipeo.closed:
154 if pipeo and not pipeo.closed:
155 didsomething = True
155 didsomething = True
156 pipeo.close()
156 pipeo.close()
157 if pipei and not pipei.closed:
157 if pipei and not pipei.closed:
158 didsomething = True
158 didsomething = True
159 pipei.close()
159 pipei.close()
160
160
161 if pipee and not pipee.closed:
161 if pipee and not pipee.closed:
162 didsomething = True
162 didsomething = True
163 # Try to read from the err descriptor until EOF.
163 # Try to read from the err descriptor until EOF.
164 try:
164 try:
165 for l in pipee:
165 for l in pipee:
166 ui.status(_(b'remote: '), l)
166 ui.status(_(b'remote: '), l)
167 except (IOError, ValueError):
167 except (IOError, ValueError):
168 pass
168 pass
169
169
170 pipee.close()
170 pipee.close()
171
171
172 if didsomething and warn is not None:
172 if didsomething and warn is not None:
173 # Encourage explicit close of sshpeers. Closing via __del__ is
173 # Encourage explicit close of sshpeers. Closing via __del__ is
174 # not very predictable when exceptions are thrown, which has led
174 # not very predictable when exceptions are thrown, which has led
175 # to deadlocks due to a peer get gc'ed in a fork
175 # to deadlocks due to a peer get gc'ed in a fork
176 # We add our own stack trace, because the stacktrace when called
176 # We add our own stack trace, because the stacktrace when called
177 # from __del__ is useless.
177 # from __del__ is useless.
178 ui.develwarn(b'missing close on SSH connection created at:\n%s' % warn)
178 ui.develwarn(b'missing close on SSH connection created at:\n%s' % warn)
179
179
180
180
181 def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
181 def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
182 """Create an SSH connection to a server.
182 """Create an SSH connection to a server.
183
183
184 Returns a tuple of (process, stdin, stdout, stderr) for the
184 Returns a tuple of (process, stdin, stdout, stderr) for the
185 spawned process.
185 spawned process.
186 """
186 """
187 cmd = b'%s %s %s' % (
187 cmd = b'%s %s %s' % (
188 sshcmd,
188 sshcmd,
189 args,
189 args,
190 procutil.shellquote(
190 procutil.shellquote(
191 b'%s -R %s serve --stdio'
191 b'%s -R %s serve --stdio'
192 % (_serverquote(remotecmd), _serverquote(path))
192 % (_serverquote(remotecmd), _serverquote(path))
193 ),
193 ),
194 )
194 )
195
195
196 ui.debug(b'running %s\n' % cmd)
196 ui.debug(b'running %s\n' % cmd)
197
197
198 # no buffer allow the use of 'select'
198 # no buffer allow the use of 'select'
199 # feel free to remove buffering and select usage when we ultimately
199 # feel free to remove buffering and select usage when we ultimately
200 # move to threading.
200 # move to threading.
201 stdin, stdout, stderr, proc = procutil.popen4(cmd, bufsize=0, env=sshenv)
201 stdin, stdout, stderr, proc = procutil.popen4(cmd, bufsize=0, env=sshenv)
202
202
203 return proc, stdin, stdout, stderr
203 return proc, stdin, stdout, stderr
204
204
205
205
206 def _clientcapabilities():
206 def _clientcapabilities():
207 """Return list of capabilities of this client.
207 """Return list of capabilities of this client.
208
208
209 Returns a list of capabilities that are supported by this client.
209 Returns a list of capabilities that are supported by this client.
210 """
210 """
211 protoparams = {b'partial-pull'}
211 protoparams = {b'partial-pull'}
212 comps = [
212 comps = [
213 e.wireprotosupport().name
213 e.wireprotosupport().name
214 for e in util.compengines.supportedwireengines(util.CLIENTROLE)
214 for e in util.compengines.supportedwireengines(util.CLIENTROLE)
215 ]
215 ]
216 protoparams.add(b'comp=%s' % b','.join(comps))
216 protoparams.add(b'comp=%s' % b','.join(comps))
217 return protoparams
217 return protoparams
218
218
219
219
220 def _performhandshake(ui, stdin, stdout, stderr):
220 def _performhandshake(ui, stdin, stdout, stderr):
221 def badresponse():
221 def badresponse():
222 # Flush any output on stderr. In general, the stderr contains errors
222 # Flush any output on stderr. In general, the stderr contains errors
223 # from the remote (ssh errors, some hg errors), and status indications
223 # from the remote (ssh errors, some hg errors), and status indications
224 # (like "adding changes"), with no current way to tell them apart.
224 # (like "adding changes"), with no current way to tell them apart.
225 # Here we failed so early that it's almost certainly only errors, so
225 # Here we failed so early that it's almost certainly only errors, so
226 # use warn=True so -q doesn't hide them.
226 # use warn=True so -q doesn't hide them.
227 _forwardoutput(ui, stderr, warn=True)
227 _forwardoutput(ui, stderr, warn=True)
228
228
229 msg = _(b'no suitable response from remote hg')
229 msg = _(b'no suitable response from remote hg')
230 hint = ui.config(b'ui', b'ssherrorhint')
230 hint = ui.config(b'ui', b'ssherrorhint')
231 raise error.RepoError(msg, hint=hint)
231 raise error.RepoError(msg, hint=hint)
232
232
233 # The handshake consists of sending wire protocol commands in reverse
233 # The handshake consists of sending wire protocol commands in reverse
234 # order of protocol implementation and then sniffing for a response
234 # order of protocol implementation and then sniffing for a response
235 # to one of them.
235 # to one of them.
236 #
236 #
237 # Those commands (from oldest to newest) are:
237 # Those commands (from oldest to newest) are:
238 #
238 #
239 # ``between``
239 # ``between``
240 # Asks for the set of revisions between a pair of revisions. Command
240 # Asks for the set of revisions between a pair of revisions. Command
241 # present in all Mercurial server implementations.
241 # present in all Mercurial server implementations.
242 #
242 #
243 # ``hello``
243 # ``hello``
244 # Instructs the server to advertise its capabilities. Introduced in
244 # Instructs the server to advertise its capabilities. Introduced in
245 # Mercurial 0.9.1.
245 # Mercurial 0.9.1.
246 #
246 #
247 # ``upgrade``
247 # ``upgrade``
248 # Requests upgrade from default transport protocol version 1 to
248 # Requests upgrade from default transport protocol version 1 to
249 # a newer version. Introduced in Mercurial 4.6 as an experimental
249 # a newer version. Introduced in Mercurial 4.6 as an experimental
250 # feature.
250 # feature.
251 #
251 #
252 # The ``between`` command is issued with a request for the null
252 # The ``between`` command is issued with a request for the null
253 # range. If the remote is a Mercurial server, this request will
253 # range. If the remote is a Mercurial server, this request will
254 # generate a specific response: ``1\n\n``. This represents the
254 # generate a specific response: ``1\n\n``. This represents the
255 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
255 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
256 # in the output stream and know this is the response to ``between``
256 # in the output stream and know this is the response to ``between``
257 # and we're at the end of our handshake reply.
257 # and we're at the end of our handshake reply.
258 #
258 #
259 # The response to the ``hello`` command will be a line with the
259 # The response to the ``hello`` command will be a line with the
260 # length of the value returned by that command followed by that
260 # length of the value returned by that command followed by that
261 # value. If the server doesn't support ``hello`` (which should be
261 # value. If the server doesn't support ``hello`` (which should be
262 # rare), that line will be ``0\n``. Otherwise, the value will contain
262 # rare), that line will be ``0\n``. Otherwise, the value will contain
263 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
263 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
264 # the capabilities of the server.
264 # the capabilities of the server.
265 #
265 #
266 # The ``upgrade`` command isn't really a command in the traditional
266 # The ``upgrade`` command isn't really a command in the traditional
267 # sense of version 1 of the transport because it isn't using the
267 # sense of version 1 of the transport because it isn't using the
268 # proper mechanism for formatting insteads: instead, it just encodes
268 # proper mechanism for formatting insteads: instead, it just encodes
269 # arguments on the line, delimited by spaces.
269 # arguments on the line, delimited by spaces.
270 #
270 #
271 # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
271 # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
272 # If the server doesn't support protocol upgrades, it will reply to
272 # If the server doesn't support protocol upgrades, it will reply to
273 # this line with ``0\n``. Otherwise, it emits an
273 # this line with ``0\n``. Otherwise, it emits an
274 # ``upgraded <token> <protocol>`` line to both stdout and stderr.
274 # ``upgraded <token> <protocol>`` line to both stdout and stderr.
275 # Content immediately following this line describes additional
275 # Content immediately following this line describes additional
276 # protocol and server state.
276 # protocol and server state.
277 #
277 #
278 # In addition to the responses to our command requests, the server
278 # In addition to the responses to our command requests, the server
279 # may emit "banner" output on stdout. SSH servers are allowed to
279 # may emit "banner" output on stdout. SSH servers are allowed to
280 # print messages to stdout on login. Issuing commands on connection
280 # print messages to stdout on login. Issuing commands on connection
281 # allows us to flush this banner output from the server by scanning
281 # allows us to flush this banner output from the server by scanning
282 # for output to our well-known ``between`` command. Of course, if
282 # for output to our well-known ``between`` command. Of course, if
283 # the banner contains ``1\n\n``, this will throw off our detection.
283 # the banner contains ``1\n\n``, this will throw off our detection.
284
284
285 requestlog = ui.configbool(b'devel', b'debug.peer-request')
285 requestlog = ui.configbool(b'devel', b'debug.peer-request')
286
286
287 # Generate a random token to help identify responses to version 2
287 # Generate a random token to help identify responses to version 2
288 # upgrade request.
288 # upgrade request.
289 token = pycompat.sysbytes(str(uuid.uuid4()))
289 token = pycompat.sysbytes(str(uuid.uuid4()))
290 upgradecaps = [
290 upgradecaps = [
291 (b'proto', wireprotoserver.SSHV2),
291 (b'proto', wireprotoserver.SSHV2),
292 ]
292 ]
293 upgradecaps = util.urlreq.urlencode(upgradecaps)
293 upgradecaps = util.urlreq.urlencode(upgradecaps)
294
294
295 try:
295 try:
296 pairsarg = b'%s-%s' % (b'0' * 40, b'0' * 40)
296 pairsarg = b'%s-%s' % (b'0' * 40, b'0' * 40)
297 handshake = [
297 handshake = [
298 b'hello\n',
298 b'hello\n',
299 b'between\n',
299 b'between\n',
300 b'pairs %d\n' % len(pairsarg),
300 b'pairs %d\n' % len(pairsarg),
301 pairsarg,
301 pairsarg,
302 ]
302 ]
303
303
304 # Request upgrade to version 2 if configured.
304 # Request upgrade to version 2 if configured.
305 if ui.configbool(b'experimental', b'sshpeer.advertise-v2'):
305 if ui.configbool(b'experimental', b'sshpeer.advertise-v2'):
306 ui.debug(b'sending upgrade request: %s %s\n' % (token, upgradecaps))
306 ui.debug(b'sending upgrade request: %s %s\n' % (token, upgradecaps))
307 handshake.insert(0, b'upgrade %s %s\n' % (token, upgradecaps))
307 handshake.insert(0, b'upgrade %s %s\n' % (token, upgradecaps))
308
308
309 if requestlog:
309 if requestlog:
310 ui.debug(b'devel-peer-request: hello+between\n')
310 ui.debug(b'devel-peer-request: hello+between\n')
311 ui.debug(b'devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
311 ui.debug(b'devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
312 ui.debug(b'sending hello command\n')
312 ui.debug(b'sending hello command\n')
313 ui.debug(b'sending between command\n')
313 ui.debug(b'sending between command\n')
314
314
315 stdin.write(b''.join(handshake))
315 stdin.write(b''.join(handshake))
316 stdin.flush()
316 stdin.flush()
317 except IOError:
317 except IOError:
318 badresponse()
318 badresponse()
319
319
320 # Assume version 1 of wire protocol by default.
320 # Assume version 1 of wire protocol by default.
321 protoname = wireprototypes.SSHV1
321 protoname = wireprototypes.SSHV1
322 reupgraded = re.compile(b'^upgraded %s (.*)$' % stringutil.reescape(token))
322 reupgraded = re.compile(b'^upgraded %s (.*)$' % stringutil.reescape(token))
323
323
324 lines = [b'', b'dummy']
324 lines = [b'', b'dummy']
325 max_noise = 500
325 max_noise = 500
326 while lines[-1] and max_noise:
326 while lines[-1] and max_noise:
327 try:
327 try:
328 l = stdout.readline()
328 l = stdout.readline()
329 _forwardoutput(ui, stderr, warn=True)
329 _forwardoutput(ui, stderr, warn=True)
330
330
331 # Look for reply to protocol upgrade request. It has a token
331 # Look for reply to protocol upgrade request. It has a token
332 # in it, so there should be no false positives.
332 # in it, so there should be no false positives.
333 m = reupgraded.match(l)
333 m = reupgraded.match(l)
334 if m:
334 if m:
335 protoname = m.group(1)
335 protoname = m.group(1)
336 ui.debug(b'protocol upgraded to %s\n' % protoname)
336 ui.debug(b'protocol upgraded to %s\n' % protoname)
337 # If an upgrade was handled, the ``hello`` and ``between``
337 # If an upgrade was handled, the ``hello`` and ``between``
338 # requests are ignored. The next output belongs to the
338 # requests are ignored. The next output belongs to the
339 # protocol, so stop scanning lines.
339 # protocol, so stop scanning lines.
340 break
340 break
341
341
342 # Otherwise it could be a banner, ``0\n`` response if server
342 # Otherwise it could be a banner, ``0\n`` response if server
343 # doesn't support upgrade.
343 # doesn't support upgrade.
344
344
345 if lines[-1] == b'1\n' and l == b'\n':
345 if lines[-1] == b'1\n' and l == b'\n':
346 break
346 break
347 if l:
347 if l:
348 ui.debug(b'remote: ', l)
348 ui.debug(b'remote: ', l)
349 lines.append(l)
349 lines.append(l)
350 max_noise -= 1
350 max_noise -= 1
351 except IOError:
351 except IOError:
352 badresponse()
352 badresponse()
353 else:
353 else:
354 badresponse()
354 badresponse()
355
355
356 caps = set()
356 caps = set()
357
357
358 # For version 1, we should see a ``capabilities`` line in response to the
358 # For version 1, we should see a ``capabilities`` line in response to the
359 # ``hello`` command.
359 # ``hello`` command.
360 if protoname == wireprototypes.SSHV1:
360 if protoname == wireprototypes.SSHV1:
361 for l in reversed(lines):
361 for l in reversed(lines):
362 # Look for response to ``hello`` command. Scan from the back so
362 # Look for response to ``hello`` command. Scan from the back so
363 # we don't misinterpret banner output as the command reply.
363 # we don't misinterpret banner output as the command reply.
364 if l.startswith(b'capabilities:'):
364 if l.startswith(b'capabilities:'):
365 caps.update(l[:-1].split(b':')[1].split())
365 caps.update(l[:-1].split(b':')[1].split())
366 break
366 break
367 elif protoname == wireprotoserver.SSHV2:
367 elif protoname == wireprotoserver.SSHV2:
368 # We see a line with number of bytes to follow and then a value
368 # We see a line with number of bytes to follow and then a value
369 # looking like ``capabilities: *``.
369 # looking like ``capabilities: *``.
370 line = stdout.readline()
370 line = stdout.readline()
371 try:
371 try:
372 valuelen = int(line)
372 valuelen = int(line)
373 except ValueError:
373 except ValueError:
374 badresponse()
374 badresponse()
375
375
376 capsline = stdout.read(valuelen)
376 capsline = stdout.read(valuelen)
377 if not capsline.startswith(b'capabilities: '):
377 if not capsline.startswith(b'capabilities: '):
378 badresponse()
378 badresponse()
379
379
380 ui.debug(b'remote: %s\n' % capsline)
380 ui.debug(b'remote: %s\n' % capsline)
381
381
382 caps.update(capsline.split(b':')[1].split())
382 caps.update(capsline.split(b':')[1].split())
383 # Trailing newline.
383 # Trailing newline.
384 stdout.read(1)
384 stdout.read(1)
385
385
386 # Error if we couldn't find capabilities, this means:
386 # Error if we couldn't find capabilities, this means:
387 #
387 #
388 # 1. Remote isn't a Mercurial server
388 # 1. Remote isn't a Mercurial server
389 # 2. Remote is a <0.9.1 Mercurial server
389 # 2. Remote is a <0.9.1 Mercurial server
390 # 3. Remote is a future Mercurial server that dropped ``hello``
390 # 3. Remote is a future Mercurial server that dropped ``hello``
391 # and other attempted handshake mechanisms.
391 # and other attempted handshake mechanisms.
392 if not caps:
392 if not caps:
393 badresponse()
393 badresponse()
394
394
395 # Flush any output on stderr before proceeding.
395 # Flush any output on stderr before proceeding.
396 _forwardoutput(ui, stderr, warn=True)
396 _forwardoutput(ui, stderr, warn=True)
397
397
398 return protoname, caps
398 return protoname, caps
399
399
400
400
401 class sshv1peer(wireprotov1peer.wirepeer):
401 class sshv1peer(wireprotov1peer.wirepeer):
402 def __init__(
402 def __init__(
403 self, ui, url, proc, stdin, stdout, stderr, caps, autoreadstderr=True
403 self, ui, url, proc, stdin, stdout, stderr, caps, autoreadstderr=True
404 ):
404 ):
405 """Create a peer from an existing SSH connection.
405 """Create a peer from an existing SSH connection.
406
406
407 ``proc`` is a handle on the underlying SSH process.
407 ``proc`` is a handle on the underlying SSH process.
408 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
408 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
409 pipes for that process.
409 pipes for that process.
410 ``caps`` is a set of capabilities supported by the remote.
410 ``caps`` is a set of capabilities supported by the remote.
411 ``autoreadstderr`` denotes whether to automatically read from
411 ``autoreadstderr`` denotes whether to automatically read from
412 stderr and to forward its output.
412 stderr and to forward its output.
413 """
413 """
414 self._url = url
414 self._url = url
415 self.ui = ui
415 self.ui = ui
416 # self._subprocess is unused. Keeping a handle on the process
416 # self._subprocess is unused. Keeping a handle on the process
417 # holds a reference and prevents it from being garbage collected.
417 # holds a reference and prevents it from being garbage collected.
418 self._subprocess = proc
418 self._subprocess = proc
419
419
420 # And we hook up our "doublepipe" wrapper to allow querying
420 # And we hook up our "doublepipe" wrapper to allow querying
421 # stderr any time we perform I/O.
421 # stderr any time we perform I/O.
422 if autoreadstderr:
422 if autoreadstderr:
423 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
423 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
424 stdin = doublepipe(ui, stdin, stderr)
424 stdin = doublepipe(ui, stdin, stderr)
425
425
426 self._pipeo = stdin
426 self._pipeo = stdin
427 self._pipei = stdout
427 self._pipei = stdout
428 self._pipee = stderr
428 self._pipee = stderr
429 self._caps = caps
429 self._caps = caps
430 self._autoreadstderr = autoreadstderr
430 self._autoreadstderr = autoreadstderr
431 self._initstack = b''.join(util.getstackframes(1))
431 self._initstack = b''.join(util.getstackframes(1))
432
432
433 # Commands that have a "framed" response where the first line of the
433 # Commands that have a "framed" response where the first line of the
434 # response contains the length of that response.
434 # response contains the length of that response.
435 _FRAMED_COMMANDS = {
435 _FRAMED_COMMANDS = {
436 b'batch',
436 b'batch',
437 }
437 }
438
438
439 # Begin of ipeerconnection interface.
439 # Begin of ipeerconnection interface.
440
440
441 def url(self):
441 def url(self):
442 return self._url
442 return self._url
443
443
444 def local(self):
444 def local(self):
445 return None
445 return None
446
446
447 def peer(self):
447 def peer(self):
448 return self
448 return self
449
449
450 def canpush(self):
450 def canpush(self):
451 return True
451 return True
452
452
453 def close(self):
453 def close(self):
454 self._cleanup()
454 self._cleanup()
455
455
456 # End of ipeerconnection interface.
456 # End of ipeerconnection interface.
457
457
458 # Begin of ipeercommands interface.
458 # Begin of ipeercommands interface.
459
459
460 def capabilities(self):
460 def capabilities(self):
461 return self._caps
461 return self._caps
462
462
463 # End of ipeercommands interface.
463 # End of ipeercommands interface.
464
464
465 def _readerr(self):
465 def _readerr(self):
466 _forwardoutput(self.ui, self._pipee)
466 _forwardoutput(self.ui, self._pipee)
467
467
468 def _abort(self, exception):
468 def _abort(self, exception):
469 self._cleanup()
469 self._cleanup()
470 raise exception
470 raise exception
471
471
472 def _cleanup(self, warn=None):
472 def _cleanup(self, warn=None):
473 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee, warn=warn)
473 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee, warn=warn)
474
474
475 def __del__(self):
475 def __del__(self):
476 self._cleanup(warn=self._initstack)
476 self._cleanup(warn=self._initstack)
477
477
478 def _sendrequest(self, cmd, args, framed=False):
478 def _sendrequest(self, cmd, args, framed=False):
479 if self.ui.debugflag and self.ui.configbool(
479 if self.ui.debugflag and self.ui.configbool(
480 b'devel', b'debug.peer-request'
480 b'devel', b'debug.peer-request'
481 ):
481 ):
482 dbg = self.ui.debug
482 dbg = self.ui.debug
483 line = b'devel-peer-request: %s\n'
483 line = b'devel-peer-request: %s\n'
484 dbg(line % cmd)
484 dbg(line % cmd)
485 for key, value in sorted(args.items()):
485 for key, value in sorted(args.items()):
486 if not isinstance(value, dict):
486 if not isinstance(value, dict):
487 dbg(line % b' %s: %d bytes' % (key, len(value)))
487 dbg(line % b' %s: %d bytes' % (key, len(value)))
488 else:
488 else:
489 for dk, dv in sorted(value.items()):
489 for dk, dv in sorted(value.items()):
490 dbg(line % b' %s-%s: %d' % (key, dk, len(dv)))
490 dbg(line % b' %s-%s: %d' % (key, dk, len(dv)))
491 self.ui.debug(b"sending %s command\n" % cmd)
491 self.ui.debug(b"sending %s command\n" % cmd)
492 self._pipeo.write(b"%s\n" % cmd)
492 self._pipeo.write(b"%s\n" % cmd)
493 _func, names = wireprotov1server.commands[cmd]
493 _func, names = wireprotov1server.commands[cmd]
494 keys = names.split()
494 keys = names.split()
495 wireargs = {}
495 wireargs = {}
496 for k in keys:
496 for k in keys:
497 if k == b'*':
497 if k == b'*':
498 wireargs[b'*'] = args
498 wireargs[b'*'] = args
499 break
499 break
500 else:
500 else:
501 wireargs[k] = args[k]
501 wireargs[k] = args[k]
502 del args[k]
502 del args[k]
503 for k, v in sorted(pycompat.iteritems(wireargs)):
503 for k, v in sorted(pycompat.iteritems(wireargs)):
504 self._pipeo.write(b"%s %d\n" % (k, len(v)))
504 self._pipeo.write(b"%s %d\n" % (k, len(v)))
505 if isinstance(v, dict):
505 if isinstance(v, dict):
506 for dk, dv in pycompat.iteritems(v):
506 for dk, dv in pycompat.iteritems(v):
507 self._pipeo.write(b"%s %d\n" % (dk, len(dv)))
507 self._pipeo.write(b"%s %d\n" % (dk, len(dv)))
508 self._pipeo.write(dv)
508 self._pipeo.write(dv)
509 else:
509 else:
510 self._pipeo.write(v)
510 self._pipeo.write(v)
511 self._pipeo.flush()
511 self._pipeo.flush()
512
512
513 # We know exactly how many bytes are in the response. So return a proxy
513 # We know exactly how many bytes are in the response. So return a proxy
514 # around the raw output stream that allows reading exactly this many
514 # around the raw output stream that allows reading exactly this many
515 # bytes. Callers then can read() without fear of overrunning the
515 # bytes. Callers then can read() without fear of overrunning the
516 # response.
516 # response.
517 if framed:
517 if framed:
518 amount = self._getamount()
518 amount = self._getamount()
519 return util.cappedreader(self._pipei, amount)
519 return util.cappedreader(self._pipei, amount)
520
520
521 return self._pipei
521 return self._pipei
522
522
523 def _callstream(self, cmd, **args):
523 def _callstream(self, cmd, **args):
524 args = pycompat.byteskwargs(args)
524 args = pycompat.byteskwargs(args)
525 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
525 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
526
526
527 def _callcompressable(self, cmd, **args):
527 def _callcompressable(self, cmd, **args):
528 args = pycompat.byteskwargs(args)
528 args = pycompat.byteskwargs(args)
529 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
529 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
530
530
531 def _call(self, cmd, **args):
531 def _call(self, cmd, **args):
532 args = pycompat.byteskwargs(args)
532 args = pycompat.byteskwargs(args)
533 return self._sendrequest(cmd, args, framed=True).read()
533 return self._sendrequest(cmd, args, framed=True).read()
534
534
535 def _callpush(self, cmd, fp, **args):
535 def _callpush(self, cmd, fp, **args):
536 # The server responds with an empty frame if the client should
536 # The server responds with an empty frame if the client should
537 # continue submitting the payload.
537 # continue submitting the payload.
538 r = self._call(cmd, **args)
538 r = self._call(cmd, **args)
539 if r:
539 if r:
540 return b'', r
540 return b'', r
541
541
542 # The payload consists of frames with content followed by an empty
542 # The payload consists of frames with content followed by an empty
543 # frame.
543 # frame.
544 for d in iter(lambda: fp.read(4096), b''):
544 for d in iter(lambda: fp.read(4096), b''):
545 self._writeframed(d)
545 self._writeframed(d)
546 self._writeframed(b"", flush=True)
546 self._writeframed(b"", flush=True)
547
547
548 # In case of success, there is an empty frame and a frame containing
548 # In case of success, there is an empty frame and a frame containing
549 # the integer result (as a string).
549 # the integer result (as a string).
550 # In case of error, there is a non-empty frame containing the error.
550 # In case of error, there is a non-empty frame containing the error.
551 r = self._readframed()
551 r = self._readframed()
552 if r:
552 if r:
553 return b'', r
553 return b'', r
554 return self._readframed(), b''
554 return self._readframed(), b''
555
555
556 def _calltwowaystream(self, cmd, fp, **args):
556 def _calltwowaystream(self, cmd, fp, **args):
557 # The server responds with an empty frame if the client should
557 # The server responds with an empty frame if the client should
558 # continue submitting the payload.
558 # continue submitting the payload.
559 r = self._call(cmd, **args)
559 r = self._call(cmd, **args)
560 if r:
560 if r:
561 # XXX needs to be made better
561 # XXX needs to be made better
562 raise error.Abort(_(b'unexpected remote reply: %s') % r)
562 raise error.Abort(_(b'unexpected remote reply: %s') % r)
563
563
564 # The payload consists of frames with content followed by an empty
564 # The payload consists of frames with content followed by an empty
565 # frame.
565 # frame.
566 for d in iter(lambda: fp.read(4096), b''):
566 for d in iter(lambda: fp.read(4096), b''):
567 self._writeframed(d)
567 self._writeframed(d)
568 self._writeframed(b"", flush=True)
568 self._writeframed(b"", flush=True)
569
569
570 return self._pipei
570 return self._pipei
571
571
572 def _getamount(self):
572 def _getamount(self):
573 l = self._pipei.readline()
573 l = self._pipei.readline()
574 if l == b'\n':
574 if l == b'\n':
575 if self._autoreadstderr:
575 if self._autoreadstderr:
576 self._readerr()
576 self._readerr()
577 msg = _(b'check previous remote output')
577 msg = _(b'check previous remote output')
578 self._abort(error.OutOfBandError(hint=msg))
578 self._abort(error.OutOfBandError(hint=msg))
579 if self._autoreadstderr:
579 if self._autoreadstderr:
580 self._readerr()
580 self._readerr()
581 try:
581 try:
582 return int(l)
582 return int(l)
583 except ValueError:
583 except ValueError:
584 self._abort(error.ResponseError(_(b"unexpected response:"), l))
584 self._abort(error.ResponseError(_(b"unexpected response:"), l))
585
585
586 def _readframed(self):
586 def _readframed(self):
587 size = self._getamount()
587 size = self._getamount()
588 if not size:
588 if not size:
589 return b''
589 return b''
590
590
591 return self._pipei.read(size)
591 return self._pipei.read(size)
592
592
593 def _writeframed(self, data, flush=False):
593 def _writeframed(self, data, flush=False):
594 self._pipeo.write(b"%d\n" % len(data))
594 self._pipeo.write(b"%d\n" % len(data))
595 if data:
595 if data:
596 self._pipeo.write(data)
596 self._pipeo.write(data)
597 if flush:
597 if flush:
598 self._pipeo.flush()
598 self._pipeo.flush()
599 if self._autoreadstderr:
599 if self._autoreadstderr:
600 self._readerr()
600 self._readerr()
601
601
602
602
603 class sshv2peer(sshv1peer):
603 class sshv2peer(sshv1peer):
604 """A peer that speakers version 2 of the transport protocol."""
604 """A peer that speakers version 2 of the transport protocol."""
605
605
606 # Currently version 2 is identical to version 1 post handshake.
606 # Currently version 2 is identical to version 1 post handshake.
607 # And handshake is performed before the peer is instantiated. So
607 # And handshake is performed before the peer is instantiated. So
608 # we need no custom code.
608 # we need no custom code.
609
609
610
610
611 def makepeer(ui, path, proc, stdin, stdout, stderr, autoreadstderr=True):
611 def makepeer(ui, path, proc, stdin, stdout, stderr, autoreadstderr=True):
612 """Make a peer instance from existing pipes.
612 """Make a peer instance from existing pipes.
613
613
614 ``path`` and ``proc`` are stored on the eventual peer instance and may
614 ``path`` and ``proc`` are stored on the eventual peer instance and may
615 not be used for anything meaningful.
615 not be used for anything meaningful.
616
616
617 ``stdin``, ``stdout``, and ``stderr`` are the pipes connected to the
617 ``stdin``, ``stdout``, and ``stderr`` are the pipes connected to the
618 SSH server's stdio handles.
618 SSH server's stdio handles.
619
619
620 This function is factored out to allow creating peers that don't
620 This function is factored out to allow creating peers that don't
621 actually spawn a new process. It is useful for starting SSH protocol
621 actually spawn a new process. It is useful for starting SSH protocol
622 servers and clients via non-standard means, which can be useful for
622 servers and clients via non-standard means, which can be useful for
623 testing.
623 testing.
624 """
624 """
625 try:
625 try:
626 protoname, caps = _performhandshake(ui, stdin, stdout, stderr)
626 protoname, caps = _performhandshake(ui, stdin, stdout, stderr)
627 except Exception:
627 except Exception:
628 _cleanuppipes(ui, stdout, stdin, stderr, warn=None)
628 _cleanuppipes(ui, stdout, stdin, stderr, warn=None)
629 raise
629 raise
630
630
631 if protoname == wireprototypes.SSHV1:
631 if protoname == wireprototypes.SSHV1:
632 return sshv1peer(
632 return sshv1peer(
633 ui,
633 ui,
634 path,
634 path,
635 proc,
635 proc,
636 stdin,
636 stdin,
637 stdout,
637 stdout,
638 stderr,
638 stderr,
639 caps,
639 caps,
640 autoreadstderr=autoreadstderr,
640 autoreadstderr=autoreadstderr,
641 )
641 )
642 elif protoname == wireprototypes.SSHV2:
642 elif protoname == wireprototypes.SSHV2:
643 return sshv2peer(
643 return sshv2peer(
644 ui,
644 ui,
645 path,
645 path,
646 proc,
646 proc,
647 stdin,
647 stdin,
648 stdout,
648 stdout,
649 stderr,
649 stderr,
650 caps,
650 caps,
651 autoreadstderr=autoreadstderr,
651 autoreadstderr=autoreadstderr,
652 )
652 )
653 else:
653 else:
654 _cleanuppipes(ui, stdout, stdin, stderr, warn=None)
654 _cleanuppipes(ui, stdout, stdin, stderr, warn=None)
655 raise error.RepoError(
655 raise error.RepoError(
656 _(b'unknown version of SSH protocol: %s') % protoname
656 _(b'unknown version of SSH protocol: %s') % protoname
657 )
657 )
658
658
659
659
660 def instance(ui, path, create, intents=None, createopts=None):
660 def instance(ui, path, create, intents=None, createopts=None):
661 """Create an SSH peer.
661 """Create an SSH peer.
662
662
663 The returned object conforms to the ``wireprotov1peer.wirepeer`` interface.
663 The returned object conforms to the ``wireprotov1peer.wirepeer`` interface.
664 """
664 """
665 u = util.url(path, parsequery=False, parsefragment=False)
665 u = util.url(path, parsequery=False, parsefragment=False)
666 if u.scheme != b'ssh' or not u.host or u.path is None:
666 if u.scheme != b'ssh' or not u.host or u.path is None:
667 raise error.RepoError(_(b"couldn't parse location %s") % path)
667 raise error.RepoError(_(b"couldn't parse location %s") % path)
668
668
669 util.checksafessh(path)
669 util.checksafessh(path)
670
670
671 if u.passwd is not None:
671 if u.passwd is not None:
672 raise error.RepoError(_(b'password in URL not supported'))
672 raise error.RepoError(_(b'password in URL not supported'))
673
673
674 sshcmd = ui.config(b'ui', b'ssh')
674 sshcmd = ui.config(b'ui', b'ssh')
675 remotecmd = ui.config(b'ui', b'remotecmd')
675 remotecmd = ui.config(b'ui', b'remotecmd')
676 sshaddenv = dict(ui.configitems(b'sshenv'))
676 sshaddenv = dict(ui.configitems(b'sshenv'))
677 sshenv = procutil.shellenviron(sshaddenv)
677 sshenv = procutil.shellenviron(sshaddenv)
678 remotepath = u.path or b'.'
678 remotepath = u.path or b'.'
679
679
680 args = procutil.sshargs(sshcmd, u.host, u.user, u.port)
680 args = procutil.sshargs(sshcmd, u.host, u.user, u.port)
681
681
682 if create:
682 if create:
683 # We /could/ do this, but only if the remote init command knows how to
683 # We /could/ do this, but only if the remote init command knows how to
684 # handle them. We don't yet make any assumptions about that. And without
684 # handle them. We don't yet make any assumptions about that. And without
685 # querying the remote, there's no way of knowing if the remote even
685 # querying the remote, there's no way of knowing if the remote even
686 # supports said requested feature.
686 # supports said requested feature.
687 if createopts:
687 if createopts:
688 raise error.RepoError(
688 raise error.RepoError(
689 _(
689 _(
690 b'cannot create remote SSH repositories '
690 b'cannot create remote SSH repositories '
691 b'with extra options'
691 b'with extra options'
692 )
692 )
693 )
693 )
694
694
695 cmd = b'%s %s %s' % (
695 cmd = b'%s %s %s' % (
696 sshcmd,
696 sshcmd,
697 args,
697 args,
698 procutil.shellquote(
698 procutil.shellquote(
699 b'%s init %s'
699 b'%s init %s'
700 % (_serverquote(remotecmd), _serverquote(remotepath))
700 % (_serverquote(remotecmd), _serverquote(remotepath))
701 ),
701 ),
702 )
702 )
703 ui.debug(b'running %s\n' % cmd)
703 ui.debug(b'running %s\n' % cmd)
704 res = ui.system(cmd, blockedtag=b'sshpeer', environ=sshenv)
704 res = ui.system(cmd, blockedtag=b'sshpeer', environ=sshenv)
705 if res != 0:
705 if res != 0:
706 raise error.RepoError(_(b'could not create remote repo'))
706 raise error.RepoError(_(b'could not create remote repo'))
707
707
708 proc, stdin, stdout, stderr = _makeconnection(
708 proc, stdin, stdout, stderr = _makeconnection(
709 ui, sshcmd, args, remotecmd, remotepath, sshenv
709 ui, sshcmd, args, remotecmd, remotepath, sshenv
710 )
710 )
711
711
712 peer = makepeer(ui, path, proc, stdin, stdout, stderr)
712 peer = makepeer(ui, path, proc, stdin, stdout, stderr)
713
713
714 # Finally, if supported by the server, notify it about our own
714 # Finally, if supported by the server, notify it about our own
715 # capabilities.
715 # capabilities.
716 if b'protocaps' in peer.capabilities():
716 if b'protocaps' in peer.capabilities():
717 try:
717 try:
718 peer._call(
718 peer._call(
719 b"protocaps", caps=b' '.join(sorted(_clientcapabilities()))
719 b"protocaps", caps=b' '.join(sorted(_clientcapabilities()))
720 )
720 )
721 except IOError:
721 except IOError:
722 peer._cleanup()
722 peer._cleanup()
723 raise error.RepoError(_(b'capability exchange failed'))
723 raise error.RepoError(_(b'capability exchange failed'))
724
724
725 return peer
725 return peer
@@ -1,13 +1,13 b''
1 $ hg init a
1 $ hg init a
2 $ cd a
2 $ cd a
3 $ touch a; hg commit -qAm_
3 $ touch a; hg commit -qAm_
4 $ hg bookmark $(for i in $($TESTDIR/seq.py 0 20); do echo b$i; done)
4 $ hg bookmark $(for i in $($TESTDIR/seq.py 0 20); do echo b$i; done)
5 $ hg clone . ../b -q
5 $ hg clone . ../b -q
6 $ cd ../b
6 $ cd ../b
7
7
8 Checking that when lookup multiple bookmarks in one go, if one of them
8 Checking that when lookup multiple bookmarks in one go, if one of them
9 fails (thus causing the sshpeer to be stopped), the errors from the
9 fails (thus causing the sshpeer to be stopped), the errors from the
10 further lookups don't result in tracebacks.
10 further lookups don't result in tracebacks.
11
11
12 $ hg pull -r b0 -r nosuchbookmark $(for i in $($TESTDIR/seq.py 1 20); do echo -r b$i; done) -e "\"$PYTHON\" \"$TESTDIR/dummyssh\"" ssh://user@dummy/$(pwd)/../a |& tail -n 1
12 $ hg pull -r b0 -r nosuchbookmark $(for i in $($TESTDIR/seq.py 1 20); do echo -r b$i; done) -e "\"$PYTHON\" \"$TESTDIR/dummyssh\"" ssh://user@dummy/$(pwd)/../a |& tail -n 1
13 ValueError: I/O operation on closed file
13 StopIteration
General Comments 0
You need to be logged in to leave comments. Login now