ipcluster.py
814 lines
| 26.2 KiB
| text/x-python
|
PythonLexer
Brian Granger
|
r1943 | #!/usr/bin/env python | ||
Brian Granger
|
r1774 | # encoding: utf-8 | ||
"""Start an IPython cluster = (controller + engines).""" | ||||
#----------------------------------------------------------------------------- | ||||
# Copyright (C) 2008 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
Brian E Granger
|
r1234 | import os | ||
Brian Granger
|
r1770 | import re | ||
Brian E Granger
|
r1234 | import sys | ||
Brian Granger
|
r1770 | import signal | ||
Brian Granger
|
r1830 | import tempfile | ||
Brian Granger
|
r1770 | pjoin = os.path.join | ||
from twisted.internet import reactor, defer | ||||
from twisted.internet.protocol import ProcessProtocol | ||||
from twisted.internet.error import ProcessDone, ProcessTerminated | ||||
from twisted.internet.utils import getProcessOutput | ||||
Brian Granger
|
r1826 | from twisted.python import failure, log | ||
Brian Granger
|
r1770 | |||
from IPython.external import argparse | ||||
from IPython.external import Itpl | ||||
Brian Granger
|
r1945 | from IPython.genutils import get_ipython_dir, get_log_dir, get_security_dir | ||
from IPython.genutils import num_cpus | ||||
Administrator
|
r1815 | from IPython.kernel.fcutil import have_crypto | ||
Brian Granger
|
r1945 | |||
# Create various ipython directories if they don't exist. | ||||
# This must be done before IPython.kernel.config is imported. | ||||
from IPython.iplib import user_setup | ||||
if os.name == 'posix': | ||||
rc_suffix = '' | ||||
else: | ||||
rc_suffix = '.ini' | ||||
user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False) | ||||
get_log_dir() | ||||
get_security_dir() | ||||
Brian Granger
|
r1944 | from IPython.kernel.config import config_manager as kernel_config_manager | ||
from IPython.kernel.error import SecurityError, FileTimeoutError | ||||
Brian Granger
|
r1826 | from IPython.kernel.fcutil import have_crypto | ||
Brian Granger
|
r1944 | from IPython.kernel.twistedutil import gatherBoth, wait_for_file | ||
Brian Granger
|
r1826 | from IPython.kernel.util import printer | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1774 | #----------------------------------------------------------------------------- | ||
# General process handling code | ||||
#----------------------------------------------------------------------------- | ||||
Brian Granger
|
r1771 | def find_exe(cmd): | ||
try: | ||||
import win32api | ||||
except ImportError: | ||||
raise ImportError('you need to have pywin32 installed for this to work') | ||||
else: | ||||
Brian Granger
|
r1826 | try: | ||
(path, offest) = win32api.SearchPath(os.environ['PATH'],cmd + '.exe') | ||||
except: | ||||
(path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.bat') | ||||
Administrator
|
r1814 | return path | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | class ProcessStateError(Exception): | ||
pass | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | class UnknownStatus(Exception): | ||
pass | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | class LauncherProcessProtocol(ProcessProtocol): | ||
""" | ||||
A ProcessProtocol to go with the ProcessLauncher. | ||||
""" | ||||
def __init__(self, process_launcher): | ||||
self.process_launcher = process_launcher | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | def connectionMade(self): | ||
self.process_launcher.fire_start_deferred(self.transport.pid) | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | def processEnded(self, status): | ||
value = status.value | ||||
if isinstance(value, ProcessDone): | ||||
self.process_launcher.fire_stop_deferred(0) | ||||
elif isinstance(value, ProcessTerminated): | ||||
self.process_launcher.fire_stop_deferred( | ||||
{'exit_code':value.exitCode, | ||||
'signal':value.signal, | ||||
'status':value.status | ||||
} | ||||
) | ||||
else: | ||||
raise UnknownStatus("unknown exit status, this is probably a bug in Twisted") | ||||
Brian Granger
|
r1830 | |||
Brian Granger
|
r1770 | def outReceived(self, data): | ||
log.msg(data) | ||||
Brian Granger
|
r1830 | |||
Brian Granger
|
r1770 | def errReceived(self, data): | ||
log.err(data) | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | class ProcessLauncher(object): | ||
""" | ||||
Start and stop an external process in an asynchronous manner. | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | Currently this uses deferreds to notify other parties of process state | ||
changes. This is an awkward design and should be moved to using | ||||
a formal NotificationCenter. | ||||
""" | ||||
def __init__(self, cmd_and_args): | ||||
self.cmd = cmd_and_args[0] | ||||
self.args = cmd_and_args | ||||
self._reset() | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | def _reset(self): | ||
self.process_protocol = None | ||||
self.pid = None | ||||
self.start_deferred = None | ||||
self.stop_deferreds = [] | ||||
self.state = 'before' # before, running, or after | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | @property | ||
def running(self): | ||||
if self.state == 'running': | ||||
return True | ||||
else: | ||||
return False | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | def fire_start_deferred(self, pid): | ||
self.pid = pid | ||||
self.state = 'running' | ||||
log.msg('Process %r has started with pid=%i' % (self.args, pid)) | ||||
self.start_deferred.callback(pid) | ||||
def start(self): | ||||
if self.state == 'before': | ||||
self.process_protocol = LauncherProcessProtocol(self) | ||||
self.start_deferred = defer.Deferred() | ||||
self.process_transport = reactor.spawnProcess( | ||||
self.process_protocol, | ||||
self.cmd, | ||||
self.args, | ||||
env=os.environ | ||||
) | ||||
return self.start_deferred | ||||
Brian E Granger
|
r1234 | else: | ||
Brian Granger
|
r1770 | s = 'the process has already been started and has state: %r' % \ | ||
self.state | ||||
return defer.fail(ProcessStateError(s)) | ||||
def get_stop_deferred(self): | ||||
if self.state == 'running' or self.state == 'before': | ||||
d = defer.Deferred() | ||||
self.stop_deferreds.append(d) | ||||
return d | ||||
else: | ||||
s = 'this process is already complete' | ||||
return defer.fail(ProcessStateError(s)) | ||||
def fire_stop_deferred(self, exit_code): | ||||
log.msg('Process %r has stopped with %r' % (self.args, exit_code)) | ||||
self.state = 'after' | ||||
for d in self.stop_deferreds: | ||||
d.callback(exit_code) | ||||
def signal(self, sig): | ||||
""" | ||||
Send a signal to the process. | ||||
The argument sig can be ('KILL','INT', etc.) or any signal number. | ||||
""" | ||||
if self.state == 'running': | ||||
self.process_transport.signalProcess(sig) | ||||
Brian Granger
|
r1771 | # def __del__(self): | ||
# self.signal('KILL') | ||||
Brian Granger
|
r1610 | |||
Brian Granger
|
r1770 | def interrupt_then_kill(self, delay=1.0): | ||
self.signal('INT') | ||||
reactor.callLater(delay, self.signal, 'KILL') | ||||
Brian Granger
|
r1610 | |||
Brian Granger
|
r1774 | #----------------------------------------------------------------------------- | ||
# Code for launching controller and engines | ||||
#----------------------------------------------------------------------------- | ||||
Brian Granger
|
r1770 | class ControllerLauncher(ProcessLauncher): | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | def __init__(self, extra_args=None): | ||
Brian Granger
|
r1771 | if sys.platform == 'win32': | ||
Administrator
|
r1815 | # This logic is needed because the ipcontroller script doesn't | ||
# always get installed in the same way or in the same location. | ||||
from IPython.kernel.scripts import ipcontroller | ||||
script_location = ipcontroller.__file__.replace('.pyc', '.py') | ||||
# The -u option here turns on unbuffered output, which is required | ||||
Brian Granger
|
r1943 | # on Win32 to prevent wierd conflict and problems with Twisted. | ||
# Also, use sys.executable to make sure we are picking up the | ||||
# right python exe. | ||||
args = [sys.executable, '-u', script_location] | ||||
Brian Granger
|
r1771 | else: | ||
args = ['ipcontroller'] | ||||
Brian Granger
|
r1770 | self.extra_args = extra_args | ||
if extra_args is not None: | ||||
Brian Granger
|
r1771 | args.extend(extra_args) | ||
Brian Granger
|
r1770 | |||
Brian Granger
|
r1771 | ProcessLauncher.__init__(self, args) | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | class EngineLauncher(ProcessLauncher): | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | def __init__(self, extra_args=None): | ||
Brian Granger
|
r1771 | if sys.platform == 'win32': | ||
Administrator
|
r1815 | # This logic is needed because the ipcontroller script doesn't | ||
# always get installed in the same way or in the same location. | ||||
from IPython.kernel.scripts import ipengine | ||||
script_location = ipengine.__file__.replace('.pyc', '.py') | ||||
# The -u option here turns on unbuffered output, which is required | ||||
Brian Granger
|
r1943 | # on Win32 to prevent wierd conflict and problems with Twisted. | ||
# Also, use sys.executable to make sure we are picking up the | ||||
# right python exe. | ||||
args = [sys.executable, '-u', script_location] | ||||
Brian Granger
|
r1771 | else: | ||
args = ['ipengine'] | ||||
Brian Granger
|
r1770 | self.extra_args = extra_args | ||
if extra_args is not None: | ||||
Brian Granger
|
r1771 | args.extend(extra_args) | ||
Brian Granger
|
r1770 | |||
Brian Granger
|
r1771 | ProcessLauncher.__init__(self, args) | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | class LocalEngineSet(object): | ||
def __init__(self, extra_args=None): | ||||
self.extra_args = extra_args | ||||
self.launchers = [] | ||||
def start(self, n): | ||||
dlist = [] | ||||
for i in range(n): | ||||
el = EngineLauncher(extra_args=self.extra_args) | ||||
d = el.start() | ||||
self.launchers.append(el) | ||||
dlist.append(d) | ||||
dfinal = gatherBoth(dlist, consumeErrors=True) | ||||
dfinal.addCallback(self._handle_start) | ||||
return dfinal | ||||
def _handle_start(self, r): | ||||
log.msg('Engines started with pids: %r' % r) | ||||
return r | ||||
def _handle_stop(self, r): | ||||
log.msg('Engines received signal: %r' % r) | ||||
return r | ||||
def signal(self, sig): | ||||
dlist = [] | ||||
for el in self.launchers: | ||||
d = el.get_stop_deferred() | ||||
dlist.append(d) | ||||
el.signal(sig) | ||||
dfinal = gatherBoth(dlist, consumeErrors=True) | ||||
dfinal.addCallback(self._handle_stop) | ||||
return dfinal | ||||
def interrupt_then_kill(self, delay=1.0): | ||||
dlist = [] | ||||
for el in self.launchers: | ||||
d = el.get_stop_deferred() | ||||
dlist.append(d) | ||||
el.interrupt_then_kill(delay) | ||||
dfinal = gatherBoth(dlist, consumeErrors=True) | ||||
dfinal.addCallback(self._handle_stop) | ||||
return dfinal | ||||
class BatchEngineSet(object): | ||||
# Subclasses must fill these in. See PBSEngineSet | ||||
submit_command = '' | ||||
delete_command = '' | ||||
job_id_regexp = '' | ||||
def __init__(self, template_file, **kwargs): | ||||
self.template_file = template_file | ||||
self.context = {} | ||||
self.context.update(kwargs) | ||||
Brian Granger
|
r1773 | self.batch_file = self.template_file+'-run' | ||
Brian Granger
|
r1830 | |||
Brian Granger
|
r1770 | def parse_job_id(self, output): | ||
m = re.match(self.job_id_regexp, output) | ||||
if m is not None: | ||||
job_id = m.group() | ||||
Brian E Granger
|
r1234 | else: | ||
Brian Granger
|
r1770 | raise Exception("job id couldn't be determined: %s" % output) | ||
self.job_id = job_id | ||||
Brian Granger
|
r1773 | log.msg('Job started with job id: %r' % job_id) | ||
Brian Granger
|
r1770 | return job_id | ||
def write_batch_script(self, n): | ||||
self.context['n'] = n | ||||
template = open(self.template_file, 'r').read() | ||||
Brian Granger
|
r1773 | log.msg('Using template for batch script: %s' % self.template_file) | ||
Brian Granger
|
r1770 | script_as_string = Itpl.itplns(template, self.context) | ||
Brian Granger
|
r1773 | log.msg('Writing instantiated batch script: %s' % self.batch_file) | ||
Brian Granger
|
r1770 | f = open(self.batch_file,'w') | ||
f.write(script_as_string) | ||||
f.close() | ||||
Brian Granger
|
r1826 | |||
Brian Granger
|
r1770 | def handle_error(self, f): | ||
f.printTraceback() | ||||
Brian Granger
|
r1773 | f.raiseException() | ||
Brian Granger
|
r1770 | |||
def start(self, n): | ||||
self.write_batch_script(n) | ||||
d = getProcessOutput(self.submit_command, | ||||
[self.batch_file],env=os.environ) | ||||
d.addCallback(self.parse_job_id) | ||||
Brian Granger
|
r1772 | d.addErrback(self.handle_error) | ||
Brian Granger
|
r1770 | return d | ||
Brian Granger
|
r1826 | |||
Brian Granger
|
r1770 | def kill(self): | ||
d = getProcessOutput(self.delete_command, | ||||
[self.job_id],env=os.environ) | ||||
return d | ||||
class PBSEngineSet(BatchEngineSet): | ||||
submit_command = 'qsub' | ||||
delete_command = 'qdel' | ||||
job_id_regexp = '\d+' | ||||
def __init__(self, template_file, **kwargs): | ||||
BatchEngineSet.__init__(self, template_file, **kwargs) | ||||
Brian Granger
|
r1832 | |||
sshx_template="""#!/bin/sh | ||||
Brian Granger
|
r1830 | "$@" &> /dev/null & | ||
Brian Granger
|
r1832 | echo $! | ||
""" | ||||
engine_killer_template="""#!/bin/sh | ||||
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM | ||||
""" | ||||
Brian Granger
|
r1830 | |||
Brian Granger
|
r1832 | class SSHEngineSet(object): | ||
sshx_template=sshx_template | ||||
engine_killer_template=engine_killer_template | ||||
Brian Granger
|
r1830 | |||
def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"): | ||||
Brian Granger
|
r1832 | """Start a controller on localhost and engines using ssh. | ||
The engine_hosts argument is a dict with hostnames as keys and | ||||
the number of engine (int) as values. sshx is the name of a local | ||||
file that will be used to run remote commands. This file is used | ||||
to setup the environment properly. | ||||
""" | ||||
Brian Granger
|
r1830 | self.temp_dir = tempfile.gettempdir() | ||
Brian Granger
|
r1832 | if sshx is not None: | ||
Brian Granger
|
r1830 | self.sshx = sshx | ||
else: | ||||
Brian Granger
|
r1832 | # Write the sshx.sh file locally from our template. | ||
self.sshx = os.path.join( | ||||
self.temp_dir, | ||||
'%s-main-sshx.sh' % os.environ['USER'] | ||||
) | ||||
Brian Granger
|
r1830 | f = open(self.sshx, 'w') | ||
f.writelines(self.sshx_template) | ||||
f.close() | ||||
self.engine_command = ipengine | ||||
self.engine_hosts = engine_hosts | ||||
Brian Granger
|
r1832 | # Write the engine killer script file locally from our template. | ||
self.engine_killer = os.path.join( | ||||
self.temp_dir, | ||||
'%s-local-engine_killer.sh' % os.environ['USER'] | ||||
) | ||||
Brian Granger
|
r1830 | f = open(self.engine_killer, 'w') | ||
f.writelines(self.engine_killer_template) | ||||
f.close() | ||||
def start(self, send_furl=False): | ||||
Brian Granger
|
r1832 | dlist = [] | ||
Brian Granger
|
r1830 | for host in self.engine_hosts.keys(): | ||
count = self.engine_hosts[host] | ||||
Brian Granger
|
r1832 | d = self._start(host, count, send_furl) | ||
dlist.append(d) | ||||
return gatherBoth(dlist, consumeErrors=True) | ||||
Brian Granger
|
r1830 | |||
Brian Granger
|
r1832 | def _start(self, hostname, count=1, send_furl=False): | ||
Brian Granger
|
r1830 | if send_furl: | ||
Brian Granger
|
r1832 | d = self._scp_furl(hostname) | ||
Brian Granger
|
r1830 | else: | ||
Brian Granger
|
r1832 | d = defer.succeed(None) | ||
d.addCallback(lambda r: self._scp_sshx(hostname)) | ||||
d.addCallback(lambda r: self._ssh_engine(hostname, count)) | ||||
return d | ||||
Brian Granger
|
r1830 | |||
Brian Granger
|
r1832 | def _scp_furl(self, hostname): | ||
scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname) | ||||
cmd_list = scp_cmd.split() | ||||
cmd_list[1] = os.path.expanduser(cmd_list[1]) | ||||
log.msg('Copying furl file: %s' % scp_cmd) | ||||
d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ) | ||||
return d | ||||
def _scp_sshx(self, hostname): | ||||
scp_cmd = "scp %s %s:%s/%s-sshx.sh" % ( | ||||
self.sshx, hostname, | ||||
self.temp_dir, os.environ['USER'] | ||||
) | ||||
log.msg("Copying sshx: %s" % scp_cmd) | ||||
sshx_scp = scp_cmd.split() | ||||
d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ) | ||||
return d | ||||
def _ssh_engine(self, hostname, count): | ||||
exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % ( | ||||
hostname, self.temp_dir, | ||||
os.environ['USER'], self.engine_command | ||||
) | ||||
cmds = exec_engine.split() | ||||
dlist = [] | ||||
log.msg("about to start engines...") | ||||
for i in range(count): | ||||
log.msg('Starting engines: %s' % exec_engine) | ||||
d = getProcessOutput(cmds[0], cmds[1:], env=os.environ) | ||||
dlist.append(d) | ||||
return gatherBoth(dlist, consumeErrors=True) | ||||
def kill(self): | ||||
dlist = [] | ||||
for host in self.engine_hosts.keys(): | ||||
d = self._killall(host) | ||||
dlist.append(d) | ||||
return gatherBoth(dlist, consumeErrors=True) | ||||
def _killall(self, hostname): | ||||
d = self._scp_engine_killer(hostname) | ||||
d.addCallback(lambda r: self._ssh_kill(hostname)) | ||||
# d.addErrback(self._exec_err) | ||||
return d | ||||
def _scp_engine_killer(self, hostname): | ||||
scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % ( | ||||
self.engine_killer, | ||||
hostname, | ||||
self.temp_dir, | ||||
os.environ['USER'] | ||||
) | ||||
Brian Granger
|
r1830 | cmds = scp_cmd.split() | ||
Brian Granger
|
r1832 | log.msg('Copying engine_killer: %s' % scp_cmd) | ||
Brian Granger
|
r1830 | d = getProcessOutput(cmds[0], cmds[1:], env=os.environ) | ||
Brian Granger
|
r1832 | return d | ||
def _ssh_kill(self, hostname): | ||||
kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % ( | ||||
hostname, | ||||
self.temp_dir, | ||||
os.environ['USER'] | ||||
) | ||||
log.msg('Killing engine: %s' % kill_cmd) | ||||
kill_cmd = kill_cmd.split() | ||||
d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ) | ||||
return d | ||||
Brian Granger
|
r1830 | |||
Brian Granger
|
r1832 | def _exec_err(self, r): | ||
log.msg(r) | ||||
Brian Granger
|
r1770 | |||
Brian Granger
|
r1774 | #----------------------------------------------------------------------------- | ||
# Main functions for the different types of clusters | ||||
#----------------------------------------------------------------------------- | ||||
# TODO: | ||||
# The logic in these codes should be moved into classes like LocalCluster | ||||
# MpirunCluster, PBSCluster, etc. This would remove alot of the duplications. | ||||
# The main functions should then just parse the command line arguments, create | ||||
# the appropriate class and call a 'start' method. | ||||
Brian Granger
|
r1944 | |||
Administrator
|
r1815 | def check_security(args, cont_args): | ||
if (not args.x or not args.y) and not have_crypto: | ||||
log.err(""" | ||||
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode. | ||||
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""") | ||||
reactor.stop() | ||||
return False | ||||
Brian Granger
|
r1771 | if args.x: | ||
cont_args.append('-x') | ||||
if args.y: | ||||
cont_args.append('-y') | ||||
Administrator
|
r1815 | return True | ||
Brian Granger
|
r1944 | |||
Brian Granger
|
r1883 | def check_reuse(args, cont_args): | ||
if args.r: | ||||
cont_args.append('-r') | ||||
if args.client_port == 0 or args.engine_port == 0: | ||||
log.err(""" | ||||
To reuse FURL files, you must also set the client and engine ports using | ||||
the --client-port and --engine-port options.""") | ||||
reactor.stop() | ||||
return False | ||||
cont_args.append('--client-port=%i' % args.client_port) | ||||
cont_args.append('--engine-port=%i' % args.engine_port) | ||||
return True | ||||
Brian Granger
|
r1830 | |||
Brian Granger
|
r1944 | |||
def _err_and_stop(f): | ||||
log.err(f) | ||||
reactor.stop() | ||||
def _delay_start(cont_pid, start_engines, furl_file, reuse): | ||||
if not reuse: | ||||
if os.path.isfile(furl_file): | ||||
os.unlink(furl_file) | ||||
log.msg('Waiting for controller to finish starting...') | ||||
d = wait_for_file(furl_file, delay=0.2, max_tries=50) | ||||
d.addCallback(lambda _: log.msg('Controller started')) | ||||
d.addCallback(lambda _: start_engines(cont_pid)) | ||||
return d | ||||
Administrator
|
r1815 | def main_local(args): | ||
cont_args = [] | ||||
cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) | ||||
Brian Granger
|
r1883 | |||
Administrator
|
r1815 | # Check security settings before proceeding | ||
Brian Granger
|
r1826 | if not check_security(args, cont_args): | ||
return | ||||
Brian Granger
|
r1883 | |||
# See if we are reusing FURL files | ||||
if not check_reuse(args, cont_args): | ||||
return | ||||
Brian Granger
|
r1770 | cl = ControllerLauncher(extra_args=cont_args) | ||
dstart = cl.start() | ||||
def start_engines(cont_pid): | ||||
engine_args = [] | ||||
engine_args.append('--logfile=%s' % \ | ||||
pjoin(args.logdir,'ipengine%s-' % cont_pid)) | ||||
eset = LocalEngineSet(extra_args=engine_args) | ||||
def shutdown(signum, frame): | ||||
log.msg('Stopping local cluster') | ||||
# We are still playing with the times here, but these seem | ||||
# to be reliable in allowing everything to exit cleanly. | ||||
eset.interrupt_then_kill(0.5) | ||||
cl.interrupt_then_kill(0.5) | ||||
reactor.callLater(1.0, reactor.stop) | ||||
signal.signal(signal.SIGINT,shutdown) | ||||
d = eset.start(args.n) | ||||
return d | ||||
Brian Granger
|
r1944 | config = kernel_config_manager.get_config_obj() | ||
furl_file = config['controller']['engine_furl_file'] | ||||
dstart.addCallback(_delay_start, start_engines, furl_file, args.r) | ||||
dstart.addErrback(_err_and_stop) | ||||
Brian Granger
|
r1770 | |||
Brian Granger
|
r1830 | |||
Brian Granger
|
r1880 | def main_mpi(args): | ||
Brian Granger
|
r1770 | cont_args = [] | ||
cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) | ||||
Brian Granger
|
r1883 | |||
Administrator
|
r1815 | # Check security settings before proceeding | ||
Brian Granger
|
r1826 | if not check_security(args, cont_args): | ||
return | ||||
Brian Granger
|
r1883 | |||
# See if we are reusing FURL files | ||||
if not check_reuse(args, cont_args): | ||||
return | ||||
Brian Granger
|
r1770 | cl = ControllerLauncher(extra_args=cont_args) | ||
dstart = cl.start() | ||||
def start_engines(cont_pid): | ||||
Brian Granger
|
r1880 | raw_args = [args.cmd] | ||
Brian Granger
|
r1770 | raw_args.extend(['-n',str(args.n)]) | ||
raw_args.append('ipengine') | ||||
raw_args.append('-l') | ||||
raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid)) | ||||
Brian Granger
|
r1788 | if args.mpi: | ||
raw_args.append('--mpi=%s' % args.mpi) | ||||
Brian Granger
|
r1770 | eset = ProcessLauncher(raw_args) | ||
def shutdown(signum, frame): | ||||
log.msg('Stopping local cluster') | ||||
# We are still playing with the times here, but these seem | ||||
# to be reliable in allowing everything to exit cleanly. | ||||
eset.interrupt_then_kill(1.0) | ||||
cl.interrupt_then_kill(1.0) | ||||
reactor.callLater(2.0, reactor.stop) | ||||
signal.signal(signal.SIGINT,shutdown) | ||||
d = eset.start() | ||||
return d | ||||
Brian Granger
|
r1944 | config = kernel_config_manager.get_config_obj() | ||
furl_file = config['controller']['engine_furl_file'] | ||||
dstart.addCallback(_delay_start, start_engines, furl_file, args.r) | ||||
dstart.addErrback(_err_and_stop) | ||||
Brian Granger
|
r1770 | |||
Brian Granger
|
r1830 | |||
Brian Granger
|
r1770 | def main_pbs(args): | ||
Brian Granger
|
r1772 | cont_args = [] | ||
cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) | ||||
Brian Granger
|
r1883 | |||
Administrator
|
r1815 | # Check security settings before proceeding | ||
Brian Granger
|
r1826 | if not check_security(args, cont_args): | ||
return | ||||
Brian Granger
|
r1883 | |||
# See if we are reusing FURL files | ||||
if not check_reuse(args, cont_args): | ||||
return | ||||
Brian Granger
|
r1772 | cl = ControllerLauncher(extra_args=cont_args) | ||
Brian Granger
|
r1770 | dstart = cl.start() | ||
def start_engines(r): | ||||
Brian Granger
|
r1772 | pbs_set = PBSEngineSet(args.pbsscript) | ||
Brian Granger
|
r1773 | def shutdown(signum, frame): | ||
log.msg('Stopping pbs cluster') | ||||
d = pbs_set.kill() | ||||
d.addBoth(lambda _: cl.interrupt_then_kill(1.0)) | ||||
d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop)) | ||||
signal.signal(signal.SIGINT,shutdown) | ||||
Brian Granger
|
r1770 | d = pbs_set.start(args.n) | ||
return d | ||||
Brian Granger
|
r1944 | config = kernel_config_manager.get_config_obj() | ||
furl_file = config['controller']['engine_furl_file'] | ||||
dstart.addCallback(_delay_start, start_engines, furl_file, args.r) | ||||
dstart.addErrback(_err_and_stop) | ||||
Brian Granger
|
r1770 | |||
Brian Granger
|
r1830 | def main_ssh(args): | ||
Brian Granger
|
r1832 | """Start a controller on localhost and engines using ssh. | ||
Your clusterfile should look like:: | ||||
send_furl = False # True, if you want | ||||
engines = { | ||||
'engine_host1' : engine_count, | ||||
'engine_host2' : engine_count2 | ||||
} | ||||
""" | ||||
Brian Granger
|
r1830 | clusterfile = {} | ||
execfile(args.clusterfile, clusterfile) | ||||
if not clusterfile.has_key('send_furl'): | ||||
clusterfile['send_furl'] = False | ||||
cont_args = [] | ||||
cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) | ||||
Brian Granger
|
r1832 | |||
# Check security settings before proceeding | ||||
if not check_security(args, cont_args): | ||||
return | ||||
Brian Granger
|
r1883 | # See if we are reusing FURL files | ||
if not check_reuse(args, cont_args): | ||||
return | ||||
Brian Granger
|
r1830 | cl = ControllerLauncher(extra_args=cont_args) | ||
dstart = cl.start() | ||||
def start_engines(cont_pid): | ||||
Brian Granger
|
r1832 | ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx) | ||
Brian Granger
|
r1830 | def shutdown(signum, frame): | ||
Brian Granger
|
r1832 | d = ssh_set.kill() | ||
cl.interrupt_then_kill(1.0) | ||||
Brian Granger
|
r1830 | reactor.callLater(2.0, reactor.stop) | ||
signal.signal(signal.SIGINT,shutdown) | ||||
Brian Granger
|
r1832 | d = ssh_set.start(clusterfile['send_furl']) | ||
return d | ||||
Brian Granger
|
r1944 | config = kernel_config_manager.get_config_obj() | ||
furl_file = config['controller']['engine_furl_file'] | ||||
dstart.addCallback(_delay_start, start_engines, furl_file, args.r) | ||||
dstart.addErrback(_err_and_stop) | ||||
Brian Granger
|
r1830 | |||
Brian Granger
|
r1770 | def get_args(): | ||
Brian Granger
|
r1771 | base_parser = argparse.ArgumentParser(add_help=False) | ||
base_parser.add_argument( | ||||
Brian Granger
|
r1883 | '-r', | ||
action='store_true', | ||||
dest='r', | ||||
help='try to reuse FURL files. Use with --client-port and --engine-port' | ||||
) | ||||
base_parser.add_argument( | ||||
'--client-port', | ||||
type=int, | ||||
dest='client_port', | ||||
help='the port the controller will listen on for client connections', | ||||
default=0 | ||||
) | ||||
base_parser.add_argument( | ||||
'--engine-port', | ||||
type=int, | ||||
dest='engine_port', | ||||
help='the port the controller will listen on for engine connections', | ||||
default=0 | ||||
) | ||||
base_parser.add_argument( | ||||
Brian Granger
|
r1771 | '-x', | ||
action='store_true', | ||||
dest='x', | ||||
help='turn off client security' | ||||
) | ||||
base_parser.add_argument( | ||||
'-y', | ||||
action='store_true', | ||||
dest='y', | ||||
help='turn off engine security' | ||||
) | ||||
base_parser.add_argument( | ||||
"--logdir", | ||||
type=str, | ||||
dest="logdir", | ||||
help="directory to put log files (default=$IPYTHONDIR/log)", | ||||
default=pjoin(get_ipython_dir(),'log') | ||||
) | ||||
base_parser.add_argument( | ||||
"-n", | ||||
"--num", | ||||
type=int, | ||||
dest="n", | ||||
default=2, | ||||
help="the number of engines to start" | ||||
) | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1771 | parser = argparse.ArgumentParser( | ||
description='IPython cluster startup. This starts a controller and\ | ||||
Brian Granger
|
r1775 | engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\ | ||
THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.' | ||||
Brian Granger
|
r1771 | ) | ||
subparsers = parser.add_subparsers( | ||||
help='available cluster types. For help, do "ipcluster TYPE --help"') | ||||
Brian Granger
|
r1613 | |||
Brian Granger
|
r1771 | parser_local = subparsers.add_parser( | ||
'local', | ||||
help='run a local cluster', | ||||
parents=[base_parser] | ||||
) | ||||
Brian Granger
|
r1770 | parser_local.set_defaults(func=main_local) | ||
Brian Granger
|
r1771 | parser_mpirun = subparsers.add_parser( | ||
'mpirun', | ||||
Brian Granger
|
r1880 | help='run a cluster using mpirun (mpiexec also works)', | ||
Brian Granger
|
r1771 | parents=[base_parser] | ||
) | ||||
parser_mpirun.add_argument( | ||||
"--mpi", | ||||
type=str, | ||||
Brian Granger
|
r1788 | dest="mpi", # Don't put a default here to allow no MPI support | ||
Brian Granger
|
r1771 | help="how to call MPI_Init (default=mpi4py)" | ||
) | ||||
Brian Granger
|
r1880 | parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun') | ||
parser_mpiexec = subparsers.add_parser( | ||||
'mpiexec', | ||||
help='run a cluster using mpiexec (mpirun also works)', | ||||
parents=[base_parser] | ||||
) | ||||
parser_mpiexec.add_argument( | ||||
"--mpi", | ||||
type=str, | ||||
dest="mpi", # Don't put a default here to allow no MPI support | ||||
help="how to call MPI_Init (default=mpi4py)" | ||||
) | ||||
parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec') | ||||
Brian Granger
|
r1770 | |||
Brian Granger
|
r1772 | parser_pbs = subparsers.add_parser( | ||
Brian Granger
|
r1774 | 'pbs', | ||
Brian Granger
|
r1772 | help='run a pbs cluster', | ||
parents=[base_parser] | ||||
) | ||||
parser_pbs.add_argument( | ||||
'--pbs-script', | ||||
type=str, | ||||
dest='pbsscript', | ||||
help='PBS script template', | ||||
default='pbs.template' | ||||
) | ||||
Brian Granger
|
r1770 | parser_pbs.set_defaults(func=main_pbs) | ||
Brian Granger
|
r1830 | |||
parser_ssh = subparsers.add_parser( | ||||
'ssh', | ||||
help='run a cluster using ssh, should have ssh-keys setup', | ||||
parents=[base_parser] | ||||
) | ||||
parser_ssh.add_argument( | ||||
'--clusterfile', | ||||
type=str, | ||||
dest='clusterfile', | ||||
help='python file describing the cluster', | ||||
default='clusterfile.py', | ||||
) | ||||
parser_ssh.add_argument( | ||||
'--sshx', | ||||
type=str, | ||||
dest='sshx', | ||||
Brian Granger
|
r1832 | help='sshx launcher helper' | ||
Brian Granger
|
r1830 | ) | ||
parser_ssh.set_defaults(func=main_ssh) | ||||
Brian Granger
|
r1770 | args = parser.parse_args() | ||
return args | ||||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r1770 | def main(): | ||
args = get_args() | ||||
reactor.callWhenRunning(args.func, args) | ||||
log.startLogging(sys.stdout) | ||||
reactor.run() | ||||
if __name__ == '__main__': | ||||
Brian E Granger
|
r1234 | main() | ||