##// END OF EJS Templates
Fix line endings and encoding.
Fix line endings and encoding.

File last commit:

r1832:34a51241
r1863:a42b7b46
Show More
ipcluster.py
723 lines | 23.3 KiB | text/x-python | PythonLexer
Brian Granger
Final few cleanup changes to the PBS cluster in ipcluster.
r1774 #!/usr/bin/env python
# encoding: utf-8
"""Start an IPython cluster = (controller + engines)."""
#-----------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 import os
Brian Granger
Initial version of working refactored ipcluster....
r1770 import re
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 import sys
Brian Granger
Initial version of working refactored ipcluster....
r1770 import signal
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 import tempfile
Brian Granger
Initial version of working refactored ipcluster....
r1770 pjoin = os.path.join
from twisted.internet import reactor, defer
from twisted.internet.protocol import ProcessProtocol
from twisted.internet.error import ProcessDone, ProcessTerminated
from twisted.internet.utils import getProcessOutput
Brian Granger
Fixing small things in response to review.
r1826 from twisted.python import failure, log
Brian Granger
Initial version of working refactored ipcluster....
r1770
from IPython.external import argparse
from IPython.external import Itpl
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 from IPython.genutils import get_ipython_dir, num_cpus
Administrator
Fixing numerous bugs in ipcluster on Win32....
r1815 from IPython.kernel.fcutil import have_crypto
from IPython.kernel.error import SecurityError
Brian Granger
Fixing small things in response to review.
r1826 from IPython.kernel.fcutil import have_crypto
from IPython.kernel.twistedutil import gatherBoth
from IPython.kernel.util import printer
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Final few cleanup changes to the PBS cluster in ipcluster.
r1774 #-----------------------------------------------------------------------------
# General process handling code
#-----------------------------------------------------------------------------
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 def find_exe(cmd):
try:
import win32api
except ImportError:
raise ImportError('you need to have pywin32 installed for this to work')
else:
Brian Granger
Fixing small things in response to review.
r1826 try:
(path, offest) = win32api.SearchPath(os.environ['PATH'],cmd + '.exe')
except:
(path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.bat')
Administrator
Fixing command finding bug on Win32 in ipcluster.py
r1814 return path
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 class ProcessStateError(Exception):
pass
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 class UnknownStatus(Exception):
pass
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 class LauncherProcessProtocol(ProcessProtocol):
"""
A ProcessProtocol to go with the ProcessLauncher.
"""
def __init__(self, process_launcher):
self.process_launcher = process_launcher
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 def connectionMade(self):
self.process_launcher.fire_start_deferred(self.transport.pid)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 def processEnded(self, status):
value = status.value
if isinstance(value, ProcessDone):
self.process_launcher.fire_stop_deferred(0)
elif isinstance(value, ProcessTerminated):
self.process_launcher.fire_stop_deferred(
{'exit_code':value.exitCode,
'signal':value.signal,
'status':value.status
}
)
else:
raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Initial version of working refactored ipcluster....
r1770 def outReceived(self, data):
log.msg(data)
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Initial version of working refactored ipcluster....
r1770 def errReceived(self, data):
log.err(data)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 class ProcessLauncher(object):
"""
Start and stop an external process in an asynchronous manner.
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 Currently this uses deferreds to notify other parties of process state
changes. This is an awkward design and should be moved to using
a formal NotificationCenter.
"""
def __init__(self, cmd_and_args):
self.cmd = cmd_and_args[0]
self.args = cmd_and_args
self._reset()
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 def _reset(self):
self.process_protocol = None
self.pid = None
self.start_deferred = None
self.stop_deferreds = []
self.state = 'before' # before, running, or after
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 @property
def running(self):
if self.state == 'running':
return True
else:
return False
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 def fire_start_deferred(self, pid):
self.pid = pid
self.state = 'running'
log.msg('Process %r has started with pid=%i' % (self.args, pid))
self.start_deferred.callback(pid)
def start(self):
if self.state == 'before':
self.process_protocol = LauncherProcessProtocol(self)
self.start_deferred = defer.Deferred()
self.process_transport = reactor.spawnProcess(
self.process_protocol,
self.cmd,
self.args,
env=os.environ
)
return self.start_deferred
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 else:
Brian Granger
Initial version of working refactored ipcluster....
r1770 s = 'the process has already been started and has state: %r' % \
self.state
return defer.fail(ProcessStateError(s))
def get_stop_deferred(self):
if self.state == 'running' or self.state == 'before':
d = defer.Deferred()
self.stop_deferreds.append(d)
return d
else:
s = 'this process is already complete'
return defer.fail(ProcessStateError(s))
def fire_stop_deferred(self, exit_code):
log.msg('Process %r has stopped with %r' % (self.args, exit_code))
self.state = 'after'
for d in self.stop_deferreds:
d.callback(exit_code)
def signal(self, sig):
"""
Send a signal to the process.
The argument sig can be ('KILL','INT', etc.) or any signal number.
"""
if self.state == 'running':
self.process_transport.signalProcess(sig)
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 # def __del__(self):
# self.signal('KILL')
Brian Granger
Removed the -f option to ipcluster. The remote starting of an IPython...
r1610
Brian Granger
Initial version of working refactored ipcluster....
r1770 def interrupt_then_kill(self, delay=1.0):
self.signal('INT')
reactor.callLater(delay, self.signal, 'KILL')
Brian Granger
Removed the -f option to ipcluster. The remote starting of an IPython...
r1610
Brian Granger
Final few cleanup changes to the PBS cluster in ipcluster.
r1774 #-----------------------------------------------------------------------------
# Code for launching controller and engines
#-----------------------------------------------------------------------------
Brian Granger
Initial version of working refactored ipcluster....
r1770 class ControllerLauncher(ProcessLauncher):
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 def __init__(self, extra_args=None):
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 if sys.platform == 'win32':
Administrator
Fixing numerous bugs in ipcluster on Win32....
r1815 # This logic is needed because the ipcontroller script doesn't
# always get installed in the same way or in the same location.
from IPython.kernel.scripts import ipcontroller
script_location = ipcontroller.__file__.replace('.pyc', '.py')
# The -u option here turns on unbuffered output, which is required
# on Win32 to prevent wierd conflict and problems with Twisted
args = [find_exe('python'), '-u', script_location]
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 else:
args = ['ipcontroller']
Brian Granger
Initial version of working refactored ipcluster....
r1770 self.extra_args = extra_args
if extra_args is not None:
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 args.extend(extra_args)
Brian Granger
Initial version of working refactored ipcluster....
r1770
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 ProcessLauncher.__init__(self, args)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 class EngineLauncher(ProcessLauncher):
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 def __init__(self, extra_args=None):
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 if sys.platform == 'win32':
Administrator
Fixing numerous bugs in ipcluster on Win32....
r1815 # This logic is needed because the ipcontroller script doesn't
# always get installed in the same way or in the same location.
from IPython.kernel.scripts import ipengine
script_location = ipengine.__file__.replace('.pyc', '.py')
# The -u option here turns on unbuffered output, which is required
# on Win32 to prevent wierd conflict and problems with Twisted
args = [find_exe('python'), '-u', script_location]
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 else:
args = ['ipengine']
Brian Granger
Initial version of working refactored ipcluster....
r1770 self.extra_args = extra_args
if extra_args is not None:
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 args.extend(extra_args)
Brian Granger
Initial version of working refactored ipcluster....
r1770
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 ProcessLauncher.__init__(self, args)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 class LocalEngineSet(object):
def __init__(self, extra_args=None):
self.extra_args = extra_args
self.launchers = []
def start(self, n):
dlist = []
for i in range(n):
el = EngineLauncher(extra_args=self.extra_args)
d = el.start()
self.launchers.append(el)
dlist.append(d)
dfinal = gatherBoth(dlist, consumeErrors=True)
dfinal.addCallback(self._handle_start)
return dfinal
def _handle_start(self, r):
log.msg('Engines started with pids: %r' % r)
return r
def _handle_stop(self, r):
log.msg('Engines received signal: %r' % r)
return r
def signal(self, sig):
dlist = []
for el in self.launchers:
d = el.get_stop_deferred()
dlist.append(d)
el.signal(sig)
dfinal = gatherBoth(dlist, consumeErrors=True)
dfinal.addCallback(self._handle_stop)
return dfinal
def interrupt_then_kill(self, delay=1.0):
dlist = []
for el in self.launchers:
d = el.get_stop_deferred()
dlist.append(d)
el.interrupt_then_kill(delay)
dfinal = gatherBoth(dlist, consumeErrors=True)
dfinal.addCallback(self._handle_stop)
return dfinal
class BatchEngineSet(object):
# Subclasses must fill these in. See PBSEngineSet
submit_command = ''
delete_command = ''
job_id_regexp = ''
def __init__(self, template_file, **kwargs):
self.template_file = template_file
self.context = {}
self.context.update(kwargs)
Brian Granger
More changes to the PBS batch cluster.
r1773 self.batch_file = self.template_file+'-run'
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Initial version of working refactored ipcluster....
r1770 def parse_job_id(self, output):
m = re.match(self.job_id_regexp, output)
if m is not None:
job_id = m.group()
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 else:
Brian Granger
Initial version of working refactored ipcluster....
r1770 raise Exception("job id couldn't be determined: %s" % output)
self.job_id = job_id
Brian Granger
More changes to the PBS batch cluster.
r1773 log.msg('Job started with job id: %r' % job_id)
Brian Granger
Initial version of working refactored ipcluster....
r1770 return job_id
def write_batch_script(self, n):
self.context['n'] = n
template = open(self.template_file, 'r').read()
Brian Granger
More changes to the PBS batch cluster.
r1773 log.msg('Using template for batch script: %s' % self.template_file)
Brian Granger
Initial version of working refactored ipcluster....
r1770 script_as_string = Itpl.itplns(template, self.context)
Brian Granger
More changes to the PBS batch cluster.
r1773 log.msg('Writing instantiated batch script: %s' % self.batch_file)
Brian Granger
Initial version of working refactored ipcluster....
r1770 f = open(self.batch_file,'w')
f.write(script_as_string)
f.close()
Brian Granger
Fixing small things in response to review.
r1826
Brian Granger
Initial version of working refactored ipcluster....
r1770 def handle_error(self, f):
f.printTraceback()
Brian Granger
More changes to the PBS batch cluster.
r1773 f.raiseException()
Brian Granger
Initial version of working refactored ipcluster....
r1770
def start(self, n):
self.write_batch_script(n)
d = getProcessOutput(self.submit_command,
[self.batch_file],env=os.environ)
d.addCallback(self.parse_job_id)
Brian Granger
Initial version that works with PBS.
r1772 d.addErrback(self.handle_error)
Brian Granger
Initial version of working refactored ipcluster....
r1770 return d
Brian Granger
Fixing small things in response to review.
r1826
Brian Granger
Initial version of working refactored ipcluster....
r1770 def kill(self):
d = getProcessOutput(self.delete_command,
[self.job_id],env=os.environ)
return d
class PBSEngineSet(BatchEngineSet):
submit_command = 'qsub'
delete_command = 'qdel'
job_id_regexp = '\d+'
def __init__(self, template_file, **kwargs):
BatchEngineSet.__init__(self, template_file, **kwargs)
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832
sshx_template="""#!/bin/sh
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 "$@" &> /dev/null &
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 echo $!
"""
engine_killer_template="""#!/bin/sh
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
"""
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 class SSHEngineSet(object):
sshx_template=sshx_template
engine_killer_template=engine_killer_template
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 """Start a controller on localhost and engines using ssh.
The engine_hosts argument is a dict with hostnames as keys and
the number of engine (int) as values. sshx is the name of a local
file that will be used to run remote commands. This file is used
to setup the environment properly.
"""
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 self.temp_dir = tempfile.gettempdir()
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 if sshx is not None:
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 self.sshx = sshx
else:
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 # Write the sshx.sh file locally from our template.
self.sshx = os.path.join(
self.temp_dir,
'%s-main-sshx.sh' % os.environ['USER']
)
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 f = open(self.sshx, 'w')
f.writelines(self.sshx_template)
f.close()
self.engine_command = ipengine
self.engine_hosts = engine_hosts
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 # Write the engine killer script file locally from our template.
self.engine_killer = os.path.join(
self.temp_dir,
'%s-local-engine_killer.sh' % os.environ['USER']
)
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 f = open(self.engine_killer, 'w')
f.writelines(self.engine_killer_template)
f.close()
def start(self, send_furl=False):
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 dlist = []
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 for host in self.engine_hosts.keys():
count = self.engine_hosts[host]
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 d = self._start(host, count, send_furl)
dlist.append(d)
return gatherBoth(dlist, consumeErrors=True)
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 def _start(self, hostname, count=1, send_furl=False):
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 if send_furl:
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 d = self._scp_furl(hostname)
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 else:
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 d = defer.succeed(None)
d.addCallback(lambda r: self._scp_sshx(hostname))
d.addCallback(lambda r: self._ssh_engine(hostname, count))
return d
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 def _scp_furl(self, hostname):
scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
cmd_list = scp_cmd.split()
cmd_list[1] = os.path.expanduser(cmd_list[1])
log.msg('Copying furl file: %s' % scp_cmd)
d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
return d
def _scp_sshx(self, hostname):
scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
self.sshx, hostname,
self.temp_dir, os.environ['USER']
)
print
log.msg("Copying sshx: %s" % scp_cmd)
sshx_scp = scp_cmd.split()
d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
return d
def _ssh_engine(self, hostname, count):
exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
hostname, self.temp_dir,
os.environ['USER'], self.engine_command
)
cmds = exec_engine.split()
dlist = []
log.msg("about to start engines...")
for i in range(count):
log.msg('Starting engines: %s' % exec_engine)
d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
dlist.append(d)
return gatherBoth(dlist, consumeErrors=True)
def kill(self):
dlist = []
for host in self.engine_hosts.keys():
d = self._killall(host)
dlist.append(d)
return gatherBoth(dlist, consumeErrors=True)
def _killall(self, hostname):
d = self._scp_engine_killer(hostname)
d.addCallback(lambda r: self._ssh_kill(hostname))
# d.addErrback(self._exec_err)
return d
def _scp_engine_killer(self, hostname):
scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
self.engine_killer,
hostname,
self.temp_dir,
os.environ['USER']
)
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 cmds = scp_cmd.split()
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 log.msg('Copying engine_killer: %s' % scp_cmd)
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 return d
def _ssh_kill(self, hostname):
kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
hostname,
self.temp_dir,
os.environ['USER']
)
log.msg('Killing engine: %s' % kill_cmd)
kill_cmd = kill_cmd.split()
d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
return d
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 def _exec_err(self, r):
log.msg(r)
Brian Granger
Initial version of working refactored ipcluster....
r1770
Brian Granger
Final few cleanup changes to the PBS cluster in ipcluster.
r1774 #-----------------------------------------------------------------------------
# Main functions for the different types of clusters
#-----------------------------------------------------------------------------
# TODO:
# The logic in these codes should be moved into classes like LocalCluster
# MpirunCluster, PBSCluster, etc. This would remove alot of the duplications.
# The main functions should then just parse the command line arguments, create
# the appropriate class and call a 'start' method.
Administrator
Fixing numerous bugs in ipcluster on Win32....
r1815 def check_security(args, cont_args):
if (not args.x or not args.y) and not have_crypto:
log.err("""
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
reactor.stop()
return False
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 if args.x:
cont_args.append('-x')
if args.y:
cont_args.append('-y')
Administrator
Fixing numerous bugs in ipcluster on Win32....
r1815 return True
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Administrator
Fixing numerous bugs in ipcluster on Win32....
r1815 def main_local(args):
cont_args = []
cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
# Check security settings before proceeding
Brian Granger
Fixing small things in response to review.
r1826 if not check_security(args, cont_args):
return
Administrator
Fixing numerous bugs in ipcluster on Win32....
r1815
Brian Granger
Initial version of working refactored ipcluster....
r1770 cl = ControllerLauncher(extra_args=cont_args)
dstart = cl.start()
def start_engines(cont_pid):
engine_args = []
engine_args.append('--logfile=%s' % \
pjoin(args.logdir,'ipengine%s-' % cont_pid))
eset = LocalEngineSet(extra_args=engine_args)
def shutdown(signum, frame):
log.msg('Stopping local cluster')
# We are still playing with the times here, but these seem
# to be reliable in allowing everything to exit cleanly.
eset.interrupt_then_kill(0.5)
cl.interrupt_then_kill(0.5)
reactor.callLater(1.0, reactor.stop)
signal.signal(signal.SIGINT,shutdown)
d = eset.start(args.n)
return d
def delay_start(cont_pid):
# This is needed because the controller doesn't start listening
# right when it starts and the controller needs to write
# furl files for the engine to pick up
reactor.callLater(1.0, start_engines, cont_pid)
dstart.addCallback(delay_start)
dstart.addErrback(lambda f: f.raiseException())
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Initial version of working refactored ipcluster....
r1770 def main_mpirun(args):
cont_args = []
cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
Administrator
Fixing numerous bugs in ipcluster on Win32....
r1815
# Check security settings before proceeding
Brian Granger
Fixing small things in response to review.
r1826 if not check_security(args, cont_args):
return
Administrator
Fixing numerous bugs in ipcluster on Win32....
r1815
Brian Granger
Initial version of working refactored ipcluster....
r1770 cl = ControllerLauncher(extra_args=cont_args)
dstart = cl.start()
def start_engines(cont_pid):
raw_args = ['mpirun']
raw_args.extend(['-n',str(args.n)])
raw_args.append('ipengine')
raw_args.append('-l')
raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
Brian Granger
Update of docs to reflect the new ipcluster version....
r1788 if args.mpi:
raw_args.append('--mpi=%s' % args.mpi)
Brian Granger
Initial version of working refactored ipcluster....
r1770 eset = ProcessLauncher(raw_args)
def shutdown(signum, frame):
log.msg('Stopping local cluster')
# We are still playing with the times here, but these seem
# to be reliable in allowing everything to exit cleanly.
eset.interrupt_then_kill(1.0)
cl.interrupt_then_kill(1.0)
reactor.callLater(2.0, reactor.stop)
signal.signal(signal.SIGINT,shutdown)
d = eset.start()
return d
def delay_start(cont_pid):
# This is needed because the controller doesn't start listening
# right when it starts and the controller needs to write
# furl files for the engine to pick up
reactor.callLater(1.0, start_engines, cont_pid)
dstart.addCallback(delay_start)
dstart.addErrback(lambda f: f.raiseException())
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Initial version of working refactored ipcluster....
r1770 def main_pbs(args):
Brian Granger
Initial version that works with PBS.
r1772 cont_args = []
cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
Administrator
Fixing numerous bugs in ipcluster on Win32....
r1815
# Check security settings before proceeding
Brian Granger
Fixing small things in response to review.
r1826 if not check_security(args, cont_args):
return
Administrator
Fixing numerous bugs in ipcluster on Win32....
r1815
Brian Granger
Initial version that works with PBS.
r1772 cl = ControllerLauncher(extra_args=cont_args)
Brian Granger
Initial version of working refactored ipcluster....
r1770 dstart = cl.start()
def start_engines(r):
Brian Granger
Initial version that works with PBS.
r1772 pbs_set = PBSEngineSet(args.pbsscript)
Brian Granger
More changes to the PBS batch cluster.
r1773 def shutdown(signum, frame):
log.msg('Stopping pbs cluster')
d = pbs_set.kill()
d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
signal.signal(signal.SIGINT,shutdown)
Brian Granger
Initial version of working refactored ipcluster....
r1770 d = pbs_set.start(args.n)
return d
dstart.addCallback(start_engines)
Brian Granger
Initial version that works with PBS.
r1772 dstart.addErrback(lambda f: f.raiseException())
Brian Granger
Initial version of working refactored ipcluster....
r1770
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 def main_ssh(args):
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 """Start a controller on localhost and engines using ssh.
Your clusterfile should look like::
send_furl = False # True, if you want
engines = {
'engine_host1' : engine_count,
'engine_host2' : engine_count2
}
"""
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 clusterfile = {}
execfile(args.clusterfile, clusterfile)
if not clusterfile.has_key('send_furl'):
clusterfile['send_furl'] = False
cont_args = []
cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832
# Check security settings before proceeding
if not check_security(args, cont_args):
return
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 cl = ControllerLauncher(extra_args=cont_args)
dstart = cl.start()
def start_engines(cont_pid):
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 def shutdown(signum, frame):
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 d = ssh_set.kill()
# d.addErrback(log.err)
cl.interrupt_then_kill(1.0)
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 reactor.callLater(2.0, reactor.stop)
signal.signal(signal.SIGINT,shutdown)
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 d = ssh_set.start(clusterfile['send_furl'])
return d
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 def delay_start(cont_pid):
reactor.callLater(1.0, start_engines, cont_pid)
dstart.addCallback(delay_start)
dstart.addErrback(lambda f: f.raiseException())
Brian Granger
Initial version of working refactored ipcluster....
r1770 def get_args():
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 base_parser = argparse.ArgumentParser(add_help=False)
base_parser.add_argument(
'-x',
action='store_true',
dest='x',
help='turn off client security'
)
base_parser.add_argument(
'-y',
action='store_true',
dest='y',
help='turn off engine security'
)
base_parser.add_argument(
"--logdir",
type=str,
dest="logdir",
help="directory to put log files (default=$IPYTHONDIR/log)",
default=pjoin(get_ipython_dir(),'log')
)
base_parser.add_argument(
"-n",
"--num",
type=int,
dest="n",
default=2,
help="the number of engines to start"
)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 parser = argparse.ArgumentParser(
description='IPython cluster startup. This starts a controller and\
Brian Granger
Updating docs to reflect recent work on ipcluster....
r1775 engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 )
subparsers = parser.add_subparsers(
help='available cluster types. For help, do "ipcluster TYPE --help"')
Brian Granger
Added test in ipcluster.py to see if the platform is win32. If so,...
r1613
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 parser_local = subparsers.add_parser(
'local',
help='run a local cluster',
parents=[base_parser]
)
Brian Granger
Initial version of working refactored ipcluster....
r1770 parser_local.set_defaults(func=main_local)
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 parser_mpirun = subparsers.add_parser(
'mpirun',
help='run a cluster using mpirun',
parents=[base_parser]
)
parser_mpirun.add_argument(
"--mpi",
type=str,
Brian Granger
Update of docs to reflect the new ipcluster version....
r1788 dest="mpi", # Don't put a default here to allow no MPI support
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 help="how to call MPI_Init (default=mpi4py)"
)
parser_mpirun.set_defaults(func=main_mpirun)
Brian Granger
Initial version of working refactored ipcluster....
r1770
Brian Granger
Initial version that works with PBS.
r1772 parser_pbs = subparsers.add_parser(
Brian Granger
Final few cleanup changes to the PBS cluster in ipcluster.
r1774 'pbs',
Brian Granger
Initial version that works with PBS.
r1772 help='run a pbs cluster',
parents=[base_parser]
)
parser_pbs.add_argument(
'--pbs-script',
type=str,
dest='pbsscript',
help='PBS script template',
default='pbs.template'
)
Brian Granger
Initial version of working refactored ipcluster....
r1770 parser_pbs.set_defaults(func=main_pbs)
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
parser_ssh = subparsers.add_parser(
'ssh',
help='run a cluster using ssh, should have ssh-keys setup',
parents=[base_parser]
)
parser_ssh.add_argument(
'--clusterfile',
type=str,
dest='clusterfile',
help='python file describing the cluster',
default='clusterfile.py',
)
parser_ssh.add_argument(
'--sshx',
type=str,
dest='sshx',
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 help='sshx launcher helper'
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 )
parser_ssh.set_defaults(func=main_ssh)
Brian Granger
Initial version of working refactored ipcluster....
r1770 args = parser.parse_args()
return args
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 def main():
args = get_args()
reactor.callWhenRunning(args.func, args)
log.startLogging(sys.stdout)
reactor.run()
if __name__ == '__main__':
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 main()