From 3994edb75897eb8a5e19fcb6191cd0d704eb7376 2011-09-20 20:33:51 From: MinRK Date: 2011-09-20 20:33:51 Subject: [PATCH] Merge PR #795 (cluster-id and launcher cleanup) closes gh-795 --- diff --git a/IPython/config/configurable.py b/IPython/config/configurable.py index 3dc973a..8cfb695 100755 --- a/IPython/config/configurable.py +++ b/IPython/config/configurable.py @@ -207,8 +207,9 @@ class Configurable(HasTraits): for parent in cls.mro(): # only include parents that are not base classes # and are not the class itself - if issubclass(parent, Configurable) and \ - not parent in (Configurable, SingletonConfigurable, cls): + # and have some configurable traits to inherit + if parent is not cls and issubclass(parent, Configurable) and \ + parent.class_traits(config=True): parents.append(parent) if parents: diff --git a/IPython/parallel/apps/baseapp.py b/IPython/parallel/apps/baseapp.py index 78645fa..86a8506 100755 --- a/IPython/parallel/apps/baseapp.py +++ b/IPython/parallel/apps/baseapp.py @@ -75,6 +75,7 @@ base_aliases.update({ 'log-to-file' : 'BaseParallelApplication.log_to_file', 'clean-logs' : 'BaseParallelApplication.clean_logs', 'log-url' : 'BaseParallelApplication.log_url', + 'cluster-id' : 'BaseParallelApplication.cluster_id', }) base_flags = { @@ -116,6 +117,22 @@ class BaseParallelApplication(BaseIPythonApplication): log_url = Unicode('', config=True, help="The ZMQ URL of the iplogger to aggregate logging.") + cluster_id = Unicode('', config=True, + help="""String id to add to runtime files, to prevent name collisions when + using multiple clusters with a single profile simultaneously. + + When set, files will be named like: 'ipcontroller-{cluster_id}-engine.json' + + Since this is text inserted into filenames, typical recommendations apply: + Simple character strings are ideal, and spaces are not recommended (but should + generally work). 
+ """ + ) + def _cluster_id_changed(self, name, old, new): + self.name = self.__class__.name + if new: + self.name += '-%s'%new + def _config_files_default(self): return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py'] diff --git a/IPython/parallel/apps/ipclusterapp.py b/IPython/parallel/apps/ipclusterapp.py index bf5d1e8..44d4ca4 100755 --- a/IPython/parallel/apps/ipclusterapp.py +++ b/IPython/parallel/apps/ipclusterapp.py @@ -275,19 +275,22 @@ class IPClusterEngines(BaseParallelApplication): self.init_launchers() def init_launchers(self): - self.engine_launcher = self.build_launcher(self.engine_launcher_class) + self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet') self.engine_launcher.on_stop(lambda r: self.loop.stop()) def init_signal(self): # Setup signals signal.signal(signal.SIGINT, self.sigint_handler) - def build_launcher(self, clsname): + def build_launcher(self, clsname, kind=None): """import and instantiate a Launcher based on importstring""" if '.' 
not in clsname: # not a module, presume it's the raw name in apps.launcher + if kind and kind not in clsname: + # doesn't match necessary full class name, assume it's + # just 'PBS' or 'MPIExec' prefix: + clsname = clsname + kind + 'Launcher' clsname = 'IPython.parallel.apps.launcher.'+clsname - # print repr(clsname) try: klass = import_item(clsname) except (ImportError, KeyError): @@ -295,16 +298,14 @@ class IPClusterEngines(BaseParallelApplication): self.exit(1) launcher = klass( - work_dir=u'.', config=self.config, log=self.log + work_dir=u'.', config=self.config, log=self.log, + profile_dir=self.profile_dir.location, cluster_id=self.cluster_id, ) return launcher def start_engines(self): self.log.info("Starting %i engines"%self.n) - self.engine_launcher.start( - self.n, - self.profile_dir.location - ) + self.engine_launcher.start(self.n) def stop_engines(self): self.log.info("Stopping Engines...") @@ -424,14 +425,12 @@ class IPClusterStart(IPClusterEngines): aliases = Dict(start_aliases) def init_launchers(self): - self.controller_launcher = self.build_launcher(self.controller_launcher_class) - self.engine_launcher = self.build_launcher(self.engine_launcher_class) + self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller') + self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet') self.controller_launcher.on_stop(self.stop_launchers) def start_controller(self): - self.controller_launcher.start( - self.profile_dir.location - ) + self.controller_launcher.start() def stop_controller(self): # self.log.info("In stop_controller") diff --git a/IPython/parallel/apps/ipcontrollerapp.py b/IPython/parallel/apps/ipcontrollerapp.py index 3f31680..538f38f 100755 --- a/IPython/parallel/apps/ipcontrollerapp.py +++ b/IPython/parallel/apps/ipcontrollerapp.py @@ -174,7 +174,18 @@ class IPControllerApp(BaseParallelApplication): use_threads = Bool(False, config=True, help='Use threads instead of processes for the 
schedulers', - ) + ) + + engine_json_file = Unicode('ipcontroller-engine.json', config=True, + help="JSON filename where engine connection info will be stored.") + client_json_file = Unicode('ipcontroller-client.json', config=True, + help="JSON filename where client connection info will be stored.") + + def _cluster_id_changed(self, name, old, new): + super(IPControllerApp, self)._cluster_id_changed(name, old, new) + self.engine_json_file = "%s-engine.json" % self.name + self.client_json_file = "%s-client.json" % self.name + # internal children = List() @@ -215,7 +226,7 @@ class IPControllerApp(BaseParallelApplication): """load config from existing json connector files.""" c = self.config # load from engine config - with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f: + with open(os.path.join(self.profile_dir.security_dir, self.engine_json_file)) as f: cfg = json.loads(f.read()) key = c.Session.key = asbytes(cfg['exec_key']) xport,addr = cfg['url'].split('://') @@ -227,7 +238,7 @@ class IPControllerApp(BaseParallelApplication): if not self.engine_ssh_server: self.engine_ssh_server = cfg['ssh'] # load client config - with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f: + with open(os.path.join(self.profile_dir.security_dir, self.client_json_file)) as f: cfg = json.loads(f.read()) assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys" xport,addr = cfg['url'].split('://') @@ -277,11 +288,11 @@ class IPControllerApp(BaseParallelApplication): 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport), 'location' : self.location } - self.save_connection_dict('ipcontroller-client.json', cdict) + self.save_connection_dict(self.client_json_file, cdict) edict = cdict edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport)) edict['ssh'] = self.engine_ssh_server - self.save_connection_dict('ipcontroller-engine.json', edict) + 
self.save_connection_dict(self.engine_json_file, edict) # def init_schedulers(self): diff --git a/IPython/parallel/apps/ipengineapp.py b/IPython/parallel/apps/ipengineapp.py index 43fca7a..6917733 100755 --- a/IPython/parallel/apps/ipengineapp.py +++ b/IPython/parallel/apps/ipengineapp.py @@ -93,7 +93,7 @@ class MPI(Configurable): help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).' ) - def _on_use_changed(self, old, new): + def _use_changed(self, name, old, new): # load default init script if it's not set if not self.init_script: self.init_script = self.default_inits.get(new, '') @@ -135,8 +135,8 @@ aliases.update(base_aliases) class IPEngineApp(BaseParallelApplication): - name = Unicode(u'ipengine') - description = Unicode(_description) + name = 'ipengine' + description = _description examples = _examples config_file_name = Unicode(default_config_file_name) classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI]) @@ -158,7 +158,15 @@ class IPEngineApp(BaseParallelApplication): controller and engine are started at the same time and it may take a moment for the controller to write the connector files.""") - url_file_name = Unicode(u'ipcontroller-engine.json') + url_file_name = Unicode(u'ipcontroller-engine.json', config=True) + + def _cluster_id_changed(self, name, old, new): + if new: + base = 'ipcontroller-%s' % new + else: + base = 'ipcontroller' + self.url_file_name = "%s-engine.json" % base + log_url = Unicode('', config=True, help="""The URL for the iploggerapp instance, for forwarding logging to a central location.""") diff --git a/IPython/parallel/apps/launcher.py b/IPython/parallel/apps/launcher.py index fdfe63e..9267f38 100644 --- a/IPython/parallel/apps/launcher.py +++ b/IPython/parallel/apps/launcher.py @@ -57,7 +57,9 @@ from zmq.eventloop import ioloop from IPython.config.application import Application from IPython.config.configurable import LoggingConfigurable from IPython.utils.text import EvalFormatter -from 
IPython.utils.traitlets import Any, Int, CFloat, List, Unicode, Dict, Instance +from IPython.utils.traitlets import ( + Any, Int, CFloat, List, Unicode, Dict, Instance, HasTraits, +) from IPython.utils.path import get_ipython_module_path from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError @@ -213,6 +215,33 @@ class BaseLauncher(LoggingConfigurable): """ raise NotImplementedError('signal must be implemented in a subclass') +class ClusterAppMixin(HasTraits): + """MixIn for cluster args as traits""" + cluster_args = List([]) + profile_dir=Unicode('') + cluster_id=Unicode('') + def _profile_dir_changed(self, name, old, new): + self.cluster_args = [] + if self.profile_dir: + self.cluster_args.extend(['--profile-dir', self.profile_dir]) + if self.cluster_id: + self.cluster_args.extend(['--cluster-id', self.cluster_id]) + _cluster_id_changed = _profile_dir_changed + +class ControllerMixin(ClusterAppMixin): + controller_cmd = List(ipcontroller_cmd_argv, config=True, + help="""Popen command to launch ipcontroller.""") + # Command line arguments to ipcontroller. + controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True, + help="""command-line args to pass to ipcontroller""") + +class EngineMixin(ClusterAppMixin): + engine_cmd = List(ipengine_cmd_argv, config=True, + help="""command to launch the Engine.""") + # Command line arguments for ipengine. 
+ engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True, + help="command-line arguments to pass to ipengine" + ) #----------------------------------------------------------------------------- # Local process launchers @@ -317,54 +346,28 @@ class LocalProcessLauncher(BaseLauncher): self.notify_stop(dict(exit_code=status, pid=self.process.pid)) return status -class LocalControllerLauncher(LocalProcessLauncher): +class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin): """Launch a controller as a regular external process.""" - controller_cmd = List(ipcontroller_cmd_argv, config=True, - help="""Popen command to launch ipcontroller.""") - # Command line arguments to ipcontroller. - controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True, - help="""command-line args to pass to ipcontroller""") - def find_args(self): - return self.controller_cmd + self.controller_args + return self.controller_cmd + self.cluster_args + self.controller_args - def start(self, profile_dir): + def start(self): """Start the controller by profile_dir.""" - self.controller_args.extend(['--profile-dir=%s'%profile_dir]) - self.profile_dir = unicode(profile_dir) self.log.info("Starting LocalControllerLauncher: %r" % self.args) return super(LocalControllerLauncher, self).start() -class LocalEngineLauncher(LocalProcessLauncher): +class LocalEngineLauncher(LocalProcessLauncher, EngineMixin): """Launch a single engine as a regular externall process.""" - engine_cmd = List(ipengine_cmd_argv, config=True, - help="""command to launch the Engine.""") - # Command line arguments for ipengine. 
- engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True, - help="command-line arguments to pass to ipengine" - ) - def find_args(self): - return self.engine_cmd + self.engine_args + return self.engine_cmd + self.cluster_args + self.engine_args - def start(self, profile_dir): - """Start the engine by profile_dir.""" - self.engine_args.extend(['--profile-dir=%s'%profile_dir]) - self.profile_dir = unicode(profile_dir) - return super(LocalEngineLauncher, self).start() - -class LocalEngineSetLauncher(BaseLauncher): +class LocalEngineSetLauncher(LocalEngineLauncher): """Launch a set of engines as regular external processes.""" - # Command line arguments for ipengine. - engine_args = List( - ['--log-to-file','--log-level=%i'%logging.INFO], config=True, - help="command-line arguments to pass to ipengine" - ) delay = CFloat(0.1, config=True, help="""delay (in seconds) between starting each engine after the first. This can help force the engines to get their ids in order, or limit @@ -383,26 +386,26 @@ class LocalEngineSetLauncher(BaseLauncher): ) self.stop_data = {} - def start(self, n, profile_dir): + def start(self, n): """Start n engines by profile or profile_dir.""" - self.profile_dir = unicode(profile_dir) dlist = [] for i in range(n): if i > 0: time.sleep(self.delay) - el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log) + el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log, + profile_dir=self.profile_dir, cluster_id=self.cluster_id, + ) + # Copy the engine args over to each engine launcher. 
+ el.engine_cmd = copy.deepcopy(self.engine_cmd) el.engine_args = copy.deepcopy(self.engine_args) el.on_stop(self._notice_engine_stopped) - d = el.start(profile_dir) + d = el.start() if i==0: self.log.info("Starting LocalEngineSetLauncher: %r" % el.args) self.launchers[i] = el dlist.append(d) self.notify_start(dlist) - # The consumeErrors here could be dangerous - # dfinal = gatherBoth(dlist, consumeErrors=True) - # dfinal.addCallback(self.notify_start) return dlist def find_args(self): @@ -413,7 +416,6 @@ class LocalEngineSetLauncher(BaseLauncher): for el in self.launchers.itervalues(): d = el.signal(sig) dlist.append(d) - # dfinal = gatherBoth(dlist, consumeErrors=True) return dlist def interrupt_then_kill(self, delay=1.0): @@ -421,7 +423,6 @@ class LocalEngineSetLauncher(BaseLauncher): for el in self.launchers.itervalues(): d = el.interrupt_then_kill(delay) dlist.append(d) - # dfinal = gatherBoth(dlist, consumeErrors=True) return dlist def stop(self): @@ -452,9 +453,9 @@ class MPIExecLauncher(LocalProcessLauncher): mpi_args = List([], config=True, help="The command line arguments to pass to mpiexec." ) - program = List(['date'], config=True, + program = List(['date'], help="The program to start via mpiexec.") - program_args = List([], config=True, + program_args = List([], help="The command line argument to the program." ) n = Int(1) @@ -470,44 +471,42 @@ class MPIExecLauncher(LocalProcessLauncher): return super(MPIExecLauncher, self).start() -class MPIExecControllerLauncher(MPIExecLauncher): +class MPIExecControllerLauncher(MPIExecLauncher, ControllerMixin): """Launch a controller using mpiexec.""" - controller_cmd = List(ipcontroller_cmd_argv, config=True, - help="Popen command to launch the Contropper" - ) - controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True, - help="Command line arguments to pass to ipcontroller." 
- ) - n = Int(1) + # alias back to *non-configurable* program[_args] for use in find_args() + # this way all Controller/EngineSetLaunchers have the same form, rather + # than *some* having `program_args` and others `controller_args` + @property + def program(self): + return self.controller_cmd + + @property + def program_args(self): + return self.cluster_args + self.controller_args - def start(self, profile_dir): + def start(self): """Start the controller by profile_dir.""" - self.controller_args.extend(['--profile-dir=%s'%profile_dir]) - self.profile_dir = unicode(profile_dir) self.log.info("Starting MPIExecControllerLauncher: %r" % self.args) return super(MPIExecControllerLauncher, self).start(1) - def find_args(self): - return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \ - self.controller_cmd + self.controller_args - -class MPIExecEngineSetLauncher(MPIExecLauncher): +class MPIExecEngineSetLauncher(MPIExecLauncher, EngineMixin): + """Launch engines using mpiexec""" - program = List(ipengine_cmd_argv, config=True, - help="Popen command for ipengine" - ) - program_args = List( - ['--log-to-file','--log-level=%i'%logging.INFO], config=True, - help="Command line arguments for ipengine." 
- ) - n = Int(1) + # alias back to *non-configurable* program[_args] for use in find_args() + # this way all Controller/EngineSetLaunchers have the same form, rather + # than *some* having `program_args` and others `controller_args` + @property + def program(self): + return self.engine_cmd + + @property + def program_args(self): + return self.cluster_args + self.engine_args - def start(self, n, profile_dir): + def start(self, n): """Start n engines by profile or profile_dir.""" - self.program_args.extend(['--profile-dir=%s'%profile_dir]) - self.profile_dir = unicode(profile_dir) self.n = n self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args) return super(MPIExecEngineSetLauncher, self).start(n) @@ -530,9 +529,9 @@ class SSHLauncher(LocalProcessLauncher): help="command for starting ssh") ssh_args = List(['-tt'], config=True, help="args to pass to ssh") - program = List(['date'], config=True, + program = List(['date'], help="Program to launch via ssh") - program_args = List([], config=True, + program_args = List([], help="args to pass to remote program") hostname = Unicode('', config=True, help="hostname on which to launch the program") @@ -554,8 +553,7 @@ class SSHLauncher(LocalProcessLauncher): return self.ssh_cmd + self.ssh_args + [self.location] + \ self.program + self.program_args - def start(self, profile_dir, hostname=None, user=None): - self.profile_dir = unicode(profile_dir) + def start(self, hostname=None, user=None): if hostname is not None: self.hostname = hostname if user is not None: @@ -571,22 +569,33 @@ class SSHLauncher(LocalProcessLauncher): -class SSHControllerLauncher(SSHLauncher): +class SSHControllerLauncher(SSHLauncher, ControllerMixin): - program = List(ipcontroller_cmd_argv, config=True, - help="remote ipcontroller command.") - program_args = List(['--reuse-files', '--log-to-file','--log-level=%i'%logging.INFO], config=True, - help="Command line arguments to ipcontroller.") + # alias back to *non-configurable* program[_args] for 
use in find_args() + # this way all Controller/EngineSetLaunchers have the same form, rather + # than *some* having `program_args` and others `controller_args` + @property + def program(self): + return self.controller_cmd + + @property + def program_args(self): + return self.cluster_args + self.controller_args -class SSHEngineLauncher(SSHLauncher): - program = List(ipengine_cmd_argv, config=True, - help="remote ipengine command.") - # Command line arguments for ipengine. - program_args = List( - ['--log-to-file','--log_level=%i'%logging.INFO], config=True, - help="Command line arguments to ipengine." - ) +class SSHEngineLauncher(SSHLauncher, EngineMixin): + + # alias back to *non-configurable* program[_args] for use in find_args() + # this way all Controller/EngineSetLaunchers have the same form, rather + # than *some* having `program_args` and others `controller_args` + @property + def program(self): + return self.engine_cmd + + @property + def program_args(self): + return self.cluster_args + self.engine_args + class SSHEngineSetLauncher(LocalEngineSetLauncher): launcher_class = SSHEngineLauncher @@ -594,12 +603,11 @@ class SSHEngineSetLauncher(LocalEngineSetLauncher): help="""dict of engines to launch. This is a dict by hostname of ints, corresponding to the number of engines to start on that host.""") - def start(self, n, profile_dir): + def start(self, n): """Start engines by profile or profile_dir. `n` is ignored, and the `engines` config property is used instead. 
""" - self.profile_dir = unicode(profile_dir) dlist = [] for host, n in self.engines.iteritems(): if isinstance(n, (tuple, list)): @@ -614,13 +622,15 @@ class SSHEngineSetLauncher(LocalEngineSetLauncher): for i in range(n): if i > 0: time.sleep(self.delay) - el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log) + el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log, + profile_dir=self.profile_dir, cluster_id=self.cluster_id, + ) # Copy the engine args over to each engine launcher. - i - el.program_args = args + el.engine_cmd = self.engine_cmd + el.engine_args = args el.on_stop(self._notice_engine_stopped) - d = el.start(profile_dir, user=user, hostname=host) + d = el.start(user=user, hostname=host) if i==0: self.log.info("Starting SSHEngineSetLauncher: %r" % el.args) self.launchers[host+str(i)] = el @@ -727,11 +737,11 @@ class WindowsHPCLauncher(BaseLauncher): return output -class WindowsHPCControllerLauncher(WindowsHPCLauncher): +class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin): job_file_name = Unicode(u'ipcontroller_job.xml', config=True, help="WinHPC xml job file.") - extra_args = List([], config=False, + controller_args = List([], config=False, help="extra args to pass to ipcontroller") def write_job_file(self, n): @@ -743,7 +753,8 @@ class WindowsHPCControllerLauncher(WindowsHPCLauncher): # files that the scheduler redirects to. t.work_directory = self.profile_dir # Add the profile_dir and from self.start(). 
- t.controller_args.extend(self.extra_args) + t.controller_args.extend(self.cluster_args) + t.controller_args.extend(self.controller_args) job.add_task(t) self.log.info("Writing job description file: %s" % self.job_file) @@ -753,18 +764,16 @@ class WindowsHPCControllerLauncher(WindowsHPCLauncher): def job_file(self): return os.path.join(self.profile_dir, self.job_file_name) - def start(self, profile_dir): + def start(self): """Start the controller by profile_dir.""" - self.extra_args = ['--profile-dir=%s'%profile_dir] - self.profile_dir = unicode(profile_dir) return super(WindowsHPCControllerLauncher, self).start(1) -class WindowsHPCEngineSetLauncher(WindowsHPCLauncher): +class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin): job_file_name = Unicode(u'ipengineset_job.xml', config=True, help="jobfile for ipengines job") - extra_args = List([], config=False, + engine_args = List([], config=False, help="extra args to pas to ipengine") def write_job_file(self, n): @@ -777,7 +786,8 @@ class WindowsHPCEngineSetLauncher(WindowsHPCLauncher): # files that the scheduler redirects to. t.work_directory = self.profile_dir # Add the profile_dir and from self.start(). 
- t.engine_args.extend(self.extra_args) + t.engine_args.extend(self.cluster_args) + t.engine_args.extend(self.engine_args) job.add_task(t) self.log.info("Writing job description file: %s" % self.job_file) @@ -787,10 +797,8 @@ class WindowsHPCEngineSetLauncher(WindowsHPCLauncher): def job_file(self): return os.path.join(self.profile_dir, self.job_file_name) - def start(self, n, profile_dir): + def start(self, n): """Start the controller by profile_dir.""" - self.extra_args = ['--profile-dir=%s'%profile_dir] - self.profile_dir = unicode(profile_dir) return super(WindowsHPCEngineSetLauncher, self).start(n) @@ -798,6 +806,20 @@ class WindowsHPCEngineSetLauncher(WindowsHPCLauncher): # Batch (PBS) system launchers #----------------------------------------------------------------------------- +class BatchClusterAppMixin(ClusterAppMixin): + """ClusterApp mixin that updates the self.context dict, rather than cl-args.""" + def _profile_dir_changed(self, name, old, new): + self.context[name] = new + _cluster_id_changed = _profile_dir_changed + + def _profile_dir_default(self): + self.context['profile_dir'] = '' + return '' + def _cluster_id_default(self): + self.context['cluster_id'] = '' + return '' + + class BatchSystemLauncher(BaseLauncher): """Launch an external process using a batch system. 
@@ -829,6 +851,12 @@ class BatchSystemLauncher(BaseLauncher): queue = Unicode(u'', config=True, help="The PBS Queue.") + def _queue_changed(self, name, old, new): + self.context[name] = new + + n = Int(1) + _n_changed = _queue_changed + # not configurable, override in subclasses # PBS Job Array regex job_array_regexp = Unicode('') @@ -868,8 +896,7 @@ class BatchSystemLauncher(BaseLauncher): def write_batch_script(self, n): """Instantiate and write the batch script to the work_dir.""" - self.context['n'] = n - self.context['queue'] = self.queue + self.n = n # first priority is batch_template if set if self.batch_template_file and not self.batch_template: # second priority is batch_template_file @@ -902,12 +929,10 @@ class BatchSystemLauncher(BaseLauncher): f.write(script_as_string) os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) - def start(self, n, profile_dir): + def start(self, n): """Start n copies of the process using a batch system.""" # Here we save profile_dir in the context so they # can be used in the batch script template as {profile_dir} - self.context['profile_dir'] = profile_dir - self.profile_dir = unicode(profile_dir) self.write_batch_script(n) output = check_output(self.args, env=os.environ) @@ -938,7 +963,7 @@ class PBSLauncher(BatchSystemLauncher): queue_template = Unicode('#PBS -q {queue}') -class PBSControllerLauncher(PBSLauncher): +class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin): """Launch a controller using PBS.""" batch_file_name = Unicode(u'pbs_controller', config=True, @@ -946,29 +971,30 @@ class PBSControllerLauncher(PBSLauncher): default_template= Unicode("""#!/bin/sh #PBS -V #PBS -N ipcontroller -%s --log-to-file --profile-dir={profile_dir} +%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" """%(' '.join(ipcontroller_cmd_argv))) - def start(self, profile_dir): + + def start(self): """Start the controller by profile or profile_dir.""" self.log.info("Starting 
PBSControllerLauncher: %r" % self.args) - return super(PBSControllerLauncher, self).start(1, profile_dir) + return super(PBSControllerLauncher, self).start(1) -class PBSEngineSetLauncher(PBSLauncher): +class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin): """Launch Engines using PBS""" batch_file_name = Unicode(u'pbs_engines', config=True, help="batch file name for the engine(s) job.") default_template= Unicode(u"""#!/bin/sh #PBS -V #PBS -N ipengine -%s --profile-dir={profile_dir} +%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" """%(' '.join(ipengine_cmd_argv))) - def start(self, n, profile_dir): + def start(self, n): """Start n engines by profile or profile_dir.""" self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args)) - return super(PBSEngineSetLauncher, self).start(n, profile_dir) + return super(PBSEngineSetLauncher, self).start(n) #SGE is very similar to PBS @@ -979,7 +1005,7 @@ class SGELauncher(PBSLauncher): queue_regexp = Unicode('#\$\W+-q\W+\$?\w+') queue_template = Unicode('#$ -q {queue}') -class SGEControllerLauncher(SGELauncher): +class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin): """Launch a controller using SGE.""" batch_file_name = Unicode(u'sge_controller', config=True, @@ -987,28 +1013,28 @@ class SGEControllerLauncher(SGELauncher): default_template= Unicode(u"""#$ -V #$ -S /bin/sh #$ -N ipcontroller -%s --log-to-file --profile-dir={profile_dir} +%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" """%(' '.join(ipcontroller_cmd_argv))) - def start(self, profile_dir): + def start(self): """Start the controller by profile or profile_dir.""" - self.log.info("Starting PBSControllerLauncher: %r" % self.args) - return super(SGEControllerLauncher, self).start(1, profile_dir) + self.log.info("Starting SGEControllerLauncher: %r" % self.args) + return super(SGEControllerLauncher, self).start(1) -class SGEEngineSetLauncher(SGELauncher): +class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin): """Launch Engines with SGE""" 
batch_file_name = Unicode(u'sge_engines', config=True, help="batch file name for the engine(s) job.") default_template = Unicode("""#$ -V #$ -S /bin/sh #$ -N ipengine -%s --profile-dir={profile_dir} +%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" """%(' '.join(ipengine_cmd_argv))) - def start(self, n, profile_dir): + def start(self, n): """Start n engines by profile or profile_dir.""" self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args)) - return super(SGEEngineSetLauncher, self).start(n, profile_dir) + return super(SGEEngineSetLauncher, self).start(n) # LSF launchers @@ -1029,7 +1055,7 @@ class LSFLauncher(BatchSystemLauncher): queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+') queue_template = Unicode('#BSUB -q {queue}') - def start(self, n, profile_dir): + def start(self, n): """Start n copies of the process using LSF batch system. This cant inherit from the base class because bsub expects to be piped a shell script in order to honor the #BSUB directives : @@ -1037,8 +1063,6 @@ class LSFLauncher(BatchSystemLauncher): """ # Here we save profile_dir in the context so they # can be used in the batch script template as {profile_dir} - self.context['profile_dir'] = profile_dir - self.profile_dir = unicode(profile_dir) self.write_batch_script(n) #output = check_output(self.args, env=os.environ) piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"' @@ -1049,7 +1073,7 @@ class LSFLauncher(BatchSystemLauncher): return job_id -class LSFControllerLauncher(LSFLauncher): +class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin): """Launch a controller using LSF.""" batch_file_name = Unicode(u'lsf_controller', config=True, @@ -1058,29 +1082,29 @@ class LSFControllerLauncher(LSFLauncher): #BSUB -J ipcontroller #BSUB -oo ipcontroller.o.%%J #BSUB -eo ipcontroller.e.%%J - %s --log-to-file --profile-dir={profile_dir} + %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" """%(' '.join(ipcontroller_cmd_argv))) - def 
start(self, profile_dir): + def start(self): """Start the controller by profile or profile_dir.""" self.log.info("Starting LSFControllerLauncher: %r" % self.args) - return super(LSFControllerLauncher, self).start(1, profile_dir) + return super(LSFControllerLauncher, self).start(1) -class LSFEngineSetLauncher(LSFLauncher): +class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin): """Launch Engines using LSF""" batch_file_name = Unicode(u'lsf_engines', config=True, help="batch file name for the engine(s) job.") default_template= Unicode(u"""#!/bin/sh #BSUB -oo ipengine.o.%%J #BSUB -eo ipengine.e.%%J - %s --profile-dir={profile_dir} + %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" """%(' '.join(ipengine_cmd_argv))) - def start(self, n, profile_dir): + def start(self, n): """Start n engines by profile or profile_dir.""" self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args)) - return super(LSFEngineSetLauncher, self).start(n, profile_dir) + return super(LSFEngineSetLauncher, self).start(n) #-----------------------------------------------------------------------------