##// END OF EJS Templates
sshpeer: convert command name to sysstr before accessing method...
marmoute -
r51813:88268725 default
parent child Browse files
Show More
@@ -1,710 +1,710 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Olivia Mackall <olivia@selenic.com>
3 # Copyright 2005, 2006 Olivia Mackall <olivia@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8
8
9 import re
9 import re
10 import uuid
10 import uuid
11
11
12 from .i18n import _
12 from .i18n import _
13 from .pycompat import getattr
13 from .pycompat import getattr
14 from . import (
14 from . import (
15 error,
15 error,
16 pycompat,
16 pycompat,
17 util,
17 util,
18 wireprototypes,
18 wireprototypes,
19 wireprotov1peer,
19 wireprotov1peer,
20 wireprotov1server,
20 wireprotov1server,
21 )
21 )
22 from .utils import (
22 from .utils import (
23 procutil,
23 procutil,
24 stringutil,
24 stringutil,
25 urlutil,
25 urlutil,
26 )
26 )
27
27
28
28
29 def _serverquote(s):
29 def _serverquote(s):
30 """quote a string for the remote shell ... which we assume is sh"""
30 """quote a string for the remote shell ... which we assume is sh"""
31 if not s:
31 if not s:
32 return s
32 return s
33 if re.match(b'[a-zA-Z0-9@%_+=:,./-]*$', s):
33 if re.match(b'[a-zA-Z0-9@%_+=:,./-]*$', s):
34 return s
34 return s
35 return b"'%s'" % s.replace(b"'", b"'\\''")
35 return b"'%s'" % s.replace(b"'", b"'\\''")
36
36
37
37
38 def _forwardoutput(ui, pipe, warn=False):
38 def _forwardoutput(ui, pipe, warn=False):
39 """display all data currently available on pipe as remote output.
39 """display all data currently available on pipe as remote output.
40
40
41 This is non blocking."""
41 This is non blocking."""
42 if pipe and not pipe.closed:
42 if pipe and not pipe.closed:
43 s = procutil.readpipe(pipe)
43 s = procutil.readpipe(pipe)
44 if s:
44 if s:
45 display = ui.warn if warn else ui.status
45 display = ui.warn if warn else ui.status
46 for l in s.splitlines():
46 for l in s.splitlines():
47 display(_(b"remote: "), l, b'\n')
47 display(_(b"remote: "), l, b'\n')
48
48
49
49
50 class doublepipe:
50 class doublepipe:
51 """Operate a side-channel pipe in addition of a main one
51 """Operate a side-channel pipe in addition of a main one
52
52
53 The side-channel pipe contains server output to be forwarded to the user
53 The side-channel pipe contains server output to be forwarded to the user
54 input. The double pipe will behave as the "main" pipe, but will ensure the
54 input. The double pipe will behave as the "main" pipe, but will ensure the
55 content of the "side" pipe is properly processed while we wait for blocking
55 content of the "side" pipe is properly processed while we wait for blocking
56 call on the "main" pipe.
56 call on the "main" pipe.
57
57
58 If large amounts of data are read from "main", the forward will cease after
58 If large amounts of data are read from "main", the forward will cease after
59 the first bytes start to appear. This simplifies the implementation
59 the first bytes start to appear. This simplifies the implementation
60 without affecting actual output of sshpeer too much as we rarely issue
60 without affecting actual output of sshpeer too much as we rarely issue
61 large read for data not yet emitted by the server.
61 large read for data not yet emitted by the server.
62
62
63 The main pipe is expected to be a 'bufferedinputpipe' from the util module
63 The main pipe is expected to be a 'bufferedinputpipe' from the util module
64 that handle all the os specific bits. This class lives in this module
64 that handle all the os specific bits. This class lives in this module
65 because it focus on behavior specific to the ssh protocol."""
65 because it focus on behavior specific to the ssh protocol."""
66
66
67 def __init__(self, ui, main, side):
67 def __init__(self, ui, main, side):
68 self._ui = ui
68 self._ui = ui
69 self._main = main
69 self._main = main
70 self._side = side
70 self._side = side
71
71
72 def _wait(self):
72 def _wait(self):
73 """wait until some data are available on main or side
73 """wait until some data are available on main or side
74
74
75 return a pair of boolean (ismainready, issideready)
75 return a pair of boolean (ismainready, issideready)
76
76
77 (This will only wait for data if the setup is supported by `util.poll`)
77 (This will only wait for data if the setup is supported by `util.poll`)
78 """
78 """
79 if (
79 if (
80 isinstance(self._main, util.bufferedinputpipe)
80 isinstance(self._main, util.bufferedinputpipe)
81 and self._main.hasbuffer
81 and self._main.hasbuffer
82 ):
82 ):
83 # Main has data. Assume side is worth poking at.
83 # Main has data. Assume side is worth poking at.
84 return True, True
84 return True, True
85
85
86 fds = [self._main.fileno(), self._side.fileno()]
86 fds = [self._main.fileno(), self._side.fileno()]
87 try:
87 try:
88 act = util.poll(fds)
88 act = util.poll(fds)
89 except NotImplementedError:
89 except NotImplementedError:
90 # non supported yet case, assume all have data.
90 # non supported yet case, assume all have data.
91 act = fds
91 act = fds
92 return (self._main.fileno() in act, self._side.fileno() in act)
92 return (self._main.fileno() in act, self._side.fileno() in act)
93
93
94 def write(self, data):
94 def write(self, data):
95 return self._call(b'write', data)
95 return self._call(b'write', data)
96
96
97 def read(self, size):
97 def read(self, size):
98 r = self._call(b'read', size)
98 r = self._call(b'read', size)
99 if size != 0 and not r:
99 if size != 0 and not r:
100 # We've observed a condition that indicates the
100 # We've observed a condition that indicates the
101 # stdout closed unexpectedly. Check stderr one
101 # stdout closed unexpectedly. Check stderr one
102 # more time and snag anything that's there before
102 # more time and snag anything that's there before
103 # letting anyone know the main part of the pipe
103 # letting anyone know the main part of the pipe
104 # closed prematurely.
104 # closed prematurely.
105 _forwardoutput(self._ui, self._side)
105 _forwardoutput(self._ui, self._side)
106 return r
106 return r
107
107
108 def unbufferedread(self, size):
108 def unbufferedread(self, size):
109 r = self._call(b'unbufferedread', size)
109 r = self._call(b'unbufferedread', size)
110 if size != 0 and not r:
110 if size != 0 and not r:
111 # We've observed a condition that indicates the
111 # We've observed a condition that indicates the
112 # stdout closed unexpectedly. Check stderr one
112 # stdout closed unexpectedly. Check stderr one
113 # more time and snag anything that's there before
113 # more time and snag anything that's there before
114 # letting anyone know the main part of the pipe
114 # letting anyone know the main part of the pipe
115 # closed prematurely.
115 # closed prematurely.
116 _forwardoutput(self._ui, self._side)
116 _forwardoutput(self._ui, self._side)
117 return r
117 return r
118
118
119 def readline(self):
119 def readline(self):
120 return self._call(b'readline')
120 return self._call(b'readline')
121
121
122 def _call(self, methname, data=None):
122 def _call(self, methname, data=None):
123 """call <methname> on "main", forward output of "side" while blocking"""
123 """call <methname> on "main", forward output of "side" while blocking"""
124 # data can be '' or 0
124 # data can be '' or 0
125 if (data is not None and not data) or self._main.closed:
125 if (data is not None and not data) or self._main.closed:
126 _forwardoutput(self._ui, self._side)
126 _forwardoutput(self._ui, self._side)
127 return b''
127 return b''
128 while True:
128 while True:
129 mainready, sideready = self._wait()
129 mainready, sideready = self._wait()
130 if sideready:
130 if sideready:
131 _forwardoutput(self._ui, self._side)
131 _forwardoutput(self._ui, self._side)
132 if mainready:
132 if mainready:
133 meth = getattr(self._main, methname)
133 meth = getattr(self._main, pycompat.sysstr(methname))
134 if data is None:
134 if data is None:
135 return meth()
135 return meth()
136 else:
136 else:
137 return meth(data)
137 return meth(data)
138
138
139 def close(self):
139 def close(self):
140 return self._main.close()
140 return self._main.close()
141
141
142 @property
142 @property
143 def closed(self):
143 def closed(self):
144 return self._main.closed
144 return self._main.closed
145
145
146 def flush(self):
146 def flush(self):
147 return self._main.flush()
147 return self._main.flush()
148
148
149
149
150 def _cleanuppipes(ui, pipei, pipeo, pipee, warn):
150 def _cleanuppipes(ui, pipei, pipeo, pipee, warn):
151 """Clean up pipes used by an SSH connection."""
151 """Clean up pipes used by an SSH connection."""
152 didsomething = False
152 didsomething = False
153 if pipeo and not pipeo.closed:
153 if pipeo and not pipeo.closed:
154 didsomething = True
154 didsomething = True
155 pipeo.close()
155 pipeo.close()
156 if pipei and not pipei.closed:
156 if pipei and not pipei.closed:
157 didsomething = True
157 didsomething = True
158 pipei.close()
158 pipei.close()
159
159
160 if pipee and not pipee.closed:
160 if pipee and not pipee.closed:
161 didsomething = True
161 didsomething = True
162 # Try to read from the err descriptor until EOF.
162 # Try to read from the err descriptor until EOF.
163 try:
163 try:
164 for l in pipee:
164 for l in pipee:
165 ui.status(_(b'remote: '), l)
165 ui.status(_(b'remote: '), l)
166 except (IOError, ValueError):
166 except (IOError, ValueError):
167 pass
167 pass
168
168
169 pipee.close()
169 pipee.close()
170
170
171 if didsomething and warn is not None:
171 if didsomething and warn is not None:
172 # Encourage explicit close of sshpeers. Closing via __del__ is
172 # Encourage explicit close of sshpeers. Closing via __del__ is
173 # not very predictable when exceptions are thrown, which has led
173 # not very predictable when exceptions are thrown, which has led
174 # to deadlocks due to a peer get gc'ed in a fork
174 # to deadlocks due to a peer get gc'ed in a fork
175 # We add our own stack trace, because the stacktrace when called
175 # We add our own stack trace, because the stacktrace when called
176 # from __del__ is useless.
176 # from __del__ is useless.
177 ui.develwarn(b'missing close on SSH connection created at:\n%s' % warn)
177 ui.develwarn(b'missing close on SSH connection created at:\n%s' % warn)
178
178
179
179
180 def _makeconnection(
180 def _makeconnection(
181 ui, sshcmd, args, remotecmd, path, sshenv=None, remotehidden=False
181 ui, sshcmd, args, remotecmd, path, sshenv=None, remotehidden=False
182 ):
182 ):
183 """Create an SSH connection to a server.
183 """Create an SSH connection to a server.
184
184
185 Returns a tuple of (process, stdin, stdout, stderr) for the
185 Returns a tuple of (process, stdin, stdout, stderr) for the
186 spawned process.
186 spawned process.
187 """
187 """
188 cmd = b'%s %s %s' % (
188 cmd = b'%s %s %s' % (
189 sshcmd,
189 sshcmd,
190 args,
190 args,
191 procutil.shellquote(
191 procutil.shellquote(
192 b'%s -R %s serve --stdio%s'
192 b'%s -R %s serve --stdio%s'
193 % (
193 % (
194 _serverquote(remotecmd),
194 _serverquote(remotecmd),
195 _serverquote(path),
195 _serverquote(path),
196 b' --hidden' if remotehidden else b'',
196 b' --hidden' if remotehidden else b'',
197 )
197 )
198 ),
198 ),
199 )
199 )
200
200
201 ui.debug(b'running %s\n' % cmd)
201 ui.debug(b'running %s\n' % cmd)
202
202
203 # no buffer allow the use of 'select'
203 # no buffer allow the use of 'select'
204 # feel free to remove buffering and select usage when we ultimately
204 # feel free to remove buffering and select usage when we ultimately
205 # move to threading.
205 # move to threading.
206 stdin, stdout, stderr, proc = procutil.popen4(cmd, bufsize=0, env=sshenv)
206 stdin, stdout, stderr, proc = procutil.popen4(cmd, bufsize=0, env=sshenv)
207
207
208 return proc, stdin, stdout, stderr
208 return proc, stdin, stdout, stderr
209
209
210
210
211 def _clientcapabilities():
211 def _clientcapabilities():
212 """Return list of capabilities of this client.
212 """Return list of capabilities of this client.
213
213
214 Returns a list of capabilities that are supported by this client.
214 Returns a list of capabilities that are supported by this client.
215 """
215 """
216 protoparams = {b'partial-pull'}
216 protoparams = {b'partial-pull'}
217 comps = [
217 comps = [
218 e.wireprotosupport().name
218 e.wireprotosupport().name
219 for e in util.compengines.supportedwireengines(util.CLIENTROLE)
219 for e in util.compengines.supportedwireengines(util.CLIENTROLE)
220 ]
220 ]
221 protoparams.add(b'comp=%s' % b','.join(comps))
221 protoparams.add(b'comp=%s' % b','.join(comps))
222 return protoparams
222 return protoparams
223
223
224
224
225 def _performhandshake(ui, stdin, stdout, stderr):
225 def _performhandshake(ui, stdin, stdout, stderr):
226 def badresponse():
226 def badresponse():
227 # Flush any output on stderr. In general, the stderr contains errors
227 # Flush any output on stderr. In general, the stderr contains errors
228 # from the remote (ssh errors, some hg errors), and status indications
228 # from the remote (ssh errors, some hg errors), and status indications
229 # (like "adding changes"), with no current way to tell them apart.
229 # (like "adding changes"), with no current way to tell them apart.
230 # Here we failed so early that it's almost certainly only errors, so
230 # Here we failed so early that it's almost certainly only errors, so
231 # use warn=True so -q doesn't hide them.
231 # use warn=True so -q doesn't hide them.
232 _forwardoutput(ui, stderr, warn=True)
232 _forwardoutput(ui, stderr, warn=True)
233
233
234 msg = _(b'no suitable response from remote hg')
234 msg = _(b'no suitable response from remote hg')
235 hint = ui.config(b'ui', b'ssherrorhint')
235 hint = ui.config(b'ui', b'ssherrorhint')
236 raise error.RepoError(msg, hint=hint)
236 raise error.RepoError(msg, hint=hint)
237
237
238 # The handshake consists of sending wire protocol commands in reverse
238 # The handshake consists of sending wire protocol commands in reverse
239 # order of protocol implementation and then sniffing for a response
239 # order of protocol implementation and then sniffing for a response
240 # to one of them.
240 # to one of them.
241 #
241 #
242 # Those commands (from oldest to newest) are:
242 # Those commands (from oldest to newest) are:
243 #
243 #
244 # ``between``
244 # ``between``
245 # Asks for the set of revisions between a pair of revisions. Command
245 # Asks for the set of revisions between a pair of revisions. Command
246 # present in all Mercurial server implementations.
246 # present in all Mercurial server implementations.
247 #
247 #
248 # ``hello``
248 # ``hello``
249 # Instructs the server to advertise its capabilities. Introduced in
249 # Instructs the server to advertise its capabilities. Introduced in
250 # Mercurial 0.9.1.
250 # Mercurial 0.9.1.
251 #
251 #
252 # ``upgrade``
252 # ``upgrade``
253 # Requests upgrade from default transport protocol version 1 to
253 # Requests upgrade from default transport protocol version 1 to
254 # a newer version. Introduced in Mercurial 4.6 as an experimental
254 # a newer version. Introduced in Mercurial 4.6 as an experimental
255 # feature.
255 # feature.
256 #
256 #
257 # The ``between`` command is issued with a request for the null
257 # The ``between`` command is issued with a request for the null
258 # range. If the remote is a Mercurial server, this request will
258 # range. If the remote is a Mercurial server, this request will
259 # generate a specific response: ``1\n\n``. This represents the
259 # generate a specific response: ``1\n\n``. This represents the
260 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
260 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
261 # in the output stream and know this is the response to ``between``
261 # in the output stream and know this is the response to ``between``
262 # and we're at the end of our handshake reply.
262 # and we're at the end of our handshake reply.
263 #
263 #
264 # The response to the ``hello`` command will be a line with the
264 # The response to the ``hello`` command will be a line with the
265 # length of the value returned by that command followed by that
265 # length of the value returned by that command followed by that
266 # value. If the server doesn't support ``hello`` (which should be
266 # value. If the server doesn't support ``hello`` (which should be
267 # rare), that line will be ``0\n``. Otherwise, the value will contain
267 # rare), that line will be ``0\n``. Otherwise, the value will contain
268 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
268 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
269 # the capabilities of the server.
269 # the capabilities of the server.
270 #
270 #
271 # The ``upgrade`` command isn't really a command in the traditional
271 # The ``upgrade`` command isn't really a command in the traditional
272 # sense of version 1 of the transport because it isn't using the
272 # sense of version 1 of the transport because it isn't using the
273 # proper mechanism for formatting insteads: instead, it just encodes
273 # proper mechanism for formatting insteads: instead, it just encodes
274 # arguments on the line, delimited by spaces.
274 # arguments on the line, delimited by spaces.
275 #
275 #
276 # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
276 # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
277 # If the server doesn't support protocol upgrades, it will reply to
277 # If the server doesn't support protocol upgrades, it will reply to
278 # this line with ``0\n``. Otherwise, it emits an
278 # this line with ``0\n``. Otherwise, it emits an
279 # ``upgraded <token> <protocol>`` line to both stdout and stderr.
279 # ``upgraded <token> <protocol>`` line to both stdout and stderr.
280 # Content immediately following this line describes additional
280 # Content immediately following this line describes additional
281 # protocol and server state.
281 # protocol and server state.
282 #
282 #
283 # In addition to the responses to our command requests, the server
283 # In addition to the responses to our command requests, the server
284 # may emit "banner" output on stdout. SSH servers are allowed to
284 # may emit "banner" output on stdout. SSH servers are allowed to
285 # print messages to stdout on login. Issuing commands on connection
285 # print messages to stdout on login. Issuing commands on connection
286 # allows us to flush this banner output from the server by scanning
286 # allows us to flush this banner output from the server by scanning
287 # for output to our well-known ``between`` command. Of course, if
287 # for output to our well-known ``between`` command. Of course, if
288 # the banner contains ``1\n\n``, this will throw off our detection.
288 # the banner contains ``1\n\n``, this will throw off our detection.
289
289
290 requestlog = ui.configbool(b'devel', b'debug.peer-request')
290 requestlog = ui.configbool(b'devel', b'debug.peer-request')
291
291
292 # Generate a random token to help identify responses to version 2
292 # Generate a random token to help identify responses to version 2
293 # upgrade request.
293 # upgrade request.
294 token = pycompat.sysbytes(str(uuid.uuid4()))
294 token = pycompat.sysbytes(str(uuid.uuid4()))
295
295
296 try:
296 try:
297 pairsarg = b'%s-%s' % (b'0' * 40, b'0' * 40)
297 pairsarg = b'%s-%s' % (b'0' * 40, b'0' * 40)
298 handshake = [
298 handshake = [
299 b'hello\n',
299 b'hello\n',
300 b'between\n',
300 b'between\n',
301 b'pairs %d\n' % len(pairsarg),
301 b'pairs %d\n' % len(pairsarg),
302 pairsarg,
302 pairsarg,
303 ]
303 ]
304
304
305 if requestlog:
305 if requestlog:
306 ui.debug(b'devel-peer-request: hello+between\n')
306 ui.debug(b'devel-peer-request: hello+between\n')
307 ui.debug(b'devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
307 ui.debug(b'devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
308 ui.debug(b'sending hello command\n')
308 ui.debug(b'sending hello command\n')
309 ui.debug(b'sending between command\n')
309 ui.debug(b'sending between command\n')
310
310
311 stdin.write(b''.join(handshake))
311 stdin.write(b''.join(handshake))
312 stdin.flush()
312 stdin.flush()
313 except IOError:
313 except IOError:
314 badresponse()
314 badresponse()
315
315
316 # Assume version 1 of wire protocol by default.
316 # Assume version 1 of wire protocol by default.
317 protoname = wireprototypes.SSHV1
317 protoname = wireprototypes.SSHV1
318 reupgraded = re.compile(b'^upgraded %s (.*)$' % stringutil.reescape(token))
318 reupgraded = re.compile(b'^upgraded %s (.*)$' % stringutil.reescape(token))
319
319
320 lines = [b'', b'dummy']
320 lines = [b'', b'dummy']
321 max_noise = 500
321 max_noise = 500
322 while lines[-1] and max_noise:
322 while lines[-1] and max_noise:
323 try:
323 try:
324 l = stdout.readline()
324 l = stdout.readline()
325 _forwardoutput(ui, stderr, warn=True)
325 _forwardoutput(ui, stderr, warn=True)
326
326
327 # Look for reply to protocol upgrade request. It has a token
327 # Look for reply to protocol upgrade request. It has a token
328 # in it, so there should be no false positives.
328 # in it, so there should be no false positives.
329 m = reupgraded.match(l)
329 m = reupgraded.match(l)
330 if m:
330 if m:
331 protoname = m.group(1)
331 protoname = m.group(1)
332 ui.debug(b'protocol upgraded to %s\n' % protoname)
332 ui.debug(b'protocol upgraded to %s\n' % protoname)
333 # If an upgrade was handled, the ``hello`` and ``between``
333 # If an upgrade was handled, the ``hello`` and ``between``
334 # requests are ignored. The next output belongs to the
334 # requests are ignored. The next output belongs to the
335 # protocol, so stop scanning lines.
335 # protocol, so stop scanning lines.
336 break
336 break
337
337
338 # Otherwise it could be a banner, ``0\n`` response if server
338 # Otherwise it could be a banner, ``0\n`` response if server
339 # doesn't support upgrade.
339 # doesn't support upgrade.
340
340
341 if lines[-1] == b'1\n' and l == b'\n':
341 if lines[-1] == b'1\n' and l == b'\n':
342 break
342 break
343 if l:
343 if l:
344 ui.debug(b'remote: ', l)
344 ui.debug(b'remote: ', l)
345 lines.append(l)
345 lines.append(l)
346 max_noise -= 1
346 max_noise -= 1
347 except IOError:
347 except IOError:
348 badresponse()
348 badresponse()
349 else:
349 else:
350 badresponse()
350 badresponse()
351
351
352 caps = set()
352 caps = set()
353
353
354 # For version 1, we should see a ``capabilities`` line in response to the
354 # For version 1, we should see a ``capabilities`` line in response to the
355 # ``hello`` command.
355 # ``hello`` command.
356 if protoname == wireprototypes.SSHV1:
356 if protoname == wireprototypes.SSHV1:
357 for l in reversed(lines):
357 for l in reversed(lines):
358 # Look for response to ``hello`` command. Scan from the back so
358 # Look for response to ``hello`` command. Scan from the back so
359 # we don't misinterpret banner output as the command reply.
359 # we don't misinterpret banner output as the command reply.
360 if l.startswith(b'capabilities:'):
360 if l.startswith(b'capabilities:'):
361 caps.update(l[:-1].split(b':')[1].split())
361 caps.update(l[:-1].split(b':')[1].split())
362 break
362 break
363
363
364 # Error if we couldn't find capabilities, this means:
364 # Error if we couldn't find capabilities, this means:
365 #
365 #
366 # 1. Remote isn't a Mercurial server
366 # 1. Remote isn't a Mercurial server
367 # 2. Remote is a <0.9.1 Mercurial server
367 # 2. Remote is a <0.9.1 Mercurial server
368 # 3. Remote is a future Mercurial server that dropped ``hello``
368 # 3. Remote is a future Mercurial server that dropped ``hello``
369 # and other attempted handshake mechanisms.
369 # and other attempted handshake mechanisms.
370 if not caps:
370 if not caps:
371 badresponse()
371 badresponse()
372
372
373 # Flush any output on stderr before proceeding.
373 # Flush any output on stderr before proceeding.
374 _forwardoutput(ui, stderr, warn=True)
374 _forwardoutput(ui, stderr, warn=True)
375
375
376 return protoname, caps
376 return protoname, caps
377
377
378
378
379 class sshv1peer(wireprotov1peer.wirepeer):
379 class sshv1peer(wireprotov1peer.wirepeer):
380 def __init__(
380 def __init__(
381 self,
381 self,
382 ui,
382 ui,
383 path,
383 path,
384 proc,
384 proc,
385 stdin,
385 stdin,
386 stdout,
386 stdout,
387 stderr,
387 stderr,
388 caps,
388 caps,
389 autoreadstderr=True,
389 autoreadstderr=True,
390 remotehidden=False,
390 remotehidden=False,
391 ):
391 ):
392 """Create a peer from an existing SSH connection.
392 """Create a peer from an existing SSH connection.
393
393
394 ``proc`` is a handle on the underlying SSH process.
394 ``proc`` is a handle on the underlying SSH process.
395 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
395 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
396 pipes for that process.
396 pipes for that process.
397 ``caps`` is a set of capabilities supported by the remote.
397 ``caps`` is a set of capabilities supported by the remote.
398 ``autoreadstderr`` denotes whether to automatically read from
398 ``autoreadstderr`` denotes whether to automatically read from
399 stderr and to forward its output.
399 stderr and to forward its output.
400 """
400 """
401 super().__init__(ui, path=path, remotehidden=remotehidden)
401 super().__init__(ui, path=path, remotehidden=remotehidden)
402 # self._subprocess is unused. Keeping a handle on the process
402 # self._subprocess is unused. Keeping a handle on the process
403 # holds a reference and prevents it from being garbage collected.
403 # holds a reference and prevents it from being garbage collected.
404 self._subprocess = proc
404 self._subprocess = proc
405
405
406 # And we hook up our "doublepipe" wrapper to allow querying
406 # And we hook up our "doublepipe" wrapper to allow querying
407 # stderr any time we perform I/O.
407 # stderr any time we perform I/O.
408 if autoreadstderr:
408 if autoreadstderr:
409 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
409 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
410 stdin = doublepipe(ui, stdin, stderr)
410 stdin = doublepipe(ui, stdin, stderr)
411
411
412 self._pipeo = stdin
412 self._pipeo = stdin
413 self._pipei = stdout
413 self._pipei = stdout
414 self._pipee = stderr
414 self._pipee = stderr
415 self._caps = caps
415 self._caps = caps
416 self._autoreadstderr = autoreadstderr
416 self._autoreadstderr = autoreadstderr
417 self._initstack = b''.join(util.getstackframes(1))
417 self._initstack = b''.join(util.getstackframes(1))
418 self._remotehidden = remotehidden
418 self._remotehidden = remotehidden
419
419
420 # Commands that have a "framed" response where the first line of the
420 # Commands that have a "framed" response where the first line of the
421 # response contains the length of that response.
421 # response contains the length of that response.
422 _FRAMED_COMMANDS = {
422 _FRAMED_COMMANDS = {
423 b'batch',
423 b'batch',
424 }
424 }
425
425
426 # Begin of ipeerconnection interface.
426 # Begin of ipeerconnection interface.
427
427
428 def url(self):
428 def url(self):
429 return self.path.loc
429 return self.path.loc
430
430
431 def local(self):
431 def local(self):
432 return None
432 return None
433
433
434 def canpush(self):
434 def canpush(self):
435 return True
435 return True
436
436
437 def close(self):
437 def close(self):
438 self._cleanup()
438 self._cleanup()
439
439
440 # End of ipeerconnection interface.
440 # End of ipeerconnection interface.
441
441
442 # Begin of ipeercommands interface.
442 # Begin of ipeercommands interface.
443
443
444 def capabilities(self):
444 def capabilities(self):
445 return self._caps
445 return self._caps
446
446
447 # End of ipeercommands interface.
447 # End of ipeercommands interface.
448
448
449 def _readerr(self):
449 def _readerr(self):
450 _forwardoutput(self.ui, self._pipee)
450 _forwardoutput(self.ui, self._pipee)
451
451
452 def _abort(self, exception):
452 def _abort(self, exception):
453 self._cleanup()
453 self._cleanup()
454 raise exception
454 raise exception
455
455
456 def _cleanup(self, warn=None):
456 def _cleanup(self, warn=None):
457 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee, warn=warn)
457 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee, warn=warn)
458
458
459 def __del__(self):
459 def __del__(self):
460 self._cleanup(warn=self._initstack)
460 self._cleanup(warn=self._initstack)
461
461
462 def _sendrequest(self, cmd, args, framed=False):
462 def _sendrequest(self, cmd, args, framed=False):
463 if self.ui.debugflag and self.ui.configbool(
463 if self.ui.debugflag and self.ui.configbool(
464 b'devel', b'debug.peer-request'
464 b'devel', b'debug.peer-request'
465 ):
465 ):
466 dbg = self.ui.debug
466 dbg = self.ui.debug
467 line = b'devel-peer-request: %s\n'
467 line = b'devel-peer-request: %s\n'
468 dbg(line % cmd)
468 dbg(line % cmd)
469 for key, value in sorted(args.items()):
469 for key, value in sorted(args.items()):
470 if not isinstance(value, dict):
470 if not isinstance(value, dict):
471 dbg(line % b' %s: %d bytes' % (key, len(value)))
471 dbg(line % b' %s: %d bytes' % (key, len(value)))
472 else:
472 else:
473 for dk, dv in sorted(value.items()):
473 for dk, dv in sorted(value.items()):
474 dbg(line % b' %s-%s: %d' % (key, dk, len(dv)))
474 dbg(line % b' %s-%s: %d' % (key, dk, len(dv)))
475 self.ui.debug(b"sending %s command\n" % cmd)
475 self.ui.debug(b"sending %s command\n" % cmd)
476 self._pipeo.write(b"%s\n" % cmd)
476 self._pipeo.write(b"%s\n" % cmd)
477 _func, names = wireprotov1server.commands[cmd]
477 _func, names = wireprotov1server.commands[cmd]
478 keys = names.split()
478 keys = names.split()
479 wireargs = {}
479 wireargs = {}
480 for k in keys:
480 for k in keys:
481 if k == b'*':
481 if k == b'*':
482 wireargs[b'*'] = args
482 wireargs[b'*'] = args
483 break
483 break
484 else:
484 else:
485 wireargs[k] = args[k]
485 wireargs[k] = args[k]
486 del args[k]
486 del args[k]
487 for k, v in sorted(wireargs.items()):
487 for k, v in sorted(wireargs.items()):
488 self._pipeo.write(b"%s %d\n" % (k, len(v)))
488 self._pipeo.write(b"%s %d\n" % (k, len(v)))
489 if isinstance(v, dict):
489 if isinstance(v, dict):
490 for dk, dv in v.items():
490 for dk, dv in v.items():
491 self._pipeo.write(b"%s %d\n" % (dk, len(dv)))
491 self._pipeo.write(b"%s %d\n" % (dk, len(dv)))
492 self._pipeo.write(dv)
492 self._pipeo.write(dv)
493 else:
493 else:
494 self._pipeo.write(v)
494 self._pipeo.write(v)
495 self._pipeo.flush()
495 self._pipeo.flush()
496
496
497 # We know exactly how many bytes are in the response. So return a proxy
497 # We know exactly how many bytes are in the response. So return a proxy
498 # around the raw output stream that allows reading exactly this many
498 # around the raw output stream that allows reading exactly this many
499 # bytes. Callers then can read() without fear of overrunning the
499 # bytes. Callers then can read() without fear of overrunning the
500 # response.
500 # response.
501 if framed:
501 if framed:
502 amount = self._getamount()
502 amount = self._getamount()
503 return util.cappedreader(self._pipei, amount)
503 return util.cappedreader(self._pipei, amount)
504
504
505 return self._pipei
505 return self._pipei
506
506
507 def _callstream(self, cmd, **args):
507 def _callstream(self, cmd, **args):
508 args = pycompat.byteskwargs(args)
508 args = pycompat.byteskwargs(args)
509 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
509 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
510
510
511 def _callcompressable(self, cmd, **args):
511 def _callcompressable(self, cmd, **args):
512 args = pycompat.byteskwargs(args)
512 args = pycompat.byteskwargs(args)
513 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
513 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
514
514
515 def _call(self, cmd, **args):
515 def _call(self, cmd, **args):
516 args = pycompat.byteskwargs(args)
516 args = pycompat.byteskwargs(args)
517 return self._sendrequest(cmd, args, framed=True).read()
517 return self._sendrequest(cmd, args, framed=True).read()
518
518
519 def _callpush(self, cmd, fp, **args):
519 def _callpush(self, cmd, fp, **args):
520 # The server responds with an empty frame if the client should
520 # The server responds with an empty frame if the client should
521 # continue submitting the payload.
521 # continue submitting the payload.
522 r = self._call(cmd, **args)
522 r = self._call(cmd, **args)
523 if r:
523 if r:
524 return b'', r
524 return b'', r
525
525
526 # The payload consists of frames with content followed by an empty
526 # The payload consists of frames with content followed by an empty
527 # frame.
527 # frame.
528 for d in iter(lambda: fp.read(4096), b''):
528 for d in iter(lambda: fp.read(4096), b''):
529 self._writeframed(d)
529 self._writeframed(d)
530 self._writeframed(b"", flush=True)
530 self._writeframed(b"", flush=True)
531
531
532 # In case of success, there is an empty frame and a frame containing
532 # In case of success, there is an empty frame and a frame containing
533 # the integer result (as a string).
533 # the integer result (as a string).
534 # In case of error, there is a non-empty frame containing the error.
534 # In case of error, there is a non-empty frame containing the error.
535 r = self._readframed()
535 r = self._readframed()
536 if r:
536 if r:
537 return b'', r
537 return b'', r
538 return self._readframed(), b''
538 return self._readframed(), b''
539
539
540 def _calltwowaystream(self, cmd, fp, **args):
540 def _calltwowaystream(self, cmd, fp, **args):
541 # The server responds with an empty frame if the client should
541 # The server responds with an empty frame if the client should
542 # continue submitting the payload.
542 # continue submitting the payload.
543 r = self._call(cmd, **args)
543 r = self._call(cmd, **args)
544 if r:
544 if r:
545 # XXX needs to be made better
545 # XXX needs to be made better
546 raise error.Abort(_(b'unexpected remote reply: %s') % r)
546 raise error.Abort(_(b'unexpected remote reply: %s') % r)
547
547
548 # The payload consists of frames with content followed by an empty
548 # The payload consists of frames with content followed by an empty
549 # frame.
549 # frame.
550 for d in iter(lambda: fp.read(4096), b''):
550 for d in iter(lambda: fp.read(4096), b''):
551 self._writeframed(d)
551 self._writeframed(d)
552 self._writeframed(b"", flush=True)
552 self._writeframed(b"", flush=True)
553
553
554 return self._pipei
554 return self._pipei
555
555
556 def _getamount(self):
556 def _getamount(self):
557 l = self._pipei.readline()
557 l = self._pipei.readline()
558 if l == b'\n':
558 if l == b'\n':
559 if self._autoreadstderr:
559 if self._autoreadstderr:
560 self._readerr()
560 self._readerr()
561 msg = _(b'check previous remote output')
561 msg = _(b'check previous remote output')
562 self._abort(error.OutOfBandError(hint=msg))
562 self._abort(error.OutOfBandError(hint=msg))
563 if self._autoreadstderr:
563 if self._autoreadstderr:
564 self._readerr()
564 self._readerr()
565 try:
565 try:
566 return int(l)
566 return int(l)
567 except ValueError:
567 except ValueError:
568 self._abort(error.ResponseError(_(b"unexpected response:"), l))
568 self._abort(error.ResponseError(_(b"unexpected response:"), l))
569
569
570 def _readframed(self):
570 def _readframed(self):
571 size = self._getamount()
571 size = self._getamount()
572 if not size:
572 if not size:
573 return b''
573 return b''
574
574
575 return self._pipei.read(size)
575 return self._pipei.read(size)
576
576
577 def _writeframed(self, data, flush=False):
577 def _writeframed(self, data, flush=False):
578 self._pipeo.write(b"%d\n" % len(data))
578 self._pipeo.write(b"%d\n" % len(data))
579 if data:
579 if data:
580 self._pipeo.write(data)
580 self._pipeo.write(data)
581 if flush:
581 if flush:
582 self._pipeo.flush()
582 self._pipeo.flush()
583 if self._autoreadstderr:
583 if self._autoreadstderr:
584 self._readerr()
584 self._readerr()
585
585
586
586
587 def _make_peer(
587 def _make_peer(
588 ui,
588 ui,
589 path,
589 path,
590 proc,
590 proc,
591 stdin,
591 stdin,
592 stdout,
592 stdout,
593 stderr,
593 stderr,
594 autoreadstderr=True,
594 autoreadstderr=True,
595 remotehidden=False,
595 remotehidden=False,
596 ):
596 ):
597 """Make a peer instance from existing pipes.
597 """Make a peer instance from existing pipes.
598
598
599 ``path`` and ``proc`` are stored on the eventual peer instance and may
599 ``path`` and ``proc`` are stored on the eventual peer instance and may
600 not be used for anything meaningful.
600 not be used for anything meaningful.
601
601
602 ``stdin``, ``stdout``, and ``stderr`` are the pipes connected to the
602 ``stdin``, ``stdout``, and ``stderr`` are the pipes connected to the
603 SSH server's stdio handles.
603 SSH server's stdio handles.
604
604
605 This function is factored out to allow creating peers that don't
605 This function is factored out to allow creating peers that don't
606 actually spawn a new process. It is useful for starting SSH protocol
606 actually spawn a new process. It is useful for starting SSH protocol
607 servers and clients via non-standard means, which can be useful for
607 servers and clients via non-standard means, which can be useful for
608 testing.
608 testing.
609 """
609 """
610 try:
610 try:
611 protoname, caps = _performhandshake(ui, stdin, stdout, stderr)
611 protoname, caps = _performhandshake(ui, stdin, stdout, stderr)
612 except Exception:
612 except Exception:
613 _cleanuppipes(ui, stdout, stdin, stderr, warn=None)
613 _cleanuppipes(ui, stdout, stdin, stderr, warn=None)
614 raise
614 raise
615
615
616 if protoname == wireprototypes.SSHV1:
616 if protoname == wireprototypes.SSHV1:
617 return sshv1peer(
617 return sshv1peer(
618 ui,
618 ui,
619 path,
619 path,
620 proc,
620 proc,
621 stdin,
621 stdin,
622 stdout,
622 stdout,
623 stderr,
623 stderr,
624 caps,
624 caps,
625 autoreadstderr=autoreadstderr,
625 autoreadstderr=autoreadstderr,
626 remotehidden=remotehidden,
626 remotehidden=remotehidden,
627 )
627 )
628 else:
628 else:
629 _cleanuppipes(ui, stdout, stdin, stderr, warn=None)
629 _cleanuppipes(ui, stdout, stdin, stderr, warn=None)
630 raise error.RepoError(
630 raise error.RepoError(
631 _(b'unknown version of SSH protocol: %s') % protoname
631 _(b'unknown version of SSH protocol: %s') % protoname
632 )
632 )
633
633
634
634
635 def make_peer(
635 def make_peer(
636 ui, path, create, intents=None, createopts=None, remotehidden=False
636 ui, path, create, intents=None, createopts=None, remotehidden=False
637 ):
637 ):
638 """Create an SSH peer.
638 """Create an SSH peer.
639
639
640 The returned object conforms to the ``wireprotov1peer.wirepeer`` interface.
640 The returned object conforms to the ``wireprotov1peer.wirepeer`` interface.
641 """
641 """
642 u = urlutil.url(path.loc, parsequery=False, parsefragment=False)
642 u = urlutil.url(path.loc, parsequery=False, parsefragment=False)
643 if u.scheme != b'ssh' or not u.host or u.path is None:
643 if u.scheme != b'ssh' or not u.host or u.path is None:
644 raise error.RepoError(_(b"couldn't parse location %s") % path)
644 raise error.RepoError(_(b"couldn't parse location %s") % path)
645
645
646 urlutil.checksafessh(path.loc)
646 urlutil.checksafessh(path.loc)
647
647
648 if u.passwd is not None:
648 if u.passwd is not None:
649 raise error.RepoError(_(b'password in URL not supported'))
649 raise error.RepoError(_(b'password in URL not supported'))
650
650
651 sshcmd = ui.config(b'ui', b'ssh')
651 sshcmd = ui.config(b'ui', b'ssh')
652 remotecmd = ui.config(b'ui', b'remotecmd')
652 remotecmd = ui.config(b'ui', b'remotecmd')
653 sshaddenv = dict(ui.configitems(b'sshenv'))
653 sshaddenv = dict(ui.configitems(b'sshenv'))
654 sshenv = procutil.shellenviron(sshaddenv)
654 sshenv = procutil.shellenviron(sshaddenv)
655 remotepath = u.path or b'.'
655 remotepath = u.path or b'.'
656
656
657 args = procutil.sshargs(sshcmd, u.host, u.user, u.port)
657 args = procutil.sshargs(sshcmd, u.host, u.user, u.port)
658
658
659 if create:
659 if create:
660 # We /could/ do this, but only if the remote init command knows how to
660 # We /could/ do this, but only if the remote init command knows how to
661 # handle them. We don't yet make any assumptions about that. And without
661 # handle them. We don't yet make any assumptions about that. And without
662 # querying the remote, there's no way of knowing if the remote even
662 # querying the remote, there's no way of knowing if the remote even
663 # supports said requested feature.
663 # supports said requested feature.
664 if createopts:
664 if createopts:
665 raise error.RepoError(
665 raise error.RepoError(
666 _(
666 _(
667 b'cannot create remote SSH repositories '
667 b'cannot create remote SSH repositories '
668 b'with extra options'
668 b'with extra options'
669 )
669 )
670 )
670 )
671
671
672 cmd = b'%s %s %s' % (
672 cmd = b'%s %s %s' % (
673 sshcmd,
673 sshcmd,
674 args,
674 args,
675 procutil.shellquote(
675 procutil.shellquote(
676 b'%s init %s'
676 b'%s init %s'
677 % (_serverquote(remotecmd), _serverquote(remotepath))
677 % (_serverquote(remotecmd), _serverquote(remotepath))
678 ),
678 ),
679 )
679 )
680 ui.debug(b'running %s\n' % cmd)
680 ui.debug(b'running %s\n' % cmd)
681 res = ui.system(cmd, blockedtag=b'sshpeer', environ=sshenv)
681 res = ui.system(cmd, blockedtag=b'sshpeer', environ=sshenv)
682 if res != 0:
682 if res != 0:
683 raise error.RepoError(_(b'could not create remote repo'))
683 raise error.RepoError(_(b'could not create remote repo'))
684
684
685 proc, stdin, stdout, stderr = _makeconnection(
685 proc, stdin, stdout, stderr = _makeconnection(
686 ui,
686 ui,
687 sshcmd,
687 sshcmd,
688 args,
688 args,
689 remotecmd,
689 remotecmd,
690 remotepath,
690 remotepath,
691 sshenv,
691 sshenv,
692 remotehidden=remotehidden,
692 remotehidden=remotehidden,
693 )
693 )
694
694
695 peer = _make_peer(
695 peer = _make_peer(
696 ui, path, proc, stdin, stdout, stderr, remotehidden=remotehidden
696 ui, path, proc, stdin, stdout, stderr, remotehidden=remotehidden
697 )
697 )
698
698
699 # Finally, if supported by the server, notify it about our own
699 # Finally, if supported by the server, notify it about our own
700 # capabilities.
700 # capabilities.
701 if b'protocaps' in peer.capabilities():
701 if b'protocaps' in peer.capabilities():
702 try:
702 try:
703 peer._call(
703 peer._call(
704 b"protocaps", caps=b' '.join(sorted(_clientcapabilities()))
704 b"protocaps", caps=b' '.join(sorted(_clientcapabilities()))
705 )
705 )
706 except IOError:
706 except IOError:
707 peer._cleanup()
707 peer._cleanup()
708 raise error.RepoError(_(b'capability exchange failed'))
708 raise error.RepoError(_(b'capability exchange failed'))
709
709
710 return peer
710 return peer
General Comments 0
You need to be logged in to leave comments. Login now