##// END OF EJS Templates
sshpeer: add a develwarning if an sshpeer is not closed explicitly...
Valentin Gatien-Baron -
r47418:db8037e3 default
parent child Browse files
Show More
@@ -1,90 +1,90 b''
1 1 # connectionpool.py - class for pooling peer connections for reuse
2 2 #
3 3 # Copyright 2017 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 from mercurial import (
11 11 extensions,
12 12 hg,
13 13 pycompat,
14 14 sshpeer,
15 15 util,
16 16 )
17 17
18 18 _sshv1peer = sshpeer.sshv1peer
19 19
20 20
21 21 class connectionpool(object):
22 22 def __init__(self, repo):
23 23 self._repo = repo
24 24 self._pool = dict()
25 25
26 26 def get(self, path):
27 27 pathpool = self._pool.get(path)
28 28 if pathpool is None:
29 29 pathpool = list()
30 30 self._pool[path] = pathpool
31 31
32 32 conn = None
33 33 if len(pathpool) > 0:
34 34 try:
35 35 conn = pathpool.pop()
36 36 peer = conn.peer
37 37 # If the connection has died, drop it
38 38 if isinstance(peer, _sshv1peer):
39 39 if peer._subprocess.poll() is not None:
40 40 conn = None
41 41 except IndexError:
42 42 pass
43 43
44 44 if conn is None:
45 45
46 46 peer = hg.peer(self._repo.ui, {}, path)
47 47 if util.safehasattr(peer, '_cleanup'):
48 48
49 49 class mypeer(peer.__class__):
50 def _cleanup(self):
50 def _cleanup(self, warn=None):
51 51 # close pipee first so peer.cleanup reading it won't
52 52 # deadlock, if there are other processes with pipeo
53 53 # open (i.e. us).
54 54 if util.safehasattr(self, 'pipee'):
55 55 self.pipee.close()
56 56 return super(mypeer, self)._cleanup()
57 57
58 58 peer.__class__ = mypeer
59 59
60 60 conn = connection(pathpool, peer)
61 61
62 62 return conn
63 63
64 64 def close(self):
65 65 for pathpool in pycompat.itervalues(self._pool):
66 66 for conn in pathpool:
67 67 conn.close()
68 68 del pathpool[:]
69 69
70 70
71 71 class connection(object):
72 72 def __init__(self, pool, peer):
73 73 self._pool = pool
74 74 self.peer = peer
75 75
76 76 def __enter__(self):
77 77 return self
78 78
79 79 def __exit__(self, type, value, traceback):
80 80 # Only add the connection back to the pool if there was no exception,
81 81 # since an exception could mean the connection is not in a reusable
82 82 # state.
83 83 if type is None:
84 84 self._pool.append(self)
85 85 else:
86 86 self.close()
87 87
88 88 def close(self):
89 89 if util.safehasattr(self.peer, 'cleanup'):
90 90 self.peer.cleanup()
@@ -1,711 +1,728 b''
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import re
11 11 import uuid
12 12
13 13 from .i18n import _
14 14 from .pycompat import getattr
15 15 from . import (
16 16 error,
17 17 pycompat,
18 18 util,
19 19 wireprotoserver,
20 20 wireprototypes,
21 21 wireprotov1peer,
22 22 wireprotov1server,
23 23 )
24 24 from .utils import (
25 25 procutil,
26 26 stringutil,
27 27 )
28 28
29 29
30 30 def _serverquote(s):
31 31 """quote a string for the remote shell ... which we assume is sh"""
32 32 if not s:
33 33 return s
34 34 if re.match(b'[a-zA-Z0-9@%_+=:,./-]*$', s):
35 35 return s
36 36 return b"'%s'" % s.replace(b"'", b"'\\''")
37 37
38 38
39 39 def _forwardoutput(ui, pipe, warn=False):
40 40 """display all data currently available on pipe as remote output.
41 41
42 42 This is non blocking."""
43 43 if pipe:
44 44 s = procutil.readpipe(pipe)
45 45 if s:
46 46 display = ui.warn if warn else ui.status
47 47 for l in s.splitlines():
48 48 display(_(b"remote: "), l, b'\n')
49 49
50 50
51 51 class doublepipe(object):
52 52 """Operate a side-channel pipe in addition of a main one
53 53
54 54 The side-channel pipe contains server output to be forwarded to the user
55 55 input. The double pipe will behave as the "main" pipe, but will ensure the
56 56 content of the "side" pipe is properly processed while we wait for blocking
57 57 call on the "main" pipe.
58 58
59 59 If large amounts of data are read from "main", the forward will cease after
60 60 the first bytes start to appear. This simplifies the implementation
61 61 without affecting actual output of sshpeer too much as we rarely issue
62 62 large read for data not yet emitted by the server.
63 63
64 64 The main pipe is expected to be a 'bufferedinputpipe' from the util module
65 65 that handle all the os specific bits. This class lives in this module
66 66 because it focus on behavior specific to the ssh protocol."""
67 67
68 68 def __init__(self, ui, main, side):
69 69 self._ui = ui
70 70 self._main = main
71 71 self._side = side
72 72
73 73 def _wait(self):
74 74 """wait until some data are available on main or side
75 75
76 76 return a pair of boolean (ismainready, issideready)
77 77
78 78 (This will only wait for data if the setup is supported by `util.poll`)
79 79 """
80 80 if (
81 81 isinstance(self._main, util.bufferedinputpipe)
82 82 and self._main.hasbuffer
83 83 ):
84 84 # Main has data. Assume side is worth poking at.
85 85 return True, True
86 86
87 87 fds = [self._main.fileno(), self._side.fileno()]
88 88 try:
89 89 act = util.poll(fds)
90 90 except NotImplementedError:
91 91 # non supported yet case, assume all have data.
92 92 act = fds
93 93 return (self._main.fileno() in act, self._side.fileno() in act)
94 94
95 95 def write(self, data):
96 96 return self._call(b'write', data)
97 97
98 98 def read(self, size):
99 99 r = self._call(b'read', size)
100 100 if size != 0 and not r:
101 101 # We've observed a condition that indicates the
102 102 # stdout closed unexpectedly. Check stderr one
103 103 # more time and snag anything that's there before
104 104 # letting anyone know the main part of the pipe
105 105 # closed prematurely.
106 106 _forwardoutput(self._ui, self._side)
107 107 return r
108 108
109 109 def unbufferedread(self, size):
110 110 r = self._call(b'unbufferedread', size)
111 111 if size != 0 and not r:
112 112 # We've observed a condition that indicates the
113 113 # stdout closed unexpectedly. Check stderr one
114 114 # more time and snag anything that's there before
115 115 # letting anyone know the main part of the pipe
116 116 # closed prematurely.
117 117 _forwardoutput(self._ui, self._side)
118 118 return r
119 119
120 120 def readline(self):
121 121 return self._call(b'readline')
122 122
123 123 def _call(self, methname, data=None):
124 124 """call <methname> on "main", forward output of "side" while blocking"""
125 125 # data can be '' or 0
126 126 if (data is not None and not data) or self._main.closed:
127 127 _forwardoutput(self._ui, self._side)
128 128 return b''
129 129 while True:
130 130 mainready, sideready = self._wait()
131 131 if sideready:
132 132 _forwardoutput(self._ui, self._side)
133 133 if mainready:
134 134 meth = getattr(self._main, methname)
135 135 if data is None:
136 136 return meth()
137 137 else:
138 138 return meth(data)
139 139
140 140 def close(self):
141 141 return self._main.close()
142 142
143 143 @property
144 144 def closed(self):
145 145 return self._main.closed
146 146
147 147 def flush(self):
148 148 return self._main.flush()
149 149
150 150
151 def _cleanuppipes(ui, pipei, pipeo, pipee):
151 def _cleanuppipes(ui, pipei, pipeo, pipee, warn):
152 152 """Clean up pipes used by an SSH connection."""
153 if pipeo:
153 didsomething = False
154 if pipeo and not pipeo.closed:
155 didsomething = True
154 156 pipeo.close()
155 if pipei:
157 if pipei and not pipei.closed:
158 didsomething = True
156 159 pipei.close()
157 160
158 if pipee:
161 if pipee and not pipee.closed:
162 didsomething = True
159 163 # Try to read from the err descriptor until EOF.
160 164 try:
161 165 for l in pipee:
162 166 ui.status(_(b'remote: '), l)
163 167 except (IOError, ValueError):
164 168 pass
165 169
166 170 pipee.close()
167 171
172 if didsomething and warn is not None:
173 # Encourage explicit close of sshpeers. Closing via __del__ is
174 # not very predictable when exceptions are thrown, which has led
175 # to deadlocks due to a peer get gc'ed in a fork
176 # We add our own stack trace, because the stacktrace when called
177 # from __del__ is useless.
178 if False: # enabled in next commit
179 ui.develwarn(
180 b'missing close on SSH connection created at:\n%s' % warn
181 )
182
168 183
169 184 def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
170 185 """Create an SSH connection to a server.
171 186
172 187 Returns a tuple of (process, stdin, stdout, stderr) for the
173 188 spawned process.
174 189 """
175 190 cmd = b'%s %s %s' % (
176 191 sshcmd,
177 192 args,
178 193 procutil.shellquote(
179 194 b'%s -R %s serve --stdio'
180 195 % (_serverquote(remotecmd), _serverquote(path))
181 196 ),
182 197 )
183 198
184 199 ui.debug(b'running %s\n' % cmd)
185 200
186 201 # no buffer allow the use of 'select'
187 202 # feel free to remove buffering and select usage when we ultimately
188 203 # move to threading.
189 204 stdin, stdout, stderr, proc = procutil.popen4(cmd, bufsize=0, env=sshenv)
190 205
191 206 return proc, stdin, stdout, stderr
192 207
193 208
194 209 def _clientcapabilities():
195 210 """Return list of capabilities of this client.
196 211
197 212 Returns a list of capabilities that are supported by this client.
198 213 """
199 214 protoparams = {b'partial-pull'}
200 215 comps = [
201 216 e.wireprotosupport().name
202 217 for e in util.compengines.supportedwireengines(util.CLIENTROLE)
203 218 ]
204 219 protoparams.add(b'comp=%s' % b','.join(comps))
205 220 return protoparams
206 221
207 222
208 223 def _performhandshake(ui, stdin, stdout, stderr):
209 224 def badresponse():
210 225 # Flush any output on stderr. In general, the stderr contains errors
211 226 # from the remote (ssh errors, some hg errors), and status indications
212 227 # (like "adding changes"), with no current way to tell them apart.
213 228 # Here we failed so early that it's almost certainly only errors, so
214 229 # use warn=True so -q doesn't hide them.
215 230 _forwardoutput(ui, stderr, warn=True)
216 231
217 232 msg = _(b'no suitable response from remote hg')
218 233 hint = ui.config(b'ui', b'ssherrorhint')
219 234 raise error.RepoError(msg, hint=hint)
220 235
221 236 # The handshake consists of sending wire protocol commands in reverse
222 237 # order of protocol implementation and then sniffing for a response
223 238 # to one of them.
224 239 #
225 240 # Those commands (from oldest to newest) are:
226 241 #
227 242 # ``between``
228 243 # Asks for the set of revisions between a pair of revisions. Command
229 244 # present in all Mercurial server implementations.
230 245 #
231 246 # ``hello``
232 247 # Instructs the server to advertise its capabilities. Introduced in
233 248 # Mercurial 0.9.1.
234 249 #
235 250 # ``upgrade``
236 251 # Requests upgrade from default transport protocol version 1 to
237 252 # a newer version. Introduced in Mercurial 4.6 as an experimental
238 253 # feature.
239 254 #
240 255 # The ``between`` command is issued with a request for the null
241 256 # range. If the remote is a Mercurial server, this request will
242 257 # generate a specific response: ``1\n\n``. This represents the
243 258 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
244 259 # in the output stream and know this is the response to ``between``
245 260 # and we're at the end of our handshake reply.
246 261 #
247 262 # The response to the ``hello`` command will be a line with the
248 263 # length of the value returned by that command followed by that
249 264 # value. If the server doesn't support ``hello`` (which should be
250 265 # rare), that line will be ``0\n``. Otherwise, the value will contain
251 266 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
252 267 # the capabilities of the server.
253 268 #
254 269 # The ``upgrade`` command isn't really a command in the traditional
255 270 # sense of version 1 of the transport because it isn't using the
256 271 # proper mechanism for formatting insteads: instead, it just encodes
257 272 # arguments on the line, delimited by spaces.
258 273 #
259 274 # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
260 275 # If the server doesn't support protocol upgrades, it will reply to
261 276 # this line with ``0\n``. Otherwise, it emits an
262 277 # ``upgraded <token> <protocol>`` line to both stdout and stderr.
263 278 # Content immediately following this line describes additional
264 279 # protocol and server state.
265 280 #
266 281 # In addition to the responses to our command requests, the server
267 282 # may emit "banner" output on stdout. SSH servers are allowed to
268 283 # print messages to stdout on login. Issuing commands on connection
269 284 # allows us to flush this banner output from the server by scanning
270 285 # for output to our well-known ``between`` command. Of course, if
271 286 # the banner contains ``1\n\n``, this will throw off our detection.
272 287
273 288 requestlog = ui.configbool(b'devel', b'debug.peer-request')
274 289
275 290 # Generate a random token to help identify responses to version 2
276 291 # upgrade request.
277 292 token = pycompat.sysbytes(str(uuid.uuid4()))
278 293 upgradecaps = [
279 294 (b'proto', wireprotoserver.SSHV2),
280 295 ]
281 296 upgradecaps = util.urlreq.urlencode(upgradecaps)
282 297
283 298 try:
284 299 pairsarg = b'%s-%s' % (b'0' * 40, b'0' * 40)
285 300 handshake = [
286 301 b'hello\n',
287 302 b'between\n',
288 303 b'pairs %d\n' % len(pairsarg),
289 304 pairsarg,
290 305 ]
291 306
292 307 # Request upgrade to version 2 if configured.
293 308 if ui.configbool(b'experimental', b'sshpeer.advertise-v2'):
294 309 ui.debug(b'sending upgrade request: %s %s\n' % (token, upgradecaps))
295 310 handshake.insert(0, b'upgrade %s %s\n' % (token, upgradecaps))
296 311
297 312 if requestlog:
298 313 ui.debug(b'devel-peer-request: hello+between\n')
299 314 ui.debug(b'devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
300 315 ui.debug(b'sending hello command\n')
301 316 ui.debug(b'sending between command\n')
302 317
303 318 stdin.write(b''.join(handshake))
304 319 stdin.flush()
305 320 except IOError:
306 321 badresponse()
307 322
308 323 # Assume version 1 of wire protocol by default.
309 324 protoname = wireprototypes.SSHV1
310 325 reupgraded = re.compile(b'^upgraded %s (.*)$' % stringutil.reescape(token))
311 326
312 327 lines = [b'', b'dummy']
313 328 max_noise = 500
314 329 while lines[-1] and max_noise:
315 330 try:
316 331 l = stdout.readline()
317 332 _forwardoutput(ui, stderr, warn=True)
318 333
319 334 # Look for reply to protocol upgrade request. It has a token
320 335 # in it, so there should be no false positives.
321 336 m = reupgraded.match(l)
322 337 if m:
323 338 protoname = m.group(1)
324 339 ui.debug(b'protocol upgraded to %s\n' % protoname)
325 340 # If an upgrade was handled, the ``hello`` and ``between``
326 341 # requests are ignored. The next output belongs to the
327 342 # protocol, so stop scanning lines.
328 343 break
329 344
330 345 # Otherwise it could be a banner, ``0\n`` response if server
331 346 # doesn't support upgrade.
332 347
333 348 if lines[-1] == b'1\n' and l == b'\n':
334 349 break
335 350 if l:
336 351 ui.debug(b'remote: ', l)
337 352 lines.append(l)
338 353 max_noise -= 1
339 354 except IOError:
340 355 badresponse()
341 356 else:
342 357 badresponse()
343 358
344 359 caps = set()
345 360
346 361 # For version 1, we should see a ``capabilities`` line in response to the
347 362 # ``hello`` command.
348 363 if protoname == wireprototypes.SSHV1:
349 364 for l in reversed(lines):
350 365 # Look for response to ``hello`` command. Scan from the back so
351 366 # we don't misinterpret banner output as the command reply.
352 367 if l.startswith(b'capabilities:'):
353 368 caps.update(l[:-1].split(b':')[1].split())
354 369 break
355 370 elif protoname == wireprotoserver.SSHV2:
356 371 # We see a line with number of bytes to follow and then a value
357 372 # looking like ``capabilities: *``.
358 373 line = stdout.readline()
359 374 try:
360 375 valuelen = int(line)
361 376 except ValueError:
362 377 badresponse()
363 378
364 379 capsline = stdout.read(valuelen)
365 380 if not capsline.startswith(b'capabilities: '):
366 381 badresponse()
367 382
368 383 ui.debug(b'remote: %s\n' % capsline)
369 384
370 385 caps.update(capsline.split(b':')[1].split())
371 386 # Trailing newline.
372 387 stdout.read(1)
373 388
374 389 # Error if we couldn't find capabilities, this means:
375 390 #
376 391 # 1. Remote isn't a Mercurial server
377 392 # 2. Remote is a <0.9.1 Mercurial server
378 393 # 3. Remote is a future Mercurial server that dropped ``hello``
379 394 # and other attempted handshake mechanisms.
380 395 if not caps:
381 396 badresponse()
382 397
383 398 # Flush any output on stderr before proceeding.
384 399 _forwardoutput(ui, stderr, warn=True)
385 400
386 401 return protoname, caps
387 402
388 403
389 404 class sshv1peer(wireprotov1peer.wirepeer):
390 405 def __init__(
391 406 self, ui, url, proc, stdin, stdout, stderr, caps, autoreadstderr=True
392 407 ):
393 408 """Create a peer from an existing SSH connection.
394 409
395 410 ``proc`` is a handle on the underlying SSH process.
396 411 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
397 412 pipes for that process.
398 413 ``caps`` is a set of capabilities supported by the remote.
399 414 ``autoreadstderr`` denotes whether to automatically read from
400 415 stderr and to forward its output.
401 416 """
402 417 self._url = url
403 418 self.ui = ui
404 419 # self._subprocess is unused. Keeping a handle on the process
405 420 # holds a reference and prevents it from being garbage collected.
406 421 self._subprocess = proc
407 422
408 423 # And we hook up our "doublepipe" wrapper to allow querying
409 424 # stderr any time we perform I/O.
410 425 if autoreadstderr:
411 426 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
412 427 stdin = doublepipe(ui, stdin, stderr)
413 428
414 429 self._pipeo = stdin
415 430 self._pipei = stdout
416 431 self._pipee = stderr
417 432 self._caps = caps
418 433 self._autoreadstderr = autoreadstderr
434 self._initstack = b''.join(util.getstackframes(1))
419 435
420 436 # Commands that have a "framed" response where the first line of the
421 437 # response contains the length of that response.
422 438 _FRAMED_COMMANDS = {
423 439 b'batch',
424 440 }
425 441
426 442 # Begin of ipeerconnection interface.
427 443
428 444 def url(self):
429 445 return self._url
430 446
431 447 def local(self):
432 448 return None
433 449
434 450 def peer(self):
435 451 return self
436 452
437 453 def canpush(self):
438 454 return True
439 455
440 456 def close(self):
441 457 self._cleanup()
442 458
443 459 # End of ipeerconnection interface.
444 460
445 461 # Begin of ipeercommands interface.
446 462
447 463 def capabilities(self):
448 464 return self._caps
449 465
450 466 # End of ipeercommands interface.
451 467
452 468 def _readerr(self):
453 469 _forwardoutput(self.ui, self._pipee)
454 470
455 471 def _abort(self, exception):
456 472 self._cleanup()
457 473 raise exception
458 474
459 def _cleanup(self):
460 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
475 def _cleanup(self, warn=None):
476 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee, warn=warn)
461 477
462 __del__ = _cleanup
478 def __del__(self):
479 self._cleanup(warn=self._initstack)
463 480
464 481 def _sendrequest(self, cmd, args, framed=False):
465 482 if self.ui.debugflag and self.ui.configbool(
466 483 b'devel', b'debug.peer-request'
467 484 ):
468 485 dbg = self.ui.debug
469 486 line = b'devel-peer-request: %s\n'
470 487 dbg(line % cmd)
471 488 for key, value in sorted(args.items()):
472 489 if not isinstance(value, dict):
473 490 dbg(line % b' %s: %d bytes' % (key, len(value)))
474 491 else:
475 492 for dk, dv in sorted(value.items()):
476 493 dbg(line % b' %s-%s: %d' % (key, dk, len(dv)))
477 494 self.ui.debug(b"sending %s command\n" % cmd)
478 495 self._pipeo.write(b"%s\n" % cmd)
479 496 _func, names = wireprotov1server.commands[cmd]
480 497 keys = names.split()
481 498 wireargs = {}
482 499 for k in keys:
483 500 if k == b'*':
484 501 wireargs[b'*'] = args
485 502 break
486 503 else:
487 504 wireargs[k] = args[k]
488 505 del args[k]
489 506 for k, v in sorted(pycompat.iteritems(wireargs)):
490 507 self._pipeo.write(b"%s %d\n" % (k, len(v)))
491 508 if isinstance(v, dict):
492 509 for dk, dv in pycompat.iteritems(v):
493 510 self._pipeo.write(b"%s %d\n" % (dk, len(dv)))
494 511 self._pipeo.write(dv)
495 512 else:
496 513 self._pipeo.write(v)
497 514 self._pipeo.flush()
498 515
499 516 # We know exactly how many bytes are in the response. So return a proxy
500 517 # around the raw output stream that allows reading exactly this many
501 518 # bytes. Callers then can read() without fear of overrunning the
502 519 # response.
503 520 if framed:
504 521 amount = self._getamount()
505 522 return util.cappedreader(self._pipei, amount)
506 523
507 524 return self._pipei
508 525
509 526 def _callstream(self, cmd, **args):
510 527 args = pycompat.byteskwargs(args)
511 528 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
512 529
513 530 def _callcompressable(self, cmd, **args):
514 531 args = pycompat.byteskwargs(args)
515 532 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
516 533
517 534 def _call(self, cmd, **args):
518 535 args = pycompat.byteskwargs(args)
519 536 return self._sendrequest(cmd, args, framed=True).read()
520 537
521 538 def _callpush(self, cmd, fp, **args):
522 539 # The server responds with an empty frame if the client should
523 540 # continue submitting the payload.
524 541 r = self._call(cmd, **args)
525 542 if r:
526 543 return b'', r
527 544
528 545 # The payload consists of frames with content followed by an empty
529 546 # frame.
530 547 for d in iter(lambda: fp.read(4096), b''):
531 548 self._writeframed(d)
532 549 self._writeframed(b"", flush=True)
533 550
534 551 # In case of success, there is an empty frame and a frame containing
535 552 # the integer result (as a string).
536 553 # In case of error, there is a non-empty frame containing the error.
537 554 r = self._readframed()
538 555 if r:
539 556 return b'', r
540 557 return self._readframed(), b''
541 558
542 559 def _calltwowaystream(self, cmd, fp, **args):
543 560 # The server responds with an empty frame if the client should
544 561 # continue submitting the payload.
545 562 r = self._call(cmd, **args)
546 563 if r:
547 564 # XXX needs to be made better
548 565 raise error.Abort(_(b'unexpected remote reply: %s') % r)
549 566
550 567 # The payload consists of frames with content followed by an empty
551 568 # frame.
552 569 for d in iter(lambda: fp.read(4096), b''):
553 570 self._writeframed(d)
554 571 self._writeframed(b"", flush=True)
555 572
556 573 return self._pipei
557 574
558 575 def _getamount(self):
559 576 l = self._pipei.readline()
560 577 if l == b'\n':
561 578 if self._autoreadstderr:
562 579 self._readerr()
563 580 msg = _(b'check previous remote output')
564 581 self._abort(error.OutOfBandError(hint=msg))
565 582 if self._autoreadstderr:
566 583 self._readerr()
567 584 try:
568 585 return int(l)
569 586 except ValueError:
570 587 self._abort(error.ResponseError(_(b"unexpected response:"), l))
571 588
572 589 def _readframed(self):
573 590 size = self._getamount()
574 591 if not size:
575 592 return b''
576 593
577 594 return self._pipei.read(size)
578 595
579 596 def _writeframed(self, data, flush=False):
580 597 self._pipeo.write(b"%d\n" % len(data))
581 598 if data:
582 599 self._pipeo.write(data)
583 600 if flush:
584 601 self._pipeo.flush()
585 602 if self._autoreadstderr:
586 603 self._readerr()
587 604
588 605
589 606 class sshv2peer(sshv1peer):
590 607 """A peer that speakers version 2 of the transport protocol."""
591 608
592 609 # Currently version 2 is identical to version 1 post handshake.
593 610 # And handshake is performed before the peer is instantiated. So
594 611 # we need no custom code.
595 612
596 613
597 614 def makepeer(ui, path, proc, stdin, stdout, stderr, autoreadstderr=True):
598 615 """Make a peer instance from existing pipes.
599 616
600 617 ``path`` and ``proc`` are stored on the eventual peer instance and may
601 618 not be used for anything meaningful.
602 619
603 620 ``stdin``, ``stdout``, and ``stderr`` are the pipes connected to the
604 621 SSH server's stdio handles.
605 622
606 623 This function is factored out to allow creating peers that don't
607 624 actually spawn a new process. It is useful for starting SSH protocol
608 625 servers and clients via non-standard means, which can be useful for
609 626 testing.
610 627 """
611 628 try:
612 629 protoname, caps = _performhandshake(ui, stdin, stdout, stderr)
613 630 except Exception:
614 _cleanuppipes(ui, stdout, stdin, stderr)
631 _cleanuppipes(ui, stdout, stdin, stderr, warn=None)
615 632 raise
616 633
617 634 if protoname == wireprototypes.SSHV1:
618 635 return sshv1peer(
619 636 ui,
620 637 path,
621 638 proc,
622 639 stdin,
623 640 stdout,
624 641 stderr,
625 642 caps,
626 643 autoreadstderr=autoreadstderr,
627 644 )
628 645 elif protoname == wireprototypes.SSHV2:
629 646 return sshv2peer(
630 647 ui,
631 648 path,
632 649 proc,
633 650 stdin,
634 651 stdout,
635 652 stderr,
636 653 caps,
637 654 autoreadstderr=autoreadstderr,
638 655 )
639 656 else:
640 _cleanuppipes(ui, stdout, stdin, stderr)
657 _cleanuppipes(ui, stdout, stdin, stderr, warn=None)
641 658 raise error.RepoError(
642 659 _(b'unknown version of SSH protocol: %s') % protoname
643 660 )
644 661
645 662
646 663 def instance(ui, path, create, intents=None, createopts=None):
647 664 """Create an SSH peer.
648 665
649 666 The returned object conforms to the ``wireprotov1peer.wirepeer`` interface.
650 667 """
651 668 u = util.url(path, parsequery=False, parsefragment=False)
652 669 if u.scheme != b'ssh' or not u.host or u.path is None:
653 670 raise error.RepoError(_(b"couldn't parse location %s") % path)
654 671
655 672 util.checksafessh(path)
656 673
657 674 if u.passwd is not None:
658 675 raise error.RepoError(_(b'password in URL not supported'))
659 676
660 677 sshcmd = ui.config(b'ui', b'ssh')
661 678 remotecmd = ui.config(b'ui', b'remotecmd')
662 679 sshaddenv = dict(ui.configitems(b'sshenv'))
663 680 sshenv = procutil.shellenviron(sshaddenv)
664 681 remotepath = u.path or b'.'
665 682
666 683 args = procutil.sshargs(sshcmd, u.host, u.user, u.port)
667 684
668 685 if create:
669 686 # We /could/ do this, but only if the remote init command knows how to
670 687 # handle them. We don't yet make any assumptions about that. And without
671 688 # querying the remote, there's no way of knowing if the remote even
672 689 # supports said requested feature.
673 690 if createopts:
674 691 raise error.RepoError(
675 692 _(
676 693 b'cannot create remote SSH repositories '
677 694 b'with extra options'
678 695 )
679 696 )
680 697
681 698 cmd = b'%s %s %s' % (
682 699 sshcmd,
683 700 args,
684 701 procutil.shellquote(
685 702 b'%s init %s'
686 703 % (_serverquote(remotecmd), _serverquote(remotepath))
687 704 ),
688 705 )
689 706 ui.debug(b'running %s\n' % cmd)
690 707 res = ui.system(cmd, blockedtag=b'sshpeer', environ=sshenv)
691 708 if res != 0:
692 709 raise error.RepoError(_(b'could not create remote repo'))
693 710
694 711 proc, stdin, stdout, stderr = _makeconnection(
695 712 ui, sshcmd, args, remotecmd, remotepath, sshenv
696 713 )
697 714
698 715 peer = makepeer(ui, path, proc, stdin, stdout, stderr)
699 716
700 717 # Finally, if supported by the server, notify it about our own
701 718 # capabilities.
702 719 if b'protocaps' in peer.capabilities():
703 720 try:
704 721 peer._call(
705 722 b"protocaps", caps=b' '.join(sorted(_clientcapabilities()))
706 723 )
707 724 except IOError:
708 725 peer._cleanup()
709 726 raise error.RepoError(_(b'capability exchange failed'))
710 727
711 728 return peer
General Comments 0
You need to be logged in to leave comments. Login now