##// END OF EJS Templates
Merge PR #795 (cluster-id and launcher cleanup)...
MinRK -
r4852:3994edb7 merge
parent child Browse files
Show More
@@ -207,8 +207,9 b' class Configurable(HasTraits):'
207 for parent in cls.mro():
207 for parent in cls.mro():
208 # only include parents that are not base classes
208 # only include parents that are not base classes
209 # and are not the class itself
209 # and are not the class itself
210 if issubclass(parent, Configurable) and \
210 # and have some configurable traits to inherit
211 not parent in (Configurable, SingletonConfigurable, cls):
211 if parent is not cls and issubclass(parent, Configurable) and \
212 parent.class_traits(config=True):
212 parents.append(parent)
213 parents.append(parent)
213
214
214 if parents:
215 if parents:
@@ -75,6 +75,7 b' base_aliases.update({'
75 'log-to-file' : 'BaseParallelApplication.log_to_file',
75 'log-to-file' : 'BaseParallelApplication.log_to_file',
76 'clean-logs' : 'BaseParallelApplication.clean_logs',
76 'clean-logs' : 'BaseParallelApplication.clean_logs',
77 'log-url' : 'BaseParallelApplication.log_url',
77 'log-url' : 'BaseParallelApplication.log_url',
78 'cluster-id' : 'BaseParallelApplication.cluster_id',
78 })
79 })
79
80
80 base_flags = {
81 base_flags = {
@@ -116,6 +117,22 b' class BaseParallelApplication(BaseIPythonApplication):'
116 log_url = Unicode('', config=True,
117 log_url = Unicode('', config=True,
117 help="The ZMQ URL of the iplogger to aggregate logging.")
118 help="The ZMQ URL of the iplogger to aggregate logging.")
118
119
120 cluster_id = Unicode('', config=True,
121 help="""String id to add to runtime files, to prevent name collisions when
122 using multiple clusters with a single profile simultaneously.
123
124 When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
125
126 Since this is text inserted into filenames, typical recommendations apply:
127 Simple character strings are ideal, and spaces are not recommended (but should
128 generally work).
129 """
130 )
131 def _cluster_id_changed(self, name, old, new):
132 self.name = self.__class__.name
133 if new:
134 self.name += '-%s'%new
135
119 def _config_files_default(self):
136 def _config_files_default(self):
120 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
137 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
121
138
@@ -275,19 +275,22 b' class IPClusterEngines(BaseParallelApplication):'
275 self.init_launchers()
275 self.init_launchers()
276
276
277 def init_launchers(self):
277 def init_launchers(self):
278 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
278 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
279 self.engine_launcher.on_stop(lambda r: self.loop.stop())
279 self.engine_launcher.on_stop(lambda r: self.loop.stop())
280
280
281 def init_signal(self):
281 def init_signal(self):
282 # Setup signals
282 # Setup signals
283 signal.signal(signal.SIGINT, self.sigint_handler)
283 signal.signal(signal.SIGINT, self.sigint_handler)
284
284
285 def build_launcher(self, clsname):
285 def build_launcher(self, clsname, kind=None):
286 """import and instantiate a Launcher based on importstring"""
286 """import and instantiate a Launcher based on importstring"""
287 if '.' not in clsname:
287 if '.' not in clsname:
288 # not a module, presume it's the raw name in apps.launcher
288 # not a module, presume it's the raw name in apps.launcher
289 if kind and kind not in clsname:
290 # doesn't match necessary full class name, assume it's
291 # just 'PBS' or 'MPIExec' prefix:
292 clsname = clsname + kind + 'Launcher'
289 clsname = 'IPython.parallel.apps.launcher.'+clsname
293 clsname = 'IPython.parallel.apps.launcher.'+clsname
290 # print repr(clsname)
291 try:
294 try:
292 klass = import_item(clsname)
295 klass = import_item(clsname)
293 except (ImportError, KeyError):
296 except (ImportError, KeyError):
@@ -295,16 +298,14 b' class IPClusterEngines(BaseParallelApplication):'
295 self.exit(1)
298 self.exit(1)
296
299
297 launcher = klass(
300 launcher = klass(
298 work_dir=u'.', config=self.config, log=self.log
301 work_dir=u'.', config=self.config, log=self.log,
302 profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
299 )
303 )
300 return launcher
304 return launcher
301
305
302 def start_engines(self):
306 def start_engines(self):
303 self.log.info("Starting %i engines"%self.n)
307 self.log.info("Starting %i engines"%self.n)
304 self.engine_launcher.start(
308 self.engine_launcher.start(self.n)
305 self.n,
306 self.profile_dir.location
307 )
308
309
309 def stop_engines(self):
310 def stop_engines(self):
310 self.log.info("Stopping Engines...")
311 self.log.info("Stopping Engines...")
@@ -424,14 +425,12 b' class IPClusterStart(IPClusterEngines):'
424 aliases = Dict(start_aliases)
425 aliases = Dict(start_aliases)
425
426
426 def init_launchers(self):
427 def init_launchers(self):
427 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
428 self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
428 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
429 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
429 self.controller_launcher.on_stop(self.stop_launchers)
430 self.controller_launcher.on_stop(self.stop_launchers)
430
431
431 def start_controller(self):
432 def start_controller(self):
432 self.controller_launcher.start(
433 self.controller_launcher.start()
433 self.profile_dir.location
434 )
435
434
436 def stop_controller(self):
435 def stop_controller(self):
437 # self.log.info("In stop_controller")
436 # self.log.info("In stop_controller")
@@ -174,7 +174,18 b' class IPControllerApp(BaseParallelApplication):'
174
174
175 use_threads = Bool(False, config=True,
175 use_threads = Bool(False, config=True,
176 help='Use threads instead of processes for the schedulers',
176 help='Use threads instead of processes for the schedulers',
177 )
177 )
178
179 engine_json_file = Unicode('ipcontroller-engine.json', config=True,
180 help="JSON filename where engine connection info will be stored.")
181 client_json_file = Unicode('ipcontroller-client.json', config=True,
182 help="JSON filename where client connection info will be stored.")
183
184 def _cluster_id_changed(self, name, old, new):
185 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
186 self.engine_json_file = "%s-engine.json" % self.name
187 self.client_json_file = "%s-client.json" % self.name
188
178
189
179 # internal
190 # internal
180 children = List()
191 children = List()
@@ -215,7 +226,7 b' class IPControllerApp(BaseParallelApplication):'
215 """load config from existing json connector files."""
226 """load config from existing json connector files."""
216 c = self.config
227 c = self.config
217 # load from engine config
228 # load from engine config
218 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
229 with open(os.path.join(self.profile_dir.security_dir, self.engine_json_file)) as f:
219 cfg = json.loads(f.read())
230 cfg = json.loads(f.read())
220 key = c.Session.key = asbytes(cfg['exec_key'])
231 key = c.Session.key = asbytes(cfg['exec_key'])
221 xport,addr = cfg['url'].split('://')
232 xport,addr = cfg['url'].split('://')
@@ -227,7 +238,7 b' class IPControllerApp(BaseParallelApplication):'
227 if not self.engine_ssh_server:
238 if not self.engine_ssh_server:
228 self.engine_ssh_server = cfg['ssh']
239 self.engine_ssh_server = cfg['ssh']
229 # load client config
240 # load client config
230 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
241 with open(os.path.join(self.profile_dir.security_dir, self.client_json_file)) as f:
231 cfg = json.loads(f.read())
242 cfg = json.loads(f.read())
232 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
243 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
233 xport,addr = cfg['url'].split('://')
244 xport,addr = cfg['url'].split('://')
@@ -277,11 +288,11 b' class IPControllerApp(BaseParallelApplication):'
277 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
288 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
278 'location' : self.location
289 'location' : self.location
279 }
290 }
280 self.save_connection_dict('ipcontroller-client.json', cdict)
291 self.save_connection_dict(self.client_json_file, cdict)
281 edict = cdict
292 edict = cdict
282 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
293 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
283 edict['ssh'] = self.engine_ssh_server
294 edict['ssh'] = self.engine_ssh_server
284 self.save_connection_dict('ipcontroller-engine.json', edict)
295 self.save_connection_dict(self.engine_json_file, edict)
285
296
286 #
297 #
287 def init_schedulers(self):
298 def init_schedulers(self):
@@ -93,7 +93,7 b' class MPI(Configurable):'
93 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
93 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
94 )
94 )
95
95
96 def _on_use_changed(self, old, new):
96 def _use_changed(self, name, old, new):
97 # load default init script if it's not set
97 # load default init script if it's not set
98 if not self.init_script:
98 if not self.init_script:
99 self.init_script = self.default_inits.get(new, '')
99 self.init_script = self.default_inits.get(new, '')
@@ -135,8 +135,8 b' aliases.update(base_aliases)'
135
135
136 class IPEngineApp(BaseParallelApplication):
136 class IPEngineApp(BaseParallelApplication):
137
137
138 name = Unicode(u'ipengine')
138 name = 'ipengine'
139 description = Unicode(_description)
139 description = _description
140 examples = _examples
140 examples = _examples
141 config_file_name = Unicode(default_config_file_name)
141 config_file_name = Unicode(default_config_file_name)
142 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
142 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
@@ -158,7 +158,15 b' class IPEngineApp(BaseParallelApplication):'
158 controller and engine are started at the same time and it
158 controller and engine are started at the same time and it
159 may take a moment for the controller to write the connector files.""")
159 may take a moment for the controller to write the connector files.""")
160
160
161 url_file_name = Unicode(u'ipcontroller-engine.json')
161 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
162
163 def _cluster_id_changed(self, name, old, new):
164 if new:
165 base = 'ipcontroller-%s' % new
166 else:
167 base = 'ipcontroller'
168 self.url_file_name = "%s-engine.json" % base
169
162 log_url = Unicode('', config=True,
170 log_url = Unicode('', config=True,
163 help="""The URL for the iploggerapp instance, for forwarding
171 help="""The URL for the iploggerapp instance, for forwarding
164 logging to a central location.""")
172 logging to a central location.""")
@@ -57,7 +57,9 b' from zmq.eventloop import ioloop'
57 from IPython.config.application import Application
57 from IPython.config.application import Application
58 from IPython.config.configurable import LoggingConfigurable
58 from IPython.config.configurable import LoggingConfigurable
59 from IPython.utils.text import EvalFormatter
59 from IPython.utils.text import EvalFormatter
60 from IPython.utils.traitlets import Any, Int, CFloat, List, Unicode, Dict, Instance
60 from IPython.utils.traitlets import (
61 Any, Int, CFloat, List, Unicode, Dict, Instance, HasTraits,
62 )
61 from IPython.utils.path import get_ipython_module_path
63 from IPython.utils.path import get_ipython_module_path
62 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
64 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
63
65
@@ -213,6 +215,33 b' class BaseLauncher(LoggingConfigurable):'
213 """
215 """
214 raise NotImplementedError('signal must be implemented in a subclass')
216 raise NotImplementedError('signal must be implemented in a subclass')
215
217
218 class ClusterAppMixin(HasTraits):
219 """MixIn for cluster args as traits"""
220 cluster_args = List([])
221 profile_dir=Unicode('')
222 cluster_id=Unicode('')
223 def _profile_dir_changed(self, name, old, new):
224 self.cluster_args = []
225 if self.profile_dir:
226 self.cluster_args.extend(['--profile-dir', self.profile_dir])
227 if self.cluster_id:
228 self.cluster_args.extend(['--cluster-id', self.cluster_id])
229 _cluster_id_changed = _profile_dir_changed
230
231 class ControllerMixin(ClusterAppMixin):
232 controller_cmd = List(ipcontroller_cmd_argv, config=True,
233 help="""Popen command to launch ipcontroller.""")
234 # Command line arguments to ipcontroller.
235 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
236 help="""command-line args to pass to ipcontroller""")
237
238 class EngineMixin(ClusterAppMixin):
239 engine_cmd = List(ipengine_cmd_argv, config=True,
240 help="""command to launch the Engine.""")
241 # Command line arguments for ipengine.
242 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
243 help="command-line arguments to pass to ipengine"
244 )
216
245
217 #-----------------------------------------------------------------------------
246 #-----------------------------------------------------------------------------
218 # Local process launchers
247 # Local process launchers
@@ -317,54 +346,28 b' class LocalProcessLauncher(BaseLauncher):'
317 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
346 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
318 return status
347 return status
319
348
320 class LocalControllerLauncher(LocalProcessLauncher):
349 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
321 """Launch a controller as a regular external process."""
350 """Launch a controller as a regular external process."""
322
351
323 controller_cmd = List(ipcontroller_cmd_argv, config=True,
324 help="""Popen command to launch ipcontroller.""")
325 # Command line arguments to ipcontroller.
326 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
327 help="""command-line args to pass to ipcontroller""")
328
329 def find_args(self):
352 def find_args(self):
330 return self.controller_cmd + self.controller_args
353 return self.controller_cmd + self.cluster_args + self.controller_args
331
354
332 def start(self, profile_dir):
355 def start(self):
333 """Start the controller by profile_dir."""
356 """Start the controller by profile_dir."""
334 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
335 self.profile_dir = unicode(profile_dir)
336 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
357 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
337 return super(LocalControllerLauncher, self).start()
358 return super(LocalControllerLauncher, self).start()
338
359
339
360
340 class LocalEngineLauncher(LocalProcessLauncher):
361 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
341 """Launch a single engine as a regular externall process."""
362 """Launch a single engine as a regular externall process."""
342
363
343 engine_cmd = List(ipengine_cmd_argv, config=True,
344 help="""command to launch the Engine.""")
345 # Command line arguments for ipengine.
346 engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
347 help="command-line arguments to pass to ipengine"
348 )
349
350 def find_args(self):
364 def find_args(self):
351 return self.engine_cmd + self.engine_args
365 return self.engine_cmd + self.cluster_args + self.engine_args
352
366
353 def start(self, profile_dir):
354 """Start the engine by profile_dir."""
355 self.engine_args.extend(['--profile-dir=%s'%profile_dir])
356 self.profile_dir = unicode(profile_dir)
357 return super(LocalEngineLauncher, self).start()
358
367
359
368 class LocalEngineSetLauncher(LocalEngineLauncher):
360 class LocalEngineSetLauncher(BaseLauncher):
361 """Launch a set of engines as regular external processes."""
369 """Launch a set of engines as regular external processes."""
362
370
363 # Command line arguments for ipengine.
364 engine_args = List(
365 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
366 help="command-line arguments to pass to ipengine"
367 )
368 delay = CFloat(0.1, config=True,
371 delay = CFloat(0.1, config=True,
369 help="""delay (in seconds) between starting each engine after the first.
372 help="""delay (in seconds) between starting each engine after the first.
370 This can help force the engines to get their ids in order, or limit
373 This can help force the engines to get their ids in order, or limit
@@ -383,26 +386,26 b' class LocalEngineSetLauncher(BaseLauncher):'
383 )
386 )
384 self.stop_data = {}
387 self.stop_data = {}
385
388
386 def start(self, n, profile_dir):
389 def start(self, n):
387 """Start n engines by profile or profile_dir."""
390 """Start n engines by profile or profile_dir."""
388 self.profile_dir = unicode(profile_dir)
389 dlist = []
391 dlist = []
390 for i in range(n):
392 for i in range(n):
391 if i > 0:
393 if i > 0:
392 time.sleep(self.delay)
394 time.sleep(self.delay)
393 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
395 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
396 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
397 )
398
394 # Copy the engine args over to each engine launcher.
399 # Copy the engine args over to each engine launcher.
400 el.engine_cmd = copy.deepcopy(self.engine_cmd)
395 el.engine_args = copy.deepcopy(self.engine_args)
401 el.engine_args = copy.deepcopy(self.engine_args)
396 el.on_stop(self._notice_engine_stopped)
402 el.on_stop(self._notice_engine_stopped)
397 d = el.start(profile_dir)
403 d = el.start()
398 if i==0:
404 if i==0:
399 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
405 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
400 self.launchers[i] = el
406 self.launchers[i] = el
401 dlist.append(d)
407 dlist.append(d)
402 self.notify_start(dlist)
408 self.notify_start(dlist)
403 # The consumeErrors here could be dangerous
404 # dfinal = gatherBoth(dlist, consumeErrors=True)
405 # dfinal.addCallback(self.notify_start)
406 return dlist
409 return dlist
407
410
408 def find_args(self):
411 def find_args(self):
@@ -413,7 +416,6 b' class LocalEngineSetLauncher(BaseLauncher):'
413 for el in self.launchers.itervalues():
416 for el in self.launchers.itervalues():
414 d = el.signal(sig)
417 d = el.signal(sig)
415 dlist.append(d)
418 dlist.append(d)
416 # dfinal = gatherBoth(dlist, consumeErrors=True)
417 return dlist
419 return dlist
418
420
419 def interrupt_then_kill(self, delay=1.0):
421 def interrupt_then_kill(self, delay=1.0):
@@ -421,7 +423,6 b' class LocalEngineSetLauncher(BaseLauncher):'
421 for el in self.launchers.itervalues():
423 for el in self.launchers.itervalues():
422 d = el.interrupt_then_kill(delay)
424 d = el.interrupt_then_kill(delay)
423 dlist.append(d)
425 dlist.append(d)
424 # dfinal = gatherBoth(dlist, consumeErrors=True)
425 return dlist
426 return dlist
426
427
427 def stop(self):
428 def stop(self):
@@ -452,9 +453,9 b' class MPIExecLauncher(LocalProcessLauncher):'
452 mpi_args = List([], config=True,
453 mpi_args = List([], config=True,
453 help="The command line arguments to pass to mpiexec."
454 help="The command line arguments to pass to mpiexec."
454 )
455 )
455 program = List(['date'], config=True,
456 program = List(['date'],
456 help="The program to start via mpiexec.")
457 help="The program to start via mpiexec.")
457 program_args = List([], config=True,
458 program_args = List([],
458 help="The command line argument to the program."
459 help="The command line argument to the program."
459 )
460 )
460 n = Int(1)
461 n = Int(1)
@@ -470,44 +471,42 b' class MPIExecLauncher(LocalProcessLauncher):'
470 return super(MPIExecLauncher, self).start()
471 return super(MPIExecLauncher, self).start()
471
472
472
473
473 class MPIExecControllerLauncher(MPIExecLauncher):
474 class MPIExecControllerLauncher(MPIExecLauncher, ControllerMixin):
474 """Launch a controller using mpiexec."""
475 """Launch a controller using mpiexec."""
475
476
476 controller_cmd = List(ipcontroller_cmd_argv, config=True,
477 # alias back to *non-configurable* program[_args] for use in find_args()
477 help="Popen command to launch the Contropper"
478 # this way all Controller/EngineSetLaunchers have the same form, rather
478 )
479 # than *some* having `program_args` and others `controller_args`
479 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
480 @property
480 help="Command line arguments to pass to ipcontroller."
481 def program(self):
481 )
482 return self.controller_cmd
482 n = Int(1)
483
484 @property
485 def program_args(self):
486 return self.cluster_args + self.controller_args
483
487
484 def start(self, profile_dir):
488 def start(self):
485 """Start the controller by profile_dir."""
489 """Start the controller by profile_dir."""
486 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
487 self.profile_dir = unicode(profile_dir)
488 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
490 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
489 return super(MPIExecControllerLauncher, self).start(1)
491 return super(MPIExecControllerLauncher, self).start(1)
490
492
491 def find_args(self):
492 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
493 self.controller_cmd + self.controller_args
494
495
493
496 class MPIExecEngineSetLauncher(MPIExecLauncher):
494 class MPIExecEngineSetLauncher(MPIExecLauncher, EngineMixin):
495 """Launch engines using mpiexec"""
497
496
498 program = List(ipengine_cmd_argv, config=True,
497 # alias back to *non-configurable* program[_args] for use in find_args()
499 help="Popen command for ipengine"
498 # this way all Controller/EngineSetLaunchers have the same form, rather
500 )
499 # than *some* having `program_args` and others `controller_args`
501 program_args = List(
500 @property
502 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
501 def program(self):
503 help="Command line arguments for ipengine."
502 return self.engine_cmd
504 )
503
505 n = Int(1)
504 @property
505 def program_args(self):
506 return self.cluster_args + self.engine_args
506
507
507 def start(self, n, profile_dir):
508 def start(self, n):
508 """Start n engines by profile or profile_dir."""
509 """Start n engines by profile or profile_dir."""
509 self.program_args.extend(['--profile-dir=%s'%profile_dir])
510 self.profile_dir = unicode(profile_dir)
511 self.n = n
510 self.n = n
512 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
511 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
513 return super(MPIExecEngineSetLauncher, self).start(n)
512 return super(MPIExecEngineSetLauncher, self).start(n)
@@ -530,9 +529,9 b' class SSHLauncher(LocalProcessLauncher):'
530 help="command for starting ssh")
529 help="command for starting ssh")
531 ssh_args = List(['-tt'], config=True,
530 ssh_args = List(['-tt'], config=True,
532 help="args to pass to ssh")
531 help="args to pass to ssh")
533 program = List(['date'], config=True,
532 program = List(['date'],
534 help="Program to launch via ssh")
533 help="Program to launch via ssh")
535 program_args = List([], config=True,
534 program_args = List([],
536 help="args to pass to remote program")
535 help="args to pass to remote program")
537 hostname = Unicode('', config=True,
536 hostname = Unicode('', config=True,
538 help="hostname on which to launch the program")
537 help="hostname on which to launch the program")
@@ -554,8 +553,7 b' class SSHLauncher(LocalProcessLauncher):'
554 return self.ssh_cmd + self.ssh_args + [self.location] + \
553 return self.ssh_cmd + self.ssh_args + [self.location] + \
555 self.program + self.program_args
554 self.program + self.program_args
556
555
557 def start(self, profile_dir, hostname=None, user=None):
556 def start(self, hostname=None, user=None):
558 self.profile_dir = unicode(profile_dir)
559 if hostname is not None:
557 if hostname is not None:
560 self.hostname = hostname
558 self.hostname = hostname
561 if user is not None:
559 if user is not None:
@@ -571,22 +569,33 b' class SSHLauncher(LocalProcessLauncher):'
571
569
572
570
573
571
574 class SSHControllerLauncher(SSHLauncher):
572 class SSHControllerLauncher(SSHLauncher, ControllerMixin):
575
573
576 program = List(ipcontroller_cmd_argv, config=True,
574 # alias back to *non-configurable* program[_args] for use in find_args()
577 help="remote ipcontroller command.")
575 # this way all Controller/EngineSetLaunchers have the same form, rather
578 program_args = List(['--reuse-files', '--log-to-file','--log-level=%i'%logging.INFO], config=True,
576 # than *some* having `program_args` and others `controller_args`
579 help="Command line arguments to ipcontroller.")
577 @property
578 def program(self):
579 return self.controller_cmd
580
581 @property
582 def program_args(self):
583 return self.cluster_args + self.controller_args
580
584
581
585
582 class SSHEngineLauncher(SSHLauncher):
586 class SSHEngineLauncher(SSHLauncher, EngineMixin):
583 program = List(ipengine_cmd_argv, config=True,
587
584 help="remote ipengine command.")
588 # alias back to *non-configurable* program[_args] for use in find_args()
585 # Command line arguments for ipengine.
589 # this way all Controller/EngineSetLaunchers have the same form, rather
586 program_args = List(
590 # than *some* having `program_args` and others `controller_args`
587 ['--log-to-file','--log_level=%i'%logging.INFO], config=True,
591 @property
588 help="Command line arguments to ipengine."
592 def program(self):
589 )
593 return self.engine_cmd
594
595 @property
596 def program_args(self):
597 return self.cluster_args + self.engine_args
598
590
599
591 class SSHEngineSetLauncher(LocalEngineSetLauncher):
600 class SSHEngineSetLauncher(LocalEngineSetLauncher):
592 launcher_class = SSHEngineLauncher
601 launcher_class = SSHEngineLauncher
@@ -594,12 +603,11 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
594 help="""dict of engines to launch. This is a dict by hostname of ints,
603 help="""dict of engines to launch. This is a dict by hostname of ints,
595 corresponding to the number of engines to start on that host.""")
604 corresponding to the number of engines to start on that host.""")
596
605
597 def start(self, n, profile_dir):
606 def start(self, n):
598 """Start engines by profile or profile_dir.
607 """Start engines by profile or profile_dir.
599 `n` is ignored, and the `engines` config property is used instead.
608 `n` is ignored, and the `engines` config property is used instead.
600 """
609 """
601
610
602 self.profile_dir = unicode(profile_dir)
603 dlist = []
611 dlist = []
604 for host, n in self.engines.iteritems():
612 for host, n in self.engines.iteritems():
605 if isinstance(n, (tuple, list)):
613 if isinstance(n, (tuple, list)):
@@ -614,13 +622,15 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
614 for i in range(n):
622 for i in range(n):
615 if i > 0:
623 if i > 0:
616 time.sleep(self.delay)
624 time.sleep(self.delay)
617 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
625 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
626 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
627 )
618
628
619 # Copy the engine args over to each engine launcher.
629 # Copy the engine args over to each engine launcher.
620 i
630 el.engine_cmd = self.engine_cmd
621 el.program_args = args
631 el.engine_args = args
622 el.on_stop(self._notice_engine_stopped)
632 el.on_stop(self._notice_engine_stopped)
623 d = el.start(profile_dir, user=user, hostname=host)
633 d = el.start(user=user, hostname=host)
624 if i==0:
634 if i==0:
625 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
635 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
626 self.launchers[host+str(i)] = el
636 self.launchers[host+str(i)] = el
@@ -727,11 +737,11 b' class WindowsHPCLauncher(BaseLauncher):'
727 return output
737 return output
728
738
729
739
730 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
740 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
731
741
732 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
742 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
733 help="WinHPC xml job file.")
743 help="WinHPC xml job file.")
734 extra_args = List([], config=False,
744 controller_args = List([], config=False,
735 help="extra args to pass to ipcontroller")
745 help="extra args to pass to ipcontroller")
736
746
737 def write_job_file(self, n):
747 def write_job_file(self, n):
@@ -743,7 +753,8 b' class WindowsHPCControllerLauncher(WindowsHPCLauncher):'
743 # files that the scheduler redirects to.
753 # files that the scheduler redirects to.
744 t.work_directory = self.profile_dir
754 t.work_directory = self.profile_dir
745 # Add the profile_dir and from self.start().
755 # Add the profile_dir and from self.start().
746 t.controller_args.extend(self.extra_args)
756 t.controller_args.extend(self.cluster_args)
757 t.controller_args.extend(self.controller_args)
747 job.add_task(t)
758 job.add_task(t)
748
759
749 self.log.info("Writing job description file: %s" % self.job_file)
760 self.log.info("Writing job description file: %s" % self.job_file)
@@ -753,18 +764,16 b' class WindowsHPCControllerLauncher(WindowsHPCLauncher):'
753 def job_file(self):
764 def job_file(self):
754 return os.path.join(self.profile_dir, self.job_file_name)
765 return os.path.join(self.profile_dir, self.job_file_name)
755
766
756 def start(self, profile_dir):
767 def start(self):
757 """Start the controller by profile_dir."""
768 """Start the controller by profile_dir."""
758 self.extra_args = ['--profile-dir=%s'%profile_dir]
759 self.profile_dir = unicode(profile_dir)
760 return super(WindowsHPCControllerLauncher, self).start(1)
769 return super(WindowsHPCControllerLauncher, self).start(1)
761
770
762
771
763 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
772 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
764
773
765 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
774 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
766 help="jobfile for ipengines job")
775 help="jobfile for ipengines job")
767 extra_args = List([], config=False,
776 engine_args = List([], config=False,
768 help="extra args to pas to ipengine")
777 help="extra args to pas to ipengine")
769
778
770 def write_job_file(self, n):
779 def write_job_file(self, n):
@@ -777,7 +786,8 b' class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):'
777 # files that the scheduler redirects to.
786 # files that the scheduler redirects to.
778 t.work_directory = self.profile_dir
787 t.work_directory = self.profile_dir
779 # Add the profile_dir and from self.start().
788 # Add the profile_dir and from self.start().
780 t.engine_args.extend(self.extra_args)
789 t.controller_args.extend(self.cluster_args)
790 t.controller_args.extend(self.engine_args)
781 job.add_task(t)
791 job.add_task(t)
782
792
783 self.log.info("Writing job description file: %s" % self.job_file)
793 self.log.info("Writing job description file: %s" % self.job_file)
@@ -787,10 +797,8 b' class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):'
787 def job_file(self):
797 def job_file(self):
788 return os.path.join(self.profile_dir, self.job_file_name)
798 return os.path.join(self.profile_dir, self.job_file_name)
789
799
790 def start(self, n, profile_dir):
800 def start(self, n):
791 """Start the controller by profile_dir."""
801 """Start the controller by profile_dir."""
792 self.extra_args = ['--profile-dir=%s'%profile_dir]
793 self.profile_dir = unicode(profile_dir)
794 return super(WindowsHPCEngineSetLauncher, self).start(n)
802 return super(WindowsHPCEngineSetLauncher, self).start(n)
795
803
796
804
@@ -798,6 +806,20 b' class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):'
798 # Batch (PBS) system launchers
806 # Batch (PBS) system launchers
799 #-----------------------------------------------------------------------------
807 #-----------------------------------------------------------------------------
800
808
809 class BatchClusterAppMixin(ClusterAppMixin):
810 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
811 def _profile_dir_changed(self, name, old, new):
812 self.context[name] = new
813 _cluster_id_changed = _profile_dir_changed
814
815 def _profile_dir_default(self):
816 self.context['profile_dir'] = ''
817 return ''
818 def _cluster_id_default(self):
819 self.context['cluster_id'] = ''
820 return ''
821
822
801 class BatchSystemLauncher(BaseLauncher):
823 class BatchSystemLauncher(BaseLauncher):
802 """Launch an external process using a batch system.
824 """Launch an external process using a batch system.
803
825
@@ -829,6 +851,12 b' class BatchSystemLauncher(BaseLauncher):'
829 queue = Unicode(u'', config=True,
851 queue = Unicode(u'', config=True,
830 help="The PBS Queue.")
852 help="The PBS Queue.")
831
853
854 def _queue_changed(self, name, old, new):
855 self.context[name] = new
856
857 n = Int(1)
858 _n_changed = _queue_changed
859
832 # not configurable, override in subclasses
860 # not configurable, override in subclasses
833 # PBS Job Array regex
861 # PBS Job Array regex
834 job_array_regexp = Unicode('')
862 job_array_regexp = Unicode('')
@@ -868,8 +896,7 b' class BatchSystemLauncher(BaseLauncher):'
868
896
869 def write_batch_script(self, n):
897 def write_batch_script(self, n):
870 """Instantiate and write the batch script to the work_dir."""
898 """Instantiate and write the batch script to the work_dir."""
871 self.context['n'] = n
899 self.n = n
872 self.context['queue'] = self.queue
873 # first priority is batch_template if set
900 # first priority is batch_template if set
874 if self.batch_template_file and not self.batch_template:
901 if self.batch_template_file and not self.batch_template:
875 # second priority is batch_template_file
902 # second priority is batch_template_file
@@ -902,12 +929,10 b' class BatchSystemLauncher(BaseLauncher):'
902 f.write(script_as_string)
929 f.write(script_as_string)
903 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
930 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
904
931
905 def start(self, n, profile_dir):
932 def start(self, n):
906 """Start n copies of the process using a batch system."""
933 """Start n copies of the process using a batch system."""
907 # Here we save profile_dir in the context so they
934 # Here we save profile_dir in the context so they
908 # can be used in the batch script template as {profile_dir}
935 # can be used in the batch script template as {profile_dir}
909 self.context['profile_dir'] = profile_dir
910 self.profile_dir = unicode(profile_dir)
911 self.write_batch_script(n)
936 self.write_batch_script(n)
912 output = check_output(self.args, env=os.environ)
937 output = check_output(self.args, env=os.environ)
913
938
@@ -938,7 +963,7 b' class PBSLauncher(BatchSystemLauncher):'
938 queue_template = Unicode('#PBS -q {queue}')
963 queue_template = Unicode('#PBS -q {queue}')
939
964
940
965
941 class PBSControllerLauncher(PBSLauncher):
966 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
942 """Launch a controller using PBS."""
967 """Launch a controller using PBS."""
943
968
944 batch_file_name = Unicode(u'pbs_controller', config=True,
969 batch_file_name = Unicode(u'pbs_controller', config=True,
@@ -946,29 +971,30 b' class PBSControllerLauncher(PBSLauncher):'
946 default_template= Unicode("""#!/bin/sh
971 default_template= Unicode("""#!/bin/sh
947 #PBS -V
972 #PBS -V
948 #PBS -N ipcontroller
973 #PBS -N ipcontroller
949 %s --log-to-file --profile-dir={profile_dir}
974 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
950 """%(' '.join(ipcontroller_cmd_argv)))
975 """%(' '.join(ipcontroller_cmd_argv)))
951
976
952 def start(self, profile_dir):
977
978 def start(self):
953 """Start the controller by profile or profile_dir."""
979 """Start the controller by profile or profile_dir."""
954 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
980 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
955 return super(PBSControllerLauncher, self).start(1, profile_dir)
981 return super(PBSControllerLauncher, self).start(1)
956
982
957
983
958 class PBSEngineSetLauncher(PBSLauncher):
984 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
959 """Launch Engines using PBS"""
985 """Launch Engines using PBS"""
960 batch_file_name = Unicode(u'pbs_engines', config=True,
986 batch_file_name = Unicode(u'pbs_engines', config=True,
961 help="batch file name for the engine(s) job.")
987 help="batch file name for the engine(s) job.")
962 default_template= Unicode(u"""#!/bin/sh
988 default_template= Unicode(u"""#!/bin/sh
963 #PBS -V
989 #PBS -V
964 #PBS -N ipengine
990 #PBS -N ipengine
965 %s --profile-dir={profile_dir}
991 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
966 """%(' '.join(ipengine_cmd_argv)))
992 """%(' '.join(ipengine_cmd_argv)))
967
993
968 def start(self, n, profile_dir):
994 def start(self, n):
969 """Start n engines by profile or profile_dir."""
995 """Start n engines by profile or profile_dir."""
970 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
996 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
971 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
997 return super(PBSEngineSetLauncher, self).start(n)
972
998
973 #SGE is very similar to PBS
999 #SGE is very similar to PBS
974
1000
@@ -979,7 +1005,7 b' class SGELauncher(PBSLauncher):'
979 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
1005 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
980 queue_template = Unicode('#$ -q {queue}')
1006 queue_template = Unicode('#$ -q {queue}')
981
1007
982 class SGEControllerLauncher(SGELauncher):
1008 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
983 """Launch a controller using SGE."""
1009 """Launch a controller using SGE."""
984
1010
985 batch_file_name = Unicode(u'sge_controller', config=True,
1011 batch_file_name = Unicode(u'sge_controller', config=True,
@@ -987,28 +1013,28 b' class SGEControllerLauncher(SGELauncher):'
987 default_template= Unicode(u"""#$ -V
1013 default_template= Unicode(u"""#$ -V
988 #$ -S /bin/sh
1014 #$ -S /bin/sh
989 #$ -N ipcontroller
1015 #$ -N ipcontroller
990 %s --log-to-file --profile-dir={profile_dir}
1016 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
991 """%(' '.join(ipcontroller_cmd_argv)))
1017 """%(' '.join(ipcontroller_cmd_argv)))
992
1018
993 def start(self, profile_dir):
1019 def start(self):
994 """Start the controller by profile or profile_dir."""
1020 """Start the controller by profile or profile_dir."""
995 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
1021 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
996 return super(SGEControllerLauncher, self).start(1, profile_dir)
1022 return super(SGEControllerLauncher, self).start(1)
997
1023
998 class SGEEngineSetLauncher(SGELauncher):
1024 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
999 """Launch Engines with SGE"""
1025 """Launch Engines with SGE"""
1000 batch_file_name = Unicode(u'sge_engines', config=True,
1026 batch_file_name = Unicode(u'sge_engines', config=True,
1001 help="batch file name for the engine(s) job.")
1027 help="batch file name for the engine(s) job.")
1002 default_template = Unicode("""#$ -V
1028 default_template = Unicode("""#$ -V
1003 #$ -S /bin/sh
1029 #$ -S /bin/sh
1004 #$ -N ipengine
1030 #$ -N ipengine
1005 %s --profile-dir={profile_dir}
1031 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1006 """%(' '.join(ipengine_cmd_argv)))
1032 """%(' '.join(ipengine_cmd_argv)))
1007
1033
1008 def start(self, n, profile_dir):
1034 def start(self, n):
1009 """Start n engines by profile or profile_dir."""
1035 """Start n engines by profile or profile_dir."""
1010 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1036 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1011 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1037 return super(SGEEngineSetLauncher, self).start(n)
1012
1038
1013
1039
1014 # LSF launchers
1040 # LSF launchers
@@ -1029,7 +1055,7 b' class LSFLauncher(BatchSystemLauncher):'
1029 queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
1055 queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
1030 queue_template = Unicode('#BSUB -q {queue}')
1056 queue_template = Unicode('#BSUB -q {queue}')
1031
1057
1032 def start(self, n, profile_dir):
1058 def start(self, n):
1033 """Start n copies of the process using LSF batch system.
1059 """Start n copies of the process using LSF batch system.
1034 This cant inherit from the base class because bsub expects
1060 This cant inherit from the base class because bsub expects
1035 to be piped a shell script in order to honor the #BSUB directives :
1061 to be piped a shell script in order to honor the #BSUB directives :
@@ -1037,8 +1063,6 b' class LSFLauncher(BatchSystemLauncher):'
1037 """
1063 """
1038 # Here we save profile_dir in the context so they
1064 # Here we save profile_dir in the context so they
1039 # can be used in the batch script template as {profile_dir}
1065 # can be used in the batch script template as {profile_dir}
1040 self.context['profile_dir'] = profile_dir
1041 self.profile_dir = unicode(profile_dir)
1042 self.write_batch_script(n)
1066 self.write_batch_script(n)
1043 #output = check_output(self.args, env=os.environ)
1067 #output = check_output(self.args, env=os.environ)
1044 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1068 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
@@ -1049,7 +1073,7 b' class LSFLauncher(BatchSystemLauncher):'
1049 return job_id
1073 return job_id
1050
1074
1051
1075
1052 class LSFControllerLauncher(LSFLauncher):
1076 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1053 """Launch a controller using LSF."""
1077 """Launch a controller using LSF."""
1054
1078
1055 batch_file_name = Unicode(u'lsf_controller', config=True,
1079 batch_file_name = Unicode(u'lsf_controller', config=True,
@@ -1058,29 +1082,29 b' class LSFControllerLauncher(LSFLauncher):'
1058 #BSUB -J ipcontroller
1082 #BSUB -J ipcontroller
1059 #BSUB -oo ipcontroller.o.%%J
1083 #BSUB -oo ipcontroller.o.%%J
1060 #BSUB -eo ipcontroller.e.%%J
1084 #BSUB -eo ipcontroller.e.%%J
1061 %s --log-to-file --profile-dir={profile_dir}
1085 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1062 """%(' '.join(ipcontroller_cmd_argv)))
1086 """%(' '.join(ipcontroller_cmd_argv)))
1063
1087
1064 def start(self, profile_dir):
1088 def start(self):
1065 """Start the controller by profile or profile_dir."""
1089 """Start the controller by profile or profile_dir."""
1066 self.log.info("Starting LSFControllerLauncher: %r" % self.args)
1090 self.log.info("Starting LSFControllerLauncher: %r" % self.args)
1067 return super(LSFControllerLauncher, self).start(1, profile_dir)
1091 return super(LSFControllerLauncher, self).start(1)
1068
1092
1069
1093
1070 class LSFEngineSetLauncher(LSFLauncher):
1094 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1071 """Launch Engines using LSF"""
1095 """Launch Engines using LSF"""
1072 batch_file_name = Unicode(u'lsf_engines', config=True,
1096 batch_file_name = Unicode(u'lsf_engines', config=True,
1073 help="batch file name for the engine(s) job.")
1097 help="batch file name for the engine(s) job.")
1074 default_template= Unicode(u"""#!/bin/sh
1098 default_template= Unicode(u"""#!/bin/sh
1075 #BSUB -oo ipengine.o.%%J
1099 #BSUB -oo ipengine.o.%%J
1076 #BSUB -eo ipengine.e.%%J
1100 #BSUB -eo ipengine.e.%%J
1077 %s --profile-dir={profile_dir}
1101 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1078 """%(' '.join(ipengine_cmd_argv)))
1102 """%(' '.join(ipengine_cmd_argv)))
1079
1103
1080 def start(self, n, profile_dir):
1104 def start(self, n):
1081 """Start n engines by profile or profile_dir."""
1105 """Start n engines by profile or profile_dir."""
1082 self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
1106 self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
1083 return super(LSFEngineSetLauncher, self).start(n, profile_dir)
1107 return super(LSFEngineSetLauncher, self).start(n)
1084
1108
1085
1109
1086 #-----------------------------------------------------------------------------
1110 #-----------------------------------------------------------------------------
General Comments 0
You need to be logged in to leave comments. Login now