##// END OF EJS Templates
test-http-bad-server: use the new pattern-reading for a test-case...
test-http-bad-server: use the new pattern-reading for a test-case This test case is now less sensitive to change of unrelated bits of the client/server exchange. Since this introduce some churn in the output, we do it independently for each test cases. This patch is the last of such changes, for both sent and recv cases. Differential Revision: https://phab.mercurial-scm.org/D12073

File last commit:

r43346:2372284d default
r49488:009e8602 default
Show More
ssh.py
72 lines | 2.1 KiB | text/x-python | PythonLexer
Gregory Szorc
automation: initial support for running Linux tests...
r42471 # ssh.py - Interact with remote SSH servers
#
# Copyright 2019 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
# no-check-code because Python 3 native.
import socket
import time
import warnings
Augie Fackler
formatting: blacken the codebase...
r43346 from cryptography.utils import CryptographyDeprecationWarning
Gregory Szorc
automation: initial support for running Linux tests...
r42471 import paramiko
def wait_for_ssh(hostname, port, timeout=60, username=None, key_filename=None):
"""Wait for an SSH server to start on the specified host and port."""
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
automation: initial support for running Linux tests...
r42471 class IgnoreHostKeyPolicy(paramiko.MissingHostKeyPolicy):
def missing_host_key(self, client, hostname, key):
return
end_time = time.time() + timeout
# paramiko triggers a CryptographyDeprecationWarning in the cryptography
# package. Let's suppress
with warnings.catch_warnings():
Augie Fackler
formatting: blacken the codebase...
r43346 warnings.filterwarnings(
'ignore', category=CryptographyDeprecationWarning
)
Gregory Szorc
automation: initial support for running Linux tests...
r42471
while True:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(IgnoreHostKeyPolicy())
try:
Augie Fackler
formatting: blacken the codebase...
r43346 client.connect(
hostname,
port=port,
username=username,
key_filename=key_filename,
timeout=5.0,
allow_agent=False,
look_for_keys=False,
)
Gregory Szorc
automation: initial support for running Linux tests...
r42471
return client
except socket.error:
pass
except paramiko.AuthenticationException:
raise
except paramiko.SSHException:
pass
if time.time() >= end_time:
raise Exception('Timeout reached waiting for SSH')
time.sleep(1.0)
def exec_command(client, command):
"""exec_command wrapper that combines stderr/stdout and returns channel"""
chan = client.get_transport().open_session()
chan.exec_command(command)
chan.set_combine_stderr(True)
stdin = chan.makefile('wb', -1)
stdout = chan.makefile('r', -1)
return chan, stdin, stdout