diff --git a/IPython/kernel/scripts/ipcluster.py b/IPython/kernel/scripts/ipcluster.py
index 08e8c9a..708158f 100644
--- a/IPython/kernel/scripts/ipcluster.py
+++ b/IPython/kernel/scripts/ipcluster.py
@@ -1,347 +1,381 @@
-#!/usr/bin/env python
-# encoding: utf-8
-
-"""Start an IPython cluster conveniently, either locally or remotely.
-
-Basic usage
------------
-
-For local operation, the simplest mode of usage is:
-
-  %prog -n N
-
-where N is the number of engines you want started.
-
-For remote operation, you must call it with a cluster description file:
-
-  %prog -f clusterfile.py
-
-The cluster file is a normal Python script which gets run via execfile(). You
-can have arbitrary logic in it, but all that matters is that at the end of the
-execution, it declares the variables 'controller', 'engines', and optionally
-'sshx'. See the accompanying examples for details on what these variables must
-contain.
-
-
-Notes
------
-
-WARNING: this code is still UNFINISHED and EXPERIMENTAL! It is incomplete,
-some listed options are not really implemented, and all of its interfaces are
-subject to change.
-
-When operating over SSH for a remote cluster, this program relies on the
-existence of a particular script called 'sshx'. This script must live in the
-target systems where you'll be running your controller and engines, and is
-needed to configure your PATH and PYTHONPATH variables for further execution of
-python code at the other end of an SSH connection. The script can be as simple
-as:
-
-#!/bin/sh
-. $HOME/.bashrc
-"$@"
-
-which is the default one provided by IPython. You can modify this or provide
-your own. Since it's quite likely that for different clusters you may need
-this script to configure things differently or that it may live in different
-locations, its full path can be set in the same file where you define the
-cluster setup. IPython's order of evaluation for this variable is the
-following:
-
-  a) Internal default: 'sshx'. This only works if it is in the default system
-  path which SSH sets up in non-interactive mode.
-
-  b) Environment variable: if $IPYTHON_SSHX is defined, this overrides the
-  internal default.
-
-  c) Variable 'sshx' in the cluster configuration file: finally, this will
-  override the previous two values.
-
-This code is Unix-only, with precious little hope of any of this ever working
-under Windows, since we need SSH from the ground up, we background processes,
-etc. Ports of this functionality to Windows are welcome.
-
-
-Call summary
-------------
-
-    %prog [options]
-"""
-
-__docformat__ = "restructuredtext en"
-
-#-------------------------------------------------------------------------------
-# Copyright (C) 2008 The IPython Development Team
-#
-# Distributed under the terms of the BSD License. The full license is in
-# the file COPYING, distributed as part of this software.
-#-------------------------------------------------------------------------------
-
-#-------------------------------------------------------------------------------
-# Stdlib imports
-#-------------------------------------------------------------------------------
-
 import os
-import signal
+import re
 import sys
-import time
-
-from optparse import OptionParser
-from subprocess import Popen,call
-
-#---------------------------------------------------------------------------
-# IPython imports
-#---------------------------------------------------------------------------
-from IPython.tools import utils
+import signal
+pjoin = os.path.join
+
+from twisted.internet import reactor, defer
+from twisted.internet.protocol import ProcessProtocol
+from twisted.python import failure, log
+from twisted.internet.error import ProcessDone, ProcessTerminated
+from twisted.internet.utils import getProcessOutput
+
+from IPython.external import argparse
+from IPython.external import Itpl
+from IPython.kernel.twistedutil import gatherBoth
+from IPython.kernel.util import printer
 from IPython.genutils import get_ipython_dir
 
-#---------------------------------------------------------------------------
-# Normal code begins
-#---------------------------------------------------------------------------
-
-def parse_args():
-    """Parse command line and return opts,args."""
-
-    parser = OptionParser(usage=__doc__)
-    newopt = parser.add_option # shorthand
+# Test local cluster on Win32
+# Look at local cluster usage strings
+# PBS stuff
 
-    newopt("--controller-port", type="int", dest="controllerport",
-           help="the TCP port the controller is listening on")
+class ProcessStateError(Exception):
+    pass
 
-    newopt("--controller-ip", type="string", dest="controllerip",
-           help="the TCP ip address of the controller")
+class UnknownStatus(Exception):
+    pass
 
-    newopt("-n", "--num", type="int", dest="n",default=2,
-           help="the number of engines to start")
+class LauncherProcessProtocol(ProcessProtocol):
+    """
+    A ProcessProtocol to go with the ProcessLauncher.
+ """ + def __init__(self, process_launcher): + self.process_launcher = process_launcher - newopt("--engine-ip", type="string", dest="engineip", - help="the TCP ip address the controller will listen on " - "for engine connections") - - newopt("--mpi", type="string", dest="mpi", - help="use mpi with package: for instance --mpi=mpi4py") + def connectionMade(self): + self.process_launcher.fire_start_deferred(self.transport.pid) - newopt("-l", "--logfile", type="string", dest="logfile", - help="log file name") - - newopt('-f','--cluster-file',dest='clusterfile', - help='file describing a remote cluster') - - return parser.parse_args() + def processEnded(self, status): + value = status.value + if isinstance(value, ProcessDone): + self.process_launcher.fire_stop_deferred(0) + elif isinstance(value, ProcessTerminated): + self.process_launcher.fire_stop_deferred( + {'exit_code':value.exitCode, + 'signal':value.signal, + 'status':value.status + } + ) + else: + raise UnknownStatus("unknown exit status, this is probably a bug in Twisted") -def numAlive(controller,engines): - """Return the number of processes still alive.""" - retcodes = [controller.poll()] + \ - [e.poll() for e in engines] - return retcodes.count(None) + def outReceived(self, data): + log.msg(data) -stop = lambda pid: os.kill(pid,signal.SIGINT) -kill = lambda pid: os.kill(pid,signal.SIGTERM) + def errReceived(self, data): + log.err(data) -def cleanup(clean,controller,engines): - """Stop the controller and engines with the given cleanup method.""" +class ProcessLauncher(object): + """ + Start and stop an external process in an asynchronous manner. - for e in engines: - if e.poll() is None: - print 'Stopping engine, pid',e.pid - clean(e.pid) - if controller.poll() is None: - print 'Stopping controller, pid',controller.pid - clean(controller.pid) - - -def ensureDir(path): - """Ensure a directory exists or raise an exception.""" - if not os.path.isdir(path): - os.makedirs(path) - - -def startMsg(control_host,control_port=10105): - """Print a startup message""" - print - print 'Your cluster is up and running.' - print - print 'For interactive use, you can make a MultiEngineClient with:' - print - print 'from IPython.kernel import client' - print "mec = client.MultiEngineClient()" - print - print 'You can then cleanly stop the cluster from IPython using:' - print - print 'mec.kill(controller=True)' - print + Currently this uses deferreds to notify other parties of process state + changes. This is an awkward design and should be moved to using + a formal NotificationCenter. 
+ """ + def __init__(self, cmd_and_args): + self.cmd = cmd_and_args[0] + self.args = cmd_and_args + self._reset() -def clusterLocal(opt,arg): - """Start a cluster on the local machine.""" + def _reset(self): + self.process_protocol = None + self.pid = None + self.start_deferred = None + self.stop_deferreds = [] + self.state = 'before' # before, running, or after - # Store all logs inside the ipython directory - ipdir = get_ipython_dir() - pjoin = os.path.join - - logfile = opt.logfile - if logfile is None: - logdir_base = pjoin(ipdir,'log') - ensureDir(logdir_base) - logfile = pjoin(logdir_base,'ipcluster-') - - print 'Starting controller:', - controller = Popen(['ipcontroller','--logfile',logfile,'-x','-y']) - print 'Controller PID:',controller.pid - - print 'Starting engines: ', - time.sleep(5) - - englogfile = '%s%s-' % (logfile,controller.pid) - mpi = opt.mpi - if mpi: # start with mpi - killing the engines with sigterm will not work if you do this - engines = [Popen(['mpirun', '-np', str(opt.n), 'ipengine', '--mpi', - mpi, '--logfile',englogfile])] - # engines = [Popen(['mpirun', '-np', str(opt.n), 'ipengine', '--mpi', mpi])] - else: # do what we would normally do - engines = [ Popen(['ipengine','--logfile',englogfile]) - for i in range(opt.n) ] - eids = [e.pid for e in engines] - print 'Engines PIDs: ',eids - print 'Log files: %s*' % englogfile + @property + def running(self): + if self.state == 'running': + return True + else: + return False - proc_ids = eids + [controller.pid] - procs = engines + [controller] - - grpid = os.getpgrp() - try: - startMsg('127.0.0.1') - print 'You can also hit Ctrl-C to stop it, or use from the cmd line:' - print - print 'kill -INT',grpid - print - try: - while True: - time.sleep(5) - except: - pass - finally: - print 'Stopping cluster. Cleaning up...' - cleanup(stop,controller,engines) - for i in range(4): - time.sleep(i+2) - nZombies = numAlive(controller,engines) - if nZombies== 0: - print 'OK: All processes cleaned up.' - break - print 'Trying again, %d processes did not stop...' % nZombies - cleanup(kill,controller,engines) - if numAlive(controller,engines) == 0: - print 'OK: All processes cleaned up.' - break + def fire_start_deferred(self, pid): + self.pid = pid + self.state = 'running' + log.msg('Process %r has started with pid=%i' % (self.args, pid)) + self.start_deferred.callback(pid) + + def start(self): + if self.state == 'before': + self.process_protocol = LauncherProcessProtocol(self) + self.start_deferred = defer.Deferred() + self.process_transport = reactor.spawnProcess( + self.process_protocol, + self.cmd, + self.args, + env=os.environ + ) + return self.start_deferred else: - print '*'*75 - print 'ERROR: could not kill some processes, try to do it', - print 'manually.' - zombies = [] - if controller.returncode is None: - print 'Controller is alive: pid =',controller.pid - zombies.append(controller.pid) - liveEngines = [ e for e in engines if e.returncode is None ] - for e in liveEngines: - print 'Engine is alive: pid =',e.pid - zombies.append(e.pid) - print - print 'Zombie summary:',' '.join(map(str,zombies)) - -def clusterRemote(opt,arg): - """Start a remote cluster over SSH""" - - # B. Granger, 9/3/08 - # The launching of a remote cluster using SSH and a clusterfile - # is broken. Because it won't be fixed before the 0.9 release, - # we are removing it. For now, we just print a message to the - # user and abort. 
+            s = 'the process has already been started and has state: %r' % \
+                self.state
+            return defer.fail(ProcessStateError(s))
+
+    def get_stop_deferred(self):
+        if self.state == 'running' or self.state == 'before':
+            d = defer.Deferred()
+            self.stop_deferreds.append(d)
+            return d
+        else:
+            s = 'this process is already complete'
+            return defer.fail(ProcessStateError(s))
+
+    def fire_stop_deferred(self, exit_code):
+        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
+        self.state = 'after'
+        for d in self.stop_deferreds:
+            d.callback(exit_code)
+
+    def signal(self, sig):
+        """
+        Send a signal to the process.
+
+        The argument sig can be ('KILL','INT', etc.) or any signal number.
+        """
+        if self.state == 'running':
+            self.process_transport.signalProcess(sig)
+
+    def __del__(self):
+        self.signal('KILL')
 
-    print """The launching of a remote IPython cluster using SSL
-and a clusterfile has been removed in this release.
-It has been broken for a while and we are in the process
-of building a new process management system that will be
-used to provide a more robust way of starting an IPython
-cluster.
+    def interrupt_then_kill(self, delay=1.0):
+        self.signal('INT')
+        reactor.callLater(delay, self.signal, 'KILL')
 
-For now remote clusters have to be launched using ipcontroller
-and ipengine separately.
-    """
-    sys.exit(1)
 
-    # Load the remote cluster configuration
-    clConfig = {}
-    execfile(opt.clusterfile,clConfig)
-    contConfig = clConfig['controller']
-    engConfig = clConfig['engines']
-    # Determine where to find sshx:
-    sshx = clConfig.get('sshx',os.environ.get('IPYTHON_SSHX','sshx'))
+class ControllerLauncher(ProcessLauncher):
 
-    # Store all logs inside the ipython directory
-    ipdir = get_ipython_dir()
-    pjoin = os.path.join
+    def __init__(self, extra_args=None):
+        self.args = ['ipcontroller']
+        self.extra_args = extra_args
+        if extra_args is not None:
+            self.args.extend(extra_args)
+
+        ProcessLauncher.__init__(self, self.args)
 
-    logfile = opt.logfile
-    if logfile is None:
-        logdir_base = pjoin(ipdir,'log')
-        ensureDir(logdir_base)
-        logfile = pjoin(logdir_base,'ipcluster')
-    # Append this script's PID to the logfile name always
-    logfile = '%s-%s' % (logfile,os.getpid())
+
+class EngineLauncher(ProcessLauncher):
 
-    print 'Starting controller:'
-    # Controller data:
-    xsys = os.system
+    def __init__(self, extra_args=None):
+        self.args = ['ipengine']
+        self.extra_args = extra_args
+        if extra_args is not None:
+            self.args.extend(extra_args)
+
+        ProcessLauncher.__init__(self, self.args)
 
-    contHost = contConfig['host']
-    contLog = '%s-con-%s-' % (logfile,contHost)
-    cmd = "ssh %s '%s' 'ipcontroller --logfile %s' &" % \
-          (contHost,sshx,contLog)
-    #print 'cmd:<%s>' % cmd # dbg
-    xsys(cmd)
-    time.sleep(2)
-    print 'Starting engines: '
-    for engineHost,engineData in engConfig.iteritems():
-        if isinstance(engineData,int):
-            numEngines = engineData
+
+class LocalEngineSet(object):
+
+    def __init__(self, extra_args=None):
+        self.extra_args = extra_args
+        self.launchers = []
+
+    def start(self, n):
+        dlist = []
+        for i in range(n):
+            el = EngineLauncher(extra_args=self.extra_args)
+            d = el.start()
+            self.launchers.append(el)
+            dlist.append(d)
+        dfinal = gatherBoth(dlist, consumeErrors=True)
+        dfinal.addCallback(self._handle_start)
+        return dfinal
+
+    def _handle_start(self, r):
+        log.msg('Engines started with pids: %r' % r)
+        return r
+
+    def _handle_stop(self, r):
+        log.msg('Engines received signal: %r' % r)
+        return r
+
+    def signal(self, sig):
+        dlist = []
+        for el in self.launchers:
+            d = el.get_stop_deferred()
+            dlist.append(d)
+            el.signal(sig)
+        dfinal = gatherBoth(dlist, consumeErrors=True)
+        dfinal.addCallback(self._handle_stop)
+        return dfinal
+
+    def interrupt_then_kill(self, delay=1.0):
+        dlist = []
+        for el in self.launchers:
+            d = el.get_stop_deferred()
+            dlist.append(d)
+            el.interrupt_then_kill(delay)
+        dfinal = gatherBoth(dlist, consumeErrors=True)
+        dfinal.addCallback(self._handle_stop)
+        return dfinal
+
+
+class BatchEngineSet(object):
+
+    # Subclasses must fill these in. See PBSEngineSet
+    submit_command = ''
+    delete_command = ''
+    job_id_regexp = ''
+
+    def __init__(self, template_file, **kwargs):
+        self.template_file = template_file
+        self.context = {}
+        self.context.update(kwargs)
+        self.batch_file = 'batch-script'
+
+    def parse_job_id(self, output):
+        m = re.match(self.job_id_regexp, output)
+        if m is not None:
+            job_id = m.group()
         else:
-            raise NotImplementedError('port configuration not finished for engines')
-
-        print 'Sarting %d engines on %s' % (numEngines,engineHost)
-        engLog = '%s-eng-%s-' % (logfile,engineHost)
-        for i in range(numEngines):
-            cmd = "ssh %s '%s' 'ipengine --controller-ip %s --logfile %s' &" % \
-                  (engineHost,sshx,contHost,engLog)
-            #print 'cmd:<%s>' % cmd # dbg
-            xsys(cmd)
-            # Wait after each host a little bit
-            time.sleep(1)
-
-    startMsg(contConfig['host'])
+            raise Exception("job id couldn't be determined: %s" % output)
+        self.job_id = job_id
+        print 'Job started with job id:', job_id
+        return job_id
+
+    def write_batch_script(self, n):
+        self.context['n'] = n
+        template = open(self.template_file, 'r').read()
+        script_as_string = Itpl.itplns(template, self.context)
+        f = open(self.batch_file,'w')
+        f.write(script_as_string)
+        f.close()
+
+    def handle_error(self, f):
+        f.printTraceback()
+        f.raiseException()
+
+    def start(self, n):
+        self.write_batch_script(n)
+        d = getProcessOutput(self.submit_command,
+                             [self.batch_file],env=os.environ)
+        d.addCallback(self.parse_job_id)
+        #d.addErrback(self.handle_error)
+        return d
 
-def main():
-    """Main driver for the two big options: local or remote cluster."""
+    def kill(self):
+        d = getProcessOutput(self.delete_command,
+                             [self.job_id],env=os.environ)
+        return d
+
+class PBSEngineSet(BatchEngineSet):
+
+    submit_command = 'qsub'
+    delete_command = 'qdel'
+    job_id_regexp = '\d+'
+
+    def __init__(self, template_file, **kwargs):
+        BatchEngineSet.__init__(self, template_file, **kwargs)
+
+
+def main_local(args):
+    cont_args = []
+    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
+    cl = ControllerLauncher(extra_args=cont_args)
+    dstart = cl.start()
+    def start_engines(cont_pid):
+        engine_args = []
+        engine_args.append('--logfile=%s' % \
+            pjoin(args.logdir,'ipengine%s-' % cont_pid))
+        eset = LocalEngineSet(extra_args=engine_args)
+        def shutdown(signum, frame):
+            log.msg('Stopping local cluster')
+            # We are still playing with the times here, but these seem
+            # to be reliable in allowing everything to exit cleanly.
+            eset.interrupt_then_kill(0.5)
+            cl.interrupt_then_kill(0.5)
+            reactor.callLater(1.0, reactor.stop)
+        signal.signal(signal.SIGINT,shutdown)
+        d = eset.start(args.n)
+        return d
+    def delay_start(cont_pid):
+        # This is needed because the controller doesn't start listening
+        # right when it starts and the controller needs to write
+        # furl files for the engine to pick up
+        reactor.callLater(1.0, start_engines, cont_pid)
+    dstart.addCallback(delay_start)
+    dstart.addErrback(lambda f: f.raiseException())
+
+def main_mpirun(args):
+    cont_args = []
+    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
+    cl = ControllerLauncher(extra_args=cont_args)
+    dstart = cl.start()
+    def start_engines(cont_pid):
+        raw_args = ['mpirun']
+        raw_args.extend(['-n',str(args.n)])
+        raw_args.append('ipengine')
+        raw_args.append('-l')
+        raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
+        raw_args.append('--mpi=%s' % args.mpi)
+        eset = ProcessLauncher(raw_args)
+        def shutdown(signum, frame):
+            log.msg('Stopping mpirun cluster')
+            # We are still playing with the times here, but these seem
+            # to be reliable in allowing everything to exit cleanly.
+            eset.interrupt_then_kill(1.0)
+            cl.interrupt_then_kill(1.0)
+            reactor.callLater(2.0, reactor.stop)
+        signal.signal(signal.SIGINT,shutdown)
+        d = eset.start()
+        return d
+    def delay_start(cont_pid):
+        # This is needed because the controller doesn't start listening
+        # right when it starts and the controller needs to write
+        # furl files for the engine to pick up
+        reactor.callLater(1.0, start_engines, cont_pid)
+    dstart.addCallback(delay_start)
+    dstart.addErrback(lambda f: f.raiseException())
+
+def main_pbs(args):
+    cl = ControllerLauncher()
+    dstart = cl.start()
+    def start_engines(r):
+        pbs_set = PBSEngineSet(args.pbsscript)
+        d = pbs_set.start(args.n)
+        return d
+    dstart.addCallback(start_engines)
+    dstart.addErrback(lambda f: f.printTraceback())
+
+
+def get_args():
+    parser = argparse.ArgumentParser(
+        description='IPython cluster startup')
 
-    if sys.platform=='win32':
-        print """ipcluster does not work on Microsoft Windows. Please start
-your IPython cluster using the ipcontroller and ipengine scripts."""
-        sys.exit(1)
+    subparsers = parser.add_subparsers(help='sub-command help')
 
-    opt,arg = parse_args()
+    parser_local = subparsers.add_parser('local', help='run a local cluster')
+    parser_local.add_argument("--logdir", type=str, dest="logdir",
+        help="directory to put log files (default=$IPYTHONDIR/log)",
+        default=pjoin(get_ipython_dir(),'log'))
+    parser_local.add_argument("-n", "--num", type=int, dest="n",default=2,
+        help="the number of engines to start")
+    parser_local.set_defaults(func=main_local)
+
+    parser_mpirun = subparsers.add_parser('mpirun', help='run a cluster using mpirun')
+    parser_mpirun.add_argument("--logdir", type=str, dest="logdir",
+        help="directory to put log files (default=$IPYTHONDIR/log)",
+        default=pjoin(get_ipython_dir(),'log'))
+    parser_mpirun.add_argument("-n", "--num", type=int, dest="n",default=2,
+        help="the number of engines to start")
+    parser_mpirun.add_argument("--mpi", type=str, dest="mpi",default='mpi4py',
+        help="how to call MPI_Init (default=mpi4py)")
+    parser_mpirun.set_defaults(func=main_mpirun)
+
+    parser_pbs = subparsers.add_parser('pbs', help='run a pbs cluster')
+    parser_pbs.add_argument("-n", "--num", type=int, dest="n",default=2,
+        help="the number of engines to start")
+    parser_pbs.add_argument('--pbs-script', type=str, dest='pbsscript',
+        default='pbs.template',
+        help='PBS script template (default=pbs.template)')
+    parser_pbs.set_defaults(func=main_pbs)
+    args = parser.parse_args()
+    return args
 
-    clusterfile = opt.clusterfile
-    if clusterfile:
-        clusterRemote(opt,arg)
-    else:
-        clusterLocal(opt,arg)
-
-
-if __name__=='__main__':
+def main():
+    args = get_args()
+    reactor.callWhenRunning(args.func, args)
+    log.startLogging(sys.stdout)
+    reactor.run()
+
+if __name__ == '__main__':
     main()
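
With the patch applied, the command line grows three subcommands, per the argparse setup in get_args() above:

    ipcluster local -n 4
    ipcluster mpirun -n 8 --mpi=mpi4py
    ipcluster pbs -n 16 --pbs-script=pbs.template

For reference, here is a minimal sketch (not part of the patch) of driving the new launcher classes directly under Twisted's reactor, mirroring what main_local() does; the callLater(1.0, ...) mirrors the delay the patch uses so the controller can write its furl files before the engines start. It assumes the patched module is importable as IPython.kernel.scripts.ipcluster.

import sys
from twisted.internet import reactor
from twisted.python import log
from IPython.kernel.scripts.ipcluster import ControllerLauncher, LocalEngineSet

def start_cluster(n):
    # Start the controller; its start() deferred fires with the controller pid.
    cl = ControllerLauncher()
    dstart = cl.start()
    def delay_start(cont_pid):
        # Give the controller a moment to begin listening and to write the
        # furl files the engines need in order to connect.
        eset = LocalEngineSet()
        reactor.callLater(1.0, eset.start, n)
    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())

log.startLogging(sys.stdout)
reactor.callWhenRunning(start_cluster, 2)
reactor.run()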