##// END OF EJS Templates
More work on the launchers and Win HPC support.
Brian Granger -
Show More
@@ -1,198 +1,202 b''
1 import os
1 import os
2
2
3 c = get_config()
3 c = get_config()
4
4
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Select which launchers to use
6 # Select which launchers to use
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 # This allows you to control what method is used to start the controller
9 # This allows you to control what method is used to start the controller
10 # and engines. The following methods are currently supported:
10 # and engines. The following methods are currently supported:
11 # * Start as a regular process on localhost.
11 # - Start as a regular process on localhost.
12 # * Start using mpiexec.
12 # - Start using mpiexec.
13 # * Start using PBS
13 # - Start using the Windows HPC Server 2008 scheduler
14 # * Start using SSH (currently broken)
14 # - Start using PBS
15 # - Start using SSH (currently broken)
16
15
17
16 # The selected launchers can be configured below.
18 # The selected launchers can be configured below.
17
19
18 # Options are (LocalControllerLauncher, MPIExecControllerLauncher,
20 # Options are:
19 # PBSControllerLauncher, WindowsHPCControllerLauncher)
21 # - LocalControllerLauncher
22 # - MPIExecControllerLauncher
23 # - PBSControllerLauncher
24 # - WindowsHPCControllerLauncher
20 # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
25 # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
21
26
22 # Options are (LocalEngineSetLauncher, MPIExecEngineSetLauncher,
27 # Options are:
23 # PBSEngineSetLauncher)
28 # - LocalEngineSetLauncher
29 # - MPIExecEngineSetLauncher
30 # - PBSEngineSetLauncher
31 # - WindowsHPCEngineSetLauncher
24 # c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher'
32 # c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher'
25
33
26 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
27 # Global configuration
35 # Global configuration
28 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
29
37
30 # The default number of engine that will be started. This is overridden by
38 # The default number of engines that will be started. This is overridden by
31 # the -n command line option: "ipcluster start -n 4"
39 # the -n command line option: "ipcluster start -n 4"
32 # c.Global.n = 2
40 # c.Global.n = 2
33
41
34 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
42 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
35 # c.Global.log_to_file = False
43 # c.Global.log_to_file = False
36
44
37 # Remove old logs from cluster_dir/log before starting.
45 # Remove old logs from cluster_dir/log before starting.
38 # c.Global.clean_logs = True
46 # c.Global.clean_logs = True
39
47
40 # The working directory for the process. The application will use os.chdir
48 # The working directory for the process. The application will use os.chdir
41 # to change to this directory before starting.
49 # to change to this directory before starting.
42 # c.Global.working_dir = os.getcwd()
50 # c.Global.working_dir = os.getcwd()
43
51
52
44 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
45 # Controller launcher configuration
54 # Local process launchers
46 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
47
56
48 # Configure how the controller is started. The configuration of the controller
57 # The working directory for the controller
49 # can also bet setup by editing the controller config file:
58 # c.LocalControllerLauncher.working_dir = u''
50 # ipcontroller_config.py
51
59
52 # The command line arguments to call the controller with.
60 # The command line arguments to call the controller with.
53 # c.LocalControllerLauncher.controller_args = \
61 # c.LocalControllerLauncher.controller_args = \
54 # ['--log-to-file','--log-level', '40']
62 # ['--log-to-file','--log-level', '40']
55
63
64 # The working directory for the engines
65 # c.LocalEngineSetLauncher.working_dir = u''
66
67 # Command line argument passed to the engines.
68 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
69
70 #-----------------------------------------------------------------------------
71 # MPIExec launchers
72 #-----------------------------------------------------------------------------
73
74 # The working directory for the controller
75 # c.MPIExecControllerLauncher.working_dir = u''
76
56 # The mpiexec/mpirun command to use in starting the controller.
77 # The mpiexec/mpirun command to use in starting the controller.
57 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
78 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
58
79
59 # Additional arguments to pass to the actual mpiexec command.
80 # Additional arguments to pass to the actual mpiexec command.
60 # c.MPIExecControllerLauncher.mpi_args = []
81 # c.MPIExecControllerLauncher.mpi_args = []
61
82
62 # The command line argument to call the controller with.
83 # The command line argument to call the controller with.
63 # c.MPIExecControllerLauncher.controller_args = \
84 # c.MPIExecControllerLauncher.controller_args = \
64 # ['--log-to-file','--log-level', '40']
85 # ['--log-to-file','--log-level', '40']
65
86
87
88 # The working directory for the engines
89 # c.MPIExecEngineSetLauncher.working_dir = u''
90
91 # The mpiexec/mpirun command to use in starting the engines.
92 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
93
94 # Additional arguments to pass to the actual mpiexec command.
95 # c.MPIExecEngineSetLauncher.mpi_args = []
96
97 # Command line argument passed to the engines.
98 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
99
100 # The default number of engines to start if not given elsewhere.
101 # c.MPIExecEngineSetLauncher.n = 1
102
103 #-----------------------------------------------------------------------------
104 # SSH launchers
105 #-----------------------------------------------------------------------------
106
107 # Todo
108
109
110 #-----------------------------------------------------------------------------
111 # Unix batch (PBS) schedulers launchers
112 #-----------------------------------------------------------------------------
113
114 # The working directory for the controller
115 # c.PBSControllerLauncher.working_dir = u''
116
66 # The command line program to use to submit a PBS job.
117 # The command line program to use to submit a PBS job.
67 # c.PBSControllerLauncher.submit_command = 'qsub'
118 # c.PBSControllerLauncher.submit_command = 'qsub'
68
119
69 # The command line program to use to delete a PBS job.
120 # The command line program to use to delete a PBS job.
70 # c.PBSControllerLauncher.delete_command = 'qdel'
121 # c.PBSControllerLauncher.delete_command = 'qdel'
71
122
72 # A regular expression that takes the output of qsub and finds the job id.
123 # A regular expression that takes the output of qsub and finds the job id.
73 # c.PBSControllerLauncher.job_id_regexp = '\d+'
124 # c.PBSControllerLauncher.job_id_regexp = '\d+'
74
125
75 # The batch submission script used to start the controller. This is where
126 # The batch submission script used to start the controller. This is where
76 # environment variables would be setup, etc. This string is interpolated using
127 # environment variables would be setup, etc. This string is interpolated using
77 # the Itpl module in IPython.external. Basically, you can use ${profile} for
128 # the Itpl module in IPython.external. Basically, you can use ${profile} for
78 # the controller profile or ${cluster_dir} for the cluster_dir.
129 # the controller profile or ${cluster_dir} for the cluster_dir.
79 # c.PBSControllerLauncher.batch_template = """"""
130 # c.PBSControllerLauncher.batch_template = """"""
80
131
81 # The name of the instantiated batch script that will actually be used to
132 # The name of the instantiated batch script that will actually be used to
82 # submit the job. This will be written to the cluster directory.
133 # submit the job. This will be written to the cluster directory.
83 # c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller'
134 # c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller'
84
135
85 #-----------------------------------------------------------------------------
86 # Windows HPC Server 2008 launcher configuration
87 #-----------------------------------------------------------------------------
88
89 # c.WinHPCJob.username = 'DOMAIN\\user'
90 # c.WinHPCJob.priority = 'Highest'
91 # c.WinHPCJob.requested_nodes = ''
92 # c.WinHPCJob.project = ''
93 # c.WinHPCJob.is_exclusive = False
94
95 # c.WinHPCTask.environment_variables = {}
96 # c.WinHPCTask.work_directory = ''
97 # c.WinHPCTask.is_rerunnable = True
98
99 # c.IPControllerTask.task_name = 'IPController'
100 # c.IPControllerTask.controller_cmd = [u'ipcontroller.exe']
101 # c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40']
102 # c.IPControllerTask.environment_variables = {}
103
104 # c.IPEngineTask.task_name = 'IPController'
105 # c.IPEngineTask.engine_cmd = [u'ipengine.exe']
106 # c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40']
107 # c.IPEngineTask.environment_variables = {}
108
109 # c.WindowsHPCLauncher.scheduler = 'HEADNODE'
110 # c.WindowsHPCLauncher.username = '\\DOMAIN\USERNAME'
111 # c.WindowsHPCLauncher.priority = 'Highest'
112 # c.WindowsHPCLauncher.requested_nodes = ''
113 # c.WindowsHPCLauncher.job_file_name = u'ipython_job.xml'
114 # c.WindowsHPCLauncher.project = 'MyProject'
115
116 # c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
117 # c.WindowsHPCControllerLauncher.username = '\\DOMAIN\USERNAME'
118 # c.WindowsHPCControllerLauncher.priority = 'Highest'
119 # c.WindowsHPCControllerLauncher.requested_nodes = ''
120 # c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml'
121 # c.WindowsHPCControllerLauncher.project = 'MyProject'
122
123
124 #-----------------------------------------------------------------------------
125 # Engine launcher configuration
126 #-----------------------------------------------------------------------------
127
128 # Command line argument passed to the engines.
129 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
130
131 # The mpiexec/mpirun command to use in started the controller.
132 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
133
134 # Additional arguments to pass to the actual mpiexec command.
135 # c.MPIExecEngineSetLauncher.mpi_args = []
136
137 # Command line argument passed to the engines.
138 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
139
136
140 # The default number of engines to start if not given elsewhere.
137 # The working directory for the engines
141 # c.MPIExecEngineSetLauncher.n = 1
138 # c.PBSEngineSetLauncher.working_dir = u''
142
139
143 # The command line program to use to submit a PBS job.
140 # The command line program to use to submit a PBS job.
144 # c.PBSEngineSetLauncher.submit_command = 'qsub'
141 # c.PBSEngineSetLauncher.submit_command = 'qsub'
145
142
146 # The command line program to use to delete a PBS job.
143 # The command line program to use to delete a PBS job.
147 # c.PBSEngineSetLauncher.delete_command = 'qdel'
144 # c.PBSEngineSetLauncher.delete_command = 'qdel'
148
145
149 # A regular expression that takes the output of qsub and finds the job id.
146 # A regular expression that takes the output of qsub and finds the job id.
150 # c.PBSEngineSetLauncher.job_id_regexp = '\d+'
147 # c.PBSEngineSetLauncher.job_id_regexp = '\d+'
151
148
152 # The batch submission script used to start the engines. This is where
149 # The batch submission script used to start the engines. This is where
153 # environment variables would be setup, etc. This string is interpolated using
150 # environment variables would be setup, etc. This string is interpolated using
154 # the Itpl module in IPython.external. Basically, you can use ${n} for the
151 # the Itpl module in IPython.external. Basically, you can use ${n} for the
155 # number of engines, ${profile} for the engine profile and ${cluster_dir}
152 # number of engines, ${profile} for the engine profile and ${cluster_dir}
156 # for the cluster_dir.
153 # for the cluster_dir.
157 # c.PBSEngineSetLauncher.batch_template = """"""
154 # c.PBSEngineSetLauncher.batch_template = """"""
158
155
159 # The name of the instantiated batch script that will actually be used to
156 # The name of the instantiated batch script that will actually be used to
160 # submit the job. This will be written to the cluster directory.
157 # submit the job. This will be written to the cluster directory.
161 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script_engines'
158 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script_engines'
162
159
163 #-----------------------------------------------------------------------------
160 #-----------------------------------------------------------------------------
164 # Base launcher configuration
161 # Windows HPC Server 2008 launcher configuration
165 #-----------------------------------------------------------------------------
162 #-----------------------------------------------------------------------------
166
163
167 # The various launchers are organized into an inheritance hierarchy.
164 # c.IPControllerJob.job_name = 'IPController'
168 # The configurations can also be iherited and the following attributes
165 # c.IPControllerJob.is_exclusive = False
169 # allow you to configure the base classes.
166 # c.IPControllerJob.username = 'USERDOMAIN\\USERNAME'
170
167 # c.IPControllerJob.priority = 'Highest'
171 # c.MPIExecLauncher.mpi_cmd = ['mpiexec']
168 # c.IPControllerJob.requested_nodes = ''
172 # c.MPIExecLauncher.mpi_args = []
169 # c.IPControllerJob.project = 'MyProject'
173 # c.MPIExecLauncher.program = []
170
174 # c.MPIExecLauncher.program_args = []
171 # c.IPControllerTask.task_name = 'IPController'
175 # c.MPIExecLauncher.n = 1
172 # c.IPControllerTask.controller_cmd = [u'ipcontroller.exe']
176
173 # c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40']
177 # c.SSHLauncher.ssh_cmd = ['ssh']
174 # c.IPControllerTask.environment_variables = {}
178 # c.SSHLauncher.ssh_args = []
175
179 # c.SSHLauncher.program = []
176 # c.WindowsHPCControllerLauncher.working_dir = u''
180 # s.SSHLauncher.program_args = []
177 # c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
181 # c.SSHLauncher.hostname = ''
178 # c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml'
182 # c.SSHLauncher.user = os.environ['USER']
179
183
180
184 # c.BatchSystemLauncher.submit_command
181 # c.IPEngineSetJob.job_name = 'IPEngineSet'
185 # c.BatchSystemLauncher.delete_command
182 # c.IPEngineSetJob.is_exclusive = False
186 # c.BatchSystemLauncher.job_id_regexp
183 # c.IPEngineSetJob.username = 'USERDOMAIN\\USERNAME'
187 # c.BatchSystemLauncher.batch_template
184 # c.IPEngineSetJob.priority = 'Highest'
188 # c.BatchSystemLauncher.batch_file_name
185 # c.IPEngineSetJob.requested_nodes = ''
189
186 # c.IPEngineSetJob.project = 'MyProject'
190 # c.PBSLauncher.submit_command = 'qsub'
187
191 # c.PBSLauncher.delete_command = 'qdel'
188 # c.IPEngineTask.task_name = 'IPEngine'
192 # c.PBSLauncher.job_id_regexp = '\d+'
189 # c.IPEngineTask.engine_cmd = [u'ipengine.exe']
193 # c.PBSLauncher.batch_template = """"""
190 # c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40']
194 # c.PBSLauncher.batch_file_name = u'pbs_batch_script'
191 # c.IPEngineTask.environment_variables = {}
192
193 # c.WindowsHPCEngineSetLauncher.working_dir = u''
194 # c.WindowsHPCEngineSetLauncher.scheduler = 'HEADNODE'
195 # c.WindowsHPCEngineSetLauncher.job_file_name = u'ipengineset_job.xml'
196
197
198
195
199
196
200
197
201
198
202
This diff has been collapsed as it changes many lines, (558 lines changed) Show them Hide them
@@ -1,820 +1,866 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching processing asynchronously.
4 Facilities for launching IPython processes asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import re
19 import re
20 import sys
20 import sys
21
21
22 from IPython.core.component import Component
22 from IPython.core.component import Component
23 from IPython.external import Itpl
23 from IPython.external import Itpl
24 from IPython.utils.traitlets import Str, Int, List, Unicode, Enum
24 from IPython.utils.traitlets import Str, Int, List, Unicode, Enum
25 from IPython.utils.platutils import find_cmd
25 from IPython.utils.platutils import find_cmd
26 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
26 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
27 from IPython.kernel.winhpcjob import (
27 from IPython.kernel.winhpcjob import (
28 WinHPCJob, WinHPCTask,
28 WinHPCJob, WinHPCTask,
29 IPControllerTask, IPEngineTask
29 IPControllerTask, IPEngineTask,
30 IPControllerJob, IPEngineSetJob
30 )
31 )
31
32
32 from twisted.internet import reactor, defer
33 from twisted.internet import reactor, defer
33 from twisted.internet.defer import inlineCallbacks
34 from twisted.internet.defer import inlineCallbacks
34 from twisted.internet.protocol import ProcessProtocol
35 from twisted.internet.protocol import ProcessProtocol
35 from twisted.internet.utils import getProcessOutput
36 from twisted.internet.utils import getProcessOutput
36 from twisted.internet.error import ProcessDone, ProcessTerminated
37 from twisted.internet.error import ProcessDone, ProcessTerminated
37 from twisted.python import log
38 from twisted.python import log
38 from twisted.python.failure import Failure
39 from twisted.python.failure import Failure
39
40
40 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
41 # Generic launchers
42 # Utilities
43 #-----------------------------------------------------------------------------
44
45
def find_controller_cmd():
    """Find the command line ipcontroller program in a cross platform way.

    Returns
    -------
    cmd : list of str
        On win32, ``[sys.executable, '-u', script_location]`` where
        ``script_location`` is the ``.py`` source file of
        :mod:`IPython.kernel.ipcontrollerapp`. On every other platform,
        ``['ipcontroller']``, which therefore has to be on the PATH.
    """
    if sys.platform == 'win32':
        # This logic is needed because the ipcontroller script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel import ipcontrollerapp
        # __file__ may name compiled bytecode (.pyc, or .pyo under -O).
        # Map it back to the .py source by rewriting the extension only,
        # so a '.pyc' substring elsewhere in the path is left untouched.
        root, ext = os.path.splitext(ipcontrollerapp.__file__)
        if ext in ('.pyc', '.pyo'):
            ext = '.py'
        script_location = root + ext
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent weird conflicts and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        cmd = [sys.executable, '-u', script_location]
    else:
        # ipcontroller has to be on the PATH in this case.
        cmd = ['ipcontroller']
    return cmd
