##// END OF EJS Templates
Merge PR #795 (cluster-id and launcher cleanup)...
MinRK -
r4852:3994edb7 merge
parent child Browse files
Show More
@@ -207,8 +207,9 b' class Configurable(HasTraits):'
207 207 for parent in cls.mro():
208 208 # only include parents that are not base classes
209 209 # and are not the class itself
210 if issubclass(parent, Configurable) and \
211 not parent in (Configurable, SingletonConfigurable, cls):
210 # and have some configurable traits to inherit
211 if parent is not cls and issubclass(parent, Configurable) and \
212 parent.class_traits(config=True):
212 213 parents.append(parent)
213 214
214 215 if parents:
@@ -75,6 +75,7 b' base_aliases.update({'
75 75 'log-to-file' : 'BaseParallelApplication.log_to_file',
76 76 'clean-logs' : 'BaseParallelApplication.clean_logs',
77 77 'log-url' : 'BaseParallelApplication.log_url',
78 'cluster-id' : 'BaseParallelApplication.cluster_id',
78 79 })
79 80
80 81 base_flags = {
@@ -116,6 +117,22 b' class BaseParallelApplication(BaseIPythonApplication):'
116 117 log_url = Unicode('', config=True,
117 118 help="The ZMQ URL of the iplogger to aggregate logging.")
118 119
120 cluster_id = Unicode('', config=True,
121 help="""String id to add to runtime files, to prevent name collisions when
122 using multiple clusters with a single profile simultaneously.
123
124 When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
125
126 Since this is text inserted into filenames, typical recommendations apply:
127 Simple character strings are ideal, and spaces are not recommended (but should
128 generally work).
129 """
130 )
131 def _cluster_id_changed(self, name, old, new):
132 self.name = self.__class__.name
133 if new:
134 self.name += '-%s'%new
135
119 136 def _config_files_default(self):
120 137 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
121 138
@@ -275,19 +275,22 b' class IPClusterEngines(BaseParallelApplication):'
275 275 self.init_launchers()
276 276
277 277 def init_launchers(self):
278 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
278 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
279 279 self.engine_launcher.on_stop(lambda r: self.loop.stop())
280 280
281 281 def init_signal(self):
282 282 # Setup signals
283 283 signal.signal(signal.SIGINT, self.sigint_handler)
284 284
285 def build_launcher(self, clsname):
285 def build_launcher(self, clsname, kind=None):
286 286 """import and instantiate a Launcher based on importstring"""
287 287 if '.' not in clsname:
288 288 # not a module, presume it's the raw name in apps.launcher
289 if kind and kind not in clsname:
290 # doesn't match necessary full class name, assume it's
291 # just 'PBS' or 'MPIExec' prefix:
292 clsname = clsname + kind + 'Launcher'
289 293 clsname = 'IPython.parallel.apps.launcher.'+clsname
290 # print repr(clsname)
291 294 try:
292 295 klass = import_item(clsname)
293 296 except (ImportError, KeyError):
@@ -295,16 +298,14 b' class IPClusterEngines(BaseParallelApplication):'
295 298 self.exit(1)
296 299
297 300 launcher = klass(
298 work_dir=u'.', config=self.config, log=self.log
301 work_dir=u'.', config=self.config, log=self.log,
302 profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
299 303 )
300 304 return launcher
301 305
302 306 def start_engines(self):
303 307 self.log.info("Starting %i engines"%self.n)
304 self.engine_launcher.start(
305 self.n,
306 self.profile_dir.location
307 )
308 self.engine_launcher.start(self.n)
308 309
309 310 def stop_engines(self):
310 311 self.log.info("Stopping Engines...")
@@ -424,14 +425,12 b' class IPClusterStart(IPClusterEngines):'
424 425 aliases = Dict(start_aliases)
425 426
426 427 def init_launchers(self):
427 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
428 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
428 self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
429 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
429 430 self.controller_launcher.on_stop(self.stop_launchers)
430 431
431 432 def start_controller(self):
432 self.controller_launcher.start(
433 self.profile_dir.location
434 )
433 self.controller_launcher.start()
435 434
436 435 def stop_controller(self):
437 436 # self.log.info("In stop_controller")
@@ -174,7 +174,18 b' class IPControllerApp(BaseParallelApplication):'
174 174
175 175 use_threads = Bool(False, config=True,
176 176 help='Use threads instead of processes for the schedulers',
177 )
177 )
178
179 engine_json_file = Unicode('ipcontroller-engine.json', config=True,
180 help="JSON filename where engine connection info will be stored.")
181 client_json_file = Unicode('ipcontroller-client.json', config=True,
182 help="JSON filename where client connection info will be stored.")
183
184 def _cluster_id_changed(self, name, old, new):
185 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
186 self.engine_json_file = "%s-engine.json" % self.name
187 self.client_json_file = "%s-client.json" % self.name
188
178 189
179 190 # internal
180 191 children = List()
@@ -215,7 +226,7 b' class IPControllerApp(BaseParallelApplication):'
215 226 """load config from existing json connector files."""
216 227 c = self.config
217 228 # load from engine config
218 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
229 with open(os.path.join(self.profile_dir.security_dir, self.engine_json_file)) as f:
219 230 cfg = json.loads(f.read())
220 231 key = c.Session.key = asbytes(cfg['exec_key'])
221 232 xport,addr = cfg['url'].split('://')
@@ -227,7 +238,7 b' class IPControllerApp(BaseParallelApplication):'
227 238 if not self.engine_ssh_server:
228 239 self.engine_ssh_server = cfg['ssh']
229 240 # load client config
230 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
241 with open(os.path.join(self.profile_dir.security_dir, self.client_json_file)) as f:
231 242 cfg = json.loads(f.read())
232 243 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
233 244 xport,addr = cfg['url'].split('://')
@@ -277,11 +288,11 b' class IPControllerApp(BaseParallelApplication):'
277 288 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
278 289 'location' : self.location
279 290 }
280 self.save_connection_dict('ipcontroller-client.json', cdict)
291 self.save_connection_dict(self.client_json_file, cdict)
281 292 edict = cdict
282 293 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
283 294 edict['ssh'] = self.engine_ssh_server
284 self.save_connection_dict('ipcontroller-engine.json', edict)
295 self.save_connection_dict(self.engine_json_file, edict)
285 296
286 297 #
287 298 def init_schedulers(self):
@@ -93,7 +93,7 b' class MPI(Configurable):'
93 93 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
94 94 )
95 95
96 def _on_use_changed(self, old, new):
96 def _use_changed(self, name, old, new):
97 97 # load default init script if it's not set
98 98 if not self.init_script:
99 99 self.init_script = self.default_inits.get(new, '')
@@ -135,8 +135,8 b' aliases.update(base_aliases)'
135 135
136 136 class IPEngineApp(BaseParallelApplication):
137 137
138 name = Unicode(u'ipengine')
139 description = Unicode(_description)
138 name = 'ipengine'
139 description = _description
140 140 examples = _examples
141 141 config_file_name = Unicode(default_config_file_name)
142 142 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
@@ -158,7 +158,15 b' class IPEngineApp(BaseParallelApplication):'
158 158 controller and engine are started at the same time and it
159 159 may take a moment for the controller to write the connector files.""")
160 160
161 url_file_name = Unicode(u'ipcontroller-engine.json')
161 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
162
163 def _cluster_id_changed(self, name, old, new):
164 if new:
165 base = 'ipcontroller-%s' % new
166 else:
167 base = 'ipcontroller'
168 self.url_file_name = "%s-engine.json" % base
169
162 170 log_url = Unicode('', config=True,
163 171 help="""The URL for the iploggerapp instance, for forwarding
164 172 logging to a central location.""")
@@ -57,7 +57,9 b' from zmq.eventloop import ioloop'
57 57 from IPython.config.application import Application
58 58 from IPython.config.configurable import LoggingConfigurable
59 59 from IPython.utils.text import EvalFormatter
60 from IPython.utils.traitlets import Any, Int, CFloat, List, Unicode, Dict, Instance
60 from IPython.utils.traitlets import (
61 Any, Int, CFloat, List, Unicode, Dict, Instance, HasTraits,
62 )
61 63 from IPython.utils.path import get_ipython_module_path
62 64 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
63 65
@@ -213,6 +215,33 b' class BaseLauncher(LoggingConfigurable):'
213 215 """
214 216 raise NotImplementedError('signal must be implemented in a subclass')
215 217
218 class ClusterAppMixin(HasTraits):
219 """MixIn for cluster args as traits"""
220 cluster_args = List([])
221 profile_dir=Unicode('')
222 cluster_id=Unicode('')
223 def _profile_dir_changed(self, name, old, new):
224 self.cluster_args = []
225 if self.profile_dir:
226 self.cluster_args.extend(['--profile-dir', self.profile_dir])
227 if self.cluster_id:
228 self.cluster_args.extend(['--cluster-id', self.cluster_id])
229 _cluster_id_changed = _profile_dir_changed
230
231 class ControllerMixin(ClusterAppMixin):
232 controller_cmd = List(ipcontroller_cmd_argv, config=True,
233 help="""Popen command to launch ipcontroller.""")
234 # Command line arguments to ipcontroller.
235 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
236 help="""command-line args to pass to ipcontroller""")
237
238 class EngineMixin(ClusterAppMixin):
239 engine_cmd = List(ipengine_cmd_argv, config=True,
240 help="""command to launch the Engine.""")
241 # Command line arguments for ipengine.
242 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
243 help="command-line arguments to pass to ipengine"
244 )
216 245
217 246 #-----------------------------------------------------------------------------
218 247 # Local process launchers
@@ -317,54 +346,28 b' class LocalProcessLauncher(BaseLauncher):'
317 346 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
318 347 return status
319 348
320 class LocalControllerLauncher(LocalProcessLauncher):
349 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
321 350 """Launch a controller as a regular external process."""
322 351
323 controller_cmd = List(ipcontroller_cmd_argv, config=True,
324 help="""Popen command to launch ipcontroller.""")
325 # Command line arguments to ipcontroller.
326 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
327 help="""command-line args to pass to ipcontroller""")
328
329 352 def find_args(self):
330 return self.controller_cmd + self.controller_args
353 return self.controller_cmd + self.cluster_args + self.controller_args
331 354
332 def start(self, profile_dir):
355 def start(self):
333 356 """Start the controller by profile_dir."""
334 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
335 self.profile_dir = unicode(profile_dir)
336 357 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
337 358 return super(LocalControllerLauncher, self).start()
338 359
339 360
340 class LocalEngineLauncher(LocalProcessLauncher):
361 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
341 362 """Launch a single engine as a regular externall process."""
342 363
343 engine_cmd = List(ipengine_cmd_argv, config=True,
344 help="""command to launch the Engine.""")
345 # Command line arguments for ipengine.
346 engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
347 help="command-line arguments to pass to ipengine"
348 )
349
350 364 def find_args(self):
351 return self.engine_cmd + self.engine_args
365 return self.engine_cmd + self.cluster_args + self.engine_args
352 366
353 def start(self, profile_dir):
354 """Start the engine by profile_dir."""
355 self.engine_args.extend(['--profile-dir=%s'%profile_dir])
356 self.profile_dir = unicode(profile_dir)
357 return super(LocalEngineLauncher, self).start()
358 367
359
360 class LocalEngineSetLauncher(BaseLauncher):
368 class LocalEngineSetLauncher(LocalEngineLauncher):
361 369 """Launch a set of engines as regular external processes."""
362 370
363 # Command line arguments for ipengine.
364 engine_args = List(
365 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
366 help="command-line arguments to pass to ipengine"
367 )
368 371 delay = CFloat(0.1, config=True,
369 372 help="""delay (in seconds) between starting each engine after the first.
370 373 This can help force the engines to get their ids in order, or limit
@@ -383,26 +386,26 b' class LocalEngineSetLauncher(BaseLauncher):'
383 386 )
384 387 self.stop_data = {}
385 388
386 def start(self, n, profile_dir):
389 def start(self, n):
387 390 """Start n engines by profile or profile_dir."""
388 self.profile_dir = unicode(profile_dir)
389 391 dlist = []
390 392 for i in range(n):
391 393 if i > 0:
392 394 time.sleep(self.delay)
393 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
395 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
396 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
397 )
398
394 399 # Copy the engine args over to each engine launcher.
400 el.engine_cmd = copy.deepcopy(self.engine_cmd)
395 401 el.engine_args = copy.deepcopy(self.engine_args)
396 402 el.on_stop(self._notice_engine_stopped)
397 d = el.start(profile_dir)
403 d = el.start()
398 404 if i==0:
399 405 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
400 406 self.launchers[i] = el
401 407 dlist.append(d)
402 408 self.notify_start(dlist)
403 # The consumeErrors here could be dangerous
404 # dfinal = gatherBoth(dlist, consumeErrors=True)
405 # dfinal.addCallback(self.notify_start)
406 409 return dlist
407 410
408 411 def find_args(self):
@@ -413,7 +416,6 b' class LocalEngineSetLauncher(BaseLauncher):'
413 416 for el in self.launchers.itervalues():
414 417 d = el.signal(sig)
415 418 dlist.append(d)
416 # dfinal = gatherBoth(dlist, consumeErrors=True)
417 419 return dlist
418 420
419 421 def interrupt_then_kill(self, delay=1.0):
@@ -421,7 +423,6 b' class LocalEngineSetLauncher(BaseLauncher):'
421 423 for el in self.launchers.itervalues():
422 424 d = el.interrupt_then_kill(delay)
423 425 dlist.append(d)
424 # dfinal = gatherBoth(dlist, consumeErrors=True)
425 426 return dlist
426 427
427 428 def stop(self):
@@ -452,9 +453,9 b' class MPIExecLauncher(LocalProcessLauncher):'
452 453 mpi_args = List([], config=True,
453 454 help="The command line arguments to pass to mpiexec."
454 455 )
455 program = List(['date'], config=True,
456 program = List(['date'],
456 457 help="The program to start via mpiexec.")
457 program_args = List([], config=True,
458 program_args = List([],
458 459 help="The command line argument to the program."
459 460 )
460 461 n = Int(1)
@@ -470,44 +471,42 b' class MPIExecLauncher(LocalProcessLauncher):'
470 471 return super(MPIExecLauncher, self).start()
471 472
472 473
473 class MPIExecControllerLauncher(MPIExecLauncher):
474 class MPIExecControllerLauncher(MPIExecLauncher, ControllerMixin):
474 475 """Launch a controller using mpiexec."""
475 476
476 controller_cmd = List(ipcontroller_cmd_argv, config=True,
477 help="Popen command to launch the Contropper"
478 )
479 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
480 help="Command line arguments to pass to ipcontroller."
481 )
482 n = Int(1)
477 # alias back to *non-configurable* program[_args] for use in find_args()
478 # this way all Controller/EngineSetLaunchers have the same form, rather
479 # than *some* having `program_args` and others `controller_args`
480 @property
481 def program(self):
482 return self.controller_cmd
483
484 @property
485 def program_args(self):
486 return self.cluster_args + self.controller_args
483 487
484 def start(self, profile_dir):
488 def start(self):
485 489 """Start the controller by profile_dir."""
486 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
487 self.profile_dir = unicode(profile_dir)
488 490 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
489 491 return super(MPIExecControllerLauncher, self).start(1)
490 492
491 def find_args(self):
492 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
493 self.controller_cmd + self.controller_args
494
495 493
496 class MPIExecEngineSetLauncher(MPIExecLauncher):
494 class MPIExecEngineSetLauncher(MPIExecLauncher, EngineMixin):
495 """Launch engines using mpiexec"""
497 496
498 program = List(ipengine_cmd_argv, config=True,
499 help="Popen command for ipengine"
500 )
501 program_args = List(
502 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
503 help="Command line arguments for ipengine."
504 )
505 n = Int(1)
497 # alias back to *non-configurable* program[_args] for use in find_args()
498 # this way all Controller/EngineSetLaunchers have the same form, rather
499 # than *some* having `program_args` and others `controller_args`
500 @property
501 def program(self):
502 return self.engine_cmd
503
504 @property
505 def program_args(self):
506 return self.cluster_args + self.engine_args
506 507
507 def start(self, n, profile_dir):
508 def start(self, n):
508 509 """Start n engines by profile or profile_dir."""
509 self.program_args.extend(['--profile-dir=%s'%profile_dir])
510 self.profile_dir = unicode(profile_dir)
511 510 self.n = n
512 511 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
513 512 return super(MPIExecEngineSetLauncher, self).start(n)
@@ -530,9 +529,9 b' class SSHLauncher(LocalProcessLauncher):'
530 529 help="command for starting ssh")
531 530 ssh_args = List(['-tt'], config=True,
532 531 help="args to pass to ssh")
533 program = List(['date'], config=True,
532 program = List(['date'],
534 533 help="Program to launch via ssh")
535 program_args = List([], config=True,
534 program_args = List([],
536 535 help="args to pass to remote program")
537 536 hostname = Unicode('', config=True,
538 537 help="hostname on which to launch the program")
@@ -554,8 +553,7 b' class SSHLauncher(LocalProcessLauncher):'
554 553 return self.ssh_cmd + self.ssh_args + [self.location] + \
555 554 self.program + self.program_args
556 555
557 def start(self, profile_dir, hostname=None, user=None):
558 self.profile_dir = unicode(profile_dir)
556 def start(self, hostname=None, user=None):
559 557 if hostname is not None:
560 558 self.hostname = hostname
561 559 if user is not None:
@@ -571,22 +569,33 b' class SSHLauncher(LocalProcessLauncher):'
571 569
572 570
573 571
574 class SSHControllerLauncher(SSHLauncher):
572 class SSHControllerLauncher(SSHLauncher, ControllerMixin):
575 573
576 program = List(ipcontroller_cmd_argv, config=True,
577 help="remote ipcontroller command.")
578 program_args = List(['--reuse-files', '--log-to-file','--log-level=%i'%logging.INFO], config=True,
579 help="Command line arguments to ipcontroller.")
574 # alias back to *non-configurable* program[_args] for use in find_args()
575 # this way all Controller/EngineSetLaunchers have the same form, rather
576 # than *some* having `program_args` and others `controller_args`
577 @property
578 def program(self):
579 return self.controller_cmd
580
581 @property
582 def program_args(self):
583 return self.cluster_args + self.controller_args
580 584
581 585
582 class SSHEngineLauncher(SSHLauncher):
583 program = List(ipengine_cmd_argv, config=True,
584 help="remote ipengine command.")
585 # Command line arguments for ipengine.
586 program_args = List(
587 ['--log-to-file','--log_level=%i'%logging.INFO], config=True,
588 help="Command line arguments to ipengine."
589 )
586 class SSHEngineLauncher(SSHLauncher, EngineMixin):
587
588 # alias back to *non-configurable* program[_args] for use in find_args()
589 # this way all Controller/EngineSetLaunchers have the same form, rather
590 # than *some* having `program_args` and others `controller_args`
591 @property
592 def program(self):
593 return self.engine_cmd
594
595 @property
596 def program_args(self):
597 return self.cluster_args + self.engine_args
598
590 599
591 600 class SSHEngineSetLauncher(LocalEngineSetLauncher):
592 601 launcher_class = SSHEngineLauncher
@@ -594,12 +603,11 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
594 603 help="""dict of engines to launch. This is a dict by hostname of ints,
595 604 corresponding to the number of engines to start on that host.""")
596 605
597 def start(self, n, profile_dir):
606 def start(self, n):
598 607 """Start engines by profile or profile_dir.
599 608 `n` is ignored, and the `engines` config property is used instead.
600 609 """
601 610
602 self.profile_dir = unicode(profile_dir)
603 611 dlist = []
604 612 for host, n in self.engines.iteritems():
605 613 if isinstance(n, (tuple, list)):
@@ -614,13 +622,15 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
614 622 for i in range(n):
615 623 if i > 0:
616 624 time.sleep(self.delay)
617 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
625 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
626 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
627 )
618 628
619 629 # Copy the engine args over to each engine launcher.
620 i
621 el.program_args = args
630 el.engine_cmd = self.engine_cmd
631 el.engine_args = args
622 632 el.on_stop(self._notice_engine_stopped)
623 d = el.start(profile_dir, user=user, hostname=host)
633 d = el.start(user=user, hostname=host)
624 634 if i==0:
625 635 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
626 636 self.launchers[host+str(i)] = el
@@ -727,11 +737,11 b' class WindowsHPCLauncher(BaseLauncher):'
727 737 return output
728 738
729 739
730 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
740 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
731 741
732 742 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
733 743 help="WinHPC xml job file.")
734 extra_args = List([], config=False,
744 controller_args = List([], config=False,
735 745 help="extra args to pass to ipcontroller")
736 746
737 747 def write_job_file(self, n):
@@ -743,7 +753,8 b' class WindowsHPCControllerLauncher(WindowsHPCLauncher):'
743 753 # files that the scheduler redirects to.
744 754 t.work_directory = self.profile_dir
745 755 # Add the profile_dir and from self.start().
746 t.controller_args.extend(self.extra_args)
756 t.controller_args.extend(self.cluster_args)
757 t.controller_args.extend(self.controller_args)
747 758 job.add_task(t)
748 759
749 760 self.log.info("Writing job description file: %s" % self.job_file)
@@ -753,18 +764,16 b' class WindowsHPCControllerLauncher(WindowsHPCLauncher):'
753 764 def job_file(self):
754 765 return os.path.join(self.profile_dir, self.job_file_name)
755 766
756 def start(self, profile_dir):
767 def start(self):
757 768 """Start the controller by profile_dir."""
758 self.extra_args = ['--profile-dir=%s'%profile_dir]
759 self.profile_dir = unicode(profile_dir)
760 769 return super(WindowsHPCControllerLauncher, self).start(1)
761 770
762 771
763 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
772 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
764 773
765 774 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
766 775 help="jobfile for ipengines job")
767 extra_args = List([], config=False,
776 engine_args = List([], config=False,
768 777 help="extra args to pas to ipengine")
769 778
770 779 def write_job_file(self, n):
@@ -777,7 +786,8 b' class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):'
777 786 # files that the scheduler redirects to.
778 787 t.work_directory = self.profile_dir
779 788 # Add the profile_dir and from self.start().
780 t.engine_args.extend(self.extra_args)
789 t.controller_args.extend(self.cluster_args)
790 t.controller_args.extend(self.engine_args)
781 791 job.add_task(t)
782 792
783 793 self.log.info("Writing job description file: %s" % self.job_file)
@@ -787,10 +797,8 b' class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):'
787 797 def job_file(self):
788 798 return os.path.join(self.profile_dir, self.job_file_name)
789 799
790 def start(self, n, profile_dir):
800 def start(self, n):
791 801 """Start the controller by profile_dir."""
792 self.extra_args = ['--profile-dir=%s'%profile_dir]
793 self.profile_dir = unicode(profile_dir)
794 802 return super(WindowsHPCEngineSetLauncher, self).start(n)
795 803
796 804
@@ -798,6 +806,20 b' class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):'
798 806 # Batch (PBS) system launchers
799 807 #-----------------------------------------------------------------------------
800 808
809 class BatchClusterAppMixin(ClusterAppMixin):
810 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
811 def _profile_dir_changed(self, name, old, new):
812 self.context[name] = new
813 _cluster_id_changed = _profile_dir_changed
814
815 def _profile_dir_default(self):
816 self.context['profile_dir'] = ''
817 return ''
818 def _cluster_id_default(self):
819 self.context['cluster_id'] = ''
820 return ''
821
822
801 823 class BatchSystemLauncher(BaseLauncher):
802 824 """Launch an external process using a batch system.
803 825
@@ -829,6 +851,12 b' class BatchSystemLauncher(BaseLauncher):'
829 851 queue = Unicode(u'', config=True,
830 852 help="The PBS Queue.")
831 853
854 def _queue_changed(self, name, old, new):
855 self.context[name] = new
856
857 n = Int(1)
858 _n_changed = _queue_changed
859
832 860 # not configurable, override in subclasses
833 861 # PBS Job Array regex
834 862 job_array_regexp = Unicode('')
@@ -868,8 +896,7 b' class BatchSystemLauncher(BaseLauncher):'
868 896
869 897 def write_batch_script(self, n):
870 898 """Instantiate and write the batch script to the work_dir."""
871 self.context['n'] = n
872 self.context['queue'] = self.queue
899 self.n = n
873 900 # first priority is batch_template if set
874 901 if self.batch_template_file and not self.batch_template:
875 902 # second priority is batch_template_file
@@ -902,12 +929,10 b' class BatchSystemLauncher(BaseLauncher):'
902 929 f.write(script_as_string)
903 930 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
904 931
905 def start(self, n, profile_dir):
932 def start(self, n):
906 933 """Start n copies of the process using a batch system."""
907 934 # Here we save profile_dir in the context so they
908 935 # can be used in the batch script template as {profile_dir}
909 self.context['profile_dir'] = profile_dir
910 self.profile_dir = unicode(profile_dir)
911 936 self.write_batch_script(n)
912 937 output = check_output(self.args, env=os.environ)
913 938
@@ -938,7 +963,7 b' class PBSLauncher(BatchSystemLauncher):'
938 963 queue_template = Unicode('#PBS -q {queue}')
939 964
940 965
941 class PBSControllerLauncher(PBSLauncher):
966 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
942 967 """Launch a controller using PBS."""
943 968
944 969 batch_file_name = Unicode(u'pbs_controller', config=True,
@@ -946,29 +971,30 b' class PBSControllerLauncher(PBSLauncher):'
946 971 default_template= Unicode("""#!/bin/sh
947 972 #PBS -V
948 973 #PBS -N ipcontroller
949 %s --log-to-file --profile-dir={profile_dir}
974 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
950 975 """%(' '.join(ipcontroller_cmd_argv)))
951 976
952 def start(self, profile_dir):
977
978 def start(self):
953 979 """Start the controller by profile or profile_dir."""
954 980 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
955 return super(PBSControllerLauncher, self).start(1, profile_dir)
981 return super(PBSControllerLauncher, self).start(1)
956 982
957 983
958 class PBSEngineSetLauncher(PBSLauncher):
984 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
959 985 """Launch Engines using PBS"""
960 986 batch_file_name = Unicode(u'pbs_engines', config=True,
961 987 help="batch file name for the engine(s) job.")
962 988 default_template= Unicode(u"""#!/bin/sh
963 989 #PBS -V
964 990 #PBS -N ipengine
965 %s --profile-dir={profile_dir}
991 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
966 992 """%(' '.join(ipengine_cmd_argv)))
967 993
968 def start(self, n, profile_dir):
994 def start(self, n):
969 995 """Start n engines by profile or profile_dir."""
970 996 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
971 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
997 return super(PBSEngineSetLauncher, self).start(n)
972 998
973 999 #SGE is very similar to PBS
974 1000
@@ -979,7 +1005,7 b' class SGELauncher(PBSLauncher):'
979 1005 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
980 1006 queue_template = Unicode('#$ -q {queue}')
981 1007
982 class SGEControllerLauncher(SGELauncher):
1008 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
983 1009 """Launch a controller using SGE."""
984 1010
985 1011 batch_file_name = Unicode(u'sge_controller', config=True,
@@ -987,28 +1013,28 b' class SGEControllerLauncher(SGELauncher):'
987 1013 default_template= Unicode(u"""#$ -V
988 1014 #$ -S /bin/sh
989 1015 #$ -N ipcontroller
990 %s --log-to-file --profile-dir={profile_dir}
1016 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
991 1017 """%(' '.join(ipcontroller_cmd_argv)))
992 1018
993 def start(self, profile_dir):
1019 def start(self):
994 1020 """Start the controller by profile or profile_dir."""
995 1021 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
996 return super(SGEControllerLauncher, self).start(1, profile_dir)
1022 return super(SGEControllerLauncher, self).start(1)
997 1023
998 class SGEEngineSetLauncher(SGELauncher):
1024 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
999 1025 """Launch Engines with SGE"""
1000 1026 batch_file_name = Unicode(u'sge_engines', config=True,
1001 1027 help="batch file name for the engine(s) job.")
1002 1028 default_template = Unicode("""#$ -V
1003 1029 #$ -S /bin/sh
1004 1030 #$ -N ipengine
1005 %s --profile-dir={profile_dir}
1031 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1006 1032 """%(' '.join(ipengine_cmd_argv)))
1007 1033
1008 def start(self, n, profile_dir):
1034 def start(self, n):
1009 1035 """Start n engines by profile or profile_dir."""
1010 1036 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1011 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1037 return super(SGEEngineSetLauncher, self).start(n)
1012 1038
1013 1039
1014 1040 # LSF launchers
@@ -1029,7 +1055,7 b' class LSFLauncher(BatchSystemLauncher):'
1029 1055 queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
1030 1056 queue_template = Unicode('#BSUB -q {queue}')
1031 1057
1032 def start(self, n, profile_dir):
1058 def start(self, n):
1033 1059 """Start n copies of the process using LSF batch system.
1034 1060 This cant inherit from the base class because bsub expects
1035 1061 to be piped a shell script in order to honor the #BSUB directives :
@@ -1037,8 +1063,6 b' class LSFLauncher(BatchSystemLauncher):'
1037 1063 """
1038 1064 # Here we save profile_dir in the context so they
1039 1065 # can be used in the batch script template as {profile_dir}
1040 self.context['profile_dir'] = profile_dir
1041 self.profile_dir = unicode(profile_dir)
1042 1066 self.write_batch_script(n)
1043 1067 #output = check_output(self.args, env=os.environ)
1044 1068 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
@@ -1049,7 +1073,7 b' class LSFLauncher(BatchSystemLauncher):'
1049 1073 return job_id
1050 1074
1051 1075
1052 class LSFControllerLauncher(LSFLauncher):
1076 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1053 1077 """Launch a controller using LSF."""
1054 1078
1055 1079 batch_file_name = Unicode(u'lsf_controller', config=True,
@@ -1058,29 +1082,29 b' class LSFControllerLauncher(LSFLauncher):'
1058 1082 #BSUB -J ipcontroller
1059 1083 #BSUB -oo ipcontroller.o.%%J
1060 1084 #BSUB -eo ipcontroller.e.%%J
1061 %s --log-to-file --profile-dir={profile_dir}
1085 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1062 1086 """%(' '.join(ipcontroller_cmd_argv)))
1063 1087
1064 def start(self, profile_dir):
1088 def start(self):
1065 1089 """Start the controller by profile or profile_dir."""
1066 1090 self.log.info("Starting LSFControllerLauncher: %r" % self.args)
1067 return super(LSFControllerLauncher, self).start(1, profile_dir)
1091 return super(LSFControllerLauncher, self).start(1)
1068 1092
1069 1093
1070 class LSFEngineSetLauncher(LSFLauncher):
1094 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1071 1095 """Launch Engines using LSF"""
1072 1096 batch_file_name = Unicode(u'lsf_engines', config=True,
1073 1097 help="batch file name for the engine(s) job.")
1074 1098 default_template= Unicode(u"""#!/bin/sh
1075 1099 #BSUB -oo ipengine.o.%%J
1076 1100 #BSUB -eo ipengine.e.%%J
1077 %s --profile-dir={profile_dir}
1101 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1078 1102 """%(' '.join(ipengine_cmd_argv)))
1079 1103
1080 def start(self, n, profile_dir):
1104 def start(self, n):
1081 1105 """Start n engines by profile or profile_dir."""
1082 1106 self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
1083 return super(LSFEngineSetLauncher, self).start(n, profile_dir)
1107 return super(LSFEngineSetLauncher, self).start(n)
1084 1108
1085 1109
1086 1110 #-----------------------------------------------------------------------------
General Comments 0
You need to be logged in to leave comments. Login now