##// END OF EJS Templates
Minor improvements to the parallel computing stuff....
Minor improvements to the parallel computing stuff. * Made various user messages better. * Added a longer (4s) paused before ipcluster exits.

File last commit:

r2302:c7310047
r2338:aecf73f6
Show More
ipcluster.py
811 lines | 26.1 KiB | text/x-python | PythonLexer
Brian Granger
Fixing unticketed bug in ipcluster.py....
r1943 #!/usr/bin/env python
Brian Granger
Final few cleanup changes to the PBS cluster in ipcluster.
r1774 # encoding: utf-8
"""Start an IPython cluster = (controller + engines)."""
#-----------------------------------------------------------------------------
Brian Granger
Semi-working refactored ipcluster....
r2302 # Copyright (C) 2008-2009 The IPython Development Team
Brian Granger
Final few cleanup changes to the PBS cluster in ipcluster.
r1774 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 import os
Brian Granger
Initial version of working refactored ipcluster....
r1770 import re
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 import sys
Brian Granger
Initial version of working refactored ipcluster....
r1770 import signal
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 import tempfile
Brian Granger
Initial version of working refactored ipcluster....
r1770 pjoin = os.path.join
from twisted.internet import reactor, defer
from twisted.internet.protocol import ProcessProtocol
from twisted.internet.error import ProcessDone, ProcessTerminated
from twisted.internet.utils import getProcessOutput
Brian Granger
Semi-working refactored ipcluster....
r2302 from twisted.python import log
Brian Granger
Initial version of working refactored ipcluster....
r1770
from IPython.external import argparse
from IPython.external import Itpl
Brian Granger
genutils.py => utils/genutils.py and updated imports and tests.
r2023 from IPython.utils.genutils import (
Brian Granger
Addressing various review comments.
r1958 get_ipython_dir,
get_log_dir,
get_security_dir,
num_cpus
)
Administrator
Fixing numerous bugs in ipcluster on Win32....
r1815 from IPython.kernel.fcutil import have_crypto
Brian Granger
Fix for ticket: https://bugs.launchpad.net/bugs/361414...
r1945
# Create various ipython directories if they don't exist.
# This must be done before IPython.kernel.config is imported.
Brian Granger
Massive, crazy refactoring of everything....
r2202 from IPython.core.oldusersetup import user_setup
Brian Granger
Fix for ticket: https://bugs.launchpad.net/bugs/361414...
# Make sure the IPython directories exist before IPython.kernel.config is
# imported further down.  The rc-file suffix is empty on posix systems and
# '.ini' everywhere else.
rc_suffix = {'posix': ''}.get(os.name, '.ini')
user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
get_log_dir()
get_security_dir()
Brian Granger
Fix for ipcluster bug: https://bugs.launchpad.net/bugs/358202...
r1944 from IPython.kernel.config import config_manager as kernel_config_manager
from IPython.kernel.twistedutil import gatherBoth, wait_for_file
Brian Granger
Semi-working refactored ipcluster....
r2302
Brian Granger
Fixing small things in response to review.
r1826
Brian Granger
Final few cleanup changes to the PBS cluster in ipcluster.
r1774 #-----------------------------------------------------------------------------
# General process handling code
#-----------------------------------------------------------------------------
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
class ProcessStateError(Exception):
    """Raised when an operation is requested in the wrong process state."""
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
class UnknownStatus(Exception):
    """Raised when a finished process reports an unrecognised exit status."""
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
class LauncherProcessProtocol(ProcessProtocol):
    """
    ProcessProtocol that forwards process events to a ProcessLauncher.

    Start and end events are relayed to the launcher given at construction
    time; anything the child writes to stdout/stderr goes to the Twisted log.
    """

    def __init__(self, process_launcher):
        self.process_launcher = process_launcher

    def connectionMade(self):
        """Tell the launcher the process started, passing the transport's pid."""
        launcher = self.process_launcher
        launcher.fire_start_deferred(self.transport.pid)

    def processEnded(self, status):
        """
        Tell the launcher how the process ended.

        A ProcessDone status is reported as 0.  A ProcessTerminated status is
        reported as a dict with the keys 'exit_code', 'signal' and 'status'.
        Any other status raises UnknownStatus.
        """
        reason = status.value
        if isinstance(reason, ProcessDone):
            self.process_launcher.fire_stop_deferred(0)
            return
        if isinstance(reason, ProcessTerminated):
            exit_info = {
                'exit_code': reason.exitCode,
                'signal': reason.signal,
                'status': reason.status,
            }
            self.process_launcher.fire_stop_deferred(exit_info)
            return
        raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        """Log data received on the child's stdout."""
        log.msg(data)

    def errReceived(self, data):
        """Log data received on the child's stderr as an error."""
        log.err(data)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
