diff --git a/IPython/kernel/scripts/ipcluster.py b/IPython/kernel/scripts/ipcluster.py index 15618be..41242cb 100644 --- a/IPython/kernel/scripts/ipcluster.py +++ b/IPython/kernel/scripts/ipcluster.py @@ -277,13 +277,16 @@ class BatchEngineSet(object): name = '' submit_command = '' delete_command = '' - script_param_prefix = '' job_id_regexp = '' job_array_regexp = '' + job_array_template = '' + queue_regexp = '' + queue_template = '' default_template = '' - def __init__(self, template_file, **kwargs): + def __init__(self, template_file, queue, **kwargs): self.template_file = template_file + self.queue = queue def parse_job_id(self, output): m = re.search(self.job_id_regexp, output) @@ -302,19 +305,31 @@ class BatchEngineSet(object): def start(self, n): log.msg("starting %d engines" % n) self._temp_file = tempfile.NamedTemporaryFile() - regex = re.compile(self.job_array_regexp) if self.template_file: log.msg("Using %s script %s" % (self.name, self.template_file)) contents = open(self.template_file, 'r').read() + new_script = contents + regex = re.compile(self.job_array_regexp) if not regex.search(contents): log.msg("adding job array settings to %s script" % self.name) - contents = ("%s -t 1-%d\n" % (self.script_param_prefix,n)) + contents - self._temp_file.write(contents) + new_script = self.job_array_template % n +'\n' + new_script + print self.queue_regexp + regex = re.compile(self.queue_regexp) + print regex.search(contents) + if self.queue and not regex.search(contents): + log.msg("adding queue settings to %s script" % self.name) + new_script = self.queue_template % self.queue + '\n' + new_script + if new_script != contents: + self._temp_file.write(new_script) self.template_file = self._temp_file.name else: + default_script = self.default_template % n + if self.queue: + default_script = self.queue_template % self.queue + \ + '\n' + default_script log.msg("using default ipengine %s script: \n%s" % - (self.name, (self.default_template % n))) - self._temp_file.file.write(self.default_template % n) + (self.name, default_script)) + self._temp_file.file.write(default_script) self.template_file = self._temp_file.name self._temp_file.file.flush() d = getProcessOutput(self.submit_command, @@ -334,9 +349,11 @@ class PBSEngineSet(BatchEngineSet): name = 'PBS' submit_command = 'qsub' delete_command = 'qdel' - script_param_prefix = "#PBS" job_id_regexp = '\d+' job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+' + job_array_template = '#PBS -t 1-%d' + queue_regexp = '#PBS[ \t]+-q[ \t]+\w+' + queue_template = '#PBS -q %s' default_template="""#PBS -V #PBS -t 1-%d #PBS -N ipengine @@ -347,8 +364,10 @@ ipengine --logfile=ipengine${eid}.log class SGEEngineSet(PBSEngineSet): name = 'SGE' - script_param_prefix = "#$" job_array_regexp = '#\$[ \t]+-t[ \t]+\d+' + job_array_template = '#$ -t 1-%d' + queue_regexp = '#\$[ \t]+-q[ \t]+\w+' + queue_template = '#$ -q %s' default_template="""#$ -V #$ -t 1-%d #$ -N ipengine @@ -361,9 +380,11 @@ class LSFEngineSet(PBSEngineSet): name = 'LSF' submit_command = 'bsub' delete_command = 'bkill' - script_param_prefix = "#BSUB" - job_array_regexp = '#BSUB[ \t]+\w+\[\d+-\d+\]' - default_template="""#BSUB ipengine[1-%d] + job_array_regexp = '#BSUB[ \t]-J+\w+\[\d+-\d+\]' + job_array_template = '#BSUB -J ipengine[1-%d]' + queue_regexp = '#BSUB[ \t]+-q[ \t]+\w+' + queue_template = '#BSUB -q %s' + default_template="""#BSUB -J ipengine[1-%d] eid=$(($LSB_JOBINDEX - 1)) ipengine --logfile=ipengine${eid}.log """ @@ -654,7 +675,7 @@ def main_pbs(args): cl = ControllerLauncher(extra_args=cont_args) dstart = cl.start() def start_engines(r): - pbs_set = PBSEngineSet(args.pbsscript) + pbs_set = PBSEngineSet(args.pbsscript, args.pbsqueue) def shutdown(signum, frame): log.msg('Stopping PBS cluster') d = pbs_set.kill() @@ -687,7 +708,7 @@ def main_sge(args): cl = ControllerLauncher(extra_args=cont_args) dstart = cl.start() def start_engines(r): - sge_set = SGEEngineSet(args.sgescript) + sge_set = SGEEngineSet(args.sgescript, args.sgequeue) def shutdown(signum, frame): log.msg('Stopping sge cluster') d = sge_set.kill() @@ -720,7 +741,7 @@ def main_lsf(args): cl = ControllerLauncher(extra_args=cont_args) dstart = cl.start() def start_engines(r): - lsf_set = LSFEngineSet(args.lsfscript) + lsf_set = LSFEngineSet(args.lsfscript, args.lsfqueue) def shutdown(signum, frame): log.msg('Stopping LSF cluster') d = lsf_set.kill() @@ -872,47 +893,76 @@ def get_args(): help="how to call MPI_Init (default=mpi4py)" ) parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec') - + parser_pbs = subparsers.add_parser( - 'pbs', + 'pbs', help='run a pbs cluster', parents=[base_parser] ) parser_pbs.add_argument( + '-s', '--pbs-script', - type=str, + type=str, dest='pbsscript', help='PBS script template', default='' ) + parser_pbs.add_argument( + '-q', + '--queue', + type=str, + dest='pbsqueue', + help='PBS queue to use when starting the engines', + default=None, + ) parser_pbs.set_defaults(func=main_pbs) - + parser_sge = subparsers.add_parser( - 'sge', + 'sge', help='run an sge cluster', parents=[base_parser] ) parser_sge.add_argument( + '-s', '--sge-script', - type=str, + type=str, dest='sgescript', help='SGE script template', default='' # SGEEngineSet will create one if not specified ) + parser_sge.add_argument( + '-q', + '--queue', + type=str, + dest='sgequeue', + help='SGE queue to use when starting the engines', + default=None, + ) parser_sge.set_defaults(func=main_sge) parser_lsf = subparsers.add_parser( - 'lsf', + 'lsf', help='run an lsf cluster', parents=[base_parser] ) + parser_lsf.add_argument( + '-s', '--lsf-script', - type=str, + type=str, dest='lsfscript', help='LSF script template', default='' # LSFEngineSet will create one if not specified ) + + parser_lsf.add_argument( + '-q', + '--queue', + type=str, + dest='lsfqueue', + help='LSF queue to use when starting the engines', + default=None, + ) parser_lsf.set_defaults(func=main_lsf) parser_ssh = subparsers.add_parser(