62
63
def find_engine_cmd():
    """Find the command line ipengine program in a cross platform way.

    Returns
    -------
    cmd : list of str
        On win32, ``[sys.executable, '-u', script_location]`` where
        ``script_location`` is the ``.py`` source file of
        :mod:`IPython.kernel.ipengineapp`. On every other platform,
        ``['ipengine']``, which therefore has to be on the PATH.
    """
    if sys.platform == 'win32':
        # This logic is needed because the ipengine script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel import ipengineapp
        # __file__ may name compiled bytecode (.pyc, or .pyo under -O).
        # Map it back to the .py source by rewriting the extension only,
        # so a '.pyc' substring elsewhere in the path is left untouched.
        root, ext = os.path.splitext(ipengineapp.__file__)
        if ext in ('.pyc', '.pyo'):
            ext = '.py'
        script_location = root + ext
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent weird conflicts and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        cmd = [sys.executable, '-u', script_location]
    else:
        # ipengine has to be on the PATH in this case.
        cmd = ['ipengine']
    return cmd
80
81
82 #-----------------------------------------------------------------------------
83 # Base launchers and errors
42 #-----------------------------------------------------------------------------
84 #-----------------------------------------------------------------------------
43
85
44
86
45 class LauncherError(Exception):
87 class LauncherError(Exception):
46 pass
88 pass
47
89
48
90
49 class ProcessStateError(LauncherError):
91 class ProcessStateError(LauncherError):
50 pass
92 pass
51
93
52
94
53 class UnknownStatus(LauncherError):
95 class UnknownStatus(LauncherError):
54 pass
96 pass
55
97
56
98
57 class BaseLauncher(Component):
99 class BaseLauncher(Component):
58 """An abstraction for starting, stopping and signaling a process."""
100 """An abstraction for starting, stopping and signaling a process."""
59
101
60 # A directory for files related to the process. But, we don't cd to
61 # this directory,
62 working_dir = Unicode(u'')
102 working_dir = Unicode(u'')
63
103
64 def __init__(self, working_dir, parent=None, name=None, config=None):
104 def __init__(self, working_dir, parent=None, name=None, config=None):
65 super(BaseLauncher, self).__init__(parent, name, config)
105 super(BaseLauncher, self).__init__(parent, name, config)
66 self.working_dir = working_dir
106 self.working_dir = working_dir
67 self.state = 'before' # can be before, running, after
107 self.state = 'before' # can be before, running, after
68 self.stop_deferreds = []
108 self.stop_deferreds = []
69 self.start_data = None
109 self.start_data = None
70 self.stop_data = None
110 self.stop_data = None
71
111
72 @property
112 @property
73 def args(self):
113 def args(self):
74 """A list of cmd and args that will be used to start the process.
114 """A list of cmd and args that will be used to start the process.
75
115
76 This is what is passed to :func:`spawnProcess` and the first element
116 This is what is passed to :func:`spawnProcess` and the first element
77 will be the process name.
117 will be the process name.
78 """
118 """
79 return self.find_args()
119 return self.find_args()
80
120
81 def find_args(self):
121 def find_args(self):
82 """The ``.args`` property calls this to find the args list.
122 """The ``.args`` property calls this to find the args list.
83
123
84 Subclasses should implement this to construct the cmd and args.
124 Subclasses should implement this to construct the cmd and args.
85 """
125 """
86 raise NotImplementedError('find_args must be implemented in a subclass')
126 raise NotImplementedError('find_args must be implemented in a subclass')
87
127
88 @property
128 @property
89 def arg_str(self):
129 def arg_str(self):
90 """The string form of the program arguments."""
130 """The string form of the program arguments."""
91 return ' '.join(self.args)
131 return ' '.join(self.args)
92
132
93 @property
133 @property
94 def running(self):
134 def running(self):
95 """Am I running."""
135 """Am I running."""
96 if self.state == 'running':
136 if self.state == 'running':
97 return True
137 return True
98 else:
138 else:
99 return False
139 return False
100
140
101 def start(self):
141 def start(self):
102 """Start the process.
142 """Start the process.
103
143
104 This must return a deferred that fires with information about the
144 This must return a deferred that fires with information about the
105 process starting (like a pid, job id, etc.).
145 process starting (like a pid, job id, etc.).
106 """
146 """
107 return defer.fail(
147 return defer.fail(
108 Failure(NotImplementedError(
148 Failure(NotImplementedError(
109 'start must be implemented in a subclass')
149 'start must be implemented in a subclass')
110 )
150 )
111 )
151 )
112
152
113 def stop(self):
153 def stop(self):
114 """Stop the process and notify observers of stopping.
154 """Stop the process and notify observers of stopping.
115
155
116 This must return a deferred that fires with information about the
156 This must return a deferred that fires with information about the
117 process stopping, like errors that occur while the process is
157 process stopping, like errors that occur while the process is
118 attempting to be shut down. This deferred won't fire when the process
158 attempting to be shut down. This deferred won't fire when the process
119 actually stops. To observe the actual process stopping, see
159 actually stops. To observe the actual process stopping, see
120 :func:`observe_stop`.
160 :func:`observe_stop`.
121 """
161 """
122 return defer.fail(
162 return defer.fail(
123 Failure(NotImplementedError(
163 Failure(NotImplementedError(
124 'stop must be implemented in a subclass')
164 'stop must be implemented in a subclass')
125 )
165 )
126 )
166 )
127
167
128 def observe_stop(self):
168 def observe_stop(self):
129 """Get a deferred that will fire when the process stops.
169 """Get a deferred that will fire when the process stops.
130
170
131 The deferred will fire with data that contains information about
171 The deferred will fire with data that contains information about
132 the exit status of the process.
172 the exit status of the process.
133 """
173 """
134 if self.state=='after':
174 if self.state=='after':
135 return defer.succeed(self.stop_data)
175 return defer.succeed(self.stop_data)
136 else:
176 else:
137 d = defer.Deferred()
177 d = defer.Deferred()
138 self.stop_deferreds.append(d)
178 self.stop_deferreds.append(d)
139 return d
179 return d
140
180
141 def notify_start(self, data):
181 def notify_start(self, data):
142 """Call this to trigger startup actions.
182 """Call this to trigger startup actions.
143
183
144 This logs the process startup and sets the state to 'running'. It is
184 This logs the process startup and sets the state to 'running'. It is
145 a pass-through so it can be used as a callback.
185 a pass-through so it can be used as a callback.
146 """
186 """
147
187
148 log.msg('Process %r started: %r' % (self.args[0], data))
188 log.msg('Process %r started: %r' % (self.args[0], data))
149 self.start_data = data
189 self.start_data = data
150 self.state = 'running'
190 self.state = 'running'
151 return data
191 return data
152
192
153 def notify_stop(self, data):
193 def notify_stop(self, data):
154 """Call this to trigger process stop actions.
194 """Call this to trigger process stop actions.
155
195
156 This logs the process stopping and sets the state to 'after'. Call
196 This logs the process stopping and sets the state to 'after'. Call
157 this to trigger all the deferreds from :func:`observe_stop`."""
197 this to trigger all the deferreds from :func:`observe_stop`."""
158
198
159 log.msg('Process %r stopped: %r' % (self.args[0], data))
199 log.msg('Process %r stopped: %r' % (self.args[0], data))
160 self.stop_data = data
200 self.stop_data = data
161 self.state = 'after'
201 self.state = 'after'
162 for i in range(len(self.stop_deferreds)):
202 for i in range(len(self.stop_deferreds)):
163 d = self.stop_deferreds.pop()
203 d = self.stop_deferreds.pop()
164 d.callback(data)
204 d.callback(data)
165 return data
205 return data
166
206
167 def signal(self, sig):
207 def signal(self, sig):
168 """Signal the process.
208 """Signal the process.
169
209
170 Return a semi-meaningless deferred after signaling the process.
210 Return a semi-meaningless deferred after signaling the process.
171
211
172 Parameters
212 Parameters
173 ----------
213 ----------
174 sig : str or int
214 sig : str or int
175 'KILL', 'INT', etc., or any signal number
215 'KILL', 'INT', etc., or any signal number
176 """
216 """
177 return defer.fail(
217 return defer.fail(
178 Failure(NotImplementedError(
218 Failure(NotImplementedError(
179 'signal must be implemented in a subclass')
219 'signal must be implemented in a subclass')
180 )
220 )
181 )
221 )
182
222
183
223
224 #-----------------------------------------------------------------------------
225 # Local process launchers
226 #-----------------------------------------------------------------------------
227
228
184 class LocalProcessLauncherProtocol(ProcessProtocol):
229 class LocalProcessLauncherProtocol(ProcessProtocol):
185 """A ProcessProtocol to go with the LocalProcessLauncher."""
230 """A ProcessProtocol to go with the LocalProcessLauncher."""
186
231
187 def __init__(self, process_launcher):
232 def __init__(self, process_launcher):
188 self.process_launcher = process_launcher
233 self.process_launcher = process_launcher
189 self.pid = None
234 self.pid = None
190
235
191 def connectionMade(self):
236 def connectionMade(self):
192 self.pid = self.transport.pid
237 self.pid = self.transport.pid
193 self.process_launcher.notify_start(self.transport.pid)
238 self.process_launcher.notify_start(self.transport.pid)
194
239
195 def processEnded(self, status):
240 def processEnded(self, status):
196 value = status.value
241 value = status.value
197 if isinstance(value, ProcessDone):
242 if isinstance(value, ProcessDone):
198 self.process_launcher.notify_stop(
243 self.process_launcher.notify_stop(
199 {'exit_code':0,
244 {'exit_code':0,
200 'signal':None,
245 'signal':None,
201 'status':None,
246 'status':None,
202 'pid':self.pid
247 'pid':self.pid
203 }
248 }
204 )
249 )
205 elif isinstance(value, ProcessTerminated):
250 elif isinstance(value, ProcessTerminated):
206 self.process_launcher.notify_stop(
251 self.process_launcher.notify_stop(
207 {'exit_code':value.exitCode,
252 {'exit_code':value.exitCode,
208 'signal':value.signal,
253 'signal':value.signal,
209 'status':value.status,
254 'status':value.status,
210 'pid':self.pid
255 'pid':self.pid
211 }
256 }
212 )
257 )
213 else:
258 else:
214 raise UnknownStatus("Unknown exit status, this is probably a "
259 raise UnknownStatus("Unknown exit status, this is probably a "
215 "bug in Twisted")
260 "bug in Twisted")
216
261
217 def outReceived(self, data):
262 def outReceived(self, data):
218 log.msg(data)
263 log.msg(data)
219
264
220 def errReceived(self, data):
265 def errReceived(self, data):
221 log.err(data)
266 log.err(data)
222
267
223
268
224 class LocalProcessLauncher(BaseLauncher):
269 class LocalProcessLauncher(BaseLauncher):
225 """Start and stop an external process in an asynchronous manner.
270 """Start and stop an external process in an asynchronous manner.
226
271
227 This will launch the external process with a working directory of
272 This will launch the external process with a working directory of
228 ``self.working_dir``.
273 ``self.working_dir``.
229 """
274 """
230
275
231 # This is used to construct self.args, which is passed to
276 # This is used to construct self.args, which is passed to
232 # spawnProcess.
277 # spawnProcess.
233 cmd_and_args = List([])
278 cmd_and_args = List([])
234
279
235 def __init__(self, working_dir, parent=None, name=None, config=None):
280 def __init__(self, working_dir, parent=None, name=None, config=None):
236 super(LocalProcessLauncher, self).__init__(
281 super(LocalProcessLauncher, self).__init__(
237 working_dir, parent, name, config
282 working_dir, parent, name, config
238 )
283 )
239 self.process_protocol = None
284 self.process_protocol = None
240 self.start_deferred = None
285 self.start_deferred = None
241
286
242 def find_args(self):
287 def find_args(self):
243 return self.cmd_and_args
288 return self.cmd_and_args
244
289
245 def start(self):
290 def start(self):
246 if self.state == 'before':
291 if self.state == 'before':
247 self.process_protocol = LocalProcessLauncherProtocol(self)
292 self.process_protocol = LocalProcessLauncherProtocol(self)
248 self.start_deferred = defer.Deferred()
293 self.start_deferred = defer.Deferred()
249 self.process_transport = reactor.spawnProcess(
294 self.process_transport = reactor.spawnProcess(
250 self.process_protocol,
295 self.process_protocol,
251 str(self.args[0]), # twisted expects these to be str, not unicode
296 str(self.args[0]), # twisted expects these to be str, not unicode
252 [str(a) for a in self.args], # str expected, not unicode
297 [str(a) for a in self.args], # str expected, not unicode
253 env=os.environ,
298 env=os.environ,
254 path=self.working_dir # start in the working_dir
299 path=self.working_dir # start in the working_dir
255 )
300 )
256 return self.start_deferred
301 return self.start_deferred
257 else:
302 else:
258 s = 'The process was already started and has state: %r' % self.state
303 s = 'The process was already started and has state: %r' % self.state
259 return defer.fail(ProcessStateError(s))
304 return defer.fail(ProcessStateError(s))
260
305
261 def notify_start(self, data):
306 def notify_start(self, data):
262 super(LocalProcessLauncher, self).notify_start(data)
307 super(LocalProcessLauncher, self).notify_start(data)
263 self.start_deferred.callback(data)
308 self.start_deferred.callback(data)
264
309
265 def stop(self):
310 def stop(self):
266 return self.interrupt_then_kill()
311 return self.interrupt_then_kill()
267
312
268 @make_deferred
313 @make_deferred
269 def signal(self, sig):
314 def signal(self, sig):
270 if self.state == 'running':
315 if self.state == 'running':
271 self.process_transport.signalProcess(sig)
316 self.process_transport.signalProcess(sig)
272
317
273 @inlineCallbacks
318 @inlineCallbacks
274 def interrupt_then_kill(self, delay=2.0):
319 def interrupt_then_kill(self, delay=2.0):
275 """Send INT, wait a delay and then send KILL."""
320 """Send INT, wait a delay and then send KILL."""
276 yield self.signal('INT')
321 yield self.signal('INT')
277 yield sleep_deferred(delay)
322 yield sleep_deferred(delay)
278 yield self.signal('KILL')
323 yield self.signal('KILL')
279
324
280
325
class LocalControllerLauncher(LocalProcessLauncher):
    """Launch a controller as a regular external process."""

    # The command used to run ipcontroller.
    controller_cmd = List(find_controller_cmd(), config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)

    def find_args(self):
        """Return the controller command, its args and ``--working-dir``."""
        working_dir_args = ['--working-dir', self.working_dir]
        return self.controller_cmd + self.controller_args + working_dir_args

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        cluster_dir_args = ['--cluster-dir', cluster_dir]
        self.controller_args.extend(cluster_dir_args)
        self.cluster_dir = unicode(cluster_dir)
        log.msg("Starting LocalControllerLauncher: %r" % self.args)
        return super(LocalControllerLauncher, self).start()
