diff --git a/IPython/kernel/scripts/ipcluster.py b/IPython/kernel/scripts/ipcluster.py index 4d8442a..476267f 100644 --- a/IPython/kernel/scripts/ipcluster.py +++ b/IPython/kernel/scripts/ipcluster.py @@ -271,22 +271,22 @@ class LocalEngineSet(object): dfinal.addCallback(self._handle_stop) return dfinal - class BatchEngineSet(object): - - # Subclasses must fill these in. See PBSEngineSet + + # Subclasses must fill these in. See PBSEngineSet/SGEEngineSet + name = '' submit_command = '' delete_command = '' + script_param_prefix = '' job_id_regexp = '' - + job_array_regexp = '' + default_template = '' + def __init__(self, template_file, **kwargs): self.template_file = template_file - self.context = {} - self.context.update(kwargs) - self.batch_file = self.template_file+'-run' - + def parse_job_id(self, output): - m = re.match(self.job_id_regexp, output) + m = re.search(self.job_id_regexp, output) if m is not None: job_id = m.group() else: @@ -294,75 +294,27 @@ class BatchEngineSet(object): self.job_id = job_id log.msg('Job started with job id: %r' % job_id) return job_id - - def write_batch_script(self, n): - self.context['n'] = n - template = open(self.template_file, 'r').read() - log.msg('Using template for batch script: %s' % self.template_file) - script_as_string = Itpl.itplns(template, self.context) - log.msg('Writing instantiated batch script: %s' % self.batch_file) - f = open(self.batch_file,'w') - f.write(script_as_string) - f.close() - + def handle_error(self, f): f.printTraceback() f.raiseException() - - def start(self, n): - self.write_batch_script(n) - d = getProcessOutput(self.submit_command, - [self.batch_file],env=os.environ) - d.addCallback(self.parse_job_id) - d.addErrback(self.handle_error) - return d - - def kill(self): - d = getProcessOutput(self.delete_command, - [self.job_id],env=os.environ) - return d - -class PBSEngineSet(BatchEngineSet): - - submit_command = 'qsub' - delete_command = 'qdel' - job_id_regexp = '\d+' - - def __init__(self, template_file, **kwargs): - BatchEngineSet.__init__(self, template_file, **kwargs) - -class SGEEngineSet(PBSEngineSet): - - def __init__(self, template_file, **kwargs): - BatchEngineSet.__init__(self, template_file, **kwargs) - self._temp_file = None - - def parse_job_id(self, output): - m = re.search(self.job_id_regexp, output) - if m is not None: - job_id = m.group() - else: - raise Exception("job id couldn't be determined: %s" % output) - self.job_id = job_id - log.msg('job started with job id: %r' % job_id) - return job_id def start(self, n): log.msg("starting %d engines" % n) self._temp_file = tempfile.NamedTemporaryFile() - regex = re.compile('#\$[ \t]+-t[ \t]+\d+') + regex = re.compile(self.job_array_regexp) if self.template_file: - log.msg("Using sge script %s" % self.template_file) + log.msg("Using %s script %s" % (self.name, self.template_file)) contents = open(self.template_file, 'r').read() if not regex.search(contents): - log.msg("adding job array settings to sge script") - contents = ("#$ -t 1-%d\n" % n) + contents + log.msg("adding job array settings to %s script" % self.name) + contents = ("%s -t 1-%d\n" % (self.script_param_prefix,n)) + contents self._temp_file.write(contents) self.template_file = self._temp_file.name else: - log.msg("using default ipengine sge script: \n%s" % - (sge_template % n)) - self._temp_file.file.write(sge_template % n) + log.msg("using default ipengine %s script: \n%s" % + (self.name, (self.default_template % n))) + self._temp_file.file.write(self.default_template % n) self.template_file = self._temp_file.name self._temp_file.file.flush() d = getProcessOutput(self.submit_command, @@ -372,7 +324,32 @@ class SGEEngineSet(PBSEngineSet): d.addErrback(self.handle_error) return d -sge_template="""#$ -V + def kill(self): + d = getProcessOutput(self.delete_command, + [self.job_id],env=os.environ) + return d + +class PBSEngineSet(BatchEngineSet): + + name = 'PBS' + submit_command = 'qsub' + delete_command = 'qdel' + script_param_prefix = "#PBS" + job_id_regexp = '\d+' + job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+' + default_template="""#PBS -V +#PBS -t 1-%d +#PBS -N ipengine +eid=$(($PBS_ARRAYID - 1)) +ipengine --logfile=ipengine${eid}.log +""" + +class SGEEngineSet(PBSEngineSet): + + name = 'SGE' + script_param_prefix = "#$" + job_array_regexp = '#\$[ \t]+-t[ \t]+\d+' + default_template="""#$ -V #$ -t 1-%d #$ -N ipengine eid=$(($SGE_TASK_ID - 1)) @@ -857,7 +834,7 @@ def get_args(): type=str, dest='pbsscript', help='PBS script template', - default='pbs.template' + default='' ) parser_pbs.set_defaults(func=main_pbs)