##// END OF EJS Templates
More work on the launchers and Win HPC support.
Brian Granger -
Show More
@@ -1,198 +1,202 b''
1 1 import os
2 2
3 3 c = get_config()
4 4
5 5 #-----------------------------------------------------------------------------
6 6 # Select which launchers to use
7 7 #-----------------------------------------------------------------------------
8 8
9 9 # This allows you to control what method is used to start the controller
10 10 # and engines. The following methods are currently supported:
11 # * Start as a regular process on localhost.
12 # * Start using mpiexec.
13 # * Start using PBS
14 # * Start using SSH (currently broken)
11 # - Start as a regular process on localhost.
12 # - Start using mpiexec.
13 # - Start using the Windows HPC Server 2008 scheduler
14 # - Start using PBS
15 # - Start using SSH (currently broken)
16
15 17
16 18 # The selected launchers can be configured below.
17 19
18 # Options are (LocalControllerLauncher, MPIExecControllerLauncher,
19 # PBSControllerLauncher, WindowsHPCControllerLauncher)
20 # Options are:
21 # - LocalControllerLauncher
22 # - MPIExecControllerLauncher
23 # - PBSControllerLauncher
24 # - WindowsHPCControllerLauncher
20 25 # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
21 26
22 # Options are (LocalEngineSetLauncher, MPIExecEngineSetLauncher,
23 # PBSEngineSetLauncher)
27 # Options are:
28 # - LocalEngineSetLauncher
29 # - MPIExecEngineSetLauncher
30 # - PBSEngineSetLauncher
31 # - WindowsHPCEngineSetLauncher
24 32 # c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher'
25 33
26 34 #-----------------------------------------------------------------------------
27 35 # Global configuration
28 36 #-----------------------------------------------------------------------------
29 37
30 # The default number of engine that will be started. This is overridden by
38 # The default number of engines that will be started. This is overridden by
31 39 # the -n command line option: "ipcluster start -n 4"
32 40 # c.Global.n = 2
33 41
34 42 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
35 43 # c.Global.log_to_file = False
36 44
37 45 # Remove old logs from cluster_dir/log before starting.
38 46 # c.Global.clean_logs = True
39 47
40 48 # The working directory for the process. The application will use os.chdir
41 49 # to change to this directory before starting.
42 50 # c.Global.working_dir = os.getcwd()
43 51
52
44 53 #-----------------------------------------------------------------------------
45 # Controller launcher configuration
54 # Local process launchers
46 55 #-----------------------------------------------------------------------------
47 56
48 # Configure how the controller is started. The configuration of the controller
49 # can also bet setup by editing the controller config file:
50 # ipcontroller_config.py
57 # The working directory for the controller
58 # c.LocalControllerLauncher.working_dir = u''
51 59
52 60 # The command line arguments to call the controller with.
53 61 # c.LocalControllerLauncher.controller_args = \
54 62 # ['--log-to-file','--log-level', '40']
55 63
64 # The working directory for the controller
65 # c.LocalEngineSetLauncher.working_dir = u''
66
67 # Command line argument passed to the engines.
68 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
69
70 #-----------------------------------------------------------------------------
71 # MPIExec launchers
72 #-----------------------------------------------------------------------------
73
74 # The working directory for the controller
75 # c.MPIExecControllerLauncher.working_dir = u''
76
56 77 # The mpiexec/mpirun command to use in started the controller.
57 78 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
58 79
59 80 # Additional arguments to pass to the actual mpiexec command.
60 81 # c.MPIExecControllerLauncher.mpi_args = []
61 82
62 83 # The command line argument to call the controller with.
63 84 # c.MPIExecControllerLauncher.controller_args = \
64 85 # ['--log-to-file','--log-level', '40']
65 86
87
88 # The working directory for the controller
89 # c.MPIExecEngineSetLauncher.working_dir = u''
90
91 # The mpiexec/mpirun command to use in started the controller.
92 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
93
94 # Additional arguments to pass to the actual mpiexec command.
95 # c.MPIExecEngineSetLauncher.mpi_args = []
96
97 # Command line argument passed to the engines.
98 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
99
100 # The default number of engines to start if not given elsewhere.
101 # c.MPIExecEngineSetLauncher.n = 1
102
103 #-----------------------------------------------------------------------------
104 # SSH launchers
105 #-----------------------------------------------------------------------------
106
107 # Todo
108
109
110 #-----------------------------------------------------------------------------
111 # Unix batch (PBS) schedulers launchers
112 #-----------------------------------------------------------------------------
113
114 # The working directory for the controller
115 # c.PBSControllerLauncher.working_dir = u''
116
66 117 # The command line program to use to submit a PBS job.
67 118 # c.PBSControllerLauncher.submit_command = 'qsub'
68 119
69 120 # The command line program to use to delete a PBS job.
70 121 # c.PBSControllerLauncher.delete_command = 'qdel'
71 122
72 123 # A regular expression that takes the output of qsub and find the job id.
73 124 # c.PBSControllerLauncher.job_id_regexp = '\d+'
74 125
75 126 # The batch submission script used to start the controller. This is where
76 127 # environment variables would be setup, etc. This string is interpolated using
77 128 # the Itpl module in IPython.external. Basically, you can use ${profile} for
78 129 # the controller profile or ${cluster_dir} for the cluster_dir.
79 130 # c.PBSControllerLauncher.batch_template = """"""
80 131
81 132 # The name of the instantiated batch script that will actually be used to
82 133 # submit the job. This will be written to the cluster directory.
83 134 # c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller'
84 135
85 #-----------------------------------------------------------------------------
86 # Windows HPC Server 2008 launcher configuration
87 #-----------------------------------------------------------------------------
88
89 # c.WinHPCJob.username = 'DOMAIN\\user'
90 # c.WinHPCJob.priority = 'Highest'
91 # c.WinHPCJob.requested_nodes = ''
92 # c.WinHPCJob.project = ''
93 # c.WinHPCJob.is_exclusive = False
94
95 # c.WinHPCTask.environment_variables = {}
96 # c.WinHPCTask.work_directory = ''
97 # c.WinHPCTask.is_rerunnable = True
98
99 # c.IPControllerTask.task_name = 'IPController'
100 # c.IPControllerTask.controller_cmd = [u'ipcontroller.exe']
101 # c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40']
102 # c.IPControllerTask.environment_variables = {}
103
104 # c.IPEngineTask.task_name = 'IPController'
105 # c.IPEngineTask.engine_cmd = [u'ipengine.exe']
106 # c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40']
107 # c.IPEngineTask.environment_variables = {}
108
109 # c.WindowsHPCLauncher.scheduler = 'HEADNODE'
110 # c.WindowsHPCLauncher.username = '\\DOMAIN\USERNAME'
111 # c.WindowsHPCLauncher.priority = 'Highest'
112 # c.WindowsHPCLauncher.requested_nodes = ''
113 # c.WindowsHPCLauncher.job_file_name = u'ipython_job.xml'
114 # c.WindowsHPCLauncher.project = 'MyProject'
115
116 # c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
117 # c.WindowsHPCControllerLauncher.username = '\\DOMAIN\USERNAME'
118 # c.WindowsHPCControllerLauncher.priority = 'Highest'
119 # c.WindowsHPCControllerLauncher.requested_nodes = ''
120 # c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml'
121 # c.WindowsHPCControllerLauncher.project = 'MyProject'
122
123
124 #-----------------------------------------------------------------------------
125 # Engine launcher configuration
126 #-----------------------------------------------------------------------------
127
128 # Command line argument passed to the engines.
129 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
130
131 # The mpiexec/mpirun command to use in started the controller.
132 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
133
134 # Additional arguments to pass to the actual mpiexec command.
135 # c.MPIExecEngineSetLauncher.mpi_args = []
136
137 # Command line argument passed to the engines.
138 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
139 136
140 # The default number of engines to start if not given elsewhere.
141 # c.MPIExecEngineSetLauncher.n = 1
137 # The working directory for the controller
138 # c.PBSEngineSetLauncher.working_dir = u''
142 139
143 140 # The command line program to use to submit a PBS job.
144 141 # c.PBSEngineSetLauncher.submit_command = 'qsub'
145 142
146 143 # The command line program to use to delete a PBS job.
147 144 # c.PBSEngineSetLauncher.delete_command = 'qdel'
148 145
149 146 # A regular expression that takes the output of qsub and find the job id.
150 147 # c.PBSEngineSetLauncher.job_id_regexp = '\d+'
151 148
152 149 # The batch submission script used to start the engines. This is where
153 150 # environment variables would be setup, etc. This string is interpolated using
154 151 # the Itpl module in IPython.external. Basically, you can use ${n} for the
155 152 # number of engine, ${profile} or the engine profile and ${cluster_dir}
156 153 # for the cluster_dir.
157 154 # c.PBSEngineSetLauncher.batch_template = """"""
158 155
159 156 # The name of the instantiated batch script that will actually be used to
160 157 # submit the job. This will be written to the cluster directory.
161 158 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script_engines'
162 159
163 160 #-----------------------------------------------------------------------------
164 # Base launcher configuration
161 # Windows HPC Server 2008 launcher configuration
165 162 #-----------------------------------------------------------------------------
166 163
167 # The various launchers are organized into an inheritance hierarchy.
168 # The configurations can also be iherited and the following attributes
169 # allow you to configure the base classes.
170
171 # c.MPIExecLauncher.mpi_cmd = ['mpiexec']
172 # c.MPIExecLauncher.mpi_args = []
173 # c.MPIExecLauncher.program = []
174 # c.MPIExecLauncher.program_args = []
175 # c.MPIExecLauncher.n = 1
176
177 # c.SSHLauncher.ssh_cmd = ['ssh']
178 # c.SSHLauncher.ssh_args = []
179 # c.SSHLauncher.program = []
180 # s.SSHLauncher.program_args = []
181 # c.SSHLauncher.hostname = ''
182 # c.SSHLauncher.user = os.environ['USER']
183
184 # c.BatchSystemLauncher.submit_command
185 # c.BatchSystemLauncher.delete_command
186 # c.BatchSystemLauncher.job_id_regexp
187 # c.BatchSystemLauncher.batch_template
188 # c.BatchSystemLauncher.batch_file_name
189
190 # c.PBSLauncher.submit_command = 'qsub'
191 # c.PBSLauncher.delete_command = 'qdel'
192 # c.PBSLauncher.job_id_regexp = '\d+'
193 # c.PBSLauncher.batch_template = """"""
194 # c.PBSLauncher.batch_file_name = u'pbs_batch_script'
164 # c.IPControllerJob.job_name = 'IPController'
165 # c.IPControllerJob.is_exclusive = False
166 # c.IPControllerJob.username = 'USERDOMAIN\\USERNAME'
167 # c.IPControllerJob.priority = 'Highest'
168 # c.IPControllerJob.requested_nodes = ''
169 # c.IPControllerJob.project = 'MyProject'
170
171 # c.IPControllerTask.task_name = 'IPController'
172 # c.IPControllerTask.controller_cmd = [u'ipcontroller.exe']
173 # c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40']
174 # c.IPControllerTask.environment_variables = {}
175
176 # c.WindowsHPCControllerLauncher.working_dir = u''
177 # c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
178 # c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml'
179
180
181 # c.IPEngineSetJob.job_name = 'IPEngineSet'
182 # c.IPEngineSetJob.is_exclusive = False
183 # c.IPEngineSetJob.username = 'USERDOMAIN\\USERNAME'
184 # c.IPEngineSetJob.priority = 'Highest'
185 # c.IPEngineSetJob.requested_nodes = ''
186 # c.IPEngineSetJob.project = 'MyProject'
187
188 # c.IPEngineTask.task_name = 'IPEngine'
189 # c.IPEngineTask.engine_cmd = [u'ipengine.exe']
190 # c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40']
191 # c.IPEngineTask.environment_variables = {}
192
193 # c.WindowsHPCEngineSetLauncher.working_dir = u''
194 # c.WindowsHPCEngineSetLauncher.scheduler = 'HEADNODE'
195 # c.WindowsHPCEngineSetLauncher.job_file_name = u'ipengineset_job.xml'
196
197
198
195 199
196 200
197 201
198 202
This diff has been collapsed as it changes many lines, (568 lines changed) Show them Hide them
@@ -1,820 +1,866 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 Facilities for launching processing asynchronously.
4 Facilities for launching IPython processes asynchronously.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import re
20 20 import sys
21 21
22 22 from IPython.core.component import Component
23 23 from IPython.external import Itpl
24 24 from IPython.utils.traitlets import Str, Int, List, Unicode, Enum
25 25 from IPython.utils.platutils import find_cmd
26 26 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
27 27 from IPython.kernel.winhpcjob import (
28 28 WinHPCJob, WinHPCTask,
29 IPControllerTask, IPEngineTask
29 IPControllerTask, IPEngineTask,
30 IPControllerJob, IPEngineSetJob
30 31 )
31 32
32 33 from twisted.internet import reactor, defer
33 34 from twisted.internet.defer import inlineCallbacks
34 35 from twisted.internet.protocol import ProcessProtocol
35 36 from twisted.internet.utils import getProcessOutput
36 37 from twisted.internet.error import ProcessDone, ProcessTerminated
37 38 from twisted.python import log
38 39 from twisted.python.failure import Failure
39 40
40 41 #-----------------------------------------------------------------------------
41 # Generic launchers
42 # Utilities
43 #-----------------------------------------------------------------------------
44
45
46 def find_controller_cmd():
47 """Find the command line ipcontroller program in a cross platform way."""
48 if sys.platform == 'win32':
49 # This logic is needed because the ipcontroller script doesn't
50 # always get installed in the same way or in the same location.
51 from IPython.kernel import ipcontrollerapp
52 script_location = ipcontrollerapp.__file__.replace('.pyc', '.py')
53 # The -u option here turns on unbuffered output, which is required
54 # on Win32 to prevent wierd conflict and problems with Twisted.
55 # Also, use sys.executable to make sure we are picking up the
56 # right python exe.
57 cmd = [sys.executable, '-u', script_location]
58 else:
59 # ipcontroller has to be on the PATH in this case.
60 cmd = ['ipcontroller']
61 return cmd
62
63
64 def find_engine_cmd():
65 """Find the command line ipengine program in a cross platform way."""
66 if sys.platform == 'win32':
67 # This logic is needed because the ipengine script doesn't
68 # always get installed in the same way or in the same location.
69 from IPython.kernel import ipengineapp
70 script_location = ipengineapp.__file__.replace('.pyc', '.py')
71 # The -u option here turns on unbuffered output, which is required
72 # on Win32 to prevent wierd conflict and problems with Twisted.
73 # Also, use sys.executable to make sure we are picking up the
74 # right python exe.
75 cmd = [sys.executable, '-u', script_location]
76 else:
77 # ipcontroller has to be on the PATH in this case.
78 cmd = ['ipengine']
79 return cmd
80
81
82 #-----------------------------------------------------------------------------
83 # Base launchers and errors
42 84 #-----------------------------------------------------------------------------
43 85
44 86
45 87 class LauncherError(Exception):
46 88 pass
47 89
48 90
49 91 class ProcessStateError(LauncherError):
50 92 pass
51 93
52 94
53 95 class UnknownStatus(LauncherError):
54 96 pass
55 97
56 98
57 99 class BaseLauncher(Component):
58 100 """An asbtraction for starting, stopping and signaling a process."""
59 101
60 # A directory for files related to the process. But, we don't cd to
61 # this directory,
62 102 working_dir = Unicode(u'')
63 103
64 104 def __init__(self, working_dir, parent=None, name=None, config=None):
65 105 super(BaseLauncher, self).__init__(parent, name, config)
66 106 self.working_dir = working_dir
67 107 self.state = 'before' # can be before, running, after
68 108 self.stop_deferreds = []
69 109 self.start_data = None
70 110 self.stop_data = None
71 111
72 112 @property
73 113 def args(self):
74 114 """A list of cmd and args that will be used to start the process.
75 115
76 116 This is what is passed to :func:`spawnProcess` and the first element
77 117 will be the process name.
78 118 """
79 119 return self.find_args()
80 120
81 121 def find_args(self):
82 122 """The ``.args`` property calls this to find the args list.
83 123
84 124 Subcommand should implement this to construct the cmd and args.
85 125 """
86 126 raise NotImplementedError('find_args must be implemented in a subclass')
87 127
88 128 @property
89 129 def arg_str(self):
90 130 """The string form of the program arguments."""
91 131 return ' '.join(self.args)
92 132
93 133 @property
94 134 def running(self):
95 135 """Am I running."""
96 136 if self.state == 'running':
97 137 return True
98 138 else:
99 139 return False
100 140
101 141 def start(self):
102 142 """Start the process.
103 143
104 144 This must return a deferred that fires with information about the
105 145 process starting (like a pid, job id, etc.).
106 146 """
107 147 return defer.fail(
108 148 Failure(NotImplementedError(
109 149 'start must be implemented in a subclass')
110 150 )
111 151 )
112 152
113 153 def stop(self):
114 154 """Stop the process and notify observers of stopping.
115 155
116 156 This must return a deferred that fires with information about the
117 157 processing stopping, like errors that occur while the process is
118 158 attempting to be shut down. This deferred won't fire when the process
119 159 actually stops. To observe the actual process stopping, see
120 160 :func:`observe_stop`.
121 161 """
122 162 return defer.fail(
123 163 Failure(NotImplementedError(
124 164 'stop must be implemented in a subclass')
125 165 )
126 166 )
127 167
128 168 def observe_stop(self):
129 169 """Get a deferred that will fire when the process stops.
130 170
131 171 The deferred will fire with data that contains information about
132 172 the exit status of the process.
133 173 """
134 174 if self.state=='after':
135 175 return defer.succeed(self.stop_data)
136 176 else:
137 177 d = defer.Deferred()
138 178 self.stop_deferreds.append(d)
139 179 return d
140 180
141 181 def notify_start(self, data):
142 182 """Call this to trigger startup actions.
143 183
144 184 This logs the process startup and sets the state to 'running'. It is
145 185 a pass-through so it can be used as a callback.
146 186 """
147 187
148 188 log.msg('Process %r started: %r' % (self.args[0], data))
149 189 self.start_data = data
150 190 self.state = 'running'
151 191 return data
152 192
153 193 def notify_stop(self, data):
154 194 """Call this to trigger process stop actions.
155 195
156 196 This logs the process stopping and sets the state to 'after'. Call
157 197 this to trigger all the deferreds from :func:`observe_stop`."""
158 198
159 199 log.msg('Process %r stopped: %r' % (self.args[0], data))
160 200 self.stop_data = data
161 201 self.state = 'after'
162 202 for i in range(len(self.stop_deferreds)):
163 203 d = self.stop_deferreds.pop()
164 204 d.callback(data)
165 205 return data
166 206
167 207 def signal(self, sig):
168 208 """Signal the process.
169 209
170 210 Return a semi-meaningless deferred after signaling the process.
171 211
172 212 Parameters
173 213 ----------
174 214 sig : str or int
175 215 'KILL', 'INT', etc., or any signal number
176 216 """
177 217 return defer.fail(
178 218 Failure(NotImplementedError(
179 219 'signal must be implemented in a subclass')
180 220 )
181 221 )
182 222
183 223
224 #-----------------------------------------------------------------------------
225 # Local process launchers
226 #-----------------------------------------------------------------------------
227
228
184 229 class LocalProcessLauncherProtocol(ProcessProtocol):
185 230 """A ProcessProtocol to go with the LocalProcessLauncher."""
186 231
187 232 def __init__(self, process_launcher):
188 233 self.process_launcher = process_launcher
189 234 self.pid = None
190 235
191 236 def connectionMade(self):
192 237 self.pid = self.transport.pid
193 238 self.process_launcher.notify_start(self.transport.pid)
194 239
195 240 def processEnded(self, status):
196 241 value = status.value
197 242 if isinstance(value, ProcessDone):
198 243 self.process_launcher.notify_stop(
199 244 {'exit_code':0,
200 245 'signal':None,
201 246 'status':None,
202 247 'pid':self.pid
203 248 }
204 249 )
205 250 elif isinstance(value, ProcessTerminated):
206 251 self.process_launcher.notify_stop(
207 252 {'exit_code':value.exitCode,
208 253 'signal':value.signal,
209 254 'status':value.status,
210 255 'pid':self.pid
211 256 }
212 257 )
213 258 else:
214 259 raise UnknownStatus("Unknown exit status, this is probably a "
215 260 "bug in Twisted")
216 261
217 262 def outReceived(self, data):
218 263 log.msg(data)
219 264
220 265 def errReceived(self, data):
221 266 log.err(data)
222 267
223 268
224 269 class LocalProcessLauncher(BaseLauncher):
225 270 """Start and stop an external process in an asynchronous manner.
226 271
227 272 This will launch the external process with a working directory of
228 273 ``self.working_dir``.
229 274 """
230 275
231 276 # This is used to to construct self.args, which is passed to
232 277 # spawnProcess.
233 278 cmd_and_args = List([])
234 279
235 280 def __init__(self, working_dir, parent=None, name=None, config=None):
236 281 super(LocalProcessLauncher, self).__init__(
237 282 working_dir, parent, name, config
238 283 )
239 284 self.process_protocol = None
240 285 self.start_deferred = None
241 286
242 287 def find_args(self):
243 288 return self.cmd_and_args
244 289
245 290 def start(self):
246 291 if self.state == 'before':
247 292 self.process_protocol = LocalProcessLauncherProtocol(self)
248 293 self.start_deferred = defer.Deferred()
249 294 self.process_transport = reactor.spawnProcess(
250 295 self.process_protocol,
251 296 str(self.args[0]), # twisted expects these to be str, not unicode
252 297 [str(a) for a in self.args], # str expected, not unicode
253 298 env=os.environ,
254 299 path=self.working_dir # start in the working_dir
255 300 )
256 301 return self.start_deferred
257 302 else:
258 303 s = 'The process was already started and has state: %r' % self.state
259 304 return defer.fail(ProcessStateError(s))
260 305
261 306 def notify_start(self, data):
262 307 super(LocalProcessLauncher, self).notify_start(data)
263 308 self.start_deferred.callback(data)
264 309
265 310 def stop(self):
266 311 return self.interrupt_then_kill()
267 312
268 313 @make_deferred
269 314 def signal(self, sig):
270 315 if self.state == 'running':
271 316 self.process_transport.signalProcess(sig)
272 317
273 318 @inlineCallbacks
274 319 def interrupt_then_kill(self, delay=2.0):
275 320 """Send INT, wait a delay and then send KILL."""
276 321 yield self.signal('INT')
277 322 yield sleep_deferred(delay)
278 323 yield self.signal('KILL')
279 324
280 325
326 class LocalControllerLauncher(LocalProcessLauncher):
327 """Launch a controller as a regular external process."""
328
329 controller_cmd = List(find_controller_cmd(), config=True)
330 # Command line arguments to ipcontroller.
331 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
332
333 def find_args(self):
334 return self.controller_cmd + self.controller_args + \
335 ['--working-dir', self.working_dir]
336
337 def start(self, cluster_dir):
338 """Start the controller by cluster_dir."""
339 self.controller_args.extend(['--cluster-dir', cluster_dir])
340 self.cluster_dir = unicode(cluster_dir)
341 log.msg("Starting LocalControllerLauncher: %r" % self.args)
342 return super(LocalControllerLauncher, self).start()
343
344
345 class LocalEngineLauncher(LocalProcessLauncher):
346 """Launch a single engine as a regular externall process."""
347
348 engine_cmd = List(find_engine_cmd(), config=True)
349 # Command line arguments for ipengine.
350 engine_args = List(
351 ['--log-to-file','--log-level', '40'], config=True
352 )
353
354 def find_args(self):
355 return self.engine_cmd + self.engine_args + \
356 ['--working-dir', self.working_dir]
357
358 def start(self, cluster_dir):
359 """Start the engine by cluster_dir."""
360 self.engine_args.extend(['--cluster-dir', cluster_dir])
361 self.cluster_dir = unicode(cluster_dir)
362 return super(LocalEngineLauncher, self).start()
363
364
365 class LocalEngineSetLauncher(BaseLauncher):
366 """Launch a set of engines as regular external processes."""
367
368 # Command line arguments for ipengine.
369 engine_args = List(
370 ['--log-to-file','--log-level', '40'], config=True
371 )
372
373 def __init__(self, working_dir, parent=None, name=None, config=None):
374 super(LocalEngineSetLauncher, self).__init__(
375 working_dir, parent, name, config
376 )
377 self.launchers = []
378
379 def start(self, n, cluster_dir):
380 """Start n engines by profile or cluster_dir."""
381 self.cluster_dir = unicode(cluster_dir)
382 dlist = []
383 for i in range(n):
384 el = LocalEngineLauncher(self.working_dir, self)
385 # Copy the engine args over to each engine launcher.
386 import copy
387 el.engine_args = copy.deepcopy(self.engine_args)
388 d = el.start(cluster_dir)
389 if i==0:
390 log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
391 self.launchers.append(el)
392 dlist.append(d)
393 # The consumeErrors here could be dangerous
394 dfinal = gatherBoth(dlist, consumeErrors=True)
395 dfinal.addCallback(self.notify_start)
396 return dfinal
397
398 def find_args(self):
399 return ['engine set']
400
401 def signal(self, sig):
402 dlist = []
403 for el in self.launchers:
404 d = el.signal(sig)
405 dlist.append(d)
406 dfinal = gatherBoth(dlist, consumeErrors=True)
407 return dfinal
408
409 def interrupt_then_kill(self, delay=1.0):
410 dlist = []
411 for el in self.launchers:
412 d = el.interrupt_then_kill(delay)
413 dlist.append(d)
414 dfinal = gatherBoth(dlist, consumeErrors=True)
415 return dfinal
416
417 def stop(self):
418 return self.interrupt_then_kill()
419
420 def observe_stop(self):
421 dlist = [el.observe_stop() for el in self.launchers]
422 dfinal = gatherBoth(dlist, consumeErrors=False)
423 dfinal.addCallback(self.notify_stop)
424 return dfinal
425
426
427 #-----------------------------------------------------------------------------
428 # MPIExec launchers
429 #-----------------------------------------------------------------------------
430
431
281 432 class MPIExecLauncher(LocalProcessLauncher):
282 433 """Launch an external process using mpiexec."""
283 434
284 435 # The mpiexec command to use in starting the process.
285 436 mpi_cmd = List(['mpiexec'], config=True)
286 437 # The command line arguments to pass to mpiexec.
287 438 mpi_args = List([], config=True)
288 439 # The program to start using mpiexec.
289 440 program = List(['date'], config=True)
290 441 # The command line argument to the program.
291 442 program_args = List([], config=True)
292 443 # The number of instances of the program to start.
293 444 n = Int(1, config=True)
294 445
295 446 def find_args(self):
296 447 """Build self.args using all the fields."""
297 448 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
298 449 self.program + self.program_args
299 450
300 451 def start(self, n):
301 452 """Start n instances of the program using mpiexec."""
302 453 self.n = n
303 454 return super(MPIExecLauncher, self).start()
304 455
305 456
457 class MPIExecControllerLauncher(MPIExecLauncher):
458 """Launch a controller using mpiexec."""
459
460 controller_cmd = List(find_controller_cmd(), config=True)
461 # Command line arguments to ipcontroller.
462 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
463 n = Int(1, config=False)
464
465 def start(self, cluster_dir):
466 """Start the controller by cluster_dir."""
467 self.controller_args.extend(['--cluster-dir', cluster_dir])
468 self.cluster_dir = unicode(cluster_dir)
469 log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
470 return super(MPIExecControllerLauncher, self).start(1)
471
472 def find_args(self):
473 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
474 self.controller_cmd + self.controller_args + \
475 ['--working-dir', self.working_dir]
476
477
478 class MPIExecEngineSetLauncher(MPIExecLauncher):
479
480 engine_cmd = List(find_engine_cmd(), config=True)
481 # Command line arguments for ipengine.
482 engine_args = List(
483 ['--log-to-file','--log-level', '40'], config=True
484 )
485 n = Int(1, config=True)
486
487 def start(self, n, cluster_dir):
488 """Start n engines by profile or cluster_dir."""
489 self.engine_args.extend(['--cluster-dir', cluster_dir])
490 self.cluster_dir = unicode(cluster_dir)
491 log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
492 return super(MPIExecEngineSetLauncher, self).start(n)
493
494 def find_args(self):
495 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
496 self.engine_cmd + self.engine_args + \
497 ['--working-dir', self.working_dir]
498
499
500 #-----------------------------------------------------------------------------
501 # SSH launchers
502 #-----------------------------------------------------------------------------
503
504
306 505 class SSHLauncher(BaseLauncher):
307 506 """A minimal launcher for ssh.
308 507
309 508 To be useful this will probably have to be extended to use the ``sshx``
310 509 idea for environment variables. There could be other things this needs
311 510 as well.
312 511 """
313 512
314 513 ssh_cmd = List(['ssh'], config=True)
315 514 ssh_args = List([], config=True)
316 515 program = List(['date'], config=True)
317 516 program_args = List([], config=True)
318 517 hostname = Str('', config=True)
319 518 user = Str('', config=True)
320 519 location = Str('')
321 520
322 521 def _hostname_changed(self, name, old, new):
323 522 self.location = '%s@%s' % (self.user, new)
324 523
325 524 def _user_changed(self, name, old, new):
326 525 self.location = '%s@%s' % (new, self.hostname)
327 526
328 527 def find_args(self):
329 528 return self.ssh_cmd + self.ssh_args + [self.location] + \
330 529 self.program + self.program_args
331 530
332 531 def start(self, n, hostname=None, user=None):
333 532 if hostname is not None:
334 533 self.hostname = hostname
335 534 if user is not None:
336 535 self.user = user
337 536 return super(SSHLauncher, self).start()
338 537
339 538
539 class SSHControllerLauncher(SSHLauncher):
540 pass
541
542
543 class SSHEngineSetLauncher(BaseLauncher):
544 pass
545
546
547 #-----------------------------------------------------------------------------
548 # Windows HPC Server 2008 scheduler launchers
549 #-----------------------------------------------------------------------------
550
551
340 552 # This is only used on Windows.
341 if os.name=='nt':
342 job_cmd = find_cmd('job')
343 else:
344 job_cmd = 'job'
553 def find_job_cmd():
554 if os.name=='nt':
555 return find_cmd('job')
556 else:
557 return 'job'
345 558
346 559
347 560 class WindowsHPCLauncher(BaseLauncher):
348 561
349 562 # A regular expression used to get the job id from the output of the
350 563 # submit_command.
351 564 job_id_regexp = Str('\d+', config=True)
352 565 # The filename of the instantiated job script.
353 566 job_file_name = Unicode(u'ipython_job.xml', config=True)
354 567 # The full path to the instantiated job script. This gets made dynamically
355 568 # by combining the working_dir with the job_file_name.
356 569 job_file = Unicode(u'')
357 570 # The hostname of the scheduler to submit the job to
358 scheduler = Str('HEADNODE', config=True)
359 username = Str(os.environ.get('USERNAME', ''), config=True)
360 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
361 default_value='Highest', config=True)
362 requested_nodes = Str('', config=True)
363 project = Str('MyProject', config=True)
364 job_cmd = Str(job_cmd, config=True)
571 scheduler = Str('', config=True)
572 job_cmd = Str(find_job_cmd, config=True)
365 573
366 574 def __init__(self, working_dir, parent=None, name=None, config=None):
367 575 super(WindowsHPCLauncher, self).__init__(
368 576 working_dir, parent, name, config
369 577 )
370 578 self.job_file = os.path.join(self.working_dir, self.job_file_name)
371 579
580 @property
581 def job_file(self):
582 return os.path.join(self.working_dir, self.job_file_name)
583
372 584 def write_job_file(self, n):
373 585 raise NotImplementedError("Implement write_job_file in a subclass.")
374 586
375 587 def find_args(self):
376 588 return ['job.exe']
377 589
378 590 def parse_job_id(self, output):
379 591 """Take the output of the submit command and return the job id."""
380 592 m = re.search(self.job_id_regexp, output)
381 593 if m is not None:
382 594 job_id = m.group()
383 595 else:
384 596 raise LauncherError("Job id couldn't be determined: %s" % output)
385 597 self.job_id = job_id
386 598 log.msg('Job started with job id: %r' % job_id)
387 599 return job_id
388 600
389 601 @inlineCallbacks
390 602 def start(self, n):
391 603 """Start n copies of the process using the Win HPC job scheduler."""
392 604 self.write_job_file(n)
393 605 args = [
394 606 'submit',
395 607 '/jobfile:%s' % self.job_file,
396 608 '/scheduler:%s' % self.scheduler
397 609 ]
398 610 log.msg("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
399 611 output = yield getProcessOutput(self.job_cmd,
400 612 args,
401 613 env=os.environ,
402 614 path=self.working_dir
403 615 )
404 616 job_id = self.parse_job_id(output)
405 617 self.notify_start(job_id)
406 618 defer.returnValue(job_id)
407 619
408 620 @inlineCallbacks
409 621 def stop(self):
410 622 args = [
411 623 'cancel',
412 624 self.job_id,
413 625 '/scheduler:%s' % self.scheduler
414 626 ]
415 627 log.msg("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
416 628 try:
417 629 output = yield getProcessOutput(self.job_cmd,
418 630 args,
419 631 env=os.environ,
420 632 path=self.working_dir
421 633 )
422 634 except:
423 635 output = 'The job already appears to be stoppped: %r' % self.job_id
424 636 self.notify_stop(output) # Pass the output of the kill cmd
425 637 defer.returnValue(output)
426
638
639
640 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
641
642 job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
643 extra_args = List([], config=False)
644
645 def write_job_file(self, n):
646 job = IPControllerJob(self)
647
648 t = IPControllerTask(self)
649 # The tasks work directory is *not* the actual work directory of
650 # the controller. It is used as the base path for the stdout/stderr
651 # files that the scheduler redirects to.
652 t.work_directory = self.cluster_dir
653 # Add the --cluster-dir and --working-dir from self.start().
654 t.controller_args.extend(self.extra_args)
655 job.add_task(t)
656
657 log.msg("Writing job description file: %s" % self.job_file)
658 job.write(self.job_file)
659
660 @property
661 def job_file(self):
662 return os.path.join(self.cluster_dir, self.job_file_name)
663
664 def start(self, cluster_dir):
665 """Start the controller by cluster_dir."""
666 self.extra_args = [
667 '--cluster-dir', cluster_dir, '--working-dir', self.working_dir
668 ]
669 self.cluster_dir = unicode(cluster_dir)
670 return super(WindowsHPCControllerLauncher, self).start(1)
671
672
673 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
674
675 job_file_name = Unicode(u'ipengineset_job.xml', config=True)
676 extra_args = List([], config=False)
677
678 def write_job_file(self, n):
679 job = IPControllerJob(self)
680
681 for i in range(n):
682 t = IPEngineTask(self)
683 # The tasks work directory is *not* the actual work directory of
684 # the engine. It is used as the base path for the stdout/stderr
685 # files that the scheduler redirects to.
686 t.work_directory = self.cluster_dir
687 # Add the --cluster-dir and --working-dir from self.start().
688 t.engine_args.extend(self.extra_args)
689 job.add_task(t)
690
691 log.msg("Writing job description file: %s" % self.job_file)
692 job.write(self.job_file)
693
694 @property
695 def job_file(self):
696 return os.path.join(self.cluster_dir, self.job_file_name)
697
698 def start(self, cluster_dir):
699 """Start the controller by cluster_dir."""
700 self.extra_args = [
701 '--cluster-dir', cluster_dir, '--working-dir', self.working_dir
702 ]
703 self.cluster_dir = unicode(cluster_dir)
704 return super(WindowsHPCControllerLauncher, self).start(n)
705
706
707 #-----------------------------------------------------------------------------
708 # Batch (PBS) system launchers
709 #-----------------------------------------------------------------------------
710
427 711
428 712 class BatchSystemLauncher(BaseLauncher):
429 713 """Launch an external process using a batch system.
430 714
431 715 This class is designed to work with UNIX batch systems like PBS, LSF,
432 716 GridEngine, etc. The overall model is that there are different commands
433 717 like qsub, qdel, etc. that handle the starting and stopping of the process.
434 718
435 719 This class also has the notion of a batch script. The ``batch_template``
436 720 attribute can be set to a string that is a template for the batch script.
437 721 This template is instantiated using Itpl. Thus the template can use
438 722 ${n} fot the number of instances. Subclasses can add additional variables
439 723 to the template dict.
440 724 """
441 725
442 726 # Subclasses must fill these in. See PBSEngineSet
443 727 # The name of the command line program used to submit jobs.
444 728 submit_command = Str('', config=True)
445 729 # The name of the command line program used to delete jobs.
446 730 delete_command = Str('', config=True)
447 731 # A regular expression used to get the job id from the output of the
448 732 # submit_command.
449 733 job_id_regexp = Str('', config=True)
450 734 # The string that is the batch script template itself.
451 735 batch_template = Str('', config=True)
452 736 # The filename of the instantiated batch script.
453 737 batch_file_name = Unicode(u'batch_script', config=True)
454 738 # The full path to the instantiated batch script.
455 739 batch_file = Unicode(u'')
456 740
457 741 def __init__(self, working_dir, parent=None, name=None, config=None):
458 742 super(BatchSystemLauncher, self).__init__(
459 743 working_dir, parent, name, config
460 744 )
461 745 self.batch_file = os.path.join(self.working_dir, self.batch_file_name)
462 746 self.context = {}
463 747
464 748 def parse_job_id(self, output):
465 749 """Take the output of the submit command and return the job id."""
466 750 m = re.match(self.job_id_regexp, output)
467 751 if m is not None:
468 752 job_id = m.group()
469 753 else:
470 754 raise LauncherError("Job id couldn't be determined: %s" % output)
471 755 self.job_id = job_id
472 756 log.msg('Job started with job id: %r' % job_id)
473 757 return job_id
474 758
475 759 def write_batch_script(self, n):
476 760 """Instantiate and write the batch script to the working_dir."""
477 761 self.context['n'] = n
478 762 script_as_string = Itpl.itplns(self.batch_template, self.context)
479 763 log.msg('Writing instantiated batch script: %s' % self.batch_file)
480 764 f = open(self.batch_file, 'w')
481 765 f.write(script_as_string)
482 766 f.close()
483 767
484 768 @inlineCallbacks
485 769 def start(self, n):
486 770 """Start n copies of the process using a batch system."""
487 771 self.write_batch_script(n)
488 772 output = yield getProcessOutput(self.submit_command,
489 773 [self.batch_file], env=os.environ)
490 774 job_id = self.parse_job_id(output)
491 775 self.notify_start(job_id)
492 776 defer.returnValue(job_id)
493 777
494 778 @inlineCallbacks
495 779 def stop(self):
496 780 output = yield getProcessOutput(self.delete_command,
497 781 [self.job_id], env=os.environ
498 782 )
499 783 self.notify_stop(output) # Pass the output of the kill cmd
500 784 defer.returnValue(output)
501 785
502 786
503 787 class PBSLauncher(BatchSystemLauncher):
504 788 """A BatchSystemLauncher subclass for PBS."""
505 789
506 790 submit_command = Str('qsub', config=True)
507 791 delete_command = Str('qdel', config=True)
508 792 job_id_regexp = Str('\d+', config=True)
509 793 batch_template = Str('', config=True)
510 794 batch_file_name = Unicode(u'pbs_batch_script', config=True)
511 795 batch_file = Unicode(u'')
512 796
513 797
514 #-----------------------------------------------------------------------------
515 # Controller launchers
516 #-----------------------------------------------------------------------------
517
518 def find_controller_cmd():
519 """Find the command line ipcontroller program in a cross platform way."""
520 if sys.platform == 'win32':
521 # This logic is needed because the ipcontroller script doesn't
522 # always get installed in the same way or in the same location.
523 from IPython.kernel import ipcontrollerapp
524 script_location = ipcontrollerapp.__file__.replace('.pyc', '.py')
525 # The -u option here turns on unbuffered output, which is required
526 # on Win32 to prevent wierd conflict and problems with Twisted.
527 # Also, use sys.executable to make sure we are picking up the
528 # right python exe.
529 cmd = [sys.executable, '-u', script_location]
530 else:
531 # ipcontroller has to be on the PATH in this case.
532 cmd = ['ipcontroller']
533 return cmd
534
535
536 class LocalControllerLauncher(LocalProcessLauncher):
537 """Launch a controller as a regular external process."""
538
539 controller_cmd = List(find_controller_cmd(), config=True)
540 # Command line arguments to ipcontroller.
541 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
542
543 def find_args(self):
544 return self.controller_cmd + self.controller_args
545
546 def start(self, profile=None, cluster_dir=None):
547 """Start the controller by profile or cluster_dir."""
548 if cluster_dir is not None:
549 self.controller_args.extend(['--cluster-dir', cluster_dir])
550 if profile is not None:
551 self.controller_args.extend(['--profile', profile])
552 log.msg("Starting LocalControllerLauncher: %r" % self.args)
553 return super(LocalControllerLauncher, self).start()
554
555
556 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
557
558 job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
559 extra_args = List([],config=False)
560
561 def write_job_file(self, n):
562 job = WinHPCJob(self)
563 job.job_name = "IPController"
564 job.username = self.username
565 job.priority = self.priority
566 job.requested_nodes = self.requested_nodes
567 job.project = self.project
568
569 t = IPControllerTask(self)
570 t.work_directory = self.working_dir
571 # Add the --profile and --cluster-dir args from start.
572 t.controller_args.extend(self.extra_args)
573 job.add_task(t)
574 log.msg("Writing job description file: %s" % self.job_file)
575 job.write(self.job_file)
576
577 def start(self, profile=None, cluster_dir=None):
578 """Start the controller by profile or cluster_dir."""
579 if cluster_dir is not None:
580 self.extra_args = ['--cluster-dir', cluster_dir]
581 if profile is not None:
582 self.extra_args = ['--profile', profile]
583 return super(WindowsHPCControllerLauncher, self).start(1)
584
585
586 class MPIExecControllerLauncher(MPIExecLauncher):
587 """Launch a controller using mpiexec."""
588
589 controller_cmd = List(find_controller_cmd(), config=True)
590 # Command line arguments to ipcontroller.
591 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
592 n = Int(1, config=False)
593
594 def start(self, profile=None, cluster_dir=None):
595 """Start the controller by profile or cluster_dir."""
596 if cluster_dir is not None:
597 self.controller_args.extend(['--cluster-dir', cluster_dir])
598 if profile is not None:
599 self.controller_args.extend(['--profile', profile])
600 log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
601 return super(MPIExecControllerLauncher, self).start(1)
602
603 def find_args(self):
604 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
605 self.controller_cmd + self.controller_args
606
607
608 798 class PBSControllerLauncher(PBSLauncher):
609 799 """Launch a controller using PBS."""
610 800
611 801 batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
612 802
613 def start(self, profile=None, cluster_dir=None):
803 def start(self, cluster_dir):
614 804 """Start the controller by profile or cluster_dir."""
615 805 # Here we save profile and cluster_dir in the context so they
616 806 # can be used in the batch script template as ${profile} and
617 807 # ${cluster_dir}
618 if cluster_dir is not None:
619 self.context['cluster_dir'] = cluster_dir
620 if profile is not None:
621 self.context['profile'] = profile
808 self.context['cluster_dir'] = cluster_dir
809 self.cluster_dir = unicode(cluster_dir)
622 810 log.msg("Starting PBSControllerLauncher: %r" % self.args)
623 811 return super(PBSControllerLauncher, self).start(1)
624 812
625 813
626 class SSHControllerLauncher(SSHLauncher):
627 pass
628
629
630 #-----------------------------------------------------------------------------
631 # Engine launchers
632 #-----------------------------------------------------------------------------
633
634
635 def find_engine_cmd():
636 """Find the command line ipengine program in a cross platform way."""
637 if sys.platform == 'win32':
638 # This logic is needed because the ipengine script doesn't
639 # always get installed in the same way or in the same location.
640 from IPython.kernel import ipengineapp
641 script_location = ipengineapp.__file__.replace('.pyc', '.py')
642 # The -u option here turns on unbuffered output, which is required
643 # on Win32 to prevent wierd conflict and problems with Twisted.
644 # Also, use sys.executable to make sure we are picking up the
645 # right python exe.
646 cmd = [sys.executable, '-u', script_location]
647 else:
648 # ipcontroller has to be on the PATH in this case.
649 cmd = ['ipengine']
650 return cmd
651
652
653 class LocalEngineLauncher(LocalProcessLauncher):
654 """Launch a single engine as a regular externall process."""
655
656 engine_cmd = List(find_engine_cmd(), config=True)
657 # Command line arguments for ipengine.
658 engine_args = List(
659 ['--log-to-file','--log-level', '40'], config=True
660 )
661
662 def find_args(self):
663 return self.engine_cmd + self.engine_args
664
665 def start(self, profile=None, cluster_dir=None):
666 """Start the engine by profile or cluster_dir."""
667 if cluster_dir is not None:
668 self.engine_args.extend(['--cluster-dir', cluster_dir])
669 if profile is not None:
670 self.engine_args.extend(['--profile', profile])
671 return super(LocalEngineLauncher, self).start()
672
673
674 class LocalEngineSetLauncher(BaseLauncher):
675 """Launch a set of engines as regular external processes."""
676
677 # Command line arguments for ipengine.
678 engine_args = List(
679 ['--log-to-file','--log-level', '40'], config=True
680 )
681
682 def __init__(self, working_dir, parent=None, name=None, config=None):
683 super(LocalEngineSetLauncher, self).__init__(
684 working_dir, parent, name, config
685 )
686 self.launchers = []
687
688 def start(self, n, profile=None, cluster_dir=None):
689 """Start n engines by profile or cluster_dir."""
690 dlist = []
691 for i in range(n):
692 el = LocalEngineLauncher(self.working_dir, self)
693 # Copy the engine args over to each engine launcher.
694 import copy
695 el.engine_args = copy.deepcopy(self.engine_args)
696 d = el.start(profile, cluster_dir)
697 if i==0:
698 log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
699 self.launchers.append(el)
700 dlist.append(d)
701 # The consumeErrors here could be dangerous
702 dfinal = gatherBoth(dlist, consumeErrors=True)
703 dfinal.addCallback(self.notify_start)
704 return dfinal
705
706 def find_args(self):
707 return ['engine set']
708
709 def signal(self, sig):
710 dlist = []
711 for el in self.launchers:
712 d = el.signal(sig)
713 dlist.append(d)
714 dfinal = gatherBoth(dlist, consumeErrors=True)
715 return dfinal
716
717 def interrupt_then_kill(self, delay=1.0):
718 dlist = []
719 for el in self.launchers:
720 d = el.interrupt_then_kill(delay)
721 dlist.append(d)
722 dfinal = gatherBoth(dlist, consumeErrors=True)
723 return dfinal
724
725 def stop(self):
726 return self.interrupt_then_kill()
727
728 def observe_stop(self):
729 dlist = [el.observe_stop() for el in self.launchers]
730 dfinal = gatherBoth(dlist, consumeErrors=False)
731 dfinal.addCallback(self.notify_stop)
732 return dfinal
733
734
735 class MPIExecEngineSetLauncher(MPIExecLauncher):
736
737 engine_cmd = List(find_engine_cmd(), config=True)
738 # Command line arguments for ipengine.
739 engine_args = List(
740 ['--log-to-file','--log-level', '40'], config=True
741 )
742 n = Int(1, config=True)
743
744 def start(self, n, profile=None, cluster_dir=None):
745 """Start n engines by profile or cluster_dir."""
746 if cluster_dir is not None:
747 self.engine_args.extend(['--cluster-dir', cluster_dir])
748 if profile is not None:
749 self.engine_args.extend(['--profile', profile])
750 log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
751 return super(MPIExecEngineSetLauncher, self).start(n)
752
753 def find_args(self):
754 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
755 self.engine_cmd + self.engine_args
756
757
758 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
759 pass
760
761
762 814 class PBSEngineSetLauncher(PBSLauncher):
763 815
764 816 batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
765 817
766 def start(self, n, profile=None, cluster_dir=None):
818 def start(self, n, cluster_dir):
767 819 """Start n engines by profile or cluster_dir."""
768 if cluster_dir is not None:
769 self.program_args.extend(['--cluster-dir', cluster_dir])
770 if profile is not None:
771 self.program_args.extend(['-p', profile])
820 self.program_args.extend(['--cluster-dir', cluster_dir])
821 self.cluster_dir = unicode(cluster_dir)
772 822 log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
773 823 return super(PBSEngineSetLauncher, self).start(n)
774 824
775 825
776 class SSHEngineSetLauncher(BaseLauncher):
777 pass
778
779
780 826 #-----------------------------------------------------------------------------
781 827 # A launcher for ipcluster itself!
782 828 #-----------------------------------------------------------------------------
783 829
784 830
785 831 def find_ipcluster_cmd():
786 832 """Find the command line ipcluster program in a cross platform way."""
787 833 if sys.platform == 'win32':
788 834 # This logic is needed because the ipcluster script doesn't
789 835 # always get installed in the same way or in the same location.
790 836 from IPython.kernel import ipclusterapp
791 837 script_location = ipclusterapp.__file__.replace('.pyc', '.py')
792 838 # The -u option here turns on unbuffered output, which is required
793 839 # on Win32 to prevent wierd conflict and problems with Twisted.
794 840 # Also, use sys.executable to make sure we are picking up the
795 841 # right python exe.
796 842 cmd = [sys.executable, '-u', script_location]
797 843 else:
798 844 # ipcontroller has to be on the PATH in this case.
799 845 cmd = ['ipcluster']
800 846 return cmd
801 847
802 848
803 849 class IPClusterLauncher(LocalProcessLauncher):
804 850 """Launch the ipcluster program in an external process."""
805 851
806 852 ipcluster_cmd = List(find_ipcluster_cmd(), config=True)
807 853 # Command line arguments to pass to ipcluster.
808 854 ipcluster_args = List(
809 855 ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True)
810 856 ipcluster_subcommand = Str('start')
811 857 ipcluster_n = Int(2)
812 858
813 859 def find_args(self):
814 860 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
815 861 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
816 862
817 863 def start(self):
818 864 log.msg("Starting ipcluster: %r" % self.args)
819 865 return super(IPClusterLauncher, self).start()
820 866
@@ -1,277 +1,306 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Job and task components for writing .xml files that the Windows HPC Server
5 5 2008 can use to start jobs.
6 6 """
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2008-2009 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 from __future__ import with_statement
20 20
21 21 import os
22 22 import re
23 23 import uuid
24 24
25 25 from xml.etree import ElementTree as ET
26 26 from xml.dom import minidom
27 27
28 28 from IPython.core.component import Component
29 29 from IPython.external import Itpl
30 30 from IPython.utils.traitlets import (
31 31 Str, Int, List, Unicode, Instance,
32 32 Enum, Bool, CStr
33 33 )
34 34
35 35 #-----------------------------------------------------------------------------
36 36 # Job and Task Component
37 37 #-----------------------------------------------------------------------------
38 38
39 39
40 40 def as_str(value):
41 41 if isinstance(value, str):
42 42 return value
43 43 elif isinstance(value, bool):
44 44 if value:
45 45 return 'true'
46 46 else:
47 47 return 'false'
48 48 elif isinstance(value, (int, float)):
49 49 return repr(value)
50 50 else:
51 51 return value
52 52
53 53
54 54 def indent(elem, level=0):
55 55 i = "\n" + level*" "
56 56 if len(elem):
57 57 if not elem.text or not elem.text.strip():
58 58 elem.text = i + " "
59 59 if not elem.tail or not elem.tail.strip():
60 60 elem.tail = i
61 61 for elem in elem:
62 62 indent(elem, level+1)
63 63 if not elem.tail or not elem.tail.strip():
64 64 elem.tail = i
65 65 else:
66 66 if level and (not elem.tail or not elem.tail.strip()):
67 67 elem.tail = i
68 68
69 69
70 def find_username():
71 domain = os.environ.get('USERDOMAIN')
72 username = os.environ.get('USERNAME','')
73 if domain is None:
74 return username
75 else:
76 return '%s\\%s' % (domain, username)
77
78
70 79 class WinHPCJob(Component):
71 80
72 81 job_id = Str('')
73 82 job_name = Str('MyJob', config=True)
74 83 min_cores = Int(1, config=True)
75 84 max_cores = Int(1, config=True)
76 85 min_sockets = Int(1, config=True)
77 86 max_sockets = Int(1, config=True)
78 87 min_nodes = Int(1, config=True)
79 88 max_nodes = Int(1, config=True)
80 89 unit_type = Str("Core", config=True)
81 90 auto_calculate_min = Bool(True, config=True)
82 91 auto_calculate_max = Bool(True, config=True)
83 92 run_until_canceled = Bool(False, config=True)
84 93 is_exclusive = Bool(False, config=True)
85 username = Str(os.environ.get('USERNAME', ''), config=True)
94 username = Str(find_username(), config=True)
86 95 job_type = Str('Batch', config=True)
87 96 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
88 97 default_value='Highest', config=True)
89 98 requested_nodes = Str('', config=True)
90 99 project = Str('IPython', config=True)
91 100 xmlns = Str('http://schemas.microsoft.com/HPCS2008/scheduler/')
92 101 version = Str("2.000")
93 102 tasks = List([])
94 103
95 104 @property
96 105 def owner(self):
97 106 return self.username
98 107
99 108 def _write_attr(self, root, attr, key):
100 109 s = as_str(getattr(self, attr, ''))
101 110 if s:
102 111 root.set(key, s)
103 112
104 113 def as_element(self):
105 114 # We have to add _A_ type things to get the right order than
106 115 # the MSFT XML parser expects.
107 116 root = ET.Element('Job')
108 117 self._write_attr(root, 'version', '_A_Version')
109 118 self._write_attr(root, 'job_name', '_B_Name')
110 119 self._write_attr(root, 'unit_type', '_C_UnitType')
111 120 self._write_attr(root, 'min_cores', '_D_MinCores')
112 121 self._write_attr(root, 'max_cores', '_E_MaxCores')
113 122 self._write_attr(root, 'min_sockets', '_F_MinSockets')
114 123 self._write_attr(root, 'max_sockets', '_G_MaxSockets')
115 124 self._write_attr(root, 'min_nodes', '_H_MinNodes')
116 125 self._write_attr(root, 'max_nodes', '_I_MaxNodes')
117 126 self._write_attr(root, 'run_until_canceled', '_J_RunUntilCanceled')
118 127 self._write_attr(root, 'is_exclusive', '_K_IsExclusive')
119 128 self._write_attr(root, 'username', '_L_UserName')
120 129 self._write_attr(root, 'job_type', '_M_JobType')
121 130 self._write_attr(root, 'priority', '_N_Priority')
122 131 self._write_attr(root, 'requested_nodes', '_O_RequestedNodes')
123 132 self._write_attr(root, 'auto_calculate_max', '_P_AutoCalculateMax')
124 133 self._write_attr(root, 'auto_calculate_min', '_Q_AutoCalculateMin')
125 134 self._write_attr(root, 'project', '_R_Project')
126 135 self._write_attr(root, 'owner', '_S_Owner')
127 136 self._write_attr(root, 'xmlns', '_T_xmlns')
128 137 dependencies = ET.SubElement(root, "Dependencies")
129 138 etasks = ET.SubElement(root, "Tasks")
130 139 for t in self.tasks:
131 140 etasks.append(t.as_element())
132 141 return root
133 142
134 143 def tostring(self):
135 144 """Return the string representation of the job description XML."""
136 145 root = self.as_element()
137 146 indent(root)
138 147 txt = ET.tostring(root, encoding="utf-8")
139 148 # Now remove the tokens used to order the attributes.
140 149 txt = re.sub(r'_[A-Z]_','',txt)
141 150 txt = '<?xml version="1.0" encoding="utf-8"?>\n' + txt
142 151 return txt
143 152
144 153 def write(self, filename):
145 154 """Write the XML job description to a file."""
146 155 txt = self.tostring()
147 156 with open(filename, 'w') as f:
148 157 f.write(txt)
149 158
150 159 def add_task(self, task):
151 160 """Add a task to the job.
152 161
153 162 Parameters
154 163 ----------
155 164 task : :class:`WinHPCTask`
156 165 The task object to add.
157 166 """
158 167 self.tasks.append(task)
159 168
160 169
161 170 class WinHPCTask(Component):
162 171
163 172 task_id = Str('')
164 173 task_name = Str('')
165 174 version = Str("2.000")
166 175 min_cores = Int(1, config=True)
167 176 max_cores = Int(1, config=True)
168 177 min_sockets = Int(1, config=True)
169 178 max_sockets = Int(1, config=True)
170 179 min_nodes = Int(1, config=True)
171 180 max_nodes = Int(1, config=True)
172 181 unit_type = Str("Core", config=True)
173 182 command_line = CStr('', config=True)
174 183 work_directory = CStr('', config=True)
175 184 is_rerunnaable = Bool(True, config=True)
176 185 std_out_file_path = CStr('', config=True)
177 186 std_err_file_path = CStr('', config=True)
178 187 is_parametric = Bool(False, config=True)
179 188 environment_variables = Instance(dict, args=(), config=True)
180 189
181 190 def _write_attr(self, root, attr, key):
182 191 s = as_str(getattr(self, attr, ''))
183 192 if s:
184 193 root.set(key, s)
185 194
186 195 def as_element(self):
187 196 root = ET.Element('Task')
188 197 self._write_attr(root, 'version', '_A_Version')
189 198 self._write_attr(root, 'task_name', '_B_Name')
190 199 self._write_attr(root, 'min_cores', '_C_MinCores')
191 200 self._write_attr(root, 'max_cores', '_D_MaxCores')
192 201 self._write_attr(root, 'min_sockets', '_E_MinSockets')
193 202 self._write_attr(root, 'max_sockets', '_F_MaxSockets')
194 203 self._write_attr(root, 'min_nodes', '_G_MinNodes')
195 204 self._write_attr(root, 'max_nodes', '_H_MaxNodes')
196 205 self._write_attr(root, 'command_line', '_I_CommandLine')
197 206 self._write_attr(root, 'work_directory', '_J_WorkDirectory')
198 207 self._write_attr(root, 'is_rerunnaable', '_K_IsRerunnable')
199 208 self._write_attr(root, 'std_out_file_path', '_L_StdOutFilePath')
200 209 self._write_attr(root, 'std_err_file_path', '_M_StdErrFilePath')
201 210 self._write_attr(root, 'is_parametric', '_N_IsParametric')
202 211 self._write_attr(root, 'unit_type', '_O_UnitType')
203 212 root.append(self.get_env_vars())
204 213 return root
205 214
206 215 def get_env_vars(self):
207 216 env_vars = ET.Element('EnvironmentVariables')
208 217 for k, v in self.environment_variables.items():
209 218 variable = ET.SubElement(env_vars, "Variable")
210 219 name = ET.SubElement(variable, "Name")
211 220 name.text = k
212 221 value = ET.SubElement(variable, "Value")
213 222 value.text = v
214 223 return env_vars
215 224
216 225
217 226
218 227 # By declaring these, we can configure the controller and engine separately!
219
228
229 class IPControllerJob(WinHPCJob):
230 job_name = Str('IPController', config=False)
231 is_exclusive = Bool(False, config=True)
232 username = Str(find_username(), config=True)
233 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
234 default_value='Highest', config=True)
235 requested_nodes = Str('', config=True)
236 project = Str('IPython', config=True)
237
238
239 class IPEngineSetJob(WinHPCJob):
240 job_name = Str('IPEngineSet', config=False)
241 is_exclusive = Bool(False, config=True)
242 username = Str(find_username(), config=True)
243 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
244 default_value='Highest', config=True)
245 requested_nodes = Str('', config=True)
246 project = Str('IPython', config=True)
247
248
220 249 class IPControllerTask(WinHPCTask):
221 250
222 251 task_name = Str('IPController', config=True)
223 252 controller_cmd = List(['ipcontroller.exe'], config=True)
224 253 controller_args = List(['--log-to-file', '--log-level', '40'], config=True)
225 254 # I don't want these to be configurable
226 255 std_out_file_path = CStr(os.path.join('log','ipcontroller-out.txt'), config=False)
227 256 std_err_file_path = CStr(os.path.join('log','ipcontroller-err.txt'), config=False)
228 257 min_cores = Int(1, config=False)
229 258 max_cores = Int(1, config=False)
230 259 min_sockets = Int(1, config=False)
231 260 max_sockets = Int(1, config=False)
232 261 min_nodes = Int(1, config=False)
233 262 max_nodes = Int(1, config=False)
234 263 unit_type = Str("Core", config=False)
235 264 work_directory = CStr('', config=False)
236 265
237 266 @property
238 267 def command_line(self):
239 268 return ' '.join(self.controller_cmd + self.controller_args)
240 269
241 270
242 271 class IPEngineTask(WinHPCTask):
243 272
244 273 task_name = Str('IPEngine', config=True)
245 274 engine_cmd = List(['ipengine.exe'], config=True)
246 275 engine_args = List(['--log-to-file', '--log-level', '40'], config=True)
247 276 # I don't want these to be configurable
248 277 std_out_file_path = CStr(os.path.join('log','ipengine-out-%s.txt' % uuid.uuid1()), config=False)
249 278 std_err_file_path = CStr(os.path.join('log','ipengine-err-%s.txt' % uuid.uuid1()), config=False)
250 279 min_cores = Int(1, config=False)
251 280 max_cores = Int(1, config=False)
252 281 min_sockets = Int(1, config=False)
253 282 max_sockets = Int(1, config=False)
254 283 min_nodes = Int(1, config=False)
255 284 max_nodes = Int(1, config=False)
256 285 unit_type = Str("Core", config=False)
257 286 work_directory = CStr('', config=False)
258 287
259 288 @property
260 289 def command_line(self):
261 290 return ' '.join(self.engine_cmd + self.engine_args)
262 291
263 292
264 293 # j = WinHPCJob(None)
265 294 # j.job_name = 'IPCluster'
266 295 # j.username = 'GNET\\bgranger'
267 296 # j.requested_nodes = 'GREEN'
268 297 #
269 298 # t = WinHPCTask(None)
270 299 # t.task_name = 'Controller'
271 300 # t.command_line = r"\\blue\domainusers$\bgranger\Python\Python25\Scripts\ipcontroller.exe --log-to-file -p default --log-level 10"
272 301 # t.work_directory = r"\\blue\domainusers$\bgranger\.ipython\cluster_default"
273 302 # t.std_out_file_path = 'controller-out.txt'
274 303 # t.std_err_file_path = 'controller-err.txt'
275 304 # t.environment_variables['PYTHONPATH'] = r"\\blue\domainusers$\bgranger\Python\Python25\Lib\site-packages"
276 305 # j.add_task(t)
277 306
General Comments 0
You need to be logged in to leave comments. Login now