# ---------------------------------------------------------------------------
# diff --git a/IPython/config/default/ipcluster_config.py
#            b/IPython/config/default/ipcluster_config.py
# new file mode 100644 (index 0000000..d4ee051)
# ---------------------------------------------------------------------------

import os

# NOTE(review): ``get_config`` is not defined or imported in this file; it is
# presumably injected into the namespace by the config loader -- confirm.
c = get_config()

# Every setting below is commented out and shown as a template; uncomment
# and edit a line to override it.

# Options are:
# * LocalControllerLauncher
# * MPIExecControllerLauncher
# * PBSControllerLauncher
# c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'

# Options are:
# * LocalEngineSetLauncher
# * MPIExecEngineSetLauncher
# * PBSEngineSetLauncher
# c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher'

# c.Global.log_to_file = False
# c.Global.n = 2
# c.Global.reset_config = False

# c.MPIExecLauncher.mpi_cmd = ['mpiexec']
# c.MPIExecLauncher.mpi_args = []
# c.MPIExecLauncher.program = []
# c.MPIExecLauncher.program_args = []
# c.MPIExecLauncher.n = 1

# c.SSHLauncher.ssh_cmd = ['ssh']
# c.SSHLauncher.ssh_args = []
# c.SSHLauncher.program = []
# c.SSHLauncher.program_args = []
# c.SSHLauncher.hostname = ''
# c.SSHLauncher.user = os.environ['USER']

# c.PBSLauncher.submit_command = 'qsub'
# c.PBSLauncher.delete_command = 'qdel'
# c.PBSLauncher.job_id_regexp = r'\d+'
# c.PBSLauncher.batch_template = """"""
# c.PBSLauncher.batch_file_name = u'pbs_batch_script'

# c.LocalControllerLauncher.controller_args = []

# c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
# c.MPIExecControllerLauncher.mpi_args = []
# c.MPIExecControllerLauncher.controller_args = []
# c.MPIExecControllerLauncher.n = 1

# c.PBSControllerLauncher.submit_command = 'qsub'
# c.PBSControllerLauncher.delete_command = 'qdel'
# c.PBSControllerLauncher.job_id_regexp = r'\d+'
# c.PBSControllerLauncher.batch_template = """"""
# c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script'

# c.LocalEngineLauncher.engine_args = []

# c.LocalEngineSetLauncher.engine_args = []

# c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
# c.MPIExecEngineSetLauncher.mpi_args = []
# c.MPIExecEngineSetLauncher.engine_args = []
# c.MPIExecEngineSetLauncher.n = 1

# c.PBSEngineSetLauncher.submit_command = 'qsub'
# c.PBSEngineSetLauncher.delete_command = 'qdel'
# c.PBSEngineSetLauncher.job_id_regexp = r'\d+'
# c.PBSEngineSetLauncher.batch_template = """"""
# c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script'


# ---------------------------------------------------------------------------
# diff --git a/IPython/kernel/ipclusterapp.py b/IPython/kernel/ipclusterapp.py
# new file mode 100644 (index 0000000..3401375)
# ---------------------------------------------------------------------------
#!/usr/bin/env python
# encoding: utf-8
"""
The ipcluster application.
"""

#-----------------------------------------------------------------------------
#  Copyright (C) 2008-2009  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

import logging
import os
import signal
import sys

from IPython.core import release
from IPython.external import argparse
from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
from IPython.utils.importstring import import_item

from IPython.kernel.clusterdir import (
    ApplicationWithClusterDir, ClusterDirError
)

from twisted.internet import reactor, defer
from twisted.python import log

#-----------------------------------------------------------------------------
# Code for launchers
#-----------------------------------------------------------------------------



#-----------------------------------------------------------------------------
# The ipcluster application
#-----------------------------------------------------------------------------


