##// END OF EJS Templates
rewrite pastefig() qt doc section for display()
rewrite pastefig() qt doc section for display()

File last commit:

r4229:30efc065
r4398:0fcec1e3
Show More
launcher.py
1142 lines | 39.8 KiB | text/x-python | PythonLexer
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 #!/usr/bin/env python
# encoding: utf-8
"""
Facilities for launching IPython processes asynchronously.
MinRK
update recently changed modules with Authors in docstring
r4018
Authors:
* Brian Granger
* MinRK
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 """
#-----------------------------------------------------------------------------
MinRK
update recently changed modules with Authors in docstring
r4018 # Copyright (C) 2008-2011 The IPython Development Team
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 import copy
MinRK
resort imports in a cleaner order
r3631 import logging
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 import os
import re
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 import stat
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
improve process cleanup on Windows...
r3778 # signal imports, handling various platforms, versions
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 from signal import SIGINT, SIGTERM
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 try:
from signal import SIGKILL
except ImportError:
MinRK
improve process cleanup on Windows...
r3778 # Windows
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 SIGKILL=SIGTERM
MinRK
improve process cleanup on Windows...
r3778 try:
# Windows >= 2.7, 3.2
from signal import CTRL_C_EVENT as SIGINT
except ImportError:
pass
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 from subprocess import Popen, PIPE, STDOUT
try:
MinRK
add ipcluster engines; fix ssh process shutdown
r3615 from subprocess import check_output
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 except ImportError:
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 # pre-2.7, define check_output with Popen
MinRK
add ipcluster engines; fix ssh process shutdown
r3615 def check_output(*args, **kwargs):
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 kwargs.update(dict(stdout=PIPE))
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 p = Popen(*args, **kwargs)
out,err = p.communicate()
return out
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
from zmq.eventloop import ioloop
MinRK
get default logger from Application.instance()
r4012 from IPython.config.application import Application
MinRK
add LoggingConfigurable base class
r4016 from IPython.config.configurable import LoggingConfigurable
MinRK
add EvalFormatter for batch system (PBS) launcher templates...
r4004 from IPython.utils.text import EvalFormatter
MinRK
cleanup parallel traits...
r3988 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 from IPython.utils.path import get_ipython_module_path
from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
MinRK
forward subprocess IO over zmq on Windows...
r3771 from .win32support import forward_read_events
MinRK
improve process cleanup on Windows...
r3778 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
forward subprocess IO over zmq on Windows...
r3771 WINDOWS = os.name == 'nt'
MinRK
interrupt windows subprocesses with CTRL-C instead of SIGINT...
r3772
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 #-----------------------------------------------------------------------------
# Paths to the kernel apps
#-----------------------------------------------------------------------------
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
MinRK
organize IPython.parallel into subpackages
r3673 'IPython.parallel.apps.ipclusterapp'
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 ))
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
MinRK
organize IPython.parallel into subpackages
r3673 'IPython.parallel.apps.ipengineapp'
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 ))
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
MinRK
organize IPython.parallel into subpackages
r3673 'IPython.parallel.apps.ipcontrollerapp'
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 ))
#-----------------------------------------------------------------------------
# Base launchers and errors
#-----------------------------------------------------------------------------
class LauncherError(Exception):
pass
class ProcessStateError(LauncherError):
pass
class UnknownStatus(LauncherError):
pass
MinRK
add LoggingConfigurable base class
r4016 class BaseLauncher(LoggingConfigurable):
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 """An asbtraction for starting, stopping and signaling a process."""
# In all of the launchers, the work_dir is where child processes will be
MinRK
update parallel apps to use ProfileDir
r3992 # run. This will usually be the profile_dir, but may not be. any work_dir
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 # passed into the __init__ method will override the config value.
# This should not be used to set the work_dir for the actual engine
# and controller. Instead, use their own config files or the
# controller_args, engine_args attributes of the launchers to add
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 # the work_dir option.
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 work_dir = Unicode(u'.')
loop = Instance('zmq.eventloop.ioloop.IOLoop')
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647
start_data = Any()
stop_data = Any()
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 def _loop_default(self):
return ioloop.IOLoop.instance()
MinRK
rework logging connections
r3610 def __init__(self, work_dir=u'.', config=None, **kwargs):
super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 self.state = 'before' # can be before, running, after
self.stop_callbacks = []
self.start_data = None
self.stop_data = None
@property
def args(self):
"""A list of cmd and args that will be used to start the process.
This is what is passed to :func:`spawnProcess` and the first element
will be the process name.
"""
return self.find_args()
def find_args(self):
"""The ``.args`` property calls this to find the args list.
Subcommand should implement this to construct the cmd and args.
"""
raise NotImplementedError('find_args must be implemented in a subclass')
@property
def arg_str(self):
"""The string form of the program arguments."""
return ' '.join(self.args)
@property
def running(self):
"""Am I running."""
if self.state == 'running':
return True
else:
return False
def start(self):
MinRK
scrub twisted/deferred references from launchers...
r4019 """Start the process."""
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 raise NotImplementedError('start must be implemented in a subclass')
def stop(self):
"""Stop the process and notify observers of stopping.
MinRK
scrub twisted/deferred references from launchers...
r4019 This method will return None immediately.
To observe the actual process stopping, see :meth:`on_stop`.
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 """
raise NotImplementedError('stop must be implemented in a subclass')
def on_stop(self, f):
MinRK
scrub twisted/deferred references from launchers...
r4019 """Register a callback to be called with this Launcher's stop_data
when the process actually finishes.
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 """
if self.state=='after':
return f(self.stop_data)
else:
self.stop_callbacks.append(f)
def notify_start(self, data):
"""Call this to trigger startup actions.
This logs the process startup and sets the state to 'running'. It is
a pass-through so it can be used as a callback.
"""
MinRK
rework logging connections
r3610 self.log.info('Process %r started: %r' % (self.args[0], data))
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 self.start_data = data
self.state = 'running'
return data
def notify_stop(self, data):
"""Call this to trigger process stop actions.
This logs the process stopping and sets the state to 'after'. Call
MinRK
scrub twisted/deferred references from launchers...
r4019 this to trigger callbacks registered via :meth:`on_stop`."""
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
rework logging connections
r3610 self.log.info('Process %r stopped: %r' % (self.args[0], data))
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 self.stop_data = data
self.state = 'after'
for i in range(len(self.stop_callbacks)):
d = self.stop_callbacks.pop()
d(data)
return data
def signal(self, sig):
"""Signal the process.
Parameters
----------
sig : str or int
'KILL', 'INT', etc., or any signal number
"""
raise NotImplementedError('signal must be implemented in a subclass')
#-----------------------------------------------------------------------------
# Local process launchers
#-----------------------------------------------------------------------------
class LocalProcessLauncher(BaseLauncher):
"""Start and stop an external process in an asynchronous manner.
This will launch the external process with a working directory of
``self.work_dir``.
"""
# This is used to to construct self.args, which is passed to
# spawnProcess.
cmd_and_args = List([])
poll_frequency = Int(100) # in ms
MinRK
rework logging connections
r3610 def __init__(self, work_dir=u'.', config=None, **kwargs):
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 super(LocalProcessLauncher, self).__init__(
MinRK
rework logging connections
r3610 work_dir=work_dir, config=config, **kwargs
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 )
self.process = None
self.poller = None
def find_args(self):
return self.cmd_and_args
def start(self):
if self.state == 'before':
self.process = Popen(self.args,
stdout=PIPE,stderr=PIPE,stdin=PIPE,
env=os.environ,
cwd=self.work_dir
)
MinRK
forward subprocess IO over zmq on Windows...
r3771 if WINDOWS:
self.stdout = forward_read_events(self.process.stdout)
self.stderr = forward_read_events(self.process.stderr)
else:
self.stdout = self.process.stdout.fileno()
self.stderr = self.process.stderr.fileno()
self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
self.poller.start()
self.notify_start(self.process.pid)
else:
s = 'The process was already started and has state: %r' % self.state
raise ProcessStateError(s)
def stop(self):
return self.interrupt_then_kill()
def signal(self, sig):
if self.state == 'running':
MinRK
improve process cleanup on Windows...
r3778 if WINDOWS and sig != SIGINT:
# use Windows tree-kill for better child cleanup
check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
else:
self.process.send_signal(sig)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
def interrupt_then_kill(self, delay=2.0):
"""Send INT, wait a delay and then send KILL."""
MinRK
improve process cleanup on Windows...
r3778 try:
self.signal(SIGINT)
except Exception:
self.log.debug("interrupt failed")
pass
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
self.killer.start()
# callbacks, etc:
def handle_stdout(self, fd, events):
MinRK
forward subprocess IO over zmq on Windows...
r3771 if WINDOWS:
line = self.stdout.recv()
else:
line = self.process.stdout.readline()
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 # a stopped process will be readable but return empty strings
if line:
MinRK
rework logging connections
r3610 self.log.info(line[:-1])
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 else:
self.poll()
def handle_stderr(self, fd, events):
MinRK
forward subprocess IO over zmq on Windows...
r3771 if WINDOWS:
line = self.stderr.recv()
else:
line = self.process.stderr.readline()
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 # a stopped process will be readable but return empty strings
if line:
MinRK
rework logging connections
r3610 self.log.error(line[:-1])
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 else:
self.poll()
def poll(self):
status = self.process.poll()
if status is not None:
self.poller.stop()
MinRK
forward subprocess IO over zmq on Windows...
r3771 self.loop.remove_handler(self.stdout)
self.loop.remove_handler(self.stderr)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
return status
class LocalControllerLauncher(LocalProcessLauncher):
"""Launch a controller as a regular external process."""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 controller_cmd = List(ipcontroller_cmd_argv, config=True,
help="""Popen command to launch ipcontroller.""")
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 # Command line arguments to ipcontroller.
MinRK
aliases match flag pattern ('-' as wordsep, not '_')...
r4214 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="""command-line args to pass to ipcontroller""")
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
def find_args(self):
return self.controller_cmd + self.controller_args
MinRK
update parallel apps to use ProfileDir
r3992 def start(self, profile_dir):
"""Start the controller by profile_dir."""
MinRK
aliases match flag pattern ('-' as wordsep, not '_')...
r4214 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
MinRK
update parallel apps to use ProfileDir
r3992 self.profile_dir = unicode(profile_dir)
MinRK
rework logging connections
r3610 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 return super(LocalControllerLauncher, self).start()
class LocalEngineLauncher(LocalProcessLauncher):
"""Launch a single engine as a regular externall process."""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 engine_cmd = List(ipengine_cmd_argv, config=True,
help="""command to launch the Engine.""")
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 # Command line arguments for ipengine.
MinRK
aliases match flag pattern ('-' as wordsep, not '_')...
r4214 engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="command-line arguments to pass to ipengine"
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 )
def find_args(self):
return self.engine_cmd + self.engine_args
MinRK
update parallel apps to use ProfileDir
r3992 def start(self, profile_dir):
"""Start the engine by profile_dir."""
MinRK
aliases match flag pattern ('-' as wordsep, not '_')...
r4214 self.engine_args.extend(['--profile-dir=%s'%profile_dir])
MinRK
update parallel apps to use ProfileDir
r3992 self.profile_dir = unicode(profile_dir)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 return super(LocalEngineLauncher, self).start()
class LocalEngineSetLauncher(BaseLauncher):
"""Launch a set of engines as regular external processes."""
# Command line arguments for ipengine.
engine_args = List(
MinRK
aliases match flag pattern ('-' as wordsep, not '_')...
r4214 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="command-line arguments to pass to ipengine"
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 )
# launcher class
launcher_class = LocalEngineLauncher
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 launchers = Dict()
stop_data = Dict()
MinRK
rework logging connections
r3610 def __init__(self, work_dir=u'.', config=None, **kwargs):
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 super(LocalEngineSetLauncher, self).__init__(
MinRK
rework logging connections
r3610 work_dir=work_dir, config=config, **kwargs
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 )
self.stop_data = {}
MinRK
update parallel apps to use ProfileDir
r3992 def start(self, n, profile_dir):
"""Start n engines by profile or profile_dir."""
self.profile_dir = unicode(profile_dir)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 dlist = []
for i in range(n):
MinRK
reorganize Factory classes to follow relocation of Session object
r4007 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 # Copy the engine args over to each engine launcher.
el.engine_args = copy.deepcopy(self.engine_args)
el.on_stop(self._notice_engine_stopped)
MinRK
update parallel apps to use ProfileDir
r3992 d = el.start(profile_dir)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 if i==0:
MinRK
rework logging connections
r3610 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 self.launchers[i] = el
dlist.append(d)
self.notify_start(dlist)
# The consumeErrors here could be dangerous
# dfinal = gatherBoth(dlist, consumeErrors=True)
# dfinal.addCallback(self.notify_start)
return dlist
def find_args(self):
return ['engine set']
def signal(self, sig):
dlist = []
for el in self.launchers.itervalues():
d = el.signal(sig)
dlist.append(d)
# dfinal = gatherBoth(dlist, consumeErrors=True)
return dlist
def interrupt_then_kill(self, delay=1.0):
dlist = []
for el in self.launchers.itervalues():
d = el.interrupt_then_kill(delay)
dlist.append(d)
# dfinal = gatherBoth(dlist, consumeErrors=True)
return dlist
def stop(self):
return self.interrupt_then_kill()
def _notice_engine_stopped(self, data):
pid = data['pid']
for idx,el in self.launchers.iteritems():
if el.process.pid == pid:
break
self.launchers.pop(idx)
self.stop_data[idx] = data
if not self.launchers:
self.notify_stop(self.stop_data)
#-----------------------------------------------------------------------------
# MPIExec launchers
#-----------------------------------------------------------------------------
class MPIExecLauncher(LocalProcessLauncher):
"""Launch an external process using mpiexec."""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 mpi_cmd = List(['mpiexec'], config=True,
help="The mpiexec command to use in starting the process."
)
mpi_args = List([], config=True,
help="The command line arguments to pass to mpiexec."
)
program = List(['date'], config=True,
help="The program to start via mpiexec.")
program_args = List([], config=True,
help="The command line argument to the program."
)
n = Int(1)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
def find_args(self):
"""Build self.args using all the fields."""
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 self.program + self.program_args
def start(self, n):
"""Start n instances of the program using mpiexec."""
self.n = n
return super(MPIExecLauncher, self).start()
class MPIExecControllerLauncher(MPIExecLauncher):
"""Launch a controller using mpiexec."""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 controller_cmd = List(ipcontroller_cmd_argv, config=True,
help="Popen command to launch the Contropper"
)
MinRK
aliases match flag pattern ('-' as wordsep, not '_')...
r4214 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="Command line arguments to pass to ipcontroller."
)
n = Int(1)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
update parallel apps to use ProfileDir
r3992 def start(self, profile_dir):
"""Start the controller by profile_dir."""
MinRK
aliases match flag pattern ('-' as wordsep, not '_')...
r4214 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
MinRK
update parallel apps to use ProfileDir
r3992 self.profile_dir = unicode(profile_dir)
MinRK
rework logging connections
r3610 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 return super(MPIExecControllerLauncher, self).start(1)
def find_args(self):
MinRK
disallow no-prefix `ipython foo=bar` argument style....
r4197 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 self.controller_cmd + self.controller_args
class MPIExecEngineSetLauncher(MPIExecLauncher):
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 program = List(ipengine_cmd_argv, config=True,
help="Popen command for ipengine"
)
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 program_args = List(
MinRK
aliases match flag pattern ('-' as wordsep, not '_')...
r4214 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="Command line arguments for ipengine."
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 )
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 n = Int(1)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
update parallel apps to use ProfileDir
r3992 def start(self, n, profile_dir):
"""Start n engines by profile or profile_dir."""
MinRK
aliases match flag pattern ('-' as wordsep, not '_')...
r4214 self.program_args.extend(['--profile-dir=%s'%profile_dir])
MinRK
update parallel apps to use ProfileDir
r3992 self.profile_dir = unicode(profile_dir)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 self.n = n
MinRK
rework logging connections
r3610 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 return super(MPIExecEngineSetLauncher, self).start(n)
#-----------------------------------------------------------------------------
# SSH launchers
#-----------------------------------------------------------------------------
MinRK
scrub twisted/deferred references from launchers...
r4019 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
class SSHLauncher(LocalProcessLauncher):
"""A minimal launcher for ssh.
To be useful this will probably have to be extended to use the ``sshx``
idea for environment variables. There could be other things this needs
as well.
"""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 ssh_cmd = List(['ssh'], config=True,
help="command for starting ssh")
ssh_args = List(['-tt'], config=True,
help="args to pass to ssh")
program = List(['date'], config=True,
help="Program to launch via ssh")
program_args = List([], config=True,
help="args to pass to remote program")
MinRK
cleanup parallel traits...
r3988 hostname = Unicode('', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="hostname on which to launch the program")
MinRK
cleanup parallel traits...
r3988 user = Unicode('', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="username for ssh")
MinRK
cleanup parallel traits...
r3988 location = Unicode('', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="user@hostname location for ssh in one setting")
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
def _hostname_changed(self, name, old, new):
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 if self.user:
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 self.location = u'%s@%s' % (self.user, new)
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 else:
self.location = new
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
def _user_changed(self, name, old, new):
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 self.location = u'%s@%s' % (new, self.hostname)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
def find_args(self):
return self.ssh_cmd + self.ssh_args + [self.location] + \
self.program + self.program_args
MinRK
update parallel apps to use ProfileDir
r3992 def start(self, profile_dir, hostname=None, user=None):
self.profile_dir = unicode(profile_dir)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 if hostname is not None:
self.hostname = hostname
if user is not None:
self.user = user
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 return super(SSHLauncher, self).start()
MinRK
add ipcluster engines; fix ssh process shutdown
r3615
def signal(self, sig):
if self.state == 'running':
# send escaped ssh connection-closer
self.process.stdin.write('~.')
self.process.stdin.flush()
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
class SSHControllerLauncher(SSHLauncher):
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 program = List(ipcontroller_cmd_argv, config=True,
help="remote ipcontroller command.")
MinRK
aliases match flag pattern ('-' as wordsep, not '_')...
r4214 program_args = List(['--reuse-files', '--log-to-file','--log-level=%i'%logging.INFO], config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="Command line arguments to ipcontroller.")
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
class SSHEngineLauncher(SSHLauncher):
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 program = List(ipengine_cmd_argv, config=True,
help="remote ipengine command.")
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 # Command line arguments for ipengine.
program_args = List(
Brian E. Granger
Fixing command line options and help strings to use new syntax....
r4219 ['--log-to-file','--log_level=%i'%logging.INFO], config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="Command line arguments to ipengine."
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 )
class SSHEngineSetLauncher(LocalEngineSetLauncher):
launcher_class = SSHEngineLauncher
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 engines = Dict(config=True,
help="""dict of engines to launch. This is a dict by hostname of ints,
corresponding to the number of engines to start on that host.""")
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647
MinRK
update parallel apps to use ProfileDir
r3992 def start(self, n, profile_dir):
"""Start engines by profile or profile_dir.
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 `n` is ignored, and the `engines` config property is used instead.
"""
MinRK
update parallel apps to use ProfileDir
r3992 self.profile_dir = unicode(profile_dir)
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 dlist = []
for host, n in self.engines.iteritems():
if isinstance(n, (tuple, list)):
n, args = n
else:
args = copy.deepcopy(self.engine_args)
if '@' in host:
user,host = host.split('@',1)
else:
user=None
for i in range(n):
MinRK
reorganize Factory classes to follow relocation of Session object
r4007 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647
# Copy the engine args over to each engine launcher.
i
el.program_args = args
el.on_stop(self._notice_engine_stopped)
MinRK
update parallel apps to use ProfileDir
r3992 d = el.start(profile_dir, user=user, hostname=host)
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 if i==0:
self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
self.launchers[host+str(i)] = el
dlist.append(d)
self.notify_start(dlist)
return dlist
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
#-----------------------------------------------------------------------------
# Windows HPC Server 2008 scheduler launchers
#-----------------------------------------------------------------------------
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 # This is only used on Windows.
def find_job_cmd():
MinRK
forward subprocess IO over zmq on Windows...
r3771 if WINDOWS:
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 try:
return find_cmd('job')
MinRK
handle potentially absent win32api in launcher.py
r3770 except (FindCmdError, ImportError):
# ImportError will be raised if win32api is not installed
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 return 'job'
else:
return 'job'
class WindowsHPCLauncher(BaseLauncher):
MinRK
cleanup parallel traits...
r3988 job_id_regexp = Unicode(r'\d+', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="""A regular expression used to get the job id from the output of the
submit_command. """
)
MinRK
cleanup parallel traits...
r3988 job_file_name = Unicode(u'ipython_job.xml', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="The filename of the instantiated job script.")
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 # The full path to the instantiated job script. This gets made dynamically
# by combining the work_dir with the job_file_name.
MinRK
cleanup parallel traits...
r3988 job_file = Unicode(u'')
scheduler = Unicode('', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="The hostname of the scheduler to submit the job to.")
MinRK
cleanup parallel traits...
r3988 job_cmd = Unicode(find_job_cmd(), config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="The command for submitting jobs.")
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
MinRK
add ipcluster engines; fix ssh process shutdown
r3615 def __init__(self, work_dir=u'.', config=None, **kwargs):
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 super(WindowsHPCLauncher, self).__init__(
MinRK
add ipcluster engines; fix ssh process shutdown
r3615 work_dir=work_dir, config=config, **kwargs
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 )
@property
def job_file(self):
return os.path.join(self.work_dir, self.job_file_name)
def write_job_file(self, n):
raise NotImplementedError("Implement write_job_file in a subclass.")
def find_args(self):
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 return [u'job.exe']
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
def parse_job_id(self, output):
"""Take the output of the submit command and return the job id."""
m = re.search(self.job_id_regexp, output)
if m is not None:
job_id = m.group()
else:
raise LauncherError("Job id couldn't be determined: %s" % output)
self.job_id = job_id
self.log.info('Job started with job id: %r' % job_id)
return job_id
def start(self, n):
"""Start n copies of the process using the Win HPC job scheduler."""
self.write_job_file(n)
args = [
'submit',
'/jobfile:%s' % self.job_file,
'/scheduler:%s' % self.scheduler
]
self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
MinRK
scrub twisted/deferred references from launchers...
r4019
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 output = check_output([self.job_cmd]+args,
env=os.environ,
cwd=self.work_dir,
stderr=STDOUT
)
job_id = self.parse_job_id(output)
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 self.notify_start(job_id)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 return job_id
def stop(self):
args = [
'cancel',
self.job_id,
'/scheduler:%s' % self.scheduler
]
self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
try:
output = check_output([self.job_cmd]+args,
env=os.environ,
cwd=self.work_dir,
stderr=STDOUT
)
except:
output = 'The job already appears to be stoppped: %r' % self.job_id
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 return output
class WindowsHPCControllerLauncher(WindowsHPCLauncher):
MinRK
cleanup parallel traits...
r3988 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="WinHPC xml job file.")
extra_args = List([], config=False,
help="extra args to pass to ipcontroller")
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
def write_job_file(self, n):
job = IPControllerJob(config=self.config)
t = IPControllerTask(config=self.config)
# The tasks work directory is *not* the actual work directory of
# the controller. It is used as the base path for the stdout/stderr
# files that the scheduler redirects to.
MinRK
update parallel apps to use ProfileDir
r3992 t.work_directory = self.profile_dir
# Add the profile_dir and from self.start().
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 t.controller_args.extend(self.extra_args)
job.add_task(t)
self.log.info("Writing job description file: %s" % self.job_file)
job.write(self.job_file)
@property
def job_file(self):
MinRK
update parallel apps to use ProfileDir
r3992 return os.path.join(self.profile_dir, self.job_file_name)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
MinRK
update parallel apps to use ProfileDir
r3992 def start(self, profile_dir):
"""Start the controller by profile_dir."""
MinRK
aliases match flag pattern ('-' as wordsep, not '_')...
r4214 self.extra_args = ['--profile-dir=%s'%profile_dir]
MinRK
update parallel apps to use ProfileDir
r3992 self.profile_dir = unicode(profile_dir)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 return super(WindowsHPCControllerLauncher, self).start(1)
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
MinRK
cleanup parallel traits...
r3988 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="jobfile for ipengines job")
extra_args = List([], config=False,
help="extra args to pas to ipengine")
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
def write_job_file(self, n):
job = IPEngineSetJob(config=self.config)
for i in range(n):
t = IPEngineTask(config=self.config)
# The tasks work directory is *not* the actual work directory of
# the engine. It is used as the base path for the stdout/stderr
# files that the scheduler redirects to.
MinRK
update parallel apps to use ProfileDir
r3992 t.work_directory = self.profile_dir
# Add the profile_dir and from self.start().
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 t.engine_args.extend(self.extra_args)
job.add_task(t)
self.log.info("Writing job description file: %s" % self.job_file)
job.write(self.job_file)
@property
def job_file(self):
MinRK
update parallel apps to use ProfileDir
r3992 return os.path.join(self.profile_dir, self.job_file_name)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
MinRK
update parallel apps to use ProfileDir
r3992 def start(self, n, profile_dir):
"""Start the controller by profile_dir."""
MinRK
aliases match flag pattern ('-' as wordsep, not '_')...
r4214 self.extra_args = ['--profile-dir=%s'%profile_dir]
MinRK
update parallel apps to use ProfileDir
r3992 self.profile_dir = unicode(profile_dir)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 return super(WindowsHPCEngineSetLauncher, self).start(n)
#-----------------------------------------------------------------------------
# Batch (PBS) system launchers
#-----------------------------------------------------------------------------
class BatchSystemLauncher(BaseLauncher):
"""Launch an external process using a batch system.
This class is designed to work with UNIX batch systems like PBS, LSF,
GridEngine, etc. The overall model is that there are different commands
like qsub, qdel, etc. that handle the starting and stopping of the process.
This class also has the notion of a batch script. The ``batch_template``
attribute can be set to a string that is a template for the batch script.
Thomas Kluyver
Replace Itpl with str.format in parallel launcher....
r4003 This template is instantiated using string formatting. Thus the template can
use {n} fot the number of instances. Subclasses can add additional variables
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 to the template dict.
"""
# Subclasses must fill these in. See PBSEngineSet
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 submit_command = List([''], config=True,
help="The name of the command line program used to submit jobs.")
delete_command = List([''], config=True,
help="The name of the command line program used to delete jobs.")
MinRK
cleanup parallel traits...
r3988 job_id_regexp = Unicode('', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="""A regular expression used to get the job id from the output of the
submit_command.""")
MinRK
cleanup parallel traits...
r3988 batch_template = Unicode('', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="The string that is the batch script template itself.")
MinRK
cleanup parallel traits...
r3988 batch_template_file = Unicode(u'', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="The file that contains the batch template.")
MinRK
cleanup parallel traits...
r3988 batch_file_name = Unicode(u'batch_script', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="The filename of the instantiated batch script.")
MinRK
cleanup parallel traits...
r3988 queue = Unicode(u'', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="The PBS Queue.")
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659
# not configurable, override in subclasses
# PBS Job Array regex
MinRK
cleanup parallel traits...
r3988 job_array_regexp = Unicode('')
job_array_template = Unicode('')
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 # PBS Queue regex
MinRK
cleanup parallel traits...
r3988 queue_regexp = Unicode('')
queue_template = Unicode('')
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 # The default batch template, override in subclasses
MinRK
cleanup parallel traits...
r3988 default_template = Unicode('')
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 # The full path to the instantiated batch script.
MinRK
cleanup parallel traits...
r3988 batch_file = Unicode(u'')
MinRK
launcher updates for PBS
r3645 # the format dict used with batch_template:
context = Dict()
MinRK
add EvalFormatter for batch system (PBS) launcher templates...
r4004 # the Formatter instance for rendering the templates:
formatter = Instance(EvalFormatter, (), {})
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
MinRK
launcher updates for PBS
r3645
def find_args(self):
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 return self.submit_command + [self.batch_file]
MinRK
launcher updates for PBS
r3645
MinRK
add ipcluster engines; fix ssh process shutdown
r3615 def __init__(self, work_dir=u'.', config=None, **kwargs):
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 super(BatchSystemLauncher, self).__init__(
MinRK
add ipcluster engines; fix ssh process shutdown
r3615 work_dir=work_dir, config=config, **kwargs
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 )
self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
def parse_job_id(self, output):
"""Take the output of the submit command and return the job id."""
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 m = re.search(self.job_id_regexp, output)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 if m is not None:
job_id = m.group()
else:
raise LauncherError("Job id couldn't be determined: %s" % output)
self.job_id = job_id
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 self.log.info('Job submitted with job id: %r' % job_id)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 return job_id
def write_batch_script(self, n):
"""Instantiate and write the batch script to the work_dir."""
self.context['n'] = n
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 self.context['queue'] = self.queue
# first priority is batch_template if set
if self.batch_template_file and not self.batch_template:
# second priority is batch_template_file
with open(self.batch_template_file) as f:
self.batch_template = f.read()
if not self.batch_template:
# third (last) priority is default_template
self.batch_template = self.default_template
MinRK
don't automatically add jobarray or queue lines to user template...
r4183 # add jobarray or queue lines to user-specified template
# note that this is *only* when user did not specify a template.
regex = re.compile(self.job_array_regexp)
# print regex.search(self.batch_template)
if not regex.search(self.batch_template):
self.log.info("adding job array settings to batch script")
firstline, rest = self.batch_template.split('\n',1)
self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659
MinRK
don't automatically add jobarray or queue lines to user template...
r4183 regex = re.compile(self.queue_regexp)
# print regex.search(self.batch_template)
if self.queue and not regex.search(self.batch_template):
self.log.info("adding PBS queue settings to batch script")
firstline, rest = self.batch_template.split('\n',1)
self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659
MinRK
add EvalFormatter for batch system (PBS) launcher templates...
r4004 script_as_string = self.formatter.format(self.batch_template, **self.context)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659
with open(self.batch_file, 'w') as f:
f.write(script_as_string)
os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
MinRK
update parallel apps to use ProfileDir
r3992 def start(self, n, profile_dir):
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 """Start n copies of the process using a batch system."""
Thomas Kluyver
Replace Itpl with str.format in parallel launcher....
r4003 # Here we save profile_dir in the context so they
# can be used in the batch script template as {profile_dir}
MinRK
update parallel apps to use ProfileDir
r3992 self.context['profile_dir'] = profile_dir
self.profile_dir = unicode(profile_dir)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 self.write_batch_script(n)
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 output = check_output(self.args, env=os.environ)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 job_id = self.parse_job_id(output)
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 self.notify_start(job_id)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 return job_id
def stop(self):
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 output = check_output(self.delete_command+[self.job_id], env=os.environ)
MinRK
adjustments to PBS/SGE, SSH Launchers + docs update
r3647 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 return output
class PBSLauncher(BatchSystemLauncher):
"""A BatchSystemLauncher subclass for PBS."""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 submit_command = List(['qsub'], config=True,
help="The PBS submit command ['qsub']")
delete_command = List(['qdel'], config=True,
help="The PBS delete command ['qsub']")
MinRK
cleanup parallel traits...
r3988 job_id_regexp = Unicode(r'\d+', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="Regular expresion for identifying the job ID [r'\d+']")
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659
MinRK
cleanup parallel traits...
r3988 batch_file = Unicode(u'')
job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
Thomas Kluyver
Replace Itpl with str.format in parallel launcher....
r4003 job_array_template = Unicode('#PBS -t 1-{n}')
MinRK
cleanup parallel traits...
r3988 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
Thomas Kluyver
Replace Itpl with str.format in parallel launcher....
r4003 queue_template = Unicode('#PBS -q {queue}')
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
class PBSControllerLauncher(PBSLauncher):
"""Launch a controller using PBS."""
MinRK
cleanup parallel traits...
r3988 batch_file_name = Unicode(u'pbs_controller', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="batch file name for the controller job.")
MinRK
cleanup parallel traits...
r3988 default_template= Unicode("""#!/bin/sh
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 #PBS -V
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 #PBS -N ipcontroller
MinRK
aliases match flag pattern ('-' as wordsep, not '_')...
r4214 %s --log-to-file --profile-dir={profile_dir}
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 """%(' '.join(ipcontroller_cmd_argv)))
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
MinRK
update parallel apps to use ProfileDir
r3992 def start(self, profile_dir):
"""Start the controller by profile or profile_dir."""
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
MinRK
update parallel apps to use ProfileDir
r3992 return super(PBSControllerLauncher, self).start(1, profile_dir)
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
class PBSEngineSetLauncher(PBSLauncher):
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 """Launch Engines using PBS"""
MinRK
cleanup parallel traits...
r3988 batch_file_name = Unicode(u'pbs_engines', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="batch file name for the engine(s) job.")
MinRK
cleanup parallel traits...
r3988 default_template= Unicode(u"""#!/bin/sh
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 #PBS -V
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 #PBS -N ipengine
MinRK
aliases match flag pattern ('-' as wordsep, not '_')...
r4214 %s --profile-dir={profile_dir}
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 """%(' '.join(ipengine_cmd_argv)))
MinRK
untwist PBS, WinHPC Launchers in newparallel
r3613
MinRK
update parallel apps to use ProfileDir
r3992 def start(self, n, profile_dir):
"""Start n engines by profile or profile_dir."""
MinRK
SGE test related fixes...
r3668 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
MinRK
update parallel apps to use ProfileDir
r3992 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 #SGE is very similar to PBS
class SGELauncher(PBSLauncher):
"""Sun GridEngine is a PBS clone with slightly different syntax"""
Thomas Kluyver
Replace Itpl with str.format in parallel launcher....
r4003 job_array_regexp = Unicode('#\$\W+\-t')
job_array_template = Unicode('#$ -t 1-{n}')
queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
MinRK
fix remaining Itpl syntax in SGE queue_template...
r4089 queue_template = Unicode('#$ -q {queue}')
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659
class SGEControllerLauncher(SGELauncher):
"""Launch a controller using SGE."""
MinRK
cleanup parallel traits...
r3988 batch_file_name = Unicode(u'sge_controller', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="batch file name for the ipontroller job.")
Thomas Kluyver
Replace Itpl with str.format in parallel launcher....
r4003 default_template= Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
MinRK
aliases match flag pattern ('-' as wordsep, not '_')...
r4214 %s --log-to-file --profile-dir={profile_dir}
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 """%(' '.join(ipcontroller_cmd_argv)))
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659
MinRK
update parallel apps to use ProfileDir
r3992 def start(self, profile_dir):
"""Start the controller by profile or profile_dir."""
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
MinRK
don't pass profile_dir as kwarg in ipclusterapp...
r4001 return super(SGEControllerLauncher, self).start(1, profile_dir)
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659
class SGEEngineSetLauncher(SGELauncher):
"""Launch Engines with SGE"""
MinRK
cleanup parallel traits...
r3988 batch_file_name = Unicode(u'sge_engines', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="batch file name for the engine(s) job.")
Thomas Kluyver
Replace Itpl with str.format in parallel launcher....
r4003 default_template = Unicode("""#$ -V
#$ -S /bin/sh
#$ -N ipengine
MinRK
aliases match flag pattern ('-' as wordsep, not '_')...
r4214 %s --profile-dir={profile_dir}
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 """%(' '.join(ipengine_cmd_argv)))
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659
MinRK
update parallel apps to use ProfileDir
r3992 def start(self, n, profile_dir):
"""Start n engines by profile or profile_dir."""
MinRK
SGE test related fixes...
r3668 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
MinRK
update parallel apps to use ProfileDir
r3992 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
MinRK
Update PBS/SGE launchers with 0.10.1 options and defaults
r3659
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
Johann Cohen-Tanugi
add LSF launcher for ipcluster
r4229 # LSF launchers
class LSFLauncher(BatchSystemLauncher):
"""A BatchSystemLauncher subclass for LSF."""
submit_command = List(['bsub'], config=True,
help="The PBS submit command ['bsub']")
delete_command = List(['bkill'], config=True,
help="The PBS delete command ['bkill']")
job_id_regexp = Unicode(r'\d+', config=True,
help="Regular expresion for identifying the job ID [r'\d+']")
batch_file = Unicode(u'')
job_array_regexp = Unicode('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
queue_template = Unicode('#BSUB -q {queue}')
def start(self, n, profile_dir):
"""Start n copies of the process using LSF batch system.
This cant inherit from the base class because bsub expects
to be piped a shell script in order to honor the #BSUB directives :
bsub < script
"""
# Here we save profile_dir in the context so they
# can be used in the batch script template as {profile_dir}
self.context['profile_dir'] = profile_dir
self.profile_dir = unicode(profile_dir)
self.write_batch_script(n)
#output = check_output(self.args, env=os.environ)
piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
output,err = p.communicate()
job_id = self.parse_job_id(output)
self.notify_start(job_id)
return job_id
class LSFControllerLauncher(LSFLauncher):
"""Launch a controller using LSF."""
batch_file_name = Unicode(u'lsf_controller', config=True,
help="batch file name for the controller job.")
default_template= Unicode("""#!/bin/sh
#BSUB -J ipcontroller
#BSUB -oo ipcontroller.o.%%J
#BSUB -eo ipcontroller.e.%%J
%s --log-to-file --profile-dir={profile_dir}
"""%(' '.join(ipcontroller_cmd_argv)))
def start(self, profile_dir):
"""Start the controller by profile or profile_dir."""
self.log.info("Starting LSFControllerLauncher: %r" % self.args)
return super(LSFControllerLauncher, self).start(1, profile_dir)
class LSFEngineSetLauncher(LSFLauncher):
"""Launch Engines using LSF"""
batch_file_name = Unicode(u'lsf_engines', config=True,
help="batch file name for the engine(s) job.")
default_template= Unicode(u"""#!/bin/sh
#BSUB -oo ipengine.o.%%J
#BSUB -eo ipengine.e.%%J
%s --profile-dir={profile_dir}
"""%(' '.join(ipengine_cmd_argv)))
def start(self, n, profile_dir):
"""Start n engines by profile or profile_dir."""
self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
return super(LSFEngineSetLauncher, self).start(n, profile_dir)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 #-----------------------------------------------------------------------------
# A launcher for ipcluster itself!
#-----------------------------------------------------------------------------
class IPClusterLauncher(LocalProcessLauncher):
"""Launch the ipcluster program in an external process."""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
help="Popen command for ipcluster")
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 ipcluster_args = List(
MinRK
aliases match flag pattern ('-' as wordsep, not '_')...
r4214 ['--clean-logs', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="Command line arguments to pass to ipcluster.")
MinRK
cleanup parallel traits...
r3988 ipcluster_subcommand = Unicode('start')
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 ipcluster_n = Int(2)
def find_args(self):
MinRK
disallow no-prefix `ipython foo=bar` argument style....
r4197 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
['--n=%i'%self.ipcluster_n] + self.ipcluster_args
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
def start(self):
MinRK
rework logging connections
r3610 self.log.info("Starting ipcluster: %r" % self.args)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 return super(IPClusterLauncher, self).start()
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 #-----------------------------------------------------------------------------
# Collections of launchers
#-----------------------------------------------------------------------------
local_launchers = [
LocalControllerLauncher,
LocalEngineLauncher,
LocalEngineSetLauncher,
]
mpi_launchers = [
MPIExecLauncher,
MPIExecControllerLauncher,
MPIExecEngineSetLauncher,
]
ssh_launchers = [
SSHLauncher,
SSHControllerLauncher,
SSHEngineLauncher,
SSHEngineSetLauncher,
]
winhpc_launchers = [
WindowsHPCLauncher,
WindowsHPCControllerLauncher,
WindowsHPCEngineSetLauncher,
]
pbs_launchers = [
PBSLauncher,
PBSControllerLauncher,
PBSEngineSetLauncher,
]
sge_launchers = [
SGELauncher,
SGEControllerLauncher,
SGEEngineSetLauncher,
]
Johann Cohen-Tanugi
add LSF launcher for ipcluster
r4229 lsf_launchers = [
LSFLauncher,
LSFControllerLauncher,
LSFEngineSetLauncher,
]
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
Johann Cohen-Tanugi
add LSF launcher for ipcluster
r4229 + pbs_launchers + sge_launchers + lsf_launchers
MinRK
scrub twisted/deferred references from launchers...
r4019