##// END OF EJS Templates
sshpeer: implement peer for version 2 of wire protocol...
Gregory Szorc -
r35996:59e4a778 default
parent child Browse files
Show More
@@ -1,540 +1,553 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import re
10 import re
11 import uuid
11 import uuid
12
12
13 from .i18n import _
13 from .i18n import _
14 from . import (
14 from . import (
15 error,
15 error,
16 pycompat,
16 pycompat,
17 util,
17 util,
18 wireproto,
18 wireproto,
19 wireprotoserver,
19 wireprotoserver,
20 )
20 )
21
21
22 def _serverquote(s):
22 def _serverquote(s):
23 """quote a string for the remote shell ... which we assume is sh"""
23 """quote a string for the remote shell ... which we assume is sh"""
24 if not s:
24 if not s:
25 return s
25 return s
26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
27 return s
27 return s
28 return "'%s'" % s.replace("'", "'\\''")
28 return "'%s'" % s.replace("'", "'\\''")
29
29
30 def _forwardoutput(ui, pipe):
30 def _forwardoutput(ui, pipe):
31 """display all data currently available on pipe as remote output.
31 """display all data currently available on pipe as remote output.
32
32
33 This is non blocking."""
33 This is non blocking."""
34 s = util.readpipe(pipe)
34 s = util.readpipe(pipe)
35 if s:
35 if s:
36 for l in s.splitlines():
36 for l in s.splitlines():
37 ui.status(_("remote: "), l, '\n')
37 ui.status(_("remote: "), l, '\n')
38
38
39 class doublepipe(object):
39 class doublepipe(object):
40 """Operate a side-channel pipe in addition of a main one
40 """Operate a side-channel pipe in addition of a main one
41
41
42 The side-channel pipe contains server output to be forwarded to the user
42 The side-channel pipe contains server output to be forwarded to the user
43 input. The double pipe will behave as the "main" pipe, but will ensure the
43 input. The double pipe will behave as the "main" pipe, but will ensure the
44 content of the "side" pipe is properly processed while we wait for blocking
44 content of the "side" pipe is properly processed while we wait for blocking
45 call on the "main" pipe.
45 call on the "main" pipe.
46
46
47 If large amounts of data are read from "main", the forward will cease after
47 If large amounts of data are read from "main", the forward will cease after
48 the first bytes start to appear. This simplifies the implementation
48 the first bytes start to appear. This simplifies the implementation
49 without affecting actual output of sshpeer too much as we rarely issue
49 without affecting actual output of sshpeer too much as we rarely issue
50 large read for data not yet emitted by the server.
50 large read for data not yet emitted by the server.
51
51
52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
53 that handle all the os specific bits. This class lives in this module
53 that handle all the os specific bits. This class lives in this module
54 because it focus on behavior specific to the ssh protocol."""
54 because it focus on behavior specific to the ssh protocol."""
55
55
56 def __init__(self, ui, main, side):
56 def __init__(self, ui, main, side):
57 self._ui = ui
57 self._ui = ui
58 self._main = main
58 self._main = main
59 self._side = side
59 self._side = side
60
60
61 def _wait(self):
61 def _wait(self):
62 """wait until some data are available on main or side
62 """wait until some data are available on main or side
63
63
64 return a pair of boolean (ismainready, issideready)
64 return a pair of boolean (ismainready, issideready)
65
65
66 (This will only wait for data if the setup is supported by `util.poll`)
66 (This will only wait for data if the setup is supported by `util.poll`)
67 """
67 """
68 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
68 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
69 return (True, True) # main has data, assume side is worth poking at.
69 return (True, True) # main has data, assume side is worth poking at.
70 fds = [self._main.fileno(), self._side.fileno()]
70 fds = [self._main.fileno(), self._side.fileno()]
71 try:
71 try:
72 act = util.poll(fds)
72 act = util.poll(fds)
73 except NotImplementedError:
73 except NotImplementedError:
74 # non supported yet case, assume all have data.
74 # non supported yet case, assume all have data.
75 act = fds
75 act = fds
76 return (self._main.fileno() in act, self._side.fileno() in act)
76 return (self._main.fileno() in act, self._side.fileno() in act)
77
77
78 def write(self, data):
78 def write(self, data):
79 return self._call('write', data)
79 return self._call('write', data)
80
80
81 def read(self, size):
81 def read(self, size):
82 r = self._call('read', size)
82 r = self._call('read', size)
83 if size != 0 and not r:
83 if size != 0 and not r:
84 # We've observed a condition that indicates the
84 # We've observed a condition that indicates the
85 # stdout closed unexpectedly. Check stderr one
85 # stdout closed unexpectedly. Check stderr one
86 # more time and snag anything that's there before
86 # more time and snag anything that's there before
87 # letting anyone know the main part of the pipe
87 # letting anyone know the main part of the pipe
88 # closed prematurely.
88 # closed prematurely.
89 _forwardoutput(self._ui, self._side)
89 _forwardoutput(self._ui, self._side)
90 return r
90 return r
91
91
92 def readline(self):
92 def readline(self):
93 return self._call('readline')
93 return self._call('readline')
94
94
95 def _call(self, methname, data=None):
95 def _call(self, methname, data=None):
96 """call <methname> on "main", forward output of "side" while blocking
96 """call <methname> on "main", forward output of "side" while blocking
97 """
97 """
98 # data can be '' or 0
98 # data can be '' or 0
99 if (data is not None and not data) or self._main.closed:
99 if (data is not None and not data) or self._main.closed:
100 _forwardoutput(self._ui, self._side)
100 _forwardoutput(self._ui, self._side)
101 return ''
101 return ''
102 while True:
102 while True:
103 mainready, sideready = self._wait()
103 mainready, sideready = self._wait()
104 if sideready:
104 if sideready:
105 _forwardoutput(self._ui, self._side)
105 _forwardoutput(self._ui, self._side)
106 if mainready:
106 if mainready:
107 meth = getattr(self._main, methname)
107 meth = getattr(self._main, methname)
108 if data is None:
108 if data is None:
109 return meth()
109 return meth()
110 else:
110 else:
111 return meth(data)
111 return meth(data)
112
112
113 def close(self):
113 def close(self):
114 return self._main.close()
114 return self._main.close()
115
115
116 def flush(self):
116 def flush(self):
117 return self._main.flush()
117 return self._main.flush()
118
118
119 def _cleanuppipes(ui, pipei, pipeo, pipee):
119 def _cleanuppipes(ui, pipei, pipeo, pipee):
120 """Clean up pipes used by an SSH connection."""
120 """Clean up pipes used by an SSH connection."""
121 if pipeo:
121 if pipeo:
122 pipeo.close()
122 pipeo.close()
123 if pipei:
123 if pipei:
124 pipei.close()
124 pipei.close()
125
125
126 if pipee:
126 if pipee:
127 # Try to read from the err descriptor until EOF.
127 # Try to read from the err descriptor until EOF.
128 try:
128 try:
129 for l in pipee:
129 for l in pipee:
130 ui.status(_('remote: '), l)
130 ui.status(_('remote: '), l)
131 except (IOError, ValueError):
131 except (IOError, ValueError):
132 pass
132 pass
133
133
134 pipee.close()
134 pipee.close()
135
135
136 def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
136 def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
137 """Create an SSH connection to a server.
137 """Create an SSH connection to a server.
138
138
139 Returns a tuple of (process, stdin, stdout, stderr) for the
139 Returns a tuple of (process, stdin, stdout, stderr) for the
140 spawned process.
140 spawned process.
141 """
141 """
142 cmd = '%s %s %s' % (
142 cmd = '%s %s %s' % (
143 sshcmd,
143 sshcmd,
144 args,
144 args,
145 util.shellquote('%s -R %s serve --stdio' % (
145 util.shellquote('%s -R %s serve --stdio' % (
146 _serverquote(remotecmd), _serverquote(path))))
146 _serverquote(remotecmd), _serverquote(path))))
147
147
148 ui.debug('running %s\n' % cmd)
148 ui.debug('running %s\n' % cmd)
149 cmd = util.quotecommand(cmd)
149 cmd = util.quotecommand(cmd)
150
150
151 # no buffer allow the use of 'select'
151 # no buffer allow the use of 'select'
152 # feel free to remove buffering and select usage when we ultimately
152 # feel free to remove buffering and select usage when we ultimately
153 # move to threading.
153 # move to threading.
154 stdin, stdout, stderr, proc = util.popen4(cmd, bufsize=0, env=sshenv)
154 stdin, stdout, stderr, proc = util.popen4(cmd, bufsize=0, env=sshenv)
155
155
156 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
156 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
157 stdin = doublepipe(ui, stdin, stderr)
157 stdin = doublepipe(ui, stdin, stderr)
158
158
159 return proc, stdin, stdout, stderr
159 return proc, stdin, stdout, stderr
160
160
161 def _performhandshake(ui, stdin, stdout, stderr):
161 def _performhandshake(ui, stdin, stdout, stderr):
162 def badresponse():
162 def badresponse():
163 msg = _('no suitable response from remote hg')
163 msg = _('no suitable response from remote hg')
164 hint = ui.config('ui', 'ssherrorhint')
164 hint = ui.config('ui', 'ssherrorhint')
165 raise error.RepoError(msg, hint=hint)
165 raise error.RepoError(msg, hint=hint)
166
166
167 # The handshake consists of sending wire protocol commands in reverse
167 # The handshake consists of sending wire protocol commands in reverse
168 # order of protocol implementation and then sniffing for a response
168 # order of protocol implementation and then sniffing for a response
169 # to one of them.
169 # to one of them.
170 #
170 #
171 # Those commands (from oldest to newest) are:
171 # Those commands (from oldest to newest) are:
172 #
172 #
173 # ``between``
173 # ``between``
174 # Asks for the set of revisions between a pair of revisions. Command
174 # Asks for the set of revisions between a pair of revisions. Command
175 # present in all Mercurial server implementations.
175 # present in all Mercurial server implementations.
176 #
176 #
177 # ``hello``
177 # ``hello``
178 # Instructs the server to advertise its capabilities. Introduced in
178 # Instructs the server to advertise its capabilities. Introduced in
179 # Mercurial 0.9.1.
179 # Mercurial 0.9.1.
180 #
180 #
181 # ``upgrade``
181 # ``upgrade``
182 # Requests upgrade from default transport protocol version 1 to
182 # Requests upgrade from default transport protocol version 1 to
183 # a newer version. Introduced in Mercurial 4.6 as an experimental
183 # a newer version. Introduced in Mercurial 4.6 as an experimental
184 # feature.
184 # feature.
185 #
185 #
186 # The ``between`` command is issued with a request for the null
186 # The ``between`` command is issued with a request for the null
187 # range. If the remote is a Mercurial server, this request will
187 # range. If the remote is a Mercurial server, this request will
188 # generate a specific response: ``1\n\n``. This represents the
188 # generate a specific response: ``1\n\n``. This represents the
189 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
189 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
190 # in the output stream and know this is the response to ``between``
190 # in the output stream and know this is the response to ``between``
191 # and we're at the end of our handshake reply.
191 # and we're at the end of our handshake reply.
192 #
192 #
193 # The response to the ``hello`` command will be a line with the
193 # The response to the ``hello`` command will be a line with the
194 # length of the value returned by that command followed by that
194 # length of the value returned by that command followed by that
195 # value. If the server doesn't support ``hello`` (which should be
195 # value. If the server doesn't support ``hello`` (which should be
196 # rare), that line will be ``0\n``. Otherwise, the value will contain
196 # rare), that line will be ``0\n``. Otherwise, the value will contain
197 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
197 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
198 # the capabilities of the server.
198 # the capabilities of the server.
199 #
199 #
200 # The ``upgrade`` command isn't really a command in the traditional
200 # The ``upgrade`` command isn't really a command in the traditional
201 # sense of version 1 of the transport because it isn't using the
201 # sense of version 1 of the transport because it isn't using the
202 # proper mechanism for formatting insteads: instead, it just encodes
202 # proper mechanism for formatting insteads: instead, it just encodes
203 # arguments on the line, delimited by spaces.
203 # arguments on the line, delimited by spaces.
204 #
204 #
205 # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
205 # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
206 # If the server doesn't support protocol upgrades, it will reply to
206 # If the server doesn't support protocol upgrades, it will reply to
207 # this line with ``0\n``. Otherwise, it emits an
207 # this line with ``0\n``. Otherwise, it emits an
208 # ``upgraded <token> <protocol>`` line to both stdout and stderr.
208 # ``upgraded <token> <protocol>`` line to both stdout and stderr.
209 # Content immediately following this line describes additional
209 # Content immediately following this line describes additional
210 # protocol and server state.
210 # protocol and server state.
211 #
211 #
212 # In addition to the responses to our command requests, the server
212 # In addition to the responses to our command requests, the server
213 # may emit "banner" output on stdout. SSH servers are allowed to
213 # may emit "banner" output on stdout. SSH servers are allowed to
214 # print messages to stdout on login. Issuing commands on connection
214 # print messages to stdout on login. Issuing commands on connection
215 # allows us to flush this banner output from the server by scanning
215 # allows us to flush this banner output from the server by scanning
216 # for output to our well-known ``between`` command. Of course, if
216 # for output to our well-known ``between`` command. Of course, if
217 # the banner contains ``1\n\n``, this will throw off our detection.
217 # the banner contains ``1\n\n``, this will throw off our detection.
218
218
219 requestlog = ui.configbool('devel', 'debug.peer-request')
219 requestlog = ui.configbool('devel', 'debug.peer-request')
220
220
221 # Generate a random token to help identify responses to version 2
221 # Generate a random token to help identify responses to version 2
222 # upgrade request.
222 # upgrade request.
223 token = bytes(uuid.uuid4())
223 token = bytes(uuid.uuid4())
224 upgradecaps = [
224 upgradecaps = [
225 ('proto', wireprotoserver.SSHV2),
225 ('proto', wireprotoserver.SSHV2),
226 ]
226 ]
227 upgradecaps = util.urlreq.urlencode(upgradecaps)
227 upgradecaps = util.urlreq.urlencode(upgradecaps)
228
228
229 try:
229 try:
230 pairsarg = '%s-%s' % ('0' * 40, '0' * 40)
230 pairsarg = '%s-%s' % ('0' * 40, '0' * 40)
231 handshake = [
231 handshake = [
232 'hello\n',
232 'hello\n',
233 'between\n',
233 'between\n',
234 'pairs %d\n' % len(pairsarg),
234 'pairs %d\n' % len(pairsarg),
235 pairsarg,
235 pairsarg,
236 ]
236 ]
237
237
238 # Request upgrade to version 2 if configured.
238 # Request upgrade to version 2 if configured.
239 if ui.configbool('experimental', 'sshpeer.advertise-v2'):
239 if ui.configbool('experimental', 'sshpeer.advertise-v2'):
240 ui.debug('sending upgrade request: %s %s\n' % (token, upgradecaps))
240 ui.debug('sending upgrade request: %s %s\n' % (token, upgradecaps))
241 handshake.insert(0, 'upgrade %s %s\n' % (token, upgradecaps))
241 handshake.insert(0, 'upgrade %s %s\n' % (token, upgradecaps))
242
242
243 if requestlog:
243 if requestlog:
244 ui.debug('devel-peer-request: hello\n')
244 ui.debug('devel-peer-request: hello\n')
245 ui.debug('sending hello command\n')
245 ui.debug('sending hello command\n')
246 if requestlog:
246 if requestlog:
247 ui.debug('devel-peer-request: between\n')
247 ui.debug('devel-peer-request: between\n')
248 ui.debug('devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
248 ui.debug('devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
249 ui.debug('sending between command\n')
249 ui.debug('sending between command\n')
250
250
251 stdin.write(''.join(handshake))
251 stdin.write(''.join(handshake))
252 stdin.flush()
252 stdin.flush()
253 except IOError:
253 except IOError:
254 badresponse()
254 badresponse()
255
255
256 # Assume version 1 of wire protocol by default.
256 # Assume version 1 of wire protocol by default.
257 protoname = wireprotoserver.SSHV1
257 protoname = wireprotoserver.SSHV1
258 reupgraded = re.compile(b'^upgraded %s (.*)$' % re.escape(token))
258 reupgraded = re.compile(b'^upgraded %s (.*)$' % re.escape(token))
259
259
260 lines = ['', 'dummy']
260 lines = ['', 'dummy']
261 max_noise = 500
261 max_noise = 500
262 while lines[-1] and max_noise:
262 while lines[-1] and max_noise:
263 try:
263 try:
264 l = stdout.readline()
264 l = stdout.readline()
265 _forwardoutput(ui, stderr)
265 _forwardoutput(ui, stderr)
266
266
267 # Look for reply to protocol upgrade request. It has a token
267 # Look for reply to protocol upgrade request. It has a token
268 # in it, so there should be no false positives.
268 # in it, so there should be no false positives.
269 m = reupgraded.match(l)
269 m = reupgraded.match(l)
270 if m:
270 if m:
271 protoname = m.group(1)
271 protoname = m.group(1)
272 ui.debug('protocol upgraded to %s\n' % protoname)
272 ui.debug('protocol upgraded to %s\n' % protoname)
273 # If an upgrade was handled, the ``hello`` and ``between``
273 # If an upgrade was handled, the ``hello`` and ``between``
274 # requests are ignored. The next output belongs to the
274 # requests are ignored. The next output belongs to the
275 # protocol, so stop scanning lines.
275 # protocol, so stop scanning lines.
276 break
276 break
277
277
278 # Otherwise it could be a banner, ``0\n`` response if server
278 # Otherwise it could be a banner, ``0\n`` response if server
279 # doesn't support upgrade.
279 # doesn't support upgrade.
280
280
281 if lines[-1] == '1\n' and l == '\n':
281 if lines[-1] == '1\n' and l == '\n':
282 break
282 break
283 if l:
283 if l:
284 ui.debug('remote: ', l)
284 ui.debug('remote: ', l)
285 lines.append(l)
285 lines.append(l)
286 max_noise -= 1
286 max_noise -= 1
287 except IOError:
287 except IOError:
288 badresponse()
288 badresponse()
289 else:
289 else:
290 badresponse()
290 badresponse()
291
291
292 caps = set()
292 caps = set()
293
293
294 # For version 1, we should see a ``capabilities`` line in response to the
294 # For version 1, we should see a ``capabilities`` line in response to the
295 # ``hello`` command.
295 # ``hello`` command.
296 if protoname == wireprotoserver.SSHV1:
296 if protoname == wireprotoserver.SSHV1:
297 for l in reversed(lines):
297 for l in reversed(lines):
298 # Look for response to ``hello`` command. Scan from the back so
298 # Look for response to ``hello`` command. Scan from the back so
299 # we don't misinterpret banner output as the command reply.
299 # we don't misinterpret banner output as the command reply.
300 if l.startswith('capabilities:'):
300 if l.startswith('capabilities:'):
301 caps.update(l[:-1].split(':')[1].split())
301 caps.update(l[:-1].split(':')[1].split())
302 break
302 break
303 elif protoname == wireprotoserver.SSHV2:
303 elif protoname == wireprotoserver.SSHV2:
304 # We see a line with number of bytes to follow and then a value
304 # We see a line with number of bytes to follow and then a value
305 # looking like ``capabilities: *``.
305 # looking like ``capabilities: *``.
306 line = stdout.readline()
306 line = stdout.readline()
307 try:
307 try:
308 valuelen = int(line)
308 valuelen = int(line)
309 except ValueError:
309 except ValueError:
310 badresponse()
310 badresponse()
311
311
312 capsline = stdout.read(valuelen)
312 capsline = stdout.read(valuelen)
313 if not capsline.startswith('capabilities: '):
313 if not capsline.startswith('capabilities: '):
314 badresponse()
314 badresponse()
315
315
316 caps.update(capsline.split(':')[1].split())
316 caps.update(capsline.split(':')[1].split())
317 # Trailing newline.
317 # Trailing newline.
318 stdout.read(1)
318 stdout.read(1)
319
319
320 # Error if we couldn't find capabilities, this means:
320 # Error if we couldn't find capabilities, this means:
321 #
321 #
322 # 1. Remote isn't a Mercurial server
322 # 1. Remote isn't a Mercurial server
323 # 2. Remote is a <0.9.1 Mercurial server
323 # 2. Remote is a <0.9.1 Mercurial server
324 # 3. Remote is a future Mercurial server that dropped ``hello``
324 # 3. Remote is a future Mercurial server that dropped ``hello``
325 # and other attempted handshake mechanisms.
325 # and other attempted handshake mechanisms.
326 if not caps:
326 if not caps:
327 badresponse()
327 badresponse()
328
328
329 return caps
329 return protoname, caps
330
330
331 class sshv1peer(wireproto.wirepeer):
331 class sshv1peer(wireproto.wirepeer):
332 def __init__(self, ui, url, proc, stdin, stdout, stderr, caps):
332 def __init__(self, ui, url, proc, stdin, stdout, stderr, caps):
333 """Create a peer from an existing SSH connection.
333 """Create a peer from an existing SSH connection.
334
334
335 ``proc`` is a handle on the underlying SSH process.
335 ``proc`` is a handle on the underlying SSH process.
336 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
336 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
337 pipes for that process.
337 pipes for that process.
338 ``caps`` is a set of capabilities supported by the remote.
338 ``caps`` is a set of capabilities supported by the remote.
339 """
339 """
340 self._url = url
340 self._url = url
341 self._ui = ui
341 self._ui = ui
342 # self._subprocess is unused. Keeping a handle on the process
342 # self._subprocess is unused. Keeping a handle on the process
343 # holds a reference and prevents it from being garbage collected.
343 # holds a reference and prevents it from being garbage collected.
344 self._subprocess = proc
344 self._subprocess = proc
345 self._pipeo = stdin
345 self._pipeo = stdin
346 self._pipei = stdout
346 self._pipei = stdout
347 self._pipee = stderr
347 self._pipee = stderr
348 self._caps = caps
348 self._caps = caps
349
349
350 # Begin of _basepeer interface.
350 # Begin of _basepeer interface.
351
351
352 @util.propertycache
352 @util.propertycache
353 def ui(self):
353 def ui(self):
354 return self._ui
354 return self._ui
355
355
356 def url(self):
356 def url(self):
357 return self._url
357 return self._url
358
358
359 def local(self):
359 def local(self):
360 return None
360 return None
361
361
362 def peer(self):
362 def peer(self):
363 return self
363 return self
364
364
365 def canpush(self):
365 def canpush(self):
366 return True
366 return True
367
367
368 def close(self):
368 def close(self):
369 pass
369 pass
370
370
371 # End of _basepeer interface.
371 # End of _basepeer interface.
372
372
373 # Begin of _basewirecommands interface.
373 # Begin of _basewirecommands interface.
374
374
375 def capabilities(self):
375 def capabilities(self):
376 return self._caps
376 return self._caps
377
377
378 # End of _basewirecommands interface.
378 # End of _basewirecommands interface.
379
379
380 def _readerr(self):
380 def _readerr(self):
381 _forwardoutput(self.ui, self._pipee)
381 _forwardoutput(self.ui, self._pipee)
382
382
383 def _abort(self, exception):
383 def _abort(self, exception):
384 self._cleanup()
384 self._cleanup()
385 raise exception
385 raise exception
386
386
387 def _cleanup(self):
387 def _cleanup(self):
388 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
388 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
389
389
390 __del__ = _cleanup
390 __del__ = _cleanup
391
391
392 def _submitbatch(self, req):
392 def _submitbatch(self, req):
393 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
393 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
394 available = self._getamount()
394 available = self._getamount()
395 # TODO this response parsing is probably suboptimal for large
395 # TODO this response parsing is probably suboptimal for large
396 # batches with large responses.
396 # batches with large responses.
397 toread = min(available, 1024)
397 toread = min(available, 1024)
398 work = rsp.read(toread)
398 work = rsp.read(toread)
399 available -= toread
399 available -= toread
400 chunk = work
400 chunk = work
401 while chunk:
401 while chunk:
402 while ';' in work:
402 while ';' in work:
403 one, work = work.split(';', 1)
403 one, work = work.split(';', 1)
404 yield wireproto.unescapearg(one)
404 yield wireproto.unescapearg(one)
405 toread = min(available, 1024)
405 toread = min(available, 1024)
406 chunk = rsp.read(toread)
406 chunk = rsp.read(toread)
407 available -= toread
407 available -= toread
408 work += chunk
408 work += chunk
409 yield wireproto.unescapearg(work)
409 yield wireproto.unescapearg(work)
410
410
411 def _callstream(self, cmd, **args):
411 def _callstream(self, cmd, **args):
412 args = pycompat.byteskwargs(args)
412 args = pycompat.byteskwargs(args)
413 if (self.ui.debugflag
413 if (self.ui.debugflag
414 and self.ui.configbool('devel', 'debug.peer-request')):
414 and self.ui.configbool('devel', 'debug.peer-request')):
415 dbg = self.ui.debug
415 dbg = self.ui.debug
416 line = 'devel-peer-request: %s\n'
416 line = 'devel-peer-request: %s\n'
417 dbg(line % cmd)
417 dbg(line % cmd)
418 for key, value in sorted(args.items()):
418 for key, value in sorted(args.items()):
419 if not isinstance(value, dict):
419 if not isinstance(value, dict):
420 dbg(line % ' %s: %d bytes' % (key, len(value)))
420 dbg(line % ' %s: %d bytes' % (key, len(value)))
421 else:
421 else:
422 for dk, dv in sorted(value.items()):
422 for dk, dv in sorted(value.items()):
423 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
423 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
424 self.ui.debug("sending %s command\n" % cmd)
424 self.ui.debug("sending %s command\n" % cmd)
425 self._pipeo.write("%s\n" % cmd)
425 self._pipeo.write("%s\n" % cmd)
426 _func, names = wireproto.commands[cmd]
426 _func, names = wireproto.commands[cmd]
427 keys = names.split()
427 keys = names.split()
428 wireargs = {}
428 wireargs = {}
429 for k in keys:
429 for k in keys:
430 if k == '*':
430 if k == '*':
431 wireargs['*'] = args
431 wireargs['*'] = args
432 break
432 break
433 else:
433 else:
434 wireargs[k] = args[k]
434 wireargs[k] = args[k]
435 del args[k]
435 del args[k]
436 for k, v in sorted(wireargs.iteritems()):
436 for k, v in sorted(wireargs.iteritems()):
437 self._pipeo.write("%s %d\n" % (k, len(v)))
437 self._pipeo.write("%s %d\n" % (k, len(v)))
438 if isinstance(v, dict):
438 if isinstance(v, dict):
439 for dk, dv in v.iteritems():
439 for dk, dv in v.iteritems():
440 self._pipeo.write("%s %d\n" % (dk, len(dv)))
440 self._pipeo.write("%s %d\n" % (dk, len(dv)))
441 self._pipeo.write(dv)
441 self._pipeo.write(dv)
442 else:
442 else:
443 self._pipeo.write(v)
443 self._pipeo.write(v)
444 self._pipeo.flush()
444 self._pipeo.flush()
445
445
446 return self._pipei
446 return self._pipei
447
447
448 def _callcompressable(self, cmd, **args):
448 def _callcompressable(self, cmd, **args):
449 return self._callstream(cmd, **args)
449 return self._callstream(cmd, **args)
450
450
451 def _call(self, cmd, **args):
451 def _call(self, cmd, **args):
452 self._callstream(cmd, **args)
452 self._callstream(cmd, **args)
453 return self._recv()
453 return self._recv()
454
454
455 def _callpush(self, cmd, fp, **args):
455 def _callpush(self, cmd, fp, **args):
456 r = self._call(cmd, **args)
456 r = self._call(cmd, **args)
457 if r:
457 if r:
458 return '', r
458 return '', r
459 for d in iter(lambda: fp.read(4096), ''):
459 for d in iter(lambda: fp.read(4096), ''):
460 self._send(d)
460 self._send(d)
461 self._send("", flush=True)
461 self._send("", flush=True)
462 r = self._recv()
462 r = self._recv()
463 if r:
463 if r:
464 return '', r
464 return '', r
465 return self._recv(), ''
465 return self._recv(), ''
466
466
467 def _calltwowaystream(self, cmd, fp, **args):
467 def _calltwowaystream(self, cmd, fp, **args):
468 r = self._call(cmd, **args)
468 r = self._call(cmd, **args)
469 if r:
469 if r:
470 # XXX needs to be made better
470 # XXX needs to be made better
471 raise error.Abort(_('unexpected remote reply: %s') % r)
471 raise error.Abort(_('unexpected remote reply: %s') % r)
472 for d in iter(lambda: fp.read(4096), ''):
472 for d in iter(lambda: fp.read(4096), ''):
473 self._send(d)
473 self._send(d)
474 self._send("", flush=True)
474 self._send("", flush=True)
475 return self._pipei
475 return self._pipei
476
476
477 def _getamount(self):
477 def _getamount(self):
478 l = self._pipei.readline()
478 l = self._pipei.readline()
479 if l == '\n':
479 if l == '\n':
480 self._readerr()
480 self._readerr()
481 msg = _('check previous remote output')
481 msg = _('check previous remote output')
482 self._abort(error.OutOfBandError(hint=msg))
482 self._abort(error.OutOfBandError(hint=msg))
483 self._readerr()
483 self._readerr()
484 try:
484 try:
485 return int(l)
485 return int(l)
486 except ValueError:
486 except ValueError:
487 self._abort(error.ResponseError(_("unexpected response:"), l))
487 self._abort(error.ResponseError(_("unexpected response:"), l))
488
488
489 def _recv(self):
489 def _recv(self):
490 return self._pipei.read(self._getamount())
490 return self._pipei.read(self._getamount())
491
491
492 def _send(self, data, flush=False):
492 def _send(self, data, flush=False):
493 self._pipeo.write("%d\n" % len(data))
493 self._pipeo.write("%d\n" % len(data))
494 if data:
494 if data:
495 self._pipeo.write(data)
495 self._pipeo.write(data)
496 if flush:
496 if flush:
497 self._pipeo.flush()
497 self._pipeo.flush()
498 self._readerr()
498 self._readerr()
499
499
500 class sshv2peer(sshv1peer):
501 """A peer that speakers version 2 of the transport protocol."""
502 # Currently version 2 is identical to version 1 post handshake.
503 # And handshake is performed before the peer is instantiated. So
504 # we need no custom code.
505
500 def instance(ui, path, create):
506 def instance(ui, path, create):
501 """Create an SSH peer.
507 """Create an SSH peer.
502
508
503 The returned object conforms to the ``wireproto.wirepeer`` interface.
509 The returned object conforms to the ``wireproto.wirepeer`` interface.
504 """
510 """
505 u = util.url(path, parsequery=False, parsefragment=False)
511 u = util.url(path, parsequery=False, parsefragment=False)
506 if u.scheme != 'ssh' or not u.host or u.path is None:
512 if u.scheme != 'ssh' or not u.host or u.path is None:
507 raise error.RepoError(_("couldn't parse location %s") % path)
513 raise error.RepoError(_("couldn't parse location %s") % path)
508
514
509 util.checksafessh(path)
515 util.checksafessh(path)
510
516
511 if u.passwd is not None:
517 if u.passwd is not None:
512 raise error.RepoError(_('password in URL not supported'))
518 raise error.RepoError(_('password in URL not supported'))
513
519
514 sshcmd = ui.config('ui', 'ssh')
520 sshcmd = ui.config('ui', 'ssh')
515 remotecmd = ui.config('ui', 'remotecmd')
521 remotecmd = ui.config('ui', 'remotecmd')
516 sshaddenv = dict(ui.configitems('sshenv'))
522 sshaddenv = dict(ui.configitems('sshenv'))
517 sshenv = util.shellenviron(sshaddenv)
523 sshenv = util.shellenviron(sshaddenv)
518 remotepath = u.path or '.'
524 remotepath = u.path or '.'
519
525
520 args = util.sshargs(sshcmd, u.host, u.user, u.port)
526 args = util.sshargs(sshcmd, u.host, u.user, u.port)
521
527
522 if create:
528 if create:
523 cmd = '%s %s %s' % (sshcmd, args,
529 cmd = '%s %s %s' % (sshcmd, args,
524 util.shellquote('%s init %s' %
530 util.shellquote('%s init %s' %
525 (_serverquote(remotecmd), _serverquote(remotepath))))
531 (_serverquote(remotecmd), _serverquote(remotepath))))
526 ui.debug('running %s\n' % cmd)
532 ui.debug('running %s\n' % cmd)
527 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
533 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
528 if res != 0:
534 if res != 0:
529 raise error.RepoError(_('could not create remote repo'))
535 raise error.RepoError(_('could not create remote repo'))
530
536
531 proc, stdin, stdout, stderr = _makeconnection(ui, sshcmd, args, remotecmd,
537 proc, stdin, stdout, stderr = _makeconnection(ui, sshcmd, args, remotecmd,
532 remotepath, sshenv)
538 remotepath, sshenv)
533
539
534 try:
540 try:
535 caps = _performhandshake(ui, stdin, stdout, stderr)
541 protoname, caps = _performhandshake(ui, stdin, stdout, stderr)
536 except Exception:
542 except Exception:
537 _cleanuppipes(ui, stdout, stdin, stderr)
543 _cleanuppipes(ui, stdout, stdin, stderr)
538 raise
544 raise
539
545
540 return sshv1peer(ui, path, proc, stdin, stdout, stderr, caps)
546 if protoname == wireprotoserver.SSHV1:
547 return sshv1peer(ui, path, proc, stdin, stdout, stderr, caps)
548 elif protoname == wireprotoserver.SSHV2:
549 return sshv2peer(ui, path, proc, stdin, stdout, stderr, caps)
550 else:
551 _cleanuppipes(ui, stdout, stdin, stderr)
552 raise error.RepoError(_('unknown version of SSH protocol: %s') %
553 protoname)
@@ -1,74 +1,76 b''
1 # Test that certain objects conform to well-defined interfaces.
1 # Test that certain objects conform to well-defined interfaces.
2
2
3 from __future__ import absolute_import, print_function
3 from __future__ import absolute_import, print_function
4
4
5 from mercurial import (
5 from mercurial import (
6 bundlerepo,
6 bundlerepo,
7 httppeer,
7 httppeer,
8 localrepo,
8 localrepo,
9 sshpeer,
9 sshpeer,
10 statichttprepo,
10 statichttprepo,
11 ui as uimod,
11 ui as uimod,
12 unionrepo,
12 unionrepo,
13 )
13 )
14
14
15 def checkobject(o):
15 def checkobject(o):
16 """Verify a constructed object conforms to interface rules.
16 """Verify a constructed object conforms to interface rules.
17
17
18 An object must have __abstractmethods__ defined.
18 An object must have __abstractmethods__ defined.
19
19
20 All "public" attributes of the object (attributes not prefixed with
20 All "public" attributes of the object (attributes not prefixed with
21 an underscore) must be in __abstractmethods__ or appear on a base class
21 an underscore) must be in __abstractmethods__ or appear on a base class
22 with __abstractmethods__.
22 with __abstractmethods__.
23 """
23 """
24 name = o.__class__.__name__
24 name = o.__class__.__name__
25
25
26 allowed = set()
26 allowed = set()
27 for cls in o.__class__.__mro__:
27 for cls in o.__class__.__mro__:
28 if not getattr(cls, '__abstractmethods__', set()):
28 if not getattr(cls, '__abstractmethods__', set()):
29 continue
29 continue
30
30
31 allowed |= cls.__abstractmethods__
31 allowed |= cls.__abstractmethods__
32 allowed |= {a for a in dir(cls) if not a.startswith('_')}
32 allowed |= {a for a in dir(cls) if not a.startswith('_')}
33
33
34 if not allowed:
34 if not allowed:
35 print('%s does not have abstract methods' % name)
35 print('%s does not have abstract methods' % name)
36 return
36 return
37
37
38 public = {a for a in dir(o) if not a.startswith('_')}
38 public = {a for a in dir(o) if not a.startswith('_')}
39
39
40 for attr in sorted(public - allowed):
40 for attr in sorted(public - allowed):
41 print('public attributes not in abstract interface: %s.%s' % (
41 print('public attributes not in abstract interface: %s.%s' % (
42 name, attr))
42 name, attr))
43
43
44 # Facilitates testing localpeer.
44 # Facilitates testing localpeer.
45 class dummyrepo(object):
45 class dummyrepo(object):
46 def __init__(self):
46 def __init__(self):
47 self.ui = uimod.ui()
47 self.ui = uimod.ui()
48 def filtered(self, name):
48 def filtered(self, name):
49 pass
49 pass
50 def _restrictcapabilities(self, caps):
50 def _restrictcapabilities(self, caps):
51 pass
51 pass
52
52
53 # Facilitates testing sshpeer without requiring an SSH server.
53 # Facilitates testing sshpeer without requiring an SSH server.
54 class badpeer(httppeer.httppeer):
54 class badpeer(httppeer.httppeer):
55 def __init__(self):
55 def __init__(self):
56 super(badpeer, self).__init__(uimod.ui(), 'http://localhost')
56 super(badpeer, self).__init__(uimod.ui(), 'http://localhost')
57 self.badattribute = True
57 self.badattribute = True
58
58
59 def badmethod(self):
59 def badmethod(self):
60 pass
60 pass
61
61
62 def main():
62 def main():
63 ui = uimod.ui()
63 ui = uimod.ui()
64
64
65 checkobject(badpeer())
65 checkobject(badpeer())
66 checkobject(httppeer.httppeer(ui, 'http://localhost'))
66 checkobject(httppeer.httppeer(ui, 'http://localhost'))
67 checkobject(localrepo.localpeer(dummyrepo()))
67 checkobject(localrepo.localpeer(dummyrepo()))
68 checkobject(sshpeer.sshv1peer(ui, 'ssh://localhost/foo', None, None, None,
68 checkobject(sshpeer.sshv1peer(ui, 'ssh://localhost/foo', None, None, None,
69 None, None))
69 None, None))
70 checkobject(sshpeer.sshv2peer(ui, 'ssh://localhost/foo', None, None, None,
71 None, None))
70 checkobject(bundlerepo.bundlepeer(dummyrepo()))
72 checkobject(bundlerepo.bundlepeer(dummyrepo()))
71 checkobject(statichttprepo.statichttppeer(dummyrepo()))
73 checkobject(statichttprepo.statichttppeer(dummyrepo()))
72 checkobject(unionrepo.unionpeer(dummyrepo()))
74 checkobject(unionrepo.unionpeer(dummyrepo()))
73
75
74 main()
76 main()
General Comments 0
You need to be logged in to leave comments. Login now