From 52dffc0674e94e3df90e37e2c89100c29b61e696 2011-08-16 23:43:03 From: Fernando Perez Date: 2011-08-16 23:43:03 Subject: [PATCH] Merge pull request #685 from minrk/enginessh Add SSH tunneling to engines --- diff --git a/IPython/external/ssh/tunnel.py b/IPython/external/ssh/tunnel.py index fc073b3..871b557 100644 --- a/IPython/external/ssh/tunnel.py +++ b/IPython/external/ssh/tunnel.py @@ -110,6 +110,22 @@ def tunnel_connection(socket, addr, server, keyfile=None, password=None, paramik selected local port of the tunnel. """ + new_url, tunnel = open_tunnel(addr, server, keyfile=keyfile, password=password, paramiko=paramiko) + socket.connect(new_url) + return tunnel + + +def open_tunnel(addr, server, keyfile=None, password=None, paramiko=None): + """Open a tunneled connection from a 0MQ url. + + For use inside tunnel_connection. + + Returns + ------- + + (url, tunnel): The 0MQ url that has been forwarded, and the tunnel object + """ + lport = select_random_ports(1)[0] transport, addr = addr.split('://') ip,rport = addr.split(':') @@ -121,8 +137,7 @@ def tunnel_connection(socket, addr, server, keyfile=None, password=None, paramik else: tunnelf = openssh_tunnel tunnel = tunnelf(lport, rport, server, remoteip=ip, keyfile=keyfile, password=password) - socket.connect('tcp://127.0.0.1:%i'%lport) - return tunnel + return 'tcp://127.0.0.1:%i'%lport, tunnel def openssh_tunnel(lport, rport, server, remoteip='127.0.0.1', keyfile=None, password=None, timeout=15): """Create an ssh tunnel using command-line ssh that connects port lport diff --git a/IPython/parallel/apps/ipcontrollerapp.py b/IPython/parallel/apps/ipcontrollerapp.py index 695ca17..362318c 100755 --- a/IPython/parallel/apps/ipcontrollerapp.py +++ b/IPython/parallel/apps/ipcontrollerapp.py @@ -116,6 +116,7 @@ flags.update(boolean_flag('secure', 'IPControllerApp.secure', aliases = dict( secure = 'IPControllerApp.secure', ssh = 'IPControllerApp.ssh_server', + enginessh = 'IPControllerApp.engine_ssh_server', location = 
'IPControllerApp.location', ident = 'Session.session', @@ -158,6 +159,11 @@ class IPControllerApp(BaseParallelApplication): processes. It should be of the form: [user@]server[:port]. The Controller's listening addresses must be accessible from the ssh server""", ) + engine_ssh_server = Unicode(u'', config=True, + help="""ssh url for engines to use when connecting to the Controller + processes. It should be of the form: [user@]server[:port]. The + Controller's listening addresses must be accessible from the ssh server""", + ) location = Unicode(u'', config=True, help="""The external IP or domain name of the Controller, used for disambiguating engine and client connections.""", @@ -218,6 +224,8 @@ class IPControllerApp(BaseParallelApplication): c.HubFactory.engine_ip = ip c.HubFactory.regport = int(ports) self.location = cfg['location'] + if not self.engine_ssh_server: + self.engine_ssh_server = cfg['ssh'] # load client config with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f: cfg = json.loads(f.read()) @@ -226,7 +234,8 @@ class IPControllerApp(BaseParallelApplication): c.HubFactory.client_transport = xport ip,ports = addr.split(':') c.HubFactory.client_ip = ip - self.ssh_server = cfg['ssh'] + if not self.ssh_server: + self.ssh_server = cfg['ssh'] assert int(ports) == c.HubFactory.regport, "regport mismatch" def init_hub(self): @@ -271,6 +280,7 @@ class IPControllerApp(BaseParallelApplication): self.save_connection_dict('ipcontroller-client.json', cdict) edict = cdict edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport)) + edict['ssh'] = self.engine_ssh_server self.save_connection_dict('ipcontroller-engine.json', edict) # diff --git a/IPython/parallel/apps/ipengineapp.py b/IPython/parallel/apps/ipengineapp.py index 1263d91..43fca7a 100755 --- a/IPython/parallel/apps/ipengineapp.py +++ b/IPython/parallel/apps/ipengineapp.py @@ -118,6 +118,8 @@ aliases = dict( keyfile = 'Session.keyfile', url = 
'EngineFactory.url', + ssh = 'EngineFactory.sshserver', + sshkey = 'EngineFactory.sshkey', ip = 'EngineFactory.ip', transport = 'EngineFactory.transport', port = 'EngineFactory.regport', @@ -192,6 +194,40 @@ class IPEngineApp(BaseParallelApplication): self.profile_dir.security_dir, self.url_file_name ) + + def load_connector_file(self): + """load config from a JSON connector file, + at a *lower* priority than command-line/config files. + """ + + self.log.info("Loading url_file %r"%self.url_file) + config = self.config + + with open(self.url_file) as f: + d = json.loads(f.read()) + + try: + config.Session.key + except AttributeError: + if d['exec_key']: + config.Session.key = asbytes(d['exec_key']) + + try: + config.EngineFactory.location + except AttributeError: + config.EngineFactory.location = d['location'] + + d['url'] = disambiguate_url(d['url'], config.EngineFactory.location) + try: + config.EngineFactory.url + except AttributeError: + config.EngineFactory.url = d['url'] + + try: + config.EngineFactory.sshserver + except AttributeError: + config.EngineFactory.sshserver = d['ssh'] + def init_engine(self): # This is the working dir by now. 
sys.path.insert(0, '') @@ -219,14 +255,7 @@ class IPEngineApp(BaseParallelApplication): time.sleep(0.1) if os.path.exists(self.url_file): - self.log.info("Loading url_file %r"%self.url_file) - with open(self.url_file) as f: - d = json.loads(f.read()) - if d['exec_key']: - config.Session.key = asbytes(d['exec_key']) - d['url'] = disambiguate_url(d['url'], d['location']) - config.EngineFactory.url = d['url'] - config.EngineFactory.location = d['location'] + self.load_connector_file() elif not url_specified: self.log.critical("Fatal: url file never arrived: %s"%self.url_file) self.exit(1) @@ -253,7 +282,7 @@ class IPEngineApp(BaseParallelApplication): except: self.log.error("Couldn't start the Engine", exc_info=True) self.exit(1) - + def forward_logging(self): if self.log_url: self.log.info("Forwarding logging to %s"%self.log_url) @@ -265,7 +294,7 @@ class IPEngineApp(BaseParallelApplication): handler.setLevel(self.log_level) self.log.addHandler(handler) self._log_handler = handler - # + def init_mpi(self): global mpi self.mpi = MPI(config=self.config) diff --git a/IPython/parallel/apps/launcher.py b/IPython/parallel/apps/launcher.py index 493c766..2efee30 100644 --- a/IPython/parallel/apps/launcher.py +++ b/IPython/parallel/apps/launcher.py @@ -56,7 +56,7 @@ from zmq.eventloop import ioloop from IPython.config.application import Application from IPython.config.configurable import LoggingConfigurable from IPython.utils.text import EvalFormatter -from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance +from IPython.utils.traitlets import Any, Int, CFloat, List, Unicode, Dict, Instance from IPython.utils.path import get_ipython_module_path from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError @@ -364,6 +364,12 @@ class LocalEngineSetLauncher(BaseLauncher): ['--log-to-file','--log-level=%i'%logging.INFO], config=True, help="command-line arguments to pass to ipengine" ) + delay = CFloat(0.1, config=True, + help="""delay (in seconds) 
between starting each engine after the first. + This can help force the engines to get their ids in order, or limit + process flood when starting many engines.""" + ) + # launcher class launcher_class = LocalEngineLauncher @@ -381,6 +387,8 @@ class LocalEngineSetLauncher(BaseLauncher): self.profile_dir = unicode(profile_dir) dlist = [] for i in range(n): + if i > 0: + time.sleep(self.delay) el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log) # Copy the engine args over to each engine launcher. el.engine_args = copy.deepcopy(self.engine_args) @@ -603,6 +611,8 @@ class SSHEngineSetLauncher(LocalEngineSetLauncher): else: user=None for i in range(n): + if i > 0: + time.sleep(self.delay) el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log) # Copy the engine args over to each engine launcher. diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index 846b8eb..f6d4698 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -171,7 +171,7 @@ class Client(HasTraits): A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port' If keyfile or password is specified, and this is not, it will default to the ip given in addr. - sshkey : str; path to public ssh key file + sshkey : str; path to ssh private key file This specifies a key to be used in ssh login, default None. Regular default ssh keys will be used without specifying this argument. 
password : str diff --git a/IPython/parallel/engine/engine.py b/IPython/parallel/engine/engine.py index 59ef87f..7cf07ef 100755 --- a/IPython/parallel/engine/engine.py +++ b/IPython/parallel/engine/engine.py @@ -17,12 +17,16 @@ from __future__ import print_function import sys import time +from getpass import getpass import zmq from zmq.eventloop import ioloop, zmqstream +from IPython.external.ssh import tunnel # internal -from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode, CBytes +from IPython.utils.traitlets import ( + Instance, Dict, Int, Type, CFloat, Unicode, CBytes, Bool +) # from IPython.utils.localinterfaces import LOCALHOST from IPython.parallel.controller.heartmonitor import Heart @@ -50,6 +54,12 @@ class EngineFactory(RegistrationFactory): timeout=CFloat(2,config=True, help="""The time (in seconds) to wait for the Controller to respond to registration requests before giving up.""") + sshserver=Unicode(config=True, + help="""The SSH server to use for tunneling connections to the Controller.""") + sshkey=Unicode(config=True, + help="""The SSH private key file to use when tunneling connections to the Controller.""") + paramiko=Bool(sys.platform == 'win32', config=True, + help="""Whether to use paramiko instead of openssh for tunnels.""") # not configurable: user_ns=Dict() @@ -61,28 +71,70 @@ class EngineFactory(RegistrationFactory): ident = Unicode() def _ident_changed(self, name, old, new): self.bident = asbytes(new) + using_ssh=Bool(False) def __init__(self, **kwargs): super(EngineFactory, self).__init__(**kwargs) self.ident = self.session.session - ctx = self.context + + def init_connector(self): + """construct connection function, which handles tunnels.""" + self.using_ssh = bool(self.sshkey or self.sshserver) - reg = ctx.socket(zmq.XREQ) - reg.setsockopt(zmq.IDENTITY, self.bident) - reg.connect(self.url) - self.registrar = zmqstream.ZMQStream(reg, self.loop) + if self.sshkey and not self.sshserver: + # We are using ssh 
directly to the controller, tunneling localhost to localhost + self.sshserver = self.url.split('://')[1].split(':')[0] + + if self.using_ssh: + if tunnel.try_passwordless_ssh(self.sshserver, self.sshkey, self.paramiko): + password=False + else: + password = getpass("SSH Password for %s: "%self.sshserver) + else: + password = False + + def connect(s, url): + url = disambiguate_url(url, self.location) + if self.using_ssh: + self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver)) + return tunnel.tunnel_connection(s, url, self.sshserver, + keyfile=self.sshkey, paramiko=self.paramiko, + password=password, + ) + else: + return s.connect(url) + + def maybe_tunnel(url): + """like connect, but don't complete the connection (for use by heartbeat)""" + url = disambiguate_url(url, self.location) + if self.using_ssh: + self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver)) + url,tunnelobj = tunnel.open_tunnel(url, self.sshserver, + keyfile=self.sshkey, paramiko=self.paramiko, + password=password, + ) + return url + return connect, maybe_tunnel def register(self): """send the registration_request""" self.log.info("Registering with controller at %s"%self.url) + ctx = self.context + connect,maybe_tunnel = self.init_connector() + reg = ctx.socket(zmq.XREQ) + reg.setsockopt(zmq.IDENTITY, self.bident) + connect(reg, self.url) + self.registrar = zmqstream.ZMQStream(reg, self.loop) + + content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident) - self.registrar.on_recv(self.complete_registration) + self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel)) # print (self.session.key) self.session.send(self.registrar, "registration_request",content=content) - def complete_registration(self, msg): + def complete_registration(self, msg, connect, maybe_tunnel): # print msg self._abort_dc.stop() ctx = self.context @@ -94,6 +146,14 @@ class EngineFactory(RegistrationFactory): if msg.content.status == 'ok': 
self.id = int(msg.content.id) + # launch heartbeat + hb_addrs = msg.content.heartbeat + + # possibly forward hb ports with tunnels + hb_addrs = [ maybe_tunnel(addr) for addr in hb_addrs ] + heart = Heart(*map(str, hb_addrs), heart_id=identity) + heart.start() + # create Shell Streams (MUX, Task, etc.): queue_addr = msg.content.mux shell_addrs = [ str(queue_addr) ] @@ -114,24 +174,20 @@ class EngineFactory(RegistrationFactory): stream.setsockopt(zmq.IDENTITY, identity) shell_streams = [stream] for addr in shell_addrs: - stream.connect(disambiguate_url(addr, self.location)) + connect(stream, addr) # end single stream-socket # control stream: control_addr = str(msg.content.control) control_stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop) control_stream.setsockopt(zmq.IDENTITY, identity) - control_stream.connect(disambiguate_url(control_addr, self.location)) + connect(control_stream, control_addr) # create iopub stream: iopub_addr = msg.content.iopub iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop) iopub_stream.setsockopt(zmq.IDENTITY, identity) - iopub_stream.connect(disambiguate_url(iopub_addr, self.location)) - - # launch heartbeat - hb_addrs = msg.content.heartbeat - # print (hb_addrs) + connect(iopub_stream, iopub_addr) # # Redirect input streams and set a display hook. 
if self.out_stream_factory: @@ -147,9 +203,6 @@ class EngineFactory(RegistrationFactory): control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream, loop=loop, user_ns = self.user_ns, log=self.log) self.kernel.start() - hb_addrs = [ disambiguate_url(addr, self.location) for addr in hb_addrs ] - heart = Heart(*map(str, hb_addrs), heart_id=identity) - heart.start() else: diff --git a/docs/source/parallel/parallel_process.txt b/docs/source/parallel/parallel_process.txt index ffa2df1..851e619 100644 --- a/docs/source/parallel/parallel_process.txt +++ b/docs/source/parallel/parallel_process.txt @@ -484,6 +484,49 @@ The ``file`` flag works like this:: (:file:`~/.ipython/profile_/security` is the same on all of them), then things will just work! +SSH Tunnels +*********** + +If your engines are not on the same LAN as the controller, or you are on a highly +restricted network where your nodes cannot see each other's ports, then you can +use SSH tunnels to connect engines to the controller. + +.. note:: + + This does not work in all cases. Manual tunnels may be an option, but are + highly inconvenient. Support for manual tunnels will be improved. + +You can instruct all engines to use ssh by specifying the ssh server in +:file:`ipcontroller-engine.json`: + +.. I know this is really JSON, but the example is a subset of Python: +.. sourcecode:: python + + { + "url":"tcp://192.168.1.123:56951", + "exec_key":"26f4c040-587d-4a4e-b58b-030b96399584", + "ssh":"user@example.com", + "location":"192.168.1.123" + } + +This will be specified if you give the ``--enginessh=user@example.com`` argument when +starting :command:`ipcontroller`.
+ +Or you can specify an ssh server on the command-line when starting an engine:: + + $> ipengine --profile=foo --ssh=my.login.node + +For example, if your system is totally restricted, then all connections will actually be +loopback, and ssh tunnels will be used to connect engines to the controller:: + + [node1] $> ipcontroller --enginessh=node1 + [node2] $> ipengine + [node3] $> ipcluster engines --n=4 + +Or if you want to start many engines on each node, the command ``ipcluster engines --n=4`` +without any configuration is equivalent to running ipengine 4 times. + + Make JSON files persistent -------------------------- diff --git a/docs/source/parallel/parallel_security.txt b/docs/source/parallel/parallel_security.txt index 49f78b7..ea9338e 100644 --- a/docs/source/parallel/parallel_security.txt +++ b/docs/source/parallel/parallel_security.txt @@ -105,9 +105,6 @@ use OpenSSH or Paramiko, or the tunneling utilities are insufficient, then they construct the tunnels themselves, and simply connect clients and engines as if the controller were on loopback on the connecting machine. -.. note:: - - There is not currently tunneling available for engines. Authentication --------------