##// END OF EJS Templates
untwist PBS, WinHPC Launchers in newparallel
MinRK -
Show More
@@ -17,6 +17,7 b' The IPython cluster directory'
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import re
20 import os
21 import os
21 import shutil
22 import shutil
22 import sys
23 import sys
@@ -469,7 +470,8 b' class ApplicationWithClusterDir(Application):'
469 if self.master_config.Global.clean_logs:
470 if self.master_config.Global.clean_logs:
470 log_dir = self.master_config.Global.log_dir
471 log_dir = self.master_config.Global.log_dir
471 for f in os.listdir(log_dir):
472 for f in os.listdir(log_dir):
472 if f.startswith(self.name + u'-') and f.endswith('.log'):
473 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
474 # if f.startswith(self.name + u'-') and f.endswith('.log'):
473 os.remove(os.path.join(log_dir, f))
475 os.remove(os.path.join(log_dir, f))
474 # Start logging to the new log file
476 # Start logging to the new log file
475 if self.master_config.Global.log_to_file:
477 if self.master_config.Global.log_to_file:
@@ -15,6 +15,7 b' The ipcluster application.'
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import re
18 import logging
19 import logging
19 import os
20 import os
20 import signal
21 import signal
@@ -412,13 +413,9 b' class IPClusterApp(ApplicationWithClusterDir):'
412 if self.master_config.Global.clean_logs:
413 if self.master_config.Global.clean_logs:
413 log_dir = self.master_config.Global.log_dir
414 log_dir = self.master_config.Global.log_dir
414 for f in os.listdir(log_dir):
415 for f in os.listdir(log_dir):
415 if f.startswith('ipengine' + '-'):
416 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
416 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
417 os.remove(os.path.join(log_dir, f))
417 os.remove(os.path.join(log_dir, f))
418 # This will remove old log files for ipcluster itself
418 if f.startswith('ipcontroller' + '-'):
419 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
420 os.remove(os.path.join(log_dir, f))
421 # This will remote old log files for ipcluster itself
422 super(IPClusterApp, self).start_logging()
419 super(IPClusterApp, self).start_logging()
423
420
424 def start_app(self):
421 def start_app(self):
This diff has been collapsed as it changes many lines, (584 lines changed) Show them Hide them
@@ -20,26 +20,44 b' import re'
20 import sys
20 import sys
21 import logging
21 import logging
22
22
23 from signal import SIGINT
23 from signal import SIGINT, SIGTERM
24 try:
24 try:
25 from signal import SIGKILL
25 from signal import SIGKILL
26 except ImportError:
26 except ImportError:
27 SIGKILL=SIGTERM
27 SIGKILL=SIGTERM
28
28
29 from subprocess import Popen, PIPE
29 from subprocess import Popen, PIPE, STDOUT
30 try:
31 from subprocess import check_open
32 except ImportError:
33 # pre-2.7:
34 from StringIO import StringIO
35
36 def check_open(*args, **kwargs):
37 sio = StringIO()
38 kwargs.update(dict(stdout=PIPE, stderr=STDOUT))
39 p = Popen(*args, **kwargs)
40 out,err = p.communicate()
41 return out
30
42
31 from zmq.eventloop import ioloop
43 from zmq.eventloop import ioloop
32
44
45 from IPython.external import Itpl
33 # from IPython.config.configurable import Configurable
46 # from IPython.config.configurable import Configurable
34 from IPython.utils.traitlets import Str, Int, List, Unicode, Instance
47 from IPython.utils.traitlets import Str, Int, List, Unicode, Instance
35 from IPython.utils.path import get_ipython_module_path
48 from IPython.utils.path import get_ipython_module_path
36 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
49 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
37
50
38 from factory import LoggingFactory
51 from factory import LoggingFactory
39 # from IPython.kernel.winhpcjob import (
52
40 # IPControllerTask, IPEngineTask,
53 # load winhpcjob from IPython.kernel
41 # IPControllerJob, IPEngineSetJob
54 try:
42 # )
55 from IPython.kernel.winhpcjob import (
56 IPControllerTask, IPEngineTask,
57 IPControllerJob, IPEngineSetJob
58 )
59 except ImportError:
60 pass
43
61
44
62
45 #-----------------------------------------------------------------------------
63 #-----------------------------------------------------------------------------
@@ -286,7 +304,7 b' class LocalControllerLauncher(LocalProcessLauncher):'
286
304
287 controller_cmd = List(ipcontroller_cmd_argv, config=True)
305 controller_cmd = List(ipcontroller_cmd_argv, config=True)
288 # Command line arguments to ipcontroller.
306 # Command line arguments to ipcontroller.
289 controller_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True)
307 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
290
308
291 def find_args(self):
309 def find_args(self):
292 return self.controller_cmd + self.controller_args
310 return self.controller_cmd + self.controller_args
@@ -305,7 +323,7 b' class LocalEngineLauncher(LocalProcessLauncher):'
305 engine_cmd = List(ipengine_cmd_argv, config=True)
323 engine_cmd = List(ipengine_cmd_argv, config=True)
306 # Command line arguments for ipengine.
324 # Command line arguments for ipengine.
307 engine_args = List(
325 engine_args = List(
308 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
326 ['--log-to-file','--log-level', str(logging.INFO)], config=True
309 )
327 )
310
328
311 def find_args(self):
329 def find_args(self):
@@ -323,7 +341,7 b' class LocalEngineSetLauncher(BaseLauncher):'
323
341
324 # Command line arguments for ipengine.
342 # Command line arguments for ipengine.
325 engine_args = List(
343 engine_args = List(
326 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
344 ['--log-to-file','--log-level', str(logging.INFO)], config=True
327 )
345 )
328 # launcher class
346 # launcher class
329 launcher_class = LocalEngineLauncher
347 launcher_class = LocalEngineLauncher
@@ -425,7 +443,7 b' class MPIExecControllerLauncher(MPIExecLauncher):'
425
443
426 controller_cmd = List(ipcontroller_cmd_argv, config=True)
444 controller_cmd = List(ipcontroller_cmd_argv, config=True)
427 # Command line arguments to ipcontroller.
445 # Command line arguments to ipcontroller.
428 controller_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True)
446 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
429 n = Int(1, config=False)
447 n = Int(1, config=False)
430
448
431 def start(self, cluster_dir):
449 def start(self, cluster_dir):
@@ -445,7 +463,7 b' class MPIExecEngineSetLauncher(MPIExecLauncher):'
445 engine_cmd = List(ipengine_cmd_argv, config=True)
463 engine_cmd = List(ipengine_cmd_argv, config=True)
446 # Command line arguments for ipengine.
464 # Command line arguments for ipengine.
447 engine_args = List(
465 engine_args = List(
448 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
466 ['--log-to-file','--log-level', str(logging.INFO)], config=True
449 )
467 )
450 n = Int(1, config=True)
468 n = Int(1, config=True)
451
469
@@ -506,14 +524,14 b' class SSHControllerLauncher(SSHLauncher):'
506
524
507 program = List(ipcontroller_cmd_argv, config=True)
525 program = List(ipcontroller_cmd_argv, config=True)
508 # Command line arguments to ipcontroller.
526 # Command line arguments to ipcontroller.
509 program_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True)
527 program_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
510
528
511
529
512 class SSHEngineLauncher(SSHLauncher):
530 class SSHEngineLauncher(SSHLauncher):
513 program = List(ipengine_cmd_argv, config=True)
531 program = List(ipengine_cmd_argv, config=True)
514 # Command line arguments for ipengine.
532 # Command line arguments for ipengine.
515 program_args = List(
533 program_args = List(
516 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
534 ['--log-to-file','--log-level', str(logging.INFO)], config=True
517 )
535 )
518
536
519 class SSHEngineSetLauncher(LocalEngineSetLauncher):
537 class SSHEngineSetLauncher(LocalEngineSetLauncher):
@@ -525,279 +543,271 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
525 #-----------------------------------------------------------------------------
543 #-----------------------------------------------------------------------------
526
544
527
545
528 # # This is only used on Windows.
546 # This is only used on Windows.
529 # def find_job_cmd():
547 def find_job_cmd():
530 # if os.name=='nt':
548 if os.name=='nt':
531 # try:
549 try:
532 # return find_cmd('job')
550 return find_cmd('job')
533 # except FindCmdError:
551 except FindCmdError:
534 # return 'job'
552 return 'job'
535 # else:
553 else:
536 # return 'job'
554 return 'job'
537 #
555
538 #
556
539 # class WindowsHPCLauncher(BaseLauncher):
557 class WindowsHPCLauncher(BaseLauncher):
540 #
558
541 # # A regular expression used to get the job id from the output of the
559 # A regular expression used to get the job id from the output of the
542 # # submit_command.
560 # submit_command.
543 # job_id_regexp = Str(r'\d+', config=True)
561 job_id_regexp = Str(r'\d+', config=True)
544 # # The filename of the instantiated job script.
562 # The filename of the instantiated job script.
545 # job_file_name = Unicode(u'ipython_job.xml', config=True)
563 job_file_name = Unicode(u'ipython_job.xml', config=True)
546 # # The full path to the instantiated job script. This gets made dynamically
564 # The full path to the instantiated job script. This gets made dynamically
547 # # by combining the work_dir with the job_file_name.
565 # by combining the work_dir with the job_file_name.
548 # job_file = Unicode(u'')
566 job_file = Unicode(u'')
549 # # The hostname of the scheduler to submit the job to
567 # The hostname of the scheduler to submit the job to
550 # scheduler = Str('', config=True)
568 scheduler = Str('', config=True)
551 # job_cmd = Str(find_job_cmd(), config=True)
569 job_cmd = Str(find_job_cmd(), config=True)
552 #
570
553 # def __init__(self, work_dir=u'.', config=None):
571 def __init__(self, work_dir=u'.', config=None):
554 # super(WindowsHPCLauncher, self).__init__(
572 super(WindowsHPCLauncher, self).__init__(
555 # work_dir=work_dir, config=config
573 work_dir=work_dir, config=config
556 # )
574 )
557 #
575
558 # @property
576 @property
559 # def job_file(self):
577 def job_file(self):
560 # return os.path.join(self.work_dir, self.job_file_name)
578 return os.path.join(self.work_dir, self.job_file_name)
561 #
579
562 # def write_job_file(self, n):
580 def write_job_file(self, n):
563 # raise NotImplementedError("Implement write_job_file in a subclass.")
581 raise NotImplementedError("Implement write_job_file in a subclass.")
564 #
582
565 # def find_args(self):
583 def find_args(self):
566 # return ['job.exe']
584 return ['job.exe']
567 #
585
568 # def parse_job_id(self, output):
586 def parse_job_id(self, output):
569 # """Take the output of the submit command and return the job id."""
587 """Take the output of the submit command and return the job id."""
570 # m = re.search(self.job_id_regexp, output)
588 m = re.search(self.job_id_regexp, output)
571 # if m is not None:
589 if m is not None:
572 # job_id = m.group()
590 job_id = m.group()
573 # else:
591 else:
574 # raise LauncherError("Job id couldn't be determined: %s" % output)
592 raise LauncherError("Job id couldn't be determined: %s" % output)
575 # self.job_id = job_id
593 self.job_id = job_id
576 # self.log.info('Job started with job id: %r' % job_id)
594 self.log.info('Job started with job id: %r' % job_id)
577 # return job_id
595 return job_id
578 #
596
579 # @inlineCallbacks
597 def start(self, n):
580 # def start(self, n):
598 """Start n copies of the process using the Win HPC job scheduler."""
581 # """Start n copies of the process using the Win HPC job scheduler."""
599 self.write_job_file(n)
582 # self.write_job_file(n)
600 args = [
583 # args = [
601 'submit',
584 # 'submit',
602 '/jobfile:%s' % self.job_file,
585 # '/jobfile:%s' % self.job_file,
603 '/scheduler:%s' % self.scheduler
586 # '/scheduler:%s' % self.scheduler
604 ]
587 # ]
605 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
588 # self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
606 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
589 # # Twisted will raise DeprecationWarnings if we try to pass unicode to this
607 output = check_output([self.job_cmd]+args,
590 # output = yield getProcessOutput(str(self.job_cmd),
608 env=os.environ,
591 # [str(a) for a in args],
609 cwd=self.work_dir,
592 # env=dict((str(k),str(v)) for k,v in os.environ.items()),
610 stderr=STDOUT
593 # path=self.work_dir
611 )
594 # )
612 job_id = self.parse_job_id(output)
595 # job_id = self.parse_job_id(output)
613 # self.notify_start(job_id)
596 # self.notify_start(job_id)
614 return job_id
597 # defer.returnValue(job_id)
615
598 #
616 def stop(self):
599 # @inlineCallbacks
617 args = [
600 # def stop(self):
618 'cancel',
601 # args = [
619 self.job_id,
602 # 'cancel',
620 '/scheduler:%s' % self.scheduler
603 # self.job_id,
621 ]
604 # '/scheduler:%s' % self.scheduler
622 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
605 # ]
623 try:
606 # self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
624 output = check_output([self.job_cmd]+args,
607 # try:
625 env=os.environ,
608 # # Twisted will raise DeprecationWarnings if we try to pass unicode to this
626 cwd=self.work_dir,
609 # output = yield getProcessOutput(str(self.job_cmd),
627 stderr=STDOUT
610 # [str(a) for a in args],
628 )
611 # env=dict((str(k),str(v)) for k,v in os.environ.iteritems()),
629 except:
612 # path=self.work_dir
630 output = 'The job already appears to be stoppped: %r' % self.job_id
613 # )
631 self.notify_stop(output) # Pass the output of the kill cmd
614 # except:
632 return output
615 # output = 'The job already appears to be stoppped: %r' % self.job_id
633
616 # self.notify_stop(output) # Pass the output of the kill cmd
634
617 # defer.returnValue(output)
635 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
618 #
636
619 #
637 job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
620 # class WindowsHPCControllerLauncher(WindowsHPCLauncher):
638 extra_args = List([], config=False)
621 #
639
622 # job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
640 def write_job_file(self, n):
623 # extra_args = List([], config=False)
641 job = IPControllerJob(config=self.config)
624 #
642
625 # def write_job_file(self, n):
643 t = IPControllerTask(config=self.config)
626 # job = IPControllerJob(config=self.config)
644 # The tasks work directory is *not* the actual work directory of
627 #
645 # the controller. It is used as the base path for the stdout/stderr
628 # t = IPControllerTask(config=self.config)
646 # files that the scheduler redirects to.
629 # # The tasks work directory is *not* the actual work directory of
647 t.work_directory = self.cluster_dir
630 # # the controller. It is used as the base path for the stdout/stderr
648 # Add the --cluster-dir and from self.start().
631 # # files that the scheduler redirects to.
649 t.controller_args.extend(self.extra_args)
632 # t.work_directory = self.cluster_dir
650 job.add_task(t)
633 # # Add the --cluster-dir and from self.start().
651
634 # t.controller_args.extend(self.extra_args)
652 self.log.info("Writing job description file: %s" % self.job_file)
635 # job.add_task(t)
653 job.write(self.job_file)
636 #
654
637 # self.log.info("Writing job description file: %s" % self.job_file)
655 @property
638 # job.write(self.job_file)
656 def job_file(self):
639 #
657 return os.path.join(self.cluster_dir, self.job_file_name)
640 # @property
658
641 # def job_file(self):
659 def start(self, cluster_dir):
642 # return os.path.join(self.cluster_dir, self.job_file_name)
660 """Start the controller by cluster_dir."""
643 #
661 self.extra_args = ['--cluster-dir', cluster_dir]
644 # def start(self, cluster_dir):
662 self.cluster_dir = unicode(cluster_dir)
645 # """Start the controller by cluster_dir."""
663 return super(WindowsHPCControllerLauncher, self).start(1)
646 # self.extra_args = ['--cluster-dir', cluster_dir]
664
647 # self.cluster_dir = unicode(cluster_dir)
665
648 # return super(WindowsHPCControllerLauncher, self).start(1)
666 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
649 #
667
650 #
668 job_file_name = Unicode(u'ipengineset_job.xml', config=True)
651 # class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
669 extra_args = List([], config=False)
652 #
670
653 # job_file_name = Unicode(u'ipengineset_job.xml', config=True)
671 def write_job_file(self, n):
654 # extra_args = List([], config=False)
672 job = IPEngineSetJob(config=self.config)
655 #
673
656 # def write_job_file(self, n):
674 for i in range(n):
657 # job = IPEngineSetJob(config=self.config)
675 t = IPEngineTask(config=self.config)
658 #
676 # The tasks work directory is *not* the actual work directory of
659 # for i in range(n):
677 # the engine. It is used as the base path for the stdout/stderr
660 # t = IPEngineTask(config=self.config)
678 # files that the scheduler redirects to.
661 # # The tasks work directory is *not* the actual work directory of
679 t.work_directory = self.cluster_dir
662 # # the engine. It is used as the base path for the stdout/stderr
680 # Add the --cluster-dir and from self.start().
663 # # files that the scheduler redirects to.
681 t.engine_args.extend(self.extra_args)
664 # t.work_directory = self.cluster_dir
682 job.add_task(t)
665 # # Add the --cluster-dir and from self.start().
683
666 # t.engine_args.extend(self.extra_args)
684 self.log.info("Writing job description file: %s" % self.job_file)
667 # job.add_task(t)
685 job.write(self.job_file)
668 #
686
669 # self.log.info("Writing job description file: %s" % self.job_file)
687 @property
670 # job.write(self.job_file)
688 def job_file(self):
671 #
689 return os.path.join(self.cluster_dir, self.job_file_name)
672 # @property
690
673 # def job_file(self):
691 def start(self, n, cluster_dir):
674 # return os.path.join(self.cluster_dir, self.job_file_name)
692 """Start the controller by cluster_dir."""
675 #
693 self.extra_args = ['--cluster-dir', cluster_dir]
676 # def start(self, n, cluster_dir):
694 self.cluster_dir = unicode(cluster_dir)
677 # """Start the controller by cluster_dir."""
695 return super(WindowsHPCEngineSetLauncher, self).start(n)
678 # self.extra_args = ['--cluster-dir', cluster_dir]
696
679 # self.cluster_dir = unicode(cluster_dir)
697
680 # return super(WindowsHPCEngineSetLauncher, self).start(n)
698 #-----------------------------------------------------------------------------
681 #
699 # Batch (PBS) system launchers
682 #
700 #-----------------------------------------------------------------------------
683 # #-----------------------------------------------------------------------------
701
684 # # Batch (PBS) system launchers
702 # TODO: Get PBS launcher working again.
685 # #-----------------------------------------------------------------------------
703
686 #
704 class BatchSystemLauncher(BaseLauncher):
687 # # TODO: Get PBS launcher working again.
705 """Launch an external process using a batch system.
688 #
706
689 # class BatchSystemLauncher(BaseLauncher):
707 This class is designed to work with UNIX batch systems like PBS, LSF,
690 # """Launch an external process using a batch system.
708 GridEngine, etc. The overall model is that there are different commands
691 #
709 like qsub, qdel, etc. that handle the starting and stopping of the process.
692 # This class is designed to work with UNIX batch systems like PBS, LSF,
710
693 # GridEngine, etc. The overall model is that there are different commands
711 This class also has the notion of a batch script. The ``batch_template``
694 # like qsub, qdel, etc. that handle the starting and stopping of the process.
712 attribute can be set to a string that is a template for the batch script.
695 #
713 This template is instantiated using Itpl. Thus the template can use
696 # This class also has the notion of a batch script. The ``batch_template``
714 ${n} fot the number of instances. Subclasses can add additional variables
697 # attribute can be set to a string that is a template for the batch script.
715 to the template dict.
698 # This template is instantiated using Itpl. Thus the template can use
716 """
699 # ${n} fot the number of instances. Subclasses can add additional variables
717
700 # to the template dict.
718 # Subclasses must fill these in. See PBSEngineSet
701 # """
719 # The name of the command line program used to submit jobs.
702 #
720 submit_command = Str('', config=True)
703 # # Subclasses must fill these in. See PBSEngineSet
721 # The name of the command line program used to delete jobs.
704 # # The name of the command line program used to submit jobs.
722 delete_command = Str('', config=True)
705 # submit_command = Str('', config=True)
723 # A regular expression used to get the job id from the output of the
706 # # The name of the command line program used to delete jobs.
724 # submit_command.
707 # delete_command = Str('', config=True)
725 job_id_regexp = Str('', config=True)
708 # # A regular expression used to get the job id from the output of the
726 # The string that is the batch script template itself.
709 # # submit_command.
727 batch_template = Str('', config=True)
710 # job_id_regexp = Str('', config=True)
728 # The filename of the instantiated batch script.
711 # # The string that is the batch script template itself.
729 batch_file_name = Unicode(u'batch_script', config=True)
712 # batch_template = Str('', config=True)
730 # The full path to the instantiated batch script.
713 # # The filename of the instantiated batch script.
731 batch_file = Unicode(u'')
714 # batch_file_name = Unicode(u'batch_script', config=True)
732
715 # # The full path to the instantiated batch script.
733 def __init__(self, work_dir=u'.', config=None):
716 # batch_file = Unicode(u'')
734 super(BatchSystemLauncher, self).__init__(
717 #
735 work_dir=work_dir, config=config
718 # def __init__(self, work_dir=u'.', config=None):
736 )
719 # super(BatchSystemLauncher, self).__init__(
737 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
720 # work_dir=work_dir, config=config
738 self.context = {}
721 # )
739
722 # self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
740 def parse_job_id(self, output):
723 # self.context = {}
741 """Take the output of the submit command and return the job id."""
724 #
742 m = re.match(self.job_id_regexp, output)
725 # def parse_job_id(self, output):
743 if m is not None:
726 # """Take the output of the submit command and return the job id."""
744 job_id = m.group()
727 # m = re.match(self.job_id_regexp, output)
745 else:
728 # if m is not None:
746 raise LauncherError("Job id couldn't be determined: %s" % output)
729 # job_id = m.group()
747 self.job_id = job_id
730 # else:
748 self.log.info('Job started with job id: %r' % job_id)
731 # raise LauncherError("Job id couldn't be determined: %s" % output)
749 return job_id
732 # self.job_id = job_id
750
733 # self.log.info('Job started with job id: %r' % job_id)
751 def write_batch_script(self, n):
734 # return job_id
752 """Instantiate and write the batch script to the work_dir."""
735 #
753 self.context['n'] = n
736 # def write_batch_script(self, n):
754 script_as_string = Itpl.itplns(self.batch_template, self.context)
737 # """Instantiate and write the batch script to the work_dir."""
755 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
738 # self.context['n'] = n
756 f = open(self.batch_file, 'w')
739 # script_as_string = Itpl.itplns(self.batch_template, self.context)
757 f.write(script_as_string)
740 # self.log.info('Writing instantiated batch script: %s' % self.batch_file)
758 f.close()
741 # f = open(self.batch_file, 'w')
759
742 # f.write(script_as_string)
760 def start(self, n):
743 # f.close()
761 """Start n copies of the process using a batch system."""
744 #
762 self.write_batch_script(n)
745 # @inlineCallbacks
763 output = check_output([self.submit_command, self.batch_file], env=os.environ, stdout=STDOUT)
746 # def start(self, n):
764 job_id = self.parse_job_id(output)
747 # """Start n copies of the process using a batch system."""
765 # self.notify_start(job_id)
748 # self.write_batch_script(n)
766 return job_id
749 # output = yield getProcessOutput(self.submit_command,
767
750 # [self.batch_file], env=os.environ)
768 def stop(self):
751 # job_id = self.parse_job_id(output)
769 output = Popen([self.delete_command, self.job_id], env=os.environ, stderr=STDOUT)
752 # self.notify_start(job_id)
770 self.notify_stop(output) # Pass the output of the kill cmd
753 # defer.returnValue(job_id)
771 return output
754 #
772
755 # @inlineCallbacks
773
756 # def stop(self):
774 class PBSLauncher(BatchSystemLauncher):
757 # output = yield getProcessOutput(self.delete_command,
775 """A BatchSystemLauncher subclass for PBS."""
758 # [self.job_id], env=os.environ
776
759 # )
777 submit_command = Str('qsub', config=True)
760 # self.notify_stop(output) # Pass the output of the kill cmd
778 delete_command = Str('qdel', config=True)
761 # defer.returnValue(output)
779 job_id_regexp = Str(r'\d+', config=True)
762 #
780 batch_template = Str('', config=True)
763 #
781 batch_file_name = Unicode(u'pbs_batch_script', config=True)
764 # class PBSLauncher(BatchSystemLauncher):
782 batch_file = Unicode(u'')
765 # """A BatchSystemLauncher subclass for PBS."""
783
766 #
784
767 # submit_command = Str('qsub', config=True)
785 class PBSControllerLauncher(PBSLauncher):
768 # delete_command = Str('qdel', config=True)
786 """Launch a controller using PBS."""
769 # job_id_regexp = Str(r'\d+', config=True)
787
770 # batch_template = Str('', config=True)
788 batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
771 # batch_file_name = Unicode(u'pbs_batch_script', config=True)
789
772 # batch_file = Unicode(u'')
790 def start(self, cluster_dir):
773 #
791 """Start the controller by profile or cluster_dir."""
774 #
792 # Here we save profile and cluster_dir in the context so they
775 # class PBSControllerLauncher(PBSLauncher):
793 # can be used in the batch script template as ${profile} and
776 # """Launch a controller using PBS."""
794 # ${cluster_dir}
777 #
795 self.context['cluster_dir'] = cluster_dir
778 # batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
796 self.cluster_dir = unicode(cluster_dir)
779 #
797 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
780 # def start(self, cluster_dir):
798 return super(PBSControllerLauncher, self).start(1)
781 # """Start the controller by profile or cluster_dir."""
799
782 # # Here we save profile and cluster_dir in the context so they
800
783 # # can be used in the batch script template as ${profile} and
801 class PBSEngineSetLauncher(PBSLauncher):
784 # # ${cluster_dir}
802
785 # self.context['cluster_dir'] = cluster_dir
803 batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
786 # self.cluster_dir = unicode(cluster_dir)
804
787 # self.log.info("Starting PBSControllerLauncher: %r" % self.args)
805 def start(self, n, cluster_dir):
788 # return super(PBSControllerLauncher, self).start(1)
806 """Start n engines by profile or cluster_dir."""
789 #
807 self.program_args.extend(['--cluster-dir', cluster_dir])
790 #
808 self.cluster_dir = unicode(cluster_dir)
791 # class PBSEngineSetLauncher(PBSLauncher):
809 self.log.info('Starting PBSEngineSetLauncher: %r' % self.args)
792 #
810 return super(PBSEngineSetLauncher, self).start(n)
793 # batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
794 #
795 # def start(self, n, cluster_dir):
796 # """Start n engines by profile or cluster_dir."""
797 # self.program_args.extend(['--cluster-dir', cluster_dir])
798 # self.cluster_dir = unicode(cluster_dir)
799 # self.log.info('Starting PBSEngineSetLauncher: %r' % self.args)
800 # return super(PBSEngineSetLauncher, self).start(n)
801
811
802
812
803 #-----------------------------------------------------------------------------
813 #-----------------------------------------------------------------------------
@@ -811,7 +821,7 b' class IPClusterLauncher(LocalProcessLauncher):'
811 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
821 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
812 # Command line arguments to pass to ipcluster.
822 # Command line arguments to pass to ipcluster.
813 ipcluster_args = List(
823 ipcluster_args = List(
814 ['--clean-logs', '--log-to-file', '--log-level', str(logging.ERROR)], config=True)
824 ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
815 ipcluster_subcommand = Str('start')
825 ipcluster_subcommand = Str('start')
816 ipcluster_n = Int(2)
826 ipcluster_n = Int(2)
817
827
General Comments 0
You need to be logged in to leave comments. Login now