##// END OF EJS Templates
sshpeer: make pipe polling code more explicit...
Gregory Szorc -
r36387:066e6a9d default
parent child Browse files
Show More
@@ -1,554 +1,557 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import re
10 import re
11 import uuid
11 import uuid
12
12
13 from .i18n import _
13 from .i18n import _
14 from . import (
14 from . import (
15 error,
15 error,
16 pycompat,
16 pycompat,
17 util,
17 util,
18 wireproto,
18 wireproto,
19 wireprotoserver,
19 wireprotoserver,
20 )
20 )
21
21
22 def _serverquote(s):
22 def _serverquote(s):
23 """quote a string for the remote shell ... which we assume is sh"""
23 """quote a string for the remote shell ... which we assume is sh"""
24 if not s:
24 if not s:
25 return s
25 return s
26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
27 return s
27 return s
28 return "'%s'" % s.replace("'", "'\\''")
28 return "'%s'" % s.replace("'", "'\\''")
29
29
30 def _forwardoutput(ui, pipe):
30 def _forwardoutput(ui, pipe):
31 """display all data currently available on pipe as remote output.
31 """display all data currently available on pipe as remote output.
32
32
33 This is non blocking."""
33 This is non blocking."""
34 s = util.readpipe(pipe)
34 s = util.readpipe(pipe)
35 if s:
35 if s:
36 for l in s.splitlines():
36 for l in s.splitlines():
37 ui.status(_("remote: "), l, '\n')
37 ui.status(_("remote: "), l, '\n')
38
38
39 class doublepipe(object):
39 class doublepipe(object):
40 """Operate a side-channel pipe in addition of a main one
40 """Operate a side-channel pipe in addition of a main one
41
41
42 The side-channel pipe contains server output to be forwarded to the user
42 The side-channel pipe contains server output to be forwarded to the user
43 input. The double pipe will behave as the "main" pipe, but will ensure the
43 input. The double pipe will behave as the "main" pipe, but will ensure the
44 content of the "side" pipe is properly processed while we wait for blocking
44 content of the "side" pipe is properly processed while we wait for blocking
45 call on the "main" pipe.
45 call on the "main" pipe.
46
46
47 If large amounts of data are read from "main", the forward will cease after
47 If large amounts of data are read from "main", the forward will cease after
48 the first bytes start to appear. This simplifies the implementation
48 the first bytes start to appear. This simplifies the implementation
49 without affecting actual output of sshpeer too much as we rarely issue
49 without affecting actual output of sshpeer too much as we rarely issue
50 large read for data not yet emitted by the server.
50 large read for data not yet emitted by the server.
51
51
52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
53 that handle all the os specific bits. This class lives in this module
53 that handle all the os specific bits. This class lives in this module
54 because it focus on behavior specific to the ssh protocol."""
54 because it focus on behavior specific to the ssh protocol."""
55
55
56 def __init__(self, ui, main, side):
56 def __init__(self, ui, main, side):
57 self._ui = ui
57 self._ui = ui
58 self._main = main
58 self._main = main
59 self._side = side
59 self._side = side
60
60
61 def _wait(self):
61 def _wait(self):
62 """wait until some data are available on main or side
62 """wait until some data are available on main or side
63
63
64 return a pair of boolean (ismainready, issideready)
64 return a pair of boolean (ismainready, issideready)
65
65
66 (This will only wait for data if the setup is supported by `util.poll`)
66 (This will only wait for data if the setup is supported by `util.poll`)
67 """
67 """
68 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
68 if (isinstance(self._main, util.bufferedinputpipe) and
69 return (True, True) # main has data, assume side is worth poking at.
69 self._main.hasbuffer):
70 # Main has data. Assume side is worth poking at.
71 return True, True
72
70 fds = [self._main.fileno(), self._side.fileno()]
73 fds = [self._main.fileno(), self._side.fileno()]
71 try:
74 try:
72 act = util.poll(fds)
75 act = util.poll(fds)
73 except NotImplementedError:
76 except NotImplementedError:
74 # non supported yet case, assume all have data.
77 # non supported yet case, assume all have data.
75 act = fds
78 act = fds
76 return (self._main.fileno() in act, self._side.fileno() in act)
79 return (self._main.fileno() in act, self._side.fileno() in act)
77
80
78 def write(self, data):
81 def write(self, data):
79 return self._call('write', data)
82 return self._call('write', data)
80
83
81 def read(self, size):
84 def read(self, size):
82 r = self._call('read', size)
85 r = self._call('read', size)
83 if size != 0 and not r:
86 if size != 0 and not r:
84 # We've observed a condition that indicates the
87 # We've observed a condition that indicates the
85 # stdout closed unexpectedly. Check stderr one
88 # stdout closed unexpectedly. Check stderr one
86 # more time and snag anything that's there before
89 # more time and snag anything that's there before
87 # letting anyone know the main part of the pipe
90 # letting anyone know the main part of the pipe
88 # closed prematurely.
91 # closed prematurely.
89 _forwardoutput(self._ui, self._side)
92 _forwardoutput(self._ui, self._side)
90 return r
93 return r
91
94
92 def readline(self):
95 def readline(self):
93 return self._call('readline')
96 return self._call('readline')
94
97
95 def _call(self, methname, data=None):
98 def _call(self, methname, data=None):
96 """call <methname> on "main", forward output of "side" while blocking
99 """call <methname> on "main", forward output of "side" while blocking
97 """
100 """
98 # data can be '' or 0
101 # data can be '' or 0
99 if (data is not None and not data) or self._main.closed:
102 if (data is not None and not data) or self._main.closed:
100 _forwardoutput(self._ui, self._side)
103 _forwardoutput(self._ui, self._side)
101 return ''
104 return ''
102 while True:
105 while True:
103 mainready, sideready = self._wait()
106 mainready, sideready = self._wait()
104 if sideready:
107 if sideready:
105 _forwardoutput(self._ui, self._side)
108 _forwardoutput(self._ui, self._side)
106 if mainready:
109 if mainready:
107 meth = getattr(self._main, methname)
110 meth = getattr(self._main, methname)
108 if data is None:
111 if data is None:
109 return meth()
112 return meth()
110 else:
113 else:
111 return meth(data)
114 return meth(data)
112
115
113 def close(self):
116 def close(self):
114 return self._main.close()
117 return self._main.close()
115
118
116 def flush(self):
119 def flush(self):
117 return self._main.flush()
120 return self._main.flush()
118
121
119 def _cleanuppipes(ui, pipei, pipeo, pipee):
122 def _cleanuppipes(ui, pipei, pipeo, pipee):
120 """Clean up pipes used by an SSH connection."""
123 """Clean up pipes used by an SSH connection."""
121 if pipeo:
124 if pipeo:
122 pipeo.close()
125 pipeo.close()
123 if pipei:
126 if pipei:
124 pipei.close()
127 pipei.close()
125
128
126 if pipee:
129 if pipee:
127 # Try to read from the err descriptor until EOF.
130 # Try to read from the err descriptor until EOF.
128 try:
131 try:
129 for l in pipee:
132 for l in pipee:
130 ui.status(_('remote: '), l)
133 ui.status(_('remote: '), l)
131 except (IOError, ValueError):
134 except (IOError, ValueError):
132 pass
135 pass
133
136
134 pipee.close()
137 pipee.close()
135
138
136 def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
139 def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
137 """Create an SSH connection to a server.
140 """Create an SSH connection to a server.
138
141
139 Returns a tuple of (process, stdin, stdout, stderr) for the
142 Returns a tuple of (process, stdin, stdout, stderr) for the
140 spawned process.
143 spawned process.
141 """
144 """
142 cmd = '%s %s %s' % (
145 cmd = '%s %s %s' % (
143 sshcmd,
146 sshcmd,
144 args,
147 args,
145 util.shellquote('%s -R %s serve --stdio' % (
148 util.shellquote('%s -R %s serve --stdio' % (
146 _serverquote(remotecmd), _serverquote(path))))
149 _serverquote(remotecmd), _serverquote(path))))
147
150
148 ui.debug('running %s\n' % cmd)
151 ui.debug('running %s\n' % cmd)
149 cmd = util.quotecommand(cmd)
152 cmd = util.quotecommand(cmd)
150
153
151 # no buffer allow the use of 'select'
154 # no buffer allow the use of 'select'
152 # feel free to remove buffering and select usage when we ultimately
155 # feel free to remove buffering and select usage when we ultimately
153 # move to threading.
156 # move to threading.
154 stdin, stdout, stderr, proc = util.popen4(cmd, bufsize=0, env=sshenv)
157 stdin, stdout, stderr, proc = util.popen4(cmd, bufsize=0, env=sshenv)
155
158
156 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
159 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
157 stdin = doublepipe(ui, stdin, stderr)
160 stdin = doublepipe(ui, stdin, stderr)
158
161
159 return proc, stdin, stdout, stderr
162 return proc, stdin, stdout, stderr
160
163
161 def _performhandshake(ui, stdin, stdout, stderr):
164 def _performhandshake(ui, stdin, stdout, stderr):
162 def badresponse():
165 def badresponse():
163 msg = _('no suitable response from remote hg')
166 msg = _('no suitable response from remote hg')
164 hint = ui.config('ui', 'ssherrorhint')
167 hint = ui.config('ui', 'ssherrorhint')
165 raise error.RepoError(msg, hint=hint)
168 raise error.RepoError(msg, hint=hint)
166
169
167 # The handshake consists of sending wire protocol commands in reverse
170 # The handshake consists of sending wire protocol commands in reverse
168 # order of protocol implementation and then sniffing for a response
171 # order of protocol implementation and then sniffing for a response
169 # to one of them.
172 # to one of them.
170 #
173 #
171 # Those commands (from oldest to newest) are:
174 # Those commands (from oldest to newest) are:
172 #
175 #
173 # ``between``
176 # ``between``
174 # Asks for the set of revisions between a pair of revisions. Command
177 # Asks for the set of revisions between a pair of revisions. Command
175 # present in all Mercurial server implementations.
178 # present in all Mercurial server implementations.
176 #
179 #
177 # ``hello``
180 # ``hello``
178 # Instructs the server to advertise its capabilities. Introduced in
181 # Instructs the server to advertise its capabilities. Introduced in
179 # Mercurial 0.9.1.
182 # Mercurial 0.9.1.
180 #
183 #
181 # ``upgrade``
184 # ``upgrade``
182 # Requests upgrade from default transport protocol version 1 to
185 # Requests upgrade from default transport protocol version 1 to
183 # a newer version. Introduced in Mercurial 4.6 as an experimental
186 # a newer version. Introduced in Mercurial 4.6 as an experimental
184 # feature.
187 # feature.
185 #
188 #
186 # The ``between`` command is issued with a request for the null
189 # The ``between`` command is issued with a request for the null
187 # range. If the remote is a Mercurial server, this request will
190 # range. If the remote is a Mercurial server, this request will
188 # generate a specific response: ``1\n\n``. This represents the
191 # generate a specific response: ``1\n\n``. This represents the
189 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
192 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
190 # in the output stream and know this is the response to ``between``
193 # in the output stream and know this is the response to ``between``
191 # and we're at the end of our handshake reply.
194 # and we're at the end of our handshake reply.
192 #
195 #
193 # The response to the ``hello`` command will be a line with the
196 # The response to the ``hello`` command will be a line with the
194 # length of the value returned by that command followed by that
197 # length of the value returned by that command followed by that
195 # value. If the server doesn't support ``hello`` (which should be
198 # value. If the server doesn't support ``hello`` (which should be
196 # rare), that line will be ``0\n``. Otherwise, the value will contain
199 # rare), that line will be ``0\n``. Otherwise, the value will contain
197 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
200 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
198 # the capabilities of the server.
201 # the capabilities of the server.
199 #
202 #
200 # The ``upgrade`` command isn't really a command in the traditional
203 # The ``upgrade`` command isn't really a command in the traditional
201 # sense of version 1 of the transport because it isn't using the
204 # sense of version 1 of the transport because it isn't using the
202 # proper mechanism for formatting insteads: instead, it just encodes
205 # proper mechanism for formatting insteads: instead, it just encodes
203 # arguments on the line, delimited by spaces.
206 # arguments on the line, delimited by spaces.
204 #
207 #
205 # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
208 # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
206 # If the server doesn't support protocol upgrades, it will reply to
209 # If the server doesn't support protocol upgrades, it will reply to
207 # this line with ``0\n``. Otherwise, it emits an
210 # this line with ``0\n``. Otherwise, it emits an
208 # ``upgraded <token> <protocol>`` line to both stdout and stderr.
211 # ``upgraded <token> <protocol>`` line to both stdout and stderr.
209 # Content immediately following this line describes additional
212 # Content immediately following this line describes additional
210 # protocol and server state.
213 # protocol and server state.
211 #
214 #
212 # In addition to the responses to our command requests, the server
215 # In addition to the responses to our command requests, the server
213 # may emit "banner" output on stdout. SSH servers are allowed to
216 # may emit "banner" output on stdout. SSH servers are allowed to
214 # print messages to stdout on login. Issuing commands on connection
217 # print messages to stdout on login. Issuing commands on connection
215 # allows us to flush this banner output from the server by scanning
218 # allows us to flush this banner output from the server by scanning
216 # for output to our well-known ``between`` command. Of course, if
219 # for output to our well-known ``between`` command. Of course, if
217 # the banner contains ``1\n\n``, this will throw off our detection.
220 # the banner contains ``1\n\n``, this will throw off our detection.
218
221
219 requestlog = ui.configbool('devel', 'debug.peer-request')
222 requestlog = ui.configbool('devel', 'debug.peer-request')
220
223
221 # Generate a random token to help identify responses to version 2
224 # Generate a random token to help identify responses to version 2
222 # upgrade request.
225 # upgrade request.
223 token = pycompat.sysbytes(str(uuid.uuid4()))
226 token = pycompat.sysbytes(str(uuid.uuid4()))
224 upgradecaps = [
227 upgradecaps = [
225 ('proto', wireprotoserver.SSHV2),
228 ('proto', wireprotoserver.SSHV2),
226 ]
229 ]
227 upgradecaps = util.urlreq.urlencode(upgradecaps)
230 upgradecaps = util.urlreq.urlencode(upgradecaps)
228
231
229 try:
232 try:
230 pairsarg = '%s-%s' % ('0' * 40, '0' * 40)
233 pairsarg = '%s-%s' % ('0' * 40, '0' * 40)
231 handshake = [
234 handshake = [
232 'hello\n',
235 'hello\n',
233 'between\n',
236 'between\n',
234 'pairs %d\n' % len(pairsarg),
237 'pairs %d\n' % len(pairsarg),
235 pairsarg,
238 pairsarg,
236 ]
239 ]
237
240
238 # Request upgrade to version 2 if configured.
241 # Request upgrade to version 2 if configured.
239 if ui.configbool('experimental', 'sshpeer.advertise-v2'):
242 if ui.configbool('experimental', 'sshpeer.advertise-v2'):
240 ui.debug('sending upgrade request: %s %s\n' % (token, upgradecaps))
243 ui.debug('sending upgrade request: %s %s\n' % (token, upgradecaps))
241 handshake.insert(0, 'upgrade %s %s\n' % (token, upgradecaps))
244 handshake.insert(0, 'upgrade %s %s\n' % (token, upgradecaps))
242
245
243 if requestlog:
246 if requestlog:
244 ui.debug('devel-peer-request: hello\n')
247 ui.debug('devel-peer-request: hello\n')
245 ui.debug('sending hello command\n')
248 ui.debug('sending hello command\n')
246 if requestlog:
249 if requestlog:
247 ui.debug('devel-peer-request: between\n')
250 ui.debug('devel-peer-request: between\n')
248 ui.debug('devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
251 ui.debug('devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
249 ui.debug('sending between command\n')
252 ui.debug('sending between command\n')
250
253
251 stdin.write(''.join(handshake))
254 stdin.write(''.join(handshake))
252 stdin.flush()
255 stdin.flush()
253 except IOError:
256 except IOError:
254 badresponse()
257 badresponse()
255
258
256 # Assume version 1 of wire protocol by default.
259 # Assume version 1 of wire protocol by default.
257 protoname = wireprotoserver.SSHV1
260 protoname = wireprotoserver.SSHV1
258 reupgraded = re.compile(b'^upgraded %s (.*)$' % re.escape(token))
261 reupgraded = re.compile(b'^upgraded %s (.*)$' % re.escape(token))
259
262
260 lines = ['', 'dummy']
263 lines = ['', 'dummy']
261 max_noise = 500
264 max_noise = 500
262 while lines[-1] and max_noise:
265 while lines[-1] and max_noise:
263 try:
266 try:
264 l = stdout.readline()
267 l = stdout.readline()
265 _forwardoutput(ui, stderr)
268 _forwardoutput(ui, stderr)
266
269
267 # Look for reply to protocol upgrade request. It has a token
270 # Look for reply to protocol upgrade request. It has a token
268 # in it, so there should be no false positives.
271 # in it, so there should be no false positives.
269 m = reupgraded.match(l)
272 m = reupgraded.match(l)
270 if m:
273 if m:
271 protoname = m.group(1)
274 protoname = m.group(1)
272 ui.debug('protocol upgraded to %s\n' % protoname)
275 ui.debug('protocol upgraded to %s\n' % protoname)
273 # If an upgrade was handled, the ``hello`` and ``between``
276 # If an upgrade was handled, the ``hello`` and ``between``
274 # requests are ignored. The next output belongs to the
277 # requests are ignored. The next output belongs to the
275 # protocol, so stop scanning lines.
278 # protocol, so stop scanning lines.
276 break
279 break
277
280
278 # Otherwise it could be a banner, ``0\n`` response if server
281 # Otherwise it could be a banner, ``0\n`` response if server
279 # doesn't support upgrade.
282 # doesn't support upgrade.
280
283
281 if lines[-1] == '1\n' and l == '\n':
284 if lines[-1] == '1\n' and l == '\n':
282 break
285 break
283 if l:
286 if l:
284 ui.debug('remote: ', l)
287 ui.debug('remote: ', l)
285 lines.append(l)
288 lines.append(l)
286 max_noise -= 1
289 max_noise -= 1
287 except IOError:
290 except IOError:
288 badresponse()
291 badresponse()
289 else:
292 else:
290 badresponse()
293 badresponse()
291
294
292 caps = set()
295 caps = set()
293
296
294 # For version 1, we should see a ``capabilities`` line in response to the
297 # For version 1, we should see a ``capabilities`` line in response to the
295 # ``hello`` command.
298 # ``hello`` command.
296 if protoname == wireprotoserver.SSHV1:
299 if protoname == wireprotoserver.SSHV1:
297 for l in reversed(lines):
300 for l in reversed(lines):
298 # Look for response to ``hello`` command. Scan from the back so
301 # Look for response to ``hello`` command. Scan from the back so
299 # we don't misinterpret banner output as the command reply.
302 # we don't misinterpret banner output as the command reply.
300 if l.startswith('capabilities:'):
303 if l.startswith('capabilities:'):
301 caps.update(l[:-1].split(':')[1].split())
304 caps.update(l[:-1].split(':')[1].split())
302 break
305 break
303 elif protoname == wireprotoserver.SSHV2:
306 elif protoname == wireprotoserver.SSHV2:
304 # We see a line with number of bytes to follow and then a value
307 # We see a line with number of bytes to follow and then a value
305 # looking like ``capabilities: *``.
308 # looking like ``capabilities: *``.
306 line = stdout.readline()
309 line = stdout.readline()
307 try:
310 try:
308 valuelen = int(line)
311 valuelen = int(line)
309 except ValueError:
312 except ValueError:
310 badresponse()
313 badresponse()
311
314
312 capsline = stdout.read(valuelen)
315 capsline = stdout.read(valuelen)
313 if not capsline.startswith('capabilities: '):
316 if not capsline.startswith('capabilities: '):
314 badresponse()
317 badresponse()
315
318
316 ui.debug('remote: %s\n' % capsline)
319 ui.debug('remote: %s\n' % capsline)
317
320
318 caps.update(capsline.split(':')[1].split())
321 caps.update(capsline.split(':')[1].split())
319 # Trailing newline.
322 # Trailing newline.
320 stdout.read(1)
323 stdout.read(1)
321
324
322 # Error if we couldn't find capabilities, this means:
325 # Error if we couldn't find capabilities, this means:
323 #
326 #
324 # 1. Remote isn't a Mercurial server
327 # 1. Remote isn't a Mercurial server
325 # 2. Remote is a <0.9.1 Mercurial server
328 # 2. Remote is a <0.9.1 Mercurial server
326 # 3. Remote is a future Mercurial server that dropped ``hello``
329 # 3. Remote is a future Mercurial server that dropped ``hello``
327 # and other attempted handshake mechanisms.
330 # and other attempted handshake mechanisms.
328 if not caps:
331 if not caps:
329 badresponse()
332 badresponse()
330
333
331 return protoname, caps
334 return protoname, caps
332
335
333 class sshv1peer(wireproto.wirepeer):
336 class sshv1peer(wireproto.wirepeer):
334 def __init__(self, ui, url, proc, stdin, stdout, stderr, caps):
337 def __init__(self, ui, url, proc, stdin, stdout, stderr, caps):
335 """Create a peer from an existing SSH connection.
338 """Create a peer from an existing SSH connection.
336
339
337 ``proc`` is a handle on the underlying SSH process.
340 ``proc`` is a handle on the underlying SSH process.
338 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
341 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
339 pipes for that process.
342 pipes for that process.
340 ``caps`` is a set of capabilities supported by the remote.
343 ``caps`` is a set of capabilities supported by the remote.
341 """
344 """
342 self._url = url
345 self._url = url
343 self._ui = ui
346 self._ui = ui
344 # self._subprocess is unused. Keeping a handle on the process
347 # self._subprocess is unused. Keeping a handle on the process
345 # holds a reference and prevents it from being garbage collected.
348 # holds a reference and prevents it from being garbage collected.
346 self._subprocess = proc
349 self._subprocess = proc
347 self._pipeo = stdin
350 self._pipeo = stdin
348 self._pipei = stdout
351 self._pipei = stdout
349 self._pipee = stderr
352 self._pipee = stderr
350 self._caps = caps
353 self._caps = caps
351
354
352 # Commands that have a "framed" response where the first line of the
355 # Commands that have a "framed" response where the first line of the
353 # response contains the length of that response.
356 # response contains the length of that response.
354 _FRAMED_COMMANDS = {
357 _FRAMED_COMMANDS = {
355 'batch',
358 'batch',
356 }
359 }
357
360
358 # Begin of _basepeer interface.
361 # Begin of _basepeer interface.
359
362
360 @util.propertycache
363 @util.propertycache
361 def ui(self):
364 def ui(self):
362 return self._ui
365 return self._ui
363
366
364 def url(self):
367 def url(self):
365 return self._url
368 return self._url
366
369
367 def local(self):
370 def local(self):
368 return None
371 return None
369
372
370 def peer(self):
373 def peer(self):
371 return self
374 return self
372
375
373 def canpush(self):
376 def canpush(self):
374 return True
377 return True
375
378
376 def close(self):
379 def close(self):
377 pass
380 pass
378
381
379 # End of _basepeer interface.
382 # End of _basepeer interface.
380
383
381 # Begin of _basewirecommands interface.
384 # Begin of _basewirecommands interface.
382
385
383 def capabilities(self):
386 def capabilities(self):
384 return self._caps
387 return self._caps
385
388
386 # End of _basewirecommands interface.
389 # End of _basewirecommands interface.
387
390
388 def _readerr(self):
391 def _readerr(self):
389 _forwardoutput(self.ui, self._pipee)
392 _forwardoutput(self.ui, self._pipee)
390
393
391 def _abort(self, exception):
394 def _abort(self, exception):
392 self._cleanup()
395 self._cleanup()
393 raise exception
396 raise exception
394
397
395 def _cleanup(self):
398 def _cleanup(self):
396 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
399 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
397
400
398 __del__ = _cleanup
401 __del__ = _cleanup
399
402
400 def _sendrequest(self, cmd, args, framed=False):
403 def _sendrequest(self, cmd, args, framed=False):
401 if (self.ui.debugflag
404 if (self.ui.debugflag
402 and self.ui.configbool('devel', 'debug.peer-request')):
405 and self.ui.configbool('devel', 'debug.peer-request')):
403 dbg = self.ui.debug
406 dbg = self.ui.debug
404 line = 'devel-peer-request: %s\n'
407 line = 'devel-peer-request: %s\n'
405 dbg(line % cmd)
408 dbg(line % cmd)
406 for key, value in sorted(args.items()):
409 for key, value in sorted(args.items()):
407 if not isinstance(value, dict):
410 if not isinstance(value, dict):
408 dbg(line % ' %s: %d bytes' % (key, len(value)))
411 dbg(line % ' %s: %d bytes' % (key, len(value)))
409 else:
412 else:
410 for dk, dv in sorted(value.items()):
413 for dk, dv in sorted(value.items()):
411 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
414 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
412 self.ui.debug("sending %s command\n" % cmd)
415 self.ui.debug("sending %s command\n" % cmd)
413 self._pipeo.write("%s\n" % cmd)
416 self._pipeo.write("%s\n" % cmd)
414 _func, names = wireproto.commands[cmd]
417 _func, names = wireproto.commands[cmd]
415 keys = names.split()
418 keys = names.split()
416 wireargs = {}
419 wireargs = {}
417 for k in keys:
420 for k in keys:
418 if k == '*':
421 if k == '*':
419 wireargs['*'] = args
422 wireargs['*'] = args
420 break
423 break
421 else:
424 else:
422 wireargs[k] = args[k]
425 wireargs[k] = args[k]
423 del args[k]
426 del args[k]
424 for k, v in sorted(wireargs.iteritems()):
427 for k, v in sorted(wireargs.iteritems()):
425 self._pipeo.write("%s %d\n" % (k, len(v)))
428 self._pipeo.write("%s %d\n" % (k, len(v)))
426 if isinstance(v, dict):
429 if isinstance(v, dict):
427 for dk, dv in v.iteritems():
430 for dk, dv in v.iteritems():
428 self._pipeo.write("%s %d\n" % (dk, len(dv)))
431 self._pipeo.write("%s %d\n" % (dk, len(dv)))
429 self._pipeo.write(dv)
432 self._pipeo.write(dv)
430 else:
433 else:
431 self._pipeo.write(v)
434 self._pipeo.write(v)
432 self._pipeo.flush()
435 self._pipeo.flush()
433
436
434 # We know exactly how many bytes are in the response. So return a proxy
437 # We know exactly how many bytes are in the response. So return a proxy
435 # around the raw output stream that allows reading exactly this many
438 # around the raw output stream that allows reading exactly this many
436 # bytes. Callers then can read() without fear of overrunning the
439 # bytes. Callers then can read() without fear of overrunning the
437 # response.
440 # response.
438 if framed:
441 if framed:
439 amount = self._getamount()
442 amount = self._getamount()
440 return util.cappedreader(self._pipei, amount)
443 return util.cappedreader(self._pipei, amount)
441
444
442 return self._pipei
445 return self._pipei
443
446
444 def _callstream(self, cmd, **args):
447 def _callstream(self, cmd, **args):
445 args = pycompat.byteskwargs(args)
448 args = pycompat.byteskwargs(args)
446 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
449 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
447
450
448 def _callcompressable(self, cmd, **args):
451 def _callcompressable(self, cmd, **args):
449 args = pycompat.byteskwargs(args)
452 args = pycompat.byteskwargs(args)
450 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
453 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
451
454
452 def _call(self, cmd, **args):
455 def _call(self, cmd, **args):
453 args = pycompat.byteskwargs(args)
456 args = pycompat.byteskwargs(args)
454 return self._sendrequest(cmd, args, framed=True).read()
457 return self._sendrequest(cmd, args, framed=True).read()
455
458
456 def _callpush(self, cmd, fp, **args):
459 def _callpush(self, cmd, fp, **args):
457 r = self._call(cmd, **args)
460 r = self._call(cmd, **args)
458 if r:
461 if r:
459 return '', r
462 return '', r
460 for d in iter(lambda: fp.read(4096), ''):
463 for d in iter(lambda: fp.read(4096), ''):
461 self._writeframed(d)
464 self._writeframed(d)
462 self._writeframed("", flush=True)
465 self._writeframed("", flush=True)
463 r = self._readframed()
466 r = self._readframed()
464 if r:
467 if r:
465 return '', r
468 return '', r
466 return self._readframed(), ''
469 return self._readframed(), ''
467
470
468 def _calltwowaystream(self, cmd, fp, **args):
471 def _calltwowaystream(self, cmd, fp, **args):
469 r = self._call(cmd, **args)
472 r = self._call(cmd, **args)
470 if r:
473 if r:
471 # XXX needs to be made better
474 # XXX needs to be made better
472 raise error.Abort(_('unexpected remote reply: %s') % r)
475 raise error.Abort(_('unexpected remote reply: %s') % r)
473 for d in iter(lambda: fp.read(4096), ''):
476 for d in iter(lambda: fp.read(4096), ''):
474 self._writeframed(d)
477 self._writeframed(d)
475 self._writeframed("", flush=True)
478 self._writeframed("", flush=True)
476 return self._pipei
479 return self._pipei
477
480
478 def _getamount(self):
481 def _getamount(self):
479 l = self._pipei.readline()
482 l = self._pipei.readline()
480 if l == '\n':
483 if l == '\n':
481 self._readerr()
484 self._readerr()
482 msg = _('check previous remote output')
485 msg = _('check previous remote output')
483 self._abort(error.OutOfBandError(hint=msg))
486 self._abort(error.OutOfBandError(hint=msg))
484 self._readerr()
487 self._readerr()
485 try:
488 try:
486 return int(l)
489 return int(l)
487 except ValueError:
490 except ValueError:
488 self._abort(error.ResponseError(_("unexpected response:"), l))
491 self._abort(error.ResponseError(_("unexpected response:"), l))
489
492
490 def _readframed(self):
493 def _readframed(self):
491 return self._pipei.read(self._getamount())
494 return self._pipei.read(self._getamount())
492
495
493 def _writeframed(self, data, flush=False):
496 def _writeframed(self, data, flush=False):
494 self._pipeo.write("%d\n" % len(data))
497 self._pipeo.write("%d\n" % len(data))
495 if data:
498 if data:
496 self._pipeo.write(data)
499 self._pipeo.write(data)
497 if flush:
500 if flush:
498 self._pipeo.flush()
501 self._pipeo.flush()
499 self._readerr()
502 self._readerr()
500
503
501 class sshv2peer(sshv1peer):
504 class sshv2peer(sshv1peer):
502 """A peer that speakers version 2 of the transport protocol."""
505 """A peer that speakers version 2 of the transport protocol."""
503 # Currently version 2 is identical to version 1 post handshake.
506 # Currently version 2 is identical to version 1 post handshake.
504 # And handshake is performed before the peer is instantiated. So
507 # And handshake is performed before the peer is instantiated. So
505 # we need no custom code.
508 # we need no custom code.
506
509
507 def instance(ui, path, create):
510 def instance(ui, path, create):
508 """Create an SSH peer.
511 """Create an SSH peer.
509
512
510 The returned object conforms to the ``wireproto.wirepeer`` interface.
513 The returned object conforms to the ``wireproto.wirepeer`` interface.
511 """
514 """
512 u = util.url(path, parsequery=False, parsefragment=False)
515 u = util.url(path, parsequery=False, parsefragment=False)
513 if u.scheme != 'ssh' or not u.host or u.path is None:
516 if u.scheme != 'ssh' or not u.host or u.path is None:
514 raise error.RepoError(_("couldn't parse location %s") % path)
517 raise error.RepoError(_("couldn't parse location %s") % path)
515
518
516 util.checksafessh(path)
519 util.checksafessh(path)
517
520
518 if u.passwd is not None:
521 if u.passwd is not None:
519 raise error.RepoError(_('password in URL not supported'))
522 raise error.RepoError(_('password in URL not supported'))
520
523
521 sshcmd = ui.config('ui', 'ssh')
524 sshcmd = ui.config('ui', 'ssh')
522 remotecmd = ui.config('ui', 'remotecmd')
525 remotecmd = ui.config('ui', 'remotecmd')
523 sshaddenv = dict(ui.configitems('sshenv'))
526 sshaddenv = dict(ui.configitems('sshenv'))
524 sshenv = util.shellenviron(sshaddenv)
527 sshenv = util.shellenviron(sshaddenv)
525 remotepath = u.path or '.'
528 remotepath = u.path or '.'
526
529
527 args = util.sshargs(sshcmd, u.host, u.user, u.port)
530 args = util.sshargs(sshcmd, u.host, u.user, u.port)
528
531
529 if create:
532 if create:
530 cmd = '%s %s %s' % (sshcmd, args,
533 cmd = '%s %s %s' % (sshcmd, args,
531 util.shellquote('%s init %s' %
534 util.shellquote('%s init %s' %
532 (_serverquote(remotecmd), _serverquote(remotepath))))
535 (_serverquote(remotecmd), _serverquote(remotepath))))
533 ui.debug('running %s\n' % cmd)
536 ui.debug('running %s\n' % cmd)
534 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
537 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
535 if res != 0:
538 if res != 0:
536 raise error.RepoError(_('could not create remote repo'))
539 raise error.RepoError(_('could not create remote repo'))
537
540
538 proc, stdin, stdout, stderr = _makeconnection(ui, sshcmd, args, remotecmd,
541 proc, stdin, stdout, stderr = _makeconnection(ui, sshcmd, args, remotecmd,
539 remotepath, sshenv)
542 remotepath, sshenv)
540
543
541 try:
544 try:
542 protoname, caps = _performhandshake(ui, stdin, stdout, stderr)
545 protoname, caps = _performhandshake(ui, stdin, stdout, stderr)
543 except Exception:
546 except Exception:
544 _cleanuppipes(ui, stdout, stdin, stderr)
547 _cleanuppipes(ui, stdout, stdin, stderr)
545 raise
548 raise
546
549
547 if protoname == wireprotoserver.SSHV1:
550 if protoname == wireprotoserver.SSHV1:
548 return sshv1peer(ui, path, proc, stdin, stdout, stderr, caps)
551 return sshv1peer(ui, path, proc, stdin, stdout, stderr, caps)
549 elif protoname == wireprotoserver.SSHV2:
552 elif protoname == wireprotoserver.SSHV2:
550 return sshv2peer(ui, path, proc, stdin, stdout, stderr, caps)
553 return sshv2peer(ui, path, proc, stdin, stdout, stderr, caps)
551 else:
554 else:
552 _cleanuppipes(ui, stdout, stdin, stderr)
555 _cleanuppipes(ui, stdout, stdin, stderr)
553 raise error.RepoError(_('unknown version of SSH protocol: %s') %
556 raise error.RepoError(_('unknown version of SSH protocol: %s') %
554 protoname)
557 protoname)
General Comments 0
You need to be logged in to leave comments. Login now