##// END OF EJS Templates
peer: get the `path` object down to the sshpeer...
marmoute -
r50656:73ed1d13 default
parent child Browse files
Show More
@@ -1,676 +1,674
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Olivia Mackall <olivia@selenic.com>
3 # Copyright 2005, 2006 Olivia Mackall <olivia@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8
8
9 import re
9 import re
10 import uuid
10 import uuid
11
11
12 from .i18n import _
12 from .i18n import _
13 from .pycompat import getattr
13 from .pycompat import getattr
14 from . import (
14 from . import (
15 error,
15 error,
16 pycompat,
16 pycompat,
17 util,
17 util,
18 wireprototypes,
18 wireprototypes,
19 wireprotov1peer,
19 wireprotov1peer,
20 wireprotov1server,
20 wireprotov1server,
21 )
21 )
22 from .utils import (
22 from .utils import (
23 procutil,
23 procutil,
24 stringutil,
24 stringutil,
25 urlutil,
25 urlutil,
26 )
26 )
27
27
28
28
29 def _serverquote(s):
29 def _serverquote(s):
30 """quote a string for the remote shell ... which we assume is sh"""
30 """quote a string for the remote shell ... which we assume is sh"""
31 if not s:
31 if not s:
32 return s
32 return s
33 if re.match(b'[a-zA-Z0-9@%_+=:,./-]*$', s):
33 if re.match(b'[a-zA-Z0-9@%_+=:,./-]*$', s):
34 return s
34 return s
35 return b"'%s'" % s.replace(b"'", b"'\\''")
35 return b"'%s'" % s.replace(b"'", b"'\\''")
36
36
37
37
38 def _forwardoutput(ui, pipe, warn=False):
38 def _forwardoutput(ui, pipe, warn=False):
39 """display all data currently available on pipe as remote output.
39 """display all data currently available on pipe as remote output.
40
40
41 This is non blocking."""
41 This is non blocking."""
42 if pipe and not pipe.closed:
42 if pipe and not pipe.closed:
43 s = procutil.readpipe(pipe)
43 s = procutil.readpipe(pipe)
44 if s:
44 if s:
45 display = ui.warn if warn else ui.status
45 display = ui.warn if warn else ui.status
46 for l in s.splitlines():
46 for l in s.splitlines():
47 display(_(b"remote: "), l, b'\n')
47 display(_(b"remote: "), l, b'\n')
48
48
49
49
50 class doublepipe:
50 class doublepipe:
51 """Operate a side-channel pipe in addition of a main one
51 """Operate a side-channel pipe in addition of a main one
52
52
53 The side-channel pipe contains server output to be forwarded to the user
53 The side-channel pipe contains server output to be forwarded to the user
54 input. The double pipe will behave as the "main" pipe, but will ensure the
54 input. The double pipe will behave as the "main" pipe, but will ensure the
55 content of the "side" pipe is properly processed while we wait for blocking
55 content of the "side" pipe is properly processed while we wait for blocking
56 call on the "main" pipe.
56 call on the "main" pipe.
57
57
58 If large amounts of data are read from "main", the forward will cease after
58 If large amounts of data are read from "main", the forward will cease after
59 the first bytes start to appear. This simplifies the implementation
59 the first bytes start to appear. This simplifies the implementation
60 without affecting actual output of sshpeer too much as we rarely issue
60 without affecting actual output of sshpeer too much as we rarely issue
61 large read for data not yet emitted by the server.
61 large read for data not yet emitted by the server.
62
62
63 The main pipe is expected to be a 'bufferedinputpipe' from the util module
63 The main pipe is expected to be a 'bufferedinputpipe' from the util module
64 that handle all the os specific bits. This class lives in this module
64 that handle all the os specific bits. This class lives in this module
65 because it focus on behavior specific to the ssh protocol."""
65 because it focus on behavior specific to the ssh protocol."""
66
66
67 def __init__(self, ui, main, side):
67 def __init__(self, ui, main, side):
68 self._ui = ui
68 self._ui = ui
69 self._main = main
69 self._main = main
70 self._side = side
70 self._side = side
71
71
72 def _wait(self):
72 def _wait(self):
73 """wait until some data are available on main or side
73 """wait until some data are available on main or side
74
74
75 return a pair of boolean (ismainready, issideready)
75 return a pair of boolean (ismainready, issideready)
76
76
77 (This will only wait for data if the setup is supported by `util.poll`)
77 (This will only wait for data if the setup is supported by `util.poll`)
78 """
78 """
79 if (
79 if (
80 isinstance(self._main, util.bufferedinputpipe)
80 isinstance(self._main, util.bufferedinputpipe)
81 and self._main.hasbuffer
81 and self._main.hasbuffer
82 ):
82 ):
83 # Main has data. Assume side is worth poking at.
83 # Main has data. Assume side is worth poking at.
84 return True, True
84 return True, True
85
85
86 fds = [self._main.fileno(), self._side.fileno()]
86 fds = [self._main.fileno(), self._side.fileno()]
87 try:
87 try:
88 act = util.poll(fds)
88 act = util.poll(fds)
89 except NotImplementedError:
89 except NotImplementedError:
90 # non supported yet case, assume all have data.
90 # non supported yet case, assume all have data.
91 act = fds
91 act = fds
92 return (self._main.fileno() in act, self._side.fileno() in act)
92 return (self._main.fileno() in act, self._side.fileno() in act)
93
93
94 def write(self, data):
94 def write(self, data):
95 return self._call(b'write', data)
95 return self._call(b'write', data)
96
96
97 def read(self, size):
97 def read(self, size):
98 r = self._call(b'read', size)
98 r = self._call(b'read', size)
99 if size != 0 and not r:
99 if size != 0 and not r:
100 # We've observed a condition that indicates the
100 # We've observed a condition that indicates the
101 # stdout closed unexpectedly. Check stderr one
101 # stdout closed unexpectedly. Check stderr one
102 # more time and snag anything that's there before
102 # more time and snag anything that's there before
103 # letting anyone know the main part of the pipe
103 # letting anyone know the main part of the pipe
104 # closed prematurely.
104 # closed prematurely.
105 _forwardoutput(self._ui, self._side)
105 _forwardoutput(self._ui, self._side)
106 return r
106 return r
107
107
108 def unbufferedread(self, size):
108 def unbufferedread(self, size):
109 r = self._call(b'unbufferedread', size)
109 r = self._call(b'unbufferedread', size)
110 if size != 0 and not r:
110 if size != 0 and not r:
111 # We've observed a condition that indicates the
111 # We've observed a condition that indicates the
112 # stdout closed unexpectedly. Check stderr one
112 # stdout closed unexpectedly. Check stderr one
113 # more time and snag anything that's there before
113 # more time and snag anything that's there before
114 # letting anyone know the main part of the pipe
114 # letting anyone know the main part of the pipe
115 # closed prematurely.
115 # closed prematurely.
116 _forwardoutput(self._ui, self._side)
116 _forwardoutput(self._ui, self._side)
117 return r
117 return r
118
118
119 def readline(self):
119 def readline(self):
120 return self._call(b'readline')
120 return self._call(b'readline')
121
121
122 def _call(self, methname, data=None):
122 def _call(self, methname, data=None):
123 """call <methname> on "main", forward output of "side" while blocking"""
123 """call <methname> on "main", forward output of "side" while blocking"""
124 # data can be '' or 0
124 # data can be '' or 0
125 if (data is not None and not data) or self._main.closed:
125 if (data is not None and not data) or self._main.closed:
126 _forwardoutput(self._ui, self._side)
126 _forwardoutput(self._ui, self._side)
127 return b''
127 return b''
128 while True:
128 while True:
129 mainready, sideready = self._wait()
129 mainready, sideready = self._wait()
130 if sideready:
130 if sideready:
131 _forwardoutput(self._ui, self._side)
131 _forwardoutput(self._ui, self._side)
132 if mainready:
132 if mainready:
133 meth = getattr(self._main, methname)
133 meth = getattr(self._main, methname)
134 if data is None:
134 if data is None:
135 return meth()
135 return meth()
136 else:
136 else:
137 return meth(data)
137 return meth(data)
138
138
139 def close(self):
139 def close(self):
140 return self._main.close()
140 return self._main.close()
141
141
142 @property
142 @property
143 def closed(self):
143 def closed(self):
144 return self._main.closed
144 return self._main.closed
145
145
146 def flush(self):
146 def flush(self):
147 return self._main.flush()
147 return self._main.flush()
148
148
149
149
150 def _cleanuppipes(ui, pipei, pipeo, pipee, warn):
150 def _cleanuppipes(ui, pipei, pipeo, pipee, warn):
151 """Clean up pipes used by an SSH connection."""
151 """Clean up pipes used by an SSH connection."""
152 didsomething = False
152 didsomething = False
153 if pipeo and not pipeo.closed:
153 if pipeo and not pipeo.closed:
154 didsomething = True
154 didsomething = True
155 pipeo.close()
155 pipeo.close()
156 if pipei and not pipei.closed:
156 if pipei and not pipei.closed:
157 didsomething = True
157 didsomething = True
158 pipei.close()
158 pipei.close()
159
159
160 if pipee and not pipee.closed:
160 if pipee and not pipee.closed:
161 didsomething = True
161 didsomething = True
162 # Try to read from the err descriptor until EOF.
162 # Try to read from the err descriptor until EOF.
163 try:
163 try:
164 for l in pipee:
164 for l in pipee:
165 ui.status(_(b'remote: '), l)
165 ui.status(_(b'remote: '), l)
166 except (IOError, ValueError):
166 except (IOError, ValueError):
167 pass
167 pass
168
168
169 pipee.close()
169 pipee.close()
170
170
171 if didsomething and warn is not None:
171 if didsomething and warn is not None:
172 # Encourage explicit close of sshpeers. Closing via __del__ is
172 # Encourage explicit close of sshpeers. Closing via __del__ is
173 # not very predictable when exceptions are thrown, which has led
173 # not very predictable when exceptions are thrown, which has led
174 # to deadlocks due to a peer get gc'ed in a fork
174 # to deadlocks due to a peer get gc'ed in a fork
175 # We add our own stack trace, because the stacktrace when called
175 # We add our own stack trace, because the stacktrace when called
176 # from __del__ is useless.
176 # from __del__ is useless.
177 ui.develwarn(b'missing close on SSH connection created at:\n%s' % warn)
177 ui.develwarn(b'missing close on SSH connection created at:\n%s' % warn)
178
178
179
179
180 def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
180 def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
181 """Create an SSH connection to a server.
181 """Create an SSH connection to a server.
182
182
183 Returns a tuple of (process, stdin, stdout, stderr) for the
183 Returns a tuple of (process, stdin, stdout, stderr) for the
184 spawned process.
184 spawned process.
185 """
185 """
186 cmd = b'%s %s %s' % (
186 cmd = b'%s %s %s' % (
187 sshcmd,
187 sshcmd,
188 args,
188 args,
189 procutil.shellquote(
189 procutil.shellquote(
190 b'%s -R %s serve --stdio'
190 b'%s -R %s serve --stdio'
191 % (_serverquote(remotecmd), _serverquote(path))
191 % (_serverquote(remotecmd), _serverquote(path))
192 ),
192 ),
193 )
193 )
194
194
195 ui.debug(b'running %s\n' % cmd)
195 ui.debug(b'running %s\n' % cmd)
196
196
197 # no buffer allow the use of 'select'
197 # no buffer allow the use of 'select'
198 # feel free to remove buffering and select usage when we ultimately
198 # feel free to remove buffering and select usage when we ultimately
199 # move to threading.
199 # move to threading.
200 stdin, stdout, stderr, proc = procutil.popen4(cmd, bufsize=0, env=sshenv)
200 stdin, stdout, stderr, proc = procutil.popen4(cmd, bufsize=0, env=sshenv)
201
201
202 return proc, stdin, stdout, stderr
202 return proc, stdin, stdout, stderr
203
203
204
204
205 def _clientcapabilities():
205 def _clientcapabilities():
206 """Return list of capabilities of this client.
206 """Return list of capabilities of this client.
207
207
208 Returns a list of capabilities that are supported by this client.
208 Returns a list of capabilities that are supported by this client.
209 """
209 """
210 protoparams = {b'partial-pull'}
210 protoparams = {b'partial-pull'}
211 comps = [
211 comps = [
212 e.wireprotosupport().name
212 e.wireprotosupport().name
213 for e in util.compengines.supportedwireengines(util.CLIENTROLE)
213 for e in util.compengines.supportedwireengines(util.CLIENTROLE)
214 ]
214 ]
215 protoparams.add(b'comp=%s' % b','.join(comps))
215 protoparams.add(b'comp=%s' % b','.join(comps))
216 return protoparams
216 return protoparams
217
217
218
218
219 def _performhandshake(ui, stdin, stdout, stderr):
219 def _performhandshake(ui, stdin, stdout, stderr):
220 def badresponse():
220 def badresponse():
221 # Flush any output on stderr. In general, the stderr contains errors
221 # Flush any output on stderr. In general, the stderr contains errors
222 # from the remote (ssh errors, some hg errors), and status indications
222 # from the remote (ssh errors, some hg errors), and status indications
223 # (like "adding changes"), with no current way to tell them apart.
223 # (like "adding changes"), with no current way to tell them apart.
224 # Here we failed so early that it's almost certainly only errors, so
224 # Here we failed so early that it's almost certainly only errors, so
225 # use warn=True so -q doesn't hide them.
225 # use warn=True so -q doesn't hide them.
226 _forwardoutput(ui, stderr, warn=True)
226 _forwardoutput(ui, stderr, warn=True)
227
227
228 msg = _(b'no suitable response from remote hg')
228 msg = _(b'no suitable response from remote hg')
229 hint = ui.config(b'ui', b'ssherrorhint')
229 hint = ui.config(b'ui', b'ssherrorhint')
230 raise error.RepoError(msg, hint=hint)
230 raise error.RepoError(msg, hint=hint)
231
231
232 # The handshake consists of sending wire protocol commands in reverse
232 # The handshake consists of sending wire protocol commands in reverse
233 # order of protocol implementation and then sniffing for a response
233 # order of protocol implementation and then sniffing for a response
234 # to one of them.
234 # to one of them.
235 #
235 #
236 # Those commands (from oldest to newest) are:
236 # Those commands (from oldest to newest) are:
237 #
237 #
238 # ``between``
238 # ``between``
239 # Asks for the set of revisions between a pair of revisions. Command
239 # Asks for the set of revisions between a pair of revisions. Command
240 # present in all Mercurial server implementations.
240 # present in all Mercurial server implementations.
241 #
241 #
242 # ``hello``
242 # ``hello``
243 # Instructs the server to advertise its capabilities. Introduced in
243 # Instructs the server to advertise its capabilities. Introduced in
244 # Mercurial 0.9.1.
244 # Mercurial 0.9.1.
245 #
245 #
246 # ``upgrade``
246 # ``upgrade``
247 # Requests upgrade from default transport protocol version 1 to
247 # Requests upgrade from default transport protocol version 1 to
248 # a newer version. Introduced in Mercurial 4.6 as an experimental
248 # a newer version. Introduced in Mercurial 4.6 as an experimental
249 # feature.
249 # feature.
250 #
250 #
251 # The ``between`` command is issued with a request for the null
251 # The ``between`` command is issued with a request for the null
252 # range. If the remote is a Mercurial server, this request will
252 # range. If the remote is a Mercurial server, this request will
253 # generate a specific response: ``1\n\n``. This represents the
253 # generate a specific response: ``1\n\n``. This represents the
254 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
254 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
255 # in the output stream and know this is the response to ``between``
255 # in the output stream and know this is the response to ``between``
256 # and we're at the end of our handshake reply.
256 # and we're at the end of our handshake reply.
257 #
257 #
258 # The response to the ``hello`` command will be a line with the
258 # The response to the ``hello`` command will be a line with the
259 # length of the value returned by that command followed by that
259 # length of the value returned by that command followed by that
260 # value. If the server doesn't support ``hello`` (which should be
260 # value. If the server doesn't support ``hello`` (which should be
261 # rare), that line will be ``0\n``. Otherwise, the value will contain
261 # rare), that line will be ``0\n``. Otherwise, the value will contain
262 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
262 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
263 # the capabilities of the server.
263 # the capabilities of the server.
264 #
264 #
265 # The ``upgrade`` command isn't really a command in the traditional
265 # The ``upgrade`` command isn't really a command in the traditional
266 # sense of version 1 of the transport because it isn't using the
266 # sense of version 1 of the transport because it isn't using the
267 # proper mechanism for formatting insteads: instead, it just encodes
267 # proper mechanism for formatting insteads: instead, it just encodes
268 # arguments on the line, delimited by spaces.
268 # arguments on the line, delimited by spaces.
269 #
269 #
270 # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
270 # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
271 # If the server doesn't support protocol upgrades, it will reply to
271 # If the server doesn't support protocol upgrades, it will reply to
272 # this line with ``0\n``. Otherwise, it emits an
272 # this line with ``0\n``. Otherwise, it emits an
273 # ``upgraded <token> <protocol>`` line to both stdout and stderr.
273 # ``upgraded <token> <protocol>`` line to both stdout and stderr.
274 # Content immediately following this line describes additional
274 # Content immediately following this line describes additional
275 # protocol and server state.
275 # protocol and server state.
276 #
276 #
277 # In addition to the responses to our command requests, the server
277 # In addition to the responses to our command requests, the server
278 # may emit "banner" output on stdout. SSH servers are allowed to
278 # may emit "banner" output on stdout. SSH servers are allowed to
279 # print messages to stdout on login. Issuing commands on connection
279 # print messages to stdout on login. Issuing commands on connection
280 # allows us to flush this banner output from the server by scanning
280 # allows us to flush this banner output from the server by scanning
281 # for output to our well-known ``between`` command. Of course, if
281 # for output to our well-known ``between`` command. Of course, if
282 # the banner contains ``1\n\n``, this will throw off our detection.
282 # the banner contains ``1\n\n``, this will throw off our detection.
283
283
284 requestlog = ui.configbool(b'devel', b'debug.peer-request')
284 requestlog = ui.configbool(b'devel', b'debug.peer-request')
285
285
286 # Generate a random token to help identify responses to version 2
286 # Generate a random token to help identify responses to version 2
287 # upgrade request.
287 # upgrade request.
288 token = pycompat.sysbytes(str(uuid.uuid4()))
288 token = pycompat.sysbytes(str(uuid.uuid4()))
289
289
290 try:
290 try:
291 pairsarg = b'%s-%s' % (b'0' * 40, b'0' * 40)
291 pairsarg = b'%s-%s' % (b'0' * 40, b'0' * 40)
292 handshake = [
292 handshake = [
293 b'hello\n',
293 b'hello\n',
294 b'between\n',
294 b'between\n',
295 b'pairs %d\n' % len(pairsarg),
295 b'pairs %d\n' % len(pairsarg),
296 pairsarg,
296 pairsarg,
297 ]
297 ]
298
298
299 if requestlog:
299 if requestlog:
300 ui.debug(b'devel-peer-request: hello+between\n')
300 ui.debug(b'devel-peer-request: hello+between\n')
301 ui.debug(b'devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
301 ui.debug(b'devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
302 ui.debug(b'sending hello command\n')
302 ui.debug(b'sending hello command\n')
303 ui.debug(b'sending between command\n')
303 ui.debug(b'sending between command\n')
304
304
305 stdin.write(b''.join(handshake))
305 stdin.write(b''.join(handshake))
306 stdin.flush()
306 stdin.flush()
307 except IOError:
307 except IOError:
308 badresponse()
308 badresponse()
309
309
310 # Assume version 1 of wire protocol by default.
310 # Assume version 1 of wire protocol by default.
311 protoname = wireprototypes.SSHV1
311 protoname = wireprototypes.SSHV1
312 reupgraded = re.compile(b'^upgraded %s (.*)$' % stringutil.reescape(token))
312 reupgraded = re.compile(b'^upgraded %s (.*)$' % stringutil.reescape(token))
313
313
314 lines = [b'', b'dummy']
314 lines = [b'', b'dummy']
315 max_noise = 500
315 max_noise = 500
316 while lines[-1] and max_noise:
316 while lines[-1] and max_noise:
317 try:
317 try:
318 l = stdout.readline()
318 l = stdout.readline()
319 _forwardoutput(ui, stderr, warn=True)
319 _forwardoutput(ui, stderr, warn=True)
320
320
321 # Look for reply to protocol upgrade request. It has a token
321 # Look for reply to protocol upgrade request. It has a token
322 # in it, so there should be no false positives.
322 # in it, so there should be no false positives.
323 m = reupgraded.match(l)
323 m = reupgraded.match(l)
324 if m:
324 if m:
325 protoname = m.group(1)
325 protoname = m.group(1)
326 ui.debug(b'protocol upgraded to %s\n' % protoname)
326 ui.debug(b'protocol upgraded to %s\n' % protoname)
327 # If an upgrade was handled, the ``hello`` and ``between``
327 # If an upgrade was handled, the ``hello`` and ``between``
328 # requests are ignored. The next output belongs to the
328 # requests are ignored. The next output belongs to the
329 # protocol, so stop scanning lines.
329 # protocol, so stop scanning lines.
330 break
330 break
331
331
332 # Otherwise it could be a banner, ``0\n`` response if server
332 # Otherwise it could be a banner, ``0\n`` response if server
333 # doesn't support upgrade.
333 # doesn't support upgrade.
334
334
335 if lines[-1] == b'1\n' and l == b'\n':
335 if lines[-1] == b'1\n' and l == b'\n':
336 break
336 break
337 if l:
337 if l:
338 ui.debug(b'remote: ', l)
338 ui.debug(b'remote: ', l)
339 lines.append(l)
339 lines.append(l)
340 max_noise -= 1
340 max_noise -= 1
341 except IOError:
341 except IOError:
342 badresponse()
342 badresponse()
343 else:
343 else:
344 badresponse()
344 badresponse()
345
345
346 caps = set()
346 caps = set()
347
347
348 # For version 1, we should see a ``capabilities`` line in response to the
348 # For version 1, we should see a ``capabilities`` line in response to the
349 # ``hello`` command.
349 # ``hello`` command.
350 if protoname == wireprototypes.SSHV1:
350 if protoname == wireprototypes.SSHV1:
351 for l in reversed(lines):
351 for l in reversed(lines):
352 # Look for response to ``hello`` command. Scan from the back so
352 # Look for response to ``hello`` command. Scan from the back so
353 # we don't misinterpret banner output as the command reply.
353 # we don't misinterpret banner output as the command reply.
354 if l.startswith(b'capabilities:'):
354 if l.startswith(b'capabilities:'):
355 caps.update(l[:-1].split(b':')[1].split())
355 caps.update(l[:-1].split(b':')[1].split())
356 break
356 break
357
357
358 # Error if we couldn't find capabilities, this means:
358 # Error if we couldn't find capabilities, this means:
359 #
359 #
360 # 1. Remote isn't a Mercurial server
360 # 1. Remote isn't a Mercurial server
361 # 2. Remote is a <0.9.1 Mercurial server
361 # 2. Remote is a <0.9.1 Mercurial server
362 # 3. Remote is a future Mercurial server that dropped ``hello``
362 # 3. Remote is a future Mercurial server that dropped ``hello``
363 # and other attempted handshake mechanisms.
363 # and other attempted handshake mechanisms.
364 if not caps:
364 if not caps:
365 badresponse()
365 badresponse()
366
366
367 # Flush any output on stderr before proceeding.
367 # Flush any output on stderr before proceeding.
368 _forwardoutput(ui, stderr, warn=True)
368 _forwardoutput(ui, stderr, warn=True)
369
369
370 return protoname, caps
370 return protoname, caps
371
371
372
372
373 class sshv1peer(wireprotov1peer.wirepeer):
373 class sshv1peer(wireprotov1peer.wirepeer):
374 def __init__(
374 def __init__(
375 self, ui, url, proc, stdin, stdout, stderr, caps, autoreadstderr=True
375 self, ui, path, proc, stdin, stdout, stderr, caps, autoreadstderr=True
376 ):
376 ):
377 """Create a peer from an existing SSH connection.
377 """Create a peer from an existing SSH connection.
378
378
379 ``proc`` is a handle on the underlying SSH process.
379 ``proc`` is a handle on the underlying SSH process.
380 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
380 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
381 pipes for that process.
381 pipes for that process.
382 ``caps`` is a set of capabilities supported by the remote.
382 ``caps`` is a set of capabilities supported by the remote.
383 ``autoreadstderr`` denotes whether to automatically read from
383 ``autoreadstderr`` denotes whether to automatically read from
384 stderr and to forward its output.
384 stderr and to forward its output.
385 """
385 """
386 super().__init__(ui)
386 super().__init__(ui, path=path)
387 self._url = url
388 # self._subprocess is unused. Keeping a handle on the process
387 # self._subprocess is unused. Keeping a handle on the process
389 # holds a reference and prevents it from being garbage collected.
388 # holds a reference and prevents it from being garbage collected.
390 self._subprocess = proc
389 self._subprocess = proc
391
390
392 # And we hook up our "doublepipe" wrapper to allow querying
391 # And we hook up our "doublepipe" wrapper to allow querying
393 # stderr any time we perform I/O.
392 # stderr any time we perform I/O.
394 if autoreadstderr:
393 if autoreadstderr:
395 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
394 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
396 stdin = doublepipe(ui, stdin, stderr)
395 stdin = doublepipe(ui, stdin, stderr)
397
396
398 self._pipeo = stdin
397 self._pipeo = stdin
399 self._pipei = stdout
398 self._pipei = stdout
400 self._pipee = stderr
399 self._pipee = stderr
401 self._caps = caps
400 self._caps = caps
402 self._autoreadstderr = autoreadstderr
401 self._autoreadstderr = autoreadstderr
403 self._initstack = b''.join(util.getstackframes(1))
402 self._initstack = b''.join(util.getstackframes(1))
404
403
405 # Commands that have a "framed" response where the first line of the
404 # Commands that have a "framed" response where the first line of the
406 # response contains the length of that response.
405 # response contains the length of that response.
407 _FRAMED_COMMANDS = {
406 _FRAMED_COMMANDS = {
408 b'batch',
407 b'batch',
409 }
408 }
410
409
411 # Begin of ipeerconnection interface.
410 # Begin of ipeerconnection interface.
412
411
413 def url(self):
412 def url(self):
414 return self._url
413 return self.path.loc
415
414
416 def local(self):
415 def local(self):
417 return None
416 return None
418
417
419 def canpush(self):
418 def canpush(self):
420 return True
419 return True
421
420
422 def close(self):
421 def close(self):
423 self._cleanup()
422 self._cleanup()
424
423
425 # End of ipeerconnection interface.
424 # End of ipeerconnection interface.
426
425
427 # Begin of ipeercommands interface.
426 # Begin of ipeercommands interface.
428
427
429 def capabilities(self):
428 def capabilities(self):
430 return self._caps
429 return self._caps
431
430
432 # End of ipeercommands interface.
431 # End of ipeercommands interface.
433
432
434 def _readerr(self):
433 def _readerr(self):
435 _forwardoutput(self.ui, self._pipee)
434 _forwardoutput(self.ui, self._pipee)
436
435
437 def _abort(self, exception):
436 def _abort(self, exception):
438 self._cleanup()
437 self._cleanup()
439 raise exception
438 raise exception
440
439
441 def _cleanup(self, warn=None):
440 def _cleanup(self, warn=None):
442 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee, warn=warn)
441 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee, warn=warn)
443
442
444 def __del__(self):
443 def __del__(self):
445 self._cleanup(warn=self._initstack)
444 self._cleanup(warn=self._initstack)
446
445
447 def _sendrequest(self, cmd, args, framed=False):
446 def _sendrequest(self, cmd, args, framed=False):
448 if self.ui.debugflag and self.ui.configbool(
447 if self.ui.debugflag and self.ui.configbool(
449 b'devel', b'debug.peer-request'
448 b'devel', b'debug.peer-request'
450 ):
449 ):
451 dbg = self.ui.debug
450 dbg = self.ui.debug
452 line = b'devel-peer-request: %s\n'
451 line = b'devel-peer-request: %s\n'
453 dbg(line % cmd)
452 dbg(line % cmd)
454 for key, value in sorted(args.items()):
453 for key, value in sorted(args.items()):
455 if not isinstance(value, dict):
454 if not isinstance(value, dict):
456 dbg(line % b' %s: %d bytes' % (key, len(value)))
455 dbg(line % b' %s: %d bytes' % (key, len(value)))
457 else:
456 else:
458 for dk, dv in sorted(value.items()):
457 for dk, dv in sorted(value.items()):
459 dbg(line % b' %s-%s: %d' % (key, dk, len(dv)))
458 dbg(line % b' %s-%s: %d' % (key, dk, len(dv)))
460 self.ui.debug(b"sending %s command\n" % cmd)
459 self.ui.debug(b"sending %s command\n" % cmd)
461 self._pipeo.write(b"%s\n" % cmd)
460 self._pipeo.write(b"%s\n" % cmd)
462 _func, names = wireprotov1server.commands[cmd]
461 _func, names = wireprotov1server.commands[cmd]
463 keys = names.split()
462 keys = names.split()
464 wireargs = {}
463 wireargs = {}
465 for k in keys:
464 for k in keys:
466 if k == b'*':
465 if k == b'*':
467 wireargs[b'*'] = args
466 wireargs[b'*'] = args
468 break
467 break
469 else:
468 else:
470 wireargs[k] = args[k]
469 wireargs[k] = args[k]
471 del args[k]
470 del args[k]
472 for k, v in sorted(wireargs.items()):
471 for k, v in sorted(wireargs.items()):
473 self._pipeo.write(b"%s %d\n" % (k, len(v)))
472 self._pipeo.write(b"%s %d\n" % (k, len(v)))
474 if isinstance(v, dict):
473 if isinstance(v, dict):
475 for dk, dv in v.items():
474 for dk, dv in v.items():
476 self._pipeo.write(b"%s %d\n" % (dk, len(dv)))
475 self._pipeo.write(b"%s %d\n" % (dk, len(dv)))
477 self._pipeo.write(dv)
476 self._pipeo.write(dv)
478 else:
477 else:
479 self._pipeo.write(v)
478 self._pipeo.write(v)
480 self._pipeo.flush()
479 self._pipeo.flush()
481
480
482 # We know exactly how many bytes are in the response. So return a proxy
481 # We know exactly how many bytes are in the response. So return a proxy
483 # around the raw output stream that allows reading exactly this many
482 # around the raw output stream that allows reading exactly this many
484 # bytes. Callers then can read() without fear of overrunning the
483 # bytes. Callers then can read() without fear of overrunning the
485 # response.
484 # response.
486 if framed:
485 if framed:
487 amount = self._getamount()
486 amount = self._getamount()
488 return util.cappedreader(self._pipei, amount)
487 return util.cappedreader(self._pipei, amount)
489
488
490 return self._pipei
489 return self._pipei
491
490
492 def _callstream(self, cmd, **args):
491 def _callstream(self, cmd, **args):
493 args = pycompat.byteskwargs(args)
492 args = pycompat.byteskwargs(args)
494 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
493 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
495
494
496 def _callcompressable(self, cmd, **args):
495 def _callcompressable(self, cmd, **args):
497 args = pycompat.byteskwargs(args)
496 args = pycompat.byteskwargs(args)
498 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
497 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
499
498
500 def _call(self, cmd, **args):
499 def _call(self, cmd, **args):
501 args = pycompat.byteskwargs(args)
500 args = pycompat.byteskwargs(args)
502 return self._sendrequest(cmd, args, framed=True).read()
501 return self._sendrequest(cmd, args, framed=True).read()
503
502
504 def _callpush(self, cmd, fp, **args):
503 def _callpush(self, cmd, fp, **args):
505 # The server responds with an empty frame if the client should
504 # The server responds with an empty frame if the client should
506 # continue submitting the payload.
505 # continue submitting the payload.
507 r = self._call(cmd, **args)
506 r = self._call(cmd, **args)
508 if r:
507 if r:
509 return b'', r
508 return b'', r
510
509
511 # The payload consists of frames with content followed by an empty
510 # The payload consists of frames with content followed by an empty
512 # frame.
511 # frame.
513 for d in iter(lambda: fp.read(4096), b''):
512 for d in iter(lambda: fp.read(4096), b''):
514 self._writeframed(d)
513 self._writeframed(d)
515 self._writeframed(b"", flush=True)
514 self._writeframed(b"", flush=True)
516
515
517 # In case of success, there is an empty frame and a frame containing
516 # In case of success, there is an empty frame and a frame containing
518 # the integer result (as a string).
517 # the integer result (as a string).
519 # In case of error, there is a non-empty frame containing the error.
518 # In case of error, there is a non-empty frame containing the error.
520 r = self._readframed()
519 r = self._readframed()
521 if r:
520 if r:
522 return b'', r
521 return b'', r
523 return self._readframed(), b''
522 return self._readframed(), b''
524
523
525 def _calltwowaystream(self, cmd, fp, **args):
524 def _calltwowaystream(self, cmd, fp, **args):
526 # The server responds with an empty frame if the client should
525 # The server responds with an empty frame if the client should
527 # continue submitting the payload.
526 # continue submitting the payload.
528 r = self._call(cmd, **args)
527 r = self._call(cmd, **args)
529 if r:
528 if r:
530 # XXX needs to be made better
529 # XXX needs to be made better
531 raise error.Abort(_(b'unexpected remote reply: %s') % r)
530 raise error.Abort(_(b'unexpected remote reply: %s') % r)
532
531
533 # The payload consists of frames with content followed by an empty
532 # The payload consists of frames with content followed by an empty
534 # frame.
533 # frame.
535 for d in iter(lambda: fp.read(4096), b''):
534 for d in iter(lambda: fp.read(4096), b''):
536 self._writeframed(d)
535 self._writeframed(d)
537 self._writeframed(b"", flush=True)
536 self._writeframed(b"", flush=True)
538
537
539 return self._pipei
538 return self._pipei
540
539
541 def _getamount(self):
540 def _getamount(self):
542 l = self._pipei.readline()
541 l = self._pipei.readline()
543 if l == b'\n':
542 if l == b'\n':
544 if self._autoreadstderr:
543 if self._autoreadstderr:
545 self._readerr()
544 self._readerr()
546 msg = _(b'check previous remote output')
545 msg = _(b'check previous remote output')
547 self._abort(error.OutOfBandError(hint=msg))
546 self._abort(error.OutOfBandError(hint=msg))
548 if self._autoreadstderr:
547 if self._autoreadstderr:
549 self._readerr()
548 self._readerr()
550 try:
549 try:
551 return int(l)
550 return int(l)
552 except ValueError:
551 except ValueError:
553 self._abort(error.ResponseError(_(b"unexpected response:"), l))
552 self._abort(error.ResponseError(_(b"unexpected response:"), l))
554
553
555 def _readframed(self):
554 def _readframed(self):
556 size = self._getamount()
555 size = self._getamount()
557 if not size:
556 if not size:
558 return b''
557 return b''
559
558
560 return self._pipei.read(size)
559 return self._pipei.read(size)
561
560
562 def _writeframed(self, data, flush=False):
561 def _writeframed(self, data, flush=False):
563 self._pipeo.write(b"%d\n" % len(data))
562 self._pipeo.write(b"%d\n" % len(data))
564 if data:
563 if data:
565 self._pipeo.write(data)
564 self._pipeo.write(data)
566 if flush:
565 if flush:
567 self._pipeo.flush()
566 self._pipeo.flush()
568 if self._autoreadstderr:
567 if self._autoreadstderr:
569 self._readerr()
568 self._readerr()
570
569
571
570
572 def makepeer(ui, path, proc, stdin, stdout, stderr, autoreadstderr=True):
571 def makepeer(ui, path, proc, stdin, stdout, stderr, autoreadstderr=True):
573 """Make a peer instance from existing pipes.
572 """Make a peer instance from existing pipes.
574
573
575 ``path`` and ``proc`` are stored on the eventual peer instance and may
574 ``path`` and ``proc`` are stored on the eventual peer instance and may
576 not be used for anything meaningful.
575 not be used for anything meaningful.
577
576
578 ``stdin``, ``stdout``, and ``stderr`` are the pipes connected to the
577 ``stdin``, ``stdout``, and ``stderr`` are the pipes connected to the
579 SSH server's stdio handles.
578 SSH server's stdio handles.
580
579
581 This function is factored out to allow creating peers that don't
580 This function is factored out to allow creating peers that don't
582 actually spawn a new process. It is useful for starting SSH protocol
581 actually spawn a new process. It is useful for starting SSH protocol
583 servers and clients via non-standard means, which can be useful for
582 servers and clients via non-standard means, which can be useful for
584 testing.
583 testing.
585 """
584 """
586 try:
585 try:
587 protoname, caps = _performhandshake(ui, stdin, stdout, stderr)
586 protoname, caps = _performhandshake(ui, stdin, stdout, stderr)
588 except Exception:
587 except Exception:
589 _cleanuppipes(ui, stdout, stdin, stderr, warn=None)
588 _cleanuppipes(ui, stdout, stdin, stderr, warn=None)
590 raise
589 raise
591
590
592 if protoname == wireprototypes.SSHV1:
591 if protoname == wireprototypes.SSHV1:
593 return sshv1peer(
592 return sshv1peer(
594 ui,
593 ui,
595 path,
594 path,
596 proc,
595 proc,
597 stdin,
596 stdin,
598 stdout,
597 stdout,
599 stderr,
598 stderr,
600 caps,
599 caps,
601 autoreadstderr=autoreadstderr,
600 autoreadstderr=autoreadstderr,
602 )
601 )
603 else:
602 else:
604 _cleanuppipes(ui, stdout, stdin, stderr, warn=None)
603 _cleanuppipes(ui, stdout, stdin, stderr, warn=None)
605 raise error.RepoError(
604 raise error.RepoError(
606 _(b'unknown version of SSH protocol: %s') % protoname
605 _(b'unknown version of SSH protocol: %s') % protoname
607 )
606 )
608
607
609
608
610 def make_peer(ui, path, create, intents=None, createopts=None):
609 def make_peer(ui, path, create, intents=None, createopts=None):
611 """Create an SSH peer.
610 """Create an SSH peer.
612
611
613 The returned object conforms to the ``wireprotov1peer.wirepeer`` interface.
612 The returned object conforms to the ``wireprotov1peer.wirepeer`` interface.
614 """
613 """
615 path = path.loc
614 u = urlutil.url(path.loc, parsequery=False, parsefragment=False)
616 u = urlutil.url(path, parsequery=False, parsefragment=False)
617 if u.scheme != b'ssh' or not u.host or u.path is None:
615 if u.scheme != b'ssh' or not u.host or u.path is None:
618 raise error.RepoError(_(b"couldn't parse location %s") % path)
616 raise error.RepoError(_(b"couldn't parse location %s") % path)
619
617
620 urlutil.checksafessh(path)
618 urlutil.checksafessh(path.loc)
621
619
622 if u.passwd is not None:
620 if u.passwd is not None:
623 raise error.RepoError(_(b'password in URL not supported'))
621 raise error.RepoError(_(b'password in URL not supported'))
624
622
625 sshcmd = ui.config(b'ui', b'ssh')
623 sshcmd = ui.config(b'ui', b'ssh')
626 remotecmd = ui.config(b'ui', b'remotecmd')
624 remotecmd = ui.config(b'ui', b'remotecmd')
627 sshaddenv = dict(ui.configitems(b'sshenv'))
625 sshaddenv = dict(ui.configitems(b'sshenv'))
628 sshenv = procutil.shellenviron(sshaddenv)
626 sshenv = procutil.shellenviron(sshaddenv)
629 remotepath = u.path or b'.'
627 remotepath = u.path or b'.'
630
628
631 args = procutil.sshargs(sshcmd, u.host, u.user, u.port)
629 args = procutil.sshargs(sshcmd, u.host, u.user, u.port)
632
630
633 if create:
631 if create:
634 # We /could/ do this, but only if the remote init command knows how to
632 # We /could/ do this, but only if the remote init command knows how to
635 # handle them. We don't yet make any assumptions about that. And without
633 # handle them. We don't yet make any assumptions about that. And without
636 # querying the remote, there's no way of knowing if the remote even
634 # querying the remote, there's no way of knowing if the remote even
637 # supports said requested feature.
635 # supports said requested feature.
638 if createopts:
636 if createopts:
639 raise error.RepoError(
637 raise error.RepoError(
640 _(
638 _(
641 b'cannot create remote SSH repositories '
639 b'cannot create remote SSH repositories '
642 b'with extra options'
640 b'with extra options'
643 )
641 )
644 )
642 )
645
643
646 cmd = b'%s %s %s' % (
644 cmd = b'%s %s %s' % (
647 sshcmd,
645 sshcmd,
648 args,
646 args,
649 procutil.shellquote(
647 procutil.shellquote(
650 b'%s init %s'
648 b'%s init %s'
651 % (_serverquote(remotecmd), _serverquote(remotepath))
649 % (_serverquote(remotecmd), _serverquote(remotepath))
652 ),
650 ),
653 )
651 )
654 ui.debug(b'running %s\n' % cmd)
652 ui.debug(b'running %s\n' % cmd)
655 res = ui.system(cmd, blockedtag=b'sshpeer', environ=sshenv)
653 res = ui.system(cmd, blockedtag=b'sshpeer', environ=sshenv)
656 if res != 0:
654 if res != 0:
657 raise error.RepoError(_(b'could not create remote repo'))
655 raise error.RepoError(_(b'could not create remote repo'))
658
656
659 proc, stdin, stdout, stderr = _makeconnection(
657 proc, stdin, stdout, stderr = _makeconnection(
660 ui, sshcmd, args, remotecmd, remotepath, sshenv
658 ui, sshcmd, args, remotecmd, remotepath, sshenv
661 )
659 )
662
660
663 peer = makepeer(ui, path, proc, stdin, stdout, stderr)
661 peer = makepeer(ui, path, proc, stdin, stdout, stderr)
664
662
665 # Finally, if supported by the server, notify it about our own
663 # Finally, if supported by the server, notify it about our own
666 # capabilities.
664 # capabilities.
667 if b'protocaps' in peer.capabilities():
665 if b'protocaps' in peer.capabilities():
668 try:
666 try:
669 peer._call(
667 peer._call(
670 b"protocaps", caps=b' '.join(sorted(_clientcapabilities()))
668 b"protocaps", caps=b' '.join(sorted(_clientcapabilities()))
671 )
669 )
672 except IOError:
670 except IOError:
673 peer._cleanup()
671 peer._cleanup()
674 raise error.RepoError(_(b'capability exchange failed'))
672 raise error.RepoError(_(b'capability exchange failed'))
675
673
676 return peer
674 return peer
General Comments 0
You need to be logged in to leave comments. Login now