##// END OF EJS Templates
update PBSEngineSet to use job arrays...
Justin Riley -
Show More
@@ -271,22 +271,22 b' class LocalEngineSet(object):'
271 dfinal.addCallback(self._handle_stop)
271 dfinal.addCallback(self._handle_stop)
272 return dfinal
272 return dfinal
273
273
274
275 class BatchEngineSet(object):
274 class BatchEngineSet(object):
276
275
277 # Subclasses must fill these in. See PBSEngineSet
276 # Subclasses must fill these in. See PBSEngineSet/SGEEngineSet
277 name = ''
278 submit_command = ''
278 submit_command = ''
279 delete_command = ''
279 delete_command = ''
280 script_param_prefix = ''
280 job_id_regexp = ''
281 job_id_regexp = ''
281
282 job_array_regexp = ''
283 default_template = ''
284
282 def __init__(self, template_file, **kwargs):
285 def __init__(self, template_file, **kwargs):
283 self.template_file = template_file
286 self.template_file = template_file
284 self.context = {}
287
285 self.context.update(kwargs)
286 self.batch_file = self.template_file+'-run'
287
288 def parse_job_id(self, output):
288 def parse_job_id(self, output):
289 m = re.match(self.job_id_regexp, output)
289 m = re.search(self.job_id_regexp, output)
290 if m is not None:
290 if m is not None:
291 job_id = m.group()
291 job_id = m.group()
292 else:
292 else:
@@ -294,75 +294,27 b' class BatchEngineSet(object):'
294 self.job_id = job_id
294 self.job_id = job_id
295 log.msg('Job started with job id: %r' % job_id)
295 log.msg('Job started with job id: %r' % job_id)
296 return job_id
296 return job_id
297
297
298 def write_batch_script(self, n):
299 self.context['n'] = n
300 template = open(self.template_file, 'r').read()
301 log.msg('Using template for batch script: %s' % self.template_file)
302 script_as_string = Itpl.itplns(template, self.context)
303 log.msg('Writing instantiated batch script: %s' % self.batch_file)
304 f = open(self.batch_file,'w')
305 f.write(script_as_string)
306 f.close()
307
308 def handle_error(self, f):
298 def handle_error(self, f):
309 f.printTraceback()
299 f.printTraceback()
310 f.raiseException()
300 f.raiseException()
311
312 def start(self, n):
313 self.write_batch_script(n)
314 d = getProcessOutput(self.submit_command,
315 [self.batch_file],env=os.environ)
316 d.addCallback(self.parse_job_id)
317 d.addErrback(self.handle_error)
318 return d
319
320 def kill(self):
321 d = getProcessOutput(self.delete_command,
322 [self.job_id],env=os.environ)
323 return d
324
325 class PBSEngineSet(BatchEngineSet):
326
327 submit_command = 'qsub'
328 delete_command = 'qdel'
329 job_id_regexp = '\d+'
330
331 def __init__(self, template_file, **kwargs):
332 BatchEngineSet.__init__(self, template_file, **kwargs)
333
334 class SGEEngineSet(PBSEngineSet):
335
336 def __init__(self, template_file, **kwargs):
337 BatchEngineSet.__init__(self, template_file, **kwargs)
338 self._temp_file = None
339
340 def parse_job_id(self, output):
341 m = re.search(self.job_id_regexp, output)
342 if m is not None:
343 job_id = m.group()
344 else:
345 raise Exception("job id couldn't be determined: %s" % output)
346 self.job_id = job_id
347 log.msg('job started with job id: %r' % job_id)
348 return job_id
349
301
350 def start(self, n):
302 def start(self, n):
351 log.msg("starting %d engines" % n)
303 log.msg("starting %d engines" % n)
352 self._temp_file = tempfile.NamedTemporaryFile()
304 self._temp_file = tempfile.NamedTemporaryFile()
353 regex = re.compile('#\$[ \t]+-t[ \t]+\d+')
305 regex = re.compile(self.job_array_regexp)
354 if self.template_file:
306 if self.template_file:
355 log.msg("Using sge script %s" % self.template_file)
307 log.msg("Using %s script %s" % (self.name, self.template_file))
356 contents = open(self.template_file, 'r').read()
308 contents = open(self.template_file, 'r').read()
357 if not regex.search(contents):
309 if not regex.search(contents):
358 log.msg("adding job array settings to sge script")
310 log.msg("adding job array settings to %s script" % self.name)
359 contents = ("#$ -t 1-%d\n" % n) + contents
311 contents = ("%s -t 1-%d\n" % (self.script_param_prefix,n)) + contents
360 self._temp_file.write(contents)
312 self._temp_file.write(contents)
361 self.template_file = self._temp_file.name
313 self.template_file = self._temp_file.name
362 else:
314 else:
363 log.msg("using default ipengine sge script: \n%s" %
315 log.msg("using default ipengine %s script: \n%s" %
364 (sge_template % n))
316 (self.name, (self.default_template % n)))
365 self._temp_file.file.write(sge_template % n)
317 self._temp_file.file.write(self.default_template % n)
366 self.template_file = self._temp_file.name
318 self.template_file = self._temp_file.name
367 self._temp_file.file.flush()
319 self._temp_file.file.flush()
368 d = getProcessOutput(self.submit_command,
320 d = getProcessOutput(self.submit_command,
@@ -372,7 +324,32 b' class SGEEngineSet(PBSEngineSet):'
372 d.addErrback(self.handle_error)
324 d.addErrback(self.handle_error)
373 return d
325 return d
374
326
375 sge_template="""#$ -V
327 def kill(self):
328 d = getProcessOutput(self.delete_command,
329 [self.job_id],env=os.environ)
330 return d
331
332 class PBSEngineSet(BatchEngineSet):
333
334 name = 'PBS'
335 submit_command = 'qsub'
336 delete_command = 'qdel'
337 script_param_prefix = "#PBS"
338 job_id_regexp = '\d+'
339 job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+'
340 default_template="""#PBS -V
341 #PBS -t 1-%d
342 #PBS -N ipengine
343 eid=$(($PBS_ARRAYID - 1))
344 ipengine --logfile=ipengine${eid}.log
345 """
346
347 class SGEEngineSet(PBSEngineSet):
348
349 name = 'SGE'
350 script_param_prefix = "#$"
351 job_array_regexp = '#\$[ \t]+-t[ \t]+\d+'
352 default_template="""#$ -V
376 #$ -t 1-%d
353 #$ -t 1-%d
377 #$ -N ipengine
354 #$ -N ipengine
378 eid=$(($SGE_TASK_ID - 1))
355 eid=$(($SGE_TASK_ID - 1))
@@ -857,7 +834,7 b' def get_args():'
857 type=str,
834 type=str,
858 dest='pbsscript',
835 dest='pbsscript',
859 help='PBS script template',
836 help='PBS script template',
860 default='pbs.template'
837 default=''
861 )
838 )
862 parser_pbs.set_defaults(func=main_pbs)
839 parser_pbs.set_defaults(func=main_pbs)
863
840
General Comments 0
You need to be logged in to leave comments. Login now