class SSHEngineSet(object):
    """Start and stop ``ipengine`` processes on remote hosts over ssh.

    Two small shell scripts are written to the local temp directory, copied
    with ``scp`` to the same directory path on each remote host, and run
    there with ``ssh <host> sh <script>``:

    * the *sshx* launcher, which runs the command it is given in the
      background and echoes the pid of that background job;
    * the *engine killer*, which sends TERM to every process of the remote
      user whose ``ps`` line matches ``ipengine``.

    Every remote command is issued through ``getProcessOutput``.  The public
    methods only schedule those commands and return ``None``.

    Commands are assembled as strings and split on whitespace, so host
    names, the temp directory and the script paths must not contain spaces.
    """

    # Launcher copied to each host.  The redirect is spelled in POSIX form
    # because the script is run with ``sh``: ``&>`` is a bash extension that
    # a plain POSIX shell reads as "background the command, then truncate
    # /dev/null", which leaves the engine's output attached to the session.
    sshx_template="""#!/bin/sh
"$@" > /dev/null 2>&1 &
echo $!"""

    engine_killer_template="""#!/bin/sh

ps -fu `whoami` | grep ipengine | awk '{print $2}' | xargs kill -TERM"""

    def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
        """Record the cluster layout and write the helper scripts locally.

        Parameters
        ----------
        engine_hosts : dict
            Maps a host name to the number of engines to start on it.
        sshx : str, optional
            Path of a custom launcher script.  When ``None`` a script built
            from ``sshx_template`` is written to the temp directory and used.
        ipengine : str
            Command line used on the remote host to start one engine.

        Raises
        ------
        KeyError
            If the ``USER`` environment variable is not set.
        """
        # NOTE(review): the script names below are predictable paths in a
        # directory that is usually shared between users -- confirm this is
        # acceptable for the deployment.
        self.temp_dir = tempfile.gettempdir()
        if sshx is not None:
            self.sshx = sshx
        else:
            self.sshx = os.path.join(
                self.temp_dir, '%s-main-sshx.sh' % os.environ['USER'])
            self._write_script(self.sshx, self.sshx_template)
        self.engine_command = ipengine
        self.engine_hosts = engine_hosts
        self.engine_killer = os.path.join(
            self.temp_dir, '%s-main-engine_killer.sh' % os.environ['USER'])
        self._write_script(self.engine_killer, self.engine_killer_template)

    @staticmethod
    def _write_script(path, contents):
        """Write ``contents`` to ``path``, closing the file even on error."""
        f = open(path, 'w')
        try:
            f.write(contents)
        finally:
            f.close()

    def start(self, send_furl=False):
        """Start the configured number of engines on every host.

        When ``send_furl`` is true the controller's engine furl is copied to
        each host before its engines are launched.
        """
        for host, count in self.engine_hosts.items():
            self._start(host, count, send_furl)

    def killall(self):
        """Run the engine killer script on every configured host."""
        for host in self.engine_hosts:
            self._killall(host)

    def _start(self, host_name, count=1, send_furl=False):
        """Copy the launcher to ``host_name`` and start ``count`` engines.

        The steps are chained as callbacks: optionally scp the furl, then
        scp the launcher script, then run the launcher ``count`` times.
        """
        remote_sshx = "%s/%s-sshx.sh" % (self.temp_dir, os.environ['USER'])

        def _exec_engine(_output):
            # `_output` is whatever the previous command printed; unused.
            exec_engine = "ssh %s sh %s %s" % (
                host_name, remote_sshx, self.engine_command)
            cmds = exec_engine.split()
            # Single-argument print(...) gives the same output whether it is
            # parsed as a statement or as a function call.
            print(cmds)
            for _ in range(count):
                getProcessOutput(cmds[0], cmds[1:], env=os.environ)

        def _scp_sshx(_output):
            scp_cmd = "scp %s %s:%s" % (self.sshx, host_name, remote_sshx)
            sshx_scp = scp_cmd.split()
            print(sshx_scp)
            d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
            d.addCallback(_exec_engine)

        if send_furl:
            scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (host_name)
            cmd_list = scp_cmd.split()
            # No shell is involved, so expand the local "~" ourselves.
            cmd_list[1] = os.path.expanduser(cmd_list[1])
            print(cmd_list)
            d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
            d.addCallback(_scp_sshx)
        else:
            _scp_sshx(None)

    def _killall(self, host_name):
        """Copy the engine killer script to ``host_name`` and run it there."""
        remote_killer = "%s/%s-engine_killer.sh" % (
            self.temp_dir, os.environ['USER'])

        def _exec_err(failure):
            # A trailing "No such process" report is tolerated (presumably
            # the pipeline's own short-lived `grep ipengine` pid -- confirm).
            # Any other failure is handed on down the errback chain.
            if not failure.getErrorMessage().endswith("No such process\\n\'"):
                return failure

        def _exec_kill(_output):
            kill_cmd = "ssh %s sh %s" % (host_name, remote_killer)
            kill_cmd = kill_cmd.split()
            print(kill_cmd)
            d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
            d.addErrback(_exec_err)

        scp_cmd = "scp %s %s:%s" % (
            self.engine_killer, host_name, remote_killer)
        cmds = scp_cmd.split()
        d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
        d.addCallback(_exec_kill)
        d.addErrback(_exec_err)