class IPClusterCLLoader(ArgParseConfigLoader):
    """Command line loader defining the ``list``/``create``/``start`` subcommands."""

    def _add_arguments(self):
        """Register the ipcluster subcommands and their options on ``self.parser``.

        Two option-only parent parsers are built and shared between the
        subcommands through ``parents=[...]``.  Every option defaults to
        ``NoConfigDefault``.
        """
        # This has all the common options that all subcommands use
        parent_parser1 = argparse.ArgumentParser(add_help=False)
        parent_parser1.add_argument('-ipythondir', '--ipython-dir',
            dest='Global.ipythondir', type=str,
            help='Set to override default location of Global.ipythondir.',
            default=NoConfigDefault,
            metavar='Global.ipythondir')
        parent_parser1.add_argument('-log_level', '--log-level',
            dest="Global.log_level", type=int,
            help='Set the log level (0,10,20,30,40,50).  Default is 30.',
            default=NoConfigDefault,
            metavar='Global.log_level')

        # This has all the common options that other subcommands use.
        # ``default=`` is passed exactly once per call: repeating a keyword
        # argument is a SyntaxError.
        parent_parser2 = argparse.ArgumentParser(add_help=False)
        parent_parser2.add_argument('-p', '-profile', '--profile',
            dest='Global.profile', type=str,
            help='The string name of the profile to be used. This determines '
            'the name of the cluster dir as: cluster_<profile>. The default '
            'profile is named "default".  The cluster directory is resolved '
            'this way if the --cluster-dir option is not used.',
            default=NoConfigDefault,
            metavar='Global.profile')
        parent_parser2.add_argument('-cluster_dir', '--cluster-dir',
            dest='Global.cluster_dir', type=str,
            help='Set the cluster dir. This overrides the logic used by the '
            '--profile option.',
            default=NoConfigDefault,
            metavar='Global.cluster_dir')
        parent_parser2.add_argument('--log-to-file',
            action='store_true', dest='Global.log_to_file',
            default=NoConfigDefault,
            help='Log to a file in the log directory (default is stdout)'
        )

        subparsers = self.parser.add_subparsers(
            dest='Global.subcommand',
            title='ipcluster subcommands',
            description='ipcluster has a variety of subcommands. '
            'The general way of running ipcluster is "ipcluster <cmd> '
            '[options]".',
            help='For more help, type "ipcluster <cmd> -h"')

        # The parser objects returned for 'list' needs no further options.
        subparsers.add_parser(
            'list',
            help='List all clusters in cwd and ipythondir.',
            parents=[parent_parser1]
        )

        parser_create = subparsers.add_parser(
            'create',
            help='Create a new cluster directory.',
            parents=[parent_parser1, parent_parser2]
        )
        parser_create.add_argument(
            '--reset-config',
            dest='Global.reset_config', action='store_true',
            default=NoConfigDefault,
            help='Recopy the default config files to the cluster directory. '
            'You will lose any modifications you have made to these files.'
        )

        parser_start = subparsers.add_parser(
            'start',
            help='Start a cluster.',
            parents=[parent_parser1, parent_parser2]
        )
        parser_start.add_argument(
            '-n', '--number',
            type=int, dest='Global.n',
            default=NoConfigDefault,
            help='The number of engines to start.',
            metavar='Global.n'
        )


default_config_file_name = 'ipcluster_config.py'