class ProcessLauncher(object):
    """
    Start and stop an external process in an asynchronous manner.

    Currently this uses deferreds to notify other parties of process state
    changes.  This is an awkward design and should be moved to using
    a formal NotificationCenter.

    The launcher moves through three states, stored in ``self.state``:
    'before' (not started yet), 'running' and 'after' (process has ended).
    """

    def __init__(self, cmd_and_args):
        """
        cmd_and_args is a list whose first element is the executable and
        which, as a whole, is passed to the process as its argument list.
        """
        self.cmd = cmd_and_args[0]
        self.args = cmd_and_args
        self._reset()

    def _reset(self):
        """Put the launcher back into its initial, not-yet-started state."""
        self.process_protocol = None
        # Initialised here so the attribute always exists, even before
        # start() has been called.
        self.process_transport = None
        self.pid = None
        self.start_deferred = None
        self.stop_deferreds = []
        self.state = 'before' # before, running, or after

    @property
    def running(self):
        """True while the process is in the 'running' state."""
        return self.state == 'running'

    def fire_start_deferred(self, pid):
        """Record the pid, mark the process as running and fire start_deferred."""
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        """
        Spawn the process.

        Returns a deferred that fires with the pid once the process has
        started.  If the launcher is not in the 'before' state, the returned
        deferred fails with ProcessStateError.
        """
        if self.state != 'before':
            s = 'The process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(s))
        self.process_protocol = LauncherProcessProtocol(self)
        self.start_deferred = defer.Deferred()
        self.process_transport = reactor.spawnProcess(
            self.process_protocol,
            self.cmd,
            self.args,
            env=os.environ
        )
        return self.start_deferred

    def get_stop_deferred(self):
        """
        Return a deferred that fires with the exit information when the
        process stops.

        If the process has already completed, the returned deferred fails
        with ProcessStateError.
        """
        if self.state in ('running', 'before'):
            d = defer.Deferred()
            self.stop_deferreds.append(d)
            return d
        s = 'this process is already complete'
        return defer.fail(ProcessStateError(s))

    def fire_stop_deferred(self, exit_code):
        """Mark the process as finished and fire every pending stop deferred."""
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for d in self.stop_deferreds:
            d.callback(exit_code)

    def signal(self, sig):
        """
        Send a signal to the process.

        The argument sig can be ('KILL','INT', etc.) or any signal number.
        Nothing is sent unless the process is currently running.
        """
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    def interrupt_then_kill(self, delay=1.0):
        """Send INT now and schedule a KILL `delay` seconds later."""
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')
Brian Granger
Removed the -f option to ipcluster. The remote starting of an IPython...
r1610
Brian Granger
Final few cleanup changes to the PBS cluster in ipcluster.
r1774 #-----------------------------------------------------------------------------
# Code for launching controller and engines
#-----------------------------------------------------------------------------
Brian Granger
Initial version of working refactored ipcluster....
class ControllerLauncher(ProcessLauncher):
    """ProcessLauncher preconfigured to run the ipcontroller script."""

    def __init__(self, extra_args=None):
        """
        extra_args is an optional list of command line arguments that is
        appended to the controller command.
        """
        if sys.platform == 'win32':
            # This logic is needed because the ipcontroller script doesn't
            # always get installed in the same way or in the same location.
            from IPython.kernel.scripts import ipcontroller
            # __file__ may name compiled bytecode; point at the .py source
            # instead.  Only the extension is rewritten, so a '.pyc' that
            # appears elsewhere in the path is left alone and '.pyo' files
            # are handled too.
            base, ext = os.path.splitext(ipcontroller.__file__)
            if ext in ('.pyc', '.pyo'):
                script_location = base + '.py'
            else:
                script_location = ipcontroller.__file__
            # The -u option here turns on unbuffered output, which is required
            # on Win32 to prevent weird conflict and problems with Twisted.
            # Also, use sys.executable to make sure we are picking up the
            # right python exe.
            args = [sys.executable, '-u', script_location]
        else:
            args = ['ipcontroller']
        self.extra_args = extra_args
        if extra_args is not None:
            args.extend(extra_args)

        ProcessLauncher.__init__(self, args)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