344
class LocalEngineLauncher(LocalProcessLauncher):
    """Launch a single engine as a regular external process."""

    # The command used to run ipengine.
    engine_cmd = List(find_engine_cmd(), config=True)
    # Command line arguments for ipengine.
    engine_args = List(['--log-to-file','--log-level', '40'], config=True)

    def find_args(self):
        """Return the engine command, its args and ``--working-dir``."""
        working_dir_args = ['--working-dir', self.working_dir]
        return self.engine_cmd + self.engine_args + working_dir_args

    def start(self, cluster_dir):
        """Start the engine by cluster_dir."""
        cluster_dir_args = ['--cluster-dir', cluster_dir]
        self.engine_args.extend(cluster_dir_args)
        self.cluster_dir = unicode(cluster_dir)
        return super(LocalEngineLauncher, self).start()
363
364
class LocalEngineSetLauncher(BaseLauncher):
    """Launch a set of engines as regular external processes."""

    # Command line arguments for ipengine.
    engine_args = List(['--log-to-file','--log-level', '40'], config=True)

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(LocalEngineSetLauncher, self).__init__(
            working_dir, parent, name, config
        )
        # One LocalEngineLauncher per started engine.
        self.launchers = []

    def start(self, n, cluster_dir):
        """Start n engines by cluster_dir.

        Returns a deferred that gathers the start deferreds of all
        engines and then calls ``self.notify_start`` with the result.
        """
        import copy
        self.cluster_dir = unicode(cluster_dir)
        start_deferreds = []
        for index in range(n):
            launcher = LocalEngineLauncher(self.working_dir, self)
            # Copy the engine args over to each engine launcher so that each
            # one extends its own list.
            launcher.engine_args = copy.deepcopy(self.engine_args)
            started = launcher.start(cluster_dir)
            if index == 0:
                log.msg("Starting LocalEngineSetLauncher: %r" % launcher.args)
            self.launchers.append(launcher)
            start_deferreds.append(started)
        # The consumeErrors here could be dangerous
        gathered = gatherBoth(start_deferreds, consumeErrors=True)
        gathered.addCallback(self.notify_start)
        return gathered

    def find_args(self):
        """Return a placeholder; the real args live on each engine launcher."""
        return ['engine set']

    def signal(self, sig):
        """Send ``sig`` to every engine and gather the results."""
        pending = [launcher.signal(sig) for launcher in self.launchers]
        return gatherBoth(pending, consumeErrors=True)

    def interrupt_then_kill(self, delay=1.0):
        """Run ``interrupt_then_kill(delay)`` on every engine and gather."""
        pending = [
            launcher.interrupt_then_kill(delay) for launcher in self.launchers
        ]
        return gatherBoth(pending, consumeErrors=True)

    def stop(self):
        """Stop all engines by calling ``interrupt_then_kill``."""
        return self.interrupt_then_kill()

    def observe_stop(self):
        """Gather every engine's ``observe_stop`` and then notify the stop."""
        pending = [launcher.observe_stop() for launcher in self.launchers]
        gathered = gatherBoth(pending, consumeErrors=False)
        gathered.addCallback(self.notify_stop)
        return gathered