class IPClusterApp(ApplicationWithClusterDir):
    """Application object behind the ``ipcluster`` command.

    The behaviour of each lifecycle hook depends on ``Global.subcommand``
    (``list``, ``create`` or ``start``).
    """

    name = 'ipcluster'
    description = 'Start an IPython cluster (controller and engines).'
    config_file_name = default_config_file_name
    default_log_level = logging.INFO
    auto_create_cluster_dir = False

    def create_default_config(self):
        """Extend the inherited default config with the ipcluster defaults."""
        super(IPClusterApp, self).create_default_config()
        self.default_config.Global.controller_launcher = \
            'IPython.kernel.launcher.LocalControllerLauncher'
        self.default_config.Global.engine_launcher = \
            'IPython.kernel.launcher.LocalEngineSetLauncher'
        self.default_config.Global.log_to_file = False
        self.default_config.Global.n = 2
        self.default_config.Global.reset_config = False

    def create_command_line_config(self):
        """Create and return a command line config loader."""
        return IPClusterCLLoader(
            description=self.description,
            version=release.version
        )

    def find_resources(self):
        """Locate (or create) the cluster directory for the chosen subcommand.

        * ``list``: print the cluster dirs and exit immediately.
        * ``create``: enable ``auto_create_cluster_dir`` and defer to the
          parent class.
        * ``start``: require an existing cluster dir; a ``ClusterDirError``
          from the parent class is re-raised with a more helpful message.
        """
        subcommand = self.command_line_config.Global.subcommand
        if subcommand == 'list':
            self.list_cluster_dirs()
            # Exit immediately because there is nothing left to do.
            self.exit()
        elif subcommand == 'create':
            self.auto_create_cluster_dir = True
            super(IPClusterApp, self).find_resources()
        elif subcommand == 'start':
            self.auto_create_cluster_dir = False
            try:
                super(IPClusterApp, self).find_resources()
            except ClusterDirError:
                raise ClusterDirError(
                    "Could not find a cluster directory. A cluster dir must "
                    "be created before running 'ipcluster start'.  Do "
                    "'ipcluster create -h' or 'ipcluster list -h' for more "
                    "information about creating and listing cluster dirs."
                )

    def construct(self):
        """Do the per-subcommand setup that precedes :meth:`start_app`."""
        config = self.master_config
        if config.Global.subcommand == 'list':
            pass
        elif config.Global.subcommand == 'create':
            self.log.info('Copying default config files to cluster directory '
            '[overwrite=%r]' % (config.Global.reset_config,))
            self.cluster_dir_obj.copy_all_config_files(
                overwrite=config.Global.reset_config
            )
        elif config.Global.subcommand == 'start':
            self.start_logging()
            # The launchers are only started once the reactor is running.
            reactor.callWhenRunning(self.start_launchers)

    def list_cluster_dirs(self):
        """Print a start command for every ``cluster_*`` directory found.

        The search covers the current working directory, the default
        ``Global.ipythondir`` and every entry of the ``IPCLUSTERDIR_PATH``
        environment variable (separated by ``os.pathsep``).  Search paths
        that are not existing directories are skipped.
        """
        extra_paths = os.environ.get('IPCLUSTERDIR_PATH', '')
        # os.pathsep is ':' on POSIX (the previously hard-coded value) and
        # ';' on Windows, where ':' would split drive letters.
        cluster_dir_paths = extra_paths.split(os.pathsep) if extra_paths else []
        # TODO: We need to look both in default_config and
        # command_line_config!!!
        paths = [os.getcwd(), self.default_config.Global.ipythondir] + \
            cluster_dir_paths
        self.log.info('Searching for cluster dirs in paths: %r' % paths)
        prefix = 'cluster_'
        for path in paths:
            if not os.path.isdir(path):
                # os.listdir would raise on a missing search path.
                continue
            for f in os.listdir(path):
                full_path = os.path.join(path, f)
                if os.path.isdir(full_path) and f.startswith(prefix):
                    # Strip only the leading prefix of the directory name so
                    # underscores in the profile or parent path are harmless.
                    profile = f[len(prefix):]
                    start_cmd = '"ipcluster start -n 4 -p %s"' % profile
                    # Single-argument call form: valid as both the Python 2
                    # print statement and the Python 3 print function.
                    print(start_cmd + " ==> " + full_path)

    def start_logging(self):
        """Start Twisted logging to a per-pid log file or to stdout."""
        if self.master_config.Global.log_to_file:
            log_filename = self.name + '-' + str(os.getpid()) + '.log'
            logfile = os.path.join(self.log_dir, log_filename)
            # Handed to the Twisted log observer; not closed here.
            open_log_file = open(logfile, 'w')
        else:
            open_log_file = sys.stdout
        log.startLogging(open_log_file)

    def start_launchers(self):
        """Instantiate the configured launchers and start the controller.

        The engines are started from a callback once the controller's start
        deferred fires.  SIGINT is routed to :meth:`stop_launchers`.
        """
        config = self.master_config

        # Create the launchers
        el_class = import_item(config.Global.engine_launcher)
        self.engine_launcher = el_class(
            self.cluster_dir, config=config
        )
        cl_class = import_item(config.Global.controller_launcher)
        self.controller_launcher = cl_class(
            self.cluster_dir, config=config
        )

        # Setup signals
        signal.signal(signal.SIGINT, self.stop_launchers)

        # Setup the observing of stopping: when the controller stops, stop
        # the engines as well.
        d1 = self.controller_launcher.observe_stop()
        d1.addCallback(self.stop_engines)
        d1.addErrback(self.err_and_stop)
        # If this triggers, just let them die
        # d2 = self.engine_launcher.observe_stop()

        # Start the controller and engines
        d = self.controller_launcher.start(
            profile=None, cluster_dir=config.Global.cluster_dir
        )
        d.addCallback(lambda _: self.start_engines())
        d.addErrback(self.err_and_stop)

    def err_and_stop(self, f):
        """Errback: log the failure ``f`` and stop the reactor."""
        log.msg('Unexpected error in ipcluster:')
        log.err(f)
        reactor.stop()

    def stop_engines(self, r):
        """Callback: stop the engine launcher; ``r`` is ignored."""
        return self.engine_launcher.stop()

    def start_engines(self):
        """Start ``Global.n`` engines and return the launcher's deferred."""
        config = self.master_config
        d = self.engine_launcher.start(
            config.Global.n,
            profile=None, cluster_dir=config.Global.cluster_dir
        )
        return d

    def stop_launchers(self, signum, frame):
        """SIGINT handler: stop the engines, then the controller.

        The reactor is stopped two seconds later regardless of whether the
        launchers have finished shutting down.
        """
        log.msg("Stopping cluster")
        d1 = self.engine_launcher.stop()
        # The controller's stop() must actually be *called*; returning the
        # bound method would leave the controller running.
        d1.addCallback(lambda _: self.controller_launcher.stop())
        d1.addErrback(self.err_and_stop)
        reactor.callLater(2.0, reactor.stop)

    def start_app(self):
        """Run the reactor for ``start``; ``create``/``list`` have no main loop."""
        config = self.master_config
        if config.Global.subcommand in ('create', 'list'):
            return
        elif config.Global.subcommand == 'start':
            reactor.run()


