From 225cd966aafbe718b9fe21d437d5e5e6dd59cff3 2010-08-01 21:25:30 From: Justin Riley Date: 2010-08-01 21:25:30 Subject: [PATCH] update SGEEngineSet to use SGE job arrays SGE job arrays allow one job id to be associated with a set of processes on an SGE cluster. Modified SGEEngineSet to be a subclass of PBSEngineSet. ipcluster will now generate a default SGE job script if --sge-script is not provided; most folks should ignore the --sge-script option unless they know they need it. If --sge-script is passed, check that the script exists and that the user has defined a "#$ -t" setting within the script. If not, add the setting for them by copying the script to a temp file and launching the job array using the modified temp file. ipengines now terminate cleanly when the ipcluster command exits. I think we still need to handle FURL files when engines are assigned to different hosts via SGE. Without using ssh or NFS, the only other way is to put the contents of the FURL file in the job script before submission, but this is less secure. Need to discuss this.
--- diff --git a/IPython/kernel/scripts/ipcluster.py b/IPython/kernel/scripts/ipcluster.py index 06d79d7..4d8442a 100644 --- a/IPython/kernel/scripts/ipcluster.py +++ b/IPython/kernel/scripts/ipcluster.py @@ -331,15 +331,11 @@ class PBSEngineSet(BatchEngineSet): def __init__(self, template_file, **kwargs): BatchEngineSet.__init__(self, template_file, **kwargs) -class SGEEngineSet(BatchEngineSet): - - submit_command = 'qsub' - delete_command = 'qdel' - job_id_regexp = '\d+' - +class SGEEngineSet(PBSEngineSet): + def __init__(self, template_file, **kwargs): BatchEngineSet.__init__(self, template_file, **kwargs) - self.num_engines = None + self._temp_file = None def parse_job_id(self, output): m = re.search(self.job_id_regexp, output) @@ -347,48 +343,42 @@ class SGEEngineSet(BatchEngineSet): job_id = m.group() else: raise Exception("job id couldn't be determined: %s" % output) - self.job_id.append(job_id) - log.msg('Job started with job id: %r' % job_id) + self.job_id = job_id + log.msg('job started with job id: %r' % job_id) return job_id - - def kill_job(self, output): - log.msg(output) - return output - - def write_batch_script(self, i): - context = {'eid':i} - template = open(self.template_file, 'r').read() - log.msg('Using template for batch script: %s' % self.template_file) - script_as_string = Itpl.itplns(template, context) - log.msg('Writing instantiated batch script: %s' % self.batch_file+str(i)) - f = open(self.batch_file+str(i),'w') - f.write(script_as_string) - f.close() - + def start(self, n): - dlist = [] - self.num_engines = 0 - self.job_id = [] - for i in range(n): - log.msg("starting engine: %d"%i) - self.write_batch_script(i) - d = getProcessOutput(self.submit_command, - [self.batch_file+str(i)],env=os.environ) - d.addCallback(self.parse_job_id) - d.addErrback(self.handle_error) - dlist.append(d) - return gatherBoth(dlist, consumeErrors=True) - - def kill(self): - dlist = [] - for i in range(self.num_engines): - log.msg("killing job id: 
%d"%self.job_id[i]) - d = getProcessOutput(self.delete_command, - [self.job_id[i]],env=os.environ) - d.addCallback(self.kill_job) - dlist.append(d) - return gatherBoth(dlist, consumeErrors=True) - + log.msg("starting %d engines" % n) + self._temp_file = tempfile.NamedTemporaryFile() + regex = re.compile('#\$[ \t]+-t[ \t]+\d+') + if self.template_file: + log.msg("Using sge script %s" % self.template_file) + contents = open(self.template_file, 'r').read() + if not regex.search(contents): + log.msg("adding job array settings to sge script") + contents = ("#$ -t 1-%d\n" % n) + contents + self._temp_file.write(contents) + self.template_file = self._temp_file.name + else: + log.msg("using default ipengine sge script: \n%s" % + (sge_template % n)) + self._temp_file.file.write(sge_template % n) + self.template_file = self._temp_file.name + self._temp_file.file.flush() + d = getProcessOutput(self.submit_command, + [self.template_file], + env=os.environ) + d.addCallback(self.parse_job_id) + d.addErrback(self.handle_error) + return d + +sge_template="""#$ -V +#$ -t 1-%d +#$ -N ipengine +eid=$(($SGE_TASK_ID - 1)) +ipengine --logfile=ipengine${eid}.log +""" + sshx_template="""#!/bin/sh "$@" &> /dev/null & echo $! @@ -696,6 +686,10 @@ def main_sge(args): # See if we are reusing FURL files if not check_reuse(args, cont_args): return + + if args.sgescript and not os.path.isfile(args.sgescript): + log.err('SGE script does not exist: %s' % args.sgescript) + return cl = ControllerLauncher(extra_args=cont_args) dstart = cl.start() @@ -877,7 +871,7 @@ def get_args(): type=str, dest='sgescript', help='SGE script template', - default='template.sge' + default='' # SGEEngineSet will create one if not specified ) parser_sge.set_defaults(func=main_sge)