##// END OF EJS Templates
Initial version of Win HPC job scheduler support.
bgranger -
Show More
@@ -1,155 +1,194 b''
1 1 import os
2 2
3 3 c = get_config()
4 4
5 5 #-----------------------------------------------------------------------------
6 6 # Select which launchers to use
7 7 #-----------------------------------------------------------------------------
8 8
9 9 # This allows you to control what method is used to start the controller
10 10 # and engines. The following methods are currently supported:
11 11 # * Start as a regular process on localhost.
12 12 # * Start using mpiexec.
13 13 # * Start using PBS
14 14 # * Start using SSH (currently broken)
15 15
16 16 # The selected launchers can be configured below.
17 17
18 18 # Options are (LocalControllerLauncher, MPIExecControllerLauncher,
19 # PBSControllerLauncher)
19 # PBSControllerLauncher, WindowsHPCControllerLauncher)
20 20 # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
21 21
22 22 # Options are (LocalEngineSetLauncher, MPIExecEngineSetLauncher,
23 23 # PBSEngineSetLauncher)
24 24 # c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher'
25 25
26 26 #-----------------------------------------------------------------------------
27 27 # Global configuration
28 28 #-----------------------------------------------------------------------------
29 29
30 30 # The default number of engines that will be started. This is overridden by
31 31 # the -n command line option: "ipcluster start -n 4"
32 32 # c.Global.n = 2
33 33
34 34 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
35 35 # c.Global.log_to_file = False
36 36
37 37 # Remove old logs from cluster_dir/log before starting.
38 38 # c.Global.clean_logs = True
39 39
40 40 #-----------------------------------------------------------------------------
41 41 # Controller launcher configuration
42 42 #-----------------------------------------------------------------------------
43 43
44 44 # Configure how the controller is started. The configuration of the controller
45 45 # can also be set up by editing the controller config file:
46 46 # ipcontroller_config.py
47 47
48 48 # The command line arguments to call the controller with.
49 49 # c.LocalControllerLauncher.controller_args = \
50 50 # ['--log-to-file','--log-level', '40']
51 51
52 52 # The mpiexec/mpirun command to use in starting the controller.
53 53 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
54 54
55 55 # Additional arguments to pass to the actual mpiexec command.
56 56 # c.MPIExecControllerLauncher.mpi_args = []
57 57
58 58 # The command line argument to call the controller with.
59 59 # c.MPIExecControllerLauncher.controller_args = \
60 60 # ['--log-to-file','--log-level', '40']
61 61
62 62 # The command line program to use to submit a PBS job.
63 63 # c.PBSControllerLauncher.submit_command = 'qsub'
64 64
65 65 # The command line program to use to delete a PBS job.
66 66 # c.PBSControllerLauncher.delete_command = 'qdel'
67 67
68 68 # A regular expression that takes the output of qsub and find the job id.
69 69 # c.PBSControllerLauncher.job_id_regexp = '\d+'
70 70
71 71 # The batch submission script used to start the controller. This is where
72 72 # environment variables would be setup, etc. This string is interpolated using
73 73 # the Itpl module in IPython.external. Basically, you can use ${profile} for
74 74 # the controller profile or ${cluster_dir} for the cluster_dir.
75 75 # c.PBSControllerLauncher.batch_template = """"""
76 76
77 77 # The name of the instantiated batch script that will actually be used to
78 78 # submit the job. This will be written to the cluster directory.
79 79 # c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller'
80 80
81 81 #-----------------------------------------------------------------------------
82 # Windows HPC Server 2008 launcher configuration
83 #-----------------------------------------------------------------------------
84
85 # c.WinHPCJob.username = 'DOMAIN\\user'
86 # c.WinHPCJob.priority = 'Highest'
87 # c.WinHPCJob.requested_nodes = ''
88 # c.WinHPCJob.project = ''
89 # c.WinHPCJob.is_exclusive = False
90
91 # c.WinHPCTask.environment_variables = {}
92 # c.WinHPCTask.work_directory = ''
93 # c.WinHPCTask.is_rerunnable = True
94
95 # c.IPControllerTask.task_name = 'IPController'
96 # c.IPControllerTask.controller_cmd = ['ipcontroller.exe']
97 # c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40']
98 # c.IPControllerTask.environment_variables = {}
99
100 # c.IPEngineTask.task_name = 'IPEngine'
101 # c.IPEngineTask.engine_cmd = ['ipengine.exe']
102 # c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40']
103 # c.IPEngineTask.environment_variables = {}
104
105 # c.WindowsHPCLauncher.scheduler = 'HEADNODE'
106 # c.WindowsHPCLauncher.username = 'DOMAIN\\USERNAME'
107 # c.WindowsHPCLauncher.priority = 'Highest'
108 # c.WindowsHPCLauncher.requested_nodes = ''
109 # c.WindowsHPCLauncher.job_file_name = u'ipython_job.xml'
110 # c.WindowsHPCLauncher.project = 'MyProject'
111
112 # c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
113 # c.WindowsHPCControllerLauncher.username = 'DOMAIN\\USERNAME'
114 # c.WindowsHPCControllerLauncher.priority = 'Highest'
115 # c.WindowsHPCControllerLauncher.requested_nodes = ''
116 # c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml'
117 # c.WindowsHPCControllerLauncher.project = 'MyProject'
118
119
120 #-----------------------------------------------------------------------------
82 121 # Engine launcher configuration
83 122 #-----------------------------------------------------------------------------
84 123
85 124 # Command line argument passed to the engines.
86 125 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
87 126
88 127 # The mpiexec/mpirun command to use in starting the engines.
89 128 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
90 129
91 130 # Additional arguments to pass to the actual mpiexec command.
92 131 # c.MPIExecEngineSetLauncher.mpi_args = []
93 132
94 133 # Command line argument passed to the engines.
95 134 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
96 135
97 136 # The default number of engines to start if not given elsewhere.
98 137 # c.MPIExecEngineSetLauncher.n = 1
99 138
100 139 # The command line program to use to submit a PBS job.
101 140 # c.PBSEngineSetLauncher.submit_command = 'qsub'
102 141
103 142 # The command line program to use to delete a PBS job.
104 143 # c.PBSEngineSetLauncher.delete_command = 'qdel'
105 144
106 145 # A regular expression that takes the output of qsub and find the job id.
107 146 # c.PBSEngineSetLauncher.job_id_regexp = '\d+'
108 147
109 148 # The batch submission script used to start the engines. This is where
110 149 # environment variables would be setup, etc. This string is interpolated using
111 150 # the Itpl module in IPython.external. Basically, you can use ${n} for the
112 151 # number of engines, ${profile} for the engine profile and ${cluster_dir}
113 152 # for the cluster_dir.
114 153 # c.PBSEngineSetLauncher.batch_template = """"""
115 154
116 155 # The name of the instantiated batch script that will actually be used to
117 156 # submit the job. This will be written to the cluster directory.
118 157 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script_engines'
119 158
120 159 #-----------------------------------------------------------------------------
121 160 # Base launcher configuration
122 161 #-----------------------------------------------------------------------------
123 162
124 163 # The various launchers are organized into an inheritance hierarchy.
125 164 # The configurations can also be inherited and the following attributes
126 165 # allow you to configure the base classes.
127 166
128 167 # c.MPIExecLauncher.mpi_cmd = ['mpiexec']
129 168 # c.MPIExecLauncher.mpi_args = []
130 169 # c.MPIExecLauncher.program = []
131 170 # c.MPIExecLauncher.program_args = []
132 171 # c.MPIExecLauncher.n = 1
133 172
134 173 # c.SSHLauncher.ssh_cmd = ['ssh']
135 174 # c.SSHLauncher.ssh_args = []
136 175 # c.SSHLauncher.program = []
137 176 # c.SSHLauncher.program_args = []
138 177 # c.SSHLauncher.hostname = ''
139 178 # c.SSHLauncher.user = os.environ['USER']
140 179
141 180 # c.BatchSystemLauncher.submit_command
142 181 # c.BatchSystemLauncher.delete_command
143 182 # c.BatchSystemLauncher.job_id_regexp
144 183 # c.BatchSystemLauncher.batch_template
145 184 # c.BatchSystemLauncher.batch_file_name
146 185
147 186 # c.PBSLauncher.submit_command = 'qsub'
148 187 # c.PBSLauncher.delete_command = 'qdel'
149 188 # c.PBSLauncher.job_id_regexp = '\d+'
150 189 # c.PBSLauncher.batch_template = """"""
151 190 # c.PBSLauncher.batch_file_name = u'pbs_batch_script'
152 191
153 192
154 193
155 194
@@ -1,700 +1,808 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Facilities for launching processing asynchronously.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import re
20 20 import sys
21 21
22 22 from IPython.core.component import Component
23 23 from IPython.external import Itpl
24 from IPython.utils.traitlets import Str, Int, List, Unicode
24 from IPython.utils.traitlets import Str, Int, List, Unicode, Enum
25 from IPython.utils.platutils import find_cmd
25 26 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
27 from IPython.kernel.winhpcjob import (
28 WinHPCJob, WinHPCTask,
29 IPControllerTask, IPEngineTask
30 )
26 31
27 32 from twisted.internet import reactor, defer
28 33 from twisted.internet.defer import inlineCallbacks
29 34 from twisted.internet.protocol import ProcessProtocol
30 35 from twisted.internet.utils import getProcessOutput
31 36 from twisted.internet.error import ProcessDone, ProcessTerminated
32 37 from twisted.python import log
33 38 from twisted.python.failure import Failure
34 39
35 40 #-----------------------------------------------------------------------------
36 41 # Generic launchers
37 42 #-----------------------------------------------------------------------------
38 43
39 44
class LauncherError(Exception):
    """Base class for the errors raised by the launchers in this module."""


