|
|
#!/usr/bin/env python
|
|
|
# encoding: utf-8
|
|
|
|
|
|
"""Start an IPython cluster = (controller + engines)."""
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Copyright (C) 2008 The IPython Development Team
|
|
|
#
|
|
|
# Distributed under the terms of the BSD License. The full license is in
|
|
|
# the file COPYING, distributed as part of this software.
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Imports
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
import os
|
|
|
import re
|
|
|
import sys
|
|
|
import signal
|
|
|
pjoin = os.path.join
|
|
|
|
|
|
from twisted.internet import reactor, defer
|
|
|
from twisted.internet.protocol import ProcessProtocol
|
|
|
from twisted.internet.error import ProcessDone, ProcessTerminated
|
|
|
from twisted.internet.utils import getProcessOutput
|
|
|
from twisted.python import failure, log
|
|
|
|
|
|
from IPython.external import argparse
|
|
|
from IPython.external import Itpl
|
|
|
from IPython.genutils import get_ipython_dir, num_cpus
|
|
|
from IPython.kernel.fcutil import have_crypto
|
|
|
from IPython.kernel.error import SecurityError
|
|
|
from IPython.kernel.fcutil import have_crypto
|
|
|
from IPython.kernel.twistedutil import gatherBoth
|
|
|
from IPython.kernel.util import printer
|
|
|
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# General process handling code
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
def find_exe(cmd):
|
|
|
try:
|
|
|
import win32api
|
|
|
except ImportError:
|
|
|
raise ImportError('you need to have pywin32 installed for this to work')
|
|
|
else:
|
|
|
try:
|
|
|
(path, offest) = win32api.SearchPath(os.environ['PATH'],cmd + '.exe')
|
|
|
except:
|
|
|
(path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.bat')
|
|
|
return path
|
|
|
|
|
|
class ProcessStateError(Exception):
|
|
|
pass
|
|
|
|
|
|
class UnknownStatus(Exception):
|
|
|
pass
|
|
|
|
|
|
class LauncherProcessProtocol(ProcessProtocol):
|
|
|
"""
|
|
|
A ProcessProtocol to go with the ProcessLauncher.
|
|
|
"""
|
|
|
def __init__(self, process_launcher):
|
|
|
self.process_launcher = process_launcher
|
|
|
|
|
|
def connectionMade(self):
|
|
|
self.process_launcher.fire_start_deferred(self.transport.pid)
|
|
|
|
|
|
def processEnded(self, status):
|
|
|
value = status.value
|
|
|
if isinstance(value, ProcessDone):
|
|
|
self.process_launcher.fire_stop_deferred(0)
|
|
|
elif isinstance(value, ProcessTerminated):
|
|
|
self.process_launcher.fire_stop_deferred(
|
|
|
{'exit_code':value.exitCode,
|
|
|
'signal':value.signal,
|
|
|
'status':value.status
|
|
|
}
|
|
|
)
|
|
|
else:
|
|
|
raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
|
|
|
|
|
|
def outReceived(self, data):
|
|
|
log.msg(data)
|
|
|
|
|
|
def errReceived(self, data):
|
|
|
log.err(data)
|
|
|
|
|
|
class ProcessLauncher(object):
|
|
|
"""
|
|
|
Start and stop an external process in an asynchronous manner.
|
|
|
|
|
|
Currently this uses deferreds to notify other parties of process state
|
|
|
changes. This is an awkward design and should be moved to using
|
|
|
a formal NotificationCenter.
|
|
|
"""
|
|
|
def __init__(self, cmd_and_args):
|
|
|
self.cmd = cmd_and_args[0]
|
|
|
self.args = cmd_and_args
|
|
|
self._reset()
|
|
|
|
|
|
def _reset(self):
|
|
|
self.process_protocol = None
|
|
|
self.pid = None
|
|
|
self.start_deferred = None
|
|
|
self.stop_deferreds = []
|
|
|
self.state = 'before' # before, running, or after
|
|
|
|
|
|
@property
|
|
|
def running(self):
|
|
|
if self.state == 'running':
|
|
|
return True
|
|
|
else:
|
|
|
return False
|
|
|
|
|
|
def fire_start_deferred(self, pid):
|
|
|
self.pid = pid
|
|
|
self.state = 'running'
|
|
|
log.msg('Process %r has started with pid=%i' % (self.args, pid))
|
|
|
self.start_deferred.callback(pid)
|
|
|
|
|
|
def start(self):
|
|
|
if self.state == 'before':
|
|
|
self.process_protocol = LauncherProcessProtocol(self)
|
|
|
self.start_deferred = defer.Deferred()
|
|
|
self.process_transport = reactor.spawnProcess(
|
|
|
self.process_protocol,
|
|
|
self.cmd,
|
|
|
self.args,
|
|
|
env=os.environ
|
|
|
)
|
|
|
return self.start_deferred
|
|
|
else:
|
|
|
s = 'the process has already been started and has state: %r' % \
|
|
|
self.state
|
|
|
return defer.fail(ProcessStateError(s))
|
|
|
|
|
|
def get_stop_deferred(self):
|
|
|
if self.state == 'running' or self.state == 'before':
|
|
|
d = defer.Deferred()
|
|
|
self.stop_deferreds.append(d)
|
|
|
return d
|
|
|
else:
|
|
|
s = 'this process is already complete'
|
|
|
return defer.fail(ProcessStateError(s))
|
|
|
|
|
|
def fire_stop_deferred(self, exit_code):
|
|
|
log.msg('Process %r has stopped with %r' % (self.args, exit_code))
|
|
|
self.state = 'after'
|
|
|
for d in self.stop_deferreds:
|
|
|
d.callback(exit_code)
|
|
|
|
|
|
def signal(self, sig):
|
|
|
"""
|
|
|
Send a signal to the process.
|
|
|
|
|
|
The argument sig can be ('KILL','INT', etc.) or any signal number.
|
|
|
"""
|
|
|
if self.state == 'running':
|
|
|
self.process_transport.signalProcess(sig)
|
|
|
|
|
|
# def __del__(self):
|
|
|
# self.signal('KILL')
|
|
|
|
|
|
def interrupt_then_kill(self, delay=1.0):
|
|
|
self.signal('INT')
|
|
|
reactor.callLater(delay, self.signal, 'KILL')
|
|
|
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Code for launching controller and engines
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
class ControllerLauncher(ProcessLauncher):
|
|
|
|
|
|
def __init__(self, extra_args=None):
|
|
|
if sys.platform == 'win32':
|
|
|
# This logic is needed because the ipcontroller script doesn't
|
|
|
# always get installed in the same way or in the same location.
|
|
|
from IPython.kernel.scripts import ipcontroller
|
|
|
script_location = ipcontroller.__file__.replace('.pyc', '.py')
|
|
|
# The -u option here turns on unbuffered output, which is required
|
|
|
# on Win32 to prevent wierd conflict and problems with Twisted
|
|
|
args = [find_exe('python'), '-u', script_location]
|
|
|
else:
|
|
|
args = ['ipcontroller']
|
|
|
self.extra_args = extra_args
|
|
|
if extra_args is not None:
|
|
|
args.extend(extra_args)
|
|
|
|
|
|
ProcessLauncher.__init__(self, args)
|
|
|
|
|
|
|
|
|
class EngineLauncher(ProcessLauncher):
|
|
|
|
|
|
def __init__(self, extra_args=None):
|
|
|
if sys.platform == 'win32':
|
|
|
# This logic is needed because the ipcontroller script doesn't
|
|
|
# always get installed in the same way or in the same location.
|
|
|
from IPython.kernel.scripts import ipengine
|
|
|
script_location = ipengine.__file__.replace('.pyc', '.py')
|
|
|
# The -u option here turns on unbuffered output, which is required
|
|
|
# on Win32 to prevent wierd conflict and problems with Twisted
|
|
|
args = [find_exe('python'), '-u', script_location]
|
|
|
else:
|
|
|
args = ['ipengine']
|
|
|
self.extra_args = extra_args
|
|
|
if extra_args is not None:
|
|
|
args.extend(extra_args)
|
|
|
|
|
|
ProcessLauncher.__init__(self, args)
|
|
|
|
|
|
|
|
|
class LocalEngineSet(object):
|
|
|
|
|
|
def __init__(self, extra_args=None):
|
|
|
self.extra_args = extra_args
|
|
|
self.launchers = []
|
|
|
|
|
|
def start(self, n):
|
|
|
dlist = []
|
|
|
for i in range(n):
|
|
|
el = EngineLauncher(extra_args=self.extra_args)
|
|
|
d = el.start()
|
|
|
self.launchers.append(el)
|
|
|
dlist.append(d)
|
|
|
dfinal = gatherBoth(dlist, consumeErrors=True)
|
|
|
dfinal.addCallback(self._handle_start)
|
|
|
return dfinal
|
|
|
|
|
|
def _handle_start(self, r):
|
|
|
log.msg('Engines started with pids: %r' % r)
|
|
|
return r
|
|
|
|
|
|
def _handle_stop(self, r):
|
|
|
log.msg('Engines received signal: %r' % r)
|
|
|
return r
|
|
|
|
|
|
def signal(self, sig):
|
|
|
dlist = []
|
|
|
for el in self.launchers:
|
|
|
d = el.get_stop_deferred()
|
|
|
dlist.append(d)
|
|
|
el.signal(sig)
|
|
|
dfinal = gatherBoth(dlist, consumeErrors=True)
|
|
|
dfinal.addCallback(self._handle_stop)
|
|
|
return dfinal
|
|
|
|
|
|
def interrupt_then_kill(self, delay=1.0):
|
|
|
dlist = []
|
|
|
for el in self.launchers:
|
|
|
d = el.get_stop_deferred()
|
|
|
dlist.append(d)
|
|
|
el.interrupt_then_kill(delay)
|
|
|
dfinal = gatherBoth(dlist, consumeErrors=True)
|
|
|
dfinal.addCallback(self._handle_stop)
|
|
|
return dfinal
|
|
|
|
|
|
|
|
|
class BatchEngineSet(object):
|
|
|
|
|
|
# Subclasses must fill these in. See PBSEngineSet
|
|
|
submit_command = ''
|
|
|
delete_command = ''
|
|
|
job_id_regexp = ''
|
|
|
|
|
|
def __init__(self, template_file, **kwargs):
|
|
|
self.template_file = template_file
|
|
|
self.context = {}
|
|
|
self.context.update(kwargs)
|
|
|
self.batch_file = self.template_file+'-run'
|
|
|
|
|
|
def parse_job_id(self, output):
|
|
|
m = re.match(self.job_id_regexp, output)
|
|
|
if m is not None:
|
|
|
job_id = m.group()
|
|
|
else:
|
|
|
raise Exception("job id couldn't be determined: %s" % output)
|
|
|
self.job_id = job_id
|
|
|
log.msg('Job started with job id: %r' % job_id)
|
|
|
return job_id
|
|
|
|
|
|
def write_batch_script(self, n):
|
|
|
self.context['n'] = n
|
|
|
template = open(self.template_file, 'r').read()
|
|
|
log.msg('Using template for batch script: %s' % self.template_file)
|
|
|
script_as_string = Itpl.itplns(template, self.context)
|
|
|
log.msg('Writing instantiated batch script: %s' % self.batch_file)
|
|
|
f = open(self.batch_file,'w')
|
|
|
f.write(script_as_string)
|
|
|
f.close()
|
|
|
|
|
|
def handle_error(self, f):
|
|
|
f.printTraceback()
|
|
|
f.raiseException()
|
|
|
|
|
|
def start(self, n):
|
|
|
self.write_batch_script(n)
|
|
|
d = getProcessOutput(self.submit_command,
|
|
|
[self.batch_file],env=os.environ)
|
|
|
d.addCallback(self.parse_job_id)
|
|
|
d.addErrback(self.handle_error)
|
|
|
return d
|
|
|
|
|
|
def kill(self):
|
|
|
d = getProcessOutput(self.delete_command,
|
|
|
[self.job_id],env=os.environ)
|
|
|
return d
|
|
|
|
|
|
class PBSEngineSet(BatchEngineSet):
|
|
|
|
|
|
submit_command = 'qsub'
|
|
|
delete_command = 'qdel'
|
|
|
job_id_regexp = '\d+'
|
|
|
|
|
|
def __init__(self, template_file, **kwargs):
|
|
|
BatchEngineSet.__init__(self, template_file, **kwargs)
|
|
|
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Main functions for the different types of clusters
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
# TODO:
|
|
|
# The logic in these codes should be moved into classes like LocalCluster
|
|
|
# MpirunCluster, PBSCluster, etc. This would remove alot of the duplications.
|
|
|
# The main functions should then just parse the command line arguments, create
|
|
|
# the appropriate class and call a 'start' method.
|
|
|
|
|
|
def check_security(args, cont_args):
|
|
|
if (not args.x or not args.y) and not have_crypto:
|
|
|
log.err("""
|
|
|
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
|
|
|
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
|
|
|
reactor.stop()
|
|
|
return False
|
|
|
if args.x:
|
|
|
cont_args.append('-x')
|
|
|
if args.y:
|
|
|
cont_args.append('-y')
|
|
|
return True
|
|
|
|
|
|
def main_local(args):
|
|
|
cont_args = []
|
|
|
cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
|
|
|
|
|
|
# Check security settings before proceeding
|
|
|
if not check_security(args, cont_args):
|
|
|
return
|
|
|
|
|
|
cl = ControllerLauncher(extra_args=cont_args)
|
|
|
dstart = cl.start()
|
|
|
def start_engines(cont_pid):
|
|
|
engine_args = []
|
|
|
engine_args.append('--logfile=%s' % \
|
|
|
pjoin(args.logdir,'ipengine%s-' % cont_pid))
|
|
|
eset = LocalEngineSet(extra_args=engine_args)
|
|
|
def shutdown(signum, frame):
|
|
|
log.msg('Stopping local cluster')
|
|
|
# We are still playing with the times here, but these seem
|
|
|
# to be reliable in allowing everything to exit cleanly.
|
|
|
eset.interrupt_then_kill(0.5)
|
|
|
cl.interrupt_then_kill(0.5)
|
|
|
reactor.callLater(1.0, reactor.stop)
|
|
|
signal.signal(signal.SIGINT,shutdown)
|
|
|
d = eset.start(args.n)
|
|
|
return d
|
|
|
def delay_start(cont_pid):
|
|
|
# This is needed because the controller doesn't start listening
|
|
|
# right when it starts and the controller needs to write
|
|
|
# furl files for the engine to pick up
|
|
|
reactor.callLater(1.0, start_engines, cont_pid)
|
|
|
dstart.addCallback(delay_start)
|
|
|
dstart.addErrback(lambda f: f.raiseException())
|
|
|
|
|
|
def main_mpirun(args):
|
|
|
cont_args = []
|
|
|
cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
|
|
|
|
|
|
# Check security settings before proceeding
|
|
|
if not check_security(args, cont_args):
|
|
|
return
|
|
|
|
|
|
cl = ControllerLauncher(extra_args=cont_args)
|
|
|
dstart = cl.start()
|
|
|
def start_engines(cont_pid):
|
|
|
raw_args = ['mpirun']
|
|
|
raw_args.extend(['-n',str(args.n)])
|
|
|
raw_args.append('ipengine')
|
|
|
raw_args.append('-l')
|
|
|
raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
|
|
|
if args.mpi:
|
|
|
raw_args.append('--mpi=%s' % args.mpi)
|
|
|
eset = ProcessLauncher(raw_args)
|
|
|
def shutdown(signum, frame):
|
|
|
log.msg('Stopping local cluster')
|
|
|
# We are still playing with the times here, but these seem
|
|
|
# to be reliable in allowing everything to exit cleanly.
|
|
|
eset.interrupt_then_kill(1.0)
|
|
|
cl.interrupt_then_kill(1.0)
|
|
|
reactor.callLater(2.0, reactor.stop)
|
|
|
signal.signal(signal.SIGINT,shutdown)
|
|
|
d = eset.start()
|
|
|
return d
|
|
|
def delay_start(cont_pid):
|
|
|
# This is needed because the controller doesn't start listening
|
|
|
# right when it starts and the controller needs to write
|
|
|
# furl files for the engine to pick up
|
|
|
reactor.callLater(1.0, start_engines, cont_pid)
|
|
|
dstart.addCallback(delay_start)
|
|
|
dstart.addErrback(lambda f: f.raiseException())
|
|
|
|
|
|
def main_pbs(args):
|
|
|
cont_args = []
|
|
|
cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
|
|
|
|
|
|
# Check security settings before proceeding
|
|
|
if not check_security(args, cont_args):
|
|
|
return
|
|
|
|
|
|
cl = ControllerLauncher(extra_args=cont_args)
|
|
|
dstart = cl.start()
|
|
|
def start_engines(r):
|
|
|
pbs_set = PBSEngineSet(args.pbsscript)
|
|
|
def shutdown(signum, frame):
|
|
|
log.msg('Stopping pbs cluster')
|
|
|
d = pbs_set.kill()
|
|
|
d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
|
|
|
d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
|
|
|
signal.signal(signal.SIGINT,shutdown)
|
|
|
d = pbs_set.start(args.n)
|
|
|
return d
|
|
|
dstart.addCallback(start_engines)
|
|
|
dstart.addErrback(lambda f: f.raiseException())
|
|
|
|
|
|
|
|
|
def get_args():
|
|
|
base_parser = argparse.ArgumentParser(add_help=False)
|
|
|
base_parser.add_argument(
|
|
|
'-x',
|
|
|
action='store_true',
|
|
|
dest='x',
|
|
|
help='turn off client security'
|
|
|
)
|
|
|
base_parser.add_argument(
|
|
|
'-y',
|
|
|
action='store_true',
|
|
|
dest='y',
|
|
|
help='turn off engine security'
|
|
|
)
|
|
|
base_parser.add_argument(
|
|
|
"--logdir",
|
|
|
type=str,
|
|
|
dest="logdir",
|
|
|
help="directory to put log files (default=$IPYTHONDIR/log)",
|
|
|
default=pjoin(get_ipython_dir(),'log')
|
|
|
)
|
|
|
base_parser.add_argument(
|
|
|
"-n",
|
|
|
"--num",
|
|
|
type=int,
|
|
|
dest="n",
|
|
|
default=2,
|
|
|
help="the number of engines to start"
|
|
|
)
|
|
|
|
|
|
parser = argparse.ArgumentParser(
|
|
|
description='IPython cluster startup. This starts a controller and\
|
|
|
engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
|
|
|
THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
|
|
|
)
|
|
|
subparsers = parser.add_subparsers(
|
|
|
help='available cluster types. For help, do "ipcluster TYPE --help"')
|
|
|
|
|
|
parser_local = subparsers.add_parser(
|
|
|
'local',
|
|
|
help='run a local cluster',
|
|
|
parents=[base_parser]
|
|
|
)
|
|
|
parser_local.set_defaults(func=main_local)
|
|
|
|
|
|
parser_mpirun = subparsers.add_parser(
|
|
|
'mpirun',
|
|
|
help='run a cluster using mpirun',
|
|
|
parents=[base_parser]
|
|
|
)
|
|
|
parser_mpirun.add_argument(
|
|
|
"--mpi",
|
|
|
type=str,
|
|
|
dest="mpi", # Don't put a default here to allow no MPI support
|
|
|
help="how to call MPI_Init (default=mpi4py)"
|
|
|
)
|
|
|
parser_mpirun.set_defaults(func=main_mpirun)
|
|
|
|
|
|
parser_pbs = subparsers.add_parser(
|
|
|
'pbs',
|
|
|
help='run a pbs cluster',
|
|
|
parents=[base_parser]
|
|
|
)
|
|
|
parser_pbs.add_argument(
|
|
|
'--pbs-script',
|
|
|
type=str,
|
|
|
dest='pbsscript',
|
|
|
help='PBS script template',
|
|
|
default='pbs.template'
|
|
|
)
|
|
|
parser_pbs.set_defaults(func=main_pbs)
|
|
|
args = parser.parse_args()
|
|
|
return args
|
|
|
|
|
|
def main():
|
|
|
args = get_args()
|
|
|
reactor.callWhenRunning(args.func, args)
|
|
|
log.startLogging(sys.stdout)
|
|
|
reactor.run()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
main()
|
|
|
|