From 34a51241698a3b66963bd76503dc8c1f4d67f48a 2009-02-20 07:06:37 From: Brian Granger Date: 2009-02-20 07:06:37 Subject: [PATCH] Merging in vvatsa's ssh mode for ipcluster with some changes. We now have a fully working ssh mode for the new ipcluster. It should work well on Unix, Linux and OS X. --- diff --git a/IPython/kernel/engineservice.py b/IPython/kernel/engineservice.py index 657eda8..b301f0f 100644 --- a/IPython/kernel/engineservice.py +++ b/IPython/kernel/engineservice.py @@ -400,7 +400,6 @@ class EngineService(object, service.Service): # The IEngine methods. See the interface for documentation. - @profile def execute(self, lines): msg = {'engineid':self.id, 'method':'execute', diff --git a/IPython/kernel/multiengine.py b/IPython/kernel/multiengine.py index b42b3b6..4d1fe6c 100644 --- a/IPython/kernel/multiengine.py +++ b/IPython/kernel/multiengine.py @@ -552,7 +552,6 @@ class SynchronousMultiEngine(PendingDeferredManager): # Decorated pending deferred methods #--------------------------------------------------------------------------- - @profile @two_phase def execute(self, lines, targets='all'): d = self.multiengine.execute(lines, targets) diff --git a/IPython/kernel/scripts/ipcluster.py b/IPython/kernel/scripts/ipcluster.py index db482a6..640a41a 100644 --- a/IPython/kernel/scripts/ipcluster.py +++ b/IPython/kernel/scripts/ipcluster.py @@ -320,83 +320,140 @@ class PBSEngineSet(BatchEngineSet): def __init__(self, template_file, **kwargs): BatchEngineSet.__init__(self, template_file, **kwargs) -class SSHEngineSet(object): - sshx_template="""#!/bin/sh + +sshx_template="""#!/bin/sh "$@" &> /dev/null & -echo $!""" - - engine_killer_template="""#!/bin/sh +echo $! 
+""" + +engine_killer_template="""#!/bin/sh +ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM +""" -ps -fu `whoami` | grep ipengine | awk '{print $2}' | xargs kill -TERM""" +class SSHEngineSet(object): + sshx_template=sshx_template + engine_killer_template=engine_killer_template def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"): + """Start a controller on localhost and engines using ssh. + + The engine_hosts argument is a dict with hostnames as keys and + the number of engine (int) as values. sshx is the name of a local + file that will be used to run remote commands. This file is used + to setup the environment properly. + """ + self.temp_dir = tempfile.gettempdir() - if sshx != None: + if sshx is not None: self.sshx = sshx else: - self.sshx = os.path.join(self.temp_dir, '%s-main-sshx.sh'%os.environ['USER']) + # Write the sshx.sh file locally from our template. + self.sshx = os.path.join( + self.temp_dir, + '%s-main-sshx.sh' % os.environ['USER'] + ) f = open(self.sshx, 'w') f.writelines(self.sshx_template) f.close() self.engine_command = ipengine self.engine_hosts = engine_hosts - self.engine_killer = os.path.join(self.temp_dir, '%s-main-engine_killer.sh'%os.environ['USER']) + # Write the engine killer script file locally from our template. 
+ self.engine_killer = os.path.join( + self.temp_dir, + '%s-local-engine_killer.sh' % os.environ['USER'] + ) f = open(self.engine_killer, 'w') f.writelines(self.engine_killer_template) f.close() def start(self, send_furl=False): + dlist = [] for host in self.engine_hosts.keys(): count = self.engine_hosts[host] - self._start(host, count, send_furl) - - def killall(self): - for host in self.engine_hosts.keys(): - self._killall(host) + d = self._start(host, count, send_furl) + dlist.append(d) + return gatherBoth(dlist, consumeErrors=True) - def _start(self, host_name, count=1, send_furl=False): - - def _scp_sshx(d): - scp_cmd = "scp %s %s:%s/%s-sshx.sh"%(self.sshx, host_name, self.temp_dir, os.environ['USER']) - sshx_scp = scp_cmd.split() - print sshx_scp - d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ) - d.addCallback(_exec_engine) - - def _exec_engine(d): - exec_engine = "ssh %s sh %s/%s-sshx.sh %s"%(host_name, self.temp_dir, os.environ['USER'], self.engine_command) - cmds = exec_engine.split() - print cmds - for i in range(count): - d = getProcessOutput(cmds[0], cmds[1:], env=os.environ) - + def _start(self, hostname, count=1, send_furl=False): if send_furl: - scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/"%(host_name) - cmd_list = scp_cmd.split() - cmd_list[1] = os.path.expanduser(cmd_list[1]) - print cmd_list - d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ) - d.addCallback(_scp_sshx) + d = self._scp_furl(hostname) else: - _scp_sshx(d=None) - - def _killall(self, host_name): - def _exec_err(d): - if d.getErrorMessage()[-18:] != "No such process\\n\'": - raise d + d = defer.succeed(None) + d.addCallback(lambda r: self._scp_sshx(hostname)) + d.addCallback(lambda r: self._ssh_engine(hostname, count)) + return d - def _exec_kill(d): - kill_cmd = "ssh %s sh %s/%s-engine_killer.sh"%( host_name, self.temp_dir, os.environ['USER']) - kill_cmd = kill_cmd.split() - print kill_cmd - d = 
getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ) - d.addErrback(_exec_err) - scp_cmd = "scp %s %s:%s/%s-engine_killer.sh"%( self.engine_killer, host_name, self.temp_dir, os.environ['USER']) + def _scp_furl(self, hostname): + scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname) + cmd_list = scp_cmd.split() + cmd_list[1] = os.path.expanduser(cmd_list[1]) + log.msg('Copying furl file: %s' % scp_cmd) + d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ) + return d + + def _scp_sshx(self, hostname): + scp_cmd = "scp %s %s:%s/%s-sshx.sh" % ( + self.sshx, hostname, + self.temp_dir, os.environ['USER'] + ) + print + log.msg("Copying sshx: %s" % scp_cmd) + sshx_scp = scp_cmd.split() + d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ) + return d + + def _ssh_engine(self, hostname, count): + exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % ( + hostname, self.temp_dir, + os.environ['USER'], self.engine_command + ) + cmds = exec_engine.split() + dlist = [] + log.msg("about to start engines...") + for i in range(count): + log.msg('Starting engines: %s' % exec_engine) + d = getProcessOutput(cmds[0], cmds[1:], env=os.environ) + dlist.append(d) + return gatherBoth(dlist, consumeErrors=True) + + def kill(self): + dlist = [] + for host in self.engine_hosts.keys(): + d = self._killall(host) + dlist.append(d) + return gatherBoth(dlist, consumeErrors=True) + + def _killall(self, hostname): + d = self._scp_engine_killer(hostname) + d.addCallback(lambda r: self._ssh_kill(hostname)) + # d.addErrback(self._exec_err) + return d + + def _scp_engine_killer(self, hostname): + scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % ( + self.engine_killer, + hostname, + self.temp_dir, + os.environ['USER'] + ) cmds = scp_cmd.split() + log.msg('Copying engine_killer: %s' % scp_cmd) d = getProcessOutput(cmds[0], cmds[1:], env=os.environ) - d.addCallback(_exec_kill) - d.addErrback(_exec_err) + return d + + def _ssh_kill(self, 
hostname): + kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % ( + hostname, + self.temp_dir, + os.environ['USER'] + ) + log.msg('Killing engine: %s' % kill_cmd) + kill_cmd = kill_cmd.split() + d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ) + return d + def _exec_err(self, r): + log.msg(r) #----------------------------------------------------------------------------- # Main functions for the different types of clusters @@ -518,11 +575,17 @@ def main_pbs(args): dstart.addErrback(lambda f: f.raiseException()) -# currently the ssh launcher only launches the controller on localhost. def main_ssh(args): - # the clusterfile should look like: - # send_furl = False # True, if you want - # engines = {'engine_host1' : engine_count, 'engine_host2' : engine_count2} + """Start a controller on localhost and engines using ssh. + + Your clusterfile should look like:: + + send_furl = False # True, if you want + engines = { + 'engine_host1' : engine_count, + 'engine_host2' : engine_count2 + } + """ clusterfile = {} execfile(args.clusterfile, clusterfile) if not clusterfile.has_key('send_furl'): @@ -530,21 +593,24 @@ def main_ssh(args): cont_args = [] cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) - if args.x: - cont_args.append('-x') - if args.y: - cont_args.append('-y') + + # Check security settings before proceeding + if not check_security(args, cont_args): + return + cl = ControllerLauncher(extra_args=cont_args) dstart = cl.start() def start_engines(cont_pid): - est = SSHEngineSet(clusterfile['engines'], sshx=args.sshx) - est.start(clusterfile['send_furl']) + ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx) def shutdown(signum, frame): - est.killall() - cl.interrupt_then_kill(0.5) + d = ssh_set.kill() + # d.addErrback(log.err) + cl.interrupt_then_kill(1.0) reactor.callLater(2.0, reactor.stop) signal.signal(signal.SIGINT,shutdown) - + d = ssh_set.start(clusterfile['send_furl']) + return d + def delay_start(cont_pid): 
reactor.callLater(1.0, start_engines, cont_pid) @@ -640,8 +706,7 @@ def get_args(): '--sshx', type=str, dest='sshx', - help='sshx launcher helper', - default='sshx.sh', + help='sshx launcher helper' ) parser_ssh.set_defaults(func=main_ssh) diff --git a/docs/source/changes.txt b/docs/source/changes.txt index f1971d2..2408bbf 100644 --- a/docs/source/changes.txt +++ b/docs/source/changes.txt @@ -27,6 +27,9 @@ Release dev New features ------------ +* The new ipcluster now has a fully working ssh mode that should work on + Linux, Unix and OS X. Thanks to Vishal Vatsa for implementing this! + * The wonderful TextMate editor can now be used with %edit on OS X. Thanks to Matt Foster for this patch. @@ -59,6 +62,8 @@ New features Bug fixes --------- +* Numerous bugs on Windows with the new ipcluster have been fixed. + * The ipengine and ipcontroller scripts now handle missing furl files more gracefully by giving better error messages. diff --git a/docs/source/parallel/parallel_process.txt b/docs/source/parallel/parallel_process.txt index 47a0090..d35ffc9 100644 --- a/docs/source/parallel/parallel_process.txt +++ b/docs/source/parallel/parallel_process.txt @@ -202,11 +202,9 @@ Two helper shell scripts are used to start and stop :command:`ipengine` on remot * sshx.sh * engine_killer.sh -Both are provided in the :dir:`IPython.kernel.scripts`. They are copied to a -temp directory on the remote host and executed from there, on most Unix, Linux -and OS X systems this is /tmp. +Defaults for both of these are contained in the source code for :command:`ipcluster`. The default scripts are written to a local file in a temp directory and then copied to a temp directory on the remote host and executed from there. On most Unix, Linux and OS X systems this is /tmp. -The sshx.sh is as simple as: +The default sshx.sh is the following: .. 
sourcecode:: bash @@ -231,7 +229,7 @@ Current limitations of the SSH mode of :command:`ipcluster` are: Also, we are using shell scripts to setup and execute commands on remote hosts. * :command:`ipcontroller` is started on localhost, with no option to start it - on a remote node also. + on a remote node. Using the :command:`ipcontroller` and :command:`ipengine` commands ==================================================================