Show More
@@ -331,15 +331,11 b' class PBSEngineSet(BatchEngineSet):' | |||||
331 | def __init__(self, template_file, **kwargs): |
|
331 | def __init__(self, template_file, **kwargs): | |
332 | BatchEngineSet.__init__(self, template_file, **kwargs) |
|
332 | BatchEngineSet.__init__(self, template_file, **kwargs) | |
333 |
|
333 | |||
334 |
class SGEEngineSet( |
|
334 | class SGEEngineSet(PBSEngineSet): | |
335 |
|
335 | |||
336 | submit_command = 'qsub' |
|
|||
337 | delete_command = 'qdel' |
|
|||
338 | job_id_regexp = '\d+' |
|
|||
339 |
|
||||
340 | def __init__(self, template_file, **kwargs): |
|
336 | def __init__(self, template_file, **kwargs): | |
341 | BatchEngineSet.__init__(self, template_file, **kwargs) |
|
337 | BatchEngineSet.__init__(self, template_file, **kwargs) | |
342 |
self. |
|
338 | self._temp_file = None | |
343 |
|
339 | |||
344 | def parse_job_id(self, output): |
|
340 | def parse_job_id(self, output): | |
345 | m = re.search(self.job_id_regexp, output) |
|
341 | m = re.search(self.job_id_regexp, output) | |
@@ -347,48 +343,42 b' class SGEEngineSet(BatchEngineSet):' | |||||
347 | job_id = m.group() |
|
343 | job_id = m.group() | |
348 | else: |
|
344 | else: | |
349 | raise Exception("job id couldn't be determined: %s" % output) |
|
345 | raise Exception("job id couldn't be determined: %s" % output) | |
350 |
self.job_id |
|
346 | self.job_id = job_id | |
351 |
log.msg(' |
|
347 | log.msg('job started with job id: %r' % job_id) | |
352 | return job_id |
|
348 | return job_id | |
353 |
|
349 | |||
354 | def kill_job(self, output): |
|
|||
355 | log.msg(output) |
|
|||
356 | return output |
|
|||
357 |
|
||||
358 | def write_batch_script(self, i): |
|
|||
359 | context = {'eid':i} |
|
|||
360 | template = open(self.template_file, 'r').read() |
|
|||
361 | log.msg('Using template for batch script: %s' % self.template_file) |
|
|||
362 | script_as_string = Itpl.itplns(template, context) |
|
|||
363 | log.msg('Writing instantiated batch script: %s' % self.batch_file+str(i)) |
|
|||
364 | f = open(self.batch_file+str(i),'w') |
|
|||
365 | f.write(script_as_string) |
|
|||
366 | f.close() |
|
|||
367 |
|
||||
368 | def start(self, n): |
|
350 | def start(self, n): | |
369 | dlist = [] |
|
351 | log.msg("starting %d engines" % n) | |
370 | self.num_engines = 0 |
|
352 | self._temp_file = tempfile.NamedTemporaryFile() | |
371 | self.job_id = [] |
|
353 | regex = re.compile('#\$[ \t]+-t[ \t]+\d+') | |
372 | for i in range(n): |
|
354 | if self.template_file: | |
373 |
log.msg(" |
|
355 | log.msg("Using sge script %s" % self.template_file) | |
374 | self.write_batch_script(i) |
|
356 | contents = open(self.template_file, 'r').read() | |
375 | d = getProcessOutput(self.submit_command, |
|
357 | if not regex.search(contents): | |
376 | [self.batch_file+str(i)],env=os.environ) |
|
358 | log.msg("adding job array settings to sge script") | |
377 | d.addCallback(self.parse_job_id) |
|
359 | contents = ("#$ -t 1-%d\n" % n) + contents | |
378 | d.addErrback(self.handle_error) |
|
360 | self._temp_file.write(contents) | |
379 | dlist.append(d) |
|
361 | self.template_file = self._temp_file.name | |
380 | return gatherBoth(dlist, consumeErrors=True) |
|
362 | else: | |
381 |
|
363 | log.msg("using default ipengine sge script: \n%s" % | ||
382 | def kill(self): |
|
364 | (sge_template % n)) | |
383 | dlist = [] |
|
365 | self._temp_file.file.write(sge_template % n) | |
384 | for i in range(self.num_engines): |
|
366 | self.template_file = self._temp_file.name | |
385 | log.msg("killing job id: %d"%self.job_id[i]) |
|
367 | self._temp_file.file.flush() | |
386 |
|
|
368 | d = getProcessOutput(self.submit_command, | |
387 |
|
|
369 | [self.template_file], | |
388 | d.addCallback(self.kill_job) |
|
370 | env=os.environ) | |
389 | dlist.append(d) |
|
371 | d.addCallback(self.parse_job_id) | |
390 | return gatherBoth(dlist, consumeErrors=True) |
|
372 | d.addErrback(self.handle_error) | |
391 |
|
373 | return d | ||
|
374 | ||||
|
375 | sge_template="""#$ -V | |||
|
376 | #$ -t 1-%d | |||
|
377 | #$ -N ipengine | |||
|
378 | eid=$(($SGE_TASK_ID - 1)) | |||
|
379 | ipengine --logfile=ipengine${eid}.log | |||
|
380 | """ | |||
|
381 | ||||
392 | sshx_template="""#!/bin/sh |
|
382 | sshx_template="""#!/bin/sh | |
393 | "$@" &> /dev/null & |
|
383 | "$@" &> /dev/null & | |
394 | echo $! |
|
384 | echo $! | |
@@ -696,6 +686,10 b' def main_sge(args):' | |||||
696 | # See if we are reusing FURL files |
|
686 | # See if we are reusing FURL files | |
697 | if not check_reuse(args, cont_args): |
|
687 | if not check_reuse(args, cont_args): | |
698 | return |
|
688 | return | |
|
689 | ||||
|
690 | if args.sgescript and not os.path.isfile(args.sgescript): | |||
|
691 | log.err('SGE script does not exist: %s' % args.sgescript) | |||
|
692 | return | |||
699 |
|
693 | |||
700 | cl = ControllerLauncher(extra_args=cont_args) |
|
694 | cl = ControllerLauncher(extra_args=cont_args) | |
701 | dstart = cl.start() |
|
695 | dstart = cl.start() | |
@@ -877,7 +871,7 b' def get_args():' | |||||
877 | type=str, |
|
871 | type=str, | |
878 | dest='sgescript', |
|
872 | dest='sgescript', | |
879 | help='SGE script template', |
|
873 | help='SGE script template', | |
880 | default='template.sge' |
|
874 | default='' # SGEEngineSet will create one if not specified | |
881 | ) |
|
875 | ) | |
882 | parser_sge.set_defaults(func=main_sge) |
|
876 | parser_sge.set_defaults(func=main_sge) | |
883 |
|
877 |
General Comments 0
You need to be logged in to leave comments.
Login now