425
426
427 #-----------------------------------------------------------------------------
428 # MPIExec launchers
429 #-----------------------------------------------------------------------------
430
431
class MPIExecLauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    # The mpiexec command to use in starting the process.
    mpi_cmd = List(['mpiexec'], config=True)
    # The command line arguments to pass to mpiexec.
    mpi_args = List([], config=True)
    # The program to start using mpiexec.
    program = List(['date'], config=True)
    # The command line argument to the program.
    program_args = List([], config=True)
    # The number of instances of the program to start.
    n = Int(1, config=True)

    def find_args(self):
        """Build self.args using all the fields."""
        mpiexec_part = self.mpi_cmd + ['-n', self.n] + self.mpi_args
        program_part = self.program + self.program_args
        return mpiexec_part + program_part

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPIExecLauncher, self).start()
304
455
305
456
class MPIExecControllerLauncher(MPIExecLauncher):
    """Launch a controller using mpiexec."""

    # The command used to run ipcontroller.
    controller_cmd = List(find_controller_cmd(), config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)
    # Not configurable here; start() always passes 1 to the base class.
    n = Int(1, config=False)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        cluster_dir_args = ['--cluster-dir', cluster_dir]
        self.controller_args.extend(cluster_dir_args)
        self.cluster_dir = unicode(cluster_dir)
        log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
        return super(MPIExecControllerLauncher, self).start(1)

    def find_args(self):
        """Return the mpiexec invocation that runs the controller."""
        mpiexec_part = self.mpi_cmd + ['-n', self.n] + self.mpi_args
        controller_part = self.controller_cmd + self.controller_args
        return mpiexec_part + controller_part + \
            ['--working-dir', self.working_dir]
