##// END OF EJS Templates
sshpeer: make sshpeer.close() close the underlying connection...
Valentin Gatien-Baron -
r47415:8c490610 default
parent child Browse files
Show More
@@ -1,707 +1,707 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import re
10 import re
11 import uuid
11 import uuid
12
12
13 from .i18n import _
13 from .i18n import _
14 from .pycompat import getattr
14 from .pycompat import getattr
15 from . import (
15 from . import (
16 error,
16 error,
17 pycompat,
17 pycompat,
18 util,
18 util,
19 wireprotoserver,
19 wireprotoserver,
20 wireprototypes,
20 wireprototypes,
21 wireprotov1peer,
21 wireprotov1peer,
22 wireprotov1server,
22 wireprotov1server,
23 )
23 )
24 from .utils import (
24 from .utils import (
25 procutil,
25 procutil,
26 stringutil,
26 stringutil,
27 )
27 )
28
28
29
29
30 def _serverquote(s):
30 def _serverquote(s):
31 """quote a string for the remote shell ... which we assume is sh"""
31 """quote a string for the remote shell ... which we assume is sh"""
32 if not s:
32 if not s:
33 return s
33 return s
34 if re.match(b'[a-zA-Z0-9@%_+=:,./-]*$', s):
34 if re.match(b'[a-zA-Z0-9@%_+=:,./-]*$', s):
35 return s
35 return s
36 return b"'%s'" % s.replace(b"'", b"'\\''")
36 return b"'%s'" % s.replace(b"'", b"'\\''")
37
37
38
38
39 def _forwardoutput(ui, pipe, warn=False):
39 def _forwardoutput(ui, pipe, warn=False):
40 """display all data currently available on pipe as remote output.
40 """display all data currently available on pipe as remote output.
41
41
42 This is non blocking."""
42 This is non blocking."""
43 if pipe:
43 if pipe:
44 s = procutil.readpipe(pipe)
44 s = procutil.readpipe(pipe)
45 if s:
45 if s:
46 display = ui.warn if warn else ui.status
46 display = ui.warn if warn else ui.status
47 for l in s.splitlines():
47 for l in s.splitlines():
48 display(_(b"remote: "), l, b'\n')
48 display(_(b"remote: "), l, b'\n')
49
49
50
50
51 class doublepipe(object):
51 class doublepipe(object):
52 """Operate a side-channel pipe in addition of a main one
52 """Operate a side-channel pipe in addition of a main one
53
53
54 The side-channel pipe contains server output to be forwarded to the user
54 The side-channel pipe contains server output to be forwarded to the user
55 input. The double pipe will behave as the "main" pipe, but will ensure the
55 input. The double pipe will behave as the "main" pipe, but will ensure the
56 content of the "side" pipe is properly processed while we wait for blocking
56 content of the "side" pipe is properly processed while we wait for blocking
57 call on the "main" pipe.
57 call on the "main" pipe.
58
58
59 If large amounts of data are read from "main", the forward will cease after
59 If large amounts of data are read from "main", the forward will cease after
60 the first bytes start to appear. This simplifies the implementation
60 the first bytes start to appear. This simplifies the implementation
61 without affecting actual output of sshpeer too much as we rarely issue
61 without affecting actual output of sshpeer too much as we rarely issue
62 large read for data not yet emitted by the server.
62 large read for data not yet emitted by the server.
63
63
64 The main pipe is expected to be a 'bufferedinputpipe' from the util module
64 The main pipe is expected to be a 'bufferedinputpipe' from the util module
65 that handle all the os specific bits. This class lives in this module
65 that handle all the os specific bits. This class lives in this module
66 because it focus on behavior specific to the ssh protocol."""
66 because it focus on behavior specific to the ssh protocol."""
67
67
68 def __init__(self, ui, main, side):
68 def __init__(self, ui, main, side):
69 self._ui = ui
69 self._ui = ui
70 self._main = main
70 self._main = main
71 self._side = side
71 self._side = side
72
72
73 def _wait(self):
73 def _wait(self):
74 """wait until some data are available on main or side
74 """wait until some data are available on main or side
75
75
76 return a pair of boolean (ismainready, issideready)
76 return a pair of boolean (ismainready, issideready)
77
77
78 (This will only wait for data if the setup is supported by `util.poll`)
78 (This will only wait for data if the setup is supported by `util.poll`)
79 """
79 """
80 if (
80 if (
81 isinstance(self._main, util.bufferedinputpipe)
81 isinstance(self._main, util.bufferedinputpipe)
82 and self._main.hasbuffer
82 and self._main.hasbuffer
83 ):
83 ):
84 # Main has data. Assume side is worth poking at.
84 # Main has data. Assume side is worth poking at.
85 return True, True
85 return True, True
86
86
87 fds = [self._main.fileno(), self._side.fileno()]
87 fds = [self._main.fileno(), self._side.fileno()]
88 try:
88 try:
89 act = util.poll(fds)
89 act = util.poll(fds)
90 except NotImplementedError:
90 except NotImplementedError:
91 # non supported yet case, assume all have data.
91 # non supported yet case, assume all have data.
92 act = fds
92 act = fds
93 return (self._main.fileno() in act, self._side.fileno() in act)
93 return (self._main.fileno() in act, self._side.fileno() in act)
94
94
95 def write(self, data):
95 def write(self, data):
96 return self._call(b'write', data)
96 return self._call(b'write', data)
97
97
98 def read(self, size):
98 def read(self, size):
99 r = self._call(b'read', size)
99 r = self._call(b'read', size)
100 if size != 0 and not r:
100 if size != 0 and not r:
101 # We've observed a condition that indicates the
101 # We've observed a condition that indicates the
102 # stdout closed unexpectedly. Check stderr one
102 # stdout closed unexpectedly. Check stderr one
103 # more time and snag anything that's there before
103 # more time and snag anything that's there before
104 # letting anyone know the main part of the pipe
104 # letting anyone know the main part of the pipe
105 # closed prematurely.
105 # closed prematurely.
106 _forwardoutput(self._ui, self._side)
106 _forwardoutput(self._ui, self._side)
107 return r
107 return r
108
108
109 def unbufferedread(self, size):
109 def unbufferedread(self, size):
110 r = self._call(b'unbufferedread', size)
110 r = self._call(b'unbufferedread', size)
111 if size != 0 and not r:
111 if size != 0 and not r:
112 # We've observed a condition that indicates the
112 # We've observed a condition that indicates the
113 # stdout closed unexpectedly. Check stderr one
113 # stdout closed unexpectedly. Check stderr one
114 # more time and snag anything that's there before
114 # more time and snag anything that's there before
115 # letting anyone know the main part of the pipe
115 # letting anyone know the main part of the pipe
116 # closed prematurely.
116 # closed prematurely.
117 _forwardoutput(self._ui, self._side)
117 _forwardoutput(self._ui, self._side)
118 return r
118 return r
119
119
120 def readline(self):
120 def readline(self):
121 return self._call(b'readline')
121 return self._call(b'readline')
122
122
123 def _call(self, methname, data=None):
123 def _call(self, methname, data=None):
124 """call <methname> on "main", forward output of "side" while blocking"""
124 """call <methname> on "main", forward output of "side" while blocking"""
125 # data can be '' or 0
125 # data can be '' or 0
126 if (data is not None and not data) or self._main.closed:
126 if (data is not None and not data) or self._main.closed:
127 _forwardoutput(self._ui, self._side)
127 _forwardoutput(self._ui, self._side)
128 return b''
128 return b''
129 while True:
129 while True:
130 mainready, sideready = self._wait()
130 mainready, sideready = self._wait()
131 if sideready:
131 if sideready:
132 _forwardoutput(self._ui, self._side)
132 _forwardoutput(self._ui, self._side)
133 if mainready:
133 if mainready:
134 meth = getattr(self._main, methname)
134 meth = getattr(self._main, methname)
135 if data is None:
135 if data is None:
136 return meth()
136 return meth()
137 else:
137 else:
138 return meth(data)
138 return meth(data)
139
139
140 def close(self):
140 def close(self):
141 return self._main.close()
141 return self._main.close()
142
142
143 def flush(self):
143 def flush(self):
144 return self._main.flush()
144 return self._main.flush()
145
145
146
146
147 def _cleanuppipes(ui, pipei, pipeo, pipee):
147 def _cleanuppipes(ui, pipei, pipeo, pipee):
148 """Clean up pipes used by an SSH connection."""
148 """Clean up pipes used by an SSH connection."""
149 if pipeo:
149 if pipeo:
150 pipeo.close()
150 pipeo.close()
151 if pipei:
151 if pipei:
152 pipei.close()
152 pipei.close()
153
153
154 if pipee:
154 if pipee:
155 # Try to read from the err descriptor until EOF.
155 # Try to read from the err descriptor until EOF.
156 try:
156 try:
157 for l in pipee:
157 for l in pipee:
158 ui.status(_(b'remote: '), l)
158 ui.status(_(b'remote: '), l)
159 except (IOError, ValueError):
159 except (IOError, ValueError):
160 pass
160 pass
161
161
162 pipee.close()
162 pipee.close()
163
163
164
164
165 def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
165 def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
166 """Create an SSH connection to a server.
166 """Create an SSH connection to a server.
167
167
168 Returns a tuple of (process, stdin, stdout, stderr) for the
168 Returns a tuple of (process, stdin, stdout, stderr) for the
169 spawned process.
169 spawned process.
170 """
170 """
171 cmd = b'%s %s %s' % (
171 cmd = b'%s %s %s' % (
172 sshcmd,
172 sshcmd,
173 args,
173 args,
174 procutil.shellquote(
174 procutil.shellquote(
175 b'%s -R %s serve --stdio'
175 b'%s -R %s serve --stdio'
176 % (_serverquote(remotecmd), _serverquote(path))
176 % (_serverquote(remotecmd), _serverquote(path))
177 ),
177 ),
178 )
178 )
179
179
180 ui.debug(b'running %s\n' % cmd)
180 ui.debug(b'running %s\n' % cmd)
181
181
182 # no buffer allow the use of 'select'
182 # no buffer allow the use of 'select'
183 # feel free to remove buffering and select usage when we ultimately
183 # feel free to remove buffering and select usage when we ultimately
184 # move to threading.
184 # move to threading.
185 stdin, stdout, stderr, proc = procutil.popen4(cmd, bufsize=0, env=sshenv)
185 stdin, stdout, stderr, proc = procutil.popen4(cmd, bufsize=0, env=sshenv)
186
186
187 return proc, stdin, stdout, stderr
187 return proc, stdin, stdout, stderr
188
188
189
189
190 def _clientcapabilities():
190 def _clientcapabilities():
191 """Return list of capabilities of this client.
191 """Return list of capabilities of this client.
192
192
193 Returns a list of capabilities that are supported by this client.
193 Returns a list of capabilities that are supported by this client.
194 """
194 """
195 protoparams = {b'partial-pull'}
195 protoparams = {b'partial-pull'}
196 comps = [
196 comps = [
197 e.wireprotosupport().name
197 e.wireprotosupport().name
198 for e in util.compengines.supportedwireengines(util.CLIENTROLE)
198 for e in util.compengines.supportedwireengines(util.CLIENTROLE)
199 ]
199 ]
200 protoparams.add(b'comp=%s' % b','.join(comps))
200 protoparams.add(b'comp=%s' % b','.join(comps))
201 return protoparams
201 return protoparams
202
202
203
203
204 def _performhandshake(ui, stdin, stdout, stderr):
204 def _performhandshake(ui, stdin, stdout, stderr):
205 def badresponse():
205 def badresponse():
206 # Flush any output on stderr. In general, the stderr contains errors
206 # Flush any output on stderr. In general, the stderr contains errors
207 # from the remote (ssh errors, some hg errors), and status indications
207 # from the remote (ssh errors, some hg errors), and status indications
208 # (like "adding changes"), with no current way to tell them apart.
208 # (like "adding changes"), with no current way to tell them apart.
209 # Here we failed so early that it's almost certainly only errors, so
209 # Here we failed so early that it's almost certainly only errors, so
210 # use warn=True so -q doesn't hide them.
210 # use warn=True so -q doesn't hide them.
211 _forwardoutput(ui, stderr, warn=True)
211 _forwardoutput(ui, stderr, warn=True)
212
212
213 msg = _(b'no suitable response from remote hg')
213 msg = _(b'no suitable response from remote hg')
214 hint = ui.config(b'ui', b'ssherrorhint')
214 hint = ui.config(b'ui', b'ssherrorhint')
215 raise error.RepoError(msg, hint=hint)
215 raise error.RepoError(msg, hint=hint)
216
216
217 # The handshake consists of sending wire protocol commands in reverse
217 # The handshake consists of sending wire protocol commands in reverse
218 # order of protocol implementation and then sniffing for a response
218 # order of protocol implementation and then sniffing for a response
219 # to one of them.
219 # to one of them.
220 #
220 #
221 # Those commands (from oldest to newest) are:
221 # Those commands (from oldest to newest) are:
222 #
222 #
223 # ``between``
223 # ``between``
224 # Asks for the set of revisions between a pair of revisions. Command
224 # Asks for the set of revisions between a pair of revisions. Command
225 # present in all Mercurial server implementations.
225 # present in all Mercurial server implementations.
226 #
226 #
227 # ``hello``
227 # ``hello``
228 # Instructs the server to advertise its capabilities. Introduced in
228 # Instructs the server to advertise its capabilities. Introduced in
229 # Mercurial 0.9.1.
229 # Mercurial 0.9.1.
230 #
230 #
231 # ``upgrade``
231 # ``upgrade``
232 # Requests upgrade from default transport protocol version 1 to
232 # Requests upgrade from default transport protocol version 1 to
233 # a newer version. Introduced in Mercurial 4.6 as an experimental
233 # a newer version. Introduced in Mercurial 4.6 as an experimental
234 # feature.
234 # feature.
235 #
235 #
236 # The ``between`` command is issued with a request for the null
236 # The ``between`` command is issued with a request for the null
237 # range. If the remote is a Mercurial server, this request will
237 # range. If the remote is a Mercurial server, this request will
238 # generate a specific response: ``1\n\n``. This represents the
238 # generate a specific response: ``1\n\n``. This represents the
239 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
239 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
240 # in the output stream and know this is the response to ``between``
240 # in the output stream and know this is the response to ``between``
241 # and we're at the end of our handshake reply.
241 # and we're at the end of our handshake reply.
242 #
242 #
243 # The response to the ``hello`` command will be a line with the
243 # The response to the ``hello`` command will be a line with the
244 # length of the value returned by that command followed by that
244 # length of the value returned by that command followed by that
245 # value. If the server doesn't support ``hello`` (which should be
245 # value. If the server doesn't support ``hello`` (which should be
246 # rare), that line will be ``0\n``. Otherwise, the value will contain
246 # rare), that line will be ``0\n``. Otherwise, the value will contain
247 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
247 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
248 # the capabilities of the server.
248 # the capabilities of the server.
249 #
249 #
250 # The ``upgrade`` command isn't really a command in the traditional
250 # The ``upgrade`` command isn't really a command in the traditional
251 # sense of version 1 of the transport because it isn't using the
251 # sense of version 1 of the transport because it isn't using the
252 # proper mechanism for formatting insteads: instead, it just encodes
252 # proper mechanism for formatting insteads: instead, it just encodes
253 # arguments on the line, delimited by spaces.
253 # arguments on the line, delimited by spaces.
254 #
254 #
255 # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
255 # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
256 # If the server doesn't support protocol upgrades, it will reply to
256 # If the server doesn't support protocol upgrades, it will reply to
257 # this line with ``0\n``. Otherwise, it emits an
257 # this line with ``0\n``. Otherwise, it emits an
258 # ``upgraded <token> <protocol>`` line to both stdout and stderr.
258 # ``upgraded <token> <protocol>`` line to both stdout and stderr.
259 # Content immediately following this line describes additional
259 # Content immediately following this line describes additional
260 # protocol and server state.
260 # protocol and server state.
261 #
261 #
262 # In addition to the responses to our command requests, the server
262 # In addition to the responses to our command requests, the server
263 # may emit "banner" output on stdout. SSH servers are allowed to
263 # may emit "banner" output on stdout. SSH servers are allowed to
264 # print messages to stdout on login. Issuing commands on connection
264 # print messages to stdout on login. Issuing commands on connection
265 # allows us to flush this banner output from the server by scanning
265 # allows us to flush this banner output from the server by scanning
266 # for output to our well-known ``between`` command. Of course, if
266 # for output to our well-known ``between`` command. Of course, if
267 # the banner contains ``1\n\n``, this will throw off our detection.
267 # the banner contains ``1\n\n``, this will throw off our detection.
268
268
269 requestlog = ui.configbool(b'devel', b'debug.peer-request')
269 requestlog = ui.configbool(b'devel', b'debug.peer-request')
270
270
271 # Generate a random token to help identify responses to version 2
271 # Generate a random token to help identify responses to version 2
272 # upgrade request.
272 # upgrade request.
273 token = pycompat.sysbytes(str(uuid.uuid4()))
273 token = pycompat.sysbytes(str(uuid.uuid4()))
274 upgradecaps = [
274 upgradecaps = [
275 (b'proto', wireprotoserver.SSHV2),
275 (b'proto', wireprotoserver.SSHV2),
276 ]
276 ]
277 upgradecaps = util.urlreq.urlencode(upgradecaps)
277 upgradecaps = util.urlreq.urlencode(upgradecaps)
278
278
279 try:
279 try:
280 pairsarg = b'%s-%s' % (b'0' * 40, b'0' * 40)
280 pairsarg = b'%s-%s' % (b'0' * 40, b'0' * 40)
281 handshake = [
281 handshake = [
282 b'hello\n',
282 b'hello\n',
283 b'between\n',
283 b'between\n',
284 b'pairs %d\n' % len(pairsarg),
284 b'pairs %d\n' % len(pairsarg),
285 pairsarg,
285 pairsarg,
286 ]
286 ]
287
287
288 # Request upgrade to version 2 if configured.
288 # Request upgrade to version 2 if configured.
289 if ui.configbool(b'experimental', b'sshpeer.advertise-v2'):
289 if ui.configbool(b'experimental', b'sshpeer.advertise-v2'):
290 ui.debug(b'sending upgrade request: %s %s\n' % (token, upgradecaps))
290 ui.debug(b'sending upgrade request: %s %s\n' % (token, upgradecaps))
291 handshake.insert(0, b'upgrade %s %s\n' % (token, upgradecaps))
291 handshake.insert(0, b'upgrade %s %s\n' % (token, upgradecaps))
292
292
293 if requestlog:
293 if requestlog:
294 ui.debug(b'devel-peer-request: hello+between\n')
294 ui.debug(b'devel-peer-request: hello+between\n')
295 ui.debug(b'devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
295 ui.debug(b'devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
296 ui.debug(b'sending hello command\n')
296 ui.debug(b'sending hello command\n')
297 ui.debug(b'sending between command\n')
297 ui.debug(b'sending between command\n')
298
298
299 stdin.write(b''.join(handshake))
299 stdin.write(b''.join(handshake))
300 stdin.flush()
300 stdin.flush()
301 except IOError:
301 except IOError:
302 badresponse()
302 badresponse()
303
303
304 # Assume version 1 of wire protocol by default.
304 # Assume version 1 of wire protocol by default.
305 protoname = wireprototypes.SSHV1
305 protoname = wireprototypes.SSHV1
306 reupgraded = re.compile(b'^upgraded %s (.*)$' % stringutil.reescape(token))
306 reupgraded = re.compile(b'^upgraded %s (.*)$' % stringutil.reescape(token))
307
307
308 lines = [b'', b'dummy']
308 lines = [b'', b'dummy']
309 max_noise = 500
309 max_noise = 500
310 while lines[-1] and max_noise:
310 while lines[-1] and max_noise:
311 try:
311 try:
312 l = stdout.readline()
312 l = stdout.readline()
313 _forwardoutput(ui, stderr, warn=True)
313 _forwardoutput(ui, stderr, warn=True)
314
314
315 # Look for reply to protocol upgrade request. It has a token
315 # Look for reply to protocol upgrade request. It has a token
316 # in it, so there should be no false positives.
316 # in it, so there should be no false positives.
317 m = reupgraded.match(l)
317 m = reupgraded.match(l)
318 if m:
318 if m:
319 protoname = m.group(1)
319 protoname = m.group(1)
320 ui.debug(b'protocol upgraded to %s\n' % protoname)
320 ui.debug(b'protocol upgraded to %s\n' % protoname)
321 # If an upgrade was handled, the ``hello`` and ``between``
321 # If an upgrade was handled, the ``hello`` and ``between``
322 # requests are ignored. The next output belongs to the
322 # requests are ignored. The next output belongs to the
323 # protocol, so stop scanning lines.
323 # protocol, so stop scanning lines.
324 break
324 break
325
325
326 # Otherwise it could be a banner, ``0\n`` response if server
326 # Otherwise it could be a banner, ``0\n`` response if server
327 # doesn't support upgrade.
327 # doesn't support upgrade.
328
328
329 if lines[-1] == b'1\n' and l == b'\n':
329 if lines[-1] == b'1\n' and l == b'\n':
330 break
330 break
331 if l:
331 if l:
332 ui.debug(b'remote: ', l)
332 ui.debug(b'remote: ', l)
333 lines.append(l)
333 lines.append(l)
334 max_noise -= 1
334 max_noise -= 1
335 except IOError:
335 except IOError:
336 badresponse()
336 badresponse()
337 else:
337 else:
338 badresponse()
338 badresponse()
339
339
340 caps = set()
340 caps = set()
341
341
342 # For version 1, we should see a ``capabilities`` line in response to the
342 # For version 1, we should see a ``capabilities`` line in response to the
343 # ``hello`` command.
343 # ``hello`` command.
344 if protoname == wireprototypes.SSHV1:
344 if protoname == wireprototypes.SSHV1:
345 for l in reversed(lines):
345 for l in reversed(lines):
346 # Look for response to ``hello`` command. Scan from the back so
346 # Look for response to ``hello`` command. Scan from the back so
347 # we don't misinterpret banner output as the command reply.
347 # we don't misinterpret banner output as the command reply.
348 if l.startswith(b'capabilities:'):
348 if l.startswith(b'capabilities:'):
349 caps.update(l[:-1].split(b':')[1].split())
349 caps.update(l[:-1].split(b':')[1].split())
350 break
350 break
351 elif protoname == wireprotoserver.SSHV2:
351 elif protoname == wireprotoserver.SSHV2:
352 # We see a line with number of bytes to follow and then a value
352 # We see a line with number of bytes to follow and then a value
353 # looking like ``capabilities: *``.
353 # looking like ``capabilities: *``.
354 line = stdout.readline()
354 line = stdout.readline()
355 try:
355 try:
356 valuelen = int(line)
356 valuelen = int(line)
357 except ValueError:
357 except ValueError:
358 badresponse()
358 badresponse()
359
359
360 capsline = stdout.read(valuelen)
360 capsline = stdout.read(valuelen)
361 if not capsline.startswith(b'capabilities: '):
361 if not capsline.startswith(b'capabilities: '):
362 badresponse()
362 badresponse()
363
363
364 ui.debug(b'remote: %s\n' % capsline)
364 ui.debug(b'remote: %s\n' % capsline)
365
365
366 caps.update(capsline.split(b':')[1].split())
366 caps.update(capsline.split(b':')[1].split())
367 # Trailing newline.
367 # Trailing newline.
368 stdout.read(1)
368 stdout.read(1)
369
369
370 # Error if we couldn't find capabilities, this means:
370 # Error if we couldn't find capabilities, this means:
371 #
371 #
372 # 1. Remote isn't a Mercurial server
372 # 1. Remote isn't a Mercurial server
373 # 2. Remote is a <0.9.1 Mercurial server
373 # 2. Remote is a <0.9.1 Mercurial server
374 # 3. Remote is a future Mercurial server that dropped ``hello``
374 # 3. Remote is a future Mercurial server that dropped ``hello``
375 # and other attempted handshake mechanisms.
375 # and other attempted handshake mechanisms.
376 if not caps:
376 if not caps:
377 badresponse()
377 badresponse()
378
378
379 # Flush any output on stderr before proceeding.
379 # Flush any output on stderr before proceeding.
380 _forwardoutput(ui, stderr, warn=True)
380 _forwardoutput(ui, stderr, warn=True)
381
381
382 return protoname, caps
382 return protoname, caps
383
383
384
384
385 class sshv1peer(wireprotov1peer.wirepeer):
385 class sshv1peer(wireprotov1peer.wirepeer):
386 def __init__(
386 def __init__(
387 self, ui, url, proc, stdin, stdout, stderr, caps, autoreadstderr=True
387 self, ui, url, proc, stdin, stdout, stderr, caps, autoreadstderr=True
388 ):
388 ):
389 """Create a peer from an existing SSH connection.
389 """Create a peer from an existing SSH connection.
390
390
391 ``proc`` is a handle on the underlying SSH process.
391 ``proc`` is a handle on the underlying SSH process.
392 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
392 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
393 pipes for that process.
393 pipes for that process.
394 ``caps`` is a set of capabilities supported by the remote.
394 ``caps`` is a set of capabilities supported by the remote.
395 ``autoreadstderr`` denotes whether to automatically read from
395 ``autoreadstderr`` denotes whether to automatically read from
396 stderr and to forward its output.
396 stderr and to forward its output.
397 """
397 """
398 self._url = url
398 self._url = url
399 self.ui = ui
399 self.ui = ui
400 # self._subprocess is unused. Keeping a handle on the process
400 # self._subprocess is unused. Keeping a handle on the process
401 # holds a reference and prevents it from being garbage collected.
401 # holds a reference and prevents it from being garbage collected.
402 self._subprocess = proc
402 self._subprocess = proc
403
403
404 # And we hook up our "doublepipe" wrapper to allow querying
404 # And we hook up our "doublepipe" wrapper to allow querying
405 # stderr any time we perform I/O.
405 # stderr any time we perform I/O.
406 if autoreadstderr:
406 if autoreadstderr:
407 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
407 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
408 stdin = doublepipe(ui, stdin, stderr)
408 stdin = doublepipe(ui, stdin, stderr)
409
409
410 self._pipeo = stdin
410 self._pipeo = stdin
411 self._pipei = stdout
411 self._pipei = stdout
412 self._pipee = stderr
412 self._pipee = stderr
413 self._caps = caps
413 self._caps = caps
414 self._autoreadstderr = autoreadstderr
414 self._autoreadstderr = autoreadstderr
415
415
416 # Commands that have a "framed" response where the first line of the
416 # Commands that have a "framed" response where the first line of the
417 # response contains the length of that response.
417 # response contains the length of that response.
418 _FRAMED_COMMANDS = {
418 _FRAMED_COMMANDS = {
419 b'batch',
419 b'batch',
420 }
420 }
421
421
422 # Begin of ipeerconnection interface.
422 # Begin of ipeerconnection interface.
423
423
424 def url(self):
424 def url(self):
425 return self._url
425 return self._url
426
426
427 def local(self):
427 def local(self):
428 return None
428 return None
429
429
430 def peer(self):
430 def peer(self):
431 return self
431 return self
432
432
433 def canpush(self):
433 def canpush(self):
434 return True
434 return True
435
435
436 def close(self):
436 def close(self):
437 pass
437 self._cleanup()
438
438
439 # End of ipeerconnection interface.
439 # End of ipeerconnection interface.
440
440
441 # Begin of ipeercommands interface.
441 # Begin of ipeercommands interface.
442
442
443 def capabilities(self):
443 def capabilities(self):
444 return self._caps
444 return self._caps
445
445
446 # End of ipeercommands interface.
446 # End of ipeercommands interface.
447
447
448 def _readerr(self):
448 def _readerr(self):
449 _forwardoutput(self.ui, self._pipee)
449 _forwardoutput(self.ui, self._pipee)
450
450
451 def _abort(self, exception):
451 def _abort(self, exception):
452 self._cleanup()
452 self._cleanup()
453 raise exception
453 raise exception
454
454
455 def _cleanup(self):
455 def _cleanup(self):
456 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
456 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
457
457
458 __del__ = _cleanup
458 __del__ = _cleanup
459
459
460 def _sendrequest(self, cmd, args, framed=False):
460 def _sendrequest(self, cmd, args, framed=False):
461 if self.ui.debugflag and self.ui.configbool(
461 if self.ui.debugflag and self.ui.configbool(
462 b'devel', b'debug.peer-request'
462 b'devel', b'debug.peer-request'
463 ):
463 ):
464 dbg = self.ui.debug
464 dbg = self.ui.debug
465 line = b'devel-peer-request: %s\n'
465 line = b'devel-peer-request: %s\n'
466 dbg(line % cmd)
466 dbg(line % cmd)
467 for key, value in sorted(args.items()):
467 for key, value in sorted(args.items()):
468 if not isinstance(value, dict):
468 if not isinstance(value, dict):
469 dbg(line % b' %s: %d bytes' % (key, len(value)))
469 dbg(line % b' %s: %d bytes' % (key, len(value)))
470 else:
470 else:
471 for dk, dv in sorted(value.items()):
471 for dk, dv in sorted(value.items()):
472 dbg(line % b' %s-%s: %d' % (key, dk, len(dv)))
472 dbg(line % b' %s-%s: %d' % (key, dk, len(dv)))
473 self.ui.debug(b"sending %s command\n" % cmd)
473 self.ui.debug(b"sending %s command\n" % cmd)
474 self._pipeo.write(b"%s\n" % cmd)
474 self._pipeo.write(b"%s\n" % cmd)
475 _func, names = wireprotov1server.commands[cmd]
475 _func, names = wireprotov1server.commands[cmd]
476 keys = names.split()
476 keys = names.split()
477 wireargs = {}
477 wireargs = {}
478 for k in keys:
478 for k in keys:
479 if k == b'*':
479 if k == b'*':
480 wireargs[b'*'] = args
480 wireargs[b'*'] = args
481 break
481 break
482 else:
482 else:
483 wireargs[k] = args[k]
483 wireargs[k] = args[k]
484 del args[k]
484 del args[k]
485 for k, v in sorted(pycompat.iteritems(wireargs)):
485 for k, v in sorted(pycompat.iteritems(wireargs)):
486 self._pipeo.write(b"%s %d\n" % (k, len(v)))
486 self._pipeo.write(b"%s %d\n" % (k, len(v)))
487 if isinstance(v, dict):
487 if isinstance(v, dict):
488 for dk, dv in pycompat.iteritems(v):
488 for dk, dv in pycompat.iteritems(v):
489 self._pipeo.write(b"%s %d\n" % (dk, len(dv)))
489 self._pipeo.write(b"%s %d\n" % (dk, len(dv)))
490 self._pipeo.write(dv)
490 self._pipeo.write(dv)
491 else:
491 else:
492 self._pipeo.write(v)
492 self._pipeo.write(v)
493 self._pipeo.flush()
493 self._pipeo.flush()
494
494
495 # We know exactly how many bytes are in the response. So return a proxy
495 # We know exactly how many bytes are in the response. So return a proxy
496 # around the raw output stream that allows reading exactly this many
496 # around the raw output stream that allows reading exactly this many
497 # bytes. Callers then can read() without fear of overrunning the
497 # bytes. Callers then can read() without fear of overrunning the
498 # response.
498 # response.
499 if framed:
499 if framed:
500 amount = self._getamount()
500 amount = self._getamount()
501 return util.cappedreader(self._pipei, amount)
501 return util.cappedreader(self._pipei, amount)
502
502
503 return self._pipei
503 return self._pipei
504
504
505 def _callstream(self, cmd, **args):
505 def _callstream(self, cmd, **args):
506 args = pycompat.byteskwargs(args)
506 args = pycompat.byteskwargs(args)
507 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
507 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
508
508
509 def _callcompressable(self, cmd, **args):
509 def _callcompressable(self, cmd, **args):
510 args = pycompat.byteskwargs(args)
510 args = pycompat.byteskwargs(args)
511 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
511 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
512
512
513 def _call(self, cmd, **args):
513 def _call(self, cmd, **args):
514 args = pycompat.byteskwargs(args)
514 args = pycompat.byteskwargs(args)
515 return self._sendrequest(cmd, args, framed=True).read()
515 return self._sendrequest(cmd, args, framed=True).read()
516
516
517 def _callpush(self, cmd, fp, **args):
517 def _callpush(self, cmd, fp, **args):
518 # The server responds with an empty frame if the client should
518 # The server responds with an empty frame if the client should
519 # continue submitting the payload.
519 # continue submitting the payload.
520 r = self._call(cmd, **args)
520 r = self._call(cmd, **args)
521 if r:
521 if r:
522 return b'', r
522 return b'', r
523
523
524 # The payload consists of frames with content followed by an empty
524 # The payload consists of frames with content followed by an empty
525 # frame.
525 # frame.
526 for d in iter(lambda: fp.read(4096), b''):
526 for d in iter(lambda: fp.read(4096), b''):
527 self._writeframed(d)
527 self._writeframed(d)
528 self._writeframed(b"", flush=True)
528 self._writeframed(b"", flush=True)
529
529
530 # In case of success, there is an empty frame and a frame containing
530 # In case of success, there is an empty frame and a frame containing
531 # the integer result (as a string).
531 # the integer result (as a string).
532 # In case of error, there is a non-empty frame containing the error.
532 # In case of error, there is a non-empty frame containing the error.
533 r = self._readframed()
533 r = self._readframed()
534 if r:
534 if r:
535 return b'', r
535 return b'', r
536 return self._readframed(), b''
536 return self._readframed(), b''
537
537
538 def _calltwowaystream(self, cmd, fp, **args):
538 def _calltwowaystream(self, cmd, fp, **args):
539 # The server responds with an empty frame if the client should
539 # The server responds with an empty frame if the client should
540 # continue submitting the payload.
540 # continue submitting the payload.
541 r = self._call(cmd, **args)
541 r = self._call(cmd, **args)
542 if r:
542 if r:
543 # XXX needs to be made better
543 # XXX needs to be made better
544 raise error.Abort(_(b'unexpected remote reply: %s') % r)
544 raise error.Abort(_(b'unexpected remote reply: %s') % r)
545
545
546 # The payload consists of frames with content followed by an empty
546 # The payload consists of frames with content followed by an empty
547 # frame.
547 # frame.
548 for d in iter(lambda: fp.read(4096), b''):
548 for d in iter(lambda: fp.read(4096), b''):
549 self._writeframed(d)
549 self._writeframed(d)
550 self._writeframed(b"", flush=True)
550 self._writeframed(b"", flush=True)
551
551
552 return self._pipei
552 return self._pipei
553
553
554 def _getamount(self):
554 def _getamount(self):
555 l = self._pipei.readline()
555 l = self._pipei.readline()
556 if l == b'\n':
556 if l == b'\n':
557 if self._autoreadstderr:
557 if self._autoreadstderr:
558 self._readerr()
558 self._readerr()
559 msg = _(b'check previous remote output')
559 msg = _(b'check previous remote output')
560 self._abort(error.OutOfBandError(hint=msg))
560 self._abort(error.OutOfBandError(hint=msg))
561 if self._autoreadstderr:
561 if self._autoreadstderr:
562 self._readerr()
562 self._readerr()
563 try:
563 try:
564 return int(l)
564 return int(l)
565 except ValueError:
565 except ValueError:
566 self._abort(error.ResponseError(_(b"unexpected response:"), l))
566 self._abort(error.ResponseError(_(b"unexpected response:"), l))
567
567
568 def _readframed(self):
568 def _readframed(self):
569 size = self._getamount()
569 size = self._getamount()
570 if not size:
570 if not size:
571 return b''
571 return b''
572
572
573 return self._pipei.read(size)
573 return self._pipei.read(size)
574
574
575 def _writeframed(self, data, flush=False):
575 def _writeframed(self, data, flush=False):
576 self._pipeo.write(b"%d\n" % len(data))
576 self._pipeo.write(b"%d\n" % len(data))
577 if data:
577 if data:
578 self._pipeo.write(data)
578 self._pipeo.write(data)
579 if flush:
579 if flush:
580 self._pipeo.flush()
580 self._pipeo.flush()
581 if self._autoreadstderr:
581 if self._autoreadstderr:
582 self._readerr()
582 self._readerr()
583
583
584
584
585 class sshv2peer(sshv1peer):
585 class sshv2peer(sshv1peer):
586 """A peer that speakers version 2 of the transport protocol."""
586 """A peer that speakers version 2 of the transport protocol."""
587
587
588 # Currently version 2 is identical to version 1 post handshake.
588 # Currently version 2 is identical to version 1 post handshake.
589 # And handshake is performed before the peer is instantiated. So
589 # And handshake is performed before the peer is instantiated. So
590 # we need no custom code.
590 # we need no custom code.
591
591
592
592
593 def makepeer(ui, path, proc, stdin, stdout, stderr, autoreadstderr=True):
593 def makepeer(ui, path, proc, stdin, stdout, stderr, autoreadstderr=True):
594 """Make a peer instance from existing pipes.
594 """Make a peer instance from existing pipes.
595
595
596 ``path`` and ``proc`` are stored on the eventual peer instance and may
596 ``path`` and ``proc`` are stored on the eventual peer instance and may
597 not be used for anything meaningful.
597 not be used for anything meaningful.
598
598
599 ``stdin``, ``stdout``, and ``stderr`` are the pipes connected to the
599 ``stdin``, ``stdout``, and ``stderr`` are the pipes connected to the
600 SSH server's stdio handles.
600 SSH server's stdio handles.
601
601
602 This function is factored out to allow creating peers that don't
602 This function is factored out to allow creating peers that don't
603 actually spawn a new process. It is useful for starting SSH protocol
603 actually spawn a new process. It is useful for starting SSH protocol
604 servers and clients via non-standard means, which can be useful for
604 servers and clients via non-standard means, which can be useful for
605 testing.
605 testing.
606 """
606 """
607 try:
607 try:
608 protoname, caps = _performhandshake(ui, stdin, stdout, stderr)
608 protoname, caps = _performhandshake(ui, stdin, stdout, stderr)
609 except Exception:
609 except Exception:
610 _cleanuppipes(ui, stdout, stdin, stderr)
610 _cleanuppipes(ui, stdout, stdin, stderr)
611 raise
611 raise
612
612
613 if protoname == wireprototypes.SSHV1:
613 if protoname == wireprototypes.SSHV1:
614 return sshv1peer(
614 return sshv1peer(
615 ui,
615 ui,
616 path,
616 path,
617 proc,
617 proc,
618 stdin,
618 stdin,
619 stdout,
619 stdout,
620 stderr,
620 stderr,
621 caps,
621 caps,
622 autoreadstderr=autoreadstderr,
622 autoreadstderr=autoreadstderr,
623 )
623 )
624 elif protoname == wireprototypes.SSHV2:
624 elif protoname == wireprototypes.SSHV2:
625 return sshv2peer(
625 return sshv2peer(
626 ui,
626 ui,
627 path,
627 path,
628 proc,
628 proc,
629 stdin,
629 stdin,
630 stdout,
630 stdout,
631 stderr,
631 stderr,
632 caps,
632 caps,
633 autoreadstderr=autoreadstderr,
633 autoreadstderr=autoreadstderr,
634 )
634 )
635 else:
635 else:
636 _cleanuppipes(ui, stdout, stdin, stderr)
636 _cleanuppipes(ui, stdout, stdin, stderr)
637 raise error.RepoError(
637 raise error.RepoError(
638 _(b'unknown version of SSH protocol: %s') % protoname
638 _(b'unknown version of SSH protocol: %s') % protoname
639 )
639 )
640
640
641
641
642 def instance(ui, path, create, intents=None, createopts=None):
642 def instance(ui, path, create, intents=None, createopts=None):
643 """Create an SSH peer.
643 """Create an SSH peer.
644
644
645 The returned object conforms to the ``wireprotov1peer.wirepeer`` interface.
645 The returned object conforms to the ``wireprotov1peer.wirepeer`` interface.
646 """
646 """
647 u = util.url(path, parsequery=False, parsefragment=False)
647 u = util.url(path, parsequery=False, parsefragment=False)
648 if u.scheme != b'ssh' or not u.host or u.path is None:
648 if u.scheme != b'ssh' or not u.host or u.path is None:
649 raise error.RepoError(_(b"couldn't parse location %s") % path)
649 raise error.RepoError(_(b"couldn't parse location %s") % path)
650
650
651 util.checksafessh(path)
651 util.checksafessh(path)
652
652
653 if u.passwd is not None:
653 if u.passwd is not None:
654 raise error.RepoError(_(b'password in URL not supported'))
654 raise error.RepoError(_(b'password in URL not supported'))
655
655
656 sshcmd = ui.config(b'ui', b'ssh')
656 sshcmd = ui.config(b'ui', b'ssh')
657 remotecmd = ui.config(b'ui', b'remotecmd')
657 remotecmd = ui.config(b'ui', b'remotecmd')
658 sshaddenv = dict(ui.configitems(b'sshenv'))
658 sshaddenv = dict(ui.configitems(b'sshenv'))
659 sshenv = procutil.shellenviron(sshaddenv)
659 sshenv = procutil.shellenviron(sshaddenv)
660 remotepath = u.path or b'.'
660 remotepath = u.path or b'.'
661
661
662 args = procutil.sshargs(sshcmd, u.host, u.user, u.port)
662 args = procutil.sshargs(sshcmd, u.host, u.user, u.port)
663
663
664 if create:
664 if create:
665 # We /could/ do this, but only if the remote init command knows how to
665 # We /could/ do this, but only if the remote init command knows how to
666 # handle them. We don't yet make any assumptions about that. And without
666 # handle them. We don't yet make any assumptions about that. And without
667 # querying the remote, there's no way of knowing if the remote even
667 # querying the remote, there's no way of knowing if the remote even
668 # supports said requested feature.
668 # supports said requested feature.
669 if createopts:
669 if createopts:
670 raise error.RepoError(
670 raise error.RepoError(
671 _(
671 _(
672 b'cannot create remote SSH repositories '
672 b'cannot create remote SSH repositories '
673 b'with extra options'
673 b'with extra options'
674 )
674 )
675 )
675 )
676
676
677 cmd = b'%s %s %s' % (
677 cmd = b'%s %s %s' % (
678 sshcmd,
678 sshcmd,
679 args,
679 args,
680 procutil.shellquote(
680 procutil.shellquote(
681 b'%s init %s'
681 b'%s init %s'
682 % (_serverquote(remotecmd), _serverquote(remotepath))
682 % (_serverquote(remotecmd), _serverquote(remotepath))
683 ),
683 ),
684 )
684 )
685 ui.debug(b'running %s\n' % cmd)
685 ui.debug(b'running %s\n' % cmd)
686 res = ui.system(cmd, blockedtag=b'sshpeer', environ=sshenv)
686 res = ui.system(cmd, blockedtag=b'sshpeer', environ=sshenv)
687 if res != 0:
687 if res != 0:
688 raise error.RepoError(_(b'could not create remote repo'))
688 raise error.RepoError(_(b'could not create remote repo'))
689
689
690 proc, stdin, stdout, stderr = _makeconnection(
690 proc, stdin, stdout, stderr = _makeconnection(
691 ui, sshcmd, args, remotecmd, remotepath, sshenv
691 ui, sshcmd, args, remotecmd, remotepath, sshenv
692 )
692 )
693
693
694 peer = makepeer(ui, path, proc, stdin, stdout, stderr)
694 peer = makepeer(ui, path, proc, stdin, stdout, stderr)
695
695
696 # Finally, if supported by the server, notify it about our own
696 # Finally, if supported by the server, notify it about our own
697 # capabilities.
697 # capabilities.
698 if b'protocaps' in peer.capabilities():
698 if b'protocaps' in peer.capabilities():
699 try:
699 try:
700 peer._call(
700 peer._call(
701 b"protocaps", caps=b' '.join(sorted(_clientcapabilities()))
701 b"protocaps", caps=b' '.join(sorted(_clientcapabilities()))
702 )
702 )
703 except IOError:
703 except IOError:
704 peer._cleanup()
704 peer._cleanup()
705 raise error.RepoError(_(b'capability exchange failed'))
705 raise error.RepoError(_(b'capability exchange failed'))
706
706
707 return peer
707 return peer
General Comments 0
You need to be logged in to leave comments. Login now