diff --git a/IPython/parallel/apps/ipclusterapp.py b/IPython/parallel/apps/ipclusterapp.py index bf5d1e8..5a5f856 100755 --- a/IPython/parallel/apps/ipclusterapp.py +++ b/IPython/parallel/apps/ipclusterapp.py @@ -295,16 +295,14 @@ class IPClusterEngines(BaseParallelApplication): self.exit(1) launcher = klass( - work_dir=u'.', config=self.config, log=self.log + work_dir=u'.', config=self.config, log=self.log, + profile_dir=self.profile_dir.location, cluster_id=self.cluster_id, ) return launcher def start_engines(self): self.log.info("Starting %i engines"%self.n) - self.engine_launcher.start( - self.n, - self.profile_dir.location - ) + self.engine_launcher.start(self.n) def stop_engines(self): self.log.info("Stopping Engines...") @@ -429,9 +427,7 @@ class IPClusterStart(IPClusterEngines): self.controller_launcher.on_stop(self.stop_launchers) def start_controller(self): - self.controller_launcher.start( - self.profile_dir.location - ) + self.controller_launcher.start() def stop_controller(self): # self.log.info("In stop_controller") diff --git a/IPython/parallel/apps/launcher.py b/IPython/parallel/apps/launcher.py index fdfe63e..25daa98 100644 --- a/IPython/parallel/apps/launcher.py +++ b/IPython/parallel/apps/launcher.py @@ -57,7 +57,9 @@ from zmq.eventloop import ioloop from IPython.config.application import Application from IPython.config.configurable import LoggingConfigurable from IPython.utils.text import EvalFormatter -from IPython.utils.traitlets import Any, Int, CFloat, List, Unicode, Dict, Instance +from IPython.utils.traitlets import ( + Any, Int, CFloat, List, Unicode, Dict, Instance, HasTraits, +) from IPython.utils.path import get_ipython_module_path from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError @@ -213,6 +215,33 @@ class BaseLauncher(LoggingConfigurable): """ raise NotImplementedError('signal must be implemented in a subclass') +class ClusterAppMixin(HasTraits): + """MixIn for cluster args as traits""" + cluster_args = List([]) + 
profile_dir=Unicode('') + cluster_id=Unicode('') + def _profile_dir_changed(self, name, old, new): + self.cluster_args = [] + if self.profile_dir: + self.cluster_args.extend(['--profile-dir', self.profile_dir]) + if self.cluster_id: + self.cluster_args.extend(['--cluster-id', self.cluster_id]) + _cluster_id_changed = _profile_dir_changed + +class ControllerMixin(ClusterAppMixin): + controller_cmd = List(ipcontroller_cmd_argv, config=True, + help="""Popen command to launch ipcontroller.""") + # Command line arguments to ipcontroller. + controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True, + help="""command-line args to pass to ipcontroller""") + +class EngineMixin(ClusterAppMixin): + engine_cmd = List(ipengine_cmd_argv, config=True, + help="""command to launch the Engine.""") + # Command line arguments for ipengine. + engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True, + help="command-line arguments to pass to ipengine" + ) #----------------------------------------------------------------------------- # Local process launchers @@ -317,54 +346,28 @@ class LocalProcessLauncher(BaseLauncher): self.notify_stop(dict(exit_code=status, pid=self.process.pid)) return status -class LocalControllerLauncher(LocalProcessLauncher): +class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin): """Launch a controller as a regular external process.""" - controller_cmd = List(ipcontroller_cmd_argv, config=True, - help="""Popen command to launch ipcontroller.""") - # Command line arguments to ipcontroller. 
- controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True, - help="""command-line args to pass to ipcontroller""") - def find_args(self): - return self.controller_cmd + self.controller_args + return self.controller_cmd + self.cluster_args + self.controller_args - def start(self, profile_dir): + def start(self): """Start the controller by profile_dir.""" - self.controller_args.extend(['--profile-dir=%s'%profile_dir]) - self.profile_dir = unicode(profile_dir) self.log.info("Starting LocalControllerLauncher: %r" % self.args) return super(LocalControllerLauncher, self).start() -class LocalEngineLauncher(LocalProcessLauncher): +class LocalEngineLauncher(LocalProcessLauncher, EngineMixin): """Launch a single engine as a regular externall process.""" - engine_cmd = List(ipengine_cmd_argv, config=True, - help="""command to launch the Engine.""") - # Command line arguments for ipengine. - engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True, - help="command-line arguments to pass to ipengine" - ) - def find_args(self): - return self.engine_cmd + self.engine_args - - def start(self, profile_dir): - """Start the engine by profile_dir.""" - self.engine_args.extend(['--profile-dir=%s'%profile_dir]) - self.profile_dir = unicode(profile_dir) - return super(LocalEngineLauncher, self).start() + return self.engine_cmd + self.cluster_args + self.engine_args -class LocalEngineSetLauncher(BaseLauncher): +class LocalEngineSetLauncher(LocalEngineLauncher): """Launch a set of engines as regular external processes.""" - # Command line arguments for ipengine. - engine_args = List( - ['--log-to-file','--log-level=%i'%logging.INFO], config=True, - help="command-line arguments to pass to ipengine" - ) delay = CFloat(0.1, config=True, help="""delay (in seconds) between starting each engine after the first. 
This can help force the engines to get their ids in order, or limit @@ -383,26 +386,26 @@ class LocalEngineSetLauncher(BaseLauncher): ) self.stop_data = {} - def start(self, n, profile_dir): + def start(self, n): """Start n engines by profile or profile_dir.""" - self.profile_dir = unicode(profile_dir) dlist = [] for i in range(n): if i > 0: time.sleep(self.delay) - el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log) + el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log, + profile_dir=self.profile_dir, cluster_id=self.cluster_id, + ) + # Copy the engine args over to each engine launcher. + el.engine_cmd = copy.deepcopy(self.engine_cmd) el.engine_args = copy.deepcopy(self.engine_args) el.on_stop(self._notice_engine_stopped) - d = el.start(profile_dir) + d = el.start() if i==0: self.log.info("Starting LocalEngineSetLauncher: %r" % el.args) self.launchers[i] = el dlist.append(d) self.notify_start(dlist) - # The consumeErrors here could be dangerous - # dfinal = gatherBoth(dlist, consumeErrors=True) - # dfinal.addCallback(self.notify_start) return dlist def find_args(self): @@ -413,7 +416,6 @@ class LocalEngineSetLauncher(BaseLauncher): for el in self.launchers.itervalues(): d = el.signal(sig) dlist.append(d) - # dfinal = gatherBoth(dlist, consumeErrors=True) return dlist def interrupt_then_kill(self, delay=1.0): @@ -421,7 +423,6 @@ class LocalEngineSetLauncher(BaseLauncher): for el in self.launchers.itervalues(): d = el.interrupt_then_kill(delay) dlist.append(d) - # dfinal = gatherBoth(dlist, consumeErrors=True) return dlist def stop(self): @@ -452,9 +453,9 @@ class MPIExecLauncher(LocalProcessLauncher): mpi_args = List([], config=True, help="The command line arguments to pass to mpiexec." 
) - program = List(['date'], config=True, + program = List(['date'], help="The program to start via mpiexec.") - program_args = List([], config=True, + program_args = List([], help="The command line argument to the program." ) n = Int(1) @@ -470,44 +471,42 @@ class MPIExecLauncher(LocalProcessLauncher): return super(MPIExecLauncher, self).start() -class MPIExecControllerLauncher(MPIExecLauncher): +class MPIExecControllerLauncher(MPIExecLauncher, ControllerMixin): """Launch a controller using mpiexec.""" - controller_cmd = List(ipcontroller_cmd_argv, config=True, - help="Popen command to launch the Contropper" - ) - controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True, - help="Command line arguments to pass to ipcontroller." - ) - n = Int(1) + # alias back to *non-configurable* program[_args] for use in find_args() + # this way all Controller/EngineSetLaunchers have the same form, rather + # than *some* having `program_args` and others `controller_args` + @property + def program(self): + return self.controller_cmd + + @property + def program_args(self): + return self.cluster_args + self.controller_args - def start(self, profile_dir): + def start(self): """Start the controller by profile_dir.""" - self.controller_args.extend(['--profile-dir=%s'%profile_dir]) - self.profile_dir = unicode(profile_dir) self.log.info("Starting MPIExecControllerLauncher: %r" % self.args) return super(MPIExecControllerLauncher, self).start(1) - def find_args(self): - return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \ - self.controller_cmd + self.controller_args - -class MPIExecEngineSetLauncher(MPIExecLauncher): +class MPIExecEngineSetLauncher(MPIExecLauncher, EngineMixin): + """Launch engines using mpiexec""" - program = List(ipengine_cmd_argv, config=True, - help="Popen command for ipengine" - ) - program_args = List( - ['--log-to-file','--log-level=%i'%logging.INFO], config=True, - help="Command line arguments for ipengine." 
- ) - n = Int(1) + # alias back to *non-configurable* program[_args] for use in find_args() + # this way all Controller/EngineSetLaunchers have the same form, rather + # than *some* having `program_args` and others `controller_args` + @property + def program(self): + return self.engine_cmd + + @property + def program_args(self): + return self.cluster_args + self.engine_args - def start(self, n, profile_dir): + def start(self, n): """Start n engines by profile or profile_dir.""" - self.program_args.extend(['--profile-dir=%s'%profile_dir]) - self.profile_dir = unicode(profile_dir) self.n = n self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args) return super(MPIExecEngineSetLauncher, self).start(n) @@ -530,9 +529,9 @@ class SSHLauncher(LocalProcessLauncher): help="command for starting ssh") ssh_args = List(['-tt'], config=True, help="args to pass to ssh") - program = List(['date'], config=True, + program = List(['date'], help="Program to launch via ssh") - program_args = List([], config=True, + program_args = List([], help="args to pass to remote program") hostname = Unicode('', config=True, help="hostname on which to launch the program") @@ -554,8 +553,7 @@ class SSHLauncher(LocalProcessLauncher): return self.ssh_cmd + self.ssh_args + [self.location] + \ self.program + self.program_args - def start(self, profile_dir, hostname=None, user=None): - self.profile_dir = unicode(profile_dir) + def start(self, hostname=None, user=None): if hostname is not None: self.hostname = hostname if user is not None: @@ -571,22 +569,33 @@ class SSHLauncher(LocalProcessLauncher): -class SSHControllerLauncher(SSHLauncher): +class SSHControllerLauncher(SSHLauncher, ControllerMixin): + + # alias back to *non-configurable* program[_args] for use in find_args() + # this way all Controller/EngineSetLaunchers have the same form, rather + # than *some* having `program_args` and others `controller_args` + @property + def program(self): + return self.controller_cmd + + @property + 
def program_args(self): + return self.cluster_args + self.controller_args - program = List(ipcontroller_cmd_argv, config=True, - help="remote ipcontroller command.") - program_args = List(['--reuse-files', '--log-to-file','--log-level=%i'%logging.INFO], config=True, - help="Command line arguments to ipcontroller.") +class SSHEngineLauncher(SSHLauncher, EngineMixin): -class SSHEngineLauncher(SSHLauncher): - program = List(ipengine_cmd_argv, config=True, - help="remote ipengine command.") - # Command line arguments for ipengine. - program_args = List( - ['--log-to-file','--log_level=%i'%logging.INFO], config=True, - help="Command line arguments to ipengine." - ) + # alias back to *non-configurable* program[_args] for use in find_args() + # this way all Controller/EngineSetLaunchers have the same form, rather + # than *some* having `program_args` and others `controller_args` + @property + def program(self): + return self.engine_cmd + + @property + def program_args(self): + return self.cluster_args + self.engine_args + class SSHEngineSetLauncher(LocalEngineSetLauncher): launcher_class = SSHEngineLauncher @@ -594,12 +603,11 @@ class SSHEngineSetLauncher(LocalEngineSetLauncher): help="""dict of engines to launch. This is a dict by hostname of ints, corresponding to the number of engines to start on that host.""") - def start(self, n, profile_dir): + def start(self, n): """Start engines by profile or profile_dir. `n` is ignored, and the `engines` config property is used instead. 
""" - self.profile_dir = unicode(profile_dir) dlist = [] for host, n in self.engines.iteritems(): if isinstance(n, (tuple, list)): @@ -614,13 +622,15 @@ class SSHEngineSetLauncher(LocalEngineSetLauncher): for i in range(n): if i > 0: time.sleep(self.delay) - el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log) + el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log, + profile_dir=self.profile_dir, cluster_id=self.cluster_id, + ) # Copy the engine args over to each engine launcher. - i - el.program_args = args + el.engine_cmd = self.engine_cmd + el.engine_args = args el.on_stop(self._notice_engine_stopped) - d = el.start(profile_dir, user=user, hostname=host) + d = el.start(user=user, hostname=host) if i==0: self.log.info("Starting SSHEngineSetLauncher: %r" % el.args) self.launchers[host+str(i)] = el @@ -727,11 +737,11 @@ class WindowsHPCLauncher(BaseLauncher): return output -class WindowsHPCControllerLauncher(WindowsHPCLauncher): +class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin): job_file_name = Unicode(u'ipcontroller_job.xml', config=True, help="WinHPC xml job file.") - extra_args = List([], config=False, + controller_args = List([], config=False, help="extra args to pass to ipcontroller") def write_job_file(self, n): @@ -743,7 +753,8 @@ class WindowsHPCControllerLauncher(WindowsHPCLauncher): # files that the scheduler redirects to. t.work_directory = self.profile_dir # Add the profile_dir and from self.start(). 
- t.controller_args.extend(self.extra_args) + t.controller_args.extend(self.cluster_args) + t.controller_args.extend(self.controller_args) job.add_task(t) self.log.info("Writing job description file: %s" % self.job_file) @@ -753,18 +764,16 @@ class WindowsHPCControllerLauncher(WindowsHPCLauncher): def job_file(self): return os.path.join(self.profile_dir, self.job_file_name) - def start(self, profile_dir): + def start(self): """Start the controller by profile_dir.""" - self.extra_args = ['--profile-dir=%s'%profile_dir] - self.profile_dir = unicode(profile_dir) return super(WindowsHPCControllerLauncher, self).start(1) -class WindowsHPCEngineSetLauncher(WindowsHPCLauncher): +class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin): job_file_name = Unicode(u'ipengineset_job.xml', config=True, help="jobfile for ipengines job") - extra_args = List([], config=False, + engine_args = List([], config=False, help="extra args to pas to ipengine") def write_job_file(self, n): @@ -777,7 +786,8 @@ class WindowsHPCEngineSetLauncher(WindowsHPCLauncher): # files that the scheduler redirects to. t.work_directory = self.profile_dir # Add the profile_dir and from self.start(). 
- t.engine_args.extend(self.extra_args) + t.engine_args.extend(self.cluster_args) + t.engine_args.extend(self.engine_args) job.add_task(t) self.log.info("Writing job description file: %s" % self.job_file) @@ -787,10 +797,8 @@ class WindowsHPCEngineSetLauncher(WindowsHPCLauncher): def job_file(self): return os.path.join(self.profile_dir, self.job_file_name) - def start(self, n, profile_dir): + def start(self, n): """Start the controller by profile_dir.""" - self.extra_args = ['--profile-dir=%s'%profile_dir] - self.profile_dir = unicode(profile_dir) return super(WindowsHPCEngineSetLauncher, self).start(n) @@ -798,6 +806,13 @@ class WindowsHPCEngineSetLauncher(WindowsHPCLauncher): # Batch (PBS) system launchers #----------------------------------------------------------------------------- +class BatchClusterAppMixin(ClusterAppMixin): + """ClusterApp mixin that updates context dict, rather than args""" + context = Dict({'profile_dir':'', 'cluster_id':''}) + def _profile_dir_changed(self, name, old, new): + self.context[name] = new + _cluster_id_changed = _profile_dir_changed + class BatchSystemLauncher(BaseLauncher): """Launch an external process using a batch system. 
@@ -829,6 +844,12 @@ class BatchSystemLauncher(BaseLauncher): queue = Unicode(u'', config=True, help="The PBS Queue.") + def _queue_changed(self, name, old, new): + self.context[name] = new + + n = Int(1) + _n_changed = _queue_changed + # not configurable, override in subclasses # PBS Job Array regex job_array_regexp = Unicode('') @@ -868,8 +889,7 @@ class BatchSystemLauncher(BaseLauncher): def write_batch_script(self, n): """Instantiate and write the batch script to the work_dir.""" - self.context['n'] = n - self.context['queue'] = self.queue + self.n = n # first priority is batch_template if set if self.batch_template_file and not self.batch_template: # second priority is batch_template_file @@ -902,12 +922,10 @@ class BatchSystemLauncher(BaseLauncher): f.write(script_as_string) os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) - def start(self, n, profile_dir): + def start(self, n): """Start n copies of the process using a batch system.""" # Here we save profile_dir in the context so they # can be used in the batch script template as {profile_dir} - self.context['profile_dir'] = profile_dir - self.profile_dir = unicode(profile_dir) self.write_batch_script(n) output = check_output(self.args, env=os.environ) @@ -938,7 +956,7 @@ class PBSLauncher(BatchSystemLauncher): queue_template = Unicode('#PBS -q {queue}') -class PBSControllerLauncher(PBSLauncher): +class PBSControllerLauncher(BatchClusterAppMixin, PBSLauncher): """Launch a controller using PBS.""" batch_file_name = Unicode(u'pbs_controller', config=True, @@ -946,29 +964,30 @@ class PBSControllerLauncher(PBSLauncher): default_template= Unicode("""#!/bin/sh #PBS -V #PBS -N ipcontroller -%s --log-to-file --profile-dir={profile_dir} +%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" """%(' '.join(ipcontroller_cmd_argv))) - def start(self, profile_dir): + + def start(self): """Start the controller by profile or profile_dir.""" self.log.info("Starting 
PBSControllerLauncher: %r" % self.args) - return super(PBSControllerLauncher, self).start(1, profile_dir) + return super(PBSControllerLauncher, self).start(1) -class PBSEngineSetLauncher(PBSLauncher): +class PBSEngineSetLauncher(BatchClusterAppMixin, PBSLauncher): """Launch Engines using PBS""" batch_file_name = Unicode(u'pbs_engines', config=True, help="batch file name for the engine(s) job.") default_template= Unicode(u"""#!/bin/sh #PBS -V #PBS -N ipengine -%s --profile-dir={profile_dir} +%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" """%(' '.join(ipengine_cmd_argv))) - def start(self, n, profile_dir): + def start(self, n): """Start n engines by profile or profile_dir.""" self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args)) - return super(PBSEngineSetLauncher, self).start(n, profile_dir) + return super(PBSEngineSetLauncher, self).start(n) #SGE is very similar to PBS @@ -979,7 +998,7 @@ class SGELauncher(PBSLauncher): queue_regexp = Unicode('#\$\W+-q\W+\$?\w+') queue_template = Unicode('#$ -q {queue}') -class SGEControllerLauncher(SGELauncher): +class SGEControllerLauncher(BatchClusterAppMixin, SGELauncher): """Launch a controller using SGE.""" batch_file_name = Unicode(u'sge_controller', config=True, @@ -987,28 +1006,28 @@ class SGEControllerLauncher(SGELauncher): default_template= Unicode(u"""#$ -V #$ -S /bin/sh #$ -N ipcontroller -%s --log-to-file --profile-dir={profile_dir} +%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" """%(' '.join(ipcontroller_cmd_argv))) - def start(self, profile_dir): + def start(self): """Start the controller by profile or profile_dir.""" self.log.info("Starting PBSControllerLauncher: %r" % self.args) - return super(SGEControllerLauncher, self).start(1, profile_dir) + return super(SGEControllerLauncher, self).start(1) -class SGEEngineSetLauncher(SGELauncher): +class SGEEngineSetLauncher(BatchClusterAppMixin, SGELauncher): """Launch Engines with SGE""" 
batch_file_name = Unicode(u'sge_engines', config=True, help="batch file name for the engine(s) job.") default_template = Unicode("""#$ -V #$ -S /bin/sh #$ -N ipengine -%s --profile-dir={profile_dir} +%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" """%(' '.join(ipengine_cmd_argv))) - def start(self, n, profile_dir): + def start(self, n): """Start n engines by profile or profile_dir.""" self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args)) - return super(SGEEngineSetLauncher, self).start(n, profile_dir) + return super(SGEEngineSetLauncher, self).start(n) # LSF launchers @@ -1029,7 +1048,7 @@ class LSFLauncher(BatchSystemLauncher): queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+') queue_template = Unicode('#BSUB -q {queue}') - def start(self, n, profile_dir): + def start(self, n): """Start n copies of the process using LSF batch system. This cant inherit from the base class because bsub expects to be piped a shell script in order to honor the #BSUB directives : @@ -1037,8 +1056,6 @@ class LSFLauncher(BatchSystemLauncher): """ # Here we save profile_dir in the context so they # can be used in the batch script template as {profile_dir} - self.context['profile_dir'] = profile_dir - self.profile_dir = unicode(profile_dir) self.write_batch_script(n) #output = check_output(self.args, env=os.environ) piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"' @@ -1049,7 +1066,7 @@ class LSFLauncher(BatchSystemLauncher): return job_id -class LSFControllerLauncher(LSFLauncher): +class LSFControllerLauncher(BatchClusterAppMixin, LSFLauncher): """Launch a controller using LSF.""" batch_file_name = Unicode(u'lsf_controller', config=True, @@ -1058,29 +1075,29 @@ class LSFControllerLauncher(LSFLauncher): #BSUB -J ipcontroller #BSUB -oo ipcontroller.o.%%J #BSUB -eo ipcontroller.e.%%J - %s --log-to-file --profile-dir={profile_dir} + %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" """%(' '.join(ipcontroller_cmd_argv))) - def 
start(self, profile_dir): + def start(self): """Start the controller by profile or profile_dir.""" self.log.info("Starting LSFControllerLauncher: %r" % self.args) - return super(LSFControllerLauncher, self).start(1, profile_dir) + return super(LSFControllerLauncher, self).start(1) -class LSFEngineSetLauncher(LSFLauncher): +class LSFEngineSetLauncher(BatchClusterAppMixin, LSFLauncher): """Launch Engines using LSF""" batch_file_name = Unicode(u'lsf_engines', config=True, help="batch file name for the engine(s) job.") default_template= Unicode(u"""#!/bin/sh #BSUB -oo ipengine.o.%%J #BSUB -eo ipengine.e.%%J - %s --profile-dir={profile_dir} + %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" """%(' '.join(ipengine_cmd_argv))) - def start(self, n, profile_dir): + def start(self, n): """Start n engines by profile or profile_dir.""" self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args)) - return super(LSFEngineSetLauncher, self).start(n, profile_dir) + return super(LSFEngineSetLauncher, self).start(n) #-----------------------------------------------------------------------------