class ProcessStateError(LauncherError):
    """Raised when an operation is not valid for the launcher's state."""


class UnknownStatus(LauncherError):
    """Raised when a process ends with a status that is not recognized."""
50 55
51 56
class BaseLauncher(Component):
    """An abstraction for starting, stopping and signaling a process.

    A launcher moves through three states, stored in ``self.state``:
    ``'before'`` (not started), ``'running'`` and ``'after'`` (stopped).
    Subclasses implement :meth:`find_args`, :meth:`start`, :meth:`stop`
    and :meth:`signal`, and call :meth:`notify_start` / :meth:`notify_stop`
    when the process actually starts and stops.
    """

    # A directory for files related to the process. But, we don't cd to
    # this directory,
    working_dir = Unicode(u'')

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(BaseLauncher, self).__init__(parent, name, config)
        self.working_dir = working_dir
        self.state = 'before' # can be before, running, after
        # Deferreds handed out by observe_stop that have not fired yet.
        self.stop_deferreds = []
        # Data passed to notify_start / notify_stop, kept for later queries.
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """A list of cmd and args that will be used to start the process.

        This is what is passed to :func:`spawnProcess` and the first element
        will be the process name.
        """
        return self.find_args()

    def find_args(self):
        """The ``.args`` property calls this to find the args list.

        Subclasses should implement this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The string form of the program arguments."""
        # Subclasses may put non-string items (e.g. an int count) in the
        # args list; format each item so the join cannot raise TypeError.
        return ' '.join(['%s' % (a,) for a in self.args])

    @property
    def running(self):
        """Am I running."""
        return self.state == 'running'

    def start(self):
        """Start the process.

        This must return a deferred that fires with information about the
        process starting (like a pid, job id, etc.).
        """
        return defer.fail(
            Failure(NotImplementedError(
                'start must be implemented in a subclass')
            )
        )

    def stop(self):
        """Stop the process and notify observers of stopping.

        This must return a deferred that fires with information about the
        processing stopping, like errors that occur while the process is
        attempting to be shut down. This deferred won't fire when the process
        actually stops. To observe the actual process stopping, see
        :func:`observe_stop`.
        """
        return defer.fail(
            Failure(NotImplementedError(
                'stop must be implemented in a subclass')
            )
        )

    def observe_stop(self):
        """Get a deferred that will fire when the process stops.

        The deferred will fire with data that contains information about
        the exit status of the process.  If the process has already stopped
        the deferred has already fired with the saved stop data.
        """
        if self.state == 'after':
            return defer.succeed(self.stop_data)
        d = defer.Deferred()
        self.stop_deferreds.append(d)
        return d

    def notify_start(self, data):
        """Call this to trigger startup actions.

        This logs the process startup and sets the state to 'running'. It is
        a pass-through so it can be used as a callback.
        """
        log.msg('Process %r started: %r' % (self.args[0], data))
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Call this to trigger process stop actions.

        This logs the process stopping and sets the state to 'after'. Call
        this to trigger all the deferreds from :func:`observe_stop`.  It is
        a pass-through so it can be used as a callback.
        """
        log.msg('Process %r stopped: %r' % (self.args[0], data))
        self.stop_data = data
        self.state = 'after'
        # Fire every pending observer, most recently registered first.
        while self.stop_deferreds:
            self.stop_deferreds.pop().callback(data)
        return data

    def signal(self, sig):
        """Signal the process.

        Return a semi-meaningless deferred after signaling the process.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        return defer.fail(
            Failure(NotImplementedError(
                'signal must be implemented in a subclass')
            )
        )
