##// END OF EJS Templates
Remove bash-ism and use standard posix redirect in sshx.sh script....
Remove bash-ism and use standard posix redirect in sshx.sh script. Closes gh-274, thanks to github user juliantaylor for the fix!

File last commit:

r3370:a63739a5
r3370:a63739a5
Show More
ipcluster.py
1041 lines | 33.6 KiB | text/x-python | PythonLexer
Brian Granger
Fixing unticketed bug in ipcluster.py....
r1943 #!/usr/bin/env python
Brian Granger
Final few cleanup changes to the PBS cluster in ipcluster.
r1774 # encoding: utf-8
"""Start an IPython cluster = (controller + engines)."""
#-----------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 import os
Brian Granger
Initial version of working refactored ipcluster....
r1770 import re
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 import sys
Brian Granger
Initial version of working refactored ipcluster....
r1770 import signal
Matthieu Brucher
Fixes for LSF
r2680 import stat
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 import tempfile
Brian Granger
Initial version of working refactored ipcluster....
r1770 pjoin = os.path.join
from twisted.internet import reactor, defer
from twisted.internet.protocol import ProcessProtocol
from twisted.internet.error import ProcessDone, ProcessTerminated
from twisted.internet.utils import getProcessOutput
Brian Granger
Fixing small things in response to review.
r1826 from twisted.python import failure, log
Brian Granger
Initial version of working refactored ipcluster....
r1770
from IPython.external import argparse
from IPython.external import Itpl
Brian Granger
Addressing various review comments.
r1958 from IPython.genutils import (
get_ipython_dir,
get_log_dir,
get_security_dir,
num_cpus
)
Administrator
Fixing numerous bugs in ipcluster on Win32....
r1815 from IPython.kernel.fcutil import have_crypto
Brian Granger
Fix for ticket: https://bugs.launchpad.net/bugs/361414...
r1945
# Create various ipython directories if they don't exist.
# This must be done before IPython.kernel.config is imported.
from IPython.iplib import user_setup
if os.name == 'posix':
rc_suffix = ''
else:
rc_suffix = '.ini'
user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
get_log_dir()
get_security_dir()
Brian Granger
Fix for ipcluster bug: https://bugs.launchpad.net/bugs/358202...
r1944 from IPython.kernel.config import config_manager as kernel_config_manager
from IPython.kernel.error import SecurityError, FileTimeoutError
Brian Granger
Fixing small things in response to review.
r1826 from IPython.kernel.fcutil import have_crypto
Brian Granger
Fix for ipcluster bug: https://bugs.launchpad.net/bugs/358202...
r1944 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
Brian Granger
Fixing small things in response to review.
r1826 from IPython.kernel.util import printer
Brian Granger
Final few cleanup changes to the PBS cluster in ipcluster.
r1774 #-----------------------------------------------------------------------------
# General process handling code
#-----------------------------------------------------------------------------
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 class ProcessStateError(Exception):
pass
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 class UnknownStatus(Exception):
pass
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 class LauncherProcessProtocol(ProcessProtocol):
"""
A ProcessProtocol to go with the ProcessLauncher.
"""
def __init__(self, process_launcher):
self.process_launcher = process_launcher
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 def connectionMade(self):
self.process_launcher.fire_start_deferred(self.transport.pid)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 def processEnded(self, status):
value = status.value
if isinstance(value, ProcessDone):
self.process_launcher.fire_stop_deferred(0)
elif isinstance(value, ProcessTerminated):
self.process_launcher.fire_stop_deferred(
{'exit_code':value.exitCode,
'signal':value.signal,
'status':value.status
}
)
else:
raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Initial version of working refactored ipcluster....
r1770 def outReceived(self, data):
log.msg(data)
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Initial version of working refactored ipcluster....
r1770 def errReceived(self, data):
log.err(data)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 class ProcessLauncher(object):
"""
Start and stop an external process in an asynchronous manner.
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 Currently this uses deferreds to notify other parties of process state
changes. This is an awkward design and should be moved to using
a formal NotificationCenter.
"""
def __init__(self, cmd_and_args):
self.cmd = cmd_and_args[0]
self.args = cmd_and_args
self._reset()
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 def _reset(self):
self.process_protocol = None
self.pid = None
self.start_deferred = None
self.stop_deferreds = []
self.state = 'before' # before, running, or after
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 @property
def running(self):
if self.state == 'running':
return True
else:
return False
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 def fire_start_deferred(self, pid):
self.pid = pid
self.state = 'running'
log.msg('Process %r has started with pid=%i' % (self.args, pid))
self.start_deferred.callback(pid)
def start(self):
if self.state == 'before':
self.process_protocol = LauncherProcessProtocol(self)
self.start_deferred = defer.Deferred()
self.process_transport = reactor.spawnProcess(
self.process_protocol,
self.cmd,
self.args,
env=os.environ
)
return self.start_deferred
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 else:
Brian Granger
Initial version of working refactored ipcluster....
r1770 s = 'the process has already been started and has state: %r' % \
self.state
return defer.fail(ProcessStateError(s))
def get_stop_deferred(self):
if self.state == 'running' or self.state == 'before':
d = defer.Deferred()
self.stop_deferreds.append(d)
return d
else:
s = 'this process is already complete'
return defer.fail(ProcessStateError(s))
def fire_stop_deferred(self, exit_code):
log.msg('Process %r has stopped with %r' % (self.args, exit_code))
self.state = 'after'
for d in self.stop_deferreds:
d.callback(exit_code)
def signal(self, sig):
"""
Send a signal to the process.
The argument sig can be ('KILL','INT', etc.) or any signal number.
"""
if self.state == 'running':
self.process_transport.signalProcess(sig)
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 # def __del__(self):
# self.signal('KILL')
Brian Granger
Removed the -f option to ipcluster. The remote starting of an IPython...
r1610
Brian Granger
Initial version of working refactored ipcluster....
r1770 def interrupt_then_kill(self, delay=1.0):
self.signal('INT')
reactor.callLater(delay, self.signal, 'KILL')
Brian Granger
Removed the -f option to ipcluster. The remote starting of an IPython...
r1610
Brian Granger
Final few cleanup changes to the PBS cluster in ipcluster.
r1774 #-----------------------------------------------------------------------------
# Code for launching controller and engines
#-----------------------------------------------------------------------------
Brian Granger
Initial version of working refactored ipcluster....
r1770 class ControllerLauncher(ProcessLauncher):
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 def __init__(self, extra_args=None):
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 if sys.platform == 'win32':
Administrator
Fixing numerous bugs in ipcluster on Win32....
r1815 # This logic is needed because the ipcontroller script doesn't
# always get installed in the same way or in the same location.
from IPython.kernel.scripts import ipcontroller
script_location = ipcontroller.__file__.replace('.pyc', '.py')
# The -u option here turns on unbuffered output, which is required
Brian Granger
Fixing unticketed bug in ipcluster.py....
r1943 # on Win32 to prevent wierd conflict and problems with Twisted.
# Also, use sys.executable to make sure we are picking up the
# right python exe.
args = [sys.executable, '-u', script_location]
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 else:
args = ['ipcontroller']
Brian Granger
Initial version of working refactored ipcluster....
r1770 self.extra_args = extra_args
if extra_args is not None:
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 args.extend(extra_args)
Brian Granger
Initial version of working refactored ipcluster....
r1770
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 ProcessLauncher.__init__(self, args)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 class EngineLauncher(ProcessLauncher):
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 def __init__(self, extra_args=None):
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 if sys.platform == 'win32':
Administrator
Fixing numerous bugs in ipcluster on Win32....
r1815 # This logic is needed because the ipcontroller script doesn't
# always get installed in the same way or in the same location.
from IPython.kernel.scripts import ipengine
script_location = ipengine.__file__.replace('.pyc', '.py')
# The -u option here turns on unbuffered output, which is required
Brian Granger
Fixing unticketed bug in ipcluster.py....
r1943 # on Win32 to prevent wierd conflict and problems with Twisted.
# Also, use sys.executable to make sure we are picking up the
# right python exe.
args = [sys.executable, '-u', script_location]
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 else:
args = ['ipengine']
Brian Granger
Initial version of working refactored ipcluster....
r1770 self.extra_args = extra_args
if extra_args is not None:
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 args.extend(extra_args)
Brian Granger
Initial version of working refactored ipcluster....
r1770
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 ProcessLauncher.__init__(self, args)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 class LocalEngineSet(object):
def __init__(self, extra_args=None):
self.extra_args = extra_args
self.launchers = []
def start(self, n):
dlist = []
for i in range(n):
Satrajit Ghosh
updated sge to launch multiple engines - start cleanly but does not shutdown cleanly
r2623 print "starting engine:", i
Brian Granger
Initial version of working refactored ipcluster....
r1770 el = EngineLauncher(extra_args=self.extra_args)
d = el.start()
self.launchers.append(el)
dlist.append(d)
dfinal = gatherBoth(dlist, consumeErrors=True)
dfinal.addCallback(self._handle_start)
return dfinal
def _handle_start(self, r):
log.msg('Engines started with pids: %r' % r)
return r
def _handle_stop(self, r):
log.msg('Engines received signal: %r' % r)
return r
def signal(self, sig):
dlist = []
for el in self.launchers:
d = el.get_stop_deferred()
dlist.append(d)
el.signal(sig)
dfinal = gatherBoth(dlist, consumeErrors=True)
dfinal.addCallback(self._handle_stop)
return dfinal
def interrupt_then_kill(self, delay=1.0):
dlist = []
for el in self.launchers:
d = el.get_stop_deferred()
dlist.append(d)
el.interrupt_then_kill(delay)
dfinal = gatherBoth(dlist, consumeErrors=True)
dfinal.addCallback(self._handle_stop)
return dfinal
class BatchEngineSet(object):
Justin Riley
update PBSEngineSet to use job arrays...
r2676
# Subclasses must fill these in. See PBSEngineSet/SGEEngineSet
name = ''
Brian Granger
Initial version of working refactored ipcluster....
r1770 submit_command = ''
delete_command = ''
job_id_regexp = ''
Justin Riley
update PBSEngineSet to use job arrays...
r2676 job_array_regexp = ''
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 job_array_template = ''
queue_regexp = ''
queue_template = ''
Justin Riley
update PBSEngineSet to use job arrays...
r2676 default_template = ''
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 def __init__(self, template_file, queue, **kwargs):
Brian Granger
Initial version of working refactored ipcluster....
r1770 self.template_file = template_file
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 self.queue = queue
Justin Riley
update PBSEngineSet to use job arrays...
r2676
Brian Granger
Initial version of working refactored ipcluster....
r1770 def parse_job_id(self, output):
Justin Riley
update PBSEngineSet to use job arrays...
r2676 m = re.search(self.job_id_regexp, output)
Brian Granger
Initial version of working refactored ipcluster....
r1770 if m is not None:
job_id = m.group()
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 else:
Brian Granger
Initial version of working refactored ipcluster....
r1770 raise Exception("job id couldn't be determined: %s" % output)
self.job_id = job_id
Brian Granger
More changes to the PBS batch cluster.
r1773 log.msg('Job started with job id: %r' % job_id)
Brian Granger
Initial version of working refactored ipcluster....
r1770 return job_id
Justin Riley
update PBSEngineSet to use job arrays...
r2676
Brian Granger
Initial version of working refactored ipcluster....
r1770 def handle_error(self, f):
f.printTraceback()
Brian Granger
More changes to the PBS batch cluster.
r1773 f.raiseException()
Justin Riley
update SGEEngineSet to use SGE job arrays...
r2675
Satrajit Ghosh
updated sge to launch multiple engines - start cleanly but does not shutdown cleanly
r2623 def start(self, n):
Justin Riley
update SGEEngineSet to use SGE job arrays...
r2675 log.msg("starting %d engines" % n)
self._temp_file = tempfile.NamedTemporaryFile()
Justin Riley
force /bin/sh in default SGE/PBS/LFS templates
r2681 os.chmod(self._temp_file.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
Justin Riley
update SGEEngineSet to use SGE job arrays...
r2675 if self.template_file:
Justin Riley
update PBSEngineSet to use job arrays...
r2676 log.msg("Using %s script %s" % (self.name, self.template_file))
Justin Riley
update SGEEngineSet to use SGE job arrays...
r2675 contents = open(self.template_file, 'r').read()
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 new_script = contents
regex = re.compile(self.job_array_regexp)
Justin Riley
update SGEEngineSet to use SGE job arrays...
r2675 if not regex.search(contents):
Justin Riley
update PBSEngineSet to use job arrays...
r2676 log.msg("adding job array settings to %s script" % self.name)
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 new_script = self.job_array_template % n +'\n' + new_script
print self.queue_regexp
regex = re.compile(self.queue_regexp)
print regex.search(contents)
if self.queue and not regex.search(contents):
log.msg("adding queue settings to %s script" % self.name)
new_script = self.queue_template % self.queue + '\n' + new_script
if new_script != contents:
self._temp_file.write(new_script)
Justin Riley
update SGEEngineSet to use SGE job arrays...
r2675 self.template_file = self._temp_file.name
else:
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 default_script = self.default_template % n
if self.queue:
default_script = self.queue_template % self.queue + \
'\n' + default_script
Justin Riley
update PBSEngineSet to use job arrays...
r2676 log.msg("using default ipengine %s script: \n%s" %
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 (self.name, default_script))
self._temp_file.file.write(default_script)
Justin Riley
update SGEEngineSet to use SGE job arrays...
r2675 self.template_file = self._temp_file.name
Matthieu Brucher
Fixes for LSF
r2680 self._temp_file.file.close()
Justin Riley
update SGEEngineSet to use SGE job arrays...
r2675 d = getProcessOutput(self.submit_command,
[self.template_file],
env=os.environ)
d.addCallback(self.parse_job_id)
d.addErrback(self.handle_error)
return d
Justin Riley
update PBSEngineSet to use job arrays...
r2676 def kill(self):
d = getProcessOutput(self.delete_command,
[self.job_id],env=os.environ)
return d
class PBSEngineSet(BatchEngineSet):
name = 'PBS'
submit_command = 'qsub'
delete_command = 'qdel'
job_id_regexp = '\d+'
job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+'
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 job_array_template = '#PBS -t 1-%d'
queue_regexp = '#PBS[ \t]+-q[ \t]+\w+'
queue_template = '#PBS -q %s'
Justin Riley
force /bin/sh in default SGE/PBS/LFS templates
r2681 default_template="""#!/bin/sh
#PBS -V
Justin Riley
update PBSEngineSet to use job arrays...
r2676 #PBS -t 1-%d
#PBS -N ipengine
eid=$(($PBS_ARRAYID - 1))
ipengine --logfile=ipengine${eid}.log
"""
class SGEEngineSet(PBSEngineSet):
name = 'SGE'
job_array_regexp = '#\$[ \t]+-t[ \t]+\d+'
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 job_array_template = '#$ -t 1-%d'
queue_regexp = '#\$[ \t]+-q[ \t]+\w+'
queue_template = '#$ -q %s'
Justin Riley
update PBSEngineSet to use job arrays...
r2676 default_template="""#$ -V
Justin Riley
force /bin/sh in default SGE/PBS/LFS templates
r2681 #$ -S /bin/sh
Justin Riley
update SGEEngineSet to use SGE job arrays...
r2675 #$ -t 1-%d
#$ -N ipengine
eid=$(($SGE_TASK_ID - 1))
ipengine --logfile=ipengine${eid}.log
"""
Justin Riley
add experimental LSF support (needs testing)
r2677 class LSFEngineSet(PBSEngineSet):
name = 'LSF'
submit_command = 'bsub'
delete_command = 'bkill'
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 job_array_regexp = '#BSUB[ \t]-J+\w+\[\d+-\d+\]'
job_array_template = '#BSUB -J ipengine[1-%d]'
queue_regexp = '#BSUB[ \t]+-q[ \t]+\w+'
queue_template = '#BSUB -q %s'
Justin Riley
force /bin/sh in default SGE/PBS/LFS templates
r2681 default_template="""#!/bin/sh
#BSUB -J ipengine[1-%d]
Justin Riley
add experimental LSF support (needs testing)
r2677 eid=$(($LSB_JOBINDEX - 1))
ipengine --logfile=ipengine${eid}.log
"""
Justin Riley
add bsub wrapper for LSFEngineSet...
r2682 bsub_wrapper="""#!/bin/sh
bsub < $1
"""
def __init__(self, template_file, queue, **kwargs):
self._bsub_wrapper = self._make_bsub_wrapper()
self.submit_command = self._bsub_wrapper.name
PBSEngineSet.__init__(self,template_file, queue, **kwargs)
def _make_bsub_wrapper(self):
bsub_wrapper = tempfile.NamedTemporaryFile()
bsub_wrapper.write(self.bsub_wrapper)
bsub_wrapper.file.close()
os.chmod(bsub_wrapper.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
return bsub_wrapper
Justin Riley
add experimental LSF support (needs testing)
r2677
Satrajit Ghosh
ENH: ssh now allows sending all current environment args to remote ipengine
r3061 sshx_template_prefix="""#!/bin/sh
"""
Fernando Perez
Remove bash-ism and use standard posix redirect in sshx.sh script....
r3370 sshx_template_suffix=""""$@" > /dev/null 2>&1 &
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 echo $!
"""
engine_killer_template="""#!/bin/sh
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
"""
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Satrajit Ghosh
FIX: updated ssh environment copying to escape special characters for posix compatibility
r3063 def escape_strings(val):
val = val.replace('(','\(')
val = val.replace(')','\)')
if ' ' in val:
val = '"%s"'%val
return val
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 class SSHEngineSet(object):
Satrajit Ghosh
ENH: ssh now allows sending all current environment args to remote ipengine
r3061 sshx_template_prefix=sshx_template_prefix
sshx_template_suffix=sshx_template_suffix
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 engine_killer_template=engine_killer_template
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Satrajit Ghosh
ENH: ssh now allows sending all current environment args to remote ipengine
r3061 def __init__(self, engine_hosts, sshx=None, copyenvs=None, ipengine="ipengine"):
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 """Start a controller on localhost and engines using ssh.
The engine_hosts argument is a dict with hostnames as keys and
the number of engine (int) as values. sshx is the name of a local
file that will be used to run remote commands. This file is used
to setup the environment properly.
"""
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 self.temp_dir = tempfile.gettempdir()
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 if sshx is not None:
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 self.sshx = sshx
else:
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 # Write the sshx.sh file locally from our template.
self.sshx = os.path.join(
self.temp_dir,
'%s-main-sshx.sh' % os.environ['USER']
)
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 f = open(self.sshx, 'w')
Satrajit Ghosh
ENH: ssh now allows sending all current environment args to remote ipengine
r3061 f.writelines(self.sshx_template_prefix)
if copyenvs:
Satrajit Ghosh
FIX: updated ssh environment copying to escape special characters for posix compatibility
r3063 for key, val in sorted(os.environ.items()):
newval = escape_strings(val)
f.writelines('export %s=%s\n'%(key,newval))
Satrajit Ghosh
ENH: ssh now allows sending all current environment args to remote ipengine
r3061 f.writelines(self.sshx_template_suffix)
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 f.close()
self.engine_command = ipengine
self.engine_hosts = engine_hosts
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 # Write the engine killer script file locally from our template.
self.engine_killer = os.path.join(
self.temp_dir,
'%s-local-engine_killer.sh' % os.environ['USER']
)
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 f = open(self.engine_killer, 'w')
f.writelines(self.engine_killer_template)
f.close()
def start(self, send_furl=False):
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 dlist = []
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 for host in self.engine_hosts.keys():
count = self.engine_hosts[host]
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 d = self._start(host, count, send_furl)
dlist.append(d)
return gatherBoth(dlist, consumeErrors=True)
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 def _start(self, hostname, count=1, send_furl=False):
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 if send_furl:
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 d = self._scp_furl(hostname)
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 else:
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 d = defer.succeed(None)
d.addCallback(lambda r: self._scp_sshx(hostname))
d.addCallback(lambda r: self._ssh_engine(hostname, count))
return d
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 def _scp_furl(self, hostname):
scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
cmd_list = scp_cmd.split()
cmd_list[1] = os.path.expanduser(cmd_list[1])
log.msg('Copying furl file: %s' % scp_cmd)
d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
return d
def _scp_sshx(self, hostname):
scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
self.sshx, hostname,
self.temp_dir, os.environ['USER']
)
print
log.msg("Copying sshx: %s" % scp_cmd)
sshx_scp = scp_cmd.split()
d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
return d
def _ssh_engine(self, hostname, count):
exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
hostname, self.temp_dir,
os.environ['USER'], self.engine_command
)
cmds = exec_engine.split()
dlist = []
log.msg("about to start engines...")
for i in range(count):
log.msg('Starting engines: %s' % exec_engine)
d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
dlist.append(d)
return gatherBoth(dlist, consumeErrors=True)
def kill(self):
dlist = []
for host in self.engine_hosts.keys():
d = self._killall(host)
dlist.append(d)
return gatherBoth(dlist, consumeErrors=True)
def _killall(self, hostname):
d = self._scp_engine_killer(hostname)
d.addCallback(lambda r: self._ssh_kill(hostname))
# d.addErrback(self._exec_err)
return d
def _scp_engine_killer(self, hostname):
scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
self.engine_killer,
hostname,
self.temp_dir,
os.environ['USER']
)
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 cmds = scp_cmd.split()
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 log.msg('Copying engine_killer: %s' % scp_cmd)
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 return d
def _ssh_kill(self, hostname):
kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
hostname,
self.temp_dir,
os.environ['USER']
)
log.msg('Killing engine: %s' % kill_cmd)
kill_cmd = kill_cmd.split()
d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
return d
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 def _exec_err(self, r):
log.msg(r)
Brian Granger
Initial version of working refactored ipcluster....
r1770
Brian Granger
Final few cleanup changes to the PBS cluster in ipcluster.
r1774 #-----------------------------------------------------------------------------
# Main functions for the different types of clusters
#-----------------------------------------------------------------------------
# TODO:
# The logic in these codes should be moved into classes like LocalCluster
# MpirunCluster, PBSCluster, etc. This would remove alot of the duplications.
# The main functions should then just parse the command line arguments, create
# the appropriate class and call a 'start' method.
Brian Granger
Fix for ipcluster bug: https://bugs.launchpad.net/bugs/358202...
r1944
Administrator
Fixing numerous bugs in ipcluster on Win32....
r1815 def check_security(args, cont_args):
Brian Granger
Addressing various review comments.
r1958 """Check to see if we should run with SSL support."""
Administrator
Fixing numerous bugs in ipcluster on Win32....
r1815 if (not args.x or not args.y) and not have_crypto:
log.err("""
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
reactor.stop()
return False
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 if args.x:
cont_args.append('-x')
if args.y:
cont_args.append('-y')
Administrator
Fixing numerous bugs in ipcluster on Win32....
r1815 return True
Brian Granger
Fix for ipcluster bug: https://bugs.launchpad.net/bugs/358202...
r1944
Brian Granger
The ipcluster command is now able to reuse FURL files....
r1883 def check_reuse(args, cont_args):
Brian Granger
Addressing various review comments.
r1958 """Check to see if we should try to resuse FURL files."""
Brian Granger
The ipcluster command is now able to reuse FURL files....
r1883 if args.r:
cont_args.append('-r')
if args.client_port == 0 or args.engine_port == 0:
log.err("""
To reuse FURL files, you must also set the client and engine ports using
the --client-port and --engine-port options.""")
reactor.stop()
return False
cont_args.append('--client-port=%i' % args.client_port)
cont_args.append('--engine-port=%i' % args.engine_port)
return True
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Fix for ipcluster bug: https://bugs.launchpad.net/bugs/358202...
r1944
def _err_and_stop(f):
Brian Granger
Addressing various review comments.
r1958 """Errback to log a failure and halt the reactor on a fatal error."""
Brian Granger
Fix for ipcluster bug: https://bugs.launchpad.net/bugs/358202...
r1944 log.err(f)
reactor.stop()
def _delay_start(cont_pid, start_engines, furl_file, reuse):
Brian Granger
Addressing various review comments.
r1958 """Wait for controller to create FURL files and the start the engines."""
Brian Granger
Fix for ipcluster bug: https://bugs.launchpad.net/bugs/358202...
r1944 if not reuse:
if os.path.isfile(furl_file):
os.unlink(furl_file)
log.msg('Waiting for controller to finish starting...')
d = wait_for_file(furl_file, delay=0.2, max_tries=50)
d.addCallback(lambda _: log.msg('Controller started'))
d.addCallback(lambda _: start_engines(cont_pid))
return d
Administrator
Fixing numerous bugs in ipcluster on Win32....
r1815 def main_local(args):
cont_args = []
cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
Brian Granger
The ipcluster command is now able to reuse FURL files....
r1883
Administrator
Fixing numerous bugs in ipcluster on Win32....
r1815 # Check security settings before proceeding
Brian Granger
Fixing small things in response to review.
r1826 if not check_security(args, cont_args):
return
Brian Granger
The ipcluster command is now able to reuse FURL files....
r1883
# See if we are reusing FURL files
if not check_reuse(args, cont_args):
return
Brian Granger
Initial version of working refactored ipcluster....
r1770 cl = ControllerLauncher(extra_args=cont_args)
dstart = cl.start()
def start_engines(cont_pid):
engine_args = []
engine_args.append('--logfile=%s' % \
pjoin(args.logdir,'ipengine%s-' % cont_pid))
eset = LocalEngineSet(extra_args=engine_args)
def shutdown(signum, frame):
log.msg('Stopping local cluster')
# We are still playing with the times here, but these seem
# to be reliable in allowing everything to exit cleanly.
eset.interrupt_then_kill(0.5)
cl.interrupt_then_kill(0.5)
reactor.callLater(1.0, reactor.stop)
signal.signal(signal.SIGINT,shutdown)
d = eset.start(args.n)
return d
Brian Granger
Fix for ipcluster bug: https://bugs.launchpad.net/bugs/358202...
r1944 config = kernel_config_manager.get_config_obj()
furl_file = config['controller']['engine_furl_file']
dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
dstart.addErrback(_err_and_stop)
Brian Granger
Initial version of working refactored ipcluster....
r1770
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Adding support for mpiexec as well as mpirun....
r1880 def main_mpi(args):
Brian Granger
Initial version of working refactored ipcluster....
r1770 cont_args = []
cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
Brian Granger
The ipcluster command is now able to reuse FURL files....
r1883
Administrator
Fixing numerous bugs in ipcluster on Win32....
r1815 # Check security settings before proceeding
Brian Granger
Fixing small things in response to review.
r1826 if not check_security(args, cont_args):
return
Brian Granger
The ipcluster command is now able to reuse FURL files....
r1883
# See if we are reusing FURL files
if not check_reuse(args, cont_args):
return
Brian Granger
Initial version of working refactored ipcluster....
r1770 cl = ControllerLauncher(extra_args=cont_args)
dstart = cl.start()
def start_engines(cont_pid):
Brian Granger
Adding support for mpiexec as well as mpirun....
r1880 raw_args = [args.cmd]
Brian Granger
Initial version of working refactored ipcluster....
r1770 raw_args.extend(['-n',str(args.n)])
raw_args.append('ipengine')
raw_args.append('-l')
raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
Brian Granger
Update of docs to reflect the new ipcluster version....
r1788 if args.mpi:
raw_args.append('--mpi=%s' % args.mpi)
Brian Granger
Initial version of working refactored ipcluster....
r1770 eset = ProcessLauncher(raw_args)
def shutdown(signum, frame):
log.msg('Stopping local cluster')
# We are still playing with the times here, but these seem
# to be reliable in allowing everything to exit cleanly.
eset.interrupt_then_kill(1.0)
cl.interrupt_then_kill(1.0)
reactor.callLater(2.0, reactor.stop)
signal.signal(signal.SIGINT,shutdown)
d = eset.start()
return d
Brian Granger
Fix for ipcluster bug: https://bugs.launchpad.net/bugs/358202...
r1944 config = kernel_config_manager.get_config_obj()
furl_file = config['controller']['engine_furl_file']
dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
dstart.addErrback(_err_and_stop)
Brian Granger
Initial version of working refactored ipcluster....
r1770
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Initial version of working refactored ipcluster....
r1770 def main_pbs(args):
Brian Granger
Initial version that works with PBS.
r1772 cont_args = []
cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
Brian Granger
The ipcluster command is now able to reuse FURL files....
r1883
Administrator
Fixing numerous bugs in ipcluster on Win32....
r1815 # Check security settings before proceeding
Brian Granger
Fixing small things in response to review.
r1826 if not check_security(args, cont_args):
return
Brian Granger
The ipcluster command is now able to reuse FURL files....
r1883
# See if we are reusing FURL files
if not check_reuse(args, cont_args):
return
Justin Riley
add main_lsf for experimental LSF support
r2678
if args.pbsscript and not os.path.isfile(args.pbsscript):
log.err('PBS script does not exist: %s' % args.pbsscript)
return
Brian Granger
The ipcluster command is now able to reuse FURL files....
r1883
Brian Granger
Initial version that works with PBS.
r1772 cl = ControllerLauncher(extra_args=cont_args)
Brian Granger
Initial version of working refactored ipcluster....
r1770 dstart = cl.start()
def start_engines(r):
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 pbs_set = PBSEngineSet(args.pbsscript, args.pbsqueue)
Brian Granger
More changes to the PBS batch cluster.
r1773 def shutdown(signum, frame):
Justin Riley
add main_lsf for experimental LSF support
r2678 log.msg('Stopping PBS cluster')
Brian Granger
More changes to the PBS batch cluster.
r1773 d = pbs_set.kill()
d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
signal.signal(signal.SIGINT,shutdown)
Brian Granger
Initial version of working refactored ipcluster....
r1770 d = pbs_set.start(args.n)
return d
Brian Granger
Fix for ipcluster bug: https://bugs.launchpad.net/bugs/358202...
r1944 config = kernel_config_manager.get_config_obj()
furl_file = config['controller']['engine_furl_file']
dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
dstart.addErrback(_err_and_stop)
Brian Granger
Initial version of working refactored ipcluster....
r1770
Satrajit Ghosh
initial sge compatibility attemp
r2621 def main_sge(args):
cont_args = []
cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
# Check security settings before proceeding
if not check_security(args, cont_args):
return
# See if we are reusing FURL files
if not check_reuse(args, cont_args):
return
Justin Riley
update SGEEngineSet to use SGE job arrays...
r2675
if args.sgescript and not os.path.isfile(args.sgescript):
log.err('SGE script does not exist: %s' % args.sgescript)
return
Satrajit Ghosh
initial sge compatibility attemp
r2621
cl = ControllerLauncher(extra_args=cont_args)
dstart = cl.start()
def start_engines(r):
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 sge_set = SGEEngineSet(args.sgescript, args.sgequeue)
Satrajit Ghosh
initial sge compatibility attemp
r2621 def shutdown(signum, frame):
log.msg('Stopping sge cluster')
d = sge_set.kill()
d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
signal.signal(signal.SIGINT,shutdown)
d = sge_set.start(args.n)
return d
config = kernel_config_manager.get_config_obj()
furl_file = config['controller']['engine_furl_file']
dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
dstart.addErrback(_err_and_stop)
Justin Riley
add main_lsf for experimental LSF support
r2678 def main_lsf(args):
cont_args = []
cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
# Check security settings before proceeding
if not check_security(args, cont_args):
return
# See if we are reusing FURL files
if not check_reuse(args, cont_args):
return
if args.lsfscript and not os.path.isfile(args.lsfscript):
log.err('LSF script does not exist: %s' % args.lsfscript)
return
cl = ControllerLauncher(extra_args=cont_args)
dstart = cl.start()
def start_engines(r):
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 lsf_set = LSFEngineSet(args.lsfscript, args.lsfqueue)
Justin Riley
add main_lsf for experimental LSF support
r2678 def shutdown(signum, frame):
log.msg('Stopping LSF cluster')
d = lsf_set.kill()
d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
signal.signal(signal.SIGINT,shutdown)
d = lsf_set.start(args.n)
return d
config = kernel_config_manager.get_config_obj()
furl_file = config['controller']['engine_furl_file']
dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
dstart.addErrback(_err_and_stop)
Brian Granger
Initial version of working refactored ipcluster....
r1770
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 def main_ssh(args):
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 """Start a controller on localhost and engines using ssh.
Your clusterfile should look like::
send_furl = False # True, if you want
engines = {
'engine_host1' : engine_count,
'engine_host2' : engine_count2
}
"""
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 clusterfile = {}
execfile(args.clusterfile, clusterfile)
if not clusterfile.has_key('send_furl'):
clusterfile['send_furl'] = False
cont_args = []
cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832
# Check security settings before proceeding
if not check_security(args, cont_args):
return
Brian Granger
The ipcluster command is now able to reuse FURL files....
r1883 # See if we are reusing FURL files
if not check_reuse(args, cont_args):
return
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 cl = ControllerLauncher(extra_args=cont_args)
dstart = cl.start()
def start_engines(cont_pid):
Satrajit Ghosh
ENH: ssh now allows sending all current environment args to remote ipengine
r3061 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx,
copyenvs=args.copyenvs)
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 def shutdown(signum, frame):
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 d = ssh_set.kill()
cl.interrupt_then_kill(1.0)
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 reactor.callLater(2.0, reactor.stop)
signal.signal(signal.SIGINT,shutdown)
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 d = ssh_set.start(clusterfile['send_furl'])
return d
Brian Granger
Fix for ipcluster bug: https://bugs.launchpad.net/bugs/358202...
r1944 config = kernel_config_manager.get_config_obj()
furl_file = config['controller']['engine_furl_file']
dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
dstart.addErrback(_err_and_stop)
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830
Brian Granger
Initial version of working refactored ipcluster....
r1770 def get_args():
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 base_parser = argparse.ArgumentParser(add_help=False)
base_parser.add_argument(
Brian Granger
The ipcluster command is now able to reuse FURL files....
r1883 '-r',
action='store_true',
dest='r',
help='try to reuse FURL files. Use with --client-port and --engine-port'
)
base_parser.add_argument(
'--client-port',
type=int,
dest='client_port',
help='the port the controller will listen on for client connections',
default=0
)
base_parser.add_argument(
'--engine-port',
type=int,
dest='engine_port',
help='the port the controller will listen on for engine connections',
default=0
)
base_parser.add_argument(
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 '-x',
action='store_true',
dest='x',
help='turn off client security'
)
base_parser.add_argument(
'-y',
action='store_true',
dest='y',
help='turn off engine security'
)
base_parser.add_argument(
"--logdir",
type=str,
dest="logdir",
help="directory to put log files (default=$IPYTHONDIR/log)",
default=pjoin(get_ipython_dir(),'log')
)
base_parser.add_argument(
"-n",
"--num",
type=int,
dest="n",
default=2,
help="the number of engines to start"
)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 parser = argparse.ArgumentParser(
description='IPython cluster startup. This starts a controller and\
Brian Granger
Adding information about IPYTHONDIR to usage of ipcluster and friends.
r1959 engines using various approaches. Use the IPYTHONDIR environment\
variable to change your IPython directory from the default of\
.ipython or _ipython. The log and security subdirectories of your\
IPython directory will be used by this script for log files and\
security files.'
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 )
subparsers = parser.add_subparsers(
help='available cluster types. For help, do "ipcluster TYPE --help"')
Brian Granger
Added test in ipcluster.py to see if the platform is win32. If so,...
r1613
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 parser_local = subparsers.add_parser(
'local',
help='run a local cluster',
parents=[base_parser]
)
Brian Granger
Initial version of working refactored ipcluster....
r1770 parser_local.set_defaults(func=main_local)
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 parser_mpirun = subparsers.add_parser(
'mpirun',
Brian Granger
Adding support for mpiexec as well as mpirun....
r1880 help='run a cluster using mpirun (mpiexec also works)',
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 parents=[base_parser]
)
parser_mpirun.add_argument(
"--mpi",
type=str,
Brian Granger
Update of docs to reflect the new ipcluster version....
r1788 dest="mpi", # Don't put a default here to allow no MPI support
Brian Granger
The local mode of ipcluster now works on Win32.
r1771 help="how to call MPI_Init (default=mpi4py)"
)
Brian Granger
Adding support for mpiexec as well as mpirun....
r1880 parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')
parser_mpiexec = subparsers.add_parser(
'mpiexec',
help='run a cluster using mpiexec (mpirun also works)',
parents=[base_parser]
)
parser_mpiexec.add_argument(
"--mpi",
type=str,
dest="mpi", # Don't put a default here to allow no MPI support
help="how to call MPI_Init (default=mpi4py)"
)
parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679
Brian Granger
Initial version that works with PBS.
r1772 parser_pbs = subparsers.add_parser(
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 'pbs',
Brian Granger
Initial version that works with PBS.
r1772 help='run a pbs cluster',
parents=[base_parser]
)
parser_pbs.add_argument(
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 '-s',
Brian Granger
Initial version that works with PBS.
r1772 '--pbs-script',
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 type=str,
Brian Granger
Initial version that works with PBS.
r1772 dest='pbsscript',
help='PBS script template',
Justin Riley
update PBSEngineSet to use job arrays...
r2676 default=''
Brian Granger
Initial version that works with PBS.
r1772 )
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 parser_pbs.add_argument(
'-q',
'--queue',
type=str,
dest='pbsqueue',
help='PBS queue to use when starting the engines',
default=None,
)
Brian Granger
Initial version of working refactored ipcluster....
r1770 parser_pbs.set_defaults(func=main_pbs)
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679
Satrajit Ghosh
initial sge compatibility attemp
r2621 parser_sge = subparsers.add_parser(
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 'sge',
Satrajit Ghosh
firt pass to support sge
r2622 help='run an sge cluster',
Satrajit Ghosh
initial sge compatibility attemp
r2621 parents=[base_parser]
)
parser_sge.add_argument(
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 '-s',
Satrajit Ghosh
initial sge compatibility attemp
r2621 '--sge-script',
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 type=str,
Satrajit Ghosh
initial sge compatibility attemp
r2621 dest='sgescript',
help='SGE script template',
Justin Riley
update SGEEngineSet to use SGE job arrays...
r2675 default='' # SGEEngineSet will create one if not specified
Satrajit Ghosh
initial sge compatibility attemp
r2621 )
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 parser_sge.add_argument(
'-q',
'--queue',
type=str,
dest='sgequeue',
help='SGE queue to use when starting the engines',
default=None,
)
Satrajit Ghosh
initial sge compatibility attemp
r2621 parser_sge.set_defaults(func=main_sge)
Justin Riley
add experimental LSF support (needs testing)
r2677
parser_lsf = subparsers.add_parser(
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 'lsf',
Justin Riley
add experimental LSF support (needs testing)
r2677 help='run an lsf cluster',
parents=[base_parser]
)
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679
Justin Riley
add experimental LSF support (needs testing)
r2677 parser_lsf.add_argument(
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 '-s',
Justin Riley
add experimental LSF support (needs testing)
r2677 '--lsf-script',
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679 type=str,
Justin Riley
add experimental LSF support (needs testing)
r2677 dest='lsfscript',
help='LSF script template',
default='' # LSFEngineSet will create one if not specified
)
Justin Riley
add --queue to ipcluster's SGE/PBS/LSF subcmds
r2679
parser_lsf.add_argument(
'-q',
'--queue',
type=str,
dest='lsfqueue',
help='LSF queue to use when starting the engines',
default=None,
)
Justin Riley
add main_lsf for experimental LSF support
r2678 parser_lsf.set_defaults(func=main_lsf)
Satrajit Ghosh
initial sge compatibility attemp
r2621
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 parser_ssh = subparsers.add_parser(
'ssh',
help='run a cluster using ssh, should have ssh-keys setup',
parents=[base_parser]
)
parser_ssh.add_argument(
Satrajit Ghosh
ENH: ssh now allows sending all current environment args to remote ipengine
r3061 '-e',
'--copyenvs',
action='store_true',
dest='copyenvs',
help='Copy current shell environment to remote location',
default=False,
)
parser_ssh.add_argument(
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 '--clusterfile',
type=str,
dest='clusterfile',
help='python file describing the cluster',
default='clusterfile.py',
)
parser_ssh.add_argument(
'--sshx',
type=str,
dest='sshx',
Brian Granger
Merging in vvatsa's ssh mode for ipcluster with some changes....
r1832 help='sshx launcher helper'
Brian Granger
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
r1830 )
parser_ssh.set_defaults(func=main_ssh)
Brian Granger
Initial version of working refactored ipcluster....
r1770 args = parser.parse_args()
return args
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Initial version of working refactored ipcluster....
r1770 def main():
args = get_args()
reactor.callWhenRunning(args.func, args)
log.startLogging(sys.stdout)
reactor.run()
if __name__ == '__main__':
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 main()