class EngineLauncher(ProcessLauncher):
    """ProcessLauncher preconfigured to run the ipengine script."""

    def __init__(self, extra_args=None):
        """
        extra_args is an optional list of command line arguments that is
        appended to the engine command.
        """
        if sys.platform == 'win32':
            # This logic is needed because the ipengine script doesn't
            # always get installed in the same way or in the same location.
            from IPython.kernel.scripts import ipengine
            # __file__ may name compiled bytecode; point at the .py source
            # instead.  Only the extension is rewritten, so a '.pyc' that
            # appears elsewhere in the path is left alone and '.pyo' files
            # are handled too.
            base, ext = os.path.splitext(ipengine.__file__)
            if ext in ('.pyc', '.pyo'):
                script_location = base + '.py'
            else:
                script_location = ipengine.__file__
            # The -u option here turns on unbuffered output, which is required
            # on Win32 to prevent weird conflict and problems with Twisted.
            # Also, use sys.executable to make sure we are picking up the
            # right python exe.
            args = [sys.executable, '-u', script_location]
        else:
            args = ['ipengine']
        self.extra_args = extra_args
        if extra_args is not None:
            args.extend(extra_args)

        ProcessLauncher.__init__(self, args)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
class LocalEngineSet(object):
    """A group of engines, each run through its own EngineLauncher."""

    def __init__(self, extra_args=None):
        self.extra_args = extra_args
        self.launchers = []

    def start(self, n):
        """
        Create and start n EngineLaunchers.

        Returns the gathered deferred of all the individual start deferreds.
        """
        started = []
        for _ in range(n):
            launcher = EngineLauncher(extra_args=self.extra_args)
            started.append(launcher.start())
            self.launchers.append(launcher)
        gathered = gatherBoth(started, consumeErrors=True)
        gathered.addCallback(self._handle_start)
        return gathered

    def _handle_start(self, r):
        """Log the gathered start results and pass them through."""
        log.msg('Engines started with pids: %r' % r)
        return r

    def _handle_stop(self, r):
        """Log the gathered stop results and pass them through."""
        log.msg('Engines received signal: %r' % r)
        return r

    def signal(self, sig):
        """
        Send sig to every launcher.

        Returns the gathered deferred of the launchers' stop deferreds.
        """
        pending = []
        for launcher in self.launchers:
            # Register for the stop notification before signalling.
            pending.append(launcher.get_stop_deferred())
            launcher.signal(sig)
        gathered = gatherBoth(pending, consumeErrors=True)
        gathered.addCallback(self._handle_stop)
        return gathered

    def interrupt_then_kill(self, delay=1.0):
        """
        Call interrupt_then_kill(delay) on every launcher.

        Returns the gathered deferred of the launchers' stop deferreds.
        """
        pending = []
        for launcher in self.launchers:
            # Register for the stop notification before signalling.
            pending.append(launcher.get_stop_deferred())
            launcher.interrupt_then_kill(delay)
        gathered = gatherBoth(pending, consumeErrors=True)
        gathered.addCallback(self._handle_stop)
        return gathered
