##// END OF EJS Templates
sshpeer: return framed file object when needed...
Gregory Szorc -
r36385:043e77f3 default
parent child Browse files
Show More
@@ -1,560 +1,554
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import re
10 import re
11 import uuid
11 import uuid
12
12
13 from .i18n import _
13 from .i18n import _
14 from . import (
14 from . import (
15 error,
15 error,
16 pycompat,
16 pycompat,
17 util,
17 util,
18 wireproto,
18 wireproto,
19 wireprotoserver,
19 wireprotoserver,
20 )
20 )
21
21
22 def _serverquote(s):
22 def _serverquote(s):
23 """quote a string for the remote shell ... which we assume is sh"""
23 """quote a string for the remote shell ... which we assume is sh"""
24 if not s:
24 if not s:
25 return s
25 return s
26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
27 return s
27 return s
28 return "'%s'" % s.replace("'", "'\\''")
28 return "'%s'" % s.replace("'", "'\\''")
29
29
30 def _forwardoutput(ui, pipe):
30 def _forwardoutput(ui, pipe):
31 """display all data currently available on pipe as remote output.
31 """display all data currently available on pipe as remote output.
32
32
33 This is non blocking."""
33 This is non blocking."""
34 s = util.readpipe(pipe)
34 s = util.readpipe(pipe)
35 if s:
35 if s:
36 for l in s.splitlines():
36 for l in s.splitlines():
37 ui.status(_("remote: "), l, '\n')
37 ui.status(_("remote: "), l, '\n')
38
38
39 class doublepipe(object):
39 class doublepipe(object):
40 """Operate a side-channel pipe in addition of a main one
40 """Operate a side-channel pipe in addition of a main one
41
41
42 The side-channel pipe contains server output to be forwarded to the user
42 The side-channel pipe contains server output to be forwarded to the user
43 input. The double pipe will behave as the "main" pipe, but will ensure the
43 input. The double pipe will behave as the "main" pipe, but will ensure the
44 content of the "side" pipe is properly processed while we wait for blocking
44 content of the "side" pipe is properly processed while we wait for blocking
45 call on the "main" pipe.
45 call on the "main" pipe.
46
46
47 If large amounts of data are read from "main", the forward will cease after
47 If large amounts of data are read from "main", the forward will cease after
48 the first bytes start to appear. This simplifies the implementation
48 the first bytes start to appear. This simplifies the implementation
49 without affecting actual output of sshpeer too much as we rarely issue
49 without affecting actual output of sshpeer too much as we rarely issue
50 large read for data not yet emitted by the server.
50 large read for data not yet emitted by the server.
51
51
52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
53 that handle all the os specific bits. This class lives in this module
53 that handle all the os specific bits. This class lives in this module
54 because it focus on behavior specific to the ssh protocol."""
54 because it focus on behavior specific to the ssh protocol."""
55
55
56 def __init__(self, ui, main, side):
56 def __init__(self, ui, main, side):
57 self._ui = ui
57 self._ui = ui
58 self._main = main
58 self._main = main
59 self._side = side
59 self._side = side
60
60
61 def _wait(self):
61 def _wait(self):
62 """wait until some data are available on main or side
62 """wait until some data are available on main or side
63
63
64 return a pair of boolean (ismainready, issideready)
64 return a pair of boolean (ismainready, issideready)
65
65
66 (This will only wait for data if the setup is supported by `util.poll`)
66 (This will only wait for data if the setup is supported by `util.poll`)
67 """
67 """
68 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
68 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
69 return (True, True) # main has data, assume side is worth poking at.
69 return (True, True) # main has data, assume side is worth poking at.
70 fds = [self._main.fileno(), self._side.fileno()]
70 fds = [self._main.fileno(), self._side.fileno()]
71 try:
71 try:
72 act = util.poll(fds)
72 act = util.poll(fds)
73 except NotImplementedError:
73 except NotImplementedError:
74 # non supported yet case, assume all have data.
74 # non supported yet case, assume all have data.
75 act = fds
75 act = fds
76 return (self._main.fileno() in act, self._side.fileno() in act)
76 return (self._main.fileno() in act, self._side.fileno() in act)
77
77
78 def write(self, data):
78 def write(self, data):
79 return self._call('write', data)
79 return self._call('write', data)
80
80
81 def read(self, size):
81 def read(self, size):
82 r = self._call('read', size)
82 r = self._call('read', size)
83 if size != 0 and not r:
83 if size != 0 and not r:
84 # We've observed a condition that indicates the
84 # We've observed a condition that indicates the
85 # stdout closed unexpectedly. Check stderr one
85 # stdout closed unexpectedly. Check stderr one
86 # more time and snag anything that's there before
86 # more time and snag anything that's there before
87 # letting anyone know the main part of the pipe
87 # letting anyone know the main part of the pipe
88 # closed prematurely.
88 # closed prematurely.
89 _forwardoutput(self._ui, self._side)
89 _forwardoutput(self._ui, self._side)
90 return r
90 return r
91
91
92 def readline(self):
92 def readline(self):
93 return self._call('readline')
93 return self._call('readline')
94
94
95 def _call(self, methname, data=None):
95 def _call(self, methname, data=None):
96 """call <methname> on "main", forward output of "side" while blocking
96 """call <methname> on "main", forward output of "side" while blocking
97 """
97 """
98 # data can be '' or 0
98 # data can be '' or 0
99 if (data is not None and not data) or self._main.closed:
99 if (data is not None and not data) or self._main.closed:
100 _forwardoutput(self._ui, self._side)
100 _forwardoutput(self._ui, self._side)
101 return ''
101 return ''
102 while True:
102 while True:
103 mainready, sideready = self._wait()
103 mainready, sideready = self._wait()
104 if sideready:
104 if sideready:
105 _forwardoutput(self._ui, self._side)
105 _forwardoutput(self._ui, self._side)
106 if mainready:
106 if mainready:
107 meth = getattr(self._main, methname)
107 meth = getattr(self._main, methname)
108 if data is None:
108 if data is None:
109 return meth()
109 return meth()
110 else:
110 else:
111 return meth(data)
111 return meth(data)
112
112
113 def close(self):
113 def close(self):
114 return self._main.close()
114 return self._main.close()
115
115
116 def flush(self):
116 def flush(self):
117 return self._main.flush()
117 return self._main.flush()
118
118
119 def _cleanuppipes(ui, pipei, pipeo, pipee):
119 def _cleanuppipes(ui, pipei, pipeo, pipee):
120 """Clean up pipes used by an SSH connection."""
120 """Clean up pipes used by an SSH connection."""
121 if pipeo:
121 if pipeo:
122 pipeo.close()
122 pipeo.close()
123 if pipei:
123 if pipei:
124 pipei.close()
124 pipei.close()
125
125
126 if pipee:
126 if pipee:
127 # Try to read from the err descriptor until EOF.
127 # Try to read from the err descriptor until EOF.
128 try:
128 try:
129 for l in pipee:
129 for l in pipee:
130 ui.status(_('remote: '), l)
130 ui.status(_('remote: '), l)
131 except (IOError, ValueError):
131 except (IOError, ValueError):
132 pass
132 pass
133
133
134 pipee.close()
134 pipee.close()
135
135
136 def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
136 def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
137 """Create an SSH connection to a server.
137 """Create an SSH connection to a server.
138
138
139 Returns a tuple of (process, stdin, stdout, stderr) for the
139 Returns a tuple of (process, stdin, stdout, stderr) for the
140 spawned process.
140 spawned process.
141 """
141 """
142 cmd = '%s %s %s' % (
142 cmd = '%s %s %s' % (
143 sshcmd,
143 sshcmd,
144 args,
144 args,
145 util.shellquote('%s -R %s serve --stdio' % (
145 util.shellquote('%s -R %s serve --stdio' % (
146 _serverquote(remotecmd), _serverquote(path))))
146 _serverquote(remotecmd), _serverquote(path))))
147
147
148 ui.debug('running %s\n' % cmd)
148 ui.debug('running %s\n' % cmd)
149 cmd = util.quotecommand(cmd)
149 cmd = util.quotecommand(cmd)
150
150
151 # no buffer allow the use of 'select'
151 # no buffer allow the use of 'select'
152 # feel free to remove buffering and select usage when we ultimately
152 # feel free to remove buffering and select usage when we ultimately
153 # move to threading.
153 # move to threading.
154 stdin, stdout, stderr, proc = util.popen4(cmd, bufsize=0, env=sshenv)
154 stdin, stdout, stderr, proc = util.popen4(cmd, bufsize=0, env=sshenv)
155
155
156 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
156 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
157 stdin = doublepipe(ui, stdin, stderr)
157 stdin = doublepipe(ui, stdin, stderr)
158
158
159 return proc, stdin, stdout, stderr
159 return proc, stdin, stdout, stderr
160
160
161 def _performhandshake(ui, stdin, stdout, stderr):
161 def _performhandshake(ui, stdin, stdout, stderr):
162 def badresponse():
162 def badresponse():
163 msg = _('no suitable response from remote hg')
163 msg = _('no suitable response from remote hg')
164 hint = ui.config('ui', 'ssherrorhint')
164 hint = ui.config('ui', 'ssherrorhint')
165 raise error.RepoError(msg, hint=hint)
165 raise error.RepoError(msg, hint=hint)
166
166
167 # The handshake consists of sending wire protocol commands in reverse
167 # The handshake consists of sending wire protocol commands in reverse
168 # order of protocol implementation and then sniffing for a response
168 # order of protocol implementation and then sniffing for a response
169 # to one of them.
169 # to one of them.
170 #
170 #
171 # Those commands (from oldest to newest) are:
171 # Those commands (from oldest to newest) are:
172 #
172 #
173 # ``between``
173 # ``between``
174 # Asks for the set of revisions between a pair of revisions. Command
174 # Asks for the set of revisions between a pair of revisions. Command
175 # present in all Mercurial server implementations.
175 # present in all Mercurial server implementations.
176 #
176 #
177 # ``hello``
177 # ``hello``
178 # Instructs the server to advertise its capabilities. Introduced in
178 # Instructs the server to advertise its capabilities. Introduced in
179 # Mercurial 0.9.1.
179 # Mercurial 0.9.1.
180 #
180 #
181 # ``upgrade``
181 # ``upgrade``
182 # Requests upgrade from default transport protocol version 1 to
182 # Requests upgrade from default transport protocol version 1 to
183 # a newer version. Introduced in Mercurial 4.6 as an experimental
183 # a newer version. Introduced in Mercurial 4.6 as an experimental
184 # feature.
184 # feature.
185 #
185 #
186 # The ``between`` command is issued with a request for the null
186 # The ``between`` command is issued with a request for the null
187 # range. If the remote is a Mercurial server, this request will
187 # range. If the remote is a Mercurial server, this request will
188 # generate a specific response: ``1\n\n``. This represents the
188 # generate a specific response: ``1\n\n``. This represents the
189 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
189 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
190 # in the output stream and know this is the response to ``between``
190 # in the output stream and know this is the response to ``between``
191 # and we're at the end of our handshake reply.
191 # and we're at the end of our handshake reply.
192 #
192 #
193 # The response to the ``hello`` command will be a line with the
193 # The response to the ``hello`` command will be a line with the
194 # length of the value returned by that command followed by that
194 # length of the value returned by that command followed by that
195 # value. If the server doesn't support ``hello`` (which should be
195 # value. If the server doesn't support ``hello`` (which should be
196 # rare), that line will be ``0\n``. Otherwise, the value will contain
196 # rare), that line will be ``0\n``. Otherwise, the value will contain
197 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
197 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
198 # the capabilities of the server.
198 # the capabilities of the server.
199 #
199 #
200 # The ``upgrade`` command isn't really a command in the traditional
200 # The ``upgrade`` command isn't really a command in the traditional
201 # sense of version 1 of the transport because it isn't using the
201 # sense of version 1 of the transport because it isn't using the
202 # proper mechanism for formatting insteads: instead, it just encodes
202 # proper mechanism for formatting insteads: instead, it just encodes
203 # arguments on the line, delimited by spaces.
203 # arguments on the line, delimited by spaces.
204 #
204 #
205 # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
205 # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
206 # If the server doesn't support protocol upgrades, it will reply to
206 # If the server doesn't support protocol upgrades, it will reply to
207 # this line with ``0\n``. Otherwise, it emits an
207 # this line with ``0\n``. Otherwise, it emits an
208 # ``upgraded <token> <protocol>`` line to both stdout and stderr.
208 # ``upgraded <token> <protocol>`` line to both stdout and stderr.
209 # Content immediately following this line describes additional
209 # Content immediately following this line describes additional
210 # protocol and server state.
210 # protocol and server state.
211 #
211 #
212 # In addition to the responses to our command requests, the server
212 # In addition to the responses to our command requests, the server
213 # may emit "banner" output on stdout. SSH servers are allowed to
213 # may emit "banner" output on stdout. SSH servers are allowed to
214 # print messages to stdout on login. Issuing commands on connection
214 # print messages to stdout on login. Issuing commands on connection
215 # allows us to flush this banner output from the server by scanning
215 # allows us to flush this banner output from the server by scanning
216 # for output to our well-known ``between`` command. Of course, if
216 # for output to our well-known ``between`` command. Of course, if
217 # the banner contains ``1\n\n``, this will throw off our detection.
217 # the banner contains ``1\n\n``, this will throw off our detection.
218
218
219 requestlog = ui.configbool('devel', 'debug.peer-request')
219 requestlog = ui.configbool('devel', 'debug.peer-request')
220
220
221 # Generate a random token to help identify responses to version 2
221 # Generate a random token to help identify responses to version 2
222 # upgrade request.
222 # upgrade request.
223 token = pycompat.sysbytes(str(uuid.uuid4()))
223 token = pycompat.sysbytes(str(uuid.uuid4()))
224 upgradecaps = [
224 upgradecaps = [
225 ('proto', wireprotoserver.SSHV2),
225 ('proto', wireprotoserver.SSHV2),
226 ]
226 ]
227 upgradecaps = util.urlreq.urlencode(upgradecaps)
227 upgradecaps = util.urlreq.urlencode(upgradecaps)
228
228
229 try:
229 try:
230 pairsarg = '%s-%s' % ('0' * 40, '0' * 40)
230 pairsarg = '%s-%s' % ('0' * 40, '0' * 40)
231 handshake = [
231 handshake = [
232 'hello\n',
232 'hello\n',
233 'between\n',
233 'between\n',
234 'pairs %d\n' % len(pairsarg),
234 'pairs %d\n' % len(pairsarg),
235 pairsarg,
235 pairsarg,
236 ]
236 ]
237
237
238 # Request upgrade to version 2 if configured.
238 # Request upgrade to version 2 if configured.
239 if ui.configbool('experimental', 'sshpeer.advertise-v2'):
239 if ui.configbool('experimental', 'sshpeer.advertise-v2'):
240 ui.debug('sending upgrade request: %s %s\n' % (token, upgradecaps))
240 ui.debug('sending upgrade request: %s %s\n' % (token, upgradecaps))
241 handshake.insert(0, 'upgrade %s %s\n' % (token, upgradecaps))
241 handshake.insert(0, 'upgrade %s %s\n' % (token, upgradecaps))
242
242
243 if requestlog:
243 if requestlog:
244 ui.debug('devel-peer-request: hello\n')
244 ui.debug('devel-peer-request: hello\n')
245 ui.debug('sending hello command\n')
245 ui.debug('sending hello command\n')
246 if requestlog:
246 if requestlog:
247 ui.debug('devel-peer-request: between\n')
247 ui.debug('devel-peer-request: between\n')
248 ui.debug('devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
248 ui.debug('devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
249 ui.debug('sending between command\n')
249 ui.debug('sending between command\n')
250
250
251 stdin.write(''.join(handshake))
251 stdin.write(''.join(handshake))
252 stdin.flush()
252 stdin.flush()
253 except IOError:
253 except IOError:
254 badresponse()
254 badresponse()
255
255
256 # Assume version 1 of wire protocol by default.
256 # Assume version 1 of wire protocol by default.
257 protoname = wireprotoserver.SSHV1
257 protoname = wireprotoserver.SSHV1
258 reupgraded = re.compile(b'^upgraded %s (.*)$' % re.escape(token))
258 reupgraded = re.compile(b'^upgraded %s (.*)$' % re.escape(token))
259
259
260 lines = ['', 'dummy']
260 lines = ['', 'dummy']
261 max_noise = 500
261 max_noise = 500
262 while lines[-1] and max_noise:
262 while lines[-1] and max_noise:
263 try:
263 try:
264 l = stdout.readline()
264 l = stdout.readline()
265 _forwardoutput(ui, stderr)
265 _forwardoutput(ui, stderr)
266
266
267 # Look for reply to protocol upgrade request. It has a token
267 # Look for reply to protocol upgrade request. It has a token
268 # in it, so there should be no false positives.
268 # in it, so there should be no false positives.
269 m = reupgraded.match(l)
269 m = reupgraded.match(l)
270 if m:
270 if m:
271 protoname = m.group(1)
271 protoname = m.group(1)
272 ui.debug('protocol upgraded to %s\n' % protoname)
272 ui.debug('protocol upgraded to %s\n' % protoname)
273 # If an upgrade was handled, the ``hello`` and ``between``
273 # If an upgrade was handled, the ``hello`` and ``between``
274 # requests are ignored. The next output belongs to the
274 # requests are ignored. The next output belongs to the
275 # protocol, so stop scanning lines.
275 # protocol, so stop scanning lines.
276 break
276 break
277
277
278 # Otherwise it could be a banner, ``0\n`` response if server
278 # Otherwise it could be a banner, ``0\n`` response if server
279 # doesn't support upgrade.
279 # doesn't support upgrade.
280
280
281 if lines[-1] == '1\n' and l == '\n':
281 if lines[-1] == '1\n' and l == '\n':
282 break
282 break
283 if l:
283 if l:
284 ui.debug('remote: ', l)
284 ui.debug('remote: ', l)
285 lines.append(l)
285 lines.append(l)
286 max_noise -= 1
286 max_noise -= 1
287 except IOError:
287 except IOError:
288 badresponse()
288 badresponse()
289 else:
289 else:
290 badresponse()
290 badresponse()
291
291
292 caps = set()
292 caps = set()
293
293
294 # For version 1, we should see a ``capabilities`` line in response to the
294 # For version 1, we should see a ``capabilities`` line in response to the
295 # ``hello`` command.
295 # ``hello`` command.
296 if protoname == wireprotoserver.SSHV1:
296 if protoname == wireprotoserver.SSHV1:
297 for l in reversed(lines):
297 for l in reversed(lines):
298 # Look for response to ``hello`` command. Scan from the back so
298 # Look for response to ``hello`` command. Scan from the back so
299 # we don't misinterpret banner output as the command reply.
299 # we don't misinterpret banner output as the command reply.
300 if l.startswith('capabilities:'):
300 if l.startswith('capabilities:'):
301 caps.update(l[:-1].split(':')[1].split())
301 caps.update(l[:-1].split(':')[1].split())
302 break
302 break
303 elif protoname == wireprotoserver.SSHV2:
303 elif protoname == wireprotoserver.SSHV2:
304 # We see a line with number of bytes to follow and then a value
304 # We see a line with number of bytes to follow and then a value
305 # looking like ``capabilities: *``.
305 # looking like ``capabilities: *``.
306 line = stdout.readline()
306 line = stdout.readline()
307 try:
307 try:
308 valuelen = int(line)
308 valuelen = int(line)
309 except ValueError:
309 except ValueError:
310 badresponse()
310 badresponse()
311
311
312 capsline = stdout.read(valuelen)
312 capsline = stdout.read(valuelen)
313 if not capsline.startswith('capabilities: '):
313 if not capsline.startswith('capabilities: '):
314 badresponse()
314 badresponse()
315
315
316 ui.debug('remote: %s\n' % capsline)
316 ui.debug('remote: %s\n' % capsline)
317
317
318 caps.update(capsline.split(':')[1].split())
318 caps.update(capsline.split(':')[1].split())
319 # Trailing newline.
319 # Trailing newline.
320 stdout.read(1)
320 stdout.read(1)
321
321
322 # Error if we couldn't find capabilities, this means:
322 # Error if we couldn't find capabilities, this means:
323 #
323 #
324 # 1. Remote isn't a Mercurial server
324 # 1. Remote isn't a Mercurial server
325 # 2. Remote is a <0.9.1 Mercurial server
325 # 2. Remote is a <0.9.1 Mercurial server
326 # 3. Remote is a future Mercurial server that dropped ``hello``
326 # 3. Remote is a future Mercurial server that dropped ``hello``
327 # and other attempted handshake mechanisms.
327 # and other attempted handshake mechanisms.
328 if not caps:
328 if not caps:
329 badresponse()
329 badresponse()
330
330
331 return protoname, caps
331 return protoname, caps
332
332
333 class sshv1peer(wireproto.wirepeer):
333 class sshv1peer(wireproto.wirepeer):
334 def __init__(self, ui, url, proc, stdin, stdout, stderr, caps):
334 def __init__(self, ui, url, proc, stdin, stdout, stderr, caps):
335 """Create a peer from an existing SSH connection.
335 """Create a peer from an existing SSH connection.
336
336
337 ``proc`` is a handle on the underlying SSH process.
337 ``proc`` is a handle on the underlying SSH process.
338 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
338 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
339 pipes for that process.
339 pipes for that process.
340 ``caps`` is a set of capabilities supported by the remote.
340 ``caps`` is a set of capabilities supported by the remote.
341 """
341 """
342 self._url = url
342 self._url = url
343 self._ui = ui
343 self._ui = ui
344 # self._subprocess is unused. Keeping a handle on the process
344 # self._subprocess is unused. Keeping a handle on the process
345 # holds a reference and prevents it from being garbage collected.
345 # holds a reference and prevents it from being garbage collected.
346 self._subprocess = proc
346 self._subprocess = proc
347 self._pipeo = stdin
347 self._pipeo = stdin
348 self._pipei = stdout
348 self._pipei = stdout
349 self._pipee = stderr
349 self._pipee = stderr
350 self._caps = caps
350 self._caps = caps
351
351
352 # Commands that have a "framed" response where the first line of the
353 # response contains the length of that response.
354 _FRAMED_COMMANDS = {
355 'batch',
356 }
357
352 # Begin of _basepeer interface.
358 # Begin of _basepeer interface.
353
359
354 @util.propertycache
360 @util.propertycache
355 def ui(self):
361 def ui(self):
356 return self._ui
362 return self._ui
357
363
358 def url(self):
364 def url(self):
359 return self._url
365 return self._url
360
366
361 def local(self):
367 def local(self):
362 return None
368 return None
363
369
364 def peer(self):
370 def peer(self):
365 return self
371 return self
366
372
367 def canpush(self):
373 def canpush(self):
368 return True
374 return True
369
375
370 def close(self):
376 def close(self):
371 pass
377 pass
372
378
373 # End of _basepeer interface.
379 # End of _basepeer interface.
374
380
375 # Begin of _basewirecommands interface.
381 # Begin of _basewirecommands interface.
376
382
377 def capabilities(self):
383 def capabilities(self):
378 return self._caps
384 return self._caps
379
385
380 # End of _basewirecommands interface.
386 # End of _basewirecommands interface.
381
387
382 def _readerr(self):
388 def _readerr(self):
383 _forwardoutput(self.ui, self._pipee)
389 _forwardoutput(self.ui, self._pipee)
384
390
385 def _abort(self, exception):
391 def _abort(self, exception):
386 self._cleanup()
392 self._cleanup()
387 raise exception
393 raise exception
388
394
389 def _cleanup(self):
395 def _cleanup(self):
390 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
396 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
391
397
392 __del__ = _cleanup
398 __del__ = _cleanup
393
399
394 def _submitbatch(self, req):
400 def _sendrequest(self, cmd, args, framed=False):
395 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
396 available = self._getamount()
397 # TODO this response parsing is probably suboptimal for large
398 # batches with large responses.
399 toread = min(available, 1024)
400 work = rsp.read(toread)
401 available -= toread
402 chunk = work
403 while chunk:
404 while ';' in work:
405 one, work = work.split(';', 1)
406 yield wireproto.unescapearg(one)
407 toread = min(available, 1024)
408 chunk = rsp.read(toread)
409 available -= toread
410 work += chunk
411 yield wireproto.unescapearg(work)
412
413 def _sendrequest(self, cmd, args):
414 if (self.ui.debugflag
401 if (self.ui.debugflag
415 and self.ui.configbool('devel', 'debug.peer-request')):
402 and self.ui.configbool('devel', 'debug.peer-request')):
416 dbg = self.ui.debug
403 dbg = self.ui.debug
417 line = 'devel-peer-request: %s\n'
404 line = 'devel-peer-request: %s\n'
418 dbg(line % cmd)
405 dbg(line % cmd)
419 for key, value in sorted(args.items()):
406 for key, value in sorted(args.items()):
420 if not isinstance(value, dict):
407 if not isinstance(value, dict):
421 dbg(line % ' %s: %d bytes' % (key, len(value)))
408 dbg(line % ' %s: %d bytes' % (key, len(value)))
422 else:
409 else:
423 for dk, dv in sorted(value.items()):
410 for dk, dv in sorted(value.items()):
424 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
411 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
425 self.ui.debug("sending %s command\n" % cmd)
412 self.ui.debug("sending %s command\n" % cmd)
426 self._pipeo.write("%s\n" % cmd)
413 self._pipeo.write("%s\n" % cmd)
427 _func, names = wireproto.commands[cmd]
414 _func, names = wireproto.commands[cmd]
428 keys = names.split()
415 keys = names.split()
429 wireargs = {}
416 wireargs = {}
430 for k in keys:
417 for k in keys:
431 if k == '*':
418 if k == '*':
432 wireargs['*'] = args
419 wireargs['*'] = args
433 break
420 break
434 else:
421 else:
435 wireargs[k] = args[k]
422 wireargs[k] = args[k]
436 del args[k]
423 del args[k]
437 for k, v in sorted(wireargs.iteritems()):
424 for k, v in sorted(wireargs.iteritems()):
438 self._pipeo.write("%s %d\n" % (k, len(v)))
425 self._pipeo.write("%s %d\n" % (k, len(v)))
439 if isinstance(v, dict):
426 if isinstance(v, dict):
440 for dk, dv in v.iteritems():
427 for dk, dv in v.iteritems():
441 self._pipeo.write("%s %d\n" % (dk, len(dv)))
428 self._pipeo.write("%s %d\n" % (dk, len(dv)))
442 self._pipeo.write(dv)
429 self._pipeo.write(dv)
443 else:
430 else:
444 self._pipeo.write(v)
431 self._pipeo.write(v)
445 self._pipeo.flush()
432 self._pipeo.flush()
446
433
434 # We know exactly how many bytes are in the response. So return a proxy
435 # around the raw output stream that allows reading exactly this many
436 # bytes. Callers then can read() without fear of overrunning the
437 # response.
438 if framed:
439 amount = self._getamount()
440 return util.cappedreader(self._pipei, amount)
441
447 return self._pipei
442 return self._pipei
448
443
449 def _callstream(self, cmd, **args):
444 def _callstream(self, cmd, **args):
450 args = pycompat.byteskwargs(args)
445 args = pycompat.byteskwargs(args)
451 return self._sendrequest(cmd, args)
446 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
452
447
453 def _callcompressable(self, cmd, **args):
448 def _callcompressable(self, cmd, **args):
454 args = pycompat.byteskwargs(args)
449 args = pycompat.byteskwargs(args)
455 return self._sendrequest(cmd, args)
450 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
456
451
457 def _call(self, cmd, **args):
452 def _call(self, cmd, **args):
458 args = pycompat.byteskwargs(args)
453 args = pycompat.byteskwargs(args)
459 self._sendrequest(cmd, args)
454 return self._sendrequest(cmd, args, framed=True).read()
460 return self._readframed()
461
455
462 def _callpush(self, cmd, fp, **args):
456 def _callpush(self, cmd, fp, **args):
463 r = self._call(cmd, **args)
457 r = self._call(cmd, **args)
464 if r:
458 if r:
465 return '', r
459 return '', r
466 for d in iter(lambda: fp.read(4096), ''):
460 for d in iter(lambda: fp.read(4096), ''):
467 self._writeframed(d)
461 self._writeframed(d)
468 self._writeframed("", flush=True)
462 self._writeframed("", flush=True)
469 r = self._readframed()
463 r = self._readframed()
470 if r:
464 if r:
471 return '', r
465 return '', r
472 return self._readframed(), ''
466 return self._readframed(), ''
473
467
474 def _calltwowaystream(self, cmd, fp, **args):
468 def _calltwowaystream(self, cmd, fp, **args):
475 r = self._call(cmd, **args)
469 r = self._call(cmd, **args)
476 if r:
470 if r:
477 # XXX needs to be made better
471 # XXX needs to be made better
478 raise error.Abort(_('unexpected remote reply: %s') % r)
472 raise error.Abort(_('unexpected remote reply: %s') % r)
479 for d in iter(lambda: fp.read(4096), ''):
473 for d in iter(lambda: fp.read(4096), ''):
480 self._writeframed(d)
474 self._writeframed(d)
481 self._writeframed("", flush=True)
475 self._writeframed("", flush=True)
482 return self._pipei
476 return self._pipei
483
477
484 def _getamount(self):
478 def _getamount(self):
485 l = self._pipei.readline()
479 l = self._pipei.readline()
486 if l == '\n':
480 if l == '\n':
487 self._readerr()
481 self._readerr()
488 msg = _('check previous remote output')
482 msg = _('check previous remote output')
489 self._abort(error.OutOfBandError(hint=msg))
483 self._abort(error.OutOfBandError(hint=msg))
490 self._readerr()
484 self._readerr()
491 try:
485 try:
492 return int(l)
486 return int(l)
493 except ValueError:
487 except ValueError:
494 self._abort(error.ResponseError(_("unexpected response:"), l))
488 self._abort(error.ResponseError(_("unexpected response:"), l))
495
489
496 def _readframed(self):
490 def _readframed(self):
497 return self._pipei.read(self._getamount())
491 return self._pipei.read(self._getamount())
498
492
499 def _writeframed(self, data, flush=False):
493 def _writeframed(self, data, flush=False):
500 self._pipeo.write("%d\n" % len(data))
494 self._pipeo.write("%d\n" % len(data))
501 if data:
495 if data:
502 self._pipeo.write(data)
496 self._pipeo.write(data)
503 if flush:
497 if flush:
504 self._pipeo.flush()
498 self._pipeo.flush()
505 self._readerr()
499 self._readerr()
506
500
507 class sshv2peer(sshv1peer):
501 class sshv2peer(sshv1peer):
508 """A peer that speakers version 2 of the transport protocol."""
502 """A peer that speakers version 2 of the transport protocol."""
509 # Currently version 2 is identical to version 1 post handshake.
503 # Currently version 2 is identical to version 1 post handshake.
510 # And handshake is performed before the peer is instantiated. So
504 # And handshake is performed before the peer is instantiated. So
511 # we need no custom code.
505 # we need no custom code.
512
506
513 def instance(ui, path, create):
507 def instance(ui, path, create):
514 """Create an SSH peer.
508 """Create an SSH peer.
515
509
516 The returned object conforms to the ``wireproto.wirepeer`` interface.
510 The returned object conforms to the ``wireproto.wirepeer`` interface.
517 """
511 """
518 u = util.url(path, parsequery=False, parsefragment=False)
512 u = util.url(path, parsequery=False, parsefragment=False)
519 if u.scheme != 'ssh' or not u.host or u.path is None:
513 if u.scheme != 'ssh' or not u.host or u.path is None:
520 raise error.RepoError(_("couldn't parse location %s") % path)
514 raise error.RepoError(_("couldn't parse location %s") % path)
521
515
522 util.checksafessh(path)
516 util.checksafessh(path)
523
517
524 if u.passwd is not None:
518 if u.passwd is not None:
525 raise error.RepoError(_('password in URL not supported'))
519 raise error.RepoError(_('password in URL not supported'))
526
520
527 sshcmd = ui.config('ui', 'ssh')
521 sshcmd = ui.config('ui', 'ssh')
528 remotecmd = ui.config('ui', 'remotecmd')
522 remotecmd = ui.config('ui', 'remotecmd')
529 sshaddenv = dict(ui.configitems('sshenv'))
523 sshaddenv = dict(ui.configitems('sshenv'))
530 sshenv = util.shellenviron(sshaddenv)
524 sshenv = util.shellenviron(sshaddenv)
531 remotepath = u.path or '.'
525 remotepath = u.path or '.'
532
526
533 args = util.sshargs(sshcmd, u.host, u.user, u.port)
527 args = util.sshargs(sshcmd, u.host, u.user, u.port)
534
528
535 if create:
529 if create:
536 cmd = '%s %s %s' % (sshcmd, args,
530 cmd = '%s %s %s' % (sshcmd, args,
537 util.shellquote('%s init %s' %
531 util.shellquote('%s init %s' %
538 (_serverquote(remotecmd), _serverquote(remotepath))))
532 (_serverquote(remotecmd), _serverquote(remotepath))))
539 ui.debug('running %s\n' % cmd)
533 ui.debug('running %s\n' % cmd)
540 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
534 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
541 if res != 0:
535 if res != 0:
542 raise error.RepoError(_('could not create remote repo'))
536 raise error.RepoError(_('could not create remote repo'))
543
537
544 proc, stdin, stdout, stderr = _makeconnection(ui, sshcmd, args, remotecmd,
538 proc, stdin, stdout, stderr = _makeconnection(ui, sshcmd, args, remotecmd,
545 remotepath, sshenv)
539 remotepath, sshenv)
546
540
547 try:
541 try:
548 protoname, caps = _performhandshake(ui, stdin, stdout, stderr)
542 protoname, caps = _performhandshake(ui, stdin, stdout, stderr)
549 except Exception:
543 except Exception:
550 _cleanuppipes(ui, stdout, stdin, stderr)
544 _cleanuppipes(ui, stdout, stdin, stderr)
551 raise
545 raise
552
546
553 if protoname == wireprotoserver.SSHV1:
547 if protoname == wireprotoserver.SSHV1:
554 return sshv1peer(ui, path, proc, stdin, stdout, stderr, caps)
548 return sshv1peer(ui, path, proc, stdin, stdout, stderr, caps)
555 elif protoname == wireprotoserver.SSHV2:
549 elif protoname == wireprotoserver.SSHV2:
556 return sshv2peer(ui, path, proc, stdin, stdout, stderr, caps)
550 return sshv2peer(ui, path, proc, stdin, stdout, stderr, caps)
557 else:
551 else:
558 _cleanuppipes(ui, stdout, stdin, stderr)
552 _cleanuppipes(ui, stdout, stdin, stderr)
559 raise error.RepoError(_('unknown version of SSH protocol: %s') %
553 raise error.RepoError(_('unknown version of SSH protocol: %s') %
560 protoname)
554 protoname)
General Comments 0
You need to be logged in to leave comments. Login now