diff --git a/IPython/kernel/scripts/ipcluster.py b/IPython/kernel/scripts/ipcluster.py
index 06d79d7..4d8442a 100644
--- a/IPython/kernel/scripts/ipcluster.py
+++ b/IPython/kernel/scripts/ipcluster.py
@@ -331,15 +331,14 @@ class PBSEngineSet(BatchEngineSet):
     def __init__(self, template_file, **kwargs):
         BatchEngineSet.__init__(self, template_file, **kwargs)
 
-class SGEEngineSet(BatchEngineSet):
-
-    submit_command = 'qsub'
-    delete_command = 'qdel'
-    job_id_regexp = '\d+'
-
+class SGEEngineSet(PBSEngineSet):
+    """Engine set that submits all ipengines as one SGE job array."""
+
     def __init__(self, template_file, **kwargs):
-        BatchEngineSet.__init__(self, template_file, **kwargs)
-        self.num_engines = None
+        PBSEngineSet.__init__(self, template_file, **kwargs)
+        # Generated job script; a reference is kept on the instance so the
+        # temporary file stays open while the submit command reads it.
+        self._temp_file = None
 
     def parse_job_id(self, output):
         m = re.search(self.job_id_regexp, output)
@@ -347,48 +346,54 @@ class SGEEngineSet(BatchEngineSet):
             job_id = m.group()
         else:
             raise Exception("job id couldn't be determined: %s" % output)
-        self.job_id.append(job_id)
-        log.msg('Job started with job id: %r' % job_id)
+        self.job_id = job_id
+        log.msg('job started with job id: %r' % job_id)
         return job_id
-
-    def kill_job(self, output):
-        log.msg(output)
-        return output
-
-    def write_batch_script(self, i):
-        context = {'eid':i}
-        template = open(self.template_file, 'r').read()
-        log.msg('Using template for batch script: %s' % self.template_file)
-        script_as_string = Itpl.itplns(template, context)
-        log.msg('Writing instantiated batch script: %s' % self.batch_file+str(i))
-        f = open(self.batch_file+str(i),'w')
-        f.write(script_as_string)
-        f.close()
-
+
     def start(self, n):
-        dlist = []
-        self.num_engines = 0
-        self.job_id = []
-        for i in range(n):
-            log.msg("starting engine: %d"%i)
-            self.write_batch_script(i)
-            d = getProcessOutput(self.submit_command,
-                                 [self.batch_file+str(i)],env=os.environ)
-            d.addCallback(self.parse_job_id)
-            d.addErrback(self.handle_error)
-            dlist.append(d)
-        return gatherBoth(dlist, consumeErrors=True)
-
-    def kill(self):
-        dlist = []
-        for i in range(self.num_engines):
-            log.msg("killing job id: %d"%self.job_id[i])
-            d = getProcessOutput(self.delete_command,
-                                 [self.job_id[i]],env=os.environ)
-            d.addCallback(self.kill_job)
-            dlist.append(d)
-        return gatherBoth(dlist, consumeErrors=True)
-
+        """Submit n engines to SGE as a single job array.
+
+        A user-supplied template is copied to a temporary script, with
+        a "#$ -t 1-n" line prepended unless it already has a -t
+        setting; without a template, sge_template is used instead.
+
+        Returns the deferred for the submit command's output.
+        """
+        log.msg("starting %d engines" % n)
+        self._temp_file = tempfile.NamedTemporaryFile()
+        regex = re.compile(r'#\$[ \t]+-t[ \t]+\d+')
+        if self.template_file:
+            log.msg("Using sge script %s" % self.template_file)
+            f = open(self.template_file, 'r')
+            try:
+                contents = f.read()
+            finally:
+                f.close()
+            if not regex.search(contents):
+                log.msg("adding job array settings to sge script")
+                contents = ("#$ -t 1-%d\n" % n) + contents
+        else:
+            contents = sge_template % n
+            log.msg("using default ipengine sge script: \n%s" % contents)
+        self._temp_file.write(contents)
+        # Flush so the submit command sees the complete script on disk.
+        self._temp_file.flush()
+        self.template_file = self._temp_file.name
+        d = getProcessOutput(self.submit_command,
+                             [self.template_file],
+                             env=os.environ)
+        d.addCallback(self.parse_job_id)
+        d.addErrback(self.handle_error)
+        return d
+
+# Default SGE job-array script; %d is filled with the number of engines.
+sge_template="""#$ -V
+#$ -t 1-%d
+#$ -N ipengine
+eid=$(($SGE_TASK_ID - 1))
+ipengine --logfile=ipengine${eid}.log
+"""
+
 sshx_template="""#!/bin/sh
 "$@" &> /dev/null &
 echo $!
@@ -696,6 +701,10 @@ def main_sge(args):
     # See if we are reusing FURL files
     if not check_reuse(args, cont_args):
         return
+
+    if args.sgescript and not os.path.isfile(args.sgescript):
+        log.err('SGE script does not exist: %s' % args.sgescript)
+        return
 
     cl = ControllerLauncher(extra_args=cont_args)
     dstart = cl.start()
@@ -877,7 +886,7 @@ def get_args():
         type=str,
         dest='sgescript',
         help='SGE script template',
-        default='template.sge'
+        default=''  # SGEEngineSet will create one if not specified
     )
     parser_sge.set_defaults(func=main_sge)
 