class BatchEngineSet(object):
    """
    Base class for engine sets that are started through a batch system.

    A template file is filled in with the values in ``self.context`` and
    written to ``<template_file>-run``.  That file is handed to
    ``submit_command``, and the job id is parsed from the command's output.
    """

    # Subclasses must fill these in.  See PBSEngineSet
    submit_command = ''
    delete_command = ''
    job_id_regexp = ''

    def __init__(self, template_file, **kwargs):
        """
        template_file is the path of the batch script template.  Any keyword
        arguments are stored in the context used to fill in the template.
        """
        self.template_file = template_file
        self.context = {}
        self.context.update(kwargs)
        self.batch_file = self.template_file+'-run'

    def parse_job_id(self, output):
        """
        Extract the job id from the submit command's output.

        The first match of job_id_regexp anywhere in the output is used, so
        text in front of the id does not prevent it from being found.  The
        id is stored in self.job_id and returned.  An Exception is raised if
        no id can be found.
        """
        m = re.search(self.job_id_regexp, output)
        if m is None:
            raise Exception("job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Fill in the template with n in the context and write batch_file."""
        self.context['n'] = n
        # try/finally so the file handles are closed even if reading or
        # writing fails.
        f = open(self.template_file, 'r')
        try:
            template = f.read()
        finally:
            f.close()
        log.msg('Using template for batch script: %s' % self.template_file)
        script_as_string = Itpl.itplns(template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        f = open(self.batch_file,'w')
        try:
            f.write(script_as_string)
        finally:
            f.close()

    def handle_error(self, f):
        """Errback: print the failure's traceback, then re-raise it."""
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        """
        Write the batch script for n and submit it.

        Returns a deferred that fires with the parsed job id.
        """
        self.write_batch_script(n)
        d = getProcessOutput(self.submit_command,
            [self.batch_file],env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        """Run delete_command on the stored job id; returns its deferred."""
        d = getProcessOutput(self.delete_command,
            [self.job_id],env=os.environ)
        return d
class PBSEngineSet(BatchEngineSet):
    """BatchEngineSet that submits with qsub and deletes with qdel."""

    submit_command = 'qsub'
    delete_command = 'qdel'
    # Raw string: '\d' is not a valid escape in a normal string literal.
    job_id_regexp = r'\d+'

    def __init__(self, template_file, **kwargs):
        BatchEngineSet.__init__(self, template_file, **kwargs)
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832
# Script that runs the command given as its arguments in the background,
# with stdout and stderr discarded, and prints the background pid.
# The redirection is spelled '> /dev/null 2>&1' because the script runs
# under /bin/sh and '&>' is a bash extension: a POSIX shell reads
# 'cmd &> file &' as 'cmd &' followed by an empty redirected command.
sshx_template="""#!/bin/sh
"$@" > /dev/null 2>&1 &
echo $!
"""

# Script that sends TERM to every process of the current user whose
# 'ps' line matches 'ipengine'.
engine_killer_template="""#!/bin/sh
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
"""
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
class SSHEngineSet(object):
    """Start and kill engines on remote hosts using scp and ssh."""

    sshx_template=sshx_template
    engine_killer_template=engine_killer_template

    def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
        """Start a controller on localhost and engines using ssh.

        The engine_hosts argument is a dict with hostnames as keys and
        the number of engine (int) as values.  sshx is the name of a local
        file that will be used to run remote commands.  This file is used
        to setup the environment properly.
        """
        self.temp_dir = tempfile.gettempdir()
        if sshx is not None:
            self.sshx = sshx
        else:
            # Write the sshx.sh file locally from our template.
            self.sshx = os.path.join(
                self.temp_dir,
                '%s-main-sshx.sh' % os.environ['USER']
            )
            self._write_script(self.sshx, self.sshx_template)
        self.engine_command = ipengine
        self.engine_hosts = engine_hosts
        # Write the engine killer script file locally from our template.
        self.engine_killer = os.path.join(
            self.temp_dir,
            '%s-local-engine_killer.sh' % os.environ['USER']
        )
        self._write_script(self.engine_killer, self.engine_killer_template)

    def _write_script(self, path, text):
        """Write text to path, closing the file even if the write fails."""
        f = open(path, 'w')
        try:
            f.write(text)
        finally:
            f.close()

    def start(self, send_furl=False):
        """
        Start the configured number of engines on every host.

        If send_furl is true, the engine FURL file is copied to each host
        first.  Returns the gathered deferred of the per-host deferreds.
        """
        dlist = []
        for host, count in self.engine_hosts.items():
            d = self._start(host, count, send_furl)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _start(self, hostname, count=1, send_furl=False):
        """Copy the needed files to hostname, then start count engines there."""
        if send_furl:
            d = self._scp_furl(hostname)
        else:
            d = defer.succeed(None)
        d.addCallback(lambda r: self._scp_sshx(hostname))
        d.addCallback(lambda r: self._ssh_engine(hostname, count))
        return d

    def _scp_furl(self, hostname):
        """Copy the local engine FURL file to hostname with scp."""
        scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
        cmd_list = scp_cmd.split()
        # No shell is involved, so expand '~' in the local path ourselves.
        cmd_list[1] = os.path.expanduser(cmd_list[1])
        log.msg('Copying furl file: %s' % scp_cmd)
        d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
        return d

    def _scp_sshx(self, hostname):
        """Copy the sshx script to the temp directory path on hostname."""
        scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
            self.sshx, hostname,
            self.temp_dir, os.environ['USER']
        )
        log.msg("Copying sshx: %s" % scp_cmd)
        sshx_scp = scp_cmd.split()
        d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
        return d

    def _ssh_engine(self, hostname, count):
        """Run the engine command count times on hostname via the sshx script."""
        exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
            hostname, self.temp_dir,
            os.environ['USER'], self.engine_command
        )
        cmds = exec_engine.split()
        dlist = []
        log.msg("about to start engines...")
        for i in range(count):
            log.msg('Starting engines: %s' % exec_engine)
            d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def kill(self):
        """Kill the engines on every host; returns the gathered deferred."""
        dlist = []
        for host in self.engine_hosts:
            d = self._killall(host)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _killall(self, hostname):
        """Copy the engine killer script to hostname and run it there."""
        d = self._scp_engine_killer(hostname)
        d.addCallback(lambda r: self._ssh_kill(hostname))
        # NOTE: _exec_err is not attached as an errback here.
        return d

    def _scp_engine_killer(self, hostname):
        """Copy the engine killer script to the temp directory path on hostname."""
        scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
            self.engine_killer,
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        cmds = scp_cmd.split()
        log.msg('Copying engine_killer: %s' % scp_cmd)
        d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
        return d

    def _ssh_kill(self, hostname):
        """Run the previously copied engine killer script on hostname."""
        kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        log.msg('Killing engine: %s' % kill_cmd)
        kill_cmd = kill_cmd.split()
        d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
        return d

    def _exec_err(self, r):
        """Log r."""
        log.msg(r)
Brian Granger
Initial version of working refactored ipcluster....
r1770
Brian Granger
Final few cleanup changes to the PBS cluster in ipcluster.
r1774 #-----------------------------------------------------------------------------
# Main functions for the different types of clusters
#-----------------------------------------------------------------------------
# TODO:
# The logic in these codes should be moved into classes like LocalCluster
# MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
# The main functions should then just parse the command line arguments, create
# the appropriate class and call a 'start' method.
Brian Granger
Fix for ipcluster bug: https://bugs.launchpad.net/bugs/358202...
r1944
Administrator
Fixing numerous bugs in ipcluster on Win32....
def check_security(args, cont_args):
    """
    Decide whether the cluster can start with the requested security flags.

    If -x and -y were not both given and the crypto libraries are missing,
    an error is logged, the reactor is stopped and False is returned.
    Otherwise the given flags are appended to cont_args and True is returned.
    """
    wants_security = not (args.x and args.y)
    if wants_security and not have_crypto:
        log.err("""
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
        reactor.stop()
        return False
    # Pass the flags that were given on to the controller.
    for given, flag in ((args.x, '-x'), (args.y, '-y')):
        if given:
            cont_args.append(flag)
    return True
Brian Granger
Fix for ipcluster bug: https://bugs.launchpad.net/bugs/358202...
r1944
Brian Granger
The ipcluster command is now able to reuse FURL files....
def check_reuse(args, cont_args):
    """
    Check to see if we should try to reuse FURL files.

    When -r was given, '-r' and the two port options are appended to
    cont_args.  Both ports must be non-zero; otherwise an error is logged,
    the reactor is stopped and False is returned.  Returns True in every
    other case.
    """
    if not args.r:
        return True
    cont_args.append('-r')
    missing_port = args.client_port == 0 or args.engine_port == 0
    if missing_port:
        log.err("""
To reuse FURL files, you must also set the client and engine ports using
the --client-port and --engine-port options.""")
        reactor.stop()
        return False
    port_options = (
        ('--client-port=%i', args.client_port),
        ('--engine-port=%i', args.engine_port),
    )
    for option, port in port_options:
        cont_args.append(option % port)
    return True
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Fix for ipcluster bug: https://bugs.launchpad.net/bugs/358202...
r1944
def _err_and_stop(f):
    """Errback for fatal errors: log the failure f, then stop the reactor."""
    log.err(f)
    reactor.stop()
def _delay_start(cont_pid, start_engines, furl_file, reuse):
    """
    Wait for the controller to create its FURL file, then start the engines.

    Unless reuse is true, an existing furl_file is removed first.  Once the
    file is present, start_engines is called with cont_pid.  Returns the
    deferred of that chain.
    """
    stale_furl = (not reuse) and os.path.isfile(furl_file)
    if stale_furl:
        os.unlink(furl_file)
    log.msg('Waiting for controller to finish starting...')
    waiting = wait_for_file(furl_file, delay=0.2, max_tries=50)
    waiting.addCallback(lambda _: log.msg('Controller started'))
    waiting.addCallback(lambda _: start_engines(cont_pid))
    return waiting
Administrator
Fixing numerous bugs in ipcluster on Win32....
def main_local(args):
    """Start a controller and args.n engines as local processes."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir,'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    controller = ControllerLauncher(extra_args=cont_args)
    controller_started = controller.start()

    def start_engines(cont_pid):
        # The controller pid is made part of the engine log file name.
        engine_log = pjoin(args.logdir,'ipengine%s-' % cont_pid)
        engines = LocalEngineSet(extra_args=['--logfile=%s' % engine_log])

        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            engines.interrupt_then_kill(0.5)
            controller.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)

        signal.signal(signal.SIGINT,shutdown)
        return engines.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    controller_started.addCallback(
        _delay_start, start_engines, furl_file, args.r
    )
    controller_started.addErrback(_err_and_stop)
Brian Granger
Initial version of working refactored ipcluster....
r1770
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Adding support for mpiexec as well as mpirun....
def main_mpi(args):
    """Start a local controller and launch the engines through args.cmd."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir,'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    controller = ControllerLauncher(extra_args=cont_args)
    controller_started = controller.start()

    def start_engines(cont_pid):
        # Command line: <cmd> -n <n> ipengine -l <logfile prefix> [--mpi=...]
        raw_args = [
            args.cmd,
            '-n', str(args.n),
            'ipengine',
            '-l', pjoin(args.logdir,'ipengine%s-' % cont_pid),
        ]
        if args.mpi:
            raw_args.append('--mpi=%s' % args.mpi)
        engines = ProcessLauncher(raw_args)

        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            engines.interrupt_then_kill(1.0)
            controller.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)

        signal.signal(signal.SIGINT,shutdown)
        return engines.start()

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    controller_started.addCallback(
        _delay_start, start_engines, furl_file, args.r
    )
    controller_started.addErrback(_err_and_stop)
Brian Granger
Initial version of working refactored ipcluster....
r1770
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Initial version of working refactored ipcluster....
def main_pbs(args):
    """Start a local controller and submit the engines as a PBS job."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir,'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    controller = ControllerLauncher(extra_args=cont_args)
    controller_started = controller.start()

    def start_engines(r):
        pbs_set = PBSEngineSet(args.pbsscript)

        def shutdown(signum, frame):
            log.msg('Stopping pbs cluster')
            # Delete the batch job first; whatever the outcome, stop the
            # controller and then the reactor.
            killed = pbs_set.kill()
            killed.addBoth(lambda _: controller.interrupt_then_kill(1.0))
            killed.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))

        signal.signal(signal.SIGINT,shutdown)
        return pbs_set.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    controller_started.addCallback(
        _delay_start, start_engines, furl_file, args.r
    )
    controller_started.addErrback(_err_and_stop)