177 182
178 183
class LocalProcessLauncherProtocol(ProcessProtocol):
    """A ProcessProtocol that relays process events to its launcher."""

    def __init__(self, process_launcher):
        self.process_launcher = process_launcher
        # Filled in once the transport is connected.
        self.pid = None

    def connectionMade(self):
        """Record the pid and tell the launcher the process has started."""
        pid = self.transport.pid
        self.pid = pid
        self.process_launcher.notify_start(pid)

    def processEnded(self, status):
        """Translate the exit status into a dict and notify the launcher."""
        value = status.value
        if isinstance(value, ProcessDone):
            stop_data = {
                'exit_code': 0,
                'signal': None,
                'status': None,
                'pid': self.pid,
            }
        elif isinstance(value, ProcessTerminated):
            stop_data = {
                'exit_code': value.exitCode,
                'signal': value.signal,
                'status': value.status,
                'pid': self.pid,
            }
        else:
            raise UnknownStatus("Unknown exit status, this is probably a "
                                "bug in Twisted")
        self.process_launcher.notify_stop(stop_data)

    def outReceived(self, data):
        """Forward the child's stdout to the log."""
        log.msg(data)

    def errReceived(self, data):
        """Forward the child's stderr to the error log."""
        log.err(data)
217 222
218 223
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner."""

    # This is used to construct self.args, which is passed to
    # spawnProcess.
    cmd_and_args = List([])

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(LocalProcessLauncher, self).__init__(
            working_dir, parent, name, config
        )
        self.process_protocol = None
        self.start_deferred = None

    def find_args(self):
        """Return the configured command and arguments unchanged."""
        return self.cmd_and_args

    def start(self):
        """Spawn the process.

        Returns a deferred that fires with the data passed to
        :meth:`notify_start` (the pid), or fails with ProcessStateError
        if the launcher has already been started.
        """
        if self.state != 'before':
            s = 'The process was already started and has state: %r' % self.state
            return defer.fail(ProcessStateError(s))
        self.process_protocol = LocalProcessLauncherProtocol(self)
        self.start_deferred = defer.Deferred()
        # Build the argument list once; every item is passed to
        # spawnProcess as a plain str.
        args = [str(a) for a in self.args]
        self.process_transport = reactor.spawnProcess(
            self.process_protocol,
            args[0],
            args,
            env=os.environ
        )
        return self.start_deferred

    def notify_start(self, data):
        """Record the start and fire the deferred returned by start()."""
        super(LocalProcessLauncher, self).notify_start(data)
        self.start_deferred.callback(data)
        # Stay a pass-through like BaseLauncher.notify_start.
        return data

    def stop(self):
        """Stop the process by sending INT followed by KILL."""
        return self.interrupt_then_kill()

    @make_deferred
    def signal(self, sig):
        """Send ``sig`` to the process; does nothing unless it is running."""
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    @inlineCallbacks
    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait a delay and then send KILL."""
        yield self.signal('INT')
        yield sleep_deferred(delay)
        yield self.signal('KILL')
