From e4b173fe79a7e77fc72ca941b2f40f5501c32201 2009-02-20 19:13:18 From: Brian Granger Date: 2009-02-20 19:13:18 Subject: [PATCH] Merging vvatsa's ipcluster-dev branch. This merge brings in a new ssh mode for ipcluster. Thanks to Vishal Vatsa for this. --- diff --git a/IPython/kernel/engineservice.py b/IPython/kernel/engineservice.py index 26577f2..b301f0f 100644 --- a/IPython/kernel/engineservice.py +++ b/IPython/kernel/engineservice.py @@ -693,7 +693,7 @@ class QueuedEngine(object): @queue def execute(self, lines): pass - + @queue def push(self, namespace): pass diff --git a/IPython/kernel/multienginefc.py b/IPython/kernel/multienginefc.py index ec51e47..30de28d 100644 --- a/IPython/kernel/multienginefc.py +++ b/IPython/kernel/multienginefc.py @@ -131,7 +131,7 @@ class FCSynchronousMultiEngineFromMultiEngine(Referenceable): def _addDeferredIDCallback(self, did, callback, *args, **kwargs): self._deferredIDCallbacks[did] = (callback, args, kwargs) return did - + #--------------------------------------------------------------------------- # IEngineMultiplexer related methods #--------------------------------------------------------------------------- @@ -346,7 +346,7 @@ class FCFullSynchronousMultiEngineClient(object): #--------------------------------------------------------------------------- # IEngineMultiplexer related methods #--------------------------------------------------------------------------- - + def execute(self, lines, targets='all', block=True): d = self.remote_reference.callRemote('execute', lines, targets, block) d.addCallback(self.unpackage) diff --git a/IPython/kernel/scripts/ipcluster.py b/IPython/kernel/scripts/ipcluster.py index 7c3b278..640a41a 100644 --- a/IPython/kernel/scripts/ipcluster.py +++ b/IPython/kernel/scripts/ipcluster.py @@ -18,6 +18,7 @@ import os import re import sys import signal +import tempfile pjoin = os.path.join from twisted.internet import reactor, defer @@ -81,10 +82,10 @@ class LauncherProcessProtocol(ProcessProtocol): ) else: raise UnknownStatus("unknown exit status, this is probably a bug in Twisted") - + def outReceived(self, data): log.msg(data) - + def errReceived(self, data): log.err(data) @@ -272,7 +273,7 @@ class BatchEngineSet(object): self.context = {} self.context.update(kwargs) self.batch_file = self.template_file+'-run' - + def parse_job_id(self, output): m = re.match(self.job_id_regexp, output) if m is not None: @@ -320,6 +321,140 @@ class PBSEngineSet(BatchEngineSet): BatchEngineSet.__init__(self, template_file, **kwargs) +sshx_template="""#!/bin/sh +"$@" &> /dev/null & +echo $! +""" + +engine_killer_template="""#!/bin/sh +ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM +""" + +class SSHEngineSet(object): + sshx_template=sshx_template + engine_killer_template=engine_killer_template + + def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"): + """Start a controller on localhost and engines using ssh. + + The engine_hosts argument is a dict with hostnames as keys and + the number of engine (int) as values. sshx is the name of a local + file that will be used to run remote commands. This file is used + to setup the environment properly. + """ + + self.temp_dir = tempfile.gettempdir() + if sshx is not None: + self.sshx = sshx + else: + # Write the sshx.sh file locally from our template. 
+ self.sshx = os.path.join( + self.temp_dir, + '%s-main-sshx.sh' % os.environ['USER'] + ) + f = open(self.sshx, 'w') + f.writelines(self.sshx_template) + f.close() + self.engine_command = ipengine + self.engine_hosts = engine_hosts + # Write the engine killer script file locally from our template. + self.engine_killer = os.path.join( + self.temp_dir, + '%s-local-engine_killer.sh' % os.environ['USER'] + ) + f = open(self.engine_killer, 'w') + f.writelines(self.engine_killer_template) + f.close() + + def start(self, send_furl=False): + dlist = [] + for host in self.engine_hosts.keys(): + count = self.engine_hosts[host] + d = self._start(host, count, send_furl) + dlist.append(d) + return gatherBoth(dlist, consumeErrors=True) + + def _start(self, hostname, count=1, send_furl=False): + if send_furl: + d = self._scp_furl(hostname) + else: + d = defer.succeed(None) + d.addCallback(lambda r: self._scp_sshx(hostname)) + d.addCallback(lambda r: self._ssh_engine(hostname, count)) + return d + + def _scp_furl(self, hostname): + scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname) + cmd_list = scp_cmd.split() + cmd_list[1] = os.path.expanduser(cmd_list[1]) + log.msg('Copying furl file: %s' % scp_cmd) + d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ) + return d + + def _scp_sshx(self, hostname): + scp_cmd = "scp %s %s:%s/%s-sshx.sh" % ( + self.sshx, hostname, + self.temp_dir, os.environ['USER'] + ) + print + log.msg("Copying sshx: %s" % scp_cmd) + sshx_scp = scp_cmd.split() + d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ) + return d + + def _ssh_engine(self, hostname, count): + exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % ( + hostname, self.temp_dir, + os.environ['USER'], self.engine_command + ) + cmds = exec_engine.split() + dlist = [] + log.msg("about to start engines...") + for i in range(count): + log.msg('Starting engines: %s' % exec_engine) + d = getProcessOutput(cmds[0], cmds[1:], env=os.environ) + dlist.append(d) + return gatherBoth(dlist, consumeErrors=True) + + def kill(self): + dlist = [] + for host in self.engine_hosts.keys(): + d = self._killall(host) + dlist.append(d) + return gatherBoth(dlist, consumeErrors=True) + + def _killall(self, hostname): + d = self._scp_engine_killer(hostname) + d.addCallback(lambda r: self._ssh_kill(hostname)) + # d.addErrback(self._exec_err) + return d + + def _scp_engine_killer(self, hostname): + scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % ( + self.engine_killer, + hostname, + self.temp_dir, + os.environ['USER'] + ) + cmds = scp_cmd.split() + log.msg('Copying engine_killer: %s' % scp_cmd) + d = getProcessOutput(cmds[0], cmds[1:], env=os.environ) + return d + + def _ssh_kill(self, hostname): + kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % ( + hostname, + self.temp_dir, + os.environ['USER'] + ) + log.msg('Killing engine: %s' % kill_cmd) + kill_cmd = kill_cmd.split() + d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ) + return d + + def _exec_err(self, r): + log.msg(r) + #----------------------------------------------------------------------------- # Main functions for the different types of clusters #----------------------------------------------------------------------------- @@ -343,6 +478,7 @@ Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""") cont_args.append('-y') return True + def main_local(args): cont_args = [] cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) @@ -376,6 +512,7 @@ def main_local(args): 
dstart.addCallback(delay_start) dstart.addErrback(lambda f: f.raiseException()) + def main_mpirun(args): cont_args = [] cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) @@ -413,6 +550,7 @@ def main_mpirun(args): dstart.addCallback(delay_start) dstart.addErrback(lambda f: f.raiseException()) + def main_pbs(args): cont_args = [] cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) @@ -437,6 +575,49 @@ def main_pbs(args): dstart.addErrback(lambda f: f.raiseException()) +def main_ssh(args): + """Start a controller on localhost and engines using ssh. + + Your clusterfile should look like:: + + send_furl = False # True, if you want + engines = { + 'engine_host1' : engine_count, + 'engine_host2' : engine_count2 + } + """ + clusterfile = {} + execfile(args.clusterfile, clusterfile) + if not clusterfile.has_key('send_furl'): + clusterfile['send_furl'] = False + + cont_args = [] + cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) + + # Check security settings before proceeding + if not check_security(args, cont_args): + return + + cl = ControllerLauncher(extra_args=cont_args) + dstart = cl.start() + def start_engines(cont_pid): + ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx) + def shutdown(signum, frame): + d = ssh_set.kill() + # d.addErrback(log.err) + cl.interrupt_then_kill(1.0) + reactor.callLater(2.0, reactor.stop) + signal.signal(signal.SIGINT,shutdown) + d = ssh_set.start(clusterfile['send_furl']) + return d + + def delay_start(cont_pid): + reactor.callLater(1.0, start_engines, cont_pid) + + dstart.addCallback(delay_start) + dstart.addErrback(lambda f: f.raiseException()) + + def get_args(): base_parser = argparse.ArgumentParser(add_help=False) base_parser.add_argument( @@ -508,6 +689,27 @@ def get_args(): default='pbs.template' ) parser_pbs.set_defaults(func=main_pbs) + + parser_ssh = subparsers.add_parser( + 'ssh', + help='run a cluster using ssh, should have ssh-keys setup', + parents=[base_parser] + ) + parser_ssh.add_argument( + '--clusterfile', + type=str, + dest='clusterfile', + help='python file describing the cluster', + default='clusterfile.py', + ) + parser_ssh.add_argument( + '--sshx', + type=str, + dest='sshx', + help='sshx launcher helper' + ) + parser_ssh.set_defaults(func=main_ssh) + args = parser.parse_args() return args diff --git a/docs/source/changes.txt b/docs/source/changes.txt index f1971d2..2408bbf 100644 --- a/docs/source/changes.txt +++ b/docs/source/changes.txt @@ -27,6 +27,9 @@ Release dev New features ------------ +* The new ipcluster now has a fully working ssh mode that should work on + Linux, Unix and OS X. Thanks to Vishal Vatsa for implementing this! + * The wonderful TextMate editor can now be used with %edit on OS X. Thanks to Matt Foster for this patch. @@ -59,6 +62,8 @@ New features Bug fixes --------- +* Numerous bugs on Windows with the new ipcluster have been fixed. + * The ipengine and ipcontroller scripts now handle missing furl files more gracefully by giving better error messages. diff --git a/docs/source/parallel/parallel_process.txt b/docs/source/parallel/parallel_process.txt index 660d06d..d35ffc9 100644 --- a/docs/source/parallel/parallel_process.txt +++ b/docs/source/parallel/parallel_process.txt @@ -53,6 +53,8 @@ The :command:`ipcluster` command provides a simple way of starting a controller 2. When engines are started using the :command:`mpirun` command that comes with most MPI [MPI]_ implementations 3. When engines are started using the PBS [PBS]_ batch system. +4. 
When the controller is started on localhost and the engines are started on
+   remote nodes using :command:`ssh`.
 
 .. note::
 
@@ -66,7 +68,8 @@ The :command:`ipcluster` command provides a simple way of starting a controller
    :file:`~/.ipython/security` directory live on a shared filesystem that is
    seen by both the controller and engines. If you don't have a shared file
    system you will need to use :command:`ipcontroller` and
-   :command:`ipengine` directly.
+   :command:`ipengine` directly. This constraint can be relaxed if you are
+   using the :command:`ssh` method to start the cluster.
 
 Underneath the hood, :command:`ipcluster` just uses :command:`ipcontroller`
 and :command:`ipengine` to perform the steps described above.
@@ -159,6 +162,75 @@ Additional command line options for this mode can be found by doing::
 
     $ ipcluster pbs -h
 
+Using :command:`ipcluster` in SSH mode
+--------------------------------------
+
+The SSH mode uses :command:`ssh` to execute :command:`ipengine` on remote
+nodes, while :command:`ipcontroller` is started on localhost.
+
+When using this mode it is highly recommended that you have set up SSH keys
+and are using ssh-agent [SSH]_ for password-less logins.
+
+To use this mode you need a Python file describing the cluster. Here is an
+example of such a "clusterfile":
+
+.. sourcecode:: python
+
+    send_furl = True
+    engines = { 'host1.example.com' : 2,
+                'host2.example.com' : 5,
+                'host3.example.com' : 1,
+                'host4.example.com' : 8 }
+
+Since this is a regular Python file, the usual Python syntax applies. Things
+to note:
+
+* The `engines` dict, where the keys are the hosts we want to run engines on
+  and the values are the number of engines to run on each host.
+* send_furl can be either `True` or `False`; if `True`, the furl file needed
+  by :command:`ipengine` is copied over to each host.
+
+The ``--clusterfile`` command line option lets you specify the file to use for
+the cluster definition. Once you have your cluster file and can
+:command:`ssh` into the remote hosts without a password, you are ready to
+start your cluster like so:
+
+.. sourcecode:: bash
+
+    $ ipcluster ssh --clusterfile /path/to/my/clusterfile.py
+
+Two helper shell scripts are used to start and stop :command:`ipengine` on
+remote hosts:
+
+* sshx.sh
+* engine_killer.sh
+
+Defaults for both of these are contained in the source code for
+:command:`ipcluster`. The default scripts are written to a file in a local
+temp directory, then copied to a temp directory on the remote host and
+executed from there. On most Unix, Linux and OS X systems this is /tmp.
+
+The default sshx.sh is the following:
+
+.. sourcecode:: bash
+
+    #!/bin/sh
+    "$@" &> /dev/null &
+    echo $!
+
+If you want to use a custom sshx.sh script you need to use the ``--sshx``
+option and specify the file to use. A custom sshx.sh file can be helpful
+when you need to set up the environment on the remote host before executing
+:command:`ipengine`.
+
+For a detailed options list:
+
+.. sourcecode:: bash
+
+    $ ipcluster ssh -h
+
+Current limitations of the SSH mode of :command:`ipcluster` are:
+
+* Untested on Windows. It would require a working :command:`ssh` on Windows.
+  Also, we are using shell scripts to set up and execute commands on remote
+  hosts.
+* :command:`ipcontroller` is started on localhost, with no option to start it
+  on a remote node.
+ Using the :command:`ipcontroller` and :command:`ipengine` commands ================================================================== @@ -249,3 +321,4 @@ the log files to us will often help us to debug any problems. .. [PBS] Portable Batch System. http://www.openpbs.org/ +.. [SSH] SSH-Agent http://en.wikipedia.org/wiki/Ssh-agent
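
For reference, the per-host commands issued by the SSH mode in this patch are
roughly the following. This is only a sketch: it assumes the default temp
directory is /tmp and reuses the hypothetical ``host1.example.com`` host from
the clusterfile example; the exact paths are built from
``tempfile.gettempdir()`` and ``os.environ['USER']`` in ``SSHEngineSet``.

.. sourcecode:: bash

    # Copy the engine furl (only when send_furl is True in the clusterfile).
    scp ~/.ipython/security/ipcontroller-engine.furl host1.example.com:.ipython/security/

    # Copy the sshx helper and start one ipengine through it
    # (the ssh line is repeated once per engine on that host).
    scp /tmp/$USER-main-sshx.sh host1.example.com:/tmp/$USER-sshx.sh
    ssh host1.example.com sh /tmp/$USER-sshx.sh ipengine

    # On shutdown (Ctrl-C), copy and run the engine killer on each host.
    scp /tmp/$USER-local-engine_killer.sh host1.example.com:/tmp/$USER-engine_killer.sh
    ssh host1.example.com sh /tmp/$USER-engine_killer.sh

Running these by hand can be a quick way to check that the SSH keys and
remote paths work before launching the full cluster.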
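
As a rough illustration of the ``--sshx`` hook, a custom sshx.sh might set up
the remote environment before launching the command it is handed. The sourced
profile path below is only a placeholder and not part of the patch:

.. sourcecode:: bash

    #!/bin/sh
    # Hypothetical custom sshx.sh: load whatever environment ipengine needs,
    # then run the command passed in by ipcluster in the background and
    # print its PID, as the default template does.
    . $HOME/.profile    # placeholder: adjust to your own environment setup
    "$@" > /dev/null 2>&1 &
    echo $!

It would then be passed on the command line, e.g.
``$ ipcluster ssh --clusterfile /path/to/my/clusterfile.py --sshx /path/to/my/sshx.sh``.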