diff --git a/IPython/config/default/ipcluster_config.py b/IPython/config/default/ipcluster_config.py index ff40ca4..f28b26d 100644 --- a/IPython/config/default/ipcluster_config.py +++ b/IPython/config/default/ipcluster_config.py @@ -16,7 +16,7 @@ c = get_config() # The selected launchers can be configured below. # Options are (LocalControllerLauncher, MPIExecControllerLauncher, -# PBSControllerLauncher) +# PBSControllerLauncher, WindowsHPCControllerLauncher) # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher' # Options are (LocalEngineSetLauncher, MPIExecEngineSetLauncher, @@ -79,6 +79,45 @@ c = get_config() # c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller' #----------------------------------------------------------------------------- +# Windows HPC Server 2008 launcher configuration +#----------------------------------------------------------------------------- + +# c.WinHPCJob.username = 'DOMAIN\\user' +# c.WinHPCJob.priority = 'Highest' +# c.WinHPCJob.requested_nodes = '' +# c.WinHPCJob.project = '' +# c.WinHPCJob.is_exclusive = False + +# c.WinHPCTask.environment_variables = {} +# c.WinHPCTask.work_directory = '' +# c.WinHPCTask.is_rerunnable = True + +# c.IPControllerTask.task_name = 'IPController' +# c.IPControllerTask.controller_cmd = ['ipcontroller.exe'] +# c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40'] +# c.IPControllerTask.environment_variables = {} + +# c.IPEngineTask.task_name = 'IPController' +# c.IPEngineTask.engine_cmd = ['ipengine.exe'] +# c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40'] +# c.IPEngineTask.environment_variables = {} + +# c.WindowsHPCLauncher.scheduler = 'HEADNODE' +# c.WindowsHPCLauncher.username = '\\DOMAIN\USERNAME' +# c.WindowsHPCLauncher.priority = 'Highest' +# c.WindowsHPCLauncher.requested_nodes = '' +# c.WindowsHPCLauncher.job_file_name = u'ipython_job.xml' +# c.WindowsHPCLauncher.project = 'MyProject' + +# c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE' +# c.WindowsHPCControllerLauncher.username = '\\DOMAIN\USERNAME' +# c.WindowsHPCControllerLauncher.priority = 'Highest' +# c.WindowsHPCControllerLauncher.requested_nodes = '' +# c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml' +# c.WindowsHPCControllerLauncher.project = 'MyProject' + + +#----------------------------------------------------------------------------- # Engine launcher configuration #----------------------------------------------------------------------------- diff --git a/IPython/kernel/launcher.py b/IPython/kernel/launcher.py index 5dde18e..a64620f 100644 --- a/IPython/kernel/launcher.py +++ b/IPython/kernel/launcher.py @@ -21,8 +21,13 @@ import sys from IPython.core.component import Component from IPython.external import Itpl -from IPython.utils.traitlets import Str, Int, List, Unicode +from IPython.utils.traitlets import Str, Int, List, Unicode, Enum +from IPython.utils.platutils import find_cmd from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred +from IPython.kernel.winhpcjob import ( + WinHPCJob, WinHPCTask, + IPControllerTask, IPEngineTask +) from twisted.internet import reactor, defer from twisted.internet.defer import inlineCallbacks @@ -328,8 +333,85 @@ class SSHLauncher(BaseLauncher): class WindowsHPCLauncher(BaseLauncher): - pass + # A regular expression used to get the job id from the output of the + # submit_command. + job_id_regexp = Str('\d+', config=True) + # The filename of the instantiated job script. + job_file_name = Unicode(u'ipython_job.xml', config=True) + # The full path to the instantiated job script. This gets made dynamically + # by combining the working_dir with the job_file_name. + job_file = Unicode(u'') + # The hostname of the scheduler to submit the job to + scheduler = Str('HEADNODE', config=True) + username = Str(os.environ.get('USERNAME', ''), config=True) + priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'), + default_value='Highest', config=True) + requested_nodes = Str('', config=True) + project = Str('MyProject', config=True) + job_cmd = Str(find_cmd('job'), config=True) + + def __init__(self, working_dir, parent=None, name=None, config=None): + super(WindowsHPCLauncher, self).__init__( + working_dir, parent, name, config + ) + self.job_file = os.path.join(self.working_dir, self.job_file_name) + + def write_job_file(self, n): + raise NotImplementedError("Implement write_job_file in a subclass.") + + def find_args(self): + return ['job.exe'] + + def parse_job_id(self, output): + """Take the output of the submit command and return the job id.""" + m = re.search(self.job_id_regexp, output) + if m is not None: + job_id = m.group() + else: + raise LauncherError("Job id couldn't be determined: %s" % output) + self.job_id = job_id + log.msg('Job started with job id: %r' % job_id) + return job_id + + @inlineCallbacks + def start(self, n): + """Start n copies of the process using the Win HPC job scheduler.""" + self.write_job_file(n) + args = [ + 'submit', + '/jobfile:%s' % self.job_file, + '/scheduler:%s' % self.scheduler + ] + log.msg("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),)) + output = yield getProcessOutput(self.job_cmd, + args, + env=os.environ, + path=self.working_dir + ) + job_id = self.parse_job_id(output) + self.notify_start(job_id) + defer.returnValue(job_id) + + @inlineCallbacks + def stop(self): + args = [ + 'cancel', + self.job_id, + '/scheduler:%s' % self.scheduler + ] + log.msg("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),)) + try: + output = yield getProcessOutput(self.job_cmd, + args, + env=os.environ, + path=self.working_dir + ) + except: + output = 'The job already appears to be stoppped: %r' % self.job_id + self.notify_stop(output) # Pass the output of the kill cmd + defer.returnValue(output) + class BatchSystemLauncher(BaseLauncher): """Launch an external process using a batch system. @@ -460,7 +542,33 @@ class LocalControllerLauncher(LocalProcessLauncher): class WindowsHPCControllerLauncher(WindowsHPCLauncher): - pass + + job_file_name = Unicode(u'ipcontroller_job.xml', config=True) + extra_args = List([],config=False) + + def write_job_file(self, n): + job = WinHPCJob(self) + job.job_name = "IPController" + job.username = self.username + job.priority = self.priority + job.requested_nodes = self.requested_nodes + job.project = self.project + + t = IPControllerTask(self) + t.work_directory = self.working_dir + # Add the --profile and --cluster-dir args from start. + t.controller_args.extend(self.extra_args) + job.add_task(t) + log.msg("Writing job description file: %s" % self.job_file) + job.write(self.job_file) + + def start(self, profile=None, cluster_dir=None): + """Start the controller by profile or cluster_dir.""" + if cluster_dir is not None: + self.extra_args = ['--cluster-dir', cluster_dir] + if profile is not None: + self.extra_args = ['--profile', profile] + return super(WindowsHPCControllerLauncher, self).start(1) class MPIExecControllerLauncher(MPIExecLauncher): diff --git a/IPython/kernel/winhpcjob.py b/IPython/kernel/winhpcjob.py index 913c3fe..1aad29e 100644 --- a/IPython/kernel/winhpcjob.py +++ b/IPython/kernel/winhpcjob.py @@ -20,6 +20,7 @@ from __future__ import with_statement import os import re +import uuid from xml.etree import ElementTree as ET from xml.dom import minidom @@ -28,7 +29,7 @@ from IPython.core.component import Component from IPython.external import Itpl from IPython.utils.traitlets import ( Str, Int, List, Unicode, Instance, - Enum, Bool + Enum, Bool, CStr ) #----------------------------------------------------------------------------- @@ -82,7 +83,6 @@ class WinHPCJob(Component): run_until_canceled = Bool(False, config=True) is_exclusive = Bool(False, config=True) username = Str(os.environ.get('USERNAME', ''), config=True) - owner = Str('', config=True) job_type = Str('Batch', config=True) priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'), default_value='Highest', config=True) @@ -92,8 +92,9 @@ class WinHPCJob(Component): version = Str("2.000") tasks = List([]) - def _username_changed(self, name, old, new): - self.owner = new + @property + def owner(self): + return self.username def _write_attr(self, root, attr, key): s = as_str(getattr(self, attr, '')) @@ -169,13 +170,13 @@ class WinHPCTask(Component): min_nodes = Int(1, config=True) max_nodes = Int(1, config=True) unit_type = Str("Core", config=True) - command_line = Str('', config=True) - work_directory = Str('', config=True) + command_line = CStr('', config=True) + work_directory = CStr('', config=True) is_rerunnaable = Bool(True, config=True) - std_out_file_path = Str('', config=True) - std_err_file_path = Str('', config=True) + std_out_file_path = CStr('', config=True) + std_err_file_path = CStr('', config=True) is_parametric = Bool(False, config=True) - environment_variables = Instance(dict, args=()) + environment_variables = Instance(dict, args=(), config=True) def _write_attr(self, root, attr, key): s = as_str(getattr(self, attr, '')) @@ -213,6 +214,53 @@ class WinHPCTask(Component): return env_vars + +# By declaring these, we can configure the controller and engine separately! + +class IPControllerTask(WinHPCTask): + + task_name = Str('IPController', config=True) + controller_cmd = List(['ipcontroller.exe'], config=True) + controller_args = List(['--log-to-file', '--log-level', '40'], config=True) + # I don't want these to be configurable + std_out_file_path = CStr(os.path.join('log','ipcontroller-out.txt'), config=False) + std_err_file_path = CStr(os.path.join('log','ipcontroller-err.txt'), config=False) + min_cores = Int(1, config=False) + max_cores = Int(1, config=False) + min_sockets = Int(1, config=False) + max_sockets = Int(1, config=False) + min_nodes = Int(1, config=False) + max_nodes = Int(1, config=False) + unit_type = Str("Core", config=False) + work_directory = CStr('', config=False) + + @property + def command_line(self): + return ' '.join(self.controller_cmd + self.controller_args) + + +class IPEngineTask(WinHPCTask): + + task_name = Str('IPEngine', config=True) + engine_cmd = List(['ipengine.exe'], config=True) + engine_args = List(['--log-to-file', '--log-level', '40'], config=True) + # I don't want these to be configurable + std_out_file_path = CStr(os.path.join('log','ipengine-out-%s.txt' % uuid.uuid1()), config=False) + std_err_file_path = CStr(os.path.join('log','ipengine-err-%s.txt' % uuid.uuid1()), config=False) + min_cores = Int(1, config=False) + max_cores = Int(1, config=False) + min_sockets = Int(1, config=False) + max_sockets = Int(1, config=False) + min_nodes = Int(1, config=False) + max_nodes = Int(1, config=False) + unit_type = Str("Core", config=False) + work_directory = CStr('', config=False) + + @property + def command_line(self): + return ' '.join(self.engine_cmd + self.engine_args) + + # j = WinHPCJob(None) # j.job_name = 'IPCluster' # j.username = 'GNET\\bgranger'