476
477
class MPIExecEngineSetLauncher(MPIExecLauncher):
    """Launch a set of engines using mpiexec."""

    # The command used to run ipengine.
    engine_cmd = List(find_engine_cmd(), config=True)
    # Command line arguments for ipengine.
    engine_args = List(['--log-to-file','--log-level', '40'], config=True)
    # The number of engines to start.
    n = Int(1, config=True)

    def start(self, n, cluster_dir):
        """Start n engines by cluster_dir."""
        cluster_dir_args = ['--cluster-dir', cluster_dir]
        self.engine_args.extend(cluster_dir_args)
        self.cluster_dir = unicode(cluster_dir)
        log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
        return super(MPIExecEngineSetLauncher, self).start(n)

    def find_args(self):
        """Return the mpiexec invocation that runs the engines."""
        mpiexec_part = self.mpi_cmd + ['-n', self.n] + self.mpi_args
        engine_part = self.engine_cmd + self.engine_args
        return mpiexec_part + engine_part + \
            ['--working-dir', self.working_dir]
498
499
500 #-----------------------------------------------------------------------------
501 # SSH launchers
502 #-----------------------------------------------------------------------------
503
504
class SSHLauncher(BaseLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables. There could be other things this needs
    as well.
    """

    # The ssh command and the arguments passed to it.
    ssh_cmd = List(['ssh'], config=True)
    ssh_args = List([], config=True)
    # The remote program and its arguments.
    program = List(['date'], config=True)
    program_args = List([], config=True)
    # The remote host and the user to log in as.
    hostname = Str('', config=True)
    user = Str('', config=True)
    # The ssh destination, kept in sync with hostname and user by the
    # ``_hostname_changed`` / ``_user_changed`` handlers below.
    location = Str('')

    def _make_location(self, user, hostname):
        """Return ``user@hostname``, or just ``hostname`` if user is empty."""
        if user:
            return '%s@%s' % (user, hostname)
        # Without a user, '@hostname' would not be a valid ssh destination.
        return hostname

    def _hostname_changed(self, name, old, new):
        self.location = self._make_location(self.user, new)

    def _user_changed(self, name, old, new):
        self.location = self._make_location(new, self.hostname)

    def find_args(self):
        """Return the ssh command line: ssh, its args, location, program."""
        return self.ssh_cmd + self.ssh_args + [self.location] + \
            self.program + self.program_args

    def start(self, n, hostname=None, user=None):
        """Update hostname/user if given, then delegate to the base start.

        ``n`` is accepted for signature parity with the other launchers
        but is not used here.
        """
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user
        return super(SSHLauncher, self).start()
338
537
339
538
class SSHControllerLauncher(SSHLauncher):
    """Placeholder: adds nothing to SSHLauncher yet."""
541
542
class SSHEngineSetLauncher(BaseLauncher):
    """Placeholder: adds nothing to BaseLauncher yet."""
545
546
547 #-----------------------------------------------------------------------------
548 # Windows HPC Server 2008 scheduler launchers
549 #-----------------------------------------------------------------------------
550
551
# This is only used on Windows.
def find_job_cmd():
    """Return the ``job`` command: looked up via find_cmd on Windows,
    otherwise the plain name ``'job'``."""
    return find_cmd('job') if os.name=='nt' else 'job'
345
558
346
559
class WindowsHPCLauncher(BaseLauncher):
    """Submit and cancel jobs through the Windows HPC ``job`` command.

    Subclasses must implement ``write_job_file`` to write the job
    description that ``start`` submits.
    """

    # A regular expression used to get the job id from the output of the
    # submit command.
    job_id_regexp = Str(r'\d+', config=True)
    # The filename of the instantiated job script.
    job_file_name = Unicode(u'ipython_job.xml', config=True)
    # The hostname of the scheduler to submit the job to
    scheduler = Str('', config=True)
    # Job attributes that a subclass's write_job_file may copy onto the job
    # description.
    username = Str(os.environ.get('USERNAME', ''), config=True)
    priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
        default_value='Highest', config=True)
    requested_nodes = Str('', config=True)
    project = Str('MyProject', config=True)
    # The job command itself. find_job_cmd must be *called* here so that the
    # default is the command string rather than the function object.
    job_cmd = Str(find_job_cmd(), config=True)

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(WindowsHPCLauncher, self).__init__(
            working_dir, parent, name, config
        )

    @property
    def job_file(self):
        """The full path to the instantiated job script.

        Computed on access by joining working_dir with job_file_name. It is
        a read-only property, so nothing may assign to it.
        """
        return os.path.join(self.working_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the job description for n processes; subclass hook."""
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        return ['job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        Raises LauncherError if ``job_id_regexp`` matches nowhere in
        ``output``.
        """
        m = re.search(self.job_id_regexp, output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    @inlineCallbacks
    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler."""
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        log.msg("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        output = yield getProcessOutput(self.job_cmd,
            args,
            env=os.environ,
            path=self.working_dir
        )
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        defer.returnValue(job_id)

    @inlineCallbacks
    def stop(self):
        """Cancel the job and notify the stop with the command's output.

        A failing cancel command is treated as "already stopped" rather
        than as an error.
        """
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        log.msg("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = yield getProcessOutput(self.job_cmd,
                args,
                env=os.environ,
                path=self.working_dir
            )
        except Exception:
            # Best effort: do not swallow KeyboardInterrupt/SystemExit.
            output = 'The job already appears to be stoppped: %r' % self.job_id
        self.notify_stop(output)  # Pass the output of the kill cmd
        defer.returnValue(output)
638
427
639
class WindowsHPCControllerLauncher(WindowsHPCLauncher):
    """Launch a controller as a Windows HPC job."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
    # Filled in by start(); appended to the controller task's arguments.
    extra_args = List([], config=False)

    def write_job_file(self, n):
        """Write a job description containing a single controller task."""
        task = IPControllerTask(self)
        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        task.work_directory = self.cluster_dir
        # Add the --cluster-dir and --working-dir from self.start().
        task.controller_args.extend(self.extra_args)

        job = IPControllerJob(self)
        job.add_task(task)

        log.msg("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        """The job file path, placed inside the cluster directory."""
        return os.path.join(self.cluster_dir, self.job_file_name)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        self.extra_args = [
            '--cluster-dir', cluster_dir, '--working-dir', self.working_dir
        ]
        self.cluster_dir = unicode(cluster_dir)
        return super(WindowsHPCControllerLauncher, self).start(1)
671
672
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
    """Launch a set of engines as a Windows HPC job."""

    job_file_name = Unicode(u'ipengineset_job.xml', config=True)
    # Filled in by start(); appended to every engine task's arguments.
    extra_args = List([], config=False)

    def write_job_file(self, n):
        """Write a job description containing n engine tasks."""
        # NOTE(review): a controller job class is used for an engine set;
        # confirm whether a dedicated engine-set job class should be used.
        job = IPControllerJob(self)

        for i in range(n):
            t = IPEngineTask(self)
            # The tasks work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.cluster_dir
            # Add the --cluster-dir and --working-dir from self.start().
            t.engine_args.extend(self.extra_args)
            job.add_task(t)

        log.msg("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        """The job file path, placed inside the cluster directory."""
        return os.path.join(self.cluster_dir, self.job_file_name)

    def start(self, n, cluster_dir):
        """Start n engines by cluster_dir.

        The signature matches the other engine set launchers
        (``start(n, cluster_dir)``).
        """
        self.extra_args = [
            '--cluster-dir', cluster_dir, '--working-dir', self.working_dir
        ]
        self.cluster_dir = unicode(cluster_dir)
        # super() must name this class, not the controller launcher.
        return super(WindowsHPCEngineSetLauncher, self).start(n)
705
706
707 #-----------------------------------------------------------------------------
708 # Batch (PBS) system launchers
709 #-----------------------------------------------------------------------------
710
711
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc. The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using Itpl. Thus the template can use
    ${n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in. See PBSEngineSet
    # The name of the command line program used to submit jobs.
    submit_command = Str('', config=True)
    # The name of the command line program used to delete jobs.
    delete_command = Str('', config=True)
    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str('', config=True)
    # The string that is the batch script template itself.
    batch_template = Str('', config=True)
    # The filename of the instantiated batch script.
    batch_file_name = Unicode(u'batch_script', config=True)
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(BatchSystemLauncher, self).__init__(
            working_dir, parent, name, config
        )
        self.batch_file = os.path.join(self.working_dir, self.batch_file_name)
        # Variables made available to the batch template.
        self.context = {}

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        Raises LauncherError if ``job_id_regexp`` does not match at the
        start of ``output``.
        """
        m = re.match(self.job_id_regexp, output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the working_dir."""
        self.context['n'] = n
        script_as_string = Itpl.itplns(self.batch_template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        f = open(self.batch_file, 'w')
        try:
            f.write(script_as_string)
        finally:
            # Close the file even if the write fails.
            f.close()

    @inlineCallbacks
    def start(self, n):
        """Start n copies of the process using a batch system."""
        self.write_batch_script(n)
        output = yield getProcessOutput(self.submit_command,
            [self.batch_file], env=os.environ)
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        defer.returnValue(job_id)

    @inlineCallbacks
    def stop(self):
        """Run the delete command on the job and notify with its output."""
        output = yield getProcessOutput(self.delete_command,
            [self.job_id], env=os.environ
        )
        self.notify_stop(output)  # Pass the output of the kill cmd
        defer.returnValue(output)
785
502
786
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    # Commands used to submit and delete jobs.
    submit_command = Str('qsub', config=True)
    delete_command = Str('qdel', config=True)
    # Matches the job id in the submit command's output.
    job_id_regexp = Str(r'\d+', config=True)
    # The batch script template and where its instantiation is written.
    batch_template = Str('', config=True)
    batch_file_name = Unicode(u'pbs_batch_script', config=True)
    batch_file = Unicode(u'')
796
513
797
514 #-----------------------------------------------------------------------------
515 # Controller launchers
516 #-----------------------------------------------------------------------------
517
def find_controller_cmd():
    """Find the command line ipcontroller program in a cross platform way."""
    if sys.platform != 'win32':
        # ipcontroller has to be on the PATH in this case.
        return ['ipcontroller']
    # This logic is needed because the ipcontroller script doesn't
    # always get installed in the same way or in the same location.
    from IPython.kernel import ipcontrollerapp
    script_location = ipcontrollerapp.__file__.replace('.pyc', '.py')
    # The -u option here turns on unbuffered output, which is required
    # on Win32 to prevent weird conflicts and problems with Twisted.
    # Also, use sys.executable to make sure we are picking up the
    # right python exe.
    return [sys.executable, '-u', script_location]
534
535
class LocalControllerLauncher(LocalProcessLauncher):
    """Launch a controller as a regular external process."""

    # The command used to run ipcontroller.
    controller_cmd = List(find_controller_cmd(), config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)

    def find_args(self):
        """Return the controller command followed by its arguments."""
        return self.controller_cmd + self.controller_args

    def start(self, profile=None, cluster_dir=None):
        """Start the controller by profile or cluster_dir."""
        # Each option that was given adds its flag to the controller args.
        for flag, value in (('--cluster-dir', cluster_dir),
                            ('--profile', profile)):
            if value is not None:
                self.controller_args.extend([flag, value])
        log.msg("Starting LocalControllerLauncher: %r" % self.args)
        return super(LocalControllerLauncher, self).start()
554
555
class WindowsHPCControllerLauncher(WindowsHPCLauncher):
    """Launch a controller as a Windows HPC job."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
    # Filled in by start(); appended to the controller task's arguments.
    extra_args = List([],config=False)

    def write_job_file(self, n):
        """Write a job description containing a single controller task."""
        job = WinHPCJob(self)
        job.job_name = "IPController"
        job.username = self.username
        job.priority = self.priority
        job.requested_nodes = self.requested_nodes
        job.project = self.project

        t = IPControllerTask(self)
        t.work_directory = self.working_dir
        # Add the --profile and --cluster-dir args from start.
        t.controller_args.extend(self.extra_args)
        job.add_task(t)
        log.msg("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    def start(self, profile=None, cluster_dir=None):
        """Start the controller by profile or cluster_dir.

        When both are given, both flags are passed on, as the other
        controller launchers do.
        """
        extra_args = []
        if cluster_dir is not None:
            extra_args.extend(['--cluster-dir', cluster_dir])
        if profile is not None:
            extra_args.extend(['--profile', profile])
        if extra_args:
            # Leave extra_args untouched when neither option was given.
            self.extra_args = extra_args
        return super(WindowsHPCControllerLauncher, self).start(1)
584
585
class MPIExecControllerLauncher(MPIExecLauncher):
    """Launch a controller using mpiexec."""

    # The command used to run ipcontroller.
    controller_cmd = List(find_controller_cmd(), config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)
    # Not configurable here; start() always passes 1 to the base class.
    n = Int(1, config=False)

    def start(self, profile=None, cluster_dir=None):
        """Start the controller by profile or cluster_dir."""
        # Each option that was given adds its flag to the controller args.
        for flag, value in (('--cluster-dir', cluster_dir),
                            ('--profile', profile)):
            if value is not None:
                self.controller_args.extend([flag, value])
        log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
        return super(MPIExecControllerLauncher, self).start(1)

    def find_args(self):
        """Return the mpiexec invocation that runs the controller."""
        mpiexec_part = self.mpi_cmd + ['-n', self.n] + self.mpi_args
        return mpiexec_part + self.controller_cmd + self.controller_args
606
607
class PBSControllerLauncher(PBSLauncher):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        # Save cluster_dir in the context so it can be used in the batch
        # script template as ${cluster_dir}.
        self.context['cluster_dir'] = cluster_dir
        self.cluster_dir = unicode(cluster_dir)
        log.msg("Starting PBSControllerLauncher: %r" % self.args)
        # Exactly one controller is submitted.
        return super(PBSControllerLauncher, self).start(1)
812
625
813
class SSHControllerLauncher(SSHLauncher):
    """Placeholder controller launcher; adds nothing to SSHLauncher."""
628
629
630 #-----------------------------------------------------------------------------
631 # Engine launchers
632 #-----------------------------------------------------------------------------
633
634
def find_engine_cmd():
    """Find the command line ipengine program in a cross platform way.

    Returns a list of command line tokens.  On win32 this is the current
    Python executable running the ipengineapp module's source file
    unbuffered; elsewhere it is just ``['ipengine']``.
    """
    if sys.platform == 'win32':
        # This logic is needed because the ipengine script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel import ipengineapp
        script_location = ipengineapp.__file__
        # __file__ may name compiled bytecode.  Only a trailing .pyc/.pyo
        # extension is rewritten, so '.pyc' elsewhere in the path is kept.
        if script_location.endswith(('.pyc', '.pyo')):
            script_location = script_location[:-1]
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent weird conflict and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        cmd = [sys.executable, '-u', script_location]
    else:
        # ipengine has to be on the PATH in this case.
        cmd = ['ipengine']
    return cmd
651
652
class LocalEngineLauncher(LocalProcessLauncher):
    """Launch a single engine as a regular external process."""

    # Command used to run ipengine.
    engine_cmd = List(find_engine_cmd(), config=True)
    # Command line arguments for ipengine.
    engine_args = List(['--log-to-file','--log-level', '40'], config=True)

    def find_args(self):
        """Return the engine command followed by its arguments."""
        return self.engine_cmd + self.engine_args

    def start(self, profile=None, cluster_dir=None):
        """Start the engine by profile or cluster_dir.

        Options for whichever of the two is not None are appended to
        `engine_args` before delegating to the parent class's ``start()``.
        """
        extra_args = []
        if cluster_dir is not None:
            extra_args += ['--cluster-dir', cluster_dir]
        if profile is not None:
            extra_args += ['--profile', profile]
        self.engine_args.extend(extra_args)
        return super(LocalEngineLauncher, self).start()
672
673
class LocalEngineSetLauncher(BaseLauncher):
    """Launch a set of engines as regular external processes."""

    # Command line arguments for ipengine.
    engine_args = List(['--log-to-file','--log-level', '40'], config=True)

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(LocalEngineSetLauncher, self).__init__(
            working_dir, parent, name, config
        )
        # One LocalEngineLauncher per engine started by start().
        self.launchers = []

    def start(self, n, profile=None, cluster_dir=None):
        """Start n engines by profile or cluster_dir.

        Returns a deferred gathering the per-engine start deferreds;
        `notify_start` is added to it as a callback.
        """
        import copy
        deferreds = []
        for index in range(n):
            launcher = LocalEngineLauncher(self.working_dir, self)
            # Each engine launcher gets its own copy of the engine args.
            launcher.engine_args = copy.deepcopy(self.engine_args)
            started = launcher.start(profile, cluster_dir)
            if index == 0:
                # Log the command line once, using the first engine.
                log.msg("Starting LocalEngineSetLauncher: %r" % launcher.args)
            self.launchers.append(launcher)
            deferreds.append(started)
        # The consumeErrors here could be dangerous
        gathered = gatherBoth(deferreds, consumeErrors=True)
        gathered.addCallback(self.notify_start)
        return gathered

    def find_args(self):
        """Return a placeholder; the set itself has no command line."""
        return ['engine set']

    def signal(self, sig):
        """Send `sig` to every engine launcher and gather the results."""
        deferreds = [launcher.signal(sig) for launcher in self.launchers]
        return gatherBoth(deferreds, consumeErrors=True)

    def interrupt_then_kill(self, delay=1.0):
        """Call interrupt_then_kill(delay) on every engine launcher."""
        deferreds = [
            launcher.interrupt_then_kill(delay) for launcher in self.launchers
        ]
        return gatherBoth(deferreds, consumeErrors=True)

    def stop(self):
        """Stop all engines via interrupt_then_kill with its default delay."""
        return self.interrupt_then_kill()

    def observe_stop(self):
        """Return a deferred gathering every engine's observe_stop().

        `notify_stop` is added to it as a callback.
        """
        deferreds = []
        for launcher in self.launchers:
            deferreds.append(launcher.observe_stop())
        gathered = gatherBoth(deferreds, consumeErrors=False)
        gathered.addCallback(self.notify_stop)
        return gathered
733
734
class MPIExecEngineSetLauncher(MPIExecLauncher):
    """Launch a set of engines using mpiexec."""

    # Command used to run ipengine.
    engine_cmd = List(find_engine_cmd(), config=True)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )
    n = Int(1, config=True)

    def start(self, n, profile=None, cluster_dir=None):
        """Start n engines by profile or cluster_dir.

        Each of `cluster_dir` and `profile` that is not None is appended to
        `engine_args` as a command line option.  Returns the result of the
        parent class's ``start(n)``.
        """
        if cluster_dir is not None:
            self.engine_args.extend(['--cluster-dir', cluster_dir])
        if profile is not None:
            self.engine_args.extend(['--profile', profile])
        log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
        return super(MPIExecEngineSetLauncher, self).start(n)

    def find_args(self):
        """Return the full mpiexec command line as a list of strings."""
        # The process count is stringified so that the argument list is
        # homogeneous, as IPClusterLauncher.find_args does for its count.
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
            self.engine_cmd + self.engine_args
756
757
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
    """Placeholder engine set launcher; adds nothing to WindowsHPCLauncher."""
760
761
class PBSEngineSetLauncher(PBSLauncher):
    """Launch a set of engines using PBS."""

    batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)

    def start(self, n, cluster_dir):
        """Start n engines for the given cluster_dir.

        `cluster_dir` is appended to `program_args` as a --cluster-dir
        option and recorded on the launcher as a unicode string.  Returns
        the result of the parent class's ``start(n)``.
        """
        cluster_dir_option = ['--cluster-dir', cluster_dir]
        self.program_args.extend(cluster_dir_option)
        self.cluster_dir = unicode(cluster_dir)
        log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
        return super(PBSEngineSetLauncher, self).start(n)
774
824
775
825
class SSHEngineSetLauncher(BaseLauncher):
    """Placeholder engine set launcher; adds nothing to BaseLauncher."""
778
779
780 #-----------------------------------------------------------------------------
826 #-----------------------------------------------------------------------------
781 # A launcher for ipcluster itself!
827 # A launcher for ipcluster itself!
782 #-----------------------------------------------------------------------------
828 #-----------------------------------------------------------------------------
783
829
784
830
def find_ipcluster_cmd():
    """Find the command line ipcluster program in a cross platform way.

    Returns a list of command line tokens.  On win32 this is the current
    Python executable running the ipclusterapp module's source file
    unbuffered; elsewhere it is just ``['ipcluster']``.
    """
    if sys.platform == 'win32':
        # This logic is needed because the ipcluster script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel import ipclusterapp
        script_location = ipclusterapp.__file__
        # __file__ may name compiled bytecode.  Only a trailing .pyc/.pyo
        # extension is rewritten, so '.pyc' elsewhere in the path is kept.
        if script_location.endswith(('.pyc', '.pyo')):
            script_location = script_location[:-1]
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent weird conflict and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        cmd = [sys.executable, '-u', script_location]
    else:
        # ipcluster has to be on the PATH in this case.
        cmd = ['ipcluster']
    return cmd
801
847
802
848
class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    # Command used to run ipcluster.
    ipcluster_cmd = List(find_ipcluster_cmd(), config=True)
    # Command line arguments to pass to ipcluster.
    ipcluster_args = List(
        ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True)
    # Subcommand placed right after the ipcluster command.
    ipcluster_subcommand = Str('start')
    # Value passed to ipcluster's -n option.
    ipcluster_n = Int(2)

    def find_args(self):
        """Return the command, subcommand, -n option and extra arguments."""
        count_option = ['-n', repr(self.ipcluster_n)]
        subcommand = [self.ipcluster_subcommand]
        return self.ipcluster_cmd + subcommand + count_option + \
            self.ipcluster_args

    def start(self):
        """Log the command line and delegate to the parent's start()."""
        log.msg("Starting ipcluster: %r" % self.args)
        return super(IPClusterLauncher, self).start()
820
866
@@ -1,277 +1,306 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Job and task components for writing .xml files that the Windows HPC Server
4 Job and task components for writing .xml files that the Windows HPC Server
5 2008 can use to start jobs.
5 2008 can use to start jobs.
6 """
6 """
7
7
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008-2009 The IPython Development Team
9 # Copyright (C) 2008-2009 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 from __future__ import with_statement
19 from __future__ import with_statement
20
20
21 import os
21 import os
22 import re
22 import re
23 import uuid
23 import uuid
24
24
25 from xml.etree import ElementTree as ET
25 from xml.etree import ElementTree as ET
26 from xml.dom import minidom
26 from xml.dom import minidom
27
27
28 from IPython.core.component import Component
28 from IPython.core.component import Component
29 from IPython.external import Itpl
29 from IPython.external import Itpl
30 from IPython.utils.traitlets import (
30 from IPython.utils.traitlets import (
31 Str, Int, List, Unicode, Instance,
31 Str, Int, List, Unicode, Instance,
32 Enum, Bool, CStr
32 Enum, Bool, CStr
33 )
33 )
34
34
35 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
36 # Job and Task Component
36 # Job and Task Component
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38
38
39
39
def as_str(value):
    """Convert a simple value to the string form used in the job XML.

    Strings are returned unchanged, booleans become 'true' or 'false', and
    ints and floats become their repr.  Any other value is returned as is.
    """
    if isinstance(value, str):
        return value
    # bool must be tested before int because bool is a subclass of int.
    if isinstance(value, bool):
        return 'true' if value else 'false'
    if isinstance(value, (int, float)):
        return repr(value)
    return value
52
52
53
53
def indent(elem, level=0):
    """Add indentation whitespace to an element tree, in place.

    `elem` is modified so that its text and the tails of it and its
    descendants hold a newline plus `level`-dependent padding wherever they
    were empty or whitespace only.  Non-blank text and tails are kept.
    """
    pad = "\n" + level*" "
    if len(elem):
        if not elem.text or not elem.text.strip():
            elem.text = pad + " "
        if not elem.tail or not elem.tail.strip():
            elem.tail = pad
        last_child = None
        for last_child in elem:
            indent(last_child, level+1)
        # The last child's tail precedes the closing tag of `elem`, so it is
        # padded at this level rather than at the child's level.
        if not last_child.tail or not last_child.tail.strip():
            last_child.tail = pad
    elif level and (not elem.tail or not elem.tail.strip()):
        elem.tail = pad
68
68
69
69
def find_username():
    """Return the current user name, qualified by domain when available.

    Reads the USERNAME and USERDOMAIN environment variables.  Returns
    'DOMAIN\\user' when both are set, the bare user name when only USERNAME
    is set, and '' when USERNAME is missing or empty.
    """
    domain = os.environ.get('USERDOMAIN')
    username = os.environ.get('USERNAME','')
    # Without a user name a domain prefix alone ('DOMAIN\') is not a usable
    # account name, so fall back to the bare (possibly empty) user name.
    if domain is None or not username:
        return username
    return '%s\\%s' % (domain, username)
77
78
class WinHPCJob(Component):
    """A job description that can be written as Windows HPC job XML."""

    job_id = Str('')
    job_name = Str('MyJob', config=True)
    min_cores = Int(1, config=True)
    max_cores = Int(1, config=True)
    min_sockets = Int(1, config=True)
    max_sockets = Int(1, config=True)
    min_nodes = Int(1, config=True)
    max_nodes = Int(1, config=True)
    unit_type = Str("Core", config=True)
    auto_calculate_min = Bool(True, config=True)
    auto_calculate_max = Bool(True, config=True)
    run_until_canceled = Bool(False, config=True)
    is_exclusive = Bool(False, config=True)
    username = Str(find_username(), config=True)
    job_type = Str('Batch', config=True)
    priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
        default_value='Highest', config=True)
    requested_nodes = Str('', config=True)
    project = Str('IPython', config=True)
    xmlns = Str('http://schemas.microsoft.com/HPCS2008/scheduler/')
    version = Str("2.000")
    tasks = List([])

    @property
    def owner(self):
        """The job owner, which is always the same as `username`."""
        return self.username

    def _write_attr(self, root, attr, key):
        """Set XML attribute `key` on `root` from `self.<attr>`.

        The attribute is skipped when the converted value is falsy.
        """
        s = as_str(getattr(self, attr, ''))
        if s:
            root.set(key, s)

    def as_element(self):
        """Return the job, including its tasks, as a 'Job' element."""
        # We have to add _A_ type things to get the right order that
        # the MSFT XML parser expects.  tostring() removes them again.
        ordered_attrs = (
            ('version', '_A_Version'),
            ('job_name', '_B_Name'),
            ('unit_type', '_C_UnitType'),
            ('min_cores', '_D_MinCores'),
            ('max_cores', '_E_MaxCores'),
            ('min_sockets', '_F_MinSockets'),
            ('max_sockets', '_G_MaxSockets'),
            ('min_nodes', '_H_MinNodes'),
            ('max_nodes', '_I_MaxNodes'),
            ('run_until_canceled', '_J_RunUntilCanceled'),
            ('is_exclusive', '_K_IsExclusive'),
            ('username', '_L_UserName'),
            ('job_type', '_M_JobType'),
            ('priority', '_N_Priority'),
            ('requested_nodes', '_O_RequestedNodes'),
            ('auto_calculate_max', '_P_AutoCalculateMax'),
            ('auto_calculate_min', '_Q_AutoCalculateMin'),
            ('project', '_R_Project'),
            ('owner', '_S_Owner'),
            ('xmlns', '_T_xmlns'),
        )
        root = ET.Element('Job')
        for attr, key in ordered_attrs:
            self._write_attr(root, attr, key)
        # An empty Dependencies element is always emitted before Tasks.
        ET.SubElement(root, "Dependencies")
        etasks = ET.SubElement(root, "Tasks")
        for t in self.tasks:
            etasks.append(t.as_element())
        return root

    def tostring(self):
        """Return the string representation of the job description XML."""
        root = self.as_element()
        indent(root)
        txt = ET.tostring(root, encoding="utf-8")
        # Now remove the tokens used to order the attributes.  Only a token
        # that starts an attribute name (whitespace before, name=" after) is
        # removed, so text such as 'run_A_01' inside a value is preserved.
        txt = re.sub(r'(\s)_[A-Z]_(\w+=")', r'\1\2', txt)
        txt = '<?xml version="1.0" encoding="utf-8"?>\n' + txt
        return txt

    def write(self, filename):
        """Write the XML job description to a file."""
        txt = self.tostring()
        with open(filename, 'w') as f:
            f.write(txt)

    def add_task(self, task):
        """Add a task to the job.

        Parameters
        ----------
        task : :class:`WinHPCTask`
            The task object to add.
        """
        self.tasks.append(task)
159
168
160
169
class WinHPCTask(Component):
    """A single task of a Windows HPC job, convertible to a 'Task' element."""

    task_id = Str('')
    task_name = Str('')
    version = Str("2.000")
    min_cores = Int(1, config=True)
    max_cores = Int(1, config=True)
    min_sockets = Int(1, config=True)
    max_sockets = Int(1, config=True)
    min_nodes = Int(1, config=True)
    max_nodes = Int(1, config=True)
    unit_type = Str("Core", config=True)
    command_line = CStr('', config=True)
    work_directory = CStr('', config=True)
    # NOTE: the trait name is misspelled, but it is a configurable name, so
    # it is kept as is.  It is written out as the IsRerunnable attribute.
    is_rerunnaable = Bool(True, config=True)
    std_out_file_path = CStr('', config=True)
    std_err_file_path = CStr('', config=True)
    is_parametric = Bool(False, config=True)
    environment_variables = Instance(dict, args=(), config=True)

    def _write_attr(self, root, attr, key):
        """Set XML attribute `key` on `root` from `self.<attr>`.

        The attribute is skipped when the converted value is falsy.
        """
        text = as_str(getattr(self, attr, ''))
        if not text:
            return
        root.set(key, text)

    def as_element(self):
        """Return the task as a 'Task' element with its environment."""
        # The _A_ style prefixes only fix the attribute order.
        ordered_attrs = (
            ('version', '_A_Version'),
            ('task_name', '_B_Name'),
            ('min_cores', '_C_MinCores'),
            ('max_cores', '_D_MaxCores'),
            ('min_sockets', '_E_MinSockets'),
            ('max_sockets', '_F_MaxSockets'),
            ('min_nodes', '_G_MinNodes'),
            ('max_nodes', '_H_MaxNodes'),
            ('command_line', '_I_CommandLine'),
            ('work_directory', '_J_WorkDirectory'),
            ('is_rerunnaable', '_K_IsRerunnable'),
            ('std_out_file_path', '_L_StdOutFilePath'),
            ('std_err_file_path', '_M_StdErrFilePath'),
            ('is_parametric', '_N_IsParametric'),
            ('unit_type', '_O_UnitType'),
        )
        root = ET.Element('Task')
        for attr, key in ordered_attrs:
            self._write_attr(root, attr, key)
        root.append(self.get_env_vars())
        return root

    def get_env_vars(self):
        """Return an 'EnvironmentVariables' element.

        It holds one 'Variable' child, with 'Name' and 'Value' children, per
        entry of `environment_variables`.
        """
        env_vars = ET.Element('EnvironmentVariables')
        for var_name, var_value in self.environment_variables.items():
            variable = ET.SubElement(env_vars, "Variable")
            ET.SubElement(variable, "Name").text = var_name
            ET.SubElement(variable, "Value").text = var_value
        return env_vars
215
224
216
225
217
226
218 # By declaring these, we can configure the controller and engine separately!
227 # By declaring these, we can configure the controller and engine separately!
219
228
class IPControllerJob(WinHPCJob):
    """Job description for the controller, configurable on its own."""

    # The job name is fixed; the remaining traits stay configurable.
    job_name = Str('IPController', config=False)
    is_exclusive = Bool(False, config=True)
    username = Str(find_username(), config=True)
    priority = Enum(
        ('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
        default_value='Highest', config=True
    )
    requested_nodes = Str('', config=True)
    project = Str('IPython', config=True)
237
238
class IPEngineSetJob(WinHPCJob):
    """Job description for the engine set, configurable on its own."""

    # The job name is fixed; the remaining traits stay configurable.
    job_name = Str('IPEngineSet', config=False)
    is_exclusive = Bool(False, config=True)
    username = Str(find_username(), config=True)
    priority = Enum(
        ('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
        default_value='Highest', config=True
    )
    requested_nodes = Str('', config=True)
    project = Str('IPython', config=True)
247
248
class IPControllerTask(WinHPCTask):
    """Task that runs ipcontroller."""

    task_name = Str('IPController', config=True)
    controller_cmd = List(['ipcontroller.exe'], config=True)
    controller_args = List(['--log-to-file', '--log-level', '40'], config=True)
    # I don't want these to be configurable
    std_out_file_path = CStr(
        os.path.join('log','ipcontroller-out.txt'), config=False
    )
    std_err_file_path = CStr(
        os.path.join('log','ipcontroller-err.txt'), config=False
    )
    min_cores = Int(1, config=False)
    max_cores = Int(1, config=False)
    min_sockets = Int(1, config=False)
    max_sockets = Int(1, config=False)
    min_nodes = Int(1, config=False)
    max_nodes = Int(1, config=False)
    unit_type = Str("Core", config=False)
    work_directory = CStr('', config=False)

    @property
    def command_line(self):
        """The controller command and its arguments joined by spaces."""
        tokens = self.controller_cmd + self.controller_args
        return ' '.join(tokens)
240
269
241
270
class IPEngineTask(WinHPCTask):
    """Task that runs a single ipengine."""

    task_name = Str('IPEngine', config=True)
    engine_cmd = List(['ipengine.exe'], config=True)
    engine_args = List(['--log-to-file', '--log-level', '40'], config=True)
    # I don't want these to be configurable.  The real values are assigned
    # per instance in __init__.
    std_out_file_path = CStr('', config=False)
    std_err_file_path = CStr('', config=False)
    min_cores = Int(1, config=False)
    max_cores = Int(1, config=False)
    min_sockets = Int(1, config=False)
    max_sockets = Int(1, config=False)
    min_nodes = Int(1, config=False)
    max_nodes = Int(1, config=False)
    unit_type = Str("Core", config=False)
    work_directory = CStr('', config=False)

    def __init__(self, *args, **kwargs):
        """Initialize the task and give it its own output file names.

        All arguments are passed through to the parent constructor.  The
        uuid is generated here, not in the class body, so that each task
        instance writes to its own stdout/stderr files instead of every
        task in the process sharing one pair of names.
        """
        super(IPEngineTask, self).__init__(*args, **kwargs)
        task_uuid = uuid.uuid1()
        self.std_out_file_path = os.path.join(
            'log', 'ipengine-out-%s.txt' % task_uuid
        )
        self.std_err_file_path = os.path.join(
            'log', 'ipengine-err-%s.txt' % task_uuid
        )

    @property
    def command_line(self):
        """The engine command and its arguments joined by spaces."""
        return ' '.join(self.engine_cmd + self.engine_args)
262
291
263
292
264 # j = WinHPCJob(None)
293 # j = WinHPCJob(None)
265 # j.job_name = 'IPCluster'
294 # j.job_name = 'IPCluster'
266 # j.username = 'GNET\\bgranger'
295 # j.username = 'GNET\\bgranger'
267 # j.requested_nodes = 'GREEN'
296 # j.requested_nodes = 'GREEN'
268 #
297 #
269 # t = WinHPCTask(None)
298 # t = WinHPCTask(None)
270 # t.task_name = 'Controller'
299 # t.task_name = 'Controller'
271 # t.command_line = r"\\blue\domainusers$\bgranger\Python\Python25\Scripts\ipcontroller.exe --log-to-file -p default --log-level 10"
300 # t.command_line = r"\\blue\domainusers$\bgranger\Python\Python25\Scripts\ipcontroller.exe --log-to-file -p default --log-level 10"
272 # t.work_directory = r"\\blue\domainusers$\bgranger\.ipython\cluster_default"
301 # t.work_directory = r"\\blue\domainusers$\bgranger\.ipython\cluster_default"
273 # t.std_out_file_path = 'controller-out.txt'
302 # t.std_out_file_path = 'controller-out.txt'
274 # t.std_err_file_path = 'controller-err.txt'
303 # t.std_err_file_path = 'controller-err.txt'
275 # t.environment_variables['PYTHONPATH'] = r"\\blue\domainusers$\bgranger\Python\Python25\Lib\site-packages"
304 # t.environment_variables['PYTHONPATH'] = r"\\blue\domainusers$\bgranger\Python\Python25\Lib\site-packages"
276 # j.add_task(t)
305 # j.add_task(t)
277
306
General Comments 0
You need to be logged in to leave comments. Login now