##// END OF EJS Templates
Update SGEEngineSet to use SGE job arrays: submit a single qsub job with "#$ -t 1-N" instead of one job per engine.
Justin Riley -
Show More
@@ -331,15 +331,11 b' class PBSEngineSet(BatchEngineSet):'
331 def __init__(self, template_file, **kwargs):
331 def __init__(self, template_file, **kwargs):
332 BatchEngineSet.__init__(self, template_file, **kwargs)
332 BatchEngineSet.__init__(self, template_file, **kwargs)
333
333
334 class SGEEngineSet(BatchEngineSet):
334 class SGEEngineSet(PBSEngineSet):
335
335
336 submit_command = 'qsub'
337 delete_command = 'qdel'
338 job_id_regexp = '\d+'
339
340 def __init__(self, template_file, **kwargs):
336 def __init__(self, template_file, **kwargs):
341 BatchEngineSet.__init__(self, template_file, **kwargs)
337 BatchEngineSet.__init__(self, template_file, **kwargs)
342 self.num_engines = None
338 self._temp_file = None
343
339
344 def parse_job_id(self, output):
340 def parse_job_id(self, output):
345 m = re.search(self.job_id_regexp, output)
341 m = re.search(self.job_id_regexp, output)
@@ -347,48 +343,42 b' class SGEEngineSet(BatchEngineSet):'
347 job_id = m.group()
343 job_id = m.group()
348 else:
344 else:
349 raise Exception("job id couldn't be determined: %s" % output)
345 raise Exception("job id couldn't be determined: %s" % output)
350 self.job_id.append(job_id)
346 self.job_id = job_id
351 log.msg('Job started with job id: %r' % job_id)
347 log.msg('job started with job id: %r' % job_id)
352 return job_id
348 return job_id
353
349
354 def kill_job(self, output):
355 log.msg(output)
356 return output
357
358 def write_batch_script(self, i):
359 context = {'eid':i}
360 template = open(self.template_file, 'r').read()
361 log.msg('Using template for batch script: %s' % self.template_file)
362 script_as_string = Itpl.itplns(template, context)
363 log.msg('Writing instantiated batch script: %s' % self.batch_file+str(i))
364 f = open(self.batch_file+str(i),'w')
365 f.write(script_as_string)
366 f.close()
367
368 def start(self, n):
350 def start(self, n):
369 dlist = []
351 log.msg("starting %d engines" % n)
370 self.num_engines = 0
352 self._temp_file = tempfile.NamedTemporaryFile()
371 self.job_id = []
353 regex = re.compile('#\$[ \t]+-t[ \t]+\d+')
372 for i in range(n):
354 if self.template_file:
373 log.msg("starting engine: %d"%i)
355 log.msg("Using sge script %s" % self.template_file)
374 self.write_batch_script(i)
356 contents = open(self.template_file, 'r').read()
375 d = getProcessOutput(self.submit_command,
357 if not regex.search(contents):
376 [self.batch_file+str(i)],env=os.environ)
358 log.msg("adding job array settings to sge script")
377 d.addCallback(self.parse_job_id)
359 contents = ("#$ -t 1-%d\n" % n) + contents
378 d.addErrback(self.handle_error)
360 self._temp_file.write(contents)
379 dlist.append(d)
361 self.template_file = self._temp_file.name
380 return gatherBoth(dlist, consumeErrors=True)
362 else:
381
363 log.msg("using default ipengine sge script: \n%s" %
382 def kill(self):
364 (sge_template % n))
383 dlist = []
365 self._temp_file.file.write(sge_template % n)
384 for i in range(self.num_engines):
366 self.template_file = self._temp_file.name
385 log.msg("killing job id: %d"%self.job_id[i])
367 self._temp_file.file.flush()
386 d = getProcessOutput(self.delete_command,
368 d = getProcessOutput(self.submit_command,
387 [self.job_id[i]],env=os.environ)
369 [self.template_file],
388 d.addCallback(self.kill_job)
370 env=os.environ)
389 dlist.append(d)
371 d.addCallback(self.parse_job_id)
390 return gatherBoth(dlist, consumeErrors=True)
372 d.addErrback(self.handle_error)
391
373 return d
374
375 sge_template="""#$ -V
376 #$ -t 1-%d
377 #$ -N ipengine
378 eid=$(($SGE_TASK_ID - 1))
379 ipengine --logfile=ipengine${eid}.log
380 """
381
392 sshx_template="""#!/bin/sh
382 sshx_template="""#!/bin/sh
393 "$@" &> /dev/null &
383 "$@" &> /dev/null &
394 echo $!
384 echo $!
@@ -696,6 +686,10 b' def main_sge(args):'
696 # See if we are reusing FURL files
686 # See if we are reusing FURL files
697 if not check_reuse(args, cont_args):
687 if not check_reuse(args, cont_args):
698 return
688 return
689
690 if args.sgescript and not os.path.isfile(args.sgescript):
691 log.err('SGE script does not exist: %s' % args.sgescript)
692 return
699
693
700 cl = ControllerLauncher(extra_args=cont_args)
694 cl = ControllerLauncher(extra_args=cont_args)
701 dstart = cl.start()
695 dstart = cl.start()
@@ -877,7 +871,7 b' def get_args():'
877 type=str,
871 type=str,
878 dest='sgescript',
872 dest='sgescript',
879 help='SGE script template',
873 help='SGE script template',
880 default='template.sge'
874 default='' # SGEEngineSet will create one if not specified
881 )
875 )
882 parser_sge.set_defaults(func=main_sge)
876 parser_sge.set_defaults(func=main_sge)
883
877
General Comments 0
You need to be logged in to leave comments. Login now