##// END OF EJS Templates
add cluster_id support to ipcluster/launchers...
MinRK -
Show More
@@ -295,16 +295,14 b' class IPClusterEngines(BaseParallelApplication):'
295 self.exit(1)
295 self.exit(1)
296
296
297 launcher = klass(
297 launcher = klass(
298 work_dir=u'.', config=self.config, log=self.log
298 work_dir=u'.', config=self.config, log=self.log,
299 profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
299 )
300 )
300 return launcher
301 return launcher
301
302
302 def start_engines(self):
303 def start_engines(self):
303 self.log.info("Starting %i engines"%self.n)
304 self.log.info("Starting %i engines"%self.n)
304 self.engine_launcher.start(
305 self.engine_launcher.start(self.n)
305 self.n,
306 self.profile_dir.location
307 )
308
306
309 def stop_engines(self):
307 def stop_engines(self):
310 self.log.info("Stopping Engines...")
308 self.log.info("Stopping Engines...")
@@ -429,9 +427,7 b' class IPClusterStart(IPClusterEngines):'
429 self.controller_launcher.on_stop(self.stop_launchers)
427 self.controller_launcher.on_stop(self.stop_launchers)
430
428
431 def start_controller(self):
429 def start_controller(self):
432 self.controller_launcher.start(
430 self.controller_launcher.start()
433 self.profile_dir.location
434 )
435
431
436 def stop_controller(self):
432 def stop_controller(self):
437 # self.log.info("In stop_controller")
433 # self.log.info("In stop_controller")
@@ -57,7 +57,9 b' from zmq.eventloop import ioloop'
57 from IPython.config.application import Application
57 from IPython.config.application import Application
58 from IPython.config.configurable import LoggingConfigurable
58 from IPython.config.configurable import LoggingConfigurable
59 from IPython.utils.text import EvalFormatter
59 from IPython.utils.text import EvalFormatter
60 from IPython.utils.traitlets import Any, Int, CFloat, List, Unicode, Dict, Instance
60 from IPython.utils.traitlets import (
61 Any, Int, CFloat, List, Unicode, Dict, Instance, HasTraits,
62 )
61 from IPython.utils.path import get_ipython_module_path
63 from IPython.utils.path import get_ipython_module_path
62 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
64 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
63
65
@@ -213,6 +215,33 b' class BaseLauncher(LoggingConfigurable):'
213 """
215 """
214 raise NotImplementedError('signal must be implemented in a subclass')
216 raise NotImplementedError('signal must be implemented in a subclass')
215
217
218 class ClusterAppMixin(HasTraits):
219 """MixIn for cluster args as traits"""
220 cluster_args = List([])
221 profile_dir=Unicode('')
222 cluster_id=Unicode('')
223 def _profile_dir_changed(self, name, old, new):
224 self.cluster_args = []
225 if self.profile_dir:
226 self.cluster_args.extend(['--profile-dir', self.profile_dir])
227 if self.cluster_id:
228 self.cluster_args.extend(['--cluster-id', self.cluster_id])
229 _cluster_id_changed = _profile_dir_changed
230
231 class ControllerMixin(ClusterAppMixin):
232 controller_cmd = List(ipcontroller_cmd_argv, config=True,
233 help="""Popen command to launch ipcontroller.""")
234 # Command line arguments to ipcontroller.
235 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
236 help="""command-line args to pass to ipcontroller""")
237
238 class EngineMixin(ClusterAppMixin):
239 engine_cmd = List(ipengine_cmd_argv, config=True,
240 help="""command to launch the Engine.""")
241 # Command line arguments for ipengine.
242 engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
243 help="command-line arguments to pass to ipengine"
244 )
216
245
217 #-----------------------------------------------------------------------------
246 #-----------------------------------------------------------------------------
218 # Local process launchers
247 # Local process launchers
@@ -317,54 +346,28 b' class LocalProcessLauncher(BaseLauncher):'
317 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
346 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
318 return status
347 return status
319
348
320 class LocalControllerLauncher(LocalProcessLauncher):
349 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
321 """Launch a controller as a regular external process."""
350 """Launch a controller as a regular external process."""
322
351
323 controller_cmd = List(ipcontroller_cmd_argv, config=True,
324 help="""Popen command to launch ipcontroller.""")
325 # Command line arguments to ipcontroller.
326 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
327 help="""command-line args to pass to ipcontroller""")
328
329 def find_args(self):
352 def find_args(self):
330 return self.controller_cmd + self.controller_args
353 return self.controller_cmd + self.cluster_args + self.controller_args
331
354
332 def start(self, profile_dir):
355 def start(self):
333 """Start the controller by profile_dir."""
356 """Start the controller by profile_dir."""
334 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
335 self.profile_dir = unicode(profile_dir)
336 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
357 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
337 return super(LocalControllerLauncher, self).start()
358 return super(LocalControllerLauncher, self).start()
338
359
339
360
340 class LocalEngineLauncher(LocalProcessLauncher):
361 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
341 """Launch a single engine as a regular externall process."""
362 """Launch a single engine as a regular externall process."""
342
363
343 engine_cmd = List(ipengine_cmd_argv, config=True,
344 help="""command to launch the Engine.""")
345 # Command line arguments for ipengine.
346 engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
347 help="command-line arguments to pass to ipengine"
348 )
349
350 def find_args(self):
364 def find_args(self):
351 return self.engine_cmd + self.engine_args
365 return self.engine_cmd + self.cluster_args + self.engine_args
352
353 def start(self, profile_dir):
354 """Start the engine by profile_dir."""
355 self.engine_args.extend(['--profile-dir=%s'%profile_dir])
356 self.profile_dir = unicode(profile_dir)
357 return super(LocalEngineLauncher, self).start()
358
366
359
367
360 class LocalEngineSetLauncher(BaseLauncher):
368 class LocalEngineSetLauncher(LocalEngineLauncher):
361 """Launch a set of engines as regular external processes."""
369 """Launch a set of engines as regular external processes."""
362
370
363 # Command line arguments for ipengine.
364 engine_args = List(
365 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
366 help="command-line arguments to pass to ipengine"
367 )
368 delay = CFloat(0.1, config=True,
371 delay = CFloat(0.1, config=True,
369 help="""delay (in seconds) between starting each engine after the first.
372 help="""delay (in seconds) between starting each engine after the first.
370 This can help force the engines to get their ids in order, or limit
373 This can help force the engines to get their ids in order, or limit
@@ -383,26 +386,26 b' class LocalEngineSetLauncher(BaseLauncher):'
383 )
386 )
384 self.stop_data = {}
387 self.stop_data = {}
385
388
386 def start(self, n, profile_dir):
389 def start(self, n):
387 """Start n engines by profile or profile_dir."""
390 """Start n engines by profile or profile_dir."""
388 self.profile_dir = unicode(profile_dir)
389 dlist = []
391 dlist = []
390 for i in range(n):
392 for i in range(n):
391 if i > 0:
393 if i > 0:
392 time.sleep(self.delay)
394 time.sleep(self.delay)
393 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
395 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
396 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
397 )
398
394 # Copy the engine args over to each engine launcher.
399 # Copy the engine args over to each engine launcher.
400 el.engine_cmd = copy.deepcopy(self.engine_cmd)
395 el.engine_args = copy.deepcopy(self.engine_args)
401 el.engine_args = copy.deepcopy(self.engine_args)
396 el.on_stop(self._notice_engine_stopped)
402 el.on_stop(self._notice_engine_stopped)
397 d = el.start(profile_dir)
403 d = el.start()
398 if i==0:
404 if i==0:
399 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
405 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
400 self.launchers[i] = el
406 self.launchers[i] = el
401 dlist.append(d)
407 dlist.append(d)
402 self.notify_start(dlist)
408 self.notify_start(dlist)
403 # The consumeErrors here could be dangerous
404 # dfinal = gatherBoth(dlist, consumeErrors=True)
405 # dfinal.addCallback(self.notify_start)
406 return dlist
409 return dlist
407
410
408 def find_args(self):
411 def find_args(self):
@@ -413,7 +416,6 b' class LocalEngineSetLauncher(BaseLauncher):'
413 for el in self.launchers.itervalues():
416 for el in self.launchers.itervalues():
414 d = el.signal(sig)
417 d = el.signal(sig)
415 dlist.append(d)
418 dlist.append(d)
416 # dfinal = gatherBoth(dlist, consumeErrors=True)
417 return dlist
419 return dlist
418
420
419 def interrupt_then_kill(self, delay=1.0):
421 def interrupt_then_kill(self, delay=1.0):
@@ -421,7 +423,6 b' class LocalEngineSetLauncher(BaseLauncher):'
421 for el in self.launchers.itervalues():
423 for el in self.launchers.itervalues():
422 d = el.interrupt_then_kill(delay)
424 d = el.interrupt_then_kill(delay)
423 dlist.append(d)
425 dlist.append(d)
424 # dfinal = gatherBoth(dlist, consumeErrors=True)
425 return dlist
426 return dlist
426
427
427 def stop(self):
428 def stop(self):
@@ -452,9 +453,9 b' class MPIExecLauncher(LocalProcessLauncher):'
452 mpi_args = List([], config=True,
453 mpi_args = List([], config=True,
453 help="The command line arguments to pass to mpiexec."
454 help="The command line arguments to pass to mpiexec."
454 )
455 )
455 program = List(['date'], config=True,
456 program = List(['date'],
456 help="The program to start via mpiexec.")
457 help="The program to start via mpiexec.")
457 program_args = List([], config=True,
458 program_args = List([],
458 help="The command line argument to the program."
459 help="The command line argument to the program."
459 )
460 )
460 n = Int(1)
461 n = Int(1)
@@ -470,44 +471,42 b' class MPIExecLauncher(LocalProcessLauncher):'
470 return super(MPIExecLauncher, self).start()
471 return super(MPIExecLauncher, self).start()
471
472
472
473
473 class MPIExecControllerLauncher(MPIExecLauncher):
474 class MPIExecControllerLauncher(MPIExecLauncher, ControllerMixin):
474 """Launch a controller using mpiexec."""
475 """Launch a controller using mpiexec."""
475
476
476 controller_cmd = List(ipcontroller_cmd_argv, config=True,
477 # alias back to *non-configurable* program[_args] for use in find_args()
477 help="Popen command to launch the Contropper"
478 # this way all Controller/EngineSetLaunchers have the same form, rather
478 )
479 # than *some* having `program_args` and others `controller_args`
479 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
480 @property
480 help="Command line arguments to pass to ipcontroller."
481 def program(self):
481 )
482 return self.controller_cmd
482 n = Int(1)
483
484 @property
485 def program_args(self):
486 return self.cluster_args + self.controller_args
483
487
484 def start(self, profile_dir):
488 def start(self):
485 """Start the controller by profile_dir."""
489 """Start the controller by profile_dir."""
486 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
487 self.profile_dir = unicode(profile_dir)
488 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
490 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
489 return super(MPIExecControllerLauncher, self).start(1)
491 return super(MPIExecControllerLauncher, self).start(1)
490
492
491 def find_args(self):
492 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
493 self.controller_cmd + self.controller_args
494
495
493
496 class MPIExecEngineSetLauncher(MPIExecLauncher):
494 class MPIExecEngineSetLauncher(MPIExecLauncher, EngineMixin):
495 """Launch engines using mpiexec"""
497
496
498 program = List(ipengine_cmd_argv, config=True,
497 # alias back to *non-configurable* program[_args] for use in find_args()
499 help="Popen command for ipengine"
498 # this way all Controller/EngineSetLaunchers have the same form, rather
500 )
499 # than *some* having `program_args` and others `controller_args`
501 program_args = List(
500 @property
502 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
501 def program(self):
503 help="Command line arguments for ipengine."
502 return self.engine_cmd
504 )
503
505 n = Int(1)
504 @property
505 def program_args(self):
506 return self.cluster_args + self.engine_args
506
507
507 def start(self, n, profile_dir):
508 def start(self, n):
508 """Start n engines by profile or profile_dir."""
509 """Start n engines by profile or profile_dir."""
509 self.program_args.extend(['--profile-dir=%s'%profile_dir])
510 self.profile_dir = unicode(profile_dir)
511 self.n = n
510 self.n = n
512 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
511 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
513 return super(MPIExecEngineSetLauncher, self).start(n)
512 return super(MPIExecEngineSetLauncher, self).start(n)
@@ -530,9 +529,9 b' class SSHLauncher(LocalProcessLauncher):'
530 help="command for starting ssh")
529 help="command for starting ssh")
531 ssh_args = List(['-tt'], config=True,
530 ssh_args = List(['-tt'], config=True,
532 help="args to pass to ssh")
531 help="args to pass to ssh")
533 program = List(['date'], config=True,
532 program = List(['date'],
534 help="Program to launch via ssh")
533 help="Program to launch via ssh")
535 program_args = List([], config=True,
534 program_args = List([],
536 help="args to pass to remote program")
535 help="args to pass to remote program")
537 hostname = Unicode('', config=True,
536 hostname = Unicode('', config=True,
538 help="hostname on which to launch the program")
537 help="hostname on which to launch the program")
@@ -554,8 +553,7 b' class SSHLauncher(LocalProcessLauncher):'
554 return self.ssh_cmd + self.ssh_args + [self.location] + \
553 return self.ssh_cmd + self.ssh_args + [self.location] + \
555 self.program + self.program_args
554 self.program + self.program_args
556
555
557 def start(self, profile_dir, hostname=None, user=None):
556 def start(self, hostname=None, user=None):
558 self.profile_dir = unicode(profile_dir)
559 if hostname is not None:
557 if hostname is not None:
560 self.hostname = hostname
558 self.hostname = hostname
561 if user is not None:
559 if user is not None:
@@ -571,22 +569,33 b' class SSHLauncher(LocalProcessLauncher):'
571
569
572
570
573
571
574 class SSHControllerLauncher(SSHLauncher):
572 class SSHControllerLauncher(SSHLauncher, ControllerMixin):
573
574 # alias back to *non-configurable* program[_args] for use in find_args()
575 # this way all Controller/EngineSetLaunchers have the same form, rather
576 # than *some* having `program_args` and others `controller_args`
577 @property
578 def program(self):
579 return self.controller_cmd
580
581 @property
582 def program_args(self):
583 return self.cluster_args + self.controller_args
575
584
576 program = List(ipcontroller_cmd_argv, config=True,
577 help="remote ipcontroller command.")
578 program_args = List(['--reuse-files', '--log-to-file','--log-level=%i'%logging.INFO], config=True,
579 help="Command line arguments to ipcontroller.")
580
585
586 class SSHEngineLauncher(SSHLauncher, EngineMixin):
581
587
582 class SSHEngineLauncher(SSHLauncher):
588 # alias back to *non-configurable* program[_args] for use in find_args()
583 program = List(ipengine_cmd_argv, config=True,
589 # this way all Controller/EngineSetLaunchers have the same form, rather
584 help="remote ipengine command.")
590 # than *some* having `program_args` and others `controller_args`
585 # Command line arguments for ipengine.
591 @property
586 program_args = List(
592 def program(self):
587 ['--log-to-file','--log_level=%i'%logging.INFO], config=True,
593 return self.engine_cmd
588 help="Command line arguments to ipengine."
594
589 )
595 @property
596 def program_args(self):
597 return self.cluster_args + self.engine_args
598
590
599
591 class SSHEngineSetLauncher(LocalEngineSetLauncher):
600 class SSHEngineSetLauncher(LocalEngineSetLauncher):
592 launcher_class = SSHEngineLauncher
601 launcher_class = SSHEngineLauncher
@@ -594,12 +603,11 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
594 help="""dict of engines to launch. This is a dict by hostname of ints,
603 help="""dict of engines to launch. This is a dict by hostname of ints,
595 corresponding to the number of engines to start on that host.""")
604 corresponding to the number of engines to start on that host.""")
596
605
597 def start(self, n, profile_dir):
606 def start(self, n):
598 """Start engines by profile or profile_dir.
607 """Start engines by profile or profile_dir.
599 `n` is ignored, and the `engines` config property is used instead.
608 `n` is ignored, and the `engines` config property is used instead.
600 """
609 """
601
610
602 self.profile_dir = unicode(profile_dir)
603 dlist = []
611 dlist = []
604 for host, n in self.engines.iteritems():
612 for host, n in self.engines.iteritems():
605 if isinstance(n, (tuple, list)):
613 if isinstance(n, (tuple, list)):
@@ -614,13 +622,15 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
614 for i in range(n):
622 for i in range(n):
615 if i > 0:
623 if i > 0:
616 time.sleep(self.delay)
624 time.sleep(self.delay)
617 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
625 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
626 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
627 )
618
628
619 # Copy the engine args over to each engine launcher.
629 # Copy the engine args over to each engine launcher.
620 i
630 el.engine_cmd = self.engine_cmd
621 el.program_args = args
631 el.engine_args = args
622 el.on_stop(self._notice_engine_stopped)
632 el.on_stop(self._notice_engine_stopped)
623 d = el.start(profile_dir, user=user, hostname=host)
633 d = el.start(user=user, hostname=host)
624 if i==0:
634 if i==0:
625 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
635 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
626 self.launchers[host+str(i)] = el
636 self.launchers[host+str(i)] = el
@@ -727,11 +737,11 b' class WindowsHPCLauncher(BaseLauncher):'
727 return output
737 return output
728
738
729
739
730 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
740 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
731
741
732 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
742 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
733 help="WinHPC xml job file.")
743 help="WinHPC xml job file.")
734 extra_args = List([], config=False,
744 controller_args = List([], config=False,
735 help="extra args to pass to ipcontroller")
745 help="extra args to pass to ipcontroller")
736
746
737 def write_job_file(self, n):
747 def write_job_file(self, n):
@@ -743,7 +753,8 b' class WindowsHPCControllerLauncher(WindowsHPCLauncher):'
743 # files that the scheduler redirects to.
753 # files that the scheduler redirects to.
744 t.work_directory = self.profile_dir
754 t.work_directory = self.profile_dir
745 # Add the profile_dir and from self.start().
755 # Add the profile_dir and from self.start().
746 t.controller_args.extend(self.extra_args)
756 t.controller_args.extend(self.cluster_args)
757 t.controller_args.extend(self.controller_args)
747 job.add_task(t)
758 job.add_task(t)
748
759
749 self.log.info("Writing job description file: %s" % self.job_file)
760 self.log.info("Writing job description file: %s" % self.job_file)
@@ -753,18 +764,16 b' class WindowsHPCControllerLauncher(WindowsHPCLauncher):'
753 def job_file(self):
764 def job_file(self):
754 return os.path.join(self.profile_dir, self.job_file_name)
765 return os.path.join(self.profile_dir, self.job_file_name)
755
766
756 def start(self, profile_dir):
767 def start(self):
757 """Start the controller by profile_dir."""
768 """Start the controller by profile_dir."""
758 self.extra_args = ['--profile-dir=%s'%profile_dir]
759 self.profile_dir = unicode(profile_dir)
760 return super(WindowsHPCControllerLauncher, self).start(1)
769 return super(WindowsHPCControllerLauncher, self).start(1)
761
770
762
771
763 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
772 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
764
773
765 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
774 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
766 help="jobfile for ipengines job")
775 help="jobfile for ipengines job")
767 extra_args = List([], config=False,
776 engine_args = List([], config=False,
768 help="extra args to pas to ipengine")
777 help="extra args to pas to ipengine")
769
778
770 def write_job_file(self, n):
779 def write_job_file(self, n):
@@ -777,7 +786,8 b' class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):'
777 # files that the scheduler redirects to.
786 # files that the scheduler redirects to.
778 t.work_directory = self.profile_dir
787 t.work_directory = self.profile_dir
779 # Add the profile_dir and from self.start().
788 # Add the profile_dir and from self.start().
780 t.engine_args.extend(self.extra_args)
789 t.controller_args.extend(self.cluster_args)
790 t.controller_args.extend(self.engine_args)
781 job.add_task(t)
791 job.add_task(t)
782
792
783 self.log.info("Writing job description file: %s" % self.job_file)
793 self.log.info("Writing job description file: %s" % self.job_file)
@@ -787,10 +797,8 b' class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):'
787 def job_file(self):
797 def job_file(self):
788 return os.path.join(self.profile_dir, self.job_file_name)
798 return os.path.join(self.profile_dir, self.job_file_name)
789
799
790 def start(self, n, profile_dir):
800 def start(self, n):
791 """Start the controller by profile_dir."""
801 """Start the controller by profile_dir."""
792 self.extra_args = ['--profile-dir=%s'%profile_dir]
793 self.profile_dir = unicode(profile_dir)
794 return super(WindowsHPCEngineSetLauncher, self).start(n)
802 return super(WindowsHPCEngineSetLauncher, self).start(n)
795
803
796
804
@@ -798,6 +806,13 b' class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):'
798 # Batch (PBS) system launchers
806 # Batch (PBS) system launchers
799 #-----------------------------------------------------------------------------
807 #-----------------------------------------------------------------------------
800
808
809 class BatchClusterAppMixin(ClusterAppMixin):
810 """ClusterApp mixin that updates context dict, rather than args"""
811 context = Dict({'profile_dir':'', 'cluster_id':''})
812 def _profile_dir_changed(self, name, old, new):
813 self.context[name] = new
814 _cluster_id_changed = _profile_dir_changed
815
801 class BatchSystemLauncher(BaseLauncher):
816 class BatchSystemLauncher(BaseLauncher):
802 """Launch an external process using a batch system.
817 """Launch an external process using a batch system.
803
818
@@ -829,6 +844,12 b' class BatchSystemLauncher(BaseLauncher):'
829 queue = Unicode(u'', config=True,
844 queue = Unicode(u'', config=True,
830 help="The PBS Queue.")
845 help="The PBS Queue.")
831
846
847 def _queue_changed(self, name, old, new):
848 self.context[name] = new
849
850 n = Int(1)
851 _n_changed = _queue_changed
852
832 # not configurable, override in subclasses
853 # not configurable, override in subclasses
833 # PBS Job Array regex
854 # PBS Job Array regex
834 job_array_regexp = Unicode('')
855 job_array_regexp = Unicode('')
@@ -868,8 +889,7 b' class BatchSystemLauncher(BaseLauncher):'
868
889
869 def write_batch_script(self, n):
890 def write_batch_script(self, n):
870 """Instantiate and write the batch script to the work_dir."""
891 """Instantiate and write the batch script to the work_dir."""
871 self.context['n'] = n
892 self.n = n
872 self.context['queue'] = self.queue
873 # first priority is batch_template if set
893 # first priority is batch_template if set
874 if self.batch_template_file and not self.batch_template:
894 if self.batch_template_file and not self.batch_template:
875 # second priority is batch_template_file
895 # second priority is batch_template_file
@@ -902,12 +922,10 b' class BatchSystemLauncher(BaseLauncher):'
902 f.write(script_as_string)
922 f.write(script_as_string)
903 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
923 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
904
924
905 def start(self, n, profile_dir):
925 def start(self, n):
906 """Start n copies of the process using a batch system."""
926 """Start n copies of the process using a batch system."""
907 # Here we save profile_dir in the context so they
927 # Here we save profile_dir in the context so they
908 # can be used in the batch script template as {profile_dir}
928 # can be used in the batch script template as {profile_dir}
909 self.context['profile_dir'] = profile_dir
910 self.profile_dir = unicode(profile_dir)
911 self.write_batch_script(n)
929 self.write_batch_script(n)
912 output = check_output(self.args, env=os.environ)
930 output = check_output(self.args, env=os.environ)
913
931
@@ -938,7 +956,7 b' class PBSLauncher(BatchSystemLauncher):'
938 queue_template = Unicode('#PBS -q {queue}')
956 queue_template = Unicode('#PBS -q {queue}')
939
957
940
958
941 class PBSControllerLauncher(PBSLauncher):
959 class PBSControllerLauncher(BatchClusterAppMixin, PBSLauncher):
942 """Launch a controller using PBS."""
960 """Launch a controller using PBS."""
943
961
944 batch_file_name = Unicode(u'pbs_controller', config=True,
962 batch_file_name = Unicode(u'pbs_controller', config=True,
@@ -946,29 +964,30 b' class PBSControllerLauncher(PBSLauncher):'
946 default_template= Unicode("""#!/bin/sh
964 default_template= Unicode("""#!/bin/sh
947 #PBS -V
965 #PBS -V
948 #PBS -N ipcontroller
966 #PBS -N ipcontroller
949 %s --log-to-file --profile-dir={profile_dir}
967 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
950 """%(' '.join(ipcontroller_cmd_argv)))
968 """%(' '.join(ipcontroller_cmd_argv)))
951
969
952 def start(self, profile_dir):
970
971 def start(self):
953 """Start the controller by profile or profile_dir."""
972 """Start the controller by profile or profile_dir."""
954 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
973 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
955 return super(PBSControllerLauncher, self).start(1, profile_dir)
974 return super(PBSControllerLauncher, self).start(1)
956
975
957
976
958 class PBSEngineSetLauncher(PBSLauncher):
977 class PBSEngineSetLauncher(BatchClusterAppMixin, PBSLauncher):
959 """Launch Engines using PBS"""
978 """Launch Engines using PBS"""
960 batch_file_name = Unicode(u'pbs_engines', config=True,
979 batch_file_name = Unicode(u'pbs_engines', config=True,
961 help="batch file name for the engine(s) job.")
980 help="batch file name for the engine(s) job.")
962 default_template= Unicode(u"""#!/bin/sh
981 default_template= Unicode(u"""#!/bin/sh
963 #PBS -V
982 #PBS -V
964 #PBS -N ipengine
983 #PBS -N ipengine
965 %s --profile-dir={profile_dir}
984 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
966 """%(' '.join(ipengine_cmd_argv)))
985 """%(' '.join(ipengine_cmd_argv)))
967
986
968 def start(self, n, profile_dir):
987 def start(self, n):
969 """Start n engines by profile or profile_dir."""
988 """Start n engines by profile or profile_dir."""
970 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
989 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
971 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
990 return super(PBSEngineSetLauncher, self).start(n)
972
991
973 #SGE is very similar to PBS
992 #SGE is very similar to PBS
974
993
@@ -979,7 +998,7 b' class SGELauncher(PBSLauncher):'
979 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
998 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
980 queue_template = Unicode('#$ -q {queue}')
999 queue_template = Unicode('#$ -q {queue}')
981
1000
982 class SGEControllerLauncher(SGELauncher):
1001 class SGEControllerLauncher(BatchClusterAppMixin, SGELauncher):
983 """Launch a controller using SGE."""
1002 """Launch a controller using SGE."""
984
1003
985 batch_file_name = Unicode(u'sge_controller', config=True,
1004 batch_file_name = Unicode(u'sge_controller', config=True,
@@ -987,28 +1006,28 b' class SGEControllerLauncher(SGELauncher):'
987 default_template= Unicode(u"""#$ -V
1006 default_template= Unicode(u"""#$ -V
988 #$ -S /bin/sh
1007 #$ -S /bin/sh
989 #$ -N ipcontroller
1008 #$ -N ipcontroller
990 %s --log-to-file --profile-dir={profile_dir}
1009 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
991 """%(' '.join(ipcontroller_cmd_argv)))
1010 """%(' '.join(ipcontroller_cmd_argv)))
992
1011
993 def start(self, profile_dir):
1012 def start(self):
994 """Start the controller by profile or profile_dir."""
1013 """Start the controller by profile or profile_dir."""
995 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
1014 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
996 return super(SGEControllerLauncher, self).start(1, profile_dir)
1015 return super(SGEControllerLauncher, self).start(1)
997
1016
998 class SGEEngineSetLauncher(SGELauncher):
1017 class SGEEngineSetLauncher(BatchClusterAppMixin, SGELauncher):
999 """Launch Engines with SGE"""
1018 """Launch Engines with SGE"""
1000 batch_file_name = Unicode(u'sge_engines', config=True,
1019 batch_file_name = Unicode(u'sge_engines', config=True,
1001 help="batch file name for the engine(s) job.")
1020 help="batch file name for the engine(s) job.")
1002 default_template = Unicode("""#$ -V
1021 default_template = Unicode("""#$ -V
1003 #$ -S /bin/sh
1022 #$ -S /bin/sh
1004 #$ -N ipengine
1023 #$ -N ipengine
1005 %s --profile-dir={profile_dir}
1024 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1006 """%(' '.join(ipengine_cmd_argv)))
1025 """%(' '.join(ipengine_cmd_argv)))
1007
1026
1008 def start(self, n, profile_dir):
1027 def start(self, n):
1009 """Start n engines by profile or profile_dir."""
1028 """Start n engines by profile or profile_dir."""
1010 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1029 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1011 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1030 return super(SGEEngineSetLauncher, self).start(n)
1012
1031
1013
1032
1014 # LSF launchers
1033 # LSF launchers
@@ -1029,7 +1048,7 b' class LSFLauncher(BatchSystemLauncher):'
1029 queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
1048 queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
1030 queue_template = Unicode('#BSUB -q {queue}')
1049 queue_template = Unicode('#BSUB -q {queue}')
1031
1050
1032 def start(self, n, profile_dir):
1051 def start(self, n):
1033 """Start n copies of the process using LSF batch system.
1052 """Start n copies of the process using LSF batch system.
1034 This cant inherit from the base class because bsub expects
1053 This cant inherit from the base class because bsub expects
1035 to be piped a shell script in order to honor the #BSUB directives :
1054 to be piped a shell script in order to honor the #BSUB directives :
@@ -1037,8 +1056,6 b' class LSFLauncher(BatchSystemLauncher):'
1037 """
1056 """
1038 # Here we save profile_dir in the context so they
1057 # Here we save profile_dir in the context so they
1039 # can be used in the batch script template as {profile_dir}
1058 # can be used in the batch script template as {profile_dir}
1040 self.context['profile_dir'] = profile_dir
1041 self.profile_dir = unicode(profile_dir)
1042 self.write_batch_script(n)
1059 self.write_batch_script(n)
1043 #output = check_output(self.args, env=os.environ)
1060 #output = check_output(self.args, env=os.environ)
1044 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1061 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
@@ -1049,7 +1066,7 b' class LSFLauncher(BatchSystemLauncher):'
1049 return job_id
1066 return job_id
1050
1067
1051
1068
1052 class LSFControllerLauncher(LSFLauncher):
1069 class LSFControllerLauncher(BatchClusterAppMixin, LSFLauncher):
1053 """Launch a controller using LSF."""
1070 """Launch a controller using LSF."""
1054
1071
1055 batch_file_name = Unicode(u'lsf_controller', config=True,
1072 batch_file_name = Unicode(u'lsf_controller', config=True,
@@ -1058,29 +1075,29 b' class LSFControllerLauncher(LSFLauncher):'
1058 #BSUB -J ipcontroller
1075 #BSUB -J ipcontroller
1059 #BSUB -oo ipcontroller.o.%%J
1076 #BSUB -oo ipcontroller.o.%%J
1060 #BSUB -eo ipcontroller.e.%%J
1077 #BSUB -eo ipcontroller.e.%%J
1061 %s --log-to-file --profile-dir={profile_dir}
1078 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1062 """%(' '.join(ipcontroller_cmd_argv)))
1079 """%(' '.join(ipcontroller_cmd_argv)))
1063
1080
1064 def start(self, profile_dir):
1081 def start(self):
1065 """Start the controller by profile or profile_dir."""
1082 """Start the controller by profile or profile_dir."""
1066 self.log.info("Starting LSFControllerLauncher: %r" % self.args)
1083 self.log.info("Starting LSFControllerLauncher: %r" % self.args)
1067 return super(LSFControllerLauncher, self).start(1, profile_dir)
1084 return super(LSFControllerLauncher, self).start(1)
1068
1085
1069
1086
1070 class LSFEngineSetLauncher(LSFLauncher):
1087 class LSFEngineSetLauncher(BatchClusterAppMixin, LSFLauncher):
1071 """Launch Engines using LSF"""
1088 """Launch Engines using LSF"""
1072 batch_file_name = Unicode(u'lsf_engines', config=True,
1089 batch_file_name = Unicode(u'lsf_engines', config=True,
1073 help="batch file name for the engine(s) job.")
1090 help="batch file name for the engine(s) job.")
1074 default_template= Unicode(u"""#!/bin/sh
1091 default_template= Unicode(u"""#!/bin/sh
1075 #BSUB -oo ipengine.o.%%J
1092 #BSUB -oo ipengine.o.%%J
1076 #BSUB -eo ipengine.e.%%J
1093 #BSUB -eo ipengine.e.%%J
1077 %s --profile-dir={profile_dir}
1094 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1078 """%(' '.join(ipengine_cmd_argv)))
1095 """%(' '.join(ipengine_cmd_argv)))
1079
1096
1080 def start(self, n, profile_dir):
1097 def start(self, n):
1081 """Start n engines by profile or profile_dir."""
1098 """Start n engines by profile or profile_dir."""
1082 self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
1099 self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
1083 return super(LSFEngineSetLauncher, self).start(n, profile_dir)
1100 return super(LSFEngineSetLauncher, self).start(n)
1084
1101
1085
1102
1086 #-----------------------------------------------------------------------------
1103 #-----------------------------------------------------------------------------
General Comments 0
You need to be logged in to leave comments. Login now