Brian Granger
Initial version of working refactored ipcluster....
r1770
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
def main_ssh(args):
    """Start a controller on localhost and engines using ssh.

    Your clusterfile should look like::

        send_furl = False # True, if you want
        engines = {
            'engine_host1' : engine_count,
            'engine_host2' : engine_count2
        }

    ``args`` is the parsed command line namespace; this function reads
    ``args.clusterfile``, ``args.logdir``, ``args.sshx`` and ``args.r``.
    Returns ``None``; when either the security check or the FURL reuse
    check fails, nothing is launched.
    """
    # NOTE: the cluster file is *executed* as Python code, so it must come
    # from a trusted source.
    clusterfile = {}
    execfile(args.clusterfile, clusterfile)
    # ``send_furl`` is optional in the cluster file and defaults to False.
    clusterfile.setdefault('send_furl', False)

    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir, 'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        # ``cont_pid`` is whatever _delay_start hands over; it is not used.
        ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)

        def shutdown(signum, frame):
            # The result of kill() is not waited on: the controller is
            # interrupted right away and the reactor stop is scheduled.
            ssh_set.kill()
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)

        signal.signal(signal.SIGINT, shutdown)
        return ssh_set.start(clusterfile['send_furl'])

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Initial version of working refactored ipcluster....
def get_args():
    """Build the ipcluster command line parser and parse the command line.

    A shared base parser carries the options common to every cluster type
    (``-r``, ``--client-port``, ``--engine-port``, ``-x``, ``-y``,
    ``--logdir``, ``-n/--num``).  One sub-command is registered per cluster
    type (``local``, ``mpirun``, ``mpiexec``, ``pbs``, ``ssh``), and each
    stores the function that starts it as ``func`` in the result.

    Returns the namespace produced by ``parser.parse_args()``.
    """
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-r',
        action='store_true',
        dest='r',
        help='try to reuse FURL files. Use with --client-port and --engine-port'
    )
    base_parser.add_argument(
        '--client-port',
        type=int,
        dest='client_port',
        help='the port the controller will listen on for client connections',
        default=0
    )
    base_parser.add_argument(
        '--engine-port',
        type=int,
        dest='engine_port',
        help='the port the controller will listen on for engine connections',
        default=0
    )
    base_parser.add_argument(
        '-x',
        action='store_true',
        dest='x',
        help='turn off client security'
    )
    base_parser.add_argument(
        '-y',
        action='store_true',
        dest='y',
        help='turn off engine security'
    )
    base_parser.add_argument(
        "--logdir",
        type=str,
        dest="logdir",
        help="directory to put log files (default=$IPYTHONDIR/log)",
        default=pjoin(get_ipython_dir(), 'log')
    )
    base_parser.add_argument(
        "-n",
        "--num",
        type=int,
        dest="n",
        default=2,
        help="the number of engines to start"
    )

    # Adjacent string literals are used instead of backslash continuations
    # inside one literal, so source indentation never leaks into the text
    # and words on consecutive lines cannot be fused together.
    description = (
        'IPython cluster startup. This starts a controller and '
        'engines using various approaches. Use the IPYTHONDIR environment '
        'variable to change your IPython directory from the default of '
        '.ipython or _ipython. The log and security subdirectories of your '
        'IPython directory will be used by this script for log files and '
        'security files.'
    )
    parser = argparse.ArgumentParser(description=description)
    subparsers = parser.add_subparsers(
        help='available cluster types. For help, do "ipcluster TYPE --help"')

    parser_local = subparsers.add_parser(
        'local',
        help='run a local cluster',
        parents=[base_parser]
    )
    parser_local.set_defaults(func=main_local)

    # mpirun and mpiexec are configured identically apart from the command
    # name, which is stored as ``cmd`` in the parsed result.
    for launcher, alternative in (('mpirun', 'mpiexec'),
                                  ('mpiexec', 'mpirun')):
        parser_mpi = subparsers.add_parser(
            launcher,
            help='run a cluster using %s (%s also works)' % (
                launcher, alternative),
            parents=[base_parser]
        )
        parser_mpi.add_argument(
            "--mpi",
            type=str,
            dest="mpi",  # Don't put a default here to allow no MPI support
            help="how to call MPI_Init (e.g. mpi4py); if omitted, no --mpi "
                 "option is passed to the engines"
        )
        parser_mpi.set_defaults(func=main_mpi, cmd=launcher)

    parser_pbs = subparsers.add_parser(
        'pbs',
        help='run a pbs cluster',
        parents=[base_parser]
    )
    parser_pbs.add_argument(
        '--pbs-script',
        type=str,
        dest='pbsscript',
        help='PBS script template',
        default='pbs.template'
    )
    parser_pbs.set_defaults(func=main_pbs)

    parser_ssh = subparsers.add_parser(
        'ssh',
        help='run a cluster using ssh, should have ssh-keys setup',
        parents=[base_parser]
    )
    parser_ssh.add_argument(
        '--clusterfile',
        type=str,
        dest='clusterfile',
        help='python file describing the cluster',
        default='clusterfile.py',
    )
    parser_ssh.add_argument(
        '--sshx',
        type=str,
        dest='sshx',
        help='sshx launcher helper'
    )
    parser_ssh.set_defaults(func=main_ssh)

    args = parser.parse_args()
    return args
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
def main():
    """Parse the command line, then run the selected cluster under the reactor.

    The function chosen by the sub-command (stored as ``func`` in the parsed
    arguments) is scheduled to be called with those arguments once the
    reactor is running.  Logging is sent to standard output.
    """
    parsed = get_args()
    launch = parsed.func
    reactor.callWhenRunning(launch, parsed)
    log.startLogging(sys.stdout)
    reactor.run()


# Script entry point.
if __name__ == '__main__':
    main()