# currently the ssh launcher only launches the controller on localhost.
def main_ssh(args):
    """Start a controller on localhost and engines on remote hosts via ssh.

    ``args.clusterfile`` names a python file that is executed to describe
    the cluster.  It should look like::

        send_furl = False # True, if you want
        engines = {'engine_host1' : engine_count, 'engine_host2' : engine_count2}

    ``send_furl`` is optional and defaults to ``False``.  ``args.x`` and
    ``args.y`` are forwarded to the controller as ``-x`` / ``-y``, the
    controller log goes under ``args.logdir``, and ``args.sshx`` is passed
    to ``SSHEngineSet`` as the launcher script.

    Raises
    ------
    KeyError
        If the clusterfile does not define ``engines``.  This is checked
        before the controller is launched, so a bad clusterfile does not
        leave a controller running with no engines.
    """
    # NOTE: the clusterfile is executed as python code; only point this at
    # files you trust.
    clusterfile = {}
    execfile(args.clusterfile, clusterfile)
    clusterfile.setdefault('send_furl', False)
    if 'engines' not in clusterfile:
        raise KeyError(
            "clusterfile %r does not define 'engines'" % (args.clusterfile,))

    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]
    if args.x:
        cont_args.append('-x')
    if args.y:
        cont_args.append('-y')
    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        est = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
        est.start(clusterfile['send_furl'])

        def shutdown(signum, frame):
            # Ctrl-C: stop the remote engines, then the controller, then
            # give both a moment before stopping the reactor.
            est.killall()
            cl.interrupt_then_kill(0.5)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT, shutdown)

    def delay_start(cont_pid):
        # Launch the engines one second after the controller start fires.
        reactor.callLater(1.0, start_engines, cont_pid)

    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
describing the cluster', + default='clusterfile.py', + ) + parser_ssh.add_argument( + '--sshx', + type=str, + dest='sshx', + help='sshx launcher helper', + default='sshx.sh', + ) + parser_ssh.set_defaults(func=main_ssh) + args = parser.parse_args() return args diff --git a/docs/source/parallel/parallel_process.txt b/docs/source/parallel/parallel_process.txt index 660d06d..47a0090 100644 --- a/docs/source/parallel/parallel_process.txt +++ b/docs/source/parallel/parallel_process.txt @@ -53,6 +53,8 @@ The :command:`ipcluster` command provides a simple way of starting a controller 2. When engines are started using the :command:`mpirun` command that comes with most MPI [MPI]_ implementations 3. When engines are started using the PBS [PBS]_ batch system. +4. When the controller is started on localhost and the engines are started on + remote nodes using :command:`ssh`. .. note:: @@ -66,7 +68,8 @@ The :command:`ipcluster` command provides a simple way of starting a controller :file:`~/.ipython/security` directory live on a shared filesystem that is seen by both the controller and engines. If you don't have a shared file system you will need to use :command:`ipcontroller` and - :command:`ipengine` directly. + :command:`ipengine` directly. This constraint can be relaxed if you are + using the :command:`ssh` method to start the cluster. Underneath the hood, :command:`ipcluster` just uses :command:`ipcontroller` and :command:`ipengine` to perform the steps described above. @@ -159,6 +162,77 @@ Additional command line options for this mode can be found by doing:: $ ipcluster pbs -h +Using :command:`ipcluster` in SSH mode +-------------------------------------- + +The SSH mode uses :command:`ssh` to execute :command:`ipengine` on remote +nodes and the :command:`ipcontroller` on localhost. + +When using this mode it is highly recommended that you have set up SSH keys and are using ssh-agent [SSH]_ for password-less logins.
+ +To use this mode you need a Python file describing the cluster. Here is an example of such a "clusterfile": + +.. sourcecode:: python + + send_furl = True + engines = { 'host1.example.com' : 2, + 'host2.example.com' : 5, + 'host3.example.com' : 1, + 'host4.example.com' : 8 } + +Since this is a regular Python file, the usual Python syntax applies. Things to note: + +* The `engines` dict, where each key is a host we want to run engines on and + each value is the number of engines to run on that host. +* send_furl can be either `True` or `False`; if `True`, it will copy over the + furl needed for :command:`ipengine` to each host. + +The ``--clusterfile`` command line option lets you specify the file to use for +the cluster definition. Once you have your cluster file and you can +:command:`ssh` into the remote hosts without a password, you are ready to +start your cluster like so: + +.. sourcecode:: bash + + $ ipcluster ssh --clusterfile /path/to/my/clusterfile.py + + +Two helper shell scripts are used to start and stop :command:`ipengine` on remote hosts: + +* sshx.sh +* engine_killer.sh + +Both are provided in the :mod:`IPython.kernel.scripts` package. They are copied to a +temp directory on the remote host and executed from there; on most Unix, Linux +and OS X systems this is /tmp. + +The sshx.sh is as simple as: + +.. sourcecode:: bash + + #!/bin/sh + "$@" &> /dev/null & + echo $! + +If you want to use a custom sshx.sh script you need to use the ``--sshx`` +option and specify the file to use. Using a custom sshx.sh file could be +helpful when you need to set up the environment on the remote host before +executing :command:`ipengine`. + +For a detailed options list: + +.. sourcecode:: bash + + $ ipcluster ssh -h + +Current limitations of the SSH mode of :command:`ipcluster` are: + +* Untested on Windows. Would require a working :command:`ssh` on Windows. + Also, we are using shell scripts to set up and execute commands on remote + hosts.
+* :command:`ipcontroller` is started on localhost, with no option to start it + on a remote node. + Using the :command:`ipcontroller` and :command:`ipengine` commands ================================================================== @@ -249,3 +323,4 @@ the log files to us will often help us to debug any problems. .. [PBS] Portable Batch System. http://www.openpbs.org/ +.. [SSH] SSH-Agent http://en.wikipedia.org/wiki/Ssh-agent