##// END OF EJS Templates
Address shell object in magic explicitly.
Address shell object in magic explicitly.

File last commit:

r5344:293d3eed
r5414:f39cf0be
Show More
launcher.py
1184 lines | 40.7 KiB | text/x-python | PythonLexer
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 # encoding: utf-8
"""
Facilities for launching IPython processes asynchronously.
MinRK
update recently changed modules with Authors in docstring
r4018
Authors:
* Brian Granger
* MinRK
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 """
#-----------------------------------------------------------------------------
MinRK
update recently changed modules with Authors in docstring
r4018 # Copyright (C) 2008-2011 The IPython Development Team
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 import copy
MinRK
resort imports in a cleaner order
r3631 import logging
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 import os
import re
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 import stat
Ben Edwards
Added import time to IPython/parallel/apps/launcher.py...
r4667 import time
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
improve process cleanup on Windows...
r3778 # signal imports, handling various platforms, versions
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 from signal import SIGINT, SIGTERM
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 try:
from signal import SIGKILL
except ImportError:
MinRK
improve process cleanup on Windows...
r3778 # Windows
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 SIGKILL=SIGTERM
MinRK
improve process cleanup on Windows...
r3778 try:
# Windows >= 2.7, 3.2
from signal import CTRL_C_EVENT as SIGINT
except ImportError:
pass
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 from subprocess import Popen, PIPE, STDOUT
try:
MinRK
add ipcluster engines; fix ssh process shutdown
r3615 from subprocess import check_output
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 except ImportError:
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 # pre-2.7, define check_output with Popen
MinRK
add ipcluster engines; fix ssh process shutdown
r3615 def check_output(*args, **kwargs):
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 kwargs.update(dict(stdout=PIPE))
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 p = Popen(*args, **kwargs)
out,err = p.communicate()
return out
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
from zmq.eventloop import ioloop
MinRK
get default logger from Application.instance()
r4012 from IPython.config.application import Application
MinRK
add LoggingConfigurable base class
r4016 from IPython.config.configurable import LoggingConfigurable
MinRK
add EvalFormatter for batch system (PBS) launcher templates...
r4004 from IPython.utils.text import EvalFormatter
MinRK
add cluster_id support to ipcluster/launchers...
r4848 from IPython.utils.traitlets import (
MinRK
add Integer traitlet...
r5344 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits,
MinRK
add cluster_id support to ipcluster/launchers...
r4848 )
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 from IPython.utils.path import get_ipython_module_path
from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
MinRK
forward subprocess IO over zmq on Windows...
r3771 from .win32support import forward_read_events
MinRK
improve process cleanup on Windows...
r3778 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
forward subprocess IO over zmq on Windows...
r3771 WINDOWS = os.name == 'nt'
MinRK
interrupt windows subprocesses with CTRL-C instead of SIGINT...
r3772
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 #-----------------------------------------------------------------------------
# Paths to the kernel apps
#-----------------------------------------------------------------------------
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
MinRK
organize IPython.parallel into subpackages
r3673 'IPython.parallel.apps.ipclusterapp'
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 ))
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
MinRK
organize IPython.parallel into subpackages
r3673 'IPython.parallel.apps.ipengineapp'
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 ))
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
MinRK
organize IPython.parallel into subpackages
r3673 'IPython.parallel.apps.ipcontrollerapp'
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 ))
#-----------------------------------------------------------------------------
# Base launchers and errors
#-----------------------------------------------------------------------------
class LauncherError(Exception):
pass
class ProcessStateError(LauncherError):
pass
class UnknownStatus(LauncherError):
pass
MinRK
add LoggingConfigurable base class
r4016 class BaseLauncher(LoggingConfigurable):
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 """An asbtraction for starting, stopping and signaling a process."""
# In all of the launchers, the work_dir is where child processes will be
MinRK
update parallel apps to use ProfileDir
r3992 # run. This will usually be the profile_dir, but may not be. any work_dir
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 # passed into the __init__ method will override the config value.
# This should not be used to set the work_dir for the actual engine
# and controller. Instead, use their own config files or the
# controller_args, engine_args attributes of the launchers to add
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 # the work_dir option.
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 work_dir = Unicode(u'.')
loop = Instance('zmq.eventloop.ioloop.IOLoop')
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 start_data = Any()
stop_data = Any()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 def _loop_default(self):
return ioloop.IOLoop.instance()
MinRK
rework logging connections
r3610 def __init__(self, work_dir=u'.', config=None, **kwargs):
super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 self.state = 'before' # can be before, running, after
self.stop_callbacks = []
self.start_data = None
self.stop_data = None
@property
def args(self):
"""A list of cmd and args that will be used to start the process.
This is what is passed to :func:`spawnProcess` and the first element
will be the process name.
"""
return self.find_args()
def find_args(self):
"""The ``.args`` property calls this to find the args list.
Subcommand should implement this to construct the cmd and args.
"""
raise NotImplementedError('find_args must be implemented in a subclass')
@property
def arg_str(self):
"""The string form of the program arguments."""
return ' '.join(self.args)
@property
def running(self):
"""Am I running."""
if self.state == 'running':
return True
else:
return False
def start(self):
MinRK
scrub twisted/deferred references from launchers...
r4019 """Start the process."""
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 raise NotImplementedError('start must be implemented in a subclass')
def stop(self):
"""Stop the process and notify observers of stopping.
MinRK
scrub twisted/deferred references from launchers...
r4019 This method will return None immediately.
To observe the actual process stopping, see :meth:`on_stop`.
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 """
raise NotImplementedError('stop must be implemented in a subclass')
def on_stop(self, f):
MinRK
scrub twisted/deferred references from launchers...
r4019 """Register a callback to be called with this Launcher's stop_data
when the process actually finishes.
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 """
if self.state=='after':
return f(self.stop_data)
else:
self.stop_callbacks.append(f)
def notify_start(self, data):
"""Call this to trigger startup actions.
This logs the process startup and sets the state to 'running'. It is
a pass-through so it can be used as a callback.
"""
MinRK
rework logging connections
r3610 self.log.info('Process %r started: %r' % (self.args[0], data))
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 self.start_data = data
self.state = 'running'
return data
def notify_stop(self, data):
"""Call this to trigger process stop actions.
This logs the process stopping and sets the state to 'after'. Call
MinRK
scrub twisted/deferred references from launchers...
r4019 this to trigger callbacks registered via :meth:`on_stop`."""
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
rework logging connections
r3610 self.log.info('Process %r stopped: %r' % (self.args[0], data))
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 self.stop_data = data
self.state = 'after'
for i in range(len(self.stop_callbacks)):
d = self.stop_callbacks.pop()
d(data)
return data
def signal(self, sig):
"""Signal the process.
Parameters
----------
sig : str or int
'KILL', 'INT', etc., or any signal number
"""
raise NotImplementedError('signal must be implemented in a subclass')
MinRK
add cluster_id support to ipcluster/launchers...
r4848 class ClusterAppMixin(HasTraits):
"""MixIn for cluster args as traits"""
cluster_args = List([])
profile_dir=Unicode('')
cluster_id=Unicode('')
def _profile_dir_changed(self, name, old, new):
self.cluster_args = []
if self.profile_dir:
self.cluster_args.extend(['--profile-dir', self.profile_dir])
if self.cluster_id:
self.cluster_args.extend(['--cluster-id', self.cluster_id])
_cluster_id_changed = _profile_dir_changed
class ControllerMixin(ClusterAppMixin):
controller_cmd = List(ipcontroller_cmd_argv, config=True,
help="""Popen command to launch ipcontroller.""")
# Command line arguments to ipcontroller.
MinRK
parallel.apps cleanup per review...
r4850 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
MinRK
add cluster_id support to ipcluster/launchers...
r4848 help="""command-line args to pass to ipcontroller""")
class EngineMixin(ClusterAppMixin):
engine_cmd = List(ipengine_cmd_argv, config=True,
help="""command to launch the Engine.""")
# Command line arguments for ipengine.
MinRK
parallel.apps cleanup per review...
r4850 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
MinRK
add cluster_id support to ipcluster/launchers...
r4848 help="command-line arguments to pass to ipengine"
)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
#-----------------------------------------------------------------------------
# Local process launchers
#-----------------------------------------------------------------------------
class LocalProcessLauncher(BaseLauncher):
"""Start and stop an external process in an asynchronous manner.
This will launch the external process with a working directory of
``self.work_dir``.
"""
Bernardo B. Marques
remove all trailling spaces
r4872 # This is used to to construct self.args, which is passed to
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 # spawnProcess.
cmd_and_args = List([])
MinRK
add Integer traitlet...
r5344 poll_frequency = Integer(100) # in ms
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
rework logging connections
r3610 def __init__(self, work_dir=u'.', config=None, **kwargs):
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 super(LocalProcessLauncher, self).__init__(
MinRK
rework logging connections
r3610 work_dir=work_dir, config=config, **kwargs
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 )
self.process = None
self.poller = None
def find_args(self):
return self.cmd_and_args
def start(self):
if self.state == 'before':
self.process = Popen(self.args,
stdout=PIPE,stderr=PIPE,stdin=PIPE,
env=os.environ,
cwd=self.work_dir
)
MinRK
forward subprocess IO over zmq on Windows...
r3771 if WINDOWS:
self.stdout = forward_read_events(self.process.stdout)
self.stderr = forward_read_events(self.process.stderr)
else:
self.stdout = self.process.stdout.fileno()
self.stderr = self.process.stderr.fileno()
self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
self.poller.start()
self.notify_start(self.process.pid)
else:
s = 'The process was already started and has state: %r' % self.state
raise ProcessStateError(s)
def stop(self):
return self.interrupt_then_kill()
def signal(self, sig):
if self.state == 'running':
MinRK
improve process cleanup on Windows...
r3778 if WINDOWS and sig != SIGINT:
# use Windows tree-kill for better child cleanup
check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
else:
self.process.send_signal(sig)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
def interrupt_then_kill(self, delay=2.0):
"""Send INT, wait a delay and then send KILL."""
MinRK
improve process cleanup on Windows...
r3778 try:
self.signal(SIGINT)
except Exception:
self.log.debug("interrupt failed")
pass
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
self.killer.start()
# callbacks, etc:
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 def handle_stdout(self, fd, events):
MinRK
forward subprocess IO over zmq on Windows...
r3771 if WINDOWS:
line = self.stdout.recv()
else:
line = self.process.stdout.readline()
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 # a stopped process will be readable but return empty strings
if line:
MinRK
rework logging connections
r3610 self.log.info(line[:-1])
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 else:
self.poll()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 def handle_stderr(self, fd, events):
MinRK
forward subprocess IO over zmq on Windows...
r3771 if WINDOWS:
line = self.stderr.recv()
else:
line = self.process.stderr.readline()
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 # a stopped process will be readable but return empty strings
if line:
MinRK
rework logging connections
r3610 self.log.error(line[:-1])
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 else:
self.poll()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 def poll(self):
status = self.process.poll()
if status is not None:
self.poller.stop()
MinRK
forward subprocess IO over zmq on Windows...
r3771 self.loop.remove_handler(self.stdout)
self.loop.remove_handler(self.stderr)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
return status
MinRK
add cluster_id support to ipcluster/launchers...
r4848 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 """Launch a controller as a regular external process."""
def find_args(self):
MinRK
add cluster_id support to ipcluster/launchers...
r4848 return self.controller_cmd + self.cluster_args + self.controller_args
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
add cluster_id support to ipcluster/launchers...
r4848 def start(self):
MinRK
update parallel apps to use ProfileDir
r3992 """Start the controller by profile_dir."""
MinRK
rework logging connections
r3610 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 return super(LocalControllerLauncher, self).start()
MinRK
add cluster_id support to ipcluster/launchers...
r4848 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 """Launch a single engine as a regular externall process."""
def find_args(self):
MinRK
add cluster_id support to ipcluster/launchers...
r4848 return self.engine_cmd + self.cluster_args + self.engine_args
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
add cluster_id support to ipcluster/launchers...
r4848 class LocalEngineSetLauncher(LocalEngineLauncher):
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 """Launch a set of engines as regular external processes."""
MinRK
add delay configurable to EngineSetLaunchers...
r4587 delay = CFloat(0.1, config=True,
help="""delay (in seconds) between starting each engine after the first.
This can help force the engines to get their ids in order, or limit
process flood when starting many engines."""
)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 # launcher class
launcher_class = LocalEngineLauncher
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 launchers = Dict()
stop_data = Dict()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
rework logging connections
r3610 def __init__(self, work_dir=u'.', config=None, **kwargs):
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 super(LocalEngineSetLauncher, self).__init__(
MinRK
rework logging connections
r3610 work_dir=work_dir, config=config, **kwargs
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 )
self.stop_data = {}
MinRK
add cluster_id support to ipcluster/launchers...
r4848 def start(self, n):
MinRK
update parallel apps to use ProfileDir
r3992 """Start n engines by profile or profile_dir."""
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 dlist = []
for i in range(n):
MinRK
add delay configurable to EngineSetLaunchers...
r4587 if i > 0:
time.sleep(self.delay)
MinRK
add cluster_id support to ipcluster/launchers...
r4848 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
profile_dir=self.profile_dir, cluster_id=self.cluster_id,
)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 # Copy the engine args over to each engine launcher.
MinRK
add cluster_id support to ipcluster/launchers...
r4848 el.engine_cmd = copy.deepcopy(self.engine_cmd)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 el.engine_args = copy.deepcopy(self.engine_args)
el.on_stop(self._notice_engine_stopped)
MinRK
add cluster_id support to ipcluster/launchers...
r4848 d = el.start()
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 if i==0:
MinRK
rework logging connections
r3610 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 self.launchers[i] = el
dlist.append(d)
self.notify_start(dlist)
return dlist
def find_args(self):
return ['engine set']
def signal(self, sig):
dlist = []
for el in self.launchers.itervalues():
d = el.signal(sig)
dlist.append(d)
return dlist
def interrupt_then_kill(self, delay=1.0):
dlist = []
for el in self.launchers.itervalues():
d = el.interrupt_then_kill(delay)
dlist.append(d)
return dlist
def stop(self):
return self.interrupt_then_kill()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 def _notice_engine_stopped(self, data):
pid = data['pid']
for idx,el in self.launchers.iteritems():
if el.process.pid == pid:
break
self.launchers.pop(idx)
self.stop_data[idx] = data
if not self.launchers:
self.notify_stop(self.stop_data)
#-----------------------------------------------------------------------------
# MPIExec launchers
#-----------------------------------------------------------------------------
class MPIExecLauncher(LocalProcessLauncher):
"""Launch an external process using mpiexec."""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 mpi_cmd = List(['mpiexec'], config=True,
help="The mpiexec command to use in starting the process."
)
mpi_args = List([], config=True,
help="The command line arguments to pass to mpiexec."
)
MinRK
add cluster_id support to ipcluster/launchers...
r4848 program = List(['date'],
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="The program to start via mpiexec.")
MinRK
add cluster_id support to ipcluster/launchers...
r4848 program_args = List([],
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="The command line argument to the program."
)
MinRK
add Integer traitlet...
r5344 n = Integer(1)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
def find_args(self):
"""Build self.args using all the fields."""
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 self.program + self.program_args
def start(self, n):
"""Start n instances of the program using mpiexec."""
self.n = n
return super(MPIExecLauncher, self).start()
MinRK
add cluster_id support to ipcluster/launchers...
r4848 class MPIExecControllerLauncher(MPIExecLauncher, ControllerMixin):
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 """Launch a controller using mpiexec."""
MinRK
add cluster_id support to ipcluster/launchers...
r4848 # alias back to *non-configurable* program[_args] for use in find_args()
# this way all Controller/EngineSetLaunchers have the same form, rather
# than *some* having `program_args` and others `controller_args`
@property
def program(self):
return self.controller_cmd
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add cluster_id support to ipcluster/launchers...
r4848 @property
def program_args(self):
return self.cluster_args + self.controller_args
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
add cluster_id support to ipcluster/launchers...
r4848 def start(self):
MinRK
update parallel apps to use ProfileDir
r3992 """Start the controller by profile_dir."""
MinRK
rework logging connections
r3610 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 return super(MPIExecControllerLauncher, self).start(1)
MinRK
add cluster_id support to ipcluster/launchers...
r4848 class MPIExecEngineSetLauncher(MPIExecLauncher, EngineMixin):
"""Launch engines using mpiexec"""
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
add cluster_id support to ipcluster/launchers...
r4848 # alias back to *non-configurable* program[_args] for use in find_args()
# this way all Controller/EngineSetLaunchers have the same form, rather
# than *some* having `program_args` and others `controller_args`
@property
def program(self):
return self.engine_cmd
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add cluster_id support to ipcluster/launchers...
r4848 @property
def program_args(self):
return self.cluster_args + self.engine_args
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
add cluster_id support to ipcluster/launchers...
r4848 def start(self, n):
MinRK
update parallel apps to use ProfileDir
r3992 """Start n engines by profile or profile_dir."""
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 self.n = n
MinRK
rework logging connections
r3610 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 return super(MPIExecEngineSetLauncher, self).start(n)
#-----------------------------------------------------------------------------
# SSH launchers
#-----------------------------------------------------------------------------
MinRK
scrub twisted/deferred references from launchers...
r4019 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
class SSHLauncher(LocalProcessLauncher):
"""A minimal launcher for ssh.
To be useful this will probably have to be extended to use the ``sshx``
idea for environment variables. There could be other things this needs
as well.
"""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 ssh_cmd = List(['ssh'], config=True,
help="command for starting ssh")
ssh_args = List(['-tt'], config=True,
help="args to pass to ssh")
MinRK
add cluster_id support to ipcluster/launchers...
r4848 program = List(['date'],
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="Program to launch via ssh")
MinRK
add cluster_id support to ipcluster/launchers...
r4848 program_args = List([],
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="args to pass to remote program")
MinRK
cleanup parallel traits...
r3988 hostname = Unicode('', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="hostname on which to launch the program")
MinRK
cleanup parallel traits...
r3988 user = Unicode('', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="username for ssh")
MinRK
cleanup parallel traits...
r3988 location = Unicode('', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="user@hostname location for ssh in one setting")
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
def _hostname_changed(self, name, old, new):
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 if self.user:
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 self.location = u'%s@%s' % (self.user, new)
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 else:
self.location = new
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
def _user_changed(self, name, old, new):
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 self.location = u'%s@%s' % (new, self.hostname)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
def find_args(self):
return self.ssh_cmd + self.ssh_args + [self.location] + \
self.program + self.program_args
MinRK
add cluster_id support to ipcluster/launchers...
r4848 def start(self, hostname=None, user=None):
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 if hostname is not None:
self.hostname = hostname
if user is not None:
self.user = user
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 return super(SSHLauncher, self).start()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add ipcluster engines; fix ssh process shutdown
r3615 def signal(self, sig):
if self.state == 'running':
# send escaped ssh connection-closer
self.process.stdin.write('~.')
self.process.stdin.flush()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
add cluster_id support to ipcluster/launchers...
r4848 class SSHControllerLauncher(SSHLauncher, ControllerMixin):
# alias back to *non-configurable* program[_args] for use in find_args()
# this way all Controller/EngineSetLaunchers have the same form, rather
# than *some* having `program_args` and others `controller_args`
@property
def program(self):
return self.controller_cmd
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add cluster_id support to ipcluster/launchers...
r4848 @property
def program_args(self):
return self.cluster_args + self.controller_args
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
add cluster_id support to ipcluster/launchers...
r4848 class SSHEngineLauncher(SSHLauncher, EngineMixin):
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
add cluster_id support to ipcluster/launchers...
r4848 # alias back to *non-configurable* program[_args] for use in find_args()
# this way all Controller/EngineSetLaunchers have the same form, rather
# than *some* having `program_args` and others `controller_args`
@property
def program(self):
return self.engine_cmd
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add cluster_id support to ipcluster/launchers...
r4848 @property
def program_args(self):
return self.cluster_args + self.engine_args
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 class SSHEngineSetLauncher(LocalEngineSetLauncher):
launcher_class = SSHEngineLauncher
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 engines = Dict(config=True,
help="""dict of engines to launch. This is a dict by hostname of ints,
corresponding to the number of engines to start on that host.""")
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add cluster_id support to ipcluster/launchers...
r4848 def start(self, n):
MinRK
update parallel apps to use ProfileDir
r3992 """Start engines by profile or profile_dir.
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 `n` is ignored, and the `engines` config property is used instead.
"""
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 dlist = []
for host, n in self.engines.iteritems():
if isinstance(n, (tuple, list)):
n, args = n
else:
args = copy.deepcopy(self.engine_args)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 if '@' in host:
user,host = host.split('@',1)
else:
user=None
for i in range(n):
MinRK
add delay configurable to EngineSetLaunchers...
r4587 if i > 0:
time.sleep(self.delay)
MinRK
add cluster_id support to ipcluster/launchers...
r4848 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
profile_dir=self.profile_dir, cluster_id=self.cluster_id,
)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 # Copy the engine args over to each engine launcher.
MinRK
add cluster_id support to ipcluster/launchers...
r4848 el.engine_cmd = self.engine_cmd
el.engine_args = args
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 el.on_stop(self._notice_engine_stopped)
MinRK
add cluster_id support to ipcluster/launchers...
r4848 d = el.start(user=user, hostname=host)
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 if i==0:
self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
MinRK
add '/' separator to keys in SSH engine dict...
r5181 self.launchers[ "%s/%i" % (host,i) ] = el
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 dlist.append(d)
self.notify_start(dlist)
return dlist
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
#-----------------------------------------------------------------------------
# Windows HPC Server 2008 scheduler launchers
#-----------------------------------------------------------------------------
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 # This is only used on Windows.
def find_job_cmd():
MinRK
forward subprocess IO over zmq on Windows...
r3771 if WINDOWS:
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 try:
return find_cmd('job')
MinRK
handle potentially absent win32api in launcher.py
r3770 except (FindCmdError, ImportError):
# ImportError will be raised if win32api is not installed
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 return 'job'
else:
return 'job'
class WindowsHPCLauncher(BaseLauncher):
MinRK
cleanup parallel traits...
r3988 job_id_regexp = Unicode(r'\d+', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="""A regular expression used to get the job id from the output of the
submit_command. """
)
MinRK
cleanup parallel traits...
r3988 job_file_name = Unicode(u'ipython_job.xml', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="The filename of the instantiated job script.")
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 # The full path to the instantiated job script. This gets made dynamically
# by combining the work_dir with the job_file_name.
MinRK
cleanup parallel traits...
r3988 job_file = Unicode(u'')
scheduler = Unicode('', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="The hostname of the scheduler to submit the job to.")
MinRK
cleanup parallel traits...
r3988 job_cmd = Unicode(find_job_cmd(), config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="The command for submitting jobs.")
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
MinRK
add ipcluster engines; fix ssh process shutdown
r3615 def __init__(self, work_dir=u'.', config=None, **kwargs):
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 super(WindowsHPCLauncher, self).__init__(
MinRK
add ipcluster engines; fix ssh process shutdown
r3615 work_dir=work_dir, config=config, **kwargs
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 )
@property
def job_file(self):
return os.path.join(self.work_dir, self.job_file_name)
def write_job_file(self, n):
raise NotImplementedError("Implement write_job_file in a subclass.")
def find_args(self):
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 return [u'job.exe']
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 def parse_job_id(self, output):
"""Take the output of the submit command and return the job id."""
m = re.search(self.job_id_regexp, output)
if m is not None:
job_id = m.group()
else:
raise LauncherError("Job id couldn't be determined: %s" % output)
self.job_id = job_id
self.log.info('Job started with job id: %r' % job_id)
return job_id
def start(self, n):
"""Start n copies of the process using the Win HPC job scheduler."""
self.write_job_file(n)
args = [
'submit',
'/jobfile:%s' % self.job_file,
'/scheduler:%s' % self.scheduler
]
self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
MinRK
scrub twisted/deferred references from launchers...
r4019
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 output = check_output([self.job_cmd]+args,
env=os.environ,
cwd=self.work_dir,
stderr=STDOUT
)
job_id = self.parse_job_id(output)
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 self.notify_start(job_id)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 return job_id
def stop(self):
args = [
'cancel',
self.job_id,
'/scheduler:%s' % self.scheduler
]
self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
try:
output = check_output([self.job_cmd]+args,
env=os.environ,
cwd=self.work_dir,
stderr=STDOUT
)
except:
output = 'The job already appears to be stoppped: %r' % self.job_id
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 return output
MinRK
add cluster_id support to ipcluster/launchers...
r4848 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
MinRK
cleanup parallel traits...
r3988 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="WinHPC xml job file.")
MinRK
add cluster_id support to ipcluster/launchers...
r4848 controller_args = List([], config=False,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="extra args to pass to ipcontroller")
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
def write_job_file(self, n):
job = IPControllerJob(config=self.config)
t = IPControllerTask(config=self.config)
Bernardo B. Marques
remove all trailling spaces
r4872 # The tasks work directory is *not* the actual work directory of
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 # the controller. It is used as the base path for the stdout/stderr
# files that the scheduler redirects to.
MinRK
update parallel apps to use ProfileDir
r3992 t.work_directory = self.profile_dir
# Add the profile_dir and from self.start().
MinRK
add cluster_id support to ipcluster/launchers...
r4848 t.controller_args.extend(self.cluster_args)
t.controller_args.extend(self.controller_args)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 job.add_task(t)
self.log.info("Writing job description file: %s" % self.job_file)
job.write(self.job_file)
@property
def job_file(self):
MinRK
update parallel apps to use ProfileDir
r3992 return os.path.join(self.profile_dir, self.job_file_name)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
MinRK
add cluster_id support to ipcluster/launchers...
r4848 def start(self):
MinRK
update parallel apps to use ProfileDir
r3992 """Start the controller by profile_dir."""
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 return super(WindowsHPCControllerLauncher, self).start(1)
MinRK
add cluster_id support to ipcluster/launchers...
r4848 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
MinRK
cleanup parallel traits...
r3988 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="jobfile for ipengines job")
MinRK
add cluster_id support to ipcluster/launchers...
r4848 engine_args = List([], config=False,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="extra args to pas to ipengine")
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
def write_job_file(self, n):
job = IPEngineSetJob(config=self.config)
for i in range(n):
t = IPEngineTask(config=self.config)
Bernardo B. Marques
remove all trailling spaces
r4872 # The tasks work directory is *not* the actual work directory of
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 # the engine. It is used as the base path for the stdout/stderr
# files that the scheduler redirects to.
MinRK
update parallel apps to use ProfileDir
r3992 t.work_directory = self.profile_dir
# Add the profile_dir and from self.start().
MinRK
add cluster_id support to ipcluster/launchers...
r4848 t.controller_args.extend(self.cluster_args)
t.controller_args.extend(self.engine_args)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 job.add_task(t)
self.log.info("Writing job description file: %s" % self.job_file)
job.write(self.job_file)
@property
def job_file(self):
MinRK
update parallel apps to use ProfileDir
r3992 return os.path.join(self.profile_dir, self.job_file_name)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
MinRK
add cluster_id support to ipcluster/launchers...
r4848 def start(self, n):
MinRK
update parallel apps to use ProfileDir
r3992 """Start the controller by profile_dir."""
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 return super(WindowsHPCEngineSetLauncher, self).start(n)
#-----------------------------------------------------------------------------
# Batch (PBS) system launchers
#-----------------------------------------------------------------------------
MinRK
add cluster_id support to ipcluster/launchers...
r4848 class BatchClusterAppMixin(ClusterAppMixin):
MinRK
parallel.apps cleanup per review...
r4850 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
MinRK
add cluster_id support to ipcluster/launchers...
r4848 def _profile_dir_changed(self, name, old, new):
self.context[name] = new
_cluster_id_changed = _profile_dir_changed
MinRK
parallel.apps cleanup per review...
r4850 def _profile_dir_default(self):
self.context['profile_dir'] = ''
return ''
def _cluster_id_default(self):
self.context['cluster_id'] = ''
return ''
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 class BatchSystemLauncher(BaseLauncher):
"""Launch an external process using a batch system.
This class is designed to work with UNIX batch systems like PBS, LSF,
GridEngine, etc. The overall model is that there are different commands
like qsub, qdel, etc. that handle the starting and stopping of the process.
This class also has the notion of a batch script. The ``batch_template``
attribute can be set to a string that is a template for the batch script.
Thomas Kluyver
Replace Itpl with str.format in parallel launcher....
r4003 This template is instantiated using string formatting. Thus the template can
use {n} fot the number of instances. Subclasses can add additional variables
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 to the template dict.
"""
# Subclasses must fill these in. See PBSEngineSet
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 submit_command = List([''], config=True,
help="The name of the command line program used to submit jobs.")
delete_command = List([''], config=True,
help="The name of the command line program used to delete jobs.")
MinRK
cleanup parallel traits...
r3988 job_id_regexp = Unicode('', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="""A regular expression used to get the job id from the output of the
submit_command.""")
MinRK
cleanup parallel traits...
r3988 batch_template = Unicode('', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="The string that is the batch script template itself.")
MinRK
cleanup parallel traits...
r3988 batch_template_file = Unicode(u'', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="The file that contains the batch template.")
MinRK
cleanup parallel traits...
r3988 batch_file_name = Unicode(u'batch_script', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="The filename of the instantiated batch script.")
MinRK
cleanup parallel traits...
r3988 queue = Unicode(u'', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="The PBS Queue.")
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add cluster_id support to ipcluster/launchers...
r4848 def _queue_changed(self, name, old, new):
self.context[name] = new
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add Integer traitlet...
r5344 n = Integer(1)
MinRK
add cluster_id support to ipcluster/launchers...
r4848 _n_changed = _queue_changed
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 # not configurable, override in subclasses
# PBS Job Array regex
MinRK
cleanup parallel traits...
r3988 job_array_regexp = Unicode('')
job_array_template = Unicode('')
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 # PBS Queue regex
MinRK
cleanup parallel traits...
r3988 queue_regexp = Unicode('')
queue_template = Unicode('')
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 # The default batch template, override in subclasses
MinRK
cleanup parallel traits...
r3988 default_template = Unicode('')
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 # The full path to the instantiated batch script.
MinRK
cleanup parallel traits...
r3988 batch_file = Unicode(u'')
MinRK
launcher updates for PBS
r3645 # the format dict used with batch_template:
context = Dict()
MinRK
fix initial context dict for batch launchers
r5305 def _context_default(self):
"""load the default context with the default values for the basic keys
because the _trait_changed methods only load the context if they
are set to something other than the default value.
"""
return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
MinRK
add EvalFormatter for batch system (PBS) launcher templates...
r4004 # the Formatter instance for rendering the templates:
formatter = Instance(EvalFormatter, (), {})
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
launcher updates for PBS
r3645 def find_args(self):
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 return self.submit_command + [self.batch_file]
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add ipcluster engines; fix ssh process shutdown
r3615 def __init__(self, work_dir=u'.', config=None, **kwargs):
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 super(BatchSystemLauncher, self).__init__(
MinRK
add ipcluster engines; fix ssh process shutdown
r3615 work_dir=work_dir, config=config, **kwargs
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 )
self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
def parse_job_id(self, output):
"""Take the output of the submit command and return the job id."""
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 m = re.search(self.job_id_regexp, output)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 if m is not None:
job_id = m.group()
else:
raise LauncherError("Job id couldn't be determined: %s" % output)
self.job_id = job_id
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 self.log.info('Job submitted with job id: %r' % job_id)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 return job_id
def write_batch_script(self, n):
"""Instantiate and write the batch script to the work_dir."""
MinRK
add cluster_id support to ipcluster/launchers...
r4848 self.n = n
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 # first priority is batch_template if set
if self.batch_template_file and not self.batch_template:
# second priority is batch_template_file
with open(self.batch_template_file) as f:
self.batch_template = f.read()
if not self.batch_template:
# third (last) priority is default_template
self.batch_template = self.default_template
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
don't automatically add jobarray or queue lines to user template...
r4183 # add jobarray or queue lines to user-specified template
# note that this is *only* when user did not specify a template.
regex = re.compile(self.job_array_regexp)
# print regex.search(self.batch_template)
if not regex.search(self.batch_template):
self.log.info("adding job array settings to batch script")
firstline, rest = self.batch_template.split('\n',1)
self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
don't automatically add jobarray or queue lines to user template...
r4183 regex = re.compile(self.queue_regexp)
# print regex.search(self.batch_template)
if self.queue and not regex.search(self.batch_template):
self.log.info("adding PBS queue settings to batch script")
firstline, rest = self.batch_template.split('\n',1)
self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add EvalFormatter for batch system (PBS) launcher templates...
r4004 script_as_string = self.formatter.format(self.batch_template, **self.context)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 with open(self.batch_file, 'w') as f:
f.write(script_as_string)
os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
MinRK
add cluster_id support to ipcluster/launchers...
r4848 def start(self, n):
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 """Start n copies of the process using a batch system."""
Thomas Kluyver
Replace Itpl with str.format in parallel launcher....
r4003 # Here we save profile_dir in the context so they
# can be used in the batch script template as {profile_dir}
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 self.write_batch_script(n)
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 output = check_output(self.args, env=os.environ)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 job_id = self.parse_job_id(output)
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 self.notify_start(job_id)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 return job_id
def stop(self):
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 output = check_output(self.delete_command+[self.job_id], env=os.environ)
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 return output
class PBSLauncher(BatchSystemLauncher):
"""A BatchSystemLauncher subclass for PBS."""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 submit_command = List(['qsub'], config=True,
help="The PBS submit command ['qsub']")
delete_command = List(['qdel'], config=True,
help="The PBS delete command ['qsub']")
MinRK
cleanup parallel traits...
r3988 job_id_regexp = Unicode(r'\d+', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="Regular expresion for identifying the job ID [r'\d+']")
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
cleanup parallel traits...
r3988 batch_file = Unicode(u'')
job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
Thomas Kluyver
Replace Itpl with str.format in parallel launcher....
r4003 job_array_template = Unicode('#PBS -t 1-{n}')
MinRK
cleanup parallel traits...
r3988 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
Thomas Kluyver
Replace Itpl with str.format in parallel launcher....
r4003 queue_template = Unicode('#PBS -q {queue}')
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
MinRK
parallel.apps cleanup per review...
r4850 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 """Launch a controller using PBS."""
MinRK
cleanup parallel traits...
r3988 batch_file_name = Unicode(u'pbs_controller', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="batch file name for the controller job.")
MinRK
cleanup parallel traits...
r3988 default_template= Unicode("""#!/bin/sh
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 #PBS -V
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 #PBS -N ipcontroller
MinRK
add cluster_id support to ipcluster/launchers...
r4848 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 """%(' '.join(ipcontroller_cmd_argv)))
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add cluster_id support to ipcluster/launchers...
r4848 def start(self):
MinRK
update parallel apps to use ProfileDir
r3992 """Start the controller by profile or profile_dir."""
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
MinRK
add cluster_id support to ipcluster/launchers...
r4848 return super(PBSControllerLauncher, self).start(1)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
MinRK
parallel.apps cleanup per review...
r4850 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 """Launch Engines using PBS"""
MinRK
cleanup parallel traits...
r3988 batch_file_name = Unicode(u'pbs_engines', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="batch file name for the engine(s) job.")
MinRK
cleanup parallel traits...
r3988 default_template= Unicode(u"""#!/bin/sh
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 #PBS -V
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 #PBS -N ipengine
MinRK
add cluster_id support to ipcluster/launchers...
r4848 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 """%(' '.join(ipengine_cmd_argv)))
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
MinRK
add cluster_id support to ipcluster/launchers...
r4848 def start(self, n):
MinRK
update parallel apps to use ProfileDir
r3992 """Start n engines by profile or profile_dir."""
MinRK
SGE test related fixes...
r3668 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
MinRK
add cluster_id support to ipcluster/launchers...
r4848 return super(PBSEngineSetLauncher, self).start(n)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 #SGE is very similar to PBS
class SGELauncher(PBSLauncher):
"""Sun GridEngine is a PBS clone with slightly different syntax"""
Thomas Kluyver
Replace Itpl with str.format in parallel launcher....
r4003 job_array_regexp = Unicode('#\$\W+\-t')
job_array_template = Unicode('#$ -t 1-{n}')
queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
MinRK
fix remaining Itpl syntax in SGE queue_template...
r4089 queue_template = Unicode('#$ -q {queue}')
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659
MinRK
parallel.apps cleanup per review...
r4850 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 """Launch a controller using SGE."""
MinRK
cleanup parallel traits...
r3988 batch_file_name = Unicode(u'sge_controller', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="batch file name for the ipontroller job.")
Thomas Kluyver
Replace Itpl with str.format in parallel launcher....
r4003 default_template= Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
MinRK
add cluster_id support to ipcluster/launchers...
r4848 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 """%(' '.join(ipcontroller_cmd_argv)))
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659
MinRK
add cluster_id support to ipcluster/launchers...
r4848 def start(self):
MinRK
update parallel apps to use ProfileDir
r3992 """Start the controller by profile or profile_dir."""
MinRK
fix initial context dict for batch launchers
r5305 self.log.info("Starting SGEControllerLauncher: %r" % self.args)
MinRK
add cluster_id support to ipcluster/launchers...
r4848 return super(SGEControllerLauncher, self).start(1)
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659
MinRK
parallel.apps cleanup per review...
r4850 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 """Launch Engines with SGE"""
MinRK
cleanup parallel traits...
r3988 batch_file_name = Unicode(u'sge_engines', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="batch file name for the engine(s) job.")
Thomas Kluyver
Replace Itpl with str.format in parallel launcher....
r4003 default_template = Unicode("""#$ -V
#$ -S /bin/sh
#$ -N ipengine
MinRK
add cluster_id support to ipcluster/launchers...
r4848 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 """%(' '.join(ipengine_cmd_argv)))
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659
MinRK
add cluster_id support to ipcluster/launchers...
r4848 def start(self, n):
MinRK
update parallel apps to use ProfileDir
r3992 """Start n engines by profile or profile_dir."""
MinRK
SGE test related fixes...
r3668 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
MinRK
add cluster_id support to ipcluster/launchers...
r4848 return super(SGEEngineSetLauncher, self).start(n)
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
Johann Cohen-Tanugi
add LSF launcher for ipcluster
r4229 # LSF launchers
class LSFLauncher(BatchSystemLauncher):
"""A BatchSystemLauncher subclass for LSF."""
Bernardo B. Marques
remove all trailling spaces
r4872
Johann Cohen-Tanugi
add LSF launcher for ipcluster
r4229 submit_command = List(['bsub'], config=True,
help="The PBS submit command ['bsub']")
delete_command = List(['bkill'], config=True,
help="The PBS delete command ['bkill']")
job_id_regexp = Unicode(r'\d+', config=True,
help="Regular expresion for identifying the job ID [r'\d+']")
Bernardo B. Marques
remove all trailling spaces
r4872
Johann Cohen-Tanugi
add LSF launcher for ipcluster
r4229 batch_file = Unicode(u'')
job_array_regexp = Unicode('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
queue_template = Unicode('#BSUB -q {queue}')
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add cluster_id support to ipcluster/launchers...
r4848 def start(self, n):
Johann Cohen-Tanugi
add LSF launcher for ipcluster
r4229 """Start n copies of the process using LSF batch system.
This cant inherit from the base class because bsub expects
to be piped a shell script in order to honor the #BSUB directives :
bsub < script
"""
# Here we save profile_dir in the context so they
# can be used in the batch script template as {profile_dir}
self.write_batch_script(n)
#output = check_output(self.args, env=os.environ)
piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
output,err = p.communicate()
job_id = self.parse_job_id(output)
self.notify_start(job_id)
return job_id
MinRK
parallel.apps cleanup per review...
r4850 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
Johann Cohen-Tanugi
add LSF launcher for ipcluster
r4229 """Launch a controller using LSF."""
Bernardo B. Marques
remove all trailling spaces
r4872
Johann Cohen-Tanugi
add LSF launcher for ipcluster
r4229 batch_file_name = Unicode(u'lsf_controller', config=True,
help="batch file name for the controller job.")
default_template= Unicode("""#!/bin/sh
#BSUB -J ipcontroller
Bernardo B. Marques
remove all trailling spaces
r4872 #BSUB -oo ipcontroller.o.%%J
#BSUB -eo ipcontroller.e.%%J
MinRK
add cluster_id support to ipcluster/launchers...
r4848 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
Johann Cohen-Tanugi
add LSF launcher for ipcluster
r4229 """%(' '.join(ipcontroller_cmd_argv)))
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add cluster_id support to ipcluster/launchers...
r4848 def start(self):
Johann Cohen-Tanugi
add LSF launcher for ipcluster
r4229 """Start the controller by profile or profile_dir."""
self.log.info("Starting LSFControllerLauncher: %r" % self.args)
MinRK
add cluster_id support to ipcluster/launchers...
r4848 return super(LSFControllerLauncher, self).start(1)
Johann Cohen-Tanugi
add LSF launcher for ipcluster
r4229
MinRK
parallel.apps cleanup per review...
r4850 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
Johann Cohen-Tanugi
add LSF launcher for ipcluster
r4229 """Launch Engines using LSF"""
batch_file_name = Unicode(u'lsf_engines', config=True,
help="batch file name for the engine(s) job.")
default_template= Unicode(u"""#!/bin/sh
Bernardo B. Marques
remove all trailling spaces
r4872 #BSUB -oo ipengine.o.%%J
#BSUB -eo ipengine.e.%%J
MinRK
add cluster_id support to ipcluster/launchers...
r4848 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
Johann Cohen-Tanugi
add LSF launcher for ipcluster
r4229 """%(' '.join(ipengine_cmd_argv)))
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add cluster_id support to ipcluster/launchers...
r4848 def start(self, n):
Johann Cohen-Tanugi
add LSF launcher for ipcluster
r4229 """Start n engines by profile or profile_dir."""
self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
MinRK
add cluster_id support to ipcluster/launchers...
r4848 return super(LSFEngineSetLauncher, self).start(n)
Johann Cohen-Tanugi
add LSF launcher for ipcluster
r4229
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 #-----------------------------------------------------------------------------
# A launcher for ipcluster itself!
#-----------------------------------------------------------------------------
class IPClusterLauncher(LocalProcessLauncher):
"""Launch the ipcluster program in an external process."""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
help="Popen command for ipcluster")
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 ipcluster_args = List(
MinRK
aliases match flag pattern ('-' as wordsep, not '_')...
r4214 ['--clean-logs', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="Command line arguments to pass to ipcluster.")
MinRK
cleanup parallel traits...
r3988 ipcluster_subcommand = Unicode('start')
MinRK
add Integer traitlet...
r5344 ipcluster_n = Integer(2)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
def find_args(self):
MinRK
disallow no-prefix `ipython foo=bar` argument style....
r4197 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
['--n=%i'%self.ipcluster_n] + self.ipcluster_args
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
def start(self):
MinRK
rework logging connections
r3610 self.log.info("Starting ipcluster: %r" % self.args)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 return super(IPClusterLauncher, self).start()
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 #-----------------------------------------------------------------------------
# Collections of launchers
#-----------------------------------------------------------------------------
local_launchers = [
LocalControllerLauncher,
LocalEngineLauncher,
LocalEngineSetLauncher,
]
mpi_launchers = [
MPIExecLauncher,
MPIExecControllerLauncher,
MPIExecEngineSetLauncher,
]
ssh_launchers = [
SSHLauncher,
SSHControllerLauncher,
SSHEngineLauncher,
SSHEngineSetLauncher,
]
winhpc_launchers = [
WindowsHPCLauncher,
WindowsHPCControllerLauncher,
WindowsHPCEngineSetLauncher,
]
pbs_launchers = [
PBSLauncher,
PBSControllerLauncher,
PBSEngineSetLauncher,
]
sge_launchers = [
SGELauncher,
SGEControllerLauncher,
SGEEngineSetLauncher,
]
Johann Cohen-Tanugi
add LSF launcher for ipcluster
r4229 lsf_launchers = [
LSFLauncher,
LSFControllerLauncher,
LSFEngineSetLauncher,
]
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
Johann Cohen-Tanugi
add LSF launcher for ipcluster
r4229 + pbs_launchers + sge_launchers + lsf_launchers
MinRK
scrub twisted/deferred references from launchers...
r4019