def launch_new_instance():
    """Create and run the IPython cluster."""
    app = IPClusterApp()
    app.start()


if __name__ == '__main__':
    launch_new_instance()


# ---------------------------------------------------------------------------
# diff --git a/IPython/kernel/launcher.py b/IPython/kernel/launcher.py
# new file mode 100644 (index 0000000..9fe0826)
# ---------------------------------------------------------------------------
#!/usr/bin/env python
# encoding: utf-8
"""
Facilities for launching processes asynchronously.
"""

#-----------------------------------------------------------------------------
#  Copyright (C) 2008-2009  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

import os
import re
import sys

from IPython.core.component import Component
from IPython.external import Itpl
from IPython.utils.traitlets import Str, Int, List, Unicode
from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred

from twisted.internet import reactor, defer
from twisted.internet.defer import inlineCallbacks
from twisted.internet.protocol import ProcessProtocol
from twisted.internet.utils import getProcessOutput
from twisted.internet.error import ProcessDone, ProcessTerminated
from twisted.python import log
from twisted.python.failure import Failure

#-----------------------------------------------------------------------------
# Generic launchers
#-----------------------------------------------------------------------------


class LauncherError(Exception):
    """Base class for the errors raised by the launchers in this module."""
    pass


class ProcessStateError(LauncherError):
    """An operation was attempted in the wrong process state."""
    pass


class UnknownStatus(LauncherError):
    """A process ended with a status that is neither done nor terminated."""
    pass