269 274
270 275
class MPIExecLauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    # The mpiexec command to use in starting the process.
    mpi_cmd = List(['mpiexec'], config=True)
    # The command line arguments to pass to mpiexec.
    mpi_args = List([], config=True)
    # The program to start using mpiexec.
    program = List(['date'], config=True)
    # The command line argument to the program.
    program_args = List([], config=True)
    # The number of instances of the program to start.
    n = Int(1, config=True)

    def find_args(self):
        """Build self.args using all the fields."""
        # The count is converted to a string so the result is a list of
        # strings that can be joined (see ``arg_str``) as well as spawned.
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPIExecLauncher, self).start()
294 299
295 300
class SSHLauncher(BaseLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables.  There could be other things this needs
    as well.

    ``start`` delegates to the next ``start`` in the class hierarchy; this
    class does not spawn anything itself.
    """

    ssh_cmd = List(['ssh'], config=True)
    ssh_args = List([], config=True)
    program = List(['date'], config=True)
    program_args = List([], config=True)
    hostname = Str('', config=True)
    user = Str('', config=True)
    # The ``user@hostname`` string, rebuilt whenever either part changes.
    location = Str('')

    def _hostname_changed(self, name, old, new):
        # Change handler for ``hostname``: refresh ``location``.
        self.location = '%s@%s' % (self.user, new)

    def _user_changed(self, name, old, new):
        # Change handler for ``user``: refresh ``location``.
        self.location = '%s@%s' % (new, self.hostname)

    def find_args(self):
        """Return the ssh command line followed by the remote program."""
        ssh_part = self.ssh_cmd + self.ssh_args + [self.location]
        program_part = self.program + self.program_args
        return ssh_part + program_part

    def start(self, n, hostname=None, user=None):
        """Update hostname/user when given, then start via the base class."""
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user
        return super(SSHLauncher, self).start()
328 333
329 334
def _find_job_cmd():
    """Locate the Win HPC ``job`` command line program.

    This runs while the class body below is executed, i.e. at import time,
    so a failed lookup must not propagate and break the import.  When the
    lookup fails the bare name ``'job'`` is used and resolving it is left
    to the code that spawns the process.
    """
    try:
        return find_cmd('job')
    except Exception:
        # NOTE(review): find_cmd's failure mode is defined outside this
        # file; any lookup failure is treated as "not found" here.
        return 'job'


class WindowsHPCLauncher(BaseLauncher):
    """Launch a job through the Windows HPC job scheduler.

    Subclasses implement :meth:`write_job_file` to produce the job
    description file; :meth:`start` submits it with the ``job`` command and
    :meth:`stop` cancels the submitted job.
    """

    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str(r'\d+', config=True)
    # The filename of the instantiated job script.
    job_file_name = Unicode(u'ipython_job.xml', config=True)
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the working_dir with the job_file_name.
    job_file = Unicode(u'')
    # The hostname of the scheduler to submit the job to
    scheduler = Str('HEADNODE', config=True)
    username = Str(os.environ.get('USERNAME', ''), config=True)
    priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
        default_value='Highest', config=True)
    requested_nodes = Str('', config=True)
    project = Str('MyProject', config=True)
    # The program used to submit and cancel jobs.
    job_cmd = Str(_find_job_cmd(), config=True)

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(WindowsHPCLauncher, self).__init__(
            working_dir, parent, name, config
        )
        self.job_file = os.path.join(self.working_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the job description file for ``n`` processes."""
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        """Return the process name used in the start/stop log messages."""
        return ['job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        Raises LauncherError if ``job_id_regexp`` does not match anywhere
        in ``output``.  The id is also saved as ``self.job_id``.
        """
        m = re.search(self.job_id_regexp, output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    @inlineCallbacks
    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler.

        The returned deferred fires with the job id.
        """
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        log.msg("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        output = yield getProcessOutput(self.job_cmd,
            args,
            env=os.environ,
            path=self.working_dir
        )
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        defer.returnValue(job_id)

    @inlineCallbacks
    def stop(self):
        """Cancel the job that was submitted by :meth:`start`.

        A failure of the cancel command is not propagated; it is reported
        as the job already being stopped.  The returned deferred fires with
        the output of the cancel command (or that message).
        """
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        log.msg("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = yield getProcessOutput(self.job_cmd,
                args,
                env=os.environ,
                path=self.working_dir
            )
        except Exception:
            # Best effort: only ordinary errors are swallowed, not
            # GeneratorExit / KeyboardInterrupt as a bare except would.
            output = 'The job already appears to be stoppped: %r' % self.job_id
        self.notify_stop(output)  # Pass the output of the kill cmd
        defer.returnValue(output)
414
333 415
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc.  The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using Itpl.  Thus the template can use
    ${n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in.  See PBSEngineSet
    # The name of the command line program used to submit jobs.
    submit_command = Str('', config=True)
    # The name of the command line program used to delete jobs.
    delete_command = Str('', config=True)
    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str('', config=True)
    # The string that is the batch script template itself.
    batch_template = Str('', config=True)
    # The filename of the instantiated batch script.
    batch_file_name = Unicode(u'batch_script', config=True)
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(BatchSystemLauncher, self).__init__(
            working_dir, parent, name, config
        )
        self.batch_file = os.path.join(self.working_dir, self.batch_file_name)
        # Variables made available to the batch script template.
        self.context = {}

    def find_args(self):
        """Return the submit command line.

        notify_start/notify_stop log ``self.args[0]``, so without this the
        inherited find_args would raise NotImplementedError after the job
        had already been submitted.
        """
        return [self.submit_command, self.batch_file]

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        Raises LauncherError if ``job_id_regexp`` does not match at the
        start of ``output``.  The id is also saved as ``self.job_id``.
        """
        m = re.match(self.job_id_regexp, output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the working_dir."""
        self.context['n'] = n
        script_as_string = Itpl.itplns(self.batch_template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        f = open(self.batch_file, 'w')
        try:
            f.write(script_as_string)
        finally:
            # Close the file even if the write fails.
            f.close()

    @inlineCallbacks
    def start(self, n):
        """Start n copies of the process using a batch system.

        The returned deferred fires with the job id.
        """
        self.write_batch_script(n)
        output = yield getProcessOutput(self.submit_command,
            [self.batch_file], env=os.environ)
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        defer.returnValue(job_id)

    @inlineCallbacks
    def stop(self):
        """Delete the submitted job; fires with the delete command's output."""
        output = yield getProcessOutput(self.delete_command,
            [self.job_id], env=os.environ
        )
        self.notify_stop(output)  # Pass the output of the kill cmd
        defer.returnValue(output)
407 489
408 490
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    # Jobs are submitted with qsub and deleted with qdel.
    submit_command = Str('qsub', config=True)
    delete_command = Str('qdel', config=True)
    # The job id is the first run of digits in the submit output.
    job_id_regexp = Str(r'\d+', config=True)
    # Empty by default; the template is supplied through configuration.
    batch_template = Str('', config=True)
    batch_file_name = Unicode(u'pbs_batch_script', config=True)
    batch_file = Unicode(u'')
418 500
419 501
420 502 #-----------------------------------------------------------------------------
421 503 # Controller launchers
422 504 #-----------------------------------------------------------------------------
423 505
def find_controller_cmd():
    """Find the command line ipcontroller program in a cross platform way.

    Returns a list holding the command and any leading arguments.
    """
    if sys.platform == 'win32':
        # This logic is needed because the ipcontroller script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel import ipcontrollerapp
        script_location = ipcontrollerapp.__file__
        # Map a compiled module file (.pyc or .pyo) back to its source
        # file.  Only the extension is touched, never the rest of the path.
        root, ext = os.path.splitext(script_location)
        if ext in ('.pyc', '.pyo'):
            script_location = root + '.py'
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent weird conflicts and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        cmd = [sys.executable, '-u', script_location]
    else:
        # ipcontroller has to be on the PATH in this case.
        cmd = ['ipcontroller']
    return cmd
440 522
441 523
class LocalControllerLauncher(LocalProcessLauncher):
    """Launch a controller as a regular external process."""

    # The command (plus leading arguments) that runs ipcontroller.
    controller_cmd = List(find_controller_cmd(), config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)

    def find_args(self):
        """Return the controller command followed by its arguments."""
        return self.controller_cmd + self.controller_args

    def start(self, profile=None, cluster_dir=None):
        """Start the controller by profile or cluster_dir."""
        # Append a flag for each option that was given, cluster dir first.
        options = (('--cluster-dir', cluster_dir), ('--profile', profile))
        for flag, value in options:
            if value is not None:
                self.controller_args.extend([flag, value])
        log.msg("Starting LocalControllerLauncher: %r" % self.args)
        return super(LocalControllerLauncher, self).start()
460 542
461 543
class WindowsHPCControllerLauncher(WindowsHPCLauncher):
    """Launch a controller as a job on the Windows HPC job scheduler."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
    # Extra ipcontroller arguments collected by start(); not configurable.
    extra_args = List([],config=False)

    def write_job_file(self, n):
        """Write the job description file with a single controller task.

        ``n`` is accepted for interface compatibility and is not used.
        """
        job = WinHPCJob(self)
        job.job_name = "IPController"
        job.username = self.username
        job.priority = self.priority
        job.requested_nodes = self.requested_nodes
        job.project = self.project

        t = IPControllerTask(self)
        t.work_directory = self.working_dir
        # Add the --profile and --cluster-dir args from start.
        t.controller_args.extend(self.extra_args)
        job.add_task(t)
        log.msg("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    def start(self, profile=None, cluster_dir=None):
        """Start the controller by profile or cluster_dir."""
        # Collect both options.  Assigning extra_args once per option would
        # drop --cluster-dir whenever a profile is also given.
        extra_args = []
        if cluster_dir is not None:
            extra_args.extend(['--cluster-dir', cluster_dir])
        if profile is not None:
            extra_args.extend(['--profile', profile])
        if extra_args:
            self.extra_args = extra_args
        return super(WindowsHPCControllerLauncher, self).start(1)
464 572
465 573
class MPIExecControllerLauncher(MPIExecLauncher):
    """Launch a controller using mpiexec."""

    # The command (plus leading arguments) that runs ipcontroller.
    controller_cmd = List(find_controller_cmd(), config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)
    # Exactly one controller is started, so n is not configurable here.
    n = Int(1, config=False)

    def start(self, profile=None, cluster_dir=None):
        """Start the controller by profile or cluster_dir."""
        if cluster_dir is not None:
            self.controller_args.extend(['--cluster-dir', cluster_dir])
        if profile is not None:
            self.controller_args.extend(['--profile', profile])
        log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
        return super(MPIExecControllerLauncher, self).start(1)

    def find_args(self):
        """Return the mpiexec command line that runs the controller."""
        # The count is converted to a string so the result is a list of
        # strings that can be joined (see ``arg_str``) as well as spawned.
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.controller_cmd + self.controller_args
486 594
487 595
class PBSControllerLauncher(PBSLauncher):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)

    def start(self, profile=None, cluster_dir=None):
        """Start the controller by profile or cluster_dir."""
        # Save whichever of cluster_dir and profile were given in the
        # context, so the batch script template can refer to them as
        # ${cluster_dir} and ${profile}.
        template_vars = (('cluster_dir', cluster_dir), ('profile', profile))
        for key, value in template_vars:
            if value is not None:
                self.context[key] = value
        log.msg("Starting PBSControllerLauncher: %r" % self.args)
        return super(PBSControllerLauncher, self).start(1)
504 612
505 613
class SSHControllerLauncher(SSHLauncher):
    """Launch a controller using ssh; adds nothing to SSHLauncher yet."""
508 616
509 617
510 618 #-----------------------------------------------------------------------------
511 619 # Engine launchers
512 620 #-----------------------------------------------------------------------------
513 621
514 622
def find_engine_cmd():
    """Find the command line ipengine program in a cross platform way.

    Returns a list holding the command and any leading arguments.
    """
    if sys.platform == 'win32':
        # This logic is needed because the ipengine script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel import ipengineapp
        script_location = ipengineapp.__file__
        # Map a compiled module file (.pyc or .pyo) back to its source
        # file.  Only the extension is touched, never the rest of the path.
        root, ext = os.path.splitext(script_location)
        if ext in ('.pyc', '.pyo'):
            script_location = root + '.py'
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent weird conflicts and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        cmd = [sys.executable, '-u', script_location]
    else:
        # ipengine has to be on the PATH in this case.
        cmd = ['ipengine']
    return cmd
531 639
532 640
class LocalEngineLauncher(LocalProcessLauncher):
    """Launch a single engine as a regular external process."""

    # Command used to run ipengine.
    engine_cmd = List(find_engine_cmd(), config=True)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )

    def find_args(self):
        """Return the engine command followed by its arguments."""
        return self.engine_cmd + self.engine_args

    def start(self, profile=None, cluster_dir=None):
        """Start the engine by profile or cluster_dir.

        Any of ``cluster_dir`` / ``profile`` that is not None is appended
        (in that order) to ``engine_args`` before the parent ``start()``
        is called; its result is returned.
        """
        extra_args = []
        if cluster_dir is not None:
            extra_args += ['--cluster-dir', cluster_dir]
        if profile is not None:
            extra_args += ['--profile', profile]
        self.engine_args.extend(extra_args)
        return super(LocalEngineLauncher, self).start()
552 660
553 661
class LocalEngineSetLauncher(BaseLauncher):
    """Launch a set of engines as regular external processes."""

    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )

    def __init__(self, working_dir, parent=None, name=None, config=None):
        """Initialize the base launcher and start with no engine launchers."""
        super(LocalEngineSetLauncher, self).__init__(
            working_dir, parent, name, config
        )
        # One LocalEngineLauncher per started engine, in start order.
        self.launchers = []

    def start(self, n, profile=None, cluster_dir=None):
        """Start n engines by profile or cluster_dir.

        A LocalEngineLauncher is created and started for each engine and
        remembered in ``self.launchers``.  The per-engine start results
        are combined with ``gatherBoth`` and ``notify_start`` is attached
        as a callback to the combined result, which is returned.
        """
        import copy

        started = []
        for index in range(n):
            engine = LocalEngineLauncher(self.working_dir, self)
            # Each engine launcher gets its own copy of the engine args.
            engine.engine_args = copy.deepcopy(self.engine_args)
            result = engine.start(profile, cluster_dir)
            if index == 0:
                log.msg("Starting LocalEngineSetLauncher: %r" % engine.args)
            self.launchers.append(engine)
            started.append(result)
        # The consumeErrors here could be dangerous
        combined = gatherBoth(started, consumeErrors=True)
        combined.addCallback(self.notify_start)
        return combined

    def find_args(self):
        """Return a placeholder argument list for the whole engine set."""
        return ['engine set']

    def signal(self, sig):
        """Send ``sig`` to every engine launcher and gather the results."""
        results = [engine.signal(sig) for engine in self.launchers]
        return gatherBoth(results, consumeErrors=True)

    def interrupt_then_kill(self, delay=1.0):
        """Call ``interrupt_then_kill(delay)`` on every engine launcher."""
        results = [
            engine.interrupt_then_kill(delay) for engine in self.launchers
        ]
        return gatherBoth(results, consumeErrors=True)

    def stop(self):
        """Stop all engines via ``interrupt_then_kill`` with its default delay."""
        return self.interrupt_then_kill()

    def observe_stop(self):
        """Gather ``observe_stop()`` of all engines, then call ``notify_stop``."""
        observed = [engine.observe_stop() for engine in self.launchers]
        combined = gatherBoth(observed, consumeErrors=False)
        combined.addCallback(self.notify_stop)
        return combined
613 721
614 722
class MPIExecEngineSetLauncher(MPIExecLauncher):
    """Launch a set of engines through the mpiexec command line."""

    # Command used to run ipengine.
    engine_cmd = List(find_engine_cmd(), config=True)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )
    # Value passed to the '-n' option in find_args().
    n = Int(1, config=True)

    def start(self, n, profile=None, cluster_dir=None):
        """Start n engines by profile or cluster_dir.

        Any of ``cluster_dir`` / ``profile`` that is not None is appended
        (in that order) to ``engine_args``; the result of the parent
        ``start(n)`` is returned.
        """
        extra_args = []
        if cluster_dir is not None:
            extra_args += ['--cluster-dir', cluster_dir]
        if profile is not None:
            extra_args += ['--profile', profile]
        self.engine_args.extend(extra_args)
        log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
        return super(MPIExecEngineSetLauncher, self).start(n)

    def find_args(self):
        """Return the mpi command and options followed by the engine command."""
        mpi_part = self.mpi_cmd + ['-n', self.n] + self.mpi_args
        engine_part = self.engine_cmd + self.engine_args
        return mpi_part + engine_part
636 744
637 745
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
    """Engine set launcher derived from WindowsHPCLauncher; adds nothing yet."""
640 748
641 749
class PBSEngineSetLauncher(PBSLauncher):
    """Launch a set of engines using PBS."""

    # Configurable name of the batch file used for the engines.
    batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)

    def start(self, n, profile=None, cluster_dir=None):
        """Start n engines by profile or cluster_dir.

        Any of ``cluster_dir`` / ``profile`` that is not None is appended
        (in that order) to ``program_args``; the result of the parent
        ``start(n)`` is returned.
        """
        extra_args = []
        if cluster_dir is not None:
            extra_args += ['--cluster-dir', cluster_dir]
        if profile is not None:
            extra_args += ['-p', profile]
        self.program_args.extend(extra_args)
        log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
        return super(PBSEngineSetLauncher, self).start(n)
654 762
655 763
class SSHEngineSetLauncher(BaseLauncher):
    """Engine set launcher derived from BaseLauncher; adds nothing yet."""
658 766
659 767
660 768 #-----------------------------------------------------------------------------
661 769 # A launcher for ipcluster itself!
662 770 #-----------------------------------------------------------------------------
663 771
664 772
def find_ipcluster_cmd():
    """Find the command line ipcluster program in a cross platform way.

    Returns
    -------
    list
        ``['ipcluster']`` on every platform except win32.  On win32 the
        list is ``[sys.executable, '-u', script_location]`` where
        ``script_location`` is the path of the ``ipclusterapp`` module.
    """
    if sys.platform != 'win32':
        # ipcluster has to be on the PATH in this case.
        return ['ipcluster']

    # On Windows the ipcluster script doesn't always get installed in the
    # same way or in the same location, so the module file is run directly.
    from IPython.kernel import ipclusterapp
    script_location = ipclusterapp.__file__.replace('.pyc', '.py')
    # The -u option turns on unbuffered output, which is required on Win32
    # to prevent weird conflicts and problems with Twisted.  sys.executable
    # is used to make sure we are picking up the right python exe.
    return [sys.executable, '-u', script_location]
681 789
682 790
class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    # Command used to run ipcluster.
    ipcluster_cmd = List(find_ipcluster_cmd(), config=True)
    # Command line arguments to pass to ipcluster.
    ipcluster_args = List(
        ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True)
    # Subcommand placed right after the ipcluster command.
    ipcluster_subcommand = Str('start')
    # Value passed to ipcluster's '-n' option.
    ipcluster_n = Int(2)

    def find_args(self):
        """Return the ipcluster command, subcommand, '-n' option and args."""
        command = self.ipcluster_cmd + [self.ipcluster_subcommand]
        count_option = ['-n', repr(self.ipcluster_n)]
        return command + count_option + self.ipcluster_args

    def start(self):
        """Log the command line, then start via the parent class."""
        log.msg("Starting ipcluster: %r" % self.args)
        return super(IPClusterLauncher, self).start()
700 808
@@ -1,229 +1,277 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Job and task components for writing .xml files that the Windows HPC Server
5 5 2008 can use to start jobs.
6 6 """
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2008-2009 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 from __future__ import with_statement
20 20
21 21 import os
22 22 import re
23 import uuid
23 24
24 25 from xml.etree import ElementTree as ET
25 26 from xml.dom import minidom
26 27
27 28 from IPython.core.component import Component
28 29 from IPython.external import Itpl
29 30 from IPython.utils.traitlets import (
30 31 Str, Int, List, Unicode, Instance,
31 Enum, Bool
32 Enum, Bool, CStr
32 33 )
33 34
34 35 #-----------------------------------------------------------------------------
35 36 # Job and Task Component
36 37 #-----------------------------------------------------------------------------
37 38
38 39
def as_str(value):
    """Convert a value to the text form used for XML attributes.

    Parameters
    ----------
    value : object
        The value to convert.

    Returns
    -------
    ``value`` itself when it is a ``str``; ``'true'`` / ``'false'`` for
    booleans; ``repr(value)`` for other ints and floats; any other object
    is returned unchanged.
    """
    if isinstance(value, str):
        return value
    # bool must be tested before int because bool is a subclass of int.
    if isinstance(value, bool):
        return 'true' if value else 'false'
    if isinstance(value, (int, float)):
        return repr(value)
    return value
51 52
52 53
def indent(elem, level=0):
    """Add whitespace to an element tree in place so it prints indented.

    Parameters
    ----------
    elem : element
        Root of the (sub)tree whose ``text`` / ``tail`` are filled in.
    level : int
        Nesting depth of ``elem``; 0 for the top-level element.

    Only ``text`` / ``tail`` values that are empty or whitespace-only are
    overwritten.
    """
    def _is_blank(text):
        return not text or not text.strip()

    pad = "\n" + level*" "
    if not len(elem):
        # Leaf element: only the tail is touched, and never at level 0.
        if level and _is_blank(elem.tail):
            elem.tail = pad
        return

    if _is_blank(elem.text):
        elem.text = pad + " "
    if _is_blank(elem.tail):
        elem.tail = pad
    last_child = None
    for child in elem:
        indent(child, level+1)
        last_child = child
    # The last child's tail precedes the parent's closing tag, so it is
    # reset to the parent's own indentation.
    if _is_blank(last_child.tail):
        last_child.tail = pad
67 68
68 69
class WinHPCJob(Component):
    """A job description that can be serialized to job description XML."""

    job_id = Str('')
    job_name = Str('MyJob', config=True)
    min_cores = Int(1, config=True)
    max_cores = Int(1, config=True)
    min_sockets = Int(1, config=True)
    max_sockets = Int(1, config=True)
    min_nodes = Int(1, config=True)
    max_nodes = Int(1, config=True)
    unit_type = Str("Core", config=True)
    auto_calculate_min = Bool(True, config=True)
    auto_calculate_max = Bool(True, config=True)
    run_until_canceled = Bool(False, config=True)
    is_exclusive = Bool(False, config=True)
    # Defaults to the USERNAME environment variable ('' when it is unset).
    username = Str(os.environ.get('USERNAME', ''), config=True)
    job_type = Str('Batch', config=True)
    priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
        default_value='Highest', config=True)
    requested_nodes = Str('', config=True)
    project = Str('IPython', config=True)
    xmlns = Str('http://schemas.microsoft.com/HPCS2008/scheduler/')
    version = Str("2.000")
    # Task objects added through add_task(); each must provide as_element().
    tasks = List([])

    @property
    def owner(self):
        """The job owner, which is always the current ``username``."""
        return self.username

    def _write_attr(self, root, attr, key):
        """Set XML attribute ``key`` on ``root`` from ``self.<attr>``.

        The value is converted with ``as_str``; nothing is written when
        the converted value is falsy (for example an empty string).
        """
        text = as_str(getattr(self, attr, ''))
        if text:
            root.set(key, text)

    def as_element(self):
        """Return the job as a 'Job' element, including all of its tasks."""
        # The _A_, _B_, ... prefixes are there to get the attribute order
        # that the MSFT XML parser expects; tostring() removes them again.
        ordered_attrs = (
            ('version', '_A_Version'),
            ('job_name', '_B_Name'),
            ('unit_type', '_C_UnitType'),
            ('min_cores', '_D_MinCores'),
            ('max_cores', '_E_MaxCores'),
            ('min_sockets', '_F_MinSockets'),
            ('max_sockets', '_G_MaxSockets'),
            ('min_nodes', '_H_MinNodes'),
            ('max_nodes', '_I_MaxNodes'),
            ('run_until_canceled', '_J_RunUntilCanceled'),
            ('is_exclusive', '_K_IsExclusive'),
            ('username', '_L_UserName'),
            ('job_type', '_M_JobType'),
            ('priority', '_N_Priority'),
            ('requested_nodes', '_O_RequestedNodes'),
            ('auto_calculate_max', '_P_AutoCalculateMax'),
            ('auto_calculate_min', '_Q_AutoCalculateMin'),
            ('project', '_R_Project'),
            ('owner', '_S_Owner'),
            ('xmlns', '_T_xmlns'),
        )
        root = ET.Element('Job')
        for attr, key in ordered_attrs:
            self._write_attr(root, attr, key)
        # An empty Dependencies element is always emitted before Tasks.
        ET.SubElement(root, "Dependencies")
        tasks_elem = ET.SubElement(root, "Tasks")
        for task in self.tasks:
            tasks_elem.append(task.as_element())
        return root

    def tostring(self):
        """Return the string representation of the job description XML."""
        root = self.as_element()
        indent(root)
        serialized = ET.tostring(root, encoding="utf-8")
        # Now remove the tokens used to order the attributes.
        serialized = re.sub(r'_[A-Z]_','',serialized)
        return '<?xml version="1.0" encoding="utf-8"?>\n' + serialized

    def write(self, filename):
        """Write the XML job description to a file.

        Parameters
        ----------
        filename : str
            Path of the file to write; it is opened in ``'w'`` mode.
        """
        # Serialize first so the file is not opened if tostring() fails.
        content = self.tostring()
        with open(filename, 'w') as stream:
            stream.write(content)

    def add_task(self, task):
        """Add a task to the job.

        Parameters
        ----------
        task : :class:`WinHPCTask`
            The task object to add.
        """
        self.tasks.append(task)
158 159
159 160
class WinHPCTask(Component):
    """A task description that can be serialized to a 'Task' XML element."""

    task_id = Str('')
    task_name = Str('')
    version = Str("2.000")
    min_cores = Int(1, config=True)
    max_cores = Int(1, config=True)
    min_sockets = Int(1, config=True)
    max_sockets = Int(1, config=True)
    min_nodes = Int(1, config=True)
    max_nodes = Int(1, config=True)
    unit_type = Str("Core", config=True)
    command_line = CStr('', config=True)
    work_directory = CStr('', config=True)
    is_rerunnaable = Bool(True, config=True)
    std_out_file_path = CStr('', config=True)
    std_err_file_path = CStr('', config=True)
    is_parametric = Bool(False, config=True)
    # Mapping of environment variable names to values, see get_env_vars().
    environment_variables = Instance(dict, args=(), config=True)

    def _write_attr(self, root, attr, key):
        """Set XML attribute ``key`` on ``root`` from ``self.<attr>``.

        The value is converted with ``as_str``; nothing is written when
        the converted value is falsy (for example an empty string).
        """
        text = as_str(getattr(self, attr, ''))
        if text:
            root.set(key, text)

    def as_element(self):
        """Return the task as a 'Task' element with its environment variables."""
        # Keys carry _A_, _B_, ... ordering prefixes, as in the original.
        ordered_attrs = (
            ('version', '_A_Version'),
            ('task_name', '_B_Name'),
            ('min_cores', '_C_MinCores'),
            ('max_cores', '_D_MaxCores'),
            ('min_sockets', '_E_MinSockets'),
            ('max_sockets', '_F_MaxSockets'),
            ('min_nodes', '_G_MinNodes'),
            ('max_nodes', '_H_MaxNodes'),
            ('command_line', '_I_CommandLine'),
            ('work_directory', '_J_WorkDirectory'),
            ('is_rerunnaable', '_K_IsRerunnable'),
            ('std_out_file_path', '_L_StdOutFilePath'),
            ('std_err_file_path', '_M_StdErrFilePath'),
            ('is_parametric', '_N_IsParametric'),
            ('unit_type', '_O_UnitType'),
        )
        root = ET.Element('Task')
        for attr, key in ordered_attrs:
            self._write_attr(root, attr, key)
        root.append(self.get_env_vars())
        return root

    def get_env_vars(self):
        """Return an 'EnvironmentVariables' element.

        It holds one 'Variable' child, with 'Name' and 'Value'
        sub-elements, per entry of ``environment_variables``.
        """
        container = ET.Element('EnvironmentVariables')
        for var_name, var_value in self.environment_variables.items():
            entry = ET.SubElement(container, "Variable")
            ET.SubElement(entry, "Name").text = var_name
            ET.SubElement(entry, "Value").text = var_value
        return container
214 215
215 216
217
218 # By declaring these, we can configure the controller and engine separately!
219
class IPControllerTask(WinHPCTask):
    """Task whose command line runs the IPython controller."""

    # Configurable settings.
    task_name = Str('IPController', config=True)
    controller_cmd = List(['ipcontroller.exe'], config=True)
    controller_args = List(['--log-to-file', '--log-level', '40'], config=True)

    # The traits below are redeclared with config=False on purpose: they
    # are not meant to be configurable for this task.
    std_out_file_path = CStr(os.path.join('log','ipcontroller-out.txt'), config=False)
    std_err_file_path = CStr(os.path.join('log','ipcontroller-err.txt'), config=False)
    min_cores = Int(1, config=False)
    max_cores = Int(1, config=False)
    min_sockets = Int(1, config=False)
    max_sockets = Int(1, config=False)
    min_nodes = Int(1, config=False)
    max_nodes = Int(1, config=False)
    unit_type = Str("Core", config=False)
    work_directory = CStr('', config=False)

    @property
    def command_line(self):
        """The controller command and its arguments joined by single spaces."""
        argv = self.controller_cmd + self.controller_args
        return ' '.join(argv)
240
241
class IPEngineTask(WinHPCTask):
    """Task whose command line runs an IPython engine."""

    # Configurable settings.
    task_name = Str('IPEngine', config=True)
    engine_cmd = List(['ipengine.exe'], config=True)
    engine_args = List(['--log-to-file', '--log-level', '40'], config=True)

    # The traits below are redeclared with config=False on purpose: they
    # are not meant to be configurable for this task.
    # NOTE(review): uuid.uuid1() runs once, when this class body executes,
    # so these two default paths are fixed for the whole process rather
    # than unique per task instance -- confirm that this is intended.
    std_out_file_path = CStr(os.path.join('log','ipengine-out-%s.txt' % uuid.uuid1()), config=False)
    std_err_file_path = CStr(os.path.join('log','ipengine-err-%s.txt' % uuid.uuid1()), config=False)
    min_cores = Int(1, config=False)
    max_cores = Int(1, config=False)
    min_sockets = Int(1, config=False)
    max_sockets = Int(1, config=False)
    min_nodes = Int(1, config=False)
    max_nodes = Int(1, config=False)
    unit_type = Str("Core", config=False)
    work_directory = CStr('', config=False)

    @property
    def command_line(self):
        """The engine command and its arguments joined by single spaces."""
        argv = self.engine_cmd + self.engine_args
        return ' '.join(argv)
262
263
216 264 # j = WinHPCJob(None)
217 265 # j.job_name = 'IPCluster'
218 266 # j.username = 'GNET\\bgranger'
219 267 # j.requested_nodes = 'GREEN'
220 268 #
221 269 # t = WinHPCTask(None)
222 270 # t.task_name = 'Controller'
223 271 # t.command_line = r"\\blue\domainusers$\bgranger\Python\Python25\Scripts\ipcontroller.exe --log-to-file -p default --log-level 10"
224 272 # t.work_directory = r"\\blue\domainusers$\bgranger\.ipython\cluster_default"
225 273 # t.std_out_file_path = 'controller-out.txt'
226 274 # t.std_err_file_path = 'controller-err.txt'
227 275 # t.environment_variables['PYTHONPATH'] = r"\\blue\domainusers$\bgranger\Python\Python25\Lib\site-packages"
228 276 # j.add_task(t)
229 277
General Comments 0
You need to be logged in to leave comments. Login now