Show More
@@ -17,6 +17,7 b' The IPython cluster directory' | |||||
17 |
|
17 | |||
18 | from __future__ import with_statement |
|
18 | from __future__ import with_statement | |
19 |
|
19 | |||
|
20 | import re | |||
20 | import os |
|
21 | import os | |
21 | import shutil |
|
22 | import shutil | |
22 | import sys |
|
23 | import sys | |
@@ -469,7 +470,8 b' class ApplicationWithClusterDir(Application):' | |||||
469 | if self.master_config.Global.clean_logs: |
|
470 | if self.master_config.Global.clean_logs: | |
470 | log_dir = self.master_config.Global.log_dir |
|
471 | log_dir = self.master_config.Global.log_dir | |
471 | for f in os.listdir(log_dir): |
|
472 | for f in os.listdir(log_dir): | |
472 | if f.startswith(self.name + u'-') and f.endswith('.log'): |
|
473 | if re.match(r'%s-\d+\.(log|err|out)'%self.name,f): | |
|
474 | # if f.startswith(self.name + u'-') and f.endswith('.log'): | |||
473 | os.remove(os.path.join(log_dir, f)) |
|
475 | os.remove(os.path.join(log_dir, f)) | |
474 | # Start logging to the new log file |
|
476 | # Start logging to the new log file | |
475 | if self.master_config.Global.log_to_file: |
|
477 | if self.master_config.Global.log_to_file: |
@@ -15,6 +15,7 b' The ipcluster application.' | |||||
15 | # Imports |
|
15 | # Imports | |
16 | #----------------------------------------------------------------------------- |
|
16 | #----------------------------------------------------------------------------- | |
17 |
|
17 | |||
|
18 | import re | |||
18 | import logging |
|
19 | import logging | |
19 | import os |
|
20 | import os | |
20 | import signal |
|
21 | import signal | |
@@ -412,13 +413,9 b' class IPClusterApp(ApplicationWithClusterDir):' | |||||
412 | if self.master_config.Global.clean_logs: |
|
413 | if self.master_config.Global.clean_logs: | |
413 | log_dir = self.master_config.Global.log_dir |
|
414 | log_dir = self.master_config.Global.log_dir | |
414 | for f in os.listdir(log_dir): |
|
415 | for f in os.listdir(log_dir): | |
415 | if f.startswith('ipengine' + '-'): |
|
416 | if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f): | |
416 | if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'): |
|
417 | os.remove(os.path.join(log_dir, f)) | |
417 | os.remove(os.path.join(log_dir, f)) |
|
418 | # This will remove old log files for ipcluster itself | |
418 | if f.startswith('ipcontroller' + '-'): |
|
|||
419 | if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'): |
|
|||
420 | os.remove(os.path.join(log_dir, f)) |
|
|||
421 | # This will remote old log files for ipcluster itself |
|
|||
422 | super(IPClusterApp, self).start_logging() |
|
419 | super(IPClusterApp, self).start_logging() | |
423 |
|
420 | |||
424 | def start_app(self): |
|
421 | def start_app(self): |
This diff has been collapsed as it changes many lines, (584 lines changed) Show them Hide them | |||||
@@ -20,26 +20,44 b' import re' | |||||
20 | import sys |
|
20 | import sys | |
21 | import logging |
|
21 | import logging | |
22 |
|
22 | |||
23 | from signal import SIGINT |
|
23 | from signal import SIGINT, SIGTERM | |
24 | try: |
|
24 | try: | |
25 | from signal import SIGKILL |
|
25 | from signal import SIGKILL | |
26 | except ImportError: |
|
26 | except ImportError: | |
27 | SIGKILL=SIGTERM |
|
27 | SIGKILL=SIGTERM | |
28 |
|
28 | |||
29 | from subprocess import Popen, PIPE |
|
29 | from subprocess import Popen, PIPE, STDOUT | |
|
30 | try: | |||
|
31 | from subprocess import check_open | |||
|
32 | except ImportError: | |||
|
33 | # pre-2.7: | |||
|
34 | from StringIO import StringIO | |||
|
35 | ||||
|
36 | def check_open(*args, **kwargs): | |||
|
37 | sio = StringIO() | |||
|
38 | kwargs.update(dict(stdout=PIPE, stderr=STDOUT)) | |||
|
39 | p = Popen(*args, **kwargs) | |||
|
40 | out,err = p.communicate() | |||
|
41 | return out | |||
30 |
|
42 | |||
31 | from zmq.eventloop import ioloop |
|
43 | from zmq.eventloop import ioloop | |
32 |
|
44 | |||
|
45 | from IPython.external import Itpl | |||
33 | # from IPython.config.configurable import Configurable |
|
46 | # from IPython.config.configurable import Configurable | |
34 | from IPython.utils.traitlets import Str, Int, List, Unicode, Instance |
|
47 | from IPython.utils.traitlets import Str, Int, List, Unicode, Instance | |
35 | from IPython.utils.path import get_ipython_module_path |
|
48 | from IPython.utils.path import get_ipython_module_path | |
36 | from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError |
|
49 | from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError | |
37 |
|
50 | |||
38 | from factory import LoggingFactory |
|
51 | from factory import LoggingFactory | |
39 | # from IPython.kernel.winhpcjob import ( |
|
52 | ||
40 | # IPControllerTask, IPEngineTask, |
|
53 | # load winhpcjob from IPython.kernel | |
41 | # IPControllerJob, IPEngineSetJob |
|
54 | try: | |
42 | # ) |
|
55 | from IPython.kernel.winhpcjob import ( | |
|
56 | IPControllerTask, IPEngineTask, | |||
|
57 | IPControllerJob, IPEngineSetJob | |||
|
58 | ) | |||
|
59 | except ImportError: | |||
|
60 | pass | |||
43 |
|
61 | |||
44 |
|
62 | |||
45 | #----------------------------------------------------------------------------- |
|
63 | #----------------------------------------------------------------------------- | |
@@ -286,7 +304,7 b' class LocalControllerLauncher(LocalProcessLauncher):' | |||||
286 |
|
304 | |||
287 | controller_cmd = List(ipcontroller_cmd_argv, config=True) |
|
305 | controller_cmd = List(ipcontroller_cmd_argv, config=True) | |
288 | # Command line arguments to ipcontroller. |
|
306 | # Command line arguments to ipcontroller. | |
289 |
controller_args = List(['--log-to-file','--log-level', str(logging. |
|
307 | controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True) | |
290 |
|
308 | |||
291 | def find_args(self): |
|
309 | def find_args(self): | |
292 | return self.controller_cmd + self.controller_args |
|
310 | return self.controller_cmd + self.controller_args | |
@@ -305,7 +323,7 b' class LocalEngineLauncher(LocalProcessLauncher):' | |||||
305 | engine_cmd = List(ipengine_cmd_argv, config=True) |
|
323 | engine_cmd = List(ipengine_cmd_argv, config=True) | |
306 | # Command line arguments for ipengine. |
|
324 | # Command line arguments for ipengine. | |
307 | engine_args = List( |
|
325 | engine_args = List( | |
308 |
['--log-to-file','--log-level', str(logging. |
|
326 | ['--log-to-file','--log-level', str(logging.INFO)], config=True | |
309 | ) |
|
327 | ) | |
310 |
|
328 | |||
311 | def find_args(self): |
|
329 | def find_args(self): | |
@@ -323,7 +341,7 b' class LocalEngineSetLauncher(BaseLauncher):' | |||||
323 |
|
341 | |||
324 | # Command line arguments for ipengine. |
|
342 | # Command line arguments for ipengine. | |
325 | engine_args = List( |
|
343 | engine_args = List( | |
326 |
['--log-to-file','--log-level', str(logging. |
|
344 | ['--log-to-file','--log-level', str(logging.INFO)], config=True | |
327 | ) |
|
345 | ) | |
328 | # launcher class |
|
346 | # launcher class | |
329 | launcher_class = LocalEngineLauncher |
|
347 | launcher_class = LocalEngineLauncher | |
@@ -425,7 +443,7 b' class MPIExecControllerLauncher(MPIExecLauncher):' | |||||
425 |
|
443 | |||
426 | controller_cmd = List(ipcontroller_cmd_argv, config=True) |
|
444 | controller_cmd = List(ipcontroller_cmd_argv, config=True) | |
427 | # Command line arguments to ipcontroller. |
|
445 | # Command line arguments to ipcontroller. | |
428 |
controller_args = List(['--log-to-file','--log-level', str(logging. |
|
446 | controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True) | |
429 | n = Int(1, config=False) |
|
447 | n = Int(1, config=False) | |
430 |
|
448 | |||
431 | def start(self, cluster_dir): |
|
449 | def start(self, cluster_dir): | |
@@ -445,7 +463,7 b' class MPIExecEngineSetLauncher(MPIExecLauncher):' | |||||
445 | engine_cmd = List(ipengine_cmd_argv, config=True) |
|
463 | engine_cmd = List(ipengine_cmd_argv, config=True) | |
446 | # Command line arguments for ipengine. |
|
464 | # Command line arguments for ipengine. | |
447 | engine_args = List( |
|
465 | engine_args = List( | |
448 |
['--log-to-file','--log-level', str(logging. |
|
466 | ['--log-to-file','--log-level', str(logging.INFO)], config=True | |
449 | ) |
|
467 | ) | |
450 | n = Int(1, config=True) |
|
468 | n = Int(1, config=True) | |
451 |
|
469 | |||
@@ -506,14 +524,14 b' class SSHControllerLauncher(SSHLauncher):' | |||||
506 |
|
524 | |||
507 | program = List(ipcontroller_cmd_argv, config=True) |
|
525 | program = List(ipcontroller_cmd_argv, config=True) | |
508 | # Command line arguments to ipcontroller. |
|
526 | # Command line arguments to ipcontroller. | |
509 |
program_args = List(['--log-to-file','--log-level', str(logging. |
|
527 | program_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True) | |
510 |
|
528 | |||
511 |
|
529 | |||
512 | class SSHEngineLauncher(SSHLauncher): |
|
530 | class SSHEngineLauncher(SSHLauncher): | |
513 | program = List(ipengine_cmd_argv, config=True) |
|
531 | program = List(ipengine_cmd_argv, config=True) | |
514 | # Command line arguments for ipengine. |
|
532 | # Command line arguments for ipengine. | |
515 | program_args = List( |
|
533 | program_args = List( | |
516 |
['--log-to-file','--log-level', str(logging. |
|
534 | ['--log-to-file','--log-level', str(logging.INFO)], config=True | |
517 | ) |
|
535 | ) | |
518 |
|
536 | |||
519 | class SSHEngineSetLauncher(LocalEngineSetLauncher): |
|
537 | class SSHEngineSetLauncher(LocalEngineSetLauncher): | |
@@ -525,279 +543,271 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):' | |||||
525 | #----------------------------------------------------------------------------- |
|
543 | #----------------------------------------------------------------------------- | |
526 |
|
544 | |||
527 |
|
545 | |||
528 |
|
|
546 | # This is only used on Windows. | |
529 |
|
|
547 | def find_job_cmd(): | |
530 |
|
|
548 | if os.name=='nt': | |
531 |
|
|
549 | try: | |
532 |
|
|
550 | return find_cmd('job') | |
533 |
|
|
551 | except FindCmdError: | |
534 |
|
|
552 | return 'job' | |
535 |
|
|
553 | else: | |
536 |
|
|
554 | return 'job' | |
537 | # |
|
555 | ||
538 | # |
|
556 | ||
539 |
|
|
557 | class WindowsHPCLauncher(BaseLauncher): | |
540 | # |
|
558 | ||
541 |
|
|
559 | # A regular expression used to get the job id from the output of the | |
542 |
|
|
560 | # submit_command. | |
543 |
|
|
561 | job_id_regexp = Str(r'\d+', config=True) | |
544 |
|
|
562 | # The filename of the instantiated job script. | |
545 |
|
|
563 | job_file_name = Unicode(u'ipython_job.xml', config=True) | |
546 |
|
|
564 | # The full path to the instantiated job script. This gets made dynamically | |
547 |
|
|
565 | # by combining the work_dir with the job_file_name. | |
548 |
|
|
566 | job_file = Unicode(u'') | |
549 |
|
|
567 | # The hostname of the scheduler to submit the job to | |
550 |
|
|
568 | scheduler = Str('', config=True) | |
551 |
|
|
569 | job_cmd = Str(find_job_cmd(), config=True) | |
552 | # |
|
570 | ||
553 |
|
|
571 | def __init__(self, work_dir=u'.', config=None): | |
554 |
|
|
572 | super(WindowsHPCLauncher, self).__init__( | |
555 |
|
|
573 | work_dir=work_dir, config=config | |
556 |
|
|
574 | ) | |
557 | # |
|
575 | ||
558 |
|
|
576 | @property | |
559 |
|
|
577 | def job_file(self): | |
560 |
|
|
578 | return os.path.join(self.work_dir, self.job_file_name) | |
561 | # |
|
579 | ||
562 |
|
|
580 | def write_job_file(self, n): | |
563 |
|
|
581 | raise NotImplementedError("Implement write_job_file in a subclass.") | |
564 | # |
|
582 | ||
565 |
|
|
583 | def find_args(self): | |
566 |
|
|
584 | return ['job.exe'] | |
567 |
|
|
585 | ||
568 |
|
|
586 | def parse_job_id(self, output): | |
569 |
|
|
587 | """Take the output of the submit command and return the job id.""" | |
570 |
|
|
588 | m = re.search(self.job_id_regexp, output) | |
571 |
|
|
589 | if m is not None: | |
572 |
|
|
590 | job_id = m.group() | |
573 |
|
|
591 | else: | |
574 |
|
|
592 | raise LauncherError("Job id couldn't be determined: %s" % output) | |
575 |
|
|
593 | self.job_id = job_id | |
576 |
|
|
594 | self.log.info('Job started with job id: %r' % job_id) | |
577 |
|
|
595 | return job_id | |
578 | # |
|
596 | ||
579 | # @inlineCallbacks |
|
597 | def start(self, n): | |
580 | # def start(self, n): |
|
598 | """Start n copies of the process using the Win HPC job scheduler.""" | |
581 | # """Start n copies of the process using the Win HPC job scheduler.""" |
|
599 | self.write_job_file(n) | |
582 | # self.write_job_file(n) |
|
600 | args = [ | |
583 | # args = [ |
|
601 | 'submit', | |
584 | # 'submit', |
|
602 | '/jobfile:%s' % self.job_file, | |
585 |
|
|
603 | '/scheduler:%s' % self.scheduler | |
586 | # '/scheduler:%s' % self.scheduler |
|
604 | ] | |
587 | # ] |
|
605 | self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),)) | |
588 | # self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),)) |
|
606 | # Twisted will raise DeprecationWarnings if we try to pass unicode to this | |
589 | # # Twisted will raise DeprecationWarnings if we try to pass unicode to this |
|
607 | output = check_output([self.job_cmd]+args, | |
590 | # output = yield getProcessOutput(str(self.job_cmd), |
|
608 | env=os.environ, | |
591 | # [str(a) for a in args], |
|
609 | cwd=self.work_dir, | |
592 | # env=dict((str(k),str(v)) for k,v in os.environ.items()), |
|
610 | stderr=STDOUT | |
593 | # path=self.work_dir |
|
611 | ) | |
594 | # ) |
|
612 | job_id = self.parse_job_id(output) | |
595 | # job_id = self.parse_job_id(output) |
|
613 | # self.notify_start(job_id) | |
596 | # self.notify_start(job_id) |
|
614 | return job_id | |
597 | # defer.returnValue(job_id) |
|
615 | ||
598 | # |
|
616 | def stop(self): | |
599 | # @inlineCallbacks |
|
617 | args = [ | |
600 | # def stop(self): |
|
618 | 'cancel', | |
601 | # args = [ |
|
619 | self.job_id, | |
602 | # 'cancel', |
|
620 | '/scheduler:%s' % self.scheduler | |
603 | # self.job_id, |
|
621 | ] | |
604 | # '/scheduler:%s' % self.scheduler |
|
622 | self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),)) | |
605 |
|
|
623 | try: | |
606 | # self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),)) |
|
624 | output = check_output([self.job_cmd]+args, | |
607 | # try: |
|
625 | env=os.environ, | |
608 | # # Twisted will raise DeprecationWarnings if we try to pass unicode to this |
|
626 | cwd=self.work_dir, | |
609 | # output = yield getProcessOutput(str(self.job_cmd), |
|
627 | stderr=STDOUT | |
610 | # [str(a) for a in args], |
|
628 | ) | |
611 | # env=dict((str(k),str(v)) for k,v in os.environ.iteritems()), |
|
629 | except: | |
612 | # path=self.work_dir |
|
630 | output = 'The job already appears to be stoppped: %r' % self.job_id | |
613 | # ) |
|
631 | self.notify_stop(output) # Pass the output of the kill cmd | |
614 | # except: |
|
632 | return output | |
615 | # output = 'The job already appears to be stoppped: %r' % self.job_id |
|
633 | ||
616 | # self.notify_stop(output) # Pass the output of the kill cmd |
|
634 | ||
617 | # defer.returnValue(output) |
|
635 | class WindowsHPCControllerLauncher(WindowsHPCLauncher): | |
618 | # |
|
636 | ||
619 | # |
|
637 | job_file_name = Unicode(u'ipcontroller_job.xml', config=True) | |
620 | # class WindowsHPCControllerLauncher(WindowsHPCLauncher): |
|
638 | extra_args = List([], config=False) | |
621 | # |
|
639 | ||
622 | # job_file_name = Unicode(u'ipcontroller_job.xml', config=True) |
|
640 | def write_job_file(self, n): | |
623 | # extra_args = List([], config=False) |
|
641 | job = IPControllerJob(config=self.config) | |
624 | # |
|
642 | ||
625 | # def write_job_file(self, n): |
|
643 | t = IPControllerTask(config=self.config) | |
626 | # job = IPControllerJob(config=self.config) |
|
644 | # The tasks work directory is *not* the actual work directory of | |
627 | # |
|
645 | # the controller. It is used as the base path for the stdout/stderr | |
628 | # t = IPControllerTask(config=self.config) |
|
646 | # files that the scheduler redirects to. | |
629 | # # The tasks work directory is *not* the actual work directory of |
|
647 | t.work_directory = self.cluster_dir | |
630 | # # the controller. It is used as the base path for the stdout/stderr |
|
648 | # Add the --cluster-dir and from self.start(). | |
631 | # # files that the scheduler redirects to. |
|
649 | t.controller_args.extend(self.extra_args) | |
632 | # t.work_directory = self.cluster_dir |
|
650 | job.add_task(t) | |
633 | # # Add the --cluster-dir and from self.start(). |
|
651 | ||
634 | # t.controller_args.extend(self.extra_args) |
|
652 | self.log.info("Writing job description file: %s" % self.job_file) | |
635 | # job.add_task(t) |
|
653 | job.write(self.job_file) | |
636 | # |
|
654 | ||
637 | # self.log.info("Writing job description file: %s" % self.job_file) |
|
655 | @property | |
638 | # job.write(self.job_file) |
|
656 | def job_file(self): | |
639 | # |
|
657 | return os.path.join(self.cluster_dir, self.job_file_name) | |
640 | # @property |
|
658 | ||
641 | # def job_file(self): |
|
659 | def start(self, cluster_dir): | |
642 | # return os.path.join(self.cluster_dir, self.job_file_name) |
|
660 | """Start the controller by cluster_dir.""" | |
643 | # |
|
661 | self.extra_args = ['--cluster-dir', cluster_dir] | |
644 | # def start(self, cluster_dir): |
|
662 | self.cluster_dir = unicode(cluster_dir) | |
645 | # """Start the controller by cluster_dir.""" |
|
663 | return super(WindowsHPCControllerLauncher, self).start(1) | |
646 | # self.extra_args = ['--cluster-dir', cluster_dir] |
|
664 | ||
647 | # self.cluster_dir = unicode(cluster_dir) |
|
665 | ||
648 | # return super(WindowsHPCControllerLauncher, self).start(1) |
|
666 | class WindowsHPCEngineSetLauncher(WindowsHPCLauncher): | |
649 | # |
|
667 | ||
650 | # |
|
668 | job_file_name = Unicode(u'ipengineset_job.xml', config=True) | |
651 | # class WindowsHPCEngineSetLauncher(WindowsHPCLauncher): |
|
669 | extra_args = List([], config=False) | |
652 | # |
|
670 | ||
653 | # job_file_name = Unicode(u'ipengineset_job.xml', config=True) |
|
671 | def write_job_file(self, n): | |
654 | # extra_args = List([], config=False) |
|
672 | job = IPEngineSetJob(config=self.config) | |
655 | # |
|
673 | ||
656 | # def write_job_file(self, n): |
|
674 | for i in range(n): | |
657 |
|
|
675 | t = IPEngineTask(config=self.config) | |
658 | # |
|
676 | # The tasks work directory is *not* the actual work directory of | |
659 | # for i in range(n): |
|
677 | # the engine. It is used as the base path for the stdout/stderr | |
660 | # t = IPEngineTask(config=self.config) |
|
678 | # files that the scheduler redirects to. | |
661 | # # The tasks work directory is *not* the actual work directory of |
|
679 | t.work_directory = self.cluster_dir | |
662 | # # the engine. It is used as the base path for the stdout/stderr |
|
680 | # Add the --cluster-dir and from self.start(). | |
663 | # # files that the scheduler redirects to. |
|
681 | t.engine_args.extend(self.extra_args) | |
664 | # t.work_directory = self.cluster_dir |
|
682 | job.add_task(t) | |
665 | # # Add the --cluster-dir and from self.start(). |
|
683 | ||
666 | # t.engine_args.extend(self.extra_args) |
|
684 | self.log.info("Writing job description file: %s" % self.job_file) | |
667 | # job.add_task(t) |
|
685 | job.write(self.job_file) | |
668 | # |
|
686 | ||
669 | # self.log.info("Writing job description file: %s" % self.job_file) |
|
687 | @property | |
670 | # job.write(self.job_file) |
|
688 | def job_file(self): | |
671 | # |
|
689 | return os.path.join(self.cluster_dir, self.job_file_name) | |
672 | # @property |
|
690 | ||
673 | # def job_file(self): |
|
691 | def start(self, n, cluster_dir): | |
674 | # return os.path.join(self.cluster_dir, self.job_file_name) |
|
692 | """Start the controller by cluster_dir.""" | |
675 | # |
|
693 | self.extra_args = ['--cluster-dir', cluster_dir] | |
676 | # def start(self, n, cluster_dir): |
|
694 | self.cluster_dir = unicode(cluster_dir) | |
677 | # """Start the controller by cluster_dir.""" |
|
695 | return super(WindowsHPCEngineSetLauncher, self).start(n) | |
678 | # self.extra_args = ['--cluster-dir', cluster_dir] |
|
696 | ||
679 | # self.cluster_dir = unicode(cluster_dir) |
|
697 | ||
680 | # return super(WindowsHPCEngineSetLauncher, self).start(n) |
|
698 | #----------------------------------------------------------------------------- | |
681 | # |
|
699 | # Batch (PBS) system launchers | |
682 | # |
|
700 | #----------------------------------------------------------------------------- | |
683 | # #----------------------------------------------------------------------------- |
|
701 | ||
684 | # # Batch (PBS) system launchers |
|
702 | # TODO: Get PBS launcher working again. | |
685 | # #----------------------------------------------------------------------------- |
|
703 | ||
686 | # |
|
704 | class BatchSystemLauncher(BaseLauncher): | |
687 | # # TODO: Get PBS launcher working again. |
|
705 | """Launch an external process using a batch system. | |
688 | # |
|
706 | ||
689 | # class BatchSystemLauncher(BaseLauncher): |
|
707 | This class is designed to work with UNIX batch systems like PBS, LSF, | |
690 | # """Launch an external process using a batch system. |
|
708 | GridEngine, etc. The overall model is that there are different commands | |
691 | # |
|
709 | like qsub, qdel, etc. that handle the starting and stopping of the process. | |
692 | # This class is designed to work with UNIX batch systems like PBS, LSF, |
|
710 | ||
693 | # GridEngine, etc. The overall model is that there are different commands |
|
711 | This class also has the notion of a batch script. The ``batch_template`` | |
694 | # like qsub, qdel, etc. that handle the starting and stopping of the process. |
|
712 | attribute can be set to a string that is a template for the batch script. | |
695 | # |
|
713 | This template is instantiated using Itpl. Thus the template can use | |
696 | # This class also has the notion of a batch script. The ``batch_template`` |
|
714 | ${n} fot the number of instances. Subclasses can add additional variables | |
697 | # attribute can be set to a string that is a template for the batch script. |
|
715 | to the template dict. | |
698 | # This template is instantiated using Itpl. Thus the template can use |
|
716 | """ | |
699 | # ${n} fot the number of instances. Subclasses can add additional variables |
|
717 | ||
700 | # to the template dict. |
|
718 | # Subclasses must fill these in. See PBSEngineSet | |
701 | # """ |
|
719 | # The name of the command line program used to submit jobs. | |
702 | # |
|
720 | submit_command = Str('', config=True) | |
703 | # # Subclasses must fill these in. See PBSEngineSet |
|
721 | # The name of the command line program used to delete jobs. | |
704 | # # The name of the command line program used to submit jobs. |
|
722 | delete_command = Str('', config=True) | |
705 | # submit_command = Str('', config=True) |
|
723 | # A regular expression used to get the job id from the output of the | |
706 | # # The name of the command line program used to delete jobs. |
|
724 | # submit_command. | |
707 |
|
|
725 | job_id_regexp = Str('', config=True) | |
708 | # # A regular expression used to get the job id from the output of the |
|
726 | # The string that is the batch script template itself. | |
709 | # # submit_command. |
|
727 | batch_template = Str('', config=True) | |
710 | # job_id_regexp = Str('', config=True) |
|
728 | # The filename of the instantiated batch script. | |
711 | # # The string that is the batch script template itself. |
|
729 | batch_file_name = Unicode(u'batch_script', config=True) | |
712 | # batch_template = Str('', config=True) |
|
730 | # The full path to the instantiated batch script. | |
713 | # # The filename of the instantiated batch script. |
|
731 | batch_file = Unicode(u'') | |
714 | # batch_file_name = Unicode(u'batch_script', config=True) |
|
732 | ||
715 | # # The full path to the instantiated batch script. |
|
733 | def __init__(self, work_dir=u'.', config=None): | |
716 | # batch_file = Unicode(u'') |
|
734 | super(BatchSystemLauncher, self).__init__( | |
717 | # |
|
735 | work_dir=work_dir, config=config | |
718 | # def __init__(self, work_dir=u'.', config=None): |
|
736 | ) | |
719 | # super(BatchSystemLauncher, self).__init__( |
|
737 | self.batch_file = os.path.join(self.work_dir, self.batch_file_name) | |
720 | # work_dir=work_dir, config=config |
|
738 | self.context = {} | |
721 | # ) |
|
739 | ||
722 | # self.batch_file = os.path.join(self.work_dir, self.batch_file_name) |
|
740 | def parse_job_id(self, output): | |
723 | # self.context = {} |
|
741 | """Take the output of the submit command and return the job id.""" | |
724 | # |
|
742 | m = re.match(self.job_id_regexp, output) | |
725 | # def parse_job_id(self, output): |
|
743 | if m is not None: | |
726 | # """Take the output of the submit command and return the job id.""" |
|
744 | job_id = m.group() | |
727 | # m = re.match(self.job_id_regexp, output) |
|
745 | else: | |
728 | # if m is not None: |
|
746 | raise LauncherError("Job id couldn't be determined: %s" % output) | |
729 |
|
|
747 | self.job_id = job_id | |
730 | # else: |
|
748 | self.log.info('Job started with job id: %r' % job_id) | |
731 | # raise LauncherError("Job id couldn't be determined: %s" % output) |
|
749 | return job_id | |
732 | # self.job_id = job_id |
|
750 | ||
733 | # self.log.info('Job started with job id: %r' % job_id) |
|
751 | def write_batch_script(self, n): | |
734 | # return job_id |
|
752 | """Instantiate and write the batch script to the work_dir.""" | |
735 | # |
|
753 | self.context['n'] = n | |
736 | # def write_batch_script(self, n): |
|
754 | script_as_string = Itpl.itplns(self.batch_template, self.context) | |
737 | # """Instantiate and write the batch script to the work_dir.""" |
|
755 | self.log.info('Writing instantiated batch script: %s' % self.batch_file) | |
738 | # self.context['n'] = n |
|
756 | f = open(self.batch_file, 'w') | |
739 | # script_as_string = Itpl.itplns(self.batch_template, self.context) |
|
757 | f.write(script_as_string) | |
740 | # self.log.info('Writing instantiated batch script: %s' % self.batch_file) |
|
758 | f.close() | |
741 | # f = open(self.batch_file, 'w') |
|
759 | ||
742 | # f.write(script_as_string) |
|
760 | def start(self, n): | |
743 | # f.close() |
|
761 | """Start n copies of the process using a batch system.""" | |
744 | # |
|
762 | self.write_batch_script(n) | |
745 | # @inlineCallbacks |
|
763 | output = check_output([self.submit_command, self.batch_file], env=os.environ, stdout=STDOUT) | |
746 | # def start(self, n): |
|
764 | job_id = self.parse_job_id(output) | |
747 | # """Start n copies of the process using a batch system.""" |
|
765 | # self.notify_start(job_id) | |
748 | # self.write_batch_script(n) |
|
766 | return job_id | |
749 | # output = yield getProcessOutput(self.submit_command, |
|
767 | ||
750 | # [self.batch_file], env=os.environ) |
|
768 | def stop(self): | |
751 | # job_id = self.parse_job_id(output) |
|
769 | output = Popen([self.delete_command, self.job_id], env=os.environ, stderr=STDOUT) | |
752 | # self.notify_start(job_id) |
|
770 | self.notify_stop(output) # Pass the output of the kill cmd | |
753 | # defer.returnValue(job_id) |
|
771 | return output | |
754 | # |
|
772 | ||
755 | # @inlineCallbacks |
|
773 | ||
756 | # def stop(self): |
|
774 | class PBSLauncher(BatchSystemLauncher): | |
757 | # output = yield getProcessOutput(self.delete_command, |
|
775 | """A BatchSystemLauncher subclass for PBS.""" | |
758 | # [self.job_id], env=os.environ |
|
776 | ||
759 | # ) |
|
777 | submit_command = Str('qsub', config=True) | |
760 | # self.notify_stop(output) # Pass the output of the kill cmd |
|
778 | delete_command = Str('qdel', config=True) | |
761 | # defer.returnValue(output) |
|
779 | job_id_regexp = Str(r'\d+', config=True) | |
762 | # |
|
780 | batch_template = Str('', config=True) | |
763 | # |
|
781 | batch_file_name = Unicode(u'pbs_batch_script', config=True) | |
764 | # class PBSLauncher(BatchSystemLauncher): |
|
782 | batch_file = Unicode(u'') | |
765 | # """A BatchSystemLauncher subclass for PBS.""" |
|
783 | ||
766 | # |
|
784 | ||
767 | # submit_command = Str('qsub', config=True) |
|
785 | class PBSControllerLauncher(PBSLauncher): | |
768 | # delete_command = Str('qdel', config=True) |
|
786 | """Launch a controller using PBS.""" | |
769 | # job_id_regexp = Str(r'\d+', config=True) |
|
787 | ||
770 | # batch_template = Str('', config=True) |
|
788 | batch_file_name = Unicode(u'pbs_batch_script_controller', config=True) | |
771 | # batch_file_name = Unicode(u'pbs_batch_script', config=True) |
|
789 | ||
772 | # batch_file = Unicode(u'') |
|
790 | def start(self, cluster_dir): | |
773 | # |
|
791 | """Start the controller by profile or cluster_dir.""" | |
774 | # |
|
792 | # Here we save profile and cluster_dir in the context so they | |
775 | # class PBSControllerLauncher(PBSLauncher): |
|
793 | # can be used in the batch script template as ${profile} and | |
776 | # """Launch a controller using PBS.""" |
|
794 | # ${cluster_dir} | |
777 | # |
|
795 | self.context['cluster_dir'] = cluster_dir | |
778 | # batch_file_name = Unicode(u'pbs_batch_script_controller', config=True) |
|
796 | self.cluster_dir = unicode(cluster_dir) | |
779 | # |
|
797 | self.log.info("Starting PBSControllerLauncher: %r" % self.args) | |
780 | # def start(self, cluster_dir): |
|
798 | return super(PBSControllerLauncher, self).start(1) | |
781 | # """Start the controller by profile or cluster_dir.""" |
|
799 | ||
782 | # # Here we save profile and cluster_dir in the context so they |
|
800 | ||
783 | # # can be used in the batch script template as ${profile} and |
|
801 | class PBSEngineSetLauncher(PBSLauncher): | |
784 | # # ${cluster_dir} |
|
802 | ||
785 | # self.context['cluster_dir'] = cluster_dir |
|
803 | batch_file_name = Unicode(u'pbs_batch_script_engines', config=True) | |
786 | # self.cluster_dir = unicode(cluster_dir) |
|
804 | ||
787 | # self.log.info("Starting PBSControllerLauncher: %r" % self.args) |
|
805 | def start(self, n, cluster_dir): | |
788 | # return super(PBSControllerLauncher, self).start(1) |
|
806 | """Start n engines by profile or cluster_dir.""" | |
789 | # |
|
807 | self.program_args.extend(['--cluster-dir', cluster_dir]) | |
790 | # |
|
808 | self.cluster_dir = unicode(cluster_dir) | |
791 | # class PBSEngineSetLauncher(PBSLauncher): |
|
809 | self.log.info('Starting PBSEngineSetLauncher: %r' % self.args) | |
792 | # |
|
810 | return super(PBSEngineSetLauncher, self).start(n) | |
793 | # batch_file_name = Unicode(u'pbs_batch_script_engines', config=True) |
|
|||
794 | # |
|
|||
795 | # def start(self, n, cluster_dir): |
|
|||
796 | # """Start n engines by profile or cluster_dir.""" |
|
|||
797 | # self.program_args.extend(['--cluster-dir', cluster_dir]) |
|
|||
798 | # self.cluster_dir = unicode(cluster_dir) |
|
|||
799 | # self.log.info('Starting PBSEngineSetLauncher: %r' % self.args) |
|
|||
800 | # return super(PBSEngineSetLauncher, self).start(n) |
|
|||
801 |
|
811 | |||
802 |
|
812 | |||
803 | #----------------------------------------------------------------------------- |
|
813 | #----------------------------------------------------------------------------- | |
@@ -811,7 +821,7 b' class IPClusterLauncher(LocalProcessLauncher):' | |||||
811 | ipcluster_cmd = List(ipcluster_cmd_argv, config=True) |
|
821 | ipcluster_cmd = List(ipcluster_cmd_argv, config=True) | |
812 | # Command line arguments to pass to ipcluster. |
|
822 | # Command line arguments to pass to ipcluster. | |
813 | ipcluster_args = List( |
|
823 | ipcluster_args = List( | |
814 |
['--clean-logs', '--log-to-file', '--log-level', str(logging. |
|
824 | ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True) | |
815 | ipcluster_subcommand = Str('start') |
|
825 | ipcluster_subcommand = Str('start') | |
816 | ipcluster_n = Int(2) |
|
826 | ipcluster_n = Int(2) | |
817 |
|
827 |
General Comments 0
You need to be logged in to leave comments.
Login now