class BaseLauncher(Component):
    """An abstraction for starting, stopping and signaling a process."""

    working_dir = Unicode(u'')

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(BaseLauncher, self).__init__(parent, name, config)
        self.working_dir = working_dir
        self.state = 'before' # can be before, running, after
        self.stop_deferreds = []
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """A list of cmd and args that will be used to start the process."""
        return self.find_args()

    def find_args(self):
        """The ``.args`` property calls this to find the args list."""
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The string form of the program arguments."""
        # Subclasses put non-string items (e.g. the integer ``n``) in the
        # args list, so convert each item before joining.
        return ' '.join([str(a) for a in self.args])

    @property
    def running(self):
        """True while the launcher is in the ``'running'`` state."""
        return self.state == 'running'

    def start(self):
        """Start the process.

        This must return a deferred that fires with information about the
        process starting (like a pid, job id, etc.)
        """
        return defer.fail(
            Failure(NotImplementedError(
                'start must be implemented in a subclass')
            )
        )

    def stop(self):
        """Stop the process and notify observers of ProcessStopped.

        This must return a deferred that fires with any errors that occur
        while the process is attempting to be shut down. This deferred
        won't fire when the process actually stops. These events are
        handled by calling :func:`observe_stop`.
        """
        return defer.fail(
            Failure(NotImplementedError(
                'stop must be implemented in a subclass')
            )
        )

    def observe_stop(self):
        """Get a deferred that will fire when the process stops.

        The deferred will fire with data that contains information about
        the exit status of the process.
        """
        if self.state == 'after':
            # Already stopped: fire immediately with the recorded data.
            return defer.succeed(self.stop_data)
        else:
            d = defer.Deferred()
            self.stop_deferreds.append(d)
            return d

    def notify_start(self, data):
        """Call this to trigger startup actions.

        This logs the process startup and sets the state to running.  It is
        a pass-through so it can be used as a callback.
        """

        log.msg('Process %r started: %r' % (self.args[0], data))
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Call this to trigger all the deferreds from :func:`observe_stop`."""

        log.msg('Process %r stopped: %r' % (self.args[0], data))
        self.stop_data = data
        self.state = 'after'
        # Fire (and discard) every pending observer, last registered first.
        while self.stop_deferreds:
            d = self.stop_deferreds.pop()
            d.callback(data)
        return data

    def signal(self, sig):
        """Signal the process.

        Return a semi-meaningless deferred after signaling the process.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        return defer.fail(
            Failure(NotImplementedError(
                'signal must be implemented in a subclass')
            )
        )


class LocalProcessLauncherProtocol(ProcessProtocol):
    """A ProcessProtocol to go with the LocalProcessLauncher."""

    def __init__(self, process_launcher):
        self.process_launcher = process_launcher
        self.pid = None

    def connectionMade(self):
        """Record the pid and tell the launcher the process has started."""
        self.pid = self.transport.pid
        self.process_launcher.notify_start(self.transport.pid)

    def processEnded(self, status):
        """Translate the exit ``status`` into a dict and notify the launcher.

        Raises ``UnknownStatus`` if ``status.value`` is neither
        ``ProcessDone`` nor ``ProcessTerminated``.
        """
        value = status.value
        if isinstance(value, ProcessDone):
            self.process_launcher.notify_stop(
                {'exit_code':0,
                 'signal':None,
                 'status':None,
                 'pid':self.pid
                }
            )
        elif isinstance(value, ProcessTerminated):
            self.process_launcher.notify_stop(
                {'exit_code':value.exitCode,
                 'signal':value.signal,
                 'status':value.status,
                 'pid':self.pid
                }
            )
        else:
            raise UnknownStatus("Unknown exit status, this is probably a "
                                "bug in Twisted")

    def outReceived(self, data):
        """Forward the child's stdout to the Twisted log."""
        log.msg(data)

    def errReceived(self, data):
        """Forward the child's stderr to the Twisted log as an error."""
        log.err(data)


class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner."""

    cmd_and_args = List([])

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(LocalProcessLauncher, self).__init__(
            working_dir, parent, name, config
        )
        self.process_protocol = None
        self.process_transport = None
        self.start_deferred = None

    def find_args(self):
        return self.cmd_and_args

    def start(self):
        """Spawn the process described by ``self.args``.

        Returns a deferred that fires with the pid once the process has
        started, or a failed deferred wrapping ``ProcessStateError`` if the
        launcher is not in the ``'before'`` state.
        """
        if self.state == 'before':
            self.process_protocol = LocalProcessLauncherProtocol(self)
            self.start_deferred = defer.Deferred()
            self.process_transport = reactor.spawnProcess(
                self.process_protocol,
                str(self.args[0]),
                [str(a) for a in self.args],
                env=os.environ
            )
            return self.start_deferred
        else:
            s = 'The process was already started and has state: %r' % self.state
            return defer.fail(ProcessStateError(s))

    def notify_start(self, data):
        """Run the base startup actions, then fire the start deferred."""
        super(LocalProcessLauncher, self).notify_start(data)
        self.start_deferred.callback(data)
        # Stay a pass-through like the base class implementation.
        return data

    def stop(self):
        return self.interrupt_then_kill()

    # NOTE(review): ``make_deferred`` is a project helper; presumably it
    # wraps the return value in a deferred -- confirm.
    @make_deferred
    def signal(self, sig):
        """Send ``sig`` to the process if it is running; otherwise do nothing."""
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    @inlineCallbacks
    def interrupt_then_kill(self, delay=1.0):
        """Send INT, wait ``delay`` (via ``sleep_deferred``), then send KILL."""
        yield self.signal('INT')
        yield sleep_deferred(delay)
        yield self.signal('KILL')


class MPIExecLauncher(LocalProcessLauncher):
    """Launch ``n`` copies of ``program`` through ``mpi_cmd``."""

    mpi_cmd = List(['mpiexec'], config=True)
    mpi_args = List([], config=True)
    program = List(['date'], config=True)
    program_args = List([], config=True)
    n = Int(1, config=True)

    def find_args(self):
        # ``self.n`` is kept as an int; it is converted with str() when the
        # process is spawned and when ``arg_str`` is built.
        return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
               self.program + self.program_args

    def start(self, n):
        """Start the process with ``n`` stored as the ``-n`` value."""
        self.n = n
        return super(MPIExecLauncher, self).start()


class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables.  There could be other things this needs
    as well.

    This derives from :class:`LocalProcessLauncher` so that :meth:`start`
    actually spawns the ``ssh`` command line built by :meth:`find_args`;
    ``BaseLauncher.start`` only returns a NotImplementedError failure.
    """

    ssh_cmd = List(['ssh'], config=True)
    ssh_args = List([], config=True)
    program = List(['date'], config=True)
    program_args = List([], config=True)
    hostname = Str('', config=True)
    # USER may be unset (e.g. on Windows, which uses USERNAME); a plain
    # os.environ['USER'] would raise KeyError when this module is imported.
    user = Str(os.environ.get('USER', os.environ.get('USERNAME', '')),
               config=True)
    location = Str('')

    def _hostname_changed(self, name, old, new):
        self.location = '%s@%s' % (self.user, new)

    def _user_changed(self, name, old, new):
        self.location = '%s@%s' % (new, self.hostname)

    def find_args(self):
        return self.ssh_cmd + self.ssh_args + [self.location] + \
               self.program + self.program_args

    def start(self, n, hostname=None, user=None):
        """Start the ssh process, optionally overriding hostname and user.

        ``n`` is accepted for signature parity with the other launchers but
        is not used.
        """
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user
        return super(SSHLauncher, self).start()


