##// END OF EJS Templates
Initial version of Win HPC job scheduler support.
bgranger -
Show More
@@ -16,7 +16,7 b' c = get_config()'
16 # The selected launchers can be configured below.
16 # The selected launchers can be configured below.
17
17
18 # Options are (LocalControllerLauncher, MPIExecControllerLauncher,
18 # Options are (LocalControllerLauncher, MPIExecControllerLauncher,
19 # PBSControllerLauncher)
19 # PBSControllerLauncher, WindowsHPCControllerLauncher)
20 # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
20 # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
21
21
22 # Options are (LocalEngineSetLauncher, MPIExecEngineSetLauncher,
22 # Options are (LocalEngineSetLauncher, MPIExecEngineSetLauncher,
@@ -79,6 +79,45 b' c = get_config()'
79 # c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller'
79 # c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller'
80
80
81 #-----------------------------------------------------------------------------
81 #-----------------------------------------------------------------------------
82 # Windows HPC Server 2008 launcher configuration
83 #-----------------------------------------------------------------------------
84
85 # c.WinHPCJob.username = 'DOMAIN\\user'
86 # c.WinHPCJob.priority = 'Highest'
87 # c.WinHPCJob.requested_nodes = ''
88 # c.WinHPCJob.project = ''
89 # c.WinHPCJob.is_exclusive = False
90
91 # c.WinHPCTask.environment_variables = {}
92 # c.WinHPCTask.work_directory = ''
93 # c.WinHPCTask.is_rerunnable = True
94
95 # c.IPControllerTask.task_name = 'IPController'
96 # c.IPControllerTask.controller_cmd = ['ipcontroller.exe']
97 # c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40']
98 # c.IPControllerTask.environment_variables = {}
99
100 # c.IPEngineTask.task_name = 'IPEngine'
101 # c.IPEngineTask.engine_cmd = ['ipengine.exe']
102 # c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40']
103 # c.IPEngineTask.environment_variables = {}
104
105 # c.WindowsHPCLauncher.scheduler = 'HEADNODE'
106 # c.WindowsHPCLauncher.username = 'DOMAIN\\USERNAME'
107 # c.WindowsHPCLauncher.priority = 'Highest'
108 # c.WindowsHPCLauncher.requested_nodes = ''
109 # c.WindowsHPCLauncher.job_file_name = u'ipython_job.xml'
110 # c.WindowsHPCLauncher.project = 'MyProject'
111
112 # c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
113 # c.WindowsHPCControllerLauncher.username = 'DOMAIN\\USERNAME'
114 # c.WindowsHPCControllerLauncher.priority = 'Highest'
115 # c.WindowsHPCControllerLauncher.requested_nodes = ''
116 # c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml'
117 # c.WindowsHPCControllerLauncher.project = 'MyProject'
118
119
120 #-----------------------------------------------------------------------------
82 # Engine launcher configuration
121 # Engine launcher configuration
83 #-----------------------------------------------------------------------------
122 #-----------------------------------------------------------------------------
84
123
@@ -21,8 +21,13 b' import sys'
21
21
22 from IPython.core.component import Component
22 from IPython.core.component import Component
23 from IPython.external import Itpl
23 from IPython.external import Itpl
24 from IPython.utils.traitlets import Str, Int, List, Unicode
24 from IPython.utils.traitlets import Str, Int, List, Unicode, Enum
25 from IPython.utils.platutils import find_cmd
25 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
26 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
27 from IPython.kernel.winhpcjob import (
28 WinHPCJob, WinHPCTask,
29 IPControllerTask, IPEngineTask
30 )
26
31
27 from twisted.internet import reactor, defer
32 from twisted.internet import reactor, defer
28 from twisted.internet.defer import inlineCallbacks
33 from twisted.internet.defer import inlineCallbacks
@@ -328,8 +333,85 b' class SSHLauncher(BaseLauncher):'
328
333
329
334
330 class WindowsHPCLauncher(BaseLauncher):
335 class WindowsHPCLauncher(BaseLauncher):
331 pass
332
336
337 # A regular expression used to get the job id from the output of the
338 # submit_command.
339 job_id_regexp = Str('\d+', config=True)
340 # The filename of the instantiated job script.
341 job_file_name = Unicode(u'ipython_job.xml', config=True)
342 # The full path to the instantiated job script. This gets made dynamically
343 # by combining the working_dir with the job_file_name.
344 job_file = Unicode(u'')
345 # The hostname of the scheduler to submit the job to
346 scheduler = Str('HEADNODE', config=True)
347 username = Str(os.environ.get('USERNAME', ''), config=True)
348 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
349 default_value='Highest', config=True)
350 requested_nodes = Str('', config=True)
351 project = Str('MyProject', config=True)
352 job_cmd = Str(find_cmd('job'), config=True)
353
354 def __init__(self, working_dir, parent=None, name=None, config=None):
355 super(WindowsHPCLauncher, self).__init__(
356 working_dir, parent, name, config
357 )
358 self.job_file = os.path.join(self.working_dir, self.job_file_name)
359
360 def write_job_file(self, n):
361 raise NotImplementedError("Implement write_job_file in a subclass.")
362
363 def find_args(self):
364 return ['job.exe']
365
366 def parse_job_id(self, output):
367 """Take the output of the submit command and return the job id."""
368 m = re.search(self.job_id_regexp, output)
369 if m is not None:
370 job_id = m.group()
371 else:
372 raise LauncherError("Job id couldn't be determined: %s" % output)
373 self.job_id = job_id
374 log.msg('Job started with job id: %r' % job_id)
375 return job_id
376
377 @inlineCallbacks
378 def start(self, n):
379 """Start n copies of the process using the Win HPC job scheduler."""
380 self.write_job_file(n)
381 args = [
382 'submit',
383 '/jobfile:%s' % self.job_file,
384 '/scheduler:%s' % self.scheduler
385 ]
386 log.msg("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
387 output = yield getProcessOutput(self.job_cmd,
388 args,
389 env=os.environ,
390 path=self.working_dir
391 )
392 job_id = self.parse_job_id(output)
393 self.notify_start(job_id)
394 defer.returnValue(job_id)
395
396 @inlineCallbacks
397 def stop(self):
398 args = [
399 'cancel',
400 self.job_id,
401 '/scheduler:%s' % self.scheduler
402 ]
403 log.msg("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
404 try:
405 output = yield getProcessOutput(self.job_cmd,
406 args,
407 env=os.environ,
408 path=self.working_dir
409 )
410 except:
411 output = 'The job already appears to be stoppped: %r' % self.job_id
412 self.notify_stop(output) # Pass the output of the kill cmd
413 defer.returnValue(output)
414
333
415
334 class BatchSystemLauncher(BaseLauncher):
416 class BatchSystemLauncher(BaseLauncher):
335 """Launch an external process using a batch system.
417 """Launch an external process using a batch system.
@@ -460,7 +542,33 b' class LocalControllerLauncher(LocalProcessLauncher):'
460
542
461
543
462 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
544 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
463 pass
545
546 job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
547 extra_args = List([],config=False)
548
549 def write_job_file(self, n):
550 job = WinHPCJob(self)
551 job.job_name = "IPController"
552 job.username = self.username
553 job.priority = self.priority
554 job.requested_nodes = self.requested_nodes
555 job.project = self.project
556
557 t = IPControllerTask(self)
558 t.work_directory = self.working_dir
559 # Add the --profile and --cluster-dir args from start.
560 t.controller_args.extend(self.extra_args)
561 job.add_task(t)
562 log.msg("Writing job description file: %s" % self.job_file)
563 job.write(self.job_file)
564
565 def start(self, profile=None, cluster_dir=None):
566 """Start the controller by profile or cluster_dir."""
567 if cluster_dir is not None:
568 self.extra_args = ['--cluster-dir', cluster_dir]
569 if profile is not None:
570 self.extra_args = ['--profile', profile]
571 return super(WindowsHPCControllerLauncher, self).start(1)
464
572
465
573
466 class MPIExecControllerLauncher(MPIExecLauncher):
574 class MPIExecControllerLauncher(MPIExecLauncher):
@@ -20,6 +20,7 b' from __future__ import with_statement'
20
20
21 import os
21 import os
22 import re
22 import re
23 import uuid
23
24
24 from xml.etree import ElementTree as ET
25 from xml.etree import ElementTree as ET
25 from xml.dom import minidom
26 from xml.dom import minidom
@@ -28,7 +29,7 b' from IPython.core.component import Component'
28 from IPython.external import Itpl
29 from IPython.external import Itpl
29 from IPython.utils.traitlets import (
30 from IPython.utils.traitlets import (
30 Str, Int, List, Unicode, Instance,
31 Str, Int, List, Unicode, Instance,
31 Enum, Bool
32 Enum, Bool, CStr
32 )
33 )
33
34
34 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
@@ -82,7 +83,6 b' class WinHPCJob(Component):'
82 run_until_canceled = Bool(False, config=True)
83 run_until_canceled = Bool(False, config=True)
83 is_exclusive = Bool(False, config=True)
84 is_exclusive = Bool(False, config=True)
84 username = Str(os.environ.get('USERNAME', ''), config=True)
85 username = Str(os.environ.get('USERNAME', ''), config=True)
85 owner = Str('', config=True)
86 job_type = Str('Batch', config=True)
86 job_type = Str('Batch', config=True)
87 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
87 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
88 default_value='Highest', config=True)
88 default_value='Highest', config=True)
@@ -92,8 +92,9 b' class WinHPCJob(Component):'
92 version = Str("2.000")
92 version = Str("2.000")
93 tasks = List([])
93 tasks = List([])
94
94
95 def _username_changed(self, name, old, new):
95 @property
96 self.owner = new
96 def owner(self):
97 return self.username
97
98
98 def _write_attr(self, root, attr, key):
99 def _write_attr(self, root, attr, key):
99 s = as_str(getattr(self, attr, ''))
100 s = as_str(getattr(self, attr, ''))
@@ -169,13 +170,13 b' class WinHPCTask(Component):'
169 min_nodes = Int(1, config=True)
170 min_nodes = Int(1, config=True)
170 max_nodes = Int(1, config=True)
171 max_nodes = Int(1, config=True)
171 unit_type = Str("Core", config=True)
172 unit_type = Str("Core", config=True)
172 command_line = Str('', config=True)
173 command_line = CStr('', config=True)
173 work_directory = Str('', config=True)
174 work_directory = CStr('', config=True)
174 is_rerunnaable = Bool(True, config=True)
175 is_rerunnaable = Bool(True, config=True)
175 std_out_file_path = Str('', config=True)
176 std_out_file_path = CStr('', config=True)
176 std_err_file_path = Str('', config=True)
177 std_err_file_path = CStr('', config=True)
177 is_parametric = Bool(False, config=True)
178 is_parametric = Bool(False, config=True)
178 environment_variables = Instance(dict, args=())
179 environment_variables = Instance(dict, args=(), config=True)
179
180
180 def _write_attr(self, root, attr, key):
181 def _write_attr(self, root, attr, key):
181 s = as_str(getattr(self, attr, ''))
182 s = as_str(getattr(self, attr, ''))
@@ -213,6 +214,53 b' class WinHPCTask(Component):'
213 return env_vars
214 return env_vars
214
215
215
216
217
218 # By declaring these, we can configure the controller and engine separately!
219
220 class IPControllerTask(WinHPCTask):
221
222 task_name = Str('IPController', config=True)
223 controller_cmd = List(['ipcontroller.exe'], config=True)
224 controller_args = List(['--log-to-file', '--log-level', '40'], config=True)
225 # I don't want these to be configurable
226 std_out_file_path = CStr(os.path.join('log','ipcontroller-out.txt'), config=False)
227 std_err_file_path = CStr(os.path.join('log','ipcontroller-err.txt'), config=False)
228 min_cores = Int(1, config=False)
229 max_cores = Int(1, config=False)
230 min_sockets = Int(1, config=False)
231 max_sockets = Int(1, config=False)
232 min_nodes = Int(1, config=False)
233 max_nodes = Int(1, config=False)
234 unit_type = Str("Core", config=False)
235 work_directory = CStr('', config=False)
236
237 @property
238 def command_line(self):
239 return ' '.join(self.controller_cmd + self.controller_args)
240
241
242 class IPEngineTask(WinHPCTask):
243
244 task_name = Str('IPEngine', config=True)
245 engine_cmd = List(['ipengine.exe'], config=True)
246 engine_args = List(['--log-to-file', '--log-level', '40'], config=True)
247 # I don't want these to be configurable
248 std_out_file_path = CStr(os.path.join('log','ipengine-out-%s.txt' % uuid.uuid1()), config=False)
249 std_err_file_path = CStr(os.path.join('log','ipengine-err-%s.txt' % uuid.uuid1()), config=False)
250 min_cores = Int(1, config=False)
251 max_cores = Int(1, config=False)
252 min_sockets = Int(1, config=False)
253 max_sockets = Int(1, config=False)
254 min_nodes = Int(1, config=False)
255 max_nodes = Int(1, config=False)
256 unit_type = Str("Core", config=False)
257 work_directory = CStr('', config=False)
258
259 @property
260 def command_line(self):
261 return ' '.join(self.engine_cmd + self.engine_args)
262
263
216 # j = WinHPCJob(None)
264 # j = WinHPCJob(None)
217 # j.job_name = 'IPCluster'
265 # j.job_name = 'IPCluster'
218 # j.username = 'GNET\\bgranger'
266 # j.username = 'GNET\\bgranger'
General Comments 0
You need to be logged in to leave comments. Login now