##// END OF EJS Templates
sshpeer: defer pipe buffering and stderr sidechannel binding...
Gregory Szorc -
r36388:11ba1a96 default
parent child Browse files
Show More
@@ -1,557 +1,566 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import re
10 import re
11 import uuid
11 import uuid
12
12
13 from .i18n import _
13 from .i18n import _
14 from . import (
14 from . import (
15 error,
15 error,
16 pycompat,
16 pycompat,
17 util,
17 util,
18 wireproto,
18 wireproto,
19 wireprotoserver,
19 wireprotoserver,
20 )
20 )
21
21
22 def _serverquote(s):
22 def _serverquote(s):
23 """quote a string for the remote shell ... which we assume is sh"""
23 """quote a string for the remote shell ... which we assume is sh"""
24 if not s:
24 if not s:
25 return s
25 return s
26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
27 return s
27 return s
28 return "'%s'" % s.replace("'", "'\\''")
28 return "'%s'" % s.replace("'", "'\\''")
29
29
30 def _forwardoutput(ui, pipe):
30 def _forwardoutput(ui, pipe):
31 """display all data currently available on pipe as remote output.
31 """display all data currently available on pipe as remote output.
32
32
33 This is non blocking."""
33 This is non blocking."""
34 s = util.readpipe(pipe)
34 s = util.readpipe(pipe)
35 if s:
35 if s:
36 for l in s.splitlines():
36 for l in s.splitlines():
37 ui.status(_("remote: "), l, '\n')
37 ui.status(_("remote: "), l, '\n')
38
38
39 class doublepipe(object):
39 class doublepipe(object):
40 """Operate a side-channel pipe in addition of a main one
40 """Operate a side-channel pipe in addition of a main one
41
41
42 The side-channel pipe contains server output to be forwarded to the user
42 The side-channel pipe contains server output to be forwarded to the user
43 input. The double pipe will behave as the "main" pipe, but will ensure the
43 input. The double pipe will behave as the "main" pipe, but will ensure the
44 content of the "side" pipe is properly processed while we wait for blocking
44 content of the "side" pipe is properly processed while we wait for blocking
45 call on the "main" pipe.
45 call on the "main" pipe.
46
46
47 If large amounts of data are read from "main", the forward will cease after
47 If large amounts of data are read from "main", the forward will cease after
48 the first bytes start to appear. This simplifies the implementation
48 the first bytes start to appear. This simplifies the implementation
49 without affecting actual output of sshpeer too much as we rarely issue
49 without affecting actual output of sshpeer too much as we rarely issue
50 large read for data not yet emitted by the server.
50 large read for data not yet emitted by the server.
51
51
52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
53 that handle all the os specific bits. This class lives in this module
53 that handle all the os specific bits. This class lives in this module
54 because it focus on behavior specific to the ssh protocol."""
54 because it focus on behavior specific to the ssh protocol."""
55
55
56 def __init__(self, ui, main, side):
56 def __init__(self, ui, main, side):
57 self._ui = ui
57 self._ui = ui
58 self._main = main
58 self._main = main
59 self._side = side
59 self._side = side
60
60
61 def _wait(self):
61 def _wait(self):
62 """wait until some data are available on main or side
62 """wait until some data are available on main or side
63
63
64 return a pair of boolean (ismainready, issideready)
64 return a pair of boolean (ismainready, issideready)
65
65
66 (This will only wait for data if the setup is supported by `util.poll`)
66 (This will only wait for data if the setup is supported by `util.poll`)
67 """
67 """
68 if (isinstance(self._main, util.bufferedinputpipe) and
68 if (isinstance(self._main, util.bufferedinputpipe) and
69 self._main.hasbuffer):
69 self._main.hasbuffer):
70 # Main has data. Assume side is worth poking at.
70 # Main has data. Assume side is worth poking at.
71 return True, True
71 return True, True
72
72
73 fds = [self._main.fileno(), self._side.fileno()]
73 fds = [self._main.fileno(), self._side.fileno()]
74 try:
74 try:
75 act = util.poll(fds)
75 act = util.poll(fds)
76 except NotImplementedError:
76 except NotImplementedError:
77 # non supported yet case, assume all have data.
77 # non supported yet case, assume all have data.
78 act = fds
78 act = fds
79 return (self._main.fileno() in act, self._side.fileno() in act)
79 return (self._main.fileno() in act, self._side.fileno() in act)
80
80
81 def write(self, data):
81 def write(self, data):
82 return self._call('write', data)
82 return self._call('write', data)
83
83
84 def read(self, size):
84 def read(self, size):
85 r = self._call('read', size)
85 r = self._call('read', size)
86 if size != 0 and not r:
86 if size != 0 and not r:
87 # We've observed a condition that indicates the
87 # We've observed a condition that indicates the
88 # stdout closed unexpectedly. Check stderr one
88 # stdout closed unexpectedly. Check stderr one
89 # more time and snag anything that's there before
89 # more time and snag anything that's there before
90 # letting anyone know the main part of the pipe
90 # letting anyone know the main part of the pipe
91 # closed prematurely.
91 # closed prematurely.
92 _forwardoutput(self._ui, self._side)
92 _forwardoutput(self._ui, self._side)
93 return r
93 return r
94
94
95 def readline(self):
95 def readline(self):
96 return self._call('readline')
96 return self._call('readline')
97
97
98 def _call(self, methname, data=None):
98 def _call(self, methname, data=None):
99 """call <methname> on "main", forward output of "side" while blocking
99 """call <methname> on "main", forward output of "side" while blocking
100 """
100 """
101 # data can be '' or 0
101 # data can be '' or 0
102 if (data is not None and not data) or self._main.closed:
102 if (data is not None and not data) or self._main.closed:
103 _forwardoutput(self._ui, self._side)
103 _forwardoutput(self._ui, self._side)
104 return ''
104 return ''
105 while True:
105 while True:
106 mainready, sideready = self._wait()
106 mainready, sideready = self._wait()
107 if sideready:
107 if sideready:
108 _forwardoutput(self._ui, self._side)
108 _forwardoutput(self._ui, self._side)
109 if mainready:
109 if mainready:
110 meth = getattr(self._main, methname)
110 meth = getattr(self._main, methname)
111 if data is None:
111 if data is None:
112 return meth()
112 return meth()
113 else:
113 else:
114 return meth(data)
114 return meth(data)
115
115
116 def close(self):
116 def close(self):
117 return self._main.close()
117 return self._main.close()
118
118
119 def flush(self):
119 def flush(self):
120 return self._main.flush()
120 return self._main.flush()
121
121
122 def _cleanuppipes(ui, pipei, pipeo, pipee):
122 def _cleanuppipes(ui, pipei, pipeo, pipee):
123 """Clean up pipes used by an SSH connection."""
123 """Clean up pipes used by an SSH connection."""
124 if pipeo:
124 if pipeo:
125 pipeo.close()
125 pipeo.close()
126 if pipei:
126 if pipei:
127 pipei.close()
127 pipei.close()
128
128
129 if pipee:
129 if pipee:
130 # Try to read from the err descriptor until EOF.
130 # Try to read from the err descriptor until EOF.
131 try:
131 try:
132 for l in pipee:
132 for l in pipee:
133 ui.status(_('remote: '), l)
133 ui.status(_('remote: '), l)
134 except (IOError, ValueError):
134 except (IOError, ValueError):
135 pass
135 pass
136
136
137 pipee.close()
137 pipee.close()
138
138
139 def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
139 def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
140 """Create an SSH connection to a server.
140 """Create an SSH connection to a server.
141
141
142 Returns a tuple of (process, stdin, stdout, stderr) for the
142 Returns a tuple of (process, stdin, stdout, stderr) for the
143 spawned process.
143 spawned process.
144 """
144 """
145 cmd = '%s %s %s' % (
145 cmd = '%s %s %s' % (
146 sshcmd,
146 sshcmd,
147 args,
147 args,
148 util.shellquote('%s -R %s serve --stdio' % (
148 util.shellquote('%s -R %s serve --stdio' % (
149 _serverquote(remotecmd), _serverquote(path))))
149 _serverquote(remotecmd), _serverquote(path))))
150
150
151 ui.debug('running %s\n' % cmd)
151 ui.debug('running %s\n' % cmd)
152 cmd = util.quotecommand(cmd)
152 cmd = util.quotecommand(cmd)
153
153
154 # no buffer allow the use of 'select'
154 # no buffer allow the use of 'select'
155 # feel free to remove buffering and select usage when we ultimately
155 # feel free to remove buffering and select usage when we ultimately
156 # move to threading.
156 # move to threading.
157 stdin, stdout, stderr, proc = util.popen4(cmd, bufsize=0, env=sshenv)
157 stdin, stdout, stderr, proc = util.popen4(cmd, bufsize=0, env=sshenv)
158
158
159 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
160 stdin = doublepipe(ui, stdin, stderr)
161
162 return proc, stdin, stdout, stderr
159 return proc, stdin, stdout, stderr
163
160
164 def _performhandshake(ui, stdin, stdout, stderr):
161 def _performhandshake(ui, stdin, stdout, stderr):
165 def badresponse():
162 def badresponse():
163 # Flush any output on stderr.
164 _forwardoutput(ui, stderr)
165
166 msg = _('no suitable response from remote hg')
166 msg = _('no suitable response from remote hg')
167 hint = ui.config('ui', 'ssherrorhint')
167 hint = ui.config('ui', 'ssherrorhint')
168 raise error.RepoError(msg, hint=hint)
168 raise error.RepoError(msg, hint=hint)
169
169
170 # The handshake consists of sending wire protocol commands in reverse
170 # The handshake consists of sending wire protocol commands in reverse
171 # order of protocol implementation and then sniffing for a response
171 # order of protocol implementation and then sniffing for a response
172 # to one of them.
172 # to one of them.
173 #
173 #
174 # Those commands (from oldest to newest) are:
174 # Those commands (from oldest to newest) are:
175 #
175 #
176 # ``between``
176 # ``between``
177 # Asks for the set of revisions between a pair of revisions. Command
177 # Asks for the set of revisions between a pair of revisions. Command
178 # present in all Mercurial server implementations.
178 # present in all Mercurial server implementations.
179 #
179 #
180 # ``hello``
180 # ``hello``
181 # Instructs the server to advertise its capabilities. Introduced in
181 # Instructs the server to advertise its capabilities. Introduced in
182 # Mercurial 0.9.1.
182 # Mercurial 0.9.1.
183 #
183 #
184 # ``upgrade``
184 # ``upgrade``
185 # Requests upgrade from default transport protocol version 1 to
185 # Requests upgrade from default transport protocol version 1 to
186 # a newer version. Introduced in Mercurial 4.6 as an experimental
186 # a newer version. Introduced in Mercurial 4.6 as an experimental
187 # feature.
187 # feature.
188 #
188 #
189 # The ``between`` command is issued with a request for the null
189 # The ``between`` command is issued with a request for the null
190 # range. If the remote is a Mercurial server, this request will
190 # range. If the remote is a Mercurial server, this request will
191 # generate a specific response: ``1\n\n``. This represents the
191 # generate a specific response: ``1\n\n``. This represents the
192 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
192 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
193 # in the output stream and know this is the response to ``between``
193 # in the output stream and know this is the response to ``between``
194 # and we're at the end of our handshake reply.
194 # and we're at the end of our handshake reply.
195 #
195 #
196 # The response to the ``hello`` command will be a line with the
196 # The response to the ``hello`` command will be a line with the
197 # length of the value returned by that command followed by that
197 # length of the value returned by that command followed by that
198 # value. If the server doesn't support ``hello`` (which should be
198 # value. If the server doesn't support ``hello`` (which should be
199 # rare), that line will be ``0\n``. Otherwise, the value will contain
199 # rare), that line will be ``0\n``. Otherwise, the value will contain
200 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
200 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
201 # the capabilities of the server.
201 # the capabilities of the server.
202 #
202 #
203 # The ``upgrade`` command isn't really a command in the traditional
203 # The ``upgrade`` command isn't really a command in the traditional
204 # sense of version 1 of the transport because it isn't using the
204 # sense of version 1 of the transport because it isn't using the
205 # proper mechanism for formatting insteads: instead, it just encodes
205 # proper mechanism for formatting insteads: instead, it just encodes
206 # arguments on the line, delimited by spaces.
206 # arguments on the line, delimited by spaces.
207 #
207 #
208 # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
208 # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
209 # If the server doesn't support protocol upgrades, it will reply to
209 # If the server doesn't support protocol upgrades, it will reply to
210 # this line with ``0\n``. Otherwise, it emits an
210 # this line with ``0\n``. Otherwise, it emits an
211 # ``upgraded <token> <protocol>`` line to both stdout and stderr.
211 # ``upgraded <token> <protocol>`` line to both stdout and stderr.
212 # Content immediately following this line describes additional
212 # Content immediately following this line describes additional
213 # protocol and server state.
213 # protocol and server state.
214 #
214 #
215 # In addition to the responses to our command requests, the server
215 # In addition to the responses to our command requests, the server
216 # may emit "banner" output on stdout. SSH servers are allowed to
216 # may emit "banner" output on stdout. SSH servers are allowed to
217 # print messages to stdout on login. Issuing commands on connection
217 # print messages to stdout on login. Issuing commands on connection
218 # allows us to flush this banner output from the server by scanning
218 # allows us to flush this banner output from the server by scanning
219 # for output to our well-known ``between`` command. Of course, if
219 # for output to our well-known ``between`` command. Of course, if
220 # the banner contains ``1\n\n``, this will throw off our detection.
220 # the banner contains ``1\n\n``, this will throw off our detection.
221
221
222 requestlog = ui.configbool('devel', 'debug.peer-request')
222 requestlog = ui.configbool('devel', 'debug.peer-request')
223
223
224 # Generate a random token to help identify responses to version 2
224 # Generate a random token to help identify responses to version 2
225 # upgrade request.
225 # upgrade request.
226 token = pycompat.sysbytes(str(uuid.uuid4()))
226 token = pycompat.sysbytes(str(uuid.uuid4()))
227 upgradecaps = [
227 upgradecaps = [
228 ('proto', wireprotoserver.SSHV2),
228 ('proto', wireprotoserver.SSHV2),
229 ]
229 ]
230 upgradecaps = util.urlreq.urlencode(upgradecaps)
230 upgradecaps = util.urlreq.urlencode(upgradecaps)
231
231
232 try:
232 try:
233 pairsarg = '%s-%s' % ('0' * 40, '0' * 40)
233 pairsarg = '%s-%s' % ('0' * 40, '0' * 40)
234 handshake = [
234 handshake = [
235 'hello\n',
235 'hello\n',
236 'between\n',
236 'between\n',
237 'pairs %d\n' % len(pairsarg),
237 'pairs %d\n' % len(pairsarg),
238 pairsarg,
238 pairsarg,
239 ]
239 ]
240
240
241 # Request upgrade to version 2 if configured.
241 # Request upgrade to version 2 if configured.
242 if ui.configbool('experimental', 'sshpeer.advertise-v2'):
242 if ui.configbool('experimental', 'sshpeer.advertise-v2'):
243 ui.debug('sending upgrade request: %s %s\n' % (token, upgradecaps))
243 ui.debug('sending upgrade request: %s %s\n' % (token, upgradecaps))
244 handshake.insert(0, 'upgrade %s %s\n' % (token, upgradecaps))
244 handshake.insert(0, 'upgrade %s %s\n' % (token, upgradecaps))
245
245
246 if requestlog:
246 if requestlog:
247 ui.debug('devel-peer-request: hello\n')
247 ui.debug('devel-peer-request: hello\n')
248 ui.debug('sending hello command\n')
248 ui.debug('sending hello command\n')
249 if requestlog:
249 if requestlog:
250 ui.debug('devel-peer-request: between\n')
250 ui.debug('devel-peer-request: between\n')
251 ui.debug('devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
251 ui.debug('devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
252 ui.debug('sending between command\n')
252 ui.debug('sending between command\n')
253
253
254 stdin.write(''.join(handshake))
254 stdin.write(''.join(handshake))
255 stdin.flush()
255 stdin.flush()
256 except IOError:
256 except IOError:
257 badresponse()
257 badresponse()
258
258
259 # Assume version 1 of wire protocol by default.
259 # Assume version 1 of wire protocol by default.
260 protoname = wireprotoserver.SSHV1
260 protoname = wireprotoserver.SSHV1
261 reupgraded = re.compile(b'^upgraded %s (.*)$' % re.escape(token))
261 reupgraded = re.compile(b'^upgraded %s (.*)$' % re.escape(token))
262
262
263 lines = ['', 'dummy']
263 lines = ['', 'dummy']
264 max_noise = 500
264 max_noise = 500
265 while lines[-1] and max_noise:
265 while lines[-1] and max_noise:
266 try:
266 try:
267 l = stdout.readline()
267 l = stdout.readline()
268 _forwardoutput(ui, stderr)
268 _forwardoutput(ui, stderr)
269
269
270 # Look for reply to protocol upgrade request. It has a token
270 # Look for reply to protocol upgrade request. It has a token
271 # in it, so there should be no false positives.
271 # in it, so there should be no false positives.
272 m = reupgraded.match(l)
272 m = reupgraded.match(l)
273 if m:
273 if m:
274 protoname = m.group(1)
274 protoname = m.group(1)
275 ui.debug('protocol upgraded to %s\n' % protoname)
275 ui.debug('protocol upgraded to %s\n' % protoname)
276 # If an upgrade was handled, the ``hello`` and ``between``
276 # If an upgrade was handled, the ``hello`` and ``between``
277 # requests are ignored. The next output belongs to the
277 # requests are ignored. The next output belongs to the
278 # protocol, so stop scanning lines.
278 # protocol, so stop scanning lines.
279 break
279 break
280
280
281 # Otherwise it could be a banner, ``0\n`` response if server
281 # Otherwise it could be a banner, ``0\n`` response if server
282 # doesn't support upgrade.
282 # doesn't support upgrade.
283
283
284 if lines[-1] == '1\n' and l == '\n':
284 if lines[-1] == '1\n' and l == '\n':
285 break
285 break
286 if l:
286 if l:
287 ui.debug('remote: ', l)
287 ui.debug('remote: ', l)
288 lines.append(l)
288 lines.append(l)
289 max_noise -= 1
289 max_noise -= 1
290 except IOError:
290 except IOError:
291 badresponse()
291 badresponse()
292 else:
292 else:
293 badresponse()
293 badresponse()
294
294
295 caps = set()
295 caps = set()
296
296
297 # For version 1, we should see a ``capabilities`` line in response to the
297 # For version 1, we should see a ``capabilities`` line in response to the
298 # ``hello`` command.
298 # ``hello`` command.
299 if protoname == wireprotoserver.SSHV1:
299 if protoname == wireprotoserver.SSHV1:
300 for l in reversed(lines):
300 for l in reversed(lines):
301 # Look for response to ``hello`` command. Scan from the back so
301 # Look for response to ``hello`` command. Scan from the back so
302 # we don't misinterpret banner output as the command reply.
302 # we don't misinterpret banner output as the command reply.
303 if l.startswith('capabilities:'):
303 if l.startswith('capabilities:'):
304 caps.update(l[:-1].split(':')[1].split())
304 caps.update(l[:-1].split(':')[1].split())
305 break
305 break
306 elif protoname == wireprotoserver.SSHV2:
306 elif protoname == wireprotoserver.SSHV2:
307 # We see a line with number of bytes to follow and then a value
307 # We see a line with number of bytes to follow and then a value
308 # looking like ``capabilities: *``.
308 # looking like ``capabilities: *``.
309 line = stdout.readline()
309 line = stdout.readline()
310 try:
310 try:
311 valuelen = int(line)
311 valuelen = int(line)
312 except ValueError:
312 except ValueError:
313 badresponse()
313 badresponse()
314
314
315 capsline = stdout.read(valuelen)
315 capsline = stdout.read(valuelen)
316 if not capsline.startswith('capabilities: '):
316 if not capsline.startswith('capabilities: '):
317 badresponse()
317 badresponse()
318
318
319 ui.debug('remote: %s\n' % capsline)
319 ui.debug('remote: %s\n' % capsline)
320
320
321 caps.update(capsline.split(':')[1].split())
321 caps.update(capsline.split(':')[1].split())
322 # Trailing newline.
322 # Trailing newline.
323 stdout.read(1)
323 stdout.read(1)
324
324
325 # Error if we couldn't find capabilities, this means:
325 # Error if we couldn't find capabilities, this means:
326 #
326 #
327 # 1. Remote isn't a Mercurial server
327 # 1. Remote isn't a Mercurial server
328 # 2. Remote is a <0.9.1 Mercurial server
328 # 2. Remote is a <0.9.1 Mercurial server
329 # 3. Remote is a future Mercurial server that dropped ``hello``
329 # 3. Remote is a future Mercurial server that dropped ``hello``
330 # and other attempted handshake mechanisms.
330 # and other attempted handshake mechanisms.
331 if not caps:
331 if not caps:
332 badresponse()
332 badresponse()
333
333
334 # Flush any output on stderr before proceeding.
335 _forwardoutput(ui, stderr)
336
334 return protoname, caps
337 return protoname, caps
335
338
336 class sshv1peer(wireproto.wirepeer):
339 class sshv1peer(wireproto.wirepeer):
337 def __init__(self, ui, url, proc, stdin, stdout, stderr, caps):
340 def __init__(self, ui, url, proc, stdin, stdout, stderr, caps):
338 """Create a peer from an existing SSH connection.
341 """Create a peer from an existing SSH connection.
339
342
340 ``proc`` is a handle on the underlying SSH process.
343 ``proc`` is a handle on the underlying SSH process.
341 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
344 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
342 pipes for that process.
345 pipes for that process.
343 ``caps`` is a set of capabilities supported by the remote.
346 ``caps`` is a set of capabilities supported by the remote.
344 """
347 """
345 self._url = url
348 self._url = url
346 self._ui = ui
349 self._ui = ui
347 # self._subprocess is unused. Keeping a handle on the process
350 # self._subprocess is unused. Keeping a handle on the process
348 # holds a reference and prevents it from being garbage collected.
351 # holds a reference and prevents it from being garbage collected.
349 self._subprocess = proc
352 self._subprocess = proc
353
354 # And we hook up our "doublepipe" wrapper to allow querying
355 # stderr any time we perform I/O.
356 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
357 stdin = doublepipe(ui, stdin, stderr)
358
350 self._pipeo = stdin
359 self._pipeo = stdin
351 self._pipei = stdout
360 self._pipei = stdout
352 self._pipee = stderr
361 self._pipee = stderr
353 self._caps = caps
362 self._caps = caps
354
363
355 # Commands that have a "framed" response where the first line of the
364 # Commands that have a "framed" response where the first line of the
356 # response contains the length of that response.
365 # response contains the length of that response.
357 _FRAMED_COMMANDS = {
366 _FRAMED_COMMANDS = {
358 'batch',
367 'batch',
359 }
368 }
360
369
361 # Begin of _basepeer interface.
370 # Begin of _basepeer interface.
362
371
363 @util.propertycache
372 @util.propertycache
364 def ui(self):
373 def ui(self):
365 return self._ui
374 return self._ui
366
375
367 def url(self):
376 def url(self):
368 return self._url
377 return self._url
369
378
370 def local(self):
379 def local(self):
371 return None
380 return None
372
381
373 def peer(self):
382 def peer(self):
374 return self
383 return self
375
384
376 def canpush(self):
385 def canpush(self):
377 return True
386 return True
378
387
379 def close(self):
388 def close(self):
380 pass
389 pass
381
390
382 # End of _basepeer interface.
391 # End of _basepeer interface.
383
392
384 # Begin of _basewirecommands interface.
393 # Begin of _basewirecommands interface.
385
394
386 def capabilities(self):
395 def capabilities(self):
387 return self._caps
396 return self._caps
388
397
389 # End of _basewirecommands interface.
398 # End of _basewirecommands interface.
390
399
391 def _readerr(self):
400 def _readerr(self):
392 _forwardoutput(self.ui, self._pipee)
401 _forwardoutput(self.ui, self._pipee)
393
402
394 def _abort(self, exception):
403 def _abort(self, exception):
395 self._cleanup()
404 self._cleanup()
396 raise exception
405 raise exception
397
406
398 def _cleanup(self):
407 def _cleanup(self):
399 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
408 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
400
409
401 __del__ = _cleanup
410 __del__ = _cleanup
402
411
403 def _sendrequest(self, cmd, args, framed=False):
412 def _sendrequest(self, cmd, args, framed=False):
404 if (self.ui.debugflag
413 if (self.ui.debugflag
405 and self.ui.configbool('devel', 'debug.peer-request')):
414 and self.ui.configbool('devel', 'debug.peer-request')):
406 dbg = self.ui.debug
415 dbg = self.ui.debug
407 line = 'devel-peer-request: %s\n'
416 line = 'devel-peer-request: %s\n'
408 dbg(line % cmd)
417 dbg(line % cmd)
409 for key, value in sorted(args.items()):
418 for key, value in sorted(args.items()):
410 if not isinstance(value, dict):
419 if not isinstance(value, dict):
411 dbg(line % ' %s: %d bytes' % (key, len(value)))
420 dbg(line % ' %s: %d bytes' % (key, len(value)))
412 else:
421 else:
413 for dk, dv in sorted(value.items()):
422 for dk, dv in sorted(value.items()):
414 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
423 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
415 self.ui.debug("sending %s command\n" % cmd)
424 self.ui.debug("sending %s command\n" % cmd)
416 self._pipeo.write("%s\n" % cmd)
425 self._pipeo.write("%s\n" % cmd)
417 _func, names = wireproto.commands[cmd]
426 _func, names = wireproto.commands[cmd]
418 keys = names.split()
427 keys = names.split()
419 wireargs = {}
428 wireargs = {}
420 for k in keys:
429 for k in keys:
421 if k == '*':
430 if k == '*':
422 wireargs['*'] = args
431 wireargs['*'] = args
423 break
432 break
424 else:
433 else:
425 wireargs[k] = args[k]
434 wireargs[k] = args[k]
426 del args[k]
435 del args[k]
427 for k, v in sorted(wireargs.iteritems()):
436 for k, v in sorted(wireargs.iteritems()):
428 self._pipeo.write("%s %d\n" % (k, len(v)))
437 self._pipeo.write("%s %d\n" % (k, len(v)))
429 if isinstance(v, dict):
438 if isinstance(v, dict):
430 for dk, dv in v.iteritems():
439 for dk, dv in v.iteritems():
431 self._pipeo.write("%s %d\n" % (dk, len(dv)))
440 self._pipeo.write("%s %d\n" % (dk, len(dv)))
432 self._pipeo.write(dv)
441 self._pipeo.write(dv)
433 else:
442 else:
434 self._pipeo.write(v)
443 self._pipeo.write(v)
435 self._pipeo.flush()
444 self._pipeo.flush()
436
445
437 # We know exactly how many bytes are in the response. So return a proxy
446 # We know exactly how many bytes are in the response. So return a proxy
438 # around the raw output stream that allows reading exactly this many
447 # around the raw output stream that allows reading exactly this many
439 # bytes. Callers then can read() without fear of overrunning the
448 # bytes. Callers then can read() without fear of overrunning the
440 # response.
449 # response.
441 if framed:
450 if framed:
442 amount = self._getamount()
451 amount = self._getamount()
443 return util.cappedreader(self._pipei, amount)
452 return util.cappedreader(self._pipei, amount)
444
453
445 return self._pipei
454 return self._pipei
446
455
447 def _callstream(self, cmd, **args):
456 def _callstream(self, cmd, **args):
448 args = pycompat.byteskwargs(args)
457 args = pycompat.byteskwargs(args)
449 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
458 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
450
459
451 def _callcompressable(self, cmd, **args):
460 def _callcompressable(self, cmd, **args):
452 args = pycompat.byteskwargs(args)
461 args = pycompat.byteskwargs(args)
453 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
462 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
454
463
455 def _call(self, cmd, **args):
464 def _call(self, cmd, **args):
456 args = pycompat.byteskwargs(args)
465 args = pycompat.byteskwargs(args)
457 return self._sendrequest(cmd, args, framed=True).read()
466 return self._sendrequest(cmd, args, framed=True).read()
458
467
459 def _callpush(self, cmd, fp, **args):
468 def _callpush(self, cmd, fp, **args):
460 r = self._call(cmd, **args)
469 r = self._call(cmd, **args)
461 if r:
470 if r:
462 return '', r
471 return '', r
463 for d in iter(lambda: fp.read(4096), ''):
472 for d in iter(lambda: fp.read(4096), ''):
464 self._writeframed(d)
473 self._writeframed(d)
465 self._writeframed("", flush=True)
474 self._writeframed("", flush=True)
466 r = self._readframed()
475 r = self._readframed()
467 if r:
476 if r:
468 return '', r
477 return '', r
469 return self._readframed(), ''
478 return self._readframed(), ''
470
479
471 def _calltwowaystream(self, cmd, fp, **args):
480 def _calltwowaystream(self, cmd, fp, **args):
472 r = self._call(cmd, **args)
481 r = self._call(cmd, **args)
473 if r:
482 if r:
474 # XXX needs to be made better
483 # XXX needs to be made better
475 raise error.Abort(_('unexpected remote reply: %s') % r)
484 raise error.Abort(_('unexpected remote reply: %s') % r)
476 for d in iter(lambda: fp.read(4096), ''):
485 for d in iter(lambda: fp.read(4096), ''):
477 self._writeframed(d)
486 self._writeframed(d)
478 self._writeframed("", flush=True)
487 self._writeframed("", flush=True)
479 return self._pipei
488 return self._pipei
480
489
481 def _getamount(self):
490 def _getamount(self):
482 l = self._pipei.readline()
491 l = self._pipei.readline()
483 if l == '\n':
492 if l == '\n':
484 self._readerr()
493 self._readerr()
485 msg = _('check previous remote output')
494 msg = _('check previous remote output')
486 self._abort(error.OutOfBandError(hint=msg))
495 self._abort(error.OutOfBandError(hint=msg))
487 self._readerr()
496 self._readerr()
488 try:
497 try:
489 return int(l)
498 return int(l)
490 except ValueError:
499 except ValueError:
491 self._abort(error.ResponseError(_("unexpected response:"), l))
500 self._abort(error.ResponseError(_("unexpected response:"), l))
492
501
493 def _readframed(self):
502 def _readframed(self):
494 return self._pipei.read(self._getamount())
503 return self._pipei.read(self._getamount())
495
504
496 def _writeframed(self, data, flush=False):
505 def _writeframed(self, data, flush=False):
497 self._pipeo.write("%d\n" % len(data))
506 self._pipeo.write("%d\n" % len(data))
498 if data:
507 if data:
499 self._pipeo.write(data)
508 self._pipeo.write(data)
500 if flush:
509 if flush:
501 self._pipeo.flush()
510 self._pipeo.flush()
502 self._readerr()
511 self._readerr()
503
512
504 class sshv2peer(sshv1peer):
513 class sshv2peer(sshv1peer):
505 """A peer that speakers version 2 of the transport protocol."""
514 """A peer that speakers version 2 of the transport protocol."""
506 # Currently version 2 is identical to version 1 post handshake.
515 # Currently version 2 is identical to version 1 post handshake.
507 # And handshake is performed before the peer is instantiated. So
516 # And handshake is performed before the peer is instantiated. So
508 # we need no custom code.
517 # we need no custom code.
509
518
510 def instance(ui, path, create):
519 def instance(ui, path, create):
511 """Create an SSH peer.
520 """Create an SSH peer.
512
521
513 The returned object conforms to the ``wireproto.wirepeer`` interface.
522 The returned object conforms to the ``wireproto.wirepeer`` interface.
514 """
523 """
515 u = util.url(path, parsequery=False, parsefragment=False)
524 u = util.url(path, parsequery=False, parsefragment=False)
516 if u.scheme != 'ssh' or not u.host or u.path is None:
525 if u.scheme != 'ssh' or not u.host or u.path is None:
517 raise error.RepoError(_("couldn't parse location %s") % path)
526 raise error.RepoError(_("couldn't parse location %s") % path)
518
527
519 util.checksafessh(path)
528 util.checksafessh(path)
520
529
521 if u.passwd is not None:
530 if u.passwd is not None:
522 raise error.RepoError(_('password in URL not supported'))
531 raise error.RepoError(_('password in URL not supported'))
523
532
524 sshcmd = ui.config('ui', 'ssh')
533 sshcmd = ui.config('ui', 'ssh')
525 remotecmd = ui.config('ui', 'remotecmd')
534 remotecmd = ui.config('ui', 'remotecmd')
526 sshaddenv = dict(ui.configitems('sshenv'))
535 sshaddenv = dict(ui.configitems('sshenv'))
527 sshenv = util.shellenviron(sshaddenv)
536 sshenv = util.shellenviron(sshaddenv)
528 remotepath = u.path or '.'
537 remotepath = u.path or '.'
529
538
530 args = util.sshargs(sshcmd, u.host, u.user, u.port)
539 args = util.sshargs(sshcmd, u.host, u.user, u.port)
531
540
532 if create:
541 if create:
533 cmd = '%s %s %s' % (sshcmd, args,
542 cmd = '%s %s %s' % (sshcmd, args,
534 util.shellquote('%s init %s' %
543 util.shellquote('%s init %s' %
535 (_serverquote(remotecmd), _serverquote(remotepath))))
544 (_serverquote(remotecmd), _serverquote(remotepath))))
536 ui.debug('running %s\n' % cmd)
545 ui.debug('running %s\n' % cmd)
537 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
546 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
538 if res != 0:
547 if res != 0:
539 raise error.RepoError(_('could not create remote repo'))
548 raise error.RepoError(_('could not create remote repo'))
540
549
541 proc, stdin, stdout, stderr = _makeconnection(ui, sshcmd, args, remotecmd,
550 proc, stdin, stdout, stderr = _makeconnection(ui, sshcmd, args, remotecmd,
542 remotepath, sshenv)
551 remotepath, sshenv)
543
552
544 try:
553 try:
545 protoname, caps = _performhandshake(ui, stdin, stdout, stderr)
554 protoname, caps = _performhandshake(ui, stdin, stdout, stderr)
546 except Exception:
555 except Exception:
547 _cleanuppipes(ui, stdout, stdin, stderr)
556 _cleanuppipes(ui, stdout, stdin, stderr)
548 raise
557 raise
549
558
550 if protoname == wireprotoserver.SSHV1:
559 if protoname == wireprotoserver.SSHV1:
551 return sshv1peer(ui, path, proc, stdin, stdout, stderr, caps)
560 return sshv1peer(ui, path, proc, stdin, stdout, stderr, caps)
552 elif protoname == wireprotoserver.SSHV2:
561 elif protoname == wireprotoserver.SSHV2:
553 return sshv2peer(ui, path, proc, stdin, stdout, stderr, caps)
562 return sshv2peer(ui, path, proc, stdin, stdout, stderr, caps)
554 else:
563 else:
555 _cleanuppipes(ui, stdout, stdin, stderr)
564 _cleanuppipes(ui, stdout, stdin, stderr)
556 raise error.RepoError(_('unknown version of SSH protocol: %s') %
565 raise error.RepoError(_('unknown version of SSH protocol: %s') %
557 protoname)
566 protoname)
@@ -1,76 +1,80 b''
1 # Test that certain objects conform to well-defined interfaces.
1 # Test that certain objects conform to well-defined interfaces.
2
2
3 from __future__ import absolute_import, print_function
3 from __future__ import absolute_import, print_function
4
4
5 from mercurial import (
5 from mercurial import (
6 bundlerepo,
6 bundlerepo,
7 httppeer,
7 httppeer,
8 localrepo,
8 localrepo,
9 sshpeer,
9 sshpeer,
10 statichttprepo,
10 statichttprepo,
11 ui as uimod,
11 ui as uimod,
12 unionrepo,
12 unionrepo,
13 )
13 )
14
14
15 def checkobject(o):
15 def checkobject(o):
16 """Verify a constructed object conforms to interface rules.
16 """Verify a constructed object conforms to interface rules.
17
17
18 An object must have __abstractmethods__ defined.
18 An object must have __abstractmethods__ defined.
19
19
20 All "public" attributes of the object (attributes not prefixed with
20 All "public" attributes of the object (attributes not prefixed with
21 an underscore) must be in __abstractmethods__ or appear on a base class
21 an underscore) must be in __abstractmethods__ or appear on a base class
22 with __abstractmethods__.
22 with __abstractmethods__.
23 """
23 """
24 name = o.__class__.__name__
24 name = o.__class__.__name__
25
25
26 allowed = set()
26 allowed = set()
27 for cls in o.__class__.__mro__:
27 for cls in o.__class__.__mro__:
28 if not getattr(cls, '__abstractmethods__', set()):
28 if not getattr(cls, '__abstractmethods__', set()):
29 continue
29 continue
30
30
31 allowed |= cls.__abstractmethods__
31 allowed |= cls.__abstractmethods__
32 allowed |= {a for a in dir(cls) if not a.startswith('_')}
32 allowed |= {a for a in dir(cls) if not a.startswith('_')}
33
33
34 if not allowed:
34 if not allowed:
35 print('%s does not have abstract methods' % name)
35 print('%s does not have abstract methods' % name)
36 return
36 return
37
37
38 public = {a for a in dir(o) if not a.startswith('_')}
38 public = {a for a in dir(o) if not a.startswith('_')}
39
39
40 for attr in sorted(public - allowed):
40 for attr in sorted(public - allowed):
41 print('public attributes not in abstract interface: %s.%s' % (
41 print('public attributes not in abstract interface: %s.%s' % (
42 name, attr))
42 name, attr))
43
43
44 # Facilitates testing localpeer.
44 # Facilitates testing localpeer.
45 class dummyrepo(object):
45 class dummyrepo(object):
46 def __init__(self):
46 def __init__(self):
47 self.ui = uimod.ui()
47 self.ui = uimod.ui()
48 def filtered(self, name):
48 def filtered(self, name):
49 pass
49 pass
50 def _restrictcapabilities(self, caps):
50 def _restrictcapabilities(self, caps):
51 pass
51 pass
52
52
53 # Facilitates testing sshpeer without requiring an SSH server.
53 # Facilitates testing sshpeer without requiring an SSH server.
54 class badpeer(httppeer.httppeer):
54 class badpeer(httppeer.httppeer):
55 def __init__(self):
55 def __init__(self):
56 super(badpeer, self).__init__(uimod.ui(), 'http://localhost')
56 super(badpeer, self).__init__(uimod.ui(), 'http://localhost')
57 self.badattribute = True
57 self.badattribute = True
58
58
59 def badmethod(self):
59 def badmethod(self):
60 pass
60 pass
61
61
62 class dummypipe(object):
63 def close(self):
64 pass
65
62 def main():
66 def main():
63 ui = uimod.ui()
67 ui = uimod.ui()
64
68
65 checkobject(badpeer())
69 checkobject(badpeer())
66 checkobject(httppeer.httppeer(ui, 'http://localhost'))
70 checkobject(httppeer.httppeer(ui, 'http://localhost'))
67 checkobject(localrepo.localpeer(dummyrepo()))
71 checkobject(localrepo.localpeer(dummyrepo()))
68 checkobject(sshpeer.sshv1peer(ui, 'ssh://localhost/foo', None, None, None,
72 checkobject(sshpeer.sshv1peer(ui, 'ssh://localhost/foo', None, dummypipe(),
69 None, None))
73 dummypipe(), None, None))
70 checkobject(sshpeer.sshv2peer(ui, 'ssh://localhost/foo', None, None, None,
74 checkobject(sshpeer.sshv2peer(ui, 'ssh://localhost/foo', None, dummypipe(),
71 None, None))
75 dummypipe(), None, None))
72 checkobject(bundlerepo.bundlepeer(dummyrepo()))
76 checkobject(bundlerepo.bundlepeer(dummyrepo()))
73 checkobject(statichttprepo.statichttppeer(dummyrepo()))
77 checkobject(statichttprepo.statichttppeer(dummyrepo()))
74 checkobject(unionrepo.unionpeer(dummyrepo()))
78 checkobject(unionrepo.unionpeer(dummyrepo()))
75
79
76 main()
80 main()
General Comments 0
You need to be logged in to leave comments. Login now