class WindowsHPCLauncher(BaseLauncher):
    """Placeholder; not implemented."""
    pass


class BatchSystemLauncher(BaseLauncher):
    """Launch a job by writing a batch script and submitting it.

    The script is ``batch_template`` instantiated with ``self.context``
    (which always contains ``n``) and written to ``batch_file``.
    """

    # Subclasses must fill these in.  See PBSEngineSet
    submit_command = Str('', config=True)
    delete_command = Str('', config=True)
    job_id_regexp = Str('', config=True)
    batch_template = Str('', config=True)
    batch_file_name = Unicode(u'batch_script', config=True)
    batch_file = Unicode(u'')

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(BatchSystemLauncher, self).__init__(
            working_dir, parent, name, config
        )
        self.batch_file = os.path.join(self.working_dir, self.batch_file_name)
        self.context = {}
        # Set by parse_job_id() once the job has been submitted.
        self.job_id = None

    def find_args(self):
        """Return the submit command line.

        ``notify_start``/``notify_stop`` and the subclasses' log messages
        read ``self.args``; without this override they would hit the
        NotImplementedError raised by ``BaseLauncher.find_args``.
        """
        return [self.submit_command, self.batch_file]

    def parse_job_id(self, output):
        """Extract, store and return the job id from the submit ``output``.

        Raises ``LauncherError`` if ``job_id_regexp`` does not match at the
        start of ``output``.
        """
        m = re.match(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate the batch template with ``n`` and write it to disk."""
        self.context['n'] = n
        script_as_string = Itpl.itplns(self.batch_template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        f = open(self.batch_file, 'w')
        try:
            f.write(script_as_string)
        finally:
            # Close the handle even if the write fails.
            f.close()

    @inlineCallbacks
    def start(self, n):
        """Start n copies of the process using a batch system."""
        self.write_batch_script(n)
        output = yield getProcessOutput(self.submit_command,
            [self.batch_file], env=os.environ)
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        defer.returnValue(job_id)

    @inlineCallbacks
    def stop(self):
        """Delete the submitted job and fire the stop observers.

        The returned deferred fails with ``ProcessStateError`` if no job has
        been submitted yet.
        """
        if self.job_id is None:
            raise ProcessStateError(
                'The job cannot be stopped because it was never started'
            )
        output = yield getProcessOutput(self.delete_command,
            [self.job_id], env=os.environ
        )
        self.notify_stop(output)  # Pass the output of the kill cmd
        defer.returnValue(output)


class PBSLauncher(BatchSystemLauncher):
    """Batch launcher preconfigured for PBS (``qsub``/``qdel``)."""

    submit_command = Str('qsub', config=True)
    delete_command = Str('qdel', config=True)
    # Raw string: '\d' in a normal literal is an invalid escape sequence.
    job_id_regexp = Str(r'\d+', config=True)
    batch_template = Str('', config=True)
    batch_file_name = Unicode(u'pbs_batch_script', config=True)
    batch_file = Unicode(u'')


#-----------------------------------------------------------------------------
# Controller launchers
#-----------------------------------------------------------------------------

def find_controller_cmd():
    """Return the command list used to run the ipcontroller script."""
    if sys.platform == 'win32':
        # This logic is needed because the ipcontroller script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel import ipcontrollerapp
        script_location = ipcontrollerapp.__file__.replace('.pyc', '.py')
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent weird conflicts and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        cmd = [sys.executable, '-u', script_location]
    else:
        # ipcontroller has to be on the PATH in this case.
        cmd = ['ipcontroller']
    return cmd


class LocalControllerLauncher(LocalProcessLauncher):
    """Run the controller as a local process."""

    controller_cmd = List(find_controller_cmd())
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)

    def find_args(self):
        return self.controller_cmd + self.controller_args

    def start(self, profile=None, cluster_dir=None):
        """Append ``--cluster-dir``/``--profile`` as given and start."""
        extra_args = []
        if cluster_dir is not None:
            extra_args.extend(['--cluster-dir', cluster_dir])
        if profile is not None:
            extra_args.extend(['--profile', profile])
        if extra_args:
            # Rebind instead of extending in place so that a list object
            # shared with anything else (class default, config) is never
            # mutated.
            self.controller_args = list(self.controller_args) + extra_args
        log.msg("Starting LocalControllerLauncher: %r" % self.args)
        return super(LocalControllerLauncher, self).start()


class WindowsHPCControllerLauncher(WindowsHPCLauncher):
    """Placeholder; not implemented."""
    pass


class MPIExecControllerLauncher(MPIExecLauncher):
    """Run a single controller through ``mpi_cmd``."""

    controller_cmd = List(find_controller_cmd(), config=False)
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)
    n = Int(1, config=False)

    def start(self, profile=None, cluster_dir=None):
        """Append ``--cluster-dir``/``--profile`` as given and start one copy."""
        extra_args = []
        if cluster_dir is not None:
            extra_args.extend(['--cluster-dir', cluster_dir])
        if profile is not None:
            extra_args.extend(['--profile', profile])
        if extra_args:
            # Rebind instead of extending in place (see
            # LocalControllerLauncher.start).
            self.controller_args = list(self.controller_args) + extra_args
        log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
        return super(MPIExecControllerLauncher, self).start(1)

    def find_args(self):
        return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
               self.controller_cmd + self.controller_args


class PBSControllerLauncher(PBSLauncher):
    """Submit the controller as a PBS job."""

    def start(self, profile=None, cluster_dir=None):
        # Here we save profile and cluster_dir in the context so they
        # can be used in the batch script template as ${profile} and
        # ${cluster_dir}
        if cluster_dir is not None:
            self.context['cluster_dir'] = cluster_dir
        if profile is not None:
            self.context['profile'] = profile
        log.msg("Starting PBSControllerLauncher: %r" % self.args)
        return super(PBSControllerLauncher, self).start(1)


class SSHControllerLauncher(SSHLauncher):
    """Placeholder; adds nothing to :class:`SSHLauncher` yet."""
    pass


#-----------------------------------------------------------------------------
# Engine launchers
#-----------------------------------------------------------------------------


def find_engine_cmd():
    """Return the command list used to run the ipengine script."""
    if sys.platform == 'win32':
        # This logic is needed because the ipengine script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel import ipengineapp
        script_location = ipengineapp.__file__.replace('.pyc', '.py')
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent weird conflicts and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        cmd = [sys.executable, '-u', script_location]
    else:
        # ipengine has to be on the PATH in this case.
        cmd = ['ipengine']
    return cmd


class LocalEngineLauncher(LocalProcessLauncher):
    """Run a single engine as a local process."""

    engine_cmd = List(find_engine_cmd())
    engine_args = List(['--log-to-file','--log-level', '40'], config=True)

    def find_args(self):
        return self.engine_cmd + self.engine_args

    def start(self, profile=None, cluster_dir=None):
        """Append ``--cluster-dir``/``--profile`` as given and start."""
        extra_args = []
        if cluster_dir is not None:
            extra_args.extend(['--cluster-dir', cluster_dir])
        if profile is not None:
            extra_args.extend(['--profile', profile])
        if extra_args:
            # Rebind instead of extending in place so that a list object
            # shared with anything else is never mutated.
            self.engine_args = list(self.engine_args) + extra_args
        return super(LocalEngineLauncher, self).start()


class LocalEngineSetLauncher(BaseLauncher):
    """Run ``n`` engines, each through its own :class:`LocalEngineLauncher`."""

    engine_args = List(['--log-to-file','--log-level', '40'], config=True)

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(LocalEngineSetLauncher, self).__init__(
            working_dir, parent, name, config
        )
        self.launchers = []

    def start(self, n, profile=None, cluster_dir=None):
        """Start ``n`` engines; return the gathered start deferred."""
        # Imported once here rather than on every loop iteration.
        import copy
        dlist = []
        for i in range(n):
            el = LocalEngineLauncher(self.working_dir, self)
            # Copy the engine args over to each engine launcher.
            el.engine_args = copy.deepcopy(self.engine_args)
            d = el.start(profile, cluster_dir)
            if i == 0:
                # Log the command line once; it is the same for every engine.
                log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
            self.launchers.append(el)
            dlist.append(d)
        # The consumeErrors here could be dangerous
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self.notify_start)
        return dfinal

    def find_args(self):
        # Only used as a label in the start/stop log messages.
        return ['engine set']

    def signal(self, sig):
        """Send ``sig`` to every engine; return the gathered deferred."""
        dlist = [el.signal(sig) for el in self.launchers]
        dfinal = gatherBoth(dlist, consumeErrors=True)
        return dfinal

    def interrupt_then_kill(self, delay=1.0):
        """Interrupt then kill every engine; return the gathered deferred."""
        dlist = [el.interrupt_then_kill(delay) for el in self.launchers]
        dfinal = gatherBoth(dlist, consumeErrors=True)
        return dfinal

    def stop(self):
        return self.interrupt_then_kill()

    def observe_stop(self):
        """Return a deferred gathering the stop of every engine started so far."""
        dlist = [el.observe_stop() for el in self.launchers]
        dfinal = gatherBoth(dlist, consumeErrors=False)
        dfinal.addCallback(self.notify_stop)
        return dfinal


class MPIExecEngineSetLauncher(MPIExecLauncher):
    """Run ``n`` engines through ``mpi_cmd``."""

    engine_cmd = List(find_engine_cmd(), config=False)
    engine_args = List(['--log-to-file','--log-level', '40'], config=True)
    n = Int(1, config=True)

    def start(self, n, profile=None, cluster_dir=None):
        """Append ``--cluster-dir``/``--profile`` as given and start ``n`` engines."""
        extra_args = []
        if cluster_dir is not None:
            extra_args.extend(['--cluster-dir', cluster_dir])
        if profile is not None:
            extra_args.extend(['--profile', profile])
        if extra_args:
            # Rebind instead of extending in place (see
            # LocalEngineLauncher.start).
            self.engine_args = list(self.engine_args) + extra_args
        log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
        return super(MPIExecEngineSetLauncher, self).start(n)

    def find_args(self):
        return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
               self.engine_cmd + self.engine_args


class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
    """Placeholder; not implemented."""
    pass


class PBSEngineSetLauncher(PBSLauncher):
    """Submit ``n`` engines as a PBS job."""

    def start(self, n, profile=None, cluster_dir=None):
        # A batch launcher has no ``program_args`` list to extend; the
        # values are saved in the template context instead, so the batch
        # script template can use them as ${profile} and ${cluster_dir}
        # (mirroring PBSControllerLauncher).
        if cluster_dir is not None:
            self.context['cluster_dir'] = cluster_dir
        if profile is not None:
            self.context['profile'] = profile
        # Build the logged command line directly from the traits so this
        # message does not depend on find_args being implemented.
        log.msg('Starting PBSEngineSetLauncher: %r' %
                [self.submit_command, self.batch_file])
        return super(PBSEngineSetLauncher, self).start(n)


class SSHEngineSetLauncher(BaseLauncher):
    """Placeholder; not implemented."""
    pass

