From b4d67bf2526b4d249690516f60b33a258467bc32 2009-11-12 05:41:09 From: Brian Granger Date: 2009-11-12 05:41:09 Subject: [PATCH] More work on the launchers and Win HPC support. --- diff --git a/IPython/config/default/ipcluster_config.py b/IPython/config/default/ipcluster_config.py index b7ff08e..9a90de7 100644 --- a/IPython/config/default/ipcluster_config.py +++ b/IPython/config/default/ipcluster_config.py @@ -8,26 +8,34 @@ c = get_config() # This allows you to control what method is used to start the controller # and engines. The following methods are currently supported: -# * Start as a regular process on localhost. -# * Start using mpiexec. -# * Start using PBS -# * Start using SSH (currently broken) +# - Start as a regular process on localhost. +# - Start using mpiexec. +# - Start using the Windows HPC Server 2008 scheduler +# - Start using PBS +# - Start using SSH (currently broken) + # The selected launchers can be configured below. -# Options are (LocalControllerLauncher, MPIExecControllerLauncher, -# PBSControllerLauncher, WindowsHPCControllerLauncher) +# Options are: +# - LocalControllerLauncher +# - MPIExecControllerLauncher +# - PBSControllerLauncher +# - WindowsHPCControllerLauncher # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher' -# Options are (LocalEngineSetLauncher, MPIExecEngineSetLauncher, -# PBSEngineSetLauncher) +# Options are: +# - LocalEngineSetLauncher +# - MPIExecEngineSetLauncher +# - PBSEngineSetLauncher +# - WindowsHPCEngineSetLauncher # c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher' #----------------------------------------------------------------------------- # Global configuration #----------------------------------------------------------------------------- -# The default number of engine that will be started. This is overridden by +# The default number of engines that will be started. This is overridden by # the -n command line option: "ipcluster start -n 4" # c.Global.n = 2 @@ -41,18 +49,31 @@ c = get_config() # to change to this directory before starting. # c.Global.working_dir = os.getcwd() + #----------------------------------------------------------------------------- -# Controller launcher configuration +# Local process launchers #----------------------------------------------------------------------------- -# Configure how the controller is started. The configuration of the controller -# can also bet setup by editing the controller config file: -# ipcontroller_config.py +# The working directory for the controller +# c.LocalControllerLauncher.working_dir = u'' # The command line arguments to call the controller with. # c.LocalControllerLauncher.controller_args = \ # ['--log-to-file','--log-level', '40'] +# The working directory for the controller +# c.LocalEngineSetLauncher.working_dir = u'' + +# Command line argument passed to the engines. +# c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40'] + +#----------------------------------------------------------------------------- +# MPIExec launchers +#----------------------------------------------------------------------------- + +# The working directory for the controller +# c.MPIExecControllerLauncher.working_dir = u'' + # The mpiexec/mpirun command to use in started the controller. # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec'] @@ -63,6 +84,36 @@ c = get_config() # c.MPIExecControllerLauncher.controller_args = \ # ['--log-to-file','--log-level', '40'] + +# The working directory for the controller +# c.MPIExecEngineSetLauncher.working_dir = u'' + +# The mpiexec/mpirun command to use in started the controller. +# c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec'] + +# Additional arguments to pass to the actual mpiexec command. +# c.MPIExecEngineSetLauncher.mpi_args = [] + +# Command line argument passed to the engines. +# c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40'] + +# The default number of engines to start if not given elsewhere. +# c.MPIExecEngineSetLauncher.n = 1 + +#----------------------------------------------------------------------------- +# SSH launchers +#----------------------------------------------------------------------------- + +# Todo + + +#----------------------------------------------------------------------------- +# Unix batch (PBS) schedulers launchers +#----------------------------------------------------------------------------- + +# The working directory for the controller +# c.PBSControllerLauncher.working_dir = u'' + # The command line program to use to submit a PBS job. # c.PBSControllerLauncher.submit_command = 'qsub' @@ -82,63 +133,9 @@ c = get_config() # submit the job. This will be written to the cluster directory. # c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller' -#----------------------------------------------------------------------------- -# Windows HPC Server 2008 launcher configuration -#----------------------------------------------------------------------------- - -# c.WinHPCJob.username = 'DOMAIN\\user' -# c.WinHPCJob.priority = 'Highest' -# c.WinHPCJob.requested_nodes = '' -# c.WinHPCJob.project = '' -# c.WinHPCJob.is_exclusive = False - -# c.WinHPCTask.environment_variables = {} -# c.WinHPCTask.work_directory = '' -# c.WinHPCTask.is_rerunnable = True - -# c.IPControllerTask.task_name = 'IPController' -# c.IPControllerTask.controller_cmd = [u'ipcontroller.exe'] -# c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40'] -# c.IPControllerTask.environment_variables = {} - -# c.IPEngineTask.task_name = 'IPController' -# c.IPEngineTask.engine_cmd = [u'ipengine.exe'] -# c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40'] -# c.IPEngineTask.environment_variables = {} - -# c.WindowsHPCLauncher.scheduler = 'HEADNODE' -# c.WindowsHPCLauncher.username = '\\DOMAIN\USERNAME' -# c.WindowsHPCLauncher.priority = 'Highest' -# c.WindowsHPCLauncher.requested_nodes = '' -# c.WindowsHPCLauncher.job_file_name = u'ipython_job.xml' -# c.WindowsHPCLauncher.project = 'MyProject' - -# c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE' -# c.WindowsHPCControllerLauncher.username = '\\DOMAIN\USERNAME' -# c.WindowsHPCControllerLauncher.priority = 'Highest' -# c.WindowsHPCControllerLauncher.requested_nodes = '' -# c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml' -# c.WindowsHPCControllerLauncher.project = 'MyProject' - - -#----------------------------------------------------------------------------- -# Engine launcher configuration -#----------------------------------------------------------------------------- - -# Command line argument passed to the engines. -# c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40'] - -# The mpiexec/mpirun command to use in started the controller. -# c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec'] - -# Additional arguments to pass to the actual mpiexec command. -# c.MPIExecEngineSetLauncher.mpi_args = [] - -# Command line argument passed to the engines. -# c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40'] -# The default number of engines to start if not given elsewhere. -# c.MPIExecEngineSetLauncher.n = 1 +# The working directory for the controller +# c.PBSEngineSetLauncher.working_dir = u'' # The command line program to use to submit a PBS job. # c.PBSEngineSetLauncher.submit_command = 'qsub' @@ -161,37 +158,44 @@ c = get_config() # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script_engines' #----------------------------------------------------------------------------- -# Base launcher configuration +# Windows HPC Server 2008 launcher configuration #----------------------------------------------------------------------------- -# The various launchers are organized into an inheritance hierarchy. -# The configurations can also be iherited and the following attributes -# allow you to configure the base classes. - -# c.MPIExecLauncher.mpi_cmd = ['mpiexec'] -# c.MPIExecLauncher.mpi_args = [] -# c.MPIExecLauncher.program = [] -# c.MPIExecLauncher.program_args = [] -# c.MPIExecLauncher.n = 1 - -# c.SSHLauncher.ssh_cmd = ['ssh'] -# c.SSHLauncher.ssh_args = [] -# c.SSHLauncher.program = [] -# s.SSHLauncher.program_args = [] -# c.SSHLauncher.hostname = '' -# c.SSHLauncher.user = os.environ['USER'] - -# c.BatchSystemLauncher.submit_command -# c.BatchSystemLauncher.delete_command -# c.BatchSystemLauncher.job_id_regexp -# c.BatchSystemLauncher.batch_template -# c.BatchSystemLauncher.batch_file_name - -# c.PBSLauncher.submit_command = 'qsub' -# c.PBSLauncher.delete_command = 'qdel' -# c.PBSLauncher.job_id_regexp = '\d+' -# c.PBSLauncher.batch_template = """""" -# c.PBSLauncher.batch_file_name = u'pbs_batch_script' +# c.IPControllerJob.job_name = 'IPController' +# c.IPControllerJob.is_exclusive = False +# c.IPControllerJob.username = 'USERDOMAIN\\USERNAME' +# c.IPControllerJob.priority = 'Highest' +# c.IPControllerJob.requested_nodes = '' +# c.IPControllerJob.project = 'MyProject' + +# c.IPControllerTask.task_name = 'IPController' +# c.IPControllerTask.controller_cmd = [u'ipcontroller.exe'] +# c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40'] +# c.IPControllerTask.environment_variables = {} + +# c.WindowsHPCControllerLauncher.working_dir = u'' +# c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE' +# c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml' + + +# c.IPEngineSetJob.job_name = 'IPEngineSet' +# c.IPEngineSetJob.is_exclusive = False +# c.IPEngineSetJob.username = 'USERDOMAIN\\USERNAME' +# c.IPEngineSetJob.priority = 'Highest' +# c.IPEngineSetJob.requested_nodes = '' +# c.IPEngineSetJob.project = 'MyProject' + +# c.IPEngineTask.task_name = 'IPEngine' +# c.IPEngineTask.engine_cmd = [u'ipengine.exe'] +# c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40'] +# c.IPEngineTask.environment_variables = {} + +# c.WindowsHPCEngineSetLauncher.working_dir = u'' +# c.WindowsHPCEngineSetLauncher.scheduler = 'HEADNODE' +# c.WindowsHPCEngineSetLauncher.job_file_name = u'ipengineset_job.xml' + + + diff --git a/IPython/kernel/launcher.py b/IPython/kernel/launcher.py index 2d2e4ef..d1b7ce1 100644 --- a/IPython/kernel/launcher.py +++ b/IPython/kernel/launcher.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # encoding: utf-8 """ -Facilities for launching processing asynchronously. +Facilities for launching IPython processes asynchronously. """ #----------------------------------------------------------------------------- @@ -26,7 +26,8 @@ from IPython.utils.platutils import find_cmd from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred from IPython.kernel.winhpcjob import ( WinHPCJob, WinHPCTask, - IPControllerTask, IPEngineTask + IPControllerTask, IPEngineTask, + IPControllerJob, IPEngineSetJob ) from twisted.internet import reactor, defer @@ -38,7 +39,48 @@ from twisted.python import log from twisted.python.failure import Failure #----------------------------------------------------------------------------- -# Generic launchers +# Utilities +#----------------------------------------------------------------------------- + + +def find_controller_cmd(): + """Find the command line ipcontroller program in a cross platform way.""" + if sys.platform == 'win32': + # This logic is needed because the ipcontroller script doesn't + # always get installed in the same way or in the same location. + from IPython.kernel import ipcontrollerapp + script_location = ipcontrollerapp.__file__.replace('.pyc', '.py') + # The -u option here turns on unbuffered output, which is required + # on Win32 to prevent wierd conflict and problems with Twisted. + # Also, use sys.executable to make sure we are picking up the + # right python exe. + cmd = [sys.executable, '-u', script_location] + else: + # ipcontroller has to be on the PATH in this case. + cmd = ['ipcontroller'] + return cmd + + +def find_engine_cmd(): + """Find the command line ipengine program in a cross platform way.""" + if sys.platform == 'win32': + # This logic is needed because the ipengine script doesn't + # always get installed in the same way or in the same location. + from IPython.kernel import ipengineapp + script_location = ipengineapp.__file__.replace('.pyc', '.py') + # The -u option here turns on unbuffered output, which is required + # on Win32 to prevent wierd conflict and problems with Twisted. + # Also, use sys.executable to make sure we are picking up the + # right python exe. + cmd = [sys.executable, '-u', script_location] + else: + # ipcontroller has to be on the PATH in this case. + cmd = ['ipengine'] + return cmd + + +#----------------------------------------------------------------------------- +# Base launchers and errors #----------------------------------------------------------------------------- @@ -57,8 +99,6 @@ class UnknownStatus(LauncherError): class BaseLauncher(Component): """An asbtraction for starting, stopping and signaling a process.""" - # A directory for files related to the process. But, we don't cd to - # this directory, working_dir = Unicode(u'') def __init__(self, working_dir, parent=None, name=None, config=None): @@ -181,6 +221,11 @@ class BaseLauncher(Component): ) +#----------------------------------------------------------------------------- +# Local process launchers +#----------------------------------------------------------------------------- + + class LocalProcessLauncherProtocol(ProcessProtocol): """A ProcessProtocol to go with the LocalProcessLauncher.""" @@ -278,6 +323,112 @@ class LocalProcessLauncher(BaseLauncher): yield self.signal('KILL') +class LocalControllerLauncher(LocalProcessLauncher): + """Launch a controller as a regular external process.""" + + controller_cmd = List(find_controller_cmd(), config=True) + # Command line arguments to ipcontroller. + controller_args = List(['--log-to-file','--log-level', '40'], config=True) + + def find_args(self): + return self.controller_cmd + self.controller_args + \ + ['--working-dir', self.working_dir] + + def start(self, cluster_dir): + """Start the controller by cluster_dir.""" + self.controller_args.extend(['--cluster-dir', cluster_dir]) + self.cluster_dir = unicode(cluster_dir) + log.msg("Starting LocalControllerLauncher: %r" % self.args) + return super(LocalControllerLauncher, self).start() + + +class LocalEngineLauncher(LocalProcessLauncher): + """Launch a single engine as a regular externall process.""" + + engine_cmd = List(find_engine_cmd(), config=True) + # Command line arguments for ipengine. + engine_args = List( + ['--log-to-file','--log-level', '40'], config=True + ) + + def find_args(self): + return self.engine_cmd + self.engine_args + \ + ['--working-dir', self.working_dir] + + def start(self, cluster_dir): + """Start the engine by cluster_dir.""" + self.engine_args.extend(['--cluster-dir', cluster_dir]) + self.cluster_dir = unicode(cluster_dir) + return super(LocalEngineLauncher, self).start() + + +class LocalEngineSetLauncher(BaseLauncher): + """Launch a set of engines as regular external processes.""" + + # Command line arguments for ipengine. + engine_args = List( + ['--log-to-file','--log-level', '40'], config=True + ) + + def __init__(self, working_dir, parent=None, name=None, config=None): + super(LocalEngineSetLauncher, self).__init__( + working_dir, parent, name, config + ) + self.launchers = [] + + def start(self, n, cluster_dir): + """Start n engines by profile or cluster_dir.""" + self.cluster_dir = unicode(cluster_dir) + dlist = [] + for i in range(n): + el = LocalEngineLauncher(self.working_dir, self) + # Copy the engine args over to each engine launcher. + import copy + el.engine_args = copy.deepcopy(self.engine_args) + d = el.start(cluster_dir) + if i==0: + log.msg("Starting LocalEngineSetLauncher: %r" % el.args) + self.launchers.append(el) + dlist.append(d) + # The consumeErrors here could be dangerous + dfinal = gatherBoth(dlist, consumeErrors=True) + dfinal.addCallback(self.notify_start) + return dfinal + + def find_args(self): + return ['engine set'] + + def signal(self, sig): + dlist = [] + for el in self.launchers: + d = el.signal(sig) + dlist.append(d) + dfinal = gatherBoth(dlist, consumeErrors=True) + return dfinal + + def interrupt_then_kill(self, delay=1.0): + dlist = [] + for el in self.launchers: + d = el.interrupt_then_kill(delay) + dlist.append(d) + dfinal = gatherBoth(dlist, consumeErrors=True) + return dfinal + + def stop(self): + return self.interrupt_then_kill() + + def observe_stop(self): + dlist = [el.observe_stop() for el in self.launchers] + dfinal = gatherBoth(dlist, consumeErrors=False) + dfinal.addCallback(self.notify_stop) + return dfinal + + +#----------------------------------------------------------------------------- +# MPIExec launchers +#----------------------------------------------------------------------------- + + class MPIExecLauncher(LocalProcessLauncher): """Launch an external process using mpiexec.""" @@ -303,6 +454,54 @@ class MPIExecLauncher(LocalProcessLauncher): return super(MPIExecLauncher, self).start() +class MPIExecControllerLauncher(MPIExecLauncher): + """Launch a controller using mpiexec.""" + + controller_cmd = List(find_controller_cmd(), config=True) + # Command line arguments to ipcontroller. + controller_args = List(['--log-to-file','--log-level', '40'], config=True) + n = Int(1, config=False) + + def start(self, cluster_dir): + """Start the controller by cluster_dir.""" + self.controller_args.extend(['--cluster-dir', cluster_dir]) + self.cluster_dir = unicode(cluster_dir) + log.msg("Starting MPIExecControllerLauncher: %r" % self.args) + return super(MPIExecControllerLauncher, self).start(1) + + def find_args(self): + return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \ + self.controller_cmd + self.controller_args + \ + ['--working-dir', self.working_dir] + + +class MPIExecEngineSetLauncher(MPIExecLauncher): + + engine_cmd = List(find_engine_cmd(), config=True) + # Command line arguments for ipengine. + engine_args = List( + ['--log-to-file','--log-level', '40'], config=True + ) + n = Int(1, config=True) + + def start(self, n, cluster_dir): + """Start n engines by profile or cluster_dir.""" + self.engine_args.extend(['--cluster-dir', cluster_dir]) + self.cluster_dir = unicode(cluster_dir) + log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args) + return super(MPIExecEngineSetLauncher, self).start(n) + + def find_args(self): + return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \ + self.engine_cmd + self.engine_args + \ + ['--working-dir', self.working_dir] + + +#----------------------------------------------------------------------------- +# SSH launchers +#----------------------------------------------------------------------------- + + class SSHLauncher(BaseLauncher): """A minimal launcher for ssh. @@ -337,11 +536,25 @@ class SSHLauncher(BaseLauncher): return super(SSHLauncher, self).start() +class SSHControllerLauncher(SSHLauncher): + pass + + +class SSHEngineSetLauncher(BaseLauncher): + pass + + +#----------------------------------------------------------------------------- +# Windows HPC Server 2008 scheduler launchers +#----------------------------------------------------------------------------- + + # This is only used on Windows. -if os.name=='nt': - job_cmd = find_cmd('job') -else: - job_cmd = 'job' +def find_job_cmd(): + if os.name=='nt': + return find_cmd('job') + else: + return 'job' class WindowsHPCLauncher(BaseLauncher): @@ -355,13 +568,8 @@ class WindowsHPCLauncher(BaseLauncher): # by combining the working_dir with the job_file_name. job_file = Unicode(u'') # The hostname of the scheduler to submit the job to - scheduler = Str('HEADNODE', config=True) - username = Str(os.environ.get('USERNAME', ''), config=True) - priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'), - default_value='Highest', config=True) - requested_nodes = Str('', config=True) - project = Str('MyProject', config=True) - job_cmd = Str(job_cmd, config=True) + scheduler = Str('', config=True) + job_cmd = Str(find_job_cmd, config=True) def __init__(self, working_dir, parent=None, name=None, config=None): super(WindowsHPCLauncher, self).__init__( @@ -369,6 +577,10 @@ class WindowsHPCLauncher(BaseLauncher): ) self.job_file = os.path.join(self.working_dir, self.job_file_name) + @property + def job_file(self): + return os.path.join(self.working_dir, self.job_file_name) + def write_job_file(self, n): raise NotImplementedError("Implement write_job_file in a subclass.") @@ -423,7 +635,79 @@ class WindowsHPCLauncher(BaseLauncher): output = 'The job already appears to be stoppped: %r' % self.job_id self.notify_stop(output) # Pass the output of the kill cmd defer.returnValue(output) - + + +class WindowsHPCControllerLauncher(WindowsHPCLauncher): + + job_file_name = Unicode(u'ipcontroller_job.xml', config=True) + extra_args = List([], config=False) + + def write_job_file(self, n): + job = IPControllerJob(self) + + t = IPControllerTask(self) + # The tasks work directory is *not* the actual work directory of + # the controller. It is used as the base path for the stdout/stderr + # files that the scheduler redirects to. + t.work_directory = self.cluster_dir + # Add the --cluster-dir and --working-dir from self.start(). + t.controller_args.extend(self.extra_args) + job.add_task(t) + + log.msg("Writing job description file: %s" % self.job_file) + job.write(self.job_file) + + @property + def job_file(self): + return os.path.join(self.cluster_dir, self.job_file_name) + + def start(self, cluster_dir): + """Start the controller by cluster_dir.""" + self.extra_args = [ + '--cluster-dir', cluster_dir, '--working-dir', self.working_dir + ] + self.cluster_dir = unicode(cluster_dir) + return super(WindowsHPCControllerLauncher, self).start(1) + + +class WindowsHPCEngineSetLauncher(WindowsHPCLauncher): + + job_file_name = Unicode(u'ipengineset_job.xml', config=True) + extra_args = List([], config=False) + + def write_job_file(self, n): + job = IPControllerJob(self) + + for i in range(n): + t = IPEngineTask(self) + # The tasks work directory is *not* the actual work directory of + # the engine. It is used as the base path for the stdout/stderr + # files that the scheduler redirects to. + t.work_directory = self.cluster_dir + # Add the --cluster-dir and --working-dir from self.start(). + t.engine_args.extend(self.extra_args) + job.add_task(t) + + log.msg("Writing job description file: %s" % self.job_file) + job.write(self.job_file) + + @property + def job_file(self): + return os.path.join(self.cluster_dir, self.job_file_name) + + def start(self, cluster_dir): + """Start the controller by cluster_dir.""" + self.extra_args = [ + '--cluster-dir', cluster_dir, '--working-dir', self.working_dir + ] + self.cluster_dir = unicode(cluster_dir) + return super(WindowsHPCControllerLauncher, self).start(n) + + +#----------------------------------------------------------------------------- +# Batch (PBS) system launchers +#----------------------------------------------------------------------------- + class BatchSystemLauncher(BaseLauncher): """Launch an external process using a batch system. @@ -511,272 +795,34 @@ class PBSLauncher(BatchSystemLauncher): batch_file = Unicode(u'') -#----------------------------------------------------------------------------- -# Controller launchers -#----------------------------------------------------------------------------- - -def find_controller_cmd(): - """Find the command line ipcontroller program in a cross platform way.""" - if sys.platform == 'win32': - # This logic is needed because the ipcontroller script doesn't - # always get installed in the same way or in the same location. - from IPython.kernel import ipcontrollerapp - script_location = ipcontrollerapp.__file__.replace('.pyc', '.py') - # The -u option here turns on unbuffered output, which is required - # on Win32 to prevent wierd conflict and problems with Twisted. - # Also, use sys.executable to make sure we are picking up the - # right python exe. - cmd = [sys.executable, '-u', script_location] - else: - # ipcontroller has to be on the PATH in this case. - cmd = ['ipcontroller'] - return cmd - - -class LocalControllerLauncher(LocalProcessLauncher): - """Launch a controller as a regular external process.""" - - controller_cmd = List(find_controller_cmd(), config=True) - # Command line arguments to ipcontroller. - controller_args = List(['--log-to-file','--log-level', '40'], config=True) - - def find_args(self): - return self.controller_cmd + self.controller_args - - def start(self, profile=None, cluster_dir=None): - """Start the controller by profile or cluster_dir.""" - if cluster_dir is not None: - self.controller_args.extend(['--cluster-dir', cluster_dir]) - if profile is not None: - self.controller_args.extend(['--profile', profile]) - log.msg("Starting LocalControllerLauncher: %r" % self.args) - return super(LocalControllerLauncher, self).start() - - -class WindowsHPCControllerLauncher(WindowsHPCLauncher): - - job_file_name = Unicode(u'ipcontroller_job.xml', config=True) - extra_args = List([],config=False) - - def write_job_file(self, n): - job = WinHPCJob(self) - job.job_name = "IPController" - job.username = self.username - job.priority = self.priority - job.requested_nodes = self.requested_nodes - job.project = self.project - - t = IPControllerTask(self) - t.work_directory = self.working_dir - # Add the --profile and --cluster-dir args from start. - t.controller_args.extend(self.extra_args) - job.add_task(t) - log.msg("Writing job description file: %s" % self.job_file) - job.write(self.job_file) - - def start(self, profile=None, cluster_dir=None): - """Start the controller by profile or cluster_dir.""" - if cluster_dir is not None: - self.extra_args = ['--cluster-dir', cluster_dir] - if profile is not None: - self.extra_args = ['--profile', profile] - return super(WindowsHPCControllerLauncher, self).start(1) - - -class MPIExecControllerLauncher(MPIExecLauncher): - """Launch a controller using mpiexec.""" - - controller_cmd = List(find_controller_cmd(), config=True) - # Command line arguments to ipcontroller. - controller_args = List(['--log-to-file','--log-level', '40'], config=True) - n = Int(1, config=False) - - def start(self, profile=None, cluster_dir=None): - """Start the controller by profile or cluster_dir.""" - if cluster_dir is not None: - self.controller_args.extend(['--cluster-dir', cluster_dir]) - if profile is not None: - self.controller_args.extend(['--profile', profile]) - log.msg("Starting MPIExecControllerLauncher: %r" % self.args) - return super(MPIExecControllerLauncher, self).start(1) - - def find_args(self): - return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \ - self.controller_cmd + self.controller_args - - class PBSControllerLauncher(PBSLauncher): """Launch a controller using PBS.""" batch_file_name = Unicode(u'pbs_batch_script_controller', config=True) - def start(self, profile=None, cluster_dir=None): + def start(self, cluster_dir): """Start the controller by profile or cluster_dir.""" # Here we save profile and cluster_dir in the context so they # can be used in the batch script template as ${profile} and # ${cluster_dir} - if cluster_dir is not None: - self.context['cluster_dir'] = cluster_dir - if profile is not None: - self.context['profile'] = profile + self.context['cluster_dir'] = cluster_dir + self.cluster_dir = unicode(cluster_dir) log.msg("Starting PBSControllerLauncher: %r" % self.args) return super(PBSControllerLauncher, self).start(1) -class SSHControllerLauncher(SSHLauncher): - pass - - -#----------------------------------------------------------------------------- -# Engine launchers -#----------------------------------------------------------------------------- - - -def find_engine_cmd(): - """Find the command line ipengine program in a cross platform way.""" - if sys.platform == 'win32': - # This logic is needed because the ipengine script doesn't - # always get installed in the same way or in the same location. - from IPython.kernel import ipengineapp - script_location = ipengineapp.__file__.replace('.pyc', '.py') - # The -u option here turns on unbuffered output, which is required - # on Win32 to prevent wierd conflict and problems with Twisted. - # Also, use sys.executable to make sure we are picking up the - # right python exe. - cmd = [sys.executable, '-u', script_location] - else: - # ipcontroller has to be on the PATH in this case. - cmd = ['ipengine'] - return cmd - - -class LocalEngineLauncher(LocalProcessLauncher): - """Launch a single engine as a regular externall process.""" - - engine_cmd = List(find_engine_cmd(), config=True) - # Command line arguments for ipengine. - engine_args = List( - ['--log-to-file','--log-level', '40'], config=True - ) - - def find_args(self): - return self.engine_cmd + self.engine_args - - def start(self, profile=None, cluster_dir=None): - """Start the engine by profile or cluster_dir.""" - if cluster_dir is not None: - self.engine_args.extend(['--cluster-dir', cluster_dir]) - if profile is not None: - self.engine_args.extend(['--profile', profile]) - return super(LocalEngineLauncher, self).start() - - -class LocalEngineSetLauncher(BaseLauncher): - """Launch a set of engines as regular external processes.""" - - # Command line arguments for ipengine. - engine_args = List( - ['--log-to-file','--log-level', '40'], config=True - ) - - def __init__(self, working_dir, parent=None, name=None, config=None): - super(LocalEngineSetLauncher, self).__init__( - working_dir, parent, name, config - ) - self.launchers = [] - - def start(self, n, profile=None, cluster_dir=None): - """Start n engines by profile or cluster_dir.""" - dlist = [] - for i in range(n): - el = LocalEngineLauncher(self.working_dir, self) - # Copy the engine args over to each engine launcher. - import copy - el.engine_args = copy.deepcopy(self.engine_args) - d = el.start(profile, cluster_dir) - if i==0: - log.msg("Starting LocalEngineSetLauncher: %r" % el.args) - self.launchers.append(el) - dlist.append(d) - # The consumeErrors here could be dangerous - dfinal = gatherBoth(dlist, consumeErrors=True) - dfinal.addCallback(self.notify_start) - return dfinal - - def find_args(self): - return ['engine set'] - - def signal(self, sig): - dlist = [] - for el in self.launchers: - d = el.signal(sig) - dlist.append(d) - dfinal = gatherBoth(dlist, consumeErrors=True) - return dfinal - - def interrupt_then_kill(self, delay=1.0): - dlist = [] - for el in self.launchers: - d = el.interrupt_then_kill(delay) - dlist.append(d) - dfinal = gatherBoth(dlist, consumeErrors=True) - return dfinal - - def stop(self): - return self.interrupt_then_kill() - - def observe_stop(self): - dlist = [el.observe_stop() for el in self.launchers] - dfinal = gatherBoth(dlist, consumeErrors=False) - dfinal.addCallback(self.notify_stop) - return dfinal - - -class MPIExecEngineSetLauncher(MPIExecLauncher): - - engine_cmd = List(find_engine_cmd(), config=True) - # Command line arguments for ipengine. - engine_args = List( - ['--log-to-file','--log-level', '40'], config=True - ) - n = Int(1, config=True) - - def start(self, n, profile=None, cluster_dir=None): - """Start n engines by profile or cluster_dir.""" - if cluster_dir is not None: - self.engine_args.extend(['--cluster-dir', cluster_dir]) - if profile is not None: - self.engine_args.extend(['--profile', profile]) - log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args) - return super(MPIExecEngineSetLauncher, self).start(n) - - def find_args(self): - return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \ - self.engine_cmd + self.engine_args - - -class WindowsHPCEngineSetLauncher(WindowsHPCLauncher): - pass - - class PBSEngineSetLauncher(PBSLauncher): batch_file_name = Unicode(u'pbs_batch_script_engines', config=True) - def start(self, n, profile=None, cluster_dir=None): + def start(self, n, cluster_dir): """Start n engines by profile or cluster_dir.""" - if cluster_dir is not None: - self.program_args.extend(['--cluster-dir', cluster_dir]) - if profile is not None: - self.program_args.extend(['-p', profile]) + self.program_args.extend(['--cluster-dir', cluster_dir]) + self.cluster_dir = unicode(cluster_dir) log.msg('Starting PBSEngineSetLauncher: %r' % self.args) return super(PBSEngineSetLauncher, self).start(n) -class SSHEngineSetLauncher(BaseLauncher): - pass - - #----------------------------------------------------------------------------- # A launcher for ipcluster itself! #----------------------------------------------------------------------------- diff --git a/IPython/kernel/winhpcjob.py b/IPython/kernel/winhpcjob.py index 1aad29e..0b791fd 100644 --- a/IPython/kernel/winhpcjob.py +++ b/IPython/kernel/winhpcjob.py @@ -67,6 +67,15 @@ def indent(elem, level=0): elem.tail = i +def find_username(): + domain = os.environ.get('USERDOMAIN') + username = os.environ.get('USERNAME','') + if domain is None: + return username + else: + return '%s\\%s' % (domain, username) + + class WinHPCJob(Component): job_id = Str('') @@ -82,7 +91,7 @@ class WinHPCJob(Component): auto_calculate_max = Bool(True, config=True) run_until_canceled = Bool(False, config=True) is_exclusive = Bool(False, config=True) - username = Str(os.environ.get('USERNAME', ''), config=True) + username = Str(find_username(), config=True) job_type = Str('Batch', config=True) priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'), default_value='Highest', config=True) @@ -216,7 +225,27 @@ class WinHPCTask(Component): # By declaring these, we can configure the controller and engine separately! - + +class IPControllerJob(WinHPCJob): + job_name = Str('IPController', config=False) + is_exclusive = Bool(False, config=True) + username = Str(find_username(), config=True) + priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'), + default_value='Highest', config=True) + requested_nodes = Str('', config=True) + project = Str('IPython', config=True) + + +class IPEngineSetJob(WinHPCJob): + job_name = Str('IPEngineSet', config=False) + is_exclusive = Bool(False, config=True) + username = Str(find_username(), config=True) + priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'), + default_value='Highest', config=True) + requested_nodes = Str('', config=True) + project = Str('IPython', config=True) + + class IPControllerTask(WinHPCTask): task_name = Str('IPController', config=True)