##// END OF EJS Templates
add cluster_id support to ipcluster/launchers...
MinRK -
Show More
@@ -295,16 +295,14 b' class IPClusterEngines(BaseParallelApplication):'
295 295 self.exit(1)
296 296
297 297 launcher = klass(
298 work_dir=u'.', config=self.config, log=self.log
298 work_dir=u'.', config=self.config, log=self.log,
299 profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
299 300 )
300 301 return launcher
301 302
302 303 def start_engines(self):
303 304 self.log.info("Starting %i engines"%self.n)
304 self.engine_launcher.start(
305 self.n,
306 self.profile_dir.location
307 )
305 self.engine_launcher.start(self.n)
308 306
309 307 def stop_engines(self):
310 308 self.log.info("Stopping Engines...")
@@ -429,9 +427,7 b' class IPClusterStart(IPClusterEngines):'
429 427 self.controller_launcher.on_stop(self.stop_launchers)
430 428
431 429 def start_controller(self):
432 self.controller_launcher.start(
433 self.profile_dir.location
434 )
430 self.controller_launcher.start()
435 431
436 432 def stop_controller(self):
437 433 # self.log.info("In stop_controller")
@@ -57,7 +57,9 b' from zmq.eventloop import ioloop'
57 57 from IPython.config.application import Application
58 58 from IPython.config.configurable import LoggingConfigurable
59 59 from IPython.utils.text import EvalFormatter
60 from IPython.utils.traitlets import Any, Int, CFloat, List, Unicode, Dict, Instance
60 from IPython.utils.traitlets import (
61 Any, Int, CFloat, List, Unicode, Dict, Instance, HasTraits,
62 )
61 63 from IPython.utils.path import get_ipython_module_path
62 64 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
63 65
@@ -213,6 +215,33 b' class BaseLauncher(LoggingConfigurable):'
213 215 """
214 216 raise NotImplementedError('signal must be implemented in a subclass')
215 217
218 class ClusterAppMixin(HasTraits):
219 """MixIn for cluster args as traits"""
220 cluster_args = List([])
221 profile_dir=Unicode('')
222 cluster_id=Unicode('')
223 def _profile_dir_changed(self, name, old, new):
224 self.cluster_args = []
225 if self.profile_dir:
226 self.cluster_args.extend(['--profile-dir', self.profile_dir])
227 if self.cluster_id:
228 self.cluster_args.extend(['--cluster-id', self.cluster_id])
229 _cluster_id_changed = _profile_dir_changed
230
231 class ControllerMixin(ClusterAppMixin):
232 controller_cmd = List(ipcontroller_cmd_argv, config=True,
233 help="""Popen command to launch ipcontroller.""")
234 # Command line arguments to ipcontroller.
235 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
236 help="""command-line args to pass to ipcontroller""")
237
238 class EngineMixin(ClusterAppMixin):
239 engine_cmd = List(ipengine_cmd_argv, config=True,
240 help="""command to launch the Engine.""")
241 # Command line arguments for ipengine.
242 engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
243 help="command-line arguments to pass to ipengine"
244 )
216 245
217 246 #-----------------------------------------------------------------------------
218 247 # Local process launchers
@@ -317,54 +346,28 b' class LocalProcessLauncher(BaseLauncher):'
317 346 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
318 347 return status
319 348
320 class LocalControllerLauncher(LocalProcessLauncher):
349 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
321 350 """Launch a controller as a regular external process."""
322 351
323 controller_cmd = List(ipcontroller_cmd_argv, config=True,
324 help="""Popen command to launch ipcontroller.""")
325 # Command line arguments to ipcontroller.
326 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
327 help="""command-line args to pass to ipcontroller""")
328
329 352 def find_args(self):
330 return self.controller_cmd + self.controller_args
353 return self.controller_cmd + self.cluster_args + self.controller_args
331 354
332 def start(self, profile_dir):
355 def start(self):
333 356 """Start the controller by profile_dir."""
334 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
335 self.profile_dir = unicode(profile_dir)
336 357 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
337 358 return super(LocalControllerLauncher, self).start()
338 359
339 360
340 class LocalEngineLauncher(LocalProcessLauncher):
361 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
341 362 """Launch a single engine as a regular externall process."""
342 363
343 engine_cmd = List(ipengine_cmd_argv, config=True,
344 help="""command to launch the Engine.""")
345 # Command line arguments for ipengine.
346 engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
347 help="command-line arguments to pass to ipengine"
348 )
349
350 364 def find_args(self):
351 return self.engine_cmd + self.engine_args
352
353 def start(self, profile_dir):
354 """Start the engine by profile_dir."""
355 self.engine_args.extend(['--profile-dir=%s'%profile_dir])
356 self.profile_dir = unicode(profile_dir)
357 return super(LocalEngineLauncher, self).start()
365 return self.engine_cmd + self.cluster_args + self.engine_args
358 366
359 367
360 class LocalEngineSetLauncher(BaseLauncher):
368 class LocalEngineSetLauncher(LocalEngineLauncher):
361 369 """Launch a set of engines as regular external processes."""
362 370
363 # Command line arguments for ipengine.
364 engine_args = List(
365 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
366 help="command-line arguments to pass to ipengine"
367 )
368 371 delay = CFloat(0.1, config=True,
369 372 help="""delay (in seconds) between starting each engine after the first.
370 373 This can help force the engines to get their ids in order, or limit
@@ -383,26 +386,26 b' class LocalEngineSetLauncher(BaseLauncher):'
383 386 )
384 387 self.stop_data = {}
385 388
386 def start(self, n, profile_dir):
389 def start(self, n):
387 390 """Start n engines by profile or profile_dir."""
388 self.profile_dir = unicode(profile_dir)
389 391 dlist = []
390 392 for i in range(n):
391 393 if i > 0:
392 394 time.sleep(self.delay)
393 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
395 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
396 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
397 )
398
394 399 # Copy the engine args over to each engine launcher.
400 el.engine_cmd = copy.deepcopy(self.engine_cmd)
395 401 el.engine_args = copy.deepcopy(self.engine_args)
396 402 el.on_stop(self._notice_engine_stopped)
397 d = el.start(profile_dir)
403 d = el.start()
398 404 if i==0:
399 405 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
400 406 self.launchers[i] = el
401 407 dlist.append(d)
402 408 self.notify_start(dlist)
403 # The consumeErrors here could be dangerous
404 # dfinal = gatherBoth(dlist, consumeErrors=True)
405 # dfinal.addCallback(self.notify_start)
406 409 return dlist
407 410
408 411 def find_args(self):
@@ -413,7 +416,6 b' class LocalEngineSetLauncher(BaseLauncher):'
413 416 for el in self.launchers.itervalues():
414 417 d = el.signal(sig)
415 418 dlist.append(d)
416 # dfinal = gatherBoth(dlist, consumeErrors=True)
417 419 return dlist
418 420
419 421 def interrupt_then_kill(self, delay=1.0):
@@ -421,7 +423,6 b' class LocalEngineSetLauncher(BaseLauncher):'
421 423 for el in self.launchers.itervalues():
422 424 d = el.interrupt_then_kill(delay)
423 425 dlist.append(d)
424 # dfinal = gatherBoth(dlist, consumeErrors=True)
425 426 return dlist
426 427
427 428 def stop(self):
@@ -452,9 +453,9 b' class MPIExecLauncher(LocalProcessLauncher):'
452 453 mpi_args = List([], config=True,
453 454 help="The command line arguments to pass to mpiexec."
454 455 )
455 program = List(['date'], config=True,
456 program = List(['date'],
456 457 help="The program to start via mpiexec.")
457 program_args = List([], config=True,
458 program_args = List([],
458 459 help="The command line argument to the program."
459 460 )
460 461 n = Int(1)
@@ -470,44 +471,42 b' class MPIExecLauncher(LocalProcessLauncher):'
470 471 return super(MPIExecLauncher, self).start()
471 472
472 473
473 class MPIExecControllerLauncher(MPIExecLauncher):
474 class MPIExecControllerLauncher(MPIExecLauncher, ControllerMixin):
474 475 """Launch a controller using mpiexec."""
475 476
476 controller_cmd = List(ipcontroller_cmd_argv, config=True,
477 help="Popen command to launch the Contropper"
478 )
479 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
480 help="Command line arguments to pass to ipcontroller."
481 )
482 n = Int(1)
477 # alias back to *non-configurable* program[_args] for use in find_args()
478 # this way all Controller/EngineSetLaunchers have the same form, rather
479 # than *some* having `program_args` and others `controller_args`
480 @property
481 def program(self):
482 return self.controller_cmd
483
484 @property
485 def program_args(self):
486 return self.cluster_args + self.controller_args
483 487
484 def start(self, profile_dir):
488 def start(self):
485 489 """Start the controller by profile_dir."""
486 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
487 self.profile_dir = unicode(profile_dir)
488 490 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
489 491 return super(MPIExecControllerLauncher, self).start(1)
490 492
491 def find_args(self):
492 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
493 self.controller_cmd + self.controller_args
494
495 493
496 class MPIExecEngineSetLauncher(MPIExecLauncher):
494 class MPIExecEngineSetLauncher(MPIExecLauncher, EngineMixin):
495 """Launch engines using mpiexec"""
497 496
498 program = List(ipengine_cmd_argv, config=True,
499 help="Popen command for ipengine"
500 )
501 program_args = List(
502 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
503 help="Command line arguments for ipengine."
504 )
505 n = Int(1)
497 # alias back to *non-configurable* program[_args] for use in find_args()
498 # this way all Controller/EngineSetLaunchers have the same form, rather
499 # than *some* having `program_args` and others `controller_args`
500 @property
501 def program(self):
502 return self.engine_cmd
503
504 @property
505 def program_args(self):
506 return self.cluster_args + self.engine_args
506 507
507 def start(self, n, profile_dir):
508 def start(self, n):
508 509 """Start n engines by profile or profile_dir."""
509 self.program_args.extend(['--profile-dir=%s'%profile_dir])
510 self.profile_dir = unicode(profile_dir)
511 510 self.n = n
512 511 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
513 512 return super(MPIExecEngineSetLauncher, self).start(n)
@@ -530,9 +529,9 b' class SSHLauncher(LocalProcessLauncher):'
530 529 help="command for starting ssh")
531 530 ssh_args = List(['-tt'], config=True,
532 531 help="args to pass to ssh")
533 program = List(['date'], config=True,
532 program = List(['date'],
534 533 help="Program to launch via ssh")
535 program_args = List([], config=True,
534 program_args = List([],
536 535 help="args to pass to remote program")
537 536 hostname = Unicode('', config=True,
538 537 help="hostname on which to launch the program")
@@ -554,8 +553,7 b' class SSHLauncher(LocalProcessLauncher):'
554 553 return self.ssh_cmd + self.ssh_args + [self.location] + \
555 554 self.program + self.program_args
556 555
557 def start(self, profile_dir, hostname=None, user=None):
558 self.profile_dir = unicode(profile_dir)
556 def start(self, hostname=None, user=None):
559 557 if hostname is not None:
560 558 self.hostname = hostname
561 559 if user is not None:
@@ -571,22 +569,33 b' class SSHLauncher(LocalProcessLauncher):'
571 569
572 570
573 571
574 class SSHControllerLauncher(SSHLauncher):
572 class SSHControllerLauncher(SSHLauncher, ControllerMixin):
573
574 # alias back to *non-configurable* program[_args] for use in find_args()
575 # this way all Controller/EngineSetLaunchers have the same form, rather
576 # than *some* having `program_args` and others `controller_args`
577 @property
578 def program(self):
579 return self.controller_cmd
580
581 @property
582 def program_args(self):
583 return self.cluster_args + self.controller_args
575 584
576 program = List(ipcontroller_cmd_argv, config=True,
577 help="remote ipcontroller command.")
578 program_args = List(['--reuse-files', '--log-to-file','--log-level=%i'%logging.INFO], config=True,
579 help="Command line arguments to ipcontroller.")
580 585
586 class SSHEngineLauncher(SSHLauncher, EngineMixin):
581 587
582 class SSHEngineLauncher(SSHLauncher):
583 program = List(ipengine_cmd_argv, config=True,
584 help="remote ipengine command.")
585 # Command line arguments for ipengine.
586 program_args = List(
587 ['--log-to-file','--log_level=%i'%logging.INFO], config=True,
588 help="Command line arguments to ipengine."
589 )
588 # alias back to *non-configurable* program[_args] for use in find_args()
589 # this way all Controller/EngineSetLaunchers have the same form, rather
590 # than *some* having `program_args` and others `controller_args`
591 @property
592 def program(self):
593 return self.engine_cmd
594
595 @property
596 def program_args(self):
597 return self.cluster_args + self.engine_args
598
590 599
591 600 class SSHEngineSetLauncher(LocalEngineSetLauncher):
592 601 launcher_class = SSHEngineLauncher
@@ -594,12 +603,11 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
594 603 help="""dict of engines to launch. This is a dict by hostname of ints,
595 604 corresponding to the number of engines to start on that host.""")
596 605
597 def start(self, n, profile_dir):
606 def start(self, n):
598 607 """Start engines by profile or profile_dir.
599 608 `n` is ignored, and the `engines` config property is used instead.
600 609 """
601 610
602 self.profile_dir = unicode(profile_dir)
603 611 dlist = []
604 612 for host, n in self.engines.iteritems():
605 613 if isinstance(n, (tuple, list)):
@@ -614,13 +622,15 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
614 622 for i in range(n):
615 623 if i > 0:
616 624 time.sleep(self.delay)
617 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
625 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
626 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
627 )
618 628
619 629 # Copy the engine args over to each engine launcher.
620 i
621 el.program_args = args
630 el.engine_cmd = self.engine_cmd
631 el.engine_args = args
622 632 el.on_stop(self._notice_engine_stopped)
623 d = el.start(profile_dir, user=user, hostname=host)
633 d = el.start(user=user, hostname=host)
624 634 if i==0:
625 635 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
626 636 self.launchers[host+str(i)] = el
@@ -727,11 +737,11 b' class WindowsHPCLauncher(BaseLauncher):'
727 737 return output
728 738
729 739
730 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
740 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
731 741
732 742 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
733 743 help="WinHPC xml job file.")
734 extra_args = List([], config=False,
744 controller_args = List([], config=False,
735 745 help="extra args to pass to ipcontroller")
736 746
737 747 def write_job_file(self, n):
@@ -743,7 +753,8 b' class WindowsHPCControllerLauncher(WindowsHPCLauncher):'
743 753 # files that the scheduler redirects to.
744 754 t.work_directory = self.profile_dir
745 755 # Add the profile_dir and from self.start().
746 t.controller_args.extend(self.extra_args)
756 t.controller_args.extend(self.cluster_args)
757 t.controller_args.extend(self.controller_args)
747 758 job.add_task(t)
748 759
749 760 self.log.info("Writing job description file: %s" % self.job_file)
@@ -753,18 +764,16 b' class WindowsHPCControllerLauncher(WindowsHPCLauncher):'
753 764 def job_file(self):
754 765 return os.path.join(self.profile_dir, self.job_file_name)
755 766
756 def start(self, profile_dir):
767 def start(self):
757 768 """Start the controller by profile_dir."""
758 self.extra_args = ['--profile-dir=%s'%profile_dir]
759 self.profile_dir = unicode(profile_dir)
760 769 return super(WindowsHPCControllerLauncher, self).start(1)
761 770
762 771
763 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
772 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
764 773
765 774 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
766 775 help="jobfile for ipengines job")
767 extra_args = List([], config=False,
776 engine_args = List([], config=False,
768 777 help="extra args to pas to ipengine")
769 778
770 779 def write_job_file(self, n):
@@ -777,7 +786,8 b' class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):'
777 786 # files that the scheduler redirects to.
778 787 t.work_directory = self.profile_dir
779 788 # Add the profile_dir and from self.start().
780 t.engine_args.extend(self.extra_args)
789 t.controller_args.extend(self.cluster_args)
790 t.controller_args.extend(self.engine_args)
781 791 job.add_task(t)
782 792
783 793 self.log.info("Writing job description file: %s" % self.job_file)
@@ -787,10 +797,8 b' class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):'
787 797 def job_file(self):
788 798 return os.path.join(self.profile_dir, self.job_file_name)
789 799
790 def start(self, n, profile_dir):
800 def start(self, n):
791 801 """Start the controller by profile_dir."""
792 self.extra_args = ['--profile-dir=%s'%profile_dir]
793 self.profile_dir = unicode(profile_dir)
794 802 return super(WindowsHPCEngineSetLauncher, self).start(n)
795 803
796 804
@@ -798,6 +806,13 b' class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):'
798 806 # Batch (PBS) system launchers
799 807 #-----------------------------------------------------------------------------
800 808
809 class BatchClusterAppMixin(ClusterAppMixin):
810 """ClusterApp mixin that updates context dict, rather than args"""
811 context = Dict({'profile_dir':'', 'cluster_id':''})
812 def _profile_dir_changed(self, name, old, new):
813 self.context[name] = new
814 _cluster_id_changed = _profile_dir_changed
815
801 816 class BatchSystemLauncher(BaseLauncher):
802 817 """Launch an external process using a batch system.
803 818
@@ -829,6 +844,12 b' class BatchSystemLauncher(BaseLauncher):'
829 844 queue = Unicode(u'', config=True,
830 845 help="The PBS Queue.")
831 846
847 def _queue_changed(self, name, old, new):
848 self.context[name] = new
849
850 n = Int(1)
851 _n_changed = _queue_changed
852
832 853 # not configurable, override in subclasses
833 854 # PBS Job Array regex
834 855 job_array_regexp = Unicode('')
@@ -868,8 +889,7 b' class BatchSystemLauncher(BaseLauncher):'
868 889
869 890 def write_batch_script(self, n):
870 891 """Instantiate and write the batch script to the work_dir."""
871 self.context['n'] = n
872 self.context['queue'] = self.queue
892 self.n = n
873 893 # first priority is batch_template if set
874 894 if self.batch_template_file and not self.batch_template:
875 895 # second priority is batch_template_file
@@ -902,12 +922,10 b' class BatchSystemLauncher(BaseLauncher):'
902 922 f.write(script_as_string)
903 923 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
904 924
905 def start(self, n, profile_dir):
925 def start(self, n):
906 926 """Start n copies of the process using a batch system."""
907 927 # Here we save profile_dir in the context so they
908 928 # can be used in the batch script template as {profile_dir}
909 self.context['profile_dir'] = profile_dir
910 self.profile_dir = unicode(profile_dir)
911 929 self.write_batch_script(n)
912 930 output = check_output(self.args, env=os.environ)
913 931
@@ -938,7 +956,7 b' class PBSLauncher(BatchSystemLauncher):'
938 956 queue_template = Unicode('#PBS -q {queue}')
939 957
940 958
941 class PBSControllerLauncher(PBSLauncher):
959 class PBSControllerLauncher(BatchClusterAppMixin, PBSLauncher):
942 960 """Launch a controller using PBS."""
943 961
944 962 batch_file_name = Unicode(u'pbs_controller', config=True,
@@ -946,29 +964,30 b' class PBSControllerLauncher(PBSLauncher):'
946 964 default_template= Unicode("""#!/bin/sh
947 965 #PBS -V
948 966 #PBS -N ipcontroller
949 %s --log-to-file --profile-dir={profile_dir}
967 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
950 968 """%(' '.join(ipcontroller_cmd_argv)))
951 969
952 def start(self, profile_dir):
970
971 def start(self):
953 972 """Start the controller by profile or profile_dir."""
954 973 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
955 return super(PBSControllerLauncher, self).start(1, profile_dir)
974 return super(PBSControllerLauncher, self).start(1)
956 975
957 976
958 class PBSEngineSetLauncher(PBSLauncher):
977 class PBSEngineSetLauncher(BatchClusterAppMixin, PBSLauncher):
959 978 """Launch Engines using PBS"""
960 979 batch_file_name = Unicode(u'pbs_engines', config=True,
961 980 help="batch file name for the engine(s) job.")
962 981 default_template= Unicode(u"""#!/bin/sh
963 982 #PBS -V
964 983 #PBS -N ipengine
965 %s --profile-dir={profile_dir}
984 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
966 985 """%(' '.join(ipengine_cmd_argv)))
967 986
968 def start(self, n, profile_dir):
987 def start(self, n):
969 988 """Start n engines by profile or profile_dir."""
970 989 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
971 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
990 return super(PBSEngineSetLauncher, self).start(n)
972 991
973 992 #SGE is very similar to PBS
974 993
@@ -979,7 +998,7 b' class SGELauncher(PBSLauncher):'
979 998 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
980 999 queue_template = Unicode('#$ -q {queue}')
981 1000
982 class SGEControllerLauncher(SGELauncher):
1001 class SGEControllerLauncher(BatchClusterAppMixin, SGELauncher):
983 1002 """Launch a controller using SGE."""
984 1003
985 1004 batch_file_name = Unicode(u'sge_controller', config=True,
@@ -987,28 +1006,28 b' class SGEControllerLauncher(SGELauncher):'
987 1006 default_template= Unicode(u"""#$ -V
988 1007 #$ -S /bin/sh
989 1008 #$ -N ipcontroller
990 %s --log-to-file --profile-dir={profile_dir}
1009 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
991 1010 """%(' '.join(ipcontroller_cmd_argv)))
992 1011
993 def start(self, profile_dir):
1012 def start(self):
994 1013 """Start the controller by profile or profile_dir."""
995 1014 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
996 return super(SGEControllerLauncher, self).start(1, profile_dir)
1015 return super(SGEControllerLauncher, self).start(1)
997 1016
998 class SGEEngineSetLauncher(SGELauncher):
1017 class SGEEngineSetLauncher(BatchClusterAppMixin, SGELauncher):
999 1018 """Launch Engines with SGE"""
1000 1019 batch_file_name = Unicode(u'sge_engines', config=True,
1001 1020 help="batch file name for the engine(s) job.")
1002 1021 default_template = Unicode("""#$ -V
1003 1022 #$ -S /bin/sh
1004 1023 #$ -N ipengine
1005 %s --profile-dir={profile_dir}
1024 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1006 1025 """%(' '.join(ipengine_cmd_argv)))
1007 1026
1008 def start(self, n, profile_dir):
1027 def start(self, n):
1009 1028 """Start n engines by profile or profile_dir."""
1010 1029 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1011 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1030 return super(SGEEngineSetLauncher, self).start(n)
1012 1031
1013 1032
1014 1033 # LSF launchers
@@ -1029,7 +1048,7 b' class LSFLauncher(BatchSystemLauncher):'
1029 1048 queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
1030 1049 queue_template = Unicode('#BSUB -q {queue}')
1031 1050
1032 def start(self, n, profile_dir):
1051 def start(self, n):
1033 1052 """Start n copies of the process using LSF batch system.
1034 1053 This cant inherit from the base class because bsub expects
1035 1054 to be piped a shell script in order to honor the #BSUB directives :
@@ -1037,8 +1056,6 b' class LSFLauncher(BatchSystemLauncher):'
1037 1056 """
1038 1057 # Here we save profile_dir in the context so they
1039 1058 # can be used in the batch script template as {profile_dir}
1040 self.context['profile_dir'] = profile_dir
1041 self.profile_dir = unicode(profile_dir)
1042 1059 self.write_batch_script(n)
1043 1060 #output = check_output(self.args, env=os.environ)
1044 1061 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
@@ -1049,7 +1066,7 b' class LSFLauncher(BatchSystemLauncher):'
1049 1066 return job_id
1050 1067
1051 1068
1052 class LSFControllerLauncher(LSFLauncher):
1069 class LSFControllerLauncher(BatchClusterAppMixin, LSFLauncher):
1053 1070 """Launch a controller using LSF."""
1054 1071
1055 1072 batch_file_name = Unicode(u'lsf_controller', config=True,
@@ -1058,29 +1075,29 b' class LSFControllerLauncher(LSFLauncher):'
1058 1075 #BSUB -J ipcontroller
1059 1076 #BSUB -oo ipcontroller.o.%%J
1060 1077 #BSUB -eo ipcontroller.e.%%J
1061 %s --log-to-file --profile-dir={profile_dir}
1078 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1062 1079 """%(' '.join(ipcontroller_cmd_argv)))
1063 1080
1064 def start(self, profile_dir):
1081 def start(self):
1065 1082 """Start the controller by profile or profile_dir."""
1066 1083 self.log.info("Starting LSFControllerLauncher: %r" % self.args)
1067 return super(LSFControllerLauncher, self).start(1, profile_dir)
1084 return super(LSFControllerLauncher, self).start(1)
1068 1085
1069 1086
1070 class LSFEngineSetLauncher(LSFLauncher):
1087 class LSFEngineSetLauncher(BatchClusterAppMixin, LSFLauncher):
1071 1088 """Launch Engines using LSF"""
1072 1089 batch_file_name = Unicode(u'lsf_engines', config=True,
1073 1090 help="batch file name for the engine(s) job.")
1074 1091 default_template= Unicode(u"""#!/bin/sh
1075 1092 #BSUB -oo ipengine.o.%%J
1076 1093 #BSUB -eo ipengine.e.%%J
1077 %s --profile-dir={profile_dir}
1094 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1078 1095 """%(' '.join(ipengine_cmd_argv)))
1079 1096
1080 def start(self, n, profile_dir):
1097 def start(self, n):
1081 1098 """Start n engines by profile or profile_dir."""
1082 1099 self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
1083 return super(LSFEngineSetLauncher, self).start(n, profile_dir)
1100 return super(LSFEngineSetLauncher, self).start(n)
1084 1101
1085 1102
1086 1103 #-----------------------------------------------------------------------------
General Comments 0
You need to be logged in to leave comments. Login now