diff --git a/IPython/kernel/scripts/ipcluster.py b/IPython/kernel/scripts/ipcluster.py index 34b2d61..d4e87d4 100644 --- a/IPython/kernel/scripts/ipcluster.py +++ b/IPython/kernel/scripts/ipcluster.py @@ -18,6 +18,7 @@ import os import re import sys import signal +import stat import tempfile pjoin = os.path.join @@ -234,6 +235,7 @@ class LocalEngineSet(object): def start(self, n): dlist = [] for i in range(n): + print "starting engine:", i el = EngineLauncher(extra_args=self.extra_args) d = el.start() self.launchers.append(el) @@ -270,22 +272,25 @@ class LocalEngineSet(object): dfinal.addCallback(self._handle_stop) return dfinal - class BatchEngineSet(object): - - # Subclasses must fill these in. See PBSEngineSet + + # Subclasses must fill these in. See PBSEngineSet/SGEEngineSet + name = '' submit_command = '' delete_command = '' job_id_regexp = '' - - def __init__(self, template_file, **kwargs): + job_array_regexp = '' + job_array_template = '' + queue_regexp = '' + queue_template = '' + default_template = '' + + def __init__(self, template_file, queue, **kwargs): self.template_file = template_file - self.context = {} - self.context.update(kwargs) - self.batch_file = self.template_file+'-run' - + self.queue = queue + def parse_job_id(self, output): - m = re.match(self.job_id_regexp, output) + m = re.search(self.job_id_regexp, output) if m is not None: job_id = m.group() else: @@ -293,46 +298,120 @@ class BatchEngineSet(object): self.job_id = job_id log.msg('Job started with job id: %r' % job_id) return job_id - - def write_batch_script(self, n): - self.context['n'] = n - template = open(self.template_file, 'r').read() - log.msg('Using template for batch script: %s' % self.template_file) - script_as_string = Itpl.itplns(template, self.context) - log.msg('Writing instantiated batch script: %s' % self.batch_file) - f = open(self.batch_file,'w') - f.write(script_as_string) - f.close() - + def handle_error(self, f): f.printTraceback() f.raiseException() - + def start(self, n): - self.write_batch_script(n) + log.msg("starting %d engines" % n) + self._temp_file = tempfile.NamedTemporaryFile() + os.chmod(self._temp_file.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) + if self.template_file: + log.msg("Using %s script %s" % (self.name, self.template_file)) + contents = open(self.template_file, 'r').read() + new_script = contents + regex = re.compile(self.job_array_regexp) + if not regex.search(contents): + log.msg("adding job array settings to %s script" % self.name) + new_script = self.job_array_template % n +'\n' + new_script + print self.queue_regexp + regex = re.compile(self.queue_regexp) + print regex.search(contents) + if self.queue and not regex.search(contents): + log.msg("adding queue settings to %s script" % self.name) + new_script = self.queue_template % self.queue + '\n' + new_script + if new_script != contents: + self._temp_file.write(new_script) + self.template_file = self._temp_file.name + else: + default_script = self.default_template % n + if self.queue: + default_script = self.queue_template % self.queue + \ + '\n' + default_script + log.msg("using default ipengine %s script: \n%s" % + (self.name, default_script)) + self._temp_file.file.write(default_script) + self.template_file = self._temp_file.name + self._temp_file.file.close() d = getProcessOutput(self.submit_command, - [self.batch_file],env=os.environ) + [self.template_file], + env=os.environ) d.addCallback(self.parse_job_id) d.addErrback(self.handle_error) return d - + def kill(self): d = getProcessOutput(self.delete_command, [self.job_id],env=os.environ) return d class PBSEngineSet(BatchEngineSet): - + + name = 'PBS' submit_command = 'qsub' delete_command = 'qdel' job_id_regexp = '\d+' - - def __init__(self, template_file, **kwargs): - BatchEngineSet.__init__(self, template_file, **kwargs) + job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+' + job_array_template = '#PBS -t 1-%d' + queue_regexp = '#PBS[ \t]+-q[ \t]+\w+' + queue_template = '#PBS -q %s' + default_template="""#!/bin/sh +#PBS -V +#PBS -t 1-%d +#PBS -N ipengine +eid=$(($PBS_ARRAYID - 1)) +ipengine --logfile=ipengine${eid}.log +""" + +class SGEEngineSet(PBSEngineSet): + + name = 'SGE' + job_array_regexp = '#\$[ \t]+-t[ \t]+\d+' + job_array_template = '#$ -t 1-%d' + queue_regexp = '#\$[ \t]+-q[ \t]+\w+' + queue_template = '#$ -q %s' + default_template="""#$ -V +#$ -S /bin/sh +#$ -t 1-%d +#$ -N ipengine +eid=$(($SGE_TASK_ID - 1)) +ipengine --logfile=ipengine${eid}.log +""" + +class LSFEngineSet(PBSEngineSet): + + name = 'LSF' + submit_command = 'bsub' + delete_command = 'bkill' + job_array_regexp = '#BSUB[ \t]-J+\w+\[\d+-\d+\]' + job_array_template = '#BSUB -J ipengine[1-%d]' + queue_regexp = '#BSUB[ \t]+-q[ \t]+\w+' + queue_template = '#BSUB -q %s' + default_template="""#!/bin/sh +#BSUB -J ipengine[1-%d] +eid=$(($LSB_JOBINDEX - 1)) +ipengine --logfile=ipengine${eid}.log +""" + bsub_wrapper="""#!/bin/sh +bsub < $1 +""" + + def __init__(self, template_file, queue, **kwargs): + self._bsub_wrapper = self._make_bsub_wrapper() + self.submit_command = self._bsub_wrapper.name + PBSEngineSet.__init__(self,template_file, queue, **kwargs) + def _make_bsub_wrapper(self): + bsub_wrapper = tempfile.NamedTemporaryFile() + bsub_wrapper.write(self.bsub_wrapper) + bsub_wrapper.file.close() + os.chmod(bsub_wrapper.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) + return bsub_wrapper -sshx_template="""#!/bin/sh -"$@" &> /dev/null & +sshx_template_prefix="""#!/bin/sh +""" +sshx_template_suffix=""""$@" &> /dev/null & echo $! """ @@ -340,11 +419,19 @@ engine_killer_template="""#!/bin/sh ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM """ +def escape_strings(val): + val = val.replace('(','\(') + val = val.replace(')','\)') + if ' ' in val: + val = '"%s"'%val + return val + class SSHEngineSet(object): - sshx_template=sshx_template + sshx_template_prefix=sshx_template_prefix + sshx_template_suffix=sshx_template_suffix engine_killer_template=engine_killer_template - def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"): + def __init__(self, engine_hosts, sshx=None, copyenvs=None, ipengine="ipengine"): """Start a controller on localhost and engines using ssh. The engine_hosts argument is a dict with hostnames as keys and @@ -363,7 +450,12 @@ class SSHEngineSet(object): '%s-main-sshx.sh' % os.environ['USER'] ) f = open(self.sshx, 'w') - f.writelines(self.sshx_template) + f.writelines(self.sshx_template_prefix) + if copyenvs: + for key, val in sorted(os.environ.items()): + newval = escape_strings(val) + f.writelines('export %s=%s\n'%(key,newval)) + f.writelines(self.sshx_template_suffix) f.close() self.engine_command = ipengine self.engine_hosts = engine_hosts @@ -609,13 +701,17 @@ def main_pbs(args): # See if we are reusing FURL files if not check_reuse(args, cont_args): return + + if args.pbsscript and not os.path.isfile(args.pbsscript): + log.err('PBS script does not exist: %s' % args.pbsscript) + return cl = ControllerLauncher(extra_args=cont_args) dstart = cl.start() def start_engines(r): - pbs_set = PBSEngineSet(args.pbsscript) + pbs_set = PBSEngineSet(args.pbsscript, args.pbsqueue) def shutdown(signum, frame): - log.msg('Stopping pbs cluster') + log.msg('Stopping PBS cluster') d = pbs_set.kill() d.addBoth(lambda _: cl.interrupt_then_kill(1.0)) d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop)) @@ -627,6 +723,72 @@ def main_pbs(args): dstart.addCallback(_delay_start, start_engines, furl_file, args.r) dstart.addErrback(_err_and_stop) +def main_sge(args): + cont_args = [] + cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) + + # Check security settings before proceeding + if not check_security(args, cont_args): + return + + # See if we are reusing FURL files + if not check_reuse(args, cont_args): + return + + if args.sgescript and not os.path.isfile(args.sgescript): + log.err('SGE script does not exist: %s' % args.sgescript) + return + + cl = ControllerLauncher(extra_args=cont_args) + dstart = cl.start() + def start_engines(r): + sge_set = SGEEngineSet(args.sgescript, args.sgequeue) + def shutdown(signum, frame): + log.msg('Stopping sge cluster') + d = sge_set.kill() + d.addBoth(lambda _: cl.interrupt_then_kill(1.0)) + d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop)) + signal.signal(signal.SIGINT,shutdown) + d = sge_set.start(args.n) + return d + config = kernel_config_manager.get_config_obj() + furl_file = config['controller']['engine_furl_file'] + dstart.addCallback(_delay_start, start_engines, furl_file, args.r) + dstart.addErrback(_err_and_stop) + +def main_lsf(args): + cont_args = [] + cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) + + # Check security settings before proceeding + if not check_security(args, cont_args): + return + + # See if we are reusing FURL files + if not check_reuse(args, cont_args): + return + + if args.lsfscript and not os.path.isfile(args.lsfscript): + log.err('LSF script does not exist: %s' % args.lsfscript) + return + + cl = ControllerLauncher(extra_args=cont_args) + dstart = cl.start() + def start_engines(r): + lsf_set = LSFEngineSet(args.lsfscript, args.lsfqueue) + def shutdown(signum, frame): + log.msg('Stopping LSF cluster') + d = lsf_set.kill() + d.addBoth(lambda _: cl.interrupt_then_kill(1.0)) + d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop)) + signal.signal(signal.SIGINT,shutdown) + d = lsf_set.start(args.n) + return d + config = kernel_config_manager.get_config_obj() + furl_file = config['controller']['engine_furl_file'] + dstart.addCallback(_delay_start, start_engines, furl_file, args.r) + dstart.addErrback(_err_and_stop) + def main_ssh(args): """Start a controller on localhost and engines using ssh. @@ -658,7 +820,8 @@ def main_ssh(args): cl = ControllerLauncher(extra_args=cont_args) dstart = cl.start() def start_engines(cont_pid): - ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx) + ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx, + copyenvs=args.copyenvs) def shutdown(signum, frame): d = ssh_set.kill() cl.interrupt_then_kill(1.0) @@ -765,20 +928,77 @@ def get_args(): help="how to call MPI_Init (default=mpi4py)" ) parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec') - + parser_pbs = subparsers.add_parser( - 'pbs', + 'pbs', help='run a pbs cluster', parents=[base_parser] ) parser_pbs.add_argument( + '-s', '--pbs-script', - type=str, + type=str, dest='pbsscript', help='PBS script template', - default='pbs.template' + default='' + ) + parser_pbs.add_argument( + '-q', + '--queue', + type=str, + dest='pbsqueue', + help='PBS queue to use when starting the engines', + default=None, ) parser_pbs.set_defaults(func=main_pbs) + + parser_sge = subparsers.add_parser( + 'sge', + help='run an sge cluster', + parents=[base_parser] + ) + parser_sge.add_argument( + '-s', + '--sge-script', + type=str, + dest='sgescript', + help='SGE script template', + default='' # SGEEngineSet will create one if not specified + ) + parser_sge.add_argument( + '-q', + '--queue', + type=str, + dest='sgequeue', + help='SGE queue to use when starting the engines', + default=None, + ) + parser_sge.set_defaults(func=main_sge) + + parser_lsf = subparsers.add_parser( + 'lsf', + help='run an lsf cluster', + parents=[base_parser] + ) + + parser_lsf.add_argument( + '-s', + '--lsf-script', + type=str, + dest='lsfscript', + help='LSF script template', + default='' # LSFEngineSet will create one if not specified + ) + + parser_lsf.add_argument( + '-q', + '--queue', + type=str, + dest='lsfqueue', + help='LSF queue to use when starting the engines', + default=None, + ) + parser_lsf.set_defaults(func=main_lsf) parser_ssh = subparsers.add_parser( 'ssh', @@ -786,6 +1006,14 @@ def get_args(): parents=[base_parser] ) parser_ssh.add_argument( + '-e', + '--copyenvs', + action='store_true', + dest='copyenvs', + help='Copy current shell environment to remote location', + default=False, + ) + parser_ssh.add_argument( '--clusterfile', type=str, dest='clusterfile', diff --git a/docs/source/parallel/parallel_process.txt b/docs/source/parallel/parallel_process.txt index c91d5d6..ae10352 100644 --- a/docs/source/parallel/parallel_process.txt +++ b/docs/source/parallel/parallel_process.txt @@ -53,7 +53,9 @@ The :command:`ipcluster` command provides a simple way of starting a controller 2. When engines are started using the :command:`mpirun` command that comes with most MPI [MPI]_ implementations 3. When engines are started using the PBS [PBS]_ batch system. -4. When the controller is started on localhost and the engines are started on +4. When engines are started using the SGE [SGE]_ batch system. +5. When engines are started using the LSF [LSF]_ batch system. +6. When the controller is started on localhost and the engines are started on remote nodes using :command:`ssh`. .. note:: @@ -126,49 +128,115 @@ More details on using MPI with IPython can be found :ref:`here `. Using :command:`ipcluster` in PBS mode -------------------------------------- -The PBS mode uses the Portable Batch System [PBS]_ to start the engines. To use this mode, you first need to create a PBS script template that will be used to start the engines. Here is a sample PBS script template: +The PBS mode uses the Portable Batch System [PBS]_ to start the engines. -.. sourcecode:: bash - - #PBS -N ipython - #PBS -j oe - #PBS -l walltime=00:10:00 - #PBS -l nodes=${n/4}:ppn=4 - #PBS -q parallel +To start an ipcluster using the Portable Batch System:: - cd $$PBS_O_WORKDIR - export PATH=$$HOME/usr/local/bin - export PYTHONPATH=$$HOME/usr/local/lib/python2.4/site-packages - /usr/local/bin/mpiexec -n ${n} ipengine --logfile=$$PBS_O_WORKDIR/ipengine + $ ipcluster pbs -n 12 -There are a few important points about this template: +The above command will launch a PBS job array with 12 tasks using the default queue. If you would like to submit the job to a different queue use the -q option: -1. This template will be rendered at runtime using IPython's :mod:`Itpl` - template engine. + $ ipcluster pbs -n 12 -q hpcqueue -2. Instead of putting in the actual number of engines, use the notation - ``${n}`` to indicate the number of engines to be started. You can also uses - expressions like ``${n/4}`` in the template to indicate the number of - nodes. +By default, ipcluster will generate and submit a job script to launch the engines. However, if you need to use your own job script use the -s option: -3. Because ``$`` is a special character used by the template engine, you must - escape any ``$`` by using ``$$``. This is important when referring to - environment variables in the template. + $ ipcluster pbs -n 12 -q hpcqueue -s mypbscript.sh -4. Any options to :command:`ipengine` should be given in the batch script - template. +For example the default autogenerated script looks like:: -5. Depending on the configuration of you system, you may have to set - environment variables in the script template. + #PBS -q hpcqueue + #!/bin/sh + #PBS -V + #PBS -t 1-12 + #PBS -N ipengine + eid=$(($PBS_ARRAYID - 1)) + ipengine --logfile=ipengine${eid}.log -Once you have created such a script, save it with a name like :file:`pbs.template`. Now you are ready to start your job:: +.. note:: - $ ipcluster pbs -n 128 --pbs-script=pbs.template + ipcluster relies on using PBS job arrays to start the + engines. If you specify your own job script without specifying the + job array settings ipcluster will automatically add the job array + settings (#PBS -t 1-N) to your script. Additional command line options for this mode can be found by doing:: $ ipcluster pbs -h +Using :command:`ipcluster` in SGE mode +-------------------------------------- + +The SGE mode uses the Sun Grid Engine [SGE]_ to start the engines. + +To start an ipcluster using Sun Grid Engine:: + + $ ipcluster sge -n 12 + +The above command will launch an SGE job array with 12 tasks using the default queue. If you would like to submit the job to a different queue use the -q option: + + $ ipcluster sge -n 12 -q hpcqueue + +By default, ipcluster will generate and submit a job script to launch the engines. However, if you need to use your own job script use the -s option: + + $ ipcluster sge -n 12 -q hpcqueue -s mysgescript.sh + +For example the default autogenerated script looks like:: + + #$ -q hpcqueue + #$ -V + #$ -S /bin/sh + #$ -t 1-12 + #$ -N ipengine + eid=$(($SGE_TASK_ID - 1)) + ipengine --logfile=ipengine${eid}.log #$ -V + +.. note:: + + ipcluster relies on using SGE job arrays to start the engines. If + you specify your own job script without specifying the job array + settings ipcluster will automatically add the job array settings (#$ -t + 1-N) to your script. + +Additional command line options for this mode can be found by doing:: + + $ ipcluster sge -h + +Using :command:`ipcluster` in LSF mode +-------------------------------------- + +The LSF mode uses the Load Sharing Facility [LSF]_ to start the engines. + +To start an ipcluster using the Load Sharing Facility:: + + $ ipcluster lsf -n 12 + +The above command will launch an LSF job array with 12 tasks using the default queue. If you would like to submit the job to a different queue use the -q option: + + $ ipcluster lsf -n 12 -q hpcqueue + +By default, ipcluster will generate and submit a job script to launch the engines. However, if you need to use your own job script use the -s option: + + $ ipcluster lsf -n 12 -q hpcqueue -s mylsfscript.sh + +For example the default autogenerated script looks like:: + + #BSUB -q hpcqueue + #!/bin/sh + #BSUB -J ipengine[1-12] + eid=$(($LSB_JOBINDEX - 1)) + ipengine --logfile=ipengine${eid}.log + +.. note:: + + ipcluster relies on using LSF job arrays to start the engines. If you + specify your own job script without specifying the job array settings + ipcluster will automatically add the job array settings (#BSUB -J + ipengine[1-N]) to your script. + +Additional command line options for this mode can be found by doing:: + + $ ipcluster lsf -h + Using :command:`ipcluster` in SSH mode -------------------------------------- @@ -348,4 +416,6 @@ the log files to us will often help us to debug any problems. .. [PBS] Portable Batch System. http://www.openpbs.org/ +.. [SGE] Sun Grid Engine. http://www.sun.com/software/sge/ +.. [LSF] Load Sharing Facility. http://www.platform.com/ .. [SSH] SSH-Agent http://en.wikipedia.org/wiki/Ssh-agent