##// END OF EJS Templates
update PBSEngineSet to use job arrays...
Justin Riley -
Show More
@@ -271,22 +271,22 b' class LocalEngineSet(object):'
271 271 dfinal.addCallback(self._handle_stop)
272 272 return dfinal
273 273
274
275 274 class BatchEngineSet(object):
276 275
277 # Subclasses must fill these in. See PBSEngineSet
276 # Subclasses must fill these in. See PBSEngineSet/SGEEngineSet
277 name = ''
278 278 submit_command = ''
279 279 delete_command = ''
280 script_param_prefix = ''
280 281 job_id_regexp = ''
282 job_array_regexp = ''
283 default_template = ''
281 284
282 285 def __init__(self, template_file, **kwargs):
283 286 self.template_file = template_file
284 self.context = {}
285 self.context.update(kwargs)
286 self.batch_file = self.template_file+'-run'
287 287
288 288 def parse_job_id(self, output):
289 m = re.match(self.job_id_regexp, output)
289 m = re.search(self.job_id_regexp, output)
290 290 if m is not None:
291 291 job_id = m.group()
292 292 else:
@@ -295,24 +295,31 b' class BatchEngineSet(object):'
295 295 log.msg('Job started with job id: %r' % job_id)
296 296 return job_id
297 297
298 def write_batch_script(self, n):
299 self.context['n'] = n
300 template = open(self.template_file, 'r').read()
301 log.msg('Using template for batch script: %s' % self.template_file)
302 script_as_string = Itpl.itplns(template, self.context)
303 log.msg('Writing instantiated batch script: %s' % self.batch_file)
304 f = open(self.batch_file,'w')
305 f.write(script_as_string)
306 f.close()
307
308 298 def handle_error(self, f):
309 299 f.printTraceback()
310 300 f.raiseException()
311 301
312 302 def start(self, n):
313 self.write_batch_script(n)
303 log.msg("starting %d engines" % n)
304 self._temp_file = tempfile.NamedTemporaryFile()
305 regex = re.compile(self.job_array_regexp)
306 if self.template_file:
307 log.msg("Using %s script %s" % (self.name, self.template_file))
308 contents = open(self.template_file, 'r').read()
309 if not regex.search(contents):
310 log.msg("adding job array settings to %s script" % self.name)
311 contents = ("%s -t 1-%d\n" % (self.script_param_prefix,n)) + contents
312 self._temp_file.write(contents)
313 self.template_file = self._temp_file.name
314 else:
315 log.msg("using default ipengine %s script: \n%s" %
316 (self.name, (self.default_template % n)))
317 self._temp_file.file.write(self.default_template % n)
318 self.template_file = self._temp_file.name
319 self._temp_file.file.flush()
314 320 d = getProcessOutput(self.submit_command,
315 [self.batch_file],env=os.environ)
321 [self.template_file],
322 env=os.environ)
316 323 d.addCallback(self.parse_job_id)
317 324 d.addErrback(self.handle_error)
318 325 return d
@@ -324,55 +331,25 b' class BatchEngineSet(object):'
324 331
325 332 class PBSEngineSet(BatchEngineSet):
326 333
334 name = 'PBS'
327 335 submit_command = 'qsub'
328 336 delete_command = 'qdel'
337 script_param_prefix = "#PBS"
329 338 job_id_regexp = '\d+'
330
331 def __init__(self, template_file, **kwargs):
332 BatchEngineSet.__init__(self, template_file, **kwargs)
339 job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+'
340 default_template="""#PBS -V
341 #PBS -t 1-%d
342 #PBS -N ipengine
343 eid=$(($PBS_ARRAYID - 1))
344 ipengine --logfile=ipengine${eid}.log
345 """
333 346
334 347 class SGEEngineSet(PBSEngineSet):
335 348
336 def __init__(self, template_file, **kwargs):
337 BatchEngineSet.__init__(self, template_file, **kwargs)
338 self._temp_file = None
339
340 def parse_job_id(self, output):
341 m = re.search(self.job_id_regexp, output)
342 if m is not None:
343 job_id = m.group()
344 else:
345 raise Exception("job id couldn't be determined: %s" % output)
346 self.job_id = job_id
347 log.msg('job started with job id: %r' % job_id)
348 return job_id
349
350 def start(self, n):
351 log.msg("starting %d engines" % n)
352 self._temp_file = tempfile.NamedTemporaryFile()
353 regex = re.compile('#\$[ \t]+-t[ \t]+\d+')
354 if self.template_file:
355 log.msg("Using sge script %s" % self.template_file)
356 contents = open(self.template_file, 'r').read()
357 if not regex.search(contents):
358 log.msg("adding job array settings to sge script")
359 contents = ("#$ -t 1-%d\n" % n) + contents
360 self._temp_file.write(contents)
361 self.template_file = self._temp_file.name
362 else:
363 log.msg("using default ipengine sge script: \n%s" %
364 (sge_template % n))
365 self._temp_file.file.write(sge_template % n)
366 self.template_file = self._temp_file.name
367 self._temp_file.file.flush()
368 d = getProcessOutput(self.submit_command,
369 [self.template_file],
370 env=os.environ)
371 d.addCallback(self.parse_job_id)
372 d.addErrback(self.handle_error)
373 return d
374
375 sge_template="""#$ -V
349 name = 'SGE'
350 script_param_prefix = "#$"
351 job_array_regexp = '#\$[ \t]+-t[ \t]+\d+'
352 default_template="""#$ -V
376 353 #$ -t 1-%d
377 354 #$ -N ipengine
378 355 eid=$(($SGE_TASK_ID - 1))
@@ -857,7 +834,7 b' def get_args():'
857 834 type=str,
858 835 dest='pbsscript',
859 836 help='PBS script template',
860 default='pbs.template'
837 default=''
861 838 )
862 839 parser_pbs.set_defaults(func=main_pbs)
863 840
General Comments 0
You need to be logged in to leave comments. Login now